From 7babc308b8e4b0bf6d0815b2808834cf46d3590f Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Tue, 24 Feb 2026 11:59:51 +0100 Subject: [PATCH 1/2] Add dead-letter job type support in DataQueue - Introduced the `deadLetterJobType` option in the `JobOptions` interface, allowing users to specify a destination for jobs that exhaust their retry attempts. - Updated the `JobRecord` interface to include fields for tracking dead-letter job metadata, such as `deadLetteredAt` and `deadLetterJobId`. - Enhanced documentation to explain the dead-letter routing feature, including examples of how to configure it and the structure of the envelope payload for dead-letter jobs. - Implemented logic in both PostgreSQL and Redis backends to create dead-letter jobs when retries are exhausted, preserving original job metadata and failure context. - Added tests to verify the correct behavior of dead-letter routing, ensuring that jobs are properly routed and metadata is accurately recorded. These changes improve error handling and job management in DataQueue, providing users with a robust mechanism for dealing with failed jobs. --- apps/docs/content/docs/api/job-options.mdx | 2 + apps/docs/content/docs/api/job-queue.mdx | 5 +- apps/docs/content/docs/api/job-record.mdx | 5 +- .../content/docs/usage/building-with-ai.mdx | 5 +- apps/docs/content/docs/usage/failed-jobs.mdx | 31 +++ packages/dataqueue/ai/rules/advanced.md | 15 ++ packages/dataqueue/ai/rules/basic.md | 16 ++ .../ai/skills/dataqueue-advanced/SKILL.md | 23 +++ .../ai/skills/dataqueue-core/SKILL.md | 16 ++ .../1781200000007_add_dead_letter_columns.sql | 13 ++ packages/dataqueue/src/backend.ts | 2 + packages/dataqueue/src/backends/postgres.ts | 194 +++++++++++++++--- .../dataqueue/src/backends/redis-scripts.ts | 101 ++++++++- packages/dataqueue/src/backends/redis.test.ts | 72 +++++++ packages/dataqueue/src/backends/redis.ts | 42 +++- packages/dataqueue/src/index.test.ts | 54 +++++ packages/dataqueue/src/index.ts | 2 + packages/dataqueue/src/processor.test.ts | 40 ++++ packages/dataqueue/src/queue.test.ts | 41 ++++ packages/dataqueue/src/queue.ts | 2 + packages/dataqueue/src/types.ts | 46 +++++ 21 files changed, 691 insertions(+), 36 deletions(-) create mode 100644 packages/dataqueue/migrations/1781200000007_add_dead_letter_columns.sql diff --git a/apps/docs/content/docs/api/job-options.mdx b/apps/docs/content/docs/api/job-options.mdx index aac7470..5876bdb 100644 --- a/apps/docs/content/docs/api/job-options.mdx +++ b/apps/docs/content/docs/api/job-options.mdx @@ -22,6 +22,7 @@ The `JobOptions` interface defines the options for creating a new job in the que - `tags?`: _string[]_ — Tags for this job. Used for grouping, searching, or batch operations. - `idempotencyKey?`: _string_ — Optional idempotency key. When provided, ensures that only one job exists for a given key. If a job with the same key already exists, `addJob` returns the existing job's ID instead of creating a duplicate. See [Idempotency](/usage/add-job#idempotency) for details. +- `deadLetterJobType?`: _string_ — Optional dead-letter destination job type. When the job exhausts retries, DataQueue creates a new pending job in this job type with an envelope payload containing source metadata, original payload, and failure context. ## Example @@ -36,5 +37,6 @@ const job = { forceKillOnTimeout: false, // Use graceful shutdown (default) tags: ['welcome', 'user'], // tags for grouping/searching idempotencyKey: 'welcome-email-user-123', // prevent duplicate jobs + deadLetterJobType: 'email_dead_letter', // route exhausted failures }; ``` diff --git a/apps/docs/content/docs/api/job-queue.mdx b/apps/docs/content/docs/api/job-queue.mdx index 55cc717..fd037e0 100644 --- a/apps/docs/content/docs/api/job-queue.mdx +++ b/apps/docs/content/docs/api/job-queue.mdx @@ -115,12 +115,14 @@ interface JobOptions { retryDelay?: number; // Base delay between retries in seconds (default: 60) retryBackoff?: boolean; // Use exponential backoff (default: true) retryDelayMax?: number; // Max delay cap in seconds (default: none) + deadLetterJobType?: string; // Route exhausted failures to this job type } ``` - `retryDelay` - Base delay between retries in seconds. When `retryBackoff` is true, this is the base for exponential backoff (`retryDelay * 2^attempts`). When false, retries use this fixed delay. Default: `60`. - `retryBackoff` - Whether to use exponential backoff. When true, delay doubles with each attempt and includes jitter. Default: `true`. - `retryDelayMax` - Maximum delay cap in seconds. Only meaningful when `retryBackoff` is true. No limit when omitted. +- `deadLetterJobType` - Optional dead-letter destination. When retries are exhausted, a new pending job is created in this job type with an envelope payload (`originalJob`, `originalPayload`, `failure`). #### AddJobOptions @@ -275,10 +277,11 @@ interface EditJobOptions { retryDelay?: number | null; retryBackoff?: boolean | null; retryDelayMax?: number | null; + deadLetterJobType?: string | null; } ``` -All fields are optional - only provided fields will be updated. Note that `jobType` cannot be changed. Set retry fields to `null` to revert to legacy default behavior. +All fields are optional - only provided fields will be updated. Note that `jobType` cannot be changed. Set retry fields to `null` to revert to legacy default behavior. Set `deadLetterJobType` to `null` to clear dead-letter routing for pending jobs. #### Example diff --git a/apps/docs/content/docs/api/job-record.mdx b/apps/docs/content/docs/api/job-record.mdx index f037b79..7fccd34 100644 --- a/apps/docs/content/docs/api/job-record.mdx +++ b/apps/docs/content/docs/api/job-record.mdx @@ -10,7 +10,7 @@ The `JobRecord` interface represents a job stored in the queue, including its st - `jobType`: _string_ — The type of the job. - `payload`: _any_ — The job payload. - `status`: - _'pending' | 'processing' | 'completed' | 'failed' | 'cancelled'_ — + _'pending' | 'processing' | 'completed' | 'failed' | 'cancelled' | 'waiting'_ — Current job status. - `createdAt`: _Date_ — When the job was created. - `updated_at`: _Date_ — When the job was last updated. @@ -42,6 +42,9 @@ The `JobRecord` interface represents a job stored in the queue, including its st - `idempotencyKey?`: _string | null_ — The idempotency key for this job, if one was provided when the job was created. - `progress?`: _number | null_ — Progress percentage (0–100) reported by the handler via `ctx.setProgress()`. `null` if no progress has been reported. See [Progress Tracking](/usage/progress-tracking). - `output?`: _unknown_ — Handler output stored via `ctx.setOutput(data)` or by returning a value from the handler. `null` if no output has been stored. See [Job Output](/usage/job-output). +- `deadLetterJobType?`: _string | null_ — Configured dead-letter destination job type for this job. +- `deadLetteredAt?`: _Date | null_ — Timestamp when this job was routed to a dead-letter job. +- `deadLetterJobId?`: _number | null_ — Linked dead-letter job ID created when retries were exhausted. ## Example diff --git a/apps/docs/content/docs/usage/building-with-ai.mdx b/apps/docs/content/docs/usage/building-with-ai.mdx index 6e1b520..7f0dae8 100644 --- a/apps/docs/content/docs/usage/building-with-ai.mdx +++ b/apps/docs/content/docs/usage/building-with-ai.mdx @@ -140,6 +140,7 @@ Type handlers as `JobHandlers` — TypeScript enforces a handler for 1. Creating initJobQueue per request (creates a DB pool each time) 2. Missing handler for a job type (fails with NoHandler) 3. Not checking signal.aborted in long handlers -4. Forgetting reclaimStuckJobs() — crashed workers leave jobs stuck -5. Skipping migrations (PostgreSQL requires `dataqueue-cli migrate`) +4. Forgetting dead-letter routing for critical jobs — set `deadLetterJobType` so exhausted failures are inspectable/replayable +5. Forgetting reclaimStuckJobs() — crashed workers leave jobs stuck +6. Skipping migrations (PostgreSQL requires `dataqueue-cli migrate`) ``` diff --git a/apps/docs/content/docs/usage/failed-jobs.mdx b/apps/docs/content/docs/usage/failed-jobs.mdx index 623c193..b208a37 100644 --- a/apps/docs/content/docs/usage/failed-jobs.mdx +++ b/apps/docs/content/docs/usage/failed-jobs.mdx @@ -6,6 +6,37 @@ A job handler can fail for many reasons, such as a bug in the code or running ou When a job fails, it is marked as `failed` and retried up to `maxAttempts` times (default: 3). You can view the error history for a job in its `errorHistory` field. +## Dead-letter queues + +You can route permanently failed jobs to a dead-letter job type using `deadLetterJobType`. + +When a job exhausts retries (`attempts >= maxAttempts`), DataQueue: + +1. Keeps the source job as `failed`. +2. Creates a new pending dead-letter job in `deadLetterJobType`. +3. Stores linkage metadata on the source job (`deadLetteredAt`, `deadLetterJobId`). + +```ts +await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'user@example.com' }, + maxAttempts: 3, + deadLetterJobType: 'email_dead_letter', +}); +``` + +The dead-letter job payload is an envelope: + +```ts +{ + originalJob: { id, jobType, attempts, maxAttempts }, + originalPayload: { ... }, // original job payload + failure: { message, reason, failedAt }, +} +``` + +If `deadLetterJobType` is not set, behavior is unchanged: exhausted jobs remain failed without creating a dead-letter job. + ## Retry configuration You can control the retry behavior per-job using three options: diff --git a/packages/dataqueue/ai/rules/advanced.md b/packages/dataqueue/ai/rules/advanced.md index 791836e..1f3a4bb 100644 --- a/packages/dataqueue/ai/rules/advanced.md +++ b/packages/dataqueue/ai/rules/advanced.md @@ -116,6 +116,21 @@ await queue.addJob({ - No config — legacy `2^attempts * 60s` formula (backward compatible). - Cron schedules propagate retry config to enqueued jobs. +## Dead-Letter Routing + +Configure dead-letter capture with `deadLetterJobType`: + +```typescript +await queue.addJob({ + jobType: 'email', + payload, + maxAttempts: 3, + deadLetterJobType: 'email_dead_letter', +}); +``` + +When retries are exhausted, DataQueue creates a pending dead-letter job with envelope payload containing `originalJob`, `originalPayload`, and `failure`. Source jobs remain `failed` and store linkage metadata (`deadLetteredAt`, `deadLetterJobId`). + ## Event Hooks Subscribe to real-time lifecycle events via `on`, `once`, `off`, `removeAllListeners`. Works with both Postgres and Redis. diff --git a/packages/dataqueue/ai/rules/basic.md b/packages/dataqueue/ai/rules/basic.md index 47fe711..8d792a6 100644 --- a/packages/dataqueue/ai/rules/basic.md +++ b/packages/dataqueue/ai/rules/basic.md @@ -149,6 +149,21 @@ Control retry behavior per-job with optional fields on `addJob`: When none are set, the legacy `2^attempts * 60s` formula is used. +## Dead-Letter Queue + +Use `deadLetterJobType` for jobs that must be captured after exhausting retries: + +```typescript +await queue.addJob({ + jobType: 'email', + payload: { to: 'user@example.com', subject: 'Hi', body: '...' }, + maxAttempts: 3, + deadLetterJobType: 'email_dead_letter', +}); +``` + +On exhaustion, the source job stays `failed` and a new pending dead-letter job is created with envelope payload: `{ originalJob, originalPayload, failure }`. + ## Common Mistakes 1. Creating `initJobQueue` per request — use a singleton. @@ -157,3 +172,4 @@ When none are set, the legacy `2^attempts * 60s` formula is used. 4. Skipping maintenance — use `createSupervisor()` to automate reclaim, cleanup, and token expiry. Without it, stuck jobs and old data accumulate. 5. Skipping migrations (PostgreSQL) — run `dataqueue-cli migrate` first. Redis needs none. 6. Using `stop()` instead of `stopAndDrain()` — leaves in-flight jobs stuck. +7. Expecting dead-letter routing without setting `deadLetterJobType` — DLQ is opt-in. diff --git a/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md b/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md index 4936a1b..30be8e9 100644 --- a/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md +++ b/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md @@ -342,6 +342,29 @@ await queue.addCronJob({ Every job enqueued by the schedule inherits the retry settings. +### Dead-letter routing + +Set `deadLetterJobType` on jobs (or cron schedules) to route exhausted failures: + +```typescript +await queue.addJob({ + jobType: 'email', + payload: { to: 'user@example.com' }, + maxAttempts: 3, + deadLetterJobType: 'email_dead_letter', +}); +``` + +Dead-letter jobs receive envelope payload: + +```typescript +{ + originalJob: { id, jobType, attempts, maxAttempts }, + originalPayload: {...}, + failure: { message, reason, failedAt } +} +``` + ### Default behavior When no retry options are set, the legacy formula `2^attempts * 60 seconds` is used. This is fully backward compatible. diff --git a/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md b/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md index 3ee6ae4..ee2b972 100644 --- a/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md +++ b/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md @@ -182,6 +182,21 @@ await queue.addJob({ - **Exponential backoff** (default): delay doubles each attempt with jitter. - **Default**: when no retry options are set, legacy `2^attempts * 60s` is used. +### Dead-letter queues + +Route exhausted failures into a dedicated job type with `deadLetterJobType`: + +```typescript +await queue.addJob({ + jobType: 'send_email', + payload: { to: 'user@example.com', subject: 'Hi', body: 'Hello' }, + maxAttempts: 3, + deadLetterJobType: 'email_dead_letter', +}); +``` + +When retries are exhausted, DataQueue keeps the source job as `failed` and creates a new pending dead-letter job with envelope payload: `{ originalJob, originalPayload, failure }`. + ## Step 5: Process Jobs ### Serverless (one-shot) @@ -233,3 +248,4 @@ process.on('SIGTERM', async () => { 6. **Not calling `stopAndDrain` on shutdown** — use `stopAndDrain()` (not `stop()`) for graceful shutdown to avoid stuck jobs. 7. **Forgetting to commit/rollback when using `db` option** — the `addJob` INSERT sits in an open transaction. If you never `COMMIT` or `ROLLBACK`, the connection leaks and the job is invisible to other sessions. 8. **Using `db` option with Redis** — transactional job creation is PostgreSQL only. The Redis backend throws if `db` is provided. +9. **Expecting dead-letter routing without configuration** — DLQ is opt-in. Set `deadLetterJobType` on jobs (or cron schedules) that require dead-letter capture. diff --git a/packages/dataqueue/migrations/1781200000007_add_dead_letter_columns.sql b/packages/dataqueue/migrations/1781200000007_add_dead_letter_columns.sql new file mode 100644 index 0000000..6653ebe --- /dev/null +++ b/packages/dataqueue/migrations/1781200000007_add_dead_letter_columns.sql @@ -0,0 +1,13 @@ +-- Up Migration +ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS dead_letter_job_type VARCHAR(255); +ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS dead_lettered_at TIMESTAMPTZ; +ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS dead_letter_job_id INT; + +ALTER TABLE cron_schedules ADD COLUMN IF NOT EXISTS dead_letter_job_type VARCHAR(255); + +-- Down Migration +ALTER TABLE job_queue DROP COLUMN IF EXISTS dead_letter_job_type; +ALTER TABLE job_queue DROP COLUMN IF EXISTS dead_lettered_at; +ALTER TABLE job_queue DROP COLUMN IF EXISTS dead_letter_job_id; + +ALTER TABLE cron_schedules DROP COLUMN IF EXISTS dead_letter_job_type; diff --git a/packages/dataqueue/src/backend.ts b/packages/dataqueue/src/backend.ts index 6ccf2a2..0287967 100644 --- a/packages/dataqueue/src/backend.ts +++ b/packages/dataqueue/src/backend.ts @@ -43,6 +43,7 @@ export interface JobUpdates { retryDelay?: number | null; retryBackoff?: boolean | null; retryDelayMax?: number | null; + deadLetterJobType?: string | null; } /** @@ -65,6 +66,7 @@ export interface CronScheduleInput { retryDelay: number | null; retryBackoff: boolean | null; retryDelayMax: number | null; + deadLetterJobType: string | null; } /** diff --git a/packages/dataqueue/src/backends/postgres.ts b/packages/dataqueue/src/backends/postgres.ts index e7475ae..7a4252a 100644 --- a/packages/dataqueue/src/backends/postgres.ts +++ b/packages/dataqueue/src/backends/postgres.ts @@ -14,6 +14,7 @@ import { CreateTokenOptions, AddJobOptions, DatabaseClient, + DeadLetterPayloadEnvelope, } from '../types.js'; import { randomUUID } from 'crypto'; import { @@ -61,6 +62,35 @@ function parseTimeoutString(timeout: string): number { return ms; } +/** + * Build the envelope payload persisted to the dead-letter job. + */ +function buildDeadLetterPayload(params: { + sourceJobId: number; + sourceJobType: string; + attempts: number; + maxAttempts: number; + originalPayload: unknown; + errorMessage: string; + failureReason: FailureReason | null; + failedAtIso: string; +}): DeadLetterPayloadEnvelope { + return { + originalJob: { + id: params.sourceJobId, + jobType: params.sourceJobType, + attempts: params.attempts, + maxAttempts: params.maxAttempts, + }, + originalPayload: params.originalPayload, + failure: { + message: params.errorMessage, + reason: params.failureReason, + failedAt: params.failedAtIso, + }, + }; +} + export class PostgresBackend implements QueueBackend { constructor(private pool: Pool) {} @@ -127,6 +157,7 @@ export class PostgresBackend implements QueueBackend { retryDelay = undefined, retryBackoff = undefined, retryDelayMax = undefined, + deadLetterJobType = undefined, }: JobOptions, options?: AddJobOptions, ): Promise { @@ -142,8 +173,8 @@ export class PostgresBackend implements QueueBackend { if (runAt) { result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) + (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ${onConflict} RETURNING id`, [ @@ -159,13 +190,14 @@ export class PostgresBackend implements QueueBackend { retryDelay ?? null, retryBackoff ?? null, retryDelayMax ?? null, + deadLetterJobType ?? null, ], ); } else { result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + (job_type, payload, max_attempts, priority, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ${onConflict} RETURNING id`, [ @@ -180,6 +212,7 @@ export class PostgresBackend implements QueueBackend { retryDelay ?? null, retryBackoff ?? null, retryDelayMax ?? null, + deadLetterJobType ?? null, ], ); } @@ -251,7 +284,7 @@ export class PostgresBackend implements QueueBackend { const client: DatabaseClient = externalClient ?? (await this.pool.connect()); try { - const COLS_PER_JOB = 12; + const COLS_PER_JOB = 13; const valueClauses: string[] = []; const params: any[] = []; @@ -271,6 +304,7 @@ export class PostgresBackend implements QueueBackend { retryDelay = undefined, retryBackoff = undefined, retryDelayMax = undefined, + deadLetterJobType = undefined, } = jobs[i]; const base = i * COLS_PER_JOB; @@ -278,7 +312,7 @@ export class PostgresBackend implements QueueBackend { `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, ` + `COALESCE($${base + 5}::timestamptz, CURRENT_TIMESTAMP), ` + `$${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, ` + - `$${base + 10}, $${base + 11}, $${base + 12})`, + `$${base + 10}, $${base + 11}, $${base + 12}, $${base + 13})`, ); params.push( jobType, @@ -293,6 +327,7 @@ export class PostgresBackend implements QueueBackend { retryDelay ?? null, retryBackoff ?? null, retryDelayMax ?? null, + deadLetterJobType ?? null, ); } @@ -302,7 +337,7 @@ export class PostgresBackend implements QueueBackend { const result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max) + (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type) VALUES ${valueClauses.join(', ')} ${onConflict} RETURNING id, idempotency_key`, @@ -424,7 +459,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output FROM job_queue WHERE id = $1`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", output FROM job_queue WHERE id = $1`, [id], ); @@ -458,7 +493,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output FROM job_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", output FROM job_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, [status, limit, offset], ); log(`Found ${result.rows.length} jobs by status ${status}`); @@ -484,7 +519,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output FROM job_queue ORDER BY created_at DESC LIMIT $1 OFFSET $2`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", output FROM job_queue ORDER BY created_at DESC LIMIT $1 OFFSET $2`, [limit, offset], ); log(`Found ${result.rows.length} jobs (all)`); @@ -509,7 +544,7 @@ export class PostgresBackend implements QueueBackend { ): Promise[]> { const client = await this.pool.connect(); try { - let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output FROM job_queue`; + let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", output FROM job_queue`; const params: any[] = []; const where: string[] = []; let paramIdx = 1; @@ -636,7 +671,7 @@ export class PostgresBackend implements QueueBackend { ): Promise[]> { const client = await this.pool.connect(); try { - let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output + let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", output FROM job_queue`; let params: any[] = []; switch (mode) { @@ -738,7 +773,7 @@ export class PostgresBackend implements QueueBackend { LIMIT $2 FOR UPDATE SKIP LOCKED ) - RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", output + RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", output `, params, ); @@ -807,6 +842,9 @@ export class PostgresBackend implements QueueBackend { ): Promise { const client = await this.pool.connect(); try { + await client.query('BEGIN'); + const failedAt = new Date(); + const failedAtIso = failedAt.toISOString(); const result = await client.query( ` UPDATE job_queue @@ -828,13 +866,20 @@ export class PostgresBackend implements QueueBackend { failure_reason = $3, last_failed_at = NOW() WHERE id = $1 AND status IN ('processing', 'pending') + RETURNING id, + job_type AS "jobType", + payload, + attempts, + max_attempts AS "maxAttempts", + next_attempt_at AS "nextAttemptAt", + dead_letter_job_type AS "deadLetterJobType" `, [ jobId, JSON.stringify([ { message: error.message || String(error), - timestamp: new Date().toISOString(), + timestamp: failedAtIso, }, ]), failureReason ?? null, @@ -844,13 +889,85 @@ export class PostgresBackend implements QueueBackend { log( `Job ${jobId} could not be failed (not in processing/pending state or does not exist)`, ); + await client.query('COMMIT'); + return; } - await this.recordJobEvent(jobId, JobEventType.Failed, { - message: error.message || String(error), - failureReason, - }); - log(`Failed job ${jobId}`); + + const failedJob = result.rows[0] as { + id: number; + jobType: string; + payload: unknown; + attempts: number; + maxAttempts: number; + nextAttemptAt: Date | null; + deadLetterJobType: string | null; + }; + + let deadLetterJobId: number | null = null; + const isExhausted = failedJob.nextAttemptAt == null; + + if (isExhausted && failedJob.deadLetterJobType) { + const deadLetterPayload = buildDeadLetterPayload({ + sourceJobId: failedJob.id, + sourceJobType: failedJob.jobType, + attempts: failedJob.attempts, + maxAttempts: failedJob.maxAttempts, + originalPayload: failedJob.payload, + errorMessage: error.message || String(error), + failureReason: failureReason ?? null, + failedAtIso, + }); + + const deadLetterInsert = await client.query( + `INSERT INTO job_queue + (job_type, payload, max_attempts, priority, run_at) + VALUES ($1, $2, $3, $4, NOW()) + RETURNING id`, + [failedJob.deadLetterJobType, deadLetterPayload, 1, 0], + ); + + deadLetterJobId = deadLetterInsert.rows[0].id; + await client.query( + `UPDATE job_queue + SET dead_lettered_at = NOW(), + dead_letter_job_id = $2 + WHERE id = $1`, + [failedJob.id, deadLetterJobId], + ); + + await client.query( + `INSERT INTO job_events (job_id, event_type, metadata) VALUES ($1, $2, $3)`, + [ + deadLetterJobId, + JobEventType.Added, + JSON.stringify({ + jobType: failedJob.deadLetterJobType, + sourceJobId: failedJob.id, + sourceJobType: failedJob.jobType, + }), + ], + ); + } + + await client.query( + `INSERT INTO job_events (job_id, event_type, metadata) VALUES ($1, $2, $3)`, + [ + jobId, + JobEventType.Failed, + JSON.stringify({ + message: error.message || String(error), + failureReason, + deadLetterJobId, + }), + ], + ); + + await client.query('COMMIT'); + log( + `Failed job ${jobId}${deadLetterJobId ? ` and routed to dead-letter job ${deadLetterJobId}` : ''}`, + ); } catch (err) { + await client.query('ROLLBACK'); log(`Error failing job ${jobId}: ${err}`); throw err; } finally { @@ -1104,6 +1221,10 @@ export class PostgresBackend implements QueueBackend { updateFields.push(`retry_delay_max = $${paramIdx++}`); params.push(updates.retryDelayMax ?? null); } + if (updates.deadLetterJobType !== undefined) { + updateFields.push(`dead_letter_job_type = $${paramIdx++}`); + params.push(updates.deadLetterJobType ?? null); + } if (updateFields.length === 0) { log(`No fields to update for job ${jobId}`); @@ -1136,6 +1257,8 @@ export class PostgresBackend implements QueueBackend { metadata.retryBackoff = updates.retryBackoff; if (updates.retryDelayMax !== undefined) metadata.retryDelayMax = updates.retryDelayMax; + if (updates.deadLetterJobType !== undefined) + metadata.deadLetterJobType = updates.deadLetterJobType; await this.recordJobEvent(jobId, JobEventType.Edited, metadata); log(`Edited job ${jobId}: ${JSON.stringify(metadata)}`); @@ -1197,6 +1320,10 @@ export class PostgresBackend implements QueueBackend { updateFields.push(`retry_delay_max = $${paramIdx++}`); params.push(updates.retryDelayMax ?? null); } + if (updates.deadLetterJobType !== undefined) { + updateFields.push(`dead_letter_job_type = $${paramIdx++}`); + params.push(updates.deadLetterJobType ?? null); + } if (updateFields.length === 0) { log(`No fields to update for batch edit`); @@ -1291,6 +1418,14 @@ export class PostgresBackend implements QueueBackend { if (updates.timeoutMs !== undefined) metadata.timeoutMs = updates.timeoutMs; if (updates.tags !== undefined) metadata.tags = updates.tags; + if (updates.retryDelay !== undefined) + metadata.retryDelay = updates.retryDelay; + if (updates.retryBackoff !== undefined) + metadata.retryBackoff = updates.retryBackoff; + if (updates.retryDelayMax !== undefined) + metadata.retryDelayMax = updates.retryDelayMax; + if (updates.deadLetterJobType !== undefined) + metadata.deadLetterJobType = updates.deadLetterJobType; for (const row of result.rows) { await this.recordJobEvent(row.id, JobEventType.Edited, metadata); @@ -1467,8 +1602,8 @@ export class PostgresBackend implements QueueBackend { `INSERT INTO cron_schedules (schedule_name, cron_expression, job_type, payload, max_attempts, priority, timeout_ms, force_kill_on_timeout, tags, timezone, - allow_overlap, next_run_at, retry_delay, retry_backoff, retry_delay_max) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + allow_overlap, next_run_at, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) RETURNING id`, [ input.scheduleName, @@ -1486,6 +1621,7 @@ export class PostgresBackend implements QueueBackend { input.retryDelay, input.retryBackoff, input.retryDelayMax, + input.deadLetterJobType, ], ); const id = result.rows[0].id; @@ -1519,7 +1655,8 @@ export class PostgresBackend implements QueueBackend { next_run_at AS "nextRunAt", created_at AS "createdAt", updated_at AS "updatedAt", retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", - retry_delay_max AS "retryDelayMax" + retry_delay_max AS "retryDelayMax", + dead_letter_job_type AS "deadLetterJobType" FROM cron_schedules WHERE id = $1`, [id], ); @@ -1549,7 +1686,8 @@ export class PostgresBackend implements QueueBackend { next_run_at AS "nextRunAt", created_at AS "createdAt", updated_at AS "updatedAt", retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", - retry_delay_max AS "retryDelayMax" + retry_delay_max AS "retryDelayMax", + dead_letter_job_type AS "deadLetterJobType" FROM cron_schedules WHERE schedule_name = $1`, [name], ); @@ -1578,7 +1716,8 @@ export class PostgresBackend implements QueueBackend { next_run_at AS "nextRunAt", created_at AS "createdAt", updated_at AS "updatedAt", retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", - retry_delay_max AS "retryDelayMax" + retry_delay_max AS "retryDelayMax", + dead_letter_job_type AS "deadLetterJobType" FROM cron_schedules`; const params: any[] = []; if (status) { @@ -1704,6 +1843,10 @@ export class PostgresBackend implements QueueBackend { updateFields.push(`retry_delay_max = $${paramIdx++}`); params.push(updates.retryDelayMax); } + if (updates.deadLetterJobType !== undefined) { + updateFields.push(`dead_letter_job_type = $${paramIdx++}`); + params.push(updates.deadLetterJobType); + } if (nextRunAt !== undefined) { updateFields.push(`next_run_at = $${paramIdx++}`); params.push(nextRunAt); @@ -1745,7 +1888,8 @@ export class PostgresBackend implements QueueBackend { next_run_at AS "nextRunAt", created_at AS "createdAt", updated_at AS "updatedAt", retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", - retry_delay_max AS "retryDelayMax" + retry_delay_max AS "retryDelayMax", + dead_letter_job_type AS "deadLetterJobType" FROM cron_schedules WHERE status = 'active' AND next_run_at IS NOT NULL diff --git a/packages/dataqueue/src/backends/redis-scripts.ts b/packages/dataqueue/src/backends/redis-scripts.ts index 000d0ed..d765f9f 100644 --- a/packages/dataqueue/src/backends/redis-scripts.ts +++ b/packages/dataqueue/src/backends/redis-scripts.ts @@ -32,7 +32,7 @@ const SCORE_RANGE = '1000000000000000'; // 1e15 * KEYS: [prefix] * ARGV: [jobType, payloadJson, maxAttempts, priority, runAtMs, timeoutMs, * forceKillOnTimeout, tagsJson, idempotencyKey, nowMs, - * retryDelay, retryBackoff, retryDelayMax] + * retryDelay, retryBackoff, retryDelayMax, deadLetterJobType] * Returns: job ID (number) */ export const ADD_JOB_SCRIPT = ` @@ -50,6 +50,7 @@ local nowMs = tonumber(ARGV[10]) local retryDelay = ARGV[11] -- "null" or seconds string local retryBackoff = ARGV[12] -- "null" or "true"/"false" local retryDelayMax = ARGV[13] -- "null" or seconds string +local deadLetterJobType = ARGV[14] -- "null" or jobType string -- Idempotency check if idempotencyKey ~= "null" then @@ -96,7 +97,10 @@ redis.call('HMSET', jobKey, 'stepData', 'null', 'retryDelay', retryDelay, 'retryBackoff', retryBackoff, - 'retryDelayMax', retryDelayMax + 'retryDelayMax', retryDelayMax, + 'deadLetterJobType', deadLetterJobType, + 'deadLetteredAt', 'null', + 'deadLetterJobId', 'null' ) -- Status index @@ -145,7 +149,7 @@ return id * jobsJson is a JSON array of objects, each with: * jobType, payload (already JSON string), maxAttempts, priority, * runAtMs, timeoutMs, forceKillOnTimeout, tags (JSON or "null"), - * idempotencyKey + * idempotencyKey, retryDelay, retryBackoff, retryDelayMax, deadLetterJobType * Returns: array of job IDs (one per input job, in order) */ export const ADD_JOBS_SCRIPT = ` @@ -169,6 +173,7 @@ for i, job in ipairs(jobs) do local retryDelay = tostring(job.retryDelay) local retryBackoff = tostring(job.retryBackoff) local retryDelayMax = tostring(job.retryDelayMax) + local deadLetterJobType = tostring(job.deadLetterJobType) -- Idempotency check local skip = false @@ -218,7 +223,10 @@ for i, job in ipairs(jobs) do 'stepData', 'null', 'retryDelay', retryDelay, 'retryBackoff', retryBackoff, - 'retryDelayMax', retryDelayMax + 'retryDelayMax', retryDelayMax, + 'deadLetterJobType', deadLetterJobType, + 'deadLetteredAt', 'null', + 'deadLetterJobId', 'null' ) -- Status index @@ -456,6 +464,7 @@ return 1 * KEYS: [prefix] * ARGV: [jobId, errorJson, failureReason, nowMs] * errorJson: JSON array like [{"message":"...", "timestamp":"..."}] + * Returns: dead-letter job ID when routed, otherwise 0 */ export const FAIL_JOB_SCRIPT = ` local prefix = KEYS[1] @@ -472,6 +481,7 @@ local maxAttempts = tonumber(redis.call('HGET', jk, 'maxAttempts')) local rdRaw = redis.call('HGET', jk, 'retryDelay') local rbRaw = redis.call('HGET', jk, 'retryBackoff') local rmRaw = redis.call('HGET', jk, 'retryDelayMax') +local deadLetterJobType = redis.call('HGET', jk, 'deadLetterJobType') local nextAttemptAt = 'null' if attempts < maxAttempts then @@ -511,6 +521,8 @@ for _, e in ipairs(newErrors) do table.insert(arr, e) end +local deadLetterJobId = 0 + redis.call('HMSET', jk, 'status', 'failed', 'updatedAt', nowMs, @@ -527,7 +539,86 @@ if nextAttemptAt ~= 'null' then redis.call('ZADD', prefix .. 'retry', nextAttemptAt, jobId) end -return 1 +-- Route exhausted jobs to a configured dead-letter job type. +if nextAttemptAt == 'null' and deadLetterJobType and deadLetterJobType ~= 'null' then + local sourceJobType = redis.call('HGET', jk, 'jobType') + local payloadRaw = redis.call('HGET', jk, 'payload') + local originalPayload = payloadRaw + local payloadOk, payloadDecoded = pcall(cjson.decode, payloadRaw) + if payloadOk then originalPayload = payloadDecoded end + + local failureMessage = 'Unknown error' + if #newErrors > 0 and newErrors[1].message then + failureMessage = newErrors[1].message + end + + local envelope = cjson.encode({ + originalJob = { + id = tonumber(jobId), + jobType = sourceJobType, + attempts = attempts, + maxAttempts = maxAttempts + }, + originalPayload = originalPayload, + failure = { + message = failureMessage, + reason = failureReason ~= 'null' and failureReason or cjson.null, + failedAt = newErrors[1] and newErrors[1].timestamp or cjson.null + } + }) + + deadLetterJobId = redis.call('INCR', prefix .. 'id_seq') + local dlqKey = prefix .. 'job:' .. deadLetterJobId + redis.call('HMSET', dlqKey, + 'id', deadLetterJobId, + 'jobType', deadLetterJobType, + 'payload', envelope, + 'status', 'pending', + 'maxAttempts', 1, + 'attempts', 0, + 'priority', 0, + 'runAt', nowMs, + 'timeoutMs', 'null', + 'forceKillOnTimeout', 'false', + 'createdAt', nowMs, + 'updatedAt', nowMs, + 'lockedAt', 'null', + 'lockedBy', 'null', + 'nextAttemptAt', 'null', + 'pendingReason', 'null', + 'errorHistory', '[]', + 'failureReason', 'null', + 'completedAt', 'null', + 'startedAt', 'null', + 'lastRetriedAt', 'null', + 'lastFailedAt', 'null', + 'lastCancelledAt', 'null', + 'tags', 'null', + 'idempotencyKey', 'null', + 'waitUntil', 'null', + 'waitTokenId', 'null', + 'stepData', 'null', + 'retryDelay', 'null', + 'retryBackoff', 'null', + 'retryDelayMax', 'null', + 'deadLetterJobType', 'null', + 'deadLetteredAt', 'null', + 'deadLetterJobId', 'null' + ) + + redis.call('SADD', prefix .. 'status:pending', deadLetterJobId) + redis.call('SADD', prefix .. 'type:' .. deadLetterJobType, deadLetterJobId) + redis.call('ZADD', prefix .. 'all', nowMs, deadLetterJobId) + local dlqScore = 0 * ${SCORE_RANGE} + (${SCORE_RANGE} - nowMs) + redis.call('ZADD', prefix .. 'queue', dlqScore, deadLetterJobId) + + redis.call('HMSET', jk, + 'deadLetteredAt', nowMs, + 'deadLetterJobId', deadLetterJobId + ) +end + +return deadLetterJobId `; /** diff --git a/packages/dataqueue/src/backends/redis.test.ts b/packages/dataqueue/src/backends/redis.test.ts index 6dc490c..0b32659 100644 --- a/packages/dataqueue/src/backends/redis.test.ts +++ b/packages/dataqueue/src/backends/redis.test.ts @@ -118,6 +118,46 @@ describe('Redis backend integration', () => { expect(job?.status).toBe('pending'); }); + it('should route exhausted jobs to a dead-letter job type when configured', async () => { + // Setup + const sourceJobId = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'deadletter@example.com' }, + maxAttempts: 1, + deadLetterJobType: 'email', + }); + + // Act + const processor = jobQueue.createProcessor({ + email: async () => { + throw new Error('permanent redis failure'); + }, + sms: vi.fn(async () => {}), + test: vi.fn(async () => {}), + }); + await processor.start(); + + // Assert + const sourceJob = await jobQueue.getJob(sourceJobId); + expect(sourceJob?.status).toBe('failed'); + expect(sourceJob?.nextAttemptAt).toBeNull(); + expect(sourceJob?.deadLetterJobType).toBe('email'); + expect(sourceJob?.deadLetteredAt).not.toBeNull(); + expect(sourceJob?.deadLetterJobId).not.toBeNull(); + + const deadLetterJob = await jobQueue.getJob(sourceJob!.deadLetterJobId!); + expect(deadLetterJob).not.toBeNull(); + expect(deadLetterJob?.status).toBe('pending'); + expect(deadLetterJob?.maxAttempts).toBe(1); + + const envelope = deadLetterJob?.payload as any; + expect(envelope.originalJob.id).toBe(sourceJobId); + expect(envelope.originalJob.jobType).toBe('email'); + expect(envelope.originalPayload).toEqual({ to: 'deadletter@example.com' }); + expect(envelope.failure.message).toBe('permanent redis failure'); + expect(envelope.failure.reason).toBe('handler_error'); + }); + it('should cancel a pending job', async () => { const jobId = await jobQueue.addJob({ jobType: 'email', @@ -789,6 +829,38 @@ describe('Redis cron schedules integration', () => { expect(schedule!.nextRunAt).toBeInstanceOf(Date); }); + it('stores deadLetterJobType on cron schedule and propagates it to enqueued jobs', async () => { + // Setup + const id = await jobQueue.addCronJob({ + scheduleName: 'cron-dead-letter-redis', + cronExpression: '* * * * *', + jobType: 'email', + payload: { to: 'redis-cron-dlq@example.com' }, + deadLetterJobType: 'email', + }); + + const pastMs = (Date.now() - 60_000).toString(); + await redisClient.hset(`${prefix}cron:${id}`, 'nextRunAt', pastMs); + await redisClient.zadd(`${prefix}cron_due`, Number(pastMs), id.toString()); + + // Act + const count = await jobQueue.enqueueDueCronJobs(); + + // Assert + expect(count).toBe(1); + const schedule = await jobQueue.getCronJob(id); + expect(schedule?.deadLetterJobType).toBe('email'); + + const jobs = await jobQueue.getJobsByStatus('pending'); + const cronJob = jobs.find( + (j) => + j.jobType === 'email' && + (j.payload as any).to === 'redis-cron-dlq@example.com', + ); + expect(cronJob).toBeDefined(); + expect(cronJob?.deadLetterJobType).toBe('email'); + }); + it('retrieves a cron schedule by name', async () => { // Setup await jobQueue.addCronJob({ diff --git a/packages/dataqueue/src/backends/redis.ts b/packages/dataqueue/src/backends/redis.ts index 82c9239..88d9106 100644 --- a/packages/dataqueue/src/backends/redis.ts +++ b/packages/dataqueue/src/backends/redis.ts @@ -172,6 +172,9 @@ function deserializeJob>( ? false : null, retryDelayMax: numOrNull(h.retryDelayMax), + deadLetterJobType: nullish(h.deadLetterJobType) as string | null | undefined, + deadLetteredAt: dateOrNull(h.deadLetteredAt), + deadLetterJobId: numOrNull(h.deadLetterJobId), output: parseJsonField(h.output), }; } @@ -317,6 +320,7 @@ export class RedisBackend implements QueueBackend { retryDelay = undefined, retryBackoff = undefined, retryDelayMax = undefined, + deadLetterJobType = undefined, }: JobOptions, options?: AddJobOptions, ): Promise { @@ -346,6 +350,7 @@ export class RedisBackend implements QueueBackend { retryDelay !== undefined ? retryDelay.toString() : 'null', retryBackoff !== undefined ? retryBackoff.toString() : 'null', retryDelayMax !== undefined ? retryDelayMax.toString() : 'null', + deadLetterJobType ?? 'null', )) as number; const jobId = Number(result); @@ -397,6 +402,7 @@ export class RedisBackend implements QueueBackend { job.retryBackoff !== undefined ? job.retryBackoff.toString() : 'null', retryDelayMax: job.retryDelayMax !== undefined ? job.retryDelayMax.toString() : 'null', + deadLetterJobType: job.deadLetterJobType ?? 'null', })); const result = (await this.client.eval( @@ -632,7 +638,7 @@ export class RedisBackend implements QueueBackend { timestamp: new Date(now).toISOString(), }, ]); - await this.client.eval( + const result = (await this.client.eval( FAIL_JOB_SCRIPT, 1, this.prefix, @@ -640,11 +646,28 @@ export class RedisBackend implements QueueBackend { errorJson, failureReason ?? 'null', now, - ); + )) as number; + const deadLetterJobId = Number(result) || null; await this.recordJobEvent(jobId, JobEventType.Failed, { message: error.message || String(error), failureReason, + deadLetterJobId, }); + if (deadLetterJobId) { + const sourceJob = await this.client.hget( + `${this.prefix}job:${jobId}`, + 'jobType', + ); + const deadLetterJobType = await this.client.hget( + `${this.prefix}job:${deadLetterJobId}`, + 'jobType', + ); + await this.recordJobEvent(deadLetterJobId, JobEventType.Added, { + jobType: deadLetterJobType, + sourceJobId: jobId, + sourceJobType: sourceJob, + }); + } log(`Failed job ${jobId}`); } @@ -844,6 +867,10 @@ export class RedisBackend implements QueueBackend { ); metadata.retryDelayMax = updates.retryDelayMax; } + if (updates.deadLetterJobType !== undefined) { + fields.push('deadLetterJobType', updates.deadLetterJobType ?? 'null'); + metadata.deadLetterJobType = updates.deadLetterJobType; + } if (fields.length === 0) { log(`No fields to update for job ${jobId}`); @@ -1435,6 +1462,10 @@ export class RedisBackend implements QueueBackend { input.retryDelayMax !== null && input.retryDelayMax !== undefined ? input.retryDelayMax.toString() : 'null', + 'deadLetterJobType', + input.deadLetterJobType !== null && input.deadLetterJobType !== undefined + ? input.deadLetterJobType + : 'null', ]; await (this.client as any).hmset(key, ...fields); @@ -1638,6 +1669,12 @@ export class RedisBackend implements QueueBackend { : 'null', ); } + if (updates.deadLetterJobType !== undefined) { + fields.push( + 'deadLetterJobType', + updates.deadLetterJobType !== null ? updates.deadLetterJobType : 'null', + ); + } if (nextRunAt !== undefined) { const val = nextRunAt !== null ? nextRunAt.getTime().toString() : 'null'; fields.push('nextRunAt', val); @@ -1786,6 +1823,7 @@ export class RedisBackend implements QueueBackend { ? false : null, retryDelayMax: numOrNull(h.retryDelayMax), + deadLetterJobType: nullish(h.deadLetterJobType), }; } diff --git a/packages/dataqueue/src/index.test.ts b/packages/dataqueue/src/index.test.ts index 1ae4752..8f1e6ea 100644 --- a/packages/dataqueue/src/index.test.ts +++ b/packages/dataqueue/src/index.test.ts @@ -571,6 +571,35 @@ describe('index integration', () => { expect(delaySec).toBeGreaterThanOrEqual(9); expect(delaySec).toBeLessThanOrEqual(11); }); + + it('should route exhausted jobs to dead-letter job type through public API', async () => { + const sourceJobId = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'public-dlq@example.com' }, + maxAttempts: 1, + deadLetterJobType: 'email', + }); + + const processor = jobQueue.createProcessor({ + email: async () => { + throw new Error('public api permanent failure'); + }, + sms: vi.fn(async () => {}), + test: vi.fn(async () => {}), + }); + await processor.start(); + + const sourceJob = await jobQueue.getJob(sourceJobId); + expect(sourceJob?.status).toBe('failed'); + expect(sourceJob?.deadLetterJobId).not.toBeNull(); + expect(sourceJob?.deadLetteredAt).not.toBeNull(); + + const deadLetterJob = await jobQueue.getJob(sourceJob!.deadLetterJobId!); + expect(deadLetterJob?.status).toBe('pending'); + const envelope = deadLetterJob?.payload as any; + expect(envelope.originalJob.id).toBe(sourceJobId); + expect(envelope.failure.message).toBe('public api permanent failure'); + }); }); describe('cron schedules integration', () => { @@ -978,6 +1007,31 @@ describe('cron schedules integration', () => { expect(schedule?.retryBackoff).toBe(true); expect(schedule?.retryDelayMax).toBe(600); }); + + it('should propagate deadLetterJobType from cron schedule to enqueued jobs', async () => { + const cronId = await jobQueue.addCronJob({ + scheduleName: 'cron-dead-letter-propagation', + cronExpression: '* * * * *', + jobType: 'email', + payload: { to: 'cron-dlq@example.com' }, + deadLetterJobType: 'email', + }); + + await pool.query( + `UPDATE cron_schedules SET next_run_at = NOW() - interval '1 second' WHERE id = $1`, + [cronId], + ); + + const count = await jobQueue.enqueueDueCronJobs(); + expect(count).toBe(1); + + const pendingJobs = await jobQueue.getJobsByStatus('pending', 10, 0); + const created = pendingJobs.find( + (job) => (job.payload as any)?.to === 'cron-dlq@example.com', + ); + expect(created).toBeDefined(); + expect(created?.deadLetterJobType).toBe('email'); + }); }); // ── BYOC (Bring Your Own Connection) tests ────────────────────────────── diff --git a/packages/dataqueue/src/index.ts b/packages/dataqueue/src/index.ts index 405f392..1e51945 100644 --- a/packages/dataqueue/src/index.ts +++ b/packages/dataqueue/src/index.ts @@ -120,6 +120,7 @@ export const initJobQueue = ( retryDelay: schedule.retryDelay ?? undefined, retryBackoff: schedule.retryBackoff ?? undefined, retryDelayMax: schedule.retryDelayMax ?? undefined, + deadLetterJobType: schedule.deadLetterJobType ?? undefined, }); // Advance to next occurrence @@ -327,6 +328,7 @@ export const initJobQueue = ( retryDelay: options.retryDelay ?? null, retryBackoff: options.retryBackoff ?? null, retryDelayMax: options.retryDelayMax ?? null, + deadLetterJobType: options.deadLetterJobType ?? null, }; return backend.addCronSchedule(input); }, diff --git a/packages/dataqueue/src/processor.test.ts b/packages/dataqueue/src/processor.test.ts index 6dfc0f6..d2062ed 100644 --- a/packages/dataqueue/src/processor.test.ts +++ b/packages/dataqueue/src/processor.test.ts @@ -112,6 +112,46 @@ describe('processor integration', () => { expect(failed?.failureReason).toBe('handler_error'); }); + it('should create a dead-letter job when retries are exhausted and DLQ is configured', async () => { + const handler = vi.fn(async () => { + throw new Error('processor permanent failure'); + }); + const handlers = { + test: vi.fn(async () => {}), + fail: handler, + missing: vi.fn(async () => {}), + batch: vi.fn(async () => {}), + proc: vi.fn(async () => {}), + typeA: vi.fn(async () => {}), + typeB: vi.fn(async () => {}), + typeC: vi.fn(async () => {}), + }; + + const sourceJobId = await queue.addJob(pool, { + jobType: 'fail', + payload: {}, + maxAttempts: 1, + deadLetterJobType: 'typeA', + }); + const batch = await queue.getNextBatch(pool, 'test-worker', 1); + expect(batch).toHaveLength(1); + expect(batch[0].id).toBe(sourceJobId); + await processJobWithHandlers(backend, batch[0], handlers); + + const sourceJob = await queue.getJob(pool, sourceJobId); + expect(sourceJob?.status).toBe('failed'); + expect(sourceJob?.deadLetterJobType).toBe('typeA'); + expect(sourceJob?.deadLetterJobId).not.toBeNull(); + + const deadLetterJob = await queue.getJob(pool, sourceJob!.deadLetterJobId!); + expect(deadLetterJob?.jobType).toBe('typeA'); + expect(deadLetterJob?.status).toBe('pending'); + const envelope = deadLetterJob?.payload as any; + expect(envelope.originalJob.id).toBe(sourceJobId); + expect(envelope.failure.message).toBe('processor permanent failure'); + expect(envelope.failure.reason).toBe('handler_error'); + }); + it('should mark job as failed if no handler registered', async () => { const handler = vi.fn(async () => { throw new Error('No handler registered'); diff --git a/packages/dataqueue/src/queue.test.ts b/packages/dataqueue/src/queue.test.ts index 736eb0d..eac2446 100644 --- a/packages/dataqueue/src/queue.test.ts +++ b/packages/dataqueue/src/queue.test.ts @@ -2000,6 +2000,47 @@ describe('getJobs', () => { expect(batch3.length).toBe(0); }); + it('should route exhausted jobs to dead-letter job type when configured', async () => { + // Setup + const sourceJobId = await queue.addJob< + { email: { to: string }; email_dead_letter: any }, + 'email' + >(pool, { + jobType: 'email', + payload: { to: 'deadletter@example.com' }, + maxAttempts: 1, + deadLetterJobType: 'email_dead_letter', + }); + + // Act + await queue.getNextBatch(pool, 'worker-dead-letter', 1); + await queue.failJob(pool, sourceJobId, new Error('permanent failure')); + + // Assert + const sourceJob = await queue.getJob(pool, sourceJobId); + expect(sourceJob?.status).toBe('failed'); + expect(sourceJob?.nextAttemptAt).toBeNull(); + expect(sourceJob?.deadLetterJobType).toBe('email_dead_letter'); + expect(sourceJob?.deadLetteredAt).not.toBeNull(); + expect(sourceJob?.deadLetterJobId).not.toBeNull(); + + const deadLetterJob = await queue.getJob( + pool, + sourceJob!.deadLetterJobId as number, + ); + expect(deadLetterJob).not.toBeNull(); + expect(deadLetterJob?.jobType).toBe('email_dead_letter'); + expect(deadLetterJob?.status).toBe('pending'); + expect(deadLetterJob?.maxAttempts).toBe(1); + + const envelope = deadLetterJob?.payload as any; + expect(envelope.originalJob.id).toBe(sourceJobId); + expect(envelope.originalJob.jobType).toBe('email'); + expect(envelope.originalPayload).toEqual({ to: 'deadletter@example.com' }); + expect(envelope.failure.message).toBe('permanent failure'); + expect(envelope.failure.reason).toBeNull(); + }); + // ── Configurable retry strategy tests ──────────────────────────────── it('uses legacy backoff when no retry config is set', async () => { diff --git a/packages/dataqueue/src/queue.ts b/packages/dataqueue/src/queue.ts index 2dc620d..30fd4b0 100644 --- a/packages/dataqueue/src/queue.ts +++ b/packages/dataqueue/src/queue.ts @@ -123,6 +123,7 @@ export const editJob = async ( retryDelay?: number | null; retryBackoff?: boolean | null; retryDelayMax?: number | null; + deadLetterJobType?: string | null; }, ): Promise => new PostgresBackend(pool).editJob(jobId, updates); @@ -151,6 +152,7 @@ export const editAllPendingJobs = async < retryDelay?: number | null; retryBackoff?: boolean | null; retryDelayMax?: number | null; + deadLetterJobType?: string | null; }, ): Promise => new PostgresBackend(pool).editAllPendingJobs(filters, updates); diff --git a/packages/dataqueue/src/types.ts b/packages/dataqueue/src/types.ts index 764d7cd..b8400d7 100644 --- a/packages/dataqueue/src/types.ts +++ b/packages/dataqueue/src/types.ts @@ -120,6 +120,15 @@ export interface JobOptions> { * `retryBackoff` is true. No limit when omitted. */ retryDelayMax?: number; + /** + * Optional job type used as a dead-letter destination when this job + * exhausts all retry attempts. + * + * When set, a new job is created in this destination with an envelope + * payload containing the original job metadata, original payload, and + * failure context. + */ + deadLetterJobType?: string; } /** @@ -260,6 +269,36 @@ export interface JobRecord> { * Maximum delay cap for retries in seconds, or null if no cap. */ retryDelayMax?: number | null; + /** + * Optional dead-letter destination configured for this job. + */ + deadLetterJobType?: string | null; + /** + * The time this job was routed to a dead-letter job, if it was routed. + */ + deadLetteredAt?: Date | null; + /** + * The generated dead-letter job ID linked to this source job, if routed. + */ + deadLetterJobId?: number | null; +} + +/** + * Envelope payload stored in dead-letter jobs. + */ +export interface DeadLetterPayloadEnvelope { + originalJob: { + id: number; + jobType: string; + attempts: number; + maxAttempts: number; + }; + originalPayload: unknown; + failure: { + message: string; + reason: FailureReason | null; + failedAt: string; + }; } /** @@ -780,6 +819,11 @@ export interface CronScheduleOptions< retryBackoff?: boolean; /** Maximum delay cap for retries in seconds. */ retryDelayMax?: number; + /** + * Optional dead-letter destination job type inherited by jobs enqueued + * from this schedule. + */ + deadLetterJobType?: string; } /** @@ -807,6 +851,7 @@ export interface CronScheduleRecord { retryDelay: number | null; retryBackoff: boolean | null; retryDelayMax: number | null; + deadLetterJobType: string | null; } /** @@ -826,6 +871,7 @@ export interface EditCronScheduleOptions { retryDelay?: number | null; retryBackoff?: boolean | null; retryDelayMax?: number | null; + deadLetterJobType?: string | null; } // ── Event hooks ────────────────────────────────────────────────────── From 98aa264da3c56122ee6976ac84bc0b14414fe217 Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Tue, 24 Feb 2026 12:10:38 +0100 Subject: [PATCH 2/2] move dlq migration --- ...tter_columns.sql => 1781200000008_add_dead_letter_columns.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename packages/dataqueue/migrations/{1781200000007_add_dead_letter_columns.sql => 1781200000008_add_dead_letter_columns.sql} (100%) diff --git a/packages/dataqueue/migrations/1781200000007_add_dead_letter_columns.sql b/packages/dataqueue/migrations/1781200000008_add_dead_letter_columns.sql similarity index 100% rename from packages/dataqueue/migrations/1781200000007_add_dead_letter_columns.sql rename to packages/dataqueue/migrations/1781200000008_add_dead_letter_columns.sql