From f7104fd31631ffdb65acbe4cb138ed16750c6a83 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 1 Jun 2026 23:33:28 -0400 Subject: [PATCH] Guard PUF support clone top tail --- src/microplex_us/pipelines/us.py | 198 +++++++++++++++++++++++++++++++ tests/pipelines/test_us.py | 117 ++++++++++++++++++ 2 files changed, 315 insertions(+) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 5c04f40..962968f 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -278,6 +278,43 @@ "self_employed_pension_contributions_desired", ) +PUF_SUPPORT_CLONE_TOP_TAIL_ROUGH_AGI_CAP = 78_999_999.0 +PUF_SUPPORT_CLONE_TOP_TAIL_ROUGH_AGI_VARIABLES: tuple[str, ...] = ( + "employment_income", + "self_employment_income", + "taxable_interest_income", + "tax_exempt_interest_income", + "capital_gains", + "long_term_capital_gains", + "short_term_capital_gains", + "non_sch_d_capital_gains", + "dividend_income", + "ordinary_dividend_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + "partnership_s_corp_income", + "rental_income", + "farm_income", + "ira_distributions", + "taxable_pension_income", + "total_pension_income", + "taxable_social_security", + "social_security", +) +PUF_SUPPORT_CLONE_TOP_TAIL_SCALE_VARIABLES: tuple[str, ...] = ( + "capital_gains", + "long_term_capital_gains", + "short_term_capital_gains", + "non_sch_d_capital_gains", + "partnership_s_corp_income", + "dividend_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + "ordinary_dividend_income", + "taxable_interest_income", + "tax_exempt_interest_income", +) + DEFAULT_ACA_TAKEUP_RATE = 0.672 DEFAULT_DC_PTC_TAKEUP_RATE = 0.32 DEFAULT_EARLY_HEAD_START_TAKEUP_RATE = 0.09 @@ -2089,6 +2126,15 @@ class USMicroplexBuildConfig: puf_support_clone_cps_refresh_condition_variables: tuple[str, ...] = ( PUF_SUPPORT_CLONE_CPS_REFRESH_CONDITION_VARIABLES ) + puf_support_clone_top_tail_rough_agi_cap: float | None = ( + PUF_SUPPORT_CLONE_TOP_TAIL_ROUGH_AGI_CAP + ) + puf_support_clone_top_tail_rough_agi_variables: tuple[str, ...] = ( + PUF_SUPPORT_CLONE_TOP_TAIL_ROUGH_AGI_VARIABLES + ) + puf_support_clone_top_tail_scale_variables: tuple[str, ...] = ( + PUF_SUPPORT_CLONE_TOP_TAIL_SCALE_VARIABLES + ) dependent_tax_leaf_soft_cap_multiplier: float | None = None dependent_tax_leaf_soft_cap_base_variables: tuple[str, ...] = ( "employment_income", @@ -5728,6 +5774,153 @@ def _reconcile_puf_support_clone_social_security( ] = total.loc[fallback_mask & age.lt(62)] return subcomponents + def _puf_support_clone_top_tail_rough_agi( + self, + clone: pd.DataFrame, + ) -> tuple[pd.Series, list[str]]: + """Compute a nonredundant rough AGI proxy for PUF clone top-tail checks.""" + + configured = set(self.config.puf_support_clone_top_tail_rough_agi_variables) + + def numeric(variable: str) -> pd.Series: + return ( + pd.to_numeric(clone[variable], errors="coerce") + .replace([np.inf, -np.inf], np.nan) + .fillna(0.0) + ) + + components: list[pd.Series] = [] + variables: list[str] = [] + + def add(variable: str) -> bool: + if variable not in configured or variable not in clone.columns: + return False + components.append(numeric(variable)) + variables.append(variable) + return True + + for variable in ( + "employment_income", + "self_employment_income", + "taxable_interest_income", + "tax_exempt_interest_income", + "partnership_s_corp_income", + "rental_income", + "farm_income", + "ira_distributions", + ): + add(variable) + + if not add("capital_gains"): + add("long_term_capital_gains") + add("short_term_capital_gains") + add("non_sch_d_capital_gains") + + if not add("dividend_income") and not add("ordinary_dividend_income"): + add("qualified_dividend_income") + add("non_qualified_dividend_income") + + if not add("taxable_pension_income"): + add("total_pension_income") + if not add("taxable_social_security"): + add("social_security") + + if not components: + return pd.Series(0.0, index=clone.index, dtype=float), [] + return sum(components), variables + + def _apply_puf_support_clone_top_tail_guard( + self, + clone: pd.DataFrame, + *, + integrated_variables: Iterable[str], + ) -> tuple[pd.DataFrame, dict[str, Any]]: + """Avoid arbitrary state placement of unsupported PUF top-tail clones. + + PUF has no state geography, so the CPS support clone inherits state from + its scaffold row. Until the top tail gets state-aware support records, + do not let a single imputed clone enter the open-ended SOI AGI count bin + and then receive a large calibrated state weight. + """ + + cap = self.config.puf_support_clone_top_tail_rough_agi_cap + summary: dict[str, Any] = { + "enabled": cap is not None, + "cap": float(cap) if cap is not None else None, + "affected_rows": 0, + "rough_agi_variables": [], + "scaled_variables": [], + "scale_basis_variables": [], + "max_rough_agi_before": None, + "max_rough_agi_after": None, + } + if cap is None or cap <= 0.0 or clone.empty: + return clone, summary + + rough_agi, rough_agi_variables = self._puf_support_clone_top_tail_rough_agi( + clone + ) + if not rough_agi_variables: + return clone, summary + + summary["rough_agi_variables"] = rough_agi_variables + summary["max_rough_agi_before"] = float(rough_agi.max()) + + over_cap = rough_agi > float(cap) + if not bool(over_cap.any()): + summary["max_rough_agi_after"] = summary["max_rough_agi_before"] + return clone, summary + + integrated_set = set(integrated_variables) + scale_variables = [ + variable + for variable in self.config.puf_support_clone_top_tail_scale_variables + if variable in clone.columns and variable in integrated_set + ] + if not scale_variables: + summary["max_rough_agi_after"] = summary["max_rough_agi_before"] + return clone, summary + scale_basis_variables = [ + variable for variable in scale_variables if variable in rough_agi_variables + ] + if not scale_basis_variables: + summary["max_rough_agi_after"] = summary["max_rough_agi_before"] + return clone, summary + + scale_frame = pd.DataFrame( + { + variable: pd.to_numeric(clone[variable], errors="coerce") + .replace([np.inf, -np.inf], np.nan) + .fillna(0.0) + .clip(lower=0.0) + for variable in scale_basis_variables + }, + index=clone.index, + ) + scalable = scale_frame.sum(axis=1) + nonscalable = rough_agi - scalable + desired_scalable = (float(cap) - nonscalable).clip(lower=0.0) + eligible = over_cap & scalable.gt(0.0) + if not bool(eligible.any()): + summary["max_rough_agi_after"] = summary["max_rough_agi_before"] + return clone, summary + + scale = (desired_scalable[eligible] / scalable[eligible]).clip( + lower=0.0, + upper=1.0, + ) + guarded = clone.copy() + for variable in scale_variables: + values = pd.to_numeric(guarded.loc[eligible, variable], errors="coerce") + guarded.loc[eligible, variable] = values.fillna(0.0).clip(lower=0.0) * scale + + guarded_rough_agi, _ = self._puf_support_clone_top_tail_rough_agi(guarded) + summary["affected_rows"] = int(eligible.sum()) + summary["scaled_variables"] = scale_variables + summary["scale_basis_variables"] = scale_basis_variables + summary["max_rough_agi_after"] = float(guarded_rough_agi.max()) + return guarded, summary + def _finalize_puf_support_clone_frame( self, *, @@ -5762,6 +5955,10 @@ def _finalize_puf_support_clone_frame( integrated_variables=integrated_variables, preclone_columns=preclone_columns, ) + clone, top_tail_guard_summary = self._apply_puf_support_clone_top_tail_guard( + clone, + integrated_variables=integrated_variables, + ) generated_entity_id_columns = sorted( set(ENTITY_ID_COLUMNS.values()) & (set(clone.columns) - preclone_columns) @@ -5820,6 +6017,7 @@ def _finalize_puf_support_clone_frame( "donor_only_variables": donor_only_variables, "both_halves_override_variables": sorted(both_halves_override), "cps_only_refresh": cps_refresh_summary, + "top_tail_guard": top_tail_guard_summary, "dropped_generated_entity_id_columns": generated_entity_id_columns, "variable_surface": { "ecps_imputed_variables": list(PUF_SUPPORT_CLONE_IMPUTED_VARIABLES), diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 4fd184e..3d5caea 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -4007,6 +4007,123 @@ def test_puf_support_clone_refresh_reconciles_social_security_subcomponents( assert clone["social_security_disability"].tolist() == [12_000.0, 0.0, 0.0] assert clone["social_security_retirement"].tolist() == [0.0, 8_000.0, 0.0] + def test_puf_support_clone_top_tail_guard_scales_investment_income(self): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + puf_support_clone_top_tail_rough_agi_cap=78_999_999.0, + ) + ) + clone = pd.DataFrame( + { + "employment_income": [100_000.0, 125_000.0], + "capital_gains": [120_000_000.0, 15_000_000.0], + "long_term_capital_gains": [100_000_000.0, 10_000_000.0], + "qualified_dividend_income": [1_000_000.0, 0.0], + } + ) + + guarded, summary = pipeline._apply_puf_support_clone_top_tail_guard( + clone, + integrated_variables=[ + "capital_gains", + "long_term_capital_gains", + "qualified_dividend_income", + ], + ) + + rough_agi, rough_agi_variables = pipeline._puf_support_clone_top_tail_rough_agi( + guarded + ) + assert rough_agi.iloc[0] == pytest.approx(78_999_999.0) + assert rough_agi.iloc[1] == pytest.approx(15_125_000.0) + assert guarded["employment_income"].tolist() == [100_000.0, 125_000.0] + assert guarded["capital_gains"].iloc[0] < clone["capital_gains"].iloc[0] + assert ( + guarded["long_term_capital_gains"].iloc[0] + < clone["long_term_capital_gains"].iloc[0] + ) + assert summary["affected_rows"] == 1 + assert rough_agi_variables == [ + "employment_income", + "capital_gains", + "qualified_dividend_income", + ] + assert summary["scale_basis_variables"] == [ + "capital_gains", + "qualified_dividend_income", + ] + assert "long_term_capital_gains" in summary["scaled_variables"] + + def test_puf_support_clone_top_tail_guard_avoids_redundant_income_totals(self): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + puf_support_clone_top_tail_rough_agi_cap=78_999_999.0, + ) + ) + clone = pd.DataFrame( + { + "employment_income": [100_000.0], + "capital_gains": [60_000_000.0], + "long_term_capital_gains": [60_000_000.0], + "short_term_capital_gains": [0.0], + "ordinary_dividend_income": [5_000_000.0], + "qualified_dividend_income": [4_000_000.0], + "non_qualified_dividend_income": [1_000_000.0], + } + ) + + rough_agi, rough_agi_variables = pipeline._puf_support_clone_top_tail_rough_agi( + clone + ) + guarded, summary = pipeline._apply_puf_support_clone_top_tail_guard( + clone, + integrated_variables=[ + "capital_gains", + "long_term_capital_gains", + "short_term_capital_gains", + "ordinary_dividend_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + ], + ) + + assert rough_agi.iloc[0] == pytest.approx(65_100_000.0) + assert rough_agi_variables == [ + "employment_income", + "capital_gains", + "ordinary_dividend_income", + ] + pd.testing.assert_frame_equal(guarded, clone) + assert summary["affected_rows"] == 0 + + def test_puf_support_clone_top_tail_guard_can_be_disabled(self): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + puf_support_clone_top_tail_rough_agi_cap=None, + ) + ) + clone = pd.DataFrame( + { + "employment_income": [100_000.0], + "capital_gains": [20_000_000.0], + "long_term_capital_gains": [100_000_000.0], + } + ) + + guarded, summary = pipeline._apply_puf_support_clone_top_tail_guard( + clone, + integrated_variables=["capital_gains", "long_term_capital_gains"], + ) + + pd.testing.assert_frame_equal(guarded, clone) + assert summary["enabled"] is False + def test_integrate_donor_sources_puf_support_clone_validates_scaffold_and_donor( self, ):