38 changes: 38 additions & 0 deletions apps/playground/app/config/tests.config.ts
@@ -265,6 +265,44 @@ export const testConfig = {
},
],
} as TestSection,
{
id: 'pipeline',
label: 'Pipeline',
icon: 'i-lucide-layers',
title: 'Drain Pipeline (Batching + Retry)',
description: 'Events are buffered and sent in batches (size: 5, interval: 2s). Watch the terminal for "[evlog/pipeline] Flushing batch of N events" messages.',
layout: 'cards',
Comment on lines +272 to +274
Copilot AI (Feb 8, 2026):
The UI description says to watch for "[evlog/pipeline] Flushing batch of N events" logs, but there's no corresponding log output in the new pipeline implementation. Either add the logging where batches are flushed, or adjust this description so it matches actual behavior.
tests: [
{
id: 'pipeline-single',
label: '1 Request',
description: 'Single event - buffered until batch size (5) or interval (2s) is reached',
endpoint: '/api/test/success',
method: 'GET',
badge: {
label: 'Buffered',
color: 'blue',
},
toastOnSuccess: {
title: 'Event buffered',
description: 'Check terminal - will flush after 2s or when 5 events accumulate',
},
},
{
id: 'pipeline-batch',
label: 'Fire 10 Requests',
description: 'Fires 10 requests in parallel - should produce 2 batches of 5 events',
badge: {
label: '2 batches',
color: 'green',
},
toastOnSuccess: {
title: '10 requests sent',
description: 'Check terminal - should see 2 batches of 5 events',
},
},
],
} as TestSection,
{
id: 'drains',
label: 'Drains',
9 changes: 9 additions & 0 deletions apps/playground/app/pages/index.vue
@@ -39,6 +39,12 @@ async function handleBatchRequest() {
)
}

async function handlePipelineBatch() {
await Promise.all(
Array.from({ length: 10 }, () => $fetch('/api/test/success')),
)
}

// Get custom onClick for specific tests
function getOnClick(testId: string) {
if (testId === 'structured-error-toast') {
@@ -47,6 +53,9 @@ function getOnClick(testId: string) {
if (testId === 'tail-fast-batch') {
return handleBatchRequest
}
if (testId === 'pipeline-batch') {
return handlePipelineBatch
}
return undefined
}
</script>
7 changes: 7 additions & 0 deletions packages/evlog/package.json
@@ -58,6 +58,10 @@
"./enrichers": {
"types": "./dist/enrichers.d.mts",
"import": "./dist/enrichers.mjs"
},
"./pipeline": {
"types": "./dist/pipeline.d.mts",
"import": "./dist/pipeline.mjs"
}
},
"main": "./dist/index.mjs",
@@ -90,6 +94,9 @@
],
"enrichers": [
"./dist/enrichers.d.mts"
],
"pipeline": [
"./dist/pipeline.d.mts"
]
}
},
13 changes: 6 additions & 7 deletions packages/evlog/src/adapters/axiom.ts
@@ -34,8 +34,11 @@ export interface AxiomConfig {
* }))
* ```
*/
export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainContext) => Promise<void> {
return async (ctx: DrainContext) => {
export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> {
return async (ctx: DrainContext | DrainContext[]) => {
const contexts = Array.isArray(ctx) ? ctx : [ctx]
if (contexts.length === 0) return

const runtimeConfig = getRuntimeConfig()
// Support both runtimeConfig.evlog.axiom and runtimeConfig.axiom
const evlogAxiom = runtimeConfig?.evlog?.axiom
@@ -55,11 +58,7 @@ export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainC
return
}

try {
await sendToAxiom(ctx.event, config as AxiomConfig)
} catch (error) {
console.error('[evlog/axiom] Failed to send event:', error)
}
await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
Copilot AI (Feb 8, 2026):
createAxiomDrain no longer catches and logs send errors; it now lets sendBatchToAxiom rejections propagate. This is inconsistent with the stated evlog:drain contract (errors are logged and never block the request) and with createOTLPDrain, which still wraps sendToOTLP in a try/catch. Consider restoring a try/catch here (or making error propagation an explicit opt-in) so hook errors don't leak as unhandled promise rejections.

Suggested change:
-    await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
+    try {
+      await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig)
+    } catch (error) {
+      console.error('[evlog/axiom] Failed to send logs to Axiom:', error)
+    }
}
}

13 changes: 6 additions & 7 deletions packages/evlog/src/adapters/posthog.ts
@@ -61,8 +61,11 @@ export function toPostHogEvent(event: WideEvent, config: PostHogConfig): PostHog
* }))
* ```
*/
export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: DrainContext) => Promise<void> {
return async (ctx: DrainContext) => {
export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> {
return async (ctx: DrainContext | DrainContext[]) => {
const contexts = Array.isArray(ctx) ? ctx : [ctx]
if (contexts.length === 0) return

const runtimeConfig = getRuntimeConfig()
// Support both runtimeConfig.evlog.posthog and runtimeConfig.posthog
const evlogPostHog = runtimeConfig?.evlog?.posthog
@@ -82,11 +85,7 @@ export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: Dr
return
}

try {
await sendToPostHog(ctx.event, config as PostHogConfig)
} catch (error) {
console.error('[evlog/posthog] Failed to send event:', error)
}
await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
Copilot AI (Feb 8, 2026):
createPostHogDrain now calls sendBatchToPostHog without a try/catch, so failures will reject the drain hook. This differs from createOTLPDrain (which catches/logs) and from the documented expectation that drain errors are logged and never block requests. Consider reintroducing error handling here (or making throwing behavior explicit/opt-in).

Suggested change:
-    await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
+    try {
+      await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig)
+    } catch (error) {
+      console.error('[evlog/posthog] Failed to send events to PostHog:', error)
+    }
}
}

13 changes: 6 additions & 7 deletions packages/evlog/src/adapters/sentry.ts
@@ -214,8 +214,11 @@ function buildEnvelopeBody(logs: SentryLog[], dsn: string): string {
* }))
* ```
*/
export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: DrainContext) => Promise<void> {
return async (ctx: DrainContext) => {
export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> {
return async (ctx: DrainContext | DrainContext[]) => {
const contexts = Array.isArray(ctx) ? ctx : [ctx]
if (contexts.length === 0) return

const runtimeConfig = getRuntimeConfig()
const evlogSentry = runtimeConfig?.evlog?.sentry
const rootSentry = runtimeConfig?.sentry
@@ -233,11 +236,7 @@ export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: Drai
return
}

try {
await sendToSentry(ctx.event, config as SentryConfig)
} catch (error) {
console.error('[evlog/sentry] Failed to send log:', error)
}
await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
Copilot AI (Feb 8, 2026):
createSentryDrain no longer catches and logs send failures; sendBatchToSentry throws on non-2xx responses, which will reject the drain hook. This is inconsistent with createOTLPDrain and with the evlog:drain documentation that errors are logged and never block the request. Consider restoring try/catch logging (or an explicit configuration flag to control throwing).

Suggested change:
-    await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
+    try {
+      await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig)
+    } catch (error) {
+      console.error('[evlog/sentry] Failed to send logs to Sentry:', error)
+    }
}
}

144 changes: 144 additions & 0 deletions packages/evlog/src/pipeline.ts
@@ -0,0 +1,144 @@
export interface DrainPipelineOptions<T = unknown> {
batch?: {
/** Default: 50 */
size?: number
/** Default: 5000 */
intervalMs?: number
}
retry?: {
/** Including the first attempt. Default: 3 */
maxAttempts?: number
/** Default: 'exponential' */
backoff?: 'exponential' | 'linear' | 'fixed'
/** Default: 1000 */
initialDelayMs?: number
/** Default: 30000 */
maxDelayMs?: number
}
/** Default: 1000 */
maxBufferSize?: number
onDropped?: (events: T[], error?: Error) => void
}

export interface PipelineDrainFn<T> {
(ctx: T): void
/** Flush all buffered events. Call on server shutdown. */
flush: () => Promise<void>
readonly pending: number
}

export function createDrainPipeline<T = unknown>(options?: DrainPipelineOptions<T>): (drain: (ctx: T | T[]) => void | Promise<void>) => PipelineDrainFn<T> {
const batchSize = options?.batch?.size ?? 50
const intervalMs = options?.batch?.intervalMs ?? 5000
const maxBufferSize = options?.maxBufferSize ?? 1000
Comment on lines +30 to +33
Copilot AI (Feb 8, 2026):
The pipeline always invokes drain(batch) with an array, but the type signature accepts T | T[] and there's no doc comment clarifying that drains should be batch-capable. This makes it easy for consumers to pass an existing single-event drain and either fail type-checking or get unexpected runtime input. Consider documenting that drain will be called with T[] (and optionally provide an overload/helper that adapts (ctx: T) => ... drains by wrapping them).
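The adapter this comment suggests could be sketched as a hypothetical helper (adaptSingleDrain is not part of evlog; names are illustrative):

```typescript
// Hypothetical adapter (not part of evlog): wraps a single-event drain
// so it can safely receive the T[] batches the pipeline delivers.
function adaptSingleDrain<T>(
  drain: (ctx: T) => void | Promise<void>,
): (ctx: T | T[]) => Promise<void> {
  return async (ctx: T | T[]): Promise<void> => {
    const items = Array.isArray(ctx) ? ctx : [ctx]
    // Deliver events one at a time, preserving order.
    for (const item of items) {
      await drain(item)
    }
  }
}
```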
const maxAttempts = options?.retry?.maxAttempts ?? 3
const backoffStrategy = options?.retry?.backoff ?? 'exponential'
const initialDelayMs = options?.retry?.initialDelayMs ?? 1000
const maxDelayMs = options?.retry?.maxDelayMs ?? 30000
Comment on lines +31 to +37
Copilot AI (Feb 8, 2026):
batchSize, intervalMs, maxBufferSize, and retry options are used without validation. Certain values can break the pipeline (e.g., batch.size <= 0 makes drainBuffer() loop forever because splice(0, 0) never reduces the buffer; maxAttempts <= 0 skips sending entirely; maxBufferSize <= 0 still allows buffering). Add input validation/clamping (and ideally throw a clear error) for these options before constructing the pipeline.

Suggested change:
-  const batchSize = options?.batch?.size ?? 50
-  const intervalMs = options?.batch?.intervalMs ?? 5000
-  const maxBufferSize = options?.maxBufferSize ?? 1000
-  const maxAttempts = options?.retry?.maxAttempts ?? 3
-  const backoffStrategy = options?.retry?.backoff ?? 'exponential'
-  const initialDelayMs = options?.retry?.initialDelayMs ?? 1000
-  const maxDelayMs = options?.retry?.maxDelayMs ?? 30000
+  const rawBatchSize = options?.batch?.size
+  if (rawBatchSize !== undefined) {
+    if (!Number.isFinite(rawBatchSize) || rawBatchSize <= 0) {
+      throw new Error(`DrainPipelineOptions.batch.size must be a positive finite number, got: ${rawBatchSize}`)
+    }
+  }
+  const batchSize = rawBatchSize ?? 50
+  const rawIntervalMs = options?.batch?.intervalMs
+  if (rawIntervalMs !== undefined) {
+    if (!Number.isFinite(rawIntervalMs) || rawIntervalMs <= 0) {
+      throw new Error(`DrainPipelineOptions.batch.intervalMs must be a positive finite number, got: ${rawIntervalMs}`)
+    }
+  }
+  const intervalMs = rawIntervalMs ?? 5000
+  const rawMaxBufferSize = options?.maxBufferSize
+  if (rawMaxBufferSize !== undefined) {
+    if (!Number.isFinite(rawMaxBufferSize) || rawMaxBufferSize <= 0) {
+      throw new Error(`DrainPipelineOptions.maxBufferSize must be a positive finite number, got: ${rawMaxBufferSize}`)
+    }
+  }
+  const maxBufferSize = rawMaxBufferSize ?? 1000
+  const rawMaxAttempts = options?.retry?.maxAttempts
+  if (rawMaxAttempts !== undefined) {
+    if (!Number.isFinite(rawMaxAttempts) || rawMaxAttempts <= 0) {
+      throw new Error(`DrainPipelineOptions.retry.maxAttempts must be a positive finite number, got: ${rawMaxAttempts}`)
+    }
+  }
+  const maxAttempts = rawMaxAttempts ?? 3
+  const backoffStrategy = options?.retry?.backoff ?? 'exponential'
+  const rawInitialDelayMs = options?.retry?.initialDelayMs
+  if (rawInitialDelayMs !== undefined) {
+    if (!Number.isFinite(rawInitialDelayMs) || rawInitialDelayMs < 0) {
+      throw new Error(`DrainPipelineOptions.retry.initialDelayMs must be a non-negative finite number, got: ${rawInitialDelayMs}`)
+    }
+  }
+  const initialDelayMs = rawInitialDelayMs ?? 1000
+  const rawMaxDelayMs = options?.retry?.maxDelayMs
+  if (rawMaxDelayMs !== undefined) {
+    if (!Number.isFinite(rawMaxDelayMs) || rawMaxDelayMs < 0) {
+      throw new Error(`DrainPipelineOptions.retry.maxDelayMs must be a non-negative finite number, got: ${rawMaxDelayMs}`)
+    }
+  }
+  const maxDelayMs = rawMaxDelayMs ?? 30000
const onDropped = options?.onDropped

return (drain: (ctx: T | T[]) => void | Promise<void>): PipelineDrainFn<T> => {
const buffer: T[] = []
let timer: ReturnType<typeof setTimeout> | null = null
let activeFlush: Promise<void> | null = null

function clearTimer(): void {
if (timer !== null) {
clearTimeout(timer)
timer = null
}
}

function scheduleFlush(): void {
if (timer !== null || activeFlush) return
timer = setTimeout(() => {
timer = null
if (!activeFlush) startFlush()
}, intervalMs)
}

function getRetryDelay(attempt: number): number {
let delay: number
switch (backoffStrategy) {
case 'linear':
delay = initialDelayMs * attempt
break
case 'fixed':
delay = initialDelayMs
break
case 'exponential':
default:
delay = initialDelayMs * 2 ** (attempt - 1)
break
}
return Math.min(delay, maxDelayMs)
}

async function sendWithRetry(batch: T[]): Promise<void> {
let lastError: Error | undefined
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
await drain(batch)
return
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
if (attempt < maxAttempts) {
await new Promise<void>(r => setTimeout(r, getRetryDelay(attempt)))
}
}
}
onDropped?.(batch, lastError)
}

async function drainBuffer(): Promise<void> {
while (buffer.length > 0) {
const batch = buffer.splice(0, batchSize)
await sendWithRetry(batch)
}
}

function startFlush(): void {
if (activeFlush) return
activeFlush = drainBuffer().finally(() => {
activeFlush = null
if (buffer.length >= batchSize) {
startFlush()
} else if (buffer.length > 0) {
scheduleFlush()
}
})
}

function push(ctx: T): void {
if (buffer.length >= maxBufferSize) {
const dropped = buffer.splice(0, 1)
onDropped?.(dropped)
}
buffer.push(ctx)

if (buffer.length >= batchSize) {
clearTimer()
startFlush()
} else if (!activeFlush) {
scheduleFlush()
}
}

async function flush(): Promise<void> {
clearTimer()
if (activeFlush) {
await activeFlush
}
await drainBuffer()
}

const hookFn = push as PipelineDrainFn<T>
hookFn.flush = flush
Comment on lines +130 to +136
Copilot AI (Feb 8, 2026):
flush() calls drainBuffer() directly without marking activeFlush. If events are pushed while flush() is awaiting drain(batch), push() will see !activeFlush and schedule a timer instead of coordinating with the in-progress flush, so flush() can resolve while new events remain buffered (breaking the expectation that flush() drains everything). Consider setting activeFlush during flush() (similar to startFlush()), and/or looping until buffer.length === 0 after awaiting current work.
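The loop-until-empty behavior the comment suggests can be modeled standalone (a sketch with illustrative names, not evlog internals):

```typescript
// Standalone model of a flush that keeps draining until the buffer is
// truly empty, so events pushed mid-flush are not left behind.
async function flushAll<T>(
  buffer: T[],
  batchSize: number,
  send: (batch: T[]) => Promise<void>,
): Promise<void> {
  while (buffer.length > 0) {
    // splice removes the batch from the shared buffer before sending,
    // so concurrent pushes land behind it and are picked up next loop.
    const batch = buffer.splice(0, batchSize)
    await send(batch)
  }
}
```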
Object.defineProperty(hookFn, 'pending', {
get: () => buffer.length,
enumerable: true,
})

return hookFn
}
}
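For reference, the retry schedule computed by getRetryDelay above can be reproduced standalone (a sketch assuming the defaults shown in the diff: initialDelayMs 1000, maxDelayMs 30000):

```typescript
type Backoff = 'exponential' | 'linear' | 'fixed'

// Mirrors the delay calculation above: attempt is 1-based, and the
// result is always capped at maxDelayMs.
function retryDelay(
  attempt: number,
  strategy: Backoff,
  initialDelayMs = 1000,
  maxDelayMs = 30000,
): number {
  let delay: number
  switch (strategy) {
    case 'linear':
      delay = initialDelayMs * attempt
      break
    case 'fixed':
      delay = initialDelayMs
      break
    default: // 'exponential'
      delay = initialDelayMs * 2 ** (attempt - 1)
  }
  return Math.min(delay, maxDelayMs)
}

// retryDelay(1, 'exponential') → 1000
// retryDelay(3, 'exponential') → 4000
// retryDelay(6, 'exponential') → 30000 (32000 capped)
// retryDelay(3, 'linear')      → 3000
```

So with the default exponential strategy, a batch that fails all three default attempts waits roughly 1s and then 2s between retries before being handed to onDropped.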