From 006a88fc04354b2161b5d64b22091cb372ad95eb Mon Sep 17 00:00:00 2001 From: Christian Lessig Date: Fri, 29 May 2026 16:35:08 +0200 Subject: [PATCH 1/4] Custom data reader for operational analysis that accounts for actual availability --- .../datasets/data_reader_anemoi_operan.py | 179 ++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 src/weathergen/datasets/data_reader_anemoi_operan.py diff --git a/src/weathergen/datasets/data_reader_anemoi_operan.py b/src/weathergen/datasets/data_reader_anemoi_operan.py new file mode 100644 index 000000000..34d3eda44 --- /dev/null +++ b/src/weathergen/datasets/data_reader_anemoi_operan.py @@ -0,0 +1,179 @@ +# (C) Copyright 2025 WeatherGenerator contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +import logging +from pathlib import Path +from typing import override + +import numpy as np +from anemoi.datasets.data import MissingDateError + +from weathergen.datasets.data_reader_anemoi import DataReaderAnemoi +from weathergen.datasets.data_reader_base import ( + ReaderData, + TimeWindowHandler, + TIndex, +) +from weathergen.train.utils import Stage + +_logger = logging.getLogger(__name__) + + +def dt2cal(dt): + """ + Convert array of datetime64 to a calendar array of year, month, day, hour, + minute, seconds, microsecond with these quantites indexed on the last axis. + + Parameters + ---------- + dt : datetime64 array (...) + numpy.ndarray of datetimes of arbitrary shape + + Returns + ------- + cal : uint32 array (..., 7) + calendar array with last axis representing year, month, day, hour, + minute, second, microsecond + """ + + # allocate output + out = np.empty(dt.shape + (7,), dtype="u4") + # decompose calendar floors + year, month, day, hour, min, sec = [dt.astype(f"M8[{x}]") for x in "YMDhms"] + out[..., 0] = year + 1970 # Gregorian Year + out[..., 1] = (month - year) + 1 # month + out[..., 2] = (day - month) + 1 # dat + out[..., 3] = (dt - day).astype("m8[h]") # hour + out[..., 4] = (dt - hour).astype("m8[m]") # minute + out[..., 5] = (dt - min).astype("m8[s]") # second + out[..., 6] = (dt - sec).astype("m8[us]") # microsecond + return out + + +class DataReaderAnemoiOperan(DataReaderAnemoi): + "Wrapper for Anemoi datasets" + + def __init__( + self, + tw_handler: TimeWindowHandler, + filename: Path, + stream_info: dict, + stage: Stage, + ) -> None: + """ + Construct data reader for anemoi dataset + + Parameters + ---------- + filename : + filename (and path) of dataset + stream_info : + information about stream + + Returns + ------- + None + """ + + super().__init__(tw_handler, filename, stream_info, stage) + + @override + def _get(self, idx: TIndex, channels_idx: list[int]) -> ReaderData: + """ + Get data for window (for either source or target, through public interface) + + Parameters + ---------- + idx : int + Index of temporal window + channels_idx : np.array + Selection of channels + + Returns + ------- + ReaderData providing coords, geoinfos, data, datetimes + """ + + t_idxs, dtr = self._get_dataset_idxs(idx) + # get additional timestep to ensure we have one datapoint available + t_idxs = np.insert(t_idxs, 0, t_idxs[0] - 1) + + didx_start = t_idxs[0] + didx_end = t_idxs[-1] + 1 + datetimes = self.ds.dates[didx_start:didx_end] + datetimes_split = dt2cal(datetimes) + + # compute corrected datetimes that account for actual availability + nts = self.stream_info["nominal_time_mapping"] + deltas = [int(nts[str(hour)]) - int(hour) for hour in datetimes_split[:, 3]] + datetimes_offset = [ + dt + np.timedelta64(delta, "h") for dt, delta in zip(datetimes, deltas, strict=False) + ] + + # use latest available sample that is valid w.r.t the input data window + datetimes_mask = [dt < dtr.end for dt in datetimes_offset] + t_idxs = [t_idxs[datetimes_mask][-1].item()] + assert len(t_idxs) == 1 + + # _get from DataReaderAnemoi + + if self.ds is None or self.len == 0 or len(t_idxs) == 0: + return ReaderData.empty( + num_data_fields=len(channels_idx), num_geo_fields=len(self.geoinfo_idx) + ) + + assert t_idxs[0] >= 0, "index must be non-negative" + didx_start = t_idxs[0] + # End is inclusive + didx_end = t_idxs[-1] + 1 + + # extract number of time steps and collapse ensemble dimension + # ds is a wrapper around zarr with get_coordinate_selection not being exposed since + # subsetting is pushed to the ctor via frequency argument; this also ensures that no sub- + # sampling is required here + try: + data = self.ds[didx_start:didx_end][:, :, 0].astype(np.float32) + except MissingDateError as e: + _logger.debug(f"Date not present in anemoi dataset: {str(e)}. Skipping.") + return ReaderData.empty( + num_data_fields=len(channels_idx), num_geo_fields=len(self.geoinfo_idx) + ) + + # coords-first representation and collapse multiple steps + data = data.transpose([0, 2, 1]).reshape((data.shape[0] * data.shape[2], -1)) + + # extract geoinfo channels (can be time-varying, so read from dataset) + geoinfos = data[:, list(self.geoinfo_idx)] + # extract channels + data = data[:, list(channels_idx)] + + # construct lat/lon coords + latlon = np.concatenate( + [ + np.expand_dims(self.latitudes, 0), + np.expand_dims(self.longitudes, 0), + ], + axis=0, + ).transpose() + # repeat latlon len(t_idxs) times + coords = np.vstack((latlon,) * len(t_idxs)) + + # date time matching #data points of data + # Assuming a fixed frequency for the dataset + datetimes = np.repeat(self.ds.dates[didx_start:didx_end], len(data) // len(t_idxs)) + + rd = ReaderData( + coords=coords, + geoinfos=geoinfos, + data=data, + datetimes=datetimes, + ) + # check_reader_data(rd, dtr) + + return rd From 16f7c6e883a85c845ab85c41f17a68916d17c288 Mon Sep 17 00:00:00 2001 From: Christian Lessig Date: Fri, 29 May 2026 16:36:11 +0200 Subject: [PATCH 2/4] Reduce mem footprint during training; add DataReaderAnemoiOperan --- .../datasets/multi_stream_data_sampler.py | 17 +++++++++++++++-- src/weathergen/datasets/stream_data.py | 16 +++------------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/weathergen/datasets/multi_stream_data_sampler.py b/src/weathergen/datasets/multi_stream_data_sampler.py index fed004409..df2bda96d 100644 --- a/src/weathergen/datasets/multi_stream_data_sampler.py +++ b/src/weathergen/datasets/multi_stream_data_sampler.py @@ -19,6 +19,7 @@ from weathergen.common.io import IOReaderData from weathergen.datasets.batch import ModelBatch from weathergen.datasets.data_reader_anemoi import DataReaderAnemoi +from weathergen.datasets.data_reader_anemoi_operan import DataReaderAnemoiOperan from weathergen.datasets.data_reader_base import ( DataReaderBase, TimeWindowHandler, @@ -33,7 +34,7 @@ get_tokens_lens, ) from weathergen.readers_extra.registry import get_extra_reader -from weathergen.train.utils import Stage, get_batch_size_from_config +from weathergen.train.utils import TRAIN, Stage, get_batch_size_from_config from weathergen.utils.distributed import is_root type AnyDataReader = DataReaderBase | DataReaderAnemoi | DataReaderObs @@ -225,6 +226,8 @@ def _init_stream_datasets(self, cf) -> dict[StreamName, list[AnyDataReader]]: dataset = DataReaderObs case "anemoi": dataset = DataReaderAnemoi + case "anemoi_operan": + dataset = DataReaderAnemoiOperan case "fesom": dataset = DataReaderFesom case type_name: @@ -428,7 +431,13 @@ def _build_stream_data_input( ) # collect data for stream - stream_data.add_source(step, rdata, source_cells_lens, source_cells) + source_raw = None + if self._stage == TRAIN: + del source_raw + source_raw = None + stream_data.add_source( + step, source_raw, source_cells_lens, source_cells, rdata.is_spoof + ) return stream_data @@ -479,6 +488,10 @@ def _build_stream_data_output( (time_win_target.start, time_win_target.end), target_mask, ) + + if self._stage == TRAIN: + del idxs_inv + idxs_inv = None stream_data.add_target_values( timestep_idx, tt_cells, tt_c, tt_t, idxs_inv, rdata.is_spoof ) diff --git a/src/weathergen/datasets/stream_data.py b/src/weathergen/datasets/stream_data.py index e993a0f03..2c6083455 100644 --- a/src/weathergen/datasets/stream_data.py +++ b/src/weathergen/datasets/stream_data.py @@ -126,13 +126,6 @@ def pin_memory(self): self.source_tokens_cells = _pin_tensor_list(self.source_tokens_cells) self.source_tokens_lens = _pin_tensor_list(self.source_tokens_lens) self.source_idxs_embed = _pin_tensor_list(self.source_idxs_embed) - self.source_idxs_embed_pe = _pin_tensor_list(self.source_idxs_embed_pe) - - # Pin source_raw (list of IOReaderData objects) - if hasattr(self, "source_raw"): - for raw_data in self.source_raw: - if raw_data is not None and hasattr(raw_data, "pin_memory"): - raw_data.pin_memory() return self @@ -163,14 +156,11 @@ def to_device(self, device: str) -> None: self.source_tokens_lens = [s.to(dv, non_blocking=True) for s in self.source_tokens_lens] self.source_idxs_embed = [s.to(dv, non_blocking=True) for s in self.source_idxs_embed] - self.source_idxs_embed_pe = [ - s.to(dv, non_blocking=True) for s in self.source_idxs_embed_pe - ] return self def add_source( - self, step: int, ss_raw: IOReaderData, ss_lens: torch.Tensor, ss_cells: list + self, step: int, ss_raw: IOReaderData, ss_lens: torch.Tensor, ss_cells: list, is_spoof: bool ) -> None: """ Add data for source for one input. @@ -189,14 +179,14 @@ def add_source( assert step < self.input_steps - self.source_raw[step] = ss_raw + # self.source_raw[step] = ss_raw self.source_tokens_lens[step] = ss_lens self.source_tokens_cells[step] = torch.stack(ss_cells) idx = torch.isnan(self.source_tokens_cells[step]) self.source_tokens_cells[step][idx] = self.mask_value - self.source_is_spoof[step] = ss_raw.is_spoof + self.source_is_spoof[step] = is_spoof def add_target( self, From 3203fd8e0e6298c2ecfb825727c043cca637eb78 Mon Sep 17 00:00:00 2001 From: Christian Lessig Date: Fri, 29 May 2026 16:37:02 +0200 Subject: [PATCH 3/4] Config files --- ...peran_georing_avhrr_forecasting_lowres.yml | 273 ++++++++++++++++++ .../avhrr.yml | 37 +++ .../era5.yml | 48 +++ .../era5_out.yml | 41 +++ .../geos.yml | 174 +++++++++++ .../synop.yml | 43 +++ 6 files changed, 616 insertions(+) create mode 100644 config/config_operan_georing_avhrr_forecasting_lowres.yml create mode 100644 config/streams/operan_georing_avhrr_synop_lowres/avhrr.yml create mode 100644 config/streams/operan_georing_avhrr_synop_lowres/era5.yml create mode 100644 config/streams/operan_georing_avhrr_synop_lowres/era5_out.yml create mode 100644 config/streams/operan_georing_avhrr_synop_lowres/geos.yml create mode 100644 config/streams/operan_georing_avhrr_synop_lowres/synop.yml diff --git a/config/config_operan_georing_avhrr_forecasting_lowres.yml b/config/config_operan_georing_avhrr_forecasting_lowres.yml new file mode 100644 index 000000000..63edaf4fa --- /dev/null +++ b/config/config_operan_georing_avhrr_forecasting_lowres.yml @@ -0,0 +1,273 @@ +# (C) Copyright 2025 WeatherGenerator contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +embed_orientation: "channels" +embed_unembed_mode: "block" +embed_dropout_rate: 0.1 + +ae_local_dim_embed: 2048 +ae_local_num_blocks: 4 +ae_local_num_heads: 16 +ae_local_dropout_rate: 0.1 +ae_local_with_qk_lnorm: True +ae_local_max_tokens_per_cell: 128 + +ae_local_num_queries: 1 +ae_local_queries_per_cell: False +ae_adapter_num_heads: 16 +ae_adapter_embed: 128 +ae_adapter_with_qk_lnorm: True +ae_adapter_with_residual: True +ae_adapter_dropout_rate: 0.1 + +ae_global_dim_embed: 2048 +ae_global_num_blocks: 4 +ae_global_num_heads: 32 +ae_global_dropout_rate: 0.1 +ae_global_with_qk_lnorm: True +# TODO: switching to < 1 triggers triton-related issues. +# See https://github.com/ecmwf/WeatherGenerator/issues/1050 +ae_global_att_dense_rate: 1.0 +ae_global_block_factor: 64 +ae_global_mlp_hidden_factor: 2 +ae_global_trailing_layer_norm: False + +ae_aggregation_num_blocks: 0 +ae_aggregation_num_heads: 32 +ae_aggregation_dropout_rate: 0.1 +ae_aggregation_with_qk_lnorm: True +ae_aggregation_att_dense_rate: 1.0 +ae_aggregation_block_factor: 64 +ae_aggregation_mlp_hidden_factor: 2 + +decoder_type: PerceiverIOCoordConditioning # Main options PerceiverIOCoordConditioning or Linear +pred_adapter_kv: False +pred_self_attention: True +pred_dyadic_dims: False +pred_mlp_adaln: True +num_class_tokens: 0 +num_register_tokens: 0 + +# number of steps offset applied to first target window; if set to zero and forecast_steps=0 then +# one is training an auto-encoder +fe_num_blocks: 16 +fe_num_heads: 32 +fe_dropout_rate: 0.1 +fe_with_qk_lnorm: True +fe_layer_norm_after_blocks: [7] # Index starts at 0. Thus, [3] adds a LayerNorm after the fourth layer +fe_impute_latent_noise_std: 1e-4 +# currently fixed to 1.0 (due to limitations with flex_attention and triton) +forecast_att_dense_rate: 1.0 + +healpix_level: 5 + +rope_2D: False + +with_mixed_precision: True +with_flash_attention: True +compile_model: False +with_fsdp: True +attention_dtype: bf16 +mixed_precision_dtype: bf16 +mlp_norm_eps: 1e-5 +norm_eps: 1e-4 + +latent_noise_kl_weight: 0.0 # 1e-5 +latent_noise_gamma: 2.0 +latent_noise_saturate_encodings: 5 +latent_noise_use_additive_noise: False +latent_noise_deterministic_latents: True + +freeze_modules: "" +load_chkpt: {} + +norm_type: "LayerNorm" + +##################################### + +# streams_directory: "./config/streams/era5_georing_avhrr/" +streams_directory: "./config/streams/operan_georing_avhrr_synop_lowres/" +streams: ??? + +# type of zarr_store +zarr_store: "zip" # "zarr" for LocalStore, "zip" for ZipStore + +general: + + # mutable parameters + istep: 0 + rank: ??? + world_size: ??? + + # local_rank, + # with_ddp, + # data_path_*, + # model_path, + # run_path, + # path_shared_ + + multiprocessing_method: "fork" + + desc: "" + run_id: ??? + run_history: [] + +# logging frequency in the training loop (in number of batches) +train_logging: + terminal: 10 + metrics: 20 + checkpoint: 500 + +# parameters for data loading +data_loading : + + num_workers: 12 + rng_seed: ??? + repeat_data_in_mini_epoch : False + + +# config for training +training_config: + + # training_mode: "masking", "student_teacher", "latent_loss" + training_mode: ["masking"] + + num_mini_epochs: 56 + samples_per_mini_epoch: 4096 + shuffle: True + + start_date: 2016-01-01T00:00 + end_date: 2022-12-31T00:00 + + time_window_step: 01:00:00 + time_window_len: 06:00:00 + + learning_rate_scheduling : + lr_start: 1e-6 + lr_max: 5e-5 + lr_final_decay: 2e-6 + lr_final: 0.0 + num_steps_warmup: 256 + num_steps_cooldown: 512 + policy_warmup: "cosine" + policy_decay: "constant" + policy_cooldown: "linear" + parallel_scaling_policy: "sqrt" + + optimizer: + grad_clip: 1.0 + weight_decay: 0.1 + log_grad_norms: False + adamw : + # parameters are scaled by number of DDP workers + beta1 : 0.98125 # == 0.85 on 2 nodes x 4 gpus + beta2 : 0.9875 # == 0.90 on 2 nodes x 4 gpus + eps : 2e-08 + + losses : { + "physical": { + type: LossPhysical, + loss_fcts: { "mse": { }, }, + }, + } + + model_input: { + "source_masking" : { + # masking strategy: "random", "healpix", "forecast" + masking_strategy: "forecast", + }, + } + + forecast : + time_step: 06:00:00 + offset: 1 + num_steps: 2 + policy: "fixed" + + +# validation config; full validation config is merge of training and validation config +validation_config: + + samples_per_mini_epoch: 256 + shuffle: False + + start_date: 2023-10-01T00:00 + end_date: 2023-12-31T00:00 + + time_window_step: 06:00:00 + time_window_len: 06:00:00 + + # whether to track the exponential moving average of weights for validation + validate_with_ema: + enabled : True + ema_ramp_up_ratio: 0.09 + ema_halflife_in_thousands: 1e-3 + + # parameters for validation samples that are written to disk + output : { + # number of samples that are written + num_samples: 0, + # write samples in normalized model space + normalized_samples: False, + # output streams to write; default all + streams: null, + } + + # run validation before training starts (mainly for model development) + validate_before_training: False + + +# test config; full test config is merge of validation and test config +test_config: + + samples_per_mini_epoch: 128 + shuffle: False + + start_date: 2023-06-01T00:00 + end_date: 2023-08-31T00:00 + + # parameters for validation samples that are written to disk + output : { + # number of samples that are written + num_samples: 128, + # write samples in normalized model space + normalized_samples: False, + # output streams to write; default all + streams: null, + } + + + +# Tags for experiment tracking +# These tags will be logged in MLFlow along with completed runs for train, eval, val +# The tags are free-form, with the following rules: +# - tags should be primitive types (strings, numbers, booleans). NO lists or dictionaries +# - tags should not duplicate existing config entries. +# - try to reuse existing tags where possible. MLFlow does not like having too many unique tags +# - do not use long strings in values (less than 20 characters is a good rule of thumb, we may enforce this in the future) +wgtags: + # The name of the organization of the person running the experiment. + # This may be autofilled in the future. Expected values are lowercase strings + # e.g. "ecmwf", "cmcc", "metnor", "jsc", "escience" + org: null + # The Github issue corresponding to this run (number such as 1234) + # Github issues are the central point when running experiment and contain + # links to hedgedocs, code branches, pull requests etc. + # It is recommended to associate a run with a Github issue. + issue: null + # The name of the experiment. This is a distinctive codename for the experiment campaign being run. + # This is expected to be the primary tag for comparing experiments in MLFlow, along with the + # issue number. + # Expected values are lowercase strings with no spaces, just underscores: + # Examples: "rollout_ablation_grid" + exp: null + # *** Experiment-specific tags *** + # All extra tags (including lists, dictionaries, etc.) are treated + # as strings by mlflow, so treat all extra tags as simple string key: value pairs. + grid: null diff --git a/config/streams/operan_georing_avhrr_synop_lowres/avhrr.yml b/config/streams/operan_georing_avhrr_synop_lowres/avhrr.yml new file mode 100644 index 000000000..7bba57e91 --- /dev/null +++ b/config/streams/operan_georing_avhrr_synop_lowres/avhrr.yml @@ -0,0 +1,37 @@ +# (C) Copyright 2024 WeatherGenerator contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +METOP_ABC_AVHRR_IASI : + type : obs + stream_id : 20 + filenames : ['observations-ea-ofb-0001-2007-2021-metop-a-iasi-radiances-v1.zarr', 'observations-ea-ofb-0001-2013-2023-metop-b-iasi-radiances-v1.zarr', 'observations-ea-ofb-0001-2019-2023-metop-c-iasi-radiances-v1.zarr'] + geoinfo_channels : ['cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day'] + loss_weight : 1.0 + masking_override : + model_input : + masking_strategy_config : + rate: 1.0 + token_size : 512 # 256 + forcing: True + embed : + net : transformer + num_tokens : 1 + num_heads : 2 + dim_embed : 256 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 256 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 + diff --git a/config/streams/operan_georing_avhrr_synop_lowres/era5.yml b/config/streams/operan_georing_avhrr_synop_lowres/era5.yml new file mode 100644 index 000000000..8739bf0cd --- /dev/null +++ b/config/streams/operan_georing_avhrr_synop_lowres/era5.yml @@ -0,0 +1,48 @@ +# (C) Copyright 2024 WeatherGenerator contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +ERA5_in : + type : anemoi_operan + # filenames : ['aifs-ea-an-oper-0001-mars-o96-1979-2024-1h-v3-with-era51.zarr'] + filenames : ['aifs-od-an-oper-0001-mars-o96-2016-2023-6h-v6.zarr'] + stream_id : 0 + source: ['q_150', 'q_200', 'q_250', 'q_300', 'q_400', 'q_500', 'q_600', 'q_700', 'q_850', 'q_925', 'q_1000', 't_50', 't_100', 't_150', 't_200', 't_250', 't_300', 't_400', 't_500', 't_600', 't_700', 't_850', 't_925', 't_1000', 'u_50', 'u_100', 'u_150', 'u_200', 'u_250', 'u_300', 'u_400', 'u_500', 'u_600', 'u_700', 'u_850', 'u_925', 'u_1000', 'v_50', 'v_100', 'v_150', 'v_200', 'v_250', 'v_300', 'v_400', 'v_500', 'v_600', 'v_700', 'v_850', 'v_925', 'v_1000', 'z_50', 'z_100', 'z_150', 'z_200', 'z_250', 'z_300', 'z_400', 'z_500', 'z_600', 'z_700', 'z_850', 'z_925', 'z_1000', '10u', '10v', '2d', '2t', 'msl'] + target: ['q_150', 'q_200', 'q_250', 'q_300', 'q_400', 'q_500', 'q_600', 'q_700', 'q_850', 'q_925', 'q_1000', 't_50', 't_100', 't_150', 't_200', 't_250', 't_300', 't_400', 't_500', 't_600', 't_700', 't_850', 't_925', 't_1000', 'u_50', 'u_100', 'u_150', 'u_200', 'u_250', 'u_300', 'u_400', 'u_500', 'u_600', 'u_700', 'u_850', 'u_925', 'u_1000', 'v_50', 'v_100', 'v_150', 'v_200', 'v_250', 'v_300', 'v_400', 'v_500', 'v_600', 'v_700', 'v_850', 'v_925', 'v_1000', 'z_50', 'z_100', 'z_150', 'z_200', 'z_250', 'z_300', 'z_400', 'z_500', 'z_600', 'z_700', 'z_850', 'z_925', 'z_1000', '10u', '10v', '2d', '2t', 'msl'] + geoinfo_channels : ['z', 'lsm', 'slor', 'sdor', 'insolation', 'cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day'] + loss_weight : 1. + location_weight : cosine_latitude + masking_override : + model_input : + masking_strategy_config : + rate: 0.1 + token_size : 8 + tokenize_spacetime : True + max_num_targets: -1 + forcing: True + frequency : 06:00:00 + nominal_time_mapping : + "0" : 5 # 04:30:00 + "6" : 9 # 09:00:00 + "12" : 17 #16:30:00 + "18" : 21 #21:00:00 + embed : + net : transformer + num_tokens : 1 + num_heads : 8 + dim_embed : 512 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 256 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 diff --git a/config/streams/operan_georing_avhrr_synop_lowres/era5_out.yml b/config/streams/operan_georing_avhrr_synop_lowres/era5_out.yml new file mode 100644 index 000000000..9a30b289c --- /dev/null +++ b/config/streams/operan_georing_avhrr_synop_lowres/era5_out.yml @@ -0,0 +1,41 @@ +# (C) Copyright 2024 WeatherGenerator contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +ERA5 : + type : anemoi + filenames : ['aifs-ea-an-oper-0001-mars-o96-1979-2024-1h-v3-with-era51.zarr'] + stream_id : 1 + source_exclude : ['z', 'w_10', 'w_50', 'w_100', 'w_150', 'w_200', 'w_250', 'w_300', 'w_400', 'w_500', 'w_600', 'w_700', 'w_850', 'w_925', 'w_1000', 'skt', 'tcw', 'cp', 'tp', 'q_50', 'q_100'] + target_exclude : ['z', 'w_10', 'w_50', 'w_100', 'w_150', 'w_200', 'w_250', 'w_300', 'w_400', 'w_500', 'w_600', 'w_700', 'w_850', 'w_925', 'w_1000', 'slor', 'sdor', 'tcw', 'cp', 'tp', 'q_50', 'q_100'] + geoinfo_channels : ['z', 'lsm', 'slor', 'sdor', 'insolation', 'cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day'] + loss_weight : 1. + location_weight : cosine_latitude + masking_override : + target_input : + masking_strategy_config : + rate: 1.0 + token_size : 8 + tokenize_spacetime : False + max_num_targets: -1 + diagnostic: True + embed : + net : transformer + num_tokens : 1 + num_heads : 4 + dim_embed : 256 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 512 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 diff --git a/config/streams/operan_georing_avhrr_synop_lowres/geos.yml b/config/streams/operan_georing_avhrr_synop_lowres/geos.yml new file mode 100644 index 000000000..87c504574 --- /dev/null +++ b/config/streams/operan_georing_avhrr_synop_lowres/geos.yml @@ -0,0 +1,174 @@ +# (C) Copyright 2024 WeatherGenerator contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +METEOSAT_SEVIRI_IR : + type : obs + stream_id : 10 + filenames : ['observations-file-2014-2024-seviri-o256-wegen-v3.zarr'] + geoinfo_channels : ['cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day', 'zenith', 'cos_sza'] + loss_weight : 1.0 + location_weight : cosine_latitude + masking_override : + model_input : + masking_strategy: "random" + masking_strategy_config : + rate: 1.0 + token_size : 1024 + tokenize_spacetime : False + max_num_targets: 131072 + forcing: True + embed : + net : transformer + num_tokens : 1 + num_heads : 4 + dim_embed : 512 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 512 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 + + +GOES_ABI_IR : + type : obs + stream_id : 11 + filenames : ['observations-file-2017-2024-abi-goes16-IR-o256-v2.zarr'] + geoinfo_channels : ['cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day','zenith', 'cos_sza'] + loss_weight : 1.0 + location_weight : cosine_latitude + masking_override : + model_input : + masking_strategy: "random" + masking_strategy_config : + rate: 1.0 + token_size : 1024 + tokenize_spacetime : False + max_num_targets: 131072 + forcing: True + embed : + net : transformer + num_tokens : 1 + num_heads : 4 + dim_embed : 512 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 512 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 + + +HIMAWARI_AHI_IR : + type : obs + stream_id : 12 + filenames : ['observations-file-2015-2022-himawari8-IR-o256-v1.zarr', 'observations-file-2022-2024-himawari9-IR-o256-v1.zarr'] + geoinfo_channels : ['cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day', 'zenith', 'cos_sza'] + loss_weight : 1.0 + location_weight : cosine_latitude + masking_override : + model_input : + masking_strategy: "random" + masking_strategy_config : + rate: 1.0 + token_size : 1024 + tokenize_spacetime : False + max_num_targets: 131072 + forcing: True + embed : + net : transformer + num_tokens : 1 + num_heads : 4 + dim_embed : 512 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 512 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 + + +GOES_ABI_VIS : + type : obs + stream_id : 13 + filenames : ['observations-file-2017-2024-abi-goes16-VIS-o256-v2.zarr'] + # geoinfo_channels : ['cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day', 'zenith', 'cos_sza'] + geoinfo_channels : ['zenith'] + loss_weight : 1.0 + location_weight : cosine_latitude + masking_override : + model_input : + masking_strategy: "random" + masking_strategy_config : + rate: 1.0 + token_size : 1024 + tokenize_spacetime : False + max_num_targets: 131072 + forcing: True + embed : + net : transformer + num_tokens : 1 + num_heads : 4 + dim_embed : 512 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 512 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 + + +HIMAWARI_AHI_VIS : + type : obs + stream_id : 14 + filenames : ['observations-file-2015-2022-himawari8-VIS-o256-v1.zarr', 'observations-file-2022-2024-himawari9-VIS-o256-v1.zarr'] + # geoinfo_channels : ['cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day', 'zenith', 'cos_sza'] + geoinfo_channels : ['zenith'] + loss_weight : 1.0 + location_weight : cosine_latitude + masking_override : + model_input : + masking_strategy: "random" + masking_strategy_config : + rate: 1.0 + token_size : 1024 + tokenize_spacetime : False + max_num_targets: 131072 + forcing: True + embed : + net : transformer + num_tokens : 1 + num_heads : 4 + dim_embed : 512 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 512 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 \ No newline at end of file diff --git a/config/streams/operan_georing_avhrr_synop_lowres/synop.yml b/config/streams/operan_georing_avhrr_synop_lowres/synop.yml new file mode 100644 index 000000000..df796579b --- /dev/null +++ b/config/streams/operan_georing_avhrr_synop_lowres/synop.yml @@ -0,0 +1,43 @@ +# (C) Copyright 2024 WeatherGenerator contributors. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. + +SurfaceCombined : + type : obs + stream_id : 30 + filenames : ['observations-ea-ofb-0001-1979-2025-combined-surface-v5.zarr'] + # filenames : ['observations-ea-ofb-0001-1979-2023-combined-surface-v2.zarr'] + # filenames : ['observations-ea-ofb-0001-1979-2022-combined-surface-v3-fixed-land-spatial80-lsm09-min10.zarr', 'observations-ea-ofb-0001-2023-combined-surface-v3-fixed-land-heldout20-lsm09-min10.zarr'] + source: ['obsvalue_tsts_0', 'obsvalue_t2m_0', 'obsvalue_td2m_0', 'obsvalue_u10m_0', 'obsvalue_v10m_0', 'obsvalue_pmsl_0', 'obsvalue_ps_0'] + target: ['obsvalue_tsts_0', 'obsvalue_t2m_0', 'obsvalue_td2m_0', 'obsvalue_u10m_0', 'obsvalue_v10m_0', 'obsvalue_pmsl_0', 'obsvalue_ps_0'] + geoinfo_channels : ['stalt', 'lsm', 'cos_sza', 'cos_local_time', 'sin_local_time', 'cos_julian_day', 'sin_julian_day'] + loss_weight : 1.0 + token_size : 64 + masking_override : + model_input : + masking_strategy: "random" + masking_strategy_config : + rate: 0.1 + tokenize_spacetime : False + max_num_targets: -1 + # diagnostic: True + embed : + net : transformer + num_tokens : 1 + num_heads : 2 + dim_embed : 512 + num_blocks : 2 + embed_target_coords : + net : linear + dim_embed : 512 + target_readout : + num_layers : 2 + num_heads : 4 + pred_head : + ens_size : 1 + num_layers : 1 From 868715bf3a117671e0228d230e2c36d6837174ae Mon Sep 17 00:00:00 2001 From: Christian Lessig Date: Fri, 29 May 2026 21:22:43 +0200 Subject: [PATCH 4/4] Fixed corner case: sample 0 that has no precedessor --- src/weathergen/datasets/data_reader_anemoi_operan.py | 6 ++++-- src/weathergen/datasets/multi_stream_data_sampler.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/weathergen/datasets/data_reader_anemoi_operan.py b/src/weathergen/datasets/data_reader_anemoi_operan.py index 34d3eda44..14347b3d1 100644 --- a/src/weathergen/datasets/data_reader_anemoi_operan.py +++ b/src/weathergen/datasets/data_reader_anemoi_operan.py @@ -118,8 +118,10 @@ def _get(self, idx: TIndex, channels_idx: list[int]) -> ReaderData: # use latest available sample that is valid w.r.t the input data window datetimes_mask = [dt < dtr.end for dt in datetimes_offset] - t_idxs = [t_idxs[datetimes_mask][-1].item()] - assert len(t_idxs) == 1 + if np.array(datetimes_mask).sum() == 0: + t_idxs = [] + else: + t_idxs = [t_idxs[datetimes_mask][-1].item()] # _get from DataReaderAnemoi diff --git a/src/weathergen/datasets/multi_stream_data_sampler.py b/src/weathergen/datasets/multi_stream_data_sampler.py index df2bda96d..e438b0850 100644 --- a/src/weathergen/datasets/multi_stream_data_sampler.py +++ b/src/weathergen/datasets/multi_stream_data_sampler.py @@ -205,7 +205,7 @@ def _calc_baseperms(self, fsm: int) -> np.typing.NDArray: perms_len = int(self.index_range.end - self.index_range.start) perms_len -= (fsm + self.output_offset) * (self.time_step // self.step_timedelta) - return np.arange(perms_len) + return np.arange(1, perms_len) def _init_stream_datasets(self, cf) -> dict[StreamName, list[AnyDataReader]]: """Load dataset readers for all streams from config."""