Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/docs/content/docs/api/job-options.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The `JobOptions` interface defines the options for creating a new job in the que

- `tags?`: _string[]_ — Tags for this job. Used for grouping, searching, or batch operations.
- `idempotencyKey?`: _string_ — Optional idempotency key. When provided, ensures that only one job exists for a given key. If a job with the same key already exists, `addJob` returns the existing job's ID instead of creating a duplicate. See [Idempotency](/usage/add-job#idempotency) for details.
- `deadLetterJobType?`: _string_ — Optional dead-letter destination job type. When the job exhausts retries, DataQueue creates a new pending job in this job type with an envelope payload containing source metadata, original payload, and failure context.

## Example

Expand All @@ -36,5 +37,6 @@ const job = {
forceKillOnTimeout: false, // Use graceful shutdown (default)
tags: ['welcome', 'user'], // tags for grouping/searching
idempotencyKey: 'welcome-email-user-123', // prevent duplicate jobs
deadLetterJobType: 'email_dead_letter', // route exhausted failures
};
```
5 changes: 4 additions & 1 deletion apps/docs/content/docs/api/job-queue.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,15 @@ interface JobOptions {
retryDelay?: number; // Base delay between retries in seconds (default: 60)
retryBackoff?: boolean; // Use exponential backoff (default: true)
retryDelayMax?: number; // Max delay cap in seconds (default: none)
deadLetterJobType?: string; // Route exhausted failures to this job type
group?: { id: string; tier?: string }; // Optional group for global concurrency limits
}
```

- `retryDelay` - Base delay between retries in seconds. When `retryBackoff` is true, this is the base for exponential backoff (`retryDelay * 2^attempts`). When false, retries use this fixed delay. Default: `60`.
- `retryBackoff` - Whether to use exponential backoff. When true, delay doubles with each attempt and includes jitter. Default: `true`.
- `retryDelayMax` - Maximum delay cap in seconds. Only meaningful when `retryBackoff` is true. No limit when omitted.
- `deadLetterJobType` - Optional dead-letter destination. When retries are exhausted, a new pending job is created in this job type with an envelope payload (`originalJob`, `originalPayload`, `failure`).
- `group` - Optional grouping metadata. Use `group.id` to enforce global per-group limits with `ProcessorOptions.groupConcurrency`. `group.tier` is reserved for future policies.

#### AddJobOptions
Expand Down Expand Up @@ -277,10 +279,11 @@ interface EditJobOptions {
retryDelay?: number | null;
retryBackoff?: boolean | null;
retryDelayMax?: number | null;
deadLetterJobType?: string | null;
}
```

All fields are optional - only provided fields will be updated. Note that `jobType` cannot be changed. Set retry fields to `null` to revert to legacy default behavior.
All fields are optional - only provided fields will be updated. Note that `jobType` cannot be changed. Set retry fields to `null` to revert to legacy default behavior. Set `deadLetterJobType` to `null` to clear dead-letter routing for pending jobs.

#### Example

Expand Down
5 changes: 4 additions & 1 deletion apps/docs/content/docs/api/job-record.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The `JobRecord` interface represents a job stored in the queue, including its st
- `jobType`: _string_ — The type of the job.
- `payload`: _any_ — The job payload.
- `status`:
_'pending' | 'processing' | 'completed' | 'failed' | 'cancelled'_ —
_'pending' | 'processing' | 'completed' | 'failed' | 'cancelled' | 'waiting'_ —
Current job status.
- `createdAt`: _Date_ — When the job was created.
- `updated_at`: _Date_ — When the job was last updated.
Expand Down Expand Up @@ -42,6 +42,9 @@ The `JobRecord` interface represents a job stored in the queue, including its st
- `idempotencyKey?`: _string | null_ — The idempotency key for this job, if one was provided when the job was created.
- `progress?`: _number | null_ — Progress percentage (0–100) reported by the handler via `ctx.setProgress()`. `null` if no progress has been reported. See [Progress Tracking](/usage/progress-tracking).
- `output?`: _unknown_ — Handler output stored via `ctx.setOutput(data)` or by returning a value from the handler. `null` if no output has been stored. See [Job Output](/usage/job-output).
- `deadLetterJobType?`: _string | null_ — Configured dead-letter destination job type for this job.
- `deadLetteredAt?`: _Date | null_ — Timestamp when this job was routed to a dead-letter job.
- `deadLetterJobId?`: _number | null_ — Linked dead-letter job ID created when retries were exhausted.

## Example

Expand Down
5 changes: 3 additions & 2 deletions apps/docs/content/docs/usage/building-with-ai.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Type handlers as `JobHandlers<PayloadMap>` — TypeScript enforces a handler for
1. Creating initJobQueue per request (creates a DB pool each time)
2. Missing handler for a job type (fails with NoHandler)
3. Not checking signal.aborted in long handlers
4. Forgetting reclaimStuckJobs() — crashed workers leave jobs stuck
5. Skipping migrations (PostgreSQL requires `dataqueue-cli migrate`)
4. Forgetting dead-letter routing for critical jobs — set `deadLetterJobType` so exhausted failures are inspectable/replayable
5. Forgetting reclaimStuckJobs() — crashed workers leave jobs stuck
6. Skipping migrations (PostgreSQL requires `dataqueue-cli migrate`)
```
31 changes: 31 additions & 0 deletions apps/docs/content/docs/usage/failed-jobs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,37 @@ A job handler can fail for many reasons, such as a bug in the code or running ou

When a job fails, it is marked as `failed` and retried up to `maxAttempts` times (default: 3). You can view the error history for a job in its `errorHistory` field.

## Dead-letter queues

You can route permanently failed jobs to a dead-letter job type using `deadLetterJobType`.

When a job exhausts retries (`attempts >= maxAttempts`), DataQueue:

1. Keeps the source job as `failed`.
2. Creates a new pending dead-letter job in `deadLetterJobType`.
3. Stores linkage metadata on the source job (`deadLetteredAt`, `deadLetterJobId`).

```ts
await jobQueue.addJob({
jobType: 'email',
payload: { to: 'user@example.com' },
maxAttempts: 3,
deadLetterJobType: 'email_dead_letter',
});
```

The dead-letter job payload is an envelope:

```ts
{
originalJob: { id, jobType, attempts, maxAttempts },
originalPayload: { ... }, // original job payload
failure: { message, reason, failedAt },
}
```

If `deadLetterJobType` is not set, behavior is unchanged: exhausted jobs remain failed without creating a dead-letter job.

## Retry configuration

You can control the retry behavior per-job using three options:
Expand Down
15 changes: 15 additions & 0 deletions packages/dataqueue/ai/rules/advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ await queue.addJob({
- No config — legacy `2^attempts * 60s` formula (backward compatible).
- Cron schedules propagate retry config to enqueued jobs.

## Dead-Letter Routing

Configure dead-letter capture with `deadLetterJobType`:

```typescript
await queue.addJob({
jobType: 'email',
payload,
maxAttempts: 3,
deadLetterJobType: 'email_dead_letter',
});
```

When retries are exhausted, DataQueue creates a pending dead-letter job with envelope payload containing `originalJob`, `originalPayload`, and `failure`. Source jobs remain `failed` and store linkage metadata (`deadLetteredAt`, `deadLetterJobId`).

## Event Hooks

Subscribe to real-time lifecycle events via `on`, `once`, `off`, `removeAllListeners`. Works with both Postgres and Redis.
Expand Down
16 changes: 16 additions & 0 deletions packages/dataqueue/ai/rules/basic.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ Control retry behavior per-job with optional fields on `addJob`:

When none are set, the legacy `2^attempts * 60s` formula is used.

## Dead-Letter Queue

Use `deadLetterJobType` for jobs that must be captured after exhausting retries:

```typescript
await queue.addJob({
jobType: 'email',
payload: { to: 'user@example.com', subject: 'Hi', body: '...' },
maxAttempts: 3,
deadLetterJobType: 'email_dead_letter',
});
```

On exhaustion, the source job stays `failed` and a new pending dead-letter job is created with envelope payload: `{ originalJob, originalPayload, failure }`.

## Common Mistakes

1. Creating `initJobQueue` per request — use a singleton.
Expand All @@ -158,3 +173,4 @@ When none are set, the legacy `2^attempts * 60s` formula is used.
4. Skipping maintenance — use `createSupervisor()` to automate reclaim, cleanup, and token expiry. Without it, stuck jobs and old data accumulate.
5. Skipping migrations (PostgreSQL) — run `dataqueue-cli migrate` first. Redis needs none.
6. Using `stop()` instead of `stopAndDrain()` — leaves in-flight jobs stuck.
7. Expecting dead-letter routing without setting `deadLetterJobType` — DLQ is opt-in.
23 changes: 23 additions & 0 deletions packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,29 @@ await queue.addCronJob({

Every job enqueued by the schedule inherits the retry settings.

### Dead-letter routing

Set `deadLetterJobType` on jobs (or cron schedules) to route exhausted failures:

```typescript
await queue.addJob({
jobType: 'email',
payload: { to: 'user@example.com' },
maxAttempts: 3,
deadLetterJobType: 'email_dead_letter',
});
```

Dead-letter jobs receive envelope payload:

```typescript
{
originalJob: { id, jobType, attempts, maxAttempts },
originalPayload: {...},
failure: { message, reason, failedAt }
}
```

### Default behavior

When no retry options are set, the legacy formula `2^attempts * 60 seconds` is used. This is fully backward compatible.
Expand Down
16 changes: 16 additions & 0 deletions packages/dataqueue/ai/skills/dataqueue-core/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,21 @@ await queue.addJob({
- **Exponential backoff** (default): delay doubles each attempt with jitter.
- **Default**: when no retry options are set, legacy `2^attempts * 60s` is used.

### Dead-letter queues

Route exhausted failures into a dedicated job type with `deadLetterJobType`:

```typescript
await queue.addJob({
jobType: 'send_email',
payload: { to: 'user@example.com', subject: 'Hi', body: 'Hello' },
maxAttempts: 3,
deadLetterJobType: 'email_dead_letter',
});
```

When retries are exhausted, DataQueue keeps the source job as `failed` and creates a new pending dead-letter job with envelope payload: `{ originalJob, originalPayload, failure }`.

## Step 5: Process Jobs

### Serverless (one-shot)
Expand Down Expand Up @@ -235,3 +250,4 @@ process.on('SIGTERM', async () => {
6. **Not calling `stopAndDrain` on shutdown** — use `stopAndDrain()` (not `stop()`) for graceful shutdown to avoid stuck jobs.
7. **Forgetting to commit/rollback when using `db` option** — the `addJob` INSERT sits in an open transaction. If you never `COMMIT` or `ROLLBACK`, the connection leaks and the job is invisible to other sessions.
8. **Using `db` option with Redis** — transactional job creation is PostgreSQL only. The Redis backend throws if `db` is provided.
9. **Expecting dead-letter routing without configuration** — DLQ is opt-in. Set `deadLetterJobType` on jobs (or cron schedules) that require dead-letter capture.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- Up Migration
ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS dead_letter_job_type VARCHAR(255);
ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS dead_lettered_at TIMESTAMPTZ;
ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS dead_letter_job_id INT;

ALTER TABLE cron_schedules ADD COLUMN IF NOT EXISTS dead_letter_job_type VARCHAR(255);

-- Down Migration
ALTER TABLE job_queue DROP COLUMN IF EXISTS dead_letter_job_type;
ALTER TABLE job_queue DROP COLUMN IF EXISTS dead_lettered_at;
ALTER TABLE job_queue DROP COLUMN IF EXISTS dead_letter_job_id;

ALTER TABLE cron_schedules DROP COLUMN IF EXISTS dead_letter_job_type;
2 changes: 2 additions & 0 deletions packages/dataqueue/src/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export interface JobUpdates {
retryDelay?: number | null;
retryBackoff?: boolean | null;
retryDelayMax?: number | null;
deadLetterJobType?: string | null;
}

/**
Expand All @@ -65,6 +66,7 @@ export interface CronScheduleInput {
retryDelay: number | null;
retryBackoff: boolean | null;
retryDelayMax: number | null;
deadLetterJobType: string | null;
}

/**
Expand Down
Loading
Loading