diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java index 0e3ede5d73..6c4554ea77 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinder.java @@ -66,20 +66,28 @@ public void update(CacheUpdate update) { } public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder) { + return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction())); + } + + public ChannelEndpoint findServer(ReadRequest.Builder reqBuilder, boolean preferLeader) { recipeCache.computeKeys(reqBuilder); return fillRoutingHint( - reqBuilder.getTransaction(), - reqBuilder.getDirectedReadOptions(), + preferLeader, KeyRangeCache.RangeMode.COVERING_SPLIT, + reqBuilder.getDirectedReadOptions(), reqBuilder.getRoutingHintBuilder()); } public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder) { + return findServer(reqBuilder, preferLeader(reqBuilder.getTransaction())); + } + + public ChannelEndpoint findServer(ExecuteSqlRequest.Builder reqBuilder, boolean preferLeader) { recipeCache.computeKeys(reqBuilder); return fillRoutingHint( - reqBuilder.getTransaction(), - reqBuilder.getDirectedReadOptions(), + preferLeader, KeyRangeCache.RangeMode.PICK_RANDOM, + reqBuilder.getDirectedReadOptions(), reqBuilder.getRoutingHintBuilder()); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index 3014b9db46..382ccead71 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -18,6 +18,8 @@ import com.google.api.core.InternalApi; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; @@ -51,6 +53,7 @@ */ @InternalApi final class KeyAwareChannel extends ManagedChannel { + private static final long MAX_TRACKED_READ_ONLY_TRANSACTIONS = 100_000L; private static final String STREAMING_READ_METHOD = "google.spanner.v1.Spanner/StreamingRead"; private static final String STREAMING_SQL_METHOD = "google.spanner.v1.Spanner/ExecuteStreamingSql"; @@ -67,6 +70,11 @@ final class KeyAwareChannel extends ManagedChannel { private final Map> channelFinders = new ConcurrentHashMap<>(); private final Map transactionAffinities = new ConcurrentHashMap<>(); + // Maps read-only transaction IDs to their preferLeader value. + // Strong reads → true (prefer leader), Stale reads → false (any replica). + // Bounded to prevent unbounded growth if application code does not close read-only transactions. + private final Cache readOnlyTxPreferLeader = + CacheBuilder.newBuilder().maximumSize(MAX_TRACKED_READ_ONLY_TRANSACTIONS).build(); private KeyAwareChannel( InstantiatingGrpcChannelProvider channelProvider, @@ -184,12 +192,34 @@ private void clearAffinity(ByteString transactionId) { return; } transactionAffinities.remove(transactionId); + readOnlyTxPreferLeader.invalidate(transactionId); } void clearTransactionAffinity(ByteString transactionId) { clearAffinity(transactionId); } + private boolean isReadOnlyTransaction(ByteString transactionId) { + return transactionId != null + && !transactionId.isEmpty() + && readOnlyTxPreferLeader.getIfPresent(transactionId) != null; + } + + @Nullable + private Boolean readOnlyPreferLeader(ByteString transactionId) { + if (transactionId == null || transactionId.isEmpty()) { + return null; + } + return readOnlyTxPreferLeader.getIfPresent(transactionId); + } + + private void trackReadOnlyTransaction(ByteString transactionId, boolean preferLeader) { + if (transactionId == null || transactionId.isEmpty()) { + return; + } + readOnlyTxPreferLeader.put(transactionId, preferLeader); + } + private void recordAffinity( ByteString transactionId, @Nullable ChannelEndpoint endpoint, boolean allowDefault) { if (transactionId == null || transactionId.isEmpty() || endpoint == null) { @@ -250,6 +280,8 @@ static final class KeyAwareClientCall @Nullable private Boolean pendingMessageCompression; @Nullable private io.grpc.Status cancelledStatus; @Nullable private Metadata cancelledTrailers; + private boolean isReadOnlyBegin; + private boolean readOnlyIsStrong; private final Object lock = new Object(); KeyAwareClientCall( @@ -307,12 +339,14 @@ public void sendMessage(RequestT message) { if (message instanceof ReadRequest) { ReadRequest.Builder reqBuilder = ((ReadRequest) message).toBuilder(); + maybeTrackReadOnlyBegin(reqBuilder.getTransaction()); RoutingDecision routing = routeFromRequest(reqBuilder); finder = routing.finder; endpoint = routing.endpoint; message = (RequestT) reqBuilder.build(); } else if (message instanceof ExecuteSqlRequest) { ExecuteSqlRequest.Builder reqBuilder = ((ExecuteSqlRequest) message).toBuilder(); + maybeTrackReadOnlyBegin(reqBuilder.getTransaction()); RoutingDecision routing = routeFromRequest(reqBuilder); finder = routing.finder; endpoint = routing.endpoint; @@ -325,7 +359,12 @@ public void sendMessage(RequestT message) { finder = parentChannel.getOrCreateChannelFinder(databaseId); endpoint = finder.findServer(reqBuilder); } - allowDefaultAffinity = true; + if (reqBuilder.hasOptions() && reqBuilder.getOptions().hasReadOnly()) { + isReadOnlyBegin = true; + readOnlyIsStrong = reqBuilder.getOptions().getReadOnly().getStrong(); + } else { + allowDefaultAffinity = true; + } message = (RequestT) reqBuilder.build(); } else if (message instanceof CommitRequest) { CommitRequest request = (CommitRequest) message; @@ -483,17 +522,31 @@ void maybeClearAffinity() { parentChannel.clearAffinity(transactionIdToClear); } + private void maybeTrackReadOnlyBegin(TransactionSelector selector) { + if (selector.getSelectorCase() == TransactionSelector.SelectorCase.BEGIN + && selector.getBegin().hasReadOnly()) { + isReadOnlyBegin = true; + readOnlyIsStrong = selector.getBegin().getReadOnly().getStrong(); + } + } + private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) { String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession()); ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction()); - ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId); + // Skip affinity for read-only transactions so each read routes independently. + boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId); + ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId); ChannelFinder finder = null; if (databaseId != null) { finder = parentChannel.getOrCreateChannelFinder(databaseId); - ChannelEndpoint routed = finder.findServer(reqBuilder); - if (endpoint == null) { - endpoint = routed; - } + } + if (databaseId != null && endpoint == null) { + Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId); + ChannelEndpoint routed = + preferLeaderOverride != null + ? finder.findServer(reqBuilder, preferLeaderOverride) + : finder.findServer(reqBuilder); + endpoint = routed; } return new RoutingDecision(finder, endpoint); } @@ -501,14 +554,20 @@ private RoutingDecision routeFromRequest(ReadRequest.Builder reqBuilder) { private RoutingDecision routeFromRequest(ExecuteSqlRequest.Builder reqBuilder) { String databaseId = parentChannel.extractDatabaseIdFromSession(reqBuilder.getSession()); ByteString transactionId = transactionIdFromSelector(reqBuilder.getTransaction()); - ChannelEndpoint endpoint = parentChannel.affinityEndpoint(transactionId); + // Skip affinity for read-only transactions so each query routes independently. + boolean isReadOnly = parentChannel.isReadOnlyTransaction(transactionId); + ChannelEndpoint endpoint = isReadOnly ? null : parentChannel.affinityEndpoint(transactionId); ChannelFinder finder = null; if (databaseId != null) { finder = parentChannel.getOrCreateChannelFinder(databaseId); - ChannelEndpoint routed = finder.findServer(reqBuilder); - if (endpoint == null) { - endpoint = routed; - } + } + if (databaseId != null && endpoint == null) { + Boolean preferLeaderOverride = parentChannel.readOnlyPreferLeader(transactionId); + ChannelEndpoint routed = + preferLeaderOverride != null + ? finder.findServer(reqBuilder, preferLeaderOverride) + : finder.findServer(reqBuilder); + endpoint = routed; } return new RoutingDecision(finder, endpoint); } @@ -554,7 +613,13 @@ public void onMessage(ResponseT message) { transactionId = transactionIdFromTransaction(response); } if (transactionId != null) { - call.maybeRecordAffinity(transactionId); + if (call.isReadOnlyBegin) { + // Track the read-only transaction so subsequent reads skip affinity + // and route independently based on key-based routing. + call.parentChannel.trackReadOnlyTransaction(transactionId, call.readOnlyIsStrong); + } else if (!call.parentChannel.isReadOnlyTransaction(transactionId)) { + call.maybeRecordAffinity(transactionId); + } } super.onMessage(message); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java index 21c27604c9..123ffba1d4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/KeyAwareChannelTest.java @@ -28,13 +28,18 @@ import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.Group; +import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Range; +import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.ResultSet; +import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.RollbackRequest; import com.google.spanner.v1.RoutingHint; import com.google.spanner.v1.SpannerGrpc; import com.google.spanner.v1.Tablet; import com.google.spanner.v1.Transaction; +import com.google.spanner.v1.TransactionOptions; +import com.google.spanner.v1.TransactionSelector; import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.ManagedChannel; @@ -269,6 +274,384 @@ public void resultSetCacheUpdateRoutesSubsequentRequest() throws Exception { assertThat(harness.endpointCache.callCountForAddress("routed:1234")).isEqualTo(1); } + @Test + public void readOnlyTransactionRoutesEachReadIndependently() throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("ro-tx-1"); + + // 1. Begin a read-only transaction (stale read). + ClientCall beginCall = + harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); + CapturingListener beginListener = new CapturingListener<>(); + beginCall.start(beginListener, new Metadata()); + beginCall.sendMessage( + BeginTransactionRequest.newBuilder() + .setSession(SESSION) + .setOptions( + TransactionOptions.newBuilder() + .setReadOnly( + TransactionOptions.ReadOnly.newBuilder() + .setReturnReadTimestamp(true) + .build())) + .build()); + + // BeginTransaction goes to default channel. + assertThat(harness.defaultManagedChannel.callCount()).isEqualTo(1); + + @SuppressWarnings("unchecked") + RecordingClientCall beginDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + beginDelegate.emitOnMessage(Transaction.newBuilder().setId(transactionId).build()); + beginDelegate.emitOnClose(Status.OK, new Metadata()); + + // 2. Populate cache with routing data for two different key ranges. + CacheUpdate cacheUpdate = + CacheUpdate.newBuilder() + .setDatabaseId(7L) + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("m")) + .setGroupUid(1L) + .setSplitId(1L) + .setGeneration(bytes("1"))) + .addRange( + Range.newBuilder() + .setStartKey(bytes("m")) + .setLimitKey(bytes("z")) + .setGroupUid(2L) + .setSplitId(2L) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(1L) + .setGeneration(bytes("1")) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1L) + .setServerAddress("server-a:1234") + .setIncarnation(bytes("1")) + .setDistance(0))) + .addGroup( + Group.newBuilder() + .setGroupUid(2L) + .setGeneration(bytes("1")) + .addTablets( + Tablet.newBuilder() + .setTabletUid(2L) + .setServerAddress("server-b:1234") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + + // Seed the cache via a dummy query response with cache update. + ClientCall seedCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT); + seedCall.start(new CapturingListener(), new Metadata()); + seedCall.sendMessage( + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("a")).build()) + .build()); + @SuppressWarnings("unchecked") + RecordingClientCall seedDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + seedDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build()); + + // 3. Send a streaming read with key in range [a, m) → should go to server-a. + ClientCall readCallA = + harness.channel.newCall(SpannerGrpc.getStreamingReadMethod(), CallOptions.DEFAULT); + readCallA.start(new CapturingListener(), new Metadata()); + readCallA.sendMessage( + ReadRequest.newBuilder() + .setSession(SESSION) + .setTransaction(TransactionSelector.newBuilder().setId(transactionId)) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("b")).build()) + .build()); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + + // 4. Send an ExecuteStreamingSql with key in range [m, z) → should go to server-b. + ClientCall queryCallB = + harness.channel.newCall(SpannerGrpc.getExecuteStreamingSqlMethod(), CallOptions.DEFAULT); + queryCallB.start(new CapturingListener(), new Metadata()); + queryCallB.sendMessage( + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setTransaction(TransactionSelector.newBuilder().setId(transactionId)) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("n")).build()) + .build()); + + assertThat(harness.endpointCache.callCountForAddress("server-b:1234")).isEqualTo(1); + + // Neither read was pinned to the default host (besides the initial begin + seed). + // default had: 1 begin + 1 seed = 2 calls + assertThat(harness.defaultManagedChannel.callCount()).isEqualTo(2); + } + + @Test + public void readOnlyInlinedBeginExecuteSqlRoutesSubsequentRequestsIndependently() + throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("ro-inline-sql"); + + seedCache(harness, createTwoRangeCacheUpdate()); + + // First query begins a read-only transaction inline and routes to server-a. + ClientCall firstCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT); + firstCall.start(new CapturingListener(), new Metadata()); + firstCall.sendMessage( + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setTransaction( + TransactionSelector.newBuilder() + .setBegin( + TransactionOptions.newBuilder() + .setReadOnly( + TransactionOptions.ReadOnly.newBuilder() + .setReturnReadTimestamp(true) + .build()) + .build())) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("b")).build()) + .build()); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + + @SuppressWarnings("unchecked") + RecordingClientCall firstDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + firstDelegate.emitOnMessage( + ResultSet.newBuilder() + .setMetadata( + ResultSetMetadata.newBuilder() + .setTransaction(Transaction.newBuilder().setId(transactionId))) + .build()); + + // Second query in same txn should route by key to server-b, not affinity-pin to server-a. + ClientCall secondCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT); + secondCall.start(new CapturingListener(), new Metadata()); + secondCall.sendMessage( + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setTransaction(TransactionSelector.newBuilder().setId(transactionId)) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("n")).build()) + .build()); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + assertThat(harness.endpointCache.callCountForAddress("server-b:1234")).isEqualTo(1); + assertThat(harness.defaultManagedChannel.callCount()).isEqualTo(1); + } + + @Test + public void readOnlyInlinedBeginReadRoutesSubsequentRequestsIndependently() throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("ro-inline-read"); + + seedCache(harness, createTwoRangeCacheUpdate()); + + // First read begins a read-only transaction inline and routes to server-a. + ClientCall firstCall = + harness.channel.newCall(SpannerGrpc.getStreamingReadMethod(), CallOptions.DEFAULT); + firstCall.start(new CapturingListener(), new Metadata()); + firstCall.sendMessage( + ReadRequest.newBuilder() + .setSession(SESSION) + .setTransaction( + TransactionSelector.newBuilder() + .setBegin( + TransactionOptions.newBuilder() + .setReadOnly( + TransactionOptions.ReadOnly.newBuilder() + .setReturnReadTimestamp(true) + .build()) + .build())) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("b")).build()) + .build()); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + + @SuppressWarnings("unchecked") + RecordingClientCall firstDelegate = + (RecordingClientCall) + harness.endpointCache.latestCallForAddress("server-a:1234"); + firstDelegate.emitOnMessage( + PartialResultSet.newBuilder() + .setMetadata( + ResultSetMetadata.newBuilder() + .setTransaction(Transaction.newBuilder().setId(transactionId))) + .build()); + + // Second read in same txn should route by key to server-b, not affinity-pin to server-a. + ClientCall secondCall = + harness.channel.newCall(SpannerGrpc.getStreamingReadMethod(), CallOptions.DEFAULT); + secondCall.start(new CapturingListener(), new Metadata()); + secondCall.sendMessage( + ReadRequest.newBuilder() + .setSession(SESSION) + .setTransaction(TransactionSelector.newBuilder().setId(transactionId)) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("n")).build()) + .build()); + + assertThat(harness.endpointCache.callCountForAddress("server-a:1234")).isEqualTo(1); + assertThat(harness.endpointCache.callCountForAddress("server-b:1234")).isEqualTo(1); + assertThat(harness.defaultManagedChannel.callCount()).isEqualTo(1); + } + + @Test + public void readOnlyTransactionDoesNotRecordAffinity() throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("ro-tx-2"); + + // Begin a read-only transaction. + ClientCall beginCall = + harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); + beginCall.start(new CapturingListener(), new Metadata()); + beginCall.sendMessage( + BeginTransactionRequest.newBuilder() + .setSession(SESSION) + .setOptions( + TransactionOptions.newBuilder() + .setReadOnly( + TransactionOptions.ReadOnly.newBuilder() + .setReturnReadTimestamp(true) + .build())) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall beginDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + beginDelegate.emitOnMessage(Transaction.newBuilder().setId(transactionId).build()); + beginDelegate.emitOnClose(Status.OK, new Metadata()); + + // No affinity should be recorded for the default endpoint. + // Verify by checking that the endpoint cache was never queried for affinity lookup. + // The default endpoint getCount tracks affinity lookups. + assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); + + // Send a read using the transaction ID (no cache populated, so falls back to default). + ClientCall readCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT); + readCall.start(new CapturingListener(), new Metadata()); + readCall.sendMessage( + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setTransaction(TransactionSelector.newBuilder().setId(transactionId)) + .build()); + + // The read goes to default (no cache data), but NOT because of affinity. + // No affinity lookup should have been performed for the read-only txn. + assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); + + // Now receive a response with the transaction ID — should NOT record affinity. + @SuppressWarnings("unchecked") + RecordingClientCall readDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + readDelegate.emitOnMessage( + ResultSet.newBuilder() + .setMetadata( + ResultSetMetadata.newBuilder() + .setTransaction(Transaction.newBuilder().setId(transactionId))) + .build()); + + // Still no affinity recorded. + assertThat(harness.endpointCache.getCount(DEFAULT_ADDRESS)).isEqualTo(0); + } + + @Test + public void readOnlyTransactionCleanupOnClose() throws Exception { + TestHarness harness = createHarness(); + ByteString transactionId = ByteString.copyFromUtf8("ro-tx-3"); + + // Begin a read-only transaction. + ClientCall beginCall = + harness.channel.newCall(SpannerGrpc.getBeginTransactionMethod(), CallOptions.DEFAULT); + beginCall.start(new CapturingListener(), new Metadata()); + beginCall.sendMessage( + BeginTransactionRequest.newBuilder() + .setSession(SESSION) + .setOptions( + TransactionOptions.newBuilder() + .setReadOnly( + TransactionOptions.ReadOnly.newBuilder() + .setReturnReadTimestamp(true) + .build())) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall beginDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + beginDelegate.emitOnMessage(Transaction.newBuilder().setId(transactionId).build()); + beginDelegate.emitOnClose(Status.OK, new Metadata()); + + // Clear transaction affinity (simulates MultiUseReadOnlyTransaction.close()). + harness.channel.clearTransactionAffinity(transactionId); + } + + private static CacheUpdate createTwoRangeCacheUpdate() { + return CacheUpdate.newBuilder() + .setDatabaseId(7L) + .addRange( + Range.newBuilder() + .setStartKey(bytes("a")) + .setLimitKey(bytes("m")) + .setGroupUid(1L) + .setSplitId(1L) + .setGeneration(bytes("1"))) + .addRange( + Range.newBuilder() + .setStartKey(bytes("m")) + .setLimitKey(bytes("z")) + .setGroupUid(2L) + .setSplitId(2L) + .setGeneration(bytes("1"))) + .addGroup( + Group.newBuilder() + .setGroupUid(1L) + .setGeneration(bytes("1")) + .addTablets( + Tablet.newBuilder() + .setTabletUid(1L) + .setServerAddress("server-a:1234") + .setIncarnation(bytes("1")) + .setDistance(0))) + .addGroup( + Group.newBuilder() + .setGroupUid(2L) + .setGeneration(bytes("1")) + .addTablets( + Tablet.newBuilder() + .setTabletUid(2L) + .setServerAddress("server-b:1234") + .setIncarnation(bytes("1")) + .setDistance(0))) + .build(); + } + + private static void seedCache(TestHarness harness, CacheUpdate cacheUpdate) { + ClientCall seedCall = + harness.channel.newCall(SpannerGrpc.getExecuteSqlMethod(), CallOptions.DEFAULT); + seedCall.start(new CapturingListener(), new Metadata()); + seedCall.sendMessage( + ExecuteSqlRequest.newBuilder() + .setSession(SESSION) + .setRoutingHint(RoutingHint.newBuilder().setKey(bytes("a")).build()) + .build()); + + @SuppressWarnings("unchecked") + RecordingClientCall seedDelegate = + (RecordingClientCall) + harness.defaultManagedChannel.latestCall(); + seedDelegate.emitOnMessage(ResultSet.newBuilder().setCacheUpdate(cacheUpdate).build()); + } + private static TestHarness createHarness() throws IOException { FakeEndpointCache endpointCache = new FakeEndpointCache(DEFAULT_ADDRESS); InstantiatingGrpcChannelProvider provider = @@ -359,6 +742,17 @@ int callCountForAddress(String address) { FakeEndpoint endpoint = endpoints.get(address); return endpoint == null ? 0 : endpoint.channel.callCount(); } + + RecordingClientCall latestCallForAddress(String address) { + if (defaultAddress.equals(address)) { + return defaultEndpoint.channel.latestCall(); + } + FakeEndpoint endpoint = endpoints.get(address); + if (endpoint == null) { + throw new IllegalStateException("No endpoint for address: " + address); + } + return endpoint.channel.latestCall(); + } } private static final class FakeEndpoint implements ChannelEndpoint {