diff --git a/__tests__/lib/client-file-uploader.js b/__tests__/lib/client-file-uploader.js index 76e149dab..7ed4fad15 100644 --- a/__tests__/lib/client-file-uploader.js +++ b/__tests__/lib/client-file-uploader.js @@ -15,6 +15,7 @@ import { getFileMeta, getPartBoundaries, parseEtagHeader, + uploadImportFileToS3, uploadParts, } from '../../src/lib/client-file-uploader'; @@ -206,6 +207,102 @@ describe( 'client-file-uploader', () => { await expect( settled ).resolves.toThrow( 'persistent failure' ); expect( fetch ).toHaveBeenCalledTimes( 3 ); // initial attempt + 2 retries } ); + + it( 'should include the underlying fetch cause after exhausting retries', async () => { + jest.useFakeTimers(); + const cause = new Error( 'write ECONNRESET' ); + const err = new TypeError( 'fetch failed' ); + err.cause = cause; + fetch.mockRejectedValue( err ); + + const settled = fetchWithRetry( 'https://example.com', { method: 'GET' }, 1 ).catch( + fetchErr => fetchErr + ); + await jest.advanceTimersByTimeAsync( 1000 ); + + await expect( settled ).resolves.toThrow( 'fetch failed: write ECONNRESET' ); + await expect( settled ).resolves.toHaveProperty( 'cause', err ); + expect( fetch ).toHaveBeenCalledTimes( 2 ); // initial attempt + 1 retry + } ); + + it( 'should omit unsupported expect headers before sending fetch requests', async () => { + const response = { status: 200 }; + fetch.mockResolvedValueOnce( response ); + + await expect( + fetchWithRetry( 'https://example.com', { + method: 'POST', + headers: { Expect: '100-continue', 'Content-Type': 'application/xml' }, + body: '', + } ) + ).resolves.toBe( response ); + + expect( fetch ).toHaveBeenCalledTimes( 1 ); + const headers = fetch.mock.calls[ 0 ][ 1 ].headers; + expect( headers.get( 'expect' ) ).toBeNull(); + expect( headers.get( 'content-type' ) ).toBe( 'application/xml' ); + } ); + } ); + + describe( 'uploadImportFileToS3()', () => { + let tmpDir; + + const presignedResponse = () => ( { + status: 200, + json: async () => ( { + url: 'https://s3.example.com/put-object', + options: { method: 'PUT', headers: {} }, + } ), + } ); + + const writeTempFile = ( name, size ) => { + const fileName = path.join( tmpDir, name ); + writeFileSync( fileName, Buffer.alloc( size, 'a' ) ); + return fileName; + }; + + beforeAll( () => { + tmpDir = mkdtempSync( path.join( os.tmpdir(), 'vip-cli-upload-import-' ) ); + } ); + + afterAll( () => { + rmSync( tmpDir, { recursive: true, force: true } ); + } ); + + beforeEach( () => { + fetch.mockReset(); + http.mockReset(); + } ); + + it( 'should stream exactly Content-Length bytes for single-PUT uploads', async () => { + const fileSize = 256; + const fileName = writeTempFile( 'single-put.zip', fileSize ); + const progress = []; + let uploadedBytes = 0; + + http.mockResolvedValue( presignedResponse() ); + fetch.mockImplementation( async ( _url, init ) => { + expect( init.headers.get( 'content-length' ) ).toBe( `${ fileSize }` ); + + for await ( const chunk of init.body ) { + uploadedBytes += chunk.length; + } + + return { status: 200 }; + } ); + + const result = await uploadImportFileToS3( { + app: { id: 1 }, + env: { id: 2 }, + fileMeta: { basename: 'single-put.zip', fileName, fileSize, isCompressed: true }, + progressCallback: percentage => progress.push( percentage ), + } ); + + expect( result.result ).toBe( 'ok' ); + expect( uploadedBytes ).toBe( fileSize ); + expect( progress[ progress.length - 1 ] ).toBe( '100%' ); + expect( fetch ).toHaveBeenCalledTimes( 1 ); + } ); } ); describe( 'uploadParts()', () => { diff --git a/src/lib/client-file-uploader.ts b/src/lib/client-file-uploader.ts index abda49cbd..e7e8de376 100644 --- a/src/lib/client-file-uploader.ts +++ b/src/lib/client-file-uploader.ts @@ -8,8 +8,8 @@ import { clearInterval, setInterval } from 'node:timers'; import { setTimeout } from 'node:timers/promises'; import os from 'os'; import path from 'path'; -import { PassThrough } from 'stream'; -import { fetch, type HeadersInit, type RequestInit, type Response } from 'undici'; +import { Transform } from 'stream'; +import { fetch, Headers, type HeadersInit, type RequestInit, type Response } from 'undici'; import { Parser as XmlParser } from 'xml2js'; import { createGunzip, createGzip, Gunzip, ZlibOptions } from 'zlib'; @@ -39,6 +39,47 @@ type RequestInitWithDuplex = RequestInit & { duplex?: 'half' }; const isStreamBody = ( body: RequestInit[ 'body' ] ): boolean => typeof ( body as { pipe?: unknown } | null | undefined )?.pipe === 'function'; +function createProgressTransform( onChunk: ( data: Buffer ) => void ): Transform { + return new Transform( { + transform( chunk: Buffer, _encoding, callback ) { + onChunk( chunk ); + callback( null, chunk ); + }, + } ); +} + +function omitUnsupportedFetchHeaders( init: RequestInitWithDuplex ): RequestInitWithDuplex { + if ( ! init.headers ) { + return init; + } + + const headers = new Headers( init.headers ); + headers.delete( 'expect' ); + + return { ...init, headers }; +} + +function withCauseMessage( err: unknown ): unknown { + if ( ! ( err instanceof Error ) ) { + return err; + } + + const cause = ( err as Error & { cause?: unknown } ).cause; + let causeMessage: string | undefined; + + if ( cause instanceof Error ) { + causeMessage = cause.message; + } else if ( typeof cause === 'string' ) { + causeMessage = cause; + } + + if ( ! causeMessage || err.message.includes( causeMessage ) ) { + return err; + } + + return new Error( `${ err.message }: ${ causeMessage }`, { cause: err } ); +} + /** * Wraps `fetch` with exponential-backoff retries. * @@ -72,10 +113,10 @@ export async function fetchWithRetry( } try { // eslint-disable-next-line no-await-in-loop - return await fetch( input, requestInit ); + return await fetch( input, omitUnsupportedFetchHeaders( requestInit ) ); } catch ( err ) { if ( attempt === maxAttempts ) { - throw err; + throw withCauseMessage( err ); } // eslint-disable-next-line no-await-in-loop await setTimeout( Math.pow( 2, attempt ) * 1000 ); // 1000, 2000, 4000 @@ -341,8 +382,7 @@ async function uploadUsingPutObject( { } let readBytes = 0; - const progressPassThrough = new PassThrough(); - progressPassThrough.on( 'data', ( data: Buffer | string ) => { + const progressTransform = createProgressTransform( data => { readBytes += data.length; const percentage = `${ Math.floor( ( 100 * readBytes ) / fileSize ) }%`; debug( percentage ); @@ -351,7 +391,7 @@ async function uploadUsingPutObject( { } } ); - return createReadStream( fileName ).pipe( progressPassThrough ); + return createReadStream( fileName ).pipe( progressTransform ); }; const response = await fetchWithRetry( presignedRequest.url, fetchOptions, 3, createBody ); @@ -641,13 +681,12 @@ export async function uploadParts( { partBytesRead[ index ] = 0; partPercentages[ index ] = 0; - const progressPassThrough = new PassThrough(); - progressPassThrough.on( 'data', ( data: Buffer | string ) => { + const progressTransform = createProgressTransform( data => { partBytesRead[ index ] += data.length; partPercentages[ index ] = Math.floor( ( 100 * partBytesRead[ index ] ) / partSize ); } ); - return createReadStream( fileMeta.fileName, { start, end } ).pipe( progressPassThrough ); + return createReadStream( fileMeta.fileName, { start, end } ).pipe( progressTransform ); }; await readyForPartUpload();