From da4f7a2a25d0bfe6036d35af196376d9e6147fe2 Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Tue, 18 Apr 2023 09:09:05 +0200 Subject: [PATCH] Add suspend/resume to ConnectionPool This new pair of methods intends to support snapshotting of the application when using OpenJDK CRaC. Signed-off-by: Radim Vansa --- .../core/net/impl/pool/ConnectionPool.java | 14 ++++ .../net/impl/pool/SimpleConnectionPool.java | 64 +++++++++++++++++-- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/vertx/core/net/impl/pool/ConnectionPool.java b/src/main/java/io/vertx/core/net/impl/pool/ConnectionPool.java index ddc2883ae31..94f0757fa7a 100644 --- a/src/main/java/io/vertx/core/net/impl/pool/ConnectionPool.java +++ b/src/main/java/io/vertx/core/net/impl/pool/ConnectionPool.java @@ -149,4 +149,18 @@ static ConnectionPool pool(PoolConnector connector, int[] maxSizes, in * to take decisions, this can be used for statistic or testing purpose */ int requests(); + + /** + * Removes all connections from the pool and returns them in the handler. The pool + * is blocked from creating new connections until {@link #resume()} is invoked. + * + * @param handler the callback handler with the result + */ + void suspend(Handler>>> handler); + + /** + * Allows a {@link #suspend suspended} connection pool to continue allocating + * and serving new connections. + */ + void resume(); } diff --git a/src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java b/src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java index 0c599893f51..41f7bdb192d 100644 --- a/src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java +++ b/src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java @@ -18,11 +18,7 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.EventLoopContext; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; +import java.util.*; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -903,4 +899,62 @@ public int size() { return size; } } + + @Override + public void suspend(Handler>>> handler) { + execute(new Suspend<>(handler)); + } + + private static class Suspend implements Executor.Action> { + private final Handler>>> handler; + + private Suspend(Handler>>> handler) { + this.handler = handler; + } + + @Override + public Task execute(SimpleConnectionPool pool) { + if (pool.closed) { + return new Task() { + @Override + public void run() { + handler.handle(Future.succeededFuture(Collections.emptyList())); + } + }; + } + List> list = new ArrayList<>(); + for (int i = 0; i < pool.size;i++) { + Slot slot = pool.slots[i]; + pool.slots[i] = null; + PoolWaiter waiter = slot.initiator; + if (waiter != null) { + pool.waiters.addFirst(slot.initiator); + slot.initiator = null; + } + list.add(slot.result.future()); + } + pool.size = 0; + // prevent creating further connections + pool.capacity = pool.maxCapacity; + return new Task() { + @Override + public void run() { + handler.handle(Future.succeededFuture(list)); + } + }; + } + } + + @Override + public void resume() { + execute(new Resume<>()); + } + + private static class Resume implements Executor.Action> { + @Override + public Task execute(SimpleConnectionPool pool) { + pool.capacity = 0; + return null; + } + } }