From be25ac6001fa9791a259147d3fe7edfc2b403013 Mon Sep 17 00:00:00 2001 From: Sridhar Reddy Survi Date: Tue, 17 Mar 2026 10:50:20 -0500 Subject: [PATCH] Add fallback endpoint support for OTLP exporters When the primary OTLP endpoint fails with a transport error (after retries are exhausted), the exporter will automatically attempt to send telemetry data to a configurable fallback endpoint. This enables high-availability setups where a secondary collector can receive data when the primary is unavailable. Configuration via environment variables / system properties: - otel.exporter.otlp.fallback.endpoint (generic) - otel.exporter.otlp..fallback.endpoint (signal-specific) Programmatic configuration via builder: - setFallbackEndpoint(String) on all exporter builders Supported for all signal types (traces, metrics, logs) and both HTTP/protobuf and gRPC protocols. --- .../opentelemetry-exporter-otlp.txt | 19 ++- .../exporter/internal/grpc/GrpcExporter.java | 38 +++++- .../internal/grpc/GrpcExporterBuilder.java | 33 ++++- .../exporter/internal/http/HttpExporter.java | 49 +++++++- .../internal/http/HttpExporterBuilder.java | 36 +++++- .../internal/grpc/GrpcExporterTest.java | 111 +++++++++++++++++ .../internal/http/HttpExporterTest.java | 117 ++++++++++++++++++ .../OtlpHttpLogRecordExporterBuilder.java | 12 ++ .../OtlpHttpMetricExporterBuilder.java | 12 ++ .../trace/OtlpHttpSpanExporterBuilder.java | 12 ++ .../otlp/internal/OtlpConfigUtil.java | 60 +++++++++ .../OtlpLogRecordExporterProvider.java | 6 +- .../internal/OtlpMetricExporterProvider.java | 6 +- .../internal/OtlpSpanExporterProvider.java | 6 +- .../OtlpGrpcLogRecordExporterBuilder.java | 11 ++ .../OtlpGrpcMetricExporterBuilder.java | 11 ++ .../trace/OtlpGrpcSpanExporterBuilder.java | 11 ++ .../otlp/internal/OtlpConfigUtilTest.java | 87 +++++++++++++ 18 files changed, 623 insertions(+), 14 deletions(-) diff --git a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt index 92a030821a3..e22ef8fe9e7 100644 --- a/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt +++ b/docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt @@ -1,2 +1,19 @@ Comparing source compatibility of opentelemetry-exporter-otlp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-otlp-1.60.1.jar -No changes. \ No newline at end of file +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder setFallbackEndpoint(java.lang.String) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder setFallbackEndpoint(java.lang.String) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder setFallbackEndpoint(java.lang.String) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setFallbackEndpoint(java.lang.String) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setFallbackEndpoint(java.lang.String) +*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable) + === CLASS FILE FORMAT VERSION: 52.0 <- 52.0 + +++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setFallbackEndpoint(java.lang.String) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java index 20ecfb83092..ec2b72408e3 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporter.java @@ -17,10 +17,12 @@ import io.opentelemetry.sdk.common.internal.StandardComponentId; import io.opentelemetry.sdk.common.internal.ThrottlingLogger; import java.net.URI; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Generic gRPC exporter. @@ -41,6 +43,7 @@ public final class GrpcExporter { private final String type; private final GrpcSender grpcSender; + @Nullable private final GrpcSender fallbackGrpcSender; private final ExporterInstrumentation exporterMetrics; public GrpcExporter( @@ -49,8 +52,19 @@ public GrpcExporter( StandardComponentId componentId, Supplier meterProviderSupplier, URI endpoint) { + this(grpcSender, internalTelemetryVersion, componentId, meterProviderSupplier, endpoint, null); + } + + public GrpcExporter( + GrpcSender grpcSender, + InternalTelemetryVersion internalTelemetryVersion, + StandardComponentId componentId, + Supplier meterProviderSupplier, + URI endpoint, + @Nullable GrpcSender fallbackGrpcSender) { this.type = componentId.getStandardType().signal().logFriendlyName(); this.grpcSender = grpcSender; + this.fallbackGrpcSender = fallbackGrpcSender; this.exporterMetrics = new ExporterInstrumentation( internalTelemetryVersion, meterProviderSupplier, componentId, endpoint); @@ -69,7 +83,22 @@ public CompletableResultCode export(Marshaler exportRequest, int numItems) { grpcSender.send( exportRequest.toBinaryMessageWriter(), grpcResponse -> onResponse(result, metricRecording, grpcResponse), - throwable -> onError(result, metricRecording, throwable)); + throwable -> { + if (fallbackGrpcSender != null) { + logger.log( + Level.INFO, + "Primary endpoint failed for " + + type + + "s, attempting fallback endpoint. Error: " + + throwable.getMessage()); + fallbackGrpcSender.send( + exportRequest.toBinaryMessageWriter(), + grpcResponse -> onResponse(result, metricRecording, grpcResponse), + fallbackThrowable -> onError(result, metricRecording, fallbackThrowable)); + } else { + onError(result, metricRecording, throwable); + } + }); return result; } @@ -143,6 +172,11 @@ public CompletableResultCode shutdown() { logger.log(Level.INFO, "Calling shutdown() multiple times."); return CompletableResultCode.ofSuccess(); } - return grpcSender.shutdown(); + CompletableResultCode primaryResult = grpcSender.shutdown(); + if (fallbackGrpcSender != null) { + CompletableResultCode fallbackResult = fallbackGrpcSender.shutdown(); + return CompletableResultCode.ofAll(Arrays.asList(primaryResult, fallbackResult)); + } + return primaryResult; } } diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java index 53a3d5c7c5e..a5a19a67ef9 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterBuilder.java @@ -57,6 +57,7 @@ public class GrpcExporterBuilder { private Duration timeout; private Duration connectTimeout = Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECS); private URI endpoint; + @Nullable private URI fallbackEndpoint; @Nullable private Compressor compressor; private final Map constantHeaders = new HashMap<>(); private Supplier> headerSupplier = Collections::emptyMap; @@ -104,6 +105,11 @@ public GrpcExporterBuilder setEndpoint(String endpoint) { return this; } + public GrpcExporterBuilder setFallbackEndpoint(String fallbackEndpoint) { + this.fallbackEndpoint = ExporterBuilderUtil.validateEndpoint(fallbackEndpoint); + return this; + } + public GrpcExporterBuilder setCompression(@Nullable Compressor compressor) { this.compressor = compressor; return this; @@ -190,6 +196,7 @@ public GrpcExporterBuilder copy() { copy.internalTelemetryVersion = internalTelemetryVersion; copy.grpcChannel = grpcChannel; copy.componentLoader = componentLoader; + copy.fallbackEndpoint = fallbackEndpoint; return copy; } @@ -233,12 +240,33 @@ public GrpcExporter build() { grpcChannel)); LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName()); + GrpcSender fallbackSender = null; + if (fallbackEndpoint != null) { + boolean isFallbackPlainHttp = "http".equals(fallbackEndpoint.getScheme()); + fallbackSender = + grpcSenderProvider.createSender( + ImmutableGrpcSenderConfig.create( + fallbackEndpoint, + fullMethodName, + compressor, + timeout, + connectTimeout, + headerSupplier, + retryPolicy, + isFallbackPlainHttp ? null : tlsConfigHelper.getSslContext(), + isFallbackPlainHttp ? null : tlsConfigHelper.getTrustManager(), + executorService, + grpcChannel)); + LOGGER.log(Level.FINE, "Using fallback GrpcSender: " + fallbackSender.getClass().getName()); + } + return new GrpcExporter( grpcSender, internalTelemetryVersion, ComponentId.generateLazy(exporterType), meterProviderSupplier, - endpoint); + endpoint, + fallbackSender); } public String toString(boolean includePrefixAndSuffix) { @@ -247,6 +275,9 @@ public String toString(boolean includePrefixAndSuffix) { ? new StringJoiner(", ", "GrpcExporterBuilder{", "}") : new StringJoiner(", "); joiner.add("endpoint=" + endpoint.toString()); + if (fallbackEndpoint != null) { + joiner.add("fallbackEndpoint=" + fallbackEndpoint); + } joiner.add("fullMethodName=" + fullMethodName); joiner.add("timeoutNanos=" + timeout.toNanos()); joiner.add("connectTimeoutNanos=" + connectTimeout.toNanos()); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java index 277177eda19..009e97e81d2 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java @@ -19,6 +19,7 @@ import io.opentelemetry.sdk.common.internal.ThrottlingLogger; import java.io.IOException; import java.net.URI; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.logging.Level; @@ -41,6 +42,7 @@ public final class HttpExporter { private final String type; private final HttpSender httpSender; + @Nullable private final HttpSender fallbackHttpSender; private final ExporterInstrumentation exporterMetrics; private final boolean exportAsJson; @@ -51,8 +53,27 @@ public HttpExporter( InternalTelemetryVersion internalTelemetryVersion, URI endpoint, boolean exportAsJson) { + this( + componentId, + httpSender, + meterProviderSupplier, + internalTelemetryVersion, + endpoint, + exportAsJson, + null); + } + + public HttpExporter( + StandardComponentId componentId, + HttpSender httpSender, + Supplier meterProviderSupplier, + InternalTelemetryVersion internalTelemetryVersion, + URI endpoint, + boolean exportAsJson, + @Nullable HttpSender fallbackHttpSender) { this.type = componentId.getStandardType().signal().logFriendlyName(); this.httpSender = httpSender; + this.fallbackHttpSender = fallbackHttpSender; this.exporterMetrics = new ExporterInstrumentation( internalTelemetryVersion, meterProviderSupplier, componentId, endpoint); @@ -74,7 +95,26 @@ public CompletableResultCode export(Marshaler exportRequest, int numItems) { httpSender.send( messageWriter, httpResponse -> onResponse(result, metricRecording, httpResponse), - throwable -> onError(result, metricRecording, throwable)); + throwable -> { + if (fallbackHttpSender != null) { + logger.log( + Level.INFO, + "Primary endpoint failed for " + + type + + "s, attempting fallback endpoint. Error: " + + throwable.getMessage()); + MessageWriter fallbackMessageWriter = + exportAsJson + ? exportRequest.toJsonMessageWriter() + : exportRequest.toBinaryMessageWriter(); + fallbackHttpSender.send( + fallbackMessageWriter, + httpResponse -> onResponse(result, metricRecording, httpResponse), + fallbackThrowable -> onError(result, metricRecording, fallbackThrowable)); + } else { + onError(result, metricRecording, throwable); + } + }); return result; } @@ -131,7 +171,12 @@ public CompletableResultCode shutdown() { logger.log(Level.INFO, "Calling shutdown() multiple times."); return CompletableResultCode.ofSuccess(); } - return httpSender.shutdown(); + CompletableResultCode primaryResult = httpSender.shutdown(); + if (fallbackHttpSender != null) { + CompletableResultCode fallbackResult = fallbackHttpSender.shutdown(); + return CompletableResultCode.ofAll(Arrays.asList(primaryResult, fallbackResult)); + } + return primaryResult; } private static String extractErrorStatus(String statusMessage, @Nullable byte[] responseBody) { diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index 2e06f8caa26..51b60778abe 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -53,6 +53,7 @@ public final class HttpExporterBuilder { private StandardComponentId.ExporterType exporterType; private URI endpoint; + @Nullable private URI fallbackEndpoint; private Duration timeout = Duration.ofSeconds(DEFAULT_TIMEOUT_SECS); @Nullable private Compressor compressor; @@ -96,6 +97,11 @@ public HttpExporterBuilder setEndpoint(String endpoint) { return this; } + public HttpExporterBuilder setFallbackEndpoint(String fallbackEndpoint) { + this.fallbackEndpoint = ExporterBuilderUtil.validateEndpoint(fallbackEndpoint); + return this; + } + public HttpExporterBuilder setCompression(@Nullable Compressor compressor) { this.compressor = compressor; return this; @@ -205,6 +211,7 @@ public HttpExporterBuilder copy() { copy.internalTelemetryVersion = internalTelemetryVersion; copy.proxyOptions = proxyOptions; copy.componentLoader = componentLoader; + copy.fallbackEndpoint = fallbackEndpoint; return copy; } @@ -231,12 +238,13 @@ public HttpExporter build() { }; boolean isPlainHttp = endpoint.getScheme().equals("http"); + String contentType = exportAsJson ? "application/json" : "application/x-protobuf"; HttpSenderProvider httpSenderProvider = SenderUtil.resolveHttpSenderProvider(componentLoader); HttpSender httpSender = httpSenderProvider.createSender( ImmutableHttpSenderConfig.create( endpoint, - exportAsJson ? "application/json" : "application/x-protobuf", + contentType, compressor, timeout, connectTimeout, @@ -248,13 +256,34 @@ public HttpExporter build() { executorService)); LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName()); + HttpSender fallbackSender = null; + if (fallbackEndpoint != null) { + boolean isFallbackPlainHttp = fallbackEndpoint.getScheme().equals("http"); + fallbackSender = + httpSenderProvider.createSender( + ImmutableHttpSenderConfig.create( + fallbackEndpoint, + contentType, + compressor, + timeout, + connectTimeout, + headerSupplier, + proxyOptions, + retryPolicy, + isFallbackPlainHttp ? null : tlsConfigHelper.getSslContext(), + isFallbackPlainHttp ? null : tlsConfigHelper.getTrustManager(), + executorService)); + LOGGER.log(Level.FINE, "Using fallback HttpSender: " + fallbackSender.getClass().getName()); + } + return new HttpExporter( ComponentId.generateLazy(exporterType), httpSender, meterProviderSupplier, internalTelemetryVersion, endpoint, - exportAsJson); + exportAsJson, + fallbackSender); } public String toString(boolean includePrefixAndSuffix) { @@ -263,6 +292,9 @@ public String toString(boolean includePrefixAndSuffix) { ? new StringJoiner(", ", "HttpExporterBuilder{", "}") : new StringJoiner(", "); joiner.add("endpoint=" + endpoint); + if (fallbackEndpoint != null) { + joiner.add("fallbackEndpoint=" + fallbackEndpoint); + } joiner.add("timeoutNanos=" + timeout.toNanos()); joiner.add("proxyOptions=" + proxyOptions); joiner.add( diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java index 0dcfdfbc0c9..6be23a4bb02 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/grpc/GrpcExporterTest.java @@ -14,6 +14,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InternalTelemetryVersion; import io.opentelemetry.sdk.common.export.GrpcResponse; import io.opentelemetry.sdk.common.export.GrpcSender; @@ -224,4 +225,114 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { .hasBucketCounts(1)))); } } + + @Test + @SuppressLogger(GrpcExporter.class) + void export_FallbackOnPrimaryTransportError() { + GrpcSender mockPrimarySender = Mockito.mock(GrpcSender.class); + GrpcSender mockFallbackSender = Mockito.mock(GrpcSender.class); + Marshaler mockMarshaller = Mockito.mock(Marshaler.class); + + GrpcExporter exporter = + new GrpcExporter( + mockPrimarySender, + InternalTelemetryVersion.LATEST, + ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER), + () -> SdkMeterProvider.builder().build(), + URI.create("http://primary:4317"), + mockFallbackSender); + + // Primary sender fails with transport error, fallback succeeds + doAnswer( + invoc -> { + Consumer onError = invoc.getArgument(2); + onError.accept(new IOException("Connection refused")); + return null; + }) + .when(mockPrimarySender) + .send(any(), any(), any()); + + doAnswer( + invoc -> { + Consumer onResponse = invoc.getArgument(1); + onResponse.accept(ImmutableGrpcResponse.create(GrpcStatusCode.OK, null, new byte[0])); + return null; + }) + .when(mockFallbackSender) + .send(any(), any(), any()); + + exporter.export(mockMarshaller, 10); + + // Verify fallback was called + Mockito.verify(mockFallbackSender).send(any(), any(), any()); + } + + @Test + @SuppressLogger(GrpcExporter.class) + void export_NoFallbackOnSuccess() { + GrpcSender mockPrimarySender = Mockito.mock(GrpcSender.class); + GrpcSender mockFallbackSender = Mockito.mock(GrpcSender.class); + Marshaler mockMarshaller = Mockito.mock(Marshaler.class); + + GrpcExporter exporter = + new GrpcExporter( + mockPrimarySender, + InternalTelemetryVersion.LATEST, + ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER), + () -> SdkMeterProvider.builder().build(), + URI.create("http://primary:4317"), + mockFallbackSender); + + doAnswer( + invoc -> { + Consumer onResponse = invoc.getArgument(1); + onResponse.accept(ImmutableGrpcResponse.create(GrpcStatusCode.OK, null, new byte[0])); + return null; + }) + .when(mockPrimarySender) + .send(any(), any(), any()); + + exporter.export(mockMarshaller, 10); + + Mockito.verify(mockFallbackSender, Mockito.never()).send(any(), any(), any()); + } + + @Test + @SuppressLogger(GrpcExporter.class) + void export_BothEndpointsFail() { + GrpcSender mockPrimarySender = Mockito.mock(GrpcSender.class); + GrpcSender mockFallbackSender = Mockito.mock(GrpcSender.class); + Marshaler mockMarshaller = Mockito.mock(Marshaler.class); + + GrpcExporter exporter = + new GrpcExporter( + mockPrimarySender, + InternalTelemetryVersion.LATEST, + ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_GRPC_SPAN_EXPORTER), + () -> SdkMeterProvider.builder().build(), + URI.create("http://primary:4317"), + mockFallbackSender); + + doAnswer( + invoc -> { + Consumer onError = invoc.getArgument(2); + onError.accept(new IOException("Primary connection refused")); + return null; + }) + .when(mockPrimarySender) + .send(any(), any(), any()); + + doAnswer( + invoc -> { + Consumer onError = invoc.getArgument(2); + onError.accept(new IOException("Fallback connection refused")); + return null; + }) + .when(mockFallbackSender) + .send(any(), any(), any()); + + CompletableResultCode result = exporter.export(mockMarshaller, 10); + + assertThat(result.isSuccess()).isFalse(); + } } diff --git a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/http/HttpExporterTest.java b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/http/HttpExporterTest.java index c4d28be6586..ee93bb5173a 100644 --- a/exporters/common/src/test/java/io/opentelemetry/exporter/internal/http/HttpExporterTest.java +++ b/exporters/common/src/test/java/io/opentelemetry/exporter/internal/http/HttpExporterTest.java @@ -13,6 +13,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InternalTelemetryVersion; import io.opentelemetry.sdk.common.export.HttpResponse; import io.opentelemetry.sdk.common.export.HttpSender; @@ -213,6 +214,122 @@ void testInternalTelemetry(StandardComponentId.ExporterType exporterType) { } } + @Test + @SuppressLogger(HttpExporter.class) + void export_FallbackOnPrimaryTransportError() { + HttpSender mockPrimarySender = Mockito.mock(HttpSender.class); + HttpSender mockFallbackSender = Mockito.mock(HttpSender.class); + Marshaler mockMarshaller = Mockito.mock(Marshaler.class); + + HttpExporter exporter = + new HttpExporter( + ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_HTTP_SPAN_EXPORTER), + mockPrimarySender, + () -> SdkMeterProvider.builder().build(), + InternalTelemetryVersion.LATEST, + URI.create("http://primary:4318"), + false, + mockFallbackSender); + + // Primary sender fails with transport error, fallback succeeds + doAnswer( + invoc -> { + Consumer onError = invoc.getArgument(2); + onError.accept(new IOException("Connection refused")); + return null; + }) + .when(mockPrimarySender) + .send(any(), any(), any()); + + doAnswer( + invoc -> { + Consumer onResponse = invoc.getArgument(1); + onResponse.accept(new FakeHttpResponse(200, "Ok")); + return null; + }) + .when(mockFallbackSender) + .send(any(), any(), any()); + + exporter.export(mockMarshaller, 10); + + // Verify fallback was called + Mockito.verify(mockFallbackSender).send(any(), any(), any()); + } + + @Test + @SuppressLogger(HttpExporter.class) + void export_NoFallbackOnSuccess() { + HttpSender mockPrimarySender = Mockito.mock(HttpSender.class); + HttpSender mockFallbackSender = Mockito.mock(HttpSender.class); + Marshaler mockMarshaller = Mockito.mock(Marshaler.class); + + HttpExporter exporter = + new HttpExporter( + ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_HTTP_SPAN_EXPORTER), + mockPrimarySender, + () -> SdkMeterProvider.builder().build(), + InternalTelemetryVersion.LATEST, + URI.create("http://primary:4318"), + false, + mockFallbackSender); + + // Primary sender succeeds + doAnswer( + invoc -> { + Consumer onResponse = invoc.getArgument(1); + onResponse.accept(new FakeHttpResponse(200, "Ok")); + return null; + }) + .when(mockPrimarySender) + .send(any(), any(), any()); + + exporter.export(mockMarshaller, 10); + + // Verify fallback was NOT called + Mockito.verify(mockFallbackSender, Mockito.never()).send(any(), any(), any()); + } + + @Test + @SuppressLogger(HttpExporter.class) + void export_BothEndpointsFail() { + HttpSender mockPrimarySender = Mockito.mock(HttpSender.class); + HttpSender mockFallbackSender = Mockito.mock(HttpSender.class); + Marshaler mockMarshaller = Mockito.mock(Marshaler.class); + + HttpExporter exporter = + new HttpExporter( + ComponentId.generateLazy(StandardComponentId.ExporterType.OTLP_HTTP_SPAN_EXPORTER), + mockPrimarySender, + () -> SdkMeterProvider.builder().build(), + InternalTelemetryVersion.LATEST, + URI.create("http://primary:4318"), + false, + mockFallbackSender); + + // Both senders fail + doAnswer( + invoc -> { + Consumer onError = invoc.getArgument(2); + onError.accept(new IOException("Primary connection refused")); + return null; + }) + .when(mockPrimarySender) + .send(any(), any(), any()); + + doAnswer( + invoc -> { + Consumer onError = invoc.getArgument(2); + onError.accept(new IOException("Fallback connection refused")); + return null; + }) + .when(mockFallbackSender) + .send(any(), any(), any()); + + CompletableResultCode result = exporter.export(mockMarshaller, 10); + + assertThat(result.isSuccess()).isFalse(); + } + private static class FakeHttpResponse implements HttpResponse { final int statusCode; diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java index 8684a7680bc..9f507b7a33d 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/logs/OtlpHttpLogRecordExporterBuilder.java @@ -108,6 +108,18 @@ public OtlpHttpLogRecordExporterBuilder setEndpoint(String endpoint) { return this; } + /** + * Sets the fallback OTLP endpoint to connect to when the primary endpoint is unavailable. The + * endpoint must start with either http:// or https://, and include the full HTTP path. If the + * primary endpoint fails with a transport error (after retries), the exporter will attempt to + * send to this fallback endpoint. + */ + public OtlpHttpLogRecordExporterBuilder setFallbackEndpoint(String fallbackEndpoint) { + requireNonNull(fallbackEndpoint, "fallbackEndpoint"); + delegate.setFallbackEndpoint(fallbackEndpoint); + return this; + } + /** * Sets the method used to compress payloads. If unset, compression is disabled. Compression * method "gzip" and "none" are supported out of the box. Additional compression methods can be diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java index 1e531d36e3d..fc0bd80f584 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/metrics/OtlpHttpMetricExporterBuilder.java @@ -128,6 +128,18 @@ public OtlpHttpMetricExporterBuilder setEndpoint(String endpoint) { return this; } + /** + * Sets the fallback OTLP endpoint to connect to when the primary endpoint is unavailable. The + * endpoint must start with either http:// or https://, and include the full HTTP path. If the + * primary endpoint fails with a transport error (after retries), the exporter will attempt to + * send to this fallback endpoint. + */ + public OtlpHttpMetricExporterBuilder setFallbackEndpoint(String fallbackEndpoint) { + requireNonNull(fallbackEndpoint, "fallbackEndpoint"); + delegate.setFallbackEndpoint(fallbackEndpoint); + return this; + } + /** * Sets the method used to compress payloads. If unset, compression is disabled. Compression * method "gzip" and "none" are supported out of the box. Additional compression methods can be diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java index 6480929299b..98ed8157a32 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/http/trace/OtlpHttpSpanExporterBuilder.java @@ -108,6 +108,18 @@ public OtlpHttpSpanExporterBuilder setEndpoint(String endpoint) { return this; } + /** + * Sets the fallback OTLP endpoint to connect to when the primary endpoint is unavailable. The + * endpoint must start with either http:// or https://, and include the full HTTP path. If the + * primary endpoint fails with a transport error (after retries), the exporter will attempt to + * send to this fallback endpoint. + */ + public OtlpHttpSpanExporterBuilder setFallbackEndpoint(String fallbackEndpoint) { + requireNonNull(fallbackEndpoint, "fallbackEndpoint"); + delegate.setFallbackEndpoint(fallbackEndpoint); + return this; + } + /** * Sets the method used to compress payloads. If unset, compression is disabled. Compression * method "gzip" and "none" are supported out of the box. Additional compression methods can be diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtil.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtil.java index 6dc0555f5d4..d201cc848f8 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtil.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtil.java @@ -65,6 +65,38 @@ public static void configureOtlpExporterBuilder( Consumer setRetryPolicy, Consumer setMemoryMode, Consumer setInternalTelemetryVersion) { + configureOtlpExporterBuilder( + dataType, + config, + setComponentLoader, + setEndpoint, + addHeader, + setCompression, + setTimeout, + setTrustedCertificates, + setClientTls, + setRetryPolicy, + setMemoryMode, + setInternalTelemetryVersion, + null); + } + + /** Invoke the setters with the OTLP configuration for the {@code dataType}. */ + @SuppressWarnings("TooManyParameters") + public static void configureOtlpExporterBuilder( + String dataType, + ConfigProperties config, + Consumer setComponentLoader, + Consumer setEndpoint, + BiConsumer addHeader, + Consumer setCompression, + Consumer setTimeout, + Consumer setTrustedCertificates, + BiConsumer setClientTls, + Consumer setRetryPolicy, + Consumer setMemoryMode, + Consumer setInternalTelemetryVersion, + @Nullable Consumer setFallbackEndpoint) { setComponentLoader.accept(config.getComponentLoader()); String protocol = getOtlpProtocol(dataType, config); @@ -142,6 +174,34 @@ public static void configureOtlpExporterBuilder( setClientTls.accept(clientKeyBytes, clientKeyChainBytes); } + if (setFallbackEndpoint != null) { + boolean isFallbackHttpProtobuf = isHttpProtobuf; + URL fallbackEndpoint = + validateEndpoint( + config.getString("otel.exporter.otlp." + dataType + ".fallback.endpoint"), + isFallbackHttpProtobuf); + if (fallbackEndpoint != null) { + if (fallbackEndpoint.getPath().isEmpty()) { + fallbackEndpoint = createUrl(fallbackEndpoint, "/"); + } + } else { + fallbackEndpoint = + validateEndpoint( + config.getString("otel.exporter.otlp.fallback.endpoint"), isFallbackHttpProtobuf); + if (fallbackEndpoint != null && isFallbackHttpProtobuf) { + String fallbackPath = fallbackEndpoint.getPath(); + if (!fallbackPath.endsWith("/")) { + fallbackPath += "/"; + } + fallbackPath += signalPath(dataType); + fallbackEndpoint = createUrl(fallbackEndpoint, fallbackPath); + } + } + if (fallbackEndpoint != null) { + setFallbackEndpoint.accept(fallbackEndpoint.toString()); + } + } + Boolean retryDisabled = config.getBoolean("otel.java.exporter.otlp.retry.disabled"); if (retryDisabled != null && retryDisabled) { setRetryPolicy.accept(null); diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProvider.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProvider.java index 6a478db1984..e2a15b08707 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProvider.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpLogRecordExporterProvider.java @@ -54,7 +54,8 @@ public LogRecordExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy, builder::setMemoryMode, - builder::setInternalTelemetryVersion); + builder::setInternalTelemetryVersion, + builder::setFallbackEndpoint); builder.setMeterProvider(meterProviderRef::get); return builder.build(); @@ -73,7 +74,8 @@ public LogRecordExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy, builder::setMemoryMode, - builder::setInternalTelemetryVersion); + builder::setInternalTelemetryVersion, + builder::setFallbackEndpoint); builder.setMeterProvider(meterProviderRef::get); return builder.build(); diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpMetricExporterProvider.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpMetricExporterProvider.java index 35039182f23..71e758ff23c 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpMetricExporterProvider.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpMetricExporterProvider.java @@ -55,7 +55,8 @@ public MetricExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy, builder::setMemoryMode, - builder::setInternalTelemetryVersion); + builder::setInternalTelemetryVersion, + builder::setFallbackEndpoint); ExporterBuilderUtil.configureOtlpAggregationTemporality( config, builder::setAggregationTemporalitySelector); ExporterBuilderUtil.configureOtlpHistogramDefaultAggregation( @@ -78,7 +79,8 @@ public MetricExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy, builder::setMemoryMode, - builder::setInternalTelemetryVersion); + builder::setInternalTelemetryVersion, + builder::setFallbackEndpoint); ExporterBuilderUtil.configureOtlpAggregationTemporality( config, builder::setAggregationTemporalitySelector); ExporterBuilderUtil.configureOtlpHistogramDefaultAggregation( diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProvider.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProvider.java index f4467a2f601..923e91abf92 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProvider.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/internal/OtlpSpanExporterProvider.java @@ -53,7 +53,8 @@ public SpanExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy, builder::setMemoryMode, - builder::setInternalTelemetryVersion); + builder::setInternalTelemetryVersion, + builder::setFallbackEndpoint); builder.setMeterProvider(meterProviderRef::get); return builder.build(); @@ -72,7 +73,8 @@ public SpanExporter createExporter(ConfigProperties config) { builder::setClientTls, builder::setRetryPolicy, builder::setMemoryMode, - builder::setInternalTelemetryVersion); + builder::setInternalTelemetryVersion, + builder::setFallbackEndpoint); builder.setMeterProvider(meterProviderRef::get); return builder.build(); diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java index c115c05b91d..3014581619f 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/logs/OtlpGrpcLogRecordExporterBuilder.java @@ -137,6 +137,17 @@ public OtlpGrpcLogRecordExporterBuilder setEndpoint(String endpoint) { return this; } + /** + * Sets the fallback OTLP endpoint to connect to when the primary endpoint is unavailable. The + * endpoint must start with either http:// or https://. If the primary endpoint fails with a + * transport error (after retries), the exporter will attempt to send to this fallback endpoint. + */ + public OtlpGrpcLogRecordExporterBuilder setFallbackEndpoint(String fallbackEndpoint) { + requireNonNull(fallbackEndpoint, "fallbackEndpoint"); + delegate.setFallbackEndpoint(fallbackEndpoint); + return this; + } + /** * Sets the method used to compress payloads. If unset, compression is disabled. Compression * method "gzip" and "none" are supported out of the box. Additional compression methods can be diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java index 6b72ec3f36d..8fb6bce4986 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/metrics/OtlpGrpcMetricExporterBuilder.java @@ -156,6 +156,17 @@ public OtlpGrpcMetricExporterBuilder setEndpoint(String endpoint) { return this; } + /** + * Sets the fallback OTLP endpoint to connect to when the primary endpoint is unavailable. The + * endpoint must start with either http:// or https://. If the primary endpoint fails with a + * transport error (after retries), the exporter will attempt to send to this fallback endpoint. + */ + public OtlpGrpcMetricExporterBuilder setFallbackEndpoint(String fallbackEndpoint) { + requireNonNull(fallbackEndpoint, "fallbackEndpoint"); + delegate.setFallbackEndpoint(fallbackEndpoint); + return this; + } + /** * Sets the method used to compress payloads. If unset, compression is disabled. Compression * method "gzip" and "none" are supported out of the box. Additional compression methods can be diff --git a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java index d1180b404ef..0c5c5fd6d5b 100644 --- a/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java +++ b/exporters/otlp/all/src/main/java/io/opentelemetry/exporter/otlp/trace/OtlpGrpcSpanExporterBuilder.java @@ -134,6 +134,17 @@ public OtlpGrpcSpanExporterBuilder setEndpoint(String endpoint) { return this; } + /** + * Sets the fallback OTLP endpoint to connect to when the primary endpoint is unavailable. The + * endpoint must start with either http:// or https://. If the primary endpoint fails with a + * transport error (after retries), the exporter will attempt to send to this fallback endpoint. + */ + public OtlpGrpcSpanExporterBuilder setFallbackEndpoint(String fallbackEndpoint) { + requireNonNull(fallbackEndpoint, "fallbackEndpoint"); + delegate.setFallbackEndpoint(fallbackEndpoint); + return this; + } + /** * Sets the method used to compress payloads. If unset, compression is disabled. Compression * method "gzip" and "none" are supported out of the box. Additional compression methods can be diff --git a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtilTest.java b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtilTest.java index 420c2f57e1e..81abb269aa9 100644 --- a/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtilTest.java +++ b/exporters/otlp/all/src/test/java/io/opentelemetry/exporter/otlp/internal/OtlpConfigUtilTest.java @@ -385,6 +385,93 @@ private static String configureEndpoint(String dataType, Map pro return endpoint.get(); } + @Test + void configureOtlpExporterBuilder_FallbackEndpoint() { + // No fallback endpoint configured + assertThat( + configureFallbackEndpoint( + DATA_TYPE_TRACES, ImmutableMap.of("otel.exporter.otlp.protocol", "http/protobuf"))) + .isEmpty(); + + // Generic fallback endpoint + assertThat( + configureFallbackEndpoint( + DATA_TYPE_TRACES, + ImmutableMap.of( + "otel.exporter.otlp.protocol", + "http/protobuf", + "otel.exporter.otlp.fallback.endpoint", + "http://fallback:4318"))) + .isEqualTo("http://fallback:4318/v1/traces"); + + // Signal-specific fallback endpoint takes precedence + assertThat( + configureFallbackEndpoint( + DATA_TYPE_TRACES, + ImmutableMap.of( + "otel.exporter.otlp.protocol", + "http/protobuf", + "otel.exporter.otlp.fallback.endpoint", + "http://generic-fallback:4318", + "otel.exporter.otlp.traces.fallback.endpoint", + "http://traces-fallback:4318/v1/traces"))) + .isEqualTo("http://traces-fallback:4318/v1/traces"); + + // Fallback for metrics + assertThat( + configureFallbackEndpoint( + DATA_TYPE_METRICS, + ImmutableMap.of( + "otel.exporter.otlp.protocol", + "http/protobuf", + "otel.exporter.otlp.fallback.endpoint", + "http://fallback:4318"))) + .isEqualTo("http://fallback:4318/v1/metrics"); + + // Fallback for logs + assertThat( + configureFallbackEndpoint( + DATA_TYPE_LOGS, + ImmutableMap.of( + "otel.exporter.otlp.protocol", + "http/protobuf", + "otel.exporter.otlp.fallback.endpoint", + "http://fallback:4318"))) + .isEqualTo("http://fallback:4318/v1/logs"); + + // gRPC fallback endpoint + assertThat( + configureFallbackEndpoint( + DATA_TYPE_TRACES, + ImmutableMap.of( + "otel.exporter.otlp.protocol", + "grpc", + "otel.exporter.otlp.fallback.endpoint", + "http://fallback:4317"))) + .isEqualTo("http://fallback:4317"); + } + + private static String configureFallbackEndpoint(String dataType, Map properties) { + AtomicReference fallbackEndpoint = new AtomicReference<>(""); + + OtlpConfigUtil.configureOtlpExporterBuilder( + dataType, + DefaultConfigProperties.createFromMap(properties), + value -> {}, + value -> {}, + (value1, value2) -> {}, + value -> {}, + value -> {}, + value -> {}, + (value1, value2) -> {}, + value -> {}, + value -> {}, + value -> {}, + fallbackEndpoint::set); + + return fallbackEndpoint.get(); + } + @Test void configureOtlpAggregationTemporality() { assertThatThrownBy(