Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,89 @@
"self_employment_income_would_be_qualified",
)

# Candidate predictor columns for re-matching PUF support clones to CPS donor
# rows. This tuple is the default for the build-config field
# ``puf_support_clone_cps_refresh_condition_variables``; the refresh step only
# keeps the entries that are present and numeric in both the donor frame and
# the clone frame.
PUF_SUPPORT_CLONE_CPS_REFRESH_CONDITION_VARIABLES: tuple[str, ...] = (
"age",
"is_male",
"state_fips",
"tax_unit_is_joint",
"tax_unit_count_dependents",
"is_tax_unit_head",
"is_tax_unit_spouse",
"is_tax_unit_dependent",
"employment_income",
"self_employment_income",
"social_security",
)

# Subset of the refresh condition variables that are dollar amounts. The
# refresh step applies ``np.arcsinh`` to these columns before standardizing
# the predictors for the nearest-neighbour match.
PUF_SUPPORT_CLONE_CPS_REFRESH_INCOME_VARIABLES: frozenset[str] = frozenset(
{
"employment_income",
"self_employment_income",
"social_security",
}
)

# Columns that the PUF support-clone refresh may overwrite with values taken
# from the matched CPS donor row. This tuple is the default for the
# build-config field ``puf_support_clone_cps_refresh_variables``. At refresh
# time an entry is only used when it existed before cloning, was not one of
# the integrated (PUF-supplied) variables, and is present in both frames.
PUF_SUPPORT_CLONE_CPS_REFRESH_VARIABLES: tuple[str, ...] = (
"is_male",
"cps_race",
"is_hispanic",
"detailed_occupation_recode",
"treasury_tipped_occupation_code",
"is_disabled",
"difficulty_seeing",
"difficulty_hearing",
"difficulty_walking_or_climbing_stairs",
"difficulty_dressing_or_bathing",
"difficulty_doing_errands",
"difficulty_remembering_or_making_decisions",
"meets_ssi_disability_criteria",
"social_security_retirement",
"social_security_disability",
"social_security_survivors",
"social_security_dependents",
"disability_benefits",
"workers_compensation",
"unemployment_compensation",
"child_support_received",
"veterans_benefits",
"educational_assistance",
"financial_assistance",
"survivor_benefits",
"strike_benefits",
"receives_wic",
"receives_housing_assistance",
"spm_unit_energy_subsidy",
"spm_unit_pre_subsidy_childcare_expenses",
"employer_sponsored_insurance_premiums",
"health_insurance_premiums_without_medicare_part_b",
"other_health_insurance_premiums",
"over_the_counter_health_expenses",
"other_medical_expenses",
"child_support_expense",
"weekly_hours_worked",
"hours_worked",
"hours_worked_last_week",
"weekly_hours_worked_before_lsr",
"weeks_worked",
"hourly_wage",
"is_paid_hourly",
"is_union_member_or_covered",
"employment_income_last_year",
"self_employment_income_last_year",
"taxable_401k_distributions",
"tax_exempt_401k_distributions",
"taxable_403b_distributions",
"tax_exempt_403b_distributions",
"keogh_distributions",
"taxable_sep_distributions",
"tax_exempt_sep_distributions",
"traditional_401k_contributions_desired",
"roth_401k_contributions_desired",
"traditional_ira_contributions_desired",
"roth_ira_contributions_desired",
"self_employed_pension_contributions_desired",
)

DEFAULT_ACA_TAKEUP_RATE = 0.672
DEFAULT_DC_PTC_TAKEUP_RATE = 0.32
DEFAULT_EARLY_HEAD_START_TAKEUP_RATE = 0.09
Expand Down Expand Up @@ -1945,6 +2028,13 @@ class USMicroplexBuildConfig:
puf_support_clone_both_halves_override_variables: tuple[str, ...] = (
PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES
)
puf_support_clone_refresh_cps_only_fields: bool = True
puf_support_clone_cps_refresh_variables: tuple[str, ...] = (
PUF_SUPPORT_CLONE_CPS_REFRESH_VARIABLES
)
puf_support_clone_cps_refresh_condition_variables: tuple[str, ...] = (
PUF_SUPPORT_CLONE_CPS_REFRESH_CONDITION_VARIABLES
)
dependent_tax_leaf_soft_cap_multiplier: float | None = None
dependent_tax_leaf_soft_cap_base_variables: tuple[str, ...] = (
"employment_income",
Expand Down Expand Up @@ -5440,6 +5530,149 @@ def _prepare_puf_support_clone_frame(self, original: pd.DataFrame) -> pd.DataFra
clone[self.config.puf_support_clone_flag_column] = 1.0
return clone

def _refresh_puf_support_clone_cps_only_fields(
    self,
    *,
    original: pd.DataFrame,
    clone: pd.DataFrame,
    integrated_variables: Iterable[str],
    preclone_columns: set[str],
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Re-match CPS-only clone columns to donors that fit the clone's income.

    Each clone row is paired with its single nearest row of ``original`` in
    a standardized predictor space (income predictors are arcsinh-transformed
    first). The configured refresh columns are then copied from the matched
    donor row, and the Social Security subcomponents are reconciled against
    the clone's ``social_security`` total.

    Args:
        original: Donor frame that supplies the refreshed values.
        clone: Clone frame to refresh; it is not modified, a copy is returned.
        integrated_variables: Columns already supplied by the donor
            integration; these are never refreshed.
        preclone_columns: Columns that existed before cloning; only these
            are eligible for refresh.

    Returns:
        ``(frame, summary)``. ``frame`` is ``clone`` itself when the refresh
        is disabled, either frame is empty, or no usable predictor/refresh
        column exists; otherwise it is a refreshed copy. ``summary`` records
        whether the step was enabled, the predictors and refreshed columns
        used, the reconciled Social Security columns, and the number of
        distinct donor rows that were matched.
    """
    config = self.config
    refresh_enabled = config.puf_support_clone_refresh_cps_only_fields
    summary: dict[str, Any] = {
        "enabled": bool(refresh_enabled),
        "condition_variables": [],
        "refreshed_variables": [],
        "social_security_reconciled_variables": [],
        "matched_source_row_count": 0,
    }
    if not refresh_enabled or original.empty or clone.empty:
        return clone, summary

    integrated = set(integrated_variables)

    # Keep only predictors that both frames can supply as numeric columns and
    # that the pipeline's compatibility check accepts.
    predictors: list[str] = []
    for name in config.puf_support_clone_cps_refresh_condition_variables:
        if name not in original.columns or name not in clone.columns:
            continue
        if not pd.api.types.is_numeric_dtype(original[name]):
            continue
        if not pd.api.types.is_numeric_dtype(clone[name]):
            continue
        if self._is_compatible_donor_condition(clone[name], original[name]):
            predictors.append(name)
    if not predictors:
        return clone, summary

    targets: list[str] = []
    for name in config.puf_support_clone_cps_refresh_variables:
        eligible = name in preclone_columns and name not in integrated
        available = name in original.columns and name in clone.columns
        if eligible and available:
            targets.append(name)
    if not targets:
        return clone, summary

    def _numeric_predictors(frame: pd.DataFrame) -> pd.DataFrame:
        # Coerce to numbers and treat anything unparsable or missing as zero.
        return frame.loc[:, predictors].apply(
            lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0)
        )

    donor_features = _numeric_predictors(original)
    clone_features = _numeric_predictors(clone)
    for name in predictors:
        if name in PUF_SUPPORT_CLONE_CPS_REFRESH_INCOME_VARIABLES:
            donor_features[name] = np.arcsinh(donor_features[name])
            clone_features[name] = np.arcsinh(clone_features[name])

    # Standardize both frames with the donor statistics; constant donor
    # columns get a unit scale so the division is always defined.
    center = donor_features.mean()
    spread = donor_features.std(ddof=0).replace(0.0, 1.0)
    donor_matrix = donor_features.sub(center).div(spread).to_numpy(dtype=float)
    clone_matrix = clone_features.sub(center).div(spread).to_numpy(dtype=float)

    from sklearn.neighbors import NearestNeighbors

    matcher = NearestNeighbors(n_neighbors=1)
    matcher.fit(donor_matrix)
    donor_positions = matcher.kneighbors(
        clone_matrix,
        return_distance=False,
    ).reshape(-1)

    refreshed = clone.copy()
    for name in targets:
        donor_column = original[name].to_numpy(copy=True)
        refreshed[name] = donor_column[donor_positions]

    reconciled = self._reconcile_puf_support_clone_social_security(refreshed)
    summary.update(
        {
            "condition_variables": predictors,
            "refreshed_variables": targets,
            "social_security_reconciled_variables": reconciled,
            "matched_source_row_count": int(np.unique(donor_positions).size),
        }
    )
    return refreshed, summary

def _reconcile_puf_support_clone_social_security(
    self,
    clone: pd.DataFrame,
) -> list[str]:
    """Scale cloned Social Security components to the PUF-imputed total.

    ``clone`` is modified in place. Row by row, using the numeric value of
    ``social_security`` (missing or unparsable values count as zero):

    * total not positive: every present subcomponent is set to zero;
    * total positive and the subcomponents sum to a positive amount: the
      subcomponents are rescaled proportionally so they add up to the total;
    * total positive but the subcomponent sum is not positive: the whole
      total is assigned to ``social_security_retirement`` for ages 62 and
      over, otherwise to ``social_security_disability`` (each only when that
      column exists).

    When the frame has no ``age`` column every row is treated as age 0, so
    the fallback goes to the disability component.

    Args:
        clone: Frame holding ``social_security`` and any of its four
            subcomponent columns.

    Returns:
        The subcomponent columns found in ``clone`` (in a fixed order), or
        an empty list when ``social_security`` or all subcomponents are
        absent, in which case ``clone`` is left untouched.
    """
    if "social_security" not in clone.columns:
        return []
    subcomponents = [
        variable
        for variable in (
            "social_security_retirement",
            "social_security_disability",
            "social_security_survivors",
            "social_security_dependents",
        )
        if variable in clone.columns
    ]
    if not subcomponents:
        return []

    total = pd.to_numeric(clone["social_security"], errors="coerce").fillna(0.0)
    sub_values = {
        variable: pd.to_numeric(clone[variable], errors="coerce").fillna(0.0)
        for variable in subcomponents
    }
    sub_sum = sum(sub_values.values())
    positive_total = total.gt(0.0)
    positive_sub_sum = sub_sum.gt(0.0)
    scale_mask = positive_total & positive_sub_sum
    zero_mask = ~positive_total

    for variable, values in sub_values.items():
        adjusted = values.copy()
        adjusted.loc[zero_mask] = 0.0
        adjusted.loc[scale_mask] = (
            values.loc[scale_mask] * total.loc[scale_mask] / sub_sum.loc[scale_mask]
        )
        clone[variable] = adjusted

    fallback_mask = positive_total & ~positive_sub_sum
    if fallback_mask.any():
        if "age" in clone.columns:
            age = pd.to_numeric(clone["age"], errors="coerce").fillna(0.0)
        else:
            # ``clone.get("age", 0.0)`` would hand back a bare float here and
            # the ``.fillna`` call would fail; use an aligned Series instead.
            age = pd.Series(0.0, index=clone.index)
        retiree_mask = fallback_mask & age.ge(62)
        younger_mask = fallback_mask & age.lt(62)
        if "social_security_retirement" in subcomponents:
            clone["social_security_retirement"] = clone[
                "social_security_retirement"
            ].where(~retiree_mask, total)
        if "social_security_disability" in subcomponents:
            clone["social_security_disability"] = clone[
                "social_security_disability"
            ].where(~younger_mask, total)
    return subcomponents

def _finalize_puf_support_clone_frame(
self,
*,
Expand Down Expand Up @@ -5468,6 +5701,13 @@ def _finalize_puf_support_clone_frame(
if variable in original.columns and variable in clone.columns:
original[variable] = clone[variable].to_numpy(copy=True)

clone, cps_refresh_summary = self._refresh_puf_support_clone_cps_only_fields(
original=original,
clone=clone,
integrated_variables=integrated_variables,
preclone_columns=preclone_columns,
)

generated_entity_id_columns = sorted(
set(ENTITY_ID_COLUMNS.values()) & (set(clone.columns) - preclone_columns)
)
Expand Down Expand Up @@ -5524,6 +5764,7 @@ def _finalize_puf_support_clone_frame(
"overlap_variables": overlap_variables,
"donor_only_variables": donor_only_variables,
"both_halves_override_variables": sorted(both_halves_override),
"cps_only_refresh": cps_refresh_summary,
"dropped_generated_entity_id_columns": generated_entity_id_columns,
"variable_surface": {
"ecps_imputed_variables": list(PUF_SUPPORT_CLONE_IMPUTED_VARIABLES),
Expand Down
68 changes: 68 additions & 0 deletions tests/pipelines/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -3847,6 +3847,74 @@ def frame_for(name, households, persons, capabilities):
assert generated_lengths[-1] == (("ssi_reported",), 4)
assert "ssi_reported" in result.columns

def test_puf_support_clone_refresh_rematches_cps_only_disability_to_puf_income(
    self,
):
    """Swapping clone incomes should swap the donor-matched disability flags."""
    config = USMicroplexBuildConfig(
        synthesis_backend="seed",
        puf_support_clone_enabled=True,
    )
    pipeline = USMicroplexPipeline(config)

    donor_columns = {
        "person_id": [1, 2],
        "household_id": [1, 2],
        "age": [40, 40],
        "is_male": [1, 1],
        "state_fips": [6, 6],
        "employment_income": [0.0, 100_000.0],
        "self_employment_income": [0.0, 0.0],
        "social_security": [0.0, 0.0],
        "is_disabled": [1, 0],
        "difficulty_hearing": [1, 0],
        "meets_ssi_disability_criteria": [1, 0],
    }
    original = pd.DataFrame(donor_columns)
    # The clone carries the opposite earnings pattern from its source rows.
    clone = original.assign(employment_income=[100_000.0, 0.0])

    refreshed, summary = pipeline._refresh_puf_support_clone_cps_only_fields(
        original=original,
        clone=clone,
        integrated_variables=["employment_income"],
        preclone_columns=set(original.columns),
    )

    for flag in (
        "is_disabled",
        "difficulty_hearing",
        "meets_ssi_disability_criteria",
    ):
        assert refreshed[flag].tolist() == [0, 1]
    assert "employment_income" in summary["condition_variables"]
    assert summary["matched_source_row_count"] == 2
    assert "is_disabled" in summary["refreshed_variables"]

def test_puf_support_clone_refresh_reconciles_social_security_subcomponents(
    self,
):
    """Subcomponents are rescaled to the total, or zeroed when it is zero."""
    config = USMicroplexBuildConfig(
        synthesis_backend="seed",
        puf_support_clone_enabled=True,
    )
    pipeline = USMicroplexPipeline(config)

    ages = [45, 70, 40]
    totals = [12_000.0, 8_000.0, 0.0]
    clone = pd.DataFrame(
        {
            "age": ages,
            "social_security": totals,
            "social_security_retirement": [0.0, 2_000.0, 100.0],
            "social_security_disability": [3_000.0, 0.0, 50.0],
        }
    )

    reconciled = pipeline._reconcile_puf_support_clone_social_security(clone)

    expected_columns = [
        "social_security_retirement",
        "social_security_disability",
    ]
    assert reconciled == expected_columns
    assert clone["social_security_disability"].tolist() == [12_000.0, 0.0, 0.0]
    assert clone["social_security_retirement"].tolist() == [0.0, 8_000.0, 0.0]

def test_integrate_donor_sources_puf_support_clone_validates_scaffold_and_donor(
self,
):
Expand Down
Loading