From 71127485c64a25e43b24de17e4aee1788954e5cb Mon Sep 17 00:00:00 2001 From: "Vincent (Wen Yu) Ge" Date: Mon, 8 Jun 2026 21:50:30 -0400 Subject: [PATCH 1/5] feat(orchestrator): flag gating + shared bootstrap extraction Co-Authored-By: Claude Opus 4.8 (1M context) --- .../agent/__tests__/variant-gating.test.ts | 36 ++++++ src/lib/agent/agent-interface.ts | 12 ++ src/lib/agent/agent-runner.ts | 116 +++++++++++++++--- src/lib/constants.ts | 2 + .../orchestrator/orchestrator-runner.ts | 33 +++++ 5 files changed, 181 insertions(+), 18 deletions(-) create mode 100644 src/lib/agent/__tests__/variant-gating.test.ts create mode 100644 src/lib/programs/orchestrator/orchestrator-runner.ts diff --git a/src/lib/agent/__tests__/variant-gating.test.ts b/src/lib/agent/__tests__/variant-gating.test.ts new file mode 100644 index 00000000..699bd096 --- /dev/null +++ b/src/lib/agent/__tests__/variant-gating.test.ts @@ -0,0 +1,36 @@ +import { + buildWizardMetadata, + isOrchestratorEnabled, +} from '@lib/agent/agent-interface'; + +describe('isOrchestratorEnabled', () => { + it('is true only when the wizard-orchestrator flag is true', () => { + expect(isOrchestratorEnabled({ 'wizard-orchestrator': 'true' })).toBe(true); + }); + + it('is false when the flag is false, another flag, or absent', () => { + expect(isOrchestratorEnabled({ 'wizard-orchestrator': 'false' })).toBe( + false, + ); + expect(isOrchestratorEnabled({ 'wizard-variant': 'orchestrator' })).toBe( + false, + ); + expect(isOrchestratorEnabled({})).toBe(false); + expect(isOrchestratorEnabled()).toBe(false); + }); +}); + +describe('buildWizardMetadata', () => { + it('selects a known variant header from the flag', () => { + expect(buildWizardMetadata({ 'wizard-variant': 'subagents' })).toEqual({ + VARIANT: 'subagents', + }); + }); + + it('falls back to the base variant for unknown or missing flags', () => { + expect(buildWizardMetadata({ 'wizard-variant': 'nope' })).toEqual({ + VARIANT: 'base', + }); + expect(buildWizardMetadata({})).toEqual({ VARIANT: 'base' }); + }); +}); diff --git a/src/lib/agent/agent-interface.ts b/src/lib/agent/agent-interface.ts index 8c3be73e..35c45d03 100644 --- a/src/lib/agent/agent-interface.ts +++ b/src/lib/agent/agent-interface.ts @@ -14,6 +14,7 @@ import { POSTHOG_PROPERTY_HEADER_PREFIX, WIZARD_VARIANT_FLAG_KEY, WIZARD_VARIANTS, + WIZARD_ORCHESTRATOR_FLAG_KEY, WIZARD_USER_AGENT, } from '@lib/constants'; import { @@ -363,6 +364,17 @@ export function buildWizardMetadata( return { ...variant }; } +/** + * Whether this run uses the experimental task-queue orchestrator. Gated by the + * boolean `wizard-orchestrator` feature flag, targeted to the user in the wizard's + * analytics project. + */ +export function isOrchestratorEnabled( + flags: Record = {}, +): boolean { + return flags[WIZARD_ORCHESTRATOR_FLAG_KEY] === 'true'; +} + /** * Build env for the SDK subprocess: process.env plus ANTHROPIC_CUSTOM_HEADERS, which always * includes `x-posthog-use-bedrock-fallback: true` so the LLM gateway falls back to Bedrock on diff --git a/src/lib/agent/agent-runner.ts b/src/lib/agent/agent-runner.ts index 78bb01e9..da8eb018 100644 --- a/src/lib/agent/agent-runner.ts +++ b/src/lib/agent/agent-runner.ts @@ -9,9 +9,11 @@ * - What MCP servers and package manager detector to use * - What happens after the agent completes * - * The pipeline itself is fixed: - * init → health check → settings → OAuth → [skill install] → - * agent init → prompt → run → errors → [postRun] → outro + * The pipeline runs a shared bootstrap (logging, health check, settings, OAuth, + * flags, MCP url), then forks. The `orchestrator` variant routes to the + * experimental task-queue runner. Every other variant runs the fixed linear + * pipeline: + * [skill install] → agent init → prompt → run → errors → [postRun] → outro */ import { @@ -29,10 +31,12 @@ import { AgentErrorType, AgentSignals, buildWizardMetadata, + isOrchestratorEnabled, checkAllSettingsConflicts, backupAndFixClaudeSettings, restoreClaudeSettings, } from './agent-interface'; +import { runOrchestrator } from '../programs/orchestrator/orchestrator-runner'; import { getCloudUrlFromRegion } from '@utils/urls'; import { evaluateWizardReadiness, @@ -51,7 +55,7 @@ import { getSkillsBaseUrl } from '@lib/constants'; import { runtimeEnv } from '@env'; import { installSkillById, type InstallSkillResult } from '@lib/wizard-tools'; import { createWizardAskBridge } from '@lib/wizard-ask-bridge'; -import type { WizardRunOptions } from '@utils/types'; +import type { WizardRunOptions, CloudRegion } from '@utils/types'; import type { ProgramConfig } from '@lib/programs/program-step'; import { assemblePrompt, type PromptContext } from './agent-prompt'; @@ -106,7 +110,7 @@ export interface ProgramRun { buildOutroData?: ( session: WizardSession, credentials: Credentials, - cloudRegion: import('@utils/types').CloudRegion | undefined, + cloudRegion: CloudRegion | undefined, ) => WizardSession['outroData']; /** * Per-run cap on `wizard_ask` invocations. Defaults to 10. The 4th call @@ -115,6 +119,23 @@ export interface ProgramRun { maxQuestions?: number; } +/** + * Result of the shared bootstrap, consumed by both the linear and the + * orchestrator arm. Credentials, role, and user are already applied to the + * session by `bootstrapProgram`; this carries the values both arms still need. + */ +export interface BootstrapResult { + skillsBaseUrl: string; + projectApiKey: Credentials['projectApiKey']; + host: Credentials['host']; + accessToken: Credentials['accessToken']; + projectId: Credentials['projectId']; + cloudRegion: CloudRegion; + mcpUrl: string; + wizardFlags: Record; + wizardMetadata: Record; +} + // ── Helpers ────────────────────────────────────────────────────────── /** @@ -170,16 +191,35 @@ export async function runAgent( /** * Run a program's agent pipeline. * - * This is the single execution path for all programs — both skill-based - * (revenue analytics) and framework-based (core integration). The - * `ProgramRun` controls what varies between them; `programConfig` carries - * the program-level static metadata (tool allow/disallow lists, etc.). + * Runs the shared bootstrap, then forks on the `wizard-variant` flag. The + * `orchestrator` variant routes to the experimental task-queue runner; every + * other variant runs the linear pipeline. */ export async function runProgram( session: WizardSession, config: ProgramRun, programConfig: ProgramConfig, ): Promise { + const boot = await bootstrapProgram(session, config, programConfig); + + if (isOrchestratorEnabled(boot.wizardFlags)) { + return runOrchestrator(session, programConfig, boot); + } + + return runLinearProgram(session, config, programConfig, boot); +} + +/** + * Shared setup for both arms: logging, health check, settings conflicts, OAuth + * and credentials, then the feature flags, variant metadata, and MCP url. Sets + * `session.credentials`, role, and user as a side effect. Returns the values the + * arms still need. + */ +async function bootstrapProgram( + session: WizardSession, + config: ProgramRun, + programConfig: ProgramConfig, +): Promise { // 1. Init logging + debug initLogFile(); session.skillId = config.skillId ?? config.integrationLabel; @@ -283,6 +323,55 @@ export async function runProgram( analytics.setGroups(groupsFromUser(user, host)); + // Feature flags, variant metadata, and MCP url. Both arms need these, and the + // fork decision reads the flags. + const wizardFlags = await analytics.getAllFlagsForWizard(); + const wizardMetadata = buildWizardMetadata(wizardFlags); + + const mcpUrl = session.localMcp + ? 'http://localhost:8787/mcp' + : runtimeEnv('MCP_URL') || + (cloudRegion === 'eu' + ? 'https://mcp-eu.posthog.com/mcp' + : 'https://mcp.posthog.com/mcp'); + + return { + skillsBaseUrl, + projectApiKey, + host, + accessToken, + projectId, + cloudRegion, + mcpUrl, + wizardFlags, + wizardMetadata, + }; +} + +/** + * The linear pipeline. Single execution path for all non-orchestrator programs, + * both skill-based (revenue analytics) and framework-based (core integration). + * The `ProgramRun` controls what varies between them; `programConfig` carries the + * program-level static metadata (tool allow/disallow lists, etc.). + */ +async function runLinearProgram( + session: WizardSession, + config: ProgramRun, + programConfig: ProgramConfig, + boot: BootstrapResult, +): Promise { + const { + skillsBaseUrl, + projectApiKey, + host, + accessToken, + projectId, + cloudRegion, + mcpUrl, + wizardFlags, + wizardMetadata, + } = boot; + // 5. Skill install (if skillId provided) let skillPath: string | undefined; if (config.skillId) { @@ -302,15 +391,6 @@ export async function runProgram( // 6. Initialize agent const spinner = getUI().spinner(); - const wizardFlags = await analytics.getAllFlagsForWizard(); - const wizardMetadata = buildWizardMetadata(wizardFlags); - - const mcpUrl = session.localMcp - ? 'http://localhost:8787/mcp' - : runtimeEnv('MCP_URL') || - (cloudRegion === 'eu' - ? 'https://mcp-eu.posthog.com/mcp' - : 'https://mcp.posthog.com/mcp'); const restoreSettings = () => restoreClaudeSettings(session.installDir); getUI().onEnterScreen('outro', restoreSettings); diff --git a/src/lib/constants.ts b/src/lib/constants.ts index bb7f3580..9f7357c3 100644 --- a/src/lib/constants.ts +++ b/src/lib/constants.ts @@ -141,6 +141,8 @@ export const WIZARD_INTERACTION_EVENT_NAME = 'wizard interaction'; export const WIZARD_REMARK_EVENT_NAME = 'wizard remark'; /** Feature flag key whose value selects a variant from WIZARD_VARIANTS. */ export const WIZARD_VARIANT_FLAG_KEY = 'wizard-variant'; +/** Boolean feature flag that routes a run to the experimental orchestrator runner. */ +export const WIZARD_ORCHESTRATOR_FLAG_KEY = 'wizard-orchestrator'; /** Feature flag key that gates the intro-screen "Tools" menu. */ export const WIZARD_TOOLS_MENU_FLAG_KEY = 'wizard-tools-menu'; /** Variant key -> metadata for wizard run (VARIANT flag selects which entry to use). */ diff --git a/src/lib/programs/orchestrator/orchestrator-runner.ts b/src/lib/programs/orchestrator/orchestrator-runner.ts new file mode 100644 index 00000000..f14fd9e6 --- /dev/null +++ b/src/lib/programs/orchestrator/orchestrator-runner.ts @@ -0,0 +1,33 @@ +/** + * Experimental task-queue orchestrator runner. + * + * Branches from the linear runner when the `wizard-orchestrator` feature flag is + * on. The shape: an orchestrator agent inspects the repo and seeds an + * in-memory task queue, and an executor drains it one fresh agent per task. + * + * This is the stub. It logs, emits a start event, and returns. The queue, the + * executor, and the seeding agent land in the following issues. + */ +import type { WizardSession } from '../../wizard-session'; +import type { ProgramConfig } from '../program-step'; +import type { BootstrapResult } from '../../agent/agent-runner'; +import { getUI } from '../../../ui'; +import { logToFile } from '../../../utils/debug'; +import { analytics } from '../../../utils/analytics'; + +export function runOrchestrator( + session: WizardSession, + programConfig: ProgramConfig, + _boot: BootstrapResult, +): Promise { + logToFile( + `[orchestrator] START program=${programConfig.id} dir=${session.installDir}`, + ); + analytics.wizardCapture('orchestrator started', { + program_id: programConfig.id, + }); + getUI().log.info( + 'Orchestrator flag is on. This runner is a stub for now; the queue and executor land in the following issues.', + ); + return Promise.resolve(); +} From 48e0be9e4aaa4ec0405dcd268dfb1dcd235ecf38 Mon Sep 17 00:00:00 2001 From: "Vincent (Wen Yu) Ge" Date: Mon, 8 Jun 2026 22:15:13 -0400 Subject: [PATCH 2/5] feat(orchestrator): in-memory queue + disk persistence (QueueStore) Co-Authored-By: Claude Opus 4.8 (1M context) --- .../orchestrator/__tests__/queue.test.ts | 135 ++++++++++ src/lib/programs/orchestrator/queue.ts | 231 ++++++++++++++++++ src/lib/wizard-tools.ts | 23 +- src/utils/atomic-ledger.ts | 29 +++ 4 files changed, 398 insertions(+), 20 deletions(-) create mode 100644 src/lib/programs/orchestrator/__tests__/queue.test.ts create mode 100644 src/lib/programs/orchestrator/queue.ts create mode 100644 src/utils/atomic-ledger.ts diff --git a/src/lib/programs/orchestrator/__tests__/queue.test.ts b/src/lib/programs/orchestrator/__tests__/queue.test.ts new file mode 100644 index 00000000..4a18dee2 --- /dev/null +++ b/src/lib/programs/orchestrator/__tests__/queue.test.ts @@ -0,0 +1,135 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { + QueueStore, + type QueueFile, + type TaskHandoff, +} from '@lib/programs/orchestrator/queue'; + +function tmpDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-test-')); +} + +describe('QueueStore', () => { + let dir: string; + let q: QueueStore; + + beforeEach(() => { + dir = tmpDir(); + q = new QueueStore(dir, 'run-1'); + }); + + afterEach(() => { + fs.rmSync(dir, { recursive: true, force: true }); + }); + + it('enqueues a pending task with defaults', () => { + const t = q.enqueue({ type: 'install' }); + expect(t.status).toBe('pending'); + expect(t.attempts).toBe(0); + expect(t.maxAttempts).toBe(2); + expect(t.enqueuedBy).toBe('orchestrator'); + expect(t.dependsOn).toEqual([]); + expect(q.list()).toHaveLength(1); + }); + + it('only marks a task runnable once its dependencies are done', () => { + const a = q.enqueue({ type: 'install' }); + const b = q.enqueue({ type: 'init', dependsOn: [a.id] }); + + expect(q.nextRunnable().map((t) => t.id)).toEqual([a.id]); + + q.start(a.id); + q.complete(a.id); + expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]); + }); + + it('returns every runnable task; the graph alone decides parallelism', () => { + const a = q.enqueue({ type: 'install' }); + const b = q.enqueue({ type: 'init' }); + q.enqueue({ type: 'capture', dependsOn: [a.id, b.id] }); + + // Both independent tasks are runnable at once; the dependent one is not. + expect( + q + .nextRunnable() + .map((t) => t.id) + .sort(), + ).toEqual([a.id, b.id].sort()); + + q.start(a.id); + // An in-progress task is no longer offered. + expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]); + }); + + it('treats a skipped dependency as satisfied', () => { + const a = q.enqueue({ type: 'install' }); + const b = q.enqueue({ type: 'init', dependsOn: [a.id] }); + + q.start(a.id); + q.skip(a.id); + expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]); + }); + + it('start increments attempts and supports within-run retry while attempts remain', () => { + const t = q.enqueue({ type: 'install', maxAttempts: 2 }); + q.start(t.id); + expect(q.get(t.id)?.attempts).toBe(1); + + q.fail(t.id, { type: 'API_ERROR', message: 'boom' }); + expect(q.get(t.id)?.status).toBe('failed'); + + // Retry: attempts (1) < maxAttempts (2), so requeue and run again. + q.requeue(t.id); + expect(q.get(t.id)?.status).toBe('pending'); + q.start(t.id); + expect(q.get(t.id)?.attempts).toBe(2); + }); + + it('completing a task records and reads back a structured handoff', () => { + const t = q.enqueue({ type: 'install' }); + const handoff: TaskHandoff = { + goals: 'install the sdk', + did: 'added posthog-js', + forNextAgent: 'env vars not set yet', + filesTouched: ['package.json'], + }; + q.start(t.id); + q.complete(t.id, handoff); + + expect(q.get(t.id)?.status).toBe('done'); + expect(q.readHandoff(t.id)).toEqual(handoff); + expect(q.readHandoffsByType('install')).toEqual([handoff]); + }); + + it('is drained when a pending task is blocked by a failed dependency', () => { + const a = q.enqueue({ type: 'install' }); + q.enqueue({ type: 'init', dependsOn: [a.id] }); + + expect(q.isDrained()).toBe(false); + q.start(a.id); + q.fail(a.id, { type: 'API_ERROR', message: 'boom' }); + + // init can never run now, and nothing is in progress. + expect(q.nextRunnable()).toHaveLength(0); + expect(q.isDrained()).toBe(true); + }); + + it('reflects every transition to queue.json, handoffs included', () => { + const a = q.enqueue({ type: 'install' }); + q.start(a.id); + q.complete(a.id, { + goals: 'g', + did: 'd', + forNextAgent: 'n', + }); + + const file = JSON.parse(fs.readFileSync(q.queuePath, 'utf8')) as QueueFile; + expect(file.version).toBe(1); + expect(file.runId).toBe('run-1'); + expect(file.tasks).toHaveLength(1); + expect(file.tasks[0].status).toBe('done'); + expect(file.tasks[0].handoff?.did).toBe('d'); + }); +}); diff --git a/src/lib/programs/orchestrator/queue.ts b/src/lib/programs/orchestrator/queue.ts new file mode 100644 index 00000000..0ac9cb46 --- /dev/null +++ b/src/lib/programs/orchestrator/queue.ts @@ -0,0 +1,231 @@ +/** + * The orchestrator task queue. + * + * In memory, synchronous, single-owner: one Node process drives the run, so + * there is no locking. The queue imposes no execution policy — `nextRunnable` + * returns every pending task whose dependencies are satisfied, and how many of + * those run at once is decided by the task graph, not the queue. + * + * Every transition rewrites `/.posthog-wizard/queue.json`, a small + * file holding the whole queue, handoffs included. Today it is the run's + * log and the report's source; later it is the resume point. + */ +import * as fs from 'fs'; +import * as path from 'path'; +import { randomUUID } from 'crypto'; +import { writeJsonAtomic } from '../../../utils/atomic-ledger'; + +export type TaskStatus = + | 'pending' + | 'in_progress' + | 'done' + | 'skipped' + | 'failed'; + +export interface QueuedTask { + id: string; + type: string; + status: TaskStatus; + dependsOn: string[]; + inputs: Record; + model?: string; + attempts: number; + maxAttempts: number; + /** The structured handoff the task reported on completion. */ + handoff?: TaskHandoff; + /** 'orchestrator' for seeded tasks, or the id of the task that enqueued this one. */ + enqueuedBy: string; + createdAt: string; + startedAt?: string; + finishedAt?: string; + error?: { type: string; message: string }; +} + +export interface QueueFile { + version: 1; + runId: string; + tasks: QueuedTask[]; +} + +/** The structured handoff a task leaves for the next agent. */ +export interface TaskHandoff { + goals: string; + did: string; + forNextAgent: string; + filesTouched?: string[]; +} + +export interface EnqueueInput { + type: string; + inputs?: Record; + dependsOn?: string[]; + model?: string; + maxAttempts?: number; + enqueuedBy?: string; +} + +export const QUEUE_DIR_NAME = '.posthog-wizard'; +const DEFAULT_MAX_ATTEMPTS = 2; + +function nowIso(): string { + return new Date().toISOString(); +} + +export class QueueStore { + private tasks: QueuedTask[] = []; + + readonly runId: string; + readonly queuePath: string; + + constructor(installDir: string, runId: string) { + this.runId = runId; + const dir = path.join(installDir, QUEUE_DIR_NAME); + this.queuePath = path.join(dir, 'queue.json'); + fs.mkdirSync(dir, { recursive: true }); + } + + // ── Reads ─────────────────────────────────────────────────────────── + + list(): readonly QueuedTask[] { + return this.tasks; + } + + get(id: string): QueuedTask | undefined { + return this.tasks.find((t) => t.id === id); + } + + /** + * Every pending task whose dependencies are all satisfied (`done` or + * `skipped`). A skipped dependency does not block downstream work. + */ + nextRunnable(): QueuedTask[] { + const doneIds = new Set( + this.tasks + .filter((t) => t.status === 'done' || t.status === 'skipped') + .map((t) => t.id), + ); + return this.tasks.filter( + (t) => t.status === 'pending' && t.dependsOn.every((d) => doneIds.has(d)), + ); + } + + /** + * True when no task is in progress and none can be started. Either everything + * is terminal, or the only pending tasks are blocked by a failed dependency. + */ + isDrained(): boolean { + if (this.tasks.some((t) => t.status === 'in_progress')) return false; + return this.nextRunnable().length === 0; + } + + summary(): Record & { total: number } { + const counts: Record = { + pending: 0, + in_progress: 0, + done: 0, + skipped: 0, + failed: 0, + }; + for (const t of this.tasks) counts[t.status] += 1; + return { ...counts, total: this.tasks.length }; + } + + readHandoff(id: string): TaskHandoff | null { + return this.get(id)?.handoff ?? null; + } + + /** Handoffs of completed tasks of a given type, oldest first. */ + readHandoffsByType(type: string): TaskHandoff[] { + return this.tasks + .filter((t) => t.type === type && t.handoff) + .map((t) => t.handoff as TaskHandoff); + } + + // ── Transitions (each one reflected to queue.json) ────────────────── + + enqueue(input: EnqueueInput): QueuedTask { + const task: QueuedTask = { + id: randomUUID(), + type: input.type, + status: 'pending', + dependsOn: input.dependsOn ?? [], + inputs: input.inputs ?? {}, + model: input.model, + attempts: 0, + maxAttempts: input.maxAttempts ?? DEFAULT_MAX_ATTEMPTS, + enqueuedBy: input.enqueuedBy ?? 'orchestrator', + createdAt: nowIso(), + }; + this.tasks.push(task); + this.reflect(); + return task; + } + + start(id: string): QueuedTask { + const t = this.require(id); + t.status = 'in_progress'; + t.startedAt = nowIso(); + t.attempts += 1; + this.reflect(); + return t; + } + + complete(id: string, handoff?: TaskHandoff): QueuedTask { + return this.finish(id, 'done', handoff); + } + + /** Terminal: the agent could not do the task. Not done, not failed. */ + skip(id: string, handoff?: TaskHandoff): QueuedTask { + return this.finish(id, 'skipped', handoff); + } + + fail( + id: string, + error: { type: string; message: string }, + handoff?: TaskHandoff, + ): QueuedTask { + const t = this.require(id); + t.error = error; + return this.finish(id, 'failed', handoff); + } + + /** Put a failed/in-progress task back to pending for a retry within the run. */ + requeue(id: string): QueuedTask { + const t = this.require(id); + t.status = 'pending'; + t.startedAt = undefined; + t.finishedAt = undefined; + this.reflect(); + return t; + } + + // ── Internals ─────────────────────────────────────────────────────── + + private finish( + id: string, + status: 'done' | 'skipped' | 'failed', + handoff?: TaskHandoff, + ): QueuedTask { + const t = this.require(id); + if (handoff) t.handoff = handoff; + t.status = status; + t.finishedAt = nowIso(); + this.reflect(); + return t; + } + + private reflect(): void { + const file: QueueFile = { + version: 1, + runId: this.runId, + tasks: this.tasks, + }; + writeJsonAtomic(this.queuePath, file); + } + + private require(id: string): QueuedTask { + const t = this.get(id); + if (!t) throw new Error(`No task ${id} in the queue`); + return t; + } +} diff --git a/src/lib/wizard-tools.ts b/src/lib/wizard-tools.ts index ab8824b6..aae21ec5 100644 --- a/src/lib/wizard-tools.ts +++ b/src/lib/wizard-tools.ts @@ -16,6 +16,7 @@ import { z } from 'zod'; import { logToFile } from '@utils/debug'; import { analytics } from '@utils/analytics'; import { skillTmpPath } from '@utils/paths'; +import { writeJsonAtomic, makeMutex } from '@utils/atomic-ledger'; import type { PackageManagerDetector } from './detection/package-manager'; import { AUDIT_CHECKS_FILE, @@ -368,14 +369,9 @@ const auditUpdateSchema = z.object({ details: z.string().optional(), }); -/** - * Atomically write JSON: write to .tmp then rename. The rename is what bumps - * the file's mtime, which is what the UI's file watcher polls on. - */ +/** Atomically write the audit ledger. Thin typed wrapper over writeJsonAtomic. */ function writeLedgerAtomic(targetPath: string, checks: AuditCheck[]): void { - const tmpPath = `${targetPath}.tmp`; - fs.writeFileSync(tmpPath, JSON.stringify(checks, null, 2), 'utf8'); - fs.renameSync(tmpPath, targetPath); + writeJsonAtomic(targetPath, checks); } /** @@ -474,19 +470,6 @@ function appendAuditChecksToLedger( return { ok: true, added: additions.length }; } -/** - * Single async mutex shared by audit tools — guarantees a read-modify-write - * cycle on the ledger is atomic across concurrent tool calls (e.g. future subagents). - */ -function makeMutex() { - let chain: Promise = Promise.resolve(); - return async function run(fn: () => Promise | T): Promise { - const next = chain.then(() => fn()); - chain = next.catch(() => undefined); - return next; - }; -} - // --------------------------------------------------------------------------- // Server factory // --------------------------------------------------------------------------- diff --git a/src/utils/atomic-ledger.ts b/src/utils/atomic-ledger.ts new file mode 100644 index 00000000..0ae8c832 --- /dev/null +++ b/src/utils/atomic-ledger.ts @@ -0,0 +1,29 @@ +/** + * Small shared primitives for on-disk ledgers: an atomic JSON writer and a + * single-chain async mutex. Used by the audit tools and by the orchestrator + * queue. Lifted here so both share one implementation. + */ +import * as fs from 'fs'; + +/** + * Atomically write JSON: write to a `.tmp` file then rename over the target. The + * rename bumps the file's mtime in one step, which is what a file watcher polls. + */ +export function writeJsonAtomic(targetPath: string, data: unknown): void { + const tmpPath = `${targetPath}.tmp`; + fs.writeFileSync(tmpPath, JSON.stringify(data, null, 2), 'utf8'); + fs.renameSync(tmpPath, targetPath); +} + +/** + * A single async mutex. Serializes read-modify-write cycles so concurrent callers + * (parallel task agents, audit tool calls) never interleave a mutation. + */ +export function makeMutex() { + let chain: Promise = Promise.resolve(); + return async function run(fn: () => Promise | T): Promise { + const next = chain.then(() => fn()); + chain = next.catch(() => undefined); + return next; + }; +} From 8ed4d827a713a2e7fc18fb0aeed40cde71049245 Mon Sep 17 00:00:00 2001 From: "Vincent (Wen Yu) Ge" Date: Mon, 8 Jun 2026 22:25:33 -0400 Subject: [PATCH 3/5] feat(orchestrator): enqueue_task, complete_task, read_handoffs tools with guards Co-Authored-By: Claude Opus 4.8 (1M context) --- .../__tests__/queue-tools.test.ts | 127 +++++++++ src/lib/programs/orchestrator/queue-tools.ts | 253 ++++++++++++++++++ src/lib/wizard-tools.ts | 20 ++ 3 files changed, 400 insertions(+) create mode 100644 src/lib/programs/orchestrator/__tests__/queue-tools.test.ts create mode 100644 src/lib/programs/orchestrator/queue-tools.ts diff --git a/src/lib/programs/orchestrator/__tests__/queue-tools.test.ts b/src/lib/programs/orchestrator/__tests__/queue-tools.test.ts new file mode 100644 index 00000000..318825d2 --- /dev/null +++ b/src/lib/programs/orchestrator/__tests__/queue-tools.test.ts @@ -0,0 +1,127 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { QueueStore } from '@lib/programs/orchestrator/queue'; +import { + applyComplete, + applyEnqueue, + applyReadHandoffs, + checkEnqueueGuards, + type OrchestratorToolsContext, +} from '@lib/programs/orchestrator/queue-tools'; + +function tmpDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-tools-test-')); +} + +const VALID = ['install', 'init', 'capture']; + +describe('checkEnqueueGuards', () => { + let dir: string; + let store: QueueStore; + let ctx: OrchestratorToolsContext; + + beforeEach(() => { + dir = tmpDir(); + store = new QueueStore(dir, 'run-1'); + ctx = { store, validTypes: VALID }; + }); + + afterEach(() => fs.rmSync(dir, { recursive: true, force: true })); + + it('rejects an unknown type', () => { + const r = checkEnqueueGuards(ctx, { type: 'nope', reason: 'x' }); + expect(r).toMatchObject({ ok: false, guard: 'unknown-type' }); + }); + + it('rejects an unknown dependency', () => { + const r = checkEnqueueGuards(ctx, { + type: 'init', + dependsOn: ['ghost'], + reason: 'x', + }); + expect(r).toMatchObject({ ok: false, guard: 'unknown-dep' }); + }); + + it('trips dedup on the same type and inputs', () => { + store.enqueue({ type: 'install', inputs: { pkg: 'posthog-js' } }); + const r = checkEnqueueGuards(ctx, { + type: 'install', + inputs: { pkg: 'posthog-js' }, + reason: 'x', + }); + expect(r).toMatchObject({ ok: false, guard: 'dedup' }); + }); + + it('allows a valid enqueue', () => { + const r = checkEnqueueGuards(ctx, { type: 'init', reason: 'x' }); + expect(r).toEqual({ ok: true }); + }); +}); + +describe('apply functions', () => { + let dir: string; + let store: QueueStore; + let ctx: OrchestratorToolsContext; + + beforeEach(() => { + dir = tmpDir(); + store = new QueueStore(dir, 'run-1'); + ctx = { store, validTypes: VALID }; + }); + + afterEach(() => fs.rmSync(dir, { recursive: true, force: true })); + + it('attributes a seed enqueue to the orchestrator', () => { + const r = applyEnqueue(ctx, { type: 'install', reason: 'seed' }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.task.enqueuedBy).toBe('orchestrator'); + }); + + it('attributes a follow-up enqueue to the running task', () => { + const parent = store.enqueue({ type: 'init' }); + ctx.currentTaskId = parent.id; + const r = applyEnqueue(ctx, { type: 'capture', reason: 'follow-up' }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.task.enqueuedBy).toBe(parent.id); + }); + + it('complete_task fails when no task is running', () => { + const r = applyComplete(ctx, { + status: 'done', + handoff: { goals: 'g', did: 'd', forNextAgent: 'n' }, + }); + expect(r.ok).toBe(false); + }); + + it('complete_task marks the running task done and stores the handoff', () => { + const t = store.enqueue({ type: 'install' }); + ctx.currentTaskId = t.id; + store.start(t.id); + const r = applyComplete(ctx, { + status: 'done', + handoff: { goals: 'g', did: 'added sdk', forNextAgent: 'env next' }, + }); + expect(r.ok).toBe(true); + expect(store.get(t.id)?.status).toBe('done'); + expect(store.readHandoff(t.id)?.did).toBe('added sdk'); + }); + + it('read_handoffs returns a dependency handoff for the running task', () => { + const dep = store.enqueue({ type: 'install' }); + store.start(dep.id); + store.complete(dep.id, { + goals: 'g', + did: 'installed', + forNextAgent: 'now init', + }); + const t = store.enqueue({ type: 'init', dependsOn: [dep.id] }); + ctx.currentTaskId = t.id; + + const handoffs = applyReadHandoffs(ctx, {}); + expect(handoffs).toHaveLength(1); + expect(handoffs[0].did).toBe('installed'); + }); +}); diff --git a/src/lib/programs/orchestrator/queue-tools.ts b/src/lib/programs/orchestrator/queue-tools.ts new file mode 100644 index 00000000..624d9b09 --- /dev/null +++ b/src/lib/programs/orchestrator/queue-tools.ts @@ -0,0 +1,253 @@ +/** + * Orchestrator MCP tools, registered into the existing `wizard-tools` server when + * a queue is present. They let the orchestrator agent and task agents grow the + * queue, report completion with a structured handoff, and read prior handoffs. + * + * The guard logic and the apply functions are plain, exported, and unit-tested. + * `buildOrchestratorTools` wraps them in the SDK `tool()` shape. + */ +import { z } from 'zod'; +import { analytics } from '../../../utils/analytics'; +import type { QueueStore, QueuedTask, TaskHandoff } from './queue'; + +export interface OrchestratorToolsContext { + store: QueueStore; + /** Task types the registry knows about. enqueue_task rejects anything else. */ + validTypes: readonly string[]; + /** + * The id of the task this tool server is bound to. Each task agent gets its + * own wizard-tools server, so attribution holds when independent tasks run + * in parallel. Absent for the seed, which is not a task. + */ + currentTaskId?: string; +} + +export interface EnqueueArgs { + type: string; + inputs?: Record; + dependsOn?: string[]; + model?: string; + reason: string; +} + +export type GuardResult = + | { ok: true } + | { ok: false; guard: string; message: string }; + +function stableStringify(value: unknown): string { + if (value === null || typeof value !== 'object') return JSON.stringify(value); + if (Array.isArray(value)) return `[${value.map(stableStringify).join(',')}]`; + const entries = Object.entries(value as Record).sort( + ([a], [b]) => a.localeCompare(b), + ); + return `{${entries + .map(([k, v]) => `${JSON.stringify(k)}:${stableStringify(v)}`) + .join(',')}}`; +} + +function dedupKey(type: string, inputs: Record): string { + return `${type}::${stableStringify(inputs)}`; +} + +/** + * Validate an enqueue. Structural checks only — a real type, real dependencies, + * and not a literal duplicate. How much runs, and in what shape, is the task + * graph's business, not a knob's. + */ +export function checkEnqueueGuards( + ctx: OrchestratorToolsContext, + args: EnqueueArgs, +): GuardResult { + const tasks = ctx.store.list(); + + if (!ctx.validTypes.includes(args.type)) { + return { + ok: false, + guard: 'unknown-type', + message: `Unknown task type "${ + args.type + }". Valid types: ${ctx.validTypes.join(', ')}.`, + }; + } + + for (const dep of args.dependsOn ?? []) { + if (!ctx.store.get(dep)) { + return { + ok: false, + guard: 'unknown-dep', + message: `Dependency "${dep}" is not a known task id.`, + }; + } + } + + const key = dedupKey(args.type, args.inputs ?? {}); + if ( + tasks.some( + (t) => t.status !== 'failed' && dedupKey(t.type, t.inputs) === key, + ) + ) { + return { + ok: false, + guard: 'dedup', + message: `A "${args.type}" task with these inputs already exists.`, + }; + } + + return { ok: true }; +} + +export type EnqueueResult = + | { ok: true; task: QueuedTask } + | { ok: false; guard: string; message: string }; + +export function applyEnqueue( + ctx: OrchestratorToolsContext, + args: EnqueueArgs, +): EnqueueResult { + const guard = checkEnqueueGuards(ctx, args); + if (!guard.ok) return guard; + + const task = ctx.store.enqueue({ + type: args.type, + inputs: args.inputs ?? {}, + dependsOn: args.dependsOn ?? [], + model: args.model, + enqueuedBy: ctx.currentTaskId ?? 'orchestrator', + }); + return { ok: true, task }; +} + +export type CompleteResult = { ok: true } | { ok: false; message: string }; + +export function applyComplete( + ctx: OrchestratorToolsContext, + args: { status: 'done' | 'failed' | 'skipped'; handoff: TaskHandoff }, +): CompleteResult { + const id = ctx.currentTaskId; + if (!id) { + return { + ok: false, + message: 'complete_task can only be called by a running task agent.', + }; + } + if (args.status === 'failed') { + ctx.store.fail( + id, + { type: 'self-reported', message: args.handoff.forNextAgent }, + args.handoff, + ); + } else if (args.status === 'skipped') { + ctx.store.skip(id, args.handoff); + } else { + ctx.store.complete(id, args.handoff); + } + return { ok: true }; +} + +export function applyReadHandoffs( + ctx: OrchestratorToolsContext, + args: { type?: string; taskId?: string }, +): TaskHandoff[] { + if (args.taskId) { + const h = ctx.store.readHandoff(args.taskId); + return h ? [h] : []; + } + if (args.type) { + return ctx.store.readHandoffsByType(args.type); + } + // No filter: every handoff of a dependency of the current task. + const currentId = ctx.currentTaskId; + const current = currentId ? ctx.store.get(currentId) : undefined; + if (!current) return []; + return current.dependsOn + .map((depId) => ctx.store.readHandoff(depId)) + .filter((h): h is TaskHandoff => h !== null); +} + +const HANDOFF_SHAPE = { + goals: z.string().describe('What this task was asked to achieve.'), + did: z.string().describe('What you actually did.'), + forNextAgent: z.string().describe('What the next agent should know.'), + filesTouched: z.array(z.string()).optional(), +}; + +type SdkTool = ( + name: string, + description: string, + // The SDK accepts a plain object of zod fields as the schema. + schema: Record, + handler: (args: never) => unknown, +) => unknown; + +function textResult(text: string, isError = false) { + return { isError, content: [{ type: 'text' as const, text }] }; +} + +/** + * Build the orchestrator tools in the SDK `tool()` shape. Called from + * createWizardToolsServer only when a queue context is present. + */ +export function buildOrchestratorTools( + tool: SdkTool, + ctx: OrchestratorToolsContext, +): unknown[] { + const enqueueTask = tool( + 'enqueue_task', + 'Add a task to the orchestrator queue. Use it to seed work and to enqueue follow-up work you discover. Keep tasks small and discrete.', + { + type: z + .string() + .describe(`The task type. One of: ${ctx.validTypes.join(', ')}.`), + inputs: z.record(z.unknown()).optional(), + dependsOn: z + .array(z.string()) + .optional() + .describe('Task ids that must be done before this task runs.'), + model: z.string().optional(), + reason: z.string().describe('One line on why this task is needed.'), + }, + ((args: EnqueueArgs) => { + const res = applyEnqueue(ctx, args); + if (!res.ok) { + analytics.wizardCapture('orchestrator guard tripped', { + guard: res.guard, + type: args.type, + }); + return textResult(res.message, true); + } + return textResult(JSON.stringify({ id: res.task.id })); + }) as (args: never) => unknown, + ); + + const completeTask = tool( + 'complete_task', + "Report the outcome of your task. Always call this exactly once when you finish, with a structured handoff for the next agent. Use status 'skipped' when the task does not apply to this project and you cannot do it (say why in the handoff) — not 'done'.", + { + status: z.enum(['done', 'failed', 'skipped']), + handoff: z.object(HANDOFF_SHAPE), + }, + ((args: { + status: 'done' | 'failed' | 'skipped'; + handoff: TaskHandoff; + }) => { + const res = applyComplete(ctx, args); + if (!res.ok) return textResult(res.message, true); + return textResult('ok'); + }) as (args: never) => unknown, + ); + + const readHandoffs = tool( + 'read_handoffs', + 'Read structured handoffs from earlier tasks. With no argument, returns the handoffs of your dependencies.', + { + type: z.string().optional(), + taskId: z.string().optional(), + }, + ((args: { type?: string; taskId?: string }) => { + const handoffs = applyReadHandoffs(ctx, args); + return textResult(JSON.stringify(handoffs, null, 2)); + }) as (args: never) => unknown, + ); + + return [enqueueTask, completeTask, readHandoffs]; +} diff --git a/src/lib/wizard-tools.ts b/src/lib/wizard-tools.ts index aae21ec5..302970eb 100644 --- a/src/lib/wizard-tools.ts +++ b/src/lib/wizard-tools.ts @@ -26,6 +26,10 @@ import { } from './programs/audit/types'; import type { WizardAskBridge } from './wizard-ask-bridge'; import { createSecretVault, type SecretVault } from './secret-vault'; +import { + buildOrchestratorTools, + type OrchestratorToolsContext, +} from './programs/orchestrator/queue-tools'; // --------------------------------------------------------------------------- // SDK dynamic import (ESM module loaded once, cached) @@ -203,6 +207,13 @@ export interface WizardToolsOptions { * (e.g. in unit tests), a fresh vault is created internally. */ secretVault?: SecretVault; + + /** + * Orchestrator queue context. Present only on the experimental `orchestrator` + * variant; when set, the orchestrator tools (enqueue_task, complete_task, + * read_handoffs) are registered. Absent on the linear path. + */ + orchestrator?: OrchestratorToolsContext; } /** Default per-run cap on wizard_ask calls when no override is provided. */ @@ -488,6 +499,7 @@ export async function createWizardToolsServer(options: WizardToolsOptions) { askBridge, askMaxQuestions = DEFAULT_ASK_MAX_QUESTIONS, secretVault = createSecretVault(), + orchestrator, } = options; const sdk = await getSDKModule(); const { tool, createSdkMcpServer } = sdk; @@ -1087,6 +1099,10 @@ export async function createWizardToolsServer(options: WizardToolsOptions) { // -- Assemble server ------------------------------------------------------ + const orchestratorTools = orchestrator + ? buildOrchestratorTools(tool, orchestrator) + : []; + return createSdkMcpServer({ name: SERVER_NAME, version: '1.0.0', @@ -1100,6 +1116,7 @@ export async function createWizardToolsServer(options: WizardToolsOptions) { auditAddChecks, auditResolveChecks, wizardAsk, + ...orchestratorTools, ], }); } @@ -1119,6 +1136,9 @@ export const WIZARD_TOOL_NAMES = { auditAddChecks: `mcp__${SERVER_NAME}__audit_add_checks`, auditResolveChecks: `mcp__${SERVER_NAME}__audit_resolve_checks`, wizardAsk: `mcp__${SERVER_NAME}__wizard_ask`, + enqueueTask: `mcp__${SERVER_NAME}__enqueue_task`, + completeTask: `mcp__${SERVER_NAME}__complete_task`, + readHandoffs: `mcp__${SERVER_NAME}__read_handoffs`, } as const; // --------------------------------------------------------------------------- From 3edb2a9c4841d2942aca4efc021307f528aee8d8 Mon Sep 17 00:00:00 2001 From: "Vincent (Wen Yu) Ge" Date: Mon, 8 Jun 2026 22:30:18 -0400 Subject: [PATCH 4/5] feat(orchestrator): executor drain-loop scheduler Co-Authored-By: Claude Opus 4.8 (1M context) --- .../orchestrator/__tests__/executor.test.ts | 150 ++++++++++++++++++ src/lib/programs/orchestrator/executor.ts | 112 +++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 src/lib/programs/orchestrator/__tests__/executor.test.ts create mode 100644 src/lib/programs/orchestrator/executor.ts diff --git a/src/lib/programs/orchestrator/__tests__/executor.test.ts b/src/lib/programs/orchestrator/__tests__/executor.test.ts new file mode 100644 index 00000000..5665b9b2 --- /dev/null +++ b/src/lib/programs/orchestrator/__tests__/executor.test.ts @@ -0,0 +1,150 @@ +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { + QueueStore, + type QueuedTask, + type TaskHandoff, +} from '@lib/programs/orchestrator/queue'; +import { drainQueue, type RunTask } from '@lib/programs/orchestrator/executor'; + +jest.mock('@utils/analytics', () => ({ + analytics: { captureException: jest.fn(), wizardCapture: jest.fn() }, +})); +import { analytics } from '@utils/analytics'; + +const HANDOFF: TaskHandoff = { goals: 'g', did: 'd', forNextAgent: 'n' }; + +function tmpDir(): string { + return fs.mkdtempSync(path.join(os.tmpdir(), 'executor-test-')); +} + +describe('drainQueue', () => { + let dir: string; + let q: QueueStore; + + beforeEach(() => { + dir = tmpDir(); + q = new QueueStore(dir, 'run-1'); + }); + + afterEach(() => fs.rmSync(dir, { recursive: true, force: true })); + + const completing: RunTask = (task) => { + q.complete(task.id, HANDOFF); + return Promise.resolve(); + }; + + it('runs a single task to done and drains', async () => { + const a = q.enqueue({ type: 'install' }); + await drainQueue(q, completing, { maxStarts: 50 }); + expect(q.get(a.id)?.status).toBe('done'); + expect(q.isDrained()).toBe(true); + }); + + it('runs a dependent task only after its dependency completes', async () => { + const order: string[] = []; + const a = q.enqueue({ type: 'install' }); + const b = q.enqueue({ type: 'init', dependsOn: [a.id] }); + const runner: RunTask = (task) => { + order.push(task.type); + q.complete(task.id, HANDOFF); + return Promise.resolve(); + }; + await drainQueue(q, runner, { maxStarts: 50 }); + expect(order).toEqual(['install', 'init']); + expect(q.get(b.id)?.status).toBe('done'); + }); + + it('runs independent branches concurrently; the graph is the only schedule', async () => { + let active = 0; + let maxActive = 0; + const runner: RunTask = async (task) => { + active += 1; + maxActive = Math.max(maxActive, active); + await new Promise((r) => setTimeout(r, 5)); + q.complete(task.id, HANDOFF); + active -= 1; + }; + const a = q.enqueue({ type: 'install' }); + const b = q.enqueue({ type: 'init' }); + q.enqueue({ type: 'capture', dependsOn: [a.id, b.id] }); + await drainQueue(q, runner, { maxStarts: 50 }); + // install and init overlap; capture waits for both. + expect(maxActive).toBe(2); + expect(q.summary().done).toBe(3); + }); + + it('starts a dependent the moment its dependency finishes, not in waves', async () => { + const startedAt: Record = {}; + let clock = 0; + const runner: RunTask = async (task) => { + startedAt[task.type] = clock++; + // slow holds the wave open; fast finishes early and unblocks after-fast. + const delay = task.type === 'slow' ? 30 : 5; + await new Promise((r) => setTimeout(r, delay)); + q.complete(task.id, HANDOFF); + }; + q.enqueue({ type: 'slow' }); + const fast = q.enqueue({ type: 'fast' }); + q.enqueue({ type: 'after-fast', dependsOn: [fast.id] }); + await drainQueue(q, runner, { maxStarts: 50 }); + // after-fast started while slow was still running. + expect(startedAt['after-fast']).toBeDefined(); + expect(q.summary().done).toBe(3); + }); + + it('retries a task that ends without reporting, then fails it', async () => { + const a = q.enqueue({ type: 'install', maxAttempts: 2 }); + const noReport: RunTask = async () => { + /* agent never calls complete_task */ + }; + await drainQueue(q, noReport, { maxStarts: 50 }); + expect(q.get(a.id)?.status).toBe('failed'); + expect(q.get(a.id)?.attempts).toBe(2); + }); + + it('succeeds on a retry within the attempt budget', async () => { + let calls = 0; + const a = q.enqueue({ type: 'install', maxAttempts: 3 }); + const flaky: RunTask = (task: QueuedTask) => { + calls += 1; + if (calls >= 2) q.complete(task.id, HANDOFF); + return Promise.resolve(); + }; + await drainQueue(q, flaky, { maxStarts: 50 }); + expect(q.get(a.id)?.status).toBe('done'); + expect(calls).toBe(2); + }); + + it('captures and fails a task whose runner throws', async () => { + const a = q.enqueue({ type: 'install', maxAttempts: 1 }); + const throwing: RunTask = () => Promise.reject(new Error('agent exploded')); + await drainQueue(q, throwing, { maxStarts: 50 }); + expect(q.get(a.id)?.status).toBe('failed'); + expect(analytics.captureException).toHaveBeenCalled(); + }); + + it('does not run a task whose dependency failed', async () => { + const a = q.enqueue({ type: 'install', maxAttempts: 1 }); + const b = q.enqueue({ type: 'init', dependsOn: [a.id] }); + const runner: RunTask = (task) => { + if (task.type === 'init') q.complete(task.id, HANDOFF); + // install never reports, so it fails after its single attempt. + return Promise.resolve(); + }; + await drainQueue(q, runner, { maxStarts: 50 }); + expect(q.get(a.id)?.status).toBe('failed'); + expect(q.get(b.id)?.status).toBe('pending'); + expect(q.isDrained()).toBe(true); + }); + + it('terminates via the start backstop instead of looping forever', async () => { + const a = q.enqueue({ type: 'install', maxAttempts: 999 }); + const neverReports: RunTask = async () => { + /* would retry forever without the backstop */ + }; + await drainQueue(q, neverReports, { maxStarts: 3 }); + expect(q.get(a.id)?.attempts).toBeLessThanOrEqual(3); + }); +}); diff --git a/src/lib/programs/orchestrator/executor.ts b/src/lib/programs/orchestrator/executor.ts new file mode 100644 index 00000000..45355cde --- /dev/null +++ b/src/lib/programs/orchestrator/executor.ts @@ -0,0 +1,112 @@ +/** + * The executor drains the queue. It starts every runnable task (dependencies + * satisfied) as soon as it becomes runnable — parallelism is decided by the + * task graph, not by an executor knob. Each task runs through an injected + * `runTask` function and reports its outcome via `complete_task`; a task that + * ends without reporting is retried while attempts remain, then failed. A + * `maxStarts` backstop guarantees termination. + * + * The drain loop is independent of how a task actually runs. `runTask` is + * injected: the real one spins up a fresh agent, the tests use a fake. + */ +import { analytics } from '../../../utils/analytics'; +import { logToFile } from '../../../utils/debug'; +import type { QueueStore, QueuedTask } from './queue'; + +/** Per-task agent configuration the resolver produces from a task's type. */ +export interface ResolvedTask { + model: string; + allowedTools: readonly string[]; + disallowedTools: readonly string[]; + /** Mini-skills to install before the task runs (the HOW). */ + skills: readonly string[]; + prompt: string; +} + +/** Resolves a queued task to what the agent needs. The real one is markdown-backed. */ +export type TaskResolver = ( + task: QueuedTask, + store: QueueStore, +) => ResolvedTask; + +/** Runs one task's agent. It is expected to drive the task to a terminal state + * (via the task agent calling complete_task). */ +export type RunTask = (task: QueuedTask) => Promise; + +export interface DrainOptions { + /** Backstop against a pathological always-one-more-pending loop. */ + maxStarts: number; +} + +export const DEFAULT_DRAIN_OPTIONS: DrainOptions = { + maxStarts: 200, +}; + +async function runOne( + store: QueueStore, + runTask: RunTask, + task: QueuedTask, +): Promise { + store.start(task.id); + try { + await runTask(task); + } catch (error) { + // The task threw rather than reporting. The outcome check below handles + // the queue; the exception itself should never be silent. + logToFile(`[executor] runTask threw for ${task.type}:`, error); + analytics.captureException( + error instanceof Error ? error : new Error(String(error)), + { step: 'orchestrator_run_task', task_type: task.type }, + ); + } + + const after = store.get(task.id); + if (!after) return; + + if (after.status === 'in_progress') { + // The agent ended without calling complete_task. Retry or fail. + if (after.attempts < after.maxAttempts) { + store.requeue(task.id); + } else { + store.fail(task.id, { + type: 'no-report', + message: 'Task ended without calling complete_task.', + }); + } + return; + } + + if (after.status === 'failed' && after.attempts < after.maxAttempts) { + store.requeue(task.id); + } +} + +/** + * Drain the queue to a terminal state. Every runnable task starts the moment + * its dependencies finish; independent branches run concurrently. Returns when + * every task is done, failed, or blocked by a failed dependency, or when the + * start backstop trips. + */ +export async function drainQueue( + store: QueueStore, + runTask: RunTask, + opts: DrainOptions = DEFAULT_DRAIN_OPTIONS, +): Promise { + const running = new Map>(); + let starts = 0; + + for (;;) { + for (const task of store.nextRunnable()) { + if (++starts > opts.maxStarts) break; + // runOne marks the task in_progress synchronously, so the next + // nextRunnable() call no longer offers it. + const p = runOne(store, runTask, task).finally(() => + running.delete(task.id), + ); + running.set(task.id, p); + } + if (running.size === 0) break; + // Wake on the first finish; it may have unblocked dependents or requeued. + await Promise.race(running.values()); + } +} From fea70a9e34c2bec14c0eb3e3c0bbdbbfddf5181b Mon Sep 17 00:00:00 2001 From: "Vincent (Wen Yu) Ge" Date: Tue, 9 Jun 2026 09:11:04 -0400 Subject: [PATCH 5/5] feat(orchestrator): walking skeleton runner Seed agent enqueues a parallel-branch task graph (install/init, then identify and plan-capture, then capture); the executor drains it one fresh agent per task as dry-run stubs; the TUI renders the queue. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/lib/agent/agent-interface.ts | 8 +- src/lib/agent/agent-runner.ts | 9 +- .../orchestrator/orchestrator-runner.ts | 190 ++++++++++++++++-- .../programs/orchestrator/skeleton-prompts.ts | 45 +++++ 4 files changed, 234 insertions(+), 18 deletions(-) create mode 100644 src/lib/programs/orchestrator/skeleton-prompts.ts diff --git a/src/lib/agent/agent-interface.ts b/src/lib/agent/agent-interface.ts index 35c45d03..969cad33 100644 --- a/src/lib/agent/agent-interface.ts +++ b/src/lib/agent/agent-interface.ts @@ -265,6 +265,11 @@ export type AgentConfig = { getPendingQuestion?: () => | import('@lib/wizard-session').PendingQuestion | null; + /** + * Orchestrator queue context. Present only on the experimental `orchestrator` + * variant; threaded into wizard-tools so the orchestrator tools register. + */ + orchestrator?: import('@lib/programs/orchestrator/queue-tools').OrchestratorToolsContext; }; /** @@ -708,6 +713,7 @@ export async function initializeAgent( skillsBaseUrl: config.skillsBaseUrl, askBridge: config.askBridge, askMaxQuestions: config.askMaxQuestions, + orchestrator: config.orchestrator, }); mcpServers['wizard-tools'] = wizardToolsServer; @@ -747,8 +753,6 @@ export async function initializeAgent( }); } - getUI().log.step(`Verbose logs: ${getLogFilePath()}`); - getUI().log.success("Agent initialized. Let's get cooking!"); return agentRunConfig; } catch (error) { getUI().log.error( diff --git a/src/lib/agent/agent-runner.ts b/src/lib/agent/agent-runner.ts index da8eb018..b82867f0 100644 --- a/src/lib/agent/agent-runner.ts +++ b/src/lib/agent/agent-runner.ts @@ -45,7 +45,12 @@ import { getBlockingServiceKeys, SERVICE_LABELS, } from '@lib/health-checks/readiness'; -import { enableDebugLogs, initLogFile, logToFile } from '@utils/debug'; +import { + enableDebugLogs, + getLogFilePath, + initLogFile, + logToFile, +} from '@utils/debug'; import { createBenchmarkPipeline } from '@lib/middleware/benchmark'; import { wizardAbort, WizardError, registerCleanup } from '@utils/wizard-abort'; import { formatScanReport, writeScanReport } from '@lib/yara-hooks'; @@ -439,6 +444,8 @@ async function runLinearProgram( }, sessionToOptions(session), ); + getUI().log.step(`Verbose logs: ${getLogFilePath()}`); + getUI().log.success("Agent initialized. Let's get cooking!"); const middleware = session.benchmark ? createBenchmarkPipeline(spinner, sessionToOptions(session)) diff --git a/src/lib/programs/orchestrator/orchestrator-runner.ts b/src/lib/programs/orchestrator/orchestrator-runner.ts index f14fd9e6..659d979c 100644 --- a/src/lib/programs/orchestrator/orchestrator-runner.ts +++ b/src/lib/programs/orchestrator/orchestrator-runner.ts @@ -1,33 +1,193 @@ /** * Experimental task-queue orchestrator runner. * - * Branches from the linear runner when the `wizard-orchestrator` feature flag is - * on. The shape: an orchestrator agent inspects the repo and seeds an - * in-memory task queue, and an executor drains it one fresh agent per task. + * Branches from the linear runner when the `wizard-orchestrator` flag is on. An + * orchestrator agent inspects the repo and seeds an in-memory task queue; an + * executor drains it, running one fresh agent per task. * - * This is the stub. It logs, emits a start event, and returns. The queue, the - * executor, and the seeding agent land in the following issues. + * This is the walking skeleton. The seed agent plans a task graph, and each task + * is a dry-run stub that only reports a handoff, so the whole pipeline runs + * end-to-end without touching the user's project. Real per-task work, served from + * context-mill as `agents`, replaces the stub prompts later. */ -import type { WizardSession } from '../../wizard-session'; -import type { ProgramConfig } from '../program-step'; -import type { BootstrapResult } from '../../agent/agent-runner'; +import { randomUUID } from 'crypto'; +import { + initializeAgent, + runAgent, + type AgentConfig, +} from '../../agent/agent-interface'; +import { OutroKind, type WizardSession } from '../../wizard-session'; +import { detectNodePackageManagers } from '../../detection/package-manager'; import { getUI } from '../../../ui'; -import { logToFile } from '../../../utils/debug'; import { analytics } from '../../../utils/analytics'; +import { logToFile } from '../../../utils/debug'; +import type { ProgramConfig } from '../program-step'; +import type { BootstrapResult } from '../../agent/agent-runner'; +import type { WizardRunOptions } from '../../../utils/types'; +import { QueueStore, type TaskStatus } from './queue'; +import { drainQueue, type RunTask } from './executor'; +import { + SKELETON_TASK_TYPES, + buildSeedPrompt, + buildStubPrompt, +} from './skeleton-prompts'; + +/** The seed plans the graph, so it gets a stronger model; stub tasks are cheap. */ +const SEED_MODEL = 'claude-sonnet-4-6'; +const STUB_MODEL = 'claude-haiku-4-5-20251001'; +/** The skeleton never edits the user's project. */ +const NO_EDIT_TOOLS = ['Write', 'Edit', 'Bash'] as const; -export function runOrchestrator( +function toTodoStatus(status: TaskStatus): string { + switch (status) { + case 'in_progress': + return 'in_progress'; + case 'done': + case 'failed': + return 'completed'; + default: + return 'pending'; + } +} + +function sessionRunOptions(session: WizardSession): WizardRunOptions { + return { + installDir: session.installDir, + debug: session.debug, + default: false, + signup: session.signup, + localMcp: session.localMcp, + ci: session.ci, + benchmark: session.benchmark, + projectId: session.projectId, + apiKey: session.apiKey, + yaraReport: session.yaraReport, + }; +} + +export async function runOrchestrator( session: WizardSession, programConfig: ProgramConfig, - _boot: BootstrapResult, + boot: BootstrapResult, ): Promise { + const runId = randomUUID(); + const store = new QueueStore(session.installDir, runId); + + const options = sessionRunOptions(session); + logToFile( - `[orchestrator] START program=${programConfig.id} dir=${session.installDir}`, + `[orchestrator] START program=${programConfig.id} dir=${session.installDir} run=${runId}`, ); analytics.wizardCapture('orchestrator started', { program_id: programConfig.id, }); - getUI().log.info( - 'Orchestrator flag is on. This runner is a stub for now; the queue and executor land in the following issues.', + getUI().startRun(); + + const renderQueue = () => + getUI().syncTodos( + store.list().map((t) => ({ + content: t.type, + status: toTodoStatus(t.status), + activeForm: `Running ${t.type}`, + })), + ); + + // Each agent gets its own config so its wizard-tools server is bound to the + // task it runs — independent tasks run in parallel, and attribution of + // complete_task / enqueue_task must hold per agent. The seed is not a task, + // so its context has no task id. + const agentConfigFor = (currentTaskId?: string): AgentConfig => ({ + workingDirectory: session.installDir, + posthogMcpUrl: boot.mcpUrl, + posthogApiKey: boot.accessToken, + posthogApiHost: boot.host, + detectPackageManager: detectNodePackageManagers, + skillsBaseUrl: boot.skillsBaseUrl, + wizardFlags: boot.wizardFlags, + // Tag agent events as orchestrator so telemetry segments from the baseline. + wizardMetadata: { ...boot.wizardMetadata, VARIANT: 'orchestrator' }, + integrationLabel: programConfig.id, + orchestrator: { + store, + validTypes: SKELETON_TASK_TYPES, + currentTaskId, + }, + }); + + const spinner = getUI().spinner(); + + // 1. Seed the queue with the orchestrator agent. + const seedAgent = await initializeAgent(agentConfigFor(), options); + const seedResult = await runAgent( + { ...seedAgent, model: SEED_MODEL, disallowedTools: [...NO_EDIT_TOOLS] }, + buildSeedPrompt(), + options, + spinner, + { + spinnerMessage: 'Planning the integration...', + successMessage: 'Planned the integration', + additionalFeatureQueue: [], + }, ); - return Promise.resolve(); + if (seedResult.error) { + logToFile( + `[orchestrator] seed error: ${seedResult.error} ${ + seedResult.message ?? '' + }`, + ); + } + analytics.wizardCapture('orchestrator seeded', { + task_count: store.list().length, + types: store.list().map((t) => t.type), + }); + renderQueue(); + + // 2. Drain the queue, one fresh agent per task. Independent tasks run in + // parallel — the graph the seed planned is the only schedule. + const runTask: RunTask = async (task) => { + renderQueue(); + const agent = await initializeAgent(agentConfigFor(task.id), options); + try { + await runAgent( + { + ...agent, + model: task.model ?? STUB_MODEL, + disallowedTools: [...NO_EDIT_TOOLS], + }, + buildStubPrompt(task), + options, + spinner, + { + spinnerMessage: `Running ${task.type}...`, + successMessage: `${task.type} done`, + additionalFeatureQueue: [], + }, + ); + } finally { + renderQueue(); + } + }; + await drainQueue(store, runTask); + + renderQueue(); + + const summary = store.summary(); + logToFile( + `[orchestrator] DONE done=${summary.done} failed=${summary.failed} total=${summary.total}`, + ); + analytics.wizardCapture('orchestrator run finished', { + tasks_total: summary.total, + tasks_done: summary.done, + tasks_failed: summary.failed, + }); + + const message = `Orchestrator dry run finished: ${summary.done}/${summary.total} tasks completed.`; + getUI().setOutroData({ + kind: OutroKind.Success, + message, + reportFile: store.queuePath, + docsUrl: 'https://posthog.com/docs/ai-engineering/ai-wizard', + }); + getUI().outro(message); + await analytics.shutdown('success'); } diff --git a/src/lib/programs/orchestrator/skeleton-prompts.ts b/src/lib/programs/orchestrator/skeleton-prompts.ts new file mode 100644 index 00000000..71166338 --- /dev/null +++ b/src/lib/programs/orchestrator/skeleton-prompts.ts @@ -0,0 +1,45 @@ +/** + * Inline prompts for the walking skeleton. They prove the seed -> drain pipeline + * without touching the user's project: the seed agent plans a task graph, and each + * task is a dry-run stub that only reports a handoff. The real agent prompts live + * in context-mill as the `agents` content type and replace these later. + */ +import type { QueuedTask } from './queue'; + +/** Task types the skeleton seeds. Mirrors the real integration graph. */ +export const SKELETON_TASK_TYPES = [ + 'install', + 'init', + 'identify', + 'plan-capture', + 'capture', +] as const; + +export function buildSeedPrompt(): string { + return `You are the orchestrator for a PostHog integration. Right now your only job is to plan the work and seed a task queue. Do NOT install anything, edit any files, or integrate PostHog yourself. + +Take a quick look at the repository to confirm it is a real project. A brief glance, not a deep analysis. + +Then seed the queue by calling enqueue_task five times. Each call returns a JSON object with an "id". Capture those ids so you can wire dependencies: + +1. enqueue_task, type "install", no dependsOn. +2. enqueue_task, type "init", no dependsOn. install and init are independent and can run together. +3. enqueue_task, type "identify", dependsOn = [the install id, the init id]. +4. enqueue_task, type "plan-capture", dependsOn = [the install id, the init id]. identify and plan-capture are independent of each other. +5. enqueue_task, type "capture", dependsOn = [the plan-capture id]. + +After all five are enqueued, you are done. Do NOT call complete_task. You are the orchestrator, not a task.`; +} + +export function buildStubPrompt(task: QueuedTask): string { + return `You are a single, isolated task of type "${task.type}" in a PostHog integration. This is a DRY RUN: do not install anything, edit any files, or change the user's project in any way. + +Your only job is to report completion. Call complete_task exactly once with: +- status: "done" +- handoff: + - goals: one line on what a real "${task.type}" task would aim to do + - did: a one-line note that this was a dry-run stub, no changes made + - forNextAgent: anything the next task should know + +Then stop.`; +}