From 75fa393140dc549c2e88cb33a67bc253ac01d66e Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 10:32:01 -0400 Subject: [PATCH 01/14] feat(server-utils): Restore caller context for callback tracing channels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `bindTracingChannelToSpan` only bound the span store on `start`, which covers the synchronous frame but not a callback the library dispatches from a detached async context (e.g. a socket data handler or a `setImmediate` drain). There, native async-context propagation no longer reaches the caller, so work issued inside the callback lost its parent. Stash the caller's store at `start` and re-bind it on `asyncStart`, so callback-style channels run their continuation in the caller's context — the same way a promise's `.then` does natively. It's inert for promise channels, which `publish` `asyncStart` rather than `runStores` it. Migrate the lru-memoizer subscriber onto the helper (`getSpan` returns `undefined`, so no span is created — it only needs the context rebind), dropping its hand-rolled callback re-wrapping. --- .../tracing-channel/lru-memoizer.ts | 45 +++++++------------ packages/server-utils/src/tracing-channel.ts | 24 +++++++++- .../server-utils/test/tracing-channel.test.ts | 45 +++++++++++++++++++ 3 files changed, 82 insertions(+), 32 deletions(-) diff --git a/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts b/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts index 1ca463b3b319..4e18c0b49348 100644 --- a/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts +++ b/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts @@ -1,14 +1,15 @@ import * as diagnosticsChannel from 'node:diagnostics_channel'; import type { IntegrationFn } from '@sentry/core'; -import { debug, defineIntegration, getCurrentScope, withScope } from '@sentry/core'; +import { debug, defineIntegration, waitForTracingChannelBinding } from '@sentry/core'; import { DEBUG_BUILD } from '../../debug-build'; import { CHANNELS } from '../../orchestrion/channels'; +import { bindTracingChannelToSpan } from '../../tracing-channel'; // Same name as the OTel integration by design — when enabled, the OTel // 'LruMemoizer' integration is omitted from the default set. const INTEGRATION_NAME = 'LruMemoizer' as const; -interface LruMemoizerChannelContext { +interface LruMemoizerLoadContext { arguments: unknown[]; } @@ -22,35 +23,19 @@ const _lruMemoizerChannelIntegration = (() => { } DEBUG_BUILD && debug.log(`[orchestrion:lru-memoizer] subscribing to channel "${CHANNELS.LRU_MEMOIZER_LOAD}"`); - const lruMemoizerCh = diagnosticsChannel.tracingChannel(CHANNELS.LRU_MEMOIZER_LOAD); - lruMemoizerCh.subscribe({ - start(rawCtx) { - const ctx = rawCtx as LruMemoizerChannelContext; - if (ctx.arguments.length === 0) { - return; - } - - // Capture the scope while we're still synchronously inside the memoized call. - // lru-memoizer queues the callback and fires it later via setImmediate, where the - // active scope no longer reflects the caller's context. - const scope = getCurrentScope(); - const cbIdx = ctx.arguments.length - 1; - const orchestrionWrappedCb = ctx.arguments[cbIdx]; - - if (typeof orchestrionWrappedCb !== 'function') { - return; - } - - const wrapped = orchestrionWrappedCb as (...a: unknown[]) => unknown; - ctx.arguments[cbIdx] = function (this: unknown, ...args: unknown[]): unknown { - return withScope(scope, () => wrapped.apply(this, args)); - }; - }, - end() {}, - asyncStart() {}, - asyncEnd() {}, - error() {}, + // lru-memoizer creates no span: it queues the load callback and fires it later via + // `setImmediate`, from a detached context where the caller's scope is no longer active. + // Returning `undefined` from `getSpan` opts out of span creation entirely — the helper's + // `asyncStart` rebind still restores the caller's context for that callback, which is all this + // instrumentation needs (keeping memoized work parented to the caller). `bindTracingChannelToSpan` + // uses `bindStore`, which needs the async-context binding `initOpenTelemetry()` registers after + // integration `setupOnce`, so defer until it's available (matches the other channel subscribers). + waitForTracingChannelBinding(() => { + bindTracingChannelToSpan( + diagnosticsChannel.tracingChannel(CHANNELS.LRU_MEMOIZER_LOAD), + () => undefined, + ); }); }, }; diff --git a/packages/server-utils/src/tracing-channel.ts b/packages/server-utils/src/tracing-channel.ts index 6fccf6820ec5..c8d5701dd11b 100644 --- a/packages/server-utils/src/tracing-channel.ts +++ b/packages/server-utils/src/tracing-channel.ts @@ -6,7 +6,15 @@ import { DEBUG_BUILD } from './debug-build'; import { ERROR_TYPE } from '@sentry/conventions/attributes'; export type TracingChannelPayloadWithSpan = TData & { + /** + * The current active span for the traced call. + */ _sentrySpan?: Span; + + /** + * The context's active store value, used to restore the context for asyncStart continuations for callback-based tracing. + */ + _sentryCallerStore?: unknown; }; /* @@ -158,21 +166,33 @@ function bindSpanToChannelStore( // 3. Read: inside the op, Sentry's scope machinery calls getScopes() → asyncStorage.getStore() on that same ALS, so getCurrentScope/getIsolationScope/getActiveSpan resolve to the scope carrying our span. // 4. Nest: any child span started in the traced op parents to that active span. channel.start.bindStore(asyncLocalStorage, (data: TracingChannelPayloadWithSpan) => { + // Stash the caller's store before we swap in the span store, so `asyncStart` can restore it for + // callback-style channels (see `_sentryCallerStore`). + data._sentryCallerStore = asyncLocalStorage.getStore(); + const span = getSpan(data); if (!span) { // Leave the active context untouched so nested operations keep parenting to the enclosing span. - return asyncLocalStorage.getStore() as TData; + return data._sentryCallerStore as TData; } data._sentrySpan = span; return binding.getStoreWithActiveSpan(span) as TData; }); + // Restore the caller's context for the async continuation. Only callback-style channels `runStores` + // `asyncStart` (so the callback runs inside this store); promise channels `publish` it, leaving this + // inert — their continuation already inherits the caller's context natively. + channel.asyncStart.bindStore(asyncLocalStorage, (data: TracingChannelPayloadWithSpan) => { + return (data._sentryCallerStore ?? asyncLocalStorage.getStore()) as TData; + }); + return { channel, unbind: () => { - // Removes the store + // Removes the stores channel.start.unbindStore(asyncLocalStorage); + channel.asyncStart.unbindStore(asyncLocalStorage); }, }; } diff --git a/packages/server-utils/test/tracing-channel.test.ts b/packages/server-utils/test/tracing-channel.test.ts index fdaeee36476f..0ae5093dd559 100644 --- a/packages/server-utils/test/tracing-channel.test.ts +++ b/packages/server-utils/test/tracing-channel.test.ts @@ -175,6 +175,51 @@ describe('bindTracingChannelToSpan', () => { expect(childParentSpanId).toBe(parent.spanContext().spanId); }); + it('restores the caller context in a callback dispatched from a detached context (asyncStart rebind)', async () => { + installTestAsyncContextStrategy(); + initTestClient(); + + let channelSpanId: string | undefined; + const { channel } = bindTracingChannelToSpan( + tracingChannel<{ operation: string }>('test:asyncStart:caller-context'), + () => { + const span = startInactiveSpan({ name: 'channel-span' }); + channelSpanId = span.spanContext().spanId; + return span; + }, + ); + + let enclosingSpanId: string | undefined; + let childParentSpanId: string | undefined; + + await new Promise(done => { + startSpan({ forceTransaction: true, name: 'enclosing-span' }, enclosing => { + enclosingSpanId = enclosing.spanContext().spanId; + channel.traceCallback( + (cb: (err: Error | null, result?: string) => void) => { + // Fire the callback after the enclosing scope has exited, so it runs in a detached + // async context — the asyncStart rebind is the only thing that can restore the caller's. + setTimeout(() => cb(null, 'ok'), 1); + }, + 0, + { operation: 'read' }, + undefined, + () => { + startSpan({ name: 'child-span' }, child => { + childParentSpanId = spanToJSON(child).parent_span_id; + }); + done(); + }, + ); + }); + }); + + // A span started inside the callback parents to the caller (the enclosing span), not to the + // channel span — matching how a promise's `.then` continuation behaves. + expect(childParentSpanId).toBe(enclosingSpanId); + expect(childParentSpanId).not.toBe(channelSpanId); + }); + describe('auto lifecycle ending strategy', () => { // Returns a channel whose span we can observe, plus spies for `span.end` and `captureException`. function setup(name: string): { From 910fdfe17caf57db225388ad294adfb18f5cb0cc Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 10:33:01 -0400 Subject: [PATCH 02/14] chore: Reword comment --- packages/server-utils/src/tracing-channel.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/server-utils/src/tracing-channel.ts b/packages/server-utils/src/tracing-channel.ts index c8d5701dd11b..9d2355b4fa0e 100644 --- a/packages/server-utils/src/tracing-channel.ts +++ b/packages/server-utils/src/tracing-channel.ts @@ -181,8 +181,8 @@ function bindSpanToChannelStore( }); // Restore the caller's context for the async continuation. Only callback-style channels `runStores` - // `asyncStart` (so the callback runs inside this store); promise channels `publish` it, leaving this - // inert — their continuation already inherits the caller's context natively. + // `asyncStart` (so the callback runs inside this store). promise channels `publish` it, leaving this + // inert, their continuation already inherits the caller's context natively. channel.asyncStart.bindStore(asyncLocalStorage, (data: TracingChannelPayloadWithSpan) => { return (data._sentryCallerStore ?? asyncLocalStorage.getStore()) as TData; }); From e70e4a85f1425b1a4944083db81bf49dbbb350c0 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 15:19:14 -0400 Subject: [PATCH 03/14] fix(server-utils): Don't leak a foreign store into a callback with no caller context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The asyncStart producer used `data._sentryCallerStore ?? asyncLocalStorage.getStore()`. When the caller had no active store, `_sentryCallerStore` is `undefined` and the fallback bound whatever was ambient at callback time — for a callback dispatched from a pooled socket handler, that can be another request's store. `start` always runs first and sets `_sentryCallerStore`, so return it verbatim (no fallback), matching the synchronous path: no caller context restores to none, not a foreign one. --- packages/server-utils/src/tracing-channel.ts | 2 +- .../server-utils/test/tracing-channel.test.ts | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/packages/server-utils/src/tracing-channel.ts b/packages/server-utils/src/tracing-channel.ts index 9d2355b4fa0e..bedc59d2e652 100644 --- a/packages/server-utils/src/tracing-channel.ts +++ b/packages/server-utils/src/tracing-channel.ts @@ -184,7 +184,7 @@ function bindSpanToChannelStore( // `asyncStart` (so the callback runs inside this store). promise channels `publish` it, leaving this // inert, their continuation already inherits the caller's context natively. channel.asyncStart.bindStore(asyncLocalStorage, (data: TracingChannelPayloadWithSpan) => { - return (data._sentryCallerStore ?? asyncLocalStorage.getStore()) as TData; + return data._sentryCallerStore as TData; }); return { diff --git a/packages/server-utils/test/tracing-channel.test.ts b/packages/server-utils/test/tracing-channel.test.ts index 0ae5093dd559..a6d26b3f5f3a 100644 --- a/packages/server-utils/test/tracing-channel.test.ts +++ b/packages/server-utils/test/tracing-channel.test.ts @@ -220,6 +220,36 @@ describe('bindTracingChannelToSpan', () => { expect(childParentSpanId).not.toBe(channelSpanId); }); + it('does not leak an unrelated active store into the callback when the caller had none', () => { + installTestAsyncContextStrategy(); + initTestClient(); + + const { channel } = bindTracingChannelToSpan(tracingChannel<{ operation: string }>('test:asyncStart:no-leak'), () => + startInactiveSpan({ name: 'channel-span' }), + ); + + // Caller issues the op with no active context, so the caller store is captured as `undefined`. + const ctx = { operation: 'read' }; + channel.start.runStores(ctx, () => undefined); + + let otherRequestSpanId: string | undefined; + let childParentSpanId: string | undefined; + + // The callback fires later, dispatched from *another* request's active context. + startSpan({ forceTransaction: true, name: 'other-request' }, other => { + otherRequestSpanId = other.spanContext().spanId; + channel.asyncStart.runStores(ctx, () => { + startSpan({ name: 'child-span' }, child => { + childParentSpanId = spanToJSON(child).parent_span_id; + }); + }); + }); + + // The caller had no context, so the callback must restore to none — not adopt the other request's. + expect(childParentSpanId).toBeUndefined(); + expect(childParentSpanId).not.toBe(otherRequestSpanId); + }); + describe('auto lifecycle ending strategy', () => { // Returns a channel whose span we can observe, plus spies for `span.end` and `captureException`. function setup(name: string): { From dea56c038d1e5c89d12a02a0bcf48feb5eb065ae Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 15:22:38 -0400 Subject: [PATCH 04/14] chore: trim the yap --- .../src/integrations/tracing-channel/lru-memoizer.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts b/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts index 4e18c0b49348..2b9636abb8fd 100644 --- a/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts +++ b/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts @@ -24,13 +24,8 @@ const _lruMemoizerChannelIntegration = (() => { DEBUG_BUILD && debug.log(`[orchestrion:lru-memoizer] subscribing to channel "${CHANNELS.LRU_MEMOIZER_LOAD}"`); - // lru-memoizer creates no span: it queues the load callback and fires it later via - // `setImmediate`, from a detached context where the caller's scope is no longer active. - // Returning `undefined` from `getSpan` opts out of span creation entirely — the helper's - // `asyncStart` rebind still restores the caller's context for that callback, which is all this - // instrumentation needs (keeping memoized work parented to the caller). `bindTracingChannelToSpan` - // uses `bindStore`, which needs the async-context binding `initOpenTelemetry()` registers after - // integration `setupOnce`, so defer until it's available (matches the other channel subscribers). + // We only want the helper's caller-context restore for the + // callback lru-memoizer fires from a detached `setImmediate`. Defer until the bindStore binding exists. waitForTracingChannelBinding(() => { bindTracingChannelToSpan( diagnosticsChannel.tracingChannel(CHANNELS.LRU_MEMOIZER_LOAD), From 1fa066333d67ab662dbf70d28a4d1412fd9e8651 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 15:24:35 -0400 Subject: [PATCH 05/14] chore: move comment to getSpan arg --- .../src/integrations/tracing-channel/lru-memoizer.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts b/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts index 2b9636abb8fd..982e5200b4ef 100644 --- a/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts +++ b/packages/server-utils/src/integrations/tracing-channel/lru-memoizer.ts @@ -24,11 +24,10 @@ const _lruMemoizerChannelIntegration = (() => { DEBUG_BUILD && debug.log(`[orchestrion:lru-memoizer] subscribing to channel "${CHANNELS.LRU_MEMOIZER_LOAD}"`); - // We only want the helper's caller-context restore for the - // callback lru-memoizer fires from a detached `setImmediate`. Defer until the bindStore binding exists. waitForTracingChannelBinding(() => { bindTracingChannelToSpan( diagnosticsChannel.tracingChannel(CHANNELS.LRU_MEMOIZER_LOAD), + // We only want the helper's caller-context restore for the callback lru-memoizer fires from a detached `setImmediate`. () => undefined, ); }); From f0f382ef44e500d771758a2d3b6d8fa3b1561570 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 11:13:33 -0400 Subject: [PATCH 06/14] feat(server-utils): Move mysql orchestrion integration onto bindTracingChannelToSpan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The mysql subscriber hand-rolled the whole channel lifecycle: a span WeakMap, a parent-scope WeakMap, manual callback re-wrapping to restore the caller's context, and per-path span ending. Most of that is what the helper already does — now that it restores caller context on `asyncStart`, the manual re-wrap can go. Add a `deferSpanEnd` option to the helper: return `true` to take ownership of ending the span so it isn't ended on `end`/`asyncEnd`. mysql uses it for the one path the helper can't model — `query(sql)` with no callback returns a streamed `Query` emitter that settles via its own `'end'`/`'error'` events, not the channel. That path keeps `bindScopeToEmitter` so user listeners nest correctly. Net: the callback and sync-throw paths are fully the helper's; only the streamed-emitter wiring stays mysql-specific. --- .../src/integrations/tracing-channel/mysql.ts | 242 ++++++------------ packages/server-utils/src/tracing-channel.ts | 18 ++ 2 files changed, 98 insertions(+), 162 deletions(-) diff --git a/packages/server-utils/src/integrations/tracing-channel/mysql.ts b/packages/server-utils/src/integrations/tracing-channel/mysql.ts index bdaffd7bdfc6..dcff84c77225 100644 --- a/packages/server-utils/src/integrations/tracing-channel/mysql.ts +++ b/packages/server-utils/src/integrations/tracing-channel/mysql.ts @@ -1,5 +1,5 @@ import * as diagnosticsChannel from 'node:diagnostics_channel'; -import type { IntegrationFn, Scope, Span } from '@sentry/core'; +import type { IntegrationFn, Scope } from '@sentry/core'; import { bindScopeToEmitter, debug, @@ -8,10 +8,11 @@ import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, SPAN_STATUS_ERROR, startInactiveSpan, - withScope, + waitForTracingChannelBinding, } from '@sentry/core'; import { DEBUG_BUILD } from '../../debug-build'; import { CHANNELS } from '../../orchestrion/channels'; +import { bindTracingChannelToSpan } from '../../tracing-channel'; // NOTE: this uses the same name as the OTel integration by design. // When enabled, OTel 'Mysql' integration is omitted from the default set. @@ -36,21 +37,18 @@ const ATTR_NET_PEER_NAME = 'net.peer.name'; const ATTR_NET_PEER_PORT = 'net.peer.port'; /** - * The shape orchestrion's wrapCallback transform attaches to the tracing-channel - * `context` object. Documented here rather than imported because orchestrion's - * runtime doesn't export it — see `node_modules/@apm-js-collab/code-transformer/lib/transforms.js`. - * - * `arguments` is the *live* args array passed to the wrapped function: orchestrion - * splices the user's callback out and inserts its own wrapper at the same index - * before publishing `start`. The `start` hook re-wraps that entry to restore the - * caller's scope across mysql's async callback dispatch (see below). + * The shape orchestrion's transform attaches to the tracing-channel `context` object. Documented here + * rather than imported because orchestrion's runtime doesn't export it. */ interface MysqlQueryChannelContext { + // The live args array passed to the wrapped `connection.query` call; `arguments[0]` is the SQL. arguments: unknown[]; self?: MysqlConnection; moduleVersion?: string; result?: unknown; error?: unknown; + // The caller's scope, captured at `start` and replayed onto the streamed `Query` emitter (see below). + _sentryCallerScope?: Scope; } interface MysqlConnectionConfig { @@ -70,163 +68,83 @@ const _mysqlChannelIntegration = (() => { return { name: INTEGRATION_NAME, setupOnce() { - DEBUG_BUILD && debug.log(`[orchestrion:mysql] subscribing to channel "${CHANNELS.MYSQL_QUERY}"`); - const queryCh = diagnosticsChannel.tracingChannel(CHANNELS.MYSQL_QUERY); - - // Orchestrion creates one `context` object per call, shared across all - // lifecycle hooks. We key both maps off that identity; `WeakMap` so an - // unfinished path can't leak its entries. - const spans = new WeakMap(); - // The scope active when the query was issued, consumed in `end` to bind - // the streamed `Query` emitter's listeners to it. - const parentScopes = new WeakMap(); - - // `subscribe()` requires all five lifecycle hooks. The orchestrion - // `wrapAuto` transform fires events in one of four orders depending on - // call shape: - // - sync throw : start → error → end - // (NO asyncEnd) - // - async-callback error : start → end → error → - // asyncStart → asyncEnd - // - async-callback success : start → end → asyncStart → - // asyncEnd - // - no-callback (streamable Query) : start → end - // (ctx.result is the Query - // emitter, no async events) - // - // Where the span closes depends on the path: `asyncEnd` for callbacks (so - // it spans the full round-trip + callback), or `end` for the sync-throw - // and streamable paths. The `end` hook tells those apart via `ctx.error` - // / `ctx.result` — see there. - queryCh.subscribe({ - start(rawCtx) { - const ctx = rawCtx as MysqlQueryChannelContext; - const sql = extractSql(ctx.arguments[0]); - const { host, port, database, user } = getConnectionConfig(ctx.self); - const portNumber = typeof port === 'string' ? parseInt(port, 10) : port; - const portIsNumber = typeof portNumber === 'number' && !isNaN(portNumber); - - const span = startInactiveSpan({ - name: sql ?? 'mysql.query', - op: 'db', - attributes: { - [ATTR_DB_SYSTEM]: 'mysql', - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.orchestrion.mysql', - [ATTR_DB_CONNECTION_STRING]: getJDBCString(host, portIsNumber ? portNumber : undefined, database), - ...(database ? { [ATTR_DB_NAME]: database } : {}), - ...(user ? { [ATTR_DB_USER]: user } : {}), - ...(sql ? { [ATTR_DB_STATEMENT]: sql } : {}), - ...(host ? { [ATTR_NET_PEER_NAME]: host } : {}), - ...(portIsNumber ? { [ATTR_NET_PEER_PORT]: portNumber } : {}), - }, - }); - spans.set(rawCtx, span); - - // Capture the scope while we're still synchronously inside the - // caller's `connection.query` call. mysql v2 drains callbacks and - // emits streamed-query events from its socket data handler, where the - // AsyncLocalStorage store backing the active span no longer reflects - // the caller's context — and `asyncStart`/`asyncEnd` fire from that - // same lost context, so capturing has to happen now. - const scope = getCurrentScope(); - parentScopes.set(rawCtx, scope); - - // Callback path: orchestrion has spliced the user's callback out of - // `ctx.arguments` and put its own wrapper (`__apm$wrappedCb`) at the - // same index. Re-wrap it so the callback — and any nested - // `connection.query(...)` — runs with the captured scope active. - if (ctx.arguments.length > 0) { - const cbIdx = ctx.arguments.length - 1; - const orchestrionWrappedCb = ctx.arguments[cbIdx]; - if (typeof orchestrionWrappedCb === 'function') { - const wrapped = orchestrionWrappedCb as (...a: unknown[]) => unknown; - ctx.arguments[cbIdx] = function (this: unknown, ...args: unknown[]): unknown { - return withScope(scope, () => wrapped.apply(this, args)); - }; - } - } - }, - - end(rawCtx) { - const ctx = rawCtx as MysqlQueryChannelContext; - - // Sync throw: `end` fires AFTER `error` (both inside the wrapper's - // `try/catch/finally`), so `ctx.error` is already set. Close the - // span now since no `asyncEnd` will fire. - if (ctx.error !== undefined) { - finishSpan(rawCtx); - return; - } - - // No-callback (streamable Query) path: orchestrion's `wrapPromise` - // stores the synchronous return value on `ctx.result` and never - // fires `asyncStart`/`asyncEnd`. The returned `Query` is an - // `EventEmitter` that emits `'end'` on success and `'error'` on - // failure — hook those to close the span. - // Note: a streamed span never finishes if the connection is destroyed - // mid-flight — mysql then emits neither `'end'` nor `'error'`, so the - // span is dropped (the `WeakMap` still prevents a leak). Closing this - // needs connection-level hooks the per-query context doesn't expose. - const result = ctx.result; - if (result && typeof result === 'object' && hasOnMethod(result)) { - const span = spans.get(rawCtx); - if (!span) return; + // `tracingChannel` is unavailable before Node 18.19 so do nothing in that case. + if (!diagnosticsChannel.tracingChannel) { + return; + } - // Bind the captured scope to the streamed `Query` emitter: its - // `'end'`/`'error'`/`'fields'`/… events fire from mysql's socket - // handler with the caller's context lost, so without this a span - // started in a user's stream listener would begin a fresh root trace - // instead of nesting under the parent. `bindScopeToEmitter` patches - // `on`/`addListener`/… so listeners added after `query()` returns - // inherit the scope (like OTel's `context.bind`). - const parentScope = parentScopes.get(rawCtx); - if (parentScope) { - bindScopeToEmitter(result, parentScope); - } + DEBUG_BUILD && debug.log(`[orchestrion:mysql] subscribing to channel "${CHANNELS.MYSQL_QUERY}"`); - result.on('error', err => { - span.setStatus({ - code: SPAN_STATUS_ERROR, - message: err instanceof Error ? err.message : 'unknown_error', - }); - // Defensive: end the span here too in case `'end'` never fires - // (e.g. abrupt socket destruction). `finishSpan` is idempotent — - // `spans.delete` makes the subsequent `'end'` listener a no-op. - finishSpan(rawCtx); + // `bindTracingChannelToSpan` uses `bindStore`, which needs the async-context binding registered + // after integration `setupOnce` — defer until it's available (matches the other channel subscribers). + // The helper opens the span on `start`, restores the caller's context for callback-style queries on + // `asyncStart`, sets error status, and ends the span on `asyncEnd` (callback) / `end` (sync throw). + waitForTracingChannelBinding(() => { + bindTracingChannelToSpan( + diagnosticsChannel.tracingChannel(CHANNELS.MYSQL_QUERY), + data => { + const sql = extractSql(data.arguments[0]); + const { host, port, database, user } = getConnectionConfig(data.self); + const portNumber = typeof port === 'string' ? parseInt(port, 10) : port; + const portIsNumber = typeof portNumber === 'number' && !isNaN(portNumber); + + // Capture the caller's scope while still synchronously inside `connection.query`, for the + // streamed-query path: mysql emits the `Query` emitter's events from its socket data handler, + // where the caller's context is lost. `deferSpanEnd` replays this scope onto that emitter. + data._sentryCallerScope = getCurrentScope(); + + return startInactiveSpan({ + name: sql ?? 'mysql.query', + op: 'db', + attributes: { + [ATTR_DB_SYSTEM]: 'mysql', + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.orchestrion.mysql', + [ATTR_DB_CONNECTION_STRING]: getJDBCString(host, portIsNumber ? portNumber : undefined, database), + ...(database ? { [ATTR_DB_NAME]: database } : {}), + ...(user ? { [ATTR_DB_USER]: user } : {}), + ...(sql ? { [ATTR_DB_STATEMENT]: sql } : {}), + ...(host ? { [ATTR_NET_PEER_NAME]: host } : {}), + ...(portIsNumber ? { [ATTR_NET_PEER_PORT]: portNumber } : {}), + }, }); - result.on('end', () => finishSpan(rawCtx)); - return; - } - - // Callback path: `asyncEnd` will close the span. Nothing to do here. - }, - - error(rawCtx) { - const ctx = rawCtx as MysqlQueryChannelContext; - const span = spans.get(rawCtx); - if (!span) return; - span.setStatus({ - code: SPAN_STATUS_ERROR, - message: ctx.error instanceof Error ? ctx.error.message : 'unknown_error', - }); - }, - - asyncStart() { - // No-op: we end on `asyncEnd` so the span covers the full callback duration. - }, + }, + { + // mysql's no-callback `query(sql)` returns a streamable `Query` emitter: the channel publishes + // `end` synchronously (carrying the emitter as `result`), but the query isn't done until the + // emitter emits `'end'`/`'error'`. Defer ending to those events for that path; the callback and + // sync-throw paths carry no emitter, so the helper ends the span as usual. + // Note: a streamed span never finishes if the connection is destroyed mid-flight — mysql then + // emits neither `'end'` nor `'error'`, so the span is dropped (it leaks no memory; the emitter + // and its closed-over span are collected together). Closing this needs connection-level hooks + // the per-query context doesn't expose. + deferSpanEnd(span, data) { + const result = data.result; + if (!result || typeof result !== 'object' || !hasOnMethod(result)) { + return false; + } + + // Replay the caller's scope onto the emitter so listeners the user attaches after `query()` + // returns (and any spans they start) nest under the caller, not a fresh root trace. + const callerScope = data._sentryCallerScope; + if (callerScope) { + bindScopeToEmitter(result, callerScope); + } + + result.on('error', err => { + span.setStatus({ + code: SPAN_STATUS_ERROR, + message: err instanceof Error ? err.message : 'unknown_error', + }); + // `span.end()` is idempotent, so a following `'end'` is a no-op. + span.end(); + }); + result.on('end', () => span.end()); - asyncEnd(rawCtx) { - finishSpan(rawCtx); - }, + return true; + }, + }, + ); }); - - function finishSpan(rawCtx: object): void { - const span = spans.get(rawCtx); - if (!span) return; - span.end(); - spans.delete(rawCtx); - parentScopes.delete(rawCtx); - } }, }; }) satisfies IntegrationFn; diff --git a/packages/server-utils/src/tracing-channel.ts b/packages/server-utils/src/tracing-channel.ts index bedc59d2e652..8989246ad0b0 100644 --- a/packages/server-utils/src/tracing-channel.ts +++ b/packages/server-utils/src/tracing-channel.ts @@ -42,6 +42,15 @@ export interface TracingChannelLifeCycleOptions { * For database drivers, it is not recommended to set this at all. */ captureError?: boolean | ((e: unknown) => ExclusiveEventHintOrCaptureContext); + + /** + * Take ownership of ending the span for a given payload: return `true` to stop the helper from + * ending it (on `end` or `asyncEnd`), making the caller responsible for calling `span.end()`. + * Use it for results that settle out-of-band of the channel lifecycle — e.g. a streamed + * `EventEmitter` whose completion is signalled by its own `'end'`/`'error'` events, not by the + * channel. Return `false` (the default) to let the helper end the span as usual. + */ + deferSpanEnd?: (span: Span, data: TracingChannelPayloadWithSpan) => boolean; } /** Returned by {@link bindTracingChannelToSpan}: the bound channel plus a teardown handle. */ @@ -77,6 +86,7 @@ export function bindTracingChannelToSpan( const handle = bindSpanToChannelStore(channel, getSpan); const beforeSpanEnd = opts?.beforeSpanEnd; + const deferSpanEnd = opts?.deferSpanEnd; const getErrorHint = (e: unknown): ExclusiveEventHintOrCaptureContext => { if (typeof opts?.captureError === 'function') { return opts.captureError(e); @@ -97,6 +107,10 @@ export function bindTracingChannelToSpan( // The operation settled synchronously (returned or threw) // Presence checks because caller can return `undefined` result or throw a falsy value. if ('error' in data || 'result' in data) { + const span = data._sentrySpan; + if (span && deferSpanEnd?.(span, data)) { + return; + } endBoundSpan(data, beforeSpanEnd); } }, @@ -117,6 +131,10 @@ export function bindTracingChannelToSpan( span.setAttributes(attributes); }, asyncEnd(data) { + const span = data._sentrySpan; + if (span && deferSpanEnd?.(span, data)) { + return; + } endBoundSpan(data, beforeSpanEnd); }, }; From a72910af1d5fbe03f5fe8c9dd2456512374d8b63 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 14:45:06 -0400 Subject: [PATCH 07/14] test(deno): Drive mysql channel via runStores in orchestrion unit test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The synthetic test published `start`/`asyncStart` with bare `publish`, which doesn't run bound stores — fine for the old subscriber that opened the span on `start`, but the span is now opened in the `start.bindStore` transform (only run via `runStores`). Drive it the way orchestrion's `wrapCallback` actually does so the bound store activates and the span opens. --- packages/deno/test/orchestrion-mysql.test.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/deno/test/orchestrion-mysql.test.ts b/packages/deno/test/orchestrion-mysql.test.ts index eac5d7dad6cb..0159d66baf79 100644 --- a/packages/deno/test/orchestrion-mysql.test.ts +++ b/packages/deno/test/orchestrion-mysql.test.ts @@ -120,11 +120,15 @@ Deno.test('denoMysqlIntegration: orchestrion:mysql:query channel produces a nest // Callback-success order published by orchestrion's transform: // start → end → asyncStart → asyncEnd (the span closes on asyncEnd). + // `start`/`asyncStart` go through `runStores` (not bare `publish`), exactly as the transform's + // `wrapCallback` does — that's what activates the store the subscriber binds, so the span opens. startSpan({ name: 'parent', op: 'test' }, () => { - channel.start.publish(ctx); - channel.end.publish(ctx); - channel.asyncStart.publish(ctx); - channel.asyncEnd.publish(ctx); + channel.start.runStores(ctx, () => { + channel.end.publish(ctx); + }); + channel.asyncStart.runStores(ctx, () => { + channel.asyncEnd.publish(ctx); + }); }); const parent = await withTimeout( From 81a297da9f10bcf9b77487390a2925df308a7285 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 14:53:56 -0400 Subject: [PATCH 08/14] chore: trim the yap --- .../src/integrations/tracing-channel/mysql.ts | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/packages/server-utils/src/integrations/tracing-channel/mysql.ts b/packages/server-utils/src/integrations/tracing-channel/mysql.ts index dcff84c77225..820663b50d4a 100644 --- a/packages/server-utils/src/integrations/tracing-channel/mysql.ts +++ b/packages/server-utils/src/integrations/tracing-channel/mysql.ts @@ -75,10 +75,6 @@ const _mysqlChannelIntegration = (() => { DEBUG_BUILD && debug.log(`[orchestrion:mysql] subscribing to channel "${CHANNELS.MYSQL_QUERY}"`); - // `bindTracingChannelToSpan` uses `bindStore`, which needs the async-context binding registered - // after integration `setupOnce` — defer until it's available (matches the other channel subscribers). - // The helper opens the span on `start`, restores the caller's context for callback-style queries on - // `asyncStart`, sets error status, and ends the span on `asyncEnd` (callback) / `end` (sync throw). waitForTracingChannelBinding(() => { bindTracingChannelToSpan( diagnosticsChannel.tracingChannel(CHANNELS.MYSQL_QUERY), @@ -111,12 +107,8 @@ const _mysqlChannelIntegration = (() => { { // mysql's no-callback `query(sql)` returns a streamable `Query` emitter: the channel publishes // `end` synchronously (carrying the emitter as `result`), but the query isn't done until the - // emitter emits `'end'`/`'error'`. Defer ending to those events for that path; the callback and - // sync-throw paths carry no emitter, so the helper ends the span as usual. - // Note: a streamed span never finishes if the connection is destroyed mid-flight — mysql then - // emits neither `'end'` nor `'error'`, so the span is dropped (it leaks no memory; the emitter - // and its closed-over span are collected together). Closing this needs connection-level hooks - // the per-query context doesn't expose. + // emitter emits `'end'`/`'error'`. + // Defer ending to those events for that path. deferSpanEnd(span, data) { const result = data.result; if (!result || typeof result !== 'object' || !hasOnMethod(result)) { From 2c2d110b8e6de6c41cee34972b42f2c75b39fdf9 Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 15:04:46 -0400 Subject: [PATCH 09/14] feat(server-utils): Give deferSpanEnd an `end` util with full lifecycle parity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `deferSpanEnd` handed the caller the span and left them to end it, so the streamed-mysql path reimplemented a thinner version of the lifecycle — bare `setStatus`, no `error.type`, no `captureError`, no `beforeSpanEnd`. Pass an `end(error?)` into `deferSpanEnd` instead: it owns *how* the span ends (error status + attributes, captureError, beforeSpanEnd) and is idempotent, so the deferred path can't drift from the channel lifecycle. The error annotation is factored into a shared helper both paths use. mysql just wires the emitter: `on('error', end)` / `on('end', end)`. `deferSpanEnd` now takes a single object arg `{ span, data, end }`. --- .../src/integrations/tracing-channel/mysql.ts | 41 ++++-------- packages/server-utils/src/tracing-channel.ts | 63 +++++++++++++----- .../server-utils/test/tracing-channel.test.ts | 66 +++++++++++++++++++ 3 files changed, 125 insertions(+), 45 deletions(-) diff --git a/packages/server-utils/src/integrations/tracing-channel/mysql.ts b/packages/server-utils/src/integrations/tracing-channel/mysql.ts index 820663b50d4a..522bb0b962ce 100644 --- a/packages/server-utils/src/integrations/tracing-channel/mysql.ts +++ b/packages/server-utils/src/integrations/tracing-channel/mysql.ts @@ -6,7 +6,6 @@ import { defineIntegration, getCurrentScope, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, - SPAN_STATUS_ERROR, startInactiveSpan, waitForTracingChannelBinding, } from '@sentry/core'; @@ -18,16 +17,9 @@ import { bindTracingChannelToSpan } from '../../tracing-channel'; // When enabled, OTel 'Mysql' integration is omitted from the default set. const INTEGRATION_NAME = 'Mysql' as const; -// OpenTelemetry "OLD" db/net semantic-conventions. We inline them rather than -// importing `@opentelemetry/semantic-conventions` to keep this integration's -// dependency surface free of OTel — orchestrion's whole point is to step away -// from the OTel auto-instrumentation stack. -// -// We emit the OLD conventions to match `@opentelemetry/instrumentation-mysql`'s -// default (it only emits the stable `db.system.name` / `db.query.text` set when -// `OTEL_SEMCONV_STABILITY_OPT_IN=database` is opted into) and the rest of the -// Sentry JS SDK, whose `inferDbSpanData` processor renames spans based on -// `db.statement`. +// OTel "OLD" db/net semantic-conventions, inlined to keep this integration free of OTel deps. Matches +// `@opentelemetry/instrumentation-mysql`'s default and the SDK's `inferDbSpanData` (which renames spans +// off `db.statement`). const ATTR_DB_SYSTEM = 'db.system'; const ATTR_DB_CONNECTION_STRING = 'db.connection_string'; const ATTR_DB_NAME = 'db.name'; @@ -84,9 +76,8 @@ const _mysqlChannelIntegration = (() => { const portNumber = typeof port === 'string' ? parseInt(port, 10) : port; const portIsNumber = typeof portNumber === 'number' && !isNaN(portNumber); - // Capture the caller's scope while still synchronously inside `connection.query`, for the - // streamed-query path: mysql emits the `Query` emitter's events from its socket data handler, - // where the caller's context is lost. `deferSpanEnd` replays this scope onto that emitter. + // For the streamed path: mysql emits the `Query` emitter's events from its socket data + // handler with the caller's context lost. `deferSpanEnd` replays this scope onto the emitter. data._sentryCallerScope = getCurrentScope(); return startInactiveSpan({ @@ -105,32 +96,22 @@ const _mysqlChannelIntegration = (() => { }); }, { - // mysql's no-callback `query(sql)` returns a streamable `Query` emitter: the channel publishes - // `end` synchronously (carrying the emitter as `result`), but the query isn't done until the - // emitter emits `'end'`/`'error'`. - // Defer ending to those events for that path. - deferSpanEnd(span, data) { + // No-callback `query(sql)` returns a streamable `Query` emitter as `result`; it settles on the + // emitter's `'end'`/`'error'`, not the channel, so defer ending to those. + deferSpanEnd({ data, end }) { const result = data.result; if (!result || typeof result !== 'object' || !hasOnMethod(result)) { return false; } - // Replay the caller's scope onto the emitter so listeners the user attaches after `query()` - // returns (and any spans they start) nest under the caller, not a fresh root trace. + // Replay the caller's scope so user listeners on the emitter nest under it, not a new trace. const callerScope = data._sentryCallerScope; if (callerScope) { bindScopeToEmitter(result, callerScope); } - result.on('error', err => { - span.setStatus({ - code: SPAN_STATUS_ERROR, - message: err instanceof Error ? err.message : 'unknown_error', - }); - // `span.end()` is idempotent, so a following `'end'` is a no-op. - span.end(); - }); - result.on('end', () => span.end()); + result.on('error', err => end(err)); + result.on('end', () => end()); return true; }, diff --git a/packages/server-utils/src/tracing-channel.ts b/packages/server-utils/src/tracing-channel.ts index 8989246ad0b0..860d9675886a 100644 --- a/packages/server-utils/src/tracing-channel.ts +++ b/packages/server-utils/src/tracing-channel.ts @@ -44,13 +44,20 @@ export interface TracingChannelLifeCycleOptions { captureError?: boolean | ((e: unknown) => ExclusiveEventHintOrCaptureContext); /** - * Take ownership of ending the span for a given payload: return `true` to stop the helper from - * ending it (on `end` or `asyncEnd`), making the caller responsible for calling `span.end()`. - * Use it for results that settle out-of-band of the channel lifecycle — e.g. a streamed - * `EventEmitter` whose completion is signalled by its own `'end'`/`'error'` events, not by the - * channel. Return `false` (the default) to let the helper end the span as usual. + * Take ownership of *when* the span ends: return `true` and the helper won't end it on + * `end`/`asyncEnd`. For results that settle out-of-band — e.g. a streamed `EventEmitter` that + * completes via its own `'end'`/`'error'` events. + * + * Call `end` when it settles — `end()` on success, `end(error)` on failure. `end` owns *how* the span + * ends (error status/attributes, `captureError`, `beforeSpanEnd`) and is idempotent. Default `false` + * lets the helper end the span as usual. */ - deferSpanEnd?: (span: Span, data: TracingChannelPayloadWithSpan) => boolean; + deferSpanEnd?: (args: { + span: Span; + data: TracingChannelPayloadWithSpan; + /** Ends the span: `end()` on success, `end(error)` on failure. Idempotent. */ + end: (error?: unknown) => void; + }) => boolean; } /** Returned by {@link bindTracingChannelToSpan}: the bound channel plus a teardown handle. */ @@ -100,6 +107,38 @@ export function bindTracingChannelToSpan( }; }; + // Apply Sentry error status + attributes (and capture, if configured) to a span. Shared by the + // channel `error` lifecycle and the deferred `end` util so the two can't drift. + const annotateSpanError = (span: Span, error: unknown): void => { + if (opts?.captureError) { + captureException(error, getErrorHint(error)); + } + + const { message, attributes } = getErrorInfo(error); + span.setStatus({ code: SPAN_STATUS_ERROR, message }); + span.setAttributes(attributes); + }; + + // The `end` handed to `deferSpanEnd`: the caller owns *when*, this owns *how* (error annotation, + // `beforeSpanEnd`, the actual end). Idempotent so a deferred source firing both `'error'` and + // `'end'` doesn't double-annotate. + const makeDeferredEnd = (span: Span, data: TracingChannelPayloadWithSpan) => { + let ended = false; + + return (error?: unknown): void => { + if (ended) { + return; + } + ended = true; + + if (error !== undefined) { + annotateSpanError(span, error); + } + beforeSpanEnd?.(span, data); + span.end(); + }; + }; + const subscribers: Partial>> = { start: NOOP, asyncStart: NOOP, @@ -108,7 +147,7 @@ export function bindTracingChannelToSpan( // Presence checks because caller can return `undefined` result or throw a falsy value. if ('error' in data || 'result' in data) { const span = data._sentrySpan; - if (span && deferSpanEnd?.(span, data)) { + if (span && deferSpanEnd?.({ span, data, end: makeDeferredEnd(span, data) })) { return; } endBoundSpan(data, beforeSpanEnd); @@ -122,17 +161,11 @@ export function bindTracingChannelToSpan( return; } - if (opts?.captureError) { - captureException(data.error, getErrorHint(data.error)); - } - - const { message, attributes } = getErrorInfo(data.error); - span.setStatus({ code: SPAN_STATUS_ERROR, message }); - span.setAttributes(attributes); + annotateSpanError(span, data.error); }, asyncEnd(data) { const span = data._sentrySpan; - if (span && deferSpanEnd?.(span, data)) { + if (span && deferSpanEnd?.({ span, data, end: makeDeferredEnd(span, data) })) { return; } endBoundSpan(data, beforeSpanEnd); diff --git a/packages/server-utils/test/tracing-channel.test.ts b/packages/server-utils/test/tracing-channel.test.ts index a6d26b3f5f3a..6ea6a704ee0d 100644 --- a/packages/server-utils/test/tracing-channel.test.ts +++ b/packages/server-utils/test/tracing-channel.test.ts @@ -783,6 +783,72 @@ describe('bindTracingChannelToSpan', () => { }); }); + describe('deferSpanEnd', () => { + // Drives a deferred span: `getSpan` opens it, `deferSpanEnd` returns true and captures the `end` + // util so the test can settle the span out-of-band, mimicking a streamed emitter. + function setupDeferred( + name: string, + opts?: { captureError?: boolean }, + ): { span: Span; endSpy: ReturnType; end: (error?: unknown) => void } { + installTestAsyncContextStrategy(); + initTestClient(); + const span = startInactiveSpan({ name: 'channel-span' }); + const endSpy = vi.spyOn(span, 'end'); + let captured: (error?: unknown) => void = () => undefined; + const { channel } = bindTracingChannelToSpan(tracingChannel<{ operation: string }>(name), () => span, { + captureError: opts?.captureError, + deferSpanEnd({ end }) { + captured = end; + return true; + }, + }); + + channel.traceSync(() => 'stream', { operation: 'read' }); + + return { span, endSpy, end: (error?: unknown) => captured(error) }; + } + + it('does not end the span while deferred', () => { + const { span, endSpy } = setupDeferred('test:defer:open'); + expect(endSpy).not.toHaveBeenCalled(); + expect(spanToJSON(span).timestamp).toBeUndefined(); + }); + + it('`end()` ends the span once with no error status', () => { + const { span, endSpy, end } = setupDeferred('test:defer:ok'); + end(); + expect(endSpy).toHaveBeenCalledTimes(1); + expect(spanToJSON(span).timestamp).toBeDefined(); + expect(spanToJSON(span).status).toBeUndefined(); + }); + + it('`end(error)` sets error status and the `error.type` attribute, then ends', () => { + const { span, endSpy, end } = setupDeferred('test:defer:error'); + end(new TypeError('stream blew up')); + expect(spanToJSON(span).status).toBe('stream blew up'); + expect(spanToJSON(span).data['error.type']).toBe('TypeError'); + expect(endSpy).toHaveBeenCalledTimes(1); + }); + + it('is idempotent: a trailing settle is a no-op', () => { + const { endSpy, end } = setupDeferred('test:defer:idempotent'); + end(new Error('boom')); + end(); + expect(endSpy).toHaveBeenCalledTimes(1); + }); + + it('captures the error via `end(error)` when `captureError` is set', () => { + const captureExceptionSpy = vi.spyOn(SentryCore, 'captureException').mockReturnValue('event-id'); + const { end } = setupDeferred('test:defer:capture', { captureError: true }); + const error = new Error('captured-stream'); + end(error); + expect(captureExceptionSpy).toHaveBeenCalledTimes(1); + expect(captureExceptionSpy).toHaveBeenCalledWith(error, { + mechanism: { type: 'auto.diagnostic_channels.bind_span', handled: false }, + }); + }); + }); + it('returns the channel unchanged when no async context binding is available', () => { // No async context strategy is installed, so the binding cannot be resolved. const span = startInactiveSpan({ name: 'channel-span' }); From 972275c721d3138a75ce381b32edef58d3e52ccf Mon Sep 17 00:00:00 2001 From: Abdelrahman Awad Date: Tue, 30 Jun 2026 15:06:18 -0400 Subject: [PATCH 10/14] ref(server-utils): Reuse endBoundSpan in the deferred end util --- packages/server-utils/src/tracing-channel.ts | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/server-utils/src/tracing-channel.ts b/packages/server-utils/src/tracing-channel.ts index 860d9675886a..70b70c0f3bae 100644 --- a/packages/server-utils/src/tracing-channel.ts +++ b/packages/server-utils/src/tracing-channel.ts @@ -119,9 +119,7 @@ export function bindTracingChannelToSpan( span.setAttributes(attributes); }; - // The `end` handed to `deferSpanEnd`: the caller owns *when*, this owns *how* (error annotation, - // `beforeSpanEnd`, the actual end). Idempotent so a deferred source firing both `'error'` and - // `'end'` doesn't double-annotate. + // Creates an end fn for deferred handlers to use, ensures consistent span end behavior const makeDeferredEnd = (span: Span, data: TracingChannelPayloadWithSpan) => { let ended = false; @@ -129,13 +127,13 @@ export function bindTracingChannelToSpan( if (ended) { return; } - ended = true; + ended = true; if (error !== undefined) { annotateSpanError(span, error); } - beforeSpanEnd?.(span, data); - span.end(); + + endBoundSpan(data, beforeSpanEnd); }; }; From 891901ccaf07483d8bdd25007e7e68d80edc9d12 Mon Sep 17 00:00:00 2001 From: isaacs Date: Fri, 26 Jun 2026 21:05:52 -0700 Subject: [PATCH 11/14] feat: pg orchestrion instrumentation Add orchestrion instrumentation for Node, Deno, and Bun, covering the `pg` module. This basically copies exactly what the `mysql` integration does, but for postgres. fix: #20764 fix: JS-2415 --- .github/workflows/build.yml | 3 +- .../bun-integration-tests/package.json | 3 +- .../suites/orchestrion-postgres/build.ts | 43 +++ .../suites/orchestrion-postgres/scenario.ts | 61 ++++ .../suites/orchestrion-postgres/test.ts | 65 ++++ .../test-applications/deno-pg/deno.json | 7 + .../deno-pg/docker-compose.yml | 17 + .../deno-pg/global-setup.mjs | 14 + .../deno-pg/global-teardown.mjs | 12 + .../test-applications/deno-pg/package.json | 23 ++ .../deno-pg/playwright.config.mjs | 12 + .../test-applications/deno-pg/src/app.ts | 69 ++++ .../deno-pg/start-event-proxy.mjs | 6 + .../deno-pg/tests/pg.test.ts | 55 +++ .../postgres/instrument-orchestrion.mjs | 16 + .../suites/tracing/postgres/test.ts | 190 +++++++++++ packages/deno/package.json | 3 +- packages/deno/src/index.ts | 1 + packages/deno/src/integrations/postgres.ts | 34 ++ packages/deno/src/sdk.ts | 3 +- .../deno/test/__snapshots__/mod.test.ts.snap | 4 + .../deno/test/orchestrion-postgres.test.ts | 148 ++++++++ .../test/orchestrion-postgres/scenario.mjs | 40 +++ ...erimentalUseDiagnosticsChannelInjection.ts | 19 +- .../integrations/tracing-channel/postgres.ts | 317 ++++++++++++++++++ .../server-utils/src/orchestrion/channels.ts | 3 + .../server-utils/src/orchestrion/config.ts | 38 +++ .../server-utils/src/orchestrion/index.ts | 1 + .../postgres-ignore-connect.test.ts | 19 ++ .../test/orchestrion/postgres.test.ts | 169 ++++++++++ 30 files changed, 1389 insertions(+), 6 deletions(-) create mode 100644 dev-packages/bun-integration-tests/suites/orchestrion-postgres/build.ts create mode 100644 dev-packages/bun-integration-tests/suites/orchestrion-postgres/scenario.ts create mode 100644 dev-packages/bun-integration-tests/suites/orchestrion-postgres/test.ts create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/deno.json create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/docker-compose.yml create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/global-setup.mjs create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/global-teardown.mjs create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/package.json create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/playwright.config.mjs create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/src/app.ts create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/start-event-proxy.mjs create mode 100644 dev-packages/e2e-tests/test-applications/deno-pg/tests/pg.test.ts create mode 100644 dev-packages/node-integration-tests/suites/tracing/postgres/instrument-orchestrion.mjs create mode 100644 packages/deno/src/integrations/postgres.ts create mode 100644 packages/deno/test/orchestrion-postgres.test.ts create mode 100644 packages/deno/test/orchestrion-postgres/scenario.mjs create mode 100644 packages/server-utils/src/integrations/tracing-channel/postgres.ts create mode 100644 packages/server-utils/test/orchestrion/postgres-ignore-connect.test.ts create mode 100644 packages/server-utils/test/orchestrion/postgres.test.ts diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 95d746229277..f5b424400357 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1047,7 +1047,8 @@ jobs: - name: Set up Deno if: matrix.test-application == 'deno' || matrix.test-application == 'deno-streamed' || matrix.test-application == - 'deno-redis' || matrix.test-application == 'hono-4' || matrix.test-application == 'deno-mysql' + 'deno-redis' || matrix.test-application == 'hono-4' || matrix.test-application == 'deno-mysql' || + matrix.test-application == 'deno-pg' uses: denoland/setup-deno@v2.0.4 with: deno-version: ${{ matrix.deno-version || 'v2.8.0' }} diff --git a/dev-packages/bun-integration-tests/package.json b/dev-packages/bun-integration-tests/package.json index d6ecb6b86ce9..2dd0074acf7c 100644 --- a/dev-packages/bun-integration-tests/package.json +++ b/dev-packages/bun-integration-tests/package.json @@ -16,7 +16,8 @@ "@sentry/bun": "10.62.0", "@sentry/hono": "10.62.0", "hono": "^4.12.25", - "mysql": "^2.18.1" + "mysql": "^2.18.1", + "pg": "8.16.0" }, "devDependencies": { "@sentry-internal/test-utils": "10.62.0", diff --git a/dev-packages/bun-integration-tests/suites/orchestrion-postgres/build.ts b/dev-packages/bun-integration-tests/suites/orchestrion-postgres/build.ts new file mode 100644 index 000000000000..4091b03e6711 --- /dev/null +++ b/dev-packages/bun-integration-tests/suites/orchestrion-postgres/build.ts @@ -0,0 +1,43 @@ +// Builds the smoke scenario with the orchestrion `bun build` plugin and writes +// the bundle to a temp dir, printing the output path for test.ts to execute. +// +// A successful build proves `bun build` runs with the plugin; running the +// bundle (see test.ts) then proves the bundled `pg` is actually instrumented. + +// @ts-ignore -- subpath export resolved by Bun at runtime; the package +// tsconfig's node module resolution can't see `exports` subpaths. +import { sentryBunPlugin } from '@sentry/bun/plugin'; +import { tmpdir } from 'os'; +import { join } from 'path'; + +void (async () => { + const outdir = join(tmpdir(), `sentry-bun-orchestrion-pg-${process.pid}-${Date.now()}`); + const result = await Bun.build({ + entrypoints: [join(__dirname, 'scenario.ts')], + target: 'bun', + outdir, + // Deliberately mark `pg` external. An externalized dependency is resolved + // from `node_modules` at runtime and never passes through the transform's + // `onLoad`, so its channel injection would be silently skipped. The plugin + // must strip instrumented packages back out of `external` so they get + // bundled (and thus transformed). + external: ['pg'], + plugins: [sentryBunPlugin()], + }); + + if (!result.success) { + // eslint-disable-next-line no-console + console.error('BUILD_FAILED', result.logs); + process.exit(1); + } + + const output = result.outputs[0]; + if (!output) { + // eslint-disable-next-line no-console + console.error('BUILD_FAILED no outputs'); + process.exit(1); + } + + // eslint-disable-next-line no-console + console.log(`BUILD_OK outfile=${output.path}`); +})(); diff --git a/dev-packages/bun-integration-tests/suites/orchestrion-postgres/scenario.ts b/dev-packages/bun-integration-tests/suites/orchestrion-postgres/scenario.ts new file mode 100644 index 000000000000..72a779d9cf90 --- /dev/null +++ b/dev-packages/bun-integration-tests/suites/orchestrion-postgres/scenario.ts @@ -0,0 +1,61 @@ +// Bundled entry for the `bun build` smoke test. +// +// Once `Bun.build` (with the orchestrion plugin) has transformed `pg`, +// calling `client.query()` publishes to the `orchestrion:pg:query` tracing +// channel. +// +// `start` fires synchronously on the call, so no live database is needed. +// +// We subscribe, run a query, and report which channel events fired +// (plus the detection marker the plugin's banner sets at boot). + +import { tracingChannel } from 'node:diagnostics_channel'; + +// @ts-ignore -- only the runtime value is needed; pg's types are irrelevant +import pg from 'pg'; + +interface QueryContext { + arguments?: unknown[]; +} +interface Client { + query(sql: string, cb: () => void): void; +} +interface PgModule { + Client: new (opts: { host: string; user: string; database: string }) => Client; +} + +const events: string[] = []; +let statement = ''; + +tracingChannel('orchestrion:pg:query').subscribe({ + start(message: unknown) { + events.push('start'); + const first = (message as QueryContext).arguments?.[0]; + statement = typeof first === 'string' ? first : ''; + }, + end() { + events.push('end'); + }, + asyncStart() {}, + asyncEnd() { + events.push('asyncEnd'); + }, + error() {}, +}); + +const client = new (pg as PgModule).Client({ host: '127.0.0.1', user: 'root', database: 'mydb' }); +try { + client.query('SELECT 1 AS solution', () => {}); +} catch { + // No live server + // `start` has already published synchronously by this point. +} + +const marker = (globalThis as { __SENTRY_ORCHESTRION__?: { runtime?: boolean; bundler?: boolean } }) + .__SENTRY_ORCHESTRION__; + +setTimeout(() => { + // eslint-disable-next-line no-console + console.log(`SCENARIO events=${events.join(',')} statement=${statement} marker=${JSON.stringify(marker ?? null)}`); + process.exit(0); +}, 200); diff --git a/dev-packages/bun-integration-tests/suites/orchestrion-postgres/test.ts b/dev-packages/bun-integration-tests/suites/orchestrion-postgres/test.ts new file mode 100644 index 000000000000..58e97626f8bc --- /dev/null +++ b/dev-packages/bun-integration-tests/suites/orchestrion-postgres/test.ts @@ -0,0 +1,65 @@ +import { spawnSync } from 'child_process'; +import { rmSync } from 'fs'; +import { dirname, join } from 'path'; +import { describe, expect, it } from 'vitest'; + +const dir = __dirname; + +// Cap each `bun` subprocess. The test runs two of them sequentially, so its +// own timeout must exceed `2 * SUBPROCESS_TIMEOUT_MS` otherwise the suite's +// default `testTimeout` (20s) fails the test before these caps do, +// for example on a slow CI runner where the build+run can take >20s. +const SUBPROCESS_TIMEOUT_MS = 60_000; + +function runBun(args: string[]): { stdout: string; stderr: string; status: number | null } { + const res = spawnSync('bun', args, { cwd: dir, encoding: 'utf8', timeout: SUBPROCESS_TIMEOUT_MS }); + return { stdout: res.stdout ?? '', stderr: res.stderr ?? '', status: res.status }; +} + +// Bun orchestrion instrumentation is BUILD-ONLY (`@sentry/bun/plugin` is a +// `Bun.build` plugin; there is no `bun run` preload). +// +// A `bun run` runtime plugin cannot instrument CommonJS dependencies like +// `pg`: any module returned by a runtime `onLoad` plugin in Bun loses its +// CommonJS named exports +// +// When https://github.com/oven-sh/bun/pull/31770 lands, we can revisit an +// auto-load plugin for `bun run`. +describe('orchestrion pg instrumentation (Bun)', () => { + it( + 'bundles `pg` with the plugin, and the built output fires the pg channel when run', + () => { + // Build the scenario with the orchestrion `bun build` plugin. + const build = runBun(['run', join(dir, 'build.ts')]); + expect(build.status, `build failed:\nstderr:\n${build.stderr}\nstdout:\n${build.stdout}`).toBe(0); + + const outfile = build.stdout.match(/BUILD_OK outfile=(.+)/)?.[1]?.trim(); + expect(outfile, `no outfile in build output:\n${build.stdout}`).toBeTruthy(); + + try { + // Run the built bundle. The bundled (transformed) `pg` should publish + // to the `orchestrion:pg:query` channel when `client.query()` is + // called, and the plugin's banner should set the `bundler` marker at + // boot. + const run = runBun(['run', outfile as string]); + expect(run.status, `run failed:\nstderr:\n${run.stderr}\nstdout:\n${run.stdout}`).toBe(0); + + const line = run.stdout.split('\n').find(l => l.startsWith('SCENARIO')) ?? ''; + // channel `start` fired on `client.query()` + expect(line).toContain('events=start'); + // with the expected SQL + expect(line).toContain('statement=SELECT 1 AS solution'); + // injected banner ran at bundle boot + expect(line).toContain('"bundler":true'); + } finally { + if (outfile) { + rmSync(dirname(outfile), { recursive: true, force: true }); + } + } + // Allow for both sequential `runBun` calls hitting their subprocess + // cap, so the `spawnSync` timeouts (not the vitest 20s def) are the + // binding limit. + }, + 2 * SUBPROCESS_TIMEOUT_MS, + ); +}); diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/deno.json b/dev-packages/e2e-tests/test-applications/deno-pg/deno.json new file mode 100644 index 000000000000..2bc35855c689 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/deno.json @@ -0,0 +1,7 @@ +{ + "imports": { + "@sentry/deno": "npm:@sentry/deno", + "pg": "npm:pg@8.16.0" + }, + "nodeModulesDir": "manual" +} diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/docker-compose.yml b/dev-packages/e2e-tests/test-applications/deno-pg/docker-compose.yml new file mode 100644 index 000000000000..aeee1935341e --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/docker-compose.yml @@ -0,0 +1,17 @@ +services: + db: + image: postgres:13 + restart: always + container_name: e2e-tests-deno-pg + ports: + - '5432:5432' + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: password + POSTGRES_DB: postgres + healthcheck: + test: ['CMD-SHELL', 'pg_isready -U postgres -d postgres'] + interval: 2s + timeout: 3s + retries: 30 + start_period: 5s diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/global-setup.mjs b/dev-packages/e2e-tests/test-applications/deno-pg/global-setup.mjs new file mode 100644 index 000000000000..2e9841a6fdbf --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/global-setup.mjs @@ -0,0 +1,14 @@ +import { execSync } from 'child_process'; +import { dirname } from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +export default async function globalSetup() { + // Start PostgreSQL via Docker Compose. `--wait` blocks until the healthcheck + // in docker-compose.yml passes, so the Deno app can connect immediately. + execSync('docker compose up -d --wait', { + cwd: __dirname, + stdio: 'inherit', + }); +} diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/global-teardown.mjs b/dev-packages/e2e-tests/test-applications/deno-pg/global-teardown.mjs new file mode 100644 index 000000000000..2742279431ad --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/global-teardown.mjs @@ -0,0 +1,12 @@ +import { execSync } from 'child_process'; +import { dirname } from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); + +export default async function globalTeardown() { + execSync('docker compose down --volumes', { + cwd: __dirname, + stdio: 'inherit', + }); +} diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/package.json b/dev-packages/e2e-tests/test-applications/deno-pg/package.json new file mode 100644 index 000000000000..fafb688ac688 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/package.json @@ -0,0 +1,23 @@ +{ + "name": "deno-pg", + "version": "1.0.0", + "private": true, + "scripts": { + "start": "docker compose up -d --wait && deno run --allow-net --allow-env --allow-read --allow-sys --allow-write src/app.ts", + "test": "playwright test", + "clean": "npx rimraf node_modules pnpm-lock.yaml", + "test:build": "pnpm install", + "test:assert": "pnpm test" + }, + "dependencies": { + "@sentry/deno": "file:../../packed/sentry-deno-packed.tgz", + "pg": "8.16.0" + }, + "devDependencies": { + "@playwright/test": "~1.56.0", + "@sentry-internal/test-utils": "link:../../../test-utils" + }, + "volta": { + "extends": "../../package.json" + } +} diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/playwright.config.mjs b/dev-packages/e2e-tests/test-applications/deno-pg/playwright.config.mjs new file mode 100644 index 000000000000..d525dd371bc9 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/playwright.config.mjs @@ -0,0 +1,12 @@ +import { getPlaywrightConfig } from '@sentry-internal/test-utils'; + +const config = getPlaywrightConfig({ + startCommand: `pnpm start`, + port: 3030, +}); + +export default { + ...config, + globalSetup: './global-setup.mjs', + globalTeardown: './global-teardown.mjs', +}; diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/src/app.ts b/dev-packages/e2e-tests/test-applications/deno-pg/src/app.ts new file mode 100644 index 000000000000..2b9e7a432376 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/src/app.ts @@ -0,0 +1,69 @@ +// `@sentry/deno/import` MUST be the very first import: it registers the +// orchestrion runtime hook, which transforms `pg` (imported dynamically below) +// to publish the `orchestrion:pg:query` diagnostics channel. +// In Deno 2.8.0–2.8.2 the hook only works as the first import in the entry +// graph. +import '@sentry/deno/import'; +import * as Sentry from '@sentry/deno'; + +Sentry.init({ + environment: 'qa', + dsn: Deno.env.get('E2E_TEST_DSN'), + debug: !!Deno.env.get('DEBUG'), + tunnel: 'http://localhost:3031/', // proxy server + tracesSampleRate: 1, +}); + +// Dynamic import AFTER init so the orchestrion hook (registered above) is in +// place to transform `pg/lib/client.js`'s `query`, and so +// `denoPostgresIntegration` (wired by `init()`) is already subscribed. +const { default: pg } = await import('pg'); + +const client = new pg.Client({ + host: Deno.env.get('PGHOST') ?? '127.0.0.1', + port: Number(Deno.env.get('PGPORT') ?? 5432), + user: 'postgres', + password: 'password', + database: 'postgres', +}); + +// Swallow connection errors (e.g. the DB container going away at teardown) so +// they don't become an uncaught exception that crashes the process on +// shutdown. +client.on('error', (err: unknown) => { + // eslint-disable-next-line no-console + console.error('pg client error', err); +}); + +client.connect((err: unknown) => { + if (err) { + // eslint-disable-next-line no-console + console.error('pg connect error', err); + } +}); + +const port = 3030; + +Deno.serve({ port, hostname: '0.0.0.0' }, async (req: Request) => { + const url = new URL(req.url); + + // Runs two queries, the second NESTED inside the first's callback. pg + // dispatches that callback from its socket data handler (a fresh async + // context), so the nested query's span only lands on this request's + // http.server transaction if `denoPostgresIntegration`'s AsyncLocalStorage + // context strategy restored the parent across the async boundary. + if (url.pathname === '/test-pg') { + await new Promise((resolve, reject) => { + client.query('SELECT 1 + 1 AS solution', (err: unknown) => { + if (err) return reject(err); + client.query('SELECT NOW()', (err2: unknown) => { + if (err2) return reject(err2); + resolve(); + }); + }); + }); + return Response.json({ status: 'ok' }); + } + + return new Response('Not found', { status: 404 }); +}); diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/start-event-proxy.mjs b/dev-packages/e2e-tests/test-applications/deno-pg/start-event-proxy.mjs new file mode 100644 index 000000000000..7f5c950f439e --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/start-event-proxy.mjs @@ -0,0 +1,6 @@ +import { startEventProxyServer } from '@sentry-internal/test-utils'; + +startEventProxyServer({ + port: 3031, + proxyServerName: 'deno-pg', +}); diff --git a/dev-packages/e2e-tests/test-applications/deno-pg/tests/pg.test.ts b/dev-packages/e2e-tests/test-applications/deno-pg/tests/pg.test.ts new file mode 100644 index 000000000000..34bbb9240862 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/deno-pg/tests/pg.test.ts @@ -0,0 +1,55 @@ +import { expect, test } from '@playwright/test'; +import { waitForTransaction } from '@sentry-internal/test-utils'; + +test('pg queries emit a db span with orchestrion-channel attributes', async ({ baseURL }) => { + // Each incoming request gets a Sentry http.server transaction (via the + // default denoServeIntegration); the pg queries run inside it, so their + // db spans attach to that transaction. + const transactionPromise = waitForTransaction('deno-pg', event => { + return ( + event?.contexts?.trace?.op === 'http.server' && + (event.request?.url ?? '').includes('/test-pg') && + (event.spans?.some(span => span.op === 'db') ?? false) + ); + }); + + const res = await fetch(`${baseURL}/test-pg`); + expect(res.status).toBe(200); + await res.json(); + + const transaction = await transactionPromise; + const dbSpans = transaction.spans!.filter(span => span.op === 'db'); + + const firstQuery = dbSpans.find(span => span.description === 'SELECT 1 + 1 AS solution'); + expect(firstQuery).toBeDefined(); + expect(firstQuery!.data?.['sentry.origin']).toBe('auto.db.orchestrion.postgres'); + expect(firstQuery!.data?.['db.system']).toBe('postgresql'); + expect(firstQuery!.data?.['db.statement']).toBe('SELECT 1 + 1 AS solution'); + expect(firstQuery!.data?.['net.peer.port']).toBe(5432); + expect(firstQuery!.data?.['db.user']).toBe('postgres'); +}); + +test('a nested query lands on the same transaction (AsyncLocalStorage context restored)', async ({ baseURL }) => { + // The second query runs inside the first query's callback + // i.e. across pg's async socket-callback dispatch. Both spans appearing + // on the SAME http.server transaction proves denoPostgresIntegration's + // context strategy restored the parent span across that async boundary + // (otherwise the nested query would start its own trace and never join + // this transaction). + const transactionPromise = waitForTransaction('deno-pg', event => { + return ( + event?.contexts?.trace?.op === 'http.server' && + (event.request?.url ?? '').includes('/test-pg') && + (event.spans?.filter(span => span.op === 'db').length ?? 0) >= 2 + ); + }); + + const res = await fetch(`${baseURL}/test-pg`); + expect(res.status).toBe(200); + await res.json(); + + const transaction = await transactionPromise; + const descriptions = transaction.spans!.filter(span => span.op === 'db').map(span => span.description); + expect(descriptions).toContain('SELECT 1 + 1 AS solution'); + expect(descriptions).toContain('SELECT NOW()'); +}); diff --git a/dev-packages/node-integration-tests/suites/tracing/postgres/instrument-orchestrion.mjs b/dev-packages/node-integration-tests/suites/tracing/postgres/instrument-orchestrion.mjs new file mode 100644 index 000000000000..d0ac1aec0b2c --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/postgres/instrument-orchestrion.mjs @@ -0,0 +1,16 @@ +// Opting in via `experimentalUseDiagnosticsChannelInjection()` before `init()` +// is all that's needed. Because this file is loaded +// (via `--import`/`--require`) before the scenario imports `pg`, +// `Sentry.init()` synchronously installs the channel-injection hooks, so the +// OTel `Postgres` instrumentation is swapped for the diagnostics-channel one. +import * as Sentry from '@sentry/node'; +import { loggingTransport } from '@sentry-internal/node-integration-tests'; + +Sentry.experimentalUseDiagnosticsChannelInjection(); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + release: '1.0', + tracesSampleRate: 1.0, + transport: loggingTransport, +}); diff --git a/dev-packages/node-integration-tests/suites/tracing/postgres/test.ts b/dev-packages/node-integration-tests/suites/tracing/postgres/test.ts index a24b11efee42..f72dac61a1b1 100644 --- a/dev-packages/node-integration-tests/suites/tracing/postgres/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/postgres/test.ts @@ -353,4 +353,194 @@ describe('postgres auto instrumentation', () => { }); }); }); + + // Orchestrion (diagnostics-channel) variant: the same scenarios opted into + // `experimentalUseDiagnosticsChannelInjection()`. Produces the same spans as + // the OTel path, except the query origin reports the mechanism + // (`auto.db.orchestrion.postgres`); connect/pool-connect spans stay 'manual' + // (mirroring OTel — those spans never set an origin). + describe('orchestrion (diagnostics-channel)', () => { + const ORIGIN = 'auto.db.orchestrion.postgres'; + + describe('default', () => { + const EXPECTED_TRANSACTION = { + transaction: 'Test Transaction', + spans: expect.arrayContaining([ + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'sentry.origin': 'manual', + 'sentry.op': 'db', + }), + description: 'pg.connect', + op: 'db', + status: 'ok', + }), + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'db.statement': 'INSERT INTO "User" ("email", "name") VALUES ($1, $2)', + 'sentry.origin': ORIGIN, + 'sentry.op': 'db', + }), + description: 'INSERT INTO "User" ("email", "name") VALUES ($1, $2)', + op: 'db', + status: 'ok', + origin: ORIGIN, + }), + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'db.statement': 'SELECT * FROM "User" WHERE "email" = $1', + 'db.postgresql.plan': 'select-user-by-email', + 'sentry.origin': ORIGIN, + 'sentry.op': 'db', + }), + description: 'SELECT * FROM "User" WHERE "email" = $1', + op: 'db', + status: 'ok', + origin: ORIGIN, + }), + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.statement': 'SELECT * FROM "does_not_exist_table"', + 'sentry.origin': ORIGIN, + 'sentry.op': 'db', + }), + description: 'SELECT * FROM "does_not_exist_table"', + op: 'db', + status: 'internal_error', + origin: ORIGIN, + }), + ]), + }; + + createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-orchestrion.mjs', (createTestRunner, test) => { + test('auto-instruments `pg` via diagnostics channels', { timeout: 90_000 }, async () => { + await createTestRunner() + .withDockerCompose({ workingDirectory: [__dirname] }) + .expect({ transaction: EXPECTED_TRANSACTION }) + .start() + .completed(); + }); + }); + }); + + describe('pool', () => { + const EXPECTED_TRANSACTION = { + transaction: 'Test Transaction', + spans: expect.arrayContaining([ + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'db.connection_string': 'postgresql://localhost:5494/tests', + 'sentry.op': 'db', + }), + description: 'pg-pool.connect', + op: 'db', + status: 'ok', + origin: 'manual', + }), + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'db.statement': 'SELECT 1 AS foo', + 'sentry.origin': ORIGIN, + 'sentry.op': 'db', + }), + description: 'SELECT 1 AS foo', + op: 'db', + status: 'ok', + origin: ORIGIN, + }), + ]), + }; + + createEsmAndCjsTests(__dirname, 'scenario-pool.mjs', 'instrument-orchestrion.mjs', (createTestRunner, test) => { + test('auto-instruments `pg.Pool` and handles callback-style queries', { timeout: 90_000 }, async () => { + await createTestRunner() + .withDockerCompose({ workingDirectory: [__dirname] }) + .expect({ transaction: EXPECTED_TRANSACTION }) + .start() + .completed(); + }); + }); + }); + + describe('connect error', () => { + const EXPECTED_TRANSACTION = { + transaction: 'Test Transaction', + spans: expect.arrayContaining([ + expect.objectContaining({ + data: expect.objectContaining({ 'db.system': 'postgresql', 'db.name': 'tests', 'sentry.op': 'db' }), + description: 'pg.connect', + op: 'db', + status: 'internal_error', + origin: 'manual', + }), + ]), + }; + + createEsmAndCjsTests( + __dirname, + 'scenario-connect-error.mjs', + 'instrument-orchestrion.mjs', + (createTestRunner, test) => { + test('records an errored connect span when the connection fails', { timeout: 90_000 }, async () => { + await createTestRunner().expect({ transaction: EXPECTED_TRANSACTION }).start().completed(); + }); + }, + ); + }); + + describe('requireParentSpan', () => { + createEsmAndCjsTests( + __dirname, + 'scenario-no-parent.mjs', + 'instrument-orchestrion.mjs', + (createTestRunner, test) => { + test( + 'does not instrument queries or connects without an active parent span', + { timeout: 90_000 }, + async () => { + await createTestRunner() + .withDockerCompose({ workingDirectory: [__dirname] }) + .expect({ + transaction: txn => { + const descriptions = txn.spans?.map(span => span.description) ?? []; + expect(descriptions).not.toContain('SELECT 1 AS unparented'); + expect(descriptions.find(name => name?.includes('connect'))).toBeUndefined(); + expect(txn).toMatchObject({ + transaction: 'Test Transaction', + spans: expect.arrayContaining([ + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.statement': 'SELECT 2 AS parented', + 'sentry.origin': ORIGIN, + 'sentry.op': 'db', + }), + description: 'SELECT 2 AS parented', + op: 'db', + status: 'ok', + origin: ORIGIN, + }), + ]), + }); + }, + }) + .start() + .completed(); + }, + ); + }, + ); + }); + }); }); diff --git a/packages/deno/package.json b/packages/deno/package.json index 184b628adf21..655334f0bfd6 100644 --- a/packages/deno/package.json +++ b/packages/deno/package.json @@ -32,7 +32,8 @@ "@sentry/server-utils": "10.62.0" }, "devDependencies": { - "mysql": "^2.18.1" + "mysql": "^2.18.1", + "pg": "8.16.0" }, "scripts": { "deno-types": "node ./scripts/download-deno-types.mjs", diff --git a/packages/deno/src/index.ts b/packages/deno/src/index.ts index 6ed78e48a884..7361e4d28e86 100644 --- a/packages/deno/src/index.ts +++ b/packages/deno/src/index.ts @@ -112,6 +112,7 @@ export type { DenoHttpIntegrationOptions } from './integrations/http'; export { denoRedisIntegration } from './integrations/redis'; export type { DenoRedisIntegrationOptions } from './integrations/redis'; export { denoMysqlIntegration } from './integrations/mysql'; +export { denoPostgresIntegration } from './integrations/postgres'; export { denoContextIntegration } from './integrations/context'; export { globalHandlersIntegration } from './integrations/globalhandlers'; export { normalizePathsIntegration } from './integrations/normalizepaths'; diff --git a/packages/deno/src/integrations/postgres.ts b/packages/deno/src/integrations/postgres.ts new file mode 100644 index 000000000000..afb72e827a13 --- /dev/null +++ b/packages/deno/src/integrations/postgres.ts @@ -0,0 +1,34 @@ +import { postgresChannelIntegration } from '@sentry/server-utils/orchestrion'; +import type { Integration, IntegrationFn } from '@sentry/core'; +import { defineIntegration, extendIntegration } from '@sentry/core'; +import { setAsyncLocalStorageAsyncContextStrategy } from '../async'; + +const INTEGRATION_NAME = 'DenoPostgres' as const; + +/** + * Create spans for `pg` (node-postgres) queries under Deno. + * + * `pg` channels are injected by the orchestrion runtime hook at load time. + * The `@sentry/deno/import` loader must be active for this integration to + * record anything. + * + * The channel-subscription logic is shared with the other server runtimes in + * `@sentry/server-utils`. This just installs Deno's + * `AsyncLocalStorage` context strategy (so spans nest under the active + * span and survive pg's internal callback dispatch) before delegating. + */ +const _denoPostgresIntegration = (() => { + const inner = postgresChannelIntegration(); + + return extendIntegration(inner, { + name: INTEGRATION_NAME, + setupOnce() { + setAsyncLocalStorageAsyncContextStrategy(); + }, + }); +}) satisfies IntegrationFn; + +export const denoPostgresIntegration = defineIntegration(_denoPostgresIntegration) as () => Integration & { + name: 'DenoPostgres'; + setupOnce: () => void; +}; diff --git a/packages/deno/src/sdk.ts b/packages/deno/src/sdk.ts index 5846d5f02747..db846d6b8007 100644 --- a/packages/deno/src/sdk.ts +++ b/packages/deno/src/sdk.ts @@ -24,6 +24,7 @@ import { import { denoServeIntegration } from './integrations/deno-serve'; import { denoHttpIntegration } from './integrations/http'; import { denoMysqlIntegration } from './integrations/mysql'; +import { denoPostgresIntegration } from './integrations/postgres'; import { denoRedisIntegration } from './integrations/redis'; import { globalHandlersIntegration } from './integrations/globalhandlers'; import { normalizePathsIntegration } from './integrations/normalizepaths'; @@ -59,7 +60,7 @@ export function getDefaultIntegrations(_options: Options): Integration[] { // It's possible that the orchestrion channels will be injected AFTER // (or in parallel to) loading the SDK, so we only gate on whether the // feature is possible. If they're never loaded, it'll just be a no-op. - ...(MODULE_REGISTER_HOOKS_SUPPORTED ? [denoMysqlIntegration()] : []), + ...(MODULE_REGISTER_HOOKS_SUPPORTED ? [denoMysqlIntegration(), denoPostgresIntegration()] : []), contextLinesIntegration(), normalizePathsIntegration(), globalHandlersIntegration(), diff --git a/packages/deno/test/__snapshots__/mod.test.ts.snap b/packages/deno/test/__snapshots__/mod.test.ts.snap index 2b4107c40251..92173dfec650 100644 --- a/packages/deno/test/__snapshots__/mod.test.ts.snap +++ b/packages/deno/test/__snapshots__/mod.test.ts.snap @@ -116,6 +116,7 @@ snapshot[`captureException 1`] = ` "DenoHttp", "DenoRedis", "DenoMysql", + "DenoPostgres", "ContextLines", "NormalizePaths", "GlobalHandlers", @@ -192,6 +193,7 @@ snapshot[`captureMessage 1`] = ` "DenoHttp", "DenoRedis", "DenoMysql", + "DenoPostgres", "ContextLines", "NormalizePaths", "GlobalHandlers", @@ -275,6 +277,7 @@ snapshot[`captureMessage twice 1`] = ` "DenoHttp", "DenoRedis", "DenoMysql", + "DenoPostgres", "ContextLines", "NormalizePaths", "GlobalHandlers", @@ -365,6 +368,7 @@ snapshot[`captureMessage twice 2`] = ` "DenoHttp", "DenoRedis", "DenoMysql", + "DenoPostgres", "ContextLines", "NormalizePaths", "GlobalHandlers", diff --git a/packages/deno/test/orchestrion-postgres.test.ts b/packages/deno/test/orchestrion-postgres.test.ts new file mode 100644 index 000000000000..e4c49eae0eae --- /dev/null +++ b/packages/deno/test/orchestrion-postgres.test.ts @@ -0,0 +1,148 @@ +// + +import { tracingChannel } from 'node:diagnostics_channel'; +import type { TransactionEvent } from '@sentry/core'; +import { assert } from 'https://deno.land/std@0.212.0/assert/assert.ts'; +import { assertEquals } from 'https://deno.land/std@0.212.0/assert/assert_equals.ts'; +import { assertExists } from 'https://deno.land/std@0.212.0/assert/assert_exists.ts'; +import type { DenoClient } from '../build/esm/index.js'; +import { getCurrentScope, getGlobalScope, getIsolationScope, init, startSpan } from '../build/esm/index.js'; + +function resetGlobals(): void { + getCurrentScope().clear(); + getCurrentScope().setClient(undefined); + getIsolationScope().clear(); + getGlobalScope().clear(); +} + +/** See `deno-redis.test.ts` same sink shape, deduped for clarity. */ +function transactionSink(): { + beforeSendTransaction: (event: TransactionEvent) => null; + waitFor: (predicate: (event: TransactionEvent) => boolean) => Promise; +} { + const transactions: TransactionEvent[] = []; + const waiters: { predicate: (e: TransactionEvent) => boolean; resolve: (e: TransactionEvent) => void }[] = []; + return { + beforeSendTransaction(event) { + transactions.push(event); + for (let i = waiters.length - 1; i >= 0; i--) { + const w = waiters[i]!; + if (w.predicate(event)) { + waiters.splice(i, 1); + w.resolve(event); + } + } + return null; + }, + waitFor(predicate) { + const already = transactions.find(predicate); + if (already) return Promise.resolve(already); + return new Promise(resolve => { + waiters.push({ predicate, resolve }); + }); + }, + }; +} + +function withTimeout(p: Promise, ms: number, what: string): Promise { + let timer: ReturnType | undefined; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(`Timed out waiting for ${what} after ${ms}ms`)), ms); + }); + return Promise.race([p, timeout]).finally(() => { + if (timer !== undefined) clearTimeout(timer); + }); +} + +Deno.test('denoPostgresIntegration: included in default integrations (Deno 2.8.0+)', () => { + resetGlobals(); + const client = init({ dsn: 'https://username@domain/123' }) as DenoClient; + const names = client.getOptions().integrations.map(i => i.name); + assert(names.includes('DenoPostgres'), `DenoPostgres should be in defaults, got ${names.join(', ')}`); +}); + +// The orchestrion runtime hook (`@sentry/deno/import`) only works as a FIRST +// import inside the entry graph in Deno 2.8.0 through 2.8.2. +// TODO: revisit a `--import` or `--preload` approach once Deno 2.8.3 ships. +Deno.test('@sentry/deno/import: transforms pg so it publishes the orchestrion channel', async () => { + const scenario = new URL('./orchestrion-postgres/scenario.mjs', import.meta.url); + + // packages/deno, where node_modules resolves + const cwd = new URL('../', import.meta.url); + + const command = new Deno.Command('deno', { + args: ['run', '--allow-all', scenario.pathname], + cwd: cwd.pathname, + stdout: 'piped', + stderr: 'piped', + }); + + const { code, stdout, stderr } = await command.output(); + const out = new TextDecoder().decode(stdout); + const err = new TextDecoder().decode(stderr); + + assertEquals(code, 0, `scenario exited ${code}\nstdout:\n${out}\nstderr:\n${err}`); + + const line = out.split('\n').find(l => l.startsWith('SCENARIO')) ?? ''; + assert(line, `no SCENARIO line in output:\n${out}\nstderr:\n${err}`); + // The injected channel fired on `client.query()` + // proves pg was transformed... + assert(line.includes('events=start'), `expected channel 'start' event, got: ${line}`); + // ...with the real SQL forwarded through the channel context. + assert(line.includes('statement=SELECT 1 AS solution'), `expected forwarded SQL, got: ${line}`); + // The runtime hook set its detection marker at boot. + assert(line.includes('"runtime":true'), `expected runtime marker, got: ${line}`); +}); + +// Exercises the SDK path e2e: `init()` wires `denoPostgresIntegration` +// (which installs the AsyncLocalStorage context strategy and subscribes to +// the channel), and we drive the `orchestrion:pg:query` channel manually, +// the same events the orchestrion transform publishes around +// `client.query()`, so no live database is needed. Asserting a nested `db` +// span proves the subscriber, the emitted attributes, AND the +// context-strategy wiring all work. +Deno.test('denoPostgresIntegration: orchestrion:pg:query channel produces a nested db span', async () => { + resetGlobals(); + const sink = transactionSink(); + init({ + dsn: 'https://username@domain/123', + tracesSampleRate: 1, + beforeSendTransaction: sink.beforeSendTransaction, + }); + + const channel = tracingChannel('orchestrion:pg:query'); + + // The shared context object orchestrion reuses across the lifecycle events + // + // `arguments[0]` is the SQL; `self.connectionParameters` is pg's resolved + // connection config. + const ctx = { + arguments: ['SELECT 1 AS solution'], + self: { connectionParameters: { host: '127.0.0.1', port: 5432, database: 'mydb', user: 'root' } }, + }; + + // Callback-success order published by orchestrion's transform: + // start -> end -> asyncStart -> asyncEnd (the span closes on asyncEnd). + startSpan({ name: 'parent', op: 'test' }, () => { + channel.start.publish(ctx); + channel.end.publish(ctx); + channel.asyncStart.publish(ctx); + channel.asyncEnd.publish(ctx); + }); + + const parent = await withTimeout( + sink.waitFor(t => t.transaction === 'parent'), + 5000, + "'parent' transaction", + ); + + const pgSpan = parent.spans?.find(s => s.op === 'db'); + assertExists(pgSpan, `expected a db child span, got ops: ${parent.spans?.map(s => s.op).join(', ')}`); + assertEquals(pgSpan!.description, 'SELECT 1 AS solution'); + assertEquals(pgSpan!.data?.['db.system'], 'postgresql'); + assertEquals(pgSpan!.data?.['db.statement'], 'SELECT 1 AS solution'); + assertEquals(pgSpan!.data?.['net.peer.name'], '127.0.0.1'); + assertEquals(pgSpan!.data?.['net.peer.port'], 5432); + assertEquals(pgSpan!.data?.['db.user'], 'root'); + assertEquals(pgSpan!.data?.['sentry.origin'], 'auto.db.orchestrion.postgres'); +}); diff --git a/packages/deno/test/orchestrion-postgres/scenario.mjs b/packages/deno/test/orchestrion-postgres/scenario.mjs new file mode 100644 index 000000000000..fa6aadbdbe31 --- /dev/null +++ b/packages/deno/test/orchestrion-postgres/scenario.mjs @@ -0,0 +1,40 @@ +// Spawned by orchestrion-postgres.test.ts via `deno run`. +// +// Importing `@sentry/deno/import` FIRST registers the orchestrion module hook, +// so the subsequent `pg` import is transformed to publish to the +// `orchestrion:pg:query` tracing channel. `client.query()` publishes `start` +// synchronously, so no live database is needed. +import '@sentry/deno/import'; + +import { tracingChannel } from 'node:diagnostics_channel'; +const { default: pg } = await import('pg'); + +const events = []; +let statement = ''; + +tracingChannel('orchestrion:pg:query').subscribe({ + start(message) { + events.push('start'); + const first = message?.arguments?.[0]; + statement = typeof first === 'string' ? first : ''; + }, + end() { + events.push('end'); + }, + asyncStart() {}, + asyncEnd() { + events.push('asyncEnd'); + }, + error() {}, +}); + +const client = new pg.Client({ host: '127.0.0.1', user: 'root', database: 'mydb' }); +try { + client.query('SELECT 1 AS solution', () => {}); +} catch { + // No live server, `start` has already published synchronously by now. +} + +const marker = globalThis.__SENTRY_ORCHESTRION__ ?? null; +// eslint-disable-next-line no-console +console.log(`SCENARIO events=${events.join(',')} statement=${statement} marker=${JSON.stringify(marker)}`); diff --git a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts index 6bba862cdfc0..d5dda607b345 100644 --- a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts +++ b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts @@ -1,7 +1,8 @@ import { - mysqlChannelIntegration, lruMemoizerChannelIntegration, detectOrchestrionSetup, + mysqlChannelIntegration, + postgresChannelIntegration, } from '@sentry/server-utils/orchestrion'; import { registerDiagnosticsChannelInjection } from '@sentry/server-utils/orchestrion/register'; import type { DiagnosticsChannelInjection } from './diagnosticsChannelInjection'; @@ -37,11 +38,25 @@ import { setDiagnosticsChannelInjectionLoader } from './diagnosticsChannelInject * subscriber/channel modules; the heavy code-transform dependencies stay lazy * inside `register()` and load only when injection actually runs. * + * Per-integration options are passed here rather than via the OTel + * `xxxIntegration({...})` instances, because those are swapped out wholesale for + * their channel equivalents (and a user-provided OTel instance would otherwise + * win integration de-duplication, silently keeping the OTel path). For example, + * to suppress pg connect spans on the orchestrion path: + * + * ```ts + * Sentry.experimentalUseDiagnosticsChannelInjection({ postgres: { ignoreConnectSpans: true } }); + * ``` + * * @experimental May change or be removed in any release. */ export function experimentalUseDiagnosticsChannelInjection(): void { setDiagnosticsChannelInjectionLoader((): DiagnosticsChannelInjection => { - const integrations = [mysqlChannelIntegration(), lruMemoizerChannelIntegration()] as const; + const integrations = [ + mysqlChannelIntegration(), + postgresChannelIntegration(), + lruMemoizerChannelIntegration(), + ] as const; const replacedOtelIntegrationNames = integrations.map(i => i.name); return { diff --git a/packages/server-utils/src/integrations/tracing-channel/postgres.ts b/packages/server-utils/src/integrations/tracing-channel/postgres.ts new file mode 100644 index 000000000000..a98b8068058e --- /dev/null +++ b/packages/server-utils/src/integrations/tracing-channel/postgres.ts @@ -0,0 +1,317 @@ +import * as diagnosticsChannel from 'node:diagnostics_channel'; +import type { IntegrationFn, Scope, Span, SpanAttributes } from '@sentry/core'; +import { + bindScopeToEmitter, + debug, + defineIntegration, + getActiveSpan, + getCurrentScope, + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, + SPAN_STATUS_ERROR, + startInactiveSpan, + withScope, +} from '@sentry/core'; +import { DEBUG_BUILD } from '../../debug-build'; +import { CHANNELS } from '../../orchestrion/channels'; + +// NOTE: this uses the same name as the OTel integration by design. +// When enabled, the OTel 'Postgres' integration is omitted from the default set. +const INTEGRATION_NAME = 'Postgres' as const; + +// Only the query span carries an origin (the connect/pool-connect spans don't, +// so they default to 'manual'). +const ORIGIN = 'auto.db.orchestrion.postgres'; + +// OpenTelemetry "OLD" db/net semantic-conventions, inlined to keep this +// integration free of `@opentelemetry/*` deps. +const ATTR_DB_SYSTEM = 'db.system'; +const ATTR_DB_NAME = 'db.name'; +const ATTR_DB_CONNECTION_STRING = 'db.connection_string'; +const ATTR_DB_USER = 'db.user'; +const ATTR_DB_STATEMENT = 'db.statement'; +const ATTR_NET_PEER_NAME = 'net.peer.name'; +const ATTR_NET_PEER_PORT = 'net.peer.port'; +const ATTR_PG_PLAN = 'db.postgresql.plan'; +const ATTR_PG_IDLE_TIMEOUT = 'db.postgresql.idle.timeout.millis'; +const ATTR_PG_MAX_CLIENT = 'db.postgresql.max.client'; +const DB_SYSTEM_POSTGRESQL = 'postgresql'; + +// We set `op: 'db'` and the SQL description directly here (same as mysql +// orchestrion) rather than relying on the OTel pipeline's `inferDbSpanData` +// processor, which only runs in the node SDK, so setting them here is what +// makes the spans correct on the other runtimes +// +// The user-visible span is identical to OTel: query spans are named after +// `db.statement`; connect/pool-connect spans keep these names. +const SPAN_QUERY_FALLBACK = 'pg.query'; +const SPAN_CONNECT = 'pg.connect'; +const SPAN_POOL_CONNECT = 'pg-pool.connect'; + +/** + * The shape orchestrion's wrapAuto transform attaches to the tracing-channel + * `context`. `arguments` is the live call args; orchestrion splices the user's + * callback out and inserts its own wrapper at the same index before `start`, + * and the `start` hook re-wraps that entry to restore the caller's scope + * across pg's async callback dispatch. + */ +interface PgChannelContext { + arguments: unknown[]; + self?: unknown; + result?: unknown; + error?: unknown; +} + +interface PgConnectionParams { + database?: string; + host?: string; + port?: number; + user?: string; + connectionString?: string; +} + +interface PgPoolOptions extends PgConnectionParams { + idleTimeoutMillis?: number; + // pg-pool stores the max pool size as `max` (defaulting it to 10 in its + // constructor). The OTel pg instrumentation reads a `maxClient` field that + // pg-pool never sets, so its `db.postgresql.max.client` attribute is always + // dropped; we read the real `max` so the attribute is actually populated. + max?: number; +} + +const _postgresChannelIntegration = ((options: { ignoreConnectSpans?: boolean } = {}) => { + return { + name: INTEGRATION_NAME, + setupOnce() { + // Query spans: `pg`/native `Client.prototype.query`. + subscribeQueryLikeChannel(CHANNELS.PG_QUERY, querySpanOptions); + + // Connect spans, gated by `ignoreConnectSpans` (same as OTel pg). + // `Client.prototype.connect` (pg + native) + // and `Pool.prototype.connect` (pg-pool). + if (!options.ignoreConnectSpans) { + subscribeQueryLikeChannel(CHANNELS.PG_CONNECT, connectSpanOptions); + subscribeQueryLikeChannel(CHANNELS.PGPOOL_CONNECT, poolConnectSpanOptions); + } + }, + }; +}) satisfies IntegrationFn; + +/** + * Subscribe to a pg tracing-channel and manage a span across its lifecycle. + * Shared by the query/connect/pool-connect channels. They differ only in how + * the span's name + attributes are built (`getSpanOptions`). + */ +function subscribeQueryLikeChannel( + channelName: string, + getSpanOptions: (ctx: PgChannelContext) => { name: string; op: string; attributes: SpanAttributes }, +): void { + DEBUG_BUILD && debug.log(`[orchestrion:pg] subscribing to channel "${channelName}"`); + const ch = diagnosticsChannel.tracingChannel(channelName); + const spans = new WeakMap(); + const parentScopes = new WeakMap(); + + ch.subscribe({ + start(rawCtx) { + // only instrument when there's an active span + if (!getActiveSpan()) { + return; + } + const ctx = rawCtx as PgChannelContext; + const span = startInactiveSpan(getSpanOptions(ctx)); + spans.set(rawCtx, span); + + // Capture the caller's scope while we're still synchronously inside the + // call, to restore it for deferred callbacks/streamed events + // (pg dispatches them outside the original async scope). + const scope = getCurrentScope(); + parentScopes.set(rawCtx, scope); + + // Re-wrap orchestrion's callback wrapper so the user's callback and + // anything chained off it runs with the captured scope active. + if (ctx.arguments.length > 0) { + const cbIdx = ctx.arguments.length - 1; + const orchestrionWrappedCb = ctx.arguments[cbIdx]; + if (typeof orchestrionWrappedCb === 'function') { + const wrapped = orchestrionWrappedCb as (...a: unknown[]) => unknown; + ctx.arguments[cbIdx] = function (this: unknown, ...args: unknown[]): unknown { + return withScope(scope, () => wrapped.apply(this, args)); + }; + } + } + }, + + end(rawCtx) { + const ctx = rawCtx as PgChannelContext; + + // Sync throw: `end` fires after `error`, so `ctx.error` is set; close + // now since no `asyncEnd` will fire. + if (ctx.error !== undefined) { + finishSpan(rawCtx); + return; + } + + // Streamable `Submittable` (e.g. `client.query(new Query())`): + // orchestrion stored the returned emitter on `ctx.result` and fired no + // async events. Bind the captured scope to it and finish on + // end or error. + const result = ctx.result; + if (result && typeof result === 'object' && hasOnMethod(result)) { + const span = spans.get(rawCtx); + if (!span) { + return; + } + const parentScope = parentScopes.get(rawCtx); + if (parentScope) { + bindScopeToEmitter(result, parentScope); + } + result.on('error', err => { + span.setStatus({ code: SPAN_STATUS_ERROR, message: err instanceof Error ? err.message : 'unknown_error' }); + finishSpan(rawCtx); + }); + result.on('end', () => finishSpan(rawCtx)); + return; + } + + // Callback/promise path: `asyncEnd` closes the span. + }, + + error(rawCtx) { + const ctx = rawCtx as PgChannelContext; + const span = spans.get(rawCtx); + if (!span) { + return; + } + span.setStatus({ + code: SPAN_STATUS_ERROR, + message: ctx.error instanceof Error ? ctx.error.message : 'unknown_error', + }); + }, + + asyncStart() { + // No-op: we end on `asyncEnd` so the span covers the full duration. + }, + + asyncEnd(rawCtx) { + finishSpan(rawCtx); + }, + }); + + function finishSpan(rawCtx: object): void { + const span = spans.get(rawCtx); + if (span) { + span.end(); + spans.delete(rawCtx); + parentScopes.delete(rawCtx); + } + } +} + +function querySpanOptions(ctx: PgChannelContext): { name: string; op: string; attributes: SpanAttributes } { + const params = (ctx.self as { connectionParameters?: PgConnectionParams } | undefined)?.connectionParameters ?? {}; + const queryConfig = extractQueryConfig(ctx.arguments); + return { + // The description is the SQL statement + name: queryConfig?.text ?? SPAN_QUERY_FALLBACK, + op: 'db', + attributes: { + ...getConnectionAttributes(params), + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: ORIGIN, + ...(queryConfig?.text ? { [ATTR_DB_STATEMENT]: queryConfig.text } : {}), + ...(typeof queryConfig?.name === 'string' ? { [ATTR_PG_PLAN]: queryConfig.name } : {}), + }, + }; +} + +function connectSpanOptions(ctx: PgChannelContext): { name: string; op: string; attributes: SpanAttributes } { + const params = (ctx.self as { connectionParameters?: PgConnectionParams } | undefined)?.connectionParameters ?? {}; + // No origin set -> defaults to 'manual' + return { name: SPAN_CONNECT, op: 'db', attributes: getConnectionAttributes(params) }; +} + +function poolConnectSpanOptions(ctx: PgChannelContext): { name: string; op: string; attributes: SpanAttributes } { + const opts = (ctx.self as { options?: PgPoolOptions } | undefined)?.options ?? {}; + return { name: SPAN_POOL_CONNECT, op: 'db', attributes: getPoolConnectionAttributes(opts) }; +} + +function hasOnMethod(obj: object): obj is { on: (event: string, listener: (arg?: unknown) => void) => unknown } { + return 'on' in obj && typeof (obj as { on?: unknown }).on === 'function'; +} + +// `client.query(text, cb?)`, `client.query(text, values, cb?)`, and +// `client.query(configObj, cb?)` are all valid; normalize to `{ text, name }` +// (the only fields the span needs). Returns undefined for invalid args. +function extractQueryConfig(args: unknown[]): { text: string; name?: unknown } | undefined { + const arg0 = args[0]; + if (typeof arg0 === 'string') { + return { text: arg0 }; + } + if (arg0 && typeof arg0 === 'object' && typeof (arg0 as { text?: unknown }).text === 'string') { + const obj = arg0 as { text: string; name?: unknown }; + return { text: obj.text, name: obj.name }; + } + return undefined; +} + +function getConnectionAttributes(params: PgConnectionParams): SpanAttributes { + return { + [ATTR_DB_SYSTEM]: DB_SYSTEM_POSTGRESQL, + [ATTR_DB_CONNECTION_STRING]: getConnectionString(params), + ...(params.database ? { [ATTR_DB_NAME]: params.database } : {}), + ...(params.user ? { [ATTR_DB_USER]: params.user } : {}), + ...(params.host ? { [ATTR_NET_PEER_NAME]: params.host } : {}), + ...(Number.isInteger(params.port) ? { [ATTR_NET_PEER_PORT]: params.port } : {}), + }; +} + +function getPoolConnectionAttributes(opts: PgPoolOptions): SpanAttributes { + let url: URL | undefined; + try { + url = opts.connectionString ? new URL(opts.connectionString) : undefined; + } catch { + url = undefined; + } + const database = url?.pathname.slice(1) || opts.database; + const host = url?.hostname || opts.host; + const port = url ? Number(url.port) || undefined : Number.isInteger(opts.port) ? opts.port : undefined; + const user = url?.username || opts.user; + return { + [ATTR_DB_SYSTEM]: DB_SYSTEM_POSTGRESQL, + [ATTR_DB_CONNECTION_STRING]: getConnectionString(opts), + ...(opts.idleTimeoutMillis !== undefined ? { [ATTR_PG_IDLE_TIMEOUT]: opts.idleTimeoutMillis } : {}), + ...(opts.max !== undefined ? { [ATTR_PG_MAX_CLIENT]: opts.max } : {}), + ...(database ? { [ATTR_DB_NAME]: database } : {}), + ...(host ? { [ATTR_NET_PEER_NAME]: host } : {}), + ...(port !== undefined ? { [ATTR_NET_PEER_PORT]: port } : {}), + ...(user ? { [ATTR_DB_USER]: user } : {}), + }; +} + +// Builds `postgresql://host:port/database`, masking credentials when a raw +// connection string was provided. +function getConnectionString(params: PgConnectionParams): string { + if (params.connectionString) { + try { + const url = new URL(params.connectionString); + url.username = ''; + url.password = ''; + return url.toString(); + } catch { + return 'postgresql://localhost:5432/'; + } + } + const host = params.host || 'localhost'; + const port = params.port || 5432; + const database = params.database || ''; + return `postgresql://${host}:${port}/${database}`; +} + +/** + * EXPERIMENTAL: orchestrion-driven `pg` (node-postgres) integration. + * + * Subscribes to the `orchestrion:pg:query`/`:connect` and + * `orchestrion:pg-pool:connect` diagnostics_channels that the orchestrion code + * transform injects into `pg`'s `Client.prototype.query`/`connect` + * and `pg-pool`'s `Pool.prototype.connect`. Requires the orchestrion runtime + * hook or bundler plugin to be active. + */ +export const postgresChannelIntegration = defineIntegration(_postgresChannelIntegration); diff --git a/packages/server-utils/src/orchestrion/channels.ts b/packages/server-utils/src/orchestrion/channels.ts index ad2d8ccdd4dd..71cba4d6e8da 100644 --- a/packages/server-utils/src/orchestrion/channels.ts +++ b/packages/server-utils/src/orchestrion/channels.ts @@ -14,6 +14,9 @@ export const CHANNELS = { MYSQL_QUERY: 'orchestrion:mysql:query', LRU_MEMOIZER_LOAD: 'orchestrion:lru-memoizer:load', + PG_QUERY: 'orchestrion:pg:query', + PG_CONNECT: 'orchestrion:pg:connect', + PGPOOL_CONNECT: 'orchestrion:pg-pool:connect', } as const; export type ChannelName = (typeof CHANNELS)[keyof typeof CHANNELS]; diff --git a/packages/server-utils/src/orchestrion/config.ts b/packages/server-utils/src/orchestrion/config.ts index 104df2185386..a41e255772c7 100644 --- a/packages/server-utils/src/orchestrion/config.ts +++ b/packages/server-utils/src/orchestrion/config.ts @@ -38,6 +38,44 @@ export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ module: { name: 'lru-memoizer', versionRange: '>=2.1.0 <4', filePath: 'lib/async.js' }, functionQuery: { functionName: 'memoizedFunction', kind: 'Callback' }, }, + // `pg` (node-postgres). + // instruments `Client.prototype.query`/`connect` (both the JS and native + // clients) plus `pg-pool`'s `Pool.prototype.connect`. + // `Auto` covers the callback, promise, and streamable-`Submittable` + // call shapes (like mysql). + // `pg/lib/client.js` is `class Client { query() {...} connect() {...} }`, + // so `className`+`methodName` matches directly. + { + channelName: 'query', + module: { name: 'pg', versionRange: '>=8.0.3 <9', filePath: 'lib/client.js' }, + functionQuery: { className: 'Client', methodName: 'query', kind: 'Auto' }, + }, + { + channelName: 'connect', + module: { name: 'pg', versionRange: '>=8.0.3 <9', filePath: 'lib/client.js' }, + functionQuery: { className: 'Client', methodName: 'connect', kind: 'Auto' }, + }, + // The native client (`pg/lib/native/client.js`) is a constructor function, + // not a class. + // `Client.prototype.query = function (config, values, callback) {...}` + // so it needs `expressionName` (the mysql shape), publishing to the SAME + // `orchestrion:pg:query`/`:connect` channels as the JS client. + { + channelName: 'query', + module: { name: 'pg', versionRange: '>=8.0.3 <9', filePath: 'lib/native/client.js' }, + functionQuery: { expressionName: 'query', kind: 'Auto' }, + }, + { + channelName: 'connect', + module: { name: 'pg', versionRange: '>=8.0.3 <9', filePath: 'lib/native/client.js' }, + functionQuery: { expressionName: 'connect', kind: 'Auto' }, + }, + // `pg-pool` is `class Pool extends EventEmitter { connect(cb) {...} }`. + { + channelName: 'connect', + module: { name: 'pg-pool', versionRange: '>=2.0.0 <4', filePath: 'index.js' }, + functionQuery: { className: 'Pool', methodName: 'connect', kind: 'Auto' }, + }, ]; /** diff --git a/packages/server-utils/src/orchestrion/index.ts b/packages/server-utils/src/orchestrion/index.ts index 4b182e51ec13..e1b55a602876 100644 --- a/packages/server-utils/src/orchestrion/index.ts +++ b/packages/server-utils/src/orchestrion/index.ts @@ -1,3 +1,4 @@ export { detectOrchestrionSetup } from './detect'; export { mysqlChannelIntegration } from '../integrations/tracing-channel/mysql'; export { lruMemoizerChannelIntegration } from '../integrations/tracing-channel/lru-memoizer'; +export { postgresChannelIntegration } from '../integrations/tracing-channel/postgres'; diff --git a/packages/server-utils/test/orchestrion/postgres-ignore-connect.test.ts b/packages/server-utils/test/orchestrion/postgres-ignore-connect.test.ts new file mode 100644 index 000000000000..0949296084f8 --- /dev/null +++ b/packages/server-utils/test/orchestrion/postgres-ignore-connect.test.ts @@ -0,0 +1,19 @@ +import { tracingChannel } from 'node:diagnostics_channel'; +import { describe, expect, it } from 'vitest'; +import { postgresChannelIntegration } from '../../src/orchestrion'; +import { CHANNELS } from '../../src/orchestrion/channels'; + +// `setupOnce` subscribes to process-global `tracingChannel`s, so asserting the +// ABSENCE of connect subscribers only holds when no other (default-options) +// integration in the same module context has subscribed. vitest isolates +// module state per file, so this file keeps that assertion clean (the default +// options integration is exercised in `postgres.test.ts`). +describe('postgresChannelIntegration({ ignoreConnectSpans: true })', () => { + it('subscribes to the query channel but NOT the connect / pool-connect channels', () => { + postgresChannelIntegration({ ignoreConnectSpans: true }).setupOnce?.(); + + expect(tracingChannel(CHANNELS.PG_QUERY).start.hasSubscribers).toBe(true); + expect(tracingChannel(CHANNELS.PG_CONNECT).start.hasSubscribers).toBe(false); + expect(tracingChannel(CHANNELS.PGPOOL_CONNECT).start.hasSubscribers).toBe(false); + }); +}); diff --git a/packages/server-utils/test/orchestrion/postgres.test.ts b/packages/server-utils/test/orchestrion/postgres.test.ts new file mode 100644 index 000000000000..b206562c33de --- /dev/null +++ b/packages/server-utils/test/orchestrion/postgres.test.ts @@ -0,0 +1,169 @@ +import { tracingChannel } from 'node:diagnostics_channel'; +import type { Span } from '@sentry/core'; +import * as SentryCore from '@sentry/core'; +import { afterEach, beforeAll, beforeEach, describe, expect, it, type MockInstance, vi } from 'vitest'; +import { postgresChannelIntegration } from '../../src/orchestrion'; +import { CHANNELS } from '../../src/orchestrion/channels'; + +// The subscriber builds spans via `startInactiveSpan` and gates on +// `getActiveSpan`. We spy both: `getActiveSpan` to satisfy the +// requireParentSpan gate, and `startInactiveSpan` to capture the span +// options the subscriber builds (name + raw attributes) and to track the +// span's lifecycle. The final `op: 'db'` / SQL description come from the +// SDK's `inferDbSpanData` processor, which isn't wired up here. That's +// covered by the integration test. +function makeSpan(): Span { + return { end: vi.fn(), setStatus: vi.fn() } as unknown as Span; +} + +interface ChannelContext { + arguments: unknown[]; + self?: unknown; +} + +describe('postgresChannelIntegration', () => { + let startInactiveSpanSpy: MockInstance; + let getActiveSpanSpy: MockInstance; + let span: Span; + + // Subscribe once for the whole file so a single subscriber handles each + // publish (avoids accumulating duplicate subscriptions across tests). + beforeAll(() => { + postgresChannelIntegration().setupOnce?.(); + }); + + beforeEach(() => { + span = makeSpan(); + startInactiveSpanSpy = vi.spyOn(SentryCore, 'startInactiveSpan').mockReturnValue(span); + // A truthy active span by default, so the requireParentSpan gate passes. + getActiveSpanSpy = vi.spyOn(SentryCore, 'getActiveSpan').mockReturnValue({} as Span); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + const CONNECTION = { database: 'tests', host: 'localhost', port: 5432, user: 'tim' }; + + it('query: builds a `pg.query` span with db attributes and the orchestrion origin', async () => { + const ctx: ChannelContext = { arguments: ['SELECT * FROM "User"'], self: { connectionParameters: CONNECTION } }; + + await tracingChannel(CHANNELS.PG_QUERY).tracePromise(async () => ({ rows: [] }), ctx); + + expect(startInactiveSpanSpy).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'SELECT * FROM "User"', + op: 'db', + attributes: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'db.user': 'tim', + 'net.peer.name': 'localhost', + 'net.peer.port': 5432, + 'db.connection_string': 'postgresql://localhost:5432/tests', + 'db.statement': 'SELECT * FROM "User"', + 'sentry.origin': 'auto.db.orchestrion.postgres', + }), + }), + ); + // Ended on `asyncEnd` (the full promise round-trip). + expect(span.end).toHaveBeenCalledTimes(1); + }); + + it('query: records the prepared-statement name as `db.postgresql.plan`', async () => { + const ctx: ChannelContext = { + arguments: [{ name: 'select-user-by-email', text: 'SELECT * FROM "User" WHERE "email" = $1', values: ['x'] }], + self: { connectionParameters: CONNECTION }, + }; + + await tracingChannel(CHANNELS.PG_QUERY).tracePromise(async () => ({ rows: [] }), ctx); + + expect(startInactiveSpanSpy).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'SELECT * FROM "User" WHERE "email" = $1', + op: 'db', + attributes: expect.objectContaining({ + 'db.statement': 'SELECT * FROM "User" WHERE "email" = $1', + 'db.postgresql.plan': 'select-user-by-email', + 'sentry.origin': 'auto.db.orchestrion.postgres', + }), + }), + ); + }); + + it('query: sets error status and ends the span when the query rejects', async () => { + const ctx: ChannelContext = { arguments: ['SELECT 1'], self: { connectionParameters: CONNECTION } }; + + await expect( + tracingChannel(CHANNELS.PG_QUERY).tracePromise(async () => { + throw new Error('boom'); + }, ctx), + ).rejects.toThrow('boom'); + + expect(span.setStatus).toHaveBeenCalledWith({ code: expect.anything(), message: 'boom' }); + expect(span.end).toHaveBeenCalledTimes(1); + }); + + it('connect: builds a `pg.connect` span with no origin (defaults to manual)', async () => { + const ctx: ChannelContext = { arguments: [], self: { connectionParameters: CONNECTION } }; + + await tracingChannel(CHANNELS.PG_CONNECT).tracePromise(async () => undefined, ctx); + + expect(startInactiveSpanSpy).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'pg.connect', + op: 'db', + attributes: expect.objectContaining({ 'db.system': 'postgresql', 'db.name': 'tests' }), + }), + ); + // Connect spans must NOT set an origin (so they default to 'manual'). + const options = startInactiveSpanSpy.mock.calls[0]![0] as { attributes: Record }; + expect(options.attributes['sentry.origin']).toBeUndefined(); + expect(span.end).toHaveBeenCalledTimes(1); + }); + + it('pool connect: builds a `pg-pool.connect` span with masked connection string + pool attributes', async () => { + const ctx: ChannelContext = { + arguments: [], + self: { + options: { + connectionString: 'postgresql://user:secret@localhost:5494/tests', + idleTimeoutMillis: 10_000, + // pg-pool exposes the max pool size as `max` (not `maxClient`). + max: 10, + }, + }, + }; + + await tracingChannel(CHANNELS.PGPOOL_CONNECT).tracePromise(async () => undefined, ctx); + + expect(startInactiveSpanSpy).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'pg-pool.connect', + op: 'db', + attributes: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'db.user': 'user', + 'net.peer.name': 'localhost', + 'net.peer.port': 5494, + // Credentials masked out of the connection string. + 'db.connection_string': 'postgresql://localhost:5494/tests', + 'db.postgresql.idle.timeout.millis': 10_000, + 'db.postgresql.max.client': 10, + }), + }), + ); + const options = startInactiveSpanSpy.mock.calls[0]![0] as { attributes: Record }; + expect(options.attributes['sentry.origin']).toBeUndefined(); + }); + + it('requireParentSpan: does not create a span when there is no active span', async () => { + getActiveSpanSpy.mockReturnValue(undefined); + const ctx: ChannelContext = { arguments: ['SELECT 1'], self: { connectionParameters: CONNECTION } }; + + await tracingChannel(CHANNELS.PG_QUERY).tracePromise(async () => ({ rows: [] }), ctx); + + expect(startInactiveSpanSpy).not.toHaveBeenCalled(); + }); +}); From b23de23fd8e7a7fc5e7f31e4ce5102f685b0a813 Mon Sep 17 00:00:00 2001 From: isaacs Date: Sat, 27 Jun 2026 16:18:04 -0700 Subject: [PATCH 12/14] fix(node): do not pass options to orchestrion opt-in This resolves the API surface wart where options for orchestrion integrations had to be passed into the `experimentalUseDiagnosticsChannelInjection` method, and returns that to being an argument-free void-returning opt-in that can be easily no-op'ed in the future, and all options are passed in via `Sentry.init()` as they were in the prior OTel implementation. --- .../instrument-orchestrion-ignoreConnect.mjs | 17 +++++ .../suites/tracing/postgres/test.ts | 55 ++++++++++++++ .../src/integrations/tracing/mysql/index.ts | 8 +++ .../integrations/tracing/postgres/index.ts | 8 +++ .../src/sdk/diagnosticsChannelInjection.ts | 37 ++++++++-- ...erimentalUseDiagnosticsChannelInjection.ts | 54 +++++++------- packages/node/src/sdk/index.ts | 26 ++----- .../sdk/orchestrionIntegrationSwap.test.ts | 71 +++++++++++++++++++ 8 files changed, 226 insertions(+), 50 deletions(-) create mode 100644 dev-packages/node-integration-tests/suites/tracing/postgres/instrument-orchestrion-ignoreConnect.mjs create mode 100644 packages/node/test/sdk/orchestrionIntegrationSwap.test.ts diff --git a/dev-packages/node-integration-tests/suites/tracing/postgres/instrument-orchestrion-ignoreConnect.mjs b/dev-packages/node-integration-tests/suites/tracing/postgres/instrument-orchestrion-ignoreConnect.mjs new file mode 100644 index 000000000000..0bf0316cf234 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/postgres/instrument-orchestrion-ignoreConnect.mjs @@ -0,0 +1,17 @@ +// Same orchestrion opt-in as `instrument-orchestrion.mjs`, but configuring the +// integration the normal way: `postgresIntegration({ ignoreConnectSpans: true })`. +// Because injection was opted into, `postgresIntegration()` builds the +// diagnostics-channel implementation and forwards the option to it — so connect +// spans are suppressed on the orchestrion path exactly as on the OTel one. +import * as Sentry from '@sentry/node'; +import { loggingTransport } from '@sentry-internal/node-integration-tests'; + +Sentry.experimentalUseDiagnosticsChannelInjection(); + +Sentry.init({ + dsn: 'https://public@dsn.ingest.sentry.io/1337', + release: '1.0', + tracesSampleRate: 1.0, + integrations: [Sentry.postgresIntegration({ ignoreConnectSpans: true })], + transport: loggingTransport, +}); diff --git a/dev-packages/node-integration-tests/suites/tracing/postgres/test.ts b/dev-packages/node-integration-tests/suites/tracing/postgres/test.ts index f72dac61a1b1..053d2b7f80f9 100644 --- a/dev-packages/node-integration-tests/suites/tracing/postgres/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/postgres/test.ts @@ -542,5 +542,60 @@ describe('postgres auto instrumentation', () => { }, ); }); + + describe('ignoreConnectSpans', () => { + createEsmAndCjsTests( + __dirname, + 'scenario.mjs', + 'instrument-orchestrion-ignoreConnect.mjs', + (createTestRunner, test) => { + test("doesn't emit connect spans if ignoreConnectSpans is true", { timeout: 90_000 }, async () => { + await createTestRunner() + .withDockerCompose({ workingDirectory: [__dirname] }) + .expect({ + transaction: txn => { + const spanNames = txn.spans?.map(span => span.description); + // No `pg.connect` / `pg-pool.connect` spans were produced. + expect(spanNames?.find(name => name?.includes('connect'))).toBeUndefined(); + // ...but the query spans are still instrumented via orchestrion. + expect(txn).toMatchObject({ + transaction: 'Test Transaction', + spans: expect.arrayContaining([ + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'db.statement': 'INSERT INTO "User" ("email", "name") VALUES ($1, $2)', + 'sentry.origin': ORIGIN, + 'sentry.op': 'db', + }), + description: 'INSERT INTO "User" ("email", "name") VALUES ($1, $2)', + op: 'db', + status: 'ok', + origin: ORIGIN, + }), + expect.objectContaining({ + data: expect.objectContaining({ + 'db.system': 'postgresql', + 'db.name': 'tests', + 'db.statement': 'SELECT * FROM "User"', + 'sentry.origin': ORIGIN, + 'sentry.op': 'db', + }), + description: 'SELECT * FROM "User"', + op: 'db', + status: 'ok', + origin: ORIGIN, + }), + ]), + }); + }, + }) + .start() + .completed(); + }); + }, + ); + }); }); }); diff --git a/packages/node/src/integrations/tracing/mysql/index.ts b/packages/node/src/integrations/tracing/mysql/index.ts index e29d1b6034f1..e77353c68db3 100644 --- a/packages/node/src/integrations/tracing/mysql/index.ts +++ b/packages/node/src/integrations/tracing/mysql/index.ts @@ -2,12 +2,20 @@ import { MySQLInstrumentation } from './vendored/instrumentation'; import type { IntegrationFn } from '@sentry/core'; import { defineIntegration } from '@sentry/core'; import { generateInstrumentOnce } from '@sentry/node-core'; +import { getChannelIntegrationFactory } from '../../../sdk/diagnosticsChannelInjection'; const INTEGRATION_NAME = 'Mysql' as const; export const instrumentMysql = generateInstrumentOnce(INTEGRATION_NAME, () => new MySQLInstrumentation({})); const _mysqlIntegration = (() => { + // When opted into diagnostics-channel injection, build + // orchestrion implementation instead, with same options. + const channelIntegration = getChannelIntegrationFactory(INTEGRATION_NAME); + if (channelIntegration) { + return channelIntegration(); + } + return { name: INTEGRATION_NAME, setupOnce() { diff --git a/packages/node/src/integrations/tracing/postgres/index.ts b/packages/node/src/integrations/tracing/postgres/index.ts index 8db6589cc041..155cccecb520 100644 --- a/packages/node/src/integrations/tracing/postgres/index.ts +++ b/packages/node/src/integrations/tracing/postgres/index.ts @@ -2,6 +2,7 @@ import { PgInstrumentation } from './vendored/instrumentation'; import type { IntegrationFn } from '@sentry/core'; import { defineIntegration } from '@sentry/core'; import { generateInstrumentOnce } from '@sentry/node-core'; +import { getChannelIntegrationFactory } from '../../../sdk/diagnosticsChannelInjection'; interface PostgresIntegrationOptions { ignoreConnectSpans?: boolean; @@ -18,6 +19,13 @@ export const instrumentPostgres = generateInstrumentOnce( ); const _postgresIntegration = ((options?: PostgresIntegrationOptions) => { + // When opted into diagnostics-channel injection, build + // orchestrion implementation instead, with same options. + const channelIntegration = getChannelIntegrationFactory(INTEGRATION_NAME); + if (channelIntegration) { + return channelIntegration(options); + } + return { name: INTEGRATION_NAME, setupOnce() { diff --git a/packages/node/src/sdk/diagnosticsChannelInjection.ts b/packages/node/src/sdk/diagnosticsChannelInjection.ts index c152d1177f56..5061ccf0180e 100644 --- a/packages/node/src/sdk/diagnosticsChannelInjection.ts +++ b/packages/node/src/sdk/diagnosticsChannelInjection.ts @@ -1,4 +1,4 @@ -import type { Integration } from '@sentry/core'; +import type { IntegrationFn } from '@sentry/core'; /** * The orchestrion-driven pieces, resolved lazily by the opt-in loader. @@ -13,10 +13,15 @@ import type { Integration } from '@sentry/core'; * normally. */ export interface DiagnosticsChannelInjection { - /** Channel-based integrations to register, replacing their OTel equivalents. */ - integrations: Integration[] | readonly Integration[]; - /** OTel integration names these replace; filtered out of the default set. */ - replacedOtelIntegrationNames: string[]; + /** + * Channel-based integration factories, keyed by the OTel integration name + * they replace (e.g. `Postgres`). The matching OTel integration factory + * (e.g. `postgresIntegration()`) looks itself up here and, when present, + * builds the channel implementation instead, forwarding the user's options. + * This is how opting in swaps implementations without the OTel factory + * ever importing the orchestrion code (which would defeat tree-shaking). + */ + integrationFactories: Record; /** Installs the module hooks that inject the diagnostics channels. */ register: () => void; /** Warns (DEBUG only) about missing or doubled channel injection. */ @@ -35,6 +40,8 @@ let cached: DiagnosticsChannelInjection | undefined; */ export function setDiagnosticsChannelInjectionLoader(load: () => DiagnosticsChannelInjection): void { loader = load; + // A new loader invalidates anything memoized from a previous one. + cached = undefined; } /** Whether `experimentalUseDiagnosticsChannelInjection()` was called. */ @@ -54,3 +61,23 @@ export function resolveDiagnosticsChannelInjection(): DiagnosticsChannelInjectio } return (cached ??= loader()); } + +/** + * The channel-based integration factory registered to replace the OTel + * integration named `name` (e.g. `Postgres`), or `undefined` when the app + * didn't opt into diagnostics-channel injection. Node integration factories + * call this to decide whether to build the orchestrion implementation (with + * the user's options) instead of the OTel one, without ever importing + * orchestrion directly. + * + * @internal + */ +export function getChannelIntegrationFactory(name: string): IntegrationFn | undefined { + return resolveDiagnosticsChannelInjection()?.integrationFactories[name]; +} + +/** Test-only: clear the registered loader and memoized result. */ +export function _resetDiagnosticsChannelInjectionForTesting(): void { + loader = undefined; + cached = undefined; +} diff --git a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts index d5dda607b345..08b7804386dc 100644 --- a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts +++ b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts @@ -28,9 +28,25 @@ import { setDiagnosticsChannelInjectionLoader } from './diagnosticsChannelInject * OpenTelemetry ones, and installs the module hooks that inject the channels * (so libraries imported after `init()` publish the channel events). * + * The swap is per-integration and transparent: the OTel integration factories + * build their channel equivalent when this has been called, so you configure + * them exactly as before. + * + * For example, to suppress pg connect spans on the orchestrion path: + * + * ```ts + * Sentry.experimentalUseDiagnosticsChannelInjection(); + * Sentry.init({ + * dsn: '__DSN__', + * integrations: [Sentry.postgresIntegration({ ignoreConnectSpans: true })], + * }); + * ``` + * * This is a standalone function rather than an `init()` option so that a * bundler drops all of it (and its transitive deps) when this function isn't - * called. `init()` reads the loader registered below. + * called. The integration factories reach the channel implementations only via + * the registry populated below, so they never import the orchestrion code + * themselves, keeping the tree-shaking boundary at this function. * * An app that DOES call it gets the orchestrion code bundled as intended. * @@ -38,32 +54,22 @@ import { setDiagnosticsChannelInjectionLoader } from './diagnosticsChannelInject * subscriber/channel modules; the heavy code-transform dependencies stay lazy * inside `register()` and load only when injection actually runs. * - * Per-integration options are passed here rather than via the OTel - * `xxxIntegration({...})` instances, because those are swapped out wholesale for - * their channel equivalents (and a user-provided OTel instance would otherwise - * win integration de-duplication, silently keeping the OTel path). For example, - * to suppress pg connect spans on the orchestrion path: - * - * ```ts - * Sentry.experimentalUseDiagnosticsChannelInjection({ postgres: { ignoreConnectSpans: true } }); - * ``` - * * @experimental May change or be removed in any release. */ +// Note: This function is to remain argument-free and void-returning so that +// it can be easily no-op'ed (or provided with a `false` flag or something to +// opt-out) when orchestrion becomes the default. export function experimentalUseDiagnosticsChannelInjection(): void { - setDiagnosticsChannelInjectionLoader((): DiagnosticsChannelInjection => { - const integrations = [ - mysqlChannelIntegration(), - postgresChannelIntegration(), - lruMemoizerChannelIntegration(), - ] as const; - const replacedOtelIntegrationNames = integrations.map(i => i.name); - - return { - integrations, - replacedOtelIntegrationNames, + setDiagnosticsChannelInjectionLoader( + (): DiagnosticsChannelInjection => ({ + // Keyed by the OTel integration name each one replaces. + integrationFactories: { + Mysql: mysqlChannelIntegration, + Postgres: postgresChannelIntegration, + LruMemoizer: lruMemoizerChannelIntegration, + }, register: registerDiagnosticsChannelInjection, detect: detectOrchestrionSetup, - }; - }); + }), + ); } diff --git a/packages/node/src/sdk/index.ts b/packages/node/src/sdk/index.ts index 8c8d2e887541..794165ba38a9 100644 --- a/packages/node/src/sdk/index.ts +++ b/packages/node/src/sdk/index.ts @@ -30,32 +30,16 @@ export function getDefaultIntegrationsWithoutPerformance(): Integration[] { /** Get the default integrations for the Node SDK. */ export function getDefaultIntegrations(options: Options): Integration[] { - const integrations: Integration[] = [ + return [ ...getDefaultIntegrationsWithoutPerformance(), // We only add performance integrations if tracing is enabled - // Note that this means that without tracing enabled, e.g. `expressIntegration()` will not be added - // This means that generally request isolation will work (because that is done by httpIntegration) + // Note that this means that without tracing enabled, e.g. + // `expressIntegration()` will not be added + // This means that generally request isolation will work (because that is + // done by httpIntegration) // But `transactionName` will not be set automatically ...(hasSpansEnabled(options) ? getAutoPerformanceIntegrations() : []), ]; - - // When the app opted into diagnostics-channel injection (via - // `experimentalUseDiagnosticsChannelInjection()`) AND span recording is - // enabled, swap the channel-based integrations in place of OTel equivalents - // so the two don't both instrument the same library. - // - // Every channel-based integration we ship today is a 1:1 replacement for an - // OTel performance/tracing integration and produces nothing but spans (those - // only come from `getAutoPerformanceIntegrations()` above), so it's gated on - // span recording. - if (isDiagnosticsChannelInjectionEnabled() && hasSpansEnabled(options)) { - const diagnosticsChannelInjection = resolveDiagnosticsChannelInjection(); - if (diagnosticsChannelInjection) { - const replaced = new Set(diagnosticsChannelInjection.replacedOtelIntegrationNames); - return [...integrations.filter(i => !replaced.has(i.name)), ...diagnosticsChannelInjection.integrations]; - } - } - return integrations; } /** diff --git a/packages/node/test/sdk/orchestrionIntegrationSwap.test.ts b/packages/node/test/sdk/orchestrionIntegrationSwap.test.ts new file mode 100644 index 000000000000..b5c06732aacb --- /dev/null +++ b/packages/node/test/sdk/orchestrionIntegrationSwap.test.ts @@ -0,0 +1,71 @@ +import type { Integration } from '@sentry/core'; +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { mysqlIntegration } from '../../src/integrations/tracing/mysql'; +import { postgresIntegration } from '../../src/integrations/tracing/postgres'; +import { + _resetDiagnosticsChannelInjectionForTesting, + setDiagnosticsChannelInjectionLoader, +} from '../../src/sdk/diagnosticsChannelInjection'; + +// `experimentalUseDiagnosticsChannelInjection()` registers +// channel-integration factories into a runtime registry; the OTel +// `xxxIntegration()` factories look themselves up there and build the +// channel implementation instead. We exercise that swap by registering fakes +// directly, so this file never imports the orchestrion code (which is exactly +// the property that keeps it tree-shakeable). + +afterEach(() => { + _resetDiagnosticsChannelInjectionForTesting(); + vi.restoreAllMocks(); +}); + +describe('OTel integration factory <-> channel swap', () => { + it('returns the OTel implementation when injection is not opted into', () => { + // No loader registered (the default), so no channel factory is found and + // the factories build their own OTel implementation. + const pg = postgresIntegration(); + const mysql = mysqlIntegration(); + + expect(pg.name).toBe('Postgres'); + expect(typeof pg.setupOnce).toBe('function'); + expect(mysql.name).toBe('Mysql'); + expect(typeof mysql.setupOnce).toBe('function'); + }); + + it('builds the registered channel implementation, forwarding options, when opted in', () => { + const pgChannel: Integration = { name: 'Postgres', setupOnce: vi.fn() }; + const mysqlChannel: Integration = { name: 'Mysql', setupOnce: vi.fn() }; + const pgFactory = vi.fn(() => pgChannel); + const mysqlFactory = vi.fn(() => mysqlChannel); + + setDiagnosticsChannelInjectionLoader(() => ({ + integrationFactories: { Postgres: pgFactory, Mysql: mysqlFactory }, + register: vi.fn(), + detect: vi.fn(), + })); + + const pg = postgresIntegration({ ignoreConnectSpans: true }); + const mysql = mysqlIntegration(); + + // The factory's instance is returned verbatim + expect(pg).toBe(pgChannel); + expect(mysql).toBe(mysqlChannel); + // The user's options reach the channel factory unchanged. + expect(pgFactory).toHaveBeenCalledWith({ ignoreConnectSpans: true }); + }); + + it('only swaps the integrations that have a registered factory', () => { + const pgChannel: Integration = { name: 'Postgres', setupOnce: vi.fn() }; + + setDiagnosticsChannelInjectionLoader(() => ({ + integrationFactories: { Postgres: () => pgChannel }, + register: vi.fn(), + detect: vi.fn(), + })); + + // Postgres has a registered factory; mysql does not, so it stays OTel. + expect(postgresIntegration()).toBe(pgChannel); + expect(mysqlIntegration().name).toBe('Mysql'); + expect(mysqlIntegration()).not.toBe(pgChannel); + }); +}); From 87f9faf693fea8595db596188b96e0b9c17615a8 Mon Sep 17 00:00:00 2001 From: isaacs Date: Sat, 27 Jun 2026 16:32:10 -0700 Subject: [PATCH 13/14] fix(deno): provide options to pg integration --- packages/deno/src/integrations/postgres.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/deno/src/integrations/postgres.ts b/packages/deno/src/integrations/postgres.ts index afb72e827a13..c76ac86f3893 100644 --- a/packages/deno/src/integrations/postgres.ts +++ b/packages/deno/src/integrations/postgres.ts @@ -5,6 +5,11 @@ import { setAsyncLocalStorageAsyncContextStrategy } from '../async'; const INTEGRATION_NAME = 'DenoPostgres' as const; +interface DenoPostgresIntegrationOptions { + /** Whether to skip creating spans for `pg`/`pg-pool` connections. Defaults to `false`. */ + ignoreConnectSpans?: boolean; +} + /** * Create spans for `pg` (node-postgres) queries under Deno. * @@ -17,8 +22,8 @@ const INTEGRATION_NAME = 'DenoPostgres' as const; * `AsyncLocalStorage` context strategy (so spans nest under the active * span and survive pg's internal callback dispatch) before delegating. */ -const _denoPostgresIntegration = (() => { - const inner = postgresChannelIntegration(); +const _denoPostgresIntegration = ((options?: DenoPostgresIntegrationOptions) => { + const inner = postgresChannelIntegration(options); return extendIntegration(inner, { name: INTEGRATION_NAME, @@ -28,7 +33,9 @@ const _denoPostgresIntegration = (() => { }); }) satisfies IntegrationFn; -export const denoPostgresIntegration = defineIntegration(_denoPostgresIntegration) as () => Integration & { +export const denoPostgresIntegration = defineIntegration(_denoPostgresIntegration) as ( + options?: DenoPostgresIntegrationOptions, +) => Integration & { name: 'DenoPostgres'; setupOnce: () => void; }; From 373c014c4c815b35f46253974ac726e5b667dfaf Mon Sep 17 00:00:00 2001 From: isaacs Date: Tue, 30 Jun 2026 16:43:47 -0700 Subject: [PATCH 14/14] ref(mysql): use bindTracingChannelToSpan --- .../deno/test/orchestrion-postgres.test.ts | 10 +- .../integrations/tracing-channel/postgres.ts | 159 ++++++------------ .../postgres-ignore-connect.test.ts | 66 +++++++- .../test/orchestrion/postgres.test.ts | 70 +++++++- 4 files changed, 192 insertions(+), 113 deletions(-) diff --git a/packages/deno/test/orchestrion-postgres.test.ts b/packages/deno/test/orchestrion-postgres.test.ts index e4c49eae0eae..f1ca62c16988 100644 --- a/packages/deno/test/orchestrion-postgres.test.ts +++ b/packages/deno/test/orchestrion-postgres.test.ts @@ -124,10 +124,12 @@ Deno.test('denoPostgresIntegration: orchestrion:pg:query channel produces a nest // Callback-success order published by orchestrion's transform: // start -> end -> asyncStart -> asyncEnd (the span closes on asyncEnd). startSpan({ name: 'parent', op: 'test' }, () => { - channel.start.publish(ctx); - channel.end.publish(ctx); - channel.asyncStart.publish(ctx); - channel.asyncEnd.publish(ctx); + channel.start.runStores(ctx, () => { + channel.end.publish(ctx); + }); + channel.asyncStart.runStores(ctx, () => { + channel.asyncEnd.publish(ctx); + }); }); const parent = await withTimeout( diff --git a/packages/server-utils/src/integrations/tracing-channel/postgres.ts b/packages/server-utils/src/integrations/tracing-channel/postgres.ts index a98b8068058e..42277769bc88 100644 --- a/packages/server-utils/src/integrations/tracing-channel/postgres.ts +++ b/packages/server-utils/src/integrations/tracing-channel/postgres.ts @@ -1,5 +1,5 @@ import * as diagnosticsChannel from 'node:diagnostics_channel'; -import type { IntegrationFn, Scope, Span, SpanAttributes } from '@sentry/core'; +import type { IntegrationFn, Scope, SpanAttributes } from '@sentry/core'; import { bindScopeToEmitter, debug, @@ -7,12 +7,12 @@ import { getActiveSpan, getCurrentScope, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, - SPAN_STATUS_ERROR, startInactiveSpan, - withScope, + waitForTracingChannelBinding, } from '@sentry/core'; import { DEBUG_BUILD } from '../../debug-build'; import { CHANNELS } from '../../orchestrion/channels'; +import { bindTracingChannelToSpan } from '../../tracing-channel'; // NOTE: this uses the same name as the OTel integration by design. // When enabled, the OTel 'Postgres' integration is omitted from the default set. @@ -48,17 +48,17 @@ const SPAN_CONNECT = 'pg.connect'; const SPAN_POOL_CONNECT = 'pg-pool.connect'; /** - * The shape orchestrion's wrapAuto transform attaches to the tracing-channel - * `context`. `arguments` is the live call args; orchestrion splices the user's - * callback out and inserts its own wrapper at the same index before `start`, - * and the `start` hook re-wraps that entry to restore the caller's scope - * across pg's async callback dispatch. + * The shape orchestrion's transform attaches to the tracing-channel `context`. Documented here rather + * than imported because orchestrion's runtime doesn't export it. */ interface PgChannelContext { + // The live args array passed to the wrapped `query`/`connect` call. arguments: unknown[]; self?: unknown; result?: unknown; error?: unknown; + // The caller's scope, captured at `start` and replayed onto a streamed `Submittable` emitter (see below). + _sentryCallerScope?: Scope; } interface PgConnectionParams { @@ -82,16 +82,23 @@ const _postgresChannelIntegration = ((options: { ignoreConnectSpans?: boolean } return { name: INTEGRATION_NAME, setupOnce() { - // Query spans: `pg`/native `Client.prototype.query`. - subscribeQueryLikeChannel(CHANNELS.PG_QUERY, querySpanOptions); - - // Connect spans, gated by `ignoreConnectSpans` (same as OTel pg). - // `Client.prototype.connect` (pg + native) - // and `Pool.prototype.connect` (pg-pool). - if (!options.ignoreConnectSpans) { - subscribeQueryLikeChannel(CHANNELS.PG_CONNECT, connectSpanOptions); - subscribeQueryLikeChannel(CHANNELS.PGPOOL_CONNECT, poolConnectSpanOptions); + // `tracingChannel` is unavailable before Node 18.19 so do nothing in that case. + if (!diagnosticsChannel.tracingChannel) { + return; } + + waitForTracingChannelBinding(() => { + // Query spans: `pg`/native `Client.prototype.query`. + subscribeQueryLikeChannel(CHANNELS.PG_QUERY, querySpanOptions); + + // Connect spans, gated by `ignoreConnectSpans` (same as OTel pg). + // `Client.prototype.connect` (pg + native) + // and `Pool.prototype.connect` (pg-pool). + if (!options.ignoreConnectSpans) { + subscribeQueryLikeChannel(CHANNELS.PG_CONNECT, connectSpanOptions); + subscribeQueryLikeChannel(CHANNELS.PGPOOL_CONNECT, poolConnectSpanOptions); + } + }); }, }; }) satisfies IntegrationFn; @@ -106,104 +113,48 @@ function subscribeQueryLikeChannel( getSpanOptions: (ctx: PgChannelContext) => { name: string; op: string; attributes: SpanAttributes }, ): void { DEBUG_BUILD && debug.log(`[orchestrion:pg] subscribing to channel "${channelName}"`); - const ch = diagnosticsChannel.tracingChannel(channelName); - const spans = new WeakMap(); - const parentScopes = new WeakMap(); - ch.subscribe({ - start(rawCtx) { - // only instrument when there's an active span + bindTracingChannelToSpan( + diagnosticsChannel.tracingChannel(channelName), + data => { + // Only instrument when there's an active span; returning `undefined` opts this call out entirely, + // leaving the active context untouched (e.g. connects issued during app startup). if (!getActiveSpan()) { - return; + return undefined; } - const ctx = rawCtx as PgChannelContext; - const span = startInactiveSpan(getSpanOptions(ctx)); - spans.set(rawCtx, span); - // Capture the caller's scope while we're still synchronously inside the - // call, to restore it for deferred callbacks/streamed events - // (pg dispatches them outside the original async scope). - const scope = getCurrentScope(); - parentScopes.set(rawCtx, scope); + // Capture the caller's scope while still synchronously inside the call, for the streamed path: + // pg dispatches a `Submittable` emitter's events outside the original async scope, so `deferSpanEnd` + // replays this scope onto that emitter. + data._sentryCallerScope = getCurrentScope(); - // Re-wrap orchestrion's callback wrapper so the user's callback and - // anything chained off it runs with the captured scope active. - if (ctx.arguments.length > 0) { - const cbIdx = ctx.arguments.length - 1; - const orchestrionWrappedCb = ctx.arguments[cbIdx]; - if (typeof orchestrionWrappedCb === 'function') { - const wrapped = orchestrionWrappedCb as (...a: unknown[]) => unknown; - ctx.arguments[cbIdx] = function (this: unknown, ...args: unknown[]): unknown { - return withScope(scope, () => wrapped.apply(this, args)); - }; - } - } + return startInactiveSpan(getSpanOptions(data)); }, - - end(rawCtx) { - const ctx = rawCtx as PgChannelContext; - - // Sync throw: `end` fires after `error`, so `ctx.error` is set; close - // now since no `asyncEnd` will fire. - if (ctx.error !== undefined) { - finishSpan(rawCtx); - return; - } - - // Streamable `Submittable` (e.g. `client.query(new Query())`): - // orchestrion stored the returned emitter on `ctx.result` and fired no - // async events. Bind the captured scope to it and finish on - // end or error. - const result = ctx.result; - if (result && typeof result === 'object' && hasOnMethod(result)) { - const span = spans.get(rawCtx); - if (!span) { - return; + { + // Streamable `Submittable` (e.g. `client.query(new Query())`) returns an emitter that orchestrion + // stores on `ctx.result` while firing no async events; the query isn't done until the emitter emits + // `'end'`/`'error'`. Defer ending to those events for that path; the callback, promise, and sync-throw + // paths carry no emitter, so the helper ends the span as usual. + deferSpanEnd({ data, end }) { + const result = data.result; + if (!result || typeof result !== 'object' || !hasOnMethod(result)) { + return false; } - const parentScope = parentScopes.get(rawCtx); - if (parentScope) { - bindScopeToEmitter(result, parentScope); - } - result.on('error', err => { - span.setStatus({ code: SPAN_STATUS_ERROR, message: err instanceof Error ? err.message : 'unknown_error' }); - finishSpan(rawCtx); - }); - result.on('end', () => finishSpan(rawCtx)); - return; - } - // Callback/promise path: `asyncEnd` closes the span. - }, - - error(rawCtx) { - const ctx = rawCtx as PgChannelContext; - const span = spans.get(rawCtx); - if (!span) { - return; - } - span.setStatus({ - code: SPAN_STATUS_ERROR, - message: ctx.error instanceof Error ? ctx.error.message : 'unknown_error', - }); - }, + // Replay the caller's scope onto the emitter so listeners the user attaches after the call returns + // (and any spans they start) nest under the caller, not a fresh root trace. + const callerScope = data._sentryCallerScope; + if (callerScope) { + bindScopeToEmitter(result, callerScope); + } - asyncStart() { - // No-op: we end on `asyncEnd` so the span covers the full duration. - }, + result.on('error', err => end(err)); + result.on('end', () => end()); - asyncEnd(rawCtx) { - finishSpan(rawCtx); + return true; + }, }, - }); - - function finishSpan(rawCtx: object): void { - const span = spans.get(rawCtx); - if (span) { - span.end(); - spans.delete(rawCtx); - parentScopes.delete(rawCtx); - } - } + ); } function querySpanOptions(ctx: PgChannelContext): { name: string; op: string; attributes: SpanAttributes } { diff --git a/packages/server-utils/test/orchestrion/postgres-ignore-connect.test.ts b/packages/server-utils/test/orchestrion/postgres-ignore-connect.test.ts index 0949296084f8..12f2bd6688df 100644 --- a/packages/server-utils/test/orchestrion/postgres-ignore-connect.test.ts +++ b/packages/server-utils/test/orchestrion/postgres-ignore-connect.test.ts @@ -1,14 +1,78 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; import { tracingChannel } from 'node:diagnostics_channel'; -import { describe, expect, it } from 'vitest'; +import type { Scope } from '@sentry/core'; +import { + _INTERNAL_setSpanForScope, + getDefaultCurrentScope, + getDefaultIsolationScope, + setAsyncContextStrategy, +} from '@sentry/core'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; import { postgresChannelIntegration } from '../../src/orchestrion'; import { CHANNELS } from '../../src/orchestrion/channels'; +interface TestStore { + scope: Scope; + isolationScope: Scope; +} + +// `setupOnce` only subscribes once `waitForTracingChannelBinding` sees an +// async-context strategy exposing `getTracingChannelBinding`. Install a +// minimal one so the subscriptions actually register here. +function installTestAsyncContextStrategy(): void { + const asyncStorage = new AsyncLocalStorage(); + + function getScopes(): TestStore { + return asyncStorage.getStore() || { scope: getDefaultCurrentScope(), isolationScope: getDefaultIsolationScope() }; + } + + setAsyncContextStrategy({ + withScope: callback => { + const scope = getScopes().scope.clone(); + const isolationScope = getScopes().isolationScope; + return asyncStorage.run({ scope, isolationScope }, () => callback(scope)); + }, + withSetScope: (scope, callback) => { + const isolationScope = getScopes().isolationScope; + return asyncStorage.run({ scope, isolationScope }, () => callback(scope)); + }, + withIsolationScope: callback => { + const scope = getScopes().scope; + const isolationScope = getScopes().isolationScope.clone(); + return asyncStorage.run({ scope, isolationScope }, () => callback(isolationScope)); + }, + withSetIsolationScope: (isolationScope, callback) => { + const scope = getScopes().scope; + return asyncStorage.run({ scope, isolationScope }, () => callback(isolationScope)); + }, + getCurrentScope: () => getScopes().scope, + getIsolationScope: () => getScopes().isolationScope, + getTracingChannelBinding: () => ({ + asyncLocalStorage: asyncStorage, + getStoreWithActiveSpan: span => { + const scope = getScopes().scope.clone(); + const isolationScope = getScopes().isolationScope; + _INTERNAL_setSpanForScope(scope, span); + return { scope, isolationScope }; + }, + }), + }); +} + // `setupOnce` subscribes to process-global `tracingChannel`s, so asserting the // ABSENCE of connect subscribers only holds when no other (default-options) // integration in the same module context has subscribed. vitest isolates // module state per file, so this file keeps that assertion clean (the default // options integration is exercised in `postgres.test.ts`). describe('postgresChannelIntegration({ ignoreConnectSpans: true })', () => { + beforeAll(() => { + installTestAsyncContextStrategy(); + }); + + afterAll(() => { + setAsyncContextStrategy(undefined); + }); + it('subscribes to the query channel but NOT the connect / pool-connect channels', () => { postgresChannelIntegration({ ignoreConnectSpans: true }).setupOnce?.(); diff --git a/packages/server-utils/test/orchestrion/postgres.test.ts b/packages/server-utils/test/orchestrion/postgres.test.ts index b206562c33de..fb9cb310c734 100644 --- a/packages/server-utils/test/orchestrion/postgres.test.ts +++ b/packages/server-utils/test/orchestrion/postgres.test.ts @@ -1,10 +1,66 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; import { tracingChannel } from 'node:diagnostics_channel'; -import type { Span } from '@sentry/core'; +import type { Scope, Span } from '@sentry/core'; import * as SentryCore from '@sentry/core'; -import { afterEach, beforeAll, beforeEach, describe, expect, it, type MockInstance, vi } from 'vitest'; +import { + _INTERNAL_setSpanForScope, + getDefaultCurrentScope, + getDefaultIsolationScope, + setAsyncContextStrategy, +} from '@sentry/core'; +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, type MockInstance, vi } from 'vitest'; import { postgresChannelIntegration } from '../../src/orchestrion'; import { CHANNELS } from '../../src/orchestrion/channels'; +interface TestStore { + scope: Scope; + isolationScope: Scope; +} + +// `bindTracingChannelToSpan` only binds (and `setupOnce` only subscribes via +// `waitForTracingChannelBinding`) when an async-context strategy exposes a +// `getTracingChannelBinding`. Install a minimal one so the channel +// subscriptions actually register in this unit-test context (no SDK `init`). +function installTestAsyncContextStrategy(): void { + const asyncStorage = new AsyncLocalStorage(); + + function getScopes(): TestStore { + return asyncStorage.getStore() || { scope: getDefaultCurrentScope(), isolationScope: getDefaultIsolationScope() }; + } + + setAsyncContextStrategy({ + withScope: callback => { + const scope = getScopes().scope.clone(); + const isolationScope = getScopes().isolationScope; + return asyncStorage.run({ scope, isolationScope }, () => callback(scope)); + }, + withSetScope: (scope, callback) => { + const isolationScope = getScopes().isolationScope; + return asyncStorage.run({ scope, isolationScope }, () => callback(scope)); + }, + withIsolationScope: callback => { + const scope = getScopes().scope; + const isolationScope = getScopes().isolationScope.clone(); + return asyncStorage.run({ scope, isolationScope }, () => callback(isolationScope)); + }, + withSetIsolationScope: (isolationScope, callback) => { + const scope = getScopes().scope; + return asyncStorage.run({ scope, isolationScope }, () => callback(isolationScope)); + }, + getCurrentScope: () => getScopes().scope, + getIsolationScope: () => getScopes().isolationScope, + getTracingChannelBinding: () => ({ + asyncLocalStorage: asyncStorage, + getStoreWithActiveSpan: span => { + const scope = getScopes().scope.clone(); + const isolationScope = getScopes().isolationScope; + _INTERNAL_setSpanForScope(scope, span); + return { scope, isolationScope }; + }, + }), + }); +} + // The subscriber builds spans via `startInactiveSpan` and gates on // `getActiveSpan`. We spy both: `getActiveSpan` to satisfy the // requireParentSpan gate, and `startInactiveSpan` to capture the span @@ -13,7 +69,7 @@ import { CHANNELS } from '../../src/orchestrion/channels'; // SDK's `inferDbSpanData` processor, which isn't wired up here. That's // covered by the integration test. function makeSpan(): Span { - return { end: vi.fn(), setStatus: vi.fn() } as unknown as Span; + return { end: vi.fn(), setStatus: vi.fn(), setAttributes: vi.fn() } as unknown as Span; } interface ChannelContext { @@ -27,11 +83,17 @@ describe('postgresChannelIntegration', () => { let span: Span; // Subscribe once for the whole file so a single subscriber handles each - // publish (avoids accumulating duplicate subscriptions across tests). + // publish (avoids accumulating duplicate subscriptions across tests). The + // strategy must be installed first so `setupOnce`'s `waitForTracingChannelBinding` fires synchronously. beforeAll(() => { + installTestAsyncContextStrategy(); postgresChannelIntegration().setupOnce?.(); }); + afterAll(() => { + setAsyncContextStrategy(undefined); + }); + beforeEach(() => { span = makeSpan(); startInactiveSpanSpy = vi.spyOn(SentryCore, 'startInactiveSpan').mockReturnValue(span);