From fe3822dbc7a99314db435a17f358306c63a92f61 Mon Sep 17 00:00:00 2001 From: isilber Date: Mon, 6 Apr 2026 19:09:28 +0000 Subject: [PATCH 1/2] DEV: Bundle processing (now includes all REAL instruments) --- profiles_eval/configs/ceil.json | 42 +- profiles_eval/configs/ceilpol.json | 44 + profiles_eval/configs/dl.json | 35 + profiles_eval/configs/hsrl.json | 97 +- profiles_eval/configs/interpolatedsonde.json | 62 ++ profiles_eval/configs/minimpl.json | 63 ++ profiles_eval/configs/mpl.json | 88 +- profiles_eval/configs/rl.json | 79 +- profiles_eval/prof_init.py | 68 -- profiles_eval/prof_utils.py | 43 - profiles_eval/real_lidar_corrections.py | 528 +++++++++++ profiles_eval/real_prof_init.py | 154 ++++ profiles_eval/real_prof_io.py | 902 +++++++++++++++++++ profiles_eval/real_prof_main.py | 92 ++ profiles_eval/real_prof_plot.py | 553 ++++++++++++ profiles_eval/real_prof_utils.py | 247 +++++ 16 files changed, 2881 insertions(+), 216 deletions(-) create mode 100644 profiles_eval/configs/ceilpol.json create mode 100644 profiles_eval/configs/dl.json create mode 100644 profiles_eval/configs/interpolatedsonde.json create mode 100644 profiles_eval/configs/minimpl.json delete mode 100644 profiles_eval/prof_init.py delete mode 100644 profiles_eval/prof_utils.py create mode 100644 profiles_eval/real_lidar_corrections.py create mode 100644 profiles_eval/real_prof_init.py create mode 100644 profiles_eval/real_prof_io.py create mode 100644 profiles_eval/real_prof_main.py create mode 100644 profiles_eval/real_prof_plot.py create mode 100644 profiles_eval/real_prof_utils.py diff --git a/profiles_eval/configs/ceil.json b/profiles_eval/configs/ceil.json index 4262a85..8b33a9a 100644 --- a/profiles_eval/configs/ceil.json +++ b/profiles_eval/configs/ceil.json @@ -1,22 +1,40 @@ { "instrument_info": { - "name": "CEIL (CL31)", + "name": "Ceilometer (CL31)", + "type": "lidar", "instrument_class": "ceil10m", - "level": "b1" - + "level": "b1", + "requires_corrections": false }, "variables": { - "time": "time", - "range": "range", - "attenuated_backscatter": "backscatter" + "time": { + "name_in_file": "time", + "units": "sec" + }, + "range": { + "name_in_file": "range", + "units": "m" + }, + "attenuated_backscatter": { + "name_in_file": "backscatter", + "units": "1/(sr*km*10000)" + }, + "cloud_base": { + "name_in_file": "first_cbh", + "units": "km" + }, + "cloud_base2": { + "name_in_file": "second_cbh", + "units": "km" + }, + "cloud_base3": { + "name_in_file": "third_cbh", + "units": "km" + } }, "coordinates": { "time": "time", - "range": "range" - }, - "units": { - "time": "sec", - "range": "m", - "attenuated_backscatter": "1/(sr*km*10000)" + "range": "range", + "layer": "layer" } } diff --git a/profiles_eval/configs/ceilpol.json b/profiles_eval/configs/ceilpol.json new file mode 100644 index 0000000..b62ad83 --- /dev/null +++ b/profiles_eval/configs/ceilpol.json @@ -0,0 +1,44 @@ +{ + "instrument_info": { + "name": "Polarized Ceilometer (Vaisala CL61)", + "type": "lidar", + "instrument_class": "ceilpol", + "level": "b1", + "requires_corrections": false + }, + "variables": { + "time": { + "name_in_file": "time", + "units": "sec" + }, + "range": { + "name_in_file": "range", + "units": "m" + }, + "attenuated_backscatter": { + "name_in_file": "beta_att", + "units": "1/(m sr)" + }, + "linear_depol_ratio": { + "name_in_file": "linear_depol_ratio", + "units": "1" + }, + "cloud_base": { + "name_in_file": "cloud_base_heights", + "units": "m" + }, + "precipitation_detection": { + "name_in_file": "precipitation_detection", + "units": "1" + }, + "fog_detection": { + "name_in_file": "fog_detection", + "units": "1" + } + }, + "coordinates": { + "time": "time", + "range": "range", + "layer": "layer" + } +} diff --git a/profiles_eval/configs/dl.json b/profiles_eval/configs/dl.json new file mode 100644 index 0000000..9a462d4 --- /dev/null +++ b/profiles_eval/configs/dl.json @@ -0,0 +1,35 @@ +{ + "instrument_info": { + "name": "Doppler Lidar (Halo Streamline)", + "type": "lidar", + "instrument_class": "dlfpt", + "level": "b1", + "requires_corrections": false + }, + "variables": { + "time": { + "name_in_file": "time", + "units": "sec" + }, + "range": { + "name_in_file": "range", + "units": "m" + }, + "attenuated_backscatter": { + "name_in_file": "attenuated_backscatter", + "units": "1/(m sr)" + }, + "radial_velocity": { + "name_in_file": "radial_velocity", + "units": "m/s" + }, + "qc_radial_velocity": { + "name_in_file": "qc_radial_velocity", + "units": "1" + } + }, + "coordinates": { + "time": "time", + "range": "range" + } +} diff --git a/profiles_eval/configs/hsrl.json b/profiles_eval/configs/hsrl.json index 72d3655..f05d063 100644 --- a/profiles_eval/configs/hsrl.json +++ b/profiles_eval/configs/hsrl.json @@ -1,43 +1,76 @@ { "instrument_info": { - "name": "HSRL", + "name": "High spectral resolution lidar (HSRL)", + "type": "lidar", "instrument_class": "hsrl", - "level": "a1" + "instrument_class_highres": "hsrl5s", + "level": "a1", + "requires_corrections": false }, "variables": { - "time": "time", - "range": "range", - "optical_depth": "od_aerosol", - "molecular_attenuated_backscatter": "atten_beta_r_backscatter", - "attenuated_backscatter": "atten_beta_a_backscatter", - "attenuated_backscatter_1064nm": "atten_beta_a_1064nm_backscatter", - "particulate_backscatter": "beta_a_backscatter", - "particulate_backscatter_1064nm": "beta_a_1064_backscatter", - "particulate_extinction": "extinction_aerosol", - "linear_depol_ratio": "linear_depol_ratio", - "linear_depol_ratio_1064nm": "linear_depol_1064", - "molecular_signal_to_noise": "molecular_signal_to_noise", - "particulate_backscatter_signal_to_noise": "particulate_backscatter_signal_to_noise", - "color_ratio": "color_ratio" + "time": { + "name_in_file": "time", + "units": "sec" + }, + "range": { + "name_in_file": "range", + "units": "m" + }, + "particulate_backscatter": { + "name_in_file": "beta_a_backscatter", + "units": "1/(m*sr)" + }, + "particulate_backscatter_1064nm": { + "name_in_file": "beta_a_1064_backscatter", + "units": "1/(m*sr)" + }, + "particulate_extinction": { + "name_in_file": "extinction_aerosol", + "units": "1/m" + }, + "linear_depol_ratio": { + "name_in_file": "linear_depol", + "units": "1" + }, + "linear_depol_ratio_1064nm": { + "name_in_file": "linear_depol_1064", + "units": "1" + }, + "optical_depth": { + "name_in_file": "od_aerosol", + "units": "1" + }, + "molecular_attenuated_backscatter": { + "name_in_file": "atten_beta_r_backscatter", + "units": "1/(m*sr)" + }, + "attenuated_backscatter": { + "name_in_file": "atten_beta_a_backscatter", + "units": "1/(m*sr)" + }, + "attenuated_backscatter_1064nm": { + "name_in_file": "atten_beta_a_1064nm_backscatter", + "units": "1/(m*sr)" + }, + "particulate_backscatter_std": { + "name_in_file": "beta_a_backscatter_std", + "units": "1/(m*sr)" + }, + "molecular_signal_to_noise": { + "name_in_file": "molecular_signal_to_noise", + "units": "1" + }, + "particulate_backscatter_signal_to_noise": { + "name_in_file": "particulate_backscatter_signal_to_noise", + "units": "1" + }, + "color_ratio": { + "name_in_file": "color_ratio", + "units": "1" + } }, "coordinates": { "time": "time", "range": "range" - }, - "units": { - "time": "sec", - "range": "m", - "optical_depth": "1", - "molecular_attenuated_backscatter": "1/(m*sr)", - "attenuated_backscatter": "1/(m*sr)", - "attenuated_backscatter_1064nm": "1/(m*sr)", - "particulate_backscatter": "1/(m*sr)", - "particulate_backscatter_1064nm": "1/(m*sr)", - "particulate_extinction": "1/m", - "linear_depol_ratio": "1", - "linear_depol_ratio_1064nm": "1", - "molecular_signal_to_noise": "1", - "particulate_backscatter_signal_to_noise": "1", - "color_ratio": "1" } } diff --git a/profiles_eval/configs/interpolatedsonde.json b/profiles_eval/configs/interpolatedsonde.json new file mode 100644 index 0000000..7648e31 --- /dev/null +++ b/profiles_eval/configs/interpolatedsonde.json @@ -0,0 +1,62 @@ +{ + "instrument_info": { + "name": "Interpolated Sonde", + "type": "aux", + "instrument_class": "interpolatedsonde", + "level": "c1" + }, + "variables": { + "time": { + "name_in_file": "time", + "units": "sec" + }, + "range": { + "name_in_file": "height", + "units": "m" + }, + "temp": { + "name_in_file": "temp", + "units": "degC" + }, + "qc_temp": { + "name_in_file": "qc_temp", + "units": "1" + }, + "rh": { + "name_in_file": "rh", + "units": "%" + }, + "qc_rh": { + "name_in_file": "qc_rh", + "units": "1" + }, + "pres": { + "name_in_file": "bar_pres", + "units": "kPa" + }, + "qc_pres": { + "name_in_file": "qc_bar_pres", + "units": "1" + }, + "wspd": { + "name_in_file": "wspd", + "units": "m s-1" + }, + "qc_wspd": { + "name_in_file": "qc_wspd", + "units": "1" + }, + "wdir": { + "name_in_file": "wdir", + "units": "degree" + }, + "qc_wdir": { + "name_in_file": "qc_wdir", + "units": "1" + } + }, + "coordinates": { + "time": "time", + "range": "height" + } +} diff --git a/profiles_eval/configs/minimpl.json b/profiles_eval/configs/minimpl.json new file mode 100644 index 0000000..b46129c --- /dev/null +++ b/profiles_eval/configs/minimpl.json @@ -0,0 +1,63 @@ +{ + "instrument_info": { + "name": "miniMPL", + "type": "lidar", + "instrument_class": "minimpl", + "level": "b1", + "requires_corrections": true + }, + "variables": { + "time": { + "name_in_file": "time", + "units": "sec" + }, + "range": { + "name_in_file": "range", + "units": "km" + }, + "raw_signal_co_pol": { + "name_in_file": "signal_return_co_pol", + "units": "counts/us" + }, + "raw_signal_cross_pol": { + "name_in_file": "signal_return_cross_pol", + "units": "counts/us" + }, + "background_co_pol": { + "name_in_file": "background_signal_co_pol", + "units": "counts/us" + }, + "background_cross_pol": { + "name_in_file": "background_signal_cross_pol", + "units": "counts/us" + }, + "attenuated_backscatter": { + "name_in_file": "nrb_co", + "units": "counts/us km^2" + }, + "linear_depol_ratio": { + "name_in_file": "ldr", + "units": "1" + }, + "attenuated_backscatter_cross_pol": { + "name_in_file": "nrb_cross", + "units": "counts/us km^2" + } + }, + "coordinates": { + "time": "time", + "range": "range" + }, + "corrections": { + "deadtime_counts": "deadtime_correction_counts", + "deadtime_factors": "deadtime_correction", + "afterpulse_range": "range", + "afterpulse_profile_co_pol": "afterpulse_correction_co_pol", + "afterpulse_profile_cross_pol":"afterpulse_correction_cross_pol", + "darkcounts_range": "range", + "darkcounts_profile_co_pol": "darkcount_correction_co_pol", + "darkcounts_profile_cross_pol":"darkcount_correction_cross_pol", + "overlap_range": "overlap_correction_heights", + "overlap_factors": "overlap_correction" + } +} diff --git a/profiles_eval/configs/mpl.json b/profiles_eval/configs/mpl.json index efd350c..fe1b131 100644 --- a/profiles_eval/configs/mpl.json +++ b/profiles_eval/configs/mpl.json @@ -1,42 +1,68 @@ { "instrument_info": { "name": "MPL", + "type": "lidar", "instrument_class": "mplcmaskml", - "level": "c1" - + "level": "c1", + "requires_corrections": false }, "variables": { - "time": "time", - "range": "height", - "attenuated_backscatter": "range_corrected_backscatter", - "qc_preprocess_backscatter": "qc_preprocess_backscatter", - "linear_depol_ratio": "linear_depol_ratio", - "qc_linear_depol_ratio": "qc_linear_depol_ratio", - "feature_mask": "cloud_mask", - "qc_feature_mask": "qc_cloud_mask", - "feature_confidence": "cloud_mask_confidence", - "cloud_base": "cloud_base", - "cloud_top": "cloud_top", - "num_cloud_layers": "num_cloud_layers", - "qc_num_cloud_layers": "qc_num_cloud_layers" + "time": { + "name_in_file": "time", + "units": "sec" + }, + "range": { + "name_in_file": "height", + "units": "km" + }, + "attenuated_backscatter": { + "name_in_file": "range_corrected_backscatter", + "units": "km^2*count/us" + }, + "qc_preprocess_backscatter": { + "name_in_file": "qc_preprocess_backscatter", + "units": "1" + }, + "linear_depol_ratio": { + "name_in_file": "linear_depol_ratio", + "units": "1" + }, + "qc_linear_depol_ratio": { + "name_in_file": "qc_linear_depol_ratio", + "units": "1" + }, + "feature_mask": { + "name_in_file": "cloud_mask", + "units": "1" + }, + "qc_feature_mask": { + "name_in_file": "qc_cloud_mask", + "units": "1" + }, + "feature_confidence": { + "name_in_file": "cloud_mask_confidence", + "units": "1" + }, + "cloud_base": { + "name_in_file": "cloud_base", + "units": "km" + }, + "cloud_top": { + "name_in_file": "cloud_top", + "units": "km" + }, + "num_cloud_layers": { + "name_in_file": "num_cloud_layers", + "units": "1" + }, + "qc_num_cloud_layers": { + "name_in_file": "qc_num_cloud_layers", + "units": "1" + } }, "coordinates": { "time": "time", - "range": "height" - }, - "units": { - "time": "sec", - "range": "km", - "attenuated_backscatter": "km^2*count/us", - "qc_preprocess_backscatter": "1", - "linear_depol_ratio": "1", - "qc_linear_depol_ratio": "1", - "cloud_mask": "1", - "qc_cloud_mask": "1", - "cloud_mask_confidence": "1", - "cloud_base": "km", - "cloud_top": "km", - "num_cloud_layers": "1", - "qc_num_cloud_layers": "1" + "range": "height", + "layer": "layer" } } diff --git a/profiles_eval/configs/rl.json b/profiles_eval/configs/rl.json index 9579b18..f68074b 100644 --- a/profiles_eval/configs/rl.json +++ b/profiles_eval/configs/rl.json @@ -1,40 +1,59 @@ { "instrument_info": { - "name": "RL", + "name": "Raman Lidar", + "type": "lidar", "instrument_class": "rlproffex1thor", - "level": "c0" + "level": "c0", + "requires_corrections": false }, "variables": { - "time": "time", - "range": "height_high", - "range_low": "height_low", - "scattering_ratio": "scattering_ratio_e", - "qc_scattering_ratio": "qc_scattering_ratio_e", - "particulate_backscatter": "particulate_backscatter_be", - "particulate_extinction": "extinction_be", - "lidar_ratio": "lidar_ratio_be", - "linear_depol_ratio": "depolarization_ratio", - "source_lidar_ratio": "source_lidar_ratio_be", - "feature_mask": "feature_mask", - "feature_mask_confidence": "detection_confidence_score_total" + "time": { + "name_in_file": "time", + "units": "sec" + }, + "range": { + "name_in_file": "height_high", + "units": "km" + }, + "particulate_backscatter": { + "name_in_file": "particulate_backscatter_be", + "units": "1/(km*sr)" + }, + "particulate_extinction": { + "name_in_file": "extinction_be", + "units": "1/km" + }, + "linear_depol_ratio": { + "name_in_file": "depolarization_ratio", + "units": "1" + }, + "scattering_ratio": { + "name_in_file": "scattering_ratio_e", + "units": "1" + }, + "qc_scattering_ratio": { + "name_in_file": "qc_scattering_ratio_e", + "units": "1" + }, + "lidar_ratio": { + "name_in_file": "lidar_ratio_be", + "units": "sr" + }, + "source_lidar_ratio": { + "name_in_file": "source_lidar_ratio_be", + "units": "1" + }, + "feature_mask": { + "name_in_file": "feature_mask", + "units": "1" + }, + "feature_mask_confidence": { + "name_in_file": "detection_confidence_score_total", + "units": "1" + } }, "coordinates": { "time": "time", - "range": "height_high", - "range_low": "height_low" - }, - "units": { - "time": "sec", - "range": "km", - "range_low": "km", - "scattering_ratio": "1", - "qc_scattering_ratio": "1", - "particulate_backscatter": "1/(km*sr)", - "particulate_extinction": "1/km", - "lidar_ratio": "sr", - "linear_depol_ratio": "1", - "source_lidar_ratio": "1", - "feature_mask": "1", - "feature_mask_confidence": "1" + "range": "height_high" } } diff --git a/profiles_eval/prof_init.py b/profiles_eval/prof_init.py deleted file mode 100644 index be1bf10..0000000 --- a/profiles_eval/prof_init.py +++ /dev/null @@ -1,68 +0,0 @@ -""" -=============================================================== -Israel Silber -=============================================================== -Lidar profile intercomparison initialization module -=============================================================== -""" - -import json -from pathlib import Path -from typing import Dict, List - - -def load_config(instrument_type: str, config_dir: str = "./configs") -> Dict: - """ - Load configuration for a specific instrument type. - - Parameters - ---------- - instrument_type : str - The instrument type (e.g., 'vaisala_ceilometer', 'hsrl', etc.) - config_dir : str, optional - Directory containing the JSON configuration files, by default "./configs" - - Returns - ------- - Dict - Configuration dictionary - - Raises - ------ - FileNotFoundError - If configuration file is not found - ValueError - If JSON file is invalid - """ - config_file = Path(config_dir) / f"{instrument_type}.json" - - if not config_file.exists(): - raise FileNotFoundError(f"Configuration file not found: {config_file}") - - try: - with open(config_file, 'r') as f: - config = json.load(f) - return config - - except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON in config file {config_file}: {e}") - - -def get_variable_mapping(instrument_type: str, config_dir: str = "./configs") -> Dict[str, str]: - """ - Get variable mapping for an instrument type. - - Parameters - ---------- - instrument_type : str - The instrument type - config_dir : str, optional - Directory containing the JSON configuration files, by default "./configs" - - Returns - ------- - Dict[str, str] - Dictionary mapping standard names to instrument-specific names - """ - config = load_config(instrument_type, config_dir) - return config["variables"] \ No newline at end of file diff --git a/profiles_eval/prof_utils.py b/profiles_eval/prof_utils.py deleted file mode 100644 index 3c664b8..0000000 --- a/profiles_eval/prof_utils.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -=============================================================== -Israel Silber -=============================================================== -Lidar profile intercomparison utility functions -=============================================================== -""" - -from typing import Dict, List, Optional -import prof_init as pi - - -def get_common_variables(instrument_types: List[str], config_dir: str = "./configs") -> List[str]: - """ - Get variables that are common across multiple instrument types. - - Parameters - ---------- - instrument_types : List[str] - List of instrument types to compare - config_dir : str, optional - Directory containing the JSON configuration files, by default "./configs" - - Returns - ------- - List[str] - List of common variable names (standard names) - """ - if not instrument_types: - return [] - - # Load all configs and get variable sets - variable_sets = [] - for inst_type in instrument_types: - config = pi.load_config(inst_type, config_dir) - variable_sets.append(set(config["variables"].keys())) - - # Find intersection - common_vars = variable_sets[0] - for var_set in variable_sets[1:]: - common_vars &= var_set - - return list(common_vars) \ No newline at end of file diff --git a/profiles_eval/real_lidar_corrections.py b/profiles_eval/real_lidar_corrections.py new file mode 100644 index 0000000..bd4ae55 --- /dev/null +++ b/profiles_eval/real_lidar_corrections.py @@ -0,0 +1,528 @@ +""" +=============================================================== +Israel Silber +=============================================================== +Elastic lidar profile correction functions for the REAL intercomparison. +Includes: +- Deadtime correction via count-rate LUT +- Afterpulse correction via range LUTs +- Background subtraction using a per-profile background time series +- Range-squared correction +- Overlap correction via range LUT +=============================================================== +""" +import numpy as np +import xarray as xr + +from real_prof_init import load_config + + +def apply_deadtime_correction( + raw_signal: np.ndarray, + correction_counts: np.ndarray, + correction_factors: np.ndarray, +) -> np.ndarray: + """ + Apply deadtime correction to raw photon-counting lidar signals using + a precomputed lookup table (LUT). + Out-of-range bins are extrapolated using the nearest boundary factor. + + Parameters + ---------- + raw_signal : np.ndarray + 2D array of shape (n_profiles, n_range_bins) containing raw + photon-counting lidar signal in counts/us. Must be the raw detector + output before any other correction. + correction_counts : np.ndarray + 1D array of shape (n_lut,) containing the count-rate axis of the + deadtime correction LUT in counts/us. Must be monotonically + increasing. + correction_factors : np.ndarray + 1D array of shape (n_lut,) containing the multiplicative correction + factor at each count rate. A value of 1.0 means no correction; + values > 1.0 indicate the true signal exceeds the measured signal. + + Returns + ------- + corrected : np.ndarray + 2D array of shape (n_profiles, n_range_bins) with deadtime correction + applied, in counts/us. + """ + interp_factors = np.interp( + raw_signal.ravel(), + correction_counts, + correction_factors, + ).reshape(raw_signal.shape) + + return raw_signal * interp_factors + + +def apply_afterpulse_correction( + signal: np.ndarray, + range_km: np.ndarray, + afterpulse_range: np.ndarray, + afterpulse_profile: np.ndarray, + darkcounts_range: np.ndarray, + darkcounts_profile: np.ndarray, +) -> np.ndarray: + """ + Apply afterpulse correction to a deadtime-corrected lidar signal. + + Parameters + ---------- + signal : np.ndarray + 2D array of shape (n_profiles, n_range_bins) containing the + deadtime-corrected lidar signal in counts/us. + range_km : np.ndarray + 1D array of shape (n_range_bins,) with range gate centres in km + onto which the LUT profiles are interpolated. + afterpulse_range : np.ndarray + 1D array of shape (n_lut_ap,) containing the range axis of the + afterpulse LUT in km. + afterpulse_profile : np.ndarray + 1D array of shape (n_lut_ap,) containing the afterpulse signal + in counts/us. + darkcounts_range : np.ndarray + 1D array of shape (n_lut_dc,) containing the range axis of the + dark-count correction LUT in km. + darkcounts_profile : np.ndarray + 1D array of shape (n_lut_dc,) containing the dark-count correction + profile in counts/us. + + Returns + ------- + corrected : np.ndarray + 2D array of shape (n_profiles, n_range_bins) with afterpulse + subtracted, in counts/us. + """ + # Interpolate both profiles onto the signal range grid + ap_interp = np.interp(range_km, afterpulse_range, afterpulse_profile) + dc_interp = np.interp(range_km, darkcounts_range, darkcounts_profile) + + return signal - (ap_interp - dc_interp) + + +def apply_background_subtraction( + signal: np.ndarray, + background: np.ndarray, +) -> np.ndarray: + """ + Subtract a per-profile background signal from the lidar signal. + + Parameters + ---------- + signal : np.ndarray + 2D array of shape (n_profiles, n_range_bins) containing the + afterpulse-corrected lidar signal in counts/us. + background : np.ndarray + 1D array of shape (n_profiles,) containing the background signal + level for each profile in counts/us (e.g. derived from far-range + bins or a co-located dark measurement). + + Returns + ------- + corrected : np.ndarray + 2D array of shape (n_profiles, n_range_bins) with background + subtracted, in counts/us. + """ + if background.shape[0] != signal.shape[0]: + raise ValueError( + f"background length ({background.shape[0]}) must match " + f"n_profiles ({signal.shape[0]})." + ) + + # background[:, np.newaxis] : (n_profiles, 1) broadcasts over all bins + return signal - background[:, np.newaxis] + + +def apply_range_correction( + signal: np.ndarray, + range_km: np.ndarray, +) -> np.ndarray: + """ + Apply range-squared correction. + + Parameters + ---------- + signal : np.ndarray + 2D array of shape (n_profiles, n_range_bins) in counts/us. + range_km : np.ndarray + 1D array of shape (n_range_bins,) with range gate centres in km. + + Returns + ------- + corrected : np.ndarray + 2D array of shape (n_profiles, n_range_bins) in counts/us * km^2. + """ + return signal * range_km ** 2 + + +def apply_overlap_correction( + signal: np.ndarray, + overlap_range: np.ndarray, + overlap_factors: np.ndarray, + range_km: np.ndarray, +) -> np.ndarray: + """ + Apply overlap correction by interpolating an overlap LUT onto the + range grid of the signal and dividing. + Bins where the interpolated overlap is zero are set to NaN to avoid + division by zero. + + Parameters + ---------- + signal : np.ndarray + 2D array of shape (n_profiles, n_range_bins) in counts/us * km^2. + overlap_range : np.ndarray + 1D array of shape (n_lut,) containing the range axis of the overlap + LUT in km. Must be monotonically increasing. + overlap_factors : np.ndarray + 1D array of shape (n_lut,) containing the overlap correction factors + in [0, 1] at each range in overlap_range. + range_km : np.ndarray + 1D array of shape (n_range_bins,) with the range gate centres in km + onto which the LUT is interpolated. + + Returns + ------- + corrected : np.ndarray + 2D array of shape (n_profiles, n_range_bins) with overlap correction + applied. Bins with zero overlap are NaN. + """ + # Interpolate overlap LUT onto the signal range grid + # Out-of-range bins are extrapolated using the nearest boundary value + overlap_interp = np.interp(range_km, overlap_range, overlap_factors) + + # Zero-overlap bins cannot be corrected + overlap_safe = np.where(overlap_interp == 0.0, np.nan, overlap_interp) + + return signal / overlap_safe # broadcasts (n_profiles, n_bins) / (n_bins,) + + +def compute_nrb( + raw_signal: np.ndarray, + range_km: np.ndarray, + background: np.ndarray, + deadtime_correction_counts: np.ndarray, + deadtime_correction_factors: np.ndarray, + afterpulse_range: np.ndarray, + afterpulse_profile: np.ndarray, + darkcounts_range: np.ndarray, + darkcounts_profile: np.ndarray, + overlap_range: np.ndarray, + overlap_factors: np.ndarray, + calibration_constant: float = 1.0, +) -> np.ndarray: + """ + Compute the Normalized Relative Backscatter (NRB) from raw lidar data. + + Processing steps applied in order: + + 1. **Deadtime correction** -- corrects photon pile-up via a count-rate LUT. + 2. **Afterpulse correction** -- removes detector afterpulse after + subtracting its dark-count component. + 3. **Background subtraction** -- removes per-profile solar background and + dark current provided as a time series. + 4. **Range-squared correction** -- compensates geometric signal decay. + 5. **Overlap correction** -- corrects near-field beam/FOV mismatch via a + range LUT. + 6. **Calibration scaling** -- divides by the instrument calibration constant. + + Parameters + ---------- + raw_signal : np.ndarray + 2D array of shape (n_profiles, n_range_bins) with raw lidar signal + in counts/us. + range_km : np.ndarray + 1D array of shape (n_range_bins,) with range gate centres in km. + background : np.ndarray + 1D array of shape (n_profiles,) with the background signal level per + profile in counts/us. + deadtime_correction_counts : np.ndarray + 1D array of shape (n_lut_dt,) with count-rate axis of deadtime LUT + in counts/us. Must be monotonically increasing. + deadtime_correction_factors : np.ndarray + 1D array of shape (n_lut_dt,) with multiplicative deadtime correction + factors. + afterpulse_range : np.ndarray + 1D array of shape (n_lut_ap,) with range axis of the afterpulse LUT + in km. + afterpulse_profile : np.ndarray + 1D array of shape (n_lut_ap,) with afterpulse signal in counts/us. + darkcounts_range : np.ndarray + 1D array of shape (n_lut_dc,) with range axis of the dark-count LUT + in km. + darkcounts_profile : np.ndarray + 1D array of shape (n_lut_dc,) with dark-count correction profile in + counts/us. + overlap_range : np.ndarray + 1D array of shape (n_lut_ol,) with range axis of overlap LUT in km. + Must be monotonically increasing. + overlap_factors : np.ndarray + 1D array of shape (n_lut_ol,) with overlap correction factors in [0,1]. + calibration_constant : float, optional + Instrument calibration constant C. Use 1.0 for relative NRB. + Default is 1.0. + + Returns + ------- + nrb : np.ndarray + 2D array of shape (n_profiles, n_range_bins) with NRB values in + counts/us * km^2. Bins with zero overlap are NaN. + """ + # Step 1 — deadtime correction + signal = apply_deadtime_correction( + raw_signal, + deadtime_correction_counts, + deadtime_correction_factors, + ) + + # Step 2 — afterpulse correction + signal = apply_afterpulse_correction( + signal, + range_km, + afterpulse_range, + afterpulse_profile, + darkcounts_range, + darkcounts_profile, + ) + + # Step 3 — background subtraction + signal = apply_background_subtraction(signal, background) + + # Step 4 — range-squared correction + signal = apply_range_correction(signal, range_km) + + # Step 5 — overlap correction + signal = apply_overlap_correction( + signal, + overlap_range, + overlap_factors, + range_km, + ) + + # Step 6 — calibration scaling + return signal / calibration_constant + +def compute_nrb_dataset( + ds: "xr.Dataset", + instrument_type: str = None, + *, + variables: dict = None, + corrections: dict = None, + calibration_constant: float = 1.0, + cross_pol: bool = True, + config_dir: str = "./configs", +) -> "xr.Dataset": + """ + Compute co-pol NRB and, optionally, cross-pol NRB and linear depolarization + ratio (LDR) from an elastic lidar xarray Dataset. + + Field names can be provided in two mutually exclusive ways: + + 1. **Config file** (recommended): pass ``instrument_type`` and the function + loads ``/.json`` via + :func:`prof_init.load_config`. + + 2. **Manual dicts**: pass ``variables`` and ``corrections`` directly, + using the same key structure as the JSON file. + + The LDR is defined as: + + LDR = NRB_cross / (NRB_co + NRB_cross) + + Parameters + ---------- + ds : xr.Dataset + Dataset loaded from a lidar NetCDF file. + instrument_type : str, optional + Instrument identifier used to locate the JSON config file, e.g. + ``"minimpl"`` or ``"mpl"``. + Mutually exclusive with ``variables`` and ``corrections``. + variables : dict, optional + Manual mapping of standard variable names to file field names. + corrections : dict, optional + Manual mapping of standard correction keys to file field names + (flat strings). + calibration_constant : float, optional + Instrument calibration constant. Default is 1.0. + cross_pol : bool, optional + Whether cross-pol data are present. If True (default), ``nrb_cross`` + and ``ldr`` are computed and included in the output. If False, only + ``nrb_co`` is returned. + config_dir : str, optional + Directory containing the JSON configuration files. Only used when + ``instrument_type`` is provided. Default is ``"./configs"``. + + Returns + ------- + xr.Dataset + Dataset with variable ``nrb_co`` and, when ``cross_pol=True``, + also ``nrb_cross`` and ``ldr``. + """ + # Resolve field-name mappings + if instrument_type is not None and (variables is not None or corrections is not None): + raise ValueError( + "Provide either 'instrument_type' or 'variables'/'corrections', not both." + ) + if instrument_type is not None: + _cfg = load_config(instrument_type, config_dir) + v = _cfg["variables"] + c = _cfg["corrections"] + elif variables is not None and corrections is not None: + v = variables + c = corrections + else: + raise ValueError( + "Provide either 'instrument_type' or both 'variables' and 'corrections'." + ) + + def _name(entry): + """Accept a plain field-name string or a {name_in_file, ...} dict.""" + return entry if isinstance(entry, str) else entry["name_in_file"] + + def _units(entry): + """Return units string, or empty string if not specified.""" + return "" if isinstance(entry, str) else entry.get("units", "") + + # Extract all arrays from the dataset upfront + range_km = ds[_name(v["range"])].values + raw_co = ds[_name(v["raw_signal_co_pol"])].values + background_co = ds[_name(v["background_co_pol"])].values + dt_counts = ds[c["deadtime_counts"]].values + dt_factors = ds[c["deadtime_factors"]].values + ap_range = ds[c["afterpulse_range"]].values + ap_profile_co = ds[c["afterpulse_profile_co_pol"]].values + dc_range = ds[c["darkcounts_range"]].values + dc_profile_co = ds[c["darkcounts_profile_co_pol"]].values + overlap_range = ds[c["overlap_range"]].values + overlap_factors = ds[c["overlap_factors"]].values + + nrb_co = compute_nrb( + raw_signal = raw_co, + range_km = range_km, + background = background_co, + deadtime_correction_counts = dt_counts, + deadtime_correction_factors = dt_factors, + afterpulse_range = ap_range, + afterpulse_profile = ap_profile_co, + darkcounts_range = dc_range, + darkcounts_profile = dc_profile_co, + overlap_range = overlap_range, + overlap_factors = overlap_factors, + calibration_constant = calibration_constant, + ) + + dims = ("time", "range_bins") + data_vars = { + _name(v["attenuated_backscatter"]): ( + dims, nrb_co, + {"units": _units(v["attenuated_backscatter"]), "long_name": "NRB co-pol"}, + ), + } + + if cross_pol: + raw_cross = ds[_name(v["raw_signal_cross_pol"])].values + background_cross = ds[_name(v["background_cross_pol"])].values + ap_profile_cross = ds[c["afterpulse_profile_cross_pol"]].values + dc_profile_cross = ds[c["darkcounts_profile_cross_pol"]].values + + nrb_cross = compute_nrb( + raw_signal = raw_cross, + range_km = range_km, + background = background_cross, + deadtime_correction_counts = dt_counts, + deadtime_correction_factors = dt_factors, + afterpulse_range = ap_range, + afterpulse_profile = ap_profile_cross, + darkcounts_range = dc_range, + darkcounts_profile = dc_profile_cross, + overlap_range = overlap_range, + overlap_factors = overlap_factors, + calibration_constant = calibration_constant, + ) + ldr = nrb_cross / (nrb_co + nrb_cross) + data_vars[_name(v["attenuated_backscatter_cross_pol"])] = ( + dims, nrb_cross, + {"units": _units(v["attenuated_backscatter_cross_pol"]), "long_name": "NRB cross-pol"}, + ) + data_vars[_name(v["linear_depol_ratio"])] = ( + dims, ldr, + {"units": _units(v["linear_depol_ratio"]), + "long_name": "Linear depolarization ratio"}, + ) + + return xr.Dataset( + data_vars, + coords={ + "time": ds[_name(v["time"])], + "range_bins": range_km, + }, + ) + + +# ----------------------------------------------------------------------- +# Demo +# ----------------------------------------------------------------------- +if __name__ == "__main__": + import matplotlib.pyplot as plt + import matplotlib.dates as mdates + + # ------------------------------------------------------------------ + # Load data and compute NRB dataset + # ------------------------------------------------------------------ + FILE = "/data/archive/sgp/sgpminimplC1.b1/sgpminimplC1.b1.20260214.000009.nc" + ds = xr.open_dataset(FILE) + result = compute_nrb_dataset(ds, instrument_type="minimpl", config_dir="./configs") + + has_cross = "ldr" in result + max_range_km = 10.0 # maximum y-axis range (km) + + # Build plot dataset — keep only up to max_range_km + range_sel = result.range_bins <= max_range_km + plot_ds = result.sel(range_bins=range_sel) + + # Add log10-transformed fields (raw signal and NRB co); LDR stays linear + range_mask = ds["range"].values <= max_range_km + raw_vals = ds["signal_return_co_pol"].values[:, range_mask] + plot_ds["raw"] = xr.DataArray( + np.log10(np.where(raw_vals > 0, raw_vals, np.nan)), + dims=["time", "range_bins"], + coords={"time": plot_ds.time, "range_bins": plot_ds.range_bins}, + ) + plot_ds["log_nrb_co"] = np.log10(plot_ds["nrb_co"].where(plot_ds["nrb_co"] > 0)) + + # (var, colorbar label, colormap, profile colour, title, fixed vmin, fixed vmax) + # None means derive vmin/vmax from 5th/95th percentile + rows = [ + ("raw", "log₁₀ counts/µs", "jet", "steelblue", "Raw Signal", None, None), + ("log_nrb_co", "log₁₀ counts/µs · km²", "jet", "darkorange", "NRB (co-pol)", None, None), + ] + if has_cross: + rows.append(("ldr", "LDR", "jet", "firebrick", "LDR", 0.0, 0.6)) + + n_rows = len(rows) + fig, axes = plt.subplots(n_rows, 1, figsize=(18, 5 * n_rows), + constrained_layout=True) + if n_rows == 1: + axes = [axes] + + for ax_c, (var, cb_label, cmap, _, title, fixed_vmin, fixed_vmax) in zip(axes, rows): + da = plot_ds[var] + vmin = fixed_vmin if fixed_vmin is not None else float(da.quantile(0.01)) + vmax = fixed_vmax if fixed_vmax is not None else float(da.quantile(0.99)) + + da.plot.pcolormesh( + ax=ax_c, x="time", y="range_bins", + cmap=cmap, vmin=vmin, vmax=vmax, + cbar_kwargs={"label": cb_label}, + ) + ax_c.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M")) + ax_c.set_ylim(float(plot_ds.range_bins[0]), max_range_km) + ax_c.set_title(f"{title} Curtain") + ax_c.set_xlabel("Time (UTC)") + ax_c.set_ylabel("Range (km)") + + fig.autofmt_xdate(rotation=30) + plt.savefig("nrb_full_pipeline.png", dpi=150) + plt.show() \ No newline at end of file diff --git a/profiles_eval/real_prof_init.py b/profiles_eval/real_prof_init.py new file mode 100644 index 0000000..e533b91 --- /dev/null +++ b/profiles_eval/real_prof_init.py @@ -0,0 +1,154 @@ +""" +=============================================================== +Israel Silber +=============================================================== +Lidar profile intercomparison initialization module +=============================================================== +""" + +import json +from pathlib import Path +from typing import Dict + + +def load_config(instrument_type: str, config_dir: str = "./configs") -> Dict: + """ + Load configuration for a specific instrument type. + + Parameters + ---------- + instrument_type : str + The instrument type (e.g., 'vaisala_ceilometer', 'hsrl', etc.) + config_dir : str, optional + Directory containing the JSON configuration files, by default "./configs" + + Returns + ------- + Dict + Configuration dictionary loaded from the JSON file + """ + config_file = Path(config_dir) / f"{instrument_type}.json" + + if not config_file.exists(): + raise FileNotFoundError(f"Configuration file not found: {config_file}") + + try: + with open(config_file, 'r') as f: + config = json.load(f) + return config + + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in config file {config_file}: {e}") + + +def get_variable_mapping(instrument_type: str, config_dir: str = "./configs") -> Dict[str, dict]: + """ + Get variable mapping for an instrument type. + + Parameters + ---------- + instrument_type : str + The instrument type + config_dir : str, optional + Directory containing the JSON configuration files, by default "./configs" + + Returns + ------- + Dict[str, dict] + Dictionary mapping standard names to dicts with ``name_in_file`` and + ``units`` keys. + """ + config = load_config(instrument_type, config_dir) + return config["variables"] + + +def get_corrections_mapping(instrument_type: str, config_dir: str = "./configs") -> Dict[str, str]: + """ + Get the corrections field-name mapping for an instrument type. + + Parameters + ---------- + instrument_type : str + The instrument type (must have a ``"corrections"`` section in its + JSON config, e.g. ``"minimpl"``). + config_dir : str, optional + Directory containing the JSON configuration files, by default + ``"./configs"``. + + Returns + ------- + Dict[str, str] + Dictionary mapping standard correction keys to instrument-specific + NetCDF field names. + + Raises + ------ + KeyError + If the config file has no ``"corrections"`` section. + """ + config = load_config(instrument_type, config_dir) + if "corrections" not in config: + raise KeyError( + f"No 'corrections' section found in config for '{instrument_type}'." + ) + return config["corrections"] + + +def find_instrument_type( + source_datastream: str, + site: str, + facility: str, + config_dir: str = "./configs", +) -> str | None: + """ + Return the instrument_type key whose config matches *source_datastream*. + + Parses ``{instrument_class}`` and ``{level}`` from *source_datastream* + (formatted as ``{site}{instrument_class}{facility}.{level}``) and scans + every JSON config in *config_dir* for a matching ``instrument_class`` / + ``level`` pair. Returns ``None`` if no match is found. + + Parameters + ---------- + source_datastream : str + ARM datastream string, e.g. ``"sgpceil10mC1.b1"``. + site : str + ARM site code, e.g. ``"sgp"``. + facility : str + ARM facility code, e.g. ``"C1"``. + config_dir : str, optional + Directory containing JSON configuration files. Default is + ``"./configs"``. + + Returns + ------- + str or None + The instrument_type string (JSON filename stem), or ``None`` if no + matching config is found. + """ + stem = source_datastream + if stem.startswith(site): + stem = stem[len(site):] + parts = stem.split(".") + if len(parts) < 2: + return None + level = parts[1] # e.g. "b1" + class_fac = parts[0] # e.g. "ceil10mC1" + if class_fac.endswith(facility): + instrument_class = class_fac[: -len(facility)] # e.g. "ceil10m" + else: + return None + + for cfg_path in sorted(Path(config_dir).glob("*.json")): + try: + with open(cfg_path) as fh: + cfg = json.load(fh) + info = cfg.get("instrument_info", {}) + if ( + info.get("instrument_class") == instrument_class + and info.get("level") == level + ): + return cfg_path.stem + except Exception: + continue + return None \ No newline at end of file diff --git a/profiles_eval/real_prof_io.py b/profiles_eval/real_prof_io.py new file mode 100644 index 0000000..bd6f03d --- /dev/null +++ b/profiles_eval/real_prof_io.py @@ -0,0 +1,902 @@ +""" +=============================================================== +Israel Silber +=============================================================== +Lidar profile intercomparison real-data I/O module +=============================================================== +""" +import warnings +from datetime import datetime, timezone +from pathlib import Path +import numpy as np +import xarray as xr + +from real_prof_init import load_config +from real_prof_utils import interpolate_data +from real_lidar_corrections import compute_nrb_dataset + +# Number of cloud/precipitation layers used throughout the pipeline +N_LAYERS = 10 + +# Canonical variable ordering: variables are grouped by keyword presence in +# the order listed below. qc_ variants are inserted after their parent. +_PRIORITY_VARS = [ + "particulate_backscatter", + "extinction", + "attenuated_backscatter", + "depol", + "cloud_base", + "cloud_top" +] + + +def _ordered_vars(var_names: list[str]) -> list[str]: + """Return *var_names* in canonical order with qc_ vars after their parent. + + Variables whose names contain a keyword from ``_PRIORITY_VARS`` are placed + first (in keyword order), each immediately followed by its ``qc_`` twin. + Remaining variables follow in their original relative order, likewise each + followed by their ``qc_`` twin. Orphaned ``qc_`` vars (whose parent was + not present) appear last. + """ + remaining = list(var_names) + ordered: list[str] = [] + + def _pop(name: str) -> bool: + if name in remaining: + remaining.remove(name) + ordered.append(name) + return True + return False + + # Keyword groups: collect non-qc vars whose name contains the keyword, + # preserving their relative order; each is followed by its qc_ twin. + for keyword in _PRIORITY_VARS: + group = [v for v in remaining if keyword in v and not v.startswith("qc_")] + for base in group: + _pop(base) + _pop(f"qc_{base}") + + # Remaining non-qc variables (in their original relative order), + # each followed by their qc_ twin + for var in [v for v in remaining if not v.startswith("qc_")]: + _pop(var) + _pop(f"qc_{var}") + + # Any orphaned qc_ vars whose parent was not present + for var in list(remaining): + _pop(var) + + return ordered + + +def find_arm_files( + instrument_type: str, + site: str, + facility: str, + data_path: str | Path | None, + time_min: np.datetime64, + time_max: np.datetime64, + safety_delta: np.timedelta64 = np.timedelta64(5, "m"), + config_dir: str = "./configs", + data_path_template: str | None = None, +) -> list[Path]: + """ + Find ARM instrument data files within a given time range. + + Scans ``data_path`` for files whose name matches the ARM convention + ``{site}{instrument_class}{facility}.{level}.YYYYMMDD.HHMMSS.*`` and + whose start timestamp falls within + ``[time_min - safety_delta, time_max + safety_delta]`` plus the single + file whose timestamp is the largest one strictly before + ``t_search_min``. + + The ``instrument_class`` and ``level`` are read from the instrument's + JSON config file. + + Parameters + ---------- + instrument_type : str + Instrument identifier matching a JSON config file, e.g. + ``"minimpl"`` or ``"ceil"``. + site : str + ARM site code, e.g. ``"sgp"``. + facility : str + ARM facility code, e.g. ``"C1"``. + data_path : str, Path, or None + Directory to search for data files. Mutually exclusive with + ``data_path_template``; pass ``None`` when using the template. + time_min : np.datetime64 + Start of the desired time range. + time_max : np.datetime64 + End of the desired time range. + safety_delta : np.timedelta64, optional + Widening margin applied symmetrically around ``[time_min, time_max]`` + when searching. Default is 5 minutes. + config_dir : str, optional + Directory containing JSON configuration files. Default is + ``"./configs"``. + data_path_template : str, optional + Format string for the ARM nested archive layout, e.g. + ``"/data/archive/{site}/{site}{instrument_class}{facility}.{level}"``. + Used only when ``data_path`` is ``None``. + + Returns + ------- + list[Path] + Sorted list of matching file paths. + """ + cfg = load_config(instrument_type, config_dir) + info = cfg["instrument_info"] + instrument_class = info["instrument_class"] + level = info["level"] + + if data_path is None: + if data_path_template is None: + raise ValueError( + "Either 'data_path' or 'data_path_template' must be provided." + ) + data_path = Path( + data_path_template.format( + site=site, + instrument_class=instrument_class, + facility=facility, + level=level, + ) + ) + else: + data_path = Path(data_path) + stem_prefix = f"{site}{instrument_class}{facility}.{level}." + + t_search_min = time_min - safety_delta + t_search_max = time_max + safety_delta + + # Two-pass scan: + # Pass 1 – collect all candidate files and their parsed timestamps. + # Pass 2 – keep files within [t_search_min, t_search_max] plus the single + # file whose timestamp is the largest one strictly before + # t_search_min (the "covering" file — e.g. a daily file that + # started at 00:00 but contains data through the whole day). + candidates: list[tuple[np.datetime64, Path]] = [] + for fp in sorted(data_path.iterdir()): + if not fp.name.startswith(stem_prefix): + continue + parts = fp.stem.split(".") + if len(parts) < 4: + continue + try: + file_dt = np.datetime64( + datetime.strptime(f"{parts[2]}{parts[3]}", "%Y%m%d%H%M%S"), "s" + ) + except ValueError: + continue + candidates.append((file_dt, fp)) + + selected: list[Path] = [] + covering: tuple[np.datetime64, Path] | None = None # latest file before window + for file_dt, fp in candidates: + if t_search_min <= file_dt <= t_search_max: + selected.append(fp) + elif file_dt < t_search_min: + if covering is None or file_dt > covering[0]: + covering = (file_dt, fp) + + if covering is not None: + selected.insert(0, covering[1]) + + if not selected: + raise FileNotFoundError( + f"No '{instrument_type}' files found in '{data_path}' for the period " + f"{time_min} to {time_max} (safety_delta={safety_delta})." + ) + + return selected + + +def load_arm_data( + instrument_type: str, + site: str, + facility: str, + data_path: str | Path | None, + time_min: np.datetime64, + time_max: np.datetime64, + safety_delta: np.timedelta64 = np.timedelta64(5, "m"), + config_dir: str = "./configs", + data_path_template: str | None = None, +) -> xr.Dataset: + """ + Load ARM instrument data files into a single xarray Dataset. + + Delegates file discovery to :func:`find_arm_files`, then opens and + concatenates the matching files along the time dimension. Only + variables (and correction fields, if defined) listed in the instrument's + JSON config are retained. + + The returned dataset is trimmed so that: + + * The earliest sample is at or after ``time_min - safety_delta``. + * The latest sample is the **first sample beyond** ``time_max`` + (inclusive), giving one look-ahead point for downstream interpolation + or boundary handling. If no such sample exists the dataset ends at + the last available sample within the window. + + Parameters + ---------- + instrument_type : str + Instrument identifier matching a JSON config file, e.g. + ``"minimpl"`` or ``"ceil"``. + site : str + ARM site code. + facility : str + ARM facility code. + data_path : str, Path, or None + Directory containing the instrument data files. Pass ``None`` + when ``data_path_template`` is used instead. + time_min : np.datetime64 + Start of the desired time range (inclusive). + time_max : np.datetime64 + End of the desired time range. One sample beyond this time will + be included in the output if available. + safety_delta : np.timedelta64, optional + Look-back/look-ahead margin used both for file discovery and for + trimming the output. Default is 5 minutes. + config_dir : str, optional + Directory containing JSON configuration files. Default is + ``"./configs"``. + data_path_template : str, optional + Format string for the ARM nested archive layout (see + :func:`find_arm_files` for placeholder details). Used only when + ``data_path`` is ``None``. + + Returns + ------- + xr.Dataset + Merged dataset containing only the fields defined in the config, + trimmed to the requested time window with one sample of look-ahead + beyond ``time_max``. + """ + # ------------------------------------------------------------------ + # 1. Load config and resolve the set of NetCDF field names to keep + # ------------------------------------------------------------------ + cfg = load_config(instrument_type, config_dir) + info = cfg["instrument_info"] + + v = cfg["variables"] + # nif_to_key: name_in_file -> canonical JSON key (for renaming after load) + nif_to_key: dict[str, str] = { + (entry["name_in_file"] if isinstance(entry, dict) else entry): key + for key, entry in v.items() + } + field_names: set[str] = set(nif_to_key.keys()) + if "corrections" in cfg: + field_names |= set(cfg["corrections"].values()) + + # ------------------------------------------------------------------ + # 2. Discover files + # ------------------------------------------------------------------ + files = find_arm_files( + instrument_type, site, facility, data_path, + time_min, time_max, safety_delta, config_dir, + data_path_template=data_path_template, + ) + + # ------------------------------------------------------------------ + # 3. Open and concatenate, keeping only config-defined fields + # ------------------------------------------------------------------ + # Open the first file to grab global provenance attrs AND per-variable + # attrs. While xarray preserves variable attrs via open_mfdataset, we + # collect them explicitly here so they can be applied unconditionally + # in step 5, regardless of any merging edge-cases. + with xr.open_dataset(str(files[0])) as _ds0: + source_datastream = _ds0.attrs.get("datastream", "") + source_process_version = _ds0.attrs.get("process_version", "") + file_var_attrs: dict[str, dict] = { + fv: dict(_ds0[fv].attrs) + for fv in _ds0.data_vars + if fv in field_names + } + + def _preprocess(ds: xr.Dataset) -> xr.Dataset: + keep = [f for f in field_names if f in ds] + return ds[keep] + + merged = xr.open_mfdataset( + [str(fp) for fp in files], + combine="nested", + concat_dim="time", + # "minimal": only concatenate variables that have a time dimension; + # static variables (correction LUTs, range) come from the first file. + data_vars="minimal", + coords="minimal", + preprocess=_preprocess, + engine="netcdf4", + ) + + merged = merged.sortby("time") + + # ------------------------------------------------------------------ + # 4. Trim to [time_min - safety_delta, first sample > time_max] + # ------------------------------------------------------------------ + t_arr = merged.time.values + t_search_min_ns = (time_min - safety_delta).astype("datetime64[ns]") + time_max_ns = time_max.astype("datetime64[ns]") + + mask_start = t_arr >= t_search_min_ns + after_end = t_arr[t_arr > time_max_ns] + mask_end = t_arr <= (after_end[0] if len(after_end) > 0 else time_max_ns) + + ds = merged.isel(time=mask_start & mask_end) + + if ds.sizes.get("time", 0) == 0: + raise FileNotFoundError( + f"No '{instrument_type}' data in '{data_path}' falls within the period " + f"{time_min} to {time_max} (safety_delta={safety_delta})." + ) + + # ------------------------------------------------------------------ + # 5. Attach field attributes to every data variable + # ------------------------------------------------------------------ + # Apply original file variable attrs first (fills in long_name, units, + # valid_min/max, missing_value, etc.), then overwrite / add the three + # provenance attrs. Using setdefault for file attrs preserves any attrs + # already set by xarray (they will be identical in practice). + for var in ds.data_vars: + for k, val in file_var_attrs.get(str(var), {}).items(): + ds[var].attrs.setdefault(k, val) + ds[var].attrs["source_fieldname"] = str(var) + ds[var].attrs["source_datastream"] = source_datastream + ds[var].attrs["source_process_version"] = source_process_version + + return ds + + +# --------------------------------------------------------------------------- +# High-level loader + processor +# --------------------------------------------------------------------------- + +def load_and_process_arm_data( + instrument_type: str | list[str], + site: str, + facility: str, + data_path: str | Path | None, + time_min: np.datetime64, + time_max: np.datetime64, + safety_delta: np.timedelta64 = np.timedelta64(5, "m"), + config_dir: str = "./configs", + interpolate: bool = True, + range_km: "np.ndarray | None" = None, + time_step: np.timedelta64 = np.timedelta64(15, "s"), + data_path_template: str | None = None, +) -> xr.Dataset: + """ + Load, optionally correct, and optionally interpolate ARM data. + + This function chains :func:`load_arm_data`, optional NRB corrections + (when ``instrument_info.requires_corrections`` is ``true`` and + ``instrument_info.type`` is ``"lidar"``), and optional uniform-grid + interpolation via :func:`real_prof_utils.interpolate_data`. + + If the instrument config contains an ``instrument_class_highres`` key, + a second higher-resolution dataset is loaded from the same directory. + After interpolation, the high-resolution product fills the range bins + up to its maximum available range; the standard-resolution product + fills the remaining (higher) range bins. + + Cloud-base and cloud-top fields numbered with a suffix (``cloud_base``, + ``cloud_base2``, ``cloud_base3``, …) are concatenated along a ``layer`` + dimension of size 10 (first layer = ``cloud_base``, subsequent layers + from numbered fields; remaining layers are filled with NaN). + + Parameters + ---------- + instrument_type : str or list of str + Instrument identifier (class) matching a JSON config file, e.g. + ``"ceil"`` or ``["ceil", "hsrl", "minimpl"]``. When a list is + provided, each instrument is processed independently and the + resulting datasets are merged; every data variable is prefixed + with ``"_"`` to avoid name collisions. + site : str + ARM site code + facility : str + ARM facility code + data_path : str, Path, or None + Directory containing the instrument data files. Pass ``None`` + when ``data_path_template`` is used instead. + time_min : np.datetime64 + Start of the desired output time grid. + time_max : np.datetime64 + End of the desired output time grid. + safety_delta : np.timedelta64, optional + Lookback/ahead margin for file discovery. Default is 5 minutes. + config_dir : str, optional + Directory containing JSON configuration files. Default is + ``"./configs"``. + interpolate : bool, optional + Whether to interpolate onto a uniform grid after loading and + correcting. Default is ``True``. + range_km : np.ndarray, optional + Output range grid in km. Passed through to + :func:`real_prof_utils.interpolate_data`. If ``None`` the default + 0-20 km / 15 m grid is used. + time_step : np.timedelta64, optional + Output time step. Default is 15 s. + data_path_template : str, optional + Format string for the ARM nested archive layout (see + :func:`find_arm_files`). When provided, ``data_path`` may be + ``None`` and the concrete path is derived per-instrument. This is + especially convenient when ``instrument_type`` is a list. + + Returns + ------- + xr.Dataset + Processed (and optionally interpolated) dataset. When multiple + instruments are requested (list), variables from each are prefixed with + ``"_"`` and the datasets are merged. + """ + # ------------------------------------------------------------------ + # Multi-instrument calls: recurse per instrument, prefix, merge + # ------------------------------------------------------------------ + if isinstance(instrument_type, list): + datasets = [] + for instr in instrument_type: + print(f"Loading '{instr}' ...") + try: + ds_instr = load_and_process_arm_data( + instr, site, facility, data_path, + time_min, time_max, safety_delta, config_dir, + interpolate, range_km, time_step, + data_path_template=data_path_template, + ) + except FileNotFoundError as exc: + warnings.warn(f"[{instr}] skipped — {exc}", stacklevel=2) + continue + rename_map = {var: f"{instr}_{var}" for var in ds_instr.data_vars} + # Rename any dimension that is not a shared axis (time, range, layer) + # to avoid conflicts when instruments have different sizes for + # the same dimension name. + for dim in ds_instr.dims: + if dim not in ("time", "range", "layer"): + rename_map[dim] = f"{instr}_{dim}" + ds_instr = ds_instr.rename(rename_map) + + # Pad or trim 'layer' dimension to N_LAYERS so all instruments + # share a single compatible layer coordinate when merged. + if "layer" in ds_instr.dims and ds_instr.sizes["layer"] != N_LAYERS: + cur = ds_instr.sizes["layer"] + layer_vars = [v for v in ds_instr.data_vars if "layer" in ds_instr[v].dims] + new_das = {} + for lv in layer_vars: + da = ds_instr[lv] + arr = da.values + ax = list(da.dims).index("layer") + if cur < N_LAYERS: + pad_shape = list(arr.shape) + pad_shape[ax] = N_LAYERS - cur + arr = np.concatenate([arr, np.full(pad_shape, np.nan)], axis=ax) + else: + idx = [slice(None)] * arr.ndim + idx[ax] = slice(None, N_LAYERS) + arr = arr[tuple(idx)] + new_coords = {d: da.coords[d] for d in da.dims if d in da.coords and d != "layer"} + new_coords["layer"] = np.arange(N_LAYERS) + new_das[lv] = xr.DataArray(arr, dims=da.dims, coords=new_coords, attrs=da.attrs) + # Drop all layer-dim vars + the stale layer coord, then re-add + drop = layer_vars + (["layer"] if "layer" in ds_instr.coords else []) + ds_instr = ds_instr.drop_vars(drop) + for lv, new_da in new_das.items(): + ds_instr = ds_instr.assign({lv: new_da}) + + datasets.append(ds_instr) + if not datasets: + warnings.warn( + f"No data found for any of the requested instruments " + f"for the period {time_min} to {time_max}.", + stacklevel=2, + ) + return xr.Dataset() + return xr.merge(datasets) + + cfg = load_config(instrument_type, config_dir) + info = cfg["instrument_info"] + is_lidar = info.get("type", "lidar") == "lidar" + needs_corrections = is_lidar and info.get("requires_corrections", False) + + # key_to_nif: canonical JSON key -> name_in_file (used by step 4) + _cfg_vars = cfg["variables"] + key_to_nif: dict[str, str] = { + k: (e["name_in_file"] if isinstance(e, dict) else e) + for k, e in _cfg_vars.items() + } + + # ------------------------------------------------------------------ + # 1. Load primary dataset + # ------------------------------------------------------------------ + ds = load_arm_data( + instrument_type, site, facility, data_path, + time_min, time_max, safety_delta, config_dir, + data_path_template=data_path_template, + ) + + # Derive range units from the field attribute so that new + # instruments are handled correctly without touching the config. + range_nif = key_to_nif.get("range", "range") + _range_da = ds.get(range_nif, ds.coords.get(range_nif)) + range_units = ( + _range_da.attrs.get("units", "m") if _range_da is not None else "m" + ) + + # ------------------------------------------------------------------ + # 2. Load high-resolution dataset if specified + # ------------------------------------------------------------------ + highres_class = info.get("instrument_class_highres") + ds_highres = None + if highres_class is not None: + # Build a temporary config-like instrument_type by scanning for a + # matching config file; if not found, attempt to load by class name + try: + ds_highres = load_arm_data( + highres_class, site, facility, data_path, + time_min, time_max, safety_delta, config_dir, + data_path_template=data_path_template, + ) + except FileNotFoundError: + ds_highres = None + + # ------------------------------------------------------------------ + # 3. Apply NRB corrections if required + # ------------------------------------------------------------------ + correction_field_names: list[str] = [] + if needs_corrections: + + c = cfg.get("corrections", {}) + correction_field_names = list(c.values()) + + # Preserve provenance attributes from raw fields that will be replaced + v = cfg["variables"] + nrb_co_nif = (v.get("attenuated_backscatter", {}).get("name_in_file", "") + if isinstance(v.get("attenuated_backscatter"), dict) + else v.get("attenuated_backscatter", "")) + nrb_x_nif = (v.get("attenuated_backscatter_cross_pol", {}).get("name_in_file", "") + if isinstance(v.get("attenuated_backscatter_cross_pol"), dict) + else v.get("attenuated_backscatter_cross_pol", "")) + ldr_nif = (v.get("linear_depol_ratio", {}).get("name_in_file", "") + if isinstance(v.get("linear_depol_ratio"), dict) + else v.get("linear_depol_ratio", "")) + + raw_co_nif = (v.get("raw_signal_co_pol", {}).get("name_in_file", "") + if isinstance(v.get("raw_signal_co_pol"), dict) + else v.get("raw_signal_co_pol", "")) + raw_x_nif = (v.get("raw_signal_cross_pol", {}).get("name_in_file", "") + if isinstance(v.get("raw_signal_cross_pol"), dict) + else v.get("raw_signal_cross_pol", "")) + + ds_corrected = compute_nrb_dataset( + ds, instrument_type=instrument_type, config_dir=config_dir + ) + + # Propagate source attributes; read from any variable in ds since + # load_arm_data stores them uniformly on every variable. + _ref_var = next(iter(ds.data_vars), None) + src_ds = ds[_ref_var].attrs.get("source_datastream", "") if _ref_var else "" + src_pv = ds[_ref_var].attrs.get("source_process_version", "") if _ref_var else "" + for nif, raw_fields in [ + (nrb_co_nif, raw_co_nif), + (nrb_x_nif, raw_x_nif), + (ldr_nif, f"{raw_co_nif},{raw_x_nif}"), + ]: + if nif and nif in ds_corrected: + ds_corrected[nif].attrs["source_fieldname"] = raw_fields + ds_corrected[nif].attrs["source_datastream"] = src_ds + ds_corrected[nif].attrs["source_process_version"] = src_pv + + # Remove raw input and correction lookup fields from output + raw_fields_to_drop = [ + raw_co_nif, raw_x_nif, + v.get("background_co_pol", {}).get("name_in_file", "") + if isinstance(v.get("background_co_pol"), dict) + else v.get("background_co_pol", ""), + v.get("background_cross_pol", {}).get("name_in_file", "") + if isinstance(v.get("background_cross_pol"), dict) + else v.get("background_cross_pol", ""), + ] + correction_field_names + drop = [f for f in raw_fields_to_drop if f and f in ds_corrected] + ds = ds_corrected.drop_vars(drop, errors="ignore") + + # ------------------------------------------------------------------ + # 4. Consolidate cloud_base / cloud_top fields into layer dimension + # ------------------------------------------------------------------ + for base_key in ("cloud_base", "cloud_top"): + # Resolve the actual variable names present in ds using nif names + def _nif(key: str) -> str: + return key_to_nif.get(key, key) + + base_nif = _nif(base_key) + if base_nif not in ds: + continue + + # Case A: base variable already has a layer dimension (e.g. ceilpol's + # cloud_base_heights is (time, layer)). Pad or trim to N_LAYERS. + if "layer" in ds[base_nif].dims: + da = ds[base_nif] + cur_layers = da.sizes["layer"] + if cur_layers != N_LAYERS: + arr = da.values # (time, cur_layers) + if cur_layers < N_LAYERS: + pad = np.full((ds.sizes["time"], N_LAYERS - cur_layers), np.nan) + arr = np.concatenate([arr, pad], axis=-1) + else: + arr = arr[:, :N_LAYERS] + da = xr.DataArray( + arr, + dims=["time", "layer"], + coords={"time": da.time, "layer": np.arange(N_LAYERS)}, + attrs=ds[base_nif].attrs, + ) + # Drop the old variable and its stale layer coordinate + # before assigning the resized DataArray; otherwise xarray + # refuses to align the old size-N index with the new one. + drop = [base_nif] + (["layer"] if "layer" in ds.coords else []) + ds = ds.drop_vars(drop) + ds[base_nif] = da + continue + + # Case B: separate numbered variables (first_cbh, second_cbh, …) + # that need to be stacked along a new layer dimension. + # Look up nif names for each numbered canonical key. + numbered_nifs = [base_nif] + [ + _nif(f"{base_key}{i}") for i in range(2, 20) + if _nif(f"{base_key}{i}") in ds + ] + present = [n for n in numbered_nifs if n in ds] + + time_vals = ds.time + layers = [] + for i in range(N_LAYERS): + if i < len(present): + layers.append(ds[present[i]].values) + else: + layers.append(np.full(ds.sizes["time"], np.nan)) + + stacked = np.stack(layers, axis=-1) # (time, 10) + merged_da = xr.DataArray( + stacked, + dims=["time", "layer"], + coords={"time": time_vals, "layer": np.arange(N_LAYERS)}, + attrs=ds[present[0]].attrs, + ) + # source_fieldname lists all input field names + merged_da.attrs["source_fieldname"] = ",".join( + ds[n].attrs.get("source_fieldname", n) for n in present + ) + ds = ds.drop_vars(present) + ds[base_nif] = merged_da + + # ------------------------------------------------------------------ + # 5. Interpolate onto uniform grid + # ------------------------------------------------------------------ + if interpolate: + # Rename range dimension to canonical 'range' if the file uses a + # different name (e.g. 'height' for interpolatedsonde); required by + # interpolate_data which always looks for a 'range' dimension. + range_nif = key_to_nif.get("range", "range") + if range_nif != "range" and range_nif in ds.dims: + ds = ds.rename({range_nif: "range"}) + # Identify range-only data variables (no time dimension) before + # interpolation; they will be absent from the interpolated output + # and are explicitly dropped so they do not linger. + range_only_vars = [ + var for var in ds.data_vars + if "time" not in ds[var].dims + ] + + ds = interpolate_data( + ds, + time_min=time_min, + time_max=time_max, + range_km=range_km, + time_step=time_step, + range_units=range_units, + ) + + # Drop any range-only variables that may have survived (interpolate_data + # already excludes them from out_vars, but be explicit for safety). + ds = ds.drop_vars([v for v in range_only_vars if v in ds], errors="ignore") + + # If high-resolution data available, blend: highres up to its max range, + # standard data above that + if ds_highres is not None: + ds_highres_interp = interpolate_data( + ds_highres, + time_min=time_min, + time_max=time_max, + range_km=range_km, + time_step=time_step, + range_units=range_units, + ) + highres_max_range = float( + ds_highres["range"].max() + / (1000.0 if range_units.lower() == "m" else 1.0) + ) + out_range = ds["range"].values if "range" in ds.coords else range_km + for var in ds.data_vars: + if var not in ds_highres_interp: + continue + if "range" not in ds[var].dims: + continue + blended = xr.where( + ds["range"] <= highres_max_range, + ds_highres_interp[var], + ds[var], + ) + blended.attrs = ds[var].attrs + # Update source attributes to reflect both streams + hr_src = ds_highres_interp[var].attrs.get("source_datastream", "") + blended.attrs["source_datastream"] = ( + f"{ds[var].attrs.get('source_datastream','')},{hr_src}".strip(",") + ) + hr_pv = ds_highres_interp[var].attrs.get("source_process_version", "") + blended.attrs["source_process_version"] = ( + f"{ds[var].attrs.get('source_process_version','')},{hr_pv}".strip(",") + ) + ds[var] = blended + + # ------------------------------------------------------------------ + # Final step: rename variables from name_in_file to canonical JSON key + # ------------------------------------------------------------------ + # Drop variables with dimensions outside the canonical set + # ------------------------------------------------------------------ + canonical_dims = {"time", "range", "layer"} + drop_extra = [ + var for var in ds.data_vars + if not set(ds[var].dims).issubset(canonical_dims) + ] + if drop_extra: + ds = ds.drop_vars(drop_extra) + # Drop any orphaned dimensions (and their coordinates) + orphan_dims = [d for d in ds.dims if d not in canonical_dims] + if orphan_dims: + ds = ds.drop_dims(orphan_dims) + + # ------------------------------------------------------------------ + cfg_final = load_config(instrument_type, config_dir) + v_final = cfg_final["variables"] + nif_to_key: dict[str, str] = { + (entry["name_in_file"] if isinstance(entry, dict) else entry): key + for key, entry in v_final.items() + } + rename_map = { + str(var): nif_to_key[str(var)] + for var in ds.data_vars + if str(var) in nif_to_key and nif_to_key[str(var)] != str(var) + } + if rename_map: + ds = ds.rename(rename_map) + + # ------------------------------------------------------------------ + # Unit conversions — applied after rename so canonical names are used + # ------------------------------------------------------------------ + # cloud_base / cloud_top: m -> km (only meaningful on the km range grid) + if interpolate: + for var in ds.data_vars: + if "cloud_base" in var or "cloud_top" in var: + if ds[var].attrs.get("units", "").lower() == "m": + attrs = dict(ds[var].attrs) + ds[var] = ds[var] / 1000.0 + attrs["units"] = "km" + ds[var].attrs = attrs + + # pressure: Pa or kPa -> hPa (applied unconditionally) + _PRES_SCALE = {"pa": 1e-2, "kpa": 10.0} + for var in ds.data_vars: + if "pres" in var: + u = ds[var].attrs.get("units", "") + scale = _PRES_SCALE.get(u.lower()) + if scale is not None: + attrs = dict(ds[var].attrs) + ds[var] = ds[var] * scale + attrs["units"] = "hPa" + ds[var].attrs = attrs + + # Attach site/facility/datastream to global attributes + info_final = cfg_final["instrument_info"] + instrument_class = info_final["instrument_class"] + level = info_final["level"] + ds.attrs["site"] = site + ds.attrs["facility"] = facility + ds.attrs["datastream"] = f"{site}{instrument_class}{facility}.{level}" + + # Order variables: backscatter → extinction → depol → other, qc_ after parent + ds = ds[_ordered_vars(list(ds.data_vars))] + + return ds + + +# --------------------------------------------------------------------------- +# Export +# --------------------------------------------------------------------------- + +def export_dataset( + ds: xr.Dataset, + site: str, + facility: str, + output_path: str | Path = "./", + level: str = "c0", + instrument_class: str = "realbundle", +) -> Path: + """ + Export a processed dataset to a NetCDF4-Classic file. + + The output filename follows the ARM convention:: + + {site}{instrument_class}{facility}.{level}.YYYYMMDD.HHMMSS.nc + + Parameters + ---------- + ds : xr.Dataset + Dataset to export. + site : str + ARM site code. + facility : str + ARM facility code. + output_path : str or Path, optional + Directory where the file will be written. Default is ``"./"``. + level : str, optional + Data level string used in the filename. Default is ``"c0"``. + instrument_class : str, optional + Instrument class string used in the filename. Default is + ``"realbundle"``. + + Returns + ------- + Path + Absolute path of the written file. + """ + + output_path = Path(output_path) + output_path.mkdir(parents=True, exist_ok=True) + + # Derive timestamp from first time sample + t0 = ds.time.values[0] + dt_str = str(t0)[:19].replace("-", "").replace("T", ".").replace(":", "") + # dt_str is now "YYYYMMDD.HHMMSS" + + filename = f"{site}{instrument_class}{facility}.{level}.{dt_str}.nc" + out_file = output_path / filename + + # Stamp creation time in UTC and set output-level global attributes + ds = ds.copy() + ds.attrs["creation_date"] = ( + datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + ) + ds.attrs["site"] = site + ds.attrs["facility"] = facility + ds.attrs["datastream"] = f"{site}{instrument_class}{facility}.{level}" + + # Rebuild the dataset so that dimension coordinates (time, range, layer) + # are written first. + _DIM_COORD_ORDER = ("time", "range", "layer") + ordered_coords = {k: ds.coords[k] for k in _DIM_COORD_ORDER if k in ds.coords} + other_coords = {k: ds.coords[k] for k in ds.coords if k not in ordered_coords} + ds_out = xr.Dataset(coords={**ordered_coords, **other_coords}, attrs=ds.attrs) + for v in ds.data_vars: + ds_out[v] = ds[v] + + # Materialise any dask-backed arrays before writing. open_mfdataset + # returns lazy arrays; driving them through to_netcdf via the dask + # threaded scheduler together with zlib compression can cause a deadlock. + # Loading into memory first avoids that entirely. + ds_out = ds_out.load() + + # Build per-variable encoding: zlib compression level 9 for all variables + encoding = { + var: {"zlib": True, "complevel": 9} + for var in ds_out.data_vars + } + + ds_out.to_netcdf( + str(out_file), + format="NETCDF4_CLASSIC", + encoding=encoding, + ) + + return out_file.resolve() diff --git a/profiles_eval/real_prof_main.py b/profiles_eval/real_prof_main.py new file mode 100644 index 0000000..80d02dc --- /dev/null +++ b/profiles_eval/real_prof_main.py @@ -0,0 +1,92 @@ +""" +=============================================================== +Israel Silber +=============================================================== +Wrapper / test script for the lidar profile intercomparison +(REAL) data pipeline. +Each instrument is loaded, corrected (if required), and +interpolated onto a common uniform time x range grid. +Instruments with missing data files are skipped with a warning. +=============================================================== +""" + +from pathlib import Path +import warnings + +import numpy as np + +from real_prof_io import load_and_process_arm_data, export_dataset + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +SITE = "sgp" +FACILITY = "C1" +CONFIG_DIR = str(Path(__file__).parent / "configs") + +TIME_MIN = np.datetime64("2026-03-10T20:00:00", "ns") +TIME_MAX = np.datetime64("2026-03-10T21:00:00", "ns") + +# ARM nested archive: /data/archive/{site}/{site}{instrument_class}{facility}.{level}/ +DATA_PATH_TEMPLATE = "/data/archive/{site}/{site}{instrument_class}{facility}.{level}" + +# All instruments for data loading (verify a corresponding JSON config exists) +ALL_INSTRUMENTS = [ + "ceil", + "ceilpol", + "dl", + "hsrl", + "minimpl", + "mpl", + "rl", + "interpolatedsonde", +] + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> "xr.Dataset": + + print("=" * 70) + print(f"Site: {SITE} Facility: {FACILITY}") + print(f"Period: {TIME_MIN} → {TIME_MAX}") + print(f"Path template: {DATA_PATH_TEMPLATE}") + print("=" * 70) + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + ds = load_and_process_arm_data( + instrument_type = ALL_INSTRUMENTS, + site = SITE, + facility = FACILITY, + data_path = None, + time_min = TIME_MIN, + time_max = TIME_MAX, + config_dir = CONFIG_DIR, + data_path_template = DATA_PATH_TEMPLATE, + interpolate = True, + ) + + skipped = [str(w.message) for w in caught] + if skipped: + print("\nSkipped instruments:") + for msg in skipped: + print(f" {msg}") + + print(f"\nMerged dataset: {dict(ds.sizes)} vars={len(ds.data_vars)}") + if "time" in ds.coords: + print(f" time: {str(ds.time.values[0])[:19]} → {str(ds.time.values[-1])[:19]}") + if "range" in ds.coords: + print(f" range: [{float(ds.coords['range'].min()):.3f}," + f" {float(ds.coords['range'].max()):.3f}] km") + + return ds + + +if __name__ == "__main__": + result = main() + if result.data_vars: + out = export_dataset(result, site=SITE, facility=FACILITY) + print(f"\nExported: {out}") diff --git a/profiles_eval/real_prof_plot.py b/profiles_eval/real_prof_plot.py new file mode 100644 index 0000000..47a848e --- /dev/null +++ b/profiles_eval/real_prof_plot.py @@ -0,0 +1,553 @@ +""" +=============================================================== +Israel Silber +=============================================================== +Lidar profile intercomparison plotting functions +=============================================================== +""" +from __future__ import annotations + +import warnings + +import matplotlib.colors as mcolors +import matplotlib.dates as mdates +import matplotlib.pyplot as plt +import numpy as np +import xarray as xr + + +from real_prof_init import find_instrument_type +from real_prof_io import load_arm_data + + +# --------------------------------------------------------------------------- +# Private helpers +# --------------------------------------------------------------------------- + +def _is_log_var(varname: str) -> bool: + """Return True if the variable should be rendered with a log colour scale.""" + return "backscatter" in varname or "extinction" in varname + + +def _safe_norm( + data: np.ndarray, + log: bool, + vmin: float | None = None, + vmax: float | None = None, + varname: str = "", +) -> mcolors.Normalize: + """ + Build a Normalize (or LogNorm) from *data*, ignoring non-finite values. + + For log-scale normalization only positive values are considered. + Falls back to linear normalization if no positive finite values are found. + """ + candidates = data[np.isfinite(data) & (data > 0)] if log else data[np.isfinite(data)] + if candidates.size == 0: + return mcolors.Normalize(vmin=0, vmax=1) + # Variable-specific hard-coded defaults + if "linear_depol_ratio" in varname: + _vmin = vmin if vmin is not None else 0.0 + _vmax = vmax if vmax is not None else 1.0 + else: + _vmin = vmin if vmin is not None else float(np.percentile(candidates, 1)) + _vmax = vmax if vmax is not None else float(np.percentile(candidates, 99)) + if log and _vmin > 0 and _vmax > _vmin: + return mcolors.LogNorm(vmin=_vmin, vmax=_vmax) + return mcolors.Normalize(vmin=_vmin, vmax=_vmax) + + +def _plot_curtain( + ax: plt.Axes, + da: xr.DataArray, + norm: mcolors.Normalize, + cmap: str = "viridis", + title: str = "", + ylabel: str = "Range (km)", +) -> plt.cm.ScalarMappable: + """Render a 2-D (time x spatial) curtain on *ax*. + + The spatial dimension may have any name (``"range"``, ``"height"``, etc.). + Returns the :class:`~matplotlib.collections.QuadMesh` artist. + """ + spatial_dims = [d for d in da.dims if d != "time"] + if not spatial_dims: + raise ValueError( + f"DataArray '{da.name}' has no spatial dimension besides 'time'." + ) + sdim = spatial_dims[0] + + if list(da.dims) != ["time", sdim]: + da = da.transpose("time", sdim) + + t = da.time.values + r = da[sdim].values if sdim in da.coords else np.arange(da.sizes[sdim]) + + mesh = ax.pcolormesh(t, r, da.values.T, norm=norm, cmap=cmap, shading="nearest") + ax.set_xlim(t[0], t[-1]) + ax.set_ylabel(ylabel) + ax.set_xlabel("Time (UTC)") + if title: + ax.set_title(title, fontsize=9) + ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M")) + plt.setp(ax.xaxis.get_majorticklabels(), rotation=30, ha="right") + return mesh + + +# --------------------------------------------------------------------------- +# Plotting functions +# --------------------------------------------------------------------------- + +def _cloud_layer_scatter( + ax: plt.Axes, + da: xr.DataArray, + color: str, +) -> None: + """Scatter-plot all layers of a ``(time, layer)`` cloud variable on *ax*. + + Assumes values are already in km (as produced by the interpolation + pipeline when ``interpolate=True``). NaN layers are skipped silently. + """ + t = da.time.values + n_layers = da.sizes["layer"] + for li in range(n_layers): + vals = da.isel(layer=li).values + valid = np.isfinite(vals) + if not valid.any(): + continue + ax.scatter(t[valid], vals[valid], s=10, c=color, marker=".", linewidths=0) + + +def plot_variable_curtain( + ds: xr.Dataset, + variable: str, + overlay_field: str | None = None, + overlay_levels: int | list = 8, + overlay_colors: str = "magenta", + overlay_cloud_layers: bool = True, + cloud_base_color: str = "k", + cloud_top_color: str = "lightgray", + reference_cloud_instrument: str | None = None, + cmap: str = "viridis", + shared_norm: bool = False, + ylim: tuple[float, float] | None = None, + fig_width: float = 10.0, + panel_height: float = 3.0, + **kwargs, +) -> tuple[plt.Figure, np.ndarray]: + """Plot curtain panels for every dataset variable whose name contains *variable*. + + One panel is created per matching variable (``qc_`` fields and variables + without a range dimension are excluded). When ``shared_norm=False`` + (default) all panels share the same color scale, enabling direct + inter-instrument comparison. Variables whose name contains + ``"backscatter"`` or ``"extinction"`` are rendered with a logarithmic + color scale. + + When ``overlay_cloud_layers=True`` (default), cloud base and cloud top + layer variables are overlaid as scatter points (marker size 3). By + default each panel uses the cloud layers from the same instrument as the + plotted variable. Set *reference_cloud_instrument* to a specific + instrument prefix (e.g. ``"ceil"``) to overlay that instrument's cloud + layers on **every** panel instead. + + Parameters + ---------- + ds : xr.Dataset + Processed multi-instrument dataset (variables prefixed with instrument + name, e.g. ``"hsrl_attenuated_backscatter"``). + variable : str + Substring to search for in variable names, e.g. + ``"attenuated_backscatter"``. + overlay_field : str, optional + Name of a dataset variable to overlay as iso-contours on every panel + (e.g. ``"interpolatedsonde_temp"``). + overlay_levels : int or list, optional + Number of contour levels or explicit level values. Default is 8. + overlay_colors : str, optional + Color for contour lines. Default is ``"k"`` (black). + overlay_cloud_layers : bool, optional + Overlay cloud base / cloud top scatter points when present. + Default is ``True``. + cloud_base_color : str, optional + Marker color for cloud base points. Default is ``"white"``. + cloud_top_color : str, optional + Marker color for cloud top points. Default is ``"lightgray"``. + reference_cloud_instrument : str, optional + When set, use this instrument's cloud layers on every panel instead + of matching each panel's own instrument. E.g. ``"ceil"`` will + scatter ``ceil_cloud_base`` / ``ceil_cloud_top`` on all panels. + Ignored when *overlay_cloud_layers* is ``False``. + cmap : str, optional + Colormap name. Default is ``"viridis"``. + shared_norm : bool, optional + Share color normalization across all panels. Default is ``False``. + ylim : tuple of float, optional + ``(ymin, ymax)`` range limits applied to every panel. When ``None`` + (default) matplotlib chooses the limits automatically. + fig_width : float, optional + Figure width in inches. Default is 10. + panel_height : float, optional + Height of each panel in inches. Default is 3. + **kwargs + Forwarded to :func:`matplotlib.pyplot.subplots`. + + Returns + ------- + fig : matplotlib.figure.Figure + axes : np.ndarray of matplotlib.axes.Axes + """ + _sep = f"_{variable}" + matches = [ + v for v in ds.data_vars + if v.endswith(_sep) + and "_" not in v[: len(v) - len(_sep)] + and not v.startswith("qc_") + and "range" in ds[v].dims + ] + if not matches: + raise ValueError( + f"No range-dimension variables containing '{variable}' found in the dataset." + ) + + log = _is_log_var(variable) + + norm_shared: mcolors.Normalize | None = None + if shared_norm: + all_data = np.concatenate([ds[v].values.ravel() for v in matches]) + norm_shared = _safe_norm(all_data, log, varname=variable) + + n = len(matches) + fig, _axes = plt.subplots( + n, 1, figsize=(fig_width, panel_height * n), squeeze=False, **kwargs + ) + axes: np.ndarray = _axes.ravel() + + # Resolve overlay DataArray once + overlay_da: xr.DataArray | None = None + if overlay_field is not None: + if overlay_field in ds: + overlay_da = ds[overlay_field] + else: + warnings.warn( + f"overlay_field '{overlay_field}' not found in dataset; skipping.", + stacklevel=2, + ) + + for ax, var in zip(axes, matches): + da = ds[var] + panel_norm = norm_shared if shared_norm else _safe_norm(da.values.ravel(), log, varname=var) + mesh = _plot_curtain(ax, da, norm=panel_norm, cmap=cmap, title=var) + plt.colorbar(mesh, ax=ax, pad=0.02, label=da.attrs.get("units", "")) + + if overlay_da is not None and "range" in overlay_da.dims: + ov = overlay_da + if list(ov.dims) != ["time", "range"]: + ov = ov.transpose("time", "range") + cs = ax.contour( + ov.time.values, + ov["range"].values, + ov.values.T, + levels=overlay_levels, + colors=overlay_colors, + linewidths=0.6, + ) + ax.clabel(cs, inline=True, fontsize=7, fmt="%.4g") + + # Overlay cloud base / cloud top scatter points + if overlay_cloud_layers: + # If a reference instrument is specified, use its cloud layers on + # every panel; otherwise derive the prefix from the plotted variable + # so only the matching instrument's layers are shown. + if reference_cloud_instrument is not None: + prefix = f"{reference_cloud_instrument}_" + else: + _sep = f"_{variable}" + prefix = var[: var.index(_sep) + 1] if _sep in var else "" + for cloud_key, color in ( + ("cloud_base", cloud_base_color), + ("cloud_top", cloud_top_color), + ): + # Prefer prefixed name, fall back to unprefixed + cloud_var = next( + (c for c in (f"{prefix}{cloud_key}", cloud_key) + if c in ds and "layer" in ds[c].dims), + None, + ) + if cloud_var is not None: + _cloud_layer_scatter(ax, ds[cloud_var], color=color) + + if ylim is not None: + ax.set_ylim(ylim) + + fig.tight_layout() + return fig, axes + + +def compare_variable_to_orig( + ds: xr.Dataset, + variable: str, + ds_orig: xr.Dataset | None = None, + data_path: "str | Path | None" = None, + data_path_template: str | None = None, + config_dir: str = "./configs", + match_scale: bool = True, + cmap: str = "viridis", + fig_width: float = 14.0, + panel_height: float = 4.0, + ylim: tuple[float, float] | None = None, + **kwargs, +) -> tuple[plt.Figure, np.ndarray]: + """Plot a processed/interpolated variable side-by-side with its original. + + The original field name is read from the ``source_fieldname`` attribute of + the processed variable. The original dataset can be supplied directly via + *ds_orig*, or left as ``None`` to let the function discover and load the + source data file automatically using the ``source_datastream`` attribute + together with *data_path* / *data_path_template*. + + Both panels share the same color scale when ``match_scale=True`` (default). + For NRB-corrected variables—where the source field is physically distinct + from the output (raw signal vs. normalized backscatter)—pass + ``match_scale=False`` to use independent color scales. + + Variables whose name contains ``"backscatter"`` or ``"extinction"`` are + rendered with a logarithmic color scale. + + Parameters + ---------- + ds : xr.Dataset + Processed (interpolated) dataset. + variable : str + Variable name as it appears in *ds*. + ds_orig : xr.Dataset or None, optional + Original dataset. When ``None``, the function attempts to load the + source file automatically using *data_path* or *data_path_template* + together with metadata stored in ``ds[variable].attrs``. + data_path : str or Path, optional + Directory containing the original instrument data files. Used only + when *ds_orig* is ``None``. + data_path_template : str, optional + ARM archive path template (see :func:`real_prof_io.find_arm_files`). + Used only when *ds_orig* is ``None`` and *data_path* is also ``None``. + config_dir : str, optional + Directory containing JSON configuration files. Default is + ``"./configs"``. + match_scale : bool, optional + Share the color scale between both panels. Pass ``False`` for + NRB-corrected variables where source and output are physically + different quantities. Default is ``True``. + cmap : str, optional + Colormap name. Default is ``"viridis"``. + fig_width : float, optional + Figure width in inches. Default is 14. + panel_height : float, optional + Panel height in inches. Default is 4. + ylim : tuple of float, optional + ``(ymin, ymax)`` y-axis limits applied to both panels. When ``None`` + (default) limits are derived from the finite data extent across both + panels. + **kwargs + Forwarded to :func:`matplotlib.pyplot.subplots`. + + Returns + ------- + fig : matplotlib.figure.Figure + axes : np.ndarray of matplotlib.axes.Axes (shape (2,)) + """ + if variable not in ds: + raise ValueError(f"Variable '{variable}' not found in ds.") + + da_proc = ds[variable] + orig_name = da_proc.attrs.get("source_fieldname", variable) + + # ------------------------------------------------------------------ + # Auto-load original dataset when ds_orig is not supplied + # ------------------------------------------------------------------ + if ds_orig is None: + site = ds.attrs.get("site", "") + facility = ds.attrs.get("facility", "") + source_ds = da_proc.attrs.get("source_datastream", "") + + if not source_ds: + raise ValueError( + f"Cannot auto-load original data: 'source_datastream' attribute " + f"is missing from ds['{variable}']. Pass ds_orig explicitly." + ) + if not site or not facility: + raise ValueError( + "Cannot auto-load original data: 'site' and 'facility' global " + "attributes are missing from ds. Pass ds_orig explicitly." + ) + + instrument_type = find_instrument_type( + source_ds, site, facility, config_dir + ) + if instrument_type is None: + raise ValueError( + f"No config in '{config_dir}' matches source_datastream " + f"'{source_ds}'. Pass ds_orig explicitly." + ) + + t_min = ds.time.values[0].astype("datetime64[ns]") + t_max = ds.time.values[-1].astype("datetime64[ns]") + + ds_orig = load_arm_data( + instrument_type, site, facility, data_path, + t_min, t_max, + config_dir=config_dir, + data_path_template=data_path_template, + ) + + if orig_name not in ds_orig: + raise ValueError( + f"Original field '{orig_name}' (source_fieldname of '{variable}') " + f"not found in ds_orig. Available: {list(ds_orig.data_vars)}" + ) + + da_orig = ds_orig[orig_name] + + # Convert original range coordinate to km so both panels share the same + # y-axis scale. Handles both a 'range' coord and other spatial dim names. + spatial_orig = [d for d in da_orig.dims if d != "time"] + if spatial_orig: + sdim = spatial_orig[0] + if sdim in da_orig.coords: + r_units = da_orig[sdim].attrs.get("units", "m") + if r_units.lower() == "m": + r_km = da_orig[sdim].values / 1000.0 + da_orig = da_orig.assign_coords({sdim: r_km}) + + log = _is_log_var(variable) + + if match_scale: + all_data = np.concatenate([da_proc.values.ravel(), da_orig.values.ravel()]) + norm = _safe_norm(all_data, log, varname=variable) + norm_proc = norm_orig = norm + else: + norm_proc = _safe_norm(da_proc.values.ravel(), log, varname=variable) + norm_orig = _safe_norm(da_orig.values.ravel(), log, varname=variable) + + fig, _axes = plt.subplots( + 1, 2, figsize=(fig_width, panel_height), squeeze=False, **kwargs + ) + axes: np.ndarray = _axes.ravel() + + # Left panel — processed / interpolated + mesh_proc = _plot_curtain( + axes[0], da_proc, norm=norm_proc, cmap=cmap, + title=f"{variable}\n(processed / interpolated)", + ) + plt.colorbar(mesh_proc, ax=axes[0], pad=0.02, label=da_proc.attrs.get("units", "")) + + # Right panel — original + src_ds = da_proc.attrs.get("source_datastream", "") + orig_title = f"{orig_name}\n(original" + (f" · {src_ds}" if src_ds else "") + ")" + mesh_orig = _plot_curtain( + axes[1], da_orig, norm=norm_orig, cmap=cmap, title=orig_title, + ) + plt.colorbar(mesh_orig, ax=axes[1], pad=0.02, label=da_orig.attrs.get("units", "")) + + # Y-axis limits: explicit or derived from finite data extent across both panels + if ylim is not None: + y_min, y_max = ylim + else: + def _finite_ylim(da): + sdim = next(d for d in da.dims if d != "time") + r = da[sdim].values if sdim in da.coords else np.arange(da.sizes[sdim]) + if list(da.dims) != ["time", sdim]: + da = da.transpose("time", sdim) + mask = np.any(np.isfinite(da.values), axis=0) + return float(r[0]), float(r[mask].max() if mask.any() else r[-1]) + + y0_p, y1_p = _finite_ylim(da_proc) + y0_o, y1_o = _finite_ylim(da_orig) + y_min = min(y0_p, y0_o) + y_max = max(y1_p, y1_o) + for _ax in axes: + _ax.set_ylim(y_min, y_max) + + fig.tight_layout() + return fig, axes + + +# --------------------------------------------------------------------------- +# Example / quick-look +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + import glob + from pathlib import Path + + # ------------------------------------------------------------------ + # Configuration — adjust to match your local setup + # ------------------------------------------------------------------ + SITE = "sgp" + FACILITY = "C1" + INSTRUMENT_CLASS = "realbundle" + LEVEL = "c0" + OUTPUT_DIR = "." + DATA_PATH_TEMPLATE = "/data/archive/{site}/{site}{instrument_class}{facility}.{level}" + + # ------------------------------------------------------------------ + # Locate the most-recent bundle file matching the convention + # ------------------------------------------------------------------ + pattern = str( + Path(OUTPUT_DIR) + / f"{SITE}{INSTRUMENT_CLASS}{FACILITY}.{LEVEL}.*.nc" + ) + files = sorted(glob.glob(pattern)) + if not files: + raise FileNotFoundError( + f"No bundle file found matching: {pattern}\n" + "Run real_prof_main.py first to produce the output file." + ) + bundle_file = files[-1] + print(f"Loading: {bundle_file}") + ds = xr.open_dataset(bundle_file) + + # ------------------------------------------------------------------ + # 1. Curtain plot — attenuated backscatter from all instruments, + # with ceil cloud base overlaid on every panel as a reference + # ------------------------------------------------------------------ + fig1, axes1 = plot_variable_curtain( + ds, + variable="attenuated_backscatter", + overlay_field="interpolatedsonde_temp", + overlay_levels=[], + overlay_colors="cyan", + overlay_cloud_layers=True, + reference_cloud_instrument="ceil", + shared_norm=False, + cmap="jet", + ylim=(0, 10.0), + ) + fig1.savefig("curtain_attenuated_backscatter.png", dpi=150, bbox_inches="tight") + print("Saved: curtain_attenuated_backscatter.png") + + # ------------------------------------------------------------------ + # 2. Comparison plot — processed vs. original for one instrument. + # Uses source_fieldname / source_datastream attrs to auto-load + # the original file; supply data_path_template for file discovery. + # ------------------------------------------------------------------ + # Pick the first available attenuated_backscatter variable in the bundle + comp_var = next( + (v for v in ds.data_vars if "attenuated_backscatter" in v and not v.startswith("qc_")), + None, + ) + if comp_var is not None: + fig2, axes2 = compare_variable_to_orig( + ds, + variable=comp_var, + data_path_template=DATA_PATH_TEMPLATE, + match_scale=True, + cmap="jet", + ) + out_name = f"compare_{comp_var}.png" + fig2.savefig(out_name, dpi=150, bbox_inches="tight") + print(f"Saved: {out_name}") + else: + print("No attenuated_backscatter variable found — skipping comparison plot.") + + plt.show() + diff --git a/profiles_eval/real_prof_utils.py b/profiles_eval/real_prof_utils.py new file mode 100644 index 0000000..dd3372d --- /dev/null +++ b/profiles_eval/real_prof_utils.py @@ -0,0 +1,247 @@ +""" +=============================================================== +Israel Silber +=============================================================== +Lidar profile intercomparison utility functions +=============================================================== +""" + +from typing import Dict, List, Optional +import numpy as np +import xarray as xr +import real_prof_init as pi + + +def get_common_variables(instrument_types: List[str], config_dir: str = "./configs") -> List[str]: + """ + Get variables that are common across multiple instrument types. + + Parameters + ---------- + instrument_types : List[str] + List of instrument types to compare + config_dir : str, optional + Directory containing the JSON configuration files, by default "./configs" + + Returns + ------- + List[str] + List of common variable names (standard names) + """ + if not instrument_types: + return [] + + # Load all configs and get variable sets + variable_sets = [] + for inst_type in instrument_types: + config = pi.load_config(inst_type, config_dir) + variable_sets.append(set(config["variables"].keys())) + + # Find intersection + common_vars = variable_sets[0] + for var_set in variable_sets[1:]: + common_vars &= var_set + + return list(common_vars) + + +# Default output grids +_DEFAULT_RANGE_KM = np.arange(0.0, 20.0 + 0.015, 0.015) # 0–20 km, 15 m steps + + +def interpolate_data( + ds: xr.Dataset, + time_min: np.datetime64, + time_max: np.datetime64, + range_km: Optional[np.ndarray] = None, + time_step: np.timedelta64 = np.timedelta64(15, "s"), + range_units: str = "m", +) -> xr.Dataset: + """ + Interpolate all variables in a dataset onto uniform time and range grids. + + Interpolation method depends on variable name: + + * Variables whose name ends with ``_mask`` or starts with ``qc_``: + **nearest-neighbour** in all applicable dimensions. + * All other variables: **linear** interpolation. + + The interpolation is applied per dimension type: + + * **(time, range) 2-D variables**: bilinear (or nearest) gridded interpolation. + * **time-only 1-D variables**: 1-D linear (or nearest) interpolation along time. + * **(time, layer) 2-D variables**: linear (or nearest for ``qc_``/``_mask`` + variables) interpolation along time, applied independently for each layer index. + * Variables with no time dimension are passed through unchanged. + + Parameters + ---------- + ds : xr.Dataset + Input dataset. + time_min : np.datetime64 + Start of the output time grid (inclusive). + time_max : np.datetime64 + End of the output time grid (inclusive). + range_km : np.ndarray, optional + 1-D array of output range gate centres in **km**. Defaults to + ``np.arange(0, 20.015, 0.015)`` (0–20 km in 15 m increments). + Ignored for variables that have no range dimension. + time_step : np.timedelta64, optional + Spacing between output time steps. Default is 15 s. + range_units : str, optional + Units of the ``range`` coordinate in *ds* (``"m"`` or ``"km"``). + If ``"m"``, the coordinate is divided by 1000 before interpolation. + Default is ``"m"``. + + Returns + ------- + xr.Dataset + New dataset on the uniform (time, range_km) grid. Variables that + could not be interpolated (e.g. scalar coordinates) are dropped + silently. + """ + if range_km is None: + range_km = _DEFAULT_RANGE_KM + + if "time" in ds.coords and ds.sizes.get("time", 0) == 0: + raise ValueError( + "interpolate_data received a dataset with an empty time dimension." + ) + + # ------------------------------------------------------------------ + # Build output time coordinate + # ------------------------------------------------------------------ + step_ns = int(time_step / np.timedelta64(1, "ns")) + t_min_ns = time_min.astype("datetime64[ns]").astype(np.int64) + t_max_ns = time_max.astype("datetime64[ns]").astype(np.int64) + out_time = (np.arange(t_min_ns, t_max_ns + step_ns, step_ns) + .astype("datetime64[ns]")) + + # ------------------------------------------------------------------ + # Standardise range coordinate to km + # ------------------------------------------------------------------ + if "range" in ds.coords: + range_coord = ds["range"].values + if range_units.lower() == "m": + range_coord = range_coord / 1000.0 + # Rebuild dataset with km-valued range + ds = ds.assign_coords(range=range_coord) + + # ------------------------------------------------------------------ + # Ensure missing values are NaN (replace common fill values) + # ------------------------------------------------------------------ + for var in ds.data_vars: + da = ds[var] + if np.issubdtype(da.dtype, np.floating): + fill = da.attrs.get("missing_value", da.attrs.get("_FillValue", None)) + if fill is not None: + ds[var] = da.where(da != fill) + + # ------------------------------------------------------------------ + # Helper: decide method for a given variable name + # ------------------------------------------------------------------ + def _method(name: str) -> str: + if name.endswith("_mask") or name.startswith("qc_"): + return "nearest" + return "linear" + + # ------------------------------------------------------------------ + # Interpolate each variable + # ------------------------------------------------------------------ + out_vars: dict[str, xr.DataArray] = {} + + for var in ds.data_vars: + da = ds[var] + dims = da.dims + method = _method(var) + + # Skip variables whose time dimension is empty + if "time" in dims and da.sizes["time"] == 0: + continue + + # Skip range-only variables (no time dim); they are meaningless on + # the interpolated output grid and are excluded unconditionally. + if "time" not in dims: + continue + + # Pre-compute input coordinate bounds for explicit extrapolation masking + t_in_min = da.time.values[0] if "time" in dims else None + t_in_max = da.time.values[-1] if "time" in dims else None + r_in_min = float(da["range"].min()) if "range" in dims else None + r_in_max = float(da["range"].max()) if "range" in dims else None + + # Boolean masks over the *output* grids marking in-bounds points + t_valid = ( + xr.DataArray( + (out_time >= t_in_min) & (out_time <= t_in_max), + dims=["time"], coords={"time": out_time}, + ) + if t_in_min is not None else None + ) + r_valid = ( + xr.DataArray( + (range_km >= r_in_min) & (range_km <= r_in_max), + dims=["range"], coords={"range": range_km}, + ) + if r_in_min is not None else None + ) + + if "time" in dims and "range" in dims: + # 2-D (time × range) — gridded interpolation + try: + result = da.interp( + time=out_time, + range=range_km, + method=method, + kwargs={"fill_value": np.nan, "bounds_error": False}, + ) + # Explicitly zero out any extrapolated cells (handles nearest) + result = result.where(t_valid & r_valid) + out_vars[var] = result + except Exception: + pass + + elif "time" in dims and "layer" in dims: + # 2-D (time × layer) — interpolate along time, layer unchanged + n_layers = da.sizes["layer"] + layers = [] + for li in range(n_layers): + sl = da.isel(layer=li) + try: + interped = sl.interp( + time=out_time, + method=method, + kwargs={"fill_value": np.nan, "bounds_error": False}, + ) + # Mask time extrapolation explicitly + interped = interped.where(t_valid) + layers.append(interped) + except Exception: + layers.append(xr.full_like( + sl.isel(time=0).expand_dims(time=out_time), np.nan + )) + out_vars[var] = xr.concat(layers, dim="layer").transpose("time", "layer") + + elif "time" in dims: + # 1-D (time only) + try: + result = da.interp( + time=out_time, + method=method, + kwargs={"fill_value": np.nan, "bounds_error": False}, + ) + # Mask time extrapolation explicitly + result = result.where(t_valid) + out_vars[var] = result + except Exception: + pass + + # ------------------------------------------------------------------ + # Assemble output dataset, carrying over variable attributes + # ------------------------------------------------------------------ + out_ds = xr.Dataset(out_vars) + for var in out_ds.data_vars: + if var in ds.data_vars: + out_ds[var].attrs.update(ds[var].attrs) + + return out_ds \ No newline at end of file From 65f74faf3696cc0b73d0603bb478b526e889a4af Mon Sep 17 00:00:00 2001 From: isilber Date: Mon, 6 Apr 2026 19:27:38 +0000 Subject: [PATCH 2/2] DEV: consistent xlim in compare plotting routine --- profiles_eval/real_prof_plot.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/profiles_eval/real_prof_plot.py b/profiles_eval/real_prof_plot.py index 47a848e..ec50e58 100644 --- a/profiles_eval/real_prof_plot.py +++ b/profiles_eval/real_prof_plot.py @@ -448,6 +448,12 @@ def compare_variable_to_orig( ) plt.colorbar(mesh_orig, ax=axes[1], pad=0.02, label=da_orig.attrs.get("units", "")) + # Lock both panels to the processed dataset's time extent + t_min_ds = ds.time.values[0] + t_max_ds = ds.time.values[-1] + for _ax in axes: + _ax.set_xlim(t_min_ds, t_max_ds) + # Y-axis limits: explicit or derived from finite data extent across both panels if ylim is not None: y_min, y_max = ylim