From dff47bd81ce3705248cae656fbd4a6b1a0d62ca5 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 26 Dec 2025 13:08:53 -0800 Subject: [PATCH 1/2] fix(agents): prevent crash when child process dies - Handle pidusage ENOENT on dead process - Check proc.connected before send() - Clear timers on exit - Remove throw after reject in setTimeout --- agents/src/ipc/supervised_proc.test.ts | 147 +++++++++++++++++++++++++ agents/src/ipc/supervised_proc.ts | 36 ++++-- 2 files changed, 174 insertions(+), 9 deletions(-) create mode 100644 agents/src/ipc/supervised_proc.test.ts diff --git a/agents/src/ipc/supervised_proc.test.ts b/agents/src/ipc/supervised_proc.test.ts new file mode 100644 index 000000000..0640bf85a --- /dev/null +++ b/agents/src/ipc/supervised_proc.test.ts @@ -0,0 +1,147 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { fork, spawn } from 'node:child_process'; +import { unlinkSync, writeFileSync } from 'node:fs'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import pidusage from 'pidusage'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; + +const childScript = join(tmpdir(), 'test_child.mjs'); + +beforeAll(() => { + writeFileSync( + childScript, + `process.on('message', (msg) => process.send?.({ echo: msg })); + setInterval(() => {}, 1000);`, + ); +}); + +afterAll(() => { + try { + unlinkSync(childScript); + } catch {} +}); + +async function getChildMemoryUsageMB(pid: number | undefined): Promise { + if (!pid) return 0; + try { + const stats = await pidusage(pid); + return stats.memory / (1024 * 1024); + } catch (err) { + const code = (err as NodeJS.ErrnoException).code; + if (code === 'ENOENT' || code === 'ESRCH') { + return 0; + } + throw err; + } +} + +describe('pidusage on dead process', () => { + it('raw pidusage throws on dead pid', async () => { + const child = spawn('sleep', ['10']); + const pid = child.pid!; + + child.kill('SIGKILL'); + await new Promise((r) => child.on('exit', r)); + + await expect(pidusage(pid)).rejects.toThrow(); + }); + + it('fixed version returns 0 instead of crashing', async () => { + const child = spawn('sleep', ['10']); + const pid = child.pid!; + + child.kill('SIGKILL'); + await new Promise((r) => child.on('exit', r)); + + const mem = await getChildMemoryUsageMB(pid); + expect(mem).toBe(0); + }); + + it('handles concurrent calls on dying process', async () => { + const child = spawn('sleep', ['10']); + const pid = child.pid!; + const exitPromise = new Promise((r) => child.on('exit', r)); + + child.kill('SIGKILL'); + + const results = await Promise.all([ + getChildMemoryUsageMB(pid), + getChildMemoryUsageMB(pid), + getChildMemoryUsageMB(pid), + ]); + + await exitPromise; + expect(results.every((r) => r === 0)).toBe(true); + }); +}); + +describe('IPC send on dead process', () => { + it('child.connected becomes false when child dies', async () => { + const child = fork(childScript, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'] }); + const exitPromise = new Promise((r) => child.on('exit', r)); + + await new Promise((r) => setTimeout(r, 50)); + expect(child.connected).toBe(true); + + child.kill('SIGKILL'); + await exitPromise; + + expect(child.connected).toBe(false); + }); + + it('checking connected before send prevents crash', async () => { + const child = fork(childScript, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'] }); + const exitPromise = new Promise((r) => child.on('exit', r)); + + let sent = 0; + let skipped = 0; + + const interval = setInterval(() => { + if (child.connected) { + child.send({ ping: Date.now() }); + sent++; + } else { + skipped++; + } + }, 20); + + await new Promise((r) => setTimeout(r, 60)); + child.kill('SIGKILL'); + await exitPromise; + await new Promise((r) => setTimeout(r, 80)); + clearInterval(interval); + + expect(sent).toBeGreaterThan(0); + expect(skipped).toBeGreaterThan(0); + }); +}); + +describe('timer cleanup', () => { + it('clearInterval stops the interval', async () => { + let count = 0; + const interval = setInterval(() => count++, 30); + + await new Promise((r) => setTimeout(r, 80)); + const countAtClear = count; + clearInterval(interval); + + await new Promise((r) => setTimeout(r, 80)); + expect(count).toBe(countAtClear); + }); + + it('double clear is safe', () => { + const interval = setInterval(() => {}, 100); + const timeout = setTimeout(() => {}, 1000); + + clearInterval(interval); + clearTimeout(timeout); + + expect(() => { + clearInterval(interval); + clearTimeout(timeout); + }).not.toThrow(); + }); +}); diff --git a/agents/src/ipc/supervised_proc.ts b/agents/src/ipc/supervised_proc.ts index a330673df..7ce5190b6 100644 --- a/agents/src/ipc/supervised_proc.ts +++ b/agents/src/ipc/supervised_proc.ts @@ -80,7 +80,9 @@ export abstract class SupervisedProc { await this.init.await; this.#pingInterval = setInterval(() => { - this.proc!.send({ case: 'pingRequest', value: { timestamp: Date.now() } }); + if (this.proc?.connected) { + this.proc.send({ case: 'pingRequest', value: { timestamp: Date.now() } }); + } }, this.#opts.pingInterval); this.#pongTimeout = setTimeout(() => { @@ -141,6 +143,7 @@ export abstract class SupervisedProc { }); this.proc!.on('exit', () => { + this.clearTimers(); this.#join.resolve(); }); @@ -159,11 +162,13 @@ export abstract class SupervisedProc { async initialize() { const timer = setTimeout(() => { - const err = new Error('runner initialization timed out'); - this.init.reject(err); - throw err; + this.init.reject(new Error('runner initialization timed out')); }, this.#opts.initializeTimeout); - this.proc!.send({ + if (!this.proc?.connected) { + this.init.reject(new Error('process not connected')); + return; + } + this.proc.send({ case: 'initializeRequest', value: { loggerOptions, @@ -187,7 +192,9 @@ export abstract class SupervisedProc { } this.#closing = true; - this.proc!.send({ case: 'shutdownRequest' }); + if (this.proc?.connected) { + this.proc.send({ case: 'shutdownRequest' }); + } const timer = setTimeout(() => { this.#logger.error('job shutdown is taking too much time'); @@ -203,8 +210,11 @@ export abstract class SupervisedProc { if (this.#runningJob) { throw new Error('executor already has a running job'); } + if (!this.proc?.connected) { + throw new Error('process not connected'); + } this.#runningJob = info; - this.proc!.send({ case: 'startJobRequest', value: { runningJob: info } }); + this.proc.send({ case: 'startJobRequest', value: { runningJob: info } }); } private async getChildMemoryUsageMB(): Promise { @@ -212,8 +222,16 @@ export abstract class SupervisedProc { if (!pid) { return 0; } - const stats = await pidusage(pid); - return stats.memory / (1024 * 1024); // Convert bytes to MB + try { + const stats = await pidusage(pid); + return stats.memory / (1024 * 1024); + } catch (err) { + const code = (err as NodeJS.ErrnoException).code; + if (code === 'ENOENT' || code === 'ESRCH') { + return 0; + } + throw err; + } } private clearTimers() { From 59c7d1a259be4105d61a96ce70af73dffc4e048d Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Mon, 5 Jan 2026 16:12:11 +0800 Subject: [PATCH 2/2] add changeset --- .changeset/fruity-sloths-pull.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fruity-sloths-pull.md diff --git a/.changeset/fruity-sloths-pull.md b/.changeset/fruity-sloths-pull.md new file mode 100644 index 000000000..104f64b30 --- /dev/null +++ b/.changeset/fruity-sloths-pull.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +Fix supervisor process crashes when child process dies unexpectedly