From 76544d0d9bd6b0e84b167dd38c7f3b672f15e39c Mon Sep 17 00:00:00 2001 From: Parth Bansal Date: Thu, 16 Apr 2026 13:16:38 +0000 Subject: [PATCH] update --- package-lock.json | 15 + packages/databricks/src/transport/http.ts | 19 +- ...-a-ReadableStream-body-to-the-server-1.png | Bin 0 -> 2900 bytes ...reams-a-large-body-without-buffering-1.png | Bin 0 -> 2900 bytes .../tests/transport/http.server.test.ts | 232 ++++++++++++ .../databricks/tests/transport/http.test.ts | 266 ++++++++++++-- .../tests/transport/test-server.global.ts | 220 +++++++++++ packages/databricks/vitest.config.browser.ts | 7 + packages/databricks/vitest.config.ts | 8 + packages/files/package.json | 35 ++ packages/files/src/v1/client.ts | 135 +++++++ packages/files/src/v1/index.ts | 3 + packages/files/src/v1/model.ts | 39 ++ packages/files/src/v1/utils.ts | 96 +++++ packages/files/tests/tsconfig.json | 9 + packages/files/tests/v1/client.test.ts | 341 ++++++++++++++++++ packages/files/tests/v1/e2e.test.ts | 282 +++++++++++++++ packages/files/tests/v1/utils.test.ts | 88 +++++ packages/files/tsconfig.json | 9 + packages/files/vitest.config.browser.ts | 14 + 20 files changed, 1789 insertions(+), 29 deletions(-) create mode 100644 packages/databricks/tests/transport/__screenshots__/http.server.test.ts/newFetchHttpClient--server--streams-a-ReadableStream-body-to-the-server-1.png create mode 100644 packages/databricks/tests/transport/__screenshots__/http.server.test.ts/newFetchHttpClient--server--streams-a-large-body-without-buffering-1.png create mode 100644 packages/databricks/tests/transport/http.server.test.ts create mode 100644 packages/databricks/tests/transport/test-server.global.ts create mode 100644 packages/databricks/vitest.config.ts create mode 100644 packages/files/package.json create mode 100644 packages/files/src/v1/client.ts create mode 100644 packages/files/src/v1/index.ts create mode 100644 packages/files/src/v1/model.ts create mode 100644 packages/files/src/v1/utils.ts create mode 100644 packages/files/tests/tsconfig.json create mode 100644 packages/files/tests/v1/client.test.ts create mode 100644 packages/files/tests/v1/e2e.test.ts create mode 100644 packages/files/tests/v1/utils.test.ts create mode 100644 packages/files/tsconfig.json create mode 100644 packages/files/vitest.config.browser.ts diff --git a/package-lock.json b/package-lock.json index 5d4c2588..3b3d41d2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -152,6 +152,10 @@ "resolved": "packages/featurestore", "link": true }, + "node_modules/@databricks/sdk-files": { + "resolved": "packages/files", + "link": true + }, "node_modules/@databricks/sdk-iam": { "resolved": "packages/iam", "link": true @@ -5263,6 +5267,17 @@ "node": ">=22.0.0" } }, + "packages/files": { + "name": "@databricks/sdk-files", + "version": "0.1.0", + "license": "Apache-2.0", + "dependencies": { + "@databricks/sdk-databricks": "*" + }, + "engines": { + "node": ">=22.0.0" + } + }, "packages/iam": { "name": "@databricks/sdk-iam", "version": "0.1.0", diff --git a/packages/databricks/src/transport/http.ts b/packages/databricks/src/transport/http.ts index 32e1df15..2e0d657c 100644 --- a/packages/databricks/src/transport/http.ts +++ b/packages/databricks/src/transport/http.ts @@ -14,7 +14,7 @@ export interface HttpRequest { headers: Headers; /** The request body. */ - body?: string | ArrayBuffer | Uint8Array | null; + body?: string | ArrayBuffer | Uint8Array | ReadableStream | null; /** An optional signal to abort the request. */ signal?: AbortSignal; @@ -46,12 +46,21 @@ export interface HttpClient { export function newFetchHttpClient(): HttpClient { return { async send(request: HttpRequest): Promise { - const response = await fetch(request.url, { + const init: RequestInit = { method: request.method, headers: request.headers, - ...(request.body !== undefined && {body: request.body}), - ...(request.signal !== undefined && {signal: request.signal}), - }); + }; + if (request.body !== undefined) { + init.body = request.body; + // The Fetch spec requires duplex: 'half' for streaming request bodies. + if (request.body instanceof ReadableStream) { + init.duplex = 'half'; + } + } + if (request.signal !== undefined) { + init.signal = request.signal; + } + const response = await fetch(request.url, init); return { statusCode: response.status, headers: response.headers, diff --git a/packages/databricks/tests/transport/__screenshots__/http.server.test.ts/newFetchHttpClient--server--streams-a-ReadableStream-body-to-the-server-1.png b/packages/databricks/tests/transport/__screenshots__/http.server.test.ts/newFetchHttpClient--server--streams-a-ReadableStream-body-to-the-server-1.png new file mode 100644 index 0000000000000000000000000000000000000000..f2d37de02daabb253a52321432654065d53c4601 GIT binary patch literal 2900 zcmeAS@N?(olHy`uVBq!ia0y~yV4TOmz}&#W1QcnFZJNcvz|HCD;uumf=gk#EK?Vj6 z21WMY;n(Eq1)BEmX3V(Bcp^|*V2PQ6N3O$!TMY_Z86C5jo3602tmNeg8kHCglF`I4 znhi!vhS4%{v^E&6AV-@9qm81W+)k8ZVEF&PCRvsd*fwSZHHw)T-mek(_v3c3A5etB M)78&qol`;+0J6uJ!2kdN literal 0 HcmV?d00001 diff --git a/packages/databricks/tests/transport/__screenshots__/http.server.test.ts/newFetchHttpClient--server--streams-a-large-body-without-buffering-1.png b/packages/databricks/tests/transport/__screenshots__/http.server.test.ts/newFetchHttpClient--server--streams-a-large-body-without-buffering-1.png new file mode 100644 index 0000000000000000000000000000000000000000..f2d37de02daabb253a52321432654065d53c4601 GIT binary patch literal 2900 zcmeAS@N?(olHy`uVBq!ia0y~yV4TOmz}&#W1QcnFZJNcvz|HCD;uumf=gk#EK?Vj6 z21WMY;n(Eq1)BEmX3V(Bcp^|*V2PQ6N3O$!TMY_Z86C5jo3602tmNeg8kHCglF`I4 znhi!vhS4%{v^E&6AV-@9qm81W+)k8ZVEF&PCRvsd*fwSZHHw)T-mek(_v3c3A5etB M)78&qol`;+0J6uJ!2kdN literal 0 HcmV?d00001 diff --git a/packages/databricks/tests/transport/http.server.test.ts b/packages/databricks/tests/transport/http.server.test.ts new file mode 100644 index 00000000..9b13d149 --- /dev/null +++ b/packages/databricks/tests/transport/http.server.test.ts @@ -0,0 +1,232 @@ +/** + * Tests for newFetchHttpClient against a local HTTPS/HTTP2 server. These + * verify actual fetch behavior (body types, streaming, headers, signals) + * without mocking. The server is started by test-server.global.ts; all + * tests run in both Node.js and the browser. + */ + +import {describe, expect, inject, it} from 'vitest'; + +import {newFetchHttpClient} from '../../src/transport/http'; + +declare module 'vitest' { + // eslint-disable-next-line @typescript-eslint/consistent-type-definitions + interface ProvidedContext { + baseUrl: string; + } +} + +/** Creates a ReadableStream that emits data in multiple chunks. */ +function multiChunkStream(chunks: Uint8Array[]): ReadableStream { + let index = 0; + return new ReadableStream({ + pull(controller): void { + if (index >= chunks.length) { + controller.close(); + return; + } + controller.enqueue(chunks[index]); + index++; + }, + }); +} + +/** Reads a ReadableStream into a single Uint8Array. */ +async function readAll(body: ReadableStream): Promise { + const reader = body.getReader(); + const chunks: Uint8Array[] = []; + for (;;) { + const {done, value} = await reader.read(); + if (done) break; + chunks.push(value); + } + const totalLength = chunks.reduce((acc, c) => acc + c.length, 0); + const result = new Uint8Array(totalLength); + let offset = 0; + for (const c of chunks) { + result.set(c, offset); + offset += c.length; + } + return result; +} + +describe('newFetchHttpClient (server)', () => { + const baseUrl = inject('baseUrl'); + + it('sends and receives a JSON body', async () => { + const requestBody = '{"key":"value"}'; + + const client = newFetchHttpClient(); + const resp = await client.send({ + url: `${baseUrl}/json`, + method: 'POST', + headers: new Headers({'Content-Type': 'application/json'}), + body: requestBody, + }); + + expect(resp.statusCode).toBe(200); + expect(resp.headers.get('content-type')).toBe('application/json'); + + const json = JSON.parse(await new Response(resp.body).text()) as Record< + string, + string + >; + expect(json.receivedMethod).toBe('POST'); + expect(json.receivedContentType).toBe('application/json'); + expect(json.receivedBody).toBe(requestBody); + }); + + it('sends a Uint8Array body', async () => { + const data = new Uint8Array([0x00, 0x01, 0x02, 0xff]); + + const client = newFetchHttpClient(); + const resp = await client.send({ + url: `${baseUrl}/bytes`, + method: 'PUT', + headers: new Headers({'Content-Type': 'application/octet-stream'}), + body: data, + }); + + expect(resp.statusCode).toBe(200); + + const json = JSON.parse(await new Response(resp.body).text()) as { + length: number; + bytes: number[]; + }; + expect(json.length).toBe(4); + expect(json.bytes).toEqual([0x00, 0x01, 0x02, 0xff]); + }); + + it('streams a ReadableStream body to the server', async () => { + const chunk1 = new TextEncoder().encode('hello '); + const chunk2 = new TextEncoder().encode('world'); + const stream = multiChunkStream([chunk1, chunk2]); + + const client = newFetchHttpClient(); + const resp = await client.send({ + url: `${baseUrl}/stream-upload`, + method: 'PUT', + headers: new Headers({'Content-Type': 'application/octet-stream'}), + body: stream, + }); + + expect(resp.statusCode).toBe(200); + const text = await new Response(resp.body).text(); + expect(text).toBe('hello world'); + }); + + it('streams a large body without buffering', async () => { + // 2 MiB in 64 KiB chunks. + const chunkSize = 64 * 1024; + const chunkCount = 32; + const totalSize = chunkSize * chunkCount; + const chunks: Uint8Array[] = []; + for (let i = 0; i < chunkCount; i++) { + const chunk = new Uint8Array(chunkSize); + chunk.fill(i % 256); + chunks.push(chunk); + } + const stream = multiChunkStream(chunks); + + const client = newFetchHttpClient(); + const resp = await client.send({ + url: `${baseUrl}/stream-large`, + method: 'PUT', + headers: new Headers({'Content-Type': 'application/octet-stream'}), + body: stream, + }); + + expect(resp.statusCode).toBe(200); + expect(resp.headers.get('content-length')).toBe(String(totalSize)); + + expect(resp.body).not.toBeNull(); + const body = await readAll(resp.body); + expect(body.length).toBe(totalSize); + + for (let i = 0; i < chunkCount; i++) { + const offset = i * chunkSize; + expect(body[offset]).toBe(i % 256); + } + }); + + it('receives a streaming response body', async () => { + const client = newFetchHttpClient(); + const resp = await client.send({ + url: `${baseUrl}/stream-download`, + method: 'GET', + headers: new Headers(), + }); + + expect(resp.statusCode).toBe(200); + expect(resp.headers.get('content-type')).toBe('text/plain'); + expect(resp.headers.get('x-custom')).toBe('test-value'); + + const text = await new Response(resp.body).text(); + expect(text).toBe('chunk1chunk2chunk3'); + }); + + it('sends no body for GET requests', async () => { + const client = newFetchHttpClient(); + const resp = await client.send({ + url: `${baseUrl}/no-body`, + method: 'GET', + headers: new Headers(), + }); + + expect(resp.statusCode).toBe(200); + + const json = JSON.parse(await new Response(resp.body).text()) as { + method: string; + bodyLength: number; + }; + expect(json.method).toBe('GET'); + expect(json.bodyLength).toBe(0); + }); + + it('passes request headers to the server', async () => { + const client = newFetchHttpClient(); + const resp = await client.send({ + url: `${baseUrl}/headers`, + method: 'GET', + headers: new Headers({ + Accept: 'application/octet-stream', + 'X-Custom-Header': 'my-value', + }), + }); + + const json = JSON.parse(await new Response(resp.body).text()) as Record< + string, + string + >; + expect(json.accept).toBe('application/octet-stream'); + expect(json.custom).toBe('my-value'); + }); + + it('returns error status codes without throwing', async () => { + const client = newFetchHttpClient(); + const resp = await client.send({ + url: `${baseUrl}/error`, + method: 'GET', + headers: new Headers(), + }); + + expect(resp.statusCode).toBe(500); + const text = await new Response(resp.body).text(); + expect(text).toBe('{"error":"internal"}'); + }); + + it('respects AbortSignal cancellation', async () => { + const client = newFetchHttpClient(); + const controller = new AbortController(); + controller.abort(); + + await expect( + client.send({ + url: `${baseUrl}/slow`, + method: 'GET', + headers: new Headers(), + signal: controller.signal, + }) + ).rejects.toThrow(); + }); +}); diff --git a/packages/databricks/tests/transport/http.test.ts b/packages/databricks/tests/transport/http.test.ts index 5bef6736..b6d083e7 100644 --- a/packages/databricks/tests/transport/http.test.ts +++ b/packages/databricks/tests/transport/http.test.ts @@ -2,43 +2,261 @@ import {describe, expect, it, vi} from 'vitest'; import type {Credentials, Header} from '@databricks/sdk-auth'; -import type {HttpClient} from '../../src/transport/http'; +import type {HttpClient, HttpRequest} from '../../src/transport/http'; import {newFetchHttpClient, newHttpClient} from '../../src/transport/http'; -describe('newFetchHttpClient', () => { - it('sends a request using fetch and returns the response', async () => { - const mockFetch = vi.fn().mockResolvedValue( - new Response('hello', { - status: 201, - headers: {'X-Request-Id': 'abc123'}, - }) - ); - vi.stubGlobal('fetch', mockFetch); +// Helper to build a ReadableStream from a string. +function streamFrom(text: string): ReadableStream { + const data = new TextEncoder().encode(text); + return new ReadableStream({ + start(controller): void { + controller.enqueue(data); + controller.close(); + }, + }); +} - try { - const client = newFetchHttpClient(); - const response = await client.send({ - url: 'https://example.com/api/resource', +describe('newFetchHttpClient', () => { + // Table-driven tests for the body / duplex / signal mapping. + const bodyCases: { + desc: string; + request: HttpRequest; + wantBodyType: + | 'string' + | 'Uint8Array' + | 'ArrayBuffer' + | 'ReadableStream' + | 'null' + | 'undefined'; + wantDuplex: 'half' | undefined; + wantSignal: boolean; + }[] = [ + { + desc: 'string body — no duplex', + request: { + url: 'https://example.com/api', method: 'POST', headers: new Headers({'Content-Type': 'application/json'}), body: '{"key":"value"}', - }); + }, + wantBodyType: 'string', + wantDuplex: undefined, + wantSignal: false, + }, + { + desc: 'Uint8Array body — no duplex', + request: { + url: 'https://example.com/api', + method: 'PUT', + headers: new Headers(), + body: new Uint8Array([1, 2, 3]), + }, + wantBodyType: 'Uint8Array', + wantDuplex: undefined, + wantSignal: false, + }, + { + desc: 'ArrayBuffer body — no duplex', + request: { + url: 'https://example.com/api', + method: 'PUT', + headers: new Headers(), + body: new ArrayBuffer(4), + }, + wantBodyType: 'ArrayBuffer', + wantDuplex: undefined, + wantSignal: false, + }, + { + desc: 'null body — no duplex', + request: { + url: 'https://example.com/api', + method: 'DELETE', + headers: new Headers(), + body: null, + }, + wantBodyType: 'null', + wantDuplex: undefined, + wantSignal: false, + }, + { + desc: 'undefined body — omitted from init', + request: { + url: 'https://example.com/api', + method: 'GET', + headers: new Headers(), + }, + wantBodyType: 'undefined', + wantDuplex: undefined, + wantSignal: false, + }, + { + desc: 'ReadableStream body — sets duplex to half', + request: { + url: 'https://example.com/upload', + method: 'PUT', + headers: new Headers({'Content-Type': 'application/octet-stream'}), + body: streamFrom('streamed data'), + }, + wantBodyType: 'ReadableStream', + wantDuplex: 'half', + wantSignal: false, + }, + { + desc: 'with AbortSignal — passes signal through', + request: { + url: 'https://example.com/api', + method: 'GET', + headers: new Headers(), + signal: AbortSignal.timeout(5000), + }, + wantBodyType: 'undefined', + wantDuplex: undefined, + wantSignal: true, + }, + { + desc: 'ReadableStream body with signal — sets both duplex and signal', + request: { + url: 'https://example.com/upload', + method: 'PUT', + headers: new Headers(), + body: streamFrom('data'), + signal: AbortSignal.timeout(5000), + }, + wantBodyType: 'ReadableStream', + wantDuplex: 'half', + wantSignal: true, + }, + ]; - expect(response.statusCode).toBe(201); - expect(response.headers.get('X-Request-Id')).toBe('abc123'); + it.each(bodyCases)('$desc', async tc => { + const mockFetch = vi + .fn() + .mockResolvedValue(new Response('ok', {status: 200})); + vi.stubGlobal('fetch', mockFetch); - // Read the body stream to verify content. - const text = await new Response(response.body).text(); - expect(text).toBe('hello'); + try { + const client = newFetchHttpClient(); + await client.send(tc.request); - // Verify fetch was called with the correct parameters. expect(mockFetch).toHaveBeenCalledOnce(); const call = mockFetch.mock.calls[0]; expect(call).toBeDefined(); const [url, init] = call; - expect(url).toBe('https://example.com/api/resource'); - expect(init?.method).toBe('POST'); - expect(init?.body).toBe('{"key":"value"}'); + expect(url).toBe(tc.request.url); + expect(init?.method).toBe(tc.request.method); + + // Verify body was passed through with the correct type. + const body = init?.body; + switch (tc.wantBodyType) { + case 'string': + expect(typeof body).toBe('string'); + expect(body).toBe(tc.request.body); + break; + case 'Uint8Array': + expect(body).toBeInstanceOf(Uint8Array); + expect(body).toBe(tc.request.body); + break; + case 'ArrayBuffer': + expect(body).toBeInstanceOf(ArrayBuffer); + expect(body).toBe(tc.request.body); + break; + case 'ReadableStream': + expect(body).toBeInstanceOf(ReadableStream); + expect(body).toBe(tc.request.body); + break; + case 'null': + expect(body).toBeNull(); + break; + case 'undefined': + expect(body).toBeUndefined(); + break; + } + + // Verify duplex. + expect(init?.duplex).toBe(tc.wantDuplex); + + // Verify signal. + if (tc.wantSignal) { + expect(init?.signal).toBeInstanceOf(AbortSignal); + } else { + expect(init?.signal).toBeUndefined(); + } + } finally { + vi.unstubAllGlobals(); + } + }); + + // Table-driven tests for response mapping. + const responseCases: { + desc: string; + fetchResponse: Response; + wantStatus: number; + wantHeader: {key: string; value: string} | undefined; + wantBody: string; + }[] = [ + { + desc: 'maps 200 with body and headers', + fetchResponse: new Response('hello', { + status: 200, + headers: {'X-Request-Id': 'abc123'}, + }), + wantStatus: 200, + wantHeader: {key: 'X-Request-Id', value: 'abc123'}, + wantBody: 'hello', + }, + { + desc: 'maps 201 created', + fetchResponse: new Response('created', {status: 201}), + wantStatus: 201, + wantHeader: undefined, + wantBody: 'created', + }, + { + desc: 'maps 204 no content', + fetchResponse: new Response(null, {status: 204}), + wantStatus: 204, + wantHeader: undefined, + wantBody: '', + }, + { + desc: 'maps 404 error', + fetchResponse: new Response('not found', {status: 404}), + wantStatus: 404, + wantHeader: undefined, + wantBody: 'not found', + }, + { + desc: 'maps 500 error', + fetchResponse: new Response('internal error', {status: 500}), + wantStatus: 500, + wantHeader: undefined, + wantBody: 'internal error', + }, + ]; + + it.each(responseCases)('$desc', async tc => { + const mockFetch = vi.fn().mockResolvedValue(tc.fetchResponse); + vi.stubGlobal('fetch', mockFetch); + + try { + const client = newFetchHttpClient(); + const response = await client.send({ + url: 'https://example.com/api', + method: 'GET', + headers: new Headers(), + }); + + expect(response.statusCode).toBe(tc.wantStatus); + + if (tc.wantHeader !== undefined) { + expect(response.headers.get(tc.wantHeader.key)).toBe( + tc.wantHeader.value + ); + } + + const text = await new Response(response.body).text(); + expect(text).toBe(tc.wantBody); } finally { vi.unstubAllGlobals(); } diff --git a/packages/databricks/tests/transport/test-server.global.ts b/packages/databricks/tests/transport/test-server.global.ts new file mode 100644 index 00000000..08ce8c7f --- /dev/null +++ b/packages/databricks/tests/transport/test-server.global.ts @@ -0,0 +1,220 @@ +/** + * Vitest globalSetup that starts a local HTTPS/HTTP2 server for transport + * tests. Uses a self-signed certificate so that streaming request bodies + * (ReadableStream) work in Chromium, which requires HTTP/2. The browser + * config passes --ignore-certificate-errors to Chromium to accept the + * self-signed cert. The server URL is provided to tests via + * inject('baseUrl'). + */ + +import {execSync} from 'node:child_process'; +import {mkdtempSync, readFileSync, rmSync} from 'node:fs'; +import type { + Http2SecureServer, + Http2ServerRequest, + Http2ServerResponse, +} from 'node:http2'; +import {createSecureServer} from 'node:http2'; +import {tmpdir} from 'node:os'; +import {join} from 'node:path'; +import type {GlobalSetupContext} from 'vitest/node'; + +/** Collects the full request body into a Uint8Array. */ +async function collectBody(req: Http2ServerRequest): Promise { + const chunks: Buffer[] = []; + for await (const chunk of req) { + chunks.push(chunk as Buffer); + } + return Buffer.concat(chunks); +} + +/** Adds CORS headers so browser-based tests can reach the server. */ +function setCors(res: Http2ServerResponse): void { + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', '*'); + res.setHeader('Access-Control-Allow-Headers', '*'); + res.setHeader('Access-Control-Expose-Headers', '*'); +} + +/** Generates a self-signed certificate in a temp directory. */ +function generateCert(): {key: Buffer; cert: Buffer; tmpDir: string} { + const tmpDir = mkdtempSync(join(tmpdir(), 'test-server-')); + const keyPath = join(tmpDir, 'key.pem'); + const certPath = join(tmpDir, 'cert.pem'); + execSync( + `openssl req -x509 -newkey ec -pkeyopt ec_paramgen_curve:prime256v1 ` + + `-keyout ${keyPath} -out ${certPath} -days 1 -nodes ` + + `-subj "/CN=localhost" -addext "subjectAltName=IP:127.0.0.1" 2>/dev/null` + ); + return { + key: readFileSync(keyPath), + cert: readFileSync(certPath), + tmpDir, + }; +} + +let server: Http2SecureServer | undefined; +let certTmpDir: string | undefined; + +export async function setup({provide}: GlobalSetupContext): Promise { + // Allow Node.js fetch to connect to the self-signed certificate. + process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'; + + const {key, cert, tmpDir} = generateCert(); + certTmpDir = tmpDir; + + const srv = createSecureServer( + {key, cert, allowHTTP1: true}, + (req: Http2ServerRequest, res: Http2ServerResponse) => { + setCors(res); + + if (req.method === 'OPTIONS') { + res.writeHead(204); + res.end(); + return; + } + + void handleRoute(req.url, req, res); + } + ); + server = srv; + + await new Promise(resolve => { + srv.listen(0, '127.0.0.1', () => { + resolve(); + }); + }); + + const addr = srv.address(); + if (addr === null || typeof addr === 'string') { + throw new Error('Failed to get server address.'); + } + const baseUrl = `https://127.0.0.1:${String(addr.port)}`; + provide('baseUrl', baseUrl); +} + +export async function teardown(): Promise { + const srv = server; + if (srv !== undefined) { + await new Promise((resolve, reject) => { + srv.close(err => { + if (err !== undefined) { + reject(err); + } else { + resolve(); + } + }); + }); + } + if (certTmpDir !== undefined) { + rmSync(certTmpDir, {recursive: true, force: true}); + } +} + +/** + * Routes requests to the appropriate handler. Each path corresponds to a + * test case. + */ +async function handleRoute( + url: string, + req: Http2ServerRequest, + res: Http2ServerResponse +): Promise { + switch (url) { + case '/json': { + const body = await collectBody(req); + res.writeHead(200, {'Content-Type': 'application/json'}); + res.end( + JSON.stringify({ + receivedMethod: req.method, + receivedContentType: req.headers['content-type'], + receivedBody: Buffer.from(body).toString(), + }) + ); + return; + } + + case '/bytes': { + const body = await collectBody(req); + res.writeHead(200, {'Content-Type': 'application/json'}); + res.end( + JSON.stringify({ + length: body.length, + bytes: Array.from(body), + }) + ); + return; + } + + case '/stream-upload': { + const body = await collectBody(req); + res.writeHead(200, {'Content-Type': 'text/plain'}); + res.end(Buffer.from(body).toString()); + return; + } + + case '/stream-large': { + const body = await collectBody(req); + res.writeHead(200, { + 'Content-Type': 'application/octet-stream', + 'Content-Length': String(body.length), + }); + res.end(body); + return; + } + + case '/stream-download': { + res.writeHead(200, { + 'Content-Type': 'text/plain', + 'X-Custom': 'test-value', + }); + res.write('chunk1'); + res.write('chunk2'); + res.write('chunk3'); + res.end(); + return; + } + + case '/no-body': { + const body = await collectBody(req); + res.writeHead(200, {'Content-Type': 'application/json'}); + res.end( + JSON.stringify({ + method: req.method, + bodyLength: body.length, + }) + ); + return; + } + + case '/headers': { + res.writeHead(200, {'Content-Type': 'application/json'}); + res.end( + JSON.stringify({ + accept: req.headers.accept, + custom: req.headers['x-custom-header'], + }) + ); + return; + } + + case '/error': { + res.writeHead(500, {'Content-Type': 'application/json'}); + res.end('{"error":"internal"}'); + return; + } + + case '/slow': { + setTimeout(() => { + res.writeHead(200); + res.end('too late'); + }, 5000); + return; + } + + default: { + res.writeHead(404); + res.end('not found'); + } + } +} diff --git a/packages/databricks/vitest.config.browser.ts b/packages/databricks/vitest.config.browser.ts index 737359f4..45295971 100644 --- a/packages/databricks/vitest.config.browser.ts +++ b/packages/databricks/vitest.config.browser.ts @@ -7,7 +7,14 @@ export default defineConfig({ name: 'chromium', provider: 'playwright', headless: true, + providerOptions: { + launch: { + // Accept the self-signed cert from the HTTPS/HTTP2 test server. + args: ['--ignore-certificate-errors'], + }, + }, }, include: ['tests/**/*.test.ts'], + globalSetup: ['tests/transport/test-server.global.ts'], }, }); diff --git a/packages/databricks/vitest.config.ts b/packages/databricks/vitest.config.ts new file mode 100644 index 00000000..aa0be35f --- /dev/null +++ b/packages/databricks/vitest.config.ts @@ -0,0 +1,8 @@ +import {defineConfig} from 'vitest/config'; + +export default defineConfig({ + test: { + include: ['tests/**/*.test.ts'], + globalSetup: ['tests/transport/test-server.global.ts'], + }, +}); diff --git a/packages/files/package.json b/packages/files/package.json new file mode 100644 index 00000000..99c3ae7f --- /dev/null +++ b/packages/files/package.json @@ -0,0 +1,35 @@ +{ + "name": "@databricks/sdk-files", + "version": "0.1.0", + "description": "Databricks Files service client", + "type": "module", + "exports": { + "./v1": { + "types": "./dist/v1/index.d.ts", + "import": "./dist/v1/index.js" + } + }, + "files": [ + "dist", + "src" + ], + "scripts": { + "build": "tsc", + "lint": "eslint src tests --ext .ts", + "lint:fix": "eslint src tests --ext .ts --fix", + "format": "prettier --write \"src/**/*.ts\" \"tests/**/*.ts\"", + "format:check": "prettier --check \"src/**/*.ts\" \"tests/**/*.ts\"", + "test": "vitest run", + "test:browser": "vitest run --config vitest.config.browser.ts", + "typecheck": "tsc --noEmit", + "clean": "rm -rf dist" + }, + "author": "Databricks", + "license": "Apache-2.0", + "dependencies": { + "@databricks/sdk-databricks": "*" + }, + "engines": { + "node": ">=22.0.0" + } +} diff --git a/packages/files/src/v1/client.ts b/packages/files/src/v1/client.ts new file mode 100644 index 00000000..3a0bd57d --- /dev/null +++ b/packages/files/src/v1/client.ts @@ -0,0 +1,135 @@ +/** + * Files service client for the Databricks SDK. + */ + +import type {Call, Options} from '@databricks/sdk-databricks/api'; +import {execute} from '@databricks/sdk-databricks/api'; +import type {Logger} from '@databricks/sdk-databricks/logger'; +import {NoOpLogger} from '@databricks/sdk-databricks/logger'; +import type {ClientOptions} from '@databricks/sdk-databricks/options'; +import type { + HttpClient, + HttpRequest, +} from '@databricks/sdk-databricks/transport'; +import {newHttpClient} from '@databricks/sdk-databricks/transport'; + +import type {DownloadRequest, DownloadResponse, UploadRequest} from './model'; +import {encodeFilePath, sendAndCheckError} from './utils'; + +export class Client { + private readonly host: string; + private readonly httpClient: HttpClient; + private readonly logger: Logger; + + constructor(options: ClientOptions) { + if (options.host === undefined) { + throw new Error('Host is required.'); + } + this.host = options.host.replace(/\/$/, ''); + this.logger = options.logger ?? new NoOpLogger(); + this.httpClient = newHttpClient(options); + } + + /** + * Uploads a file to the specified path in the Databricks workspace. + * + * Because the request body is a ReadableStream which can only be consumed + * once, this method does not retry on failure. If the upload fails the + * caller must construct a new ReadableStream and call upload again. + */ + async upload( + signal: AbortSignal | undefined, + req: UploadRequest + ): Promise { + const encodedPath = encodeFilePath(req.filePath); + const url = new URL(`${this.host}/api/2.0/fs/files/${encodedPath}`); + if (req.overwrite === true) { + url.searchParams.set('overwrite', 'true'); + } + + const headers = new Headers(); + headers.set('Content-Type', 'application/octet-stream'); + + const httpReq: HttpRequest = { + url: url.toString(), + method: 'PUT', + headers, + body: req.contents, + }; + if (signal !== undefined) { + httpReq.signal = signal; + } + + await sendAndCheckError({ + request: httpReq, + httpClient: this.httpClient, + logger: this.logger, + }); + } + + /** + * Downloads a file from the specified path in the Databricks workspace. + * + * The response contains a ReadableStream with the file contents. The caller + * is responsible for consuming or cancelling the stream. + */ + async download( + signal: AbortSignal | undefined, + req: DownloadRequest, + options?: Options + ): Promise { + const encodedPath = encodeFilePath(req.filePath); + const url = `${this.host}/api/2.0/fs/files/${encodedPath}`; + + let result: DownloadResponse | undefined; + const call: Call = async (callSignal?: AbortSignal): Promise => { + const headers = new Headers(); + headers.set('Accept', 'application/octet-stream'); + + const httpReq: HttpRequest = { + url, + method: 'GET', + headers, + }; + if (callSignal !== undefined) { + httpReq.signal = callSignal; + } + + const httpResp = await sendAndCheckError({ + request: httpReq, + httpClient: this.httpClient, + logger: this.logger, + }); + + const contentLengthHeader = httpResp.headers.get('content-length'); + const contentTypeHeader = httpResp.headers.get('content-type'); + const lastModifiedHeader = httpResp.headers.get('last-modified'); + + result = { + contents: + httpResp.body ?? + new ReadableStream({ + start(controller): void { + controller.close(); + }, + }), + ...(contentLengthHeader !== null && { + contentLength: parseInt(contentLengthHeader, 10), + }), + ...(contentTypeHeader !== null && { + contentType: contentTypeHeader, + }), + ...(lastModifiedHeader !== null && { + lastModified: lastModifiedHeader, + }), + }; + }; + + await execute(signal, call, options); + + if (result === undefined) { + throw new Error('API call completed without a result.'); + } + return result; + } +} diff --git a/packages/files/src/v1/index.ts b/packages/files/src/v1/index.ts new file mode 100644 index 00000000..d1bc263b --- /dev/null +++ b/packages/files/src/v1/index.ts @@ -0,0 +1,3 @@ +export {Client} from './client'; + +export type {DownloadRequest, DownloadResponse, UploadRequest} from './model'; diff --git a/packages/files/src/v1/model.ts b/packages/files/src/v1/model.ts new file mode 100644 index 00000000..9be7f437 --- /dev/null +++ b/packages/files/src/v1/model.ts @@ -0,0 +1,39 @@ +/** + * Request and response types for the Databricks Files service. + */ + +/** Request to upload a file to the Databricks workspace. */ +export interface UploadRequest { + /** The absolute path of the file in the workspace. */ + filePath: string; + + /** The file contents as a readable stream of bytes. */ + contents: ReadableStream; + + /** When true, overwrites the file if it already exists. */ + overwrite?: boolean; +} + +/** Request to download a file from the Databricks workspace. */ +export interface DownloadRequest { + /** The absolute path of the file in the workspace. */ + filePath: string; +} + +/** Response from a file download operation. */ +export interface DownloadResponse { + /** + * The file contents as a readable stream of bytes. The caller is + * responsible for consuming or cancelling this stream. + */ + contents: ReadableStream; + + /** The size of the file in bytes, if provided by the server. */ + contentLength?: number; + + /** The MIME type of the file, if provided by the server. */ + contentType?: string; + + /** The last-modified timestamp, if provided by the server. */ + lastModified?: string; +} diff --git a/packages/files/src/v1/utils.ts b/packages/files/src/v1/utils.ts new file mode 100644 index 00000000..f8154e45 --- /dev/null +++ b/packages/files/src/v1/utils.ts @@ -0,0 +1,96 @@ +/** + * Internal utilities for the Files service client. + */ + +import {APIError} from '@databricks/sdk-databricks/apierror'; +import type {Logger} from '@databricks/sdk-databricks/logger'; +import type { + HttpClient, + HttpRequest, + HttpResponse, +} from '@databricks/sdk-databricks/transport'; + +export interface HttpCallOptions { + readonly request: HttpRequest; + readonly httpClient: HttpClient; + readonly logger: Logger; +} + +/** + * Reads a response body stream into a single Uint8Array. Only used for error + * responses where we need to buffer the JSON body to parse an APIError. + */ +export async function readAll( + body: ReadableStream | null +): Promise { + if (body === null) { + return new Uint8Array(0); + } + const reader = body.getReader(); + const chunks: Uint8Array[] = []; + for (;;) { + const {done, value} = await reader.read(); + if (done) { + break; + } + chunks.push(value); + } + const totalLength = chunks.reduce((acc, chunk) => acc + chunk.length, 0); + const result = new Uint8Array(totalLength); + let offset = 0; + for (const chunk of chunks) { + result.set(chunk, offset); + offset += chunk.length; + } + return result; +} + +/** + * Encodes a file path for use in the Files API URL. Each path segment is + * individually percent-encoded while preserving the "/" separators. + */ +export function encodeFilePath(filePath: string): string { + // Strip leading slash to avoid an empty first segment. + const stripped = filePath.startsWith('/') ? filePath.slice(1) : filePath; + return stripped + .split('/') + .map(segment => encodeURIComponent(segment)) + .join('/'); +} + +/** + * Sends an HTTP request and checks for API errors. On non-2xx responses the + * body is buffered and parsed into an APIError. On 2xx the raw HttpResponse + * is returned with the body stream untouched. + */ +export async function sendAndCheckError( + opts: HttpCallOptions +): Promise { + opts.logger.debug('HTTP request', { + method: opts.request.method, + url: opts.request.url, + }); + + let resp: HttpResponse; + try { + resp = await opts.httpClient.send(opts.request); + } catch (e: unknown) { + opts.logger.debug('HTTP request failed'); + throw e; + } + + opts.logger.debug('HTTP response', {statusCode: resp.statusCode}); + + // On error responses, buffer the body and throw an APIError. + if (resp.statusCode < 200 || resp.statusCode >= 300) { + const body = await readAll(resp.body); + const apiErr = APIError.fromHttpError(resp.statusCode, resp.headers, body); + if (apiErr !== undefined) { + throw apiErr; + } + // Fallback if fromHttpError returns undefined for an unknown status. + throw new Error(`unexpected HTTP status ${String(resp.statusCode)}`); + } + + return resp; +} diff --git a/packages/files/tests/tsconfig.json b/packages/files/tests/tsconfig.json new file mode 100644 index 00000000..211186ad --- /dev/null +++ b/packages/files/tests/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "noEmit": true, + "rootDir": ".." + }, + "include": [".", "../src"], + "exclude": ["../dist", "../node_modules"] +} diff --git a/packages/files/tests/v1/client.test.ts b/packages/files/tests/v1/client.test.ts new file mode 100644 index 00000000..f97b2164 --- /dev/null +++ b/packages/files/tests/v1/client.test.ts @@ -0,0 +1,341 @@ +import {describe, expect, it} from 'vitest'; + +import type { + HttpClient, + HttpRequest, + HttpResponse, +} from '@databricks/sdk-databricks/transport'; + +import {Client} from '../../src/v1/client'; +import {readAll} from '../../src/v1/utils'; + +/** Creates a ReadableStream from a string. */ +function streamFrom(text: string): ReadableStream { + const data = new TextEncoder().encode(text); + return new ReadableStream({ + start(controller): void { + controller.enqueue(data); + controller.close(); + }, + }); +} + +/** Creates a mock HttpClient that records the request and returns a canned response. */ +function mockClient(response: HttpResponse): { + client: HttpClient; + lastRequest: () => HttpRequest; +} { + let captured: HttpRequest | undefined; + const client: HttpClient = { + send(request: HttpRequest): Promise { + captured = request; + return Promise.resolve(response); + }, + }; + return { + client, + lastRequest(): HttpRequest { + if (captured === undefined) { + throw new Error('No request was captured.'); + } + return captured; + }, + }; +} + +/** Creates a mock HttpClient that returns a JSON error response. */ +function errorClient(statusCode: number, errorBody: object): HttpClient { + return { + send(): Promise { + const body = new TextEncoder().encode(JSON.stringify(errorBody)); + return Promise.resolve({ + statusCode, + headers: new Headers({'content-type': 'application/json'}), + body: new ReadableStream({ + start(controller): void { + controller.enqueue(body); + controller.close(); + }, + }), + }); + }, + }; +} + +describe('Client.upload', () => { + it('sends PUT to the correct URL with encoded path', async () => { + const {client, lastRequest} = mockClient({ + statusCode: 204, + headers: new Headers(), + body: null, + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + await files.upload(undefined, { + filePath: '/Volumes/catalog/schema/file.txt', + contents: streamFrom('data'), + }); + + const req = lastRequest(); + expect(req.method).toBe('PUT'); + expect(req.url).toBe( + 'https://example.com/api/2.0/fs/files/Volumes/catalog/schema/file.txt' + ); + expect(req.headers.get('content-type')).toBe('application/octet-stream'); + }); + + it('includes overwrite query parameter when set', async () => { + const {client, lastRequest} = mockClient({ + statusCode: 204, + headers: new Headers(), + body: null, + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + await files.upload(undefined, { + filePath: '/path/to/file.txt', + contents: streamFrom('data'), + overwrite: true, + }); + + const req = lastRequest(); + expect(req.url).toContain('overwrite=true'); + }); + + it('does not include overwrite query parameter when false', async () => { + const {client, lastRequest} = mockClient({ + statusCode: 204, + headers: new Headers(), + body: null, + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + await files.upload(undefined, { + filePath: '/path/to/file.txt', + contents: streamFrom('data'), + overwrite: false, + }); + + const req = lastRequest(); + expect(req.url).not.toContain('overwrite'); + }); + + it('passes the ReadableStream as the request body', async () => { + const {client, lastRequest} = mockClient({ + statusCode: 204, + headers: new Headers(), + body: null, + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + const contents = streamFrom('file contents'); + await files.upload(undefined, { + filePath: '/path/to/file.txt', + contents, + }); + + const req = lastRequest(); + expect(req.body).toBe(contents); + }); + + it('passes the AbortSignal through', async () => { + const {client, lastRequest} = mockClient({ + statusCode: 204, + headers: new Headers(), + body: null, + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + const controller = new AbortController(); + await files.upload(controller.signal, { + filePath: '/path/to/file.txt', + contents: streamFrom('data'), + }); + + const req = lastRequest(); + expect(req.signal).toBe(controller.signal); + }); + + it('throws APIError on error response', async () => { + const client = errorClient(404, { + error_code: 'NOT_FOUND', + message: 'File not found', + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + await expect( + files.upload(undefined, { + filePath: '/nonexistent/file.txt', + contents: streamFrom('data'), + }) + ).rejects.toThrow('File not found'); + }); + + it('encodes special characters in file path', async () => { + const {client, lastRequest} = mockClient({ + statusCode: 204, + headers: new Headers(), + body: null, + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + await files.upload(undefined, { + filePath: '/path/with spaces/file?.txt', + contents: streamFrom('data'), + }); + + const req = lastRequest(); + expect(req.url).toContain('path/with%20spaces/file%3F.txt'); + }); + + it('strips trailing slash from host', async () => { + const {client, lastRequest} = mockClient({ + statusCode: 204, + headers: new Headers(), + body: null, + }); + const files = new Client({ + host: 'https://example.com/', + httpClient: client, + }); + + await files.upload(undefined, { + filePath: '/file.txt', + contents: streamFrom('data'), + }); + + const req = lastRequest(); + expect(req.url).toMatch(/^https:\/\/example\.com\/api/); + }); +}); + +describe('Client.download', () => { + it('sends GET to the correct URL', async () => { + const {client, lastRequest} = mockClient({ + statusCode: 200, + headers: new Headers({ + 'content-type': 'application/octet-stream', + 'content-length': '11', + }), + body: streamFrom('hello world'), + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + const resp = await files.download(undefined, { + filePath: '/Volumes/catalog/schema/file.txt', + }); + + const req = lastRequest(); + expect(req.method).toBe('GET'); + expect(req.url).toBe( + 'https://example.com/api/2.0/fs/files/Volumes/catalog/schema/file.txt' + ); + expect(req.headers.get('accept')).toBe('application/octet-stream'); + + const body = await readAll(resp.contents); + expect(new TextDecoder().decode(body)).toBe('hello world'); + }); + + it('extracts response headers', async () => { + const {client} = mockClient({ + statusCode: 200, + headers: new Headers({ + 'content-type': 'text/plain', + 'content-length': '5', + 'last-modified': 'Tue, 01 Jan 2025 00:00:00 GMT', + }), + body: streamFrom('hello'), + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + const resp = await files.download(undefined, { + filePath: '/file.txt', + }); + + expect(resp.contentType).toBe('text/plain'); + expect(resp.contentLength).toBe(5); + expect(resp.lastModified).toBe('Tue, 01 Jan 2025 00:00:00 GMT'); + }); + + it('omits headers when not present', async () => { + const {client} = mockClient({ + statusCode: 200, + headers: new Headers(), + body: streamFrom('data'), + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + const resp = await files.download(undefined, { + filePath: '/file.txt', + }); + + expect(resp.contentType).toBeUndefined(); + expect(resp.contentLength).toBeUndefined(); + expect(resp.lastModified).toBeUndefined(); + }); + + it('throws APIError on error response', async () => { + const client = errorClient(404, { + error_code: 'NOT_FOUND', + message: 'File not found', + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + await expect( + files.download(undefined, {filePath: '/nonexistent.txt'}) + ).rejects.toThrow('File not found'); + }); + + it('handles null response body gracefully', async () => { + const {client} = mockClient({ + statusCode: 200, + headers: new Headers(), + body: null, + }); + const files = new Client({ + host: 'https://example.com', + httpClient: client, + }); + + const resp = await files.download(undefined, { + filePath: '/empty.txt', + }); + + // Should return an empty stream, not null. + const body = await readAll(resp.contents); + expect(body).toEqual(new Uint8Array(0)); + }); +}); diff --git a/packages/files/tests/v1/e2e.test.ts b/packages/files/tests/v1/e2e.test.ts new file mode 100644 index 00000000..a35f18ad --- /dev/null +++ b/packages/files/tests/v1/e2e.test.ts @@ -0,0 +1,282 @@ +/* eslint-disable no-console -- E2E test with intentional logging to show streaming behavior. */ +import {describe, expect, it, vi} from 'vitest'; +import {newPatCredentials} from '@databricks/sdk-auth'; + +import {Client} from '../../src/v1/client'; +import {readAll} from '../../src/v1/utils'; + +// Allow up to 30 minutes for the 4 GiB streaming test. +vi.setConfig({testTimeout: 30 * 60 * 1000}); + +const HOST = 'https://dbc-6b51e481-2d96.cloud.databricks.com/'; +const VOLUME_PATH = '/Volumes/test-parth-1/schema-parth-1/parth-test'; +const PAT_TOKEN: string | undefined = process.env.PATTOKEN; + +describe.skipIf(PAT_TOKEN === undefined || PAT_TOKEN === '')( + 'Files E2E', + () => { + function createClient(): Client { + if (PAT_TOKEN === undefined || PAT_TOKEN === '') { + throw new Error('PATTOKEN is not set.'); + } + return new Client({ + host: HOST, + credentials: newPatCredentials(PAT_TOKEN), + // Use console so HTTP request/response logs are visible. + logger: console, + }); + } + + /** Creates a ReadableStream from a string. */ + function streamFrom(text: string): ReadableStream { + const data = new TextEncoder().encode(text); + return new ReadableStream({ + start(controller): void { + controller.enqueue(data); + controller.close(); + }, + }); + } + + /** + * Creates a ReadableStream that produces data in multiple chunks to + * exercise real streaming behavior. Each chunk is 64 KiB and the + * pull callback logs every chunk so the streaming behavior is visible + * in test output. Returns the stream and the full expected content. + */ + function multiChunkStream(totalBytes: number): { + stream: ReadableStream; + expected: Uint8Array; + } { + const chunkSize = 64 * 1024; // 64 KiB per chunk. + const expected = new Uint8Array(totalBytes); + // Fill with a deterministic pattern so we can verify the download. + for (let i = 0; i < totalBytes; i++) { + expected[i] = i % 256; + } + + let offset = 0; + let chunkNumber = 0; + const totalChunks = Math.ceil(totalBytes / chunkSize); + const stream = new ReadableStream({ + pull(controller): void { + if (offset >= totalBytes) { + console.log( + `[stream] all ${String(totalChunks)} chunks enqueued, closing stream` + ); + controller.close(); + return; + } + const end = Math.min(offset + chunkSize, totalBytes); + chunkNumber++; + console.log( + `[stream] enqueue chunk ${String(chunkNumber)}/${String(totalChunks)}: bytes ${String(offset)}-${String(end - 1)} (${String(end - offset)} bytes)` + ); + controller.enqueue(expected.slice(offset, end)); + offset = end; + }, + }); + + return {stream, expected}; + } + + it('uploads and downloads a small file', async () => { + const client = createClient(); + const fileName = `e2e-test-${String(Date.now())}.txt`; + const filePath = `${VOLUME_PATH}/${fileName}`; + const content = `Hello from sdk-js e2e test at ${new Date().toISOString()}`; + + // Upload the file. + await client.upload(undefined, { + filePath, + contents: streamFrom(content), + }); + + // Download and verify the content. + const resp = await client.download(undefined, {filePath}); + + expect(resp.contentLength).toBe(new TextEncoder().encode(content).length); + expect(resp.contentType).toBeDefined(); + + const body = await readAll(resp.contents); + const downloaded = new TextDecoder().decode(body); + expect(downloaded).toBe(content); + }); + + it('uploads and downloads a 5 MiB file via streaming', async () => { + const client = createClient(); + const fileName = `e2e-stream-${String(Date.now())}.bin`; + const filePath = `${VOLUME_PATH}/${fileName}`; + const size = 5 * 1024 * 1024; // 5 MiB. + const {stream, expected} = multiChunkStream(size); + + console.log(`[e2e] uploading ${String(size)} bytes to ${filePath}`); + + // Upload using a multi-chunk stream. + await client.upload(undefined, { + filePath, + contents: stream, + }); + + console.log('[e2e] upload complete, downloading'); + + // Download and verify every byte matches. + const resp = await client.download(undefined, {filePath}); + expect(resp.contentLength).toBe(size); + + const body = await readAll(resp.contents); + expect(body.length).toBe(size); + expect(body).toEqual(expected); + + console.log('[e2e] download verified, all bytes match'); + }); + + /** + * Creates a ReadableStream that generates data on the fly without + * pre-allocating the full buffer. Each byte at position i has the + * value i % 256, so it can be verified without holding the whole + * file in memory. + */ + function largeStream( + totalBytes: number, + chunkSize: number + ): ReadableStream { + let offset = 0; + const totalChunks = Math.ceil(totalBytes / chunkSize); + const logInterval = Math.max(1, Math.floor(totalChunks / 20)); // Log ~20 times. + let chunkNumber = 0; + return new ReadableStream({ + pull(controller): void { + if (offset >= totalBytes) { + console.log( + `[stream] all ${String(totalChunks)} chunks enqueued, closing stream` + ); + controller.close(); + return; + } + const end = Math.min(offset + chunkSize, totalBytes); + const size = end - offset; + const chunk = new Uint8Array(size); + for (let i = 0; i < size; i++) { + chunk[i] = (offset + i) % 256; + } + chunkNumber++; + if (chunkNumber % logInterval === 0 || chunkNumber === 1) { + const mb = Math.round(offset / (1024 * 1024)); + console.log( + `[stream] chunk ${String(chunkNumber)}/${String(totalChunks)} (${String(mb)} MiB sent)` + ); + } + controller.enqueue(chunk); + offset = end; + }, + }); + } + + /** + * Reads a stream chunk-by-chunk and verifies each byte matches the + * expected pattern (position % 256) without buffering the whole file. + * Returns the total number of bytes read. + */ + async function verifyStreamContent( + stream: ReadableStream, + expectedTotalBytes: number + ): Promise { + const reader = stream.getReader(); + let globalOffset = 0; + const logInterval = 100 * 1024 * 1024; // Log every 100 MiB. + let nextLogAt = logInterval; + for (;;) { + const {done, value} = await reader.read(); + if (done) { + break; + } + for (let i = 0; i < value.length; i++) { + const expected = (globalOffset + i) % 256; + if (value[i] !== expected) { + throw new Error( + `byte mismatch at offset ${String(globalOffset + i)}: expected ${String(expected)}, got ${String(value[i])}` + ); + } + } + globalOffset += value.length; + if (globalOffset >= nextLogAt) { + const mb = Math.round(globalOffset / (1024 * 1024)); + console.log( + `[verify] ${String(mb)}/${String(Math.round(expectedTotalBytes / (1024 * 1024)))} MiB verified` + ); + nextLogAt += logInterval; + } + } + return globalOffset; + } + + it('uploads and downloads a 4 GiB file via streaming', async () => { + const client = createClient(); + const fileName = `e2e-4gib-${String(Date.now())}.bin`; + const filePath = `${VOLUME_PATH}/${fileName}`; + const size = 4 * 1024 * 1024 * 1024; // 4 GiB. + const chunkSize = 1024 * 1024; // 1 MiB chunks. + + console.log( + `[e2e] uploading ${String(size)} bytes (4 GiB) to ${filePath}` + ); + const uploadStart = Date.now(); + + // Upload using a streaming source that generates data on the fly. + await client.upload(undefined, { + filePath, + contents: largeStream(size, chunkSize), + }); + + const uploadMs = Date.now() - uploadStart; + console.log( + `[e2e] upload complete in ${String(Math.round(uploadMs / 1000))}s, downloading` + ); + + // Download and verify every byte on the fly without buffering. + const downloadStart = Date.now(); + const resp = await client.download(undefined, {filePath}); + expect(resp.contentLength).toBe(size); + + const totalRead = await verifyStreamContent(resp.contents, size); + expect(totalRead).toBe(size); + + const downloadMs = Date.now() - downloadStart; + console.log( + `[e2e] download verified in ${String(Math.round(downloadMs / 1000))}s, all ${String(size)} bytes match` + ); + }); + + it('uploads with overwrite', async () => { + const client = createClient(); + const fileName = `e2e-overwrite-${String(Date.now())}.txt`; + const filePath = `${VOLUME_PATH}/${fileName}`; + + // Upload the first version. + await client.upload(undefined, { + filePath, + contents: streamFrom('version 1'), + }); + + // Upload again with overwrite. + await client.upload(undefined, { + filePath, + contents: streamFrom('version 2'), + overwrite: true, + }); + + // Download and verify we got version 2. + const resp = await client.download(undefined, {filePath}); + const body = await readAll(resp.contents); + expect(new TextDecoder().decode(body)).toBe('version 2'); + }); + + it('download returns 404 for nonexistent file', async () => { + const client = createClient(); + const filePath = `${VOLUME_PATH}/nonexistent-${String(Date.now())}.txt`; + + await expect(client.download(undefined, {filePath})).rejects.toThrow(); + }); + } +); diff --git a/packages/files/tests/v1/utils.test.ts b/packages/files/tests/v1/utils.test.ts new file mode 100644 index 00000000..c0bc4b68 --- /dev/null +++ b/packages/files/tests/v1/utils.test.ts @@ -0,0 +1,88 @@ +import {describe, expect, it} from 'vitest'; + +import {encodeFilePath, readAll} from '../../src/v1/utils'; + +describe('encodeFilePath', () => { + it.each([ + { + name: 'normal path with leading slash', + input: '/Volumes/catalog/schema/file.txt', + expected: 'Volumes/catalog/schema/file.txt', + }, + { + name: 'path with spaces', + input: '/path/with spaces/file name.txt', + expected: 'path/with%20spaces/file%20name.txt', + }, + { + name: 'path with percent character', + input: '/path/special%chars/file', + expected: 'path/special%25chars/file', + }, + { + name: 'path with question mark', + input: '/path/to/file?.txt', + expected: 'path/to/file%3F.txt', + }, + { + name: 'path with hash', + input: '/path/to/file#1.txt', + expected: 'path/to/file%231.txt', + }, + { + name: 'path without leading slash', + input: 'no-leading-slash/file.txt', + expected: 'no-leading-slash/file.txt', + }, + { + name: 'single segment', + input: '/single', + expected: 'single', + }, + ])('$name', ({input, expected}) => { + expect(encodeFilePath(input)).toBe(expected); + }); +}); + +describe('readAll', () => { + it('returns empty Uint8Array for null body', async () => { + const result = await readAll(null); + expect(result).toEqual(new Uint8Array(0)); + }); + + it('reads a single chunk', async () => { + const data = new TextEncoder().encode('hello world'); + const stream = new ReadableStream({ + start(controller): void { + controller.enqueue(data); + controller.close(); + }, + }); + const result = await readAll(stream); + expect(result).toEqual(data); + }); + + it('concatenates multiple chunks', async () => { + const chunk1 = new TextEncoder().encode('hello '); + const chunk2 = new TextEncoder().encode('world'); + const stream = new ReadableStream({ + start(controller): void { + controller.enqueue(chunk1); + controller.enqueue(chunk2); + controller.close(); + }, + }); + const result = await readAll(stream); + expect(new TextDecoder().decode(result)).toBe('hello world'); + }); + + it('reads an empty stream', async () => { + const stream = new ReadableStream({ + start(controller): void { + controller.close(); + }, + }); + const result = await readAll(stream); + expect(result).toEqual(new Uint8Array(0)); + }); +}); diff --git a/packages/files/tsconfig.json b/packages/files/tsconfig.json new file mode 100644 index 00000000..9b8dd1b0 --- /dev/null +++ b/packages/files/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "./dist", + "rootDir": "./src" + }, + "include": ["src"], + "exclude": ["dist", "node_modules", "tests"] +} diff --git a/packages/files/vitest.config.browser.ts b/packages/files/vitest.config.browser.ts new file mode 100644 index 00000000..930f020f --- /dev/null +++ b/packages/files/vitest.config.browser.ts @@ -0,0 +1,14 @@ +import {defineConfig} from 'vitest/config'; + +export default defineConfig({ + test: { + browser: { + enabled: true, + name: 'chromium', + provider: 'playwright', + headless: true, + }, + include: ['tests/**/*.test.ts'], + exclude: ['tests/**/e2e.test.ts'], + }, +});