From b3babc3d52181c239a70429bb16354fd14b7bd8c Mon Sep 17 00:00:00 2001 From: Oleksandr Shchur Date: Wed, 24 Jun 2026 09:10:33 +0000 Subject: [PATCH 1/6] Add Toto 2.0 model wrapper --- models/toto-2.0/model.py | 133 +++++++++++++++++++++++++++++++ models/toto-2.0/requirements.txt | 1 + 2 files changed, 134 insertions(+) create mode 100644 models/toto-2.0/model.py create mode 100644 models/toto-2.0/requirements.txt diff --git a/models/toto-2.0/model.py b/models/toto-2.0/model.py new file mode 100644 index 0000000..f7b8d3f --- /dev/null +++ b/models/toto-2.0/model.py @@ -0,0 +1,133 @@ +import datasets +import numpy as np + +import fev + +# Toto 2.0 always returns these nine quantile levels; arbitrary task levels are interpolated from them. +TOTO_QUANTILES = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) + + +class Toto2Model(fev.ForecastingModel): + """Toto 2.0 model from https://github.com/DataDog/toto (https://pypi.org/project/toto-2).""" + + model_name = "toto-2.0" + trained_on_datasets = [ + "favorita_transactions_1D", + "fred_md_2025", + "proenfo_gfc12", + "proenfo_gfc14", + "proenfo_gfc17", + "kdd_cup_2022_10T", + "m5_1D", + ] + + def __init__( + self, + model_path: str = "Datadog/Toto-2.0-22m", + max_batch_variate_size: int = 24, + max_context_length: int = 4096, + decode_block_size: int | None = None, + device: str = "auto", + seed: int = 42, + ): + super().__init__() + self.model_path = model_path + self.max_batch_variate_size = max_batch_variate_size + self.max_context_length = max_context_length + self.decode_block_size = decode_block_size + self.device = device + self.seed = seed + + def _fit_predict(self, task: fev.Task) -> list[datasets.DatasetDict]: + import torch + from toto2 import Toto2Model as Toto2 + + torch.manual_seed(self.seed) + if self.device == "auto": + self.device = "cuda" if torch.cuda.is_available() else "cpu" + + model = Toto2.from_pretrained(fev.utils.maybe_cache_from_s3(self.model_path)) + model = model.to(self.device).eval() + + return [self._predict_window(window, model, task.quantile_levels) for window in task.iter_windows()] + + def _predict_window( + self, + window: fev.EvaluationWindow, + model, + quantile_levels: list[float], + ) -> datasets.DatasetDict: + import torch + + target_columns = window.target_columns + num_variates = len(target_columns) + + past_data, _ = window.get_input_data() + past_data = past_data.select_columns(target_columns).cast( + datasets.Features({col: datasets.Sequence(datasets.Value("float32")) for col in target_columns}) + ) + # One tensor of shape (num_variates, context_length) per time series item. + series = [torch.tensor(np.stack(list(row.values())), dtype=torch.float32) for row in past_data] + + batch_size = max(1, self.max_batch_variate_size // num_variates) + forecasts: list[np.ndarray] = [] # each entry: (num_quantiles, batch, num_variates, horizon) + with self._record_inference_time(): + for batch in _batchify(series, batch_size): + target, mask = _left_pad_and_stack(batch, self.max_context_length, self.device) + series_ids = torch.zeros(len(batch), num_variates, dtype=torch.long, device=self.device) + + quantiles = model.forecast( + {"target": target, "target_mask": mask, "series_ids": series_ids}, + horizon=window.horizon, + decode_block_size=self.decode_block_size, + has_missing_values=not bool(mask.all()), + ) + forecasts.append(quantiles.cpu().numpy()) + + # (num_quantiles, num_items, num_variates, horizon) + quantiles = np.concatenate(forecasts, axis=1) + predictions = { + variate: {"predictions": _interp(quantiles, 0.5)[:, i]} for i, variate in enumerate(target_columns) + } + for q in quantile_levels: + q_forecast = _interp(quantiles, q) + for i, variate in enumerate(target_columns): + predictions[variate][str(q)] = q_forecast[:, i] + + result = datasets.DatasetDict( + {variate: datasets.Dataset.from_dict(preds) for variate, preds in predictions.items()} + ) + result.set_format("numpy") + return result + + +def _batchify(items: list, batch_size: int): + for i in range(0, len(items), batch_size): + yield items[i : i + batch_size] + + +def _left_pad_and_stack(series: list, max_context_length: int, device: str): + """Left-pad a batch of (num_variates, time) tensors to a common length and return (target, mask).""" + import torch + + series = [s[..., -max_context_length:] for s in series] + context_length = max(s.shape[-1] for s in series) + targets, masks = [], [] + for s in series: + pad = context_length - s.shape[-1] + nan_mask = torch.isnan(s) + targets.append(torch.nn.functional.pad(s.nan_to_num(0.0), (pad, 0))) + masks.append(torch.nn.functional.pad(~nan_mask, (pad, 0))) # padded and NaN positions are masked out + return torch.stack(targets).to(device), torch.stack(masks).to(device) + + +def _interp(quantiles: np.ndarray, level: float) -> np.ndarray: + """Linearly interpolate a quantile `level` from the model's fixed `TOTO_QUANTILES` (along axis 0).""" + if level <= TOTO_QUANTILES[0]: + return quantiles[0] + if level >= TOTO_QUANTILES[-1]: + return quantiles[-1] + hi = int(np.searchsorted(TOTO_QUANTILES, level)) + lo = hi - 1 + weight = (level - TOTO_QUANTILES[lo]) / (TOTO_QUANTILES[hi] - TOTO_QUANTILES[lo]) + return quantiles[lo] * (1 - weight) + quantiles[hi] * weight diff --git a/models/toto-2.0/requirements.txt b/models/toto-2.0/requirements.txt new file mode 100644 index 0000000..84cdc03 --- /dev/null +++ b/models/toto-2.0/requirements.txt @@ -0,0 +1 @@ +toto-2==2.0.0 From bc54a124827f5ac42f23458bb4b96f85fb9e7d67 Mon Sep 17 00:00:00 2001 From: Oleksandr Shchur Date: Wed, 24 Jun 2026 09:14:40 +0000 Subject: [PATCH 2/6] Round Toto 2.0 context length up to a multiple of patch_size --- models/toto-2.0/model.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/models/toto-2.0/model.py b/models/toto-2.0/model.py index f7b8d3f..67e6d03 100644 --- a/models/toto-2.0/model.py +++ b/models/toto-2.0/model.py @@ -73,7 +73,9 @@ def _predict_window( forecasts: list[np.ndarray] = [] # each entry: (num_quantiles, batch, num_variates, horizon) with self._record_inference_time(): for batch in _batchify(series, batch_size): - target, mask = _left_pad_and_stack(batch, self.max_context_length, self.device) + target, mask = _left_pad_and_stack( + batch, self.max_context_length, model.config.patch_size, self.device + ) series_ids = torch.zeros(len(batch), num_variates, dtype=torch.long, device=self.device) quantiles = model.forecast( @@ -106,12 +108,17 @@ def _batchify(items: list, batch_size: int): yield items[i : i + batch_size] -def _left_pad_and_stack(series: list, max_context_length: int, device: str): - """Left-pad a batch of (num_variates, time) tensors to a common length and return (target, mask).""" +def _left_pad_and_stack(series: list, max_context_length: int, patch_size: int, device: str): + """Left-pad a batch of (num_variates, time) tensors to a common length and return (target, mask). + + The context length is rounded up to a multiple of ``patch_size`` (required by Toto's patch embedding); + the extra positions are masked out. + """ import torch series = [s[..., -max_context_length:] for s in series] - context_length = max(s.shape[-1] for s in series) + longest = max(s.shape[-1] for s in series) + context_length = -(-longest // patch_size) * patch_size # round up to a multiple of patch_size targets, masks = [], [] for s in series: pad = context_length - s.shape[-1] From ca4f93e48176044e3f143150e7814ccc37cb73ae Mon Sep 17 00:00:00 2001 From: Oleksandr Shchur Date: Wed, 24 Jun 2026 09:44:18 +0000 Subject: [PATCH 3/6] Use Toto2GluonTSModel predictor instead of hand-rolled inference - Reuse Datadog's GluonTS integration (ffill imputation, scaler fallback, patch-size rounding) via fev's gluonts adapter, mirroring the moirai wrapper. - trained_on_datasets = [] (Toto 2.0 has no overlap with fev datasets). - Drop the seed arg (model is deterministic). --- models/toto-2.0/model.py | 166 ++++++++++++++------------------------- 1 file changed, 57 insertions(+), 109 deletions(-) diff --git a/models/toto-2.0/model.py b/models/toto-2.0/model.py index 67e6d03..ce32b0d 100644 --- a/models/toto-2.0/model.py +++ b/models/toto-2.0/model.py @@ -1,140 +1,88 @@ +import logging +import warnings + import datasets import numpy as np import fev -# Toto 2.0 always returns these nine quantile levels; arbitrary task levels are interpolated from them. -TOTO_QUANTILES = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]) - class Toto2Model(fev.ForecastingModel): """Toto 2.0 model from https://github.com/DataDog/toto (https://pypi.org/project/toto-2).""" model_name = "toto-2.0" - trained_on_datasets = [ - "favorita_transactions_1D", - "fred_md_2025", - "proenfo_gfc12", - "proenfo_gfc14", - "proenfo_gfc17", - "kdd_cup_2022_10T", - "m5_1D", - ] + trained_on_datasets = [] def __init__( self, model_path: str = "Datadog/Toto-2.0-22m", - max_batch_variate_size: int = 24, - max_context_length: int = 4096, + batch_size: int = 128, + context_length: int = 4096, decode_block_size: int | None = None, + as_univariate: bool = False, device: str = "auto", - seed: int = 42, ): super().__init__() self.model_path = model_path - self.max_batch_variate_size = max_batch_variate_size - self.max_context_length = max_context_length + self.batch_size = batch_size + self.context_length = context_length self.decode_block_size = decode_block_size + self.as_univariate = as_univariate self.device = device - self.seed = seed def _fit_predict(self, task: fev.Task) -> list[datasets.DatasetDict]: import torch - from toto2 import Toto2Model as Toto2 + from toto2 import Toto2GluonTSModel, Toto2GluonTSModelConfig, Toto2Model - torch.manual_seed(self.seed) if self.device == "auto": self.device = "cuda" if torch.cuda.is_available() else "cpu" - model = Toto2.from_pretrained(fev.utils.maybe_cache_from_s3(self.model_path)) - model = model.to(self.device).eval() - - return [self._predict_window(window, model, task.quantile_levels) for window in task.iter_windows()] - - def _predict_window( - self, - window: fev.EvaluationWindow, - model, - quantile_levels: list[float], - ) -> datasets.DatasetDict: - import torch + target_columns = ["target"] if self.as_univariate else task.target_columns - target_columns = window.target_columns - num_variates = len(target_columns) - - past_data, _ = window.get_input_data() - past_data = past_data.select_columns(target_columns).cast( - datasets.Features({col: datasets.Sequence(datasets.Value("float32")) for col in target_columns}) + model = Toto2Model.from_pretrained(fev.utils.maybe_cache_from_s3(self.model_path)) + config = Toto2GluonTSModelConfig( + prediction_length=task.horizon, + context_length=self.context_length, + target_dim=len(target_columns), + decode_block_size=self.decode_block_size, + quantiles=task.quantile_levels, ) - # One tensor of shape (num_variates, context_length) per time series item. - series = [torch.tensor(np.stack(list(row.values())), dtype=torch.float32) for row in past_data] - - batch_size = max(1, self.max_batch_variate_size // num_variates) - forecasts: list[np.ndarray] = [] # each entry: (num_quantiles, batch, num_variates, horizon) - with self._record_inference_time(): - for batch in _batchify(series, batch_size): - target, mask = _left_pad_and_stack( - batch, self.max_context_length, model.config.patch_size, self.device + gts_model = Toto2GluonTSModel(model.to(self.device).eval(), config) + predictor = gts_model.create_predictor(batch_size=self.batch_size, device=self.device) + + predictions_per_window = [] + for window in task.iter_windows(): + _, prediction_dataset = fev.convert_input_data(window, adapter="gluonts", as_univariate=self.as_univariate) + with self._record_inference_time(): + with warnings.catch_warnings(): + warnings.simplefilter("ignore", RuntimeWarning) + forecasts = list(predictor.predict(prediction_dataset)) + + flat_predictions = self._flatten_forecasts(forecasts, quantile_levels=task.quantile_levels) + predictions_per_window.append( + fev.utils.combine_univariate_predictions_to_multivariate( + flat_predictions, target_columns=task.target_columns ) - series_ids = torch.zeros(len(batch), num_variates, dtype=torch.long, device=self.device) - - quantiles = model.forecast( - {"target": target, "target_mask": mask, "series_ids": series_ids}, - horizon=window.horizon, - decode_block_size=self.decode_block_size, - has_missing_values=not bool(mask.all()), - ) - forecasts.append(quantiles.cpu().numpy()) - - # (num_quantiles, num_items, num_variates, horizon) - quantiles = np.concatenate(forecasts, axis=1) - predictions = { - variate: {"predictions": _interp(quantiles, 0.5)[:, i]} for i, variate in enumerate(target_columns) - } - for q in quantile_levels: - q_forecast = _interp(quantiles, q) - for i, variate in enumerate(target_columns): - predictions[variate][str(q)] = q_forecast[:, i] - - result = datasets.DatasetDict( - {variate: datasets.Dataset.from_dict(preds) for variate, preds in predictions.items()} - ) - result.set_format("numpy") - return result - - -def _batchify(items: list, batch_size: int): - for i in range(0, len(items), batch_size): - yield items[i : i + batch_size] - - -def _left_pad_and_stack(series: list, max_context_length: int, patch_size: int, device: str): - """Left-pad a batch of (num_variates, time) tensors to a common length and return (target, mask). - - The context length is rounded up to a multiple of ``patch_size`` (required by Toto's patch embedding); - the extra positions are masked out. - """ - import torch - - series = [s[..., -max_context_length:] for s in series] - longest = max(s.shape[-1] for s in series) - context_length = -(-longest // patch_size) * patch_size # round up to a multiple of patch_size - targets, masks = [], [] - for s in series: - pad = context_length - s.shape[-1] - nan_mask = torch.isnan(s) - targets.append(torch.nn.functional.pad(s.nan_to_num(0.0), (pad, 0))) - masks.append(torch.nn.functional.pad(~nan_mask, (pad, 0))) # padded and NaN positions are masked out - return torch.stack(targets).to(device), torch.stack(masks).to(device) - - -def _interp(quantiles: np.ndarray, level: float) -> np.ndarray: - """Linearly interpolate a quantile `level` from the model's fixed `TOTO_QUANTILES` (along axis 0).""" - if level <= TOTO_QUANTILES[0]: - return quantiles[0] - if level >= TOTO_QUANTILES[-1]: - return quantiles[-1] - hi = int(np.searchsorted(TOTO_QUANTILES, level)) - lo = hi - 1 - weight = (level - TOTO_QUANTILES[lo]) / (TOTO_QUANTILES[hi] - TOTO_QUANTILES[lo]) - return quantiles[lo] * (1 - weight) + quantiles[hi] * weight + ) + return predictions_per_window + + @staticmethod + def _flatten_forecasts(forecasts: list, quantile_levels: list[float]) -> datasets.Dataset: + """Flatten GluonTS forecasts into per-variate univariate predictions, interleaved by variate. + + Each (possibly multivariate) forecast is split into one univariate prediction per variate, so the + result is ordered as ``[item0_var0, item0_var1, ..., item1_var0, ...]`` -- the layout expected by + `combine_univariate_predictions_to_multivariate`. + """ + logging.getLogger("gluonts").setLevel(100) + + # The 0.5 quantile is used as the point forecast (Toto 2.0 is quantile-based and has no mean prediction). + forecast_keys = {"predictions": 0.5, **{str(q): q for q in quantile_levels}} + columns = {key: [] for key in forecast_keys} + for f in forecasts: + for key, q in forecast_keys.items(): + arr = np.asarray(f.quantile(q)) # (horizon,) univariate or (horizon, n_variates) multivariate + if arr.ndim == 1: + arr = arr[:, None] + columns[key].extend(arr.T) + return datasets.Dataset.from_dict({key: np.stack(values) for key, values in columns.items()}) From 1620f27992f3e6490a399574fa230126c429dbe0 Mon Sep 17 00:00:00 2001 From: Oleksandr Shchur Date: Wed, 24 Jun 2026 09:54:05 +0000 Subject: [PATCH 4/6] Alias imported Toto2Model to avoid shadowing the wrapper class --- models/toto-2.0/model.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/models/toto-2.0/model.py b/models/toto-2.0/model.py index ce32b0d..a9050eb 100644 --- a/models/toto-2.0/model.py +++ b/models/toto-2.0/model.py @@ -32,14 +32,15 @@ def __init__( def _fit_predict(self, task: fev.Task) -> list[datasets.DatasetDict]: import torch - from toto2 import Toto2GluonTSModel, Toto2GluonTSModelConfig, Toto2Model + from toto2 import Toto2GluonTSModel, Toto2GluonTSModelConfig + from toto2 import Toto2Model as PretrainedToto2 if self.device == "auto": self.device = "cuda" if torch.cuda.is_available() else "cpu" target_columns = ["target"] if self.as_univariate else task.target_columns - model = Toto2Model.from_pretrained(fev.utils.maybe_cache_from_s3(self.model_path)) + model = PretrainedToto2.from_pretrained(fev.utils.maybe_cache_from_s3(self.model_path)) config = Toto2GluonTSModelConfig( prediction_length=task.horizon, context_length=self.context_length, From 801497e7acc0658cbe9be5adaf4b0daae76d0f47 Mon Sep 17 00:00:00 2001 From: Oleksandr Shchur Date: Wed, 24 Jun 2026 11:12:06 +0000 Subject: [PATCH 5/6] Refactor prediction formatting; align defaults with GIFT-Eval notebook - Split univariate/multivariate post-processing into _format_predictions (no more flatten-then-recombine round-trip for the multivariate path). - batch_size 128 -> 512 to match the GIFT-Eval benchmark notebook (context_length=4096 and model_path=Toto-2.0-22m already matched). --- models/toto-2.0/model.py | 54 +++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/models/toto-2.0/model.py b/models/toto-2.0/model.py index a9050eb..64242fb 100644 --- a/models/toto-2.0/model.py +++ b/models/toto-2.0/model.py @@ -16,7 +16,7 @@ class Toto2Model(fev.ForecastingModel): def __init__( self, model_path: str = "Datadog/Toto-2.0-22m", - batch_size: int = 128, + batch_size: int = 512, context_length: int = 4096, decode_block_size: int | None = None, as_univariate: bool = False, @@ -51,6 +51,10 @@ def _fit_predict(self, task: fev.Task) -> list[datasets.DatasetDict]: gts_model = Toto2GluonTSModel(model.to(self.device).eval(), config) predictor = gts_model.create_predictor(batch_size=self.batch_size, device=self.device) + logging.getLogger("gluonts").setLevel(100) + # The 0.5 quantile is used as the point forecast (Toto 2.0 is quantile-based and has no mean prediction). + forecast_keys = {"predictions": 0.5, **{str(q): q for q in task.quantile_levels}} + predictions_per_window = [] for window in task.iter_windows(): _, prediction_dataset = fev.convert_input_data(window, adapter="gluonts", as_univariate=self.as_univariate) @@ -58,32 +62,26 @@ def _fit_predict(self, task: fev.Task) -> list[datasets.DatasetDict]: with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) forecasts = list(predictor.predict(prediction_dataset)) - - flat_predictions = self._flatten_forecasts(forecasts, quantile_levels=task.quantile_levels) - predictions_per_window.append( - fev.utils.combine_univariate_predictions_to_multivariate( - flat_predictions, target_columns=task.target_columns - ) - ) + predictions_per_window.append(self._format_predictions(forecasts, task, forecast_keys)) return predictions_per_window - @staticmethod - def _flatten_forecasts(forecasts: list, quantile_levels: list[float]) -> datasets.Dataset: - """Flatten GluonTS forecasts into per-variate univariate predictions, interleaved by variate. - - Each (possibly multivariate) forecast is split into one univariate prediction per variate, so the - result is ordered as ``[item0_var0, item0_var1, ..., item1_var0, ...]`` -- the layout expected by - `combine_univariate_predictions_to_multivariate`. - """ - logging.getLogger("gluonts").setLevel(100) - - # The 0.5 quantile is used as the point forecast (Toto 2.0 is quantile-based and has no mean prediction). - forecast_keys = {"predictions": 0.5, **{str(q): q for q in quantile_levels}} - columns = {key: [] for key in forecast_keys} - for f in forecasts: - for key, q in forecast_keys.items(): - arr = np.asarray(f.quantile(q)) # (horizon,) univariate or (horizon, n_variates) multivariate - if arr.ndim == 1: - arr = arr[:, None] - columns[key].extend(arr.T) - return datasets.Dataset.from_dict({key: np.stack(values) for key, values in columns.items()}) + def _format_predictions( + self, forecasts: list, task: fev.Task, forecast_keys: dict[str, float] + ) -> datasets.DatasetDict: + """Format GluonTS forecasts into a `DatasetDict` keyed by target column, as expected by fev.""" + if self.as_univariate: + # One univariate forecast per (item, variate), interleaved by variate; `f.quantile(q)` is (horizon,). + flat = datasets.Dataset.from_dict( + {key: np.stack([f.quantile(q) for f in forecasts]) for key, q in forecast_keys.items()} + ) + return fev.utils.combine_univariate_predictions_to_multivariate(flat, target_columns=task.target_columns) + else: + # One multivariate forecast per item; `f.quantile(q)` is (horizon, n_variates). + return datasets.DatasetDict( + { + col: datasets.Dataset.from_dict( + {key: np.stack([f.quantile(q)[:, i] for f in forecasts]) for key, q in forecast_keys.items()} + ) + for i, col in enumerate(task.target_columns) + } + ) From b7e59b42281e3428f7c7a306a4c55559635fdeed Mon Sep 17 00:00:00 2001 From: Oleksandr Shchur Date: Wed, 24 Jun 2026 12:30:36 +0000 Subject: [PATCH 6/6] Fix single-target tasks: model squeezes the variate axis when target_dim=1 --- models/toto-2.0/model.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/models/toto-2.0/model.py b/models/toto-2.0/model.py index 64242fb..02fdd9b 100644 --- a/models/toto-2.0/model.py +++ b/models/toto-2.0/model.py @@ -76,12 +76,15 @@ def _format_predictions( ) return fev.utils.combine_univariate_predictions_to_multivariate(flat, target_columns=task.target_columns) else: - # One multivariate forecast per item; `f.quantile(q)` is (horizon, n_variates). + # One forecast per item, reshaped to (num_items, horizon, n_variates). The model squeezes the + # variate axis for single-target tasks, so `f.quantile(q)` is (horizon,) there and (horizon, n_var) else. + quantiles = { + key: np.stack([f.quantile(q).reshape(task.horizon, -1) for f in forecasts]) + for key, q in forecast_keys.items() + } return datasets.DatasetDict( { - col: datasets.Dataset.from_dict( - {key: np.stack([f.quantile(q)[:, i] for f in forecasts]) for key, q in forecast_keys.items()} - ) + col: datasets.Dataset.from_dict({key: arr[..., i] for key, arr in quantiles.items()}) for i, col in enumerate(task.target_columns) } )