B3: concurrency limit with 429 backpressure #404
Open
MrTravisB wants to merge 1 commit into travis/pilo-sentry-scrubber from travis/pilo-concurrency-limit
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| /** | ||
| * In-process concurrency limit for task execution. | ||
| * | ||
| * Bounds how many tasks can run concurrently across all SSE and WebSocket | ||
| * connections. When at the limit, new requests get an immediate | ||
| * `AT_CAPACITY` rejection with a `Retry-After` hint, instead of queuing | ||
| * indefinitely on the server and tying up file descriptors / memory. | ||
| * | ||
| * The limit comes from `PILO_MAX_CONCURRENT_TASKS` (default 10). It is read | ||
| * lazily on each call so tests can override it at runtime; in production the | ||
| * value is effectively constant since the env var doesn't change after startup. | ||
| * | ||
| * In-process only — fine for the current single-instance deployment. If we | ||
| * ever scale to multiple replicas, replace with a Redis-backed token bucket | ||
| * or rely on the load balancer's queue depth. | ||
| */ | ||
| | ||
| const DEFAULT_LIMIT = 10; | ||
| | ||
| function parseLimit(): number { | ||
| const env = process.env.PILO_MAX_CONCURRENT_TASKS; | ||
| if (env === undefined) return DEFAULT_LIMIT; | ||
| const n = Math.floor(Number(env)); // floor first so "0.5" cannot become a limit of 0 | ||
| if (!Number.isFinite(n) || n <= 0) return DEFAULT_LIMIT; | ||
| return n; | ||
| } | ||
| | ||
| let inflight = 0; | ||
| | ||
| /** | ||
| * Try to acquire a task slot. Returns true on success (caller MUST eventually | ||
| * call `release()` exactly once). Returns false when at the limit; the caller | ||
| * should respond with an AT_CAPACITY error. | ||
| */ | ||
| export function tryAcquire(): boolean { | ||
| if (inflight >= parseLimit()) return false; | ||
| inflight++; | ||
| return true; | ||
| } | ||
| | ||
| /** | ||
| * Release a previously-acquired task slot. Safe to call when the counter is | ||
| * already at zero (no-op) so cleanup paths don't need to track acquisition | ||
| * state precisely — but each successful `tryAcquire` should be paired with | ||
| * exactly one release. | ||
| */ | ||
| export function release(): void { | ||
| if (inflight > 0) inflight--; | ||
| } | ||
| | ||
| /** Current number of in-flight tasks. */ | ||
| export function getInflight(): number { | ||
| return inflight; | ||
| } | ||
| | ||
| /** The configured maximum (re-evaluated each call from the env var). */ | ||
| export function getMaxConcurrent(): number { | ||
| return parseLimit(); | ||
| } | ||
| | ||
| /** Test-only: reset the inflight counter. */ | ||
| export function _resetInflight(): void { | ||
| inflight = 0; | ||
| } | ||
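A minimal sketch of how a caller might pair `tryAcquire` with `release` to produce the 429 backpressure named in the PR title. The limiter is re-declared inline with a fixed limit so the block runs on its own. `runWithSlot`, `TaskResponse`, and the 5-second `Retry-After` value are assumptions for illustration; the PR's actual SSE and WebSocket handlers are not shown on this page.

```typescript
// Inline stand-ins for the module in the diff above so the sketch runs on its own.
const LIMIT = 2; // hypothetical fixed limit; the PR reads PILO_MAX_CONCURRENT_TASKS
let inflight = 0;

function tryAcquire(): boolean {
  if (inflight >= LIMIT) return false;
  inflight++;
  return true;
}

function release(): void {
  if (inflight > 0) inflight--;
}

// Hypothetical response shape; the PR's real handler types are not shown.
interface TaskResponse {
  status: number;
  headers: Record<string, string>;
  body: string;
}

const RETRY_AFTER_SECONDS = 5; // assumed hint value, not taken from the PR

// Run `task` only if a slot is free; otherwise reject immediately with 429.
async function runWithSlot(task: () => Promise<string>): Promise<TaskResponse> {
  if (!tryAcquire()) {
    return {
      status: 429,
      headers: { "Retry-After": String(RETRY_AFTER_SECONDS) },
      body: "AT_CAPACITY",
    };
  }
  try {
    return { status: 200, headers: {}, body: await task() };
  } finally {
    release(); // exactly one release per successful tryAcquire
  }
}
```

Putting `release()` in a `finally` block keeps the pairing intact whether the task resolves or throws, which is the contract the `tryAcquire` doc comment asks callers to honor.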
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| import { describe, it, expect, beforeEach, afterEach } from "vitest"; | ||
| import { | ||
| tryAcquire, | ||
| release, | ||
| getInflight, | ||
| getMaxConcurrent, | ||
| _resetInflight, | ||
| } from "../src/concurrency.js"; | ||
| | ||
| describe("concurrency limit", () => { | ||
| const originalEnv = process.env.PILO_MAX_CONCURRENT_TASKS; | ||
| | ||
| beforeEach(() => { | ||
| _resetInflight(); | ||
| }); | ||
| | ||
| afterEach(() => { | ||
| if (originalEnv === undefined) { | ||
| delete process.env.PILO_MAX_CONCURRENT_TASKS; | ||
| } else { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = originalEnv; | ||
| } | ||
| _resetInflight(); | ||
| }); | ||
| | ||
| describe("tryAcquire / release", () => { | ||
| it("returns true and increments while under the limit", () => { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "3"; | ||
| expect(tryAcquire()).toBe(true); | ||
| expect(tryAcquire()).toBe(true); | ||
| expect(getInflight()).toBe(2); | ||
| }); | ||
| | ||
| it("returns false and does NOT increment when at the limit", () => { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "2"; | ||
| expect(tryAcquire()).toBe(true); | ||
| expect(tryAcquire()).toBe(true); | ||
| expect(tryAcquire()).toBe(false); | ||
| expect(getInflight()).toBe(2); | ||
| }); | ||
| | ||
| it("release decrements the counter", () => { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "2"; | ||
| tryAcquire(); | ||
| tryAcquire(); | ||
| release(); | ||
| expect(getInflight()).toBe(1); | ||
| }); | ||
| | ||
| it("after a release, a new tryAcquire succeeds again", () => { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "1"; | ||
| expect(tryAcquire()).toBe(true); | ||
| expect(tryAcquire()).toBe(false); | ||
| release(); | ||
| expect(tryAcquire()).toBe(true); | ||
| }); | ||
| | ||
| it("release floors at zero (safe to call extra times)", () => { | ||
| release(); | ||
| release(); | ||
| release(); | ||
| expect(getInflight()).toBe(0); | ||
| }); | ||
| }); | ||
| | ||
| describe("limit configuration", () => { | ||
| it("uses 10 as the default when PILO_MAX_CONCURRENT_TASKS is unset", () => { | ||
| delete process.env.PILO_MAX_CONCURRENT_TASKS; | ||
| expect(getMaxConcurrent()).toBe(10); | ||
| }); | ||
| | ||
| it("reads PILO_MAX_CONCURRENT_TASKS from env", () => { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "5"; | ||
| expect(getMaxConcurrent()).toBe(5); | ||
| }); | ||
| | ||
| it("falls back to default for non-numeric values", () => { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "not-a-number"; | ||
| expect(getMaxConcurrent()).toBe(10); | ||
| }); | ||
| | ||
| it("falls back to default for zero or negative values", () => { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "0"; | ||
| expect(getMaxConcurrent()).toBe(10); | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "-3"; | ||
| expect(getMaxConcurrent()).toBe(10); | ||
| }); | ||
| | ||
| it("floors fractional values", () => { | ||
| process.env.PILO_MAX_CONCURRENT_TASKS = "3.7"; | ||
| expect(getMaxConcurrent()).toBe(3); | ||
| }); | ||
| }); | ||
| }); | ||
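The "release floors at zero" test covers extra `release()` calls when no slots are held. When other tasks still hold slots, an extra `release()` decrements the shared counter and frees a slot that belongs to someone else. One possible guard, sketched here under the assumption that callers can hold on to a per-acquisition handle, is to return a release function that only takes effect once. `acquireSlot` is a hypothetical helper, not part of the PR, and the limiter is re-declared inline with a fixed limit so the block runs on its own.

```typescript
// Inline stand-ins mirroring the diff's counter so the sketch runs on its own.
const LIMIT = 2; // hypothetical fixed limit for the example
let inflight = 0;

function tryAcquire(): boolean {
  if (inflight >= LIMIT) return false;
  inflight++;
  return true;
}

function release(): void {
  if (inflight > 0) inflight--;
}

// Hypothetical helper, not part of the PR: returns a release function that
// only decrements the first time it is called, or null when at capacity.
function acquireSlot(): (() => void) | null {
  if (!tryAcquire()) return null;
  let released = false;
  return () => {
    if (released) return; // repeated calls from overlapping cleanup paths are ignored
    released = true;
    release();
  };
}
```

With this shape, a handler that runs cleanup from both an error path and a connection-close path can call the same handle twice without lowering the count for other in-flight tasks.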
Hmm, we might need this sooner rather than later for Pilo server production: it looks like we currently run at least 3 replicas there, with a max of 30. So this would only cap concurrency per replica?
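To put numbers on the per-replica question, here is a small arithmetic sketch. It assumes every replica runs with the default limit of 10 from the diff and that nothing else (such as the load balancer) caps traffic; `fleetWideCeiling` is a hypothetical helper used only for illustration.

```typescript
// Per-replica default from the diff; replica counts from the comment above.
const PER_REPLICA_LIMIT = 10;

// Hypothetical helper: worst-case number of tasks running across all replicas
// when each replica enforces its own in-process limit independently.
function fleetWideCeiling(replicas: number, perReplicaLimit: number): number {
  return replicas * perReplicaLimit;
}

const atMinReplicas = fleetWideCeiling(3, PER_REPLICA_LIMIT); // 3 * 10 = 30
const atMaxReplicas = fleetWideCeiling(30, PER_REPLICA_LIMIT); // 30 * 10 = 300
```

Under those assumptions the effective ceiling moves between 30 and 300 concurrent tasks as the replica count changes, so the env var bounds each process rather than the service as a whole.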