From 070c6555f033dba40d280ed0eecfd367646bd7d0 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Mon, 1 Jun 2026 22:41:23 +0200 Subject: [PATCH 1/5] Resolve PE SOI targets from fresh data checkout --- src/microplex_us/data_sources/puf.py | 31 ++++++++++++------- .../pipelines/pe_us_data_rebuild.py | 2 ++ .../pe_us_data_rebuild_checkpoint.py | 4 +++ .../test_pe_us_data_rebuild_checkpoint.py | 13 ++++++++ tests/test_puf_source_provider.py | 25 +++++++++++++++ 5 files changed, 63 insertions(+), 12 deletions(-) diff --git a/src/microplex_us/data_sources/puf.py b/src/microplex_us/data_sources/puf.py index aacfae6..7110127 100644 --- a/src/microplex_us/data_sources/puf.py +++ b/src/microplex_us/data_sources/puf.py @@ -470,21 +470,28 @@ def _resolve_pe_soi_path( soi_path: str | Path | None = None, ) -> Path: if soi_path is not None: - resolved = Path(soi_path) - elif policyengine_us_data_repo is not None: - resolved = ( - Path(policyengine_us_data_repo) - / "policyengine_us_data" - / "storage" - / "soi.csv" - ) - else: + resolved = Path(soi_path).expanduser() + if not resolved.exists(): + raise FileNotFoundError(f"Could not find PE SOI file at {resolved}") + return resolved + if policyengine_us_data_repo is None: raise ValueError( "PE SOI uprating requires soi_path or policyengine_us_data_repo" ) - if not resolved.exists(): - raise FileNotFoundError(f"Could not find PE SOI file at {resolved}") - return resolved + storage = ( + Path(policyengine_us_data_repo).expanduser() + / "policyengine_us_data" + / "storage" + ) + candidates = ( + storage / "soi.csv", + storage / "calibration_targets" / "soi_targets.csv", + ) + for candidate in candidates: + if candidate.exists(): + return candidate + expected = ", ".join(str(candidate) for candidate in candidates) + raise FileNotFoundError(f"Could not find PE SOI file at any of: {expected}") def _resolve_pe_uprating_factors_path( diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild.py b/src/microplex_us/pipelines/pe_us_data_rebuild.py index 791cb61..e0a502e 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild.py @@ -101,6 +101,7 @@ def default_policyengine_us_data_rebuild_source_providers( puf_cache_dir: str | Path | None = None, puf_path: str | Path | None = None, puf_demographics_path: str | Path | None = None, + soi_path: str | Path | None = None, puf_expand_persons: bool = True, include_donor_surveys: bool = True, include_acs: bool | None = None, @@ -141,6 +142,7 @@ def default_policyengine_us_data_rebuild_source_providers( cache_dir=puf_cache, puf_path=puf_path, demographics_path=puf_demographics_path, + soi_path=soi_path, expand_persons=bool(puf_expand_persons), uprating_mode=PUF_UPRATING_MODE_PE_SOI, cps_reference_year=( diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py index 8c7e681..488484b 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py @@ -1828,6 +1828,7 @@ def run_policyengine_us_data_rebuild_checkpoint( puf_cache_dir: str | Path | None = None, puf_path: str | Path | None = None, puf_demographics_path: str | Path | None = None, + soi_path: str | Path | None = None, puf_expand_persons: bool = True, include_donor_surveys: bool = True, include_acs: bool | None = None, @@ -1915,6 +1916,7 @@ def run_policyengine_us_data_rebuild_checkpoint( puf_cache_dir=puf_cache_dir, puf_path=puf_path, puf_demographics_path=puf_demographics_path, + soi_path=soi_path, puf_expand_persons=puf_expand_persons, include_donor_surveys=include_donor_surveys, include_acs=include_acs, @@ -2110,6 +2112,7 @@ def main(argv: list[str] | None = None) -> None: parser.add_argument("--donor-cache-dir") parser.add_argument("--puf-path") parser.add_argument("--puf-demographics-path") + parser.add_argument("--soi-path") parser.add_argument("--cps-sample-n", type=int) parser.add_argument("--puf-sample-n", type=int) parser.add_argument("--donor-sample-n", type=int) @@ -2322,6 +2325,7 @@ def main(argv: list[str] | None = None) -> None: puf_cache_dir=args.puf_cache_dir, puf_path=args.puf_path, puf_demographics_path=args.puf_demographics_path, + soi_path=args.soi_path, puf_expand_persons=not args.no_puf_expand_persons, include_donor_surveys=args.include_donor_surveys, include_acs=args.include_acs, diff --git a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py index 6557317..6fd5432 100644 --- a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py +++ b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py @@ -221,6 +221,19 @@ def test_default_policyengine_us_data_rebuild_queries_assign_sample_sizes_by_pro } +def test_default_policyengine_us_data_rebuild_source_providers_forwards_soi_path( + tmp_path, +) -> None: + soi_path = tmp_path / "soi_targets.csv" + providers = default_policyengine_us_data_rebuild_source_providers( + include_donor_surveys=False, + cps_download=False, + soi_path=soi_path, + ) + + assert getattr(providers[1], "soi_path") == soi_path + + def test_default_policyengine_us_data_rebuild_queries_derive_donor_sample_size_from_sampled_sources() -> ( None ): diff --git a/tests/test_puf_source_provider.py b/tests/test_puf_source_provider.py index b4d975b..c78038f 100644 --- a/tests/test_puf_source_provider.py +++ b/tests/test_puf_source_provider.py @@ -361,6 +361,31 @@ def test_uprate_raw_puf_pe_style_matches_pe_soi_contract(tmp_path): assert result["S006"].tolist() == pytest.approx([110.0, 220.0]) +def test_uprate_raw_puf_pe_style_falls_back_to_calibration_soi_targets(tmp_path): + repo_root = tmp_path / "pe-us-data" + storage = repo_root / "policyengine_us_data" / "storage" + calibration_targets = storage / "calibration_targets" + calibration_targets.mkdir(parents=True) + _write_minimal_soi_csv(calibration_targets / "soi_targets.csv") + + raw = pd.DataFrame( + { + "E00200": [10.0], + "S006": [100.0], + } + ) + + result = uprate_raw_puf_pe_style( + raw, + from_year=2015, + to_year=2024, + policyengine_us_data_repo=repo_root, + ) + + assert result["E00200"].tolist() == pytest.approx([15.0]) + assert result["S006"].tolist() == pytest.approx([110.0]) + + def test_uprate_mapped_puf_with_pe_factors_uses_aliases_and_recomputes(tmp_path): repo_root = tmp_path / "pe-us-data" storage = repo_root / "policyengine_us_data" / "storage" From 7a52024bece2b6d7d312f7b3208194c53c1159e3 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Mon, 1 Jun 2026 22:59:58 +0200 Subject: [PATCH 2/5] Keep PE SOI repo path explicit --- src/microplex_us/data_sources/puf.py | 15 +++++---------- tests/test_puf_source_provider.py | 25 ------------------------- 2 files changed, 5 insertions(+), 35 deletions(-) diff --git a/src/microplex_us/data_sources/puf.py b/src/microplex_us/data_sources/puf.py index 7110127..e33a5c9 100644 --- a/src/microplex_us/data_sources/puf.py +++ b/src/microplex_us/data_sources/puf.py @@ -478,20 +478,15 @@ def _resolve_pe_soi_path( raise ValueError( "PE SOI uprating requires soi_path or policyengine_us_data_repo" ) - storage = ( + resolved = ( Path(policyengine_us_data_repo).expanduser() / "policyengine_us_data" / "storage" + / "soi.csv" ) - candidates = ( - storage / "soi.csv", - storage / "calibration_targets" / "soi_targets.csv", - ) - for candidate in candidates: - if candidate.exists(): - return candidate - expected = ", ".join(str(candidate) for candidate in candidates) - raise FileNotFoundError(f"Could not find PE SOI file at any of: {expected}") + if not resolved.exists(): + raise FileNotFoundError(f"Could not find PE SOI file at {resolved}") + return resolved def _resolve_pe_uprating_factors_path( diff --git a/tests/test_puf_source_provider.py b/tests/test_puf_source_provider.py index c78038f..b4d975b 100644 --- a/tests/test_puf_source_provider.py +++ b/tests/test_puf_source_provider.py @@ -361,31 +361,6 @@ def test_uprate_raw_puf_pe_style_matches_pe_soi_contract(tmp_path): assert result["S006"].tolist() == pytest.approx([110.0, 220.0]) -def test_uprate_raw_puf_pe_style_falls_back_to_calibration_soi_targets(tmp_path): - repo_root = tmp_path / "pe-us-data" - storage = repo_root / "policyengine_us_data" / "storage" - calibration_targets = storage / "calibration_targets" - calibration_targets.mkdir(parents=True) - _write_minimal_soi_csv(calibration_targets / "soi_targets.csv") - - raw = pd.DataFrame( - { - "E00200": [10.0], - "S006": [100.0], - } - ) - - result = uprate_raw_puf_pe_style( - raw, - from_year=2015, - to_year=2024, - policyengine_us_data_repo=repo_root, - ) - - assert result["E00200"].tolist() == pytest.approx([15.0]) - assert result["S006"].tolist() == pytest.approx([110.0]) - - def test_uprate_mapped_puf_with_pe_factors_uses_aliases_and_recomputes(tmp_path): repo_root = tmp_path / "pe-us-data" storage = repo_root / "policyengine_us_data" / "storage" From a6ee31d9c6f664d111759741463e2545b753fffe Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 2 Jun 2026 18:09:45 +0200 Subject: [PATCH 3/5] Cache PE SOI targets for PUF uprating --- src/microplex_us/data_sources/puf.py | 34 ++--- src/microplex_us/data_sources/soi.py | 102 ++++++++++++++ tests/pipelines/test_pe_us_data_rebuild.py | 2 + .../test_pe_us_data_rebuild_checkpoint.py | 3 + tests/test_puf_source_provider.py | 126 +++++++++++++++++- 5 files changed, 248 insertions(+), 19 deletions(-) create mode 100644 src/microplex_us/data_sources/soi.py diff --git a/src/microplex_us/data_sources/puf.py b/src/microplex_us/data_sources/puf.py index e33a5c9..ac07c88 100644 --- a/src/microplex_us/data_sources/puf.py +++ b/src/microplex_us/data_sources/puf.py @@ -38,6 +38,10 @@ fit_grouped_share_model, predict_grouped_component_shares, ) +from microplex_us.data_sources.soi import ( + download_pe_soi_targets, + validate_pe_soi_targets_file, +) from microplex_us.pipelines.pe_native_scores import ( build_policyengine_us_data_subprocess_env, resolve_policyengine_us_data_python, @@ -466,27 +470,17 @@ def _normalize_puf_uprating_mode(mode: str | None) -> str: def _resolve_pe_soi_path( *, + cache_dir: str | Path | None = None, policyengine_us_data_repo: str | Path | None = None, soi_path: str | Path | None = None, ) -> Path: if soi_path is not None: - resolved = Path(soi_path).expanduser() - if not resolved.exists(): - raise FileNotFoundError(f"Could not find PE SOI file at {resolved}") - return resolved - if policyengine_us_data_repo is None: - raise ValueError( - "PE SOI uprating requires soi_path or policyengine_us_data_repo" - ) - resolved = ( - Path(policyengine_us_data_repo).expanduser() - / "policyengine_us_data" - / "storage" - / "soi.csv" - ) - if not resolved.exists(): - raise FileNotFoundError(f"Could not find PE SOI file at {resolved}") - return resolved + return validate_pe_soi_targets_file(soi_path) + + # Kept in the signature for backward compatibility with older callers. + # SOI resolution is now cache-backed and does not probe PE-US-data storage. + _ = policyengine_us_data_repo + return download_pe_soi_targets(cache_dir=cache_dir) def _resolve_pe_uprating_factors_path( @@ -580,6 +574,7 @@ def uprate_raw_puf_pe_style( *, from_year: int = 2015, to_year: int = 2024, + cache_dir: str | Path | None = None, policyengine_us_data_repo: str | Path | None = None, soi_path: str | Path | None = None, ) -> pd.DataFrame: @@ -587,6 +582,7 @@ def uprate_raw_puf_pe_style( if from_year == to_year: return puf.copy() resolved_soi_path = _resolve_pe_soi_path( + cache_dir=cache_dir, policyengine_us_data_repo=policyengine_us_data_repo, soi_path=soi_path, ) @@ -1819,6 +1815,7 @@ def load_puf( raw, from_year=2015, to_year=raw_uprating_year, + cache_dir=cache_dir, policyengine_us_data_repo=policyengine_us_data_repo, soi_path=soi_path, ) @@ -1962,6 +1959,7 @@ def _build_puf_tax_units( policyengine_us_data_python: str | Path | None = None, impute_pre_tax_contributions: bool = False, pre_tax_training_year: int = 2024, + cache_dir: str | Path | None = None, soi_path: str | Path | None = None, require_pre_tax_contribution_model: bool = False, ) -> pd.DataFrame: @@ -1973,6 +1971,7 @@ def _build_puf_tax_units( raw, from_year=2015, to_year=raw_uprating_year, + cache_dir=cache_dir, policyengine_us_data_repo=policyengine_us_data_repo, soi_path=soi_path, ) @@ -2261,6 +2260,7 @@ def load_frame(self, query: SourceQuery | None = None) -> ObservationFrame: policyengine_us_data_python=policyengine_us_data_python, impute_pre_tax_contributions=impute_pre_tax_contributions, pre_tax_training_year=pre_tax_training_year, + cache_dir=self.cache_dir, soi_path=soi_path, require_pre_tax_contribution_model=require_pre_tax_contribution_model, ) diff --git a/src/microplex_us/data_sources/soi.py b/src/microplex_us/data_sources/soi.py new file mode 100644 index 0000000..e89de6c --- /dev/null +++ b/src/microplex_us/data_sources/soi.py @@ -0,0 +1,102 @@ +"""IRS SOI target-table artifact resolution for US source adapters.""" + +from __future__ import annotations + +from pathlib import Path + +import pandas as pd +import requests + +DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex" +SOI_TARGETS_POLICYENGINE_US_DATA_REF = "f7458313c86fa580fb1e43a2f18252d67cf76e4a" +SOI_TARGETS_CACHE_FILENAME = ( + f"soi_targets_pe_us_data_{SOI_TARGETS_POLICYENGINE_US_DATA_REF}.csv" +) +# The HF policyengine-us-data model publishes current raw inputs/target DB +# artifacts, but not this historical PE-style long target table. +SOI_TARGETS_URL = ( + "https://raw.githubusercontent.com/PolicyEngine/policyengine-us-data/" + f"{SOI_TARGETS_POLICYENGINE_US_DATA_REF}/" + "policyengine_us_data/storage/calibration_targets/soi_targets.csv" +) + +PE_SOI_TARGETS_REQUIRED_COLUMNS = frozenset( + { + "Year", + "Variable", + "Filing status", + "AGI lower bound", + "AGI upper bound", + "Count", + "Taxable only", + "Value", + } +) + + +def _cache_safe_ref(ref: str) -> str: + return "".join( + character if character.isalnum() or character in "._-" else "_" + for character in ref + ) + + +def _soi_targets_cache_filename(revision: str) -> str: + return f"soi_targets_pe_us_data_{_cache_safe_ref(revision)}.csv" + + +def validate_pe_soi_targets_file(path: str | Path) -> Path: + """Validate the PE-style long SOI target table schema.""" + + resolved = Path(path) + if not resolved.exists(): + raise FileNotFoundError(f"Could not find PE SOI targets file at {resolved}") + + try: + columns = set(pd.read_csv(resolved, nrows=0).columns) + except Exception as exc: + raise ValueError(f"Could not read PE SOI targets file at {resolved}") from exc + + missing = sorted(PE_SOI_TARGETS_REQUIRED_COLUMNS - columns) + if missing: + raise ValueError( + "PE SOI targets file is missing required columns " + f"{missing}: {resolved}" + ) + return resolved + + +def pe_soi_targets_cache_path( + cache_dir: str | Path | None = None, + *, + revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF, +) -> Path: + """Return the cache path for the PE-style SOI targets table.""" + + resolved_cache_dir = DEFAULT_CACHE_DIR if cache_dir is None else Path(cache_dir) + return resolved_cache_dir / _soi_targets_cache_filename(revision) + + +def download_pe_soi_targets( + cache_dir: str | Path | None = None, + *, + force: bool = False, + revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF, + url: str = SOI_TARGETS_URL, +) -> Path: + """Resolve the PE-style SOI targets table into the microplex cache.""" + + resolved_cache_dir = DEFAULT_CACHE_DIR if cache_dir is None else Path(cache_dir) + resolved_cache_dir.mkdir(parents=True, exist_ok=True) + destination = pe_soi_targets_cache_path(resolved_cache_dir, revision=revision) + if destination.exists() and not force: + return validate_pe_soi_targets_file(destination) + + response = requests.get(url, timeout=300) + response.raise_for_status() + destination.write_bytes(response.content) + try: + return validate_pe_soi_targets_file(destination) + except Exception: + destination.unlink(missing_ok=True) + raise diff --git a/tests/pipelines/test_pe_us_data_rebuild.py b/tests/pipelines/test_pe_us_data_rebuild.py index 01cf675..1279df5 100644 --- a/tests/pipelines/test_pe_us_data_rebuild.py +++ b/tests/pipelines/test_pe_us_data_rebuild.py @@ -111,6 +111,7 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund puf_target_year=2024, cps_download=False, puf_expand_persons=False, + soi_path="/tmp/soi_targets.csv", policyengine_us_data_python="/tmp/pe-python", ) @@ -124,6 +125,7 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund assert puf_provider.cps_reference_year == 2022 assert puf_provider.expand_persons is False assert puf_provider.uprating_mode == PUF_UPRATING_MODE_PE_SOI + assert puf_provider.soi_path == "/tmp/soi_targets.csv" assert puf_provider.policyengine_us_data_python == "/tmp/pe-python" assert puf_provider.impute_pre_tax_contributions is False assert puf_provider.require_pre_tax_contribution_model is False diff --git a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py index 6fd5432..858438b 100644 --- a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py +++ b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py @@ -719,6 +719,8 @@ def fake_run_policyengine_us_data_rebuild_checkpoint(**kwargs): "/tmp/policy_data.db", "--version-id", "run-1", + "--soi-path", + "/tmp/soi_targets.csv", "--donor-imputer-condition-selection", "pe_plus_puf_native_challenger", "--defer-native-audit", @@ -731,6 +733,7 @@ def fake_run_policyengine_us_data_rebuild_checkpoint(**kwargs): ) assert captured["config_overrides"]["n_synthetic"] == 100_000 assert captured["config_overrides"]["random_seed"] == 42 + assert captured["soi_path"] == "/tmp/soi_targets.csv" assert captured["defer_native_audit"] is True assert captured["defer_imputation_ablation"] is True stdout = capsys.readouterr().out diff --git a/tests/test_puf_source_provider.py b/tests/test_puf_source_provider.py index b4d975b..78f2da2 100644 --- a/tests/test_puf_source_provider.py +++ b/tests/test_puf_source_provider.py @@ -12,6 +12,7 @@ from microplex.core import EntityType, SourceArchetype, SourceProvider, SourceQuery import microplex_us.data_sources.puf as puf_module +import microplex_us.data_sources.soi as soi_module from microplex_us.data_sources import PUFSourceProvider, expand_to_persons from microplex_us.data_sources.puf import ( PUF_UPRATING_MODE_PE_SOI, @@ -334,7 +335,7 @@ def fail_grouped_loader(**_kwargs): def test_uprate_raw_puf_pe_style_matches_pe_soi_contract(tmp_path): - soi_path = tmp_path / "soi.csv" + soi_path = tmp_path / "soi_targets.csv" _write_minimal_soi_csv(soi_path) raw = pd.DataFrame( @@ -361,6 +362,75 @@ def test_uprate_raw_puf_pe_style_matches_pe_soi_contract(tmp_path): assert result["S006"].tolist() == pytest.approx([110.0, 220.0]) +def test_uprate_raw_puf_pe_style_uses_cached_soi_targets_by_default( + tmp_path, + monkeypatch, +): + cache_dir = tmp_path / "cache" + cache_dir.mkdir() + soi_path = soi_module.pe_soi_targets_cache_path(cache_dir) + _write_minimal_soi_csv(soi_path) + + def fail_get(*args, **kwargs): + raise AssertionError( + "requests.get should not be called for cached SOI targets" + ) + + monkeypatch.setattr(soi_module.requests, "get", fail_get) + + raw = pd.DataFrame( + { + "E00200": [10.0], + "S006": [100.0], + } + ) + + result = uprate_raw_puf_pe_style( + raw, + from_year=2015, + to_year=2024, + cache_dir=cache_dir, + ) + + assert result["E00200"].tolist() == pytest.approx([15.0]) + assert result["S006"].tolist() == pytest.approx([110.0]) + + +def test_download_pe_soi_targets_fetches_missing_cache(tmp_path, monkeypatch): + source = tmp_path / "source_soi_targets.csv" + _write_minimal_soi_csv(source) + + class FakeResponse: + content = source.read_bytes() + + def raise_for_status(self): + return None + + calls: list[tuple[str, int]] = [] + + def fake_get(url, *, timeout): + calls.append((url, timeout)) + return FakeResponse() + + monkeypatch.setattr(soi_module.requests, "get", fake_get) + + cache_dir = tmp_path / "cache" + resolved = soi_module.download_pe_soi_targets(cache_dir) + + assert resolved == soi_module.pe_soi_targets_cache_path(cache_dir) + assert soi_module.SOI_TARGETS_POLICYENGINE_US_DATA_REF in resolved.name + assert calls == [(soi_module.SOI_TARGETS_URL, 300)] + assert pd.read_csv(resolved)["Variable"].tolist() + + +def test_validate_pe_soi_targets_file_rejects_bad_schema(tmp_path): + bad_path = tmp_path / "soi_targets.csv" + pd.DataFrame({"Variable": ["count"]}).to_csv(bad_path, index=False) + + with pytest.raises(ValueError, match="missing required columns"): + soi_module.validate_pe_soi_targets_file(bad_path) + + def test_uprate_mapped_puf_with_pe_factors_uses_aliases_and_recomputes(tmp_path): repo_root = tmp_path / "pe-us-data" storage = repo_root / "policyengine_us_data" / "storage" @@ -402,7 +472,7 @@ def test_puf_source_provider_pe_soi_mode_uses_raw_uprating(tmp_path): repo_root = tmp_path / "pe-us-data" storage = repo_root / "policyengine_us_data" / "storage" storage.mkdir(parents=True) - soi_path = storage / "soi.csv" + soi_path = tmp_path / "soi_targets.csv" uprating_factors_path = storage / "uprating_factors.csv" _write_minimal_soi_csv(soi_path) _write_minimal_uprating_factors_csv(uprating_factors_path) @@ -441,6 +511,58 @@ def test_puf_source_provider_pe_soi_mode_uses_raw_uprating(tmp_path): assert person["non_sch_d_capital_gains"] == pytest.approx(13.0) +def test_puf_source_provider_pe_soi_mode_does_not_require_repo_storage_soi( + tmp_path, + monkeypatch, +): + repo_root = tmp_path / "pe-us-data" + storage = repo_root / "policyengine_us_data" / "storage" + storage.mkdir(parents=True) + _write_minimal_uprating_factors_csv(storage / "uprating_factors.csv") + cache_dir = tmp_path / "cache" + cache_dir.mkdir() + _write_minimal_soi_csv(soi_module.pe_soi_targets_cache_path(cache_dir)) + + def fail_get(*args, **kwargs): + raise AssertionError( + "requests.get should not be called for cached SOI targets" + ) + + monkeypatch.setattr(soi_module.requests, "get", fail_get) + + puf = pd.DataFrame( + { + "RECID": [101], + "MARS": [1], + "XTOT": [1], + "S006": [100.0], + "E00200": [10.0], + "E01100": [5.0], + "AGE_HEAD": [45], + "GENDER": [1], + } + ) + puf_path = tmp_path / "puf.csv" + demographics_path = tmp_path / "demographics.csv" + puf.to_csv(puf_path, index=False) + pd.DataFrame({"RECID": [101]}).to_csv(demographics_path, index=False) + + provider = PUFSourceProvider( + puf_path=puf_path, + demographics_path=demographics_path, + target_year=2024, + cache_dir=cache_dir, + uprating_mode=PUF_UPRATING_MODE_PE_SOI, + policyengine_us_data_repo=repo_root, + social_security_share_model_loader=_mock_social_security_share_model_loader, + ) + frame = provider.load_frame(SourceQuery(period=2024)) + person = frame.tables[EntityType.PERSON].iloc[0] + + assert not (storage / "soi.csv").exists() + assert person["employment_income"] == pytest.approx(18.0) + + def test_expand_to_persons_splits_negative_joint_self_employment_losses(): tax_units = pd.DataFrame( { From f90747e1877664adaceb9b6c6d8e38c41c39c022 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 2 Jun 2026 19:15:05 +0200 Subject: [PATCH 4/5] Align SOI target path resolution with dataset flags --- src/microplex_us/data_sources/soi.py | 32 +++++++++++++----- tests/test_puf_source_provider.py | 49 ++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/src/microplex_us/data_sources/soi.py b/src/microplex_us/data_sources/soi.py index e89de6c..cdbeba0 100644 --- a/src/microplex_us/data_sources/soi.py +++ b/src/microplex_us/data_sources/soi.py @@ -9,14 +9,12 @@ DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex" SOI_TARGETS_POLICYENGINE_US_DATA_REF = "f7458313c86fa580fb1e43a2f18252d67cf76e4a" -SOI_TARGETS_CACHE_FILENAME = ( - f"soi_targets_pe_us_data_{SOI_TARGETS_POLICYENGINE_US_DATA_REF}.csv" -) # The HF policyengine-us-data model publishes current raw inputs/target DB # artifacts, but not this historical PE-style long target table. -SOI_TARGETS_URL = ( - "https://raw.githubusercontent.com/PolicyEngine/policyengine-us-data/" - f"{SOI_TARGETS_POLICYENGINE_US_DATA_REF}/" +SOI_TARGETS_REPO_RAW_URL = ( + "https://raw.githubusercontent.com/PolicyEngine/policyengine-us-data" +) +SOI_TARGETS_REPO_PATH = ( "policyengine_us_data/storage/calibration_targets/soi_targets.csv" ) @@ -45,10 +43,24 @@ def _soi_targets_cache_filename(revision: str) -> str: return f"soi_targets_pe_us_data_{_cache_safe_ref(revision)}.csv" +def pe_soi_targets_url( + revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF, +) -> str: + """Return the pinned PE-US-data raw URL for the SOI targets table.""" + + return f"{SOI_TARGETS_REPO_RAW_URL}/{revision}/{SOI_TARGETS_REPO_PATH}" + + +SOI_TARGETS_URL = pe_soi_targets_url() +SOI_TARGETS_CACHE_FILENAME = _soi_targets_cache_filename( + SOI_TARGETS_POLICYENGINE_US_DATA_REF +) + + def validate_pe_soi_targets_file(path: str | Path) -> Path: """Validate the PE-style long SOI target table schema.""" - resolved = Path(path) + resolved = Path(path).expanduser() if not resolved.exists(): raise FileNotFoundError(f"Could not find PE SOI targets file at {resolved}") @@ -74,6 +86,7 @@ def pe_soi_targets_cache_path( """Return the cache path for the PE-style SOI targets table.""" resolved_cache_dir = DEFAULT_CACHE_DIR if cache_dir is None else Path(cache_dir) + resolved_cache_dir = resolved_cache_dir.expanduser() return resolved_cache_dir / _soi_targets_cache_filename(revision) @@ -82,17 +95,18 @@ def download_pe_soi_targets( *, force: bool = False, revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF, - url: str = SOI_TARGETS_URL, + url: str | None = None, ) -> Path: """Resolve the PE-style SOI targets table into the microplex cache.""" resolved_cache_dir = DEFAULT_CACHE_DIR if cache_dir is None else Path(cache_dir) + resolved_cache_dir = resolved_cache_dir.expanduser() resolved_cache_dir.mkdir(parents=True, exist_ok=True) destination = pe_soi_targets_cache_path(resolved_cache_dir, revision=revision) if destination.exists() and not force: return validate_pe_soi_targets_file(destination) - response = requests.get(url, timeout=300) + response = requests.get(url or pe_soi_targets_url(revision), timeout=300) response.raise_for_status() destination.write_bytes(response.content) try: diff --git a/tests/test_puf_source_provider.py b/tests/test_puf_source_provider.py index 78f2da2..ac38443 100644 --- a/tests/test_puf_source_provider.py +++ b/tests/test_puf_source_provider.py @@ -423,6 +423,55 @@ def fake_get(url, *, timeout): assert pd.read_csv(resolved)["Variable"].tolist() +def test_download_pe_soi_targets_revision_controls_cache_name_and_url( + tmp_path, + monkeypatch, +): + source = tmp_path / "source_soi_targets.csv" + _write_minimal_soi_csv(source) + + class FakeResponse: + content = source.read_bytes() + + def raise_for_status(self): + return None + + calls: list[tuple[str, int]] = [] + + def fake_get(url, *, timeout): + calls.append((url, timeout)) + return FakeResponse() + + monkeypatch.setattr(soi_module.requests, "get", fake_get) + + revision = "custom/ref" + cache_dir = tmp_path / "cache" + resolved = soi_module.download_pe_soi_targets( + cache_dir, + revision=revision, + ) + + assert resolved == soi_module.pe_soi_targets_cache_path( + cache_dir, + revision=revision, + ) + assert resolved.name == "soi_targets_pe_us_data_custom_ref.csv" + assert calls == [(soi_module.pe_soi_targets_url(revision), 300)] + + +def test_pe_soi_targets_paths_expand_user(tmp_path, monkeypatch): + home = tmp_path / "home" + home.mkdir() + soi_path = home / "soi_targets.csv" + _write_minimal_soi_csv(soi_path) + monkeypatch.setenv("HOME", str(home)) + + assert soi_module.validate_pe_soi_targets_file("~/soi_targets.csv") == soi_path + assert soi_module.pe_soi_targets_cache_path("~/cache") == ( + home / "cache" / soi_module.SOI_TARGETS_CACHE_FILENAME + ) + + def test_validate_pe_soi_targets_file_rejects_bad_schema(tmp_path): bad_path = tmp_path / "soi_targets.csv" pd.DataFrame({"Variable": ["count"]}).to_csv(bad_path, index=False) From 4f41ea747b532d331f8aa196c645e567f6bf9e1e Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 2 Jun 2026 19:34:36 +0200 Subject: [PATCH 5/5] Remove legacy SOI repo argument plumbing --- src/microplex_us/data_sources/puf.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/microplex_us/data_sources/puf.py b/src/microplex_us/data_sources/puf.py index ac07c88..ed4d2b2 100644 --- a/src/microplex_us/data_sources/puf.py +++ b/src/microplex_us/data_sources/puf.py @@ -471,15 +471,11 @@ def _normalize_puf_uprating_mode(mode: str | None) -> str: def _resolve_pe_soi_path( *, cache_dir: str | Path | None = None, - policyengine_us_data_repo: str | Path | None = None, soi_path: str | Path | None = None, ) -> Path: if soi_path is not None: return validate_pe_soi_targets_file(soi_path) - # Kept in the signature for backward compatibility with older callers. - # SOI resolution is now cache-backed and does not probe PE-US-data storage. - _ = policyengine_us_data_repo return download_pe_soi_targets(cache_dir=cache_dir) @@ -575,7 +571,6 @@ def uprate_raw_puf_pe_style( from_year: int = 2015, to_year: int = 2024, cache_dir: str | Path | None = None, - policyengine_us_data_repo: str | Path | None = None, soi_path: str | Path | None = None, ) -> pd.DataFrame: """Uprate raw PUF columns using the PE SOI growth contract.""" @@ -583,7 +578,6 @@ def uprate_raw_puf_pe_style( return puf.copy() resolved_soi_path = _resolve_pe_soi_path( cache_dir=cache_dir, - policyengine_us_data_repo=policyengine_us_data_repo, soi_path=soi_path, ) soi_table = _load_pe_soi_table(str(resolved_soi_path.resolve())) @@ -1816,7 +1810,6 @@ def load_puf( from_year=2015, to_year=raw_uprating_year, cache_dir=cache_dir, - policyengine_us_data_repo=policyengine_us_data_repo, soi_path=soi_path, ) @@ -1972,7 +1965,6 @@ def _build_puf_tax_units( from_year=2015, to_year=raw_uprating_year, cache_dir=cache_dir, - policyengine_us_data_repo=policyengine_us_data_repo, soi_path=soi_path, ) tax_units = map_puf_variables(