Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 295 additions & 4 deletions src/microplex_us/pipelines/check_export_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@
import argparse
import json
import sys
from dataclasses import dataclass
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any

# Path to the committed contract shipped alongside this module.
DEFAULT_CONTRACT_PATH = Path(__file__).with_name("ecps_export_contract.json")
Expand All @@ -65,6 +66,43 @@ def ok(self) -> bool:
return not self.missing_required and not self.forbidden_present


@dataclass
class ColumnSupportStats:
    """Compact support/variation summary for one exported H5 column.

    A plain record of counts; it carries no behavior of its own.
    """

    # Name of the exported column the counts describe.
    column: str
    # Support classification, "numeric" or "categorical".
    kind: str
    # Number of values in the column after flattening.
    row_count: int
    # Nonzero values among numeric columns (non-finite floats are not
    # counted); None when the column is not numeric.
    nonzero_count: int | None
    # Number of distinct values; 0 for an empty column.
    unique_count: int


@dataclass
class ColumnSupportIssue:
    """One eCPS-populated column missing equivalent MP support.

    Keeps the failed requirement next to the stats from both files so a
    report can show what the baseline had versus what the candidate had.
    """

    # Name of the required export column that failed the support check.
    column: str
    # Requirement label the candidate did not meet, either
    # "numeric_nonzero" or "categorical_variation".
    requirement: str
    # Stats measured on the eCPS baseline file.
    baseline: ColumnSupportStats
    # Stats measured on the candidate file; None when no values could be
    # read for this column from the candidate.
    candidate: ColumnSupportStats | None


@dataclass
class SupportDiff:
    """Outcome of comparing candidate column support with eCPS support."""

    # Columns the baseline populates but the candidate does not support.
    issues: list[ColumnSupportIssue]
    # Required, non-exempt columns whose values were read from the baseline.
    checked_columns: list[str]
    # Checked columns that the baseline actually populates.
    baseline_populated_columns: list[str]
    # Checked columns that are all-zero/single-valued in the baseline.
    baseline_filler_columns: list[str]
    # Required columns skipped because of an explicit exemption.
    exempt_columns: list[str]

    @property
    def ok(self) -> bool:
        """Whether the comparison recorded no support issues at all."""
        has_issues = bool(self.issues)
        return not has_issues


def compute_column_diff(
present: set[str],
*,
Expand Down Expand Up @@ -93,6 +131,171 @@ def compute_column_diff(
)


def compute_support_diff(
    candidate_h5: Path,
    *,
    baseline_h5: Path,
    period: int,
    required_columns: set[str],
    exempt_columns: frozenset[str] | set[str] = frozenset(),
) -> SupportDiff:
    """Check that the candidate populates what the eCPS baseline populates.

    A required column merely existing in the candidate is not sufficient
    for release parity. Whenever the pinned eCPS baseline *populates* a
    required exported column, the candidate (MP) has to populate it as well:

    - numeric columns: the baseline holds at least one nonzero value, so the
      candidate must hold at least one nonzero value too;
    - boolean/string/categorical columns: the baseline holds more than one
      distinct value, so the candidate must vary too.

    Columns that are all-zero/single-valued in the baseline itself count as
    fillers and place no demand on the candidate. Columns that cannot be
    read from the baseline are skipped and not counted as checked. Explicit
    exemptions are meant for known rare, computed-downstream, or
    intentionally absent variables.

    Args:
        candidate_h5: Path of the H5 file under test.
        baseline_h5: Path of the pinned eCPS baseline H5 file.
        period: Period whose dataset is read for grouped columns.
        required_columns: Column names the export contract requires.
        exempt_columns: Column names excluded from the support check.

    Returns:
        A ``SupportDiff`` listing the failing columns together with the
        checked, populated, filler, and exempt column names.
    """
    period_key = str(int(period))
    exempt_names = {str(name) for name in exempt_columns}
    checked: list[str] = []
    populated: list[str] = []
    fillers: list[str] = []
    problems: list[ColumnSupportIssue] = []

    import h5py

    with (
        h5py.File(candidate_h5, "r") as candidate_file,
        h5py.File(baseline_h5, "r") as baseline_file,
    ):
        # Sorted iteration keeps every output list in a stable order.
        for name in sorted(required_columns):
            if name in exempt_names:
                continue

            reference_values = _h5_column_values(
                baseline_file,
                name,
                period_key=period_key,
            )
            if reference_values is None:
                # Nothing in the baseline to compare the candidate against.
                continue
            checked.append(name)

            reference_stats = _support_stats(name, reference_values)
            needed = _support_requirement(reference_stats)
            if needed is None:
                fillers.append(name)
                continue
            populated.append(name)

            observed_values = _h5_column_values(
                candidate_file,
                name,
                period_key=period_key,
            )
            if observed_values is None:
                observed_stats = None
            else:
                observed_stats = _support_stats(name, observed_values)

            if _satisfies_support_requirement(
                observed_stats,
                requirement=needed,
            ):
                continue
            problems.append(
                ColumnSupportIssue(
                    column=name,
                    requirement=needed,
                    baseline=reference_stats,
                    candidate=observed_stats,
                )
            )

    return SupportDiff(
        issues=problems,
        checked_columns=checked,
        baseline_populated_columns=populated,
        baseline_filler_columns=fillers,
        exempt_columns=sorted(exempt_names.intersection(required_columns)),
    )


def _h5_column_values(
handle: Any,
column: str,
*,
period_key: str,
):
"""Return one H5 column's values, supporting grouped and flat layouts."""
if column not in handle:
return None
item = handle[column]
import h5py
import numpy as np

if isinstance(item, h5py.Group):
if period_key not in item:
return None
item = item[period_key]
if not isinstance(item, h5py.Dataset):
return None
return np.asarray(item)


def _support_stats(column: str, values) -> ColumnSupportStats:
    """Build the support summary for one exported column's values.

    The values are flattened first. The distinct-value count is always
    computed (0 for an empty column); the nonzero count is computed only for
    numeric columns and is ``None`` otherwise.
    """
    import numpy as np

    flat = np.asarray(values).reshape(-1)
    total = int(flat.size)
    distinct = int(len(np.unique(flat))) if total else 0
    kind = _support_kind(flat)

    nonzero: int | None
    if kind != "numeric":
        nonzero = None
    else:
        # Non-finite floats (NaN/inf) are dropped before counting nonzeros.
        if np.issubdtype(flat.dtype, np.floating):
            countable = flat[np.isfinite(flat)]
        else:
            countable = flat
        nonzero = int(np.count_nonzero(countable))

    return ColumnSupportStats(
        column=column,
        kind=kind,
        row_count=total,
        nonzero_count=nonzero,
        unique_count=distinct,
    )


def _support_kind(values) -> str:
"""Classify a NumPy array for support checking."""
import numpy as np

dtype = np.asarray(values).dtype
if np.issubdtype(dtype, np.bool_):
return "categorical"
if np.issubdtype(dtype, np.number):
return "numeric"
return "categorical"


def _support_requirement(stats: ColumnSupportStats) -> str | None:
"""Return the support MP must match for an eCPS column, if any."""
if stats.kind == "numeric":
return "numeric_nonzero" if (stats.nonzero_count or 0) > 0 else None
return "categorical_variation" if stats.unique_count > 1 else None


def _satisfies_support_requirement(
stats: ColumnSupportStats | None,
*,
requirement: str,
) -> bool:
"""Return whether candidate stats meet an eCPS-derived requirement."""
if stats is None:
return False
if requirement == "numeric_nonzero":
if stats.kind != "numeric":
return stats.unique_count > 1
return (stats.nonzero_count or 0) > 0
if requirement == "categorical_variation":
return stats.unique_count > 1
raise ValueError(f"Unknown support requirement: {requirement}")


def load_contract(path: Path) -> dict:
"""Load and validate the column-parity contract JSON."""
with open(path) as f:
Expand Down Expand Up @@ -188,6 +391,7 @@ def _format_report(
n_present: int,
n_required: int,
n_forbidden: int,
support_diff: SupportDiff | None = None,
) -> str:
"""Build a human-readable report for the diff."""
lines = [
Expand All @@ -203,12 +407,47 @@ def _format_report(
*_bullet_lines(diff.forbidden_present),
f" extra_unknown (informational, {len(diff.extra_unknown)}):",
*_bullet_lines(diff.extra_unknown),
"",
" RESULT: " + ("PASS" if diff.ok else "FAIL"),
]
if support_diff is not None:
lines.extend(
[
"",
" eCPS support parity:",
f" checked_columns: {len(support_diff.checked_columns)}",
f" eCPS-populated columns: {len(support_diff.baseline_populated_columns)}",
f" eCPS filler columns: {len(support_diff.baseline_filler_columns)}",
f" explicit support exemptions: {len(support_diff.exempt_columns)}",
f" unsupported_populated ({len(support_diff.issues)}):",
*_bullet_lines(
[
f"{issue.column} ({issue.requirement}; "
f"eCPS={_compact_stats(issue.baseline)}, "
f"candidate={_compact_stats(issue.candidate)})"
for issue in support_diff.issues
]
),
]
)
ok = diff.ok and (support_diff is None or support_diff.ok)
lines.extend(["", " RESULT: " + ("PASS" if ok else "FAIL")])
return "\n".join(lines)


def _compact_stats(stats: ColumnSupportStats | None) -> str:
"""Render support stats compactly for CLI output."""
if stats is None:
return "missing"
if stats.kind == "numeric":
return f"nonzero {stats.nonzero_count}/{stats.row_count}"
return f"unique {stats.unique_count}/{stats.row_count}"


def support_diff_to_dict(diff: SupportDiff) -> dict[str, Any]:
    """Convert a support diff into plain dicts and lists for JSON output.

    Nested dataclass instances are expanded recursively by
    ``dataclasses.asdict``.
    """
    return asdict(diff)


def main(argv: list[str] | None = None) -> int:
"""Run the column-parity check; return the process exit code."""
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -256,6 +495,37 @@ def main(argv: list[str] | None = None) -> int:
default=str(DEFAULT_CONTRACT_PATH),
help="Override the contract JSON (default: committed contract).",
)
parser.add_argument(
"--support-baseline",
metavar="H5",
help=(
"Pinned eCPS baseline H5. When supplied with an H5 candidate, "
"also fail if eCPS has nonzero/variant support for a required "
"exported column and the candidate is all-zero/constant."
),
)
parser.add_argument(
"--period",
type=int,
default=2024,
help="Tax year period to inspect for H5 support parity (default: 2024).",
)
parser.add_argument(
"--support-exempt-column",
action="append",
default=[],
metavar="COLUMN",
help=(
"Required export column exempt from support parity because it is "
"declared rare, computed downstream, or intentionally absent. "
"Repeat for each explicit exception."
),
)
parser.add_argument(
"--support-diagnostics-json",
metavar="FILE",
help="Optional path to write support-parity diagnostics JSON.",
)
args = parser.parse_args(argv)

selected_inputs = [
Expand All @@ -267,6 +537,8 @@ def main(argv: list[str] | None = None) -> int:
parser.error(
"provide exactly one of an H5 path, --columns-json, or --entity-tables."
)
if args.support_baseline and not args.h5path:
parser.error("--support-baseline requires an H5 candidate path.")

contract = load_contract(Path(args.contract))
required = set(contract["required"])
Expand Down Expand Up @@ -294,16 +566,35 @@ def main(argv: list[str] | None = None) -> int:
optional=optional,
excluded=excluded,
)
support_diff = None
if args.support_baseline:
support_exempt = set(contract.get("support_exemptions", [])) | set(
args.support_exempt_column
)
support_diff = compute_support_diff(
Path(args.h5path),
baseline_h5=Path(args.support_baseline),
period=int(args.period),
required_columns=required,
exempt_columns=support_exempt,
)
if args.support_diagnostics_json:
output_path = Path(args.support_diagnostics_json)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(
json.dumps(support_diff_to_dict(support_diff), indent=2) + "\n"
)
print(
_format_report(
diff,
source=source,
n_present=len(present),
n_required=len(required),
n_forbidden=len(forbidden),
support_diff=support_diff,
)
)
return 0 if diff.ok else 1
return 0 if diff.ok and (support_diff is None or support_diff.ok) else 1


if __name__ == "__main__":
Expand Down
Loading
Loading