Skip to content

Commit dc092a5

Browse files
authored
Refresh CPS-only fields on PUF support clones (#163)
1 parent a7f0bc1 commit dc092a5

2 files changed

Lines changed: 309 additions & 0 deletions

File tree

src/microplex_us/pipelines/us.py

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,89 @@
195195
"self_employment_income_would_be_qualified",
196196
)
197197

198+
# Default predictor columns for the CPS-only refresh of PUF support clones.
# They are the default for
# ``USMicroplexBuildConfig.puf_support_clone_cps_refresh_condition_variables``;
# the refresh step keeps only those that are numeric in both frames and uses
# them as nearest-neighbour matching features.
PUF_SUPPORT_CLONE_CPS_REFRESH_CONDITION_VARIABLES: tuple[str, ...] = (
    # Person-level attributes.
    "age",
    "is_male",
    "state_fips",
    # Tax-unit structure and role.
    "tax_unit_is_joint",
    "tax_unit_count_dependents",
    "is_tax_unit_head",
    "is_tax_unit_spouse",
    "is_tax_unit_dependent",
    # Income amounts (arcsinh-transformed before matching).
    "employment_income",
    "self_employment_income",
    "social_security",
)
211+
212+
# Condition variables that the CPS-only refresh passes through ``np.arcsinh``
# before standardising the nearest-neighbour features.
PUF_SUPPORT_CLONE_CPS_REFRESH_INCOME_VARIABLES: frozenset[str] = frozenset(
    (
        "employment_income",
        "self_employment_income",
        "social_security",
    )
)
219+
220+
# Default set of columns that the CPS-only refresh overwrites on each PUF
# support clone with the value from its matched CPS donor row. It is the
# default for ``USMicroplexBuildConfig.puf_support_clone_cps_refresh_variables``.
# Order is preserved in the refresh summary, so keep it stable. The group
# headings below are a reading aid based on the column names only.
PUF_SUPPORT_CLONE_CPS_REFRESH_VARIABLES: tuple[str, ...] = (
    # Demographics and occupation.
    "is_male",
    "cps_race",
    "is_hispanic",
    "detailed_occupation_recode",
    "treasury_tipped_occupation_code",
    # Disability indicators.
    "is_disabled",
    "difficulty_seeing",
    "difficulty_hearing",
    "difficulty_walking_or_climbing_stairs",
    "difficulty_dressing_or_bathing",
    "difficulty_doing_errands",
    "difficulty_remembering_or_making_decisions",
    "meets_ssi_disability_criteria",
    # Social Security subcomponents (reconciled to the total after refresh).
    "social_security_retirement",
    "social_security_disability",
    "social_security_survivors",
    "social_security_dependents",
    # Other benefit and transfer amounts.
    "disability_benefits",
    "workers_compensation",
    "unemployment_compensation",
    "child_support_received",
    "veterans_benefits",
    "educational_assistance",
    "financial_assistance",
    "survivor_benefits",
    "strike_benefits",
    # Program receipt and SPM-unit amounts.
    "receives_wic",
    "receives_housing_assistance",
    "spm_unit_energy_subsidy",
    "spm_unit_pre_subsidy_childcare_expenses",
    # Health and other expenses.
    "employer_sponsored_insurance_premiums",
    "health_insurance_premiums_without_medicare_part_b",
    "other_health_insurance_premiums",
    "over_the_counter_health_expenses",
    "other_medical_expenses",
    "child_support_expense",
    # Hours, wages and job characteristics.
    "weekly_hours_worked",
    "hours_worked",
    "hours_worked_last_week",
    "weekly_hours_worked_before_lsr",
    "weeks_worked",
    "hourly_wage",
    "is_paid_hourly",
    "is_union_member_or_covered",
    # Prior-year earnings.
    "employment_income_last_year",
    "self_employment_income_last_year",
    # Retirement-account distributions.
    "taxable_401k_distributions",
    "tax_exempt_401k_distributions",
    "taxable_403b_distributions",
    "tax_exempt_403b_distributions",
    "keogh_distributions",
    "taxable_sep_distributions",
    "tax_exempt_sep_distributions",
    # Desired retirement contributions.
    "traditional_401k_contributions_desired",
    "roth_401k_contributions_desired",
    "traditional_ira_contributions_desired",
    "roth_ira_contributions_desired",
    "self_employed_pension_contributions_desired",
)
280+
198281
DEFAULT_ACA_TAKEUP_RATE = 0.672
199282
DEFAULT_DC_PTC_TAKEUP_RATE = 0.32
200283
DEFAULT_EARLY_HEAD_START_TAKEUP_RATE = 0.09
@@ -1999,6 +2082,13 @@ class USMicroplexBuildConfig:
19992082
puf_support_clone_both_halves_override_variables: tuple[str, ...] = (
20002083
PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES
20012084
)
2085+
puf_support_clone_refresh_cps_only_fields: bool = True
2086+
puf_support_clone_cps_refresh_variables: tuple[str, ...] = (
2087+
PUF_SUPPORT_CLONE_CPS_REFRESH_VARIABLES
2088+
)
2089+
puf_support_clone_cps_refresh_condition_variables: tuple[str, ...] = (
2090+
PUF_SUPPORT_CLONE_CPS_REFRESH_CONDITION_VARIABLES
2091+
)
20022092
dependent_tax_leaf_soft_cap_multiplier: float | None = None
20032093
dependent_tax_leaf_soft_cap_base_variables: tuple[str, ...] = (
20042094
"employment_income",
@@ -5495,6 +5585,149 @@ def _prepare_puf_support_clone_frame(self, original: pd.DataFrame) -> pd.DataFra
54955585
clone[self.config.puf_support_clone_flag_column] = 1.0
54965586
return clone
54975587

5588+
def _refresh_puf_support_clone_cps_only_fields(
    self,
    *,
    original: pd.DataFrame,
    clone: pd.DataFrame,
    integrated_variables: Iterable[str],
    preclone_columns: set[str],
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Re-draw CPS-only clone columns from the nearest CPS donor row.

    A PUF support clone begins as a copy of a CPS record and then has PUF
    tax and income fields written onto it, so the CPS-only columns it still
    carries may no longer fit its income. This step matches every clone row
    to one row of ``original`` (1-nearest-neighbour on standardised
    condition variables, income variables arcsinh-transformed first) and
    copies the configured refresh columns from that donor row.

    Args:
        original: CPS donor frame that supplies both the matching features
            and the replacement values.
        clone: Clone frame whose CPS-only columns are refreshed. It is not
            modified; a copy is returned when a refresh happens.
        integrated_variables: Columns already supplied by the PUF
            integration; these are never refreshed.
        preclone_columns: Columns that existed before cloning; only these
            are eligible for refresh.

    Returns:
        ``(frame, summary)``. ``frame`` is ``clone`` itself when the step
        is disabled, either frame is empty, or no usable condition/refresh
        variables exist; otherwise a refreshed copy. ``summary`` records
        the enabled flag, the condition and refreshed variables, the
        reconciled Social Security columns and the number of distinct
        donor rows used.
    """
    config = self.config
    refresh_enabled = config.puf_support_clone_refresh_cps_only_fields
    summary: dict[str, Any] = {
        "enabled": bool(refresh_enabled),
        "condition_variables": [],
        "refreshed_variables": [],
        "social_security_reconciled_variables": [],
        "matched_source_row_count": 0,
    }
    if not refresh_enabled or original.empty or clone.empty:
        return clone, summary

    excluded = set(integrated_variables)

    # Keep only predictors that exist and are numeric on both sides and
    # that the pipeline considers compatible between clone and donor.
    predictors: list[str] = []
    for name in config.puf_support_clone_cps_refresh_condition_variables:
        if name not in original.columns or name not in clone.columns:
            continue
        if not pd.api.types.is_numeric_dtype(original[name]):
            continue
        if not pd.api.types.is_numeric_dtype(clone[name]):
            continue
        if self._is_compatible_donor_condition(clone[name], original[name]):
            predictors.append(name)
    if not predictors:
        return clone, summary

    targets = [
        name
        for name in config.puf_support_clone_cps_refresh_variables
        if name in preclone_columns
        and name not in excluded
        and name in original.columns
        and name in clone.columns
    ]
    if not targets:
        return clone, summary

    def _feature_frame(frame: pd.DataFrame) -> pd.DataFrame:
        # Numeric view of the predictors with missing values treated as 0.
        return frame.loc[:, predictors].apply(
            lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0)
        )

    donor_features = _feature_frame(original)
    clone_features = _feature_frame(clone)
    income_predictors = (
        set(predictors) & PUF_SUPPORT_CLONE_CPS_REFRESH_INCOME_VARIABLES
    )
    for name in income_predictors:
        donor_features[name] = np.arcsinh(donor_features[name])
        clone_features[name] = np.arcsinh(clone_features[name])

    # Standardise with donor statistics; constant columns get scale 1 so
    # they contribute nothing instead of dividing by zero.
    center = donor_features.mean()
    scale = donor_features.std(ddof=0).replace(0.0, 1.0)
    donor_matrix = ((donor_features - center) / scale).to_numpy(dtype=float)
    clone_matrix = ((clone_features - center) / scale).to_numpy(dtype=float)

    from sklearn.neighbors import NearestNeighbors

    matcher = NearestNeighbors(n_neighbors=1)
    matcher.fit(donor_matrix)
    donor_positions = matcher.kneighbors(
        clone_matrix, return_distance=False
    ).ravel()

    refreshed = clone.copy()
    for name in targets:
        donor_values = original[name].to_numpy(copy=True)
        refreshed[name] = donor_values[donor_positions]

    reconciled = self._reconcile_puf_support_clone_social_security(refreshed)
    summary.update(
        {
            "condition_variables": predictors,
            "refreshed_variables": targets,
            "social_security_reconciled_variables": reconciled,
            "matched_source_row_count": int(np.unique(donor_positions).size),
        }
    )
    return refreshed, summary
5676+
5677+
def _reconcile_puf_support_clone_social_security(
    self,
    clone: pd.DataFrame,
) -> list[str]:
    """Scale cloned Social Security components to the PUF-imputed total.

    ``clone`` is modified in place so that, row by row, the Social Security
    subcomponent columns agree with ``social_security``:

    * total <= 0: every subcomponent is set to 0;
    * total > 0 and subcomponents sum to > 0: each subcomponent is scaled
      proportionally so they add up to the total;
    * total > 0 and subcomponents sum to <= 0: the whole total is assigned
      to ``social_security_retirement`` for ages >= 62 and to
      ``social_security_disability`` otherwise (when that column exists).

    A missing ``age`` column is treated as age 0 for every row, so the
    fallback goes to the disability component. Reconciled subcomponent
    columns are always written back as floats.

    Args:
        clone: Frame to reconcile; mutated in place.

    Returns:
        The subcomponent columns that were present and reconciled, in
        canonical order, or ``[]`` when there is no ``social_security``
        column or no subcomponent column.
    """
    if "social_security" not in clone.columns:
        return []
    subcomponents = [
        variable
        for variable in (
            "social_security_retirement",
            "social_security_disability",
            "social_security_survivors",
            "social_security_dependents",
        )
        if variable in clone.columns
    ]
    if not subcomponents:
        return []

    def _as_float(series: pd.Series) -> pd.Series:
        # Work in float64 throughout: proportional scaling produces
        # fractional amounts, and writing those into an integer-typed
        # series is an incompatible-dtype assignment in pandas.
        return pd.to_numeric(series, errors="coerce").fillna(0.0).astype(float)

    total = _as_float(clone["social_security"])
    sub_values = {
        variable: _as_float(clone[variable]) for variable in subcomponents
    }
    sub_sum = sum(sub_values.values())
    positive_total = total.gt(0.0)
    positive_sub_sum = sub_sum.gt(0.0)
    scale_mask = positive_total & positive_sub_sum
    zero_mask = ~positive_total

    for variable, values in sub_values.items():
        adjusted = values.copy()
        adjusted.loc[zero_mask] = 0.0
        adjusted.loc[scale_mask] = (
            values.loc[scale_mask] * total.loc[scale_mask] / sub_sum.loc[scale_mask]
        )
        clone[variable] = adjusted

    fallback_mask = positive_total & ~positive_sub_sum
    if fallback_mask.any():
        # ``clone.get("age", 0.0)`` would hand back a bare scalar when the
        # column is absent, and a scalar has no ``fillna``/``ge``; build an
        # index-aligned series instead so the masks below always work.
        if "age" in clone.columns:
            age = _as_float(clone["age"])
        else:
            age = pd.Series(0.0, index=clone.index)
        retirement_rows = fallback_mask & age.ge(62)
        disability_rows = fallback_mask & age.lt(62)
        if "social_security_retirement" in subcomponents:
            clone.loc[retirement_rows, "social_security_retirement"] = total.loc[
                retirement_rows
            ]
        if "social_security_disability" in subcomponents:
            clone.loc[disability_rows, "social_security_disability"] = total.loc[
                disability_rows
            ]
    return subcomponents
5730+
54985731
def _finalize_puf_support_clone_frame(
54995732
self,
55005733
*,
@@ -5523,6 +5756,13 @@ def _finalize_puf_support_clone_frame(
55235756
if variable in original.columns and variable in clone.columns:
55245757
original[variable] = clone[variable].to_numpy(copy=True)
55255758

5759+
clone, cps_refresh_summary = self._refresh_puf_support_clone_cps_only_fields(
5760+
original=original,
5761+
clone=clone,
5762+
integrated_variables=integrated_variables,
5763+
preclone_columns=preclone_columns,
5764+
)
5765+
55265766
generated_entity_id_columns = sorted(
55275767
set(ENTITY_ID_COLUMNS.values()) & (set(clone.columns) - preclone_columns)
55285768
)
@@ -5579,6 +5819,7 @@ def _finalize_puf_support_clone_frame(
55795819
"overlap_variables": overlap_variables,
55805820
"donor_only_variables": donor_only_variables,
55815821
"both_halves_override_variables": sorted(both_halves_override),
5822+
"cps_only_refresh": cps_refresh_summary,
55825823
"dropped_generated_entity_id_columns": generated_entity_id_columns,
55835824
"variable_surface": {
55845825
"ecps_imputed_variables": list(PUF_SUPPORT_CLONE_IMPUTED_VARIABLES),

tests/pipelines/test_us.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3939,6 +3939,74 @@ def frame_for(name, households, persons, capabilities):
39393939
assert generated_lengths[-1] == (("ssi_reported",), 4)
39403940
assert "ssi_reported" in result.columns
39413941

3942+
def test_puf_support_clone_refresh_rematches_cps_only_disability_to_puf_income(
    self,
):
    """Clone rows take disability flags from the donor nearest in income.

    The two donors differ only in ``employment_income`` and in their
    disability flags. The clone swaps the incomes, so each clone row should
    be matched to the *other* donor and inherit that donor's flags.
    """
    config = USMicroplexBuildConfig(
        synthesis_backend="seed",
        puf_support_clone_enabled=True,
    )
    pipeline = USMicroplexPipeline(config)
    donors = pd.DataFrame(
        {
            "person_id": [1, 2],
            "household_id": [1, 2],
            "age": [40, 40],
            "is_male": [1, 1],
            "state_fips": [6, 6],
            "employment_income": [0.0, 100_000.0],
            "self_employment_income": [0.0, 0.0],
            "social_security": [0.0, 0.0],
            "is_disabled": [1, 0],
            "difficulty_hearing": [1, 0],
            "meets_ssi_disability_criteria": [1, 0],
        }
    )
    swapped_clone = donors.copy()
    swapped_clone["employment_income"] = [100_000.0, 0.0]

    refreshed, summary = pipeline._refresh_puf_support_clone_cps_only_fields(
        original=donors,
        clone=swapped_clone,
        integrated_variables=["employment_income"],
        preclone_columns=set(donors.columns),
    )

    for flag in (
        "is_disabled",
        "difficulty_hearing",
        "meets_ssi_disability_criteria",
    ):
        assert refreshed[flag].tolist() == [0, 1]
    assert "employment_income" in summary["condition_variables"]
    assert summary["matched_source_row_count"] == 2
    assert "is_disabled" in summary["refreshed_variables"]
3982+
3983+
def test_puf_support_clone_refresh_reconciles_social_security_subcomponents(
    self,
):
    """Subcomponents are rescaled to the total, or zeroed when it is zero.

    Row 0 and row 1 have a positive total with a single non-zero
    subcomponent, which must be scaled up to the full total; row 2 has a
    zero total, so both subcomponents must be cleared.
    """
    config = USMicroplexBuildConfig(
        synthesis_backend="seed",
        puf_support_clone_enabled=True,
    )
    pipeline = USMicroplexPipeline(config)
    frame = pd.DataFrame(
        {
            "age": [45, 70, 40],
            "social_security": [12_000.0, 8_000.0, 0.0],
            "social_security_retirement": [0.0, 2_000.0, 100.0],
            "social_security_disability": [3_000.0, 0.0, 50.0],
        }
    )

    reconciled = pipeline._reconcile_puf_support_clone_social_security(frame)

    expected_columns = [
        "social_security_retirement",
        "social_security_disability",
    ]
    assert reconciled == expected_columns
    assert frame["social_security_disability"].tolist() == [12_000.0, 0.0, 0.0]
    assert frame["social_security_retirement"].tolist() == [0.0, 8_000.0, 0.0]
4009+
39424010
def test_integrate_donor_sources_puf_support_clone_validates_scaffold_and_donor(
39434011
self,
39444012
):

0 commit comments

Comments
 (0)