diff --git a/causalml/inference/meta/base.py b/causalml/inference/meta/base.py index 9a948c7b..18d6f6f2 100644 --- a/causalml/inference/meta/base.py +++ b/causalml/inference/meta/base.py @@ -7,7 +7,13 @@ from tqdm import tqdm from causalml.inference.meta.explainer import Explainer -from causalml.inference.meta.utils import check_p_conditions, convert_pd_to_np +from causalml.inference.meta.utils import ( + check_p_conditions, + filter_mask, + filter_index, + n_rows, + to_numpy, +) from causalml.propensity import compute_propensity_score logger = logging.getLogger("causalml") @@ -30,8 +36,9 @@ def _fit_bootstrap_clone(learner_template, X, treatment, y, p, seed, bootstrap_s A fitted clone of learner_template trained on a bootstrap sample. """ rng = np.random.RandomState(seed) - idxs = rng.choice(np.arange(X.shape[0]), size=bootstrap_size) - X_b = X[idxs] + idxs = rng.choice(np.arange(n_rows(X)), size=bootstrap_size) + + X_b = filter_index(X, idxs) treatment_b = treatment[idxs] y_b = y[idxs] p_b = {group: _p[idxs] for group, _p in p.items()} if p is not None else None @@ -102,12 +109,27 @@ def estimate_ate( pass def bootstrap(self, X, treatment, y, p=None, size=10000, rng=None): - """Runs a single bootstrap. Fits on bootstrapped sample, then predicts on whole population.""" + """Runs a single bootstrap. Fits on bootstrapped sample, then predicts on whole population. + + Args: + X (np.matrix, np.array, pd.DataFrame, or pl.DataFrame): a feature matrix. + Resampled natively via :func:`filter_index`, so X stays in its + original format (numpy/pandas/polars) throughout. + treatment (np.array): a treatment vector (numpy) + y (np.array): an outcome vector (numpy) + p (dict, optional): a dict of {treatment group: propensity scores (numpy)} + size (int, optional): number of samples to draw with replacement + rng (np.random.Generator, optional): random number generator for + deterministic resampling + Returns: + (numpy.ndarray): Predictions of treatment effects on the full X + from a model trained on the resampled subset. + """ if rng is not None: - idxs = rng.choice(np.arange(0, X.shape[0]), size=size) + idxs = rng.choice(np.arange(0, n_rows(X)), size=size) else: - idxs = np.random.choice(np.arange(0, X.shape[0]), size=size) - X_b = X[idxs] + idxs = np.random.choice(np.arange(0, n_rows(X)), size=size) + X_b = filter_index(X, idxs) if p is not None: p_b = {group: _p[idxs] for group, _p in p.items()} @@ -171,21 +193,19 @@ def _format_p(p, t_groups): """Format propensity scores into a dictionary of {treatment group: propensity scores}. Args: - p (np.ndarray, pd.Series, or dict): propensity scores + p (np.ndarray, pd.Series, pl.Series, or dict): propensity scores t_groups (list): treatment group names. Returns: - dict of {treatment group: propensity scores} + dict of {treatment group: propensity scores (numpy.ndarray)} """ check_p_conditions(p, t_groups) - if isinstance(p, (np.ndarray, pd.Series)): + if isinstance(p, dict): + p = {treatment_name: to_numpy(_p) for treatment_name, _p in p.items()} + else: treatment_name = t_groups[0] - p = {treatment_name: convert_pd_to_np(p)} - elif isinstance(p, dict): - p = { - treatment_name: convert_pd_to_np(_p) for treatment_name, _p in p.items() - } + p = {treatment_name: to_numpy(p)} return p @@ -199,19 +219,22 @@ def _set_propensity_models(self, X, treatment, y): PropensityModel (i.e. ElasticNetPropensityModel). Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector + X (np.matrix, np.array, pd.DataFrame, or pl.DataFrame): a feature matrix. + Kept in its native format; scikit-learn >= 1.6 accepts pandas + and Polars DataFrames natively, so no conversion is performed. + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector """ logger.info("Generating propensity score") + treatment_np = to_numpy(treatment) p = dict() p_model = dict() for group in self.t_groups: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - X_filt = X[mask] - w_filt = (treatment_filt == group).astype(int) - w = (treatment == group).astype(int) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + X_filt = filter_mask(X, mask) + w_filt = (treatment_filt_np == group).astype(int) + w = (treatment_np == group).astype(int) propensity_model = self.model_p if hasattr(self, "model_p") else None p[group], p_model[group] = compute_propensity_score( X=X_filt, diff --git a/causalml/inference/meta/drlearner.py b/causalml/inference/meta/drlearner.py index b1d04451..5649f376 100644 --- a/causalml/inference/meta/drlearner.py +++ b/causalml/inference/meta/drlearner.py @@ -11,7 +11,11 @@ from causalml.inference.meta.utils import ( check_treatment_vector, check_p_conditions, - convert_pd_to_np, + collect_if_lazy, + filter_mask, + filter_index, + n_rows, + to_numpy, ) from causalml.metrics import regression_metrics, classification_metrics from causalml.propensity import compute_propensity_score @@ -48,9 +52,9 @@ def __init__( control_name (str or int, optional): name of control group Note: arguments are stored verbatim (scikit-learn convention) so that - ``get_params`` / ``clone`` work correctly. Model construction is deferred to ``fit()``. - Per the scikit-learn convention, ``__init__`` does not validate or raise — - validation happens in ``fit()``. + ``get_params`` / ``clone`` work correctly. Model construction is deferred + to ``fit()``. Per the scikit-learn convention, ``__init__`` does not + validate or raise — validation happens in ``fit()``. """ # Store verbatim — no deepcopy, no logic (scikit-learn convention). self.learner = learner @@ -67,12 +71,18 @@ def fit(self, X, treatment, y, p=None, seed=None): """Fit the inference model. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector - p (np.ndarray or pd.Series or dict, optional): propensity scores + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method; the + feature matrix is otherwise kept in its native format throughout, + including the KFold partitions (sliced via :func:`filter_index`). + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. seed (int): random seed for cross-fitting """ + X = collect_if_lazy(X) if (self.learner is None) and ( (self.control_outcome_learner is None) or (self.treatment_outcome_learner is None) @@ -83,9 +93,11 @@ def fit(self, X, treatment, y, p=None, seed=None): "`treatment_outcome_learner`, and `treatment_effect_learner` " "must be specified." ) - X, treatment, y = convert_pd_to_np(X, treatment, y) check_treatment_vector(treatment, self.control_name) - self.t_groups = np.unique(treatment[treatment != self.control_name]) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + + self.t_groups = np.unique(treatment_np[treatment_np != self.control_name]) self.t_groups.sort() self._classes = {group: i for i, group in enumerate(self.t_groups)} @@ -110,7 +122,7 @@ def fit(self, X, treatment, y, p=None, seed=None): # the outcome regression, and the treatment regression on the doubly robust estimates. The use of # the partitions is rotated so we do not lose on the sample size. cv = KFold(n_splits=3, shuffle=True, random_state=seed) - split_indices = [index for _, index in cv.split(y)] + split_indices = [index for _, index in cv.split(y_np)] self.models_mu_c = [ deepcopy(_control_outcome_learner), @@ -134,33 +146,38 @@ def fit(self, X, treatment, y, p=None, seed=None): for group in self.t_groups } if p is None: - self.propensity = {group: np.zeros(y.shape[0]) for group in self.t_groups} + self.propensity = { + group: np.zeros(y_np.shape[0]) for group in self.t_groups + } for ifold in range(3): treatment_idx = split_indices[ifold] outcome_idx = split_indices[(ifold + 1) % 3] tau_idx = split_indices[(ifold + 2) % 3] - treatment_treat, treatment_out, treatment_tau = ( - treatment[treatment_idx], - treatment[outcome_idx], - treatment[tau_idx], - ) - y_out, y_tau = y[outcome_idx], y[tau_idx] - X_treat, X_out, X_tau = X[treatment_idx], X[outcome_idx], X[tau_idx] + treatment_treat = filter_index(treatment, treatment_idx) + treatment_out_np = treatment_np[outcome_idx] + treatment_tau_np = treatment_np[tau_idx] + treatment_treat_np = treatment_np[treatment_idx] + + y_out = y_np[outcome_idx] + y_tau = y_np[tau_idx] + + X_treat = filter_index(X, treatment_idx) + X_out = filter_index(X, outcome_idx) + X_tau = filter_index(X, tau_idx) if p is None: logger.info("Generating propensity score") cur_p = dict() for group in self.t_groups: - mask = (treatment_treat == group) | ( - treatment_treat == self.control_name + mask = (treatment_treat_np == group) | ( + treatment_treat_np == self.control_name ) - treatment_filt = treatment_treat[mask] - X_filt = X_treat[mask] - w_filt = (treatment_filt == group).astype(int) - w = (treatment_tau == group).astype(int) + X_filt = filter_mask(X_treat, mask) + w_filt = (treatment_treat_np[mask] == group).astype(int) + w = (treatment_tau_np == group).astype(int) cur_p[group], _ = compute_propensity_score( X=X_filt, treatment=w_filt, X_pred=X_tau, treatment_pred=w ) @@ -168,29 +185,31 @@ def fit(self, X, treatment, y, p=None, seed=None): else: cur_p = dict() if isinstance(p, (np.ndarray, pd.Series)): - cur_p = {self.t_groups[0]: convert_pd_to_np(p[tau_idx])} + cur_p = {self.t_groups[0]: to_numpy(p)[tau_idx]} else: - cur_p = {g: prop[tau_idx] for g, prop in p.items()} + cur_p = {g: to_numpy(prop)[tau_idx] for g, prop in p.items()} check_p_conditions(cur_p, self.t_groups) logger.info("Generate outcome regressions") self.models_mu_c[ifold].fit( - X_out[treatment_out == self.control_name], - y_out[treatment_out == self.control_name], + filter_mask(X_out, treatment_out_np == self.control_name), + y_out[treatment_out_np == self.control_name], ) for group in self.t_groups: self.models_mu_t[group][ifold].fit( - X_out[treatment_out == group], y_out[treatment_out == group] + filter_mask(X_out, treatment_out_np == group), + y_out[treatment_out_np == group], ) logger.info("Fit pseudo outcomes from the DR formula") for group in self.t_groups: - mask = (treatment_tau == group) | (treatment_tau == self.control_name) - treatment_filt = treatment_tau[mask] - X_filt = X_tau[mask] + mask = (treatment_tau_np == group) | ( + treatment_tau_np == self.control_name + ) + X_filt = filter_mask(X_tau, mask) y_filt = y_tau[mask] - w_filt = (treatment_filt == group).astype(int) + w_filt = (treatment_tau_np[mask] == group).astype(int) p_filt = cur_p[group][mask] mu_t = self.models_mu_t[group][ifold].predict(X_filt) mu_c = self.models_mu_c[ifold].predict(X_filt) @@ -206,12 +225,28 @@ def fit(self, X, treatment, y, p=None, seed=None): return self def bootstrap(self, X, treatment, y, p=None, size=10000, rng=None, seed=None): - """Runs a single bootstrap with optional deterministic cross-fit seed.""" + """Runs a single bootstrap with optional deterministic cross-fit seed. + + Args: + X (np.matrix, np.array, pd.DataFrame, or pl.DataFrame): a feature matrix. + Resampled natively via :func:`filter_index`. + treatment (np.array): a treatment vector (numpy) + y (np.array): an outcome vector (numpy) + p (dict, optional): a dict of {treatment group: propensity scores (numpy)} + size (int, optional): number of samples to draw with replacement + rng (np.random.Generator, optional): random number generator for + deterministic resampling + seed (int, optional): random seed for cross-fitting within the + resampled fit() call + Returns: + (numpy.ndarray): Predictions of treatment effects on the full X + from a model trained on the resampled subset. + """ if rng is not None: - idxs = rng.choice(np.arange(0, X.shape[0]), size=size) + idxs = rng.choice(np.arange(0, n_rows(X)), size=size) else: - idxs = np.random.choice(np.arange(0, X.shape[0]), size=size) - X_b = X[idxs] + idxs = np.random.choice(np.arange(0, n_rows(X)), size=size) + X_b = filter_index(X, idxs) if p is not None: p_b = {group: _p[idxs] for group, _p in p.items()} @@ -229,16 +264,17 @@ def predict( """Predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series, optional): a treatment vector - y (np.array or pd.Series, optional): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series, optional): a treatment vector + y (np.array, pd.Series, or pl.Series, optional): an outcome vector verbose (bool, optional): whether to output progress logs Returns: (numpy.ndarray): Predictions of treatment effects. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) yhat_ts = {} yhat_c = np.r_[[model.predict(X) for model in self.models_mu_c]].mean(axis=0) @@ -253,10 +289,11 @@ def predict( ].mean(axis=0) if (y is not None) and (treatment is not None) and verbose: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - y_filt = y[mask] - w = (treatment_filt == group).astype(int) + treatment_np = to_numpy(treatment) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + y_filt = to_numpy(filter_mask(y, mask)) + w = (treatment_filt_np == group).astype(int) yhat = np.zeros_like(y_filt, dtype=float) yhat[w == 0] = yhat_c[mask][w == 0] @@ -283,13 +320,15 @@ def fit_predict( verbose=True, seed=None, ): - """Fit the DR-learner and predict treatment effects. + """Fit the treatment effect and outcome models of the DR learner and predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector - p (np.ndarray or pd.Series or dict, optional): propensity scores + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. return_ci (bool): whether to return confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap @@ -299,7 +338,7 @@ def fit_predict( Returns: (numpy.ndarray): Predictions of treatment effects. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) self.fit(X, treatment, y, p, seed) if p is None: @@ -307,12 +346,9 @@ def fit_predict( check_p_conditions(p, self.t_groups) if isinstance(p, (np.ndarray, pd.Series)): - treatment_name = self.t_groups[0] - p = {treatment_name: convert_pd_to_np(p)} + p = {self.t_groups[0]: to_numpy(p)} elif isinstance(p, dict): - p = { - treatment_name: convert_pd_to_np(_p) for treatment_name, _p in p.items() - } + p = {k: to_numpy(v) for k, v in p.items()} te = self.predict( X, treatment=treatment, y=y, return_components=return_components @@ -321,13 +357,16 @@ def fit_predict( if not return_ci: return te else: + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + t_groups_global = self.t_groups _classes_global = self._classes models_mu_c_global = deepcopy(self.models_mu_c) models_mu_t_global = deepcopy(self.models_mu_t) models_tau_global = deepcopy(self.models_tau) te_bootstraps = np.zeros( - shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps) + shape=(n_rows(X), self.t_groups.shape[0], n_bootstraps) ) rng = np.random.default_rng(seed) if seed is not None else None @@ -340,8 +379,8 @@ def fit_predict( ) te_b = self.bootstrap( X, - treatment, - y, + treatment_np, + y_np, p, size=bootstrap_size, rng=rng, @@ -377,10 +416,12 @@ def estimate_ate( """Estimate the Average Treatment Effect (ATE). Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector - p (np.ndarray or pd.Series or dict, optional): propensity scores + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. bootstrap_ci (bool): whether run bootstrap for confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap @@ -389,6 +430,8 @@ def estimate_ate( Returns: The mean and confidence interval (LB, UB) of the ATE estimate. """ + X = collect_if_lazy(X) + if pretrain: if not hasattr(self, "t_groups"): raise ValueError( @@ -401,19 +444,18 @@ def estimate_ate( te, yhat_cs, yhat_ts = self.fit_predict( X, treatment, y, p, return_components=True, seed=seed ) - X, treatment, y = convert_pd_to_np(X, treatment, y) + + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) if p is None: p = self.propensity else: check_p_conditions(p, self.t_groups) if isinstance(p, (np.ndarray, pd.Series)): - treatment_name = self.t_groups[0] - p = {treatment_name: convert_pd_to_np(p)} + p = {self.t_groups[0]: to_numpy(p)} elif isinstance(p, dict): - p = { - treatment_name: convert_pd_to_np(_p) for treatment_name, _p in p.items() - } + p = {k: to_numpy(v) for k, v in p.items()} ate = np.zeros(self.t_groups.shape[0]) ate_lb = np.zeros(self.t_groups.shape[0]) @@ -422,14 +464,14 @@ def estimate_ate( for i, group in enumerate(self.t_groups): _ate = te[:, i].mean() - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = treatment_np[mask] w = (treatment_filt == group).astype(int) prob_treatment = float(sum(w)) / w.shape[0] yhat_c = yhat_cs[group][mask] yhat_t = yhat_ts[group][mask] - y_filt = y[mask] + y_filt = y_np[mask] se = np.sqrt( ( @@ -468,8 +510,8 @@ def estimate_ate( ) cate_b = self.bootstrap( X, - treatment, - y, + treatment_np, + y_np, p, size=bootstrap_size, rng=rng, @@ -493,9 +535,7 @@ def estimate_ate( class BaseDRRegressor(BaseDRLearner): - """ - A parent class for DR-learner regressor classes. - """ + """A parent class for DR-learner regressor classes.""" def __init__( self, @@ -517,9 +557,7 @@ def __init__( class BaseDRClassifier(BaseDRLearner): - """ - A parent class for DR-learner classifier classes. - """ + """A parent class for DR-learner classifier classes.""" def __init__( self, @@ -542,10 +580,30 @@ def __init__( def predict( self, X, treatment=None, y=None, p=None, return_components=False, verbose=True ): - """Predict treatment effects (classifier variant — uses predict_proba for outcomes).""" - X, treatment, y = convert_pd_to_np(X, treatment, y) + """Predict treatment effects (classifier variant — uses predict_proba for outcomes). + + Args: + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series, optional): a treatment vector. Used for computing + classification metrics when y is also provided. + y (np.array, pd.Series, or pl.Series, optional): an outcome vector. Used for computing + classification metrics when treatment is also provided. + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1). Currently not used in prediction but kept for API consistency. + return_components (bool, optional): whether to return outcome probabilities for treatment and control + groups separately. Defaults to False. + verbose (bool, optional): whether to output progress logs. Defaults to True. + Returns: + (numpy.ndarray): Predictions of treatment effects. + If return_components is True, also returns: + - dict: Predicted probabilities for the control group (yhat_cs). + - dict: Predicted probabilities for the treatment group (yhat_ts). + """ + X = collect_if_lazy(X) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) yhat_ts = {} yhat_c = np.r_[ @@ -562,10 +620,11 @@ def predict( ].mean(axis=0) if (y is not None) and (treatment is not None) and verbose: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - y_filt = y[mask] - w = (treatment_filt == group).astype(int) + treatment_np = to_numpy(treatment) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + y_filt = to_numpy(filter_mask(y, mask)) + w = (treatment_filt_np == group).astype(int) yhat = np.zeros_like(y_filt, dtype=float) yhat[w == 0] = yhat_c[mask][w == 0] diff --git a/causalml/inference/meta/rlearner.py b/causalml/inference/meta/rlearner.py index 1d76b15e..3120038f 100644 --- a/causalml/inference/meta/rlearner.py +++ b/causalml/inference/meta/rlearner.py @@ -9,8 +9,11 @@ from causalml.inference.meta.base import BaseLearner from causalml.inference.meta.utils import ( check_treatment_vector, + collect_if_lazy, + filter_mask, + n_rows, + to_numpy, get_xgboost_objective_metric, - convert_pd_to_np, get_weighted_variance, ) from causalml.propensity import ElasticNetPropensityModel @@ -55,9 +58,10 @@ def __init__( processors Note: arguments are stored verbatim (scikit-learn convention) so that - ``get_params`` / ``clone`` work correctly. Model construction is deferred to ``fit()``. - Per the scikit-learn convention, ``__init__`` does not validate or raise — - validation of ``learner``/``outcome_learner``/``effect_learner`` happens in ``fit()``. + ``get_params`` / ``clone`` work correctly. Model construction is deferred + to ``fit()``. Per the scikit-learn convention, ``__init__`` does not + validate or raise — validation of ``learner``/``outcome_learner``/ + ``effect_learner`` happens in ``fit()``. """ # Store verbatim — no deepcopy, no logic (scikit-learn convention). self.learner = learner @@ -74,13 +78,21 @@ def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True): """Fit the treatment effect and outcome models of the R learner. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector - p (np.ndarray or pd.Series or dict, optional): propensity scores - sample_weight (np.array or pd.Series, optional): sample weights for `effect_learner`. + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method; the + feature matrix is otherwise kept in its native format throughout, + including the call to ``cross_val_predict`` (scikit-learn >= 1.6 + accepts pandas and Polars DataFrames natively). + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. + sample_weight (np.array, pd.Series, or pl.Series, optional): an array of sample weights indicating the + weight of each observation for `effect_learner`. If None, it assumes equal weight. verbose (bool, optional): whether to output progress logs """ + X = collect_if_lazy(X) if (self.learner is None) and ( (self.outcome_learner is None) or (self.effect_learner is None) ): @@ -90,19 +102,21 @@ def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True): ) if self.propensity_learner is None: raise ValueError("`propensity_learner` must be specified.") - X, treatment, y = convert_pd_to_np(X, treatment, y) check_treatment_vector(treatment, self.control_name) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + if sample_weight is not None: assert len(sample_weight) == len( - y + y_np ), "Data length must be equal for sample_weight and the input data" - sample_weight = convert_pd_to_np(sample_weight) + sample_weight = to_numpy(sample_weight) - self.t_groups = np.unique(treatment[treatment != self.control_name]) + self.t_groups = np.unique(treatment_np[treatment_np != self.control_name]) self.t_groups.sort() if p is None: - self._set_propensity_models(X=X, treatment=treatment, y=y) + self._set_propensity_models(X=X, treatment=treatment_np, y=y_np) p = self.propensity else: p = self._format_p(p, self.t_groups) @@ -132,26 +146,30 @@ def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True): if verbose: logger.info("generating out-of-fold CV outcome estimates") - yhat = cross_val_predict(self.model_mu, X, y, cv=self.cv, n_jobs=self.cv_n_jobs) + yhat = cross_val_predict( + self.model_mu, X, y_np, cv=self.cv, n_jobs=self.cv_n_jobs + ) for group in self.t_groups: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - X_filt = X[mask] - y_filt = y[mask] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = filter_mask(treatment, mask) + X_filt = filter_mask(X, mask) + y_filt = y_np[mask] yhat_filt = yhat[mask] p_filt = p[group][mask] - w = (treatment_filt == group).astype(int) + w = (to_numpy(treatment_filt) == group).astype(int) weight = (w - p_filt) ** 2 diff_c = y_filt[w == 0] - yhat_filt[w == 0] diff_t = y_filt[w == 1] - yhat_filt[w == 1] if sample_weight is not None: sample_weight_filt = sample_weight[mask] - sample_weight_filt_c = sample_weight_filt[w == 0] - sample_weight_filt_t = sample_weight_filt[w == 1] - self.vars_c[group] = get_weighted_variance(diff_c, sample_weight_filt_c) - self.vars_t[group] = get_weighted_variance(diff_t, sample_weight_filt_t) + self.vars_c[group] = get_weighted_variance( + diff_c, sample_weight_filt[w == 0] + ) + self.vars_t[group] = get_weighted_variance( + diff_t, sample_weight_filt[w == 1] + ) weight *= sample_weight_filt else: self.vars_c[group] = diff_c.var() @@ -172,17 +190,16 @@ def predict(self, X, p=None): """Predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. Returns: (numpy.ndarray): Predictions of treatment effects. """ - X = convert_pd_to_np(X) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + X = collect_if_lazy(X) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) for i, group in enumerate(self.t_groups): - dhat = self.models_tau[group].predict(X) - te[:, i] = dhat - + te[:, i] = self.models_tau[group].predict(X) return te def fit_predict( @@ -200,11 +217,14 @@ def fit_predict( """Fit the R learner and predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector - p (np.ndarray or pd.Series or dict, optional): propensity scores - sample_weight (np.array or pd.Series, optional): sample weights + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. + sample_weight (np.array, pd.Series, or pl.Series, optional): an array of sample weights indicating the + weight of each observation for `effect_learner`. If None, it assumes equal weight. return_ci (bool): whether to return confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap @@ -212,19 +232,22 @@ def fit_predict( Returns: (numpy.ndarray): Predictions of treatment effects. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) self.fit(X, treatment, y, p, sample_weight, verbose=verbose) te = self.predict(X) if not return_ci: return te else: + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + t_groups_global = self.t_groups _classes_global = self._classes model_mu_global = deepcopy(self.model_mu) models_tau_global = deepcopy(self.models_tau) te_bootstraps = np.zeros( - shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps) + shape=(n_rows(X), self.t_groups.shape[0], n_bootstraps) ) logger.info("Bootstrap Confidence Intervals") @@ -233,7 +256,7 @@ def fit_predict( p = self.propensity else: p = self._format_p(p, self.t_groups) - te_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size) + te_b = self.bootstrap(X, treatment_np, y_np, p, size=bootstrap_size) te_bootstraps[:, :, i] = te_b te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2) @@ -263,11 +286,14 @@ def estimate_ate( """Estimate the Average Treatment Effect (ATE). Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): treatment vector (needed when pretrain=False) - y (np.array or pd.Series): outcome vector (needed when pretrain=False) - p (np.ndarray or pd.Series or dict, optional): propensity scores - sample_weight (np.array or pd.Series, optional): sample weights + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): only needed when pretrain=False, a treatment vector + y (np.array, pd.Series, or pl.Series): only needed when pretrain=False, an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. + sample_weight (np.array, pd.Series, or pl.Series, optional): an array of sample weights indicating the + weight of each observation for `effect_learner`. If None, it assumes equal weight. bootstrap_ci (bool): whether run bootstrap for confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap @@ -275,31 +301,39 @@ def estimate_ate( Returns: The mean and confidence interval (LB, UB) of the ATE estimate. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + if pretrain: te = self.predict(X, p) else: - if not len(treatment) or not len(y): + if treatment is None or y is None: raise ValueError("treatment and y must be provided when pretrain=False") - te = self.fit_predict(X, treatment, y, p, sample_weight, return_ci=False) + + te = self.fit_predict( + X, + treatment, + y, + p, + sample_weight, + return_ci=False, + ) ate = np.zeros(self.t_groups.shape[0]) ate_lb = np.zeros(self.t_groups.shape[0]) ate_ub = np.zeros(self.t_groups.shape[0]) for i, group in enumerate(self.t_groups): - w = (treatment == group).astype(int) - prob_treatment = float(sum(w)) / X.shape[0] + w = (treatment_np == group).astype(int) + prob_treatment = float(sum(w)) / n_rows(X) _ate = te[:, i].mean() - se = ( - np.sqrt( - (self.vars_t[group] / prob_treatment) - + (self.vars_c[group] / (1 - prob_treatment)) - + te[:, i].var() - ) - / X.shape[0] - ) + se = np.sqrt( + (self.vars_t[group] / prob_treatment) + + (self.vars_c[group] / (1 - prob_treatment)) + + te[:, i].var() + ) / n_rows(X) _ate_lb = _ate - se * norm.ppf(1 - self.ate_alpha / 2) _ate_ub = _ate + se * norm.ppf(1 - self.ate_alpha / 2) @@ -324,7 +358,7 @@ def estimate_ate( p = self.propensity else: p = self._format_p(p, self.t_groups) - cate_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size) + cate_b = self.bootstrap(X, treatment_np, y_np, p, size=bootstrap_size) ate_bootstraps[:, n] = cate_b.mean(axis=0) ate_lower = np.percentile( @@ -342,9 +376,7 @@ def estimate_ate( class BaseRRegressor(BaseRLearner): - """ - A parent class for R-learner regressor classes. - """ + """A parent class for R-learner regressor classes.""" def __init__( self, @@ -370,9 +402,7 @@ def __init__( class BaseRClassifier(BaseRLearner): - """ - A parent class for R-learner classifier classes. - """ + """A parent class for R-learner classifier classes.""" def __init__( self, @@ -412,19 +442,36 @@ def __init__( ) def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True): - """Fit the R-learner classifier (uses predict_proba for outcome estimates).""" - X, treatment, y = convert_pd_to_np(X, treatment, y) + """Fit the R-learner classifier (uses predict_proba for outcome estimates). + + Args: + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. + sample_weight (np.array, pd.Series, or pl.Series, optional): an array of sample weights indicating the + weight of each observation for `effect_learner`. If None, it assumes equal weight. + verbose (bool, optional): whether to output progress logs + """ + X = collect_if_lazy(X) check_treatment_vector(treatment, self.control_name) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + if sample_weight is not None: assert len(sample_weight) == len( - y + y_np ), "Data length must be equal for sample_weight and the input data" - sample_weight = convert_pd_to_np(sample_weight) - self.t_groups = np.unique(treatment[treatment != self.control_name]) + sample_weight = to_numpy(sample_weight) + + self.t_groups = np.unique(treatment_np[treatment_np != self.control_name]) self.t_groups.sort() if p is None: - self._set_propensity_models(X=X, treatment=treatment, y=y) + self._set_propensity_models(X=X, treatment=treatment_np, y=y_np) p = self.propensity else: p = self._format_p(p, self.t_groups) @@ -446,27 +493,29 @@ def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True): if verbose: logger.info("generating out-of-fold CV outcome estimates") yhat = cross_val_predict( - self.model_mu, X, y, cv=self.cv, method="predict_proba", n_jobs=-1 + self.model_mu, X, y_np, cv=self.cv, method="predict_proba", n_jobs=-1 )[:, 1] for group in self.t_groups: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - X_filt = X[mask] - y_filt = y[mask] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = filter_mask(treatment, mask) + X_filt = filter_mask(X, mask) + y_filt = y_np[mask] yhat_filt = yhat[mask] p_filt = p[group][mask] - w = (treatment_filt == group).astype(int) + w = (to_numpy(treatment_filt) == group).astype(int) weight = (w - p_filt) ** 2 diff_c = y_filt[w == 0] - yhat_filt[w == 0] diff_t = y_filt[w == 1] - yhat_filt[w == 1] if sample_weight is not None: sample_weight_filt = sample_weight[mask] - sample_weight_filt_c = sample_weight_filt[w == 0] - sample_weight_filt_t = sample_weight_filt[w == 1] - self.vars_c[group] = get_weighted_variance(diff_c, sample_weight_filt_c) - self.vars_t[group] = get_weighted_variance(diff_t, sample_weight_filt_t) + self.vars_c[group] = get_weighted_variance( + diff_c, sample_weight_filt[w == 0] + ) + self.vars_t[group] = get_weighted_variance( + diff_t, sample_weight_filt[w == 1] + ) weight *= sample_weight_filt else: self.vars_c[group] = diff_c.var() @@ -484,13 +533,19 @@ def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True): return self def predict(self, X, p=None): - """Predict treatment effects.""" - X = convert_pd_to_np(X) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) - for i, group in enumerate(self.t_groups): - dhat = self.models_tau[group].predict(X) - te[:, i] = dhat + """Predict treatment effects. + Args: + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + + Returns: + (numpy.ndarray): Predictions of treatment effects. + """ + X = collect_if_lazy(X) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) + for i, group in enumerate(self.t_groups): + te[:, i] = self.models_tau[group].predict(X) return te @@ -543,9 +598,6 @@ def __init__( assert isinstance(random_state, int), "random_state should be int." # Store verbatim — no transformation, no XGBRegressor construction here. - # xgb_kwargs=None is stored as-is; BaseEstimator.get_params surfaces it - # correctly since it is a named parameter. The or {} coalesce happens in - # fit() so that clone(XGBRRegressor()) still round-trips None → None. self.early_stopping = early_stopping self.test_size = test_size self.early_stopping_rounds = early_stopping_rounds @@ -564,22 +616,37 @@ def __init__( ) def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True): - """Fit using early-stopping XGBoost R-learner.""" - X, treatment, y = convert_pd_to_np(X, treatment, y) + """Fit using early-stopping XGBoost R-learner. + + Args: + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in the + single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. + sample_weight (np.array, pd.Series, or pl.Series, optional): an array of sample weights indicating the + weight of each observation for `effect_learner`. If None, it assumes equal weight. + verbose (bool, optional): whether to output progress logs + """ + X = collect_if_lazy(X) check_treatment_vector(treatment, self.control_name) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + + # initialize equal sample weight if it's not provided, for simplicity purpose sample_weight = ( - convert_pd_to_np(sample_weight) - if sample_weight is not None - else convert_pd_to_np(np.ones(len(y))) + to_numpy(sample_weight) if sample_weight is not None else np.ones(len(y_np)) ) assert len(sample_weight) == len( - y + y_np ), "Data length must be equal for sample_weight and the input data" - self.t_groups = np.unique(treatment[treatment != self.control_name]) + self.t_groups = np.unique(treatment_np[treatment_np != self.control_name]) self.t_groups.sort() if p is None: - self._set_propensity_models(X=X, treatment=treatment, y=y) + self._set_propensity_models(X=X, treatment=treatment_np, y=y_np) p = self.propensity else: p = self._format_p(p, self.t_groups) @@ -623,15 +690,17 @@ def fit(self, X, treatment, y, p=None, sample_weight=None, verbose=True): if verbose: logger.info("generating out-of-fold CV outcome estimates") - yhat = cross_val_predict(self.model_mu, X, y, cv=self.cv, n_jobs=-1) + yhat = cross_val_predict(self.model_mu, X, y_np, cv=self.cv, n_jobs=-1) for group in self.t_groups: - treatment_mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[treatment_mask] - w = (treatment_filt == group).astype(int) + treatment_mask = (treatment_np == group) | ( + treatment_np == self.control_name + ) + treatment_filt = filter_mask(treatment, treatment_mask) + w = (to_numpy(treatment_filt) == group).astype(int) - X_filt = X[treatment_mask] - y_filt = y[treatment_mask] + X_filt = filter_mask(X, treatment_mask) + y_filt = y_np[treatment_mask] yhat_filt = yhat[treatment_mask] p_filt = p[group][treatment_mask] sample_weight_filt = sample_weight[treatment_mask] diff --git a/causalml/inference/meta/slearner.py b/causalml/inference/meta/slearner.py index ee599e7e..ce05ed9b 100644 --- a/causalml/inference/meta/slearner.py +++ b/causalml/inference/meta/slearner.py @@ -7,7 +7,15 @@ from copy import deepcopy from causalml.inference.meta.base import BaseLearner -from causalml.inference.meta.utils import check_treatment_vector, convert_pd_to_np +from causalml.inference.meta.utils import ( + check_treatment_vector, + collect_if_lazy, + concat_treatment_col, + filter_mask, + n_rows, + prepend_column, + to_numpy, +) from causalml.metrics import regression_metrics, classification_metrics logger = logging.getLogger("causalml") @@ -18,6 +26,7 @@ class StatsmodelsOLS: def __init__(self, cov_type="HC1", alpha=0.05): """Initialize a statsmodels' OLS wrapper class object. + Args: cov_type (str, optional): covariance estimator type. alpha (float, optional): the confidence level alpha. @@ -27,6 +36,7 @@ def __init__(self, cov_type="HC1", alpha=0.05): def fit(self, X, y): """Fit OLS. + Args: X (np.matrix): a feature matrix y (np.array): a label vector @@ -46,7 +56,9 @@ def predict(self, X): class BaseSLearner(BaseLearner): """A parent class for S-learner classes. + An S-learner estimates treatment effects with one machine learning model. + Details of S-learner are available at `Kunzel et al. (2018) `_. """ @@ -55,7 +67,7 @@ def __init__(self, learner=None, ate_alpha=0.05, control_name=0): Args: learner (optional): a model to estimate the treatment effect. - If None, a DummyRegressor is used. The argument is stored + If None, a DummyRegressor is used. The argument is stored verbatim so that ``get_params`` / ``clone`` work correctly (scikit-learn convention). ate_alpha (float, optional): the confidence level alpha of the ATE estimate @@ -70,13 +82,16 @@ def fit(self, X, treatment, y, p=None): """Fit the inference model. Args: - X (np.matrix, np.array, or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method; the + feature matrix is otherwise kept in its native format throughout. + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) check_treatment_vector(treatment, self.control_name) - self.t_groups = np.unique(treatment[treatment != self.control_name]) + treatment_np = to_numpy(treatment) + self.t_groups = np.unique(treatment_np[treatment_np != self.control_name]) self.t_groups.sort() self._classes = {group: i for i, group in enumerate(self.t_groups)} @@ -85,13 +100,13 @@ def fit(self, X, treatment, y, p=None): self.models = {group: deepcopy(_base_model) for group in self.t_groups} for group in self.t_groups: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - X_filt = X[mask] - y_filt = y[mask] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = filter_mask(treatment, mask) + X_filt = filter_mask(X, mask) + y_filt = filter_mask(y, mask) - w = (treatment_filt == group).astype(int) - X_new = np.hstack((w.reshape((-1, 1)), X_filt)) + w = (to_numpy(treatment_filt) == group).astype(int) + X_new = concat_treatment_col(w, X_filt) self.models[group].fit(X_new, y_filt) return self @@ -99,35 +114,38 @@ def predict( self, X, treatment=None, y=None, p=None, return_components=False, verbose=True ): """Predict treatment effects. + Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series, optional): a treatment vector - y (np.array or pd.Series, optional): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series, optional): a treatment vector + y (np.array, pd.Series, or pl.Series, optional): an outcome vector return_components (bool, optional): whether to return outcome for treatment and control seperately verbose (bool, optional): whether to output progress logs Returns: (numpy.ndarray): Predictions of treatment effects. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) yhat_cs = {} yhat_ts = {} - # Build the augmented arrays once; they are identical for every group. - # (Separate allocations avoid in-place mutation by learners like CatBoost - # that set the writeable flag to False on arrays passed to predict().) - X_new_c = np.hstack((np.zeros((X.shape[0], 1)), X)) - X_new_t = np.hstack((np.ones((X.shape[0], 1)), X)) - for group in self.t_groups: model = self.models[group] + + # Build separate frames for control and treatment to avoid in-place + # mutation, which fails when learners like CatBoost set the + # writeable flag to False on arrays passed to predict(). + X_new_c = prepend_column(0.0, X) + X_new_t = prepend_column(1.0, X) yhat_cs[group] = model.predict(X_new_c) yhat_ts[group] = model.predict(X_new_t) if (y is not None) and (treatment is not None) and verbose: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - w = (treatment_filt == group).astype(int) - y_filt = y[mask] + treatment_np = to_numpy(treatment) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + w = (treatment_filt_np == group).astype(int) + y_filt = to_numpy(filter_mask(y, mask)) yhat = np.zeros_like(y_filt, dtype=float) yhat[w == 0] = yhat_cs[group][mask][w == 0] @@ -136,7 +154,7 @@ def predict( logger.info("Error metrics for group {}".format(group)) regression_metrics(y_filt, yhat, w) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) for i, group in enumerate(self.t_groups): te[:, i] = yhat_ts[group] - yhat_cs[group] @@ -158,10 +176,11 @@ def fit_predict( verbose=True, ): """Fit the inference model of the S learner and predict treatment effects. + Args: - X (np.matrix, np.array, or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector return_ci (bool, optional): whether to return confidence intervals n_bootstraps (int, optional): number of bootstrap iterations bootstrap_size (int, optional): number of samples per bootstrap @@ -172,22 +191,26 @@ def fit_predict( If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment], UB [n_samples, n_treatment] """ + X = collect_if_lazy(X) self.fit(X, treatment, y) te = self.predict(X, treatment, y, return_components=return_components) if not return_ci: return te else: + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + t_groups_global = self.t_groups _classes_global = self._classes models_global = deepcopy(self.models) te_bootstraps = np.zeros( - shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps) + shape=(n_rows(X), self.t_groups.shape[0], n_bootstraps) ) logger.info("Bootstrap Confidence Intervals") for i in tqdm(range(n_bootstraps)): - te_b = self.bootstrap(X, treatment, y, size=bootstrap_size) + te_b = self.bootstrap(X, treatment_np, y_np, size=bootstrap_size) te_bootstraps[:, :, i] = te_b te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2) @@ -217,9 +240,9 @@ def estimate_ate( """Estimate the Average Treatment Effect (ATE). Args: - X (np.matrix, np.array, or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector return_ci (bool, optional): whether to return confidence intervals bootstrap_ci (bool): whether to return confidence intervals n_bootstraps (int): number of bootstrap iterations @@ -228,8 +251,7 @@ def estimate_ate( Returns: The mean and confidence interval (LB, UB) of the ATE estimate. """ - - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) if pretrain: te, yhat_cs, yhat_ts = self.predict(X, treatment, y, return_components=True) else: @@ -237,6 +259,9 @@ def estimate_ate( X, treatment, y, return_components=True ) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + ate = np.zeros(self.t_groups.shape[0]) ate_lb = np.zeros(self.t_groups.shape[0]) ate_ub = np.zeros(self.t_groups.shape[0]) @@ -244,9 +269,9 @@ def estimate_ate( for i, group in enumerate(self.t_groups): _ate = te[:, i].mean() - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - y_filt = y[mask] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = treatment_np[mask] + y_filt = y_np[mask] w = (treatment_filt == group).astype(int) prob_treatment = float(sum(w)) / w.shape[0] @@ -282,7 +307,7 @@ def estimate_ate( ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps)) for n in tqdm(range(n_bootstraps)): - ate_b = self.bootstrap(X, treatment, y, size=bootstrap_size) + ate_b = self.bootstrap(X, treatment_np, y_np, size=bootstrap_size) ate_bootstraps[:, n] = ate_b.mean(axis=0) ate_lower = np.percentile( @@ -301,14 +326,14 @@ def estimate_ate( class BaseSRegressor(BaseSLearner): - """ - A parent class for S-learner regressor classes. - """ + """A parent class for S-learner regressor classes.""" def __init__(self, learner=None, ate_alpha=0.05, control_name=0): """Initialize an S-learner regressor. + Args: learner (optional): a model to estimate the treatment effect + ate_alpha (float, optional): the confidence level alpha of the ATE estimate control_name (str or int, optional): name of control group """ super().__init__( @@ -317,15 +342,15 @@ def __init__(self, learner=None, ate_alpha=0.05, control_name=0): class BaseSClassifier(BaseSLearner): - """ - A parent class for S-learner classifier classes. - """ + """A parent class for S-learner classifier classes.""" def __init__(self, learner=None, ate_alpha=0.05, control_name=0): """Initialize an S-learner classifier. + Args: learner (optional): a model to estimate the treatment effect. Should have a predict_proba() method. + ate_alpha (float, optional): the confidence level alpha of the ATE estimate control_name (str or int, optional): name of control group """ super().__init__( @@ -336,33 +361,38 @@ def predict( self, X, treatment=None, y=None, p=None, return_components=False, verbose=True ): """Predict treatment effects. + Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series, optional): a treatment vector - y (np.array or pd.Series, optional): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series, optional): a treatment vector + y (np.array, pd.Series, or pl.Series, optional): an outcome vector return_components (bool, optional): whether to return outcome for treatment and control seperately verbose (bool, optional): whether to output progress logs Returns: (numpy.ndarray): Predictions of treatment effects. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) yhat_cs = {} yhat_ts = {} - # Build the augmented arrays once; they are identical for every group. - X_new_c = np.hstack((np.zeros((X.shape[0], 1)), X)) - X_new_t = np.hstack((np.ones((X.shape[0], 1)), X)) - for group in self.t_groups: model = self.models[group] + + # Build separate frames for control and treatment to avoid in-place + # mutation, which fails when learners like CatBoost set the + # writeable flag to False on arrays passed to predict(). + X_new_c = prepend_column(0.0, X) + X_new_t = prepend_column(1.0, X) yhat_cs[group] = model.predict_proba(X_new_c)[:, 1] yhat_ts[group] = model.predict_proba(X_new_t)[:, 1] if y is not None and (treatment is not None) and verbose: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - w = (treatment_filt == group).astype(int) - y_filt = y[mask] + treatment_np = to_numpy(treatment) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + w = (treatment_filt_np == group).astype(int) + y_filt = to_numpy(filter_mask(y, mask)) yhat = np.zeros_like(y_filt, dtype=float) yhat[w == 0] = yhat_cs[group][mask][w == 0] @@ -371,7 +401,7 @@ def predict( logger.info("Error metrics for group {}".format(group)) classification_metrics(y_filt, yhat, w) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) for i, group in enumerate(self.t_groups): te[:, i] = yhat_ts[group] - yhat_cs[group] @@ -384,6 +414,7 @@ def predict( class LRSRegressor(BaseSRegressor): def __init__(self, ate_alpha=0.05, control_name=0): """Initialize an S-learner with a linear regression model. + Args: ate_alpha (float, optional): the confidence level alpha of the ATE estimate control_name (str or int, optional): name of control group @@ -392,14 +423,16 @@ def __init__(self, ate_alpha=0.05, control_name=0): def estimate_ate(self, X, treatment, y, p=None, pretrain=False): """Estimate the Average Treatment Effect (ATE). + Args: - X (np.matrix, np.array, or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + pretrain (bool): whether a model has been fit, default False. Returns: The mean and confidence interval (LB, UB) of the ATE estimate. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) if not pretrain: self.fit(X, treatment, y) diff --git a/causalml/inference/meta/tlearner.py b/causalml/inference/meta/tlearner.py index 749ac127..d9d555da 100644 --- a/causalml/inference/meta/tlearner.py +++ b/causalml/inference/meta/tlearner.py @@ -15,7 +15,13 @@ from xgboost import XGBRegressor from causalml.inference.meta.base import BaseLearner -from causalml.inference.meta.utils import check_treatment_vector, convert_pd_to_np +from causalml.inference.meta.utils import ( + check_treatment_vector, + collect_if_lazy, + filter_mask, + n_rows, + to_numpy, +) from causalml.metrics import regression_metrics, classification_metrics logger = logging.getLogger("causalml") @@ -71,12 +77,14 @@ def fit( random_state=None, n_jobs=1, ): - """Fit the inference model + """Fit the inference model. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method; the + feature matrix is otherwise kept in its native format throughout. + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector p: unused, kept for API consistency store_bootstraps (bool, optional): if True, trains a bootstrap ensemble during fit and stores it in self.bootstrap_models_ for post-fit CI @@ -87,6 +95,7 @@ def fit( bootstrap_size (int, optional): number of samples per bootstrap. Default: 10000. random_state (int, optional): random seed for reproducible bootstrap sampling. """ + X = collect_if_lazy(X) if (self.learner is None) and ( (self.control_learner is None) or (self.treatment_learner is None) ): @@ -94,9 +103,11 @@ def fit( "Either `learner` or both `control_learner` and `treatment_learner` " "must be specified." ) - X, treatment, y = convert_pd_to_np(X, treatment, y) check_treatment_vector(treatment, self.control_name) - self.t_groups = np.unique(treatment[treatment != self.control_name]) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + + self.t_groups = np.unique(treatment_np[treatment_np != self.control_name]) self.t_groups.sort() self._classes = {group: i for i, group in enumerate(self.t_groups)} @@ -114,22 +125,25 @@ def fit( self.models_t = {group: deepcopy(_treatment_learner) for group in self.t_groups} - # model_c is trained on the control group, identical for every treatment group. - control_mask = treatment == self.control_name + # model_c is trained on the control group, which is identical for every + # treatment group, so fit it once. + control_mask = treatment_np == self.control_name self.model_c = deepcopy(_control_learner) - self.model_c.fit(X[control_mask], y[control_mask]) + self.model_c.fit(filter_mask(X, control_mask), y_np[control_mask]) # Expose as a shared-reference dict to preserve the public models_c API. self.models_c = {group: self.model_c for group in self.t_groups} for group in self.t_groups: - treatment_mask = treatment == group - self.models_t[group].fit(X[treatment_mask], y[treatment_mask]) + treatment_mask = treatment_np == group + self.models_t[group].fit( + filter_mask(X, treatment_mask), y_np[treatment_mask] + ) if store_bootstraps: self.fit_bootstrap_ensemble( X=X, - treatment=treatment, - y=y, + treatment=treatment_np, + y=y_np, n_bootstraps=n_bootstraps, bootstrap_size=bootstrap_size, random_state=random_state, @@ -143,7 +157,7 @@ def _compute_bootstrap_ci(self, X): """Compute bootstrap CI using stored ensemble. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix Returns: (te_lower, te_upper): percentile CI bounds, each of shape [n_samples, n_treatment] """ @@ -152,7 +166,7 @@ def _compute_bootstrap_ci(self, X): "No bootstrap ensemble found. Call fit(..., store_bootstraps=True) first." ) te_bootstraps = np.zeros( - (X.shape[0], self.t_groups.shape[0], len(self.bootstrap_models_)) + (n_rows(X), self.t_groups.shape[0], len(self.bootstrap_models_)) ) for b, learner_b in enumerate(self.bootstrap_models_): te_bootstraps[:, :, b] = learner_b.predict(X) @@ -173,11 +187,11 @@ def predict( """Predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series, optional): a treatment vector - y (np.array or pd.Series, optional): an outcome vector - return_components (bool, optional): whether to return outcome for - treatment and control separately + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series, optional): a treatment vector + y (np.array, pd.Series, or pl.Series, optional): an outcome vector + return_components (bool, optional): whether to return outcome for treatment and control seperately verbose (bool, optional): whether to output progress logs return_ci (bool, optional): whether to return confidence intervals using the stored bootstrap ensemble. Requires fit() to have been @@ -189,20 +203,22 @@ def predict( if return_ci and return_components: raise ValueError("return_ci and return_components cannot both be True.") - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) yhat_ts = {} yhat_c = self.model_c.predict(X) + # Shared-reference dict — no array duplication yhat_cs = {group: yhat_c for group in self.t_groups} for group in self.t_groups: yhat_ts[group] = self.models_t[group].predict(X) if (y is not None) and (treatment is not None) and verbose: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - y_filt = y[mask] - w = (treatment_filt == group).astype(int) + treatment_np = to_numpy(treatment) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + y_filt = to_numpy(filter_mask(y, mask)) + w = (treatment_filt_np == group).astype(int) yhat = np.zeros_like(y_filt, dtype=float) yhat[w == 0] = yhat_c[mask][w == 0] @@ -211,7 +227,7 @@ def predict( logger.info("Error metrics for group {}".format(group)) regression_metrics(y_filt, yhat, w) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) for i, group in enumerate(self.t_groups): te[:, i] = yhat_ts[group] - yhat_c @@ -239,9 +255,9 @@ def fit_predict( """Fit the inference model of the T learner and predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector return_ci (bool): whether to return confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap @@ -252,9 +268,12 @@ def fit_predict( If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment], UB [n_samples, n_treatment] """ - X, treatment, y = convert_pd_to_np(X, treatment, y) - self.fit(X, treatment, y) - te = self.predict(X, treatment, y, return_components=return_components) + X = collect_if_lazy(X) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + + self.fit(X, treatment_np, y_np) + te = self.predict(X, treatment_np, y_np, return_components=return_components) if not return_ci: return te @@ -264,12 +283,12 @@ def fit_predict( model_c_global = deepcopy(self.model_c) models_t_global = deepcopy(self.models_t) te_bootstraps = np.zeros( - shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps) + shape=(n_rows(X), self.t_groups.shape[0], n_bootstraps) ) logger.info("Bootstrap Confidence Intervals") for i in tqdm(range(n_bootstraps)): - te_b = self.bootstrap(X, treatment, y, size=bootstrap_size) + te_b = self.bootstrap(X, treatment_np, y_np, size=bootstrap_size) te_bootstraps[:, :, i] = te_b te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2) @@ -300,9 +319,9 @@ def estimate_ate( """Estimate the Average Treatment Effect (ATE). Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector bootstrap_ci (bool): whether to return confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap @@ -310,12 +329,17 @@ def estimate_ate( Returns: The mean and confidence interval (LB, UB) of the ATE estimate. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + if pretrain: - te, yhat_cs, yhat_ts = self.predict(X, treatment, y, return_components=True) + te, yhat_cs, yhat_ts = self.predict( + X, treatment_np, y_np, return_components=True + ) else: te, yhat_cs, yhat_ts = self.fit_predict( - X, treatment, y, return_components=True + X, treatment_np, y_np, return_components=True ) ate = np.zeros(self.t_groups.shape[0]) @@ -325,9 +349,9 @@ def estimate_ate( for i, group in enumerate(self.t_groups): _ate = te[:, i].mean() - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - y_filt = y[mask] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = treatment_np[mask] + y_filt = y_np[mask] w = (treatment_filt == group).astype(int) prob_treatment = float(sum(w)) / w.shape[0] @@ -362,7 +386,7 @@ def estimate_ate( ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps)) for n in tqdm(range(n_bootstraps)): - ate_b = self.bootstrap(X, treatment, y, size=bootstrap_size) + ate_b = self.bootstrap(X, treatment_np, y_np, size=bootstrap_size) ate_bootstraps[:, n] = ate_b.mean(axis=0) ate_lower = np.percentile( @@ -383,9 +407,7 @@ def estimate_ate( class BaseTRegressor(BaseTLearner): - """ - A parent class for T-learner regressor classes. - """ + """A parent class for T-learner regressor classes.""" def __init__( self, @@ -414,9 +436,7 @@ def __init__( class BaseTClassifier(BaseTLearner): - """ - A parent class for T-learner classifier classes. - """ + """A parent class for T-learner classifier classes.""" def __init__( self, @@ -456,10 +476,14 @@ def predict( """Predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series, optional): a treatment vector - y (np.array or pd.Series, optional): an outcome vector + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series, optional): a treatment vector + y (np.array, pd.Series, or pl.Series, optional): an outcome vector + return_components (bool, optional): whether to return outcome for treatment and control seperately verbose (bool, optional): whether to output progress logs + return_ci (bool, optional): whether to return confidence intervals using + the stored bootstrap ensemble. Returns: (numpy.ndarray): Predictions of treatment effects. """ @@ -468,6 +492,7 @@ def predict( if return_ci and return_components: raise ValueError("return_ci and return_components cannot both be True.") + X = collect_if_lazy(X) yhat_ts = {} yhat_c = self.model_c.predict_proba(X)[:, 1] @@ -477,10 +502,11 @@ def predict( yhat_ts[group] = self.models_t[group].predict_proba(X)[:, 1] if (y is not None) and (treatment is not None) and verbose: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - y_filt = y[mask] - w = (treatment_filt == group).astype(int) + treatment_np = to_numpy(treatment) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + y_filt = to_numpy(filter_mask(y, mask)) + w = (treatment_filt_np == group).astype(int) yhat = np.zeros_like(y_filt, dtype=float) yhat[w == 0] = yhat_c[mask][w == 0] @@ -489,7 +515,7 @@ def predict( logger.info("Error metrics for group {}".format(group)) classification_metrics(y_filt, yhat, w) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) for i, group in enumerate(self.t_groups): te[:, i] = yhat_ts[group] - yhat_c diff --git a/causalml/inference/meta/utils.py b/causalml/inference/meta/utils.py index 157eeaf6..91dff021 100644 --- a/causalml/inference/meta/utils.py +++ b/causalml/inference/meta/utils.py @@ -4,42 +4,295 @@ from packaging import version from xgboost import __version__ as xgboost_version +# --------------------------------------------------------------------------- +# Optional Polars import +# --------------------------------------------------------------------------- +try: + import polars as pl + + _POLARS_AVAILABLE = True +except ImportError: + pl = None + _POLARS_AVAILABLE = False + + +# --------------------------------------------------------------------------- +# Native DataFrame helpers +# +# Design contract (per maintainer review): +# - X (feature matrices) stay native end-to-end: numpy, pandas, or polars. +# pl.LazyFrame is collected ONCE at the top of each public method into a +# pl.DataFrame via `collect_if_lazy`. After that point, helpers only need +# to handle pl.DataFrame / pl.Series. +# - treatment / y / p / sample_weight (1-D vectors used for masking, +# np.unique, .astype, etc.) are normalized to numpy via `to_numpy` at the +# entry of each public method. +# - Never call to_numpy(X) merely to read a row count: use `n_rows(X)`, +# which works for numpy, pandas, and polars. +# --------------------------------------------------------------------------- + + +def collect_if_lazy(X): + """Collect a polars LazyFrame into a DataFrame; pass through otherwise. + + Call this once at the top of each public method on the feature matrix X + so that downstream helpers (filter_mask, filter_index, prepend_column, + concat_treatment_col, n_rows) never need to handle pl.LazyFrame. + + Args: + X: numpy array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame. + + Returns: + X unchanged, unless it was a pl.LazyFrame (then its .collect()). + """ + if _POLARS_AVAILABLE and isinstance(X, pl.LazyFrame): + return X.collect() + return X -def convert_pd_to_np(*args): - output = [obj.to_numpy() if hasattr(obj, "to_numpy") else obj for obj in args] - return output if len(output) > 1 else output[0] + +def n_rows(X): + """Return the number of rows of X without converting to numpy. + + Works for numpy arrays, pandas DataFrames/Series, and polars + DataFrames/Series (and LazyFrames, which are collected first). + + Args: + X: numpy array, pd.DataFrame, pd.Series, pl.DataFrame, pl.Series, + or pl.LazyFrame. + + Returns: + int: number of rows. + """ + X = collect_if_lazy(X) + if hasattr(X, "shape"): + return X.shape[0] + return len(X) + + +def filter_mask(obj, mask): + """Filter rows by a boolean mask. + + Works natively for numpy arrays, pandas DataFrames/Series, and + polars DataFrames/Series/LazyFrames without materialising unnecessary + copies. + + Args: + obj: numpy array, pd.DataFrame, pd.Series, pl.DataFrame, pl.Series, + or pl.LazyFrame. + mask: boolean array or Series of the same length as obj. + + Returns: + Filtered object of the same type as input (a pl.LazyFrame input is + returned as a pl.DataFrame, since filtering requires collection). + """ + if obj is None: + return None + + obj = collect_if_lazy(obj) + + if isinstance(obj, (pd.DataFrame, pd.Series)): + if isinstance(mask, pd.Series): + return obj.loc[mask] + return obj.loc[np.asarray(mask, dtype=bool)] + + if _POLARS_AVAILABLE and isinstance(obj, (pl.DataFrame, pl.Series)): + if isinstance(mask, pl.Series): + return obj.filter(mask) + return obj.filter(pl.Series(np.asarray(mask, dtype=bool))) + + # numpy / anything else + return obj[np.asarray(mask, dtype=bool)] + + +def filter_index(obj, indices): + """Filter rows by integer positional indices. + + Used for KFold partition slicing (e.g. in DR-learner) and bootstrap + resampling. + + Args: + obj: numpy array, pd.DataFrame, pd.Series, pl.DataFrame, pl.Series, + or pl.LazyFrame. + indices: integer array of row positions. + + Returns: + Filtered object of the same type as input (a pl.LazyFrame input is + returned as a pl.DataFrame). + """ + if obj is None: + return None + + obj = collect_if_lazy(obj) + + if isinstance(obj, (pd.DataFrame, pd.Series)): + return obj.iloc[indices] + + if _POLARS_AVAILABLE and isinstance(obj, (pl.DataFrame, pl.Series)): + return obj[indices] + + return obj[indices] # numpy + + +def prepend_column(value, X): + """Prepend a constant-value column to a feature matrix X. + + Used by the S-learner to prepend the treatment indicator before + calling predict() for the control (value=0.0) and treatment (value=1.0) + counterfactuals. + + Args: + value (float): scalar value to fill the new column with (0.0 or 1.0). + X: numpy array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame. + + Returns: + Feature matrix with the new column prepended, same type as X (a + pl.LazyFrame input is returned as a pl.DataFrame). + """ + X = collect_if_lazy(X) + n = n_rows(X) + + if isinstance(X, pd.DataFrame): + # Reset to range-index column names to avoid type clashes, then restore. + # We prepend a treatment column then rename all columns to 0..n so + # downstream sklearn sees consistent integer names. + arr = np.hstack((np.full((n, 1), value), X.to_numpy())) + return pd.DataFrame(arr, index=X.index) + + if _POLARS_AVAILABLE and isinstance(X, pl.DataFrame): + col = pl.Series("_w", np.full(n, value)) + return X.with_columns(col).select(["_w"] + X.columns) + + # numpy + return np.hstack((np.full((n, 1), value), X)) + + +def concat_treatment_col(w, X): + """Prepend an array-valued treatment column *w* to feature matrix X. + + Unlike :func:`prepend_column`, this takes an existing 1-D array rather + than a scalar. Used by S-learner ``fit()`` to build the augmented + matrix [w | X]. + + Args: + w: 1-D numpy array of treatment indicators (0/1). + X: numpy array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame. + + Returns: + Augmented feature matrix with w prepended, same type as X (a + pl.LazyFrame input is returned as a pl.DataFrame). + """ + X = collect_if_lazy(X) + + if isinstance(X, pd.DataFrame): + arr = np.hstack((np.asarray(w).reshape(-1, 1), X.to_numpy())) + return pd.DataFrame(arr, index=X.index) + + if _POLARS_AVAILABLE and isinstance(X, pl.DataFrame): + col = pl.Series("_w", np.asarray(w)) + return X.with_columns(col).select(["_w"] + X.columns) + + # numpy + return np.hstack((np.asarray(w).reshape(-1, 1), X)) + + +def to_numpy(obj): + """Convert a pandas or polars 1-D vector (or feature matrix) to NumPy. + + Per the native-X contract, this should be called on ``treatment``, + ``y``, ``p``, and ``sample_weight`` at the entry of each public method. + It should NOT be called on the feature matrix X except at the small + number of boundaries that strictly require numpy (documented inline + where used). + + Args: + obj: numpy array, pd.DataFrame, pd.Series, pl.DataFrame, + pl.LazyFrame, or pl.Series. ``None`` is passed through. + + Returns: + numpy.ndarray or None. + """ + if obj is None: + return None + + obj = collect_if_lazy(obj) + + if _POLARS_AVAILABLE and isinstance(obj, (pl.DataFrame, pl.Series)): + arr = obj.to_numpy() + if isinstance(obj, pl.DataFrame) and arr.ndim == 2 and arr.shape[1] == 1: + return arr.ravel() + return arr + + if hasattr(obj, "to_numpy"): + return obj.to_numpy() + + return np.asarray(obj) + + +# --------------------------------------------------------------------------- +# Validation helpers +# --------------------------------------------------------------------------- def check_treatment_vector(treatment, control_name=None): - n_unique_treatments = np.unique(treatment).shape[0] + """Assert that *treatment* has at least two unique levels. + + Args: + treatment (np.array, pd.Series, or pl.Series): a treatment vector. + control_name (str or int, optional): name of the control group. + """ + t = to_numpy(treatment) + n_unique_treatments = np.unique(t).shape[0] assert n_unique_treatments > 1, "Treatment vector must have at least two levels." if control_name is not None: assert ( - control_name in treatment + control_name in t ), "Control group level {} not found in treatment vector.".format(control_name) def check_p_conditions(p, t_groups): + """Validate that propensity scores p lie strictly within (0, 1). + + Args: + p (np.ndarray, pd.Series, pl.Series, or dict): propensity scores. + t_groups (np.ndarray): unique non-control treatment group labels. + """ eps = np.finfo(float).eps + + _allowed = [np.ndarray, pd.Series] + if _POLARS_AVAILABLE: + _allowed.append(pl.Series) + _allowed_tuple = tuple(_allowed) + assert isinstance( - p, (np.ndarray, pd.Series, dict) - ), "p must be an np.ndarray, pd.Series, or dict type" - if isinstance(p, (np.ndarray, pd.Series)): + p, (*_allowed_tuple, dict) + ), "p must be an np.ndarray, pd.Series, pl.Series (if polars is installed), or dict type" + + if isinstance(p, _allowed_tuple): + p_np = to_numpy(p) assert ( t_groups.shape[0] == 1 - ), "If p is passed as an np.ndarray, there must be only 1 unique non-control group in the treatment vector." - assert (0 + eps < p).all() and ( - p < 1 - eps + ), "If p is passed as an array/Series, there must be only 1 unique non-control group in the treatment vector." + assert (0 + eps < p_np).all() and ( + p_np < 1 - eps ).all(), "The values of p should lie within the (0, 1) interval." if isinstance(p, dict): for t_name in t_groups: - assert (0 + eps < p[t_name]).all() and ( - p[t_name] < 1 - eps + p_np = to_numpy(p[t_name]) + assert (0 + eps < p_np).all() and ( + p_np < 1 - eps ).all(), "The values of p should lie within the (0, 1) interval." def check_explain_conditions(method, models, X=None, treatment=None, y=None): + """Validate inputs for explain_validation-style feature-importance methods. + + Args: + method (str): one of "gini", "permutation", "shapley". + models (list): models that need a ``.feature_importances_`` attribute + for "gini"/"shapley". + X, treatment, y: required (non-None) for "permutation"/"shapley". + """ valid_methods = ["gini", "permutation", "shapley"] assert method in valid_methods, "Current supported methods: {}".format( ", ".join(valid_methods) @@ -59,6 +312,11 @@ def check_explain_conditions(method, models, X=None, treatment=None, y=None): ), "X, treatment, and y must be provided if method = {}".format(method) +# --------------------------------------------------------------------------- +# XGBoost objective helpers (unchanged) +# --------------------------------------------------------------------------- + + def clean_xgboost_objective(objective): """ Translate objective to be compatible with loaded xgboost version @@ -134,3 +392,21 @@ def get_weighted_variance(x, sample_weight): average = np.average(x, weights=sample_weight) variance = np.average((x - average) ** 2, weights=sample_weight) return variance + + +# --------------------------------------------------------------------------- +# Backward-compatibility alias +# --------------------------------------------------------------------------- + + +def convert_pd_to_np(*args): + """Deprecated alias for :func:`to_numpy`, kept for backward compatibility + with modules (e.g. explainer.py) that have not yet migrated to the + native-DataFrame contract. + + .. deprecated:: + Use :func:`to_numpy` for 1-D vectors. Do not use on feature matrices + X under the native-X contract. + """ + output = [to_numpy(obj) for obj in args] + return output if len(output) > 1 else output[0] diff --git a/causalml/inference/meta/xlearner.py b/causalml/inference/meta/xlearner.py index 3bb64cca..536b8425 100644 --- a/causalml/inference/meta/xlearner.py +++ b/causalml/inference/meta/xlearner.py @@ -7,7 +7,10 @@ from causalml.inference.meta.base import BaseLearner from causalml.inference.meta.utils import ( check_treatment_vector, - convert_pd_to_np, + collect_if_lazy, + filter_mask, + n_rows, + to_numpy, ) from causalml.metrics import regression_metrics, classification_metrics @@ -45,9 +48,9 @@ def __init__( control_name (str or int, optional): name of control group Note: arguments are stored verbatim (scikit-learn convention) so that - ``get_params`` / ``clone`` work correctly. Model construction is deferred to ``fit()``. - Per the scikit-learn convention, ``__init__`` does not validate or raise — - validation happens in ``fit()``. + ``get_params`` / ``clone`` work correctly. Model construction is deferred + to ``fit()``. Per the scikit-learn convention, ``__init__`` does not + validate or raise — validation happens in ``fit()``. """ # Store verbatim — no deepcopy, no logic (scikit-learn convention). self.learner = learner @@ -58,21 +61,24 @@ def __init__( self.ate_alpha = ate_alpha self.control_name = control_name # Sentinel so estimate_ate(pretrain=True) raises a clean ValueError - # ("no propensity score, please call fit() first") instead of AttributeError - # when called before fit(). + # ("no propensity score, please call fit() first") instead of + # AttributeError when called before fit(). self.propensity = {} def fit(self, X, treatment, y, p=None): """Fit the inference model. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector - p (np.ndarray or pd.Series or dict, optional): an array of propensity scores of float (0,1) in the - single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method; the + feature matrix is otherwise kept in its native format throughout. + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in + the single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. """ + X = collect_if_lazy(X) if (self.learner is None) and ( (self.control_outcome_learner is None) or (self.treatment_outcome_learner is None) @@ -84,20 +90,20 @@ def fit(self, X, treatment, y, p=None): "`treatment_outcome_learner`, `control_effect_learner`, and " "`treatment_effect_learner` must be specified." ) - X, treatment, y = convert_pd_to_np(X, treatment, y) check_treatment_vector(treatment, self.control_name) - self.t_groups = np.unique(treatment[treatment != self.control_name]) + treatment_np = to_numpy(treatment) + self.t_groups = np.unique(treatment_np[treatment_np != self.control_name]) self.t_groups.sort() if p is None: - self._set_propensity_models(X=X, treatment=treatment, y=y) + self._set_propensity_models(X=X, treatment=treatment_np, y=to_numpy(y)) p = self.propensity else: p = self._format_p(p, self.t_groups) self._classes = {group: i for i, group in enumerate(self.t_groups)} - # Resolve base models from stored constructor args (no templates needed). + # Resolve base models from stored constructor args (scikit-learn convention). _control_outcome_learner = ( self.control_outcome_learner if self.control_outcome_learner is not None @@ -128,34 +134,44 @@ def fit(self, X, treatment, y, p=None): self.models_tau_t = { group: deepcopy(_treatment_effect_learner) for group in self.t_groups } - self.vars_c = {} self.vars_t = {} - # model_mu_c is trained on control data, identical for every treatment group. - control_mask = treatment == self.control_name + # model_mu_c is trained on control only (identical across groups) — fit once. + control_mask = treatment_np == self.control_name + X_control = filter_mask(X, control_mask) + y_control = to_numpy(filter_mask(y, control_mask)) self.model_mu_c = deepcopy(_control_outcome_learner) - self.model_mu_c.fit(X[control_mask], y[control_mask]) + self.model_mu_c.fit(X_control, y_control) self.models_mu_c = {group: self.model_mu_c for group in self.t_groups} - - y_control_pred = self.model_mu_c.predict(X[control_mask]) - self.var_c = (y[control_mask] - y_control_pred).var() + # var_c is a single scalar since the control model is shared across groups + self.var_c = (y_control - self.model_mu_c.predict(X_control)).var() self.vars_c = {group: self.var_c for group in self.t_groups} for group in self.t_groups: - treatment_mask = treatment == group - X_treat = X[treatment_mask] - y_treat = y[treatment_mask] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = filter_mask(treatment, mask) + X_filt = filter_mask(X, mask) + y_filt = filter_mask(y, mask) + w = (to_numpy(treatment_filt) == group).astype(int) + + X_filt_c = filter_mask(X_filt, w == 0) + X_filt_t = filter_mask(X_filt, w == 1) + y_filt_np = to_numpy(y_filt) - self.models_mu_t[group].fit(X_treat, y_treat) + # Train treatment outcome model + self.models_mu_t[group].fit(X_filt_t, filter_mask(y_filt, w == 1)) - self.vars_t[group] = ( - y_treat - self.models_mu_t[group].predict(X_treat) + var_t = ( + y_filt_np[w == 1] - self.models_mu_t[group].predict(X_filt_t) ).var() + self.vars_t[group] = var_t + + # Train treatment effect models + d_c = self.models_mu_t[group].predict(X_filt_c) - y_filt_np[w == 0] + d_t = y_filt_np[w == 1] - self.model_mu_c.predict(X_filt_t) + self.models_tau_c[group].fit(X_filt_c, d_c) + self.models_tau_t[group].fit(X_filt_t, d_t) - d_c = self.models_mu_t[group].predict(X[control_mask]) - y[control_mask] - d_t = y_treat - self.model_mu_c.predict(X_treat) - self.models_tau_c[group].fit(X[control_mask], d_c) - self.models_tau_t[group].fit(X_treat, d_t) return self def predict( @@ -164,56 +180,57 @@ def predict( """Predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series, optional): a treatment vector - y (np.array or pd.Series, optional): an outcome vector - p (np.ndarray or pd.Series or dict, optional): propensity scores + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series, optional): a treatment vector + y (np.array, pd.Series, or pl.Series, optional): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in + the single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. return_components (bool, optional): whether to return outcome for treatment and control seperately verbose (bool, optional): whether to output progress logs Returns: (numpy.ndarray): Predictions of treatment effects. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) if p is None: logger.info("Generating propensity score") - p = dict() - for group in self.t_groups: - p_model = self.propensity_model[group] - p[group] = p_model.predict(X) + p = { + group: self.propensity_model[group].predict(X) + for group in self.t_groups + } else: p = self._format_p(p, self.t_groups) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) dhat_cs = {} dhat_ts = {} - yhat_c_verbose = None - if (y is not None) and (treatment is not None) and verbose: - control_mask = treatment == self.control_name - yhat_c_verbose = self.model_mu_c.predict(X[control_mask]) - for i, group in enumerate(self.t_groups): - model_tau_c = self.models_tau_c[group] - model_tau_t = self.models_tau_t[group] - dhat_cs[group] = model_tau_c.predict(X) - dhat_ts[group] = model_tau_t.predict(X) + dhat_cs[group] = self.models_tau_c[group].predict(X) + dhat_ts[group] = self.models_tau_t[group].predict(X) _te = (p[group] * dhat_cs[group] + (1 - p[group]) * dhat_ts[group]).reshape( -1, 1 ) te[:, i] = np.ravel(_te) - if yhat_c_verbose is not None: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - X_filt = X[mask] - y_filt = y[mask] - w = (treatment_filt == group).astype(int) + if (y is not None) and (treatment is not None) and verbose: + treatment_np = to_numpy(treatment) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + X_filt = filter_mask(X, mask) + y_filt = to_numpy(filter_mask(y, mask)) + w = (treatment_filt_np == group).astype(int) yhat = np.zeros_like(y_filt, dtype=float) - yhat[w == 0] = yhat_c_verbose - yhat[w == 1] = self.models_mu_t[group].predict(X_filt[w == 1]) + yhat[w == 0] = self.models_mu_c[group].predict( + filter_mask(X_filt, w == 0) + ) + yhat[w == 1] = self.models_mu_t[group].predict( + filter_mask(X_filt, w == 1) + ) logger.info("Error metrics for group {}".format(group)) regression_metrics(y_filt, yhat, w) @@ -238,10 +255,12 @@ def fit_predict( """Fit the X-learner and predict treatment effects. Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector - p (np.ndarray or pd.Series or dict, optional): propensity scores + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in + the single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. return_ci (bool): whether to return confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap @@ -250,7 +269,7 @@ def fit_predict( Returns: (numpy.ndarray): Predictions of treatment effects. """ - X, treatment, y = convert_pd_to_np(X, treatment, y) + X = collect_if_lazy(X) self.fit(X, treatment, y, p) if p is None: @@ -265,19 +284,22 @@ def fit_predict( if not return_ci: return te else: + treatment_np = to_numpy(treatment) + y_np = to_numpy(y) + t_groups_global = self.t_groups _classes_global = self._classes - model_mu_c_global = deepcopy(self.model_mu_c) + models_mu_c_global = deepcopy(self.models_mu_c) models_mu_t_global = deepcopy(self.models_mu_t) models_tau_c_global = deepcopy(self.models_tau_c) models_tau_t_global = deepcopy(self.models_tau_t) te_bootstraps = np.zeros( - shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps) + shape=(n_rows(X), self.t_groups.shape[0], n_bootstraps) ) logger.info("Bootstrap Confidence Intervals") for i in tqdm(range(n_bootstraps)): - te_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size) + te_b = self.bootstrap(X, treatment_np, y_np, p, size=bootstrap_size) te_bootstraps[:, :, i] = te_b te_lower = np.percentile(te_bootstraps, (self.ate_alpha / 2) * 100, axis=2) @@ -287,8 +309,7 @@ def fit_predict( self.t_groups = t_groups_global self._classes = _classes_global - self.model_mu_c = deepcopy(model_mu_c_global) - self.models_mu_c = {group: self.model_mu_c for group in self.t_groups} + self.models_mu_c = deepcopy(models_mu_c_global) self.models_mu_t = deepcopy(models_mu_t_global) self.models_tau_c = deepcopy(models_tau_c_global) self.models_tau_t = deepcopy(models_tau_t_global) @@ -309,10 +330,12 @@ def estimate_ate( """Estimate the Average Treatment Effect (ATE). Args: - X (np.matrix or np.array or pd.Dataframe): a feature matrix - treatment (np.array or pd.Series): a treatment vector - y (np.array or pd.Series): an outcome vector - p (np.ndarray or pd.Series or dict, optional): propensity scores + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in + the single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. bootstrap_ci (bool): whether run bootstrap for confidence intervals n_bootstraps (int): number of bootstrap iterations bootstrap_size (int): number of samples per bootstrap @@ -320,6 +343,8 @@ def estimate_ate( Returns: The mean and confidence interval (LB, UB) of the ATE estimate. """ + X = collect_if_lazy(X) + if pretrain: if p is None: if not self.propensity: @@ -336,7 +361,8 @@ def estimate_ate( te, dhat_cs, dhat_ts = self.fit_predict( X, treatment, y, p, return_components=True ) - X, treatment, y = convert_pd_to_np(X, treatment, y) + + treatment_np = to_numpy(treatment) if p is None: p = self.propensity @@ -350,8 +376,8 @@ def estimate_ate( for i, group in enumerate(self.t_groups): _ate = te[:, i].mean() - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = treatment_np[mask] w = (treatment_filt == group).astype(int) prob_treatment = float(sum(w)) / w.shape[0] @@ -362,7 +388,7 @@ def estimate_ate( se = np.sqrt( ( self.vars_t[group] / prob_treatment - + self.var_c / (1 - prob_treatment) + + self.vars_c[group] / (1 - prob_treatment) + (p_filt * dhat_c + (1 - p_filt) * dhat_t).var() ) / w.shape[0] @@ -378,9 +404,10 @@ def estimate_ate( if not bootstrap_ci: return ate, ate_lb, ate_ub else: + y_np = to_numpy(y) t_groups_global = self.t_groups _classes_global = self._classes - model_mu_c_global = deepcopy(self.model_mu_c) + models_mu_c_global = deepcopy(self.models_mu_c) models_mu_t_global = deepcopy(self.models_mu_t) models_tau_c_global = deepcopy(self.models_tau_c) models_tau_t_global = deepcopy(self.models_tau_t) @@ -389,7 +416,7 @@ def estimate_ate( ate_bootstraps = np.zeros(shape=(self.t_groups.shape[0], n_bootstraps)) for n in tqdm(range(n_bootstraps)): - cate_b = self.bootstrap(X, treatment, y, p, size=bootstrap_size) + cate_b = self.bootstrap(X, treatment_np, y_np, p, size=bootstrap_size) ate_bootstraps[:, n] = cate_b.mean(axis=0) ate_lower = np.percentile( @@ -401,8 +428,7 @@ def estimate_ate( self.t_groups = t_groups_global self._classes = _classes_global - self.model_mu_c = deepcopy(model_mu_c_global) - self.models_mu_c = {group: self.model_mu_c for group in self.t_groups} + self.models_mu_c = deepcopy(models_mu_c_global) self.models_mu_t = deepcopy(models_mu_t_global) self.models_tau_c = deepcopy(models_tau_c_global) self.models_tau_t = deepcopy(models_tau_t_global) @@ -410,9 +436,7 @@ def estimate_ate( class BaseXRegressor(BaseXLearner): - """ - A parent class for X-learner regressor classes. - """ + """A parent class for X-learner regressor classes.""" def __init__( self, @@ -436,9 +460,7 @@ def __init__( class BaseXClassifier(BaseXLearner): - """ - A parent class for X-learner classifier classes. - """ + """A parent class for X-learner classifier classes.""" def __init__( self, @@ -476,29 +498,58 @@ def __init__( self.propensity = {} def fit(self, X, treatment, y, p=None): - """Fit the inference model (classifier variant — uses predict_proba).""" - # Resolve and validate here (not in __init__) — sklearn convention. - _control_outcome_learner = self.control_outcome_learner or self.outcome_learner - _treatment_outcome_learner = ( - self.treatment_outcome_learner or self.outcome_learner - ) - _control_effect_learner = self.control_effect_learner or self.effect_learner - _treatment_effect_learner = self.treatment_effect_learner or self.effect_learner + """Fit the inference model. - if ( - _control_outcome_learner is None or _treatment_outcome_learner is None - ) and (_control_effect_learner is None or _treatment_effect_learner is None): + Args: + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method; the + feature matrix is otherwise kept in its native format throughout. + treatment (np.array, pd.Series, or pl.Series): a treatment vector + y (np.array, pd.Series, or pl.Series): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in + the single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. + """ + X = collect_if_lazy(X) + if (self.outcome_learner is None) and ( + (self.control_outcome_learner is None) + or (self.treatment_outcome_learner is None) + or (self.control_effect_learner is None) + or (self.treatment_effect_learner is None) + ): raise ValueError( - "Either the outcome learner or the effect learner pair must be specified." + "Either `outcome_learner` and `effect_learner`, or all four " + "specialized learners must be specified." ) - X, treatment, y = convert_pd_to_np(X, treatment, y) + _control_outcome_learner = ( + deepcopy(self.outcome_learner) + if self.control_outcome_learner is None + else deepcopy(self.control_outcome_learner) + ) + _treatment_outcome_learner = ( + deepcopy(self.outcome_learner) + if self.treatment_outcome_learner is None + else deepcopy(self.treatment_outcome_learner) + ) + _control_effect_learner = ( + deepcopy(self.effect_learner) + if self.control_effect_learner is None + else deepcopy(self.control_effect_learner) + ) + _treatment_effect_learner = ( + deepcopy(self.effect_learner) + if self.treatment_effect_learner is None + else deepcopy(self.treatment_effect_learner) + ) + check_treatment_vector(treatment, self.control_name) - self.t_groups = np.unique(treatment[treatment != self.control_name]) + treatment_np = to_numpy(treatment) + self.t_groups = np.unique(treatment_np[treatment_np != self.control_name]) self.t_groups.sort() if p is None: - self._set_propensity_models(X=X, treatment=treatment, y=y) + self._set_propensity_models(X=X, treatment=treatment_np, y=to_numpy(y)) p = self.propensity else: p = self._format_p(p, self.t_groups) @@ -514,85 +565,109 @@ def fit(self, X, treatment, y, p=None): self.models_tau_t = { group: deepcopy(_treatment_effect_learner) for group in self.t_groups } - self.vars_c = {} self.vars_t = {} - control_mask = treatment == self.control_name + # model_mu_c is trained on control only (identical across groups) — fit once. + control_mask = treatment_np == self.control_name + X_control = filter_mask(X, control_mask) + y_control = to_numpy(filter_mask(y, control_mask)) self.model_mu_c = deepcopy(_control_outcome_learner) - self.model_mu_c.fit(X[control_mask], y[control_mask]) + self.model_mu_c.fit(X_control, y_control) self.models_mu_c = {group: self.model_mu_c for group in self.t_groups} - - y_control_pred = self.model_mu_c.predict_proba(X[control_mask])[:, 1] - self.var_c = (y[control_mask] - y_control_pred).var() + self.var_c = (y_control - self.model_mu_c.predict_proba(X_control)[:, 1]).var() self.vars_c = {group: self.var_c for group in self.t_groups} for group in self.t_groups: - treatment_mask = treatment == group - X_treat = X[treatment_mask] - y_treat = y[treatment_mask] - - self.models_mu_t[group].fit(X_treat, y_treat) - - self.vars_t[group] = ( - y_treat - self.models_mu_t[group].predict_proba(X_treat)[:, 1] + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt = filter_mask(treatment, mask) + X_filt = filter_mask(X, mask) + y_filt = filter_mask(y, mask) + w = (to_numpy(treatment_filt) == group).astype(int) + + X_filt_c = filter_mask(X_filt, w == 0) + X_filt_t = filter_mask(X_filt, w == 1) + y_filt_np = to_numpy(y_filt) + + # Train treatment outcome model + self.models_mu_t[group].fit(X_filt_t, filter_mask(y_filt, w == 1)) + + var_t = ( + y_filt_np[w == 1] + - self.models_mu_t[group].predict_proba(X_filt_t)[:, 1] ).var() + self.vars_t[group] = var_t + # Train treatment effect models d_c = ( - self.models_mu_t[group].predict_proba(X[control_mask])[:, 1] - - y[control_mask] + self.models_mu_t[group].predict_proba(X_filt_c)[:, 1] + - y_filt_np[w == 0] + ) + d_t = ( + y_filt_np[w == 1] + - self.models_mu_c[group].predict_proba(X_filt_t)[:, 1] ) - d_t = y_treat - self.model_mu_c.predict_proba(X_treat)[:, 1] - self.models_tau_c[group].fit(X[control_mask], d_c) - self.models_tau_t[group].fit(X_treat, d_t) + self.models_tau_c[group].fit(X_filt_c, d_c) + self.models_tau_t[group].fit(X_filt_t, d_t) + return self def predict( self, X, treatment=None, y=None, p=None, return_components=False, verbose=True ): - """Predict treatment effects (classifier variant — uses predict_proba).""" - X, treatment, y = convert_pd_to_np(X, treatment, y) + """Predict treatment effects (classifier variant — uses predict_proba). + + Args: + X (np.matrix, np.array, pd.DataFrame, pl.DataFrame, or pl.LazyFrame): a feature matrix. + A pl.LazyFrame is collected once at the start of this method. + treatment (np.array, pd.Series, or pl.Series, optional): a treatment vector + y (np.array, pd.Series, or pl.Series, optional): an outcome vector + p (np.ndarray, pd.Series, pl.Series, or dict, optional): an array of propensity scores of float (0,1) in + the single-treatment case; or, a dictionary of treatment groups that map to propensity vectors of + float (0,1); if None will run ElasticNetPropensityModel() to generate the propensity scores. + return_components (bool, optional): whether to return outcome for treatment and control seperately + verbose (bool, optional): whether to output progress logs + Returns: + (numpy.ndarray): Predictions of treatment effects. + """ + X = collect_if_lazy(X) if p is None: logger.info("Generating propensity score") - p = dict() - for group in self.t_groups: - p_model = self.propensity_model[group] - p[group] = p_model.predict(X) + p = { + group: self.propensity_model[group].predict(X) + for group in self.t_groups + } else: p = self._format_p(p, self.t_groups) - te = np.zeros((X.shape[0], self.t_groups.shape[0])) + te = np.zeros((n_rows(X), self.t_groups.shape[0])) dhat_cs = {} dhat_ts = {} - yhat_c_verbose = None - if (y is not None) and (treatment is not None) and verbose: - control_mask = treatment == self.control_name - yhat_c_verbose = self.model_mu_c.predict_proba(X[control_mask])[:, 1] - for i, group in enumerate(self.t_groups): - model_tau_c = self.models_tau_c[group] - model_tau_t = self.models_tau_t[group] - dhat_cs[group] = model_tau_c.predict(X) - dhat_ts[group] = model_tau_t.predict(X) + dhat_cs[group] = self.models_tau_c[group].predict(X) + dhat_ts[group] = self.models_tau_t[group].predict(X) _te = (p[group] * dhat_cs[group] + (1 - p[group]) * dhat_ts[group]).reshape( -1, 1 ) te[:, i] = np.ravel(_te) - if yhat_c_verbose is not None: - mask = (treatment == group) | (treatment == self.control_name) - treatment_filt = treatment[mask] - X_filt = X[mask] - y_filt = y[mask] - w = (treatment_filt == group).astype(int) + if (y is not None) and (treatment is not None) and verbose: + treatment_np = to_numpy(treatment) + mask = (treatment_np == group) | (treatment_np == self.control_name) + treatment_filt_np = treatment_np[mask] + X_filt = filter_mask(X, mask) + y_filt = to_numpy(filter_mask(y, mask)) + w = (treatment_filt_np == group).astype(int) yhat = np.zeros_like(y_filt, dtype=float) - yhat[w == 0] = yhat_c_verbose - yhat[w == 1] = self.models_mu_t[group].predict_proba(X_filt[w == 1])[ - :, 1 - ] + yhat[w == 0] = self.models_mu_c[group].predict_proba( + filter_mask(X_filt, w == 0) + )[:, 1] + yhat[w == 1] = self.models_mu_t[group].predict_proba( + filter_mask(X_filt, w == 1) + )[:, 1] logger.info("Error metrics for group {}".format(group)) classification_metrics(y_filt, yhat, w) diff --git a/causalml/propensity.py b/causalml/propensity.py index 4e12dcb9..fc0e0640 100644 --- a/causalml/propensity.py +++ b/causalml/propensity.py @@ -38,8 +38,10 @@ def fit(self, X, y): Fit a propensity model. Args: - X (numpy.ndarray): a feature matrix - y (numpy.ndarray): a binary target vector + X (numpy.ndarray, pd.DataFrame, or pl.DataFrame): a feature matrix. + scikit-learn >= 1.6 accepts pandas and Polars DataFrames + natively, so no conversion is performed here. + y (numpy.ndarray, pd.Series, or pl.Series): a binary target vector """ self.model.fit(X, y) if self.calibrate: @@ -57,7 +59,7 @@ def predict(self, X): Predict propensity scores. Args: - X (numpy.ndarray): a feature matrix + X (numpy.ndarray, pd.DataFrame, or pl.DataFrame): a feature matrix Returns: (numpy.ndarray): Propensity scores between 0 and 1. @@ -73,8 +75,8 @@ def fit_predict(self, X, y): Fit a propensity model and predict propensity scores. Args: - X (numpy.ndarray): a feature matrix - y (numpy.ndarray): a binary target vector + X (numpy.ndarray, pd.DataFrame, or pl.DataFrame): a feature matrix + y (numpy.ndarray, pd.Series, or pl.Series): a binary target vector Returns: (numpy.ndarray): Propensity scores between 0 and 1. @@ -159,10 +161,9 @@ def fit(self, X, y, stop_val_size=0.2): Fit a propensity model. Args: - X (numpy.ndarray): a feature matrix - y (numpy.ndarray): a binary target vector + X (numpy.ndarray, pd.DataFrame, or pl.DataFrame): a feature matrix + y (numpy.ndarray, pd.Series, or pl.Series): a binary target vector """ - if self.early_stop: X_train, X_val, y_train, y_val = train_test_split( X, y, test_size=stop_val_size @@ -196,11 +197,11 @@ def compute_propensity_score( """Generate propensity score if user didn't provide and optionally calibrate. Args: - X (np.matrix): features for training - treatment (np.array or pd.Series): a treatment vector for training + X (np.matrix, pd.DataFrame, or pl.DataFrame): features for training + treatment (np.array, pd.Series, or pl.Series): a treatment vector for training p_model (model object, optional): a binary classifier with either a predict_proba or predict method - X_pred (np.matrix, optional): features for prediction - treatment_pred (np.array or pd.Series, optional): a treatment vector for prediciton + X_pred (np.matrix, pd.DataFrame, or pl.DataFrame, optional): features for prediction + treatment_pred (np.array, pd.Series, or pl.Series, optional): a treatment vector for prediction calibrate_p (bool, optional): whether calibrate the propensity score clip_bounds (tuple, optional): lower and upper bounds for clipping propensity scores. Bounds should be implemented such that: 0 < lower < upper < 1, to avoid division by zero in BaseRLearner.fit_predict() step. @@ -211,7 +212,8 @@ def compute_propensity_score( - p_model (PropensityModel): either the original p_model or a trained ElasticNetPropensityModel """ if treatment_pred is None: - treatment_pred = treatment.copy() + treatment_pred = treatment.copy() if hasattr(treatment, "copy") else treatment + if p_model is None: p_model = ElasticNetPropensityModel( clip_bounds=clip_bounds, calibrate=calibrate_p diff --git a/pyproject.toml b/pyproject.toml index 0ab8d825..37ad7aaf 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,9 +44,11 @@ dependencies = [ ] [project.optional-dependencies] +polars = ["polars>=1.0.0"] test = [ "pytest>=4.6", - "pytest-cov>=4.0" + "pytest-cov>=4.0", + "causalml[polars]" ] tf = [ "tensorflow>=2.4.0" diff --git a/tests/test_polars_support.py b/tests/test_polars_support.py new file mode 100644 index 00000000..8b2c1f70 --- /dev/null +++ b/tests/test_polars_support.py @@ -0,0 +1,463 @@ +""" +Tests for Polars DataFrame/Series support across CausalML meta-learners. + +Run with: + pytest tests/test_polars_support.py -v --noconftest +""" + +import pytest +import numpy as np +import pandas as pd + +# Skip the entire module if polars is not installed +polars = pytest.importorskip("polars", reason="polars is not installed") +import polars as pl + +from sklearn.linear_model import LinearRegression + +from sklearn.linear_model import LogisticRegression +from causalml.inference.meta.tlearner import BaseTClassifier +from causalml.inference.meta.slearner import BaseSClassifier +from causalml.inference.meta.xlearner import BaseXClassifier +from causalml.inference.meta.tlearner import BaseTRegressor +from causalml.inference.meta.slearner import BaseSRegressor +from causalml.inference.meta.xlearner import BaseXRegressor +from causalml.inference.meta.rlearner import BaseRRegressor +from causalml.inference.meta.drlearner import BaseDRRegressor +from causalml.inference.meta.utils import convert_pd_to_np, check_p_conditions + +# Fixtures + +N = 200 +N_FEATURES = 5 +RANDOM_STATE = 42 + + +@pytest.fixture(scope="module") +def synthetic_data_numpy(): + """Return (X, treatment, y) as NumPy arrays — the baseline.""" + rng = np.random.default_rng(RANDOM_STATE) + X = rng.standard_normal((N, N_FEATURES)) + treatment = rng.choice([0, 1], size=N) + y = X[:, 0] * treatment + rng.standard_normal(N) * 0.1 + return X, treatment, y + + +@pytest.fixture(scope="module") +def synthetic_data_pandas(synthetic_data_numpy): + """Return (X, treatment, y) as pandas objects.""" + X_np, t_np, y_np = synthetic_data_numpy + X = pd.DataFrame(X_np, columns=[f"f{i}" for i in range(N_FEATURES)]) + treatment = pd.Series(t_np, name="treatment") + y = pd.Series(y_np, name="outcome") + return X, treatment, y + + +@pytest.fixture(scope="module") +def synthetic_data_polars(synthetic_data_numpy): + """Return (X, treatment, y) as polars objects.""" + X_np, t_np, y_np = synthetic_data_numpy + X = pl.DataFrame({f"f{i}": X_np[:, i] for i in range(N_FEATURES)}) + treatment = pl.Series("treatment", t_np) + y = pl.Series("outcome", y_np) + return X, treatment, y + + +@pytest.fixture(scope="module") +def synthetic_data_polars_lazy(synthetic_data_polars): + """Return X as a polars LazyFrame, treatment and y as Series.""" + X, treatment, y = synthetic_data_polars + return X.lazy(), treatment, y + + +@pytest.fixture(scope="module") +def synthetic_data_numpy_binary(): + """Return (X, treatment, y) with binary y for classifier tests.""" + rng = np.random.default_rng(RANDOM_STATE) + X = rng.standard_normal((N, N_FEATURES)) + treatment = rng.choice([0, 1], size=N) + y = rng.choice([0, 1], size=N) + return X, treatment, y + + +@pytest.fixture(scope="module") +def synthetic_data_polars_binary(synthetic_data_numpy_binary): + X_np, t_np, y_np = synthetic_data_numpy_binary + X = pl.DataFrame({f"f{i}": X_np[:, i] for i in range(N_FEATURES)}) + treatment = pl.Series("treatment", t_np) + y = pl.Series("outcome", y_np) + return X, treatment, y + + +@pytest.fixture(scope="module") +def synthetic_data_polars_lazy_binary(synthetic_data_polars_binary): + X, treatment, y = synthetic_data_polars_binary + return X.lazy(), treatment, y + + +# convert_pd_to_np unit tests + + +class TestConvertPdToNp: + def test_numpy_passthrough(self, synthetic_data_numpy): + X, t, y = synthetic_data_numpy + X_out, t_out, y_out = convert_pd_to_np(X, t, y) + np.testing.assert_array_equal(X_out, X) + np.testing.assert_array_equal(t_out, t) + + def test_pandas_conversion(self, synthetic_data_pandas): + X, t, y = synthetic_data_pandas + X_out, t_out, y_out = convert_pd_to_np(X, t, y) + assert isinstance(X_out, np.ndarray) + assert isinstance(t_out, np.ndarray) + assert isinstance(y_out, np.ndarray) + + def test_polars_dataframe_conversion(self, synthetic_data_polars): + X, t, y = synthetic_data_polars + X_out, t_out, y_out = convert_pd_to_np(X, t, y) + assert isinstance(X_out, np.ndarray) + assert X_out.shape == (N, N_FEATURES) + assert isinstance(t_out, np.ndarray) + assert t_out.shape == (N,) + assert isinstance(y_out, np.ndarray) + + def test_polars_lazyframe_conversion(self, synthetic_data_polars_lazy): + X_lazy, t, y = synthetic_data_polars_lazy + X_out = convert_pd_to_np(X_lazy) + assert isinstance(X_out, np.ndarray) + assert X_out.shape == (N, N_FEATURES) + + def test_none_passthrough(self): + result = convert_pd_to_np(None) + assert result is None + + def test_single_arg(self, synthetic_data_polars): + X, _, _ = synthetic_data_polars + X_out = convert_pd_to_np(X) + assert isinstance(X_out, np.ndarray) + + def test_single_column_polars_df_is_1d(self): + """A single-column pl.DataFrame should be squeezed to 1-D.""" + s = pl.DataFrame({"a": [1, 2, 3]}) + out = convert_pd_to_np(s) + assert out.ndim == 1 + + def test_multi_column_polars_df_is_2d(self, synthetic_data_polars): + X, _, _ = synthetic_data_polars + out = convert_pd_to_np(X) + assert out.ndim == 2 + + +# check_p_conditions with polars Series + + +class TestCheckPConditions: + def test_polars_series_accepted(self): + t_groups = np.array([1]) + p = pl.Series("p", np.linspace(0.1, 0.9, N)) + # Should not raise + check_p_conditions(p, t_groups) + + def test_polars_series_out_of_bounds_raises(self): + t_groups = np.array([1]) + p = pl.Series("p", np.linspace(0.0, 1.0, N)) # includes 0 and 1 + with pytest.raises(AssertionError): + check_p_conditions(p, t_groups) + + +# Helper: assert predictions are close between two input formats + + +def _assert_te_close(te_ref, te_other, atol=1e-5): + """Treatment-effect arrays from two input formats should be identical.""" + np.testing.assert_allclose( + te_ref, + te_other, + atol=atol, + err_msg="Treatment effects differ between input formats", + ) + + +# T-Learner + + +class TestTLearnerPolars: + @pytest.fixture(autouse=True) + def _learner(self): + self.learner = BaseTRegressor(learner=LinearRegression()) + + def _fit_predict(self, X, treatment, y): + self.learner.fit(X, treatment, y) + return self.learner.predict(X) + + def test_polars_matches_numpy(self, synthetic_data_numpy, synthetic_data_polars): + te_np = self._fit_predict(*synthetic_data_numpy) + self.learner = BaseTRegressor(learner=LinearRegression()) + te_pl = self._fit_predict(*synthetic_data_polars) + _assert_te_close(te_np, te_pl) + + def test_polars_matches_pandas(self, synthetic_data_pandas, synthetic_data_polars): + te_pd = self._fit_predict(*synthetic_data_pandas) + self.learner = BaseTRegressor(learner=LinearRegression()) + te_pl = self._fit_predict(*synthetic_data_polars) + _assert_te_close(te_pd, te_pl) + + def test_lazyframe_input(self, synthetic_data_numpy, synthetic_data_polars_lazy): + te_np = self._fit_predict(*synthetic_data_numpy) + self.learner = BaseTRegressor(learner=LinearRegression()) + te_pl = self._fit_predict(*synthetic_data_polars_lazy) + _assert_te_close(te_np, te_pl) + + def test_fit_predict_returns_numpy(self, synthetic_data_polars): + te = self._fit_predict(*synthetic_data_polars) + assert isinstance(te, np.ndarray) + + def test_estimate_ate_polars(self, synthetic_data_polars): + X, treatment, y = synthetic_data_polars + ate, lb, ub = self.learner.estimate_ate(X, treatment, y) + assert isinstance(ate, np.ndarray) + assert lb[0] < ate[0] < ub[0] + + +# S-Learner + + +class TestSLearnerPolars: + @pytest.fixture(autouse=True) + def _learner(self): + self.learner = BaseSRegressor(learner=LinearRegression()) + + def _fit_predict(self, X, treatment, y): + self.learner.fit(X, treatment, y) + return self.learner.predict(X) + + def test_polars_matches_numpy(self, synthetic_data_numpy, synthetic_data_polars): + te_np = self._fit_predict(*synthetic_data_numpy) + self.learner = BaseSRegressor(learner=LinearRegression()) + te_pl = self._fit_predict(*synthetic_data_polars) + _assert_te_close(te_np, te_pl) + + def test_fit_predict_returns_numpy(self, synthetic_data_polars): + te = self._fit_predict(*synthetic_data_polars) + assert isinstance(te, np.ndarray) + + def test_estimate_ate_polars(self, synthetic_data_polars): + X, treatment, y = synthetic_data_polars + # BaseSLearner.estimate_ate returns only `ate` when return_ci=False (default) + ate = self.learner.estimate_ate(X, treatment, y) + assert isinstance(ate, np.ndarray) + # With return_ci=True it returns (ate, lb, ub) + ate, lb, ub = self.learner.estimate_ate(X, treatment, y, return_ci=True) + assert lb[0] < ate[0] < ub[0] + + +# X-Learner + + +class TestXLearnerPolars: + @pytest.fixture(autouse=True) + def _learner(self): + self.learner = BaseXRegressor(learner=LinearRegression()) + + def _fit_predict(self, X, treatment, y): + self.learner.fit(X, treatment, y) + return self.learner.predict(X) + + def test_polars_matches_numpy(self, synthetic_data_numpy, synthetic_data_polars): + te_np = self._fit_predict(*synthetic_data_numpy) + self.learner = BaseXRegressor(learner=LinearRegression()) + te_pl = self._fit_predict(*synthetic_data_polars) + _assert_te_close(te_np, te_pl) + + def test_fit_predict_returns_numpy(self, synthetic_data_polars): + te = self._fit_predict(*synthetic_data_polars) + assert isinstance(te, np.ndarray) + + +# R-Learner + + +class TestRLearnerPolars: + @pytest.fixture(autouse=True) + def _learner(self): + # fixed random_state so KFold splits are identical across both runs + self.learner = BaseRRegressor( + learner=LinearRegression(), random_state=RANDOM_STATE + ) + + def _fit_predict(self, X, treatment, y): + self.learner.fit(X, treatment, y) + return self.learner.predict(X) + + def test_polars_matches_numpy(self, synthetic_data_numpy, synthetic_data_polars): + # With a fixed random_state the KFold splits are deterministic, + # so numpy and polars inputs must produce identical results. + te_np = self._fit_predict(*synthetic_data_numpy) + self.learner = BaseRRegressor( + learner=LinearRegression(), random_state=RANDOM_STATE + ) + te_pl = self._fit_predict(*synthetic_data_polars) + _assert_te_close(te_np, te_pl) + + def test_fit_predict_returns_numpy(self, synthetic_data_polars): + te = self._fit_predict(*synthetic_data_polars) + assert isinstance(te, np.ndarray) + + +# DR-Learner + + +class TestDRLearnerPolars: + @pytest.fixture(autouse=True) + def _learner(self): + self.learner = BaseDRRegressor(learner=LinearRegression()) + + def _fit_predict(self, X, treatment, y, seed=None): + self.learner.fit(X, treatment, y, seed=seed) + return self.learner.predict(X) + + def test_polars_matches_numpy(self, synthetic_data_numpy, synthetic_data_polars): + # DR-Learner uses KFold with a seed parameter passed to fit(); fix it + # so both runs use the same splits. + te_np = self._fit_predict(*synthetic_data_numpy, seed=RANDOM_STATE) + self.learner = BaseDRRegressor(learner=LinearRegression()) + te_pl = self._fit_predict(*synthetic_data_polars, seed=RANDOM_STATE) + _assert_te_close(te_np, te_pl) + + def test_fit_predict_returns_numpy(self, synthetic_data_polars): + te = self._fit_predict(*synthetic_data_polars) + assert isinstance(te, np.ndarray) + + +# Edge cases + + +class TestEdgeCases: + def test_mixed_inputs_polars_x_numpy_treatment( + self, synthetic_data_numpy, synthetic_data_polars + ): + """X as polars DataFrame, treatment and y as numpy — should work fine.""" + X_pl, _, _ = synthetic_data_polars + _, t_np, y_np = synthetic_data_numpy + learner = BaseTRegressor(learner=LinearRegression()) + learner.fit(X_pl, t_np, y_np) + te = learner.predict(X_pl) + assert isinstance(te, np.ndarray) + + def test_polars_predict_only(self, synthetic_data_numpy, synthetic_data_polars): + """Fit on numpy, predict on polars — predict must accept polars X.""" + X_np, t_np, y_np = synthetic_data_numpy + X_pl, _, _ = synthetic_data_polars + learner = BaseTRegressor(learner=LinearRegression()) + learner.fit(X_np, t_np, y_np) + te = learner.predict(X_pl) + assert isinstance(te, np.ndarray) + assert te.shape[0] == N + + +# T-Learner Classifier + + +class TestTClassifierPolars: + @pytest.fixture(autouse=True) + def _learner(self): + self.learner = BaseTClassifier(learner=LogisticRegression()) + + def _fit_predict(self, X, treatment, y): + self.learner.fit(X, treatment, y) + return self.learner.predict(X) + + def test_polars_matches_numpy( + self, synthetic_data_numpy_binary, synthetic_data_polars_binary + ): + te_np = self._fit_predict(*synthetic_data_numpy_binary) + self.learner = BaseTClassifier(learner=LogisticRegression()) + te_pl = self._fit_predict(*synthetic_data_polars_binary) + _assert_te_close(te_np, te_pl) + + def test_lazyframe_input( + self, synthetic_data_numpy_binary, synthetic_data_polars_lazy_binary + ): + te_np = self._fit_predict(*synthetic_data_numpy_binary) + self.learner = BaseTClassifier(learner=LogisticRegression()) + te_pl = self._fit_predict(*synthetic_data_polars_lazy_binary) + _assert_te_close(te_np, te_pl) + + def test_fit_predict_returns_numpy(self, synthetic_data_polars_binary): + te = self._fit_predict(*synthetic_data_polars_binary) + assert isinstance(te, np.ndarray) + + +# S-Learner Classifier + + +class TestSClassifierPolars: + @pytest.fixture(autouse=True) + def _learner(self): + self.learner = BaseSClassifier(learner=LogisticRegression()) + + def _fit_predict(self, X, treatment, y): + self.learner.fit(X, treatment, y) + return self.learner.predict(X) + + def test_polars_matches_numpy( + self, synthetic_data_numpy_binary, synthetic_data_polars_binary + ): + te_np = self._fit_predict(*synthetic_data_numpy_binary) + self.learner = BaseSClassifier(learner=LogisticRegression()) + te_pl = self._fit_predict(*synthetic_data_polars_binary) + _assert_te_close(te_np, te_pl) + + def test_lazyframe_input( + self, synthetic_data_numpy_binary, synthetic_data_polars_lazy_binary + ): + te_np = self._fit_predict(*synthetic_data_numpy_binary) + self.learner = BaseSClassifier(learner=LogisticRegression()) + te_pl = self._fit_predict(*synthetic_data_polars_lazy_binary) + _assert_te_close(te_np, te_pl) + + def test_fit_predict_returns_numpy(self, synthetic_data_polars_binary): + te = self._fit_predict(*synthetic_data_polars_binary) + assert isinstance(te, np.ndarray) + + +# X-Learner Classifier + + +class TestXClassifierPolars: + @pytest.fixture(autouse=True) + def _learner(self): + self.learner = BaseXClassifier( + outcome_learner=LogisticRegression(), + effect_learner=LinearRegression(), + ) + + def _fit_predict(self, X, treatment, y): + self.learner.fit(X, treatment, y) + return self.learner.predict(X) + + def test_polars_matches_numpy( + self, synthetic_data_numpy_binary, synthetic_data_polars_binary + ): + te_np = self._fit_predict(*synthetic_data_numpy_binary) + self.learner = BaseXClassifier( + outcome_learner=LogisticRegression(), + effect_learner=LinearRegression(), + ) + te_pl = self._fit_predict(*synthetic_data_polars_binary) + _assert_te_close(te_np, te_pl) + + def test_lazyframe_input( + self, synthetic_data_numpy_binary, synthetic_data_polars_lazy_binary + ): + te_np = self._fit_predict(*synthetic_data_numpy_binary) + self.learner = BaseXClassifier( + outcome_learner=LogisticRegression(), + effect_learner=LinearRegression(), + ) + te_pl = self._fit_predict(*synthetic_data_polars_lazy_binary) + _assert_te_close(te_np, te_pl) + + def test_fit_predict_returns_numpy(self, synthetic_data_polars_binary): + te = self._fit_predict(*synthetic_data_polars_binary) + assert isinstance(te, np.ndarray)