Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/lib/agent/__tests__/agent-prompt-loader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,46 @@ describe('resolveTask', () => {
'Context from previous steps',
);
});

it('includes transitive ancestors, not just direct dependencies', () => {
  const promptRegistry = registryOf([prompt]);

  // Chain under test: install -> capture -> final task. The final task names
  // only `capture` as a dependency, yet the handoff written by `install` has
  // to show up in its prompt as well.
  const installStep = store.enqueue({ type: 'install' });
  store.complete(installStep.id, {
    goals: 'declare the SDK',
    did: 'added posthog to the manifest',
    forNextAgent: 'SDK is declared, not yet installed',
  });

  const captureStep = store.enqueue({
    type: 'capture',
    dependsOn: [installStep.id],
  });
  store.complete(captureStep.id, {
    goals: 'instrument events',
    did: 'added capture calls',
    forNextAgent: 'events are in',
  });

  const finalTask = store.enqueue({
    type: 'capture',
    dependsOn: [captureStep.id],
  });
  const resolved = resolveTask(promptRegistry, finalTask, store);

  // Grandparent (transitive) handoff.
  expect(resolved.prompt).toContain('added posthog to the manifest');
  // Parent (direct) handoff.
  expect(resolved.prompt).toContain('added capture calls');
});

it('lists each ancestor once for diamond dependencies', () => {
  const promptRegistry = registryOf([prompt]);

  // Diamond: `install` sits above two siblings, and the final task depends on
  // both siblings, so `install` is reachable along two paths.
  const root = store.enqueue({ type: 'install' });
  store.complete(root.id, {
    goals: 'g',
    did: 'manifest entry added',
    forNextAgent: 'n',
  });

  const left = store.enqueue({ type: 'identify', dependsOn: [root.id] });
  store.complete(left.id, { goals: 'g', did: 'a-did', forNextAgent: 'n' });

  const right = store.enqueue({ type: 'identify', dependsOn: [root.id] });
  store.complete(right.id, { goals: 'g', did: 'b-did', forNextAgent: 'n' });

  // Only the task being resolved has to be a registered type (capture); the
  // ancestors above it may be of any type.
  const finalTask = store.enqueue({
    type: 'capture',
    dependsOn: [left.id, right.id],
  });
  const resolved = resolveTask(promptRegistry, finalTask, store);

  const occurrences = resolved.prompt.match(/manifest entry added/g);
  expect(occurrences).toHaveLength(1);
});
});

describe('taskModel', () => {
Expand Down
35 changes: 29 additions & 6 deletions src/lib/agent/agent-prompt-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ function commandmentsReference(ctx: OrchestratorPromptContext): string | null {
return `Framework rules for this integration are at \`${ctx.commandmentsPath}\`. Read them before you edit and follow them.`;
}

const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only within this project's own directory; nothing outside it is part of your task. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`;
/**
 * Baseline instructions addressed to a single task agent. The text tells the
 * agent that it is one isolated task, to report exactly once via
 * `complete_task` with a structured handoff, to trust context from previous
 * steps, to read a file before editing it, to stay strictly inside the
 * project's own directory, and to report status `skipped` when the task does
 * not apply.
 * NOTE(review): where this string is spliced into the final prompt is not
 * visible in this hunk — confirm against the prompt-assembly code.
 */
const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only inside this project's own directory: never read, list, or search (find, ls, grep, glob) outside it — not the OS, not other projects, not global package caches. If your task seems to need something outside this directory, it does not — skip that part and say so in your handoff rather than hunting across the filesystem. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`;

/**
 * Baseline instructions addressed to the orchestrator agent. The text tells it
 * to plan the work and seed the queue with `enqueue_task` (whose returned id
 * can be passed as a dependency of a later task), to give each task a short UI
 * label, and that it must neither call `complete_task` nor edit the project.
 * NOTE(review): where this string is spliced into the seed prompt is not
 * visible in this hunk — confirm against the prompt-assembly code.
 */
const SEED_BASICS = `You are the orchestrator. Plan the work and seed the queue with enqueue_task — each call returns an id you can pass as a dependency to a later task. Give each task a short label for the UI — the action in a few words, not file names, class names, or other specifics. You are not a task yourself: do not call complete_task and do not edit the project.`;

Expand Down Expand Up @@ -277,14 +277,37 @@ function formatInputValue(value: unknown): string {
}

/**
 * The ids of every task `task` transitively depends on — the full upstream
 * chain, not just direct dependencies.
 *
 * Ordering: depth-first post-order, so an ancestor always appears before any
 * task that depends on it (roots first). Each id appears at most once, which
 * collapses diamond-shaped dependency graphs.
 *
 * Ids that the store cannot resolve are dropped from the result. `task` itself
 * is never part of the result, even when a dependency cycle leads back to it.
 *
 * @param task - The task whose upstream chain is wanted.
 * @param store - Queue store used to look up each dependency by id.
 * @returns Ancestor ids, roots first, without duplicates.
 */
function ancestorIds(task: QueuedTask, store: QueueStore): string[] {
  // Seed with the starting task's own id: without it, a cycle that loops back
  // through `task` would walk into `task` and report it as its own ancestor.
  const seen = new Set<string>([task.id]);
  const ordered: string[] = [];
  const visit = (id: string): void => {
    // Already handled (diamond) or currently on the stack (cycle).
    if (seen.has(id)) return;
    seen.add(id);
    const t = store.get(id);
    if (!t) return; // unknown id: nothing to walk, nothing to list
    for (const dep of t.dependsOn) visit(dep); // ancestors before dependents
    ordered.push(id);
  };
  for (const dep of task.dependsOn) visit(dep);
  return ordered;
}

/**
* Render the handoffs of every step `task` transitively depends on into a context
* section, so a fresh agent sees the whole upstream chain — not just its direct
* dependencies. Reliability over token economy: a step must never have to
* re-discover what any ancestor already established just because an intermediate
* handoff happened to omit it. Empty when there are no completed ancestors.
*/
function renderHandoffContext(task: QueuedTask, store: QueueStore): string {
const lines: string[] = [];
for (const depId of task.dependsOn) {
const dep = store.get(depId);
const handoff = store.readHandoff(depId);
for (const id of ancestorIds(task, store)) {
const dep = store.get(id);
const handoff = store.readHandoff(id);
if (!dep || !handoff) continue;
lines.push(`### ${dep.type}`);
lines.push(`- did: ${handoff.did}`);
Expand Down
10 changes: 10 additions & 0 deletions src/lib/programs/orchestrator/__tests__/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as os from 'os';
import * as path from 'path';
import {
QueueStore,
QUEUE_DIR_NAME,
type QueueFile,
type TaskHandoff,
} from '@lib/programs/orchestrator/queue';
Expand All @@ -28,6 +29,15 @@ describe('QueueStore', () => {
fs.rmSync(dir, { recursive: true, force: true });
});

it('drops a self-explaining .DELETE-ME.md in the cache folder', () => {
  // The note is read from inside the queue folder under the test directory.
  const notePath = path.join(dir, QUEUE_DIR_NAME, '.DELETE-ME.md');
  const contents = fs.readFileSync(notePath, 'utf8');

  // It must say the folder is safe to remove and name the folder itself.
  expect(contents).toContain('safely delete');
  expect(contents).toContain(`${QUEUE_DIR_NAME}/`);
});

it('enqueues a pending task with defaults', () => {
const t = q.enqueue({ type: 'install' });
expect(t.status).toBe('pending');
Expand Down
68 changes: 68 additions & 0 deletions src/lib/programs/orchestrator/__tests__/run-metrics.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { RunMetrics } from '@lib/programs/orchestrator/run-metrics';

describe('RunMetrics', () => {
  it('reports time to first start and first completion from run start', () => {
    const metrics = new RunMetrics(0);
    metrics.recordStart(100);
    metrics.recordComplete(300);
    metrics.recordStart(1000);
    metrics.recordComplete(1100);

    const summary = metrics.summary();
    expect(summary.time_to_first_task_ms).toBe(100);
    expect(summary.time_to_first_completion_ms).toBe(300);
  });

  it('max_gap_ms is the longest silence across all visible transitions', () => {
    const metrics = new RunMetrics(0);
    metrics.recordStart(100); // first visible transition, at t=100
    metrics.recordComplete(300); // 200 since the previous transition
    metrics.recordStart(1000); // 700 since the previous one: the longest
    metrics.recordComplete(1100); // 100 since the previous one

    const summary = metrics.summary();
    expect(summary.max_gap_ms).toBe(700);
  });

  it('recordStart returns ms_since_run_start and the gap from the previous start', () => {
    const metrics = new RunMetrics(0);

    const firstStart = metrics.recordStart(100);
    expect(firstStart).toEqual({
      ms_since_run_start: 100,
      gap_since_prev_start_ms: undefined,
    });

    const secondStart = metrics.recordStart(1000);
    expect(secondStart).toEqual({
      ms_since_run_start: 1000,
      gap_since_prev_start_ms: 900,
    });
  });

  it('reports undefined timings for a run with no transitions, not zero', () => {
    const untouched = new RunMetrics(0);
    const summary = untouched.summary();

    expect(summary.time_to_first_task_ms).toBeUndefined();
    expect(summary.time_to_first_completion_ms).toBeUndefined();
    expect(summary.max_gap_ms).toBeUndefined();
  });

  it('a single started-but-unfinished task reports a real zero gap and no completion', () => {
    const metrics = new RunMetrics(0);
    metrics.recordStart(50);

    const summary = metrics.summary();
    expect(summary.time_to_first_task_ms).toBe(50);
    expect(summary.time_to_first_completion_ms).toBeUndefined();
    // Exactly one visible transition: the gap is a genuine 0, not undefined.
    expect(summary.max_gap_ms).toBe(0);
  });

  it('counts a retry stall (start to re-start) as silence', () => {
    const metrics = new RunMetrics(0);
    metrics.recordStart(0);
    // No completion is recorded: the task ended without reporting and was
    // requeued (an invisible step), then started again 5s later. The user
    // sees that stall as silence.
    metrics.recordStart(5000);

    const summary = metrics.summary();
    expect(summary.max_gap_ms).toBe(5000);
  });

  it('treats skip and fail as visible transitions for gap tracking', () => {
    const metrics = new RunMetrics(0);
    metrics.recordStart(0);
    metrics.recordTerminal(2000); // a skip or a fail, 2000 after the start
    metrics.recordStart(2500); // 500 after the terminal transition

    const summary = metrics.summary();
    expect(summary.max_gap_ms).toBe(2000);
  });
});
81 changes: 60 additions & 21 deletions src/lib/programs/orchestrator/orchestrator-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
} from '../../agent/agent-interface';
import { OutroKind, type WizardSession } from '../../wizard-session';
import { detectNodePackageManagers } from '../../detection/package-manager';
import { installSkillById } from '../../wizard-tools';
import { installSkillById, fetchSkillMenu } from '../../wizard-tools';
import { getUI } from '../../../ui';
import { analytics } from '../../../utils/analytics';
import { ciExcludedTaskTypes } from '../../../utils/ci-flag-overrides';
Expand All @@ -35,6 +35,7 @@ import {
type QueuedTask,
} from './queue';
import { drainQueue, type RunTask } from './executor';
import { RunMetrics } from './run-metrics';
import {
agentRunTools,
assembleSeedPrompt,
Expand Down Expand Up @@ -74,6 +75,27 @@ function sessionRunOptions(session: WizardSession): WizardRunOptions {
};
}

/**
 * Map a bare framework name (what `session.skillId` holds, e.g. `django`) to
 * the id of its full `integration` skill as listed in the skill menu, where
 * ids take the form `integration-<variant>`.
 *
 * Resolution order:
 *  1. the exact id `integration-<framework>` (the 1:1 frameworks — django,
 *     python, flask, …);
 *  2. otherwise the first menu id that starts with `integration-<framework>-`
 *     (a granular variant such as `integration-nextjs-app-router`).
 *
 * @param skillsBaseUrl - Base URL handed to `fetchSkillMenu`.
 * @param framework - Bare framework name to resolve.
 * @returns The matching menu id, or `undefined` when the menu is unavailable
 *   or lists no matching skill.
 */
async function resolveReferenceSkillId(
  skillsBaseUrl: string,
  framework: string,
): Promise<string | undefined> {
  const menu = await fetchSkillMenu(skillsBaseUrl);
  if (!menu) {
    return undefined;
  }

  const exactId = `integration-${framework}`;
  const variantPrefix = `${exactId}-`;

  // Flatten every category's skill list into one list of ids.
  const skillIds = Object.values(menu.categories)
    .flat()
    .map((skill) => skill.id);

  return skillIds.includes(exactId)
    ? exactId
    : skillIds.find((id) => id.startsWith(variantPrefix));
}

Comment thread
gewenyu99 marked this conversation as resolved.
export async function runOrchestrator(
session: WizardSession,
programConfig: ProgramConfig,
Expand Down Expand Up @@ -107,8 +129,7 @@ export async function runOrchestrator(
// queue transitions, with the resolved model so cheap work is attributable
// to cheap models.
const runStartMs = Date.now();
let firstStartMs: number | undefined;
let lastStartMs: number | undefined;
const metrics = new RunMetrics(runStartMs);
const durationMs = (t: QueuedTask) =>
t.startedAt && t.finishedAt
? Date.parse(t.finishedAt) - Date.parse(t.startedAt)
Expand All @@ -129,31 +150,28 @@ export async function runOrchestrator(
dynamic: task.enqueuedBy !== 'orchestrator',
});
break;
case 'start': {
const now = Date.now();
case 'start':
analytics.wizardCapture('orchestrator task started', {
...base,
ms_since_run_start: now - runStartMs,
gap_since_prev_start_ms:
lastStartMs === undefined ? undefined : now - lastStartMs,
...metrics.recordStart(Date.now()),
});
firstStartMs ??= now;
lastStartMs = now;
break;
}
case 'complete':
metrics.recordComplete(Date.now());
analytics.wizardCapture('orchestrator task completed', {
...base,
duration_ms: durationMs(task),
});
break;
case 'skip':
metrics.recordTerminal(Date.now());
analytics.wizardCapture('orchestrator task skipped', {
...base,
duration_ms: durationMs(task),
});
break;
case 'fail':
metrics.recordTerminal(Date.now());
analytics.wizardCapture('orchestrator task failed', {
...base,
duration_ms: durationMs(task),
Expand All @@ -172,9 +190,12 @@ export async function runOrchestrator(
// skill — only the example file is read, when the agent's prompt points at it.
let examplePath: string | undefined;
let commandmentsPath: string | undefined;
if (session.skillId) {
const referenceSkillId = session.skillId
? await resolveReferenceSkillId(boot.skillsBaseUrl, session.skillId)
: undefined;
if (referenceSkillId) {
const ref = await installSkillById(
session.skillId,
referenceSkillId,
session.installDir,
boot.skillsBaseUrl,
path.join(QUEUE_DIR_NAME, 'reference'),
Expand All @@ -189,8 +210,14 @@ export async function runOrchestrator(
commandmentsPath = commandments;
}
} else {
logToFile(`[orchestrator] reference example unavailable: ${ref.kind}`);
logToFile(
`[orchestrator] reference unavailable: ${ref.kind} (${referenceSkillId})`,
);
}
} else if (session.skillId) {
logToFile(
`[orchestrator] no integration skill for framework "${session.skillId}"`,
);
}

// The client injects the basics (project context + the I/O contract) around
Expand Down Expand Up @@ -353,11 +380,20 @@ export async function runOrchestrator(
try {
await drainQueue(store, runTask);
} finally {
// Success or failure, the installed task instructions never outlive the run.
rmSync(path.join(session.installDir, taskSkillsRoot), {
recursive: true,
force: true,
});
// Success or failure, no run artifact outlives the run — wipe the whole
// cache folder (queue, handoffs, reference example, installed task
// instructions). The .DELETE-ME.md inside is the fallback if we don't.
try {
rmSync(path.join(session.installDir, QUEUE_DIR_NAME), {
recursive: true,
force: true,
});
} catch (err) {
analytics.captureException(
err instanceof Error ? err : new Error(String(err)),
{ step: 'orchestrator_cache_cleanup' },
);
}
}

renderQueue();
Expand All @@ -372,8 +408,11 @@ export async function runOrchestrator(
tasks_failed: summary.failed,
tasks_skipped: summary.skipped,
total_duration_ms: Date.now() - runStartMs,
time_to_first_task_ms:
firstStartMs === undefined ? undefined : firstStartMs - runStartMs,
...metrics.summary(),
dynamic_enqueue_count: store
.list()
.filter((t) => t.enqueuedBy !== 'orchestrator').length,
retried_task_count: store.list().filter((t) => t.attempts > 1).length,
});

// The build step flags any unresolved conflict in its handoff; surface the
Expand Down
Loading
Loading