diff --git a/pyproject.toml b/pyproject.toml index 596e327..ff7f490 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ description = "Add your description here" readme = "README.md" requires-python = ">=3.12" dependencies = [ - "astropy~=7.0.1", + "astropy~=7.2.0", "cartopy~=0.24.1", "cf-xarray~=0.10", "cftime~=1.6.0", @@ -13,8 +13,9 @@ dependencies = [ "matplotlib~=3.8", "netcdf4==1.7.3", "numcodecs>=0.13.0,<0.17", - "numcodecs-combinators[xarray]~=0.2.10", + "numcodecs-combinators[xarray]~=0.2.13", "numcodecs-observers~=0.1.2", + "numcodecs-safeguards==0.1.0b2", "numcodecs-wasm~=0.2.2", "numcodecs-wasm-bit-round~=0.4.0", "numcodecs-wasm-fixed-offset-scale~=0.4.0", @@ -28,6 +29,7 @@ dependencies = [ "numcodecs-wasm-zfp~=0.6.0", "numcodecs-wasm-zfp-classic~=0.4.0", "numcodecs-wasm-zstd~=0.4.0", + "numcodecs-zero~=0.1.2", "pandas~=2.2", "scipy~=1.14", "seaborn~=0.13.2", diff --git a/src/climatebenchpress/compressor/compressors/__init__.py b/src/climatebenchpress/compressor/compressors/__init__.py index b523c75..63da691 100644 --- a/src/climatebenchpress/compressor/compressors/__init__.py +++ b/src/climatebenchpress/compressor/compressors/__init__.py @@ -2,6 +2,12 @@ "BitRound", "BitRoundPco", "Jpeg2000", + "SafeguardedBitRoundPco", + "SafeguardedSperr", + "SafeguardedSz3", + "SafeguardedZero", + "SafeguardedZeroDssim", + "SafeguardedZfpRound", "Sperr", "StochRound", "StochRoundPco", @@ -15,6 +21,14 @@ from .bitround import BitRound from .bitround_pco import BitRoundPco from .jpeg2000 import Jpeg2000 +from .safeguarded import ( + SafeguardedBitRoundPco, + SafeguardedSperr, + SafeguardedSz3, + SafeguardedZero, + SafeguardedZeroDssim, + SafeguardedZfpRound, +) from .sperr import Sperr from .stochround import StochRound from .stochround_pco import StochRoundPco diff --git a/src/climatebenchpress/compressor/compressors/abc.py b/src/climatebenchpress/compressor/compressors/abc.py index c4dacb8..e20429f 100644 --- a/src/climatebenchpress/compressor/compressors/abc.py +++ b/src/climatebenchpress/compressor/compressors/abc.py @@ -88,6 +88,10 @@ def abs_bound_codec( dtype: Optional[np.dtype] = None, data_min: Optional[float] = None, data_max: Optional[float] = None, + data_abs_min: Optional[float] = None, + data_abs_max: Optional[float] = None, + data_min_2d: Optional[np.ndarray] = None, + data_max_2d: Optional[np.ndarray] = None, ) -> Codec: """Create a codec with an absolute error bound.""" pass @@ -100,6 +104,10 @@ def rel_bound_codec( dtype: Optional[np.dtype] = None, data_min: Optional[float] = None, data_max: Optional[float] = None, + data_abs_min: Optional[float] = None, + data_abs_max: Optional[float] = None, + data_min_2d: Optional[np.ndarray] = None, + data_max_2d: Optional[np.ndarray] = None, ) -> Codec: """Create a codec with a relative error bound.""" pass @@ -112,6 +120,8 @@ def build( data_abs_max: dict[VariableName, float], data_min: dict[VariableName, float], data_max: dict[VariableName, float], + data_min_2d: dict[VariableName, np.ndarray], + data_max_2d: dict[VariableName, np.ndarray], error_bounds: list[dict[VariableName, ErrorBound]], ) -> dict[VariantName, list[NamedPerVariableCodec]]: """ @@ -135,6 +145,12 @@ def build( Dict mapping from variable name to minimum value for the variable. data_max : dict[VariableName, float] Dict mapping from variable name to maximum value for the variable. + data_min_2d : dict[VariableName, np.ndarray] + Dict mapping from variable name to per-lat-lon-slice minimum value for the + variable. + data_max_2d : dict[VariableName, np.ndarray] + Dict mapping from variable name to per-lat-lon-slice maximum value for the + variable. error_bounds: list[ErrorBound] List of error bounds to use for the compressor. @@ -167,6 +183,10 @@ def build( dtype=dtypes[var], data_min=data_min[var], data_max=data_max[var], + data_abs_min=data_abs_min[var], + data_abs_max=data_abs_max[var], + data_min_2d=data_min_2d[var], + data_max_2d=data_max_2d[var], ) elif eb.rel_error is not None and cls.has_rel_error_impl: new_codecs[var] = partial( @@ -175,6 +195,10 @@ def build( dtype=dtypes[var], data_min=data_min[var], data_max=data_max[var], + data_abs_min=data_abs_min[var], + data_abs_max=data_abs_max[var], + data_min_2d=data_min_2d[var], + data_max_2d=data_max_2d[var], ) else: # This should never happen as we have already transformed the error bounds. diff --git a/src/climatebenchpress/compressor/compressors/safeguarded/__init__.py b/src/climatebenchpress/compressor/compressors/safeguarded/__init__.py new file mode 100644 index 0000000..2fb5669 --- /dev/null +++ b/src/climatebenchpress/compressor/compressors/safeguarded/__init__.py @@ -0,0 +1,15 @@ +__all__ = [ + "SafeguardedBitRoundPco", + "SafeguardedSperr", + "SafeguardedSz3", + "SafeguardedZero", + "SafeguardedZeroDssim", + "SafeguardedZfpRound", +] + +from .bitround_pco import SafeguardedBitRoundPco +from .sperr import SafeguardedSperr +from .sz3 import SafeguardedSz3 +from .zero import SafeguardedZero +from .zero_dssim import SafeguardedZeroDssim +from .zfp_round import SafeguardedZfpRound diff --git a/src/climatebenchpress/compressor/compressors/safeguarded/bitround_pco.py b/src/climatebenchpress/compressor/compressors/safeguarded/bitround_pco.py new file mode 100644 index 0000000..36cd540 --- /dev/null +++ b/src/climatebenchpress/compressor/compressors/safeguarded/bitround_pco.py @@ -0,0 +1,66 @@ +__all__ = ["SafeguardedBitRoundPco"] + + +import numcodecs_safeguards +import numcodecs_wasm_bit_round +import numcodecs_wasm_pco + +from ..abc import Compressor +from ..utils import compute_keepbits + + +class SafeguardedBitRoundPco(Compressor): + """Safeguarded Bit Rounding + PCodec compressor. + + This compressor first applies bit rounding to the data, which reduces the precision of the data + while preserving its overall structure. After that, it uses PCodec for further compression. + """ + + name = "safeguarded-bitround-pco" + description = "Safeguarded(Bit Rounding + PCodec)" + + @staticmethod + def abs_bound_codec(error_bound, *, dtype=None, data_abs_max=None, **kwargs): + assert dtype is not None, "dtype must be provided" + assert data_abs_max is not None, "data_abs_max must be provided" + + # conservative abs->rel error bound transformation, + # same as convert_abs_error_to_rel_error + # so that we can inform the safeguards of the abs bound + keepbits = compute_keepbits(dtype, error_bound / data_abs_max) + + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_wasm_bit_round.BitRound(keepbits=keepbits), + lossless=numcodecs_safeguards.lossless.Lossless( + for_codec=numcodecs_wasm_pco.Pco( + level=8, + mode="auto", + delta="auto", + paging="equal-pages-up-to", + ) + ), + safeguards=[ + dict(kind="eb", type="abs", eb=error_bound, equal_nan=True), + ], + ) + + @staticmethod + def rel_bound_codec(error_bound, *, dtype=None, **kwargs): + assert dtype is not None, "dtype must be provided" + + keepbits = compute_keepbits(dtype, error_bound) + + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_wasm_bit_round.BitRound(keepbits=keepbits), + lossless=numcodecs_safeguards.lossless.Lossless( + for_codec=numcodecs_wasm_pco.Pco( + level=8, + mode="auto", + delta="auto", + paging="equal-pages-up-to", + ) + ), + safeguards=[ + dict(kind="eb", type="rel", eb=error_bound, equal_nan=True), + ], + ) diff --git a/src/climatebenchpress/compressor/compressors/safeguarded/sperr.py b/src/climatebenchpress/compressor/compressors/safeguarded/sperr.py new file mode 100644 index 0000000..5bb2373 --- /dev/null +++ b/src/climatebenchpress/compressor/compressors/safeguarded/sperr.py @@ -0,0 +1,60 @@ +__all__ = ["SafeguardedSperr"] + +import numcodecs +import numcodecs.abc +import numcodecs.compat +import numcodecs_safeguards +import numcodecs_wasm_sperr +import numpy as np +from numcodecs_combinators.stack import CodecStack + +from ..abc import Compressor + + +class SafeguardedSperr(Compressor): + """Safeguarded SPERR compressor.""" + + name = "safeguarded-sperr" + description = "Safeguarded(SPERR)" + + @staticmethod + def abs_bound_codec(error_bound, **kwargs): + return numcodecs_safeguards.SafeguardedCodec( + codec=CodecStack( + NaNToMean(), + numcodecs_wasm_sperr.Sperr(mode="pwe", pwe=error_bound), + ), + safeguards=[ + dict(kind="eb", type="abs", eb=error_bound, equal_nan=True), + ], + ) + + @staticmethod + def rel_bound_codec(error_bound, *, data_abs_min=None, **kwargs): + assert data_abs_min is not None, "data_abs_min must be provided" + + return numcodecs_safeguards.SafeguardedCodec( + codec=CodecStack( + NaNToMean(), + # conservative rel->abs error bound transformation, + # same as convert_rel_error_to_abs_error + # so that we can inform the safeguards of the rel bound + numcodecs_wasm_sperr.Sperr(mode="pwe", pwe=error_bound * data_abs_min), + ), + safeguards=[ + dict(kind="eb", type="rel", eb=error_bound, equal_nan=True), + ], + ) + + +# inspired by H5Z-SPERR's treatment of NaN values: +# https://github.com/NCAR/H5Z-SPERR/blob/72ebcb00e382886c229c5ef5a7e237fe451d5fb8/src/h5z-sperr.c#L464-L473 +# https://github.com/NCAR/H5Z-SPERR/blob/72ebcb00e382886c229c5ef5a7e237fe451d5fb8/src/h5zsperr_helper.cpp#L179-L212 +class NaNToMean(numcodecs.abc.Codec): + codec_id = "nan-to-mean" # type: ignore + + def encode(self, buf): + return np.nan_to_num(buf, nan=np.nanmean(buf), posinf=np.inf, neginf=-np.inf) + + def decode(self, buf, out=None): + return numcodecs.compat.ndarray_copy(buf, out) diff --git a/src/climatebenchpress/compressor/compressors/safeguarded/sz3.py b/src/climatebenchpress/compressor/compressors/safeguarded/sz3.py new file mode 100644 index 0000000..609f1de --- /dev/null +++ b/src/climatebenchpress/compressor/compressors/safeguarded/sz3.py @@ -0,0 +1,31 @@ +__all__ = ["SafeguardedSz3"] + +import numcodecs_safeguards +import numcodecs_wasm_sz3 + +from ..abc import Compressor + + +class SafeguardedSz3(Compressor): + """Safeguarded SZ3 compressor.""" + + name = "safeguarded-sz3" + description = "Safeguarded(SZ3)" + + @staticmethod + def abs_bound_codec(error_bound, **kwargs): + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_wasm_sz3.Sz3(eb_mode="abs", eb_abs=error_bound), + safeguards=[ + dict(kind="eb", type="abs", eb=error_bound, equal_nan=True), + ], + ) + + @staticmethod + def rel_bound_codec(error_bound, **kwargs): + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_wasm_sz3.Sz3(eb_mode="rel", eb_rel=error_bound), + safeguards=[ + dict(kind="eb", type="rel", eb=error_bound, equal_nan=True), + ], + ) diff --git a/src/climatebenchpress/compressor/compressors/safeguarded/zero.py b/src/climatebenchpress/compressor/compressors/safeguarded/zero.py new file mode 100644 index 0000000..99e1790 --- /dev/null +++ b/src/climatebenchpress/compressor/compressors/safeguarded/zero.py @@ -0,0 +1,31 @@ +__all__ = ["SafeguardedZero"] + +import numcodecs_safeguards +import numcodecs_zero + +from ..abc import Compressor + + +class SafeguardedZero(Compressor): + """Safeguarded all-zero compressor.""" + + name = "safeguarded-zero" + description = "Safeguarded(0)" + + @staticmethod + def abs_bound_codec(error_bound, **kwargs): + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_zero.ZeroCodec(), + safeguards=[ + dict(kind="eb", type="abs", eb=error_bound, equal_nan=True), + ], + ) + + @staticmethod + def rel_bound_codec(error_bound, **kwargs): + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_zero.ZeroCodec(), + safeguards=[ + dict(kind="eb", type="rel", eb=error_bound, equal_nan=True), + ], + ) diff --git a/src/climatebenchpress/compressor/compressors/safeguarded/zero_dssim.py b/src/climatebenchpress/compressor/compressors/safeguarded/zero_dssim.py new file mode 100644 index 0000000..ea483ba --- /dev/null +++ b/src/climatebenchpress/compressor/compressors/safeguarded/zero_dssim.py @@ -0,0 +1,95 @@ +__all__ = ["SafeguardedZeroDssim"] + +import numcodecs_safeguards +import numcodecs_zero + +from ..abc import Compressor + + +class SafeguardedZeroDssim(Compressor): + """Safeguarded all-zero compressor that also safeguards the dSSIM score.""" + + name = "safeguarded-zero-dssim" + description = "Safeguarded(0, dSSIM)" + + @staticmethod + def abs_bound_codec(error_bound, data_min_2d=None, data_max_2d=None, **kwargs): + assert data_min_2d is not None, "data_min_2d must be provided" + assert data_max_2d is not None, "data_max_2d must be provided" + + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_zero.ZeroCodec(), + safeguards=[ + dict(kind="eb", type="abs", eb=error_bound, equal_nan=True), + # guarantee that the per-latitude-longitude-slice minimum and + # maximum are preserved, which simplifies the rescaling + dict(kind="sign", offset="x_min"), + dict(kind="sign", offset="x_max"), + dict( + kind="qoi_eb_pw", + qoi=""" + # === pointwise dSSIM quantity of interest === # + + # we guarantee that + # min(data) = min(corrected) and + # max(data) = max(corrected) + # with the sign safeguards above + v["smin"] = c["x_min"]; + v["smax"] = c["x_max"]; + v["r"] = v["smax"] - v["smin"]; + + # re-scale to [0-1] and quantize to 256 bins + v["sc_a2"] = round_ties_even(((x - v["smin"]) / v["r"]) * 255) / 255; + + # force the quantized value to stay the same + return v["sc_a2"]; + """, + type="abs", + eb=0, + ), + ], + # use data_min_2d instead of $x_min since we need the minimum per + # 2d latitude-longitude slice + fixed_constants=dict(x_min=data_min_2d, x_max=data_max_2d), + ) + + @staticmethod + def rel_bound_codec(error_bound, data_min_2d=None, data_max_2d=None, **kwargs): + assert data_min_2d is not None, "data_min_2d must be provided" + assert data_max_2d is not None, "data_max_2d must be provided" + + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_zero.ZeroCodec(), + safeguards=[ + dict(kind="eb", type="rel", eb=error_bound, equal_nan=True), + # guarantee that the per-latitude-longitude-slice minimum and + # maximum are preserved, which simplifies the rescaling + dict(kind="sign", offset="x_min"), + dict(kind="sign", offset="x_max"), + dict( + kind="qoi_eb_pw", + qoi=""" + # === pointwise dSSIM quantity of interest === # + + # we guarantee that + # min(data) = min(corrected) and + # max(data) = max(corrected) + # with the sign safeguards above + v["smin"] = c["x_min"]; + v["smax"] = c["x_max"]; + v["r"] = v["smax"] - v["smin"]; + + # re-scale to [0-1] and quantize to 256 bins + v["sc_a2"] = round_ties_even(((x - v["smin"]) / v["r"]) * 255) / 255; + + # force the quantized value to stay the same + return v["sc_a2"]; + """, + type="abs", + eb=0, + ), + ], + # use data_min_2d instead of $x_min since we need the minimum per + # 2d latitude-longitude slice + fixed_constants=dict(x_min=data_min_2d, x_max=data_max_2d), + ) diff --git a/src/climatebenchpress/compressor/compressors/safeguarded/zfp_round.py b/src/climatebenchpress/compressor/compressors/safeguarded/zfp_round.py new file mode 100644 index 0000000..d98f3d9 --- /dev/null +++ b/src/climatebenchpress/compressor/compressors/safeguarded/zfp_round.py @@ -0,0 +1,54 @@ +__all__ = ["SafeguardedZfpRound"] + +import numcodecs_safeguards +import numcodecs_wasm_zfp + +from ..abc import Compressor + + +class SafeguardedZfpRound(Compressor): + """Safeguarded ZFP-ROUND compressor. + + This is an adjusted version of the ZFP compressor with an improved rounding mechanism + for the transform coefficients. + """ + + name = "safeguarded-zfp-round" + description = "Safeguarded(ZFP-ROUND)" + + # NOTE: + # ZFP mechanism for strictly supporting relative error bounds is to + # truncate the floating point bit representation and then use ZFP's lossless + # mode for compression. This is essentially equivalent to the BitRound + # compressors we are already implementing (with a difference what the lossless + # compression algorithm is). + # See https://zfp.readthedocs.io/en/release1.0.1/faq.html#q-relerr for more details. + + @staticmethod + def abs_bound_codec(error_bound, **kwargs): + return numcodecs_safeguards.SafeguardedCodec( + codec=numcodecs_wasm_zfp.Zfp( + mode="fixed-accuracy", tolerance=error_bound, non_finite="allow-unsafe" + ), + safeguards=[ + dict(kind="eb", type="abs", eb=error_bound, equal_nan=True), + ], + ) + + @staticmethod + def rel_bound_codec(error_bound, *, data_abs_min=None, **kwargs): + assert data_abs_min is not None, "data_abs_min must be provided" + + return numcodecs_safeguards.SafeguardedCodec( + # conservative rel->abs error bound transformation, + # same as convert_rel_error_to_abs_error + # so that we can inform the safeguards of the rel bound + codec=numcodecs_wasm_zfp.Zfp( + mode="fixed-accuracy", + tolerance=error_bound * data_abs_min, + non_finite="allow-unsafe", + ), + safeguards=[ + dict(kind="eb", type="rel", eb=error_bound, equal_nan=True), + ], + ) diff --git a/src/climatebenchpress/compressor/plotting/plot_metrics.py b/src/climatebenchpress/compressor/plotting/plot_metrics.py index b00d92c..408b098 100644 --- a/src/climatebenchpress/compressor/plotting/plot_metrics.py +++ b/src/climatebenchpress/compressor/plotting/plot_metrics.py @@ -207,8 +207,9 @@ def _normalize(data): # Normalize each variable by its mean and std normalized[new_col] = normalized.apply( - lambda x: (x[col] - mean_std[x["Variable"]][0]) - / mean_std[x["Variable"]][1], + lambda x: ( + (x[col] - mean_std[x["Variable"]][0]) / mean_std[x["Variable"]][1] + ), axis=1, ) diff --git a/src/climatebenchpress/compressor/scripts/compress.py b/src/climatebenchpress/compressor/scripts/compress.py index 98b71e4..df7d40f 100644 --- a/src/climatebenchpress/compressor/scripts/compress.py +++ b/src/climatebenchpress/compressor/scripts/compress.py @@ -87,6 +87,8 @@ def compress( ds_abs_maxs: dict[str, float] = dict() ds_mins: dict[str, float] = dict() ds_maxs: dict[str, float] = dict() + ds_min_2ds: dict[str, np.ndarray] = dict() + ds_max_2ds: dict[str, np.ndarray] = dict() for v in ds: vs: str = str(v) abs_vals = xr.ufuncs.abs(ds[v]) @@ -96,6 +98,16 @@ def compress( ds_abs_maxs[vs] = abs_vals.max().values.item() ds_mins[vs] = ds[v].min().values.item() ds_maxs[vs] = ds[v].max().values.item() + ds_min_2ds[vs] = ( + ds[v] + .min(dim=[ds[v].cf["Y"].name, ds[v].cf["X"].name], keepdims=True) + .values + ) + ds_max_2ds[vs] = ( + ds[v] + .max(dim=[ds[v].cf["Y"].name, ds[v].cf["X"].name], keepdims=True) + .values + ) if chunked: for v in ds: @@ -115,7 +127,14 @@ def compress( compressor_variants: dict[str, list[NamedPerVariableCodec]] = ( compressor.build( - ds_dtypes, ds_abs_mins, ds_abs_maxs, ds_mins, ds_maxs, error_bounds + ds_dtypes, + ds_abs_mins, + ds_abs_maxs, + ds_mins, + ds_maxs, + ds_min_2ds, + ds_max_2ds, + error_bounds, ) ) @@ -189,6 +208,15 @@ def compress_decompress( if not isinstance(codec, CodecStack): codec = CodecStack(codec) + # HACK: Safeguarded(0, dSSIM) requires the per-lat-lon-slice minimum + # and maximum + # for potentially-chunked data we should really use xarray-safeguards, + # but not using chunks also works (for now) + is_safeguarded_zero_dssim = ( + "# === pointwise dSSIM quantity of interest === #" + in json.dumps(codec.get_config()) + ) + with numcodecs_observers.observe( codec, observers=[ @@ -197,26 +225,42 @@ def compress_decompress( timing, ], ) as codec_: - variables[v] = codec_.encode_decode_data_array(ds[v]).compute() - - measurements[v] = { - "encoded_bytes": sum( - b.post for b in nbytes.encode_sizes[HashableCodec(codec[-1])] - ), - "decoded_bytes": sum( - b.post for b in nbytes.decode_sizes[HashableCodec(codec[0])] - ), - "encode_timing": sum(t for ts in timing.encode_times.values() for t in ts), - "decode_timing": sum(t for ts in timing.decode_times.values() for t in ts), - "encode_instructions": sum( - i for is_ in instructions.encode_instructions.values() for i in is_ - ) - or None, - "decode_instructions": sum( - i for is_ in instructions.decode_instructions.values() for i in is_ - ) - or None, - } + variables[v] = codec_.encode_decode_data_array( + ds[v].compute() if is_safeguarded_zero_dssim else ds[v] + ).compute() + + cs = [c._codec for c in codec_.__iter__()] + + measurements[v] = { + # bytes measurements: only look at the first and last codec in + # the top level stack, which gives the total encoded and + # decoded sizes + "encoded_bytes": sum( + b.post for b in nbytes.encode_sizes[HashableCodec(cs[-1])] + ), + "decoded_bytes": sum( + b.post for b in nbytes.decode_sizes[HashableCodec(cs[0])] + ), + # time measurements: only sum over the top level stack members + # to avoid double counting from nested codec combinators + "encode_timing": sum( + t for c in cs for t in timing.encode_times[HashableCodec(c)] + ), + "decode_timing": sum( + t for c in cs for t in timing.decode_times[HashableCodec(c)] + ), + # encode instructions: sum over all codecs since WASM + # instruction counts are currently not aggregated in codec + # combinators + "encode_instructions": sum( + i for is_ in instructions.encode_instructions.values() for i in is_ + ) + or None, + "decode_instructions": sum( + i for is_ in instructions.decode_instructions.values() for i in is_ + ) + or None, + } return xr.Dataset(variables, coords=ds.coords, attrs=ds.attrs), measurements