diff --git a/packages/server/src/concurrency.ts b/packages/server/src/concurrency.ts new file mode 100644 index 00000000..dac592fb --- /dev/null +++ b/packages/server/src/concurrency.ts @@ -0,0 +1,72 @@ +/** + * In-process concurrency limit for task execution. + * + * Bounds how many tasks can run concurrently across all SSE and WebSocket + * connections. When at the limit, new requests get an immediate + * `AT_CAPACITY` rejection with a `Retry-After` hint, instead of queuing + * indefinitely on the server and tying up file descriptors / memory. + * + * The limit comes from `PILO_MAX_CONCURRENT_TASKS` (default 10). It is read + * lazily on every call, so tests can override it at runtime; in production + * the env var doesn't change, so the value is effectively constant. + * + * In-process only — fine for the current single-instance deployment. If we + * ever scale to multiple replicas, replace with a Redis-backed token bucket + * or rely on the load balancer's queue depth. + */ + +const DEFAULT_LIMIT = 10; + +/** + * Resolve the task limit from `PILO_MAX_CONCURRENT_TASKS`. + * + * Returns `DEFAULT_LIMIT` when the variable is unset, not a finite number, + * or floors to less than 1. The value is floored BEFORE the range check so + * a setting such as "0.5" cannot produce a limit of 0, which would make + * `tryAcquire` reject every request. + */ +function parseLimit(): number { + const env = process.env.PILO_MAX_CONCURRENT_TASKS; + if (env === undefined) return DEFAULT_LIMIT; + const n = Math.floor(Number(env)); + if (!Number.isFinite(n) || n < 1) return DEFAULT_LIMIT; + return n; +} + +let inflight = 0; + +/** + * Try to acquire a task slot. Returns true on success (caller MUST eventually + * call `release()` exactly once). Returns false when at the limit; the caller + * should respond with an AT_CAPACITY error. + */ +export function tryAcquire(): boolean { + if (inflight >= parseLimit()) return false; + inflight++; + return true; +} + +/** + * Release a previously-acquired task slot. Safe to call when the counter is + * already at zero (no-op) so cleanup paths don't need to track acquisition + * state precisely — but each successful `tryAcquire` should be paired with + * exactly one release. + */ +export function release(): void { + if (inflight > 0) inflight--; +} + +/** Current number of in-flight tasks. 
*/ +export function getInflight(): number { + return inflight; +} + +/** The configured maximum (re-evaluated each call from the env var). */ +export function getMaxConcurrent(): number { + return parseLimit(); +} + +/** Test-only: reset the inflight counter. */ +export function _resetInflight(): void { + inflight = 0; +} diff --git a/packages/server/src/routes/pilo.test.ts b/packages/server/src/routes/pilo.test.ts index 9562b310..0d5c1a25 100644 --- a/packages/server/src/routes/pilo.test.ts +++ b/packages/server/src/routes/pilo.test.ts @@ -60,10 +60,13 @@ vi.mock("../StreamLogger.js", () => ({ const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; +import { _resetInflight } from "../concurrency.js"; + describe("Pilo Routes", () => { let app: Hono; beforeEach(() => { + _resetInflight(); app = new Hono(); app.route("/pilo", piloRoutes); @@ -71,6 +74,10 @@ describe("Pilo Routes", () => { // vi.clearAllMocks(); }); + afterEach(() => { + _resetInflight(); + }); + describe("POST /pilo/run", () => { beforeEach(async () => { process.env.OPENAI_API_KEY = "test-key"; @@ -274,6 +281,41 @@ describe("Pilo Routes", () => { expect(res.headers.get("x-pilo-task-id")).toBe(data.error.taskId); }); + it("should return 429 with AT_CAPACITY when inflight tasks are at the limit", async () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "1"; + const { tryAcquire } = await import("../concurrency.js"); + expect(tryAcquire()).toBe(true); + + const res = await app.request("/pilo/run", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ task: "test task" }), + }); + + expect(res.status).toBe(429); + // Headers.get() yields null (never undefined) for a missing header, so assert a real positive value. + expect(Number(res.headers.get("Retry-After"))).toBeGreaterThan(0); + const data = await res.json(); + expect(data.error.code).toBe("AT_CAPACITY"); + expect(data.error.reason).toBe("AT_CAPACITY"); + expect(data.error.taskId).toBeDefined(); + delete process.env.PILO_MAX_CONCURRENT_TASKS; + }); + + it("should release the inflight slot 
after a successful SSE request completes", async () => { + const res = await app.request("/pilo/run", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ task: "test task" }), + }); + + // Drain the SSE stream so the streamSSE callback finishes. + await res.text(); + + const { getInflight } = await import("../concurrency.js"); + expect(getInflight()).toBe(0); + }); + it("should include taskId in SSE start event", async () => { const res = await app.request("/pilo/run", { method: "POST", diff --git a/packages/server/src/routes/pilo.ts b/packages/server/src/routes/pilo.ts index 82b2120a..f52a40d5 100644 --- a/packages/server/src/routes/pilo.ts +++ b/packages/server/src/routes/pilo.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; import { Hono } from "hono"; import { streamSSE } from "hono/streaming"; +import { release, tryAcquire } from "../concurrency.js"; import { runTask, validateTaskRequest, @@ -9,6 +10,8 @@ import { } from "../taskRunner.js"; import type { PiloTaskRequest } from "../taskRunner.js"; +const RETRY_AFTER_SECONDS = "30"; + const pilo = new Hono(); // POST /pilo/run - Execute a Pilo task with real-time SSE streaming (non-interactive) @@ -29,6 +32,20 @@ pilo.post("/run", async (c) => { ); } + if (!tryAcquire()) { + c.header("Retry-After", RETRY_AFTER_SECONDS); + return c.json( + createErrorResponse({ + message: "The server is at capacity. 
Retry after a short delay.", + code: "AT_CAPACITY", + reason: "AT_CAPACITY", + phase: "setup", + taskId, + }), + 429, + ); + } + return streamSSE(c, async (stream) => { const abortController = new AbortController(); @@ -83,6 +100,8 @@ pilo.post("/run", async (c) => { ), }); } + } finally { + release(); } }); } catch (error) { diff --git a/packages/server/src/routes/piloWs.test.ts b/packages/server/src/routes/piloWs.test.ts index 7459ca0f..82433b41 100644 --- a/packages/server/src/routes/piloWs.test.ts +++ b/packages/server/src/routes/piloWs.test.ts @@ -80,6 +80,7 @@ vi.mock("../taskRunner.js", () => ({ import { createPiloWsRoute } from "./piloWs.js"; import type { UpgradeWebSocket, WSContext } from "hono/ws"; +import { _resetInflight, tryAcquire } from "../concurrency.js"; const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; @@ -347,6 +348,55 @@ describe("piloWs", () => { }); }); + describe("capacity limit", () => { + afterEach(() => { + delete process.env.PILO_MAX_CONCURRENT_TASKS; + _resetInflight(); + }); + + it("should send AT_CAPACITY error and not start a task when at limit", async () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "1"; + _resetInflight(); + expect(tryAcquire()).toBe(true); + + const h = createTestHarness(); + h.sendMessage({ event: "task:details", data: { task: "test" } }); + await vi.runAllTimersAsync(); + + const errorMsg = h.sentMessages.find((m) => m.event === "error"); + expect(errorMsg).toBeDefined(); + expect(errorMsg!.data.error.code).toBe("AT_CAPACITY"); + expect(errorMsg!.data.error.reason).toBe("AT_CAPACITY"); + expect(errorMsg!.data.error.taskId).toMatch(UUID_RE); + // runTask must not have been called + expect(mockRunTask).not.toHaveBeenCalled(); + }); + + it("should release the slot after a successful task completes", async () => { + _resetInflight(); + mockRunTask.mockResolvedValue({ success: true }); + + const h = createTestHarness(); + h.sendMessage({ event: "task:details", data: { task: 
"test" } }); + await vi.runAllTimersAsync(); + + const { getInflight } = await import("../concurrency.js"); + expect(getInflight()).toBe(0); + }); + + it("should release the slot after a task throws", async () => { + _resetInflight(); + mockRunTask.mockRejectedValue(new Error("boom")); + + const h = createTestHarness(); + h.sendMessage({ event: "task:details", data: { task: "test" } }); + await vi.runAllTimersAsync(); + + const { getInflight } = await import("../concurrency.js"); + expect(getInflight()).toBe(0); + }); + }); + describe("taskId", () => { it("should emit task:accepted event with taskId after validation passes", async () => { const h = createTestHarness(); diff --git a/packages/server/src/routes/piloWs.ts b/packages/server/src/routes/piloWs.ts index 173ef6d3..1348090c 100644 --- a/packages/server/src/routes/piloWs.ts +++ b/packages/server/src/routes/piloWs.ts @@ -21,6 +21,7 @@ import { Hono } from "hono"; import type { UpgradeWebSocket, WSContext } from "hono/ws"; import type { UserDataCallback, UserDataResponse } from "pilo-core"; import { withRemoteContext } from "pilo-core"; +import { release, tryAcquire } from "../concurrency.js"; import { runTask, validateTaskRequest, @@ -125,6 +126,21 @@ export function createPiloWsRoute(upgradeWebSocket: UpgradeWebSocket): Hono { return; } + if (!tryAcquire()) { + send( + ws, + "error", + createErrorResponse({ + message: "The server is at capacity. 
Retry after a short delay.", + code: "AT_CAPACITY", + reason: "AT_CAPACITY", + phase: "setup", + taskId, + }), + ); + return; + } + taskRunning = true; send(ws, "task:accepted", { taskId }); @@ -173,6 +189,7 @@ export function createPiloWsRoute(upgradeWebSocket: UpgradeWebSocket): Hono { } } finally { taskRunning = false; + release(); for (const [id, pending] of pendingRequests) { clearTimeout(pending.timer); pending.reject(new Error("Task ended")); diff --git a/packages/server/src/taskRunner.ts b/packages/server/src/taskRunner.ts index 1cf41835..699c157b 100644 --- a/packages/server/src/taskRunner.ts +++ b/packages/server/src/taskRunner.ts @@ -97,6 +97,7 @@ export type ErrorReason = | "MAX_ITERATIONS" | "MAX_ERRORS" | "TIMEOUT" + | "AT_CAPACITY" | "INTERNAL_ERROR"; /** Which phase of the request pipeline produced the error. */ @@ -167,6 +168,7 @@ const REASON_HINTS: Record = { "The agent exceeded the maximum number of iterations without completing the task.", MAX_ERRORS: "The agent hit the error threshold and aborted.", TIMEOUT: "The task exceeded its time budget.", + AT_CAPACITY: "The server is at capacity. 
Retry after a short delay.", INTERNAL_ERROR: "The task failed due to an internal error.", }; diff --git a/packages/server/test/concurrency.test.ts b/packages/server/test/concurrency.test.ts new file mode 100644 index 00000000..adfa811b --- /dev/null +++ b/packages/server/test/concurrency.test.ts @@ -0,0 +1,94 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { + tryAcquire, + release, + getInflight, + getMaxConcurrent, + _resetInflight, +} from "../src/concurrency.js"; + +describe("concurrency limit", () => { + const originalEnv = process.env.PILO_MAX_CONCURRENT_TASKS; + + beforeEach(() => { + _resetInflight(); + }); + + afterEach(() => { + if (originalEnv === undefined) { + delete process.env.PILO_MAX_CONCURRENT_TASKS; + } else { + process.env.PILO_MAX_CONCURRENT_TASKS = originalEnv; + } + _resetInflight(); + }); + + describe("tryAcquire / release", () => { + it("returns true and increments while under the limit", () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "3"; + expect(tryAcquire()).toBe(true); + expect(tryAcquire()).toBe(true); + expect(getInflight()).toBe(2); + }); + + it("returns false and does NOT increment when at the limit", () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "2"; + expect(tryAcquire()).toBe(true); + expect(tryAcquire()).toBe(true); + expect(tryAcquire()).toBe(false); + expect(getInflight()).toBe(2); + }); + + it("release decrements the counter", () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "2"; + tryAcquire(); + tryAcquire(); + release(); + expect(getInflight()).toBe(1); + }); + + it("after a release, a new tryAcquire succeeds again", () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "1"; + expect(tryAcquire()).toBe(true); + expect(tryAcquire()).toBe(false); + release(); + expect(tryAcquire()).toBe(true); + }); + + it("release floors at zero (safe to call extra times)", () => { + release(); + release(); + release(); + expect(getInflight()).toBe(0); + }); + }); + + describe("limit configuration", () 
=> { + it("uses 10 as the default when PILO_MAX_CONCURRENT_TASKS is unset", () => { + delete process.env.PILO_MAX_CONCURRENT_TASKS; + expect(getMaxConcurrent()).toBe(10); + }); + + it("reads PILO_MAX_CONCURRENT_TASKS from env", () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "5"; + expect(getMaxConcurrent()).toBe(5); + }); + + it("falls back to default for non-numeric values", () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "not-a-number"; + expect(getMaxConcurrent()).toBe(10); + }); + + it("falls back to default for zero or negative values", () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "0"; + expect(getMaxConcurrent()).toBe(10); + process.env.PILO_MAX_CONCURRENT_TASKS = "-3"; + expect(getMaxConcurrent()).toBe(10); + }); + + it("floors fractional values", () => { + process.env.PILO_MAX_CONCURRENT_TASKS = "3.7"; + expect(getMaxConcurrent()).toBe(3); + }); + }); +});