From bb3031e85a2cd2b89c1f01012190212e50fad600 Mon Sep 17 00:00:00 2001 From: Peter Kirkham Date: Fri, 3 Apr 2026 12:49:20 +0100 Subject: [PATCH] feat: update codex adapter and migrate to base acp prxy for agent --- apps/code/scripts/download-binaries.mjs | 2 +- packages/agent/src/adapters/acp-connection.ts | 387 +++--------------- packages/agent/src/adapters/base-acp-agent.ts | 15 +- .../agent/src/adapters/codex/codex-agent.ts | 355 ++++++++++++++++ .../agent/src/adapters/codex/codex-client.ts | 151 +++++++ .../agent/src/adapters/codex/session-state.ts | 65 +++ packages/agent/src/adapters/codex/settings.ts | 127 ++++++ 7 files changed, 765 insertions(+), 337 deletions(-) create mode 100644 packages/agent/src/adapters/codex/codex-agent.ts create mode 100644 packages/agent/src/adapters/codex/codex-client.ts create mode 100644 packages/agent/src/adapters/codex/session-state.ts create mode 100644 packages/agent/src/adapters/codex/settings.ts diff --git a/apps/code/scripts/download-binaries.mjs b/apps/code/scripts/download-binaries.mjs index 1a694c958..a8e6d964d 100644 --- a/apps/code/scripts/download-binaries.mjs +++ b/apps/code/scripts/download-binaries.mjs @@ -19,7 +19,7 @@ const DEST_DIR = join(__dirname, "..", "resources", "codex-acp"); const BINARIES = [ { name: "codex-acp", - version: "0.9.5", + version: "0.11.1", getUrl: (version, target) => { const ext = target.includes("windows") ? "zip" : "tar.gz"; return `https://github.com/zed-industries/codex-acp/releases/download/v${version}/codex-acp-${version}-${target}.${ext}`; diff --git a/packages/agent/src/adapters/acp-connection.ts b/packages/agent/src/adapters/acp-connection.ts index 9567b033f..8bf446676 100644 --- a/packages/agent/src/adapters/acp-connection.ts +++ b/packages/agent/src/adapters/acp-connection.ts @@ -1,18 +1,15 @@ import { AgentSideConnection, ndJsonStream } from "@agentclientprotocol/sdk"; -import { POSTHOG_NOTIFICATIONS } from "../acp-extensions"; -import { formatModelId } from "../gateway-models"; import type { SessionLogWriter } from "../session-log-writer"; import type { ProcessSpawnedCallback } from "../types"; import { Logger } from "../utils/logger"; import { createBidirectionalStreams, createTappedWritableStream, - nodeReadableToWebReadable, - nodeWritableToWebWritable, type StreamPair, } from "../utils/streams"; import { ClaudeAcpAgent } from "./claude/claude-agent"; -import { type CodexProcessOptions, spawnCodexProcess } from "./codex/spawn"; +import { CodexAcpAgent } from "./codex/codex-agent"; +import type { CodexProcessOptions } from "./codex/spawn"; type AgentAdapter = "claude" | "codex"; @@ -37,108 +34,6 @@ export type AcpConnection = { export type InProcessAcpConnection = AcpConnection; -type ModelOption = { value?: string; name?: string }; -type ModelGroup = { group?: string; name?: string; options?: ModelOption[] }; - -type ConfigOption = { - id?: string; - category?: string | null; - currentValue?: string; - options?: Array; -}; - -function isGroupedOptions( - options: NonNullable, -): options is ModelGroup[] { - return options.length > 0 && "group" in options[0]; -} - -function formatOption(o: ModelOption): ModelOption { - if (!o.value) return o; - return { ...o, name: formatModelId(o.value) }; -} - -function filterModelConfigOptions( - msg: Record, - allowedModelIds: Set, -): Record | null { - const payload = msg as { - method?: string; - result?: { configOptions?: ConfigOption[] }; - params?: { - update?: { sessionUpdate?: string; configOptions?: ConfigOption[] }; - }; - }; - - const configOptions = - payload.result?.configOptions ?? payload.params?.update?.configOptions; - if (!configOptions) return null; - - const filtered = configOptions.map((opt) => { - if (opt.category !== "model" || !opt.options) return opt; - - const options = opt.options; - if (isGroupedOptions(options)) { - const filteredOptions = options.map((group) => ({ - ...group, - options: (group.options ?? []) - .filter((o) => o?.value && allowedModelIds.has(o.value)) - .map(formatOption), - })); - const flat = filteredOptions.flatMap((g) => g.options ?? []); - const currentAllowed = - opt.currentValue && allowedModelIds.has(opt.currentValue); - const nextCurrent = - currentAllowed || flat.length === 0 ? opt.currentValue : flat[0]?.value; - - return { - ...opt, - currentValue: nextCurrent, - options: filteredOptions, - }; - } - - const valueOptions = options as ModelOption[]; - const filteredOptions = valueOptions - .filter((o) => o?.value && allowedModelIds.has(o.value)) - .map(formatOption); - const currentAllowed = - opt.currentValue && allowedModelIds.has(opt.currentValue); - const nextCurrent = - currentAllowed || filteredOptions.length === 0 - ? opt.currentValue - : filteredOptions[0]?.value; - - return { - ...opt, - currentValue: nextCurrent, - options: filteredOptions, - }; - }); - - if (payload.result?.configOptions) { - return { ...msg, result: { ...payload.result, configOptions: filtered } }; - } - if (payload.params?.update?.configOptions) { - return { - ...msg, - params: { - ...payload.params, - update: { ...payload.params.update, configOptions: filtered }, - }, - }; - } - return null; -} - -function extractReasoningEffort( - configOptions: ConfigOption[] | undefined, -): string | undefined { - if (!configOptions) return undefined; - const option = configOptions.find((opt) => opt.id === "reasoning_effort"); - return option?.currentValue ?? undefined; -} - /** * Creates an ACP connection with the specified agent framework. * @@ -234,247 +129,51 @@ function createClaudeConnection(config: AcpConnectionConfig): AcpConnection { }; } +/** + * Creates an ACP connection to codex-acp via an in-process proxy agent. + * + * The CodexAcpAgent implements the ACP Agent interface and delegates to + * the codex-acp binary over a ClientSideConnection. This replaces the + * previous raw stream transform approach and gives us proper interception + * points for PostHog-specific features. + */ function createCodexConnection(config: AcpConnectionConfig): AcpConnection { const logger = config.logger?.child("CodexConnection") ?? new Logger({ debug: true, prefix: "[CodexConnection]" }); const { logWriter } = config; - const allowedModelIds = config.allowedModelIds; - - const codexProcess = spawnCodexProcess({ - ...config.codexOptions, - logger, - processCallbacks: config.processCallbacks, - }); - - let clientReadable = nodeReadableToWebReadable(codexProcess.stdout); - let clientWritable = nodeWritableToWebWritable(codexProcess.stdin); - - let isLoadingSession = false; - let loadRequestId: string | number | null = null; - let newSessionRequestId: string | number | null = null; - let sdkSessionEmitted = false; - const reasoningEffortBySessionId = new Map(); - let injectedConfigId = 0; - - const decoder = new TextDecoder(); - const encoder = new TextEncoder(); - let readBuffer = ""; - - const taskRunId = config.taskRunId; - - const filteringReadable = clientReadable.pipeThrough( - new TransformStream({ - transform(chunk, controller) { - readBuffer += decoder.decode(chunk, { stream: true }); - const lines = readBuffer.split("\n"); - readBuffer = lines.pop() ?? ""; - - const outputLines: string[] = []; - - for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed) { - outputLines.push(line); - continue; - } - - let shouldFilter = false; - try { - const msg = JSON.parse(trimmed); - const sessionId = - msg?.params?.sessionId ?? msg?.result?.sessionId ?? null; - const configOptions = - msg?.result?.configOptions ?? msg?.params?.update?.configOptions; - if (sessionId && configOptions) { - const effort = extractReasoningEffort(configOptions); - if (effort) { - reasoningEffortBySessionId.set(sessionId, effort); - } - } - - if ( - !sdkSessionEmitted && - newSessionRequestId !== null && - msg.id === newSessionRequestId && - "result" in msg - ) { - const sessionId = msg.result?.sessionId; - if (sessionId && taskRunId) { - const sdkSessionNotification = { - jsonrpc: "2.0", - method: POSTHOG_NOTIFICATIONS.SDK_SESSION, - params: { - taskRunId, - sessionId, - adapter: "codex", - }, - }; - outputLines.push(JSON.stringify(sdkSessionNotification)); - sdkSessionEmitted = true; - } - newSessionRequestId = null; - } - - if (isLoadingSession) { - if (msg.id === loadRequestId && "result" in msg) { - logger.debug("session/load complete, resuming stream"); - isLoadingSession = false; - loadRequestId = null; - } else if (msg.method === "session/update") { - shouldFilter = true; - } - } - - if (!shouldFilter && allowedModelIds && allowedModelIds.size > 0) { - const updated = filterModelConfigOptions(msg, allowedModelIds); - if (updated) { - outputLines.push(JSON.stringify(updated)); - continue; - } - } - } catch { - // Not valid JSON, pass through - } - - if (!shouldFilter) { - outputLines.push(line); - const isChunkNoise = - trimmed.includes('"sessionUpdate":"agent_message_chunk"') || - trimmed.includes('"sessionUpdate":"agent_thought_chunk"'); - if (!isChunkNoise) { - logger.debug("codex-acp stdout:", trimmed); - } - } - } - - if (outputLines.length > 0) { - const output = `${outputLines.join("\n")}\n`; - controller.enqueue(encoder.encode(output)); - } - }, - flush(controller) { - if (readBuffer.trim()) { - controller.enqueue(encoder.encode(readBuffer)); - } - }, - }), - ); - clientReadable = filteringReadable; - - const originalWritable = clientWritable; - clientWritable = new WritableStream({ - write(chunk) { - const text = decoder.decode(chunk, { stream: true }); - const trimmed = text.trim(); - logger.debug("codex-acp stdin:", trimmed); - - try { - const msg = JSON.parse(trimmed); - if ( - msg.method === "session/set_config_option" && - msg.params?.configId === "reasoning_effort" && - msg.params?.sessionId && - msg.params?.value - ) { - reasoningEffortBySessionId.set( - msg.params.sessionId, - msg.params.value, - ); - } - if (msg.method === "session/prompt" && msg.params?.sessionId) { - const effort = reasoningEffortBySessionId.get(msg.params.sessionId); - if (effort) { - const injection = { - jsonrpc: "2.0", - id: `reasoning_effort_${Date.now()}_${injectedConfigId++}`, - method: "session/set_config_option", - params: { - sessionId: msg.params.sessionId, - configId: "reasoning_effort", - value: effort, - }, - }; - const injectionLine = `${JSON.stringify(injection)}\n`; - const writer = originalWritable.getWriter(); - return writer - .write(encoder.encode(injectionLine)) - .then(() => writer.releaseLock()) - .then(() => { - const nextWriter = originalWritable.getWriter(); - return nextWriter - .write(chunk) - .finally(() => nextWriter.releaseLock()); - }); - } - } - if (msg.method === "session/new" && msg.id) { - logger.debug("session/new detected, tracking request ID"); - newSessionRequestId = msg.id; - } else if (msg.method === "session/load" && msg.id) { - logger.debug("session/load detected, pausing stream updates"); - isLoadingSession = true; - loadRequestId = msg.id; - } - } catch { - // Not valid JSON - } - - const writer = originalWritable.getWriter(); - return writer.write(chunk).finally(() => writer.releaseLock()); - }, - close() { - const writer = originalWritable.getWriter(); - return writer.close().finally(() => writer.releaseLock()); - }, - }); + // Create bidirectional streams for client ↔ agent communication + const streams = createBidirectionalStreams(); - const shouldTapLogs = config.taskRunId && logWriter; + let agentWritable = streams.agent.writable; + let clientWritable = streams.client.writable; - if (shouldTapLogs && config.taskRunId) { - const taskRunId = config.taskRunId; - if (!logWriter.isRegistered(taskRunId)) { - logWriter.register(taskRunId, { - taskId: config.taskId ?? taskRunId, - runId: taskRunId, + // Tap streams for session log writing + if (config.taskRunId && logWriter) { + if (!logWriter.isRegistered(config.taskRunId)) { + logWriter.register(config.taskRunId, { + taskId: config.taskId ?? config.taskRunId, + runId: config.taskRunId, + deviceType: config.deviceType, }); } - clientWritable = createTappedWritableStream(clientWritable, { + const taskRunId = config.taskRunId; + agentWritable = createTappedWritableStream(streams.agent.writable, { onMessage: (line) => { logWriter.appendRawLine(taskRunId, line); }, logger, }); - const originalReadable = clientReadable; - const logDecoder = new TextDecoder(); - let logBuffer = ""; - - clientReadable = originalReadable.pipeThrough( - new TransformStream({ - transform(chunk, controller) { - logBuffer += logDecoder.decode(chunk, { stream: true }); - const lines = logBuffer.split("\n"); - logBuffer = lines.pop() ?? ""; - - for (const line of lines) { - if (line.trim()) { - logWriter.appendRawLine(taskRunId, line); - } - } - - controller.enqueue(chunk); - }, - flush() { - if (logBuffer.trim()) { - logWriter.appendRawLine(taskRunId, logBuffer); - } - }, - }), - ); + clientWritable = createTappedWritableStream(streams.client.writable, { + onMessage: (line) => { + logWriter.appendRawLine(taskRunId, line); + }, + logger, + }); } else { logger.info("Tapped streams NOT enabled for Codex", { hasTaskRunId: !!config.taskRunId, @@ -482,18 +181,38 @@ function createCodexConnection(config: AcpConnectionConfig): AcpConnection { }); } + const agentStream = ndJsonStream(agentWritable, streams.agent.readable); + + let agent: CodexAcpAgent | null = null; + const agentConnection = new AgentSideConnection((client) => { + agent = new CodexAcpAgent(client, { + codexProcessOptions: config.codexOptions ?? {}, + processCallbacks: config.processCallbacks, + }); + logger.info(`Created ${agent.adapterName} agent`); + return agent; + }, agentStream); + return { - agentConnection: undefined, + agentConnection, clientStreams: { - readable: clientReadable, + readable: streams.client.readable, writable: clientWritable, }, cleanup: async () => { logger.info("Cleaning up Codex connection"); - codexProcess.kill(); + + if (agent) { + await agent.closeSession(); + } try { - await clientWritable.close(); + await streams.client.writable.close(); + } catch { + // Stream may already be closed + } + try { + await streams.agent.writable.close(); } catch { // Stream may already be closed } diff --git a/packages/agent/src/adapters/base-acp-agent.ts b/packages/agent/src/adapters/base-acp-agent.ts index 792efe416..3ef7417d7 100644 --- a/packages/agent/src/adapters/base-acp-agent.ts +++ b/packages/agent/src/adapters/base-acp-agent.ts @@ -24,14 +24,25 @@ import { isAnthropicModel, } from "../gateway-models"; import { Logger } from "../utils/logger"; -import type { SettingsManager } from "./claude/session/settings"; +/** + * Shared settings manager interface that both Claude's SettingsManager + * and Codex's CodexSettingsManager implement. BaseAcpAgent only calls + * dispose() on this; each adapter's Session type narrows it to the + * concrete implementation. + */ +export interface BaseSettingsManager { + dispose(): void; + getCwd(): string; + setCwd(cwd: string): Promise; + initialize(): Promise; +} export interface BaseSession { notificationHistory: SessionNotification[]; cancelled: boolean; interruptReason?: string; abortController: AbortController; - settingsManager: SettingsManager; + settingsManager: BaseSettingsManager; } const DEFAULT_CONTEXT_WINDOW = 200_000; diff --git a/packages/agent/src/adapters/codex/codex-agent.ts b/packages/agent/src/adapters/codex/codex-agent.ts new file mode 100644 index 000000000..b939e52d3 --- /dev/null +++ b/packages/agent/src/adapters/codex/codex-agent.ts @@ -0,0 +1,355 @@ +/** + * In-process ACP proxy agent for Codex. + * + * Implements the ACP Agent interface and delegates to the codex-acp binary + * via a ClientSideConnection. This gives us interception points for: + * - PostHog-specific notifications (sdk_session, usage_update, turn_complete) + * - Session resume/fork (not natively supported by codex-acp) + * - Usage accumulation + * - System prompt injection + */ + +import { + type AgentSideConnection, + type AuthenticateRequest, + type CancelNotification, + ClientSideConnection, + type ForkSessionRequest, + type ForkSessionResponse, + type InitializeRequest, + type InitializeResponse, + type ListSessionsRequest, + type ListSessionsResponse, + type LoadSessionRequest, + type LoadSessionResponse, + type NewSessionRequest, + type NewSessionResponse, + ndJsonStream, + type PromptRequest, + type PromptResponse, + type ResumeSessionRequest, + type ResumeSessionResponse, + type SetSessionConfigOptionRequest, + type SetSessionConfigOptionResponse, + type SetSessionModeRequest, + type SetSessionModeResponse, +} from "@agentclientprotocol/sdk"; +import packageJson from "../../../package.json" with { type: "json" }; +import { POSTHOG_NOTIFICATIONS } from "../../acp-extensions"; +import type { ProcessSpawnedCallback } from "../../types"; +import { Logger } from "../../utils/logger"; +import { + nodeReadableToWebReadable, + nodeWritableToWebWritable, +} from "../../utils/streams"; +import { BaseAcpAgent, type BaseSession } from "../base-acp-agent"; +import { createCodexClient } from "./codex-client"; +import { + type CodexSessionState, + createSessionState, + resetUsage, +} from "./session-state"; +import { CodexSettingsManager } from "./settings"; +import { + type CodexProcess, + type CodexProcessOptions, + spawnCodexProcess, +} from "./spawn"; + +interface NewSessionMeta { + taskRunId?: string; + taskId?: string; + systemPrompt?: string; + permissionMode?: string; + model?: string; + persistence?: { taskId?: string; runId?: string; logUrl?: string }; + claudeCode?: { + options?: Record; + }; + additionalRoots?: string[]; + disableBuiltInTools?: boolean; + allowedDomains?: string[]; +} + +export interface CodexAcpAgentOptions { + codexProcessOptions: CodexProcessOptions; + processCallbacks?: ProcessSpawnedCallback; +} + +type CodexSession = BaseSession & { + settingsManager: CodexSettingsManager; +}; + +export class CodexAcpAgent extends BaseAcpAgent { + readonly adapterName = "codex"; + declare session: CodexSession; + private codexProcess: CodexProcess; + private codexConnection!: ClientSideConnection; + private sessionState!: CodexSessionState; + + constructor(client: AgentSideConnection, options: CodexAcpAgentOptions) { + super(client); + this.logger = new Logger({ debug: true, prefix: "[CodexAcpAgent]" }); + + // Spawn the codex-acp subprocess + this.codexProcess = spawnCodexProcess({ + ...options.codexProcessOptions, + logger: this.logger, + processCallbacks: options.processCallbacks, + }); + + // Create ACP connection to codex-acp over stdin/stdout + const codexReadable = nodeReadableToWebReadable(this.codexProcess.stdout); + const codexWritable = nodeWritableToWebWritable(this.codexProcess.stdin); + const codexStream = ndJsonStream(codexWritable, codexReadable); + + // Set up session with CodexSettingsManager + const cwd = options.codexProcessOptions.cwd ?? process.cwd(); + const settingsManager = new CodexSettingsManager(cwd); + const abortController = new AbortController(); + this.session = { + abortController, + settingsManager, + notificationHistory: [], + cancelled: false, + }; + + // Create the ClientSideConnection to codex-acp. + // The Client handler delegates all requests from codex-acp to the upstream + // PostHog Code client via our AgentSideConnection. + this.codexConnection = new ClientSideConnection( + (_agent) => + createCodexClient( + this.client, + this.logger, + this.sessionState ?? { + sessionId: "", + cwd: "", + modeId: "default", + configOptions: [], + accumulatedUsage: { + inputTokens: 0, + outputTokens: 0, + cachedReadTokens: 0, + cachedWriteTokens: 0, + }, + cancelled: false, + }, + ), + codexStream, + ); + } + + async initialize(request: InitializeRequest): Promise { + // Initialize settings + await this.session.settingsManager.initialize(); + + // Forward to codex-acp + const response = await this.codexConnection.initialize(request); + + // Merge our enhanced capabilities + return { + ...response, + agentCapabilities: { + ...response.agentCapabilities, + sessionCapabilities: { + ...response.agentCapabilities?.sessionCapabilities, + resume: {}, + fork: {}, + }, + _meta: { + posthog: { + resumeSession: true, + }, + }, + }, + agentInfo: { + name: packageJson.name, + title: "Codex Agent", + version: packageJson.version, + }, + }; + } + + async newSession(params: NewSessionRequest): Promise { + const meta = params._meta as NewSessionMeta | undefined; + + const response = await this.codexConnection.newSession(params); + + // Initialize session state + this.sessionState = createSessionState(response.sessionId, params.cwd, { + taskRunId: meta?.taskRunId, + taskId: meta?.taskId ?? meta?.persistence?.taskId, + modeId: response.modes?.currentModeId ?? "default", + modelId: response.models?.currentModelId, + }); + this.sessionId = response.sessionId; + this.sessionState.configOptions = response.configOptions ?? []; + + // Emit _posthog/sdk_session so the app can track the session + if (meta?.taskRunId) { + await this.client.extNotification(POSTHOG_NOTIFICATIONS.SDK_SESSION, { + taskRunId: meta.taskRunId, + sessionId: response.sessionId, + adapter: "codex", + }); + } + + this.logger.info("Codex session created", { + sessionId: response.sessionId, + taskRunId: meta?.taskRunId, + }); + + return response; + } + + async loadSession(params: LoadSessionRequest): Promise { + const response = await this.codexConnection.loadSession(params); + + // Update session state + this.sessionState = createSessionState(params.sessionId, params.cwd); + this.sessionId = params.sessionId; + this.sessionState.configOptions = response.configOptions ?? []; + + return response; + } + + async unstable_resumeSession( + params: ResumeSessionRequest, + ): Promise { + // codex-acp doesn't support resume natively, use loadSession instead + const loadResponse = await this.codexConnection.loadSession({ + sessionId: params.sessionId, + cwd: params.cwd, + mcpServers: params.mcpServers ?? [], + }); + + this.sessionState = createSessionState(params.sessionId, params.cwd); + this.sessionId = params.sessionId; + this.sessionState.configOptions = loadResponse.configOptions ?? []; + + const meta = params._meta as NewSessionMeta | undefined; + if (meta?.taskRunId) { + await this.client.extNotification(POSTHOG_NOTIFICATIONS.SDK_SESSION, { + taskRunId: meta.taskRunId, + sessionId: params.sessionId, + adapter: "codex", + }); + } + + return { + modes: loadResponse.modes, + models: loadResponse.models, + configOptions: loadResponse.configOptions, + }; + } + + async unstable_forkSession( + params: ForkSessionRequest, + ): Promise { + // Create a new session via codex-acp (fork isn't natively supported) + const newResponse = await this.codexConnection.newSession({ + cwd: params.cwd, + mcpServers: params.mcpServers ?? [], + _meta: params._meta, + }); + + this.sessionState = createSessionState(newResponse.sessionId, params.cwd); + this.sessionId = newResponse.sessionId; + this.sessionState.configOptions = newResponse.configOptions ?? []; + + return newResponse; + } + + async listSessions( + params: ListSessionsRequest, + ): Promise { + return this.codexConnection.listSessions(params); + } + + async unstable_listSessions( + params: ListSessionsRequest, + ): Promise { + return this.codexConnection.listSessions(params); + } + + async prompt(params: PromptRequest): Promise { + if (this.sessionState) { + this.sessionState.cancelled = false; + this.sessionState.interruptReason = undefined; + resetUsage(this.sessionState); + } + + const response = await this.codexConnection.prompt(params); + + // Emit PostHog usage notification + if (this.sessionState?.taskRunId && response.usage) { + await this.client.extNotification("_posthog/usage_update", { + sessionId: params.sessionId, + used: { + inputTokens: response.usage.inputTokens ?? 0, + outputTokens: response.usage.outputTokens ?? 0, + cachedReadTokens: response.usage.cachedReadTokens ?? 0, + cachedWriteTokens: response.usage.cachedWriteTokens ?? 0, + }, + cost: null, + }); + } + + return response; + } + + protected async interrupt(): Promise { + if (this.sessionState) { + this.sessionState.cancelled = true; + } + await this.codexConnection.cancel({ + sessionId: this.sessionId, + }); + } + + async cancel(params: CancelNotification): Promise { + if (this.sessionState) { + this.sessionState.cancelled = true; + const meta = params._meta as { interruptReason?: string } | undefined; + if (meta?.interruptReason) { + this.sessionState.interruptReason = meta.interruptReason; + } + } + await this.codexConnection.cancel(params); + } + + async setSessionMode( + params: SetSessionModeRequest, + ): Promise { + const response = await this.codexConnection.setSessionMode(params); + if (this.sessionState) { + this.sessionState.modeId = params.modeId; + } + return response ?? {}; + } + + async setSessionConfigOption( + params: SetSessionConfigOptionRequest, + ): Promise { + const response = await this.codexConnection.setSessionConfigOption(params); + if (this.sessionState && response.configOptions) { + this.sessionState.configOptions = response.configOptions; + } + return response; + } + + async authenticate(_params: AuthenticateRequest): Promise { + // Auth handled externally + } + + async closeSession(): Promise { + this.logger.info("Closing Codex session", { sessionId: this.sessionId }); + this.session.settingsManager.dispose(); + try { + this.codexProcess.kill(); + } catch (err) { + this.logger.warn("Failed to kill codex-acp process", { error: err }); + } + } +} diff --git a/packages/agent/src/adapters/codex/codex-client.ts b/packages/agent/src/adapters/codex/codex-client.ts new file mode 100644 index 000000000..f2966fe89 --- /dev/null +++ b/packages/agent/src/adapters/codex/codex-client.ts @@ -0,0 +1,151 @@ +/** + * ACP Client implementation for communicating with codex-acp subprocess. + * + * This acts as the "client" from codex-acp's perspective: it receives + * permission requests, session updates, file I/O, and terminal operations + * from codex-acp and delegates them to the upstream PostHog Code client. + */ + +import type { + AgentSideConnection, + Client, + CreateTerminalRequest, + CreateTerminalResponse, + KillTerminalRequest, + KillTerminalResponse, + ReadTextFileRequest, + ReadTextFileResponse, + ReleaseTerminalRequest, + ReleaseTerminalResponse, + RequestPermissionRequest, + RequestPermissionResponse, + SessionNotification, + TerminalHandle, + TerminalOutputRequest, + TerminalOutputResponse, + WaitForTerminalExitRequest, + WaitForTerminalExitResponse, + WriteTextFileRequest, + WriteTextFileResponse, +} from "@agentclientprotocol/sdk"; +import type { Logger } from "../../utils/logger"; +import type { CodexSessionState } from "./session-state"; + +export interface CodexClientCallbacks { + /** Called when a usage_update session notification is received */ + onUsageUpdate?: (update: Record) => void; +} + +/** + * Creates an ACP Client that delegates all requests from codex-acp + * to the upstream PostHog Code client (via AgentSideConnection). + */ +export function createCodexClient( + upstreamClient: AgentSideConnection, + logger: Logger, + sessionState: CodexSessionState, + callbacks?: CodexClientCallbacks, +): Client { + // Track terminal handles for delegation + const terminalHandles = new Map(); + + return { + async requestPermission( + params: RequestPermissionRequest, + ): Promise { + logger.debug("Relaying permission request to upstream", { + sessionId: params.sessionId, + }); + return upstreamClient.requestPermission(params); + }, + + async sessionUpdate(params: SessionNotification): Promise { + // Parse usage data from session updates + const update = params.update as Record | undefined; + if (update?.sessionUpdate === "usage_update") { + const used = update.used as number | undefined; + const size = update.size as number | undefined; + if (used !== undefined) sessionState.contextUsed = used; + if (size !== undefined) sessionState.contextSize = size; + callbacks?.onUsageUpdate?.(update); + } + + // Forward to upstream client + await upstreamClient.sessionUpdate(params); + }, + + async readTextFile( + params: ReadTextFileRequest, + ): Promise { + return upstreamClient.readTextFile(params); + }, + + async writeTextFile( + params: WriteTextFileRequest, + ): Promise { + return upstreamClient.writeTextFile(params); + }, + + async createTerminal( + params: CreateTerminalRequest, + ): Promise { + const handle = await upstreamClient.createTerminal(params); + terminalHandles.set(handle.id, handle); + return { terminalId: handle.id }; + }, + + async terminalOutput( + params: TerminalOutputRequest, + ): Promise { + const handle = terminalHandles.get(params.terminalId); + if (!handle) { + return { output: "", truncated: false }; + } + return handle.currentOutput(); + }, + + async releaseTerminal( + params: ReleaseTerminalRequest, + ): Promise { + const handle = terminalHandles.get(params.terminalId); + if (handle) { + terminalHandles.delete(params.terminalId); + const result = await handle.release(); + return result ?? undefined; + } + }, + + async waitForTerminalExit( + params: WaitForTerminalExitRequest, + ): Promise { + const handle = terminalHandles.get(params.terminalId); + if (!handle) { + return { exitCode: 1 }; + } + return handle.waitForExit(); + }, + + async killTerminal( + params: KillTerminalRequest, + ): Promise { + const handle = terminalHandles.get(params.terminalId); + if (handle) { + return handle.kill(); + } + }, + + async extMethod( + method: string, + params: Record, + ): Promise> { + return upstreamClient.extMethod(method, params); + }, + + async extNotification( + method: string, + params: Record, + ): Promise { + return upstreamClient.extNotification(method, params); + }, + }; +} diff --git a/packages/agent/src/adapters/codex/session-state.ts b/packages/agent/src/adapters/codex/session-state.ts new file mode 100644 index 000000000..a1555df23 --- /dev/null +++ b/packages/agent/src/adapters/codex/session-state.ts @@ -0,0 +1,65 @@ +/** + * Session state tracking for Codex proxy agent. + * Tracks usage accumulation, model/mode state, and config options. + */ + +import type { SessionConfigOption } from "@agentclientprotocol/sdk"; + +export interface CodexUsage { + inputTokens: number; + outputTokens: number; + cachedReadTokens: number; + cachedWriteTokens: number; +} + +export interface CodexSessionState { + sessionId: string; + cwd: string; + modelId?: string; + modeId: string; + configOptions: SessionConfigOption[]; + accumulatedUsage: CodexUsage; + contextSize?: number; + contextUsed?: number; + cancelled: boolean; + interruptReason?: string; + taskRunId?: string; + taskId?: string; +} + +export function createSessionState( + sessionId: string, + cwd: string, + opts?: { + taskRunId?: string; + taskId?: string; + modeId?: string; + modelId?: string; + }, +): CodexSessionState { + return { + sessionId, + cwd, + modeId: opts?.modeId ?? "default", + modelId: opts?.modelId, + configOptions: [], + accumulatedUsage: { + inputTokens: 0, + outputTokens: 0, + cachedReadTokens: 0, + cachedWriteTokens: 0, + }, + cancelled: false, + taskRunId: opts?.taskRunId, + taskId: opts?.taskId, + }; +} + +export function resetUsage(state: CodexSessionState): void { + state.accumulatedUsage = { + inputTokens: 0, + outputTokens: 0, + cachedReadTokens: 0, + cachedWriteTokens: 0, + }; +} diff --git a/packages/agent/src/adapters/codex/settings.ts b/packages/agent/src/adapters/codex/settings.ts new file mode 100644 index 000000000..7493df02e --- /dev/null +++ b/packages/agent/src/adapters/codex/settings.ts @@ -0,0 +1,127 @@ +import * as fs from "node:fs"; +import * as os from "node:os"; +import * as path from "node:path"; + +/** + * Codex settings parsed from ~/.codex/config.toml and project-level config. + * + * Mirrors the shape of ClaudeCodeSettings so both adapters have a + * consistent settings interface. + */ +export interface CodexSettings { + model?: string; + personality?: string; + modelReasoningEffort?: string; + trustLevel?: string; +} + +/** + * SettingsManager for Codex sessions. + * + * Reads from ~/.codex/config.toml (user-level) and respects + * per-project trust configuration. Has the same public interface + * as Claude's SettingsManager so both can satisfy BaseSession. + */ +export class CodexSettingsManager { + private cwd: string; + private settings: CodexSettings = {}; + private initialized = false; + + constructor(cwd: string) { + this.cwd = cwd; + } + + async initialize(): Promise { + if (this.initialized) { + return; + } + await this.loadSettings(); + this.initialized = true; + } + + private getConfigPath(): string { + return path.join(os.homedir(), ".codex", "config.toml"); + } + + private async loadSettings(): Promise { + const configPath = this.getConfigPath(); + try { + const content = await fs.promises.readFile(configPath, "utf-8"); + this.settings = parseCodexToml(content, this.cwd); + } catch { + this.settings = {}; + } + } + + getSettings(): CodexSettings { + return this.settings; + } + + getCwd(): string { + return this.cwd; + } + + async setCwd(cwd: string): Promise { + if (this.cwd === cwd) { + return; + } + this.dispose(); + this.cwd = cwd; + this.initialized = false; + await this.initialize(); + } + + dispose(): void { + this.initialized = false; + } +} + +/** + * Minimal TOML parser for codex config.toml. + * Handles flat key=value pairs and [projects."path"] sections. + * Does NOT handle full TOML spec — only what codex config uses. + */ +function parseCodexToml(content: string, cwd: string): CodexSettings { + const settings: CodexSettings = {}; + let currentSection = ""; + + for (const line of content.split("\n")) { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith("#")) continue; + + // Section header: [projects."/some/path"] or [section] + const sectionMatch = trimmed.match(/^\[(.+)\]$/); + if (sectionMatch) { + currentSection = sectionMatch[1] ?? ""; + continue; + } + + // Key = value + const kvMatch = trimmed.match(/^(\w+)\s*=\s*(.+)$/); + if (!kvMatch) continue; + + const key = kvMatch[1]; + let value = kvMatch[2]?.trim() ?? ""; + + // Strip quotes + if ( + (value.startsWith('"') && value.endsWith('"')) || + (value.startsWith("'") && value.endsWith("'")) + ) { + value = value.slice(1, -1); + } + + if (!currentSection) { + // Top-level keys + if (key === "model") settings.model = value; + if (key === "personality") settings.personality = value; + if (key === "model_reasoning_effort") + settings.modelReasoningEffort = value; + } else if (currentSection === `projects."${cwd}"`) { + // Project-specific keys + if (key === "trust_level") settings.trustLevel = value; + } + } + + return settings; +}