Skip to content

Commit 0b80ed3

Browse files
feat(rate-limiter): FIFO queue for hosted-key per-workspace fairness
Replace the per-call distributed lock with a Redis-backed FIFO queue so callers within a workspace get strict ordering instead of racing the bucket. Add heartbeat-based crash recovery and dead-head reaping in a single Lua script. Bump the Exa search hosted RPM from 5 to 60.
1 parent d7a6339 commit 0b80ed3

6 files changed

Lines changed: 715 additions & 82 deletions

File tree

apps/sim/lib/core/rate-limiter/hosted-key/hosted-key-rate-limiter.test.ts

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type {
55
TokenStatus,
66
} from '@/lib/core/rate-limiter/storage'
77
import { HostedKeyRateLimiter } from './hosted-key-rate-limiter'
8+
import type { HostedKeyQueue } from './queue'
89
import type { CustomRateLimit, PerRequestRateLimit } from './types'
910

1011
/** Force the queue wait to give up on the first iteration by reporting a retry time
@@ -23,10 +24,30 @@ const createMockAdapter = (): MockAdapter => ({
2324
resetBucket: vi.fn(),
2425
})
2526

27+
/**
 * Shape of the queue test double used in this file: one vitest `Mock` for each
 * queue method the tests stub or assert on (`enqueue`, `checkHead`,
 * `refreshHeartbeat`, `dequeue`).
 */
type MockQueue = Record<'enqueue' | 'checkHead' | 'refreshHeartbeat' | 'dequeue', Mock>
33+
34+
/**
 * Builds a queue test double in which every method is a `vi.fn()` with a
 * happy-path default: `enqueue` resolves to `{ position: 0, enabled: true }`,
 * `checkHead` resolves to `'head'`, and `refreshHeartbeat` / `dequeue` resolve
 * to `undefined`. With these defaults the caller is reported as already being
 * at the head of the queue, as if the queue were empty or Redis were
 * unavailable. Tests override individual methods per call to simulate ordering.
 */
const createMockQueue = (): MockQueue => ({
  enqueue: vi.fn().mockResolvedValue({ position: 0, enabled: true }),
  checkHead: vi.fn().mockResolvedValue('head'),
  refreshHeartbeat: vi.fn().mockResolvedValue(undefined),
  dequeue: vi.fn().mockResolvedValue(undefined),
})
45+
2646
describe('HostedKeyRateLimiter', () => {
2747
const testProvider = 'exa'
2848
const envKeyPrefix = 'EXA_API_KEY'
2949
let mockAdapter: MockAdapter
50+
let mockQueue: MockQueue
3051
let rateLimiter: HostedKeyRateLimiter
3152
let originalEnv: NodeJS.ProcessEnv
3253

@@ -38,7 +59,11 @@ describe('HostedKeyRateLimiter', () => {
3859
beforeEach(() => {
3960
vi.clearAllMocks()
4061
mockAdapter = createMockAdapter()
41-
rateLimiter = new HostedKeyRateLimiter(mockAdapter as RateLimitStorageAdapter)
62+
mockQueue = createMockQueue()
63+
rateLimiter = new HostedKeyRateLimiter(
64+
mockAdapter as RateLimitStorageAdapter,
65+
mockQueue as unknown as HostedKeyQueue
66+
)
4267

4368
originalEnv = { ...process.env }
4469
process.env.EXA_API_KEY_COUNT = '3'
@@ -216,6 +241,135 @@ describe('HostedKeyRateLimiter', () => {
216241
})
217242
})
218243

244+
describe('FIFO queue ordering', () => {
  const WORKSPACE_ID = 'workspace-1'

  /** Bucket response that grants the request; every test in this suite uses it. */
  const bucketGrant: ConsumeResult = {
    allowed: true,
    tokensRemaining: 9,
    resetAt: new Date(Date.now() + 60000),
  }

  /** Issues the single `acquireKey` call that each test in this suite exercises. */
  const acquire = () =>
    rateLimiter.acquireKey(testProvider, envKeyPrefix, perRequestRateLimit, WORKSPACE_ID)

  /** Asserts that a queue method was called with (provider, workspace id, any string). */
  const expectCalledForWorkspace = (queueMethod: Mock): void => {
    expect(queueMethod).toHaveBeenCalledWith(testProvider, WORKSPACE_ID, expect.any(String))
  }

  /** Runs `scenario` under vitest fake timers and restores real timers even if it throws. */
  const withFakeTimers = async (scenario: () => Promise<void>): Promise<void> => {
    vi.useFakeTimers()
    try {
      await scenario()
    } finally {
      vi.useRealTimers()
    }
  }

  // The bucket always grants in this suite, so each outcome is driven by the queue mock.
  beforeEach(() => {
    mockAdapter.consumeTokens.mockResolvedValue(bucketGrant)
  })

  it('enqueues every call onto the per-workspace+provider queue', async () => {
    await acquire()

    expectCalledForWorkspace(mockQueue.enqueue)
  })

  it('always dequeues at the end of a successful acquisition', async () => {
    await acquire()

    expectCalledForWorkspace(mockQueue.dequeue)
  })

  it('always dequeues even when the call fails (no keys configured)', async () => {
    process.env.EXA_API_KEY_COUNT = '0'

    await acquire()

    expect(mockQueue.dequeue).toHaveBeenCalled()
  })

  it('waits at the head of the queue before consuming from the bucket', async () => {
    // The first two polls report 'waiting'; the third reports 'head'.
    for (const status of ['waiting', 'waiting', 'head']) {
      mockQueue.checkHead.mockResolvedValueOnce(status)
    }

    const outcome = await acquire()

    expect(outcome.success).toBe(true)
    expect(mockQueue.checkHead).toHaveBeenCalledTimes(3)
    // The bucket is touched exactly once, i.e. only after the head was reached.
    expect(mockAdapter.consumeTokens).toHaveBeenCalledTimes(1)
  })

  it('refreshes the heartbeat while waiting at the head of the queue', async () => {
    // Fake timers let the wait loop "sleep" without real delay.
    await withFakeTimers(async () => {
      // Each poll jumps the fake clock forward 15 s (presumably longer than the
      // limiter's heartbeat refresh interval; the constant is not visible here).
      // The first poll reports 'waiting'; every later poll reports 'head'.
      mockQueue.checkHead.mockImplementation(async () => {
        vi.advanceTimersByTime(15_000)
        const pollsSoFar = mockQueue.checkHead.mock.calls.length
        return pollsSoFar < 2 ? 'waiting' : 'head'
      })

      const pending = acquire()
      // Flush pending timers so the limiter's sleep between polls resolves.
      await vi.runAllTimersAsync()
      await pending

      expect(mockQueue.refreshHeartbeat).toHaveBeenCalled()
    })
  })

  it('returns 429 when the queue wait exceeds the cap', async () => {
    // The caller never reaches the head of the queue.
    mockQueue.checkHead.mockResolvedValue('waiting')

    await withFakeTimers(async () => {
      const pending = acquire()
      // Burn past the 5-minute cap.
      await vi.advanceTimersByTimeAsync(6 * 60 * 1000)
      const outcome = await pending

      expect(outcome.success).toBe(false)
      expect(outcome.billingActorRateLimited).toBe(true)
    })
  })

  it('treats "missing" status as proceed (queue evicted, fall through to bucket race)', async () => {
    mockQueue.checkHead.mockResolvedValueOnce('missing')

    const outcome = await acquire()

    expect(outcome.success).toBe(true)
  })
})
372+
219373
describe('acquireKey with custom rate limit', () => {
220374
const customRateLimit: CustomRateLimit = {
221375
mode: 'custom',

0 commit comments

Comments
 (0)