Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions alert_system/etl/gdacs_cyclone/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# NOTE: Store Config files here. Might need to refactor if source supports filtering with hazards.
from alert_system.etl.base.config import ExtractionConfig

gdacs_cyclone_config: ExtractionConfig = {
"event_collection_type": "gdacs-events",
"hazard_collection_type": "gdacs-hazards",
"impact_collection_type": "gdacs-impacts",
"filter_event": {"hazard_codes": ["MH0309", "TC", "nat-met-sto-tro"]},
"filter_hazard": None,
"filter_impact": None,
"people_exposed_threshold": 500,
"forecasted_data": False,
}
15 changes: 15 additions & 0 deletions alert_system/etl/gdacs_cyclone/extraction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import logging

from alert_system.etl.base.extraction import BaseExtractionClass

from .config import gdacs_cyclone_config
from .loader import GdacsCycloneLoader
from .transform import GdacsCycloneTransformer

logger = logging.getLogger(__name__)


class GdacsCycloneExtraction(BaseExtractionClass):
config = gdacs_cyclone_config
transformer_class = GdacsCycloneTransformer
loader_class = GdacsCycloneLoader
12 changes: 12 additions & 0 deletions alert_system/etl/gdacs_cyclone/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from alert_system.etl.base.loader import BaseLoaderClass

from .config import gdacs_cyclone_config


class GdacsCycloneLoader(BaseLoaderClass):

def filter_eligible_items(self, load_obj):
people_exposed = load_obj.get("people_exposed")
if people_exposed is None:
return False
return people_exposed > gdacs_cyclone_config["people_exposed_threshold"]
80 changes: 80 additions & 0 deletions alert_system/etl/gdacs_cyclone/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import logging
from typing import Optional

from alert_system.etl.base.transform import BaseTransformerClass
from alert_system.models import ImpactDetailsEnum

logger = logging.getLogger(__name__)


class GdacsCycloneTransformer(BaseTransformerClass):
"""
Transformer for GDACS STAC impacts.
Extracts and normalizes impact fields, computes derived values, and stores metadata.
"""

def compute_people_exposed(self, metadata_list) -> Optional[int]:
for data in metadata_list:
if data["category"] == ImpactDetailsEnum.Category.PEOPLE and data["type"] == ImpactDetailsEnum.Type.AFFECTED_TOTAL:
return data["value"]
return None

def compute_buildings_exposed(self, metadata_list) -> Optional[int]:
"""
Compute the 'buildings_exposed' field.
"""
for data in metadata_list:
if data["category"] == ImpactDetailsEnum.Category.BUILDINGS and data["type"] == ImpactDetailsEnum.Type.AFFECTED_TOTAL:
return data["value"]
return None

def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType:
metadata = []
for item in impact_items:
properties = item.resp_data.get("properties", {})
impact_detail = properties.get("monty:impact_detail", {})
category = impact_detail.get("category")
type_ = impact_detail.get("type")
value = impact_detail.get("value")
if category and type_:
metadata = [
{
"category": category,
"type": type_,
"value": value,
"unit": impact_detail.get("unit", ""),
"estimate_type": impact_detail.get("estimate_type", ""),
}
]
return {
"people_exposed": self.compute_people_exposed(metadata),
"buildings_exposed": self.compute_buildings_exposed(metadata),
"impact_metadata": metadata,
}

def process_hazard(self, hazard_item) -> BaseTransformerClass.HazardType:
if not hazard_item:
return {
"severity_unit": "",
"severity_label": "",
"severity_value": None,
}

properties = hazard_item.resp_data.get("properties", {})
detail = properties.get("monty:hazard_detail", {})

return {
"severity_unit": detail.get("severity_unit", ""),
"severity_label": detail.get("severity_label", ""),
"severity_value": detail.get("severity_value"),
}

def process_event(self, event_item) -> BaseTransformerClass.EventType:
properties = event_item.resp_data.get("properties", {})
return {
"title": properties.get("title", ""),
"description": properties.get("description", ""),
"country": properties.get("monty:country_codes", ""),
"start_datetime": properties.get("start_datetime"),
"end_datetime": properties.get("end_datetime"),
}
1 change: 1 addition & 0 deletions alert_system/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class ConnectorType(models.IntegerChoices):
PROCESSOR_REGISTRY = {
ConnectorType.GDACS_FLOOD: "alert_system.etl.gdacs_flood.extraction.GdacsFloodExtraction",
ConnectorType.USGS_EARTHQUAKE: "alert_system.etl.usgs_earthquake.extraction.USGSEarthquakeExtraction",
ConnectorType.GDACS_CYCLONE: "alert_system.etl.gdacs_cyclone.extraction.GdacsCycloneExtraction",
} # Add all the extraction classes here

class Status(models.IntegerChoices):
Expand Down
Loading