Skip to content

Commit 8602881

Browse files
committed
add exporter
1 parent 45408a4 commit 8602881

File tree

1 file changed

+175
-0
lines changed

1 file changed

+175
-0
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import type { ReadableSpan } from '@opentelemetry/sdk-trace-base';
2+
import type { Client, Span, SpanV2JSON } from '@sentry/core';
3+
import {
4+
type SpanV2JSONWithSegmentRef,
5+
captureSpan,
6+
createSpanV2Envelope,
7+
debug,
8+
getDynamicSamplingContextFromSpan,
9+
safeSetSpanJSONAttributes,
10+
SEMANTIC_ATTRIBUTE_SENTRY_OP,
11+
SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN,
12+
} from '@sentry/core';
13+
import { DEBUG_BUILD } from './debug-build';
14+
import { type ISentrySpanExporter, getSpanData } from './spanExporter';
15+
16+
type StreamingSpanExporterOptions = {
17+
flushInterval?: number;
18+
maxSpanLimit?: number;
19+
};
20+
21+
/**
22+
* A Sentry-specific exporter that buffers span JSON objects and streams them to Sentry
23+
* in Span v2 envelopes. This exporter works with pre-serialized span JSON rather than
24+
* OTel span instances to avoid mutating already-ended spans.
25+
*/
26+
export class StreamingSpanExporter implements ISentrySpanExporter {
27+
private _flushInterval: number;
28+
private _maxSpanLimit: number;
29+
30+
private _spanTreeMap: Map<string, Set<SpanV2JSONWithSegmentRef>>;
31+
32+
private _flushIntervalId: NodeJS.Timeout | null;
33+
34+
private _client: Client;
35+
36+
public constructor(client: Client, options?: StreamingSpanExporterOptions) {
37+
this._spanTreeMap = new Map();
38+
this._client = client;
39+
40+
const safeMaxSpanLimit =
41+
options?.maxSpanLimit && options.maxSpanLimit > 0 && options.maxSpanLimit <= 1000 ? options.maxSpanLimit : 1000;
42+
const safeFlushInterval = options?.flushInterval && options?.flushInterval > 0 ? options.flushInterval : 5_000;
43+
this._flushInterval = safeFlushInterval;
44+
this._maxSpanLimit = safeMaxSpanLimit;
45+
46+
this._flushIntervalId = setInterval(() => {
47+
this.flush();
48+
}, this._flushInterval);
49+
50+
this._client.on('processSpan', (spanJSON, hint) => {
51+
const { readOnlySpan } = hint;
52+
// TODO: This can be simplified by using spanJSON to get the data instead of the readOnlySpan
53+
// for now this is the easiest backwards-compatible way to get the data.
54+
const { op, description, data, origin = 'manual' } = getSpanData(readOnlySpan as unknown as ReadableSpan);
55+
const allData = {
56+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: origin,
57+
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: op,
58+
...data,
59+
};
60+
safeSetSpanJSONAttributes(spanJSON, allData, spanJSON.attributes);
61+
spanJSON.name = description;
62+
});
63+
64+
this._client.on('enqueueSpan', spanJSON => {
65+
const traceId = spanJSON.trace_id;
66+
let traceBucket = this._spanTreeMap.get(traceId);
67+
if (traceBucket) {
68+
traceBucket.add(spanJSON);
69+
} else {
70+
traceBucket = new Set([spanJSON]);
71+
this._spanTreeMap.set(traceId, traceBucket);
72+
}
73+
74+
if (traceBucket.size >= this._maxSpanLimit) {
75+
this._flushTrace(traceId);
76+
this._debounceFlushInterval();
77+
}
78+
});
79+
}
80+
81+
/**
82+
* Enqueue a span JSON into the buffer
83+
*/
84+
public export(span: ReadableSpan & Span): void {
85+
captureSpan(span, this._client);
86+
}
87+
88+
/**
89+
* Try to flush any pending spans immediately.
90+
* This is called internally by the exporter (via _debouncedFlush),
91+
* but can also be triggered externally if we force-flush.
92+
*/
93+
public flush(): void {
94+
if (!this._spanTreeMap.size) {
95+
return;
96+
}
97+
98+
debug.log(`Flushing span tree map with ${this._spanTreeMap.size} traces`);
99+
100+
this._spanTreeMap.forEach((_, traceId) => {
101+
this._flushTrace(traceId);
102+
});
103+
this._debounceFlushInterval();
104+
}
105+
106+
/**
107+
* Clear the exporter.
108+
* This is called when the span processor is shut down.
109+
*/
110+
public clear(): void {
111+
if (this._flushIntervalId) {
112+
clearInterval(this._flushIntervalId);
113+
this._flushIntervalId = null;
114+
}
115+
// TODO (span-streaming): record client outcome for leftover spans?
116+
this._spanTreeMap.clear();
117+
}
118+
119+
/**
120+
* Flush a trace from the span tree map.
121+
*/
122+
private _flushTrace(traceId: string): void {
123+
const traceBucket = this._spanTreeMap.get(traceId);
124+
if (!traceBucket) {
125+
return;
126+
}
127+
128+
if (!traceBucket.size) {
129+
this._spanTreeMap.delete(traceId);
130+
return;
131+
}
132+
133+
// we checked against empty bucket above, so we can safely get the first span JSON here
134+
const firstSpanJSON = traceBucket.values().next().value;
135+
136+
// Extract the segment span reference for DSC calculation
137+
const segmentSpan = firstSpanJSON?._segmentSpan;
138+
if (!segmentSpan) {
139+
DEBUG_BUILD && debug.warn('No segment span reference found on span JSON, cannot compute DSC');
140+
this._spanTreeMap.delete(traceId);
141+
return;
142+
}
143+
144+
const dsc = getDynamicSamplingContextFromSpan(segmentSpan);
145+
146+
// Clean up segment span references before sending
147+
const cleanedSpans: SpanV2JSON[] = Array.from(traceBucket).map(spanJSON => {
148+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
149+
const { _segmentSpan, ...cleanSpanJSON } = spanJSON;
150+
return cleanSpanJSON;
151+
});
152+
153+
const envelope = createSpanV2Envelope(cleanedSpans, dsc, this._client);
154+
155+
debug.log(`Sending span envelope for trace ${traceId} with ${cleanedSpans.length} spans`);
156+
157+
this._client.sendEnvelope(envelope).then(null, reason => {
158+
DEBUG_BUILD && debug.error('Error while sending span stream envelope:', reason);
159+
});
160+
161+
this._spanTreeMap.delete(traceId);
162+
}
163+
164+
/**
165+
* Debounce (reset) the flush interval.
166+
*/
167+
private _debounceFlushInterval(): void {
168+
if (this._flushIntervalId) {
169+
clearInterval(this._flushIntervalId);
170+
}
171+
this._flushIntervalId = setInterval(() => {
172+
this.flush();
173+
}, this._flushInterval);
174+
}
175+
}

0 commit comments

Comments
 (0)