Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies:
- boto3
- click
- earthaccess
- fiona
- geopandas
- jupyterlab
- leafmap
Expand Down
53 changes: 53 additions & 0 deletions src/disasters/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,59 @@ def read_opera_metadata(output_dir: Path) -> pd.DataFrame:
return df


def fetch_missing_dems(bbox: list, local_dir: Path) -> None:
"""
Queries Earthdata for recent DSWx-HLS granules covering the bbox
and downloads ONLY their _B10_DEM.tif files to the local directory.
"""
import datetime
import earthaccess
import logging

logger.info("[DEM Fetcher] Missing local DEMs detected. Querying Earthdata for static topography...")

try:
# Repackage our [S, N, W, E] bbox into Earthaccess format: (W, S, E, N)
s, n, w, e = bbox
cmr_bbox = (w, s, e, n)

# Query Earthdata for recent DSWx-HLS granules covering the bbox (last 60 days)
end_date = datetime.datetime.now(datetime.timezone.utc)
start_date = end_date - datetime.timedelta(days=60)

results = earthaccess.search_data(
short_name="OPERA_L3_DSWX-HLS_V1",
bounding_box=cmr_bbox,
temporal=(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")),
count=20 # Grab enough to cover the bbox footprint
)

if not results:
logger.warning("[DEM Fetcher] No recent DSWx-HLS granules found for this BBOX.")
return

# Filter to get only the _B10_DEM URLs
dem_urls = []
for granule in results:
for link in granule.data_links():
if "_B10_DEM.tif" in link:
dem_urls.append(link)

if not dem_urls:
logger.warning("[DEM Fetcher] Found granules, but no _B10_DEM.tif links.")
return

# Remove duplicates
dem_urls = list(set(dem_urls))

logger.info(f"[DEM Fetcher] Downloading {len(dem_urls)} DEM layers to {local_dir}...")
earthaccess.download(dem_urls, local_path=str(local_dir))
logger.info("[DEM Fetcher] Topography download complete.")

except Exception as e:
logger.error(f"[DEM Fetcher] Failed to fetch missing DEMs: {e}")


def cluster_by_time(df: pd.DataFrame, time_col: str = "Start Time", threshold_minutes: int = 120) -> list:
"""
Groups dataframe rows by time clustering to separate passes (e.g. Ascending vs Descending).
Expand Down
157 changes: 88 additions & 69 deletions src/disasters/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,11 @@ def cli() -> None:

@cli.command(name="run")
@click.option(
"-b",
"--bbox",
"-b",
"--bbox",
type=str,
required=True,
help=(
"Bounding box or area of interest. MUST be enclosed in double quotes if it contains spaces. "
"Accepted formats: "
"1) 4 floats: \"S N W E\" | "
"2) WKT string: \"POLYGON((...))\" | "
"3) Local path: \"/path/to/file.kml\" | "
"4) Web URL: \"https://example.com/AOI.geojson\""
),
required=True,
help="Bounding box (4 coords, WKT, or path to KML/GeoJSON). Defines the search and processing AOI."
)
@click.option(
"-zb",
Expand Down Expand Up @@ -207,33 +200,30 @@ def run(
skip_existing: bool
) -> None:
"""Run the disaster pipeline (end-to-end)."""

from .io import parse_bbox_input
from .pipeline import run_pipeline
import logging

logger = logging.getLogger(__name__)

# Ensure slope values are between 0 and 100 degrees, if provided
if slope_threshold is not None and not (0 <= slope_threshold <= 100):
raise click.BadParameter("Slope threshold must be between 0 and 100.", param_hint="--slope-threshold")

# Process bbox tokens into a list of floats OR a single WKT/path string
bbox_parts = bbox.replace(",", " ").split()

if len(bbox_parts) == 4:
try:
bbox_arg = [float(x) for x in bbox_parts]
except ValueError:
bbox_arg = bbox
else:
# Keep as WKT string or file path
bbox_arg = bbox
# Parse bbox input
try:
bbox_arg = parse_bbox_input(bbox)
except Exception as e:
raise click.BadParameter(f"Failed to parse bounding box: {e}", param_hint="--bbox")

# Process zoom_bbox if provided
# Parse zoom_bbox input, if provided
zoom_bbox_arg = None
if zoom_bbox is not None:
zoom_parts = zoom_bbox.replace(",", " ").split()
if len(zoom_parts) == 4:
try:
zoom_bbox_arg = [float(x) for x in zoom_parts]
except ValueError:
raise click.BadParameter("Zoom bounding box must contain exactly 4 valid numbers.", param_hint="--zoom-bbox")
else:
raise click.BadParameter("Zoom bounding box must contain exactly 4 valid numbers.", param_hint="--zoom-bbox")
try:
zoom_bbox_arg = parse_bbox_input(zoom_bbox)
except Exception as e:
raise click.BadParameter(f"Failed to parse zoom bounding box: {e}", param_hint="--zoom-bbox")

# Build the PipelineConfig object
cfg = PipelineConfig(
Expand Down Expand Up @@ -439,11 +429,18 @@ def download(

@cli.command(name="mosaic")
@click.option(
"-i",
"--input-dir",
"-b",
"--bbox",
type=str,
required=False,
help="Bounding box (4 coords, WKT, or path to KML/GeoJSON). Limits mosaic extent.",
)
@click.option(
"-ld",
"--local-dir",
type=click.Path(path_type=Path, file_okay=False, dir_okay=True, exists=True),
required=True,
help="Path to a local directory containing pre-downloaded OPERA geotiffs.",
help="Path to a local directory containing pre-downloaded OPERA geotiffs. The mosaic will be built from these files."
)
@click.option(
"-o",
Expand All @@ -452,18 +449,6 @@ def download(
required=True,
help="Directory where the stitched GeoTIFF mosaics will be saved.",
)
@click.option(
"-b",
"--bbox",
type=str,
required=False,
default=None,
help=(
"Optional bounding box to crop the output. If omitted, the pipeline computes the geographic union of all inputs. "
"MUST be enclosed in double quotes if it contains spaces. "
"Accepted formats: \"S N W E\" | \"POLYGON((...))\" | \"/path/to/file.kml\""
),
)
@click.option(
"--benchmark",
is_flag=True,
Expand All @@ -472,38 +457,28 @@ def download(
)

def mosaic(
input_dir: Path,
output_dir: Path,
bbox: Optional[str],
local_dir: Path,
output_dir: Path,
benchmark: bool
) -> None:
"""Stitch local OPERA granules into analysis-ready mosaics (No analysis/layouts)."""

# Process optional bbox using the same parsing as the run command
bbox_arg = None
if bbox is not None:
bbox_parts = bbox.replace(",", " ").split()
if len(bbox_parts) == 4:
try:
coords = [float(x) for x in bbox_parts]
# Auto-swap S/N if flipped
if coords[0] > coords[1]: coords[0], coords[1] = coords[1], coords[0]
# Auto-swap W/E if flipped
if coords[2] > coords[3]: coords[2], coords[3] = coords[3], coords[2]
bbox_arg = coords
except ValueError:
bbox_arg = bbox
else:
bbox_arg = bbox

# Import the dedicated mosaic pipeline (we will build this next)
from .io import parse_bbox_input
from .pipeline import run_mosaic_only

# Parse the input string into the [S, N, W, E] list
parsed_bbox = None
if bbox:
try:
parsed_bbox = parse_bbox_input(bbox)
except Exception as e:
raise click.BadParameter(f"Failed to parse bounding box: {e}", param_hint="--bbox")

logger.info("Starting mosaic pipeline...")
output_path = run_mosaic_only(
input_dir=input_dir,
input_dir=local_dir,
output_dir=output_dir,
bbox=bbox_arg,
bbox=parsed_bbox,
benchmark=benchmark
)

Expand All @@ -512,5 +487,49 @@ def mosaic(
else:
logger.warning("Mosaic pipeline exited without producing outputs.")


@cli.command(name="slope-filter")
@click.option(
"-ld",
"--local-dir",
type=click.Path(path_type=Path, file_okay=False, dir_okay=True, exists=True),
required=True,
help="Path to a local directory containing pre-downloaded OPERA geotiffs.",
)
@click.option(
"-st",
"--slope-threshold",
type=float,
required=True,
help="Slope threshold in degrees (0-100) to define the resulting mask.",
)
@click.option(
"-o",
"--output-dir",
type=click.Path(path_type=Path, file_okay=False, dir_okay=True),
required=True,
help="Directory where the generated dem.tif and slope.tif will be saved.",
)
def slope_filter(local_dir: Path, slope_threshold: float, output_dir: Path) -> None:
"""Generate a standalone DEM and slope mask from local OPERA products."""

if not (0 <= slope_threshold <= 100):
raise click.BadParameter("Slope threshold must be between 0 and 100.", param_hint="--slope-threshold")

from .pipeline import run_slope_filter_only

logger.info(f"Starting standalone slope filter pipeline for threshold > {slope_threshold} degrees...")
out_dir = run_slope_filter_only(
local_dir=local_dir,
slope_threshold=slope_threshold,
output_dir=output_dir
)

if out_dir:
logger.info(f"Slope generation complete. Files saved to: {out_dir}")
else:
logger.warning("Slope pipeline exited without producing outputs.")


if __name__ == "__main__":
cli()
Loading