Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
Expand Down Expand Up @@ -209,10 +210,39 @@ private void handleResponse(
@Override
public CompletableResultCode shutdown() {
client.dispatcher().cancelAll();
client.connectionPool().evictAll();

if (managedExecutor) {
client.dispatcher().executorService().shutdownNow();
ExecutorService executorService = client.dispatcher().executorService();
// Use shutdownNow() to interrupt idle threads immediately since we've cancelled all work
executorService.shutdownNow();

// Wait for threads to terminate in a background thread
CompletableResultCode result = new CompletableResultCode();
Thread terminationThread =
new Thread(
() -> {
try {
// Wait up to 5 seconds for threads to terminate
// Even if timeout occurs, we succeed since these are daemon threads
boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS);
if (!terminated) {
logger.log(
Level.WARNING,
"Executor did not terminate within 5 seconds, proceeding with shutdown since threads are daemon threads.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
result.succeed();
}
},
"okhttp-shutdown");
terminationThread.setDaemon(true);
terminationThread.start();
return result;
}
client.connectionPool().evictAll();

return CompletableResultCode.ofSuccess();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,25 @@
package io.opentelemetry.exporter.sender.okhttp.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.HttpResponse;
import io.opentelemetry.sdk.common.export.MessageWriter;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.junit.jupiter.api.Test;

class OkHttpHttpSenderTest {
Expand Down Expand Up @@ -51,6 +59,161 @@ void send_rejectedExecution_callsOnError() {
assertThat(responseRef.get()).isNull();
}

@Test
void shutdown_CompletableResultCodeShouldWaitForThreads() throws Exception {
// This test verifies that shutdown() returns a CompletableResultCode that only
// completes AFTER threads terminate, not immediately.

// Allocate an ephemeral port and immediately close it to get a port with nothing listening
int port;
try (ServerSocket socket = new ServerSocket(0)) {
port = socket.getLocalPort();
}

OkHttpHttpSender sender =
newSender("http://localhost:" + port, null); // null executor = managed

CompletableResultCode sendResult = new CompletableResultCode();
sender.send(
new NoOpRequestBodyWriter(), response -> sendResult.succeed(), error -> sendResult.fail());

// Give threads time to start
Thread.sleep(500);

CompletableResultCode shutdownResult = sender.shutdown();

// The CompletableResultCode should NOT be done() immediately because we need to wait for
// threads to terminate.
assertFalse(
shutdownResult.isDone(),
"CompletableResultCode should not be done immediately - it should wait for thread termination");

shutdownResult.join(10, TimeUnit.SECONDS);
assertTrue(shutdownResult.isDone(), "CompletableResultCode should be done after waiting");
assertTrue(shutdownResult.isSuccess(), "Shutdown should complete successfully");
}

@Test
void shutdown_NonManagedExecutor_ReturnsImmediately() {
// This test verifies that when using a non-managed executor (custom ExecutorService),
// shutdown() returns an already-completed CompletableResultCode immediately.

ExecutorService customExecutor = Executors.newSingleThreadExecutor();

try {
OkHttpHttpSender sender = newSender("http://localhost:8080", customExecutor);

CompletableResultCode shutdownResult = sender.shutdown();

assertTrue(
shutdownResult.isDone(),
"CompletableResultCode should be done immediately for non-managed executor");
assertTrue(shutdownResult.isSuccess(), "Shutdown should complete successfully");
} finally {
customExecutor.shutdownNow();
}
}

@Test
void shutdown_ExecutorDoesNotTerminateInTime_LogsWarningButSucceeds() throws Exception {
// This test verifies that when threads don't terminate within 5 seconds, a warning is logged
// but shutdown still succeeds.

int port;
try (ServerSocket socket = new ServerSocket(0)) {
port = socket.getLocalPort();
}

OkHttpHttpSender sender =
newSender("http://localhost:" + port, null); // null executor = managed

// Start multiple requests with callbacks that block longer than the 5-second timeout
CountDownLatch blockCallbacks = new CountDownLatch(1);
for (int i = 0; i < 3; i++) {
sender.send(
new NoOpRequestBodyWriter(),
response -> {
try {
blockCallbacks.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
},
error -> {
try {
blockCallbacks.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}

Thread.sleep(500);

CompletableResultCode shutdownResult = sender.shutdown();

assertTrue(
shutdownResult.join(10, TimeUnit.SECONDS).isSuccess(),
"Shutdown should succeed even when threads don't terminate quickly");

blockCallbacks.countDown();
}

@Test
void shutdown_InterruptedWhileWaiting_StillSucceeds() throws Exception {
// This test verifies that if the shutdown thread is interrupted while waiting for termination,
// it still marks the shutdown as successful.

int port;
try (ServerSocket socket = new ServerSocket(0)) {
port = socket.getLocalPort();
}

OkHttpHttpSender sender =
newSender("http://localhost:" + port, null); // null executor = managed

sender.send(new NoOpRequestBodyWriter(), response -> {}, error -> {});

Thread.sleep(500);

CompletableResultCode shutdownResult = sender.shutdown();

// Give the shutdown thread a moment to start
Thread.sleep(100);

// Find and interrupt the okhttp-shutdown thread to trigger the InterruptedException path
Thread[] threads = new Thread[Thread.activeCount() + 10];
int count = Thread.enumerate(threads);
for (int i = 0; i < count; i++) {
Thread thread = threads[i];
if (thread != null && thread.getName().equals("okhttp-shutdown")) {
thread.interrupt();
break;
}
}

assertTrue(
shutdownResult.join(10, TimeUnit.SECONDS).isSuccess(),
"Shutdown should succeed even when interrupted");
}

private static OkHttpHttpSender newSender(
String endpoint, @Nullable ExecutorService executorService) {
return new OkHttpHttpSender(
URI.create(endpoint),
"text/plain",
null,
Duration.ofSeconds(10),
Duration.ofSeconds(10),
Collections::emptyMap,
null,
null,
null,
null,
executorService,
Long.MAX_VALUE);
}

private static class NoOpRequestBodyWriter implements MessageWriter {
@Override
public void writeMessage(OutputStream output) {}
Expand Down
Loading