Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/demo/app/features/add-jobs/add-job-form.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const defaultPayloads: Record<string, string> = {
null,
2,
),
dep_demo: JSON.stringify({ label: 'Example dependency demo step' }, null, 2),
};

export function AddJobForm() {
Expand Down Expand Up @@ -124,6 +125,7 @@ export function AddJobForm() {
<SelectItem value="approval_request">
approval_request
</SelectItem>
<SelectItem value="dep_demo">dep_demo</SelectItem>
</SelectContent>
</Select>
</div>
Expand Down
190 changes: 190 additions & 0 deletions apps/demo/app/features/dependencies/dependency-demo.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
'use client';

import { useState, useTransition } from 'react';
import { Button } from '@/components/ui/button';
import {
Card,
CardContent,
CardDescription,
CardHeader,
CardTitle,
} from '@/components/ui/card';
import { addGenericJob } from '@/app/jobs/add-job';
import { processJobs } from '@/app/jobs/process-jobs';
import { Loader2 } from 'lucide-react';

/**
* Interactive demos for `dependsOn.jobIds` (linear prerequisites) and
* `dependsOn.tags` (tag drain: wait until no other active job is tagged as a superset).
*/
export function DependencyDemo() {
const [isPending, startTransition] = useTransition();
const [log, setLog] = useState<string | null>(null);

const runChain = () => {
startTransition(async () => {
const a = await addGenericJob({
jobType: 'dep_demo',
payload: { label: 'chain — step A' },
tags: ['demo-deps', 'chain'],
});
const b = await addGenericJob({
jobType: 'dep_demo',
payload: { label: 'chain — step B' },
tags: ['demo-deps', 'chain'],
dependsOn: { jobIds: [a.job] },
});
const c = await addGenericJob({
jobType: 'dep_demo',
payload: { label: 'chain — step C' },
tags: ['demo-deps', 'chain'],
dependsOn: { jobIds: [b.job] },
});
setLog(
`Enqueued linear chain: A=#${a.job} → B=#${b.job} (waits on A) → C=#${c.job} (waits on B). Run the processor to watch order.`,
);
});
};

const runTagBarrier = () => {
startTransition(async () => {
const wave = `wave-${Date.now()}`;
const j1 = await addGenericJob({
jobType: 'dep_demo',
payload: { label: 'parallel slot 1' },
tags: ['demo-deps', wave, 'slot-1'],
});
const j2 = await addGenericJob({
jobType: 'dep_demo',
payload: { label: 'parallel slot 2' },
tags: ['demo-deps', wave, 'slot-2'],
});
const barrier = await addGenericJob({
jobType: 'dep_demo',
payload: { label: 'after wave (tag drain)' },
tags: ['demo-deps', wave, 'barrier'],
dependsOn: { tags: [wave] },
});
setLog(
`Tag “${wave}”: parallel jobs #${j1.job} and #${j2.job} must finish (or leave the active set) before #${barrier.job} runs — barrier waits while any job still has tag [${wave}].`,
);
});
};

const runFailureCascade = () => {
startTransition(async () => {
const bad = await addGenericJob({
jobType: 'dep_demo',
payload: { label: 'fails on purpose', fail: true },
tags: ['demo-deps', 'fail-cascade'],
maxAttempts: 1,
});
const dep = await addGenericJob({
jobType: 'dep_demo',
payload: { label: 'cancelled when prerequisite fails' },
tags: ['demo-deps', 'fail-cascade'],
dependsOn: { jobIds: [bad.job] },
});
setLog(
`Enqueued prerequisite #${bad.job} (will fail once) and dependent #${dep.job}. After processing, #${dep.job} should end cancelled.`,
);
});
};

const triggerProcessor = () => {
startTransition(async () => {
await processJobs();
setLog((prev) =>
prev
? `${prev}\nProcessor tick complete — refresh the table below.`
: 'Processor tick complete — refresh the table below.',
);
});
};

return (
<div className="space-y-6">
<Card>
<CardHeader>
<CardTitle>Linear chain (`dependsOn.jobIds`)</CardTitle>
<CardDescription>
B waits until job A is completed; C waits until B is completed.
Prerequisite failures or cancellations propagate to dependents.
</CardDescription>
</CardHeader>
<CardContent className="flex flex-wrap gap-2">
<Button onClick={runChain} disabled={isPending}>
{isPending ? <Loader2 className="h-4 w-4 animate-spin" /> : null}
Enqueue A → B → C
</Button>
</CardContent>
</Card>

<Card>
<CardHeader>
<CardTitle>Tag drain (`dependsOn.tags`)</CardTitle>
<CardDescription>
The barrier job waits until no <em>active</em> job (pending,
processing, or waiting) still includes all of those tags. Parallel
work can run; the barrier runs after the wave clears.
</CardDescription>
</CardHeader>
<CardContent className="flex flex-wrap gap-2">
<Button
variant="secondary"
onClick={runTagBarrier}
disabled={isPending}
>
{isPending ? <Loader2 className="h-4 w-4 animate-spin" /> : null}
Enqueue parallel wave + barrier
</Button>
</CardContent>
</Card>

<Card>
<CardHeader>
<CardTitle>Failure cascade</CardTitle>
<CardDescription>
A job that fails (here: single attempt, intentional throw) cancels
jobs that depend on it by job id.
</CardDescription>
</CardHeader>
<CardContent className="flex flex-wrap gap-2">
<Button
variant="destructive"
onClick={runFailureCascade}
disabled={isPending}
>
{isPending ? <Loader2 className="h-4 w-4 animate-spin" /> : null}
Enqueue failing job + dependent
</Button>
</CardContent>
</Card>

<Card>
<CardHeader>
<CardTitle>Run the processor</CardTitle>
<CardDescription>
Same as other demos: jobs move when the processor runs (manual
button here, or Auto Processor on the home page).
</CardDescription>
</CardHeader>
<CardContent>
<Button
variant="outline"
onClick={triggerProcessor}
disabled={isPending}
>
Process jobs now
</Button>
</CardContent>
</Card>

{log && (
<p className="text-sm rounded-md border bg-muted/40 px-3 py-2 whitespace-pre-wrap">
{log}
</p>
)}
</div>
);
}
34 changes: 34 additions & 0 deletions apps/demo/app/features/dependencies/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
export const dynamic = 'force-dynamic';

import { FeaturePage } from '@/components/feature-page';
import { JobMonitor } from '@/components/job-monitor';
import { DependencyDemo } from './dependency-demo';
import { RefreshPeriodically } from '@/app/refresh-periodically';
import { refresh } from '@/app/queue/refresh';

export default function DependenciesPage() {
return (
<FeaturePage
title="Job dependencies"
description="Optional dependsOn when enqueueing: wait on specific job ids (completed prerequisites) and/or tag-drain barriers. Failures and cancellations on prerequisites cascade to dependents."
docsLinks={[
{
label: 'Add job',
url: 'https://docs.dataqueue.dev/usage/add-job',
},
]}
>
<RefreshPeriodically action={refresh} interval={5000} />
<div className="space-y-6">
<DependencyDemo />
<JobMonitor
title="Jobs from this demo"
description="Filter: dep_demo. Dependency columns show persisted prerequisites."
filter={{ jobType: 'dep_demo' }}
compact
showDependencyColumns
/>
</div>
</FeaturePage>
);
}
20 changes: 20 additions & 0 deletions apps/demo/app/jobs/add-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

import { getJobQueue, type JobPayloadMap } from '@/lib/queue';
import { revalidatePath } from 'next/cache';
import type { JobDependsOn } from '@nicnocquee/dataqueue';

/**
* Enqueues a typed job. Optionally attaches prerequisites via {@link JobDependsOn}.
*
* @param params - Job type, payload, and optional scheduling / dependency fields.
* @returns The new job id (numeric) as `job`.
*/
export const addGenericJob = async ({
jobType,
payload,
Expand All @@ -13,6 +20,7 @@ export const addGenericJob = async ({
timeoutMs,
forceKillOnTimeout,
maxAttempts,
dependsOn,
}: {
jobType: keyof JobPayloadMap;
payload: JobPayloadMap[keyof JobPayloadMap];
Expand All @@ -23,12 +31,22 @@ export const addGenericJob = async ({
timeoutMs?: number;
forceKillOnTimeout?: boolean;
maxAttempts?: number;
dependsOn?: JobDependsOn;
}) => {
const jobQueue = getJobQueue();
const runAt = runAtDelay
? new Date(Date.now() + runAtDelay * 1000)
: undefined;

const normalizedDependsOn: JobDependsOn | undefined =
dependsOn &&
((dependsOn.jobIds?.length ?? 0) > 0 || (dependsOn.tags?.length ?? 0) > 0)
? {
...(dependsOn.jobIds?.length ? { jobIds: dependsOn.jobIds } : {}),
...(dependsOn.tags?.length ? { tags: dependsOn.tags } : {}),
}
: undefined;

const job = await jobQueue.addJob({
jobType,
payload: payload as never,
Expand All @@ -39,8 +57,10 @@ export const addGenericJob = async ({
timeoutMs: timeoutMs ?? undefined,
forceKillOnTimeout: forceKillOnTimeout ?? undefined,
maxAttempts: maxAttempts ?? undefined,
dependsOn: normalizedDependsOn,
});

revalidatePath('/');
revalidatePath('/features/dependencies');
return { job };
};
9 changes: 9 additions & 0 deletions apps/demo/app/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
Monitor,
ExternalLink,
CalendarClock,
GitBranch,
} from 'lucide-react';
import Link from 'next/link';

Expand All @@ -43,6 +44,14 @@ const features = [
icon: Tags,
docsUrl: 'https://docs.dataqueue.dev/usage/get-jobs',
},
{
title: 'Job dependencies',
description:
'dependsOn job ids and tag-drain barriers; cascade on fail or cancel',
href: '/features/dependencies',
icon: GitBranch,
docsUrl: 'https://docs.dataqueue.dev/usage/add-job',
},
{
title: 'Job Management',
description: 'Retry, cancel, and edit individual jobs',
Expand Down
6 changes: 6 additions & 0 deletions apps/demo/components/app-sidebar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
Monitor,
LayoutDashboard,
CalendarClock,
GitBranch,
} from 'lucide-react';
import Link from 'next/link';
import { usePathname } from 'next/navigation';
Expand All @@ -33,6 +34,11 @@ const featurePages = [
{ title: 'Overview', href: '/', icon: Home },
{ title: 'Add & Process Jobs', href: '/features/add-jobs', icon: Plus },
{ title: 'Tags & Filtering', href: '/features/tags', icon: Tags },
{
title: 'Job dependencies',
href: '/features/dependencies',
icon: GitBranch,
},
{ title: 'Job Management', href: '/features/management', icon: Settings },
{ title: 'Idempotency', href: '/features/idempotency', icon: Key },
{ title: 'Timeouts', href: '/features/timeouts', icon: Timer },
Expand Down
Loading
Loading