diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index 51fd05d8eaf5d..53d7140a6f455 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -163,22 +163,7 @@ public static Map listPortOccupationWindows(final List p */ public static Map listPortOccupationUnix(final List ports) throws IOException { - return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN -P -n", 10, 9, 1); - } - - private static String getSearchAvailablePortCmd(final List ports) { - return SystemUtils.IS_OS_WINDOWS ? getWindowsSearchPortCmd(ports) : getUnixSearchPortCmd(ports); - } - - private static String getWindowsSearchPortCmd(final List ports) { - return "netstat -aon -p tcp | findStr " - + ports.stream().map(v -> "/C:\"127.0.0.1:" + v + "\"").collect(Collectors.joining(" ")); - } - - private static String getUnixSearchPortCmd(final List ports) { - return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E " - + ports.stream().map(String::valueOf).collect(Collectors.joining("|")) - + "\""; + return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN,TIME_WAIT -P -n", 10, 9, 1); } private static Pair getClusterNodesNum(final int index) { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java index 473d2f1b64c7f..975ae1a5682df 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppBaseConfig.java @@ -40,6 +40,8 @@ public abstract class MppBaseConfig { /** Create an empty MppPersistentConfig. */ protected MppBaseConfig() { this.properties = new Properties(); + this.properties.setProperty("cn_selector_thread_nums_of_client_manager", "1"); + this.properties.setProperty("dn_selector_thread_nums_of_client_manager", "1"); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java index 8d9081f435273..f3e1d1064ef4e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncAINodeHeartbeatClientPool.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncAINodeInternalServiceClient; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; /** Asynchronously send RPC requests to AINodes. */ public class AsyncAINodeHeartbeatClientPool { @@ -35,7 +36,8 @@ private AsyncAINodeHeartbeatClientPool() { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory()); + new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory( + ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager())); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java index 7b6bca5d0d9e7..a6dffbe0eef63 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq; public class AsyncConfigNodeHeartbeatClientPool { @@ -34,7 +35,8 @@ private AsyncConfigNodeHeartbeatClientPool() { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory()); + new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory( + ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager())); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java index 324e351302787..f99a88ebc77ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.confignode.client.async.handlers.audit.DataNodeWriteAuditLogHandler; import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.mpp.rpc.thrift.TAuditLogReq; import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; @@ -37,7 +38,8 @@ private AsyncDataNodeHeartbeatClientPool() { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory( + ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager())); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java index 19eaf9d9a40d2..00267e5a8ccef 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToCnInternalServiceAsyncRequestManager.java @@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeAsyncRequestRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeTSStatusRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.SubmitTestConnectionTaskToConfigNodeRPCHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,10 @@ public class CnToCnInternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(CnToCnInternalServiceAsyncRequestManager.class); + public CnToCnInternalServiceAsyncRequestManager(int selectorNumOfAsyncClientManager) { + super(selectorNumOfAsyncClientManager); + } + @Override protected void initActionMapBuilder() { actionMapBuilder.put( @@ -71,7 +76,8 @@ protected void adjustClientTimeoutIfNecessary( private static class ClientPoolHolder { private static final CnToCnInternalServiceAsyncRequestManager INSTANCE = - new CnToCnInternalServiceAsyncRequestManager(); + new CnToCnInternalServiceAsyncRequestManager( + ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()); private ClientPoolHolder() { // Empty constructor diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java index cd69f8b2c846d..8c7b389bd3dcc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java @@ -48,6 +48,7 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler; import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq; import org.apache.iotdb.mpp.rpc.thrift.TAlterTimeSeriesReq; @@ -120,6 +121,10 @@ public class CnToDnInternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class); + private CnToDnInternalServiceAsyncRequestManager() { + super(ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()); + } + @SuppressWarnings("unchecked") @Override protected void initActionMapBuilder() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index f305b06398db9..64167ba1d2e0f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -145,6 +145,17 @@ public class ConfigNodeConfig { */ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE; + + /** + * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its + * clients. + */ + private int selectorNumOfClientManager = + Runtime.getRuntime().availableProcessors() / 4 > 0 + ? Runtime.getRuntime().availableProcessors() / 4 + : 1; + /** System directory, including version file for each database and metadata. */ private String systemDir = IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator + IoTDBConstant.SYSTEM_FOLDER_NAME; @@ -460,6 +471,23 @@ public ConfigNodeConfig setMaxClientNumForEachNode(int maxClientNumForEachNode) return this; } + public int getMaxIdleClientNumForEachNode() { + return maxIdleClientNumForEachNode; + } + + public ConfigNodeConfig setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) { + this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; + return this; + } + + public int getSelectorNumOfClientManager() { + return selectorNumOfClientManager; + } + + public void setSelectorNumOfClientManager(int selectorNumOfClientManager) { + this.selectorNumOfClientManager = selectorNumOfClientManager; + } + public String getConsensusDir() { return consensusDir; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 77790dae1a903..fa35565ff5101 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -271,6 +271,24 @@ private void loadProperties(TrimProperties properties) throws BadNodeUrlExceptio "cn_max_client_count_for_each_node_in_client_manager", String.valueOf(conf.getMaxClientNumForEachNode())))); + int cnMaxIdleClientNumForEachNode = + Integer.parseInt( + properties.getProperty( + "cn_max_idle_client_count_for_each_node_in_client_manager", + String.valueOf(conf.getMaxIdleClientNumForEachNode()))); + if (cnMaxIdleClientNumForEachNode >= 0) { + conf.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode); + } + + int cnSelectorNumOfClientManager = + Integer.parseInt( + properties.getProperty( + "cn_selector_thread_nums_of_client_manager", + String.valueOf(conf.getSelectorNumOfClientManager()))); + if (cnSelectorNumOfClientManager > 0) { + conf.setSelectorNumOfClientManager(cnSelectorNumOfClientManager); + } + conf.setSystemDir(properties.getProperty("cn_system_dir", conf.getSystemDir())); conf.setConsensusDir(properties.getProperty("cn_consensus_dir", conf.getConsensusDir())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c7fc72152e21e..9b177cffcfa96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -137,9 +137,6 @@ public class IoTDBConfig { /** Port which the JDBC server listens to. */ private int rpcPort = 6667; - /** Rpc Selector thread num */ - private int rpcSelectorThreadCount = 1; - /** Max concurrent client number */ private int rpcMaxConcurrentClientNum = 1000; @@ -953,6 +950,8 @@ public class IoTDBConfig { */ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE; + /** * Cache size of partition cache in {@link * org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher} @@ -989,9 +988,6 @@ public class IoTDBConfig { /** ThreadPool size for read operation in coordinator */ private int coordinatorReadExecutorSize = 20; - /** ThreadPool size for write operation in coordinator */ - private int coordinatorWriteExecutorSize = 50; - /** Policy of DataNodeSchemaCache eviction */ private String dataNodeSchemaCacheEvictionPolicy = "FIFO"; @@ -1823,14 +1819,6 @@ public void setUnSeqTsFileSize(long unSeqTsFileSize) { this.unSeqTsFileSize = unSeqTsFileSize; } - public int getRpcSelectorThreadCount() { - return rpcSelectorThreadCount; - } - - public void setRpcSelectorThreadCount(int rpcSelectorThreadCount) { - this.rpcSelectorThreadCount = rpcSelectorThreadCount; - } - public int getRpcMaxConcurrentClientNum() { return rpcMaxConcurrentClientNum; } @@ -3202,6 +3190,14 @@ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) { this.maxClientNumForEachNode = maxClientNumForEachNode; } + public int getMaxIdleClientNumForEachNode() { + return maxIdleClientNumForEachNode; + } + + public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) { + this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; + } + public int getSelectorNumOfClientManager() { return selectorNumOfClientManager; } @@ -3349,14 +3345,6 @@ public void setCoordinatorReadExecutorSize(int coordinatorReadExecutorSize) { this.coordinatorReadExecutorSize = coordinatorReadExecutorSize; } - public int getCoordinatorWriteExecutorSize() { - return coordinatorWriteExecutorSize; - } - - public void setCoordinatorWriteExecutorSize(int coordinatorWriteExecutorSize) { - this.coordinatorWriteExecutorSize = coordinatorWriteExecutorSize; - } - public TEndPoint getAddressAndPort() { return new TEndPoint(rpcAddress, rpcPort); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 065583a383c96..7193fd27c582d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -291,11 +291,23 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException "dn_max_client_count_for_each_node_in_client_manager", String.valueOf(conf.getMaxClientNumForEachNode())))); - conf.setSelectorNumOfClientManager( + int dnMaxIdleClientNumForEachNode = Integer.parseInt( properties.getProperty( - "dn_selector_thread_count_of_client_manager", - String.valueOf(conf.getSelectorNumOfClientManager())))); + "dn_max_idle_client_count_for_each_node_in_client_manager", + String.valueOf(conf.getMaxIdleClientNumForEachNode()))); + if (dnMaxIdleClientNumForEachNode >= 0) { + conf.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode); + } + + int dnSelectorNumOfClientManager = + Integer.parseInt( + properties.getProperty( + "dn_selector_thread_nums_of_client_manager", + String.valueOf(conf.getSelectorNumOfClientManager()))); + if (dnSelectorNumOfClientManager > 0) { + conf.setSelectorNumOfClientManager(dnSelectorNumOfClientManager); + } conf.setRpcPort( Integer.parseInt( @@ -724,18 +736,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException properties.getProperty( "0.13_data_insert_adapt", String.valueOf(conf.isEnable13DataInsertAdapt())))); - int rpcSelectorThreadNum = - Integer.parseInt( - properties.getProperty( - "dn_rpc_selector_thread_count", - Integer.toString(conf.getRpcSelectorThreadCount()))); - - if (rpcSelectorThreadNum <= 0) { - rpcSelectorThreadNum = 1; - } - - conf.setRpcSelectorThreadCount(rpcSelectorThreadNum); - int maxConcurrentClientNum = Integer.parseInt( properties.getProperty( @@ -980,12 +980,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException properties.getProperty( "coordinator_read_executor_size", Integer.toString(conf.getCoordinatorReadExecutorSize())))); - conf.setCoordinatorWriteExecutorSize( - Integer.parseInt( - properties.getProperty( - "coordinator_write_executor_size", - Integer.toString(conf.getCoordinatorWriteExecutorSize())))); - conf.setDataNodeTableSchemaCacheSize( Long.parseLong( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java index da0d84d8466fe..c3dd465f4b200 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/DataNodeClientPoolFactory.java @@ -63,34 +63,6 @@ public GenericKeyedObjectPool createClientPool } } - public static class ClusterDeletionConfigNodeClientPoolFactory - implements IClientPoolFactory { - - @Override - public GenericKeyedObjectPool createClientPool( - ClientManager manager) { - GenericKeyedObjectPool clientPool = - new GenericKeyedObjectPool<>( - new ConfigNodeClient.Factory( - manager, - new ThriftClientProperty.Builder() - .setConnectionTimeoutMs(CONF.getConnectionTimeoutInMS() * 10) - .setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable()) - .setSelectorNumOfAsyncClientManager( - CONF.getSelectorNumOfClientManager() / 10 > 0 - ? CONF.getSelectorNumOfClientManager() / 10 - : 1) - .build()), - new ClientPoolProperty.Builder() - .setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode()) - .build() - .getConfig()); - ClientManagerMetrics.getInstance() - .registerClientManager(this.getClass().getSimpleName(), clientPool); - return clientPool; - } - } - public static class AINodeClientPoolFactory implements IClientPoolFactory { @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java index b6a218a732e2b..9ac6b70841136 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/cn/DnToCnInternalServiceAsyncRequestManager.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; import org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager; import org.apache.iotdb.commons.client.request.TestConnectionUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,10 @@ public class DnToCnInternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(DnToCnInternalServiceAsyncRequestManager.class); + public DnToCnInternalServiceAsyncRequestManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } + @Override protected void initActionMapBuilder() { actionMapBuilder.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java index dd56e1366c03b..29c3f76790716 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeExternalServiceAsyncRequestManager.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.client.request.AsyncRequestManager; import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,12 +39,17 @@ public class DataNodeExternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeExternalServiceAsyncRequestManager.class); + private DataNodeExternalServiceAsyncRequestManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } + @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java index d0ba1ba389f5b..b421f9b20da2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java @@ -23,10 +23,15 @@ import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; import org.apache.iotdb.commons.client.request.DataNodeIntraHeartbeatRequestManager; +import org.apache.iotdb.db.conf.IoTDBDescriptor; public class DataNodeIntraHeartbeatManager extends DataNodeIntraHeartbeatRequestManager { + public DataNodeIntraHeartbeatManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } + @Override protected void initActionMapBuilder() { actionMapBuilder.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java index ab08d83f2645b..e4d190e1571a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeMPPServiceAsyncRequestManager.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.client.request.AsyncRequestManager; import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,14 +38,17 @@ public class DataNodeMPPServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeMPPServiceAsyncRequestManager.class); - public DataNodeMPPServiceAsyncRequestManager() {} + public DataNodeMPPServiceAsyncRequestManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeMPPDataExchangeServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeMPPDataExchangeServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java index b57a10bcb4301..29dfbed203f29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DnToDnInternalServiceAsyncRequestManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler; import org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.mpp.rpc.thrift.TAttributeUpdateReq; import org.slf4j.Logger; @@ -33,6 +34,10 @@ public class DnToDnInternalServiceAsyncRequestManager private static final Logger LOGGER = LoggerFactory.getLogger(DnToDnInternalServiceAsyncRequestManager.class); + private DnToDnInternalServiceAsyncRequestManager() { + super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager()); + } + @Override protected void initActionMapBuilder() { actionMapBuilder.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index d0c256a34c3fa..7cf6be0d49d99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -216,10 +216,10 @@ public class Coordinator { ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory( + CONFIG.getSelectorNumOfClientManager())); private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; private final ScheduledExecutorService scheduledExecutor; private final ExecutorService dispatchExecutor; @@ -276,7 +276,6 @@ private Coordinator() { this.queryExecutionMap = new ConcurrentHashMap<>(); this.typeManager = new InternalTypeManager(); this.executor = getQueryExecutor(); - this.writeOperationExecutor = getWriteExecutor(); this.scheduledExecutor = getScheduledExecutor(); int dispatchThreadNum = Math.max(20, Runtime.getRuntime().availableProcessors() * 2); this.dispatchExecutor = @@ -410,7 +409,6 @@ private IQueryExecution createQueryExecutionForTreeModel( new TreeModelPlanner( statement, executor, - writeOperationExecutor, scheduledExecutor, partitionFetcher, schemaFetcher, @@ -791,12 +789,6 @@ private ExecutorService getQueryExecutor() { coordinatorReadExecutorSize, ThreadName.MPP_COORDINATOR_EXECUTOR_POOL.getName()); } - private ExecutorService getWriteExecutor() { - int coordinatorWriteExecutorSize = CONFIG.getCoordinatorWriteExecutorSize(); - return IoTDBThreadPoolFactory.newFixedThreadPool( - coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName()); - } - private ScheduledExecutorService getScheduledExecutor() { return IoTDBThreadPoolFactory.newScheduledThreadPool( COORDINATOR_SCHEDULED_EXECUTOR_SIZE, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 659b24b276718..bfee33c8bf1fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -188,7 +188,6 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; -import org.apache.iotdb.db.protocol.client.DataNodeClientPoolFactory; import org.apache.iotdb.db.protocol.client.an.AINodeClient; import org.apache.iotdb.db.protocol.client.an.AINodeClientManager; import org.apache.iotdb.db.protocol.session.IClientSession; @@ -406,13 +405,6 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { private static final IClientManager AI_NODE_CLIENT_MANAGER = AINodeClientManager.getInstance(); - /** FIXME Consolidate this clientManager with the upper one. */ - private static final IClientManager - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER = - new IClientManager.Factory() - .createClientManager( - new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory()); - private static final class ClusterConfigTaskExecutorHolder { private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor(); @@ -1987,7 +1979,7 @@ public SettableFuture deactivateSchemaTemplate( req.setPathPatternTree( serializePatternListToByteBuffer(deactivateTemplateStatement.getPathPatternList())); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -2079,7 +2071,7 @@ public SettableFuture alterSchemaTemplate( alterSchemaTemplateStatement.getOperationType(), alterSchemaTemplateStatement.getTemplateAlterInfo())); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -2130,7 +2122,7 @@ public SettableFuture unsetSchemaTemplate( req.setTemplateName(unsetSchemaTemplateStatement.getTemplateName()); req.setPath(unsetSchemaTemplateStatement.getPath().getFullPath()); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -2961,7 +2953,7 @@ public SettableFuture alterEncodingCompressor( alterEncodingCompressorStatement.ifExists(), alterEncodingCompressorStatement.isWithAudit()); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -3004,7 +2996,7 @@ public SettableFuture deleteTimeSeries( serializePatternListToByteBuffer(deleteTimeSeriesStatement.getPathPatternList())); req.setMayDeleteAudit(deleteTimeSeriesStatement.isMayDeleteAudit()); try (ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -3046,7 +3038,7 @@ public SettableFuture deleteLogicalView( queryId, serializePatternListToByteBuffer(deleteLogicalViewStatement.getPathPatternList())); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -3132,7 +3124,7 @@ public SettableFuture renameLogicalView( new TDeleteLogicalViewReq( queryId, serializePatternListToByteBuffer(Collections.singletonList(oldName))); try (ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -3204,7 +3196,7 @@ public SettableFuture alterLogicalView( new TAlterLogicalViewReq( context.getQueryId().getId(), ByteBuffer.wrap(stream.toByteArray())); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { TSStatus tsStatus; do { try { @@ -3262,7 +3254,7 @@ public TSStatus alterLogicalViewByPipe( .setIsGeneratedByPipe(shouldMarkAsPipeRequest); TSStatus tsStatus; try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { do { try { tsStatus = client.alterLogicalView(req); @@ -4416,7 +4408,7 @@ public SettableFuture alterTableRenameTable( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { @@ -4460,7 +4452,7 @@ public SettableFuture alterTableAddColumn( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TSStatus tsStatus = sendAlterReq2ConfigNode( @@ -4499,7 +4491,7 @@ public SettableFuture alterColumnDataType( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TSStatus tsStatus = sendAlterReq2ConfigNode( @@ -4538,7 +4530,7 @@ public SettableFuture alterTableRenameColumn( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { @@ -4584,7 +4576,7 @@ public SettableFuture alterTableDropColumn( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { @@ -4628,7 +4620,7 @@ public SettableFuture alterTableSetProperties( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { @@ -4670,7 +4662,7 @@ public SettableFuture alterTableCommentTable( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { @@ -4714,7 +4706,7 @@ public SettableFuture alterTableCommentColumn( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { @@ -4758,7 +4750,7 @@ public SettableFuture dropTable( final boolean isView) { final SettableFuture future = SettableFuture.create(); try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TSStatus tsStatus = sendAlterReq2ConfigNode( @@ -4793,7 +4785,7 @@ public SettableFuture deleteDevice( } try (final ConfigNodeClient client = - CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final ByteArrayOutputStream patternStream = new ByteArrayOutputStream(); try (final DataOutputStream outputStream = new DataOutputStream(patternStream)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java index f2d901c1540bc..756f17148cad3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java @@ -59,7 +59,6 @@ public class TreeModelPlanner implements IPlanner { private final Statement statement; private final ExecutorService executor; - private final ExecutorService writeOperationExecutor; private final ScheduledExecutorService scheduledExecutor; private final IPartitionFetcher partitionFetcher; @@ -75,7 +74,6 @@ public class TreeModelPlanner implements IPlanner { public TreeModelPlanner( Statement statement, ExecutorService executor, - ExecutorService writeOperationExecutor, ScheduledExecutorService scheduledExecutor, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, @@ -84,7 +82,6 @@ public TreeModelPlanner( asyncInternalServiceClientManager) { this.statement = statement; this.executor = executor; - this.writeOperationExecutor = writeOperationExecutor; this.scheduledExecutor = scheduledExecutor; this.partitionFetcher = partitionFetcher; this.schemaFetcher = schemaFetcher; diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 87be30f4520e0..54d9ccaf5ce74 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -494,9 +494,10 @@ cn_rpc_max_concurrent_client_num=3000 cn_connection_timeout_ms=60000 # selector thread (TAsyncClientManager) nums for async thread in a clientManager +# When <= 0, use max(1, CPU core number / 4). # effectiveMode: restart # Datatype: int -cn_selector_thread_nums_of_client_manager=1 +cn_selector_thread_nums_of_client_manager=0 # The maximum number of clients that can be allocated for a node in a clientManager. # when the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked @@ -505,6 +506,14 @@ cn_selector_thread_nums_of_client_manager=1 # Datatype: int cn_max_client_count_for_each_node_in_client_manager=1000 +# The maximum number of idle clients that can be retained for a node in a clientManager. +# When the number of idle clients to a single node exceeds this number, excess idle clients will be evicted. +# Idle clients are determined by a time threshold (default 1 minute of inactivity). +# 0 means no idle clients will be retained, connections are destroyed immediately upon return. +# effectiveMode: restart +# Datatype: int +# cn_max_idle_client_count_for_each_node_in_client_manager=1000 + # The maximum session idle time. unit: ms # Idle sessions are the ones that performs neither query or non-query operations for a period of time # Set to 0 to disable session timeout @@ -523,11 +532,6 @@ dn_rpc_thrift_compression_enable=false # this feature is under development, set this as false before it is done. dn_rpc_advanced_compression_enable=false -# the number of rpc selector -# effectiveMode: restart -# Datatype: int -dn_rpc_selector_thread_count=1 - # The maximum number of concurrent clients that can be connected to the dataNode. # effectiveMode: restart # Datatype: int @@ -549,9 +553,10 @@ dn_thrift_init_buffer_size=1024 dn_connection_timeout_ms=60000 # selector thread (TAsyncClientManager) nums for async thread in a clientManager +# When <= 0, use max(1, CPU core number / 4). # effectiveMode: restart # Datatype: int -dn_selector_thread_count_of_client_manager=1 +dn_selector_thread_nums_of_client_manager=0 # The maximum number of clients that can be allocated for a node in a clientManager. # When the number of the client to a single node exceeds this number, the thread for applying for a client will be blocked @@ -560,6 +565,14 @@ dn_selector_thread_count_of_client_manager=1 # Datatype: int dn_max_client_count_for_each_node_in_client_manager=1000 +# The maximum number of idle clients that can be retained for a node in a clientManager. +# When the number of idle clients to a single node exceeds this number, excess idle clients will be evicted. +# Idle clients are determined by a time threshold (default 1 minute of inactivity). +# 0 means no idle clients will be retained, connections are destroyed immediately upon return. +# effectiveMode: restart +# Datatype: int +# dn_max_idle_client_count_for_each_node_in_client_manager=1000 + #################### ### REST Service Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index ea20f9c76ccd8..ce5c836688980 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -72,6 +72,12 @@ public GenericKeyedObjectPool createCli public static class AsyncConfigNodeInternalServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncConfigNodeInternalServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -82,7 +88,7 @@ public GenericKeyedObjectPool c new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .build(), ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() @@ -120,6 +126,12 @@ public GenericKeyedObjectPool crea public static class AsyncDataNodeInternalServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncDataNodeInternalServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -130,7 +142,7 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), @@ -146,6 +158,12 @@ public GenericKeyedObjectPool cre public static class AsyncDataNodeExternalServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncDataNodeExternalServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -156,7 +174,7 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .build(), ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() @@ -171,6 +189,12 @@ public GenericKeyedObjectPool cre public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncConfigNodeHeartbeatServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -182,7 +206,7 @@ public GenericKeyedObjectPool c new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()), @@ -197,6 +221,13 @@ public GenericKeyedObjectPool c public static class AsyncDataNodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory { + + private final int selectorNumOfAsyncClientManager; + + public AsyncDataNodeHeartbeatServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -207,7 +238,7 @@ public GenericKeyedObjectPool cre new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), @@ -247,6 +278,13 @@ public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncDataNodeMPPDataExchangeServiceClientPoolFactory( + int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( @@ -258,7 +296,7 @@ public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .build(), ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()), new ClientPoolProperty.Builder() @@ -414,6 +452,12 @@ public GenericKeyedObjectPool createClientPool( public static class AsyncAINodeHeartbeatServiceClientPoolFactory implements IClientPoolFactory { + private final int selectorNumOfAsyncClientManager; + + public AsyncAINodeHeartbeatServiceClientPoolFactory(int selectorNumOfAsyncClientManager) { + this.selectorNumOfAsyncClientManager = selectorNumOfAsyncClientManager; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -424,7 +468,7 @@ public GenericKeyedObjectPool creat new ThriftClientProperty.Builder() .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS()) .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled()) - .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager()) + .setSelectorNumOfAsyncClientManager(selectorNumOfAsyncClientManager) .setPrintLogWhenEncounterException(false) .build(), ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java index 5bb7c22ee4994..1f818afe543da 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.client.property; +import org.apache.iotdb.commons.conf.CommonDescriptor; + import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; import java.time.Duration; @@ -49,7 +51,11 @@ public static class Builder { * the maximum number of clients that can be allocated for a node. When some clients are idle * for more than {@code maxIdleTimeForClient}, they will be cleaned up. */ - private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxClientNumForEachNode = + CommonDescriptor.getInstance().getConfig().getMaxClientNumForEachNode(); + + private int maxIdleClientNumForEachNode = + CommonDescriptor.getInstance().getConfig().getMaxIdleClientNumForEachNode(); /** * the minimum amount of time a client may sit idle in the pool before it is eligible for @@ -74,6 +80,11 @@ public Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) { return this; } + public Builder setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) { + this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; + return this; + } + public Builder setMinIdleTimeForClient(long minIdleTimeForClient) { this.minIdleTimeForClient = minIdleTimeForClient; return this; @@ -87,7 +98,7 @@ public Builder setTimeBetweenEvictionRuns(long timeBetweenEvictionRuns) { public ClientPoolProperty build() { GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig<>(); poolConfig.setMaxTotalPerKey(maxClientNumForEachNode); - poolConfig.setMaxIdlePerKey(maxClientNumForEachNode); + poolConfig.setMaxIdlePerKey(maxIdleClientNumForEachNode); poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(timeBetweenEvictionRuns)); poolConfig.setMinEvictableIdleTime(Duration.ofMillis(minIdleTimeForClient)); poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMs)); @@ -105,5 +116,6 @@ private DefaultProperty() {} public static final long MIN_IDLE_TIME_FOR_CLIENT_MS = TimeUnit.MINUTES.toMillis(1); public static final long TIME_BETWEEN_EVICTION_RUNS_MS = TimeUnit.MINUTES.toMillis(1); public static final int MAX_CLIENT_NUM_FOR_EACH_NODE = 1000; + public static final int MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE = 1000; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java index f8fe16166a8c8..f157ee7df936f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java @@ -117,7 +117,10 @@ private DefaultProperty() {} public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false; public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20); public static final int CONNECTION_NEVER_TIMEOUT_MS = 0; - public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1; + public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = + Runtime.getRuntime().availableProcessors() / 4 > 0 + ? Runtime.getRuntime().availableProcessors() / 4 + : 1; public static final boolean PRINT_LOG_WHEN_ENCOUNTER_EXCEPTION = true; } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java index 8053b677c782e..5c069368e4197 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java @@ -53,15 +53,15 @@ public abstract class AsyncRequestManager { private static final int MAX_RETRY_NUM = 6; - protected AsyncRequestManager() { - initClientManager(); + protected AsyncRequestManager(int selectorNumOfAsyncClientManager) { + initClientManager(selectorNumOfAsyncClientManager); actionMapBuilder = ImmutableMap.builder(); initActionMapBuilder(); this.actionMap = this.actionMapBuilder.build(); checkActionMapCompleteness(); } - protected abstract void initClientManager(); + protected abstract void initClientManager(int selectorNumOfAsyncClientManager); protected abstract void initActionMapBuilder(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java index 791a1e5df0e96..3b50c29fba6bf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/ConfigNodeInternalServiceAsyncRequestManager.java @@ -28,12 +28,18 @@ public abstract class ConfigNodeInternalServiceAsyncRequestManager extends AsyncRequestManager< RequestType, TConfigNodeLocation, AsyncConfigNodeInternalServiceClient> { + + protected ConfigNodeInternalServiceAsyncRequestManager(int selectorNumOfAsyncClientManager) { + super(selectorNumOfAsyncClientManager); + } + @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncConfigNodeInternalServiceClientPoolFactory()); + new ClientPoolFactory.AsyncConfigNodeInternalServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java index fcb1b01857df8..722d4f241ebe1 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeInternalServiceRequestManager.java @@ -28,12 +28,18 @@ public abstract class DataNodeInternalServiceRequestManager extends AsyncRequestManager< RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> { + + protected DataNodeInternalServiceRequestManager(int selectorNumOfAsyncClientManager) { + super(selectorNumOfAsyncClientManager); + } + @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java index f00de855eb8c4..34e44e5c8cfd8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java @@ -29,12 +29,17 @@ public abstract class DataNodeIntraHeartbeatRequestManager extends AsyncRequestManager< RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> { + protected DataNodeIntraHeartbeatRequestManager(int selectorNumOfAsyncClientManager) { + super(selectorNumOfAsyncClientManager); + } + @Override - protected void initClientManager() { + protected void initClientManager(int selectorNumOfAsyncClientManager) { clientManager = new IClientManager.Factory() .createClientManager( - new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory()); + new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory( + selectorNumOfAsyncClientManager)); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 20fa9d78d5927..b08416bd8c6a9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -45,7 +45,6 @@ public enum ThreadName { MPP_COORDINATOR_EXECUTOR_POOL("MPP-Coordinator-Executor"), DATANODE_INTERNAL_RPC_SERVICE("DataNodeInternalRPC-Service"), DATANODE_INTERNAL_RPC_PROCESSOR("DataNodeInternalRPC-Processor"), - MPP_COORDINATOR_WRITE_EXECUTOR("MPP-Coordinator-Write-Executor"), ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"), // -------------------------- Compaction -------------------------- COMPACTION_WORKER("Compaction-Worker"), @@ -231,7 +230,6 @@ public enum ThreadName { MPP_COORDINATOR_EXECUTOR_POOL, DATANODE_INTERNAL_RPC_SERVICE, DATANODE_INTERNAL_RPC_PROCESSOR, - MPP_COORDINATOR_WRITE_EXECUTOR, ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL)); private static final Set compactionThreadNames = new HashSet<>(Arrays.asList(COMPACTION_WORKER, COMPACTION_SUB_TASK, COMPACTION_SCHEDULE)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 19313723e55f5..e34d5804cac9d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -153,13 +153,18 @@ public class CommonConfig { * ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its * clients. */ - private int selectorNumOfClientManager = 1; + private int selectorNumOfClientManager = + Runtime.getRuntime().availableProcessors() / 4 > 0 + ? Runtime.getRuntime().availableProcessors() / 4 + : 1; /** Whether to use thrift compression. */ private boolean isRpcThriftCompressionEnabled = false; private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE; + private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE; + /** What will the system do when unrecoverable error occurs. */ private HandleSystemErrorStrategy handleSystemErrorStrategy = HandleSystemErrorStrategy.CHANGE_TO_READ_ONLY; @@ -713,6 +718,14 @@ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) { this.maxClientNumForEachNode = maxClientNumForEachNode; } + public int getMaxIdleClientNumForEachNode() { + return maxIdleClientNumForEachNode; + } + + public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) { + this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode; + } + HandleSystemErrorStrategy getHandleSystemErrorStrategy() { return handleSystemErrorStrategy; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index d392a60bbbd76..5cd954a09f7b8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -139,13 +139,16 @@ public void loadCommonProps(TrimProperties properties) throws IOException { "cn_connection_timeout_ms", String.valueOf(config.getCnConnectionTimeoutInMS())) .trim())); - config.setSelectorNumOfClientManager( + int cnSelectorNumOfClientManager = Integer.parseInt( properties .getProperty( "cn_selector_thread_nums_of_client_manager", String.valueOf(config.getSelectorNumOfClientManager())) - .trim())); + .trim()); + if (cnSelectorNumOfClientManager > 0) { + config.setSelectorNumOfClientManager(cnSelectorNumOfClientManager); + } config.setMaxClientNumForEachNode( Integer.parseInt( @@ -155,6 +158,17 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.getMaxClientNumForEachNode())) .trim())); + int cnMaxIdleClientNumForEachNode = + Integer.parseInt( + properties + .getProperty( + "cn_max_idle_client_count_for_each_node_in_client_manager", + String.valueOf(config.getMaxIdleClientNumForEachNode())) + .trim()); + if (cnMaxIdleClientNumForEachNode >= 0) { + config.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode); + } + config.setDnConnectionTimeoutInMS( Integer.parseInt( properties @@ -170,13 +184,16 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.isRpcThriftCompressionEnabled())) .trim())); - config.setSelectorNumOfClientManager( + int dnSelectorNumOfClientManager = Integer.parseInt( properties .getProperty( "dn_selector_thread_nums_of_client_manager", String.valueOf(config.getSelectorNumOfClientManager())) - .trim())); + .trim()); + if (dnSelectorNumOfClientManager > 0) { + config.setSelectorNumOfClientManager(dnSelectorNumOfClientManager); + } config.setMaxClientNumForEachNode( Integer.parseInt( @@ -186,6 +203,17 @@ public void loadCommonProps(TrimProperties properties) throws IOException { String.valueOf(config.getMaxClientNumForEachNode())) .trim())); + int dnMaxIdleClientNumForEachNode = + Integer.parseInt( + properties + .getProperty( + "dn_max_idle_client_count_for_each_node_in_client_manager", + String.valueOf(config.getMaxIdleClientNumForEachNode())) + .trim()); + if (dnMaxIdleClientNumForEachNode >= 0) { + config.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode); + } + config.setHandleSystemErrorStrategy( HandleSystemErrorStrategy.valueOf( properties diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java index 8b12d667f74c7..be7cf49239a63 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java @@ -214,6 +214,7 @@ public void evictionTest() throws Exception { manager, new ThriftClientProperty.Builder().build()), new ClientPoolProperty.Builder() .setMaxClientNumForEachNode(maxClientForEachNode) + .setMaxIdleClientNumForEachNode(maxClientForEachNode) .setMinIdleTimeForClient(minIdleDuration) .setTimeBetweenEvictionRuns(evictionRunsDuration) .build() @@ -294,6 +295,7 @@ public void maxTotalTest() throws Exception { manager, new ThriftClientProperty.Builder().build()), new ClientPoolProperty.Builder() .setMaxClientNumForEachNode(maxTotalClientForEachNode) + .setMaxIdleClientNumForEachNode(maxTotalClientForEachNode) .setWaitClientTimeoutMs(waitClientTimeoutMs) .build() .getConfig()); @@ -369,6 +371,7 @@ public void maxWaitClientTimeoutTest() throws Exception { new ClientPoolProperty.Builder() .setWaitClientTimeoutMs(waitClientTimeoutMS) .setMaxClientNumForEachNode(maxTotalClientForEachNode) + .setMaxIdleClientNumForEachNode(maxTotalClientForEachNode) .build() .getConfig()); } @@ -617,7 +620,13 @@ public GenericKeyedObjectPool crea new ThriftClientProperty.Builder() .setConnectionTimeoutMs(CONNECTION_TIMEOUT) .build()), - new ClientPoolProperty.Builder().build().getConfig()); + new ClientPoolProperty.Builder() + .setMaxClientNumForEachNode( + ClientPoolProperty.DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE) + .setMaxIdleClientNumForEachNode( + ClientPoolProperty.DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE) + .build() + .getConfig()); } } @@ -632,7 +641,13 @@ public GenericKeyedObjectPool cre manager, new ThriftClientProperty.Builder().setConnectionTimeoutMs(CONNECTION_TIMEOUT).build(), ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()), - new ClientPoolProperty.Builder().build().getConfig()); + new ClientPoolProperty.Builder() + .setMaxClientNumForEachNode( + ClientPoolProperty.DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE) + .setMaxIdleClientNumForEachNode( + ClientPoolProperty.DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE) + .build() + .getConfig()); } } }