Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions packages/server/src/concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* In-process concurrency limit for task execution.
*
* Bounds how many tasks can run concurrently across all SSE and WebSocket
* connections. When at the limit, new requests get an immediate
* `AT_CAPACITY` rejection with a `Retry-After` hint, instead of queuing
* indefinitely on the server and tying up file descriptors / memory.
*
* The limit comes from `PILO_MAX_CONCURRENT_TASKS` (default 10). It is re-read
* on every call so tests can override it at runtime; in production the value
* is effectively constant because the env var doesn't change after startup.
*
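* In-process only — the counter lives in this module, so the limit applies
* per server process (i.e. per replica), not across the whole deployment.
* With multiple replicas the effective ceiling is (replicas × limit). If a
* global cap is needed, replace with a Redis-backed token bucket or rely on
* the load balancer's queue depth.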
Comment on lines +13 to +15
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, we might need this sooner than later for Pilo server production: looks like we currently run at least 3 replicas there and a max of 30. So, this would only cap concurrency per replica?

*/

/** Limit used when `PILO_MAX_CONCURRENT_TASKS` is unset or unusable. */
const DEFAULT_LIMIT = 10;

/**
 * Resolve the concurrency limit from `PILO_MAX_CONCURRENT_TASKS`.
 *
 * The env var is read on every call. Its value is converted with `Number()`
 * and floored to an integer.
 *
 * @returns The floored env value when it is a finite integer >= 1, otherwise
 *   `DEFAULT_LIMIT` (unset, non-numeric, infinite, zero, negative, or a
 *   fraction below 1).
 */
function parseLimit(): number {
  const raw = process.env.PILO_MAX_CONCURRENT_TASKS;
  if (raw === undefined) return DEFAULT_LIMIT;

  // Floor BEFORE validating: a value such as "0.5" is > 0 but floors to 0,
  // and a limit of 0 would make every acquire attempt fail.
  const limit = Math.floor(Number(raw));
  if (!Number.isFinite(limit) || limit < 1) return DEFAULT_LIMIT;
  return limit;
}

// Number of task slots currently held in this process.
let inflight = 0;

/**
 * Attempt to claim one task slot.
 *
 * @returns `true` when a slot was claimed — the caller then owes exactly one
 *   `release()` call. `false` when the in-flight count has already reached
 *   the configured limit; the caller should answer with an AT_CAPACITY error.
 */
export function tryAcquire(): boolean {
  const hasCapacity = inflight < parseLimit();
  if (hasCapacity) {
    inflight += 1;
  }
  return hasCapacity;
}

/**
 * Give back a task slot obtained from `tryAcquire`.
 *
 * The counter never goes below zero: calling this with nothing in flight is
 * a no-op, so cleanup paths need not track whether they hold a slot. Even
 * so, every successful `tryAcquire` should be matched by exactly one call.
 */
export function release(): void {
  if (inflight <= 0) return;
  inflight -= 1;
}

/**
 * Report how many task slots are held right now.
 *
 * @returns The current value of the module-level in-flight counter.
 */
export function getInflight(): number {
  const current = inflight;
  return current;
}

/**
 * Report the configured concurrency ceiling.
 *
 * @returns The limit as resolved by `parseLimit()`, evaluated afresh on each
 *   call.
 */
export function getMaxConcurrent(): number {
  const limit = parseLimit();
  return limit;
}

/**
 * Test-only helper: force the in-flight counter back to zero so each test
 * starts from a clean state.
 */
export function _resetInflight(): void {
  inflight = 0;
}
41 changes: 41 additions & 0 deletions packages/server/src/routes/pilo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,24 @@ vi.mock("../StreamLogger.js", () => ({

const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

import { _resetInflight } from "../concurrency.js";

describe("Pilo Routes", () => {
let app: Hono;

beforeEach(() => {
_resetInflight();
app = new Hono();
app.route("/pilo", piloRoutes);

// Don't clear mocks - it breaks our mock setup
// vi.clearAllMocks();
});

afterEach(() => {
_resetInflight();
});

describe("POST /pilo/run", () => {
beforeEach(async () => {
process.env.OPENAI_API_KEY = "test-key";
Expand Down Expand Up @@ -274,6 +281,40 @@ describe("Pilo Routes", () => {
expect(res.headers.get("x-pilo-task-id")).toBe(data.error.taskId);
});

it("should return 429 with AT_CAPACITY when inflight tasks are at the limit", async () => {
  process.env.PILO_MAX_CONCURRENT_TASKS = "1";
  try {
    // Occupy the only slot so the route's own tryAcquire() is rejected.
    const { tryAcquire } = await import("../concurrency.js");
    expect(tryAcquire()).toBe(true);

    const res = await app.request("/pilo/run", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ task: "test task" }),
    });

    expect(res.status).toBe(429);
    // Headers.get() yields null (not undefined) for a missing header, so
    // toBeDefined() would pass even if the route never set Retry-After.
    // "30" is the RETRY_AFTER_SECONDS value the route sends.
    expect(res.headers.get("Retry-After")).toBe("30");
    const data = await res.json();
    expect(data.error.code).toBe("AT_CAPACITY");
    expect(data.error.reason).toBe("AT_CAPACITY");
    expect(data.error.taskId).toBeDefined();
  } finally {
    // Always restore the env, even when an assertion above fails, so the
    // limit of 1 does not leak into later tests.
    delete process.env.PILO_MAX_CONCURRENT_TASKS;
  }
});

it("should release the inflight slot after a successful SSE request completes", async () => {
  const concurrency = await import("../concurrency.js");
  const requestInit = {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ task: "test task" }),
  };

  const response = await app.request("/pilo/run", requestInit);

  // Read the SSE body to the end so the streamSSE callback has finished
  // before the counter is inspected.
  await response.text();

  expect(concurrency.getInflight()).toBe(0);
});

it("should include taskId in SSE start event", async () => {
const res = await app.request("/pilo/run", {
method: "POST",
Expand Down
19 changes: 19 additions & 0 deletions packages/server/src/routes/pilo.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { randomUUID } from "node:crypto";
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import { release, tryAcquire } from "../concurrency.js";
import {
runTask,
validateTaskRequest,
Expand All @@ -9,6 +10,8 @@ import {
} from "../taskRunner.js";
import type { PiloTaskRequest } from "../taskRunner.js";

const RETRY_AFTER_SECONDS = "30";

const pilo = new Hono();

// POST /pilo/run - Execute a Pilo task with real-time SSE streaming (non-interactive)
Expand All @@ -29,6 +32,20 @@ pilo.post("/run", async (c) => {
);
}

if (!tryAcquire()) {
c.header("Retry-After", RETRY_AFTER_SECONDS);
return c.json(
createErrorResponse({
message: "The server is at capacity. Retry after a short delay.",
code: "AT_CAPACITY",
reason: "AT_CAPACITY",
phase: "setup",
taskId,
}),
429,
);
}

return streamSSE(c, async (stream) => {
const abortController = new AbortController();

Expand Down Expand Up @@ -83,6 +100,8 @@ pilo.post("/run", async (c) => {
),
});
}
} finally {
release();
}
});
} catch (error) {
Expand Down
50 changes: 50 additions & 0 deletions packages/server/src/routes/piloWs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ vi.mock("../taskRunner.js", () => ({

import { createPiloWsRoute } from "./piloWs.js";
import type { UpgradeWebSocket, WSContext } from "hono/ws";
import { _resetInflight, tryAcquire } from "../concurrency.js";

const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

Expand Down Expand Up @@ -347,6 +348,55 @@ describe("piloWs", () => {
});
});

describe("capacity limit", () => {
  // Sends one `task:details` message through a fresh harness, then runs all
  // timers via vi.runAllTimersAsync() before handing the harness back.
  const submitTask = async () => {
    const harness = createTestHarness();
    harness.sendMessage({ event: "task:details", data: { task: "test" } });
    await vi.runAllTimersAsync();
    return harness;
  };

  // Reads the live in-flight counter from the concurrency module.
  const readInflight = async (): Promise<number> => {
    const concurrency = await import("../concurrency.js");
    return concurrency.getInflight();
  };

  afterEach(() => {
    delete process.env.PILO_MAX_CONCURRENT_TASKS;
    _resetInflight();
  });

  it("should send AT_CAPACITY error and not start a task when at limit", async () => {
    process.env.PILO_MAX_CONCURRENT_TASKS = "1";
    _resetInflight();
    // Take the single available slot up front.
    expect(tryAcquire()).toBe(true);

    const harness = await submitTask();

    const capacityError = harness.sentMessages.find((m) => m.event === "error");
    expect(capacityError).toBeDefined();
    const { error } = capacityError!.data;
    expect(error.code).toBe("AT_CAPACITY");
    expect(error.reason).toBe("AT_CAPACITY");
    expect(error.taskId).toMatch(UUID_RE);
    // runTask must not have been called
    expect(mockRunTask).not.toHaveBeenCalled();
  });

  it("should release the slot after a successful task completes", async () => {
    _resetInflight();
    mockRunTask.mockResolvedValue({ success: true });

    await submitTask();

    expect(await readInflight()).toBe(0);
  });

  it("should release the slot after a task throws", async () => {
    _resetInflight();
    mockRunTask.mockRejectedValue(new Error("boom"));

    await submitTask();

    expect(await readInflight()).toBe(0);
  });
});

describe("taskId", () => {
it("should emit task:accepted event with taskId after validation passes", async () => {
const h = createTestHarness();
Expand Down
17 changes: 17 additions & 0 deletions packages/server/src/routes/piloWs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Hono } from "hono";
import type { UpgradeWebSocket, WSContext } from "hono/ws";
import type { UserDataCallback, UserDataResponse } from "pilo-core";
import { withRemoteContext } from "pilo-core";
import { release, tryAcquire } from "../concurrency.js";
import {
runTask,
validateTaskRequest,
Expand Down Expand Up @@ -125,6 +126,21 @@ export function createPiloWsRoute(upgradeWebSocket: UpgradeWebSocket): Hono {
return;
}

if (!tryAcquire()) {
send(
ws,
"error",
createErrorResponse({
message: "The server is at capacity. Retry after a short delay.",
code: "AT_CAPACITY",
reason: "AT_CAPACITY",
phase: "setup",
taskId,
}),
);
return;
}

taskRunning = true;
send(ws, "task:accepted", { taskId });

Expand Down Expand Up @@ -173,6 +189,7 @@ export function createPiloWsRoute(upgradeWebSocket: UpgradeWebSocket): Hono {
}
} finally {
taskRunning = false;
release();
for (const [id, pending] of pendingRequests) {
clearTimeout(pending.timer);
pending.reject(new Error("Task ended"));
Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/taskRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ export type ErrorReason =
| "MAX_ITERATIONS"
| "MAX_ERRORS"
| "TIMEOUT"
| "AT_CAPACITY"
| "INTERNAL_ERROR";

/** Which phase of the request pipeline produced the error. */
Expand Down Expand Up @@ -167,6 +168,7 @@ const REASON_HINTS: Record<ErrorReason, string> = {
"The agent exceeded the maximum number of iterations without completing the task.",
MAX_ERRORS: "The agent hit the error threshold and aborted.",
TIMEOUT: "The task exceeded its time budget.",
AT_CAPACITY: "The server is at capacity. Retry after a short delay.",
INTERNAL_ERROR: "The task failed due to an internal error.",
};

Expand Down
94 changes: 94 additions & 0 deletions packages/server/test/concurrency.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import {
tryAcquire,
release,
getInflight,
getMaxConcurrent,
_resetInflight,
} from "../src/concurrency.js";

describe("concurrency limit", () => {
  // Value of the env var before this suite touched it; restored after each test.
  const savedLimit = process.env.PILO_MAX_CONCURRENT_TASKS;

  const setLimit = (value: string): void => {
    process.env.PILO_MAX_CONCURRENT_TASKS = value;
  };

  const clearLimit = (): void => {
    delete process.env.PILO_MAX_CONCURRENT_TASKS;
  };

  beforeEach(() => {
    _resetInflight();
  });

  afterEach(() => {
    if (savedLimit !== undefined) {
      setLimit(savedLimit);
    } else {
      clearLimit();
    }
    _resetInflight();
  });

  describe("tryAcquire / release", () => {
    it("returns true and increments while under the limit", () => {
      setLimit("3");
      expect(tryAcquire()).toBe(true);
      expect(tryAcquire()).toBe(true);
      expect(getInflight()).toBe(2);
    });

    it("returns false and does NOT increment when at the limit", () => {
      setLimit("2");
      expect(tryAcquire()).toBe(true);
      expect(tryAcquire()).toBe(true);
      expect(tryAcquire()).toBe(false);
      expect(getInflight()).toBe(2);
    });

    it("release decrements the counter", () => {
      setLimit("2");
      tryAcquire();
      tryAcquire();
      release();
      expect(getInflight()).toBe(1);
    });

    it("after a release, a new tryAcquire succeeds again", () => {
      setLimit("1");
      expect(tryAcquire()).toBe(true);
      expect(tryAcquire()).toBe(false);
      release();
      expect(tryAcquire()).toBe(true);
    });

    it("release floors at zero (safe to call extra times)", () => {
      for (let call = 0; call < 3; call += 1) {
        release();
      }
      expect(getInflight()).toBe(0);
    });
  });

  describe("limit configuration", () => {
    it("uses 10 as the default when PILO_MAX_CONCURRENT_TASKS is unset", () => {
      clearLimit();
      expect(getMaxConcurrent()).toBe(10);
    });

    it("reads PILO_MAX_CONCURRENT_TASKS from env", () => {
      setLimit("5");
      expect(getMaxConcurrent()).toBe(5);
    });

    it("falls back to default for non-numeric values", () => {
      setLimit("not-a-number");
      expect(getMaxConcurrent()).toBe(10);
    });

    it("falls back to default for zero or negative values", () => {
      for (const rejected of ["0", "-3"]) {
        setLimit(rejected);
        expect(getMaxConcurrent()).toBe(10);
      }
    });

    it("floors fractional values", () => {
      setLimit("3.7");
      expect(getMaxConcurrent()).toBe(3);
    });
  });
});