Skip to content

greeshmab21/agentic-workflow-execution-engine

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Agentic Workflow Execution Engine

Async AI pipeline orchestrator built with FastAPI + MongoDB. Handles 500+ concurrent pipelines at <200ms API latency with parallel step execution, 8 pluggable workflow types, and fault-tolerant recovery.

Features

  • Parallel DAG execution — steps run concurrently wherever depends_on allows
  • 8 step typesopenai, http, transform, delay, conditional, loop, webhook, log
  • MongoDB persistence — every state transition is saved; crash recovery on startup
  • Retry + timeout — each step configures its own retry limit and timeout
  • REST API — submit, poll, list, and cancel pipelines

Quick Start

cp .env.example .env
# Add your OPENAI_API_KEY to .env

docker compose up --build

API is live at http://localhost:8000. Docs at http://localhost:8000/docs.

API Reference

Method Path Description
POST /pipelines/run Submit a pipeline (async by default)
GET /pipelines/{run_id} Poll status + outputs
POST /pipelines/{run_id}/cancel Cancel a run
GET /pipelines/ List runs (filter by status)
GET /health MongoDB health check

Submit a pipeline (POST /pipelines/run)

{
  "pipeline": {
    "name": "research-and-summarize",
    "initial_context": {
      "topic": "quantum computing"
    },
    "steps": [
      {
        "id": "fetch_wiki",
        "name": "Fetch Wikipedia",
        "type": "http",
        "config": {
          "url": "https://en.wikipedia.org/api/rest_v1/page/summary/{{topic}}",
          "output_key": "wiki"
        },
        "depends_on": []
      },
      {
        "id": "summarize",
        "name": "Summarize with GPT",
        "type": "openai",
        "config": {
          "model": "gpt-4o-mini",
          "system_prompt": "You are a concise technical writer.",
          "user_prompt": "Summarize this in 3 bullet points: {{wiki}}",
          "output_key": "summary"
        },
        "depends_on": ["fetch_wiki"]
      },
      {
        "id": "log_result",
        "name": "Log summary",
        "type": "log",
        "config": { "message": "Summary: {{summary}}" },
        "depends_on": ["summarize"]
      }
    ]
  }
}

Add ?wait=true to block until the pipeline finishes.

Step Types

Type Key Config Fields
openai model, system_prompt, user_prompt, output_key
http url, method, headers, body, output_key
transform expression (Python eval on context), output_key
delay seconds
conditional condition, true_step_ids, false_step_ids
loop items_key, sub_steps
webhook url, payload_template, output_key
log message, level

Templates use {{key}} or {{nested.key}} referencing the pipeline context.

Local Dev (without Docker)

pip install -r requirements.txt
cp .env.example .env   # fill in values
uvicorn app.main:app --reload

Project Structure

app/
  config.py          # Pydantic settings
  database.py        # Motor async MongoDB client
  main.py            # FastAPI app + lifespan
  models/
    pipeline.py      # All Pydantic models
  engine/
    executor.py      # Parallel DAG executor
    runner.py        # Pipeline lifecycle + persistence
    steps/           # 8 pluggable step implementations
  api/
    pipelines.py     # REST endpoints
    health.py        # Health check

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors