From ffef0aad6a932ccfe3c973ca77328efd8d12656a Mon Sep 17 00:00:00 2001 From: gsxdsm Date: Mon, 15 Jun 2026 13:15:38 -0700 Subject: [PATCH 1/7] =?UTF-8?q?docs(solutions):=20ACP=20bridge=20'Not=20lo?= =?UTF-8?q?gged=20in'=20=E2=80=94=20thin=20spawn=20env=20+=20keychain=20se?= =?UTF-8?q?ssion=20isolation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Compound learning: the claude-code-cli-acp bridge returned 'Not logged in' despite a working claude -p, due to (1) a too-thin spawn env (needs XDG_*/USER/ SHELL beyond HOME/PATH) and (2) macOS login-Keychain session isolation for detached/headless processes. Six headless tasks misdiagnosed it as an upstream gap. Cross-linked from the ACP runtime integration pattern doc. Co-Authored-By: Claude Opus 4.8 --- ...stent-jsonrpc-agent-runtime-integration.md | 2 +- ...t-logged-in-thin-env-keychain-isolation.md | 88 +++++++++++++++++++ 2 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 docs/solutions/integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md diff --git a/docs/solutions/architecture-patterns/acp-persistent-jsonrpc-agent-runtime-integration.md b/docs/solutions/architecture-patterns/acp-persistent-jsonrpc-agent-runtime-integration.md index 4a12845a97..55d167b2b1 100644 --- a/docs/solutions/architecture-patterns/acp-persistent-jsonrpc-agent-runtime-integration.md +++ b/docs/solutions/architecture-patterns/acp-persistent-jsonrpc-agent-runtime-integration.md @@ -42,7 +42,7 @@ related_components: - **Per-category permission gating, never per-preset.** The shipped default policy preset is `unrestricted` (every category → allow). A preset-level shortcut auto-approves everything the moment the runtime is selected. Classify each call's kind into a category and read `permissionPolicy.rules[category]`; add an explicit acknowledgement setting before honoring blanket allows on sensitive categories. - Select `allow_once` only — never `allow_always`/`reject_always` (a persisted grant inside untrusted code loses per-call interception). Unmappable/missing kinds, missing gate/policy, and HITL-without-a-readable-decision all default-deny. Require **both** `pauseForApproval` AND `findApprovalByDedupeKey` before creating an approval request — otherwise a human approval is silently discarded and a pending record is orphaned. - **Filesystem jail = realpath, not string checks.** `project-root-guard.ts` is a suffix check, not a jail. Use realpath-within-realpath(cwd), `lstat` the final component for new files, `O_NOFOLLOW` open, and **truncate only after post-open re-validation** (passing `O_TRUNC` into open() truncates an escaped target before validation — write-path TOCTOU). Deny-list secrets and `.git/**` by basename regardless of cwd membership. Stat-gate reads (a full `readFile` before a byte ceiling is an OOM vector). -- **Bound everything the agent emits**, including the channels that don't look like output: per-turn + per-chunk caps on text/thinking, ANSI/control stripping, bounded identifier lengths and correlation maps, and **plan/structured events** (entry size was bounded but entry *count* wasn't — 1,000 × 64KB entries bypassed the per-turn budget). Redact stderr across chunk boundaries, not per-chunk (secrets split across `data` events evade per-chunk regexes). Build the subprocess env from an allow-list, never inherited `process.env`. +- **Bound everything the agent emits**, including the channels that don't look like output: per-turn + per-chunk caps on text/thinking, ANSI/control stripping, bounded identifier lengths and correlation maps, and **plan/structured events** (entry size was bounded but entry *count* wasn't — 1,000 × 64KB entries bypassed the per-turn budget). Redact stderr across chunk boundaries, not per-chunk (secrets split across `data` events evade per-chunk regexes). Build the subprocess env from an allow-list, never inherited `process.env` — but make the list **complete**: a thin `{HOME,PATH}` starves agent CLIs of the vars they use to find auth (`XDG_CONFIG_HOME`/`XDG_CACHE_HOME`/`USER`/`SHELL`/`LANG`), and even a correct env can't beat macOS login-Keychain session isolation for detached daemons. See `integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md`. **4. Per-turn bridge state must actually reset per turn.** Anything accumulated per "turn" (output budgets, cap-flag latches, tool-call correlation maps) needs an explicit `reset()` invoked at the top of each prompt — a latch that never resets silently suppresses all output for the rest of the session after one flood. Write a two-turns-through-the-same-handler test; single-turn tests cannot catch it. diff --git a/docs/solutions/integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md b/docs/solutions/integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md new file mode 100644 index 0000000000..475ccc457c --- /dev/null +++ b/docs/solutions/integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md @@ -0,0 +1,88 @@ +--- +title: "ACP bridge returns 'Not logged in' despite a working claude -p: thin spawn env + Keychain session isolation" +date: 2026-06-15 +category: integration-issues +module: pi-claude-cli +problem_type: integration_issue +component: tooling +symptoms: + - "ACP-bridged turns return the literal assistant text 'Not logged in · Please run /login' instead of real answers" + - "claude -p \"say hi\" works in the same shell while the bridge fails" + - "A verification harness forwarding only HOME and PATH fails even inside an authenticated terminal" + - "Reproducible under detached/headless runners (launchd daemon, autonomous task runner) but not interactively" +root_cause: incomplete_setup +resolution_type: code_fix +severity: high +tags: [acp, claude-code, keychain, spawn-env, authentication, macos, pi-claude-cli] +related_components: [authentication, tooling] +--- + +# ACP bridge returns 'Not logged in' despite a working claude -p: thin spawn env + Keychain session isolation + +## Problem + +The `claude-code-cli-acp` ACP bridge — driven by Fusion's `pi-claude-cli` provider to replace `claude -p` — returned the assistant text **"Not logged in · Please run /login"** instead of real answers, even though `claude -p "say hi"` succeeded in the same shell. The cause was environmental, not an upstream bridge limitation: a thin spawn env starved `claude` of the variables it needs to locate its auth, and macOS Keychain session isolation blocked headless processes from reading the login Keychain at all. + +## Symptoms + +- ACP-bridged turns return the literal text `Not logged in · Please run /login` (no tool calls, no real content), while `claude -p "say hi"` works in the same interactive shell. +- A verification harness that forwarded only `{HOME, PATH}` to the bridge failed **even inside an authenticated terminal**, falsely implying the auth itself was broken. +- The failure is reproducible in detached/headless contexts (launchd daemon, autonomous task-runner subprocess) but not in interactive ones — "works when I run it, fails when the daemon runs it." +- `~/.claude/.credentials.json` exists but is an empty **directory**, making file-based credential debugging a dead end. + +## What Didn't Work + +- **Six autonomous headless task attempts** (FN-6466/6467/6473/6476) re-ran the bridge spike, each hit "Not logged in," concluded **NOT-GO**, and even filed upstream issue `moabualruz/claude-code-cli-acp#2` — misattributing an environmental problem to an upstream bridge gap. +- **A `{HOME, PATH}`-only verification harness** kept failing in an authenticated terminal. Because it failed where auth was known-good, it masked that the *env*, not the *auth state*, was wrong — and reinforced the wrong conclusion across every retry. +- **Re-running `claude` / `claude --print` to "re-auth"** — print mode is non-interactive and cannot perform interactive OAuth login, so this could never repair the session. + +## Solution + +Two changes, one per root cause. + +**1. Forward the full env allow-list when spawning the bridge.** Build the bridge subprocess env from an explicit allow-list (never inherited `process.env`, never API keys), and make that list *complete* — not just `{HOME, PATH}`. + +`packages/pi-claude-cli/src/acp-driver.ts`: + +```ts +const BRIDGE_ENV_ALLOWLIST = [ + "HOME", "PATH", "USER", "LOGNAME", "SHELL", "LANG", "LC_ALL", "LC_CTYPE", + "TERM", "TERMINFO", "TMPDIR", "XDG_CONFIG_HOME", "XDG_CACHE_HOME", "COLORTERM", +]; + +function buildBridgeEnv(supplied?: NodeJS.ProcessEnv): NodeJS.ProcessEnv { + const source = supplied ?? process.env; + const env: NodeJS.ProcessEnv = {}; + for (const key of BRIDGE_ENV_ALLOWLIST) { + const v = source[key]; + if (typeof v === "string") env[key] = v; + } + return env; +} +// spawn(options.bridgePath, [], { ..., env: buildBridgeEnv(options.bridgeEnv) }) +``` + +The critical additions over a naive `{HOME, PATH}` env are **`XDG_CONFIG_HOME`, `XDG_CACHE_HOME`, `USER`, `SHELL`, `LANG`**. With the full list, auth succeeds immediately. + +**2. The Keychain finding (gate R17).** Claude Code stores its OAuth credentials in the macOS **login Keychain** as a generic-password item (service `"Claude Code-credentials"`), *not* a file (`~/.claude/.credentials.json` is an empty directory). A detached/headless process runs in a **different security session** and cannot read the login Keychain, so it fails regardless of env; a login-session process (interactive terminal, or an `fn` daemon launched from a login shell) can. This is codified as gate **R17**: the provider's runtime must have login-Keychain access. The driver also detects a not-logged-in turn and writes a best-effort cross-process signal (`fusion-acp-bridge-auth.json`) that `GET /providers/claude-cli/status` reads, so the dashboard can raise an auth-failure banner with a "Use `claude -p`" fallback. + +## Why This Works + +Two independent environmental causes were compounding, which is why the failure looked like a flaky upstream bug: + +1. **Thin spawn env (the silent one).** `claude` resolves config/auth through more than `{HOME, PATH}` — it reads `XDG_CONFIG_HOME`/`XDG_CACHE_HOME` for config locations and relies on `USER`/`SHELL`/`LANG` for session and locale context. Spawned with only `{HOME, PATH}` it can't locate its auth context and reports "Not logged in." The `{HOME,PATH}`-only harness reproduced this *even in an authenticated terminal*, which is exactly why it misdirected six investigations: it "proved" the bridge couldn't auth using a starved env. + +2. **macOS Keychain session isolation.** Even with a perfect env, the login Keychain is bound to the login security session. Interactive terminals (and daemons started from a login shell) share that session and can read the `"Claude Code-credentials"` item; detached launchd daemons and autonomous subprocesses run in a separate session and cannot. Same machine, same credentials, different security session — the precise reason `claude -p` worked interactively while the headless tasks failed. + +## Prevention + +- **When spawning an agent CLI as a subprocess, forward the full env allow-list, not a thin `{HOME, PATH}`.** Agent CLIs resolve auth/config through `XDG_CONFIG_HOME`, `XDG_CACHE_HOME`, `USER`, `SHELL`, and locale vars. Keep the allow-list explicit (no inherited `process.env`, no API keys) but make it *complete*. +- **Never trust a verification harness that uses a thinner env than the real spawn path.** A harness that forwards fewer vars than production manufactures failures and masks the real cause. Match the production allow-list exactly, or the harness lies. +- **Treat "works interactively but fails headless" as a session/Keychain problem first.** On macOS, OAuth/login credentials live in the session-bound login Keychain. A detached daemon or autonomous task-runner is in a different security session and cannot read them — no amount of env or file fiddling fixes that. Ask "is this process in the login session?" before assuming the tool is broken. +- **Headless daemons need an explicit credential-delivery story.** Don't assume a daemon inherits interactive credentials. Either launch it from a login shell/session or provide credentials through a session-independent channel, and encode it as a runtime gate (here, R17) so it's checked rather than rediscovered. +- **Don't let autonomous/headless task-runners conclude "impossible" or file upstream issues from a single un-isolated failure.** Six runs reached NOT-GO and an upstream issue from one un-diagnosed environmental cause. Require an environmental-isolation step (interactive vs. headless, full vs. thin env) before declaring an integration unworkable. + +## Related Issues + +- `docs/solutions/architecture-patterns/acp-persistent-jsonrpc-agent-runtime-integration.md` — the ACP runtime integration pattern. Its §3 rule "build the subprocess env from an allow-list, never inherited `process.env`" is the principle this doc operationalizes; this doc is its concrete failure mode (allow-list too thin → "Not logged in") plus the Keychain-isolation dimension that pattern doc does not cover. +- Upstream `moabualruz/claude-code-cli-acp#2` — filed during the failed investigation; the issue is environmental (this doc), not an upstream bridge gap. From b83210b47194ab0bd92b04d082c30d1c371403e8 Mon Sep 17 00:00:00 2001 From: gsxdsm Date: Mon, 15 Jun 2026 13:19:10 -0700 Subject: [PATCH 2/7] test(acp): cover the claude-cli status acp block + auth-failure signal (U12) GET /providers/claude-cli/status: asserts acp.{enabled,bridgeAvailable,active, authFailed,authReason} reflect the FUSION_CLAUDE_ACP env + the bridge auth-failure signal file, and that acp is inactive/clean when no bridge path is published. Co-Authored-By: Claude Opus 4.8 --- .../src/__tests__/routes-auth.test.ts | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/packages/dashboard/src/__tests__/routes-auth.test.ts b/packages/dashboard/src/__tests__/routes-auth.test.ts index e392079ce7..48d04cfae6 100644 --- a/packages/dashboard/src/__tests__/routes-auth.test.ts +++ b/packages/dashboard/src/__tests__/routes-auth.test.ts @@ -972,6 +972,50 @@ describe("GET /providers/claude-cli/status", () => { expect(res.body.binary).toMatchObject({ available: true, version: "claude 1.0.0" }); expect(res.body.extension).toMatchObject({ status: "ok" }); }); + + it("surfaces ACP transport state + the bridge auth-failure signal", async () => { + const probeSpy = vi.spyOn(claudeCliProbeModule, "probeClaudeCli").mockResolvedValue({ + available: true, version: "claude 1.0.0", probeDurationMs: 10, + }); + const signalPath = join(tmpdir(), "fusion-acp-bridge-auth.json"); + const prevBridge = process.env.FUSION_CLAUDE_ACP_BRIDGE; + const prevFlag = process.env.FUSION_CLAUDE_ACP; + process.env.FUSION_CLAUDE_ACP_BRIDGE = "/abs/node_modules/.bin/claude-code-cli-acp"; + process.env.FUSION_CLAUDE_ACP = "1"; + writeFileSync(signalPath, JSON.stringify({ authFailed: true, reason: "Not logged in" })); + try { + const res = await GET(buildApp(), "/api/providers/claude-cli/status"); + expect(res.status).toBe(200); + expect(res.body.acp).toMatchObject({ enabled: true, bridgeAvailable: true, active: true, authFailed: true }); + expect(res.body.acp.authReason).toContain("Not logged in"); + } finally { + probeSpy.mockRestore(); + rmSync(signalPath, { force: true }); + if (prevBridge === undefined) delete process.env.FUSION_CLAUDE_ACP_BRIDGE; + else process.env.FUSION_CLAUDE_ACP_BRIDGE = prevBridge; + if (prevFlag === undefined) delete process.env.FUSION_CLAUDE_ACP; + else process.env.FUSION_CLAUDE_ACP = prevFlag; + } + }); + + it("reports acp inactive + no auth failure when the bridge isn't published", async () => { + const probeSpy = vi.spyOn(claudeCliProbeModule, "probeClaudeCli").mockResolvedValue({ + available: true, version: "claude 1.0.0", probeDurationMs: 10, + }); + const prevBridge = process.env.FUSION_CLAUDE_ACP_BRIDGE; + delete process.env.FUSION_CLAUDE_ACP_BRIDGE; + rmSync(join(tmpdir(), "fusion-acp-bridge-auth.json"), { force: true }); + try { + const res = await GET(buildApp(), "/api/providers/claude-cli/status"); + expect(res.body.acp.bridgeAvailable).toBe(false); + expect(res.body.acp.active).toBe(false); + expect(res.body.acp.authFailed).toBe(false); + } finally { + probeSpy.mockRestore(); + if (prevBridge === undefined) delete process.env.FUSION_CLAUDE_ACP_BRIDGE; + else process.env.FUSION_CLAUDE_ACP_BRIDGE = prevBridge; + } + }); }); describe("Droid CLI auth routes", () => { From d217125a90b8b71e2abbb51d96ffb7c40becff32 Mon Sep 17 00:00:00 2001 From: gsxdsm Date: Mon, 15 Jun 2026 13:26:15 -0700 Subject: [PATCH 3/7] feat(acp): wire ACP token usage (OQ3) + opt-in headless auth (R17) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Item 2 (OQ3): capture PromptResponse.usage from conn.prompt and feed it into the bridge before finish(), so ACP-path turns report token usage/cost instead of always zero. Zero-when-absent is safe; tool-use (break-early) turns inherently report zero (the prompt result never resolves). - Item 3 (R17): opt-in headless credential delivery. When FUSION_CLAUDE_ACP_FORWARD_AUTH=1, buildBridgeEnv forwards a SINGLE Claude auth token (CLAUDE_CODE_OAUTH_TOKEN > ANTHROPIC_AUTH_TOKEN > ANTHROPIC_API_KEY) from the operator's launch env so a detached daemon (no login Keychain) can authenticate. Default OFF — the secure no-secrets posture is unchanged. acp-driver tests 9/9 (usage + the three auth-opt-in cases); typecheck clean. Remaining: item 1 (connection reuse / resume latency). Co-Authored-By: Claude Opus 4.8 --- .../src/__tests__/acp-driver.test.ts | 51 +++++++++++++++++-- packages/pi-claude-cli/src/acp-driver.ts | 46 +++++++++++++++-- 2 files changed, 91 insertions(+), 6 deletions(-) diff --git a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts index 77edc653bd..a6de14a60a 100644 --- a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts +++ b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts @@ -4,6 +4,7 @@ import { PassThrough } from "node:stream"; // Synthetic ACP session/update sequence the mocked prompt() will replay. let scriptedUpdates: Array> = []; +let scriptedUsage: Record | undefined; // Driver validates the bridge path with existsSync — make the fake path "exist". // writeFileSync/unlinkSync back the R17 auth-failure signal (spied). @@ -33,7 +34,7 @@ vi.mock("@agentclientprotocol/sdk", () => ({ this.newSession = vi.fn(async () => ({ sessionId: "s1" })); this.prompt = vi.fn(async () => { for (const u of scriptedUpdates) await handler.sessionUpdate({ update: u }); - return { stopReason: "end_turn" }; + return { stopReason: "end_turn", usage: scriptedUsage }; }); }), })); @@ -53,7 +54,7 @@ vi.mock("@earendil-works/pi-ai", () => ({ calculateCost: vi.fn(), })); -import { streamViaAcp } from "../acp-driver.js"; +import { streamViaAcp, buildBridgeEnv } from "../acp-driver.js"; const MODEL = { id: "claude-sonnet-4-5", name: "Claude Sonnet 4.5" } as never; const CTX = { messages: [{ role: "user", content: "hi" }] } as never; @@ -65,7 +66,17 @@ function eventsOf(stream: { _events: Array> }) { const flush = () => new Promise((r) => setTimeout(r, 30)); describe("streamViaAcp — ACP→pi translation (U11)", () => { - beforeEach(() => { scriptedUpdates = []; }); + beforeEach(() => { scriptedUpdates = []; scriptedUsage = undefined; }); + + it("feeds ACP token usage into the done message (item 2)", async () => { + scriptedUsage = { inputTokens: 11, outputTokens: 22 }; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "hi" } }]; + const stream = streamViaAcp(MODEL, CTX, OPTS) as unknown as { _events: Array> }; + await flush(); + const done = stream._events.find((e) => e.type === "done") as { message?: { usage?: { input?: number; output?: number } } }; + expect(done?.message?.usage?.input).toBe(11); + expect(done?.message?.usage?.output).toBe(22); + }); it("translates agent_message_chunk text into pi text events + done(stop)", async () => { scriptedUpdates = [ @@ -157,3 +168,37 @@ describe("streamViaAcp — ACP→pi translation (U11)", () => { expect(eventsOf(stream).some((e) => e.type === "done")).toBe(true); }); }); + +describe("buildBridgeEnv — R17 auth opt-in (item 3)", () => { + const saved = { flag: process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH, oauth: process.env.CLAUDE_CODE_OAUTH_TOKEN, key: process.env.ANTHROPIC_API_KEY }; + afterEach(() => { + for (const [k, v] of [["FUSION_CLAUDE_ACP_FORWARD_AUTH", saved.flag], ["CLAUDE_CODE_OAUTH_TOKEN", saved.oauth], ["ANTHROPIC_API_KEY", saved.key]] as const) { + if (v === undefined) delete process.env[k]; else process.env[k] = v; + } + }); + + it("does NOT forward auth vars by default (secure default)", () => { + delete process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH; + process.env.ANTHROPIC_API_KEY = "sk-secret"; + const env = buildBridgeEnv({ HOME: "/h", PATH: "/b" }); + expect(env.ANTHROPIC_API_KEY).toBeUndefined(); + expect(env.HOME).toBe("/h"); + }); + + it("forwards a single auth token when opted in", () => { + process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH = "1"; + delete process.env.CLAUDE_CODE_OAUTH_TOKEN; + process.env.ANTHROPIC_API_KEY = "sk-secret"; + const env = buildBridgeEnv({ HOME: "/h", PATH: "/b" }); + expect(env.ANTHROPIC_API_KEY).toBe("sk-secret"); + }); + + it("prefers CLAUDE_CODE_OAUTH_TOKEN and forwards only one token", () => { + process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH = "1"; + process.env.CLAUDE_CODE_OAUTH_TOKEN = "oauth-tok"; + process.env.ANTHROPIC_API_KEY = "sk-secret"; + const env = buildBridgeEnv({ HOME: "/h", PATH: "/b" }); + expect(env.CLAUDE_CODE_OAUTH_TOKEN).toBe("oauth-tok"); + expect(env.ANTHROPIC_API_KEY).toBeUndefined(); + }); +}); diff --git a/packages/pi-claude-cli/src/acp-driver.ts b/packages/pi-claude-cli/src/acp-driver.ts index 1566e27a87..535b4f5425 100644 --- a/packages/pi-claude-cli/src/acp-driver.ts +++ b/packages/pi-claude-cli/src/acp-driver.ts @@ -119,13 +119,35 @@ const BRIDGE_ENV_ALLOWLIST = [ "TERM", "TERMINFO", "TMPDIR", "XDG_CONFIG_HOME", "XDG_CACHE_HOME", "COLORTERM", ]; -function buildBridgeEnv(supplied?: NodeJS.ProcessEnv): NodeJS.ProcessEnv { +/** + * R17 opt-in (detached-daemon auth): a headless daemon can't reach the macOS + * login Keychain, so `claude` reports "Not logged in". When an operator sets + * `FUSION_CLAUDE_ACP_FORWARD_AUTH=1` AND provides one of these in the launch + * environment, we forward it (and ONLY it) so the bridged `claude` can + * authenticate non-interactively. Default OFF — no secret-bearing var ever + * reaches the untrusted bridge otherwise. Mirrors the native claude-code + * adapter's recognized auth vars. + */ +const BRIDGE_AUTH_ENV_KEYS = ["CLAUDE_CODE_OAUTH_TOKEN", "ANTHROPIC_AUTH_TOKEN", "ANTHROPIC_API_KEY"]; + +export function buildBridgeEnv(supplied?: NodeJS.ProcessEnv): NodeJS.ProcessEnv { const source = supplied ?? process.env; const env: NodeJS.ProcessEnv = {}; for (const key of BRIDGE_ENV_ALLOWLIST) { const v = source[key]; if (typeof v === "string") env[key] = v; } + // Opt-in only: forward a single Claude auth token from the operator's launch + // env (always process.env, never the caller-supplied object). + if (process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH === "1") { + for (const key of BRIDGE_AUTH_ENV_KEYS) { + const v = process.env[key]; + if (typeof v === "string" && v.length > 0) { + env[key] = v; + break; // forward only the highest-preference token that's present + } + } + } return env; } @@ -350,8 +372,26 @@ export function streamViaAcp( ]; // ACP ContentBlock[] — text/image shapes match; cast through unknown. - await conn.prompt({ sessionId: opened.sessionId, prompt: blocks as unknown as Parameters[0]["prompt"] }); - if (!sawToolCall) finish("stop"); + const res = await conn.prompt({ sessionId: opened.sessionId, prompt: blocks as unknown as Parameters[0]["prompt"] }); + // Feed token usage (experimental ACP field) into the bridge BEFORE finish() + // so it lands in the `done` message. Tool-use turns break early and never + // resolve here, so they inherently report zero usage. Zero-when-absent safe. + if (!sawToolCall) { + const u = (res as { usage?: { inputTokens?: number; outputTokens?: number; cachedReadTokens?: number; cachedWriteTokens?: number } }).usage; + if (u) { + bridge.handleEvent({ + type: "message_delta", + delta: {}, + usage: { + input_tokens: u.inputTokens, + output_tokens: u.outputTokens, + cache_read_input_tokens: u.cachedReadTokens ?? undefined, + cache_creation_input_tokens: u.cachedWriteTokens ?? undefined, + }, + } as ClaudeApiEvent); + } + finish("stop"); + } } catch (err) { failWith(err instanceof Error ? err.message : String(err)); } From 4e2a887422921361a088f9b033537c5ee35d9ef2 Mon Sep 17 00:00:00 2001 From: gsxdsm Date: Mon, 15 Jun 2026 13:28:43 -0700 Subject: [PATCH 4/7] fix(acp): import afterEach in acp-driver test (typecheck) Co-Authored-By: Claude Opus 4.8 --- packages/pi-claude-cli/src/__tests__/acp-driver.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts index a6de14a60a..2c9ac962cb 100644 --- a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts +++ b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, vi, beforeEach } from "vitest"; +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import { EventEmitter } from "node:events"; import { PassThrough } from "node:stream"; From 031a5470bb7c8914234181b4292339139dba22ba Mon Sep 17 00:00:00 2001 From: gsxdsm Date: Mon, 15 Jun 2026 14:19:17 -0700 Subject: [PATCH 5/7] fix(review): address PR #1682 security review (usage validation + cache tokens) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - P2: event-bridge handleMessageDelta now consumes cache_read/cache_creation tokens (parity with handleMessageStart) — the OQ3 usage path carried them but they were silently dropped, understating cost for cached turns. - P2: validate the untrusted bridge usage payload — coerce each field to a finite, non-negative number before forwarding, so a malformed value (string/NaN/negative) can't corrupt totalTokens/cost. - Tests: usage now asserts cache tokens + totalTokens; new cases for malformed usage, tool-use turns reporting zero usage, the ANTHROPIC_AUTH_TOKEN middle precedence, and that the auth token is read from process.env (never a caller-supplied value — no token substitution). - Doc: state the auth-forwarding exposure trade-off in the code comment. acp-driver 13/13; event-bridge tests green; typecheck clean. Co-Authored-By: Claude Opus 4.8 --- .../src/__tests__/acp-driver.test.ts | 65 +++++++++++++++++-- packages/pi-claude-cli/src/acp-driver.ts | 21 ++++-- packages/pi-claude-cli/src/event-bridge.ts | 7 ++ 3 files changed, 83 insertions(+), 10 deletions(-) diff --git a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts index 2c9ac962cb..2dedde00e4 100644 --- a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts +++ b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts @@ -68,14 +68,39 @@ const flush = () => new Promise((r) => setTimeout(r, 30)); describe("streamViaAcp — ACP→pi translation (U11)", () => { beforeEach(() => { scriptedUpdates = []; scriptedUsage = undefined; }); - it("feeds ACP token usage into the done message (item 2)", async () => { - scriptedUsage = { inputTokens: 11, outputTokens: 22 }; + it("feeds ACP token usage (incl. cache tokens) into the done message (item 2)", async () => { + scriptedUsage = { inputTokens: 11, outputTokens: 22, cachedReadTokens: 5, cachedWriteTokens: 3 }; scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "hi" } }]; const stream = streamViaAcp(MODEL, CTX, OPTS) as unknown as { _events: Array> }; await flush(); - const done = stream._events.find((e) => e.type === "done") as { message?: { usage?: { input?: number; output?: number } } }; + const done = stream._events.find((e) => e.type === "done") as { message?: { usage?: { input?: number; output?: number; cacheRead?: number; cacheWrite?: number; totalTokens?: number } } }; expect(done?.message?.usage?.input).toBe(11); expect(done?.message?.usage?.output).toBe(22); + expect(done?.message?.usage?.cacheRead).toBe(5); + expect(done?.message?.usage?.cacheWrite).toBe(3); + expect(done?.message?.usage?.totalTokens).toBe(41); + }); + + it("ignores a malformed/untrusted usage payload (string/NaN/negative)", async () => { + scriptedUsage = { inputTokens: "99" as unknown as number, outputTokens: NaN, cachedReadTokens: -5 }; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "hi" } }]; + const stream = streamViaAcp(MODEL, CTX, OPTS) as unknown as { _events: Array> }; + await flush(); + const done = stream._events.find((e) => e.type === "done") as { message?: { usage?: { input?: number; output?: number } } }; + // Coerced to undefined → bridge leaves usage at 0; never a string/NaN. + expect(done?.message?.usage?.input).toBe(0); + expect(Number.isNaN(done?.message?.usage?.output)).toBe(false); + }); + + it("does not emit usage on a tool-use (break-early) turn", async () => { + scriptedUsage = { inputTokens: 11, outputTokens: 22 }; + scriptedUpdates = [ + { sessionUpdate: "tool_call", toolCallId: "t1", _meta: { claudeCode: { toolName: "mcp__custom-tools__fn_task_list" } }, rawInput: {} }, + ]; + const stream = streamViaAcp(MODEL, CTX, OPTS) as unknown as { _events: Array> }; + await flush(); + const done = stream._events.find((e) => e.type === "done") as { message?: { usage?: { input?: number } } }; + expect(done?.message?.usage?.input ?? 0).toBe(0); // tool-use turn reports zero usage }); it("translates agent_message_chunk text into pi text events + done(stop)", async () => { @@ -170,9 +195,19 @@ describe("streamViaAcp — ACP→pi translation (U11)", () => { }); describe("buildBridgeEnv — R17 auth opt-in (item 3)", () => { - const saved = { flag: process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH, oauth: process.env.CLAUDE_CODE_OAUTH_TOKEN, key: process.env.ANTHROPIC_API_KEY }; + const saved = { + flag: process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH, + oauth: process.env.CLAUDE_CODE_OAUTH_TOKEN, + authTok: process.env.ANTHROPIC_AUTH_TOKEN, + key: process.env.ANTHROPIC_API_KEY, + }; afterEach(() => { - for (const [k, v] of [["FUSION_CLAUDE_ACP_FORWARD_AUTH", saved.flag], ["CLAUDE_CODE_OAUTH_TOKEN", saved.oauth], ["ANTHROPIC_API_KEY", saved.key]] as const) { + for (const [k, v] of [ + ["FUSION_CLAUDE_ACP_FORWARD_AUTH", saved.flag], + ["CLAUDE_CODE_OAUTH_TOKEN", saved.oauth], + ["ANTHROPIC_AUTH_TOKEN", saved.authTok], + ["ANTHROPIC_API_KEY", saved.key], + ] as const) { if (v === undefined) delete process.env[k]; else process.env[k] = v; } }); @@ -201,4 +236,24 @@ describe("buildBridgeEnv — R17 auth opt-in (item 3)", () => { expect(env.CLAUDE_CODE_OAUTH_TOKEN).toBe("oauth-tok"); expect(env.ANTHROPIC_API_KEY).toBeUndefined(); }); + + it("forwards ANTHROPIC_AUTH_TOKEN (middle precedence) when no OAuth token", () => { + process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH = "1"; + delete process.env.CLAUDE_CODE_OAUTH_TOKEN; + process.env.ANTHROPIC_AUTH_TOKEN = "auth-tok"; + process.env.ANTHROPIC_API_KEY = "sk-secret"; + const env = buildBridgeEnv({ HOME: "/h", PATH: "/b" }); + expect(env.ANTHROPIC_AUTH_TOKEN).toBe("auth-tok"); + expect(env.ANTHROPIC_API_KEY).toBeUndefined(); + }); + + it("reads the auth token from process.env, never a caller-supplied value (no token substitution)", () => { + process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH = "1"; + delete process.env.CLAUDE_CODE_OAUTH_TOKEN; + delete process.env.ANTHROPIC_AUTH_TOKEN; + process.env.ANTHROPIC_API_KEY = "real-from-env"; + // A caller trying to inject a different token via the supplied env must be ignored. + const env = buildBridgeEnv({ HOME: "/h", PATH: "/b", ANTHROPIC_API_KEY: "attacker" } as NodeJS.ProcessEnv); + expect(env.ANTHROPIC_API_KEY).toBe("real-from-env"); + }); }); diff --git a/packages/pi-claude-cli/src/acp-driver.ts b/packages/pi-claude-cli/src/acp-driver.ts index 535b4f5425..daca18941d 100644 --- a/packages/pi-claude-cli/src/acp-driver.ts +++ b/packages/pi-claude-cli/src/acp-driver.ts @@ -127,6 +127,12 @@ const BRIDGE_ENV_ALLOWLIST = [ * authenticate non-interactively. Default OFF — no secret-bearing var ever * reaches the untrusted bridge otherwise. Mirrors the native claude-code * adapter's recognized auth vars. + * + * Security trade-off (state it where the operator opts in): once forwarded, the + * token is visible to the bridge subprocess AND everything it spawns (including + * MCP servers that inherit env). It is NOT in prompt/model context, so the model + * can't read it, but opting in widens exposure to the bridge process tree. + * Prefer a scoped/rotatable `CLAUDE_CODE_OAUTH_TOKEN` (`claude setup-token`). */ const BRIDGE_AUTH_ENV_KEYS = ["CLAUDE_CODE_OAUTH_TOKEN", "ANTHROPIC_AUTH_TOKEN", "ANTHROPIC_API_KEY"]; @@ -377,16 +383,21 @@ export function streamViaAcp( // so it lands in the `done` message. Tool-use turns break early and never // resolve here, so they inherently report zero usage. Zero-when-absent safe. if (!sawToolCall) { - const u = (res as { usage?: { inputTokens?: number; outputTokens?: number; cachedReadTokens?: number; cachedWriteTokens?: number } }).usage; + const u = (res as { usage?: Record }).usage; + // The bridge is untrusted (see BRIDGE_ENV_ALLOWLIST): coerce its usage + // payload to finite, non-negative numbers only so a malformed value + // (string/NaN/negative) can't corrupt totalTokens / cost downstream. + const num = (x: unknown): number | undefined => + typeof x === "number" && Number.isFinite(x) && x >= 0 ? x : undefined; if (u) { bridge.handleEvent({ type: "message_delta", delta: {}, usage: { - input_tokens: u.inputTokens, - output_tokens: u.outputTokens, - cache_read_input_tokens: u.cachedReadTokens ?? undefined, - cache_creation_input_tokens: u.cachedWriteTokens ?? undefined, + input_tokens: num(u.inputTokens), + output_tokens: num(u.outputTokens), + cache_read_input_tokens: num(u.cachedReadTokens), + cache_creation_input_tokens: num(u.cachedWriteTokens), }, } as ClaudeApiEvent); } diff --git a/packages/pi-claude-cli/src/event-bridge.ts b/packages/pi-claude-cli/src/event-bridge.ts index bde3db7c07..9f83638567 100644 --- a/packages/pi-claude-cli/src/event-bridge.ts +++ b/packages/pi-claude-cli/src/event-bridge.ts @@ -420,6 +420,13 @@ export function createEventBridge( if (usage.input_tokens != null) output.usage.input = usage.input_tokens; if (usage.output_tokens != null) output.usage.output = usage.output_tokens; + // Cache tokens (parity with handleMessageStart) — the ACP usage path + // (OQ3) carries these too; without this they'd be silently dropped and + // cost for cached turns would be understated. + if (usage.cache_read_input_tokens != null) + output.usage.cacheRead = usage.cache_read_input_tokens; + if (usage.cache_creation_input_tokens != null) + output.usage.cacheWrite = usage.cache_creation_input_tokens; output.usage.totalTokens = output.usage.input + output.usage.output + From b0bb39aa39e8346fc94bd1ebf5dab41433f493c2 Mon Sep 17 00:00:00 2001 From: gsxdsm Date: Mon, 15 Jun 2026 14:38:19 -0700 Subject: [PATCH 6/7] feat(acp): opt-in warm connection reuse across turns (OQ2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Keep a warm bridge connection + ACP session across turns of one conversation (gated by FUSION_CLAUDE_ACP_REUSE=1, default OFF), so multi-turn lanes skip the cold bridge/claude spawn and session/new round-trip and send only the latest-turn delta (buildResumePrompt). A stable router indirection serves each turn's handlers. Addresses the adversarial review of the reuse path: - P0: a warm-child death routes failure to the CURRENT owner turn via router.fail, so a reuse turn fails fast instead of hanging until the 30-min inactivity timeout. - P1: eviction is cache-identity-aware (evictCachedAcpConn only deletes the map key when it still points at the entry), so a concurrent cold turn / stale close handler / idle timer can't evict or kill a newer live entry's child. - P1: an empty resume delta cold-starts instead of issuing an empty prompt that could hang. - P2: a per-turn token drops cross-turn stray updates on the shared warm connection. - The idle reaper is unref'd so it never pins the process. Default OFF → the cold path is functionally unchanged (reviewer-verified). Adds multi-turn tests: reuse skips spawn+session/new, flag-off spawns fresh, fail-fast on warm-child death, empty-resume cold fallback. 346/346 pass, tsc clean. Co-Authored-By: Claude Opus 4.8 --- .changeset/acp-route-a-claude-cli-bridge.md | 1 + .../src/__tests__/acp-driver.test.ts | 125 ++++++++- packages/pi-claude-cli/src/acp-driver.ts | 246 +++++++++++++++--- 3 files changed, 330 insertions(+), 42 deletions(-) diff --git a/.changeset/acp-route-a-claude-cli-bridge.md b/.changeset/acp-route-a-claude-cli-bridge.md index 27a3f3efbe..b3f60068bb 100644 --- a/.changeset/acp-route-a-claude-cli-bridge.md +++ b/.changeset/acp-route-a-claude-cli-bridge.md @@ -7,5 +7,6 @@ Route Fusion's Claude CLI path through the ACP bridge (`claude-code-cli-acp`) in - **U10** — forward `mcpServers` on ACP `session/new` through the runtime contract (`AgentRuntimeOptions.mcpServers` + the plugin's `newAcpSession`); defaults to `[]` so existing read-only ACP "ask" turns are unchanged. - **U11** — `streamViaAcp`: the `pi-claude-cli` provider can drive Claude through the bundled ACP bridge, returning the same `AssistantMessageEventStream` as the `-p` path. Dispatched only when `FUSION_CLAUDE_ACP=1` and a bridge path are present, so the live `-p` path is byte-for-byte untouched by default. Full-history prompting, schema-only MCP forwarding with break-early on pi-known tools, control-char/size sanitization, env allow-list, process-registry registration, and inactivity timeout. - **KTD10** — the ACP runtime plugin publishes its identity-pinned bundled bridge path on load so the kill-switch needs no manual path; it does not enable the transport. +- **OQ2** — opt-in connection reuse (`FUSION_CLAUDE_ACP_REUSE=1`, default OFF): a warm bridge connection + ACP session is kept across turns of one conversation (keyed by `sessionId`), so multi-turn lanes skip the cold bridge/`claude` spawn and `session/new` round-trip and send only the latest-turn delta (`buildResumePrompt`). A stable `router` indirection serves each turn's handlers; a warm-child death routes failure to the current owner turn (no 30-min inactivity hang), eviction is cache-identity-aware (a concurrent cold turn can't kill a newer entry's child), an empty resume cold-starts instead of issuing an empty prompt, and a per-turn token drops cross-turn stray updates. The idle reaper is `unref`'d. Default OFF → the cold path is functionally unchanged. The Claude-via-pi OAuth path is unchanged. Live verification confirmed the bridge gates tool execution behind `session/request_permission` (forwarded MCP tools and native tools do not execute when cancelled). Remaining for a follow-up: picker/auth/status surface (U12), workflow `model`-node verification (U13), and production rollout. diff --git a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts index 2dedde00e4..ee9e1a854d 100644 --- a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts +++ b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts @@ -5,6 +5,9 @@ import { PassThrough } from "node:stream"; // Synthetic ACP session/update sequence the mocked prompt() will replay. let scriptedUpdates: Array> = []; let scriptedUsage: Record | undefined; +// When set, prompt() never resolves — simulates a turn waiting on the bridge so +// only an out-of-band event (child death / abort) can end it. +let scriptedHang = false; // Driver validates the bridge path with existsSync — make the fake path "exist". // writeFileSync/unlinkSync back the R17 auth-failure signal (spied). @@ -33,6 +36,7 @@ vi.mock("@agentclientprotocol/sdk", () => ({ this.initialize = vi.fn(async () => ({ protocolVersion: 1 })); this.newSession = vi.fn(async () => ({ sessionId: "s1" })); this.prompt = vi.fn(async () => { + if (scriptedHang) return new Promise(() => {}); // never resolves for (const u of scriptedUpdates) await handler.sessionUpdate({ update: u }); return { stopReason: "end_turn", usage: scriptedUsage }; }); @@ -54,6 +58,8 @@ vi.mock("@earendil-works/pi-ai", () => ({ calculateCost: vi.fn(), })); +import { spawn } from "node:child_process"; +import { ClientSideConnection } from "@agentclientprotocol/sdk"; import { streamViaAcp, buildBridgeEnv } from "../acp-driver.js"; const MODEL = { id: "claude-sonnet-4-5", name: "Claude Sonnet 4.5" } as never; @@ -66,7 +72,7 @@ function eventsOf(stream: { _events: Array> }) { const flush = () => new Promise((r) => setTimeout(r, 30)); describe("streamViaAcp — ACP→pi translation (U11)", () => { - beforeEach(() => { scriptedUpdates = []; scriptedUsage = undefined; }); + beforeEach(() => { scriptedUpdates = []; scriptedUsage = undefined; scriptedHang = false; }); it("feeds ACP token usage (incl. cache tokens) into the done message (item 2)", async () => { scriptedUsage = { inputTokens: 11, outputTokens: 22, cachedReadTokens: 5, cachedWriteTokens: 3 }; @@ -194,6 +200,123 @@ describe("streamViaAcp — ACP→pi translation (U11)", () => { }); }); +describe("connection reuse (item 1) — gated by FUSION_CLAUDE_ACP_REUSE", () => { + const savedReuse = process.env.FUSION_CLAUDE_ACP_REUSE; + beforeEach(() => { + scriptedUpdates = []; + scriptedUsage = undefined; + scriptedHang = false; + vi.mocked(spawn).mockClear(); + vi.mocked(ClientSideConnection).mockClear(); + }); + afterEach(() => { + if (savedReuse === undefined) delete process.env.FUSION_CLAUDE_ACP_REUSE; + else process.env.FUSION_CLAUDE_ACP_REUSE = savedReuse; + }); + + it("reuses one warm bridge connection across turns; turn 2 skips spawn + session/new", async () => { + process.env.FUSION_CLAUDE_ACP_REUSE = "1"; + const reuseOpts = { ...OPTS, sessionId: "conv-reuse-1" }; + + // Turn 1 (cold): needs >1 message so reuseKey activates and the connection caches. + const ctx1 = { messages: [{ role: "user", content: "hi" }, { role: "assistant", content: "hello" }] } as never; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "turn one" } }]; + streamViaAcp(MODEL, ctx1, reuseOpts); + await flush(); + expect(vi.mocked(spawn)).toHaveBeenCalledTimes(1); + expect(vi.mocked(ClientSideConnection)).toHaveBeenCalledTimes(1); + + // Turn 2 (warm): same sessionId → no new spawn, no new connection. + const ctx2 = { messages: [...(ctx1 as unknown as { messages: unknown[] }).messages, { role: "user", content: "again" }] } as never; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "turn two" } }]; + const s2 = streamViaAcp(MODEL, ctx2, reuseOpts) as unknown as { _events: Array> }; + await flush(); + expect(vi.mocked(spawn)).toHaveBeenCalledTimes(1); // no second spawn + expect(vi.mocked(ClientSideConnection)).toHaveBeenCalledTimes(1); // no second connection + + // The single warm connection's prompt() ran once per turn. + const conn = vi.mocked(ClientSideConnection).mock.instances[0] as unknown as { prompt: ReturnType; newSession: ReturnType }; + expect(conn.prompt).toHaveBeenCalledTimes(2); + expect(conn.newSession).toHaveBeenCalledTimes(1); // session/new only on the cold turn + const done = s2._events.find((e) => e.type === "done"); + expect(done!.reason).toBe("stop"); + + // Cleanup: evict the warm connection + clear its (unref'd) idle timer. + (vi.mocked(spawn).mock.results[0].value as EventEmitter).emit("close", 0); + }); + + it("fails a reuse turn FAST when the warm child dies mid-prompt (P0: no 30min hang)", async () => { + process.env.FUSION_CLAUDE_ACP_REUSE = "1"; + const reuseOpts = { ...OPTS, sessionId: "conv-death" }; + + // Turn 1 (cold) caches the warm connection. + const ctx1 = { messages: [{ role: "user", content: "hi" }, { role: "assistant", content: "hello" }] } as never; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "one" } }]; + streamViaAcp(MODEL, ctx1, reuseOpts); + await flush(); + const child = vi.mocked(spawn).mock.results[0].value as EventEmitter; + + // Turn 2 (warm) hangs on prompt() — only the child-death path can end it. + scriptedHang = true; + const ctx2 = { messages: [...(ctx1 as unknown as { messages: unknown[] }).messages, { role: "user", content: "again" }] } as never; + const s2 = streamViaAcp(MODEL, ctx2, reuseOpts) as unknown as { _events: Array> }; + await flush(); + expect(s2._events.some((e) => e.type === "done")).toBe(false); // still waiting + + // The warm child dies. The cold turn's close handler routes failure to the + // CURRENT (reuse) turn via router.fail, so it ends immediately. + child.emit("close", 1); + await flush(); + const done = s2._events.find((e) => e.type === "done") as { reason?: string; message?: { content?: Array<{ text?: string }> } }; + expect(done).toBeDefined(); + expect(done!.reason).toBe("stop"); + expect(JSON.stringify(done!.message?.content)).toContain("Error"); + + // Cache was evicted: a subsequent turn cold-spawns a fresh bridge. + scriptedHang = false; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "fresh" } }]; + const ctx3 = { messages: [...(ctx2 as unknown as { messages: unknown[] }).messages, { role: "assistant", content: "" }, { role: "user", content: "q3" }] } as never; + streamViaAcp(MODEL, ctx3, reuseOpts); + await flush(); + expect(vi.mocked(spawn)).toHaveBeenCalledTimes(2); // turn 1 + the post-death cold restart + }); + + it("cold-starts (no warm reuse) when the resume delta is empty (P1: no empty-prompt hang)", async () => { + process.env.FUSION_CLAUDE_ACP_REUSE = "1"; + const reuseOpts = { ...OPTS, sessionId: "conv-empty" }; + + // Turn 1 (cold) caches. + const ctx1 = { messages: [{ role: "user", content: "hi" }, { role: "assistant", content: "hello" }] } as never; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "one" } }]; + streamViaAcp(MODEL, ctx1, reuseOpts); + await flush(); + expect(vi.mocked(spawn)).toHaveBeenCalledTimes(1); + + // Turn 2 whose context ends in an assistant message → buildResumePrompt is + // empty → must NOT take the warm path (would hang); cold-starts instead. + const ctx2 = { messages: [...(ctx1 as unknown as { messages: unknown[] }).messages, { role: "user", content: "x" }, { role: "assistant", content: "y" }] } as never; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "two" } }]; + const s2 = streamViaAcp(MODEL, ctx2, reuseOpts) as unknown as { _events: Array> }; + await flush(); + expect(vi.mocked(spawn)).toHaveBeenCalledTimes(2); // empty resume → fresh spawn + const done = s2._events.find((e) => e.type === "done"); + expect(done!.reason).toBe("stop"); // produced a normal turn, did not hang + }); + + it("does NOT reuse when the flag is off (default): each turn spawns a fresh bridge", async () => { + delete process.env.FUSION_CLAUDE_ACP_REUSE; + const reuseOpts = { ...OPTS, sessionId: "conv-off" }; + const ctx = { messages: [{ role: "user", content: "hi" }, { role: "assistant", content: "x" }] } as never; + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "a" } }]; + streamViaAcp(MODEL, ctx, reuseOpts); + await flush(); + streamViaAcp(MODEL, ctx, reuseOpts); + await flush(); + expect(vi.mocked(spawn)).toHaveBeenCalledTimes(2); + expect(vi.mocked(ClientSideConnection)).toHaveBeenCalledTimes(2); + }); +}); + describe("buildBridgeEnv — R17 auth opt-in (item 3)", () => { const saved = { flag: process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH, diff --git a/packages/pi-claude-cli/src/acp-driver.ts b/packages/pi-claude-cli/src/acp-driver.ts index daca18941d..d034475dda 100644 --- a/packages/pi-claude-cli/src/acp-driver.ts +++ b/packages/pi-claude-cli/src/acp-driver.ts @@ -45,7 +45,7 @@ import { } from "@agentclientprotocol/sdk"; import { AssistantMessageEventStream } from "@earendil-works/pi-ai"; import type { Api, Model, SimpleStreamOptions } from "@earendil-works/pi-ai"; -import { buildPrompt, buildSystemPrompt, type PiContext } from "./prompt-builder.js"; +import { buildPrompt, buildResumePrompt, buildSystemPrompt, type PiContext } from "./prompt-builder.js"; import { createEventBridge } from "./event-bridge.js"; import { registerProcess, captureStderr } from "./process-manager.js"; import { isPiKnownClaudeTool } from "./tool-mapping.js"; @@ -190,6 +190,61 @@ function toAcpPromptBlocks( return out; } +/** + * FNXC:ClaudeAcp 2026-06-15-14:10: + * Connection-reuse cache (item 1 / OQ2). Gated behind `FUSION_CLAUDE_ACP_REUSE` + * (default OFF). When on, a live bridge connection + ACP session is kept warm + * across turns of one conversation (keyed by the stable `options.sessionId`), so + * multi-turn lanes skip the cold bridge+claude spawn, the `session/new` + * round-trip, AND the full-history resend — sending only `buildResumePrompt` + * (delta) on reuse. A stable `router` indirection lets the long-lived connection + * handler serve each turn's fresh per-call state. Default OFF → the cold path + * below is functionally unchanged. + */ +const REUSE_IDLE_MS = 5 * 60_000; +interface AcpRouter { + onUpdate: ((p: { update?: Record } & Record) => Promise) | null; + onPermission: ((p: Record) => Promise<{ outcome: { outcome: "cancelled" } }>) | null; + // Liveness: invoked when the warm child dies so the turn CURRENTLY owning the + // connection fails fast instead of hanging until the inactivity timeout. The + // long-lived `child.on("close")` is bound to the cold turn's closure, so + // without this a reuse turn's death would never reach its own `failWith`. + // Repointed to each turn's `failWith`; nulled on release (idle → just evict). + fail: ((msg: string) => void) | null; +} +interface CachedAcpConn { + conn: ClientSideConnection; + child: ChildProcess; + acpSessionId: string; + cwd: string; + inUse: boolean; + router: AcpRouter; + idleTimer?: ReturnType; + // Monotonic id of the turn currently owning the connection. A stray + // session/update from a finished turn is dropped when it no longer matches. + activeTurn: number; +} +const acpSessionCache = new Map(); +let acpTurnCounter = 0; +function acpReuseEnabled(): boolean { + return process.env.FUSION_CLAUDE_ACP_REUSE === "1"; +} +/** + * Kill a cached connection's child and evict it — but only delete the map key + * if it STILL points at this exact entry. A concurrent cold turn may have + * replaced the entry under the same key; a stale close handler / idle timer + * must not evict (or kill the child of) that newer, live entry. The passed + * entry's own child is always killed (it is the dead/finished one). + */ +function evictCachedAcpConn(key: string, entry: CachedAcpConn): void { + if (acpSessionCache.get(key) === entry) acpSessionCache.delete(key); + if (entry.idleTimer) { clearTimeout(entry.idleTimer); entry.idleTimer = undefined; } + entry.router.onUpdate = null; + entry.router.onPermission = null; + entry.router.fail = null; + try { entry.child.kill("SIGKILL"); } catch { /* registry SIGKILL is authoritative */ } +} + /** * Stream a Claude response via the ACP bridge as an `AssistantMessageEventStream`. * Mirrors `streamViaCli`'s contract (start → deltas → done; break-early on tools). @@ -205,6 +260,12 @@ export function streamViaAcp( const bridge = createEventBridge(stream, model); (async () => { + const cwd = options.cwd ?? process.cwd(); + const reuseKey = + acpReuseEnabled() && options.sessionId && context.messages.length > 1 + ? options.sessionId + : undefined; + let child: ChildProcess | undefined; let getStderr: (() => string) | undefined; let ended = false; @@ -214,11 +275,44 @@ export function streamViaAcp( let sawToolCall = false; let inactivity: ReturnType | undefined; let onAbort: (() => void) | undefined; + // The cache entry this turn is bound to (set on reuse, or after a cold turn + // caches its connection). Identity-checked against the map before release. + let cacheEntry: CachedAcpConn | undefined; + // This turn's monotonic id, stamped onto the shared cache entry when the + // turn acquires it. Handlers drop updates once the entry moves to a newer + // turn (defends the warm connection against cross-turn content bleed). + let myTurn = 0; - const cleanup = () => { + // End the turn. `destroy` kills+evicts the connection; otherwise a cached + // connection is released (kept warm for the next turn) and a one-shot + // (non-reuse) connection is killed. + const endTurn = (destroy: boolean) => { if (inactivity) { clearTimeout(inactivity); inactivity = undefined; } if (onAbort && options.signal) options.signal.removeEventListener("abort", onAbort); - try { child?.kill("SIGKILL"); } catch { /* registry SIGKILL is authoritative */ } + const entry = cacheEntry; + const keepWarm = + !destroy && entry !== undefined && reuseKey !== undefined && + acpSessionCache.get(reuseKey) === entry; + if (keepWarm) { + // Release the warm connection: drop this turn's handlers (so a late + // update can't reach a finished turn or the liveness hook fire stale), + // mark idle, and arm an unref'd reaper bound to THIS entry. + entry!.router.onUpdate = null; + entry!.router.onPermission = null; + entry!.router.fail = null; + entry!.inUse = false; + if (entry!.idleTimer) clearTimeout(entry!.idleTimer); + const idle = setTimeout(() => evictCachedAcpConn(reuseKey!, entry!), REUSE_IDLE_MS); + idle.unref?.(); // a warm-connection idle timer must not keep the process alive + entry!.idleTimer = idle; + return; + } + if (entry !== undefined && reuseKey !== undefined) { + // Kills this turn's child; evicts the map key only if still current. + evictCachedAcpConn(reuseKey, entry); + } else { + try { child?.kill("SIGKILL"); } catch { /* registry SIGKILL is authoritative */ } + } }; const armInactivity = () => { if (inactivity) clearTimeout(inactivity); @@ -251,7 +345,7 @@ export function streamViaAcp( bridge.handleEvent({ type: "message_delta", delta: { stop_reason: effective === "tool_use" ? "tool_use" : "end_turn" } } as ClaudeApiEvent); stream.push({ type: "done", reason: effective === "tool_use" ? "toolUse" : "stop", message: bridge.getOutput() }); stream.end(); - cleanup(); + endTurn(false); // clean turn → keep a cached connection warm for next turn }; const failWith = (msg: string) => { @@ -268,7 +362,7 @@ export function streamViaAcp( }, }); stream.end(); - cleanup(); + endTurn(true); // failed turn → destroy the connection (never reuse a broken one) }; const openBlock = (kind: "text" | "thinking") => { @@ -291,9 +385,11 @@ export function streamViaAcp( finish("tool_use"); }; - const clientHandler = { - async sessionUpdate(params: { update?: Record } & Record) { + const handleUpdate = async (params: { update?: Record } & Record): Promise => { if (ended) return; + // The warm connection is shared across turns; ignore a stray update once + // the entry has been handed to a newer turn (cross-turn bleed guard). + if (cacheEntry && cacheEntry.activeTurn !== myTurn) return; armInactivity(); const u = (params.update ?? params) as Record; const kind = u.sessionUpdate as string; @@ -320,13 +416,14 @@ export function streamViaAcp( surfaceToolAndBreak(claudeName, (u.toolCallId as string) ?? `acp_${blockIndex + 1}`, u.rawInput ?? u.input); } } - }, - async requestPermission(params: Record) { + }; + + const handlePermission = async (params: Record): Promise<{ outcome: { outcome: "cancelled" } }> => { // A permission request means the bridge is about to EXECUTE a tool. For a // pi-known tool, surface it to pi and break early (pi executes it); deny // by default otherwise. We always return cancelled so the bridge never // executes Fusion's tools itself. - if (!ended) { + if (!ended && !(cacheEntry && cacheEntry.activeTurn !== myTurn)) { const tc = (params.toolCall ?? {}) as Record; const claudeName = ((tc._meta as { claudeCode?: { toolName?: string } } | undefined)?.claudeCode?.toolName) ?? (tc.title as string) ?? ""; if (isPiKnownClaudeTool(claudeName)) { @@ -334,19 +431,92 @@ export function streamViaAcp( } } return { outcome: { outcome: "cancelled" as const } }; - }, + }; + + // Usage emission (OQ3) — shared by the cold + reuse paths. Coerces the + // untrusted bridge usage payload to finite, non-negative numbers. + const emitUsage = (res: unknown): void => { + if (sawToolCall) return; + const u = (res as { usage?: Record }).usage; + if (!u) return; + const num = (x: unknown): number | undefined => + typeof x === "number" && Number.isFinite(x) && x >= 0 ? x : undefined; + bridge.handleEvent({ + type: "message_delta", + delta: {}, + usage: { + input_tokens: num(u.inputTokens), + output_tokens: num(u.outputTokens), + cache_read_input_tokens: num(u.cachedReadTokens), + cache_creation_input_tokens: num(u.cachedWriteTokens), + }, + } as ClaudeApiEvent); }; try { + const withTimeout = (p: Promise, label: string) => + Promise.race([p, new Promise((_, rej) => setTimeout(() => rej(new Error(`ACP ${label} timeout`)), INITIALIZE_TIMEOUT_MS))]); + + // ── Reuse path: a warm connection for this conversation exists ────────── + // Skip spawn + initialize + session/new, and send ONLY the latest-turn + // delta (`buildResumePrompt`) because the warm `claude` session already + // holds the prior turns server-side (sending full history would duplicate + // it). Gated by `reuseKey`, which is undefined unless reuse is enabled. + let warm = reuseKey ? acpSessionCache.get(reuseKey) : undefined; + // Never reuse a busy connection or one bound to a different cwd. + if (warm && (warm.inUse || warm.cwd !== cwd)) warm = undefined; + // A reuse turn sends only the delta; if there's nothing new to send, an + // empty prompt to the warm session could hang. Drop the warm connection + // and cold-start with full history instead. + let resumeBlocks: ReturnType | undefined; + if (warm && reuseKey) { + const resume = buildResumePrompt(context); + const resumeEmpty = typeof resume === "string" ? resume.trim() === "" : resume.length === 0; + if (resumeEmpty) { evictCachedAcpConn(reuseKey, warm); warm = undefined; } + else resumeBlocks = toAcpPromptBlocks(resume as string | Array>); + } + if (warm && reuseKey && resumeBlocks) { + cacheEntry = warm; + myTurn = ++acpTurnCounter; + warm.activeTurn = myTurn; + warm.inUse = true; + if (warm.idleTimer) { clearTimeout(warm.idleTimer); warm.idleTimer = undefined; } + warm.router.onUpdate = handleUpdate; + warm.router.onPermission = handlePermission; + warm.router.fail = failWith; // a warm-child death now fails THIS turn fast + child = warm.child; + onAbort = () => failWith("aborted"); + if (options.signal) options.signal.addEventListener("abort", onAbort, { once: true }); + armInactivity(); + + // ACP ContentBlock[] — text/image shapes match; cast through unknown. + const res = await warm.conn.prompt({ sessionId: warm.acpSessionId, prompt: resumeBlocks as unknown as Parameters[0]["prompt"] }); + if (ended) return; + emitUsage(res); + if (!sawToolCall) finish("stop"); + return; + } + + // ── Cold path: spawn the bridge and open a fresh ACP session ─────────── if (!isAbsolute(options.bridgePath) || !existsSync(options.bridgePath)) { failWith(`ACP bridge path invalid (must be an absolute, existing binary): ${options.bridgePath}`); return; } - child = spawn(options.bridgePath, [], { stdio: ["pipe", "pipe", "pipe"], cwd: options.cwd ?? process.cwd(), env: buildBridgeEnv(options.bridgeEnv) }); + child = spawn(options.bridgePath, [], { stdio: ["pipe", "pipe", "pipe"], cwd, env: buildBridgeEnv(options.bridgeEnv) }); registerProcess(child); getStderr = captureStderr(child); - child.on("error", (e) => failWith(`ACP bridge spawn failed: ${e.message}`)); - child.on("close", (code) => { if (!ended) failWith(`ACP bridge exited (code ${code ?? "?"})${getStderr ? `: ${getStderr().slice(-500)}` : ""}`); }); + // Stable router indirection: the long-lived connection + child handlers + // always dispatch to whichever turn currently owns the connection. On + // reuse we repoint `router.*` at the new turn; `router.fail` lets a + // warm-child death fail the CURRENT owner (not the cold turn it spawned). + const router: AcpRouter = { onUpdate: handleUpdate, onPermission: handlePermission, fail: failWith }; + child.on("error", (e) => router.fail?.(`ACP bridge spawn failed: ${e.message}`)); + child.on("close", (code) => { + const msg = `ACP bridge exited (code ${code ?? "?"})${getStderr ? `: ${getStderr().slice(-500)}` : ""}`; + const fail = router.fail; // capture before evict nulls it + if (reuseKey && cacheEntry) evictCachedAcpConn(reuseKey, cacheEntry); // a dead child can never be reused + fail?.(msg); // fail the owning turn (no-op if idle / already ended) + }); onAbort = () => failWith("aborted"); if (options.signal) options.signal.addEventListener("abort", onAbort, { once: true }); armInactivity(); @@ -355,10 +525,15 @@ export function streamViaAcp( Writable.toWeb(child.stdin!) as unknown as WritableStream, Readable.toWeb(child.stdout!) as unknown as ReadableStream, ); - const conn = new ClientSideConnection(() => clientHandler, acpStream); - - const withTimeout = (p: Promise, label: string) => - Promise.race([p, new Promise((_, rej) => setTimeout(() => rej(new Error(`ACP ${label} timeout`)), INITIALIZE_TIMEOUT_MS))]); + const conn = new ClientSideConnection( + () => ({ + sessionUpdate: (p) => router.onUpdate?.(p as Parameters>[0]) ?? Promise.resolve(), + requestPermission: (p) => + router.onPermission?.(p as Parameters>[0]) ?? + Promise.resolve({ outcome: { outcome: "cancelled" as const } }), + }), + acpStream, + ); const init = await withTimeout( conn.initialize({ protocolVersion: PROTOCOL_VERSION, clientCapabilities: { fs: { readTextFile: false, writeTextFile: false } } }), @@ -367,10 +542,17 @@ export function streamViaAcp( if (ended) return; if (init.protocolVersion !== PROTOCOL_VERSION) { failWith(`incompatible ACP protocol ${init.protocolVersion}`); return; } - const opened = await withTimeout(conn.newSession({ cwd: options.cwd ?? process.cwd(), mcpServers: options.mcpServers ?? [] }), "newSession"); + const opened = await withTimeout(conn.newSession({ cwd, mcpServers: options.mcpServers ?? [] }), "newSession"); if (ended) return; - const cwd = options.cwd ?? process.cwd(); + // Cache the warm connection so the next turn of this conversation reuses + // it. Only when reuse is enabled (reuseKey set) and the child is live. + if (reuseKey) { + myTurn = ++acpTurnCounter; + cacheEntry = { conn, child, acpSessionId: opened.sessionId, cwd, inUse: true, router, activeTurn: myTurn }; + acpSessionCache.set(reuseKey, cacheEntry); + } + const systemPrompt = buildSystemPrompt(context, cwd); const blocks = [ ...(systemPrompt ? [{ type: "text" as const, text: `${systemPrompt}\n\n` }] : []), @@ -379,30 +561,12 @@ export function streamViaAcp( // ACP ContentBlock[] — text/image shapes match; cast through unknown. const res = await conn.prompt({ sessionId: opened.sessionId, prompt: blocks as unknown as Parameters[0]["prompt"] }); + if (ended) return; // Feed token usage (experimental ACP field) into the bridge BEFORE finish() // so it lands in the `done` message. Tool-use turns break early and never // resolve here, so they inherently report zero usage. Zero-when-absent safe. - if (!sawToolCall) { - const u = (res as { usage?: Record }).usage; - // The bridge is untrusted (see BRIDGE_ENV_ALLOWLIST): coerce its usage - // payload to finite, non-negative numbers only so a malformed value - // (string/NaN/negative) can't corrupt totalTokens / cost downstream. - const num = (x: unknown): number | undefined => - typeof x === "number" && Number.isFinite(x) && x >= 0 ? x : undefined; - if (u) { - bridge.handleEvent({ - type: "message_delta", - delta: {}, - usage: { - input_tokens: num(u.inputTokens), - output_tokens: num(u.outputTokens), - cache_read_input_tokens: num(u.cachedReadTokens), - cache_creation_input_tokens: num(u.cachedWriteTokens), - }, - } as ClaudeApiEvent); - } - finish("stop"); - } + emitUsage(res); + if (!sawToolCall) finish("stop"); } catch (err) { failWith(err instanceof Error ? err.message : String(err)); } From 65c49585d1bc180924ad39411779711d2dc3efe7 Mon Sep 17 00:00:00 2001 From: gsxdsm Date: Mon, 15 Jun 2026 14:52:58 -0700 Subject: [PATCH 7/7] fix(review): address PR #1682 re-review (reuse concurrency + auth hardening) - P1 (Greptile): a tool-use break-early turn released the warm connection (inUse=false) while conn.prompt() was still pending, letting the next turn launch a concurrent prompt on the same ACP session (protocol corruption). keepWarm now requires !sawToolCall, so a tool-use turn tears the connection down like the non-reuse path; only a clean stop turn (prompt fully resolved before finish) keeps it warm. + test. - buildBridgeEnv: treat a whitespace-only auth var as absent (v.trim()), so a blank higher-preference token can't shadow a real lower-preference one and we never forward a useless blank token. + test. - Auth-forwarding tests: clear ambient auth vars in beforeEach so a runner-env token can't shadow the case under test (CodeRabbit). - Doc: clarify the allow-list never carries API keys by default; the single FUSION_CLAUDE_ACP_FORWARD_AUTH opt-in (default OFF) is the only exception. 348/348 pass, tsc clean. Co-Authored-By: Claude Opus 4.8 --- ...t-logged-in-thin-env-keychain-isolation.md | 2 + .../src/__tests__/acp-driver.test.ts | 41 +++++++++++++++++++ packages/pi-claude-cli/src/acp-driver.ts | 13 +++++- 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/docs/solutions/integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md b/docs/solutions/integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md index 475ccc457c..0f83b602fc 100644 --- a/docs/solutions/integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md +++ b/docs/solutions/integration-issues/acp-bridge-not-logged-in-thin-env-keychain-isolation.md @@ -64,6 +64,8 @@ function buildBridgeEnv(supplied?: NodeJS.ProcessEnv): NodeJS.ProcessEnv { The critical additions over a naive `{HOME, PATH}` env are **`XDG_CONFIG_HOME`, `XDG_CACHE_HOME`, `USER`, `SHELL`, `LANG`**. With the full list, auth succeeds immediately. +> The allow-list itself never carries API keys. The one exception is an **explicit operator opt-in**, `FUSION_CLAUDE_ACP_FORWARD_AUTH=1`, which forwards a single Claude auth token (`CLAUDE_CODE_OAUTH_TOKEN` > `ANTHROPIC_AUTH_TOKEN` > `ANTHROPIC_API_KEY`) for headless daemons that can't reach the login Keychain (gate R17). It is **OFF by default**, so the no-secrets posture above is the standing default — the opt-in only widens exposure when the operator deliberately enables it. + **2. The Keychain finding (gate R17).** Claude Code stores its OAuth credentials in the macOS **login Keychain** as a generic-password item (service `"Claude Code-credentials"`), *not* a file (`~/.claude/.credentials.json` is an empty directory). A detached/headless process runs in a **different security session** and cannot read the login Keychain, so it fails regardless of env; a login-session process (interactive terminal, or an `fn` daemon launched from a login shell) can. This is codified as gate **R17**: the provider's runtime must have login-Keychain access. The driver also detects a not-logged-in turn and writes a best-effort cross-process signal (`fusion-acp-bridge-auth.json`) that `GET /providers/claude-cli/status` reads, so the dashboard can raise an auth-failure banner with a "Use `claude -p`" fallback. ## Why This Works diff --git a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts index ee9e1a854d..8b2620e0e1 100644 --- a/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts +++ b/packages/pi-claude-cli/src/__tests__/acp-driver.test.ts @@ -281,6 +281,30 @@ describe("connection reuse (item 1) — gated by FUSION_CLAUDE_ACP_REUSE", () => expect(vi.mocked(spawn)).toHaveBeenCalledTimes(2); // turn 1 + the post-death cold restart }); + it("does NOT keep the connection warm after a tool-use (break-early) turn — prompt() still pending", async () => { + process.env.FUSION_CLAUDE_ACP_REUSE = "1"; + const reuseOpts = { ...OPTS, sessionId: "conv-tooluse" }; + + // Turn 1 (cold) breaks early on a pi-known tool — prompt() never resolves + // (the break happens mid-stream), so the connection must be torn down, not + // released warm, or turn 2 would launch a concurrent prompt on it. + const ctx1 = { messages: [{ role: "user", content: "hi" }, { role: "assistant", content: "hello" }] } as never; + scriptedUpdates = [ + { sessionUpdate: "tool_call", toolCallId: "t1", _meta: { claudeCode: { toolName: "mcp__custom-tools__fn_task_list" } }, rawInput: {} }, + ]; + const s1 = streamViaAcp(MODEL, ctx1, reuseOpts) as unknown as { _events: Array> }; + await flush(); + expect(s1._events.find((e) => e.type === "done")!.reason).toBe("toolUse"); + expect(vi.mocked(spawn)).toHaveBeenCalledTimes(1); + + // Turn 2 must cold-spawn a fresh bridge (no warm reuse after a tool turn). + scriptedUpdates = [{ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "after tool" } }]; + const ctx2 = { messages: [...(ctx1 as unknown as { messages: unknown[] }).messages, { role: "user", content: "again" }] } as never; + streamViaAcp(MODEL, ctx2, reuseOpts); + await flush(); + expect(vi.mocked(spawn)).toHaveBeenCalledTimes(2); // fresh spawn → no concurrent prompt on a warm conn + }); + it("cold-starts (no warm reuse) when the resume delta is empty (P1: no empty-prompt hang)", async () => { process.env.FUSION_CLAUDE_ACP_REUSE = "1"; const reuseOpts = { ...OPTS, sessionId: "conv-empty" }; @@ -324,6 +348,14 @@ describe("buildBridgeEnv — R17 auth opt-in (item 3)", () => { authTok: process.env.ANTHROPIC_AUTH_TOKEN, key: process.env.ANTHROPIC_API_KEY, }; + // Start each case from a clean slate so an ambient auth var in the runner's + // env can't shadow the token a test means to exercise (precedence is global). + beforeEach(() => { + delete process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH; + delete process.env.CLAUDE_CODE_OAUTH_TOKEN; + delete process.env.ANTHROPIC_AUTH_TOKEN; + delete process.env.ANTHROPIC_API_KEY; + }); afterEach(() => { for (const [k, v] of [ ["FUSION_CLAUDE_ACP_FORWARD_AUTH", saved.flag], @@ -370,6 +402,15 @@ describe("buildBridgeEnv — R17 auth opt-in (item 3)", () => { expect(env.ANTHROPIC_API_KEY).toBeUndefined(); }); + it("treats a whitespace-only higher-preference token as absent (no shadowing, no blank forward)", () => { + process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH = "1"; + process.env.CLAUDE_CODE_OAUTH_TOKEN = " "; // blank → must be skipped + process.env.ANTHROPIC_API_KEY = "sk-real"; + const env = buildBridgeEnv({ HOME: "/h", PATH: "/b" }); + expect(env.CLAUDE_CODE_OAUTH_TOKEN).toBeUndefined(); + expect(env.ANTHROPIC_API_KEY).toBe("sk-real"); // real lower-preference token wins + }); + it("reads the auth token from process.env, never a caller-supplied value (no token substitution)", () => { process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH = "1"; delete process.env.CLAUDE_CODE_OAUTH_TOKEN; diff --git a/packages/pi-claude-cli/src/acp-driver.ts b/packages/pi-claude-cli/src/acp-driver.ts index d034475dda..499b001a1d 100644 --- a/packages/pi-claude-cli/src/acp-driver.ts +++ b/packages/pi-claude-cli/src/acp-driver.ts @@ -148,7 +148,10 @@ export function buildBridgeEnv(supplied?: NodeJS.ProcessEnv): NodeJS.ProcessEnv if (process.env.FUSION_CLAUDE_ACP_FORWARD_AUTH === "1") { for (const key of BRIDGE_AUTH_ENV_KEYS) { const v = process.env[key]; - if (typeof v === "string" && v.length > 0) { + // Treat a whitespace-only value as absent, so a blank higher-preference + // var doesn't shadow a real lower-preference token (and we never forward + // a useless blank token). + if (typeof v === "string" && v.trim().length > 0) { env[key] = v; break; // forward only the highest-preference token that's present } @@ -290,8 +293,14 @@ export function streamViaAcp( if (inactivity) { clearTimeout(inactivity); inactivity = undefined; } if (onAbort && options.signal) options.signal.removeEventListener("abort", onAbort); const entry = cacheEntry; + // A tool-use turn breaks early while `conn.prompt()` is still pending (we + // never await it on break). Releasing the connection as warm here would + // let the next turn launch a SECOND concurrent prompt on the same ACP + // session — protocol corruption. So a tool-use turn always tears down, + // exactly like the non-reuse path; only a clean `stop` turn (prompt fully + // resolved before finish) keeps the connection warm. const keepWarm = - !destroy && entry !== undefined && reuseKey !== undefined && + !destroy && !sawToolCall && entry !== undefined && reuseKey !== undefined && acpSessionCache.get(reuseKey) === entry; if (keepWarm) { // Release the warm connection: drop this turn's handlers (so a late