Skip to content

Commit 0c9827a

Browse files
committed
stream: fix Writer sync method return values for try-fallback pattern
All `*Sync` methods on the Writer interface are designed as try-fallback pairs with their async counterparts: attempt the fast synchronous path first, and fall back to the async version only when the synchronous call indicates it could not complete. `endSync()` was returning the byte count unconditionally, making the fallback check in duplex `close()` incorrect (`!0 === true`). `failSync()` always returned true even when already errored. - `endSync()` now returns -1 when writer is not open, byte count (`>= 0`) on success - `failSync()` now returns false when already errored - `PushQueue.fail()` returns boolean to support `failSync()` - duplex `close()` uses `endSync() < 0` for the fallback check - Document the try-fallback pattern and sync return values
1 parent a6a37e5 commit 0c9827a

4 files changed

Lines changed: 40 additions & 8 deletions

File tree

doc/api/stream_iter.md

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,19 @@ run().catch(console.error);
695695

696696
#### Writer
697697

698-
The writer returned by `push()` has the following methods:
698+
The writer returned by `push()` has the following methods.
699+
700+
Each async method has a synchronous `*Sync` counterpart designed for a
701+
try-fallback pattern: attempt the fast synchronous path first, and fall back
702+
to the async version only when the synchronous call indicates it could not
703+
complete:
704+
705+
```mjs
706+
if (!writer.writeSync(chunk)) await writer.write(chunk);
707+
if (!writer.writevSync(chunks)) await writer.writev(chunks);
708+
if (writer.endSync() < 0) await writer.end();
709+
if (!writer.failSync(err)) await writer.fail(err);
710+
```
699711

700712
##### `writer.fail(reason)`
701713

@@ -707,8 +719,10 @@ Fail the stream with an error.
707719
##### `writer.failSync(reason)`
708720

709721
* `reason` {Error}
722+
* Returns: {boolean} `true` if the writer was failed, `false` if already
723+
errored.
710724

711-
Synchronously fail the stream with an error. Does not return a promise.
725+
Synchronous variant of `writer.fail()`.
712726

713727
##### `writer.desiredSize`
714728

@@ -726,6 +740,20 @@ Returns `null` if the writer is closed or the consumer has disconnected.
726740

727741
Signal that no more data will be written.
728742

743+
##### `writer.endSync()`
744+
745+
* Returns: {number} Total bytes written, or `-1` if the writer is not open.
746+
747+
Synchronous variant of `writer.end()`. Returns `-1` if the writer is already
748+
closed or errored. Can be used as a try-fallback pattern:
749+
750+
```cjs
751+
const result = writer.endSync();
752+
if (result < 0) {
753+
writer.end();
754+
}
755+
```
756+
729757
##### `writer.write(chunk[, options])`
730758

731759
* `chunk` {Uint8Array|string}

lib/internal/streams/iter/duplex.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ function duplex(options) {
4646
if (aWriterRef === null) return;
4747
const writer = aWriterRef;
4848
aWriterRef = null;
49-
if (!writer.endSync()) {
49+
if (writer.endSync() < 0) {
5050
await writer.end();
5151
}
5252
},
@@ -62,7 +62,7 @@ function duplex(options) {
6262
if (bWriterRef === null) return;
6363
const writer = bWriterRef;
6464
bWriterRef = null;
65-
if (!writer.endSync()) {
65+
if (writer.endSync() < 0) {
6666
await writer.end();
6767
}
6868
},

lib/internal/streams/iter/push.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ class PushQueue {
248248
*/
249249
end() {
250250
if (this.#writerState !== 'open') {
251-
return this.#bytesWritten;
251+
return -1;
252252
}
253253

254254
this.#writerState = 'closed';
@@ -261,16 +261,18 @@ class PushQueue {
261261

262262
/**
263263
* Put queue into terminal error state.
264+
* @returns {boolean} true if the writer was failed, false if already errored.
264265
*/
265266
fail(reason) {
266-
if (this.#writerState === 'errored') return;
267+
if (this.#writerState === 'errored') return false;
267268

268269
this.#writerState = 'errored';
269270
this.#error = reason ?? new ERR_INVALID_STATE('Failed');
270271
this.#cleanup();
271272
this.#rejectPendingReads(this.#error);
272273
this.#rejectPendingWrites(this.#error);
273274
this.#rejectPendingDrains(this.#error);
275+
return true;
274276
}
275277

276278
get totalBytesWritten() {
@@ -512,8 +514,7 @@ class PushWriter {
512514
}
513515

514516
failSync(reason) {
515-
this.#queue.fail(reason);
516-
return true;
517+
return this.#queue.fail(reason);
517518
}
518519
}
519520

test/parallel/test-stream-iter-push.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ async function testWriterEnd() {
108108
const totalBytes = writer.endSync();
109109
assert.strictEqual(totalBytes, 0);
110110

111+
// Calling endSync again returns -1 (already closed)
112+
assert.strictEqual(writer.endSync(), -1);
113+
111114
const batches = [];
112115
for await (const batch of readable) {
113116
batches.push(batch);

0 commit comments

Comments
 (0)