diff --git a/README.md b/README.md index adba6d2..f5ff299 100644 --- a/README.md +++ b/README.md @@ -247,6 +247,21 @@ await api.ingest("events", { pathname: "/home", }); +// Ingest retry behavior (disabled by default): +// - 429 retries use Retry-After / X-RateLimit-Reset headers. +// - 503 retries use SDK default exponential backoff. +await api.ingest( + "events", + { + timestamp: "2024-01-15 10:31:00", + event_name: "button_click", + pathname: "/pricing", + }, + { + maxRetries: 3, + } +); + // Import rows from URL/file await api.appendDatasource("events", { url: "https://example.com/events.csv", diff --git a/package.json b/package.json index f38d7fb..6e0832e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tinybirdco/sdk", - "version": "0.0.57", + "version": "0.0.58", "description": "TypeScript SDK for Tinybird Forward - define datasources and pipes as TypeScript", "type": "module", "main": "./dist/index.js", diff --git a/src/api/api.test.ts b/src/api/api.test.ts index 7a317bb..2480cec 100644 --- a/src/api/api.test.ts +++ b/src/api/api.test.ts @@ -207,6 +207,395 @@ describe("TinybirdApi", () => { ).rejects.toThrow("Date values are not supported in ingest payloads"); }); + it("does not retry ingest on 503 when retry is disabled", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + return new HttpResponse("Service unavailable", { status: 503 }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + await expect( + api.ingest("events", { timestamp: "2024-01-01 00:00:00" }) + ).rejects.toMatchObject({ + name: "TinybirdApiError", + statusCode: 503, + }); + expect(attempts).toBe(1); + }); + + it("retries ingest on 503 with exponential backoff", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + if (attempts === 1) { + return new HttpResponse("Service unavailable", { status: 503 }); + } + + return HttpResponse.json({ + successful_rows: 1, + quarantined_rows: 0, + }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + const result = await api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + maxRetries: 1, + } + ); + + expect(result).toEqual({ successful_rows: 1, quarantined_rows: 0 }); + expect(attempts).toBe(2); + }); + + it("retries ingest on 429 with retry-after header and succeeds", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + if (attempts === 1) { + return new HttpResponse("Rate limited", { + status: 429, + headers: { + "Retry-After": "0", + }, + }); + } + + return HttpResponse.json({ + successful_rows: 1, + quarantined_rows: 0, + }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + const result = await api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + maxRetries: 1, + } + ); + + expect(result).toEqual({ successful_rows: 1, quarantined_rows: 0 }); + expect(attempts).toBe(2); + }); + + it("drains retryable 429 response body before retrying", async () => { + let attempts = 0; + let firstResponse: Response | undefined; + + const customFetch: typeof fetch = async () => { + attempts += 1; + if (attempts === 1) { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode("rate limited")); + controller.close(); + }, + }); + + firstResponse = new Response(stream, { + status: 429, + headers: { + "Retry-After": "0", + }, + }); + return firstResponse; + } + + return new Response( + JSON.stringify({ + successful_rows: 1, + quarantined_rows: 0, + }), + { + status: 200, + headers: { + "Content-Type": "application/json", + }, + } + ); + }; + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + fetch: customFetch, + }); + + const result = await api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + maxRetries: 1, + } + ); + + expect(result).toEqual({ successful_rows: 1, quarantined_rows: 0 }); + expect(attempts).toBe(2); + expect(firstResponse?.bodyUsed).toBe(true); + }); + + it("does not retry 429 when rate-limit delay headers are missing", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + return new HttpResponse("Rate limited", { status: 429 }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + await expect( + api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + maxRetries: 3, + } + ) + ).rejects.toMatchObject({ + name: "TinybirdApiError", + statusCode: 429, + }); + expect(attempts).toBe(1); + }); + + it("does not retry ingest on non-retryable status by default", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + return HttpResponse.json({ error: "Invalid payload" }, { status: 400 }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + await expect( + api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + maxRetries: 3, + } + ) + ).rejects.toMatchObject({ + name: "TinybirdApiError", + statusCode: 400, + }); + + expect(attempts).toBe(1); + }); + + it("stops retrying ingest after maxRetries on 429", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + return new HttpResponse("Rate limited", { + status: 429, + headers: { + "Retry-After": "0", + }, + }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + await expect( + api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + maxRetries: 2, + } + ) + ).rejects.toMatchObject({ + name: "TinybirdApiError", + statusCode: 429, + }); + + expect(attempts).toBe(3); + }); + + it("stops retrying ingest after maxRetries on 503", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + return new HttpResponse("Service unavailable", { status: 503 }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + await expect( + api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + maxRetries: 2, + } + ) + ).rejects.toMatchObject({ + name: "TinybirdApiError", + statusCode: 503, + }); + + expect(attempts).toBe(3); + }); + + it("retries ingest on 503 when wait is false", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + if (attempts === 1) { + return new HttpResponse("Service unavailable", { status: 503 }); + } + + return HttpResponse.json({ + successful_rows: 1, + quarantined_rows: 0, + }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + const result = await api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + wait: false, + maxRetries: 1, + } + ); + + expect(result).toEqual({ + successful_rows: 1, + quarantined_rows: 0, + }); + expect(attempts).toBe(2); + }); + + it("does not retry 500 even when wait is false", async () => { + let attempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + attempts += 1; + return new HttpResponse("Internal error", { status: 500 }); + }) + ); + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + }); + + await expect( + api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + wait: false, + maxRetries: 3, + } + ) + ).rejects.toMatchObject({ + name: "TinybirdApiError", + statusCode: 500, + }); + + expect(attempts).toBe(1); + }); + + it("does not retry ingest on transient network errors", async () => { + let fetchAttempts = 0; + + server.use( + http.post(`${BASE_URL}/v0/events`, () => { + return HttpResponse.json({ + successful_rows: 1, + quarantined_rows: 0, + }); + }) + ); + + const flakyFetch: typeof fetch = async (input, init) => { + fetchAttempts += 1; + if (fetchAttempts === 1) { + throw new TypeError("fetch failed"); + } + return fetch(input, init); + }; + + const api = createTinybirdApi({ + baseUrl: BASE_URL, + token: "p.default-token", + fetch: flakyFetch, + }); + + await expect( + api.ingest( + "events", + { timestamp: "2024-01-01 00:00:00" }, + { + maxRetries: 1, + } + ) + ).rejects.toThrow("fetch failed"); + expect(fetchAttempts).toBe(1); + }); + it("executes raw SQL via tinybirdApi.sql", async () => { let rawSql: string | null = null; let contentType: string | null = null; diff --git a/src/api/api.ts b/src/api/api.ts index e78f73b..dab93fe 100644 --- a/src/api/api.ts +++ b/src/api/api.ts @@ -14,6 +14,8 @@ import type { } from "../client/types.js"; const DEFAULT_TIMEOUT = 30000; +const DEFAULT_INGEST_RETRY_503_BASE_DELAY_MS = 200; +const DEFAULT_INGEST_RETRY_503_MAX_DELAY_MS = 3000; /** * Public, decoupled Tinybird API wrapper configuration @@ -279,22 +281,49 @@ export class TinybirdApi { const ndjson = events .map((event) => JSON.stringify(this.serializeEvent(event))) .join("\n"); + const signal = this.createAbortSignal(options.timeout, options.signal); + const maxRetries = this.resolveIngestMaxRetries(options.maxRetries); + let retryCount = 0; + + while (true) { + let response: Response; + + try { + response = await this.request(url.toString(), { + method: "POST", + token: options.token, + headers: { + "Content-Type": "application/x-ndjson", + }, + body: ndjson, + signal, + }); + } catch (error) { + throw error; + } - const response = await this.request(url.toString(), { - method: "POST", - token: options.token, - headers: { - "Content-Type": "application/x-ndjson", - }, - body: ndjson, - signal: this.createAbortSignal(options.timeout, options.signal), - }); + if (response.ok) { + return (await response.json()) as IngestResult; + } + + const retry429Delay = this.resolveRetry429Delay(response, maxRetries, retryCount); + if (retry429Delay !== undefined) { + await this.discardResponseBody(response); + await this.sleep(retry429Delay, signal); + retryCount += 1; + continue; + } + + const retry503Delay = this.resolveRetry503Delay(response, maxRetries, retryCount); + if (retry503Delay !== undefined) { + await this.discardResponseBody(response); + await this.sleep(retry503Delay, signal); + retryCount += 1; + continue; + } - if (!response.ok) { await this.handleErrorResponse(response); } - - return (await response.json()) as IngestResult; } /** @@ -575,6 +604,164 @@ export class TinybirdApi { return AbortSignal.any([timeoutSignal, existingSignal]); } + private resolveIngestMaxRetries( + maxRetries: TinybirdApiIngestOptions["maxRetries"] + ): number | undefined { + if (maxRetries === undefined) { + return undefined; + } + + if (!Number.isFinite(maxRetries)) { + throw new Error("'maxRetries' must be a finite number"); + } + + return Math.max(0, Math.floor(maxRetries)); + } + + private resolveRetry429Delay( + response: Response, + maxRetries: number | undefined, + retryCount: number + ): number | undefined { + if (maxRetries === undefined) { + return undefined; + } + + if (response.status !== 429) { + return undefined; + } + + if (retryCount >= maxRetries) { + return undefined; + } + + return this.resolveRetryDelayFromHeaders(response); + } + + private resolveRetry503Delay( + response: Response, + maxRetries: number | undefined, + retryCount: number + ): number | undefined { + if (maxRetries === undefined) { + return undefined; + } + + if (response.status !== 503) { + return undefined; + } + + if (retryCount >= maxRetries) { + return undefined; + } + + return this.calculateRetry503DelayMs(retryCount); + } + + private resolveRetryDelayFromHeaders(response: Response): number | undefined { + const retryAfter = response.headers.get("retry-after"); + const retryAfterDelay = this.parseRetryAfterDelayMs(retryAfter); + if (retryAfterDelay !== undefined) { + return retryAfterDelay; + } + + const rateLimitReset = response.headers.get("x-ratelimit-reset"); + const rateLimitResetDelay = this.parseRateLimitResetDelayMs(rateLimitReset); + if (rateLimitResetDelay !== undefined) { + return rateLimitResetDelay; + } + return undefined; + } + + private parseRetryAfterDelayMs(value: string | null): number | undefined { + if (!value) { + return undefined; + } + + const trimmed = value.trim(); + const seconds = Number(trimmed); + if (Number.isFinite(seconds)) { + return Math.max(0, Math.floor(seconds * 1000)); + } + + const retryDateMs = Date.parse(trimmed); + if (Number.isNaN(retryDateMs)) { + return undefined; + } + + return Math.max(0, retryDateMs - Date.now()); + } + + private parseRateLimitResetDelayMs(value: string | null): number | undefined { + if (!value) { + return undefined; + } + + const numericValue = Number(value.trim()); + if (!Number.isFinite(numericValue)) { + return undefined; + } + + return Math.max(0, Math.floor(numericValue * 1000)); + } + + private calculateRetry503DelayMs(retryCount: number): number { + return Math.min( + DEFAULT_INGEST_RETRY_503_MAX_DELAY_MS, + DEFAULT_INGEST_RETRY_503_BASE_DELAY_MS * 2 ** retryCount + ); + } + + private async discardResponseBody(response: Response): Promise { + if (response.bodyUsed || !response.body) { + return; + } + + try { + await response.arrayBuffer(); + } catch { + try { + await response.body.cancel(); + } catch { + // Best effort cleanup only; never mask retry/error flow. + } + } + } + + private async sleep(delayMs: number, signal?: AbortSignal): Promise { + if (delayMs <= 0) { + return; + } + + await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + cleanup(); + resolve(); + }, delayMs); + + const onAbort = () => { + cleanup(); + reject(signal?.reason ?? new DOMException("The operation was aborted.", "AbortError")); + }; + + const cleanup = () => { + clearTimeout(timer); + signal?.removeEventListener("abort", onAbort); + }; + + if (!signal) { + return; + } + + if (signal.aborted) { + onAbort(); + return; + } + + signal.addEventListener("abort", onAbort, { once: true }); + }); + } + private serializeEvent( event: Record ): Record { diff --git a/src/client/types.ts b/src/client/types.ts index 49171e4..e7b093b 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -152,6 +152,11 @@ export interface IngestOptions { signal?: AbortSignal; /** Wait for the ingestion to complete before returning */ wait?: boolean; + /** + * Number of retry attempts after the first request. + * Retries are disabled by default when undefined. + */ + maxRetries?: number; } /**