
geopipe

           _____
        .-'     '-.
      .'  .---.    '.
     /   /     \     \======[ Raster  ]
    |   |  GEO  |     |=====[ Tabular ]
    |   |  PIPE |     |=====[ Vector  ]
     \   \     /     /======[ Fusion  ]
      '.  '---'    .'
        '-._____.-'

Geospatial data pipeline framework for causal inference workflows

Python 3.10+ | MIT License

geopipe simplifies large-scale geospatial data pipelines that integrate satellite imagery with heterogeneous tabular data sources. It's designed for researchers running causal inference workflows with complex data fusion requirements.

Features

  • Declarative Data Fusion: YAML/Python schemas for joining 27+ heterogeneous sources
  • Remote Data Access: Download from Earth Engine, Planetary Computer, and STAC catalogs
  • Data Discovery: Query available datasets across cloud catalogs with discover()
  • Quality Intelligence: Proactive data auditing with schema.audit() before expensive computation
  • Robustness DSL: Declarative YAML specifications for sensitivity analysis across parameter space
  • Pipeline Orchestration: DAG-based workflows with checkpointing and resume
  • Cluster Computing: Native SLURM/PBS integration with job monitoring

Installation

pip install geopipe

# With optional dependencies
pip install geopipe[prefect,cluster]  # Pipeline orchestration + HPC
pip install geopipe[remote]           # Earth Engine, Planetary Computer
pip install geopipe[all]              # Everything

System Dependencies

geopipe requires GDAL and PROJ for geospatial operations. Install these first:

macOS (Homebrew):

brew install gdal proj

Ubuntu/Debian:

sudo apt-get install gdal-bin libgdal-dev libproj-dev

Conda (recommended):

conda install -c conda-forge gdal proj
pip install geopipe

Quick Start

1. Define a Data Fusion Schema

from geopipe import FusionSchema, RasterSource, TabularSource

schema = FusionSchema(
    name="aid_effects",
    resolution="5km",
    temporal_range=("2000-01-01", "2020-12-31"),
    sources=[
        RasterSource("nightlights", path="data/viirs/*.tif", aggregation="mean"),
        RasterSource("landcover", path="data/modis/*.tif", aggregation="mode"),
        TabularSource("conflict", path="data/acled.csv",
                      spatial_join="buffer_10km", temporal_align="yearly_sum"),
        TabularSource("aid_projects", path="data/aiddata.csv",
                      spatial_join="nearest"),
    ],
    output="data/interim/consolidated.parquet"
)

# Execute fusion
result = schema.execute()

2. Or use YAML configuration

# sources.yaml
name: aid_effects
resolution: 5km
temporal_range: ["2000-01-01", "2020-12-31"]

sources:
  - type: raster
    name: nightlights
    path: data/viirs/*.tif
    aggregation: mean

  - type: tabular
    name: conflict
    path: data/acled.csv
    spatial_join: buffer_10km
    temporal_align: yearly_sum

output: data/interim/consolidated.parquet

Then run the fusion from the command line:

geopipe fuse sources.yaml

Alignment Options Reference

Spatial Join Methods (spatial_join):

Method       Description
nearest      Nearest neighbor join (default)
intersects   Join on geometric intersection
within       Source geometries within target
contains     Target contains source geometries
buffer_5km   Buffer target by 5 km, then intersect
buffer_10km  Buffer target by 10 km, then intersect
buffer_25km  Buffer target by 25 km, then intersect
buffer_50km  Buffer target by 50 km, then intersect

Temporal Alignment (temporal_align):

Method        Description
exact         No aggregation (default)
yearly_sum    Sum numeric values by year
yearly_mean   Average numeric values by year
monthly_sum   Sum numeric values by month
monthly_mean  Average numeric values by month
latest        Keep most recent record per geometry

Raster Aggregation (aggregation):

Method     Description
mean       Arithmetic mean (default)
sum        Sum of pixel values
min / max  Minimum / maximum value
std        Standard deviation
median     Median value
mode       Most frequent value
count      Count of valid pixels

Resolution Formats:

  • "5km" - Kilometers
  • "1000m" - Meters
  • "0.5deg" - Decimal degrees

3. Build Pipelines with Checkpointing

from geopipe import Pipeline, task

@task(cache=True, checkpoint=True)
def download_imagery(region, year):
    ...

@task(cache=True, resources={"memory": "16GB"})
def extract_features(imagery_path, model="resnet50"):
    ...

pipeline = Pipeline([download_imagery, extract_features])
pipeline.run(resume=True)  # Resume from last checkpoint

Checkpoint Behavior:

  • Task checkpoints stored in .geopipe/checkpoints/ (configurable via checkpoint_dir)
  • Pipeline state stored in .geopipe/pipeline/{name}_state.json
  • Resume skips completed stages and re-executes from failure point
  • Clear checkpoints with geopipe clean or delete the .geopipe/ directory

# Configure checkpoint location
pipeline = Pipeline(
    [download_imagery, extract_features],
    checkpoint_dir="my_checkpoints/",
)

# Force fresh run (ignore existing checkpoints)
pipeline.run(resume=False)

# Resume from last successful stage
pipeline.run(resume=True)

4. Run on HPC Clusters

from geopipe.cluster import SLURMExecutor

executor = SLURMExecutor(
    partition="gpu",
    nodes=10,
    time_limit="24:00:00",
    conda_env="geopipe",
)

pipeline.run(executor=executor)
executor.status()  # Monitor progress

5. Manage Robustness Specifications

Run parallel analysis variants to assess sensitivity of results. See Robustness DSL for the declarative YAML approach.

from geopipe.specs import Spec, SpecRegistry

# Define analysis variants programmatically
specs = SpecRegistry([
    Spec("MAIN", buffer_km=5, include_ntl=True),
    Spec("ROBUST_BUFFER", buffer_km=10, include_ntl=True),
    Spec("ROBUST_NO_NTL", buffer_km=5, include_ntl=False),
])

# Run each specification
for spec in specs:
    schema.output = f"results/{spec.name}/estimates.csv"
    schema.execute()

# Compare results and generate LaTeX table
latex = specs.to_latex(
    pattern="{spec}/estimates.csv",
    estimate_col="estimate",
    se_col="std_error",
    caption="Robustness Specifications",
)

Or use the declarative YAML approach (recommended):

# schema.yaml
robustness:
  dimensions:
    buffer_km: [5, 10, 25]
    include_ntl: [true, false]

Then run all variants from the command line:

geopipe specs run schema.yaml

Use Case: Satellite Imagery Causal Inference

geopipe was designed for workflows like estimating causal effects from satellite imagery across multiple countries, integrating:

  • Satellite data: VIIRS nightlights, MODIS land cover, Sentinel-2 imagery
  • Conflict data: ACLED, UCDP
  • Climate data: CHIRPS precipitation, ERA5 temperature
  • Development data: World Bank indicators, DHS surveys
  • Treatment data: Aid project locations (AidData, IATI)

Instead of coordinating 25+ scripts manually:

Analysis/
├── 01_setup.R
├── 02_get_conflict.R
├── 03_get_climate.R
├── ... (11 data prep scripts)
├── call_CI_analysis.R
└── consolidate_results.R

Use a single declarative pipeline:

schema = FusionSchema.from_yaml("sources.yaml")
pipeline = Pipeline.from_schema(schema)
pipeline.add_stage(run_cnn_inference)
pipeline.add_stage(estimate_causal_effects)
pipeline.run(executor=SLURMExecutor(), specs=["MAIN", "ROBUST_BUFFER"])

Data Format Requirements

Coordinate Reference System

All sources default to EPSG:4326 (WGS84). Buffer operations automatically reproject to EPSG:3857 for meter-based distance calculations (note that Web Mercator distances are approximate away from the equator).

Raster Sources

  • Formats: GeoTIFF, Cloud-Optimized GeoTIFF (COG)
  • Bands: 1-indexed (e.g., band=1 for first band)
  • Nodata: Auto-detected from file metadata or explicit parameter
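
For instance, a single-band DEM with a sentinel NoData value might be declared like this (a minimal sketch; the nodata parameter name is an assumption, and band is 1-indexed as noted above):

from geopipe import RasterSource

# Hypothetical source: read the first band only, with an explicit NoData override
elevation = RasterSource(
    "elevation",
    path="data/srtm/*.tif",
    band=1,            # 1-indexed: the first band
    nodata=-9999,      # assumed parameter name for the explicit NoData value
    aggregation="mean",
)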

Tabular Sources

  • Formats: CSV, Parquet, Excel (.xlsx), JSON
  • Required columns (one of):
    • latitude + longitude columns (configurable via lat_col, lon_col)
    • geometry column with WKT strings
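
For example, a CSV whose coordinates live in nonstandard columns can be declared like this (file path and column names here are illustrative):

from geopipe import TabularSource

# Hypothetical survey file with custom coordinate column names
surveys = TabularSource(
    "dhs_surveys",
    path="data/dhs.csv",
    lat_col="cluster_lat",   # overrides the default latitude column name
    lon_col="cluster_lon",   # overrides the default longitude column name
    spatial_join="nearest",
)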

Temporal Format

ISO 8601 strings: "2020-01-01" or "2020-01-01T00:00:00"

Bounds Format

Tuple of (minx, miny, maxx, maxy) in WGS84 coordinates.
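
For example, bounds=(2.7, 4.2, 14.7, 13.9) covers roughly Nigeria.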

Troubleshooting

Console Output

geopipe uses colored terminal output via rich:

  • Blue: Progress information
  • Green: Success messages
  • Yellow: Warnings and retries
  • Red: Errors

Common Issues

File not found errors:

# Validate sources before execution
issues = schema.validate_sources()
if issues:
    print("\n".join(issues))

Task failures with retry:

@task(retries=3, retry_delay=5.0)  # Retry 3x with exponential backoff
def flaky_download(url):
    ...

Checkpoint corruption:

# Clear all checkpoints and restart
geopipe clean
# Or manually: rm -rf .geopipe/

Using geopipe from R

R users can access geopipe via the reticulate package. Results are returned as GeoDataFrames that convert directly to sf objects.

Setup

install.packages(c("reticulate", "sf"))
library(reticulate)

# Create conda environment with dependencies
conda_create("geopipe-env")
conda_install("geopipe-env", c("gdal", "proj"), channel = "conda-forge")
py_install("geopipe", envname = "geopipe-env")

Basic Usage

library(reticulate)
library(sf)
use_condaenv("geopipe-env")

# Import geopipe
geopipe <- import("geopipe")

# Define fusion schema
schema <- geopipe$FusionSchema(
  name = "analysis",
  resolution = "5km",
  sources = list(
    geopipe$RasterSource("nightlights", path = "data/viirs/*.tif", aggregation = "mean"),
    geopipe$TabularSource("conflict", path = "data/acled.csv", spatial_join = "buffer_10km")
  ),
  output = "results/fused.parquet"
)

# Execute and convert to sf
result <- schema$execute()
result_sf <- st_as_sf(result)

YAML-based Workflow

Load a pre-configured YAML schema to minimize Python syntax:

schema <- geopipe$FusionSchema$from_yaml("sources.yaml")
result_sf <- st_as_sf(schema$execute())

Remote Data Sources

geopipe can download satellite imagery directly from cloud providers. Install remote dependencies:

pip install geopipe[remote]

Supported Providers

Provider                      Source Class             Collections
Google Earth Engine           EarthEngineSource        VIIRS, Landsat, Sentinel-2, MODIS, ERA5, etc.
Microsoft Planetary Computer  PlanetaryComputerSource  Sentinel-2, Landsat, NAIP, etc.
Any STAC catalog              STACSource               Varies by catalog

Authentication Setup

Earth Engine (required):

# One-time authentication
earthengine authenticate

# Or use service account
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json

Planetary Computer (optional, for higher rate limits):

export PC_SDK_SUBSCRIPTION_KEY=your-api-key

STAC Catalogs: No authentication needed for public catalogs.

Earth Engine Example

from geopipe import EarthEngineSource

source = EarthEngineSource(
    name="viirs_nightlights",
    collection="NOAA/VIIRS/DNB/MONTHLY_V1/VCMCFG",
    bands=["avg_rad"],
    center=(7.5, 9.0),        # lon, lat (Nigeria)
    patch_size_km=100,        # 100km x 100km area
    resolution=500,           # 500m resolution
    temporal_range=("2023-01-01", "2023-12-31"),
    reducer="mean",           # Temporal composite method
    output_dir="data/remote",
)

gdf = source.load()  # Downloads and returns GeoDataFrame

Planetary Computer Example

from geopipe import PlanetaryComputerSource

source = PlanetaryComputerSource(
    name="sentinel2",
    collection="sentinel-2-l2a",
    bands=["B04", "B03", "B02", "B08"],  # Red, Green, Blue, NIR
    bounds=(-122.5, 37.7, -122.4, 37.8),
    resolution=10,
    temporal_range=("2023-06-01", "2023-08-31"),
    cloud_cover_max=20,       # Filter cloudy images
    output_dir="data/remote",
)

gdf = source.load()

Generic STAC Example

from geopipe import STACSource

source = STACSource(
    name="sentinel2_element84",
    catalog_url="https://earth-search.aws.element84.com/v1",
    collection="sentinel-2-l2a",
    assets=["red", "green", "blue"],
    center=(-105.27, 40.01),  # Boulder, CO
    patch_size_km=10,
    temporal_range=("2023-07-01", "2023-09-30"),
    output_dir="data/remote",
)

gdf = source.load()

Remote Source Parameters

Spatial Extent (choose one):

  • bounds=(minx, miny, maxx, maxy) - Explicit bounding box in WGS84
  • center=(lon, lat) + patch_size_km=N - Center point with patch size
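
The two forms describe the same extent. A minimal sketch of the equirectangular approximation that relates them (illustrative only, not geopipe's internal conversion):

import math

# Convert a (lon, lat) center plus patch size into an approximate
# (minx, miny, maxx, maxy) bounding box in WGS84
def center_to_bounds(lon, lat, patch_size_km):
    half_lat = (patch_size_km / 2) / 110.574                      # km per degree of latitude
    half_lon = (patch_size_km / 2) / (111.320 * math.cos(math.radians(lat)))
    return (lon - half_lon, lat - half_lat, lon + half_lon, lat + half_lat)

print(center_to_bounds(-105.27, 40.01, 10))
# ≈ (-105.329, 39.965, -105.211, 40.055)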

Common Parameters:

Parameter       Description
resolution      Target resolution in meters
temporal_range  ("start_date", "end_date") in ISO format
output_dir      Directory for cached downloads
bands / assets  List of band names to extract

Earth Engine Specific:

Parameter     Description
reducer       Temporal composite method: mean, median, max, min, sum, first
cloud_mask    Apply cloud masking for optical sensors
scale_factor  Multiply band values by a factor

Planetary Computer Specific:

Parameter        Description
cloud_cover_max  Maximum cloud cover percentage (0-100)
mosaic_method    How to combine images: first, median, mean

Caching

Downloaded files are automatically cached based on query parameters:

data/remote/
├── earthengine/
│   ├── abc123def456.tif
│   └── abc123def456.json  # metadata
├── planetary_computer/
└── stac/

Subsequent calls with the same parameters load from cache without re-downloading.
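
A minimal sketch of parameter-hash caching, assuming the key is a digest of the canonicalized query (the actual derivation inside geopipe may differ):

import hashlib
import json

# Hash the sorted query parameters into a short, stable cache key
def cache_key(params: dict) -> str:
    canonical = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()[:12]

key = cache_key({
    "collection": "NOAA/VIIRS/DNB/MONTHLY_V1/VCMCFG",
    "bands": ["avg_rad"],
    "resolution": 500,
})
# Files would then live at data/remote/earthengine/{key}.tif and {key}.json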

Data Discovery

Discover available datasets across cloud catalogs before building your pipeline:

from geopipe import discover

# Find nightlights data
results = discover(
    bounds=(-122.5, 37.7, -122.4, 37.8),
    categories=["nightlights"],
)

for r in results:
    print(f"{r.name}: {r.resolution_m}m ({r.provider})")

# Convert discovery result to source
source = results[0].to_source(name="sf_ntl")
schema.add_source(source)

CLI Discovery

# Search by category
geopipe discover search --category nightlights

# Search by provider
geopipe discover search --provider earthengine --format table

# Get dataset details
geopipe discover info viirs_dnb_monthly

# List all known datasets
geopipe discover list-datasets

# List categories
geopipe discover categories

Available Categories

Category        Description                Example Datasets
nightlights     Nighttime light emissions  VIIRS DNB, DMSP-OLS
optical         Multispectral imagery      Sentinel-2, Landsat, MODIS
sar             Synthetic aperture radar   Sentinel-1, ALOS PALSAR
elevation       Digital elevation models   SRTM, Copernicus DEM
climate         Weather and climate        ERA5, CHIRPS, TerraClimate
land_cover      Land use classification    ESA WorldCover, Dynamic World
vegetation      Vegetation indices         MODIS NDVI/EVI, LAI
population      Population density         WorldPop, LandScan
infrastructure  Built environment          GHSL, OSM Buildings

Complementary Datasets

Discovery results can suggest related datasets:

viirs = discover(categories=["nightlights"])[0]
related = viirs.suggest_complementary()
# Returns: Sentinel-2, Landsat, etc.

Data Quality Intelligence

Audit data quality before running expensive computations:

from geopipe import FusionSchema

schema = FusionSchema.from_yaml("sources.yaml")

# Run quality audit
report = schema.audit()

# Check results
print(report.summary())
print(f"Overall score: {report.overall_score}/100")

if report.has_errors:
    print("Critical issues found!")
    for issue in report.filter(severity="error"):
        print(f"  - {issue.source_name}: {issue.message}")

# Export report
report.to_markdown("quality_report.md")

CLI Audit

# Basic audit
geopipe audit schema.yaml

# Save report
geopipe audit schema.yaml --output report.md

# Auto-fix issues
geopipe audit schema.yaml --fix

# Strict mode (fail on warnings)
geopipe audit schema.yaml --strict

Quality Checks

Check                    Category      Description
TemporalOverlapCheck     Cross-source  Verify all sources have overlapping time ranges
CRSAlignmentCheck        Cross-source  Check CRS compatibility across sources
BoundsOverlapCheck       Cross-source  Verify spatial extents overlap
TemporalGapCheck         Raster        Detect missing dates in time series
SpatialCoverageCheck     Raster        Report NoData percentage in the AOI
MissingValueCheck        Tabular       Check for nulls in key columns
GeocodingPrecisionCheck  Tabular       Analyze coordinate precision

Issue Severity

  • ERROR: Critical issues that will cause fusion to fail
  • WARNING: Issues that may affect data quality (-10 points)
  • INFO: Informational notes (-2 points)
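
Assuming scores start at 100, a report with two warnings and three informational notes would score 100 - (2 × 10) - (3 × 2) = 74.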

Robustness DSL

Define robustness specifications declaratively in YAML for sensitivity analysis:

# schema.yaml
name: aid_effects
resolution: 5km
temporal_range: ["2010-01-01", "2020-12-31"]

sources:
  - type: tabular
    name: conflict
    path: data/acled.csv
    spatial_join: buffer_${buffer_km}km  # Template substitution

output: results/${spec_name}/fused.parquet  # Per-spec output

robustness:
  dimensions:
    buffer_km: [5, 10, 25, 50]
    include_ntl: [true, false]
    temporal_lag: [0, 1, 2]

  exclude:
    - buffer_km: 50
      temporal_lag: 2  # Invalid combination

  named:
    MAIN:
      buffer_km: 10
      include_ntl: true
      temporal_lag: 0

This expands to the full 4 × 2 × 3 = 24 grid automatically, minus any combinations matched by the exclude rules.
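
A minimal sketch of how this expansion works, assuming each exclude entry removes every combination that matches all of its listed keys (illustrative, not geopipe's internals):

from itertools import product

dimensions = {
    "buffer_km": [5, 10, 25, 50],
    "include_ntl": [True, False],
    "temporal_lag": [0, 1, 2],
}
exclude = [{"buffer_km": 50, "temporal_lag": 2}]

# Full Cartesian product: 4 x 2 x 3 = 24 combinations
combos = [dict(zip(dimensions, values)) for values in product(*dimensions.values())]

# Drop any combination matched by an exclude rule
combos = [
    c for c in combos
    if not any(all(c[k] == v for k, v in rule.items()) for rule in exclude)
]
print(len(combos))  # 22 under these matching semantics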

Python API

from geopipe import FusionSchema

schema = FusionSchema.from_yaml("schema.yaml")

# Expand to spec + configured schema pairs
for spec, configured_schema in schema.expand_specs():
    print(f"Running {spec.name}...")
    configured_schema.execute()

CLI Commands

# Preview specifications
geopipe specs list schema.yaml

# Preview with parameters
geopipe specs list schema.yaml --verbose

# Write individual schema files
geopipe specs expand schema.yaml --output-dir schemas/

# Run all specifications
geopipe specs run schema.yaml

# Run specific specs only
geopipe specs run schema.yaml --spec MAIN --spec ROBUST_BUFFER

# Dry run (show what would execute)
geopipe specs run schema.yaml --dry-run

Specification Curve Analysis

Analyze results across all specifications:

# Generate specification curve from results
geopipe specs curve "results/*/estimates.csv" --output curve.pdf

# Export as LaTeX table
geopipe specs curve "results/*.csv" --format latex

# With custom column names
geopipe specs curve "results/*.csv" \
  --estimate-col treatment_effect \
  --se-col robust_se

Or from the Python API:

from geopipe.specs import SpecificationCurve

curve = SpecificationCurve(
    specs=specs,
    results_pattern="results/{spec_name}/estimates.csv",
    estimate_col="treatment_effect",
)

# Summary statistics
summary = curve.compute_summary()
print(f"Mean estimate: {summary['mean_estimate']:.4f}")
print(f"% Significant: {summary['pct_significant_05']:.1f}%")

# Rank dimensions by influence
influence = curve.rank_by_influence()
print(influence[["dimension", "variance_ratio"]])

# Generate plot
curve.plot("figures/spec_curve.pdf")

# Export for publication
print(curve.to_latex())

Template Substitution

Use ${param} or {param} in schema values:

sources:
  - name: conflict
    spatial_join: buffer_${buffer_km}km

output: results/{spec_name}/data.parquet
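
For the MAIN specification defined above (buffer_km: 10), these resolve to buffer_10km and results/MAIN/data.parquet.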

YAML Configuration

Remote sources work in YAML schemas:

sources:
  - type: earthengine
    name: viirs
    collection: NOAA/VIIRS/DNB/MONTHLY_V1/VCMCFG
    bands: [avg_rad]
    center: [7.5, 9.0]
    patch_size_km: 100
    resolution: 500
    temporal_range: ["2023-01-01", "2023-12-31"]
    output_dir: data/remote

  - type: planetary_computer
    name: sentinel2
    collection: sentinel-2-l2a
    bands: [B04, B08]
    bounds: [-122.5, 37.5, -122.0, 38.0]
    cloud_cover_max: 20
    resolution: 10
    output_dir: data/remote

License

MIT License. See LICENSE for details.

Citation

If you use geopipe in your research, please cite:

@software{geopipe,
  author = {Jerzak, Connor T.},
  title = {geopipe: Geospatial Data Pipeline Framework},
  year = {2025},
  url = {https://github.com/cjerzak/geopipe-software}
}
