Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/engine/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ export type {
export {
calculateOptimalWorkers,
distributeFrames,
distributeFramesInterleaved,
executeParallelCapture,
mergeWorkerFrames,
getSystemResources,
Expand Down
69 changes: 69 additions & 0 deletions packages/engine/src/services/parallelCoordinator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { describe, it, expect } from "vitest";
import {
calculateOptimalWorkers,
distributeFrames,
distributeFramesInterleaved,
formatWorkerFailure,
selectWorkerDiagnostics,
shouldVerifyWorkerGpu,
Expand Down Expand Up @@ -50,6 +51,74 @@ describe("distributeFrames", () => {
});
});

describe("distributeFramesInterleaved", () => {
  // Worker i should start at frame i and step by the worker count; every
  // worker shares the same exclusive upper bound (the total frame count).
  it("assigns worker i frames [i, i+N, i+2N, …] via stride", () => {
    const tasks = distributeFramesInterleaved(10, 3, "/tmp/work");
    expect(tasks).toHaveLength(3);

    // worker 0: frames 0, 3, 6, 9 → startFrame=0, stride=3, endFrame=10
    expect(tasks[0]?.workerId).toBe(0);
    expect(tasks[0]?.startFrame).toBe(0);
    expect(tasks[0]?.endFrame).toBe(10);
    expect(tasks[0]?.stride).toBe(3);

    // worker 1: frames 1, 4, 7 → startFrame=1, stride=3, endFrame=10
    expect(tasks[1]?.workerId).toBe(1);
    expect(tasks[1]?.startFrame).toBe(1);
    expect(tasks[1]?.endFrame).toBe(10);
    expect(tasks[1]?.stride).toBe(3);

    // worker 2: frames 2, 5, 8 → startFrame=2, stride=3, endFrame=10
    expect(tasks[2]?.workerId).toBe(2);
    expect(tasks[2]?.startFrame).toBe(2);
    expect(tasks[2]?.endFrame).toBe(10);
    expect(tasks[2]?.stride).toBe(3);
  });

  // Replays each task's loop (start, end, stride) and checks that the union
  // of visited indices is exactly 0..total-1 with no index visited twice.
  it("all frames are covered exactly once across workers", () => {
    for (const [total, workers] of [
      [10, 3],
      [12, 4],
      [7, 2],
      [1, 4],
    ] as [number, number][]) {
      const tasks = distributeFramesInterleaved(total, workers, "/tmp/work");
      const captured = new Set<number>();
      for (const task of tasks) {
        const step = task.stride ?? 1;
        // A non-positive stride would make the replay loop below spin
        // forever; fail fast instead of hanging the test run.
        expect(step).toBeGreaterThan(0);
        for (let i = task.startFrame; i < task.endFrame; i += step) {
          expect(captured.has(i)).toBe(false); // no duplicates
          captured.add(i);
        }
      }
      expect(captured.size).toBe(total); // all frames present
      for (let i = 0; i < total; i++) {
        expect(captured.has(i)).toBe(true);
      }
    }
  });

  it("guards workerCount > totalFrames — only spawns as many workers as there are frames", () => {
    const tasks = distributeFramesInterleaved(2, 5, "/tmp/work");
    // Only 2 workers should be created (one per frame), not 5
    expect(tasks).toHaveLength(2);
    expect(tasks[0]?.startFrame).toBe(0);
    expect(tasks[1]?.startFrame).toBe(1);
  });

  it("single worker degenerates to a single task covering all frames with stride=1", () => {
    const tasks = distributeFramesInterleaved(100, 1, "/tmp/work");
    expect(tasks).toHaveLength(1);
    expect(tasks[0]?.startFrame).toBe(0);
    expect(tasks[0]?.endFrame).toBe(100);
    expect(tasks[0]?.stride).toBe(1);
  });

  it("assigns worker output directories", () => {
    const tasks = distributeFramesInterleaved(6, 3, "/tmp/my-work");
    expect(tasks[0]?.outputDir).toContain("worker-0");
    expect(tasks[1]?.outputDir).toContain("worker-1");
    expect(tasks[2]?.outputDir).toContain("worker-2");
  });
});

describe("calculateOptimalWorkers", () => {
it("lets high-cost auto renders fall back to one worker when CPU budget requires it", () => {
const workers = calculateOptimalWorkers(180, undefined, {
Expand Down
58 changes: 57 additions & 1 deletion packages/engine/src/services/parallelCoordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ export interface WorkerTask {
* calculation still uses the absolute frame index.
*/
outputFrameOffset?: number;
/**
* Step size between frames assigned to this worker. Default 1 (contiguous
* chunk). Set to `workerCount` for interleaved distribution — worker `i`
* captures frames `i, i+N, i+2N, …` so all workers advance in lockstep and
* the streaming-encode reorder buffer stays nearly uncontended.
* Only meaningful for the streaming capture path; disk capture uses the
* default contiguous layout so worker output directories are contiguous.
*/
stride?: number;
}

export interface WorkerResult {
Expand Down Expand Up @@ -209,6 +218,52 @@ export function distributeFrames(
return tasks;
}

/**
 * Build one `WorkerTask` per worker using a round-robin (interleaved) frame
 * assignment.
 *
 * With `N = workerCount`, worker `i` is given `startFrame = i`,
 * `stride = N` and `endFrame = totalFrames`, i.e. it owns frames
 * `i, i+N, i+2N, …`. The intent is that workers move through the timeline
 * roughly together, so the streaming encode path's `FrameReorderBuffer` is
 * rarely blocked: a worker waiting on frame `k` is released once the owner
 * of `k-1` finishes and calls `advanceTo(k)`. With the contiguous
 * `distributeFrames` layout, worker 1 would instead wait for the whole of
 * worker 0's chunk, effectively serialising the streaming path.
 *
 * Intended **only** for the streaming capture path. Disk capture writes to
 * worker-local directories and relies on the contiguous layout so
 * `mergeWorkerFrames` can rename/copy files without index arithmetic.
 *
 * @param totalFrames - Number of frames to capture; used as every task's
 *   exclusive `endFrame`.
 * @param workerCount - Requested worker count. Values `<= 1` yield a single
 *   task with `stride: 1`. No more tasks than `totalFrames` are created.
 * @param workDir - Base directory; task `i` gets `<workDir>/worker-<i>`.
 * @returns The tasks in ascending `workerId` order.
 */
export function distributeFramesInterleaved(
  totalFrames: number,
  workerCount: number,
  workDir: string,
): WorkerTask[] {
  // Degenerate case: one worker walks every frame in order.
  if (workerCount <= 1) {
    const soleTask: WorkerTask = {
      workerId: 0,
      startFrame: 0,
      endFrame: totalFrames,
      stride: 1,
      outputDir: join(workDir, "worker-0"),
    };
    return [soleTask];
  }

  // Never create more tasks than there are frames; extra workers would have
  // nothing to capture. The stride stays at the requested worker count.
  const activeWorkers = Math.min(workerCount, totalFrames);
  const plan: WorkerTask[] = [];
  for (let id = 0; id < activeWorkers; id += 1) {
    const outputDir = join(workDir, `worker-${id}`);
    plan.push({
      workerId: id,
      startFrame: id,
      endFrame: totalFrames,
      stride: workerCount,
      outputDir,
    });
  }
  return plan;
}

/**
* Decide whether a parallel worker should run the per-worker SwiftShader
* assertion. Gated to worker 0 only: workers within a chunk share the same
Expand All @@ -229,7 +284,8 @@ async function captureFrameRange(
): Promise<number> {
let framesCaptured = 0;
const outputOffset = task.outputFrameOffset ?? 0;
for (let i = task.startFrame; i < task.endFrame; i++) {
const stride = task.stride ?? 1;
for (let i = task.startFrame; i < task.endFrame; i += stride) {
if (signal?.aborted) throw new Error("Parallel worker cancelled");
const time = (i * captureOptions.fps.den) / captureOptions.fps.num;
const fileFrameIdx = i - outputOffset;
Expand Down
132 changes: 128 additions & 4 deletions packages/engine/src/services/streamingEncoder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
const dir = mkdtempSync(join(tmpdir(), "se-writefail-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

expect(encoder.writeFrame(Buffer.from([0]))).toBe(true);
expect(await encoder.writeFrame(Buffer.from([0]))).toBe(true);

const proc = calls[0]!.proc;
await new Promise<void>((resolve) => {
Expand All @@ -566,7 +566,131 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
});
});

expect(encoder.writeFrame(Buffer.from([0]))).toBe(false);
expect(await encoder.writeFrame(Buffer.from([0]))).toBe(false);
});

// Verifies that when stdin.write() reports a full buffer (returns false), the
// promise returned by writeFrame stays pending until stdin emits "drain", and
// then resolves to true.
it("back-pressure: writeFrame blocks until drain is emitted", async () => {
const { spawn, calls } = createSpawnSpy();
// Reset the module cache and mock child_process before the dynamic import so
// the freshly imported streamingEncoder module picks up the spy's spawn.
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-drain-blocks-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
proc.stdin.write = (_chunk: Buffer) => false; // simulate full buffer

// `resolved` flips only once the writeFrame promise settles, letting the
// test observe whether the call is still pending.
let resolved = false;
const p = encoder.writeFrame(Buffer.from([0])).then((v) => {
resolved = true;
return v;
});

// Flush microtasks so writeFrame reaches the drain-await
await Promise.resolve();
expect(resolved).toBe(false); // still blocked

// Emitting "drain" is what should release the pending write.
proc.stdin.emit("drain");
expect(await p).toBe(true);
expect(resolved).toBe(true);
});

// Verifies that a writeFrame call parked on back-pressure resolves to false
// (not true) when the process emits "close" before stdin emits "drain".
it("back-pressure: writeFrame returns false when encoder dies while awaiting drain", async () => {
const { spawn, calls } = createSpawnSpy();
// Reset the module cache and mock child_process before the dynamic import so
// the freshly imported streamingEncoder module picks up the spy's spawn.
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-drain-death-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
// Every write reports a full buffer, forcing writeFrame to wait.
proc.stdin.write = (_chunk: Buffer) => false;

const p = encoder.writeFrame(Buffer.from([0]));
await Promise.resolve(); // reach drain-await

// FFmpeg exits while we're waiting
proc.emit("close", 1);
await Promise.resolve();
// Drain fires (e.g. OS flushes the pipe) but encoder is already dead
proc.stdin.emit("drain");

expect(await p).toBe(false);
});

// Verifies that the temporary "drain" / "finish" / "error" listeners that
// writeFrame attaches to stdin while waiting on back-pressure are all removed
// once any one of them fires — here "finish" rather than "drain".
it("back-pressure: listeners are cleaned up when finish fires before drain", async () => {
const { spawn, calls } = createSpawnSpy();
// Reset the module cache and mock child_process before the dynamic import so
// the freshly imported streamingEncoder module picks up the spy's spawn.
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-drain-finish-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
// Capture baseline listener counts (spawnStreamingEncoder may register its own)
const baselineDrain = proc.stdin.listenerCount("drain");
const baselineFinish = proc.stdin.listenerCount("finish");
const baselineError = proc.stdin.listenerCount("error");

// Every write reports a full buffer, forcing writeFrame to wait.
proc.stdin.write = (_chunk: Buffer) => false;
const p = encoder.writeFrame(Buffer.from([0]));
await Promise.resolve(); // reach drain-await — 3 listeners registered (+drain, +finish, +error)

expect(proc.stdin.listenerCount("drain")).toBe(baselineDrain + 1);
expect(proc.stdin.listenerCount("finish")).toBe(baselineFinish + 1);
expect(proc.stdin.listenerCount("error")).toBe(baselineError + 1);

proc.stdin.emit("finish"); // finish fires instead of drain
await p; // resolves

// All three back-pressure listeners must be removed regardless of which event fired
expect(proc.stdin.listenerCount("drain")).toBe(baselineDrain);
expect(proc.stdin.listenerCount("finish")).toBe(baselineFinish);
expect(proc.stdin.listenerCount("error")).toBe(baselineError);
});

// Verifies, under fake timers and a 1000ms `ffmpegStreamingTimeout`, that a
// "drain" observed by a back-pressured writeFrame restarts the inactivity
// timer: no kill at T=1100ms, SIGTERM once 1000ms have passed since the drain.
it("back-pressure: inactivity timer resets after drain (slow-but-alive FFmpeg)", async () => {
vi.useFakeTimers();
try {
const { spawn, calls } = createSpawnSpy();
// Reset the module cache and mock child_process before the dynamic import so
// the freshly imported streamingEncoder module picks up the spy's spawn.
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-drain-timer-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions, undefined, {
ffmpegStreamingTimeout: 1000,
});

const proc = calls[0]!.proc;
proc.stdin.write = (_chunk: Buffer) => false; // every write triggers drain-await

// Advance 800ms — timer set at spawn, will fire at T=1000ms if not reset
vi.advanceTimersByTime(800);

// Kick off a writeFrame — floating promise, drain-await registered
void encoder.writeFrame(Buffer.from([0]));
// Flush microtasks so the drain listener is registered before we emit
await Promise.resolve();

// FFmpeg drains: proves it's alive, resetTimer() must fire
proc.stdin.emit("drain");
await Promise.resolve(); // run the post-drain microtasks (resetTimer call)

// Advance 300ms more (T=1100ms from spawn). Without the post-drain reset
// the original timer would have fired SIGTERM at T=1000ms.
vi.advanceTimersByTime(300);
expect(proc.kill).not.toHaveBeenCalled();

// Advance past the reset timer (1000ms from the drain at T=800ms → T=1800ms)
vi.advanceTimersByTime(700);
expect(proc.kill).toHaveBeenCalledWith("SIGTERM");
} finally {
// Always restore real timers so later tests are not affected by a failure here.
vi.useRealTimers();
}
});

it("close() removes the abort listener so a post-close abort does not re-kill ffmpeg", async () => {
Expand Down Expand Up @@ -613,7 +737,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
// progressing" capture the encoder must still be alive. The old total-
// render timeout would have fired SIGTERM at ~1000ms.
for (let i = 0; i < 9; i++) {
encoder.writeFrame(Buffer.from([i]));
void encoder.writeFrame(Buffer.from([i]));
vi.advanceTimersByTime(900);
}
expect(proc.kill).not.toHaveBeenCalled();
Expand Down Expand Up @@ -651,7 +775,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
// should NOT fire (every write was buffered, not accepted), so the
// 1000ms timer (last reset on spawn) elapses near the start.
for (let i = 0; i < 9; i++) {
encoder.writeFrame(Buffer.from([i]));
void encoder.writeFrame(Buffer.from([i]));
vi.advanceTimersByTime(900);
}
expect(proc.kill).toHaveBeenCalledWith("SIGTERM");
Expand Down
48 changes: 35 additions & 13 deletions packages/engine/src/services/streamingEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export interface StreamingEncoderResult {
}

export interface StreamingEncoder {
writeFrame: (buffer: Buffer) => boolean;
writeFrame: (buffer: Buffer) => Promise<boolean>;
close: () => Promise<StreamingEncoderResult>;
getExitStatus: () => "running" | "success" | "error";
}
Expand Down Expand Up @@ -447,7 +447,7 @@ export async function spawnStreamingEncoder(
resetTimer();

const encoder: StreamingEncoder = {
writeFrame: (buffer: Buffer): boolean => {
writeFrame: async (buffer: Buffer): Promise<boolean> => {
if (exitStatus !== "running" || !ffmpeg.stdin || ffmpeg.stdin.destroyed) {
return false;
}
Expand All @@ -458,17 +458,39 @@ export async function spawnStreamingEncoder(
// and flicker.
const copy = Buffer.from(buffer);
const accepted = ffmpeg.stdin.write(copy);
// Reset inactivity timer ONLY on `accepted === true`. `true` means the
// write went through to the kernel pipe without buffering in Node —
// proof FFmpeg is actually consuming. `false` means Node's writable
// stream had to buffer (FFmpeg hasn't drained the pipe yet); we deliberately
// don't reset on `false` so a hung FFmpeg with a still-producing Chrome
// can't keep us alive forever while Node's stdin buffer grows to OOM. In
// steady state with a slower-but-alive FFmpeg, writes alternate between
// true and false as the buffer drains and refills; the trues are enough
// to keep the heartbeat ticking.
if (accepted) resetTimer();
return accepted;
// Timer reset policy — three cases:
// 1. `accepted === true`: write went straight to the kernel pipe; FFmpeg
// is keeping up. Reset now.
// 2. `accepted === false` (buffer full): do NOT reset immediately. If
// FFmpeg is truly hung the timer must still fire to prevent the Node
// stdin buffer from growing unboundedly → OOM.
// 3. After `await drain`: the pipe cleared, proving FFmpeg is alive and
// consuming. Reset then — a slow-but-alive FFmpeg must never trigger
// a spurious SIGTERM.
if (accepted) {
resetTimer();
return true;
}
// Back-pressure: FFmpeg stdin buffer is full. Await drain before returning
// so the capture loop naturally throttles to FFmpeg's encode throughput.
// This prevents Node's internal stdin buffer from growing unboundedly on
// long high-worker-count renders.
await new Promise<void>((resolve) => {
const stdin = ffmpeg.stdin!;
const cleanup = () => {
stdin.removeListener("drain", onEvent);
stdin.removeListener("finish", onEvent);
stdin.removeListener("error", onEvent);
resolve();
};
const onEvent = cleanup;
stdin.once("drain", onEvent);
stdin.once("finish", onEvent);
stdin.once("error", onEvent);
});
if (exitStatus !== "running") return false;
resetTimer();
return true;
},

close: async (): Promise<StreamingEncoderResult> => {
Expand Down
Loading