describe("distributeFramesInterleaved", () => {
  /**
   * Expand a task into the absolute frame indices its worker would capture.
   * Uses the same `start / end / stride ?? 1` walk as the capture loop, so the
   * assertions below check real frame ownership rather than just field values.
   */
  const framesFor = (task: { startFrame: number; endFrame: number; stride?: number }): number[] => {
    const frames: number[] = [];
    for (let frame = task.startFrame; frame < task.endFrame; frame += task.stride ?? 1) {
      frames.push(frame);
    }
    return frames;
  };

  it("assigns worker i frames [i, i+N, i+2N, …] via stride", () => {
    const tasks = distributeFramesInterleaved(10, 3, "/tmp/work");
    expect(tasks).toHaveLength(3);

    // Every worker shares the same exclusive end and the same stride; only the
    // starting frame (== workerId) differs.
    for (const workerId of [0, 1, 2]) {
      expect(tasks[workerId]?.workerId).toBe(workerId);
      expect(tasks[workerId]?.startFrame).toBe(workerId);
      expect(tasks[workerId]?.endFrame).toBe(10);
      expect(tasks[workerId]?.stride).toBe(3);
    }

    // Pin the actual round-robin ownership, not just the task fields.
    expect(tasks.map(framesFor)).toEqual([
      [0, 3, 6, 9],
      [1, 4, 7],
      [2, 5, 8],
    ]);
  });

  it("all frames are covered exactly once across workers", () => {
    for (const [total, workers] of [
      [10, 3],
      [12, 4],
      [7, 2],
      [1, 4],
    ] as [number, number][]) {
      const tasks = distributeFramesInterleaved(total, workers, "/tmp/work");
      const captured = tasks.flatMap(framesFor).sort((a, b) => a - b);

      // Comparing against the exact 0..total-1 sequence catches duplicates,
      // gaps and out-of-range frames in a single assertion.
      expect(captured).toEqual(Array.from({ length: total }, (_, frame) => frame));
    }
  });

  it("guards workerCount > totalFrames — only spawns as many workers as there are frames", () => {
    const tasks = distributeFramesInterleaved(2, 5, "/tmp/work");
    // Only 2 workers should be created (one per frame), not 5
    expect(tasks).toHaveLength(2);
    expect(tasks[0]?.startFrame).toBe(0);
    expect(tasks[1]?.startFrame).toBe(1);
    // Each of the two workers owns exactly its own single frame.
    expect(tasks.map(framesFor)).toEqual([[0], [1]]);
  });

  it("single worker degenerates to a single task covering all frames with stride=1", () => {
    const tasks = distributeFramesInterleaved(100, 1, "/tmp/work");
    expect(tasks).toHaveLength(1);
    expect(tasks[0]?.startFrame).toBe(0);
    expect(tasks[0]?.endFrame).toBe(100);
    expect(tasks[0]?.stride).toBe(1);
  });

  it("assigns worker output directories", () => {
    const tasks = distributeFramesInterleaved(6, 3, "/tmp/my-work");
    expect(tasks[0]?.outputDir).toContain("worker-0");
    expect(tasks[1]?.outputDir).toContain("worker-1");
    expect(tasks[2]?.outputDir).toContain("worker-2");
  });
});
/**
 * Split a timeline into interleaved (round-robin) capture tasks.
 *
 * With `N` workers, worker `i` is handed frames `i, i+N, i+2N, …`: every task
 * shares `endFrame = totalFrames`, starts at its own `workerId`, and steps by
 * `stride = N`. Neighbouring frames therefore belong to different workers, so
 * all workers move through the timeline together instead of each owning one
 * contiguous chunk. That is the layout the streaming encode path wants: frames
 * must reach the encoder in order, and with contiguous chunks a later worker
 * would sit blocked until the earlier worker's whole chunk had been written.
 *
 * Intended for the streaming capture path only. Disk capture should keep using
 * the contiguous `distributeFrames` layout.
 *
 * @param totalFrames Number of frames in the timeline; used unchanged as every
 *   task's exclusive `endFrame`.
 * @param workerCount Requested number of workers. Fractional values are floored
 *   and non-finite values (`NaN`, `±Infinity`) fall back to a single worker, so
 *   the stride — and therefore every generated frame index — is always an
 *   integer.
 * @param workDir Base directory; each task gets `<workDir>/worker-<id>`.
 * @returns For one worker (or fewer): exactly one stride-1 task spanning
 *   `[0, totalFrames)`. Otherwise one task per worker, capped so that no more
 *   tasks than frames are created (empty when `totalFrames <= 0`).
 */
export function distributeFramesInterleaved(
  totalFrames: number,
  workerCount: number,
  workDir: string,
): WorkerTask[] {
  // A fractional stride would make the capture loop request frames like 2.5
  // and skip integer frames entirely; NaN would silently yield zero tasks.
  const workers = Number.isFinite(workerCount) ? Math.floor(workerCount) : 1;

  if (workers <= 1) {
    return [
      {
        workerId: 0,
        startFrame: 0,
        endFrame: totalFrames,
        stride: 1,
        outputDir: join(workDir, "worker-0"),
      },
    ];
  }

  const tasks: WorkerTask[] = [];
  // `workerId < totalFrames` keeps us from spawning workers that would own no frame.
  for (let workerId = 0; workerId < workers && workerId < totalFrames; workerId++) {
    tasks.push({
      workerId,
      startFrame: workerId,
      endFrame: totalFrames,
      stride: workers,
      outputDir: join(workDir, `worker-${workerId}`),
    });
  }
  return tasks;
}
/**
 * Arrange step shared by the back-pressure tests: installs a spied
 * `child_process.spawn`, re-imports the encoder module against that mock, and
 * spawns an encoder writing to a fresh temp directory.
 *
 * The optional `engineConfig` is forwarded as the fourth argument only when
 * supplied, so tests that omit it call `spawnStreamingEncoder` with two
 * arguments exactly as before.
 */
const spawnEncoderForBackPressure = async (
  tmpPrefix: string,
  engineConfig?: { ffmpegStreamingTimeout: number },
) => {
  const { spawn, calls } = createSpawnSpy();
  vi.resetModules();
  vi.doMock("child_process", () => ({ spawn }));

  const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
  const dir = mkdtempSync(join(tmpdir(), tmpPrefix));
  const outputPath = join(dir, "out.mp4");
  const encoder =
    engineConfig === undefined
      ? await spawnStreamingEncoder(outputPath, baseOptions)
      : await spawnStreamingEncoder(outputPath, baseOptions, undefined, engineConfig);

  return { encoder, proc: calls[0]!.proc };
};

it("back-pressure: writeFrame blocks until drain is emitted", async () => {
  const { encoder, proc } = await spawnEncoderForBackPressure("se-drain-blocks-");
  // A `false` return from write() is how a writable reports a full buffer.
  proc.stdin.write = (_chunk: Buffer) => false;

  let settled = false;
  const pendingWrite = encoder.writeFrame(Buffer.from([0])).then((accepted) => {
    settled = true;
    return accepted;
  });

  // One microtask turn is enough for writeFrame to park on the drain wait.
  await Promise.resolve();
  expect(settled).toBe(false);

  proc.stdin.emit("drain");
  expect(await pendingWrite).toBe(true);
  expect(settled).toBe(true);
});

it("back-pressure: writeFrame returns false when encoder dies while awaiting drain", async () => {
  const { encoder, proc } = await spawnEncoderForBackPressure("se-drain-death-");
  proc.stdin.write = (_chunk: Buffer) => false;

  const pendingWrite = encoder.writeFrame(Buffer.from([0]));
  await Promise.resolve(); // writeFrame is now waiting on drain

  // The FFmpeg process goes away while the write is still parked…
  proc.emit("close", 1);
  await Promise.resolve();
  // …and only afterwards does the pipe report drain. The write must not be
  // reported as successful, because the encoder is already dead.
  proc.stdin.emit("drain");

  expect(await pendingWrite).toBe(false);
});

it("back-pressure: listeners are cleaned up when finish fires before drain", async () => {
  const { encoder, proc } = await spawnEncoderForBackPressure("se-drain-finish-");

  // spawnStreamingEncoder may attach listeners of its own, so compare against
  // a snapshot instead of assuming zero.
  const listenerCounts = () => ({
    drain: proc.stdin.listenerCount("drain"),
    finish: proc.stdin.listenerCount("finish"),
    error: proc.stdin.listenerCount("error"),
  });
  const baseline = listenerCounts();

  proc.stdin.write = (_chunk: Buffer) => false;
  const pendingWrite = encoder.writeFrame(Buffer.from([0]));
  await Promise.resolve(); // parked: one extra drain, finish and error listener each

  expect(listenerCounts()).toEqual({
    drain: baseline.drain + 1,
    finish: baseline.finish + 1,
    error: baseline.error + 1,
  });

  proc.stdin.emit("finish"); // wake the write via finish rather than drain
  await pendingWrite;

  // Whichever event woke the write, none of the three listeners may linger.
  expect(listenerCounts()).toEqual(baseline);
});

it("back-pressure: inactivity timer resets after drain (slow-but-alive FFmpeg)", async () => {
  vi.useFakeTimers();
  try {
    const { encoder, proc } = await spawnEncoderForBackPressure("se-drain-timer-", {
      ffmpegStreamingTimeout: 1000,
    });
    proc.stdin.write = (_chunk: Buffer) => false; // every write has to wait for drain

    // T=800ms: the inactivity timer armed at spawn is due at T=1000ms.
    vi.advanceTimersByTime(800);

    // Start a write and let it reach the drain wait before emitting anything.
    void encoder.writeFrame(Buffer.from([0]));
    await Promise.resolve();

    // Drain shows FFmpeg is still consuming; the encoder should re-arm the timer.
    proc.stdin.emit("drain");
    await Promise.resolve(); // let the post-drain continuation run

    // T=1100ms: past the original deadline. If the timer had not been re-armed
    // on drain, SIGTERM would already have been sent at T=1000ms.
    vi.advanceTimersByTime(300);
    expect(proc.kill).not.toHaveBeenCalled();

    // T=1800ms: 1000ms after the drain at T=800ms, so the re-armed timer fires.
    vi.advanceTimersByTime(700);
    expect(proc.kill).toHaveBeenCalledWith("SIGTERM");
  } finally {
    vi.useRealTimers();
  }
});
for (let i = 0; i < 9; i++) { - encoder.writeFrame(Buffer.from([i])); + void encoder.writeFrame(Buffer.from([i])); vi.advanceTimersByTime(900); } expect(proc.kill).toHaveBeenCalledWith("SIGTERM"); diff --git a/packages/engine/src/services/streamingEncoder.ts b/packages/engine/src/services/streamingEncoder.ts index 08f6cbcbb..d43cfd5d6 100644 --- a/packages/engine/src/services/streamingEncoder.ts +++ b/packages/engine/src/services/streamingEncoder.ts @@ -124,7 +124,7 @@ export interface StreamingEncoderResult { } export interface StreamingEncoder { - writeFrame: (buffer: Buffer) => boolean; + writeFrame: (buffer: Buffer) => Promise; close: () => Promise; getExitStatus: () => "running" | "success" | "error"; } @@ -447,7 +447,7 @@ export async function spawnStreamingEncoder( resetTimer(); const encoder: StreamingEncoder = { - writeFrame: (buffer: Buffer): boolean => { + writeFrame: async (buffer: Buffer): Promise => { if (exitStatus !== "running" || !ffmpeg.stdin || ffmpeg.stdin.destroyed) { return false; } @@ -458,17 +458,39 @@ export async function spawnStreamingEncoder( // and flicker. const copy = Buffer.from(buffer); const accepted = ffmpeg.stdin.write(copy); - // Reset inactivity timer ONLY on `accepted === true`. `true` means the - // write went through to the kernel pipe without buffering in Node — - // proof FFmpeg is actually consuming. `false` means Node's writable - // stream had to buffer (FFmpeg hasn't drained the pipe yet); we deliberately - // don't reset on `false` so a hung FFmpeg with a still-producing Chrome - // can't keep us alive forever while Node's stdin buffer grows to OOM. In - // steady state with a slower-but-alive FFmpeg, writes alternate between - // true and false as the buffer drains and refills; the trues are enough - // to keep the heartbeat ticking. - if (accepted) resetTimer(); - return accepted; + // Timer reset policy — three cases: + // 1. `accepted === true`: write went straight to the kernel pipe; FFmpeg + // is keeping up. 
Reset now. + // 2. `accepted === false` (buffer full): do NOT reset immediately. If + // FFmpeg is truly hung the timer must still fire to prevent the Node + // stdin buffer from growing unboundedly → OOM. + // 3. After `await drain`: the pipe cleared, proving FFmpeg is alive and + // consuming. Reset then — a slow-but-alive FFmpeg must never trigger + // a spurious SIGTERM. + if (accepted) { + resetTimer(); + return true; + } + // Back-pressure: FFmpeg stdin buffer is full. Await drain before returning + // so the capture loop naturally throttles to FFmpeg's encode throughput. + // This prevents Node's internal stdin buffer from growing unboundedly on + // long high-worker-count renders. + await new Promise((resolve) => { + const stdin = ffmpeg.stdin!; + const cleanup = () => { + stdin.removeListener("drain", onEvent); + stdin.removeListener("finish", onEvent); + stdin.removeListener("error", onEvent); + resolve(); + }; + const onEvent = cleanup; + stdin.once("drain", onEvent); + stdin.once("finish", onEvent); + stdin.once("error", onEvent); + }); + if (exitStatus !== "running") return false; + resetTimer(); + return true; }, close: async (): Promise => { diff --git a/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts b/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts index 3b3e8644c..d8260047a 100644 --- a/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts +++ b/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts @@ -185,7 +185,7 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise const writeEncoded = async (frameIdx: number, buf: Buffer): Promise => { await reorderBuffer.waitForFrame(frameIdx); const writeStart = Date.now(); - hdrEncoder.writeFrame(buf); + await hdrEncoder.writeFrame(buf); addHdrTiming(hdrPerf, "encoderWriteMs", writeStart); reorderBuffer.advanceTo(frameIdx + 1); framesWritten += 1; diff --git 
a/packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts b/packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts index b372359b2..ad5a62bf6 100644 --- a/packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts +++ b/packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts @@ -189,7 +189,7 @@ export async function runSequentialLayeredFrameLoop(input: SequentialLoopInput): ); addHdrTiming(hdrPerf, "transitionCompositeMs", transitionTimingStart); timingStart = Date.now(); - hdrEncoder.writeFrame(transitionBuffers.output); + await hdrEncoder.writeFrame(transitionBuffers.output); addHdrTiming(hdrPerf, "encoderWriteMs", timingStart); } else { if (hdrPerf) hdrPerf.normalFrames += 1; @@ -206,7 +206,7 @@ export async function runSequentialLayeredFrameLoop(input: SequentialLoopInput): ); } timingStart = Date.now(); - hdrEncoder.writeFrame(normalCanvas); + await hdrEncoder.writeFrame(normalCanvas); addHdrTiming(hdrPerf, "encoderWriteMs", timingStart); } diff --git a/packages/producer/src/services/render/stages/captureStreamingStage.test.ts b/packages/producer/src/services/render/stages/captureStreamingStage.test.ts index ef0f49013..32767d1c5 100644 --- a/packages/producer/src/services/render/stages/captureStreamingStage.test.ts +++ b/packages/producer/src/services/render/stages/captureStreamingStage.test.ts @@ -6,7 +6,7 @@ type MinimalEngineConfig = { ffmpegStreamingTimeout: number; }; -const writeFrame = mock((_buffer: Buffer) => true); +const writeFrame = mock(async (_buffer: Buffer) => true); const closeEncoder = mock(async () => ({ success: true, durationMs: 123, fileSize: 42 })); const spawnStreamingEncoder = mock(async () => ({ writeFrame, @@ -36,6 +36,7 @@ mock.module("@hyperframes/engine", () => ({ advanceTo: () => {}, }), distributeFrames: () => [], + distributeFramesInterleaved: () => [], executeParallelCapture: async () => {}, initializeSession: async (session: { isInitialized: boolean }) => { if 
(failInitializeSession) { diff --git a/packages/producer/src/services/render/stages/captureStreamingStage.ts b/packages/producer/src/services/render/stages/captureStreamingStage.ts index ac68348e8..262b4bb82 100644 --- a/packages/producer/src/services/render/stages/captureStreamingStage.ts +++ b/packages/producer/src/services/render/stages/captureStreamingStage.ts @@ -50,7 +50,7 @@ import { closeCaptureSession, createCaptureSession, createFrameReorderBuffer, - distributeFrames, + distributeFramesInterleaved, executeParallelCapture, initializeSession, prepareCaptureSessionForReuse, @@ -190,12 +190,16 @@ export async function runCaptureStreamingStage( const reorderBuffer = createFrameReorderBuffer(0, totalFrames); if (workerCount > 1) { - // Parallel capture → streaming encode - const tasks = distributeFrames(totalFrames, workerCount, workDir); + // Parallel capture → streaming encode. + // Use interleaved frame distribution so all workers advance in lockstep + // and the reorder buffer stays nearly uncontended. With contiguous chunk + // distribution, worker 1 would block at its first frame until worker 0 + // finished its entire chunk — collapsing N workers to effectively 1. 
+ const tasks = distributeFramesInterleaved(totalFrames, workerCount, workDir); const onFrameBuffer = async (frameIndex: number, buffer: Buffer): Promise => { await reorderBuffer.waitForFrame(frameIndex); - currentEncoder.writeFrame(buffer); + await currentEncoder.writeFrame(buffer); reorderBuffer.advanceTo(frameIndex + 1); }; @@ -263,7 +267,7 @@ export async function runCaptureStreamingStage( const time = (i * job.config.fps.den) / job.config.fps.num; const { buffer } = await captureFrameToBuffer(session, i, time); await reorderBuffer.waitForFrame(i); - currentEncoder.writeFrame(buffer); + await currentEncoder.writeFrame(buffer); reorderBuffer.advanceTo(i + 1); job.framesRendered = i + 1; diff --git a/packages/producer/src/services/renderOrchestrator.test.ts b/packages/producer/src/services/renderOrchestrator.test.ts index 5c19b16f2..bd9d4edb6 100644 --- a/packages/producer/src/services/renderOrchestrator.test.ts +++ b/packages/producer/src/services/renderOrchestrator.test.ts @@ -115,12 +115,18 @@ describe("shouldUseStreamingEncode", () => { ).toBe(false); }); - it("keeps png-sequence and parallel capture on the non-streaming path", () => { + it("keeps png-sequence on the non-streaming path", () => { expect(shouldUseStreamingEncode(streamingEnabledConfig, "png-sequence", 1, 240)).toBe(false); - expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 2, 240)).toBe(false); + expect(shouldUseStreamingEncode(streamingEnabledConfig, "png-sequence", 2, 240)).toBe(false); }); - it("keeps renders over the configured max duration on normal encoding", () => { + it("enables streaming for multi-worker renders (parallel streaming)", () => { + expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 2, 240)).toBe(true); + expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 4, 240)).toBe(true); + }); + + it("applies the duration cap to single-worker (config-driven) and a fixed 1800s cap to multi-worker", () => { + // Single-worker: duration cap from 
config expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 1, 240)).toBe(true); expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 1, 240.001)).toBe(false); expect( @@ -131,6 +137,21 @@ describe("shouldUseStreamingEncode", () => { 120.001, ), ).toBe(false); + // Multi-worker: fixed 1800s cap (3× the 548s Hypetech comp; guards Node stdin OOM) + expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 2, 548)).toBe(true); + expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 2, 1800)).toBe(true); + expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 2, 1800.001)).toBe(false); + expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 3, 1800)).toBe(true); + expect(shouldUseStreamingEncode(streamingEnabledConfig, "mp4", 3, 3600)).toBe(false); + // Multi-worker cap is independent of the single-worker config value + expect( + shouldUseStreamingEncode( + { enableStreamingEncode: true, streamingEncodeMaxDurationSeconds: 60 }, + "mp4", + 2, + 1800, + ), + ).toBe(true); }); }); diff --git a/packages/producer/src/services/renderOrchestrator.ts b/packages/producer/src/services/renderOrchestrator.ts index 5f95e7148..52b8e4141 100644 --- a/packages/producer/src/services/renderOrchestrator.ts +++ b/packages/producer/src/services/renderOrchestrator.ts @@ -1436,8 +1436,20 @@ export function shouldUseStreamingEncode( if (outputFormat === "png-sequence") return false; if (outputFormat === "gif") return false; if (!Number.isFinite(durationSeconds) || durationSeconds <= 0) return false; - if (durationSeconds > cfg.streamingEncodeMaxDurationSeconds) return false; - return workerCount === 1; + // Capture-side: workers serialize through FrameReorderBuffer, so at most + // workerCount captured frames are in flight at any moment. The encoder-side + // buffer (Node stdin → FFmpeg) is NOT explicitly bounded and relies on FFmpeg + // keeping up with workerCount × per-worker-fps. 
Long comps + slow encode + // can still grow Node's internal write buffer — see streamingEncodeMaxDurationSeconds + // for the single-worker safeguard. + // Multi-worker cap is fixed at 1800s (30 min), independent of the single-worker + // config value. Rationale: 1800s is 3× the longest known practical composition + // (~548s), giving real-world headroom while keeping worst-case Node buffer growth + // below ~20 GB. Adjust if your workloads routinely exceed 30 min. + const MULTI_WORKER_MAX_DURATION_SECONDS = 1800; + const maxDuration = workerCount === 1 ? cfg.streamingEncodeMaxDurationSeconds : MULTI_WORKER_MAX_DURATION_SECONDS; + if (durationSeconds > maxDuration) return false; + return workerCount > 0; } /**