Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncAINodeInternalServiceClient;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.AINodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;

/** Asynchronously send RPC requests to AINodes. */
public class AsyncAINodeHeartbeatClientPool {
Expand All @@ -35,7 +36,8 @@ private AsyncAINodeHeartbeatClientPool() {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncAINodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory());
new ClientPoolFactory.AsyncAINodeHeartbeatServiceClientPoolFactory(
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncConfigNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;

public class AsyncConfigNodeHeartbeatClientPool {
Expand All @@ -34,7 +35,8 @@ private AsyncConfigNodeHeartbeatClientPool() {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncConfigNodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory());
new ClientPoolFactory.AsyncConfigNodeHeartbeatServiceClientPoolFactory(
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.async.handlers.audit.DataNodeWriteAuditLogHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TAuditLogReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;

Expand All @@ -37,7 +38,8 @@ private AsyncDataNodeHeartbeatClientPool() {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
new ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory(
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeAsyncRequestRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.ConfigNodeTSStatusRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.SubmitTestConnectionTaskToConfigNodeRPCHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,6 +41,10 @@ public class CnToCnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(CnToCnInternalServiceAsyncRequestManager.class);

public CnToCnInternalServiceAsyncRequestManager(int selectorNumOfAsyncClientManager) {
super(selectorNumOfAsyncClientManager);
}

@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
Expand Down Expand Up @@ -71,7 +76,8 @@ protected void adjustClientTimeoutIfNecessary(

private static class ClientPoolHolder {
private static final CnToCnInternalServiceAsyncRequestManager INSTANCE =
new CnToCnInternalServiceAsyncRequestManager();
new CnToCnInternalServiceAsyncRequestManager(
ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());

private ClientPoolHolder() {
// Empty constructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterTimeSeriesReq;
Expand Down Expand Up @@ -120,6 +121,10 @@ public class CnToDnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(CnToDnInternalServiceAsyncRequestManager.class);

private CnToDnInternalServiceAsyncRequestManager() {
super(ConfigNodeDescriptor.getInstance().getConf().getSelectorNumOfClientManager());
}

@SuppressWarnings("unchecked")
@Override
protected void initActionMapBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ public class ConfigNodeConfig {
*/
private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;

private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;

/**
* ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
* clients.
*/
private int selectorNumOfClientManager =
Runtime.getRuntime().availableProcessors() / 4 > 0
? Runtime.getRuntime().availableProcessors() / 4
: 1;

/** System directory, including version file for each database and metadata. */
private String systemDir =
IoTDBConstant.CN_DEFAULT_DATA_DIR + File.separator + IoTDBConstant.SYSTEM_FOLDER_NAME;
Expand Down Expand Up @@ -460,6 +471,23 @@ public ConfigNodeConfig setMaxClientNumForEachNode(int maxClientNumForEachNode)
return this;
}

public int getMaxIdleClientNumForEachNode() {
return maxIdleClientNumForEachNode;
}

public ConfigNodeConfig setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
return this;
}

public int getSelectorNumOfClientManager() {
return selectorNumOfClientManager;
}

public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
this.selectorNumOfClientManager = selectorNumOfClientManager;
}

public String getConsensusDir() {
return consensusDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
}
}

private void loadProperties(TrimProperties properties) throws BadNodeUrlException, IOException {

Check failure on line 149 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ380nxvZ2vKmsiQtuah&open=AZ380nxvZ2vKmsiQtuah&pullRequest=17551
ConfigurationFileUtils.updateAppliedProperties(properties, false);
conf.setClusterName(properties.getProperty(IoTDBConstant.CLUSTER_NAME, conf.getClusterName()));

Expand Down Expand Up @@ -271,6 +271,24 @@
"cn_max_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getMaxClientNumForEachNode()))));

int cnMaxIdleClientNumForEachNode =
Integer.parseInt(
properties.getProperty(
"cn_max_idle_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getMaxIdleClientNumForEachNode())));
if (cnMaxIdleClientNumForEachNode >= 0) {
conf.setMaxIdleClientNumForEachNode(cnMaxIdleClientNumForEachNode);
}

int cnSelectorNumOfClientManager =
Integer.parseInt(
properties.getProperty(
"cn_selector_thread_nums_of_client_manager",
String.valueOf(conf.getSelectorNumOfClientManager())));
if (cnSelectorNumOfClientManager > 0) {
conf.setSelectorNumOfClientManager(cnSelectorNumOfClientManager);
}

conf.setSystemDir(properties.getProperty("cn_system_dir", conf.getSystemDir()));

conf.setConsensusDir(properties.getProperty("cn_consensus_dir", conf.getConsensusDir()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,6 @@ public class IoTDBConfig {
/** Port which the JDBC server listens to. */
private int rpcPort = 6667;

/** Rpc Selector thread num */
private int rpcSelectorThreadCount = 1;

/** Max concurrent client number */
private int rpcMaxConcurrentClientNum = 1000;

Expand Down Expand Up @@ -953,6 +950,8 @@ public class IoTDBConfig {
*/
private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;

private int maxIdleClientNumForEachNode = DefaultProperty.MAX_IDLE_CLIENT_NUM_FOR_EACH_NODE;

/**
* Cache size of partition cache in {@link
* org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher}
Expand Down Expand Up @@ -989,9 +988,6 @@ public class IoTDBConfig {
/** ThreadPool size for read operation in coordinator */
private int coordinatorReadExecutorSize = 20;

/** ThreadPool size for write operation in coordinator */
private int coordinatorWriteExecutorSize = 50;

/** Policy of DataNodeSchemaCache eviction */
private String dataNodeSchemaCacheEvictionPolicy = "FIFO";

Expand Down Expand Up @@ -1823,14 +1819,6 @@ public void setUnSeqTsFileSize(long unSeqTsFileSize) {
this.unSeqTsFileSize = unSeqTsFileSize;
}

public int getRpcSelectorThreadCount() {
return rpcSelectorThreadCount;
}

public void setRpcSelectorThreadCount(int rpcSelectorThreadCount) {
this.rpcSelectorThreadCount = rpcSelectorThreadCount;
}

public int getRpcMaxConcurrentClientNum() {
return rpcMaxConcurrentClientNum;
}
Expand Down Expand Up @@ -3202,6 +3190,14 @@ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) {
this.maxClientNumForEachNode = maxClientNumForEachNode;
}

public int getMaxIdleClientNumForEachNode() {
return maxIdleClientNumForEachNode;
}

public void setMaxIdleClientNumForEachNode(int maxIdleClientNumForEachNode) {
this.maxIdleClientNumForEachNode = maxIdleClientNumForEachNode;
}

public int getSelectorNumOfClientManager() {
return selectorNumOfClientManager;
}
Expand Down Expand Up @@ -3349,14 +3345,6 @@ public void setCoordinatorReadExecutorSize(int coordinatorReadExecutorSize) {
this.coordinatorReadExecutorSize = coordinatorReadExecutorSize;
}

public int getCoordinatorWriteExecutorSize() {
return coordinatorWriteExecutorSize;
}

public void setCoordinatorWriteExecutorSize(int coordinatorWriteExecutorSize) {
this.coordinatorWriteExecutorSize = coordinatorWriteExecutorSize;
}

public TEndPoint getAddressAndPort() {
return new TEndPoint(rpcAddress, rpcPort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,23 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
"dn_max_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getMaxClientNumForEachNode()))));

conf.setSelectorNumOfClientManager(
int dnMaxIdleClientNumForEachNode =
Integer.parseInt(
properties.getProperty(
"dn_selector_thread_count_of_client_manager",
String.valueOf(conf.getSelectorNumOfClientManager()))));
"dn_max_idle_client_count_for_each_node_in_client_manager",
String.valueOf(conf.getMaxIdleClientNumForEachNode())));
if (dnMaxIdleClientNumForEachNode >= 0) {
conf.setMaxIdleClientNumForEachNode(dnMaxIdleClientNumForEachNode);
}

int dnSelectorNumOfClientManager =
Integer.parseInt(
properties.getProperty(
"dn_selector_thread_nums_of_client_manager",
String.valueOf(conf.getSelectorNumOfClientManager())));
if (dnSelectorNumOfClientManager > 0) {
conf.setSelectorNumOfClientManager(dnSelectorNumOfClientManager);
}

conf.setRpcPort(
Integer.parseInt(
Expand Down Expand Up @@ -724,18 +736,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
properties.getProperty(
"0.13_data_insert_adapt", String.valueOf(conf.isEnable13DataInsertAdapt()))));

int rpcSelectorThreadNum =
Integer.parseInt(
properties.getProperty(
"dn_rpc_selector_thread_count",
Integer.toString(conf.getRpcSelectorThreadCount())));

if (rpcSelectorThreadNum <= 0) {
rpcSelectorThreadNum = 1;
}

conf.setRpcSelectorThreadCount(rpcSelectorThreadNum);

int maxConcurrentClientNum =
Integer.parseInt(
properties.getProperty(
Expand Down Expand Up @@ -980,12 +980,6 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
properties.getProperty(
"coordinator_read_executor_size",
Integer.toString(conf.getCoordinatorReadExecutorSize()))));
conf.setCoordinatorWriteExecutorSize(
Integer.parseInt(
properties.getProperty(
"coordinator_write_executor_size",
Integer.toString(conf.getCoordinatorWriteExecutorSize()))));

conf.setDataNodeTableSchemaCacheSize(
Long.parseLong(
properties.getProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,34 +63,6 @@ public GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool
}
}

public static class ClusterDeletionConfigNodeClientPoolFactory
implements IClientPoolFactory<ConfigRegionId, ConfigNodeClient> {

@Override
public GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> createClientPool(
ClientManager<ConfigRegionId, ConfigNodeClient> manager) {
GenericKeyedObjectPool<ConfigRegionId, ConfigNodeClient> clientPool =
new GenericKeyedObjectPool<>(
new ConfigNodeClient.Factory(
manager,
new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(CONF.getConnectionTimeoutInMS() * 10)
.setRpcThriftCompressionEnabled(CONF.isRpcThriftCompressionEnable())
.setSelectorNumOfAsyncClientManager(
CONF.getSelectorNumOfClientManager() / 10 > 0
? CONF.getSelectorNumOfClientManager() / 10
: 1)
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
.setMaxClientNumForEachNode(CONF.getMaxClientNumForEachNode())
.build()
.getConfig());
ClientManagerMetrics.getInstance()
.registerClientManager(this.getClass().getSimpleName(), clientPool);
return clientPool;
}
}

public static class AINodeClientPoolFactory implements IClientPoolFactory<Integer, AINodeClient> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import org.apache.iotdb.commons.client.request.ConfigNodeInternalServiceAsyncRequestManager;
import org.apache.iotdb.commons.client.request.TestConnectionUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +36,10 @@ public class DnToCnInternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DnToCnInternalServiceAsyncRequestManager.class);

public DnToCnInternalServiceAsyncRequestManager() {
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
}

@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestManager;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import org.apache.iotdb.db.conf.IoTDBDescriptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,12 +39,17 @@ public class DataNodeExternalServiceAsyncRequestManager
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeExternalServiceAsyncRequestManager.class);

private DataNodeExternalServiceAsyncRequestManager() {
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
}

@Override
protected void initClientManager() {
protected void initClientManager(int selectorNumOfAsyncClientManager) {
clientManager =
new IClientManager.Factory<TEndPoint, AsyncDataNodeExternalServiceClient>()
.createClientManager(
new ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory());
new ClientPoolFactory.AsyncDataNodeExternalServiceClientPoolFactory(
selectorNumOfAsyncClientManager));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
import org.apache.iotdb.commons.client.request.DataNodeIntraHeartbeatRequestManager;
import org.apache.iotdb.db.conf.IoTDBDescriptor;

public class DataNodeIntraHeartbeatManager
extends DataNodeIntraHeartbeatRequestManager<DnToDnRequestType> {

public DataNodeIntraHeartbeatManager() {
super(IoTDBDescriptor.getInstance().getConfig().getSelectorNumOfClientManager());
}

@Override
protected void initActionMapBuilder() {
actionMapBuilder.put(
Expand Down
Loading
Loading