diff --git a/changelog.d/1150.fixed.md b/changelog.d/1150.fixed.md new file mode 100644 index 000000000..3b53b925c --- /dev/null +++ b/changelog.d/1150.fixed.md @@ -0,0 +1 @@ +Fix Enhanced CPS PUF-clone calibration by anchoring source household weights and excluding Forbes-scale PUF donors from clone training. diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index cda23d446..de7498178 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -40,6 +40,35 @@ PUF_SUBSAMPLE_TARGET = 20_000 PUF_TOP_PERCENTILE = 99.5 +FORBES_SYNTHETIC_FINANCIAL_THRESHOLD = 250_000_000 +PUF_METADATA_MISSING_TOP_TAIL_THRESHOLD = 10_000_000 +FORBES_METADATA_MARKER_THRESHOLDS = ( + ("forbes_unit_id", 0), + ("forbes_replicate_id", 0), + ("forbes_rank", 1), +) +PUF_METADATA_MISSING_TOP_TAIL_VARIABLES = ( + "adjusted_gross_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + "taxable_interest_income", + "tax_exempt_interest_income", + "long_term_capital_gains", + "short_term_capital_gains", + "non_sch_d_capital_gains", + "long_term_capital_gains_on_collectibles", + "unrecaptured_section_1250_gain", + "partnership_s_corp_income", + "self_employment_income", + "sstb_self_employment_income", + "rental_income", + "farm_income", + "farm_rent_income", + "farm_operations_income", + "estate_income", + "charitable_cash_donations", + "charitable_non_cash_donations", +) DEMOGRAPHIC_PREDICTORS = [ "age", @@ -925,6 +954,7 @@ def _run_qrf_imputation( puf_sim = Microsimulation(dataset=puf_dataset) puf_agi = puf_sim.calculate("adjusted_gross_income", map_to="person").values + puf_data = puf_sim.dataset.load_dataset() X_train_full = puf_sim.calculate_dataframe( DEMOGRAPHIC_PREDICTORS + IMPUTED_VARIABLES @@ -936,6 +966,63 @@ def _run_qrf_imputation( del puf_sim + tax_unit_ids = _period_array(puf_data, "tax_unit_id", time_period) + has_forbes_metadata = _has_forbes_metadata( + puf_data, + time_period, + expected_length=0 if tax_unit_ids is None else len(tax_unit_ids), + ) + forbes_person_mask = _forbes_person_training_mask( + puf_data, + time_period, + n_persons=len(puf_agi), + ) + if has_forbes_metadata: + top_tail_threshold = FORBES_SYNTHETIC_FINANCIAL_THRESHOLD + top_tail_label = "Forbes" + low_weight_mask = np.ones_like(forbes_person_mask, dtype=bool) + else: + top_tail_threshold = PUF_METADATA_MISSING_TOP_TAIL_THRESHOLD + top_tail_label = "metadata-missing top-tail" + low_weight_mask = np.ones_like(forbes_person_mask, dtype=bool) + + forbes_person_mask |= low_weight_mask & (puf_agi >= top_tail_threshold) + for frame in (X_train_full, X_train_override): + candidate_columns = ( + IMPUTED_VARIABLES + OVERRIDDEN_IMPUTED_VARIABLES + if has_forbes_metadata + else PUF_METADATA_MISSING_TOP_TAIL_VARIABLES + ) + financial_columns = [ + column for column in candidate_columns if column in frame.columns + ] + if financial_columns: + forbes_person_mask |= low_weight_mask & ( + frame[financial_columns].abs().max(axis=1).to_numpy() + >= top_tail_threshold + ) + if len(forbes_person_mask) == len(puf_agi) and forbes_person_mask.any(): + if len(X_train_full) != len(forbes_person_mask) or len(X_train_override) != len( + forbes_person_mask + ): + logger.warning( + "Skipping Forbes donor exclusion because QRF training " + "frames do not match person-level PUF metadata lengths" + ) + else: + logger.info( + "Excluding %d %s person records from PUF QRF training at threshold $%s", + int(forbes_person_mask.sum()), + top_tail_label, + f"{top_tail_threshold:,.0f}", + ) + non_forbes_mask = ~forbes_person_mask + puf_agi = puf_agi[non_forbes_mask] + X_train_full = X_train_full.loc[non_forbes_mask].reset_index(drop=True) + X_train_override = X_train_override.loc[non_forbes_mask].reset_index( + drop=True + ) + sub_idx = _stratified_subsample_index(puf_agi) _log_stratified_subsample( len(puf_agi), @@ -975,6 +1062,79 @@ def _run_qrf_imputation( return y_full, y_override +def _period_array( + data: Dict[str, Dict[int, np.ndarray]], + variable: str, + time_period: int, +) -> Optional[np.ndarray]: + if variable not in data: + return None + values = data[variable] + if isinstance(values, dict): + values = values.get(time_period, values.get(str(time_period))) + if values is None: + return None + return np.asarray(values) + + +def _has_forbes_metadata( + puf_data: Dict[str, Dict[int, np.ndarray]], + time_period: int, + expected_length: int, +) -> bool: + """Return whether usable Forbes synthetic-record metadata is present.""" + if expected_length <= 0: + return False + for variable, marker_threshold in FORBES_METADATA_MARKER_THRESHOLDS: + values = _period_array(puf_data, variable, time_period) + if values is None or len(values) != expected_length: + continue + values = np.asarray(values, dtype=float) + if np.any(values >= marker_threshold): + return True + return False + + +def _forbes_person_training_mask( + puf_data: Dict[str, Dict[int, np.ndarray]], + time_period: int, + n_persons: int, +) -> np.ndarray: + """Return person-level mask for synthetic Forbes top-tail PUF records.""" + tax_unit_id = _period_array(puf_data, "tax_unit_id", time_period) + person_tax_unit_id = _period_array(puf_data, "person_tax_unit_id", time_period) + if tax_unit_id is None or person_tax_unit_id is None: + return np.zeros(n_persons, dtype=bool) + if len(person_tax_unit_id) != n_persons: + return np.zeros(n_persons, dtype=bool) + + tax_unit_forbes = np.zeros(len(tax_unit_id), dtype=bool) + for variable, default_threshold in FORBES_METADATA_MARKER_THRESHOLDS: + values = _period_array(puf_data, variable, time_period) + if values is None or len(values) != len(tax_unit_id): + continue + values = np.asarray(values, dtype=float) + if default_threshold == 0: + tax_unit_forbes |= values >= 0 + else: + tax_unit_forbes |= values >= default_threshold + + if not tax_unit_forbes.any(): + return np.zeros(n_persons, dtype=bool) + + sorted_index = np.argsort(tax_unit_id) + sorted_tax_unit_id = tax_unit_id[sorted_index] + sorted_tax_unit_forbes = tax_unit_forbes[sorted_index] + + positions = np.searchsorted(sorted_tax_unit_id, person_tax_unit_id) + valid = positions < len(sorted_tax_unit_id) + person_mask = np.zeros(n_persons, dtype=bool) + valid_positions = positions[valid] + valid[valid] = sorted_tax_unit_id[valid_positions] == person_tax_unit_id[valid] + person_mask[valid] = sorted_tax_unit_forbes[positions[valid]] + return person_mask + + def _stratified_subsample_index( income: np.ndarray, target_n: int = PUF_SUBSAMPLE_TARGET, diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index de0d0b58e..1d1a852ca 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -7,6 +7,7 @@ from policyengine_us_data.utils import ( ABSOLUTE_ERROR_SCALE_TARGETS, HOUSEHOLD_COUNT_TARGET, + PUF_CLONE_HOUSEHOLD_COUNT_TARGET_SHARE, build_loss_matrix, get_target_error_normalisation, get_target_loss_weights, @@ -42,6 +43,12 @@ torch = None +HOUSEHOLD_WEIGHT_TOTAL_REL_TOLERANCE = 0.02 +PUF_CLONE_HOUSEHOLD_WEIGHT_SHARE_TOLERANCE = 0.10 +PERSON_POVERTY_RATE_MIN = 0.05 +PERSON_POVERTY_RATE_MAX = 0.25 + + def initialize_weight_priors( original_weights: np.ndarray, seed: int = 1456, @@ -81,6 +88,119 @@ def initialize_weight_priors( return priors +def validate_household_weight_total( + weights: np.ndarray, + *, + source_total: float, + year: int, + rel_tolerance: float = HOUSEHOLD_WEIGHT_TOTAL_REL_TOLERANCE, +) -> float: + """Validate calibrated household weights against the source total.""" + + weights = np.asarray(weights) + if np.any(np.isnan(weights)): + raise ValueError(f"Year {year}: household_weight contains NaN values") + if np.any(weights < 0): + raise ValueError(f"Year {year}: household_weight contains negative values") + + weighted_hh_count = float(np.sum(weights)) + if not (1e8 <= weighted_hh_count <= 2e8): + raise ValueError( + f"Year {year}: weighted household count " + f"{weighted_hh_count:,.0f} outside expected range " + f"[100M, 200M]" + ) + + source_total = float(source_total) + if not np.isfinite(source_total) or source_total <= 0: + raise ValueError( + f"Year {year}: source household count total must be positive; " + f"got {source_total:,.0f}" + ) + + rel_error = abs(weighted_hh_count - source_total) / source_total + if rel_error > rel_tolerance: + raise ValueError( + f"Year {year}: weighted household count " + f"{weighted_hh_count:,.0f} differs from source household count " + f"{source_total:,.0f} by {rel_error:.2%}, exceeding " + f"{rel_tolerance:.2%} tolerance" + ) + + return weighted_hh_count + + +def validate_clone_household_weight_share( + weights: np.ndarray, + household_is_puf_clone: np.ndarray, + *, + year: int, + target_share: float = PUF_CLONE_HOUSEHOLD_COUNT_TARGET_SHARE, + abs_tolerance: float = PUF_CLONE_HOUSEHOLD_WEIGHT_SHARE_TOLERANCE, +) -> float: + """Validate that PUF-clone households do not dominate final weights.""" + + weights = np.asarray(weights, dtype=np.float64) + household_is_puf_clone = np.asarray(household_is_puf_clone, dtype=bool) + if len(weights) != len(household_is_puf_clone): + raise ValueError( + f"Year {year}: household_is_puf_clone length " + f"{len(household_is_puf_clone)} does not match household_weight " + f"length {len(weights)}" + ) + + total = float(weights.sum()) + if total <= 0: + raise ValueError(f"Year {year}: household_weight total must be positive") + + clone_share = float(weights[household_is_puf_clone].sum()) / total + if abs(clone_share - target_share) > abs_tolerance: + raise ValueError( + f"Year {year}: PUF-clone household weight share " + f"{clone_share:.2%} differs from target {target_share:.2%} by " + f"{abs(clone_share - target_share):.2%}, exceeding " + f"{abs_tolerance:.2%} tolerance" + ) + + return clone_share + + +def _period_array_from_loaded_dataset( + data: dict, + variable_name: str, + period: int, +) -> np.ndarray: + values_by_period = data[variable_name] + if period in values_by_period: + return values_by_period[period] + period_key = str(period) + if period_key in values_by_period: + return values_by_period[period_key] + raise KeyError(f"{variable_name}[{period}] not found in loaded dataset") + + +def validate_person_poverty_rate( + sim, + *, + year: int, + min_rate: float = PERSON_POVERTY_RATE_MIN, + max_rate: float = PERSON_POVERTY_RATE_MAX, +) -> float: + """Fail fast when calibrated weights imply an implausible poverty rate.""" + + poverty_rate = float( + sim.calculate("person_in_poverty", period=year, map_to="person").mean() + ) + if not np.isfinite(poverty_rate): + raise ValueError(f"Year {year}: person poverty rate is not finite") + if not (min_rate <= poverty_rate <= max_rate): + raise ValueError( + f"Year {year}: person poverty rate {poverty_rate:.2%} outside " + f"expected range [{min_rate:.2%}, {max_rate:.2%}]" + ) + return poverty_rate + + def _to_numpy(value) -> np.ndarray: return np.asarray(getattr(value, "values", value)) @@ -639,6 +759,7 @@ def generate(self): data["household_weight"] = {} original_weights = sim.calculate("household_weight") original_weights = initialize_weight_priors(original_weights.values) + source_household_count = float(np.sum(original_weights)) bad_targets = [ "nation/irs/adjusted gross income/total/AGI in 10k-15k/taxable/Head of Household", @@ -687,26 +808,34 @@ def generate(self): seed=1456, ) data["household_weight"][year] = optimised_weights + sim.set_input( + "household_weight", + year, + optimised_weights.astype(np.float32), + ) - # Validate dense weights - w = optimised_weights - if np.any(np.isnan(w)): - raise ValueError(f"Year {year}: household_weight contains NaN values") - if np.any(w < 0): - raise ValueError( - f"Year {year}: household_weight contains negative values" - ) - weighted_hh_count = float(np.sum(w)) - if not (1e8 <= weighted_hh_count <= 2e8): - raise ValueError( - f"Year {year}: weighted household count " - f"{weighted_hh_count:,.0f} outside expected range " - f"[100M, 200M]" - ) + weighted_hh_count = validate_household_weight_total( + optimised_weights, + source_total=source_household_count, + year=year, + ) + clone_household_share = validate_clone_household_weight_share( + optimised_weights, + _period_array_from_loaded_dataset( + data, + "household_is_puf_clone", + year, + ), + year=year, + ) + poverty_rate = validate_person_poverty_rate(sim, year=year) logging.info( f"Year {year}: weights validated — " - f"{weighted_hh_count:,.0f} weighted households, " - f"{int(np.sum(w > 0))} non-zero" + f"{weighted_hh_count:,.0f} weighted households " + f"vs {source_household_count:,.0f} source households, " + f"{clone_household_share:.1%} PUF-clone household share, " + f"{poverty_rate:.1%} person poverty rate, " + f"{int(np.sum(optimised_weights > 0))} non-zero" ) if 2025 in ACA_POST_CALIBRATION_PERSON_TARGETS: @@ -824,9 +953,15 @@ def generate(self): data = sim.dataset.load_dataset() original_weights = sim.calculate("household_weight") original_weights = initialize_weight_priors(original_weights.values) + source_household_count = float(np.sum(original_weights)) for year in [2024]: loss_matrix, targets_array = build_loss_matrix(self.input_dataset, year) optimised_weights = reweight(original_weights, loss_matrix, targets_array) + validate_household_weight_total( + optimised_weights, + source_total=source_household_count, + year=year, + ) data["household_weight"] = optimised_weights self.save_dataset(data) diff --git a/policyengine_us_data/utils/__init__.py b/policyengine_us_data/utils/__init__.py index a0a0f2507..841bcde9e 100644 --- a/policyengine_us_data/utils/__init__.py +++ b/policyengine_us_data/utils/__init__.py @@ -10,8 +10,11 @@ __all__ = [ "ABSOLUTE_ERROR_SCALE_TARGETS", + "CPS_HOUSEHOLD_COUNT_TARGET", "HardConcrete", "HOUSEHOLD_COUNT_TARGET", + "PUF_CLONE_HOUSEHOLD_COUNT_TARGET", + "PUF_CLONE_HOUSEHOLD_COUNT_TARGET_SHARE", "build_loss_matrix", "get_target_error_normalisation", "get_target_loss_weights", diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index 373dc4731..b143824c0 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -4,6 +4,7 @@ import numpy as np import logging import sqlite3 +from pathlib import Path from policyengine_us_data.storage import CALIBRATION_FOLDER, STORAGE_FOLDER from policyengine_us_data.storage.calibration_targets.pull_soi_targets import ( @@ -83,7 +84,10 @@ BEA_NIPA_DIRECT_SUM_LOSS_WEIGHT = 1_000.0 BEA_WAGES_AND_SALARIES_LOSS_WEIGHT = 1_000.0 HOUSEHOLD_COUNT_TARGET = "nation/source/household_count" -HOUSEHOLD_COUNT_LOSS_WEIGHT = 1_000.0 +CPS_HOUSEHOLD_COUNT_TARGET = "nation/source/cps_household_count" +PUF_CLONE_HOUSEHOLD_COUNT_TARGET = "nation/source/puf_clone_household_count" +PUF_CLONE_HOUSEHOLD_COUNT_TARGET_SHARE = 0.5 +HOUSEHOLD_COUNT_LOSS_WEIGHT = 100_000.0 CBO_INCOME_BY_SOURCE_TARGETS = [ ("irs_employment_income", "employment_income"), @@ -1201,7 +1205,31 @@ def _add_transfer_balance_targets(loss_matrix, targets_list, sim, time_period): return targets_list, loss_matrix -def _add_household_count_target(loss_matrix, targets_list, sim): +def _load_household_is_puf_clone(dataset, time_period): + file_path = getattr(dataset, "file_path", None) + if file_path is None: + return None + file_path = Path(file_path) + if not file_path.exists(): + return None + + import h5py + + with h5py.File(file_path, "r") as h5_file: + if "household_is_puf_clone" not in h5_file: + return None + obj = h5_file["household_is_puf_clone"] + if isinstance(obj, h5py.Dataset): + return np.asarray(obj[...], dtype=bool) + period_key = str(time_period) + if period_key in obj: + return np.asarray(obj[period_key][...], dtype=bool) + if str(int(time_period)) in obj: + return np.asarray(obj[str(int(time_period))][...], dtype=bool) + return None + + +def _add_household_count_target(loss_matrix, targets_list, sim, dataset, time_period): """Constrain total household weight to the source survey total.""" household_weights = sim.calculate("household_weight").values @@ -1223,6 +1251,27 @@ def _add_household_count_target(loss_matrix, targets_list, sim): dtype=np.float32, ) targets_list.append(target) + + household_is_puf_clone = _load_household_is_puf_clone(dataset, time_period) + if household_is_puf_clone is not None: + if len(household_is_puf_clone) != len(household_weights): + raise ValueError( + "PUF clone flag length mismatch: " + f"household_is_puf_clone has {len(household_is_puf_clone)} " + f"values but household_weight has {len(household_weights)} values" + ) + + puf_clone_target = target * PUF_CLONE_HOUSEHOLD_COUNT_TARGET_SHARE + cps_target = target - puf_clone_target + loss_matrix[CPS_HOUSEHOLD_COUNT_TARGET] = (~household_is_puf_clone).astype( + np.float32 + ) + targets_list.append(cps_target) + loss_matrix[PUF_CLONE_HOUSEHOLD_COUNT_TARGET] = (household_is_puf_clone).astype( + np.float32 + ) + targets_list.append(puf_clone_target) + return targets_list, loss_matrix @@ -1254,7 +1303,15 @@ def get_target_loss_weights(target_names): ) | np.char.startswith(target_names, "state/bea/wages_and_salaries/") weights[is_bea_direct_sum_target] = BEA_NIPA_DIRECT_SUM_LOSS_WEIGHT weights[is_bea_wage_target] = BEA_WAGES_AND_SALARIES_LOSS_WEIGHT - weights[target_names == HOUSEHOLD_COUNT_TARGET] = HOUSEHOLD_COUNT_LOSS_WEIGHT + household_count_targets = np.isin( + target_names, + [ + HOUSEHOLD_COUNT_TARGET, + CPS_HOUSEHOLD_COUNT_TARGET, + PUF_CLONE_HOUSEHOLD_COUNT_TARGET, + ], + ) + weights[household_count_targets] = HOUSEHOLD_COUNT_LOSS_WEIGHT return weights @@ -1392,6 +1449,8 @@ def build_loss_matrix(dataset: type, time_period): loss_matrix, targets_array, sim, + dataset, + time_period, ) # Census single-year age population projections diff --git a/tests/unit/calibration/test_calibration_puf_impute.py b/tests/unit/calibration/test_calibration_puf_impute.py index 6f2f60561..de6fa4c90 100644 --- a/tests/unit/calibration/test_calibration_puf_impute.py +++ b/tests/unit/calibration/test_calibration_puf_impute.py @@ -12,9 +12,11 @@ DEMOGRAPHIC_PREDICTORS, IMPUTED_VARIABLES, OVERRIDDEN_IMPUTED_VARIABLES, + _forbes_person_training_mask, _impute_retirement_contributions, _impute_weeks_unemployed, _log_stratified_subsample, + _run_qrf_imputation, _stratified_subsample_index, puf_clone_dataset, ) @@ -384,6 +386,383 @@ def test_indices_sorted(self): assert np.all(idx[1:] >= idx[:-1]) +class TestForbesTrainingExclusion: + def test_maps_forbes_tax_units_to_person_records(self): + data = { + "tax_unit_id": {2024: np.array([10, 20, 30])}, + "person_tax_unit_id": {2024: np.array([10, 20, 20, 30])}, + "forbes_unit_id": {2024: np.array([-1, 0, -1])}, + "forbes_replicate_id": {2024: np.array([-1, 3, -1])}, + "forbes_rank": {2024: np.array([0, 42, 0])}, + } + + result = _forbes_person_training_mask(data, 2024, n_persons=4) + + np.testing.assert_array_equal( + result, + np.array([False, True, True, False]), + ) + + def test_missing_forbes_metadata_keeps_all_records(self): + data = { + "tax_unit_id": {2024: np.array([10, 20])}, + "person_tax_unit_id": {2024: np.array([10, 20])}, + } + + result = _forbes_person_training_mask(data, 2024, n_persons=2) + + np.testing.assert_array_equal(result, np.array([False, False])) + + def test_qrf_training_filters_forbes_person_records(self, monkeypatch): + class FakeDataset: + def load_dataset(self): + return { + "tax_unit_id": {2024: np.array([10, 20, 30])}, + "person_tax_unit_id": {2024: np.array([10, 20, 30, 30])}, + "forbes_unit_id": {2024: np.array([-1, 0, -1])}, + "forbes_rank": {2024: np.array([0, 1, 0])}, + "forbes_replicate_id": {2024: np.array([-1, 0, -1])}, + } + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = FakeDataset() + + def calculate(self, variable, map_to=None): + assert map_to == "person" + assert variable == "adjusted_gross_income" + return pd.Series([10.0, 30_000_000.0, 20.0, 30.0]) + + def calculate_dataframe(self, columns): + frame = pd.DataFrame({"age": [40.0, 99.0, 50.0, 55.0]}) + for column in columns: + if column not in frame: + frame[column] = 0.0 + return frame[list(columns)] + + train_frames = [] + + def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + train_frames.append(X_train.copy()) + return {variable: np.zeros(len(X_test)) for variable in output_vars} + + monkeypatch.setattr("policyengine_us.Microsimulation", FakeMicrosimulation) + monkeypatch.setattr( + puf_impute_module, + "_sequential_qrf", + fake_sequential_qrf, + ) + + data = { + predictor: {2024: np.array([0.0, 1.0])} + for predictor in DEMOGRAPHIC_PREDICTORS + } + _run_qrf_imputation( + data=data, + time_period=2024, + puf_dataset=object(), + dataset_path=None, + ) + + assert len(train_frames) == 2 + assert all(len(frame) == 3 for frame in train_frames) + assert all(99.0 not in set(frame["age"]) for frame in train_frames) + + def test_qrf_training_filters_synthetic_top_tail_without_metadata( + self, monkeypatch + ): + class FakeDataset: + def load_dataset(self): + return { + "tax_unit_id": {2024: np.array([10, 20, 30])}, + "person_tax_unit_id": {2024: np.array([10, 20, 30, 30])}, + } + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = FakeDataset() + + def calculate(self, variable, map_to=None): + assert map_to == "person" + if variable == "person_weight": + return pd.Series([100.0, 0.13, 100.0, 100.0]) + assert variable == "adjusted_gross_income" + return pd.Series([10.0, 1_000_000_000.0, 20.0, 30.0]) + + def calculate_dataframe(self, columns): + frame = pd.DataFrame({"age": [40.0, 99.0, 50.0, 55.0]}) + for column in columns: + if column not in frame: + frame[column] = 0.0 + return frame[list(columns)] + + train_frames = [] + + def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + train_frames.append(X_train.copy()) + return {variable: np.zeros(len(X_test)) for variable in output_vars} + + monkeypatch.setattr("policyengine_us.Microsimulation", FakeMicrosimulation) + monkeypatch.setattr( + puf_impute_module, + "_sequential_qrf", + fake_sequential_qrf, + ) + + data = { + predictor: {2024: np.array([0.0, 1.0])} + for predictor in DEMOGRAPHIC_PREDICTORS + } + _run_qrf_imputation( + data=data, + time_period=2024, + puf_dataset=object(), + dataset_path=None, + ) + + assert len(train_frames) == 2 + assert all(len(frame) == 3 for frame in train_frames) + assert all(99.0 not in set(frame["age"]) for frame in train_frames) + + def test_qrf_training_filters_synthetic_top_tail_components_without_metadata( + self, monkeypatch + ): + class FakeDataset: + def load_dataset(self): + return { + "tax_unit_id": {2024: np.array([10, 20, 30])}, + "person_tax_unit_id": {2024: np.array([10, 20, 30, 30])}, + } + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = FakeDataset() + + def calculate(self, variable, map_to=None): + assert map_to == "person" + assert variable == "adjusted_gross_income" + return pd.Series([10.0, 20.0, 30.0, 40.0]) + + def calculate_dataframe(self, columns): + frame = pd.DataFrame( + { + "age": [40.0, 99.0, 50.0, 55.0], + "long_term_capital_gains": [ + 0.0, + 30_000_000.0, + 0.0, + 0.0, + ], + } + ) + for column in columns: + if column not in frame: + frame[column] = 0.0 + return frame[list(columns)] + + train_frames = [] + + def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + train_frames.append(X_train.copy()) + return {variable: np.zeros(len(X_test)) for variable in output_vars} + + monkeypatch.setattr("policyengine_us.Microsimulation", FakeMicrosimulation) + monkeypatch.setattr( + puf_impute_module, + "_sequential_qrf", + fake_sequential_qrf, + ) + + data = { + predictor: {2024: np.array([0.0, 1.0])} + for predictor in DEMOGRAPHIC_PREDICTORS + } + _run_qrf_imputation( + data=data, + time_period=2024, + puf_dataset=object(), + dataset_path=None, + ) + + assert len(train_frames) == 2 + assert all(len(frame) == 3 for frame in train_frames) + assert all(99.0 not in set(frame["age"]) for frame in train_frames) + + def test_qrf_training_filters_normal_weight_top_tail_without_metadata( + self, monkeypatch + ): + class FakeDataset: + def load_dataset(self): + return { + "tax_unit_id": {2024: np.array([10, 20, 30])}, + "person_tax_unit_id": {2024: np.array([10, 20, 30, 30])}, + } + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = FakeDataset() + + def calculate(self, variable, map_to=None): + assert map_to == "person" + assert variable == "adjusted_gross_income" + return pd.Series([10.0, 30_000_000.0, 20.0, 30.0]) + + def calculate_dataframe(self, columns): + frame = pd.DataFrame({"age": [40.0, 99.0, 50.0, 55.0]}) + for column in columns: + if column not in frame: + frame[column] = 0.0 + return frame[list(columns)] + + train_frames = [] + + def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + train_frames.append(X_train.copy()) + return {variable: np.zeros(len(X_test)) for variable in output_vars} + + monkeypatch.setattr("policyengine_us.Microsimulation", FakeMicrosimulation) + monkeypatch.setattr( + puf_impute_module, + "_sequential_qrf", + fake_sequential_qrf, + ) + + data = { + predictor: {2024: np.array([0.0, 1.0])} + for predictor in DEMOGRAPHIC_PREDICTORS + } + _run_qrf_imputation( + data=data, + time_period=2024, + puf_dataset=object(), + dataset_path=None, + ) + + assert len(train_frames) == 2 + assert all(len(frame) == 3 for frame in train_frames) + assert all(99.0 not in set(frame["age"]) for frame in train_frames) + + def test_qrf_training_filters_all_default_metadata_top_tail(self, monkeypatch): + class FakeDataset: + def load_dataset(self): + return { + "tax_unit_id": {2024: np.array([10, 20, 30])}, + "person_tax_unit_id": {2024: np.array([10, 20, 30, 30])}, + "forbes_unit_id": {2024: np.array([-1, -1, -1])}, + "forbes_rank": {2024: np.array([0, 0, 0])}, + "forbes_replicate_id": {2024: np.array([-1, -1, -1])}, + } + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = FakeDataset() + + def calculate(self, variable, map_to=None): + assert map_to == "person" + assert variable == "adjusted_gross_income" + return pd.Series([10.0, 30_000_000.0, 20.0, 30.0]) + + def calculate_dataframe(self, columns): + frame = pd.DataFrame({"age": [40.0, 99.0, 50.0, 55.0]}) + for column in columns: + if column not in frame: + frame[column] = 0.0 + return frame[list(columns)] + + train_frames = [] + + def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + train_frames.append(X_train.copy()) + return {variable: np.zeros(len(X_test)) for variable in output_vars} + + monkeypatch.setattr("policyengine_us.Microsimulation", FakeMicrosimulation) + monkeypatch.setattr( + puf_impute_module, + "_sequential_qrf", + fake_sequential_qrf, + ) + + data = { + predictor: {2024: np.array([0.0, 1.0])} + for predictor in DEMOGRAPHIC_PREDICTORS + } + _run_qrf_imputation( + data=data, + time_period=2024, + puf_dataset=object(), + dataset_path=None, + ) + + assert len(train_frames) == 2 + assert all(len(frame) == 3 for frame in train_frames) + assert all(99.0 not in set(frame["age"]) for frame in train_frames) + + def test_qrf_training_keeps_non_forbes_top_tail_with_metadata(self, monkeypatch): + class FakeDataset: + def load_dataset(self): + return { + "tax_unit_id": {2024: np.array([10, 20, 30])}, + "person_tax_unit_id": {2024: np.array([10, 20, 30, 30])}, + "forbes_unit_id": {2024: np.array([-1, -1, 0])}, + "forbes_rank": {2024: np.array([0, 0, 1])}, + "forbes_replicate_id": {2024: np.array([-1, -1, 0])}, + } + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = FakeDataset() + + def calculate(self, variable, map_to=None): + assert map_to == "person" + assert variable == "adjusted_gross_income" + return pd.Series([10.0, 20.0, 30.0, 40.0]) + + def calculate_dataframe(self, columns): + frame = pd.DataFrame( + { + "age": [40.0, 99.0, 50.0, 55.0], + "long_term_capital_gains": [ + 0.0, + 30_000_000.0, + 0.0, + 0.0, + ], + } + ) + for column in columns: + if column not in frame: + frame[column] = 0.0 + return frame[list(columns)] + + train_frames = [] + + def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + train_frames.append(X_train.copy()) + return {variable: np.zeros(len(X_test)) for variable in output_vars} + + monkeypatch.setattr("policyengine_us.Microsimulation", FakeMicrosimulation) + monkeypatch.setattr( + puf_impute_module, + "_sequential_qrf", + fake_sequential_qrf, + ) + + data = { + predictor: {2024: np.array([0.0, 1.0])} + for predictor in DEMOGRAPHIC_PREDICTORS + } + _run_qrf_imputation( + data=data, + time_period=2024, + puf_dataset=object(), + dataset_path=None, + ) + + assert len(train_frames) == 2 + assert all(len(frame) == 2 for frame in train_frames) + assert all(99.0 in set(frame["age"]) for frame in train_frames) + + def test_retirement_imputation_uses_sstb_income_for_se_eligibility(monkeypatch): class FakeMicrosimulation: def __init__(self, dataset): diff --git a/tests/unit/calibration/test_loss_targets.py b/tests/unit/calibration/test_loss_targets.py index 2cacbbc9a..9ec8032b9 100644 --- a/tests/unit/calibration/test_loss_targets.py +++ b/tests/unit/calibration/test_loss_targets.py @@ -1,6 +1,7 @@ import inspect from types import SimpleNamespace +import h5py import numpy as np import pandas as pd import pytest @@ -14,11 +15,13 @@ BEA_NIPA_DIRECT_SUM_TARGETS, BEA_NIPA_DIRECT_SUM_LOSS_WEIGHT, BEA_WAGES_AND_SALARIES_LOSS_WEIGHT, + CPS_HOUSEHOLD_COUNT_TARGET, BLS_CE_TOTALS, HARD_CODED_TOTALS, HOUSEHOLD_COUNT_LOSS_WEIGHT, HOUSEHOLD_COUNT_TARGET, LOW_AGI_INVESTMENT_INCOME_SOI_VARIABLES, + PUF_CLONE_HOUSEHOLD_COUNT_TARGET, SOI_NEGATIVE_AGI_TARGETED_VARIABLES, TRANSFER_BALANCE_TARGETS, _add_bea_state_wage_targets, @@ -174,6 +177,8 @@ def test_household_count_target_gets_higher_loss_weight(): target_names = np.array( [ HOUSEHOLD_COUNT_TARGET, + CPS_HOUSEHOLD_COUNT_TARGET, + PUF_CLONE_HOUSEHOLD_COUNT_TARGET, "nation/census/population_by_age/0", ] ) @@ -181,6 +186,8 @@ def test_household_count_target_gets_higher_loss_weight(): weights = get_target_loss_weights(target_names) assert weights.tolist() == [ + HOUSEHOLD_COUNT_LOSS_WEIGHT, + HOUSEHOLD_COUNT_LOSS_WEIGHT, HOUSEHOLD_COUNT_LOSS_WEIGHT, 1.0, ] @@ -464,6 +471,8 @@ def test_add_household_count_target_uses_source_weight_total(): loss_matrix, [], _FakeHouseholdWeightSimulation([80.0, 20.0, 0.0, 0.0]), + SimpleNamespace(), + 2024, ) assert targets == [100.0] @@ -473,6 +482,58 @@ def test_add_household_count_target_uses_source_weight_total(): ) +def test_add_household_count_target_adds_clone_split_targets(tmp_path): + file_path = tmp_path / "extended_cps_2024.h5" + with h5py.File(file_path, "w") as h5_file: + group = h5_file.create_group("household_is_puf_clone") + group.create_dataset("2024", data=np.array([0, 0, 1, 1], dtype=np.int8)) + + loss_matrix = pd.DataFrame(index=[101, 102, 103, 104]) + + targets, loss_matrix = _add_household_count_target( + loss_matrix, + [], + _FakeHouseholdWeightSimulation([80.0, 20.0, 0.0, 0.0]), + SimpleNamespace(file_path=file_path), + 2024, + ) + + assert targets == [100.0, 50.0, 50.0] + np.testing.assert_array_equal( + loss_matrix[CPS_HOUSEHOLD_COUNT_TARGET].to_numpy(), + np.array([1.0, 1.0, 0.0, 0.0], dtype=np.float32), + ) + np.testing.assert_array_equal( + loss_matrix[PUF_CLONE_HOUSEHOLD_COUNT_TARGET].to_numpy(), + np.array([0.0, 0.0, 1.0, 1.0], dtype=np.float32), + ) + + +def test_add_household_count_target_accepts_string_dataset_path(tmp_path): + file_path = tmp_path / "extended_cps_2024.h5" + with h5py.File(file_path, "w") as h5_file: + group = h5_file.create_group("household_is_puf_clone") + group.create_dataset("2024", data=np.array([0, 1], dtype=np.int8)) + + targets, loss_matrix = _add_household_count_target( + pd.DataFrame(index=[101, 102]), + [], + _FakeHouseholdWeightSimulation([80.0, 20.0]), + SimpleNamespace(file_path=str(file_path)), + 2024, + ) + + assert targets == [100.0, 50.0, 50.0] + np.testing.assert_array_equal( + loss_matrix[CPS_HOUSEHOLD_COUNT_TARGET].to_numpy(), + np.array([1.0, 0.0], dtype=np.float32), + ) + np.testing.assert_array_equal( + loss_matrix[PUF_CLONE_HOUSEHOLD_COUNT_TARGET].to_numpy(), + np.array([0.0, 1.0], dtype=np.float32), + ) + + def test_build_loss_matrix_adds_household_count_target_before_reweighting(): source = inspect.getsource(build_loss_matrix) diff --git a/tests/unit/datasets/test_enhanced_cps_seeding.py b/tests/unit/datasets/test_enhanced_cps_seeding.py index 11e7e457d..de875ad3e 100644 --- a/tests/unit/datasets/test_enhanced_cps_seeding.py +++ b/tests/unit/datasets/test_enhanced_cps_seeding.py @@ -8,6 +8,7 @@ """ import numpy as np +import pytest from policyengine_us_data.utils.seed import set_seeds @@ -56,3 +57,96 @@ def test_initialize_weight_priors_preserves_source_weight_total(): np.testing.assert_allclose(priors.sum(), 100.0) np.testing.assert_allclose(priors, np.array([40.0, 10.0, 25.0, 25.0])) + + +def test_validate_household_weight_total_accepts_close_total(): + from policyengine_us_data.datasets.cps.enhanced_cps import ( + validate_household_weight_total, + ) + + total = validate_household_weight_total( + np.array([50_000_000.0, 96_000_000.0]), + source_total=145_000_000.0, + year=2024, + ) + + assert total == 146_000_000.0 + + +def test_validate_household_weight_total_rejects_inflated_total(): + from policyengine_us_data.datasets.cps.enhanced_cps import ( + validate_household_weight_total, + ) + + with pytest.raises(ValueError, match="differs from source household count"): + validate_household_weight_total( + np.array([100_000_000.0, 86_900_000.0]), + source_total=145_000_000.0, + year=2024, + ) + + +def test_validate_clone_household_weight_share_accepts_target_share(): + from policyengine_us_data.datasets.cps.enhanced_cps import ( + validate_clone_household_weight_share, + ) + + share = validate_clone_household_weight_share( + np.array([40_000_000.0, 10_000_000.0, 25_000_000.0, 25_000_000.0]), + np.array([False, False, True, True]), + year=2024, + ) + + assert share == pytest.approx(0.5) + + +def test_validate_clone_household_weight_share_rejects_clone_dominance(): + from policyengine_us_data.datasets.cps.enhanced_cps import ( + validate_clone_household_weight_share, + ) + + with pytest.raises(ValueError, match="PUF-clone household weight share"): + validate_clone_household_weight_share( + np.array([10_000_000.0, 10_000_000.0, 40_000_000.0, 40_000_000.0]), + np.array([False, False, True, True]), + year=2024, + ) + + +def test_validate_person_poverty_rate_accepts_reasonable_rate(): + from policyengine_us_data.datasets.cps.enhanced_cps import ( + validate_person_poverty_rate, + ) + + class FakePoverty: + def mean(self): + return 0.12 + + class FakeSimulation: + def calculate(self, variable, period, map_to): + assert variable == "person_in_poverty" + assert period == 2024 + assert map_to == "person" + return FakePoverty() + + assert validate_person_poverty_rate(FakeSimulation(), year=2024) == 0.12 + + +def test_validate_person_poverty_rate_rejects_implausible_rate(): + from policyengine_us_data.datasets.cps.enhanced_cps import ( + validate_person_poverty_rate, + ) + + class FakePoverty: + def mean(self): + return 0.39 + + class FakeSimulation: + def calculate(self, variable, period, map_to): + assert variable == "person_in_poverty" + assert period == 2024 + assert map_to == "person" + return FakePoverty() + + with pytest.raises(ValueError, match="person poverty rate"): + validate_person_poverty_rate(FakeSimulation(), year=2024)