From c6d84ce79aad4bcec68177742610b32c77b87080 Mon Sep 17 00:00:00 2001 From: morluto Date: Wed, 1 Jul 2026 12:57:00 +0000 Subject: [PATCH 1/2] fix(agent-core): add core session shutdown lifecycle --- packages/agent-core/src/rpc/core-impl.ts | 18 ++++++ .../src/services/coreProcess/coreProcess.ts | 3 + .../coreProcess/coreProcessService.ts | 11 ++-- .../test/rpc/core-close-all.test.ts | 60 +++++++++++++++++++ .../test/services/message-service.test.ts | 2 + .../test/services/message-transcript.test.ts | 1 + .../services/model-catalog-service.test.ts | 1 + .../test/services/prompt-service.test.ts | 1 + .../test/services/session-service.test.ts | 1 + .../test/services/task-service.test.ts | 1 + .../test/services/tool-service.test.ts | 1 + 11 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 packages/agent-core/test/rpc/core-close-all.test.ts diff --git a/packages/agent-core/src/rpc/core-impl.ts b/packages/agent-core/src/rpc/core-impl.ts index c028b6d51..d896ed61b 100644 --- a/packages/agent-core/src/rpc/core-impl.ts +++ b/packages/agent-core/src/rpc/core-impl.ts @@ -352,6 +352,24 @@ export class KimiCore implements PromisableMethods { } } + async closeAllSessions(): Promise { + const results = await Promise.allSettled( + Array.from(this.sessions.entries(), async ([sessionId, session]) => { + try { + await session.close(); + } finally { + this.sessions.delete(sessionId); + } + }), + ); + const failures = results + .filter((result) => result.status === 'rejected') + .map((result) => result.reason); + if (failures.length > 0) { + throw new AggregateError(failures, 'failed to close all sessions'); + } + } + async archiveSession({ sessionId }: ArchiveSessionPayload): Promise { await this.closeSession({ sessionId }); await this.sessionStore.archive(sessionId); diff --git a/packages/agent-core/src/services/coreProcess/coreProcess.ts b/packages/agent-core/src/services/coreProcess/coreProcess.ts index 2a8b5ff03..b99c1aae8 100644 --- a/packages/agent-core/src/services/coreProcess/coreProcess.ts +++ b/packages/agent-core/src/services/coreProcess/coreProcess.ts @@ -66,6 +66,9 @@ export interface ICoreProcessService { */ ready(): Promise; + /** Gracefully close every live in-process session before disposal. */ + closeAllSessions(): Promise; + /** * Tear down the adapter. After dispose, `rpc.(...)` rejects with a * "core process disposed" error before reaching `KimiCore`. Idempotent. diff --git a/packages/agent-core/src/services/coreProcess/coreProcessService.ts b/packages/agent-core/src/services/coreProcess/coreProcessService.ts index 58bb34452..6eb3fad2e 100644 --- a/packages/agent-core/src/services/coreProcess/coreProcessService.ts +++ b/packages/agent-core/src/services/coreProcess/coreProcessService.ts @@ -139,12 +139,15 @@ export class CoreProcessService extends Disposable implements ICoreProcessServic return this._ready; } + async closeAllSessions(): Promise { + await this._core.closeAllSessions(); + } + override dispose(): void { if (this._store.isDisposed) return; - // KimiCore does not currently expose a dispose() — when it does, we'll - // await/call it here BEFORE super.dispose(). For now, disposing the - // service flips _disposed, which makes future rpc.* invocations reject - // before they reach KimiCore. + // Session shutdown is async and is owned by `closeAllSessions()` so callers + // can await it before tearing down the DI graph. `dispose()` remains the + // synchronous guard that rejects future rpc.* invocations. super.dispose(); } diff --git a/packages/agent-core/test/rpc/core-close-all.test.ts b/packages/agent-core/test/rpc/core-close-all.test.ts new file mode 100644 index 000000000..2e9038628 --- /dev/null +++ b/packages/agent-core/test/rpc/core-close-all.test.ts @@ -0,0 +1,60 @@ +import { mkdtempSync, rmSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; + +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { KimiCore } from '../../src/rpc/core-impl'; + +let homeDir: string; + +beforeEach(() => { + homeDir = mkdtempSync(join(tmpdir(), 'kimi-core-close-all-')); +}); + +afterEach(() => { + rmSync(homeDir, { recursive: true, force: true }); +}); + +describe('KimiCore.closeAllSessions', () => { + it('closes every live session and clears the active session map', async () => { + const core = new KimiCore(async () => ({}) as never, { homeDir }); + const closed: string[] = []; + core.sessions.set('sess_a', { + close: async () => { + closed.push('sess_a'); + }, + } as never); + core.sessions.set('sess_b', { + close: async () => { + closed.push('sess_b'); + }, + } as never); + + await core.closeAllSessions(); + + expect(closed.toSorted()).toEqual(['sess_a', 'sess_b']); + expect(core.sessions.size).toBe(0); + }); + + it('continues closing later sessions and clears entries when one close fails', async () => { + const core = new KimiCore(async () => ({}) as never, { homeDir }); + const closed: string[] = []; + core.sessions.set('sess_bad', { + close: async () => { + closed.push('sess_bad'); + throw new Error('close failed'); + }, + } as never); + core.sessions.set('sess_good', { + close: async () => { + closed.push('sess_good'); + }, + } as never); + + await expect(core.closeAllSessions()).rejects.toThrow(AggregateError); + + expect(closed.toSorted()).toEqual(['sess_bad', 'sess_good']); + expect(core.sessions.size).toBe(0); + }); +}); diff --git a/packages/agent-core/test/services/message-service.test.ts b/packages/agent-core/test/services/message-service.test.ts index 1e34acb15..f7e7fa0b8 100644 --- a/packages/agent-core/test/services/message-service.test.ts +++ b/packages/agent-core/test/services/message-service.test.ts @@ -53,6 +53,7 @@ function makeFakeBridge( return { rpc: rpc as CoreRPC, ready: vi.fn().mockResolvedValue(undefined), + closeAllSessions: vi.fn().mockResolvedValue(undefined), dispose: vi.fn(), _serviceBrand: undefined, }; @@ -349,6 +350,7 @@ describe('MessageService', () => { const failingBridge: ICoreProcessService = { rpc: rpc as CoreRPC, ready: vi.fn().mockResolvedValue(undefined), + closeAllSessions: vi.fn().mockResolvedValue(undefined), dispose: vi.fn(), _serviceBrand: undefined, }; diff --git a/packages/agent-core/test/services/message-transcript.test.ts b/packages/agent-core/test/services/message-transcript.test.ts index c249c0174..f599b94f2 100644 --- a/packages/agent-core/test/services/message-transcript.test.ts +++ b/packages/agent-core/test/services/message-transcript.test.ts @@ -590,6 +590,7 @@ describe('MessageService over a compacted wire log', () => { bridge = { rpc: rpc as CoreRPC, ready: vi.fn().mockResolvedValue(undefined), + closeAllSessions: vi.fn().mockResolvedValue(undefined), dispose: vi.fn(), _serviceBrand: undefined, }; diff --git a/packages/agent-core/test/services/model-catalog-service.test.ts b/packages/agent-core/test/services/model-catalog-service.test.ts index 636a69643..658ca8eda 100644 --- a/packages/agent-core/test/services/model-catalog-service.test.ts +++ b/packages/agent-core/test/services/model-catalog-service.test.ts @@ -84,6 +84,7 @@ function makeCore(configRef: { current: KimiConfig }): { _serviceBrand: undefined, rpc: rpc as CoreRPC, ready: async () => undefined, + closeAllSessions: async () => undefined, dispose: () => undefined, }, getCalls, diff --git a/packages/agent-core/test/services/prompt-service.test.ts b/packages/agent-core/test/services/prompt-service.test.ts index 3a8d85541..7b5215ab0 100644 --- a/packages/agent-core/test/services/prompt-service.test.ts +++ b/packages/agent-core/test/services/prompt-service.test.ts @@ -246,6 +246,7 @@ function makeBridge( const bridge: ICoreProcessService = { rpc: rpc as CoreRPC, ready: vi.fn().mockResolvedValue(undefined), + closeAllSessions: vi.fn().mockResolvedValue(undefined), dispose: vi.fn(), _serviceBrand: undefined, }; diff --git a/packages/agent-core/test/services/session-service.test.ts b/packages/agent-core/test/services/session-service.test.ts index 0d4eb0a08..a144cd424 100644 --- a/packages/agent-core/test/services/session-service.test.ts +++ b/packages/agent-core/test/services/session-service.test.ts @@ -201,6 +201,7 @@ function makeFakeBridge(state: FakeBridgeState): ICoreProcessService { return { rpc: rpc as CoreRPC, ready: async () => undefined, + closeAllSessions: async () => undefined, dispose: () => undefined, _serviceBrand: undefined, }; diff --git a/packages/agent-core/test/services/task-service.test.ts b/packages/agent-core/test/services/task-service.test.ts index 7dc59aff8..54f802472 100644 --- a/packages/agent-core/test/services/task-service.test.ts +++ b/packages/agent-core/test/services/task-service.test.ts @@ -59,6 +59,7 @@ function makeBridge(state: FakeState): ICoreProcessService { return { rpc: rpc as CoreRPC, ready: async () => undefined, + closeAllSessions: async () => undefined, dispose: () => undefined, _serviceBrand: undefined, }; diff --git a/packages/agent-core/test/services/tool-service.test.ts b/packages/agent-core/test/services/tool-service.test.ts index 59dbd6b35..ffddbf455 100644 --- a/packages/agent-core/test/services/tool-service.test.ts +++ b/packages/agent-core/test/services/tool-service.test.ts @@ -51,6 +51,7 @@ function makeFakeBridge(state: FakeBridgeState): ICoreProcessService { return { rpc: rpc as CoreRPC, ready: async () => undefined, + closeAllSessions: async () => undefined, dispose: () => undefined, _serviceBrand: undefined, }; From 9339a2f6c50e722ac2476aa00a58dac8bbe14cb4 Mon Sep 17 00:00:00 2001 From: morluto Date: Wed, 1 Jul 2026 12:57:09 +0000 Subject: [PATCH 2/2] fix(server): close live sessions during shutdown --- .changeset/clean-session-shutdown.md | 5 +++++ packages/server/src/start.ts | 9 +++++++++ 2 files changed, 14 insertions(+) create mode 100644 .changeset/clean-session-shutdown.md diff --git a/.changeset/clean-session-shutdown.md b/.changeset/clean-session-shutdown.md new file mode 100644 index 000000000..f353dcd9e --- /dev/null +++ b/.changeset/clean-session-shutdown.md @@ -0,0 +1,5 @@ +--- +"@moonshot-ai/kimi-code": patch +--- + +Close live sessions during server shutdown. diff --git a/packages/server/src/start.ts b/packages/server/src/start.ts index dda05f458..1b73eb4c8 100644 --- a/packages/server/src/start.ts +++ b/packages/server/src/start.ts @@ -613,6 +613,15 @@ export async function startServer(opts: ServerStartOptions): Promise