diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index d1cf379..2336778 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -196,7 +196,31 @@ ) DEFAULT_ACA_TAKEUP_RATE = 0.672 +DEFAULT_DC_PTC_TAKEUP_RATE = 0.32 +DEFAULT_EARLY_HEAD_START_TAKEUP_RATE = 0.09 +DEFAULT_EITC_TAKEUP_RATES_BY_CHILDREN = {0: 0.65, 1: 0.86, 2: 0.85, 3: 0.85} +DEFAULT_HEAD_START_TAKEUP_RATE = 0.30 +DEFAULT_MEDICAID_TAKEUP_RATE = 0.93 DEFAULT_SNAP_TAKEUP_RATE = 0.82 +DEFAULT_TANF_TAKEUP_RATE = 0.22 +DEFAULT_VOLUNTARY_FILING_RATE = 0.05 +DEFAULT_VOLUNTARY_FILING_RATES = { + "no_children": { + "zero": {"under_65": 0.20, "age_65_plus": 0.05}, + "low": {"under_65": 0.24, "age_65_plus": 0.04}, + "medium": {"under_65": 0.0, "age_65_plus": 0.0}, + "high": {"under_65": 0.0, "age_65_plus": 0.005}, + }, + "with_children": { + "zero": {"under_65": 0.50, "age_65_plus": 0.075}, + "low": {"under_65": 0.60, "age_65_plus": 0.06}, + "medium": {"under_65": 0.0, "age_65_plus": 0.0}, + "high": {"under_65": 0.025, "age_65_plus": 0.0037}, + }, +} +EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN = "_mp_eitc_child_count_for_takeup" +VOLUNTARY_FILING_AGE_HEAD_HELPER_COLUMN = "_mp_voluntary_filing_age_head" +VOLUNTARY_FILING_WAGE_INCOME_HELPER_COLUMN = "_mp_voluntary_filing_wage_income" def _stable_string_hash(value: str) -> np.uint64: @@ -229,8 +253,16 @@ def _load_policyengine_us_data_takeup_rate(variable_name: str, year: int) -> flo except ImportError: if variable_name == "aca": return DEFAULT_ACA_TAKEUP_RATE + if variable_name == "dc_ptc": + return DEFAULT_DC_PTC_TAKEUP_RATE + if variable_name == "early_head_start": + return DEFAULT_EARLY_HEAD_START_TAKEUP_RATE + if variable_name == "head_start": + return DEFAULT_HEAD_START_TAKEUP_RATE if variable_name == "snap": return DEFAULT_SNAP_TAKEUP_RATE + if variable_name == "tanf": + return DEFAULT_TANF_TAKEUP_RATE raise rate = load_take_up_rate(variable_name, year) if isinstance(rate, dict): @@ -238,6 +270,53 @@ def _load_policyengine_us_data_takeup_rate(variable_name: str, year: int) -> flo return float(rate) +def _load_policyengine_us_data_medicaid_takeup_rates(year: int) -> dict[str, float]: + """Load eCPS Medicaid take-up rates by state abbreviation.""" + try: + from policyengine_us_data.parameters import load_take_up_rate + except ImportError: + return { + state_abbr: DEFAULT_MEDICAID_TAKEUP_RATE + for state_abbr in STATE_FIPS.values() + } + rates = load_take_up_rate("medicaid", year) + if not isinstance(rates, dict): + raise TypeError(f"Expected dict take-up rate for 'medicaid', got {type(rates)}") + return {str(state): float(rate) for state, rate in rates.items()} + + +def _load_policyengine_us_data_eitc_takeup_rates(year: int) -> dict[int, float]: + """Load eCPS EITC take-up rates by capped qualifying-child count.""" + try: + from policyengine_us_data.parameters import load_take_up_rate + except ImportError: + return dict(DEFAULT_EITC_TAKEUP_RATES_BY_CHILDREN) + rates = load_take_up_rate("eitc", year) + if not isinstance(rates, dict): + raise TypeError(f"Expected dict take-up rate for 'eitc', got {type(rates)}") + return {int(children): float(rate) for children, rate in rates.items()} + + +def _load_policyengine_us_data_voluntary_filing_rates(year: int) -> dict: + """Load current eCPS voluntary-filing rate table.""" + try: + from policyengine_us_data.parameters import load_take_up_rate + except ImportError: + return DEFAULT_VOLUNTARY_FILING_RATES + rates = load_take_up_rate("voluntary_filing", year) + if not isinstance(rates, dict): + # Older PE-US-data used a scalar voluntary filing rate. + scalar_rate = float(rates) + return { + children: { + wage: {age: scalar_rate for age in ("under_65", "age_65_plus")} + for wage in ("zero", "low", "medium", "high") + } + for children in ("no_children", "with_children") + } + return rates + + PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES: tuple[str, ...] = ( "partnership_s_corp_income", "interest_deduction", @@ -4750,10 +4829,11 @@ def build_policyengine_entity_tables( persons = self._augment_policyengine_person_inputs(persons) persons["relationship_to_head"] = self._normalize_relationship_to_head(persons) persons = self._assign_policyengine_household_head_flag(persons) + persons = self._attach_policyengine_person_takeup_inputs(persons) households = self._build_policyengine_households(persons) tax_units, persons = self._build_policyengine_tax_units(persons) - tax_units = self._attach_policyengine_aca_takeup(tax_units) + tax_units = self._attach_policyengine_tax_unit_takeup_inputs(tax_units) persons = self._construct_aotc_eligibility_inputs(persons) persons = self._assign_family_and_spm_units(persons) families = self._collapse_group_table(persons, "family_id") @@ -8435,24 +8515,57 @@ def _aggregate_policyengine_tax_unit_input_columns( aggregated[column] = float(nonzero_values.iloc[0]) continue aggregated[column] = float(values.sum()) - aca_takeup = self._infer_policyengine_aca_takeup_for_tax_unit(unit_persons) - if aca_takeup is not None: - aggregated["takes_up_aca_if_eligible"] = aca_takeup + for child_count_column in ("eitc_children", "eitc_child_count"): + if child_count_column not in unit_persons.columns: + continue + values = pd.to_numeric( + unit_persons[child_count_column], errors="coerce" + ).fillna(0.0) + aggregated[EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN] = float(values.max()) + break + employment_income = pd.to_numeric( + unit_persons.get("employment_income", 0.0), errors="coerce" + ) + if isinstance(employment_income, pd.Series): + aggregated[VOLUNTARY_FILING_WAGE_INCOME_HELPER_COLUMN] = float( + employment_income.fillna(0.0).clip(lower=0.0).sum() + ) + age = pd.to_numeric(unit_persons.get("age", 0.0), errors="coerce").fillna(0.0) + head_mask = self._normal_bool_series( + unit_persons.get("is_tax_unit_head", False), + index=unit_persons.index, + ) + if not bool(head_mask.any()) and "relationship_to_head" in unit_persons.columns: + head_mask = ( + pd.to_numeric(unit_persons["relationship_to_head"], errors="coerce") + .fillna(-1) + .eq(0) + ) + head_age = age.loc[head_mask].iloc[0] if bool(head_mask.any()) else age.iloc[0] + aggregated[VOLUNTARY_FILING_AGE_HEAD_HELPER_COLUMN] = float(head_age) + for boolean_column in ( + "takes_up_aca_if_eligible", + "takes_up_dc_ptc", + "takes_up_eitc", + "would_file_taxes_voluntarily", + ): + value = self._infer_policyengine_bool_for_group( + unit_persons, boolean_column + ) + if value is not None: + aggregated[boolean_column] = value return aggregated - def _infer_policyengine_aca_takeup_for_tax_unit( + def _infer_policyengine_bool_for_group( self, - unit_persons: pd.DataFrame, + group_rows: pd.DataFrame, + column: str, ) -> bool | None: - if "takes_up_aca_if_eligible" in unit_persons.columns: + if column in group_rows.columns: return bool( - pd.to_numeric( - unit_persons["takes_up_aca_if_eligible"], - errors="coerce", - ) - .fillna(0.0) - .ne(0.0) - .any() + self._normal_bool_series( + group_rows[column], index=group_rows.index + ).any() ) return None @@ -8482,6 +8595,298 @@ def _attach_policyengine_aca_takeup( result[column] = rng.random(len(result)) < rate return result + def _attach_policyengine_tax_unit_takeup_inputs( + self, + tax_units: pd.DataFrame, + ) -> pd.DataFrame: + """Attach eCPS-style tax-unit stochastic inputs before materialization.""" + result = self._attach_policyengine_aca_takeup(tax_units) + result = self._attach_policyengine_simple_tax_unit_takeup( + result, + column="takes_up_dc_ptc", + rate_key="dc_ptc", + ) + result = self._attach_policyengine_eitc_takeup(result) + return self._attach_policyengine_voluntary_filing(result) + + def _attach_policyengine_simple_tax_unit_takeup( + self, + tax_units: pd.DataFrame, + *, + column: str, + rate_key: str, + ) -> pd.DataFrame: + result = tax_units.copy() + if column in result.columns: + result[column] = self._normal_bool_series( + result[column], index=result.index + ) + return result + + year = self._policyengine_takeup_year() + rate = _load_policyengine_us_data_takeup_rate(rate_key, year) + rng = _policyengine_us_data_seeded_rng(column) + result[column] = rng.random(len(result)) < rate + return result + + def _attach_policyengine_eitc_takeup( + self, + tax_units: pd.DataFrame, + ) -> pd.DataFrame: + result = tax_units.copy() + column = "takes_up_eitc" + if column in result.columns: + result[column] = self._normal_bool_series( + result[column], index=result.index + ) + return result + + year = self._policyengine_takeup_year() + rates = _load_policyengine_us_data_eitc_takeup_rates(year) + child_count_column = ( + EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN + if EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN in result.columns + else "n_dependents" + ) + raw_dependent_count = ( + result[child_count_column] + if child_count_column in result.columns + else pd.Series(0, index=result.index) + ) + dependent_count = ( + pd.to_numeric(raw_dependent_count, errors="coerce") + .fillna(0) + .clip(lower=0, upper=3) + .astype(int) + ) + takeup_rate = dependent_count.map(lambda count: rates.get(int(count), 0.85)) + rng = _policyengine_us_data_seeded_rng(column) + result[column] = rng.random(len(result)) < takeup_rate.to_numpy(dtype=float) + return result + + def _attach_policyengine_voluntary_filing( + self, + tax_units: pd.DataFrame, + ) -> pd.DataFrame: + result = tax_units.copy() + column = "would_file_taxes_voluntarily" + if column in result.columns: + result[column] = self._normal_bool_series( + result[column], index=result.index + ) + return result.drop( + columns=[ + EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN, + VOLUNTARY_FILING_AGE_HEAD_HELPER_COLUMN, + VOLUNTARY_FILING_WAGE_INCOME_HELPER_COLUMN, + ], + errors="ignore", + ) + + year = self._policyengine_takeup_year() + rates = _load_policyengine_us_data_voluntary_filing_rates(year) + takes_up_eitc = self._normal_bool_series( + result.get("takes_up_eitc", False), + index=result.index, + ) + child_count = self._tax_unit_child_count_for_takeup(result) + wage_income = pd.to_numeric( + result.get( + VOLUNTARY_FILING_WAGE_INCOME_HELPER_COLUMN, + pd.Series(0.0, index=result.index), + ), + errors="coerce", + ).fillna(0.0) + age_head = pd.to_numeric( + result.get( + VOLUNTARY_FILING_AGE_HEAD_HELPER_COLUMN, + pd.Series(0.0, index=result.index), + ), + errors="coerce", + ).fillna(0.0) + takeup_rate = self._voluntary_filing_rate_by_tax_unit( + rates, + child_count=child_count, + wage_income=wage_income, + age_head=age_head, + ) + rng = _policyengine_us_data_seeded_rng(column) + result[column] = (~takes_up_eitc.to_numpy(dtype=bool)) & ( + rng.random(len(result)) < takeup_rate.to_numpy(dtype=float) + ) + result = result.drop( + columns=[ + EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN, + VOLUNTARY_FILING_AGE_HEAD_HELPER_COLUMN, + VOLUNTARY_FILING_WAGE_INCOME_HELPER_COLUMN, + ], + errors="ignore", + ) + return result + + def _tax_unit_child_count_for_takeup(self, tax_units: pd.DataFrame) -> pd.Series: + child_count_column = ( + EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN + if EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN in tax_units.columns + else "n_dependents" + ) + raw_child_count = ( + tax_units[child_count_column] + if child_count_column in tax_units.columns + else pd.Series(0, index=tax_units.index) + ) + return ( + pd.to_numeric(raw_child_count, errors="coerce") + .fillna(0) + .clip(lower=0, upper=3) + .astype(int) + ) + + @staticmethod + def _voluntary_filing_rate_by_tax_unit( + rates: dict, + *, + child_count: pd.Series, + wage_income: pd.Series, + age_head: pd.Series, + ) -> pd.Series: + children_bin = np.where( + child_count.to_numpy(dtype=int) > 0, "with_children", "no_children" + ) + wage_values = wage_income.to_numpy(dtype=float) + wage_bin = np.select( + [wage_values <= 0.0, wage_values < 15_000.0, wage_values < 30_000.0], + ["zero", "low", "medium"], + default="high", + ) + age_bin = np.where( + age_head.to_numpy(dtype=float) >= 65.0, "age_65_plus", "under_65" + ) + values = [ + rates.get(children, {}) + .get(wage, {}) + .get(age, DEFAULT_VOLUNTARY_FILING_RATE) + for children, wage, age in zip(children_bin, wage_bin, age_bin, strict=True) + ] + return pd.Series(values, index=child_count.index, dtype=float) + + def _attach_policyengine_person_takeup_inputs( + self, + persons: pd.DataFrame, + ) -> pd.DataFrame: + """Attach eCPS-style person stochastic inputs before materialization.""" + result = self._attach_policyengine_medicaid_takeup(persons) + for column, rate_key in ( + ("takes_up_head_start_if_eligible", "head_start"), + ("takes_up_early_head_start_if_eligible", "early_head_start"), + ): + result = self._attach_policyengine_simple_person_takeup( + result, + column=column, + rate_key=rate_key, + ) + return result + + def _attach_policyengine_simple_person_takeup( + self, + persons: pd.DataFrame, + *, + column: str, + rate_key: str, + ) -> pd.DataFrame: + result = persons.copy() + if column in result.columns: + result[column] = self._normal_bool_series( + result[column], index=result.index + ) + return result + + year = self._policyengine_takeup_year() + rate = _load_policyengine_us_data_takeup_rate(rate_key, year) + rng = _policyengine_us_data_seeded_rng(column) + result[column] = rng.random(len(result)) < rate + return result + + def _attach_policyengine_medicaid_takeup( + self, + persons: pd.DataFrame, + ) -> pd.DataFrame: + result = persons.copy() + column = "takes_up_medicaid_if_eligible" + if column in result.columns: + result[column] = self._normal_bool_series( + result[column], index=result.index + ) + return result + + year = self._policyengine_takeup_year() + rates = _load_policyengine_us_data_medicaid_takeup_rates(year) + states = self._person_state_abbreviation(result) + takeup_rate = states.map( + lambda state: rates.get(state, DEFAULT_MEDICAID_TAKEUP_RATE) + ) + rng = _policyengine_us_data_seeded_rng(column) + result[column] = rng.random(len(result)) < takeup_rate.to_numpy(dtype=float) + return result + + def _person_state_abbreviation(self, persons: pd.DataFrame) -> pd.Series: + if "state" in persons.columns: + state = persons["state"].astype("string").str.upper() + known = set(STATE_FIPS.values()) + return state.where(state.isin(known), "CA").fillna("CA") + if "state_code_str" in persons.columns: + state = persons["state_code_str"].astype("string").str.upper() + known = set(STATE_FIPS.values()) + return state.where(state.isin(known), "CA").fillna("CA") + if "state_fips" in persons.columns: + state_fips = ( + pd.to_numeric(persons["state_fips"], errors="coerce") + .fillna(6) + .astype(int) + ) + return state_fips.map(lambda value: STATE_FIPS.get(int(value), "CA")) + return pd.Series("CA", index=persons.index, dtype="string") + + def _attach_policyengine_spm_takeup_inputs( + self, + spm_units: pd.DataFrame, + ) -> pd.DataFrame: + result = self._attach_policyengine_snap_takeup(spm_units) + return self._attach_policyengine_tanf_takeup(result) + + def _attach_policyengine_tanf_takeup( + self, + spm_units: pd.DataFrame, + ) -> pd.DataFrame: + result = spm_units.copy() + column = "takes_up_tanf_if_eligible" + if column in result.columns: + result[column] = self._normal_bool_series( + result[column], index=result.index + ) + return result + + year = self._policyengine_takeup_year() + rate = _load_policyengine_us_data_takeup_rate("tanf", year) + rng = _policyengine_us_data_seeded_rng(column) + result[column] = rng.random(len(result)) < rate + return result + + def _policyengine_takeup_year(self) -> int: + return int( + self.config.policyengine_dataset_year + or self.config.policyengine_target_period + or 2024 + ) + + @staticmethod + def _normal_bool_series(value: Any, *, index: pd.Index) -> pd.Series: + if isinstance(value, pd.Series): + series = value.reindex(index) + else: + series = pd.Series(value, index=index) + return pd.to_numeric(series, errors="coerce").fillna(0.0).ne(0.0).astype(bool) + def _split_preserved_tax_unit_members( self, unit_persons: pd.DataFrame, @@ -9093,12 +9498,13 @@ def _attach_spm_unit_source_columns( ) -> pd.DataFrame: """Attach observed SPM-unit inputs carried on CPS person rows.""" if "spm_unit_id" not in persons.columns: - return self._attach_policyengine_snap_takeup(spm_units) + return self._attach_policyengine_spm_takeup_inputs(spm_units) aggregation_by_column = { "receives_housing_assistance": "max", "takes_up_housing_assistance_if_eligible": "max", "takes_up_snap_if_eligible": "max", + "takes_up_tanf_if_eligible": "max", "spm_unit_energy_subsidy": "first", } aggregations = { @@ -9107,11 +9513,11 @@ def _attach_spm_unit_source_columns( if column in persons.columns and column not in spm_units.columns } if not aggregations: - return self._attach_policyengine_snap_takeup(spm_units) + return self._attach_policyengine_spm_takeup_inputs(spm_units) source_values = persons.groupby("spm_unit_id", as_index=False).agg(aggregations) merged = spm_units.merge(source_values, on="spm_unit_id", how="left") - return self._attach_policyengine_snap_takeup(merged) + return self._attach_policyengine_spm_takeup_inputs(merged) def _attach_policyengine_snap_takeup( self, diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 3d0ebc3..77d2127 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -1057,13 +1057,20 @@ def fake_load_takeup_rate(variable_name: str, year: int) -> float: "income": [60_000.0, 0.0, 25_000.0], "relationship_to_head": [0, 2, 0], "takes_up_aca_if_eligible": [True, True, True], + "would_file_taxes_voluntarily": [False, False, False], } ) tables = pipeline.build_policyengine_entity_tables(population) spm_units = tables.spm_units.sort_values("household_id").reset_index(drop=True) - assert calls == [("snap", 2024)] + assert calls == [ + ("head_start", 2024), + ("early_head_start", 2024), + ("dc_ptc", 2024), + ("snap", 2024), + ("tanf", 2024), + ] assert spm_units["takes_up_snap_if_eligible"].tolist() == [False, False] def test_build_policyengine_entity_tables_recomputes_child_count_contract_inputs( @@ -1290,13 +1297,20 @@ def fake_load_takeup_rate(variable_name: str, year: int) -> float: "tenure": [1, 1, 1], "has_marketplace_health_coverage": [True, False, True], "takes_up_snap_if_eligible": [True, True, True], + "would_file_taxes_voluntarily": [False, False, False], } ) tables = pipeline.build_policyengine_entity_tables(population) tax_units = tables.tax_units.sort_values("household_id").reset_index(drop=True) - assert calls == [("aca", 2024)] + assert calls == [ + ("head_start", 2024), + ("early_head_start", 2024), + ("aca", 2024), + ("dc_ptc", 2024), + ("tanf", 2024), + ] assert tax_units["takes_up_aca_if_eligible"].tolist() == [ False, False, @@ -1329,6 +1343,278 @@ def test_build_policyengine_entity_tables_preserves_explicit_aca_takeup(self): tax_units = tables.tax_units.sort_values("household_id").reset_index(drop=True) assert tax_units["takes_up_aca_if_eligible"].tolist() == [True, False] + def test_build_policyengine_entity_tables_adds_ecps_stochastic_takeup_inputs( + self, + monkeypatch, + ): + scalar_calls: list[tuple[str, int]] = [] + medicaid_calls: list[int] = [] + eitc_calls: list[int] = [] + voluntary_calls: list[int] = [] + + def fake_load_takeup_rate(variable_name: str, year: int) -> float: + scalar_calls.append((variable_name, year)) + return { + "head_start": 0.0, + "early_head_start": 1.0, + "dc_ptc": 1.0, + "snap": 1.0, + "tanf": 0.0, + "aca": 1.0, + }[variable_name] + + def fake_load_medicaid_rates(year: int) -> dict[str, float]: + medicaid_calls.append(year) + return {"CA": 0.0, "TX": 1.0} + + def fake_load_eitc_rates(year: int) -> dict[int, float]: + eitc_calls.append(year) + return {0: 0.0, 1: 1.0, 2: 1.0, 3: 1.0} + + def fake_load_voluntary_rates( + year: int, + ) -> dict[str, dict[str, dict[str, float]]]: + voluntary_calls.append(year) + return { + children: { + wage: {age: 1.0 for age in ("under_65", "age_65_plus")} + for wage in ("zero", "low", "medium", "high") + } + for children in ("no_children", "with_children") + } + + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_takeup_rate", + fake_load_takeup_rate, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_medicaid_takeup_rates", + fake_load_medicaid_rates, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_eitc_takeup_rates", + fake_load_eitc_rates, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_voluntary_filing_rates", + fake_load_voluntary_rates, + ) + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig(policyengine_dataset_year=2024) + ) + population = pd.DataFrame( + { + "person_id": [1, 2, 3], + "household_id": [10, 20, 20], + "spm_unit_id": [100, 200, 200], + "weight": [1.0, 1.0, 1.0], + "age": [34, 42, 8], + "sex": [2, 1, 2], + "income": [40_000.0, 35_000.0, 0.0], + "relationship_to_head": [0, 0, 2], + "state_fips": [6, 48, 48], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + + persons = tables.persons.sort_values("person_id").reset_index(drop=True) + assert persons["takes_up_medicaid_if_eligible"].tolist() == [ + False, + True, + True, + ] + assert persons["takes_up_head_start_if_eligible"].tolist() == [ + False, + False, + False, + ] + assert persons["takes_up_early_head_start_if_eligible"].tolist() == [ + True, + True, + True, + ] + + tax_units = tables.tax_units.sort_values("household_id").reset_index(drop=True) + assert tax_units["takes_up_aca_if_eligible"].tolist() == [True, True] + assert tax_units["takes_up_dc_ptc"].tolist() == [True, True] + assert tax_units["takes_up_eitc"].tolist() == [False, True] + assert tax_units["would_file_taxes_voluntarily"].tolist() == [True, False] + + spm_units = tables.spm_units.sort_values("household_id").reset_index(drop=True) + assert spm_units["takes_up_snap_if_eligible"].tolist() == [True, True] + assert spm_units["takes_up_tanf_if_eligible"].tolist() == [False, False] + assert scalar_calls == [ + ("head_start", 2024), + ("early_head_start", 2024), + ("aca", 2024), + ("dc_ptc", 2024), + ("snap", 2024), + ("tanf", 2024), + ] + assert medicaid_calls == [2024] + assert eitc_calls == [2024] + assert voluntary_calls == [2024] + + def test_build_policyengine_entity_tables_preserves_explicit_stochastic_takeup_inputs( + self, + monkeypatch, + ): + def fail_scalar_rate(variable_name: str, year: int) -> float: + raise AssertionError(f"unexpected scalar rate load: {variable_name} {year}") + + def fail_medicaid_rates(year: int) -> dict[str, float]: + raise AssertionError(f"unexpected Medicaid rate load: {year}") + + def fail_eitc_rates(year: int) -> dict[int, float]: + raise AssertionError(f"unexpected EITC rate load: {year}") + + def fail_voluntary_rates(year: int) -> dict: + raise AssertionError(f"unexpected voluntary filing rate load: {year}") + + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_takeup_rate", + fail_scalar_rate, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_medicaid_takeup_rates", + fail_medicaid_rates, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_eitc_takeup_rates", + fail_eitc_rates, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_voluntary_filing_rates", + fail_voluntary_rates, + ) + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig(policyengine_dataset_year=2024) + ) + population = pd.DataFrame( + { + "person_id": [1, 2], + "household_id": [10, 10], + "spm_unit_id": [100, 100], + "weight": [1.0, 1.0], + "age": [34, 8], + "sex": [2, 2], + "income": [40_000.0, 0.0], + "relationship_to_head": [0, 2], + "state_fips": [6, 6], + "takes_up_medicaid_if_eligible": [False, True], + "takes_up_head_start_if_eligible": [False, True], + "takes_up_early_head_start_if_eligible": [True, False], + "takes_up_aca_if_eligible": [False, True], + "takes_up_dc_ptc": [False, True], + "takes_up_eitc": [False, True], + "would_file_taxes_voluntarily": [True, False], + "takes_up_snap_if_eligible": [False, True], + "takes_up_tanf_if_eligible": [True, False], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + + persons = tables.persons.sort_values("person_id").reset_index(drop=True) + assert persons["takes_up_medicaid_if_eligible"].tolist() == [False, True] + assert persons["takes_up_head_start_if_eligible"].tolist() == [False, True] + assert persons["takes_up_early_head_start_if_eligible"].tolist() == [ + True, + False, + ] + + tax_units = tables.tax_units.sort_values("household_id").reset_index(drop=True) + assert tax_units["takes_up_aca_if_eligible"].tolist() == [True] + assert tax_units["takes_up_dc_ptc"].tolist() == [True] + assert tax_units["takes_up_eitc"].tolist() == [True] + assert tax_units["would_file_taxes_voluntarily"].tolist() == [True] + + spm_units = tables.spm_units.sort_values("household_id").reset_index(drop=True) + assert spm_units["takes_up_snap_if_eligible"].tolist() == [True] + assert spm_units["takes_up_tanf_if_eligible"].tolist() == [True] + + def test_build_policyengine_entity_tables_uses_eitc_children_for_eitc_takeup( + self, + monkeypatch, + ): + eitc_calls: list[int] = [] + + def fail_scalar_rate(variable_name: str, year: int) -> float: + raise AssertionError(f"unexpected scalar rate load: {variable_name} {year}") + + def fail_medicaid_rates(year: int) -> dict[str, float]: + raise AssertionError(f"unexpected Medicaid rate load: {year}") + + def fake_eitc_rates(year: int) -> dict[int, float]: + eitc_calls.append(year) + return {0: 0.0, 1: 1.0, 2: 1.0, 3: 1.0} + + def fail_voluntary_rates(year: int) -> dict: + raise AssertionError(f"unexpected voluntary filing rate load: {year}") + + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_takeup_rate", + fail_scalar_rate, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_medicaid_takeup_rates", + fail_medicaid_rates, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_eitc_takeup_rates", + fake_eitc_rates, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_policyengine_us_data_voluntary_filing_rates", + fail_voluntary_rates, + ) + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig(policyengine_dataset_year=2024) + ) + population = pd.DataFrame( + { + "person_id": [1], + "household_id": [10], + "spm_unit_id": [100], + "weight": [1.0], + "age": [34], + "sex": [2], + "income": [40_000.0], + "relationship_to_head": [0], + "state_fips": [6], + "eitc_children": [1], + "eitc_child_count": [0], + "takes_up_medicaid_if_eligible": [True], + "takes_up_head_start_if_eligible": [False], + "takes_up_early_head_start_if_eligible": [False], + "takes_up_aca_if_eligible": [True], + "takes_up_dc_ptc": [False], + "would_file_taxes_voluntarily": [False], + "takes_up_snap_if_eligible": [True], + "takes_up_tanf_if_eligible": [False], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + + tax_units = tables.tax_units.sort_values("household_id").reset_index(drop=True) + assert tax_units["takes_up_eitc"].tolist() == [True] + assert "_mp_eitc_child_count_for_takeup" not in tax_units.columns + assert eitc_calls == [2024] + def test_build_policyengine_entity_tables_fallback_employment_excludes_transfer_income( self, ):