-
-
Notifications
You must be signed in to change notification settings - Fork 9
feat: add shared drain pipeline for batching and retry #57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -34,8 +34,11 @@ export interface AxiomConfig { | |||||||||||||
| * })) | ||||||||||||||
| * ``` | ||||||||||||||
| */ | ||||||||||||||
| export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainContext) => Promise<void> { | ||||||||||||||
| return async (ctx: DrainContext) => { | ||||||||||||||
| export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> { | ||||||||||||||
| return async (ctx: DrainContext | DrainContext[]) => { | ||||||||||||||
| const contexts = Array.isArray(ctx) ? ctx : [ctx] | ||||||||||||||
| if (contexts.length === 0) return | ||||||||||||||
|
|
||||||||||||||
| const runtimeConfig = getRuntimeConfig() | ||||||||||||||
| // Support both runtimeConfig.evlog.axiom and runtimeConfig.axiom | ||||||||||||||
| const evlogAxiom = runtimeConfig?.evlog?.axiom | ||||||||||||||
|
|
@@ -55,11 +58,7 @@ export function createAxiomDrain(overrides?: Partial<AxiomConfig>): (ctx: DrainC | |||||||||||||
| return | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| try { | ||||||||||||||
| await sendToAxiom(ctx.event, config as AxiomConfig) | ||||||||||||||
| } catch (error) { | ||||||||||||||
| console.error('[evlog/axiom] Failed to send event:', error) | ||||||||||||||
| } | ||||||||||||||
| await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig) | ||||||||||||||
|
||||||||||||||
| await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig) | |
| try { | |
| await sendBatchToAxiom(contexts.map(c => c.event), config as AxiomConfig) | |
| } catch (error) { | |
| console.error('[evlog/axiom] Failed to send logs to Axiom:', error) | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -61,8 +61,11 @@ export function toPostHogEvent(event: WideEvent, config: PostHogConfig): PostHog | |||||||||||||
| * })) | ||||||||||||||
| * ``` | ||||||||||||||
| */ | ||||||||||||||
| export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: DrainContext) => Promise<void> { | ||||||||||||||
| return async (ctx: DrainContext) => { | ||||||||||||||
| export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> { | ||||||||||||||
| return async (ctx: DrainContext | DrainContext[]) => { | ||||||||||||||
| const contexts = Array.isArray(ctx) ? ctx : [ctx] | ||||||||||||||
| if (contexts.length === 0) return | ||||||||||||||
|
|
||||||||||||||
| const runtimeConfig = getRuntimeConfig() | ||||||||||||||
| // Support both runtimeConfig.evlog.posthog and runtimeConfig.posthog | ||||||||||||||
| const evlogPostHog = runtimeConfig?.evlog?.posthog | ||||||||||||||
|
|
@@ -82,11 +85,7 @@ export function createPostHogDrain(overrides?: Partial<PostHogConfig>): (ctx: Dr | |||||||||||||
| return | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| try { | ||||||||||||||
| await sendToPostHog(ctx.event, config as PostHogConfig) | ||||||||||||||
| } catch (error) { | ||||||||||||||
| console.error('[evlog/posthog] Failed to send event:', error) | ||||||||||||||
| } | ||||||||||||||
| await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig) | ||||||||||||||
|
||||||||||||||
| await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig) | |
| try { | |
| await sendBatchToPostHog(contexts.map(c => c.event), config as PostHogConfig) | |
| } catch (error) { | |
| console.error('[evlog/posthog] Failed to send events to PostHog:', error) | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -214,8 +214,11 @@ function buildEnvelopeBody(logs: SentryLog[], dsn: string): string { | |||||||||||||
| * })) | ||||||||||||||
| * ``` | ||||||||||||||
| */ | ||||||||||||||
| export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: DrainContext) => Promise<void> { | ||||||||||||||
| return async (ctx: DrainContext) => { | ||||||||||||||
| export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: DrainContext | DrainContext[]) => Promise<void> { | ||||||||||||||
| return async (ctx: DrainContext | DrainContext[]) => { | ||||||||||||||
| const contexts = Array.isArray(ctx) ? ctx : [ctx] | ||||||||||||||
| if (contexts.length === 0) return | ||||||||||||||
|
|
||||||||||||||
| const runtimeConfig = getRuntimeConfig() | ||||||||||||||
| const evlogSentry = runtimeConfig?.evlog?.sentry | ||||||||||||||
| const rootSentry = runtimeConfig?.sentry | ||||||||||||||
|
|
@@ -233,11 +236,7 @@ export function createSentryDrain(overrides?: Partial<SentryConfig>): (ctx: Drai | |||||||||||||
| return | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| try { | ||||||||||||||
| await sendToSentry(ctx.event, config as SentryConfig) | ||||||||||||||
| } catch (error) { | ||||||||||||||
| console.error('[evlog/sentry] Failed to send log:', error) | ||||||||||||||
| } | ||||||||||||||
| await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig) | ||||||||||||||
|
||||||||||||||
| await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig) | |
| try { | |
| await sendBatchToSentry(contexts.map(c => c.event), config as SentryConfig) | |
| } catch (error) { | |
| console.error('[evlog/sentry] Failed to send logs to Sentry:', error) | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,144 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export interface DrainPipelineOptions<T = unknown> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| batch?: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Default: 50 */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| size?: number | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Default: 5000 */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| intervalMs?: number | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry?: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Including the first attempt. Default: 3 */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxAttempts?: number | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Default: 'exponential' */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| backoff?: 'exponential' | 'linear' | 'fixed' | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Default: 1000 */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| initialDelayMs?: number | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Default: 30000 */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxDelayMs?: number | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Default: 1000 */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| maxBufferSize?: number | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| onDropped?: (events: T[], error?: Error) => void | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export interface PipelineDrainFn<T> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (ctx: T): void | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Flush all buffered events. Call on server shutdown. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| flush: () => Promise<void> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| readonly pending: number | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| export function createDrainPipeline<T = unknown>(options?: DrainPipelineOptions<T>): (drain: (ctx: T | T[]) => void | Promise<void>) => PipelineDrainFn<T> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const batchSize = options?.batch?.size ?? 50 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const intervalMs = options?.batch?.intervalMs ?? 5000 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const maxBufferSize = options?.maxBufferSize ?? 1000 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+30
to
+33
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const maxAttempts = options?.retry?.maxAttempts ?? 3 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const backoffStrategy = options?.retry?.backoff ?? 'exponential' | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const initialDelayMs = options?.retry?.initialDelayMs ?? 1000 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const maxDelayMs = options?.retry?.maxDelayMs ?? 30000 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+31
to
+37
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const batchSize = options?.batch?.size ?? 50 | |
| const intervalMs = options?.batch?.intervalMs ?? 5000 | |
| const maxBufferSize = options?.maxBufferSize ?? 1000 | |
| const maxAttempts = options?.retry?.maxAttempts ?? 3 | |
| const backoffStrategy = options?.retry?.backoff ?? 'exponential' | |
| const initialDelayMs = options?.retry?.initialDelayMs ?? 1000 | |
| const maxDelayMs = options?.retry?.maxDelayMs ?? 30000 | |
| const rawBatchSize = options?.batch?.size | |
| if (rawBatchSize !== undefined) { | |
| if (!Number.isFinite(rawBatchSize) || rawBatchSize <= 0) { | |
| throw new Error(`DrainPipelineOptions.batch.size must be a positive finite number, got: ${rawBatchSize}`) | |
| } | |
| } | |
| const batchSize = rawBatchSize ?? 50 | |
| const rawIntervalMs = options?.batch?.intervalMs | |
| if (rawIntervalMs !== undefined) { | |
| if (!Number.isFinite(rawIntervalMs) || rawIntervalMs <= 0) { | |
| throw new Error(`DrainPipelineOptions.batch.intervalMs must be a positive finite number, got: ${rawIntervalMs}`) | |
| } | |
| } | |
| const intervalMs = rawIntervalMs ?? 5000 | |
| const rawMaxBufferSize = options?.maxBufferSize | |
| if (rawMaxBufferSize !== undefined) { | |
| if (!Number.isFinite(rawMaxBufferSize) || rawMaxBufferSize <= 0) { | |
| throw new Error(`DrainPipelineOptions.maxBufferSize must be a positive finite number, got: ${rawMaxBufferSize}`) | |
| } | |
| } | |
| const maxBufferSize = rawMaxBufferSize ?? 1000 | |
| const rawMaxAttempts = options?.retry?.maxAttempts | |
| if (rawMaxAttempts !== undefined) { | |
| if (!Number.isFinite(rawMaxAttempts) || rawMaxAttempts <= 0) { | |
| throw new Error(`DrainPipelineOptions.retry.maxAttempts must be a positive finite number, got: ${rawMaxAttempts}`) | |
| } | |
| } | |
| const maxAttempts = rawMaxAttempts ?? 3 | |
| const backoffStrategy = options?.retry?.backoff ?? 'exponential' | |
| const rawInitialDelayMs = options?.retry?.initialDelayMs | |
| if (rawInitialDelayMs !== undefined) { | |
| if (!Number.isFinite(rawInitialDelayMs) || rawInitialDelayMs < 0) { | |
| throw new Error(`DrainPipelineOptions.retry.initialDelayMs must be a non-negative finite number, got: ${rawInitialDelayMs}`) | |
| } | |
| } | |
| const initialDelayMs = rawInitialDelayMs ?? 1000 | |
| const rawMaxDelayMs = options?.retry?.maxDelayMs | |
| if (rawMaxDelayMs !== undefined) { | |
| if (!Number.isFinite(rawMaxDelayMs) || rawMaxDelayMs < 0) { | |
| throw new Error(`DrainPipelineOptions.retry.maxDelayMs must be a non-negative finite number, got: ${rawMaxDelayMs}`) | |
| } | |
| } | |
| const maxDelayMs = rawMaxDelayMs ?? 30000 |
Copilot
AI
Feb 8, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flush() calls drainBuffer() directly without marking activeFlush. If events are pushed while flush() is awaiting drain(batch), push() will see !activeFlush and schedule a timer instead of coordinating with the in-progress flush, so flush() can resolve while new events remain buffered (breaking the expectation that flush() drains everything). Consider setting activeFlush during flush() (similar to startFlush()), and/or looping until buffer.length === 0 after awaiting current work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The UI description says to watch for "[evlog/pipeline] Flushing batch of N events" logs, but there’s no corresponding log output in the new pipeline implementation. Either add the logging where batches are flushed, or adjust this description so it matches actual behavior.