diff --git a/.github/workflows/publish-docker.yaml b/.github/workflows/publish-docker.yaml index de7889321983..3e2a3c757dc1 100644 --- a/.github/workflows/publish-docker.yaml +++ b/.github/workflows/publish-docker.yaml @@ -75,6 +75,11 @@ jobs: uses: docker/setup-qemu-action@v3 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 + - name: Build and push docker images based on Java 11 + env: + SW_OAP_BASE_IMAGE: eclipse-temurin:11-jre + TAG: ${{ env.TAG }}-java11 + run: make build.all docker.push - name: Build and push docker images based on Java 17 env: SW_OAP_BASE_IMAGE: eclipse-temurin:17-jre @@ -85,11 +90,6 @@ jobs: SW_OAP_BASE_IMAGE: eclipse-temurin:21-jre TAG: ${{ env.TAG }}-java21 run: make build.all docker.push - - name: Build and push docker images based on Java 25 - env: - SW_OAP_BASE_IMAGE: eclipse-temurin:25-jre - TAG: ${{ env.TAG }}-java25 - run: make build.all docker.push - name: Build and push docker images run: make build.all docker.push - name: Build and push data-generator image diff --git a/docker/oap/Dockerfile b/docker/oap/Dockerfile index b4072c746d76..5e346b80232d 100644 --- a/docker/oap/Dockerfile +++ b/docker/oap/Dockerfile @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -ARG BASE_IMAGE='eclipse-temurin:11-jre' +ARG BASE_IMAGE='eclipse-temurin:25-jre' FROM $BASE_IMAGE diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index ec5d97fd4cad..9cdeebd7fff6 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -12,6 +12,17 @@ * Add `CLAUDE.md` as AI assistant guide for the project. * Upgrade Groovy to 5.0.3 in OAP backend. * Bump up nodejs to v24.13.0 for the latest UI(booster-ui) compiling. +* Add virtual thread support (JDK 25+) for gRPC and Armeria HTTP server handler threads. + Set `SW_VIRTUAL_THREADS_ENABLED=false` to disable. + + | Pool | Threads (JDK < 25) | Threads (JDK 25+) | + |---|---|---| + | gRPC server handler (`core-grpc`, `receiver-grpc`, `als-grpc`, `ebpf-grpc`) | Cached platform (unbounded) | Virtual threads | + | HTTP blocking (`core-http`, `receiver-http`, `promql-http`, `logql-http`, `zipkin-query-http`, `zipkin-http`, `firehose-http`) | Cached platform (max 200) | Virtual threads | + | VT carrier threads (ForkJoinPool) | N/A | ~9 shared | + + On JDK 25+, all 11 thread pools above share ~9 carrier threads instead of up to 1,400+ platform threads. +* Change default Docker base image to JDK 25 (`eclipse-temurin:25-jre`). JDK 11 kept as `-java11` variant. #### OAP Server diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index 93fd86460ffd..c1f512f468e1 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -542,6 +542,16 @@ OAP will query the data from the "hot and warm" stage by default if the "warm" s | property | - | - | The group settings of property, such as UI and profiling. | - | - | | - | shardNum | - | Shards Number for property group. | SW_STORAGE_BANYANDB_PROPERTY_SHARD_NUM | 1 | | - | replicas | - | Replicas for property group. |SW_STORAGE_BANYANDB_PROPERTY_REPLICAS | 0 | + +## Standalone Environment Variables +The following environment variables are **not** backed by `application.yml`. They are read directly from the +process environment and take effect across all modules. + +| Environment Variable | Value(s) and Explanation | Default | +|-----------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| +| SW_OAL_ENGINE_DEBUG | Set to any non-empty value to dump OAL-generated `.class` files to disk (under the `oal-rt/` directory relative to the OAP working path). Useful for debugging code generation issues. Leave unset in production. | (not set, no files written) | +| SW_VIRTUAL_THREADS_ENABLED | Set to `false` to disable virtual threads on JDK 25+. On JDK 25+, gRPC server handler threads and HTTP blocking task executors are virtual threads by default. Set this variable to `false` to force traditional platform thread pools. Ignored on JDK versions below 25. | (not set, virtual threads enabled on JDK 25+) | + ## Note ¹ System Environment Variable name could be declared and changed in `application.yml/bydb.yaml`. The names listed here are simply diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index d55dc78853ee..337208602d3d 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -249,6 +249,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { if (moduleConfig.getGRPCThreadPoolSize() > 0) { grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize()); } + grpcServer.setThreadPoolName("core-grpc"); grpcServer.initialize(); HTTPServerConfig httpServerConfig = HTTPServerConfig.builder() @@ -264,6 +265,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { setBootingParameter("oap.external.http.host", moduleConfig.getRestHost()); setBootingParameter("oap.external.http.port", moduleConfig.getRestPort()); httpServer = new HTTPServer(httpServerConfig); + httpServer.setBlockingTaskName("core-http"); httpServer.initialize(); this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig, this)); diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java index 068597b166d6..3c55cbeb0cc2 100644 --- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java @@ -38,6 +38,7 @@ import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.grpc.ssl.DynamicSslContext; import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory; +import org.apache.skywalking.oap.server.library.util.VirtualThreads; @Slf4j public class GRPCServer implements Server { @@ -53,6 +54,7 @@ public class GRPCServer implements Server { private String trustedCAsFile; private DynamicSslContext sslContext; private int threadPoolSize; + private String threadPoolName = "grpcServerPool"; private static final Marker SERVER_START_MARKER = MarkerFactory.getMarker("Console"); public GRPCServer(String host, int port) { @@ -72,6 +74,10 @@ public void setThreadPoolSize(int threadPoolSize) { this.threadPoolSize = threadPoolSize; } + public void setThreadPoolName(String threadPoolName) { + this.threadPoolName = threadPoolName; + } + /** * Require for `server.crt` and `server.pem` for open ssl at server side. * @@ -96,11 +102,21 @@ public void initialize() { if (maxMessageSize > 0) { nettyServerBuilder.maxInboundMessageSize(maxMessageSize); } - if (threadPoolSize > 0) { - ExecutorService executor = new ThreadPoolExecutor( - threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), - new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler() - ); + final ExecutorService executor = VirtualThreads.createExecutor( + threadPoolName, + () -> { + if (threadPoolSize > 0) { + return new ThreadPoolExecutor( + threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new CustomThreadFactory(threadPoolName), + new CustomRejectedExecutionHandler() + ); + } + return null; + } + ); + if (executor != null) { nettyServerBuilder.executor(executor); } diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServer.java index 0ab8b8925f54..1a352818fa27 100644 --- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServer.java +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServer.java @@ -35,10 +35,12 @@ import java.time.Duration; import java.util.List; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.library.server.Server; import org.apache.skywalking.oap.server.library.server.ssl.PrivateKeyUtil; +import org.apache.skywalking.oap.server.library.util.VirtualThreads; import static java.util.Objects.requireNonNull; @@ -48,11 +50,16 @@ public class HTTPServer implements Server { protected ServerBuilder sb; // Health check service, supports HEAD, GET method. protected final Set allowedMethods = Sets.newHashSet(HttpMethod.HEAD); + private String blockingTaskName = "http-blocking"; public HTTPServer(HTTPServerConfig config) { this.config = config; } + public void setBlockingTaskName(final String blockingTaskName) { + this.blockingTaskName = blockingTaskName; + } + @Override public void initialize() { sb = com.linecorp.armeria.server.Server @@ -93,6 +100,14 @@ public void initialize() { sb.absoluteUriTransformer(this::transformAbsoluteURI); } + if (VirtualThreads.isSupported()) { + final ScheduledExecutorService blockingExecutor = VirtualThreads.createScheduledExecutor( + blockingTaskName, () -> null); + if (blockingExecutor != null) { + sb.blockingTaskExecutor(blockingExecutor, true); + } + } + log.info("Server root context path: {}", config.getContextPath()); } diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreadScheduledExecutor.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreadScheduledExecutor.java new file mode 100644 index 000000000000..6392283cb927 --- /dev/null +++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreadScheduledExecutor.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.extern.slf4j.Slf4j; + +/** + * A {@link ScheduledExecutorService} fully backed by virtual threads. + * + *

All methods — including {@code schedule()}, {@code scheduleAtFixedRate()}, + * and {@code scheduleWithFixedDelay()} — delegate to virtual threads. + * Scheduling is implemented by sleeping in a virtual thread (which does not + * block OS threads), eliminating the need for a platform timer thread. + * + *

This adapter bridges the gap between virtual thread executors (which return + * {@link ExecutorService}) and frameworks like Armeria that require a + * {@link ScheduledExecutorService} for their blocking task executor. + */ +@Slf4j +final class VirtualThreadScheduledExecutor implements ScheduledExecutorService { + + private final ExecutorService vtExecutor; + + VirtualThreadScheduledExecutor(final ExecutorService vtExecutor) { + this.vtExecutor = vtExecutor; + } + + // --- Core execution: delegate to virtual threads --- + + @Override + public void execute(final Runnable command) { + vtExecutor.execute(command); + } + + @Override + public Future submit(final Runnable task) { + return vtExecutor.submit(task); + } + + @Override + public Future submit(final Runnable task, final T result) { + return vtExecutor.submit(task, result); + } + + @Override + public Future submit(final Callable task) { + return vtExecutor.submit(task); + } + + @Override + public List> invokeAll(final Collection> tasks) + throws InterruptedException { + return vtExecutor.invokeAll(tasks); + } + + @Override + public List> invokeAll(final Collection> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException { + return vtExecutor.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(final Collection> tasks) + throws InterruptedException, ExecutionException { + return vtExecutor.invokeAny(tasks); + } + + @Override + public T invokeAny(final Collection> tasks, + final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return vtExecutor.invokeAny(tasks, timeout, unit); + } + + // --- Scheduling: sleep in virtual thread, then execute --- + + @Override + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + command.run(); + return null; + })); + return sf; + } + + @Override + public ScheduledFuture schedule(final Callable callable, final long delay, + final TimeUnit unit) { + final long triggerNanos = System.nanoTime() + unit.toNanos(delay); + final VirtualScheduledFuture sf = new VirtualScheduledFuture<>(triggerNanos); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(triggerNanos); + return callable.call(); + })); + return sf; + } + + @Override + public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, + final long period, final TimeUnit unit) { + final long periodNanos = unit.toNanos(period); + final long firstTrigger = System.nanoTime() + unit.toNanos(initialDelay); + final VirtualScheduledFuture sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + long nextTrigger = firstTrigger; + sleepUntil(nextTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + nextTrigger += periodNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, + final long delay, final TimeUnit unit) { + final long delayNanos = unit.toNanos(delay); + final long firstTrigger = System.nanoTime() + unit.toNanos(initialDelay); + final VirtualScheduledFuture sf = new VirtualScheduledFuture<>(firstTrigger); + sf.setFuture(vtExecutor.submit(() -> { + sleepUntil(firstTrigger); + while (!Thread.currentThread().isInterrupted()) { + command.run(); + final long nextTrigger = System.nanoTime() + delayNanos; + sf.updateTriggerNanos(nextTrigger); + sleepUntil(nextTrigger); + } + return null; + })); + return sf; + } + + private static void sleepUntil(final long triggerNanos) throws InterruptedException { + long remaining = triggerNanos - System.nanoTime(); + while (remaining > 0) { + TimeUnit.NANOSECONDS.sleep(remaining); + remaining = triggerNanos - System.nanoTime(); + } + } + + // --- Lifecycle --- + + @Override + public void shutdown() { + vtExecutor.shutdown(); + } + + @Override + public List shutdownNow() { + return vtExecutor.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return vtExecutor.isShutdown(); + } + + @Override + public boolean isTerminated() { + return vtExecutor.isTerminated(); + } + + @Override + public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { + return vtExecutor.awaitTermination(timeout, unit); + } + + /** + * A {@link ScheduledFuture} backed by a virtual thread {@link Future}. + */ + static final class VirtualScheduledFuture implements ScheduledFuture { + private volatile Future delegate; + private volatile long triggerNanos; + + VirtualScheduledFuture(final long triggerNanos) { + this.triggerNanos = triggerNanos; + } + + void setFuture(final Future delegate) { + this.delegate = delegate; + } + + void updateTriggerNanos(final long triggerNanos) { + this.triggerNanos = triggerNanos; + } + + @Override + public long getDelay(final TimeUnit unit) { + return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(final Delayed other) { + if (other == this) { + return 0; + } + return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS)); + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public V get(final long timeout, final TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + } +} diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreads.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreads.java new file mode 100644 index 000000000000..4f558157a2a4 --- /dev/null +++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/VirtualThreads.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; + +/** + * Unified executor factory for both virtual threads (JDK 25+) and platform threads. + * + *

Virtual threads (JEP 444) are available since JDK 21, but JDK 21-23 has a critical + * thread pinning bug where {@code synchronized} blocks prevent virtual threads from + * unmounting from carrier threads (see JEP 491). This was fixed in JDK 24, but JDK 24 + * is non-LTS. JDK 25 LTS is the first long-term support release with the fix. + * + *

This utility requires JDK 25+ to enable virtual threads, ensuring both + * the pinning fix and LTS support are present. All created threads (virtual or platform) + * are named with the provided prefix for monitoring and debugging. + */ +@Slf4j +public final class VirtualThreads { + + /** + * The minimum JDK version required for virtual thread support. + * JDK 25 is the first LTS with the synchronized pinning fix (JEP 491). + */ + static final int MINIMUM_JDK_VERSION = 25; + + private static final boolean SUPPORTED; + + /* + * Cached reflection handles for JDK 25+ virtual thread APIs: + * Thread.ofVirtual() -> Thread.Builder.OfVirtual + * Thread.Builder#name(String prefix, long start) -> Thread.Builder + * Thread.Builder#factory() -> ThreadFactory + * Executors.newThreadPerTaskExecutor(ThreadFactory) -> ExecutorService + */ + private static final Method OF_VIRTUAL; + private static final Method BUILDER_NAME; + private static final Method BUILDER_FACTORY; + private static final Method NEW_THREAD_PER_TASK_EXECUTOR; + + /** + * System environment variable to disable virtual threads on JDK 25+. + * Set {@code SW_VIRTUAL_THREADS_ENABLED=false} to force platform threads. + */ + static final String ENV_VIRTUAL_THREADS_ENABLED = "SW_VIRTUAL_THREADS_ENABLED"; + + static { + final int jdkVersion = Runtime.version().feature(); + boolean supported = false; + Method ofVirtual = null; + Method builderName = null; + Method builderFactory = null; + Method newThreadPerTaskExecutor = null; + + final String envValue = System.getenv(ENV_VIRTUAL_THREADS_ENABLED); + final boolean disabledByEnv = "false".equalsIgnoreCase(envValue); + + if (disabledByEnv) { + log.info("Virtual threads disabled by environment variable {}={}", + ENV_VIRTUAL_THREADS_ENABLED, envValue); + } else if (jdkVersion >= MINIMUM_JDK_VERSION) { + try { + ofVirtual = Thread.class.getMethod("ofVirtual"); + final Class builderClass = Class.forName("java.lang.Thread$Builder"); + builderName = builderClass.getMethod("name", String.class, long.class); + builderFactory = builderClass.getMethod("factory"); + newThreadPerTaskExecutor = Executors.class.getMethod( + "newThreadPerTaskExecutor", ThreadFactory.class); + supported = true; + log.info("Virtual threads available (JDK {})", jdkVersion); + } catch (final ReflectiveOperationException e) { + log.warn("JDK {} meets version requirement but virtual thread API " + + "not found, virtual threads disabled", jdkVersion, e); + } + } else { + log.info("Virtual threads require JDK {}+, current JDK is {}", + MINIMUM_JDK_VERSION, jdkVersion); + } + + SUPPORTED = supported; + OF_VIRTUAL = ofVirtual; + BUILDER_NAME = builderName; + BUILDER_FACTORY = builderFactory; + NEW_THREAD_PER_TASK_EXECUTOR = newThreadPerTaskExecutor; + } + + private VirtualThreads() { + } + + /** + * @return true if the current JDK version is 25+ and virtual thread API is available. + */ + public static boolean isSupported() { + return SUPPORTED; + } + + /** + * Create a named executor service with virtual threads enabled by default. + * On JDK 25+, creates a virtual-thread-per-task executor with threads named + * {@code {namePrefix}-0}, {@code {namePrefix}-1}, etc. + * On older JDKs, delegates to the provided {@code platformExecutorSupplier}. + * + * @param namePrefix prefix for virtual thread names + * @param platformExecutorSupplier supplies the platform-thread executor as fallback + * @return virtual thread executor on JDK 25+, or the supplier's executor otherwise + */ + public static ExecutorService createExecutor(final String namePrefix, + final Supplier platformExecutorSupplier) { + return createExecutor(namePrefix, true, platformExecutorSupplier); + } + + /** + * Create a named executor service. When {@code enableVirtualThreads} is true and JDK 25+, + * creates a virtual-thread-per-task executor with threads named + * {@code {namePrefix}-0}, {@code {namePrefix}-1}, etc. + * Otherwise, delegates to the provided {@code platformExecutorSupplier}. + * + * @param namePrefix prefix for virtual thread names + * @param enableVirtualThreads whether to use virtual threads (requires JDK 25+) + * @param platformExecutorSupplier supplies the platform-thread executor as fallback + * @return virtual thread executor or the supplier's executor + */ + public static ExecutorService createExecutor(final String namePrefix, + final boolean enableVirtualThreads, + final Supplier platformExecutorSupplier) { + if (enableVirtualThreads && SUPPORTED) { + try { + return createVirtualThreadExecutor(namePrefix); + } catch (final ReflectiveOperationException e) { + log.warn("Failed to create virtual thread executor [{}], " + + "falling back to platform threads", namePrefix, e); + } + } + return platformExecutorSupplier.get(); + } + + /** + * Create a named scheduled executor service with virtual threads enabled by default. + * On JDK 25+, creates a virtual-thread-backed {@link ScheduledExecutorService}. + * On older JDKs, delegates to the provided {@code platformExecutorSupplier}. + * + *

This is designed for frameworks (e.g. Armeria) that require a + * {@link ScheduledExecutorService} for their blocking task executor. + * All methods — including scheduling — are fully backed by virtual threads. + * Scheduling is implemented by sleeping in a virtual thread. + * + * @param namePrefix prefix for virtual thread names + * @param platformExecutorSupplier supplies the platform-thread executor as fallback + * @return virtual thread scheduled executor on JDK 25+, or the supplier's executor otherwise + */ + public static ScheduledExecutorService createScheduledExecutor( + final String namePrefix, + final Supplier platformExecutorSupplier) { + if (SUPPORTED) { + try { + final ExecutorService vtExecutor = createVirtualThreadExecutor(namePrefix); + return new VirtualThreadScheduledExecutor(vtExecutor); + } catch (final ReflectiveOperationException e) { + log.warn("Failed to create virtual thread scheduled executor [{}], " + + "falling back to platform threads", namePrefix, e); + } + } + return platformExecutorSupplier.get(); + } + + private static ExecutorService createVirtualThreadExecutor( + final String namePrefix) throws ReflectiveOperationException { + // Thread.ofVirtual().name("vt:" + namePrefix + "-", 0).factory() + final Object builder = OF_VIRTUAL.invoke(null); + final Object namedBuilder = BUILDER_NAME.invoke(builder, "vt:" + namePrefix + "-", 0L); + final ThreadFactory factory = (ThreadFactory) BUILDER_FACTORY.invoke(namedBuilder); + // Executors.newThreadPerTaskExecutor(factory) + final ExecutorService executor = + (ExecutorService) NEW_THREAD_PER_TASK_EXECUTOR.invoke(null, factory); + log.info("Created virtual-thread-per-task executor [{}]", namePrefix); + return executor; + } +} diff --git a/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/VirtualThreadsTest.java b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/VirtualThreadsTest.java new file mode 100644 index 000000000000..325b621fb962 --- /dev/null +++ b/oap-server/server-library/library-util/src/test/java/org/apache/skywalking/oap/server/library/util/VirtualThreadsTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.library.util; + +import java.lang.reflect.Method; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class VirtualThreadsTest { + + @Test + public void testIsSupportedMatchesJdkVersion() { + final int jdkVersion = Runtime.version().feature(); + final boolean expected = jdkVersion >= VirtualThreads.MINIMUM_JDK_VERSION; + assertEquals(expected, VirtualThreads.isSupported()); + } + + @Test + public void testVirtualThreadExecutor() throws Exception { + if (!VirtualThreads.isSupported()) { + return; + } + final ExecutorService executor = VirtualThreads.createExecutor( + "vt-check", true, () -> Executors.newSingleThreadExecutor()); + try { + final ThreadCapture capture = submitAndCapture(executor); + assertTrue(capture.name.startsWith("vt:vt-check-"), + "Virtual thread name should start with 'vt:vt-check-', but was: " + capture.name); + assertTrue(isVirtual(capture.thread), + "Thread should be virtual on JDK 25+"); + } finally { + executor.shutdown(); + } + } + + @Test + public void testForcePlatformFallback() throws Exception { + if (!VirtualThreads.isSupported()) { + return; + } + final AtomicLong counter = new AtomicLong(0); + final ExecutorService executor = VirtualThreads.createExecutor( + "pt-check", false, () -> new ThreadPoolExecutor( + 2, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), + r -> new Thread(r, "pt-check-" + counter.getAndIncrement()) + )); + try { + final ThreadCapture capture = submitAndCapture(executor); + assertTrue(capture.name.startsWith("pt-check-"), + "Platform thread name should start with 'pt-check-', but was: " + capture.name); + assertFalse(isVirtual(capture.thread), + "Thread should NOT be virtual when enableVirtualThreads=false"); + } finally { + executor.shutdown(); + } + } + + @Test + public void testScheduledExecutorUsesVirtualThreads() throws Exception { + if (!VirtualThreads.isSupported()) { + return; + } + final ScheduledExecutorService executor = VirtualThreads.createScheduledExecutor( + "sched-vt", () -> null); + assertNotNull(executor); + try { + // Test execute() runs on virtual threads + final ThreadCapture capture = submitAndCapture(executor); + assertTrue(capture.name.startsWith("vt:sched-vt-"), + "Scheduled executor virtual thread name should start with 'vt:sched-vt-', but was: " + + capture.name); + assertTrue(isVirtual(capture.thread), + "Scheduled executor should use virtual threads on JDK 25+"); + + // Test schedule() also dispatches to virtual threads + final AtomicReference scheduledRef = new AtomicReference<>(); + final CountDownLatch scheduledLatch = new CountDownLatch(1); + final ScheduledFuture future = executor.schedule(() -> { + scheduledRef.set(Thread.currentThread()); + scheduledLatch.countDown(); + }, 10, TimeUnit.MILLISECONDS); + assertTrue(scheduledLatch.await(5, TimeUnit.SECONDS), "Scheduled task did not complete"); + assertTrue(isVirtual(scheduledRef.get()), + "Scheduled task should run on a virtual thread"); + } finally { + executor.shutdown(); + } + } + + @Test + public void testFallbackUsedWhenNotSupported() { + if (VirtualThreads.isSupported()) { + return; + } + final ExecutorService fallback = Executors.newSingleThreadExecutor(); + try { + final ExecutorService result = VirtualThreads.createExecutor("test", () -> fallback); + assertSame(fallback, result); + } finally { + fallback.shutdown(); + } + } + + private ThreadCapture submitAndCapture(final ExecutorService executor) throws InterruptedException { + final AtomicReference threadRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + executor.submit(() -> { + threadRef.set(Thread.currentThread()); + latch.countDown(); + }); + assertTrue(latch.await(5, TimeUnit.SECONDS), "Task did not complete in time"); + final Thread thread = threadRef.get(); + assertNotNull(thread); + return new ThreadCapture(thread, thread.getName()); + } + + /** + * Check Thread.isVirtual() via reflection (JDK 21+ API, compiled against JDK 11). + */ + private static boolean isVirtual(final Thread thread) throws Exception { + final Method isVirtual = Thread.class.getMethod("isVirtual"); + return (boolean) isVirtual.invoke(thread); + } + + private static class ThreadCapture { + final Thread thread; + final String name; + + ThreadCapture(final Thread thread, final String name) { + this.thread = thread; + this.name = name; + } + } +} diff --git a/oap-server/server-query-plugin/logql-plugin/src/main/java/org/apache/skywalking/oap/query/logql/LogQLProvider.java b/oap-server/server-query-plugin/logql-plugin/src/main/java/org/apache/skywalking/oap/query/logql/LogQLProvider.java index b23cd7d84ea2..7299b35149be 100644 --- a/oap-server/server-query-plugin/logql-plugin/src/main/java/org/apache/skywalking/oap/query/logql/LogQLProvider.java +++ b/oap-server/server-query-plugin/logql-plugin/src/main/java/org/apache/skywalking/oap/query/logql/LogQLProvider.java @@ -76,6 +76,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException { .build(); httpServer = new HTTPServer(httpServerConfig); + httpServer.setBlockingTaskName("logql-http"); httpServer.initialize(); httpServer.addHandler( new LogQLApiHandler(getManager()), diff --git a/oap-server/server-query-plugin/promql-plugin/src/main/java/org/apache/skywalking/oap/query/promql/PromQLProvider.java b/oap-server/server-query-plugin/promql-plugin/src/main/java/org/apache/skywalking/oap/query/promql/PromQLProvider.java index a41ce71a89a2..62c9fc0cb6b2 100644 --- a/oap-server/server-query-plugin/promql-plugin/src/main/java/org/apache/skywalking/oap/query/promql/PromQLProvider.java +++ b/oap-server/server-query-plugin/promql-plugin/src/main/java/org/apache/skywalking/oap/query/promql/PromQLProvider.java @@ -77,6 +77,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException { .build(); httpServer = new HTTPServer(httpServerConfig); + httpServer.setBlockingTaskName("promql-http"); httpServer.initialize(); httpServer.addHandler( new PromQLApiHandler(getManager(), config), diff --git a/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/ZipkinQueryProvider.java b/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/ZipkinQueryProvider.java index d5ab3a6be1cb..aa5ff0141e4c 100644 --- a/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/ZipkinQueryProvider.java +++ b/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/ZipkinQueryProvider.java @@ -77,6 +77,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException { .build(); httpServer = new HTTPServer(httpServerConfig); + httpServer.setBlockingTaskName("zipkin-query-http"); httpServer.initialize(); httpServer.addHandler( new ZipkinQueryHandler(config, getManager()), diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleProvider.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleProvider.java index 3ab6608e20fe..41d8ea15ecdf 100644 --- a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleProvider.java +++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleProvider.java @@ -81,6 +81,7 @@ public void prepare() throws ServiceNotProvidedException { .tlsCertChainPath(moduleConfig.getTlsCertChainPath()) .build(); httpServer = new HTTPServer(httpServerConfig); + httpServer.setBlockingTaskName("firehose-http"); httpServer.initialize(); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java index ac4bcd5c1e8e..4f7b54928f81 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java @@ -102,6 +102,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException { if (config.getGRPCThreadPoolSize() > 0) { grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); } + grpcServer.setThreadPoolName("als-grpc"); grpcServer.initialize(); this.receiverGRPCHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer); diff --git a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java index 6041ac865b5a..f48ee9928f9f 100644 --- a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java @@ -107,6 +107,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException { if (config.getGRPCThreadPoolSize() > 0) { grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); } + grpcServer.setThreadPoolName("ebpf-grpc"); grpcServer.initialize(); this.receiverGRPCHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer); diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java index 933e09520762..7a678233723e 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java @@ -90,6 +90,7 @@ public void prepare() { setBootingParameter("oap.external.http.port", config.getRestPort()); httpServer = new HTTPServer(httpServerConfig); + httpServer.setBlockingTaskName("receiver-http"); httpServer.initialize(); this.registerServiceImplementation(HTTPHandlerRegister.class, new HTTPHandlerRegisterImpl(httpServer)); @@ -128,6 +129,7 @@ public void prepare() { if (config.getGRPCThreadPoolSize() > 0) { grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); } + grpcServer.setThreadPoolName("receiver-grpc"); grpcServer.initialize(); GRPCHandlerRegisterImpl grpcHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer); diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java index 441f9faaecd2..3878a6d7c0ce 100644 --- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java +++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverProvider.java @@ -88,6 +88,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException { .build(); httpServer = new HTTPServer(httpServerConfig); + httpServer.setBlockingTaskName("zipkin-http"); httpServer.initialize(); httpServer.addHandler(