From c85cea0fc1d48614ebb89e872cdee70dd27f8c6b Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Tue, 24 Feb 2026 11:49:10 +0100 Subject: [PATCH] Add group-based concurrency limits to job processing - Introduced `group` metadata in job options to allow for global concurrency limits based on group identifiers. - Added `groupConcurrency` option in processor settings to cap the number of concurrent jobs per group across all workers. - Updated documentation to include examples of how to use `group` and `groupConcurrency` for multi-tenant fairness and resource management. - Enhanced tests to verify the correct enforcement of group-based concurrency limits, ensuring ungrouped jobs remain unaffected. These changes improve the flexibility and control over job processing, enabling better resource allocation in multi-tenant environments. --- apps/docs/content/docs/api/job-queue.mdx | 5 + apps/docs/content/docs/api/processor.mdx | 5 + apps/docs/content/docs/intro/comparison.mdx | 40 ++-- apps/docs/content/docs/usage/process-jobs.mdx | 21 +++ apps/docs/content/docs/usage/scaling.mdx | 23 +++ apps/docs/public/llms-full.txt | 94 ++++++++-- packages/dataqueue/ai/rules/advanced.md | 1 + packages/dataqueue/ai/rules/basic.md | 3 +- .../ai/skills/dataqueue-advanced/SKILL.md | 22 +++ .../ai/skills/dataqueue-core/SKILL.md | 2 + ...00000007_add_group_fields_to_job_queue.sql | 16 ++ packages/dataqueue/src/backend.ts | 1 + packages/dataqueue/src/backends/postgres.ts | 174 +++++++++++++----- .../dataqueue/src/backends/redis-scripts.ts | 135 ++++++++++---- packages/dataqueue/src/backends/redis.test.ts | 85 +++++++++ packages/dataqueue/src/backends/redis.ts | 9 + packages/dataqueue/src/processor.test.ts | 18 ++ packages/dataqueue/src/processor.ts | 14 ++ packages/dataqueue/src/queue.test.ts | 107 +++++++++++ packages/dataqueue/src/queue.ts | 2 + packages/dataqueue/src/types.ts | 35 ++++ 21 files changed, 692 insertions(+), 120 deletions(-) create 
mode 100644 packages/dataqueue/migrations/1781200000007_add_group_fields_to_job_queue.sql diff --git a/apps/docs/content/docs/api/job-queue.mdx b/apps/docs/content/docs/api/job-queue.mdx index 55cc717..5b38326 100644 --- a/apps/docs/content/docs/api/job-queue.mdx +++ b/apps/docs/content/docs/api/job-queue.mdx @@ -115,12 +115,14 @@ interface JobOptions { retryDelay?: number; // Base delay between retries in seconds (default: 60) retryBackoff?: boolean; // Use exponential backoff (default: true) retryDelayMax?: number; // Max delay cap in seconds (default: none) + group?: { id: string; tier?: string }; // Optional group for global concurrency limits } ``` - `retryDelay` - Base delay between retries in seconds. When `retryBackoff` is true, this is the base for exponential backoff (`retryDelay * 2^attempts`). When false, retries use this fixed delay. Default: `60`. - `retryBackoff` - Whether to use exponential backoff. When true, delay doubles with each attempt and includes jitter. Default: `true`. - `retryDelayMax` - Maximum delay cap in seconds. Only meaningful when `retryBackoff` is true. No limit when omitted. +- `group` - Optional grouping metadata. Use `group.id` to enforce global per-group limits with `ProcessorOptions.groupConcurrency`. `group.tier` is reserved for future policies. #### AddJobOptions @@ -531,6 +533,7 @@ interface ProcessorOptions { workerId?: string; batchSize?: number; concurrency?: number; + groupConcurrency?: number; pollInterval?: number; onError?: (error: Error) => void; verbose?: boolean; @@ -538,6 +541,8 @@ interface ProcessorOptions { } ``` +- `groupConcurrency` - Optional global per-group concurrency limit (positive integer). Applies only to jobs with `group.id`; ungrouped jobs are unaffected. 
+ --- ## Background Supervisor diff --git a/apps/docs/content/docs/api/processor.mdx b/apps/docs/content/docs/api/processor.mdx index 8911a43..5714d04 100644 --- a/apps/docs/content/docs/api/processor.mdx +++ b/apps/docs/content/docs/api/processor.mdx @@ -20,6 +20,7 @@ interface ProcessorOptions { workerId?: string; batchSize?: number; concurrency?: number; + groupConcurrency?: number; pollInterval?: number; onError?: (error: Error) => void; verbose?: boolean; @@ -27,6 +28,10 @@ interface ProcessorOptions { } ``` +- `groupConcurrency` sets a global per-group concurrency cap across all workers/instances for jobs with `group.id`. +- Must be a positive integer when provided. +- Jobs without `group.id` are not affected. + ## Methods ### startInBackground diff --git a/apps/docs/content/docs/intro/comparison.mdx b/apps/docs/content/docs/intro/comparison.mdx index 127ef04..db6dc79 100644 --- a/apps/docs/content/docs/intro/comparison.mdx +++ b/apps/docs/content/docs/intro/comparison.mdx @@ -5,26 +5,26 @@ description: How DataQueue compares to BullMQ and Trigger.dev Choosing a job queue depends on your stack, infrastructure preferences, and the features you need. Here is a side-by-side comparison of **DataQueue**, **BullMQ**, and **Trigger.dev**. 
-| Feature | DataQueue | BullMQ | Trigger.dev | -| ----------------------- | ----------------------------------------------- | ------------------------------------------- | --------------------------------------- | -| **Backend** | PostgreSQL or Redis | Redis only | Cloud or self-hosted (Postgres + Redis) | -| **Type Safety** | Full generic `PayloadMap` | Basic types | Full TypeScript tasks | -| **Scheduling** | `runAt`, Cron | Cron, delayed, recurring | Cron, delayed | -| **Retries** | Exponential backoff, configurable `maxAttempts` | Exponential backoff, custom strategies, DLQ | Auto retries, bulk replay, DLQ | -| **Priority** | Integer priority | Priority levels | Queue-based priority | -| **Concurrency Control** | `batchSize` + `concurrency` | Built-in | Per-task + shared limits | -| **Rate Limiting** | - | Yes | Via concurrency limits | -| **Job Flows / DAGs** | - | Parent-child flows | Workflows | -| **Dashboard** | Built-in Next.js package | Third-party (Bull Board, etc.) | Built-in web dashboard | -| **Wait / Pause Jobs** | `waitFor`, `waitUntil`, token system | - | Durable execution | -| **Human-in-the-Loop** | Token system | - | Yes | -| **Progress Tracking** | Yes (0-100%) | Yes | Yes (realtime) | -| **Serverless-First** | Yes | No (needs long-running process) | Yes (cloud) | -| **Self-Hosted** | Yes | Yes (your Redis) | Yes (containers) | -| **Cloud Option** | - | - | Yes | -| **License** | MIT | MIT | Apache-2.0 | -| **Pricing** | Free (OSS) | Free (OSS) | Free tier + paid plans | -| **Infrastructure** | Your own Postgres or Redis | Your own Redis | Their cloud or your infra | +| Feature | DataQueue | BullMQ | Trigger.dev | +| ----------------------- | ------------------------------------------------------- | ------------------------------------------- | --------------------------------------- | +| **Backend** | PostgreSQL or Redis | Redis only | Cloud or self-hosted (Postgres + Redis) | +| **Type Safety** | Full generic `PayloadMap` | Basic types | 
Full TypeScript tasks | +| **Scheduling** | `runAt`, Cron | Cron, delayed, recurring | Cron, delayed | +| **Retries** | Exponential backoff, configurable `maxAttempts` | Exponential backoff, custom strategies, DLQ | Auto retries, bulk replay, DLQ | +| **Priority** | Integer priority | Priority levels | Queue-based priority | +| **Concurrency Control** | `batchSize` + `concurrency` + global `groupConcurrency` | Built-in | Per-task + shared limits | +| **Rate Limiting** | - | Yes | Via concurrency limits | +| **Job Flows / DAGs** | - | Parent-child flows | Workflows | +| **Dashboard** | Built-in Next.js package | Third-party (Bull Board, etc.) | Built-in web dashboard | +| **Wait / Pause Jobs** | `waitFor`, `waitUntil`, token system | - | Durable execution | +| **Human-in-the-Loop** | Token system | - | Yes | +| **Progress Tracking** | Yes (0-100%) | Yes | Yes (realtime) | +| **Serverless-First** | Yes | No (needs long-running process) | Yes (cloud) | +| **Self-Hosted** | Yes | Yes (your Redis) | Yes (containers) | +| **Cloud Option** | - | - | Yes | +| **License** | MIT | MIT | Apache-2.0 | +| **Pricing** | Free (OSS) | Free (OSS) | Free tier + paid plans | +| **Infrastructure** | Your own Postgres or Redis | Your own Redis | Their cloud or your infra | ## Where DataQueue shines diff --git a/apps/docs/content/docs/usage/process-jobs.mdx b/apps/docs/content/docs/usage/process-jobs.mdx index 6ae84d1..7b6ce9d 100644 --- a/apps/docs/content/docs/usage/process-jobs.mdx +++ b/apps/docs/content/docs/usage/process-jobs.mdx @@ -36,6 +36,7 @@ export async function GET(request: Request) { workerId: `cron-${Date.now()}`, batchSize: 10, // up to 10 jobs per batch concurrency: 3, // up to 3 jobs processed in parallel + groupConcurrency: 2, // optional: max 2 concurrent jobs per group.id globally verbose: true, }); @@ -71,6 +72,26 @@ Some jobs are resource-intensive, like image processing, LLM calls, or calling a The default is `3`. Set it to `1` to process jobs one at a time. 
Use a lower value to avoid exhausting resources in constrained environments. +### Group-Based Concurrency + +When jobs should be limited per tenant/customer/account across all workers, set `groupConcurrency` on the processor and provide `group.id` when enqueuing jobs: + +```typescript +await jobQueue.addJob({ + jobType: 'send_email', + payload: { to: 'user@example.com' }, + group: { id: 'tenant_123' }, +}); + +const processor = jobQueue.createProcessor(jobHandlers, { + batchSize: 20, + concurrency: 10, + groupConcurrency: 2, +}); +``` + +With this configuration, at most 2 jobs from the same `group.id` run at once globally, while ungrouped jobs continue to flow normally. + ### Triggering the Processor via Cron Defining an endpoint isn't enough—you need to trigger it regularly. For example, use Vercel cron to trigger the endpoint every minute by adding this to your `vercel.json`: diff --git a/apps/docs/content/docs/usage/scaling.mdx b/apps/docs/content/docs/usage/scaling.mdx index fd0e43d..9268372 100644 --- a/apps/docs/content/docs/usage/scaling.mdx +++ b/apps/docs/content/docs/usage/scaling.mdx @@ -46,6 +46,29 @@ For example, with `batchSize: 20`, `concurrency: 10`, `pollInterval: 2000ms`, an - **IO-bound jobs** (API calls, email sending): higher concurrency (5-20) works well since jobs spend most time waiting. - **Rate-limited APIs**: match concurrency to the API's rate limit to avoid throttling. +### Group Concurrency + +`groupConcurrency` caps how many jobs with the same `group.id` can be in `processing` at the same time across all worker instances. + +- Use this for multi-tenant fairness and per-account rate protection. +- The limit is coordinated globally through the queue backend (supported on both PostgreSQL and Redis), not per-process. +- Ungrouped jobs are unaffected. 
+ +```typescript +await jobQueue.addJob({ + jobType: 'send_email', + payload: { to: 'user@example.com' }, + group: { id: 'tenant_abc' }, +}); + +const processor = jobQueue.createProcessor(jobHandlers, { + batchSize: 30, + concurrency: 10, + groupConcurrency: 2, + pollInterval: 2000, +}); +``` + ### Poll Interval `pollInterval` controls how often the processor checks for new jobs when idle. diff --git a/apps/docs/public/llms-full.txt b/apps/docs/public/llms-full.txt index 0cddfa8..1c242ad 100644 --- a/apps/docs/public/llms-full.txt +++ b/apps/docs/public/llms-full.txt @@ -335,12 +335,14 @@ interface JobOptions { retryDelay?: number; // Base delay between retries in seconds (default: 60) retryBackoff?: boolean; // Use exponential backoff (default: true) retryDelayMax?: number; // Max delay cap in seconds (default: none) + group?: { id: string; tier?: string }; // Optional group for global concurrency limits } ``` - `retryDelay` - Base delay between retries in seconds. When `retryBackoff` is true, this is the base for exponential backoff (`retryDelay * 2^attempts`). When false, retries use this fixed delay. Default: `60`. - `retryBackoff` - Whether to use exponential backoff. When true, delay doubles with each attempt and includes jitter. Default: `true`. - `retryDelayMax` - Maximum delay cap in seconds. Only meaningful when `retryBackoff` is true. No limit when omitted. +- `group` - Optional grouping metadata. Use `group.id` to enforce global per-group limits with `ProcessorOptions.groupConcurrency`. `group.tier` is reserved for future policies. #### AddJobOptions @@ -751,6 +753,7 @@ interface ProcessorOptions { workerId?: string; batchSize?: number; concurrency?: number; + groupConcurrency?: number; pollInterval?: number; onError?: (error: Error) => void; verbose?: boolean; @@ -758,6 +761,8 @@ interface ProcessorOptions { } ``` +- `groupConcurrency` - Optional global per-group concurrency limit (positive integer). 
Applies only to jobs with `group.id`; ungrouped jobs are unaffected. + --- ## Background Supervisor @@ -926,6 +931,7 @@ interface ProcessorOptions { workerId?: string; batchSize?: number; concurrency?: number; + groupConcurrency?: number; pollInterval?: number; onError?: (error: Error) => void; verbose?: boolean; @@ -933,6 +939,10 @@ interface ProcessorOptions { } ``` +- `groupConcurrency` sets a global per-group concurrency cap across all workers/instances for jobs with `group.id`. +- Must be a positive integer when provided. +- Jobs without `group.id` are not affected. + ## Methods ### startInBackground @@ -1506,26 +1516,26 @@ Slug: intro/comparison Choosing a job queue depends on your stack, infrastructure preferences, and the features you need. Here is a side-by-side comparison of **DataQueue**, **BullMQ**, and **Trigger.dev**. -| Feature | DataQueue | BullMQ | Trigger.dev | -| ----------------------- | ----------------------------------------------- | ------------------------------------------- | --------------------------------------- | -| **Backend** | PostgreSQL or Redis | Redis only | Cloud or self-hosted (Postgres + Redis) | -| **Type Safety** | Full generic `PayloadMap` | Basic types | Full TypeScript tasks | -| **Scheduling** | `runAt`, Cron | Cron, delayed, recurring | Cron, delayed | -| **Retries** | Exponential backoff, configurable `maxAttempts` | Exponential backoff, custom strategies, DLQ | Auto retries, bulk replay, DLQ | -| **Priority** | Integer priority | Priority levels | Queue-based priority | -| **Concurrency Control** | `batchSize` + `concurrency` | Built-in | Per-task + shared limits | -| **Rate Limiting** | - | Yes | Via concurrency limits | -| **Job Flows / DAGs** | - | Parent-child flows | Workflows | -| **Dashboard** | Built-in Next.js package | Third-party (Bull Board, etc.) 
| Built-in web dashboard | -| **Wait / Pause Jobs** | `waitFor`, `waitUntil`, token system | - | Durable execution | -| **Human-in-the-Loop** | Token system | - | Yes | -| **Progress Tracking** | Yes (0-100%) | Yes | Yes (realtime) | -| **Serverless-First** | Yes | No (needs long-running process) | Yes (cloud) | -| **Self-Hosted** | Yes | Yes (your Redis) | Yes (containers) | -| **Cloud Option** | - | - | Yes | -| **License** | MIT | MIT | Apache-2.0 | -| **Pricing** | Free (OSS) | Free (OSS) | Free tier + paid plans | -| **Infrastructure** | Your own Postgres or Redis | Your own Redis | Their cloud or your infra | +| Feature | DataQueue | BullMQ | Trigger.dev | +| ----------------------- | ------------------------------------------------------- | ------------------------------------------- | --------------------------------------- | +| **Backend** | PostgreSQL or Redis | Redis only | Cloud or self-hosted (Postgres + Redis) | +| **Type Safety** | Full generic `PayloadMap` | Basic types | Full TypeScript tasks | +| **Scheduling** | `runAt`, Cron | Cron, delayed, recurring | Cron, delayed | +| **Retries** | Exponential backoff, configurable `maxAttempts` | Exponential backoff, custom strategies, DLQ | Auto retries, bulk replay, DLQ | +| **Priority** | Integer priority | Priority levels | Queue-based priority | +| **Concurrency Control** | `batchSize` + `concurrency` + global `groupConcurrency` | Built-in | Per-task + shared limits | +| **Rate Limiting** | - | Yes | Via concurrency limits | +| **Job Flows / DAGs** | - | Parent-child flows | Workflows | +| **Dashboard** | Built-in Next.js package | Third-party (Bull Board, etc.) 
| Built-in web dashboard | +| **Wait / Pause Jobs** | `waitFor`, `waitUntil`, token system | - | Durable execution | +| **Human-in-the-Loop** | Token system | - | Yes | +| **Progress Tracking** | Yes (0-100%) | Yes | Yes (realtime) | +| **Serverless-First** | Yes | No (needs long-running process) | Yes (cloud) | +| **Self-Hosted** | Yes | Yes (your Redis) | Yes (containers) | +| **Cloud Option** | - | - | Yes | +| **License** | MIT | MIT | Apache-2.0 | +| **Pricing** | Free (OSS) | Free (OSS) | Free tier + paid plans | +| **Infrastructure** | Your own Postgres or Redis | Your own Redis | Their cloud or your infra | ## Where DataQueue shines @@ -4297,6 +4307,7 @@ export async function GET(request: Request) { workerId: `cron-${Date.now()}`, batchSize: 10, // up to 10 jobs per batch concurrency: 3, // up to 3 jobs processed in parallel + groupConcurrency: 2, // optional: max 2 concurrent jobs per group.id globally verbose: true, }); @@ -4332,6 +4343,26 @@ Some jobs are resource-intensive, like image processing, LLM calls, or calling a The default is `3`. Set it to `1` to process jobs one at a time. Use a lower value to avoid exhausting resources in constrained environments. +### Group-Based Concurrency + +When jobs should be limited per tenant/customer/account across all workers, set `groupConcurrency` on the processor and provide `group.id` when enqueuing jobs: + +```typescript +await jobQueue.addJob({ + jobType: 'send_email', + payload: { to: 'user@example.com' }, + group: { id: 'tenant_123' }, +}); + +const processor = jobQueue.createProcessor(jobHandlers, { + batchSize: 20, + concurrency: 10, + groupConcurrency: 2, +}); +``` + +With this configuration, at most 2 jobs from the same `group.id` run at once globally, while ungrouped jobs continue to flow normally. + ### Triggering the Processor via Cron Defining an endpoint isn't enough—you need to trigger it regularly. 
For example, use Vercel cron to trigger the endpoint every minute by adding this to your `vercel.json`: @@ -4799,6 +4830,29 @@ For example, with `batchSize: 20`, `concurrency: 10`, `pollInterval: 2000ms`, an - **IO-bound jobs** (API calls, email sending): higher concurrency (5-20) works well since jobs spend most time waiting. - **Rate-limited APIs**: match concurrency to the API's rate limit to avoid throttling. +### Group Concurrency + +`groupConcurrency` caps how many jobs with the same `group.id` can be in `processing` at the same time across all worker instances. + +- Use this for multi-tenant fairness and per-account rate protection. +- The limit is coordinated globally through the queue backend (supported on both PostgreSQL and Redis), not per-process. +- Ungrouped jobs are unaffected. + +```typescript +await jobQueue.addJob({ + jobType: 'send_email', + payload: { to: 'user@example.com' }, + group: { id: 'tenant_abc' }, +}); + +const processor = jobQueue.createProcessor(jobHandlers, { + batchSize: 30, + concurrency: 10, + groupConcurrency: 2, + pollInterval: 2000, +}); +``` + ### Poll Interval `pollInterval` controls how often the processor checks for new jobs when idle. diff --git a/packages/dataqueue/ai/rules/advanced.md b/packages/dataqueue/ai/rules/advanced.md index 791836e..a384087 100644 --- a/packages/dataqueue/ai/rules/advanced.md +++ b/packages/dataqueue/ai/rules/advanced.md @@ -137,6 +137,7 @@ Events: `job:added`, `job:processing`, `job:completed`, `job:failed` (with `will ## Scaling - Increase `batchSize` and `concurrency` for higher throughput. +- Use `group: { id }` on jobs with `groupConcurrency` on processors when you need global per-tenant/per-account fairness. - Run multiple processor instances with unique `workerId` values — `FOR UPDATE SKIP LOCKED` (PostgreSQL) or Lua scripts (Redis) prevent double-claiming. - Use `jobType` filter for specialized workers. - Use `createSupervisor()` to automate maintenance (reclaim stuck jobs, cleanup, token expiry). Safe to run across multiple instances. 
diff --git a/packages/dataqueue/ai/rules/basic.md b/packages/dataqueue/ai/rules/basic.md index 47fe711..f91d470 100644 --- a/packages/dataqueue/ai/rules/basic.md +++ b/packages/dataqueue/ai/rules/basic.md @@ -86,7 +86,7 @@ const ids = await queue.addJobs([ // ids[i] corresponds to the i-th input job ``` -Both support `idempotencyKey`, `priority`, `runAt`, `tags`, and `{ db }` for transactional inserts (PostgreSQL only). +Both support `idempotencyKey`, `priority`, `runAt`, `tags`, optional `group: { id, tier? }`, and `{ db }` for transactional inserts (PostgreSQL only). ## Handlers @@ -113,6 +113,7 @@ Handler signature: `(payload: T, signal: AbortSignal, ctx: JobContext) => Promis const processor = queue.createProcessor(handlers, { batchSize: 10, concurrency: 3, + groupConcurrency: 2, // optional global cap per group.id }); await processor.start(); ``` diff --git a/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md b/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md index 4936a1b..441fd65 100644 --- a/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md +++ b/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md @@ -239,6 +239,28 @@ await queue.cancelAllUpcomingJobs({ Tag query modes: `'exact'`, `'all'`, `'any'`, `'none'`. +## Group-Based Concurrency + +Use job `group.id` plus processor `groupConcurrency` to enforce a global cap per group across all workers/instances (PostgreSQL and Redis). + +```typescript +await queue.addJob({ + jobType: 'email', + payload: { + /* ... */ + }, + group: { id: 'tenant_abc', tier: 'gold' }, // tier is optional/reserved +}); + +const processor = queue.createProcessor(handlers, { + batchSize: 20, + concurrency: 10, + groupConcurrency: 2, +}); +``` + +Ungrouped jobs are unaffected by `groupConcurrency`. 
+ ## Idempotency ```typescript diff --git a/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md b/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md index 3ee6ae4..586b6f3 100644 --- a/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md +++ b/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md @@ -114,6 +114,7 @@ const jobId = await queue.addJob({ runAt: new Date(Date.now() + 5000), tags: ['welcome'], idempotencyKey: 'welcome-user-123', + group: { id: 'tenant_123' }, // optional: for global per-group concurrency limits }); ``` @@ -190,6 +191,7 @@ await queue.addJob({ const processor = queue.createProcessor(handlers, { batchSize: 10, concurrency: 3, + groupConcurrency: 2, // optional global cap per group.id across all workers }); const processed = await processor.start(); ``` diff --git a/packages/dataqueue/migrations/1781200000007_add_group_fields_to_job_queue.sql b/packages/dataqueue/migrations/1781200000007_add_group_fields_to_job_queue.sql new file mode 100644 index 0000000..9f57598 --- /dev/null +++ b/packages/dataqueue/migrations/1781200000007_add_group_fields_to_job_queue.sql @@ -0,0 +1,16 @@ +-- Up Migration: Add group metadata fields for group-based concurrency limits +ALTER TABLE job_queue + ADD COLUMN IF NOT EXISTS group_id VARCHAR(255), + ADD COLUMN IF NOT EXISTS group_tier VARCHAR(255); + +-- Index for efficient active-group concurrency checks +CREATE INDEX IF NOT EXISTS idx_job_queue_processing_group_id + ON job_queue (group_id) + WHERE status = 'processing' AND group_id IS NOT NULL; + +-- Down Migration +DROP INDEX IF EXISTS idx_job_queue_processing_group_id; + +ALTER TABLE job_queue + DROP COLUMN IF EXISTS group_tier, + DROP COLUMN IF EXISTS group_id; diff --git a/packages/dataqueue/src/backend.ts b/packages/dataqueue/src/backend.ts index 6ccf2a2..0b0ce1f 100644 --- a/packages/dataqueue/src/backend.ts +++ b/packages/dataqueue/src/backend.ts @@ -146,6 +146,7 @@ export interface QueueBackend { workerId: string, batchSize?: number, jobType?: string 
| string[], + groupConcurrency?: number, ): Promise[]>; /** Mark a job as completed, optionally storing output data. */ diff --git a/packages/dataqueue/src/backends/postgres.ts b/packages/dataqueue/src/backends/postgres.ts index e7475ae..7c489c5 100644 --- a/packages/dataqueue/src/backends/postgres.ts +++ b/packages/dataqueue/src/backends/postgres.ts @@ -127,6 +127,7 @@ export class PostgresBackend implements QueueBackend { retryDelay = undefined, retryBackoff = undefined, retryDelayMax = undefined, + group = undefined, }: JobOptions, options?: AddJobOptions, ): Promise { @@ -142,8 +143,8 @@ export class PostgresBackend implements QueueBackend { if (runAt) { result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, group_id, group_tier) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) ${onConflict} RETURNING id`, [ @@ -159,13 +160,15 @@ export class PostgresBackend implements QueueBackend { retryDelay ?? null, retryBackoff ?? null, retryDelayMax ?? null, + group?.id ?? null, + group?.tier ?? 
null, ], ); } else { result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + (job_type, payload, max_attempts, priority, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, group_id, group_tier) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ${onConflict} RETURNING id`, [ @@ -180,6 +183,8 @@ export class PostgresBackend implements QueueBackend { retryDelay ?? null, retryBackoff ?? null, retryDelayMax ?? null, + group?.id ?? null, + group?.tier ?? null, ], ); } @@ -251,7 +256,7 @@ export class PostgresBackend implements QueueBackend { const client: DatabaseClient = externalClient ?? (await this.pool.connect()); try { - const COLS_PER_JOB = 12; + const COLS_PER_JOB = 14; const valueClauses: string[] = []; const params: any[] = []; @@ -271,6 +276,7 @@ export class PostgresBackend implements QueueBackend { retryDelay = undefined, retryBackoff = undefined, retryDelayMax = undefined, + group = undefined, } = jobs[i]; const base = i * COLS_PER_JOB; @@ -278,7 +284,7 @@ export class PostgresBackend implements QueueBackend { `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, ` + `COALESCE($${base + 5}::timestamptz, CURRENT_TIMESTAMP), ` + `$${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, ` + - `$${base + 10}, $${base + 11}, $${base + 12})`, + `$${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14})`, ); params.push( jobType, @@ -293,6 +299,8 @@ export class PostgresBackend implements QueueBackend { retryDelay ?? null, retryBackoff ?? null, retryDelayMax ?? null, + group?.id ?? null, + group?.tier ?? 
null, ); } @@ -302,7 +310,7 @@ export class PostgresBackend implements QueueBackend { const result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max) + (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, group_id, group_tier) VALUES ${valueClauses.join(', ')} ${onConflict} RETURNING id, idempotency_key`, @@ -424,7 +432,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output FROM job_queue WHERE id = $1`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS 
"lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue WHERE id = $1`, [id], ); @@ -458,7 +466,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output FROM job_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS 
"startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, [status, limit, offset], ); log(`Found ${result.rows.length} jobs by status ${status}`); @@ -484,7 +492,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output FROM job_queue ORDER BY created_at DESC LIMIT $1 OFFSET $2`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS 
"maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue ORDER BY created_at DESC LIMIT $1 OFFSET $2`, [limit, offset], ); log(`Found ${result.rows.length} jobs (all)`); @@ -509,7 +517,7 @@ export class PostgresBackend implements QueueBackend { ): Promise[]> { const client = await this.pool.connect(); try { - let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS 
"retryDelayMax", output FROM job_queue`; + let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue`; const params: any[] = []; const where: string[] = []; let paramIdx = 1; @@ -636,7 +644,7 @@ export class PostgresBackend implements QueueBackend { ): Promise[]> { const client = await this.pool.connect(); try { - let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", 
retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output + let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue`; let params: any[] = []; switch (mode) { @@ -689,6 +697,7 @@ export class PostgresBackend implements QueueBackend { workerId: string, batchSize = 10, jobType?: string | string[], + groupConcurrency?: number, ): Promise[]> { const client = await this.pool.connect(); try { @@ -698,50 +707,121 @@ export class PostgresBackend implements QueueBackend { const params: any[] = [workerId, batchSize]; if (jobType) { if (Array.isArray(jobType)) { - jobTypeFilter = ` AND job_type = ANY($3)`; + jobTypeFilter = ` AND candidate.job_type = ANY($3)`; params.push(jobType); } else { - jobTypeFilter = ` AND job_type = $3`; + jobTypeFilter = ` AND candidate.job_type = $3`; params.push(jobType); } } - const result = await client.query( - ` - UPDATE job_queue - SET status = 'processing', - locked_at = NOW(), - locked_by = $1, - attempts = CASE WHEN status = 'waiting' THEN attempts ELSE attempts + 1 END, - updated_at = NOW(), - pending_reason = NULL, - started_at = COALESCE(started_at, NOW()), - last_retried_at = 
CASE WHEN status != 'waiting' AND attempts > 0 THEN NOW() ELSE last_retried_at END, - wait_until = NULL - WHERE id IN ( - SELECT id FROM job_queue - WHERE ( - ( - (status = 'pending' OR (status = 'failed' AND next_attempt_at <= NOW())) - AND (attempts < max_attempts) - AND run_at <= NOW() + let result; + if (groupConcurrency === undefined) { + result = await client.query( + ` + UPDATE job_queue + SET status = 'processing', + locked_at = NOW(), + locked_by = $1, + attempts = CASE WHEN status = 'waiting' THEN attempts ELSE attempts + 1 END, + updated_at = NOW(), + pending_reason = NULL, + started_at = COALESCE(started_at, NOW()), + last_retried_at = CASE WHEN status != 'waiting' AND attempts > 0 THEN NOW() ELSE last_retried_at END, + wait_until = NULL + WHERE id IN ( + SELECT id FROM job_queue candidate + WHERE ( + ( + (candidate.status = 'pending' OR (candidate.status = 'failed' AND candidate.next_attempt_at <= NOW())) + AND (candidate.attempts < candidate.max_attempts) + AND candidate.run_at <= NOW() + ) + OR ( + candidate.status = 'waiting' + AND candidate.wait_until IS NOT NULL + AND candidate.wait_until <= NOW() + AND candidate.wait_token_id IS NULL + ) ) - OR ( - status = 'waiting' - AND wait_until IS NOT NULL - AND wait_until <= NOW() - AND wait_token_id IS NULL + ${jobTypeFilter} + ORDER BY candidate.priority DESC, candidate.created_at ASC + LIMIT $2 + FOR UPDATE SKIP LOCKED + ) + RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason 
AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", group_id AS "groupId", group_tier AS "groupTier", output + `, + params, + ); + } else { + const constrainedParams = [...params, groupConcurrency]; + const groupConcurrencyParamIndex = constrainedParams.length; + result = await client.query( + ` + WITH eligible AS ( + SELECT candidate.id, candidate.group_id, candidate.priority, candidate.created_at + FROM job_queue candidate + WHERE ( + ( + (candidate.status = 'pending' OR (candidate.status = 'failed' AND candidate.next_attempt_at <= NOW())) + AND (candidate.attempts < candidate.max_attempts) + AND candidate.run_at <= NOW() + ) + OR ( + candidate.status = 'waiting' + AND candidate.wait_until IS NOT NULL + AND candidate.wait_until <= NOW() + AND candidate.wait_token_id IS NULL + ) ) + ${jobTypeFilter} + FOR UPDATE SKIP LOCKED + ), + ranked AS ( + SELECT + eligible.id, + eligible.group_id, + eligible.priority, + eligible.created_at, + ROW_NUMBER() OVER ( + PARTITION BY eligible.group_id + ORDER BY eligible.priority DESC, eligible.created_at ASC + ) AS group_rank, + COALESCE(( + SELECT COUNT(*) + FROM job_queue processing_jobs + WHERE processing_jobs.status = 'processing' + AND processing_jobs.group_id = eligible.group_id + ), 0) AS active_group_count + FROM eligible + ), + selected AS ( + SELECT ranked.id + FROM ranked + WHERE ranked.group_id IS NULL + OR ( + ranked.active_group_count < $${groupConcurrencyParamIndex} + AND ranked.group_rank <= ($${groupConcurrencyParamIndex} - ranked.active_group_count) + ) + ORDER BY ranked.priority DESC, ranked.created_at ASC + LIMIT $2 ) - ${jobTypeFilter} - ORDER BY priority DESC, created_at ASC - LIMIT $2 - FOR UPDATE SKIP LOCKED - ) - RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, 
priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output - `, - params, - ); + UPDATE job_queue + SET status = 'processing', + locked_at = NOW(), + locked_by = $1, + attempts = CASE WHEN status = 'waiting' THEN attempts ELSE attempts + 1 END, + updated_at = NOW(), + pending_reason = NULL, + started_at = COALESCE(started_at, NOW()), + last_retried_at = CASE WHEN status != 'waiting' AND attempts > 0 THEN NOW() ELSE last_retried_at END, + wait_until = NULL + WHERE id IN (SELECT id FROM selected) + RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", 
retry_delay_max AS "retryDelayMax", group_id AS "groupId", group_tier AS "groupTier", output + `, + constrainedParams, + ); + } log(`Found ${result.rows.length} jobs to process`); await client.query('COMMIT'); diff --git a/packages/dataqueue/src/backends/redis-scripts.ts b/packages/dataqueue/src/backends/redis-scripts.ts index 000d0ed..e11f2cd 100644 --- a/packages/dataqueue/src/backends/redis-scripts.ts +++ b/packages/dataqueue/src/backends/redis-scripts.ts @@ -32,7 +32,7 @@ const SCORE_RANGE = '1000000000000000'; // 1e15 * KEYS: [prefix] * ARGV: [jobType, payloadJson, maxAttempts, priority, runAtMs, timeoutMs, * forceKillOnTimeout, tagsJson, idempotencyKey, nowMs, - * retryDelay, retryBackoff, retryDelayMax] + * retryDelay, retryBackoff, retryDelayMax, groupId, groupTier] * Returns: job ID (number) */ export const ADD_JOB_SCRIPT = ` @@ -50,6 +50,8 @@ local nowMs = tonumber(ARGV[10]) local retryDelay = ARGV[11] -- "null" or seconds string local retryBackoff = ARGV[12] -- "null" or "true"/"false" local retryDelayMax = ARGV[13] -- "null" or seconds string +local groupId = ARGV[14] -- "null" or group ID +local groupTier = ARGV[15] -- "null" or group tier -- Idempotency check if idempotencyKey ~= "null" then @@ -96,7 +98,9 @@ redis.call('HMSET', jobKey, 'stepData', 'null', 'retryDelay', retryDelay, 'retryBackoff', retryBackoff, - 'retryDelayMax', retryDelayMax + 'retryDelayMax', retryDelayMax, + 'groupId', groupId, + 'groupTier', groupTier ) -- Status index @@ -169,6 +173,8 @@ for i, job in ipairs(jobs) do local retryDelay = tostring(job.retryDelay) local retryBackoff = tostring(job.retryBackoff) local retryDelayMax = tostring(job.retryDelayMax) + local groupId = tostring(job.groupId) + local groupTier = tostring(job.groupTier) -- Idempotency check local skip = false @@ -218,7 +224,9 @@ for i, job in ipairs(jobs) do 'stepData', 'null', 'retryDelay', retryDelay, 'retryBackoff', retryBackoff, - 'retryDelayMax', retryDelayMax + 'retryDelayMax', retryDelayMax, + 
'groupId', groupId, + 'groupTier', groupTier ) -- Status index @@ -265,7 +273,7 @@ return results * GET NEXT BATCH * Atomically: move ready delayed/retry jobs into queue, then pop N jobs. * KEYS: [prefix] - * ARGV: [workerId, batchSize, nowMs, jobTypeFilter] + * ARGV: [workerId, batchSize, nowMs, jobTypeFilter, groupConcurrency] * jobTypeFilter: "null" or a JSON array like ["email","sms"] or a string like "email" * Returns: array of job field arrays (flat: [field1, val1, field2, val2, ...] per job) */ @@ -275,6 +283,12 @@ local workerId = ARGV[1] local batchSize = tonumber(ARGV[2]) local nowMs = tonumber(ARGV[3]) local jobTypeFilter = ARGV[4] -- "null" or JSON array or single string +local groupConcurrencyRaw = ARGV[5] -- "null" or positive integer +local groupConcurrency = nil +if groupConcurrencyRaw ~= "null" then + groupConcurrency = tonumber(groupConcurrencyRaw) +end +local groupActiveKey = prefix .. 'group:active' -- 1. Move ready delayed jobs into queue local delayed = redis.call('ZRANGEBYSCORE', prefix .. 'delayed', '-inf', nowMs, 'LIMIT', 0, 200) @@ -375,36 +389,53 @@ for i = 1, #candidates, 2 do -- Not ready yet: move to delayed redis.call('ZADD', prefix .. 'delayed', runAt, jobId) else - -- Claim this job - local attempts = tonumber(redis.call('HGET', jk, 'attempts')) - local startedAt = redis.call('HGET', jk, 'startedAt') - local lastRetriedAt = redis.call('HGET', jk, 'lastRetriedAt') - if startedAt == 'null' then startedAt = nowMs end - if attempts > 0 then lastRetriedAt = nowMs end - - redis.call('HMSET', jk, - 'status', 'processing', - 'lockedAt', nowMs, - 'lockedBy', workerId, - 'attempts', attempts + 1, - 'updatedAt', nowMs, - 'pendingReason', 'null', - 'startedAt', startedAt, - 'lastRetriedAt', lastRetriedAt - ) - - -- Update status sets - redis.call('SREM', prefix .. 'status:pending', jobId) - redis.call('SADD', prefix .. 
'status:processing', jobId) + local groupId = redis.call('HGET', jk, 'groupId') + local hasGroup = groupId and groupId ~= 'null' + local canClaim = true + if hasGroup and groupConcurrency then + local activeCount = tonumber(redis.call('HGET', groupActiveKey, groupId) or '0') + if activeCount >= groupConcurrency then + table.insert(putBack, score) + table.insert(putBack, jobId) + canClaim = false + end + end - -- Return job data as flat array - local data = redis.call('HGETALL', jk) - for _, v in ipairs(data) do - table.insert(results, v) + if canClaim then + -- Claim this job + local attempts = tonumber(redis.call('HGET', jk, 'attempts')) + local startedAt = redis.call('HGET', jk, 'startedAt') + local lastRetriedAt = redis.call('HGET', jk, 'lastRetriedAt') + if startedAt == 'null' then startedAt = nowMs end + if attempts > 0 then lastRetriedAt = nowMs end + + redis.call('HMSET', jk, + 'status', 'processing', + 'lockedAt', nowMs, + 'lockedBy', workerId, + 'attempts', attempts + 1, + 'updatedAt', nowMs, + 'pendingReason', 'null', + 'startedAt', startedAt, + 'lastRetriedAt', lastRetriedAt + ) + + -- Update status sets + redis.call('SREM', prefix .. 'status:pending', jobId) + redis.call('SADD', prefix .. 'status:processing', jobId) + if hasGroup and groupConcurrency then + redis.call('HINCRBY', groupActiveKey, groupId, 1) + end + + -- Return job data as flat array + local data = redis.call('HGETALL', jk) + for _, v in ipairs(data) do + table.insert(results, v) + end + -- Separator + table.insert(results, '__JOB_SEP__') + jobsClaimed = jobsClaimed + 1 end - -- Separator - table.insert(results, '__JOB_SEP__') - jobsClaimed = jobsClaimed + 1 end end end @@ -429,6 +460,7 @@ local jobId = ARGV[1] local nowMs = ARGV[2] local outputJson = ARGV[3] local jk = prefix .. 'job:' .. jobId +local groupId = redis.call('HGET', jk, 'groupId') local fields = { 'status', 'completed', @@ -447,6 +479,13 @@ end redis.call('HMSET', jk, unpack(fields)) redis.call('SREM', prefix .. 
'status:processing', jobId) redis.call('SADD', prefix .. 'status:completed', jobId) +if groupId and groupId ~= 'null' then + local activeKey = prefix .. 'group:active' + local remaining = redis.call('HINCRBY', activeKey, groupId, -1) + if tonumber(remaining) <= 0 then + redis.call('HDEL', activeKey, groupId) + end +end return 1 `; @@ -464,6 +503,7 @@ local errorJson = ARGV[2] local failureReason = ARGV[3] local nowMs = tonumber(ARGV[4]) local jk = prefix .. 'job:' .. jobId +local groupId = redis.call('HGET', jk, 'groupId') local attempts = tonumber(redis.call('HGET', jk, 'attempts')) local maxAttempts = tonumber(redis.call('HGET', jk, 'maxAttempts')) @@ -521,6 +561,13 @@ redis.call('HMSET', jk, ) redis.call('SREM', prefix .. 'status:processing', jobId) redis.call('SADD', prefix .. 'status:failed', jobId) +if groupId and groupId ~= 'null' then + local activeKey = prefix .. 'group:active' + local remaining = redis.call('HINCRBY', activeKey, groupId, -1) + if tonumber(remaining) <= 0 then + redis.call('HDEL', activeKey, groupId) + end +end -- Schedule retry if applicable if nextAttemptAt ~= 'null' then @@ -543,6 +590,7 @@ local jk = prefix .. 'job:' .. jobId local oldStatus = redis.call('HGET', jk, 'status') if oldStatus ~= 'failed' and oldStatus ~= 'processing' then return 0 end +local groupId = redis.call('HGET', jk, 'groupId') redis.call('HMSET', jk, 'status', 'pending', @@ -556,6 +604,13 @@ redis.call('HMSET', jk, -- Remove from old status, add to pending redis.call('SREM', prefix .. 'status:' .. oldStatus, jobId) redis.call('SADD', prefix .. 'status:pending', jobId) +if oldStatus == 'processing' and groupId and groupId ~= 'null' then + local activeKey = prefix .. 'group:active' + local remaining = redis.call('HINCRBY', activeKey, groupId, -1) + if tonumber(remaining) <= 0 then + redis.call('HDEL', activeKey, groupId) + end +end -- Remove from retry sorted set if present redis.call('ZREM', prefix .. 
'retry', jobId) @@ -661,6 +716,14 @@ for _, jobId in ipairs(processing) do ) redis.call('SREM', prefix .. 'status:processing', jobId) redis.call('SADD', prefix .. 'status:pending', jobId) + local groupId = redis.call('HGET', jk, 'groupId') + if groupId and groupId ~= 'null' then + local activeKey = prefix .. 'group:active' + local remaining = redis.call('HINCRBY', activeKey, groupId, -1) + if tonumber(remaining) <= 0 then + redis.call('HDEL', activeKey, groupId) + end + end -- Re-add to queue local priority = tonumber(redis.call('HGET', jk, 'priority') or '0') @@ -749,6 +812,7 @@ local jk = prefix .. 'job:' .. jobId local status = redis.call('HGET', jk, 'status') if status ~= 'processing' then return 0 end +local groupId = redis.call('HGET', jk, 'groupId') redis.call('HMSET', jk, 'status', 'waiting', @@ -761,6 +825,13 @@ redis.call('HMSET', jk, ) redis.call('SREM', prefix .. 'status:processing', jobId) redis.call('SADD', prefix .. 'status:waiting', jobId) +if groupId and groupId ~= 'null' then + local activeKey = prefix .. 
'group:active' + local remaining = redis.call('HINCRBY', activeKey, groupId, -1) + if tonumber(remaining) <= 0 then + redis.call('HDEL', activeKey, groupId) + end +end -- Add to waiting sorted set if time-based wait if waitUntilMs ~= 'null' then diff --git a/packages/dataqueue/src/backends/redis.test.ts b/packages/dataqueue/src/backends/redis.test.ts index 6dc490c..fe9cda7 100644 --- a/packages/dataqueue/src/backends/redis.test.ts +++ b/packages/dataqueue/src/backends/redis.test.ts @@ -2180,3 +2180,88 @@ describe('Redis event hooks', () => { expect(listener).toHaveBeenCalledTimes(1); }); }); + +describe('Redis group-based concurrency limits', () => { + let prefix: string; + let jobQueue: ReturnType>; + let redisClient: any; + + beforeEach(async () => { + prefix = createRedisTestPrefix(); + const config: RedisJobQueueConfig = { + backend: 'redis', + redisConfig: { + url: REDIS_URL, + keyPrefix: prefix, + }, + }; + jobQueue = initJobQueue(config); + redisClient = jobQueue.getRedisClient(); + }); + + afterEach(async () => { + await cleanupRedisPrefix(redisClient, prefix); + await redisClient.quit(); + }); + + it('stores group metadata for Redis jobs', async () => { + const jobId = await jobQueue.addJob({ + jobType: 'test', + payload: { foo: 'grouped' }, + group: { id: 'tenant-r1', tier: 'silver' }, + }); + + const job = await jobQueue.getJob(jobId); + expect(job?.groupId).toBe('tenant-r1'); + expect(job?.groupTier).toBe('silver'); + }); + + it('enforces global grouped limits across processor instances', async () => { + await jobQueue.addJob({ + jobType: 'test', + payload: { foo: 'job-1' }, + group: { id: 'tenant-r2' }, + }); + await jobQueue.addJob({ + jobType: 'test', + payload: { foo: 'job-2' }, + group: { id: 'tenant-r2' }, + }); + + let started = 0; + let release!: () => void; + const gate = new Promise((resolve) => { + release = resolve; + }); + + const handler = vi.fn(async () => { + started += 1; + await gate; + }); + + const processorA = 
jobQueue.createProcessor( + { email: vi.fn(), sms: vi.fn(), test: handler }, + { batchSize: 2, concurrency: 2, groupConcurrency: 1 }, + ); + const processorB = jobQueue.createProcessor( + { email: vi.fn(), sms: vi.fn(), test: handler }, + { batchSize: 2, concurrency: 2, groupConcurrency: 1 }, + ); + + const runA = processorA.start(); + await new Promise((resolve) => setTimeout(resolve, 40)); + const processedByB = await processorB.start(); + + expect(processedByB).toBe(0); + expect(started).toBe(1); + + release(); + await runA; + + const pendingAfterA = await jobQueue.getJobsByStatus('pending'); + expect(pendingAfterA).toHaveLength(1); + + const processedByBSecondRun = await processorB.start(); + expect(processedByBSecondRun).toBe(1); + }); +}); diff --git a/packages/dataqueue/src/backends/redis.ts b/packages/dataqueue/src/backends/redis.ts index 82c9239..d394f31 100644 --- a/packages/dataqueue/src/backends/redis.ts +++ b/packages/dataqueue/src/backends/redis.ts @@ -172,6 +172,8 @@ function deserializeJob>( ? false : null, retryDelayMax: numOrNull(h.retryDelayMax), + groupId: nullish(h.groupId) as string | null | undefined, + groupTier: nullish(h.groupTier) as string | null | undefined, output: parseJsonField(h.output), }; } @@ -317,6 +319,7 @@ export class RedisBackend implements QueueBackend { retryDelay = undefined, retryBackoff = undefined, retryDelayMax = undefined, + group = undefined, }: JobOptions, options?: AddJobOptions, ): Promise { @@ -346,6 +349,8 @@ export class RedisBackend implements QueueBackend { retryDelay !== undefined ? retryDelay.toString() : 'null', retryBackoff !== undefined ? retryBackoff.toString() : 'null', retryDelayMax !== undefined ? retryDelayMax.toString() : 'null', + group?.id ?? 'null', + group?.tier ?? 'null', )) as number; const jobId = Number(result); @@ -397,6 +402,8 @@ export class RedisBackend implements QueueBackend { job.retryBackoff !== undefined ? 
job.retryBackoff.toString() : 'null', retryDelayMax: job.retryDelayMax !== undefined ? job.retryDelayMax.toString() : 'null', + groupId: job.group?.id ?? 'null', + groupTier: job.group?.tier ?? 'null', })); const result = (await this.client.eval( @@ -555,6 +562,7 @@ export class RedisBackend implements QueueBackend { workerId: string, batchSize = 10, jobType?: string | string[], + groupConcurrency?: number, ): Promise[]> { const now = this.nowMs(); const jobTypeFilter = @@ -572,6 +580,7 @@ export class RedisBackend implements QueueBackend { batchSize, now, jobTypeFilter, + groupConcurrency !== undefined ? groupConcurrency : 'null', )) as string[]; if (!result || result.length === 0) { diff --git a/packages/dataqueue/src/processor.test.ts b/packages/dataqueue/src/processor.test.ts index 6dfc0f6..a9b3c0a 100644 --- a/packages/dataqueue/src/processor.test.ts +++ b/packages/dataqueue/src/processor.test.ts @@ -436,6 +436,24 @@ describe('concurrency option', () => { await processor.start(); expect(maxParallel).toBe(1); }); + + it('should throw when groupConcurrency is not a positive integer', async () => { + const handlers = { test: vi.fn(async () => {}) }; + expect(() => + createProcessor(backend, handlers, { + groupConcurrency: 0, + }), + ).toThrow( + 'Processor option "groupConcurrency" must be a positive integer when provided.', + ); + expect(() => + createProcessor(backend, handlers, { + groupConcurrency: 1.5, + }), + ).toThrow( + 'Processor option "groupConcurrency" must be a positive integer when provided.', + ); + }); }); describe('per-job timeout', () => { diff --git a/packages/dataqueue/src/processor.ts b/packages/dataqueue/src/processor.ts index c49dbef..feb8d43 100644 --- a/packages/dataqueue/src/processor.ts +++ b/packages/dataqueue/src/processor.ts @@ -754,6 +754,7 @@ export async function processJobWithHandlers< * @param jobType - Optional job type filter. * @param jobHandlers - Map of job type to handler function. 
* @param concurrency - Max parallel jobs within the batch. + * @param groupConcurrency - Optional global per-group concurrency limit. * @param onError - Legacy error callback. * @param emit - Optional callback to emit lifecycle events. */ @@ -764,6 +765,7 @@ export async function processBatchWithHandlers( jobType: string | string[] | undefined, jobHandlers: JobHandlers, concurrency?: number, + groupConcurrency?: number, onError?: (error: Error) => void, emit?: QueueEmitFn, ): Promise { @@ -771,6 +773,7 @@ export async function processBatchWithHandlers( workerId, batchSize, jobType, + groupConcurrency, ); // Emit job:processing for each claimed job @@ -842,8 +845,18 @@ export const createProcessor = ( onError = (error: Error) => console.error('Job processor error:', error), jobType, concurrency = 3, + groupConcurrency, } = options; + if ( + groupConcurrency !== undefined && + (!Number.isInteger(groupConcurrency) || groupConcurrency <= 0) + ) { + throw new Error( + 'Processor option "groupConcurrency" must be a positive integer when provided.', + ); + } + let running = false; let intervalId: NodeJS.Timeout | null = null; let currentBatchPromise: Promise | null = null; @@ -880,6 +893,7 @@ export const createProcessor = ( jobType, handlers, concurrency, + groupConcurrency, onError, emit, ); diff --git a/packages/dataqueue/src/queue.test.ts b/packages/dataqueue/src/queue.test.ts index 736eb0d..eccba86 100644 --- a/packages/dataqueue/src/queue.test.ts +++ b/packages/dataqueue/src/queue.test.ts @@ -2436,3 +2436,110 @@ describe('addJobs batch insert', () => { expect(job1).toBeNull(); }); }); + +describe('group-based concurrency limits (Postgres)', () => { + let pool: Pool; + let dbName: string; + + beforeEach(async () => { + const setup = await createTestDbAndPool(); + pool = setup.pool; + dbName = setup.dbName; + }); + + afterEach(async () => { + await pool.end(); + await destroyTestDb(dbName); + }); + + it('stores and returns group metadata on jobs', async () => { + 
const id = await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'grouped@example.com' }, + group: { id: 'tenant-a', tier: 'gold' }, + }); + + const job = await queue.getJob(pool, id); + expect(job?.groupId).toBe('tenant-a'); + expect(job?.groupTier).toBe('gold'); + }); + + it('enforces global per-group limits across workers', async () => { + await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'a-1@example.com' }, + group: { id: 'tenant-a' }, + }); + await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'a-2@example.com' }, + group: { id: 'tenant-a' }, + }); + await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'b-1@example.com' }, + group: { id: 'tenant-b' }, + }); + + const firstBatch = await queue.getNextBatch( + pool, + 'worker-1', + 10, + undefined, + 1, + ); + expect(firstBatch).toHaveLength(2); + expect(new Set(firstBatch.map((job) => job.groupId))).toEqual( + new Set(['tenant-a', 'tenant-b']), + ); + + const blockedBatch = await queue.getNextBatch( + pool, + 'worker-2', + 10, + undefined, + 1, + ); + expect(blockedBatch).toHaveLength(0); + + const groupAJob = firstBatch.find((job) => job.groupId === 'tenant-a'); + expect(groupAJob).toBeDefined(); + await queue.completeJob(pool, groupAJob!.id); + + const resumedBatch = await queue.getNextBatch( + pool, + 'worker-3', + 10, + undefined, + 1, + ); + expect(resumedBatch).toHaveLength(1); + expect(resumedBatch[0]?.groupId).toBe('tenant-a'); + }); + + it('keeps ungrouped jobs unaffected by groupConcurrency', async () => { + await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'ungrouped-1@example.com' }, + }); + await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'ungrouped-2@example.com' }, + }); + await queue.addJob<{ email: { to: 
string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'grouped-1@example.com' }, + group: { id: 'tenant-c' }, + }); + await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'grouped-2@example.com' }, + group: { id: 'tenant-c' }, + }); + + const batch = await queue.getNextBatch(pool, 'worker-u', 10, undefined, 1); + expect(batch).toHaveLength(3); + expect(batch.filter((job) => job.groupId === 'tenant-c')).toHaveLength(1); + expect(batch.filter((job) => !job.groupId)).toHaveLength(2); + }); +}); diff --git a/packages/dataqueue/src/queue.ts b/packages/dataqueue/src/queue.ts index 2dc620d..ef72e68 100644 --- a/packages/dataqueue/src/queue.ts +++ b/packages/dataqueue/src/queue.ts @@ -73,11 +73,13 @@ export const getNextBatch = async < workerId: string, batchSize = 10, jobType?: string | string[], + groupConcurrency?: number, ): Promise[]> => new PostgresBackend(pool).getNextBatch( workerId, batchSize, jobType, + groupConcurrency, ); export const completeJob = async ( diff --git a/packages/dataqueue/src/types.ts b/packages/dataqueue/src/types.ts index 764d7cd..ad78d0e 100644 --- a/packages/dataqueue/src/types.ts +++ b/packages/dataqueue/src/types.ts @@ -28,6 +28,20 @@ export interface AddJobOptions { db?: DatabaseClient; } +/** + * Optional grouping metadata for a job. + * Use `id` to enforce global per-group concurrency limits when + * `ProcessorOptions.groupConcurrency` is set. + * + * `tier` is reserved for future tier-based policies. + */ +export interface JobGroup { + /** Stable group identifier (for example: tenant ID, user ID, organization ID). */ + id: string; + /** Optional tier label reserved for future tier-based concurrency controls. */ + tier?: string; +} + export interface JobOptions> { jobType: T; payload: PayloadMap[T]; @@ -120,6 +134,12 @@ export interface JobOptions> { * `retryBackoff` is true. No limit when omitted. */ retryDelayMax?: number; + /** + * Optional group metadata for this job. 
+ * When `ProcessorOptions.groupConcurrency` is configured, grouped jobs are + * globally limited by `group.id` across all workers/instances. + */ + group?: JobGroup; } /** @@ -260,6 +280,14 @@ export interface JobRecord> { * Maximum delay cap for retries in seconds, or null if no cap. */ retryDelayMax?: number | null; + /** + * Group identifier for this job, if provided at enqueue time. + */ + groupId?: string | null; + /** + * Group tier for this job, if provided at enqueue time. + */ + groupTier?: string | null; } /** @@ -476,6 +504,13 @@ export interface ProcessorOptions { * - Set to a lower value to avoid resource exhaustion. */ concurrency?: number; + /** + * Global per-group concurrency limit across all workers/instances. + * - Applies only to jobs with `group.id` set. + * - Jobs without a group are unaffected. + * - Disabled when omitted. + */ + groupConcurrency?: number; /** * The interval in milliseconds to poll for new jobs. * - If not provided, the processor will process jobs every 5 seconds when startInBackground is called.