Issue 3: Queue + persistence layer #622

@gewenyu99

Epic: Task-queue orchestrator runner · Depends on: none (parallel with #621) · PR: #607

Why

The queue is the spine of the orchestrator. It lives in memory for the duration
of the run, and the in-memory copy is the source of truth. The file on disk is a
reflection of it, written asynchronously and kept in sync. Disk gives us the
audit log, observability, and a base for resume later.

Resume across runs and crashes is tracked in #629. This issue only mirrors the
queue to disk; it does not read a leftover file back on the next run. The schema
below is resume-ready so that #629 stays additive.

Scope / deliverable

  1. QueueStore (queue.ts) holds the queue in memory as an array of
    QueuedTask, which is the source of truth, and reflects it to disk:
    <installDir>/.posthog-wizard/
      queue.json              async reflection of the in-memory queue
      audit.jsonl             append-only event log (enqueue, start, complete, fail)
      handoffs/<taskId>.json  per-task structured handoff for downstream tasks
    
    Cross-agent context travels in the structured per-task handoffs (schema in #623):
    what the task did, what its goal was, and what the next agent should know. File
    names follow the existing .posthog-audit-checks.json and .posthog-events.json
    conventions.
  2. Task record schema:
    interface QueuedTask {
      id: string;                          // stable id
      type: string;                        // resolves to an agent prompt (#625)
      status: 'pending'|'in_progress'|'done'|'failed'|'blocked';
      dependsOn: string[];                 // task ids
      inputs: Record<string, unknown>;
      model?: string;
      attempts: number;
      maxAttempts: number;                 // default 2
      handoffPath?: string;
      enqueuedBy: 'orchestrator' | string; // task id, for dynamic enqueue + guards
      depth: number;                       // termination guard
      createdAt: string;
      startedAt?: string;
      finishedAt?: string;
      error?: { type: string; message: string };
    }
    interface QueueFile { version: 1; runId: string; tasks: QueuedTask[]; }
  3. In-memory transitions, async reflection. Every transition does three things:
    it mutates the in-memory array under a makeMutex lock, so concurrent tasks do
    not race when the concurrency cap is above 1; it appends an event to
    audit.jsonl; and it schedules an async flush of the whole queue to queue.json.
    The flush is debounced and coalesced, following the TaskStreamPush 250 ms
    debounce pattern (task-stream-push.ts), so rapid transitions do not thrash the
    disk, and each write is atomic via writeLedgerAtomic. A transition never blocks
    on the disk write. On run end, a final synchronous flush leaves the file
    matching memory. Lift writeLedgerAtomic (wizard-tools.ts:375) and makeMutex
    (wizard-tools.ts:481) into a shared src/utils/atomic-ledger.ts, and update the
    audit tools to import from the new location.
  4. Each run starts a new queue with a new runId. A leftover queue.json is
    overwritten by the first flush. Reading a leftover queue back is deferred to
    #629 (resume across runs/crashes). Keep the version field stamped for future
    schema-drift handling.
  5. Selection helpers: nextRunnable() (pending tasks whose dependsOn are
    all done, respecting the concurrency cap), allTerminal(), summary().
    Within a run, a failed task may retry while attempts < maxAttempts.
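The selection helpers and the within-run retry rule in item 5 can be sketched as pure functions over the task array. This is a hypothetical sketch, not the planned API: the `cap` parameter, the `startTask`/`failTask`/`completeTask` helpers, treating 'done', 'failed', and 'blocked' as terminal, and incrementing `attempts` when a task starts are all assumptions, and `Task` is a trimmed-down QueuedTask.

```typescript
// Hypothetical sketch of the item 5 helpers. Signatures, the `cap` parameter,
// which statuses are terminal, and "attempts++ on start" are assumptions.
type Status = 'pending' | 'in_progress' | 'done' | 'failed' | 'blocked';

// Trimmed-down QueuedTask with only the fields selection needs.
interface Task {
  id: string;
  status: Status;
  dependsOn: string[];
  attempts: number;
  maxAttempts: number;
}

// Pending tasks whose dependencies are all 'done', limited so that
// in-progress tasks plus the returned ones never exceed `cap`.
function nextRunnable(tasks: Task[], cap: number): Task[] {
  const byId = new Map(tasks.map((t) => [t.id, t] as const));
  const inFlight = tasks.filter((t) => t.status === 'in_progress').length;
  const ready = tasks.filter(
    (t) =>
      t.status === 'pending' &&
      t.dependsOn.every((dep) => byId.get(dep)?.status === 'done'),
  );
  return ready.slice(0, Math.max(0, cap - inFlight));
}

// Assumed terminal statuses; a retryable failure goes back to 'pending'.
function allTerminal(tasks: Task[]): boolean {
  return tasks.every(
    (t) => t.status === 'done' || t.status === 'failed' || t.status === 'blocked',
  );
}

// Count of tasks per status.
function summary(tasks: Task[]): Record<Status, number> {
  const counts: Record<Status, number> = {
    pending: 0, in_progress: 0, done: 0, failed: 0, blocked: 0,
  };
  for (const t of tasks) counts[t.status] += 1;
  return counts;
}

function startTask(task: Task): void {
  task.status = 'in_progress';
  task.attempts += 1; // assumption: an attempt is counted when it starts
}

function completeTask(task: Task): void {
  task.status = 'done';
}

// Within-run retry: back to 'pending' while attempts < maxAttempts.
function failTask(task: Task): void {
  task.status = task.attempts < task.maxAttempts ? 'pending' : 'failed';
}
```

With maxAttempts at its default of 2, a task gets two attempts in total under this rule. Note that a pending task whose dependency ended 'failed' is never returned by nextRunnable here, so some rule (for example moving it to 'blocked') has to settle it before allTerminal() can return true; the issue does not yet specify that rule.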
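The debounced, coalesced flush from item 3 can be sketched with an injected writer so the logic stays independent of the file system. `createFlusher`, `schedule`, and `flushNow` are hypothetical names; in the runner, `write` would presumably wrap the lifted writeLedgerAtomic, whose signature is not shown in this issue. The 250 ms default only mirrors the TaskStreamPush figure quoted in item 3; this is not that code.

```typescript
// Hypothetical sketch of item 3's debounced, coalesced flush. createFlusher
// and the injected `write` callback are illustrative; the real writer would
// wrap writeLedgerAtomic, whose exact signature is not shown in the issue.
type Writer = (contents: string) => Promise<void>;

function createFlusher(snapshot: () => unknown, write: Writer, delayMs = 250) {
  let timer: ReturnType<typeof setTimeout> | undefined;
  // Writes are chained so an older snapshot never lands after a newer one.
  let chain: Promise<void> = Promise.resolve();

  // Write now. The snapshot is taken when this write actually starts, so it
  // always reflects the latest in-memory queue.
  function flushNow(): Promise<void> {
    if (timer !== undefined) {
      clearTimeout(timer);
      timer = undefined;
    }
    const next = chain
      .catch(() => undefined) // a failed earlier write must not block this one
      .then(() => write(JSON.stringify(snapshot(), null, 2)));
    chain = next;
    return next;
  }

  // Called after every transition. Returns immediately; each call resets the
  // timer, so a burst of transitions collapses into a single write.
  function schedule(): void {
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(() => {
      timer = undefined;
      flushNow().catch(() => undefined); // a real runner would record this failure
    }, delayMs);
  }

  return { schedule, flushNow };
}
```

At run end the runner would `await flusher.flushNow()`, the sketch's equivalent of the final flush, so queue.json matches memory. A pure trailing debounce can postpone the write for as long as transitions keep arriving faster than `delayMs`; if mid-run observability matters, a max-wait bound is a common addition.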

Key files

  • new src/lib/programs/orchestrator/queue.ts (with unit tests)
  • new src/utils/atomic-ledger.ts (lifted writeLedgerAtomic and makeMutex)
  • src/lib/wizard-tools.ts (import the lifted helpers, no behavior change)

Acceptance criteria

  • Unit tests cover enqueue, dependency-aware nextRunnable, each status
    transition, and within-run retry while attempts < maxAttempts.
  • Concurrent transitions serialize via the mutex with no lost updates, even at
    concurrency above 1.
  • The in-memory queue is the source of truth. queue.json is an async
    reflection that converges to match it, and a final flush on run end leaves
    the file matching memory.
  • audit.jsonl and the handoffs are written and readable mid-run.
  • A new run starts from an empty in-memory queue and overwrites any leftover
    queue.json. Reading it back is deferred to #629.
  • The audit tools pass after importing the lifted helpers.
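The "no lost updates" criterion can be exercised with a read-modify-write that yields in the middle, which would drop updates if two transitions interleaved. The sketch uses a hypothetical promise-chain lock, `createMutex`, as a stand-in for makeMutex (its real signature at wizard-tools.ts:481 is not shown here); `enqueue`, `queue`, and `auditLog` are also illustrative.

```typescript
// Hypothetical stand-in for makeMutex: each caller waits for the previous
// critical section to settle before running its own.
function createMutex() {
  let tail: Promise<unknown> = Promise.resolve();
  return function runExclusive<T>(fn: () => T | Promise<T>): Promise<T> {
    const run = tail.then(() => fn());
    // Swallow errors on the chain only, so one failed transition does not
    // wedge the lock; the caller still sees the rejection via `run`.
    tail = run.catch(() => undefined);
    return run;
  };
}

// Illustrative shared state: the in-memory queue and the lines that would be
// appended to audit.jsonl.
type Entry = { id: string; status: 'pending' | 'in_progress' | 'done' };
let queue: Entry[] = [];
const auditLog: string[] = [];
const withLock = createMutex();

// Read-modify-write with an await in the middle. Without the lock, concurrent
// calls would all read the same snapshot and overwrite each other.
async function enqueue(id: string): Promise<void> {
  await withLock(async () => {
    const snapshot = queue;
    await Promise.resolve(); // stands in for async work such as an audit append
    queue = [...snapshot, { id, status: 'pending' }];
    auditLog.push(JSON.stringify({ event: 'enqueue', taskId: id }));
  });
}
```

A promise-chain lock like this runs critical sections in call order; whether makeMutex gives the same ordering guarantee should be checked against its implementation before a test relies on it.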
