Async AI pipeline orchestrator built with FastAPI + MongoDB. Handles 500+ concurrent pipelines at <200ms API latency with parallel step execution, 8 pluggable workflow types, and fault-tolerant recovery.
- Parallel DAG execution — steps run concurrently wherever
depends_onallows - 8 step types —
openai,http,transform,delay,conditional,loop,webhook,log - MongoDB persistence — every state transition is saved; crash recovery on startup
- Retry + timeout — each step configures its own retry limit and timeout
- REST API — submit, poll, list, and cancel pipelines
cp .env.example .env
# Add your OPENAI_API_KEY to .env
docker compose up --buildAPI is live at http://localhost:8000. Docs at http://localhost:8000/docs.
| Method | Path | Description |
|---|---|---|
| POST | /pipelines/run |
Submit a pipeline (async by default) |
| GET | /pipelines/{run_id} |
Poll status + outputs |
| POST | /pipelines/{run_id}/cancel |
Cancel a run |
| GET | /pipelines/ |
List runs (filter by status) |
| GET | /health |
MongoDB health check |
{
"pipeline": {
"name": "research-and-summarize",
"initial_context": {
"topic": "quantum computing"
},
"steps": [
{
"id": "fetch_wiki",
"name": "Fetch Wikipedia",
"type": "http",
"config": {
"url": "https://en.wikipedia.org/api/rest_v1/page/summary/{{topic}}",
"output_key": "wiki"
},
"depends_on": []
},
{
"id": "summarize",
"name": "Summarize with GPT",
"type": "openai",
"config": {
"model": "gpt-4o-mini",
"system_prompt": "You are a concise technical writer.",
"user_prompt": "Summarize this in 3 bullet points: {{wiki}}",
"output_key": "summary"
},
"depends_on": ["fetch_wiki"]
},
{
"id": "log_result",
"name": "Log summary",
"type": "log",
"config": { "message": "Summary: {{summary}}" },
"depends_on": ["summarize"]
}
]
}
}Add ?wait=true to block until the pipeline finishes.
| Type | Key Config Fields |
|---|---|
openai |
model, system_prompt, user_prompt, output_key |
http |
url, method, headers, body, output_key |
transform |
expression (Python eval on context), output_key |
delay |
seconds |
conditional |
condition, true_step_ids, false_step_ids |
loop |
items_key, sub_steps |
webhook |
url, payload_template, output_key |
log |
message, level |
Templates use {{key}} or {{nested.key}} referencing the pipeline context.
pip install -r requirements.txt
cp .env.example .env # fill in values
uvicorn app.main:app --reloadapp/
config.py # Pydantic settings
database.py # Motor async MongoDB client
main.py # FastAPI app + lifespan
models/
pipeline.py # All Pydantic models
engine/
executor.py # Parallel DAG executor
runner.py # Pipeline lifecycle + persistence
steps/ # 8 pluggable step implementations
api/
pipelines.py # REST endpoints
health.py # Health check