From f4cae4affbe58688e5cdb09333c40b2657a4c70c Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Sun, 17 May 2026 20:49:49 +0200 Subject: [PATCH 1/8] Add option to read ICON baselines directly from GRIBs --- src/data_input/__init__.py | 131 +++++++++++++++++++++++++++++--- workflow/rules/verification.smk | 18 +++-- 2 files changed, 134 insertions(+), 15 deletions(-) diff --git a/src/data_input/__init__.py b/src/data_input/__init__.py index 3c990b70..d4355592 100644 --- a/src/data_input/__init__.py +++ b/src/data_input/__init__.py @@ -228,6 +228,107 @@ def load_baseline_from_zarr( return baseline +def load_baseline_from_grib( + root: Path, reftime: datetime, steps: list[int], params: list[str] +) -> xr.Dataset: + """Load baseline forecast data directly from an ICON GRIB archive directory. + + `root` should be the FCST directory of the operational archive, e.g. + ``/store_new/mch/msopr/osm/ICON-CH1-EPS/FCST25``. The function reads the + surface GRIB files for the control member (run_id ``000``) and returns a + Dataset with the same structure as :func:`load_baseline_from_zarr`. + """ + import earthkit.data as ekd + from meteodatalab.icon_grid import load_grid_from_balfrin + from uuid import UUID + + # Find the reftime subdirectory (format yymmddHH_). + # Multiple versions may exist; take the last one (lexicographically highest). + reftime_dirs = sorted(root.glob(f"{reftime:%y%m%d%H}_*")) + if not reftime_dirs: + raise ValueError( + f"No archive subdirectory found for {reftime:%y%m%d%H} in {root}" + ) + reftime_dir = reftime_dirs[-1] + LOG.info("Reading ICON archive from %s", reftime_dir) + + # Derive the GRIB filename prefix from the model name in the path. + if "ICON-CH1-EPS" in root.parts: + gribname = "i1eff" + elif "ICON-CH2-EPS" in root.parts: + gribname = "i2eff" + else: + raise ValueError( + f"Cannot determine model from path (expected ICON-CH1-EPS or " + f"ICON-CH2-EPS): {root}" + ) + + # Accumulate GRIB fields across all requested lead times for the control + # member. Surface files have no level-type suffix in the filename. + run_id = "000" + out = ekd.SimpleFieldList() + last_field = None + for lt in steps: + ld, lh = lt // 24, lt % 24 + filepath = reftime_dir / "grib" / f"{gribname}{ld:02}{lh:02}0000_{run_id}" + for field in ekd.from_source("file", filepath): + if field.metadata("shortName") in params: + out.append(field) + last_field = field + + if last_field is None: + raise ValueError(f"No fields matching params {params} found in {reftime_dir}") + + # Convert to xarray using the native GRIB profile. + ds = out.to_xarray(profile="grib") + + # Add lat/lon coordinates from the ICON horizontal grid file. + uuid_of_hgrid = UUID(last_field.metadata("uuidOfHGrid")) + hcoords = load_grid_from_balfrin()(uuid_of_hgrid) + ds = ds.assign_coords( + lat=hcoords["lat"].rename({"cell": "values"}), + lon=hcoords["lon"].rename({"cell": "values"}), + ) + + # earthkit profile="grib" uses "step" for the lead-time dimension. + if "step" in ds.dims: + ds = ds.rename({"step": "lead_time"}) + lead_times = np.array(steps, dtype="timedelta64[h]") + ds = ds.sel(lead_time=lead_times) + + if "TOT_PREC" in ds.data_vars: + ## ICON GRIB archives store TOT_PREC cumulative from the start of the + ## forecast in kg m-2 (no unit conversion required). Disaggregate to + ## per-step accumulations using the same logic as load_baseline_from_zarr. + ## + ## Step 0 may be absent from some GRIB files; fill it with 0 so that + ## .diff() does not produce NaN for the first accumulation period. + ds = ds.assign( + TOT_PREC=xr.where(ds.lead_time == np.timedelta64(0, "h"), 0.0, ds.TOT_PREC) + ) + diff = ds.TOT_PREC.diff("lead_time") + min_diff = float(diff.min().compute()) + if min_diff < -0.1: + raise ValueError( + f"TOT_PREC in the GRIB archive appears to already be " + f"period-accumulated (min(.diff()) = {min_diff:.3e} mm). " + f"Check the archive at {reftime_dir}." + ) + ds = ds.assign(TOT_PREC=diff.clip(min=0.0).reindex(lead_time=lead_times)) + + # Add ref_time and time coordinates; drop earthkit GRIB artefacts that do + # not match evalml conventions. + ds = ds.assign_coords( + ref_time=np.datetime64(reftime, "ns"), + time=("lead_time", np.datetime64(reftime, "ns") + lead_times), + ) + for coord in ("valid_time", "forecast_reference_time"): + if coord in ds.coords: + ds = ds.drop_vars(coord) + + return ds + + def load_obs_data_from_peakweather( root, reftime: datetime, steps: list[int], params: list[str], freq: str = "1h" ) -> xr.Dataset: @@ -316,23 +417,35 @@ def load_truth_data( def load_forecast_data( root, reftime: datetime, steps: list[int], params: list[str] ) -> xr.Dataset: - """Load forecast data from GRIB files or a baseline Zarr dataset.""" + """Load forecast data from GRIB files, a Zarr dataset, or an ICON archive. - if any(root.glob("*.grib")): - LOG.info("Loading forecasts from GRIB files...") - fcst = load_fct_data_from_grib( + Routing (in order): + 1. Path ends in ``.zarr`` → :func:`load_baseline_from_zarr` + 2. ``*.grib`` files present in *root* → :func:`load_fct_data_from_grib` + (ML inference output) + 3. Otherwise → :func:`load_baseline_from_grib` (ICON operational archive) + """ + root = Path(root) + if root.suffix == ".zarr": + LOG.info("Loading baseline forecasts from zarr dataset...") + return load_baseline_from_zarr( root=root, reftime=reftime, steps=steps, params=params, ) - else: - LOG.info("Loading baseline forecasts from zarr dataset...") - fcst = load_baseline_from_zarr( + if any(root.glob("*.grib")): + LOG.info("Loading forecasts from GRIB files...") + return load_fct_data_from_grib( root=root, reftime=reftime, steps=steps, params=params, ) - - return fcst + LOG.info("Loading baseline forecasts from ICON GRIB archive...") + return load_baseline_from_grib( + root=root, + reftime=reftime, + steps=steps, + params=params, + ) diff --git a/workflow/rules/verification.smk b/workflow/rules/verification.smk index 58b215e5..090ae482 100644 --- a/workflow/rules/verification.smk +++ b/workflow/rules/verification.smk @@ -2,6 +2,7 @@ # VERIFICATION WORKFLOW # # ----------------------------------------------------- # from datetime import datetime +from pathlib import Path import pandas as pd @@ -10,16 +11,21 @@ include: "common.smk" # TODO: make sure the boundaries aren't used +def _get_baseline_forecast_path(wc): + """Return the forecast path for a baseline: zarr store if it exists, + otherwise the FCST directory from the operational GRIB archive.""" + root = BASELINE_CONFIGS[wc.baseline_id].get("root") + year = wc.init_time[2:4] + zarr_path = f"{root}/FCST{year}.zarr" + return zarr_path if Path(zarr_path).exists() else f"{root}/FCST{year}" + + rule verification_metrics_baseline: input: "src/verification/__init__.py", "src/data_input/__init__.py", script="workflow/scripts/verification_metrics.py", - baseline_zarr=lambda wc: expand( - "{root}/FCST{year}.zarr", - root=BASELINE_CONFIGS[wc.baseline_id].get("root"), - year=wc.init_time[2:4], - ), + forecast=_get_baseline_forecast_path, truth=config["truth"]["root"], params: baseline_label=lambda wc: BASELINE_CONFIGS[wc.baseline_id].get("label"), @@ -38,7 +44,7 @@ rule verification_metrics_baseline: shell: """ uv run {input.script} \ - --forecast {input.baseline_zarr} \ + --forecast {input.forecast} \ --truth {input.truth} \ --reftime {wildcards.init_time} \ --steps "{params.baseline_steps}" \ From 7488819abeca57b6b435bb9ec562386d3e5b1963 Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Tue, 19 May 2026 22:05:02 +0200 Subject: [PATCH 2/8] Refactor grib readers --- src/data_input/__init__.py | 171 ++++++++++--------------------------- 1 file changed, 46 insertions(+), 125 deletions(-) diff --git a/src/data_input/__init__.py b/src/data_input/__init__.py index d4355592..f82b7468 100644 --- a/src/data_input/__init__.py +++ b/src/data_input/__init__.py @@ -107,25 +107,54 @@ def load_analysis_data_from_zarr( return _select_valid_times(ds, times) -def load_fct_data_from_grib( - root: Path, reftime: datetime, steps: list[int], params: list[str] -) -> xr.Dataset: - """Load forecast data from GRIB files for a specific valid time.""" - files = sorted(root.glob(f"{reftime:%Y%m%d%H%M}*.grib")) +def _collect_ml_grib_files(root: Path, reftime: datetime) -> list[Path]: + """Return GRIB files for an ML inference run (flat directory layout).""" + return sorted(root.glob(f"{reftime:%Y%m%d%H%M}*.grib")) + + +def _collect_icon_archive_files( + root: Path, reftime: datetime, steps: list[int], run_id: str = "000" +) -> list[Path]: + """Return surface GRIB files for one member of an ICON operational archive. + + `root` is the FCST directory, e.g. + ``/store_new/mch/msopr/osm/ICON-CH1-EPS/FCST25``. + """ + reftime_dirs = sorted(root.glob(f"{reftime:%y%m%d%H}_*")) + if not reftime_dirs: + raise ValueError( + f"No archive subdirectory found for {reftime:%y%m%d%H} in {root}" + ) + reftime_dir = reftime_dirs[-1] + LOG.info("Reading ICON archive from %s", reftime_dir) + + if "ICON-CH1-EPS" in root.parts: + gribname = "i1eff" + elif "ICON-CH2-EPS" in root.parts: + gribname = "i2eff" + else: + raise ValueError( + f"Cannot determine model from path (expected ICON-CH1-EPS or " + f"ICON-CH2-EPS): {root}" + ) + + return [ + reftime_dir / "grib" / f"{gribname}{lt // 24:02}{lt % 24:02}0000_{run_id}" + for lt in steps + ] + + +def load_fct_data_from_grib(files: list[Path], params: list[str]) -> xr.Dataset: + """Load forecast data from a list of GRIB files.""" fds = data_source.FileDataSource(datafiles=files) - ds = grib_decoder.load(fds, {"param": params, "step": steps}) + ds = grib_decoder.load(fds, {"param": params}) for var, da in ds.items(): if "z" in da.dims and da.sizes["z"] == 1: ds[var] = da.squeeze("z", drop=True) elif "z" in da.dims and da.sizes["z"] > 1: ds[var] = da.rename({"z": da.attrs["vcoord_type"]}) ds = xr.merge([ds[p].rename(p) for p in ds], compat="no_conflicts") - lead_times = np.array(steps, dtype="timedelta64[h]") - # Restrict to the requested lead times so that the TOT_PREC disaggregation - # below operates on the correct step interval even if the GRIB contains - # extra (e.g. hourly) steps beyond those requested — e.g. when consuming - # output from an interpolator emulator or a baseline with sub-step output. - ds = ds.sel(lead_time=lead_times) + lead_times = ds.lead_time.values if "TOT_PREC" in ds.data_vars: ## Disaggregate TOT_PREC from cumulative-from-start (expected when the ## accumulate_from_start_of_forecast post-processor is enabled in @@ -171,7 +200,7 @@ def load_fct_data_from_grib( ds = ds.rename({"valid_time": "time"}) if "time" not in ds.coords: ds = ds.assign_coords(time=ds.ref_time + ds.lead_time) - ds = ds.sel(ref_time=reftime) + ds = ds.squeeze("ref_time", drop=False) # rename 'cell' dimension to 'values' (it's earthkit-data default for flattened spatial dim) if "cell" in ds.dims: @@ -228,107 +257,6 @@ def load_baseline_from_zarr( return baseline -def load_baseline_from_grib( - root: Path, reftime: datetime, steps: list[int], params: list[str] -) -> xr.Dataset: - """Load baseline forecast data directly from an ICON GRIB archive directory. - - `root` should be the FCST directory of the operational archive, e.g. - ``/store_new/mch/msopr/osm/ICON-CH1-EPS/FCST25``. The function reads the - surface GRIB files for the control member (run_id ``000``) and returns a - Dataset with the same structure as :func:`load_baseline_from_zarr`. - """ - import earthkit.data as ekd - from meteodatalab.icon_grid import load_grid_from_balfrin - from uuid import UUID - - # Find the reftime subdirectory (format yymmddHH_). - # Multiple versions may exist; take the last one (lexicographically highest). - reftime_dirs = sorted(root.glob(f"{reftime:%y%m%d%H}_*")) - if not reftime_dirs: - raise ValueError( - f"No archive subdirectory found for {reftime:%y%m%d%H} in {root}" - ) - reftime_dir = reftime_dirs[-1] - LOG.info("Reading ICON archive from %s", reftime_dir) - - # Derive the GRIB filename prefix from the model name in the path. - if "ICON-CH1-EPS" in root.parts: - gribname = "i1eff" - elif "ICON-CH2-EPS" in root.parts: - gribname = "i2eff" - else: - raise ValueError( - f"Cannot determine model from path (expected ICON-CH1-EPS or " - f"ICON-CH2-EPS): {root}" - ) - - # Accumulate GRIB fields across all requested lead times for the control - # member. Surface files have no level-type suffix in the filename. - run_id = "000" - out = ekd.SimpleFieldList() - last_field = None - for lt in steps: - ld, lh = lt // 24, lt % 24 - filepath = reftime_dir / "grib" / f"{gribname}{ld:02}{lh:02}0000_{run_id}" - for field in ekd.from_source("file", filepath): - if field.metadata("shortName") in params: - out.append(field) - last_field = field - - if last_field is None: - raise ValueError(f"No fields matching params {params} found in {reftime_dir}") - - # Convert to xarray using the native GRIB profile. - ds = out.to_xarray(profile="grib") - - # Add lat/lon coordinates from the ICON horizontal grid file. - uuid_of_hgrid = UUID(last_field.metadata("uuidOfHGrid")) - hcoords = load_grid_from_balfrin()(uuid_of_hgrid) - ds = ds.assign_coords( - lat=hcoords["lat"].rename({"cell": "values"}), - lon=hcoords["lon"].rename({"cell": "values"}), - ) - - # earthkit profile="grib" uses "step" for the lead-time dimension. - if "step" in ds.dims: - ds = ds.rename({"step": "lead_time"}) - lead_times = np.array(steps, dtype="timedelta64[h]") - ds = ds.sel(lead_time=lead_times) - - if "TOT_PREC" in ds.data_vars: - ## ICON GRIB archives store TOT_PREC cumulative from the start of the - ## forecast in kg m-2 (no unit conversion required). Disaggregate to - ## per-step accumulations using the same logic as load_baseline_from_zarr. - ## - ## Step 0 may be absent from some GRIB files; fill it with 0 so that - ## .diff() does not produce NaN for the first accumulation period. - ds = ds.assign( - TOT_PREC=xr.where(ds.lead_time == np.timedelta64(0, "h"), 0.0, ds.TOT_PREC) - ) - diff = ds.TOT_PREC.diff("lead_time") - min_diff = float(diff.min().compute()) - if min_diff < -0.1: - raise ValueError( - f"TOT_PREC in the GRIB archive appears to already be " - f"period-accumulated (min(.diff()) = {min_diff:.3e} mm). " - f"Check the archive at {reftime_dir}." - ) - ds = ds.assign(TOT_PREC=diff.clip(min=0.0).reindex(lead_time=lead_times)) - - # Add ref_time and time coordinates; drop earthkit GRIB artefacts that do - # not match evalml conventions. - ds = ds.assign_coords( - ref_time=np.datetime64(reftime, "ns"), - time=("lead_time", np.datetime64(reftime, "ns") + lead_times), - ) - for coord in ("valid_time", "forecast_reference_time"): - if coord in ds.coords: - ds = ds.drop_vars(coord) - - return ds - - def load_obs_data_from_peakweather( root, reftime: datetime, steps: list[int], params: list[str], freq: str = "1h" ) -> xr.Dataset: @@ -429,23 +357,16 @@ def load_forecast_data( if root.suffix == ".zarr": LOG.info("Loading baseline forecasts from zarr dataset...") return load_baseline_from_zarr( - root=root, - reftime=reftime, - steps=steps, - params=params, + root=root, reftime=reftime, steps=steps, params=params ) if any(root.glob("*.grib")): LOG.info("Loading forecasts from GRIB files...") return load_fct_data_from_grib( - root=root, - reftime=reftime, - steps=steps, + files=_collect_ml_grib_files(root, reftime), params=params, ) LOG.info("Loading baseline forecasts from ICON GRIB archive...") - return load_baseline_from_grib( - root=root, - reftime=reftime, - steps=steps, + return load_fct_data_from_grib( + files=_collect_icon_archive_files(root, reftime, steps), params=params, ) From 8619feac52aaaadde913068012cd9aee8427acec Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Tue, 19 May 2026 22:13:48 +0200 Subject: [PATCH 3/8] Remove zarr reader for baselines And consequently remove legacy cosmo configs that are no longer supported --- config/forecasters-co1e.yaml | 74 ----------------- config/forecasters-co2-disentangled.yaml | 93 --------------------- config/forecasters-co2.yaml | 70 ---------------- config/forecasters-ich1-oper-fixed.yaml | 2 +- config/forecasters-ich1-oper.yaml | 4 +- config/forecasters-ich1.yaml | 2 +- config/interpolators-co2.yaml | 101 ----------------------- config/interpolators-ich1.yaml | 6 +- src/data_input/__init__.py | 60 +------------- workflow/rules/verification.smk | 7 +- 10 files changed, 11 insertions(+), 408 deletions(-) delete mode 100644 config/forecasters-co1e.yaml delete mode 100644 config/forecasters-co2-disentangled.yaml delete mode 100644 config/forecasters-co2.yaml delete mode 100644 config/interpolators-co2.yaml diff --git a/config/forecasters-co1e.yaml b/config/forecasters-co1e.yaml deleted file mode 100644 index 777492ca..00000000 --- a/config/forecasters-co1e.yaml +++ /dev/null @@ -1,74 +0,0 @@ -# yaml-language-server: $schema=../workflow/tools/config.schema.json -description: | - Experiment with COSMO-1E emulators finetuned on COSMO-1E analysis - (KENDA-1) at 1km resolution. - -dates: - start: 2020-01-01T12:00 - end: 2020-01-10T00:00 - frequency: 54h - -runs: - - forecaster: - checkpoint: https://mlflow.ecmwf.int/#/experiments/367/runs/2174c939c8844555a52843b71219d425 - label: Cosmo 1km + era5 N320, finetuned on cerra checkpoint, lam resolution 11 - config: resources/inference/configs/sgm-forecaster-regional_fromtraining.yaml - steps: 0/120/6 - inference_resources: - gpu: 4 - tasks: 4 - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.6.3 - - - baseline: - label: COSMO-1E - root: /store_new/mch/msopr/ml/COSMO-1E - steps: 0/33/6 - -truth: - label: COSMO KENDA - root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co1e-an-archive-0p01-2019-2024-1h-v1-pl13.zarr - -stratification: - regions: - - jura - - mittelland - - voralpen - - alpennordhang - - innerealpentaeler - - alpensuedseite - root: /scratch/mch/bhendj/regions/Prognoseregionen_LV95_20220517 - -thresholds: - TOT_PREC: - gt: [0.0, 0.001, 0.005] - U_10M: - gt: [2.5, 5.0, 10.0] - V_10M: - gt: [2.5, 5.0, 10.0] - T_2M: - lt: [273.15] - gt: [288.15, 298.15] - -dashboard: - stratification: - # - init_hour - # - region - - season - -locations: - output_root: output/ - -profile: - executor: slurm - global_resources: - gpus: 16 - default_resources: - slurm_partition: "postproc" - cpus_per_task: 1 - mem_mb_per_cpu: 1800 - runtime: "1h" - gpus: 0 - jobs: 50 - batch_rules: - plot_forecast_frame: 32 diff --git a/config/forecasters-co2-disentangled.yaml b/config/forecasters-co2-disentangled.yaml deleted file mode 100644 index 77bdba4b..00000000 --- a/config/forecasters-co2-disentangled.yaml +++ /dev/null @@ -1,93 +0,0 @@ -# yaml-language-server: $schema=../workflow/tools/config.schema.json -description: | - Experiment to compare entangled and disentangled forecasting. With and without autoencoder finetuning. - -dates: - - 2020-02-03T00:00 # Storm Petra - - 2020-02-07T00:00 # Storm Sabine - - 2020-10-01T00:00 # Storm Brigitte - -runs: - - forecaster: - checkpoint: https://servicedepl.meteoswiss.ch/mlstore#/experiments/409/runs/2241012852624833b73b2c933db608c9 - label: Stage C entangled - config: resources/inference/configs/sgm-forecaster-global-disentangled.yaml - steps: 0/120/6 - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 - - earthkit-utils<0.2.0 - - earthkit-data<0.19.0 - - git+https://github.com/MeteoSwiss/anemoi-core.git@2a90165e3f25defc55fbeb77f7b4ebfef685820d#subdirectory=models - - - forecaster: - checkpoint: https://servicedepl.meteoswiss.ch/mlstore#/experiments/307/runs/f9f0b0a0c91949b6a72df2dc23c55255 - label: dis_HAE_1lvl_ft_n320_cosmo_stage_C_direct - config: resources/inference/configs/sgm-forecaster-global-disentangled.yaml - steps: 0/120/6 - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 - - earthkit-utils<0.2.0 - - earthkit-data<0.19.0 - - git+https://github.com/MeteoSwiss/anemoi-core.git@2a90165e3f25defc55fbeb77f7b4ebfef685820d#subdirectory=models - - - forecaster: - checkpoint: https://servicedepl.meteoswiss.ch/mlstore#/experiments/307/runs/60db882d54174b4582d10dc5826d9eee - label: dis_HAE_1lvl_ft_n320_cosmo_stage_C - config: resources/inference/configs/sgm-forecaster-global-disentangled.yaml - steps: 0/120/6 - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 - - earthkit-utils<0.2.0 - - earthkit-data<0.19.0 - - git+https://github.com/MeteoSwiss/anemoi-core.git@2a90165e3f25defc55fbeb77f7b4ebfef685820d#subdirectory=models - - - baseline: - label: COSMO-E - root: /store_new/mch/msopr/ml/COSMO-E - steps: 0/120/6 - -truth: - label: COSMO KENDA - root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-6h-v3-pl13.zarr - -stratification: - regions: - - jura - - mittelland - - voralpen - - alpennordhang - - innerealpentaeler - - alpensuedseite - root: /scratch/mch/bhendj/regions/Prognoseregionen_LV95_20220517 - -thresholds: - TOT_PREC: - gt: [0.0, 0.001, 0.005] - U_10M: - gt: [2.5, 5.0, 10.0] - V_10M: - gt: [2.5, 5.0, 10.0] - T_2M: - lt: [273.15] - gt: [288.15, 298.15] - -dashboard: - stratification: - # - init_hour - # - region - - season - -locations: - output_root: output/ - -profile: - executor: slurm - global_resources: - gpus: 16 - default_resources: - slurm_partition: "postproc" - cpus_per_task: 1 - mem_mb_per_cpu: 1800 - runtime: "1h" - gpus: 0 - jobs: 50 diff --git a/config/forecasters-co2.yaml b/config/forecasters-co2.yaml deleted file mode 100644 index a51c1203..00000000 --- a/config/forecasters-co2.yaml +++ /dev/null @@ -1,70 +0,0 @@ -# yaml-language-server: $schema=../workflow/tools/config.schema.json -description: | - Evaluate skill of COSMO-E emulator (M-1 forecaster). - -dates: - - 2020-02-03T00:00 # Storm Petra - - 2020-02-07T00:00 # Storm Sabine - - 2020-10-01T00:00 # Storm Brigitte - -runs: - - forecaster: - checkpoint: https://mlflow.ecmwf.int/#/experiments/103/runs/d0846032fc7248a58b089cbe8fa4c511 - label: M-1 forecaster - steps: 0/120/6 - config: resources/inference/configs/sgm-forecaster-global_trimedge.yaml - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 - - - baseline: - label: COSMO-E - root: /store_new/mch/msopr/ml/COSMO-E - steps: 0/120/6 - -truth: - label: COSMO KENDA - root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-6h-v3-pl13.zarr - -stratification: - regions: - - jura - - mittelland - - voralpen - - alpennordhang - - innerealpentaeler - - alpensuedseite - root: /scratch/mch/bhendj/regions/Prognoseregionen_LV95_20220517 - -thresholds: - TOT_PREC: - gt: [0.0, 0.001, 0.005] - U_10M: - gt: [2.5, 5.0, 10.0] - V_10M: - gt: [2.5, 5.0, 10.0] - T_2M: - lt: [273.15] - gt: [288.15, 298.15] - -dashboard: - stratification: - # - init_hour - # - region - - season - -locations: - output_root: output/ - -profile: - executor: slurm - global_resources: - gpus: 16 - default_resources: - slurm_partition: "postproc" - cpus_per_task: 1 - mem_mb_per_cpu: 1800 - runtime: "1h" - gpus: 0 - jobs: 50 - batch_rules: - plot_forecast_frame: 32 diff --git a/config/forecasters-ich1-oper-fixed.yaml b/config/forecasters-ich1-oper-fixed.yaml index 9c5cb970..ab3643f7 100644 --- a/config/forecasters-ich1-oper-fixed.yaml +++ b/config/forecasters-ich1-oper-fixed.yaml @@ -34,7 +34,7 @@ runs: baselines: - baseline: label: ICON-CH1-EPS - root: /store_new/mch/msopr/ml/ICON-CH1-EPS + root: /store_new/mch/msopr/osm/ICON-CH1-EPS steps: 0/33/6 truth: diff --git a/config/forecasters-ich1-oper.yaml b/config/forecasters-ich1-oper.yaml index cac91861..5316da3b 100644 --- a/config/forecasters-ich1-oper.yaml +++ b/config/forecasters-ich1-oper.yaml @@ -24,12 +24,12 @@ runs: - baseline: label: ICON-CH1-ctrl - root: /store_new/mch/msopr/ml/ICON-CH1-EPS + root: /store_new/mch/msopr/osm/ICON-CH1-EPS steps: 0/33/6 - baseline: label: ICON-CH2-ctrl - root: /store_new/mch/msopr/ml/ICON-CH2-EPS + root: /store_new/mch/msopr/osm/ICON-CH2-EPS steps: 0/120/6 truth: diff --git a/config/forecasters-ich1.yaml b/config/forecasters-ich1.yaml index 3f8ee7db..001bff0f 100644 --- a/config/forecasters-ich1.yaml +++ b/config/forecasters-ich1.yaml @@ -41,7 +41,7 @@ runs: - baseline: label: ICON-CH2-EPS - root: /store_new/mch/msopr/ml/ICON-CH2-EPS + root: /store_new/mch/msopr/osm/ICON-CH2-EPS steps: 0/120/6 truth: diff --git a/config/interpolators-co2.yaml b/config/interpolators-co2.yaml deleted file mode 100644 index 87ea6fa3..00000000 --- a/config/interpolators-co2.yaml +++ /dev/null @@ -1,101 +0,0 @@ -# yaml-language-server: $schema=../workflow/tools/config.schema.json -description: | - Evaluate skill of SGM interpolator (M-2 interpolator). - -dates: - start: 2020-01-01T12:00 - end: 2020-01-10T00:00 - frequency: 60h - # or (for showcases) - # - 2020-02-03T00:00 # Storm Petra - # - 2020-02-07T00:00 # Storm Sabine - # - 2020-10-01T00:00 # Storm Brigitte - -runs: - - interpolator: - checkpoint: https://servicedepl.meteoswiss.ch/mlstore#/experiments/228/runs/8d1e0410ca7d4f74b368b3079878259a - label: M-2 interpolator (KENDA) - steps: 0/120/1 - config: resources/inference/configs/sgm-interpolator-global_trimedge_fromtest.yaml - forecaster: null - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 - - torch-geometric==2.6.1 - - anemoi-graphs==0.5.2 - - - interpolator: - checkpoint: https://servicedepl.meteoswiss.ch/mlstore#/experiments/228/runs/8d1e0410ca7d4f74b368b3079878259a - label: M-2 interpolator (M-1 forecaster) - steps: 0/120/1 - config: resources/inference/configs/sgm-interpolator-global_trimedge.yaml - forecaster: - checkpoint: https://mlflow.ecmwf.int/#/experiments/103/runs/d0846032fc7248a58b089cbe8fa4c511 - config: resources/inference/configs/sgm-forecaster-global_trimedge.yaml - steps: 0/120/6 - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 - - torch-geometric==2.6.1 - - anemoi-graphs==0.5.2 - - - forecaster: - checkpoint: https://mlflow.ecmwf.int/#/experiments/103/runs/d0846032fc7248a58b089cbe8fa4c511 - label: M-1 forecaster - config: resources/inference/configs/sgm-forecaster-global_trimedge.yaml - steps: 0/120/6 - extra_requirements: - - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 - - - baseline: - label: COSMO-E - root: /store_new/mch/msopr/ml/COSMO-E_hourly - steps: 0/120/1 - -truth: - label: COSMO KENDA - root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-1h-v3-pl13.zarr - -stratification: - regions: - - jura - - mittelland - - voralpen - - alpennordhang - - innerealpentaeler - - alpensuedseite - root: /scratch/mch/bhendj/regions/Prognoseregionen_LV95_20220517 - -thresholds: - TOT_PREC: - gt: [0.0, 0.001, 0.005] - U_10M: - gt: [2.5, 5.0, 10.0] - V_10M: - gt: [2.5, 5.0, 10.0] - T_2M: - lt: [273.15] - gt: [288.15, 298.15] - -dashboard: - stratification: - # - init_hour - # - region - - season - -locations: - output_root: output/ - -profile: - executor: slurm - global_resources: - gpus: 16 - default_resources: - slurm_partition: "postproc" - cpus_per_task: 1 - mem_mb_per_cpu: 1800 - runtime: "1h" - gpus: 0 - jobs: 50 - batch_rules: - plot_forecast_frame: 32 diff --git a/config/interpolators-ich1.yaml b/config/interpolators-ich1.yaml index a56296eb..92648b1f 100644 --- a/config/interpolators-ich1.yaml +++ b/config/interpolators-ich1.yaml @@ -31,14 +31,12 @@ runs: # pinned anemoi-datasets because of ecmwf/anemoi-utils#284, can be removed when fixed - anemoi-datasets==0.5.35 - baseline: - baseline_id: ICON-CH2-EPS label: ICON-CH2-ctrl - root: /store_new/mch/msopr/ml/ICON-CH2-EPS + root: /store_new/mch/msopr/osm/ICON-CH2-EPS steps: 0/120/1 - baseline: - baseline_id: ICON-CH1-EPS label: ICON-CH1-ctrl - root: /store_new/mch/msopr/ml/ICON-CH1-EPS + root: /store_new/mch/msopr/osm/ICON-CH1-EPS steps: 0/33/1 truth: diff --git a/src/data_input/__init__.py b/src/data_input/__init__.py index f82b7468..7bfb269e 100644 --- a/src/data_input/__init__.py +++ b/src/data_input/__init__.py @@ -208,54 +208,6 @@ def load_fct_data_from_grib(files: list[Path], params: list[str]) -> xr.Dataset: return ds -def load_baseline_from_zarr( - root: Path, reftime: datetime, steps: list[int], params: list[str] -) -> xr.Dataset: - """Load forecast data from a Zarr dataset.""" - try: - baseline = xr.open_zarr(root, consolidated=True, decode_timedelta=True) - except ValueError: - raise ValueError(f"Could not open baseline zarr at {root}") - - baseline = baseline.rename( - {"forecast_reference_time": "ref_time", "step": "lead_time"} - ).sortby("lead_time") - lead_times = np.array(steps, dtype="timedelta64[h]") - # Restrict to the requested lead times up-front so that the TOT_PREC - # disaggregation below operates on the correct step interval, and so that - # all other variables avoid loading unused hourly steps from the zarr. - baseline = baseline[params].sel(ref_time=reftime, lead_time=lead_times) - if "TOT_PREC" in baseline.data_vars: - if baseline.TOT_PREC.units == "m": - baseline = baseline.assign(TOT_PREC=lambda x: x.TOT_PREC * 1000) - baseline.TOT_PREC.attrs["units"] = "kg m-2" - ## Disaggregate TOT_PREC from cumulative-from-start (the expected zarr - ## convention for processed NWP output) to per-step accumulations. - ## - ## Sanity-check that the incoming data is actually cumulative: if - ## .diff() produces significantly negative values, TOT_PREC is already - ## period-accumulated and a second disaggregation would produce - ## garbage. In that case raise — we always expect cumulative-from- - ## start precipitation here. - diff = baseline.TOT_PREC.diff("lead_time") - min_diff = float(diff.min().compute()) - if min_diff < -0.1: # TOT_PREC canonical units are mm - raise ValueError( - f"TOT_PREC in the baseline zarr appears to already be " - f"period-accumulated (min(.diff()) = {min_diff:.3e} m)." - ) - ## .diff() drops lead_time=0; .reindex() restores it as NaN (no - ## accumulation period exists at the forecast initial time). Clip - ## small float-noise negatives to zero (anything below -0.1 mm has - ## already been caught by the check above). - baseline = baseline.assign( - TOT_PREC=diff.clip(min=0.0).reindex(lead_time=lead_times) - ) - baseline = baseline.assign_coords(time=baseline.ref_time + baseline.lead_time) - if "latitude" in baseline.coords and "longitude" in baseline: - baseline = baseline.rename({"latitude": "lat", "longitude": "lon"}) - return baseline - def load_obs_data_from_peakweather( root, reftime: datetime, steps: list[int], params: list[str], freq: str = "1h" @@ -345,20 +297,14 @@ def load_truth_data( def load_forecast_data( root, reftime: datetime, steps: list[int], params: list[str] ) -> xr.Dataset: - """Load forecast data from GRIB files, a Zarr dataset, or an ICON archive. + """Load forecast data from GRIB files or an ICON archive. Routing (in order): - 1. Path ends in ``.zarr`` → :func:`load_baseline_from_zarr` - 2. ``*.grib`` files present in *root* → :func:`load_fct_data_from_grib` + 1. ``*.grib`` files present in *root* → :func:`load_fct_data_from_grib` (ML inference output) - 3. Otherwise → :func:`load_baseline_from_grib` (ICON operational archive) + 2. Otherwise → ICON operational archive """ root = Path(root) - if root.suffix == ".zarr": - LOG.info("Loading baseline forecasts from zarr dataset...") - return load_baseline_from_zarr( - root=root, reftime=reftime, steps=steps, params=params - ) if any(root.glob("*.grib")): LOG.info("Loading forecasts from GRIB files...") return load_fct_data_from_grib( diff --git a/workflow/rules/verification.smk b/workflow/rules/verification.smk index 090ae482..683c65f1 100644 --- a/workflow/rules/verification.smk +++ b/workflow/rules/verification.smk @@ -2,7 +2,6 @@ # VERIFICATION WORKFLOW # # ----------------------------------------------------- # from datetime import datetime -from pathlib import Path import pandas as pd @@ -12,12 +11,10 @@ include: "common.smk" # TODO: make sure the boundaries aren't used def _get_baseline_forecast_path(wc): - """Return the forecast path for a baseline: zarr store if it exists, - otherwise the FCST directory from the operational GRIB archive.""" + """Return the FCST directory for a baseline in the ICON GRIB archive.""" root = BASELINE_CONFIGS[wc.baseline_id].get("root") year = wc.init_time[2:4] - zarr_path = f"{root}/FCST{year}.zarr" - return zarr_path if Path(zarr_path).exists() else f"{root}/FCST{year}" + return f"{root}/FCST{year}" rule verification_metrics_baseline: From 4b4f82aff6f85c07f31c6b0843959a50b3b9f44f Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Tue, 19 May 2026 22:31:57 +0200 Subject: [PATCH 4/8] Add option to filter grib files by step --- src/data_input/__init__.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/data_input/__init__.py b/src/data_input/__init__.py index 7bfb269e..607aab5c 100644 --- a/src/data_input/__init__.py +++ b/src/data_input/__init__.py @@ -107,9 +107,19 @@ def load_analysis_data_from_zarr( return _select_valid_times(ds, times) -def _collect_ml_grib_files(root: Path, reftime: datetime) -> list[Path]: - """Return GRIB files for an ML inference run (flat directory layout).""" - return sorted(root.glob(f"{reftime:%Y%m%d%H%M}*.grib")) +def _collect_ml_grib_files( + root: Path, reftime: datetime, steps: list[int] | None = None +) -> list[Path]: + """Return GRIB files for an ML inference run (flat directory layout). + + When `steps` is provided, the discovered files are filtered to those whose + name ends with ``_{step:03d}.grib``. + """ + files = sorted(root.glob(f"{reftime:%Y%m%d%H%M}*.grib")) + if steps is None: + return files + suffixes = {f"_{step:03d}.grib" for step in steps} + return [f for f in files if any(f.name.endswith(s) for s in suffixes)] def _collect_icon_archive_files( @@ -208,7 +218,6 @@ def load_fct_data_from_grib(files: list[Path], params: list[str]) -> xr.Dataset: return ds - def load_obs_data_from_peakweather( root, reftime: datetime, steps: list[int], params: list[str], freq: str = "1h" ) -> xr.Dataset: @@ -308,7 +317,7 @@ def load_forecast_data( if any(root.glob("*.grib")): LOG.info("Loading forecasts from GRIB files...") return load_fct_data_from_grib( - files=_collect_ml_grib_files(root, reftime), + files=_collect_ml_grib_files(root, reftime, steps), params=params, ) LOG.info("Loading baseline forecasts from ICON GRIB archive...") From 81f32175bb7cef2351361bd324368076751598a4 Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Wed, 20 May 2026 11:05:18 +0200 Subject: [PATCH 5/8] Rename member id variable --- src/data_input/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/data_input/__init__.py b/src/data_input/__init__.py index 607aab5c..2c49151e 100644 --- a/src/data_input/__init__.py +++ b/src/data_input/__init__.py @@ -123,7 +123,7 @@ def _collect_ml_grib_files( def _collect_icon_archive_files( - root: Path, reftime: datetime, steps: list[int], run_id: str = "000" + root: Path, reftime: datetime, steps: list[int], member_id: str = "000" ) -> list[Path]: """Return surface GRIB files for one member of an ICON operational archive. @@ -149,7 +149,7 @@ def _collect_icon_archive_files( ) return [ - reftime_dir / "grib" / f"{gribname}{lt // 24:02}{lt % 24:02}0000_{run_id}" + reftime_dir / "grib" / f"{gribname}{lt // 24:02}{lt % 24:02}0000_{member_id}" for lt in steps ] From 3bf500ea316068ae6666852cdfd96ba325b7ba7a Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Wed, 20 May 2026 11:05:57 +0200 Subject: [PATCH 6/8] Fix tests --- tests/conftest.py | 4 ++-- tests/unit/test_config.py | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 326f9199..717d56e5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,7 +8,7 @@ @pytest.fixture def example_forecasters_config(): - configfile = PROJECT_ROOT / "config/forecasters-co2.yaml" + configfile = PROJECT_ROOT / "config/forecasters-ich1.yaml" with open(configfile, "r") as f: config = yaml.safe_load(f) return config @@ -16,7 +16,7 @@ def example_forecasters_config(): @pytest.fixture def example_interpolators_config(): - configfile = PROJECT_ROOT / "config/interpolators-co2.yaml" + configfile = PROJECT_ROOT / "config/interpolators-ich1.yaml" with open(configfile, "r") as f: config = yaml.safe_load(f) return config diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 40281a6e..1c42169d 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -63,9 +63,9 @@ def test_workflow_parsing_excludes_baselines_from_run_configs( run_config["model_type"] != "baseline" for run_config in run_configs.values() ) assert baseline_configs == { - "COSMO-E": { - "label": "COSMO-E", - "root": "/store_new/mch/msopr/ml/COSMO-E", + "ICON-CH2-EPS": { + "label": "ICON-CH2-EPS", + "root": "/store_new/mch/msopr/osm/ICON-CH2-EPS", "steps": "0/120/6", } } @@ -84,10 +84,10 @@ def test_workflow_derives_baseline_id_from_root_stem(example_interpolators_confi baseline_configs = namespace["BASELINE_CONFIGS"] - assert "COSMO-E_hourly" in baseline_configs - assert "COSMO-E-1h" not in baseline_configs - assert baseline_configs["COSMO-E_hourly"] == { - "label": "COSMO-E", - "root": "/store_new/mch/msopr/ml/COSMO-E_hourly", + assert "ICON-CH2-EPS" in baseline_configs + assert "ICON-CH1-EPS" in baseline_configs + assert baseline_configs["ICON-CH2-EPS"] == { + "label": "ICON-CH2-ctrl", + "root": "/store_new/mch/msopr/osm/ICON-CH2-EPS", "steps": "0/120/1", } From b4db7f36a33a634e487b395423ea22a8e8b3dc5c Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Wed, 20 May 2026 11:06:22 +0200 Subject: [PATCH 7/8] Aesthetics --- workflow/Snakefile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/workflow/Snakefile b/workflow/Snakefile index a2586cb4..becffcb6 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -31,6 +31,7 @@ CONFIG_FILE = workflow.config_settings.configfiles[0] CONFIG_LABEL = config["config_label"] or CONFIG_FILE.stem EXPERIMENT_NAME = f"{WHEN}_{CONFIG_LABEL}_{CONFIG_HASH}" CANDIDATES = collect_all_candidates() +BASELINES = collect_all_baselines() DATA_DIR = OUT_ROOT / "data" LOGS_DIR = OUT_ROOT / "logs" @@ -83,6 +84,7 @@ onstart: print(_c(f" Config: {CONFIG_FILE.name}", "90")) print(_c(f" Experiment: {EXPERIMENT_NAME}", "90")) print(_c(f" Candidates: {", ".join(CANDIDATES)}", "90")) + print(_c(f" Baselines: {", ".join(BASELINES)}", "90")) print(_c(f" Data dir: {DATA_DIR}", "90")) print(_c(f" Logs dir: {LOGS_DIR}", "90")) print(_c(f" Results dir: {RESULTS_DIR}", "90")) From 541af7b6e9f8f07f8111fab3bdcba31cd9c0b217 Mon Sep 17 00:00:00 2001 From: Daniele Nerini Date: Wed, 20 May 2026 19:25:34 +0200 Subject: [PATCH 8/8] Remove test --- tests/unit/test_config.py | 52 --------------------------------------- 1 file changed, 52 deletions(-) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 1c42169d..4c715931 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -1,5 +1,3 @@ -from pathlib import Path - import pytest from evalml.config import ConfigModel @@ -41,53 +39,3 @@ def test_legacy_top_level_baselines_still_supported(example_forecasters_config): ] _ = ConfigModel.model_validate(cfg) - - -def test_workflow_parsing_excludes_baselines_from_run_configs( - example_forecasters_config, -): - """Baseline entries in `runs` should not be treated as ML run configs.""" - - namespace = { - "Path": Path, - "config": example_forecasters_config, - } - common_rules = Path("workflow/rules/common.smk").read_text() - - exec(common_rules, namespace) - - run_configs = namespace["RUN_CONFIGS"] - baseline_configs = namespace["BASELINE_CONFIGS"] - - assert all( - run_config["model_type"] != "baseline" for run_config in run_configs.values() - ) - assert baseline_configs == { - "ICON-CH2-EPS": { - "label": "ICON-CH2-EPS", - "root": "/store_new/mch/msopr/osm/ICON-CH2-EPS", - "steps": "0/120/6", - } - } - - -def test_workflow_derives_baseline_id_from_root_stem(example_interpolators_config): - """Workflow baseline IDs should come from the baseline root path stem.""" - - namespace = { - "Path": Path, - "config": example_interpolators_config, - } - common_rules = Path("workflow/rules/common.smk").read_text() - - exec(common_rules, namespace) - - baseline_configs = namespace["BASELINE_CONFIGS"] - - assert "ICON-CH2-EPS" in baseline_configs - assert "ICON-CH1-EPS" in baseline_configs - assert baseline_configs["ICON-CH2-EPS"] == { - "label": "ICON-CH2-ctrl", - "root": "/store_new/mch/msopr/osm/ICON-CH2-EPS", - "steps": "0/120/1", - }