Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ private static final class Worker implements Runnable {
private volatile boolean continueWork = true;
private final ArrayList<SpanData> batch;
private final long maxQueueSize;
private final AtomicInteger droppedSpanCount = new AtomicInteger(0);

private Worker(
SpanExporter spanExporter,
Expand Down Expand Up @@ -212,6 +213,7 @@ private void addSpan(ReadableSpan span) {
spanProcessorInstrumentation.buildQueueMetricsOnce(maxQueueSize, queue::size);
if (!queue.offer(span)) {
spanProcessorInstrumentation.dropSpans(1);
droppedSpanCount.incrementAndGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jack-berg do you think it would make sense to add something to the SpanProcessorInstrumentation to get this information, rather than tracking it separately here?
/cc @anuraaga

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would have to maintain state over there but at least it would be hidden behind a central abstraction

An otel maximalist answer here would be:

  • Record the log using the otel log API (first solving "Sketch out slf4j bridge", #7905)
  • Rely on temporal correlation between logs and metrics for the user to look up the metric state at the time the log was observed

I'm not trying to boil the ocean for this though 😛

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah if the SpanProcessorInstrumentation was already using an async counter or such with tracking, just reading that would be fine. But it's probably better to keep that synchronous (I think?) so keeping this BSP implementation detail in BSP seems to make more sense

} else {
if (queueSize.incrementAndGet() >= spansNeeded.get()) {
signal.offer(true);
Expand Down Expand Up @@ -315,6 +317,18 @@ private void exportCurrentBatch() {
return;
}

int dropped = droppedSpanCount.getAndSet(0);
if (dropped > 0) {
logger.log(
Level.WARNING,
"BatchSpanProcessor dropped "
+ dropped
+ " span(s) since the last export because the queue is full"
+ " (maxQueueSize="
+ maxQueueSize
+ ")");
}

String error = null;
try {
CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.opentelemetry.api.internal.GuardedBy;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
Expand All @@ -29,6 +30,7 @@
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import java.time.Duration;
import org.slf4j.event.Level;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -42,6 +44,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -62,9 +65,13 @@ class BatchSpanProcessorTest {
@Mock private Sampler mockSampler;
@Mock private SpanExporter mockSpanExporter;

@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(BatchSpanProcessor.class);

@BeforeEach
void setUp() {
when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(mockSpanExporter.export(anyList())).thenReturn(CompletableResultCode.ofSuccess());
}

@AfterEach
Expand Down Expand Up @@ -232,6 +239,30 @@ void exportMoreSpansThanTheBufferSize() {
span6.toSpanData()));
}

@Test
void droppedSpanIsLogged() {
sdkTracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(
BatchSpanProcessor.builder(mockSpanExporter)
.setMaxQueueSize(1)
.setMaxExportBatchSize(1)
.setScheduleDelay(Duration.ofMillis(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't need any exports, may as well set to a huge number to reduce potential flakiness. And maybe it makes sense then to drop more than one span

.build())
.build();

// Add two spans quickly to trigger a drop when the queue is full.
createEndedSpan(SPAN_NAME_1);
createEndedSpan(SPAN_NAME_2);

await()
.untilAsserted(
() ->
logs.assertContains(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable to assert the whole message, especially since it has some dynamic parts

loggingEvent -> loggingEvent.getLevel().equals(Level.WARN),
"BatchSpanProcessor dropped"));
}

@Test
void forceExport() {
WaitingSpanExporter waitingSpanExporter =
Expand Down