Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select a commit. Hold Shift + click to select a range.
6f7d728
stream: prototype for new stream implementation
jasnell Mar 1, 2026
344826c
stream: updates to stream/new impl
jasnell Mar 3, 2026
1830eef
stream: clarify backpressure details in stream_new
jasnell Mar 3, 2026
c6f5717
stream: fixup sync pull batching in stream/new
jasnell Mar 3, 2026
2321d75
stream: fixup stream_new.md linting
jasnell Mar 3, 2026
23968f0
stream: fixup some perf bugs in stream/new
jasnell Mar 3, 2026
93f6002
stream: apply more perf improvements to stream/new
jasnell Mar 3, 2026
4fdcf04
stream: apply more perf improvements to stream/new
jasnell Mar 3, 2026
2b9287c
stream: apply another minor perf improvement in stream/new
jasnell Mar 3, 2026
2a7ebab
stream: use proper # private fields in stream/new
jasnell Mar 3, 2026
58581a9
stream: fixup from memory issue / batch yielding
jasnell Mar 3, 2026
41f5bfd
stream: update stream/new default highWaterMark to 4
jasnell Mar 3, 2026
2997fed
stream: rename stream/new to stream/iter
jasnell Mar 18, 2026
392c2de
stream: gate stream/iter behind --experimental-stream-iter
jasnell Mar 18, 2026
6f380d3
stream: replace instanceof with cross-realm-safe type checks
jasnell Mar 18, 2026
5d879bd
stream: use primordials for prototype method access in stream/iter
jasnell Mar 18, 2026
6bceb17
stream: fix Broadcast.from() silently dropping non-array chunks
jasnell Mar 18, 2026
c53ffb2
stream: fix share consumer premature termination on concurrent pull
jasnell Mar 18, 2026
72256c6
stream: fix Writer sync method return values for try-fallback pattern
jasnell Mar 18, 2026
32a1cd3
stream: use Writer try-fallback pattern in pipeTo and Broadcast.from
jasnell Mar 18, 2026
3d46f5c
stream: tolerate non-array batches in stream/iter consumers
jasnell Mar 18, 2026
37b539b
stream: treat Writer end/fail methods as optional in pipeTo
jasnell Mar 18, 2026
bafc18d
stream: add bounds checking to RingBuffer.get()
jasnell Mar 18, 2026
b3b506b
stream: deduplicate consumer sync/async iteration logic
jasnell Mar 18, 2026
38d6027
stream: validate all elements in isUint8ArrayBatch
jasnell Mar 18, 2026
98edd75
stream: handle sync iterables in merge() multi-source path
jasnell Mar 18, 2026
f213fa6
stream: consolidate TextEncoder to single instance in utils
jasnell Mar 18, 2026
0aa2037
stream: prevent unhandled rejection in Broadcast.from() pump
jasnell Mar 18, 2026
fd65597
stream: validate backpressure option at construction time
jasnell Mar 18, 2026
fe3c1b5
stream: make a number of cleanups in broadcast.js
jasnell Mar 18, 2026
d984198
stream: update and improve the stream_iter doc
jasnell Mar 18, 2026
c9c95f7
stream: add pull/writer methods to FileHandle conditionally
jasnell Mar 18, 2026
ca38be2
stream: make multiple updates, cleanups, and fixes
jasnell Mar 18, 2026
2627f92
stream: improve validation of options in stream/iter
jasnell Mar 18, 2026
f9844d7
stream: make additional fixups to stream/iter arg validation
jasnell Mar 18, 2026
28c3468
stream: expand and split stream/iter tests
jasnell Mar 18, 2026
f9164a9
stream: add stream/iter benchmarks
jasnell Mar 18, 2026
127dcad
stream: performance optimizations for stream/iter pipeline and broadcast
jasnell Mar 18, 2026
ef20558
src: fixup linting issue
jasnell Mar 18, 2026
de13754
stream: fixup stream/iter benchmarks and test assertion
jasnell Mar 18, 2026
c5cb396
stream: fixup stream/iter tests
jasnell Mar 19, 2026
4021134
stream: cleanup a few minor details in stream/iter
jasnell Mar 19, 2026
3464b45
stream: apply spec conformance fixes (part 1)
jasnell Mar 20, 2026
316eac1
stream: apply more stream/iter spec conformance fixes
jasnell Mar 20, 2026
4859b8b
stream: apply more stream/iter conformance fixes
jasnell Mar 20, 2026
4da0408
stream: apply more stream/iter conformance fixes
jasnell Mar 20, 2026
be34e21
stream: apply more stream/iter conformance fixes
jasnell Mar 20, 2026
51bfd76
stream: apply more stream/iter conformance fixes
jasnell Mar 20, 2026
355b207
stream: update stream/iter benchmarks
jasnell Mar 20, 2026
15ec1bc
stream: reorder stream/iter tests
jasnell Mar 20, 2026
c8407df
stream: make stream/iter transform benchmark better
jasnell Mar 20, 2026
2ae8069
stream: apply multiple stream/iter cleanups
jasnell Mar 20, 2026
25699be
stream: make additional minor tweaks in stream/iter
jasnell Mar 20, 2026
7ba041c
stream: add sab support to stream/iter
jasnell Mar 20, 2026
3c568ca
stream: apply more stream/iter cleanups
jasnell Mar 20, 2026
7548080
stream: expand stream/iter test coverage
jasnell Mar 20, 2026
319df06
fs: fill out more of the FileHandle pull/writer impl
jasnell Mar 23, 2026
720cd90
stream: split stream/iter compression transforms out
jasnell Mar 23, 2026
5a62bd3
stream: fixup stream/iter FileHandle benchmarks
jasnell Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions benchmark/fs/bench-filehandle-pipetosync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Benchmark: pipeToSync with sync compression transforms.
// Measures fully synchronous file-to-file pipeline (no threadpool, no promises).
'use strict';

const common = require('../common.js');
const fs = require('fs');
const { openSync, closeSync, writeSync, unlinkSync } = fs;

// Fixture files live in the shared benchmark tmpdir; the pid suffix keeps
// concurrent benchmark runs from clobbering each other's files.
const tmpdir = require('../../test/common/tmpdir');
tmpdir.refresh();
const srcFile = tmpdir.resolve(`.removeme-sync-bench-src-${process.pid}`);
const dstFile = tmpdir.resolve(`.removeme-sync-bench-dst-${process.pid}`);

const bench = common.createBenchmark(main, {
  compression: ['gzip', 'deflate', 'brotli', 'zstd'],
  filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
  n: [5],
}, {
  // stream/iter and zlib/iter are gated behind this flag.
  flags: ['--experimental-stream-iter'],
});

/**
 * Benchmark entry point. Writes a `filesize`-byte fixture of repeating
 * lowercase ASCII, then times `n` fully synchronous pipeToSync pipeline
 * runs (uppercase transform + selected compression).
 *
 * Fix: the async IIFE was a floating promise whose final `cleanup()` only
 * ran on success — any failure leaked the fixture files and surfaced only
 * as an unhandled rejection. `.finally(cleanup)` guarantees cleanup; the
 * rejection still propagates and crashes the process with the real error.
 */
function main({ compression, filesize, n }) {
  // Create the fixture file with repeating lowercase ASCII.
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = openSync(srcFile, 'w');
  let remaining = filesize;
  while (remaining > 0) {
    const toWrite = Math.min(remaining, chunk.length);
    writeSync(fd, chunk, 0, toWrite);
    remaining -= toWrite;
  }
  closeSync(fd);

  // Flag-gated modules; required here rather than at top of file.
  const { pipeToSync } = require('stream/iter');
  const {
    compressGzipSync,
    compressDeflateSync,
    compressBrotliSync,
    compressZstdSync,
  } = require('zlib/iter');
  const { open } = fs.promises;

  const compressFactory = {
    gzip: compressGzipSync,
    deflate: compressDeflateSync,
    brotli: compressBrotliSync,
    zstd: compressZstdSync,
  }[compression];

  // Stateless uppercase transform (sync). Receives a batch of chunks, or
  // null at end-of-stream, and returns the transformed batch.
  const upper = (chunks) => {
    if (chunks === null) return null;
    const out = new Array(chunks.length);
    for (let j = 0; j < chunks.length; j++) {
      const src = chunks[j];
      const buf = Buffer.allocUnsafe(src.length);
      for (let i = 0; i < src.length; i++) {
        const b = src[i];
        buf[i] = (b >= 0x61 && b <= 0x7a) ? b - 0x20 : b;
      }
      out[j] = buf;
    }
    return out;
  };

  // Use a synchronous wrapper since pipeToSync is fully sync.
  // We need FileHandle for pullSync/writer, so open async then run sync.
  (async () => {
    const srcFh = await open(srcFile, 'r');
    const dstFh = await open(dstFile, 'w');

    // Warm up (not timed).
    runSync(srcFh, dstFh, upper, compressFactory, pipeToSync);

    // Close the warm-up handles; each timed iteration opens fresh handles
    // so file positions start at zero.
    await srcFh.close();
    await dstFh.close();

    bench.start();
    let totalBytes = 0;
    for (let i = 0; i < n; i++) {
      const src = await open(srcFile, 'r');
      const dst = await open(dstFile, 'w');
      totalBytes += runSync(src, dst, upper, compressFactory, pipeToSync);
      await src.close();
      await dst.close();
    }
    bench.end(totalBytes / (1024 * 1024));
  })().finally(cleanup);
}

// Run one fully synchronous pipeline pass: srcFh.pullSync -> upper ->
// compression -> destination writer. Returns whatever endSync() reports
// (the total byte count written by the pipeline).
function runSync(srcFh, dstFh, upper, compressFactory, pipeToSync) {
  const writer = dstFh.writer();
  const source = srcFh.pullSync(upper, compressFactory());
  pipeToSync(source, writer);
  return writer.endSync();
}

// Best-effort removal of both fixture files; missing files are ignored.
function cleanup() {
  for (const file of [srcFile, dstFile]) {
    try {
      unlinkSync(file);
    } catch {
      // Ignore: the file may never have been created.
    }
  }
}
201 changes: 201 additions & 0 deletions benchmark/fs/bench-filehandle-pull-vs-webstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Compare FileHandle.createReadStream() vs readableWebStream() vs pull()
// reading a large file through two transforms: uppercase then compress.
'use strict';

const common = require('../common.js');
const fs = require('fs');
const zlib = require('zlib');
const { Transform, Writable, pipeline } = require('stream');

// Fixture file in the shared benchmark tmpdir; pid suffix avoids collisions
// between concurrent benchmark runs.
const tmpdir = require('../../test/common/tmpdir');
tmpdir.refresh();
const filename = tmpdir.resolve(`.removeme-benchmark-garbage-${process.pid}`);

const bench = common.createBenchmark(main, {
  api: ['classic', 'webstream', 'pull'],
  compression: ['gzip', 'deflate', 'brotli', 'zstd'],
  filesize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
  n: [5],
}, {
  flags: ['--experimental-stream-iter'],
  // Classic and webstream only support gzip (native zlib / CompressionStream).
  // Brotli, deflate, zstd are pull-only via stream/iter transforms.
  combinationFilter({ api, compression }) {
    if (api === 'classic' && compression !== 'gzip') return false;
    if (api === 'webstream' && compression !== 'gzip') return false;
    return true;
  },
});

/**
 * Benchmark entry point. Writes a `filesize`-byte fixture of repeating
 * lowercase ASCII, then dispatches to the bench runner for the selected API.
 *
 * Fix: each runner promise used `.then(() => cleanup())`, so a failed run
 * skipped cleanup (leaking the fixture file) and left the rejection
 * unhandled with no cleanup. `.finally(cleanup)` removes the fixture on
 * both success and failure; a rejection still propagates and crashes the
 * process with the original error.
 */
function main({ api, compression, filesize, n }) {
  // Create the fixture file with repeating lowercase ASCII.
  const chunk = Buffer.alloc(Math.min(filesize, 64 * 1024), 'abcdefghij');
  const fd = fs.openSync(filename, 'w');
  let remaining = filesize;
  while (remaining > 0) {
    const toWrite = Math.min(remaining, chunk.length);
    fs.writeSync(fd, chunk, 0, toWrite);
    remaining -= toWrite;
  }
  fs.closeSync(fd);

  let run;
  if (api === 'classic') {
    run = benchClassic(n, filesize);
  } else if (api === 'webstream') {
    run = benchWebStream(n, filesize);
  } else {
    run = benchPull(n, filesize, compression);
  }
  run.finally(cleanup);
}

// Best-effort removal of the fixture file; a missing file is ignored.
function cleanup() {
  try {
    fs.unlinkSync(filename);
  } catch {
    // Ignore: nothing to remove.
  }
}

// Stateless uppercase transform (shared by all paths).
// Returns a new Buffer with every ASCII a-z byte uppercased; all other
// bytes are copied through unchanged. The input buffer is not mutated.
function uppercaseChunk(chunk) {
  const result = Buffer.allocUnsafe(chunk.length);
  let i = 0;
  for (const byte of chunk) {
    // For 0x61..0x7a, clearing bit 5 is the same as subtracting 0x20.
    result[i++] = (byte >= 0x61 && byte <= 0x7a) ? byte & ~0x20 : byte;
  }
  return result;
}

// ---------------------------------------------------------------------------
// Classic streams path: createReadStream -> Transform (upper) -> createGzip
// ---------------------------------------------------------------------------
// Warm up once (untimed), then time n classic-stream pipeline runs.
async function benchClassic(n, filesize) {
  await runClassic(); // warm-up pass

  bench.start();
  let totalBytes = 0;
  for (let remaining = n; remaining > 0; remaining--) {
    totalBytes += await runClassic();
  }
  bench.end(totalBytes / (1024 * 1024));
}

// One classic-streams pass over the fixture file: read -> uppercase
// Transform -> gzip -> counting Writable sink. Resolves with the number of
// compressed bytes produced; rejects on any pipeline error.
function runClassic() {
  return new Promise((resolve, reject) => {
    let compressedBytes = 0;

    const source = fs.createReadStream(filename);
    const toUpper = new Transform({
      transform(chunk, encoding, callback) {
        callback(null, uppercaseChunk(chunk));
      },
    });
    const gzip = zlib.createGzip();
    const counter = new Writable({
      write(chunk, encoding, callback) {
        compressedBytes += chunk.length;
        callback();
      },
    });

    pipeline(source, toUpper, gzip, counter, (err) => {
      if (err) return reject(err);
      resolve(compressedBytes);
    });
  });
}

// ---------------------------------------------------------------------------
// WebStream path: readableWebStream -> TransformStream (upper) -> CompressionStream
// ---------------------------------------------------------------------------
// Warm up once (untimed), then time n web-stream pipeline runs.
async function benchWebStream(n, filesize) {
  await runWebStream(); // warm-up pass

  bench.start();
  let totalBytes = 0;
  for (let remaining = n; remaining > 0; remaining--) {
    totalBytes += await runWebStream();
  }
  bench.end(totalBytes / (1024 * 1024));
}

// One web-streams pass: readableWebStream -> uppercase TransformStream ->
// CompressionStream('gzip'). Returns the total compressed byte count. The
// FileHandle is always closed, even when the pipeline fails.
async function runWebStream() {
  const fh = await fs.promises.open(filename, 'r');
  try {
    const upper = new TransformStream({
      transform(chunk, controller) {
        const out = new Uint8Array(chunk.length);
        for (let i = 0; i < chunk.length; i++) {
          const byte = chunk[i];
          out[i] = (byte >= 0x61 && byte <= 0x7a) ? byte - 0x20 : byte;
        }
        controller.enqueue(out);
      },
    });

    const reader = fh.readableWebStream()
      .pipeThrough(upper)
      .pipeThrough(new CompressionStream('gzip'))
      .getReader();

    let totalBytes = 0;
    for (;;) {
      const { done, value } = await reader.read();
      if (done) return totalBytes;
      totalBytes += value.byteLength;
    }
  } finally {
    await fh.close();
  }
}

// ---------------------------------------------------------------------------
// Pull/iter path: pull() with uppercase transform + selected compression
// ---------------------------------------------------------------------------
// Warm up once (untimed), then time n pull/iter pipeline runs using the
// zlib/iter compression transform chosen by `compression`.
async function benchPull(n, filesize, compression) {
  const iter = require('zlib/iter');

  const factories = {
    gzip: iter.compressGzip,
    deflate: iter.compressDeflate,
    brotli: iter.compressBrotli,
    zstd: iter.compressZstd,
  };
  const compressFactory = factories[compression];

  // Warm up
  await runPull(compressFactory);

  bench.start();
  let totalBytes = 0;
  for (let remaining = n; remaining > 0; remaining--) {
    totalBytes += await runPull(compressFactory);
  }
  bench.end(totalBytes / (1024 * 1024));
}

// One pull/iter pass: fh.pull() with a batch uppercase transform followed
// by the given compression transform. Returns the total compressed byte
// count; the FileHandle is always closed.
async function runPull(compressFactory) {
  const fh = await fs.promises.open(filename, 'r');
  try {
    // Stateless batch transform: uppercase each chunk in the batch;
    // null signals end-of-stream and is passed through.
    const upper = (chunks) =>
      chunks === null ? null : chunks.map(uppercaseChunk);

    const readable = fh.pull(upper, compressFactory());

    let totalBytes = 0;
    for await (const batch of readable) {
      for (const piece of batch) {
        totalBytes += piece.byteLength;
      }
    }
    return totalBytes;
  } finally {
    await fh.close();
  }
}
92 changes: 92 additions & 0 deletions benchmark/streams/iter-creation.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Object creation overhead benchmark.
// Measures the cost of constructing stream infrastructure (no data flow).
'use strict';

const common = require('../common.js');
const { Readable, Writable, Transform, PassThrough } = require('stream');

const bench = common.createBenchmark(main, {
  api: ['classic', 'webstream', 'iter'],
  type: ['readable', 'writable', 'transform', 'pair'],
  n: [1e5],
}, {
  // stream/iter is gated behind this flag.
  flags: ['--experimental-stream-iter'],
  // Iter has no standalone Transform class; transforms are plain functions.
  combinationFilter: ({ api, type }) =>
    !(api === 'iter' && type === 'transform'),
});

// Benchmark entry point: dispatch to the runner for the selected API.
// Unknown api values fall through and do nothing (createBenchmark only
// supplies the three declared values).
function main({ api, type, n }) {
  if (api === 'classic') return benchClassic(type, n);
  if (api === 'webstream') return benchWebStream(type, n);
  if (api === 'iter') return benchIter(type, n);
}

// Time n constructions of the requested classic-stream primitive.
// Construction happens directly in each loop (no factory indirection) so
// only the constructor cost is measured.
function benchClassic(type, n) {
  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) new Readable({ read() {} });
  } else if (type === 'writable') {
    for (let i = 0; i < n; i++) new Writable({ write(c, e, cb) { cb(); } });
  } else if (type === 'transform') {
    for (let i = 0; i < n; i++) {
      new Transform({ transform(c, e, cb) { cb(null, c); } });
    }
  } else if (type === 'pair') {
    for (let i = 0; i < n; i++) new PassThrough();
  }
  bench.end(n);
}

// Time n constructions of the requested WHATWG web-stream primitive.
function benchWebStream(type, n) {
  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) new ReadableStream({ pull() {} });
  } else if (type === 'writable') {
    for (let i = 0; i < n; i++) new WritableStream({ write() {} });
  } else if (type === 'transform') {
    for (let i = 0; i < n; i++) {
      new TransformStream({
        transform(c, controller) { controller.enqueue(c); },
      });
    }
  } else if (type === 'pair') {
    // TransformStream gives a readable+writable pair.
    for (let i = 0; i < n; i++) new TransformStream();
  }
  bench.end(n);
}

// Time n constructions of the requested stream/iter primitive. The
// 'transform' case is filtered out by combinationFilter: iter transforms
// are plain functions, so there is nothing to construct.
function benchIter(type, n) {
  const { push, from, duplex } = require('stream/iter');

  bench.start();
  if (type === 'readable') {
    for (let i = 0; i < n; i++) from('x');
  } else if (type === 'writable') {
    // push() creates a writer+readable pair.
    for (let i = 0; i < n; i++) push();
  } else if (type === 'pair') {
    // duplex() creates a bidirectional channel pair.
    for (let i = 0; i < n; i++) duplex();
  }
  bench.end(n);
}
Loading