Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 47 additions & 106 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
import io.grpc.internal.RetriableStream.Throttle;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -167,11 +168,9 @@ public Result selectConfig(PickSubchannelArgs args) {
@Nullable
private final ChannelCredentials originalChannelCreds;
private final ClientTransportFactory transportFactory;
private final ClientTransportFactory oobTransportFactory;
private final RestrictedScheduledExecutor scheduledExecutor;
private final Executor executor;
private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
private final ExecutorHolder balancerRpcExecutorHolder;
private final ExecutorHolder offloadExecutorHolder;
private final TimeProvider timeProvider;
Expand Down Expand Up @@ -240,9 +239,6 @@ public void uncaughtException(Thread t, Throwable e) {
private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
private final Object pendingCallsInUseObject = new Object();

// Must be mutated from syncContext
private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);

// reprocess() must be run from syncContext
private final DelayedClientTransport delayedTransport;
private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
Expand Down Expand Up @@ -312,9 +308,6 @@ private void maybeShutdownNowSubchannels() {
for (InternalSubchannel subchannel : subchannels) {
subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
}
for (OobChannel oobChannel : oobChannels) {
oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
}
}
}

Expand All @@ -334,7 +327,6 @@ public void run() {
builder.setTarget(target).setState(channelStateManager.getState());
List<InternalWithLogId> children = new ArrayList<>();
children.addAll(subchannels);
children.addAll(oobChannels);
builder.setSubchannels(children);
ret.set(builder.build());
}
Expand Down Expand Up @@ -564,8 +556,6 @@ ClientStream newSubstream(
new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
this.transportFactory = new CallCredentialsApplyingTransportFactory(
clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
clientTransportFactory, null, this.offloadExecutorHolder);
this.scheduledExecutor =
new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
maxTraceEvents = builder.maxTraceEvents;
Expand Down Expand Up @@ -604,8 +594,8 @@ ClientStream newSubstream(
this.nameResolverArgs = nameResolverArgsBuilder.build();
this.nameResolver = getNameResolver(
targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
this.balancerRpcExecutorHolder = new ExecutorHolder(
checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"));
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
this.delayedTransport.start(delayedTransportListener);
this.backoffPolicyProvider = backoffPolicyProvider;
Expand Down Expand Up @@ -1187,7 +1177,7 @@ private void maybeTerminateChannel() {
if (terminated) {
return;
}
if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
if (shutdown.get() && subchannels.isEmpty()) {
channelLogger.log(ChannelLogLevel.INFO, "Terminated");
channelz.removeRootChannel(this);
executorPool.returnObject(executor);
Expand All @@ -1201,13 +1191,6 @@ private void maybeTerminateChannel() {
}
}

// Must be called from syncContext
private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
refreshNameResolution();
}
}

@Override
public ConnectivityState getState(boolean requestConnection) {
ConnectivityState savedChannelState = channelStateManager.getState();
Expand Down Expand Up @@ -1253,9 +1236,6 @@ public void run() {
for (InternalSubchannel subchannel : subchannels) {
subchannel.resetConnectBackoff();
}
for (OobChannel oobChannel : oobChannels) {
oobChannel.resetConnectBackoff();
}
}
}

Expand Down Expand Up @@ -1413,86 +1393,28 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, Stri
@Override
public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
String authority) {
// TODO(ejona): can we be even stricter? Like terminating?
checkState(!terminated, "Channel is terminated");
long oobChannelCreationTime = timeProvider.currentTimeNanos();
InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
InternalLogId subchannelLogId =
InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
ChannelTracer oobChannelTracer =
new ChannelTracer(
oobLogId, maxTraceEvents, oobChannelCreationTime,
"OobChannel for " + addressGroup);
final OobChannel oobChannel = new OobChannel(
authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child OobChannel created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(oobChannelCreationTime)
.setChannelRef(oobChannel)
.build());
ChannelTracer subchannelTracer =
new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
"Subchannel for " + addressGroup);
ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
@Override
void onTerminated(InternalSubchannel is) {
oobChannels.remove(oobChannel);
channelz.removeSubchannel(is);
oobChannel.handleSubchannelTerminated();
maybeTerminateChannel();
}

@Override
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
// TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
// state and refresh name resolution if necessary.
handleInternalSubchannelState(newState);
oobChannel.handleSubchannelStateChange(newState);
}
}

final InternalSubchannel internalSubchannel = new InternalSubchannel(
CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
authority, userAgent, backoffPolicyProvider, oobTransportFactory,
oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
// All callback methods are run from syncContext
new ManagedOobChannelCallback(),
channelz,
callTracerFactory.create(),
subchannelTracer,
subchannelLogId,
subchannelLogger,
transportFilters,
target,
lbHelper.getMetricRecorder());
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
.setTimestampNanos(oobChannelCreationTime)
.setSubchannelRef(internalSubchannel)
.build());
channelz.addSubchannel(oobChannel);
channelz.addSubchannel(internalSubchannel);
oobChannel.setSubchannel(internalSubchannel);
final class AddOobChannel implements Runnable {
@Override
public void run() {
if (terminating) {
oobChannel.shutdown();
}
if (!terminated) {
// If channel has not terminated, it will track the subchannel and block termination
// for it.
oobChannels.add(oobChannel);
}
}
}

syncContext.execute(new AddOobChannel());
return oobChannel;
NameResolverRegistry nameResolverRegistry = new NameResolverRegistry();
OobNameResolverProvider resolverProvider =
new OobNameResolverProvider(authority, addressGroup, syncContext);
nameResolverRegistry.register(resolverProvider);
// We could use a hard-coded target, as the name resolver won't actually use this string.
// However, that would make debugging less clear, as we use the target to identify the
// channel.
String target;
try {
target = new URI("oob", "", "/" + authority, null, null).toString();
} catch (URISyntaxException ex) {
// Any special characters in the path will be percent encoded. So this should be impossible.
throw new AssertionError(ex);
}
ManagedChannel delegate = createResolvingOobChannelBuilder(
target, new DefaultChannelCreds(), nameResolverRegistry)
// TODO(zdapeng): executors should not outlive the parent channel.
.executor(balancerRpcExecutorHolder.getExecutor())
.idleTimeout(Integer.MAX_VALUE, TimeUnit.SECONDS)
.disableRetry()
.build();
return new OobChannel(delegate, resolverProvider);
}

@Deprecated
Expand All @@ -1504,11 +1426,17 @@ public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target)
.overrideAuthority(getAuthority());
}

// TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
// TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
@Override
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
final String target, final ChannelCredentials channelCreds) {
return createResolvingOobChannelBuilder(target, channelCreds, nameResolverRegistry);
}

// TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
// TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
private ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
final String target, final ChannelCredentials channelCreds,
NameResolverRegistry nameResolverRegistry) {
checkNotNull(channelCreds, "channelCreds");

final class ResolvingOobChannelBuilder
Expand Down Expand Up @@ -1641,6 +1569,19 @@ public ChannelCredentials withoutBearerTokens() {
}
}

static final class OobChannel extends ForwardingManagedChannel {
private final OobNameResolverProvider resolverProvider;

public OobChannel(ManagedChannel delegate, OobNameResolverProvider resolverProvider) {
super(delegate);
this.resolverProvider = checkNotNull(resolverProvider, "resolverProvider");
}

public void updateAddresses(List<EquivalentAddressGroup> eags) {
resolverProvider.updateAddresses(eags);
}
}

final class NameResolverListener extends NameResolver.Listener2 {
final LbHelperImpl helper;
final NameResolver resolver;
Expand Down
Loading