From 50baa26c4ad1234d4f184cd1c7305cfcf6dbe7e4 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Wed, 3 Jun 2026 15:53:30 -0400 Subject: [PATCH] Drop prior-year ASEC earnings lookback; fall back to current earnings The EITC/CTC prior-year-earnings election (the COVID-era "lookback") expired after 2021, so employment_income_last_year / self_employment_income_last_year / previous_year_income_available feed no live PolicyEngine-US formula. Remove the prior-ASEC load + PERIDNUM panel-join (an extra survey-year dependency that only covered the ~50% rotation overlap) and set last-year income to current-year earnings (WSAL_VAL / SEMP_VAL) as a placeholder. previous_year_income_available now tracks whether the row has any earnings, keeping the export columns populated/varying until they are removed. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/microplex_us/data_sources/cps.py | 136 ++++++--------------------- tests/test_cps_source_provider.py | 37 +++----- 2 files changed, 43 insertions(+), 130 deletions(-) diff --git a/src/microplex_us/data_sources/cps.py b/src/microplex_us/data_sources/cps.py index 6c232de..790b96f 100644 --- a/src/microplex_us/data_sources/cps.py +++ b/src/microplex_us/data_sources/cps.py @@ -39,7 +39,7 @@ # Default cache directory DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex" -CPS_ASEC_PROCESSED_CACHE_VERSION = "20260603_ecps_export_support_prior_year" +CPS_ASEC_PROCESSED_CACHE_VERSION = "20260603_lastyr_income_current_fallback" CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP = { "reported_has_direct_purchase_health_coverage_at_interview": "NOW_DIR", @@ -1332,119 +1332,45 @@ def _read_cps_asec_raw_files( return persons_raw, households_raw -def _load_previous_cps_asec_persons_raw( - *, - year: int, - cache_dir: Path, - download: bool, -) -> pl.DataFrame | None: - previous_year = int(year) - 1 - if previous_year not in CPS_URLS: - return None - - zip_path = cache_dir / f"cps_asec_{previous_year}.zip" - if not zip_path.exists(): - if not download: - return None - try: - zip_path = download_cps_asec(previous_year, cache_dir) - except Exception as exc: - print(f"Could not load previous CPS ASEC {previous_year}: {exc}") - return None - - try: - persons_raw, _ = _read_cps_asec_raw_files(zip_path) - except Exception as exc: - print(f"Could not parse previous CPS ASEC {previous_year}: {exc}") - return None - return persons_raw - - def _attach_previous_year_income( *, persons: pl.DataFrame, current_persons_raw: pl.DataFrame, - previous_persons_raw: pl.DataFrame | None, ) -> pl.DataFrame: - default_exprs = [ - pl.lit(-1.0).alias("employment_income_last_year"), - pl.lit(-1.0).alias("self_employment_income_last_year"), - pl.lit(False).alias("previous_year_income_available"), - ] - current_required = {"PERIDNUM", "I_ERNVAL", "I_SEVAL"} - previous_required = {"PERIDNUM", "WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"} - if previous_persons_raw is None: - return persons.with_columns(default_exprs) - if not current_required.issubset(set(current_persons_raw.columns)): - return persons.with_columns(default_exprs) - if not previous_required.issubset(set(previous_persons_raw.columns)): - return persons.with_columns(default_exprs) - if len(persons) != len(current_persons_raw): - return persons.with_columns(default_exprs) - - current = current_persons_raw.select(sorted(current_required)).to_pandas() - current["_mp_row_order"] = np.arange(len(current)) - previous = previous_persons_raw.select(sorted(previous_required)).to_pandas() - previous = previous.rename( - columns={ - "WSAL_VAL": "employment_income_last_year", - "SEMP_VAL": "self_employment_income_last_year", - "I_ERNVAL": "_previous_year_wage_imputation_flag", - "I_SEVAL": "_previous_year_self_employment_imputation_flag", - } - ) - - for column in ( - "I_ERNVAL", - "I_SEVAL", - "_previous_year_wage_imputation_flag", - "_previous_year_self_employment_imputation_flag", - "employment_income_last_year", - "self_employment_income_last_year", + # The EITC/CTC prior-year-earnings election (the COVID-era "lookback") + # expired after 2021, so employment_income_last_year / + # self_employment_income_last_year / previous_year_income_available feed no + # live PolicyEngine-US formula. Rather than load and panel-join the prior + # ASEC (an extra survey-year dependency that only covered the ~50% rotation + # overlap), fall back to current-year earnings as a placeholder. These + # columns can be dropped entirely once the export contract no longer + # requires them. + required = {"WSAL_VAL", "SEMP_VAL"} + if not required.issubset(set(current_persons_raw.columns)) or len(persons) != len( + current_persons_raw ): - if column in current.columns: - current[column] = pd.to_numeric(current[column], errors="coerce") - if column in previous.columns: - previous[column] = pd.to_numeric(previous[column], errors="coerce") - - previous = previous[ - previous["_previous_year_wage_imputation_flag"].eq(0) - & previous["_previous_year_self_employment_imputation_flag"].eq(0) - ] - previous = previous.drop( - [ - "_previous_year_wage_imputation_flag", - "_previous_year_self_employment_imputation_flag", - ], - axis=1, - ) - joined = ( - current.set_index("PERIDNUM") - .join(previous.set_index("PERIDNUM"), how="left") - .sort_values("_mp_row_order") - ) - previous_year_income_available = ( - joined["employment_income_last_year"].notna() - & joined["self_employment_income_last_year"].notna() - & joined["I_ERNVAL"].eq(0) - & joined["I_SEVAL"].eq(0) - ) - employment_income_last_year = ( - joined["employment_income_last_year"].fillna(-1.0).to_numpy(dtype=float) + return persons.with_columns( + [ + pl.lit(-1.0).alias("employment_income_last_year"), + pl.lit(-1.0).alias("self_employment_income_last_year"), + pl.lit(False).alias("previous_year_income_available"), + ] + ) + + current = current_persons_raw.select(["WSAL_VAL", "SEMP_VAL"]).to_pandas() + employment = ( + pd.to_numeric(current["WSAL_VAL"], errors="coerce").fillna(0.0).to_numpy(float) ) - self_employment_income_last_year = ( - joined["self_employment_income_last_year"].fillna(-1.0).to_numpy(dtype=float) + self_employment = ( + pd.to_numeric(current["SEMP_VAL"], errors="coerce").fillna(0.0).to_numpy(float) ) return persons.with_columns( [ - pl.Series("employment_income_last_year", employment_income_last_year), - pl.Series( - "self_employment_income_last_year", - self_employment_income_last_year, - ), + pl.Series("employment_income_last_year", employment), + pl.Series("self_employment_income_last_year", self_employment), pl.Series( "previous_year_income_available", - previous_year_income_available.to_numpy(dtype=bool), + (employment != 0.0) | (self_employment != 0.0), ), ] ) @@ -1511,18 +1437,12 @@ def load_cps_asec( print(f"Parsing CPS ASEC {year}...") persons_raw, households_raw = _read_cps_asec_raw_files(zip_path) - previous_persons_raw = _load_previous_cps_asec_persons_raw( - year=year, - cache_dir=cache_dir, - download=download, - ) # Process person data persons = _process_persons(persons_raw, year) persons = _attach_previous_year_income( persons=persons, current_persons_raw=persons_raw, - previous_persons_raw=previous_persons_raw, ) # Process or derive household data diff --git a/tests/test_cps_source_provider.py b/tests/test_cps_source_provider.py index e2b2970..f5fbbd0 100644 --- a/tests/test_cps_source_provider.py +++ b/tests/test_cps_source_provider.py @@ -509,7 +509,11 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path): assert persons["medicare_part_b_premiums"].tolist() == [600, 0] -def test_load_cps_asec_attaches_previous_year_income_from_prior_asec(tmp_path): +def test_load_cps_asec_falls_back_last_year_income_to_current_earnings(tmp_path): + # The prior-year-earnings lookback (EITC/CTC prior-year election) expired, + # so last-year income is a placeholder set to current-year earnings + # (WSAL_VAL / SEMP_VAL) with no prior-ASEC dependency. + # previous_year_income_available tracks whether the row has any earnings. current_person_rows = pd.DataFrame( { "PERIDNUM": ["A", "B", "C", "D"], @@ -523,19 +527,8 @@ def test_load_cps_asec_attaches_previous_year_income_from_prior_asec(tmp_path): "I_SEVAL": [0, 0, 0, 0], } ) - previous_person_rows = pd.DataFrame( - { - "PERIDNUM": ["A", "B", "C"], - "WSAL_VAL": [50_000, 8_000, 25_000], - "SEMP_VAL": [6_000, 1_000, 2_500], - "I_ERNVAL": [0, 0, 0], - "I_SEVAL": [0, 0, 1], - } - ) with zipfile.ZipFile(tmp_path / "cps_asec_2023.zip", "w") as archive: archive.writestr("pppub23.csv", current_person_rows.to_csv(index=False)) - with zipfile.ZipFile(tmp_path / "cps_asec_2022.zip", "w") as archive: - archive.writestr("pppub22.csv", previous_person_rows.to_csv(index=False)) dataset = load_cps_asec(year=2023, cache_dir=tmp_path, download=False) persons = ( @@ -545,21 +538,21 @@ def test_load_cps_asec_attaches_previous_year_income_from_prior_asec(tmp_path): ) assert persons["employment_income_last_year"].tolist() == [ - 50_000.0, - 8_000.0, - -1.0, - -1.0, + 60_000.0, + 10_000.0, + 20_000.0, + 0.0, ] assert persons["self_employment_income_last_year"].tolist() == [ - 6_000.0, - 1_000.0, - -1.0, - -1.0, + 5_000.0, + 0.0, + 3_000.0, + 0.0, ] assert persons["previous_year_income_available"].tolist() == [ True, - False, - False, + True, + True, False, ]