Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,5 @@ cython_debug/

# IDEs, Editors, etc...
.vscode/

mnemonic.txt
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,41 @@ for collection_id, info in datasets.items():

```

## Siren API usage

The Python client also exposes a Siren REST client for metrics and regions.

```python
from dclimate_client_py import (
dClimateClient,
SirenApiKeyAuth,
SirenMetricQuery,
SirenOptions,
)

async def main():
client = dClimateClient(
siren=SirenOptions(
auth=SirenApiKeyAuth() # reads SIREN_API_KEY and SIREN_ACCOUNT_ID from env
)
)

regions = await client.list_regions()
print(f"Loaded {len(regions)} regions")

data = await client.get_metric_data(
SirenMetricQuery(
region_id=regions[0].id,
metric="average_precip",
start_date="2025-01-01",
end_date="2025-01-31",
)
)
print(data[:3])
```

`x402` is included in the default install.

> More examples can be found at [dClimate Jupyter Notebooks](https://github.com/dClimate/jupyter-notebooks/tree/main/notebooks). To run your own IPFS gateway follow the instructions for [installing ipfs](https://docs.ipfs.tech/install/command-line/#install-official-binary-distributions). For additional assistance find us on [Discord](https://discord.com/invite/bYWVdNDMpe ), if you are an organization or business reach out to us at community at dclimate dot net.

## Create and activate a virtual environment:
Expand Down
35 changes: 35 additions & 0 deletions dclimate_client_py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,30 @@
CatalogCollection,
CatalogDataset,
DatasetVariantConfig,
SpatialExtent,
TemporalExtent,
)
from .stac_catalog import (
load_stac_catalog,
list_available_datasets,
)
from .siren import (
SirenClient,
SirenApiKeyAuth,
SirenX402Auth,
SirenOptions,
SirenMetricQuery,
SirenMetricDataPoint,
SirenRegion,
SirenRegionsResponse,
SirenCountry,
EvmSigner,
)
from .dclimate_zarr_errors import (
SirenApiError,
X402PaymentError,
X402NotInstalledError,
)

__all__ = [
"dClimateClient",
Expand All @@ -29,6 +48,22 @@
"CatalogCollection",
"CatalogDataset",
"DatasetVariantConfig",
"SpatialExtent",
"TemporalExtent",
"load_stac_catalog",
"list_available_datasets",
# Siren
"SirenClient",
"SirenApiKeyAuth",
"SirenX402Auth",
"SirenOptions",
"SirenMetricQuery",
"SirenMetricDataPoint",
"SirenRegion",
"SirenRegionsResponse",
"SirenCountry",
"EvmSigner",
"SirenApiError",
"X402PaymentError",
"X402NotInstalledError",
]
12 changes: 2 additions & 10 deletions dclimate_client_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,15 @@

import datetime
import typing
import xarray as xr

from .dclimate_zarr_errors import (
ConflictingGeoRequestError,
ConflictingAggregationRequestError,
InvalidExportFormatError,
InvalidSelectionError,
)
from .geotemporal_data import GeotemporalData, DEFAULT_POINT_LIMIT
from .s3_retrieval import get_dataset_from_s3
from .ipfs_retrieval import (
_get_dataset_by_ipfs_cid,
)
from .datasets import (
DatasetCatalog,
)
from .concatenate import concatenate_datasets


def load_s3(
dataset_name: str,
Expand Down Expand Up @@ -194,4 +186,4 @@ def geo_temporal_query(
if output_format == "netcdf":
return data.to_netcdf()
else: # "array"
return data.as_dict()
return data.as_dict()
6 changes: 2 additions & 4 deletions dclimate_client_py/concatenate.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ async def concatenate_datasets(
)

# Concatenate with combined dataset
new_combined = xr.concat(
combined = xr.concat(
[combined, sliced_next],
dim=dimension,
)
Expand All @@ -192,9 +192,7 @@ async def concatenate_datasets(
)

except NoDataFoundError as e:
logger.warning(
f"Skipping dataset {i} as it contains no new data: {e}"
)
logger.warning(f"Skipping dataset {i} as it contains no new data: {e}")
# Continue to next dataset
continue

Expand Down
42 changes: 30 additions & 12 deletions dclimate_client_py/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,53 @@
"""

import typing
from typing import TypedDict, List, Optional
from typing import TypedDict, List, Optional, Tuple
import logging
import requests

from .dclimate_zarr_errors import (
DatasetNotFoundError,
InvalidSelectionError,
VariantNotFoundError,
CollectionNotFoundError,
IpfsConnectionError,
)

logger = logging.getLogger(__name__)


# --- Type Definitions ---

hydrogen_endpoint = "https://dclimate-ceramic.duckdns.org/api/datasets";
hydrogen_endpoint = "https://dclimate-ceramic.duckdns.org/api/datasets"


class SpatialExtent(TypedDict):
"""Bounding box for a dataset's spatial coverage."""

bbox: Tuple[float, float, float, float] # [minLon, minLat, maxLon, maxLat]


class TemporalExtent(TypedDict):
"""Temporal range for a dataset's time coverage."""

start: Optional[str]
end: Optional[str]


class DatasetVariantConfig(TypedDict, total=False):
"""Configuration for a single dataset variant."""

variant: str
cid: Optional[str] # Direct IPFS CID
url: Optional[str] # API endpoint that returns CID (for future use)
concat_priority: Optional[int] # Lower number = higher priority for concatenation
concat_dimension: Optional[str] # Dimension to concatenate along (default: "time")
spatial_extent: Optional[SpatialExtent]
temporal_extent: Optional[TemporalExtent]


class CatalogDataset(TypedDict):
"""A dataset with its variants."""

dataset: str
variants: List[DatasetVariantConfig]


class CatalogCollection(TypedDict):
"""A collection of related datasets."""

collection: str
datasets: List[CatalogDataset]

Expand All @@ -53,6 +63,7 @@ class CatalogCollection(TypedDict):

class ResolvedDatasetSource(TypedDict):
"""Resolved dataset information."""

collection: str
dataset: str
variant: str
Expand All @@ -63,21 +74,28 @@ class ResolvedDatasetSource(TypedDict):

class UrlFetchResult(TypedDict, total=False):
"""Result from fetching CID from URL endpoint."""

cid: str
dataset: Optional[str]
timestamp: Optional[int] # Unix timestamp in milliseconds


class DatasetMetadata(TypedDict, total=False):
"""Metadata about a loaded dataset."""

collection: str
dataset: str
variant: str
slug: str # Full dataset identifier (e.g., "era5/temp2m/finalized")
cid: str # The actual CID used to load the dataset
url: Optional[str] # URL if one was used in the resolution
timestamp: Optional[int] # Unix timestamp in milliseconds when dataset was last updated
source: typing.Literal["catalog", "stac", "direct_cid"] # How the dataset was loaded
timestamp: Optional[
int
] # Unix timestamp in milliseconds when dataset was last updated
source: typing.Literal[
"catalog", "stac", "direct_cid"
] # How the dataset was loaded
organization: Optional[str]


# --- Helper Functions ---
67 changes: 59 additions & 8 deletions dclimate_client_py/dclimate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
"""

import typing
import requests
import xarray as xr
from py_hamt import KuboCAS
import pystac

# Import here to avoid circular imports
from .ipfs_retrieval import _load_dataset_from_ipfs_cid

Expand All @@ -22,6 +24,13 @@
list_available_datasets,
)
from .stac_server import resolve_cid_from_stac_server
from .siren import SirenClient
from .siren.types import (
SirenMetricDataPoint,
SirenMetricQuery,
SirenOptions,
SirenRegion,
)


class dClimateClient:
Expand Down Expand Up @@ -74,6 +83,7 @@ def __init__(
gateway_base_url: typing.Optional[str] = "https://ipfs-gateway.dclimate.net",
rpc_base_url: typing.Optional[str] = "https://ipfs-gateway.dclimate.net",
stac_server_url: typing.Optional[str] = "https://api.stac.dclimate.net",
siren: typing.Optional[SirenOptions] = None,
):
self._gateway_base_url = gateway_base_url
self._rpc_base_url = rpc_base_url
Expand All @@ -82,6 +92,11 @@ def __init__(
self._kubo_cas: typing.Optional[KuboCAS] = None
# Note: STAC catalog is loaded lazily (only if STAC server fails)

# Siren REST API client (optional)
self._siren_client: typing.Optional[SirenClient] = None
if siren is not None:
self._siren_client = SirenClient(siren)

async def __aenter__(self) -> "dClimateClient":
"""Initialize KuboCAS when entering async context."""
# Create KuboCAS with configured endpoints
Expand All @@ -95,6 +110,8 @@ async def __aenter__(self) -> "dClimateClient":

async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Clean up KuboCAS when exiting async context."""
if self._siren_client is not None:
await self._siren_client.aclose()
if self._kubo_cas:
await self._kubo_cas.__aexit__(exc_type, exc_val, exc_tb)
self._kubo_cas = None
Expand All @@ -109,7 +126,7 @@ async def load_dataset(
return_xarray: bool = False,
) -> typing.Union[
typing.Tuple[GeotemporalData, DatasetMetadata],
typing.Tuple[xr.Dataset, DatasetMetadata]
typing.Tuple[xr.Dataset, DatasetMetadata],
]:
"""
Load a dClimate dataset from IPFS using the STAC catalog.
Expand Down Expand Up @@ -183,7 +200,11 @@ async def load_dataset(
)

resolved_collection = collection
if organization and collection and not collection.startswith(f"{organization}_"):
if (
organization
and collection
and not collection.startswith(f"{organization}_")
):
resolved_collection = f"{organization}_{collection}"

# Case 1: Direct CID provided - bypass catalog resolution
Expand Down Expand Up @@ -239,9 +260,9 @@ async def load_dataset(
variant=variant,
server_url=self._stac_server_url,
)
except Exception as e:
print("Fallback")
pass # Fall back to IPFS catalog
except (requests.RequestException, ValueError):
# Fall back when server lookup fails or returns no usable match.
pass

# Fallback: Resolve via STAC catalog from IPFS
if final_cid is None:
Expand Down Expand Up @@ -332,8 +353,38 @@ def list_datasets(self) -> typing.Dict[str, typing.Dict[str, typing.Any]]:
"""
# Lazy load STAC catalog
if self._stac_catalog is None:
self._stac_catalog = load_stac_catalog(
gateway_url=self._gateway_base_url
)
self._stac_catalog = load_stac_catalog(gateway_url=self._gateway_base_url)

return list_available_datasets(self._stac_catalog)

# ------------------------------------------------------------------
# Siren REST API methods
# ------------------------------------------------------------------

async def get_metric_data(
self, query: SirenMetricQuery
) -> list[SirenMetricDataPoint]:
"""
Fetch Siren metric data for a region over a date range.

Requires the client to be initialised with ``siren=SirenOptions(...)``.
"""
if self._siren_client is None:
raise RuntimeError(
"Siren is not configured. Pass siren=SirenOptions(...) "
"when creating the dClimateClient."
)
return await self._siren_client.get_metric_data(query)

async def list_regions(self) -> list[SirenRegion]:
"""
List available Siren regions.

Requires the client to be initialised with ``siren=SirenOptions(...)``.
"""
if self._siren_client is None:
raise RuntimeError(
"Siren is not configured. Pass siren=SirenOptions(...) "
"when creating the dClimateClient."
)
return await self._siren_client.list_regions()
Loading
Loading