8787import org .apache .cassandra .net .RequestCallback ;
8888import org .apache .cassandra .service .StorageService ;
8989import org .apache .cassandra .tcm .ClusterMetadata ;
90+ import org .apache .cassandra .tcm .Epoch ;
9091import org .apache .cassandra .tcm .membership .NodeId ;
9192import org .apache .cassandra .tcm .membership .NodeState ;
9293import org .apache .cassandra .utils .FBUtilities ;
107108import static org .apache .cassandra .gms .Gossiper .GossipedWith .SEED ;
108109import static org .apache .cassandra .gms .VersionedValue .BOOTSTRAPPING_STATUS ;
109110import static org .apache .cassandra .gms .VersionedValue .HIBERNATE ;
111+ import static org .apache .cassandra .gms .VersionedValue .REMOVED_TOKEN ;
112+ import static org .apache .cassandra .gms .VersionedValue .REMOVING_TOKEN ;
113+ import static org .apache .cassandra .gms .VersionedValue .SHUTDOWN ;
114+ import static org .apache .cassandra .gms .VersionedValue .STATUS_LEFT ;
110115import static org .apache .cassandra .gms .VersionedValue .unsafeMakeVersionedValue ;
111116import static org .apache .cassandra .net .NoPayload .noPayload ;
112117import static org .apache .cassandra .net .Verb .ECHO_REQ ;
@@ -137,8 +142,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
137142 private static final ScheduledExecutorPlus executor = executorFactory ().scheduled ("GossipTasks" );
138143
139144 static final ApplicationState [] STATES = ApplicationState .values ();
140- static final List <String > DEAD_STATES = Arrays .asList (VersionedValue .REMOVING_TOKEN , VersionedValue .REMOVED_TOKEN ,
141- VersionedValue .STATUS_LEFT , VersionedValue .HIBERNATE );
145+ static final List <String > DEAD_STATES = Arrays .asList (REMOVING_TOKEN , REMOVED_TOKEN , STATUS_LEFT , HIBERNATE );
142146 static ArrayList <String > SILENT_SHUTDOWN_STATES = new ArrayList <>();
143147 static
144148 {
@@ -185,7 +189,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean,
185189 /* map where key is endpoint and value is timestamp when this endpoint was removed from
186190 * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
187191 * after removal to prevent nodes from falsely reincarnating during the time when removal
188- * gossip gets propagated to all nodes */
192+ * gossip gets propagated to all nodes.
193+ * Note: in future, this need only be used when ClusterMetadataService is in the GOSSIP state,
194+ * i.e. during the major upgrade to the version with CEP-21, but before the CMS is initialized.
195+ * In this state, gossip is still used to propagate changes to broadcast address and release
196+ * version. Once the CMS initialization is complete, this is no longer necessary.
197+ * Currently in order to support a controlled rollout of that change to behaviour, quarantine
198+ * is still used by default, but can be disabled via config (gossip_quarantine_disabled) or
199+ * JMX (GossiperMBean::setQuarantineDisabled)
200+ */
189201 private final Map <InetAddressAndPort , Long > justRemovedEndpoints = new ConcurrentHashMap <>();
190202
191203 private final Map <InetAddressAndPort , Long > expireTimeEndpointMap = new ConcurrentHashMap <>();
@@ -450,14 +462,7 @@ private static boolean isShutdown(EndpointState epState)
450462
451463 public static boolean isShutdown (VersionedValue vv )
452464 {
453- if (vv == null )
454- return false ;
455-
456- String value = vv .value ;
457- String [] pieces = value .split (VersionedValue .DELIMITER_STR , -1 );
458- assert (pieces .length > 0 );
459- String state = pieces [0 ];
460- return state .equals (VersionedValue .SHUTDOWN );
465+ return matchesStatusString (vv , SHUTDOWN );
461466 }
462467
463468 public static boolean isHibernate (EndpointState epState )
@@ -469,15 +474,39 @@ public static boolean isHibernate(EndpointState epState)
469474 }
470475
471476 public static boolean isHibernate (VersionedValue vv )
477+ {
478+ return matchesStatusString (vv , HIBERNATE );
479+ }
480+
481+ public static boolean isLeft (VersionedValue vv )
482+ {
483+ return matchesStatusString (vv , STATUS_LEFT );
484+ }
485+
486+ private static boolean matchesStatusString (VersionedValue vv , String toMatch )
472487 {
473488 if (vv == null )
474489 return false ;
475490
476- String value = vv .value ;
477- String [] pieces = value .split (VersionedValue .DELIMITER_STR , -1 );
491+ String [] pieces = vv .splitValue ();
478492 assert (pieces .length > 0 );
479493 String state = pieces [0 ];
480- return state .equals (VersionedValue .HIBERNATE );
494+ return state .equals (toMatch );
495+ }
496+
497+ public static long extractExpireTime (String [] pieces )
498+ {
499+ if (pieces .length < 3 )
500+ return 0L ;
501+ try
502+ {
503+ return Long .parseLong (pieces [2 ]);
504+ }
505+ catch (NumberFormatException e )
506+ {
507+ logger .debug ("Invalid value found for expire time ({}), ignoring" , pieces [2 ]);
508+ return 0L ;
509+ }
481510 }
482511
483512 public static void runInGossipStageBlocking (Runnable runnable )
@@ -696,10 +725,21 @@ private void quarantineEndpoint(InetAddressAndPort endpoint, long quarantineExpi
696725 {
697726 if (disableEndpointRemoval )
698727 return ;
728+
729+ // Quarantine is only necessary while upgrading from gossip-driven management of cluster metadata
730+ if (getQuarantineDisabled () && ClusterMetadata .current ().epoch .isAfter (Epoch .UPGRADE_GOSSIP ))
731+ return ;
732+
699733 justRemovedEndpoints .put (endpoint , quarantineExpiration );
700734 GossiperDiagnostics .quarantinedEndpoint (this , endpoint , quarantineExpiration );
701735 }
702736
737+ public void clearQuarantinedEndpoints ()
738+ {
739+ logger .info ("Clearing quarantined endpoints" );
740+ justRemovedEndpoints .clear ();
741+ }
742+
703743 /**
704744 * The gossip digest is built based on randomization
705745 * rather than just looping through the collection of live endpoints.
@@ -948,15 +988,14 @@ void doStatusCheck()
948988 }
949989
950990 // check for dead state removal
951- long expireTime = getExpireTimeForEndpoint (endpoint );
952- if (!epState .isAlive () && (now > expireTime )
953- && (!metadata .directory .allAddresses ().contains (endpoint )))
991+ if (!epState .isAlive () && (!metadata .directory .allJoinedEndpoints ().contains (endpoint )))
954992 {
955- if (logger .isDebugEnabled ())
993+ long expireTime = getExpireTimeForEndpoint (endpoint );
994+ if (now > expireTime )
956995 {
957- logger .debug ("time is expiring for endpoint : {} ({})" , endpoint , expireTime );
996+ logger .info ("Reached gossip expiry time for endpoint : {} ({})" , endpoint , expireTime );
997+ runInGossipStageBlocking (() -> evictFromMembership (endpoint ));
958998 }
959- runInGossipStageBlocking (() -> evictFromMembership (endpoint ));
960999 }
9611000 }
9621001 }
@@ -1897,11 +1936,15 @@ public int getCurrentGenerationNumber(String address) throws UnknownHostExceptio
18971936
18981937 public void addExpireTimeForEndpoint (InetAddressAndPort endpoint , long expireTime )
18991938 {
1900- if (logger .isDebugEnabled ())
1939+ if (expireTime == 0L )
1940+ {
1941+ logger .debug ("Supplied expire time for {} was 0, not recording" , endpoint );
1942+ }
1943+ else
19011944 {
19021945 logger .debug ("adding expire time for endpoint : {} ({})" , endpoint , expireTime );
1946+ expireTimeEndpointMap .put (endpoint , expireTime );
19031947 }
1904- expireTimeEndpointMap .put (endpoint , expireTime );
19051948 }
19061949
19071950 public static long computeExpireTime ()
@@ -2104,6 +2147,50 @@ public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep)
21042147 MessagingService .instance ().send (message , ep );
21052148 }
21062149
2150+ public void unsafeBroadcastLeftStatus (InetAddressAndPort left ,
2151+ Collection <Token > tokens ,
2152+ Iterable <InetAddressAndPort > sendTo )
2153+ {
2154+ runInGossipStageBlocking (() -> {
2155+ EndpointState epState = endpointStateMap .get (left );
2156+ if (epState == null )
2157+ {
2158+ logger .info ("No gossip state for node {}" , left );
2159+ return ;
2160+ }
2161+
2162+ NodeState state = ClusterMetadata .current ().directory .peerState (left );
2163+ if (state != NodeState .LEFT )
2164+ {
2165+ logger .info ("Node Status for {} is not LEFT ({})" , left , state );
2166+ return ;
2167+ }
2168+
2169+ EndpointState toSend = new EndpointState (epState );
2170+ toSend .forceNewerGenerationUnsafe ();
2171+ toSend .markDead ();
2172+ VersionedValue value = StorageService .instance .valueFactory .left (tokens , computeExpireTime ());
2173+
2174+ if (left .equals (getBroadcastAddressAndPort ()))
2175+ {
2176+ // Adding local state bumps the value's version. To keep this consistent across
2177+ // the cluster, re-fetch it before broadcasting.
2178+ Gossiper .instance .addLocalApplicationState (ApplicationState .STATUS_WITH_PORT , value );
2179+ value = Gossiper .instance .endpointStateMap .get (getBroadcastAddressAndPort ())
2180+ .getApplicationState (ApplicationState .STATUS_WITH_PORT );
2181+ }
2182+
2183+ toSend .addApplicationState (ApplicationState .STATUS_WITH_PORT , value );
2184+ GossipDigestAck2 payload = new GossipDigestAck2 (Collections .singletonMap (left , toSend ));
2185+ logger .info ("Sending app state with status {} to {}" , value .value , sendTo );
2186+ for (InetAddressAndPort ep : sendTo )
2187+ {
2188+ Message <GossipDigestAck2 > message = Message .out (Verb .GOSSIP_DIGEST_ACK2 , payload );
2189+ MessagingService .instance ().send (message , ep );
2190+ }
2191+ });
2192+ }
2193+
21072194 private void unsafeUpdateEpStates (InetAddressAndPort endpoint , EndpointState epstate )
21082195 {
21092196 checkProperThreadForStateMutation ();
@@ -2221,9 +2308,31 @@ private void mergeNodeToGossip(NodeId nodeId, ClusterMetadata metadata, Collecti
22212308 newValue = valueFactory .hibernate (true );
22222309 break ;
22232310 }
2311+
22242312 if (isLocal && !StorageService .instance .shouldJoinRing ())
22252313 break ;
2226- newValue = GossipHelper .nodeStateToStatus (nodeId , metadata , tokens , valueFactory , oldValue );
2314+
2315+ // If quarantine has been disabled and we have already seen a LEFT status for a remote peer
2316+ // which originated from the peer itself or the node which coordinated its removal (and so
2317+ // has a version > 0), keep it as this is how we ensure the gossip expiry time encoded in
2318+ // the status string converges across peers.
2319+ // Should a node leave and then rejoin after resetting its local state (i.e. wipe and
2320+ // rejoin), it is automatically unregistered which removes all gossip state for it so there
2321+ // will be no oldValue in that case.
2322+ //
2323+ // Note: don't reorder these conditions as isLeft includes a null check
2324+ if (getQuarantineDisabled () && !isLocal && Gossiper .isLeft (oldValue ) && oldValue .version > 0 )
2325+ {
2326+ logger .debug ("Already seen a LEFT status for {} with a non-zero version, " +
2327+ "dropping derived value {}" , endpoint , newValue );
2328+ newValue = oldValue ;
2329+ }
2330+ else
2331+ {
2332+ newValue = GossipHelper .nodeStateToStatus (nodeId , metadata , tokens , valueFactory , oldValue );
2333+ if (Gossiper .isLeft (newValue ))
2334+ Gossiper .instance .addExpireTimeForEndpoint (endpoint , Gossiper .extractExpireTime (newValue .splitValue ()));
2335+ }
22272336 break ;
22282337 default :
22292338 newValue = oldValue ;
@@ -2269,4 +2378,17 @@ public void triggerRoundWithCMS()
22692378 sendGossip (message , cms );
22702379 }
22712380 }
2381+
2382+ @ Override
2383+ public boolean getQuarantineDisabled ()
2384+ {
2385+ return DatabaseDescriptor .getGossipQuarantineDisabled ();
2386+ }
2387+
2388+ @ Override
2389+ public void setQuarantineDisabled (boolean enabled )
2390+ {
2391+ logger .info ("Setting gossip_quarantine_disabled: {}" , enabled );
2392+ DatabaseDescriptor .setGossipQuarantineDisabled (enabled );
2393+ }
22722394}
0 commit comments