From 8e15f157f832300edc854ba240247e2694c4f796 Mon Sep 17 00:00:00 2001 From: juaristi22 Date: Thu, 2 Oct 2025 19:41:31 +0800 Subject: [PATCH] ensure correct hyperparameter tuning for QRF models --- changelog_entry.yaml | 5 + microimpute/comparisons/autoimpute.py | 2 +- microimpute/comparisons/metrics.py | 2 +- microimpute/models/imputer.py | 143 +------ microimpute/models/matching.py | 174 ++++---- microimpute/models/qrf.py | 550 ++++++++++++++++++++++---- microimpute/utils/__init__.py | 2 +- microimpute/utils/type_detector.py | 83 ---- microimpute/utils/type_handling.py | 356 +++++++++++++++++ tests/test_models/test_qrf.py | 377 +++++++++++++++--- 10 files changed, 1261 insertions(+), 433 deletions(-) delete mode 100644 microimpute/utils/type_detector.py create mode 100644 microimpute/utils/type_handling.py diff --git a/changelog_entry.yaml b/changelog_entry.yaml index e69de29..b1e5bec 100644 --- a/changelog_entry.yaml +++ b/changelog_entry.yaml @@ -0,0 +1,5 @@ +- bump: minor + changes: + fixed: + - QRF hyperparameter tuning now correctly tunes QRF and RFC separately. + - QRF imputation now correctly handles categorical variables that become predictors after imputation. 
diff --git a/microimpute/comparisons/autoimpute.py b/microimpute/comparisons/autoimpute.py index 2fe1ece..9c1f073 100644 --- a/microimpute/comparisons/autoimpute.py +++ b/microimpute/comparisons/autoimpute.py @@ -28,7 +28,7 @@ ) from microimpute.models import OLS, QRF, Imputer, QuantReg from microimpute.utils.data import unnormalize_predictions -from microimpute.utils.type_detector import VariableTypeDetector +from microimpute.utils.type_handling import VariableTypeDetector try: from microimpute.models import Matching diff --git a/microimpute/comparisons/metrics.py b/microimpute/comparisons/metrics.py index cb8e3f0..4431fcd 100644 --- a/microimpute/comparisons/metrics.py +++ b/microimpute/comparisons/metrics.py @@ -19,7 +19,7 @@ validate_quantiles, ) from microimpute.config import QUANTILES, VALIDATE_CONFIG -from microimpute.utils.type_detector import VariableTypeDetector +from microimpute.utils.type_handling import VariableTypeDetector log = logging.getLogger(__name__) diff --git a/microimpute/models/imputer.py b/microimpute/models/imputer.py index 776693d..e59e4a2 100644 --- a/microimpute/models/imputer.py +++ b/microimpute/models/imputer.py @@ -17,7 +17,10 @@ from pydantic import SkipValidation, validate_call from microimpute.config import RANDOM_STATE, VALIDATE_CONFIG -from microimpute.utils.type_detector import VariableTypeDetector +from microimpute.utils.type_handling import ( + DummyVariableProcessor, + VariableTypeDetector, +) class _ConstantValueModel: @@ -34,144 +37,6 @@ def predict(self, X: pd.DataFrame, **kwargs) -> pd.Series: ) -class DummyVariableProcessor: - """Handles conversion of categorical predictors to dummy variables.""" - - def __init__(self, logger: logging.Logger): - self.logger = logger - self.dummy_mapping = {} # Maps original column to dummy columns - - def preprocess_predictors( - self, - data: pd.DataFrame, - predictors: List[str], - imputed_variables: List[str], - ) -> Tuple[pd.DataFrame, List[str]]: - """ - Process only predictor 
variables, converting categoricals to dummies. - Imputation targets remain in original form. - - Returns: - Tuple of (processed_data, updated_predictors) - """ - # Start with a copy containing all needed columns - all_columns = list(set(predictors + imputed_variables)) - data = data[all_columns].copy() - detector = VariableTypeDetector() - - # Identify categorical predictors only (not targets) - categorical_predictors = [] - for col in predictors: # Only check predictors - if col not in data.columns: - continue - var_type, categories = detector.categorize_variable( - data[col], col, self.logger - ) - if var_type in ["categorical", "numeric_categorical"]: - categorical_predictors.append(col) - self.logger.info( - f"Will create dummy variables for predictor '{col}' ({var_type})" - ) - - # Process categorical predictors - updated_predictors = predictors.copy() - - if categorical_predictors: - # Create dummy variables for categorical predictors only - dummy_df = pd.get_dummies( - data[categorical_predictors], - columns=categorical_predictors, - dtype="float64", - drop_first=True, # Standard practice for predictors - ) - - # Track mapping for each original column - for orig_col in categorical_predictors: - dummy_cols = [ - col - for col in dummy_df.columns - if col.startswith(f"{orig_col}_") - ] - self.dummy_mapping[orig_col] = dummy_cols - - # Update predictor list - updated_predictors.remove(orig_col) - updated_predictors.extend(dummy_cols) - - self.logger.debug( - f"Created {len(dummy_cols)} dummy variables for '{orig_col}'" - ) - - # Drop original categorical columns and add dummies - data = data.drop(columns=categorical_predictors) - data = pd.concat([data, dummy_df], axis=1) - - # Convert boolean predictors to float (but keep as single column) - for col in predictors: - if col in data.columns: - var_type, _ = detector.categorize_variable( - data[col], col, self.logger - ) - if var_type == "bool": - data[col] = data[col].astype("float64") - self.logger.debug( - 
f"Converted boolean predictor '{col}' to float64" - ) - - return data, updated_predictors - - def apply_dummy_encoding_to_test( - self, - data: pd.DataFrame, - predictors: List[str], - ) -> Tuple[pd.DataFrame, List[str]]: - """Apply same dummy encoding to test data based on training mapping.""" - detector = VariableTypeDetector() - data = data.copy() - updated_predictors = predictors.copy() - - # Apply dummy encoding based on stored mapping - for orig_col, dummy_cols in self.dummy_mapping.items(): - if orig_col in predictors and orig_col in data.columns: - # Create dummies for this column - dummy_df = pd.get_dummies( - data[[orig_col]], - columns=[orig_col], - dtype="float64", - drop_first=False, # Don't drop first, we'll handle missing manually - ) - - # Ensure we have the exact dummy columns from training - for dummy_col in dummy_cols: - if dummy_col not in dummy_df.columns: - dummy_df[dummy_col] = 0.0 # Missing category gets 0 - - # Keep only the dummy columns from training - dummy_df = dummy_df[dummy_cols] - - # Update data - data = data.drop(columns=[orig_col]) - data = pd.concat([data, dummy_df], axis=1) - - # Update predictor list - updated_predictors.remove(orig_col) - updated_predictors.extend(dummy_cols) - - # Convert boolean predictors to float - for col in predictors: - if col in data.columns: - var_type, _ = detector.categorize_variable( - data[col], col, self.logger - ) - if var_type == "bool": - data[col] = data[col].astype("float64") - - return data, updated_predictors - - # Note: Old reverse encoding methods removed as we now handle categorical - # targets directly through classification models - - class Imputer(ABC): """ Abstract base class for fitting imputation models. 
diff --git a/microimpute/models/matching.py b/microimpute/models/matching.py index 44a73e6..d249354 100644 --- a/microimpute/models/matching.py +++ b/microimpute/models/matching.py @@ -554,10 +554,12 @@ def _tune_hyperparameters( predictors: List[str], imputed_variables: List[str], ) -> Dict[str, Any]: - """Tune hyperparameters for the Matching model using Optuna. + """Tune hyperparameters for the Matching model using Optuna with CV. + + Uses cross-validation and quantile loss for robust hyperparameter selection. Args: - X_train: DataFrame containing the training data. + data: DataFrame containing the training data. predictors: List of column names to use as predictors. imputed_variables: List of column names to impute. @@ -565,13 +567,21 @@ def _tune_hyperparameters( Dictionary of tuned hyperparameters. """ import optuna - from sklearn.model_selection import train_test_split + from sklearn.model_selection import KFold + + from microimpute.comparisons.metrics import compute_loss optuna.logging.set_verbosity(optuna.logging.WARNING) - # Create a validation split (80% train, 20% validation) - X_train, X_test = train_test_split( - data, test_size=0.2, random_state=self.seed + # Use same CV strategy as QRF: 3-fold CV with 10 trials + n_cv_folds = 3 + n_trials = 10 + + # Set up CV folds + kf = KFold(n_splits=n_cv_folds, shuffle=True, random_state=self.seed) + + self.logger.info( + f"Tuning Matching hyperparameters with {n_cv_folds}-fold CV and {n_trials} trials" ) def objective(trial: optuna.Trial) -> float: @@ -595,81 +605,103 @@ def objective(trial: optuna.Trial) -> float: "k": trial.suggest_int("k", 1, 10), } - # Track errors for all variables - var_errors = [] + # Track errors across CV folds + fold_errors = [] - for var in imputed_variables: - y_test = X_test[var] - X_test_var = X_test.copy().drop(var, axis=1) + # Perform CV + for fold_idx, (train_idx, val_idx) in enumerate(kf.split(data)): + X_train_fold = data.iloc[train_idx] + X_val_fold = data.iloc[val_idx] - # 
Determine if chunking is needed for hyperparameter tuning - chunk_size = 1000 # Smaller chunks for tuning - total_size = len(X_train) * len(X_test_var) - use_chunking = ( - len(X_test_var) > chunk_size - or total_size > 25_000_000 # Lower threshold for tuning - ) + # Track errors for all variables in this fold + var_errors = [] - if use_chunking: - # Perform chunked matching for hyperparameter tuning - y_pred_chunks = [] - y_test_chunks = [] + for var in imputed_variables: + y_val = X_val_fold[var] + X_val_var = X_val_fold.copy().drop(var, axis=1) - for i in range(0, len(X_test_var), chunk_size): - chunk_end = min(i + chunk_size, len(X_test_var)) - chunk_data = X_test_var.iloc[i:chunk_end] - chunk_y_test = y_test.iloc[i:chunk_end] + # Determine if chunking is needed for hyperparameter tuning + chunk_size = 1000 # Smaller chunks for tuning + total_size = len(X_train_fold) * len(X_val_var) + use_chunking = ( + len(X_val_var) > chunk_size + or total_size + > 25_000_000 # Lower threshold for tuning + ) + if use_chunking: + # Perform chunked matching for hyperparameter tuning + y_pred_chunks = [] + y_val_chunks = [] + + for i in range(0, len(X_val_var), chunk_size): + chunk_end = min(i + chunk_size, len(X_val_var)) + chunk_data = X_val_var.iloc[i:chunk_end] + chunk_y_val = y_val.iloc[i:chunk_end] + + try: + fused0, fused1 = self.matching_hotdeck( + receiver=chunk_data, + donor=X_train_fold, + matching_variables=predictors, + z_variables=[var], + **params, + ) + y_pred_chunks.append(fused0[var].values) + y_val_chunks.append(chunk_y_val.values) + except Exception: + # If chunk fails, use mean of training data as prediction + mean_val = X_train_fold[var].mean() + y_pred_chunks.append( + np.full(len(chunk_data), mean_val) + ) + y_val_chunks.append(chunk_y_val.values) + + # Combine chunk results + y_pred = np.concatenate(y_pred_chunks) + y_val_combined = np.concatenate(y_val_chunks) + else: + # Perform single matching try: fused0, fused1 = self.matching_hotdeck( - 
receiver=chunk_data, - donor=X_train, + receiver=X_val_var, + donor=X_train_fold, matching_variables=predictors, z_variables=[var], **params, ) - y_pred_chunks.append(fused0[var].values) - y_test_chunks.append(chunk_y_test.values) + y_pred = fused0[var].values + y_val_combined = y_val.values except Exception: - # If chunk fails, use mean of training data as prediction - mean_val = X_train[var].mean() - y_pred_chunks.append( - np.full(len(chunk_data), mean_val) - ) - y_test_chunks.append(chunk_y_test.values) + # If matching fails, use mean of training data as prediction + mean_val = X_train_fold[var].mean() + y_pred = np.full(len(X_val_var), mean_val) + y_val_combined = y_val.values + + # Use quantile loss with median (q=0.5) for hyperparameter tuning + _, quantile_loss_value = compute_loss( + y_val_combined.flatten(), + y_pred.flatten(), + "quantile_loss", + q=0.5, + ) - # Combine chunk results - y_pred = np.concatenate(y_pred_chunks) - y_test_combined = np.concatenate(y_test_chunks) - else: - # Perform single matching - try: - fused0, fused1 = self.matching_hotdeck( - receiver=X_test_var, - donor=X_train, - matching_variables=predictors, - z_variables=[var], - **params, - ) - y_pred = fused0[var].values - y_test_combined = y_test.values - except Exception: - # If matching fails, use mean of training data as prediction - mean_val = X_train[var].mean() - y_pred = np.full(len(X_test_var), mean_val) - y_test_combined = y_test.values - - # Calculate error - # Normalize error by variable's standard deviation - std = np.std(y_test_combined.flatten()) - mse = np.mean( - (y_pred.flatten() - y_test_combined.flatten()) ** 2 - ) - normalized_mse = mse / (std**2) if std > 0 else mse + # Normalize by variable's standard deviation + std = np.std(y_val_combined.flatten()) + normalized_loss = ( + quantile_loss_value / std + if std > 0 + else quantile_loss_value + ) - var_errors.append(normalized_mse) + var_errors.append(normalized_loss) - return np.mean(var_errors) + # Average 
across variables for this fold + if var_errors: + fold_errors.append(np.mean(var_errors)) + + # Return mean error across all CV folds + return np.mean(fold_errors) if fold_errors else float("inf") study = optuna.create_study( direction="minimize", @@ -681,12 +713,16 @@ def objective(trial: optuna.Trial) -> float: os.environ["PYTHONWARNINGS"] = "ignore" - study.optimize(objective, n_trials=30) + study.optimize(objective, n_trials=n_trials) best_value = study.best_value - self.logger.info(f"Lowest average normalized MSE: {best_value}") + self.logger.info( + f"Matching - Lowest average normalized quantile loss ({n_cv_folds}-fold CV): {best_value}" + ) best_params = study.best_params - self.logger.info(f"Best hyperparameters found: {best_params}") + self.logger.info( + f"Matching - Best hyperparameters found: {best_params}" + ) return best_params diff --git a/microimpute/models/qrf.py b/microimpute/models/qrf.py index aba512b..0f8a4c7 100644 --- a/microimpute/models/qrf.py +++ b/microimpute/models/qrf.py @@ -316,15 +316,27 @@ def _predict( var_predictors = _get_sequential_predictors( self.predictors, self.imputed_variables, i ) + + # Get properly encoded predictor columns + if self.dummy_processor: + encoded_predictors = self.dummy_processor.get_sequential_predictor_columns( + var_predictors + ) + else: + encoded_predictors = var_predictors + self.logger.debug( f"var_predictors for {variable}: {var_predictors}" ) + self.logger.debug( + f"encoded_predictors for {variable}: {encoded_predictors}" + ) self.logger.debug( f"Available columns in X_test_augmented: {X_test_augmented.columns.tolist()}" ) # Ensure we have all needed columns in X_test_augmented - missing_cols = set(var_predictors) - set( + missing_cols = set(encoded_predictors) - set( X_test_augmented.columns ) if missing_cols: @@ -355,20 +367,21 @@ def _predict( if return_probs and prob_results is not None: # Get probabilities and classes prob_info = model.predict( - X_test_augmented[var_predictors], + 
X_test_augmented[encoded_predictors], return_probs=True, ) prob_results[variable] = prob_info # Get class predictions imputed_values = model.predict( - X_test_augmented[var_predictors], + X_test_augmented[encoded_predictors], return_probs=False, ) else: # Regression for numeric targets imputed_values = model.predict( - X_test_augmented[var_predictors], mean_quantile=q + X_test_augmented[encoded_predictors], + mean_quantile=q, ) imputed_df[variable] = imputed_values @@ -376,21 +389,23 @@ def _predict( # Add the imputed values to X_test_augmented for subsequent variables X_test_augmented[variable] = imputed_values - # If this is a categorical variable, track its dummy columns - # for future sequential imputation steps - if variable in self.categorical_targets: - # Track which dummy columns would be created for this variable - # using drop_first=True convention - unique_values = imputed_values.unique() - if len(unique_values) > 1: - # With drop_first=True, we create dummies for all but the first category - for val in sorted(unique_values)[1:]: - dummy_col = f"{variable}_{val}" - imputed_dummy_cols.add(dummy_col) - # Also create the actual dummy column if it will be used - X_test_augmented[dummy_col] = ( - imputed_values == val - ).astype(float) + # Encode categorical/boolean imputed variable for next iteration + if ( + self.dummy_processor + and variable + in self.dummy_processor.imputed_var_dummy_mapping + ): + X_test_augmented = self.dummy_processor.sequential_imputed_predictor_encoding( + X_test_augmented, variable + ) + # Track the dummy columns that were added + var_info = ( + self.dummy_processor.imputed_var_dummy_mapping[ + variable + ] + ) + if var_info["dummy_cols"]: + imputed_dummy_cols.update(var_info["dummy_cols"]) # Log timing for individual variables when not processing multiple quantiles if not quantiles: @@ -504,20 +519,42 @@ def _fit_model( categorical_targets = getattr(self, "categorical_targets", {}) boolean_targets = getattr(self, 
"boolean_targets", {}) + # Extract appropriate parameters based on model type + # Handle nested structure from hyperparameter tuning if isinstance(model, _RandomForestClassifierModel): + # Use RFC params if they exist in a nested structure + if "rfc" in kwargs: + model_params = kwargs["rfc"] + elif "qrf" in kwargs: + # Mixed case: only QRF params available, use defaults for RFC + model_params = {} + else: + # Flat dict: use all kwargs (backward compatible) + model_params = kwargs + if variable in categorical_targets: model.fit( X, y, var_type=categorical_targets[variable]["type"], categories=categorical_targets[variable].get("categories"), - **kwargs, + **model_params, ) elif variable in boolean_targets: - model.fit(X, y, var_type="boolean", **kwargs) + model.fit(X, y, var_type="boolean", **model_params) else: + # Use QRF params if they exist in a nested structure + if "qrf" in kwargs: + model_params = kwargs["qrf"] + elif "rfc" in kwargs: + # Mixed case: only RFC params available, use defaults for QRF + model_params = {} + else: + # Flat dict: use all kwargs (backward compatible) + model_params = kwargs + # Regular QRF fit - model.fit(X, y, **kwargs) + model.fit(X, y, **model_params) def _get_memory_usage_info(self) -> str: """Get formatted memory usage information.""" @@ -556,6 +593,13 @@ def _fit( RuntimeError: If model fitting fails. 
""" try: + # Store target type information early for hyperparameter tuning + self.categorical_targets = categorical_targets or {} + self.boolean_targets = boolean_targets or {} + self.numeric_targets = numeric_targets or [] + self.constant_targets = constant_targets or {} + self.imputed_variables = imputed_variables + if tune_hyperparameters: try: qrf_kwargs = self._tune_hyperparameters( @@ -633,12 +677,23 @@ def _fit( predictors, imputed_variables, i ) + # Get properly encoded predictor columns + dummy_processor = getattr( + self, "dummy_processor", None + ) + if dummy_processor: + encoded_predictors = dummy_processor.get_sequential_predictor_columns( + current_predictors + ) + else: + encoded_predictors = current_predictors + # Log detailed pre-imputation information self.logger.info( f"[{i+1}/{len(imputed_variables)}] Starting imputation for '{variable}'" ) self.logger.info( - f" Features: {len(current_predictors)} predictors" + f" Features: {len(encoded_predictors)} predictors" ) self.logger.info( f" Memory usage: {self._get_memory_usage_info()}" @@ -648,7 +703,7 @@ def _fit( model = self._create_model_for_variable(variable) self._fit_model( model, - X_train[current_predictors], + X_train[encoded_predictors], X_train[variable], variable, **qrf_kwargs, @@ -678,6 +733,19 @@ def _fit( self.models[variable] = model + # Encode categorical/boolean imputed variable for next iteration + if ( + dummy_processor + and variable + in dummy_processor.imputed_var_dummy_mapping + ): + X_train = dummy_processor.sequential_imputed_predictor_encoding( + X_train, variable + ) + self.logger.debug( + f" Encoded '{variable}' for use in sequential imputation" + ) + except Exception as e: self.logger.error( f" ✗ Failed: {variable} - {str(e)}" @@ -966,33 +1034,39 @@ def _fit_variable_batch( f" Memory cleanup performed. 
Usage: {self._get_memory_usage_info()}" ) - @validate_call(config=VALIDATE_CONFIG) - def _tune_hyperparameters( + def _tune_qrf_hyperparameters( self, data: pd.DataFrame, predictors: List[str], - imputed_variables: List[str], + numeric_vars: List[str], + n_cv_folds: int = 3, + n_trials: int = 10, ) -> Dict[str, Any]: - """Tune hyperparameters for the QRF model using Optuna. + """Tune hyperparameters for QRF model using quantile loss with CV. Args: - X_train: DataFrame containing the training data. + data: Full training data. predictors: List of column names to use as predictors. - imputed_variables: List of column names to impute. + numeric_vars: List of numeric variables to impute. + n_cv_folds: Number of CV folds for robust evaluation (default: 3). + n_trials: Number of Optuna trials (default: 10). Returns: - Dictionary of tuned hyperparameters. + Dictionary of tuned hyperparameters for QRF. """ import optuna - from sklearn.model_selection import train_test_split + from sklearn.model_selection import KFold + + from microimpute.comparisons.metrics import compute_loss # Suppress Optuna's logs during optimization optuna.logging.set_verbosity(optuna.logging.WARNING) - # Create a validation split (80% train, 20% validation) - X_train, X_test = train_test_split( - data, test_size=0.2, random_state=self.seed - ) + # Get all imputed variables for proper sequential imputation + all_imputed_vars = getattr(self, "imputed_variables", numeric_vars) + + # Set up CV folds + kf = KFold(n_splits=n_cv_folds, shuffle=True, random_state=self.seed) def objective(trial: optuna.Trial) -> float: params = { @@ -1009,54 +1083,96 @@ def objective(trial: optuna.Trial) -> float: ), } - # Track errors for all variables - var_errors = [] + # Track errors across CV folds + fold_errors = [] - # Create copies for augmented data - X_train_augmented = X_train.copy() - X_test_augmented = X_test.copy() + # Perform CV + for fold_idx, (train_idx, val_idx) in enumerate(kf.split(data)): + X_train_fold = 
data.iloc[train_idx] + X_val_fold = data.iloc[val_idx] - # For each imputed variable - for i, var in enumerate(imputed_variables): - # Build predictor set: original predictors + previously imputed variables - current_predictors = _get_sequential_predictors( - predictors, imputed_variables, i - ) + # Track errors for numeric variables in this fold + var_errors = [] - # Extract target variable values - y_test = X_test[var] + # Create copies for augmented data + X_train_augmented = X_train_fold.copy() + X_val_augmented = X_val_fold.copy() - # Create and fit QRF model with trial parameters - # Note: X_train_augmented is already preprocessed by base class - model = self._create_model_for_variable(var) - self._fit_model( - model, - X_train_augmented[current_predictors], - X_train[var], - var, - **params, - ) + # For each imputed variable (only evaluate numeric ones) + for i, var in enumerate(all_imputed_vars): + # Build predictor set: original predictors + previously imputed variables + current_predictors = _get_sequential_predictors( + predictors, all_imputed_vars, i + ) - # Predict and calculate error - y_pred = model.predict(X_test_augmented[current_predictors]) + # Get properly encoded predictor columns + dummy_processor = getattr(self, "dummy_processor", None) + if dummy_processor: + encoded_predictors = ( + dummy_processor.get_sequential_predictor_columns( + current_predictors + ) + ) + else: + encoded_predictors = current_predictors + + # Only fit and evaluate numeric variables + if var in numeric_vars: + # Extract target variable values + y_val = X_val_fold[var] + + # Create and fit QRF model with trial parameters + model = _QRFModel(seed=self.seed, logger=self.logger) + model.fit( + X_train_augmented[encoded_predictors], + X_train_fold[var], + **params, + ) - # Add predictions to augmented datasets for next variable - X_train_augmented[var] = model.predict( - X_train_augmented[current_predictors] - ) - X_test_augmented[var] = y_pred + # Predict + y_pred = 
model.predict( + X_val_augmented[encoded_predictors] + ) - # Normalize error by variable's standard deviation - std = np.std(y_test.values.flatten()) - mse = np.mean( - (y_pred.values.flatten() - y_test.values.flatten()) ** 2 - ) - normalized_mse = mse / (std**2) if std > 0 else mse + # Add predictions to augmented datasets for next variable + X_train_augmented[var] = model.predict( + X_train_augmented[encoded_predictors] + ) + X_val_augmented[var] = y_pred + + # Use quantile loss with median (q=0.5) for hyperparameter tuning + _, quantile_loss_value = compute_loss( + y_val.values.flatten(), + y_pred.values.flatten(), + "quantile_loss", + q=0.5, + ) + + # Normalize by variable's standard deviation + std = np.std(y_val.values.flatten()) + normalized_loss = ( + quantile_loss_value / std + if std > 0 + else quantile_loss_value + ) + + var_errors.append(normalized_loss) + else: + # Categorical variable - encode it for use as predictor in next iterations + if dummy_processor and var in X_train_fold.columns: + X_train_augmented = dummy_processor.sequential_imputed_predictor_encoding( + X_train_augmented, var + ) + X_val_augmented = dummy_processor.sequential_imputed_predictor_encoding( + X_val_augmented, var + ) - var_errors.append(normalized_mse) + # Average across variables for this fold + if var_errors: + fold_errors.append(np.mean(var_errors)) - # Return mean error across all variables - return np.mean(var_errors) + # Return mean error across all CV folds + return np.mean(fold_errors) if fold_errors else float("inf") # Create and run the study study = optuna.create_study( @@ -1069,12 +1185,290 @@ def objective(trial: optuna.Trial) -> float: os.environ["PYTHONWARNINGS"] = "ignore" - study.optimize(objective, n_trials=30) + study.optimize(objective, n_trials=n_trials) best_value = study.best_value - self.logger.info(f"Lowest average normalized MSE: {best_value}") + self.logger.info( + f"QRF - Lowest average normalized quantile loss ({n_cv_folds}-fold CV): {best_value}" 
+ ) best_params = study.best_params - self.logger.info(f"Best hyperparameters found: {best_params}") + self.logger.info(f"QRF - Best hyperparameters found: {best_params}") return best_params + + def _tune_rfc_hyperparameters( + self, + data: pd.DataFrame, + predictors: List[str], + categorical_vars: List[str], + n_cv_folds: int = 3, + n_trials: int = 10, + ) -> Dict[str, Any]: + """Tune hyperparameters for RFC model using log loss with CV. + + Args: + data: Full training data. + predictors: List of column names to use as predictors. + categorical_vars: List of categorical/boolean variables to impute. + n_cv_folds: Number of CV folds for robust evaluation (default: 3). + n_trials: Number of Optuna trials (default: 10). + + Returns: + Dictionary of tuned hyperparameters for RFC. + """ + import optuna + from sklearn.model_selection import KFold + + from microimpute.comparisons.metrics import ( + compute_loss, + order_probabilities_alphabetically, + ) + + # Suppress Optuna's logs during optimization + optuna.logging.set_verbosity(optuna.logging.WARNING) + + # Get all imputed variables for proper sequential imputation + all_imputed_vars = getattr(self, "imputed_variables", categorical_vars) + categorical_targets = getattr(self, "categorical_targets", {}) + boolean_targets = getattr(self, "boolean_targets", {}) + + # Set up CV folds + kf = KFold(n_splits=n_cv_folds, shuffle=True, random_state=self.seed) + + def objective(trial: optuna.Trial) -> float: + params = { + "n_estimators": trial.suggest_int("n_estimators", 50, 300), + "min_samples_split": trial.suggest_int( + "min_samples_split", 2, 20 + ), + "min_samples_leaf": trial.suggest_int( + "min_samples_leaf", 1, 10 + ), + "max_features": trial.suggest_categorical( + "max_features", ["sqrt", "log2", 0.5, 0.8, 1.0] + ), + "bootstrap": trial.suggest_categorical( + "bootstrap", [True, False] + ), + } + + # Track errors across CV folds + fold_errors = [] + + # Perform CV + for fold_idx, (train_idx, val_idx) in 
enumerate(kf.split(data)): + X_train_fold = data.iloc[train_idx] + X_val_fold = data.iloc[val_idx] + + # Track errors for categorical variables in this fold + var_errors = [] + + # Create copies for augmented data + X_train_augmented = X_train_fold.copy() + X_val_augmented = X_val_fold.copy() + + # For each imputed variable (only evaluate categorical ones) + for i, var in enumerate(all_imputed_vars): + # Build predictor set: original predictors + previously imputed variables + current_predictors = _get_sequential_predictors( + predictors, all_imputed_vars, i + ) + + # Get properly encoded predictor columns + dummy_processor = getattr(self, "dummy_processor", None) + if dummy_processor: + encoded_predictors = ( + dummy_processor.get_sequential_predictor_columns( + current_predictors + ) + ) + else: + encoded_predictors = current_predictors + + # Only fit and evaluate categorical/boolean variables + if var in categorical_vars: + # Extract target variable values + y_val = X_val_fold[var] + + # Create and fit RFC model with trial parameters + model = _RandomForestClassifierModel( + seed=self.seed, logger=self.logger + ) + + # Determine variable type and fit appropriately + if var in categorical_targets: + model.fit( + X_train_augmented[encoded_predictors], + X_train_fold[var], + var_type=categorical_targets[var]["type"], + categories=categorical_targets[var].get( + "categories" + ), + **params, + ) + elif var in boolean_targets: + model.fit( + X_train_augmented[encoded_predictors], + X_train_fold[var], + var_type="boolean", + **params, + ) + + # Get probability predictions + prob_info = model.predict( + X_val_augmented[encoded_predictors], + return_probs=True, + ) + + # Get class predictions for augmented data + y_pred = model.predict( + X_val_augmented[encoded_predictors], + return_probs=False, + ) + + # Add predictions to augmented datasets for next variable + X_train_augmented[var] = model.predict( + X_train_augmented[encoded_predictors], + return_probs=False, + ) + 
@validate_call(config=VALIDATE_CONFIG)
def _tune_hyperparameters(
    self,
    data: pd.DataFrame,
    predictors: List[str],
    imputed_variables: List[str],
    n_cv_folds: int = 3,
    n_trials: int = 10,
) -> Dict[str, Any]:
    """Tune hyperparameters for the QRF/RFC models using Optuna with CV.

    Imputed variables are split by type using the instance's
    ``categorical_targets`` and ``boolean_targets`` attributes:

    - Variables found in either mapping are tuned with
      ``_tune_rfc_hyperparameters``.
    - All remaining variables are treated as numeric and tuned with
      ``_tune_qrf_hyperparameters``.

    Args:
        data: DataFrame containing the training data.
        predictors: List of column names to use as predictors.
        imputed_variables: List of column names to impute.
        n_cv_folds: Number of cross-validation folds passed to the tuners.
        n_trials: Number of Optuna trials passed to the tuners.

    Returns:
        Dictionary of tuned hyperparameters. Format depends on variable types:
            - Only numeric: flat dict with QRF params
            - Only categorical: flat dict with RFC params
            - Mixed: nested dict {"qrf": {...}, "rfc": {...}}
    """
    # The attributes may be absent (or None) when no categorical/boolean
    # targets were registered; treat both cases as "no such targets".
    categorical_targets = getattr(self, "categorical_targets", None) or {}
    boolean_targets = getattr(self, "boolean_targets", None) or {}

    categorical_vars = [
        var
        for var in imputed_variables
        if var in categorical_targets or var in boolean_targets
    ]
    categorical_set = set(categorical_vars)
    numeric_vars = [
        var for var in imputed_variables if var not in categorical_set
    ]

    self.logger.info(
        f"Hyperparameter tuning with {n_cv_folds}-fold CV and {n_trials} trials: "
        f"{len(numeric_vars)} numeric variables, "
        f"{len(categorical_vars)} categorical/boolean variables"
    )

    if not categorical_vars:
        # Backward compatible: only numeric variables -> flat QRF dict
        self.logger.info(
            "Tuning QRF hyperparameters (numeric variables only)"
        )
        return self._tune_qrf_hyperparameters(
            data, predictors, numeric_vars, n_cv_folds, n_trials
        )

    if not numeric_vars:
        # Only categorical/boolean variables -> flat RFC dict
        self.logger.info(
            "Tuning RFC hyperparameters (categorical/boolean variables only)"
        )
        return self._tune_rfc_hyperparameters(
            data, predictors, categorical_vars, n_cv_folds, n_trials
        )

    # Mixed: tune both model families separately and nest the results
    self.logger.info(
        "Tuning both QRF and RFC hyperparameters (mixed variable types)"
    )
    qrf_params = self._tune_qrf_hyperparameters(
        data, predictors, numeric_vars, n_cv_folds, n_trials
    )
    rfc_params = self._tune_rfc_hyperparameters(
        data, predictors, categorical_vars, n_cv_folds, n_trials
    )
    return {"qrf": qrf_params, "rfc": rfc_params}
-""" - -import logging -from typing import List, Optional, Tuple - -import numpy as np -import pandas as pd - - -class VariableTypeDetector: - """Utility class for detecting and categorizing variable types.""" - - @staticmethod - def is_boolean_variable(series: pd.Series) -> bool: - """Check if a series represents boolean data.""" - if pd.api.types.is_bool_dtype(series): - return True - - unique_vals = set(series.dropna().unique()) - if pd.api.types.is_integer_dtype(series) and unique_vals <= {0, 1}: - return True - if pd.api.types.is_float_dtype(series) and unique_vals <= {0.0, 1.0}: - return True - - return False - - @staticmethod - def is_categorical_variable(series: pd.Series) -> bool: - """Check if a series represents categorical string/object data.""" - return pd.api.types.is_string_dtype( - series - ) or pd.api.types.is_object_dtype(series) - - @staticmethod - def is_numeric_categorical_variable( - series: pd.Series, max_unique: int = 10 - ) -> bool: - """Check if a numeric series should be treated as categorical.""" - if not pd.api.types.is_numeric_dtype(series): - return False - - if series.nunique() >= max_unique: - return False - - # Check for equal spacing between values - unique_values = np.sort(series.dropna().unique()) - if len(unique_values) < 2: - return True - - differences = np.diff(unique_values) - return np.allclose(differences, differences[0], rtol=1e-9) - - @staticmethod - def categorize_variable( - series: pd.Series, col_name: str, logger: logging.Logger - ) -> Tuple[str, Optional[List]]: - """ - Categorize a variable and return its type and categories if applicable. 
"""Variable type detection and categorical-encoding utilities.

This module provides ``VariableTypeDetector`` for classifying pandas Series as
boolean, categorical, numeric categorical, or purely numeric, and
``DummyVariableProcessor`` for converting categorical predictors (and imputed
variables that later become predictors) into dummy columns.
"""

import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# Variable types that are expanded into dummy (one-hot) columns.
_DUMMY_TYPES = ("categorical", "numeric_categorical")


def _dummies_for(
    data: pd.DataFrame, column: str, *, drop_first: bool
) -> pd.DataFrame:
    """Return float64 dummy columns for a single column of ``data``."""
    return pd.get_dummies(
        data[[column]],
        columns=[column],
        dtype="float64",
        drop_first=drop_first,
    )


def _align_dummies(
    dummy_df: pd.DataFrame, dummy_cols: List[str]
) -> pd.DataFrame:
    """Restrict ``dummy_df`` to ``dummy_cols``, zero-filling missing ones."""
    for dummy_col in dummy_cols:
        if dummy_col not in dummy_df.columns:
            dummy_df[dummy_col] = 0.0  # Category absent from this data
    return dummy_df[dummy_cols]


class VariableTypeDetector:
    """Utility class for detecting and categorizing variable types."""

    @staticmethod
    def is_boolean_variable(series: pd.Series) -> bool:
        """Check if a series represents boolean data.

        A series counts as boolean when it has a bool dtype, or when it is
        an integer/float series whose non-null values are a subset of {0, 1}.
        """
        if pd.api.types.is_bool_dtype(series):
            return True

        unique_vals = set(series.dropna().unique())
        if pd.api.types.is_integer_dtype(series) and unique_vals <= {0, 1}:
            return True
        if pd.api.types.is_float_dtype(series) and unique_vals <= {0.0, 1.0}:
            return True

        return False

    @staticmethod
    def is_categorical_variable(series: pd.Series) -> bool:
        """Check if a series represents categorical string/object data."""
        return pd.api.types.is_string_dtype(
            series
        ) or pd.api.types.is_object_dtype(series)

    @staticmethod
    def is_numeric_categorical_variable(
        series: pd.Series, max_unique: int = 10
    ) -> bool:
        """Check if a numeric series should be treated as categorical.

        True for numeric series with fewer than ``max_unique`` distinct
        values that are equally spaced (or with fewer than two distinct
        non-null values).
        """
        if not pd.api.types.is_numeric_dtype(series):
            return False

        if series.nunique() >= max_unique:
            return False

        # Check for equal spacing between values
        unique_values = np.sort(series.dropna().unique())
        if len(unique_values) < 2:
            return True

        differences = np.diff(unique_values)
        return bool(np.allclose(differences, differences[0], rtol=1e-9))

    @staticmethod
    def categorize_variable(
        series: pd.Series, col_name: str, logger: logging.Logger
    ) -> Tuple[str, Optional[List]]:
        """
        Categorize a variable and return its type and categories if applicable.

        Args:
            series: Column values to classify.
            col_name: Column name, used only in log messages.
            logger: Logger that receives the numeric-categorical notice.

        Returns:
            Tuple of (variable_type, categories)
            variable_type: 'bool', 'categorical', 'numeric_categorical', or 'numeric'
            categories: List of unique values for categorical types, None for numeric
        """
        if VariableTypeDetector.is_boolean_variable(series):
            return "bool", None

        if VariableTypeDetector.is_categorical_variable(series):
            return "categorical", series.unique().tolist()

        if VariableTypeDetector.is_numeric_categorical_variable(series):
            categories = [float(i) for i in series.unique().tolist()]
            logger.info(
                f"Treating numeric variable '{col_name}' as categorical due to low unique count and equal spacing"
            )
            return "numeric_categorical", categories

        return "numeric", None


class DummyVariableProcessor:
    """Handles conversion of categorical predictors to dummy variables."""

    def __init__(self, logger: logging.Logger):
        self.logger = logger
        # Maps original predictor column -> list of its dummy column names
        self.dummy_mapping: Dict[str, List[str]] = {}
        # Pre-computed dummy info for imputed variables, keyed by variable
        # name: {"dummy_cols": ..., "var_type": ..., "categories": ...}
        self.imputed_var_dummy_mapping: Dict[str, Dict[str, Any]] = {}

    def preprocess_predictors(
        self,
        data: pd.DataFrame,
        predictors: List[str],
        imputed_variables: List[str],
    ) -> Tuple[pd.DataFrame, List[str]]:
        """
        Process predictor variables and pre-compute dummy encodings.

        For predictors: converts categoricals to dummies and adds to dataframe.
        For imputed_variables: pre-computes dummy info but keeps original form.

        Args:
            data: Training data containing every predictor and imputed
                variable.
            predictors: Column names used as predictors.
            imputed_variables: Column names that will be imputed.

        Returns:
            Tuple of (processed_data, updated_predictors)

        Raises:
            KeyError: If a predictor or imputed variable is not a column of
                ``data``.
        """
        # Keep only the needed columns, de-duplicated in a stable order so
        # the output column order is deterministic across runs.
        all_columns = list(dict.fromkeys(predictors + imputed_variables))
        data = data[all_columns].copy()
        detector = VariableTypeDetector()

        # Classify each predictor once; reused for the bool conversion below.
        predictor_types: Dict[str, str] = {}
        for col in predictors:
            if col in predictor_types:
                continue
            var_type, _ = detector.categorize_variable(
                data[col], col, self.logger
            )
            predictor_types[col] = var_type
            if var_type in _DUMMY_TYPES:
                self.logger.info(
                    f"Will create dummy variables for predictor '{col}' ({var_type})"
                )
        categorical_predictors = [
            col
            for col, var_type in predictor_types.items()
            if var_type in _DUMMY_TYPES
        ]

        # Pre-compute dummy info for categorical imputed variables
        for col in imputed_variables:
            var_type, categories = detector.categorize_variable(
                data[col], col, self.logger
            )
            if var_type in _DUMMY_TYPES:
                # Build the dummies only to learn the column structure
                dummy_cols = list(
                    _dummies_for(data, col, drop_first=True).columns
                )
                self.imputed_var_dummy_mapping[col] = {
                    "dummy_cols": dummy_cols,
                    "var_type": var_type,
                    "categories": categories,
                }
                self.logger.info(
                    f"Pre-computed {len(dummy_cols)} dummy columns for imputed variable '{col}' ({var_type})"
                )
            elif var_type == "bool":
                # Track boolean imputed variables
                self.imputed_var_dummy_mapping[col] = {
                    "dummy_cols": None,
                    "var_type": "bool",
                    "categories": None,
                }

        # Process categorical predictors (add to dataframe)
        updated_predictors = predictors.copy()

        if categorical_predictors:
            dummy_frames = []
            for orig_col in categorical_predictors:
                # Encode each column separately: matching dummy names by
                # the "<col>_" prefix over a joint frame would let column
                # "a" claim the dummies of a column named "a_b".
                dummy_df = _dummies_for(data, orig_col, drop_first=True)
                dummy_cols = list(dummy_df.columns)
                self.dummy_mapping[orig_col] = dummy_cols
                dummy_frames.append(dummy_df)

                # Update predictor list
                updated_predictors.remove(orig_col)
                updated_predictors.extend(dummy_cols)

                self.logger.debug(
                    f"Created {len(dummy_cols)} dummy variables for '{orig_col}'"
                )

            # Drop original categorical columns and add dummies
            data = pd.concat(
                [data.drop(columns=categorical_predictors), *dummy_frames],
                axis=1,
            )

        # Convert boolean predictors to float (but keep as single column)
        for col, var_type in predictor_types.items():
            if var_type == "bool":
                data[col] = data[col].astype("float64")
                self.logger.debug(
                    f"Converted boolean predictor '{col}' to float64"
                )

        return data, updated_predictors

    def sequential_imputed_predictor_encoding(
        self, data: pd.DataFrame, variable: str
    ) -> pd.DataFrame:
        """
        Encode a freshly imputed variable so it can become a predictor.

        For categorical imputed variables: adds pre-computed dummy columns.
        For boolean imputed variables: converts to float64.
        For numeric variables: no change needed.

        Args:
            data: DataFrame containing the imputed variable in original form
            variable: Name of the imputed variable to encode

        Returns:
            DataFrame with encoded variable (original column kept)
        """
        data = data.copy()

        var_info = self.imputed_var_dummy_mapping.get(variable)
        if var_info is None:
            # Numeric variable - no encoding needed
            return data

        if var_info["var_type"] in _DUMMY_TYPES:
            dummy_cols = var_info["dummy_cols"]

            # Encode every category present, then keep only the columns
            # fixed at training time. drop_first must be False here:
            # dropping the first category of *this* data would remove a
            # real training category whenever the training baseline is
            # absent (e.g. from a set of predictions).
            dummy_df = _align_dummies(
                _dummies_for(data, variable, drop_first=False), dummy_cols
            )

            # Replace dummy columns left by an earlier call instead of
            # appending duplicates with the same names.
            stale_cols = [c for c in dummy_cols if c in data.columns]
            if stale_cols:
                data = data.drop(columns=stale_cols)

            # Add dummy columns to dataframe (keep original too)
            data = pd.concat([data, dummy_df], axis=1)

            self.logger.debug(
                f"Added {len(dummy_cols)} dummy columns for sequential predictor '{variable}'"
            )

        elif var_info["var_type"] == "bool":
            # Convert boolean to float (in place)
            if variable in data.columns:
                data[variable] = data[variable].astype("float64")
                self.logger.debug(
                    f"Converted boolean sequential predictor '{variable}' to float64"
                )

        return data

    def get_sequential_predictor_columns(
        self, variables: List[str]
    ) -> List[str]:
        """
        Get correct column names for sequential predictors.

        For categorical imputed variables: returns dummy column names.
        For other variables: returns original column name.

        Args:
            variables: List of variable names

        Returns:
            List of column names to use as predictors
        """
        predictor_cols: List[str] = []

        for var in variables:
            var_info = self.imputed_var_dummy_mapping.get(var)
            if var_info is not None and var_info["var_type"] in _DUMMY_TYPES:
                # Use dummy columns
                predictor_cols.extend(var_info["dummy_cols"])
            else:
                # Boolean, numeric, or unknown - use original column
                predictor_cols.append(var)

        return predictor_cols

    def apply_dummy_encoding_to_test(
        self,
        data: pd.DataFrame,
        predictors: List[str],
    ) -> Tuple[pd.DataFrame, List[str]]:
        """Apply same dummy encoding to test data based on training mapping.

        Args:
            data: Test data holding the predictors in original form.
            predictors: Original (pre-encoding) predictor names.

        Returns:
            Tuple of (encoded_data, updated_predictors).
        """
        detector = VariableTypeDetector()
        data = data.copy()
        updated_predictors = predictors.copy()

        # Apply dummy encoding based on stored mapping
        for orig_col, dummy_cols in self.dummy_mapping.items():
            if orig_col in predictors and orig_col in data.columns:
                # Don't drop first; alignment to the training columns
                # handles the baseline and any missing categories.
                dummy_df = _align_dummies(
                    _dummies_for(data, orig_col, drop_first=False),
                    dummy_cols,
                )

                # Update data
                data = data.drop(columns=[orig_col])
                data = pd.concat([data, dummy_df], axis=1)

                # Update predictor list
                updated_predictors.remove(orig_col)
                updated_predictors.extend(dummy_cols)

        # Convert boolean predictors to float
        for col in predictors:
            if col in data.columns:
                var_type, _ = detector.categorize_variable(
                    data[col], col, self.logger
                )
                if var_type == "bool":
                    data[col] = data[col].astype("float64")

        return data, updated_predictors
# === Hyperparameter Tuning Tests ===


def _assert_common_forest_params(params: dict) -> None:
    """Assert the tuned parameters shared by QRF and RFC are present and in range."""
    for key in (
        "n_estimators",
        "min_samples_split",
        "min_samples_leaf",
        "max_features",
        "bootstrap",
    ):
        assert key in params

    assert 50 <= params["n_estimators"] <= 300
    assert 2 <= params["min_samples_split"] <= 20
    assert 1 <= params["min_samples_leaf"] <= 10
    assert params["bootstrap"] in [True, False]


def test_qrf_hyperparameter_tuning_numeric_only() -> None:
    """Test hyperparameter tuning with numeric variables only (backward compatibility)."""
    np.random.seed(42)
    n_samples = 150

    # Create numeric-only dataset
    data = pd.DataFrame(
        {
            "x1": np.random.randn(n_samples),
            "x2": np.random.randn(n_samples),
            "y1": np.random.randn(n_samples) * 2 + 5,
            "y2": np.random.randn(n_samples) * 3 + 10,
        }
    )

    model = QRF(log_level="INFO")

    # Fit with hyperparameter tuning
    fitted_model = model.fit(
        data,
        predictors=["x1", "x2"],
        imputed_variables=["y1", "y2"],
        tune_hyperparameters=True,
    )

    # Tuning returns (fitted instance, tuned parameters)
    assert isinstance(fitted_model, tuple)
    fitted_instance, tuned_params = fitted_model

    # Should return flat dict (not nested) for numeric-only
    assert isinstance(tuned_params, dict)
    assert "qrf" not in tuned_params
    assert "rfc" not in tuned_params

    _assert_common_forest_params(tuned_params)
    # QRF tunes max_features as a float fraction
    assert 0.1 <= tuned_params["max_features"] <= 1.0

    # Test that predictions work
    predictions = fitted_instance.predict(data[["x1", "x2"]].head(10))
    assert isinstance(predictions, pd.DataFrame)
    assert set(predictions.columns) == {"y1", "y2"}


def test_qrf_hyperparameter_tuning_categorical_only() -> None:
    """Test hyperparameter tuning with categorical variables only."""
    np.random.seed(42)
    n_samples = 150

    # Create categorical-only dataset
    data = pd.DataFrame(
        {
            "x1": np.random.randn(n_samples),
            "x2": np.random.randn(n_samples),
            "cat1": np.random.choice(["A", "B", "C"], n_samples),
            "cat2": np.random.choice(["X", "Y", "Z"], n_samples),
        }
    )

    model = QRF(log_level="INFO")

    # Fit with hyperparameter tuning
    fitted_model = model.fit(
        data,
        predictors=["x1", "x2"],
        imputed_variables=["cat1", "cat2"],
        tune_hyperparameters=True,
    )

    # Tuning returns (fitted instance, tuned parameters)
    assert isinstance(fitted_model, tuple)
    fitted_instance, tuned_params = fitted_model

    # Should return flat dict (not nested) for categorical-only
    assert isinstance(tuned_params, dict)
    assert "qrf" not in tuned_params
    assert "rfc" not in tuned_params

    _assert_common_forest_params(tuned_params)
    # For RFC, max_features can be string or float
    assert tuned_params["max_features"] in ["sqrt", "log2", 0.5, 0.8, 1.0]

    # Test that predictions work
    predictions = fitted_instance.predict(data[["x1", "x2"]].head(10))
    assert isinstance(predictions, pd.DataFrame)
    assert set(predictions.columns) == {"cat1", "cat2"}


def test_qrf_hyperparameter_tuning_mixed_variables() -> None:
    """Test hyperparameter tuning with mixed numeric and categorical variables."""
    np.random.seed(42)
    n_samples = 150

    # Create mixed dataset
    data = pd.DataFrame(
        {
            "x1": np.random.randn(n_samples),
            "x2": np.random.randn(n_samples),
            "numeric1": np.random.randn(n_samples) * 2 + 5,
            "numeric2": np.random.randn(n_samples) * 3 + 10,
            "category": np.random.choice(["Low", "Medium", "High"], n_samples),
            "boolean": np.random.choice([True, False], n_samples),
        }
    )

    model = QRF(log_level="INFO")

    # Fit with hyperparameter tuning
    fitted_model = model.fit(
        data,
        predictors=["x1", "x2"],
        imputed_variables=["numeric1", "numeric2", "category", "boolean"],
        tune_hyperparameters=True,
    )

    # Tuning returns (fitted instance, tuned parameters)
    assert isinstance(fitted_model, tuple)
    fitted_instance, tuned_params = fitted_model

    # Should return NESTED dict for mixed variables
    assert isinstance(tuned_params, dict)
    assert "qrf" in tuned_params
    assert "rfc" in tuned_params

    # Check QRF parameters
    qrf_params = tuned_params["qrf"]
    assert isinstance(qrf_params, dict)
    assert "n_estimators" in qrf_params
    assert "max_features" in qrf_params
    assert 0.1 <= qrf_params["max_features"] <= 1.0  # QRF uses float

    # Check RFC parameters
    rfc_params = tuned_params["rfc"]
    assert isinstance(rfc_params, dict)
    assert "n_estimators" in rfc_params
    assert "max_features" in rfc_params
    # RFC uses a categorical choice
    assert rfc_params["max_features"] in ["sqrt", "log2", 0.5, 0.8, 1.0]

    # Test that predictions work
    predictions = fitted_instance.predict(data[["x1", "x2"]].head(10))
    assert isinstance(predictions, pd.DataFrame)
    assert set(predictions.columns) == {
        "numeric1",
        "numeric2",
        "category",
        "boolean",
    }

    # Verify categorical predictions are valid
    assert all(predictions["category"].isin(["Low", "Medium", "High"]))
    assert all(predictions["boolean"].isin([True, False]))


def test_qrf_hyperparameter_tuning_with_cv_folds() -> None:
    """Test that hyperparameter tuning uses CV folds correctly."""
    np.random.seed(42)
    n_samples = 150

    data = pd.DataFrame(
        {
            "x1": np.random.randn(n_samples),
            "x2": np.random.randn(n_samples),
            "y1": np.random.randn(n_samples),
            "cat1": np.random.choice(["A", "B"], n_samples),
        }
    )

    # Capture logs
    log_stream = io.StringIO()
    handler = logging.StreamHandler(log_stream)
    handler.setLevel(logging.INFO)

    model = QRF(log_level="INFO")
    model.logger.addHandler(handler)
    try:
        # Fit with mixed variables to test both QRF and RFC tuning
        fitted_model = model.fit(
            data,
            predictors=["x1", "x2"],
            imputed_variables=["y1", "cat1"],
            tune_hyperparameters=True,
        )
    finally:
        # Always detach the handler so a failing fit does not leak it
        # into the logger used by later tests.
        model.logger.removeHandler(handler)

    log_output = log_stream.getvalue()
    log_output_lower = log_output.lower()

    # Verify CV strategy is logged
    assert "3-fold cv" in log_output_lower
    assert "10 trials" in log_output_lower

    # Verify both QRF and RFC were tuned
    assert "QRF" in log_output
    assert "RFC" in log_output

    # Verify results structure
    fitted_instance, tuned_params = fitted_model
    assert "qrf" in tuned_params
    assert "rfc" in tuned_params


def test_qrf_hyperparameter_tuning_improves_performance() -> None:
    """Test that tuned hyperparameters perform no worse than the untuned model (within a margin)."""
    from microimpute.comparisons.metrics import compute_loss

    np.random.seed(42)
    n_samples = 400
    n_train = 300
    predictors = ["x1", "x2", "x3", "x4", "x5"]

    # Create a more complex dataset where tuning matters
    # Include non-linear relationships and interactions
    x1 = np.random.randn(n_samples)
    x2 = np.random.randn(n_samples)
    x3 = np.random.randn(n_samples)
    x4 = np.random.randn(n_samples)
    x5 = np.random.randn(n_samples)

    # Complex non-linear relationship with interactions
    y = (
        2 * x1
        + 3 * x2
        + 1.5 * x3 * x4  # interaction term
        + 0.5 * x1**2  # non-linear term
        - 0.8 * x5
        + np.random.randn(n_samples) * 0.5
    )

    data = pd.DataFrame(
        {"x1": x1, "x2": x2, "x3": x3, "x4": x4, "x5": x5, "y": y}
    )

    # Positional split into train and test
    train_data = data.iloc[:n_train]
    test_data = data.iloc[n_train:]

    # Fit with default hyperparameters (untuned)
    untuned_model = QRF(log_level="WARNING")
    untuned_fitted = untuned_model.fit(
        train_data,
        predictors=predictors,
        imputed_variables=["y"],
    )

    # Fit with tuning
    tuned_model = QRF(log_level="WARNING")
    tuned_fitted, tuned_params = tuned_model.fit(
        train_data,
        predictors=predictors,
        imputed_variables=["y"],
        tune_hyperparameters=True,
    )

    # Predict on test set with both models
    untuned_predictions = untuned_fitted.predict(
        test_data[predictors], quantiles=[0.5]
    )
    tuned_predictions = tuned_fitted.predict(
        test_data[predictors], quantiles=[0.5]
    )

    # Calculate quantile loss for both models using q=0.5
    true_values = test_data["y"].values
    _, untuned_loss = compute_loss(
        true_values,
        untuned_predictions[0.5]["y"].values,
        "quantile_loss",
        q=0.5,
    )
    _, tuned_loss = compute_loss(
        true_values,
        tuned_predictions[0.5]["y"].values,
        "quantile_loss",
        q=0.5,
    )

    # Tuned model should perform at least as well as untuned, or within 20% margin
    # (allowing for random variation in small datasets)
    margin = 0.2
    allowed_loss = untuned_loss * (1 + margin)
    assert tuned_loss <= allowed_loss, (
        f"Tuned loss ({tuned_loss:.4f}) should be <= {(1 + margin) * 100:.0f}% "
        f"of untuned loss ({untuned_loss:.4f}), i.e. <= {allowed_loss:.4f}"
    )