Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
863 changes: 863 additions & 0 deletions src/microplex_us/pipelines/export_lineage_manifest.py

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions src/microplex_us/pipelines/mp300k_artifact_gates.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"compatibility",
"column_contract",
"export_support",
"export_lineage",
"artifact_size",
"runtime",
"source_weight_diagnostics",
Expand Down Expand Up @@ -218,6 +219,10 @@ def build_mp300k_artifact_gate_report(
baseline_dataset=baseline_dataset,
period=period,
)
export_lineage_gate = _export_lineage_gate(
baseline_dataset=baseline_dataset,
period=period,
)
artifact_size_gate = _artifact_size_gate(
candidate_dataset,
baseline_dataset=baseline_dataset,
Expand Down Expand Up @@ -260,6 +265,7 @@ def build_mp300k_artifact_gate_report(
"compatibility": compatibility_gate,
"column_contract": column_contract_gate,
"export_support": export_support_gate,
"export_lineage": export_lineage_gate,
"artifact_size": artifact_size_gate,
"runtime": runtime_gate,
"source_weight_diagnostics": source_weight_diagnostics_gate,
Expand Down Expand Up @@ -594,6 +600,57 @@ def _export_support_gate(
)


def _export_lineage_gate(
*,
baseline_dataset: Path | None,
period: int,
) -> dict[str, Any]:
if baseline_dataset is None:
return _gate(
"unmeasured",
"pinned eCPS baseline H5 has not been attached for export-lineage comparison",
)
if not baseline_dataset.exists():
return _gate(
"fail",
"export-lineage comparison file is missing",
details={"missing": [str(baseline_dataset)]},
)

from microplex_us.pipelines.export_lineage_manifest import (
build_export_lineage_manifest,
)

manifest = build_export_lineage_manifest(
support_baseline=baseline_dataset,
period=period,
)
summary = dict(manifest["summary"])
columns = list(manifest["columns"])
default_only_columns = [
column["column"]
for column in columns
if column["export_path_status"] == "default_only"
]
details = {
"issues": manifest["issues"],
"default_only_columns": default_only_columns,
}
if manifest["issues"]:
return _gate(
"fail",
"required eCPS-populated exports lack source/code lineage",
metrics=summary,
details=details,
)
return _gate(
"pass",
"every eCPS-populated required export has source/code lineage",
metrics=summary,
details=details,
)


def _computed_policyengine_us_export_columns(columns: list[str]) -> set[str]:
try:
import policyengine_us
Expand Down
10 changes: 10 additions & 0 deletions src/microplex_us/pipelines/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -10444,6 +10444,16 @@ def has_any(*columns: str) -> bool:
"pre_subsidy_rent",
"rent",
).clip(lower=0.0)
if has_any(
"weekly_hours_worked_before_lsr",
"hours_worked_last_week",
"hours_worked",
):
result["weekly_hours_worked_before_lsr"] = first_nonzero_or_present(
"weekly_hours_worked_before_lsr",
"hours_worked_last_week",
"hours_worked",
).clip(lower=0.0)

marital_status = (
pd.to_numeric(result["marital_status"], errors="coerce")
Expand Down
16 changes: 16 additions & 0 deletions src/microplex_us/policyengine/us.py
Original file line number Diff line number Diff line change
Expand Up @@ -3610,6 +3610,22 @@ def _with_policyengine_person_export_derivatives(
person_table["hours_worked"],
errors="coerce",
).fillna(0.0)
if (
"weekly_hours_worked_before_lsr" not in person_table.columns
and "hours_worked_last_week" in person_table.columns
):
person_table["weekly_hours_worked_before_lsr"] = pd.to_numeric(
person_table["hours_worked_last_week"],
errors="coerce",
).fillna(0.0)
elif (
"weekly_hours_worked_before_lsr" not in person_table.columns
and "hours_worked" in person_table.columns
):
person_table["weekly_hours_worked_before_lsr"] = pd.to_numeric(
person_table["hours_worked"],
errors="coerce",
).fillna(0.0)
if "has_tin" not in person_table.columns:
person_table["has_tin"] = _derive_has_tin_for_export(person_table)
if "has_itin" not in person_table.columns:
Expand Down
70 changes: 70 additions & 0 deletions tests/pipelines/test_export_lineage_manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import json

import h5py
import numpy as np

from microplex_us.pipelines.export_lineage_manifest import (
build_export_lineage_manifest,
)


def _columns_by_name(payload):
return {column["column"]: column for column in payload["columns"]}


def test_export_lineage_manifest_tracks_source_backed_blocks():
payload = build_export_lineage_manifest()
columns = _columns_by_name(payload)

for column in (
"business_is_sstb",
"home_mortgage_interest",
"reported_has_medicaid_health_coverage_at_interview",
"ssn_card_type",
"weekly_hours_worked_before_lsr",
):
assert columns[column]["has_source_lineage"]

assert (
columns["selected_marketplace_plan_benchmark_ratio"]["export_path_status"]
== "default_only"
)
assert not columns["selected_marketplace_plan_benchmark_ratio"][
"has_source_lineage"
]


def test_export_lineage_manifest_flags_populated_ecps_default_only_column(tmp_path):
contract_path = tmp_path / "contract.json"
contract_path.write_text(
json.dumps(
{
"required": [
"selected_marketplace_plan_benchmark_ratio",
"weekly_hours_worked_before_lsr",
],
"forbidden": [],
}
)
)
baseline_path = tmp_path / "baseline.h5"
with h5py.File(baseline_path, "w") as handle:
selected = handle.create_group("selected_marketplace_plan_benchmark_ratio")
selected.create_dataset("2024", data=np.array([0.8, 1.0]))
weekly_hours = handle.create_group("weekly_hours_worked_before_lsr")
weekly_hours.create_dataset("2024", data=np.array([0.0, 40.0]))

payload = build_export_lineage_manifest(
contract_path=contract_path,
support_baseline=baseline_path,
)

issues = {issue["column"]: issue for issue in payload["issues"]}
assert issues == {
"selected_marketplace_plan_benchmark_ratio": {
"column": "selected_marketplace_plan_benchmark_ratio",
"ecps_support_requirement": "numeric_nonzero",
"export_path_status": "default_only",
"issue": "ecps_populated_export_has_no_source_lineage",
}
}
49 changes: 49 additions & 0 deletions tests/pipelines/test_mp300k_artifact_gates.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def test_write_mp300k_artifact_gate_report_passes_with_all_evidence(tmp_path):
assert record["gates"]["compatibility"]["metrics"]["person_count"] == 3
assert record["gates"]["column_contract"]["status"] == "pass"
assert record["gates"]["export_support"]["status"] == "pass"
assert record["gates"]["export_lineage"]["status"] == "pass"
assert record["gates"]["artifact_size"]["status"] == "pass"
assert record["gates"]["ecps_comparison"]["status"] == "pass"
assert record["gates"]["arch_target_coverage"]["status"] == "pass"
Expand Down Expand Up @@ -428,6 +429,54 @@ def test_export_support_gate_ignores_ecps_filler_columns(tmp_path):
assert support_gate["metrics"]["ecps_filler_export_column_count"] == 1


def test_export_lineage_gate_rejects_ecps_populated_default_only_column(tmp_path):
artifact_dir = tmp_path / "artifact"
artifact_dir.mkdir()
candidate_dataset = _write_minimal_policyengine_dataset(
artifact_dir / "candidate.h5"
)
_add_period_dataset(
candidate_dataset,
"selected_marketplace_plan_benchmark_ratio",
[1.0, 1.0],
)
baseline_dataset = _write_minimal_policyengine_dataset(tmp_path / "baseline.h5")
_add_period_dataset(
baseline_dataset,
"selected_marketplace_plan_benchmark_ratio",
[0.8, 1.2],
)
benchmark_manifest = tmp_path / "benchmark_manifest.json"
_write_benchmark_manifest(benchmark_manifest)
_write_artifact_manifest(artifact_dir, baseline_dataset=baseline_dataset)

report_path = write_mp300k_artifact_gate_report(
artifact_dir,
ecps_comparison_payload=_sound_ecps_comparison_payload(),
arch_coverage_payload=_arch_coverage_payload(),
runtime_smoke_payload={"runtime_ratio": 1.0},
benchmark_manifest_path=benchmark_manifest,
compute_native_scores=False,
update_manifest=False,
)

record = json.loads(report_path.read_text())
support_gate = record["gates"]["export_support"]
lineage_gate = record["gates"]["export_lineage"]

assert record["summary"]["status"] == "failed"
assert support_gate["status"] == "pass"
assert lineage_gate["status"] == "fail"
assert lineage_gate["details"]["issues"] == [
{
"column": "selected_marketplace_plan_benchmark_ratio",
"ecps_support_requirement": "numeric_nonzero",
"export_path_status": "default_only",
"issue": "ecps_populated_export_has_no_source_lineage",
}
]


def test_column_contract_gate_rejects_extra_candidate_columns(tmp_path):
artifact_dir = tmp_path / "artifact"
artifact_dir.mkdir()
Expand Down
2 changes: 2 additions & 0 deletions tests/pipelines/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -5365,6 +5365,7 @@ def test_augment_policyengine_person_inputs_materializes_export_support_aliases(
"deductible_mortgage_interest": [900.0, 0.0],
"investment_income_elected_form_4952": [40.0, 0.0],
"health_insurance_premiums_without_medicare_part_b": [120.0, 0.0],
"hours_worked": [37.5, 0.0],
}
)

Expand Down Expand Up @@ -5401,6 +5402,7 @@ def test_augment_policyengine_person_inputs_materializes_export_support_aliases(
assert augmented["home_mortgage_interest"].tolist() == [900.0, 0.0]
assert augmented["investment_interest_expense"].tolist() == [40.0, 0.0]
assert augmented["other_health_insurance_premiums"].tolist() == [120.0, 0.0]
assert augmented["weekly_hours_worked_before_lsr"].tolist() == [37.5, 0.0]

def test_augment_policyengine_person_inputs_coalesces_sparse_source_aliases_by_row(
self,
Expand Down
6 changes: 6 additions & 0 deletions tests/policyengine/test_us.py
Original file line number Diff line number Diff line change
Expand Up @@ -2797,6 +2797,7 @@ class FakeSystem:
"hours_worked_last_week": FakeVariable("person"),
"in_nyc": FakeVariable("household", formulas={"2024": object()}),
"meets_ssi_disability_criteria": FakeVariable("person"),
"weekly_hours_worked_before_lsr": FakeVariable("person"),
}

tables = PolicyEngineUSEntityTableBundle(
Expand Down Expand Up @@ -2849,6 +2850,10 @@ class FakeSystem:
assert arrays["has_tin"]["2024"].tolist() == [False, True]
assert arrays["has_itin"]["2024"].tolist() == [False, True]
assert arrays["hours_worked_last_week"]["2024"].tolist() == [0.0, 50.0]
assert arrays["weekly_hours_worked_before_lsr"]["2024"].tolist() == [
0.0,
50.0,
]
assert arrays["meets_ssi_disability_criteria"]["2024"].tolist() == [
True,
False,
Expand All @@ -2864,6 +2869,7 @@ class FakeSystem:
"hours_worked_last_week",
"in_nyc",
"meets_ssi_disability_criteria",
"weekly_hours_worked_before_lsr",
}.issubset(columns)

def test_projects_frame_and_writes_time_period_dataset(self, tmp_path):
Expand Down
Loading