Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fruity-sloths-pull.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

Fix supervisor process crashes when child process dies unexpectedly
147 changes: 147 additions & 0 deletions agents/src/ipc/supervised_proc.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { fork, spawn } from 'node:child_process';
import { unlinkSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import pidusage from 'pidusage';
import { afterAll, beforeAll, describe, expect, it } from 'vitest';

// Path of the throwaway script that the fork()-based tests below execute.
// The name embeds this process's pid so that concurrent test runs sharing the
// same OS temp directory cannot overwrite or delete each other's fixture.
const childScript = join(tmpdir(), `test_child_${process.pid}.mjs`);

// Write the fixture once before any test runs. The script echoes every IPC
// message back to its parent and schedules a no-op interval (presumably so the
// child stays alive until a test kills it).
beforeAll(() => {
  writeFileSync(
    childScript,
    `process.on('message', (msg) => process.send?.({ echo: msg }));
setInterval(() => {}, 1000);`,
  );
});

// Remove the fixture after the suite finishes.
afterAll(() => {
  try {
    unlinkSync(childScript);
  } catch {
    // Deliberate best-effort cleanup: a missing or undeletable temp file must
    // not turn an otherwise green test run red.
  }
});

/**
 * Test-local copy of the guarded memory probe: asks pidusage for the memory
 * figure of `pid` and converts it from bytes to megabytes.
 *
 * @param pid - Process id to inspect; a missing (or zero) pid is treated as
 *   "no process".
 * @returns The memory usage in MB, or 0 when there is no pid or pidusage
 *   reports that the process no longer exists (`ENOENT` / `ESRCH`).
 * @throws Re-throws any other error raised by pidusage unchanged.
 */
async function getChildMemoryUsageMB(pid: number | undefined): Promise<number> {
  const BYTES_PER_MB = 1024 * 1024;

  if (pid) {
    try {
      const usage = await pidusage(pid);
      return usage.memory / BYTES_PER_MB;
    } catch (err) {
      switch ((err as NodeJS.ErrnoException).code) {
        // The process vanished between the caller's check and the lookup.
        case 'ENOENT':
        case 'ESRCH':
          return 0;
        default:
          throw err;
      }
    }
  }

  return 0;
}

// Pins down how pidusage behaves once the target pid is gone, and checks that
// the guarded getChildMemoryUsageMB wrapper absorbs that failure instead of
// propagating it.
describe('pidusage on dead process', () => {
  it('raw pidusage throws on dead pid', async () => {
    const child = spawn('sleep', ['10']);
    const pid = child.pid!;

    child.kill('SIGKILL');
    await new Promise<void>((r) => child.on('exit', r));

    // Baseline: without any guard the lookup rejects once the process exited.
    await expect(pidusage(pid)).rejects.toThrow();
  });

  it('fixed version returns 0 instead of crashing', async () => {
    const child = spawn('sleep', ['10']);
    const pid = child.pid!;

    child.kill('SIGKILL');
    await new Promise<void>((r) => child.on('exit', r));

    const mem = await getChildMemoryUsageMB(pid);
    expect(mem).toBe(0);
  });

  it('handles concurrent calls on dying process', async () => {
    const child = spawn('sleep', ['10']);
    const pid = child.pid!;
    const exitPromise = new Promise<void>((r) => child.on('exit', r));

    child.kill('SIGKILL');

    // Intentionally NOT awaiting exit here: the probes race with the process
    // teardown, which is the situation being exercised.
    const results = await Promise.all([
      getChildMemoryUsageMB(pid),
      getChildMemoryUsageMB(pid),
      getChildMemoryUsageMB(pid),
    ]);

    await exitPromise;

    // Signal delivery is asynchronous, so a probe may still observe the live
    // process and report its real memory. Requiring exactly 0 would make the
    // test timing-dependent; what matters is that every call resolves to a
    // sane number rather than rejecting.
    for (const mb of results) {
      expect(Number.isFinite(mb)).toBe(true);
      expect(mb).toBeGreaterThanOrEqual(0);
    }
  });
});

// Verifies that `connected` reflects the child's death and that guarding
// send() with it lets a periodic sender keep running across that death.
describe('IPC send on dead process', () => {
  // Resolves after roughly `ms` milliseconds.
  const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

  // Forks the echo fixture with piped stdio and an IPC channel.
  const forkEchoChild = () => fork(childScript, [], { stdio: ['pipe', 'pipe', 'pipe', 'ipc'] });

  it('child.connected becomes false when child dies', async () => {
    const child = forkEchoChild();
    const exited = new Promise<void>((resolve) => child.on('exit', resolve));

    await pause(50);
    expect(child.connected).toBe(true);

    child.kill('SIGKILL');
    await exited;

    expect(child.connected).toBe(false);
  });

  it('checking connected before send prevents crash', async () => {
    const child = forkEchoChild();
    const exited = new Promise<void>((resolve) => child.on('exit', resolve));

    let delivered = 0;
    let dropped = 0;

    // Same guard as the supervisor's ping loop: only send while connected.
    const ticker = setInterval(() => {
      if (!child.connected) {
        dropped++;
        return;
      }
      child.send({ ping: Date.now() });
      delivered++;
    }, 20);

    await pause(60);
    child.kill('SIGKILL');
    await exited;
    // Let the ticker fire a few more times against the dead child.
    await pause(80);
    clearInterval(ticker);

    expect(delivered).toBeGreaterThan(0);
    expect(dropped).toBeGreaterThan(0);
  });
});

// Sanity checks on the timer primitives that the supervisor's timer cleanup
// relies on.
describe('timer cleanup', () => {
  // Resolves after roughly `ms` milliseconds.
  const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

  it('clearInterval stops the interval', async () => {
    let ticks = 0;
    const handle = setInterval(() => ticks++, 30);

    await pause(80);
    const ticksWhenCleared = ticks;
    clearInterval(handle);

    // No further ticks may be counted once the interval has been cleared.
    await pause(80);
    expect(ticks).toBe(ticksWhenCleared);
  });

  it('double clear is safe', () => {
    const interval = setInterval(() => {}, 100);
    const timeout = setTimeout(() => {}, 1000);

    const clearBoth = () => {
      clearInterval(interval);
      clearTimeout(timeout);
    };

    clearBoth();

    // Clearing already-cleared handles must be a harmless no-op.
    expect(clearBoth).not.toThrow();
  });
});
36 changes: 27 additions & 9 deletions agents/src/ipc/supervised_proc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ export abstract class SupervisedProc {
await this.init.await;

this.#pingInterval = setInterval(() => {
this.proc!.send({ case: 'pingRequest', value: { timestamp: Date.now() } });
if (this.proc?.connected) {
this.proc.send({ case: 'pingRequest', value: { timestamp: Date.now() } });
}
}, this.#opts.pingInterval);

this.#pongTimeout = setTimeout(() => {
Expand Down Expand Up @@ -141,6 +143,7 @@ export abstract class SupervisedProc {
});

this.proc!.on('exit', () => {
this.clearTimers();
this.#join.resolve();
});

Expand All @@ -159,11 +162,13 @@ export abstract class SupervisedProc {

async initialize() {
const timer = setTimeout(() => {
const err = new Error('runner initialization timed out');
this.init.reject(err);
throw err;
this.init.reject(new Error('runner initialization timed out'));
}, this.#opts.initializeTimeout);
this.proc!.send({
if (!this.proc?.connected) {
this.init.reject(new Error('process not connected'));
return;
}
this.proc.send({
case: 'initializeRequest',
value: {
loggerOptions,
Expand All @@ -187,7 +192,9 @@ export abstract class SupervisedProc {
}
this.#closing = true;

this.proc!.send({ case: 'shutdownRequest' });
if (this.proc?.connected) {
this.proc.send({ case: 'shutdownRequest' });
}

const timer = setTimeout(() => {
this.#logger.error('job shutdown is taking too much time');
Expand All @@ -203,17 +210,28 @@ export abstract class SupervisedProc {
if (this.#runningJob) {
throw new Error('executor already has a running job');
}
if (!this.proc?.connected) {
throw new Error('process not connected');
}
this.#runningJob = info;
this.proc!.send({ case: 'startJobRequest', value: { runningJob: info } });
this.proc.send({ case: 'startJobRequest', value: { runningJob: info } });
}

private async getChildMemoryUsageMB(): Promise<number> {
const pid = this.proc?.pid;
if (!pid) {
return 0;
}
const stats = await pidusage(pid);
return stats.memory / (1024 * 1024); // Convert bytes to MB
try {
const stats = await pidusage(pid);
return stats.memory / (1024 * 1024);
} catch (err) {
const code = (err as NodeJS.ErrnoException).code;
if (code === 'ENOENT' || code === 'ESRCH') {
return 0;
}
throw err;
}
}

private clearTimers() {
Expand Down