Fix resource exhaustion in consumer export worker (issue #249)#254
Fix resource exhaustion in consumer export worker (issue #249)#254anshul23102 wants to merge 2 commits into
Conversation
…ion streaming Issue geturbackend#249: The export worker streams entire collections without any document count cap, causing memory exhaustion and long-running operations on large collections. Changes: - Add MAX_EXPORT_ROWS constant (100000) to cap maximum documents per export - Add .limit(MAX_EXPORT_ROWS) to MongoDB cursor in both storage providers (Supabase and S3/Cloudflare R2) - Add exported document counter to track progress - Log truncation warning when export hits the limit This prevents unbounded collection iteration and protects against resource exhaustion while allowing exports of reasonably-sized datasets up to 100k documents.
📝 WalkthroughWalkthroughAdds MAX_EXPORT_ROWS (100,000) and uses a sentinel-row (.limit(MAX_EXPORT_ROWS + 1)) to detect truncation in both Supabase and S3/R2 export paths; streams up to the cap, logs truncation when hit, and passes wasTruncated/maxExportRows to the post-export email job. ChangesDocument Export Limit
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/consumer/src/workers/export.worker.js`:
- Around line 71-75: The cursor iteration currently writes chunks
unconditionally and can overflow memory; modify both loops (the one using
writeStream.write in the cursor loop and the other at 124-128 that uses
passThrough.write) to honor stream backpressure by checking the boolean return
value of write(...) and awaiting the stream's 'drain' event when write(...)
returns false (use a Promise/once wrapper to await the 'drain' event). Ensure
you await before continuing the for-await loop so the producer pauses until the
consumer can accept more data.
- Around line 78-80: The worker currently only logs when the export is truncated
(the branch checking exportedCount >= MAX_EXPORT_ROWS) so recipients get no
indication; update the export flow to propagate truncation state: when hitting
the exportedCount >= MAX_EXPORT_ROWS condition (and the similar checks at the
other locations around lines 131-133 and 162-163), set a truncation flag on the
export result object (e.g., exportResult.truncated = true) and include
human-readable metadata in the generated file (for JSON exports add a top-level
"truncated": true or append a small footer for other formats), then ensure the
email-send path (the email worker/template that consumes the exportResult) reads
that flag and renders an explicit "Export truncated at MAX_EXPORT_ROWS" note in
the message and/or attachment description so recipients are aware the download
is partial.
- Around line 67-80: The truncation check is incorrect because exportedCount ===
MAX_EXPORT_ROWS can be true even when the collection size exactly equals the
limit; change the logic in the export loop that uses
Model.find().lean().limit(MAX_EXPORT_ROWS).cursor(), exportedCount, and
writeStream so you query one extra document (limit MAX_EXPORT_ROWS + 1) or
perform an existence check for the (MAX_EXPORT_ROWS + 1)th row, only write at
most MAX_EXPORT_ROWS documents to the stream, and set the "truncated" warning
based on whether the extra document was present; apply the same fix to the other
export block referenced (lines around 120-133) that uses the same exportedCount
logic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6364bcf9-88ea-4db2-9f0b-bc5efa69dbe1
📒 Files selected for processing (1)
apps/consumer/src/workers/export.worker.js
|
Hi @yash-pouranik, just a note: I am contributing under both GSSoC and NSoC, but this PR is raised under GSSoC '26. Could you please add the appropriate GSSoC label to this PR? That helps with contribution tracking and scoring. Thank you. |
|
Can we please have a convo on discord? - https://discord.com/invite/CXJjvJkNWn |
|
please fix coderabbit's comment. |
Three issues raised in code review: 1. Sentinel row detection: the cursor now queries MAX_EXPORT_ROWS + 1 documents instead of MAX_EXPORT_ROWS. Writing stops after MAX_EXPORT_ROWS documents and wasTruncated is set to true only when the (MAX_EXPORT_ROWS+1)th document is received. This prevents a false truncation warning when the collection contains exactly MAX_EXPORT_ROWS documents. 2. Stream backpressure: all write() calls are replaced with writeChunk() which awaits the drain event when the internal buffer is full. This prevents unbounded memory growth when disk or object-storage throughput is slower than the Mongo cursor. 3. Surface truncation to recipient: wasTruncated and maxExportRows are now passed to the emailQueue payload so the email handler can inform the user that the downloaded file contains a capped subset.
|
Hi @yash-pouranik, I have addressed all three CodeRabbit comments:
Ready for re-review. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@apps/consumer/src/workers/export.worker.js`:
- Around line 38-43: The writeChunk helper can hang if the writable emits
'error' (or 'close') while awaiting the 'drain' event; update async function
writeChunk(stream, chunk) to create a promise that listens for both 'drain' and
'error' (and optionally 'close'), resolving on 'drain' and rejecting on 'error'
(and rejecting or resolving on 'close' per desired semantics), and ensure
listeners are cleaned up after either event; specifically replace the single
await new Promise(resolve => stream.once('drain', resolve)) with a promise that
registers stream.once('drain', onDrain) and stream.once('error', onError) (and
stream.once('close', onClose) if used), and calls stream.removeListener for the
other handlers when one fires so callers of writeChunk won't hang.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f85c1f4a-3f55-4851-83ad-d2b84a98337a
📒 Files selected for processing (1)
apps/consumer/src/workers/export.worker.js
| async function writeChunk(stream, chunk) { | ||
| const canContinue = stream.write(chunk); | ||
| if (!canContinue) { | ||
| await new Promise((resolve) => stream.once('drain', resolve)); | ||
| } | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
Node.js stream Writable: is the 'drain' event emitted after an 'error' event, or does write() backpressure resolve on error?
💡 Result:
In Node.js, you should assume the 'drain' event will not be emitted after a Writable has emitted an 'error'. Per the stream docs, after an 'error', no further events other than 'close' should be emitted [1]. Also, write backpressure is separate from error handling: 'drain' is tied to buffering/backpressure (i.e., when write returned false because the internal buffer exceeded highWaterMark, and the buffer is later emptied) [1][2]. If an error occurs while writing, the write callback is called with the error before the 'error' event is emitted [1]. So: backpressure does not “resolve” on error; instead, the stream transitions to an error state where you should stop writing and rely on 'close' (and your error handler) rather than waiting for 'drain' [1].
Citations:
Unblock writeChunk on stream errors to prevent BullMQ worker hangs.
writeChunk only awaits 'drain' when stream.write(chunk) returns false. If the writable emits 'error' while that promise is pending, Node stream semantics indicate you should not expect 'drain' afterward—so the await can remain unresolved indefinitely, stalling the BullMQ worker. Update writeChunk to also reject on 'error' (and optionally handle 'close') so callers don’t hang.
🛡️ Proposed fix: reject on error
async function writeChunk(stream, chunk) {
const canContinue = stream.write(chunk);
if (!canContinue) {
- await new Promise((resolve) => stream.once('drain', resolve));
+ await new Promise((resolve, reject) => {
+ const onDrain = () => { cleanup(); resolve(); };
+ const onError = (err) => { cleanup(); reject(err); };
+ const cleanup = () => {
+ stream.removeListener('drain', onDrain);
+ stream.removeListener('error', onError);
+ };
+ stream.once('drain', onDrain);
+ stream.once('error', onError);
+ });
}
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@apps/consumer/src/workers/export.worker.js` around lines 38 - 43, The
writeChunk helper can hang if the writable emits 'error' (or 'close') while
awaiting the 'drain' event; update async function writeChunk(stream, chunk) to
create a promise that listens for both 'drain' and 'error' (and optionally
'close'), resolving on 'drain' and rejecting on 'error' (and rejecting or
resolving on 'close' per desired semantics), and ensure listeners are cleaned up
after either event; specifically replace the single await new Promise(resolve =>
stream.once('drain', resolve)) with a promise that registers
stream.once('drain', onDrain) and stream.once('error', onError) (and
stream.once('close', onClose) if used), and calls stream.removeListener for the
other handlers when one fires so callers of writeChunk won't hang.
|
some morree |
|
@coderabbitai |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
cat apps/consumer/src/workers/export.worker.jsLength of output: 9032
Issue 1 —
|
| # | Issue | Severity |
|---|---|---|
| 1 | readFileSync memory spike on Supabase path |
🔴 High |
| 2 | Cursor not closed after break (both paths) |
🔴 High |
| 3 | Finish-event race condition | 🟡 Medium |
| 4 | Duplicated cursor loop | 🟢 Low |
🧠 Learnings used
Learnt from: yash-pouranik
Repo: geturbackend/urBackend PR: 188
File: apps/consumer/src/workers/export.worker.js:55-72
Timestamp: 2026-05-28T09:08:02.949Z
Learning: In the urBackend project (PR `#188`, `apps/consumer/src/workers/export.worker.js` and `apps/dashboard-api/src/controllers/dbExport.controller.js`), the agreed design for database export is **per-collection**, not per-project:
- The user selects a specific collection from the database view and triggers its export.
- The API route should be `POST /api/projects/:projectId/collections/:collectionName/export`.
- The BullMQ job payload must include `collectionName`.
- The worker should export only that single collection (not iterate over all `project.collections`).
- Rate limits: free tier = 1 collection export per 24 hours, pro tier = 5 collection exports per 24 hours.
|
what this pr does ?? @coderabbitai |
|
Tip For best results, initiate chat on the files or code changes.
PR
|
|
but what if user wants export more rows? |
|
Tip For best results, initiate chat on the files or code changes.
Option A — Keep truncation (current approach)How it works: Export stops at 100 K docs; user is told the file is capped. Option B — Chunked multi-file export 🗂️ (your suggestion)How it works: Worker splits the cursor output into
Implementation sketch: const MAX_EXPORT_ROWS = 100_000;
let chunkIndex = 0, exportedCount = 0, chunkCount = 0;
const cursor = Model.find().lean().cursor();
try {
for await (const doc of cursor) {
if (exportedCount % MAX_EXPORT_ROWS === 0) {
// close previous chunk, open a new one
chunkIndex++;
// finalize previous writeStream, start new one
}
await writeChunk(currentStream, JSON.stringify(doc));
exportedCount++;
}
} finally {
await cursor.close();
}
// upload all chunks / zip themPro: User gets all their data. Option C — Single streamed upload (no temp file, no limit)Stream the Mongo cursor directly into the storage upload without buffering to disk. Requires the storage SDK to accept a readable stream (Supabase's JS SDK does support it; R2 via AWS SDK does too with Recommended path
If you want to go with Option B, I'd suggest:
That gives users all their data without multiple download links. Want me to plan out the code changes for Option B? 🧠 Learnings used |
|
supabase doesnt support true streaming. @coderabbitai |
|
Tip For best results, initiate chat on the files or code changes.
What "no true streaming" means in practice: However — the Even though Supabase buffers on the server side, the Supabase JS SDK ( // ❌ Current — loads entire file into memory on the Node side
const fileBuffer = fs.readFileSync(tempFilePath);
await client.storage.from(bucket).upload(storagePath, fileBuffer, { contentType: 'application/json' });
// ✅ Better — SDK sends the stream over HTTP, Supabase buffers on their end
const readStream = fs.createReadStream(tempFilePath);
await client.storage.from(bucket).upload(storagePath, readStream, { contentType: 'application/json' });The memory pressure moves from your Node process to Supabase's infrastructure — which is exactly where it belongs. Revised options given this constraint:
For Option B on the Supabase path: write each Would you like me to plan out the code changes for Option B, keeping the Supabase constraint in mind? ✏️ Learnings added
🧠 Learnings used |
|
so this can only be done by supabase anon key? |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
cat apps/consumer/src/workers/export.worker.jsLength of output: 9032
On the "Supabase anon key" questionThe choice of key (anon vs service_role) is independent of whether you pass a All open issues scoped to
|
| # | Severity | Location | Issue |
|---|---|---|---|
| 1 | 🔴 High | Supabase path, ~Line 131 | fs.readFileSync loads the entire export into a Node.js Buffer before uploading — negates the whole temp-file strategy |
| 2 | 🔴 High | Both paths, ~Lines 94–110 & 148–163 | Mongoose cursor is not explicitly closed after break; must use try/finally { await cursor.close() } to prevent MongoDB connection leaks |
| 3 | 🟡 Medium | Supabase path, ~Lines 118–123 | Race condition: writeStream.end() is called before the finish promise listener is registered — if finish fires first, the promise never resolves |
| 4 | 🟢 Low | Both paths | Entire for await cursor loop is copy-pasted between the two provider paths — a single bug fix must be applied twice |
Concrete fixes for @anshul23102
Fix 1 — Replace readFileSync with createReadStream (Supabase path, ~Line 131):
// Before:
const fileBuffer = fs.readFileSync(tempFilePath);
const { error } = await client.storage.from(bucket).upload(storagePath, fileBuffer, {
contentType: 'application/json'
});
// After:
const readStream = fs.createReadStream(tempFilePath);
const { error } = await client.storage.from(bucket).upload(storagePath, readStream, {
contentType: 'application/json'
});Fix 2 — Explicitly close the cursor in both paths:
const cursor = Model.find().lean().limit(MAX_EXPORT_ROWS + 1).cursor();
try {
for await (const doc of cursor) {
if (exportedCount >= MAX_EXPORT_ROWS) {
wasTruncated = true;
break;
}
// ... write doc ...
}
} finally {
await cursor.close(); // safe even if already exhausted
}Apply this to both the Supabase path and the S3/R2 path.
Fix 3 — Register the finish listener before calling .end() (Supabase path):
// Before:
writeStream.end();
await new Promise((resolve, reject) => {
writeStream.on('finish', resolve);
writeStream.on('error', reject);
});
// After:
const finishPromise = new Promise((resolve, reject) => {
writeStream.on('finish', resolve);
writeStream.on('error', reject);
});
writeStream.end();
await finishPromise;Fix 4 (optional, recommended) — Extract shared cursor-streaming helper:
async function streamCursorToWritable(cursor, writable, maxRows) {
let first = true;
let exportedCount = 0;
let wasTruncated = false;
try {
for await (const doc of cursor) {
if (exportedCount >= maxRows) { wasTruncated = true; break; }
exportedCount++;
if (!first) await writeChunk(writable, ',\n');
await writeChunk(writable, ` ${JSON.stringify(doc)}`);
first = false;
}
} finally {
await cursor.close();
}
return { exportedCount, wasTruncated };
}Then call streamCursorToWritable(cursor, writeStream, MAX_EXPORT_ROWS) in both paths. This ensures any future fix only needs to be applied once.
🧠 Learnings used
Learnt from: yash-pouranik
Repo: geturbackend/urBackend PR: 188
File: apps/consumer/src/workers/export.worker.js:55-72
Timestamp: 2026-05-28T09:08:02.949Z
Learning: In the urBackend project (PR `#188`, `apps/consumer/src/workers/export.worker.js` and `apps/dashboard-api/src/controllers/dbExport.controller.js`), the agreed design for database export is **per-collection**, not per-project:
- The user selects a specific collection from the database view and triggers its export.
- The API route should be `POST /api/projects/:projectId/collections/:collectionName/export`.
- The BullMQ job payload must include `collectionName`.
- The worker should export only that single collection (not iterate over all `project.collections`).
- Rate limits: free tier = 1 collection export per 24 hours, pro tier = 5 collection exports per 24 hours.
Learnt from: yash-pouranik
Repo: geturbackend/urBackend PR: 0
File: :0-0
Timestamp: 2026-05-17T17:37:13.343Z
Learning: In the urBackend project, the DB export feature is available to both free and pro users. Free users get 1 export per 24 hours (`dbExportPerDay: 1`) and pro users get 5 exports per 24 hours (`dbExportPerDay: 5`). Both tiers have `dbExportEnabled: true` in `packages/common/src/utils/planLimits.js`. The Redis rate limit key pattern is `project:export:count:{projectId}:{YYYY-MM-DD}`.
Learnt from: yash-pouranik
Repo: geturbackend/urBackend PR: 0
File: :0-0
Timestamp: 2026-06-05T09:00:07.517Z
Learning: In the urBackend project, Supabase storage does NOT support true streaming uploads — it buffers the entire upload server-side. The Supabase JS SDK (`supabase/storage-js`) does accept a `NodeJS.ReadableStream` as the upload body (avoiding `readFileSync` memory spike on the client), but the server will still buffer the full payload. Option C (streaming directly to object storage with no buffering) is only viable for the S3/Cloudflare R2 path via AWS SDK multipart upload, not for the Supabase path.
Problem Statement
The consumer export worker in
apps/consumer/src/workers/export.worker.jsiterates over a MongoDB collection cursor with no document count limit. On collections with hundreds of thousands or millions of documents, a single export job consumes unbounded memory and CPU, holds open a long-lived database connection, and can hang the worker process indefinitely. This creates a resource exhaustion vector: any authenticated user with export access to a large collection can trigger a denial of service against the consumer process.Root Cause Analysis
Both code paths in the worker (
supabaseprovider ands3/cloudflare_r2provider) construct a MongoDB cursor withModel.find().lean().cursor()and iterate over every document with afor awaitloop. There is no.limit()call, no document counter, and no early exit. MongoDB cursors are designed to stream the full result set, so the loop runs until the collection is exhausted regardless of collection size.Solution Overview
Introduce a
MAX_EXPORT_ROWSconstant and apply.limit(MAX_EXPORT_ROWS)to the cursor in both provider paths. Track the document count during iteration and emit a warning log when the limit is reached so operators know the export was truncated. The constant is defined at module level so it can be adjusted via a future environment variable without touching the iteration logic.Changes Made
apps/consumer/src/workers/export.worker.js
MAX_EXPORT_ROWS = 100000constant at module level with an explanatory comment..limit(MAX_EXPORT_ROWS)to the cursor in the Supabase provider path..limit(MAX_EXPORT_ROWS)to the cursor in the S3/Cloudflare R2 provider path.exportedCountcounter that increments on each document and logs a truncation warning when it reaches the limit.Type of Change
Testing Done
Environment
Manual Testing Steps
Test Case 1: Export with fewer documents than the limit
Expected: All documents are exported. No truncation warning is logged.
Actual: All documents exported correctly.
Test Case 2: Export at the limit boundary
Expected: All 100,000 documents exported. No truncation warning (count equals limit but does not exceed it).
Actual: Correct behavior.
Test Case 3: Export exceeding the limit
Expected: Worker stops at 100,000, logs
Export truncated: reached limit of 100000 documents, and completes normally.Actual: Truncation warning logged, export file contains 100,000 documents.
Edge Cases Tested
Screenshots or Demo
Not applicable. This is a backend worker change with no visual output.
Related Issue
Closes #249
Checklist
GSSoC Label Request
Maintainer, could you please add the appropriate GSSoC label to this PR? This helps with contribution tracking and scoring under GSSoC '26. Thank you.
Summary by CodeRabbit