Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/print-wait-background-subagents.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@moonshot-ai/kimi-code": patch
---

In `kimi -p` runs, wait for background subagents to finish before exiting when `background.keep_alive_on_exit` is enabled. Set `keep_alive_on_exit = true` to let concurrent background subagents complete.
14 changes: 13 additions & 1 deletion apps/kimi-code/src/cli/run-prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,19 @@ function runPromptTurn(
return;
case 'turn.ended':
if (event.reason === 'completed') {
finish();
void (async () => {
// Flush the buffered assistant message before draining background
// tasks: in stream-json mode the final message is only emitted by
// finish(), so a long background wait would otherwise withhold the
// main turn's result until the drain settles.
outputWriter.flushAssistant();
try {
await session.waitForBackgroundTasksOnPrint();
Comment thread
RealKai42 marked this conversation as resolved.
} catch (error) {
log.warn('waitForBackgroundTasksOnPrint failed', { error });
}
finish();
})();
return;
}
finish(new Error(formatTurnEndedFailure(event)));
Expand Down
32 changes: 32 additions & 0 deletions apps/kimi-code/test/cli/run-prompt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const mocks = vi.hoisted(() => {
handler(mainEvent({ type: 'turn.ended', turnId: 1, reason: 'completed' }));
}
}),
waitForBackgroundTasksOnPrint: vi.fn(async () => {}),
};

return {
Expand Down Expand Up @@ -659,6 +660,37 @@ describe('runPrompt', () => {
);
});

it('flushes stream-json assistant output before waiting for background tasks', async () => {
let releaseWait: () => void = () => {};
const waitGate = new Promise<void>((resolve) => {
releaseWait = resolve;
});
mocks.session.waitForBackgroundTasksOnPrint.mockImplementationOnce(async () => waitGate);

mocks.session.prompt.mockImplementationOnce(async () => {
for (const handler of mocks.eventHandlers) {
handler(mocks.mainEvent({ type: 'turn.started', turnId: 9, origin: { kind: 'user' } }));
handler(mocks.mainEvent({ type: 'assistant.delta', turnId: 9, delta: 'final answer' }));
handler(mocks.mainEvent({ type: 'turn.ended', turnId: 9, reason: 'completed' }));
}
});

const stdout = writer();
const stderr = writer();
const runPromise = runPrompt(opts({ outputFormat: 'stream-json' }), '1.2.3-test', {
stdout,
stderr,
});

// The assistant message must be flushed even while the background wait is pending.
await waitForAssertion(() => {
expect(stdout.text()).toContain('{"role":"assistant","content":"final answer"}');
});

releaseWait();
await runPromise;
});

it('resumes a concrete session without a configured default model', async () => {
mocks.harnessGetConfig.mockResolvedValueOnce({ providers: {}, telemetry: true });
mocks.session.getStatus.mockResolvedValueOnce({ permission: 'manual', model: 'saved-model' });
Expand Down
5 changes: 4 additions & 1 deletion docs/en/configuration/config-files.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,13 @@ You can also switch models temporarily without touching the config file — by s
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `max_running_tasks` | `integer` | — | Maximum number of background tasks running concurrently |
| `keep_alive_on_exit` | `boolean` | `false` | Whether to keep still-running background tasks when the session closes. By default, Kimi Code requests that all background tasks stop before the process exits; set this to `true` only when you want tasks to outlive the session |
| `keep_alive_on_exit` | `boolean` | `false` | Whether to keep still-running background tasks when the session closes. By default, Kimi Code requests that all background tasks stop before the process exits; set this to `true` only when you want tasks to outlive the session. In print mode (`kimi -p`), setting this to `true` also makes the process wait for all background tasks to finish before exiting, so background subagents can complete their work |
| `print_wait_ceiling_s` | `integer` | `3600` | In print mode (`kimi -p`) with `keep_alive_on_exit = true`, the maximum number of seconds the process waits for background tasks to finish after the main agent's turn ends. Has no effect outside print mode or when `keep_alive_on_exit` is `false` |

`keep_alive_on_exit` can be overridden by the `KIMI_CODE_BACKGROUND_KEEP_ALIVE_ON_EXIT` environment variable, which takes higher priority than `config.toml`.

In print mode (`kimi -p "<prompt>"`), Kimi Code runs a single non-interactive turn and exits as soon as the main agent finishes. If you launch background tasks (for example, concurrent subagents via `Agent(run_in_background=true)`) and need them to run to completion, set `keep_alive_on_exit = true`: the process then waits for every background task to reach a terminal state before exiting, bounded by `print_wait_ceiling_s`. Without it, the single turn ending tears background tasks down with the process.

<!--
## `experimental`

Expand Down
5 changes: 4 additions & 1 deletion docs/zh/configuration/config-files.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,13 @@ display_name = "Kimi for Coding (custom)"
| 字段 | 类型 | 默认值 | 说明 |
| --- | --- | --- | --- |
| `max_running_tasks` | `integer` | — | 同时运行的最大后台任务数 |
| `keep_alive_on_exit` | `boolean` | `false` | 会话关闭时是否保留仍在运行的后台任务。默认情况下,Kimi Code 会在进程退出前请求停止所有后台任务;只有希望任务在会话结束后继续运行时才设为 `true` |
| `keep_alive_on_exit` | `boolean` | `false` | 会话关闭时是否保留仍在运行的后台任务。默认情况下,Kimi Code 会在进程退出前请求停止所有后台任务;只有希望任务在会话结束后继续运行时才设为 `true`。在 print 模式(`kimi -p`)下,设为 `true` 还会让进程在退出前等待所有后台任务跑完,使后台子代理得以完成工作 |
| `print_wait_ceiling_s` | `integer` | `3600` | 在 print 模式(`kimi -p`)且 `keep_alive_on_exit = true` 时,主 agent 的 turn 结束后进程等待后台任务完成的最长秒数。在非 print 模式或 `keep_alive_on_exit` 为 `false` 时无效 |

`keep_alive_on_exit` 可被环境变量 `KIMI_CODE_BACKGROUND_KEEP_ALIVE_ON_EXIT` 覆盖,优先级高于配置文件。

在 print 模式(`kimi -p "<prompt>"`)下,Kimi Code 只跑一个非交互的单轮 turn,主 agent 一结束就退出。如果你启动了后台任务(例如通过 `Agent(run_in_background=true)` 并发子代理)并希望它们跑完,请设置 `keep_alive_on_exit = true`:进程会在退出前等待所有后台任务进入终态,最长不超过 `print_wait_ceiling_s`。否则,单轮 turn 结束时后台任务会随进程一起被清理。

<!--
## `experimental`

Expand Down
1 change: 1 addition & 0 deletions packages/agent-core/src/rpc/core-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ export interface SessionAPI extends AgentAPIWithId {
reconnectMcpServer: (payload: ReconnectMcpServerPayload) => void;
generateAgentsMd: (payload: EmptyPayload) => void;
getSessionWarnings: (payload: EmptyPayload) => readonly SessionWarning[];
waitForBackgroundTasksOnPrint: (payload: EmptyPayload) => void;
addAdditionalDir: (payload: AddAdditionalDirPayload) => AddAdditionalDirResult;
}

Expand Down
4 changes: 4 additions & 0 deletions packages/agent-core/src/rpc/core-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,10 @@ export class KimiCore implements PromisableMethods<CoreAPI> {
return this.sessionApi(sessionId).getSessionWarnings(payload);
}

waitForBackgroundTasksOnPrint({ sessionId, ...payload }: SessionScopedPayload<EmptyPayload>): Promise<void> {
return this.sessionApi(sessionId).waitForBackgroundTasksOnPrint(payload);
}

addAdditionalDir({
sessionId,
...payload
Expand Down
75 changes: 75 additions & 0 deletions packages/agent-core/src/session/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,81 @@ export class Session {
);
}

/**
* Wait for all still-running background tasks (across every agent) to reach a
* terminal state before a `kimi -p` (print) run exits.
*
* Gated by `background.keep_alive_on_exit`: when it is not `true`, this returns
* immediately so print mode keeps its default single-turn semantics. The wait is
* bounded by `background.print_wait_ceiling_s` (default 3600s) so a wedged task
* cannot keep the process alive forever.
*
* Terminal notifications are suppressed for each task while we wait, so a task
* completing cannot `turn.steer` the (already finished) main agent into launching
* a new turn.
*/
async waitForBackgroundTasksOnPrint(): Promise<void> {
const keepAliveOnExit = resolveConfigValue({
env: process.env,
envKey: BACKGROUND_KEEP_ALIVE_ON_EXIT_ENV,
configValue: this.options.background?.keepAliveOnExit,
defaultValue: false,
parseEnv: parseBooleanEnv,
});
if (!keepAliveOnExit) return;

const ceilingS = this.options.background?.printWaitCeilingS ?? 3600;
const timeoutMs = ceilingS * 1000;
const deadline = Date.now() + timeoutMs;

// Re-enumerate active background tasks across every agent until none remain
// (or the ceiling expires). A subagent may fan out new background tasks
// after a previous enumeration, so a single pass could return while those
// later tasks are still running — breaking the "every background task"
// guarantee. Each round waits for the newly discovered tasks, then rescans
// to catch anything spawned in the meantime.
const seen = new Set<string>();
const allWaiters: Promise<unknown>[] = [];
while (Date.now() < deadline) {
const batch: Promise<unknown>[] = [];
const suppressions: Promise<void>[] = [];
let activeCount = 0;
for (const agent of this.readyAgents()) {
for (const task of agent.background.list(true)) {
activeCount++;
if (seen.has(task.taskId)) continue;
seen.add(task.taskId);
// suppressTerminalNotification sets the suppressed flag synchronously
// when called; defer awaiting the persist until after the whole
// enumeration so no task can complete and fire a notification while
// another task's persist write is pending.
suppressions.push(agent.background.suppressTerminalNotification(task.taskId));
const remaining = Math.max(1, deadline - Date.now());
const waiter = agent.background.wait(task.taskId, remaining);
batch.push(waiter);
allWaiters.push(waiter);
}
}
if (suppressions.length > 0) {
await Promise.all(suppressions);
}
if (activeCount === 0 || batch.length === 0) break;
this.log.info('waiting for background tasks before print exit', {
active: activeCount,
new: batch.length,
timeoutMs,
});
await Promise.all(batch);
}
if (allWaiters.length > 0) {
await Promise.all(allWaiters);
this.log.info('background tasks settled before print exit', {
count: seen.size,
timeoutMs,
});
}
}

async createAgent(
config: Partial<AgentOptions>,
options: CreateAgentOptions = {},
Expand Down
4 changes: 4 additions & 0 deletions packages/agent-core/src/session/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ export class SessionAPIImpl implements PromisableMethods<SessionAPI> {
return this.session.getSessionWarnings();
}

waitForBackgroundTasksOnPrint(_payload: EmptyPayload): Promise<void> {
return this.session.waitForBackgroundTasksOnPrint();
}

addAdditionalDir(payload: AddAdditionalDirPayload): Promise<AddAdditionalDirResult> {
return this.session.addAdditionalDir(payload.path, payload.persist);
}
Expand Down
160 changes: 160 additions & 0 deletions packages/agent-core/test/session/lifecycle-hooks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,166 @@ describe('Session lifecycle hooks', () => {
expect(main.background.getTask(taskId)?.status).toBe('running');
});

it('waitForBackgroundTasksOnPrint returns immediately when keepAliveOnExit is false', async () => {
const { sessionDir, workDir } = await hookFixture();
const session = new Session({
kaos: testKaos.withCwd(workDir),
id: 'session-print-wait-disabled',
homedir: sessionDir,
rpc: createSessionRpc(),
skills: { explicitDirs: [join(workDir, 'missing-skills')] },
background: { keepAliveOnExit: false },
});
const agent = await session.createMain();
const { proc, killSpy } = pendingProcess();
const taskId = agent.background.registerTask(
new ProcessBackgroundTask(proc, 'sleep 60', 'no wait'),
);

await session.waitForBackgroundTasksOnPrint();

expect(killSpy).not.toHaveBeenCalled();
expect(agent.background.getTask(taskId)?.status).toBe('running');
await session.close();
});

it('waitForBackgroundTasksOnPrint waits for background tasks to finish when keepAliveOnExit is true', async () => {
const { sessionDir, workDir } = await hookFixture();
const session = new Session({
kaos: testKaos.withCwd(workDir),
id: 'session-print-wait',
homedir: sessionDir,
rpc: createSessionRpc(),
skills: { explicitDirs: [join(workDir, 'missing-skills')] },
background: { keepAliveOnExit: true },
});
const agent = await session.createMain();
const { proc } = pendingProcess(0);
const taskId = agent.background.registerTask(
new ProcessBackgroundTask(proc, 'sleep 60', 'wait for me'),
);

let settled = false;
const waitPromise = session.waitForBackgroundTasksOnPrint().then(() => {
settled = true;
});

await new Promise((resolve) => setImmediate(resolve));
expect(settled).toBe(false);

await proc.kill('SIGTERM');
await waitPromise;
expect(settled).toBe(true);
expect(agent.background.getTask(taskId)?.status).toBe('completed');
await session.close();
});

it('waitForBackgroundTasksOnPrint times out after printWaitCeilingS', async () => {
const { sessionDir, workDir } = await hookFixture();
const session = new Session({
kaos: testKaos.withCwd(workDir),
id: 'session-print-wait-timeout',
homedir: sessionDir,
rpc: createSessionRpc(),
skills: { explicitDirs: [join(workDir, 'missing-skills')] },
background: { keepAliveOnExit: true, printWaitCeilingS: 1 },
});
const agent = await session.createMain();
const { proc } = pendingProcess();
const taskId = agent.background.registerTask(
new ProcessBackgroundTask(proc, 'sleep 60', 'times out'),
);

await session.waitForBackgroundTasksOnPrint();

expect(agent.background.getTask(taskId)?.status).toBe('running');
await session.close();
});

it('waitForBackgroundTasksOnPrint waits for tasks spawned after the first enumeration', async () => {
const { sessionDir, workDir } = await hookFixture();
const session = new Session({
kaos: testKaos.withCwd(workDir),
id: 'session-print-wait-fanout',
homedir: sessionDir,
rpc: createSessionRpc(),
skills: { explicitDirs: [join(workDir, 'missing-skills')] },
background: { keepAliveOnExit: true },
});
const agent = await session.createMain();
const first = pendingProcess(0);
const firstTaskId = agent.background.registerTask(
new ProcessBackgroundTask(first.proc, 'sleep 60', 'first'),
);

let settled = false;
const waitPromise = session.waitForBackgroundTasksOnPrint().then(() => {
settled = true;
});

// Let the first enumeration run and suspend on the first task.
await new Promise((resolve) => setImmediate(resolve));
expect(settled).toBe(false);

// Fan out a second background task after the first enumeration.
const second = pendingProcess(0);
const secondTaskId = agent.background.registerTask(
new ProcessBackgroundTask(second.proc, 'sleep 60', 'second'),
);

// Finish the first task; the wait must not settle while the second is running.
await first.proc.kill('SIGTERM');
await new Promise((resolve) => setImmediate(resolve));
expect(settled).toBe(false);

// Finish the second task; the wait should now settle.
await second.proc.kill('SIGTERM');
await waitPromise;
expect(settled).toBe(true);
expect(agent.background.getTask(firstTaskId)?.status).toBe('completed');
expect(agent.background.getTask(secondTaskId)?.status).toBe('completed');
await session.close();
});

it('suppresses notifications for every active task before awaiting any of them', async () => {
const { sessionDir, workDir } = await hookFixture();
const session = new Session({
kaos: testKaos.withCwd(workDir),
id: 'session-print-wait-suppress-race',
homedir: sessionDir,
rpc: createSessionRpc(),
skills: { explicitDirs: [join(workDir, 'missing-skills')] },
background: { keepAliveOnExit: true },
});
const agent = await session.createMain();
const steerSpy = vi.spyOn(agent.turn, 'steer');

// Detached tasks fire a completion notification unless suppressed.
const first = pendingProcess(0);
agent.background.registerTask(new ProcessBackgroundTask(first.proc, 'sleep 60', 'first'), {
detached: true,
});
const second = pendingProcess(0);
agent.background.registerTask(
new ProcessBackgroundTask(second.proc, 'sleep 60', 'second'),
{ detached: true },
);

const waitPromise = session.waitForBackgroundTasksOnPrint();

// Let the synchronous enumeration run so both tasks get suppressed.
await new Promise((resolve) => setImmediate(resolve));

// Complete both tasks after suppression but before the wait settles.
await first.proc.kill('SIGTERM');
await second.proc.kill('SIGTERM');
await new Promise((resolve) => setImmediate(resolve));

expect(steerSpy).not.toHaveBeenCalled();
await waitPromise;
await session.close();
});

it('lets the environment override config when deciding background task cleanup', async () => {
vi.stubEnv('KIMI_CODE_BACKGROUND_KEEP_ALIVE_ON_EXIT', '0');
const { sessionDir, workDir } = await hookFixture();
Expand Down
5 changes: 5 additions & 0 deletions packages/node-sdk/src/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,11 @@ export abstract class SDKRpcClientBase {
});
}

async waitForBackgroundTasksOnPrint(input: SessionIdRpcInput): Promise<void> {
const rpc = await this.getRpc();
return rpc.waitForBackgroundTasksOnPrint({ sessionId: input.sessionId });
}

async createGoal(input: SessionIdRpcInput & CreateGoalInput): Promise<GoalSnapshot> {
const rpc = await this.getRpc();
return rpc.createGoal({
Expand Down
Loading
Loading