Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions __tests__/lib/client-file-uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
getFileMeta,
getPartBoundaries,
parseEtagHeader,
uploadImportFileToS3,
uploadParts,
} from '../../src/lib/client-file-uploader';

Expand Down Expand Up @@ -206,6 +207,102 @@ describe( 'client-file-uploader', () => {
await expect( settled ).resolves.toThrow( 'persistent failure' );
expect( fetch ).toHaveBeenCalledTimes( 3 ); // initial attempt + 2 retries
} );

it( 'should include the underlying fetch cause after exhausting retries', async () => {
jest.useFakeTimers();
const cause = new Error( 'write ECONNRESET' );
const err = new TypeError( 'fetch failed' );
err.cause = cause;
fetch.mockRejectedValue( err );

const settled = fetchWithRetry( 'https://example.com', { method: 'GET' }, 1 ).catch(
fetchErr => fetchErr
);
await jest.advanceTimersByTimeAsync( 1000 );

await expect( settled ).resolves.toThrow( 'fetch failed: write ECONNRESET' );
await expect( settled ).resolves.toHaveProperty( 'cause', err );
expect( fetch ).toHaveBeenCalledTimes( 2 ); // initial attempt + 1 retry
} );

it( 'should omit unsupported expect headers before sending fetch requests', async () => {
const response = { status: 200 };
fetch.mockResolvedValueOnce( response );

await expect(
fetchWithRetry( 'https://example.com', {
method: 'POST',
headers: { Expect: '100-continue', 'Content-Type': 'application/xml' },
body: '<xml />',
} )
).resolves.toBe( response );

expect( fetch ).toHaveBeenCalledTimes( 1 );
const headers = fetch.mock.calls[ 0 ][ 1 ].headers;
expect( headers.get( 'expect' ) ).toBeNull();
expect( headers.get( 'content-type' ) ).toBe( 'application/xml' );
} );
} );

describe( 'uploadImportFileToS3()', () => {
let tmpDir;

const presignedResponse = () => ( {
status: 200,
json: async () => ( {
url: 'https://s3.example.com/put-object',
options: { method: 'PUT', headers: {} },
} ),
} );

const writeTempFile = ( name, size ) => {
const fileName = path.join( tmpDir, name );
writeFileSync( fileName, Buffer.alloc( size, 'a' ) );
return fileName;
};

beforeAll( () => {
tmpDir = mkdtempSync( path.join( os.tmpdir(), 'vip-cli-upload-import-' ) );
} );

afterAll( () => {
rmSync( tmpDir, { recursive: true, force: true } );
} );

beforeEach( () => {
fetch.mockReset();
http.mockReset();
} );

it( 'should stream exactly Content-Length bytes for single-PUT uploads', async () => {
const fileSize = 256;
const fileName = writeTempFile( 'single-put.zip', fileSize );
const progress = [];
let uploadedBytes = 0;

http.mockResolvedValue( presignedResponse() );
fetch.mockImplementation( async ( _url, init ) => {
expect( init.headers.get( 'content-length' ) ).toBe( `${ fileSize }` );

for await ( const chunk of init.body ) {
uploadedBytes += chunk.length;
}

return { status: 200 };
} );

const result = await uploadImportFileToS3( {
app: { id: 1 },
env: { id: 2 },
fileMeta: { basename: 'single-put.zip', fileName, fileSize, isCompressed: true },
progressCallback: percentage => progress.push( percentage ),
} );

expect( result.result ).toBe( 'ok' );
expect( uploadedBytes ).toBe( fileSize );
expect( progress[ progress.length - 1 ] ).toBe( '100%' );
expect( fetch ).toHaveBeenCalledTimes( 1 );
} );
} );

describe( 'uploadParts()', () => {
Expand Down
59 changes: 49 additions & 10 deletions src/lib/client-file-uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import { clearInterval, setInterval } from 'node:timers';
import { setTimeout } from 'node:timers/promises';
import os from 'os';
import path from 'path';
import { PassThrough } from 'stream';
import { fetch, type HeadersInit, type RequestInit, type Response } from 'undici';
import { Transform } from 'stream';
import { fetch, Headers, type HeadersInit, type RequestInit, type Response } from 'undici';
import { Parser as XmlParser } from 'xml2js';
import { createGunzip, createGzip, Gunzip, ZlibOptions } from 'zlib';

Expand Down Expand Up @@ -39,6 +39,47 @@ type RequestInitWithDuplex = RequestInit & { duplex?: 'half' };
const isStreamBody = ( body: RequestInit[ 'body' ] ): boolean =>
typeof ( body as { pipe?: unknown } | null | undefined )?.pipe === 'function';

function createProgressTransform( onChunk: ( data: Buffer ) => void ): Transform {
return new Transform( {
transform( chunk: Buffer, _encoding, callback ) {
onChunk( chunk );
callback( null, chunk );
},
} );
}

function omitUnsupportedFetchHeaders( init: RequestInitWithDuplex ): RequestInitWithDuplex {
if ( ! init.headers ) {
return init;
}

const headers = new Headers( init.headers );
headers.delete( 'expect' );

return { ...init, headers };
}

function withCauseMessage( err: unknown ): unknown {
if ( ! ( err instanceof Error ) ) {
return err;
}

const cause = ( err as Error & { cause?: unknown } ).cause;
let causeMessage: string | undefined;

if ( cause instanceof Error ) {
causeMessage = cause.message;
} else if ( typeof cause === 'string' ) {
causeMessage = cause;
}

if ( ! causeMessage || err.message.includes( causeMessage ) ) {
return err;
}

return new Error( `${ err.message }: ${ causeMessage }`, { cause: err } );
}

/**
* Wraps `fetch` with exponential-backoff retries.
*
Expand Down Expand Up @@ -72,10 +113,10 @@ export async function fetchWithRetry(
}
try {
// eslint-disable-next-line no-await-in-loop
return await fetch( input, requestInit );
return await fetch( input, omitUnsupportedFetchHeaders( requestInit ) );
} catch ( err ) {
if ( attempt === maxAttempts ) {
throw err;
throw withCauseMessage( err );
}
// eslint-disable-next-line no-await-in-loop
await setTimeout( Math.pow( 2, attempt ) * 1000 ); // 1000, 2000, 4000
Expand Down Expand Up @@ -341,8 +382,7 @@ async function uploadUsingPutObject( {
}

let readBytes = 0;
const progressPassThrough = new PassThrough();
progressPassThrough.on( 'data', ( data: Buffer | string ) => {
const progressTransform = createProgressTransform( data => {
readBytes += data.length;
const percentage = `${ Math.floor( ( 100 * readBytes ) / fileSize ) }%`;
debug( percentage );
Expand All @@ -351,7 +391,7 @@ async function uploadUsingPutObject( {
}
} );

return createReadStream( fileName ).pipe( progressPassThrough );
return createReadStream( fileName ).pipe( progressTransform );
};

const response = await fetchWithRetry( presignedRequest.url, fetchOptions, 3, createBody );
Expand Down Expand Up @@ -641,13 +681,12 @@ export async function uploadParts( {
partBytesRead[ index ] = 0;
partPercentages[ index ] = 0;

const progressPassThrough = new PassThrough();
progressPassThrough.on( 'data', ( data: Buffer | string ) => {
const progressTransform = createProgressTransform( data => {
partBytesRead[ index ] += data.length;
partPercentages[ index ] = Math.floor( ( 100 * partBytesRead[ index ] ) / partSize );
} );

return createReadStream( fileMeta.fileName, { start, end } ).pipe( progressPassThrough );
return createReadStream( fileMeta.fileName, { start, end } ).pipe( progressTransform );
};

await readyForPartUpload();
Expand Down