Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,57 +798,81 @@ def test_pd_fill_gap(self):
def test_combiner(self):
test_df = pd.DataFrame(
{
"Tin__°C__building": [10.0, 20.0, 30.0],
"Text__°C__outdoor": [-1.0, 5.0, 4.0],
"radiation__W/m2__outdoor": [50, 100, 400],
"Humidity__%HR": [10, 15, 13],
"Humidity__%HR__room1": [20, 30, 50],
"Humidity_2": [10, 15, 13],
"light__DIMENSIONLESS__building": [100, 200, 300],
"mass_flwr__m3/h__hvac": [300, 500, 600],
"Tin__°C__building__room_1": [10.0, 20.0, 30.0],
"Tin__°C__building__room_2": [20.0, 40.0, 60.0],
"Text__°C__outdoor__meteo": [-1.0, 5.0, 4.0],
"radiation__W/m2__outdoor__meteo": [50, 100, 400],
"Humidity__%HR__building__room_1": [10, 15, 13],
"Humidity__%HR__building__room_2": [20, 30, 50],
"Humidity__%HR__outdoor__meteo": [10, 15, 13],
"light__DIMENSIONLESS__building__room_1": [100, 200, 300],
"mass_flwr__m3/h__hvac__pump": [300, 500, 600],
},
index=pd.date_range("2009", freq="h", periods=3, tz="UTC"),
)

combiner = ExpressionCombine(
columns_dict={
"T1": "Tin__°C__building",
"T2": "Text__°C__outdoor",
"m": "mass_flwr__m3/h__hvac",
"T1": "Tin__°C__building__room_1",
"T2": "Text__°C__outdoor__meteo",
"m": "mass_flwr__m3/h__hvac__pump",
},
expression="(T1 - T2) * m * 1004 * 1.204",
result_column_name="loss_ventilation__J__hvac",
result_column_name="loss_ventilation__J__building__room_1",
)

res = combiner.fit_transform(test_df.copy())
check_feature_names_out(combiner, res)
np.testing.assert_almost_equal(
res["loss_ventilation__J__hvac"],
res["loss_ventilation__J__building__room_1"],
[3989092.8, 9066120.0, 18857529.6],
decimal=1,
)

combiner.set_params(drop_columns=True)
res = combiner.fit_transform(test_df.copy())
assert res.shape == (3, 6)
assert res.shape == (3, 7)
check_feature_names_out(combiner, res)

combiner_cond = ExpressionCombine(
columns_dict={
"T1": "Text__°C__outdoor",
"T1": "Text__°C__outdoor__meteo",
},
expression="(T1 > 10) * 1",
result_column_name="where_test_01__hvac",
result_column_name="where_test_01__meteo__outdoor",
)

res = combiner_cond.fit_transform(test_df.copy())
check_feature_names_out(combiner_cond, res)
np.testing.assert_almost_equal(
res["where_test_01__hvac"],
res["where_test_01__meteo__outdoor"],
[0, 0, 0],
decimal=1,
)

combiner_broadcast = ExpressionCombine(
columns_dict={
"T_in": "Tin__°C__building",
"T_ext": "Text__°C",
"m": "mass_flwr__m3/h__hvac__pump",
},
expression="(T_in - T_ext) * m * 1004 * 1.204",
result_column_name="loss_ventilation__J__building",
)

res = combiner_broadcast.fit_transform(test_df.copy())
np.testing.assert_almost_equal(
res["loss_ventilation__J__building__room_1"],
[3989092.8, 9066120.0, 18857529.6],
decimal=1,
)

np.testing.assert_almost_equal(
res["loss_ventilation__J__building__room_2"],
[7615540.8, 21154280.0, 40616217.6],
decimal=1,
)

@patch("tide.base.get_oikolab_df", side_effect=mock_get_oikolab_df)
def test_fill_oiko_meteo(self, mock_get_oikolab):
data = pd.read_csv(
Expand Down
237 changes: 196 additions & 41 deletions tide/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1972,66 +1972,133 @@ def _transform_implementation(self, X: pd.Series | pd.DataFrame):
class ExpressionCombine(BaseProcessing):
"""A transformer that combines DataFrame columns using a mathematical expression.

This transformer evaluates a mathematical expression using specified columns from a DataFrame,
creating a new column with the result. It supports both simple aggregations and complex
physical expressions, with the option to drop the source columns after computation.
This transformer evaluates a mathematical expression using specified columns
from a DataFrame, creating new columns with the results.
It supports broadcasting: when a tag selector matches multiple columns,
the expression is applied to each matching group. Selectors matching a
single column are broadcast across all groups.

Parameters
----------
columns_dict : dict[str, str]
Dictionary mapping expression variables to DataFrame column names.
Dictionary mapping expression variables to TIDE-style column tag patterns.
Keys are the variable names used in the expression, and values are the
corresponding column names in the DataFrame.
tag patterns to match columns (e.g., "Tin__°C__building").

expression : str
Mathematical expression to evaluate, using variables defined in columns_dict.
The expression should be a valid Python mathematical expression that can be
evaluated using pandas.eval().
The expression must be a valid Python expression compatible with pandas.eval().
Supports standard mathematical operators and numpy functions.

result_column_name : str
Name of the new column that will contain the evaluated expression result.
Must not already exist in the DataFrame.
Base name for result columns following TIDE naming convention.
When multiple column groups are found, the sub-bloc suffix from matched
columns (e.g., "__room_1", "__room_2") is automatically appended.

drop_columns : bool, default=False
Whether to drop the source columns used in the expression after computation.
If True, only the result column and other non-source columns are kept.
If True, only the newly created result columns and other unused columns are kept.

Attributes
----------
column_groups_ : list[dict]
List of dictionaries mapping variable names to actual column names for each group.
Created during fit().

result_columns_ : list[str]
List of result column names with appropriate suffixes. Created during fit().

required_columns : list[str]
List of all source columns used in the expression. Updated during fit().

feature_names_out_ : list[str]
List of column names in the transformed DataFrame. If drop_columns is True,
excludes the source columns used in the expression.
List of all column names in the transformed DataFrame.

Notes
-----
Broadcasting Rules:
- If all tag patterns match exactly one column, a single result column is created.
- If some patterns match multiple columns, all multi-column patterns must match
the same number of columns.
- Single-column matches are broadcast (reused) across all groups.
- The sub-bloc suffix is extracted from the first multi-column variable found.

Column Naming:
- Result columns follow the pattern: name__unit__bloc__sub-bloc
- The sub-bloc suffix is extracted by comparing matched column names with
their tag patterns.
- If no multi-column variables exist, no suffix is added.

Raises
------
ValueError
If result_column_name already exists in the DataFrame.
- If a tag pattern matches no columns in the DataFrame.
- If tag patterns resolve to incompatible column counts (multiple patterns
match different numbers of columns, none being 1).
- If result_column_name (with suffix) already exists in the output DataFrame.

Examples
--------
>>> from tide.processing import ExpressionCombine
Basic usage with broadcasting:

>>> import pandas as pd
>>> # Create sample data
>>> df = pd.DataFrame(
... {
... "Tin__°C__building": [20, 21, 22],
... "Text__°C__outdoor": [10, 11, 12],
... "mass_flwr__m3/h__hvac": [1, 2, 3],
... "Tin__°C__building__room_1": [20, 21, 22],
... "Tin__°C__building__room_2": [25, 26, 27],
... "Text__°C__outdoor__meteo": [10, 11, 12],
... "mass_flwr__kg/s__hvac__pump": [0.1, 0.2, 0.3],
... }
... )
>>> # Calculate ventilation losses
>>>
>>> # Calculate ventilation heat loss for each room
>>> # Text and mass_flwr are broadcast to both rooms
>>> combiner = ExpressionCombine(
... columns_dict={
... "T1": "Tin__°C__building",
... "T2": "Text__°C__outdoor",
... "m": "mass_flwr__m3/h__hvac",
... "Tin": "Tin__°C__building",
... "Text": "Text__°C__outdoor__meteo",
... "mdot": "mass_flwr__kg/s__hvac__pump",
... },
... expression="(T1 - T2) * m * 1004 * 1.204",
... result_column_name="loss_ventilation__J__hvac",
... drop_columns=True,
... expression="(Tin - Text) * mdot * 1004", # Q = ΔT × ṁ × cp
... result_column_name="Q_vent__W__building",
... )
>>> # Transform the data
>>>
>>> result = combiner.fit_transform(df)
>>> # Creates: Q_vent__W__building__room_1 and Q_vent__W__building__room_2

Using drop_columns to keep only results:

>>> combiner_clean = ExpressionCombine(
... columns_dict={
... "Tin": "Tin__°C__building",
... "Text": "Text__°C__outdoor__meteo",
... },
... expression="Tin - Text",
... result_column_name="deltaT__K__building",
... drop_columns=True,
... )
>>>
>>> result = combiner_clean.fit_transform(df)
>>> # Only deltaT__K__building__room_1, deltaT__K__building__room_2,
>>> # and mass_flwr__kg/s__hvac__pump remain

Single column operation (no broadcasting):

>>> df_simple = pd.DataFrame(
... {
... "P__W__hvac": [100, 200, 300],
... "t__s__hvac": [3600, 3600, 3600],
... }
... )
>>>
>>> energy_calc = ExpressionCombine(
... columns_dict={"P": "P__W__hvac", "t": "t__s__hvac"},
... expression="P * t",
... result_column_name="E__J__hvac",
... )
>>>
>>> result = energy_calc.fit_transform(df_simple)
>>> # Creates single column: E__J__hvac
"""

def __init__(
Expand All @@ -2041,30 +2108,118 @@ def __init__(
result_column_name: str,
drop_columns: bool = False,
):
BaseProcessing.__init__(self, required_columns=list(columns_dict.values()))
self.columns_dict = columns_dict
self.required_columns = list(columns_dict.values())
self.expression = expression
self.result_column_name = result_column_name
self.drop_columns = drop_columns

BaseProcessing.__init__(self, required_columns=[])

def _resolve_column_groups(self, X: pd.DataFrame) -> tuple[list[dict], list[str]]:
"""Resolve tag patterns to column groups and validate broadcasting rules.

Returns
-------
column_groups : list[dict]
List of dicts mapping variable names to actual column names for each group.
result_columns : list[str]
List of result column names with appropriate suffixes.
"""
resolved = {}
for var, tag_pattern in self.columns_dict.items():
matched_cols = tide_request(X.columns, tag_pattern)
if not matched_cols:
raise ValueError(
f"Tag pattern '{tag_pattern}' for variable '{var}' "
f"matched no columns in DataFrame"
)
resolved[var] = matched_cols

counts = {var: len(cols) for var, cols in resolved.items()}
non_one_counts = [c for c in counts.values() if c > 1]

if not non_one_counts:
n_groups = 1
else:
if len(set(non_one_counts)) > 1:
raise ValueError(
f"Incompatible column counts for broadcasting: {counts}. "
f"All counts must be 1 or equal to each other."
)
n_groups = non_one_counts[0]

column_groups = []
result_columns = []

for i in range(n_groups):
group = {}
for var, cols in resolved.items():
# Broadcast single columns across all groups
group[var] = cols[0] if len(cols) == 1 else cols[i]
column_groups.append(group)

# Extract sub-bloc suffix from any multi-column variable
suffix = ""
for var, cols in resolved.items():
if len(cols) > 1:
col_name = cols[i]
tag_pattern = self.columns_dict[var]
parts = col_name.split("__")
pattern_parts = tag_pattern.split("__")
if len(parts) > len(pattern_parts):
suffix = "__" + "__".join(parts[len(pattern_parts) :])
break

result_columns.append(self.result_column_name + suffix)

return column_groups, result_columns

def _fit_implementation(self, X, y=None):
"""Fit the transformer by resolving column groups."""
self.column_groups_, self.result_columns_ = self._resolve_column_groups(X)

# Update required columns for BaseProcessing
all_cols = set()
for group in self.column_groups_:
all_cols.update(group.values())
self.required_columns = list(all_cols)

if self.drop_columns:
self.feature_names_out_ = list(X.columns.drop(self.required_columns))
if self.result_column_name in self.feature_names_out_:
raise ValueError(
f"label_name {self.result_column_name} already in X columns. "
f"It cannot be overwritten"
)
self.feature_names_out_.append(self.result_column_name)
self.feature_names_out_ = [
col for col in X.columns if col not in self.required_columns
]
else:
self.feature_names_out_ = list(X.columns)

def _transform_implementation(self, X: pd.Series | pd.DataFrame):
X.loc[:, self.result_column_name] = pd.eval(
self.expression,
target=X,
local_dict={var: X[col] for var, col in self.columns_dict.items()},
# Check for conflicts
for result_col in self.result_columns_:
if result_col in self.feature_names_out_:
raise ValueError(
f"Result column '{result_col}' already exists in DataFrame. "
f"It cannot be overwritten."
)

self.feature_names_out_.extend(self.result_columns_)

return self

def _transform_implementation(self, X: pd.DataFrame) -> pd.DataFrame:
result = (
X
if not self.drop_columns
else X[[col for col in X.columns if col not in self.required_columns]]
)
return X[self.feature_names_out_]

for group, result_col in zip(self.column_groups_, self.result_columns_):
# Build local_dict for pd.eval
local_dict = {var: X[col].values for var, col in group.items()}

result.loc[:, result_col] = pd.eval(
self.expression,
local_dict=local_dict,
)

return result[self.feature_names_out_]


class FillOikoMeteo(BaseFiller, BaseOikoMeteo, BaseProcessing):
Expand Down
Loading