diff --git a/packages/deno/test/orchestrion-mysql.test.ts b/packages/deno/test/orchestrion-mysql.test.ts index eac5d7dad6cb..0159d66baf79 100644 --- a/packages/deno/test/orchestrion-mysql.test.ts +++ b/packages/deno/test/orchestrion-mysql.test.ts @@ -120,11 +120,15 @@ Deno.test('denoMysqlIntegration: orchestrion:mysql:query channel produces a nest // Callback-success order published by orchestrion's transform: // start → end → asyncStart → asyncEnd (the span closes on asyncEnd). + // `start`/`asyncStart` go through `runStores` (not bare `publish`), exactly as the transform's + // `wrapCallback` does — that's what activates the store the subscriber binds, so the span opens. startSpan({ name: 'parent', op: 'test' }, () => { - channel.start.publish(ctx); - channel.end.publish(ctx); - channel.asyncStart.publish(ctx); - channel.asyncEnd.publish(ctx); + channel.start.runStores(ctx, () => { + channel.end.publish(ctx); + }); + channel.asyncStart.runStores(ctx, () => { + channel.asyncEnd.publish(ctx); + }); }); const parent = await withTimeout( diff --git a/packages/server-utils/src/integrations/tracing-channel/mysql.ts b/packages/server-utils/src/integrations/tracing-channel/mysql.ts index bdaffd7bdfc6..522bb0b962ce 100644 --- a/packages/server-utils/src/integrations/tracing-channel/mysql.ts +++ b/packages/server-utils/src/integrations/tracing-channel/mysql.ts @@ -1,32 +1,25 @@ import * as diagnosticsChannel from 'node:diagnostics_channel'; -import type { IntegrationFn, Scope, Span } from '@sentry/core'; +import type { IntegrationFn, Scope } from '@sentry/core'; import { bindScopeToEmitter, debug, defineIntegration, getCurrentScope, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, - SPAN_STATUS_ERROR, startInactiveSpan, - withScope, + waitForTracingChannelBinding, } from '@sentry/core'; import { DEBUG_BUILD } from '../../debug-build'; import { CHANNELS } from '../../orchestrion/channels'; +import { bindTracingChannelToSpan } from '../../tracing-channel'; // NOTE: this uses the same name as the OTel integration by design. // When enabled, OTel 'Mysql' integration is omitted from the default set. const INTEGRATION_NAME = 'Mysql' as const; -// OpenTelemetry "OLD" db/net semantic-conventions. We inline them rather than -// importing `@opentelemetry/semantic-conventions` to keep this integration's -// dependency surface free of OTel — orchestrion's whole point is to step away -// from the OTel auto-instrumentation stack. -// -// We emit the OLD conventions to match `@opentelemetry/instrumentation-mysql`'s -// default (it only emits the stable `db.system.name` / `db.query.text` set when -// `OTEL_SEMCONV_STABILITY_OPT_IN=database` is opted into) and the rest of the -// Sentry JS SDK, whose `inferDbSpanData` processor renames spans based on -// `db.statement`. +// OTel "OLD" db/net semantic-conventions, inlined to keep this integration free of OTel deps. Matches +// `@opentelemetry/instrumentation-mysql`'s default and the SDK's `inferDbSpanData` (which renames spans +// off `db.statement`). const ATTR_DB_SYSTEM = 'db.system'; const ATTR_DB_CONNECTION_STRING = 'db.connection_string'; const ATTR_DB_NAME = 'db.name'; @@ -36,21 +29,18 @@ const ATTR_NET_PEER_NAME = 'net.peer.name'; const ATTR_NET_PEER_PORT = 'net.peer.port'; /** - * The shape orchestrion's wrapCallback transform attaches to the tracing-channel - * `context` object. Documented here rather than imported because orchestrion's - * runtime doesn't export it — see `node_modules/@apm-js-collab/code-transformer/lib/transforms.js`. - * - * `arguments` is the *live* args array passed to the wrapped function: orchestrion - * splices the user's callback out and inserts its own wrapper at the same index - * before publishing `start`. The `start` hook re-wraps that entry to restore the - * caller's scope across mysql's async callback dispatch (see below). + * The shape orchestrion's transform attaches to the tracing-channel `context` object. Documented here + * rather than imported because orchestrion's runtime doesn't export it. */ interface MysqlQueryChannelContext { + // The live args array passed to the wrapped `connection.query` call; `arguments[0]` is the SQL. arguments: unknown[]; self?: MysqlConnection; moduleVersion?: string; result?: unknown; error?: unknown; + // The caller's scope, captured at `start` and replayed onto the streamed `Query` emitter (see below). + _sentryCallerScope?: Scope; } interface MysqlConnectionConfig { @@ -70,163 +60,64 @@ const _mysqlChannelIntegration = (() => { return { name: INTEGRATION_NAME, setupOnce() { - DEBUG_BUILD && debug.log(`[orchestrion:mysql] subscribing to channel "${CHANNELS.MYSQL_QUERY}"`); - const queryCh = diagnosticsChannel.tracingChannel(CHANNELS.MYSQL_QUERY); - - // Orchestrion creates one `context` object per call, shared across all - // lifecycle hooks. We key both maps off that identity; `WeakMap` so an - // unfinished path can't leak its entries. - const spans = new WeakMap(); - // The scope active when the query was issued, consumed in `end` to bind - // the streamed `Query` emitter's listeners to it. - const parentScopes = new WeakMap(); - - // `subscribe()` requires all five lifecycle hooks. The orchestrion - // `wrapAuto` transform fires events in one of four orders depending on - // call shape: - // - sync throw : start → error → end - // (NO asyncEnd) - // - async-callback error : start → end → error → - // asyncStart → asyncEnd - // - async-callback success : start → end → asyncStart → - // asyncEnd - // - no-callback (streamable Query) : start → end - // (ctx.result is the Query - // emitter, no async events) - // - // Where the span closes depends on the path: `asyncEnd` for callbacks (so - // it spans the full round-trip + callback), or `end` for the sync-throw - // and streamable paths. The `end` hook tells those apart via `ctx.error` - // / `ctx.result` — see there. - queryCh.subscribe({ - start(rawCtx) { - const ctx = rawCtx as MysqlQueryChannelContext; - const sql = extractSql(ctx.arguments[0]); - const { host, port, database, user } = getConnectionConfig(ctx.self); - const portNumber = typeof port === 'string' ? parseInt(port, 10) : port; - const portIsNumber = typeof portNumber === 'number' && !isNaN(portNumber); - - const span = startInactiveSpan({ - name: sql ?? 'mysql.query', - op: 'db', - attributes: { - [ATTR_DB_SYSTEM]: 'mysql', - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.orchestrion.mysql', - [ATTR_DB_CONNECTION_STRING]: getJDBCString(host, portIsNumber ? portNumber : undefined, database), - ...(database ? { [ATTR_DB_NAME]: database } : {}), - ...(user ? { [ATTR_DB_USER]: user } : {}), - ...(sql ? { [ATTR_DB_STATEMENT]: sql } : {}), - ...(host ? { [ATTR_NET_PEER_NAME]: host } : {}), - ...(portIsNumber ? { [ATTR_NET_PEER_PORT]: portNumber } : {}), - }, - }); - spans.set(rawCtx, span); - - // Capture the scope while we're still synchronously inside the - // caller's `connection.query` call. mysql v2 drains callbacks and - // emits streamed-query events from its socket data handler, where the - // AsyncLocalStorage store backing the active span no longer reflects - // the caller's context — and `asyncStart`/`asyncEnd` fire from that - // same lost context, so capturing has to happen now. - const scope = getCurrentScope(); - parentScopes.set(rawCtx, scope); - - // Callback path: orchestrion has spliced the user's callback out of - // `ctx.arguments` and put its own wrapper (`__apm$wrappedCb`) at the - // same index. Re-wrap it so the callback — and any nested - // `connection.query(...)` — runs with the captured scope active. - if (ctx.arguments.length > 0) { - const cbIdx = ctx.arguments.length - 1; - const orchestrionWrappedCb = ctx.arguments[cbIdx]; - if (typeof orchestrionWrappedCb === 'function') { - const wrapped = orchestrionWrappedCb as (...a: unknown[]) => unknown; - ctx.arguments[cbIdx] = function (this: unknown, ...args: unknown[]): unknown { - return withScope(scope, () => wrapped.apply(this, args)); - }; - } - } - }, - - end(rawCtx) { - const ctx = rawCtx as MysqlQueryChannelContext; - - // Sync throw: `end` fires AFTER `error` (both inside the wrapper's - // `try/catch/finally`), so `ctx.error` is already set. Close the - // span now since no `asyncEnd` will fire. - if (ctx.error !== undefined) { - finishSpan(rawCtx); - return; - } - - // No-callback (streamable Query) path: orchestrion's `wrapPromise` - // stores the synchronous return value on `ctx.result` and never - // fires `asyncStart`/`asyncEnd`. The returned `Query` is an - // `EventEmitter` that emits `'end'` on success and `'error'` on - // failure — hook those to close the span. - // Note: a streamed span never finishes if the connection is destroyed - // mid-flight — mysql then emits neither `'end'` nor `'error'`, so the - // span is dropped (the `WeakMap` still prevents a leak). Closing this - // needs connection-level hooks the per-query context doesn't expose. - const result = ctx.result; - if (result && typeof result === 'object' && hasOnMethod(result)) { - const span = spans.get(rawCtx); - if (!span) return; + // `tracingChannel` is unavailable before Node 18.19 so do nothing in that case. + if (!diagnosticsChannel.tracingChannel) { + return; + } - // Bind the captured scope to the streamed `Query` emitter: its - // `'end'`/`'error'`/`'fields'`/… events fire from mysql's socket - // handler with the caller's context lost, so without this a span - // started in a user's stream listener would begin a fresh root trace - // instead of nesting under the parent. `bindScopeToEmitter` patches - // `on`/`addListener`/… so listeners added after `query()` returns - // inherit the scope (like OTel's `context.bind`). - const parentScope = parentScopes.get(rawCtx); - if (parentScope) { - bindScopeToEmitter(result, parentScope); - } + DEBUG_BUILD && debug.log(`[orchestrion:mysql] subscribing to channel "${CHANNELS.MYSQL_QUERY}"`); - result.on('error', err => { - span.setStatus({ - code: SPAN_STATUS_ERROR, - message: err instanceof Error ? err.message : 'unknown_error', - }); - // Defensive: end the span here too in case `'end'` never fires - // (e.g. abrupt socket destruction). `finishSpan` is idempotent — - // `spans.delete` makes the subsequent `'end'` listener a no-op. - finishSpan(rawCtx); + waitForTracingChannelBinding(() => { + bindTracingChannelToSpan( + diagnosticsChannel.tracingChannel(CHANNELS.MYSQL_QUERY), + data => { + const sql = extractSql(data.arguments[0]); + const { host, port, database, user } = getConnectionConfig(data.self); + const portNumber = typeof port === 'string' ? parseInt(port, 10) : port; + const portIsNumber = typeof portNumber === 'number' && !isNaN(portNumber); + + // For the streamed path: mysql emits the `Query` emitter's events from its socket data + // handler with the caller's context lost. `deferSpanEnd` replays this scope onto the emitter. + data._sentryCallerScope = getCurrentScope(); + + return startInactiveSpan({ + name: sql ?? 'mysql.query', + op: 'db', + attributes: { + [ATTR_DB_SYSTEM]: 'mysql', + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.orchestrion.mysql', + [ATTR_DB_CONNECTION_STRING]: getJDBCString(host, portIsNumber ? portNumber : undefined, database), + ...(database ? { [ATTR_DB_NAME]: database } : {}), + ...(user ? { [ATTR_DB_USER]: user } : {}), + ...(sql ? { [ATTR_DB_STATEMENT]: sql } : {}), + ...(host ? { [ATTR_NET_PEER_NAME]: host } : {}), + ...(portIsNumber ? { [ATTR_NET_PEER_PORT]: portNumber } : {}), + }, }); - result.on('end', () => finishSpan(rawCtx)); - return; - } - - // Callback path: `asyncEnd` will close the span. Nothing to do here. - }, - - error(rawCtx) { - const ctx = rawCtx as MysqlQueryChannelContext; - const span = spans.get(rawCtx); - if (!span) return; - span.setStatus({ - code: SPAN_STATUS_ERROR, - message: ctx.error instanceof Error ? ctx.error.message : 'unknown_error', - }); - }, - - asyncStart() { - // No-op: we end on `asyncEnd` so the span covers the full callback duration. - }, - - asyncEnd(rawCtx) { - finishSpan(rawCtx); - }, + }, + { + // No-callback `query(sql)` returns a streamable `Query` emitter as `result`; it settles on the + // emitter's `'end'`/`'error'`, not the channel, so defer ending to those. + deferSpanEnd({ data, end }) { + const result = data.result; + if (!result || typeof result !== 'object' || !hasOnMethod(result)) { + return false; + } + + // Replay the caller's scope so user listeners on the emitter nest under it, not a new trace. + const callerScope = data._sentryCallerScope; + if (callerScope) { + bindScopeToEmitter(result, callerScope); + } + + result.on('error', err => end(err)); + result.on('end', () => end()); + + return true; + }, + }, + ); }); - - function finishSpan(rawCtx: object): void { - const span = spans.get(rawCtx); - if (!span) return; - span.end(); - spans.delete(rawCtx); - parentScopes.delete(rawCtx); - } }, }; }) satisfies IntegrationFn; diff --git a/packages/server-utils/src/tracing-channel.ts b/packages/server-utils/src/tracing-channel.ts index bedc59d2e652..70b70c0f3bae 100644 --- a/packages/server-utils/src/tracing-channel.ts +++ b/packages/server-utils/src/tracing-channel.ts @@ -42,6 +42,22 @@ export interface TracingChannelLifeCycleOptions { * For database drivers, it is not recommended to set this at all. */ captureError?: boolean | ((e: unknown) => ExclusiveEventHintOrCaptureContext); + + /** + * Take ownership of *when* the span ends: return `true` and the helper won't end it on + * `end`/`asyncEnd`. For results that settle out-of-band — e.g. a streamed `EventEmitter` that + * completes via its own `'end'`/`'error'` events. + * + * Call `end` when it settles — `end()` on success, `end(error)` on failure. `end` owns *how* the span + * ends (error status/attributes, `captureError`, `beforeSpanEnd`) and is idempotent. Default `false` + * lets the helper end the span as usual. + */ + deferSpanEnd?: (args: { + span: Span; + data: TracingChannelPayloadWithSpan; + /** Ends the span: `end()` on success, `end(error)` on failure. Idempotent. */ + end: (error?: unknown) => void; + }) => boolean; } /** Returned by {@link bindTracingChannelToSpan}: the bound channel plus a teardown handle. */ @@ -77,6 +93,7 @@ export function bindTracingChannelToSpan( const handle = bindSpanToChannelStore(channel, getSpan); const beforeSpanEnd = opts?.beforeSpanEnd; + const deferSpanEnd = opts?.deferSpanEnd; const getErrorHint = (e: unknown): ExclusiveEventHintOrCaptureContext => { if (typeof opts?.captureError === 'function') { return opts.captureError(e); @@ -90,6 +107,36 @@ export function bindTracingChannelToSpan( }; }; + // Apply Sentry error status + attributes (and capture, if configured) to a span. Shared by the + // channel `error` lifecycle and the deferred `end` util so the two can't drift. + const annotateSpanError = (span: Span, error: unknown): void => { + if (opts?.captureError) { + captureException(error, getErrorHint(error)); + } + + const { message, attributes } = getErrorInfo(error); + span.setStatus({ code: SPAN_STATUS_ERROR, message }); + span.setAttributes(attributes); + }; + + // Creates an end fn for deferred handlers to use, ensures consistent span end behavior + const makeDeferredEnd = (span: Span, data: TracingChannelPayloadWithSpan) => { + let ended = false; + + return (error?: unknown): void => { + if (ended) { + return; + } + + ended = true; + if (error !== undefined) { + annotateSpanError(span, error); + } + + endBoundSpan(data, beforeSpanEnd); + }; + }; + const subscribers: Partial>> = { start: NOOP, asyncStart: NOOP, @@ -97,6 +144,10 @@ export function bindTracingChannelToSpan( // The operation settled synchronously (returned or threw) // Presence checks because caller can return `undefined` result or throw a falsy value. if ('error' in data || 'result' in data) { + const span = data._sentrySpan; + if (span && deferSpanEnd?.({ span, data, end: makeDeferredEnd(span, data) })) { + return; + } endBoundSpan(data, beforeSpanEnd); } }, @@ -108,15 +159,13 @@ export function bindTracingChannelToSpan( return; } - if (opts?.captureError) { - captureException(data.error, getErrorHint(data.error)); - } - - const { message, attributes } = getErrorInfo(data.error); - span.setStatus({ code: SPAN_STATUS_ERROR, message }); - span.setAttributes(attributes); + annotateSpanError(span, data.error); }, asyncEnd(data) { + const span = data._sentrySpan; + if (span && deferSpanEnd?.({ span, data, end: makeDeferredEnd(span, data) })) { + return; + } endBoundSpan(data, beforeSpanEnd); }, }; diff --git a/packages/server-utils/test/tracing-channel.test.ts b/packages/server-utils/test/tracing-channel.test.ts index a6d26b3f5f3a..6ea6a704ee0d 100644 --- a/packages/server-utils/test/tracing-channel.test.ts +++ b/packages/server-utils/test/tracing-channel.test.ts @@ -783,6 +783,72 @@ describe('bindTracingChannelToSpan', () => { }); }); + describe('deferSpanEnd', () => { + // Drives a deferred span: `getSpan` opens it, `deferSpanEnd` returns true and captures the `end` + // util so the test can settle the span out-of-band, mimicking a streamed emitter. + function setupDeferred( + name: string, + opts?: { captureError?: boolean }, + ): { span: Span; endSpy: ReturnType; end: (error?: unknown) => void } { + installTestAsyncContextStrategy(); + initTestClient(); + const span = startInactiveSpan({ name: 'channel-span' }); + const endSpy = vi.spyOn(span, 'end'); + let captured: (error?: unknown) => void = () => undefined; + const { channel } = bindTracingChannelToSpan(tracingChannel<{ operation: string }>(name), () => span, { + captureError: opts?.captureError, + deferSpanEnd({ end }) { + captured = end; + return true; + }, + }); + + channel.traceSync(() => 'stream', { operation: 'read' }); + + return { span, endSpy, end: (error?: unknown) => captured(error) }; + } + + it('does not end the span while deferred', () => { + const { span, endSpy } = setupDeferred('test:defer:open'); + expect(endSpy).not.toHaveBeenCalled(); + expect(spanToJSON(span).timestamp).toBeUndefined(); + }); + + it('`end()` ends the span once with no error status', () => { + const { span, endSpy, end } = setupDeferred('test:defer:ok'); + end(); + expect(endSpy).toHaveBeenCalledTimes(1); + expect(spanToJSON(span).timestamp).toBeDefined(); + expect(spanToJSON(span).status).toBeUndefined(); + }); + + it('`end(error)` sets error status and the `error.type` attribute, then ends', () => { + const { span, endSpy, end } = setupDeferred('test:defer:error'); + end(new TypeError('stream blew up')); + expect(spanToJSON(span).status).toBe('stream blew up'); + expect(spanToJSON(span).data['error.type']).toBe('TypeError'); + expect(endSpy).toHaveBeenCalledTimes(1); + }); + + it('is idempotent: a trailing settle is a no-op', () => { + const { endSpy, end } = setupDeferred('test:defer:idempotent'); + end(new Error('boom')); + end(); + expect(endSpy).toHaveBeenCalledTimes(1); + }); + + it('captures the error via `end(error)` when `captureError` is set', () => { + const captureExceptionSpy = vi.spyOn(SentryCore, 'captureException').mockReturnValue('event-id'); + const { end } = setupDeferred('test:defer:capture', { captureError: true }); + const error = new Error('captured-stream'); + end(error); + expect(captureExceptionSpy).toHaveBeenCalledTimes(1); + expect(captureExceptionSpy).toHaveBeenCalledWith(error, { + mechanism: { type: 'auto.diagnostic_channels.bind_span', handled: false }, + }); + }); + }); + it('returns the channel unchanged when no async context binding is available', () => { // No async context strategy is installed, so the binding cannot be resolved. const span = startInactiveSpan({ name: 'channel-span' });