Understanding how to properly use bunqueue #12
-
Replies: 4 comments
-
I found more info about the jobs, but I still have no idea why it's not running.
-
Hey Arthur, thanks for the detailed writeup! I can see a few things going on here, let me walk through them.

The main issue: mode mismatch

You're running bunqueue as a systemd service (TCP server mode), which is correct. But the worker setup in your app seems to follow the embedded-mode examples, so that worker never talks to the server that actually holds your jobs. The same applies to events: completed/failed listeners only fire on a Worker that is connected to the server processing the jobs, which is why you see nothing.

How to fix it

Since you already have the bunqueue server running via systemd, your app should use the regular Queue and Worker with a connection config:

import { Queue, Worker } from 'bunqueue/client';
// Both connect to your bunqueue systemd server
const queue = new Queue('ftp-pipeline', {
connection: { host: 'localhost', port: 6789 }
});
const worker = new Worker('ftp-pipeline', async (job) => {
console.log(`Processing: ${job.name}`, job.data);
await job.updateProgress(10, 'Starting...');
// your logic here
const result = await doWork(job.data);
await job.updateProgress(100, 'Done');
return result;
}, {
connection: { host: 'localhost', port: 6789 },
concurrency: 1,
});
// Now events work
worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed`, result);
});
worker.on('failed', (job, error) => {
console.error(`Job ${job.id} failed`, error.message);
});

The key rule is: if the bunqueue server runs as its own process, every Queue and Worker in your app needs a connection config pointing at it.

About your large data (200-400MB)

This is important: do not put 200-400MB of raw data inside the job payload. Job data gets serialized via msgpack and stored in SQLite, so large payloads will cause memory spikes, slow serialization, and database bloat. Instead, store only references in the job data (FTP paths, file IDs, database query parameters) and let the worker fetch the actual data during processing:

// Good: small job payload with references
await queue.add('fetch-ftp', {
ftpHost: 'ftp.example.com',
remotePath: '/data/export-2026-03-01.csv',
localTempPath: '/tmp/ftp-downloads/'
});
// Inside the worker, download and process directly
const worker = new Worker('ftp-pipeline', async (job) => {
if (job.name === 'fetch-ftp') {
const data = await downloadFromFTP(job.data.ftpHost, job.data.remotePath);
await saveToTempFile(job.data.localTempPath, data);
return { filePath: job.data.localTempPath, recordCount: data.length };
}
// ...
});

For your pipeline: FlowProducer

For the multi-step workflow (fetch from FTP, filter, create/update, cleanup old data), you can use a FlowProducer chain so each step only runs after the previous one completes:

import { FlowProducer, Worker } from 'bunqueue/client';
const flow = new FlowProducer({
connection: { host: 'localhost', port: 6789 }
});
// Create the pipeline: each step waits for the previous one to complete
const { jobIds } = await flow.addChain([
{
name: 'fetch-ftp',
queueName: 'ftp-pipeline',
data: { ftpHost: 'ftp.example.com', remotePath: '/data/export.csv' }
},
{
name: 'filter-data',
queueName: 'ftp-pipeline',
data: { criteria: 'active-only' }
},
{
name: 'create-update-records',
queueName: 'ftp-pipeline',
data: { table: 'products' }
},
{
name: 'cleanup-old',
queueName: 'ftp-pipeline',
data: { olderThanDays: 30 }
},
]);
console.log('Pipeline started, job IDs:', jobIds);

Then a single worker handles all steps based on the job name:

const worker = new Worker('ftp-pipeline', async (job) => {
switch (job.name) {
case 'fetch-ftp': {
await job.updateProgress(10, 'Connecting to FTP...');
const records = await fetchFromFTP(job.data);
await saveToDisk('/tmp/pipeline/raw.json', records);
await job.updateProgress(100, 'Downloaded');
return { recordCount: records.length, path: '/tmp/pipeline/raw.json' };
}
case 'filter-data': {
const raw = await readFromDisk('/tmp/pipeline/raw.json');
const filtered = raw.filter(r => r.status === 'active');
await saveToDisk('/tmp/pipeline/filtered.json', filtered);
return { filteredCount: filtered.length };
}
case 'create-update-records': {
const filtered = await readFromDisk('/tmp/pipeline/filtered.json');
const result = await upsertRecords(filtered);
return { created: result.created, updated: result.updated };
}
case 'cleanup-old': {
const deleted = await removeOldRecords(job.data.olderThanDays);
return { deleted };
}
}
}, {
connection: { host: 'localhost', port: 6789 },
concurrency: 1,
});

Each step automatically waits for the previous one to complete before starting. The execution order is guaranteed: fetch → filter → create/update → cleanup.

Your systemd setup

Your systemd configuration for the bunqueue server looks fine. Just make sure your app code (the Queue and Worker) uses the connection config pointing at that server rather than spinning up its own embedded instance; there's a rough unit sketch at the end of this reply for reference.

Quick summary

The worker file you created should become a regular Worker connected to your systemd server, job payloads should carry only references to the data (never the data itself), and a FlowProducer chain gives you the ordered pipeline. Let me know if you run into anything else!
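For reference, the server side can be as small as a unit file that keeps bunqueue start running. Everything below (paths, user, the bunx invocation) is an assumption about your environment rather than a known-good config, so adapt it to your install:

[Unit]
Description=bunqueue server (TCP mode)
After=network.target

[Service]
Type=simple
# Assumed user and binary location - point ExecStart at wherever bun/bunx lives on your machine
User=appuser
ExecStart=/usr/local/bin/bunx bunqueue start
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target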
-
Thanks for the detailed explanation. So I should switch to the regular Worker. The main reason I want to use the … But what do you think is the best approach? Should I start using the embedded worker, or keep the structure as you described it to me? I do like this FlowProducer, I will look into that. For example: fetching the data from the FTP and then using that data in the second child process. Or is it best to just write a file and read that back out?
-
Great questions!

Embedded vs TCP: what to choose

Since you already have a production API running in the same process and you want to offload heavy work, embedded mode with SandboxedWorker is actually the right choice for your use case. Each SandboxedWorker runs in an isolated Bun Worker thread with its own heap, so it won't block your API's event loop or compete for the same memory. That's exactly the separation you're looking for.

The TCP (server mode) alternative would be: run bunqueue start as a separate systemd service, then connect your app via new Worker(queue, processor, { connection: { port: 6789 } }). This gives you process-level isolation (separate PID) and lets you scale the queue independently of your API.

TL;DR: Stay with embedded + SandboxedWorker. Switch to TCP/server mode only if you need to run workers on separate machines or want independent scaling.

Passing data between chain steps

Yes! FlowProducer works in embedded mode too, and each step can read the previous step's result:

import { FlowProducer } from 'bunqueue/client';
const flow = new FlowProducer({ embedded: true });
await flow.addChain([
{ name: 'fetch-ftp', queueName: 'pipeline', data: { ftpPath: '/export.csv' } },
{ name: 'filter', queueName: 'pipeline', data: {} },
{ name: 'upsert', queueName: 'pipeline', data: {} },
]);

In your SandboxedWorker processor, each step can get the previous result:

// In the processor file
import { FlowProducer } from 'bunqueue/client';

// Assumption: the processor creates its own embedded FlowProducer handle so it can
// call getParentResult below - double-check against the docs in case bunqueue
// exposes the parent result on the job instead.
const flow = new FlowProducer({ embedded: true });

export default async (job) => {
if (job.name === 'fetch-ftp') {
const records = await downloadFTP(job.data.ftpPath);
await Bun.write('/tmp/pipeline/raw.json', JSON.stringify(records));
return { path: '/tmp/pipeline/raw.json', count: records.length };
}
if (job.name === 'filter') {
// Access previous step's result via __flowParentId in job.data
const parentResult = await flow.getParentResult(job.data.__flowParentId);
// parentResult = { path: '/tmp/pipeline/raw.json', count: 1234 }
const raw = JSON.parse(await Bun.file(parentResult.path).text());
const filtered = raw.filter(r => r.active);
await Bun.write('/tmp/pipeline/filtered.json', JSON.stringify(filtered));
return { path: '/tmp/pipeline/filtered.json', count: filtered.length };
}
};

Important: large data (200-400MB)

Never pass large data through job results. Results are stored in an LRU cache (max 10,000 entries) and SQLite. Serializing 200MB+ objects will cause memory spikes and slow down the queue. The pattern you should use:

1. In the fetch step, write the raw data to a temp file on disk.
2. Return only a small reference (file path, record count) as the job result.
3. Have the next step read that file from disk, process it, and write its own temp file.
This way the job result is just a tiny object like { path, count } rather than the data itself. Let me know if you need help setting up the chain!
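And since you asked about actually starting the embedded worker: roughly, your API process would create the FlowProducer plus a SandboxedWorker pointed at the processor file above. Treat the SandboxedWorker import path, constructor shape, and options below as my assumptions (mirrored from the Worker examples earlier) and check them against the docs:

// app/queue.ts - runs inside your API process (sketch, signatures assumed)
import { FlowProducer, SandboxedWorker } from 'bunqueue/client';

// Embedded mode: no TCP server, the queue lives in this process,
// but the processor runs in its own Bun Worker thread.
export const flow = new FlowProducer({ embedded: true });

// Assumed constructor: (queue name, path to the processor file, options)
export const worker = new SandboxedWorker(
  'pipeline',
  new URL('./pipeline.processor.ts', import.meta.url),
  { embedded: true, concurrency: 1 },
);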