```
        _____
     .-'     '-.
    .'  .---.  '.
   /  /       \  \======[ Raster  ]
  |  |  GEO   |  |=====[ Tabular ]
  |  |  PIPE  |  |=====[ Vector  ]
   \  \       /  /======[ Fusion  ]
    '.  '---'  .'
      '-._____.-'
```
Geospatial data pipeline framework for causal inference workflows
geopipe simplifies large-scale geospatial data pipelines that integrate satellite imagery with heterogeneous tabular data sources. It's designed for researchers running causal inference workflows with complex data fusion requirements.
- Declarative Data Fusion: YAML/Python schemas for joining 27+ heterogeneous sources
- Remote Data Access: Download from Earth Engine, Planetary Computer, and STAC catalogs
- Data Discovery: Query available datasets across cloud catalogs with `discover()`
- Quality Intelligence: Proactive data auditing with `schema.audit()` before expensive computation
- Robustness DSL: Declarative YAML specifications for sensitivity analysis across parameter space
- Pipeline Orchestration: DAG-based workflows with checkpointing and resume
- Cluster Computing: Native SLURM/PBS integration with job monitoring
```bash
pip install geopipe

# With optional dependencies
pip install geopipe[prefect,cluster]  # Pipeline orchestration + HPC
pip install geopipe[remote]           # Earth Engine, Planetary Computer
pip install geopipe[all]              # Everything
```

geopipe requires GDAL and PROJ for geospatial operations. Install these first:

macOS (Homebrew):

```bash
brew install gdal proj
```

Ubuntu/Debian:

```bash
sudo apt-get install gdal-bin libgdal-dev libproj-dev
```

Conda (recommended):

```bash
conda install -c conda-forge gdal proj
pip install geopipe
```

```python
from geopipe import FusionSchema, RasterSource, TabularSource
schema = FusionSchema(
    name="aid_effects",
    resolution="5km",
    temporal_range=("2000-01-01", "2020-12-31"),
    sources=[
        RasterSource("nightlights", path="data/viirs/*.tif", aggregation="mean"),
        RasterSource("landcover", path="data/modis/*.tif", aggregation="mode"),
        TabularSource("conflict", path="data/acled.csv",
                      spatial_join="buffer_10km", temporal_align="yearly_sum"),
        TabularSource("aid_projects", path="data/aiddata.csv",
                      spatial_join="nearest"),
    ],
    output="data/interim/consolidated.parquet"
)

# Execute fusion
result = schema.execute()
```

```yaml
# sources.yaml
name: aid_effects
resolution: 5km
temporal_range: [2000-01-01, 2020-12-31]
sources:
  - type: raster
    name: nightlights
    path: data/viirs/*.tif
    aggregation: mean
  - type: tabular
    name: conflict
    path: data/acled.csv
    spatial_join: buffer_10km
    temporal_align: yearly_sum
output: data/interim/consolidated.parquet
```

```bash
geopipe fuse sources.yaml
```

Spatial Join Methods (`spatial_join`):
| Method | Description |
|---|---|
| `nearest` | Nearest neighbor join (default) |
| `intersects` | Join on geometric intersection |
| `within` | Source geometries within target |
| `contains` | Target contains source geometries |
| `buffer_5km` | Buffer target by 5km, then intersect |
| `buffer_10km` | Buffer target by 10km, then intersect |
| `buffer_25km` | Buffer target by 25km, then intersect |
| `buffer_50km` | Buffer target by 50km, then intersect |
Temporal Alignment (`temporal_align`):

| Method | Description |
|---|---|
| `exact` | No aggregation (default) |
| `yearly_sum` | Sum numeric values by year |
| `yearly_mean` | Average numeric values by year |
| `monthly_sum` | Sum numeric values by month |
| `monthly_mean` | Average numeric values by month |
| `latest` | Keep most recent record per geometry |
Raster Aggregation (`aggregation`):

| Method | Description |
|---|---|
| `mean` | Arithmetic mean (default) |
| `sum` | Sum of pixel values |
| `min` / `max` | Minimum / maximum value |
| `std` | Standard deviation |
| `median` | Median value |
| `mode` | Most frequent value |
| `count` | Count of valid pixels |
Resolution Formats:

- `"5km"` - Kilometers
- `"1000m"` - Meters
- `"0.5deg"` - Decimal degrees
```python
from geopipe import Pipeline, task

@task(cache=True, checkpoint=True)
def download_imagery(region, year):
    ...

@task(cache=True, resources={"memory": "16GB"})
def extract_features(imagery_path, model="resnet50"):
    ...

pipeline = Pipeline([download_imagery, extract_features])
pipeline.run(resume=True)  # Resume from last checkpoint
```

Checkpoint Behavior:
- Task checkpoints stored in `.geopipe/checkpoints/` (configurable via `checkpoint_dir`)
- Pipeline state stored in `.geopipe/pipeline/{name}_state.json`
- Resume skips completed stages and re-executes from failure point
- Clear checkpoints with `geopipe clean` or delete the `.geopipe/` directory
```python
# Configure checkpoint location
pipeline = Pipeline(
    [download_imagery, extract_features],
    checkpoint_dir="my_checkpoints/",
)

# Force fresh run (ignore existing checkpoints)
pipeline.run(resume=False)

# Resume from last successful stage
pipeline.run(resume=True)
```

```python
from geopipe.cluster import SLURMExecutor
executor = SLURMExecutor(
    partition="gpu",
    nodes=10,
    time_limit="24:00:00",
    conda_env="geopipe",
)

pipeline.run(executor=executor)
executor.status()  # Monitor progress
```

Run parallel analysis variants to assess sensitivity of results. See Robustness DSL for the declarative YAML approach.
```python
from geopipe.specs import Spec, SpecRegistry

# Define analysis variants programmatically
specs = SpecRegistry([
    Spec("MAIN", buffer_km=5, include_ntl=True),
    Spec("ROBUST_BUFFER", buffer_km=10, include_ntl=True),
    Spec("ROBUST_NO_NTL", buffer_km=5, include_ntl=False),
])

# Run each specification
for spec in specs:
    schema.output = f"results/{spec.name}/estimates.csv"
    schema.execute()

# Compare results and generate LaTeX table
latex = specs.to_latex(
    pattern="{spec}/estimates.csv",
    estimate_col="estimate",
    se_col="std_error",
    caption="Robustness Specifications",
)
```

Or use the declarative YAML approach (recommended):
```yaml
# schema.yaml
robustness:
  dimensions:
    buffer_km: [5, 10, 25]
    include_ntl: [true, false]
```

```bash
geopipe specs run schema.yaml
```

geopipe was designed for workflows like estimating causal effects from satellite imagery across multiple countries, integrating:
- Satellite data: VIIRS nightlights, MODIS land cover, Sentinel-2 imagery
- Conflict data: ACLED, UCDP
- Climate data: CHIRPS precipitation, ERA5 temperature
- Development data: World Bank indicators, DHS surveys
- Treatment data: Aid project locations (AidData, IATI)
Instead of coordinating 25+ scripts manually:
```
Analysis/
├── 01_setup.R
├── 02_get_conflict.R
├── 03_get_climate.R
├── ... (11 data prep scripts)
├── call_CI_analysis.R
└── consolidate_results.R
```
Use a single declarative pipeline:
```python
schema = FusionSchema.from_yaml("sources.yaml")
pipeline = Pipeline.from_schema(schema)
pipeline.add_stage(run_cnn_inference)
pipeline.add_stage(estimate_causal_effects)
pipeline.run(executor=SLURMExecutor(), specs=["MAIN", "ROBUST_BUFFER"])
```

All sources default to EPSG:4326 (WGS84). Buffer operations automatically convert to EPSG:3857 for distance calculations.
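For reference, a buffer join like `buffer_10km` corresponds roughly to the following geopandas pattern (a sketch of the equivalent manual steps, not geopipe internals; the grid file and coordinate column names are illustrative):

```python
import geopandas as gpd
import pandas as pd

# Hypothetical inputs: a point table with latitude/longitude and a polygon grid
df = pd.read_csv("data/acled.csv")
points = gpd.GeoDataFrame(
    df, geometry=gpd.points_from_xy(df["longitude"], df["latitude"]), crs="EPSG:4326"
)
grid = gpd.read_file("data/grid_cells.geojson")  # hypothetical target geometries

# Reproject to a metric CRS, buffer targets by 10 km, then join on intersection
grid_m = grid.to_crs(epsg=3857)
grid_m["geometry"] = grid_m.geometry.buffer(10_000)
joined = gpd.sjoin(points.to_crs(epsg=3857), grid_m, predicate="intersects")
```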
Raster sources:

- Formats: GeoTIFF, Cloud-Optimized GeoTIFF (COG)
- Bands: 1-indexed (e.g., `band=1` for the first band)
- Nodata: Auto-detected from file metadata or explicit parameter
Tabular sources:

- Formats: CSV, Parquet, Excel (.xlsx), JSON
- Required columns (one of):
  - `latitude` + `longitude` columns (configurable via `lat_col`, `lon_col`; see the example below)
  - `geometry` column with WKT strings
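For example, a CSV with nonstandard coordinate column names might be declared like this (a sketch; `lat_col`/`lon_col` follow the configurable column names mentioned above, and the file path and column names are illustrative):

```python
from geopipe import TabularSource

# Hypothetical survey file with columns "site_lat" and "site_lon"
surveys = TabularSource(
    "dhs_surveys",
    path="data/dhs_clusters.csv",
    lat_col="site_lat",
    lon_col="site_lon",
    spatial_join="nearest",
)
```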
Temporal ranges: ISO 8601 strings, e.g. "2020-01-01" or "2020-01-01T00:00:00".

Bounding boxes: tuple of (minx, miny, maxx, maxy) in WGS84 coordinates.
geopipe uses colored terminal output via rich:
- Blue: Progress information
- Green: Success messages
- Yellow: Warnings and retries
- Red: Errors
File not found errors:
```python
# Validate sources before execution
issues = schema.validate_sources()
if issues:
    print("\n".join(issues))
```

Task failures with retry:
```python
@task(retries=3, retry_delay=5.0)  # Retry 3x with exponential backoff
def flaky_download(url):
    ...
```

Checkpoint corruption:
```bash
# Clear all checkpoints and restart
geopipe clean

# Or manually: rm -rf .geopipe/
```

R users can access geopipe via the reticulate package. Results are returned as GeoDataFrames that convert directly to sf objects.
install.packages(c("reticulate", "sf"))
library(reticulate)
# Create conda environment with dependencies
conda_create("geopipe-env")
conda_install("geopipe-env", c("gdal", "proj"), channel = "conda-forge")
py_install("geopipe", envname = "geopipe-env")library(reticulate)
library(sf)
use_condaenv("geopipe-env")
# Import geopipe
geopipe <- import("geopipe")
# Define fusion schema
schema <- geopipe$FusionSchema(
  name = "analysis",
  resolution = "5km",
  sources = list(
    geopipe$RasterSource("nightlights", path = "data/viirs/*.tif", aggregation = "mean"),
    geopipe$TabularSource("conflict", path = "data/acled.csv", spatial_join = "buffer_10km")
  ),
  output = "results/fused.parquet"
)

# Execute and convert to sf
result <- schema$execute()
result_sf <- st_as_sf(result)
```

Load a pre-configured YAML schema to minimize Python syntax:
```r
schema <- geopipe$FusionSchema$from_yaml("sources.yaml")
result_sf <- st_as_sf(schema$execute())
```

geopipe can download satellite imagery directly from cloud providers. Install remote dependencies:
```bash
pip install geopipe[remote]
```

| Provider | Source Class | Collections |
|---|---|---|
| Google Earth Engine | `EarthEngineSource` | VIIRS, Landsat, Sentinel-2, MODIS, ERA5, etc. |
| Microsoft Planetary Computer | `PlanetaryComputerSource` | Sentinel-2, Landsat, NAIP, etc. |
| Any STAC Catalog | `STACSource` | Varies by catalog |
Earth Engine (required):
```bash
# One-time authentication
earthengine authenticate

# Or use service account
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
```

Planetary Computer (optional, for higher rate limits):

```bash
export PC_SDK_SUBSCRIPTION_KEY=your-api-key
```

STAC Catalogs: No authentication needed for public catalogs.
```python
from geopipe import EarthEngineSource

source = EarthEngineSource(
    name="viirs_nightlights",
    collection="NOAA/VIIRS/DNB/MONTHLY_V1/VCMCFG",
    bands=["avg_rad"],
    center=(7.5, 9.0),        # lon, lat (Nigeria)
    patch_size_km=100,        # 100km x 100km area
    resolution=500,           # 500m resolution
    temporal_range=("2023-01-01", "2023-12-31"),
    reducer="mean",           # Temporal composite method
    output_dir="data/remote",
)

gdf = source.load()  # Downloads and returns GeoDataFrame
```

```python
from geopipe import PlanetaryComputerSource
source = PlanetaryComputerSource(
    name="sentinel2",
    collection="sentinel-2-l2a",
    bands=["B04", "B03", "B02", "B08"],  # Red, Green, Blue, NIR
    bounds=(-122.5, 37.7, -122.4, 37.8),
    resolution=10,
    temporal_range=("2023-06-01", "2023-08-31"),
    cloud_cover_max=20,       # Filter cloudy images
    output_dir="data/remote",
)

gdf = source.load()
```

```python
from geopipe import STACSource
source = STACSource(
    name="sentinel2_element84",
    catalog_url="https://earth-search.aws.element84.com/v1",
    collection="sentinel-2-l2a",
    assets=["red", "green", "blue"],
    center=(-105.27, 40.01),  # Boulder, CO
    patch_size_km=10,
    temporal_range=("2023-07-01", "2023-09-30"),
    output_dir="data/remote",
)

gdf = source.load()
```

Spatial Extent (choose one):

- `bounds=(minx, miny, maxx, maxy)` - Explicit bounding box in WGS84
- `center=(lon, lat)` + `patch_size_km=N` - Center point with patch size
Common Parameters:

| Parameter | Description |
|---|---|
| `resolution` | Target resolution in meters |
| `temporal_range` | `("start_date", "end_date")` in ISO format |
| `output_dir` | Directory for cached downloads |
| `bands` / `assets` | List of band names to extract |
Earth Engine Specific:

| Parameter | Description |
|---|---|
| `reducer` | Temporal composite: mean, median, max, min, sum, first |
| `cloud_mask` | Apply cloud masking for optical sensors |
| `scale_factor` | Multiply band values by factor |
Planetary Computer Specific:

| Parameter | Description |
|---|---|
| `cloud_cover_max` | Maximum cloud cover percentage (0-100) |
| `mosaic_method` | How to combine images: first, median, mean |
Downloaded files are automatically cached based on query parameters:
```
data/remote/
├── earthengine/
│   ├── abc123def456.tif
│   └── abc123def456.json   # metadata
├── planetary_computer/
└── stac/
```
Subsequent calls with the same parameters load from cache without re-downloading.
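One common way to implement parameter-based caching is to hash the query parameters into a stable key; the sketch below illustrates that idea only and is not geopipe's actual internal logic:

```python
import hashlib
import json

def cache_key(params: dict) -> str:
    """Illustrative: derive a stable cache filename from query parameters."""
    canonical = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()[:12]

key = cache_key({
    "collection": "sentinel-2-l2a",
    "bounds": (-122.5, 37.7, -122.4, 37.8),
    "temporal_range": ("2023-06-01", "2023-08-31"),
})
print(f"data/remote/stac/{key}.tif")  # same parameters -> same filename -> cache hit
```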
Discover available datasets across cloud catalogs before building your pipeline:
```python
from geopipe import discover

# Find nightlights data
results = discover(
    bounds=(-122.5, 37.7, -122.4, 37.8),
    categories=["nightlights"],
)

for r in results:
    print(f"{r.name}: {r.resolution_m}m ({r.provider})")

# Convert discovery result to source
source = results[0].to_source(name="sf_ntl")
schema.add_source(source)
```

```bash
# Search by category
geopipe discover search --category nightlights

# Search by provider
geopipe discover search --provider earthengine --format table

# Get dataset details
geopipe discover info viirs_dnb_monthly

# List all known datasets
geopipe discover list-datasets

# List categories
geopipe discover categories
```

| Category | Description | Example Datasets |
|---|---|---|
| `nightlights` | Nighttime light emissions | VIIRS DNB, DMSP-OLS |
| `optical` | Multispectral imagery | Sentinel-2, Landsat, MODIS |
| `sar` | Synthetic aperture radar | Sentinel-1, ALOS PALSAR |
| `elevation` | Digital elevation models | SRTM, Copernicus DEM |
| `climate` | Weather and climate | ERA5, CHIRPS, TerraClimate |
| `land_cover` | Land use classification | ESA WorldCover, Dynamic World |
| `vegetation` | Vegetation indices | MODIS NDVI/EVI, LAI |
| `population` | Population density | WorldPop, LandScan |
| `infrastructure` | Built environment | GHSL, OSM Buildings |
Discovery results can suggest related datasets:
```python
viirs = discover(categories=["nightlights"])[0]
related = viirs.suggest_complementary()
# Returns: Sentinel-2, Landsat, etc.
```

Audit data quality before running expensive computations:
```python
from geopipe import FusionSchema

schema = FusionSchema.from_yaml("sources.yaml")

# Run quality audit
report = schema.audit()

# Check results
print(report.summary())
print(f"Overall score: {report.overall_score}/100")

if report.has_errors:
    print("Critical issues found!")
    for issue in report.filter(severity="error"):
        print(f"  - {issue.source_name}: {issue.message}")

# Export report
report.to_markdown("quality_report.md")
```

```bash
# Basic audit
geopipe audit schema.yaml

# Save report
geopipe audit schema.yaml --output report.md

# Auto-fix issues
geopipe audit schema.yaml --fix

# Strict mode (fail on warnings)
geopipe audit schema.yaml --strict
```

| Check | Category | Description |
|---|---|---|
| `TemporalOverlapCheck` | Cross-source | Verify all sources have overlapping time ranges |
| `CRSAlignmentCheck` | Cross-source | Check CRS compatibility across sources |
| `BoundsOverlapCheck` | Cross-source | Verify spatial extents overlap |
| `TemporalGapCheck` | Raster | Detect missing dates in time series |
| `SpatialCoverageCheck` | Raster | Report NoData percentage in AOI |
| `MissingValueCheck` | Tabular | Check for nulls in key columns |
| `GeocodingPrecisionCheck` | Tabular | Analyze coordinate precision |
- ERROR: Critical issues that will cause fusion to fail
- WARNING: Issues that may affect data quality (-10 points)
- INFO: Informational notes (-2 points)
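Based on the deductions above, the overall score appears to start at 100 and lose points per issue. A minimal sketch of that scoring rule (illustrative only, not geopipe's implementation; it assumes ERROR issues set `has_errors` rather than a documented point value):

```python
def overall_score(issues):
    """Illustrative scoring rule: start at 100, deduct per issue severity."""
    deductions = {"warning": 10, "info": 2}
    score = 100
    for issue in issues:
        score -= deductions.get(issue["severity"], 0)
    return max(score, 0)

print(overall_score([{"severity": "warning"}, {"severity": "info"}]))  # 88
```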
Define robustness specifications declaratively in YAML for sensitivity analysis:
```yaml
# schema.yaml
name: aid_effects
resolution: 5km
temporal_range: [2010-01-01, 2020-12-31]

sources:
  - type: tabular
    name: conflict
    path: data/acled.csv
    spatial_join: buffer_${buffer_km}km   # Template substitution

output: results/${spec_name}/fused.parquet   # Per-spec output

robustness:
  dimensions:
    buffer_km: [5, 10, 25, 50]
    include_ntl: [true, false]
    temporal_lag: [0, 1, 2]
  exclude:
    - buffer_km: 50
      temporal_lag: 2   # Invalid combination
  named:
    MAIN:
      buffer_km: 10
      include_ntl: true
      temporal_lag: 0
```

This generates 4 x 2 x 3 - 1 = 23 specifications automatically.
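For intuition, spec expansion is essentially a Cartesian product over the dimensions, with excluded combinations then removed. A minimal sketch of the grid construction (illustrative only, not geopipe's internals):

```python
from itertools import product

dimensions = {
    "buffer_km": [5, 10, 25, 50],
    "include_ntl": [True, False],
    "temporal_lag": [0, 1, 2],
}

# Full grid before exclusions: every combination of dimension values
grid = [dict(zip(dimensions, values)) for values in product(*dimensions.values())]
print(len(grid))  # 4 * 2 * 3 = 24; excluded combinations are then removed
```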
```python
from geopipe import FusionSchema

schema = FusionSchema.from_yaml("schema.yaml")

# Expand to spec + configured schema pairs
for spec, configured_schema in schema.expand_specs():
    print(f"Running {spec.name}...")
    configured_schema.execute()
```

```bash
# Preview specifications
geopipe specs list schema.yaml

# Preview with parameters
geopipe specs list schema.yaml --verbose

# Write individual schema files
geopipe specs expand schema.yaml --output-dir schemas/

# Run all specifications
geopipe specs run schema.yaml

# Run specific specs only
geopipe specs run schema.yaml --spec MAIN --spec ROBUST_BUFFER

# Dry run (show what would execute)
geopipe specs run schema.yaml --dry-run
```

Analyze results across all specifications:
```bash
# Generate specification curve from results
geopipe specs curve "results/*/estimates.csv" --output curve.pdf

# Export as LaTeX table
geopipe specs curve "results/*.csv" --format latex

# With custom column names
geopipe specs curve "results/*.csv" \
    --estimate-col treatment_effect \
    --se-col robust_se
```

```python
from geopipe.specs import SpecificationCurve
curve = SpecificationCurve(
    specs=specs,
    results_pattern="results/{spec_name}/estimates.csv",
    estimate_col="treatment_effect",
)

# Summary statistics
summary = curve.compute_summary()
print(f"Mean estimate: {summary['mean_estimate']:.4f}")
print(f"% Significant: {summary['pct_significant_05']:.1f}%")

# Rank dimensions by influence
influence = curve.rank_by_influence()
print(influence[["dimension", "variance_ratio"]])

# Generate plot
curve.plot("figures/spec_curve.pdf")

# Export for publication
print(curve.to_latex())
```

Use `${param}` or `{param}` in schema values:
```yaml
sources:
  - name: conflict
    spatial_join: buffer_${buffer_km}km
output: results/{spec_name}/data.parquet
```
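Both placeholder styles map to standard Python substitution; a minimal sketch of how such templates could be filled for a given spec (illustrative only, not geopipe's own template engine):

```python
from string import Template

params = {"buffer_km": 10, "spec_name": "MAIN"}

# ${param} style
print(Template("buffer_${buffer_km}km").substitute(params))    # buffer_10km

# {param} style
print("results/{spec_name}/data.parquet".format(**params))     # results/MAIN/data.parquet
```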
Remote sources work in YAML schemas:

```yaml
sources:
  - type: earthengine
    name: viirs
    collection: NOAA/VIIRS/DNB/MONTHLY_V1/VCMCFG
    bands: [avg_rad]
    center: [7.5, 9.0]
    patch_size_km: 100
    resolution: 500
    temporal_range: ["2023-01-01", "2023-12-31"]
    output_dir: data/remote

  - type: planetary_computer
    name: sentinel2
    collection: sentinel-2-l2a
    bands: [B04, B08]
    bounds: [-122.5, 37.5, -122.0, 38.0]
    cloud_cover_max: 20
    resolution: 10
    output_dir: data/remote
```

Documentation:

- Data Sources
- Fusion Schemas
- Data Discovery
- Quality Intelligence
- Robustness DSL
- Pipeline Orchestration
- Cluster Computing
MIT License. See LICENSE for details.
If you use geopipe in your research, please cite:
```bibtex
@software{geopipe,
  author = {Jerzak, Connor T.},
  title = {geopipe: Geospatial Data Pipeline Framework},
  year = {2025},
  url = {https://github.com/cjerzak/geopipe-software}
}
```