Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE,
GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE,
} from '../../../../../packages/core/src/tracing/ai/gen-ai-attributes';
import { isOrchestrionEnabled } from '../../../utils';
import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner';

describe('Anthropic integration', () => {
Expand Down Expand Up @@ -93,8 +94,15 @@ describe('Anthropic integration', () => {

createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createRunner, test) => {
test('creates anthropic related spans with genAI recording disabled', async () => {
await createRunner()
.expect({ event: EXPECTED_MODEL_ERROR })
const runner = createRunner();

// The orchestrion path only marks the errored span; unlike the OTel path it does not
// capture the handled `error-model` rejection as an event.
if (!isOrchestrionEnabled()) {
runner.expect({ event: EXPECTED_MODEL_ERROR });
}

await runner
.expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE })
.expect({
span: container => {
Expand Down Expand Up @@ -141,8 +149,15 @@ describe('Anthropic integration', () => {

createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-pii.mjs', (createRunner, test) => {
test('creates anthropic related spans with genAI recording enabled', async () => {
await createRunner()
.expect({ event: EXPECTED_MODEL_ERROR })
const runner = createRunner();

// The orchestrion path only marks the errored span; unlike the OTel path it does not
// capture the handled `error-model` rejection as an event.
if (!isOrchestrionEnabled()) {
runner.expect({ event: EXPECTED_MODEL_ERROR });
}

await runner
.expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE })
.expect({
span: container => {
Expand All @@ -168,7 +183,9 @@ describe('Anthropic integration', () => {
expect(completionSpan!.attributes[GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE].value).toBe(15);
expect(completionSpan!.attributes[GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE].value).toBe(25);
expect(completionSpan!.attributes['sentry.op'].value).toBe('gen_ai.chat');
expect(completionSpan!.attributes['sentry.origin'].value).toBe('auto.ai.anthropic');
expect(completionSpan!.attributes['sentry.origin'].value).toBe(
isOrchestrionEnabled() ? 'auto.ai.orchestrion.anthropic' : 'auto.ai.anthropic',
);
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
nicohrubec marked this conversation as resolved.

const errorSpan = container.items.find(
span =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE,
GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE,
} from '../../../../../packages/core/src/tracing/ai/gen-ai-attributes';
import { isOrchestrionEnabled } from '../../../utils';
import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner';
import { createEsmTests } from '../../../utils/runner/createEsmAndCjsTests';

Expand Down Expand Up @@ -255,7 +256,9 @@ describe('LangChain integration', () => {
span: container => {
expect(container.items).toHaveLength(2);
const anthropicSpan = container.items.find(
span => span.attributes['sentry.origin'].value === 'auto.ai.anthropic',
span =>
span.attributes['sentry.origin'].value ===
(isOrchestrionEnabled() ? 'auto.ai.orchestrion.anthropic' : 'auto.ai.anthropic'),
);
expect(anthropicSpan).toBeDefined();
expect(anthropicSpan!.name).toBe('chat claude-3-5-sonnet-20241022');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE,
GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE,
} from '../../../../../../packages/core/src/tracing/ai/gen-ai-attributes';
import { conditionalTest } from '../../../../utils';
import { conditionalTest, isOrchestrionEnabled } from '../../../../utils';
import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../../utils/runner';
import { createEsmTests } from '../../../../utils/runner/createEsmAndCjsTests';

Expand Down Expand Up @@ -287,7 +287,9 @@ conditionalTest({ min: 20 })('LangChain integration (v1)', () => {
span: container => {
expect(container.items).toHaveLength(2);
const anthropicSpan = container.items.find(
span => span.attributes['sentry.origin'].value === 'auto.ai.anthropic',
span =>
span.attributes['sentry.origin'].value ===
(isOrchestrionEnabled() ? 'auto.ai.orchestrion.anthropic' : 'auto.ai.anthropic'),
);
expect(anthropicSpan).toBeDefined();
expect(anthropicSpan!.name).toBe('chat claude-3-5-sonnet-20241022');
Expand Down
9 changes: 8 additions & 1 deletion packages/core/src/shared-exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export { addVercelAiProcessors, getProviderMetadataAttributes } from './tracing/
export { getTruncatedJsonString, shouldEnableTruncation, resolveAIRecordingOptions } from './tracing/ai/utils';
export {
GEN_AI_INPUT_MESSAGES_ORIGINAL_LENGTH_ATTRIBUTE,
GEN_AI_REQUEST_MODEL_ATTRIBUTE,
GEN_AI_SYSTEM_INSTRUCTIONS_ATTRIBUTE,
} from './tracing/ai/gen-ai-attributes';
export { _INTERNAL_getSpanContextForToolCallId, _INTERNAL_cleanupToolCallSpanContext } from './tracing/vercel-ai/utils';
Expand All @@ -196,7 +197,13 @@ export {
} from './tracing/openai/utils';
export { instrumentStream as instrumentOpenAiStream } from './tracing/openai/streaming';
export { OPENAI_INTEGRATION_NAME } from './tracing/openai/constants';
export { instrumentAnthropicAiClient } from './tracing/anthropic-ai';
export {
instrumentAnthropicAiClient,
extractRequestAttributes as extractAnthropicRequestAttributes,
addPrivateRequestAttributes as addAnthropicRequestAttributes,
addResponseAttributes as addAnthropicResponseAttributes,
} from './tracing/anthropic-ai';
export { instrumentAsyncIterableStream, instrumentMessageStream } from './tracing/anthropic-ai/streaming';
export { ANTHROPIC_AI_INTEGRATION_NAME } from './tracing/anthropic-ai/constants';
export { instrumentGoogleGenAIClient } from './tracing/google-genai';
export { GOOGLE_GENAI_INTEGRATION_NAME } from './tracing/google-genai/constants';
Expand Down
14 changes: 11 additions & 3 deletions packages/core/src/tracing/anthropic-ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ import { handleResponseError, messagesFromParams, setMessagesAttribute } from '.
/**
* Extract request attributes from method arguments
*/
function extractRequestAttributes(args: unknown[], methodPath: string, operationName: string): Record<string, unknown> {
export function extractRequestAttributes(
args: unknown[],
methodPath: string,
operationName: string,
): Record<string, unknown> {
const attributes: Record<string, unknown> = {
[GEN_AI_SYSTEM_ATTRIBUTE]: 'anthropic',
[GEN_AI_OPERATION_NAME_ATTRIBUTE]: operationName,
Expand Down Expand Up @@ -73,7 +77,11 @@ function extractRequestAttributes(args: unknown[], methodPath: string, operation
* Add private request attributes to spans.
* This is only recorded if recordInputs is true.
*/
function addPrivateRequestAttributes(span: Span, params: Record<string, unknown>, enableTruncation: boolean): void {
export function addPrivateRequestAttributes(
span: Span,
params: Record<string, unknown>,
enableTruncation: boolean,
): void {
const messages = messagesFromParams(params);
setMessagesAttribute(span, messages, enableTruncation);

Expand Down Expand Up @@ -143,7 +151,7 @@ function addMetadataAttributes(span: Span, response: AnthropicAiResponse): void
/**
* Add response attributes to spans
*/
function addResponseAttributes(span: Span, response: AnthropicAiResponse, recordOutputs?: boolean): void {
export function addResponseAttributes(span: Span, response: AnthropicAiResponse, recordOutputs?: boolean): void {
if (!response || typeof response !== 'object') return;

// capture error, do not add attributes if error (they shouldn't exist)
Expand Down
183 changes: 183 additions & 0 deletions packages/server-utils/src/integrations/tracing-channel/anthropic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import * as diagnosticsChannel from 'node:diagnostics_channel';
import type { AnthropicAiOptions, AnthropicAiResponse, IntegrationFn, Span, SpanAttributeValue } from '@sentry/core';
import {
_INTERNAL_shouldSkipAiProviderWrapping,
addAnthropicRequestAttributes,
addAnthropicResponseAttributes,
debug,
defineIntegration,
extractAnthropicRequestAttributes,
GEN_AI_REQUEST_MODEL_ATTRIBUTE,
instrumentAsyncIterableStream,
instrumentMessageStream,
resolveAIRecordingOptions,
SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN,
shouldEnableTruncation,
startInactiveSpan,
waitForTracingChannelBinding,
} from '@sentry/core';
import { DEBUG_BUILD } from '../../debug-build';
import { CHANNELS } from '../../orchestrion/channels';
import { bindTracingChannelToSpan } from '../../tracing-channel';

// Same name as the OTel integration by design: when enabled, the OTel 'Anthropic_AI'
// integration is dropped from the default set (see the Node opt-in loader).
const INTEGRATION_NAME = 'Anthropic_AI' as const;

// Distinct from the proxy's `auto.ai.anthropic` so spans from the orchestrion path
// are attributable separately from the OTel/proxy one.
const ORIGIN = 'auto.ai.orchestrion.anthropic';

// `stream` determines how the span is ended
const INSTRUMENTED_CHANNELS = [
{ channel: CHANNELS.ANTHROPIC_CHAT, operation: 'chat', methodPath: 'messages.create', stream: 'async-iterable' },
Comment thread
sentry[bot] marked this conversation as resolved.
{ channel: CHANNELS.ANTHROPIC_MODELS, operation: 'models', methodPath: 'models.retrieve', stream: 'none' },
{
channel: CHANNELS.ANTHROPIC_MESSAGES_STREAM,
operation: 'chat',
methodPath: 'messages.stream',
stream: 'message-stream',
},
] as const;

type StreamMode = (typeof INSTRUMENTED_CHANNELS)[number]['stream'];

interface AnthropicChannelContext {
arguments: unknown[];
result?: unknown;
}

let subscribed = false;

const _anthropicChannelIntegration = ((options: AnthropicAiOptions = {}) => {
return {
name: INTEGRATION_NAME,
setupOnce() {
// tracingChannel is unavailable before Node 18.19 and prevent double-subscribe
if (!diagnosticsChannel.tracingChannel || subscribed) {
return;
}
subscribed = true;

// `bindTracingChannelToSpan` needs the async-context binding that `initOpenTelemetry()` registers
// after `setupOnce` runs, so wait for it before subscribing.
waitForTracingChannelBinding(() => {
for (const { channel, operation, methodPath, stream } of INSTRUMENTED_CHANNELS) {
DEBUG_BUILD && debug.log(`[orchestrion:anthropic] subscribing to channel "${channel}"`);
bindTracingChannelToSpan(
diagnosticsChannel.tracingChannel<AnthropicChannelContext>(channel),
data => createGenAiSpan(data, operation, methodPath, options),
{
beforeSpanEnd: (span, data) => {
addAnthropicResponseAttributes(
span,
data.result as AnthropicAiResponse,
resolveAIRecordingOptions(options).recordOutputs,
);
},
Comment thread
nicohrubec marked this conversation as resolved.
deferSpanEnd: ({ span, data }) => wrapStreamResult(span, data, stream, options),
},
);
}
});
},
};
}) satisfies IntegrationFn;

/**
* Build the span for an instrumented call.
* Returning `undefined` opts the payload out so no span is opened.
*/
function createGenAiSpan(
data: AnthropicChannelContext,
operation: string,
methodPath: string,
options: AnthropicAiOptions,
): Span | undefined {
const args = data.arguments ?? [];

// When LangChain (or another provider) is driving the SDK, it records the spans itself and marks this
// provider as skipped — mirror the OTel integration and don't double-instrument.
if (_INTERNAL_shouldSkipAiProviderWrapping(INTEGRATION_NAME)) {
return undefined;
}

// `messages.stream()` internally calls the instrumented `messages.create({ stream: true })` tagged with
// an `X-Stainless-Helper-Method: 'stream'` header. The messages-stream channel already covers it, so skip
// the nested create to avoid a duplicate span.
const requestOptions = args[1] as { headers?: Record<string, unknown> } | undefined;
if (requestOptions?.headers?.['X-Stainless-Helper-Method'] === 'stream') {
return undefined;
}

const params = typeof args[0] === 'object' && args[0] !== null ? (args[0] as Record<string, unknown>) : undefined;

const { recordInputs } = resolveAIRecordingOptions(options);
const enableTruncation = shouldEnableTruncation(options.enableTruncation);

const attributes = extractAnthropicRequestAttributes(args, methodPath, operation);
const model = (attributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown';
attributes[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN] = ORIGIN;

const span = startInactiveSpan({
name: `${operation} ${model}`,
op: `gen_ai.${operation}`,
attributes: attributes as Record<string, SpanAttributeValue>,
});

if (recordInputs && params) {
addAnthropicRequestAttributes(span, params, enableTruncation);
}

return span;
}

type AsyncIterableStream = { [Symbol.asyncIterator]: () => AsyncIterator<unknown> };
type MessageStreamEmitter = { on: (...args: unknown[]) => void };

function isAsyncIterable(value: unknown): value is AsyncIterableStream {
return !!value && typeof (value as AsyncIterableStream)[Symbol.asyncIterator] === 'function';
Comment thread
sentry[bot] marked this conversation as resolved.
}

function isMessageStream(value: unknown): value is MessageStreamEmitter {
return !!value && typeof (value as MessageStreamEmitter).on === 'function';
}

/**
* Hand span-ending ownership to a streamed result: returns `true` to skip the normal `beforeSpanEnd`,
* `false` for non-streaming results (which end via `beforeSpanEnd`).
*
* - `async-iterable`: patch the `Stream`'s async iterator in place so `instrumentAsyncIterableStream` ends
* the span when iteration finishes.
* - `message-stream`: `instrumentMessageStream` attaches `'message'`/`'error'` listeners that end the span.
*/
function wrapStreamResult(
span: Span,
data: AnthropicChannelContext,
stream: StreamMode,
options: AnthropicAiOptions,
): boolean {
const { recordOutputs } = resolveAIRecordingOptions(options);
const result = data.result;

if (stream === 'async-iterable' && isAsyncIterable(result)) {
const iterate = result[Symbol.asyncIterator].bind(result);
const instrumented = instrumentAsyncIterableStream({ [Symbol.asyncIterator]: iterate }, span, recordOutputs);
result[Symbol.asyncIterator] = () => instrumented;
Comment thread
nicohrubec marked this conversation as resolved.
return true;
Comment thread
cursor[bot] marked this conversation as resolved.
}

if (stream === 'message-stream' && isMessageStream(result)) {
instrumentMessageStream(result, span, recordOutputs);
return true;
}

return false;
}

/**
* EXPERIMENTAL — orchestrion-driven Anthropic integration. Subscribes to the `orchestrion:@anthropic-ai/sdk:*`
* diagnostics_channels injected into the SDK's chat (`messages`/`completions`/beta `messages`), `models`, and
* `messages.stream()` methods, so it requires the orchestrion runtime hook or bundler plugin.
*/
export const anthropicChannelIntegration = defineIntegration(_anthropicChannelIntegration);
3 changes: 3 additions & 0 deletions packages/server-utils/src/orchestrion/channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ export const CHANNELS = {
OPENAI_RESPONSES: 'orchestrion:openai:responses',
OPENAI_EMBEDDINGS: 'orchestrion:openai:embeddings',
OPENAI_CONVERSATIONS: 'orchestrion:openai:conversations',
ANTHROPIC_CHAT: 'orchestrion:@anthropic-ai/sdk:chat',
ANTHROPIC_MODELS: 'orchestrion:@anthropic-ai/sdk:models',
ANTHROPIC_MESSAGES_STREAM: 'orchestrion:@anthropic-ai/sdk:messages-stream',
} as const;

export type ChannelName = (typeof CHANNELS)[keyof typeof CHANNELS];
30 changes: 30 additions & 0 deletions packages/server-utils/src/orchestrion/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,36 @@ export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [
module: { name: 'pg-pool', versionRange: '>=2.0.0 <4', filePath: 'index.js' },
functionQuery: { className: 'Pool', methodName: 'connect', kind: 'Auto' },
},
// One entry each for CJS/ESM
...(['resources/messages/messages.js', 'resources/messages/messages.mjs'].flatMap(filePath =>
(['create', 'countTokens'] as const).map(methodName => ({
channelName: 'chat',
module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath },
functionQuery: { className: 'Messages', methodName, kind: 'Auto' as const },
})),
) satisfies InstrumentationConfig[]),
...(['resources/completions.js', 'resources/completions.mjs'].map(filePath => ({
channelName: 'chat',
module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath },
functionQuery: { className: 'Completions', methodName: 'create', kind: 'Auto' as const },
})) satisfies InstrumentationConfig[]),
...(['resources/beta/messages/messages.js', 'resources/beta/messages/messages.mjs'].map(filePath => ({
channelName: 'chat',
module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath },
functionQuery: { className: 'Messages', methodName: 'create', kind: 'Auto' as const },
})) satisfies InstrumentationConfig[]),
...(['resources/models.js', 'resources/models.mjs'].map(filePath => ({
channelName: 'models',
module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath },
functionQuery: { className: 'Models', methodName: 'retrieve', kind: 'Auto' as const },
})) satisfies InstrumentationConfig[]),
// `messages.stream()` returns a synchronous emitter, not a promise, so `kind: 'Sync'` is required:
// `Auto`'s promise wrapper never publishes `end` for a non-thenable return, so the span would never end.
...(['resources/messages/messages.js', 'resources/messages/messages.mjs'].map(filePath => ({
channelName: 'messages-stream',
module: { name: '@anthropic-ai/sdk', versionRange: '>=0.19.2 <1', filePath },
functionQuery: { className: 'Messages', methodName: 'stream', kind: 'Sync' as const },
})) satisfies InstrumentationConfig[]),
];

/**
Expand Down
Loading
Loading