diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index e995755079f8c1..f310cd6936fbe7 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -7,6 +7,7 @@ const { Promise, PromisePrototypeThen, ReflectApply, + Symbol, SymbolDispose, } = primordials; @@ -66,6 +67,50 @@ function bindAsyncResource(fn, type) { }; } +/** + * Returns the current stream error tracked by eos(), if any. + * @param {import('stream').Stream} stream + * @returns {Error | null} + */ +function getEosErrored(stream) { + const errored = isWritableErrored(stream) || isReadableErrored(stream); + return typeof errored !== 'boolean' && errored || null; +} + +/** + * Returns the error eos() would report from an immediate close, including + * premature close detection for unfinished readable or writable sides. + * @param {import('stream').Stream} stream + * @param {boolean} readable + * @param {boolean | null} readableFinished + * @param {boolean} writable + * @param {boolean | null} writableFinished + * @returns {Error | null} + */ +function getEosOnCloseError(stream, readable, readableFinished, writable, writableFinished) { + const errored = getEosErrored(stream); + if (errored) { + return errored; + } + + if (readable && !readableFinished && isReadableNodeStream(stream, true)) { + if (!isReadableFinished(stream, false)) { + return new ERR_STREAM_PREMATURE_CLOSE(); + } + } + if (writable && !writableFinished) { + if (!isWritableFinished(stream, false)) { + return new ERR_STREAM_PREMATURE_CLOSE(); + } + } + + return null; +} + +// Internal only: if eos() can settle immediately, invoke the callback before +// returning cleanup. Callers must tolerate cleanup yet to be assigned. +const kEosNodeSynchronousCallback = Symbol('kEosNodeSynchronousCallback'); + function eos(stream, options, callback) { if (arguments.length === 2) { callback = options; @@ -78,14 +123,6 @@ function eos(stream, options, callback) { validateFunction(callback, 'callback'); validateAbortSignal(options.signal, 'options.signal'); - if (AsyncContextFrame.current() || enabledHooksExist()) { - // Avoid AsyncResource.bind() because it calls ObjectDefineProperties which - // is a bottleneck here. - callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM')); - } else { - callback = once(callback); - } - if (isReadableStream(stream) || isWritableStream(stream)) { return eosWeb(stream, options, callback); } @@ -97,15 +134,6 @@ function eos(stream, options, callback) { const readable = options.readable ?? isReadableNodeStream(stream); const writable = options.writable ?? isWritableNodeStream(stream); - const wState = stream._writableState; - const rState = stream._readableState; - - const onlegacyfinish = () => { - if (!stream.writable) { - onfinish(); - } - }; - // TODO (ronag): Improve soft detection to include core modules and // common ecosystem modules that do properly emit 'close' but fail // this generic check. @@ -114,8 +142,85 @@ function eos(stream, options, callback) { isReadableNodeStream(stream) === readable && isWritableNodeStream(stream) === writable ); - let writableFinished = isWritableFinished(stream, false); + let readableFinished = isReadableFinished(stream, false); + + const wState = stream._writableState; + const rState = stream._readableState; + + /** + * @type {Error | null | undefined} + * undefined: to be determined + * null: no error + * Error: an error occurred + */ + let immediateResult; + if (isClosed(stream)) { + immediateResult = getEosOnCloseError( + stream, + readable, + readableFinished, + writable, + writableFinished, + ); + } else if (wState?.errorEmitted || rState?.errorEmitted) { + if (!willEmitClose) { + immediateResult = getEosErrored(stream); + } + } else if ( + !readable && + (!willEmitClose || isReadable(stream)) && + (writableFinished || isWritable(stream) === false) && + (wState == null || wState.pendingcb === undefined || wState.pendingcb === 0) + ) { + immediateResult = getEosErrored(stream); + } else if ( + !writable && + (!willEmitClose || isWritable(stream)) && + (readableFinished || isReadable(stream) === false) + ) { + immediateResult = getEosErrored(stream); + } else if ((rState && stream.req && stream.aborted)) { + immediateResult = getEosErrored(stream); + } + let cleanup = () => { + callback = nop; + }; + if (immediateResult !== undefined) { + if (options.error !== false) { + stream.on('error', nop); + cleanup = () => { + callback = nop; + stream.removeListener('error', nop); + }; + } + } else if (options.signal?.aborted) { + immediateResult = new AbortError(undefined, { cause: options.signal.reason }); + } + if (immediateResult !== undefined && options[kEosNodeSynchronousCallback]) { + ReflectApply(callback, stream, immediateResult === null ? [] : [immediateResult]); + return cleanup; + } + + if (AsyncContextFrame.current() || enabledHooksExist()) { + // Avoid AsyncResource.bind() because it calls ObjectDefineProperties which + // is a bottleneck here. + callback = bindAsyncResource(callback, 'STREAM_END_OF_STREAM'); + } + + if (immediateResult !== undefined) { + process.nextTick(() => ReflectApply(callback, stream, immediateResult === null ? [] : [immediateResult])); + return cleanup; + } + + callback = once(callback); + + const onlegacyfinish = () => { + if (!stream.writable) { + onfinish(); + } + }; + const onfinish = () => { writableFinished = true; // Stream should not be destroyed here. If it is that @@ -134,7 +239,6 @@ function eos(stream, options, callback) { } }; - let readableFinished = isReadableFinished(stream, false); const onend = () => { readableFinished = true; // Stream should not be destroyed here. If it is that @@ -157,41 +261,13 @@ function eos(stream, options, callback) { callback.call(stream, err); }; - let closed = isClosed(stream); - const onclose = () => { - closed = true; - - const errored = isWritableErrored(stream) || isReadableErrored(stream); - - if (errored && typeof errored !== 'boolean') { - return callback.call(stream, errored); - } - - if (readable && !readableFinished && isReadableNodeStream(stream, true)) { - if (!isReadableFinished(stream, false)) - return callback.call(stream, - new ERR_STREAM_PREMATURE_CLOSE()); - } - if (writable && !writableFinished) { - if (!isWritableFinished(stream, false)) - return callback.call(stream, - new ERR_STREAM_PREMATURE_CLOSE()); - } - - callback.call(stream); - }; - - const onclosed = () => { - closed = true; - - const errored = isWritableErrored(stream) || isReadableErrored(stream); - - if (errored && typeof errored !== 'boolean') { - return callback.call(stream, errored); + const error = getEosOnCloseError(stream, readable, readableFinished, writable, writableFinished); + if (error === null) { + callback.call(stream); + } else { + callback.call(stream, error); } - - callback.call(stream); }; const onrequest = () => { @@ -225,30 +301,7 @@ function eos(stream, options, callback) { } stream.on('close', onclose); - if (closed) { - process.nextTick(onclose); - } else if (wState?.errorEmitted || rState?.errorEmitted) { - if (!willEmitClose) { - process.nextTick(onclosed); - } - } else if ( - !readable && - (!willEmitClose || isReadable(stream)) && - (writableFinished || isWritable(stream) === false) && - (wState == null || wState.pendingcb === undefined || wState.pendingcb === 0) - ) { - process.nextTick(onclosed); - } else if ( - !writable && - (!willEmitClose || isWritable(stream)) && - (readableFinished || isReadable(stream) === false) - ) { - process.nextTick(onclosed); - } else if ((rState && stream.req && stream.aborted)) { - process.nextTick(onclosed); - } - - const cleanup = () => { + cleanup = () => { callback = nop; stream.removeListener('aborted', onclose); stream.removeListener('complete', onfinish); @@ -263,7 +316,7 @@ function eos(stream, options, callback) { stream.removeListener('close', onclose); }; - if (options.signal && !closed) { + if (options.signal) { const abort = () => { // Keep it because cleanup removes it. const endCallback = callback; @@ -272,23 +325,27 @@ function eos(stream, options, callback) { stream, new AbortError(undefined, { cause: options.signal.reason })); }; - if (options.signal.aborted) { - process.nextTick(abort); - } else { - addAbortListener ??= require('internal/events/abort_listener').addAbortListener; - const disposable = addAbortListener(options.signal, abort); - const originalCallback = callback; - callback = once((...args) => { - disposable[SymbolDispose](); - ReflectApply(originalCallback, stream, args); - }); - } + addAbortListener ??= require('internal/events/abort_listener').addAbortListener; + const disposable = addAbortListener(options.signal, abort); + const originalCallback = callback; + callback = once((...args) => { + disposable[SymbolDispose](); + ReflectApply(originalCallback, stream, args); + }); } return cleanup; } function eosWeb(stream, options, callback) { + if (AsyncContextFrame.current() || enabledHooksExist()) { + // Avoid AsyncResource.bind() because it calls ObjectDefineProperties which + // is a bottleneck here. + callback = once(bindAsyncResource(callback, 'STREAM_END_OF_STREAM')); + } else { + callback = once(callback); + } + let isAborted = false; let abort = nop; if (options.signal) { @@ -347,4 +404,5 @@ function finished(stream, opts) { module.exports = { eos, finished, + kEosNodeSynchronousCallback, }; diff --git a/lib/internal/webstreams/adapters.js b/lib/internal/webstreams/adapters.js index 188f715a62d3c1..077d585ea5a387 100644 --- a/lib/internal/webstreams/adapters.js +++ b/lib/internal/webstreams/adapters.js @@ -91,7 +91,10 @@ const { streamBaseState, } = internalBinding('stream_wrap'); -const { eos } = require('internal/streams/end-of-stream'); +const { + eos, + kEosNodeSynchronousCallback, +} = require('internal/streams/end-of-stream'); const { UV_EOF } = internalBinding('uv'); @@ -139,6 +142,8 @@ function handleKnownInternalErrors(cause) { } } +const noop = () => {}; + /** * @typedef {import('../../stream').Writable} Writable * @typedef {import('../../stream').Readable} Readable @@ -449,6 +454,8 @@ function newStreamWritableFromWritableStream(writableStream, options = kEmptyObj return writable; } +const kErrorSentinelAttached = Symbol('kErrorSentinelAttached'); + /** * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy * @param {Readable} streamReadable @@ -475,89 +482,81 @@ function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObj } const isBYOB = options.type === 'bytes'; - - if (isDestroyed(streamReadable) || !isReadable(streamReadable)) { - const readable = new ReadableStream(); - readable.cancel(); - return readable; - } - - const objectMode = streamReadable.readableObjectMode; - const highWaterMark = streamReadable.readableHighWaterMark; - - const evaluateStrategyOrFallback = (strategy) => { - // If the stream is BYOB, we only use highWaterMark - if (isBYOB) - return { highWaterMark }; - // If there is a strategy available, use it - if (strategy) - return strategy; - - if (objectMode) { - // When running in objectMode explicitly but no strategy, we just fall - // back to CountQueuingStrategy - return new CountQueuingStrategy({ highWaterMark }); - } - - return new ByteLengthQueuingStrategy({ highWaterMark }); - }; - - const strategy = evaluateStrategyOrFallback(options?.strategy); - let controller; let wasCanceled = false; + let strategy; - function onData(chunk) { - // Copy the Buffer to detach it from the pool. - if (Buffer.isBuffer(chunk) && !objectMode) - chunk = new Uint8Array(chunk); - controller.enqueue(chunk); - if (controller.desiredSize <= 0) - streamReadable.pause(); - } + /** @type {UnderlyingSource} */ + const underlyingSource = { + __proto__: null, + type: isBYOB ? 'bytes' : undefined, + start(c) { controller = c; }, + cancel(reason) { + wasCanceled = true; + destroy(streamReadable, reason); + }, + }; - streamReadable.pause(); + const readable = isReadable(streamReadable); + const objectMode = streamReadable.readableObjectMode; + if (readable) { + underlyingSource.pull = function pull() { + streamReadable.resume(); + }; + + const highWaterMark = streamReadable.readableHighWaterMark; + strategy = isBYOB ? { highWaterMark } : + options.strategy ?? new (objectMode ? CountQueuingStrategy : ByteLengthQueuingStrategy)({ highWaterMark }); + } + const readableStream = new ReadableStream(underlyingSource, strategy); - const cleanup = eos(streamReadable, (error) => { + // When adapting a Duplex as a ReadableStream, readable completion should not + // wait for a half-open writable side to finish as well. + let cleanup = noop; + cleanup = eos(streamReadable, { + __proto__: null, + writable: false, + [kEosNodeSynchronousCallback]: true, + }, (error) => { error = handleKnownInternalErrors(error); + // If eos calls the callback synchronously, cleanup is still a no-op here. cleanup(); - // This is a protection against non-standard, legacy streams - // that happen to emit an error event again after finished is called. - streamReadable.on('error', () => {}); - if (error) - return controller.error(error); - // Was already canceled + + if (!(kErrorSentinelAttached in streamReadable)) { + // This is a protection against non-standard, legacy streams + // that happen to emit an error event again after finished is called. + streamReadable.on('error', noop); + streamReadable[kErrorSentinelAttached] = true; + } if (wasCanceled) { return; } + wasCanceled = true; + if (error) + return controller.error(error); controller.close(); + if (isBYOB) + controller.byobRequest?.respond(0); }); - streamReadable.on('data', onData); - - return new ReadableStream({ - type: isBYOB ? 'bytes' : undefined, - start(c) { - controller = c; - if (isBYOB) { - streamReadable.once('end', () => { - // close the controller - controller.close(); - // And unlock the last BYOB read request - controller.byobRequest?.respond(0); - wasCanceled = true; - }); - } - }, + if (wasCanceled) { + // `eos` called the callback synchronously + cleanup(); + } else if (readable) { + streamReadable.pause(); - pull() { streamReadable.resume(); }, + streamReadable.on('data', function onData(chunk) { + // Copy the Buffer to detach it from the pool. + if (Buffer.isBuffer(chunk) && !objectMode) + chunk = new Uint8Array(chunk); + controller.enqueue(chunk); + if (controller.desiredSize <= 0) + streamReadable.pause(); + }); + } - cancel(reason) { - wasCanceled = true; - destroy(streamReadable, reason); - }, - }, strategy); + return readableStream; } /** diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index b55107a7a6440f..6c2f3cad68e1d8 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -1,3 +1,4 @@ +// Flags: --expose-internals 'use strict'; const common = require('../common'); @@ -15,6 +16,7 @@ const EE = require('events'); const fs = require('fs'); const { promisify } = require('util'); const http = require('http'); +const { kEosNodeSynchronousCallback } = require('internal/streams/end-of-stream'); { const rs = new Readable({ @@ -95,13 +97,20 @@ const http = require('http'); { // Check pre-cancelled - const signal = new EventTarget(); - signal.aborted = true; + const signal = AbortSignal.abort(); const rs = Readable.from((function* () {})()); - finished(rs, { signal }, common.mustCall((err) => { + const cleanup = finished(rs, { signal }, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + cleanup(); + })); + const unset = Symbol('unset'); + let cleanup2 = unset; + cleanup2 = finished(rs, { signal, [kEosNodeSynchronousCallback]: true }, common.mustCall((err) => { assert.strictEqual(err.name, 'AbortError'); + assert.strictEqual(cleanup2, unset); })); + cleanup2(); } { @@ -160,8 +169,7 @@ const http = require('http'); // Promisified pre-aborted works const finishedPromise = promisify(finished); async function run() { - const signal = new EventTarget(); - signal.aborted = true; + const signal = AbortSignal.abort(); const rs = Readable.from((function* () {})()); await finishedPromise(rs, { signal }); } @@ -592,6 +600,29 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); })); } +{ + let serverRes; + const server = http.createServer(common.mustCall((req, res) => { + serverRes = res; + res.write('hello'); + })).listen(0, common.mustCall(function() { + http.get({ port: this.address().port }, common.mustCall((res) => { + res.on('aborted', common.mustCall(() => { + finished(res, common.mustCall((err) => { + assert.strictEqual(err.code, 'ECONNRESET'); + assert.strictEqual(err.message, 'aborted'); + server.close(); + })); + })); + res.on('error', common.expectsError({ + code: 'ECONNRESET', + message: 'aborted', + })); + serverRes.destroy(); + })).on('error', common.mustNotCall()); + })); +} + { const w = new Writable({ write(chunk, encoding, callback) { diff --git a/test/parallel/test-stream-readable-to-web-termination.js b/test/parallel/test-stream-readable-to-web-termination.js index 13fce9bc715e1e..f30cf721e14cd8 100644 --- a/test/parallel/test-stream-readable-to-web-termination.js +++ b/test/parallel/test-stream-readable-to-web-termination.js @@ -1,6 +1,8 @@ 'use strict'; -require('../common'); -const { Readable } = require('stream'); +const common = require('../common'); +const assert = require('assert'); +const { Duplex, Readable } = require('stream'); +const { setTimeout: delay } = require('timers/promises'); { const r = Readable.from([]); @@ -10,3 +12,33 @@ const { Readable } = require('stream'); const reader = Readable.toWeb(r).getReader(); reader.read(); } + +{ + const duplex = new Duplex({ + read() { + this.push(Buffer.from('x')); + this.push(null); + }, + write(_chunk, _encoding, callback) { + callback(); + }, + }); + + const reader = Readable.toWeb(duplex).getReader(); + + (async () => { + const result = await reader.read(); + assert.deepStrictEqual(result, { + value: new Uint8Array(Buffer.from('x')), + done: false, + }); + + const closeResult = await Promise.race([ + reader.read(), + delay(common.platformTimeout(100)).then(() => 'timeout'), + ]); + + assert.notStrictEqual(closeResult, 'timeout'); + assert.deepStrictEqual(closeResult, { value: undefined, done: true }); + })().then(common.mustCall()); +} diff --git a/test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js b/test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js index 66af7b128c4d5a..eb01a39d8dc168 100644 --- a/test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js +++ b/test/parallel/test-whatwg-webstreams-adapters-to-readablestream.js @@ -4,6 +4,7 @@ const common = require('../common'); const assert = require('assert'); +const { once } = require('events'); const { newReadableStreamFromStreamReadable, @@ -11,6 +12,7 @@ const { const { Duplex, + PassThrough, Readable, } = require('stream'); @@ -188,11 +190,142 @@ const { } { - const readable = new Readable(); + /** + * Runs the same assertion across finalize-before/after and + * default/BYOB adapter creation orders. + * @param {(readable: Readable) => void | Promise} finalize + * Finalizes the source stream before or after adaptation. + * @param {(readAndAssert: () => Promise, reader: ReadableStreamReader) => Promise} postAssert + * Asserts the resulting web stream state for the current case. + * @param {() => Readable} [createReadable] + * Creates the source stream for each case. + */ + function testConfluence(finalize, postAssert, createReadable = () => new PassThrough()) { + const cases = [false, true].flatMap((finalizeFirst) => [false, true].map((isBYOB) => ({ finalizeFirst, isBYOB }))); + Promise.all(cases.map(async (case_) => { + try { + const { isBYOB } = case_; + const readable = createReadable(); + if (case_.finalizeFirst) { + await finalize(readable); + } + /** @type {ReadableStream} */ + const readableStream = newReadableStreamFromStreamReadable(readable, { type: isBYOB ? 'bytes' : undefined }); + const reader = readableStream.getReader({ mode: isBYOB ? 'byob' : undefined }); + if (!case_.finalizeFirst) { + await finalize(readable); + } + + const readAndAssert = common.mustCall(() => { + return reader.read(isBYOB ? new Uint8Array(1) : undefined).then((result) => { + assert.deepStrictEqual(result, { + value: isBYOB ? new Uint8Array(0) : undefined, + done: true, + }); + }); + }); + await postAssert(readAndAssert, reader); + } catch (cause) { + throw new Error(`Case failed: ${JSON.stringify(case_)}`, { cause }); + } + })).then(common.mustCall()); + } + const error = new Error('boom'); + // Ending the readable without an error => closes the readableStream without an error + testConfluence( + async (readable) => { + readable.resume(); + readable.end(); + await once(readable, 'end'); + }, + common.mustCall(async (readAndAssert, reader) => { + await readAndAssert(); + await reader.closed; + }, 4) + ); + // Prematurely destroying the stream.Readable without an error + // => errors the ReadableStream with a premature close error + testConfluence( + (readable) => readable.destroy(), + common.mustCall(async (readAndAssert, reader) => { + const errorPredicate = { code: 'ABORT_ERR' }; + await assert.rejects(readAndAssert(), errorPredicate); + await assert.rejects(reader.closed, errorPredicate); + }, 4) + ); + // Asynchronously destroyed readable => errors the ReadableStream with a + // premature close error regardless of adapter creation order. + class AsyncDestroyReadable extends Readable { + _read() {} + + _destroy(error, callback) { + setImmediate(callback, error); + } + } + testConfluence( + (readable) => readable.destroy(), + common.mustCall(async (readAndAssert, reader) => { + const errorPredicate = { code: 'ABORT_ERR' }; + await assert.rejects(readAndAssert(), errorPredicate); + await assert.rejects(reader.closed, errorPredicate); + }, 4), + () => new AsyncDestroyReadable() + ); + // Destroying the readable with an error => errors the readableStream + testConfluence( + common.mustCall((readable) => { + readable.on('error', common.mustCall((reason) => { + assert.strictEqual(reason, error); + })); + readable.destroy(error); + }, 4), + common.mustCall(async (readAndAssert, reader) => { + await assert.rejects(readAndAssert(), error); + await assert.rejects(reader.closed, error); + }, 4) + ); +} + +{ + const readable = new PassThrough(); + readable.end(); readable.destroy(); - const readableStream = newReadableStreamFromStreamReadable(readable); - const reader = readableStream.getReader(); - reader.closed.then(common.mustCall()); + + (async () => { + await new Promise((resolve) => readable.once('close', resolve)); + assert.strictEqual(readable.listenerCount('error'), 0); + + const readableStream = newReadableStreamFromStreamReadable(readable); + // Only one error listener from the adapter should be added + assert.strictEqual(readable.listenerCount('error'), 1); + newReadableStreamFromStreamReadable(readable); + // No duplicate listeners should be added. + assert.strictEqual(readable.listenerCount('error'), 1); + + const readResult = await readableStream.getReader().read(); + assert.deepStrictEqual(readResult, { value: undefined, done: true }); + })().then(common.mustCall()); +} + +{ + const readable = new PassThrough(); + const readableStream = newReadableStreamFromStreamReadable(readable, { + type: 'bytes', + }); + const reader = readableStream.getReader({ mode: 'byob' }); + + (async () => { + const readPromise = reader.read(new Uint8Array(8)); + + readable.end(); + await once(readable, 'end'); + + assert.deepStrictEqual(await readPromise, { + value: new Uint8Array(0), + done: true, + }); + await reader.closed; + })().then(common.mustCall()); } {