Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
- bump: minor
changes:
added:
- Wasserstein distance and Total Variation Distance metrics for distributional similarity.
fixed:
- Bug in data preprocessing which attempted to normalize categorical variables.
- Bug in loss metric used in Matching hyperparameter tuning.
1 change: 1 addition & 0 deletions microimpute/comparisons/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

# Import loss/metric functions
from microimpute.comparisons.metrics import (
compare_distributions,
compare_metrics,
compute_loss,
get_metric_for_variable_type,
Expand Down
35 changes: 26 additions & 9 deletions microimpute/comparisons/autoimpute.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,20 +223,23 @@ def _evaluate_models_parallel(
def _generate_imputations_for_all_models(
model_classes: List[Type[Imputer]],
best_method: str,
training_data: pd.DataFrame,
imputing_data: pd.DataFrame,
donor_data: pd.DataFrame,
receiver_data: pd.DataFrame,
predictors: List[str],
imputed_variables: List[str],
weight_col: Optional[str],
imputation_q: float,
normalize_data: bool,
normalizing_params: Optional[dict],
train_size: float,
tune_hyperparameters: bool,
hyperparams: Optional[Dict[str, Any]],
log_level: str,
) -> Tuple[Dict[str, pd.DataFrame], Dict[str, Any]]:
"""Generate imputations for all models when impute_all=True.

Note: This function takes the original donor and receiver data and preprocesses
them fresh for each model to ensure proper encoding and normalization.

Returns:
Tuple of (imputations_dict, fitted_models_dict)
"""
Expand All @@ -250,9 +253,9 @@ def _generate_imputations_for_all_models(
if model_name == best_method:
continue # Skip the best method as it's already done

# Check if model can handle the variable types
# Check if model can handle the variable types using original data
if not _can_model_handle_variables(
model_name, training_data, imputed_variables
model_name, donor_data, imputed_variables
):
log.info(
f"Skipping {model_name} due to incompatible variable types."
Expand All @@ -261,6 +264,20 @@ def _generate_imputations_for_all_models(

log.info(f"Generating imputations with {model_name}.")

# Preprocess data fresh for this model
training_data, imputing_data, normalizing_params = (
prepare_data_for_imputation(
donor_data,
receiver_data,
predictors,
imputed_variables,
weight_col,
normalize_data,
train_size,
1 - train_size,
)
)

# Get model-specific hyperparameters if available
model_hyperparams = None
if tune_hyperparameters and hyperparams and model_name in hyperparams:
Expand Down Expand Up @@ -556,14 +573,14 @@ def autoimpute(
_generate_imputations_for_all_models(
model_classes,
best_method,
training_data,
imputing_data,
donor_data,
receiver_data,
predictors,
imputed_variables,
original_imputed_variables,
weight_col,
imputation_q,
normalize_data,
normalizing_params,
train_size,
tune_hyperparameters,
best_hyperparams,
log_level,
Expand Down
166 changes: 166 additions & 0 deletions microimpute/comparisons/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
This module contains utilities for evaluating imputation quality using various metrics:
- Quantile loss for numerical variables
- Log loss for categorical variables
- Distributional similarity metrics (Wasserstein distance, Total Variation Distance)
The module automatically detects which metric to use based on variable type.
"""

Expand All @@ -12,10 +13,12 @@
import numpy as np
import pandas as pd
from pydantic import validate_call
from scipy.stats import wasserstein_distance
from sklearn.metrics import log_loss as sklearn_log_loss

from microimpute.comparisons.validation import (
validate_columns_exist,
validate_dataframe_compatibility,
validate_quantiles,
)
from microimpute.config import QUANTILES, VALIDATE_CONFIG
Expand Down Expand Up @@ -490,3 +493,166 @@ def compare_metrics(
except (KeyError, TypeError, AttributeError) as e:
log.error(f"Error in metrics comparison: {str(e)}")
raise RuntimeError(f"Failed to compare metrics: {str(e)}") from e


def total_variation_distance(
donor_values: np.ndarray, receiver_values: np.ndarray
) -> float:
"""Calculate Total Variation Distance between two categorical distributions.

Total Variation Distance (TVD) measures the maximum difference between
two probability distributions. For categorical variables, it is calculated as:
TVD = 0.5 * sum(|P(x) - Q(x)|) for all categories x

Args:
donor_values: Array of categorical values from donor data.
receiver_values: Array of categorical values from receiver data.

Returns:
Total variation distance value between 0 and 1, where 0 indicates
identical distributions and 1 indicates completely disjoint distributions.

Raises:
ValueError: If inputs are empty or invalid.
"""
if len(donor_values) == 0 or len(receiver_values) == 0:
raise ValueError(
"Both donor and receiver values must be non-empty arrays"
)

# Get all unique categories from both distributions
all_categories = np.union1d(
np.unique(donor_values), np.unique(receiver_values)
)

# Calculate probability distributions
donor_counts = pd.Series(donor_values).value_counts(normalize=True)
receiver_counts = pd.Series(receiver_values).value_counts(normalize=True)

# Calculate TVD
tvd = 0.0
for category in all_categories:
p_donor = donor_counts.get(category, 0.0)
p_receiver = receiver_counts.get(category, 0.0)
tvd += abs(p_donor - p_receiver)

# TVD is half the sum of absolute differences
return tvd / 2.0


@validate_call(config=VALIDATE_CONFIG)
def compare_distributions(
donor_data: pd.DataFrame,
receiver_data: pd.DataFrame,
imputed_variables: List[str],
) -> pd.DataFrame:
"""Compare distributions between donor and receiver data for imputed variables.

Evaluates distributional similarity using appropriate metrics:
- Wasserstein Distance for numerical variables
- Total Variation Distance for categorical variables

Args:
donor_data: DataFrame containing original donor data.
receiver_data: DataFrame containing receiver data with imputations.
imputed_variables: List of variable names to compare.

Returns:
DataFrame with columns 'Variable', 'Metric', and 'Distance' containing
the distributional similarity metrics for each variable.

Raises:
ValueError: If variables don't exist in both DataFrames or if data is invalid.
RuntimeError: If distribution comparison fails.

Example:
>>> donor_df = pd.DataFrame({'income': [1000, 2000, 3000],
... 'region': ['A', 'B', 'A']})
>>> receiver_df = pd.DataFrame({'income': [1100, 1900, 3100],
... 'region': ['A', 'A', 'B']})
>>> result = compare_distributions(donor_df, receiver_df,
... ['income', 'region'])
>>> print(result)
Variable Metric Distance
0 income wasserstein_distance 66.666667
1 region total_variation_distance 0.166667
"""
try:
log.info(
f"Comparing distributions for {len(imputed_variables)} variables"
)
log.info(f"Donor data shape: {donor_data.shape}")
log.info(f"Receiver data shape: {receiver_data.shape}")

# Validate inputs
validate_columns_exist(donor_data, imputed_variables, "donor_data")
validate_columns_exist(
receiver_data, imputed_variables, "receiver_data"
)

results = []

# Detect metric type and compute distance for each variable
detector = VariableTypeDetector()
for var in imputed_variables:
# Get values from both datasets
donor_values = donor_data[var].dropna().values
receiver_values = receiver_data[var].dropna().values

if len(donor_values) == 0 or len(receiver_values) == 0:
log.warning(
f"Skipping variable '{var}' due to insufficient data "
f"(donor: {len(donor_values)}, receiver: {len(receiver_values)})"
)
continue

# Detect variable type using donor data
var_type, _ = detector.categorize_variable(
donor_data[var], var, log
)

# Choose appropriate metric
if var_type in ["bool", "categorical", "numeric_categorical"]:
# Use Total Variation Distance for categorical
metric_name = "total_variation_distance"
distance = total_variation_distance(
donor_values, receiver_values
)
log.debug(
f"TVD for categorical variable '{var}': {distance:.6f}"
)
else:
# Use Wasserstein Distance for numerical
metric_name = "wasserstein_distance"
distance = wasserstein_distance(donor_values, receiver_values)
log.debug(
f"Wasserstein distance for numerical variable '{var}': {distance:.6f}"
)

results.append(
{
"Variable": var,
"Metric": metric_name,
"Distance": distance,
}
)

if not results:
raise ValueError(
"No valid distribution comparisons could be computed. "
"Check that variables have sufficient non-null data."
)

results_df = pd.DataFrame(results)
log.info(
f"Distribution comparison complete. Computed {len(results_df)} metrics."
)

return results_df

except ValueError as e:
# Re-raise validation errors
raise e
except Exception as e:
log.error(f"Error comparing distributions: {str(e)}")
raise RuntimeError(f"Failed to compare distributions: {str(e)}") from e
49 changes: 34 additions & 15 deletions microimpute/models/matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ def _tune_hyperparameters(

optuna.logging.set_verbosity(optuna.logging.WARNING)

# Use same CV strategy as QRF: 3-fold CV with 10 trials
# Use 3-fold CV with 10 trials
n_cv_folds = 3
n_trials = 10

Expand Down Expand Up @@ -605,6 +605,17 @@ def objective(trial: optuna.Trial) -> float:
"k": trial.suggest_int("k", 1, 10),
}

# Detect variable types for appropriate metric selection
from microimpute.comparisons.metrics import (
get_metric_for_variable_type,
)

variable_metrics = {}
for var in imputed_variables:
variable_metrics[var] = get_metric_for_variable_type(
data[var], var
)

# Track errors across CV folds
fold_errors = []

Expand Down Expand Up @@ -678,21 +689,29 @@ def objective(trial: optuna.Trial) -> float:
y_pred = np.full(len(X_val_var), mean_val)
y_val_combined = y_val.values

# Use quantile loss with median (q=0.5) for hyperparameter tuning
_, quantile_loss_value = compute_loss(
y_val_combined.flatten(),
y_pred.flatten(),
"quantile_loss",
q=0.5,
)
# Use appropriate metric based on variable type
metric = variable_metrics[var]

# Normalize by variable's standard deviation
std = np.std(y_val_combined.flatten())
normalized_loss = (
quantile_loss_value / std
if std > 0
else quantile_loss_value
)
if metric == "quantile_loss":
_, loss_value = compute_loss(
y_val_combined.flatten(),
y_pred.flatten(),
"quantile_loss",
q=0.5,
)
# Normalize by variable's standard deviation
std = np.std(y_val_combined.flatten())
normalized_loss = (
loss_value / std if std > 0 else loss_value
)
else: # log_loss for categorical/boolean
_, loss_value = compute_loss(
y_val_combined.flatten(),
y_pred.flatten(),
"log_loss",
)
# Log loss is already normalized
normalized_loss = loss_value

var_errors.append(normalized_loss)

Expand Down
Loading