Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@
:undoc-members:
```

## Stage runtime writer

```{eval-rst}
.. automodule:: microplex_us.pipelines.stage_runtime
:members:
:undoc-members:
```

## Artifact helpers

```{eval-rst}
Expand Down
37 changes: 32 additions & 5 deletions docs/stage-contracts.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,35 @@ file is the machine-readable saved-run overlay for the stage taxonomy. It record
canonical stages, status for the current run, artifact paths, diagnostics owned
by each stage, and the current resume posture.

`status` is the saved-artifact readiness view: it reports whether the artifacts
for that stage are ready, incomplete, missing, metadata-only, or deferred.
`lifecycleStatus` is the runtime view: it reports whether the stage is pending,
running, complete, failed, or deferred in the current run. Keeping these fields
separate lets a failed run say both "Stage 5 failed" and "Stage 4's saved
artifact is ready for manual replay."

Each saved bundle also includes typed per-stage output manifests at
`stage_artifacts/manifests/<stage_id>.json`. These manifests are written through
`USStageRunWriter`, which validates each stage as a whole instead of updating
individual manifest keys directly. The manifest files live outside each stage's
payload directory so they do not change the content hash of reloadable stage
artifacts.

Live runs can use `USStageRuntimeWriter` to write those same per-stage manifests
incrementally. The writer exposes `start_stage`, `update`, `record_output`,
`record_diagnostic`, `complete_stage`, `fail_stage`, `defer_stage`, and
`finalize_from_artifact_manifest`. A stage can start only after the immediately
previous stage is complete unless explicit stage-input overrides are enabled.
The canonical multi-source versioned build path reserves the versioned artifact
directory before loading sources, writes Stage 1 immediately, writes Stage 2 as
source frames load, then finalizes all stage manifests against the completed
artifact manifest during save.

Other versioned convenience entry points still reconstruct their stage manifests
from the completed saved artifact manifest. They expose the same saved-run
contract files, but they do not yet produce live per-stage lifecycle updates
while the build is running.

The registry exposes two seam layers:

- `inputs` and `outputs` are structured stage resources. They identify artifact,
Expand Down Expand Up @@ -50,19 +72,24 @@ boundary artifacts where the pipeline already has stable outputs:

- Stage 4: `stage_artifacts/04_seed_scaffold/scaffold_seed_data.parquet`
- Stage 5: `seed_data.parquet` and `synthetic_data.parquet`
- Stage 6: `stage_artifacts/06_policyengine_entities/`
- Stage 6: `stage_artifacts/06_policyengine_entities/` for the pre-calibration
PolicyEngine entity-table checkpoint
- Stage 7: `calibrated_data.parquet`, `targets.json`, and
`stage_artifacts/07_calibration/calibration_summary.json`
`stage_artifacts/07_calibration/calibration_summary.json`, plus the calibrated
PolicyEngine entity-table bundle used by dataset export
- Stage 8: `policyengine_us.h5`
- Stage 9: validation and benchmark evidence artifacts

The Stage 4 artifact is the scaffold-projected seed before donor integration. It
is a diagnostic and manual replay boundary, not an automatic conditional resume
point yet.

Conditional execution is intentionally not implemented yet. The stage manifest
and artifacts are designed to make that possible later without changing the
saved-run contract again.
Conditional execution is intentionally narrow in this implementation. Stage 9
validation and benchmarking can be replayed against an existing complete Stage 8
dataset through `microplex-us-stage9-replay`. Earlier-stage conditional source
loading, donor integration, synthesis, calibration, and automatic graph
scheduling remain future work. The stage manifest and artifacts are designed to
make those routes possible later without changing the saved-run contract again.

## Artifact inventory and readiness

Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ microplex-us-pe-native-target-diagnostics = "microplex_us.pipelines.pe_native_sc
microplex-us-r2-archive-artifact = "microplex_us.pipelines.r2_artifacts:main"
microplex-us-reweight-cd-age-targets = "microplex_us.pipelines.cd_age_reweighting:main"
microplex-us-score-pe-native-loss = "microplex_us.pipelines.pe_native_scores:main"
microplex-us-stage9-replay = "microplex_us.pipelines.stage9_replay:main"
microplex-us-write-transparency-sidecars = "microplex_us.pipelines.transparency_sidecars:main"
microplex-us-version-bump-benchmark = "microplex_us.pipelines.version_benchmark:main"

Expand Down
31 changes: 22 additions & 9 deletions src/microplex_us/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,12 @@ def _exports(module: str, names: tuple[str, ...]) -> dict[str, str]:
(
"USDataFlowStageSummary",
"USStageArtifactRecord",
"USStageFailureRecord",
"USStageLifecycleStatus",
"USStageManifest",
"USStageMetric",
"USStageRecord",
"USStageRuntimeEventRecord",
"USStageStatus",
"USValidationEvidenceManifest",
"USValidationEvidenceRecord",
Expand Down Expand Up @@ -338,6 +341,8 @@ def _exports(module: str, names: tuple[str, ...]) -> dict[str, str]:
"USSourceLoadingOutputs",
"USSourcePlanningOutputs",
"USStageInputOverride",
"USStageInputValidationSettings",
"USStageInputValidator",
"USStageOutputManifest",
"USStageRunWriter",
"USValidationBenchmarkingOutputs",
Expand All @@ -347,6 +352,20 @@ def _exports(module: str, names: tuple[str, ...]) -> dict[str, str]:
"write_us_stage_run_manifests_from_artifact_manifest",
),
),
**_exports(
"microplex_us.pipelines.stage_runtime",
(
"RuntimeUpdateSection",
"USStageRuntimeWriter",
),
),
**_exports(
"microplex_us.pipelines.stage9_replay",
(
"USStage9ReplayResult",
"replay_us_stage9_validation_benchmarking",
),
),
**_exports(
"microplex_us.pipelines.summarize_pe_native_family_drilldown",
(
Expand All @@ -356,21 +375,15 @@ def _exports(module: str, names: tuple[str, ...]) -> dict[str, str]:
),
**_exports(
"microplex_us.pipelines.summarize_pe_native_regressions",
(
"summarize_us_pe_native_regressions",
),
("summarize_us_pe_native_regressions",),
),
**_exports(
"microplex_us.pipelines.summarize_policyengine_oracle_regressions",
(
"summarize_us_policyengine_oracle_regressions",
),
("summarize_us_policyengine_oracle_regressions",),
),
**_exports(
"microplex_us.pipelines.summarize_policyengine_oracle_target_drilldown",
(
"summarize_us_policyengine_oracle_target_drilldown",
),
("summarize_us_policyengine_oracle_target_drilldown",),
),
**_exports(
"microplex_us.pipelines.source_stage_parity",
Expand Down
87 changes: 87 additions & 0 deletions src/microplex_us/pipelines/artifact_dataset_assembly.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Dataset-assembly artifact helpers for saved US Microplex bundles."""

from __future__ import annotations

from pathlib import Path
from typing import Any

import pandas as pd

from microplex_us.capital_gains_lots import (
SyntheticCapitalGainsLotConfig,
generate_synthetic_capital_gains_lots,
synthetic_capital_gains_lot_metadata,
validate_capital_gains_lot_anchors,
write_capital_gains_lots_sqlite,
)
from microplex_us.pipelines.stage_contracts import (
resolve_us_stage_artifact_contract_path,
)
from microplex_us.pipelines.us import USMicroplexBuildResult


def _maybe_write_capital_gains_lot_artifact(
result: USMicroplexBuildResult,
output_dir: Path,
) -> tuple[Path | None, dict[str, Any] | None]:
if (
not result.config.capital_gains_lots_enabled
or result.policyengine_tables is None
):
return None, None
persons = result.policyengine_tables.persons
gain_column = "long_term_capital_gains_before_response"
if gain_column not in persons.columns:
return None, {
"enabled": True,
"written": False,
"reason": f"missing {gain_column}",
}

period = result.config.policyengine_dataset_year or 2024
lot_config = SyntheticCapitalGainsLotConfig(
random_seed=(
result.config.capital_gains_lots_random_seed
if result.config.capital_gains_lots_random_seed is not None
else result.config.random_seed
),
max_lots_per_person=int(result.config.capital_gains_lots_max_lots_per_person),
)
lots = generate_synthetic_capital_gains_lots(
persons,
period=period,
config=lot_config,
gain_column=gain_column,
)
validate_capital_gains_lot_anchors(persons, lots, gain_column=gain_column)
metadata = synthetic_capital_gains_lot_metadata(
lot_config,
period=period,
source_gain_column=gain_column,
)
nonzero_people = int(
pd.to_numeric(persons[gain_column], errors="coerce").fillna(0.0).ne(0.0).sum()
)
metadata.update(
{
"person_rows": int(len(persons)),
"nonzero_person_rows": nonzero_people,
"lot_rows": int(len(lots)),
}
)
path = resolve_us_stage_artifact_contract_path(
output_dir,
"08_dataset_assembly",
"capital_gains_lots",
)
write_capital_gains_lots_sqlite(lots, path, metadata=metadata)
return path, {
"enabled": True,
"written": True,
"path": path.name,
"person_rows": int(len(persons)),
"nonzero_person_rows": nonzero_people,
"lot_rows": int(len(lots)),
"source_gain_column": gain_column,
"max_lots_per_person": int(lot_config.max_lots_per_person),
}
118 changes: 118 additions & 0 deletions src/microplex_us/pipelines/artifact_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Low-level filesystem helpers for saved US Microplex artifacts."""

from __future__ import annotations

import json
from collections.abc import Mapping
from pathlib import Path
from typing import Any

import pandas as pd

from microplex_us.pipelines.stage_contracts import (
get_us_stage_artifact_contract,
resolve_us_stage_artifact_contract_path,
)
from microplex_us.pipelines.stage_run import USArtifactRef, USDiagnosticOutput


def _stage_artifact_ref(
artifact_root: str | Path,
stage_id: str,
artifact_key: str,
*,
assume_exists: bool = False,
) -> USArtifactRef:
contract = get_us_stage_artifact_contract(stage_id, artifact_key)
return USArtifactRef(
key=artifact_key,
path=resolve_us_stage_artifact_contract_path(
artifact_root,
stage_id,
artifact_key,
),
format=contract.format,
required=contract.required,
resume_role=contract.resume_role,
assume_exists=assume_exists,
)


def _stage_diagnostics(
stage_id: str,
summary: Mapping[str, Any],
) -> dict[str, USDiagnosticOutput]:
return {
"stage_summary": USDiagnosticOutput(
key="stage_summary",
description=f"Runtime diagnostic summary for {stage_id}.",
summary=dict(summary),
)
}


def _write_parquet_unless_live_artifact_exists(
path: Path,
frame: pd.DataFrame,
*,
live_artifact: bool,
) -> None:
if live_artifact and path.exists():
return
path.parent.mkdir(parents=True, exist_ok=True)
frame.to_parquet(path, index=False)


def _write_json_unless_live_artifact_exists(
path: Path,
payload: Mapping[str, Any],
*,
live_artifact: bool,
) -> None:
if live_artifact and path.exists():
return
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, sort_keys=True))


def _resolve_saved_artifact_file(
artifact_root: Path,
manifest: dict[str, Any],
artifact_key: str,
) -> Path:
artifacts = dict(manifest.get("artifacts", {}))
filename = artifacts.get(artifact_key)
if not filename:
filename = (
"targets.json" if artifact_key == "targets" else f"{artifact_key}.parquet"
)
path = Path(filename)
if not path.is_absolute():
path = artifact_root / path
if not path.exists():
raise FileNotFoundError(f"Saved artifact file not found: {path}")
return path


def _resolve_optional_saved_artifact_file(
artifact_root: Path,
manifest: dict[str, Any],
artifact_key: str,
) -> Path | None:
artifacts = dict(manifest.get("artifacts", {}))
filename = artifacts.get(artifact_key)
if not filename:
return None
path = Path(str(filename))
if not path.is_absolute():
path = artifact_root / path
if not path.exists():
raise FileNotFoundError(f"Saved optional artifact file not found: {path}")
return path


def _write_json_atomically(path: Path, payload: dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
temp_path = path.with_name(f".{path.name}.tmp")
temp_path.write_text(json.dumps(payload, indent=2, sort_keys=True))
temp_path.replace(path)
Loading
Loading