diff --git a/packages/engine/src/services/chunkEncoder.test.ts b/packages/engine/src/services/chunkEncoder.test.ts index 4300e87a5..5b0091639 100644 --- a/packages/engine/src/services/chunkEncoder.test.ts +++ b/packages/engine/src/services/chunkEncoder.test.ts @@ -1,6 +1,93 @@ -import { describe, it, expect, vi } from "vitest"; +import { EventEmitter } from "node:events"; +import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, it, expect, vi } from "vitest"; import { ENCODER_PRESETS, getEncoderPreset, buildEncoderArgs } from "./chunkEncoder.js"; +const TINY_PNG = Buffer.from( + "iVBORw0KGgoAAAANSUhEUgAAAAIAAAACCAIAAAD91JpzAAAACXBIWXMAAAABAAAAAQBPJcTWAAAAEElEQVR4nGP8wwACLGCSAQANBAECv1AVswAAAABJRU5ErkJggg==", + "base64", +); + +const tempDirs: string[] = []; + +afterEach(() => { + for (const dir of tempDirs.splice(0)) { + rmSync(dir, { recursive: true, force: true }); + } + vi.resetModules(); + vi.doUnmock("child_process"); + vi.useRealTimers(); +}); + +function createFrameFixture(): { root: string; framesDir: string } { + const root = mkdtempSync(join(tmpdir(), "hf-chunk-encoder-")); + tempDirs.push(root); + const framesDir = join(root, "frames"); + mkdirSync(framesDir); + for (let i = 1; i <= 2; i++) { + writeFileSync(join(framesDir, `frame_${String(i).padStart(6, "0")}.png`), TINY_PNG); + } + return { root, framesDir }; +} + +const tinyEncodeOptions = { + fps: { num: 30, den: 1 }, + width: 2, + height: 2, + codec: "h264" as const, + preset: "ultrafast", + quality: 28, + pixelFormat: "yuv420p", + useGpu: false, +}; + +function encodeTimeoutMessage(timeoutMs: number): string { + return `FFmpeg killed after exceeding ffmpegEncodeTimeout (${timeoutMs} ms)`; +} + +type FakeProc = EventEmitter & { + stderr: EventEmitter; + kill: ReturnType; + killed: boolean; +}; + +type SpawnCall = { + command: string; + args: readonly string[]; + proc: FakeProc; +}; + +function createFakeProc(): FakeProc { + const proc = new EventEmitter() as FakeProc; + proc.stderr = new EventEmitter(); + proc.kill = vi.fn(() => { + proc.killed = true; + return true; + }); + proc.killed = false; + return proc; +} + +function createSpawnSpy(): { + spawn: (command: string, args: readonly string[]) => FakeProc; + calls: SpawnCall[]; +} { + const calls: SpawnCall[] = []; + const spawn = (command: string, args: readonly string[]): FakeProc => { + const proc = createFakeProc(); + calls.push({ command, args, proc }); + return proc; + }; + return { spawn, calls }; +} + +function emitClose(proc: FakeProc, code: number): void { + proc.emit("exit", code); + proc.emit("close", code); +} + describe("ENCODER_PRESETS", () => { it("has draft, standard, and high presets", () => { expect(ENCODER_PRESETS).toHaveProperty("draft"); @@ -26,6 +113,248 @@ describe("ENCODER_PRESETS", () => { }); }); +describe("encodeFramesFromDir ffmpegEncodeTimeout", () => { + it("kills ffmpeg when config timeout elapses", async () => { + vi.useFakeTimers(); + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { encodeFramesFromDir } = await import("./chunkEncoder.js"); + const { root, framesDir } = createFrameFixture(); + + const encodePromise = encodeFramesFromDir( + framesDir, + "frame_%06d.png", + join(root, "timeout.mp4"), + tinyEncodeOptions, + undefined, + { ffmpegEncodeTimeout: 1000 }, + ); + + expect(calls).toHaveLength(1); + const proc = calls[0]!.proc; + vi.advanceTimersByTime(999); + expect(proc.kill).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(proc.kill).toHaveBeenCalledWith("SIGTERM"); + + proc.stderr.emit("data", Buffer.from("terminated by timeout\n")); + emitClose(proc, 143); + + const result = await encodePromise; + expect(result.success).toBe(false); + expect(result.error).toContain("FFmpeg exited with code 143"); + expect(result.error).toContain("terminated by timeout"); + expect(result.error).toContain(encodeTimeoutMessage(1000)); + }); + + it("keeps non-timeout ffmpeg failures unchanged", async () => { + vi.useFakeTimers(); + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { encodeFramesFromDir } = await import("./chunkEncoder.js"); + const { root, framesDir } = createFrameFixture(); + + const encodePromise = encodeFramesFromDir( + framesDir, + "frame_%06d.png", + join(root, "failure.mp4"), + tinyEncodeOptions, + undefined, + { ffmpegEncodeTimeout: 1000 }, + ); + + expect(calls).toHaveLength(1); + const proc = calls[0]!.proc; + proc.stderr.emit("data", Buffer.from("encoder failed\n")); + emitClose(proc, 1); + + const result = await encodePromise; + expect(result.success).toBe(false); + expect(result.error).toContain("FFmpeg exited with code 1"); + expect(result.error).toContain("encoder failed"); + expect(result.error).not.toContain("ffmpegEncodeTimeout"); + }); + + it("uses the default timeout when config is omitted", async () => { + vi.useFakeTimers(); + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { encodeFramesFromDir } = await import("./chunkEncoder.js"); + const { root, framesDir } = createFrameFixture(); + + const encodePromise = encodeFramesFromDir( + framesDir, + "frame_%06d.png", + join(root, "default.mp4"), + tinyEncodeOptions, + ); + + expect(calls).toHaveLength(1); + const proc = calls[0]!.proc; + vi.advanceTimersByTime(599_999); + expect(proc.kill).not.toHaveBeenCalled(); + + emitClose(proc, 0); + + const result = await encodePromise; + expect(result.success).toBe(true); + expect(result.framesEncoded).toBe(2); + expect(result.fileSize).toBe(0); + }); +}); + +describe("encodeFramesChunkedConcat ffmpegEncodeTimeout", () => { + it("passes config timeout to per-chunk encodes", async () => { + vi.useFakeTimers(); + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { encodeFramesChunkedConcat } = await import("./chunkEncoder.js"); + const { root, framesDir } = createFrameFixture(); + + const encodePromise = encodeFramesChunkedConcat( + framesDir, + "frame_%06d.png", + join(root, "chunked.mp4"), + tinyEncodeOptions, + 30, + undefined, + { ffmpegEncodeTimeout: 1000 }, + ); + + expect(calls).toHaveLength(1); + const proc = calls[0]!.proc; + vi.advanceTimersByTime(999); + expect(proc.kill).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(proc.kill).toHaveBeenCalledWith("SIGTERM"); + + proc.stderr.emit("data", Buffer.from("chunk timeout\n")); + emitClose(proc, 143); + + const result = await encodePromise; + expect(result.success).toBe(false); + expect(result.error).toContain("Chunk 0 encode failed"); + expect(result.error).toContain("chunk timeout"); + expect(result.error).toContain(encodeTimeoutMessage(1000)); + }); + + it("keeps non-timeout chunk failures unchanged", async () => { + vi.useFakeTimers(); + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { encodeFramesChunkedConcat } = await import("./chunkEncoder.js"); + const { root, framesDir } = createFrameFixture(); + + const encodePromise = encodeFramesChunkedConcat( + framesDir, + "frame_%06d.png", + join(root, "chunked-failure.mp4"), + tinyEncodeOptions, + 30, + undefined, + { ffmpegEncodeTimeout: 1000 }, + ); + + expect(calls).toHaveLength(1); + const proc = calls[0]!.proc; + proc.stderr.emit("data", Buffer.from("chunk failed\n")); + emitClose(proc, 1); + + const result = await encodePromise; + expect(result.success).toBe(false); + expect(result.error).toBe("Chunk 0 encode failed: chunk failed\n"); + expect(result.error).not.toContain("ffmpegEncodeTimeout"); + }); + + it("kills concat ffmpeg when config timeout elapses", async () => { + vi.useFakeTimers(); + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { encodeFramesChunkedConcat } = await import("./chunkEncoder.js"); + const { root, framesDir } = createFrameFixture(); + + const encodePromise = encodeFramesChunkedConcat( + framesDir, + "frame_%06d.png", + join(root, "concat-timeout.mp4"), + tinyEncodeOptions, + 30, + undefined, + { ffmpegEncodeTimeout: 1000 }, + ); + + expect(calls).toHaveLength(1); + emitClose(calls[0]!.proc, 0); + await Promise.resolve(); + + expect(calls).toHaveLength(2); + const concatProc = calls[1]!.proc; + vi.advanceTimersByTime(999); + expect(concatProc.kill).not.toHaveBeenCalled(); + + vi.advanceTimersByTime(1); + expect(concatProc.kill).toHaveBeenCalledWith("SIGTERM"); + + concatProc.stderr.emit("data", Buffer.from("concat timeout\n")); + emitClose(concatProc, 143); + + const result = await encodePromise; + expect(result.success).toBe(false); + expect(result.error).toContain("Chunk concat failed"); + expect(result.error).toContain("concat timeout"); + expect(result.error).toContain(encodeTimeoutMessage(1000)); + }); + + it("uses the default timeout for per-chunk encodes when config is omitted", async () => { + vi.useFakeTimers(); + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { encodeFramesChunkedConcat } = await import("./chunkEncoder.js"); + const { root, framesDir } = createFrameFixture(); + + const encodePromise = encodeFramesChunkedConcat( + framesDir, + "frame_%06d.png", + join(root, "chunked-default.mp4"), + tinyEncodeOptions, + 30, + ); + + expect(calls).toHaveLength(1); + const chunkProc = calls[0]!.proc; + vi.advanceTimersByTime(599_999); + expect(chunkProc.kill).not.toHaveBeenCalled(); + + emitClose(chunkProc, 0); + await Promise.resolve(); + + expect(calls).toHaveLength(2); + const concatProc = calls[1]!.proc; + emitClose(concatProc, 0); + + const result = await encodePromise; + expect(result.success).toBe(true); + expect(result.framesEncoded).toBe(2); + expect(result.fileSize).toBe(0); + }); +}); + describe("getEncoderPreset", () => { it("returns h264 with yuv420p for mp4 format", () => { const preset = getEncoderPreset("standard", "mp4"); diff --git a/packages/engine/src/services/chunkEncoder.ts b/packages/engine/src/services/chunkEncoder.ts index e5f1f3a4d..fb11da468 100644 --- a/packages/engine/src/services/chunkEncoder.ts +++ b/packages/engine/src/services/chunkEncoder.ts @@ -39,6 +39,11 @@ export interface EncoderPreset { hdr?: { transfer: HdrTransfer }; } +function appendEncodeTimeoutMessage(error: string, timedOut: boolean, timeoutMs: number): string { + if (!timedOut) return error; + return `${error}\nFFmpeg killed after exceeding ffmpegEncodeTimeout (${timeoutMs} ms)`; +} + /** * Get encoder preset for a given quality and output format. * WebM uses VP9 with alpha-capable pixel format; MP4 uses h264 (or h265 for HDR); @@ -428,7 +433,9 @@ export async function encodeFramesFromDir( } const encodeTimeout = config?.ffmpegEncodeTimeout ?? DEFAULT_CONFIG.ffmpegEncodeTimeout; + let timedOut = false; const timer = setTimeout(() => { + timedOut = true; ffmpeg.kill("SIGTERM"); }, encodeTimeout); @@ -440,7 +447,7 @@ export async function encodeFramesFromDir( clearTimeout(timer); if (signal) signal.removeEventListener("abort", onAbort); const durationMs = Date.now() - startTime; - if (signal?.aborted) { + if (signal?.aborted && !timedOut) { resolve({ success: false, outputPath, @@ -452,14 +459,18 @@ export async function encodeFramesFromDir( return; } - if (code !== 0) { + if (code !== 0 || timedOut) { resolve({ success: false, outputPath, durationMs, framesEncoded: 0, fileSize: 0, - error: formatFfmpegError(code, stderr), + error: appendEncodeTimeoutMessage( + formatFfmpegError(code, stderr), + timedOut, + encodeTimeout, + ), }); return; } @@ -477,7 +488,7 @@ export async function encodeFramesFromDir( durationMs: Date.now() - startTime, framesEncoded: 0, fileSize: 0, - error: `[FFmpeg] ${err.message}`, + error: appendEncodeTimeoutMessage(`[FFmpeg] ${err.message}`, timedOut, encodeTimeout), }); }); }); @@ -490,6 +501,7 @@ export async function encodeFramesChunkedConcat( options: EncoderOptions, chunkSizeFrames: number, signal?: AbortSignal, + config?: Partial>, ): Promise { const start = Date.now(); const files = readdirSync(framesDir) @@ -548,15 +560,39 @@ export async function encodeFramesChunkedConcat( const ffmpeg = spawn(getFfmpegBinary(), args); trackChildProcess(ffmpeg); let stderr = ""; + const encodeTimeout = config?.ffmpegEncodeTimeout ?? DEFAULT_CONFIG.ffmpegEncodeTimeout; + let timedOut = false; + const timer = setTimeout(() => { + timedOut = true; + ffmpeg.kill("SIGTERM"); + }, encodeTimeout); ffmpeg.stderr.on("data", (d) => { stderr += d.toString(); }); ffmpeg.on("close", (code) => { - if (code === 0) resolve({ success: true }); - else resolve({ success: false, error: `Chunk ${i} encode failed: ${stderr.slice(-400)}` }); + clearTimeout(timer); + if (code === 0 && !timedOut) resolve({ success: true }); + else { + resolve({ + success: false, + error: appendEncodeTimeoutMessage( + `Chunk ${i} encode failed: ${stderr.slice(-400)}`, + timedOut, + encodeTimeout, + ), + }); + } }); ffmpeg.on("error", (err) => { - resolve({ success: false, error: `Chunk ${i} encode error: ${err.message}` }); + clearTimeout(timer); + resolve({ + success: false, + error: appendEncodeTimeoutMessage( + `Chunk ${i} encode error: ${err.message}`, + timedOut, + encodeTimeout, + ), + }); }); }); if (!chunkResult.success) { @@ -592,15 +628,39 @@ export async function encodeFramesChunkedConcat( const ffmpeg = spawn(getFfmpegBinary(), concatArgs); trackChildProcess(ffmpeg); let stderr = ""; + const encodeTimeout = config?.ffmpegEncodeTimeout ?? DEFAULT_CONFIG.ffmpegEncodeTimeout; + let timedOut = false; + const timer = setTimeout(() => { + timedOut = true; + ffmpeg.kill("SIGTERM"); + }, encodeTimeout); ffmpeg.stderr.on("data", (d) => { stderr += d.toString(); }); ffmpeg.on("close", (code) => { - if (code === 0) resolve({ success: true }); - else resolve({ success: false, error: `Chunk concat failed: ${stderr.slice(-400)}` }); + clearTimeout(timer); + if (code === 0 && !timedOut) resolve({ success: true }); + else { + resolve({ + success: false, + error: appendEncodeTimeoutMessage( + `Chunk concat failed: ${stderr.slice(-400)}`, + timedOut, + encodeTimeout, + ), + }); + } }); ffmpeg.on("error", (err) => { - resolve({ success: false, error: `Chunk concat error: ${err.message}` }); + clearTimeout(timer); + resolve({ + success: false, + error: appendEncodeTimeoutMessage( + `Chunk concat error: ${err.message}`, + timedOut, + encodeTimeout, + ), + }); }); }); diff --git a/packages/producer/src/services/render/stages/encodeStage.test.ts b/packages/producer/src/services/render/stages/encodeStage.test.ts index f2ef4ac1a..728116f79 100644 --- a/packages/producer/src/services/render/stages/encodeStage.test.ts +++ b/packages/producer/src/services/render/stages/encodeStage.test.ts @@ -1,5 +1,116 @@ -import { describe, expect, it } from "bun:test"; +import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, describe, expect, it, mock } from "bun:test"; import { buildGifPalettegenArgs, buildGifPaletteuseArgs } from "./gifEncodeArgs.js"; +import type { EncodeStageInput } from "./encodeStage.js"; + +const resolvedEngineConfig = { ffmpegEncodeTimeout: 12_345 }; +const encodeFramesFromDirMock = mock( + async (_framesDir: string, _framePattern: string, outputPath: string) => ({ + success: true, + outputPath, + durationMs: 1, + framesEncoded: 1, + fileSize: 1, + }), +); +const encodeFramesChunkedConcatMock = mock( + async (_framesDir: string, _framePattern: string, outputPath: string) => ({ + success: true, + outputPath, + durationMs: 1, + framesEncoded: 1, + fileSize: 1, + }), +); +const runFfmpegMock = mock(async () => ({ + success: true, + exitCode: 0, + stderr: "", + durationMs: 1, +})); + +mock.module("@hyperframes/engine", () => ({ + DEFAULT_CONFIG: { ffmpegEncodeTimeout: 600_000 }, + encodeFramesChunkedConcat: encodeFramesChunkedConcatMock, + encodeFramesFromDir: encodeFramesFromDirMock, + formatFfmpegError: (code: number | null, stderr: string) => `${String(code)} ${stderr}`, + getEncoderPreset: () => ({ + codec: "h264", + preset: "ultrafast", + quality: 28, + pixelFormat: "yuv420p", + }), + resolveConfig: () => resolvedEngineConfig, + runFfmpeg: runFfmpegMock, +})); + +const tempDirs: string[] = []; + +afterEach(() => { + encodeFramesFromDirMock.mockClear(); + encodeFramesChunkedConcatMock.mockClear(); + runFfmpegMock.mockClear(); + for (const dir of tempDirs.splice(0)) { + rmSync(dir, { recursive: true, force: true }); + } +}); + +function createFramesDir(ext: "jpg" | "png"): { root: string; framesDir: string } { + const root = mkdtempSync(join(tmpdir(), "hf-encode-stage-")); + tempDirs.push(root); + const framesDir = join(root, "frames"); + mkdirSync(framesDir); + writeFileSync(join(framesDir, `frame_000001.${ext}`), "stub"); + return { root, framesDir }; +} + +function makeInput(overrides: Partial = {}): EncodeStageInput { + const paths = createFramesDir("jpg"); + return { + job: { + id: "encode-stage-config-test", + config: { + fps: { num: 30, den: 1 }, + quality: "draft", + }, + status: "queued", + progress: 0, + currentStage: "queued", + createdAt: new Date(0), + duration: 1, + }, + log: { + error: () => {}, + warn: () => {}, + info: () => {}, + debug: () => {}, + }, + outputPath: join(paths.root, "out.mp4"), + framesDir: paths.framesDir, + videoOnlyPath: join(paths.root, "video-only.mp4"), + width: 2, + height: 2, + needsAlpha: false, + hasAudio: false, + isPngSequence: false, + isGif: false, + preset: { + codec: "h264", + preset: "ultrafast", + quality: 28, + pixelFormat: "yuv420p", + }, + effectiveQuality: 28, + effectiveBitrate: undefined, + enableChunkedEncode: false, + chunkedEncodeSize: 30, + abortSignal: undefined, + assertNotAborted: () => {}, + ...overrides, + }; +} describe("gif encode args", () => { const input = { @@ -41,3 +152,55 @@ describe("gif encode args", () => { ]); }); }); + +describe("runEncodeStage config plumbing", () => { + it("prefers engine config supplied by the orchestrator", async () => { + const { runEncodeStage } = await import("./encodeStage.js"); + const orchestratorEngineConfig = { ffmpegEncodeTimeout: 54_321 }; + + await runEncodeStage(makeInput({ engineConfig: orchestratorEngineConfig })); + + expect(encodeFramesFromDirMock).toHaveBeenCalledTimes(1); + expect(encodeFramesFromDirMock.mock.calls[0]?.[5]).toBe(orchestratorEngineConfig); + }); + + it("passes resolved engine config to encodeFramesFromDir", async () => { + const { runEncodeStage } = await import("./encodeStage.js"); + + await runEncodeStage(makeInput()); + + expect(encodeFramesFromDirMock).toHaveBeenCalledTimes(1); + expect(encodeFramesFromDirMock.mock.calls[0]?.[5]).toBe(resolvedEngineConfig); + }); + + it("passes resolved engine config to encodeFramesChunkedConcat", async () => { + const { runEncodeStage } = await import("./encodeStage.js"); + + await runEncodeStage(makeInput({ enableChunkedEncode: true })); + + expect(encodeFramesChunkedConcatMock).toHaveBeenCalledTimes(1); + expect(encodeFramesChunkedConcatMock.mock.calls[0]?.[6]).toBe(resolvedEngineConfig); + }); + + it("uses resolved engine config for GIF ffmpeg timeouts", async () => { + const { runEncodeStage } = await import("./encodeStage.js"); + const paths = createFramesDir("jpg"); + + await runEncodeStage( + makeInput({ + framesDir: paths.framesDir, + outputPath: join(paths.root, "out.gif"), + videoOnlyPath: join(paths.root, "video-only.mp4"), + isGif: true, + }), + ); + + expect(runFfmpegMock).toHaveBeenCalledTimes(2); + expect(runFfmpegMock.mock.calls[0]?.[1]?.timeout).toBe( + resolvedEngineConfig.ffmpegEncodeTimeout, + ); + expect(runFfmpegMock.mock.calls[1]?.[1]?.timeout).toBe( + resolvedEngineConfig.ffmpegEncodeTimeout, + ); + }); +}); diff --git a/packages/producer/src/services/render/stages/encodeStage.ts b/packages/producer/src/services/render/stages/encodeStage.ts index c4ef22b7f..2787cac45 100644 --- a/packages/producer/src/services/render/stages/encodeStage.ts +++ b/packages/producer/src/services/render/stages/encodeStage.ts @@ -31,12 +31,13 @@ import { copyFileSync, existsSync, mkdirSync, readdirSync, statSync } from "node:fs"; import { dirname, join } from "node:path"; import { - DEFAULT_CONFIG, encodeFramesChunkedConcat, encodeFramesFromDir, formatFfmpegError, getEncoderPreset, + resolveConfig, runFfmpeg, + type EngineConfig, type EncodeResult, } from "@hyperframes/engine"; import type { Fps } from "@hyperframes/core"; @@ -83,6 +84,8 @@ export interface EncodeStageInput { /** Producer config — enables the chunked-concat encoder when on. */ enableChunkedEncode: boolean; chunkedEncodeSize: number; + /** Already-resolved engine config from the orchestrator; direct callers fall back below. */ + engineConfig?: Pick; abortSignal: AbortSignal | undefined; assertNotAborted: () => void; onProgress?: ProgressCallback; @@ -241,6 +244,8 @@ export async function runEncodeStage(input: EncodeStageInput): Promise