Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 28 additions & 108 deletions src/microplex_us/data_sources/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

# Default cache directory
DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex"
CPS_ASEC_PROCESSED_CACHE_VERSION = "20260603_ecps_export_support_prior_year"
CPS_ASEC_PROCESSED_CACHE_VERSION = "20260603_lastyr_income_current_fallback"

CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP = {
"reported_has_direct_purchase_health_coverage_at_interview": "NOW_DIR",
Expand Down Expand Up @@ -1332,119 +1332,45 @@ def _read_cps_asec_raw_files(
return persons_raw, households_raw


def _load_previous_cps_asec_persons_raw(
    *,
    year: int,
    cache_dir: Path,
    download: bool,
) -> pl.DataFrame | None:
    """Best-effort load of the raw person table from the ASEC preceding ``year``.

    The archive is looked up at ``cache_dir / "cps_asec_<year - 1>.zip"``.
    When it is missing and ``download`` is true, ``download_cps_asec`` is
    asked to fetch it.

    Returns:
        The first element returned by ``_read_cps_asec_raw_files`` for the
        prior-year archive, or ``None`` when:

        * the prior year is not a key of ``CPS_URLS``;
        * the archive is not cached and ``download`` is false;
        * downloading or parsing raises (a message is printed and the
          failure is swallowed so the caller can proceed without it).
    """
    previous_year = int(year) - 1
    if previous_year not in CPS_URLS:
        return None

    archive = cache_dir / f"cps_asec_{previous_year}.zip"
    missing = not archive.exists()
    if missing and not download:
        # Nothing cached and the caller did not allow fetching.
        return None
    if missing:
        try:
            archive = download_cps_asec(previous_year, cache_dir)
        except Exception as exc:
            print(f"Could not load previous CPS ASEC {previous_year}: {exc}")
            return None

    try:
        # Only the person table is needed; the household table is discarded.
        persons_raw, _ = _read_cps_asec_raw_files(archive)
    except Exception as exc:
        print(f"Could not parse previous CPS ASEC {previous_year}: {exc}")
        return None
    else:
        return persons_raw


def _attach_previous_year_income(
*,
persons: pl.DataFrame,
current_persons_raw: pl.DataFrame,
previous_persons_raw: pl.DataFrame | None,
) -> pl.DataFrame:
default_exprs = [
pl.lit(-1.0).alias("employment_income_last_year"),
pl.lit(-1.0).alias("self_employment_income_last_year"),
pl.lit(False).alias("previous_year_income_available"),
]
current_required = {"PERIDNUM", "I_ERNVAL", "I_SEVAL"}
previous_required = {"PERIDNUM", "WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"}
if previous_persons_raw is None:
return persons.with_columns(default_exprs)
if not current_required.issubset(set(current_persons_raw.columns)):
return persons.with_columns(default_exprs)
if not previous_required.issubset(set(previous_persons_raw.columns)):
return persons.with_columns(default_exprs)
if len(persons) != len(current_persons_raw):
return persons.with_columns(default_exprs)

current = current_persons_raw.select(sorted(current_required)).to_pandas()
current["_mp_row_order"] = np.arange(len(current))
previous = previous_persons_raw.select(sorted(previous_required)).to_pandas()
previous = previous.rename(
columns={
"WSAL_VAL": "employment_income_last_year",
"SEMP_VAL": "self_employment_income_last_year",
"I_ERNVAL": "_previous_year_wage_imputation_flag",
"I_SEVAL": "_previous_year_self_employment_imputation_flag",
}
)

for column in (
"I_ERNVAL",
"I_SEVAL",
"_previous_year_wage_imputation_flag",
"_previous_year_self_employment_imputation_flag",
"employment_income_last_year",
"self_employment_income_last_year",
# The EITC/CTC prior-year-earnings election (the COVID-era "lookback")
# expired after 2021, so employment_income_last_year /
# self_employment_income_last_year / previous_year_income_available feed no
# live PolicyEngine-US formula. Rather than load and panel-join the prior
# ASEC (an extra survey-year dependency that only covered the ~50% rotation
# overlap), fall back to current-year earnings as a placeholder. These
# columns can be dropped entirely once the export contract no longer
# requires them.
required = {"WSAL_VAL", "SEMP_VAL"}
if not required.issubset(set(current_persons_raw.columns)) or len(persons) != len(
current_persons_raw
):
if column in current.columns:
current[column] = pd.to_numeric(current[column], errors="coerce")
if column in previous.columns:
previous[column] = pd.to_numeric(previous[column], errors="coerce")

previous = previous[
previous["_previous_year_wage_imputation_flag"].eq(0)
& previous["_previous_year_self_employment_imputation_flag"].eq(0)
]
previous = previous.drop(
[
"_previous_year_wage_imputation_flag",
"_previous_year_self_employment_imputation_flag",
],
axis=1,
)
joined = (
current.set_index("PERIDNUM")
.join(previous.set_index("PERIDNUM"), how="left")
.sort_values("_mp_row_order")
)
previous_year_income_available = (
joined["employment_income_last_year"].notna()
& joined["self_employment_income_last_year"].notna()
& joined["I_ERNVAL"].eq(0)
& joined["I_SEVAL"].eq(0)
)
employment_income_last_year = (
joined["employment_income_last_year"].fillna(-1.0).to_numpy(dtype=float)
return persons.with_columns(
[
pl.lit(-1.0).alias("employment_income_last_year"),
pl.lit(-1.0).alias("self_employment_income_last_year"),
pl.lit(False).alias("previous_year_income_available"),
]
)

current = current_persons_raw.select(["WSAL_VAL", "SEMP_VAL"]).to_pandas()
employment = (
pd.to_numeric(current["WSAL_VAL"], errors="coerce").fillna(0.0).to_numpy(float)
)
self_employment_income_last_year = (
joined["self_employment_income_last_year"].fillna(-1.0).to_numpy(dtype=float)
self_employment = (
pd.to_numeric(current["SEMP_VAL"], errors="coerce").fillna(0.0).to_numpy(float)
)
return persons.with_columns(
[
pl.Series("employment_income_last_year", employment_income_last_year),
pl.Series(
"self_employment_income_last_year",
self_employment_income_last_year,
),
pl.Series("employment_income_last_year", employment),
pl.Series("self_employment_income_last_year", self_employment),
pl.Series(
"previous_year_income_available",
previous_year_income_available.to_numpy(dtype=bool),
(employment != 0.0) | (self_employment != 0.0),
),
]
)
Expand Down Expand Up @@ -1511,18 +1437,12 @@ def load_cps_asec(
print(f"Parsing CPS ASEC {year}...")

persons_raw, households_raw = _read_cps_asec_raw_files(zip_path)
previous_persons_raw = _load_previous_cps_asec_persons_raw(
year=year,
cache_dir=cache_dir,
download=download,
)

# Process person data
persons = _process_persons(persons_raw, year)
persons = _attach_previous_year_income(
persons=persons,
current_persons_raw=persons_raw,
previous_persons_raw=previous_persons_raw,
)

# Process or derive household data
Expand Down
37 changes: 15 additions & 22 deletions tests/test_cps_source_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,11 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path):
assert persons["medicare_part_b_premiums"].tolist() == [600, 0]


def test_load_cps_asec_attaches_previous_year_income_from_prior_asec(tmp_path):
def test_load_cps_asec_falls_back_last_year_income_to_current_earnings(tmp_path):
# The prior-year-earnings lookback (EITC/CTC prior-year election) expired,
# so last-year income is a placeholder set to current-year earnings
# (WSAL_VAL / SEMP_VAL) with no prior-ASEC dependency.
# previous_year_income_available tracks whether the row has any earnings.
current_person_rows = pd.DataFrame(
{
"PERIDNUM": ["A", "B", "C", "D"],
Expand All @@ -523,19 +527,8 @@ def test_load_cps_asec_attaches_previous_year_income_from_prior_asec(tmp_path):
"I_SEVAL": [0, 0, 0, 0],
}
)
previous_person_rows = pd.DataFrame(
{
"PERIDNUM": ["A", "B", "C"],
"WSAL_VAL": [50_000, 8_000, 25_000],
"SEMP_VAL": [6_000, 1_000, 2_500],
"I_ERNVAL": [0, 0, 0],
"I_SEVAL": [0, 0, 1],
}
)
with zipfile.ZipFile(tmp_path / "cps_asec_2023.zip", "w") as archive:
archive.writestr("pppub23.csv", current_person_rows.to_csv(index=False))
with zipfile.ZipFile(tmp_path / "cps_asec_2022.zip", "w") as archive:
archive.writestr("pppub22.csv", previous_person_rows.to_csv(index=False))

dataset = load_cps_asec(year=2023, cache_dir=tmp_path, download=False)
persons = (
Expand All @@ -545,21 +538,21 @@ def test_load_cps_asec_attaches_previous_year_income_from_prior_asec(tmp_path):
)

assert persons["employment_income_last_year"].tolist() == [
50_000.0,
8_000.0,
-1.0,
-1.0,
60_000.0,
10_000.0,
20_000.0,
0.0,
]
assert persons["self_employment_income_last_year"].tolist() == [
6_000.0,
1_000.0,
-1.0,
-1.0,
5_000.0,
0.0,
3_000.0,
0.0,
]
assert persons["previous_year_income_available"].tolist() == [
True,
False,
False,
True,
True,
False,
]

Expand Down
Loading