Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 67 additions & 138 deletions src/microplex_us/data_sources/puf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,11 @@

from __future__ import annotations

import pickle
import re
import subprocess
import sys
import tempfile
from collections.abc import Callable
from dataclasses import dataclass, field
from functools import cache, lru_cache
from pathlib import Path
from textwrap import dedent
from typing import Any

import numpy as np
Expand All @@ -40,8 +35,6 @@
predict_grouped_component_shares,
)
from microplex_us.pipelines.pe_native_scores import (
build_policyengine_us_data_subprocess_env,
resolve_policyengine_us_data_python,
resolve_policyengine_us_data_repo_root,
)
from microplex_us.source_manifests import load_us_source_manifest
Expand Down Expand Up @@ -336,45 +329,6 @@ class PEStyleQRFImputationModel:
fitted_model: Any


@dataclass(frozen=True)
class PEStyleSubprocessImputationPredictor:
"""Run PE-style QRF imputation in the PE-US-data environment."""

policyengine_us_data_repo: str | Path
policyengine_us_data_python: str | Path | None = None

def predict(self, X_test: pd.DataFrame) -> pd.DataFrame:
resolved_repo = resolve_policyengine_us_data_repo_root(
self.policyengine_us_data_repo
)
resolved_python = resolve_policyengine_us_data_python(
self.policyengine_us_data_python,
repo_root=resolved_repo,
)
env = build_policyengine_us_data_subprocess_env(resolved_repo)
with tempfile.TemporaryDirectory(prefix="microplex-us-puf-pretax-") as tempdir:
predictors_path = Path(tempdir) / "predictors.pkl"
predictions_path = Path(tempdir) / "predictions.pkl"
with predictors_path.open("wb") as handle:
pickle.dump(pd.DataFrame(X_test), handle)
subprocess.run(
[
str(resolved_python),
"-c",
_build_pe_style_puf_pre_tax_subprocess_script(),
str(resolved_repo),
str(predictors_path),
str(predictions_path),
],
check=True,
cwd=resolved_repo,
env=env,
)
with predictions_path.open("rb") as handle:
predictions = pickle.load(handle)
return pd.DataFrame(predictions)


PUF_DEMOGRAPHIC_VARIABLES = (
"AGEDP1",
"AGEDP2",
Expand Down Expand Up @@ -1294,7 +1248,6 @@ def map_puf_variables(
FileNotFoundError,
ImportError,
ValueError,
subprocess.CalledProcessError,
):
if require_pre_tax_contribution_model:
raise
Expand Down Expand Up @@ -1685,78 +1638,6 @@ def _default_pe_style_puf_social_security_share_model(
)


def _ensure_policyengine_us_data_repo_on_sys_path(
policyengine_us_data_repo: str | Path | None,
) -> None:
if policyengine_us_data_repo is None:
return
repo_root = Path(policyengine_us_data_repo).expanduser().resolve()
if not repo_root.exists():
raise ValueError(f"PolicyEngine US-data repo does not exist: {repo_root}")
repo_root_str = str(repo_root)
if repo_root_str not in sys.path:
sys.path.insert(0, repo_root_str)


def _build_pe_style_puf_pre_tax_subprocess_script() -> str:
return dedent(
"""
import pickle
import sys

import pandas as pd

repo_root = sys.argv[1]
predictors_path = sys.argv[2]
predictions_path = sys.argv[3]

if repo_root not in sys.path:
sys.path.insert(0, repo_root)

from microimpute.models.qrf import QRF
from policyengine_us import Microsimulation
from policyengine_us_data.datasets.cps import CPS_2021

with open(predictors_path, "rb") as handle:
X_test = pickle.load(handle)
X_test = pd.DataFrame(X_test)

predictors = ["employment_income", "age", "is_male"]
cps = Microsimulation(dataset=CPS_2021)
cps.subsample(10_000)
cps_df = cps.calculate_dataframe(
[*predictors, "household_weight", "pre_tax_contributions"]
)
train = cps_df.loc[:, [*predictors, "pre_tax_contributions"]].copy()
train = train.apply(lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0))
X_test = X_test.loc[:, predictors].copy()
X_test = X_test.apply(lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0))

qrf = QRF(log_level="WARNING", memory_efficient=True)
fitted_model = qrf.fit(
X_train=train,
predictors=predictors,
imputed_variables=["pre_tax_contributions"],
n_jobs=1,
)
predictions = fitted_model.predict(X_test=X_test)

with open(predictions_path, "wb") as handle:
pickle.dump(
pd.DataFrame(
{
"pre_tax_contributions": pd.to_numeric(
predictions["pre_tax_contributions"],
errors="coerce",
).fillna(0.0)
}
),
handle,
)
"""
).strip()


def _load_pe_extended_cps_pre_tax_training_frame(
*,
policyengine_us_data_repo: str | Path,
Expand Down Expand Up @@ -1819,6 +1700,69 @@ def _load_pe_extended_cps_pre_tax_training_frame(
return train


def _load_microplex_cps_pre_tax_training_frame(
*,
training_year: int,
) -> pd.DataFrame:
cps = load_cps_asec(year=int(training_year))
persons = cps.persons.to_pandas()
index = persons.index
employment_income = pd.to_numeric(
persons.get("employment_income", persons.get("wage_income", 0.0)),
errors="coerce",
).fillna(0.0)
age = pd.to_numeric(persons.get("age", 0.0), errors="coerce").fillna(0.0)
if "is_male" in persons.columns:
is_male = pd.to_numeric(persons["is_male"], errors="coerce").fillna(0.0)
elif "sex" in persons.columns:
is_male = (
pd.to_numeric(persons["sex"], errors="coerce")
.fillna(0)
.astype(int)
.eq(1)
.astype(float)
)
else:
is_male = pd.Series(0.0, index=index)

if "pre_tax_contributions" in persons.columns:
pre_tax_contributions = pd.to_numeric(
persons["pre_tax_contributions"],
errors="coerce",
).fillna(0.0)
else:
pre_tax_contributions = pd.Series(0.0, index=index)
for column in (
"traditional_401k_contributions",
"traditional_401k_contributions_desired",
"traditional_403b_contributions",
"traditional_403b_contributions_desired",
"pre_tax_health_insurance_premiums",
"health_savings_account_payroll_contributions",
):
if column in persons.columns:
pre_tax_contributions = pre_tax_contributions.add(
pd.to_numeric(persons[column], errors="coerce").fillna(0.0),
fill_value=0.0,
)

train = pd.DataFrame(
{
"employment_income": employment_income,
"age": age,
"is_male": is_male,
"pre_tax_contributions": pre_tax_contributions,
},
index=index,
)
train = train.apply(
lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0)
)
if len(train) > 10_000:
train = train.sample(n=10_000, random_state=0)
return train


@lru_cache(maxsize=4)
def _default_pe_style_puf_pre_tax_contribution_model(
*,
Expand All @@ -1834,13 +1778,8 @@ def _default_pe_style_puf_pre_tax_contribution_model(
training_year=pre_tax_training_year,
)
except (FileNotFoundError, KeyError, OSError, ValueError):
return PEStyleQRFImputationModel(
predictors=predictors,
imputed_variable="pre_tax_contributions",
fitted_model=PEStyleSubprocessImputationPredictor(
policyengine_us_data_repo=policyengine_us_data_repo,
policyengine_us_data_python=policyengine_us_data_python,
),
train = _load_microplex_cps_pre_tax_training_frame(
training_year=pre_tax_training_year,
)

from microimpute.models.qrf import QRF
Expand All @@ -1862,19 +1801,9 @@ def _default_pe_style_puf_pre_tax_contribution_model(
)

from microimpute.models.qrf import QRF
from policyengine_us import Microsimulation

_ensure_policyengine_us_data_repo_on_sys_path(policyengine_us_data_repo)
from policyengine_us_data.datasets.cps import CPS_2021

cps = Microsimulation(dataset=CPS_2021)
cps.subsample(10_000)
cps_df = cps.calculate_dataframe(
[*predictors, "household_weight", "pre_tax_contributions"]
)
train = cps_df.loc[:, [*predictors, "pre_tax_contributions"]].copy()
train = train.apply(
lambda column: pd.to_numeric(column, errors="coerce").fillna(0.0)
train = _load_microplex_cps_pre_tax_training_frame(
training_year=pre_tax_training_year
)

qrf = QRF(log_level="WARNING", memory_efficient=True)
Expand Down
Loading
Loading