
wave-executor

Dependency-aware async task scheduling for Python, with three execution modes that compose into a four-level hierarchy.

Task (atomic) → DAGExecutor → WaveReconciler → MetaDAG

Zero required dependencies. Pure asyncio. Python 3.9+.


The problem with level-based task execution

Most async task runners group tasks into "waves" by dependency depth:

A (10s) ─┐
          ├─→ C (1s)
B  (1s) ─┘
D  (1s) ─→ E (1s)

A level-based executor sees depth-0 = {A, B, D} and depth-1 = {C, E}. E must wait 10 seconds for A, even though E only depends on D.

DAGExecutor starts each task the moment its specific dependencies complete. E starts at t=1, completes at t=2 — 9 seconds earlier.
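The timing claim above can be checked with a minimal sketch (not the library's internals): one asyncio.Event per task, each task waiting only on its own dependencies. Durations are scaled down 100x so the demo runs in ~0.1s.

```python
import asyncio

# Hypothetical graph from the diagram, durations scaled 100x down.
GRAPH = {
    "A": (0.10, set()),
    "B": (0.01, set()),
    "C": (0.01, {"A", "B"}),
    "D": (0.01, set()),
    "E": (0.01, {"D"}),
}

async def run(name, done, finished):
    duration, deps = GRAPH[name]
    # Wait only on this task's own dependencies, not a whole depth level.
    await asyncio.gather(*(done[d].wait() for d in deps))
    await asyncio.sleep(duration)
    finished.append(name)
    done[name].set()

async def main():
    done = {n: asyncio.Event() for n in GRAPH}
    finished = []
    await asyncio.gather(*(run(n, done, finished) for n in GRAPH))
    return finished

order = asyncio.run(main())
# E finishes before A (and before C), since it never waits on A.
print(order)
```

With level-based waves, E would appear after A in the completion order; with per-task signalling it does not.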


Installation

pip install wave-executor

DAGExecutor — one-shot dependency graph

import asyncio
from wave_executor import DAGExecutor, WaveConfig

async def process(task_id: str, spec: dict):
    # Your actual work here
    return f"result-{task_id}"

async def main():
    executor = DAGExecutor(config=WaveConfig(max_parallel_per_wave=20))
    dependencies = {
        "analyze": {"fetch_a", "fetch_b"},  # waits for both fetches
        "report":  {"analyze"},
    }
    result = await executor.execute(
        tasks={
            "fetch_a": {"url": "..."},
            "fetch_b": {"url": "..."},
            "analyze": {"model": "fast"},
            "report":  {},
        },
        dependencies=dependencies,
        task_executor=process,
    )

    print(f"{result.total_successful}/{result.total_tasks} succeeded")
    print(f"Critical path: {result.get_critical_path(dependencies)}")

asyncio.run(main())

Key features:

  • asyncio.Event per task — zero-overhead dependency signalling
  • AdaptiveSemaphore — AIMD-controlled concurrency window (self-adjusts under load)
  • Failed deps automatically skip all downstream tasks
  • Optional dep-output injection: completed results are injected into downstream specs under _dep_outputs
  • Exponential-backoff retry, per-task timeout, total execution timeout
  • Cycle detection (Kahn's algorithm) before execution starts
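The cycle check can be sketched with plain Kahn's algorithm (one plausible implementation, not necessarily the library's): if a topological sort cannot consume every task, the leftovers form at least one cycle.

```python
from collections import deque

def has_cycle(dependencies: dict, tasks: set) -> bool:
    """Kahn's algorithm: repeatedly remove zero-indegree nodes;
    any node never removed sits on a cycle."""
    indegree = {t: 0 for t in tasks}
    dependents = {t: [] for t in tasks}
    for task, deps in dependencies.items():
        for dep in deps:
            indegree[task] += 1
            dependents[dep].append(task)

    queue = deque(t for t, d in indegree.items() if d == 0)
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for nxt in dependents[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    return visited != len(tasks)

print(has_cycle({"b": {"a"}}, {"a", "b"}))               # acyclic -> False
print(has_cycle({"a": {"b"}, "b": {"a"}}, {"a", "b"}))   # a <-> b  -> True
```

Running this before execution means a malformed graph fails fast instead of deadlocking mid-run.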

WaveReconciler — desired-state reconciliation

Inspired by stereOS agentd's reconciliation loop. Instead of running once and returning, the reconciler holds a desired state and continuously ensures tasks match it.

from wave_executor import WaveReconciler, WaveConfig
from wave_executor.reconciler import RestartPolicy

async def run_job(task_id: str, spec: dict):
    # Work that might crash and needs restarting
    ...

reconciler = WaveReconciler(
    task_executor=run_job,
    config=WaveConfig(max_parallel_per_wave=10),
    poll_interval_s=2.0,
)

await reconciler.start()

await reconciler.update(
    tasks={
        "validator":  {"port": 8080},
        "aggregator": {"upstream": "validator"},
    },
    dependencies={"aggregator": {"validator"}},
    policies={
        "validator":  "always",      # restart after any exit
        "aggregator": "on-failure",  # restart only on crash
    },
    max_restart_attempts={"validator": 100, "aggregator": 5},
    restart_backoff_base=3.0,   # exponential: 3s, 6s, 12s, 24s... between restarts
)

# Hot-swap the spec while running — unchanged tasks are undisturbed
await reconciler.update(tasks={"validator": {"port": 9090}, "aggregator": ...})

# Wait for all non-ALWAYS tasks to reach terminal state
ok = await reconciler.wait_until_complete(timeout=60)
await reconciler.stop()

Key features:

  • Three restart policies: no / on-failure / always
  • Exponential backoff: base * 2^(n-1) seconds between retries
  • Hot-update desired state without stopping everything
  • max_restart_attempts ceiling — task marked "blocked" when exceeded
  • Dependency-aware: tasks only start once upstream deps have "success" status
  • Dep-output injection: outputs from completed deps injected into downstream specs
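The documented backoff formula (base * 2^(n-1) seconds before restart attempt n) works out to the following schedule; the helper name is illustrative, not part of the API.

```python
def restart_delays(base: float, attempts: int) -> list:
    # base * 2^(n-1) seconds before restart attempt n, per the formula above.
    return [base * 2 ** (n - 1) for n in range(1, attempts + 1)]

print(restart_delays(3.0, 4))  # [3.0, 6.0, 12.0, 24.0]
```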

MetaDAG — DAG of DAGs

Orchestrate multiple DAGExecutors and WaveReconcilers as nodes in a higher-level dependency graph. Upstream node outputs are injected into downstream nodes' task specs.

from wave_executor import MetaDAG
from wave_executor.meta import MetaDAGNodeSpec

async def fetch(task_id, spec): ...
async def analyze(task_id, spec): ...
async def store(task_id, spec): ...

meta = (
    MetaDAG(inject_dep_outputs=True)
    .add_node(MetaDAGNodeSpec(
        node_id="ingest",
        tasks={"page_a": {"url": "..."}, "page_b": {"url": "..."}},
        task_executor=fetch,
    ))
    .add_node(MetaDAGNodeSpec(
        node_id="analyze",
        tasks={"report": {}},
        task_executor=analyze,   # receives _upstream_outputs["ingest"]
    ), depends_on=["ingest"])
    .add_node(MetaDAGNodeSpec(
        node_id="store",
        tasks={"db_write": {}},
        task_executor=store,
    ), depends_on=["analyze"])
)

result = await meta.execute()
print(result.to_dict())

Nodes can also be WaveReconciler instances (set node_type="reconciler"), which run until all their tasks complete before the node is considered done.


Configuration

from wave_executor import WaveConfig

config = WaveConfig(
    max_parallel_per_wave=50,      # global concurrency cap
    default_task_timeout=30.0,     # per-task timeout (seconds)
    total_execution_timeout=600.0, # hard limit for the whole execution
    max_retries=3,                 # exponential-backoff retries
    retry_backoff_factor=2.0,
    adaptive_batch_window=True,    # AIMD concurrency auto-scaling
    min_parallel=2,                # floor for adaptive window
    saturation_threshold=20,       # waiting tasks that trigger shrink
)
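A single AIMD step can be sketched as below. This is an assumption about how the adaptive window behaves (additive increase by one, multiplicative decrease by half when the waiting-task backlog crosses saturation_threshold), using WaveConfig's parameter names for illustration; the library's actual controller may differ.

```python
def aimd_step(window: int, waiting: int, min_parallel: int = 2,
              max_parallel: int = 50, saturation_threshold: int = 20) -> int:
    # Multiplicative decrease: halve the window when too many tasks wait.
    if waiting > saturation_threshold:
        return max(min_parallel, window // 2)
    # Additive increase: grow by one while the backlog is small.
    return min(max_parallel, window + 1)

window = 16
window = aimd_step(window, waiting=25)  # saturated -> shrink to 8
window = aimd_step(window, waiting=3)   # calm -> grow to 9
print(window)
```

The AIMD shape (slow growth, fast shrink) is the same feedback rule TCP congestion control uses, which is why it converges under contention.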

Observability

Subscribe to execution events on any level:

executor = DAGExecutor()

@executor.on_event
async def handle(event_type: str, event: dict):
    print(event_type, event["task_id"], event.get("duration_ms"))

# Events: EXECUTION_STARTED, TASK_STARTED, TASK_COMPLETED,
#         TASK_FAILED, TASK_SKIPPED, EXECUTION_COMPLETED

MetaDAG fires additional events: META_EXECUTION_STARTED, NODE_STARTED, NODE_COMPLETED, NODE_SKIPPED, META_EXECUTION_COMPLETED.


Running tests

pip install wave-executor[dev]
pytest

Changelog

v0.3.0 (2026-03-08)

Structured escalation reports

When a task exhausts all retries or hits a timeout, DAGExecutor now emits an EscalationReport instead of a bare error string. The report is stored under task_result.output["escalation"] and contains:

  • failure_history — error message from every attempt
  • root_cause — heuristic: consistent errors across attempts → "code bug or bad input spec"; circuit trip → "downstream service unavailable"
  • resolution_options — suggested next actions
  • circuit_state — circuit breaker state at escalation time
  • escalated_at — ISO timestamp

A TASK_ESCALATED event is also fired to all registered on_event callbacks.

Circuit breaker HALF_OPEN pre-entry guard

_check_circuit_breaker_state() is now called at the top of every retry iteration (attempt > 0), before the task executor and before any backoff sleep. It fast-fails if the circuit is already HALF_OPEN (a probe is in flight — additional retries must not pile on as extra probes) or OPEN with recovery timeout not yet elapsed. This makes the single-probe recovery semantics strict rather than advisory.
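The guard's decision logic can be sketched like this. State names follow the changelog; the function and field names are illustrative, not the library's internals.

```python
import time

CLOSED, OPEN, HALF_OPEN = "closed", "open", "half_open"

def may_retry(state: str, opened_at: float, recovery_timeout_s: float,
              now: float = None) -> bool:
    """Pre-entry guard run at the top of each retry iteration."""
    now = time.monotonic() if now is None else now
    if state == HALF_OPEN:
        # A probe is already in flight; extra retries must not pile on.
        return False
    if state == OPEN:
        # Only proceed (as the single probe) once the recovery window elapsed.
        return now - opened_at >= recovery_timeout_s
    return True  # CLOSED: retry normally

print(may_retry(OPEN, opened_at=0.0, recovery_timeout_s=30.0, now=10.0))  # False
print(may_retry(OPEN, opened_at=0.0, recovery_timeout_s=30.0, now=31.0))  # True
```

Checking before the backoff sleep, not after, is what keeps a queued retry from waking up and probing an already-probed circuit.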

EscalationReport is now exported from the top-level package.


v0.2.0

  • Circuit breaker (CLOSED → OPEN → HALF_OPEN → CLOSED lifecycle) on DAGExecutor
  • Checkpoint persistence on task failure
  • AdaptiveSemaphore AIMD concurrency control

v0.1.0

  • Initial release: DAGExecutor, WaveReconciler, MetaDAG
  • Kahn's algorithm cycle detection
  • Dep-output injection, exponential-backoff retry, per-task timeout
  • on_event observability callbacks

License

MIT
