Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1150.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix Enhanced CPS PUF-clone calibration by anchoring source household weights and excluding Forbes-scale PUF donors from clone training.
160 changes: 160 additions & 0 deletions policyengine_us_data/calibration/puf_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,35 @@

PUF_SUBSAMPLE_TARGET = 20_000
PUF_TOP_PERCENTILE = 99.5
FORBES_SYNTHETIC_FINANCIAL_THRESHOLD = 250_000_000
PUF_METADATA_MISSING_TOP_TAIL_THRESHOLD = 10_000_000
FORBES_METADATA_MARKER_THRESHOLDS = (
("forbes_unit_id", 0),
("forbes_replicate_id", 0),
("forbes_rank", 1),
)
PUF_METADATA_MISSING_TOP_TAIL_VARIABLES = (
"adjusted_gross_income",
"qualified_dividend_income",
"non_qualified_dividend_income",
"taxable_interest_income",
"tax_exempt_interest_income",
"long_term_capital_gains",
"short_term_capital_gains",
"non_sch_d_capital_gains",
"long_term_capital_gains_on_collectibles",
"unrecaptured_section_1250_gain",
"partnership_s_corp_income",
"self_employment_income",
"sstb_self_employment_income",
"rental_income",
"farm_income",
"farm_rent_income",
"farm_operations_income",
"estate_income",
"charitable_cash_donations",
"charitable_non_cash_donations",
)

DEMOGRAPHIC_PREDICTORS = [
"age",
Expand Down Expand Up @@ -925,6 +954,7 @@ def _run_qrf_imputation(
puf_sim = Microsimulation(dataset=puf_dataset)

puf_agi = puf_sim.calculate("adjusted_gross_income", map_to="person").values
puf_data = puf_sim.dataset.load_dataset()

X_train_full = puf_sim.calculate_dataframe(
DEMOGRAPHIC_PREDICTORS + IMPUTED_VARIABLES
Expand All @@ -936,6 +966,63 @@ def _run_qrf_imputation(

del puf_sim

tax_unit_ids = _period_array(puf_data, "tax_unit_id", time_period)
has_forbes_metadata = _has_forbes_metadata(
puf_data,
time_period,
expected_length=0 if tax_unit_ids is None else len(tax_unit_ids),
)
forbes_person_mask = _forbes_person_training_mask(
puf_data,
time_period,
n_persons=len(puf_agi),
)
if has_forbes_metadata:
top_tail_threshold = FORBES_SYNTHETIC_FINANCIAL_THRESHOLD
top_tail_label = "Forbes"
low_weight_mask = np.ones_like(forbes_person_mask, dtype=bool)
else:
top_tail_threshold = PUF_METADATA_MISSING_TOP_TAIL_THRESHOLD
top_tail_label = "metadata-missing top-tail"
low_weight_mask = np.ones_like(forbes_person_mask, dtype=bool)

forbes_person_mask |= low_weight_mask & (puf_agi >= top_tail_threshold)
for frame in (X_train_full, X_train_override):
candidate_columns = (
IMPUTED_VARIABLES + OVERRIDDEN_IMPUTED_VARIABLES
if has_forbes_metadata
else PUF_METADATA_MISSING_TOP_TAIL_VARIABLES
)
financial_columns = [
column for column in candidate_columns if column in frame.columns
]
if financial_columns:
forbes_person_mask |= low_weight_mask & (
frame[financial_columns].abs().max(axis=1).to_numpy()
>= top_tail_threshold
)
if len(forbes_person_mask) == len(puf_agi) and forbes_person_mask.any():
if len(X_train_full) != len(forbes_person_mask) or len(X_train_override) != len(
forbes_person_mask
):
logger.warning(
"Skipping Forbes donor exclusion because QRF training "
"frames do not match person-level PUF metadata lengths"
)
else:
logger.info(
"Excluding %d %s person records from PUF QRF training at threshold $%s",
int(forbes_person_mask.sum()),
top_tail_label,
f"{top_tail_threshold:,.0f}",
)
non_forbes_mask = ~forbes_person_mask
puf_agi = puf_agi[non_forbes_mask]
X_train_full = X_train_full.loc[non_forbes_mask].reset_index(drop=True)
X_train_override = X_train_override.loc[non_forbes_mask].reset_index(
drop=True
)

sub_idx = _stratified_subsample_index(puf_agi)
_log_stratified_subsample(
len(puf_agi),
Expand Down Expand Up @@ -975,6 +1062,79 @@ def _run_qrf_imputation(
return y_full, y_override


def _period_array(
data: Dict[str, Dict[int, np.ndarray]],
variable: str,
time_period: int,
) -> Optional[np.ndarray]:
if variable not in data:
return None
values = data[variable]
if isinstance(values, dict):
values = values.get(time_period, values.get(str(time_period)))
if values is None:
return None
return np.asarray(values)


def _has_forbes_metadata(
puf_data: Dict[str, Dict[int, np.ndarray]],
time_period: int,
expected_length: int,
) -> bool:
"""Return whether usable Forbes synthetic-record metadata is present."""
if expected_length <= 0:
return False
for variable, marker_threshold in FORBES_METADATA_MARKER_THRESHOLDS:
values = _period_array(puf_data, variable, time_period)
if values is None or len(values) != expected_length:
continue
values = np.asarray(values, dtype=float)
if np.any(values >= marker_threshold):
return True
return False


def _forbes_person_training_mask(
puf_data: Dict[str, Dict[int, np.ndarray]],
time_period: int,
n_persons: int,
) -> np.ndarray:
"""Return person-level mask for synthetic Forbes top-tail PUF records."""
tax_unit_id = _period_array(puf_data, "tax_unit_id", time_period)
person_tax_unit_id = _period_array(puf_data, "person_tax_unit_id", time_period)
if tax_unit_id is None or person_tax_unit_id is None:
return np.zeros(n_persons, dtype=bool)
if len(person_tax_unit_id) != n_persons:
return np.zeros(n_persons, dtype=bool)

tax_unit_forbes = np.zeros(len(tax_unit_id), dtype=bool)
for variable, default_threshold in FORBES_METADATA_MARKER_THRESHOLDS:
values = _period_array(puf_data, variable, time_period)
if values is None or len(values) != len(tax_unit_id):
continue
values = np.asarray(values, dtype=float)
if default_threshold == 0:
tax_unit_forbes |= values >= 0
else:
tax_unit_forbes |= values >= default_threshold

if not tax_unit_forbes.any():
return np.zeros(n_persons, dtype=bool)

sorted_index = np.argsort(tax_unit_id)
sorted_tax_unit_id = tax_unit_id[sorted_index]
sorted_tax_unit_forbes = tax_unit_forbes[sorted_index]

positions = np.searchsorted(sorted_tax_unit_id, person_tax_unit_id)
valid = positions < len(sorted_tax_unit_id)
person_mask = np.zeros(n_persons, dtype=bool)
valid_positions = positions[valid]
valid[valid] = sorted_tax_unit_id[valid_positions] == person_tax_unit_id[valid]
person_mask[valid] = sorted_tax_unit_forbes[positions[valid]]
return person_mask


def _stratified_subsample_index(
income: np.ndarray,
target_n: int = PUF_SUBSAMPLE_TARGET,
Expand Down
Loading