diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 41d8fc9..379f14a 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -10151,6 +10151,27 @@ def first_present(*columns: str) -> pd.Series: ) return zero.copy() + def first_nonzero_or_present(*columns: str) -> pd.Series: + values = zero.copy() + found = False + for column in columns: + if column not in result.columns: + continue + candidate = ( + pd.to_numeric( + result[column], + errors="coerce", + ) + .fillna(0.0) + .astype(float) + ) + if not found: + values = candidate.copy() + found = True + continue + values = values.where(values.ne(0.0), candidate) + return values if found else zero.copy() + def has_any(*columns: str) -> bool: return any(column in result.columns for column in columns) @@ -10272,14 +10293,17 @@ def has_any(*columns: str) -> bool: result["takes_up_ssi_if_eligible"] = first_present("ssi").gt(0.0) known_nonemployment = ( - first_present("self_employment_income") - + first_present("taxable_interest_income", "interest_income") - + first_present("ordinary_dividend_income", "dividend_income") + first_nonzero_or_present( + "self_employment_income_before_lsr", + "self_employment_income", + ) + + first_nonzero_or_present("taxable_interest_income", "interest_income") + + first_nonzero_or_present("ordinary_dividend_income", "dividend_income") + first_present("rental_income") + first_present("gross_social_security", "social_security") + first_present("ssi") + first_present("public_assistance") - + first_present("taxable_pension_income", "pension_income") + + first_nonzero_or_present("taxable_pension_income", "pension_income") + first_present("unemployment_compensation") ) fallback_employment_income = ( @@ -10290,7 +10314,7 @@ def has_any(*columns: str) -> bool: ).clip(lower=0.0) result["employment_income_before_lsr"] = ( - first_present( + first_nonzero_or_present( "employment_income_before_lsr", "employment_income", "wage_income" ) if has_any( @@ -10298,11 +10322,11 @@ def has_any(*columns: str) -> bool: ) else fallback_employment_income ) - result["self_employment_income_before_lsr"] = first_present( + result["self_employment_income_before_lsr"] = first_nonzero_or_present( "self_employment_income_before_lsr", "self_employment_income", ) - result["taxable_interest_income"] = first_present( + result["taxable_interest_income"] = first_nonzero_or_present( "taxable_interest_income", "interest_income", ) @@ -10315,17 +10339,21 @@ def has_any(*columns: str) -> bool: result["non_qualified_dividend_income"] = first_present( "non_qualified_dividend_income", ).clip(lower=0.0) - result["ordinary_dividend_income"] = first_present( + dividend_alias = first_nonzero_or_present( "ordinary_dividend_income", "dividend_income", ).clip(lower=0.0) + result["ordinary_dividend_income"] = dividend_alias if has_any("qualified_dividend_income", "non_qualified_dividend_income"): dividend_total = ( result["qualified_dividend_income"] + result["non_qualified_dividend_income"] ).clip(lower=0.0) - result["ordinary_dividend_income"] = dividend_total - result["dividend_income"] = dividend_total + result["ordinary_dividend_income"] = dividend_total.where( + dividend_total.ne(0.0), + dividend_alias, + ) + result["dividend_income"] = result["ordinary_dividend_income"] else: result = normalize_dividend_columns(result) @@ -10335,15 +10363,17 @@ def has_any(*columns: str) -> bool: "capital_gains_distributions", ) result["long_term_capital_gains_before_response"] = ( - first_present( + first_nonzero_or_present( "long_term_capital_gains_before_response", "long_term_capital_gains", + "capital_gains", ) if has_any( "long_term_capital_gains_before_response", "long_term_capital_gains", + "capital_gains", ) - else first_present("capital_gains") + else zero.copy() ) result["partnership_s_corp_income"] = first_present("partnership_s_corp_income") result["partnership_se_income"] = first_present("partnership_se_income") diff --git a/src/microplex_us/variables.py b/src/microplex_us/variables.py index 791cc7f..e8cb768 100644 --- a/src/microplex_us/variables.py +++ b/src/microplex_us/variables.py @@ -666,17 +666,22 @@ def normalize_dividend_columns(frame: pd.DataFrame) -> pd.DataFrame: result = frame.copy() qualified = _nonnegative_series(result, "qualified_dividend_income") non_qualified = _nonnegative_series(result, "non_qualified_dividend_income") - total = ( - _nonnegative_series(result, "ordinary_dividend_income") - if "ordinary_dividend_income" in result.columns - else _nonnegative_series(result, "dividend_income") - ) + ordinary_total = _nonnegative_series(result, "ordinary_dividend_income") + dividend_total = _nonnegative_series(result, "dividend_income") + if "ordinary_dividend_income" in result.columns: + total = ordinary_total.where(ordinary_total.ne(0.0), dividend_total) + else: + total = dividend_total has_qualified = "qualified_dividend_income" in result.columns has_non_qualified = "non_qualified_dividend_income" in result.columns if has_qualified and has_non_qualified: - normalized_total = qualified + non_qualified + component_total = qualified + non_qualified + total_only = component_total.eq(0.0) & total.gt(0.0) + non_qualified = non_qualified.where(~total_only, total) + component_total = qualified + non_qualified + normalized_total = component_total.where(component_total.ne(0.0), total) elif has_qualified: normalized_total = np.maximum(total.to_numpy(dtype=float), qualified.to_numpy(dtype=float)) non_qualified = pd.Series( diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 99a0244..e59e92f 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -4919,6 +4919,52 @@ def test_augment_policyengine_person_inputs_materializes_agi_parity_inputs(self) assert augmented["self_employed_health_insurance_ald"].tolist() == [15.0] assert augmented["self_employed_pension_contribution_ald"].tolist() == [10.0] + def test_augment_policyengine_person_inputs_coalesces_sparse_source_aliases_by_row( + self, + ): + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + persons = pd.DataFrame( + { + "age": [45, 50, 55], + "sex": [1, 2, 1], + "income": [60_000.0, 75_000.0, 0.0], + "employment_income_before_lsr": [0.0, 70_000.0, 0.0], + "wage_income": [50_000.0, 80_000.0, 0.0], + "self_employment_income_before_lsr": [0.0, 200.0, -300.0], + "self_employment_income": [500.0, 999.0, 50.0], + "taxable_interest_income": [0.0, 20.0, 0.0], + "interest_income": [100.0, 999.0, 0.0], + "ordinary_dividend_income": [0.0, 30.0, 0.0], + "dividend_income": [80.0, 999.0, 0.0], + "qualified_dividend_income": [0.0, 5.0, 0.0], + "non_qualified_dividend_income": [0.0, 25.0, 0.0], + "long_term_capital_gains_before_response": [0.0, 60.0, -10.0], + "long_term_capital_gains": [40.0, 999.0, 0.0], + "capital_gains": [999.0, 999.0, 25.0], + } + ) + + augmented = pipeline._augment_policyengine_person_inputs(persons) + + assert augmented["employment_income_before_lsr"].tolist() == [ + 50_000.0, + 70_000.0, + 0.0, + ] + assert augmented["self_employment_income_before_lsr"].tolist() == [ + 500.0, + 200.0, + -300.0, + ] + assert augmented["taxable_interest_income"].tolist() == [100.0, 20.0, 0.0] + assert augmented["ordinary_dividend_income"].tolist() == [80.0, 30.0, 0.0] + assert augmented["dividend_income"].tolist() == [80.0, 30.0, 0.0] + assert augmented["long_term_capital_gains_before_response"].tolist() == [ + 40.0, + 60.0, + -10.0, + ] + def test_augment_policyengine_person_inputs_derives_marital_status_flags_from_cps_codes( self, ): diff --git a/tests/test_variables.py b/tests/test_variables.py index 05af542..1040584 100644 --- a/tests/test_variables.py +++ b/tests/test_variables.py @@ -45,6 +45,24 @@ def test_normalize_dividend_columns_prefers_atomic_components_over_totals(): assert normalized["dividend_income"].tolist() == [42.0] +def test_normalize_dividend_columns_coalesces_sparse_total_aliases_by_row(): + frame = pd.DataFrame( + { + "ordinary_dividend_income": [0.0, 30.0, 0.0], + "dividend_income": [80.0, 999.0, 0.0], + "qualified_dividend_income": [0.0, 5.0, 0.0], + "non_qualified_dividend_income": [0.0, 25.0, 0.0], + } + ) + + normalized = normalize_dividend_columns(frame) + + assert normalized["qualified_dividend_income"].tolist() == [0.0, 5.0, 0.0] + assert normalized["non_qualified_dividend_income"].tolist() == [80.0, 25.0, 0.0] + assert normalized["ordinary_dividend_income"].tolist() == [80.0, 30.0, 0.0] + assert normalized["dividend_income"].tolist() == [80.0, 30.0, 0.0] + + def test_normalize_social_security_columns_tracks_unclassified_residual(): frame = pd.DataFrame( {