From 779cdc108babdc2011dd7fd3438341c41853484e Mon Sep 17 00:00:00 2001 From: Michael Chan Date: Mon, 22 Jun 2026 13:25:04 +1000 Subject: [PATCH 1/2] Fix file upload retry reusing a consumed stream body `fetchWithRetry` retried by re-calling `fetch` with the same `init`. When the request body is a Node stream (as in `uploadUsingPutObject` and the multipart part upload), the stream is consumed on the first attempt. On a retry undici was handed an already-disturbed/locked stream and threw the misleading `Response body object should not be disturbed or locked`, masking the underlying network error. `fetchWithRetry` now accepts an optional body factory that produces a fresh body for every attempt, and `uploadUsingPutObject` passes one. When a stream body is supplied without a factory, the request is attempted only once so the real error surfaces instead of the confusing undici message. Co-Authored-By: Claude --- __tests__/lib/client-file-uploader.js | 88 +++++++++++++++++++++++++++ src/lib/client-file-uploader.ts | 70 +++++++++++++++------ 2 files changed, 139 insertions(+), 19 deletions(-) diff --git a/__tests__/lib/client-file-uploader.js b/__tests__/lib/client-file-uploader.js index e185573ec..0f46bc522 100644 --- a/__tests__/lib/client-file-uploader.js +++ b/__tests__/lib/client-file-uploader.js @@ -2,13 +2,22 @@ * @format */ +import { PassThrough } from 'stream'; +import { fetch } from 'undici'; + import { + fetchWithRetry, getFileHash, getFileMeta, getPartBoundaries, parseEtagHeader, } from '../../src/lib/client-file-uploader'; +jest.mock( 'undici', () => { + const actual = jest.requireActual( 'undici' ); + return { ...actual, fetch: jest.fn() }; +} ); + describe( 'client-file-uploader', () => { describe( 'getFileMeta()', () => { it( 'should get meta from a 67mb sql file', async () => { @@ -109,4 +118,83 @@ describe( 'client-file-uploader', () => { expect( parseEtagHeader( 'abc123' ) ).toBe( 'abc123' ); } ); } ); + + describe( 'fetchWithRetry()', () => { + beforeEach( () => { + fetch.mockReset(); + } ); + + afterEach( () => { + jest.useRealTimers(); + } ); + + it( 'should return the response without retrying on success', async () => { + const response = { status: 200 }; + fetch.mockResolvedValueOnce( response ); + + await expect( fetchWithRetry( 'https://example.com', { method: 'PUT' } ) ).resolves.toBe( + response + ); + expect( fetch ).toHaveBeenCalledTimes( 1 ); + } ); + + it( 'should recreate the body for each attempt when a factory is provided', async () => { + jest.useFakeTimers(); + const response = { status: 200 }; + fetch.mockRejectedValueOnce( new Error( 'ECONNRESET' ) ).mockResolvedValueOnce( response ); + + let calls = 0; + const createBody = jest.fn( () => `body-${ ++calls }` ); + + const promise = fetchWithRetry( 'https://example.com', { method: 'PUT' }, 3, createBody ); + await jest.advanceTimersByTimeAsync( 1000 ); + + await expect( promise ).resolves.toBe( response ); + expect( fetch ).toHaveBeenCalledTimes( 2 ); + expect( createBody ).toHaveBeenCalledTimes( 2 ); + // Each attempt must receive a fresh body, never a reused/consumed one. + expect( fetch.mock.calls[ 0 ][ 1 ].body ).toBe( 'body-1' ); + expect( fetch.mock.calls[ 1 ][ 1 ].body ).toBe( 'body-2' ); + } ); + + it( 'should not retry a one-shot stream body without a factory', async () => { + const err = new Error( 'socket hang up' ); + fetch.mockRejectedValue( err ); + + // A stream body can only be consumed once; retrying it would throw the + // misleading "Response body object should not be disturbed or locked". + await expect( + fetchWithRetry( 'https://example.com', { method: 'PUT', body: new PassThrough() } ) + ).rejects.toThrow( 'socket hang up' ); + expect( fetch ).toHaveBeenCalledTimes( 1 ); + } ); + + it( 'should retry a non-stream body using the same init', async () => { + jest.useFakeTimers(); + const response = { status: 200 }; + fetch.mockRejectedValueOnce( new Error( 'flaky' ) ).mockResolvedValueOnce( response ); + + const promise = fetchWithRetry( 'https://example.com', { + method: 'PUT', + body: 'plain-string', + } ); + await jest.advanceTimersByTimeAsync( 1000 ); + + await expect( promise ).resolves.toBe( response ); + expect( fetch ).toHaveBeenCalledTimes( 2 ); + } ); + + it( 'should throw the last error after exhausting retries', async () => { + jest.useFakeTimers(); + fetch.mockRejectedValue( new Error( 'persistent failure' ) ); + + const settled = fetchWithRetry( 'https://example.com', { method: 'GET' }, 2 ).catch( + err => err + ); + await jest.advanceTimersByTimeAsync( 1000 + 2000 ); + + await expect( settled ).resolves.toThrow( 'persistent failure' ); + expect( fetch ).toHaveBeenCalledTimes( 3 ); // initial attempt + 2 retries + } ); + } ); } ); diff --git a/src/lib/client-file-uploader.ts b/src/lib/client-file-uploader.ts index ca1f87cb5..5fee328ac 100644 --- a/src/lib/client-file-uploader.ts +++ b/src/lib/client-file-uploader.ts @@ -31,17 +31,40 @@ export function parseEtagHeader( etag: string ): string { return normalizedEtag.replace( /^"(.*)"$/u, '$1' ); } -async function fetchWithRetry( +type BodyFactory = () => RequestInit[ 'body' ]; + +/** + * Wraps `fetch` with exponential-backoff retries. + * + * A request body that is a Node stream can only be consumed once. If the body + * is a stream and we retried with the same `init`, the second attempt would + * pass an already-consumed (disturbed/locked) stream to `fetch`, which throws + * the misleading `Response body object should not be disturbed or locked` error + * and masks the original network failure. + * + * To retry safely, callers that send a stream body should pass `createBody`, + * which is invoked to produce a fresh body for every attempt. If a stream body + * is supplied without a `createBody` factory, the request is attempted only + * once so the underlying error surfaces instead of the misleading undici one. + */ +export async function fetchWithRetry( input: string | URL, - init?: RequestInit, - retries = 3 + init: RequestInit = {}, + retries = 3, + createBody?: BodyFactory ): Promise< Response > { - for ( let attempt = 0; attempt <= retries; attempt++ ) { + const bodyIsStream = + typeof ( init.body as { pipe?: unknown } | null | undefined )?.pipe === 'function'; + // Only retry when we can hand `fetch` a fresh, replayable body each attempt. + const maxAttempts = createBody || ! bodyIsStream ? retries : 0; + + for ( let attempt = 0; attempt <= maxAttempts; attempt++ ) { + const requestInit = createBody ? { ...init, body: createBody() } : init; try { // eslint-disable-next-line no-await-in-loop - return await fetch( input, init ); + return await fetch( input, requestInit ); } catch ( err ) { - if ( attempt === retries ) { + if ( attempt === maxAttempts ) { throw err; } // eslint-disable-next-line no-await-in-loop @@ -298,21 +321,30 @@ async function uploadUsingPutObject( { 'Content-Length': `${ fileSize }`, // This has to be a string }; - let readBytes = 0; - const progressPassThrough = new PassThrough(); - progressPassThrough.on( 'data', ( data: Buffer | string ) => { - readBytes += data.length; - const percentage = `${ Math.floor( ( 100 * readBytes ) / fileSize ) }%`; - debug( percentage ); - if ( typeof progressCallback === 'function' ) { - progressCallback( percentage ); + // Build a fresh request body for every attempt. The upload streams the file + // from disk, and a stream can only be consumed once; recreating it per + // attempt lets `fetchWithRetry` retry without reusing a consumed stream + // (which would otherwise throw `... should not be disturbed or locked`). + const createBody = (): RequestInit[ 'body' ] => { + if ( fileContent ) { + return fileContent; } - } ); - const response = await fetchWithRetry( presignedRequest.url, { - ...fetchOptions, - body: fileContent ?? createReadStream( fileName ).pipe( progressPassThrough ), - } ); + let readBytes = 0; + const progressPassThrough = new PassThrough(); + progressPassThrough.on( 'data', ( data: Buffer | string ) => { + readBytes += data.length; + const percentage = `${ Math.floor( ( 100 * readBytes ) / fileSize ) }%`; + debug( percentage ); + if ( typeof progressCallback === 'function' ) { + progressCallback( percentage ); + } + } ); + + return createReadStream( fileName ).pipe( progressPassThrough ); + }; + + const response = await fetchWithRetry( presignedRequest.url, fetchOptions, 3, createBody ); if ( response.status === 200 ) { return 'ok'; From aaeec79db003dd67839400a0181c2343993fd339 Mon Sep 17 00:00:00 2001 From: Volodymyr Kolesnykov Date: Mon, 22 Jun 2026 22:21:35 +0300 Subject: [PATCH 2/2] fix(upload): retry multipart part uploads Purpose and Context: Extend the disturbed/locked stream-body fix to the multipart upload path. `uploadPart` previously built its request body once (`createReadStream(...).pipe(progressPassThrough)`) and called `fetchWithRetry` without a `createBody` factory, so multipart part uploads were attempted only once and could not recover from a transient network failure. The reported customer archive exceeds MULTIPART_THRESHOLD, so it uses this path; without this change the PR only converts the misleading "should not be disturbed or locked" error into the real network error for that case, rather than restoring retry/recovery. Key Changes: - `uploadPart` now forwards a per-attempt `createBody` factory to `fetchWithRetry(url, opts, 3, createBody)`, mirroring the single-PUT `uploadUsingPutObject` path. - The per-part factory (built in `uploadParts`) recreates the ranged read stream and its progress PassThrough per attempt and resets that part's counters, so retries re-stream the part without reusing a consumed stream and without double-counting bytes. - Aggregate progress is now summed from a per-part `partBytesRead` array, keeping totals correct across concurrent parts and retries. - Export `BodyFactory` for the public `UploadPartArgs` type. - Import `setInterval`/`clearInterval` from `node:timers` instead of relying on test-sandbox globals; under Node 26 the jest environment does not expose them as globals, which broke the new tests. Impact and Considerations: Production behaviour is unchanged for successful uploads. The presigned request is still fetched once per part (outside `fetchWithRetry`), so the URL is reused across attempts, as on the single-PUT path. No config or migration changes are required. Testing and Validation: Added a `uploadParts()` test suite (mocking `undici` fetch and `../lib/api/http`): multi-part progress aggregation stays within bounds and finishes at 100%, and a transient ECONNRESET on the first attempt recovers on retry (fetch twice, http once) with final progress exactly 100%, proving the per-attempt counter reset prevents double-counting. Verified with check-types, eslint, babel build, and the `__tests__/lib` suite on Node 20, 22, 24, and 26. Refs: PLTFRM-2491 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- __tests__/lib/client-file-uploader.js | 135 ++++++++++++++++++++++++++ src/lib/client-file-uploader.ts | 53 ++++++---- 2 files changed, 169 insertions(+), 19 deletions(-) diff --git a/__tests__/lib/client-file-uploader.js b/__tests__/lib/client-file-uploader.js index 0f46bc522..76e149dab 100644 --- a/__tests__/lib/client-file-uploader.js +++ b/__tests__/lib/client-file-uploader.js @@ -2,15 +2,20 @@ * @format */ +import { mkdtempSync, rmSync, writeFileSync } from 'fs'; +import os from 'os'; +import path from 'path'; import { PassThrough } from 'stream'; import { fetch } from 'undici'; +import http from '../../src/lib/api/http'; import { fetchWithRetry, getFileHash, getFileMeta, getPartBoundaries, parseEtagHeader, + uploadParts, } from '../../src/lib/client-file-uploader'; jest.mock( 'undici', () => { @@ -18,6 +23,11 @@ jest.mock( 'undici', () => { return { ...actual, fetch: jest.fn() }; } ); +jest.mock( '../../src/lib/api/http', () => ( { + __esModule: true, + default: jest.fn(), +} ) ); + describe( 'client-file-uploader', () => { describe( 'getFileMeta()', () => { it( 'should get meta from a 67mb sql file', async () => { @@ -197,4 +207,129 @@ describe( 'client-file-uploader', () => { expect( fetch ).toHaveBeenCalledTimes( 3 ); // initial attempt + 2 retries } ); } ); + + describe( 'uploadParts()', () => { + let tmpDir; + + const presignedResponse = () => ( { + status: 200, + json: async () => ( { + url: 'https://s3.example.com/upload-part', + options: { method: 'PUT', headers: {} }, + } ), + } ); + + const uploadOkResponse = etag => ( { + status: 200, + headers: { get: header => ( header === 'etag' ? `"${ etag }"` : null ) }, + } ); + + // Real `fetch` consumes the request body; the mock must drain it so the + // progress PassThrough emits 'data' deterministically and the file stream + // closes (otherwise it would error after the temp dir is removed). + const drainBody = body => + new Promise( ( resolve, reject ) => { + if ( ! body || typeof body.resume !== 'function' ) { + resolve(); + return; + } + body.on( 'end', resolve ); + body.on( 'close', resolve ); + body.on( 'error', reject ); + body.resume(); + } ); + + const writeTempFile = ( name, size ) => { + const fileName = path.join( tmpDir, name ); + writeFileSync( fileName, Buffer.alloc( size, 'a' ) ); + return fileName; + }; + + beforeAll( () => { + tmpDir = mkdtempSync( path.join( os.tmpdir(), 'vip-cli-upload-parts-' ) ); + } ); + + afterAll( () => { + rmSync( tmpDir, { recursive: true, force: true } ); + } ); + + beforeEach( () => { + fetch.mockReset(); + http.mockReset(); + } ); + + it( 'should upload every part and aggregate progress without exceeding 100%', async () => { + const fileSize = 200; + const fileName = writeTempFile( 'two-parts.bin', fileSize ); + const parts = [ + { start: 0, end: 99, index: 0, partSize: 100 }, + { start: 100, end: 199, index: 1, partSize: 100 }, + ]; + + http.mockResolvedValue( presignedResponse() ); + let etagCounter = 0; + fetch.mockImplementation( async ( _url, init ) => { + await drainBody( init.body ); + return uploadOkResponse( `etag-${ etagCounter++ }` ); + } ); + + const progress = []; + const result = await uploadParts( { + app: { id: 1 }, + env: { id: 2 }, + fileMeta: { basename: 'two-parts.bin', fileName, fileSize, isCompressed: false }, + uploadId: 'upload-id', + parts, + progressCallback: percentage => progress.push( percentage ), + } ); + + expect( result ).toHaveLength( 2 ); + expect( result.map( part => part.PartNumber ).sort() ).toEqual( [ 1, 2 ] ); + expect( result.every( part => typeof part.ETag === 'string' ) ).toBe( true ); + expect( fetch ).toHaveBeenCalledTimes( 2 ); + // Every reported percentage must stay within bounds, and the upload must + // finish at exactly 100% (aggregated across both parts). + expect( progress.every( pct => parseInt( pct, 10 ) <= 100 ) ).toBe( true ); + expect( progress[ progress.length - 1 ] ).toBe( '100%' ); + } ); + + it( 'should retry a failed part without double-counting its progress', async () => { + const fileSize = 100; + const fileName = writeTempFile( 'one-part.bin', fileSize ); + const parts = [ { start: 0, end: 99, index: 0, partSize: 100 } ]; + + http.mockResolvedValue( presignedResponse() ); + fetch + .mockImplementationOnce( async ( _url, init ) => { + // Fail after the body has streamed, mirroring a mid-flight reset. + await drainBody( init.body ); + throw new Error( 'ECONNRESET' ); + } ) + .mockImplementationOnce( async ( _url, init ) => { + await drainBody( init.body ); + return uploadOkResponse( 'etag-retry' ); + } ); + + const progress = []; + const result = await uploadParts( { + app: { id: 1 }, + env: { id: 2 }, + fileMeta: { basename: 'one-part.bin', fileName, fileSize, isCompressed: false }, + uploadId: 'upload-id', + parts, + progressCallback: percentage => progress.push( percentage ), + } ); + + expect( result ).toHaveLength( 1 ); + expect( result[ 0 ].ETag ).toBe( 'etag-retry' ); + // Retried via fetchWithRetry: two fetch attempts... + expect( fetch ).toHaveBeenCalledTimes( 2 ); + // ...but the presigned request is fetched only once per part. + expect( http ).toHaveBeenCalledTimes( 1 ); + // The first (failed) attempt still streamed the part's bytes; the retry + // must reset this part's counter so progress never exceeds 100%. + expect( progress.every( pct => parseInt( pct, 10 ) <= 100 ) ).toBe( true ); + expect( progress[ progress.length - 1 ] ).toBe( '100%' ); + }, 15000 ); + } ); } ); diff --git a/src/lib/client-file-uploader.ts b/src/lib/client-file-uploader.ts index 5fee328ac..bea93aff6 100644 --- a/src/lib/client-file-uploader.ts +++ b/src/lib/client-file-uploader.ts @@ -4,6 +4,7 @@ import debugLib from 'debug'; import { constants, createReadStream, createWriteStream, type ReadStream } from 'fs'; import { access, mkdtemp, open, stat } from 'node:fs/promises'; import { pipeline } from 'node:stream/promises'; +import { clearInterval, setInterval } from 'node:timers'; import { setTimeout } from 'node:timers/promises'; import os from 'os'; import path from 'path'; @@ -31,7 +32,7 @@ export function parseEtagHeader( etag: string ): string { return normalizedEtag.replace( /^"(.*)"$/u, '$1' ); } -type BodyFactory = () => RequestInit[ 'body' ]; +export type BodyFactory = () => RequestInit[ 'body' ]; /** * Wraps `fetch` with exponential-backoff retries. @@ -580,8 +581,8 @@ export async function uploadParts( { progressCallback, }: UploadPartsArgs ) { let uploadsInProgress = 0; - let totalBytesRead = 0; const partPercentages = new Array< number >( parts.length ).fill( 0 ); + const partBytesRead = new Array< number >( parts.length ).fill( 0 ); const readyForPartUpload = () => new Promise< void >( resolve => { @@ -595,6 +596,7 @@ export async function uploadParts( { } ); const updateProgress = () => { + const totalBytesRead = partBytesRead.reduce( ( sum, bytes ) => sum + bytes, 0 ); const percentage = `${ Math.floor( ( 100 * totalBytesRead ) / fileMeta.fileSize ) }%`; if ( typeof progressCallback === 'function' ) { @@ -619,15 +621,25 @@ export async function uploadParts( { const allDone = await Promise.all( parts.map( async part => { - const { index, partSize } = part; - const progressPassThrough = new PassThrough(); - - let partBytesRead = 0; - progressPassThrough.on( 'data', ( data: Buffer | string ) => { - totalBytesRead += data.length; - partBytesRead += data.length; - partPercentages[ index ] = Math.floor( ( 100 * partBytesRead ) / partSize ); - } ); + const { index, partSize, start, end } = part; + + // Build a fresh request body for every attempt. A stream can only be + // consumed once, so recreating the ranged read stream (and its progress + // PassThrough) per attempt lets `fetchWithRetry` retry without reusing a + // disturbed/locked stream. Reset this part's progress counters so a retry + // re-streams the part without double-counting bytes. + const createBody = (): RequestInit[ 'body' ] => { + partBytesRead[ index ] = 0; + partPercentages[ index ] = 0; + + const progressPassThrough = new PassThrough(); + progressPassThrough.on( 'data', ( data: Buffer | string ) => { + partBytesRead[ index ] += data.length; + partPercentages[ index ] = Math.floor( ( 100 * partBytesRead[ index ] ) / partSize ); + } ); + + return createReadStream( fileMeta.fileName, { start, end } ).pipe( progressPassThrough ); + }; await readyForPartUpload(); @@ -636,7 +648,7 @@ export async function uploadParts( { env, fileMeta, part, - progressPassThrough, + createBody, uploadId, } ); @@ -657,18 +669,18 @@ export interface UploadPartArgs { env: WithId; fileMeta: FileMeta; part: Part; - progressPassThrough: PassThrough; + createBody: BodyFactory; uploadId: string; } async function uploadPart( { app, env, - fileMeta: { basename, fileName }, + fileMeta: { basename }, part, - progressPassThrough, + createBody, uploadId, }: UploadPartArgs ) { - const { end, index, partSize, start } = part; + const { index, partSize } = part; const s3PartNumber = index + 1; // S3 multipart is indexed from 1 // TODO: handle failures / retries, etc. @@ -694,9 +706,12 @@ async function uploadPart( { */ }; - fetchOptions.body = createReadStream( fileName, { start, end } ).pipe( progressPassThrough ); - - const fetchResponse = await fetchWithRetry( partUploadRequestData.url, fetchOptions ); + const fetchResponse = await fetchWithRetry( + partUploadRequestData.url, + fetchOptions, + 3, + createBody + ); if ( fetchResponse.status === 200 ) { const etag = fetchResponse.headers.get( 'etag' );