From e5e4fbdfd382020e6d56a95f859113154cc9fc57 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Wed, 3 Jun 2026 07:50:03 -0400 Subject: [PATCH] Populate eCPS-supported export inputs --- src/microplex_us/data_sources/cps.py | 899 +++++++++++++++++- src/microplex_us/data_sources/puf.py | 246 +++++ .../manifests/pe_source_impute_blocks.json | 16 +- src/microplex_us/pipelines/us.py | 249 +++++ src/microplex_us/variables.py | 12 + .../test_cps_export_support_fields.py | 268 ++++++ tests/pipelines/test_us.py | 188 ++++ tests/test_cps_source_provider.py | 59 +- tests/test_donor_survey_source_providers.py | 6 + tests/test_pe_source_impute_specs.py | 6 +- tests/test_puf_source_provider.py | 77 ++ 11 files changed, 1970 insertions(+), 56 deletions(-) create mode 100644 tests/data_sources/test_cps_export_support_fields.py diff --git a/src/microplex_us/data_sources/cps.py b/src/microplex_us/data_sources/cps.py index 21b665e..6c232de 100644 --- a/src/microplex_us/data_sources/cps.py +++ b/src/microplex_us/data_sources/cps.py @@ -8,6 +8,7 @@ Data source: https://www.census.gov/data/datasets/time-series/demo/cps/cps-asec.html """ +import zipfile from collections.abc import Callable from dataclasses import dataclass from pathlib import Path @@ -38,7 +39,107 @@ # Default cache directory DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex" -CPS_ASEC_PROCESSED_CACHE_VERSION = "20260602_childcare_input" +CPS_ASEC_PROCESSED_CACHE_VERSION = "20260603_ecps_export_support_prior_year" + +CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP = { + "reported_has_direct_purchase_health_coverage_at_interview": "NOW_DIR", + "reported_has_marketplace_health_coverage_at_interview": "NOW_MRK", + "reported_has_subsidized_marketplace_health_coverage_at_interview": "NOW_MRKS", + "reported_has_unsubsidized_marketplace_health_coverage_at_interview": "NOW_MRKUN", + "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview": ( + "NOW_NONM" + ), + 
"reported_has_employer_sponsored_health_coverage_at_interview": "NOW_GRP", + "reported_has_medicare_health_coverage_at_interview": "NOW_MCARE", + "reported_has_medicaid_health_coverage_at_interview": "NOW_CAID", + "reported_has_means_tested_health_coverage_at_interview": "NOW_MCAID", + "reported_has_chip_health_coverage_at_interview": "NOW_PCHIP", + "reported_has_other_means_tested_health_coverage_at_interview": "NOW_OTHMT", + "reported_has_tricare_health_coverage_at_interview": "NOW_MIL", + "reported_has_champva_health_coverage_at_interview": "NOW_CHAMPVA", + "reported_has_va_health_coverage_at_interview": "NOW_VACARE", + "reported_has_indian_health_service_coverage_at_interview": "NOW_IHSFLG", +} + +CURRENT_HEALTH_COVERAGE_RULE_INPUT_ALIAS_MAP = { + "has_marketplace_health_coverage_at_interview": ( + "reported_has_marketplace_health_coverage_at_interview" + ), + "has_non_marketplace_direct_purchase_health_coverage_at_interview": ( + "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview" + ), + "has_medicaid_health_coverage_at_interview": ( + "reported_has_medicaid_health_coverage_at_interview" + ), + "has_other_means_tested_health_coverage_at_interview": ( + "reported_has_other_means_tested_health_coverage_at_interview" + ), + "has_tricare_health_coverage_at_interview": ( + "reported_has_tricare_health_coverage_at_interview" + ), + "has_champva_health_coverage_at_interview": ( + "reported_has_champva_health_coverage_at_interview" + ), + "has_va_health_coverage_at_interview": ( + "reported_has_va_health_coverage_at_interview" + ), + "has_indian_health_service_coverage_at_interview": ( + "reported_has_indian_health_service_coverage_at_interview" + ), +} + +CENSUS_OCCUPATION_CODE_TO_TTOC = { + 725: 502, + 2350: 507, + 2633: 502, + 2752: 206, + 2755: 207, + 2770: 208, + 2910: 503, + 3602: 501, + 3630: 602, + 4000: 105, + 4010: 106, + 4030: 106, + 4040: 101, + 4055: 107, + 4110: 102, + 4120: 103, + 4130: 104, + 4140: 108, + 4150: 109, + 4160: 106, 
+ 4230: 304, + 4251: 402, + 4350: 506, + 4420: 210, + 4500: 603, + 4510: 603, + 4521: 605, + 4522: 601, + 4600: 508, + 4621: 607, + 4655: 501, + 5130: 203, + 5300: 303, + 6355: 403, + 6442: 404, + 7120: 401, + 7200: 409, + 7315: 405, + 7320: 406, + 7340: 401, + 7540: 408, + 7610: 401, + 7800: 110, + 8510: 401, + 9122: 806, + 9141: 803, + 9142: 802, + 9350: 801, + 9610: 805, + 9620: 809, +} # CPS ASEC data URLs by year CPS_URLS = { @@ -74,6 +175,12 @@ "A_CLSWKR": "class_of_worker", "A_WKSTAT": "work_status", "A_HRS1": "hours_worked", + "A_HSCOL": "_high_school_or_college_status", + "A_HRLYWK": "_is_paid_hourly_code", + "A_HRSPAY": "_hourly_pay_cents", + "A_UNMEM": "_union_member_code", + "POCCU2": "detailed_occupation_recode", + "PEIOOCC": "_detailed_census_occupation_code", # Income (annual) "WSAL_VAL": "wage_income", "SEMP_VAL": "self_employment_income", @@ -85,9 +192,22 @@ "INT_VAL": "interest_income", "DIV_VAL": "dividend_income", "RNT_VAL": "rental_income", + "ANN_VAL": "_annuity_income", + "PNSN_VAL": "_pension_income", "SS_VAL": "social_security", "SSI_VAL": "ssi", "UC_VAL": "unemployment_compensation", + "LKWEEKS": "weeks_unemployed", + "VET_VAL": "veterans_benefits", + "WC_VAL": "workers_compensation", + "DST_SC1": "_retirement_distribution_code_1", + "DST_SC2": "_retirement_distribution_code_2", + "DST_SC1_YNG": "_retirement_distribution_code_1_yng", + "DST_SC2_YNG": "_retirement_distribution_code_2_yng", + "DST_VAL1": "_retirement_distribution_value_1", + "DST_VAL2": "_retirement_distribution_value_2", + "DST_VAL1_YNG": "_retirement_distribution_value_1_yng", + "DST_VAL2_YNG": "_retirement_distribution_value_2_yng", # CPS-derived direct income copies (mirror eCPS cps.py:1493-1495). 
"SRVS_VAL": "survivor_benefits", "ED_VAL": "educational_assistance", @@ -103,6 +223,13 @@ "MCAID": "has_medicaid", "NOW_GRP": "has_esi", "NOW_MRK": "has_marketplace_health_coverage", + **{ + census_name: f"_{leaf}" + for leaf, census_name in CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP.items() + }, + "NOW_PRIV": "_reported_has_private_health_coverage_at_interview", + "NOW_PUB": "_reported_has_public_health_coverage_at_interview", + "NOW_COV": "_reported_current_health_coverage_code", # Employer-sponsored insurance policyholder + premium inputs (eCPS # cps.py:197-275). NOW_OWNGRP flags own-name current group (ESI) coverage; # NOW_HIPAID is who pays the premium; NOW_GRPFTYP is family vs self-only @@ -204,6 +331,23 @@ "social_security_dependents", "spm_unit_energy_subsidy", "spm_unit_pre_subsidy_childcare_expenses", + "hourly_wage", + "taxable_private_pension_income", + "tax_exempt_private_pension_income", + "taxable_401k_distributions", + "tax_exempt_401k_distributions", + "taxable_403b_distributions", + "tax_exempt_403b_distributions", + "regular_ira_distributions", + "roth_ira_distributions", + "tax_exempt_ira_distributions", + "taxable_sep_distributions", + "tax_exempt_sep_distributions", + "other_type_retirement_account_distributions", + "keogh_distributions", + "veterans_benefits", + "workers_compensation", + "weeks_unemployed", ) PERSON_ZERO_DEFAULT_VALUE_COLUMNS = ( @@ -221,6 +365,23 @@ "social_security_dependents", "spm_unit_energy_subsidy", "spm_unit_pre_subsidy_childcare_expenses", + "hourly_wage", + "taxable_private_pension_income", + "tax_exempt_private_pension_income", + "taxable_401k_distributions", + "tax_exempt_401k_distributions", + "taxable_403b_distributions", + "tax_exempt_403b_distributions", + "regular_ira_distributions", + "roth_ira_distributions", + "tax_exempt_ira_distributions", + "taxable_sep_distributions", + "tax_exempt_sep_distributions", + "other_type_retirement_account_distributions", + "keogh_distributions", + "veterans_benefits", + 
"workers_compensation", + "weeks_unemployed", ) PERSON_CACHE_REQUIRED_COLUMNS = ( @@ -245,6 +406,18 @@ "social_security_dependents", "receives_wic", "spm_unit_pre_subsidy_childcare_expenses", + "tax_exempt_private_pension_income", + "regular_ira_distributions", + "roth_ira_distributions", + "tax_exempt_ira_distributions", + "taxable_401k_distributions", + "taxable_403b_distributions", + "taxable_sep_distributions", + "other_type_retirement_account_distributions", + "keogh_distributions", + "veterans_benefits", + "workers_compensation", + "weeks_unemployed", ) PERSON_CPS_DISABILITY_COLUMNS = ( @@ -273,6 +446,7 @@ WORKERS_COMP_DISABILITY_CODE = 1 ALIMONY_OTHER_INCOME_CODE = 20 +STRIKE_BENEFITS_OTHER_INCOME_CODE = 12 SOCIAL_SECURITY_RETIREMENT_REASON_CODE = 1 SOCIAL_SECURITY_DISABILITY_REASON_CODE = 2 SOCIAL_SECURITY_SURVIVOR_REASON_CODES = (3, 5) @@ -294,6 +468,10 @@ DC_SHARE_OF_RETIREMENT_CONTRIBUTIONS = 0.908 ROTH_SHARE_OF_DC_CONTRIBUTIONS = 0.15 TRADITIONAL_SHARE_OF_IRA_CONTRIBUTIONS = 0.392 +TAXABLE_PENSION_FRACTION = 0.590 +TAXABLE_401K_DISTRIBUTION_FRACTION = 1.0 +TAXABLE_403B_DISTRIBUTION_FRACTION = 1.0 +TAXABLE_SEP_DISTRIBUTION_FRACTION = 1.0 # Census CPS ASEC 2024 technical documentation, PERRP (relationship to # household reference person). 
Codes 43/44/46/47 mark an unmarried partner of @@ -327,6 +505,17 @@ PE_CPS_UNDOCUMENTED_STUDENTS_TARGET = 0.21 * 1.9e6 +def derive_treasury_tipped_occupation_code( + census_occupation_codes: pd.Series | np.ndarray, +) -> np.ndarray: + """Map CPS detailed occupation codes to Treasury tipped occupation codes.""" + values = pd.Series(census_occupation_codes, copy=False) + values = pd.to_numeric(values, errors="coerce").fillna(-1).astype(int) + return ( + values.map(CENSUS_OCCUPATION_CODE_TO_TTOC).fillna(0).astype(np.int16).to_numpy() + ) + + def processed_cps_asec_cache_path(*, year: int, cache_dir: Path) -> Path: """Return the versioned processed-cache path for one CPS ASEC year.""" return cache_dir / ( @@ -1098,6 +1287,169 @@ def download_cps_asec( return cache_path +def _read_cps_asec_raw_files( + zip_path: Path, +) -> tuple[pl.DataFrame, pl.DataFrame | None]: + # Schema overrides for columns with large IDs that overflow int64. + schema_overrides = { + "PERIDNUM": pl.Utf8, + "H_IDNUM": pl.Utf8, + "OCCURNUM": pl.Utf8, + "QSTNUM": pl.Utf8, + } + + with zipfile.ZipFile(zip_path, "r") as zf: + person_file = None + household_file = None + + for name in zf.namelist(): + lower = name.lower() + if "pppub" in lower and lower.endswith(".csv"): + person_file = name + elif "hhpub" in lower and lower.endswith(".csv"): + household_file = name + + if person_file is None: + raise ValueError(f"Could not find person file in {zip_path}") + + with zf.open(person_file) as f: + persons_raw = pl.read_csv( + f, + infer_schema_length=10000, + schema_overrides=schema_overrides, + ) + + if household_file is None: + households_raw = None + else: + with zf.open(household_file) as f: + households_raw = pl.read_csv( + f, + infer_schema_length=10000, + schema_overrides=schema_overrides, + ) + + return persons_raw, households_raw + + +def _load_previous_cps_asec_persons_raw( + *, + year: int, + cache_dir: Path, + download: bool, +) -> pl.DataFrame | None: + previous_year = int(year) - 1 + if 
previous_year not in CPS_URLS: + return None + + zip_path = cache_dir / f"cps_asec_{previous_year}.zip" + if not zip_path.exists(): + if not download: + return None + try: + zip_path = download_cps_asec(previous_year, cache_dir) + except Exception as exc: + print(f"Could not load previous CPS ASEC {previous_year}: {exc}") + return None + + try: + persons_raw, _ = _read_cps_asec_raw_files(zip_path) + except Exception as exc: + print(f"Could not parse previous CPS ASEC {previous_year}: {exc}") + return None + return persons_raw + + +def _attach_previous_year_income( + *, + persons: pl.DataFrame, + current_persons_raw: pl.DataFrame, + previous_persons_raw: pl.DataFrame | None, +) -> pl.DataFrame: + default_exprs = [ + pl.lit(-1.0).alias("employment_income_last_year"), + pl.lit(-1.0).alias("self_employment_income_last_year"), + pl.lit(False).alias("previous_year_income_available"), + ] + current_required = {"PERIDNUM", "I_ERNVAL", "I_SEVAL"} + previous_required = {"PERIDNUM", "WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"} + if previous_persons_raw is None: + return persons.with_columns(default_exprs) + if not current_required.issubset(set(current_persons_raw.columns)): + return persons.with_columns(default_exprs) + if not previous_required.issubset(set(previous_persons_raw.columns)): + return persons.with_columns(default_exprs) + if len(persons) != len(current_persons_raw): + return persons.with_columns(default_exprs) + + current = current_persons_raw.select(sorted(current_required)).to_pandas() + current["_mp_row_order"] = np.arange(len(current)) + previous = previous_persons_raw.select(sorted(previous_required)).to_pandas() + previous = previous.rename( + columns={ + "WSAL_VAL": "employment_income_last_year", + "SEMP_VAL": "self_employment_income_last_year", + "I_ERNVAL": "_previous_year_wage_imputation_flag", + "I_SEVAL": "_previous_year_self_employment_imputation_flag", + } + ) + + for column in ( + "I_ERNVAL", + "I_SEVAL", + "_previous_year_wage_imputation_flag", 
+ "_previous_year_self_employment_imputation_flag", + "employment_income_last_year", + "self_employment_income_last_year", + ): + if column in current.columns: + current[column] = pd.to_numeric(current[column], errors="coerce") + if column in previous.columns: + previous[column] = pd.to_numeric(previous[column], errors="coerce") + + previous = previous[ + previous["_previous_year_wage_imputation_flag"].eq(0) + & previous["_previous_year_self_employment_imputation_flag"].eq(0) + ] + previous = previous.drop( + [ + "_previous_year_wage_imputation_flag", + "_previous_year_self_employment_imputation_flag", + ], + axis=1, + ) + joined = ( + current.set_index("PERIDNUM") + .join(previous.set_index("PERIDNUM"), how="left") + .sort_values("_mp_row_order") + ) + previous_year_income_available = ( + joined["employment_income_last_year"].notna() + & joined["self_employment_income_last_year"].notna() + & joined["I_ERNVAL"].eq(0) + & joined["I_SEVAL"].eq(0) + ) + employment_income_last_year = ( + joined["employment_income_last_year"].fillna(-1.0).to_numpy(dtype=float) + ) + self_employment_income_last_year = ( + joined["self_employment_income_last_year"].fillna(-1.0).to_numpy(dtype=float) + ) + return persons.with_columns( + [ + pl.Series("employment_income_last_year", employment_income_last_year), + pl.Series( + "self_employment_income_last_year", + self_employment_income_last_year, + ), + pl.Series( + "previous_year_income_available", + previous_year_income_available.to_numpy(dtype=bool), + ), + ] + ) + + def load_cps_asec( year: int = 2023, cache_dir: Path | None = None, @@ -1114,8 +1466,6 @@ def load_cps_asec( Returns: CPSDataset with persons and households DataFrames """ - import zipfile - if cache_dir is None: cache_dir = DEFAULT_CACHE_DIR @@ -1160,50 +1510,20 @@ def load_cps_asec( # Extract and parse print(f"Parsing CPS ASEC {year}...") - with zipfile.ZipFile(zip_path, "r") as zf: - # Find the person file (pppub*.csv) - person_file = None - household_file = None - - for 
name in zf.namelist(): - lower = name.lower() - if "pppub" in lower and lower.endswith(".csv"): - person_file = name - elif "hhpub" in lower and lower.endswith(".csv"): - household_file = name - - if person_file is None: - raise ValueError(f"Could not find person file in {zip_path}") - - # Schema overrides for columns with large IDs that overflow int64 - schema_overrides = { - "PERIDNUM": pl.Utf8, # Person ID - too large for int64 - "H_IDNUM": pl.Utf8, # Household ID - too large for int64 - "OCCURNUM": pl.Utf8, # Occurrence number - "QSTNUM": pl.Utf8, # Questionnaire number - } - - # Read person data - with zf.open(person_file) as f: - persons_raw = pl.read_csv( - f, - infer_schema_length=10000, - schema_overrides=schema_overrides, - ) - - # Read household data if available - if household_file: - with zf.open(household_file) as f: - households_raw = pl.read_csv( - f, - infer_schema_length=10000, - schema_overrides=schema_overrides, - ) - else: - households_raw = None + persons_raw, households_raw = _read_cps_asec_raw_files(zip_path) + previous_persons_raw = _load_previous_cps_asec_persons_raw( + year=year, + cache_dir=cache_dir, + download=download, + ) # Process person data persons = _process_persons(persons_raw, year) + persons = _attach_previous_year_income( + persons=persons, + current_persons_raw=persons_raw, + previous_persons_raw=previous_persons_raw, + ) # Process or derive household data if households_raw is not None: @@ -1271,16 +1591,262 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: result = result.with_columns( (pl.col("_cps_hispanic_code") != 0).alias("is_hispanic") ).drop("_cps_hispanic_code") + + health_staging_columns = [ + f"_{leaf}" for leaf in CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP + ] + available_health_staging = [ + column for column in health_staging_columns if column in result.columns + ] + if available_health_staging: + result = result.with_columns( + [ + (pl.col(f"_{leaf}") == 1).alias(leaf) + for leaf in 
CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP + if f"_{leaf}" in result.columns and leaf not in result.columns + ] + ) + result = result.with_columns( + [ + pl.col(reported_leaf).alias(leaf) + for leaf, reported_leaf in CURRENT_HEALTH_COVERAGE_RULE_INPUT_ALIAS_MAP.items() + if reported_leaf in result.columns and leaf not in result.columns + ] + ) + if ( + "_reported_has_private_health_coverage_at_interview" in result.columns + and "reported_has_private_health_coverage_at_interview" + not in result.columns + ): + result = result.with_columns( + ( + pl.col("_reported_has_private_health_coverage_at_interview") == 1 + ).alias("reported_has_private_health_coverage_at_interview") + ) + if ( + "_reported_has_public_health_coverage_at_interview" in result.columns + and "reported_has_public_health_coverage_at_interview" not in result.columns + ): + result = result.with_columns( + ( + pl.col("_reported_has_public_health_coverage_at_interview") == 1 + ).alias("reported_has_public_health_coverage_at_interview") + ) + if "_reported_current_health_coverage_code" in result.columns: + if "reported_is_insured_at_interview" not in result.columns: + result = result.with_columns( + (pl.col("_reported_current_health_coverage_code") == 1).alias( + "reported_is_insured_at_interview" + ) + ) + if "reported_is_uninsured_at_interview" not in result.columns: + result = result.with_columns( + (pl.col("_reported_current_health_coverage_code") != 1).alias( + "reported_is_uninsured_at_interview" + ) + ) + coverage_family_columns = [ + "reported_has_employer_sponsored_health_coverage_at_interview", + "reported_has_marketplace_health_coverage_at_interview", + "reported_has_non_marketplace_direct_purchase_health_coverage_at_interview", + "reported_has_medicare_health_coverage_at_interview", + "reported_has_means_tested_health_coverage_at_interview", + "reported_has_tricare_health_coverage_at_interview", + "reported_has_champva_health_coverage_at_interview", + 
"reported_has_va_health_coverage_at_interview", + "reported_has_indian_health_service_coverage_at_interview", + ] + available_coverage_family_columns = [ + column for column in coverage_family_columns if column in result.columns + ] + if ( + available_coverage_family_columns + and "reported_has_multiple_health_coverage_at_interview" + not in result.columns + ): + result = result.with_columns( + ( + pl.sum_horizontal( + *[ + pl.col(column).cast(pl.Int8) + for column in available_coverage_family_columns + ] + ) + > 1 + ).alias("reported_has_multiple_health_coverage_at_interview") + ) + if ( + "reported_has_marketplace_health_coverage_at_interview" in result.columns + and "has_marketplace_health_coverage" not in result.columns + ): + result = result.with_columns( + pl.col("reported_has_marketplace_health_coverage_at_interview").alias( + "has_marketplace_health_coverage" + ) + ) + if ( + "reported_has_employer_sponsored_health_coverage_at_interview" + in result.columns + and "has_esi" not in result.columns + ): + result = result.with_columns( + pl.col( + "reported_has_employer_sponsored_health_coverage_at_interview" + ).alias("has_esi") + ) + result = result.drop( + [ + column + for column in ( + *available_health_staging, + "_reported_has_private_health_coverage_at_interview", + "_reported_has_public_health_coverage_at_interview", + "_reported_current_health_coverage_code", + ) + if column in result.columns + ] + ) + + if ( + "_high_school_or_college_status" in result.columns + and "is_full_time_college_student" not in result.columns + ): + result = result.with_columns( + (pl.col("_high_school_or_college_status") == 2).alias( + "is_full_time_college_student" + ) + ).drop("_high_school_or_college_status") + elif "_high_school_or_college_status" in result.columns: + result = result.drop("_high_school_or_college_status") + + if "_is_paid_hourly_code" in result.columns: + if "is_paid_hourly" not in result.columns: + result = result.with_columns( + 
(pl.col("_is_paid_hourly_code") == 1).alias("is_paid_hourly") + ) + if ( + "_hourly_pay_cents" in result.columns + and "hourly_wage" not in result.columns + ): + result = result.with_columns( + pl.when( + (pl.col("_is_paid_hourly_code") == 1) + & (pl.col("_hourly_pay_cents") > 0) + ) + .then(pl.col("_hourly_pay_cents") / 100) + .otherwise(0.0) + .alias("hourly_wage") + ) + result = result.drop( + [ + column + for column in ("_is_paid_hourly_code", "_hourly_pay_cents") + if column in result.columns + ] + ) + elif "_hourly_pay_cents" in result.columns: + result = result.drop("_hourly_pay_cents") + + if ( + "_union_member_code" in result.columns + and "is_union_member_or_covered" not in result.columns + ): + result = result.with_columns( + (pl.col("_union_member_code") == 1).alias("is_union_member_or_covered") + ).drop("_union_member_code") + elif "_union_member_code" in result.columns: + result = result.drop("_union_member_code") + + if "detailed_occupation_recode" in result.columns: + occupation = pl.col("detailed_occupation_recode") + occupation_exprs: list[pl.Expr] = [] + if "has_never_worked" not in result.columns: + occupation_exprs.append((occupation == 53).alias("has_never_worked")) + if "is_military" not in result.columns: + occupation_exprs.append((occupation == 52).alias("is_military")) + if "is_computer_scientist" not in result.columns: + occupation_exprs.append((occupation == 8).alias("is_computer_scientist")) + if "is_farmer_fisher" not in result.columns: + occupation_exprs.append((occupation == 41).alias("is_farmer_fisher")) + if "is_executive_administrative_professional" not in result.columns: + occupation_exprs.append( + occupation.is_in( + [ + 1, + 2, + 3, + 5, + 6, + 7, + 9, + 10, + 11, + 12, + 13, + 14, + 15, + 16, + 18, + 19, + 25, + 26, + 27, + 28, + 29, + 34, + 36, + 38, + 39, + 40, + 42, + 50, + ] + ).alias("is_executive_administrative_professional") + ) + if occupation_exprs: + result = result.with_columns(occupation_exprs) + + if 
"_detailed_census_occupation_code" in result.columns: + if "treasury_tipped_occupation_code" not in result.columns: + tipped_codes = derive_treasury_tipped_occupation_code( + result["_detailed_census_occupation_code"].to_pandas() + ) + result = result.with_columns( + pl.Series("treasury_tipped_occupation_code", tipped_codes) + ) + if ( + "treasury_tipped_occupation_code" in result.columns + and "is_tipped_occupation" not in result.columns + ): + result = result.with_columns( + (pl.col("treasury_tipped_occupation_code") > 0).alias( + "is_tipped_occupation" + ) + ) + result = result.drop("_detailed_census_occupation_code") + if { "_other_income_code", "_other_income_value", }.issubset(set(result.columns)) and "alimony_income" not in result.columns: - result = result.with_columns( + other_income_exprs = [ pl.when(pl.col("_other_income_code") == ALIMONY_OTHER_INCOME_CODE) .then(pl.col("_other_income_value")) .otherwise(0) .alias("alimony_income") - ).drop(["_other_income_code", "_other_income_value"]) + ] + if "strike_benefits" not in result.columns: + other_income_exprs.append( + pl.when( + pl.col("_other_income_code") == STRIKE_BENEFITS_OTHER_INCOME_CODE + ) + .then(pl.col("_other_income_value")) + .otherwise(0) + .alias("strike_benefits") + ) + result = result.with_columns(other_income_exprs).drop( + ["_other_income_code", "_other_income_value"] + ) else: drop_columns = [ column @@ -1389,6 +1955,138 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: ] if drop_columns: result = result.drop(drop_columns) + + private_pension_staging = [ + column + for column in ("_pension_income", "_annuity_income") + if column in result.columns + ] + if private_pension_staging: + private_pension_total = pl.sum_horizontal( + *[pl.col(column) for column in private_pension_staging] + ) + pension_exprs: list[pl.Expr] = [] + if "pension_income" not in result.columns: + pension_exprs.append(private_pension_total.alias("pension_income")) + if 
"taxable_private_pension_income" not in result.columns: + pension_exprs.append( + (private_pension_total * TAXABLE_PENSION_FRACTION).alias( + "taxable_private_pension_income" + ) + ) + if "tax_exempt_private_pension_income" not in result.columns: + pension_exprs.append( + (private_pension_total * (1 - TAXABLE_PENSION_FRACTION)).alias( + "tax_exempt_private_pension_income" + ) + ) + if "taxable_pension_income" not in result.columns: + pension_exprs.append( + (private_pension_total * TAXABLE_PENSION_FRACTION).alias( + "taxable_pension_income" + ) + ) + result = result.with_columns(pension_exprs).drop(private_pension_staging) + + retirement_distribution_pairs = [ + ("_retirement_distribution_code_1", "_retirement_distribution_value_1"), + ("_retirement_distribution_code_2", "_retirement_distribution_value_2"), + ( + "_retirement_distribution_code_1_yng", + "_retirement_distribution_value_1_yng", + ), + ( + "_retirement_distribution_code_2_yng", + "_retirement_distribution_value_2_yng", + ), + ] + available_retirement_distribution_pairs = [ + (code_column, value_column) + for code_column, value_column in retirement_distribution_pairs + if code_column in result.columns and value_column in result.columns + ] + if available_retirement_distribution_pairs: + distribution_by_code = {} + for code in range(1, 8): + distribution_by_code[code] = pl.sum_horizontal( + *[ + pl.when(pl.col(code_column) == code) + .then(pl.col(value_column)) + .otherwise(0.0) + for code_column, value_column in available_retirement_distribution_pairs + ] + ) + retirement_distribution_exprs: list[pl.Expr] = [] + if "taxable_401k_distributions" not in result.columns: + retirement_distribution_exprs.append( + (distribution_by_code[1] * TAXABLE_401K_DISTRIBUTION_FRACTION).alias( + "taxable_401k_distributions" + ) + ) + if "tax_exempt_401k_distributions" not in result.columns: + retirement_distribution_exprs.append( + ( + distribution_by_code[1] * (1 - TAXABLE_401K_DISTRIBUTION_FRACTION) + 
).alias("tax_exempt_401k_distributions") + ) + if "taxable_403b_distributions" not in result.columns: + retirement_distribution_exprs.append( + (distribution_by_code[2] * TAXABLE_403B_DISTRIBUTION_FRACTION).alias( + "taxable_403b_distributions" + ) + ) + if "tax_exempt_403b_distributions" not in result.columns: + retirement_distribution_exprs.append( + ( + distribution_by_code[2] * (1 - TAXABLE_403B_DISTRIBUTION_FRACTION) + ).alias("tax_exempt_403b_distributions") + ) + if "roth_ira_distributions" not in result.columns: + retirement_distribution_exprs.append( + distribution_by_code[3].alias("roth_ira_distributions") + ) + if "regular_ira_distributions" not in result.columns: + retirement_distribution_exprs.append( + distribution_by_code[4].alias("regular_ira_distributions") + ) + if "taxable_ira_distributions" not in result.columns: + retirement_distribution_exprs.append( + distribution_by_code[4].alias("taxable_ira_distributions") + ) + if "tax_exempt_ira_distributions" not in result.columns: + retirement_distribution_exprs.append( + distribution_by_code[3].alias("tax_exempt_ira_distributions") + ) + if "keogh_distributions" not in result.columns: + retirement_distribution_exprs.append( + distribution_by_code[5].alias("keogh_distributions") + ) + if "taxable_sep_distributions" not in result.columns: + retirement_distribution_exprs.append( + (distribution_by_code[6] * TAXABLE_SEP_DISTRIBUTION_FRACTION).alias( + "taxable_sep_distributions" + ) + ) + if "tax_exempt_sep_distributions" not in result.columns: + retirement_distribution_exprs.append( + ( + distribution_by_code[6] * (1 - TAXABLE_SEP_DISTRIBUTION_FRACTION) + ).alias("tax_exempt_sep_distributions") + ) + if "other_type_retirement_account_distributions" not in result.columns: + retirement_distribution_exprs.append( + distribution_by_code[7].alias( + "other_type_retirement_account_distributions" + ) + ) + result = result.with_columns(retirement_distribution_exprs).drop( + [ + column + for pair in 
available_retirement_distribution_pairs + for column in pair + ] + ) + # Split the bundled CPS retirement-contribution total (RETCB_VAL, staged # as _retirement_contributions) into the five account-type-specific # desired contribution leaves the eCPS contract requires. This mirrors @@ -1659,6 +2357,13 @@ def _process_persons(df: pl.DataFrame, year: int) -> pl.DataFrame: result = result.with_columns( pl.col("has_medicare").alias("takes_up_medicare_if_eligible") ) + if "weeks_unemployed" in result.columns: + result = result.with_columns( + pl.when(pl.col("weeks_unemployed") == -1) + .then(0) + .otherwise(pl.col("weeks_unemployed")) + .alias("weeks_unemployed") + ) for col in PERSON_NONNEGATIVE_VALUE_COLUMNS: if col in result.columns: result = result.with_columns( @@ -1708,6 +2413,7 @@ def _attach_cps_ssn_card_type( required_person_columns = { "PRCITSHP", "PEINUSYR", + "PENATVTY", "A_HSCOL", "A_AGE", "A_MARITL", @@ -1801,6 +2507,7 @@ def select_random_subset_to_target( prcitshp = numeric_series("PRCITSHP").astype(int) peinusyr = numeric_series("PEINUSYR").astype(int) + birth_country = numeric_series("PENATVTY").astype(int) age = numeric_series("A_AGE").astype(int) marital = numeric_series("A_MARITL").astype(int) spouse_pointer = numeric_series("A_SPOUSE").astype(int) @@ -1942,14 +2649,106 @@ def select_random_subset_to_target( 2: "NON_CITIZEN_VALID_EAD", 3: "OTHER_NON_CITIZEN", } + has_valid_ssn = ssn_card_type == 1 + taxpayer_id_type = np.where( + has_valid_ssn, + "VALID_SSN", + np.where(ssn_card_type != 0, "OTHER_TIN", "NONE"), + ) + immigration_status = _derive_cps_immigration_status( + ssn_card_type=ssn_card_type, + birth_country=birth_country.to_numpy(), + peinusyr=peinusyr.to_numpy(), + age=age.to_numpy(), + year=int(persons["year"][0]) + if "year" in persons.columns and len(persons) > 0 + else 2024, + ) return persons.with_columns( - pl.Series( - "ssn_card_type", - pd.Series(ssn_card_type).map(code_to_str).tolist(), - ) + [ + pl.Series( + "ssn_card_type", + 
pd.Series(ssn_card_type).map(code_to_str).tolist(), + ), + pl.Series("has_valid_ssn", has_valid_ssn), + pl.Series("taxpayer_id_type", taxpayer_id_type.tolist()), + pl.Series("immigration_status_str", immigration_status.tolist()), + ] ) +def _derive_cps_immigration_status( + *, + ssn_card_type: np.ndarray, + birth_country: np.ndarray, + peinusyr: np.ndarray, + age: np.ndarray, + year: int, +) -> np.ndarray: + """Approximate eCPS immigration-status tags from CPS ASEC citizenship inputs.""" + + arrival_year_map = { + 1: 1950, + 2: 1955, + 3: 1960, + 4: 1965, + 5: 1970, + 6: 1975, + 7: 1980, + 8: 1982, + 9: 1984, + 10: 1986, + 11: 1988, + 12: 1990, + 13: 1992, + 14: 1994, + 15: 1996, + 16: 1998, + 17: 2000, + 18: 2002, + 19: 2004, + 20: 2006, + 21: 2008, + 22: 2010, + 23: 2012, + 24: 2014, + 25: 2017, + 26: 2019, + 27: 2021, + 28: 2023, + 29: 2024, + } + arrival_years = pd.Series(peinusyr).map(arrival_year_map).fillna(2024).to_numpy() + years_in_us = year - arrival_years + age_at_entry = np.maximum(0, age - years_in_us) + + result = np.full(len(ssn_card_type), "LEGAL_PERMANENT_RESIDENT", dtype="U32") + result[ssn_card_type == 1] = "CITIZEN" + + arrived_before_1982 = np.isin(peinusyr, [1, 2, 3, 4, 5, 6, 7]) + result[(ssn_card_type == 0) & ~arrived_before_1982] = "UNDOCUMENTED" + + cofa_birth_country_codes = {511, 512} + cuban_haitian_birth_country_codes = {327, 332} + result[ + (ssn_card_type != 0) & np.isin(birth_country, list(cofa_birth_country_codes)) + ] = "LEGAL_PERMANENT_RESIDENT" + result[ + (ssn_card_type != 0) + & np.isin(birth_country, list(cuban_haitian_birth_country_codes)) + & (arrival_years >= 1980) + ] = "CUBAN_HAITIAN_ENTRANT" + result[ + (ssn_card_type == 2) + & (arrival_years <= 2007) + & (age_at_entry < 16) + & (age >= 15) + ] = "DACA" + result[(ssn_card_type == 3) & (years_in_us <= 5)] = "REFUGEE" + result[(ssn_card_type == 2) & (result == "LEGAL_PERMANENT_RESIDENT")] = "TPS" + return result + + def 
_processed_persons_have_household_geography(persons: pl.DataFrame) -> bool: """Whether cached processed person data can derive household geography.""" required_columns = set(PERSON_CACHE_REQUIRED_COLUMNS) diff --git a/src/microplex_us/data_sources/puf.py b/src/microplex_us/data_sources/puf.py index 1020636..dc0f6ef 100644 --- a/src/microplex_us/data_sources/puf.py +++ b/src/microplex_us/data_sources/puf.py @@ -229,10 +229,58 @@ ) MIN_PE_STYLE_SOCIAL_SECURITY_QRF_TRAINING_RECORDS = 100 PE_PUF_PERSON_EXPANSION_RANDOM_SEED = 64 +QBI_PUF_RANDOM_SEED = 64 +QBI_SIMULATION_RANDOM_SEED = 42 + +QBI_QUALIFICATION_PROBABILITIES = { + "self_employment_income": 0.8, + "farm_operations_income": 0.95, + "farm_rent_income": 0.5, + "rental_income": 0.4, + "estate_income": 0.5, + "partnership_s_corp_income": 0.85, +} +QBI_SSTB_PROBABILITY_BY_RAW_COLUMN = { + "E00900": 0.20, + "E26270": 0.15, + "E26390": 0.10, + "E26400": 0.10, +} +QBI_REIT_PTP_PROBABILITY = 0.07 +QBI_REIT_PTP_LOG_NORMAL_MU = 8.04 +QBI_REIT_PTP_LOG_NORMAL_SIGMA = 1.20 +QBI_BDC_PROBABILITY = 0.003 +QBI_BDC_LOG_NORMAL_MU = 8.71 +QBI_BDC_LOG_NORMAL_SIGMA = 1.00 +QBI_PROFIT_MARGIN_BETA_A = 2.0 +QBI_PROFIT_MARGIN_BETA_B = 3.0 +QBI_PROFIT_MARGIN_SCALE = 0.20 +QBI_PROFIT_MARGIN_SHIFT = 0.05 +QBI_HAS_EMPLOYEES_LOGIT_INTERCEPT = -3.1 +QBI_HAS_EMPLOYEES_LOGIT_SLOPE_PER_DOLLAR = 1.2e-6 +QBI_RENTAL_LABOR_RATIO_BETA_A = 1.5 +QBI_RENTAL_LABOR_RATIO_BETA_B = 8.0 +QBI_RENTAL_LABOR_RATIO_SCALE = 0.08 +QBI_NON_RENTAL_LABOR_RATIO_BETA_A = 2.0 +QBI_NON_RENTAL_LABOR_RATIO_BETA_B = 2.0 +QBI_NON_RENTAL_LABOR_RATIO_SCALE = 0.25 +QBI_DEPRECIATION_PROXY_SIGMA = 0.8 +QBI_UBIA_MULTIPLE_OF_QBI = 4.0 +QBI_UBIA_SIGMA = 1.0 +QBI_QUALIFICATION_COLUMNS = tuple( + f"{variable}_would_be_qualified" for variable in QBI_QUALIFICATION_PROBABILITIES +) +QBI_BOOLEAN_COLUMNS = ( + "business_is_sstb", + "sstb_self_employment_income_would_be_qualified", + *QBI_QUALIFICATION_COLUMNS, +) JOINT_HEAD_SHARE_ALLOCATION = { "employment_income": 0.6, 
"self_employment_income": 0.6, + "sstb_self_employment_income": 0.6, + "sstb_self_employment_income_before_lsr": 0.6, } JOINT_EQUAL_SHARE_ALLOCATION = ( @@ -262,6 +310,12 @@ "charitable_noncash", "ira_deduction", "student_loan_interest", + "w2_wages_from_qualified_business", + "unadjusted_basis_qualified_property", + "sstb_w2_wages_from_qualified_business", + "sstb_unadjusted_basis_qualified_property", + "qualified_reit_and_ptp_income", + "qualified_bdc_income", ) PUF_DEMOGRAPHIC_HELPER_COLUMNS = ( @@ -296,6 +350,7 @@ "pension_income", "social_security", "social_security_retirement", + *QBI_BOOLEAN_COLUMNS, *PUF_DEMOGRAPHIC_HELPER_COLUMNS, } @@ -1171,6 +1226,11 @@ def map_puf_variables( if medical_expense_floor is not None: for variable, fraction in MEDICAL_EXPENSE_CATEGORY_BREAKDOWNS.items(): result[variable] = medical_expense_floor.fillna(0) * fraction + result = _add_puf_qbi_simulation_columns( + result, + raw_puf=puf, + random_seed=random_seed, + ) # Map filing status code to string filing_status_map = { @@ -1460,6 +1520,187 @@ def _numeric_series(df: pd.DataFrame, column: str) -> pd.Series: return df[column].fillna(0).astype(float) +def _bernoulli_lognormal_sample( + n: int, + *, + probability: float, + log_mean: float, + log_sigma: float, + rng: np.random.Generator, +) -> np.ndarray: + positive = rng.binomial(1, probability, size=n).astype(bool) + return np.where( + positive, + rng.lognormal(mean=log_mean, sigma=log_sigma, size=n), + 0.0, + ) + + +def _conditional_lognormal_sample( + flag: np.ndarray, + target_mean: pd.Series | np.ndarray, + *, + log_sigma: float, + rng: np.random.Generator, +) -> np.ndarray: + flag_array = np.asarray(flag, dtype=bool) + target = np.maximum(np.asarray(target_mean, dtype=float), 0.0) + positive_target = target > 0.0 + mu = np.zeros_like(target, dtype=float) + mu[positive_target] = np.log(target[positive_target]) - (log_sigma**2 / 2.0) + draws = rng.lognormal(mean=mu, sigma=log_sigma, size=len(target)) + return 
def _conditional_lognormal_sample(
    flag: np.ndarray,
    target_mean: pd.Series | np.ndarray,
    *,
    log_sigma: float,
    rng: np.random.Generator,
) -> np.ndarray:
    """Draw log-normal values whose mean is ``target_mean`` where ``flag`` holds.

    Args:
        flag: Boolean-like selector; unflagged entries are returned as ``0.0``.
        target_mean: Desired mean per entry; negative values are treated as 0.
        log_sigma: ``sigma`` parameter of the log-normal draw.
        rng: Generator supplying the draws.

    Returns:
        Float array that is a log-normal draw where ``flag`` is true and the
        clipped target is positive, ``0.0`` elsewhere.
    """
    flag_array = np.asarray(flag, dtype=bool)
    target = np.maximum(np.asarray(target_mean, dtype=float), 0.0)
    positive_target = target > 0.0
    mu = np.zeros_like(target, dtype=float)
    # A log-normal has mean exp(mu + sigma^2 / 2); solving for mu makes the
    # draw's mean equal the target.
    mu[positive_target] = np.log(target[positive_target]) - (log_sigma**2 / 2.0)
    # Drawn for every entry (including ones discarded below) so the amount of
    # generator state consumed does not depend on the data.
    draws = rng.lognormal(mean=mu, sigma=log_sigma, size=len(target))
    return np.where(flag_array & positive_target, draws, 0.0)


def _simulate_qbi_w2_wages_and_ubia(
    puf: pd.DataFrame,
    *,
    seed: int = QBI_SIMULATION_RANDOM_SEED,
) -> tuple[np.ndarray, np.ndarray]:
    """Simulate PE-US-data-style Section 199A W-2 wages and UBIA support.

    Args:
        puf: Frame holding the income columns named in
            ``QBI_QUALIFICATION_PROBABILITIES`` and ``rental_income``.
        seed: Seed for the dedicated generator used by this simulation.

    Returns:
        ``(w2_wages, ubia)`` arrays with one entry per row of ``puf``.
    """
    rng = np.random.default_rng(seed)
    # Expected qualified income: each income type weighted by its
    # qualification probability.
    qbi = sum(
        _numeric_series(puf, income_type) * probability
        for income_type, probability in QBI_QUALIFICATION_PROBABILITIES.items()
    ).to_numpy(dtype=float)

    # Back out revenue from income through a randomly drawn profit margin.
    margins = (
        rng.beta(QBI_PROFIT_MARGIN_BETA_A, QBI_PROFIT_MARGIN_BETA_B, qbi.size)
        * QBI_PROFIT_MARGIN_SCALE
        + QBI_PROFIT_MARGIN_SHIFT
    )
    revenues = np.maximum(qbi, 0.0) / margins
    logit = (
        QBI_HAS_EMPLOYEES_LOGIT_INTERCEPT
        + QBI_HAS_EMPLOYEES_LOGIT_SLOPE_PER_DOLLAR * revenues
    )
    # Rows without revenue can never have employees.
    employee_probability = np.where(
        revenues == 0.0,
        0.0,
        1.0 / (1.0 + np.exp(-logit)),
    )
    has_employees = rng.binomial(1, employee_probability)

    rental_income = _numeric_series(puf, "rental_income")
    is_rental = rental_income.to_numpy(dtype=float) > 0.0
    # Both beta draws are evaluated for every row before ``np.where`` selects
    # one, which keeps the generator stream independent of the data.
    labor_ratios = np.where(
        is_rental,
        rng.beta(
            QBI_RENTAL_LABOR_RATIO_BETA_A,
            QBI_RENTAL_LABOR_RATIO_BETA_B,
            qbi.size,
        )
        * QBI_RENTAL_LABOR_RATIO_SCALE,
        rng.beta(
            QBI_NON_RENTAL_LABOR_RATIO_BETA_A,
            QBI_NON_RENTAL_LABOR_RATIO_BETA_B,
            qbi.size,
        )
        * QBI_NON_RENTAL_LABOR_RATIO_SCALE,
    )
    w2_wages = revenues * labor_ratios * has_employees

    depreciation_proxy = _conditional_lognormal_sample(
        is_rental,
        rental_income,
        log_sigma=QBI_DEPRECIATION_PROXY_SIGMA,
        rng=rng,
    )
    is_capital_intensive = is_rental | (depreciation_proxy > 0.0)
    ubia = _conditional_lognormal_sample(
        is_capital_intensive,
        QBI_UBIA_MULTIPLE_OF_QBI * np.maximum(qbi, 0.0),
        log_sigma=QBI_UBIA_SIGMA,
        rng=rng,
    )
    return w2_wages, ubia


def _add_puf_qbi_simulation_columns(
    mapped_puf: pd.DataFrame,
    *,
    raw_puf: pd.DataFrame,
    random_seed: int,
) -> pd.DataFrame:
    """Populate PE-US-data-style Section 199A support columns from PUF.

    Adds the ``*_would_be_qualified`` flags, simulated W-2 wages and UBIA,
    the ``business_is_sstb`` flag with its ``sstb_*`` counterparts, and
    simulated REIT/PTP and BDC income. For rows flagged as SSTB the existing
    ``self_employment_income`` is moved into the ``sstb_self_employment_*``
    columns and zeroed in place.

    Args:
        mapped_puf: Already-mapped PUF variables; not modified.
        raw_puf: Raw PUF frame providing the columns listed in
            ``QBI_SSTB_PROBABILITY_BY_RAW_COLUMN``; missing columns count as 0.
        random_seed: Added to ``QBI_PUF_RANDOM_SEED`` to seed the flag draws.

    Returns:
        A copy of ``mapped_puf`` with the support columns added.
    """
    result = mapped_puf.copy()
    rng = np.random.default_rng(QBI_PUF_RANDOM_SEED + int(random_seed))
    for variable, probability in QBI_QUALIFICATION_PROBABILITIES.items():
        result[f"{variable}_would_be_qualified"] = rng.random(len(result)) < probability

    # The W-2/UBIA simulation uses its own fixed seed, independent of
    # ``random_seed``.
    w2_wages, ubia = _simulate_qbi_w2_wages_and_ubia(
        result,
        seed=QBI_SIMULATION_RANDOM_SEED,
    )
    result["w2_wages_from_qualified_business"] = w2_wages
    result["unadjusted_basis_qualified_property"] = ubia

    raw_sstb_sources = pd.DataFrame(
        {
            column: pd.to_numeric(raw_puf[column], errors="coerce").fillna(0.0)
            if column in raw_puf.columns
            else pd.Series(0.0, index=result.index, dtype=float)
            for column in QBI_SSTB_PROBABILITY_BY_RAW_COLUMN
        },
        index=result.index,
    # Passing ``index=`` re-aligns the raw series by label, so any label that
    # is missing from ``raw_puf`` turns into NaN *after* the per-column fill.
    # Fill again so ``idxmax`` never sees an all-NaN row.
    ).fillna(0.0)
    largest_qbi_source = raw_sstb_sources.idxmax(axis=1)
    has_any_qbi_source = raw_sstb_sources.abs().sum(axis=1).gt(0.0)
    # SSTB probability follows the dominant raw source; rows with no source
    # at all get probability zero.
    probability_sstb = (
        largest_qbi_source.map(QBI_SSTB_PROBABILITY_BY_RAW_COLUMN)
        .fillna(0.0)
        .where(has_any_qbi_source, 0.0)
    )
    is_sstb = rng.binomial(n=1, p=probability_sstb.to_numpy(dtype=float)).astype(bool)
    result["business_is_sstb"] = is_sstb

    # Move self-employment income into the SSTB columns for SSTB rows.
    legacy_self_employment_income = _numeric_series(result, "self_employment_income")
    result["sstb_self_employment_income_before_lsr"] = np.where(
        is_sstb,
        legacy_self_employment_income,
        0.0,
    )
    result["sstb_self_employment_income"] = result[
        "sstb_self_employment_income_before_lsr"
    ]
    result["self_employment_income"] = np.where(
        is_sstb,
        0.0,
        legacy_self_employment_income,
    )
    result["sstb_self_employment_income_would_be_qualified"] = np.where(
        is_sstb,
        result["self_employment_income_would_be_qualified"].astype(bool),
        False,
    )
    result["sstb_w2_wages_from_qualified_business"] = np.where(
        is_sstb,
        w2_wages,
        0.0,
    )
    result["sstb_unadjusted_basis_qualified_property"] = np.where(
        is_sstb,
        ubia,
        0.0,
    )

    result["qualified_reit_and_ptp_income"] = _bernoulli_lognormal_sample(
        len(result),
        probability=QBI_REIT_PTP_PROBABILITY,
        log_mean=QBI_REIT_PTP_LOG_NORMAL_MU,
        log_sigma=QBI_REIT_PTP_LOG_NORMAL_SIGMA,
        rng=rng,
    )
    result["qualified_bdc_income"] = _bernoulli_lognormal_sample(
        len(result),
        probability=QBI_BDC_PROBABILITY,
        log_mean=QBI_BDC_LOG_NORMAL_MU,
        log_sigma=QBI_BDC_LOG_NORMAL_SIGMA,
        rng=rng,
    )
    return result
def _load_microplex_pregnancy_rates(year: int) -> dict[str, float]:
    """Return pregnancy rates keyed by upper-cased state abbreviation.

    Rates come from
    ``policyengine_us_data.db.etl_pregnancy.get_state_pregnancy_rates``. If
    that module cannot be imported or the call fails, a warning with the
    traceback is logged and an empty mapping is returned.

    Args:
        year: Accepted for signature parity with callers; not used.

    Returns:
        Mapping of state abbreviation to rate, or ``{}`` when loading failed.
    """
    del year  # intentionally unused
    try:
        from policyengine_us_data.db.etl_pregnancy import get_state_pregnancy_rates

        raw_rates = get_state_pregnancy_rates()
    except Exception:
        # Deliberately best-effort: any failure downgrades to "no rates".
        LOGGER.warning(
            "Failed to load state pregnancy rates; using national fallback",
            exc_info=True,
        )
        return {}

    normalized: dict[str, float] = {}
    for state, rate in raw_rates.items():
        normalized[str(state).upper()] = float(rate)
    return normalized
= ("weeks_unemployed",) @@ -4955,6 +4992,7 @@ def build_policyengine_entity_tables( households = self._build_policyengine_households(persons) tax_units, persons = self._build_policyengine_tax_units(persons) + tax_units = self._attach_policyengine_tax_unit_source_inputs(tax_units) tax_units = self._attach_policyengine_tax_unit_takeup_inputs(tax_units) persons = self._construct_aotc_eligibility_inputs(persons) persons = self._assign_family_and_spm_units(persons) @@ -7544,6 +7582,8 @@ def _build_policyengine_households(self, persons: pd.DataFrame) -> pd.DataFrame: "net_worth", "auto_loan_balance", "auto_loan_interest", + "household_vehicles_owned", + "household_vehicles_value", ) if column in persons.columns ] @@ -8840,6 +8880,38 @@ def _aggregate_policyengine_tax_unit_input_columns( ) head_age = age.loc[head_mask].iloc[0] if bool(head_mask.any()) else age.iloc[0] aggregated[VOLUNTARY_FILING_AGE_HEAD_HELPER_COLUMN] = float(head_age) + for column in ( + "interest_deduction", + "deductible_mortgage_interest", + "mortgage_interest_paid", + "first_home_mortgage_interest", + "second_home_mortgage_interest", + ): + if column not in unit_persons.columns: + continue + values = pd.to_numeric(unit_persons[column], errors="coerce").fillna(0.0) + aggregated[column] = float(values.clip(lower=0.0).sum()) + for column in ( + "first_home_mortgage_balance", + "second_home_mortgage_balance", + "scf_mortgage_debt", + "imputed_first_home_mortgage_balance_hint", + "imputed_second_home_mortgage_balance_hint", + ): + if column not in unit_persons.columns: + continue + values = pd.to_numeric(unit_persons[column], errors="coerce").fillna(0.0) + aggregated[column] = float(values.clip(lower=0.0).max()) + for column in ( + "first_home_mortgage_origination_year", + "second_home_mortgage_origination_year", + ): + if column not in unit_persons.columns: + continue + values = pd.to_numeric(unit_persons[column], errors="coerce").fillna(0.0) + positive = values.loc[values.gt(0.0)] + if not 
positive.empty: + aggregated[column] = int(positive.iloc[0]) for boolean_column in ( "takes_up_aca_if_eligible", "takes_up_dc_ptc", @@ -8853,6 +8925,90 @@ def _aggregate_policyengine_tax_unit_input_columns( aggregated[boolean_column] = value return aggregated + def _attach_policyengine_tax_unit_source_inputs( + self, + tax_units: pd.DataFrame, + ) -> pd.DataFrame: + """Attach structural tax-unit inputs derived from source columns.""" + result = tax_units.copy() + zero = pd.Series(0.0, index=result.index, dtype=float) + + def first_nonzero_or_present(*columns: str) -> pd.Series: + values = zero.copy() + found = False + for column in columns: + if column not in result.columns: + continue + candidate = ( + pd.to_numeric(result[column], errors="coerce") + .fillna(0.0) + .astype(float) + ) + if not found: + values = candidate.copy() + found = True + continue + values = values.where(values.ne(0.0), candidate) + return values if found else zero.copy() + + mortgage_interest = first_nonzero_or_present( + "first_home_mortgage_interest", + "deductible_mortgage_interest", + "mortgage_interest_paid", + ).clip(lower=0.0) + if ( + "first_home_mortgage_interest" in result.columns + or "deductible_mortgage_interest" in result.columns + or "mortgage_interest_paid" in result.columns + ): + result["first_home_mortgage_interest"] = mortgage_interest + + if "interest_deduction" in result.columns: + interest_deduction = first_nonzero_or_present( + "interest_deduction", + "first_home_mortgage_interest", + ).clip(lower=0.0) + result["interest_deduction"] = np.maximum( + interest_deduction, + mortgage_interest, + ) + + balance_hint = first_nonzero_or_present( + "first_home_mortgage_balance", + "imputed_first_home_mortgage_balance_hint", + "scf_mortgage_debt", + ).clip(lower=0.0) + if ( + "first_home_mortgage_balance" in result.columns + or "imputed_first_home_mortgage_balance_hint" in result.columns + or "scf_mortgage_debt" in result.columns + or bool(mortgage_interest.gt(0.0).any()) + ): + 
interest_implied_balance = mortgage_interest / 0.06 + result["first_home_mortgage_balance"] = np.maximum( + balance_hint, + interest_implied_balance, + ).where(mortgage_interest.gt(0.0) | balance_hint.gt(0.0), 0.0) + + origination_year = first_nonzero_or_present( + "first_home_mortgage_origination_year", + ) + if "first_home_mortgage_origination_year" in result.columns or bool( + mortgage_interest.gt(0.0).any() + ): + target_year = int( + self.config.policyengine_dataset_year + or self.config.policyengine_target_period + or 2024 + ) + fallback_year = max(1988, target_year - 10) + result["first_home_mortgage_origination_year"] = ( + origination_year.where(origination_year.gt(0.0), fallback_year) + .where(mortgage_interest.gt(0.0), 0.0) + .astype(int) + ) + return result + def _infer_policyengine_bool_for_group( self, group_rows: pd.DataFrame, @@ -9073,6 +9229,7 @@ def _attach_policyengine_person_takeup_inputs( ) -> pd.DataFrame: """Attach eCPS-style person stochastic inputs before materialization.""" result = self._attach_policyengine_medicaid_takeup(persons) + result = self._attach_policyengine_pregnancy_inputs(result) for column, rate_key in ( ("takes_up_head_start_if_eligible", "head_start"), ("takes_up_early_head_start_if_eligible", "early_head_start"), @@ -9126,6 +9283,48 @@ def _attach_policyengine_medicaid_takeup( result[column] = rng.random(len(result)) < takeup_rate.to_numpy(dtype=float) return result + def _attach_policyengine_pregnancy_inputs( + self, + persons: pd.DataFrame, + ) -> pd.DataFrame: + result = persons.copy() + column = "is_pregnant" + if column in result.columns: + result[column] = self._normal_bool_series( + result[column], index=result.index + ) + return result + + index = result.index + age = pd.to_numeric( + result.get("age", pd.Series(0.0, index=index)), + errors="coerce", + ).fillna(0.0) + if "is_female" in result.columns: + female = self._normal_bool_series(result["is_female"], index=index) + elif "sex" in result.columns: + female = ( 
+ pd.to_numeric(result["sex"], errors="coerce") + .fillna(0) + .astype(int) + .eq(2) + ) + else: + female = pd.Series(False, index=index) + + year = self._policyengine_takeup_year() + rates = _load_microplex_pregnancy_rates(year) + states = self._person_state_abbreviation(result) + pregnancy_rate = states.map( + lambda state: rates.get(str(state).upper(), DEFAULT_PREGNANCY_RATE) + ).fillna(DEFAULT_PREGNANCY_RATE) + eligible = female & age.ge(15.0) & age.le(44.0) + rng = _microplex_seeded_rng(column) + result[column] = eligible.to_numpy(dtype=bool) & ( + rng.random(len(result)) < pregnancy_rate.to_numpy(dtype=float) + ) + return result + def _attach_policyengine_wic_inputs( self, persons: pd.DataFrame, @@ -10428,6 +10627,43 @@ def has_any(*columns: str) -> bool: result["farm_operations_income"] = first_present("farm_operations_income") result["farm_rent_income"] = first_present("farm_rent_income") result["rental_income"] = first_present("rental_income") + result["w2_wages_from_qualified_business"] = first_present( + "w2_wages_from_qualified_business" + ).clip(lower=0.0) + result["unadjusted_basis_qualified_property"] = first_present( + "unadjusted_basis_qualified_property" + ).clip(lower=0.0) + result["qualified_reit_and_ptp_income"] = first_present( + "qualified_reit_and_ptp_income" + ).clip(lower=0.0) + result["qualified_bdc_income"] = first_present("qualified_bdc_income").clip( + lower=0.0 + ) + result["sstb_self_employment_income_before_lsr"] = first_nonzero_or_present( + "sstb_self_employment_income_before_lsr", + "sstb_self_employment_income", + ) + result["sstb_w2_wages_from_qualified_business"] = first_present( + "sstb_w2_wages_from_qualified_business" + ).clip(lower=0.0) + result["sstb_unadjusted_basis_qualified_property"] = first_present( + "sstb_unadjusted_basis_qualified_property" + ).clip(lower=0.0) + for qbi_bool_column in ( + "business_is_sstb", + "self_employment_income_would_be_qualified", + "sstb_self_employment_income_would_be_qualified", + 
"farm_operations_income_would_be_qualified", + "farm_rent_income_would_be_qualified", + "partnership_s_corp_income_would_be_qualified", + "rental_income_would_be_qualified", + "estate_income_would_be_qualified", + ): + if qbi_bool_column in result.columns: + result[qbi_bool_column] = self._normal_bool_series( + result[qbi_bool_column], + index=result.index, + ) result["health_savings_account_ald"] = first_present( "health_savings_account_ald" ) @@ -10467,6 +10703,19 @@ def has_any(*columns: str) -> bool: "state_income_tax_paid", ) result["student_loan_interest"] = first_present("student_loan_interest") + result["home_mortgage_interest"] = first_nonzero_or_present( + "home_mortgage_interest", + "deductible_mortgage_interest", + "mortgage_interest_paid", + ).clip(lower=0.0) + result["investment_interest_expense"] = first_nonzero_or_present( + "investment_interest_expense", + "investment_income_elected_form_4952", + ).clip(lower=0.0) + result["other_health_insurance_premiums"] = first_nonzero_or_present( + "other_health_insurance_premiums", + "health_insurance_premiums_without_medicare_part_b", + ).clip(lower=0.0) return result def _resolve_policyengine_tax_benefit_system(self) -> Any: diff --git a/src/microplex_us/variables.py b/src/microplex_us/variables.py index 3b41ce9..2385e28 100644 --- a/src/microplex_us/variables.py +++ b/src/microplex_us/variables.py @@ -246,6 +246,18 @@ def minor_positive_employment_income_mask(frame: pd.DataFrame) -> pd.Series: "state_fips": VariableSemanticSpec(native_entity=EntityType.HOUSEHOLD), "tenure": VariableSemanticSpec(native_entity=EntityType.HOUSEHOLD), "state": VariableSemanticSpec(native_entity=EntityType.HOUSEHOLD), + "household_vehicles_owned": VariableSemanticSpec( + native_entity=EntityType.HOUSEHOLD, + projection_aggregation=ProjectionAggregation.MAX, + support_family=VariableSupportFamily.SUPPORT_SENSITIVE, + notes="Household vehicle count from the SIPP asset donor.", + ), + "household_vehicles_value": 
VariableSemanticSpec( + native_entity=EntityType.HOUSEHOLD, + projection_aggregation=ProjectionAggregation.MAX, + support_family=VariableSupportFamily.SUPPORT_SENSITIVE, + notes="Household vehicle value from the SIPP asset donor.", + ), "dividend_income": VariableSemanticSpec( native_entity=EntityType.PERSON, condition_entities=( diff --git a/tests/data_sources/test_cps_export_support_fields.py b/tests/data_sources/test_cps_export_support_fields.py new file mode 100644 index 0000000..054cdf4 --- /dev/null +++ b/tests/data_sources/test_cps_export_support_fields.py @@ -0,0 +1,268 @@ +"""Tests for CPS-backed eCPS export-support fields. + +These fields are required by the eCPS export contract and are populated in the +incumbent enhanced CPS. Microplex previously exported many of them only through +constant defaults, so the presence gate passed while the support gate failed. +""" + +import numpy as np +import polars as pl +import pytest + +from microplex_us.data_sources.cps import ( + CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP, + PERSON_VARIABLES, + TAXABLE_PENSION_FRACTION, + _attach_cps_ssn_card_type, + _derive_cps_immigration_status, + _process_persons, +) + + +def _raw_person_frame(rows: list[dict]) -> pl.DataFrame: + n = len(rows) + return pl.DataFrame( + { + "PH_SEQ": [1] * n, + "A_LINENO": list(range(1, n + 1)), + "A_FNLWGT": [100.0] * n, + "A_AGE": [row.get("age", 40) for row in rows], + "A_HSCOL": [row.get("school", 0) for row in rows], + "A_HRLYWK": [row.get("hourly_code", 0) for row in rows], + "A_HRSPAY": [row.get("hourly_cents", -1) for row in rows], + "A_UNMEM": [row.get("union", 0) for row in rows], + "POCCU2": [row.get("poccu2", 0) for row in rows], + "PEIOOCC": [row.get("peioocc", -1) for row in rows], + "PNSN_VAL": [row.get("pension", 0.0) for row in rows], + "ANN_VAL": [row.get("annuity", 0.0) for row in rows], + "LKWEEKS": [row.get("weeks_unemployed", -1) for row in rows], + "VET_VAL": [row.get("veterans_benefits", 0.0) for row in rows], + "WC_VAL": 
[row.get("workers_compensation", 0.0) for row in rows], + "DST_SC1": [row.get("dst_sc1", 0) for row in rows], + "DST_VAL1": [row.get("dst_val1", 0.0) for row in rows], + "DST_SC2": [row.get("dst_sc2", 0) for row in rows], + "DST_VAL2": [row.get("dst_val2", 0.0) for row in rows], + "DST_SC1_YNG": [row.get("dst_sc1_yng", 0) for row in rows], + "DST_VAL1_YNG": [row.get("dst_val1_yng", 0.0) for row in rows], + "DST_SC2_YNG": [row.get("dst_sc2_yng", 0) for row in rows], + "DST_VAL2_YNG": [row.get("dst_val2_yng", 0.0) for row in rows], + "NOW_DIR": [row.get("now_dir", 2) for row in rows], + "NOW_MRK": [row.get("now_mrk", 2) for row in rows], + "NOW_MRKS": [row.get("now_mrks", 2) for row in rows], + "NOW_MRKUN": [row.get("now_mrkun", 2) for row in rows], + "NOW_NONM": [row.get("now_nonm", 2) for row in rows], + "NOW_GRP": [row.get("now_grp", 2) for row in rows], + "NOW_MCARE": [row.get("now_mcare", 2) for row in rows], + "NOW_CAID": [row.get("now_caid", 2) for row in rows], + "NOW_MCAID": [row.get("now_mcaid", 2) for row in rows], + "NOW_PCHIP": [row.get("now_pchip", 2) for row in rows], + "NOW_OTHMT": [row.get("now_othmt", 2) for row in rows], + "NOW_MIL": [row.get("now_mil", 2) for row in rows], + "NOW_CHAMPVA": [row.get("now_champva", 2) for row in rows], + "NOW_VACARE": [row.get("now_vacare", 2) for row in rows], + "NOW_IHSFLG": [row.get("now_ihs", 2) for row in rows], + "NOW_PRIV": [row.get("now_priv", 2) for row in rows], + "NOW_PUB": [row.get("now_pub", 2) for row in rows], + "NOW_COV": [row.get("now_cov", 2) for row in rows], + } + ) + + +def test_person_variables_maps_current_health_coverage_sources(): + for leaf, census_column in CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP.items(): + assert PERSON_VARIABLES.get(census_column) == f"_{leaf}" + + +def test_process_persons_populates_health_coverage_support_fields(): + out = _process_persons( + _raw_person_frame( + [ + {"now_mrk": 1, "now_grp": 1, "now_priv": 1, "now_cov": 1}, + {"now_caid": 1, "now_mcaid": 1, 
"now_pub": 1, "now_cov": 1}, + {"now_cov": 2}, + ] + ), + 2025, + ) + + assert out["reported_has_marketplace_health_coverage_at_interview"].to_list() == [ + True, + False, + False, + ] + assert out[ + "reported_has_employer_sponsored_health_coverage_at_interview" + ].to_list() == [ + True, + False, + False, + ] + assert out["reported_has_medicaid_health_coverage_at_interview"].to_list() == [ + False, + True, + False, + ] + assert out["reported_has_multiple_health_coverage_at_interview"].to_list() == [ + True, + False, + False, + ] + assert out["has_marketplace_health_coverage_at_interview"].to_list() == [ + True, + False, + False, + ] + assert out["has_marketplace_health_coverage"].to_list() == [True, False, False] + assert out["has_esi"].to_list() == [True, False, False] + assert out["reported_is_insured_at_interview"].to_list() == [True, True, False] + assert out["reported_is_uninsured_at_interview"].to_list() == [False, False, True] + + +def test_process_persons_populates_labor_occupation_and_tipped_fields(): + out = _process_persons( + _raw_person_frame( + [ + { + "school": 2, + "hourly_code": 1, + "hourly_cents": 2150, + "union": 1, + "poccu2": 8, + "peioocc": 4000, + "weeks_unemployed": 12, + "veterans_benefits": 700.0, + "workers_compensation": 300.0, + }, + {"hourly_code": 2, "hourly_cents": -1, "poccu2": 52, "peioocc": -1}, + {"poccu2": 53, "weeks_unemployed": -1}, + ] + ), + 2025, + ) + + assert out["is_full_time_college_student"].to_list() == [True, False, False] + assert out["is_paid_hourly"].to_list() == [True, False, False] + assert out["hourly_wage"].to_list() == [21.5, 0.0, 0.0] + assert out["is_union_member_or_covered"].to_list() == [True, False, False] + assert out["detailed_occupation_recode"].to_list() == [8, 52, 53] + assert out["is_computer_scientist"].to_list() == [True, False, False] + assert out["is_military"].to_list() == [False, True, False] + assert out["has_never_worked"].to_list() == [False, False, True] + assert 
out["treasury_tipped_occupation_code"].to_list() == [105, 0, 0] + assert out["is_tipped_occupation"].to_list() == [True, False, False] + assert out["weeks_unemployed"].to_list() == [12, 0, 0] + assert out["veterans_benefits"].to_list() == [700.0, 0.0, 0.0] + assert out["workers_compensation"].to_list() == [300.0, 0.0, 0.0] + + +def test_process_persons_populates_pension_and_retirement_distribution_leaves(): + out = _process_persons( + _raw_person_frame( + [ + { + "pension": 10_000.0, + "annuity": 2_000.0, + "dst_sc1": 1, + "dst_val1": 1_500.0, + "dst_sc2": 4, + "dst_val2": 2_500.0, + }, + { + "dst_sc1": 2, + "dst_val1": 600.0, + "dst_sc2": 3, + "dst_val2": 700.0, + "dst_sc1_yng": 6, + "dst_val1_yng": 800.0, + "dst_sc2_yng": 7, + "dst_val2_yng": 900.0, + }, + ] + ), + 2025, + ) + + total_pension = 12_000.0 + assert out["pension_income"].to_list() == [total_pension, 0.0] + assert out["taxable_private_pension_income"].to_list() == pytest.approx( + [total_pension * TAXABLE_PENSION_FRACTION, 0.0] + ) + assert out["tax_exempt_private_pension_income"].to_list() == pytest.approx( + [total_pension * (1 - TAXABLE_PENSION_FRACTION), 0.0] + ) + assert out["taxable_401k_distributions"].to_list() == [1_500.0, 0.0] + assert out["regular_ira_distributions"].to_list() == [2_500.0, 0.0] + assert out["taxable_ira_distributions"].to_list() == [2_500.0, 0.0] + assert out["taxable_403b_distributions"].to_list() == [0.0, 600.0] + assert out["roth_ira_distributions"].to_list() == [0.0, 700.0] + assert out["tax_exempt_ira_distributions"].to_list() == [0.0, 700.0] + assert out["taxable_sep_distributions"].to_list() == [0.0, 800.0] + assert out["other_type_retirement_account_distributions"].to_list() == [0.0, 900.0] + + +def test_derive_cps_immigration_status_varies_from_ssn_card_type(): + status = _derive_cps_immigration_status( + ssn_card_type=np.array([1, 0, 2, 3]), + birth_country=np.array([57, 57, 57, 332]), + peinusyr=np.array([0, 29, 28, 20]), + age=np.array([40, 30, 30, 40]), + 
year=2024, + ) + + assert status.tolist() == [ + "CITIZEN", + "UNDOCUMENTED", + "TPS", + "CUBAN_HAITIAN_ENTRANT", + ] + + +def test_attach_cps_ssn_card_type_persists_identification_exports(): + persons = pl.DataFrame( + { + "household_id": [1, 2], + "year": [2025, 2025], + "age": [40, 30], + } + ) + households = pl.DataFrame({"household_id": [1, 2], "household_weight": [1.0, 1.0]}) + raw = pl.DataFrame( + { + "PRCITSHP": [1, 5], + "PEINUSYR": [0, 29], + "PENATVTY": [57, 57], + "A_HSCOL": [0, 0], + "A_AGE": [40, 30], + "A_MARITL": [0, 0], + "A_SPOUSE": [0, 0], + "MCARE": [0, 0], + "CAID": [0, 0], + "PEN_SC1": [0, 0], + "PEN_SC2": [0, 0], + "RESNSS1": [0, 0], + "RESNSS2": [0, 0], + "IHSFLG": [0, 0], + "CHAMPVA": [0, 0], + "MIL": [0, 0], + "PEIO1COW": [0, 0], + "A_MJOCC": [0, 0], + "SS_YN": [0, 0], + "SPM_ID": [1, 2], + "SPM_CAPHOUSESUB": [0.0, 0.0], + "PEAFEVER": [0, 0], + "SSI_YN": [0, 0], + "WSAL_VAL": [0.0, 0.0], + "SEMP_VAL": [0.0, 0.0], + } + ) + + out = _attach_cps_ssn_card_type( + persons=persons, + households=households, + persons_raw=raw, + ) + + assert out["ssn_card_type"].to_list() == ["CITIZEN", "NONE"] + assert out["has_valid_ssn"].to_list() == [True, False] + assert out["taxpayer_id_type"].to_list() == ["VALID_SSN", "NONE"] + assert out["immigration_status_str"].to_list() == ["CITIZEN", "UNDOCUMENTED"] diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index d3d1fcf..a66e4b9 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -1354,6 +1354,7 @@ def test_build_policyengine_entity_tables_adds_ecps_stochastic_takeup_inputs( ): scalar_calls: list[tuple[str, int]] = [] medicaid_calls: list[int] = [] + pregnancy_calls: list[int] = [] eitc_calls: list[int] = [] voluntary_calls: list[int] = [] @@ -1372,6 +1373,10 @@ def fake_load_medicaid_rates(year: int) -> dict[str, float]: medicaid_calls.append(year) return {"CA": 0.0, "TX": 1.0} + def fake_load_pregnancy_rates(year: int) -> dict[str, float]: + 
pregnancy_calls.append(year) + return {"CA": 1.0, "TX": 0.0} + def fake_load_eitc_rates(year: int) -> dict[int, float]: eitc_calls.append(year) return {0: 0.0, 1: 1.0, 2: 1.0, 3: 1.0} @@ -1398,6 +1403,11 @@ def fake_load_voluntary_rates( "_load_microplex_medicaid_takeup_rates", fake_load_medicaid_rates, ) + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_pregnancy_rates", + fake_load_pregnancy_rates, + ) monkeypatch.setattr( us_pipeline_module, "_load_microplex_eitc_takeup_rates", @@ -1443,6 +1453,7 @@ def fake_load_voluntary_rates( True, True, ] + assert persons["is_pregnant"].tolist() == [True, False, False] tax_units = tables.tax_units.sort_values("household_id").reset_index(drop=True) assert tax_units["takes_up_aca_if_eligible"].tolist() == [True, True] @@ -1462,9 +1473,70 @@ def fake_load_voluntary_rates( ("tanf", 2024), ] assert medicaid_calls == [2024] + assert pregnancy_calls == [2024] assert eitc_calls == [2024] assert voluntary_calls == [2024] + def test_attach_policyengine_pregnancy_inputs_assigns_eligible_females( + self, + monkeypatch, + ): + class FakeRng: + def random(self, size: int) -> np.ndarray: + return np.zeros(size) + + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_pregnancy_rates", + lambda year: {"CA": 0.10, "NY": 0.0}, + ) + monkeypatch.setattr( + us_pipeline_module, + "_microplex_seeded_rng", + lambda variable_name, *, salt=None: FakeRng(), + ) + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig(policyengine_dataset_year=2024) + ) + persons = pd.DataFrame( + { + "age": [20, 44, 45, 30, 20], + "sex": [2, 2, 2, 1, 2], + "state_fips": [6, 36, 6, 6, 99], + } + ) + + result = pipeline._attach_policyengine_pregnancy_inputs(persons) + + assert result["is_pregnant"].tolist() == [ + True, + False, + False, + False, + True, + ] + + def test_attach_policyengine_pregnancy_inputs_preserves_explicit_column( + self, + monkeypatch, + ): + def fail_rates(year: int) -> dict[str, float]: + raise 
AssertionError(f"unexpected pregnancy rate load: {year}") + + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_pregnancy_rates", + fail_rates, + ) + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig(policyengine_dataset_year=2024) + ) + persons = pd.DataFrame({"is_pregnant": [1, 0, True, False]}) + + result = pipeline._attach_policyengine_pregnancy_inputs(persons) + + assert result["is_pregnant"].tolist() == [True, False, True, False] + def test_build_policyengine_entity_tables_adds_wic_takeup_inputs( self, monkeypatch, @@ -1504,6 +1576,16 @@ def fake_wic_risk_rates(year: int) -> dict[str, float]: "_load_microplex_wic_nutritional_risk_rates", fake_wic_risk_rates, ) + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_pregnancy_rates", + lambda year: {}, + ) + monkeypatch.setattr( + us_pipeline_module, + "_microplex_seeded_rng", + lambda variable_name, *, salt=None: np.random.default_rng(0), + ) pipeline = USMicroplexPipeline( USMicroplexBuildConfig(policyengine_dataset_year=2024) ) @@ -1547,6 +1629,9 @@ def fail_scalar_rate(variable_name: str, year: int) -> float: def fail_medicaid_rates(year: int) -> dict[str, float]: raise AssertionError(f"unexpected Medicaid rate load: {year}") + def fail_pregnancy_rates(year: int) -> dict[str, float]: + raise AssertionError(f"unexpected pregnancy rate load: {year}") + def fail_eitc_rates(year: int) -> dict[int, float]: raise AssertionError(f"unexpected EITC rate load: {year}") @@ -1569,6 +1654,11 @@ def fail_wic_risk_rates(year: int) -> dict[str, float]: "_load_microplex_medicaid_takeup_rates", fail_medicaid_rates, ) + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_pregnancy_rates", + fail_pregnancy_rates, + ) monkeypatch.setattr( us_pipeline_module, "_load_microplex_eitc_takeup_rates", @@ -1604,6 +1694,7 @@ def fail_wic_risk_rates(year: int) -> dict[str, float]: "relationship_to_head": [0, 2], "state_fips": [6, 6], "takes_up_medicaid_if_eligible": [False, True], + "is_pregnant": 
[False, True], "takes_up_head_start_if_eligible": [False, True], "takes_up_early_head_start_if_eligible": [True, False], "takes_up_aca_if_eligible": [False, True], @@ -1621,6 +1712,7 @@ def fail_wic_risk_rates(year: int) -> dict[str, float]: persons = tables.persons.sort_values("person_id").reset_index(drop=True) assert persons["takes_up_medicaid_if_eligible"].tolist() == [False, True] + assert persons["is_pregnant"].tolist() == [False, True] assert persons["takes_up_head_start_if_eligible"].tolist() == [False, True] assert persons["takes_up_early_head_start_if_eligible"].tolist() == [ True, @@ -5252,6 +5344,64 @@ def test_augment_policyengine_person_inputs_materializes_agi_parity_inputs(self) assert augmented["self_employed_health_insurance_ald"].tolist() == [15.0] assert augmented["self_employed_pension_contribution_ald"].tolist() == [10.0] + def test_augment_policyengine_person_inputs_materializes_export_support_aliases( + self, + ): + pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + persons = pd.DataFrame( + { + "age": [45, 50], + "sex": [1, 2], + "w2_wages_from_qualified_business": [1_000.0, 0.0], + "unadjusted_basis_qualified_property": [10_000.0, 0.0], + "business_is_sstb": [1, 0], + "sstb_self_employment_income": [300.0, 0.0], + "sstb_w2_wages_from_qualified_business": [200.0, 0.0], + "sstb_unadjusted_basis_qualified_property": [2_000.0, 0.0], + "self_employment_income_would_be_qualified": [1, 0], + "sstb_self_employment_income_would_be_qualified": [1, 0], + "qualified_reit_and_ptp_income": [75.0, 0.0], + "qualified_bdc_income": [25.0, 0.0], + "deductible_mortgage_interest": [900.0, 0.0], + "investment_income_elected_form_4952": [40.0, 0.0], + "health_insurance_premiums_without_medicare_part_b": [120.0, 0.0], + } + ) + + augmented = pipeline._augment_policyengine_person_inputs(persons) + + assert augmented["w2_wages_from_qualified_business"].tolist() == [1_000.0, 0.0] + assert augmented["unadjusted_basis_qualified_property"].tolist() == [ + 
10_000.0, + 0.0, + ] + assert augmented["business_is_sstb"].tolist() == [True, False] + assert augmented["sstb_self_employment_income_before_lsr"].tolist() == [ + 300.0, + 0.0, + ] + assert augmented["sstb_w2_wages_from_qualified_business"].tolist() == [ + 200.0, + 0.0, + ] + assert augmented["sstb_unadjusted_basis_qualified_property"].tolist() == [ + 2_000.0, + 0.0, + ] + assert augmented["self_employment_income_would_be_qualified"].tolist() == [ + True, + False, + ] + assert augmented["sstb_self_employment_income_would_be_qualified"].tolist() == [ + True, + False, + ] + assert augmented["qualified_reit_and_ptp_income"].tolist() == [75.0, 0.0] + assert augmented["qualified_bdc_income"].tolist() == [25.0, 0.0] + assert augmented["home_mortgage_interest"].tolist() == [900.0, 0.0] + assert augmented["investment_interest_expense"].tolist() == [40.0, 0.0] + assert augmented["other_health_insurance_premiums"].tolist() == [120.0, 0.0] + def test_augment_policyengine_person_inputs_coalesces_sparse_source_aliases_by_row( self, ): @@ -5298,6 +5448,44 @@ def test_augment_policyengine_person_inputs_coalesces_sparse_source_aliases_by_r -10.0, ] + def test_attach_policyengine_tax_unit_source_inputs_derives_mortgage_structure( + self, + ): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig(policyengine_dataset_year=2024) + ) + tax_units = pd.DataFrame( + { + "tax_unit_id": [1, 2], + "deductible_mortgage_interest": [600.0, 0.0], + "interest_deduction": [700.0, 0.0], + "scf_mortgage_debt": [8_000.0, 0.0], + } + ) + + augmented = pipeline._attach_policyengine_tax_unit_source_inputs(tax_units) + + assert augmented["first_home_mortgage_interest"].tolist() == [600.0, 0.0] + assert augmented["interest_deduction"].tolist() == [700.0, 0.0] + assert augmented["first_home_mortgage_balance"].tolist() == [10_000.0, 0.0] + assert augmented["first_home_mortgage_origination_year"].tolist() == [2014, 0] + + def test_build_policyengine_households_preserves_vehicle_exports(self): + 
pipeline = USMicroplexPipeline(USMicroplexBuildConfig()) + persons = pd.DataFrame( + { + "household_id": [10, 10, 20], + "weight": [1.0, 1.0, 2.0], + "household_vehicles_owned": [2.0, 2.0, 1.0], + "household_vehicles_value": [12_000.0, 12_000.0, 6_000.0], + } + ) + + households = pipeline._build_policyengine_households(persons) + + assert households["household_vehicles_owned"].tolist() == [2.0, 1.0] + assert households["household_vehicles_value"].tolist() == [12_000.0, 6_000.0] + def test_augment_policyengine_person_inputs_derives_marital_status_flags_from_cps_codes( self, ): diff --git a/tests/test_cps_source_provider.py b/tests/test_cps_source_provider.py index 541279f..e2b2970 100644 --- a/tests/test_cps_source_provider.py +++ b/tests/test_cps_source_provider.py @@ -479,7 +479,9 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path): dataset = load_cps_asec(year=2023, cache_dir=tmp_path, download=False) persons = ( - dataset.persons.to_pandas().sort_values("person_number").reset_index(drop=True) + dataset.persons.to_pandas() + .sort_values(["household_id", "person_number"]) + .reset_index(drop=True) ) assert persons["alimony_income"].tolist() == [1200, 0] @@ -507,6 +509,61 @@ def test_load_cps_asec_derives_policyengine_value_inputs(tmp_path): assert persons["medicare_part_b_premiums"].tolist() == [600, 0] +def test_load_cps_asec_attaches_previous_year_income_from_prior_asec(tmp_path): + current_person_rows = pd.DataFrame( + { + "PERIDNUM": ["A", "B", "C", "D"], + "PH_SEQ": [1, 1, 2, 2], + "A_LINENO": [1, 2, 1, 2], + "A_AGE": [34, 31, 45, 17], + "A_FNLWGT": [100, 100, 200, 200], + "WSAL_VAL": [60_000, 10_000, 20_000, 0], + "SEMP_VAL": [5_000, 0, 3_000, 0], + "I_ERNVAL": [0, 1, 0, 0], + "I_SEVAL": [0, 0, 0, 0], + } + ) + previous_person_rows = pd.DataFrame( + { + "PERIDNUM": ["A", "B", "C"], + "WSAL_VAL": [50_000, 8_000, 25_000], + "SEMP_VAL": [6_000, 1_000, 2_500], + "I_ERNVAL": [0, 0, 0], + "I_SEVAL": [0, 0, 1], + } + ) + with 
zipfile.ZipFile(tmp_path / "cps_asec_2023.zip", "w") as archive: + archive.writestr("pppub23.csv", current_person_rows.to_csv(index=False)) + with zipfile.ZipFile(tmp_path / "cps_asec_2022.zip", "w") as archive: + archive.writestr("pppub22.csv", previous_person_rows.to_csv(index=False)) + + dataset = load_cps_asec(year=2023, cache_dir=tmp_path, download=False) + persons = ( + dataset.persons.to_pandas() + .sort_values(["household_id", "person_number"]) + .reset_index(drop=True) + ) + + assert persons["employment_income_last_year"].tolist() == [ + 50_000.0, + 8_000.0, + -1.0, + -1.0, + ] + assert persons["self_employment_income_last_year"].tolist() == [ + 6_000.0, + 1_000.0, + -1.0, + -1.0, + ] + assert persons["previous_year_income_available"].tolist() == [ + True, + False, + False, + False, + ] + + def test_load_cps_asec_derives_survivor_and_dependent_social_security(tmp_path): person_rows = pd.DataFrame( { diff --git a/tests/test_donor_survey_source_providers.py b/tests/test_donor_survey_source_providers.py index 218121e..b0bfffd 100644 --- a/tests/test_donor_survey_source_providers.py +++ b/tests/test_donor_survey_source_providers.py @@ -101,6 +101,8 @@ def _sipp_assets_tables(**_kwargs) -> DonorSurveyTables: "bank_account_assets": [2_500.0, 10_000.0], "stock_assets": [0.0, 4_000.0], "bond_assets": [0.0, 1_500.0], + "household_vehicles_owned": [2.0, 1.0], + "household_vehicles_value": [12_000.0, 6_000.0], "weight": [80.0, 90.0], "year": [2023, 2023], } @@ -694,6 +696,8 @@ def test_sipp_assets_provider_uses_manifest_backed_raw_loader( "TVAL_BANK": [1.0, 2.0, 3.0], "TVAL_STMF": [4.0, 5.0, 6.0], "TVAL_BOND": [7.0, 8.0, 9.0], + "TVEH_NUM": [0.0, 2.0, 1.0], + "THVAL_VEH": [0.0, 12_000.0, 6_000.0], } ).to_csv(path, index=False, sep="|") @@ -712,3 +716,5 @@ def test_sipp_assets_provider_uses_manifest_backed_raw_loader( assert persons["is_female"].tolist() == [1.0, 1.0] assert persons["is_married"].tolist() == [1.0, 0.0] assert persons["count_under_18"].tolist() == 
[0.0, 0.0] + assert persons["household_vehicles_owned"].tolist() == [2.0, 1.0] + assert persons["household_vehicles_value"].tolist() == [12_000.0, 6_000.0] diff --git a/tests/test_pe_source_impute_specs.py b/tests/test_pe_source_impute_specs.py index 17e6b57..8157ad1 100644 --- a/tests/test_pe_source_impute_specs.py +++ b/tests/test_pe_source_impute_specs.py @@ -25,6 +25,8 @@ def test_load_pe_source_impute_block_specs_reads_manifest() -> None: "bank_account_assets", "stock_assets", "bond_assets", + "household_vehicles_owned", + "household_vehicles_value", ) assert specs["sipp_tips"].raw_loader is not None assert specs["sipp_tips"].raw_loader.filename == "pu2023_slim.csv" @@ -83,7 +85,9 @@ def test_resolve_sipp_source_impute_block_spec_and_named_lookup() -> None: assert tips.matches_source_name("sipp_2023") is False -def test_prepare_pe_source_impute_condition_frame_derives_manifest_backed_predictors() -> None: +def test_prepare_pe_source_impute_condition_frame_derives_manifest_backed_predictors() -> ( + None +): spec = get_pe_source_impute_block_spec("acs") frame = pd.DataFrame( { diff --git a/tests/test_puf_source_provider.py b/tests/test_puf_source_provider.py index 8717bdf..70eaed0 100644 --- a/tests/test_puf_source_provider.py +++ b/tests/test_puf_source_provider.py @@ -5,6 +5,7 @@ import sys import types +import numpy as np import pandas as pd import pytest from microplex.core import EntityType, SourceArchetype, SourceProvider, SourceQuery @@ -371,6 +372,32 @@ def test_expand_to_persons_derives_retirement_social_security_for_older_records( assert persons["social_security_retirement"].tolist() == [40.0, 0.0] +def test_expand_to_persons_preserves_qbi_boolean_flags_for_joint_units(): + tax_units = pd.DataFrame( + { + "filing_status": ["JOINT"], + "business_is_sstb": [True], + "self_employment_income_would_be_qualified": [True], + "sstb_self_employment_income_would_be_qualified": [True], + "weight": [1.0], + "household_id": ["joint-household"], + "year": [2024], + 
} + ) + + persons = expand_to_persons(tax_units) + + assert persons["business_is_sstb"].tolist() == [True, True] + assert persons["self_employment_income_would_be_qualified"].tolist() == [ + True, + True, + ] + assert persons["sstb_self_employment_income_would_be_qualified"].tolist() == [ + True, + True, + ] + + def test_impute_puf_social_security_components_uses_grouped_cps_shares(): persons = pd.DataFrame( { @@ -983,6 +1010,56 @@ def test_map_puf_variables_uses_pe_puf_business_and_farm_income_formulas(): assert mapped.loc[0, "farm_rent_income"] == 125.0 +def test_map_puf_variables_adds_qbi_export_support_columns(): + n = 800 + raw = pd.DataFrame( + { + "RECID": range(1, n + 1), + "MARS": np.where(np.arange(n) % 3 == 0, 2, 1), + "XTOT": np.where(np.arange(n) % 3 == 0, 2, 1), + "S006": np.full(n, 100.0), + "E00900": np.linspace(1_000.0, 200_000.0, n), + "E02100": np.linspace(0.0, 40_000.0, n), + "E26270": np.linspace(0.0, 60_000.0, n), + "E26390": np.linspace(0.0, 8_000.0, n), + "E26400": np.zeros(n), + "E25850": np.linspace(0.0, 30_000.0, n), + "E25860": np.zeros(n), + } + ) + + mapped = map_puf_variables(raw) + + expected_columns = { + "business_is_sstb", + "w2_wages_from_qualified_business", + "unadjusted_basis_qualified_property", + "sstb_self_employment_income_before_lsr", + "sstb_self_employment_income_would_be_qualified", + "sstb_w2_wages_from_qualified_business", + "sstb_unadjusted_basis_qualified_property", + "qualified_reit_and_ptp_income", + "qualified_bdc_income", + "self_employment_income_would_be_qualified", + "farm_operations_income_would_be_qualified", + "farm_rent_income_would_be_qualified", + "rental_income_would_be_qualified", + "estate_income_would_be_qualified", + "partnership_s_corp_income_would_be_qualified", + } + assert expected_columns <= set(mapped.columns) + assert mapped["w2_wages_from_qualified_business"].sum() > 0.0 + assert mapped["unadjusted_basis_qualified_property"].sum() > 0.0 + assert 
mapped["qualified_reit_and_ptp_income"].sum() > 0.0 + assert mapped["business_is_sstb"].any() + assert mapped["self_employment_income_would_be_qualified"].nunique() == 2 + np.testing.assert_allclose( + mapped["self_employment_income"] + + mapped["sstb_self_employment_income_before_lsr"], + raw["E00900"], + ) + + def test_map_puf_variables_adds_pe_exact_irs_inputs(): raw = pd.DataFrame( {