Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,43 @@
"self_employed_pension_contributions_desired",
)

PUF_SUPPORT_CLONE_TOP_TAIL_ROUGH_AGI_CAP = 78_999_999.0
PUF_SUPPORT_CLONE_TOP_TAIL_ROUGH_AGI_VARIABLES: tuple[str, ...] = (
"employment_income",
"self_employment_income",
"taxable_interest_income",
"tax_exempt_interest_income",
"capital_gains",
"long_term_capital_gains",
"short_term_capital_gains",
"non_sch_d_capital_gains",
"dividend_income",
"ordinary_dividend_income",
"qualified_dividend_income",
"non_qualified_dividend_income",
"partnership_s_corp_income",
"rental_income",
"farm_income",
"ira_distributions",
"taxable_pension_income",
"total_pension_income",
"taxable_social_security",
"social_security",
)
PUF_SUPPORT_CLONE_TOP_TAIL_SCALE_VARIABLES: tuple[str, ...] = (
"capital_gains",
"long_term_capital_gains",
"short_term_capital_gains",
"non_sch_d_capital_gains",
"partnership_s_corp_income",
"dividend_income",
"qualified_dividend_income",
"non_qualified_dividend_income",
"ordinary_dividend_income",
"taxable_interest_income",
"tax_exempt_interest_income",
)

DEFAULT_ACA_TAKEUP_RATE = 0.672
DEFAULT_DC_PTC_TAKEUP_RATE = 0.32
DEFAULT_EARLY_HEAD_START_TAKEUP_RATE = 0.09
Expand Down Expand Up @@ -2089,6 +2126,15 @@ class USMicroplexBuildConfig:
puf_support_clone_cps_refresh_condition_variables: tuple[str, ...] = (
PUF_SUPPORT_CLONE_CPS_REFRESH_CONDITION_VARIABLES
)
puf_support_clone_top_tail_rough_agi_cap: float | None = (
PUF_SUPPORT_CLONE_TOP_TAIL_ROUGH_AGI_CAP
)
puf_support_clone_top_tail_rough_agi_variables: tuple[str, ...] = (
PUF_SUPPORT_CLONE_TOP_TAIL_ROUGH_AGI_VARIABLES
)
puf_support_clone_top_tail_scale_variables: tuple[str, ...] = (
PUF_SUPPORT_CLONE_TOP_TAIL_SCALE_VARIABLES
)
dependent_tax_leaf_soft_cap_multiplier: float | None = None
dependent_tax_leaf_soft_cap_base_variables: tuple[str, ...] = (
"employment_income",
Expand Down Expand Up @@ -5728,6 +5774,153 @@ def _reconcile_puf_support_clone_social_security(
] = total.loc[fallback_mask & age.lt(62)]
return subcomponents

def _puf_support_clone_top_tail_rough_agi(
self,
clone: pd.DataFrame,
) -> tuple[pd.Series, list[str]]:
"""Compute a nonredundant rough AGI proxy for PUF clone top-tail checks."""

configured = set(self.config.puf_support_clone_top_tail_rough_agi_variables)

def numeric(variable: str) -> pd.Series:
return (
pd.to_numeric(clone[variable], errors="coerce")
.replace([np.inf, -np.inf], np.nan)
.fillna(0.0)
)

components: list[pd.Series] = []
variables: list[str] = []

def add(variable: str) -> bool:
if variable not in configured or variable not in clone.columns:
return False
components.append(numeric(variable))
variables.append(variable)
return True

for variable in (
"employment_income",
"self_employment_income",
"taxable_interest_income",
"tax_exempt_interest_income",
"partnership_s_corp_income",
"rental_income",
"farm_income",
"ira_distributions",
):
add(variable)

if not add("capital_gains"):
add("long_term_capital_gains")
add("short_term_capital_gains")
add("non_sch_d_capital_gains")

if not add("dividend_income") and not add("ordinary_dividend_income"):
add("qualified_dividend_income")
add("non_qualified_dividend_income")

if not add("taxable_pension_income"):
add("total_pension_income")
if not add("taxable_social_security"):
add("social_security")

if not components:
return pd.Series(0.0, index=clone.index, dtype=float), []
return sum(components), variables

def _apply_puf_support_clone_top_tail_guard(
self,
clone: pd.DataFrame,
*,
integrated_variables: Iterable[str],
) -> tuple[pd.DataFrame, dict[str, Any]]:
"""Avoid arbitrary state placement of unsupported PUF top-tail clones.

PUF has no state geography, so the CPS support clone inherits state from
its scaffold row. Until the top tail gets state-aware support records,
do not let a single imputed clone enter the open-ended SOI AGI count bin
and then receive a large calibrated state weight.
"""

cap = self.config.puf_support_clone_top_tail_rough_agi_cap
summary: dict[str, Any] = {
"enabled": cap is not None,
"cap": float(cap) if cap is not None else None,
"affected_rows": 0,
"rough_agi_variables": [],
"scaled_variables": [],
"scale_basis_variables": [],
"max_rough_agi_before": None,
"max_rough_agi_after": None,
}
if cap is None or cap <= 0.0 or clone.empty:
return clone, summary

rough_agi, rough_agi_variables = self._puf_support_clone_top_tail_rough_agi(
clone
)
if not rough_agi_variables:
return clone, summary

summary["rough_agi_variables"] = rough_agi_variables
summary["max_rough_agi_before"] = float(rough_agi.max())

over_cap = rough_agi > float(cap)
if not bool(over_cap.any()):
summary["max_rough_agi_after"] = summary["max_rough_agi_before"]
return clone, summary

integrated_set = set(integrated_variables)
scale_variables = [
variable
for variable in self.config.puf_support_clone_top_tail_scale_variables
if variable in clone.columns and variable in integrated_set
]
if not scale_variables:
summary["max_rough_agi_after"] = summary["max_rough_agi_before"]
return clone, summary
scale_basis_variables = [
variable for variable in scale_variables if variable in rough_agi_variables
]
if not scale_basis_variables:
summary["max_rough_agi_after"] = summary["max_rough_agi_before"]
return clone, summary

scale_frame = pd.DataFrame(
{
variable: pd.to_numeric(clone[variable], errors="coerce")
.replace([np.inf, -np.inf], np.nan)
.fillna(0.0)
.clip(lower=0.0)
for variable in scale_basis_variables
},
index=clone.index,
)
scalable = scale_frame.sum(axis=1)
nonscalable = rough_agi - scalable
desired_scalable = (float(cap) - nonscalable).clip(lower=0.0)
eligible = over_cap & scalable.gt(0.0)
if not bool(eligible.any()):
summary["max_rough_agi_after"] = summary["max_rough_agi_before"]
return clone, summary

scale = (desired_scalable[eligible] / scalable[eligible]).clip(
lower=0.0,
upper=1.0,
)
guarded = clone.copy()
for variable in scale_variables:
values = pd.to_numeric(guarded.loc[eligible, variable], errors="coerce")
guarded.loc[eligible, variable] = values.fillna(0.0).clip(lower=0.0) * scale

guarded_rough_agi, _ = self._puf_support_clone_top_tail_rough_agi(guarded)
summary["affected_rows"] = int(eligible.sum())
summary["scaled_variables"] = scale_variables
summary["scale_basis_variables"] = scale_basis_variables
summary["max_rough_agi_after"] = float(guarded_rough_agi.max())
return guarded, summary

def _finalize_puf_support_clone_frame(
self,
*,
Expand Down Expand Up @@ -5762,6 +5955,10 @@ def _finalize_puf_support_clone_frame(
integrated_variables=integrated_variables,
preclone_columns=preclone_columns,
)
clone, top_tail_guard_summary = self._apply_puf_support_clone_top_tail_guard(
clone,
integrated_variables=integrated_variables,
)

generated_entity_id_columns = sorted(
set(ENTITY_ID_COLUMNS.values()) & (set(clone.columns) - preclone_columns)
Expand Down Expand Up @@ -5820,6 +6017,7 @@ def _finalize_puf_support_clone_frame(
"donor_only_variables": donor_only_variables,
"both_halves_override_variables": sorted(both_halves_override),
"cps_only_refresh": cps_refresh_summary,
"top_tail_guard": top_tail_guard_summary,
"dropped_generated_entity_id_columns": generated_entity_id_columns,
"variable_surface": {
"ecps_imputed_variables": list(PUF_SUPPORT_CLONE_IMPUTED_VARIABLES),
Expand Down
117 changes: 117 additions & 0 deletions tests/pipelines/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -4007,6 +4007,123 @@ def test_puf_support_clone_refresh_reconciles_social_security_subcomponents(
assert clone["social_security_disability"].tolist() == [12_000.0, 0.0, 0.0]
assert clone["social_security_retirement"].tolist() == [0.0, 8_000.0, 0.0]

def test_puf_support_clone_top_tail_guard_scales_investment_income(self):
pipeline = USMicroplexPipeline(
USMicroplexBuildConfig(
synthesis_backend="seed",
puf_support_clone_enabled=True,
puf_support_clone_top_tail_rough_agi_cap=78_999_999.0,
)
)
clone = pd.DataFrame(
{
"employment_income": [100_000.0, 125_000.0],
"capital_gains": [120_000_000.0, 15_000_000.0],
"long_term_capital_gains": [100_000_000.0, 10_000_000.0],
"qualified_dividend_income": [1_000_000.0, 0.0],
}
)

guarded, summary = pipeline._apply_puf_support_clone_top_tail_guard(
clone,
integrated_variables=[
"capital_gains",
"long_term_capital_gains",
"qualified_dividend_income",
],
)

rough_agi, rough_agi_variables = pipeline._puf_support_clone_top_tail_rough_agi(
guarded
)
assert rough_agi.iloc[0] == pytest.approx(78_999_999.0)
assert rough_agi.iloc[1] == pytest.approx(15_125_000.0)
assert guarded["employment_income"].tolist() == [100_000.0, 125_000.0]
assert guarded["capital_gains"].iloc[0] < clone["capital_gains"].iloc[0]
assert (
guarded["long_term_capital_gains"].iloc[0]
< clone["long_term_capital_gains"].iloc[0]
)
assert summary["affected_rows"] == 1
assert rough_agi_variables == [
"employment_income",
"capital_gains",
"qualified_dividend_income",
]
assert summary["scale_basis_variables"] == [
"capital_gains",
"qualified_dividend_income",
]
assert "long_term_capital_gains" in summary["scaled_variables"]

def test_puf_support_clone_top_tail_guard_avoids_redundant_income_totals(self):
pipeline = USMicroplexPipeline(
USMicroplexBuildConfig(
synthesis_backend="seed",
puf_support_clone_enabled=True,
puf_support_clone_top_tail_rough_agi_cap=78_999_999.0,
)
)
clone = pd.DataFrame(
{
"employment_income": [100_000.0],
"capital_gains": [60_000_000.0],
"long_term_capital_gains": [60_000_000.0],
"short_term_capital_gains": [0.0],
"ordinary_dividend_income": [5_000_000.0],
"qualified_dividend_income": [4_000_000.0],
"non_qualified_dividend_income": [1_000_000.0],
}
)

rough_agi, rough_agi_variables = pipeline._puf_support_clone_top_tail_rough_agi(
clone
)
guarded, summary = pipeline._apply_puf_support_clone_top_tail_guard(
clone,
integrated_variables=[
"capital_gains",
"long_term_capital_gains",
"short_term_capital_gains",
"ordinary_dividend_income",
"qualified_dividend_income",
"non_qualified_dividend_income",
],
)

assert rough_agi.iloc[0] == pytest.approx(65_100_000.0)
assert rough_agi_variables == [
"employment_income",
"capital_gains",
"ordinary_dividend_income",
]
pd.testing.assert_frame_equal(guarded, clone)
assert summary["affected_rows"] == 0

def test_puf_support_clone_top_tail_guard_can_be_disabled(self):
pipeline = USMicroplexPipeline(
USMicroplexBuildConfig(
synthesis_backend="seed",
puf_support_clone_enabled=True,
puf_support_clone_top_tail_rough_agi_cap=None,
)
)
clone = pd.DataFrame(
{
"employment_income": [100_000.0],
"capital_gains": [20_000_000.0],
"long_term_capital_gains": [100_000_000.0],
}
)

guarded, summary = pipeline._apply_puf_support_clone_top_tail_guard(
clone,
integrated_variables=["capital_gains", "long_term_capital_gains"],
)

pd.testing.assert_frame_equal(guarded, clone)
assert summary["enabled"] is False

def test_integrate_donor_sources_puf_support_clone_validates_scaffold_and_donor(
self,
):
Expand Down
Loading