Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,48 +109,6 @@ private void initDiscoveryListener()
public void gracefullyShutdownClientChannels()
{
LOG.warn("Gracefully shutting down all client channels");
try {


// Mark all active connections to be closed after next response sent.
LOG.warn("Flagging CLOSE_AFTER_RESPONSE on " + channels.size() + " client channels.");
// Pick some arbitrary executor.
PromiseCombiner closeAfterPromises = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);
for (Channel channel : channels)
{
ConnectionCloseType.setForChannel(channel, ConnectionCloseType.DELAYED_GRACEFUL);

ChannelPromise closePromise = channel.pipeline().newPromise();
channel.attr(ConnectionCloseChannelAttributes.CLOSE_AFTER_RESPONSE).set(closePromise);
// TODO(carl-mastrangelo): remove closePromise, since I don't think it's needed. Need to verify.
closeAfterPromises.add(channel.closeFuture());
}

// Wait for all of the attempts to close connections gracefully, or max of 30 secs each.
Promise<Void> combinedCloseAfterPromise = executor.newPromise();
closeAfterPromises.finish(combinedCloseAfterPromise);
combinedCloseAfterPromise.await(30, TimeUnit.SECONDS);

// Close all of the remaining active connections.
LOG.warn("Closing remaining active client channels.");
List<ChannelFuture> forceCloseFutures = new ArrayList<>();
channels.forEach(channel -> {
if (channel.isActive()) {
ChannelFuture f = channel.pipeline().close();
forceCloseFutures.add(f);
}
});

LOG.warn("Waiting for " + forceCloseFutures.size() + " client channels to be closed.");
PromiseCombiner closePromisesCombiner = new PromiseCombiner(ImmediateEventExecutor.INSTANCE);
closePromisesCombiner.addAll(forceCloseFutures.toArray(new ChannelFuture[0]));
Promise<Void> combinedClosePromise = executor.newPromise();
closePromisesCombiner.finish(combinedClosePromise);
combinedClosePromise.await(5, TimeUnit.SECONDS);
LOG.warn(forceCloseFutures.size() + " client channels closed.");
}
catch (InterruptedException ie) {
LOG.warn("Interrupted while shutting down client channels");
}
//rev1
}
}