From b1f01d5d6f976d42bc7e203247979010b3b826f5 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 5 May 2026 13:58:28 +0800 Subject: [PATCH 1/8] ready 4 review --- .../confignode/conf/ConfigNodeConfig.java | 33 ++++ .../confignode/conf/ConfigNodeDescriptor.java | 18 +++ .../manager/load/cache/LoadCache.java | 2 +- .../load/service/HeartbeatService.java | 8 +- .../manager/load/service/TopologyService.java | 143 ++++++++++++++++-- .../agent/task/PipeDataNodeTaskAgent.java | 4 +- .../impl/DataNodeInternalRPCServiceImpl.java | 42 ++++- .../db/queryengine/plan/ClusterTopology.java | 2 +- .../commons/client/ClientPoolFactory.java | 6 +- .../iotdb/commons/concurrent/ThreadName.java | 1 + .../iotdb/commons/conf/CommonConfig.java | 10 ++ .../iotdb/commons/conf/CommonDescriptor.java | 6 + 12 files changed, 237 insertions(+), 38 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index f305b06398db9..306e6cd895f8f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -204,6 +204,15 @@ public class ConfigNodeConfig { /** Acceptable pause duration for Phi accrual failure detector */ private long failureDetectorPhiAcceptablePauseInMs = 10000; + /** Base interval in ms for topology probing. Actual interval scales with DataNode count. */ + private long topologyProbingBaseIntervalInMs = 5000; + + /** Reference DataNode count for adaptive probing interval scaling. */ + private int topologyProbingReferenceNodeCount = 10; + + /** Ratio of probing timeout to probing interval (must be less than 1.0). */ + private double topologyProbingTimeoutRatio = 0.5; + /** The policy of cluster RegionGroups' leader distribution. */ private String leaderDistributionPolicy = AbstractLeaderBalancer.CFD_POLICY; @@ -1288,4 +1297,28 @@ public long getFailureDetectorPhiAcceptablePauseInMs() { public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcceptablePauseInMs) { this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } + + public long getTopologyProbingBaseIntervalInMs() { + return topologyProbingBaseIntervalInMs; + } + + public void setTopologyProbingBaseIntervalInMs(long topologyProbingBaseIntervalInMs) { + this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs; + } + + public int getTopologyProbingReferenceNodeCount() { + return topologyProbingReferenceNodeCount; + } + + public void setTopologyProbingReferenceNodeCount(int topologyProbingReferenceNodeCount) { + this.topologyProbingReferenceNodeCount = topologyProbingReferenceNodeCount; + } + + public double getTopologyProbingTimeoutRatio() { + return topologyProbingTimeoutRatio; + } + + public void setTopologyProbingTimeoutRatio(double topologyProbingTimeoutRatio) { + this.topologyProbingTimeoutRatio = topologyProbingTimeoutRatio; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 77790dae1a903..506131a583a7e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -322,6 +322,24 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "failure_detector_phi_acceptable_pause_in_ms", String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs())))); + conf.setTopologyProbingBaseIntervalInMs( + Long.parseLong( + properties.getProperty( + "topology_probing_base_interval_in_ms", + String.valueOf(conf.getTopologyProbingBaseIntervalInMs())))); + + conf.setTopologyProbingReferenceNodeCount( + Integer.parseInt( + properties.getProperty( + "topology_probing_reference_node_count", + String.valueOf(conf.getTopologyProbingReferenceNodeCount())))); + + conf.setTopologyProbingTimeoutRatio( + Double.parseDouble( + properties.getProperty( + "topology_probing_timeout_ratio", + String.valueOf(conf.getTopologyProbingTimeoutRatio())))); + String leaderDistributionPolicy = properties.getProperty("leader_distribution_policy", conf.getLeaderDistributionPolicy()); if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 07105d96bbd1d..41a0dbc5c4671 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -787,7 +787,7 @@ public void updateTopology(Map> latestTopology) { for (int fromId : latestTopology.keySet()) { for (int toId : latestTopology.keySet()) { boolean originReachable = - latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + topologyGraph.getOrDefault(fromId, Collections.emptySet()).contains(toId); boolean newReachable = latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); if (originReachable != newReachable) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 64322da5bbb20..e5f65152c875c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Future; @@ -168,12 +167,7 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage()); } - final Map> topologyMap = - configManager.getLoadManager().getLoadCache().getTopology(); - if (topologyMap != null) { - heartbeatReq.setTopology(topologyMap); - heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations()); - } + // Topology is now pushed independently by TopologyService, no longer piggybacked on heartbeat // We broadcast region operations list every 100 heartbeat loops if (heartbeatCounter.get() % 100 == 0) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 9e4f6bd6121ab..1fd716fc7fc92 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -21,9 +21,13 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult; +import org.apache.iotdb.commons.client.ClientPoolFactory; +import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; @@ -41,14 +45,18 @@ import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; +import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; +import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.ratis.util.AwaitForSignal; +import org.apache.thrift.async.AsyncMethodCallback; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -67,8 +75,6 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { private static final Logger LOGGER = LoggerFactory.getLogger(TopologyService.class); - private static final long PROBING_INTERVAL_MS = 5_000L; - private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS; private static final int SAMPLING_WINDOW_SIZE = 100; private final ExecutorService topologyThread = @@ -90,6 +96,16 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { private final IFailureDetector failureDetector; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + /** Rotation index for sqrt(N) prober selection. */ + private int proberRotationIndex = 0; + + /** Last topology pushed to each DataNode, used to detect changes. */ + private final Map> lastPushedTopology = new ConcurrentHashMap<>(); + + /** Client manager for pushing topology updates to DataNodes via heartbeat RPC. */ + private final IClientManager + topologyPushClientManager; + public TopologyService( IManager configManager, Consumer>> topologyChangeListener) { this.configManager = configManager; @@ -97,6 +113,10 @@ public TopologyService( this.heartbeats = new ConcurrentHashMap<>(); this.shouldRun = new AtomicBoolean(false); this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName()); + this.topologyPushClientManager = + new IClientManager.Factory() + .createClientManager( + new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory()); // here we use the same failure switch (CONF.getFailureDetector()) { @@ -133,12 +153,17 @@ public synchronized void stopTopologyService() { } /** - * Schedule the {@link #topologyProbing} task either: 1. every PROBING_INTERVAL_MS interval. 2. + * Schedule the {@link #topologyProbing} task either: 1. every adaptive probing interval. 2. * Manually triggered by outside events (node restart / register, etc.). */ private boolean mayWait() { try { - this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS); + long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); + int dataNodeCount = + configManager.getNodeManager().getRegisteredDataNodes().size() - startingDataNodes.size(); + int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); + long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); + this.awaitForSignal.await(interval, TimeUnit.MILLISECONDS); return true; } catch (InterruptedException e) { // we don't reset the interrupt flag here since we may reuse this thread again. @@ -153,8 +178,29 @@ public void run() { } } + /** + * Select sqrt(N) DataNodes as probers, rotating through all DataNodes across cycles so that every + * DataNode gets to be a prober over sqrt(N) cycles. + */ + private List selectProbers(List allDataNodes) { + int n = allDataNodes.size(); + if (n <= 1) { + return allDataNodes; + } + int sqrtN = (int) Math.ceil(Math.sqrt(n)); + List sorted = new ArrayList<>(allDataNodes); + sorted.sort(Comparator.comparingInt(TDataNodeLocation::getDataNodeId)); + int startIndex = (proberRotationIndex * sqrtN) % n; + proberRotationIndex++; + List probers = new ArrayList<>(sqrtN); + for (int i = 0; i < sqrtN && i < n; i++) { + probers.add(sorted.get((startIndex + i) % n)); + } + return probers; + } + private synchronized void topologyProbing() { - // 1. get the latest datanode list + // 1. get the latest datanode list, filter out starting ones final List dataNodeLocations = new ArrayList<>(); final Set dataNodeIds = new HashSet<>(); for (final TDataNodeConfiguration dataNodeConf : @@ -167,22 +213,36 @@ private synchronized void topologyProbing() { dataNodeIds.add(location.getDataNodeId()); } - // 2. send the verify connection RPC to all datanodes + // 2. compute adaptive interval and timeout from N = dataNodeLocations.size() + final long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); + final int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); + final double timeoutRatio = CONF.getTopologyProbingTimeoutRatio(); + final int dataNodeCount = dataNodeLocations.size(); + final long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); + final long timeout = (long) (interval * timeoutRatio); + + // 3. select sqrt(N) probers via rotating selection + final List probers = selectProbers(dataNodeLocations); + + // 4. build TNodeLocations with ALL DataNode locations (so probers test all targets) final TNodeLocations nodeLocations = new TNodeLocations(); nodeLocations.setDataNodeLocations(dataNodeLocations); nodeLocations.setConfigNodeLocations(Collections.emptyList()); - final Map dataNodeLocationMap = - configManager.getNodeManager().getRegisteredDataNodes().stream() - .map(TDataNodeConfiguration::getLocation) + + // 5. build proberLocationMap containing only the selected probers + final Map proberLocationMap = + probers.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location)); + + // 6. send async requests ONLY to probers (not all DataNodes) with computed timeout final DataNodeAsyncRequestContext dataNodeAsyncRequestContext = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK, nodeLocations, - dataNodeLocationMap); + proberLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, PROBING_TIMEOUT_MS); + .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, timeout); final List results = new ArrayList<>(); dataNodeAsyncRequestContext .getResponseMap() @@ -193,7 +253,7 @@ private synchronized void topologyProbing() { } }); - // 3. collect results and update the heartbeat timestamps + // 7. collect results and update the heartbeat timestamps for (final TTestConnectionResult result : results) { final int fromDataNodeId = Optional.ofNullable(result.getSender().getDataNodeLocation()) @@ -203,8 +263,6 @@ private synchronized void topologyProbing() { if (result.isSuccess() && dataNodeIds.contains(fromDataNodeId) && dataNodeIds.contains(toDataNodeId)) { - // testAllDataNodeConnectionWithTimeout ensures the heartbeats are Dn-Dn internally. Here we - // just double-check. final List heartbeatHistory = heartbeats.computeIfAbsent( new Pair<>(fromDataNodeId, toDataNodeId), p -> new LinkedList<>()); @@ -215,7 +273,7 @@ private synchronized void topologyProbing() { } } - // 4. use failure detector to identify potential network partitions + // 8. use failure detector to identify potential network partitions (on ALL heartbeat pairs) final Map> latestTopology = dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>())); @@ -234,10 +292,63 @@ private synchronized void topologyProbing() { logAsymmetricPartition(latestTopology); - // 5. notify the listeners on topology change + // 9. notify the listeners on topology change if (shouldRun.get()) { topologyChangeListener.accept(latestTopology); } + + // 10. push topology changes to DataNodes + pushTopologyToDataNodes(latestTopology, dataNodeLocations); + } + + /** + * Push topology changes to DataNodes via heartbeat RPC. Each DataNode only receives a push when + * its own reachable set has changed since the last push. + */ + private void pushTopologyToDataNodes( + Map> latestTopology, List dataNodeLocations) { + // Build dataNodes map once for all pushes + final Map dataNodesMap = + dataNodeLocations.stream() + .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc)); + + for (final TDataNodeLocation location : dataNodeLocations) { + final int nodeId = location.getDataNodeId(); + final Set reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet()); + final Set lastPushed = lastPushedTopology.get(nodeId); + + if (lastPushed != null && lastPushed.equals(reachableSet)) { + continue; + } + + final TDataNodeHeartbeatReq req = + new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L); + req.setTopology(latestTopology); + req.setDataNodes(dataNodesMap); + + final TEndPoint endPoint = location.getInternalEndPoint(); + try { + topologyPushClientManager + .borrowClient(endPoint) + .getDataNodeHeartBeat( + req, + new AsyncMethodCallback() { + @Override + public void onComplete(TDataNodeHeartbeatResp response) { + // No-op: topology push is fire-and-forget + } + + @Override + public void onError(Exception exception) { + // No-op: topology push failures are silently ignored + } + }); + lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); + } catch (Exception e) { + LOGGER.debug( + "Failed to push topology to DataNode {} at {}: {}", nodeId, endPoint, e.getMessage()); + } + } } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 06d6a512b30a4..bc0a619afd892 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; -import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; @@ -377,8 +376,7 @@ public void stopAllPipesWithCriticalExceptionAndTrackException( ///////////////////////// Heartbeat ///////////////////////// public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOutInMs( - CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { + if (!tryReadLockWithTimeOut(2)) { return; } try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 84479c9dcd22a..e889d936ace5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -52,6 +52,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.Await; import org.apache.iotdb.commons.concurrent.AwaitTimeoutException; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.IoTThreadFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; @@ -417,6 +418,12 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface private final ClusterTopology clusterTopology = ClusterTopology.getInstance(); + private static final long TEST_CONNECTION_TIMEOUT_MS = + CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS(); + + private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR = + IoTDBThreadPoolFactory.newFixedThreadPool(2, ThreadName.DATANODE_TOPOLOGY_PROBING.getName()); + private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); private final DataNodeContext dataNodeContext; @@ -2076,9 +2083,23 @@ public TTestConnectionResp submitTestConnectionTask(final TNodeLocations nodeLoc @Override public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations nodeLocations) throws TException { - return new TTestConnectionResp( - new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), - testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations())); + try { + Future future = + TOPOLOGY_PROBING_EXECUTOR.submit( + () -> + new TTestConnectionResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + testAllDataNodeConnectionInHeartbeatChannel( + nodeLocations.getDataNodeLocations()))); + return future.get(TEST_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + return new TTestConnectionResp( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage("Topology probing timed out after " + TEST_CONNECTION_TIMEOUT_MS + "ms"), + Collections.emptyList()); + } catch (Exception e) { + throw new TException(e); + } } private static List testConnections( @@ -2105,7 +2126,8 @@ private List testAllConfigNodeConnection( TServiceType.ConfigNodeInternalService, DnToCnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DnToCnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List testAllDataNodeConnectionInHeartbeatChannel( @@ -2117,7 +2139,8 @@ private List testAllDataNodeConnectionInHeartbeatChannel( TServiceType.DataNodeInternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DataNodeIntraHeartbeatManager.getInstance().sendAsyncRequest(handler, 1, null, true)); + DataNodeIntraHeartbeatManager.getInstance() + .sendAsyncRequest(handler, 1, TEST_CONNECTION_TIMEOUT_MS, true)); } private List testAllDataNodeInternalServiceConnection( @@ -2129,7 +2152,8 @@ private List testAllDataNodeInternalServiceConnection( TServiceType.DataNodeInternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List testAllDataNodeMPPServiceConnection( @@ -2141,7 +2165,8 @@ private List testAllDataNodeMPPServiceConnection( TServiceType.DataNodeMPPService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DataNodeMPPServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DataNodeMPPServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } private List testAllDataNodeExternalServiceConnection( @@ -2153,7 +2178,8 @@ private List testAllDataNodeExternalServiceConnection( TServiceType.DataNodeExternalService, DnToDnRequestType.TEST_CONNECTION, (AsyncRequestContext handler) -> - DataNodeExternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler)); + DataNodeExternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(handler, TEST_CONNECTION_TIMEOUT_MS)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java index 64ea8b8a2ea11..194d9f8c6cdf0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java @@ -152,7 +152,7 @@ public void updateTopology( for (int fromId : dataNodes.keySet()) { for (int toId : dataNodes.keySet()) { boolean originReachable = - latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); + this.topologyMap.get().getOrDefault(fromId, Collections.emptySet()).contains(toId); boolean newReachable = latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); if (originReachable != newReachable) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index ea20f9c76ccd8..f9ffca123b5c7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -182,7 +182,8 @@ public GenericKeyedObjectPool c new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager( + conf.getHeartbeatSelectorNumOfClientManager()) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()), @@ -207,7 +208,8 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager( + conf.getHeartbeatSelectorNumOfClientManager()) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 20fa9d78d5927..ad22c8b20ae71 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -196,6 +196,7 @@ public enum ThreadName { INFLUXDB_RPC_PROCESSOR("InfluxdbRPC-Processor"), STORAGE_ENGINE_CACHED_POOL("StorageEngine"), DATANODE_SHUTDOWN_HOOK("DataNode-Shutdown-Hook"), + DATANODE_TOPOLOGY_PROBING("DataNode-Topology-Probing"), UPGRADE_TASK("UpgradeThread"), REGION_MIGRATE("Region-Migrate-Pool"), STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 19313723e55f5..598c9d5f02134 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -155,6 +155,8 @@ public class CommonConfig { */ private int selectorNumOfClientManager = 1; + private int heartbeatSelectorNumOfClientManager = 4; + /** Whether to use thrift compression. */ private boolean isRpcThriftCompressionEnabled = false; @@ -697,6 +699,14 @@ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) { this.selectorNumOfClientManager = selectorNumOfClientManager; } + public int getHeartbeatSelectorNumOfClientManager() { + return heartbeatSelectorNumOfClientManager; + } + + public void setHeartbeatSelectorNumOfClientManager(int heartbeatSelectorNumOfClientManager) { + this.heartbeatSelectorNumOfClientManager = heartbeatSelectorNumOfClientManager; + } + public boolean isRpcThriftCompressionEnabled() { return isRpcThriftCompressionEnabled; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index d392a60bbbd76..11db54be34f81 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -178,6 +178,12 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.getSelectorNumOfClientManager())) .trim())); + config.setHeartbeatSelectorNumOfClientManager( + Integer.parseInt( + properties.getProperty( + "heartbeat_selector_num_of_client_manager", + String.valueOf(config.getHeartbeatSelectorNumOfClientManager())))); + config.setMaxClientNumForEachNode( Integer.parseInt( properties From af504f9cfa87db2760129818fe2ad0a72973e6f9 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 5 May 2026 15:18:42 +0800 Subject: [PATCH 2/8] update --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 13 +------------ .../confignode/conf/ConfigNodeDescriptor.java | 6 ------ .../manager/load/service/HeartbeatService.java | 2 -- .../manager/load/service/TopologyService.java | 16 +++------------- .../impl/DataNodeInternalRPCServiceImpl.java | 1 + .../conf/iotdb-system.properties.template | 16 ++++++++++++++++ .../apache/iotdb/commons/conf/CommonConfig.java | 5 ++++- 7 files changed, 25 insertions(+), 34 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 306e6cd895f8f..4dd5e78372f46 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -204,12 +204,9 @@ public class ConfigNodeConfig { /** Acceptable pause duration for Phi accrual failure detector */ private long failureDetectorPhiAcceptablePauseInMs = 10000; - /** Base interval in ms for topology probing. Actual interval scales with DataNode count. */ + /** Base interval in ms for topology probing. */ private long topologyProbingBaseIntervalInMs = 5000; - /** Reference DataNode count for adaptive probing interval scaling. */ - private int topologyProbingReferenceNodeCount = 10; - /** Ratio of probing timeout to probing interval (must be less than 1.0). */ private double topologyProbingTimeoutRatio = 0.5; @@ -1306,14 +1303,6 @@ public void setTopologyProbingBaseIntervalInMs(long topologyProbingBaseIntervalI this.topologyProbingBaseIntervalInMs = topologyProbingBaseIntervalInMs; } - public int getTopologyProbingReferenceNodeCount() { - return topologyProbingReferenceNodeCount; - } - - public void setTopologyProbingReferenceNodeCount(int topologyProbingReferenceNodeCount) { - this.topologyProbingReferenceNodeCount = topologyProbingReferenceNodeCount; - } - public double getTopologyProbingTimeoutRatio() { return topologyProbingTimeoutRatio; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 506131a583a7e..591a4cb817ec2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -328,12 +328,6 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "topology_probing_base_interval_in_ms", String.valueOf(conf.getTopologyProbingBaseIntervalInMs())))); - conf.setTopologyProbingReferenceNodeCount( - Integer.parseInt( - properties.getProperty( - "topology_probing_reference_node_count", - String.valueOf(conf.getTopologyProbingReferenceNodeCount())))); - conf.setTopologyProbingTimeoutRatio( Double.parseDouble( properties.getProperty( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index e5f65152c875c..100b0cbf384e0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -167,8 +167,6 @@ protected TDataNodeHeartbeatReq genHeartbeatReq() { heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage()); } - // Topology is now pushed independently by TopologyService, no longer piggybacked on heartbeat - // We broadcast region operations list every 100 heartbeat loops if (heartbeatCounter.get() % 100 == 0) { heartbeatReq.setCurrentRegionOperations( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 1fd716fc7fc92..76d943e24c224 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -158,15 +158,9 @@ public synchronized void stopTopologyService() { */ private boolean mayWait() { try { - long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); - int dataNodeCount = - configManager.getNodeManager().getRegisteredDataNodes().size() - startingDataNodes.size(); - int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); - long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); - this.awaitForSignal.await(interval, TimeUnit.MILLISECONDS); + this.awaitForSignal.await(CONF.getTopologyProbingBaseIntervalInMs(), TimeUnit.MILLISECONDS); return true; } catch (InterruptedException e) { - // we don't reset the interrupt flag here since we may reuse this thread again. return false; } } @@ -213,13 +207,9 @@ private synchronized void topologyProbing() { dataNodeIds.add(location.getDataNodeId()); } - // 2. compute adaptive interval and timeout from N = dataNodeLocations.size() + // 2. compute probing timeout final long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); - final int referenceNodeCount = CONF.getTopologyProbingReferenceNodeCount(); - final double timeoutRatio = CONF.getTopologyProbingTimeoutRatio(); - final int dataNodeCount = dataNodeLocations.size(); - final long interval = Math.max(baseInterval, baseInterval * dataNodeCount / referenceNodeCount); - final long timeout = (long) (interval * timeoutRatio); + final long timeout = (long) (baseInterval * CONF.getTopologyProbingTimeoutRatio()); // 3. select sqrt(N) probers via rotating selection final List probers = selectProbers(dataNodeLocations); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index e889d936ace5f..2e0f6fd84b5d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -374,6 +374,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 87be30f4520e0..ae940226ab89c 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -722,6 +722,22 @@ failure_detector_phi_threshold=30 # Datatype: long failure_detector_phi_acceptable_pause_in_ms=10000 +# Base interval in ms for topology probing between DataNodes +# effectiveMode: restart +# Datatype: long +# topology_probing_base_interval_in_ms=5000 + +# Ratio of probing timeout to probing interval (must be less than 1.0) +# effectiveMode: restart +# Datatype: double +# topology_probing_timeout_ratio=0.5 + +# Selector thread (TAsyncClientManager) count for heartbeat async client pools. +# 0 means auto: max(1, availableProcessors / 4) +# effectiveMode: restart +# Datatype: int +# heartbeat_selector_num_of_client_manager=0 + # Disk remaining threshold at which DataNode is set to ReadOnly status # effectiveMode: restart # Datatype: double(percentage) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 598c9d5f02134..f91f6156d753c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -155,7 +155,7 @@ public class CommonConfig { */ private int selectorNumOfClientManager = 1; - private int heartbeatSelectorNumOfClientManager = 4; + private int heartbeatSelectorNumOfClientManager = 0; /** Whether to use thrift compression. */ private boolean isRpcThriftCompressionEnabled = false; @@ -700,6 +700,9 @@ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) { } public int getHeartbeatSelectorNumOfClientManager() { + if (heartbeatSelectorNumOfClientManager <= 0) { + return Math.max(1, Runtime.getRuntime().availableProcessors() / 4); + } return heartbeatSelectorNumOfClientManager; } From e992d04a2041aa5b67a587d546701677e62c47cb Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Tue, 5 May 2026 16:29:14 +0800 Subject: [PATCH 3/8] better impl 4 topology service --- .../manager/load/service/TopologyService.java | 32 +++---- .../impl/DataNodeInternalRPCServiceImpl.java | 4 +- .../db/queryengine/plan/ClusterTopology.java | 89 +++++++------------ 3 files changed, 45 insertions(+), 80 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 76d943e24c224..8203f83402e6c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -57,6 +57,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -65,7 +66,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -91,7 +91,6 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { /* (fromDataNodeId, toDataNodeId) -> heartbeat history */ private final Map, List> heartbeats; - private final List startingDataNodes = new CopyOnWriteArrayList<>(); private final IFailureDetector failureDetector; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); @@ -194,14 +193,15 @@ private List selectProbers(List allDataNod } private synchronized void topologyProbing() { - // 1. get the latest datanode list, filter out starting ones + // 1. get Running DataNodes only final List dataNodeLocations = new ArrayList<>(); final Set dataNodeIds = new HashSet<>(); for (final TDataNodeConfiguration dataNodeConf : configManager.getNodeManager().getRegisteredDataNodes()) { final TDataNodeLocation location = dataNodeConf.getLocation(); - if (startingDataNodes.contains(location.getDataNodeId())) { - continue; // we shall wait for internal endpoint to be ready + if (configManager.getLoadManager().getNodeStatus(location.getDataNodeId()) + != NodeStatus.Running) { + continue; } dataNodeLocations.add(location); dataNodeIds.add(location.getDataNodeId()); @@ -297,7 +297,6 @@ private synchronized void topologyProbing() { */ private void pushTopologyToDataNodes( Map> latestTopology, List dataNodeLocations) { - // Build dataNodes map once for all pushes final Map dataNodesMap = dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc)); @@ -311,9 +310,13 @@ private void pushTopologyToDataNodes( continue; } + // Per-node topology: only this DataNode's own reachable set + final Map> perNodeTopology = new HashMap<>(); + perNodeTopology.put(nodeId, new HashSet<>(reachableSet)); + final TDataNodeHeartbeatReq req = new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L); - req.setTopology(latestTopology); + req.setTopology(perNodeTopology); req.setDataNodes(dataNodesMap); final TEndPoint endPoint = location.getInternalEndPoint(); @@ -324,14 +327,10 @@ private void pushTopologyToDataNodes( req, new AsyncMethodCallback() { @Override - public void onComplete(TDataNodeHeartbeatResp response) { - // No-op: topology push is fire-and-forget - } + public void onComplete(TDataNodeHeartbeatResp response) {} @Override - public void onError(Exception exception) { - // No-op: topology push failures are silently ignored - } + public void onError(Exception exception) {} }); lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); } catch (Exception e) { @@ -388,11 +387,7 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { continue; } if (changeEvent.getLeft() == null) { - // if a new datanode registered, DO NOT trigger probing immediately - startingDataNodes.add(nodeId); continue; - } else { - startingDataNodes.remove(nodeId); } final Set> affectedPairs = @@ -404,13 +399,10 @@ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { .collect(Collectors.toSet()); if (changeEvent.getRight() == null) { - // datanode removed from cluster, clean up probing history affectedPairs.forEach(heartbeats::remove); } else { - // we only trigger probing immediately if node comes around from UNKNOWN to RUNNING if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus()) && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) { - // let's clear the history when a new node comes around affectedPairs.forEach(pair -> heartbeats.put(pair, new ArrayList<>())); awaitForSignal.signal(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 2e0f6fd84b5d4..ec691880a43d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -423,7 +423,9 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS(); private static final ExecutorService TOPOLOGY_PROBING_EXECUTOR = - IoTDBThreadPoolFactory.newFixedThreadPool(2, ThreadName.DATANODE_TOPOLOGY_PROBING.getName()); + IoTDBThreadPoolFactory.newFixedThreadPool( + Math.max(1, Runtime.getRuntime().availableProcessors() / 4), + ThreadName.DATANODE_TOPOLOGY_PROBING.getName()); private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java index 194d9f8c6cdf0..f7807d4a90941 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java @@ -43,7 +43,7 @@ public class ClusterTopology { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopology.class); private final Integer myself; private final AtomicReference> dataNodes; - private final AtomicReference>> topologyMap; + private final AtomicReference> myReachableNodes; private final AtomicBoolean isPartitioned = new AtomicBoolean(); public static ClusterTopology getInstance() { @@ -54,11 +54,10 @@ public TRegionReplicaSet getValidatedReplicaSet(TRegionReplicaSet origin) { if (!isPartitioned.get() || origin == null) { return origin; } - final Set reachableToMyself = - Collections.unmodifiableSet(topologyMap.get().get(myself)); + final Set reachable = myReachableNodes.get(); final List locations = new ArrayList<>(); for (final TDataNodeLocation location : origin.getDataNodeLocations()) { - if (reachableToMyself.contains(location.getDataNodeId())) { + if (reachable.contains(location.getDataNodeId())) { locations.add(location); } } @@ -97,82 +96,54 @@ private List getReachableCandidates(List a return all; } for (TRegionReplicaSet replicaSet : all) { - // some TRegionReplicaSet is unreachable since all DataNodes are down if (replicaSet.getDataNodeLocationsSize() == 0) { throw new ReplicaSetUnreachableException(replicaSet); } } - final Map> topologyMapCurrent = - Collections.unmodifiableMap(this.topologyMap.get()); - - // brute-force search to select DataNode candidates that can communicate to all - // TRegionReplicaSets - final List dataNodeCandidates = new ArrayList<>(); - for (final Integer datanode : topologyMapCurrent.keySet()) { - boolean reachableToAllSets = true; - final Set datanodeReachableToThis = topologyMapCurrent.get(datanode); - for (final TRegionReplicaSet replicaSet : all) { - final List replicaNodeLocations = - replicaSet.getDataNodeLocations().stream() - .map(TDataNodeLocation::getDataNodeId) - .collect(Collectors.toList()); - replicaNodeLocations.retainAll(datanodeReachableToThis); - reachableToAllSets = !replicaNodeLocations.isEmpty(); - } - if (reachableToAllSets) { - dataNodeCandidates.add(datanode); - } - } - // select TRegionReplicaSet candidates whose DataNode Locations contain at least one - // allReachableDataNodes + final Set reachable = this.myReachableNodes.get(); + final Map dataNodesCurrent = this.dataNodes.get(); + final List reachableSetCandidates = new ArrayList<>(); for (final TRegionReplicaSet replicaSet : all) { - final List commonLocations = + final List validLocations = replicaSet.getDataNodeLocations().stream() - .map(TDataNodeLocation::getDataNodeId) + .filter(loc -> reachable.contains(loc.getDataNodeId())) + .map(loc -> dataNodesCurrent.getOrDefault(loc.getDataNodeId(), loc)) .collect(Collectors.toList()); - commonLocations.retainAll(dataNodeCandidates); - if (!commonLocations.isEmpty()) { - final List validLocations = - commonLocations.stream().map(dataNodes.get()::get).collect(Collectors.toList()); - final TRegionReplicaSet validCandidate = - new TRegionReplicaSet(replicaSet.getRegionId(), validLocations); - reachableSetCandidates.add(validCandidate); + if (!validLocations.isEmpty()) { + reachableSetCandidates.add(new TRegionReplicaSet(replicaSet.getRegionId(), validLocations)); } } - return reachableSetCandidates; } public void updateTopology( final Map dataNodes, Map> latestTopology) { - if (!latestTopology.equals(topologyMap.get())) { - LOGGER.info("[Topology] latest view from config-node: {}", latestTopology); - for (int fromId : dataNodes.keySet()) { - for (int toId : dataNodes.keySet()) { - boolean originReachable = - this.topologyMap.get().getOrDefault(fromId, Collections.emptySet()).contains(toId); - boolean newReachable = - latestTopology.getOrDefault(fromId, Collections.emptySet()).contains(toId); - if (originReachable != newReachable) { - LOGGER.info( - "[Topology] Topology of DataNode {} is now {} to DataNode {}", - fromId, - newReachable ? "reachable" : "unreachable", - toId); - } + final Set newReachable = latestTopology.getOrDefault(myself, Collections.emptySet()); + final Set oldReachable = this.myReachableNodes.get(); + + if (!newReachable.equals(oldReachable)) { + LOGGER.info( + "[Topology] latest view from config-node for myself({}): {}", myself, newReachable); + for (int toId : dataNodes.keySet()) { + boolean wasReachable = oldReachable.contains(toId); + boolean nowReachable = newReachable.contains(toId); + if (wasReachable != nowReachable) { + LOGGER.info( + "[Topology] DataNode {} is now {} to myself({})", + toId, + nowReachable ? "reachable" : "unreachable", + myself); } } - this.topologyMap.set(latestTopology); + this.myReachableNodes.set(newReachable); } this.dataNodes.set(dataNodes); - if (latestTopology.get(myself) == null || latestTopology.get(myself).isEmpty()) { - // latest topology doesn't include this node information. - // This mostly happens when this node just starts and haven't report connection details. + if (newReachable.isEmpty()) { this.isPartitioned.set(false); } else { - this.isPartitioned.set(latestTopology.get(myself).size() != latestTopology.size()); + this.isPartitioned.set(newReachable.size() != dataNodes.size()); } } @@ -180,7 +151,7 @@ private ClusterTopology() { this.myself = IoTDBDescriptor.getInstance().getConfig().generateLocalDataNodeLocation().getDataNodeId(); this.isPartitioned.set(false); - this.topologyMap = new AtomicReference<>(Collections.emptyMap()); + this.myReachableNodes = new AtomicReference<>(Collections.emptySet()); this.dataNodes = new AtomicReference<>(Collections.emptyMap()); } From 10c29118aa979270477537fec2d98b9d019a64ce Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 6 May 2026 14:09:20 +0800 Subject: [PATCH 4/8] Refine topology probing, push, and ClusterTopology - Only run failure detector on current prober batch instead of all pairs - Push topology via dedicated PUSH_TOPOLOGY request type (not heartbeat) - Update lastPushedTopology only on successful push - Simplify onNodeStatisticsChanged to only clean up on removal/Removing - ClusterTopology returns full replicas when topology not yet probed - Add Javadoc to ClusterTopology --- .../client/async/CnToDnAsyncRequestType.java | 1 + ...oDnInternalServiceAsyncRequestManager.java | 7 + .../rpc/DataNodeAsyncRequestRPCHandler.java | 8 + .../handlers/rpc/TopologyPushRPCHandler.java | 53 +++++++ .../manager/load/service/TopologyService.java | 139 +++++++----------- .../db/queryengine/plan/ClusterTopology.java | 22 +++ 6 files changed, 144 insertions(+), 86 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TopologyPushRPCHandler.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java index e5753bf1bd184..e592ef8e5cbde 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java @@ -34,6 +34,7 @@ public enum CnToDnAsyncRequestType { SUBMIT_TEST_CONNECTION_TASK, SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK, TEST_CONNECTION, + PUSH_TOPOLOGY, // Region Maintenance CREATE_DATA_REGION, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index cd69f8b2c846d..7bed1b48cc43e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -43,6 +43,7 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.SubmitTestConnectionTaskRPCHandler; +import org.apache.iotdb.confignode.client.async.handlers.rpc.TopologyPushRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.TreeDeviceViewFieldDetectionHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler; @@ -64,6 +65,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq; +import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TDeleteColumnDataReq; import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq; @@ -407,6 +409,11 @@ protected void initActionMapBuilder() { CnToDnAsyncRequestType.TEST_CONNECTION, (req, client, handler) -> client.testConnectionEmptyRPC((DataNodeTSStatusRPCHandler) handler)); + actionMapBuilder.put( + CnToDnAsyncRequestType.PUSH_TOPOLOGY, + (req, client, handler) -> + client.getDataNodeHeartBeat( + (TDataNodeHeartbeatReq) req, (TopologyPushRPCHandler) handler)); actionMapBuilder.put( CnToDnAsyncRequestType.INSERT_RECORD, (req, client, handler) -> diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index b2e2ec3232781..95d23bebd4b24 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -202,6 +202,14 @@ public static DataNodeAsyncRequestRPCHandler buildHandler( dataNodeLocationMap, (Map) responseMap, countDownLatch); + case PUSH_TOPOLOGY: + return new TopologyPushRPCHandler( + requestType, + requestId, + targetDataNode, + dataNodeLocationMap, + (Map) responseMap, + countDownLatch); case SET_TTL: case CREATE_DATA_REGION: case CREATE_SCHEMA_REGION: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TopologyPushRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TopologyPushRPCHandler.java new file mode 100644 index 0000000000000..d2023a8810865 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TopologyPushRPCHandler.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.client.async.handlers.rpc; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; +import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; + +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +public class TopologyPushRPCHandler extends DataNodeAsyncRequestRPCHandler { + + public TopologyPushRPCHandler( + CnToDnAsyncRequestType requestType, + int requestId, + TDataNodeLocation targetDataNode, + Map dataNodeLocationMap, + Map responseMap, + CountDownLatch countDownLatch) { + super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch); + } + + @Override + public void onComplete(TDataNodeHeartbeatResp resp) { + responseMap.put(requestId, resp); + nodeLocationMap.remove(requestId); + countDownLatch.countDown(); + } + + @Override + public void onError(Exception e) { + nodeLocationMap.remove(requestId); + countDownLatch.countDown(); + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 8203f83402e6c..038d8843411b1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -21,13 +21,9 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult; -import org.apache.iotdb.commons.client.ClientPoolFactory; -import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; @@ -49,7 +45,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; import org.apache.ratis.util.AwaitForSignal; -import org.apache.thrift.async.AsyncMethodCallback; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,16 +90,11 @@ public class TopologyService implements Runnable, IClusterStatusSubscriber { private final IFailureDetector failureDetector; private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); - /** Rotation index for sqrt(N) prober selection. */ private int proberRotationIndex = 0; - /** Last topology pushed to each DataNode, used to detect changes. */ + /** Last topology pushed to each DataNode, updated only on successful push. */ private final Map> lastPushedTopology = new ConcurrentHashMap<>(); - /** Client manager for pushing topology updates to DataNodes via heartbeat RPC. */ - private final IClientManager - topologyPushClientManager; - public TopologyService( IManager configManager, Consumer>> topologyChangeListener) { this.configManager = configManager; @@ -112,12 +102,7 @@ public TopologyService( this.heartbeats = new ConcurrentHashMap<>(); this.shouldRun = new AtomicBoolean(false); this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName()); - this.topologyPushClientManager = - new IClientManager.Factory() - .createClientManager( - new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory()); - // here we use the same failure switch (CONF.getFailureDetector()) { case IFailureDetector.PHI_ACCRUAL_DETECTOR: this.failureDetector = @@ -151,10 +136,6 @@ public synchronized void stopTopologyService() { LOGGER.info("Topology Probing has stopped successfully"); } - /** - * Schedule the {@link #topologyProbing} task either: 1. every adaptive probing interval. 2. - * Manually triggered by outside events (node restart / register, etc.). - */ private boolean mayWait() { try { this.awaitForSignal.await(CONF.getTopologyProbingBaseIntervalInMs(), TimeUnit.MILLISECONDS); @@ -208,11 +189,13 @@ private synchronized void topologyProbing() { } // 2. compute probing timeout - final long baseInterval = CONF.getTopologyProbingBaseIntervalInMs(); - final long timeout = (long) (baseInterval * CONF.getTopologyProbingTimeoutRatio()); + final long timeout = + (long) (CONF.getTopologyProbingBaseIntervalInMs() * CONF.getTopologyProbingTimeoutRatio()); // 3. select sqrt(N) probers via rotating selection final List probers = selectProbers(dataNodeLocations); + final Set proberIds = + probers.stream().map(TDataNodeLocation::getDataNodeId).collect(Collectors.toSet()); // 4. build TNodeLocations with ALL DataNode locations (so probers test all targets) final TNodeLocations nodeLocations = new TNodeLocations(); @@ -224,7 +207,7 @@ private synchronized void topologyProbing() { probers.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location)); - // 6. send async requests ONLY to probers (not all DataNodes) with computed timeout + // 6. send async requests ONLY to probers with computed timeout final DataNodeAsyncRequestContext dataNodeAsyncRequestContext = new DataNodeAsyncRequestContext<>( @@ -263,7 +246,7 @@ private synchronized void topologyProbing() { } } - // 8. use failure detector to identify potential network partitions (on ALL heartbeat pairs) + // 8. run failure detector only on pairs from current probers (not all pairs) final Map> latestTopology = dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>())); @@ -272,6 +255,14 @@ private synchronized void topologyProbing() { final int fromId = entry.getKey().getLeft(); final int toId = entry.getKey().getRight(); + if (!proberIds.contains(fromId)) { + // Not in this batch — carry forward as reachable if history is non-empty and last known OK + if (!entry.getValue().isEmpty()) { + Optional.ofNullable(latestTopology.get(fromId)).ifPresent(s -> s.add(toId)); + } + continue; + } + if (!entry.getValue().isEmpty() && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) { LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId); @@ -292,8 +283,8 @@ private synchronized void topologyProbing() { } /** - * Push topology changes to DataNodes via heartbeat RPC. Each DataNode only receives a push when - * its own reachable set has changed since the last push. + * Push topology changes to DataNodes via PUSH_TOPOLOGY request. Each DataNode receives only its + * own reachable set. lastPushedTopology is updated only on successful push. */ private void pushTopologyToDataNodes( Map> latestTopology, List dataNodeLocations) { @@ -301,54 +292,51 @@ private void pushTopologyToDataNodes( dataNodeLocations.stream() .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, loc -> loc)); + final Map targetMap = new HashMap<>(); for (final TDataNodeLocation location : dataNodeLocations) { final int nodeId = location.getDataNodeId(); final Set reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet()); final Set lastPushed = lastPushedTopology.get(nodeId); - if (lastPushed != null && lastPushed.equals(reachableSet)) { continue; } + targetMap.put(nodeId, location); + } - // Per-node topology: only this DataNode's own reachable set + if (targetMap.isEmpty()) { + return; + } + + final DataNodeAsyncRequestContext context = + new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.PUSH_TOPOLOGY, targetMap); + for (final Map.Entry entry : targetMap.entrySet()) { + final int nodeId = entry.getKey(); + final Set reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet()); final Map> perNodeTopology = new HashMap<>(); perNodeTopology.put(nodeId, new HashSet<>(reachableSet)); - final TDataNodeHeartbeatReq req = new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L); req.setTopology(perNodeTopology); req.setDataNodes(dataNodesMap); - - final TEndPoint endPoint = location.getInternalEndPoint(); - try { - topologyPushClientManager - .borrowClient(endPoint) - .getDataNodeHeartBeat( - req, - new AsyncMethodCallback() { - @Override - public void onComplete(TDataNodeHeartbeatResp response) {} - - @Override - public void onError(Exception exception) {} - }); - lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); - } catch (Exception e) { - LOGGER.debug( - "Failed to push topology to DataNode {} at {}: {}", nodeId, endPoint, e.getMessage()); - } + context.putRequest(nodeId, req); } + + CnToDnInternalServiceAsyncRequestManager.getInstance() + .sendAsyncRequestWithTimeoutInMs(context, CONF.getTopologyProbingBaseIntervalInMs()); + + context + .getResponseMap() + .forEach( + (nodeId, resp) -> { + Set reachableSet = + latestTopology.getOrDefault(nodeId, Collections.emptySet()); + lastPushedTopology.put(nodeId, new HashSet<>(reachableSet)); + }); } - /** - * We only consider warning (one vs remaining) network partition. If we need to cover more - * complicated scenarios like (many vs many) network partition, we shall use graph algorithms - * then. - */ private void logAsymmetricPartition(final Map> topology) { final Set nodes = topology.keySet(); if (nodes.size() == 1) { - // 1 DataNode return; } @@ -358,11 +346,9 @@ private void logAsymmetricPartition(final Map> topology) { continue; } - // whether we have asymmetric partition [from -> to] final Set reachableFrom = topology.get(from); final Set reachableTo = topology.get(to); if (reachableFrom.size() <= 1 || reachableTo.size() <= 1) { - // symmetric partition for (from) or (to) continue; } if (!reachableTo.contains(from) && !reachableFrom.contains(to)) { @@ -372,40 +358,21 @@ private void logAsymmetricPartition(final Map> topology) { } } - /** We only listen to datanode remove / restart / register events */ + /** Clean up heartbeat and push state when a node is removed or entering Removing status. */ @Override public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) { - final Set datanodeIds = - configManager.getNodeManager().getRegisteredDataNodeLocations().keySet(); - final Map> changes = - event.getDifferentNodeStatisticsMap(); for (final Map.Entry> entry : - changes.entrySet()) { + event.getDifferentNodeStatisticsMap().entrySet()) { final Integer nodeId = entry.getKey(); - final Pair changeEvent = entry.getValue(); - if (!datanodeIds.contains(nodeId)) { - continue; - } - if (changeEvent.getLeft() == null) { - continue; - } - - final Set> affectedPairs = - heartbeats.keySet().stream() - .filter( - pair -> - Objects.equals(pair.getLeft(), nodeId) - || Objects.equals(pair.getRight(), nodeId)) - .collect(Collectors.toSet()); - - if (changeEvent.getRight() == null) { - affectedPairs.forEach(heartbeats::remove); - } else { - if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus()) - && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) { - affectedPairs.forEach(pair -> heartbeats.put(pair, new ArrayList<>())); - awaitForSignal.signal(); - } + final Pair change = entry.getValue(); + if (change.getRight() == null || NodeStatus.Removing.equals(change.getRight().getStatus())) { + heartbeats + .keySet() + .removeIf( + pair -> + Objects.equals(pair.getLeft(), nodeId) + || Objects.equals(pair.getRight(), nodeId)); + lastPushedTopology.remove(nodeId); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java index f7807d4a90941..2af51aec26b5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java @@ -39,22 +39,33 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +/** + * Tracks this DataNode's view of cluster network topology for partition-aware query planning. Only + * stores the set of DataNodes reachable from this node, pushed by ConfigNode's TopologyService. + */ public class ClusterTopology { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopology.class); private final Integer myself; private final AtomicReference> dataNodes; + + /** DataNode IDs reachable from this node. Empty means not yet probed by TopologyService. */ private final AtomicReference> myReachableNodes; + private final AtomicBoolean isPartitioned = new AtomicBoolean(); public static ClusterTopology getInstance() { return ClusterTopologyHolder.INSTANCE; } + /** Filters a replica set to only include DataNodes reachable from this node. */ public TRegionReplicaSet getValidatedReplicaSet(TRegionReplicaSet origin) { if (!isPartitioned.get() || origin == null) { return origin; } final Set reachable = myReachableNodes.get(); + if (reachable.isEmpty()) { + return origin; + } final List locations = new ArrayList<>(); for (final TDataNodeLocation location : origin.getDataNodeLocations()) { if (reachable.contains(location.getDataNodeId())) { @@ -64,6 +75,7 @@ public TRegionReplicaSet getValidatedReplicaSet(TRegionReplicaSet origin) { return new TRegionReplicaSet(origin.getRegionId(), locations); } + /** Filters region-to-scan mappings, keeping only replicas reachable from this node. */ public Set> filterReachableCandidates( Set> input) { if (!isPartitioned.get()) { @@ -86,6 +98,9 @@ public Set> filterReachableCandidates( final TRegionReplicaSet replicaSet = newMap.get(gid); if (replicaSet != null) { candidateMap.put(replicaSet, entry.getValue()); + } else { + // Topology not yet available for this region — return original to avoid silent data loss + candidateMap.put(entry.getKey(), entry.getValue()); } } return candidateMap.entrySet(); @@ -102,6 +117,9 @@ private List getReachableCandidates(List a } final Set reachable = this.myReachableNodes.get(); + if (reachable.isEmpty()) { + return all; + } final Map dataNodesCurrent = this.dataNodes.get(); final List reachableSetCandidates = new ArrayList<>(); @@ -113,11 +131,15 @@ private List getReachableCandidates(List a .collect(Collectors.toList()); if (!validLocations.isEmpty()) { reachableSetCandidates.add(new TRegionReplicaSet(replicaSet.getRegionId(), validLocations)); + } else { + // No reachable replica — return full set so upper layer can handle or report the error + reachableSetCandidates.add(replicaSet); } } return reachableSetCandidates; } + /** Called by ConfigNode's TopologyService push. Updates this node's reachable set. */ public void updateTopology( final Map dataNodes, Map> latestTopology) { final Set newReachable = latestTopology.getOrDefault(myself, Collections.emptySet()); From 431e2b37f27bf9ff161af2e43fed05262a7d0786 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 6 May 2026 14:34:41 +0800 Subject: [PATCH 5/8] =?UTF-8?q?Drop=20PipeDataNodeTaskAgent=20change=20?= =?UTF-8?q?=E2=80=94=20already=20fixed=20in=20master=20(#17590)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index bc0a619afd892..06d6a512b30a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.SchemaRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; @@ -376,7 +377,8 @@ public void stopAllPipesWithCriticalExceptionAndTrackException( ///////////////////////// Heartbeat ///////////////////////// public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut(2)) { + if (!tryReadLockWithTimeOutInMs( + CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return; } try { From f16f1c3442c6770f22c1f2c4845e77b03fc0f719 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Wed, 6 May 2026 15:02:27 +0800 Subject: [PATCH 6/8] Introduce dedicated updateClusterTopology RPC for topology push - Add TUpdateClusterTopologyReq struct and updateClusterTopology RPC to datanode.thrift - Implement updateClusterTopology handler in DataNodeInternalRPCServiceImpl - Change PUSH_TOPOLOGY action to call updateClusterTopology instead of getDataNodeHeartBeat - Use DataNodeTSStatusRPCHandler (default) instead of custom TopologyPushRPCHandler - Remove topology handling from heartbeat handler (getDataNodeHeartBeat) - Delete TopologyPushRPCHandler (no longer needed) --- ...oDnInternalServiceAsyncRequestManager.java | 7 ++- .../rpc/DataNodeAsyncRequestRPCHandler.java | 7 --- .../handlers/rpc/TopologyPushRPCHandler.java | 53 ------------------- .../manager/load/service/TopologyService.java | 12 ++--- .../impl/DataNodeInternalRPCServiceImpl.java | 11 ++-- .../src/main/thrift/datanode.thrift | 11 ++++ 6 files changed, 26 insertions(+), 75 deletions(-) delete mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TopologyPushRPCHandler.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index 7bed1b48cc43e..60426fea62bbc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -43,7 +43,6 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.SubmitTestConnectionTaskRPCHandler; -import org.apache.iotdb.confignode.client.async.handlers.rpc.TopologyPushRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.TreeDeviceViewFieldDetectionHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler; @@ -65,7 +64,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq; -import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TDeleteColumnDataReq; import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq; @@ -102,6 +100,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterReq; import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq; import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq; +import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq; @@ -412,8 +411,8 @@ protected void initActionMapBuilder() { actionMapBuilder.put( CnToDnAsyncRequestType.PUSH_TOPOLOGY, (req, client, handler) -> - client.getDataNodeHeartBeat( - (TDataNodeHeartbeatReq) req, (TopologyPushRPCHandler) handler)); + client.updateClusterTopology( + (TUpdateClusterTopologyReq) req, (DataNodeTSStatusRPCHandler) handler)); actionMapBuilder.put( CnToDnAsyncRequestType.INSERT_RECORD, (req, client, handler) -> diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java index 95d23bebd4b24..ae651d1d9fc5a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java @@ -203,13 +203,6 @@ public static DataNodeAsyncRequestRPCHandler buildHandler( (Map) responseMap, countDownLatch); case PUSH_TOPOLOGY: - return new TopologyPushRPCHandler( - requestType, - requestId, - targetDataNode, - dataNodeLocationMap, - (Map) responseMap, - countDownLatch); case SET_TTL: case CREATE_DATA_REGION: case CREATE_SCHEMA_REGION: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TopologyPushRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TopologyPushRPCHandler.java deleted file mode 100644 index d2023a8810865..0000000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TopologyPushRPCHandler.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.client.async.handlers.rpc; - -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; -import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; - -import java.util.Map; -import java.util.concurrent.CountDownLatch; - -public class TopologyPushRPCHandler extends DataNodeAsyncRequestRPCHandler { - - public TopologyPushRPCHandler( - CnToDnAsyncRequestType requestType, - int requestId, - TDataNodeLocation targetDataNode, - Map dataNodeLocationMap, - Map responseMap, - CountDownLatch countDownLatch) { - super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch); - } - - @Override - public void onComplete(TDataNodeHeartbeatResp resp) { - responseMap.put(requestId, resp); - nodeLocationMap.remove(requestId); - countDownLatch.countDown(); - } - - @Override - public void onError(Exception e) { - nodeLocationMap.remove(requestId); - countDownLatch.countDown(); - } -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index 038d8843411b1..f92a656859bbc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TNodeLocations; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult; import org.apache.iotdb.commons.cluster.NodeStatus; @@ -41,8 +42,7 @@ import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent; -import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; +import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq; import org.apache.ratis.util.AwaitForSignal; import org.apache.tsfile.utils.Pair; @@ -307,17 +307,15 @@ private void pushTopologyToDataNodes( return; } - final DataNodeAsyncRequestContext context = + final DataNodeAsyncRequestContext context = new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.PUSH_TOPOLOGY, targetMap); for (final Map.Entry entry : targetMap.entrySet()) { final int nodeId = entry.getKey(); final Set reachableSet = latestTopology.getOrDefault(nodeId, Collections.emptySet()); final Map> perNodeTopology = new HashMap<>(); perNodeTopology.put(nodeId, new HashSet<>(reachableSet)); - final TDataNodeHeartbeatReq req = - new TDataNodeHeartbeatReq(System.nanoTime(), false, false, 0L, 0L); - req.setTopology(perNodeTopology); - req.setDataNodes(dataNodesMap); + final TUpdateClusterTopologyReq req = + new TUpdateClusterTopologyReq(dataNodesMap, perNodeTopology); context.putRequest(nodeId, req); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index ec691880a43d8..34b9a3bc4ffea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -320,6 +320,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq; import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq; import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq; +import org.apache.iotdb.mpp.rpc.thrift.TUpdateClusterTopologyReq; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq; import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq; @@ -2190,6 +2191,12 @@ public TSStatus testConnectionEmptyRPC() throws TException { return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } + @Override + public TSStatus updateClusterTopology(TUpdateClusterTopologyReq req) throws TException { + clusterTopology.updateTopology(req.getDataNodes(), req.getTopology()); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String database) { PathPatternTree filteredPatternTree = new PathPatternTree(); try { @@ -2284,10 +2291,6 @@ public TDataNodeHeartbeatResp getDataNodeHeartBeat(TDataNodeHeartbeatReq req) th } } - if (req.isSetTopology() && req.isSetDataNodes()) { - clusterTopology.updateTopology(req.getDataNodes(), req.getTopology()); - } - if (req.isSetCurrentRegionOperations()) { RegionMigrateService.getInstance() .notifyRegionMigration( diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 4323e956a8e99..92a7602b34dee 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -332,6 +332,11 @@ struct TRegionRouteReq { 2: required map regionRouteMap } +struct TUpdateClusterTopologyReq { + 1: required map dataNodes + 2: required map> topology +} + struct TUpdateTemplateReq { 1: required byte type 2: required binary templateInfo @@ -1304,6 +1309,12 @@ service IDataNodeRPCService { /** Empty rpc, only for connection test */ common.TSStatus testConnectionEmptyRPC() + /** + * Push cluster topology to this DataNode. + * Each DataNode receives only its own reachable set. + */ + common.TSStatus updateClusterTopology(1:TUpdateClusterTopologyReq req) + /** to write audit log or other events as time series **/ common.TSStatus insertRecord(1:client.TSInsertRecordReq req); From 3e09c8ac43958f7725f0366e1d591735950372c6 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Thu, 7 May 2026 16:42:43 +0800 Subject: [PATCH 7/8] Add enable_topology_probing toggle with hot-reload; revert heartbeat selector optimization - Add `enable_topology_probing` config parameter (default true, volatile) with hot-reload support via loadHotModifiedProps - TopologyService checks the flag each iteration, skipping probing when disabled while keeping the thread alive for seamless re-enable - Revert dedicated heartbeat selector count: heartbeat client pools go back to using the general selectorNumOfClientManager --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 11 +++++++++++ .../iotdb/confignode/conf/ConfigNodeDescriptor.java | 7 +++++++ .../manager/load/service/TopologyService.java | 4 +++- .../resources/conf/iotdb-system.properties.template | 6 ------ .../iotdb/commons/client/ClientPoolFactory.java | 6 ++---- .../org/apache/iotdb/commons/conf/CommonConfig.java | 13 ------------- .../apache/iotdb/commons/conf/CommonDescriptor.java | 6 ------ 7 files changed, 23 insertions(+), 30 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 4dd5e78372f46..6a5b2013e841e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -204,6 +204,9 @@ public class ConfigNodeConfig { /** Acceptable pause duration for Phi accrual failure detector */ private long failureDetectorPhiAcceptablePauseInMs = 10000; + /** Whether to enable topology probing between DataNodes. Supports hot-reload. */ + private volatile boolean enableTopologyProbing = true; + /** Base interval in ms for topology probing. */ private long topologyProbingBaseIntervalInMs = 5000; @@ -1295,6 +1298,14 @@ public void setFailureDetectorPhiAcceptablePauseInMs(long failureDetectorPhiAcce this.failureDetectorPhiAcceptablePauseInMs = failureDetectorPhiAcceptablePauseInMs; } + public boolean isEnableTopologyProbing() { + return enableTopologyProbing; + } + + public void setEnableTopologyProbing(boolean enableTopologyProbing) { + this.enableTopologyProbing = enableTopologyProbing; + } + public long getTopologyProbingBaseIntervalInMs() { return topologyProbingBaseIntervalInMs; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 591a4cb817ec2..072fa41a92b7d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -322,6 +322,11 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "failure_detector_phi_acceptable_pause_in_ms", String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs())))); + conf.setEnableTopologyProbing( + Boolean.parseBoolean( + properties.getProperty( + "enable_topology_probing", String.valueOf(conf.isEnableTopologyProbing())))); + conf.setTopologyProbingBaseIntervalInMs( Long.parseLong( properties.getProperty( @@ -785,6 +790,8 @@ public void loadHotModifiedProps(TrimProperties properties) { ConfigurationFileUtils.updateAppliedProperties(properties, true); Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME)) .ifPresent(conf::setClusterName); + Optional.ofNullable(properties.getProperty("enable_topology_probing")) + .ifPresent(v -> conf.setEnableTopologyProbing(Boolean.parseBoolean(v))); } public static ConfigNodeDescriptor getInstance() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index f92a656859bbc..fc35c6bc1725a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -148,7 +148,9 @@ private boolean mayWait() { @Override public void run() { while (shouldRun.get() && mayWait()) { - topologyProbing(); + if (CONF.isEnableTopologyProbing()) { + topologyProbing(); + } } } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index ae940226ab89c..399a39352985f 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -732,12 +732,6 @@ failure_detector_phi_acceptable_pause_in_ms=10000 # Datatype: double # topology_probing_timeout_ratio=0.5 -# Selector thread (TAsyncClientManager) count for heartbeat async client pools. -# 0 means auto: max(1, availableProcessors / 4) -# effectiveMode: restart -# Datatype: int -# heartbeat_selector_num_of_client_manager=0 - # Disk remaining threshold at which DataNode is set to ReadOnly status # effectiveMode: restart # Datatype: double(percentage) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index f9ffca123b5c7..ea20f9c76ccd8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -182,8 +182,7 @@ public GenericKeyedObjectPool c new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager( - conf.getHeartbeatSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()), @@ -208,8 +207,7 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager( - conf.getHeartbeatSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index f91f6156d753c..19313723e55f5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -155,8 +155,6 @@ public class CommonConfig { */ private int selectorNumOfClientManager = 1; - private int heartbeatSelectorNumOfClientManager = 0; - /** Whether to use thrift compression. */ private boolean isRpcThriftCompressionEnabled = false; @@ -699,17 +697,6 @@ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) { this.selectorNumOfClientManager = selectorNumOfClientManager; } - public int getHeartbeatSelectorNumOfClientManager() { - if (heartbeatSelectorNumOfClientManager <= 0) { - return Math.max(1, Runtime.getRuntime().availableProcessors() / 4); - } - return heartbeatSelectorNumOfClientManager; - } - - public void setHeartbeatSelectorNumOfClientManager(int heartbeatSelectorNumOfClientManager) { - this.heartbeatSelectorNumOfClientManager = heartbeatSelectorNumOfClientManager; - } - public boolean isRpcThriftCompressionEnabled() { return isRpcThriftCompressionEnabled; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 11db54be34f81..d392a60bbbd76 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -178,12 +178,6 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.getSelectorNumOfClientManager())) .trim())); - config.setHeartbeatSelectorNumOfClientManager( - Integer.parseInt( - properties.getProperty( - "heartbeat_selector_num_of_client_manager", - String.valueOf(config.getHeartbeatSelectorNumOfClientManager())))); - config.setMaxClientNumForEachNode( Integer.parseInt( properties From 3490412ed5a10a0e14cc8f69efad3a14d04329b9 Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Thu, 7 May 2026 17:31:38 +0800 Subject: [PATCH 8/8] Move TopologyService lifecycle to ConfigRegionStateMachine; add enable_topology_probing to template --- .../statemachine/ConfigRegionStateMachine.java | 5 +++++ .../iotdb/confignode/manager/ConfigManager.java | 14 ++++++++++++++ .../iotdb/confignode/manager/load/LoadManager.java | 8 +++++++- .../manager/load/service/TopologyService.java | 10 +++++----- .../conf/iotdb-system.properties.template | 5 +++++ 5 files changed, 36 insertions(+), 6 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index effb1c466bab9..9c0f64873c41d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -256,6 +256,7 @@ public void notifyNotLeader() { configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync(); configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat(); configManager.getSubscriptionManager().getSubscriptionCoordinator().stopSubscriptionMetaSync(); + configManager.getLoadManager().stopTopologyService(); configManager.getLoadManager().stopLoadServices(); configManager.getProcedureManager().stopExecutor(); configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); @@ -288,6 +289,10 @@ public void notifyLeaderReady() { // Always start load services first configManager.getLoadManager().startLoadServices(); + if (CONF.isEnableTopologyProbing()) { + configManager.getLoadManager().startTopologyService(); + } + // Start leader scheduling services configManager.getProcedureManager().startExecutor(); threadPool.submit( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 327f842e966e6..34533362d44e6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -1744,6 +1744,7 @@ public TSStatus setConfiguration(TSetConfigurationReq req) { TrimProperties properties = new TrimProperties(); properties.putAll(req.getConfigs()); + boolean wasTopologyProbingEnabled = CONF.isEnableTopologyProbing(); if (configurationFileFound) { File file = new File(url.getFile()); try { @@ -1767,6 +1768,7 @@ public TSStatus setConfiguration(TSetConfigurationReq req) { } LOGGER.warn(msg); } + handleTopologyProbingHotReload(wasTopologyProbingEnabled); if (currentNodeId == req.getNodeId()) { return tsStatus; } @@ -1778,6 +1780,18 @@ public TSStatus setConfiguration(TSetConfigurationReq req) { return RpcUtils.squashResponseStatusList(statusList); } + private void handleTopologyProbingHotReload(boolean wasEnabled) { + boolean isEnabled = CONF.isEnableTopologyProbing(); + if (wasEnabled == isEnabled) { + return; + } + if (isEnabled && getConsensusManager().isLeader()) { + getLoadManager().startTopologyService(); + } else if (!isEnabled) { + getLoadManager().stopTopologyService(); + } + } + @Override public TSStatus startRepairData() { TSStatus status = confirmLeader(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 993bfc0e40066..e97f32bdbda85 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -152,7 +152,6 @@ public void startLoadServices() { statisticsService.startLoadStatisticsService(); eventService.startEventService(); partitionBalancer.setupPartitionBalancer(); - topologyService.startTopologyService(); } public void stopLoadServices() { @@ -162,6 +161,13 @@ public void stopLoadServices() { loadCache.clearHeartbeatCache(); partitionBalancer.clearPartitionBalancer(); routeBalancer.clearRegionPriority(); + } + + public void startTopologyService() { + topologyService.startTopologyService(); + } + + public void stopTopologyService() { topologyService.stopTopologyService(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java index fc35c6bc1725a..7084ec4ea2372 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java @@ -130,8 +130,10 @@ public synchronized void startTopologyService() { public synchronized void stopTopologyService() { shouldRun.set(false); - future.cancel(true); - future = null; + if (future != null) { + future.cancel(true); + future = null; + } heartbeats.clear(); LOGGER.info("Topology Probing has stopped successfully"); } @@ -148,9 +150,7 @@ private boolean mayWait() { @Override public void run() { while (shouldRun.get() && mayWait()) { - if (CONF.isEnableTopologyProbing()) { - topologyProbing(); - } + topologyProbing(); } } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 399a39352985f..50b1e82acab5f 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -722,6 +722,11 @@ failure_detector_phi_threshold=30 # Datatype: long failure_detector_phi_acceptable_pause_in_ms=10000 +# Whether to enable topology probing between DataNodes +# effectiveMode: hot_reload +# Datatype: Boolean +# enable_topology_probing=true + # Base interval in ms for topology probing between DataNodes # effectiveMode: restart # Datatype: long