Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions src/microplex_us/data_sources/puf.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
fit_grouped_share_model,
predict_grouped_component_shares,
)
from microplex_us.data_sources.soi import (
download_pe_soi_targets,
validate_pe_soi_targets_file,
)
from microplex_us.pipelines.pe_native_scores import (
build_policyengine_us_data_subprocess_env,
resolve_policyengine_us_data_python,
Expand Down Expand Up @@ -466,25 +470,13 @@ def _normalize_puf_uprating_mode(mode: str | None) -> str:

def _resolve_pe_soi_path(
*,
policyengine_us_data_repo: str | Path | None = None,
cache_dir: str | Path | None = None,
soi_path: str | Path | None = None,
) -> Path:
if soi_path is not None:
resolved = Path(soi_path)
elif policyengine_us_data_repo is not None:
resolved = (
Path(policyengine_us_data_repo)
/ "policyengine_us_data"
/ "storage"
/ "soi.csv"
)
else:
raise ValueError(
"PE SOI uprating requires soi_path or policyengine_us_data_repo"
)
if not resolved.exists():
raise FileNotFoundError(f"Could not find PE SOI file at {resolved}")
return resolved
return validate_pe_soi_targets_file(soi_path)

return download_pe_soi_targets(cache_dir=cache_dir)


def _resolve_pe_uprating_factors_path(
Expand Down Expand Up @@ -578,14 +570,14 @@ def uprate_raw_puf_pe_style(
*,
from_year: int = 2015,
to_year: int = 2024,
policyengine_us_data_repo: str | Path | None = None,
cache_dir: str | Path | None = None,
soi_path: str | Path | None = None,
) -> pd.DataFrame:
"""Uprate raw PUF columns using the PE SOI growth contract."""
if from_year == to_year:
return puf.copy()
resolved_soi_path = _resolve_pe_soi_path(
policyengine_us_data_repo=policyengine_us_data_repo,
cache_dir=cache_dir,
soi_path=soi_path,
)
soi_table = _load_pe_soi_table(str(resolved_soi_path.resolve()))
Expand Down Expand Up @@ -1817,7 +1809,7 @@ def load_puf(
raw,
from_year=2015,
to_year=raw_uprating_year,
policyengine_us_data_repo=policyengine_us_data_repo,
cache_dir=cache_dir,
soi_path=soi_path,
)

Expand Down Expand Up @@ -1960,6 +1952,7 @@ def _build_puf_tax_units(
policyengine_us_data_python: str | Path | None = None,
impute_pre_tax_contributions: bool = False,
pre_tax_training_year: int = 2024,
cache_dir: str | Path | None = None,
soi_path: str | Path | None = None,
require_pre_tax_contribution_model: bool = False,
) -> pd.DataFrame:
Expand All @@ -1971,7 +1964,7 @@ def _build_puf_tax_units(
raw,
from_year=2015,
to_year=raw_uprating_year,
policyengine_us_data_repo=policyengine_us_data_repo,
cache_dir=cache_dir,
soi_path=soi_path,
)
tax_units = map_puf_variables(
Expand Down Expand Up @@ -2259,6 +2252,7 @@ def load_frame(self, query: SourceQuery | None = None) -> ObservationFrame:
policyengine_us_data_python=policyengine_us_data_python,
impute_pre_tax_contributions=impute_pre_tax_contributions,
pre_tax_training_year=pre_tax_training_year,
cache_dir=self.cache_dir,
soi_path=soi_path,
require_pre_tax_contribution_model=require_pre_tax_contribution_model,
)
Expand Down
116 changes: 116 additions & 0 deletions src/microplex_us/data_sources/soi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""IRS SOI target-table artifact resolution for US source adapters."""

from __future__ import annotations

from pathlib import Path

import pandas as pd
import requests

# Fallback cache directory, used when a caller passes ``cache_dir=None``.
DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex"
# Default policyengine-us-data revision. It is interpolated into the raw
# download URL and into the cache file name, so a different revision resolves
# to a different cached file. (Looks like a full commit SHA — TODO confirm.)
SOI_TARGETS_POLICYENGINE_US_DATA_REF = "f7458313c86fa580fb1e43a2f18252d67cf76e4a"
# The HF policyengine-us-data model publishes current raw inputs/target DB
# artifacts, but not this historical PE-style long target table.
# Base of the raw-content URL; "/<revision>/<repo path>" is appended to it.
SOI_TARGETS_REPO_RAW_URL = (
"https://raw.githubusercontent.com/PolicyEngine/policyengine-us-data"
)
# Location of the targets CSV inside the policyengine-us-data repository.
SOI_TARGETS_REPO_PATH = (
"policyengine_us_data/storage/calibration_targets/soi_targets.csv"
)

# Column names that must all appear in the CSV header for a file to be
# accepted by ``validate_pe_soi_targets_file``; extra columns are allowed.
PE_SOI_TARGETS_REQUIRED_COLUMNS = frozenset(
{
"Year",
"Variable",
"Filing status",
"AGI lower bound",
"AGI upper bound",
"Count",
"Taxable only",
"Value",
}
)


def _cache_safe_ref(ref: str) -> str:
return "".join(
character if character.isalnum() or character in "._-" else "_"
for character in ref
)


def _soi_targets_cache_filename(revision: str) -> str:
    """Return the cache file name for the SOI targets table at ``revision``.

    The revision is sanitized with ``_cache_safe_ref`` before being embedded,
    so each revision gets its own file name.
    """
    safe_revision = _cache_safe_ref(revision)
    return "soi_targets_pe_us_data_" + safe_revision + ".csv"


def pe_soi_targets_url(
    revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF,
) -> str:
    """Return the PE-US-data raw URL of the SOI targets table at ``revision``.

    The URL has the form ``<raw base URL>/<revision>/<repo path>``; with the
    default argument it points at the pinned revision.
    """
    base_url = SOI_TARGETS_REPO_RAW_URL
    repo_path = SOI_TARGETS_REPO_PATH
    return f"{base_url}/{revision}/{repo_path}"


# Download URL and cache file name precomputed at import time for the default
# pinned revision (``SOI_TARGETS_POLICYENGINE_US_DATA_REF``).
SOI_TARGETS_URL = pe_soi_targets_url()
SOI_TARGETS_CACHE_FILENAME = _soi_targets_cache_filename(
SOI_TARGETS_POLICYENGINE_US_DATA_REF
)


def validate_pe_soi_targets_file(path: str | Path) -> Path:
    """Check that ``path`` is a PE-style long SOI targets CSV.

    Only the header row is parsed; the data rows are not inspected.

    Args:
        path: Location of the CSV file; a leading ``~`` is expanded.

    Returns:
        The user-expanded path, once every required column has been found.

    Raises:
        FileNotFoundError: If nothing exists at the expanded path.
        ValueError: If the header cannot be read as CSV, or if any column in
            ``PE_SOI_TARGETS_REQUIRED_COLUMNS`` is missing from it.
    """
    csv_path = Path(path).expanduser()
    if not csv_path.exists():
        raise FileNotFoundError(f"Could not find PE SOI targets file at {csv_path}")

    try:
        # nrows=0 limits parsing to the header row.
        header = pd.read_csv(csv_path, nrows=0)
    except Exception as exc:
        raise ValueError(f"Could not read PE SOI targets file at {csv_path}") from exc

    absent = sorted(set(PE_SOI_TARGETS_REQUIRED_COLUMNS).difference(header.columns))
    if not absent:
        return csv_path
    raise ValueError(
        "PE SOI targets file is missing required columns "
        f"{absent}: {csv_path}"
    )


def pe_soi_targets_cache_path(
    cache_dir: str | Path | None = None,
    *,
    revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF,
) -> Path:
    """Return where the SOI targets table for ``revision`` lives in the cache.

    Args:
        cache_dir: Cache directory; ``DEFAULT_CACHE_DIR`` is used when ``None``.
            A leading ``~`` is expanded.
        revision: Revision whose sanitized form is embedded in the file name.

    Returns:
        The cache file path. Nothing is created on disk.
    """
    base_dir = Path(cache_dir) if cache_dir is not None else DEFAULT_CACHE_DIR
    return base_dir.expanduser() / _soi_targets_cache_filename(revision)


def download_pe_soi_targets(
    cache_dir: str | Path | None = None,
    *,
    force: bool = False,
    revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF,
    url: str | None = None,
) -> Path:
    """Resolve the PE-style SOI targets table into the microplex cache.

    A cached copy is validated and reused unless ``force`` is set. Otherwise
    the table is downloaded, validated, and only then moved into the cache, so
    an interrupted or invalid download never replaces a usable cached file.

    Args:
        cache_dir: Cache directory; ``DEFAULT_CACHE_DIR`` is used when ``None``.
            It is created if it does not exist.
        force: Download again even when a cached copy exists.
        revision: Revision used for the cache file name and, unless ``url`` is
            given, for the download URL.
        url: Optional download URL overriding the one derived from
            ``revision``. The cache file name still depends on ``revision``
            only.

    Returns:
        Path of the validated cached CSV.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        FileNotFoundError: Propagated from validation if the file disappears.
        ValueError: If the cached or downloaded file fails schema validation.
    """
    destination = pe_soi_targets_cache_path(cache_dir, revision=revision)
    destination.parent.mkdir(parents=True, exist_ok=True)
    if destination.exists() and not force:
        return validate_pe_soi_targets_file(destination)

    response = requests.get(url or pe_soi_targets_url(revision), timeout=300)
    response.raise_for_status()

    # Stage the payload next to the final file. Validation only reads the CSV
    # header, so a truncated file written directly to ``destination`` could
    # later be accepted as a valid cache entry.
    staged = destination.with_name(f"{destination.name}.part")
    try:
        staged.write_bytes(response.content)
        validate_pe_soi_targets_file(staged)
        # Same-directory rename; replaces any existing cached copy in one step.
        staged.replace(destination)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        staged.unlink(missing_ok=True)
    return destination
2 changes: 2 additions & 0 deletions src/microplex_us/pipelines/pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def default_policyengine_us_data_rebuild_source_providers(
puf_cache_dir: str | Path | None = None,
puf_path: str | Path | None = None,
puf_demographics_path: str | Path | None = None,
soi_path: str | Path | None = None,
puf_expand_persons: bool = True,
include_donor_surveys: bool = True,
include_acs: bool | None = None,
Expand Down Expand Up @@ -141,6 +142,7 @@ def default_policyengine_us_data_rebuild_source_providers(
cache_dir=puf_cache,
puf_path=puf_path,
demographics_path=puf_demographics_path,
soi_path=soi_path,
expand_persons=bool(puf_expand_persons),
uprating_mode=PUF_UPRATING_MODE_PE_SOI,
cps_reference_year=(
Expand Down
4 changes: 4 additions & 0 deletions src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1828,6 +1828,7 @@ def run_policyengine_us_data_rebuild_checkpoint(
puf_cache_dir: str | Path | None = None,
puf_path: str | Path | None = None,
puf_demographics_path: str | Path | None = None,
soi_path: str | Path | None = None,
puf_expand_persons: bool = True,
include_donor_surveys: bool = True,
include_acs: bool | None = None,
Expand Down Expand Up @@ -1915,6 +1916,7 @@ def run_policyengine_us_data_rebuild_checkpoint(
puf_cache_dir=puf_cache_dir,
puf_path=puf_path,
puf_demographics_path=puf_demographics_path,
soi_path=soi_path,
puf_expand_persons=puf_expand_persons,
include_donor_surveys=include_donor_surveys,
include_acs=include_acs,
Expand Down Expand Up @@ -2110,6 +2112,7 @@ def main(argv: list[str] | None = None) -> None:
parser.add_argument("--donor-cache-dir")
parser.add_argument("--puf-path")
parser.add_argument("--puf-demographics-path")
parser.add_argument("--soi-path")
parser.add_argument("--cps-sample-n", type=int)
parser.add_argument("--puf-sample-n", type=int)
parser.add_argument("--donor-sample-n", type=int)
Expand Down Expand Up @@ -2322,6 +2325,7 @@ def main(argv: list[str] | None = None) -> None:
puf_cache_dir=args.puf_cache_dir,
puf_path=args.puf_path,
puf_demographics_path=args.puf_demographics_path,
soi_path=args.soi_path,
puf_expand_persons=not args.no_puf_expand_persons,
include_donor_surveys=args.include_donor_surveys,
include_acs=args.include_acs,
Expand Down
2 changes: 2 additions & 0 deletions tests/pipelines/test_pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund
puf_target_year=2024,
cps_download=False,
puf_expand_persons=False,
soi_path="/tmp/soi_targets.csv",
policyengine_us_data_python="/tmp/pe-python",
)

Expand All @@ -124,6 +125,7 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund
assert puf_provider.cps_reference_year == 2022
assert puf_provider.expand_persons is False
assert puf_provider.uprating_mode == PUF_UPRATING_MODE_PE_SOI
assert puf_provider.soi_path == "/tmp/soi_targets.csv"
assert puf_provider.policyengine_us_data_python == "/tmp/pe-python"
assert puf_provider.impute_pre_tax_contributions is False
assert puf_provider.require_pre_tax_contribution_model is False
Expand Down
16 changes: 16 additions & 0 deletions tests/pipelines/test_pe_us_data_rebuild_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,19 @@ def test_default_policyengine_us_data_rebuild_queries_assign_sample_sizes_by_pro
}


def test_default_policyengine_us_data_rebuild_source_providers_forwards_soi_path(
    tmp_path,
) -> None:
    """An explicit ``soi_path`` reaches the second default provider unchanged."""
    expected_soi_path = tmp_path / "soi_targets.csv"
    source_providers = default_policyengine_us_data_rebuild_source_providers(
        soi_path=expected_soi_path,
        cps_download=False,
        include_donor_surveys=False,
    )
    # NOTE(review): index 1 is presumably the PUF provider — confirm against
    # the provider ordering in the factory.
    second_provider = source_providers[1]

    assert getattr(second_provider, "soi_path") == expected_soi_path


def test_default_policyengine_us_data_rebuild_queries_derive_donor_sample_size_from_sampled_sources() -> (
None
):
Expand Down Expand Up @@ -706,6 +719,8 @@ def fake_run_policyengine_us_data_rebuild_checkpoint(**kwargs):
"/tmp/policy_data.db",
"--version-id",
"run-1",
"--soi-path",
"/tmp/soi_targets.csv",
"--donor-imputer-condition-selection",
"pe_plus_puf_native_challenger",
"--defer-native-audit",
Expand All @@ -718,6 +733,7 @@ def fake_run_policyengine_us_data_rebuild_checkpoint(**kwargs):
)
assert captured["config_overrides"]["n_synthetic"] == 100_000
assert captured["config_overrides"]["random_seed"] == 42
assert captured["soi_path"] == "/tmp/soi_targets.csv"
assert captured["defer_native_audit"] is True
assert captured["defer_imputation_ablation"] is True
stdout = capsys.readouterr().out
Expand Down
Loading
Loading