diff --git a/environment.yml b/environment.yml index 5646adc..56bbf04 100644 --- a/environment.yml +++ b/environment.yml @@ -11,6 +11,7 @@ dependencies: - boto3 - click - earthaccess + - fiona - geopandas - jupyterlab - leafmap diff --git a/src/disasters/catalog.py b/src/disasters/catalog.py index 18007fe..48e488d 100644 --- a/src/disasters/catalog.py +++ b/src/disasters/catalog.py @@ -81,6 +81,59 @@ def read_opera_metadata(output_dir: Path) -> pd.DataFrame: return df +def fetch_missing_dems(bbox: list, local_dir: Path) -> None: + """ + Queries Earthdata for recent DSWx-HLS granules covering the bbox + and downloads ONLY their _B10_DEM.tif files to the local directory. + """ + import datetime + import earthaccess + import logging + + logger.info("[DEM Fetcher] Missing local DEMs detected. Querying Earthdata for static topography...") + + try: + # Repackage our [S, N, W, E] bbox into Earthaccess format: (W, S, E, N) + s, n, w, e = bbox + cmr_bbox = (w, s, e, n) + + # Query Earthdata for recent DSWx-HLS granules covering the bbox (last 60 days) + end_date = datetime.datetime.now(datetime.timezone.utc) + start_date = end_date - datetime.timedelta(days=60) + + results = earthaccess.search_data( + short_name="OPERA_L3_DSWX-HLS_V1", + bounding_box=cmr_bbox, + temporal=(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")), + count=20 # Grab enough to cover the bbox footprint + ) + + if not results: + logger.warning("[DEM Fetcher] No recent DSWx-HLS granules found for this BBOX.") + return + + # Filter to get only the _B10_DEM URLs + dem_urls = [] + for granule in results: + for link in granule.data_links(): + if "_B10_DEM.tif" in link: + dem_urls.append(link) + + if not dem_urls: + logger.warning("[DEM Fetcher] Found granules, but no _B10_DEM.tif links.") + return + + # Remove duplicates + dem_urls = list(set(dem_urls)) + + logger.info(f"[DEM Fetcher] Downloading {len(dem_urls)} DEM layers to {local_dir}...") + earthaccess.download(dem_urls, local_path=str(local_dir)) + logger.info("[DEM Fetcher] Topography download complete.") + + except Exception as e: + logger.error(f"[DEM Fetcher] Failed to fetch missing DEMs: {e}") + + def cluster_by_time(df: pd.DataFrame, time_col: str = "Start Time", threshold_minutes: int = 120) -> list: """ Groups dataframe rows by time clustering to separate passes (e.g. Ascending vs Descending). diff --git a/src/disasters/cli.py b/src/disasters/cli.py index 977b5a6..4005d5d 100644 --- a/src/disasters/cli.py +++ b/src/disasters/cli.py @@ -34,18 +34,11 @@ def cli() -> None: @cli.command(name="run") @click.option( - "-b", - "--bbox", + "-b", + "--bbox", type=str, - required=True, - help=( - "Bounding box or area of interest. MUST be enclosed in double quotes if it contains spaces. " - "Accepted formats: " - "1) 4 floats: \"S N W E\" | " - "2) WKT string: \"POLYGON((...))\" | " - "3) Local path: \"/path/to/file.kml\" | " - "4) Web URL: \"https://example.com/AOI.geojson\"" - ), + required=True, + help="Bounding box (4 coords, WKT, or path to KML/GeoJSON). Defines the search and processing AOI." ) @click.option( "-zb", @@ -207,33 +200,30 @@ def run( skip_existing: bool ) -> None: """Run the disaster pipeline (end-to-end).""" + + from .io import parse_bbox_input + from .pipeline import run_pipeline + import logging + + logger = logging.getLogger(__name__) + # Ensure slope values are between 0 and 100 degrees, if provided if slope_threshold is not None and not (0 <= slope_threshold <= 100): raise click.BadParameter("Slope threshold must be between 0 and 100.", param_hint="--slope-threshold") - # Process bbox tokens into a list of floats OR a single WKT/path string - bbox_parts = bbox.replace(",", " ").split() - - if len(bbox_parts) == 4: - try: - bbox_arg = [float(x) for x in bbox_parts] - except ValueError: - bbox_arg = bbox - else: - # Keep as WKT string or file path - bbox_arg = bbox + # Parse bbox input + try: + bbox_arg = parse_bbox_input(bbox) + except Exception as e: + raise click.BadParameter(f"Failed to parse bounding box: {e}", param_hint="--bbox") - # Process zoom_bbox if provided + # Parse zoom_bbox input, if provided zoom_bbox_arg = None if zoom_bbox is not None: - zoom_parts = zoom_bbox.replace(",", " ").split() - if len(zoom_parts) == 4: - try: - zoom_bbox_arg = [float(x) for x in zoom_parts] - except ValueError: - raise click.BadParameter("Zoom bounding box must contain exactly 4 valid numbers.", param_hint="--zoom-bbox") - else: - raise click.BadParameter("Zoom bounding box must contain exactly 4 valid numbers.", param_hint="--zoom-bbox") + try: + zoom_bbox_arg = parse_bbox_input(zoom_bbox) + except Exception as e: + raise click.BadParameter(f"Failed to parse zoom bounding box: {e}", param_hint="--zoom-bbox") # Build the PipelineConfig object cfg = PipelineConfig( @@ -439,11 +429,18 @@ def download( @cli.command(name="mosaic") @click.option( - "-i", - "--input-dir", + "-b", + "--bbox", + type=str, + required=False, + help="Bounding box (4 coords, WKT, or path to KML/GeoJSON). Limits mosaic extent.", +) +@click.option( + "-ld", + "--local-dir", type=click.Path(path_type=Path, file_okay=False, dir_okay=True, exists=True), required=True, - help="Path to a local directory containing pre-downloaded OPERA geotiffs.", + help="Path to a local directory containing pre-downloaded OPERA geotiffs. The mosaic will be built from these files." ) @click.option( "-o", @@ -452,18 +449,6 @@ def download( required=True, help="Directory where the stitched GeoTIFF mosaics will be saved.", ) -@click.option( - "-b", - "--bbox", - type=str, - required=False, - default=None, - help=( - "Optional bounding box to crop the output. If omitted, the pipeline computes the geographic union of all inputs. " - "MUST be enclosed in double quotes if it contains spaces. " - "Accepted formats: \"S N W E\" | \"POLYGON((...))\" | \"/path/to/file.kml\"" - ), -) @click.option( "--benchmark", is_flag=True, @@ -472,38 +457,28 @@ def download( ) def mosaic( - input_dir: Path, - output_dir: Path, bbox: Optional[str], + local_dir: Path, + output_dir: Path, benchmark: bool ) -> None: """Stitch local OPERA granules into analysis-ready mosaics (No analysis/layouts).""" - - # Process optional bbox using the same parsing as the run command - bbox_arg = None - if bbox is not None: - bbox_parts = bbox.replace(",", " ").split() - if len(bbox_parts) == 4: - try: - coords = [float(x) for x in bbox_parts] - # Auto-swap S/N if flipped - if coords[0] > coords[1]: coords[0], coords[1] = coords[1], coords[0] - # Auto-swap W/E if flipped - if coords[2] > coords[3]: coords[2], coords[3] = coords[3], coords[2] - bbox_arg = coords - except ValueError: - bbox_arg = bbox - else: - bbox_arg = bbox - - # Import the dedicated mosaic pipeline (we will build this next) + from .io import parse_bbox_input from .pipeline import run_mosaic_only + + # Parse the input string into the [S, N, W, E] list + parsed_bbox = None + if bbox: + try: + parsed_bbox = parse_bbox_input(bbox) + except Exception as e: + raise click.BadParameter(f"Failed to parse bounding box: {e}", param_hint="--bbox") logger.info("Starting mosaic pipeline...") output_path = run_mosaic_only( - input_dir=input_dir, + input_dir=local_dir, output_dir=output_dir, - bbox=bbox_arg, + bbox=parsed_bbox, benchmark=benchmark ) @@ -512,5 +487,49 @@ def mosaic( else: logger.warning("Mosaic pipeline exited without producing outputs.") + +@cli.command(name="slope-filter") +@click.option( + "-ld", + "--local-dir", + type=click.Path(path_type=Path, file_okay=False, dir_okay=True, exists=True), + required=True, + help="Path to a local directory containing pre-downloaded OPERA geotiffs.", +) +@click.option( + "-st", + "--slope-threshold", + type=float, + required=True, + help="Slope threshold in degrees (0-100) to define the resulting mask.", +) +@click.option( + "-o", + "--output-dir", + type=click.Path(path_type=Path, file_okay=False, dir_okay=True), + required=True, + help="Directory where the generated dem.tif and slope.tif will be saved.", +) +def slope_filter(local_dir: Path, slope_threshold: float, output_dir: Path) -> None: + """Generate a standalone DEM and slope mask from local OPERA products.""" + + if not (0 <= slope_threshold <= 100): + raise click.BadParameter("Slope threshold must be between 0 and 100.", param_hint="--slope-threshold") + + from .pipeline import run_slope_filter_only + + logger.info(f"Starting standalone slope filter pipeline for threshold > {slope_threshold} degrees...") + out_dir = run_slope_filter_only( + local_dir=local_dir, + slope_threshold=slope_threshold, + output_dir=output_dir + ) + + if out_dir: + logger.info(f"Slope generation complete. Files saved to: {out_dir}") + else: + logger.warning("Slope pipeline exited without producing outputs.") + + if __name__ == "__main__": cli() \ No newline at end of file diff --git a/src/disasters/filters.py b/src/disasters/filters.py index dcad4b4..a9467f3 100644 --- a/src/disasters/filters.py +++ b/src/disasters/filters.py @@ -237,12 +237,11 @@ def generate_coastal_mask(bbox: list, master_grid: dict) -> Optional[xr.DataArra return None -def process_dem_and_slope( - df: pd.DataFrame, master_grid: dict, threshold: float, output_dir: Path, skip_existing: bool = False -) -> Optional[np.ndarray]: +def process_dem_and_slope(df: pd.DataFrame, master_grid: dict, threshold: float, output_dir: Path, skip_existing: bool = True): """ - Fetches all DSWx-HLS Band 10 URLs and mosaics them into 'dem.tif' saved at output_dir. - Calculates slope and returns a boolean mask (True where slope < threshold). + Fetches all DSWx-HLS Band 10 URLs (or downloads them if missing) and mosaics them + into 'dem.tif' saved at output_dir. Calculates slope and returns a boolean mask + (True where slope < threshold). Args: df (pd.DataFrame): DataFrame containing OPERA products metadata. @@ -254,8 +253,12 @@ def process_dem_and_slope( Returns: np.ndarray: Mask indicating areas where slope is below threshold. """ - logger.info(f"Processing DEM and Slope Mask (Threshold: {threshold} deg)...") + from .catalog import fetch_missing_dems + from .io import scan_local_directory + from .pipeline import get_local_spatial_properties + logger.info(f"[Filters] Processing DEM and generating slope mask (> {threshold} degrees)...") + dem_output_path = output_dir / "dem.tif" slope_output_path = output_dir / "slope.tif" @@ -273,30 +276,60 @@ def process_dem_and_slope( except Exception as e: logger.warning(f"Failed to read existing slope mask: {e}. Proceeding to recompute...") - # Filter for ALL DSWx-HLS products to get DEMs - dswx_rows = df[df['Dataset'] == 'OPERA_L3_DSWX-HLS_V1'] - - # Check if any DSWx-HLS products are available to generate the DEM mosaic. - if dswx_rows.empty: - logger.warning("No DSWx-HLS products found. Cannot generate DEM or slope mask.") - return None + # Check if we have DEM files locally or DSWx-HLS cloud links + all_paths = [] + if 'Filepath' in df.columns: + all_paths.extend(df['Filepath'].dropna().astype(str).tolist()) + for col in df.columns: + if str(col).startswith('Download URL'): + all_paths.extend(df[col].dropna().astype(str).tolist()) - # Construct Band 10 DEM URLs - dem_urls = [] - # Drop duplicates to avoid downloading/warping the same granule twice - for url in dswx_rows['Download URL WTR'].dropna().unique(): - if '_B01_WTR' in url: - # Replace WTR with DEM band - dem_url = url.replace('_B01_WTR', '_B10_DEM') + has_dswx_cloud = 'Dataset' in df.columns and df['Dataset'].str.contains('DSWx-HLS', na=False).any() + + # If there are no (or partial number of) local DEMs and no cloud DSWx links, download them + if not has_dswx_cloud: + logger.info("[Filters] Verifying continuous DEM coverage for spatial footprint...") + + from .pipeline import get_local_spatial_properties + from .catalog import fetch_missing_dems + + auto_bbox, _ = get_local_spatial_properties(df) + local_dir = Path(all_paths[0]).parent - # Prefix for GDAL vsicurl - if dem_url.startswith('http') and not dem_url.startswith('/vsi'): - dem_urls.append(f'/vsicurl/{dem_url}') - else: - dem_urls.append(dem_url) + # Query CMR for any missing DEMs that intersect our footprint and download them to the local directory + fetch_missing_dems(auto_bbox, local_dir) + + # Inject all local DEMs (existing + newly downloaded) into our path list + new_dems = [str(p) for p in local_dir.glob("*_B10_DEM*.tif")] + if not new_dems: + logger.warning("[Filters] Earthdata fetch completed, but no DEMs were found on disk.") + return None + + for dem_path in new_dems: + if dem_path not in all_paths: + all_paths.append(dem_path) + + # Gather the complete list of DEM paths for GDAL processing + explicit_dems = [p for p in all_paths if '_B10_DEM' in p] + + if explicit_dems: + dem_urls = explicit_dems + else: + # Fallback for cloud mode: deduce DEM URLs from DSWx WTR URLs + dswx_rows = df[df['Dataset'] == 'OPERA_L3_DSWX-HLS_V1'] + for url in dswx_rows['Download URL WTR'].dropna().unique(): + if '_B01_WTR' in url: + if url.startswith('http') and not url.startswith('/vsi'): + dem_url = url.replace('_B01_WTR', '_B10_DEM') + dem_urls.append(f'/vsicurl/{dem_url}') + else: + local_dem_path = url.replace('_B01_WTR', '_B10_DEM') + dem_urls.append(local_dem_path) + + dem_urls = list(set(dem_urls)) if not dem_urls: - logger.warning("Could not construct Band 10 URLs.") + logger.warning("[Filters] Failed to identify or fetch any DEM URLs. Skipping slope masking.") return None # Extract Master Grid Properties @@ -355,4 +388,60 @@ def process_dem_and_slope( except Exception as e: logger.warning(f"Slope processing failed: {e}") - return None \ No newline at end of file + return None + + +def apply_slope_mask_to_raster(target_tif: Path, slope_tif: Path, threshold: float, output_tif: Path): + """ + Dynamically reprojects the slope mask to match the target raster's grid, + applies the mask, and saves the filtered output. + """ + import rasterio + import numpy as np + from rasterio.warp import reproject, Resampling + import logging + + logger = logging.getLogger(__name__) + + try: + with rasterio.open(target_tif) as src: + target_meta = src.meta.copy() + target_arr = src.read(1) + target_crs = src.crs + target_transform = src.transform + nodata_val = src.nodata if src.nodata is not None else -9999 + + with rasterio.open(slope_tif) as slope_src: + # Create an empty array matching the target raster's exact shape + aligned_slope_arr = np.empty_like(target_arr, dtype=np.float32) + + # Reproject the slope data dynamically into the target's grid space + reproject( + source=rasterio.band(slope_src, 1), + destination=aligned_slope_arr, + src_transform=slope_src.transform, + src_crs=slope_src.crs, + dst_transform=target_transform, + dst_crs=target_crs, + resampling=Resampling.bilinear + ) + + # Apply the mask: If slope >= threshold OR slope is nodata, set target pixel to nodata + filtered_arr = np.where( + (aligned_slope_arr >= threshold) | (aligned_slope_arr == -9999), + nodata_val, + target_arr + ) + + # Ensure the output meta has a defined nodata value and uses compression + target_meta.update({ + "driver": "COG", + "compress": "deflate", + "nodata": nodata_val + }) + + with rasterio.open(output_tif, 'w', **target_meta) as dst: + dst.write(filtered_arr, 1) + + except Exception as e: + logger.error(f"Failed to apply slope mask to {target_tif.name}: {e}") \ No newline at end of file diff --git a/src/disasters/io.py b/src/disasters/io.py index 42f6074..7cb04d6 100644 --- a/src/disasters/io.py +++ b/src/disasters/io.py @@ -10,6 +10,53 @@ logger = logging.getLogger(__name__) +def parse_bbox_input(bbox_string: str) -> list[float]: + """ + Parses a string input (KML, GeoJSON, WKT, or 4 coordinates) + into a standardized [South, North, West, East] bounding box list. + Args: + bbox_string (str): Input string representing a bounding box. + Can be a file path to KML/GeoJSON/SHP, a WKT string, or raw coordinates. + Returns: + list[float]: Bounding box in the format [South, North, West, East]. + """ + import os + import geopandas as gpd + from shapely import wkt + import logging + + logger = logging.getLogger(__name__) + + # Check if it's a geospatial file-type (KML, GeoJSON, SHP) + if os.path.isfile(bbox_string): + if bbox_string.lower().endswith('.kml'): + import fiona + # Enable KML driver for geopandas/fiona + fiona.drvsupport.supported_drivers['KML'] = 'rw' + fiona.drvsupport.supported_drivers['LIBKML'] = 'rw' + + logger.info(f"Extracting bounding box from file: {bbox_string}") + gdf = gpd.read_file(bbox_string) + # GeoPandas total_bounds returns [minx, miny, maxx, maxy] -> [West, South, East, North] + w, s, e, n = gdf.total_bounds + return [s, n, w, e] + + # Check if it's a WKT string + if bbox_string.upper().startswith(('POLYGON', 'MULTIPOLYGON', 'BBOX')): + logger.info("Extracting bounding box from WKT string...") + geom = wkt.loads(bbox_string) + w, s, e, n = geom.bounds + return [s, n, w, e] + + # Assume it's a raw coordinate string + logger.info("Parsing raw coordinates...") + coords = [float(x) for x in bbox_string.replace(',', ' ').split()] + if len(coords) != 4: + raise ValueError("Bounding box must be a valid file, WKT, or 4 space/comma separated coordinates.") + + return coords + + def ensure_directory(output_dir: Path) -> Path: """ Create the output directory if it does not exist. diff --git a/src/disasters/pipeline.py b/src/disasters/pipeline.py index d570d7a..082a953 100644 --- a/src/disasters/pipeline.py +++ b/src/disasters/pipeline.py @@ -77,52 +77,66 @@ class PipelineConfig: skip_existing: bool = False -def get_local_spatial_properties(df_opera: pd.DataFrame) -> tuple[list[float], str]: +def get_local_spatial_properties(df: pd.DataFrame) -> tuple[list[float], str]: """ Calculates the global bounding box [S, N, W, E] and most common CRS from a local DataFrame of OPERA products by reading their headers. + + Args: + df (pd.DataFrame): DataFrame containing metadata about local OPERA products, + including columns with file paths. + Returns: + tuple[list[float], str]: A tuple containing the bounding box as [S, N, W, E] and the most common CRS in PROJ4 string format. """ + logger = logging.getLogger(__name__) logger.info("Calculating spatial properties from local files...") - url_cols = [c for c in df_opera.columns if c.startswith("Download URL")] - all_files = [] - for c in url_cols: - all_files.extend(df_opera[c].dropna().tolist()) - all_files = list(set(all_files)) # Unique files only - - minx, miny, maxx, maxy = float('inf'), float('inf'), float('-inf'), float('-inf') - crs_counter = Counter() - - for f in all_files: + + min_s, max_n, min_w, max_e = 90.0, -90.0, 180.0, -180.0 + target_crs_wkt = None + valid_files = 0 + + for _, row in df.iterrows(): + # Try to get the explicit Filepath (used by standalone slope filter) + filepath = row.get('Filepath') + + # If no Filepath column, grab the first valid 'Download URL' column + if pd.isna(filepath): + for col in df.columns: + if str(col).startswith("Download URL") and pd.notna(row[col]): + filepath = row[col] + break + + # If we still don't have a file, skip to the next row + if pd.isna(filepath): + continue + try: - with rasterio.open(f) as src: - bounds = src.bounds - crs = src.crs - - if crs is not None: - crs_counter[crs.to_string()] += 1 + # Extract CRS and bounds using rasterio + with rasterio.open(filepath) as src: + if src.crs is None: + logger.warning(f"[Spatial Calc] File {filepath} has no CRS metadata! Skipping...") + continue - # Transform to EPSG:4326 to match S, N, W, E expected format - if crs and crs.to_string() != "EPSG:4326": - left, bottom, right, top = transform_bounds(crs, "EPSG:4326", *bounds) - else: - left, bottom, right, top = bounds + if not target_crs_wkt: + target_crs_wkt = src.crs.to_wkt() + + # rasterio bounds are (west, south, east, north) + bounds = transform_bounds(src.crs, "EPSG:4326", *src.bounds) + + min_w = min(min_w, bounds[0]) + min_s = min(min_s, bounds[1]) + max_e = max(max_e, bounds[2]) + max_n = max(max_n, bounds[3]) + + valid_files += 1 - minx = min(minx, left) - miny = min(miny, bottom) - maxx = max(maxx, right) - maxy = max(maxy, top) except Exception as e: - logger.warning(f"Could not read spatial properties from {f}: {e}") - - if minx == float('inf'): - raise RuntimeError("Could not calculate bounding box from local files.") - - most_common_crs = crs_counter.most_common(1)[0][0] - - logger.info(f"Local Master CRS determined: {most_common_crs}") - - # Return [S, N, W, E] and the CRS - return [miny, maxy, minx, maxx], most_common_crs + logger.error(f"[Spatial Calc] Failed to read {filepath}. Reason: {e}") + + if valid_files == 0: + raise RuntimeError("Could not calculate bounding box from local files. Check the errors above to see why rasterio rejected the files!") + + return [min_s, max_n, min_w, max_e], target_crs_wkt def run_pipeline(config: PipelineConfig) -> Path | None: @@ -697,6 +711,90 @@ def run_mosaic_only(input_dir: Path, output_dir: Path, bbox: Sequence[float] | s return output_dir +def run_slope_filter_only(local_dir: Path, slope_threshold: float, output_dir: Path) -> Path | None: + """ + Run a standalone pipeline to generate a slope mask and apply it to all valid rasters in the local directory. + + Args: + local_dir (Path): Directory containing the raw OPERA GeoTIFFs to process. + slope_threshold (float): Slope angle threshold in degrees for masking. + output_dir (Path): Directory to save the slope-filtered outputs. + Returns: + Path | None: The output directory containing slope-filtered rasters, or None if the process failed. + """ + from .io import scan_local_directory, ensure_directory + from .mosaic import get_master_grid_props + from .filters import process_dem_and_slope, apply_slope_mask_to_raster + import pyproj + import logging + + logger = logging.getLogger(__name__) + + logger.info(f"[Pipeline] Running standalone SLOPE pipeline using data from: {local_dir}") + + logger.info("[Pipeline] Authenticating with Earthdata...") + authenticate() + + # Gather metadata about the local OPERA files to calculate spatial properties and find valid rasters to process + df_opera = scan_local_directory(local_dir) + + # Find all data files to mask (ignore DEMs, base slope outputs, and already-filtered files) + ignore_list = ['_B10_DEM', 'dem.tif', 'slope.tif', '_slope_filtered'] + tifs_to_process = [ + f for f in local_dir.glob("*.tif") + if not any(ignore_str in f.name for ignore_str in ignore_list) + ] + + if not tifs_to_process: + logger.error("[Pipeline] No valid data TIFs found in the local directory to process.") + return None + + ensure_directory(output_dir) + + # Gather filepaths into a DataFrame for spatial calculations + df_spatial = pd.DataFrame({'Filepath': [str(p) for p in tifs_to_process]}) + + # Calculate Master Grid for the Slope Generation + auto_bbox, target_crs_proj4 = get_local_spatial_properties(df_spatial) + crs_obj = pyproj.CRS.from_string(target_crs_proj4) + target_res = 0.0002695 if crs_obj.is_geographic else 30 + master_grid = get_master_grid_props(auto_bbox, target_crs_proj4, target_res=target_res) + + # If the dir is empty or only contains mosaics, seed it with spatial metadata + if df_opera.empty: + df_opera = df_spatial.copy() + df_opera['Dataset'] = 'CUSTOM_MOSAIC' + df_opera['Download URL WTR'] = None + + # Generate the master dem.tif and slope.tif + mask = process_dem_and_slope( + df=df_opera, + master_grid=master_grid, + threshold=slope_threshold, + output_dir=output_dir, + skip_existing=False + ) + + slope_tif_path = output_dir / "slope.tif" + if mask is None or not slope_tif_path.exists(): + logger.error("[Pipeline] Failed to generate base slope mask. Cannot proceed with filtering.") + return None + + # Apply the mask to every relevant file + logger.info(f"[Pipeline] Applying {slope_threshold}° slope filter to {len(tifs_to_process)} rasters...") + + for tif_path in tifs_to_process: + # Create output filename clearly linked to the input + out_name = f"{tif_path.stem}_{int(slope_threshold)}deg_slope_filtered.tif" + out_path = output_dir / out_name + + logger.info(f" -> Filtering {tif_path.name} to {out_name}...") + apply_slope_mask_to_raster(tif_path, slope_tif_path, slope_threshold, out_path) + + logger.info("[Pipeline] Slope filtering complete.") + return output_dir + + def run_plotting_task( maps_dir, layouts_dir, mosaic_path, short_name, layer, date_id, layout_date, layout_title, bbox, zoom_bbox,