Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/agent/src/acp-extensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ export const POSTHOG_NOTIFICATIONS = {

/** Marks a boundary for log compaction */
COMPACT_BOUNDARY: "_posthog/compact_boundary",

/** Token usage update for a session turn */
USAGE_UPDATE: "_posthog/usage_update",
} as const;
22 changes: 13 additions & 9 deletions packages/agent/src/adapters/claude/claude-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
} from "@anthropic-ai/claude-agent-sdk";
import { v7 as uuidv7 } from "uuid";
import packageJson from "../../../package.json" with { type: "json" };
import { POSTHOG_NOTIFICATIONS } from "../../acp-extensions";
import { unreachable, withTimeout } from "../../utils/common";
import { Logger } from "../../utils/logger";
import { Pushable } from "../../utils/streams";
Expand Down Expand Up @@ -442,16 +443,19 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
});
}

await this.client.extNotification("_posthog/usage_update", {
sessionId: params.sessionId,
used: {
inputTokens: message.usage.input_tokens,
outputTokens: message.usage.output_tokens,
cachedReadTokens: message.usage.cache_read_input_tokens,
cachedWriteTokens: message.usage.cache_creation_input_tokens,
await this.client.extNotification(
POSTHOG_NOTIFICATIONS.USAGE_UPDATE,
{
sessionId: params.sessionId,
used: {
inputTokens: message.usage.input_tokens,
outputTokens: message.usage.output_tokens,
cachedReadTokens: message.usage.cache_read_input_tokens,
cachedWriteTokens: message.usage.cache_creation_input_tokens,
},
cost: message.total_cost_usd,
},
cost: message.total_cost_usd,
});
);

const usage: Usage = {
inputTokens: this.session.accumulatedUsage.inputTokens,
Expand Down
97 changes: 34 additions & 63 deletions packages/agent/src/adapters/codex/codex-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import {
type AgentSideConnection,
type AuthenticateRequest,
type CancelNotification,
ClientSideConnection,
type ForkSessionRequest,
type ForkSessionResponse,
Expand Down Expand Up @@ -102,8 +101,8 @@ export class CodexAcpAgent extends BaseAcpAgent {
readonly adapterName = "codex";
declare session: CodexSession;
private codexProcess: CodexProcess;
private codexConnection!: ClientSideConnection;
private sessionState!: CodexSessionState;
private codexConnection: ClientSideConnection;
private sessionState: CodexSessionState;

constructor(client: AgentSideConnection, options: CodexAcpAgentOptions) {
super(client);
Expand Down Expand Up @@ -132,28 +131,14 @@ export class CodexAcpAgent extends BaseAcpAgent {
cancelled: false,
};

this.sessionState = createSessionState("", cwd);

// Create the ClientSideConnection to codex-acp.
// The Client handler delegates all requests from codex-acp to the upstream
// PostHog Code client via our AgentSideConnection.
this.codexConnection = new ClientSideConnection(
(_agent) =>
createCodexClient(
this.client,
this.logger,
this.sessionState ?? {
sessionId: "",
cwd: "",
modeId: "default",
configOptions: [],
accumulatedUsage: {
inputTokens: 0,
outputTokens: 0,
cachedReadTokens: 0,
cachedWriteTokens: 0,
},
cancelled: false,
},
),
createCodexClient(this.client, this.logger, this.sessionState),
codexStream,
);
}
Expand Down Expand Up @@ -224,9 +209,11 @@ export class CodexAcpAgent extends BaseAcpAgent {

async loadSession(params: LoadSessionRequest): Promise<LoadSessionResponse> {
const response = await this.codexConnection.loadSession(params);
const meta = params._meta as NewSessionMeta | undefined;

// Update session state
this.sessionState = createSessionState(params.sessionId, params.cwd);
this.sessionState = createSessionState(params.sessionId, params.cwd, {
permissionMode: toCodeExecutionMode(meta?.permissionMode),
});
this.sessionId = params.sessionId;
this.sessionState.configOptions = response.configOptions ?? [];

Expand All @@ -243,11 +230,15 @@ export class CodexAcpAgent extends BaseAcpAgent {
mcpServers: params.mcpServers ?? [],
});

this.sessionState = createSessionState(params.sessionId, params.cwd);
const meta = params._meta as NewSessionMeta | undefined;
this.sessionState = createSessionState(params.sessionId, params.cwd, {
taskRunId: meta?.taskRunId,
taskId: meta?.taskId ?? meta?.persistence?.taskId,
permissionMode: toCodeExecutionMode(meta?.permissionMode),
});
this.sessionId = params.sessionId;
this.sessionState.configOptions = loadResponse.configOptions ?? [];

const meta = params._meta as NewSessionMeta | undefined;
if (meta?.taskRunId) {
await this.client.extNotification(POSTHOG_NOTIFICATIONS.SDK_SESSION, {
taskRunId: meta.taskRunId,
Expand All @@ -273,7 +264,12 @@ export class CodexAcpAgent extends BaseAcpAgent {
_meta: params._meta,
});

this.sessionState = createSessionState(newResponse.sessionId, params.cwd);
const meta = params._meta as NewSessionMeta | undefined;
this.sessionState = createSessionState(newResponse.sessionId, params.cwd, {
taskRunId: meta?.taskRunId,
taskId: meta?.taskId ?? meta?.persistence?.taskId,
permissionMode: toCodeExecutionMode(meta?.permissionMode),
});
this.sessionId = newResponse.sessionId;
this.sessionState.configOptions = newResponse.configOptions ?? [];

Expand All @@ -289,31 +285,21 @@ export class CodexAcpAgent extends BaseAcpAgent {
async unstable_listSessions(
params: ListSessionsRequest,
): Promise<ListSessionsResponse> {
return this.codexConnection.listSessions(params);
return this.listSessions(params);
}

async prompt(params: PromptRequest): Promise<PromptResponse> {
if (this.sessionState) {
this.sessionState.cancelled = false;
this.sessionState.interruptReason = undefined;
resetUsage(this.sessionState);
}
this.session.cancelled = false;
this.session.interruptReason = undefined;
resetUsage(this.sessionState);

const response = await this.codexConnection.prompt(params);

if (this.sessionState && response.usage) {
// Accumulate token usage from the prompt response
this.sessionState.accumulatedUsage.inputTokens +=
response.usage.inputTokens ?? 0;
this.sessionState.accumulatedUsage.outputTokens +=
response.usage.outputTokens ?? 0;
this.sessionState.accumulatedUsage.cachedReadTokens +=
response.usage.cachedReadTokens ?? 0;
this.sessionState.accumulatedUsage.cachedWriteTokens +=
response.usage.cachedWriteTokens ?? 0;
}
// Usage is already accumulated via sessionUpdate notifications in
// codex-client.ts. Do NOT also add response.usage here or tokens
// get double-counted.

if (this.sessionState?.taskRunId) {
if (this.sessionState.taskRunId) {
const { accumulatedUsage } = this.sessionState;

await this.client.extNotification(POSTHOG_NOTIFICATIONS.TURN_COMPLETE, {
Expand All @@ -333,7 +319,7 @@ export class CodexAcpAgent extends BaseAcpAgent {
});

if (response.usage) {
await this.client.extNotification("_posthog/usage_update", {
await this.client.extNotification(POSTHOG_NOTIFICATIONS.USAGE_UPDATE, {
sessionId: params.sessionId,
used: {
inputTokens: response.usage.inputTokens ?? 0,
Expand All @@ -350,25 +336,11 @@ export class CodexAcpAgent extends BaseAcpAgent {
}

protected async interrupt(): Promise<void> {
if (this.sessionState) {
this.sessionState.cancelled = true;
}
await this.codexConnection.cancel({
sessionId: this.sessionId,
});
}

async cancel(params: CancelNotification): Promise<void> {
if (this.sessionState) {
this.sessionState.cancelled = true;
const meta = params._meta as { interruptReason?: string } | undefined;
if (meta?.interruptReason) {
this.sessionState.interruptReason = meta.interruptReason;
}
}
await this.codexConnection.cancel(params);
}

async setSessionMode(
params: SetSessionModeRequest,
): Promise<SetSessionModeResponse> {
Expand All @@ -380,18 +352,16 @@ export class CodexAcpAgent extends BaseAcpAgent {
modeId: nativeMode,
});

if (this.sessionState) {
this.sessionState.modeId = nativeMode;
this.sessionState.permissionMode = requestedMode;
}
this.sessionState.modeId = nativeMode;
this.sessionState.permissionMode = requestedMode;
return response ?? {};
}

async setSessionConfigOption(
params: SetSessionConfigOptionRequest,
): Promise<SetSessionConfigOptionResponse> {
const response = await this.codexConnection.setSessionConfigOption(params);
if (this.sessionState && response.configOptions) {
if (response.configOptions) {
this.sessionState.configOptions = response.configOptions;
}
return response;
Expand All @@ -403,6 +373,7 @@ export class CodexAcpAgent extends BaseAcpAgent {

async closeSession(): Promise<void> {
this.logger.info("Closing Codex session", { sessionId: this.sessionId });
this.session.abortController.abort();
this.session.settingsManager.dispose();
try {
this.codexProcess.kill();
Expand Down
3 changes: 0 additions & 3 deletions packages/agent/src/adapters/codex/session-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ export interface CodexSessionState {
contextSize?: number;
contextUsed?: number;
permissionMode: CodeExecutionMode;
cancelled: boolean;
interruptReason?: string;
taskRunId?: string;
taskId?: string;
}
Expand Down Expand Up @@ -53,7 +51,6 @@ export function createSessionState(
cachedWriteTokens: 0,
},
permissionMode: opts?.permissionMode ?? "default",
cancelled: false,
taskRunId: opts?.taskRunId,
taskId: opts?.taskId,
};
Expand Down
2 changes: 2 additions & 0 deletions packages/agent/src/adapters/codex/spawn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ function buildConfigArgs(options: CodexProcessOptions): string[] {
if (options.instructions) {
const escaped = options.instructions
.replace(/\\/g, "\\\\")
.replace(/\n/g, "\\n")
.replace(/\r/g, "\\r")
.replace(/"/g, '\\"');
args.push("-c", `instructions="${escaped}"`);
}
Expand Down
Loading