diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index d314cb0a8dfa..a8860be63a14 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -1521,4 +1521,29 @@ public enum CQLStartTime public boolean enforce_native_deadline_for_hints = false; public boolean paxos_repair_race_wait = true; + + /** + * If true, gossip state updates for nodes which have left the cluster will continue to be processed while the + * node is still present in ClusterMetadata. This enables the gossip expiry time for those nodes (the deadline + * after which their state is fully purged from gossip) to converge across the remaining nodes in the cluster. + * This is a change from previous behaviour as historically once a node has advertised a LEFT status further + * updates to gossip state for it are ignored for a period of time to prevent flapping if older/stale states + * are encountered. + * Following CEP-21, most significant state changes are handled by the cluster metadata log, so resurrection + * of left nodes is not a problem for gossip to solve and so quarantine is not really necessary. However, + * FailureDetector does still use gossip messages to assess node health and some external systems still use gossip + * state to inform decisions about topology/node health/etc. For those reasons, for now the disabling of quarantine + * is off by default and hot-proppable. + * + * With quarantine still in effect, expiry from gossip of LEFT nodes will occur at different times on each peer. + * Also, when there are LEFT nodes in gossip, the state will never fully converge across the cluster as each node + * will have its own expiry time for a LEFT peer. + * + * With quarantine disabled the STATUS_WITH_PORT values for the left node which include the expiry time will + * converge and peers will all evict it from gossip after the same deadline. + * + * Eventually, this configuration option should be removed and quarantine disabled entirely for clusters running + * 6.0 and later. + */ + public volatile boolean gossip_quarantine_disabled = false; } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 09100da8d3e9..9e28199f7c47 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -6119,4 +6119,14 @@ public static void setPartitioner(String name) { partitioner = FBUtilities.newPartitioner(name); } + + public static boolean getGossipQuarantineDisabled() + { + return conf.gossip_quarantine_disabled; + } + + public static void setGossipQuarantineDisabled(boolean disabled) + { + conf.gossip_quarantine_disabled = disabled; + } } diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 26fe33ec698c..41a79caf7f59 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -324,7 +324,7 @@ public CassandraVersion getReleaseVersion() public String toString() { View view = ref.get(); - return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState; + return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState + ", isAlive = " + isAlive; } public boolean isSupersededBy(EndpointState that) diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 83bf5169af65..6d0195093ffe 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -87,6 +87,7 @@ import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.utils.FBUtilities; @@ -107,6 +108,10 @@ import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED; import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS; import static org.apache.cassandra.gms.VersionedValue.HIBERNATE; +import static org.apache.cassandra.gms.VersionedValue.REMOVED_TOKEN; +import static org.apache.cassandra.gms.VersionedValue.REMOVING_TOKEN; +import static org.apache.cassandra.gms.VersionedValue.SHUTDOWN; +import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT; import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.ECHO_REQ; @@ -137,8 +142,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, private static final ScheduledExecutorPlus executor = executorFactory().scheduled("GossipTasks"); static final ApplicationState[] STATES = ApplicationState.values(); - static final List DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, - VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE); + static final List DEAD_STATES = Arrays.asList(REMOVING_TOKEN, REMOVED_TOKEN, STATUS_LEFT, HIBERNATE); static ArrayList SILENT_SHUTDOWN_STATES = new ArrayList<>(); static { @@ -185,7 +189,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, /* map where key is endpoint and value is timestamp when this endpoint was removed from * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time * after removal to prevent nodes from falsely reincarnating during the time when removal - * gossip gets propagated to all nodes */ + * gossip gets propagated to all nodes. + * Note: in future, this need only be used when ClusterMetadataService is in the GOSSIP state, + * i.e. during the major upgrade to the version with CEP-21, but before the CMS is initialized. + * In this state, gossip is still used to propagate changes to broadcast address and release + * version. Once the CMS initialization is complete, this is no longer necessary. + * Currently in order to support a controlled rollout of that change to behaviour, quarantine + * is still used by default, but can be disabled via config (gossip_quarantine_disabled) or + * JMX (GossiperMBean::setQuarantineDisabled) + */ private final Map justRemovedEndpoints = new ConcurrentHashMap<>(); private final Map expireTimeEndpointMap = new ConcurrentHashMap<>(); @@ -450,14 +462,7 @@ private static boolean isShutdown(EndpointState epState) public static boolean isShutdown(VersionedValue vv) { - if (vv == null) - return false; - - String value = vv.value; - String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); - assert (pieces.length > 0); - String state = pieces[0]; - return state.equals(VersionedValue.SHUTDOWN); + return matchesStatusString(vv, SHUTDOWN); } public static boolean isHibernate(EndpointState epState) @@ -469,15 +474,39 @@ public static boolean isHibernate(EndpointState epState) } public static boolean isHibernate(VersionedValue vv) + { + return matchesStatusString(vv, HIBERNATE); + } + + public static boolean isLeft(VersionedValue vv) + { + return matchesStatusString(vv, STATUS_LEFT); + } + + private static boolean matchesStatusString(VersionedValue vv, String toMatch) { if (vv == null) return false; - String value = vv.value; - String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); + String[] pieces = vv.splitValue(); assert (pieces.length > 0); String state = pieces[0]; - return state.equals(VersionedValue.HIBERNATE); + return state.equals(toMatch); + } + + public static long extractExpireTime(String[] pieces) + { + if (pieces.length < 3) + return 0L; + try + { + return Long.parseLong(pieces[2]); + } + catch (NumberFormatException e) + { + logger.debug("Invalid value found for expire time ({}), ignoring", pieces[2]); + return 0L; + } } public static void runInGossipStageBlocking(Runnable runnable) @@ -696,10 +725,21 @@ private void quarantineEndpoint(InetAddressAndPort endpoint, long quarantineExpi { if (disableEndpointRemoval) return; + + // Quarantine is only necessary while upgrading from gossip-driven management of cluster metadata + if (getQuarantineDisabled() && ClusterMetadata.current().epoch.isAfter(Epoch.UPGRADE_GOSSIP)) + return; + justRemovedEndpoints.put(endpoint, quarantineExpiration); GossiperDiagnostics.quarantinedEndpoint(this, endpoint, quarantineExpiration); } + public void clearQuarantinedEndpoints() + { + logger.info("Clearing quarantined endpoints"); + justRemovedEndpoints.clear(); + } + /** * The gossip digest is built based on randomization * rather than just looping through the collection of live endpoints. @@ -948,16 +988,7 @@ void doStatusCheck() } // check for dead state removal - long expireTime = getExpireTimeForEndpoint(endpoint); - if (!epState.isAlive() && (now > expireTime) - && (!metadata.directory.allAddresses().contains(endpoint))) - { - if (logger.isDebugEnabled()) - { - logger.debug("time is expiring for endpoint : {} ({})", endpoint, expireTime); - } - runInGossipStageBlocking(() -> evictFromMembership(endpoint)); - } + evictIfExpired(endpoint, epState, metadata.directory, now); } } @@ -975,7 +1006,21 @@ void doStatusCheck() } } - protected long getExpireTimeForEndpoint(InetAddressAndPort endpoint) + @VisibleForTesting + void evictIfExpired(InetAddressAndPort endpoint, EndpointState epState, Directory directory, long now) + { + if (!epState.isAlive() && (!directory.allJoinedEndpoints().contains(endpoint))) + { + long expireTime = getExpireTimeForEndpoint(endpoint); + if (now > expireTime) + { + logger.info("Reached gossip expiry time for endpoint : {} ({})", endpoint, expireTime); + runInGossipStageBlocking(() -> evictFromMembership(endpoint)); + } + } + } + + long getExpireTimeForEndpoint(InetAddressAndPort endpoint) { /* default expireTime is aVeryLongTime */ Long storedTime = expireTimeEndpointMap.get(endpoint); @@ -1897,11 +1942,15 @@ public int getCurrentGenerationNumber(String address) throws UnknownHostExceptio public void addExpireTimeForEndpoint(InetAddressAndPort endpoint, long expireTime) { - if (logger.isDebugEnabled()) + if (expireTime == 0L) + { + logger.debug("Supplied expire time for {} was 0, not recording", endpoint); + } + else { logger.debug("adding expire time for endpoint : {} ({})", endpoint, expireTime); + expireTimeEndpointMap.put(endpoint, expireTime); } - expireTimeEndpointMap.put(endpoint, expireTime); } public static long computeExpireTime() @@ -2104,6 +2153,50 @@ public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep) MessagingService.instance().send(message, ep); } + public void unsafeBroadcastLeftStatus(InetAddressAndPort left, + Collection tokens, + Iterable sendTo) + { + runInGossipStageBlocking(() -> { + EndpointState epState = endpointStateMap.get(left); + if (epState == null) + { + logger.info("No gossip state for node {}", left); + return; + } + + NodeState state = ClusterMetadata.current().directory.peerState(left); + if (state != NodeState.LEFT) + { + logger.info("Node Status for {} is not LEFT ({})", left, state); + return; + } + + EndpointState toSend = new EndpointState(epState); + toSend.forceNewerGenerationUnsafe(); + toSend.markDead(); + VersionedValue value = StorageService.instance.valueFactory.left(tokens, computeExpireTime()); + + if (left.equals(getBroadcastAddressAndPort())) + { + // Adding local state bumps the value's version. To keep this consistent across + // the cluster, re-fetch it before broadcasting. + Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS_WITH_PORT, value); + value = Gossiper.instance.endpointStateMap.get(getBroadcastAddressAndPort()) + .getApplicationState(ApplicationState.STATUS_WITH_PORT); + } + + toSend.addApplicationState(ApplicationState.STATUS_WITH_PORT, value); + GossipDigestAck2 payload = new GossipDigestAck2(Collections.singletonMap(left, toSend)); + logger.info("Sending app state with status {} to {}", value.value, sendTo); + for (InetAddressAndPort ep : sendTo) + { + Message message = Message.out(Verb.GOSSIP_DIGEST_ACK2, payload); + MessagingService.instance().send(message, ep); + } + }); + } + private void unsafeUpdateEpStates(InetAddressAndPort endpoint, EndpointState epstate) { checkProperThreadForStateMutation(); @@ -2221,9 +2314,31 @@ private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collecti newValue = valueFactory.hibernate(true); break; } + if (isLocal && !StorageService.instance.shouldJoinRing()) break; - newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue); + + // If quarantine has been disabled and we have already seen a LEFT status for a remote peer + // which originated from the peer itself or the node which coordinated its removal (and so + // has a version > 0), keep it as this is how we ensure the gossip expiry time encoded in + // the status string converges across peers. + // Should a node leave and then rejoin after resetting its local state (i.e. wipe and + // rejoin), it is automatically unregistered which removes all gossip state for it so there + // will be no oldValue in that case. + // + // Note: don't reorder these conditions as isLeft includes a null check + if (getQuarantineDisabled() && !isLocal && Gossiper.isLeft(oldValue) && oldValue.version > 0) + { + logger.debug("Already seen a LEFT status for {} with a non-zero version, " + + "dropping derived value {}", endpoint, newValue); + newValue = oldValue; + } + else + { + newValue = GossipHelper.nodeStateToStatus(nodeId, metadata, tokens, valueFactory, oldValue); + if (Gossiper.isLeft(newValue)) + Gossiper.instance.addExpireTimeForEndpoint(endpoint, Gossiper.extractExpireTime(newValue.splitValue())); + } break; default: newValue = oldValue; @@ -2269,4 +2384,17 @@ public void triggerRoundWithCMS() sendGossip(message, cms); } } + + @Override + public boolean getQuarantineDisabled() + { + return DatabaseDescriptor.getGossipQuarantineDisabled(); + } + + @Override + public void setQuarantineDisabled(boolean enabled) + { + logger.info("Setting gossip_quarantine_disabled: {}", enabled); + DatabaseDescriptor.setGossipQuarantineDisabled(enabled); + } } diff --git a/src/java/org/apache/cassandra/gms/GossiperMBean.java b/src/java/org/apache/cassandra/gms/GossiperMBean.java index 0552883a60e5..3d46887b0eda 100644 --- a/src/java/org/apache/cassandra/gms/GossiperMBean.java +++ b/src/java/org/apache/cassandra/gms/GossiperMBean.java @@ -43,4 +43,8 @@ public interface GossiperMBean public boolean getLooseEmptyEnabled(); public void setLooseEmptyEnabled(boolean enabled); + + public boolean getQuarantineDisabled(); + + public void setQuarantineDisabled(boolean disabled); } diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index b03211b05c64..126aecf70336 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -159,6 +159,11 @@ public byte[] toBytes() return value.getBytes(ISO_8859_1); } + public String[] splitValue() + { + return value.split(DELIMITER_STR, -1); + } + private static String versionString(String... args) { return StringUtils.join(args, VersionedValue.DELIMITER); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 3dc5adf5a5a0..e6b051e57084 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2158,6 +2158,12 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio Gossiper.instance.markDead(endpoint, epState); }); } + else if (Gossiper.isLeft(value)) + { + long expireTime = Gossiper.extractExpireTime(value.splitValue()); + logger.info("Node state LEFT detected, setting or updating expire time {}", expireTime); + Gossiper.instance.addExpireTimeForEndpoint(endpoint, expireTime); + } } if (epState == null || Gossiper.instance.isDeadState(epState)) @@ -2207,7 +2213,7 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio updateNetVersion(endpoint, value); break; case STATUS_WITH_PORT: - String[] pieces = splitValue(value); + String[] pieces = value.splitValue(); String moveName = pieces[0]; if (moveName.equals(VersionedValue.SHUTDOWN)) logger.info("Node {} state jump to shutdown", endpoint); @@ -2226,11 +2232,6 @@ else if (moveName.equals(VersionedValue.STATUS_NORMAL)) } } - private static String[] splitValue(VersionedValue value) - { - return value.value.split(VersionedValue.DELIMITER_STR, -1); - } - public static void updateIndexStatus(InetAddressAndPort endpoint, VersionedValue versionedValue) { IndexStatusManager.instance.receivePeerIndexStatus(endpoint, versionedValue); diff --git a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java index fa1fe9d25ba0..3f77db174f7f 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java @@ -65,8 +65,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean next.tokenMap.lastModified().equals(prev.tokenMap.lastModified())) return; - Set removedAddr = Sets.difference(new HashSet<>(prev.directory.allAddresses()), - new HashSet<>(next.directory.allAddresses())); + Set removedAddr = Sets.difference(prev.directory.allAddresses(), next.directory.allAddresses()); Set changed = new HashSet<>(); for (NodeId node : next.directory.peerIds()) diff --git a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java index ff24e17b9d95..67931af1779c 100644 --- a/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java +++ b/src/java/org/apache/cassandra/tcm/listeners/UpgradeMigrationListener.java @@ -35,5 +35,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean logger.info("Detected upgrade from gossip mode, updating my host id in gossip to {}", next.myNodeId()); Gossiper.instance.mergeNodeToGossip(next.myNodeId(), next); + if (Gossiper.instance.getQuarantineDisabled()) + Gossiper.instance.clearQuarantinedEndpoints(); } } diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index 8e73f7f34189..ec5f2c4a9dd9 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -385,9 +384,9 @@ public boolean isEmpty() * those cases use allJoinedEndpoints. * @return */ - public ImmutableList allAddresses() + public Set allAddresses() { - return ImmutableList.copyOf(peers.values()); + return peers.values(); } public NavigableSet peerIds() diff --git a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java index 58c5f024f9c0..9da95b8aff2b 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java +++ b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java @@ -18,6 +18,7 @@ package org.apache.cassandra.tcm.sequences; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -38,6 +39,7 @@ import org.apache.cassandra.tcm.transformations.CancelInProgressSequence; import org.apache.cassandra.tcm.transformations.PrepareLeave; import org.apache.cassandra.tcm.transformations.PrepareMove; +import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.service.StorageService.Mode.LEAVING; import static org.apache.cassandra.service.StorageService.Mode.MOVE_FAILED; @@ -73,7 +75,7 @@ static void decommission(boolean shutdownNetworking, boolean force) logger.debug("DECOMMISSIONING"); NodeId self = metadata.myNodeId(); - + Collection tokens = metadata.tokenMap.tokens(self); ReconfigureCMS.maybeReconfigureCMS(metadata, getBroadcastAddressAndPort()); MultiStepOperation inProgress = metadata.inProgressSequences.get(self); @@ -95,6 +97,9 @@ else if (InProgressSequences.isLeave(inProgress)) } InProgressSequences.finishInProgressSequences(self); + Gossiper.instance.unsafeBroadcastLeftStatus(FBUtilities.getBroadcastAddressAndPort(), + tokens, + metadata.directory.allJoinedEndpoints()); if (shutdownNetworking) StorageService.instance.shutdownNetworking(); } @@ -134,12 +139,13 @@ static void removeNode(NodeId toRemove, boolean force) ReconfigureCMS.maybeReconfigureCMS(metadata, endpoint); logger.info("starting removenode with {} {}", metadata.epoch, toRemove); - + Collection tokens = metadata.tokenMap.tokens(toRemove); ClusterMetadataService.instance().commit(new PrepareLeave(toRemove, force, ClusterMetadataService.instance().placementProvider(), LeaveStreams.Kind.REMOVENODE)); InProgressSequences.finishInProgressSequences(toRemove); + Gossiper.instance.unsafeBroadcastLeftStatus(endpoint, tokens, metadata.directory.allJoinedEndpoints()); } static void abortRemoveNode(String nodeId) diff --git a/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java b/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java index 76bca253ee93..f1bb6d50d317 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java +++ b/src/java/org/apache/cassandra/tcm/transformations/Assassinate.java @@ -18,8 +18,12 @@ package org.apache.cassandra.tcm.transformations; +import java.util.Collection; + import com.google.common.collect.ImmutableSet; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; @@ -64,8 +68,10 @@ public static void assassinateEndpoint(InetAddressAndPort endpoint) ReconfigureCMS.maybeReconfigureCMS(metadata, endpoint); NodeId nodeId = metadata.directory.peerId(endpoint); - ClusterMetadataService.instance().commit(new Assassinate(nodeId, - ClusterMetadataService.instance().placementProvider())); + Collection tokens = metadata.tokenMap.tokens(nodeId); + ClusterMetadataService.instance() + .commit(new Assassinate(nodeId, ClusterMetadataService.instance().placementProvider())); + Gossiper.instance.unsafeBroadcastLeftStatus(endpoint, tokens, metadata.directory.allJoinedEndpoints()); } @Override diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index d6479e1813ed..fedc1a7c5568 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -46,7 +46,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; import accord.topology.EpochReady; @@ -805,7 +804,7 @@ public static Epoch snapshotClusterMetadata(IInvokableInstance inst) public static Map getPeerEpochs(IInvokableInstance requester) { Map map = requester.callOnInstance(() -> { - ImmutableList peers = ClusterMetadata.current().directory.allAddresses(); + Set peers = ClusterMetadata.current().directory.allAddresses(); CountDownLatch latch = CountDownLatch.newCountDownLatch(peers.size()); Map epochs = new ConcurrentHashMap<>(peers.size()); peers.forEach(peer -> { diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java index a08ba58a06ba..15c532422a61 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java @@ -105,13 +105,17 @@ public static void beforeClass() throws Throwable ParameterizedClass seeds = new ParameterizedClass(SimpleSeedProvider.class.getName(), Collections.singletonMap("seeds", "127.0.0.2")); + // TODO: This currently requires the accord service to be disabled as some tests remove a node from the + // cluster and re-join it using the same broadcast address. See CASSANDRA-21026 Consumer conf = config -> config .set("paxos_variant", "v2") .set("write_request_timeout", REQUEST_TIMEOUT) .set("cas_contention_timeout", CONTENTION_TIMEOUT) .set("request_timeout", REQUEST_TIMEOUT) .set("seed_provider", seeds) - .set("auto_bootstrap", config.num() == 2); + .set("auto_bootstrap", config.num() == 2) + .set("accord.enabled", false); + // TODO: fails with vnode enabled THREE_NODES = init(Cluster.build(3).withConfig(conf).withoutVNodes().start()); FOUR_NODES = init(Cluster.build(4).withConfig(conf).withoutVNodes().start(), 3); @@ -830,18 +834,28 @@ private void joinFully(Cluster cluster, int node) { IInstanceConfig config = cluster.get(node).config(); InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress()); + String dc = config.localDatacenter(); + String rack = config.localRack(); IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner")); Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token")); - cluster.get(node).runOnInstance(() -> ClusterMetadataTestHelper.join(address, token)); + cluster.get(node).runOnInstance(() -> { + ClusterMetadataTestHelper.register(address, dc, rack); + ClusterMetadataTestHelper.join(address, token); + }); } private void joinPartially(Cluster cluster, int node) { IInstanceConfig config = cluster.get(node).config(); InetAddressAndPort address = InetAddressAndPort.getByAddress(config.broadcastAddress()); + String dc = config.localDatacenter(); + String rack = config.localRack(); IPartitioner partitioner = FBUtilities.newPartitioner(config.getString("partitioner")); Token token = partitioner.getTokenFactory().fromString(config.getString("initial_token")); - cluster.get(node).runOnInstance(() -> ClusterMetadataTestHelper.joinPartially(address, token)); + cluster.get(node).runOnInstance(() -> { + ClusterMetadataTestHelper.register(address, dc, rack); + ClusterMetadataTestHelper.joinPartially(address, token); + }); } private void finishJoin(Cluster cluster, int node) diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java index 1c441ded3ef7..715a091dca26 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java @@ -193,7 +193,10 @@ public static void assertVisibleInRing(IInstance peer) public static void removeFromRing(IInvokableInstance peer) { - peer.runOnInstance(() -> ClusterMetadataTestHelper.leave(FBUtilities.getBroadcastAddressAndPort())); + peer.runOnInstance(() -> { + ClusterMetadataTestHelper.leave(FBUtilities.getBroadcastAddressAndPort()); + ClusterMetadataTestHelper.unregister(FBUtilities.getBroadcastAddressAndPort()); + }); } public static void assertNotVisibleInRing(IInstance peer) diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java index 9c52ac92a9cc..cb5cb1f470f5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java @@ -251,7 +251,7 @@ static void populate(Cluster cluster) } @Test - public void testQuarantine() throws IOException + public void testReplacedNodeRemovedFromGossip() throws IOException { TokenSupplier even = TokenSupplier.evenlyDistributedTokens(4, 1); try (Cluster cluster = Cluster.build(4) diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java new file mode 100644 index 000000000000..4bcae67a2aaa --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterAssassinateTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.gossip; + +import java.net.InetSocketAddress; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; + +public class GossipExpiryAfterAssassinateTest extends GossipExpiryTestBase +{ + @Override + void doRemoval(Cluster cluster, IInvokableInstance toRemove) + { + // Shut down one peer, then have another assassinate it. The coordinating node will gossip a final LEFT status, + // including the expiry time it calculated to the remaining members. + IInvokableInstance coordinator = cluster.get(1); + if (coordinator.equals(toRemove)) + throw new IllegalArgumentException("Node cannot assassinate itself"); + + try + { + InetSocketAddress toAssassinate = toRemove.broadcastAddress(); + toRemove.shutdown().get(); + coordinator.nodetoolResult("assassinate", toAssassinate.getHostString()).asserts().success(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java new file mode 100644 index 000000000000..5b3f4e09fd2a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterDecommissionTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.gossip; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; + +public class GossipExpiryAfterDecommissionTest extends GossipExpiryTestBase +{ + @Override + void doRemoval(Cluster cluster, IInvokableInstance toRemove) + { + // Decommission one peer. Before shutting down messaging, the leaving node will gossip its final LEFT + // status, including the expiry time it calculated to the remaining members. + toRemove.nodetoolResult("decommission").asserts().success(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java new file mode 100644 index 000000000000..c9def985dd6b --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryAfterRemoveNodeTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.gossip; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.tcm.ClusterMetadata; + +public class GossipExpiryAfterRemoveNodeTest extends GossipExpiryTestBase +{ + @Override + void doRemoval(Cluster cluster, IInvokableInstance toRemove) + { + // Shut down one peer, then have another remove it. The coordinating node will gossip a final LEFT status, + // including the expiry time it calculated to the remaining members. + IInvokableInstance coordinator = cluster.get(1); + if (coordinator.equals(toRemove)) + throw new IllegalArgumentException("Node to be removed cannot act as removal coordinator"); + + try + { + String nodeId = toRemove.callOnInstance(() -> ClusterMetadata.current().myNodeId().toUUID().toString()); + toRemove.shutdown().get(); + coordinator.nodetoolResult("removenode", nodeId).asserts().success(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java new file mode 100644 index 000000000000..51d340a70c65 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipExpiryTestBase.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.gossip; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.WithProperties; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.gms.GossipExpiryHelper; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.listeners.LegacyStateListener; +import org.apache.cassandra.tcm.membership.NodeId; +import org.awaitility.Awaitility; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.cassandra.config.CassandraRelevantProperties.VERY_LONG_TIME_MS; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.gms.ApplicationState.STATUS_WITH_PORT; +import static org.apache.cassandra.gms.VersionedValue.STATUS_LEFT; + +public abstract class GossipExpiryTestBase extends TestBaseImpl +{ + private static final Logger logger = LoggerFactory.getLogger(GossipExpiryTestBase.class); + + abstract void doRemoval(Cluster cluster, IInvokableInstance toRemove); + + @Test + public void testExpiryOfLeftStateWithoutQuarantine() throws IOException + { + doTest(true); + } + + @Test + public void testExpiryOfLeftStateWithQuarantine() throws IOException + { + doTest(false); + } + + private void doTest(boolean withQuarantineDisabled) throws IOException + { + // This test verifies that when a node leaves the cluster, the expiry time for its state in gossip is + // recorded on each node and then expunged when that deadline is reached. By default, the expiry time for + // a left peer is calculated on each node independently, but if the gossip_quarantine_disabled config + // option is set to true it will converge and become consistent across the remaining members. + // + // * First we set the property that controls expiry time to 10s. This interval is added to the current wall + // clock time to calculate the expiry deadline. + // * Use bytebuddy to inject some jitter into the local expiry time calculation. Each node will individually + // calculate an expiry based on when it processes the completion of the operation that removes the node. If + // the config option is set the cluster should converge on the expiry time calculated by the node coordinating + // the operation. For decommission, this will be the leaving node itself and for removenode/assassinate it + // will be the coordinator. The jitter is to make sure that in the test, the nodes start off with differing + // expiry times. + // * Remove one peer via decommission, removenode or assassinate. + // * After maybe verifying the convergence, check that the state for the left node does in fact get removed. + try (WithProperties ignored = new WithProperties().set(VERY_LONG_TIME_MS, 11000); + Cluster cluster = builder().withNodes(5) + .withInstanceInitializer(GossipExpiryTestBase.BB::install) + .withConfig(config -> config.with(NETWORK, GOSSIP) + .set("gossip_quarantine_disabled", withQuarantineDisabled)) + .start()) + { + cluster.forEach(i -> i.runOnInstance(() -> BB.injectDelay.set(true))); + + IInvokableInstance toRemove = cluster.get(5); + String gossipStateKey = toRemove.config().broadcastAddress().getAddress().toString(); + doRemoval(cluster, toRemove); + if (withQuarantineDisabled) + { + // STATUS_WITH_PORT for the left node should converge to share the same expiry time across all nodes + Awaitility.waitAtMost(10, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> { + Set endpointStates = new HashSet<>(); + cluster.forEach(i -> { + if (!i.equals(toRemove)) + { + Map instanceState = ClusterUtils.gossipInfo(i).get(gossipStateKey); + if (instanceState != null && instanceState.containsKey(STATUS_WITH_PORT.name())) + endpointStates.add(instanceState.get(STATUS_WITH_PORT.name())); + } + }); + logger.info("Collected STATUS_WITH_PORT values: {}", endpointStates); + return endpointStates.size() == 1 && endpointStates.iterator() + .next() + .contains(STATUS_LEFT); + }); + } + + // Once the expiry time is reached, gossip state for the left node is purged + Awaitility.waitAtMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> { + AtomicBoolean purged = new AtomicBoolean(true); + cluster.forEach(i -> { + if (!i.equals(toRemove)) + { + // Expiry happens during periodic gossip tasks. Sometimes these may not run in a + // timely fashion, so for tests we can trigger the status check artificially. + i.runOnInstance(GossipExpiryHelper.evictExpiredFromGossip(toRemove)); + if (ClusterUtils.gossipInfo(i).containsKey(gossipStateKey)) + purged.set(false); + } + }); + return purged.get(); + }); + } + } + + public static class BB + { + static void install(ClassLoader cl, int nodeNumber) + { + if (nodeNumber != 2) + return; + new ByteBuddy().rebase(LegacyStateListener.class) + .method(named("processChangesToRemotePeers")) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + static AtomicBoolean injectDelay = new AtomicBoolean(false); + static Random random = new Random(System.nanoTime()); + + public static void processChangesToRemotePeers(ClusterMetadata prev, + ClusterMetadata next, + Set changed, + @SuperCall Callable zuper) throws Exception + { + if (injectDelay.get()) + TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)); + zuper.call(); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java index 43c147c2550c..cbdcc2239495 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java @@ -21,6 +21,7 @@ import java.net.UnknownHostException; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.Map; import java.util.Random; @@ -83,9 +84,9 @@ import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; import org.apache.cassandra.tcm.sequences.InProgressSequences; +import org.apache.cassandra.tcm.sequences.LeaveStreams; import org.apache.cassandra.tcm.sequences.LockedRanges; import org.apache.cassandra.tcm.sequences.Move; -import org.apache.cassandra.tcm.sequences.LeaveStreams; import org.apache.cassandra.tcm.sequences.ReconfigureCMS; import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave; import org.apache.cassandra.tcm.transformations.AlterSchema; @@ -94,6 +95,7 @@ import org.apache.cassandra.tcm.transformations.PrepareMove; import org.apache.cassandra.tcm.transformations.PrepareReplace; import org.apache.cassandra.tcm.transformations.Register; +import org.apache.cassandra.tcm.transformations.Unregister; import org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration; import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration; import org.apache.cassandra.utils.ByteBufferUtil; @@ -101,6 +103,10 @@ import org.apache.cassandra.utils.Throwables; import static org.apache.cassandra.schema.SchemaTestUtil.submit; +import static org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING; +import static org.apache.cassandra.tcm.membership.NodeState.BOOT_REPLACING; +import static org.apache.cassandra.tcm.membership.NodeState.LEFT; +import static org.apache.cassandra.tcm.membership.NodeState.REGISTERED; import static org.junit.Assert.assertEquals; public class ClusterMetadataTestHelper @@ -427,6 +433,25 @@ public static void leave(InetAddressAndPort endpoint) } } + public static void unregister(InetAddressAndPort endpoint) + { + unregister(nodeId(endpoint)); + } + + public static void unregister(NodeId nodeId) + { + try + { + commit(new Unregister(nodeId, + EnumSet.of(REGISTERED, BOOTSTRAPPING, BOOT_REPLACING, LEFT), + ClusterMetadataService.instance().placementProvider())); + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + } + public static JoinProcess lazyJoin(int nodeIdx, long token) { return lazyJoin(addr(nodeIdx), Collections.singleton(new Murmur3Partitioner.LongToken(token))); diff --git a/test/distributed/org/apache/cassandra/gms/GossipExpiryHelper.java b/test/distributed/org/apache/cassandra/gms/GossipExpiryHelper.java new file mode 100644 index 000000000000..ed808a6bcb6a --- /dev/null +++ b/test/distributed/org/apache/cassandra/gms/GossipExpiryHelper.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.gms; + + +import java.net.InetSocketAddress; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Directory; + +import static org.apache.cassandra.distributed.impl.TestEndpointCache.toCassandraInetAddressAndPort; + +public class GossipExpiryHelper +{ + public static IIsolatedExecutor.SerializableRunnable evictExpiredFromGossip(IInvokableInstance instance) + { + InetSocketAddress address = instance.config().broadcastAddress(); + return () -> { + Logger logger = LoggerFactory.getLogger(Gossiper.class); + Directory directory = ClusterMetadata.current().directory; + long now = System.currentTimeMillis(); + InetAddressAndPort endpoint = toCassandraInetAddressAndPort(address); + EndpointState epState = Gossiper.instance.endpointStateMap.get(endpoint); + if (epState == null) + { + logger.info("Test helper found no gossip state for endpoint {}", endpoint); + return; + } + logger.info("Test helper triggering expiry check at {} for {} (joined: {}, alive: {})", + now, + endpoint, + directory.allJoinedEndpoints().contains(endpoint), + epState.isAlive()); + FailureDetector.instance.forceConviction(endpoint); + Gossiper.instance.evictIfExpired(endpoint, epState, directory, now); + }; + } +} diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java index d254850a36d2..1dfe194b2216 100644 --- a/test/unit/org/apache/cassandra/gms/GossiperTest.java +++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java @@ -47,6 +47,7 @@ import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SeedProvider; import org.apache.cassandra.tcm.ClusterMetadataService; @@ -122,6 +123,8 @@ public void testPaddingIntact() throws Exception public void testLargeGenerationJump() throws UnknownHostException, InterruptedException { Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2); + for (InetAddressAndPort host : hosts) + ClusterMetadataTestHelper.register(host); try { InetAddressAndPort remoteHostAddress = hosts.get(1); @@ -169,6 +172,8 @@ public void testDuplicatedStateUpdate() throws Exception SimpleStateChangeListener stateChangeListener = null; Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2); + for (InetAddressAndPort host : hosts) + ClusterMetadataTestHelper.register(host); try { InetAddressAndPort remoteHostAddress = hosts.get(1); @@ -329,6 +334,9 @@ public void testNotFireDuplicatedNotificationsWithUpdateContainsOldAndNewState() new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()); Util.initGossipTokens(partitioner, endpointTokens, hosts, hostIds, 2); + for (InetAddressAndPort host : hosts) + ClusterMetadataTestHelper.register(host); + SimpleStateChangeListener stateChangeListener = null; try { diff --git a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java index aebe443529e3..91e1f1e1c3ce 100644 --- a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java +++ b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java @@ -20,6 +20,8 @@ import java.net.UnknownHostException; import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.cassandra.locator.InetAddressAndPort; @@ -36,6 +38,15 @@ public static InetAddressAndPort randomEndpoint(Random random) return endpoint(random.nextInt(254) + 1); } + public static Set uniqueEndpoints(Random random, int count) + { + return random.ints(1, 255) + .distinct() + .limit(count) + .mapToObj(MembershipUtils::endpoint) + .collect(Collectors.toSet()); + } + public static InetAddressAndPort endpoint(int i) { return endpoint((byte)i); diff --git a/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java index 33ccb137ae61..9cd8ee774685 100644 --- a/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java +++ b/test/unit/org/apache/cassandra/tcm/transformations/PrepareJoinTest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.Random; import java.util.Set; @@ -36,16 +37,18 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.exceptions.ExceptionCode; +import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.OwnershipUtils; -import static org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses; +import static org.apache.cassandra.tcm.membership.MembershipUtils.uniqueEndpoints; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -116,9 +119,10 @@ private ClusterMetadata metadata() other = new NodeId(1); joining = new NodeId(2); Location location = new Location("dc", "rack"); - Directory directory = new Directory().unsafeWithNodeForTesting(other, nodeAddresses(random), location, NodeVersion.CURRENT) + Iterator endpoints = uniqueEndpoints(random, 2).iterator(); + Directory directory = new Directory().unsafeWithNodeForTesting(other, new NodeAddresses(endpoints.next()), location, NodeVersion.CURRENT) .withNodeState(other, NodeState.JOINED) - .unsafeWithNodeForTesting(joining, nodeAddresses(random), location, NodeVersion.CURRENT) + .unsafeWithNodeForTesting(joining, new NodeAddresses(endpoints.next()), location, NodeVersion.CURRENT) .withNodeState(joining, NodeState.REGISTERED); Set ownedTokens = OwnershipUtils.randomTokens(16, partitioner, random); return ClusterMetadataTestHelper.minimalForTesting(partitioner)