diff --git a/changelog.d/target-filters.added.md b/changelog.d/target-filters.added.md new file mode 100644 index 0000000..81b5359 --- /dev/null +++ b/changelog.d/target-filters.added.md @@ -0,0 +1 @@ +Added common and target-specific row filters for QRF training. diff --git a/microimpute/models/imputer.py b/microimpute/models/imputer.py index ec563a8..dae46fa 100644 --- a/microimpute/models/imputer.py +++ b/microimpute/models/imputer.py @@ -14,7 +14,7 @@ import numpy as np import pandas as pd -from pydantic import SkipValidation, validate_call +from pydantic import validate_call from microimpute.config import RANDOM_STATE, VALIDATE_CONFIG from microimpute.utils.type_handling import ( @@ -104,6 +104,7 @@ def identify_target_types( data: pd.DataFrame, imputed_variables: List[str], not_numeric_categorical: Optional[List[str]] = None, + target_fit_masks: Optional[Dict[str, pd.Series]] = None, ) -> None: """Identify and track variable types for imputation targets. @@ -113,21 +114,27 @@ def identify_target_types( not_numeric_categorical: Optional list of variable names that should be treated as numeric even if they would normally be detected as numeric_categorical. + target_fit_masks: Optional target-specific row masks to use when + inferring target type and constants. """ detector = VariableTypeDetector() not_numeric_categorical = not_numeric_categorical or [] + target_fit_masks = target_fit_masks or {} for var in imputed_variables: if var not in data.columns: continue + target_data = data[var] + if var in target_fit_masks: + target_data = target_data.loc[target_fit_masks[var]] # First check if the variable has a constant value - unique_values = data[var].dropna().unique() + unique_values = target_data.dropna().unique() if len(unique_values) == 1: constant_val = unique_values[0] self.constant_targets[var] = { "value": constant_val, - "dtype": data[var].dtype, + "dtype": target_data.dtype, } self.logger.warning( f"Target variable '{var}' has constant value {constant_val}. " @@ -136,7 +143,7 @@ def identify_target_types( continue var_type, categories = detector.categorize_variable( - data[var], + target_data, var, self.logger, force_numeric=(var in not_numeric_categorical), @@ -145,7 +152,7 @@ def identify_target_types( if var_type == "bool": self.boolean_targets[var] = { "type": "boolean", - "dtype": data[var].dtype, + "dtype": target_data.dtype, } self.logger.info(f"Identified boolean target: {var}") @@ -153,7 +160,7 @@ def identify_target_types( self.categorical_targets[var] = { "type": var_type, "categories": categories, - "dtype": data[var].dtype, + "dtype": target_data.dtype, } self.logger.info( f"Identified categorical target: {var} with {len(categories) if categories else 0} categories" @@ -163,6 +170,30 @@ def identify_target_types( self.numeric_targets.append(var) self.logger.debug(f"Identified numeric target: {var}") + def _coerce_fit_filter( + self, + X_train: pd.DataFrame, + fit_filter: Union[str, np.ndarray, pd.Series, List[bool], Tuple[bool, ...]], + *, + name: str, + ) -> pd.Series: + """Normalize a row-filter input to a boolean Series on ``X_train``.""" + if isinstance(fit_filter, str): + if fit_filter not in X_train.columns: + raise ValueError(f"{name} column '{fit_filter}' not found in X_train") + mask = X_train[fit_filter] + elif isinstance(fit_filter, pd.Series): + mask = fit_filter.reindex(X_train.index) + else: + mask = pd.Series(fit_filter, index=X_train.index) + + if len(mask) != len(X_train): + raise ValueError(f"{name} must have length {len(X_train)}, got {len(mask)}") + if mask.isna().any(): + raise ValueError(f"{name} contains missing values") + + return mask.astype(bool) + @validate_call(config=VALIDATE_CONFIG) def preprocess_data_types( self, @@ -216,6 +247,12 @@ def fit( weight_col: Optional[Union[str, np.ndarray, pd.Series]] = None, skip_missing: bool = False, not_numeric_categorical: Optional[List[str]] = None, + row_filter: Optional[ + Union[str, np.ndarray, pd.Series, List[bool], Tuple[bool, ...]] + ] = None, + target_filters: Optional[ + Dict[str, Union[str, np.ndarray, pd.Series, List[bool], Tuple[bool, ...]]] + ] = None, **kwargs: Any, ) -> Any: # Returns ImputerResults """Fit the model to the training data. @@ -229,6 +266,13 @@ def fit( not_numeric_categorical: Optional list of variable names that should be treated as numeric even if they would normally be detected as numeric_categorical. + row_filter: Optional common row mask, or the name of a boolean + column in X_train, selecting rows eligible for all targets. + target_filters: Optional mapping from imputed variable name to a + target-specific row mask, or the name of a boolean column in + X_train. Target-specific filters are combined with row_filter. + They are supported by models that fit one model per target, + such as QRF. **kwargs: Additional model-specific parameters. Returns: @@ -240,6 +284,48 @@ def fit( NotImplementedError: If method is not implemented by subclass. """ original_predictors = predictors.copy() + target_filters = target_filters or {} + unknown_target_filters = set(target_filters) - set(imputed_variables) + if unknown_target_filters: + raise ValueError( + "target_filters contains variables not in imputed_variables: " + f"{sorted(unknown_target_filters)}" + ) + + base_mask = pd.Series(True, index=X_train.index) + if row_filter is not None: + base_mask = self._coerce_fit_filter( + X_train, + row_filter, + name="row_filter", + ) + + target_fit_masks = {} + for variable, target_filter in target_filters.items(): + target_fit_masks[variable] = ( + self._coerce_fit_filter( + X_train, + target_filter, + name=f"target_filters[{variable!r}]", + ) + & base_mask + ) + + if target_filters and not getattr(self, "supports_target_filters", False): + raise NotImplementedError( + f"{type(self).__name__} does not support target_filters" + ) + + if not base_mask.all(): + if isinstance(weight_col, np.ndarray): + weight_col = pd.Series(weight_col, index=base_mask.index).loc[base_mask] + elif isinstance(weight_col, pd.Series): + weight_col = weight_col.reindex(base_mask.index).loc[base_mask] + X_train = X_train.loc[base_mask].copy() + target_fit_masks = { + variable: mask.loc[X_train.index] + for variable, mask in target_fit_masks.items() + } try: # Handle missing variables if skip_missing is enabled @@ -288,7 +374,12 @@ def fit( ) # Identify target types BEFORE preprocessing - self.identify_target_types(X_train, imputed_variables, not_numeric_categorical) + self.identify_target_types( + X_train, + imputed_variables, + not_numeric_categorical, + target_fit_masks=target_fit_masks, + ) X_train, predictors, imputed_variables, imputed_vars_dummy_info = ( self.preprocess_data_types( @@ -319,17 +410,23 @@ def fit( ) # Defer actual training to subclass with all parameters + fit_kwargs = { + "categorical_targets": self.categorical_targets, + "boolean_targets": self.boolean_targets, + "numeric_targets": self.numeric_targets, + "constant_targets": self.constant_targets, + "sample_weight": sample_weight, + **kwargs, + } + if target_fit_masks: + fit_kwargs["target_fit_masks"] = target_fit_masks + fitted_model = self._fit( X_train, self.predictors, self.imputed_variables, self.original_predictors, - categorical_targets=self.categorical_targets, - boolean_targets=self.boolean_targets, - numeric_targets=self.numeric_targets, - constant_targets=self.constant_targets, - sample_weight=sample_weight, - **kwargs, + **fit_kwargs, ) return fitted_model diff --git a/microimpute/models/qrf.py b/microimpute/models/qrf.py index d68d65a..c190d5b 100644 --- a/microimpute/models/qrf.py +++ b/microimpute/models/qrf.py @@ -567,6 +567,8 @@ class QRF(Imputer): The underlying QRF implementation is from the quantile_forest package. """ + supports_target_filters = True + def __init__( self, log_level: Optional[str] = "WARNING", @@ -738,6 +740,65 @@ def _fit_model( # Regular QRF fit model.fit(X, y, sample_weight=sample_weight, **model_params) + def _target_fit_data( + self, + X_train: pd.DataFrame, + variable: str, + target_fit_masks: Optional[Dict[str, pd.Series]], + sample_weight: Optional[np.ndarray], + ) -> Tuple[pd.DataFrame, Optional[np.ndarray]]: + """Return training rows and weights for one target variable.""" + if not target_fit_masks or variable not in target_fit_masks: + return X_train, sample_weight + + mask = ( + target_fit_masks[variable].reindex(X_train.index).fillna(False).astype(bool) + ) + if not mask.any(): + raise ValueError(f"No training rows selected for target '{variable}'") + + target_train = X_train.loc[mask] + target_sample_weight = None + if sample_weight is not None: + target_sample_weight = np.asarray(sample_weight, dtype=float)[ + mask.to_numpy() + ] + + selected_rows = len(target_train) + if ( + self.max_train_samples is not None + and len(target_train) > self.max_train_samples + ): + try: + variable_offset = (self.imputed_variables or []).index(variable) + except ValueError: + variable_offset = 0 + seed = None if self.seed is None else self.seed + variable_offset + rng = np.random.default_rng(seed) + sel = rng.choice( + len(target_train), size=self.max_train_samples, replace=False + ) + target_train = target_train.iloc[sel] + if target_sample_weight is not None: + target_sample_weight = target_sample_weight[sel] + self.logger.info( + "Subsampling target '%s' training data from %d to %d rows", + variable, + selected_rows, + self.max_train_samples, + ) + + dropped = len(X_train) - selected_rows + if dropped: + self.logger.info( + "Target filter for '%s' selected %d/%d training rows", + variable, + selected_rows, + len(X_train), + ) + + return target_train, target_sample_weight + def _get_memory_usage_info(self) -> str: """Get formatted memory usage information.""" if PSUTIL_AVAILABLE: @@ -759,6 +820,7 @@ def _fit( constant_targets: Optional[Dict[str, Dict]] = None, tune_hyperparameters: bool = False, sample_weight: Optional[np.ndarray] = None, + target_fit_masks: Optional[Dict[str, pd.Series]] = None, **qrf_kwargs: Any, ) -> QRFResults: """Fit the QRF model to the training data. @@ -779,10 +841,17 @@ def _fit( RuntimeError: If model fitting fails. """ try: + target_fit_masks = target_fit_masks or {} + if tune_hyperparameters and target_fit_masks: + raise NotImplementedError( + "QRF target_filters are not supported with tune_hyperparameters" + ) + # Subsample training data if max_train_samples is set if ( self.max_train_samples is not None and len(X_train) > self.max_train_samples + and not target_fit_masks ): self.logger.info( f"Subsampling training data from " @@ -893,12 +962,18 @@ def _fit( # Create appropriate model based on variable type model = self._create_model_for_variable(variable) + target_train, target_sample_weight = self._target_fit_data( + X_train, + variable, + target_fit_masks, + sample_weight, + ) self._fit_model( model, - X_train[encoded_predictors], - X_train[variable], + target_train[encoded_predictors], + target_train[variable], variable, - sample_weight=sample_weight, + sample_weight=target_sample_weight, **qrf_kwargs, ) @@ -997,6 +1072,7 @@ def _fit( qrf_kwargs, constant_targets, sample_weight=sample_weight, + target_fit_masks=target_fit_masks, ) # Memory cleanup after each batch @@ -1051,12 +1127,18 @@ def _fit( model = self._create_model_for_variable(variable) try: + target_train, target_sample_weight = self._target_fit_data( + X_train, + variable, + target_fit_masks, + sample_weight, + ) self._fit_model( model, - X_train[encoded_predictors], - X_train[variable], + target_train[encoded_predictors], + target_train[variable], variable, - sample_weight=sample_weight, + sample_weight=target_sample_weight, **qrf_kwargs, ) @@ -1135,6 +1217,7 @@ def _fit_variable_batch( qrf_kwargs: Dict[str, Any], constant_targets: Optional[Dict[str, Dict]] = None, sample_weight: Optional[np.ndarray] = None, + target_fit_masks: Optional[Dict[str, pd.Series]] = None, ) -> None: """Fit models for a batch of variables. @@ -1165,12 +1248,16 @@ def _fit_variable_batch( current_predictors = _get_sequential_predictors( predictors, imputed_variables, i ) + dummy_processor = getattr(self, "dummy_processor", None) + encoded_predictors = self._get_encoded_predictors( + current_predictors, dummy_processor + ) # Log detailed pre-imputation information self.logger.info( f"[{i + 1}/{len(imputed_variables)}] Starting imputation for '{variable}'" ) - self.logger.info(f" Features: {len(current_predictors)} predictors") + self.logger.info(f" Features: {len(encoded_predictors)} predictors") self.logger.info(f" Memory usage: {self._get_memory_usage_info()}") # Create and fit model @@ -1178,12 +1265,18 @@ def _fit_variable_batch( model = self._create_model_for_variable(variable) try: + target_train, target_sample_weight = self._target_fit_data( + X_train, + variable, + target_fit_masks, + sample_weight, + ) self._fit_model( model, - X_train[current_predictors], - X_train[variable], + target_train[encoded_predictors], + target_train[variable], variable, - sample_weight=sample_weight, + sample_weight=target_sample_weight, **qrf_kwargs, ) diff --git a/pyproject.toml b/pyproject.toml index 881873b..6d8fbd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [project] name = "microimpute" -version = "2.0.4" +version = "2.0.5" description = "Benchmarking imputation methods for microdata" readme = "README.md" authors = [ diff --git a/tests/test_models/test_qrf.py b/tests/test_models/test_qrf.py index 799e1c1..4da9388 100644 --- a/tests/test_models/test_qrf.py +++ b/tests/test_models/test_qrf.py @@ -205,6 +205,94 @@ def test_qrf_repeated_predict_calls_produce_different_draws( ) +def test_qrf_target_filters_fit_each_target_on_eligible_rows() -> None: + """Target-specific masks should let each target ignore its own bad rows.""" + train = pd.DataFrame( + { + "x": [0.0, 1.0, 2.0, 3.0], + "y1": [10.0, 20.0, np.nan, np.nan], + "y2": [np.nan, np.nan, 30.0, 40.0], + "y1_observed": [True, True, False, False], + "y2_observed": [False, False, True, True], + } + ) + + fitted = QRF().fit( + train, + predictors=["x"], + imputed_variables=["y1", "y2"], + target_filters={ + "y1": "y1_observed", + "y2": "y2_observed", + }, + not_numeric_categorical=["x"], + n_estimators=10, + ) + + predictions = fitted.predict(pd.DataFrame({"x": [1.5, 2.5]}), quantiles=[0.5]) + + assert set(predictions[0.5].columns) == {"y1", "y2"} + assert not predictions[0.5].isna().any().any() + + +def test_qrf_row_filter_applies_common_training_mask() -> None: + train = pd.DataFrame( + { + "x": [0.0, 1.0, 2.0], + "y": [10.0, 20.0, np.nan], + "training_row": [True, True, False], + } + ) + + fitted = QRF().fit( + train, + predictors=["x"], + imputed_variables=["y"], + row_filter="training_row", + not_numeric_categorical=["x"], + n_estimators=10, + ) + + predictions = fitted.predict(pd.DataFrame({"x": [1.5]}), quantiles=[0.5]) + + assert not predictions[0.5].isna().any().any() + + +def test_qrf_target_filters_reject_unknown_targets(simple_data: pd.DataFrame) -> None: + with pytest.raises(ValueError, match="target_filters contains variables"): + QRF().fit( + simple_data, + predictors=["x1", "x2"], + imputed_variables=["y"], + target_filters={"missing": [True] * len(simple_data)}, + ) + + +def test_qrf_target_filters_apply_before_max_train_samples() -> None: + train = pd.DataFrame( + { + "x": np.arange(50, dtype=float), + "y": np.nan, + "y_observed": False, + } + ) + train.loc[[45, 46, 47], "y"] = [100.0, 110.0, 120.0] + train.loc[[45, 46, 47], "y_observed"] = True + + fitted = QRF(max_train_samples=5).fit( + train, + predictors=["x"], + imputed_variables=["y"], + target_filters={"y": "y_observed"}, + not_numeric_categorical=["x"], + n_estimators=10, + ) + + predictions = fitted.predict(pd.DataFrame({"x": [46.0]}), quantiles=[0.5]) + + assert not predictions[0.5]["y"].isna().any() + + def test_qrf_stochastic_median_is_unbiased() -> None: """Regression test for the quantile-grid bias bug (#2): the mean of many stochastic median predictions must approximate the true median (q=0.5) diff --git a/uv.lock b/uv.lock index f8e01ff..bcf6f0a 100644 --- a/uv.lock +++ b/uv.lock @@ -1048,7 +1048,7 @@ wheels = [ [[package]] name = "microimpute" -version = "2.0.4" +version = "2.0.5" source = { editable = "." } dependencies = [ { name = "joblib" },