
feat: add shared drain pipeline for batching and retry #57

Draft
HugoRCD wants to merge 2 commits into main from feat/drain-pipeline

Conversation

@HugoRCD (Owner) commented Feb 8, 2026

This pull request introduces a new batching and retry pipeline for event drains in the evlog package, enabling efficient, configurable buffering and delivery of events to external services. It updates the Axiom, PostHog, and Sentry adapters to support batch delivery, adds comprehensive tests for the new pipeline, and integrates the pipeline into the playground app for interactive testing.

@HugoRCD self-assigned this Feb 8, 2026
vercel bot commented Feb 8, 2026

The latest updates on your projects.

| Project | Deployment | Actions | Updated (UTC) |
| --- | --- | --- | --- |
| evlog-docs | Ready | Preview, Comment | Feb 8, 2026 9:09pm |

@HugoRCD requested a review from Copilot February 8, 2026 21:08
github-actions bot commented Feb 8, 2026

Thank you for following the naming conventions! 🙏

pkg-pr-new bot commented Feb 8, 2026

```sh
npm i https://pkg.pr.new/evlog@57
```

commit: 749936a

Copilot AI (Contributor) left a comment

Pull request overview

Adds a new shared drain “pipeline” to evlog to support buffered batching with retry semantics, and updates several drains and the playground to exercise batch delivery.

Changes:

  • Add createDrainPipeline (buffering + batch flush on size/interval + retry/backoff + overflow dropping) and export it as evlog/pipeline (see the usage sketch after this list).
  • Update Axiom/PostHog/Sentry drains to accept batched contexts and send via new batch senders.
  • Add Vitest coverage for batching/retry/overflow/flush behaviors and add a playground UI section to trigger pipeline batching.
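
A minimal usage sketch based on this overview; the event shape, sender, and endpoint below are hypothetical stand-ins, and the option names mirror the defaults quoted in the review snippets further down:

```ts
import { createDrainPipeline } from 'evlog/pipeline'

// Hypothetical context shape and batch sender, for illustration only.
interface EventCtx { event: Record<string, unknown> }

async function sendBatch(batch: EventCtx[]): Promise<void> {
  await fetch('https://collector.example.com/ingest', {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify(batch.map(c => c.event)),
  })
}

// Buffering and retry configuration (defaults shown in the review below).
const pipeline = createDrainPipeline<EventCtx>({
  batch: { size: 5, intervalMs: 2000 },
  retry: { maxAttempts: 3, backoff: 'exponential', initialDelayMs: 1000, maxDelayMs: 30000 },
  maxBufferSize: 1000,
})

// The factory wraps a drain. Per the reviewed signature the drain receives
// T | T[], so normalize to an array before sending.
const push = pipeline(async (ctx) => {
  await sendBatch(Array.isArray(ctx) ? ctx : [ctx])
})

push({ event: { name: 'user.signup' } }) // buffers one event
await push.flush()                       // force-delivers anything still buffered
```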

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.

Summary per file:

| File | Description |
| --- | --- |
| packages/evlog/tsdown.config.ts | Adds pipeline build entry so it's emitted to dist. |
| packages/evlog/src/pipeline.ts | Introduces the batching/retry pipeline implementation and its public API. |
| packages/evlog/test/pipeline.test.ts | Adds tests for batching, interval flush, retry strategies, buffer overflow, and flush/pending. |
| packages/evlog/src/adapters/axiom.ts | Updates drain to accept batch inputs and uses batch sender. |
| packages/evlog/src/adapters/posthog.ts | Updates drain to accept batch inputs and uses batch sender. |
| packages/evlog/src/adapters/sentry.ts | Updates drain to accept batch inputs and uses batch sender. |
| packages/evlog/package.json | Exposes ./pipeline via exports/typesVersions. |
| apps/playground/app/pages/index.vue | Adds a UI action to fire 10 requests in parallel for batching. |
| apps/playground/app/config/tests.config.ts | Adds a "Pipeline" test section to the playground UI. |


Comment on lines +272 to +274:

```ts
title: 'Drain Pipeline (Batching + Retry)',
description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for "[evlog/pipeline] Flushing batch of N events" messages.',
layout: 'cards',
```
Copilot AI Feb 8, 2026

The UI description says to watch for "[evlog/pipeline] Flushing batch of N events" logs, but there’s no corresponding log output in the new pipeline implementation. Either add the logging where batches are flushed, or adjust this description so it matches actual behavior.
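
If the description's wording is kept, a minimal sketch of where the log could be emitted, with `buffer`, `batchSize`, and `drain` standing in for the pipeline's closure state:

```ts
// Sketch only: these declarations stand in for closure state the PR defines.
declare const buffer: unknown[]
declare const batchSize: number
declare const drain: (batch: unknown[]) => void | Promise<void>

// Log each batch at the point it is handed to the drain.
async function drainBuffer(): Promise<void> {
  while (buffer.length > 0) {
    const batch = buffer.splice(0, batchSize)
    console.log(`[evlog/pipeline] Flushing batch of ${batch.length} events`)
    await drain(batch)
  }
}
```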

Comment on lines +31 to +37:

```ts
const batchSize = options?.batch?.size ?? 50
const intervalMs = options?.batch?.intervalMs ?? 5000
const maxBufferSize = options?.maxBufferSize ?? 1000
const maxAttempts = options?.retry?.maxAttempts ?? 3
const backoffStrategy = options?.retry?.backoff ?? 'exponential'
const initialDelayMs = options?.retry?.initialDelayMs ?? 1000
const maxDelayMs = options?.retry?.maxDelayMs ?? 30000
```
Copilot AI Feb 8, 2026

batchSize, intervalMs, maxBufferSize, and retry options are used without validation. Certain values can break the pipeline (e.g., batch.size <= 0 makes drainBuffer() loop forever because splice(0, 0) never reduces the buffer; maxAttempts <= 0 skips sending entirely; maxBufferSize <= 0 still allows buffering). Add input validation/clamping (and ideally throw a clear error) for these options before constructing the pipeline.

Suggested change:

```ts
const rawBatchSize = options?.batch?.size
if (rawBatchSize !== undefined) {
  if (!Number.isFinite(rawBatchSize) || rawBatchSize <= 0) {
    throw new Error(`DrainPipelineOptions.batch.size must be a positive finite number, got: ${rawBatchSize}`)
  }
}
const batchSize = rawBatchSize ?? 50

const rawIntervalMs = options?.batch?.intervalMs
if (rawIntervalMs !== undefined) {
  if (!Number.isFinite(rawIntervalMs) || rawIntervalMs <= 0) {
    throw new Error(`DrainPipelineOptions.batch.intervalMs must be a positive finite number, got: ${rawIntervalMs}`)
  }
}
const intervalMs = rawIntervalMs ?? 5000

const rawMaxBufferSize = options?.maxBufferSize
if (rawMaxBufferSize !== undefined) {
  if (!Number.isFinite(rawMaxBufferSize) || rawMaxBufferSize <= 0) {
    throw new Error(`DrainPipelineOptions.maxBufferSize must be a positive finite number, got: ${rawMaxBufferSize}`)
  }
}
const maxBufferSize = rawMaxBufferSize ?? 1000

const rawMaxAttempts = options?.retry?.maxAttempts
if (rawMaxAttempts !== undefined) {
  if (!Number.isFinite(rawMaxAttempts) || rawMaxAttempts <= 0) {
    throw new Error(`DrainPipelineOptions.retry.maxAttempts must be a positive finite number, got: ${rawMaxAttempts}`)
  }
}
const maxAttempts = rawMaxAttempts ?? 3

const backoffStrategy = options?.retry?.backoff ?? 'exponential'

const rawInitialDelayMs = options?.retry?.initialDelayMs
if (rawInitialDelayMs !== undefined) {
  if (!Number.isFinite(rawInitialDelayMs) || rawInitialDelayMs < 0) {
    throw new Error(`DrainPipelineOptions.retry.initialDelayMs must be a non-negative finite number, got: ${rawInitialDelayMs}`)
  }
}
const initialDelayMs = rawInitialDelayMs ?? 1000

const rawMaxDelayMs = options?.retry?.maxDelayMs
if (rawMaxDelayMs !== undefined) {
  if (!Number.isFinite(rawMaxDelayMs) || rawMaxDelayMs < 0) {
    throw new Error(`DrainPipelineOptions.retry.maxDelayMs must be a non-negative finite number, got: ${rawMaxDelayMs}`)
  }
}
const maxDelayMs = rawMaxDelayMs ?? 30000
```
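
Relatedly, a hedged sketch of how these options could drive the retry delay; the helper name is ours, and since only 'exponential' appears in the reviewed defaults, the fallback branch is illustrative:

```ts
// Hypothetical helper: exponential backoff doubles from initialDelayMs per
// attempt and is capped at maxDelayMs; other strategies fall back to a
// fixed delay here. Parameter names mirror the options validated above.
function backoffDelay(attempt: number, strategy: string, initialDelayMs: number, maxDelayMs: number): number {
  const delay = strategy === 'exponential'
    ? initialDelayMs * 2 ** (attempt - 1) // attempt is 1-based
    : initialDelayMs
  return Math.min(delay, maxDelayMs)
}
```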

Comment on lines +130 to +136:

```ts
      await activeFlush
    }
    await drainBuffer()
  }

  const hookFn = push as PipelineDrainFn<T>
  hookFn.flush = flush
```
Copilot AI Feb 8, 2026

flush() calls drainBuffer() directly without marking activeFlush. If events are pushed while flush() is awaiting drain(batch), push() will see !activeFlush and schedule a timer instead of coordinating with the in-progress flush, so flush() can resolve while new events remain buffered (breaking the expectation that flush() drains everything). Consider setting activeFlush during flush() (similar to startFlush()), and/or looping until buffer.length === 0 after awaiting current work.
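
A sketch of one way to close that gap, assuming the closure state (`buffer`, `activeFlush`, `drainBuffer`) visible in the snippets above:

```ts
// Sketch only: these declarations stand in for closure state the PR defines.
declare const buffer: unknown[]
declare function drainBuffer(): Promise<void>
let activeFlush: Promise<void> | null = null

async function flush(): Promise<void> {
  // Wait out any timer-driven flush already in progress.
  if (activeFlush) {
    await activeFlush
  }
  // Loop until nothing new was buffered while we awaited, and publish the
  // work through activeFlush so concurrent push() calls can coordinate.
  while (buffer.length > 0) {
    activeFlush = drainBuffer()
    try {
      await activeFlush
    }
    finally {
      activeFlush = null
    }
  }
}
```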

Comment on lines +30 to +33:

```ts
export function createDrainPipeline<T = unknown>(options?: DrainPipelineOptions<T>): (drain: (ctx: T | T[]) => void | Promise<void>) => PipelineDrainFn<T> {
  const batchSize = options?.batch?.size ?? 50
  const intervalMs = options?.batch?.intervalMs ?? 5000
  const maxBufferSize = options?.maxBufferSize ?? 1000
```
Copilot AI Feb 8, 2026

The pipeline always invokes drain(batch) with an array, but the type signature accepts T | T[] and there’s no doc comment clarifying that drains should be batch-capable. This makes it easy for consumers to pass an existing single-event drain and either fail type-checking or get unexpected runtime input. Consider documenting that drain will be called with T[] (and optionally provide an overload/helper that adapts (ctx: T) => ... drains by wrapping them).
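
A hypothetical adapter (the name is ours) that lifts a single-event drain into the batch-capable shape the pipeline expects:

```ts
// Wraps a (ctx: T) => ... drain so it can accept the arrays the pipeline
// sends; events in a batch are delivered sequentially.
function adaptSingleDrain<T>(
  drain: (ctx: T) => void | Promise<void>,
): (ctx: T | T[]) => Promise<void> {
  return async (ctx) => {
    const batch = Array.isArray(ctx) ? ctx : [ctx]
    for (const event of batch) {
      await drain(event)
    }
  }
}

// e.g. const push = createDrainPipeline<MyCtx>()(adaptSingleDrain(existingDrain))
```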

In packages/evlog/src/adapters/axiom.ts:

```diff
-    } catch (error) {
-      console.error('[evlog/axiom] Failed to send event:', error)
-    }
+    await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
```
Copilot AI Feb 8, 2026

createAxiomDrain no longer catches and logs send errors; it now lets sendBatchToAxiom rejections propagate. This is inconsistent with the stated evlog:drain contract (errors are logged and never block the request) and with createOTLPDrain, which still wraps sendToOTLP in a try/catch. Consider restoring a try/catch here (or making error propagation an explicit opt-in) so hook errors don’t leak as unhandled promise rejections.

Suggested change:

```ts
try {
  await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
} catch (error) {
  console.error('[evlog/axiom] Failed to send logs to Axiom:', error)
}
```

In packages/evlog/src/adapters/posthog.ts:

```diff
-    } catch (error) {
-      console.error('[evlog/posthog] Failed to send event:', error)
-    }
+    await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
```
Copilot AI Feb 8, 2026

createPostHogDrain now calls sendBatchToPostHog without a try/catch, so failures will reject the drain hook. This differs from createOTLPDrain (which catches/logs) and from the documented expectation that drain errors are logged and never block requests. Consider reintroducing error handling here (or making throwing behavior explicit/opt-in).

Suggested change:

```ts
try {
  await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
} catch (error) {
  console.error('[evlog/posthog] Failed to send events to PostHog:', error)
}
```

In packages/evlog/src/adapters/sentry.ts:

```diff
-    } catch (error) {
-      console.error('[evlog/sentry] Failed to send log:', error)
-    }
+    await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
```
Copilot AI Feb 8, 2026

createSentryDrain no longer catches and logs send failures; sendBatchToSentry throws on non-2xx responses, which will reject the drain hook. This is inconsistent with createOTLPDrain and with the evlog:drain documentation that errors are logged and never block the request. Consider restoring try/catch logging (or an explicit configuration flag to control throwing).

Suggested change:

```ts
try {
  await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
} catch (error) {
  console.error('[evlog/sentry] Failed to send logs to Sentry:', error)
}
```
