Skip to content

Commit 32eaae7

Browse files
committed
stream: use Writer try-fallback pattern in pipeTo and Broadcast.from
The `Writer` interface sync methods are designed as try-fallback pairs: attempt the fast sync path first, fall back to async only when rejected or unavailable. This was not being followed in `pipeTo`, `pipeToSync`, or `Broadcast.from()`. `pipeTo` now attempts `writeSync`/`writevSync` before `write`/`writev`, `endSync` before `end`, and `failSync` before `fail`, with graceful handling when *Sync methods are not present on the writer. `pipeToSync` similarly prefers *Sync methods with fallback to the non-sync variants for writers that lack them. `Broadcast.from()` applies the same pattern when writing source data to the internal broadcast writer. Additionally, writes in p`ipeTo are now sequential rather than parallel (previously used `Promise.all`), which respects writer backpressure signals.
1 parent 0c9827a commit 32eaae7

File tree

2 files changed

+56
-29
lines changed

2 files changed

+56
-29
lines changed

lib/internal/streams/iter/broadcast.js

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,7 @@ const Broadcast = {
683683
const signal = options?.signal;
684684

685685
(async () => {
686+
const w = result.writer;
686687
try {
687688
if (isAsyncIterable(input)) {
688689
for await (const chunks of input) {
@@ -691,11 +692,11 @@ const Broadcast = {
691692
lazyDOMException('Aborted', 'AbortError');
692693
}
693694
if (ArrayIsArray(chunks)) {
694-
await result.writer.writev(
695-
chunks, signal ? { signal } : undefined);
696-
} else {
697-
await result.writer.write(
698-
chunks, signal ? { signal } : undefined);
695+
if (!w.writevSync(chunks)) {
696+
await w.writev(chunks, signal ? { signal } : undefined);
697+
}
698+
} else if (!w.writeSync(chunks)) {
699+
await w.write(chunks, signal ? { signal } : undefined);
699700
}
700701
}
701702
} else if (isSyncIterable(input)) {
@@ -705,18 +706,23 @@ const Broadcast = {
705706
lazyDOMException('Aborted', 'AbortError');
706707
}
707708
if (ArrayIsArray(chunks)) {
708-
await result.writer.writev(
709-
chunks, signal ? { signal } : undefined);
710-
} else {
711-
await result.writer.write(
712-
chunks, signal ? { signal } : undefined);
709+
if (!w.writevSync(chunks)) {
710+
await w.writev(chunks, signal ? { signal } : undefined);
711+
}
712+
} else if (!w.writeSync(chunks)) {
713+
await w.write(chunks, signal ? { signal } : undefined);
713714
}
714715
}
715716
}
716-
await result.writer.end(signal ? { signal } : undefined);
717+
if (w.endSync() < 0) {
718+
await w.end(signal ? { signal } : undefined);
719+
}
717720
} catch (error) {
718-
await result.writer.fail(
719-
isError(error) ? error : new ERR_INVALID_ARG_TYPE('error', 'Error', String(error)));
721+
const err = isError(error) ? error :
722+
new ERR_INVALID_ARG_TYPE('error', 'Error', String(error));
723+
if (!w.failSync(err)) {
724+
await w.fail(err);
725+
}
720726
}
721727
})();
722728

lib/internal/streams/iter/pull.js

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ const {
1010
ArrayIsArray,
1111
ArrayPrototypePush,
1212
ArrayPrototypeSlice,
13-
SafePromiseAllReturnVoid,
1413
String,
1514
SymbolAsyncIterator,
1615
SymbolIterator,
@@ -610,22 +609,33 @@ function pipeToSync(source, ...args) {
610609
source;
611610

612611
let totalBytes = 0;
612+
const hasWriteSync = typeof writer.writeSync === 'function';
613+
const hasEndSync = typeof writer.endSync === 'function';
614+
const hasFailSync = typeof writer.failSync === 'function';
613615

614616
try {
615617
for (const batch of pipeline) {
616618
for (let i = 0; i < batch.length; i++) {
617619
const chunk = batch[i];
618-
writer.write(chunk);
620+
if (!hasWriteSync || !writer.writeSync(chunk)) {
621+
writer.write(chunk);
622+
}
619623
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
620624
}
621625
}
622626

623627
if (!options?.preventClose) {
624-
writer.end();
628+
if (!hasEndSync || writer.endSync() < 0) {
629+
writer.end();
630+
}
625631
}
626632
} catch (error) {
627633
if (!options?.preventFail) {
628-
writer.fail(isError(error) ? error : new ERR_OPERATION_FAILED(String(error)));
634+
const err = isError(error) ? error :
635+
new ERR_OPERATION_FAILED(String(error));
636+
if (!hasFailSync || !writer.failSync(err)) {
637+
writer.fail(err);
638+
}
629639
}
630640
throw error;
631641
}
@@ -657,27 +667,31 @@ async function pipeTo(source, ...args) {
657667

658668
let totalBytes = 0;
659669
const hasWritev = typeof writer.writev === 'function';
670+
const hasWritevSync = typeof writer.writevSync === 'function';
671+
const hasWriteSync = typeof writer.writeSync === 'function';
660672

661-
// Helper to write a batch efficiently
673+
// Helper to write a batch efficiently using try-fallback pattern:
674+
// attempt the sync path first, fall back to async if rejected.
662675
const writeBatch = async (batch) => {
663676
if (hasWritev && batch.length > 1) {
664-
await writer.writev(batch, signal ? { signal } : undefined);
677+
if (!hasWritevSync ||
678+
!writer.writevSync(batch)) {
679+
await writer.writev(batch, signal ? { signal } : undefined);
680+
}
665681
for (let i = 0; i < batch.length; i++) {
666682
totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
667683
}
668684
} else {
669-
const promises = [];
670685
for (let i = 0; i < batch.length; i++) {
671686
const chunk = batch[i];
672-
const result = writer.write(chunk, signal ? { signal } : undefined);
673-
if (result !== undefined) {
674-
ArrayPrototypePush(promises, result);
687+
if (!hasWriteSync || !writer.writeSync(chunk)) {
688+
const result = writer.write(chunk, signal ? { signal } : undefined);
689+
if (result !== undefined) {
690+
await result;
691+
}
675692
}
676693
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
677694
}
678-
if (promises.length > 0) {
679-
await SafePromiseAllReturnVoid(promises);
680-
}
681695
}
682696
};
683697

@@ -719,12 +733,19 @@ async function pipeTo(source, ...args) {
719733
}
720734

721735
if (!options?.preventClose) {
722-
await writer.end(signal ? { signal } : undefined);
736+
if (typeof writer.endSync !== 'function' ||
737+
writer.endSync() < 0) {
738+
await writer.end(signal ? { signal } : undefined);
739+
}
723740
}
724741
} catch (error) {
725742
if (!options?.preventFail) {
726-
await writer.fail(
727-
isError(error) ? error : new ERR_OPERATION_FAILED(String(error)));
743+
const err = isError(error) ? error :
744+
new ERR_OPERATION_FAILED(String(error));
745+
if (typeof writer.failSync !== 'function' ||
746+
!writer.failSync(err)) {
747+
await writer.fail(err);
748+
}
728749
}
729750
throw error;
730751
}

0 commit comments

Comments
 (0)