Skip to content

Commit ab68aad

Browse files
committed
stream: validate streams in pipeline
1 parent 2b74812 commit ab68aad

File tree

1 file changed

+79
-7
lines changed

1 file changed

+79
-7
lines changed

lib/internal/streams/pipeline.js

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ const {
1313
const { eos } = require('internal/streams/end-of-stream');
1414
const { once } = require('internal/util');
1515
const destroyImpl = require('internal/streams/destroy');
16-
const Duplex = require('internal/streams/duplex');
1716
const {
1817
AbortError,
1918
aggregateTwoErrors,
@@ -36,6 +35,8 @@ const {
3635
isIterable,
3736
isReadable,
3837
isReadableNodeStream,
38+
isWritableNodeStream,
39+
isWritableStream,
3940
isNodeStream,
4041
isTransformStream,
4142
isWebStream,
@@ -159,7 +160,8 @@ async function pumpToWeb(readable, writable, finish, { end }) {
159160
try {
160161
for await (const chunk of readable) {
161162
await writer.ready;
162-
writer.write(chunk).catch(() => {});
163+
writer.write(chunk).catch(() => {
164+
});
163165
}
164166

165167
await writer.ready;
@@ -179,6 +181,76 @@ async function pumpToWeb(readable, writable, finish, { end }) {
179181
}
180182
}
181183

184+
function getPipelineArgName(i, len) {
185+
if (i === 0) return 'source';
186+
if (i === len - 1) return 'destination';
187+
return `transform[${i - 1}]`;
188+
}
189+
190+
function isValidPipelineSource(stream) {
191+
return (
192+
isIterable(stream) ||
193+
isReadableNodeStream(stream) ||
194+
isReadableStream(stream) ||
195+
isTransformStream(stream)
196+
);
197+
}
198+
199+
function isValidPipelineTransform(stream) {
200+
return (
201+
isTransformStream(stream) ||
202+
(isNodeStream(stream) &&
203+
isReadableNodeStream(stream) &&
204+
isWritableNodeStream(stream))
205+
);
206+
}
207+
208+
function isValidPipelineDestination(stream) {
209+
return (
210+
isWritableNodeStream(stream) ||
211+
isWritableStream(stream) ||
212+
isTransformStream(stream)
213+
);
214+
}
215+
216+
function validatePipelineStream(stream, i, len) {
217+
if (typeof stream === 'function') {
218+
return;
219+
}
220+
221+
const name = getPipelineArgName(i, len);
222+
223+
if (i === 0) {
224+
if (!isValidPipelineSource(stream)) {
225+
throw new ERR_INVALID_ARG_TYPE(
226+
name,
227+
['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'],
228+
stream,
229+
);
230+
}
231+
return;
232+
}
233+
234+
if (i === len - 1) {
235+
if (!isValidPipelineDestination(stream)) {
236+
throw new ERR_INVALID_ARG_TYPE(
237+
name,
238+
['Writable', 'WritableStream', 'TransformStream'],
239+
stream,
240+
);
241+
}
242+
return;
243+
}
244+
245+
if (!isValidPipelineTransform(stream)) {
246+
throw new ERR_INVALID_ARG_TYPE(
247+
name,
248+
['Duplex', 'Transform', 'TransformStream'],
249+
stream,
250+
);
251+
}
252+
}
253+
182254
function pipeline(...streams) {
183255
return pipelineImpl(streams, once(popCallback(streams)));
184256
}
@@ -192,6 +264,10 @@ function pipelineImpl(streams, callback, opts) {
192264
throw new ERR_MISSING_ARGS('streams');
193265
}
194266

267+
for (let i = 0; i < streams.length; i++) {
268+
validatePipelineStream(streams[i], i, streams.length);
269+
}
270+
195271
const ac = new AbortController();
196272
const signal = ac.signal;
197273
const outerSignal = opts?.signal;
@@ -298,10 +374,8 @@ function pipelineImpl(streams, callback, opts) {
298374
throw new ERR_INVALID_RETURN_VALUE(
299375
'Iterable, AsyncIterable or Stream', 'source', ret);
300376
}
301-
} else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
302-
ret = stream;
303377
} else {
304-
ret = Duplex.from(stream);
378+
ret = stream;
305379
}
306380
} else if (typeof stream === 'function') {
307381
if (isTransformStream(ret)) {
@@ -402,8 +476,6 @@ function pipelineImpl(streams, callback, opts) {
402476
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
403477
}
404478
ret = stream;
405-
} else {
406-
ret = Duplex.from(stream);
407479
}
408480
}
409481

0 commit comments

Comments
 (0)