From 05d582fa38b90cca0629da341f413ef95c5cd568 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 1 Jun 2026 20:39:04 -0400 Subject: [PATCH] Remove policyengine-us-data runtime imports --- src/microplex_us/data_sources/puf.py | 205 ++++------- src/microplex_us/pipelines/pe_l0.py | 169 ++------- src/microplex_us/pipelines/us.py | 344 +++++++++++++----- tests/pipelines/test_pe_l0.py | 64 ++-- tests/pipelines/test_us.py | 120 +++++- ...no_policyengine_us_data_runtime_imports.py | 25 ++ tests/test_puf_source_provider.py | 63 ++-- 7 files changed, 543 insertions(+), 447 deletions(-) create mode 100644 tests/test_no_policyengine_us_data_runtime_imports.py diff --git a/src/microplex_us/data_sources/puf.py b/src/microplex_us/data_sources/puf.py index 5747991..1020636 100644 --- a/src/microplex_us/data_sources/puf.py +++ b/src/microplex_us/data_sources/puf.py @@ -6,16 +6,11 @@ from __future__ import annotations -import pickle import re -import subprocess -import sys -import tempfile from collections.abc import Callable from dataclasses import dataclass, field from functools import cache, lru_cache from pathlib import Path -from textwrap import dedent from typing import Any import numpy as np @@ -40,8 +35,6 @@ predict_grouped_component_shares, ) from microplex_us.pipelines.pe_native_scores import ( - build_policyengine_us_data_subprocess_env, - resolve_policyengine_us_data_python, resolve_policyengine_us_data_repo_root, ) from microplex_us.source_manifests import load_us_source_manifest @@ -336,45 +329,6 @@ class PEStyleQRFImputationModel: fitted_model: Any -@dataclass(frozen=True) -class PEStyleSubprocessImputationPredictor: - """Run PE-style QRF imputation in the PE-US-data environment.""" - - policyengine_us_data_repo: str | Path - policyengine_us_data_python: str | Path | None = None - - def predict(self, X_test: pd.DataFrame) -> pd.DataFrame: - resolved_repo = resolve_policyengine_us_data_repo_root( - self.policyengine_us_data_repo - ) - resolved_python = resolve_policyengine_us_data_python( - self.policyengine_us_data_python, - repo_root=resolved_repo, - ) - env = build_policyengine_us_data_subprocess_env(resolved_repo) - with tempfile.TemporaryDirectory(prefix="microplex-us-puf-pretax-") as tempdir: - predictors_path = Path(tempdir) / "predictors.pkl" - predictions_path = Path(tempdir) / "predictions.pkl" - with predictors_path.open("wb") as handle: - pickle.dump(pd.DataFrame(X_test), handle) - subprocess.run( - [ - str(resolved_python), - "-c", - _build_pe_style_puf_pre_tax_subprocess_script(), - str(resolved_repo), - str(predictors_path), - str(predictions_path), - ], - check=True, - cwd=resolved_repo, - env=env, - ) - with predictions_path.open("rb") as handle: - predictions = pickle.load(handle) - return pd.DataFrame(predictions) - - PUF_DEMOGRAPHIC_VARIABLES = ( "AGEDP1", "AGEDP2", @@ -1294,7 +1248,6 @@ def map_puf_variables( FileNotFoundError, ImportError, ValueError, - subprocess.CalledProcessError, ): if require_pre_tax_contribution_model: raise @@ -1685,78 +1638,6 @@ def _default_pe_style_puf_social_security_share_model( ) -def _ensure_policyengine_us_data_repo_on_sys_path( - policyengine_us_data_repo: str | Path | None, -) -> None: - if policyengine_us_data_repo is None: - return - repo_root = Path(policyengine_us_data_repo).expanduser().resolve() - if not repo_root.exists(): - raise ValueError(f"PolicyEngine US-data repo does not exist: {repo_root}") - repo_root_str = str(repo_root) - if repo_root_str not in sys.path: - sys.path.insert(0, repo_root_str) - - -def _build_pe_style_puf_pre_tax_subprocess_script() -> str: - return dedent( - """ -import pickle -import sys - -import pandas as pd - -repo_root = sys.argv[1] -predictors_path = sys.argv[2] -predictions_path = sys.argv[3] - -if repo_root not in sys.path: - sys.path.insert(0, repo_root) - -from microimpute.models.qrf import QRF -from policyengine_us import Microsimulation -from policyengine_us_data.datasets.cps import CPS_2021 - -with open(predictors_path, "rb") as handle: - X_test = pickle.load(handle) -X_test = pd.DataFrame(X_test) - -predictors = ["employment_income", "age", "is_male"] -cps = Microsimulation(dataset=CPS_2021) -cps.subsample(10_000) -cps_df = cps.calculate_dataframe( - [*predictors, "household_weight", "pre_tax_contributions"] -) -train = cps_df.loc[:, [*predictors, "pre_tax_contributions"]].copy() -train = train.apply(lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0)) -X_test = X_test.loc[:, predictors].copy() -X_test = X_test.apply(lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0)) - -qrf = QRF(log_level="WARNING", memory_efficient=True) -fitted_model = qrf.fit( - X_train=train, - predictors=predictors, - imputed_variables=["pre_tax_contributions"], - n_jobs=1, -) -predictions = fitted_model.predict(X_test=X_test) - -with open(predictions_path, "wb") as handle: - pickle.dump( - pd.DataFrame( - { - "pre_tax_contributions": pd.to_numeric( - predictions["pre_tax_contributions"], - errors="coerce", - ).fillna(0.0) - } - ), - handle, - ) -""" - ).strip() - - def _load_pe_extended_cps_pre_tax_training_frame( *, policyengine_us_data_repo: str | Path, @@ -1819,6 +1700,69 @@ def _load_pe_extended_cps_pre_tax_training_frame( return train +def _load_microplex_cps_pre_tax_training_frame( + *, + training_year: int, +) -> pd.DataFrame: + cps = load_cps_asec(year=int(training_year)) + persons = cps.persons.to_pandas() + index = persons.index + employment_income = pd.to_numeric( + persons.get("employment_income", persons.get("wage_income", 0.0)), + errors="coerce", + ).fillna(0.0) + age = pd.to_numeric(persons.get("age", 0.0), errors="coerce").fillna(0.0) + if "is_male" in persons.columns: + is_male = pd.to_numeric(persons["is_male"], errors="coerce").fillna(0.0) + elif "sex" in persons.columns: + is_male = ( + pd.to_numeric(persons["sex"], errors="coerce") + .fillna(0) + .astype(int) + .eq(1) + .astype(float) + ) + else: + is_male = pd.Series(0.0, index=index) + + if "pre_tax_contributions" in persons.columns: + pre_tax_contributions = pd.to_numeric( + persons["pre_tax_contributions"], + errors="coerce", + ).fillna(0.0) + else: + pre_tax_contributions = pd.Series(0.0, index=index) + for column in ( + "traditional_401k_contributions", + "traditional_401k_contributions_desired", + "traditional_403b_contributions", + "traditional_403b_contributions_desired", + "pre_tax_health_insurance_premiums", + "health_savings_account_payroll_contributions", + ): + if column in persons.columns: + pre_tax_contributions = pre_tax_contributions.add( + pd.to_numeric(persons[column], errors="coerce").fillna(0.0), + fill_value=0.0, + ) + + train = pd.DataFrame( + { + "employment_income": employment_income, + "age": age, + "is_male": is_male, + "pre_tax_contributions": pre_tax_contributions, + }, + index=index, + ) + train = train.apply( + lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0) + ) + if len(train) > 10_000: + train = train.sample(n=10_000, random_state=0) + return train + + @lru_cache(maxsize=4) def _default_pe_style_puf_pre_tax_contribution_model( *, @@ -1834,13 +1778,8 @@ def _default_pe_style_puf_pre_tax_contribution_model( training_year=pre_tax_training_year, ) except (FileNotFoundError, KeyError, OSError, ValueError): - return PEStyleQRFImputationModel( - predictors=predictors, - imputed_variable="pre_tax_contributions", - fitted_model=PEStyleSubprocessImputationPredictor( - policyengine_us_data_repo=policyengine_us_data_repo, - policyengine_us_data_python=policyengine_us_data_python, - ), + train = _load_microplex_cps_pre_tax_training_frame( + training_year=pre_tax_training_year, ) from microimpute.models.qrf import QRF @@ -1862,19 +1801,9 @@ def _default_pe_style_puf_pre_tax_contribution_model( ) from microimpute.models.qrf import QRF - from policyengine_us import Microsimulation - _ensure_policyengine_us_data_repo_on_sys_path(policyengine_us_data_repo) - from policyengine_us_data.datasets.cps import CPS_2021 - - cps = Microsimulation(dataset=CPS_2021) - cps.subsample(10_000) - cps_df = cps.calculate_dataframe( - [*predictors, "household_weight", "pre_tax_contributions"] - ) - train = cps_df.loc[:, [*predictors, "pre_tax_contributions"]].copy() - train = train.apply( - lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0) + train = _load_microplex_cps_pre_tax_training_frame( + training_year=pre_tax_training_year ) qrf = QRF(log_level="WARNING", memory_efficient=True) diff --git a/src/microplex_us/pipelines/pe_l0.py b/src/microplex_us/pipelines/pe_l0.py index ef830d3..d24ba7b 100644 --- a/src/microplex_us/pipelines/pe_l0.py +++ b/src/microplex_us/pipelines/pe_l0.py @@ -2,10 +2,8 @@ from __future__ import annotations -import json -import os -import subprocess -import tempfile +from collections.abc import Callable +from os import PathLike from typing import Any, Self import numpy as np @@ -17,54 +15,9 @@ ) from scipy import sparse as sp -_PE_L0_SUBPROCESS_SCRIPT = """ -import json -import sys - -import numpy as np -from scipy import sparse as sp - -REPO_ROOT = sys.argv[1] -if REPO_ROOT not in sys.path: - sys.path.insert(0, REPO_ROOT) - -from policyengine_us_data.calibration.unified_calibration import fit_l0_weights - -X_sparse = sp.load_npz(sys.argv[2]) -targets = np.load(sys.argv[3]) -initial_weights = np.load(sys.argv[4]) -with open(sys.argv[5]) as handle: - target_names = json.load(handle) -output_path = sys.argv[6] -lambda_l0 = float(sys.argv[7]) -epochs = int(sys.argv[8]) -device = sys.argv[9] -verbose_freq = None if sys.argv[10] == "none" else int(sys.argv[10]) -beta = float(sys.argv[11]) -lambda_l2 = float(sys.argv[12]) -learning_rate = float(sys.argv[13]) -achievable = np.asarray(X_sparse.sum(axis=1)).reshape(-1) > 0 - -weights = fit_l0_weights( - X_sparse=X_sparse, - targets=targets, - lambda_l0=lambda_l0, - epochs=epochs, - device=device, - verbose_freq=verbose_freq, - beta=beta, - lambda_l2=lambda_l2, - learning_rate=learning_rate, - target_names=target_names, - initial_weights=initial_weights, - achievable=achievable, -) -np.save(output_path, np.asarray(weights, dtype=float)) -""".strip() - class PolicyEngineL0Calibrator: - """Wrap PolicyEngine US-data's L0 optimizer behind the Microplex interface.""" + """Legacy L0 adapter for explicit experiments behind the Microplex interface.""" def __init__( self, @@ -77,8 +30,9 @@ def __init__( tol: float = 1e-6, device: str = "cpu", verbose_freq: int | None = None, - policyengine_us_data_repo_root: str | os.PathLike[str] | None = None, - policyengine_us_data_python: str | os.PathLike[str] | None = None, + policyengine_us_data_repo_root: str | PathLike[str] | None = None, + policyengine_us_data_python: str | PathLike[str] | None = None, + fit_l0_weights_fn: Callable[..., np.ndarray] | None = None, ) -> None: self.lambda_l0 = float(lambda_l0) self.lambda_l2 = float(lambda_l2) @@ -90,6 +44,7 @@ def __init__( self.verbose_freq = verbose_freq self.policyengine_us_data_repo_root = policyengine_us_data_repo_root self.policyengine_us_data_python = policyengine_us_data_python + self.fit_l0_weights_fn = fit_l0_weights_fn self.weights_: np.ndarray | None = None self.is_fitted_: bool = False @@ -111,7 +66,9 @@ def fit( marginal_targets: dict[str, dict[str, float]], continuous_targets: dict[str, float] | None = None, weight_col: str = "weight", - linear_constraints: tuple[LinearConstraint, ...] | list[LinearConstraint] | None = None, + linear_constraints: tuple[LinearConstraint, ...] + | list[LinearConstraint] + | None = None, ) -> Self: self.n_records_ = len(data) self.marginal_targets_ = marginal_targets @@ -192,13 +149,9 @@ def _fit_weights( initial_weights=initial_weights, ) self.effective_backend_ = "policyengine_l0" - try: - from policyengine_us_data.calibration.unified_calibration import ( - fit_l0_weights, - ) - + if self.fit_l0_weights_fn is not None: achievable = np.asarray(X_sparse.sum(axis=1)).reshape(-1) > 0 - return fit_l0_weights( + return self.fit_l0_weights_fn( X_sparse=X_sparse, targets=targets, lambda_l0=self.lambda_l0, @@ -212,13 +165,12 @@ def _fit_weights( initial_weights=initial_weights, achievable=achievable, ) - except ImportError: - return self._fit_weights_via_policyengine_python( - X_sparse=X_sparse, - targets=targets, - initial_weights=initial_weights, - target_names=target_names, - ) + raise RuntimeError( + "The pe_l0 backend is legacy/experimental and no longer loads " + "policyengine-us-data implicitly. Pass an explicit fit_l0_weights_fn " + "for an experiment, or use the production entropy/dense calibration " + "path for MP eCPS replacement builds." + ) def _fit_dense_no_l0_weights( self, @@ -300,87 +252,6 @@ def gradient(candidate: np.ndarray) -> np.ndarray: self.n_iterations_ = completed_iter return weights - def _fit_weights_via_policyengine_python( - self, - *, - X_sparse, - targets: np.ndarray, - initial_weights: np.ndarray, - target_names: list[str], - ) -> np.ndarray: - from microplex_us.pipelines.pe_native_scores import ( - build_policyengine_us_data_pythonpath, - resolve_policyengine_us_data_python, - resolve_policyengine_us_data_repo_root, - ) - - repo_root = resolve_policyengine_us_data_repo_root( - self.policyengine_us_data_repo_root - ) - python_path = resolve_policyengine_us_data_python( - self.policyengine_us_data_python, - repo_root=repo_root, - ) - env = { - key: value - for key, value in os.environ.items() - if key in {"HOME", "PATH", "TMPDIR", "LANG", "LC_ALL", "TZ"} - } - env["PYTHONPATH"] = build_policyengine_us_data_pythonpath( - repo_root, - existing_pythonpath=os.environ.get("PYTHONPATH"), - ) - - with tempfile.TemporaryDirectory(prefix="microplex-pe-l0-") as tmpdir: - matrix_path = os.path.join(tmpdir, "constraints.npz") - targets_path = os.path.join(tmpdir, "targets.npy") - initial_path = os.path.join(tmpdir, "initial_weights.npy") - names_path = os.path.join(tmpdir, "target_names.json") - output_path = os.path.join(tmpdir, "weights.npy") - sp.save_npz(matrix_path, X_sparse) - np.save(targets_path, targets) - np.save(initial_path, initial_weights) - with open(names_path, "w") as handle: - json.dump(target_names, handle) - - verbose_freq = "none" if self.verbose_freq is None else str(self.verbose_freq) - try: - completed = subprocess.run( - [ - str(python_path), - "-c", - _PE_L0_SUBPROCESS_SCRIPT, - str(repo_root), - matrix_path, - targets_path, - initial_path, - names_path, - output_path, - str(self.lambda_l0), - str(self.epochs), - self.device, - verbose_freq, - str(self.beta), - str(self.lambda_l2), - str(self.learning_rate), - ], - check=True, - capture_output=True, - text=True, - env=env, - ) - except subprocess.CalledProcessError as exc: - stderr = (exc.stderr or "").strip() - stdout = (exc.stdout or "").strip() - detail = stderr or stdout or str(exc) - raise RuntimeError( - "PolicyEngine L0 subprocess calibration failed. " - f"Detail: {detail}" - ) from exc - if completed.stderr: - print(completed.stderr, end="") - return np.load(output_path) - def transform( self, data: pd.DataFrame, @@ -402,7 +273,9 @@ def fit_transform( marginal_targets: dict[str, dict[str, float]], continuous_targets: dict[str, float] | None = None, weight_col: str = "weight", - linear_constraints: tuple[LinearConstraint, ...] | list[LinearConstraint] | None = None, + linear_constraints: tuple[LinearConstraint, ...] + | list[LinearConstraint] + | None = None, ) -> pd.DataFrame: self.fit( data, diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 2336778..9219ab0 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -201,6 +201,59 @@ DEFAULT_EITC_TAKEUP_RATES_BY_CHILDREN = {0: 0.65, 1: 0.86, 2: 0.85, 3: 0.85} DEFAULT_HEAD_START_TAKEUP_RATE = 0.30 DEFAULT_MEDICAID_TAKEUP_RATE = 0.93 +DEFAULT_MEDICAID_TAKEUP_RATES_BY_STATE = { + "AK": 0.88, + "AL": 0.92, + "AR": 0.79, + "AZ": 0.95, + "CA": 0.78, + "CO": 0.99, + "CT": 0.89, + "DC": 0.99, + "DE": 0.86, + "FL": 0.98, + "GA": 0.73, + "HI": 0.88, + "IA": 0.84, + "ID": 0.78, + "IL": 0.85, + "IN": 0.99, + "KS": 0.92, + "KY": 0.87, + "LA": 0.79, + "MA": 0.94, + "MD": 0.95, + "ME": 0.92, + "MI": 0.91, + "MN": 0.89, + "MO": 0.89, + "MS": 0.75, + "MT": 0.83, + "NC": 0.94, + "ND": 0.91, + "NE": 0.79, + "NH": 0.84, + "NJ": 0.74, + "NM": 0.84, + "NV": 0.93, + "NY": 0.86, + "OH": 0.82, + "OK": 0.77, + "OR": 0.92, + "PA": 0.64, + "RI": 0.94, + "SC": 0.93, + "SD": 0.88, + "TN": 0.92, + "TX": 0.76, + "UT": 0.53, + "VA": 0.82, + "VT": 0.93, + "WA": 0.98, + "WI": 0.91, + "WV": 0.83, + "WY": 0.70, +} DEFAULT_SNAP_TAKEUP_RATE = 0.82 DEFAULT_TANF_TAKEUP_RATE = 0.22 DEFAULT_VOLUNTARY_FILING_RATE = 0.05 @@ -218,13 +271,35 @@ "high": {"under_65": 0.025, "age_65_plus": 0.0037}, }, } +WIC_TAKEUP_CATEGORY_PREGNANT = "PREGNANT" +WIC_TAKEUP_CATEGORY_POSTPARTUM = "POSTPARTUM" +WIC_TAKEUP_CATEGORY_BREASTFEEDING = "BREASTFEEDING" +WIC_TAKEUP_CATEGORY_INFANT = "INFANT" +WIC_TAKEUP_CATEGORY_CHILD = "CHILD" +WIC_TAKEUP_CATEGORY_NONE = "NONE" +DEFAULT_WIC_TAKEUP_RATES = { + WIC_TAKEUP_CATEGORY_PREGNANT: 0.456, + WIC_TAKEUP_CATEGORY_POSTPARTUM: 0.689, + WIC_TAKEUP_CATEGORY_BREASTFEEDING: 0.663, + WIC_TAKEUP_CATEGORY_INFANT: 0.784, + WIC_TAKEUP_CATEGORY_CHILD: 0.460, + WIC_TAKEUP_CATEGORY_NONE: 0.0, +} +DEFAULT_WIC_NUTRITIONAL_RISK_RATES = { + WIC_TAKEUP_CATEGORY_PREGNANT: 0.913, + WIC_TAKEUP_CATEGORY_POSTPARTUM: 0.933, + WIC_TAKEUP_CATEGORY_BREASTFEEDING: 0.889, + WIC_TAKEUP_CATEGORY_INFANT: 0.950, + WIC_TAKEUP_CATEGORY_CHILD: 0.752, + WIC_TAKEUP_CATEGORY_NONE: 0.0, +} EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN = "_mp_eitc_child_count_for_takeup" VOLUNTARY_FILING_AGE_HEAD_HELPER_COLUMN = "_mp_voluntary_filing_age_head" VOLUNTARY_FILING_WAGE_INCOME_HELPER_COLUMN = "_mp_voluntary_filing_wage_income" def _stable_string_hash(value: str) -> np.uint64: - """Deterministic string hash matching policyengine-us-data's RNG helper.""" + """Deterministic string hash for reproducible MP stochastic inputs.""" with warnings.catch_warnings(): warnings.filterwarnings("ignore", "overflow encountered", RuntimeWarning) hashed = np.uint64(0) @@ -236,7 +311,7 @@ def _stable_string_hash(value: str) -> np.uint64: return hashed -def _policyengine_us_data_seeded_rng( +def _microplex_seeded_rng( variable_name: str, *, salt: str | None = None, @@ -246,75 +321,54 @@ def _policyengine_us_data_seeded_rng( return np.random.default_rng(seed=seed) -def _load_policyengine_us_data_takeup_rate(variable_name: str, year: int) -> float: - """Load an eCPS take-up rate, with scalar fallbacks for non-PE-data envs.""" - try: - from policyengine_us_data.parameters import load_take_up_rate - except ImportError: - if variable_name == "aca": - return DEFAULT_ACA_TAKEUP_RATE - if variable_name == "dc_ptc": - return DEFAULT_DC_PTC_TAKEUP_RATE - if variable_name == "early_head_start": - return DEFAULT_EARLY_HEAD_START_TAKEUP_RATE - if variable_name == "head_start": - return DEFAULT_HEAD_START_TAKEUP_RATE - if variable_name == "snap": - return DEFAULT_SNAP_TAKEUP_RATE - if variable_name == "tanf": - return DEFAULT_TANF_TAKEUP_RATE - raise - rate = load_take_up_rate(variable_name, year) - if isinstance(rate, dict): - raise TypeError(f"Expected scalar take-up rate for {variable_name!r}, got dict") - return float(rate) - - -def _load_policyengine_us_data_medicaid_takeup_rates(year: int) -> dict[str, float]: - """Load eCPS Medicaid take-up rates by state abbreviation.""" - try: - from policyengine_us_data.parameters import load_take_up_rate - except ImportError: - return { - state_abbr: DEFAULT_MEDICAID_TAKEUP_RATE - for state_abbr in STATE_FIPS.values() - } - rates = load_take_up_rate("medicaid", year) - if not isinstance(rates, dict): - raise TypeError(f"Expected dict take-up rate for 'medicaid', got {type(rates)}") - return {str(state): float(rate) for state, rate in rates.items()} +def _load_microplex_takeup_rate(variable_name: str, year: int) -> float: + """Load MP-owned scalar take-up assumptions for PE dataset inputs.""" + if variable_name == "aca": + return DEFAULT_ACA_TAKEUP_RATE + if variable_name == "dc_ptc": + return DEFAULT_DC_PTC_TAKEUP_RATE + if variable_name == "early_head_start": + return DEFAULT_EARLY_HEAD_START_TAKEUP_RATE + if variable_name == "head_start": + return 0.40 if year <= 2020 else DEFAULT_HEAD_START_TAKEUP_RATE + if variable_name == "snap": + return DEFAULT_SNAP_TAKEUP_RATE + if variable_name == "tanf": + return DEFAULT_TANF_TAKEUP_RATE + raise KeyError(f"Unknown Microplex take-up rate: {variable_name!r}") -def _load_policyengine_us_data_eitc_takeup_rates(year: int) -> dict[int, float]: - """Load eCPS EITC take-up rates by capped qualifying-child count.""" - try: - from policyengine_us_data.parameters import load_take_up_rate - except ImportError: - return dict(DEFAULT_EITC_TAKEUP_RATES_BY_CHILDREN) - rates = load_take_up_rate("eitc", year) - if not isinstance(rates, dict): - raise TypeError(f"Expected dict take-up rate for 'eitc', got {type(rates)}") - return {int(children): float(rate) for children, rate in rates.items()} +def _load_microplex_medicaid_takeup_rates(year: int) -> dict[str, float]: + """Load MP-owned Medicaid take-up rates by state abbreviation.""" + _ = year + return dict(DEFAULT_MEDICAID_TAKEUP_RATES_BY_STATE) -def _load_policyengine_us_data_voluntary_filing_rates(year: int) -> dict: - """Load current eCPS voluntary-filing rate table.""" - try: - from policyengine_us_data.parameters import load_take_up_rate - except ImportError: - return DEFAULT_VOLUNTARY_FILING_RATES - rates = load_take_up_rate("voluntary_filing", year) - if not isinstance(rates, dict): - # Older PE-US-data used a scalar voluntary filing rate. - scalar_rate = float(rates) - return { - children: { - wage: {age: scalar_rate for age in ("under_65", "age_65_plus")} - for wage in ("zero", "low", "medium", "high") - } - for children in ("no_children", "with_children") - } - return rates +def _load_microplex_eitc_takeup_rates(year: int) -> dict[int, float]: + """Load MP-owned EITC take-up rates by capped qualifying-child count.""" + _ = year + return dict(DEFAULT_EITC_TAKEUP_RATES_BY_CHILDREN) + + +def _load_microplex_voluntary_filing_rates(year: int) -> dict: + """Load MP-owned voluntary filing rate table.""" + _ = year + return { + children: {wage: dict(age_rates) for wage, age_rates in wage_rates.items()} + for children, wage_rates in DEFAULT_VOLUNTARY_FILING_RATES.items() + } + + +def _load_microplex_wic_takeup_rates(year: int) -> dict[str, float]: + """Load MP-owned WIC take-up rates by demographic category.""" + _ = year + return dict(DEFAULT_WIC_TAKEUP_RATES) + + +def _load_microplex_wic_nutritional_risk_rates(year: int) -> dict[str, float]: + """Load MP-owned WIC nutritional-risk rates by demographic category.""" + _ = year + return dict(DEFAULT_WIC_NUTRITIONAL_RISK_RATES) PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES: tuple[str, ...] = ( @@ -4836,6 +4890,7 @@ def build_policyengine_entity_tables( tax_units = self._attach_policyengine_tax_unit_takeup_inputs(tax_units) persons = self._construct_aotc_eligibility_inputs(persons) persons = self._assign_family_and_spm_units(persons) + persons = self._attach_policyengine_wic_inputs(persons) families = self._collapse_group_table(persons, "family_id") spm_units = self._collapse_group_table(persons, "spm_unit_id") spm_units = self._attach_spm_unit_source_columns(persons, spm_units) @@ -8590,8 +8645,8 @@ def _attach_policyengine_aca_takeup( or self.config.policyengine_target_period or 2024 ) - rate = _load_policyengine_us_data_takeup_rate("aca", year) - rng = _policyengine_us_data_seeded_rng(column) + rate = _load_microplex_takeup_rate("aca", year) + rng = _microplex_seeded_rng(column) result[column] = rng.random(len(result)) < rate return result @@ -8624,8 +8679,8 @@ def _attach_policyengine_simple_tax_unit_takeup( return result year = self._policyengine_takeup_year() - rate = _load_policyengine_us_data_takeup_rate(rate_key, year) - rng = _policyengine_us_data_seeded_rng(column) + rate = _load_microplex_takeup_rate(rate_key, year) + rng = _microplex_seeded_rng(column) result[column] = rng.random(len(result)) < rate return result @@ -8642,7 +8697,7 @@ def _attach_policyengine_eitc_takeup( return result year = self._policyengine_takeup_year() - rates = _load_policyengine_us_data_eitc_takeup_rates(year) + rates = _load_microplex_eitc_takeup_rates(year) child_count_column = ( EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN if EITC_TAKEUP_CHILD_COUNT_HELPER_COLUMN in result.columns @@ -8660,7 +8715,7 @@ def _attach_policyengine_eitc_takeup( .astype(int) ) takeup_rate = dependent_count.map(lambda count: rates.get(int(count), 0.85)) - rng = _policyengine_us_data_seeded_rng(column) + rng = _microplex_seeded_rng(column) result[column] = rng.random(len(result)) < takeup_rate.to_numpy(dtype=float) return result @@ -8684,7 +8739,7 @@ def _attach_policyengine_voluntary_filing( ) year = self._policyengine_takeup_year() - rates = _load_policyengine_us_data_voluntary_filing_rates(year) + rates = _load_microplex_voluntary_filing_rates(year) takes_up_eitc = self._normal_bool_series( result.get("takes_up_eitc", False), index=result.index, @@ -8710,7 +8765,7 @@ def _attach_policyengine_voluntary_filing( wage_income=wage_income, age_head=age_head, ) - rng = _policyengine_us_data_seeded_rng(column) + rng = _microplex_seeded_rng(column) result[column] = (~takes_up_eitc.to_numpy(dtype=bool)) & ( rng.random(len(result)) < takeup_rate.to_numpy(dtype=float) ) @@ -8802,8 +8857,8 @@ def _attach_policyengine_simple_person_takeup( return result year = self._policyengine_takeup_year() - rate = _load_policyengine_us_data_takeup_rate(rate_key, year) - rng = _policyengine_us_data_seeded_rng(column) + rate = _load_microplex_takeup_rate(rate_key, year) + rng = _microplex_seeded_rng(column) result[column] = rng.random(len(result)) < rate return result @@ -8820,15 +8875,138 @@ def _attach_policyengine_medicaid_takeup( return result year = self._policyengine_takeup_year() - rates = _load_policyengine_us_data_medicaid_takeup_rates(year) + rates = _load_microplex_medicaid_takeup_rates(year) states = self._person_state_abbreviation(result) takeup_rate = states.map( lambda state: rates.get(state, DEFAULT_MEDICAID_TAKEUP_RATE) ) - rng = _policyengine_us_data_seeded_rng(column) + rng = _microplex_seeded_rng(column) result[column] = rng.random(len(result)) < takeup_rate.to_numpy(dtype=float) return result + def _attach_policyengine_wic_inputs( + self, + persons: pd.DataFrame, + ) -> pd.DataFrame: + result = persons.copy() + category = self._policyengine_wic_category_for_takeup(result) + year = self._policyengine_takeup_year() + + claim_column = "would_claim_wic" + if claim_column in result.columns: + result[claim_column] = self._normal_bool_series( + result[claim_column], + index=result.index, + ) + else: + claim_rates = _load_microplex_wic_takeup_rates(year) + claim_rate = category.map( + lambda value: claim_rates.get(str(value), 0.0) + ).fillna(0.0) + rng = _microplex_seeded_rng(claim_column) + result[claim_column] = rng.random(len(result)) < claim_rate.to_numpy( + dtype=float + ) + + risk_column = "is_wic_at_nutritional_risk" + if risk_column in result.columns: + result[risk_column] = self._normal_bool_series( + result[risk_column], + index=result.index, + ) + else: + risk_rates = _load_microplex_wic_nutritional_risk_rates(year) + risk_rate = category.map( + lambda value: risk_rates.get(str(value), 0.0) + ).fillna(0.0) + receives_wic = self._normal_bool_series( + result.get("receives_wic", False), + index=result.index, + ) + rng = _microplex_seeded_rng(risk_column) + result[risk_column] = receives_wic | ( + rng.random(len(result)) < risk_rate.to_numpy(dtype=float) + ) + return result + + def _policyengine_wic_category_for_takeup( + self, + persons: pd.DataFrame, + ) -> pd.Series: + index = persons.index + age = pd.to_numeric( + persons.get("age", pd.Series(0.0, index=index)), + errors="coerce", + ).fillna(0.0) + pregnant = self._normal_bool_series( + persons.get("is_pregnant", False), + index=index, + ) + breastfeeding = self._normal_bool_series( + persons.get("is_breastfeeding", False), + index=index, + ) + if "is_female" in persons.columns: + female = self._normal_bool_series(persons["is_female"], index=index) + elif "sex" in persons.columns: + female = ( + pd.to_numeric(persons["sex"], errors="coerce") + .fillna(0) + .astype(int) + .eq(2) + ) + else: + female = pd.Series(False, index=index) + + own_children = pd.to_numeric( + persons.get("own_children_in_household", pd.Series(0, index=index)), + errors="coerce", + ).fillna(0.0) + mother = breastfeeding | (female & own_children.gt(0)) + + group_column = next( + ( + column + for column in ("family_id", "spm_unit_id", "household_id") + if column in persons.columns + ), + None, + ) + if group_column is None: + min_age_group = age + else: + group_keys = persons[group_column].where( + persons[group_column].notna(), + pd.Series(np.arange(len(persons)), index=index), + ) + min_age_group = age.groupby(group_keys, sort=False).transform("min") + + category = np.select( + [ + pregnant.to_numpy(dtype=bool), + ( + mother.to_numpy(dtype=bool) + & breastfeeding.to_numpy(dtype=bool) + & min_age_group.lt(1.0).to_numpy(dtype=bool) + ), + ( + mother.to_numpy(dtype=bool) + & min_age_group.lt(0.5).to_numpy(dtype=bool) + ), + age.lt(1.0).to_numpy(dtype=bool), + age.lt(5.0).to_numpy(dtype=bool), + ], + [ + WIC_TAKEUP_CATEGORY_PREGNANT, + WIC_TAKEUP_CATEGORY_BREASTFEEDING, + WIC_TAKEUP_CATEGORY_POSTPARTUM, + WIC_TAKEUP_CATEGORY_INFANT, + WIC_TAKEUP_CATEGORY_CHILD, + ], + default=WIC_TAKEUP_CATEGORY_NONE, + ) + return pd.Series(category, index=index, dtype="string") + def _person_state_abbreviation(self, persons: pd.DataFrame) -> pd.Series: if "state" in persons.columns: state = persons["state"].astype("string").str.upper() @@ -8867,8 +9045,8 @@ def _attach_policyengine_tanf_takeup( return result year = self._policyengine_takeup_year() - rate = _load_policyengine_us_data_takeup_rate("tanf", year) - rng = _policyengine_us_data_seeded_rng(column) + rate = _load_microplex_takeup_rate("tanf", year) + rng = _microplex_seeded_rng(column) result[column] = rng.random(len(result)) < rate return result @@ -9540,8 +9718,8 @@ def _attach_policyengine_snap_takeup( or self.config.policyengine_target_period or 2024 ) - rate = _load_policyengine_us_data_takeup_rate("snap", year) - rng = _policyengine_us_data_seeded_rng(column) + rate = _load_microplex_takeup_rate("snap", year) + rng = _microplex_seeded_rng(column) result[column] = rng.random(len(result)) < rate return result diff --git a/tests/pipelines/test_pe_l0.py b/tests/pipelines/test_pe_l0.py index 48a4e55..3696129 100644 --- a/tests/pipelines/test_pe_l0.py +++ b/tests/pipelines/test_pe_l0.py @@ -2,9 +2,6 @@ from __future__ import annotations -import sys -import types - import numpy as np import pandas as pd import pytest @@ -13,31 +10,18 @@ from microplex_us.pipelines.pe_l0 import PolicyEngineL0Calibrator -def _install_fake_policyengine_l0(monkeypatch, weights: np.ndarray) -> dict[str, object]: +def _install_fake_policyengine_l0(weights: np.ndarray): calls: dict[str, object] = {} def fake_fit_l0_weights(**kwargs): calls.update(kwargs) return np.asarray(weights, dtype=float) - pe_pkg = types.ModuleType("policyengine_us_data") - cal_pkg = types.ModuleType("policyengine_us_data.calibration") - unified = types.ModuleType("policyengine_us_data.calibration.unified_calibration") - unified.fit_l0_weights = fake_fit_l0_weights - pe_pkg.calibration = cal_pkg - cal_pkg.unified_calibration = unified - monkeypatch.setitem(sys.modules, "policyengine_us_data", pe_pkg) - monkeypatch.setitem(sys.modules, "policyengine_us_data.calibration", cal_pkg) - monkeypatch.setitem( - sys.modules, - "policyengine_us_data.calibration.unified_calibration", - unified, - ) - return calls + return calls, fake_fit_l0_weights -def test_policyengine_l0_calibrator_supports_explicit_linear_constraints(monkeypatch): - calls = _install_fake_policyengine_l0(monkeypatch, np.array([1.0, 2.0])) +def test_policyengine_l0_calibrator_supports_explicit_linear_constraints(): + calls, fake_fit_l0_weights = _install_fake_policyengine_l0(np.array([1.0, 2.0])) data = pd.DataFrame({"weight": [1.0, 1.0]}) constraints = ( LinearConstraint("row1", np.array([1.0, 0.0]), 1.0), @@ -52,6 +36,7 @@ def test_policyengine_l0_calibrator_supports_explicit_linear_constraints(monkeyp epochs=25, tol=1e-6, device="cpu", + fit_l0_weights_fn=fake_fit_l0_weights, ) result = calibrator.fit_transform( data, @@ -71,14 +56,16 @@ def test_policyengine_l0_calibrator_supports_explicit_linear_constraints(monkeyp assert validation["sparsity"] == 0.0 -def test_policyengine_l0_calibrator_reports_sparsity(monkeypatch): - _install_fake_policyengine_l0(monkeypatch, np.array([0.0, 3.0, 0.0])) +def test_policyengine_l0_calibrator_reports_sparsity(): + _, fake_fit_l0_weights = _install_fake_policyengine_l0(np.array([0.0, 3.0, 0.0])) data = pd.DataFrame({"weight": [1.0, 1.0, 1.0]}) - constraints = ( - LinearConstraint("row", np.array([0.0, 1.0, 0.0]), 3.0), - ) + constraints = (LinearConstraint("row", np.array([0.0, 1.0, 0.0]), 3.0),) - calibrator = PolicyEngineL0Calibrator(epochs=5, tol=1e-6) + calibrator = PolicyEngineL0Calibrator( + epochs=5, + tol=1e-6, + fit_l0_weights_fn=fake_fit_l0_weights, + ) calibrator.fit( data, {}, @@ -90,7 +77,7 @@ def test_policyengine_l0_calibrator_reports_sparsity(monkeypatch): def test_policyengine_l0_lambda_zero_uses_dense_no_gate_path(monkeypatch): - calls = _install_fake_policyengine_l0(monkeypatch, np.array([99.0, 99.0])) + calls, fake_fit_l0_weights = _install_fake_policyengine_l0(np.array([99.0, 99.0])) data = pd.DataFrame({"weight": [1.0, 1.0]}) constraints = ( LinearConstraint("row1", np.array([1.0, 0.0]), 2.0), @@ -102,6 +89,7 @@ def test_policyengine_l0_lambda_zero_uses_dense_no_gate_path(monkeypatch): lambda_l2=0.0, epochs=100, tol=1e-10, + fit_l0_weights_fn=fake_fit_l0_weights, ) result = calibrator.fit_transform( data, @@ -116,10 +104,26 @@ def test_policyengine_l0_lambda_zero_uses_dense_no_gate_path(monkeypatch): assert validation["backend"] == "dense_projected_gradient" assert validation["uses_gates"] is False assert validation["loss_history"][0]["iteration"] == 0 - assert validation["loss_history"][-1]["objective_loss"] < validation[ - "loss_history" - ][0]["objective_loss"] + assert ( + validation["loss_history"][-1]["objective_loss"] + < validation["loss_history"][0]["objective_loss"] + ) assert result["weight"].to_numpy(dtype=float) == pytest.approx( [2.0, 3.0], rel=1e-5, ) + + +def test_policyengine_l0_requires_explicit_fit_function_for_nonzero_l0(): + data = pd.DataFrame({"weight": [1.0]}) + constraints = (LinearConstraint("row", np.array([1.0]), 1.0),) + + calibrator = PolicyEngineL0Calibrator(lambda_l0=1e-4, epochs=1) + + with pytest.raises(RuntimeError, match="no longer loads policyengine-us-data"): + calibrator.fit( + data, + {}, + weight_col="weight", + linear_constraints=constraints, + ) diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index 77d2127..66e3c1b 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -1041,7 +1041,7 @@ def fake_load_takeup_rate(variable_name: str, year: int) -> float: monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_takeup_rate", + "_load_microplex_takeup_rate", fake_load_takeup_rate, ) pipeline = USMicroplexPipeline( @@ -1277,7 +1277,7 @@ def fake_load_takeup_rate(variable_name: str, year: int) -> float: monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_takeup_rate", + "_load_microplex_takeup_rate", fake_load_takeup_rate, ) pipeline = USMicroplexPipeline( @@ -1385,22 +1385,22 @@ def fake_load_voluntary_rates( monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_takeup_rate", + "_load_microplex_takeup_rate", fake_load_takeup_rate, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_medicaid_takeup_rates", + "_load_microplex_medicaid_takeup_rates", fake_load_medicaid_rates, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_eitc_takeup_rates", + "_load_microplex_eitc_takeup_rates", fake_load_eitc_rates, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_voluntary_filing_rates", + "_load_microplex_voluntary_filing_rates", fake_load_voluntary_rates, ) pipeline = USMicroplexPipeline( @@ -1460,6 +1460,78 @@ def fake_load_voluntary_rates( assert eitc_calls == [2024] assert voluntary_calls == [2024] + def test_build_policyengine_entity_tables_adds_wic_takeup_inputs( + self, + monkeypatch, + ): + wic_takeup_calls: list[int] = [] + wic_risk_calls: list[int] = [] + + def fake_wic_takeup_rates(year: int) -> dict[str, float]: + wic_takeup_calls.append(year) + return { + "PREGNANT": 0.0, + "POSTPARTUM": 1.0, + "BREASTFEEDING": 0.0, + "INFANT": 1.0, + "CHILD": 0.0, + "NONE": 0.0, + } + + def fake_wic_risk_rates(year: int) -> dict[str, float]: + wic_risk_calls.append(year) + return { + "PREGNANT": 0.0, + "POSTPARTUM": 0.0, + "BREASTFEEDING": 0.0, + "INFANT": 0.0, + "CHILD": 1.0, + "NONE": 0.0, + } + + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_wic_takeup_rates", + fake_wic_takeup_rates, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_wic_nutritional_risk_rates", + fake_wic_risk_rates, + ) + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig(policyengine_dataset_year=2024) + ) + population = pd.DataFrame( + { + "person_id": [1, 2, 3, 4], + "household_id": [10, 10, 30, 40], + "family_id": [10, 10, 30, 40], + "spm_unit_id": [10, 10, 30, 40], + "weight": [1.0, 1.0, 1.0, 1.0], + "age": [30, 0, 4, 40], + "sex": [2, 1, 2, 1], + "income": [40_000.0, 0.0, 0.0, 35_000.0], + "relationship_to_head": [0, 2, 0, 0], + "state_fips": [6, 6, 6, 6], + "own_children_in_household": [1, 0, 0, 0], + "receives_wic": [False, True, False, False], + } + ) + + tables = pipeline.build_policyengine_entity_tables(population) + + persons = tables.persons.sort_values("person_id").reset_index(drop=True) + assert persons["would_claim_wic"].tolist() == [True, True, False, False] + assert persons["is_wic_at_nutritional_risk"].tolist() == [ + False, + True, + True, + False, + ] + assert wic_takeup_calls == [2024] + assert wic_risk_calls == [2024] + def test_build_policyengine_entity_tables_preserves_explicit_stochastic_takeup_inputs( self, monkeypatch, @@ -1476,26 +1548,42 @@ def fail_eitc_rates(year: int) -> dict[int, float]: def fail_voluntary_rates(year: int) -> dict: raise AssertionError(f"unexpected voluntary filing rate load: {year}") + def fail_wic_takeup_rates(year: int) -> dict[str, float]: + raise AssertionError(f"unexpected WIC take-up rate load: {year}") + + def fail_wic_risk_rates(year: int) -> dict[str, float]: + raise AssertionError(f"unexpected WIC nutritional-risk rate load: {year}") + monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_takeup_rate", + "_load_microplex_takeup_rate", fail_scalar_rate, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_medicaid_takeup_rates", + "_load_microplex_medicaid_takeup_rates", fail_medicaid_rates, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_eitc_takeup_rates", + "_load_microplex_eitc_takeup_rates", fail_eitc_rates, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_voluntary_filing_rates", + "_load_microplex_voluntary_filing_rates", fail_voluntary_rates, ) + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_wic_takeup_rates", + fail_wic_takeup_rates, + ) + monkeypatch.setattr( + us_pipeline_module, + "_load_microplex_wic_nutritional_risk_rates", + fail_wic_risk_rates, + ) pipeline = USMicroplexPipeline( USMicroplexBuildConfig(policyengine_dataset_year=2024) ) @@ -1519,6 +1607,8 @@ def fail_voluntary_rates(year: int) -> dict: "would_file_taxes_voluntarily": [True, False], "takes_up_snap_if_eligible": [False, True], "takes_up_tanf_if_eligible": [True, False], + "would_claim_wic": [False, True], + "is_wic_at_nutritional_risk": [True, False], } ) @@ -1531,6 +1621,8 @@ def fail_voluntary_rates(year: int) -> dict: True, False, ] + assert persons["would_claim_wic"].tolist() == [False, True] + assert persons["is_wic_at_nutritional_risk"].tolist() == [True, False] tax_units = tables.tax_units.sort_values("household_id").reset_index(drop=True) assert tax_units["takes_up_aca_if_eligible"].tolist() == [True] @@ -1563,22 +1655,22 @@ def fail_voluntary_rates(year: int) -> dict: monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_takeup_rate", + "_load_microplex_takeup_rate", fail_scalar_rate, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_medicaid_takeup_rates", + "_load_microplex_medicaid_takeup_rates", fail_medicaid_rates, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_eitc_takeup_rates", + "_load_microplex_eitc_takeup_rates", fake_eitc_rates, ) monkeypatch.setattr( us_pipeline_module, - "_load_policyengine_us_data_voluntary_filing_rates", + "_load_microplex_voluntary_filing_rates", fail_voluntary_rates, ) pipeline = USMicroplexPipeline( diff --git a/tests/test_no_policyengine_us_data_runtime_imports.py b/tests/test_no_policyengine_us_data_runtime_imports.py new file mode 100644 index 0000000..a487f28 --- /dev/null +++ b/tests/test_no_policyengine_us_data_runtime_imports.py @@ -0,0 +1,25 @@ +"""Runtime dependency boundaries for the MP package.""" + +from __future__ import annotations + +import ast +from pathlib import Path + + +def test_microplex_package_has_no_policyengine_us_data_imports(): + repo_root = Path(__file__).resolve().parents[1] + package_root = repo_root / "src" / "microplex_us" + offenders: list[str] = [] + for path in sorted(package_root.rglob("*.py")): + tree = ast.parse(path.read_text()) + for node in ast.walk(tree): + if isinstance(node, ast.ImportFrom) and (node.module or "").startswith( + "policyengine_us_data" + ): + offenders.append(f"{path.relative_to(repo_root)}:{node.lineno}") + elif isinstance(node, ast.Import): + for alias in node.names: + if alias.name.startswith("policyengine_us_data"): + offenders.append(f"{path.relative_to(repo_root)}:{node.lineno}") + + assert offenders == [] diff --git a/tests/test_puf_source_provider.py b/tests/test_puf_source_provider.py index f29bf05..8717bdf 100644 --- a/tests/test_puf_source_provider.py +++ b/tests/test_puf_source_provider.py @@ -2,10 +2,8 @@ from __future__ import annotations -import pickle import sys import types -from pathlib import Path import pandas as pd import pytest @@ -1108,7 +1106,7 @@ def _raise_missing_model(**_kwargs): ) -def test_map_puf_variables_can_impute_pre_tax_contributions_via_policyengine_subprocess( +def test_map_puf_variables_uses_microplex_cps_pre_tax_training_when_legacy_h5_missing( monkeypatch, tmp_path ): raw = pd.DataFrame( @@ -1122,42 +1120,37 @@ def test_map_puf_variables_can_impute_pre_tax_contributions_via_policyengine_sub "GENDER": [1], } ) - calls: dict[str, object] = {} - - def _resolve_repo(_repo): - return tmp_path - - def _resolve_python(_python, *, repo_root): - assert repo_root == tmp_path - return Path("/fake/python") + qrf_calls = _install_fake_qrf( + monkeypatch, + pd.DataFrame({"pre_tax_contributions": [4321.0]}), + ) + h5_calls: list[dict[str, object]] = [] + local_calls: list[int] = [] - def _build_env(repo_root): - assert repo_root == tmp_path - return {"PE_ENV": "1"} + def missing_h5(**kwargs): + h5_calls.append(dict(kwargs)) + raise FileNotFoundError("missing h5") - def _run(args, *, check, cwd, env): - calls["args"] = list(args) - calls["cwd"] = cwd - calls["env"] = dict(env) - assert check is True - out_path = Path(args[-1]) - with out_path.open("wb") as handle: - pickle.dump(pd.DataFrame({"pre_tax_contributions": [4321.0]}), handle) + def local_training_frame(*, training_year: int): + local_calls.append(training_year) + return pd.DataFrame( + { + "employment_income": [10_000.0, 20_000.0], + "age": [30.0, 45.0], + "is_male": [0.0, 1.0], + "pre_tax_contributions": [500.0, 1_500.0], + } + ) monkeypatch.setattr( - puf_module, "resolve_policyengine_us_data_repo_root", _resolve_repo - ) - monkeypatch.setattr( - puf_module, "resolve_policyengine_us_data_python", _resolve_python - ) - monkeypatch.setattr( - puf_module, "build_policyengine_us_data_subprocess_env", _build_env + puf_module, + "_load_pe_extended_cps_pre_tax_training_frame", + missing_h5, ) - monkeypatch.setattr(puf_module.subprocess, "run", _run) monkeypatch.setattr( puf_module, - "_load_pe_extended_cps_pre_tax_training_frame", - lambda **_kwargs: (_ for _ in ()).throw(FileNotFoundError("missing h5")), + "_load_microplex_cps_pre_tax_training_frame", + local_training_frame, ) mapped = map_puf_variables( @@ -1169,8 +1162,10 @@ def _run(args, *, check, cwd, env): ) assert mapped.loc[0, "pre_tax_contributions"] == 4321.0 - assert calls["cwd"] == tmp_path - assert calls["env"] == {"PE_ENV": "1"} + assert h5_calls == [{"policyengine_us_data_repo": tmp_path, "training_year": 2024}] + assert local_calls == [2024] + assert qrf_calls["predictors"] == ("employment_income", "age", "is_male") + assert qrf_calls["imputed_variables"] == ("pre_tax_contributions",) def test_map_puf_variables_maps_widow_status_to_surviving_spouse():