Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/clean-session-shutdown.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@moonshot-ai/kimi-code": patch
---

Close live sessions during server shutdown.
18 changes: 18 additions & 0 deletions packages/agent-core/src/rpc/core-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,24 @@ export class KimiCore implements PromisableMethods<CoreAPI> {
}
}

async closeAllSessions(): Promise<void> {
const results = await Promise.allSettled(
Array.from(this.sessions.entries(), async ([sessionId, session]) => {
try {
await session.close();
} finally {
this.sessions.delete(sessionId);
}
}),
);
const failures = results
.filter((result) => result.status === 'rejected')
.map((result) => result.reason);
if (failures.length > 0) {
throw new AggregateError(failures, 'failed to close all sessions');
}
}

async archiveSession({ sessionId }: ArchiveSessionPayload): Promise<void> {
await this.closeSession({ sessionId });
await this.sessionStore.archive(sessionId);
Expand Down
3 changes: 3 additions & 0 deletions packages/agent-core/src/services/coreProcess/coreProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ export interface ICoreProcessService {
*/
ready(): Promise<void>;

/** Gracefully close every live in-process session before disposal. */
closeAllSessions(): Promise<void>;

/**
* Tear down the adapter. After dispose, `rpc.<method>(...)` rejects with a
* "core process disposed" error before reaching `KimiCore`. Idempotent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,15 @@ export class CoreProcessService extends Disposable implements ICoreProcessServic
return this._ready;
}

async closeAllSessions(): Promise<void> {
await this._core.closeAllSessions();
}

override dispose(): void {
if (this._store.isDisposed) return;
// KimiCore does not currently expose a dispose() — when it does, we'll
// await/call it here BEFORE super.dispose(). For now, disposing the
// service flips _disposed, which makes future rpc.* invocations reject
// before they reach KimiCore.
// Session shutdown is async and is owned by `closeAllSessions()` so callers
// can await it before tearing down the DI graph. `dispose()` remains the
// synchronous guard that rejects future rpc.* invocations.
super.dispose();
}

Expand Down
60 changes: 60 additions & 0 deletions packages/agent-core/test/rpc/core-close-all.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

import { afterEach, beforeEach, describe, expect, it } from 'vitest';

import { KimiCore } from '../../src/rpc/core-impl';

let homeDir: string;

beforeEach(() => {
homeDir = mkdtempSync(join(tmpdir(), 'kimi-core-close-all-'));
});

afterEach(() => {
rmSync(homeDir, { recursive: true, force: true });
});

describe('KimiCore.closeAllSessions', () => {
it('closes every live session and clears the active session map', async () => {
const core = new KimiCore(async () => ({}) as never, { homeDir });
const closed: string[] = [];
core.sessions.set('sess_a', {
close: async () => {
closed.push('sess_a');
},
} as never);
core.sessions.set('sess_b', {
close: async () => {
closed.push('sess_b');
},
} as never);

await core.closeAllSessions();

expect(closed.toSorted()).toEqual(['sess_a', 'sess_b']);
expect(core.sessions.size).toBe(0);
});

it('continues closing later sessions and clears entries when one close fails', async () => {
const core = new KimiCore(async () => ({}) as never, { homeDir });
const closed: string[] = [];
core.sessions.set('sess_bad', {
close: async () => {
closed.push('sess_bad');
throw new Error('close failed');
},
} as never);
core.sessions.set('sess_good', {
close: async () => {
closed.push('sess_good');
},
} as never);

await expect(core.closeAllSessions()).rejects.toThrow(AggregateError);

expect(closed.toSorted()).toEqual(['sess_bad', 'sess_good']);
expect(core.sessions.size).toBe(0);
});
});
2 changes: 2 additions & 0 deletions packages/agent-core/test/services/message-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ function makeFakeBridge(
return {
rpc: rpc as CoreRPC,
ready: vi.fn().mockResolvedValue(undefined),
closeAllSessions: vi.fn().mockResolvedValue(undefined),
dispose: vi.fn(),
_serviceBrand: undefined,
};
Expand Down Expand Up @@ -349,6 +350,7 @@ describe('MessageService', () => {
const failingBridge: ICoreProcessService = {
rpc: rpc as CoreRPC,
ready: vi.fn().mockResolvedValue(undefined),
closeAllSessions: vi.fn().mockResolvedValue(undefined),
dispose: vi.fn(),
_serviceBrand: undefined,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ describe('MessageService over a compacted wire log', () => {
bridge = {
rpc: rpc as CoreRPC,
ready: vi.fn().mockResolvedValue(undefined),
closeAllSessions: vi.fn().mockResolvedValue(undefined),
dispose: vi.fn(),
_serviceBrand: undefined,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ function makeCore(configRef: { current: KimiConfig }): {
_serviceBrand: undefined,
rpc: rpc as CoreRPC,
ready: async () => undefined,
closeAllSessions: async () => undefined,
dispose: () => undefined,
},
getCalls,
Expand Down
1 change: 1 addition & 0 deletions packages/agent-core/test/services/prompt-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ function makeBridge(
const bridge: ICoreProcessService = {
rpc: rpc as CoreRPC,
ready: vi.fn().mockResolvedValue(undefined),
closeAllSessions: vi.fn().mockResolvedValue(undefined),
dispose: vi.fn(),
_serviceBrand: undefined,
};
Expand Down
1 change: 1 addition & 0 deletions packages/agent-core/test/services/session-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ function makeFakeBridge(state: FakeBridgeState): ICoreProcessService {
return {
rpc: rpc as CoreRPC,
ready: async () => undefined,
closeAllSessions: async () => undefined,
dispose: () => undefined,
_serviceBrand: undefined,
};
Expand Down
1 change: 1 addition & 0 deletions packages/agent-core/test/services/task-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ function makeBridge(state: FakeState): ICoreProcessService {
return {
rpc: rpc as CoreRPC,
ready: async () => undefined,
closeAllSessions: async () => undefined,
dispose: () => undefined,
_serviceBrand: undefined,
};
Expand Down
1 change: 1 addition & 0 deletions packages/agent-core/test/services/tool-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ function makeFakeBridge(state: FakeBridgeState): ICoreProcessService {
return {
rpc: rpc as CoreRPC,
ready: async () => undefined,
closeAllSessions: async () => undefined,
dispose: () => undefined,
_serviceBrand: undefined,
};
Expand Down
9 changes: 9 additions & 0 deletions packages/server/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,15 @@ export async function startServer(opts: ServerStartOptions): Promise<RunningServ

}

try {
await coreProcess.closeAllSessions();
} catch (error) {
pinoLogger.warn(
{ err: error instanceof Error ? error : new Error(String(error)) },
'failed to close live sessions during server shutdown',
);
}

try {
ix.dispose();
} catch {
Expand Down