feat: add shared drain pipeline for batching and retry #57
Pull request overview
Adds a new shared drain “pipeline” to evlog to support buffered batching with retry semantics, and updates several drains and the playground to exercise batch delivery.
Changes:
- Add `createDrainPipeline` (buffering + batch flush on size/interval + retry/backoff + overflow dropping) and export it as `evlog/pipeline`.
- Update Axiom/PostHog/Sentry drains to accept batched contexts and send via new batch senders.
- Add Vitest coverage for batching/retry/overflow/flush behaviors and add a playground UI section to trigger pipeline batching.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| packages/evlog/tsdown.config.ts | Adds pipeline build entry so it’s emitted to dist. |
| packages/evlog/src/pipeline.ts | Introduces the batching/retry pipeline implementation and its public API. |
| packages/evlog/test/pipeline.test.ts | Adds tests for batching, interval flush, retry strategies, buffer overflow, and flush/pending. |
| packages/evlog/src/adapters/axiom.ts | Updates drain to accept batch inputs and uses batch sender. |
| packages/evlog/src/adapters/posthog.ts | Updates drain to accept batch inputs and uses batch sender. |
| packages/evlog/src/adapters/sentry.ts | Updates drain to accept batch inputs and uses batch sender. |
| packages/evlog/package.json | Exposes ./pipeline via exports/typesVersions. |
| apps/playground/app/pages/index.vue | Adds a UI action to fire 10 requests in parallel for batching. |
| apps/playground/app/config/tests.config.ts | Adds a “Pipeline” test section to the playground UI. |
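Going by the signatures visible in this diff, the pipeline's public shape is roughly `createDrainPipeline(options)(drain)`, returning a push function that also exposes `.flush()`. The following is a minimal, self-contained sketch of just the size-triggered part of that behavior; the names and implementation are illustrative, not the PR's actual code, which additionally handles interval flushes, retry/backoff, and overflow dropping.

```typescript
// Illustrative mini-pipeline: createMiniPipeline(batchSize)(drain) returns a
// push function with a .flush() method, mirroring the PR's API shape.
function createMiniPipeline<T>(batchSize: number) {
  return (drain: (batch: T[]) => void) => {
    const buffer: T[] = []
    return Object.assign(
      (event: T) => {
        buffer.push(event)
        // flush as soon as a full batch is buffered
        if (buffer.length >= batchSize) drain(buffer.splice(0, batchSize))
      },
      {
        // flush() drains whatever remains, even a partial batch
        flush: () => {
          while (buffer.length > 0) drain(buffer.splice(0, batchSize))
        },
      },
    )
  }
}

// Usage: collect batches in memory instead of sending them to a service
const batches: number[][] = []
const push = createMiniPipeline<number>(3)(batch => batches.push(batch))
for (let i = 1; i <= 7; i++) push(i)
push.flush()
console.log(JSON.stringify(batches)) // [[1,2,3],[4,5,6],[7]]
```

Note that `buffer.splice(0, batchSize)` both removes and returns the batch in one step, which is why a positive `batchSize` matters (see the validation comment below on what happens when it is not).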
```ts
title: 'Drain Pipeline (Batching + Retry)',
description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for "[evlog/pipeline] Flushing batch of N events" messages.',
layout: 'cards',
```
The UI description says to watch for "[evlog/pipeline] Flushing batch of N events" logs, but there’s no corresponding log output in the new pipeline implementation. Either add the logging where batches are flushed, or adjust this description so it matches actual behavior.
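If the choice is to add the log rather than change the description, one option is a small helper that produces exactly the message the playground tells users to watch for. The helper name and its placement inside the flush path are hypothetical, a sketch rather than the PR's code:

```typescript
// Hypothetical helper: format the flush log line the playground UI references.
function flushMessage(count: number): string {
  return `[evlog/pipeline] Flushing batch of ${count} events`
}

// Sketch of placement: inside drainBuffer(), just before calling drain(batch):
//   console.log(flushMessage(batch.length))
console.log(flushMessage(5)) // [evlog/pipeline] Flushing batch of 5 events
```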
```ts
const batchSize = options?.batch?.size ?? 50
const intervalMs = options?.batch?.intervalMs ?? 5000
const maxBufferSize = options?.maxBufferSize ?? 1000
const maxAttempts = options?.retry?.maxAttempts ?? 3
const backoffStrategy = options?.retry?.backoff ?? 'exponential'
const initialDelayMs = options?.retry?.initialDelayMs ?? 1000
const maxDelayMs = options?.retry?.maxDelayMs ?? 30000
```
batchSize, intervalMs, maxBufferSize, and retry options are used without validation. Certain values can break the pipeline (e.g., batch.size <= 0 makes drainBuffer() loop forever because splice(0, 0) never reduces the buffer; maxAttempts <= 0 skips sending entirely; maxBufferSize <= 0 still allows buffering). Add input validation/clamping (and ideally throw a clear error) for these options before constructing the pipeline.
Suggested change:

```diff
-const batchSize = options?.batch?.size ?? 50
-const intervalMs = options?.batch?.intervalMs ?? 5000
-const maxBufferSize = options?.maxBufferSize ?? 1000
-const maxAttempts = options?.retry?.maxAttempts ?? 3
-const backoffStrategy = options?.retry?.backoff ?? 'exponential'
-const initialDelayMs = options?.retry?.initialDelayMs ?? 1000
-const maxDelayMs = options?.retry?.maxDelayMs ?? 30000
+const rawBatchSize = options?.batch?.size
+if (rawBatchSize !== undefined) {
+  if (!Number.isFinite(rawBatchSize) || rawBatchSize <= 0) {
+    throw new Error(`DrainPipelineOptions.batch.size must be a positive finite number, got: ${rawBatchSize}`)
+  }
+}
+const batchSize = rawBatchSize ?? 50
+const rawIntervalMs = options?.batch?.intervalMs
+if (rawIntervalMs !== undefined) {
+  if (!Number.isFinite(rawIntervalMs) || rawIntervalMs <= 0) {
+    throw new Error(`DrainPipelineOptions.batch.intervalMs must be a positive finite number, got: ${rawIntervalMs}`)
+  }
+}
+const intervalMs = rawIntervalMs ?? 5000
+const rawMaxBufferSize = options?.maxBufferSize
+if (rawMaxBufferSize !== undefined) {
+  if (!Number.isFinite(rawMaxBufferSize) || rawMaxBufferSize <= 0) {
+    throw new Error(`DrainPipelineOptions.maxBufferSize must be a positive finite number, got: ${rawMaxBufferSize}`)
+  }
+}
+const maxBufferSize = rawMaxBufferSize ?? 1000
+const rawMaxAttempts = options?.retry?.maxAttempts
+if (rawMaxAttempts !== undefined) {
+  if (!Number.isFinite(rawMaxAttempts) || rawMaxAttempts <= 0) {
+    throw new Error(`DrainPipelineOptions.retry.maxAttempts must be a positive finite number, got: ${rawMaxAttempts}`)
+  }
+}
+const maxAttempts = rawMaxAttempts ?? 3
+const backoffStrategy = options?.retry?.backoff ?? 'exponential'
+const rawInitialDelayMs = options?.retry?.initialDelayMs
+if (rawInitialDelayMs !== undefined) {
+  if (!Number.isFinite(rawInitialDelayMs) || rawInitialDelayMs < 0) {
+    throw new Error(`DrainPipelineOptions.retry.initialDelayMs must be a non-negative finite number, got: ${rawInitialDelayMs}`)
+  }
+}
+const initialDelayMs = rawInitialDelayMs ?? 1000
+const rawMaxDelayMs = options?.retry?.maxDelayMs
+if (rawMaxDelayMs !== undefined) {
+  if (!Number.isFinite(rawMaxDelayMs) || rawMaxDelayMs < 0) {
+    throw new Error(`DrainPipelineOptions.retry.maxDelayMs must be a non-negative finite number, got: ${rawMaxDelayMs}`)
+  }
+}
+const maxDelayMs = rawMaxDelayMs ?? 30000
```
```ts
    await activeFlush
  }
  await drainBuffer()
}

const hookFn = push as PipelineDrainFn<T>
hookFn.flush = flush
```
flush() calls drainBuffer() directly without marking activeFlush. If events are pushed while flush() is awaiting drain(batch), push() will see !activeFlush and schedule a timer instead of coordinating with the in-progress flush, so flush() can resolve while new events remain buffered (breaking the expectation that flush() drains everything). Consider setting activeFlush during flush() (similar to startFlush()), and/or looping until buffer.length === 0 after awaiting current work.
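A synchronous sketch of the loop-until-empty half of this suggestion (the `activeFlush` bookkeeping is omitted for brevity, and `drain` here simulates an event arriving mid-flush; all names are illustrative):

```typescript
const buffer: number[] = []
const sent: number[][] = []
let refilled = false

function drain(batch: number[]) {
  sent.push(batch)
  // simulate a new event being pushed while a batch is being drained
  if (!refilled) {
    refilled = true
    buffer.push(99)
  }
}

function flush() {
  // A single drainBuffer() pass would miss the event pushed during drain();
  // looping until the buffer is empty guarantees flush() drains everything.
  while (buffer.length > 0) drain(buffer.splice(0, 2))
}

buffer.push(1, 2, 3)
flush()
console.log(JSON.stringify(sent)) // [[1,2],[3,99]]
```

The mid-flush event `99` ends up in the second batch instead of lingering in the buffer after `flush()` returns, which is the guarantee the review comment asks for.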
```ts
export function createDrainPipeline<T = unknown>(options?: DrainPipelineOptions<T>): (drain: (ctx: T | T[]) => void | Promise<void>) => PipelineDrainFn<T> {
  const batchSize = options?.batch?.size ?? 50
  const intervalMs = options?.batch?.intervalMs ?? 5000
  const maxBufferSize = options?.maxBufferSize ?? 1000
```
There was a problem hiding this comment.
The pipeline always invokes drain(batch) with an array, but the type signature accepts T | T[] and there’s no doc comment clarifying that drains should be batch-capable. This makes it easy for consumers to pass an existing single-event drain and either fail type-checking or get unexpected runtime input. Consider documenting that drain will be called with T[] (and optionally provide an overload/helper that adapts (ctx: T) => ... drains by wrapping them).
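Such a helper could look like the sketch below. `adaptSingleDrain` is a hypothetical name, and this synchronous variant ignores promise-returning drains for brevity; a real helper would `await` each `drain(ctx)` call.

```typescript
// Hypothetical adapter: turn a single-event drain into the batch-capable
// shape the pipeline actually calls (always with an array of events).
function adaptSingleDrain<T>(drain: (ctx: T) => void): (batch: T[]) => void {
  return batch => {
    // deliver one event at a time, preserving batch order
    for (const ctx of batch) drain(ctx)
  }
}

// Usage: an existing single-event drain keeps working behind the pipeline
const seen: string[] = []
const batchDrain = adaptSingleDrain<string>(ctx => seen.push(ctx))
batchDrain(['a', 'b', 'c'])
console.log(seen.join(',')) // a,b,c
```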
```diff
-  } catch (error) {
-    console.error('[evlog/axiom] Failed to send event:', error)
-  }
+  await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
```
createAxiomDrain no longer catches and logs send errors; it now lets sendBatchToAxiom rejections propagate. This is inconsistent with the stated evlog:drain contract (errors are logged and never block the request) and with createOTLPDrain, which still wraps sendToOTLP in a try/catch. Consider restoring a try/catch here (or making error propagation an explicit opt-in) so hook errors don’t leak as unhandled promise rejections.
Suggested change:

```diff
-await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
+try {
+  await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
+} catch (error) {
+  console.error('[evlog/axiom] Failed to send logs to Axiom:', error)
+}
```
```diff
-  } catch (error) {
-    console.error('[evlog/posthog] Failed to send event:', error)
-  }
+  await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
```
createPostHogDrain now calls sendBatchToPostHog without a try/catch, so failures will reject the drain hook. This differs from createOTLPDrain (which catches/logs) and from the documented expectation that drain errors are logged and never block requests. Consider reintroducing error handling here (or making throwing behavior explicit/opt-in).
Suggested change:

```diff
-await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
+try {
+  await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
+} catch (error) {
+  console.error('[evlog/posthog] Failed to send events to PostHog:', error)
+}
```
```diff
-  } catch (error) {
-    console.error('[evlog/sentry] Failed to send log:', error)
-  }
+  await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
```
createSentryDrain no longer catches and logs send failures; sendBatchToSentry throws on non-2xx responses, which will reject the drain hook. This is inconsistent with createOTLPDrain and with the evlog:drain documentation that errors are logged and never block the request. Consider restoring try/catch logging (or an explicit configuration flag to control throwing).
Suggested change:

```diff
-await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
+try {
+  await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
+} catch (error) {
+  console.error('[evlog/sentry] Failed to send logs to Sentry:', error)
+}
```
This pull request introduces a new batching and retry pipeline for event drains in the `evlog` package, enabling efficient, configurable buffering and delivery of events to external services. It updates the Axiom, PostHog, and Sentry adapters to support batch delivery, adds comprehensive tests for the new pipeline, and integrates the pipeline into the playground app for interactive testing.