Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -10151,6 +10151,27 @@ def first_present(*columns: str) -> pd.Series:
)
return zero.copy()

def first_nonzero_or_present(*columns: str) -> pd.Series:
    """Coalesce alias columns of ``result`` row by row, preferring nonzero values.

    Columns are considered in the order given; names that are not columns of
    ``result`` are skipped. Each remaining column is coerced to float, with
    unparseable or missing entries treated as 0.0. For every row the value
    from the earliest such column is kept unless it is exactly 0.0, in which
    case the value from the next listed column is used, and so on.

    Returns a copy of the enclosing-scope ``zero`` series when none of the
    named columns exist (presumably an all-zero series aligned with
    ``result`` -- confirm against the enclosing function).
    """
    available = [name for name in columns if name in result.columns]
    if not available:
        return zero.copy()

    coalesced = None
    for name in available:
        numeric = pd.to_numeric(result[name], errors="coerce")
        numeric = numeric.fillna(0.0).astype(float)
        if coalesced is None:
            # The first present column seeds the answer.
            coalesced = numeric.copy()
        else:
            # Keep rows that already hold a nonzero value; fill the rest
            # from this lower-priority alias.
            coalesced = coalesced.where(coalesced.ne(0.0), numeric)
    return coalesced

def has_any(*columns: str) -> bool:
    """Return True when at least one of the named columns exists in ``result``."""
    available = result.columns
    for name in columns:
        if name in available:
            return True
    return False

Expand Down Expand Up @@ -10272,14 +10293,17 @@ def has_any(*columns: str) -> bool:
result["takes_up_ssi_if_eligible"] = first_present("ssi").gt(0.0)

known_nonemployment = (
first_present("self_employment_income")
+ first_present("taxable_interest_income", "interest_income")
+ first_present("ordinary_dividend_income", "dividend_income")
first_nonzero_or_present(
"self_employment_income_before_lsr",
"self_employment_income",
)
+ first_nonzero_or_present("taxable_interest_income", "interest_income")
+ first_nonzero_or_present("ordinary_dividend_income", "dividend_income")
+ first_present("rental_income")
+ first_present("gross_social_security", "social_security")
+ first_present("ssi")
+ first_present("public_assistance")
+ first_present("taxable_pension_income", "pension_income")
+ first_nonzero_or_present("taxable_pension_income", "pension_income")
+ first_present("unemployment_compensation")
)
fallback_employment_income = (
Expand All @@ -10290,19 +10314,19 @@ def has_any(*columns: str) -> bool:
).clip(lower=0.0)

result["employment_income_before_lsr"] = (
first_present(
first_nonzero_or_present(
"employment_income_before_lsr", "employment_income", "wage_income"
)
if has_any(
"employment_income_before_lsr", "employment_income", "wage_income"
)
else fallback_employment_income
)
result["self_employment_income_before_lsr"] = first_present(
result["self_employment_income_before_lsr"] = first_nonzero_or_present(
"self_employment_income_before_lsr",
"self_employment_income",
)
result["taxable_interest_income"] = first_present(
result["taxable_interest_income"] = first_nonzero_or_present(
"taxable_interest_income",
"interest_income",
)
Expand All @@ -10315,17 +10339,21 @@ def has_any(*columns: str) -> bool:
result["non_qualified_dividend_income"] = first_present(
"non_qualified_dividend_income",
).clip(lower=0.0)
result["ordinary_dividend_income"] = first_present(
dividend_alias = first_nonzero_or_present(
"ordinary_dividend_income",
"dividend_income",
).clip(lower=0.0)
result["ordinary_dividend_income"] = dividend_alias
if has_any("qualified_dividend_income", "non_qualified_dividend_income"):
dividend_total = (
result["qualified_dividend_income"]
+ result["non_qualified_dividend_income"]
).clip(lower=0.0)
result["ordinary_dividend_income"] = dividend_total
result["dividend_income"] = dividend_total
result["ordinary_dividend_income"] = dividend_total.where(
dividend_total.ne(0.0),
dividend_alias,
)
result["dividend_income"] = result["ordinary_dividend_income"]
else:
result = normalize_dividend_columns(result)

Expand All @@ -10335,15 +10363,17 @@ def has_any(*columns: str) -> bool:
"capital_gains_distributions",
)
result["long_term_capital_gains_before_response"] = (
first_present(
first_nonzero_or_present(
"long_term_capital_gains_before_response",
"long_term_capital_gains",
"capital_gains",
)
if has_any(
"long_term_capital_gains_before_response",
"long_term_capital_gains",
"capital_gains",
)
else first_present("capital_gains")
else zero.copy()
)
result["partnership_s_corp_income"] = first_present("partnership_s_corp_income")
result["partnership_se_income"] = first_present("partnership_se_income")
Expand Down
17 changes: 11 additions & 6 deletions src/microplex_us/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,17 +666,22 @@ def normalize_dividend_columns(frame: pd.DataFrame) -> pd.DataFrame:
result = frame.copy()
qualified = _nonnegative_series(result, "qualified_dividend_income")
non_qualified = _nonnegative_series(result, "non_qualified_dividend_income")
total = (
_nonnegative_series(result, "ordinary_dividend_income")
if "ordinary_dividend_income" in result.columns
else _nonnegative_series(result, "dividend_income")
)
ordinary_total = _nonnegative_series(result, "ordinary_dividend_income")
dividend_total = _nonnegative_series(result, "dividend_income")
if "ordinary_dividend_income" in result.columns:
total = ordinary_total.where(ordinary_total.ne(0.0), dividend_total)
else:
total = dividend_total

has_qualified = "qualified_dividend_income" in result.columns
has_non_qualified = "non_qualified_dividend_income" in result.columns

if has_qualified and has_non_qualified:
normalized_total = qualified + non_qualified
component_total = qualified + non_qualified
total_only = component_total.eq(0.0) & total.gt(0.0)
non_qualified = non_qualified.where(~total_only, total)
component_total = qualified + non_qualified
normalized_total = component_total.where(component_total.ne(0.0), total)
elif has_qualified:
normalized_total = np.maximum(total.to_numpy(dtype=float), qualified.to_numpy(dtype=float))
non_qualified = pd.Series(
Expand Down
46 changes: 46 additions & 0 deletions tests/pipelines/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -4919,6 +4919,52 @@ def test_augment_policyengine_person_inputs_materializes_agi_parity_inputs(self)
assert augmented["self_employed_health_insurance_ald"].tolist() == [15.0]
assert augmented["self_employed_pension_contribution_ald"].tolist() == [10.0]

def test_augment_policyengine_person_inputs_coalesces_sparse_source_aliases_by_row(
    self,
):
    """Alias columns are merged per row instead of picking one whole column.

    Row 0 carries zeros in every preferred column and real values in the
    aliases, so the alias value is expected. Row 1 has nonzero preferred
    values, which must win over the 999.0 sentinels in the aliases. Row 2
    holds negative preferred values for self-employment income and long-term
    capital gains, which are expected to be kept rather than replaced.
    """
    pipeline = USMicroplexPipeline(USMicroplexBuildConfig())
    source_columns = {
        "age": [45, 50, 55],
        "sex": [1, 2, 1],
        "income": [60_000.0, 75_000.0, 0.0],
        "employment_income_before_lsr": [0.0, 70_000.0, 0.0],
        "wage_income": [50_000.0, 80_000.0, 0.0],
        "self_employment_income_before_lsr": [0.0, 200.0, -300.0],
        "self_employment_income": [500.0, 999.0, 50.0],
        "taxable_interest_income": [0.0, 20.0, 0.0],
        "interest_income": [100.0, 999.0, 0.0],
        "ordinary_dividend_income": [0.0, 30.0, 0.0],
        "dividend_income": [80.0, 999.0, 0.0],
        "qualified_dividend_income": [0.0, 5.0, 0.0],
        "non_qualified_dividend_income": [0.0, 25.0, 0.0],
        "long_term_capital_gains_before_response": [0.0, 60.0, -10.0],
        "long_term_capital_gains": [40.0, 999.0, 0.0],
        "capital_gains": [999.0, 999.0, 25.0],
    }
    persons = pd.DataFrame(source_columns)

    augmented = pipeline._augment_policyengine_person_inputs(persons)

    expected = {
        "employment_income_before_lsr": [50_000.0, 70_000.0, 0.0],
        "self_employment_income_before_lsr": [500.0, 200.0, -300.0],
        "taxable_interest_income": [100.0, 20.0, 0.0],
        "ordinary_dividend_income": [80.0, 30.0, 0.0],
        "dividend_income": [80.0, 30.0, 0.0],
        "long_term_capital_gains_before_response": [40.0, 60.0, -10.0],
    }
    # Compare all columns at once so a failure reports every mismatching key.
    observed = {column: augmented[column].tolist() for column in expected}
    assert observed == expected

def test_augment_policyengine_person_inputs_derives_marital_status_flags_from_cps_codes(
self,
):
Expand Down
18 changes: 18 additions & 0 deletions tests/test_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,24 @@ def test_normalize_dividend_columns_prefers_atomic_components_over_totals():
assert normalized["dividend_income"].tolist() == [42.0]


def test_normalize_dividend_columns_coalesces_sparse_total_aliases_by_row():
    """Dividend totals fall back to the alias column on a per-row basis.

    Row 0 has only ``dividend_income`` populated, so that amount is expected
    in both totals and in the non-qualified component. Row 1 has components
    that sum to the ordinary total, which must win over the 999.0 alias.
    Row 2 is all zeros and must stay zero.
    """
    source_columns = {
        "ordinary_dividend_income": [0.0, 30.0, 0.0],
        "dividend_income": [80.0, 999.0, 0.0],
        "qualified_dividend_income": [0.0, 5.0, 0.0],
        "non_qualified_dividend_income": [0.0, 25.0, 0.0],
    }
    frame = pd.DataFrame(source_columns)

    normalized = normalize_dividend_columns(frame)

    expected = {
        "qualified_dividend_income": [0.0, 5.0, 0.0],
        "non_qualified_dividend_income": [80.0, 25.0, 0.0],
        "ordinary_dividend_income": [80.0, 30.0, 0.0],
        "dividend_income": [80.0, 30.0, 0.0],
    }
    # Compare all columns at once so a failure reports every mismatching key.
    observed = {column: normalized[column].tolist() for column in expected}
    assert observed == expected


def test_normalize_social_security_columns_tracks_unclassified_residual():
frame = pd.DataFrame(
{
Expand Down
Loading