From 083c19cbdfee17f987cd1276fd634c23f0282592 Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Sun, 22 Mar 2026 13:20:08 +0100 Subject: [PATCH 1/4] Add job dependency features to DataQueue - Introduced a new SQL migration to add `depends_on_job_ids` and `depends_on_tags` columns to the `job_queue` table, along with an index for efficient querying. - Implemented job dependency management in the backend, allowing jobs to specify prerequisites that must be completed before execution. - Enhanced the `JobOptions` and `JobRecord` interfaces to include fields for job dependencies. - Added utility functions for normalizing and resolving job dependencies, ensuring robust handling of prerequisite jobs. - Developed comprehensive tests for the new job dependency features, validating their functionality and integration. These changes significantly enhance the job management capabilities of DataQueue, enabling more complex workflows and dependencies. --- ...1200000009_add_depends_on_to_job_queue.sql | 10 + packages/dataqueue/src/backends/postgres.ts | 283 ++++++++++++++++-- .../dataqueue/src/backends/redis-scripts.ts | 101 ++++++- packages/dataqueue/src/backends/redis.ts | 218 ++++++++++++-- packages/dataqueue/src/index.ts | 8 + .../dataqueue/src/job-dependencies.test.ts | 129 ++++++++ packages/dataqueue/src/job-dependencies.ts | 140 +++++++++ packages/dataqueue/src/types.ts | 36 +++ packages/react/src/types.ts | 4 + 9 files changed, 872 insertions(+), 57 deletions(-) create mode 100644 packages/dataqueue/migrations/1781200000009_add_depends_on_to_job_queue.sql create mode 100644 packages/dataqueue/src/job-dependencies.test.ts create mode 100644 packages/dataqueue/src/job-dependencies.ts diff --git a/packages/dataqueue/migrations/1781200000009_add_depends_on_to_job_queue.sql b/packages/dataqueue/migrations/1781200000009_add_depends_on_to_job_queue.sql new file mode 100644 index 0000000..fb8c180 --- /dev/null +++ b/packages/dataqueue/migrations/1781200000009_add_depends_on_to_job_queue.sql @@ -0,0 +1,10 @@ +-- Up Migration +ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS depends_on_job_ids INTEGER[]; +ALTER TABLE job_queue ADD COLUMN IF NOT EXISTS depends_on_tags TEXT[]; + +CREATE INDEX IF NOT EXISTS idx_job_queue_depends_on_job_ids ON job_queue USING GIN (depends_on_job_ids); + +-- Down Migration +DROP INDEX IF EXISTS idx_job_queue_depends_on_job_ids; +ALTER TABLE job_queue DROP COLUMN IF EXISTS depends_on_tags; +ALTER TABLE job_queue DROP COLUMN IF EXISTS depends_on_job_ids; diff --git a/packages/dataqueue/src/backends/postgres.ts b/packages/dataqueue/src/backends/postgres.ts index c0197cc..a61e5c6 100644 --- a/packages/dataqueue/src/backends/postgres.ts +++ b/packages/dataqueue/src/backends/postgres.ts @@ -24,9 +24,39 @@ import { CronScheduleInput, } from '../backend.js'; import { log } from '../log-context.js'; +import { + assertNoDependencyCycle, + normalizeDependsOn, + resolveDependsOnJobIdsForBatch, + validatePrerequisiteJobIdsExist, +} from '../job-dependencies.js'; const MAX_TIMEOUT_MS = 365 * 24 * 60 * 60 * 1000; +/** SQL fragment: candidate job may run only if job-id and tag prerequisites are satisfied. */ +const JOB_DEPENDS_ON_PREDICATE = ` + AND ( + candidate.depends_on_job_ids IS NULL + OR cardinality(candidate.depends_on_job_ids) = 0 + OR NOT EXISTS ( + SELECT 1 + FROM unnest(candidate.depends_on_job_ids) AS dep(id) + LEFT JOIN job_queue prereq ON prereq.id = dep.id + WHERE prereq.id IS NULL OR prereq.status <> 'completed' + ) + ) + AND ( + candidate.depends_on_tags IS NULL + OR cardinality(candidate.depends_on_tags) = 0 + OR NOT EXISTS ( + SELECT 1 FROM job_queue blocker + WHERE blocker.id <> candidate.id + AND blocker.status IN ('pending', 'processing', 'waiting') + AND blocker.tags IS NOT NULL + AND blocker.tags @> candidate.depends_on_tags + ) + )`; + /** Parse a timeout string like '10m', '1h', '24h', '7d' into milliseconds. */ function parseTimeoutString(timeout: string): number { const match = timeout.match(/^(\d+)(s|m|h|d)$/); @@ -159,13 +189,34 @@ export class PostgresBackend implements QueueBackend { retryDelayMax = undefined, deadLetterJobType = undefined, group = undefined, + dependsOn, }: JobOptions, options?: AddJobOptions, ): Promise { const externalClient = options?.db; const client: DatabaseClient = externalClient ?? (await this.pool.connect()); + let manageTx = false; try { + const { jobIds: depJobIdsRaw, tags: depTags } = + normalizeDependsOn(dependsOn); + let resolvedDepJobIds: number[] = []; + if (depJobIdsRaw?.length) { + if (depJobIdsRaw.some((id) => id < 0)) { + throw new Error( + 'dependsOn.jobIds: batch-relative (negative) ids are only supported in addJobs()', + ); + } + resolvedDepJobIds = depJobIdsRaw; + await validatePrerequisiteJobIdsExist(client, resolvedDepJobIds); + } + const dependsOnJobIdsParam = + resolvedDepJobIds.length > 0 ? resolvedDepJobIds : null; + const dependsOnTagsParam = depTags?.length ? depTags : null; + + manageTx = resolvedDepJobIds.length > 0 && !externalClient; + if (manageTx) await client.query('BEGIN'); + let result; const onConflict = idempotencyKey ? `ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING` @@ -174,8 +225,8 @@ export class PostgresBackend implements QueueBackend { if (runAt) { result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type, group_id, group_tier) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type, group_id, group_tier, depends_on_job_ids, depends_on_tags) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ${onConflict} RETURNING id`, [ @@ -194,13 +245,15 @@ export class PostgresBackend implements QueueBackend { deadLetterJobType ?? null, group?.id ?? null, group?.tier ?? null, + dependsOnJobIdsParam, + dependsOnTagsParam, ], ); } else { result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type, group_id, group_tier) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + (job_type, payload, max_attempts, priority, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type, group_id, group_tier, depends_on_job_ids, depends_on_tags) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ${onConflict} RETURNING id`, [ @@ -218,11 +271,14 @@ export class PostgresBackend implements QueueBackend { deadLetterJobType ?? null, group?.id ?? null, group?.tier ?? null, + dependsOnJobIdsParam, + dependsOnTagsParam, ], ); } if (result.rows.length === 0 && idempotencyKey) { + if (manageTx) await client.query('ROLLBACK'); const existing = await client.query( `SELECT id FROM job_queue WHERE idempotency_key = $1`, [idempotencyKey], @@ -233,39 +289,53 @@ export class PostgresBackend implements QueueBackend { ); return existing.rows[0].id; } + if (manageTx) await client.query('ROLLBACK'); throw new Error( `Failed to insert job and could not find existing job with idempotency key "${idempotencyKey}"`, ); } const jobId = result.rows[0].id; + + if (resolvedDepJobIds.length > 0) { + await assertNoDependencyCycle(client, jobId, resolvedDepJobIds); + } + log( `Added job ${jobId}: payload ${JSON.stringify(payload)}, ${runAt ? `runAt ${runAt.toISOString()}, ` : ''}priority ${priority}, maxAttempts ${maxAttempts}, jobType ${jobType}, tags ${JSON.stringify(tags)}${idempotencyKey ? `, idempotencyKey "${idempotencyKey}"` : ''}`, ); + const addedMeta = { + jobType, + payload, + tags, + idempotencyKey, + dependsOn: + dependsOnJobIdsParam || dependsOnTagsParam ? dependsOn : undefined, + }; + if (externalClient) { try { await client.query( `INSERT INTO job_events (job_id, event_type, metadata) VALUES ($1, $2, $3)`, - [ - jobId, - JobEventType.Added, - JSON.stringify({ jobType, payload, tags, idempotencyKey }), - ], + [jobId, JobEventType.Added, JSON.stringify(addedMeta)], ); } catch (error) { log(`Error recording job event for job ${jobId}: ${error}`); } } else { - await this.recordJobEvent(jobId, JobEventType.Added, { - jobType, - payload, - tags, - idempotencyKey, - }); + await this.recordJobEvent(jobId, JobEventType.Added, addedMeta); } + if (manageTx) await client.query('COMMIT'); return jobId; } catch (error) { + if (manageTx) { + try { + await client.query('ROLLBACK'); + } catch { + /* ignore */ + } + } log(`Error adding job: ${error}`); throw error; } finally { @@ -289,7 +359,53 @@ export class PostgresBackend implements QueueBackend { const client: DatabaseClient = externalClient ?? (await this.pool.connect()); try { - const COLS_PER_JOB = 15; + const needsSequential = jobs.some((j) => { + const n = normalizeDependsOn(j.dependsOn); + return Boolean(n.jobIds?.length || n.tags?.length); + }); + + if (needsSequential) { + const useOuterTx = !externalClient; + if (useOuterTx) await client.query('BEGIN'); + try { + const ids: number[] = []; + for (let i = 0; i < jobs.length; i++) { + let job = jobs[i]!; + const nd = normalizeDependsOn(job.dependsOn); + if (nd.jobIds?.some((id) => id < 0)) { + const resolvedJobIds = resolveDependsOnJobIdsForBatch( + nd.jobIds, + ids, + ); + job = { + ...job, + dependsOn: { + jobIds: resolvedJobIds, + tags: job.dependsOn?.tags, + }, + }; + } + const id = await this.addJob(job, { db: client }); + ids.push(id); + } + if (useOuterTx) await client.query('COMMIT'); + log( + `Batch-inserted ${jobs.length} jobs (sequential), IDs: [${ids.join(', ')}]`, + ); + return ids; + } catch (e) { + if (!externalClient) { + try { + await client.query('ROLLBACK'); + } catch { + /* ignore */ + } + } + throw e; + } + } + + const COLS_PER_JOB = 17; const valueClauses: string[] = []; const params: any[] = []; @@ -318,7 +434,7 @@ export class PostgresBackend implements QueueBackend { `($${base + 1}, $${base + 2}, $${base + 3}, $${base + 4}, ` + `COALESCE($${base + 5}::timestamptz, CURRENT_TIMESTAMP), ` + `$${base + 6}, $${base + 7}, $${base + 8}, $${base + 9}, ` + - `$${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15})`, + `$${base + 10}, $${base + 11}, $${base + 12}, $${base + 13}, $${base + 14}, $${base + 15}, $${base + 16}, $${base + 17})`, ); params.push( jobType, @@ -336,6 +452,8 @@ export class PostgresBackend implements QueueBackend { deadLetterJobType ?? null, group?.id ?? null, group?.tier ?? null, + null, + null, ); } @@ -345,7 +463,7 @@ export class PostgresBackend implements QueueBackend { const result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type, group_id, group_tier) + (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags, idempotency_key, retry_delay, retry_backoff, retry_delay_max, dead_letter_job_type, group_id, group_tier, depends_on_job_ids, depends_on_tags) VALUES ${valueClauses.join(', ')} ${onConflict} RETURNING id, idempotency_key`, @@ -412,6 +530,7 @@ export class PostgresBackend implements QueueBackend { const wasInserted = !job.idempotencyKey || !missingKeys.includes(job.idempotencyKey); if (wasInserted) { + const nd = normalizeDependsOn(job.dependsOn); newJobEvents.push({ jobId: ids[i], eventType: JobEventType.Added, @@ -420,6 +539,9 @@ export class PostgresBackend implements QueueBackend { payload: job.payload, tags: job.tags, idempotencyKey: job.idempotencyKey, + ...(nd.jobIds?.length || nd.tags?.length + ? { dependsOn: job.dependsOn } + : {}), }, }); } @@ -467,7 +589,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue WHERE id = $1`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", depends_on_job_ids AS "dependsOnJobIds", depends_on_tags AS "dependsOnTags", output FROM job_queue WHERE id = $1`, [id], ); @@ -501,7 +623,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", depends_on_job_ids AS "dependsOnJobIds", depends_on_tags AS "dependsOnTags", output FROM job_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, [status, limit, offset], ); log(`Found ${result.rows.length} jobs by status ${status}`); @@ -527,7 +649,7 @@ export class PostgresBackend implements QueueBackend { const client = await this.pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue ORDER BY created_at DESC LIMIT $1 OFFSET $2`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", depends_on_job_ids AS "dependsOnJobIds", depends_on_tags AS "dependsOnTags", output FROM job_queue ORDER BY created_at DESC LIMIT $1 OFFSET $2`, [limit, offset], ); log(`Found ${result.rows.length} jobs (all)`); @@ -552,7 +674,7 @@ export class PostgresBackend implements QueueBackend { ): Promise[]> { const client = await this.pool.connect(); try { - let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", output FROM job_queue`; + let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", depends_on_job_ids AS "dependsOnJobIds", depends_on_tags AS "dependsOnTags", output FROM job_queue`; const params: any[] = []; const where: string[] = []; let paramIdx = 1; @@ -679,7 +801,7 @@ export class PostgresBackend implements QueueBackend { ): Promise[]> { const client = await this.pool.connect(); try { - let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", output + let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags, idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", depends_on_job_ids AS "dependsOnJobIds", depends_on_tags AS "dependsOnTags", output FROM job_queue`; let params: any[] = []; switch (mode) { @@ -780,11 +902,12 @@ export class PostgresBackend implements QueueBackend { ) ) ${jobTypeFilter} + ${JOB_DEPENDS_ON_PREDICATE} ORDER BY candidate.priority DESC, candidate.created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED ) - RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", output + RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", depends_on_job_ids AS "dependsOnJobIds", depends_on_tags AS "dependsOnTags", output `, params, ); @@ -810,6 +933,7 @@ export class PostgresBackend implements QueueBackend { ) ) ${jobTypeFilter} + ${JOB_DEPENDS_ON_PREDICATE} FOR UPDATE SKIP LOCKED ), ranked AS ( @@ -852,7 +976,7 @@ export class PostgresBackend implements QueueBackend { last_retried_at = CASE WHEN status != 'waiting' AND attempts > 0 THEN NOW() ELSE last_retried_at END, wait_until = NULL WHERE id IN (SELECT id FROM selected) - RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", output + RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", idempotency_key AS "idempotencyKey", wait_until AS "waitUntil", wait_token_id AS "waitTokenId", step_data AS "stepData", progress, retry_delay AS "retryDelay", retry_backoff AS "retryBackoff", retry_delay_max AS "retryDelayMax", dead_letter_job_type AS "deadLetterJobType", dead_lettered_at AS "deadLetteredAt", dead_letter_job_id AS "deadLetterJobId", group_id AS "groupId", group_tier AS "groupTier", depends_on_job_ids AS "dependsOnJobIds", depends_on_tags AS "dependsOnTags", output `, constrainedParams, ); @@ -915,6 +1039,87 @@ export class PostgresBackend implements QueueBackend { } } + /** + * Cancel pending/waiting jobs that depend on any seed job (by job id or tag superset), transitively. + * + * @param client - Database client (must be inside an open transaction when used from fail/cancel). + * @param initialSeeds - Job ids that just failed or were cancelled. + * @param rootJobId - Original job id for event metadata. + */ + private async propagateDependencyCancellations( + client: DatabaseClient, + initialSeeds: number[], + rootJobId: number, + ): Promise { + const seeds = [...new Set(initialSeeds.filter((id) => id > 0))]; + if (seeds.length === 0) return; + + const cancelled = new Set(); + const reasonJson = JSON.stringify({ + rootJobId, + dependencyCascade: true, + }); + + let frontier = seeds; + while (frontier.length > 0) { + const res = await client.query( + ` + SELECT DISTINCT j.id + FROM job_queue j + CROSS JOIN unnest($1::int[]) AS s(id) + INNER JOIN job_queue sx ON sx.id = s.id + WHERE j.status IN ('pending', 'waiting') + AND j.id <> sx.id + AND ( + j.depends_on_job_ids @> ARRAY[s.id]::integer[] + OR ( + j.depends_on_tags IS NOT NULL + AND cardinality(j.depends_on_tags) > 0 + AND sx.tags IS NOT NULL + AND sx.tags @> j.depends_on_tags + ) + ) + `, + [frontier], + ); + + const toCancel: number[] = []; + for (const row of res.rows) { + const pid = row.id as number; + if (cancelled.has(pid)) continue; + cancelled.add(pid); + toCancel.push(pid); + } + + if (toCancel.length === 0) break; + + await client.query( + ` + UPDATE job_queue + SET status = 'cancelled', + updated_at = NOW(), + last_cancelled_at = NOW(), + wait_until = NULL, + wait_token_id = NULL, + pending_reason = $2 + WHERE id = ANY($1::int[]) + AND status IN ('pending', 'waiting') + `, + [toCancel, reasonJson], + ); + + const meta = JSON.stringify({ rootJobId, dependencyCascade: true }); + for (const jid of toCancel) { + await client.query( + `INSERT INTO job_events (job_id, event_type, metadata) VALUES ($1, $2, $3)`, + [jid, JobEventType.Cancelled, meta], + ); + } + + frontier = toCancel; + } + } + async failJob( jobId: number, error: Error, @@ -1000,8 +1205,8 @@ export class PostgresBackend implements QueueBackend { const deadLetterInsert = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, run_at) - VALUES ($1, $2, $3, $4, NOW()) + (job_type, payload, max_attempts, priority, run_at, depends_on_job_ids, depends_on_tags) + VALUES ($1, $2, $3, $4, NOW(), NULL, NULL) RETURNING id`, [failedJob.deadLetterJobType, deadLetterPayload, 1, 0], ); @@ -1042,6 +1247,8 @@ export class PostgresBackend implements QueueBackend { ], ); + await this.propagateDependencyCancellations(client, [jobId], jobId); + await client.query('COMMIT'); log( `Failed job ${jobId}${deadLetterJobId ? ` and routed to dead-letter job ${deadLetterJobId}` : ''}`, @@ -1147,7 +1354,8 @@ export class PostgresBackend implements QueueBackend { async cancelJob(jobId: number): Promise { const client = await this.pool.connect(); try { - await client.query( + await client.query('BEGIN'); + const upd = await client.query( ` UPDATE job_queue SET status = 'cancelled', updated_at = NOW(), last_cancelled_at = NOW(), @@ -1156,9 +1364,26 @@ export class PostgresBackend implements QueueBackend { `, [jobId], ); - await this.recordJobEvent(jobId, JobEventType.Cancelled); + if (upd.rowCount === 0) { + await client.query('ROLLBACK'); + log( + `Job ${jobId} could not be cancelled (not in pending/waiting state or does not exist)`, + ); + return; + } + await client.query( + `INSERT INTO job_events (job_id, event_type, metadata) VALUES ($1, $2, $3)`, + [jobId, JobEventType.Cancelled, null], + ); + await this.propagateDependencyCancellations(client, [jobId], jobId); + await client.query('COMMIT'); log(`Cancelled job ${jobId}`); } catch (error) { + try { + await client.query('ROLLBACK'); + } catch { + /* ignore */ + } log(`Error cancelling job ${jobId}: ${error}`); throw error; } finally { diff --git a/packages/dataqueue/src/backends/redis-scripts.ts b/packages/dataqueue/src/backends/redis-scripts.ts index 376c33a..f2aeffe 100644 --- a/packages/dataqueue/src/backends/redis-scripts.ts +++ b/packages/dataqueue/src/backends/redis-scripts.ts @@ -32,7 +32,8 @@ const SCORE_RANGE = '1000000000000000'; // 1e15 * KEYS: [prefix] * ARGV: [jobType, payloadJson, maxAttempts, priority, runAtMs, timeoutMs, * forceKillOnTimeout, tagsJson, idempotencyKey, nowMs, - * retryDelay, retryBackoff, retryDelayMax, deadLetterJobType, groupId, groupTier] + * retryDelay, retryBackoff, retryDelayMax, deadLetterJobType, groupId, groupTier, + * dependsOnJobIdsJson, dependsOnTagsJson] * Returns: job ID (number) */ export const ADD_JOB_SCRIPT = ` @@ -53,6 +54,8 @@ local retryDelayMax = ARGV[13] -- "null" or seconds string local deadLetterJobType = ARGV[14] -- "null" or jobType string local groupId = ARGV[15] -- "null" or group ID local groupTier = ARGV[16] -- "null" or group tier +local dependsOnJobIdsJson = ARGV[17] -- "null" or JSON array of job ids +local dependsOnTagsJson = ARGV[18] -- "null" or JSON array of tags -- Idempotency check if idempotencyKey ~= "null" then @@ -104,9 +107,18 @@ redis.call('HMSET', jobKey, 'deadLetteredAt', 'null', 'deadLetterJobId', 'null', 'groupId', groupId, - 'groupTier', groupTier + 'groupTier', groupTier, + 'dependsOnJobIds', dependsOnJobIdsJson, + 'dependsOnTags', dependsOnTagsJson ) +if dependsOnJobIdsJson ~= "null" then + local depIds = cjson.decode(dependsOnJobIdsJson) + for _, parentId in ipairs(depIds) do + redis.call('SADD', prefix .. 'dep:' .. tostring(parentId), tostring(id)) + end +end + -- Status index redis.call('SADD', prefix .. 'status:pending', id) @@ -180,6 +192,8 @@ for i, job in ipairs(jobs) do local deadLetterJobType = tostring(job.deadLetterJobType) local groupId = tostring(job.groupId) local groupTier = tostring(job.groupTier) + local dependsOnJobIdsJson = job.dependsOnJobIds and tostring(job.dependsOnJobIds) or "null" + local dependsOnTagsJson = job.dependsOnTags and tostring(job.dependsOnTags) or "null" -- Idempotency check local skip = false @@ -234,9 +248,18 @@ for i, job in ipairs(jobs) do 'deadLetteredAt', 'null', 'deadLetterJobId', 'null', 'groupId', groupId, - 'groupTier', groupTier + 'groupTier', groupTier, + 'dependsOnJobIds', dependsOnJobIdsJson, + 'dependsOnTags', dependsOnTagsJson ) + if dependsOnJobIdsJson ~= "null" then + local depIds = cjson.decode(dependsOnJobIdsJson) + for _, parentId in ipairs(depIds) do + redis.call('SADD', prefix .. 'dep:' .. tostring(parentId), tostring(id)) + end + end + -- Status index redis.call('SADD', prefix .. 'status:pending', id) @@ -409,6 +432,48 @@ for i = 1, #candidates, 2 do end end + if canClaim then + local depIdsJson = redis.call('HGET', jk, 'dependsOnJobIds') + local depTagsJson = redis.call('HGET', jk, 'dependsOnTags') + local depsOk = true + if depIdsJson and depIdsJson ~= 'null' then + local dids = cjson.decode(depIdsJson) + for _, pid in ipairs(dids) do + local pst = redis.call('HGET', prefix .. 'job:' .. pid, 'status') + if pst ~= 'completed' then depsOk = false break end + end + end + if depsOk and depTagsJson and depTagsJson ~= 'null' then + local req = cjson.decode(depTagsJson) + if #req > 0 then + for _, stname in ipairs({'pending','processing','waiting'}) do + local members = redis.call('SMEMBERS', prefix .. 'status:' .. stname) + for _, oid in ipairs(members) do + if oid ~= jobId then + local otags = redis.call('HGET', prefix .. 'job:' .. oid, 'tags') + if otags and otags ~= 'null' then + local oarr = cjson.decode(otags) + local tagset = {} + for _, t in ipairs(oarr) do tagset[t] = true end + local all = true + for _, rt in ipairs(req) do + if not tagset[rt] then all = false break end + end + if all then depsOk = false break end + end + end + end + if not depsOk then break end + end + end + end + if not depsOk then + table.insert(putBack, score) + table.insert(putBack, jobId) + canClaim = false + end + end + if canClaim then -- Claim this job local attempts = tonumber(redis.call('HGET', jk, 'attempts')) @@ -495,6 +560,14 @@ if groupId and groupId ~= 'null' then end end +local depIdsJson = redis.call('HGET', jk, 'dependsOnJobIds') +if depIdsJson and depIdsJson ~= 'null' then + local dids = cjson.decode(depIdsJson) + for _, pid in ipairs(dids) do + redis.call('SREM', prefix .. 'dep:' .. tostring(pid), jobId) + end +end + return 1 `; @@ -650,7 +723,11 @@ if nextAttemptAt == 'null' and deadLetterJobType and deadLetterJobType ~= 'null' 'retryDelayMax', 'null', 'deadLetterJobType', 'null', 'deadLetteredAt', 'null', - 'deadLetterJobId', 'null' + 'deadLetterJobId', 'null', + 'dependsOnJobIds', 'null', + 'dependsOnTags', 'null', + 'groupId', 'null', + 'groupTier', 'null' ) redis.call('SADD', prefix .. 'status:pending', deadLetterJobId) @@ -665,6 +742,14 @@ if nextAttemptAt == 'null' and deadLetterJobType and deadLetterJobType ~= 'null' ) end +local depIdsJsonFail = redis.call('HGET', jk, 'dependsOnJobIds') +if depIdsJsonFail and depIdsJsonFail ~= 'null' then + local dids = cjson.decode(depIdsJsonFail) + for _, pid in ipairs(dids) do + redis.call('SREM', prefix .. 'dep:' .. tostring(pid), jobId) + end +end + return deadLetterJobId `; @@ -743,6 +828,14 @@ redis.call('ZREM', prefix .. 'queue', jobId) redis.call('ZREM', prefix .. 'delayed', jobId) redis.call('ZREM', prefix .. 'waiting', jobId) +local depIdsJsonCan = redis.call('HGET', jk, 'dependsOnJobIds') +if depIdsJsonCan and depIdsJsonCan ~= 'null' then + local dids = cjson.decode(depIdsJsonCan) + for _, pid in ipairs(dids) do + redis.call('SREM', prefix .. 'dep:' .. tostring(pid), jobId) + end +end + return 1 `; diff --git a/packages/dataqueue/src/backends/redis.ts b/packages/dataqueue/src/backends/redis.ts index f1ea1ea..6817de5 100644 --- a/packages/dataqueue/src/backends/redis.ts +++ b/packages/dataqueue/src/backends/redis.ts @@ -23,6 +23,11 @@ import { CronScheduleInput, } from '../backend.js'; import { log } from '../log-context.js'; +import { + normalizeDependsOn, + resolveDependsOnJobIdsForBatch, + tagsAreSuperset, +} from '../job-dependencies.js'; const MAX_TIMEOUT_MS = 365 * 24 * 60 * 60 * 1000; @@ -181,9 +186,31 @@ function deserializeJob>( groupId: nullish(h.groupId) as string | null | undefined, groupTier: nullish(h.groupTier) as string | null | undefined, output: parseJsonField(h.output), + dependsOnJobIds: parseOptionalIntArray(h.dependsOnJobIds), + dependsOnTags: parseOptionalStringArray(h.dependsOnTags), }; } +function parseOptionalIntArray(raw: string | undefined): number[] | null { + if (!raw || raw === 'null') return null; + try { + const arr = JSON.parse(raw) as number[]; + return Array.isArray(arr) && arr.length > 0 ? arr : null; + } catch { + return null; + } +} + +function parseOptionalStringArray(raw: string | undefined): string[] | null { + if (!raw || raw === 'null') return null; + try { + const arr = JSON.parse(raw) as string[]; + return Array.isArray(arr) && arr.length > 0 ? arr : null; + } catch { + return null; + } +} + /** Parse a JSON field from a Redis hash, returning null for missing/null values. */ function parseJsonField(raw: string | undefined): unknown { if (!raw || raw === 'null') return null; @@ -271,6 +298,76 @@ export class RedisBackend implements QueueBackend { return Date.now(); } + /** + * Cancel pending/waiting jobs that depend on seed jobs (job id or tag), transitively. + * + * @param initialSeeds - Job ids that failed or were cancelled. + * @param rootJobId - Root id for event metadata. + */ + private async propagateDependencyCancellationsRedis( + initialSeeds: number[], + rootJobId: number, + ): Promise { + const cancelled = new Set(); + let frontier = [...new Set(initialSeeds.filter((id) => id > 0))]; + + while (frontier.length > 0) { + const pendingRaw = await this.client.sunion( + `${this.prefix}status:pending`, + `${this.prefix}status:waiting`, + ); + const toCancel: number[] = []; + + for (const pidStr of pendingRaw) { + const pid = Number(pidStr); + if (cancelled.has(pid)) continue; + const job = await this.getJob(pid); + if (!job || (job.status !== 'pending' && job.status !== 'waiting')) { + continue; + } + + for (const seedId of frontier) { + if (pid === seedId) continue; + const seedJob = await this.getJob(seedId); + if (!seedJob) continue; + + const byJobId = job.dependsOnJobIds?.includes(seedId) ?? false; + const byTag = + job.dependsOnTags && + job.dependsOnTags.length > 0 && + tagsAreSuperset(seedJob.tags, job.dependsOnTags); + + if (byJobId || byTag) { + toCancel.push(pid); + break; + } + } + } + + if (toCancel.length === 0) break; + + const now = this.nowMs(); + for (const jid of toCancel) { + const ok = await this.client.eval( + CANCEL_JOB_SCRIPT, + 1, + this.prefix, + jid, + now, + ); + if (Number(ok) === 1) { + cancelled.add(jid); + await this.recordJobEvent(jid, JobEventType.Cancelled, { + rootJobId, + dependencyCascade: true, + }); + } + } + + frontier = toCancel; + } + } + // ── Events ────────────────────────────────────────────────────────── async recordJobEvent( @@ -327,6 +424,7 @@ export class RedisBackend implements QueueBackend { retryDelayMax = undefined, deadLetterJobType = undefined, group = undefined, + dependsOn, }: JobOptions, options?: AddJobOptions, ): Promise { @@ -336,6 +434,20 @@ export class RedisBackend implements QueueBackend { 'Transactional job creation is only available with PostgreSQL.', ); } + const { jobIds: depJobIdsRaw, tags: depTags } = + normalizeDependsOn(dependsOn); + if (depJobIdsRaw?.some((id) => id < 0)) { + throw new Error( + 'dependsOn.jobIds: batch-relative (negative) ids are only supported in addJobs()', + ); + } + const dependsOnJobIdsJson = + depJobIdsRaw && depJobIdsRaw.length > 0 + ? JSON.stringify(depJobIdsRaw) + : 'null'; + const dependsOnTagsJson = + depTags && depTags.length > 0 ? JSON.stringify(depTags) : 'null'; + const now = this.nowMs(); const runAtMs = runAt ? runAt.getTime() : 0; @@ -359,6 +471,8 @@ export class RedisBackend implements QueueBackend { deadLetterJobType ?? 'null', group?.id ?? 'null', group?.tier ?? 'null', + dependsOnJobIdsJson, + dependsOnTagsJson, )) as number; const jobId = Number(result); @@ -370,6 +484,10 @@ export class RedisBackend implements QueueBackend { payload, tags, idempotencyKey, + dependsOn: + dependsOnJobIdsJson !== 'null' || dependsOnTagsJson !== 'null' + ? dependsOn + : undefined, }); return jobId; } @@ -391,29 +509,67 @@ export class RedisBackend implements QueueBackend { ); } + const needsSequential = jobs.some((j) => { + const n = normalizeDependsOn(j.dependsOn); + return Boolean(n.jobIds?.length || n.tags?.length); + }); + + if (needsSequential) { + const ids: number[] = []; + for (let i = 0; i < jobs.length; i++) { + let job = jobs[i]!; + const nd = normalizeDependsOn(job.dependsOn); + if (nd.jobIds?.some((id) => id < 0)) { + const resolvedJobIds = resolveDependsOnJobIdsForBatch( + nd.jobIds!, + ids, + ); + job = { + ...job, + dependsOn: { + jobIds: resolvedJobIds, + tags: job.dependsOn?.tags, + }, + }; + } + ids.push(await this.addJob(job)); + } + log( + `Batch-inserted ${jobs.length} jobs (sequential), IDs: [${ids.join(', ')}]`, + ); + return ids; + } + const now = this.nowMs(); - const jobsPayload = jobs.map((job) => ({ - jobType: job.jobType, - payload: JSON.stringify(job.payload), - maxAttempts: job.maxAttempts ?? 3, - priority: job.priority ?? 0, - runAtMs: job.runAt ? job.runAt.getTime() : 0, - timeoutMs: - job.timeoutMs !== undefined ? job.timeoutMs.toString() : 'null', - forceKillOnTimeout: job.forceKillOnTimeout ? 'true' : 'false', - tags: job.tags ? JSON.stringify(job.tags) : 'null', - idempotencyKey: job.idempotencyKey ?? 'null', - retryDelay: - job.retryDelay !== undefined ? job.retryDelay.toString() : 'null', - retryBackoff: - job.retryBackoff !== undefined ? job.retryBackoff.toString() : 'null', - retryDelayMax: - job.retryDelayMax !== undefined ? job.retryDelayMax.toString() : 'null', - deadLetterJobType: job.deadLetterJobType ?? 'null', - groupId: job.group?.id ?? 'null', - groupTier: job.group?.tier ?? 'null', - })); + const jobsPayload = jobs.map((job) => { + const nd = normalizeDependsOn(job.dependsOn); + return { + jobType: job.jobType, + payload: JSON.stringify(job.payload), + maxAttempts: job.maxAttempts ?? 3, + priority: job.priority ?? 0, + runAtMs: job.runAt ? job.runAt.getTime() : 0, + timeoutMs: + job.timeoutMs !== undefined ? job.timeoutMs.toString() : 'null', + forceKillOnTimeout: job.forceKillOnTimeout ? 'true' : 'false', + tags: job.tags ? JSON.stringify(job.tags) : 'null', + idempotencyKey: job.idempotencyKey ?? 'null', + retryDelay: + job.retryDelay !== undefined ? job.retryDelay.toString() : 'null', + retryBackoff: + job.retryBackoff !== undefined ? job.retryBackoff.toString() : 'null', + retryDelayMax: + job.retryDelayMax !== undefined + ? job.retryDelayMax.toString() + : 'null', + deadLetterJobType: job.deadLetterJobType ?? 'null', + groupId: job.group?.id ?? 'null', + groupTier: job.group?.tier ?? 'null', + dependsOnJobIds: nd.jobIds?.length ? JSON.stringify(nd.jobIds) : null, + dependsOnTags: nd.tags?.length ? JSON.stringify(nd.tags) : null, + }; + }); const result = (await this.client.eval( ADD_JOBS_SCRIPT, @@ -665,6 +821,7 @@ export class RedisBackend implements QueueBackend { failureReason, deadLetterJobId, }); + await this.propagateDependencyCancellationsRedis([jobId], jobId); if (deadLetterJobId) { const sourceJob = await this.client.hget( `${this.prefix}job:${jobId}`, @@ -743,9 +900,22 @@ export class RedisBackend implements QueueBackend { async cancelJob(jobId: number): Promise { const now = this.nowMs(); - await this.client.eval(CANCEL_JOB_SCRIPT, 1, this.prefix, jobId, now); - await this.recordJobEvent(jobId, JobEventType.Cancelled); - log(`Cancelled job ${jobId}`); + const ok = await this.client.eval( + CANCEL_JOB_SCRIPT, + 1, + this.prefix, + jobId, + now, + ); + if (Number(ok) === 1) { + await this.recordJobEvent(jobId, JobEventType.Cancelled); + await this.propagateDependencyCancellationsRedis([jobId], jobId); + log(`Cancelled job ${jobId}`); + } else { + log( + `Job ${jobId} could not be cancelled (not in pending/waiting state or does not exist)`, + ); + } } async cancelAllUpcomingJobs(filters?: JobFilters): Promise { diff --git a/packages/dataqueue/src/index.ts b/packages/dataqueue/src/index.ts index 1e51945..1d4dbd5 100644 --- a/packages/dataqueue/src/index.ts +++ b/packages/dataqueue/src/index.ts @@ -442,6 +442,14 @@ const withLogContext = }; export * from './types.js'; +export { + batchDepRef, + normalizeDependsOn, + resolveDependsOnJobIdsForBatch, + tagsAreSuperset, + validatePrerequisiteJobIdsExist, + assertNoDependencyCycle, +} from './job-dependencies.js'; export { QueueBackend, CronScheduleInput } from './backend.js'; export { PostgresBackend } from './backends/postgres.js'; export { diff --git a/packages/dataqueue/src/job-dependencies.test.ts b/packages/dataqueue/src/job-dependencies.test.ts new file mode 100644 index 0000000..9461eb8 --- /dev/null +++ b/packages/dataqueue/src/job-dependencies.test.ts @@ -0,0 +1,129 @@ +import { describe, it, expect, vi } from 'vitest'; +import { + assertNoDependencyCycle, + batchDepRef, + normalizeDependsOn, + resolveDependsOnJobIdsForBatch, + tagsAreSuperset, + validatePrerequisiteJobIdsExist, +} from './job-dependencies.js'; +import type { DatabaseClient } from './types.js'; + +describe('batchDepRef', () => { + it('returns negative index encoding', () => { + expect(batchDepRef(0)).toBe(-1); + expect(batchDepRef(2)).toBe(-3); + }); + + it('throws on invalid index', () => { + expect(() => batchDepRef(-1)).toThrow(); + expect(() => batchDepRef(1.5)).toThrow(); + }); +}); + +describe('normalizeDependsOn', () => { + it('returns undefined for empty input', () => { + expect(normalizeDependsOn(undefined)).toEqual({ + jobIds: undefined, + tags: undefined, + }); + }); + + it('deduplicates and drops empty', () => { + expect( + normalizeDependsOn({ jobIds: [1, 1, 2], tags: ['a', 'a', 'b'] }), + ).toEqual({ + jobIds: [1, 2], + tags: ['a', 'b'], + }); + }); +}); + +describe('resolveDependsOnJobIdsForBatch', () => { + it('resolves negative placeholders', () => { + expect(resolveDependsOnJobIdsForBatch([-1, -2], [10, 20])).toEqual([ + 10, 20, + ]); + }); + + it('passes through positive ids', () => { + expect(resolveDependsOnJobIdsForBatch([5, -1], [99])).toEqual([5, 99]); + }); + + it('throws when index out of range', () => { + expect(() => resolveDependsOnJobIdsForBatch([-3], [1, 2])).toThrow(); + }); +}); + +describe('tagsAreSuperset', () => { + it('returns false for empty required', () => { + expect(tagsAreSuperset(['a'], [])).toBe(false); + }); + + it('checks inclusion', () => { + expect(tagsAreSuperset(['a', 'b'], ['a'])).toBe(true); + expect(tagsAreSuperset(['a'], ['a', 'b'])).toBe(false); + expect(tagsAreSuperset(null, ['a'])).toBe(false); + }); +}); + +describe('validatePrerequisiteJobIdsExist', () => { + it('no-ops for empty', async () => { + const client: DatabaseClient = { + query: vi.fn(), + }; + await validatePrerequisiteJobIdsExist(client, []); + expect(client.query).not.toHaveBeenCalled(); + }); + + it('throws when count mismatches', async () => { + const client: DatabaseClient = { + query: vi.fn().mockResolvedValue({ rows: [{ c: 1 }], rowCount: 1 }), + }; + await expect( + validatePrerequisiteJobIdsExist(client, [1, 2]), + ).rejects.toThrow(/do not exist/); + }); + + it('resolves when all exist', async () => { + const client: DatabaseClient = { + query: vi.fn().mockResolvedValue({ rows: [{ c: 2 }], rowCount: 1 }), + }; + await validatePrerequisiteJobIdsExist(client, [1, 2]); + expect(client.query).toHaveBeenCalledTimes(1); + }); +}); + +describe('assertNoDependencyCycle', () => { + it('throws on self-dependency', async () => { + const client: DatabaseClient = { query: vi.fn() }; + await expect(assertNoDependencyCycle(client, 1, [1])).rejects.toThrow( + /cannot depend on itself/, + ); + }); + + it('throws when cycle detected', async () => { + const client: DatabaseClient = { + query: vi + .fn() + .mockResolvedValue({ rows: [{ '?column?': 1 }], rowCount: 1 }), + }; + await expect(assertNoDependencyCycle(client, 5, [1, 2])).rejects.toThrow( + /cycle/, + ); + }); + + it('no-ops when no deps', async () => { + const client: DatabaseClient = { query: vi.fn() }; + await assertNoDependencyCycle(client, 1, []); + expect(client.query).not.toHaveBeenCalled(); + }); + + it('no-ops when query returns no cycle', async () => { + const client: DatabaseClient = { + query: vi.fn().mockResolvedValue({ rows: [], rowCount: 0 }), + }; + await assertNoDependencyCycle(client, 5, [1]); + expect(client.query).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/dataqueue/src/job-dependencies.ts b/packages/dataqueue/src/job-dependencies.ts new file mode 100644 index 0000000..3d205ee --- /dev/null +++ b/packages/dataqueue/src/job-dependencies.ts @@ -0,0 +1,140 @@ +import type { DatabaseClient } from './types.js'; +import type { JobDependsOn } from './types.js'; + +/** + * Returns a negative placeholder id for `addJobs` batch ordering: `-(index + 1)`. + * Resolves to the id of the job at `batchIndex` in the same batch after inserts. + * + * @param batchIndex - Zero-based index into the `addJobs` array. + */ +export function batchDepRef(batchIndex: number): number { + if (!Number.isInteger(batchIndex) || batchIndex < 0) { + throw new Error( + `batchDepRef: expected non-negative integer index, got ${batchIndex}`, + ); + } + return -(batchIndex + 1); +} + +/** + * Normalizes optional `dependsOn`: empty arrays become undefined, ids de-duplicated. + * + * @param dep - Raw dependency options from the caller. + */ +export function normalizeDependsOn(dep?: JobDependsOn): { + jobIds: number[] | undefined; + tags: string[] | undefined; +} { + if (!dep) return { jobIds: undefined, tags: undefined }; + const jobIds = + dep.jobIds && dep.jobIds.length > 0 ? [...new Set(dep.jobIds)] : undefined; + const tags = + dep.tags && dep.tags.length > 0 ? [...new Set(dep.tags)] : undefined; + return { jobIds, tags }; +} + +/** + * Resolves batch-relative negative ids to real job ids after partial batch inserts. + * + * @param jobIds - May contain negative placeholders from {@link batchDepRef}. + * @param insertedIds - Ids inserted so far, index-aligned with the batch array prefix. + */ +export function resolveDependsOnJobIdsForBatch( + jobIds: number[], + insertedIds: number[], +): number[] { + return jobIds.map((id) => { + if (id >= 0) return id; + const idx = -id - 1; + if (idx < 0 || idx >= insertedIds.length) { + throw new Error( + `Invalid batch-relative job id ${id}: index ${idx} out of range for ${insertedIds.length} inserted job(s)`, + ); + } + return insertedIds[idx]!; + }); +} + +/** + * Returns true if `holderTags` contains every tag in `requiredTags` (set inclusion). + * + * @param holderTags - Tags on job X. + * @param requiredTags - `depends_on_tags` on dependent D. + */ +export function tagsAreSuperset( + holderTags: string[] | null | undefined, + requiredTags: string[] | null | undefined, +): boolean { + if (!requiredTags || requiredTags.length === 0) return false; + if (!holderTags || holderTags.length === 0) return false; + const set = new Set(holderTags); + for (const t of requiredTags) { + if (!set.has(t)) return false; + } + return true; +} + +/** + * Throws if inserting a job with `dependsOnJobIds` would create a cycle. + * Uses: jobs reachable downstream from `newJobId` must not include any prerequisite id + * (equivalently: a prerequisite must not lie in the downstream closure of `newJobId`). + * + * @param client - DB client (transaction). + * @param newJobId - Id of the row just inserted. + * @param dependsOnJobIds - Resolved positive prerequisite ids. + */ +/** + * Ensures every id in `jobIds` exists in `job_queue`. + * + * @param client - Database client. + * @param jobIds - Resolved positive job ids. + */ +export async function validatePrerequisiteJobIdsExist( + client: DatabaseClient, + jobIds: number[], +): Promise { + if (jobIds.length === 0) return; + const r = await client.query( + `SELECT COUNT(*)::int AS c FROM job_queue WHERE id = ANY($1::int[])`, + [jobIds], + ); + const c = r.rows[0]?.c ?? 0; + if (c !== jobIds.length) { + throw new Error( + `dependsOn.jobIds: one or more job ids do not exist (${jobIds.join(', ')})`, + ); + } +} + +export async function assertNoDependencyCycle( + client: DatabaseClient, + newJobId: number, + dependsOnJobIds: number[], +): Promise { + if (dependsOnJobIds.length === 0) return; + if (dependsOnJobIds.includes(newJobId)) { + throw new Error( + `Job ${newJobId} cannot depend on itself (dependsOn.jobIds)`, + ); + } + const result = await client.query( + ` + WITH RECURSIVE downstream AS ( + SELECT j.id + FROM job_queue j + WHERE j.depends_on_job_ids @> ARRAY[$1::integer]::integer[] + UNION + SELECT j.id + FROM job_queue j + INNER JOIN downstream d ON j.depends_on_job_ids @> ARRAY[d.id]::integer[] + ) + SELECT 1 FROM downstream WHERE id = ANY($2::integer[]) LIMIT 1 + `, + [newJobId, dependsOnJobIds], + ); + if (result.rows.length > 0) { + throw new Error( + `Adding job ${newJobId} would create a dependency cycle (dependsOn.jobIds)`, + ); + } +} diff --git a/packages/dataqueue/src/types.ts b/packages/dataqueue/src/types.ts index 2972d80..66edf6e 100644 --- a/packages/dataqueue/src/types.ts +++ b/packages/dataqueue/src/types.ts @@ -42,6 +42,30 @@ export interface JobGroup { tier?: string; } +/** + * Declares prerequisites for a job. Both dimensions use logical AND. + * + * - `jobIds`: The job will not run until every listed job is `completed`. If any + * prerequisite becomes `failed` or `cancelled`, pending dependents are cancelled (transitively). + * - `tags`: Active barrier — the job will not run while another job (not self) is + * `pending`, `processing`, or `waiting` whose `tags` are a superset of every tag listed here + * (Postgres `tags @> depends_on_tags`). If any such job becomes `failed` or `cancelled`, + * pending jobs that list these tags are cancelled (transitively). + * + * **`addJobs` batch references:** In a batch insert, a negative job id means a 0-based index + * into the same batch array: use {@link batchDepRef} (e.g. `batchDepRef(0)` for the first job). + * Single `addJob` calls must use positive database ids only. + */ +export interface JobDependsOn { + /** Prerequisite job ids (must all reach `completed`). */ + jobIds?: number[]; + /** + * Tag drain: wait until no active job (pending/processing/waiting) has all of these tags. + * Requires matching jobs to succeed (dependents are cancelled if a matching job fails or is cancelled). + */ + tags?: string[]; +} + export interface JobOptions> { jobType: T; payload: PayloadMap[T]; @@ -149,6 +173,10 @@ export interface JobOptions> { * globally limited by `group.id` across all workers/instances. */ group?: JobGroup; + /** + * Optional prerequisites (job ids and/or tag drain). See {@link JobDependsOn}. + */ + dependsOn?: JobDependsOn; } /** @@ -309,6 +337,14 @@ export interface JobRecord> { * Group tier for this job, if provided at enqueue time. */ groupTier?: string | null; + /** + * Prerequisite job ids persisted at enqueue time, if any. + */ + dependsOnJobIds?: number[] | null; + /** + * Tag drain prerequisites persisted at enqueue time, if any. + */ + dependsOnTags?: string[] | null; } /** diff --git a/packages/react/src/types.ts b/packages/react/src/types.ts index b8e04ec..0b6699d 100644 --- a/packages/react/src/types.ts +++ b/packages/react/src/types.ts @@ -27,6 +27,10 @@ export interface JobData { status: JobStatus; progress?: number | null; output?: unknown; + /** Prerequisite job ids when returned from the server `JobRecord`. */ + dependsOnJobIds?: number[] | null; + /** Tag-drain prerequisites when returned from the server `JobRecord`. */ + dependsOnTags?: string[] | null; [key: string]: unknown; } From 232a579fab4aee6497f23f6702f7f1374520ef24 Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Sun, 22 Mar 2026 13:35:48 +0100 Subject: [PATCH 2/4] Add job dependency features and documentation - Introduced a new feature for job dependencies, allowing jobs to specify prerequisites using `dependsOn.jobIds` and `dependsOn.tags`. - Implemented a `DependencyDemo` component to showcase job dependency functionality, including linear chains and tag-drain barriers. - Updated the `AddJobForm` to include a new job type for dependencies and enhanced the `JobMonitor` to display dependency information. - Created comprehensive documentation on job dependencies, including usage examples and integration details. - Updated the sidebar and comparison sections to reflect the new job dependencies feature. These changes significantly enhance the job management capabilities of DataQueue, enabling more complex workflows and dependencies. --- .../app/features/add-jobs/add-job-form.tsx | 2 + .../features/dependencies/dependency-demo.tsx | 190 ++++++++++++++++++ apps/demo/app/features/dependencies/page.tsx | 34 ++++ apps/demo/app/jobs/add-job.ts | 20 ++ apps/demo/app/page.tsx | 9 + apps/demo/components/app-sidebar.tsx | 6 + apps/demo/components/job-monitor.tsx | 59 +++++- apps/demo/lib/queue.ts | 13 ++ apps/docs/content/docs/api/index.mdx | 4 + apps/docs/content/docs/api/job-options.mdx | 2 + apps/docs/content/docs/api/job-queue.mdx | 4 +- apps/docs/content/docs/api/job-record.mdx | 2 + apps/docs/content/docs/usage/add-job.mdx | 4 + .../content/docs/usage/job-dependencies.mdx | 110 ++++++++++ apps/docs/content/docs/usage/meta.json | 1 + apps/docs/public/llms-full.txt | 127 +++++++++++- apps/docs/public/llms.txt | 1 + apps/website/app/page.tsx | 17 +- packages/dataqueue/ai/rules/advanced.md | 27 +++ packages/dataqueue/ai/rules/basic.md | 2 +- .../ai/skills/dataqueue-advanced/SKILL.md | 41 +++- .../ai/skills/dataqueue-core/SKILL.md | 9 + 22 files changed, 670 insertions(+), 14 deletions(-) create mode 100644 apps/demo/app/features/dependencies/dependency-demo.tsx create mode 100644 apps/demo/app/features/dependencies/page.tsx create mode 100644 apps/docs/content/docs/usage/job-dependencies.mdx diff --git a/apps/demo/app/features/add-jobs/add-job-form.tsx b/apps/demo/app/features/add-jobs/add-job-form.tsx index eca0bc6..ce7ac64 100644 --- a/apps/demo/app/features/add-jobs/add-job-form.tsx +++ b/apps/demo/app/features/add-jobs/add-job-form.tsx @@ -49,6 +49,7 @@ const defaultPayloads: Record = { null, 2, ), + dep_demo: JSON.stringify({ label: 'Example dependency demo step' }, null, 2), }; export function AddJobForm() { @@ -124,6 +125,7 @@ export function AddJobForm() { approval_request + dep_demo diff --git a/apps/demo/app/features/dependencies/dependency-demo.tsx b/apps/demo/app/features/dependencies/dependency-demo.tsx new file mode 100644 index 0000000..a5d93bb --- /dev/null +++ b/apps/demo/app/features/dependencies/dependency-demo.tsx @@ -0,0 +1,190 @@ +'use client'; + +import { useState, useTransition } from 'react'; +import { Button } from '@/components/ui/button'; +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from '@/components/ui/card'; +import { addGenericJob } from '@/app/jobs/add-job'; +import { processJobs } from '@/app/jobs/process-jobs'; +import { Loader2 } from 'lucide-react'; + +/** + * Interactive demos for `dependsOn.jobIds` (linear prerequisites) and + * `dependsOn.tags` (tag drain: wait until no other active job is tagged as a superset). + */ +export function DependencyDemo() { + const [isPending, startTransition] = useTransition(); + const [log, setLog] = useState(null); + + const runChain = () => { + startTransition(async () => { + const a = await addGenericJob({ + jobType: 'dep_demo', + payload: { label: 'chain — step A' }, + tags: ['demo-deps', 'chain'], + }); + const b = await addGenericJob({ + jobType: 'dep_demo', + payload: { label: 'chain — step B' }, + tags: ['demo-deps', 'chain'], + dependsOn: { jobIds: [a.job] }, + }); + const c = await addGenericJob({ + jobType: 'dep_demo', + payload: { label: 'chain — step C' }, + tags: ['demo-deps', 'chain'], + dependsOn: { jobIds: [b.job] }, + }); + setLog( + `Enqueued linear chain: A=#${a.job} → B=#${b.job} (waits on A) → C=#${c.job} (waits on B). Run the processor to watch order.`, + ); + }); + }; + + const runTagBarrier = () => { + startTransition(async () => { + const wave = `wave-${Date.now()}`; + const j1 = await addGenericJob({ + jobType: 'dep_demo', + payload: { label: 'parallel slot 1' }, + tags: ['demo-deps', wave, 'slot-1'], + }); + const j2 = await addGenericJob({ + jobType: 'dep_demo', + payload: { label: 'parallel slot 2' }, + tags: ['demo-deps', wave, 'slot-2'], + }); + const barrier = await addGenericJob({ + jobType: 'dep_demo', + payload: { label: 'after wave (tag drain)' }, + tags: ['demo-deps', wave, 'barrier'], + dependsOn: { tags: [wave] }, + }); + setLog( + `Tag “${wave}”: parallel jobs #${j1.job} and #${j2.job} must finish (or leave the active set) before #${barrier.job} runs — barrier waits while any job still has tag [${wave}].`, + ); + }); + }; + + const runFailureCascade = () => { + startTransition(async () => { + const bad = await addGenericJob({ + jobType: 'dep_demo', + payload: { label: 'fails on purpose', fail: true }, + tags: ['demo-deps', 'fail-cascade'], + maxAttempts: 1, + }); + const dep = await addGenericJob({ + jobType: 'dep_demo', + payload: { label: 'cancelled when prerequisite fails' }, + tags: ['demo-deps', 'fail-cascade'], + dependsOn: { jobIds: [bad.job] }, + }); + setLog( + `Enqueued prerequisite #${bad.job} (will fail once) and dependent #${dep.job}. After processing, #${dep.job} should end cancelled.`, + ); + }); + }; + + const triggerProcessor = () => { + startTransition(async () => { + await processJobs(); + setLog((prev) => + prev + ? `${prev}\nProcessor tick complete — refresh the table below.` + : 'Processor tick complete — refresh the table below.', + ); + }); + }; + + return ( +
+ + + Linear chain (`dependsOn.jobIds`) + + B waits until job A is completed; C waits until B is completed. + Prerequisite failures or cancellations propagate to dependents. + + + + + + + + + + Tag drain (`dependsOn.tags`) + + The barrier job waits until no active job (pending, + processing, or waiting) still includes all of those tags. Parallel + work can run; the barrier runs after the wave clears. + + + + + + + + + + Failure cascade + + A job that fails (here: single attempt, intentional throw) cancels + jobs that depend on it by job id. + + + + + + + + + + Run the processor + + Same as other demos: jobs move when the processor runs (manual + button here, or Auto Processor on the home page). + + + + + + + + {log && ( +

+ {log} +

+ )} +
+ ); +} diff --git a/apps/demo/app/features/dependencies/page.tsx b/apps/demo/app/features/dependencies/page.tsx new file mode 100644 index 0000000..922e2f2 --- /dev/null +++ b/apps/demo/app/features/dependencies/page.tsx @@ -0,0 +1,34 @@ +export const dynamic = 'force-dynamic'; + +import { FeaturePage } from '@/components/feature-page'; +import { JobMonitor } from '@/components/job-monitor'; +import { DependencyDemo } from './dependency-demo'; +import { RefreshPeriodically } from '@/app/refresh-periodically'; +import { refresh } from '@/app/queue/refresh'; + +export default function DependenciesPage() { + return ( + + +
+ + +
+
+ ); +} diff --git a/apps/demo/app/jobs/add-job.ts b/apps/demo/app/jobs/add-job.ts index 14d8d57..e4bdb52 100644 --- a/apps/demo/app/jobs/add-job.ts +++ b/apps/demo/app/jobs/add-job.ts @@ -2,7 +2,14 @@ import { getJobQueue, type JobPayloadMap } from '@/lib/queue'; import { revalidatePath } from 'next/cache'; +import type { JobDependsOn } from '@nicnocquee/dataqueue'; +/** + * Enqueues a typed job. Optionally attaches prerequisites via {@link JobDependsOn}. + * + * @param params - Job type, payload, and optional scheduling / dependency fields. + * @returns The new job id (numeric) as `job`. + */ export const addGenericJob = async ({ jobType, payload, @@ -13,6 +20,7 @@ export const addGenericJob = async ({ timeoutMs, forceKillOnTimeout, maxAttempts, + dependsOn, }: { jobType: keyof JobPayloadMap; payload: JobPayloadMap[keyof JobPayloadMap]; @@ -23,12 +31,22 @@ export const addGenericJob = async ({ timeoutMs?: number; forceKillOnTimeout?: boolean; maxAttempts?: number; + dependsOn?: JobDependsOn; }) => { const jobQueue = getJobQueue(); const runAt = runAtDelay ? new Date(Date.now() + runAtDelay * 1000) : undefined; + const normalizedDependsOn: JobDependsOn | undefined = + dependsOn && + ((dependsOn.jobIds?.length ?? 0) > 0 || (dependsOn.tags?.length ?? 0) > 0) + ? { + ...(dependsOn.jobIds?.length ? { jobIds: dependsOn.jobIds } : {}), + ...(dependsOn.tags?.length ? { tags: dependsOn.tags } : {}), + } + : undefined; + const job = await jobQueue.addJob({ jobType, payload: payload as never, @@ -39,8 +57,10 @@ export const addGenericJob = async ({ timeoutMs: timeoutMs ?? undefined, forceKillOnTimeout: forceKillOnTimeout ?? undefined, maxAttempts: maxAttempts ?? undefined, + dependsOn: normalizedDependsOn, }); revalidatePath('/'); + revalidatePath('/features/dependencies'); return { job }; }; diff --git a/apps/demo/app/page.tsx b/apps/demo/app/page.tsx index bc893b9..c6d545d 100644 --- a/apps/demo/app/page.tsx +++ b/apps/demo/app/page.tsx @@ -25,6 +25,7 @@ import { Monitor, ExternalLink, CalendarClock, + GitBranch, } from 'lucide-react'; import Link from 'next/link'; @@ -43,6 +44,14 @@ const features = [ icon: Tags, docsUrl: 'https://docs.dataqueue.dev/usage/get-jobs', }, + { + title: 'Job dependencies', + description: + 'dependsOn job ids and tag-drain barriers; cascade on fail or cancel', + href: '/features/dependencies', + icon: GitBranch, + docsUrl: 'https://docs.dataqueue.dev/usage/add-job', + }, { title: 'Job Management', description: 'Retry, cancel, and edit individual jobs', diff --git a/apps/demo/components/app-sidebar.tsx b/apps/demo/components/app-sidebar.tsx index 6d889bb..3c3fe8b 100644 --- a/apps/demo/components/app-sidebar.tsx +++ b/apps/demo/components/app-sidebar.tsx @@ -25,6 +25,7 @@ import { Monitor, LayoutDashboard, CalendarClock, + GitBranch, } from 'lucide-react'; import Link from 'next/link'; import { usePathname } from 'next/navigation'; @@ -33,6 +34,11 @@ const featurePages = [ { title: 'Overview', href: '/', icon: Home }, { title: 'Add & Process Jobs', href: '/features/add-jobs', icon: Plus }, { title: 'Tags & Filtering', href: '/features/tags', icon: Tags }, + { + title: 'Job dependencies', + href: '/features/dependencies', + icon: GitBranch, + }, { title: 'Job Management', href: '/features/management', icon: Settings }, { title: 'Idempotency', href: '/features/idempotency', icon: Key }, { title: 'Timeouts', href: '/features/timeouts', icon: Timer }, diff --git a/apps/demo/components/job-monitor.tsx b/apps/demo/components/job-monitor.tsx index c166117..f510a3e 100644 --- a/apps/demo/components/job-monitor.tsx +++ b/apps/demo/components/job-monitor.tsx @@ -1,5 +1,6 @@ import { getJobQueue } from '@/lib/queue'; import { JobTable, StatusBadge } from './job-table'; +import type { JobRecord } from '@nicnocquee/dataqueue'; import { Card, CardContent, @@ -8,6 +9,14 @@ import { CardTitle, } from '@/components/ui/card'; +type AnyJobRecord = JobRecord, string>; + +type JobMonitorColumn = { + header: string; + key: keyof AnyJobRecord; + render?: (value: unknown, job: AnyJobRecord) => React.ReactNode; +}; + const statuses = [ 'pending', 'processing', @@ -17,13 +26,37 @@ const statuses = [ 'cancelled', ] as const; -const defaultColumns = [ - { header: 'ID', key: 'id' as const }, - { header: 'Type', key: 'jobType' as const }, - { header: 'Status', key: 'status' as const }, - { header: 'Priority', key: 'priority' as const }, - { header: 'Tags', key: 'tags' as const }, - { header: 'Created', key: 'createdAt' as const }, +const defaultColumns: JobMonitorColumn[] = [ + { header: 'ID', key: 'id' }, + { header: 'Type', key: 'jobType' }, + { header: 'Status', key: 'status' }, + { header: 'Priority', key: 'priority' }, + { header: 'Tags', key: 'tags' }, + { header: 'Created', key: 'createdAt' }, +]; + +const dependencyExtraColumns: JobMonitorColumn[] = [ + { + header: 'Dep. job IDs', + key: 'dependsOnJobIds', + render: (value: unknown) => { + const ids = value as number[] | null | undefined; + if (!ids?.length) return -; + return {ids.join(', ')}; + }, + }, + { + header: 'Dep. tags', + key: 'dependsOnTags', + render: (value: unknown) => { + const tags = value as string[] | null | undefined; + if (!tags?.length) + return -; + return ( + {tags.join(', ')} + ); + }, + }, ]; export async function JobMonitor({ @@ -31,12 +64,22 @@ export async function JobMonitor({ description, filter, compact = false, + showDependencyColumns = false, }: { title?: string; description?: string; filter?: { jobType?: string; status?: string }; compact?: boolean; + /** When true, adds columns for persisted `dependsOnJobIds` / `dependsOnTags`. */ + showDependencyColumns?: boolean; }) { + const tableColumns = showDependencyColumns + ? [ + ...defaultColumns.slice(0, 5), + ...dependencyExtraColumns, + defaultColumns[5], + ] + : defaultColumns; const jobQueue = getJobQueue(); const statusesToShow = filter?.status @@ -96,7 +139,7 @@ export async function JobMonitor({ diff --git a/apps/demo/lib/queue.ts b/apps/demo/lib/queue.ts index b519103..e45558a 100644 --- a/apps/demo/lib/queue.ts +++ b/apps/demo/lib/queue.ts @@ -28,6 +28,12 @@ export type JobPayloadMap = { requestType: string; description: string; }; + /** Lightweight jobs for the Job dependencies demo (chains, tag drain, failure cascade). */ + dep_demo: { + label: string; + /** When true, the handler throws so dependents can be cancelled by the queue. */ + fail?: boolean; + }; }; let jobQueue: ReturnType> | null = null; @@ -109,4 +115,11 @@ export const jobHandlers: JobHandlers = { console.log(`[approval_request] Token failed:`, result.error); } }, + dep_demo: async (payload) => { + const { label, fail } = payload; + console.log(`[dep_demo] ${label}`); + if (fail) { + throw new Error('Intentional failure (dependency demo)'); + } + }, }; diff --git a/apps/docs/content/docs/api/index.mdx b/apps/docs/content/docs/api/index.mdx index 07562c6..e2d9015 100644 --- a/apps/docs/content/docs/api/index.mdx +++ b/apps/docs/content/docs/api/index.mdx @@ -18,3 +18,7 @@ This section documents the main classes, types, and functions available for mana - [JobHandlers](/api/job-handlers) - [Database Utility](/api/db-util) - [Tags](/api/tags) + +## Guides + +- [Job dependencies](/usage/job-dependencies) — prerequisites via `dependsOn.jobIds`, tag drain via `dependsOn.tags`, and `batchDepRef` for `addJobs` diff --git a/apps/docs/content/docs/api/job-options.mdx b/apps/docs/content/docs/api/job-options.mdx index 5876bdb..e6217cc 100644 --- a/apps/docs/content/docs/api/job-options.mdx +++ b/apps/docs/content/docs/api/job-options.mdx @@ -23,6 +23,7 @@ The `JobOptions` interface defines the options for creating a new job in the que - `tags?`: _string[]_ — Tags for this job. Used for grouping, searching, or batch operations. - `idempotencyKey?`: _string_ — Optional idempotency key. When provided, ensures that only one job exists for a given key. If a job with the same key already exists, `addJob` returns the existing job's ID instead of creating a duplicate. See [Idempotency](/usage/add-job#idempotency) for details. - `deadLetterJobType?`: _string_ — Optional dead-letter destination job type. When the job exhausts retries, DataQueue creates a new pending job in this job type with an envelope payload containing source metadata, original payload, and failure context. +- `dependsOn?`: _JobDependsOn_ — Optional prerequisites. Wait for listed jobs to complete (`jobIds`) and/or for a tag-drain barrier (`tags`). See [Job dependencies](/usage/job-dependencies). ## Example @@ -38,5 +39,6 @@ const job = { tags: ['welcome', 'user'], // tags for grouping/searching idempotencyKey: 'welcome-email-user-123', // prevent duplicate jobs deadLetterJobType: 'email_dead_letter', // route exhausted failures + dependsOn: { jobIds: [42] }, // run only after job 42 completes }; ``` diff --git a/apps/docs/content/docs/api/job-queue.mdx b/apps/docs/content/docs/api/job-queue.mdx index fc2e87d..e1cc72d 100644 --- a/apps/docs/content/docs/api/job-queue.mdx +++ b/apps/docs/content/docs/api/job-queue.mdx @@ -117,6 +117,7 @@ interface JobOptions { retryDelayMax?: number; // Max delay cap in seconds (default: none) deadLetterJobType?: string; // Route exhausted failures to this job type group?: { id: string; tier?: string }; // Optional group for global concurrency limits + dependsOn?: { jobIds?: number[]; tags?: string[] }; // Prerequisites — see Job dependencies } ``` @@ -125,6 +126,7 @@ interface JobOptions { - `retryDelayMax` - Maximum delay cap in seconds. Only meaningful when `retryBackoff` is true. No limit when omitted. - `deadLetterJobType` - Optional dead-letter destination. When retries are exhausted, a new pending job is created in this job type with an envelope payload (`originalJob`, `originalPayload`, `failure`). - `group` - Optional grouping metadata. Use `group.id` to enforce global per-group limits with `ProcessorOptions.groupConcurrency`. `group.tier` is reserved for future policies. +- `dependsOn` - Optional prerequisites (`jobIds` and/or `tags`). See [Job dependencies](/usage/job-dependencies). #### AddJobOptions @@ -146,7 +148,7 @@ Adds multiple jobs to the queue in a single operation. More efficient than calli Returns an array of job IDs in the same order as the input array. -Each job can independently have its own `priority`, `runAt`, `tags`, `idempotencyKey`, and other options. Idempotency keys are handled per-job — duplicates resolve to the existing job's ID without creating a new row. +Each job can independently have its own `priority`, `runAt`, `tags`, `idempotencyKey`, `dependsOn`, and other options. Idempotency keys are handled per-job — duplicates resolve to the existing job's ID without creating a new row. Passing an empty array returns `[]` immediately without touching the database. diff --git a/apps/docs/content/docs/api/job-record.mdx b/apps/docs/content/docs/api/job-record.mdx index 7fccd34..266c69d 100644 --- a/apps/docs/content/docs/api/job-record.mdx +++ b/apps/docs/content/docs/api/job-record.mdx @@ -45,6 +45,8 @@ The `JobRecord` interface represents a job stored in the queue, including its st - `deadLetterJobType?`: _string | null_ — Configured dead-letter destination job type for this job. - `deadLetteredAt?`: _Date | null_ — Timestamp when this job was routed to a dead-letter job. - `deadLetterJobId?`: _number | null_ — Linked dead-letter job ID created when retries were exhausted. +- `dependsOnJobIds?`: _number[] | null_ — Prerequisite job ids set at enqueue time, if any. See [Job dependencies](/usage/job-dependencies). +- `dependsOnTags?`: _string[] | null_ — Tag-drain prerequisite tags set at enqueue time, if any. See [Job dependencies](/usage/job-dependencies). ## Example diff --git a/apps/docs/content/docs/usage/add-job.mdx b/apps/docs/content/docs/usage/add-job.mdx index f1da4ec..ad74dfc 100644 --- a/apps/docs/content/docs/usage/add-job.mdx +++ b/apps/docs/content/docs/usage/add-job.mdx @@ -85,6 +85,10 @@ export const sendBulkEmails = async ( - **Idempotency**: Each job's `idempotencyKey` is handled independently. Duplicate keys resolve to the existing job's ID. - **Transactional**: The `{ db }` option works with `addJobs` the same way as `addJob` (PostgreSQL only). +## Job dependencies + +Use `dependsOn` to wait for other jobs to finish (`jobIds`) and/or for a [tag drain](/usage/job-dependencies#dependsontags-tag-drain) (`tags`). In `addJobs`, use `batchDepRef` to point at other jobs in the same batch. See [Job dependencies](/usage/job-dependencies). + ## Idempotency You can provide an `idempotencyKey` when adding a job to prevent duplicate jobs. If a job with the same key already exists in the queue, `addJob` returns the existing job's ID instead of creating a new one. diff --git a/apps/docs/content/docs/usage/job-dependencies.mdx b/apps/docs/content/docs/usage/job-dependencies.mdx new file mode 100644 index 0000000..43f16c8 --- /dev/null +++ b/apps/docs/content/docs/usage/job-dependencies.mdx @@ -0,0 +1,110 @@ +--- +title: Job Dependencies +--- + +You can defer a job until prerequisites are satisfied by setting `dependsOn` on [`JobOptions`](/api/job-options). Both dimensions use **logical AND** when both are present. + +## `dependsOn.jobIds` + +The job stays **pending** until **every** listed prerequisite has status `completed`. + +- **Invalid ids**: Enqueue fails if any id does not exist in the queue. +- **Self-dependency**: A job cannot list its own id in `dependsOn.jobIds`. +- **Cycles**: DataQueue rejects inserts that would create a dependency cycle between jobs. +- **Failure or cancellation**: If any prerequisite ends as `failed` or `cancelled`, pending jobs that depend on it (transitively) are **cancelled**. + +Use this for explicit chains: _job B runs only after job A completes successfully_. + +```typescript title="Linear chain" +import { getJobQueue } from '@/lib/queue'; + +const jobQueue = getJobQueue(); + +const a = await jobQueue.addJob({ + jobType: 'ingest', + payload: { fileId: 'f1' }, +}); + +await jobQueue.addJob({ + jobType: 'transform', + payload: { fileId: 'f1' }, + dependsOn: { jobIds: [a] }, +}); +``` + +## `dependsOn.tags` (tag drain) + +The job stays **pending** while **another** job (not itself) is **active** — `pending`, `processing`, or `waiting` — and that job’s `tags` are a **superset** of **every** tag listed in `dependsOn.tags` (same semantics as Postgres `tags @> depends_on_tags`). + +When no such blocking job exists, the dependent job becomes eligible to run (subject to `runAt`, workers, etc.). + +- **Failure or cancellation**: If a job that matches the tag barrier fails or is cancelled, pending jobs that listed those tags are **cancelled** (transitively). + +Use this for _drain_ patterns: _wait until no in-flight work is tagged in a certain way_ (for example a “wave” or “tenant” tag). + +```typescript title="Tag drain" +await jobQueue.addJob({ + jobType: 'finalize_wave', + payload: { wave: 2 }, + tags: ['wave:2'], + dependsOn: { tags: ['wave:1'] }, +}); +``` + +## Combining `jobIds` and `tags` + +If you set both, **all** job-id prerequisites must be `completed` **and** the tag-drain condition must be clear before the job runs. + +## `addJob` vs `addJobs` + +### Single `addJob` + +`dependsOn.jobIds` must contain **positive** database ids only. Negative placeholders are **not** allowed (they are reserved for batch inserts). + +### Batch `addJobs` and `batchDepRef` + +When you enqueue several related jobs in one `addJobs` call, you can reference earlier jobs in the **same batch** using negative placeholders: `-(index + 1)` for the job at `index` in the array. Use the helper **`batchDepRef`** from `@nicnocquee/dataqueue` instead of hard-coding negatives. + +```typescript title="Batch dependencies" +import { batchDepRef } from '@nicnocquee/dataqueue'; +import { getJobQueue } from '@/lib/queue'; + +const jobQueue = getJobQueue(); + +const [idA, idB, idC] = await jobQueue.addJobs([ + { jobType: 'step_a', payload: {} }, + { + jobType: 'step_b', + payload: {}, + dependsOn: { jobIds: [batchDepRef(0)] }, + }, + { + jobType: 'step_c', + payload: {}, + dependsOn: { jobIds: [batchDepRef(0), batchDepRef(1)] }, + }, +]); +``` + +This enqueues **three jobs in one round-trip**. Array indices are **0-based**: `batchDepRef(0)` means “the job at index 0 in this same array,” and `batchDepRef(1)` means “the job at index 1.” After insert, those placeholders become real ids. + +- **`step_a`** (index `0`) has no prerequisites; it can run as soon as a worker picks it up. +- **`step_b`** waits for **`step_a`** only (`dependsOn: { jobIds: [batchDepRef(0)] }`). +- **`step_c`** waits for **both** earlier jobs (`batchDepRef(0)` and `batchDepRef(1)`), so it runs only after `step_a` and `step_b` have reached `completed`. + +`[idA, idB, idC]` are the final database ids in the same order as the input array — the same ids that were written into each row’s dependency list when placeholders were resolved. + + + `batchDepRef` is exported from `@nicnocquee/dataqueue`. Re-export it from your + queue module if you prefer a single import path. + + +## Persisted fields on `JobRecord` + +Prerequisites are stored on the row as [`dependsOnJobIds`](/api/job-record) and [`dependsOnTags`](/api/job-record) for inspection and debugging. + +## See also + +- [`JobOptions`](/api/job-options) — full `dependsOn` / `JobDependsOn` shape +- [`JobRecord`](/api/job-record) — persisted prerequisite columns +- [Add job](/usage/add-job) — `addJob` / `addJobs` batch behavior diff --git a/apps/docs/content/docs/usage/meta.json b/apps/docs/content/docs/usage/meta.json index a1aa229..de93480 100644 --- a/apps/docs/content/docs/usage/meta.json +++ b/apps/docs/content/docs/usage/meta.json @@ -7,6 +7,7 @@ "job-handlers", "init-queue", "add-job", + "job-dependencies", "job-output", "process-jobs", "long-running-server", diff --git a/apps/docs/public/llms-full.txt b/apps/docs/public/llms-full.txt index 33f98a3..11c674d 100644 --- a/apps/docs/public/llms-full.txt +++ b/apps/docs/public/llms-full.txt @@ -79,6 +79,10 @@ This section documents the main classes, types, and functions available for mana - [Database Utility](/api/db-util) - [Tags](/api/tags) +## Guides + +- [Job dependencies](/usage/job-dependencies) — prerequisites via `dependsOn.jobIds`, tag drain via `dependsOn.tags`, and `batchDepRef` for `addJobs` + --- # JobEvent @@ -200,6 +204,7 @@ The `JobOptions` interface defines the options for creating a new job in the que - `tags?`: _string[]_ — Tags for this job. Used for grouping, searching, or batch operations. - `idempotencyKey?`: _string_ — Optional idempotency key. When provided, ensures that only one job exists for a given key. If a job with the same key already exists, `addJob` returns the existing job's ID instead of creating a duplicate. See [Idempotency](/usage/add-job#idempotency) for details. - `deadLetterJobType?`: _string_ — Optional dead-letter destination job type. When the job exhausts retries, DataQueue creates a new pending job in this job type with an envelope payload containing source metadata, original payload, and failure context. +- `dependsOn?`: _JobDependsOn_ — Optional prerequisites. Wait for listed jobs to complete (`jobIds`) and/or for a tag-drain barrier (`tags`). See [Job dependencies](/usage/job-dependencies). ## Example @@ -215,6 +220,7 @@ const job = { tags: ['welcome', 'user'], // tags for grouping/searching idempotencyKey: 'welcome-email-user-123', // prevent duplicate jobs deadLetterJobType: 'email_dead_letter', // route exhausted failures + dependsOn: { jobIds: [42] }, // run only after job 42 completes }; ``` @@ -339,6 +345,7 @@ interface JobOptions { retryDelayMax?: number; // Max delay cap in seconds (default: none) deadLetterJobType?: string; // Route exhausted failures to this job type group?: { id: string; tier?: string }; // Optional group for global concurrency limits + dependsOn?: { jobIds?: number[]; tags?: string[] }; // Prerequisites — see Job dependencies } ``` @@ -347,6 +354,7 @@ interface JobOptions { - `retryDelayMax` - Maximum delay cap in seconds. Only meaningful when `retryBackoff` is true. No limit when omitted. - `deadLetterJobType` - Optional dead-letter destination. When retries are exhausted, a new pending job is created in this job type with an envelope payload (`originalJob`, `originalPayload`, `failure`). - `group` - Optional grouping metadata. Use `group.id` to enforce global per-group limits with `ProcessorOptions.groupConcurrency`. `group.tier` is reserved for future policies. +- `dependsOn` - Optional prerequisites (`jobIds` and/or `tags`). See [Job dependencies](/usage/job-dependencies). #### AddJobOptions @@ -368,7 +376,7 @@ Adds multiple jobs to the queue in a single operation. More efficient than calli Returns an array of job IDs in the same order as the input array. -Each job can independently have its own `priority`, `runAt`, `tags`, `idempotencyKey`, and other options. Idempotency keys are handled per-job — duplicates resolve to the existing job's ID without creating a new row. +Each job can independently have its own `priority`, `runAt`, `tags`, `idempotencyKey`, `dependsOn`, and other options. Idempotency keys are handled per-job — duplicates resolve to the existing job's ID without creating a new row. Passing an empty array returns `[]` immediately without touching the database. @@ -898,6 +906,8 @@ The `JobRecord` interface represents a job stored in the queue, including its st - `deadLetterJobType?`: _string | null_ — Configured dead-letter destination job type for this job. - `deadLetteredAt?`: _Date | null_ — Timestamp when this job was routed to a dead-letter job. - `deadLetterJobId?`: _number | null_ — Linked dead-letter job ID created when retries were exhausted. +- `dependsOnJobIds?`: _number[] | null_ — Prerequisite job ids set at enqueue time, if any. See [Job dependencies](/usage/job-dependencies). +- `dependsOnTags?`: _string[] | null_ — Tag-drain prerequisite tags set at enqueue time, if any. See [Job dependencies](/usage/job-dependencies). ## Example @@ -1769,6 +1779,10 @@ export const sendBulkEmails = async ( - **Idempotency**: Each job's `idempotencyKey` is handled independently. Duplicate keys resolve to the existing job's ID. - **Transactional**: The `{ db }` option works with `addJobs` the same way as `addJob` (PostgreSQL only). +## Job dependencies + +Use `dependsOn` to wait for other jobs to finish (`jobIds`) and/or for a [tag drain](/usage/job-dependencies#dependsontags-tag-drain) (`tags`). In `addJobs`, use `batchDepRef` to point at other jobs in the same batch. See [Job dependencies](/usage/job-dependencies). + ## Idempotency You can provide an `idempotencyKey` when adding a job to prevent duplicate jobs. If a job with the same key already exists in the queue, `addJob` returns the existing job's ID instead of creating a new one. @@ -3668,6 +3682,117 @@ jobQueue = initJobQueue({ --- +# Job Dependencies + +Slug: usage/job-dependencies + +You can defer a job until prerequisites are satisfied by setting `dependsOn` on [`JobOptions`](/api/job-options). Both dimensions use **logical AND** when both are present. + +## `dependsOn.jobIds` + +The job stays **pending** until **every** listed prerequisite has status `completed`. + +- **Invalid ids**: Enqueue fails if any id does not exist in the queue. +- **Self-dependency**: A job cannot list its own id in `dependsOn.jobIds`. +- **Cycles**: DataQueue rejects inserts that would create a dependency cycle between jobs. +- **Failure or cancellation**: If any prerequisite ends as `failed` or `cancelled`, pending jobs that depend on it (transitively) are **cancelled**. + +Use this for explicit chains: _job B runs only after job A completes successfully_. + +```typescript title="Linear chain" +import { getJobQueue } from '@/lib/queue'; + +const jobQueue = getJobQueue(); + +const a = await jobQueue.addJob({ + jobType: 'ingest', + payload: { fileId: 'f1' }, +}); + +await jobQueue.addJob({ + jobType: 'transform', + payload: { fileId: 'f1' }, + dependsOn: { jobIds: [a] }, +}); +``` + +## `dependsOn.tags` (tag drain) + +The job stays **pending** while **another** job (not itself) is **active** — `pending`, `processing`, or `waiting` — and that job’s `tags` are a **superset** of **every** tag listed in `dependsOn.tags` (same semantics as Postgres `tags @> depends_on_tags`). + +When no such blocking job exists, the dependent job becomes eligible to run (subject to `runAt`, workers, etc.). + +- **Failure or cancellation**: If a job that matches the tag barrier fails or is cancelled, pending jobs that listed those tags are **cancelled** (transitively). + +Use this for _drain_ patterns: _wait until no in-flight work is tagged in a certain way_ (for example a “wave” or “tenant” tag). + +```typescript title="Tag drain" +await jobQueue.addJob({ + jobType: 'finalize_wave', + payload: { wave: 2 }, + tags: ['wave:2'], + dependsOn: { tags: ['wave:1'] }, +}); +``` + +## Combining `jobIds` and `tags` + +If you set both, **all** job-id prerequisites must be `completed` **and** the tag-drain condition must be clear before the job runs. + +## `addJob` vs `addJobs` + +### Single `addJob` + +`dependsOn.jobIds` must contain **positive** database ids only. Negative placeholders are **not** allowed (they are reserved for batch inserts). + +### Batch `addJobs` and `batchDepRef` + +When you enqueue several related jobs in one `addJobs` call, you can reference earlier jobs in the **same batch** using negative placeholders: `-(index + 1)` for the job at `index` in the array. Use the helper **`batchDepRef`** from `@nicnocquee/dataqueue` instead of hard-coding negatives. + +```typescript title="Batch dependencies" +import { batchDepRef } from '@nicnocquee/dataqueue'; +import { getJobQueue } from '@/lib/queue'; + +const jobQueue = getJobQueue(); + +const [idA, idB, idC] = await jobQueue.addJobs([ + { jobType: 'step_a', payload: {} }, + { + jobType: 'step_b', + payload: {}, + dependsOn: { jobIds: [batchDepRef(0)] }, + }, + { + jobType: 'step_c', + payload: {}, + dependsOn: { jobIds: [batchDepRef(0), batchDepRef(1)] }, + }, +]); +``` + +This enqueues **three jobs in one round-trip**. Array indices are **0-based**: `batchDepRef(0)` means “the job at index 0 in this same array,” and `batchDepRef(1)` means “the job at index 1.” After insert, those placeholders become real ids. + +- **`step_a`** (index `0`) has no prerequisites; it can run as soon as a worker picks it up. +- **`step_b`** waits for **`step_a`** only (`dependsOn: { jobIds: [batchDepRef(0)] }`). +- **`step_c`** waits for **both** earlier jobs (`batchDepRef(0)` and `batchDepRef(1)`), so it runs only after `step_a` and `step_b` have reached `completed`. + +`[idA, idB, idC]` are the final database ids in the same order as the input array — the same ids that were written into each row’s dependency list when placeholders were resolved. + +> **Note:** `batchDepRef` is exported from `@nicnocquee/dataqueue`. Re-export it from your + queue module if you prefer a single import path. + +## Persisted fields on `JobRecord` + +Prerequisites are stored on the row as [`dependsOnJobIds`](/api/job-record) and [`dependsOnTags`](/api/job-record) for inspection and debugging. + +## See also + +- [`JobOptions`](/api/job-options) — full `dependsOn` / `JobDependsOn` shape +- [`JobRecord`](/api/job-record) — persisted prerequisite columns +- [Add job](/usage/add-job) — `addJob` / `addJobs` batch behavior + +--- + # Job Events Slug: usage/job-events diff --git a/apps/docs/public/llms.txt b/apps/docs/public/llms.txt index d4abc98..ef1f2a4 100644 --- a/apps/docs/public/llms.txt +++ b/apps/docs/public/llms.txt @@ -18,6 +18,7 @@ - [Job Handlers](https://docs.dataqueue.dev/usage/job-handlers): Define typed handlers with PayloadMap - [Initialize Queue](https://docs.dataqueue.dev/usage/init-queue): PostgreSQL and Redis configuration - [Add Job](https://docs.dataqueue.dev/usage/add-job): Enqueue jobs with priority, scheduling, tags, idempotency +- [Job Dependencies](https://docs.dataqueue.dev/usage/job-dependencies): Prerequisites via `dependsOn.jobIds` or tag drain (`dependsOn.tags`); `batchDepRef` for same-batch `addJobs` - [Process Jobs](https://docs.dataqueue.dev/usage/process-jobs): Serverless batch processing with cron - [Long-Running Server](https://docs.dataqueue.dev/usage/long-running-server): Background processor, supervisor (auto-reclaim, auto-cleanup), and graceful shutdown - [Failed Jobs](https://docs.dataqueue.dev/usage/failed-jobs): Retry behavior and error history diff --git a/apps/website/app/page.tsx b/apps/website/app/page.tsx index a3b54d3..f00632f 100644 --- a/apps/website/app/page.tsx +++ b/apps/website/app/page.tsx @@ -21,6 +21,7 @@ import { Minus, CalendarClock, Sparkles, + GitBranch, } from 'lucide-react'; import { Button } from '@/components/ui/button'; import { Card, CardContent } from '@/components/ui/card'; @@ -137,8 +138,8 @@ const comparisonRows: ComparisonRow[] = [ trigger: false, }, { - feature: 'Job Flows / DAGs', - dataqueue: false, + feature: 'Job dependencies / DAGs', + dataqueue: 'dependsOn (job ids, tags, batch)', pgboss: false, bullmq: 'Parent-child flows', trigger: 'Workflows', @@ -332,6 +333,12 @@ queue.process('email', async (job) => { description: 'Pause and resume job execution with time-based delays or external signals. Build multi-step workflows like onboarding sequences and approval flows as a single handler.', }, + { + icon: GitBranch, + title: 'Job dependencies', + description: + 'Defer jobs with dependsOn: prerequisite job ids, tag-drain barriers, or batch references via batchDepRef in addJobs. Invalid ids, self-deps, and cycles are rejected at enqueue time.', + }, { icon: Atom, title: 'React Hooks', @@ -410,6 +417,12 @@ queue.process('email', async (job) => { > Features + + Dependencies + ms)` — reactive; return ms to extend, or nothing to let timeout proceed. - `forceKillOnTimeout: true` — terminates handler via Worker Thread. Requires Node.js, serializable handler, and disables `ctx.run`/waits/`prolong`/`onTimeout`. +## Job Dependencies + +Defer enqueue eligibility with `dependsOn` on `addJob` / `addJobs`. PostgreSQL and Redis both support this. If both `jobIds` and `tags` are set, **all** conditions must pass (logical AND). + +### `dependsOn.jobIds` + +The job stays **pending** until **every** listed prerequisite has status `completed`. Enqueue fails if any id is missing, if a job depends on itself, or if the graph would contain a cycle. If a prerequisite ends `failed` or `cancelled`, dependent pending jobs are **cancelled** (transitively). + +Single `addJob` calls must use **positive** database ids only. + +### `dependsOn.tags` (tag drain) + +The job stays **pending** while **another** job (not itself) is **active** (`pending`, `processing`, or `waiting`) and that job’s `tags` are a **superset** of every tag in `dependsOn.tags`. When no such blocker exists, the job becomes eligible. Matching jobs that fail or cancel also cancel dependents waiting on those tags. + +### Same-batch `addJobs` — `batchDepRef` + +Use `batchDepRef(batchIndex)` from `@nicnocquee/dataqueue` to reference the job at `batchIndex` in the **same** `addJobs` array (negative placeholders resolved after insert). Hard-coding negative ids is discouraged. + +```typescript +import { batchDepRef } from '@nicnocquee/dataqueue'; + +await queue.addJobs([ + { jobType: 'a', payload: {} }, + { jobType: 'b', payload: {}, dependsOn: { jobIds: [batchDepRef(0)] } }, +]); +``` + ## Tags and Filtering ```typescript diff --git a/packages/dataqueue/ai/rules/basic.md b/packages/dataqueue/ai/rules/basic.md index 7b56f56..c822166 100644 --- a/packages/dataqueue/ai/rules/basic.md +++ b/packages/dataqueue/ai/rules/basic.md @@ -86,7 +86,7 @@ const ids = await queue.addJobs([ // ids[i] corresponds to the i-th input job ``` -Both support `idempotencyKey`, `priority`, `runAt`, `tags`, optional `group: { id, tier? }`, and `{ db }` for transactional inserts (PostgreSQL only). +Both support `idempotencyKey`, `priority`, `runAt`, `tags`, optional `group: { id, tier? }`, optional `dependsOn` for prerequisite jobs or tag-drain barriers, and `{ db }` for transactional inserts (PostgreSQL only). ## Handlers diff --git a/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md b/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md index 1230595..50387a4 100644 --- a/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md +++ b/packages/dataqueue/ai/skills/dataqueue-advanced/SKILL.md @@ -1,10 +1,49 @@ --- name: dataqueue-advanced -description: Advanced DataQueue patterns — step memoization, waits, tokens, cron, timeouts, tags, idempotency. +description: Advanced DataQueue patterns — job dependencies, step memoization, waits, tokens, cron, timeouts, tags, idempotency. --- # DataQueue Advanced Patterns +## Job Dependencies + +Use `dependsOn` on `addJob` or `addJobs` so a job stays **pending** until prerequisites are satisfied (PostgreSQL and Redis). Combining `jobIds` and `tags` requires **both** to be clear (logical AND). + +### Prerequisites by job id (`dependsOn.jobIds`) + +The job runs only after **every** listed job has reached `completed`. DataQueue validates ids, rejects self-dependencies and cycles, and cancels dependents (transitively) if a prerequisite ends `failed` or `cancelled`. + +- **`addJob`**: use only **positive** existing job ids. +- **`addJobs`**: import `batchDepRef` from `@nicnocquee/dataqueue` to point at another entry in the **same** batch — e.g. `dependsOn: { jobIds: [batchDepRef(0)] }` waits for the job at index `0`. + +```typescript +import { batchDepRef } from '@nicnocquee/dataqueue'; + +const [idA, idB] = await queue.addJobs([ + { jobType: 'ingest', payload: { fileId: '1' } }, + { + jobType: 'transform', + payload: { fileId: '1' }, + dependsOn: { jobIds: [batchDepRef(0)] }, + }, +]); +``` + +### Tag drain (`dependsOn.tags`) + +Wait until there is **no** other active job (`pending`, `processing`, or `waiting`) whose `tags` are a **superset** of every tag in `dependsOn.tags`. Use for “wave” or tenant barriers. If a matching job fails or is cancelled, dependent jobs waiting on those tags are cancelled (transitively). + +```typescript +await queue.addJob({ + jobType: 'finalize_wave', + payload: { wave: 2 }, + tags: ['wave:2'], + dependsOn: { tags: ['wave:1'] }, +}); +``` + +Persisted fields on `JobRecord`: `dependsOnJobIds`, `dependsOnTags`. + ## Step Memoization with ctx.run() Wrap side-effectful work in `ctx.run(stepName, fn)`. Results are cached in the database — when the handler re-runs after a wait, completed steps replay from cache without re-executing. diff --git a/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md b/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md index 0e86965..0500438 100644 --- a/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md +++ b/packages/dataqueue/ai/skills/dataqueue-core/SKILL.md @@ -143,6 +143,15 @@ const jobIds = await queue.addJobs([ Each job can independently have its own `idempotencyKey`, `priority`, `runAt`, `tags`, etc. The `{ db }` transactional option is also supported (PostgreSQL only). +### Job dependencies + +Optional `dependsOn` defers a job until prerequisites are satisfied: + +- `dependsOn.jobIds` — wait until every listed job is `completed` (ids must exist; cycles and self-deps are rejected). +- `dependsOn.tags` — tag-drain: wait while another active job’s tags are a superset of every listed tag. + +For multiple jobs in one `addJobs` call, import `batchDepRef` and pass `batchDepRef(0)`, `batchDepRef(1)`, etc., to depend on earlier entries in the same array. See the **dataqueue-advanced** skill for failure/cancellation propagation and full semantics. + ### Transactional Job Creation (PostgreSQL only) Pass an external `pg.PoolClient` inside a transaction via `{ db: client }`: From e4ae1d54742fc253488412af6323cfd4f9371111 Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Sun, 22 Mar 2026 13:47:19 +0100 Subject: [PATCH 3/4] Add batch job processing and dependency handling - Introduced a new endpoint for batch job insertion at `/api/jobs/batch`, allowing multiple jobs to be added simultaneously with support for job dependencies using `dependsOn.jobIds` and `dependsOn.tags`. - Updated the existing job creation endpoint to include the `dependsOn` field in the job options. - Enhanced helper functions to support batch job addition and job dependency management. - Added comprehensive end-to-end tests to validate the functionality of job dependencies and batch processing. These changes significantly enhance the job management capabilities of DataQueue, enabling more complex workflows and efficient job handling. --- apps/e2e/app/api/jobs/batch/route.ts | 48 +++++++++ apps/e2e/app/api/jobs/route.ts | 3 +- apps/e2e/e2e/helpers.ts | 32 ++++++ apps/e2e/e2e/job-dependencies.spec.ts | 100 ++++++++++++++++++ .../dataqueue/src/backends/redis-scripts.ts | 7 +- 5 files changed, 187 insertions(+), 3 deletions(-) create mode 100644 apps/e2e/app/api/jobs/batch/route.ts create mode 100644 apps/e2e/e2e/job-dependencies.spec.ts diff --git a/apps/e2e/app/api/jobs/batch/route.ts b/apps/e2e/app/api/jobs/batch/route.ts new file mode 100644 index 0000000..1ce2ab8 --- /dev/null +++ b/apps/e2e/app/api/jobs/batch/route.ts @@ -0,0 +1,48 @@ +import { NextRequest, NextResponse } from 'next/server'; +import { getJobQueue } from '@/lib/queue'; + +/** + * POST /api/jobs/batch — insert multiple jobs (supports batch-relative dependsOn.jobIds). + * Body: { jobs: JobOptions[] } — same fields as POST /api/jobs per item. + */ +export async function POST(request: NextRequest) { + try { + const body = await request.json(); + const jobs = body.jobs as Array<{ + jobType: string; + payload: unknown; + maxAttempts?: number; + priority?: number; + runAt?: string; + timeoutMs?: number; + forceKillOnTimeout?: boolean; + tags?: string[]; + idempotencyKey?: string; + dependsOn?: { jobIds?: number[]; tags?: string[] }; + }>; + if (!Array.isArray(jobs)) { + return NextResponse.json( + { error: 'Expected body.jobs to be an array' }, + { status: 400 }, + ); + } + const queue = getJobQueue(); + const ids = await queue.addJobs( + jobs.map((j) => ({ + jobType: j.jobType, + payload: j.payload, + maxAttempts: j.maxAttempts, + priority: j.priority, + runAt: j.runAt ? new Date(j.runAt) : undefined, + timeoutMs: j.timeoutMs, + forceKillOnTimeout: j.forceKillOnTimeout, + tags: j.tags, + idempotencyKey: j.idempotencyKey, + dependsOn: j.dependsOn, + })), + ); + return NextResponse.json({ ids }); + } catch (error) { + return NextResponse.json({ error: String(error) }, { status: 500 }); + } +} diff --git a/apps/e2e/app/api/jobs/route.ts b/apps/e2e/app/api/jobs/route.ts index 4550e4b..caf17a8 100644 --- a/apps/e2e/app/api/jobs/route.ts +++ b/apps/e2e/app/api/jobs/route.ts @@ -4,7 +4,7 @@ import { JobStatus } from '@nicnocquee/dataqueue'; /** * POST /api/jobs - Add a new job - * Body: { jobType, payload, maxAttempts?, priority?, runAt?, timeoutMs?, forceKillOnTimeout?, tags?, idempotencyKey? } + * Body: { jobType, payload, maxAttempts?, priority?, runAt?, timeoutMs?, forceKillOnTimeout?, tags?, idempotencyKey?, dependsOn? } */ export async function POST(request: NextRequest) { try { @@ -20,6 +20,7 @@ export async function POST(request: NextRequest) { forceKillOnTimeout: body.forceKillOnTimeout, tags: body.tags, idempotencyKey: body.idempotencyKey, + dependsOn: body.dependsOn, }); return NextResponse.json({ id }); } catch (error) { diff --git a/apps/e2e/e2e/helpers.ts b/apps/e2e/e2e/helpers.ts index c024b05..8d9bd00 100644 --- a/apps/e2e/e2e/helpers.ts +++ b/apps/e2e/e2e/helpers.ts @@ -2,6 +2,12 @@ import { APIRequestContext } from '@playwright/test'; const BASE = 'http://localhost:3099'; +/** + * Enqueue a single job via the e2e API. + * + * @param request - Playwright API request context. + * @param options - Job fields; `dependsOn` uses real job ids (negative batch refs are not valid here). + */ export async function addJob( request: APIRequestContext, options: { @@ -14,12 +20,38 @@ export async function addJob( forceKillOnTimeout?: boolean; tags?: string[]; idempotencyKey?: string; + dependsOn?: { jobIds?: number[]; tags?: string[] }; }, ) { const res = await request.post(`${BASE}/api/jobs`, { data: options }); return res.json() as Promise<{ id: number }>; } +/** + * Batch-insert jobs (supports `dependsOn.jobIds` batch placeholders: -1 = first job in the batch, etc.). + * + * @param request - Playwright API request context. + * @param jobs - Same shape as {@link addJob} per entry. + */ +export async function addJobsBatch( + request: APIRequestContext, + jobs: Array<{ + jobType: string; + payload: Record; + maxAttempts?: number; + priority?: number; + runAt?: string; + timeoutMs?: number; + forceKillOnTimeout?: boolean; + tags?: string[]; + idempotencyKey?: string; + dependsOn?: { jobIds?: number[]; tags?: string[] }; + }>, +) { + const res = await request.post(`${BASE}/api/jobs/batch`, { data: { jobs } }); + return res.json() as Promise<{ ids: number[] }>; +} + export async function getJob(request: APIRequestContext, id: number) { const res = await request.get(`${BASE}/api/jobs/${id}`); return res.json() as Promise<{ job: Record }>; diff --git a/apps/e2e/e2e/job-dependencies.spec.ts b/apps/e2e/e2e/job-dependencies.spec.ts new file mode 100644 index 0000000..6017501 --- /dev/null +++ b/apps/e2e/e2e/job-dependencies.spec.ts @@ -0,0 +1,100 @@ +import { test, expect } from '@playwright/test'; +import { addJob, addJobsBatch, getJob, processJobs } from './helpers'; + +test.describe('Job dependencies', () => { + test('dependent with dependsOn.jobIds runs only after prerequisite completes', async ({ + request, + }) => { + const { id: prereqId } = await addJob(request, { + jobType: 'fast-job', + payload: { value: 'dep-prereq' }, + }); + const { id: depId } = await addJob(request, { + jobType: 'fast-job', + payload: { value: 'dep-follow' }, + dependsOn: { jobIds: [prereqId] }, + }); + + await processJobs(request); + const { job: afterFirst } = await getJob(request, depId); + expect(afterFirst.status).toBe('pending'); + + const { job: prereqAfter } = await getJob(request, prereqId); + expect(prereqAfter.status).toBe('completed'); + + await processJobs(request); + const { job: depDone } = await getJob(request, depId); + expect(depDone.status).toBe('completed'); + }); + + test('dependsOn.tags: dependent waits until tagged barrier job finishes', async ({ + request, + }) => { + await addJob(request, { + jobType: 'slow-job', + payload: { value: 'barrier', delayMs: 250 }, + tags: ['e2e-barrier'], + }); + const { id: depId } = await addJob(request, { + jobType: 'fast-job', + payload: { value: 'after-barrier' }, + dependsOn: { tags: ['e2e-barrier'] }, + }); + + await processJobs(request); + const { job: depAfterBarrier } = await getJob(request, depId); + expect(depAfterBarrier.status).toBe('pending'); + + await processJobs(request); + const { job: depDone } = await getJob(request, depId); + expect(depDone.status).toBe('completed'); + }); + + test('addJobs batch resolves batch-relative dependsOn.jobIds', async ({ + request, + }) => { + const { ids } = await addJobsBatch(request, [ + { jobType: 'fast-job', payload: { value: 'batch-a' } }, + { + jobType: 'fast-job', + payload: { value: 'batch-b' }, + dependsOn: { jobIds: [-1] }, + }, + ]); + expect(ids).toHaveLength(2); + + const { job: first } = await getJob(request, ids[0]!); + const { job: second } = await getJob(request, ids[1]!); + expect(second.dependsOnJobIds).toEqual([first.id]); + + await processJobs(request); + const { job: aAfter } = await getJob(request, ids[0]!); + const { job: bAfter } = await getJob(request, ids[1]!); + expect(aAfter.status).toBe('completed'); + expect(bAfter.status).toBe('pending'); + + await processJobs(request); + const { job: bDone } = await getJob(request, ids[1]!); + expect(bDone.status).toBe('completed'); + }); + + test('failed prerequisite cancels pending dependent', async ({ request }) => { + const { id: prereqId } = await addJob(request, { + jobType: 'failing-job', + payload: { value: 'dep-root-fail', shouldFail: true }, + maxAttempts: 1, + }); + const { id: depId } = await addJob(request, { + jobType: 'fast-job', + payload: { value: 'dep-cancelled' }, + dependsOn: { jobIds: [prereqId] }, + }); + + await processJobs(request); + + const { job: prereq } = await getJob(request, prereqId); + const { job: dep } = await getJob(request, depId); + expect(prereq.status).toBe('failed'); + expect(dep.status).toBe('cancelled'); + }); +}); diff --git a/packages/dataqueue/src/backends/redis-scripts.ts b/packages/dataqueue/src/backends/redis-scripts.ts index f2aeffe..115d422 100644 --- a/packages/dataqueue/src/backends/redis-scripts.ts +++ b/packages/dataqueue/src/backends/redis-scripts.ts @@ -167,6 +167,9 @@ return id * runAtMs, timeoutMs, forceKillOnTimeout, tags (JSON or "null"), * idempotencyKey, retryDelay, retryBackoff, retryDelayMax, deadLetterJobType * Returns: array of job IDs (one per input job, in order) + * + * Note: JSON null decodes to cjson.null, which is truthy in Lua; optional fields must + * be normalized with explicit nil/cjson.null checks before tostring/cjson.decode. */ export const ADD_JOBS_SCRIPT = ` local prefix = KEYS[1] @@ -192,8 +195,8 @@ for i, job in ipairs(jobs) do local deadLetterJobType = tostring(job.deadLetterJobType) local groupId = tostring(job.groupId) local groupTier = tostring(job.groupTier) - local dependsOnJobIdsJson = job.dependsOnJobIds and tostring(job.dependsOnJobIds) or "null" - local dependsOnTagsJson = job.dependsOnTags and tostring(job.dependsOnTags) or "null" + local dependsOnJobIdsJson = (job.dependsOnJobIds ~= nil and job.dependsOnJobIds ~= cjson.null) and tostring(job.dependsOnJobIds) or "null" + local dependsOnTagsJson = (job.dependsOnTags ~= nil and job.dependsOnTags ~= cjson.null) and tostring(job.dependsOnTags) or "null" -- Idempotency check local skip = false From d92eb1b9e365ab68e6197d95035ecda3296fe900 Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Sun, 22 Mar 2026 13:51:28 +0100 Subject: [PATCH 4/4] Refactor batch job addition to use mapped job options - Updated the batch job processing endpoint to utilize a mapped array of job options, enhancing type safety with `JobOptions` and `JobType`. - Improved code readability by separating the mapping logic from the job addition call. - Ensured compatibility with the existing job dependency features introduced in previous commits. These changes streamline the job addition process and maintain consistency with the recent enhancements in job management capabilities. --- apps/e2e/app/api/jobs/batch/route.ts | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/apps/e2e/app/api/jobs/batch/route.ts b/apps/e2e/app/api/jobs/batch/route.ts index 1ce2ab8..761b43d 100644 --- a/apps/e2e/app/api/jobs/batch/route.ts +++ b/apps/e2e/app/api/jobs/batch/route.ts @@ -1,5 +1,6 @@ import { NextRequest, NextResponse } from 'next/server'; -import { getJobQueue } from '@/lib/queue'; +import type { JobOptions, JobType } from '@nicnocquee/dataqueue'; +import { getJobQueue, type TestPayloadMap } from '@/lib/queue'; /** * POST /api/jobs/batch — insert multiple jobs (supports batch-relative dependsOn.jobIds). @@ -27,19 +28,20 @@ export async function POST(request: NextRequest) { ); } const queue = getJobQueue(); + const mapped = jobs.map((j) => ({ + jobType: j.jobType, + payload: j.payload, + maxAttempts: j.maxAttempts, + priority: j.priority, + runAt: j.runAt ? new Date(j.runAt) : undefined, + timeoutMs: j.timeoutMs, + forceKillOnTimeout: j.forceKillOnTimeout, + tags: j.tags, + idempotencyKey: j.idempotencyKey, + dependsOn: j.dependsOn, + })); const ids = await queue.addJobs( - jobs.map((j) => ({ - jobType: j.jobType, - payload: j.payload, - maxAttempts: j.maxAttempts, - priority: j.priority, - runAt: j.runAt ? new Date(j.runAt) : undefined, - timeoutMs: j.timeoutMs, - forceKillOnTimeout: j.forceKillOnTimeout, - tags: j.tags, - idempotencyKey: j.idempotencyKey, - dependsOn: j.dependsOn, - })), + mapped as JobOptions>[], ); return NextResponse.json({ ids }); } catch (error) {