From 2ca800462ed9c83cfeab10070b94a2f879fee904 Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 11:30:36 -0700 Subject: [PATCH 1/9] Add new slope-filter option to cli --- src/disasters/cli.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/disasters/cli.py b/src/disasters/cli.py index 977b5a6..72da971 100644 --- a/src/disasters/cli.py +++ b/src/disasters/cli.py @@ -512,5 +512,49 @@ def mosaic( else: logger.warning("Mosaic pipeline exited without producing outputs.") + +@cli.command(name="slope-filter") +@click.option( + "-ld", + "--local-dir", + type=click.Path(path_type=Path, file_okay=False, dir_okay=True, exists=True), + required=True, + help="Path to a local directory containing pre-downloaded OPERA geotiffs.", +) +@click.option( + "-st", + "--slope-threshold", + type=float, + required=True, + help="Slope threshold in degrees (0-100) to define the resulting mask.", +) +@click.option( + "-o", + "--output-dir", + type=click.Path(path_type=Path, file_okay=False, dir_okay=True), + required=True, + help="Directory where the generated dem.tif and slope.tif will be saved.", +) +def slope_filter(local_dir: Path, slope_threshold: float, output_dir: Path) -> None: + """Generate a standalone DEM and slope mask from local OPERA products.""" + + if not (0 <= slope_threshold <= 100): + raise click.BadParameter("Slope threshold must be between 0 and 100.", param_hint="--slope-threshold") + + from .pipeline import run_slope_filter_only + + logger.info(f"Starting standalone slope filter pipeline for threshold > {slope_threshold} degrees...") + out_dir = run_slope_filter_only( + local_dir=local_dir, + slope_threshold=slope_threshold, + output_dir=output_dir + ) + + if out_dir: + logger.info(f"Slope generation complete. Files saved to: {out_dir}") + else: + logger.warning("Slope pipeline exited without producing outputs.") + + if __name__ == "__main__": cli() \ No newline at end of file From fc1800e7508403c7eb8aee74e07599e7bab4863b Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 11:31:17 -0700 Subject: [PATCH 2/9] Add helper function fetch DEMs if needed --- src/disasters/catalog.py | 47 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/src/disasters/catalog.py b/src/disasters/catalog.py index 18007fe..69f4994 100644 --- a/src/disasters/catalog.py +++ b/src/disasters/catalog.py @@ -81,6 +81,53 @@ def read_opera_metadata(output_dir: Path) -> pd.DataFrame: return df +def fetch_missing_dems(bbox: list, local_dir: Path) -> None: + """ + Queries Earthdata for recent DSWx-HLS granules covering the bbox + and downloads ONLY their _B10_DEM.tif files to the local directory. + """ + import datetime + import earthaccess + + logger.info("[DEM Fetcher] Missing local DEMs detected. Querying Earthdata for static topography...") + try: + # We just need one recent static DEM. Look at the last 60 days to ensure coverage. + end_date = datetime.datetime.utcnow() + start_date = end_date - datetime.timedelta(days=60) + + results = earthaccess.search_data( + short_name="OPERA_L3_DSWX-HLS_V1", + bounding_box=tuple(bbox), + temporal=(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")), + count=20 # Grab enough to cover the bbox footprint + ) + + if not results: + logger.warning("[DEM Fetcher] No recent DSWx-HLS granules found for this BBOX.") + return + + # Filter to ONLY get the _B10_DEM URLs (we don't want to download the water data) + dem_urls = [] + for granule in results: + for link in granule.data_links(): + if "_B10_DEM.tif" in link: + dem_urls.append(link) + + if not dem_urls: + logger.warning("[DEM Fetcher] Found granules, but no _B10_DEM.tif links.") + return + + # Remove duplicates + dem_urls = list(set(dem_urls)) + + logger.info(f"[DEM Fetcher] Downloading {len(dem_urls)} DEM layers to {local_dir}...") + earthaccess.download(dem_urls, local_path=str(local_dir)) + logger.info("[DEM Fetcher] Topography download complete.") + + except Exception as e: + logger.error(f"[DEM Fetcher] Failed to fetch missing DEMs: {e}") + + def cluster_by_time(df: pd.DataFrame, time_col: str = "Start Time", threshold_minutes: int = 120) -> list: """ Groups dataframe rows by time clustering to separate passes (e.g. Ascending vs Descending). From 69687761743b3303ad0bb17df3af42b91c41a8cd Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 11:32:45 -0700 Subject: [PATCH 3/9] Update slope filtering logic to handle local data --- src/disasters/filters.py | 128 +++++++++++++++++++++++++++++++-------- 1 file changed, 103 insertions(+), 25 deletions(-) diff --git a/src/disasters/filters.py b/src/disasters/filters.py index dcad4b4..9505cd1 100644 --- a/src/disasters/filters.py +++ b/src/disasters/filters.py @@ -237,12 +237,11 @@ def generate_coastal_mask(bbox: list, master_grid: dict) -> Optional[xr.DataArra return None -def process_dem_and_slope( - df: pd.DataFrame, master_grid: dict, threshold: float, output_dir: Path, skip_existing: bool = False -) -> Optional[np.ndarray]: +def process_dem_and_slope(df: pd.DataFrame, master_grid: dict, threshold: float, output_dir: Path, skip_existing: bool = True): """ - Fetches all DSWx-HLS Band 10 URLs and mosaics them into 'dem.tif' saved at output_dir. - Calculates slope and returns a boolean mask (True where slope < threshold). + Fetches all DSWx-HLS Band 10 URLs (or downloads them if missing) and mosaics them + into 'dem.tif' saved at output_dir. Calculates slope and returns a boolean mask + (True where slope < threshold). Args: df (pd.DataFrame): DataFrame containing OPERA products metadata. @@ -254,8 +253,12 @@ def process_dem_and_slope( Returns: np.ndarray: Mask indicating areas where slope is below threshold. """ - logger.info(f"Processing DEM and Slope Mask (Threshold: {threshold} deg)...") + from .catalog import fetch_missing_dems + from .io import scan_local_directory + from .mosaic import get_local_spatial_properties + logger.info(f"[Filters] Processing DEM and generating slope mask (> {threshold} degrees)...") + dem_output_path = output_dir / "dem.tif" slope_output_path = output_dir / "slope.tif" @@ -273,30 +276,49 @@ def process_dem_and_slope( except Exception as e: logger.warning(f"Failed to read existing slope mask: {e}. Proceeding to recompute...") - # Filter for ALL DSWx-HLS products to get DEMs - dswx_rows = df[df['Dataset'] == 'OPERA_L3_DSWX-HLS_V1'] + # Check if we have DEM files locally or DSWx-HLS cloud links + has_dem_files = any('_B10_DEM' in str(path) for path in df['Filepath'].dropna() if pd.notna(path)) + has_dswx_cloud = not df[df['Dataset'] == 'OPERA_L3_DSWX-HLS_V1'].empty - # Check if any DSWx-HLS products are available to generate the DEM mosaic. - if dswx_rows.empty: - logger.warning("No DSWx-HLS products found. Cannot generate DEM or slope mask.") - return None + # If there are no local DEMs and no cloud DSWx-HLS links, download them + if not has_dem_files and not has_dswx_cloud: + logger.info("[Filters] No DEM data found in DataFrame. Initiating dynamic DEM fetcher...") + + # Get bbox from the existing local files + auto_bbox, _ = get_local_spatial_properties(df) + + # Determine the local directory from the first available filepath + first_valid_file = df['Filepath'].dropna().iloc[0] + local_dir = Path(first_valid_file).parent + + # Fetch the DEMs + fetch_missing_dems(auto_bbox, local_dir) + + # Rescan the directory so our DataFrame now includes the newly downloaded B10_DEM files + df = scan_local_directory(local_dir) # Construct Band 10 DEM URLs dem_urls = [] - # Drop duplicates to avoid downloading/warping the same granule twice - for url in dswx_rows['Download URL WTR'].dropna().unique(): - if '_B01_WTR' in url: - # Replace WTR with DEM band - dem_url = url.replace('_B01_WTR', '_B10_DEM') - - # Prefix for GDAL vsicurl - if dem_url.startswith('http') and not dem_url.startswith('/vsi'): - dem_urls.append(f'/vsicurl/{dem_url}') - else: - dem_urls.append(dem_url) + explicit_dems = df[df['Filepath'].str.contains('_B10_DEM', na=False, case=False)]['Filepath'].tolist() + + if explicit_dems: + dem_urls = explicit_dems + else: + # Fallback for cloud mode: deduce DEM URLs from DSWx WTR URLs + dswx_rows = df[df['Dataset'] == 'OPERA_L3_DSWX-HLS_V1'] + for url in dswx_rows['Download URL WTR'].dropna().unique(): + if '_B01_WTR' in url: + if url.startswith('http') and not url.startswith('/vsi'): + dem_url = url.replace('_B01_WTR', '_B10_DEM') + dem_urls.append(f'/vsicurl/{dem_url}') + else: + local_dem_path = url.replace('_B01_WTR', '_B10_DEM') + dem_urls.append(local_dem_path) + + dem_urls = list(set(dem_urls)) if not dem_urls: - logger.warning("Could not construct Band 10 URLs.") + logger.warning("[Filters] Failed to identify or fetch any DEM URLs. Skipping slope masking.") return None # Extract Master Grid Properties @@ -355,4 +377,60 @@ def process_dem_and_slope( except Exception as e: logger.warning(f"Slope processing failed: {e}") - return None \ No newline at end of file + return None + + +def apply_slope_mask_to_raster(target_tif: Path, slope_tif: Path, threshold: float, output_tif: Path): + """ + Dynamically reprojects the slope mask to match the target raster's grid, + applies the mask, and saves the filtered output. + """ + import rasterio + import numpy as np + from rasterio.warp import reproject, Resampling + import logging + + logger = logging.getLogger(__name__) + + try: + with rasterio.open(target_tif) as src: + target_meta = src.meta.copy() + target_arr = src.read(1) + target_crs = src.crs + target_transform = src.transform + nodata_val = src.nodata if src.nodata is not None else -9999 + + with rasterio.open(slope_tif) as slope_src: + # Create an empty array matching the target raster's exact shape + aligned_slope_arr = np.empty_like(target_arr, dtype=np.float32) + + # Reproject the slope data dynamically into the target's grid space + reproject( + source=rasterio.band(slope_src, 1), + destination=aligned_slope_arr, + src_transform=slope_src.transform, + src_crs=slope_src.crs, + dst_transform=target_transform, + dst_crs=target_crs, + resampling=Resampling.bilinear + ) + + # Apply the mask: If slope >= threshold OR slope is nodata, set target pixel to nodata + filtered_arr = np.where( + (aligned_slope_arr >= threshold) | (aligned_slope_arr == -9999), + nodata_val, + target_arr + ) + + # Ensure the output meta has a defined nodata value and uses compression + target_meta.update({ + "driver": "COG", + "compress": "deflate", + "nodata": nodata_val + }) + + with rasterio.open(output_tif, 'w', **target_meta) as dst: + dst.write(filtered_arr, 1) + + except Exception as e: + logger.error(f"Failed to apply slope mask to {target_tif.name}: {e}") \ No newline at end of file From 793c48bfe5fd90864dd5fe06eb8897063e370932 Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 11:33:46 -0700 Subject: [PATCH 4/9] Add run_slope_filter_only to pipeline --- src/disasters/pipeline.py | 66 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/disasters/pipeline.py b/src/disasters/pipeline.py index d570d7a..e49b412 100644 --- a/src/disasters/pipeline.py +++ b/src/disasters/pipeline.py @@ -697,6 +697,72 @@ def run_mosaic_only(input_dir: Path, output_dir: Path, bbox: Sequence[float] | s return output_dir +def run_slope_filter_only(local_dir: Path, slope_threshold: float, output_dir: Path) -> Path | None: + """ + Run a standalone pipeline to generate a slope mask and apply it to all valid rasters in the local directory. + """ + from .io import scan_local_directory, ensure_directory + from .mosaic import get_local_spatial_properties, get_master_grid_props + from .filters import process_dem_and_slope, apply_slope_mask_to_raster + import pyproj + import logging + + logger = logging.getLogger(__name__) + + logger.info(f"[Pipeline] Running standalone SLOPE pipeline using data from: {local_dir}") + + df_opera = scan_local_directory(local_dir) + if df_opera.empty and not list(local_dir.glob("*.tif")): + logger.error("[Pipeline] No valid TIF data found in the local directory.") + return None + + ensure_directory(output_dir) + + # Calculate Master Grid for the Slope Generation + auto_bbox, target_crs_proj4 = get_local_spatial_properties(df_opera) + crs_obj = pyproj.CRS.from_string(target_crs_proj4) + target_res = 0.0002695 if crs_obj.is_geographic else 30 + master_grid = get_master_grid_props(auto_bbox, target_crs_proj4, target_res=target_res) + + # Generate the master dem.tif and slope.tif + mask = process_dem_and_slope( + df=df_opera, + master_grid=master_grid, + threshold=slope_threshold, + output_dir=output_dir, + skip_existing=False + ) + + slope_tif_path = output_dir / "slope.tif" + if mask is None or not slope_tif_path.exists(): + logger.error("[Pipeline] Failed to generate base slope mask. Cannot proceed with filtering.") + return None + + # Find all data files to mask (ignore DEMs, base slope outputs, and already-filtered files) + ignore_list = ['_B10_DEM', 'dem.tif', 'slope.tif', '_slope_filtered'] + tifs_to_process = [ + f for f in local_dir.glob("*.tif") + if not any(ignore_str in f.name for ignore_str in ignore_list) + ] + + if not tifs_to_process: + logger.info("[Pipeline] Slope mask generated, but found no valid data TIFs to filter in the local directory.") + return output_dir + + # Apply the mask to every relevant file + logger.info(f"[Pipeline] Applying {slope_threshold}° slope filter to {len(tifs_to_process)} rasters...") + + for tif_path in tifs_to_process: + out_name = f"{tif_path.stem}_{int(slope_threshold)}deg_slope_filtered.tif" + out_path = output_dir / out_name + + logger.info(f" -> Filtering {tif_path.name} to {out_name}...") + apply_slope_mask_to_raster(tif_path, slope_tif_path, slope_threshold, out_path) + + logger.info("[Pipeline] Standalone slope filtering complete!") + return output_dir + + def run_plotting_task( maps_dir, layouts_dir, mosaic_path, short_name, layer, date_id, layout_date, layout_title, bbox, zoom_bbox, From ef29944e5d2ab0da3c80aa553d94f2ec6f7dbb3e Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 11:58:56 -0700 Subject: [PATCH 5/9] Update bbox parsing --- src/disasters/cli.py | 113 +++++++++++++++++-------------------------- src/disasters/io.py | 47 ++++++++++++++++++ 2 files changed, 91 insertions(+), 69 deletions(-) diff --git a/src/disasters/cli.py b/src/disasters/cli.py index 72da971..4005d5d 100644 --- a/src/disasters/cli.py +++ b/src/disasters/cli.py @@ -34,18 +34,11 @@ def cli() -> None: @cli.command(name="run") @click.option( - "-b", - "--bbox", + "-b", + "--bbox", type=str, - required=True, - help=( - "Bounding box or area of interest. MUST be enclosed in double quotes if it contains spaces. " - "Accepted formats: " - "1) 4 floats: \"S N W E\" | " - "2) WKT string: \"POLYGON((...))\" | " - "3) Local path: \"/path/to/file.kml\" | " - "4) Web URL: \"https://example.com/AOI.geojson\"" - ), + required=True, + help="Bounding box (4 coords, WKT, or path to KML/GeoJSON). Defines the search and processing AOI." ) @click.option( "-zb", @@ -207,33 +200,30 @@ def run( skip_existing: bool ) -> None: """Run the disaster pipeline (end-to-end).""" + + from .io import parse_bbox_input + from .pipeline import run_pipeline + import logging + + logger = logging.getLogger(__name__) + # Ensure slope values are between 0 and 100 degrees, if provided if slope_threshold is not None and not (0 <= slope_threshold <= 100): raise click.BadParameter("Slope threshold must be between 0 and 100.", param_hint="--slope-threshold") - # Process bbox tokens into a list of floats OR a single WKT/path string - bbox_parts = bbox.replace(",", " ").split() - - if len(bbox_parts) == 4: - try: - bbox_arg = [float(x) for x in bbox_parts] - except ValueError: - bbox_arg = bbox - else: - # Keep as WKT string or file path - bbox_arg = bbox + # Parse bbox input + try: + bbox_arg = parse_bbox_input(bbox) + except Exception as e: + raise click.BadParameter(f"Failed to parse bounding box: {e}", param_hint="--bbox") - # Process zoom_bbox if provided + # Parse zoom_bbox input, if provided zoom_bbox_arg = None if zoom_bbox is not None: - zoom_parts = zoom_bbox.replace(",", " ").split() - if len(zoom_parts) == 4: - try: - zoom_bbox_arg = [float(x) for x in zoom_parts] - except ValueError: - raise click.BadParameter("Zoom bounding box must contain exactly 4 valid numbers.", param_hint="--zoom-bbox") - else: - raise click.BadParameter("Zoom bounding box must contain exactly 4 valid numbers.", param_hint="--zoom-bbox") + try: + zoom_bbox_arg = parse_bbox_input(zoom_bbox) + except Exception as e: + raise click.BadParameter(f"Failed to parse zoom bounding box: {e}", param_hint="--zoom-bbox") # Build the PipelineConfig object cfg = PipelineConfig( @@ -439,11 +429,18 @@ def download( @cli.command(name="mosaic") @click.option( - "-i", - "--input-dir", + "-b", + "--bbox", + type=str, + required=False, + help="Bounding box (4 coords, WKT, or path to KML/GeoJSON). Limits mosaic extent.", +) +@click.option( + "-ld", + "--local-dir", type=click.Path(path_type=Path, file_okay=False, dir_okay=True, exists=True), required=True, - help="Path to a local directory containing pre-downloaded OPERA geotiffs.", + help="Path to a local directory containing pre-downloaded OPERA geotiffs. The mosaic will be built from these files." ) @click.option( "-o", @@ -452,18 +449,6 @@ def download( required=True, help="Directory where the stitched GeoTIFF mosaics will be saved.", ) -@click.option( - "-b", - "--bbox", - type=str, - required=False, - default=None, - help=( - "Optional bounding box to crop the output. If omitted, the pipeline computes the geographic union of all inputs. " - "MUST be enclosed in double quotes if it contains spaces. " - "Accepted formats: \"S N W E\" | \"POLYGON((...))\" | \"/path/to/file.kml\"" - ), -) @click.option( "--benchmark", is_flag=True, @@ -472,38 +457,28 @@ def download( ) def mosaic( - input_dir: Path, - output_dir: Path, bbox: Optional[str], + local_dir: Path, + output_dir: Path, benchmark: bool ) -> None: """Stitch local OPERA granules into analysis-ready mosaics (No analysis/layouts).""" - - # Process optional bbox using the same parsing as the run command - bbox_arg = None - if bbox is not None: - bbox_parts = bbox.replace(",", " ").split() - if len(bbox_parts) == 4: - try: - coords = [float(x) for x in bbox_parts] - # Auto-swap S/N if flipped - if coords[0] > coords[1]: coords[0], coords[1] = coords[1], coords[0] - # Auto-swap W/E if flipped - if coords[2] > coords[3]: coords[2], coords[3] = coords[3], coords[2] - bbox_arg = coords - except ValueError: - bbox_arg = bbox - else: - bbox_arg = bbox - - # Import the dedicated mosaic pipeline (we will build this next) + from .io import parse_bbox_input from .pipeline import run_mosaic_only + + # Parse the input string into the [S, N, W, E] list + parsed_bbox = None + if bbox: + try: + parsed_bbox = parse_bbox_input(bbox) + except Exception as e: + raise click.BadParameter(f"Failed to parse bounding box: {e}", param_hint="--bbox") logger.info("Starting mosaic pipeline...") output_path = run_mosaic_only( - input_dir=input_dir, + input_dir=local_dir, output_dir=output_dir, - bbox=bbox_arg, + bbox=parsed_bbox, benchmark=benchmark ) diff --git a/src/disasters/io.py b/src/disasters/io.py index 42f6074..7cb04d6 100644 --- a/src/disasters/io.py +++ b/src/disasters/io.py @@ -10,6 +10,53 @@ logger = logging.getLogger(__name__) +def parse_bbox_input(bbox_string: str) -> list[float]: + """ + Parses a string input (KML, GeoJSON, WKT, or 4 coordinates) + into a standardized [South, North, West, East] bounding box list. + Args: + bbox_string (str): Input string representing a bounding box. + Can be a file path to KML/GeoJSON/SHP, a WKT string, or raw coordinates. + Returns: + list[float]: Bounding box in the format [South, North, West, East]. + """ + import os + import geopandas as gpd + from shapely import wkt + import logging + + logger = logging.getLogger(__name__) + + # Check if it's a geospatial file-type (KML, GeoJSON, SHP) + if os.path.isfile(bbox_string): + if bbox_string.lower().endswith('.kml'): + import fiona + # Enable KML driver for geopandas/fiona + fiona.drvsupport.supported_drivers['KML'] = 'rw' + fiona.drvsupport.supported_drivers['LIBKML'] = 'rw' + + logger.info(f"Extracting bounding box from file: {bbox_string}") + gdf = gpd.read_file(bbox_string) + # GeoPandas total_bounds returns [minx, miny, maxx, maxy] -> [West, South, East, North] + w, s, e, n = gdf.total_bounds + return [s, n, w, e] + + # Check if it's a WKT string + if bbox_string.upper().startswith(('POLYGON', 'MULTIPOLYGON', 'BBOX')): + logger.info("Extracting bounding box from WKT string...") + geom = wkt.loads(bbox_string) + w, s, e, n = geom.bounds + return [s, n, w, e] + + # Assume it's a raw coordinate string + logger.info("Parsing raw coordinates...") + coords = [float(x) for x in bbox_string.replace(',', ' ').split()] + if len(coords) != 4: + raise ValueError("Bounding box must be a valid file, WKT, or 4 space/comma separated coordinates.") + + return coords + + def ensure_directory(output_dir: Path) -> Path: """ Create the output directory if it does not exist. From f35b8c7c0fd94b57833d1859b71aaa1d5367b10c Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 14:02:00 -0700 Subject: [PATCH 6/9] Update pipeline to be more flexible for local slope filtering --- src/disasters/pipeline.py | 138 +++++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 53 deletions(-) diff --git a/src/disasters/pipeline.py b/src/disasters/pipeline.py index e49b412..89af3b1 100644 --- a/src/disasters/pipeline.py +++ b/src/disasters/pipeline.py @@ -77,52 +77,66 @@ class PipelineConfig: skip_existing: bool = False -def get_local_spatial_properties(df_opera: pd.DataFrame) -> tuple[list[float], str]: +def get_local_spatial_properties(df: pd.DataFrame) -> tuple[list[float], str]: """ Calculates the global bounding box [S, N, W, E] and most common CRS from a local DataFrame of OPERA products by reading their headers. + + Args: + df (pd.DataFrame): DataFrame containing metadata about local OPERA products, + including columns with file paths. + Returns: + tuple[list[float], str]: A tuple containing the bounding box as [S, N, W, E] and the most common CRS in PROJ4 string format. """ + logger = logging.getLogger(__name__) logger.info("Calculating spatial properties from local files...") - url_cols = [c for c in df_opera.columns if c.startswith("Download URL")] - all_files = [] - for c in url_cols: - all_files.extend(df_opera[c].dropna().tolist()) - all_files = list(set(all_files)) # Unique files only - - minx, miny, maxx, maxy = float('inf'), float('inf'), float('-inf'), float('-inf') - crs_counter = Counter() - - for f in all_files: + + min_s, max_n, min_w, max_e = 90.0, -90.0, 180.0, -180.0 + target_crs_wkt = None + valid_files = 0 + + for _, row in df.iterrows(): + # Try to get the explicit Filepath (used by standalone slope filter) + filepath = row.get('Filepath') + + # If no Filepath column, grab the first valid 'Download URL' column + if pd.isna(filepath): + for col in df.columns: + if str(col).startswith("Download URL") and pd.notna(row[col]): + filepath = row[col] + break + + # If we still don't have a file, skip to the next row + if pd.isna(filepath): + continue + try: - with rasterio.open(f) as src: - bounds = src.bounds - crs = src.crs - - if crs is not None: - crs_counter[crs.to_string()] += 1 + # Extract CRS and bounds using rasterio + with rasterio.open(filepath) as src: + if src.crs is None: + logger.warning(f"[Spatial Calc] File {filepath} has no CRS metadata! Skipping...") + continue - # Transform to EPSG:4326 to match S, N, W, E expected format - if crs and crs.to_string() != "EPSG:4326": - left, bottom, right, top = transform_bounds(crs, "EPSG:4326", *bounds) - else: - left, bottom, right, top = bounds + if not target_crs_wkt: + target_crs_wkt = src.crs.to_wkt() + + # rasterio bounds are (west, south, east, north) + bounds = transform_bounds(src.crs, "EPSG:4326", *src.bounds) + + min_w = min(min_w, bounds[0]) + min_s = min(min_s, bounds[1]) + max_e = max(max_e, bounds[2]) + max_n = max(max_n, bounds[3]) + + valid_files += 1 - minx = min(minx, left) - miny = min(miny, bottom) - maxx = max(maxx, right) - maxy = max(maxy, top) except Exception as e: - logger.warning(f"Could not read spatial properties from {f}: {e}") - - if minx == float('inf'): - raise RuntimeError("Could not calculate bounding box from local files.") - - most_common_crs = crs_counter.most_common(1)[0][0] - - logger.info(f"Local Master CRS determined: {most_common_crs}") - - # Return [S, N, W, E] and the CRS - return [miny, maxy, minx, maxx], most_common_crs + logger.error(f"[Spatial Calc] Failed to read {filepath}. Reason: {e}") + + if valid_files == 0: + raise RuntimeError("Could not calculate bounding box from local files. Check the errors above to see why rasterio rejected the files!") + + return [min_s, max_n, min_w, max_e], target_crs_wkt def run_pipeline(config: PipelineConfig) -> Path | None: @@ -700,9 +714,16 @@ def run_mosaic_only(input_dir: Path, output_dir: Path, bbox: Sequence[float] | s def run_slope_filter_only(local_dir: Path, slope_threshold: float, output_dir: Path) -> Path | None: """ Run a standalone pipeline to generate a slope mask and apply it to all valid rasters in the local directory. + + Args: + local_dir (Path): Directory containing the raw OPERA GeoTIFFs to process. + slope_threshold (float): Slope angle threshold in degrees for masking. + output_dir (Path): Directory to save the slope-filtered outputs. + Returns: + Path | None: The output directory containing slope-filtered rasters, or None if the process failed. """ from .io import scan_local_directory, ensure_directory - from .mosaic import get_local_spatial_properties, get_master_grid_props + from .mosaic import get_master_grid_props from .filters import process_dem_and_slope, apply_slope_mask_to_raster import pyproj import logging @@ -710,20 +731,41 @@ def run_slope_filter_only(local_dir: Path, slope_threshold: float, output_dir: P logger = logging.getLogger(__name__) logger.info(f"[Pipeline] Running standalone SLOPE pipeline using data from: {local_dir}") - + + logger.info("[Pipeline] Authenticating with Earthdata...") + authenticate() + + # Gather metadata about the local OPERA files to calculate spatial properties and find valid rasters to process df_opera = scan_local_directory(local_dir) - if df_opera.empty and not list(local_dir.glob("*.tif")): - logger.error("[Pipeline] No valid TIF data found in the local directory.") + + # Find all data files to mask (ignore DEMs, base slope outputs, and already-filtered files) + ignore_list = ['_B10_DEM', 'dem.tif', 'slope.tif', '_slope_filtered'] + tifs_to_process = [ + f for f in local_dir.glob("*.tif") + if not any(ignore_str in f.name for ignore_str in ignore_list) + ] + + if not tifs_to_process: + logger.error("[Pipeline] No valid data TIFs found in the local directory to process.") return None ensure_directory(output_dir) + + # Gather filepaths into a DataFrame for spatial calculations + df_spatial = pd.DataFrame({'Filepath': [str(p) for p in tifs_to_process]}) # Calculate Master Grid for the Slope Generation - auto_bbox, target_crs_proj4 = get_local_spatial_properties(df_opera) + auto_bbox, target_crs_proj4 = get_local_spatial_properties(df_spatial) crs_obj = pyproj.CRS.from_string(target_crs_proj4) target_res = 0.0002695 if crs_obj.is_geographic else 30 master_grid = get_master_grid_props(auto_bbox, target_crs_proj4, target_res=target_res) + # If the dir is empty or only contains mosaics, seed it with spatial metadata + if df_opera.empty: + df_opera = df_spatial.copy() + df_opera['Dataset'] = 'CUSTOM_MOSAIC' + df_opera['Download URL WTR'] = None + # Generate the master dem.tif and slope.tif mask = process_dem_and_slope( df=df_opera, @@ -738,21 +780,11 @@ def run_slope_filter_only(local_dir: Path, slope_threshold: float, output_dir: P logger.error("[Pipeline] Failed to generate base slope mask. Cannot proceed with filtering.") return None - # Find all data files to mask (ignore DEMs, base slope outputs, and already-filtered files) - ignore_list = ['_B10_DEM', 'dem.tif', 'slope.tif', '_slope_filtered'] - tifs_to_process = [ - f for f in local_dir.glob("*.tif") - if not any(ignore_str in f.name for ignore_str in ignore_list) - ] - - if not tifs_to_process: - logger.info("[Pipeline] Slope mask generated, but found no valid data TIFs to filter in the local directory.") - return output_dir - # Apply the mask to every relevant file logger.info(f"[Pipeline] Applying {slope_threshold}° slope filter to {len(tifs_to_process)} rasters...") for tif_path in tifs_to_process: + # Create output filename clearly linked to the input out_name = f"{tif_path.stem}_{int(slope_threshold)}deg_slope_filtered.tif" out_path = output_dir / out_name From bd002a493f0ff2cb6ab1feb1f4ed57ce394ee6c2 Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 14:03:32 -0700 Subject: [PATCH 7/9] Update earthdata query for DEM if not present during local slope filtering --- src/disasters/catalog.py | 14 ++++++++---- src/disasters/filters.py | 45 +++++++++++++++++++++++---------------- src/disasters/pipeline.py | 2 +- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/disasters/catalog.py b/src/disasters/catalog.py index 69f4994..48e488d 100644 --- a/src/disasters/catalog.py +++ b/src/disasters/catalog.py @@ -88,16 +88,22 @@ def fetch_missing_dems(bbox: list, local_dir: Path) -> None: """ import datetime import earthaccess + import logging logger.info("[DEM Fetcher] Missing local DEMs detected. Querying Earthdata for static topography...") + try: - # We just need one recent static DEM. Look at the last 60 days to ensure coverage. - end_date = datetime.datetime.utcnow() + # Repackage our [S, N, W, E] bbox into Earthaccess format: (W, S, E, N) + s, n, w, e = bbox + cmr_bbox = (w, s, e, n) + + # Query Earthdata for recent DSWx-HLS granules covering the bbox (last 60 days) + end_date = datetime.datetime.now(datetime.timezone.utc) start_date = end_date - datetime.timedelta(days=60) results = earthaccess.search_data( short_name="OPERA_L3_DSWX-HLS_V1", - bounding_box=tuple(bbox), + bounding_box=cmr_bbox, temporal=(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")), count=20 # Grab enough to cover the bbox footprint ) @@ -106,7 +112,7 @@ def fetch_missing_dems(bbox: list, local_dir: Path) -> None: logger.warning("[DEM Fetcher] No recent DSWx-HLS granules found for this BBOX.") return - # Filter to ONLY get the _B10_DEM URLs (we don't want to download the water data) + # Filter to get only the _B10_DEM URLs dem_urls = [] for granule in results: for link in granule.data_links(): diff --git a/src/disasters/filters.py b/src/disasters/filters.py index 9505cd1..420a16e 100644 --- a/src/disasters/filters.py +++ b/src/disasters/filters.py @@ -255,7 +255,7 @@ def process_dem_and_slope(df: pd.DataFrame, master_grid: dict, threshold: float, """ from .catalog import fetch_missing_dems from .io import scan_local_directory - from .mosaic import get_local_spatial_properties + from .pipeline import get_local_spatial_properties logger.info(f"[Filters] Processing DEM and generating slope mask (> {threshold} degrees)...") @@ -277,29 +277,38 @@ def process_dem_and_slope(df: pd.DataFrame, master_grid: dict, threshold: float, logger.warning(f"Failed to read existing slope mask: {e}. Proceeding to recompute...") # Check if we have DEM files locally or DSWx-HLS cloud links - has_dem_files = any('_B10_DEM' in str(path) for path in df['Filepath'].dropna() if pd.notna(path)) - has_dswx_cloud = not df[df['Dataset'] == 'OPERA_L3_DSWX-HLS_V1'].empty - - # If there are no local DEMs and no cloud DSWx-HLS links, download them + all_paths = [] + if 'Filepath' in df.columns: + all_paths.extend(df['Filepath'].dropna().astype(str).tolist()) + for col in df.columns: + if str(col).startswith('Download URL'): + all_paths.extend(df[col].dropna().astype(str).tolist()) + + has_dem_files = any('_B10_DEM' in p for p in all_paths) + has_dswx_cloud = 'Dataset' in df.columns and df['Dataset'].str.contains('DSWx-HLS', na=False).any() + + # If there are no local DEMs and no cloud DSWx links, download them if not has_dem_files and not has_dswx_cloud: - logger.info("[Filters] No DEM data found in DataFrame. Initiating dynamic DEM fetcher...") + logger.info("[Filters] No DEM data found. Initiating dynamic DEM fetcher...") - # Get bbox from the existing local files - auto_bbox, _ = get_local_spatial_properties(df) + from .pipeline import get_local_spatial_properties + from .catalog import fetch_missing_dems - # Determine the local directory from the first available filepath - first_valid_file = df['Filepath'].dropna().iloc[0] - local_dir = Path(first_valid_file).parent + auto_bbox, _ = get_local_spatial_properties(df) + local_dir = Path(all_paths[0]).parent - # Fetch the DEMs fetch_missing_dems(auto_bbox, local_dir) - # Rescan the directory so our DataFrame now includes the newly downloaded B10_DEM files - df = scan_local_directory(local_dir) - - # Construct Band 10 DEM URLs - dem_urls = [] - explicit_dems = df[df['Filepath'].str.contains('_B10_DEM', na=False, case=False)]['Filepath'].tolist() + # Inject newly downloaded DEMs into our path list + new_dems = [str(p) for p in local_dir.glob("*_B10_DEM*.tif")] + if new_dems: + all_paths.extend(new_dems) + else: + logger.warning("[Filters] Earthdata fetch completed, but no DEMs were found on disk.") + return None + + # Gather the final list of DEM paths for GDAL processing + explicit_dems = [p for p in all_paths if '_B10_DEM' in p] if explicit_dems: dem_urls = explicit_dems diff --git a/src/disasters/pipeline.py b/src/disasters/pipeline.py index 89af3b1..082a953 100644 --- a/src/disasters/pipeline.py +++ b/src/disasters/pipeline.py @@ -791,7 +791,7 @@ def run_slope_filter_only(local_dir: Path, slope_threshold: float, output_dir: P logger.info(f" -> Filtering {tif_path.name} to {out_name}...") apply_slope_mask_to_raster(tif_path, slope_tif_path, slope_threshold, out_path) - logger.info("[Pipeline] Standalone slope filtering complete!") + logger.info("[Pipeline] Slope filtering complete.") return output_dir From cf78e6d7c2ecb6a2fdfda7186427cc95df5acaa3 Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 14:23:32 -0700 Subject: [PATCH 8/9] Add check to ensure full DEM coverage --- src/disasters/filters.py | 46 +++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/src/disasters/filters.py b/src/disasters/filters.py index 420a16e..a9467f3 100644 --- a/src/disasters/filters.py +++ b/src/disasters/filters.py @@ -284,32 +284,34 @@ def process_dem_and_slope(df: pd.DataFrame, master_grid: dict, threshold: float, if str(col).startswith('Download URL'): all_paths.extend(df[col].dropna().astype(str).tolist()) - has_dem_files = any('_B10_DEM' in p for p in all_paths) has_dswx_cloud = 'Dataset' in df.columns and df['Dataset'].str.contains('DSWx-HLS', na=False).any() - # If there are no local DEMs and no cloud DSWx links, download them - if not has_dem_files and not has_dswx_cloud: - logger.info("[Filters] No DEM data found. Initiating dynamic DEM fetcher...") - - from .pipeline import get_local_spatial_properties - from .catalog import fetch_missing_dems - - auto_bbox, _ = get_local_spatial_properties(df) - local_dir = Path(all_paths[0]).parent - - fetch_missing_dems(auto_bbox, local_dir) - - # Inject newly downloaded DEMs into our path list - new_dems = [str(p) for p in local_dir.glob("*_B10_DEM*.tif")] - if new_dems: - all_paths.extend(new_dems) - else: - logger.warning("[Filters] Earthdata fetch completed, but no DEMs were found on disk.") - return None + # If there are no (or partial number of) local DEMs and no cloud DSWx links, download them + if not has_dswx_cloud: + logger.info("[Filters] Verifying continuous DEM coverage for spatial footprint...") - # Gather the final list of DEM paths for GDAL processing + from .pipeline import get_local_spatial_properties + from .catalog import fetch_missing_dems + + auto_bbox, _ = get_local_spatial_properties(df) + local_dir = Path(all_paths[0]).parent + + # Query CMR for any missing DEMs that intersect our footprint and download them to the local directory + fetch_missing_dems(auto_bbox, local_dir) + + # Inject all local DEMs (existing + newly downloaded) into our path list + new_dems = [str(p) for p in local_dir.glob("*_B10_DEM*.tif")] + if not new_dems: + logger.warning("[Filters] Earthdata fetch completed, but no DEMs were found on disk.") + return None + + for dem_path in new_dems: + if dem_path not in all_paths: + all_paths.append(dem_path) + + # Gather the complete list of DEM paths for GDAL processing explicit_dems = [p for p in all_paths if '_B10_DEM' in p] - + if explicit_dems: dem_urls = explicit_dems else: From f4c2cf926bada3670e2aa0e83336647520027289 Mon Sep 17 00:00:00 2001 From: cmspeed Date: Thu, 23 Apr 2026 14:52:15 -0700 Subject: [PATCH 9/9] Update environment.yml --- environment.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/environment.yml b/environment.yml index 5646adc..56bbf04 100644 --- a/environment.yml +++ b/environment.yml @@ -11,6 +11,7 @@ dependencies: - boto3 - click - earthaccess + - fiona - geopandas - jupyterlab - leafmap