diff --git a/tests/test_processing.py b/tests/test_processing.py index fc44425..46c60a4 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -798,57 +798,81 @@ def test_pd_fill_gap(self): def test_combiner(self): test_df = pd.DataFrame( { - "Tin__°C__building": [10.0, 20.0, 30.0], - "Text__°C__outdoor": [-1.0, 5.0, 4.0], - "radiation__W/m2__outdoor": [50, 100, 400], - "Humidity__%HR": [10, 15, 13], - "Humidity__%HR__room1": [20, 30, 50], - "Humidity_2": [10, 15, 13], - "light__DIMENSIONLESS__building": [100, 200, 300], - "mass_flwr__m3/h__hvac": [300, 500, 600], + "Tin__°C__building__room_1": [10.0, 20.0, 30.0], + "Tin__°C__building__room_2": [20.0, 40.0, 60.0], + "Text__°C__outdoor__meteo": [-1.0, 5.0, 4.0], + "radiation__W/m2__outdoor__meteo": [50, 100, 400], + "Humidity__%HR__building__room_1": [10, 15, 13], + "Humidity__%HR__building__room_2": [20, 30, 50], + "Humidity__%HR__outdoor__meteo": [10, 15, 13], + "light__DIMENSIONLESS__building__room_1": [100, 200, 300], + "mass_flwr__m3/h__hvac__pump": [300, 500, 600], }, index=pd.date_range("2009", freq="h", periods=3, tz="UTC"), ) combiner = ExpressionCombine( columns_dict={ - "T1": "Tin__°C__building", - "T2": "Text__°C__outdoor", - "m": "mass_flwr__m3/h__hvac", + "T1": "Tin__°C__building__room_1", + "T2": "Text__°C__outdoor__meteo", + "m": "mass_flwr__m3/h__hvac__pump", }, expression="(T1 - T2) * m * 1004 * 1.204", - result_column_name="loss_ventilation__J__hvac", + result_column_name="loss_ventilation__J__building__room_1", ) res = combiner.fit_transform(test_df.copy()) check_feature_names_out(combiner, res) np.testing.assert_almost_equal( - res["loss_ventilation__J__hvac"], + res["loss_ventilation__J__building__room_1"], [3989092.8, 9066120.0, 18857529.6], decimal=1, ) combiner.set_params(drop_columns=True) res = combiner.fit_transform(test_df.copy()) - assert res.shape == (3, 6) + assert res.shape == (3, 7) check_feature_names_out(combiner, res) combiner_cond = ExpressionCombine( columns_dict={ - "T1": "Text__°C__outdoor", + "T1": "Text__°C__outdoor__meteo", }, expression="(T1 > 10) * 1", - result_column_name="where_test_01__hvac", + result_column_name="where_test_01__meteo__outdoor", ) res = combiner_cond.fit_transform(test_df.copy()) check_feature_names_out(combiner_cond, res) np.testing.assert_almost_equal( - res["where_test_01__hvac"], + res["where_test_01__meteo__outdoor"], [0, 0, 0], decimal=1, ) + combiner_broadcast = ExpressionCombine( + columns_dict={ + "T_in": "Tin__°C__building", + "T_ext": "Text__°C", + "m": "mass_flwr__m3/h__hvac__pump", + }, + expression="(T_in - T_ext) * m * 1004 * 1.204", + result_column_name="loss_ventilation__J__building", + ) + + res = combiner_broadcast.fit_transform(test_df.copy()) + np.testing.assert_almost_equal( + res["loss_ventilation__J__building__room_1"], + [3989092.8, 9066120.0, 18857529.6], + decimal=1, + ) + + np.testing.assert_almost_equal( + res["loss_ventilation__J__building__room_2"], + [7615540.8, 21154280.0, 40616217.6], + decimal=1, + ) + @patch("tide.base.get_oikolab_df", side_effect=mock_get_oikolab_df) def test_fill_oiko_meteo(self, mock_get_oikolab): data = pd.read_csv( diff --git a/tide/processing.py b/tide/processing.py index 3ae3ffb..8bfb7e5 100644 --- a/tide/processing.py +++ b/tide/processing.py @@ -1972,66 +1972,133 @@ def _transform_implementation(self, X: pd.Series | pd.DataFrame): class ExpressionCombine(BaseProcessing): """A transformer that combines DataFrame columns using a mathematical expression. - This transformer evaluates a mathematical expression using specified columns from a DataFrame, - creating a new column with the result. It supports both simple aggregations and complex - physical expressions, with the option to drop the source columns after computation. + This transformer evaluates a mathematical expression using specified columns + from a DataFrame, creating new columns with the results. + It supports broadcasting: when a tag selector matches multiple columns, + the expression is applied to each matching group. Selectors matching a + single column are broadcast across all groups. Parameters ---------- columns_dict : dict[str, str] - Dictionary mapping expression variables to DataFrame column names. + Dictionary mapping expression variables to TIDE-style column tag patterns. Keys are the variable names used in the expression, and values are the - corresponding column names in the DataFrame. + tag patterns to match columns (e.g., "Tin__°C__building"). expression : str Mathematical expression to evaluate, using variables defined in columns_dict. - The expression should be a valid Python mathematical expression that can be - evaluated using pandas.eval(). + The expression must be a valid Python expression compatible with pandas.eval(). + Supports standard mathematical operators and numpy functions. result_column_name : str - Name of the new column that will contain the evaluated expression result. - Must not already exist in the DataFrame. + Base name for result columns following TIDE naming convention. + When multiple column groups are found, the sub-bloc suffix from matched + columns (e.g., "__room_1", "__room_2") is automatically appended. drop_columns : bool, default=False Whether to drop the source columns used in the expression after computation. - If True, only the result column and other non-source columns are kept. + If True, only the newly created result columns and other unused columns are kept. Attributes ---------- + column_groups_ : list[dict] + List of dictionaries mapping variable names to actual column names for each group. + Created during fit(). + + result_columns_ : list[str] + List of result column names with appropriate suffixes. Created during fit(). + + required_columns : list[str] + List of all source columns used in the expression. Updated during fit(). + feature_names_out_ : list[str] - List of column names in the transformed DataFrame. If drop_columns is True, - excludes the source columns used in the expression. + List of all column names in the transformed DataFrame. + + Notes + ----- + Broadcasting Rules: + - If all tag patterns match exactly one column, a single result column is created. + - If some patterns match multiple columns, all multi-column patterns must match + the same number of columns. + - Single-column matches are broadcast (reused) across all groups. + - The sub-bloc suffix is extracted from the first multi-column variable found. + + Column Naming: + - Result columns follow the pattern: name__unit__bloc__sub-bloc + - The sub-bloc suffix is extracted by comparing matched column names with + their tag patterns. + - If no multi-column variables exist, no suffix is added. Raises ------ ValueError - If result_column_name already exists in the DataFrame. + - If a tag pattern matches no columns in the DataFrame. + - If tag patterns resolve to incompatible column counts (multiple patterns + match different numbers of columns, none being 1). + - If result_column_name (with suffix) already exists in the output DataFrame. Examples -------- - >>> from tide.processing import ExpressionCombine + Basic usage with broadcasting: + >>> import pandas as pd - >>> # Create sample data >>> df = pd.DataFrame( ... { - ... "Tin__°C__building": [20, 21, 22], - ... "Text__°C__outdoor": [10, 11, 12], - ... "mass_flwr__m3/h__hvac": [1, 2, 3], + ... "Tin__°C__building__room_1": [20, 21, 22], + ... "Tin__°C__building__room_2": [25, 26, 27], + ... "Text__°C__outdoor__meteo": [10, 11, 12], + ... "mass_flwr__kg/s__hvac__pump": [0.1, 0.2, 0.3], ... } ... ) - >>> # Calculate ventilation losses + >>> + >>> # Calculate ventilation heat loss for each room + >>> # Text and mass_flwr are broadcast to both rooms >>> combiner = ExpressionCombine( ... columns_dict={ - ... "T1": "Tin__°C__building", - ... "T2": "Text__°C__outdoor", - ... "m": "mass_flwr__m3/h__hvac", + ... "Tin": "Tin__°C__building", + ... "Text": "Text__°C__outdoor__meteo", + ... "mdot": "mass_flwr__kg/s__hvac__pump", ... }, - ... expression="(T1 - T2) * m * 1004 * 1.204", - ... result_column_name="loss_ventilation__J__hvac", - ... drop_columns=True, + ... expression="(Tin - Text) * mdot * 1004", # Q = ΔT × ṁ × cp + ... result_column_name="Q_vent__W__building", ... ) - >>> # Transform the data + >>> >>> result = combiner.fit_transform(df) + >>> # Creates: Q_vent__W__building__room_1 and Q_vent__W__building__room_2 + + Using drop_columns to keep only results: + + >>> combiner_clean = ExpressionCombine( + ... columns_dict={ + ... "Tin": "Tin__°C__building", + ... "Text": "Text__°C__outdoor__meteo", + ... }, + ... expression="Tin - Text", + ... result_column_name="deltaT__K__building", + ... drop_columns=True, + ... ) + >>> + >>> result = combiner_clean.fit_transform(df) + >>> # Only deltaT__K__building__room_1, deltaT__K__building__room_2, + >>> # and mass_flwr__kg/s__hvac__pump remain + + Single column operation (no broadcasting): + + >>> df_simple = pd.DataFrame( + ... { + ... "P__W__hvac": [100, 200, 300], + ... "t__s__hvac": [3600, 3600, 3600], + ... } + ... ) + >>> + >>> energy_calc = ExpressionCombine( + ... columns_dict={"P": "P__W__hvac", "t": "t__s__hvac"}, + ... expression="P * t", + ... result_column_name="E__J__hvac", + ... ) + >>> + >>> result = energy_calc.fit_transform(df_simple) + >>> # Creates single column: E__J__hvac """ def __init__( @@ -2041,30 +2108,118 @@ def __init__( result_column_name: str, drop_columns: bool = False, ): - BaseProcessing.__init__(self, required_columns=list(columns_dict.values())) self.columns_dict = columns_dict - self.required_columns = list(columns_dict.values()) self.expression = expression self.result_column_name = result_column_name self.drop_columns = drop_columns + BaseProcessing.__init__(self, required_columns=[]) + + def _resolve_column_groups(self, X: pd.DataFrame) -> tuple[list[dict], list[str]]: + """Resolve tag patterns to column groups and validate broadcasting rules. + + Returns + ------- + column_groups : list[dict] + List of dicts mapping variable names to actual column names for each group. + result_columns : list[str] + List of result column names with appropriate suffixes. + """ + resolved = {} + for var, tag_pattern in self.columns_dict.items(): + matched_cols = tide_request(X.columns, tag_pattern) + if not matched_cols: + raise ValueError( + f"Tag pattern '{tag_pattern}' for variable '{var}' " + f"matched no columns in DataFrame" + ) + resolved[var] = matched_cols + + counts = {var: len(cols) for var, cols in resolved.items()} + non_one_counts = [c for c in counts.values() if c > 1] + + if not non_one_counts: + n_groups = 1 + else: + if len(set(non_one_counts)) > 1: + raise ValueError( + f"Incompatible column counts for broadcasting: {counts}. " + f"All counts must be 1 or equal to each other." + ) + n_groups = non_one_counts[0] + + column_groups = [] + result_columns = [] + + for i in range(n_groups): + group = {} + for var, cols in resolved.items(): + # Broadcast single columns across all groups + group[var] = cols[0] if len(cols) == 1 else cols[i] + column_groups.append(group) + + # Extract sub-bloc suffix from any multi-column variable + suffix = "" + for var, cols in resolved.items(): + if len(cols) > 1: + col_name = cols[i] + tag_pattern = self.columns_dict[var] + parts = col_name.split("__") + pattern_parts = tag_pattern.split("__") + if len(parts) > len(pattern_parts): + suffix = "__" + "__".join(parts[len(pattern_parts) :]) + break + + result_columns.append(self.result_column_name + suffix) + + return column_groups, result_columns + def _fit_implementation(self, X, y=None): + """Fit the transformer by resolving column groups.""" + self.column_groups_, self.result_columns_ = self._resolve_column_groups(X) + + # Update required columns for BaseProcessing + all_cols = set() + for group in self.column_groups_: + all_cols.update(group.values()) + self.required_columns = list(all_cols) + if self.drop_columns: - self.feature_names_out_ = list(X.columns.drop(self.required_columns)) - if self.result_column_name in self.feature_names_out_: - raise ValueError( - f"label_name {self.result_column_name} already in X columns. " - f"It cannot be overwritten" - ) - self.feature_names_out_.append(self.result_column_name) + self.feature_names_out_ = [ + col for col in X.columns if col not in self.required_columns + ] + else: + self.feature_names_out_ = list(X.columns) - def _transform_implementation(self, X: pd.Series | pd.DataFrame): - X.loc[:, self.result_column_name] = pd.eval( - self.expression, - target=X, - local_dict={var: X[col] for var, col in self.columns_dict.items()}, + # Check for conflicts + for result_col in self.result_columns_: + if result_col in self.feature_names_out_: + raise ValueError( + f"Result column '{result_col}' already exists in DataFrame. " + f"It cannot be overwritten." + ) + + self.feature_names_out_.extend(self.result_columns_) + + return self + + def _transform_implementation(self, X: pd.DataFrame) -> pd.DataFrame: + result = ( + X + if not self.drop_columns + else X[[col for col in X.columns if col not in self.required_columns]] ) - return X[self.feature_names_out_] + + for group, result_col in zip(self.column_groups_, self.result_columns_): + # Build local_dict for pd.eval + local_dict = {var: X[col].values for var, col in group.items()} + + result.loc[:, result_col] = pd.eval( + self.expression, + local_dict=local_dict, + ) + + return result[self.feature_names_out_] class FillOikoMeteo(BaseFiller, BaseOikoMeteo, BaseProcessing):