diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 2febbb46667..f264128696e 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -184,6 +184,7 @@ private static final class Worker implements Runnable { private volatile boolean continueWork = true; private final ArrayList batch; private final long maxQueueSize; + private final AtomicInteger droppedSpanCount = new AtomicInteger(0); private Worker( SpanExporter spanExporter, @@ -212,6 +213,7 @@ private void addSpan(ReadableSpan span) { spanProcessorInstrumentation.buildQueueMetricsOnce(maxQueueSize, queue::size); if (!queue.offer(span)) { spanProcessorInstrumentation.dropSpans(1); + droppedSpanCount.incrementAndGet(); } else { if (queueSize.incrementAndGet() >= spansNeeded.get()) { signal.offer(true); @@ -315,6 +317,18 @@ private void exportCurrentBatch() { return; } + int dropped = droppedSpanCount.getAndSet(0); + if (dropped > 0) { + logger.log( + Level.WARNING, + "BatchSpanProcessor dropped " + + dropped + + " span(s) since the last export because the queue is full" + + " (maxQueueSize=" + + maxQueueSize + + ")"); + } + String error = null; try { CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch)); diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 98bc45705b3..d64b4604b6c 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -21,6 +21,7 @@ import io.opentelemetry.api.internal.GuardedBy; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; +import io.github.netmikey.logunit.api.LogCapturer; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.ReadableSpan; @@ -29,6 +30,7 @@ import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingResult; import java.time.Duration; +import org.slf4j.event.Level; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -42,6 +44,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -62,9 +65,13 @@ class BatchSpanProcessorTest { @Mock private Sampler mockSampler; @Mock private SpanExporter mockSpanExporter; + @RegisterExtension + LogCapturer logs = LogCapturer.create().captureForType(BatchSpanProcessor.class); + @BeforeEach void setUp() { when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(mockSpanExporter.export(anyList())).thenReturn(CompletableResultCode.ofSuccess()); } @AfterEach @@ -232,6 +239,30 @@ void exportMoreSpansThanTheBufferSize() { span6.toSpanData())); } + @Test + void droppedSpanIsLogged() { + sdkTracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + BatchSpanProcessor.builder(mockSpanExporter) + .setMaxQueueSize(1) + .setMaxExportBatchSize(1) + .setScheduleDelay(Duration.ofMillis(1)) + .build()) + .build(); + + // Add two spans quickly to trigger a drop when the queue is full. + createEndedSpan(SPAN_NAME_1); + createEndedSpan(SPAN_NAME_2); + + await() + .untilAsserted( + () -> + logs.assertContains( + loggingEvent -> loggingEvent.getLevel().equals(Level.WARN), + "BatchSpanProcessor dropped")); + } + @Test void forceExport() { WaitingSpanExporter waitingSpanExporter =