Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { describe, expect, it, vi } from 'vitest';
import type { Env, SandboxInstance } from '../../types.js';
import type { SessionMetadata } from '../../persistence/session-metadata.js';
import { WrapperClient } from '../../kilo/wrapper-client.js';
import { WRAPPER_FINAL_LOG_UPLOAD_TIMEOUT_MS } from '../../shared/protocol.js';
import { WRAPPER_VERSION } from '../../shared/wrapper-version.js';
import type { EnsureWrapperRequest } from '../protocol.js';
import { CloudflareAgentSandbox } from './cloudflare-agent-sandbox.js';
Expand Down Expand Up @@ -541,6 +542,36 @@ describe('CloudflareAgentSandbox', () => {
expect(exec).toHaveBeenCalledWith(expect.stringContaining('--agent-session agent_cloudflare'));
});

it('allows the wrapper final-log deadline before force stopping', async () => {
const observedProcess = {
id: 'wrapper-target',
command:
'WRAPPER_PORT=5000 kilocode-wrapper --agent-session agent_cloudflare --wrapper-instance-id instance_1 --wrapper-instance-generation 2',
status: 'running',
};
const listProcesses = vi.fn().mockResolvedValue([observedProcess]);
const sleep = vi.fn().mockResolvedValue(undefined);
const sandbox = new CloudflareAgentSandbox({} as Env, metadata(), {
resolveSandbox: () =>
({
listProcesses,
exec: vi.fn().mockResolvedValue({ exitCode: 0, stdout: '' }),
}) as unknown as SandboxInstance,
sleep,
});

await expect(
sandbox.stopWrappers({
target: { kind: 'instance', instance: { instanceId: 'instance_1', instanceGeneration: 2 } },
attemptId: 'attempt_graceful_flush',
reason: 'readiness-failed',
})
).resolves.toMatchObject({ status: 'still-present' });

const gracefulStopWindowMs = sleep.mock.calls.reduce((total, [delayMs]) => total + delayMs, 0);
expect(gracefulStopWindowMs).toBeGreaterThan(WRAPPER_FINAL_LOG_UPLOAD_TIMEOUT_MS);
});

it('returns still-present when targeted forceful cleanup remains observable', async () => {
const observedProcess = {
id: 'wrapper-target',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
} from '../../sandbox-timeout-logging.js';
import { SANDBOX_WORKSPACE_PROBE_TIMEOUT_MESSAGE } from '../../sandbox-recovery.js';
import { withTimeout } from '@kilocode/worker-utils';
import { WRAPPER_GRACEFUL_STOP_TIMEOUT_MS } from '../../shared/protocol.js';
import { WRAPPER_VERSION } from '../../shared/wrapper-version.js';
import { ExecutionError } from '../../execution/errors.js';
import {
Expand All @@ -47,7 +48,15 @@ import {
} from '../../workspace-errors.js';

const PREPARE_WORKSPACE_TIMEOUT_MS = 10 * 60 * 1000;
const DEFAULT_STOP_OBSERVATION_DELAYS_MS = [100, 500, 1_000];
const EARLY_STOP_OBSERVATION_DELAYS_MS = [100, 500, 1_000, 2_000, 5_000, 10_000];
const EARLY_STOP_OBSERVATION_WINDOW_MS = EARLY_STOP_OBSERVATION_DELAYS_MS.reduce(
(total, delayMs) => total + delayMs,
0
);
const DEFAULT_STOP_OBSERVATION_DELAYS_MS = [
...EARLY_STOP_OBSERVATION_DELAYS_MS,
WRAPPER_GRACEFUL_STOP_TIMEOUT_MS - EARLY_STOP_OBSERVATION_WINDOW_MS,
];

function withWorkspacePreparationTimeout<T>(operation: Promise<T>, step: string): Promise<T> {
return withTimeout(
Expand Down
3 changes: 3 additions & 0 deletions services/cloud-agent-next/src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ export type IngestEvent = {
data: unknown;
};

export const WRAPPER_FINAL_LOG_UPLOAD_TIMEOUT_MS = 32_000;
export const WRAPPER_GRACEFUL_STOP_TIMEOUT_MS = WRAPPER_FINAL_LOG_UPLOAD_TIMEOUT_MS + 3_000;

export const WrapperTerminalFailureCodes = ['payment_required', 'model_missing'] as const;
export type WrapperTerminalFailureCode = (typeof WrapperTerminalFailureCodes)[number];

Expand Down
48 changes: 48 additions & 0 deletions services/cloud-agent-next/wrapper/src/lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { WrapperState } from './state';
import { createLifecycleManager } from './lifecycle';
import type { IngestEvent } from '../../src/shared/protocol';
import type { WrapperKiloClient } from './kilo-api';
import type { LogUploader } from './log-uploader';

const sessionContext = {
kiloSessionId: 'kilo_sess_test',
Expand All @@ -19,6 +20,53 @@ function wait(ms: number): Promise<void> {
}

describe('wrapper lifecycle drain races', () => {
it('flushes immutable logs before emitting completion', async () => {
const state = new WrapperState();
const events: IngestEvent[] = [];
const order: string[] = [];
let releaseFlush: (() => void) | undefined;
const flushGate = new Promise<void>(resolve => {
releaseFlush = resolve;
});
const uploader: LogUploader = {
start: () => {},
uploadNow: async () => {},
flushNow: async () => {
order.push('flush-started');
await flushGate;
order.push('flush-finished');
},
stop: () => {},
};
state.bindSession(sessionContext);
state.setSendToIngestFn(event => {
events.push(event);
if (event.streamEventType === 'complete') order.push('complete');
});
state.setLogUploader(uploader);
const lifecycle = createLifecycleManager(
{ workspacePath: '/tmp' },
{
state,
kiloClient: {} as WrapperKiloClient,
closeConnections: async () => {},
isConnected: () => true,
reconnectEventSubscription: () => {},
}
);

lifecycle.triggerDrainAndClose();
while (!order.includes('flush-started')) await wait(1);
expect(events.map(event => event.streamEventType)).not.toContain('complete');

const release = releaseFlush;
if (!release) throw new Error('Expected final log flush to start');
release();
while (!order.includes('complete')) await wait(1);

expect(order).toEqual(['flush-started', 'flush-finished', 'complete']);
});

it('clears aborted state when activity cancels an aborted drain', async () => {
const state = new WrapperState();
const events: IngestEvent[] = [];
Expand Down
3 changes: 1 addition & 2 deletions services/cloud-agent-next/wrapper/src/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,12 @@ export function createLifecycleManager(
const uploader = state.logUploader;
if (uploader) {
try {
await uploader.uploadNow();
await uploader.flushNow();
} catch (error) {
logToFile(
`final log upload failed: ${error instanceof Error ? error.message : String(error)}`
);
}
uploader.stop();
}
} finally {
const currentSession = state.currentSession;
Expand Down
162 changes: 162 additions & 0 deletions services/cloud-agent-next/wrapper/src/log-uploader.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import { createLogUploader } from './log-uploader';
import { getKiloImportDiagnosticPath } from './utils';

function asFetch(
fn: (...args: Parameters<typeof fetch>) => ReturnType<typeof fetch>
): typeof fetch {
return Object.assign(fn, { preconnect: fetch.preconnect });
}

function requestUrl(input: string | URL | Request): string {
if (typeof input === 'string') return input;
return input instanceof URL ? input.href : input.url;
}

describe('createLogUploader', () => {
let tmpDir: string;
let originalFetch: typeof globalThis.fetch;

beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'log-uploader-test-'));
originalFetch = globalThis.fetch;
});

afterEach(() => {
globalThis.fetch = originalFetch;
fs.rmSync(tmpDir, { recursive: true, force: true });
});

it('uploads the import failure diagnostic in the authenticated R2 log archive', async () => {
const cliLogDir = path.join(tmpDir, 'cli-logs');
const wrapperLogPath = path.join(tmpDir, 'wrapper.log');
const diagnosticPath = getKiloImportDiagnosticPath(wrapperLogPath);
fs.mkdirSync(cliLogDir);
fs.writeFileSync(path.join(cliLogDir, 'kilo.log'), 'cli log');
fs.writeFileSync(wrapperLogPath, 'wrapper log');
fs.writeFileSync(diagnosticPath, '{"version":1,"stderr":{"text":"schema failure"}}');

const requests: Array<{ url: string; authorization: string | null; archive: Uint8Array }> = [];
globalThis.fetch = asFetch(async (input, init) => {
const body = init?.body;
if (!(body instanceof ReadableStream)) throw new Error('Expected streaming archive body');
requests.push({
url: requestUrl(input),
authorization: new Headers(init?.headers).get('Authorization'),
archive: new Uint8Array(await new Response(body).arrayBuffer()),
});
return new Response(null, { status: 204 });
});

const uploader = createLogUploader({
workerBaseUrl: 'https://worker.example',
sessionId: 'agent_test',
executionId: 'session',
userId: 'user_test',
workerAuthToken: 'worker-token',
cliLogDir,
wrapperLogPath,
});
await uploader.uploadNow();

expect(requests).toHaveLength(1);
expect(requests[0]?.url).toBe(
'https://worker.example/sessions/user_test/agent_test/logs/session/logs.tar.gz'
);
expect(requests[0]?.authorization).toBe('Bearer worker-token');

const archivePath = path.join(tmpDir, 'logs.tar.gz');
const extractDir = path.join(tmpDir, 'extracted');
fs.writeFileSync(archivePath, requests[0]?.archive ?? new Uint8Array());
fs.mkdirSync(extractDir);
const extraction = Bun.spawnSync(['tar', 'xzf', archivePath, '-C', extractDir], {
stdout: 'pipe',
stderr: 'pipe',
});
expect(extraction.exitCode).toBe(0);
expect(fs.readFileSync(path.join(extractDir, path.basename(diagnosticPath)), 'utf8')).toContain(
'schema failure'
);
});

it('serializes final uploads into one bounded final archive key', async () => {
const cliLogDir = path.join(tmpDir, 'cli-logs');
const wrapperLogPath = path.join(tmpDir, 'wrapper.log');
const diagnosticPath = getKiloImportDiagnosticPath(wrapperLogPath);
fs.mkdirSync(cliLogDir);
fs.writeFileSync(wrapperLogPath, 'wrapper log');

const archives: Uint8Array[] = [];
const requestUrls: string[] = [];
const uploadOrder: string[] = [];
let firstUploadSignal: AbortSignal | undefined;
let releaseFirstResponse: (() => void) | undefined;
const firstResponseGate = new Promise<void>(resolve => {
releaseFirstResponse = resolve;
});
globalThis.fetch = asFetch(async (input, init) => {
const body = init?.body;
if (!(body instanceof ReadableStream)) throw new Error('Expected streaming archive body');
requestUrls.push(requestUrl(input));
const requestNumber = requestUrls.length;
uploadOrder.push(`start-${requestNumber}`);
archives.push(new Uint8Array(await new Response(body).arrayBuffer()));
if (requestNumber === 1) {
firstUploadSignal = init?.signal ?? undefined;
await firstResponseGate;
}
uploadOrder.push(`settle-${requestNumber}`);
return new Response(null, { status: 204 });
});

const uploader = createLogUploader({
workerBaseUrl: 'https://worker.example',
sessionId: 'agent_test',
executionId: 'session',
userId: 'user_test',
workerAuthToken: 'worker-token',
cliLogDir,
wrapperLogPath,
});
const periodicUpload = uploader.uploadNow();
while (requestUrls.length === 0) await Bun.sleep(1);

fs.writeFileSync(diagnosticPath, '{"version":1,"stderr":{"text":"late failure"}}');
const finalUpload = uploader.flushNow();
await Promise.resolve();
await Promise.resolve();
const firstUploadWasAborted = firstUploadSignal?.aborted ?? false;
const finalUploadStartedBeforeFirstSettled = requestUrls.length > 1;
const release = releaseFirstResponse;
if (!release) throw new Error('First upload did not start');
release();
await Promise.all([periodicUpload, finalUpload]);

expect(firstUploadWasAborted).toBe(false);
expect(finalUploadStartedBeforeFirstSettled).toBe(false);
expect(uploadOrder).toEqual(['start-1', 'settle-1', 'start-2', 'settle-2']);
expect(requestUrls[0]).toContain('/logs/session/logs.tar.gz');
expect(requestUrls[1]).toContain('/logs/session-final/logs.tar.gz');

await uploader.flushNow();

expect(requestUrls[2]).toBe(requestUrls[1]);
expect(new Set(requestUrls).size).toBe(2);
expect(archives).toHaveLength(3);
const archivePath = path.join(tmpDir, 'final-logs.tar.gz');
const extractDir = path.join(tmpDir, 'final-extracted');
fs.writeFileSync(archivePath, archives[1] ?? new Uint8Array());
fs.mkdirSync(extractDir);
const extraction = Bun.spawnSync(['tar', 'xzf', archivePath, '-C', extractDir], {
stdout: 'pipe',
stderr: 'pipe',
});
expect(extraction.exitCode).toBe(0);
expect(fs.readFileSync(path.join(extractDir, path.basename(diagnosticPath)), 'utf8')).toContain(
'late failure'
);
});
});
Loading