# Candidate predictors used to pair each PUF support clone with a CPS donor
# row. At run time only the entries that are present and numeric in both
# frames (and accepted by the pipeline's donor-compatibility check) are used.
PUF_SUPPORT_CLONE_CPS_REFRESH_CONDITION_VARIABLES: tuple[str, ...] = (
    "age",
    "is_male",
    "state_fips",
    "tax_unit_is_joint",
    "tax_unit_count_dependents",
    "is_tax_unit_head",
    "is_tax_unit_spouse",
    "is_tax_unit_dependent",
    "employment_income",
    "self_employment_income",
    "social_security",
)

# Condition variables that are arcsinh-transformed before standardisation
# (presumably to compress heavy-tailed dollar amounts -- confirm with author).
PUF_SUPPORT_CLONE_CPS_REFRESH_INCOME_VARIABLES: frozenset[str] = frozenset(
    {
        "employment_income",
        "self_employment_income",
        "social_security",
    }
)

# CPS-only columns that are re-copied onto each clone from its matched donor.
# NOTE(review): "is_male" is also a condition variable, so a clone's value can
# be overwritten by its donor's value -- confirm this is intended.
PUF_SUPPORT_CLONE_CPS_REFRESH_VARIABLES: tuple[str, ...] = (
    "is_male",
    "cps_race",
    "is_hispanic",
    "detailed_occupation_recode",
    "treasury_tipped_occupation_code",
    "is_disabled",
    "difficulty_seeing",
    "difficulty_hearing",
    "difficulty_walking_or_climbing_stairs",
    "difficulty_dressing_or_bathing",
    "difficulty_doing_errands",
    "difficulty_remembering_or_making_decisions",
    "meets_ssi_disability_criteria",
    "social_security_retirement",
    "social_security_disability",
    "social_security_survivors",
    "social_security_dependents",
    "disability_benefits",
    "workers_compensation",
    "unemployment_compensation",
    "child_support_received",
    "veterans_benefits",
    "educational_assistance",
    "financial_assistance",
    "survivor_benefits",
    "strike_benefits",
    "receives_wic",
    "receives_housing_assistance",
    "spm_unit_energy_subsidy",
    "spm_unit_pre_subsidy_childcare_expenses",
    "employer_sponsored_insurance_premiums",
    "health_insurance_premiums_without_medicare_part_b",
    "other_health_insurance_premiums",
    "over_the_counter_health_expenses",
    "other_medical_expenses",
    "child_support_expense",
    "weekly_hours_worked",
    "hours_worked",
    "hours_worked_last_week",
    "weekly_hours_worked_before_lsr",
    "weeks_worked",
    "hourly_wage",
    "is_paid_hourly",
    "is_union_member_or_covered",
    "employment_income_last_year",
    "self_employment_income_last_year",
    "taxable_401k_distributions",
    "tax_exempt_401k_distributions",
    "taxable_403b_distributions",
    "tax_exempt_403b_distributions",
    "keogh_distributions",
    "taxable_sep_distributions",
    "tax_exempt_sep_distributions",
    "traditional_401k_contributions_desired",
    "roth_401k_contributions_desired",
    "traditional_ira_contributions_desired",
    "roth_ira_contributions_desired",
    "self_employed_pension_contributions_desired",
)


def _refresh_puf_support_clone_cps_only_fields(
    self,
    *,
    original: pd.DataFrame,
    clone: pd.DataFrame,
    integrated_variables: Iterable[str],
    preclone_columns: set[str],
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Refresh copied CPS-only clone fields after PUF income is grafted on.

    PUF support clones start as literal CPS copies, then receive PUF tax and
    income fields. Any remaining copied CPS-only fields can become
    incoherent with the clone's new income surface. Re-match those fields
    from CPS donors using demographic predictors plus PUF-imputed income.

    Args:
        original: Donor frame; each clone row is matched to one of its rows.
        clone: Clone frame whose CPS-only columns are to be refreshed.
        integrated_variables: Variables already integrated onto the clone;
            these are never refreshed.
        preclone_columns: Columns that existed before cloning; only these
            are eligible for refresh.

    Returns:
        ``(frame, summary)``. ``frame`` is ``clone`` itself when the step is
        disabled, either frame is empty, or no usable condition/refresh
        variables exist; otherwise it is a refreshed copy. ``summary``
        records whether the step is enabled, the condition and refreshed
        variables, the Social Security columns reconciled, and the number of
        distinct donor rows that were matched.
    """
    summary: dict[str, Any] = {
        "enabled": bool(self.config.puf_support_clone_refresh_cps_only_fields),
        "condition_variables": [],
        "refreshed_variables": [],
        "social_security_reconciled_variables": [],
        "matched_source_row_count": 0,
    }
    if not self.config.puf_support_clone_refresh_cps_only_fields:
        return clone, summary
    if original.empty or clone.empty:
        return clone, summary

    integrated_set = set(integrated_variables)
    # Keep only predictors usable on both sides of the match.
    condition_vars = [
        variable
        for variable in self.config.puf_support_clone_cps_refresh_condition_variables
        if variable in original.columns
        and variable in clone.columns
        and pd.api.types.is_numeric_dtype(original[variable])
        and pd.api.types.is_numeric_dtype(clone[variable])
        and self._is_compatible_donor_condition(
            clone[variable],
            original[variable],
        )
    ]
    if not condition_vars:
        return clone, summary

    # Only refresh columns that were copied from the CPS record and have not
    # since been replaced by an integrated (donor-imputed) value.
    refresh_variables = [
        variable
        for variable in self.config.puf_support_clone_cps_refresh_variables
        if variable in preclone_columns
        and variable not in integrated_set
        and variable in original.columns
        and variable in clone.columns
    ]
    if not refresh_variables:
        return clone, summary

    # Missing / unparseable predictor values are treated as zero.
    train = original.loc[:, condition_vars].apply(
        lambda series: pd.to_numeric(series, errors="coerce").fillna(0.0)
    )
    test = clone.loc[:, condition_vars].apply(
        lambda series: pd.to_numeric(series, errors="coerce").fillna(0.0)
    )
    for variable in (
        set(condition_vars) & PUF_SUPPORT_CLONE_CPS_REFRESH_INCOME_VARIABLES
    ):
        train[variable] = np.arcsinh(train[variable])
        test[variable] = np.arcsinh(test[variable])
    # Standardise both sides with the donor frame's moments; a constant
    # donor column gets scale 1 so it does not divide by zero.
    scale = train.std(ddof=0).replace(0.0, 1.0)
    center = train.mean()
    train_values = ((train - center) / scale).to_numpy(dtype=float)
    test_values = ((test - center) / scale).to_numpy(dtype=float)

    # Imported lazily so the module does not require scikit-learn at import.
    from sklearn.neighbors import NearestNeighbors

    matcher = NearestNeighbors(n_neighbors=1)
    matcher.fit(train_values)
    # Positional index into ``original`` of each clone row's nearest donor.
    matched = matcher.kneighbors(test_values, return_distance=False).reshape(-1)

    refreshed = clone.copy()
    for variable in refresh_variables:
        refreshed[variable] = original[variable].to_numpy(copy=True)[matched]

    # Donor subcomponents were sized for the donor's total; rescale them to
    # the clone's own Social Security total.
    reconciled_variables = self._reconcile_puf_support_clone_social_security(
        refreshed
    )
    summary["condition_variables"] = condition_vars
    summary["refreshed_variables"] = refresh_variables
    summary["social_security_reconciled_variables"] = reconciled_variables
    summary["matched_source_row_count"] = int(np.unique(matched).size)
    return refreshed, summary


def _reconcile_puf_support_clone_social_security(
    self,
    clone: pd.DataFrame,
) -> list[str]:
    """Scale cloned Social Security components to the PUF-imputed total.

    ``clone`` is modified in place. For every row:

    * total <= 0 (or missing): every subcomponent is set to 0;
    * total > 0 and the subcomponents sum to > 0: each subcomponent is
      scaled proportionally so that they add up to the total;
    * total > 0 but the subcomponents sum to <= 0: the whole total is
      assigned to ``social_security_retirement`` when age >= 62, otherwise
      to ``social_security_disability`` (whichever of those columns exist).
      A frame without an ``age`` column is treated as age 0 for every row.

    Args:
        clone: Frame holding ``social_security`` and any of its
            subcomponent columns.

    Returns:
        The subcomponent columns found in ``clone`` (all of them are
        rewritten as float columns), or an empty list when
        ``social_security`` or every subcomponent column is absent.
    """
    if "social_security" not in clone.columns:
        return []
    subcomponents = [
        variable
        for variable in (
            "social_security_retirement",
            "social_security_disability",
            "social_security_survivors",
            "social_security_dependents",
        )
        if variable in clone.columns
    ]
    if not subcomponents:
        return []

    def _as_float(column: pd.Series) -> pd.Series:
        # Cast up front so scaled (fractional) values never have to be
        # written into an integer-typed Series.
        return pd.to_numeric(column, errors="coerce").fillna(0.0).astype(float)

    total = _as_float(clone["social_security"])
    sub_values = {variable: _as_float(clone[variable]) for variable in subcomponents}
    sub_sum = sum(sub_values.values())
    positive_total = total.gt(0.0)
    positive_sub_sum = sub_sum.gt(0.0)
    scale_mask = positive_total & positive_sub_sum
    zero_mask = ~positive_total

    for variable, values in sub_values.items():
        # Rows outside ``scale_mask`` may yield inf/NaN here; they are never
        # selected by the mask below.
        scaled = values * total / sub_sum
        clone[variable] = values.mask(zero_mask, 0.0).mask(scale_mask, scaled)

    fallback_mask = positive_total & ~positive_sub_sum
    if fallback_mask.any():
        if "age" in clone.columns:
            age = _as_float(clone["age"])
        else:
            # ``DataFrame.get`` would hand back a bare scalar here, which has
            # no ``fillna``; build an aligned all-zero Series instead.
            age = pd.Series(0.0, index=clone.index)
        # 62 is the cut-off hard-coded by this routine (presumably the
        # earliest Social Security retirement age -- confirm).
        if "social_security_retirement" in subcomponents:
            clone["social_security_retirement"] = clone[
                "social_security_retirement"
            ].mask(fallback_mask & age.ge(62), total)
        if "social_security_disability" in subcomponents:
            clone["social_security_disability"] = clone[
                "social_security_disability"
            ].mask(fallback_mask & age.lt(62), total)
    return subcomponents
"self_employment_income": [0.0, 0.0], + "social_security": [0.0, 0.0], + "is_disabled": [1, 0], + "difficulty_hearing": [1, 0], + "meets_ssi_disability_criteria": [1, 0], + } + ) + clone = original.copy() + clone["employment_income"] = [100_000.0, 0.0] + + refreshed, summary = pipeline._refresh_puf_support_clone_cps_only_fields( + original=original, + clone=clone, + integrated_variables=["employment_income"], + preclone_columns=set(original.columns), + ) + + assert refreshed["is_disabled"].tolist() == [0, 1] + assert refreshed["difficulty_hearing"].tolist() == [0, 1] + assert refreshed["meets_ssi_disability_criteria"].tolist() == [0, 1] + assert "employment_income" in summary["condition_variables"] + assert summary["matched_source_row_count"] == 2 + assert "is_disabled" in summary["refreshed_variables"] + + def test_puf_support_clone_refresh_reconciles_social_security_subcomponents( + self, + ): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + ) + ) + clone = pd.DataFrame( + { + "age": [45, 70, 40], + "social_security": [12_000.0, 8_000.0, 0.0], + "social_security_retirement": [0.0, 2_000.0, 100.0], + "social_security_disability": [3_000.0, 0.0, 50.0], + } + ) + + reconciled = pipeline._reconcile_puf_support_clone_social_security(clone) + + assert reconciled == [ + "social_security_retirement", + "social_security_disability", + ] + assert clone["social_security_disability"].tolist() == [12_000.0, 0.0, 0.0] + assert clone["social_security_retirement"].tolist() == [0.0, 8_000.0, 0.0] + def test_integrate_donor_sources_puf_support_clone_validates_scaffold_and_donor( self, ):