diff --git a/src/microplex_us/data_sources/puf.py b/src/microplex_us/data_sources/puf.py index aacfae6..ed4d2b2 100644 --- a/src/microplex_us/data_sources/puf.py +++ b/src/microplex_us/data_sources/puf.py @@ -38,6 +38,10 @@ fit_grouped_share_model, predict_grouped_component_shares, ) +from microplex_us.data_sources.soi import ( + download_pe_soi_targets, + validate_pe_soi_targets_file, +) from microplex_us.pipelines.pe_native_scores import ( build_policyengine_us_data_subprocess_env, resolve_policyengine_us_data_python, @@ -466,25 +470,13 @@ def _normalize_puf_uprating_mode(mode: str | None) -> str: def _resolve_pe_soi_path( *, - policyengine_us_data_repo: str | Path | None = None, + cache_dir: str | Path | None = None, soi_path: str | Path | None = None, ) -> Path: if soi_path is not None: - resolved = Path(soi_path) - elif policyengine_us_data_repo is not None: - resolved = ( - Path(policyengine_us_data_repo) - / "policyengine_us_data" - / "storage" - / "soi.csv" - ) - else: - raise ValueError( - "PE SOI uprating requires soi_path or policyengine_us_data_repo" - ) - if not resolved.exists(): - raise FileNotFoundError(f"Could not find PE SOI file at {resolved}") - return resolved + return validate_pe_soi_targets_file(soi_path) + + return download_pe_soi_targets(cache_dir=cache_dir) def _resolve_pe_uprating_factors_path( @@ -578,14 +570,14 @@ def uprate_raw_puf_pe_style( *, from_year: int = 2015, to_year: int = 2024, - policyengine_us_data_repo: str | Path | None = None, + cache_dir: str | Path | None = None, soi_path: str | Path | None = None, ) -> pd.DataFrame: """Uprate raw PUF columns using the PE SOI growth contract.""" if from_year == to_year: return puf.copy() resolved_soi_path = _resolve_pe_soi_path( - policyengine_us_data_repo=policyengine_us_data_repo, + cache_dir=cache_dir, soi_path=soi_path, ) soi_table = _load_pe_soi_table(str(resolved_soi_path.resolve())) @@ -1817,7 +1809,7 @@ def load_puf( raw, from_year=2015, to_year=raw_uprating_year, - policyengine_us_data_repo=policyengine_us_data_repo, + cache_dir=cache_dir, soi_path=soi_path, ) @@ -1960,6 +1952,7 @@ def _build_puf_tax_units( policyengine_us_data_python: str | Path | None = None, impute_pre_tax_contributions: bool = False, pre_tax_training_year: int = 2024, + cache_dir: str | Path | None = None, soi_path: str | Path | None = None, require_pre_tax_contribution_model: bool = False, ) -> pd.DataFrame: @@ -1971,7 +1964,7 @@ def _build_puf_tax_units( raw, from_year=2015, to_year=raw_uprating_year, - policyengine_us_data_repo=policyengine_us_data_repo, + cache_dir=cache_dir, soi_path=soi_path, ) tax_units = map_puf_variables( @@ -2259,6 +2252,7 @@ def load_frame(self, query: SourceQuery | None = None) -> ObservationFrame: policyengine_us_data_python=policyengine_us_data_python, impute_pre_tax_contributions=impute_pre_tax_contributions, pre_tax_training_year=pre_tax_training_year, + cache_dir=self.cache_dir, soi_path=soi_path, require_pre_tax_contribution_model=require_pre_tax_contribution_model, ) diff --git a/src/microplex_us/data_sources/soi.py b/src/microplex_us/data_sources/soi.py new file mode 100644 index 0000000..cdbeba0 --- /dev/null +++ b/src/microplex_us/data_sources/soi.py @@ -0,0 +1,116 @@ +"""IRS SOI target-table artifact resolution for US source adapters.""" + +from __future__ import annotations + +from pathlib import Path + +import pandas as pd +import requests + +DEFAULT_CACHE_DIR = Path.home() / ".cache" / "microplex" +SOI_TARGETS_POLICYENGINE_US_DATA_REF = "f7458313c86fa580fb1e43a2f18252d67cf76e4a" +# The HF policyengine-us-data model publishes current raw inputs/target DB +# artifacts, but not this historical PE-style long target table. +SOI_TARGETS_REPO_RAW_URL = ( + "https://raw.githubusercontent.com/PolicyEngine/policyengine-us-data" +) +SOI_TARGETS_REPO_PATH = ( + "policyengine_us_data/storage/calibration_targets/soi_targets.csv" +) + +PE_SOI_TARGETS_REQUIRED_COLUMNS = frozenset( + { + "Year", + "Variable", + "Filing status", + "AGI lower bound", + "AGI upper bound", + "Count", + "Taxable only", + "Value", + } +) + + +def _cache_safe_ref(ref: str) -> str: + return "".join( + character if character.isalnum() or character in "._-" else "_" + for character in ref + ) + + +def _soi_targets_cache_filename(revision: str) -> str: + return f"soi_targets_pe_us_data_{_cache_safe_ref(revision)}.csv" + + +def pe_soi_targets_url( + revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF, +) -> str: + """Return the pinned PE-US-data raw URL for the SOI targets table.""" + + return f"{SOI_TARGETS_REPO_RAW_URL}/{revision}/{SOI_TARGETS_REPO_PATH}" + + +SOI_TARGETS_URL = pe_soi_targets_url() +SOI_TARGETS_CACHE_FILENAME = _soi_targets_cache_filename( + SOI_TARGETS_POLICYENGINE_US_DATA_REF +) + + +def validate_pe_soi_targets_file(path: str | Path) -> Path: + """Validate the PE-style long SOI target table schema.""" + + resolved = Path(path).expanduser() + if not resolved.exists(): + raise FileNotFoundError(f"Could not find PE SOI targets file at {resolved}") + + try: + columns = set(pd.read_csv(resolved, nrows=0).columns) + except Exception as exc: + raise ValueError(f"Could not read PE SOI targets file at {resolved}") from exc + + missing = sorted(PE_SOI_TARGETS_REQUIRED_COLUMNS - columns) + if missing: + raise ValueError( + "PE SOI targets file is missing required columns " + f"{missing}: {resolved}" + ) + return resolved + + +def pe_soi_targets_cache_path( + cache_dir: str | Path | None = None, + *, + revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF, +) -> Path: + """Return the cache path for the PE-style SOI targets table.""" + + resolved_cache_dir = DEFAULT_CACHE_DIR if cache_dir is None else Path(cache_dir) + resolved_cache_dir = resolved_cache_dir.expanduser() + return resolved_cache_dir / _soi_targets_cache_filename(revision) + + +def download_pe_soi_targets( + cache_dir: str | Path | None = None, + *, + force: bool = False, + revision: str = SOI_TARGETS_POLICYENGINE_US_DATA_REF, + url: str | None = None, +) -> Path: + """Resolve the PE-style SOI targets table into the microplex cache.""" + + resolved_cache_dir = DEFAULT_CACHE_DIR if cache_dir is None else Path(cache_dir) + resolved_cache_dir = resolved_cache_dir.expanduser() + resolved_cache_dir.mkdir(parents=True, exist_ok=True) + destination = pe_soi_targets_cache_path(resolved_cache_dir, revision=revision) + if destination.exists() and not force: + return validate_pe_soi_targets_file(destination) + + response = requests.get(url or pe_soi_targets_url(revision), timeout=300) + response.raise_for_status() + destination.write_bytes(response.content) + try: + return validate_pe_soi_targets_file(destination) + except Exception: + destination.unlink(missing_ok=True) + raise diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild.py b/src/microplex_us/pipelines/pe_us_data_rebuild.py index 791cb61..e0a502e 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild.py @@ -101,6 +101,7 @@ def default_policyengine_us_data_rebuild_source_providers( puf_cache_dir: str | Path | None = None, puf_path: str | Path | None = None, puf_demographics_path: str | Path | None = None, + soi_path: str | Path | None = None, puf_expand_persons: bool = True, include_donor_surveys: bool = True, include_acs: bool | None = None, @@ -141,6 +142,7 @@ def default_policyengine_us_data_rebuild_source_providers( cache_dir=puf_cache, puf_path=puf_path, demographics_path=puf_demographics_path, + soi_path=soi_path, expand_persons=bool(puf_expand_persons), uprating_mode=PUF_UPRATING_MODE_PE_SOI, cps_reference_year=( diff --git a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py index 8c7e681..488484b 100644 --- a/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py +++ b/src/microplex_us/pipelines/pe_us_data_rebuild_checkpoint.py @@ -1828,6 +1828,7 @@ def run_policyengine_us_data_rebuild_checkpoint( puf_cache_dir: str | Path | None = None, puf_path: str | Path | None = None, puf_demographics_path: str | Path | None = None, + soi_path: str | Path | None = None, puf_expand_persons: bool = True, include_donor_surveys: bool = True, include_acs: bool | None = None, @@ -1915,6 +1916,7 @@ def run_policyengine_us_data_rebuild_checkpoint( puf_cache_dir=puf_cache_dir, puf_path=puf_path, puf_demographics_path=puf_demographics_path, + soi_path=soi_path, puf_expand_persons=puf_expand_persons, include_donor_surveys=include_donor_surveys, include_acs=include_acs, @@ -2110,6 +2112,7 @@ def main(argv: list[str] | None = None) -> None: parser.add_argument("--donor-cache-dir") parser.add_argument("--puf-path") parser.add_argument("--puf-demographics-path") + parser.add_argument("--soi-path") parser.add_argument("--cps-sample-n", type=int) parser.add_argument("--puf-sample-n", type=int) parser.add_argument("--donor-sample-n", type=int) @@ -2322,6 +2325,7 @@ def main(argv: list[str] | None = None) -> None: puf_cache_dir=args.puf_cache_dir, puf_path=args.puf_path, puf_demographics_path=args.puf_demographics_path, + soi_path=args.soi_path, puf_expand_persons=not args.no_puf_expand_persons, include_donor_surveys=args.include_donor_surveys, include_acs=args.include_acs, diff --git a/tests/pipelines/test_pe_us_data_rebuild.py b/tests/pipelines/test_pe_us_data_rebuild.py index 01cf675..1279df5 100644 --- a/tests/pipelines/test_pe_us_data_rebuild.py +++ b/tests/pipelines/test_pe_us_data_rebuild.py @@ -111,6 +111,7 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund puf_target_year=2024, cps_download=False, puf_expand_persons=False, + soi_path="/tmp/soi_targets.csv", policyengine_us_data_python="/tmp/pe-python", ) @@ -124,6 +125,7 @@ def test_default_policyengine_us_data_rebuild_source_providers_use_pe_style_bund assert puf_provider.cps_reference_year == 2022 assert puf_provider.expand_persons is False assert puf_provider.uprating_mode == PUF_UPRATING_MODE_PE_SOI + assert puf_provider.soi_path == "/tmp/soi_targets.csv" assert puf_provider.policyengine_us_data_python == "/tmp/pe-python" assert puf_provider.impute_pre_tax_contributions is False assert puf_provider.require_pre_tax_contribution_model is False diff --git a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py index 6557317..858438b 100644 --- a/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py +++ b/tests/pipelines/test_pe_us_data_rebuild_checkpoint.py @@ -221,6 +221,19 @@ def test_default_policyengine_us_data_rebuild_queries_assign_sample_sizes_by_pro } +def test_default_policyengine_us_data_rebuild_source_providers_forwards_soi_path( + tmp_path, +) -> None: + soi_path = tmp_path / "soi_targets.csv" + providers = default_policyengine_us_data_rebuild_source_providers( + include_donor_surveys=False, + cps_download=False, + soi_path=soi_path, + ) + + assert getattr(providers[1], "soi_path") == soi_path + + def test_default_policyengine_us_data_rebuild_queries_derive_donor_sample_size_from_sampled_sources() -> ( None ): @@ -706,6 +719,8 @@ def fake_run_policyengine_us_data_rebuild_checkpoint(**kwargs): "/tmp/policy_data.db", "--version-id", "run-1", + "--soi-path", + "/tmp/soi_targets.csv", "--donor-imputer-condition-selection", "pe_plus_puf_native_challenger", "--defer-native-audit", @@ -718,6 +733,7 @@ def fake_run_policyengine_us_data_rebuild_checkpoint(**kwargs): ) assert captured["config_overrides"]["n_synthetic"] == 100_000 assert captured["config_overrides"]["random_seed"] == 42 + assert captured["soi_path"] == "/tmp/soi_targets.csv" assert captured["defer_native_audit"] is True assert captured["defer_imputation_ablation"] is True stdout = capsys.readouterr().out diff --git a/tests/test_puf_source_provider.py b/tests/test_puf_source_provider.py index b4d975b..ac38443 100644 --- a/tests/test_puf_source_provider.py +++ b/tests/test_puf_source_provider.py @@ -12,6 +12,7 @@ from microplex.core import EntityType, SourceArchetype, SourceProvider, SourceQuery import microplex_us.data_sources.puf as puf_module +import microplex_us.data_sources.soi as soi_module from microplex_us.data_sources import PUFSourceProvider, expand_to_persons from microplex_us.data_sources.puf import ( PUF_UPRATING_MODE_PE_SOI, @@ -334,7 +335,7 @@ def fail_grouped_loader(**_kwargs): def test_uprate_raw_puf_pe_style_matches_pe_soi_contract(tmp_path): - soi_path = tmp_path / "soi.csv" + soi_path = tmp_path / "soi_targets.csv" _write_minimal_soi_csv(soi_path) raw = pd.DataFrame( @@ -361,6 +362,124 @@ def test_uprate_raw_puf_pe_style_matches_pe_soi_contract(tmp_path): assert result["S006"].tolist() == pytest.approx([110.0, 220.0]) +def test_uprate_raw_puf_pe_style_uses_cached_soi_targets_by_default( + tmp_path, + monkeypatch, +): + cache_dir = tmp_path / "cache" + cache_dir.mkdir() + soi_path = soi_module.pe_soi_targets_cache_path(cache_dir) + _write_minimal_soi_csv(soi_path) + + def fail_get(*args, **kwargs): + raise AssertionError( + "requests.get should not be called for cached SOI targets" + ) + + monkeypatch.setattr(soi_module.requests, "get", fail_get) + + raw = pd.DataFrame( + { + "E00200": [10.0], + "S006": [100.0], + } + ) + + result = uprate_raw_puf_pe_style( + raw, + from_year=2015, + to_year=2024, + cache_dir=cache_dir, + ) + + assert result["E00200"].tolist() == pytest.approx([15.0]) + assert result["S006"].tolist() == pytest.approx([110.0]) + + +def test_download_pe_soi_targets_fetches_missing_cache(tmp_path, monkeypatch): + source = tmp_path / "source_soi_targets.csv" + _write_minimal_soi_csv(source) + + class FakeResponse: + content = source.read_bytes() + + def raise_for_status(self): + return None + + calls: list[tuple[str, int]] = [] + + def fake_get(url, *, timeout): + calls.append((url, timeout)) + return FakeResponse() + + monkeypatch.setattr(soi_module.requests, "get", fake_get) + + cache_dir = tmp_path / "cache" + resolved = soi_module.download_pe_soi_targets(cache_dir) + + assert resolved == soi_module.pe_soi_targets_cache_path(cache_dir) + assert soi_module.SOI_TARGETS_POLICYENGINE_US_DATA_REF in resolved.name + assert calls == [(soi_module.SOI_TARGETS_URL, 300)] + assert pd.read_csv(resolved)["Variable"].tolist() + + +def test_download_pe_soi_targets_revision_controls_cache_name_and_url( + tmp_path, + monkeypatch, +): + source = tmp_path / "source_soi_targets.csv" + _write_minimal_soi_csv(source) + + class FakeResponse: + content = source.read_bytes() + + def raise_for_status(self): + return None + + calls: list[tuple[str, int]] = [] + + def fake_get(url, *, timeout): + calls.append((url, timeout)) + return FakeResponse() + + monkeypatch.setattr(soi_module.requests, "get", fake_get) + + revision = "custom/ref" + cache_dir = tmp_path / "cache" + resolved = soi_module.download_pe_soi_targets( + cache_dir, + revision=revision, + ) + + assert resolved == soi_module.pe_soi_targets_cache_path( + cache_dir, + revision=revision, + ) + assert resolved.name == "soi_targets_pe_us_data_custom_ref.csv" + assert calls == [(soi_module.pe_soi_targets_url(revision), 300)] + + +def test_pe_soi_targets_paths_expand_user(tmp_path, monkeypatch): + home = tmp_path / "home" + home.mkdir() + soi_path = home / "soi_targets.csv" + _write_minimal_soi_csv(soi_path) + monkeypatch.setenv("HOME", str(home)) + + assert soi_module.validate_pe_soi_targets_file("~/soi_targets.csv") == soi_path + assert soi_module.pe_soi_targets_cache_path("~/cache") == ( + home / "cache" / soi_module.SOI_TARGETS_CACHE_FILENAME + ) + + +def test_validate_pe_soi_targets_file_rejects_bad_schema(tmp_path): + bad_path = tmp_path / "soi_targets.csv" + pd.DataFrame({"Variable": ["count"]}).to_csv(bad_path, index=False) + + with pytest.raises(ValueError, match="missing required columns"): + soi_module.validate_pe_soi_targets_file(bad_path) + + def test_uprate_mapped_puf_with_pe_factors_uses_aliases_and_recomputes(tmp_path): repo_root = tmp_path / "pe-us-data" storage = repo_root / "policyengine_us_data" / "storage" @@ -402,7 +521,7 @@ def test_puf_source_provider_pe_soi_mode_uses_raw_uprating(tmp_path): repo_root = tmp_path / "pe-us-data" storage = repo_root / "policyengine_us_data" / "storage" storage.mkdir(parents=True) - soi_path = storage / "soi.csv" + soi_path = tmp_path / "soi_targets.csv" uprating_factors_path = storage / "uprating_factors.csv" _write_minimal_soi_csv(soi_path) _write_minimal_uprating_factors_csv(uprating_factors_path) @@ -441,6 +560,58 @@ def test_puf_source_provider_pe_soi_mode_uses_raw_uprating(tmp_path): assert person["non_sch_d_capital_gains"] == pytest.approx(13.0) +def test_puf_source_provider_pe_soi_mode_does_not_require_repo_storage_soi( + tmp_path, + monkeypatch, +): + repo_root = tmp_path / "pe-us-data" + storage = repo_root / "policyengine_us_data" / "storage" + storage.mkdir(parents=True) + _write_minimal_uprating_factors_csv(storage / "uprating_factors.csv") + cache_dir = tmp_path / "cache" + cache_dir.mkdir() + _write_minimal_soi_csv(soi_module.pe_soi_targets_cache_path(cache_dir)) + + def fail_get(*args, **kwargs): + raise AssertionError( + "requests.get should not be called for cached SOI targets" + ) + + monkeypatch.setattr(soi_module.requests, "get", fail_get) + + puf = pd.DataFrame( + { + "RECID": [101], + "MARS": [1], + "XTOT": [1], + "S006": [100.0], + "E00200": [10.0], + "E01100": [5.0], + "AGE_HEAD": [45], + "GENDER": [1], + } + ) + puf_path = tmp_path / "puf.csv" + demographics_path = tmp_path / "demographics.csv" + puf.to_csv(puf_path, index=False) + pd.DataFrame({"RECID": [101]}).to_csv(demographics_path, index=False) + + provider = PUFSourceProvider( + puf_path=puf_path, + demographics_path=demographics_path, + target_year=2024, + cache_dir=cache_dir, + uprating_mode=PUF_UPRATING_MODE_PE_SOI, + policyengine_us_data_repo=repo_root, + social_security_share_model_loader=_mock_social_security_share_model_loader, + ) + frame = provider.load_frame(SourceQuery(period=2024)) + person = frame.tables[EntityType.PERSON].iloc[0] + + assert not (storage / "soi.csv").exists() + assert person["employment_income"] == pytest.approx(18.0) + + def test_expand_to_persons_splits_negative_joint_self_employment_losses(): tax_units = pd.DataFrame( {