From 4ca2ef8b7e537d4bc6580db88599963775736b97 Mon Sep 17 00:00:00 2001 From: Volodymyr Kolesnykov Date: Tue, 23 Jun 2026 01:45:30 +0300 Subject: [PATCH] fix(upload): set duplex for stream retry bodies --- __tests__/lib/fetch-with-retry-undici.js | 27 ++++++++++++++++++++++++ src/lib/client-file-uploader.ts | 15 ++++++++++--- 2 files changed, 39 insertions(+), 3 deletions(-) create mode 100644 __tests__/lib/fetch-with-retry-undici.js diff --git a/__tests__/lib/fetch-with-retry-undici.js b/__tests__/lib/fetch-with-retry-undici.js new file mode 100644 index 000000000..d8e413418 --- /dev/null +++ b/__tests__/lib/fetch-with-retry-undici.js @@ -0,0 +1,27 @@ +/** + * @format + */ + +import { Readable } from 'node:stream'; + +import { fetchWithRetry } from '../../src/lib/client-file-uploader'; +import { getUndiciMockPool, resetUndiciMockAgent } from '../../test-utils/undici-mock'; + +describe( 'fetchWithRetry() with real undici', () => { + afterEach( resetUndiciMockAgent ); + + it( 'should add duplex for stream bodies created per attempt', async () => { + const pool = getUndiciMockPool( 'https://upload.example.com' ); + pool.intercept( { method: 'PUT', path: '/upload' } ).reply( 200, 'ok' ); + + const response = await fetchWithRetry( + 'https://upload.example.com/upload', + { method: 'PUT' }, + 0, + () => Readable.from( [ 'hello' ] ) + ); + + expect( response.status ).toBe( 200 ); + await expect( response.text() ).resolves.toBe( 'ok' ); + } ); +} ); diff --git a/src/lib/client-file-uploader.ts b/src/lib/client-file-uploader.ts index bea93aff6..abda49cbd 100644 --- a/src/lib/client-file-uploader.ts +++ b/src/lib/client-file-uploader.ts @@ -34,6 +34,11 @@ export function parseEtagHeader( etag: string ): string { export type BodyFactory = () => RequestInit[ 'body' ]; +type RequestInitWithDuplex = RequestInit & { duplex?: 'half' }; + +const isStreamBody = ( body: RequestInit[ 'body' ] ): boolean => + typeof ( body as { pipe?: unknown } | null | undefined )?.pipe === 'function'; + /** * Wraps `fetch` with exponential-backoff retries. * @@ -54,13 +59,17 @@ export async function fetchWithRetry( retries = 3, createBody?: BodyFactory ): Promise< Response > { - const bodyIsStream = - typeof ( init.body as { pipe?: unknown } | null | undefined )?.pipe === 'function'; + const bodyIsStream = isStreamBody( init.body ); // Only retry when we can hand `fetch` a fresh, replayable body each attempt. const maxAttempts = createBody || ! bodyIsStream ? retries : 0; for ( let attempt = 0; attempt <= maxAttempts; attempt++ ) { - const requestInit = createBody ? { ...init, body: createBody() } : init; + const requestInit: RequestInitWithDuplex = createBody + ? { ...init, body: createBody() } + : { ...init }; + if ( isStreamBody( requestInit.body ) && ! requestInit.duplex ) { + requestInit.duplex = 'half'; + } try { // eslint-disable-next-line no-await-in-loop return await fetch( input, requestInit );