From 90ce4757f23c092c897ed74f70c3c32061e9f0a7 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 1 Jun 2026 21:09:54 -0400 Subject: [PATCH] Spread CPS age topcode across 80 to 84 --- src/microplex_us/data_sources/cps.py | 233 ++++++----- src/microplex_us/data_sources/cps_age.py | 46 +++ src/microplex_us/data_sources/cps_mappings.py | 365 +++++++++--------- .../data_sources/cps_transform.py | 101 ++--- tests/data_sources/test_cps_age.py | 55 +++ 5 files changed, 474 insertions(+), 326 deletions(-) create mode 100644 src/microplex_us/data_sources/cps_age.py create mode 100644 tests/data_sources/test_cps_age.py diff --git a/src/microplex_us/data_sources/cps.py b/src/microplex_us/data_sources/cps.py index daa6eec..273e487 100644 --- a/src/microplex_us/data_sources/cps.py +++ b/src/microplex_us/data_sources/cps.py @@ -29,6 +29,7 @@ apply_source_query, ) +from microplex_us.data_sources.cps_age import randomize_cps_topcoded_age_80_84 from microplex_us.data_sources.sampling import ( sample_frame_with_state_floor, sample_frame_without_replacement, @@ -37,7 +38,7 @@ # Default cache directory DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex" -CPS_ASEC_PROCESSED_CACHE_VERSION = "20260601_ecps_spm_takeup_inputs" +CPS_ASEC_PROCESSED_CACHE_VERSION = "20260601_ecps_age80_84" # CPS ASEC data URLs by year CPS_URLS = { @@ -386,7 +387,9 @@ def _descriptor_from_tables( entity=EntityType.HOUSEHOLD, key_column="household_id", variable_names=household_variables, - weight_column="household_weight" if "household_weight" in households.columns else None, + weight_column="household_weight" + if "household_weight" in households.columns + else None, period_column="year" if "year" in households.columns else None, ), EntityObservation( @@ -410,7 +413,9 @@ def _ensure_person_ids(persons: pd.DataFrame) -> pd.DataFrame: return result if "person_number" in result.columns and "household_id" in result.columns: result["person_id"] = ( - result["household_id"].astype(str) + ":" + result["person_number"].astype(str) + result["household_id"].astype(str) + + ":" + + result["person_number"].astype(str) ) return result if "household_id" in result.columns: @@ -485,8 +490,14 @@ def _repair_relationship_to_head( for index in household_relationship.index.tolist() if household_ages.loc[index] >= 18 ] - candidate_pool = spouse_candidates or adult_candidates or household_relationship.index.tolist() - head_choice = max(candidate_pool, key=lambda index: household_ages.loc[index]) + candidate_pool = ( + spouse_candidates + or adult_candidates + or household_relationship.index.tolist() + ) + head_choice = max( + candidate_pool, key=lambda index: household_ages.loc[index] + ) normalized.loc[head_choice] = 0 head_index = [head_choice] elif len(head_index) > 1: @@ -545,8 +556,13 @@ def _normalize_relationship_to_head(persons: pd.DataFrame) -> pd.Series: order = persons.groupby("household_id").cumcount() normalized = pd.Series(3, index=persons.index, dtype=int) normalized.loc[order == 0] = 0 - normalized.loc[(order == 1) & (pd.to_numeric(persons.get("age", 0), errors="coerce").fillna(0) >= 18)] = 1 - normalized.loc[pd.to_numeric(persons.get("age", 0), errors="coerce").fillna(0) < 18] = 2 + normalized.loc[ + (order == 1) + & (pd.to_numeric(persons.get("age", 0), errors="coerce").fillna(0) >= 18) + ] = 1 + normalized.loc[ + pd.to_numeric(persons.get("age", 0), errors="coerce").fillna(0) < 18 + ] = 2 return _repair_relationship_to_head(persons, normalized) relationship = ( @@ -570,8 +586,13 @@ def _normalize_relationship_to_head(persons: pd.DataFrame) -> pd.Series: order = persons.groupby("household_id").cumcount() normalized = pd.Series(3, index=persons.index, dtype=int) normalized.loc[order == 0] = 0 - normalized.loc[(order == 1) & (pd.to_numeric(persons.get("age", 0), errors="coerce").fillna(0) >= 18)] = 1 - normalized.loc[pd.to_numeric(persons.get("age", 0), errors="coerce").fillna(0) < 18] = 2 + normalized.loc[ + (order == 1) + & (pd.to_numeric(persons.get("age", 0), errors="coerce").fillna(0) >= 18) + ] = 1 + normalized.loc[ + pd.to_numeric(persons.get("age", 0), errors="coerce").fillna(0) < 18 + ] = 2 return _repair_relationship_to_head(persons, normalized) @@ -589,15 +610,23 @@ def _add_cps_tax_unit_structure_columns(persons: pd.DataFrame) -> pd.DataFrame: result["is_tax_unit_dependent"] = 0.0 ages = pd.to_numeric(result.get("age", 0), errors="coerce").fillna(0.0) - spouse_person_number = pd.to_numeric( - result.get("spouse_person_number", 0), errors="coerce" - ).fillna(0).astype(int) - person_number = pd.to_numeric( - result.get("person_number", 0), errors="coerce" - ).fillna(0).astype(int) - - valid_tax_unit_ids = result["tax_unit_id"].notna() & result["tax_unit_id"].astype(str).str.strip().ne("") - grouped = result.loc[valid_tax_unit_ids].groupby(["household_id", "tax_unit_id"], sort=False) + spouse_person_number = ( + pd.to_numeric(result.get("spouse_person_number", 0), errors="coerce") + .fillna(0) + .astype(int) + ) + person_number = ( + pd.to_numeric(result.get("person_number", 0), errors="coerce") + .fillna(0) + .astype(int) + ) + + valid_tax_unit_ids = result["tax_unit_id"].notna() & result["tax_unit_id"].astype( + str + ).str.strip().ne("") + grouped = result.loc[valid_tax_unit_ids].groupby( + ["household_id", "tax_unit_id"], sort=False + ) for _, unit_persons in grouped: member_index = unit_persons.index unit_relationship = relationship.loc[member_index] @@ -621,8 +650,12 @@ def _add_cps_tax_unit_structure_columns(persons: pd.DataFrame) -> pd.DataFrame: continue spouse_index.extend([int(idx), int(spouse_idx)]) if not spouse_index: - spouse_index = unit_relationship[unit_relationship.eq(1)].index.astype(int).tolist() - spouse_index = [idx for idx in dict.fromkeys(spouse_index) if idx not in dependent_index] + spouse_index = ( + unit_relationship[unit_relationship.eq(1)].index.astype(int).tolist() + ) + spouse_index = [ + idx for idx in dict.fromkeys(spouse_index) if idx not in dependent_index + ] head_index: int | None = None head_candidates = [ @@ -659,7 +692,9 @@ def _add_cps_tax_unit_structure_columns(persons: pd.DataFrame) -> pd.DataFrame: ] result.loc[member_index, "tax_unit_is_joint"] = float(bool(spouse_index)) - result.loc[member_index, "tax_unit_count_dependents"] = float(len(dependent_index)) + result.loc[member_index, "tax_unit_count_dependents"] = float( + len(dependent_index) + ) result.loc[dependent_index, "is_tax_unit_dependent"] = 1.0 if head_index is not None: result.loc[head_index, "is_tax_unit_head"] = 1.0 @@ -714,9 +749,7 @@ def _sample_households_and_persons( ) -> tuple[pd.DataFrame, pd.DataFrame]: """Sample households and keep all linked person records.""" household_sort_columns = [ - column - for column in ("household_id", "year") - if column in households.columns + column for column in ("household_id", "year") if column in households.columns ] person_sort_columns = [ column @@ -759,7 +792,9 @@ def _sample_households_and_persons( person_sort_columns, kind="mergesort", ) - return sampled_households.reset_index(drop=True), sampled_persons.reset_index(drop=True) + return sampled_households.reset_index(drop=True), sampled_persons.reset_index( + drop=True + ) def _sample_cps_households( @@ -1202,14 +1237,13 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: if not selected: raise ValueError("No recognized variables found in person file") result = df.select(selected) + result = randomize_cps_topcoded_age_80_84(result) # Scale weights: CPS ASEC weights have 2 implied decimal places # See CPS documentation: A_FNLWGT is expressed in units of 1/100 # Divide by 100 to get actual population representation if "weight" in result.columns: - result = result.with_columns( - (pl.col("weight") / 100).alias("weight") - ) + result = result.with_columns((pl.col("weight") / 100).alias("weight")) if "march_supplement_weight" in result.columns: result = result.with_columns( (pl.col("march_supplement_weight") / 100).alias("march_supplement_weight") @@ -1217,11 +1251,13 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: # Add derived columns if "age" in result.columns: - result = result.with_columns([ - (pl.col("age") >= 18).alias("is_adult"), - (pl.col("age") < 18).alias("is_child"), - (pl.col("age") >= 65).alias("is_senior"), - ]) + result = result.with_columns( + [ + (pl.col("age") >= 18).alias("is_adult"), + (pl.col("age") < 18).alias("is_child"), + (pl.col("age") >= 65).alias("is_senior"), + ] + ) if "race" in result.columns and "cps_race" not in result.columns: result = result.with_columns(pl.col("race").alias("cps_race")) @@ -1260,22 +1296,18 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: ): reason_1 = pl.col("_social_security_reason_1") reason_2 = pl.col("_social_security_reason_2") - has_retirement_reason = ( - (reason_1 == SOCIAL_SECURITY_RETIREMENT_REASON_CODE) - | (reason_2 == SOCIAL_SECURITY_RETIREMENT_REASON_CODE) - ) - has_disability_reason = ( - (reason_1 == SOCIAL_SECURITY_DISABILITY_REASON_CODE) - | (reason_2 == SOCIAL_SECURITY_DISABILITY_REASON_CODE) - ) - has_survivor_reason = ( - reason_1.is_in(SOCIAL_SECURITY_SURVIVOR_REASON_CODES) - | reason_2.is_in(SOCIAL_SECURITY_SURVIVOR_REASON_CODES) + has_retirement_reason = (reason_1 == SOCIAL_SECURITY_RETIREMENT_REASON_CODE) | ( + reason_2 == SOCIAL_SECURITY_RETIREMENT_REASON_CODE ) - has_dependent_reason = ( - reason_1.is_in(SOCIAL_SECURITY_DEPENDENT_REASON_CODES) - | reason_2.is_in(SOCIAL_SECURITY_DEPENDENT_REASON_CODES) + has_disability_reason = (reason_1 == SOCIAL_SECURITY_DISABILITY_REASON_CODE) | ( + reason_2 == SOCIAL_SECURITY_DISABILITY_REASON_CODE ) + has_survivor_reason = reason_1.is_in( + SOCIAL_SECURITY_SURVIVOR_REASON_CODES + ) | reason_2.is_in(SOCIAL_SECURITY_SURVIVOR_REASON_CODES) + has_dependent_reason = reason_1.is_in( + SOCIAL_SECURITY_DEPENDENT_REASON_CODES + ) | reason_2.is_in(SOCIAL_SECURITY_DEPENDENT_REASON_CODES) unclassified_social_security = ( (pl.col("social_security") > 0) & ~has_retirement_reason @@ -1371,8 +1403,7 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: "wage_income", "self_employment_income", }.issubset(set(result.columns)) and any( - leaf not in result.columns - for leaf in _RETIREMENT_CONTRIBUTION_DESIRED_LEAVES + leaf not in result.columns for leaf in _RETIREMENT_CONTRIBUTION_DESIRED_LEAVES ): retirement_contributions = pl.col("_retirement_contributions") has_wages = pl.col("wage_income") > 0 @@ -1383,7 +1414,9 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: # No statutory limit applied here (PolicyEngine-US applies it). se_pension = ( pl.when(has_se) - .then(retirement_contributions * SE_PENSION_SHARE_OF_RETIREMENT_CONTRIBUTIONS) + .then( + retirement_contributions * SE_PENSION_SHARE_OF_RETIREMENT_CONTRIBUTIONS + ) .otherwise(0.0) ) remaining = pl.max_horizontal( @@ -1399,11 +1432,7 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: .then(remaining * DC_SHARE_OF_RETIREMENT_CONTRIBUTIONS) .otherwise(0.0) ) - ira_pool = ( - pl.when(has_earned_income) - .then(remaining - dc_pool) - .otherwise(0.0) - ) + ira_pool = pl.when(has_earned_income).then(remaining - dc_pool).otherwise(0.0) derived_retirement_columns: list[pl.Expr] = [] if "self_employed_pension_contributions_desired" not in result.columns: @@ -1471,11 +1500,14 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: }.issubset(set(result.columns)) and "disability_benefits" not in result.columns: result = result.with_columns( ( - pl.when(pl.col("_disability_income_code_1") != WORKERS_COMP_DISABILITY_CODE) + pl.when( + pl.col("_disability_income_code_1") != WORKERS_COMP_DISABILITY_CODE + ) .then(pl.col("_disability_income_1")) .otherwise(0) - + - pl.when(pl.col("_disability_income_code_2") != WORKERS_COMP_DISABILITY_CODE) + + pl.when( + pl.col("_disability_income_code_2") != WORKERS_COMP_DISABILITY_CODE + ) .then(pl.col("_disability_income_2")) .otherwise(0) ).alias("disability_benefits") @@ -1559,13 +1591,10 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: if "employer_sponsored_insurance_premiums" not in result.columns: # Employee-paid premium (PHIP_VAL), clipped at zero like eCPS. employee_paid = ( - pl.when( - pl.col("health_insurance_premiums_without_medicare_part_b") > 0 - ) + pl.when(pl.col("health_insurance_premiums_without_medicare_part_b") > 0) .then(pl.col("health_insurance_premiums_without_medicare_part_b")) .otherwise(0.0) - if "health_insurance_premiums_without_medicare_part_b" - in result.columns + if "health_insurance_premiums_without_medicare_part_b" in result.columns else pl.lit(0.0) ) total_premium = ( @@ -1602,13 +1631,9 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: .otherwise(0.0) .alias("employer_sponsored_insurance_premiums") ) - result = result.drop( - [c for c in _esi_source_columns if c in result.columns] - ) + result = result.drop([c for c in _esi_source_columns if c in result.columns]) else: - result = result.drop( - [c for c in _esi_source_columns if c in result.columns] - ) + result = result.drop([c for c in _esi_source_columns if c in result.columns]) for value_column in PERSON_ZERO_DEFAULT_VALUE_COLUMNS: if value_column not in result.columns: result = result.with_columns(pl.lit(0.0).alias(value_column)) @@ -1631,12 +1656,12 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: for col in PERSON_NONNEGATIVE_VALUE_COLUMNS: if col in result.columns: result = result.with_columns( - pl.when(pl.col(col) < 0) - .then(0) - .otherwise(pl.col(col)) - .alias(col) + pl.when(pl.col(col) < 0).then(0).otherwise(pl.col(col)).alias(col) ) - if "marital_status" in result.columns and "is_surviving_spouse" not in result.columns: + if ( + "marital_status" in result.columns + and "is_surviving_spouse" not in result.columns + ): result = result.with_columns( (pl.col("marital_status") == 4).alias("is_surviving_spouse") ) @@ -1644,16 +1669,14 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: result = result.with_columns( (pl.col("marital_status") == 6).alias("is_separated") ) - if ( - {"household_id", "person_number", "spouse_person_number"}.issubset(result.columns) - and "marital_unit_id" not in result.columns - ): - raw_marital_unit_id = ( - pl.col("household_id").cast(pl.Int64) * 1_000_000 - + pl.max_horizontal( - pl.col("person_number").cast(pl.Int64), - pl.col("spouse_person_number").fill_null(0).cast(pl.Int64), - ) + if {"household_id", "person_number", "spouse_person_number"}.issubset( + result.columns + ) and "marital_unit_id" not in result.columns: + raw_marital_unit_id = pl.col("household_id").cast( + pl.Int64 + ) * 1_000_000 + pl.max_horizontal( + pl.col("person_number").cast(pl.Int64), + pl.col("spouse_person_number").fill_null(0).cast(pl.Int64), ) result = result.with_columns( raw_marital_unit_id.rank("dense").cast(pl.Int64).alias("marital_unit_id") @@ -1711,18 +1734,24 @@ def _attach_cps_ssn_card_type( if len(persons_raw) != len(persons): return fallback - household_weights = households.select(["household_id", "household_weight"]).to_pandas() + household_weights = households.select( + ["household_id", "household_weight"] + ).to_pandas() household_weight_map = dict( zip( pd.to_numeric(household_weights["household_id"], errors="coerce"), - pd.to_numeric(household_weights["household_weight"], errors="coerce").fillna(0.0), + pd.to_numeric( + household_weights["household_weight"], errors="coerce" + ).fillna(0.0), ) ) person_household_ids = pd.to_numeric( persons["household_id"].to_pandas(), errors="coerce", ) - person_weights = person_household_ids.map(household_weight_map).fillna(0.0).to_numpy() + person_weights = ( + person_household_ids.map(household_weight_map).fillna(0.0).to_numpy() + ) raw = persons_raw.select(sorted(required_person_columns)).to_pandas() @@ -1801,12 +1830,19 @@ def select_random_subset_to_target( has_five_plus_years = peinusyr.isin(list(range(8, 27))).to_numpy() has_three_plus_years = peinusyr.isin(list(range(8, 28))).to_numpy() is_married = marital.isin([1, 2]).to_numpy() & spouse_pointer.gt(0).to_numpy() - eligible_naturalized = is_naturalized & is_adult & ( - has_five_plus_years | (has_three_plus_years & is_married) + eligible_naturalized = ( + is_naturalized + & is_adult + & (has_five_plus_years | (has_three_plus_years & is_married)) ) has_medicare = medicare.eq(1).to_numpy() - has_federal_pension = pension_source_1.isin([3]).to_numpy() | pension_source_2.isin([3]).to_numpy() - has_ss_disability = social_security_reason_1.isin([2]).to_numpy() | social_security_reason_2.isin([2]).to_numpy() + has_federal_pension = ( + pension_source_1.isin([3]).to_numpy() | pension_source_2.isin([3]).to_numpy() + ) + has_ss_disability = ( + social_security_reason_1.isin([2]).to_numpy() + | social_security_reason_2.isin([2]).to_numpy() + ) has_ihs = ihs.eq(1).to_numpy() has_medicaid = medicaid.eq(1).to_numpy() has_champva = champva.eq(1).to_numpy() @@ -1853,11 +1889,7 @@ def select_random_subset_to_target( & noncitizens & ((wage_income.gt(0).to_numpy()) | (self_employment_income.gt(0).to_numpy())) ) - student_mask = ( - (ssn_card_type != 3) - & noncitizens - & student_status.eq(2).to_numpy() - ) + student_mask = (ssn_card_type != 3) & noncitizens & student_status.eq(2).to_numpy() worker_ids = np.flatnonzero(worker_mask) selected_workers = select_random_subset_to_target( @@ -1959,10 +1991,7 @@ def _attach_household_geography_to_persons( return persons joined = persons.join( households.select(["household_id", *geography_columns]).rename( - { - column: f"_household_{column}" - for column in geography_columns - } + {column: f"_household_{column}" for column in geography_columns} ), on="household_id", how="left", @@ -2015,9 +2044,7 @@ def _derive_households(persons: pl.DataFrame) -> pl.DataFrame: if "year" in persons.columns: year_val = persons.select("year").unique().to_series()[0] - households = households.with_columns( - pl.lit(year_val).alias("year") - ) + households = households.with_columns(pl.lit(year_val).alias("year")) return households diff --git a/src/microplex_us/data_sources/cps_age.py b/src/microplex_us/data_sources/cps_age.py new file mode 100644 index 0000000..3492488 --- /dev/null +++ b/src/microplex_us/data_sources/cps_age.py @@ -0,0 +1,46 @@ +"""CPS ASEC age recodes.""" + +import warnings + +import numpy as np +import polars as pl + +CPS_AGE_80_84_RANDOMIZATION_KEY = "age_randomization_80_84" + + +def _stable_string_hash(value: str) -> np.uint64: + """Return a deterministic hash compatible with policyengine-us-data.""" + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", "overflow encountered", RuntimeWarning) + result = np.uint64(0) + for byte in value.encode("utf-8"): + result = result * np.uint64(31) + np.uint64(byte) + result = result ^ (result >> np.uint64(33)) + result = result * np.uint64(0xFF51AFD7ED558CCD) + result = result ^ (result >> np.uint64(33)) + return result + + +def cps_seeded_rng(key: str) -> np.random.Generator: + """Create a deterministic CPS recode RNG without importing us-data.""" + seed = int(_stable_string_hash(key)) % (2**63) + return np.random.default_rng(seed=seed) + + +def randomize_cps_topcoded_age_80_84( + frame: pl.DataFrame, + *, + age_column: str = "age", +) -> pl.DataFrame: + """Spread CPS A_AGE==80, meaning ages 80-84, across integer ages 80-84.""" + if age_column not in frame.columns: + return frame + ages = frame[age_column].to_numpy().astype(np.int64, copy=True) + age_80 = ages == 80 + if not age_80.any(): + return frame + + rng = cps_seeded_rng(CPS_AGE_80_84_RANDOMIZATION_KEY) + draws = rng.integers(80, 85, len(ages), dtype=np.int64) + ages[age_80] = draws[age_80] + return frame.with_columns(pl.Series(age_column, ages)) diff --git a/src/microplex_us/data_sources/cps_mappings.py b/src/microplex_us/data_sources/cps_mappings.py index f9763b4..de755b9 100644 --- a/src/microplex_us/data_sources/cps_mappings.py +++ b/src/microplex_us/data_sources/cps_mappings.py @@ -15,6 +15,8 @@ import polars as pl +from microplex_us.data_sources.cps_age import randomize_cps_topcoded_age_80_84 + class CoverageLevel(Enum): """How well CPS covers a policyengine-us variable.""" @@ -66,53 +68,57 @@ def _register(mapping: VariableMapping) -> VariableMapping: # DIRECT MAPPINGS (Full Coverage) # ============================================================================= -_register(VariableMapping( - policyengine_us_variable="age", - statute_ref="26 USC 63(f), 24(c)(1)", - cps_columns=["A_AGE"], - coverage=CoverageLevel.FULL, - entity="Person", - notes="Direct mapping. CPS age is as of survey date (March).", -)) +_register( + VariableMapping( + policyengine_us_variable="age", + statute_ref="26 USC 63(f), 24(c)(1)", + cps_columns=["A_AGE"], + coverage=CoverageLevel.FULL, + entity="Person", + notes="CPS A_AGE is as of survey date (March); A_AGE==80 represents ages 80-84.", + ) +) def map_age(persons: pl.DataFrame) -> pl.DataFrame: - """Map CPS A_AGE to age.""" - return persons.with_columns( - pl.col("A_AGE").alias("age") + """Map CPS A_AGE to age, spreading the 80-84 topcode.""" + return randomize_cps_topcoded_age_80_84( + persons.with_columns(pl.col("A_AGE").alias("age")) ) -_register(VariableMapping( - policyengine_us_variable="household_size", - statute_ref="7 USC 2014(c)", - cps_columns=["H_NUMPER"], - coverage=CoverageLevel.FULL, - entity="Household", - notes="Direct mapping for SNAP household size.", -)) +_register( + VariableMapping( + policyengine_us_variable="household_size", + statute_ref="7 USC 2014(c)", + cps_columns=["H_NUMPER"], + coverage=CoverageLevel.FULL, + entity="Household", + notes="Direct mapping for SNAP household size.", + ) +) def map_household_size(households: pl.DataFrame) -> pl.DataFrame: """Map CPS H_NUMPER to household_size.""" - return households.with_columns( - pl.col("H_NUMPER").alias("household_size") - ) + return households.with_columns(pl.col("H_NUMPER").alias("household_size")) # ============================================================================= # EARNED INCOME (Full Coverage) # ============================================================================= -_register(VariableMapping( - policyengine_us_variable="earned_income", - statute_ref="26 USC 32(c)(2) - Earned income defined", - cps_columns=["WSAL_VAL", "SEMP_VAL"], - coverage=CoverageLevel.FULL, - entity="Person", - notes="Sum of wages/salaries and self-employment income. " - "32(c)(2) defines earned income for EITC purposes.", -)) +_register( + VariableMapping( + policyengine_us_variable="earned_income", + statute_ref="26 USC 32(c)(2) - Earned income defined", + cps_columns=["WSAL_VAL", "SEMP_VAL"], + coverage=CoverageLevel.FULL, + entity="Person", + notes="Sum of wages/salaries and self-employment income. " + "32(c)(2) defines earned income for EITC purposes.", + ) +) def map_earned_income(persons: pl.DataFrame) -> pl.DataFrame: @@ -122,8 +128,9 @@ def map_earned_income(persons: pl.DataFrame) -> pl.DataFrame: Earned income = wages + salaries + tips + self-employment income """ return persons.with_columns( - (pl.col("WSAL_VAL").fill_null(0) + pl.col("SEMP_VAL").fill_null(0)) - .alias("earned_income") + (pl.col("WSAL_VAL").fill_null(0) + pl.col("SEMP_VAL").fill_null(0)).alias( + "earned_income" + ) ) @@ -131,32 +138,34 @@ def map_earned_income(persons: pl.DataFrame) -> pl.DataFrame: # FILING STATUS (Derived) # ============================================================================= -_register(VariableMapping( - policyengine_us_variable="filing_status", - statute_ref="26 USC 1 (tax rates by status), 2 (definitions)", - cps_columns=["A_MARITL", "A_AGE", "A_EXPRRP"], - coverage=CoverageLevel.DERIVED, - entity="TaxUnit", - gaps=[ - CoverageGap( - component="head_of_household", - statute_ref="26 USC 2(b)", - impact="medium", - notes="Requires determining if taxpayer maintains household for " - "qualifying person. CPS has household relationships but " - "determining HoH status requires assumptions.", - ), - CoverageGap( - component="qualifying_widow", - statute_ref="26 USC 2(a)", - impact="low", - notes="Requires spouse death within 2 years AND dependent child. " - "CPS doesn't track year of spouse's death.", - ), - ], - notes="Simplified mapping: married w/ spouse -> joint, others -> single. " - "Head of household determination requires additional logic.", -)) +_register( + VariableMapping( + policyengine_us_variable="filing_status", + statute_ref="26 USC 1 (tax rates by status), 2 (definitions)", + cps_columns=["A_MARITL", "A_AGE", "A_EXPRRP"], + coverage=CoverageLevel.DERIVED, + entity="TaxUnit", + gaps=[ + CoverageGap( + component="head_of_household", + statute_ref="26 USC 2(b)", + impact="medium", + notes="Requires determining if taxpayer maintains household for " + "qualifying person. CPS has household relationships but " + "determining HoH status requires assumptions.", + ), + CoverageGap( + component="qualifying_widow", + statute_ref="26 USC 2(a)", + impact="low", + notes="Requires spouse death within 2 years AND dependent child. " + "CPS doesn't track year of spouse's death.", + ), + ], + notes="Simplified mapping: married w/ spouse -> joint, others -> single. " + "Head of household determination requires additional logic.", + ) +) # CPS A_MARITL codes _MARITL_MARRIED_SPOUSE_PRESENT = 1 @@ -192,52 +201,54 @@ def map_filing_status(persons: pl.DataFrame) -> pl.DataFrame: # BLINDNESS (Full Coverage) # ============================================================================= -_register(VariableMapping( - policyengine_us_variable="is_blind", - statute_ref="26 USC 63(f)(2) - Additional standard deduction for blind", - cps_columns=["PEDISEYE"], - coverage=CoverageLevel.FULL, - entity="Person", - notes="PEDISEYE: 1 = serious difficulty seeing, 2 = no difficulty. " - "Tax definition of blind (63(f)(4)) is more specific " - "(corrected vision <= 20/200 or field <= 20 deg) but CPS proxy is reasonable.", -)) +_register( + VariableMapping( + policyengine_us_variable="is_blind", + statute_ref="26 USC 63(f)(2) - Additional standard deduction for blind", + cps_columns=["PEDISEYE"], + coverage=CoverageLevel.FULL, + entity="Person", + notes="PEDISEYE: 1 = serious difficulty seeing, 2 = no difficulty. " + "Tax definition of blind (63(f)(4)) is more specific " + "(corrected vision <= 20/200 or field <= 20 deg) but CPS proxy is reasonable.", + ) +) def map_is_blind(persons: pl.DataFrame) -> pl.DataFrame: """Map CPS PEDISEYE to is_blind.""" - return persons.with_columns( - (pl.col("PEDISEYE") == 1).alias("is_blind") - ) + return persons.with_columns((pl.col("PEDISEYE") == 1).alias("is_blind")) # ============================================================================= # IS DEPENDENT (Derived) # ============================================================================= -_register(VariableMapping( - policyengine_us_variable="is_dependent", - statute_ref="26 USC 152 - Dependent defined", - cps_columns=["A_EXPRRP", "A_AGE", "WSAL_VAL"], - coverage=CoverageLevel.DERIVED, - entity="Person", - gaps=[ - CoverageGap( - component="support_test", - statute_ref="26 USC 152(c)(1)(D), 152(d)(1)(C)", - impact="medium", - notes="CPS doesn't track who provides >50% support.", - ), - CoverageGap( - component="joint_return_test", - statute_ref="26 USC 152(c)(1)(E)", - impact="low", - notes="Can't determine if dependent filed joint return.", - ), - ], - notes="Simplified: children under 19 (or under 24 if student) are dependents. " - "Doesn't verify support test or other 152 requirements.", -)) +_register( + VariableMapping( + policyengine_us_variable="is_dependent", + statute_ref="26 USC 152 - Dependent defined", + cps_columns=["A_EXPRRP", "A_AGE", "WSAL_VAL"], + coverage=CoverageLevel.DERIVED, + entity="Person", + gaps=[ + CoverageGap( + component="support_test", + statute_ref="26 USC 152(c)(1)(D), 152(d)(1)(C)", + impact="medium", + notes="CPS doesn't track who provides >50% support.", + ), + CoverageGap( + component="joint_return_test", + statute_ref="26 USC 152(c)(1)(E)", + impact="low", + notes="Can't determine if dependent filed joint return.", + ), + ], + notes="Simplified: children under 19 (or under 24 if student) are dependents. " + "Doesn't verify support test or other 152 requirements.", + ) +) def map_is_dependent(persons: pl.DataFrame) -> pl.DataFrame: @@ -250,10 +261,9 @@ def map_is_dependent(persons: pl.DataFrame) -> pl.DataFrame: """ # A_EXPRRP codes for children: 4 = child, 8 = grandchild return persons.with_columns( - ( - (pl.col("A_EXPRRP").is_in([4, 8])) & - (pl.col("A_AGE") < 19) - ).alias("is_dependent") + ((pl.col("A_EXPRRP").is_in([4, 8])) & (pl.col("A_AGE") < 19)).alias( + "is_dependent" + ) ) @@ -261,30 +271,32 @@ def map_is_dependent(persons: pl.DataFrame) -> pl.DataFrame: # CTC QUALIFYING CHILDREN (Derived) # ============================================================================= -_register(VariableMapping( - policyengine_us_variable="ctc_qualifying_children", - statute_ref="26 USC 24(c) - Qualifying child (under 17, per 152(c))", - cps_columns=["A_AGE", "A_EXPRRP", "PH_SEQ", "A_LINENO"], - coverage=CoverageLevel.DERIVED, - entity="TaxUnit", - gaps=[ - CoverageGap( - component="citizenship_test", - statute_ref="26 USC 24(c)(2)", - impact="low", - notes="Child must be US citizen/national/resident. " - "CPS has citizenship but we don't filter on it.", - ), - CoverageGap( - component="ssn_requirement", - statute_ref="26 USC 24(h)(7)", - impact="low", - notes="Child must have SSN. Not in CPS.", - ), - ], - notes="Count of children under 17 with qualifying relationship. " - "Age limit is 17 per 24(c)(1): 'has not attained age 17'.", -)) +_register( + VariableMapping( + policyengine_us_variable="ctc_qualifying_children", + statute_ref="26 USC 24(c) - Qualifying child (under 17, per 152(c))", + cps_columns=["A_AGE", "A_EXPRRP", "PH_SEQ", "A_LINENO"], + coverage=CoverageLevel.DERIVED, + entity="TaxUnit", + gaps=[ + CoverageGap( + component="citizenship_test", + statute_ref="26 USC 24(c)(2)", + impact="low", + notes="Child must be US citizen/national/resident. " + "CPS has citizenship but we don't filter on it.", + ), + CoverageGap( + component="ssn_requirement", + statute_ref="26 USC 24(h)(7)", + impact="low", + notes="Child must have SSN. Not in CPS.", + ), + ], + notes="Count of children under 17 with qualifying relationship. " + "Age limit is 17 per 24(c)(1): 'has not attained age 17'.", + ) +) def map_ctc_qualifying_children(persons: pl.DataFrame) -> pl.DataFrame: @@ -300,22 +312,18 @@ def map_ctc_qualifying_children(persons: pl.DataFrame) -> pl.DataFrame: # First, identify qualifying children persons_with_flag = persons.with_columns( ( - (pl.col("A_EXPRRP") == 4) & # Child of householder - (pl.col("A_AGE") < 17) # Under 17 + (pl.col("A_EXPRRP") == 4) # Child of householder + & (pl.col("A_AGE") < 17) # Under 17 ).alias("_is_ctc_child") ) # Count per household (using PH_SEQ as household ID) - child_counts = ( - persons_with_flag - .group_by("PH_SEQ") - .agg(pl.col("_is_ctc_child").sum().alias("ctc_qualifying_children")) + child_counts = persons_with_flag.group_by("PH_SEQ").agg( + pl.col("_is_ctc_child").sum().alias("ctc_qualifying_children") ) # Join back and assign to reference person (A_LINENO = 1 typically) - result = persons_with_flag.join( - child_counts, on="PH_SEQ", how="left" - ) + result = persons_with_flag.join(child_counts, on="PH_SEQ", how="left") # Only the tax unit head gets the count; others get 0 # (Simplified: reference person is tax unit head) @@ -333,51 +341,53 @@ def map_ctc_qualifying_children(persons: pl.DataFrame) -> pl.DataFrame: # AGI PROXY (Partial Coverage) # ============================================================================= -_register(VariableMapping( - policyengine_us_variable="adjusted_gross_income", - statute_ref="26 USC 62(a) - Adjusted gross income defined", - cps_columns=["WSAL_VAL", "SEMP_VAL", "INT_VAL", "DIV_VAL", "PNSN_VAL"], - coverage=CoverageLevel.PARTIAL, - entity="TaxUnit", - expected_gap_pct=0.15, # Expect ~15% undercount vs SOI - gaps=[ - CoverageGap( - component="capital_gains", - statute_ref="26 USC 61(a)(3), 1222", - impact="high", - notes="CPS does not collect capital gains. SOI 2021: ~$1.2T. " - "Major source of income for top brackets.", - ), - CoverageGap( - component="ira_deduction", - statute_ref="26 USC 62(a)(7)", - impact="medium", - notes="Above-the-line IRA deduction not in CPS. Our proxy will " - "overstate AGI by this amount.", - ), - CoverageGap( - component="student_loan_interest", - statute_ref="26 USC 62(a)(17)", - impact="low", - notes="Student loan interest deduction not in CPS.", - ), - CoverageGap( - component="self_employment_tax_deduction", - statute_ref="26 USC 62(a)(1)", - impact="medium", - notes="Deductible portion of SE tax not calculable from CPS.", - ), - CoverageGap( - component="interest_dividends_underreporting", - statute_ref="26 USC 61(a)(4), (7)", - impact="medium", - notes="CPS interest/dividends underreported by ~40% vs SOI.", - ), - ], - notes="AGI proxy using available CPS income. Known to undercount vs SOI " - "due to missing capital gains and underreported investment income. " - "Also overstates slightly due to missing above-line deductions.", -)) +_register( + VariableMapping( + policyengine_us_variable="adjusted_gross_income", + statute_ref="26 USC 62(a) - Adjusted gross income defined", + cps_columns=["WSAL_VAL", "SEMP_VAL", "INT_VAL", "DIV_VAL", "PNSN_VAL"], + coverage=CoverageLevel.PARTIAL, + entity="TaxUnit", + expected_gap_pct=0.15, # Expect ~15% undercount vs SOI + gaps=[ + CoverageGap( + component="capital_gains", + statute_ref="26 USC 61(a)(3), 1222", + impact="high", + notes="CPS does not collect capital gains. SOI 2021: ~$1.2T. " + "Major source of income for top brackets.", + ), + CoverageGap( + component="ira_deduction", + statute_ref="26 USC 62(a)(7)", + impact="medium", + notes="Above-the-line IRA deduction not in CPS. Our proxy will " + "overstate AGI by this amount.", + ), + CoverageGap( + component="student_loan_interest", + statute_ref="26 USC 62(a)(17)", + impact="low", + notes="Student loan interest deduction not in CPS.", + ), + CoverageGap( + component="self_employment_tax_deduction", + statute_ref="26 USC 62(a)(1)", + impact="medium", + notes="Deductible portion of SE tax not calculable from CPS.", + ), + CoverageGap( + component="interest_dividends_underreporting", + statute_ref="26 USC 61(a)(4), (7)", + impact="medium", + notes="CPS interest/dividends underreported by ~40% vs SOI.", + ), + ], + notes="AGI proxy using available CPS income. Known to undercount vs SOI " + "due to missing capital gains and underreported investment income. " + "Also overstates slightly due to missing above-line deductions.", + ) +) def map_agi_proxy(persons: pl.DataFrame) -> pl.DataFrame: @@ -410,6 +420,7 @@ def map_agi_proxy(persons: pl.DataFrame) -> pl.DataFrame: # REGISTRY FUNCTIONS # ============================================================================= + def get_mapping_metadata(variable_name: str) -> VariableMapping: """Get metadata for a variable mapping.""" # Handle aliases diff --git a/src/microplex_us/data_sources/cps_transform.py b/src/microplex_us/data_sources/cps_transform.py index 297c097..901e6e0 100644 --- a/src/microplex_us/data_sources/cps_transform.py +++ b/src/microplex_us/data_sources/cps_transform.py @@ -9,6 +9,7 @@ import polars as pl from microplex_us.data_sources.cps import CPSDataset +from microplex_us.data_sources.cps_age import randomize_cps_topcoded_age_80_84 from microplex_us.data_sources.cps_mappings import ( CoverageLevel, get_all_mappings, @@ -79,15 +80,21 @@ def _transform_persons(persons: pl.DataFrame) -> pl.DataFrame: # Age if "A_AGE" in result.columns: - result = result.with_columns(pl.col("A_AGE").alias("age")) + result = randomize_cps_topcoded_age_80_84( + result.with_columns(pl.col("A_AGE").alias("age")) + ) # Earned income (wages + self-employment) wage_col = "WSAL_VAL" if "WSAL_VAL" in result.columns else "wage_income" semp_col = "SEMP_VAL" if "SEMP_VAL" in result.columns else "self_employment_income" if wage_col in result.columns or semp_col in result.columns: - wage_expr = pl.col(wage_col).fill_null(0) if wage_col in result.columns else pl.lit(0) - semp_expr = pl.col(semp_col).fill_null(0) if semp_col in result.columns else pl.lit(0) + wage_expr = ( + pl.col(wage_col).fill_null(0) if wage_col in result.columns else pl.lit(0) + ) + semp_expr = ( + pl.col(semp_col).fill_null(0) if semp_col in result.columns else pl.lit(0) + ) # Earned income for EITC is non-negative result = result.with_columns( @@ -96,9 +103,7 @@ def _transform_persons(persons: pl.DataFrame) -> pl.DataFrame: # Is blind if "PEDISEYE" in result.columns: - result = result.with_columns( - (pl.col("PEDISEYE") == 1).alias("is_blind") - ) + result = result.with_columns((pl.col("PEDISEYE") == 1).alias("is_blind")) else: result = result.with_columns(pl.lit(False).alias("is_blind")) @@ -106,8 +111,8 @@ def _transform_persons(persons: pl.DataFrame) -> pl.DataFrame: if "A_EXPRRP" in result.columns and "age" in result.columns: result = result.with_columns( ( - (pl.col("A_EXPRRP").is_in([4, 8])) & # Child or grandchild - (pl.col("age") < 19) + (pl.col("A_EXPRRP").is_in([4, 8])) # Child or grandchild + & (pl.col("age") < 19) ).alias("is_dependent") ) else: @@ -116,7 +121,9 @@ def _transform_persons(persons: pl.DataFrame) -> pl.DataFrame: return result -def _construct_tax_units(persons: pl.DataFrame, households: pl.DataFrame) -> pl.DataFrame: +def _construct_tax_units( + persons: pl.DataFrame, households: pl.DataFrame +) -> pl.DataFrame: """ Construct tax units from persons. @@ -137,10 +144,8 @@ def _construct_tax_units(persons: pl.DataFrame, households: pl.DataFrame) -> pl. lineno_col = "A_LINENO" if "A_LINENO" in persons.columns else None # Aggregate earned income to household level - earned_income_agg = ( - persons - .group_by(hh_id_col) - .agg(pl.col("earned_income").sum().alias("earned_income")) + earned_income_agg = persons.group_by(hh_id_col).agg( + pl.col("earned_income").sum().alias("earned_income") ) # Count CTC qualifying children per household @@ -149,8 +154,8 @@ def _construct_tax_units(persons: pl.DataFrame, households: pl.DataFrame) -> pl. if "A_EXPRRP" in persons.columns: persons_with_ctc = persons.with_columns( ( - (pl.col("A_EXPRRP") == 4) & # Child of householder - (pl.col("age") < 17) + (pl.col("A_EXPRRP") == 4) # Child of householder + & (pl.col("age") < 17) ).alias("_is_ctc_child") ) else: @@ -158,16 +163,12 @@ def _construct_tax_units(persons: pl.DataFrame, households: pl.DataFrame) -> pl. (pl.col("age") < 17).alias("_is_ctc_child") ) - ctc_counts = ( - persons_with_ctc - .group_by(hh_id_col) - .agg(pl.col("_is_ctc_child").sum().alias("ctc_qualifying_children")) + ctc_counts = persons_with_ctc.group_by(hh_id_col).agg( + pl.col("_is_ctc_child").sum().alias("ctc_qualifying_children") ) else: - ctc_counts = ( - persons - .group_by(hh_id_col) - .agg(pl.lit(0).alias("ctc_qualifying_children")) + ctc_counts = persons.group_by(hh_id_col).agg( + pl.lit(0).alias("ctc_qualifying_children") ) # Get reference person attributes for each household @@ -195,9 +196,7 @@ def _construct_tax_units(persons: pl.DataFrame, households: pl.DataFrame) -> pl. .alias("filing_status") ) else: - ref_persons = ref_persons.with_columns( - pl.lit("single").alias("filing_status") - ) + ref_persons = ref_persons.with_columns(pl.lit("single").alias("filing_status")) # Get weight weight_col = "A_FNLWGT" if "A_FNLWGT" in ref_persons.columns else "weight" @@ -239,31 +238,39 @@ def _construct_tax_units(persons: pl.DataFrame, households: pl.DataFrame) -> pl. invest_agg_exprs.append(pl.col(div_col).fill_null(0).sum().alias("_div")) if invest_agg_exprs: - invest_income = ( - persons - .group_by(hh_id_col) - .agg(invest_agg_exprs) - ) + invest_income = persons.group_by(hh_id_col).agg(invest_agg_exprs) tax_units = tax_units.join(invest_income, on=hh_id_col, how="left") # Add to AGI proxy - int_expr = pl.col("_int").fill_null(0) if "_int" in tax_units.columns else pl.lit(0) - div_expr = pl.col("_div").fill_null(0) if "_div" in tax_units.columns else pl.lit(0) + int_expr = ( + pl.col("_int").fill_null(0) + if "_int" in tax_units.columns + else pl.lit(0) + ) + div_expr = ( + pl.col("_div").fill_null(0) + if "_div" in tax_units.columns + else pl.lit(0) + ) tax_units = tax_units.with_columns( (pl.col("agi_proxy") + int_expr + div_expr).alias("agi_proxy") ) # Drop temp columns - tax_units = tax_units.drop([c for c in ["_int", "_div"] if c in tax_units.columns]) + tax_units = tax_units.drop( + [c for c in ["_int", "_div"] if c in tax_units.columns] + ) # Fill nulls - tax_units = tax_units.with_columns([ - pl.col("earned_income").fill_null(0), - pl.col("ctc_qualifying_children").fill_null(0), - pl.col("agi_proxy").fill_null(0), - ]) + tax_units = tax_units.with_columns( + [ + pl.col("earned_income").fill_null(0), + pl.col("ctc_qualifying_children").fill_null(0), + pl.col("agi_proxy").fill_null(0), + ] + ) # Rename household ID to tax_unit_id tax_units = tax_units.rename({hh_id_col: "tax_unit_id"}) @@ -293,13 +300,15 @@ def _generate_coverage_report() -> dict: # Collect gaps for gap in m.gaps: - gaps.append({ - "variable": m.policyengine_us_variable, - "component": gap.component, - "statute_ref": gap.statute_ref, - "impact": gap.impact, - "notes": gap.notes, - }) + gaps.append( + { + "variable": m.policyengine_us_variable, + "component": gap.component, + "statute_ref": gap.statute_ref, + "impact": gap.impact, + "notes": gap.notes, + } + ) return { "full": full, diff --git a/tests/data_sources/test_cps_age.py b/tests/data_sources/test_cps_age.py new file mode 100644 index 0000000..6abbe72 --- /dev/null +++ b/tests/data_sources/test_cps_age.py @@ -0,0 +1,55 @@ +import zipfile + +import pandas as pd +import polars as pl + +from microplex_us.data_sources.cps import load_cps_asec +from microplex_us.data_sources.cps_age import randomize_cps_topcoded_age_80_84 +from microplex_us.data_sources.cps_mappings import map_age + + +def test_randomize_cps_topcoded_age_80_84_is_deterministic(): + frame = pl.DataFrame({"age": [79, *([80] * 20), 85]}) + + first = randomize_cps_topcoded_age_80_84(frame) + second = randomize_cps_topcoded_age_80_84(frame) + ages = first["age"].to_list() + + assert ages == second["age"].to_list() + assert ages[0] == 79 + assert ages[-1] == 85 + assert set(ages[1:-1]).issubset({80, 81, 82, 83, 84}) + assert len(set(ages[1:-1])) > 1 + + +def test_map_age_spreads_cps_80_to_80_84(): + frame = pl.DataFrame({"A_AGE": [79, *([80] * 20), 85]}) + + result = map_age(frame) + ages = result["age"].to_list() + + assert ages[0] == 79 + assert ages[-1] == 85 + assert set(ages[1:-1]).issubset({80, 81, 82, 83, 84}) + assert len(set(ages[1:-1])) > 1 + + +def test_load_cps_asec_spreads_topcoded_age_80(tmp_path): + person_rows = pd.DataFrame( + { + "PH_SEQ": [1] * 22, + "A_LINENO": list(range(1, 23)), + "A_AGE": [79, *([80] * 20), 85], + "A_FNLWGT": [100] * 22, + } + ) + with zipfile.ZipFile(tmp_path / "cps_asec_2023.zip", "w") as archive: + archive.writestr("pppub23.csv", person_rows.to_csv(index=False)) + + dataset = load_cps_asec(year=2023, cache_dir=tmp_path, download=False) + ages = dataset.persons.sort("person_number")["age"].to_list() + + assert ages[0] == 79 + assert ages[-1] == 85 + assert set(ages[1:-1]).issubset({80, 81, 82, 83, 84}) + assert len(set(ages[1:-1])) > 1