Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,28 @@ public void update(CacheUpdate update) {
}

public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()));
}

public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder, boolean preferLeader) {
recipeCache.computeKeys(reqBuilder);
return fillRoutingHint(
reqBuilder.getTransaction(),
reqBuilder.getDirectedReadOptions(),
preferLeader,
KeyRangeCache.RangeMode.COVERING_SPLIT,
reqBuilder.getDirectedReadOptions(),
reqBuilder.getRoutingHintBuilder());
}

public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) {
return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction()));
}

public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder, boolean preferLeader) {
recipeCache.computeKeys(reqBuilder);
return fillRoutingHint(
reqBuilder.getTransaction(),
reqBuilder.getDirectedReadOptions(),
preferLeader,
KeyRangeCache.RangeMode.PICK_RANDOM,
reqBuilder.getDirectedReadOptions(),
reqBuilder.getRoutingHintBuilder());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
Expand Down Expand Up @@ -51,6 +53,7 @@
*/
@InternalApi
final class KeyAwareChannel extends ManagedChannel {
private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L;
private static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead";
private static final String STREAMING_SQL_METHOD =
"google.spanner.v1.Spanner/ExecuteStreamingSql";
Expand All @@ -67,6 +70,11 @@ final class KeyAwareChannel extends ManagedChannel {
private final Map<String, SoftReference<ChannelFinder>> channelFinders =
new ConcurrentHashMap<>();
private final Map<ByteString, String> transactionAffinities = new ConcurrentHashMap<>();
// Maps read-only transaction IDs to their preferLeader value.
// Strong reads → true (prefer leader), Stale reads → false (any replica).
// Bounded to prevent unbounded growth if application code does not close read-only transactions.
private final Cache<ByteString, Boolean> readOnlyTxPreferLeader =
CacheBuilder.newBuilder().maximumSize(MAX_TRACKED_READ_ONLY_TRANSACTIONS).build();

private KeyAwareChannel(
InstantiatingGrpcChannelProvider channelProvider,
Expand Down Expand Up @@ -184,12 +192,34 @@ private void clearAffinity(ByteString transactionId) {
return;
}
transactionAffinities.remove(transactionId);
readOnlyTxPreferLeader.invalidate(transactionId);
}

void clearTransactionAffinity(ByteString transactionId) {
clearAffinity(transactionId);
}

private boolean isReadOnlyTransaction(ByteString transactionId) {
return transactionId != null
&& !transactionId.isEmpty()
&& readOnlyTxPreferLeader.getIfPresent(transactionId) != null;
}

@Nullable
private Boolean readOnlyPreferLeader(ByteString transactionId) {
if (transactionId == null || transactionId.isEmpty()) {
return null;
}
return readOnlyTxPreferLeader.getIfPresent(transactionId);
}

private void trackReadOnlyTransaction(ByteString transactionId, boolean preferLeader) {
if (transactionId == null || transactionId.isEmpty()) {
return;
}
readOnlyTxPreferLeader.put(transactionId, preferLeader);
}

private void recordAffinity(
ByteString transactionId, @Nullable ChannelEndpoint endpoint, boolean allowDefault) {
if (transactionId == null || transactionId.isEmpty() || endpoint == null) {
Expand Down Expand Up @@ -250,6 +280,8 @@ static final class KeyAwareClientCall<RequestT, ResponseT>
@Nullable private Boolean pendingMessageCompression;
@Nullable private io.grpc.Status cancelledStatus;
@Nullable private Metadata cancelledTrailers;
private boolean isReadOnlyBegin;
private boolean readOnlyIsStrong;
private final Object lock = new Object();

KeyAwareClientCall(
Expand Down Expand Up @@ -307,12 +339,14 @@ public void sendMessage(RequestT message) {

if (message instanceof ReadRequest) {
ReadRequest.Builder reqBuilder = ((ReadRequest) message).toBuilder();
maybeTrackReadOnlyBegin(reqBuilder.getTransaction());
RoutingDecision routing = routeFromRequest(reqBuilder);
finder = routing.finder;
endpoint = routing.endpoint;
message = (RequestT) reqBuilder.build();
} else if (message instanceof ExecuteSqlRequest) {
ExecuteSqlRequest.Builder reqBuilder = ((ExecuteSqlRequest) message).toBuilder();
maybeTrackReadOnlyBegin(reqBuilder.getTransaction());
RoutingDecision routing = routeFromRequest(reqBuilder);
finder = routing.finder;
endpoint = routing.endpoint;
Expand All @@ -325,7 +359,12 @@ public void sendMessage(RequestT message) {
finder = parentChannel.getOrCreateChannelFinder(databaseId);
endpoint = finder.findServer(reqBuilder);
}
allowDefaultAffinity = true;
if (reqBuilder.hasOptions() && reqBuilder.getOptions().hasReadOnly()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should add similar logic for ExecuteSqlRequest and ReadRequest, as those can also include an inlined BeginTransactionOption. That might not be something that we use today for read-only transactions, but it is likely to change in the future. (Alternatively, we should have a test that fails if we see an ExecuteSqlRequest with an inlined begin. That way we get a signal if this part of the implementation changes in the future, and are forced to fix it here then)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added support for both explicit begin and inline begin with ExecuteSqlRequest and ReadRequest

isReadOnlyBegin = true;
readOnlyIsStrong = reqBuilder.getOptions().getReadOnly().getStrong();
} else {
allowDefaultAffinity = true;
}
message = (RequestT) reqBuilder.build();
} else if (message instanceof CommitRequest) {
CommitRequest request = (CommitRequest) message;
Expand Down Expand Up @@ -483,32 +522,52 @@ void maybeClearAffinity() {
parentChannel.clearAffinity(transactionIdToClear);
}

private void maybeTrackReadOnlyBegin(TransactionSelector selector) {
if (selector.getSelectorCase() == TransactionSelector.SelectorCase.BEGIN
&& selector.getBegin().hasReadOnly()) {
isReadOnlyBegin = true;
readOnlyIsStrong = selector.getBegin().getReadOnly().getStrong();
}
}

private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) {
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction());
ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId);
// Skip affinity for read-only transactions so each read routes independently.
boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId);
ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId);
ChannelFinder finder = null;
if (databaseId != null) {
finder = parentChannel.getOrCreateChannelFinder(databaseId);
ChannelEndpoint routed = finder.findServer(reqBuilder);
if (endpoint == null) {
endpoint = routed;
}
}
if (databaseId != null && endpoint == null) {
Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId);
ChannelEndpoint routed =
preferLeaderOverride != null
? finder.findServer(reqBuilder, preferLeaderOverride)
: finder.findServer(reqBuilder);
endpoint = routed;
}
return new RoutingDecision(finder, endpoint);
}

private RoutingDecision routeFromRequest(ExecuteSqlRequest.Builder reqBuilder) {
String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession());
ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction());
ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId);
// Skip affinity for read-only transactions so each query routes independently.
boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId);
ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId);
ChannelFinder finder = null;
if (databaseId != null) {
finder = parentChannel.getOrCreateChannelFinder(databaseId);
ChannelEndpoint routed = finder.findServer(reqBuilder);
if (endpoint == null) {
endpoint = routed;
}
}
if (databaseId != null && endpoint == null) {
Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId);
ChannelEndpoint routed =
preferLeaderOverride != null
? finder.findServer(reqBuilder, preferLeaderOverride)
: finder.findServer(reqBuilder);
endpoint = routed;
}
return new RoutingDecision(finder, endpoint);
}
Expand Down Expand Up @@ -554,7 +613,13 @@ public void onMessage(ResponseT message) {
transactionId = transactionIdFromTransaction(response);
}
if (transactionId != null) {
call.maybeRecordAffinity(transactionId);
if (call.isReadOnlyBegin) {
// Track the read-only transaction so subsequent reads skip affinity
// and route independently based on key-based routing.
call.parentChannel.trackReadOnlyTransaction(transactionId, call.readOnlyIsStrong);
} else if (!call.parentChannel.isReadOnlyTransaction(transactionId)) {
call.maybeRecordAffinity(transactionId);
}
}
super.onMessage(message);
}
Expand Down
Loading
Loading