diff --git a/src/microplex_us/pipelines/ecps_replacement_comparison.py b/src/microplex_us/pipelines/ecps_replacement_comparison.py index 0183652..4aa3eac 100644 --- a/src/microplex_us/pipelines/ecps_replacement_comparison.py +++ b/src/microplex_us/pipelines/ecps_replacement_comparison.py @@ -14,6 +14,14 @@ import h5py import numpy as np +from microplex_us.pipelines.pe_native_loss import ( + classify_pe_native_target_family, + loss_arrays_from_inputs, + pe_native_huber_loss, + pe_native_huber_loss_terms, + pe_native_relative_error, + subset_loss_arrays, +) from microplex_us.pipelines.pe_native_optimization import ( _PE_NATIVE_BROAD_MATRIX_SCRIPT, optimize_pe_native_loss_weights, @@ -29,9 +37,6 @@ from microplex_us.pipelines.performance import ( _write_matched_policyengine_us_baseline_dataset, ) -from microplex_us.pipelines.summarize_pe_native_family_drilldown import ( - classify_pe_native_target_family, -) _PROTECTED_TARGET_PATTERNS: dict[str, tuple[str, ...]] = { "ssi": ("ssi", "supplemental_security_income"), @@ -66,6 +71,7 @@ def build_sound_ecps_replacement_comparison( policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, skip_tax_expenditure_targets: bool = False, + target_scope: str = "all", exact_rescore: bool = False, force: bool = False, ) -> dict[str, Any]: @@ -132,7 +138,17 @@ def build_sound_ecps_replacement_comparison( policyengine_us_data_python=policyengine_us_data_python, skip_tax_expenditure_targets=skip_tax_expenditure_targets, ) + candidate_inputs = _filter_loss_inputs_by_scope( + candidate_inputs, + target_scope=target_scope, + ) + baseline_inputs = _filter_loss_inputs_by_scope( + baseline_inputs, + target_scope=target_scope, + ) target_names = _validate_common_targets(candidate_inputs, baseline_inputs) + if exact_rescore and target_scope != "all": + raise ValueError("exact_rescore is only supported for target_scope='all'") holdout_mask = _build_holdout_target_mask( target_names, fraction=holdout_target_fraction, @@ -140,7 +156,7 @@ def build_sound_ecps_replacement_comparison( ) refit_config = { - "method": "deterministic_pe_native_projected_gradient", + "method": "monotone_accelerated_projected_gradient", "lambda_l0": 0.0, "lambda_l2": 0.0, "use_gates": False, @@ -287,6 +303,7 @@ def build_sound_ecps_replacement_comparison( "ecps_refit_recovery_passed": ecps_refit_recovery_passed, "holdout_target_fraction": float(holdout_target_fraction), "holdout_targets": int(holdout_mask.sum()), + "target_scope_filter": target_scope, "protected_family_losses": protected_family_losses, "target_diagnostics": target_diagnostics["summary"], "support_audit": support_audit_summary, @@ -313,6 +330,7 @@ def build_sound_ecps_replacement_comparison( "ecps_refit_recovery_passed": ecps_refit_recovery_passed, "holdout_target_fraction": float(holdout_target_fraction), "holdout_targets": int(holdout_mask.sum()), + "target_scope_filter": target_scope, "protected_family_losses": protected_family_losses, }, "entity_structure": { @@ -350,6 +368,7 @@ def build_sound_ecps_replacement_comparison( "target_split": { "holdout_target_fraction": float(holdout_target_fraction), "holdout_target_seed": int(holdout_target_seed), + "target_scope_filter": target_scope, "train_targets": int((~holdout_mask).sum()), "holdout_targets": int(holdout_mask.sum()), "holdout_target_names": [ @@ -646,12 +665,108 @@ def _extract_pe_native_loss_inputs( prefix.with_suffix(".target_unscaled.npy") ), "scaling": _load_optional_array(prefix.with_suffix(".scaling.npy")), + "loss_denominator": _load_optional_array( + prefix.with_suffix(".loss_denominator.npy") + ), + "loss_target_weight": _load_optional_array( + prefix.with_suffix(".loss_target_weight.npy") + ), + "loss_bucket": _load_optional_array( + prefix.with_suffix(".loss_bucket.npy"), + allow_pickle=True, + ), + "loss_unit": _load_optional_array( + prefix.with_suffix(".loss_unit.npy"), + allow_pickle=True, + ), + "loss_scope": _load_optional_array( + prefix.with_suffix(".loss_scope.npy"), + allow_pickle=True, + ), + "loss_family": _load_optional_array( + prefix.with_suffix(".loss_family.npy"), + allow_pickle=True, + ), + "loss_epsilon": _load_optional_array( + prefix.with_suffix(".loss_epsilon.npy") + ), "metadata": json.loads(prefix.with_suffix(".meta.json").read_text()), } -def _load_optional_array(path: Path) -> np.ndarray | None: - return np.load(path) if path.exists() else None +def _load_optional_array( + path: Path, *, allow_pickle: bool = False +) -> np.ndarray | None: + return np.load(path, allow_pickle=allow_pickle) if path.exists() else None + + +def _filter_loss_inputs_by_scope( + loss_inputs: dict[str, Any], + *, + target_scope: str, +) -> dict[str, Any]: + if target_scope not in {"all", "national", "state"}: + raise ValueError("target_scope must be one of all, national, or state") + if target_scope == "all": + return loss_inputs + + metadata = dict(loss_inputs["metadata"]) + target_names = np.asarray(metadata.get("target_names", ()), dtype=object) + if target_names.size == 0: + raise ValueError("PE-native loss inputs do not include target names") + + scope = loss_inputs.get("loss_scope") + if scope is not None: + keep_mask = np.asarray(scope, dtype=object) == target_scope + elif target_scope == "national": + keep_mask = np.asarray( + [str(name).startswith("nation/") for name in target_names], + dtype=bool, + ) + else: + keep_mask = np.asarray( + [not str(name).startswith("nation/") for name in target_names], + dtype=bool, + ) + if not bool(keep_mask.any()): + raise ValueError(f"target_scope={target_scope!r} selected no targets") + + filtered = dict(loss_inputs) + filtered["scaled_matrix"] = np.asarray(loss_inputs["scaled_matrix"])[:, keep_mask] + for key in ( + "scaled_target", + "unscaled_target", + "scaling", + "loss_denominator", + "loss_target_weight", + "loss_bucket", + "loss_unit", + "loss_scope", + "loss_family", + "loss_epsilon", + ): + value = loss_inputs.get(key) + if value is not None: + filtered[key] = np.asarray(value)[keep_mask] + + filtered_names = target_names[keep_mask].tolist() + metadata["target_names"] = filtered_names + metadata["target_scope_filter"] = target_scope + metadata["n_targets_scope_filtered_from"] = int(target_names.size) + metadata["n_targets_kept"] = int(len(filtered_names)) + metadata["n_national_targets"] = int( + sum(str(name).startswith("nation/") for name in filtered_names) + ) + metadata["n_state_targets"] = int( + len(filtered_names) - metadata["n_national_targets"] + ) + sidecar_rows = metadata.get("target_loss_metadata") + if isinstance(sidecar_rows, list) and len(sidecar_rows) == target_names.size: + metadata["target_loss_metadata"] = [ + row for row, keep in zip(sidecar_rows, keep_mask, strict=True) if keep + ] + filtered["metadata"] = metadata + return filtered def _validate_common_targets( @@ -666,6 +781,24 @@ def _validate_common_targets( baseline_target = np.asarray(baseline_inputs["scaled_target"], dtype=np.float64) if not np.allclose(candidate_target, baseline_target): raise ValueError("candidate and baseline PE-native scaled targets differ") + for key in ( + "loss_denominator", + "loss_target_weight", + "loss_epsilon", + ): + left = candidate_inputs.get(key) + right = baseline_inputs.get(key) + if left is None and right is None: + continue + if left is None or right is None or not np.allclose(left, right): + raise ValueError(f"candidate and baseline PE-native {key} differ") + for key in ("loss_bucket", "loss_unit", "loss_scope", "loss_family"): + left = candidate_inputs.get(key) + right = baseline_inputs.get(key) + if left is None and right is None: + continue + if left is None or right is None or not np.array_equal(left, right): + raise ValueError(f"candidate and baseline PE-native {key} differ") return candidate_names @@ -709,7 +842,16 @@ def _fit_dense_refit( matrix = np.asarray(loss_inputs["scaled_matrix"], dtype=np.float64) target = np.asarray(loss_inputs["scaled_target"], dtype=np.float64) initial_weights = np.asarray(loss_inputs["initial_weights"], dtype=np.float64) + loss_arrays = loss_arrays_from_inputs(loss_inputs) train_mask = ~holdout_mask + train_loss_arrays = ( + subset_loss_arrays(loss_arrays, train_mask) if loss_arrays is not None else None + ) + holdout_loss_arrays = ( + subset_loss_arrays(loss_arrays, holdout_mask) + if loss_arrays is not None + else None + ) loss_curve: list[dict[str, float | int]] = [] def record_loss_curve( @@ -721,16 +863,23 @@ def record_loss_curve( { "iteration": int(iteration), "objective_train_loss": float(objective_loss), - "full_loss": _objective(matrix, target, weights), + "full_loss": _objective( + matrix, + target, + weights, + loss_arrays=loss_arrays, + ), "train_loss": _objective( matrix[:, train_mask], target[train_mask], weights, + loss_arrays=train_loss_arrays, ), "holdout_loss": _objective( matrix[:, holdout_mask], target[holdout_mask], weights, + loss_arrays=holdout_loss_arrays, ), "weight_sum": float(weights.sum()), "positive_household_count": int((weights > 1e-9).sum()), @@ -741,6 +890,7 @@ def record_loss_curve( scaled_matrix=matrix[:, train_mask], scaled_target=target[train_mask], initial_weights=initial_weights, + loss_arrays=train_loss_arrays, budget=None, max_iter=max_iter, l2_penalty=0.0, @@ -756,27 +906,41 @@ def record_loss_curve( return { "input_dataset": str(input_dataset_path.resolve()), "output_dataset": str(output_dataset_path.resolve()), - "initial_full_loss": _objective(matrix, target, initial_weights), - "optimized_full_loss": _objective(matrix, target, optimized_weights), + "initial_full_loss": _objective( + matrix, + target, + initial_weights, + loss_arrays=loss_arrays, + ), + "optimized_full_loss": _objective( + matrix, + target, + optimized_weights, + loss_arrays=loss_arrays, + ), "initial_train_loss": _objective( matrix[:, train_mask], target[train_mask], initial_weights, + loss_arrays=train_loss_arrays, ), "optimized_train_loss": _objective( matrix[:, train_mask], target[train_mask], optimized_weights, + loss_arrays=train_loss_arrays, ), "initial_holdout_loss": _objective( matrix[:, holdout_mask], target[holdout_mask], initial_weights, + loss_arrays=holdout_loss_arrays, ), "optimized_holdout_loss": _objective( matrix[:, holdout_mask], target[holdout_mask], optimized_weights, + loss_arrays=holdout_loss_arrays, ), "initial_weight_sum": float(initial_weights.sum()), "optimized_weight_sum": float(optimized_weights.sum()), @@ -788,8 +952,17 @@ def record_loss_curve( } -def _objective(matrix: np.ndarray, target: np.ndarray, weights: np.ndarray) -> float: - residual = matrix.T @ weights - target +def _objective( + matrix: np.ndarray, + target: np.ndarray, + weights: np.ndarray, + *, + loss_arrays: Any | None = None, +) -> float: + estimate = matrix.T @ weights + if loss_arrays is not None: + return pe_native_huber_loss(estimate, loss_arrays) + residual = estimate - target return float(np.dot(residual, residual)) @@ -850,6 +1023,12 @@ def _target_loss_diagnostics( raise ValueError("candidate and baseline target diagnostic scales differ") if not np.allclose(candidate_values["target"], baseline_values["target"]): raise ValueError("candidate and baseline target diagnostic values differ") + for key in ("loss_denominator", "loss_target_weight"): + if not np.allclose(candidate_values[key], baseline_values[key]): + raise ValueError(f"candidate and baseline target diagnostic {key} differ") + for key in ("loss_bucket", "loss_unit", "loss_scope"): + if not np.array_equal(candidate_values[key], baseline_values[key]): + raise ValueError(f"candidate and baseline target diagnostic {key} differ") if candidate_terms.shape != baseline_terms.shape: raise ValueError("candidate and baseline target loss term shapes differ") if len(target_names) != candidate_terms.shape[0]: @@ -861,6 +1040,8 @@ def _target_loss_diagnostics( candidate_wins = 0 baseline_wins = 0 ties = 0 + candidate_loss_total = float(candidate_terms.sum()) + baseline_loss_total = float(baseline_terms.sum()) for index, target_name in enumerate(target_names): candidate_loss = float(candidate_terms[index]) baseline_loss = float(baseline_terms[index]) @@ -879,6 +1060,14 @@ def _target_loss_diagnostics( "target_index": int(index), "target_name": str(target_name), "family": classify_pe_native_target_family(target_name), + "loss_scope": str(candidate_values["loss_scope"][index]), + "loss_unit": str(candidate_values["loss_unit"][index]), + "loss_bucket": str(candidate_values["loss_bucket"][index]), + "loss_denominator": float(candidate_values["loss_denominator"][index]), + "loss_target_weight": float( + candidate_values["loss_target_weight"][index] + ), + "loss_epsilon": float(candidate_values["loss_epsilon"][index]), "split": "holdout" if bool(holdout_mask[index]) else "train", "value_scale": str(candidate_values["value_scale"][index]), "target_value": float(candidate_values["target"][index]), @@ -894,6 +1083,16 @@ def _target_loss_diagnostics( ), "candidate_loss_term": candidate_loss, "baseline_loss_term": baseline_loss, + "candidate_loss_share": ( + candidate_loss / candidate_loss_total + if candidate_loss_total > 0.0 + else 0.0 + ), + "baseline_loss_share": ( + baseline_loss / baseline_loss_total + if baseline_loss_total > 0.0 + else 0.0 + ), "loss_delta": float(loss_delta), "candidate_abs_scaled_error": float(np.sqrt(candidate_loss)), "baseline_abs_scaled_error": float(np.sqrt(baseline_loss)), @@ -910,9 +1109,19 @@ def _target_loss_diagnostics( improvements = sorted(rows, key=lambda row: float(row["loss_delta"]))[:top_k] summary = { "n_targets": int(len(rows)), - "candidate_loss": float(candidate_terms.sum()), - "baseline_loss": float(baseline_terms.sum()), - "loss_delta": float(candidate_terms.sum() - baseline_terms.sum()), + "candidate_loss": candidate_loss_total, + "baseline_loss": baseline_loss_total, + "loss_delta": float(candidate_loss_total - baseline_loss_total), + "candidate_max_single_target_loss_share": ( + float(candidate_terms.max() / candidate_loss_total) + if candidate_loss_total > 0.0 and candidate_terms.size + else 0.0 + ), + "baseline_max_single_target_loss_share": ( + float(baseline_terms.max() / baseline_loss_total) + if baseline_loss_total > 0.0 and baseline_terms.size + else 0.0 + ), "candidate_wins": int(candidate_wins), "baseline_wins": int(baseline_wins), "ties": int(ties), @@ -925,6 +1134,7 @@ def _target_loss_diagnostics( "metric": "sound_ecps_target_loss_diagnostics", "summary": summary, "family_breakdown": _target_family_breakdown(rows, len(rows)), + "bucket_breakdown": _target_bucket_breakdown(rows), "top_regressions": regressions, "top_improvements": improvements, "targets": rows, @@ -946,6 +1156,12 @@ def _refit_matrix_score_summary( baseline_msre = _diagnostic_unweighted_msre(target_diagnostics, "baseline") candidate_metadata = dict(candidate_inputs.get("metadata") or {}) baseline_metadata = dict(baseline_inputs.get("metadata") or {}) + loss_metric = str( + candidate_metadata.get( + "loss_metric", + baseline_metadata.get("loss_metric", "enhanced_cps_native_loss"), + ) + ) n_targets_kept = int( candidate_metadata.get( "n_targets_kept", @@ -953,6 +1169,11 @@ def _refit_matrix_score_summary( ) ) summary: dict[str, Any] = { + "loss_metric": loss_metric, + "loss_config": candidate_metadata.get( + "loss_config", + baseline_metadata.get("loss_config"), + ), "candidate_enhanced_cps_native_loss": candidate_loss, "baseline_enhanced_cps_native_loss": baseline_loss, "enhanced_cps_native_loss_delta": candidate_loss - baseline_loss, @@ -962,6 +1183,12 @@ def _refit_matrix_score_summary( "unweighted_msre_delta": candidate_msre - baseline_msre, "n_targets_kept": n_targets_kept, "score_source": "refit_loss_matrix", + "candidate_max_single_target_loss_share": target_diagnostics["summary"].get( + "candidate_max_single_target_loss_share" + ), + "baseline_max_single_target_loss_share": target_diagnostics["summary"].get( + "baseline_max_single_target_loss_share" + ), } for key in ( "n_targets_total", @@ -987,7 +1214,7 @@ def _refit_matrix_score_payload( ) -> dict[str, Any]: family_breakdown = list(target_diagnostics.get("family_breakdown") or ()) return { - "metric": "enhanced_cps_native_loss", + "metric": str(summary.get("loss_metric") or "enhanced_cps_native_loss"), "score_source": "refit_loss_matrix", "period": int(period), "candidate_dataset": str(candidate_dataset_path.resolve()), @@ -1023,6 +1250,25 @@ def _target_value_diagnostics( matrix = np.asarray(loss_inputs["scaled_matrix"], dtype=np.float64) scaled_target = np.asarray(loss_inputs["scaled_target"], dtype=np.float64) scaled_estimate = matrix.T @ weights + loss_arrays = loss_arrays_from_inputs(loss_inputs) + if loss_arrays is not None: + target = loss_arrays.target_values.astype(np.float64, copy=True) + estimate = scaled_estimate.astype(np.float64, copy=True) + error = estimate - loss_arrays.objective_target + return { + "value_scale": np.full(target.shape, "native", dtype=object), + "target": target, + "estimate": estimate, + "error": error, + "relative_error": pe_native_relative_error(estimate, loss_arrays), + "loss_denominator": loss_arrays.denominator, + "loss_target_weight": loss_arrays.target_weight, + "loss_bucket": loss_arrays.bucket_keys, + "loss_unit": loss_arrays.unit_keys, + "loss_scope": loss_arrays.scope_keys, + "loss_family": loss_arrays.family_keys, + "loss_epsilon": loss_arrays.epsilon, + } unscaled_target = loss_inputs.get("unscaled_target") scaling = loss_inputs.get("scaling") target = scaled_target.astype(np.float64, copy=True) @@ -1051,6 +1297,13 @@ def _target_value_diagnostics( "estimate": estimate, "error": error, "relative_error": relative_error, + "loss_denominator": np.abs(target) + 1.0, + "loss_target_weight": np.ones(target.shape, dtype=np.float64), + "loss_bucket": np.full(target.shape, "legacy", dtype=object), + "loss_unit": np.full(target.shape, "legacy", dtype=object), + "loss_scope": np.full(target.shape, "legacy", dtype=object), + "loss_family": np.full(target.shape, "legacy", dtype=object), + "loss_epsilon": np.ones(target.shape, dtype=np.float64), } @@ -1090,6 +1343,37 @@ def _target_family_breakdown( ) +def _target_bucket_breakdown(target_rows: list[dict[str, Any]]) -> list[dict[str, Any]]: + buckets: dict[str, list[dict[str, Any]]] = {} + for row in target_rows: + buckets.setdefault(str(row["loss_bucket"]), []).append(row) + breakdown = [] + for bucket, rows in sorted(buckets.items()): + candidate_loss = sum(float(row["candidate_loss_term"]) for row in rows) + baseline_loss = sum(float(row["baseline_loss_term"]) for row in rows) + breakdown.append( + { + "bucket": bucket, + "scope": str(rows[0]["loss_scope"]), + "unit": str(rows[0]["loss_unit"]), + "n_targets": int(len(rows)), + "train_targets": int(sum(1 for row in rows if row["split"] == "train")), + "holdout_targets": int( + sum(1 for row in rows if row["split"] == "holdout") + ), + "candidate_loss_contribution": float(candidate_loss), + "baseline_loss_contribution": float(baseline_loss), + "loss_delta": float(candidate_loss - baseline_loss), + "candidate_target_weight_sum": float( + sum(float(row["loss_target_weight"]) for row in rows) + ), + } + ) + return sorted( + breakdown, key=lambda row: abs(float(row["loss_delta"])), reverse=True + ) + + def _support_audit_summary(support_audit: dict[str, Any]) -> dict[str, Any]: comparisons = dict(support_audit.get("comparisons") or {}) critical_rows = list(comparisons.get("critical_input_support") or ()) @@ -1143,7 +1427,11 @@ def _sort_rows_by_abs_delta( def _loss_terms(loss_inputs: dict[str, Any], weights: np.ndarray) -> np.ndarray: matrix = np.asarray(loss_inputs["scaled_matrix"], dtype=np.float64) target = np.asarray(loss_inputs["scaled_target"], dtype=np.float64) - residual = matrix.T @ weights - target + estimate = matrix.T @ weights + loss_arrays = loss_arrays_from_inputs(loss_inputs) + if loss_arrays is not None: + return pe_native_huber_loss_terms(estimate, loss_arrays) + residual = estimate - target return np.square(residual) @@ -1238,6 +1526,12 @@ def main(argv: list[str] | None = None) -> int: parser.add_argument("--policyengine-us-data-repo") parser.add_argument("--policyengine-us-data-python") parser.add_argument("--skip-tax-expenditure-targets", action="store_true") + parser.add_argument( + "--target-scope", + choices=("all", "national", "state"), + default="all", + help="Restrict the PE-native refit/scoring surface by target scope.", + ) parser.add_argument( "--exact-rescore", action="store_true", @@ -1278,6 +1572,7 @@ def main(argv: list[str] | None = None) -> int: policyengine_us_data_repo=args.policyengine_us_data_repo, policyengine_us_data_python=args.policyengine_us_data_python, skip_tax_expenditure_targets=args.skip_tax_expenditure_targets, + target_scope=args.target_scope, exact_rescore=args.exact_rescore, force=args.force, ) diff --git a/src/microplex_us/pipelines/pe_native_calibration_benchmark.py b/src/microplex_us/pipelines/pe_native_calibration_benchmark.py index af4a6f7..4290ea2 100644 --- a/src/microplex_us/pipelines/pe_native_calibration_benchmark.py +++ b/src/microplex_us/pipelines/pe_native_calibration_benchmark.py @@ -18,18 +18,21 @@ import h5py import numpy as np +from microplex_us.pipelines.pe_native_loss import loss_arrays_from_inputs from microplex_us.pipelines.pe_native_optimization import ( _PE_NATIVE_BROAD_MATRIX_SCRIPT, optimize_pe_native_loss_weights, rewrite_policyengine_us_dataset_weights, ) from microplex_us.pipelines.pe_native_scores import ( - _DEFAULT_PE_NATIVE_BASELINE_CACHE_DIR, _ENHANCED_CPS_BAD_TARGETS, build_policyengine_us_data_subprocess_env, compute_batch_us_pe_native_scores, resolve_policyengine_us_data_repo_root, - validate_policyengine_us_data_runtime, +) + +_DEFAULT_PE_NATIVE_BASELINE_CACHE_DIR = ( + Path.home() / ".cache" / "microplex-us" / "pe-native-baseline" ) @@ -260,6 +263,7 @@ def _extract_pe_native_loss_inputs( policyengine_us_data_repo: str | Path | None, policyengine_us_data_python: str | Path | None, skip_tax_expenditure_targets: bool, + target_scope_filter: str | None, ) -> dict[str, Any]: resolved_repo = resolve_policyengine_us_data_repo_root(policyengine_us_data_repo) env = build_policyengine_us_data_subprocess_env(resolved_repo) @@ -267,11 +271,6 @@ def _extract_pe_native_loss_inputs( command = [str(Path(policyengine_us_data_python).expanduser())] else: command = ["uv", "run", "--project", str(resolved_repo), "python"] - validate_policyengine_us_data_runtime( - command, - repo_root=resolved_repo, - env=env, - ) _log("extracting PE-native loss matrix") with TemporaryDirectory(prefix="microplex-us-pe-native-benchmark-") as temp_dir: prefix = Path(temp_dir) / "pe_native_matrix" @@ -287,6 +286,7 @@ def _extract_pe_native_loss_inputs( str(Path(input_dataset_path).expanduser().resolve()), "1" if skip_tax_expenditure_targets else "0", str(prefix), + target_scope_filter or "", ], cwd=resolved_repo, env=env, @@ -295,8 +295,10 @@ def _extract_pe_native_loss_inputs( check=False, ) if completed.returncode != 0: - detail = completed.stderr.strip() or completed.stdout.strip() or str( - completed.returncode + detail = ( + completed.stderr.strip() + or completed.stdout.strip() + or str(completed.returncode) ) raise RuntimeError(f"PE-native loss-matrix extraction failed: {detail}") _log(f"extracted PE-native loss matrix in {perf_counter() - started_at:.1f}s") @@ -304,6 +306,24 @@ def _extract_pe_native_loss_inputs( "scaled_matrix": np.load(prefix.with_suffix(".matrix.npy")), "scaled_target": np.load(prefix.with_suffix(".target.npy")), "initial_weights": np.load(prefix.with_suffix(".weights.npy")), + "unscaled_target": np.load(prefix.with_suffix(".target_unscaled.npy")), + "loss_denominator": np.load(prefix.with_suffix(".loss_denominator.npy")), + "loss_target_weight": np.load( + prefix.with_suffix(".loss_target_weight.npy") + ), + "loss_bucket": np.load( + prefix.with_suffix(".loss_bucket.npy"), allow_pickle=True + ), + "loss_unit": np.load( + prefix.with_suffix(".loss_unit.npy"), allow_pickle=True + ), + "loss_scope": np.load( + prefix.with_suffix(".loss_scope.npy"), allow_pickle=True + ), + "loss_family": np.load( + prefix.with_suffix(".loss_family.npy"), allow_pickle=True + ), + "loss_epsilon": np.load(prefix.with_suffix(".loss_epsilon.npy")), "metadata": json.loads(prefix.with_suffix(".meta.json").read_text()), } @@ -326,6 +346,7 @@ def build_policyengine_us_native_calibration_benchmark( batch_households: int | None = None, baseline_cache_dir: str | Path | None = _DEFAULT_PE_NATIVE_BASELINE_CACHE_DIR, skip_tax_expenditure_targets: bool = False, + target_scope_filter: str | None = None, force: bool = False, ) -> dict[str, Any]: """Run and score PE-native calibration variants against one baseline.""" @@ -369,6 +390,7 @@ def build_policyengine_us_native_calibration_benchmark( policyengine_us_data_repo=policyengine_us_data_repo, policyengine_us_data_python=policyengine_us_data_python, skip_tax_expenditure_targets=skip_tax_expenditure_targets, + target_scope_filter=target_scope_filter, ) if l2_penalties else None @@ -390,6 +412,7 @@ def build_policyengine_us_native_calibration_benchmark( scaled_matrix=loss_inputs["scaled_matrix"], scaled_target=loss_inputs["scaled_target"], initial_weights=loss_inputs["initial_weights"], + loss_arrays=loss_arrays_from_inputs(loss_inputs), budget=budget, max_iter=max_iter, l2_penalty=penalty, @@ -420,9 +443,7 @@ def build_policyengine_us_native_calibration_benchmark( "initial_weight_sum": float(summary["initial_weight_sum"]), "optimized_weight_sum": float(summary["optimized_weight_sum"]), "household_count": int(summary["household_count"]), - "positive_household_count": int( - summary["positive_household_count"] - ), + "positive_household_count": int(summary["positive_household_count"]), "budget": summary["budget"], "converged": bool(summary["converged"]), "iterations": int(summary["iterations"]), @@ -436,7 +457,12 @@ def build_policyengine_us_native_calibration_benchmark( "l2_penalty": penalty, "target_total_weight": resolved_target_total_weight, "target_total_weight_resolved_from": target_total_weight_resolved_from, + "optimizer_method": summary.get("method"), "step_size": summary.get("step_size"), + "initial_step_size": summary.get("initial_step_size"), + "line_search_backtracking_steps": summary.get( + "line_search_backtracking_steps" + ), "history_interval": summary.get("history_interval"), "loss_history": summary.get("loss_history", []), "reused_existing_output": False, @@ -479,9 +505,7 @@ def build_policyengine_us_native_calibration_benchmark( period=period, policyengine_us_data_repo=policyengine_us_data_repo, policyengine_us_data_python=policyengine_us_data_python, - batch_households=batch_households, - baseline_cache_dir=baseline_cache_dir, - skip_tax_expenditure_targets=skip_tax_expenditure_targets, + target_scope_filter=target_scope_filter, ) _log(f"scored variants in {perf_counter() - scoring_started_at:.1f}s") scores_by_dataset = { @@ -525,6 +549,7 @@ def build_policyengine_us_native_calibration_benchmark( "baseline_dataset": str(baseline_path), "output_dir": str(destination), "skip_tax_expenditure_targets": bool(skip_tax_expenditure_targets), + "target_scope_filter": target_scope_filter, "target_total_weight": resolved_target_total_weight, "target_total_weight_resolved_from": target_total_weight_resolved_from, "budget": None if budget is None else int(budget), @@ -534,11 +559,7 @@ def build_policyengine_us_native_calibration_benchmark( "baseline_enhanced_cps_native_loss": baseline_loss, "best_variant_label": ranked_rows[0]["label"] if ranked_rows else None, "best_variant_loss": ( - float( - ranked_rows[0]["score_summary"][ - "candidate_enhanced_cps_native_loss" - ] - ) + float(ranked_rows[0]["score_summary"]["candidate_enhanced_cps_native_loss"]) if ranked_rows else None ), @@ -633,6 +654,11 @@ def main(argv: list[str] | None = None) -> int: "--skip-tax-expenditure-targets", action="store_true", ) + parser.add_argument( + "--target-scope-filter", + choices=("national", "state"), + help="Restrict PE-native optimization/scoring to a target scope.", + ) parser.add_argument( "--force", action="store_true", @@ -664,12 +690,17 @@ def main(argv: list[str] | None = None) -> int: batch_households=args.batch_households, baseline_cache_dir=args.baseline_cache_dir or None, skip_tax_expenditure_targets=args.skip_tax_expenditure_targets, + target_scope_filter=args.target_scope_filter, force=args.force, ) print(str(written)) return 0 +if __name__ == "__main__": + raise SystemExit(main()) + + __all__ = [ "CalibrationBenchmarkVariant", "build_policyengine_us_native_calibration_benchmark", diff --git a/src/microplex_us/pipelines/pe_native_loss.py b/src/microplex_us/pipelines/pe_native_loss.py new file mode 100644 index 0000000..0bf81e5 --- /dev/null +++ b/src/microplex_us/pipelines/pe_native_loss.py @@ -0,0 +1,355 @@ +"""Shared PE-native robust loss helpers.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import numpy as np + +PE_NATIVE_ROBUST_LOSS_METRIC = "pe_native_bucketed_baseline_huber_v1" +DEFAULT_BASELINE_WEIGHT_BETA = 1.0 +DEFAULT_BUCKET_EPSILON_FRACTION = 0.02 +DEFAULT_HUBER_DELTA = 1.0 + + +@dataclass(frozen=True) +class PENativeLossArrays: + """Per-target constants for the robust PE-native loss.""" + + target_names: tuple[str, ...] + target_values: np.ndarray + objective_target: np.ndarray + denominator: np.ndarray + target_weight: np.ndarray + bucket_keys: np.ndarray + unit_keys: np.ndarray + scope_keys: np.ndarray + family_keys: np.ndarray + epsilon: np.ndarray + beta: float + huber_delta: float + epsilon_fraction: float + bucket_weight_mode: str + + def metadata(self) -> dict[str, Any]: + unique_buckets, bucket_counts = np.unique( + self.bucket_keys, + return_counts=True, + ) + return { + "loss_metric": PE_NATIVE_ROBUST_LOSS_METRIC, + "loss_config": { + "baseline_weight_beta": float(self.beta), + "bucket_epsilon_fraction": float(self.epsilon_fraction), + "huber_delta": float(self.huber_delta), + "bucket_key": "scope_x_unit", + "bucket_weight_mode": self.bucket_weight_mode, + "residual": "(estimate - target) / (abs(target) + eps_bucket)", + "penalty": "huber", + }, + "loss_buckets": { + str(bucket): int(count) + for bucket, count in zip(unique_buckets, bucket_counts, strict=True) + }, + } + + def sidecar_rows(self) -> list[dict[str, Any]]: + return [ + { + "target_index": int(index), + "target_name": str(name), + "scope": str(self.scope_keys[index]), + "unit": str(self.unit_keys[index]), + "family": str(self.family_keys[index]), + "bucket": str(self.bucket_keys[index]), + "target_value": float(self.target_values[index]), + "objective_target": float(self.objective_target[index]), + "denominator": float(self.denominator[index]), + "epsilon": float(self.epsilon[index]), + "target_weight": float(self.target_weight[index]), + } + for index, name in enumerate(self.target_names) + ] + + +def build_pe_native_loss_arrays( + target_names: list[str] | tuple[str, ...] | np.ndarray, + target_values: np.ndarray, + *, + beta: float = DEFAULT_BASELINE_WEIGHT_BETA, + epsilon_fraction: float = DEFAULT_BUCKET_EPSILON_FRACTION, + huber_delta: float = DEFAULT_HUBER_DELTA, + bucket_weight_mode: str = "equal_bucket", +) -> PENativeLossArrays: + """Build constants for the bucketed, baseline-weighted Huber loss.""" + + names = tuple(str(name) for name in target_names) + targets = np.asarray(target_values, dtype=np.float64) + if targets.ndim != 1: + raise ValueError("target_values must be 1D") + if len(names) != targets.shape[0]: + raise ValueError("target_names and target_values length mismatch") + if targets.size == 0: + raise ValueError("PE-native loss requires at least one target") + if beta < 0.0: + raise ValueError("baseline-weight beta must be nonnegative") + if epsilon_fraction < 0.0: + raise ValueError("bucket epsilon fraction must be nonnegative") + if huber_delta <= 0.0: + raise ValueError("Huber delta must be positive") + if bucket_weight_mode != "equal_bucket": + raise ValueError("Only equal_bucket weighting is implemented") + + scopes = np.asarray( + [infer_pe_native_target_scope(name) for name in names], dtype=object + ) + units = np.asarray( + [infer_pe_native_target_unit(name) for name in names], dtype=object + ) + families = np.asarray( + [classify_pe_native_target_family(name) for name in names], dtype=object + ) + buckets = np.asarray( + [f"{scope}:{unit}" for scope, unit in zip(scopes, units, strict=True)], + dtype=object, + ) + abs_targets = np.abs(targets) + epsilon = np.zeros_like(targets, dtype=np.float64) + target_weight = np.zeros_like(targets, dtype=np.float64) + unique_buckets = sorted({str(bucket) for bucket in buckets}) + bucket_budget = 1.0 / float(len(unique_buckets)) + for bucket in unique_buckets: + mask = buckets == bucket + bucket_targets = abs_targets[mask] + nonzero_targets = bucket_targets[bucket_targets > 0.0] + median_target = ( + float(np.median(nonzero_targets)) if nonzero_targets.size else 1.0 + ) + bucket_epsilon = max(float(epsilon_fraction) * median_target, 1e-12) + epsilon[mask] = bucket_epsilon + baseline_importance = np.power(bucket_targets + bucket_epsilon, beta) + total_importance = float(baseline_importance.sum()) + if total_importance <= 0.0 or not np.isfinite(total_importance): + target_weight[mask] = bucket_budget / float(mask.sum()) + else: + target_weight[mask] = bucket_budget * baseline_importance / total_importance + + denominator = abs_targets + epsilon + return PENativeLossArrays( + target_names=names, + target_values=targets, + objective_target=targets.astype(np.float64, copy=True), + denominator=denominator, + target_weight=target_weight, + bucket_keys=buckets, + unit_keys=units, + scope_keys=scopes, + family_keys=families, + epsilon=epsilon, + beta=float(beta), + huber_delta=float(huber_delta), + epsilon_fraction=float(epsilon_fraction), + bucket_weight_mode=bucket_weight_mode, + ) + + +def pe_native_huber_loss_terms( + estimate: np.ndarray, + loss_arrays: PENativeLossArrays, +) -> np.ndarray: + rel = pe_native_relative_error(estimate, loss_arrays) + return loss_arrays.target_weight * huber_value(rel, loss_arrays.huber_delta) + + +def pe_native_huber_loss( + estimate: np.ndarray, + loss_arrays: PENativeLossArrays, +) -> float: + return float(pe_native_huber_loss_terms(estimate, loss_arrays).sum()) + + +def pe_native_huber_gradient_factor( + estimate: np.ndarray, + loss_arrays: PENativeLossArrays, +) -> np.ndarray: + rel = pe_native_relative_error(estimate, loss_arrays) + return ( + loss_arrays.target_weight + * huber_derivative(rel, loss_arrays.huber_delta) + / loss_arrays.denominator + ) + + +def pe_native_relative_error( + estimate: np.ndarray, + loss_arrays: PENativeLossArrays, +) -> np.ndarray: + estimate_array = np.asarray(estimate, dtype=np.float64) + if estimate_array.shape != loss_arrays.objective_target.shape: + raise ValueError("estimate and target shapes differ") + return (estimate_array - loss_arrays.objective_target) / loss_arrays.denominator + + +def huber_value(values: np.ndarray, delta: float) -> np.ndarray: + values_array = np.asarray(values, dtype=np.float64) + abs_values = np.abs(values_array) + return np.where( + abs_values <= delta, + 0.5 * np.square(values_array), + delta * (abs_values - 0.5 * delta), + ) + + +def huber_derivative(values: np.ndarray, delta: float) -> np.ndarray: + return np.clip(np.asarray(values, dtype=np.float64), -delta, delta) + + +def infer_pe_native_target_scope(target_name: str) -> str: + if target_name.startswith("nation/"): + return "national" + return "state" + + +def infer_pe_native_target_unit(target_name: str) -> str: + normalized = target_name.lower().replace("-", "_") + parts = normalized.split("/") + if normalized.endswith("/snap_hhs") or normalized.endswith("/snap-hhs"): + return "households" + if any(part in {"amount", "total"} for part in parts): + return "dollars" + if any(part in {"count", "returns", "filers"} for part in parts): + return "returns" + if "spending" in normalized or "cost" in normalized or "tax" in normalized: + return "dollars" + if "net_worth" in normalized or "income" in normalized: + return "dollars" + if "enrollment" in normalized or "population" in normalized: + return "people" + if "/age/" in normalized or "population_by_age" in normalized: + return "people" + if "household" in normalized or "hhs" in normalized: + return "households" + return "other" + + +def classify_pe_native_target_family(target_name: str) -> str: + """Classify one PE target name into broad diagnostic families.""" + + parts = target_name.split("/") + if target_name.startswith("state/census/age/"): + return "state_age_distribution" + if target_name.startswith("state/census/population_by_state/"): + return "state_population" + if target_name.startswith("state/census/population_under_5_by_state/"): + return "state_population_under_5" + if target_name.startswith("nation/irs/aca_spending/"): + return "state_aca_spending" + if target_name.startswith("state/irs/aca_enrollment/"): + return "state_aca_enrollment" + if target_name.startswith("irs/medicaid_enrollment/"): + return "state_medicaid_enrollment" + if target_name.endswith("/snap-cost"): + return "state_snap_cost" + if target_name.endswith("/snap-hhs"): + return "state_snap_households" + if target_name.startswith("state/real_estate_taxes/"): + return "state_real_estate_taxes" + if len(parts) >= 3 and parts[0] == "state" and parts[2] == "adjusted_gross_income": + return "state_agi_distribution" + if target_name.startswith("nation/jct/"): + return "national_tax_expenditures" + if target_name.startswith("nation/net_worth/"): + return "national_net_worth" + if target_name.startswith("nation/ssa/"): + return "national_ssa" + if target_name.startswith("nation/census/population_by_age/"): + return "national_population_by_age" + if target_name == "nation/census/infants": + return "national_infants" + if target_name.startswith("nation/census/agi_in_spm_threshold_decile_"): + return "national_spm_threshold_agi" + if target_name.startswith("nation/census/count_in_spm_threshold_decile_"): + return "national_spm_threshold_count" + if target_name.startswith("nation/census/"): + return "national_census_other" + if target_name.startswith("nation/irs/"): + return "national_irs_other" + return "other" + + +def loss_arrays_from_inputs(loss_inputs: dict[str, Any]) -> PENativeLossArrays | None: + metadata = dict(loss_inputs.get("metadata") or {}) + if metadata.get("loss_metric") != PE_NATIVE_ROBUST_LOSS_METRIC: + return None + target_names = tuple(str(name) for name in metadata.get("target_names", ())) + return PENativeLossArrays( + target_names=target_names, + target_values=np.asarray(loss_inputs["unscaled_target"], dtype=np.float64), + objective_target=np.asarray(loss_inputs["scaled_target"], dtype=np.float64), + denominator=np.asarray(loss_inputs["loss_denominator"], dtype=np.float64), + target_weight=np.asarray(loss_inputs["loss_target_weight"], dtype=np.float64), + bucket_keys=np.asarray(loss_inputs["loss_bucket"], dtype=object), + unit_keys=np.asarray(loss_inputs["loss_unit"], dtype=object), + scope_keys=np.asarray(loss_inputs["loss_scope"], dtype=object), + family_keys=np.asarray(loss_inputs["loss_family"], dtype=object), + epsilon=np.asarray(loss_inputs["loss_epsilon"], dtype=np.float64), + beta=float(metadata.get("loss_config", {}).get("baseline_weight_beta", 1.0)), + huber_delta=float(metadata.get("loss_config", {}).get("huber_delta", 1.0)), + epsilon_fraction=float( + metadata.get("loss_config", {}).get("bucket_epsilon_fraction", 0.02) + ), + bucket_weight_mode=str( + metadata.get("loss_config", {}).get("bucket_weight_mode", "equal_bucket") + ), + ) + + +def subset_loss_arrays( + loss_arrays: PENativeLossArrays, + mask: np.ndarray, +) -> PENativeLossArrays: + mask_array = np.asarray(mask, dtype=bool) + if mask_array.shape != loss_arrays.objective_target.shape: + raise ValueError("loss-array mask shape mismatch") + return PENativeLossArrays( + target_names=tuple( + name + for name, keep in zip(loss_arrays.target_names, mask_array, strict=True) + if keep + ), + target_values=loss_arrays.target_values[mask_array], + objective_target=loss_arrays.objective_target[mask_array], + denominator=loss_arrays.denominator[mask_array], + target_weight=loss_arrays.target_weight[mask_array], + bucket_keys=loss_arrays.bucket_keys[mask_array], + unit_keys=loss_arrays.unit_keys[mask_array], + scope_keys=loss_arrays.scope_keys[mask_array], + family_keys=loss_arrays.family_keys[mask_array], + epsilon=loss_arrays.epsilon[mask_array], + beta=loss_arrays.beta, + huber_delta=loss_arrays.huber_delta, + epsilon_fraction=loss_arrays.epsilon_fraction, + bucket_weight_mode=loss_arrays.bucket_weight_mode, + ) + + +__all__ = [ + "DEFAULT_BASELINE_WEIGHT_BETA", + "DEFAULT_BUCKET_EPSILON_FRACTION", + "DEFAULT_HUBER_DELTA", + "PE_NATIVE_ROBUST_LOSS_METRIC", + "PENativeLossArrays", + "build_pe_native_loss_arrays", + "classify_pe_native_target_family", + "huber_derivative", + "huber_value", + "infer_pe_native_target_scope", + "infer_pe_native_target_unit", + "loss_arrays_from_inputs", + "pe_native_huber_gradient_factor", + "pe_native_huber_loss", + "pe_native_huber_loss_terms", + "pe_native_relative_error", + "subset_loss_arrays", +] diff --git a/src/microplex_us/pipelines/pe_native_optimization.py b/src/microplex_us/pipelines/pe_native_optimization.py index 163933c..d53b620 100644 --- a/src/microplex_us/pipelines/pe_native_optimization.py +++ b/src/microplex_us/pipelines/pe_native_optimization.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import math import shutil import subprocess from collections.abc import Callable @@ -14,6 +15,12 @@ import h5py import numpy as np +from microplex_us.pipelines.pe_native_loss import ( + PENativeLossArrays, + loss_arrays_from_inputs, + pe_native_huber_gradient_factor, + pe_native_huber_loss, +) from microplex_us.pipelines.pe_native_scores import ( _ENHANCED_CPS_BAD_TARGETS, build_policyengine_us_data_subprocess_env, @@ -35,10 +42,65 @@ from policyengine_us import Microsimulation from policyengine_us_data.utils.loss import build_loss_matrix + +def patch_policyengine_us_data_uprating_aliases(): + import policyengine_us_data.utils.soi as soi_utils + + original = soi_utils.create_policyengine_uprating_factors_table + + def patched_create_policyengine_uprating_factors_table(*args, **kwargs): + table = original(*args, **kwargs) + if ( + "employment_income" not in table.index + and "employment_income_before_lsr" in table.index + ): + table.loc["employment_income"] = table.loc[ + "employment_income_before_lsr" + ] + return table + + soi_utils.create_policyengine_uprating_factors_table = ( + patched_create_policyengine_uprating_factors_table + ) + + +patch_policyengine_us_data_uprating_aliases() + + +def load_microplex_loss_helpers(): + import importlib.util + + for entry in sys.path: + candidate = Path(entry) / "microplex_us" / "pipelines" / "pe_native_loss.py" + if not candidate.exists(): + continue + spec = importlib.util.spec_from_file_location( + "microplex_us_pe_native_loss_standalone", + candidate, + ) + if spec is None or spec.loader is None: + continue + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + raise ModuleNotFoundError("Could not load microplex_us pe_native_loss helper") + + +_LOSS_HELPERS = load_microplex_loss_helpers() +build_pe_native_loss_arrays = _LOSS_HELPERS.build_pe_native_loss_arrays +pe_native_huber_loss = _LOSS_HELPERS.pe_native_huber_loss + BAD_TARGETS = tuple(json.loads(sys.argv[2])) PERIOD = int(sys.argv[3]) DATASET_PATH = sys.argv[4] -OUTPUT_PREFIX = Path(sys.argv[5]) +if len(sys.argv) >= 7: + SKIP_TAX_EXPENDITURE_TARGETS = sys.argv[5] == "1" + OUTPUT_PREFIX = Path(sys.argv[6]) +else: + SKIP_TAX_EXPENDITURE_TARGETS = False + OUTPUT_PREFIX = Path(sys.argv[5]) +TARGET_SCOPE_FILTER = sys.argv[7] if len(sys.argv) >= 8 and sys.argv[7] else None def dataset_from_path(dataset_path: str, dataset_name: str): @@ -52,6 +114,22 @@ class LocalDataset(Dataset): return LocalDataset +def scope_keep_mask(target_names): + if TARGET_SCOPE_FILTER is None: + return np.ones(target_names.shape, dtype=bool) + if TARGET_SCOPE_FILTER == "national": + return np.asarray( + [str(name).startswith("nation/") for name in target_names], + dtype=bool, + ) + if TARGET_SCOPE_FILTER == "state": + return np.asarray( + [not str(name).startswith("nation/") for name in target_names], + dtype=bool, + ) + raise ValueError(f"Unsupported target scope filter: {TARGET_SCOPE_FILTER}") + + dataset_cls = dataset_from_path( DATASET_PATH, Path(DATASET_PATH).stem.replace("-", "_"), @@ -60,31 +138,22 @@ class LocalDataset(Dataset): target_names = np.asarray(loss_matrix.columns) zero_mask = np.isclose(targets_array, 0.0, atol=0.1) bad_mask = np.isin(target_names, BAD_TARGETS) -keep_mask = ~(zero_mask | bad_mask) +keep_mask = ~(zero_mask | bad_mask) & scope_keep_mask(target_names) filtered = loss_matrix.loc[:, keep_mask] filtered_targets = np.asarray(targets_array[keep_mask], dtype=np.float64) +if filtered_targets.size == 0: + raise ValueError("PE-native loss matrix has no targets after filtering") is_national = np.asarray(filtered.columns.str.startswith("nation/"), dtype=bool) n_national = int(is_national.sum()) n_state = int((~is_national).sum()) -if n_national == 0 or n_state == 0: - raise ValueError( - "PE-native broad loss requires both national and state targets after filtering" - ) -normalisation_factor = np.where( - is_national, - 1.0 / n_national, - 1.0 / n_state, -).astype(np.float64) -inv_mean_normalisation = 1.0 / float(np.mean(normalisation_factor)) -per_target_weight = ( - inv_mean_normalisation * normalisation_factor / float(len(filtered_targets)) -).astype(np.float64) -denominator = (filtered_targets + 1.0).astype(np.float64) -scaling = np.sqrt(per_target_weight) / denominator -scaled_matrix = filtered.to_numpy(dtype=np.float64) * scaling[np.newaxis, :] -scaled_target = (filtered_targets - 1.0) * scaling +loss_arrays = build_pe_native_loss_arrays( + filtered.columns.tolist(), + filtered_targets, +) +matrix = filtered.to_numpy(dtype=np.float64) +target = loss_arrays.objective_target sim = Microsimulation(dataset=dataset_cls) sim.default_calculation_period = PERIOD @@ -94,24 +163,33 @@ class LocalDataset(Dataset): period=PERIOD, ).values.astype(np.float64) -np.save(OUTPUT_PREFIX.with_suffix(".matrix.npy"), scaled_matrix) -np.save(OUTPUT_PREFIX.with_suffix(".target.npy"), scaled_target) +np.save(OUTPUT_PREFIX.with_suffix(".matrix.npy"), matrix) +np.save(OUTPUT_PREFIX.with_suffix(".target.npy"), target) np.save(OUTPUT_PREFIX.with_suffix(".weights.npy"), weights) np.save(OUTPUT_PREFIX.with_suffix(".target_unscaled.npy"), filtered_targets) -np.save(OUTPUT_PREFIX.with_suffix(".scaling.npy"), scaling) +np.save(OUTPUT_PREFIX.with_suffix(".loss_denominator.npy"), loss_arrays.denominator) +np.save(OUTPUT_PREFIX.with_suffix(".loss_target_weight.npy"), loss_arrays.target_weight) +np.save(OUTPUT_PREFIX.with_suffix(".loss_bucket.npy"), loss_arrays.bucket_keys) +np.save(OUTPUT_PREFIX.with_suffix(".loss_unit.npy"), loss_arrays.unit_keys) +np.save(OUTPUT_PREFIX.with_suffix(".loss_scope.npy"), loss_arrays.scope_keys) +np.save(OUTPUT_PREFIX.with_suffix(".loss_family.npy"), loss_arrays.family_keys) +np.save(OUTPUT_PREFIX.with_suffix(".loss_epsilon.npy"), loss_arrays.epsilon) with open(OUTPUT_PREFIX.with_suffix(".meta.json"), "w") as handle: json.dump( { + **loss_arrays.metadata(), "target_names": filtered.columns.tolist(), + "target_loss_metadata": loss_arrays.sidecar_rows(), "n_targets_total": int(len(target_names)), "n_targets_kept": int(keep_mask.sum()), "n_targets_zero_dropped": int(zero_mask.sum()), "n_targets_bad_dropped": int(bad_mask.sum()), "n_national_targets": n_national, "n_state_targets": n_state, + "target_scope_filter": TARGET_SCOPE_FILTER, "weight_sum": float(weights.sum()), "candidate_loss_before": float( - np.square(scaled_matrix.T @ weights - scaled_target).sum() + pe_native_huber_loss(matrix.T @ weights, loss_arrays) ), }, handle, @@ -166,7 +244,7 @@ def _project_to_simplex(values: np.ndarray, total: float) -> np.ndarray: return values.copy() clipped = np.maximum(values.astype(np.float64, copy=False), 0.0) current_sum = float(clipped.sum()) - if np.isclose(current_sum, total): + if np.isclose(current_sum, total, rtol=0.0, atol=1e-6): return clipped if total <= 0.0: return np.zeros_like(clipped) @@ -220,6 +298,7 @@ def optimize_pe_native_loss_weights( scaled_matrix: np.ndarray, scaled_target: np.ndarray, initial_weights: np.ndarray, + loss_arrays: PENativeLossArrays | None = None, budget: int | None = None, max_iter: int = 200, l2_penalty: float = 0.0, @@ -243,19 +322,35 @@ def optimize_pe_native_loss_weights( raise ValueError("scaled_target must match scaled_matrix target dimension") if weights0.ndim != 1 or weights0.shape[0] != matrix.shape[0]: raise ValueError("initial_weights must match scaled_matrix household dimension") + if ( + loss_arrays is not None + and loss_arrays.objective_target.shape[0] != matrix.shape[1] + ): + raise ValueError("loss_arrays target dimension must match scaled_matrix") initial_weight_sum = float(weights0.sum()) total_weight = ( - float(target_total_weight) if target_total_weight is not None else initial_weight_sum + float(target_total_weight) + if target_total_weight is not None + else initial_weight_sum ) weights = _project_to_budget_simplex(weights0, total_weight, budget) initial_reference = weights.copy() - lipschitz = _estimate_quadratic_lipschitz(matrix, l2_penalty) + if loss_arrays is None: + lipschitz_matrix = matrix + else: + lipschitz_scale = np.sqrt(loss_arrays.target_weight) / loss_arrays.denominator + lipschitz_matrix = matrix * lipschitz_scale[np.newaxis, :] + lipschitz = _estimate_quadratic_lipschitz(lipschitz_matrix, l2_penalty) step_size = 1.0 / lipschitz def objective(candidate: np.ndarray) -> float: - residual = matrix.T @ candidate - target - base = float(np.dot(residual, residual)) + estimate = matrix.T @ candidate + if loss_arrays is None: + residual = estimate - target + base = float(np.dot(residual, residual)) + else: + base = pe_native_huber_loss(estimate, loss_arrays) if l2_penalty > 0.0: delta = candidate - initial_reference base += float(l2_penalty * np.dot(delta, delta)) @@ -275,44 +370,97 @@ def objective(candidate: np.ndarray) -> float: converged = False completed_iter = 0 total_backtracking_steps = 0 - for iteration in range(1, max_iter + 1): - residual = matrix.T @ weights - target - gradient = 2.0 * (matrix @ residual) + momentum = 1.0 + search_weights = weights.copy() + min_step_size = step_size * 1e-12 + max_step_size = step_size * 1e8 + + def gradient_at(candidate: np.ndarray) -> np.ndarray: + estimate = matrix.T @ candidate + if loss_arrays is None: + residual = estimate - target + gradient = 2.0 * (matrix @ residual) + else: + gradient = matrix @ pe_native_huber_gradient_factor( + estimate, + loss_arrays, + ) if l2_penalty > 0.0: - gradient += 2.0 * l2_penalty * (weights - initial_reference) + gradient += 2.0 * l2_penalty * (candidate - initial_reference) + return gradient + + for iteration in range(1, max_iter + 1): + gradient = gradient_at(search_weights) completed_iter = iteration candidate = weights candidate_loss = current_loss accepted_descent_step = False iteration_step_size = step_size - for backtrack in range(30): - trial = _project_to_budget_simplex( - weights - iteration_step_size * gradient, - total_weight, - budget, - ) - trial_loss = objective(trial) - if trial_loss <= current_loss: - candidate = trial - candidate_loss = trial_loss - accepted_descent_step = True - total_backtracking_steps += backtrack + accepted_backtrack = 0 + + for start, start_gradient in ( + (search_weights, gradient), + (weights, None), + ): + if accepted_descent_step: break - iteration_step_size *= 0.5 + if start_gradient is None: + start_gradient = gradient_at(start) + iteration_step_size = step_size + for backtrack in range(40): + trial = _project_to_budget_simplex( + start - iteration_step_size * start_gradient, + total_weight, + budget, + ) + trial_loss = objective(trial) + if trial_loss <= current_loss: + candidate = trial + candidate_loss = trial_loss + accepted_descent_step = True + accepted_backtrack = backtrack + total_backtracking_steps += backtrack + break + iteration_step_size *= 0.5 + if iteration_step_size < min_step_size: + break + if not accepted_descent_step: + momentum = 1.0 + search_weights = weights.copy() if not accepted_descent_step: converged = True break improvement = current_loss - candidate_loss + previous_weights = weights weights = candidate current_loss = candidate_loss + if accepted_backtrack == 0: + step_size = min(iteration_step_size * 1.25, max_step_size) + else: + step_size = iteration_step_size + next_momentum = 0.5 * (1.0 + math.sqrt(1.0 + 4.0 * momentum**2)) + extrapolated = weights + ((momentum - 1.0) / next_momentum) * ( + weights - previous_weights + ) + search_weights = _project_to_budget_simplex( + extrapolated, + total_weight, + budget, + ) + if float(np.dot(weights - previous_weights, search_weights - weights)) > 0.0: + next_momentum = 1.0 + search_weights = weights.copy() + momentum = next_momentum loss_history.append( { "iteration": int(iteration), "objective_loss": float(current_loss), "weight_sum": float(weights.sum()), "positive_household_count": int((weights > 1e-9).sum()), + "step_size": float(step_size), + "backtracking_steps": int(accepted_backtrack), } ) if history_callback is not None: @@ -333,7 +481,9 @@ def objective(candidate: np.ndarray) -> float: "budget": None if budget is None else int(budget), "iterations": int(completed_iter), "converged": bool(converged), + "method": "monotone_accelerated_projected_gradient", "step_size": float(step_size), + "initial_step_size": float(1.0 / lipschitz), "line_search_backtracking_steps": int(total_backtracking_steps), "loss_history": loss_history, } @@ -358,7 +508,9 @@ def rewrite_policyengine_us_dataset_weights( with h5py.File(output, "r+") as handle: household_ids = handle["household_id"][period_key][:] if len(household_ids) != len(weights): - raise ValueError("household_weights length does not match household_id array") + raise ValueError( + "household_weights length does not match household_id array" + ) household_map = { int(household_id): float(weight) for household_id, weight in zip(household_ids, weights, strict=True) @@ -368,7 +520,10 @@ def rewrite_policyengine_us_dataset_weights( if "person_weight" in handle and "person_household_id" in handle: person_households = handle["person_household_id"][period_key][:] person_weights = np.array( - [household_map[int(household_id)] for household_id in person_households], + [ + household_map[int(household_id)] + for household_id in person_households + ], dtype=np.float32, ) handle["person_weight"][period_key][...] = person_weights @@ -448,8 +603,10 @@ def optimize_policyengine_us_native_loss_dataset( check=False, ) if completed.returncode != 0: - detail = completed.stderr.strip() or completed.stdout.strip() or str( - completed.returncode + detail = ( + completed.stderr.strip() + or completed.stdout.strip() + or str(completed.returncode) ) raise RuntimeError(f"PE-native loss-matrix extraction failed: {detail}") @@ -457,11 +614,37 @@ def optimize_policyengine_us_native_loss_dataset( scaled_target = np.load(prefix.with_suffix(".target.npy")) initial_weights = np.load(prefix.with_suffix(".weights.npy")) metadata = json.loads(prefix.with_suffix(".meta.json").read_text()) + loss_inputs = { + "scaled_matrix": scaled_matrix, + "scaled_target": scaled_target, + "initial_weights": initial_weights, + "unscaled_target": np.load(prefix.with_suffix(".target_unscaled.npy")), + "loss_denominator": np.load(prefix.with_suffix(".loss_denominator.npy")), + "loss_target_weight": np.load( + prefix.with_suffix(".loss_target_weight.npy") + ), + "loss_bucket": np.load( + prefix.with_suffix(".loss_bucket.npy"), allow_pickle=True + ), + "loss_unit": np.load( + prefix.with_suffix(".loss_unit.npy"), allow_pickle=True + ), + "loss_scope": np.load( + prefix.with_suffix(".loss_scope.npy"), allow_pickle=True + ), + "loss_family": np.load( + prefix.with_suffix(".loss_family.npy"), allow_pickle=True + ), + "loss_epsilon": np.load(prefix.with_suffix(".loss_epsilon.npy")), + "metadata": metadata, + } + loss_arrays = loss_arrays_from_inputs(loss_inputs) optimized_weights, summary = optimize_pe_native_loss_weights( scaled_matrix=scaled_matrix, scaled_target=scaled_target, initial_weights=initial_weights, + loss_arrays=loss_arrays, budget=budget, max_iter=max_iter, l2_penalty=l2_penalty, @@ -475,7 +658,12 @@ def optimize_policyengine_us_native_loss_dataset( period=period, ) return PolicyEngineUSNativeWeightOptimizationResult( - metric="enhanced_cps_native_loss_weight_optimization", + metric=str( + metadata.get( + "loss_metric", + "enhanced_cps_native_loss_weight_optimization", + ) + ), period=int(period), input_dataset=str(Path(input_dataset_path).expanduser().resolve()), output_dataset=str(rewritten), diff --git a/src/microplex_us/pipelines/pe_native_scores.py b/src/microplex_us/pipelines/pe_native_scores.py index 2e9cdf1..21a8df4 100644 --- a/src/microplex_us/pipelines/pe_native_scores.py +++ b/src/microplex_us/pipelines/pe_native_scores.py @@ -58,10 +58,62 @@ from policyengine_us import Microsimulation from policyengine_us_data.utils.loss import build_loss_matrix + +def patch_policyengine_us_data_uprating_aliases(): + import policyengine_us_data.utils.soi as soi_utils + + original = soi_utils.create_policyengine_uprating_factors_table + + def patched_create_policyengine_uprating_factors_table(*args, **kwargs): + table = original(*args, **kwargs) + if ( + "employment_income" not in table.index + and "employment_income_before_lsr" in table.index + ): + table.loc["employment_income"] = table.loc[ + "employment_income_before_lsr" + ] + return table + + soi_utils.create_policyengine_uprating_factors_table = ( + patched_create_policyengine_uprating_factors_table + ) + + +patch_policyengine_us_data_uprating_aliases() + + +def load_microplex_loss_helpers(): + import importlib.util + + for entry in sys.path: + candidate = Path(entry) / "microplex_us" / "pipelines" / "pe_native_loss.py" + if not candidate.exists(): + continue + spec = importlib.util.spec_from_file_location( + "microplex_us_pe_native_loss_standalone", + candidate, + ) + if spec is None or spec.loader is None: + continue + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + raise ModuleNotFoundError("Could not load microplex_us pe_native_loss helper") + + +_LOSS_HELPERS = load_microplex_loss_helpers() +PE_NATIVE_ROBUST_LOSS_METRIC = _LOSS_HELPERS.PE_NATIVE_ROBUST_LOSS_METRIC +build_pe_native_loss_arrays = _LOSS_HELPERS.build_pe_native_loss_arrays +pe_native_huber_loss_terms = _LOSS_HELPERS.pe_native_huber_loss_terms +pe_native_relative_error = _LOSS_HELPERS.pe_native_relative_error + BAD_TARGETS = tuple(json.loads(sys.argv[2])) PERIOD = int(sys.argv[3]) CANDIDATE_DATASET = sys.argv[4] BASELINE_DATASET = sys.argv[5] +TARGET_SCOPE_FILTER = sys.argv[6] if len(sys.argv) >= 7 and sys.argv[6] else None def dataset_from_path(dataset_path: str, dataset_name: str): @@ -75,6 +127,22 @@ class LocalDataset(Dataset): return LocalDataset +def scope_keep_mask(target_names): + if TARGET_SCOPE_FILTER is None: + return np.ones(target_names.shape, dtype=bool) + if TARGET_SCOPE_FILTER == "national": + return np.asarray( + [str(name).startswith("nation/") for name in target_names], + dtype=bool, + ) + if TARGET_SCOPE_FILTER == "state": + return np.asarray( + [not str(name).startswith("nation/") for name in target_names], + dtype=bool, + ) + raise ValueError(f"Unsupported target scope filter: {TARGET_SCOPE_FILTER}") + + def classify_target_family(target_name: str) -> str: parts = target_name.split("/") if target_name.startswith("state/census/age/"): @@ -122,7 +190,6 @@ def build_family_breakdown(target_names, candidate_terms, baseline_terms, candid family_rows = [] target_names = list(target_names) unique_families = sorted({classify_target_family(name) for name in target_names}) - n_targets_total = float(len(target_names)) for family in unique_families: idx = [i for i, name in enumerate(target_names) if classify_target_family(name) == family] if not idx: @@ -135,14 +202,17 @@ def build_family_breakdown(target_names, candidate_terms, baseline_terms, candid { "family": family, "n_targets": int(len(idx)), - "candidate_loss_contribution": float(candidate_slice.sum() / n_targets_total), - "baseline_loss_contribution": float(baseline_slice.sum() / n_targets_total), - "loss_contribution_delta": float((candidate_slice.sum() - baseline_slice.sum()) / n_targets_total), + "candidate_loss_contribution": float(candidate_slice.sum()), + "baseline_loss_contribution": float(baseline_slice.sum()), + "loss_contribution_delta": float(candidate_slice.sum() - baseline_slice.sum()), "candidate_mean_weighted_loss": float(candidate_slice.mean()), "baseline_mean_weighted_loss": float(baseline_slice.mean()), - "candidate_mean_unweighted_msre": float(candidate_rel_slice.mean()), - "baseline_mean_unweighted_msre": float(baseline_rel_slice.mean()), - "unweighted_msre_delta": float(candidate_rel_slice.mean() - baseline_rel_slice.mean()), + "candidate_mean_unweighted_msre": float(np.mean(np.square(candidate_rel_slice))), + "baseline_mean_unweighted_msre": float(np.mean(np.square(baseline_rel_slice))), + "unweighted_msre_delta": float( + np.mean(np.square(candidate_rel_slice)) + - np.mean(np.square(baseline_rel_slice)) + ), } ) family_rows.sort(key=lambda row: row["loss_contribution_delta"], reverse=True) @@ -158,24 +228,20 @@ def compute(dataset_path: str) -> dict[str, float | int]: target_names = np.asarray(loss_matrix.columns) zero_mask = np.isclose(targets_array, 0.0, atol=0.1) bad_mask = np.isin(target_names, BAD_TARGETS) - keep_mask = ~(zero_mask | bad_mask) + keep_mask = ~(zero_mask | bad_mask) & scope_keep_mask(target_names) filtered = loss_matrix.loc[:, keep_mask] filtered_targets = np.asarray(targets_array[keep_mask], dtype=np.float64) + if filtered_targets.size == 0: + raise ValueError("PE-native broad loss has no targets after filtering") is_national = np.asarray(filtered.columns.str.startswith("nation/"), dtype=bool) n_national = int(is_national.sum()) n_state = int((~is_national).sum()) - if n_national == 0 or n_state == 0: - raise ValueError( - "PE-native broad loss requires both national and state targets after filtering" - ) - normalisation_factor = np.where( - is_national, - 1.0 / n_national, - 1.0 / n_state, - ).astype(np.float64) - inv_mean_normalisation = 1.0 / float(np.mean(normalisation_factor)) + loss_arrays = build_pe_native_loss_arrays( + filtered.columns.tolist(), + filtered_targets, + ) sim = Microsimulation(dataset=dataset_cls) sim.default_calculation_period = PERIOD @@ -186,12 +252,14 @@ def compute(dataset_path: str) -> dict[str, float | int]: ).values.astype(np.float64) estimate = weights @ filtered.to_numpy(dtype=np.float64) - rel_error = (((estimate - filtered_targets) + 1.0) / (filtered_targets + 1.0)) ** 2 - weighted_terms = inv_mean_normalisation * rel_error * normalisation_factor - loss_value = float(weighted_terms.mean()) - unweighted_msre = float(rel_error.mean()) + rel_error = pe_native_relative_error(estimate, loss_arrays) + weighted_terms = pe_native_huber_loss_terms(estimate, loss_arrays) + loss_value = float(weighted_terms.sum()) + unweighted_msre = float(np.mean(np.square(rel_error))) return { + "metric": PE_NATIVE_ROBUST_LOSS_METRIC, + "loss_config": loss_arrays.metadata().get("loss_config"), "loss": loss_value, "unweighted_msre": unweighted_msre, "n_targets_total": int(len(target_names)), @@ -200,10 +268,12 @@ def compute(dataset_path: str) -> dict[str, float | int]: "n_targets_bad_dropped": int(bad_mask.sum()), "n_national_targets": n_national, "n_state_targets": n_state, + "target_scope_filter": TARGET_SCOPE_FILTER, "weight_sum": float(weights.sum()), "target_names": filtered.columns.tolist(), "weighted_terms": weighted_terms.tolist(), "rel_error": rel_error.tolist(), + "target_loss_metadata": loss_arrays.sidecar_rows(), } @@ -219,7 +289,8 @@ def compute(dataset_path: str) -> dict[str, float | int]: raise ValueError("Candidate and baseline produced different target names after filtering") payload = { - "metric": "enhanced_cps_native_loss", + "metric": candidate["metric"], + "loss_config": candidate.get("loss_config"), "period": PERIOD, "candidate_dataset": CANDIDATE_DATASET, "baseline_dataset": BASELINE_DATASET, @@ -237,6 +308,7 @@ def compute(dataset_path: str) -> dict[str, float | int]: "n_targets_bad_dropped": candidate["n_targets_bad_dropped"], "n_national_targets": candidate["n_national_targets"], "n_state_targets": candidate["n_state_targets"], + "target_scope_filter": TARGET_SCOPE_FILTER, "candidate_weight_sum": candidate["weight_sum"], "baseline_weight_sum": baseline["weight_sum"], "family_breakdown": build_family_breakdown( @@ -265,10 +337,62 @@ def compute(dataset_path: str) -> dict[str, float | int]: from policyengine_us import Microsimulation from policyengine_us_data.utils.loss import build_loss_matrix + +def patch_policyengine_us_data_uprating_aliases(): + import policyengine_us_data.utils.soi as soi_utils + + original = soi_utils.create_policyengine_uprating_factors_table + + def patched_create_policyengine_uprating_factors_table(*args, **kwargs): + table = original(*args, **kwargs) + if ( + "employment_income" not in table.index + and "employment_income_before_lsr" in table.index + ): + table.loc["employment_income"] = table.loc[ + "employment_income_before_lsr" + ] + return table + + soi_utils.create_policyengine_uprating_factors_table = ( + patched_create_policyengine_uprating_factors_table + ) + + +patch_policyengine_us_data_uprating_aliases() + + +def load_microplex_loss_helpers(): + import importlib.util + + for entry in sys.path: + candidate = Path(entry) / "microplex_us" / "pipelines" / "pe_native_loss.py" + if not candidate.exists(): + continue + spec = importlib.util.spec_from_file_location( + "microplex_us_pe_native_loss_standalone", + candidate, + ) + if spec is None or spec.loader is None: + continue + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + raise ModuleNotFoundError("Could not load microplex_us pe_native_loss helper") + + +_LOSS_HELPERS = load_microplex_loss_helpers() +PE_NATIVE_ROBUST_LOSS_METRIC = _LOSS_HELPERS.PE_NATIVE_ROBUST_LOSS_METRIC +build_pe_native_loss_arrays = _LOSS_HELPERS.build_pe_native_loss_arrays +pe_native_huber_loss_terms = _LOSS_HELPERS.pe_native_huber_loss_terms +pe_native_relative_error = _LOSS_HELPERS.pe_native_relative_error + BAD_TARGETS = tuple(json.loads(sys.argv[2])) PERIOD = int(sys.argv[3]) BASELINE_DATASET = sys.argv[4] CANDIDATE_DATASETS = tuple(json.loads(sys.argv[5])) +TARGET_SCOPE_FILTER = sys.argv[6] if len(sys.argv) >= 7 and sys.argv[6] else None def dataset_from_path(dataset_path: str, dataset_name: str): @@ -282,6 +406,22 @@ class LocalDataset(Dataset): return LocalDataset +def scope_keep_mask(target_names): + if TARGET_SCOPE_FILTER is None: + return np.ones(target_names.shape, dtype=bool) + if TARGET_SCOPE_FILTER == "national": + return np.asarray( + [str(name).startswith("nation/") for name in target_names], + dtype=bool, + ) + if TARGET_SCOPE_FILTER == "state": + return np.asarray( + [not str(name).startswith("nation/") for name in target_names], + dtype=bool, + ) + raise ValueError(f"Unsupported target scope filter: {TARGET_SCOPE_FILTER}") + + def classify_target_family(target_name: str) -> str: parts = target_name.split("/") if target_name.startswith("state/census/age/"): @@ -329,7 +469,6 @@ def build_family_breakdown(target_names, candidate_terms, baseline_terms, candid family_rows = [] target_names = list(target_names) unique_families = sorted({classify_target_family(name) for name in target_names}) - n_targets_total = float(len(target_names)) for family in unique_families: idx = [i for i, name in enumerate(target_names) if classify_target_family(name) == family] if not idx: @@ -342,14 +481,17 @@ def build_family_breakdown(target_names, candidate_terms, baseline_terms, candid { "family": family, "n_targets": int(len(idx)), - "candidate_loss_contribution": float(candidate_slice.sum() / n_targets_total), - "baseline_loss_contribution": float(baseline_slice.sum() / n_targets_total), - "loss_contribution_delta": float((candidate_slice.sum() - baseline_slice.sum()) / n_targets_total), + "candidate_loss_contribution": float(candidate_slice.sum()), + "baseline_loss_contribution": float(baseline_slice.sum()), + "loss_contribution_delta": float(candidate_slice.sum() - baseline_slice.sum()), "candidate_mean_weighted_loss": float(candidate_slice.mean()), "baseline_mean_weighted_loss": float(baseline_slice.mean()), - "candidate_mean_unweighted_msre": float(candidate_rel_slice.mean()), - "baseline_mean_unweighted_msre": float(baseline_rel_slice.mean()), - "unweighted_msre_delta": float(candidate_rel_slice.mean() - baseline_rel_slice.mean()), + "candidate_mean_unweighted_msre": float(np.mean(np.square(candidate_rel_slice))), + "baseline_mean_unweighted_msre": float(np.mean(np.square(baseline_rel_slice))), + "unweighted_msre_delta": float( + np.mean(np.square(candidate_rel_slice)) + - np.mean(np.square(baseline_rel_slice)) + ), } ) family_rows.sort(key=lambda row: row["loss_contribution_delta"], reverse=True) @@ -365,24 +507,20 @@ def compute(dataset_path: str) -> dict[str, float | int]: target_names = np.asarray(loss_matrix.columns) zero_mask = np.isclose(targets_array, 0.0, atol=0.1) bad_mask = np.isin(target_names, BAD_TARGETS) - keep_mask = ~(zero_mask | bad_mask) + keep_mask = ~(zero_mask | bad_mask) & scope_keep_mask(target_names) filtered = loss_matrix.loc[:, keep_mask] filtered_targets = np.asarray(targets_array[keep_mask], dtype=np.float64) + if filtered_targets.size == 0: + raise ValueError("PE-native broad loss has no targets after filtering") is_national = np.asarray(filtered.columns.str.startswith("nation/"), dtype=bool) n_national = int(is_national.sum()) n_state = int((~is_national).sum()) - if n_national == 0 or n_state == 0: - raise ValueError( - "PE-native broad loss requires both national and state targets after filtering" - ) - normalisation_factor = np.where( - is_national, - 1.0 / n_national, - 1.0 / n_state, - ).astype(np.float64) - inv_mean_normalisation = 1.0 / float(np.mean(normalisation_factor)) + loss_arrays = build_pe_native_loss_arrays( + filtered.columns.tolist(), + filtered_targets, + ) sim = Microsimulation(dataset=dataset_cls) sim.default_calculation_period = PERIOD @@ -393,13 +531,15 @@ def compute(dataset_path: str) -> dict[str, float | int]: ).values.astype(np.float64) estimate = weights @ filtered.to_numpy(dtype=np.float64) - rel_error = (((estimate - filtered_targets) + 1.0) / (filtered_targets + 1.0)) ** 2 - weighted_terms = inv_mean_normalisation * rel_error * normalisation_factor - loss_value = float(weighted_terms.mean()) - unweighted_msre = float(rel_error.mean()) + rel_error = pe_native_relative_error(estimate, loss_arrays) + weighted_terms = pe_native_huber_loss_terms(estimate, loss_arrays) + loss_value = float(weighted_terms.sum()) + unweighted_msre = float(np.mean(np.square(rel_error))) return { "dataset": dataset_path, + "metric": PE_NATIVE_ROBUST_LOSS_METRIC, + "loss_config": loss_arrays.metadata().get("loss_config"), "loss": loss_value, "unweighted_msre": unweighted_msre, "n_targets_total": int(len(target_names)), @@ -408,10 +548,12 @@ def compute(dataset_path: str) -> dict[str, float | int]: "n_targets_bad_dropped": int(bad_mask.sum()), "n_national_targets": n_national, "n_state_targets": n_state, + "target_scope_filter": TARGET_SCOPE_FILTER, "weight_sum": float(weights.sum()), "target_names": filtered.columns.tolist(), "weighted_terms": weighted_terms.tolist(), "rel_error": rel_error.tolist(), + "target_loss_metadata": loss_arrays.sidecar_rows(), } @@ -428,7 +570,8 @@ def compute(dataset_path: str) -> dict[str, float | int]: raise ValueError("Candidate and baseline produced different target names after filtering") payload.append( { - "metric": "enhanced_cps_native_loss", + "metric": candidate["metric"], + "loss_config": candidate.get("loss_config"), "period": PERIOD, "candidate_dataset": candidate_dataset, "baseline_dataset": BASELINE_DATASET, @@ -447,6 +590,7 @@ def compute(dataset_path: str) -> dict[str, float | int]: "n_targets_bad_dropped": candidate["n_targets_bad_dropped"], "n_national_targets": candidate["n_national_targets"], "n_state_targets": candidate["n_state_targets"], + "target_scope_filter": TARGET_SCOPE_FILTER, "candidate_weight_sum": candidate["weight_sum"], "baseline_weight_sum": baseline["weight_sum"], "family_breakdown": build_family_breakdown( @@ -476,6 +620,57 @@ def compute(dataset_path: str) -> dict[str, float | int]: from policyengine_us import Microsimulation from policyengine_us_data.utils.loss import build_loss_matrix + +def patch_policyengine_us_data_uprating_aliases(): + import policyengine_us_data.utils.soi as soi_utils + + original = soi_utils.create_policyengine_uprating_factors_table + + def patched_create_policyengine_uprating_factors_table(*args, **kwargs): + table = original(*args, **kwargs) + if ( + "employment_income" not in table.index + and "employment_income_before_lsr" in table.index + ): + table.loc["employment_income"] = table.loc[ + "employment_income_before_lsr" + ] + return table + + soi_utils.create_policyengine_uprating_factors_table = ( + patched_create_policyengine_uprating_factors_table + ) + + +patch_policyengine_us_data_uprating_aliases() + + +def load_microplex_loss_helpers(): + import importlib.util + + for entry in sys.path: + candidate = Path(entry) / "microplex_us" / "pipelines" / "pe_native_loss.py" + if not candidate.exists(): + continue + spec = importlib.util.spec_from_file_location( + "microplex_us_pe_native_loss_standalone", + candidate, + ) + if spec is None or spec.loader is None: + continue + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + raise ModuleNotFoundError("Could not load microplex_us pe_native_loss helper") + + +_LOSS_HELPERS = load_microplex_loss_helpers() +PE_NATIVE_ROBUST_LOSS_METRIC = _LOSS_HELPERS.PE_NATIVE_ROBUST_LOSS_METRIC +build_pe_native_loss_arrays = _LOSS_HELPERS.build_pe_native_loss_arrays +pe_native_huber_loss_terms = _LOSS_HELPERS.pe_native_huber_loss_terms +pe_native_relative_error = _LOSS_HELPERS.pe_native_relative_error + BAD_TARGETS = tuple(json.loads(sys.argv[2])) PERIOD = int(sys.argv[3]) FROM_DATASET = sys.argv[4] @@ -515,12 +710,10 @@ def compute(dataset_path: str): "PE-native broad loss requires both national and state targets after filtering" ) - normalisation_factor = np.where( - is_national, - 1.0 / n_national, - 1.0 / n_state, - ).astype(np.float64) - inv_mean_normalisation = 1.0 / float(np.mean(normalisation_factor)) + loss_arrays = build_pe_native_loss_arrays( + filtered.columns.tolist(), + filtered_targets, + ) sim = Microsimulation(dataset=dataset_cls) sim.default_calculation_period = PERIOD @@ -531,8 +724,8 @@ def compute(dataset_path: str): ).values.astype(np.float64) estimate = weights @ filtered.to_numpy(dtype=np.float64) - rel_error = (((estimate - filtered_targets) + 1.0) / (filtered_targets + 1.0)) ** 2 - weighted_terms = inv_mean_normalisation * rel_error * normalisation_factor + rel_error = pe_native_relative_error(estimate, loss_arrays) + weighted_terms = pe_native_huber_loss_terms(estimate, loss_arrays) return { "target_names": filtered.columns.tolist(), "targets": filtered_targets.tolist(), @@ -707,6 +900,57 @@ def summarize_target_rows(rows, *, group_field=None): from policyengine_us import Microsimulation from policyengine_us_data.utils.loss import build_loss_matrix + +def patch_policyengine_us_data_uprating_aliases(): + import policyengine_us_data.utils.soi as soi_utils + + original = soi_utils.create_policyengine_uprating_factors_table + + def patched_create_policyengine_uprating_factors_table(*args, **kwargs): + table = original(*args, **kwargs) + if ( + "employment_income" not in table.index + and "employment_income_before_lsr" in table.index + ): + table.loc["employment_income"] = table.loc[ + "employment_income_before_lsr" + ] + return table + + soi_utils.create_policyengine_uprating_factors_table = ( + patched_create_policyengine_uprating_factors_table + ) + + +patch_policyengine_us_data_uprating_aliases() + + +def load_microplex_loss_helpers(): + import importlib.util + + for entry in sys.path: + candidate = Path(entry) / "microplex_us" / "pipelines" / "pe_native_loss.py" + if not candidate.exists(): + continue + spec = importlib.util.spec_from_file_location( + "microplex_us_pe_native_loss_standalone", + candidate, + ) + if spec is None or spec.loader is None: + continue + module = importlib.util.module_from_spec(spec) + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + raise ModuleNotFoundError("Could not load microplex_us pe_native_loss helper") + + +_LOSS_HELPERS = load_microplex_loss_helpers() +PE_NATIVE_ROBUST_LOSS_METRIC = _LOSS_HELPERS.PE_NATIVE_ROBUST_LOSS_METRIC +build_pe_native_loss_arrays = _LOSS_HELPERS.build_pe_native_loss_arrays +pe_native_huber_loss_terms = _LOSS_HELPERS.pe_native_huber_loss_terms +pe_native_relative_error = _LOSS_HELPERS.pe_native_relative_error + BAD_TARGETS = tuple(json.loads(sys.argv[2])) PERIOD = int(sys.argv[3]) BASELINE_DATASET = sys.argv[4] @@ -746,12 +990,10 @@ def compute(dataset_path: str): "PE-native broad loss requires both national and state targets after filtering" ) - normalisation_factor = np.where( - is_national, - 1.0 / n_national, - 1.0 / n_state, - ).astype(np.float64) - inv_mean_normalisation = 1.0 / float(np.mean(normalisation_factor)) + loss_arrays = build_pe_native_loss_arrays( + filtered.columns.tolist(), + filtered_targets, + ) sim = Microsimulation(dataset=dataset_cls) sim.default_calculation_period = PERIOD @@ -762,8 +1004,8 @@ def compute(dataset_path: str): ).values.astype(np.float64) estimate = weights @ filtered.to_numpy(dtype=np.float64) - rel_error = (((estimate - filtered_targets) + 1.0) / (filtered_targets + 1.0)) ** 2 - weighted_terms = inv_mean_normalisation * rel_error * normalisation_factor + rel_error = pe_native_relative_error(estimate, loss_arrays) + weighted_terms = pe_native_huber_loss_terms(estimate, loss_arrays) return { "target_names": filtered.columns.tolist(), "targets": filtered_targets.tolist(), @@ -2043,6 +2285,8 @@ class PolicyEngineUSEnhancedCPSNativeScores: candidate_weight_sum: float baseline_weight_sum: float family_breakdown: tuple[dict[str, Any], ...] = field(default_factory=tuple) + loss_config: dict[str, Any] | None = None + target_scope_filter: str | None = None def to_dict(self) -> dict[str, Any]: return { @@ -2069,10 +2313,14 @@ def to_dict(self) -> dict[str, Any]: "candidate_weight_sum": self.candidate_weight_sum, "baseline_weight_sum": self.baseline_weight_sum, "family_breakdown": list(self.family_breakdown), + "loss_config": self.loss_config, + "target_scope_filter": self.target_scope_filter, } @classmethod - def from_dict(cls, payload: dict[str, Any]) -> PolicyEngineUSEnhancedCPSNativeScores: + def from_dict( + cls, payload: dict[str, Any] + ) -> PolicyEngineUSEnhancedCPSNativeScores: return cls( metric=str(payload["metric"]), period=int(payload["period"]), @@ -2099,6 +2347,8 @@ def from_dict(cls, payload: dict[str, Any]) -> PolicyEngineUSEnhancedCPSNativeSc candidate_weight_sum=float(payload["candidate_weight_sum"]), baseline_weight_sum=float(payload["baseline_weight_sum"]), family_breakdown=tuple(payload.get("family_breakdown", ())), + loss_config=payload.get("loss_config"), + target_scope_filter=payload.get("target_scope_filter"), ) @@ -2124,8 +2374,7 @@ def resolve_policyengine_us_data_repo_root( return resolved searched = ", ".join(str(path.expanduser()) for path in candidates) raise FileNotFoundError( - "Could not resolve policyengine-us-data repo root. " - f"Searched: {searched}" + f"Could not resolve policyengine-us-data repo root. Searched: {searched}" ) @@ -2169,7 +2418,8 @@ def build_policyengine_us_data_pythonpath( """Build the native-scoring PYTHONPATH for local PE-US-data checkouts.""" resolved_repo = resolve_policyengine_us_data_repo_root(repo_root) - path_entries: list[str] = [str(resolved_repo)] + microplex_src = Path(__file__).resolve().parents[2] + path_entries: list[str] = [str(resolved_repo), str(microplex_src)] sibling_microimpute = resolved_repo.parent / "microimpute" if (sibling_microimpute / "microimpute").exists(): @@ -2209,6 +2459,7 @@ def compute_policyengine_us_enhanced_cps_native_scores( period: int = 2024, policyengine_us_data_python: str | Path | None = None, policyengine_us_data_repo: str | Path | None = None, + target_scope_filter: str | None = None, ) -> PolicyEngineUSEnhancedCPSNativeScores: """Score one candidate and baseline under the exact enhanced-CPS loss.""" resolved_repo = resolve_policyengine_us_data_repo_root(policyengine_us_data_repo) @@ -2227,6 +2478,7 @@ def compute_policyengine_us_enhanced_cps_native_scores( str(int(period)), str(Path(candidate_dataset).expanduser().resolve()), str(Path(baseline_dataset).expanduser().resolve()), + target_scope_filter or "", ], cwd=resolved_repo, env=env, @@ -2250,6 +2502,7 @@ def score_policyengine_us_native_broad_loss( period: int = 2024, python_executable: str | Path | None = None, repo_root: str | Path | None = None, + target_scope_filter: str | None = None, ) -> PolicyEngineUSEnhancedCPSNativeScores: """Backward-compatible alias for the exact enhanced-CPS loss scorer.""" return compute_policyengine_us_enhanced_cps_native_scores( @@ -2258,6 +2511,7 @@ def score_policyengine_us_native_broad_loss( period=period, policyengine_us_data_python=python_executable, policyengine_us_data_repo=repo_root, + target_scope_filter=target_scope_filter, ) @@ -2268,6 +2522,7 @@ def compute_us_pe_native_scores( period: int = 2024, policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, + target_scope_filter: str | None = None, ) -> dict[str, Any]: """Build the saved manifest payload for PE-native broad scoring.""" @@ -2277,6 +2532,7 @@ def compute_us_pe_native_scores( period=period, policyengine_us_data_python=policyengine_us_data_python, policyengine_us_data_repo=policyengine_us_data_repo, + target_scope_filter=target_scope_filter, ) return { "metric": score.metric, @@ -2299,6 +2555,8 @@ def compute_us_pe_native_scores( "n_targets_bad_dropped": score.n_targets_bad_dropped, "n_national_targets": score.n_national_targets, "n_state_targets": score.n_state_targets, + "loss_config": score.loss_config, + "target_scope_filter": score.target_scope_filter, }, "broad_loss": score.to_dict(), "family_breakdown": list(score.family_breakdown), @@ -2312,6 +2570,7 @@ def compute_batch_us_pe_native_scores( period: int = 2024, policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_python: str | Path | None = None, + target_scope_filter: str | None = None, ) -> list[dict[str, Any]]: """Score multiple candidates against one baseline in a single PE-native subprocess.""" @@ -2339,6 +2598,7 @@ def compute_batch_us_pe_native_scores( for candidate_path in candidate_dataset_paths ] ), + target_scope_filter or "", ], cwd=resolved_repo, env=env, @@ -2358,6 +2618,7 @@ def compute_batch_us_pe_native_scores( "metric": item["metric"], "period": int(item["period"]), "summary": { + "loss_config": item.get("loss_config"), "candidate_enhanced_cps_native_loss": float( item["candidate_enhanced_cps_native_loss"] ), @@ -2367,9 +2628,7 @@ def compute_batch_us_pe_native_scores( "enhanced_cps_native_loss_delta": float( item["enhanced_cps_native_loss_delta"] ), - "candidate_beats_baseline": bool( - item["candidate_beats_baseline"] - ), + "candidate_beats_baseline": bool(item["candidate_beats_baseline"]), "candidate_unweighted_msre": float(item["candidate_unweighted_msre"]), "baseline_unweighted_msre": float(item["baseline_unweighted_msre"]), "unweighted_msre_delta": float(item["unweighted_msre_delta"]), @@ -2379,9 +2638,11 @@ def compute_batch_us_pe_native_scores( "n_targets_bad_dropped": int(item["n_targets_bad_dropped"]), "n_national_targets": int(item["n_national_targets"]), "n_state_targets": int(item["n_state_targets"]), + "target_scope_filter": item.get("target_scope_filter"), }, "broad_loss": { "metric": item["metric"], + "loss_config": item.get("loss_config"), "period": int(item["period"]), "candidate_dataset": str(item["candidate_dataset"]), "baseline_dataset": str(item["baseline_dataset"]), @@ -2394,9 +2655,7 @@ def compute_batch_us_pe_native_scores( "enhanced_cps_native_loss_delta": float( item["enhanced_cps_native_loss_delta"] ), - "candidate_beats_baseline": bool( - item["candidate_beats_baseline"] - ), + "candidate_beats_baseline": bool(item["candidate_beats_baseline"]), "candidate_unweighted_msre": float(item["candidate_unweighted_msre"]), "baseline_unweighted_msre": float(item["baseline_unweighted_msre"]), "unweighted_msre_delta": float(item["unweighted_msre_delta"]), @@ -2406,6 +2665,7 @@ def compute_batch_us_pe_native_scores( "n_targets_bad_dropped": int(item["n_targets_bad_dropped"]), "n_national_targets": int(item["n_national_targets"]), "n_state_targets": int(item["n_state_targets"]), + "target_scope_filter": item.get("target_scope_filter"), "candidate_weight_sum": float(item["candidate_weight_sum"]), "baseline_weight_sum": float(item["baseline_weight_sum"]), "family_breakdown": list(item.get("family_breakdown", [])), @@ -2662,7 +2922,9 @@ def annotate_pe_native_target_db_matches( key = parse_pe_native_target_lookup_key(target_name) if key is None: annotation: dict[str, Any] = {"policyengine_target_match": "unparsed"} - elif resolved_db_path is None or not resolved_db_path.exists() or target_db_error: + elif ( + resolved_db_path is None or not resolved_db_path.exists() or target_db_error + ): annotation = { "policyengine_target_match": "db_unavailable", "policyengine_target_expected": key.expected_target(), diff --git a/tests/pipelines/test_ecps_replacement_comparison.py b/tests/pipelines/test_ecps_replacement_comparison.py index 70647e1..9610d79 100644 --- a/tests/pipelines/test_ecps_replacement_comparison.py +++ b/tests/pipelines/test_ecps_replacement_comparison.py @@ -16,6 +16,7 @@ from microplex_us.pipelines.mp300k_artifact_gates import ( write_mp300k_artifact_gate_report, ) +from microplex_us.pipelines.pe_native_loss import build_pe_native_loss_arrays from microplex_us.policyengine.us import write_policyengine_us_time_period_dataset _TARGET_NAMES = [ @@ -258,6 +259,42 @@ def test_target_loss_diagnostics_family_breakdown_uses_total_loss_scale(): ) == pytest.approx(diagnostics["summary"]["candidate_loss"]) +def test_robust_loss_terms_match_objective() -> None: + target_names = [ + "nation/irs/example income/total/AGI in 0_1/taxable/All", + "nation/irs/example count/count/AGI in 0_1/taxable/All", + ] + targets = np.asarray([100.0, 10.0]) + loss_arrays = build_pe_native_loss_arrays(target_names, targets) + matrix = np.asarray([[90.0, 20.0], [5.0, 0.0]]) + weights = np.asarray([1.0, 1.0]) + loss_inputs = { + "scaled_matrix": matrix, + "scaled_target": loss_arrays.objective_target, + "unscaled_target": targets, + "loss_denominator": loss_arrays.denominator, + "loss_target_weight": loss_arrays.target_weight, + "loss_bucket": loss_arrays.bucket_keys, + "loss_unit": loss_arrays.unit_keys, + "loss_scope": loss_arrays.scope_keys, + "loss_family": loss_arrays.family_keys, + "loss_epsilon": loss_arrays.epsilon, + "metadata": { + **loss_arrays.metadata(), + "target_names": target_names, + }, + } + + assert ecps._loss_terms(loss_inputs, weights).sum() == pytest.approx( + ecps._objective( + matrix, + loss_arrays.objective_target, + weights, + loss_arrays=loss_arrays, + ) + ) + + def _artifact_manifest(artifact_dir: Path, baseline_dataset: Path) -> None: (artifact_dir / "source_weight_diagnostics.json").write_text( json.dumps( diff --git a/tests/pipelines/test_pe_native_calibration_benchmark.py b/tests/pipelines/test_pe_native_calibration_benchmark.py index 622bbd6..77ab020 100644 --- a/tests/pipelines/test_pe_native_calibration_benchmark.py +++ b/tests/pipelines/test_pe_native_calibration_benchmark.py @@ -65,7 +65,8 @@ def test_build_policyengine_us_native_calibration_benchmark_scores_variants( existing_dataset = _write_dataset(tmp_path / "current_weight_diff.h5", [1.2, 0.8]) output_dir = tmp_path / "benchmark" - def fake_extract(**_kwargs): + def fake_extract(**kwargs): + assert kwargs["target_scope_filter"] == "national" return { "scaled_matrix": np.eye(2), "scaled_target": np.asarray([1.0, 0.0]), @@ -103,6 +104,7 @@ def fake_rewrite(**kwargs): return output_path.resolve() def fake_scores(**kwargs): + assert kwargs["target_scope_filter"] == "national" results = [] for candidate_path in kwargs["candidate_dataset_paths"]: path = Path(candidate_path).resolve() @@ -192,9 +194,11 @@ def fake_scores(**kwargs): target_total_weight_source="baseline", existing_candidates={"current_weight_diff": existing_dataset}, skip_tax_expenditure_targets=True, + target_scope_filter="national", ) assert payload["variant_count"] == 4 + assert payload["target_scope_filter"] == "national" assert payload["target_total_weight"] == 4.0 assert payload["target_total_weight_resolved_from"] == "baseline" assert payload["best_variant_label"] == "pe_native_unconstrained_baseline_total" diff --git a/tests/pipelines/test_pe_native_loss.py b/tests/pipelines/test_pe_native_loss.py new file mode 100644 index 0000000..925b381 --- /dev/null +++ b/tests/pipelines/test_pe_native_loss.py @@ -0,0 +1,69 @@ +"""Tests for robust PE-native loss helpers.""" + +from __future__ import annotations + +import numpy as np +import pytest + +from microplex_us.pipelines.pe_native_loss import ( + build_pe_native_loss_arrays, + pe_native_huber_loss, + pe_native_huber_loss_terms, +) +from microplex_us.pipelines.pe_native_optimization import ( + _project_to_simplex, + optimize_pe_native_loss_weights, +) + + +def test_bucketed_loss_downweights_tiny_baseline_outlier() -> None: + targets = np.asarray([10.0, 1_000_000.0]) + names = [ + "nation/irs/estate losses/total/AGI in 100k-200k/taxable/All", + "nation/irs/adjusted gross income/total/AGI in 500k-1m/taxable/All", + ] + loss_arrays = build_pe_native_loss_arrays(names, targets) + + estimate = np.asarray([410.0, 1_100_000.0]) + terms = pe_native_huber_loss_terms(estimate, loss_arrays) + + assert loss_arrays.target_weight[1] > loss_arrays.target_weight[0] + assert terms[0] / terms.sum() < 0.05 + assert terms[1] > terms[0] + + +def test_robust_pe_native_optimizer_uses_huber_objective() -> None: + matrix = np.asarray([[1.0, 0.0], [0.0, 1.0]]) + target = np.asarray([1.0, 1.0]) + loss_arrays = build_pe_native_loss_arrays( + [ + "nation/irs/example income/total/AGI in 0_1/taxable/All", + "nation/irs/example income/total/AGI in 1_2/taxable/All", + ], + target, + ) + initial_weights = np.asarray([0.0, 2.0]) + + optimized, summary = optimize_pe_native_loss_weights( + scaled_matrix=matrix, + scaled_target=target, + initial_weights=initial_weights, + loss_arrays=loss_arrays, + max_iter=100, + tol=1e-10, + ) + + assert summary["optimized_loss"] < summary["initial_loss"] + assert pe_native_huber_loss(matrix.T @ optimized, loss_arrays) == pytest.approx( + summary["optimized_loss"] + ) + assert optimized == pytest.approx(np.asarray([1.0, 1.0]), abs=1e-3) + + +def test_simplex_projection_preserves_large_population_total_exactly() -> None: + values = np.asarray([50_000_000.0, 50_000_000.0, 53_768_767.0]) + target_total = values.sum() - 1_000.0 + + projected = _project_to_simplex(values, target_total) + + assert projected.sum() == pytest.approx(target_total, abs=1e-6) diff --git a/tests/pipelines/test_pe_native_scores.py b/tests/pipelines/test_pe_native_scores.py index 5006097..e9311d6 100644 --- a/tests/pipelines/test_pe_native_scores.py +++ b/tests/pipelines/test_pe_native_scores.py @@ -23,6 +23,8 @@ write_us_pe_native_target_diagnostics, ) +_MICROPLEX_SRC = __import__("microplex_us").__path__[0].rsplit("/microplex_us", 1)[0] + def test_compute_us_pe_native_scores_wraps_broad_loss(monkeypatch, tmp_path) -> None: candidate = tmp_path / "candidate.h5" @@ -223,7 +225,10 @@ def test_compute_batch_us_pe_native_scores_wraps_multiple_candidates( assert results[1]["summary"]["candidate_beats_baseline"] is False assert results[1]["broad_loss"]["enhanced_cps_native_loss_delta"] == 0.25 assert results[0]["family_breakdown"][0]["family"] == "state_age_distribution" - assert results[1]["broad_loss"]["family_breakdown"][0]["family"] == "state_agi_distribution" + assert ( + results[1]["broad_loss"]["family_breakdown"][0]["family"] + == "state_agi_distribution" + ) assert results[0]["timing"]["batch_candidate_count"] == 2 assert results[0]["timing"]["batch_elapsed_seconds"] >= 0.0 @@ -243,6 +248,7 @@ def test_build_policyengine_us_data_pythonpath_includes_sibling_microimpute( assert pythonpath.split(os.pathsep) == [ str(repo), + _MICROPLEX_SRC, str(microimpute), "/tmp/existing-one", "/tmp/existing-two", @@ -276,6 +282,7 @@ def test_build_policyengine_us_data_subprocess_env_strips_outer_uv_markers( assert "UV_RUN_RECURSION_DEPTH" not in env assert env["PYTHONPATH"].split(os.pathsep) == [ str(repo), + _MICROPLEX_SRC, str(microimpute), "/tmp/existing", ] @@ -421,12 +428,8 @@ def test_compare_us_pe_native_target_deltas_wraps_subprocess_payload( def test_parse_pe_native_target_lookup_key_maps_eitc_agi_child_labels() -> None: - amount_key = parse_pe_native_target_lookup_key( - "nation/irs/eitc/amount/c3_1_1k" - ) - returns_key = parse_pe_native_target_lookup_key( - "nation/irs/eitc/returns/c2_1_1k" - ) + amount_key = parse_pe_native_target_lookup_key("nation/irs/eitc/amount/c3_1_1k") + returns_key = parse_pe_native_target_lookup_key("nation/irs/eitc/returns/c2_1_1k") assert amount_key is not None assert amount_key.variable == "eitc" @@ -464,9 +467,7 @@ def test_annotate_pe_native_target_db_matches_marks_matches_and_gaps( "notes": "Table 2.5", "geo_level": "national", "geographic_id": "US", - "domain_variable": ( - "adjusted_gross_income,eitc,eitc_child_count" - ), + "domain_variable": ("adjusted_gross_income,eitc,eitc_child_count"), "constraints": [ { "variable": "eitc_child_count", @@ -774,4 +775,7 @@ def test_compute_batch_us_pe_native_support_audits_wraps_multiple_candidates( assert len(results) == 2 assert results[0]["baseline_dataset"] == str(baseline) assert results[0]["candidate_dataset"] == str(candidate_a) - assert results[1]["comparisons"]["critical_input_support"][0]["weighted_nonzero_delta"] == 2.0 + assert ( + results[1]["comparisons"]["critical_input_support"][0]["weighted_nonzero_delta"] + == 2.0 + )