From cdcd6ebc60393891612e85240a2d28282e000420 Mon Sep 17 00:00:00 2001 From: Hugo Richard Date: Sun, 8 Feb 2026 21:06:18 +0000 Subject: [PATCH 1/5] feat: add shared drain pipeline for batching and retry --- apps/playground/app/config/tests.config.ts | 38 +++ apps/playground/app/pages/index.vue | 9 + packages/evlog/package.json | 7 + packages/evlog/src/adapters/axiom.ts | 13 +- packages/evlog/src/adapters/posthog.ts | 13 +- packages/evlog/src/adapters/sentry.ts | 13 +- packages/evlog/src/pipeline.ts | 147 +++++++++ packages/evlog/test/pipeline.test.ts | 341 +++++++++++++++++++++ packages/evlog/tsdown.config.ts | 1 + 9 files changed, 561 insertions(+), 21 deletions(-) create mode 100644 packages/evlog/src/pipeline.ts create mode 100644 packages/evlog/test/pipeline.test.ts diff --git a/apps/playground/app/config/tests.config.ts b/apps/playground/app/config/tests.config.ts index e28a451..f8ee015 100644 --- a/apps/playground/app/config/tests.config.ts +++ b/apps/playground/app/config/tests.config.ts @@ -265,6 +265,44 @@ export const testConfig = { }, ], } as TestSection, + { + id: 'pipeline', + label: 'Pipeline', + icon: 'i-lucide-layers', + title: 'Drain Pipeline (Batching + Retry)', + description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for "[evlog/pipeline] Flushing batch of N events" messages.', + layout: 'cards', + tests: [ + { + id: 'pipeline-single', + label: '1 Request', + description: 'Single event - buffered until batch size (5) or interval (2s) is reached', + endpoint: '/api/test/success', + method: 'GET', + badge: { + label: 'Buffered', + color: 'blue', + }, + toastOnSuccess: { + title: 'Event buffered', + description: 'Check terminal - will flush after 2s or when 5 events accumulate', + }, + }, + { + id: 'pipeline-batch', + label: 'Fire 10 Requests', + description: 'Fires 10 requests in parallel - should produce 2 batches of 5 events', + badge: { + label: '2 batches', + color: 'green', + }, + toastOnSuccess: { + title: '10 requests sent', + description: 'Check terminal - should see 2 batches of 5 events', + }, + }, + ], + } as TestSection, { id: 'drains', label: 'Drains', diff --git a/apps/playground/app/pages/index.vue b/apps/playground/app/pages/index.vue index d62c6c9..21f02a3 100644 --- a/apps/playground/app/pages/index.vue +++ b/apps/playground/app/pages/index.vue @@ -39,6 +39,12 @@ async function handleBatchRequest() { ) } +async function handlePipelineBatch() { + await Promise.all( + Array.from({ length: 10 }, () => $fetch('/api/test/success')), + ) +} + // Get custom onClick for specific tests function getOnClick(testId: string) { if (testId === 'structured-error-toast') { @@ -47,6 +53,9 @@ function getOnClick(testId: string) { if (testId === 'tail-fast-batch') { return handleBatchRequest } + if (testId === 'pipeline-batch') { + return handlePipelineBatch + } return undefined } diff --git a/packages/evlog/package.json b/packages/evlog/package.json index c5dcfef..5cb2ccb 100644 --- a/packages/evlog/package.json +++ b/packages/evlog/package.json @@ -58,6 +58,10 @@ "./enrichers": { "types": "./dist/enrichers.d.mts", "import": "./dist/enrichers.mjs" + }, + "./pipeline": { + "types": "./dist/pipeline.d.mts", + "import": "./dist/pipeline.mjs" } }, "main": "./dist/index.mjs", @@ -90,6 +94,9 @@ ], "enrichers": [ "./dist/enrichers.d.mts" + ], + "pipeline": [ + "./dist/pipeline.d.mts" ] } }, diff --git a/packages/evlog/src/adapters/axiom.ts b/packages/evlog/src/adapters/axiom.ts index 495e041..54d6d6b 100644 --- a/packages/evlog/src/adapters/axiom.ts +++ b/packages/evlog/src/adapters/axiom.ts @@ -34,8 +34,11 @@ export interface AxiomConfig { * })) * ``` */ -export function createAxiomDrain(overrides?: Partial): (ctx: DrainContext) => Promise { - return async (ctx: DrainContext) => { +export function createAxiomDrain(overrides?: Partial): (ctx: DrainContext | DrainContext[]) => Promise { + return async (ctx: DrainContext | DrainContext[]) => { + const contexts = Array.isArray(ctx) ? ctx : [ctx] + if (contexts.length === 0) return + const runtimeConfig = getRuntimeConfig() // Support both runtimeConfig.evlog.axiom and runtimeConfig.axiom const evlogAxiom = runtimeConfig?.evlog?.axiom @@ -55,11 +58,7 @@ export function createAxiomDrain(overrides?: Partial): (ctx: DrainC return } - try { - await sendToAxiom(ctx.event, config as AxiomConfig) - } catch (error) { - console.error('[evlog/axiom] Failed to send event:', error) - } + await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig) } } diff --git a/packages/evlog/src/adapters/posthog.ts b/packages/evlog/src/adapters/posthog.ts index 2966232..97a7faf 100644 --- a/packages/evlog/src/adapters/posthog.ts +++ b/packages/evlog/src/adapters/posthog.ts @@ -61,8 +61,11 @@ export function toPostHogEvent(event: WideEvent, config: PostHogConfig): PostHog * })) * ``` */ -export function createPostHogDrain(overrides?: Partial): (ctx: DrainContext) => Promise { - return async (ctx: DrainContext) => { +export function createPostHogDrain(overrides?: Partial): (ctx: DrainContext | DrainContext[]) => Promise { + return async (ctx: DrainContext | DrainContext[]) => { + const contexts = Array.isArray(ctx) ? ctx : [ctx] + if (contexts.length === 0) return + const runtimeConfig = getRuntimeConfig() // Support both runtimeConfig.evlog.posthog and runtimeConfig.posthog const evlogPostHog = runtimeConfig?.evlog?.posthog @@ -82,11 +85,7 @@ export function createPostHogDrain(overrides?: Partial): (ctx: Dr return } - try { - await sendToPostHog(ctx.event, config as PostHogConfig) - } catch (error) { - console.error('[evlog/posthog] Failed to send event:', error) - } + await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig) } } diff --git a/packages/evlog/src/adapters/sentry.ts b/packages/evlog/src/adapters/sentry.ts index 45f765b..2c85ad0 100644 --- a/packages/evlog/src/adapters/sentry.ts +++ b/packages/evlog/src/adapters/sentry.ts @@ -214,8 +214,11 @@ function buildEnvelopeBody(logs: SentryLog[], dsn: string): string { * })) * ``` */ -export function createSentryDrain(overrides?: Partial): (ctx: DrainContext) => Promise { - return async (ctx: DrainContext) => { +export function createSentryDrain(overrides?: Partial): (ctx: DrainContext | DrainContext[]) => Promise { + return async (ctx: DrainContext | DrainContext[]) => { + const contexts = Array.isArray(ctx) ? ctx : [ctx] + if (contexts.length === 0) return + const runtimeConfig = getRuntimeConfig() const evlogSentry = runtimeConfig?.evlog?.sentry const rootSentry = runtimeConfig?.sentry @@ -233,11 +236,7 @@ export function createSentryDrain(overrides?: Partial): (ctx: Drai return } - try { - await sendToSentry(ctx.event, config as SentryConfig) - } catch (error) { - console.error('[evlog/sentry] Failed to send log:', error) - } + await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig) } } diff --git a/packages/evlog/src/pipeline.ts b/packages/evlog/src/pipeline.ts new file mode 100644 index 0000000..c88faab --- /dev/null +++ b/packages/evlog/src/pipeline.ts @@ -0,0 +1,147 @@ +export interface DrainPipelineOptions { + batch?: { + /** Default: 50 */ + size?: number + /** Default: 5000 */ + intervalMs?: number + } + retry?: { + /** Including the first attempt. Default: 3 */ + maxAttempts?: number + /** Default: 'exponential' */ + backoff?: 'exponential' | 'linear' | 'fixed' + /** Default: 1000 */ + initialDelayMs?: number + /** Default: 30000 */ + maxDelayMs?: number + } + /** Default: 1000 */ + maxBufferSize?: number + onDropped?: (events: T[], error?: Error) => void +} + +export interface PipelineDrainFn { + (ctx: T): void + /** Flush all buffered events. Call on server shutdown. */ + flush: () => Promise + readonly pending: number +} + +export function createDrainPipeline(options?: DrainPipelineOptions): (drain: (ctx: T | T[]) => void | Promise) => PipelineDrainFn { + const batchSize = options?.batch?.size ?? 50 + const intervalMs = options?.batch?.intervalMs ?? 5000 + const maxBufferSize = options?.maxBufferSize ?? 1000 + const maxAttempts = options?.retry?.maxAttempts ?? 3 + const backoffStrategy = options?.retry?.backoff ?? 'exponential' + const initialDelayMs = options?.retry?.initialDelayMs ?? 1000 + const maxDelayMs = options?.retry?.maxDelayMs ?? 30000 + const onDropped = options?.onDropped + + return (drain: (ctx: T | T[]) => void | Promise): PipelineDrainFn => { + const buffer: T[] = [] + let timer: ReturnType | null = null + let activeFlush: Promise | null = null + + function clearTimer(): void { + if (timer !== null) { + clearTimeout(timer) + timer = null + } + } + + function scheduleFlush(): void { + if (timer !== null || activeFlush) return + timer = setTimeout(() => { + timer = null + if (!activeFlush) startFlush() + }, intervalMs) + } + + function getRetryDelay(attempt: number): number { + let delay: number + switch (backoffStrategy) { + case 'linear': + delay = initialDelayMs * attempt + break + case 'fixed': + delay = initialDelayMs + break + case 'exponential': + default: + delay = initialDelayMs * 2 ** (attempt - 1) + break + } + return Math.min(delay, maxDelayMs) + } + + async function sendWithRetry(batch: T[]): Promise { + let lastError: Error | undefined + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + await drain(batch) + return + } + catch (error) { + lastError = error instanceof Error ? error : new Error(String(error)) + if (attempt < maxAttempts) { + await new Promise(r => setTimeout(r, getRetryDelay(attempt))) + } + } + } + onDropped?.(batch, lastError) + } + + async function drainBuffer(): Promise { + while (buffer.length > 0) { + const batch = buffer.splice(0, batchSize) + await sendWithRetry(batch) + } + } + + function startFlush(): void { + if (activeFlush) return + activeFlush = drainBuffer().finally(() => { + activeFlush = null + if (buffer.length >= batchSize) { + startFlush() + } + else if (buffer.length > 0) { + scheduleFlush() + } + }) + } + + function push(ctx: T): void { + if (buffer.length >= maxBufferSize) { + const dropped = buffer.splice(0, 1) + onDropped?.(dropped) + } + buffer.push(ctx) + + if (buffer.length >= batchSize) { + clearTimer() + startFlush() + } + else if (!activeFlush) { + scheduleFlush() + } + } + + async function flush(): Promise { + clearTimer() + if (activeFlush) { + await activeFlush + } + await drainBuffer() + } + + const hookFn = push as PipelineDrainFn + hookFn.flush = flush + Object.defineProperty(hookFn, 'pending', { + get: () => buffer.length, + enumerable: true, + }) + + return hookFn + } +} diff --git a/packages/evlog/test/pipeline.test.ts b/packages/evlog/test/pipeline.test.ts new file mode 100644 index 0000000..5267b05 --- /dev/null +++ b/packages/evlog/test/pipeline.test.ts @@ -0,0 +1,341 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import type { DrainContext } from '../src/types' +import { createDrainPipeline } from '../src/pipeline' + +function createTestContext(id: number): DrainContext { + return { + event: { + timestamp: '2024-01-01T00:00:00.000Z', + level: 'info', + service: 'test', + environment: 'test', + id, + }, + } +} + +describe('createDrainPipeline', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + describe('batching by size', () => { + it('does not flush before batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3, intervalMs: 60000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).not.toHaveBeenCalled() + + await hook.flush() + }) + + it('flushes when batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3, intervalMs: 60000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + hook(createTestContext(3)) + + await vi.runAllTimersAsync() + + expect(drain).toHaveBeenCalledTimes(1) + const batch = drain.mock.calls[0]![0] as DrainContext[] + expect(batch).toHaveLength(3) + expect(batch[0]!.event.id).toBe(1) + expect(batch[1]!.event.id).toBe(2) + expect(batch[2]!.event.id).toBe(3) + }) + + it('splits into multiple batches', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3 } })(drain) + + for (let i = 1; i <= 7; i++) hook(createTestContext(i)) + + await hook.flush() + + expect(drain).toHaveBeenCalledTimes(3) + expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(3) + expect((drain.mock.calls[1]![0] as DrainContext[])).toHaveLength(3) + expect((drain.mock.calls[2]![0] as DrainContext[])).toHaveLength(1) + }) + }) + + describe('batching by interval', () => { + it('flushes after interval expires', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 100, intervalMs: 5000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + await vi.advanceTimersByTimeAsync(4999) + expect(drain).not.toHaveBeenCalled() + + await vi.advanceTimersByTimeAsync(1) + expect(drain).toHaveBeenCalledTimes(1) + expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(2) + }) + + it('resets interval when batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 2, intervalMs: 5000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + // Batch size reached, should flush immediately + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Push one more - should start interval timer, not flush immediately + hook(createTestContext(3)) + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Wait for interval + await vi.advanceTimersByTimeAsync(5000) + expect(drain).toHaveBeenCalledTimes(2) + }) + }) + + describe('retry', () => { + it('retries on failure with exponential backoff', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail 1')) + .mockRejectedValueOnce(new Error('fail 2')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 3, backoff: 'exponential', initialDelayMs: 100, maxDelayMs: 10000 }, + })(drain) + + hook(createTestContext(1)) + + // First attempt (immediate) + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Wait for first retry delay (100ms * 2^0 = 100ms) + await vi.advanceTimersByTimeAsync(100) + expect(drain).toHaveBeenCalledTimes(2) + + // Wait for second retry delay (100ms * 2^1 = 200ms) + await vi.advanceTimersByTimeAsync(200) + expect(drain).toHaveBeenCalledTimes(3) + }) + + it('retries with linear backoff', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail')) + .mockRejectedValueOnce(new Error('fail')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 3, backoff: 'linear', initialDelayMs: 100 }, + })(drain) + + hook(createTestContext(1)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Linear: 100 * 1 = 100ms + await vi.advanceTimersByTimeAsync(100) + expect(drain).toHaveBeenCalledTimes(2) + + // Linear: 100 * 2 = 200ms + await vi.advanceTimersByTimeAsync(200) + expect(drain).toHaveBeenCalledTimes(3) + }) + + it('retries with fixed backoff', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 2, backoff: 'fixed', initialDelayMs: 100 }, + })(drain) + + hook(createTestContext(1)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Fixed: always 100ms + await vi.advanceTimersByTimeAsync(100) + expect(drain).toHaveBeenCalledTimes(2) + }) + + it('caps retry delay at maxDelayMs', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 2, backoff: 'exponential', initialDelayMs: 1000, maxDelayMs: 500 }, + })(drain) + + hook(createTestContext(1)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Exponential: 1000 * 2^0 = 1000ms, capped to 500ms + await vi.advanceTimersByTimeAsync(500) + expect(drain).toHaveBeenCalledTimes(2) + }) + + it('calls onDropped after all retries exhausted', async () => { + const drain = vi.fn().mockRejectedValue(new Error('permanent failure')) + const onDropped = vi.fn() + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 2, backoff: 'fixed', initialDelayMs: 50 }, + onDropped, + })(drain) + + hook(createTestContext(1)) + + await vi.runAllTimersAsync() + + expect(drain).toHaveBeenCalledTimes(2) + expect(onDropped).toHaveBeenCalledTimes(1) + expect((onDropped.mock.calls[0]![0] as DrainContext[])).toHaveLength(1) + expect(onDropped.mock.calls[0]![1]).toBeInstanceOf(Error) + expect((onDropped.mock.calls[0]![1] as Error).message).toBe('permanent failure') + }) + }) + + describe('buffer overflow', () => { + it('drops oldest events and calls onDropped when buffer is full', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const onDropped = vi.fn() + + const hook = createDrainPipeline({ + batch: { size: 100, intervalMs: 60000 }, + maxBufferSize: 3, + onDropped, + })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + hook(createTestContext(3)) + expect(onDropped).not.toHaveBeenCalled() + + // Buffer full - should drop oldest + hook(createTestContext(4)) + expect(onDropped).toHaveBeenCalledTimes(1) + expect((onDropped.mock.calls[0]![0] as DrainContext[])).toHaveLength(1) + expect((onDropped.mock.calls[0]![0] as DrainContext[])[0]!.event.id).toBe(1) + + expect(hook.pending).toBe(3) + + // Flush and verify the remaining events are 2, 3, 4 + await hook.flush() + const batch = drain.mock.calls[0]![0] as DrainContext[] + expect(batch.map(c => c.event.id)).toEqual([2, 3, 4]) + }) + }) + + describe('flush()', () => { + it('drains all buffered events', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 100 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + hook(createTestContext(3)) + + await hook.flush() + + expect(drain).toHaveBeenCalledTimes(1) + expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(3) + expect(hook.pending).toBe(0) + }) + + it('is safe to call when buffer is empty', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline()(drain) + + await hook.flush() + + expect(drain).not.toHaveBeenCalled() + }) + + it('flush drains events that arrived during active flush', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 2, intervalMs: 60000 } })(drain) + + // Push 2 events to trigger auto-flush + hook(createTestContext(1)) + hook(createTestContext(2)) + + // Push more while flush may be in progress + hook(createTestContext(3)) + + // Explicit flush should drain everything + await hook.flush() + + expect(hook.pending).toBe(0) + const allEvents = drain.mock.calls.flatMap(call => call[0] as DrainContext[]) + expect(allEvents).toHaveLength(3) + }) + }) + + describe('pending', () => { + it('returns current buffer size', () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 100, intervalMs: 60000 } })(drain) + + expect(hook.pending).toBe(0) + hook(createTestContext(1)) + expect(hook.pending).toBe(1) + hook(createTestContext(2)) + expect(hook.pending).toBe(2) + }) + + it('returns 0 after flush', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 100 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + expect(hook.pending).toBe(2) + + await hook.flush() + expect(hook.pending).toBe(0) + }) + }) + + describe('defaults', () => { + it('uses default options when none provided', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline()(drain) + + hook(createTestContext(1)) + + // Default interval is 5000ms + await vi.advanceTimersByTimeAsync(4999) + expect(drain).not.toHaveBeenCalled() + + await vi.advanceTimersByTimeAsync(1) + expect(drain).toHaveBeenCalledTimes(1) + }) + }) +}) diff --git a/packages/evlog/tsdown.config.ts b/packages/evlog/tsdown.config.ts index 1198b3f..d57bc47 100644 --- a/packages/evlog/tsdown.config.ts +++ b/packages/evlog/tsdown.config.ts @@ -21,6 +21,7 @@ export default defineConfig({ 'adapters/posthog': 'src/adapters/posthog.ts', 'adapters/sentry': 'src/adapters/sentry.ts', 'enrichers': 'src/enrichers/index.ts', + 'pipeline': 'src/pipeline.ts', }, format: 'esm', dts: true, From 749936ad95c83419dfaa07b29db7e00ca45ff24f Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:09:26 +0000 Subject: [PATCH 2/5] chore: apply automated lint fixes --- packages/evlog/src/pipeline.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/evlog/src/pipeline.ts b/packages/evlog/src/pipeline.ts index c88faab..86c4334 100644 --- a/packages/evlog/src/pipeline.ts +++ b/packages/evlog/src/pipeline.ts @@ -80,8 +80,7 @@ export function createDrainPipeline(options?: DrainPipelineOptions< try { await drain(batch) return - } - catch (error) { + } catch (error) { lastError = error instanceof Error ? error : new Error(String(error)) if (attempt < maxAttempts) { await new Promise(r => setTimeout(r, getRetryDelay(attempt))) @@ -104,8 +103,7 @@ export function createDrainPipeline(options?: DrainPipelineOptions< activeFlush = null if (buffer.length >= batchSize) { startFlush() - } - else if (buffer.length > 0) { + } else if (buffer.length > 0) { scheduleFlush() } }) @@ -121,8 +119,7 @@ export function createDrainPipeline(options?: DrainPipelineOptions< if (buffer.length >= batchSize) { clearTimer() startFlush() - } - else if (!activeFlush) { + } else if (!activeFlush) { scheduleFlush() } } From e37564a1fcf7daf7fb0c1691b2cb9e98429dee56 Mon Sep 17 00:00:00 2001 From: Hugo Richard Date: Sun, 8 Feb 2026 21:06:18 +0000 Subject: [PATCH 3/5] feat: add shared drain pipeline for batching and retry --- apps/playground/app/config/tests.config.ts | 38 +++ apps/playground/app/pages/index.vue | 9 + packages/evlog/package.json | 7 + packages/evlog/src/adapters/axiom.ts | 13 +- packages/evlog/src/adapters/posthog.ts | 13 +- packages/evlog/src/adapters/sentry.ts | 13 +- packages/evlog/src/pipeline.ts | 148 +++++++++ packages/evlog/test/pipeline.test.ts | 341 +++++++++++++++++++++ packages/evlog/tsdown.config.ts | 1 + 9 files changed, 562 insertions(+), 21 deletions(-) create mode 100644 packages/evlog/src/pipeline.ts create mode 100644 packages/evlog/test/pipeline.test.ts diff --git a/apps/playground/app/config/tests.config.ts b/apps/playground/app/config/tests.config.ts index e28a451..f8ee015 100644 --- a/apps/playground/app/config/tests.config.ts +++ b/apps/playground/app/config/tests.config.ts @@ -265,6 +265,44 @@ export const testConfig = { }, ], } as TestSection, + { + id: 'pipeline', + label: 'Pipeline', + icon: 'i-lucide-layers', + title: 'Drain Pipeline (Batching + Retry)', + description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for "[evlog/pipeline] Flushing batch of N events" messages.', + layout: 'cards', + tests: [ + { + id: 'pipeline-single', + label: '1 Request', + description: 'Single event - buffered until batch size (5) or interval (2s) is reached', + endpoint: '/api/test/success', + method: 'GET', + badge: { + label: 'Buffered', + color: 'blue', + }, + toastOnSuccess: { + title: 'Event buffered', + description: 'Check terminal - will flush after 2s or when 5 events accumulate', + }, + }, + { + id: 'pipeline-batch', + label: 'Fire 10 Requests', + description: 'Fires 10 requests in parallel - should produce 2 batches of 5 events', + badge: { + label: '2 batches', + color: 'green', + }, + toastOnSuccess: { + title: '10 requests sent', + description: 'Check terminal - should see 2 batches of 5 events', + }, + }, + ], + } as TestSection, { id: 'drains', label: 'Drains', diff --git a/apps/playground/app/pages/index.vue b/apps/playground/app/pages/index.vue index d62c6c9..21f02a3 100644 --- a/apps/playground/app/pages/index.vue +++ b/apps/playground/app/pages/index.vue @@ -39,6 +39,12 @@ async function handleBatchRequest() { ) } +async function handlePipelineBatch() { + await Promise.all( + Array.from({ length: 10 }, () => $fetch('/api/test/success')), + ) +} + // Get custom onClick for specific tests function getOnClick(testId: string) { if (testId === 'structured-error-toast') { @@ -47,6 +53,9 @@ function getOnClick(testId: string) { if (testId === 'tail-fast-batch') { return handleBatchRequest } + if (testId === 'pipeline-batch') { + return handlePipelineBatch + } return undefined } diff --git a/packages/evlog/package.json b/packages/evlog/package.json index c5dcfef..5cb2ccb 100644 --- a/packages/evlog/package.json +++ b/packages/evlog/package.json @@ -58,6 +58,10 @@ "./enrichers": { "types": "./dist/enrichers.d.mts", "import": "./dist/enrichers.mjs" + }, + "./pipeline": { + "types": "./dist/pipeline.d.mts", + "import": "./dist/pipeline.mjs" } }, "main": "./dist/index.mjs", @@ -90,6 +94,9 @@ ], "enrichers": [ "./dist/enrichers.d.mts" + ], + "pipeline": [ + "./dist/pipeline.d.mts" ] } }, diff --git a/packages/evlog/src/adapters/axiom.ts b/packages/evlog/src/adapters/axiom.ts index 495e041..54d6d6b 100644 --- a/packages/evlog/src/adapters/axiom.ts +++ b/packages/evlog/src/adapters/axiom.ts @@ -34,8 +34,11 @@ export interface AxiomConfig { * })) * ``` */ -export function createAxiomDrain(overrides?: Partial): (ctx: DrainContext) => Promise { - return async (ctx: DrainContext) => { +export function createAxiomDrain(overrides?: Partial): (ctx: DrainContext | DrainContext[]) => Promise { + return async (ctx: DrainContext | DrainContext[]) => { + const contexts = Array.isArray(ctx) ? ctx : [ctx] + if (contexts.length === 0) return + const runtimeConfig = getRuntimeConfig() // Support both runtimeConfig.evlog.axiom and runtimeConfig.axiom const evlogAxiom = runtimeConfig?.evlog?.axiom @@ -55,11 +58,7 @@ export function createAxiomDrain(overrides?: Partial): (ctx: DrainC return } - try { - await sendToAxiom(ctx.event, config as AxiomConfig) - } catch (error) { - console.error('[evlog/axiom] Failed to send event:', error) - } + await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig) } } diff --git a/packages/evlog/src/adapters/posthog.ts b/packages/evlog/src/adapters/posthog.ts index 2966232..97a7faf 100644 --- a/packages/evlog/src/adapters/posthog.ts +++ b/packages/evlog/src/adapters/posthog.ts @@ -61,8 +61,11 @@ export function toPostHogEvent(event: WideEvent, config: PostHogConfig): PostHog * })) * ``` */ -export function createPostHogDrain(overrides?: Partial): (ctx: DrainContext) => Promise { - return async (ctx: DrainContext) => { +export function createPostHogDrain(overrides?: Partial): (ctx: DrainContext | DrainContext[]) => Promise { + return async (ctx: DrainContext | DrainContext[]) => { + const contexts = Array.isArray(ctx) ? ctx : [ctx] + if (contexts.length === 0) return + const runtimeConfig = getRuntimeConfig() // Support both runtimeConfig.evlog.posthog and runtimeConfig.posthog const evlogPostHog = runtimeConfig?.evlog?.posthog @@ -82,11 +85,7 @@ export function createPostHogDrain(overrides?: Partial): (ctx: Dr return } - try { - await sendToPostHog(ctx.event, config as PostHogConfig) - } catch (error) { - console.error('[evlog/posthog] Failed to send event:', error) - } + await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig) } } diff --git a/packages/evlog/src/adapters/sentry.ts b/packages/evlog/src/adapters/sentry.ts index 45f765b..2c85ad0 100644 --- a/packages/evlog/src/adapters/sentry.ts +++ b/packages/evlog/src/adapters/sentry.ts @@ -214,8 +214,11 @@ function buildEnvelopeBody(logs: SentryLog[], dsn: string): string { * })) * ``` */ -export function createSentryDrain(overrides?: Partial): (ctx: DrainContext) => Promise { - return async (ctx: DrainContext) => { +export function createSentryDrain(overrides?: Partial): (ctx: DrainContext | DrainContext[]) => Promise { + return async (ctx: DrainContext | DrainContext[]) => { + const contexts = Array.isArray(ctx) ? ctx : [ctx] + if (contexts.length === 0) return + const runtimeConfig = getRuntimeConfig() const evlogSentry = runtimeConfig?.evlog?.sentry const rootSentry = runtimeConfig?.sentry @@ -233,11 +236,7 @@ export function createSentryDrain(overrides?: Partial): (ctx: Drai return } - try { - await sendToSentry(ctx.event, config as SentryConfig) - } catch (error) { - console.error('[evlog/sentry] Failed to send log:', error) - } + await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig) } } diff --git a/packages/evlog/src/pipeline.ts b/packages/evlog/src/pipeline.ts new file mode 100644 index 0000000..63b632d --- /dev/null +++ b/packages/evlog/src/pipeline.ts @@ -0,0 +1,148 @@ +export interface DrainPipelineOptions { + batch?: { + /** Maximum number of events per batch sent to the drain function. @default 50 */ + size?: number + /** Maximum time (ms) an event can stay buffered before a flush is triggered, even if the batch is not full. @default 5000 */ + intervalMs?: number + } + retry?: { + /** Total number of attempts (including the initial one) before dropping the batch. @default 3 */ + maxAttempts?: number + /** Delay strategy between retry attempts. @default 'exponential' */ + backoff?: 'exponential' | 'linear' | 'fixed' + /** Base delay (ms) for the first retry. Scaled by the backoff strategy on subsequent retries. @default 1000 */ + initialDelayMs?: number + /** Upper bound (ms) for any single retry delay. @default 30000 */ + maxDelayMs?: number + } + /** Maximum number of events held in the buffer. When exceeded, the oldest event is dropped. @default 1000 */ + maxBufferSize?: number + /** Called when a batch is dropped after all retry attempts are exhausted, or when the buffer overflows. */ + onDropped?: (events: T[], error?: Error) => void +} + +export interface PipelineDrainFn { + (ctx: T): void + /** Flush all buffered events. Call on server shutdown. */ + flush: () => Promise + readonly pending: number +} + +export function createDrainPipeline(options?: DrainPipelineOptions): (drain: (ctx: T | T[]) => void | Promise) => PipelineDrainFn { + const batchSize = options?.batch?.size ?? 50 + const intervalMs = options?.batch?.intervalMs ?? 5000 + const maxBufferSize = options?.maxBufferSize ?? 1000 + const maxAttempts = options?.retry?.maxAttempts ?? 3 + const backoffStrategy = options?.retry?.backoff ?? 'exponential' + const initialDelayMs = options?.retry?.initialDelayMs ?? 1000 + const maxDelayMs = options?.retry?.maxDelayMs ?? 30000 + const onDropped = options?.onDropped + + return (drain: (ctx: T | T[]) => void | Promise): PipelineDrainFn => { + const buffer: T[] = [] + let timer: ReturnType | null = null + let activeFlush: Promise | null = null + + function clearTimer(): void { + if (timer !== null) { + clearTimeout(timer) + timer = null + } + } + + function scheduleFlush(): void { + if (timer !== null || activeFlush) return + timer = setTimeout(() => { + timer = null + if (!activeFlush) startFlush() + }, intervalMs) + } + + function getRetryDelay(attempt: number): number { + let delay: number + switch (backoffStrategy) { + case 'linear': + delay = initialDelayMs * attempt + break + case 'fixed': + delay = initialDelayMs + break + case 'exponential': + default: + delay = initialDelayMs * 2 ** (attempt - 1) + break + } + return Math.min(delay, maxDelayMs) + } + + async function sendWithRetry(batch: T[]): Promise { + let lastError: Error | undefined + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + await drain(batch) + return + } + catch (error) { + lastError = error instanceof Error ? error : new Error(String(error)) + if (attempt < maxAttempts) { + await new Promise(r => setTimeout(r, getRetryDelay(attempt))) + } + } + } + onDropped?.(batch, lastError) + } + + async function drainBuffer(): Promise { + while (buffer.length > 0) { + const batch = buffer.splice(0, batchSize) + await sendWithRetry(batch) + } + } + + function startFlush(): void { + if (activeFlush) return + activeFlush = drainBuffer().finally(() => { + activeFlush = null + if (buffer.length >= batchSize) { + startFlush() + } + else if (buffer.length > 0) { + scheduleFlush() + } + }) + } + + function push(ctx: T): void { + if (buffer.length >= maxBufferSize) { + const dropped = buffer.splice(0, 1) + onDropped?.(dropped) + } + buffer.push(ctx) + + if (buffer.length >= batchSize) { + clearTimer() + startFlush() + } + else if (!activeFlush) { + scheduleFlush() + } + } + + async function flush(): Promise { + clearTimer() + if (activeFlush) { + await activeFlush + } + await drainBuffer() + } + + const hookFn = push as PipelineDrainFn + hookFn.flush = flush + Object.defineProperty(hookFn, 'pending', { + get: () => buffer.length, + enumerable: true, + }) + + return hookFn + } +} diff --git a/packages/evlog/test/pipeline.test.ts b/packages/evlog/test/pipeline.test.ts new file mode 100644 index 0000000..5267b05 --- /dev/null +++ b/packages/evlog/test/pipeline.test.ts @@ -0,0 +1,341 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import type { DrainContext } from '../src/types' +import { createDrainPipeline } from '../src/pipeline' + +function createTestContext(id: number): DrainContext { + return { + event: { + timestamp: '2024-01-01T00:00:00.000Z', + level: 'info', + service: 'test', + environment: 'test', + id, + }, + } +} + +describe('createDrainPipeline', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + describe('batching by size', () => { + it('does not flush before batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3, intervalMs: 60000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).not.toHaveBeenCalled() + + await hook.flush() + }) + + it('flushes when batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3, intervalMs: 60000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + hook(createTestContext(3)) + + await vi.runAllTimersAsync() + + expect(drain).toHaveBeenCalledTimes(1) + const batch = drain.mock.calls[0]![0] as DrainContext[] + expect(batch).toHaveLength(3) + expect(batch[0]!.event.id).toBe(1) + expect(batch[1]!.event.id).toBe(2) + expect(batch[2]!.event.id).toBe(3) + }) + + it('splits into multiple batches', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 3 } })(drain) + + for (let i = 1; i <= 7; i++) hook(createTestContext(i)) + + await hook.flush() + + expect(drain).toHaveBeenCalledTimes(3) + expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(3) + expect((drain.mock.calls[1]![0] as DrainContext[])).toHaveLength(3) + expect((drain.mock.calls[2]![0] as DrainContext[])).toHaveLength(1) + }) + }) + + describe('batching by interval', () => { + it('flushes after interval expires', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 100, intervalMs: 5000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + await vi.advanceTimersByTimeAsync(4999) + expect(drain).not.toHaveBeenCalled() + + await vi.advanceTimersByTimeAsync(1) + expect(drain).toHaveBeenCalledTimes(1) + expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(2) + }) + + it('resets interval when batch size is reached', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 2, intervalMs: 5000 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + + // Batch size reached, should flush immediately + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Push one more - should start interval timer, not flush immediately + hook(createTestContext(3)) + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Wait for interval + await vi.advanceTimersByTimeAsync(5000) + expect(drain).toHaveBeenCalledTimes(2) + }) + }) + + describe('retry', () => { + it('retries on failure with exponential backoff', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail 1')) + .mockRejectedValueOnce(new Error('fail 2')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 3, backoff: 'exponential', initialDelayMs: 100, maxDelayMs: 10000 }, + })(drain) + + hook(createTestContext(1)) + + // First attempt (immediate) + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Wait for first retry delay (100ms * 2^0 = 100ms) + await vi.advanceTimersByTimeAsync(100) + expect(drain).toHaveBeenCalledTimes(2) + + // Wait for second retry delay (100ms * 2^1 = 200ms) + await vi.advanceTimersByTimeAsync(200) + expect(drain).toHaveBeenCalledTimes(3) + }) + + it('retries with linear backoff', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail')) + .mockRejectedValueOnce(new Error('fail')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 3, backoff: 'linear', initialDelayMs: 100 }, + })(drain) + + hook(createTestContext(1)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Linear: 100 * 1 = 100ms + await vi.advanceTimersByTimeAsync(100) + expect(drain).toHaveBeenCalledTimes(2) + + // Linear: 100 * 2 = 200ms + await vi.advanceTimersByTimeAsync(200) + expect(drain).toHaveBeenCalledTimes(3) + }) + + it('retries with fixed backoff', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 2, backoff: 'fixed', initialDelayMs: 100 }, + })(drain) + + hook(createTestContext(1)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Fixed: always 100ms + await vi.advanceTimersByTimeAsync(100) + expect(drain).toHaveBeenCalledTimes(2) + }) + + it('caps retry delay at maxDelayMs', async () => { + const drain = vi.fn() + .mockRejectedValueOnce(new Error('fail')) + .mockResolvedValueOnce(undefined) + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 2, backoff: 'exponential', initialDelayMs: 1000, maxDelayMs: 500 }, + })(drain) + + hook(createTestContext(1)) + + await vi.advanceTimersByTimeAsync(0) + expect(drain).toHaveBeenCalledTimes(1) + + // Exponential: 1000 * 2^0 = 1000ms, capped to 500ms + await vi.advanceTimersByTimeAsync(500) + expect(drain).toHaveBeenCalledTimes(2) + }) + + it('calls onDropped after all retries exhausted', async () => { + const drain = vi.fn().mockRejectedValue(new Error('permanent failure')) + const onDropped = vi.fn() + + const hook = createDrainPipeline({ + batch: { size: 1 }, + retry: { maxAttempts: 2, backoff: 'fixed', initialDelayMs: 50 }, + onDropped, + })(drain) + + hook(createTestContext(1)) + + await vi.runAllTimersAsync() + + expect(drain).toHaveBeenCalledTimes(2) + expect(onDropped).toHaveBeenCalledTimes(1) + expect((onDropped.mock.calls[0]![0] as DrainContext[])).toHaveLength(1) + expect(onDropped.mock.calls[0]![1]).toBeInstanceOf(Error) + expect((onDropped.mock.calls[0]![1] as Error).message).toBe('permanent failure') + }) + }) + + describe('buffer overflow', () => { + it('drops oldest events and calls onDropped when buffer is full', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const onDropped = vi.fn() + + const hook = createDrainPipeline({ + batch: { size: 100, intervalMs: 60000 }, + maxBufferSize: 3, + onDropped, + })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + hook(createTestContext(3)) + expect(onDropped).not.toHaveBeenCalled() + + // Buffer full - should drop oldest + hook(createTestContext(4)) + expect(onDropped).toHaveBeenCalledTimes(1) + expect((onDropped.mock.calls[0]![0] as DrainContext[])).toHaveLength(1) + expect((onDropped.mock.calls[0]![0] as DrainContext[])[0]!.event.id).toBe(1) + + expect(hook.pending).toBe(3) + + // Flush and verify the remaining events are 2, 3, 4 + await hook.flush() + const batch = drain.mock.calls[0]![0] as DrainContext[] + expect(batch.map(c => c.event.id)).toEqual([2, 3, 4]) + }) + }) + + describe('flush()', () => { + it('drains all buffered events', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 100 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + hook(createTestContext(3)) + + await hook.flush() + + expect(drain).toHaveBeenCalledTimes(1) + expect((drain.mock.calls[0]![0] as DrainContext[])).toHaveLength(3) + expect(hook.pending).toBe(0) + }) + + it('is safe to call when buffer is empty', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline()(drain) + + await hook.flush() + + expect(drain).not.toHaveBeenCalled() + }) + + it('flush drains events that arrived during active flush', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 2, intervalMs: 60000 } })(drain) + + // Push 2 events to trigger auto-flush + hook(createTestContext(1)) + hook(createTestContext(2)) + + // Push more while flush may be in progress + hook(createTestContext(3)) + + // Explicit flush should drain everything + await hook.flush() + + expect(hook.pending).toBe(0) + const allEvents = drain.mock.calls.flatMap(call => call[0] as DrainContext[]) + expect(allEvents).toHaveLength(3) + }) + }) + + describe('pending', () => { + it('returns current buffer size', () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 100, intervalMs: 60000 } })(drain) + + expect(hook.pending).toBe(0) + hook(createTestContext(1)) + expect(hook.pending).toBe(1) + hook(createTestContext(2)) + expect(hook.pending).toBe(2) + }) + + it('returns 0 after flush', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline({ batch: { size: 100 } })(drain) + + hook(createTestContext(1)) + hook(createTestContext(2)) + expect(hook.pending).toBe(2) + + await hook.flush() + expect(hook.pending).toBe(0) + }) + }) + + describe('defaults', () => { + it('uses default options when none provided', async () => { + const drain = vi.fn().mockResolvedValue(undefined) + const hook = createDrainPipeline()(drain) + + hook(createTestContext(1)) + + // Default interval is 5000ms + await vi.advanceTimersByTimeAsync(4999) + expect(drain).not.toHaveBeenCalled() + + await vi.advanceTimersByTimeAsync(1) + expect(drain).toHaveBeenCalledTimes(1) + }) + }) +}) diff --git a/packages/evlog/tsdown.config.ts b/packages/evlog/tsdown.config.ts index 1198b3f..d57bc47 100644 --- a/packages/evlog/tsdown.config.ts +++ b/packages/evlog/tsdown.config.ts @@ -21,6 +21,7 @@ export default defineConfig({ 'adapters/posthog': 'src/adapters/posthog.ts', 'adapters/sentry': 'src/adapters/sentry.ts', 'enrichers': 'src/enrichers/index.ts', + 'pipeline': 'src/pipeline.ts', }, format: 'esm', dts: true, From 66d5d31c243c32b4576c0b6be86b08f496ece6d4 Mon Sep 17 00:00:00 2001 From: Hugo Richard Date: Mon, 9 Feb 2026 09:31:32 +0000 Subject: [PATCH 4/5] fix lint --- packages/evlog/src/pipeline.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/evlog/src/pipeline.ts b/packages/evlog/src/pipeline.ts index e720ac9..372c727 100644 --- a/packages/evlog/src/pipeline.ts +++ b/packages/evlog/src/pipeline.ts @@ -100,8 +100,7 @@ export function createDrainPipeline(options?: DrainPipelineOptions< try { await drain(batch) return - } - catch (error) { + } catch (error) { lastError = error instanceof Error ? error : new Error(String(error)) if (attempt < maxAttempts) { await new Promise(r => setTimeout(r, getRetryDelay(attempt))) @@ -124,8 +123,7 @@ export function createDrainPipeline(options?: DrainPipelineOptions< activeFlush = null if (buffer.length >= batchSize) { startFlush() - } - else if (buffer.length > 0) { + } else if (buffer.length > 0) { scheduleFlush() } }) @@ -141,8 +139,7 @@ export function createDrainPipeline(options?: DrainPipelineOptions< if (buffer.length >= batchSize) { clearTimer() startFlush() - } - else if (!activeFlush) { + } else if (!activeFlush) { scheduleFlush() } } From 11cb24d87d70ea0b346a057b1efeeaaad8609dcc Mon Sep 17 00:00:00 2001 From: Hugo Richard Date: Mon, 9 Feb 2026 19:52:41 +0000 Subject: [PATCH 5/5] fix from code review --- apps/playground/app/config/tests.config.ts | 2 +- packages/evlog/src/adapters/axiom.ts | 6 +++- packages/evlog/src/adapters/posthog.ts | 6 +++- packages/evlog/src/adapters/sentry.ts | 6 +++- packages/evlog/src/pipeline.ts | 25 +++++++++++++++-- packages/evlog/test/pipeline.test.ts | 32 +++++++++++++++++++++- 6 files changed, 70 insertions(+), 7 deletions(-) diff --git a/apps/playground/app/config/tests.config.ts b/apps/playground/app/config/tests.config.ts index f8ee015..28103af 100644 --- a/apps/playground/app/config/tests.config.ts +++ b/apps/playground/app/config/tests.config.ts @@ -270,7 +270,7 @@ export const testConfig = { label: 'Pipeline', icon: 'i-lucide-layers', title: 'Drain Pipeline (Batching + Retry)', - description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for "[evlog/pipeline] Flushing batch of N events" messages.', + description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for batched drain output.', layout: 'cards', tests: [ { diff --git a/packages/evlog/src/adapters/axiom.ts b/packages/evlog/src/adapters/axiom.ts index 54d6d6b..950548d 100644 --- a/packages/evlog/src/adapters/axiom.ts +++ b/packages/evlog/src/adapters/axiom.ts @@ -58,7 +58,11 @@ export function createAxiomDrain(overrides?: Partial): (ctx: DrainC return } - await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig) + try { + await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig) + } catch (error) { + console.error('[evlog/axiom] Failed to send events to Axiom:', error) + } } } diff --git a/packages/evlog/src/adapters/posthog.ts b/packages/evlog/src/adapters/posthog.ts index 97a7faf..32a87d6 100644 --- a/packages/evlog/src/adapters/posthog.ts +++ b/packages/evlog/src/adapters/posthog.ts @@ -85,7 +85,11 @@ export function createPostHogDrain(overrides?: Partial): (ctx: Dr return } - await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig) + try { + await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig) + } catch (error) { + console.error('[evlog/posthog] Failed to send events to PostHog:', error) + } } } diff --git a/packages/evlog/src/adapters/sentry.ts b/packages/evlog/src/adapters/sentry.ts index 2c85ad0..dd99c3f 100644 --- a/packages/evlog/src/adapters/sentry.ts +++ b/packages/evlog/src/adapters/sentry.ts @@ -236,7 +236,11 @@ export function createSentryDrain(overrides?: Partial): (ctx: Drai return } - await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig) + try { + await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig) + } catch (error) { + console.error('[evlog/sentry] Failed to send events to Sentry:', error) + } } } diff --git a/packages/evlog/src/pipeline.ts b/packages/evlog/src/pipeline.ts index 9dda98b..65ba28e 100644 --- a/packages/evlog/src/pipeline.ts +++ b/packages/evlog/src/pipeline.ts @@ -57,6 +57,25 @@ export function createDrainPipeline(options?: DrainPipelineOptions< const maxDelayMs = options?.retry?.maxDelayMs ?? 30000 const onDropped = options?.onDropped + if (batchSize <= 0 || !Number.isFinite(batchSize)) { + throw new Error(`[evlog/pipeline] batch.size must be a positive finite number, got: ${batchSize}`) + } + if (intervalMs <= 0 || !Number.isFinite(intervalMs)) { + throw new Error(`[evlog/pipeline] batch.intervalMs must be a positive finite number, got: ${intervalMs}`) + } + if (maxBufferSize <= 0 || !Number.isFinite(maxBufferSize)) { + throw new Error(`[evlog/pipeline] maxBufferSize must be a positive finite number, got: ${maxBufferSize}`) + } + if (maxAttempts <= 0 || !Number.isFinite(maxAttempts)) { + throw new Error(`[evlog/pipeline] retry.maxAttempts must be a positive finite number, got: ${maxAttempts}`) + } + if (initialDelayMs < 0 || !Number.isFinite(initialDelayMs)) { + throw new Error(`[evlog/pipeline] retry.initialDelayMs must be a non-negative finite number, got: ${initialDelayMs}`) + } + if (maxDelayMs < 0 || !Number.isFinite(maxDelayMs)) { + throw new Error(`[evlog/pipeline] retry.maxDelayMs must be a non-negative finite number, got: ${maxDelayMs}`) + } + return (drain: (batch: T[]) => void | Promise): PipelineDrainFn => { const buffer: T[] = [] let timer: ReturnType | null = null @@ -149,7 +168,9 @@ export function createDrainPipeline(options?: DrainPipelineOptions< if (activeFlush) { await activeFlush } - await drainBuffer() + while (buffer.length > 0) { + await drainBuffer() + } } const hookFn = push as PipelineDrainFn @@ -161,4 +182,4 @@ export function createDrainPipeline(options?: DrainPipelineOptions< return hookFn } -} \ No newline at end of file +} diff --git a/packages/evlog/test/pipeline.test.ts b/packages/evlog/test/pipeline.test.ts index 16b7dce..892f8e9 100644 --- a/packages/evlog/test/pipeline.test.ts +++ b/packages/evlog/test/pipeline.test.ts @@ -353,4 +353,34 @@ describe('createDrainPipeline', () => { expect(drain).toHaveBeenCalledTimes(1) }) }) -}) \ No newline at end of file + + describe('input validation', () => { + it('throws on batch.size <= 0', () => { + expect(() => createDrainPipeline({ batch: { size: 0 } })).toThrow('batch.size must be a positive finite number') + }) + + it('throws on batch.size = -1', () => { + expect(() => createDrainPipeline({ batch: { size: -1 } })).toThrow('batch.size must be a positive finite number') + }) + + it('throws on batch.intervalMs <= 0', () => { + expect(() => createDrainPipeline({ batch: { intervalMs: 0 } })).toThrow('batch.intervalMs must be a positive finite number') + }) + + it('throws on maxBufferSize <= 0', () => { + expect(() => createDrainPipeline({ maxBufferSize: 0 })).toThrow('maxBufferSize must be a positive finite number') + }) + + it('throws on retry.maxAttempts <= 0', () => { + expect(() => createDrainPipeline({ retry: { maxAttempts: 0 } })).toThrow('retry.maxAttempts must be a positive finite number') + }) + + it('throws on non-finite batch.size', () => { + expect(() => createDrainPipeline({ batch: { size: Infinity } })).toThrow('batch.size must be a positive finite number') + }) + + it('throws on NaN batch.size', () => { + expect(() => createDrainPipeline({ batch: { size: NaN } })).toThrow('batch.size must be a positive finite number') + }) + }) +})