From 23ac0b3b8869e81bf43a94867ca381f7a618c6b7 Mon Sep 17 00:00:00 2001 From: Eunbin Son Date: Sat, 20 Jun 2026 15:05:48 +0900 Subject: [PATCH] Standardize OkHttpHttpSender shutdown to await executor termination --- .../okhttp/internal/OkHttpHttpSender.java | 34 +++- .../okhttp/internal/OkHttpHttpSenderTest.java | 163 ++++++++++++++++++ 2 files changed, 195 insertions(+), 2 deletions(-) diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 19026e7691f..b5c611d1bcb 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; @@ -209,10 +210,39 @@ private void handleResponse( @Override public CompletableResultCode shutdown() { client.dispatcher().cancelAll(); + client.connectionPool().evictAll(); + if (managedExecutor) { - client.dispatcher().executorService().shutdownNow(); + ExecutorService executorService = client.dispatcher().executorService(); + // Use shutdownNow() to interrupt idle threads immediately since we've cancelled all work + executorService.shutdownNow(); + + // Wait for threads to terminate in a background thread + CompletableResultCode result = new CompletableResultCode(); + Thread terminationThread = + new Thread( + () -> { + try { + // Wait up to 5 seconds for threads to terminate + // Even if timeout occurs, we succeed since these are daemon threads + boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS); + if (!terminated) { + logger.log( + Level.WARNING, + "Executor did not terminate within 5 seconds, proceeding with shutdown since threads are daemon threads."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + result.succeed(); + } + }, + "okhttp-shutdown"); + terminationThread.setDaemon(true); + terminationThread.start(); + return result; } - client.connectionPool().evictAll(); + return CompletableResultCode.ofSuccess(); } diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java index a74695df9af..0edc216aee6 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderTest.java @@ -6,17 +6,25 @@ package io.opentelemetry.exporter.sender.okhttp.internal; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.HttpResponse; import io.opentelemetry.sdk.common.export.MessageWriter; import java.io.OutputStream; +import java.net.ServerSocket; import java.net.URI; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import org.junit.jupiter.api.Test; class OkHttpHttpSenderTest { @@ -51,6 +59,161 @@ void send_rejectedExecution_callsOnError() { assertThat(responseRef.get()).isNull(); } + @Test + void shutdown_CompletableResultCodeShouldWaitForThreads() throws Exception { + // This test verifies that shutdown() returns a CompletableResultCode that only + // completes AFTER threads terminate, not immediately. + + // Allocate an ephemeral port and immediately close it to get a port with nothing listening + int port; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } + + OkHttpHttpSender sender = + newSender("http://localhost:" + port, null); // null executor = managed + + CompletableResultCode sendResult = new CompletableResultCode(); + sender.send( + new NoOpRequestBodyWriter(), response -> sendResult.succeed(), error -> sendResult.fail()); + + // Give threads time to start + Thread.sleep(500); + + CompletableResultCode shutdownResult = sender.shutdown(); + + // The CompletableResultCode should NOT be done() immediately because we need to wait for + // threads to terminate. + assertFalse( + shutdownResult.isDone(), + "CompletableResultCode should not be done immediately - it should wait for thread termination"); + + shutdownResult.join(10, TimeUnit.SECONDS); + assertTrue(shutdownResult.isDone(), "CompletableResultCode should be done after waiting"); + assertTrue(shutdownResult.isSuccess(), "Shutdown should complete successfully"); + } + + @Test + void shutdown_NonManagedExecutor_ReturnsImmediately() { + // This test verifies that when using a non-managed executor (custom ExecutorService), + // shutdown() returns an already-completed CompletableResultCode immediately. + + ExecutorService customExecutor = Executors.newSingleThreadExecutor(); + + try { + OkHttpHttpSender sender = newSender("http://localhost:8080", customExecutor); + + CompletableResultCode shutdownResult = sender.shutdown(); + + assertTrue( + shutdownResult.isDone(), + "CompletableResultCode should be done immediately for non-managed executor"); + assertTrue(shutdownResult.isSuccess(), "Shutdown should complete successfully"); + } finally { + customExecutor.shutdownNow(); + } + } + + @Test + void shutdown_ExecutorDoesNotTerminateInTime_LogsWarningButSucceeds() throws Exception { + // This test verifies that when threads don't terminate within 5 seconds, a warning is logged + // but shutdown still succeeds. + + int port; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } + + OkHttpHttpSender sender = + newSender("http://localhost:" + port, null); // null executor = managed + + // Start multiple requests with callbacks that block longer than the 5-second timeout + CountDownLatch blockCallbacks = new CountDownLatch(1); + for (int i = 0; i < 3; i++) { + sender.send( + new NoOpRequestBodyWriter(), + response -> { + try { + blockCallbacks.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, + error -> { + try { + blockCallbacks.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + Thread.sleep(500); + + CompletableResultCode shutdownResult = sender.shutdown(); + + assertTrue( + shutdownResult.join(10, TimeUnit.SECONDS).isSuccess(), + "Shutdown should succeed even when threads don't terminate quickly"); + + blockCallbacks.countDown(); + } + + @Test + void shutdown_InterruptedWhileWaiting_StillSucceeds() throws Exception { + // This test verifies that if the shutdown thread is interrupted while waiting for termination, + // it still marks the shutdown as successful. + + int port; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } + + OkHttpHttpSender sender = + newSender("http://localhost:" + port, null); // null executor = managed + + sender.send(new NoOpRequestBodyWriter(), response -> {}, error -> {}); + + Thread.sleep(500); + + CompletableResultCode shutdownResult = sender.shutdown(); + + // Give the shutdown thread a moment to start + Thread.sleep(100); + + // Find and interrupt the okhttp-shutdown thread to trigger the InterruptedException path + Thread[] threads = new Thread[Thread.activeCount() + 10]; + int count = Thread.enumerate(threads); + for (int i = 0; i < count; i++) { + Thread thread = threads[i]; + if (thread != null && thread.getName().equals("okhttp-shutdown")) { + thread.interrupt(); + break; + } + } + + assertTrue( + shutdownResult.join(10, TimeUnit.SECONDS).isSuccess(), + "Shutdown should succeed even when interrupted"); + } + + private static OkHttpHttpSender newSender( + String endpoint, @Nullable ExecutorService executorService) { + return new OkHttpHttpSender( + URI.create(endpoint), + "text/plain", + null, + Duration.ofSeconds(10), + Duration.ofSeconds(10), + Collections::emptyMap, + null, + null, + null, + null, + executorService, + Long.MAX_VALUE); + } + private static class NoOpRequestBodyWriter implements MessageWriter { @Override public void writeMessage(OutputStream output) {}