Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
Comparing source compatibility of opentelemetry-exporter-otlp-1.61.0-SNAPSHOT.jar against opentelemetry-exporter-otlp-1.60.1.jar
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogRecordExporterBuilder setFallbackEndpoint(java.lang.String)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder setFallbackEndpoint(java.lang.String)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder setFallbackEndpoint(java.lang.String)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setFallbackEndpoint(java.lang.String)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setFallbackEndpoint(java.lang.String)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setFallbackEndpoint(java.lang.String)
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import io.opentelemetry.sdk.common.internal.StandardComponentId;
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Generic gRPC exporter.
Expand All @@ -41,6 +43,7 @@ public final class GrpcExporter {

private final String type;
private final GrpcSender grpcSender;
@Nullable private final GrpcSender fallbackGrpcSender;
private final ExporterInstrumentation exporterMetrics;

public GrpcExporter(
Expand All @@ -49,8 +52,19 @@ public GrpcExporter(
StandardComponentId componentId,
Supplier<MeterProvider> meterProviderSupplier,
URI endpoint) {
this(grpcSender, internalTelemetryVersion, componentId, meterProviderSupplier, endpoint, null);
}

public GrpcExporter(
GrpcSender grpcSender,
InternalTelemetryVersion internalTelemetryVersion,
StandardComponentId componentId,
Supplier<MeterProvider> meterProviderSupplier,
URI endpoint,
@Nullable GrpcSender fallbackGrpcSender) {
this.type = componentId.getStandardType().signal().logFriendlyName();
this.grpcSender = grpcSender;
this.fallbackGrpcSender = fallbackGrpcSender;
this.exporterMetrics =
new ExporterInstrumentation(
internalTelemetryVersion, meterProviderSupplier, componentId, endpoint);
Expand All @@ -69,7 +83,22 @@ public CompletableResultCode export(Marshaler exportRequest, int numItems) {
grpcSender.send(
exportRequest.toBinaryMessageWriter(),
grpcResponse -> onResponse(result, metricRecording, grpcResponse),
throwable -> onError(result, metricRecording, throwable));
throwable -> {
if (fallbackGrpcSender != null) {
logger.log(
Level.INFO,
"Primary endpoint failed for "
+ type
+ "s, attempting fallback endpoint. Error: "
+ throwable.getMessage());
fallbackGrpcSender.send(
exportRequest.toBinaryMessageWriter(),
grpcResponse -> onResponse(result, metricRecording, grpcResponse),
fallbackThrowable -> onError(result, metricRecording, fallbackThrowable));
} else {
onError(result, metricRecording, throwable);
}
});

return result;
}
Expand Down Expand Up @@ -143,6 +172,11 @@ public CompletableResultCode shutdown() {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return grpcSender.shutdown();
CompletableResultCode primaryResult = grpcSender.shutdown();
if (fallbackGrpcSender != null) {
CompletableResultCode fallbackResult = fallbackGrpcSender.shutdown();
return CompletableResultCode.ofAll(Arrays.asList(primaryResult, fallbackResult));
}
return primaryResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class GrpcExporterBuilder {
private Duration timeout;
private Duration connectTimeout = Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_SECS);
private URI endpoint;
@Nullable private URI fallbackEndpoint;
@Nullable private Compressor compressor;
private final Map<String, String> constantHeaders = new HashMap<>();
private Supplier<Map<String, String>> headerSupplier = Collections::emptyMap;
Expand Down Expand Up @@ -104,6 +105,11 @@ public GrpcExporterBuilder setEndpoint(String endpoint) {
return this;
}

public GrpcExporterBuilder setFallbackEndpoint(String fallbackEndpoint) {
this.fallbackEndpoint = ExporterBuilderUtil.validateEndpoint(fallbackEndpoint);
return this;
}

public GrpcExporterBuilder setCompression(@Nullable Compressor compressor) {
this.compressor = compressor;
return this;
Expand Down Expand Up @@ -190,6 +196,7 @@ public GrpcExporterBuilder copy() {
copy.internalTelemetryVersion = internalTelemetryVersion;
copy.grpcChannel = grpcChannel;
copy.componentLoader = componentLoader;
copy.fallbackEndpoint = fallbackEndpoint;
return copy;
}

Expand Down Expand Up @@ -233,12 +240,33 @@ public GrpcExporter build() {
grpcChannel));
LOGGER.log(Level.FINE, "Using GrpcSender: " + grpcSender.getClass().getName());

GrpcSender fallbackSender = null;
if (fallbackEndpoint != null) {
boolean isFallbackPlainHttp = "http".equals(fallbackEndpoint.getScheme());
fallbackSender =
grpcSenderProvider.createSender(
ImmutableGrpcSenderConfig.create(
fallbackEndpoint,
fullMethodName,
compressor,
timeout,
connectTimeout,
headerSupplier,
retryPolicy,
isFallbackPlainHttp ? null : tlsConfigHelper.getSslContext(),
isFallbackPlainHttp ? null : tlsConfigHelper.getTrustManager(),
executorService,
grpcChannel));
LOGGER.log(Level.FINE, "Using fallback GrpcSender: " + fallbackSender.getClass().getName());
}

return new GrpcExporter(
grpcSender,
internalTelemetryVersion,
ComponentId.generateLazy(exporterType),
meterProviderSupplier,
endpoint);
endpoint,
fallbackSender);
}

public String toString(boolean includePrefixAndSuffix) {
Expand All @@ -247,6 +275,9 @@ public String toString(boolean includePrefixAndSuffix) {
? new StringJoiner(", ", "GrpcExporterBuilder{", "}")
: new StringJoiner(", ");
joiner.add("endpoint=" + endpoint.toString());
if (fallbackEndpoint != null) {
joiner.add("fallbackEndpoint=" + fallbackEndpoint);
}
joiner.add("fullMethodName=" + fullMethodName);
joiner.add("timeoutNanos=" + timeout.toNanos());
joiner.add("connectTimeoutNanos=" + connectTimeout.toNanos());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.opentelemetry.sdk.common.internal.ThrottlingLogger;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Level;
Expand All @@ -41,6 +42,7 @@ public final class HttpExporter {

private final String type;
private final HttpSender httpSender;
@Nullable private final HttpSender fallbackHttpSender;
private final ExporterInstrumentation exporterMetrics;
private final boolean exportAsJson;

Expand All @@ -51,8 +53,27 @@ public HttpExporter(
InternalTelemetryVersion internalTelemetryVersion,
URI endpoint,
boolean exportAsJson) {
this(
componentId,
httpSender,
meterProviderSupplier,
internalTelemetryVersion,
endpoint,
exportAsJson,
null);
}

public HttpExporter(
StandardComponentId componentId,
HttpSender httpSender,
Supplier<MeterProvider> meterProviderSupplier,
InternalTelemetryVersion internalTelemetryVersion,
URI endpoint,
boolean exportAsJson,
@Nullable HttpSender fallbackHttpSender) {
this.type = componentId.getStandardType().signal().logFriendlyName();
this.httpSender = httpSender;
this.fallbackHttpSender = fallbackHttpSender;
this.exporterMetrics =
new ExporterInstrumentation(
internalTelemetryVersion, meterProviderSupplier, componentId, endpoint);
Expand All @@ -74,7 +95,26 @@ public CompletableResultCode export(Marshaler exportRequest, int numItems) {
httpSender.send(
messageWriter,
httpResponse -> onResponse(result, metricRecording, httpResponse),
throwable -> onError(result, metricRecording, throwable));
throwable -> {
if (fallbackHttpSender != null) {
logger.log(
Level.INFO,
"Primary endpoint failed for "
+ type
+ "s, attempting fallback endpoint. Error: "
+ throwable.getMessage());
MessageWriter fallbackMessageWriter =
exportAsJson
? exportRequest.toJsonMessageWriter()
: exportRequest.toBinaryMessageWriter();
fallbackHttpSender.send(
fallbackMessageWriter,
httpResponse -> onResponse(result, metricRecording, httpResponse),
fallbackThrowable -> onError(result, metricRecording, fallbackThrowable));
} else {
onError(result, metricRecording, throwable);
}
});

return result;
}
Expand Down Expand Up @@ -131,7 +171,12 @@ public CompletableResultCode shutdown() {
logger.log(Level.INFO, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return httpSender.shutdown();
CompletableResultCode primaryResult = httpSender.shutdown();
if (fallbackHttpSender != null) {
CompletableResultCode fallbackResult = fallbackHttpSender.shutdown();
return CompletableResultCode.ofAll(Arrays.asList(primaryResult, fallbackResult));
}
return primaryResult;
}

private static String extractErrorStatus(String statusMessage, @Nullable byte[] responseBody) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class HttpExporterBuilder {
private StandardComponentId.ExporterType exporterType;

private URI endpoint;
@Nullable private URI fallbackEndpoint;

private Duration timeout = Duration.ofSeconds(DEFAULT_TIMEOUT_SECS);
@Nullable private Compressor compressor;
Expand Down Expand Up @@ -96,6 +97,11 @@ public HttpExporterBuilder setEndpoint(String endpoint) {
return this;
}

public HttpExporterBuilder setFallbackEndpoint(String fallbackEndpoint) {
this.fallbackEndpoint = ExporterBuilderUtil.validateEndpoint(fallbackEndpoint);
return this;
}

public HttpExporterBuilder setCompression(@Nullable Compressor compressor) {
this.compressor = compressor;
return this;
Expand Down Expand Up @@ -205,6 +211,7 @@ public HttpExporterBuilder copy() {
copy.internalTelemetryVersion = internalTelemetryVersion;
copy.proxyOptions = proxyOptions;
copy.componentLoader = componentLoader;
copy.fallbackEndpoint = fallbackEndpoint;
return copy;
}

Expand All @@ -231,12 +238,13 @@ public HttpExporter build() {
};

boolean isPlainHttp = endpoint.getScheme().equals("http");
String contentType = exportAsJson ? "application/json" : "application/x-protobuf";
HttpSenderProvider httpSenderProvider = SenderUtil.resolveHttpSenderProvider(componentLoader);
HttpSender httpSender =
httpSenderProvider.createSender(
ImmutableHttpSenderConfig.create(
endpoint,
exportAsJson ? "application/json" : "application/x-protobuf",
contentType,
compressor,
timeout,
connectTimeout,
Expand All @@ -248,13 +256,34 @@ public HttpExporter build() {
executorService));
LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName());

HttpSender fallbackSender = null;
if (fallbackEndpoint != null) {
boolean isFallbackPlainHttp = fallbackEndpoint.getScheme().equals("http");
fallbackSender =
httpSenderProvider.createSender(
ImmutableHttpSenderConfig.create(
fallbackEndpoint,
contentType,
compressor,
timeout,
connectTimeout,
headerSupplier,
proxyOptions,
retryPolicy,
isFallbackPlainHttp ? null : tlsConfigHelper.getSslContext(),
isFallbackPlainHttp ? null : tlsConfigHelper.getTrustManager(),
executorService));
LOGGER.log(Level.FINE, "Using fallback HttpSender: " + fallbackSender.getClass().getName());
}

return new HttpExporter(
ComponentId.generateLazy(exporterType),
httpSender,
meterProviderSupplier,
internalTelemetryVersion,
endpoint,
exportAsJson);
exportAsJson,
fallbackSender);
}

public String toString(boolean includePrefixAndSuffix) {
Expand All @@ -263,6 +292,9 @@ public String toString(boolean includePrefixAndSuffix) {
? new StringJoiner(", ", "HttpExporterBuilder{", "}")
: new StringJoiner(", ");
joiner.add("endpoint=" + endpoint);
if (fallbackEndpoint != null) {
joiner.add("fallbackEndpoint=" + fallbackEndpoint);
}
joiner.add("timeoutNanos=" + timeout.toNanos());
joiner.add("proxyOptions=" + proxyOptions);
joiner.add(
Expand Down
Loading
Loading