diff --git a/src/main/java/com/actiontech/dble/DbleServer.java b/src/main/java/com/actiontech/dble/DbleServer.java index c1cced9a5..6e48e0d3f 100644 --- a/src/main/java/com/actiontech/dble/DbleServer.java +++ b/src/main/java/com/actiontech/dble/DbleServer.java @@ -208,6 +208,9 @@ public void startup() throws Exception { this.config.testConnection(); LOGGER.info("==========================================Test connection finish=================================="); + this.config.createDelayDetectTable(); + LOGGER.info("==========================================Create delay detect table finish=================================="); + // sync global status this.config.getAndSyncKeyVariables(); LOGGER.info("=====================================Get And Sync KeyVariables finish============================="); diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java index b82c721ea..f8ee541c6 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; public class PhysicalDbGroup { @@ -42,11 +43,15 @@ public class PhysicalDbGroup { public static final int RW_SPLIT_ALL = 2; // weight public static final int WEIGHT = 0; - private final List writeInstanceList; + + enum USAGE { + NONE, RW, SHARDING; + } private final String groupName; private final DbGroupConfig dbGroupConfig; private volatile PhysicalDbInstance writeDbInstance; + private final List writeInstanceList; private Map allSourceMap = new HashMap<>(); private final int rwSplitMode; @@ -55,8 +60,10 @@ public class PhysicalDbGroup { private final LocalReadLoadBalancer localReadLoadBalancer = new LocalReadLoadBalancer(); private final ReentrantReadWriteLock adjustLock = new ReentrantReadWriteLock(); - private boolean shardingUseless = true; - private boolean rwSplitUseless = true; + //delayDetection + private AtomicLong logicTimestamp = new AtomicLong(); + + private USAGE usedFor = USAGE.NONE; public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance writeDbInstances, PhysicalDbInstance[] readDbInstances, int rwSplitMode) { this.groupName = name; @@ -66,8 +73,8 @@ public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance wri writeDbInstances.setDbGroup(this); this.writeDbInstance = writeDbInstances; this.writeInstanceList = Collections.singletonList(writeDbInstance); - allSourceMap.put(writeDbInstances.getName(), writeDbInstances); + allSourceMap.put(writeDbInstances.getName(), writeDbInstances); for (PhysicalDbInstance readDbInstance : readDbInstances) { readDbInstance.setDbGroup(this); allSourceMap.put(readDbInstance.getName(), readDbInstance); @@ -89,6 +96,54 @@ public PhysicalDbGroup(PhysicalDbGroup org) { writeInstanceList = Collections.singletonList(writeDbInstance); } + public void init(String reason) { + for (Map.Entry entry : allSourceMap.entrySet()) { + entry.getValue().init(reason); + } + } + + // only fresh backend connection pool + public void init(List sourceNames, String reason) { + for (String sourceName : sourceNames) { + if (allSourceMap.containsKey(sourceName)) { + allSourceMap.get(sourceName).init(reason, false); + } + } + } + + public void stop(String reason) { + stop(reason, false); + } + + public void stop(String reason, boolean closeFront) { + for (PhysicalDbInstance dbInstance : allSourceMap.values()) { + dbInstance.stop(reason, closeFront); + } + } + + // only fresh backend connection pool + public void stop(List sourceNames, String reason, boolean closeFront) { + for (String sourceName : sourceNames) { + if (allSourceMap.containsKey(sourceName)) { + allSourceMap.get(sourceName).stop(reason, closeFront, false); + } + } + + if (closeFront) { + Iterator iterator = IOProcessor.BACKENDS_OLD.iterator(); + while (iterator.hasNext()) { + PooledConnection con = iterator.next(); + if (con instanceof BackendConnection) { + BackendConnection backendCon = (BackendConnection) con; + if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) { + backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn"); + iterator.remove(); + } + } + } + } + } + public String getGroupName() { return groupName; } @@ -125,7 +180,7 @@ PhysicalDbInstance findDbInstance(BackendConnection exitsCon) { } boolean isSlave(PhysicalDbInstance ds) { - return !(writeDbInstance == ds); + return writeDbInstance != ds; } public int getRwSplitMode() { @@ -133,80 +188,38 @@ public int getRwSplitMode() { } public boolean isUseless() { - return shardingUseless && rwSplitUseless; + return usedFor == USAGE.NONE; + } + + public boolean usedForSharding() { + return usedFor == USAGE.SHARDING; } - public boolean isShardingUseless() { - return shardingUseless; + public boolean usedForRW() { + return usedFor == USAGE.RW; } - public boolean isRwSplitUseless() { - return rwSplitUseless; + public void setUsedForSharding() { + usedFor = USAGE.SHARDING; } - public void setShardingUseless(boolean shardingUseless) { - this.shardingUseless = shardingUseless; + public void setUsedForRW() { + usedFor = USAGE.RW; } - public void setRwSplitUseless(boolean rwSplitUseless) { - this.rwSplitUseless = rwSplitUseless; + public USAGE getUsedFor() { + return usedFor; } private boolean checkSlaveSynStatus() { - return (dbGroupConfig.getDelayThreshold() != -1) && - (dbGroupConfig.isShowSlaveSql()); + return ((dbGroupConfig.getDelayThreshold() != -1) && dbGroupConfig.isShowSlaveSql()) || + dbGroupConfig.isDelayDetection(); } public PhysicalDbInstance getWriteDbInstance() { return writeDbInstance; } - public void init(String reason) { - for (Map.Entry entry : allSourceMap.entrySet()) { - entry.getValue().init(reason); - } - } - - public void init(List sourceNames, String reason) { - for (String sourceName : sourceNames) { - if (allSourceMap.containsKey(sourceName)) { - allSourceMap.get(sourceName).init(reason, false); - } - } - } - - public void stop(String reason) { - stop(reason, false); - } - - public void stop(String reason, boolean closeFront) { - for (PhysicalDbInstance dbInstance : allSourceMap.values()) { - dbInstance.stop(reason, closeFront); - } - } - - public void stop(List sourceNames, String reason, boolean closeFront) { - for (String sourceName : sourceNames) { - if (allSourceMap.containsKey(sourceName)) { - allSourceMap.get(sourceName).stop(reason, closeFront, false); - } - } - - if (closeFront) { - Iterator iterator = IOProcessor.BACKENDS_OLD.iterator(); - while (iterator.hasNext()) { - PooledConnection con = iterator.next(); - if (con instanceof BackendConnection) { - BackendConnection backendCon = (BackendConnection) con; - if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) { - backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn"); - iterator.remove(); - } - } - } - } - } - public Collection getDbInstances(boolean isAll) { if (!isAll && rwSplitMode == RW_SPLIT_OFF) { return writeInstanceList; @@ -230,18 +243,6 @@ public PhysicalDbInstance[] getReadDbInstances() { return readSources; } - /** - * rwsplit user - * - * @param master - * @param writeStatistical - * @return - * @throws IOException - */ - public PhysicalDbInstance rwSelect(Boolean master, Boolean writeStatistical) throws IOException { - return rwSelect(master, writeStatistical, false); - } - /** * rwsplit user * @@ -546,6 +547,14 @@ public boolean checkInstanceExist(String instanceName) { return true; } + public AtomicLong getLogicTimestamp() { + return logicTimestamp; + } + + public void setLogicTimestamp(AtomicLong logicTimestamp) { + this.logicTimestamp = logicTimestamp; + } + private void reportHeartbeatError(PhysicalDbInstance ins) throws IOException { final DbInstanceConfig config = ins.getConfig(); String heartbeatError = "the dbInstance[" + config.getUrl() + "] can't reach. Please check the dbInstance status"; @@ -565,7 +574,9 @@ public boolean equalsBaseInfo(PhysicalDbGroup pool) { pool.getDbGroupConfig().getErrorRetryCount() == this.dbGroupConfig.getErrorRetryCount() && pool.getDbGroupConfig().getRwSplitMode() == this.dbGroupConfig.getRwSplitMode() && pool.getDbGroupConfig().getDelayThreshold() == this.dbGroupConfig.getDelayThreshold() && + pool.getDbGroupConfig().getDelayPeriodMillis() == this.dbGroupConfig.getDelayPeriodMillis() && + pool.getDbGroupConfig().getDelayDatabase().equals(this.dbGroupConfig.getDelayDatabase()) && pool.getDbGroupConfig().isDisableHA() == this.dbGroupConfig.isDisableHA() && - pool.getGroupName().equals(this.groupName) && pool.isShardingUseless() == this.isShardingUseless() && pool.isRwSplitUseless() == this.isRwSplitUseless(); + pool.getGroupName().equals(this.groupName) && pool.getUsedFor() == this.getUsedFor(); } } diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java index 76a60f713..a23b9f162 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java @@ -18,15 +18,12 @@ import com.actiontech.dble.net.factory.MySQLConnectionFactory; import com.actiontech.dble.net.service.AbstractService; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; -import com.actiontech.dble.singleton.Scheduler; import com.actiontech.dble.singleton.TraceManager; import com.actiontech.dble.util.StringUtil; -import com.actiontech.dble.util.TimeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; @@ -54,7 +51,6 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { private final LongAdder writeCount = new LongAdder(); private final AtomicBoolean isInitial = new AtomicBoolean(false); - private AtomicBoolean initHeartbeat = new AtomicBoolean(false); // connection pool private ConnectionPool connectionPool; @@ -96,11 +92,20 @@ public void init(String reason, boolean isInitHeartbeat) { return; } + if (dbGroup.usedForSharding()) { + checkPoolSize(); + } + + LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name); + start(reason, isInitHeartbeat); + } + + private void checkPoolSize() { int size = config.getMinCon(); String[] physicalSchemas = dbGroup.getSchemas(); int initSize = physicalSchemas.length; if (size < initSize) { - LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes), so dble will create at least 1 conn for every schema, " + + LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes/apNodes), so dble will create at least 1 conn for every schema, " + "minCon size before:{}, now:{}", this.dbGroup.getGroupName() + "." + name, size, initSize); config.setMinCon(initSize); } @@ -108,12 +113,9 @@ public void init(String reason, boolean isInitHeartbeat) { initSize = Math.max(initSize, config.getMinCon()); size = config.getMaxCon(); if (size < initSize) { - LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize); + LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes/apNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize); config.setMaxCon(initSize); } - - LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name); - start(reason, isInitHeartbeat); } public void createConnectionSkipPool(String schema, ResponseHandler handler) { @@ -363,22 +365,11 @@ private void startHeartbeat() { LOGGER.info("the instance[{}] is disabled or fake node, skip to start heartbeat.", this.dbGroup.getGroupName() + "." + name); return; } + heartbeat.start(heartbeatRecoveryTime); + } - heartbeat.start(); - if (initHeartbeat.compareAndSet(false, true)) { - - heartbeat.setScheduledFuture(Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> { - if (DbleServer.getInstance().getConfig().isFullyConfigured()) { - if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) { - return; - } - - heartbeat.heartbeat(); - } - }, 0L, config.getPoolConfig().getHeartbeatPeriodMillis(), TimeUnit.MILLISECONDS)); - } else { - LOGGER.warn("init dbInstance[{}] heartbeat, but it has been initialized, skip initialization.", heartbeat.getSource().getName()); - } + private void stopHeartbeat(String reason) { + heartbeat.stop(reason); } public void start(String reason) { @@ -386,13 +377,21 @@ public void start(String reason) { } public void start(String reason, boolean isStartHeartbeat) { + startPool(reason); + if (isStartHeartbeat) { + startHeartbeat(); + } + } + + private void startPool(String reason) { + if (disabled.get() || fakeNode) { + LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason); + return; + } if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) { LOGGER.info("start connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason); this.connectionPool.startEvictor(); } - if (isStartHeartbeat) { - startHeartbeat(); - } } public void stop(String reason, boolean closeFront) { @@ -401,17 +400,18 @@ public void stop(String reason, boolean closeFront) { public void stop(String reason, boolean closeFront, boolean isStopHeartbeat) { if (isStopHeartbeat) { - final boolean stop = heartbeat.isStop(); - heartbeat.stop(reason); - if (!stop) { - initHeartbeat.set(false); - } + stopHeartbeat(reason); } + stopPool(reason, closeFront); + + isInitial.set(false); + } + + private void stopPool(String reason, boolean closeFront) { if (dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) { LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason); connectionPool.stop(reason, closeFront); } - isInitial.set(false); } public void closeAllConnection(String reason) { diff --git a/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionStatus.java b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionStatus.java new file mode 100644 index 000000000..50c6c120b --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionStatus.java @@ -0,0 +1,11 @@ +package com.actiontech.dble.backend.delyDetection; + +public enum DelayDetectionStatus { + INIT(), OK(), TIMEOUT(), ERROR(), STOP(); + + @Override + public String toString() { + return super.toString().toLowerCase(); + } + +} diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java b/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java index e58bc3d8e..781a89246 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java @@ -24,7 +24,7 @@ public class HeartbeatSQLJob implements ResponseHandler { public static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatSQLJob.class); - private final String sql; + private volatile String sql; private final SQLJobHandler jobHandler; /* * (null, 0) -> initial @@ -37,11 +37,19 @@ public class HeartbeatSQLJob implements ResponseHandler { public HeartbeatSQLJob(MySQLHeartbeat heartbeat, SQLJobHandler jobHandler) { super(); - this.sql = heartbeat.getHeartbeatSQL(); this.jobHandler = jobHandler; this.heartbeat = heartbeat; } + public long getConnectionId() { + final BackendConnection con = this.connectionRef.getReference(); + long connId = 0; + if (con != null) { + connId = con.getId(); + } + return connId; + } + public void terminate() { if (connectionRef.compareAndSet(null, null, 0, 2)) { LOGGER.info("[heartbeat]terminate timeout heartbeat job."); @@ -72,6 +80,7 @@ public void connectionAcquired(final BackendConnection conn) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[heartbeat]do heartbeat,conn is " + conn); } + this.sql = heartbeat.getHeartbeatSQL(); conn.getBackendService().query(sql); } catch (Exception e) { // (UnsupportedEncodingException e) { LOGGER.warn("[heartbeat]send heartbeat error", e); @@ -89,6 +98,7 @@ public void execute() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("[heartbeat]do heartbeat,conn is {}", conn); } + this.sql = heartbeat.getHeartbeatSQL(); conn.getBackendService().query(sql); } catch (Exception e) { // (UnsupportedEncodingException e) { LOGGER.warn("[heartbeat]send heartbeat error", e); diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDefaultDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDefaultDetector.java new file mode 100644 index 000000000..8a472cc58 --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDefaultDetector.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ +package com.actiontech.dble.backend.heartbeat; + +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * @author mycat + */ +public class MySQLDefaultDetector extends MySQLDetector { + public static final Logger LOGGER = LoggerFactory.getLogger(MySQLDefaultDetector.class); + + public MySQLDefaultDetector(MySQLHeartbeat heartbeat) { + super(heartbeat); + String[] fetchCols = {}; + this.sqlJob = new HeartbeatSQLJob(heartbeat, new OneRawSQLQueryResultHandler(fetchCols, this)); + } + + @Override + protected void setStatus(PhysicalDbInstance source, Map resultResult) { + // heartbeat.setResult(MySQLHeartbeat.OK_STATUS); + } +} diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java new file mode 100644 index 000000000..9a458e272 --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ +package com.actiontech.dble.backend.heartbeat; + +import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Optional; + +/** + * @author mycat + */ +public class MySQLDelayDetector extends MySQLDetector { + public static final Logger LOGGER = LoggerFactory.getLogger(MySQLDelayDetector.class); + private static final String[] MYSQL_DELAY_DETECTION_COLS = new String[]{ + "logic_timestamp", + }; + + public MySQLDelayDetector(MySQLHeartbeat heartbeat) { + super(heartbeat); + this.sqlJob = new HeartbeatSQLJob(heartbeat, new OneRawSQLQueryResultHandler(MYSQL_DELAY_DETECTION_COLS, this)); + } + + @Override + protected void setStatus(PhysicalDbInstance source, Map resultResult) { + if (source.isReadInstance()) { + String logicTimestamp = Optional.ofNullable(resultResult.get("logic_timestamp")).orElse("0"); + long logic = Long.parseLong(logicTimestamp); + delayCal(logic, source.getDbGroupConfig().getDelayThreshold()); + } else { + heartbeat.setSlaveBehindMaster(null); + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); + } + } + + private void delayCal(long delay, long delayThreshold) { + PhysicalDbGroup dbGroup = heartbeat.getSource().getDbGroup(); + long logic = dbGroup.getLogicTimestamp().get(); + long result = logic - delay; + if (result >= 0) { + long delayVal = result * (dbGroup.getDbGroupConfig().getDelayPeriodMillis() / 2); + if (delayThreshold > 0 && delayVal > delayThreshold) { + MySQLHeartbeat.LOGGER.warn("found MySQL master/slave Replication delay !!! " + heartbeat.getSource().getConfig() + ", binlog sync time delay: " + delayVal + "ms"); + } + heartbeat.setSlaveBehindMaster((int) delayVal); + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); + } else { + // master and slave maybe switch + heartbeat.setSlaveBehindMaster(null); + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR); + } + } +} diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java index d504e88f3..a95f33d63 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java @@ -14,7 +14,6 @@ import com.actiontech.dble.config.helper.GetAndSyncDbInstanceKeyVariables; import com.actiontech.dble.config.helper.KeyVariables; import com.actiontech.dble.config.model.SystemConfig; -import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; import com.actiontech.dble.sqlengine.SQLQueryResult; import com.actiontech.dble.sqlengine.SQLQueryResultListener; import org.slf4j.Logger; @@ -25,36 +24,17 @@ /** * @author mycat */ -public class MySQLDetector implements SQLQueryResultListener>> { +public abstract class MySQLDetector implements SQLQueryResultListener>> { public static final Logger LOGGER = LoggerFactory.getLogger(MySQLDetector.class); - private static final String[] MYSQL_SLAVE_STATUS_COLS = new String[]{ - "Seconds_Behind_Master", - "Slave_IO_Running", - "Slave_SQL_Running", - "Slave_IO_State", - "Master_Host", - "Master_User", - "Master_Port", - "Connect_Retry", - "Last_IO_Error"}; - private static final String[] MYSQL_READ_ONLY_COLS = new String[]{"@@read_only"}; - private final MySQLHeartbeat heartbeat; private volatile long lastSendQryTime; private volatile long lastReceivedQryTime; - private final HeartbeatSQLJob sqlJob; + + protected final MySQLHeartbeat heartbeat; + protected HeartbeatSQLJob sqlJob; public MySQLDetector(MySQLHeartbeat heartbeat) { this.heartbeat = heartbeat; - String[] fetchCols = {}; - if (heartbeat.getSource().getDbGroupConfig().isShowSlaveSql()) { - fetchCols = MYSQL_SLAVE_STATUS_COLS; - } else if (heartbeat.getSource().getDbGroupConfig().isSelectReadOnlySql()) { - fetchCols = MYSQL_READ_ONLY_COLS; - } - - OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(fetchCols, this); - this.sqlJob = new HeartbeatSQLJob(heartbeat, resultHandler); } public void heartbeat() { @@ -90,19 +70,20 @@ public void onResult(SQLQueryResult> result) { if (result.isSuccess()) { PhysicalDbInstance source = heartbeat.getSource(); Map resultResult = result.getResult(); - if (source.getDbGroupConfig().isShowSlaveSql()) { - setStatusBySlave(source, resultResult); - } else if (source.getDbGroupConfig().isSelectReadOnlySql()) { - setStatusByReadOnly(source, resultResult); - } else { - setStatusForNormalHeartbeat(source); - } + setStatus(source, resultResult); + if (checkRecoverFail(source)) return; + heartbeat.setResult(MySQLHeartbeat.OK_STATUS); } } - private void setStatusForNormalHeartbeat(PhysicalDbInstance source) { - if (checkRecoverFail(source)) return; - heartbeat.setResult(MySQLHeartbeat.OK_STATUS); + protected abstract void setStatus(PhysicalDbInstance source, Map resultResult); + + public long getHeartbeatConnId() { + if (sqlJob != null) { + return sqlJob.getConnectionId(); + } else { + return 0L; + } } /** @@ -169,47 +150,4 @@ private boolean checkRecoverFail(PhysicalDbInstance source) { } return false; } - - private void setStatusBySlave(PhysicalDbInstance source, Map resultResult) { - String slaveIoRunning = resultResult != null ? resultResult.get("Slave_IO_Running") : null; - String slaveSqlRunning = resultResult != null ? resultResult.get("Slave_SQL_Running") : null; - if (slaveIoRunning != null && slaveIoRunning.equals(slaveSqlRunning) && slaveSqlRunning.equals("Yes")) { - heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); - String secondsBehindMaster = resultResult.get("Seconds_Behind_Master"); - if (null != secondsBehindMaster && !"".equals(secondsBehindMaster) && !"NULL".equalsIgnoreCase(secondsBehindMaster)) { - int behindMaster = Integer.parseInt(secondsBehindMaster); - int delayThreshold = source.getDbGroupConfig().getDelayThreshold(); - if (delayThreshold > 0 && behindMaster > delayThreshold) { - MySQLHeartbeat.LOGGER.warn("found MySQL master/slave Replication delay !!! " + heartbeat.getSource().getConfig() + ", binlog sync time delay: " + behindMaster + "s"); - } - heartbeat.setSlaveBehindMaster(behindMaster); - } else { - heartbeat.setSlaveBehindMaster(null); - } - } else if (source.isSalveOrRead()) { - //String Last_IO_Error = resultResult != null ? resultResult.get("Last_IO_Error") : null; - MySQLHeartbeat.LOGGER.warn("found MySQL master/slave Replication err !!! " + - heartbeat.getSource().getConfig() + ", " + resultResult); - heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR); - heartbeat.setSlaveBehindMaster(null); - } - heartbeat.getAsyncRecorder().setBySlaveStatus(resultResult); - if (checkRecoverFail(source)) return; - heartbeat.setResult(MySQLHeartbeat.OK_STATUS); - } - - private void setStatusByReadOnly(PhysicalDbInstance source, Map resultResult) { - String readonly = resultResult != null ? resultResult.get("@@read_only") : null; - if (readonly == null) { - heartbeat.setErrorResult("result of select @@read_only is null"); - return; - } else if (readonly.equals("0")) { - source.setReadOnly(false); - } else { - source.setReadOnly(true); - } - if (checkRecoverFail(source)) return; - heartbeat.setResult(MySQLHeartbeat.OK_STATUS); - } - } diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index ee970647b..43d161c7f 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -5,19 +5,29 @@ */ package com.actiontech.dble.backend.heartbeat; +import com.actiontech.dble.DbleServer; import com.actiontech.dble.alarm.AlarmCode; import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.config.model.SystemConfig; +import com.actiontech.dble.singleton.Scheduler; import com.actiontech.dble.statistic.DbInstanceSyncRecorder; import com.actiontech.dble.statistic.HeartbeatRecorder; +import com.actiontech.dble.util.TimeUtil; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; +import java.time.LocalDateTime; import java.util.Date; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -37,33 +47,47 @@ public class MySQLHeartbeat { public static final int INIT_STATUS = 0; public static final int OK_STATUS = 1; - private static final int ERROR_STATUS = -1; - static final int TIMEOUT_STATUS = -2; - private final int errorRetryCount; + public static final int ERROR_STATUS = -1; + public static final int TIMEOUT_STATUS = -2; + private final AtomicBoolean isChecking = new AtomicBoolean(false); - private final HeartbeatRecorder recorder = new HeartbeatRecorder(); - private final DbInstanceSyncRecorder asyncRecorder = new DbInstanceSyncRecorder(); + private final AtomicBoolean initHeartbeat = new AtomicBoolean(false); private final PhysicalDbInstance source; protected volatile int status; private String heartbeatSQL; - private long heartbeatTimeout; // during the time, heart failed will ignore - private final AtomicInteger errorCount = new AtomicInteger(0); - private AtomicLong startErrorTime = new AtomicLong(-1L); + + private final int errorRetryCount; // when heartbeat error, dble retry count + private final long heartbeatTimeout; // during the time, heart failed will ignore private volatile boolean isStop = true; - private volatile int dbSynStatus = DB_SYN_NORMAL; - private volatile Integer slaveBehindMaster; private MySQLDetector detector; private volatile String message; - private volatile ScheduledFuture scheduledFuture; + private ScheduledFuture scheduledFuture; + + private final AtomicInteger errorCount = new AtomicInteger(0); + private AtomicLong startErrorTime = new AtomicLong(-1L); private AtomicLong errorTimeInLast5Min = new AtomicLong(); private int errorTimeInLast5MinCount = 0; + private final HeartbeatRecorder recorder = new HeartbeatRecorder(); + + private boolean isDelayDetection; + private volatile int logicUpdate = 0; + + private volatile int dbSynStatus = DB_SYN_NORMAL; + private volatile Integer slaveBehindMaster; + private final DbInstanceSyncRecorder asyncRecorder = new DbInstanceSyncRecorder(); + public MySQLHeartbeat(PhysicalDbInstance dbInstance) { this.source = dbInstance; this.status = INIT_STATUS; this.errorRetryCount = dbInstance.getDbGroupConfig().getErrorRetryCount(); this.heartbeatTimeout = dbInstance.getDbGroupConfig().getHeartbeatTimeout(); - this.heartbeatSQL = dbInstance.getDbGroupConfig().getHeartbeatSQL(); + this.isDelayDetection = dbInstance.getDbGroupConfig().isDelayDetection(); + if (isDelayDetection) { + this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), dbInstance.getDbGroupConfig().getDelayDatabase()); + } else { + this.heartbeatSQL = source.getDbGroupConfig().getHeartbeatSQL(); + } } public String getMessage() { @@ -74,10 +98,6 @@ public PhysicalDbInstance getSource() { return source; } - public void setScheduledFuture(ScheduledFuture scheduledFuture) { - this.scheduledFuture = scheduledFuture; - } - public String getLastActiveTime() { if (detector == null) { return null; @@ -87,8 +107,37 @@ public String getLastActiveTime() { return sdf.format(new Date(t)); } - public void start() { + public void start(long heartbeatRecoveryTime) { + LOGGER.info("start heartbeat of instance[{}]", source); + if (Objects.nonNull(scheduledFuture)) { + stop("the legacy thread is not closed"); + } isStop = false; + if (initHeartbeat.compareAndSet(false, true)) { + long heartbeatPeriodMillis; + long initialDelay = 0; + if (isDelayDetection) { + heartbeatPeriodMillis = source.getDbGroupConfig().getDelayPeriodMillis(); + if (source.isReadInstance()) { + initialDelay = source.getDbGroupConfig().getDelayPeriodMillis() / 2; + } + } else { + heartbeatPeriodMillis = (int) source.getConfig().getPoolConfig().getHeartbeatPeriodMillis(); + } + + this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> { + if (DbleServer.getInstance().getConfig().isFullyConfigured()) { + if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) { + return; + } + + heartbeat(); + } + }, initialDelay, heartbeatPeriodMillis, TimeUnit.MILLISECONDS); + } else { + LOGGER.warn("init dbInstance[{}] heartbeat, but it has been initialized, skip initialization.", source.getName()); + } + } public void stop(String reason) { @@ -99,6 +148,7 @@ public void stop(String reason) { LOGGER.info("stop heartbeat of instance[{}], due to {}", source.getConfig().getUrl(), reason); isStop = true; scheduledFuture.cancel(false); + initHeartbeat.set(false); this.status = INIT_STATUS; if (detector != null && !detector.isQuit()) { detector.quit(); @@ -112,7 +162,7 @@ public void stop(String reason) { public void heartbeat() { if (isChecking.compareAndSet(false, true)) { if (detector == null || detector.isQuit()) { - detector = new MySQLDetector(this); + detector = getMySQLDetector(); } detector.heartbeat(); } else { @@ -131,6 +181,44 @@ public void heartbeat() { } } + private String getDetectorSql(String dbGroupName, String delayDatabase) { + String[] str = {"dble", dbGroupName, SystemConfig.getInstance().getInstanceName()}; + String sourceName = Joiner.on("_").join(str); + String sqlTableName = delayDatabase + ".u_delay "; + String detectorSql; + if (!source.isReadInstance()) { + String update = "replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)"; + detectorSql = convert(update, Lists.newArrayList(sqlTableName, sourceName)); + } else { + String select = "select logic_timestamp from ? where source = '?'"; + detectorSql = convert(select, Lists.newArrayList(sqlTableName, sourceName)); + } + return detectorSql; + } + + private String convert(String template, List list) { + StringBuilder sb = new StringBuilder(template); + String replace = "?"; + for (String str : list) { + int index = sb.indexOf(replace); + sb.replace(index, index + 1, str); + } + return sb.toString(); + } + + private MySQLDetector getMySQLDetector() { + if (isDelayDetection) { + return new MySQLDelayDetector(this); + } else if (source.getDbGroupConfig().isShowSlaveSql()) { + return new MySQLShowSlaveStatusDetector(this); + } else if (source.getDbGroupConfig().isSelectReadOnlySql()) { + return new MySQLReadOnlyDetector(this); + } else { + return new MySQLDefaultDetector(this); + } + } + + // only use when heartbeat connection is closed boolean doHeartbeatRetry() { if (errorRetryCount > 0 && errorCount.get() < errorRetryCount) { @@ -157,8 +245,7 @@ void setErrorResult(String errMsg) { this.message = errMsg; this.status = ERROR_STATUS; startErrorTime.compareAndSet(-1, System.currentTimeMillis()); - Map labels = AlertUtil.genSingleLabel("dbInstance", this.source.getDbGroupConfig().getName() + "-" + this.source.getConfig().getInstanceName()); - AlertUtil.alert(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "heartbeat status:" + this.status, "mysql", this.source.getConfig().getId(), labels); + alert(); if (errorRetryCount > 0 && errorCount.get() < errorRetryCount) { LOGGER.warn("retry to do heartbeat for the " + errorCount.incrementAndGet() + " times"); heartbeat(); // error count not enough, heart beat again @@ -180,11 +267,15 @@ void setResult(int result) { break; } if (this.status != OK_STATUS) { - Map labels = AlertUtil.genSingleLabel("dbInstance", this.source.getDbGroupConfig().getName() + "-" + this.source.getConfig().getInstanceName()); - AlertUtil.alert(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "heartbeat status:" + this.status, "mysql", this.source.getConfig().getId(), labels); + alert(); } } + private void alert() { + Map labels = AlertUtil.genSingleLabel("dbInstance", this.source.getDbGroupConfig().getName() + "-" + this.source.getConfig().getInstanceName()); + AlertUtil.alert(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "heartbeat status:" + this.status, "mysql", this.source.getConfig().getId(), labels); + } + private void setOk() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("heartbeat to [" + source.getConfig().getUrl() + "] setOK"); @@ -209,8 +300,7 @@ private void setOk() { this.status = OK_STATUS; this.errorCount.set(0); this.startErrorTime.set(-1); - Map labels = AlertUtil.genSingleLabel("dbInstance", this.source.getDbGroupConfig().getName() + "-" + this.source.getConfig().getInstanceName()); - AlertUtil.alertResolve(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "mysql", this.source.getConfig().getId(), labels); + alert(); } if (isStop) { LOGGER.warn("heartbeat[{}] had been stop", source.getConfig().getUrl()); @@ -296,7 +386,11 @@ public long getHeartbeatTimeout() { } String getHeartbeatSQL() { - return heartbeatSQL; + if (isDelayDetection && !source.isReadInstance()) { + return convert(heartbeatSQL, Lists.newArrayList(String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet()))); + } else { + return heartbeatSQL; + } } public DbInstanceSyncRecorder getAsyncRecorder() { @@ -306,4 +400,20 @@ public DbInstanceSyncRecorder getAsyncRecorder() { public int getErrorTimeInLast5MinCount() { return errorTimeInLast5MinCount; } + + public long getHeartbeatConnId() { + if (detector != null) { + return detector.getHeartbeatConnId(); + } else { + return 0L; + } + } + + public int getLogicUpdate() { + return logicUpdate; + } + + public void setLogicUpdate(int logicUpdate) { + this.logicUpdate = logicUpdate; + } } diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLReadOnlyDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLReadOnlyDetector.java new file mode 100644 index 000000000..2b35d4b45 --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLReadOnlyDetector.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ +package com.actiontech.dble.backend.heartbeat; + +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * @author mycat + */ +public class MySQLReadOnlyDetector extends MySQLDetector { + public static final Logger LOGGER = LoggerFactory.getLogger(MySQLReadOnlyDetector.class); + private static final String[] MYSQL_READ_ONLY_COLS = new String[]{"@@read_only"}; + + public MySQLReadOnlyDetector(MySQLHeartbeat heartbeat) { + super(heartbeat); + this.sqlJob = new HeartbeatSQLJob(heartbeat, new OneRawSQLQueryResultHandler(MYSQL_READ_ONLY_COLS, this)); + } + + @Override + protected void setStatus(PhysicalDbInstance source, Map resultResult) { + String readonly = resultResult != null ? resultResult.get("@@read_only") : null; + if (readonly == null) { + heartbeat.setErrorResult("result of select @@read_only is null"); + } else if (readonly.equals("0")) { + source.setReadOnly(false); + } else { + source.setReadOnly(true); + } + } +} diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLShowSlaveStatusDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLShowSlaveStatusDetector.java new file mode 100644 index 000000000..1a32e919c --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLShowSlaveStatusDetector.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ +package com.actiontech.dble.backend.heartbeat; + +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * @author mycat + */ +public class MySQLShowSlaveStatusDetector extends MySQLDetector { + public static final Logger LOGGER = LoggerFactory.getLogger(MySQLShowSlaveStatusDetector.class); + private static final String[] MYSQL_SLAVE_STATUS_COLS = new String[]{ + "Seconds_Behind_Master", + "Slave_IO_Running", + "Slave_SQL_Running", + "Slave_IO_State", + "Master_Host", + "Master_User", + "Master_Port", + "Connect_Retry", + "Last_IO_Error"}; + + public MySQLShowSlaveStatusDetector(MySQLHeartbeat heartbeat) { + super(heartbeat); + this.sqlJob = new HeartbeatSQLJob(heartbeat, new OneRawSQLQueryResultHandler(MYSQL_SLAVE_STATUS_COLS, this)); + } + + @Override + protected void setStatus(PhysicalDbInstance source, Map resultResult) { + String slaveIoRunning = resultResult != null ? resultResult.get("Slave_IO_Running") : null; + String slaveSqlRunning = resultResult != null ? resultResult.get("Slave_SQL_Running") : null; + if (slaveIoRunning != null && slaveIoRunning.equals(slaveSqlRunning) && slaveSqlRunning.equals("Yes")) { + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); + String secondsBehindMaster = resultResult.get("Seconds_Behind_Master"); + if (null != secondsBehindMaster && !"".equals(secondsBehindMaster) && !"NULL".equalsIgnoreCase(secondsBehindMaster)) { + int behindMaster = Integer.parseInt(secondsBehindMaster) * 1000; + int delayThreshold = source.getDbGroupConfig().getDelayThreshold(); + if (delayThreshold > 0 && behindMaster > delayThreshold) { + MySQLHeartbeat.LOGGER.warn("found MySQL master/slave Replication delay !!! " + heartbeat.getSource().getConfig() + ", binlog sync time delay: " + behindMaster + "ms"); + } + heartbeat.setSlaveBehindMaster(behindMaster); + } else { + heartbeat.setSlaveBehindMaster(null); + } + } else if (source.isSalveOrRead()) { + //String Last_IO_Error = resultResult != null ? resultResult.get("Last_IO_Error") : null; + MySQLHeartbeat.LOGGER.warn("found MySQL master/slave Replication err !!! " + + heartbeat.getSource().getConfig() + ", " + resultResult); + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR); + heartbeat.setSlaveBehindMaster(null); + } + heartbeat.getAsyncRecorder().setBySlaveStatus(resultResult); + } +} diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/dbGroups/DBGroup.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/dbGroups/DBGroup.java index 1fe8f9f4e..8e9e5d4d7 100644 --- a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/dbGroups/DBGroup.java +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/dbGroups/DBGroup.java @@ -27,6 +27,12 @@ public class DBGroup implements Named { @XmlAttribute protected Integer delayThreshold; + @XmlAttribute + protected Integer delayPeriodMillis; + + @XmlAttribute + protected String delayDatabase; + @XmlAttribute protected String disableHA; @@ -106,15 +112,34 @@ public void setDisableHA(String disableHA) { this.disableHA = disableHA; } + public Integer getDelayPeriodMillis() { + return delayPeriodMillis; + } + + public void setDelayPeriodMillis(Integer delayPeriodMillis) { + this.delayPeriodMillis = delayPeriodMillis; + } + + public String getDelayDatabase() { + return delayDatabase; + } + + public void setDelayDatabase(String delayDatabase) { + this.delayDatabase = delayDatabase; + } @Override public String toString() { - String builder = "dbGroup [rwSplitMode=" + + return "dbGroup [rwSplitMode=" + rwSplitMode + ", name=" + name + ", delayThreshold=" + delayThreshold + + ", delayPeriodMillis=" + + delayPeriodMillis + + ", delayDatabase=" + + delayDatabase + ", disableHA=" + disableHA + ", heartbeat=" + @@ -122,6 +147,5 @@ public String toString() { ", dbInstances=[" + dbInstance + "]"; - return builder; } } diff --git a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java index 913f72c70..cb464d47f 100644 --- a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java +++ b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java @@ -9,6 +9,7 @@ import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; import com.actiontech.dble.cluster.values.ConfStatus; +import com.actiontech.dble.config.helper.CreateDelayDetectTableTask; import com.actiontech.dble.config.helper.TestSchemasTask; import com.actiontech.dble.config.helper.TestTask; import com.actiontech.dble.config.loader.xml.XMLDbLoader; @@ -161,7 +162,7 @@ private void deleteUselessShardingNode() { if (allUseShardingNode.contains(shardingNodeName)) { shardingNodeGroup = entry.getValue().getDbGroup(); if (shardingNodeGroup != null) { - shardingNodeGroup.setShardingUseless(false); + shardingNodeGroup.setUsedForSharding(); } else { throw new ConfigException("dbGroup not exists " + entry.getValue().getDbGroupName()); } @@ -183,10 +184,30 @@ private void checkRwSplitDbGroup() { group = this.dbGroups.get(rwSplitUserConfig.getDbGroup()); if (group == null) { throw new ConfigException("The user's group[" + rwSplitUserConfig.getName() + "." + rwSplitUserConfig.getDbGroup() + "] for rwSplit isn't configured in db.xml."); - } else if (!group.isShardingUseless()) { + } else if (group.usedForSharding()) { throw new ConfigException("The group[" + rwSplitUserConfig.getName() + "." + rwSplitUserConfig.getDbGroup() + "] has been used by sharding node, can't be used by rwSplit."); } else { - group.setRwSplitUseless(false); + group.setUsedForRW(); + } + } + } + } + + public void createDelayDetectTable() { + String dbGroupName; + PhysicalDbGroup dbGroup; + for (Map.Entry entry : this.dbGroups.entrySet()) { + dbGroup = entry.getValue(); + dbGroupName = entry.getKey(); + if (!dbGroup.isUseless()) { + for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) { + if (ds.getDbGroupConfig().isDelayDetection() && !ds.isSalveOrRead()) { + BoolPtr createTablePtr = new BoolPtr(false); + createDelayDetectTable(ds, createTablePtr); + if (!createTablePtr.get()) { + throw new ConfigException("create delay table error, please check dbInstance[" + dbGroupName + "." + ds.getName() + "]."); + } + } } } } @@ -205,7 +226,7 @@ public void testConnection() { if (SystemConfig.getInstance().isSkipTestConOnUpdate()) { if (reloadContext != null && !reloadContext.getAffectDbInstanceList().isEmpty()) { if (reloadContext.getConfStatus() != ConfStatus.Status.MANAGER_DELETE) { - boolean useSharding = reloadContext.getAffectDbInstanceList().stream().map(ele -> dbGroups.get(ele.getGroupName())).anyMatch((ele) -> ele != null && !ele.isShardingUseless()); + boolean useSharding = reloadContext.getAffectDbInstanceList().stream().map(ele -> dbGroups.get(ele.getGroupName())).anyMatch((ele) -> ele != null && ele.usedForSharding()); //not support for sharding db group if (!useSharding) { @@ -313,6 +334,17 @@ private boolean testDbInstance(String dbGroupName, PhysicalDbInstance ds, List

>> genDbInstanceSchemaMap() { Map>> dbInstanceSchemaMap = new HashMap<>(16); if (shardingNodes != null) { diff --git a/src/main/java/com/actiontech/dble/config/ServerConfig.java b/src/main/java/com/actiontech/dble/config/ServerConfig.java index 1f774075e..c02e9753b 100644 --- a/src/main/java/com/actiontech/dble/config/ServerConfig.java +++ b/src/main/java/com/actiontech/dble/config/ServerConfig.java @@ -113,6 +113,14 @@ public void testConnection() { } } + public void createDelayDetectTable() { + confInitNew.createDelayDetectTable(); + } + + public void create() throws Exception { + ConfigUtil.getAndSyncKeyVariables(confInitNew.getDbGroups(), true); + } + public void getAndSyncKeyVariables() throws Exception { ConfigUtil.getAndSyncKeyVariables(confInitNew.getDbGroups(), true); } diff --git a/src/main/java/com/actiontech/dble/config/helper/CreateDelayDetectTableTask.java b/src/main/java/com/actiontech/dble/config/helper/CreateDelayDetectTableTask.java new file mode 100644 index 000000000..700174a06 --- /dev/null +++ b/src/main/java/com/actiontech/dble/config/helper/CreateDelayDetectTableTask.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2016-2020 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.config.helper; + +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.plan.common.ptr.BoolPtr; +import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; +import com.actiontech.dble.sqlengine.OneTimeConnJob; +import com.actiontech.dble.sqlengine.SQLQueryResult; +import com.actiontech.dble.sqlengine.SQLQueryResultListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +public class CreateDelayDetectTableTask extends Thread { + private static final Logger LOGGER = LoggerFactory.getLogger(CreateDelayDetectTableTask.class); + private final PhysicalDbInstance ds; + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition finishCond = lock.newCondition(); + private boolean isFinish = false; + private BoolPtr successFlag; + + public CreateDelayDetectTableTask(PhysicalDbInstance ds, BoolPtr successFlag) { + this.ds = ds; + this.successFlag = successFlag; + } + + @Override + public void run() { + String table = ds.getDbGroupConfig().getDelayDatabase() + ".u_delay"; + + OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(new String[0], new DelayDetectionListener()); + String createTableSQL = "create table if not exists " + table + + " (source VARCHAR(256) primary key,real_timestamp varchar(26) NOT NULL,logic_timestamp BIGINT default 0)"; + OneTimeConnJob sqlJob = new OneTimeConnJob(createTableSQL, null, resultHandler, ds); + sqlJob.run(); + lock.lock(); + try { + while (!isFinish) { + finishCond.await(); + } + } catch (InterruptedException e) { + LOGGER.warn("test conn Interrupted:", e); + } finally { + lock.unlock(); + } + } + + private class DelayDetectionListener implements SQLQueryResultListener>> { + + @Override + public void onResult(SQLQueryResult> result) { + if (result.isSuccess()) { + successFlag.set(true); + } + handleFinished(); + } + + private void handleFinished() { + lock.lock(); + try { + isFinish = true; + finishCond.signal(); + } finally { + lock.unlock(); + } + } + + + } +} diff --git a/src/main/java/com/actiontech/dble/config/loader/xml/XMLDbLoader.java b/src/main/java/com/actiontech/dble/config/loader/xml/XMLDbLoader.java index ecc8fd5f5..97242b1c6 100644 --- a/src/main/java/com/actiontech/dble/config/loader/xml/XMLDbLoader.java +++ b/src/main/java/com/actiontech/dble/config/loader/xml/XMLDbLoader.java @@ -126,6 +126,12 @@ private void loadDbGroups(Element root) throws InvocationTargetException, Illega //slave delay threshold String delayThresholdStr = ConfigUtil.checkAndGetAttribute(element, "delayThreshold", "-1", problemReporter); final int delayThreshold = Integer.parseInt(delayThresholdStr); + + String delayPeriodMillisStr = ConfigUtil.checkAndGetAttribute(element, "delayPeriodMillis", "-1", problemReporter); + final int delayPeriodMillis = Integer.parseInt(delayPeriodMillisStr); + + final String delayDatabaseStr = element.getAttribute("delayDatabase"); + String disableHAStr = ConfigUtil.checkAndGetAttribute(element, "disableHA", "false", problemReporter); boolean disableHA = Boolean.parseBoolean(disableHAStr); @@ -167,6 +173,9 @@ private void loadDbGroups(Element root) throws InvocationTargetException, Illega dbGroupConf.setHeartbeatSQL(heartbeatSQL); dbGroupConf.setHeartbeatTimeout(Integer.parseInt(strHBTimeout) * 1000); dbGroupConf.setErrorRetryCount(Integer.parseInt(strHBErrorRetryCount)); + // delay check + dbGroupConf.setDelayPeriodMillis(delayPeriodMillis); + dbGroupConf.setDelayDatabase(delayDatabaseStr); dbGroupConfigs.put(dbGroupConf.getName(), dbGroupConf); } } diff --git a/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java b/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java index 724063503..93eb9a5bb 100644 --- a/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java @@ -8,6 +8,7 @@ import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.config.util.ConfigException; import com.actiontech.dble.util.StringUtil; +import com.google.common.base.Strings; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -23,6 +24,8 @@ public class DbGroupConfig { private boolean isShowSlaveSql = false; private boolean isSelectReadOnlySql = false; private int delayThreshold; + private int delayPeriodMillis; + private String delayDatabase; private int heartbeatTimeout = 0; private int errorRetryCount = 1; @@ -131,4 +134,24 @@ public boolean isDisableHA() { public void setDisableHA(boolean disableHA) { this.disableHA = disableHA; } + + public int getDelayPeriodMillis() { + return delayPeriodMillis; + } + + public void setDelayPeriodMillis(int delayPeriodMillis) { + this.delayPeriodMillis = delayPeriodMillis; + } + + public String getDelayDatabase() { + return delayDatabase; + } + + public void setDelayDatabase(String delayDatabase) { + this.delayDatabase = delayDatabase; + } + + public boolean isDelayDetection() { + return !Strings.isNullOrEmpty(delayDatabase) && delayThreshold > 0 && delayPeriodMillis > 0; + } } diff --git a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java index 6b12e7b8e..4181da867 100644 --- a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java @@ -44,7 +44,7 @@ public void execute(Boolean master, Callback callback, boolean writeStatistical) * @param localRead only the SELECT and show statements attempt to localRead */ public void execute(Boolean master, Callback callback, boolean writeStatistical, boolean localRead) { - execute(master, null, callback, writeStatistical, localRead && !rwGroup.isRwSplitUseless()); + execute(master, null, callback, writeStatistical, localRead && rwGroup.usedForRW()); } diff --git a/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java b/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java index 5b6e4b316..c4e0b8f61 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java @@ -47,6 +47,7 @@ private ManagerSchemaInfo() { registerTable(new ProcessList()); registerTable(new SessionVariables()); registerTable(new BackendVariables()); + registerTable(new DbleDelayDetection()); } diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java index c785d015d..9f1648e9b 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java @@ -45,6 +45,8 @@ public class DbleDbGroup extends ManagerWritableTable { public static final String COLUMN_DELAY_THRESHOLD = "delay_threshold"; public static final String COLUMN_DISABLE_HA = "disable_ha"; public static final String COLUMN_ACTIVE = "active"; + public static final String DELAY_PERIOD_MILLIS = "delay_period_millis"; + public static final String DELAY_DATABASE = "delay_database"; private final List> tempRowList = Lists.newArrayList(); @@ -77,6 +79,12 @@ protected void initColumnAndType() { columns.put(COLUMN_DELAY_THRESHOLD, new ColumnMeta(COLUMN_DELAY_THRESHOLD, "int(11)", true, "-1")); columnsType.put(COLUMN_DELAY_THRESHOLD, Fields.FIELD_TYPE_LONG); + columns.put(DELAY_PERIOD_MILLIS, new ColumnMeta(DELAY_PERIOD_MILLIS, "int(11)", true, "-1")); + columnsType.put(DELAY_PERIOD_MILLIS, Fields.FIELD_TYPE_LONG); + + columns.put(DELAY_DATABASE, new ColumnMeta(DELAY_DATABASE, "varchar(255)", true, null)); + columnsType.put(DELAY_DATABASE, Fields.FIELD_TYPE_VAR_STRING); + columns.put(COLUMN_DISABLE_HA, new ColumnMeta(COLUMN_DISABLE_HA, "varchar(5)", true, "false")); columnsType.put(COLUMN_DISABLE_HA, Fields.FIELD_TYPE_VAR_STRING); @@ -198,6 +206,12 @@ public DBGroup transformRowToDBGroup(LinkedHashMap values) { case COLUMN_DELAY_THRESHOLD: dbGroup.setDelayThreshold(Integer.parseInt(value)); break; + case DELAY_PERIOD_MILLIS: + dbGroup.setDelayPeriodMillis(Integer.parseInt(value)); + break; + case DELAY_DATABASE: + dbGroup.setDelayDatabase(String.valueOf(value)); + break; case COLUMN_DISABLE_HA: dbGroup.setDisableHA(value); break; @@ -270,6 +284,12 @@ private void checkRule(LinkedHashMap row) { } } + private void delayDetectionCheck(String delayPeriodMillis) { + if (!StringUtil.isBlank(delayPeriodMillis) && IntegerUtil.parseInt(delayPeriodMillis) < -1) { + throw new ConfigException("Column '" + COLUMN_DELAY_THRESHOLD + "' should be an integer greater than -1!"); + } + } + public static DbGroups getDbGroups() { XmlProcessBase xmlProcess = new XmlProcessBase(); DbGroups dbs = null; @@ -299,6 +319,8 @@ private void checkInterValue(LinkedHashMap row) { if (row.containsKey(COLUMN_HEARTBEAT_RETRY) && (StringUtil.isBlank(heartbeatRetryStr) || IntegerUtil.parseInt(heartbeatRetryStr) < 0)) { throw new ConfigException("Column '" + COLUMN_HEARTBEAT_RETRY + "' should be an integer greater than or equal to 0!"); } + String delayPeriodMillis = row.get(DELAY_PERIOD_MILLIS); + delayDetectionCheck(delayPeriodMillis); } private LinkedHashMap initMap(DbGroupConfig dbGroupConfig) { @@ -309,6 +331,8 @@ private LinkedHashMap initMap(DbGroupConfig dbGroupConfig) { map.put(COLUMN_HEARTBEAT_RETRY, String.valueOf(dbGroupConfig.getErrorRetryCount())); map.put(COLUMN_RW_SPLIT_MODE, String.valueOf(dbGroupConfig.getRwSplitMode())); map.put(COLUMN_DELAY_THRESHOLD, String.valueOf(dbGroupConfig.getDelayThreshold())); + map.put(DELAY_PERIOD_MILLIS, String.valueOf(dbGroupConfig.getDelayPeriodMillis())); + map.put(DELAY_DATABASE, String.valueOf(dbGroupConfig.getDelayDatabase())); map.put(COLUMN_DISABLE_HA, String.valueOf(dbGroupConfig.isDisableHA())); return map; } diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDelayDetection.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDelayDetection.java new file mode 100644 index 000000000..010d34dcc --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDelayDetection.java @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.services.manager.information.tables; + +import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat; +import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.config.Fields; +import com.actiontech.dble.config.ServerConfig; +import com.actiontech.dble.config.model.db.DbInstanceConfig; +import com.actiontech.dble.meta.ColumnMeta; +import com.actiontech.dble.services.manager.information.ManagerWritableTable; +import com.actiontech.dble.util.StringUtil; +import com.google.common.collect.Lists; + +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +public class DbleDelayDetection extends ManagerWritableTable { + private static final String TABLE_NAME = "delay_detection"; + + private static final String COLUMN_DB_GROUP_NAME = "db_group_name"; + private static final String COLUMN_NAME = "name"; + private static final String COLUMN_HOST = "host"; + private static final String COLUMN_DELAY = "delay"; + private static final String COLUMN_STATUS = "status"; + private static final String COLUMN_MESSAGE = "message"; + private static final String COLUMN_LAST_ACTIVE_TIME = "last_active_time"; + private static final String COLUMN_BACKEND_CONN_ID = "backend_conn_id"; + private static final String COLUMN_LOGIC_UPDATE = "logic_update"; + + public DbleDelayDetection() { + super(TABLE_NAME, 9); + } + + @Override + protected void initColumnAndType() { + + columns.put(COLUMN_DB_GROUP_NAME, new ColumnMeta(COLUMN_DB_GROUP_NAME, "varchar(64)", false)); + columnsType.put(COLUMN_DB_GROUP_NAME, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_NAME, new ColumnMeta(COLUMN_NAME, "varchar(64)", false, true)); + columnsType.put(COLUMN_NAME, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_HOST, new ColumnMeta(COLUMN_HOST, "int(11)", false)); + columnsType.put(COLUMN_HOST, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_DELAY, new ColumnMeta(COLUMN_DELAY, "int(11)", false)); + columnsType.put(COLUMN_DELAY, Fields.FIELD_TYPE_LONG); + + columns.put(COLUMN_STATUS, new ColumnMeta(COLUMN_STATUS, "varchar(3)", false)); + columnsType.put(COLUMN_STATUS, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_MESSAGE, new ColumnMeta(COLUMN_MESSAGE, "varchar(1024)", false)); + columnsType.put(COLUMN_MESSAGE, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_LAST_ACTIVE_TIME, new ColumnMeta(COLUMN_LAST_ACTIVE_TIME, "timestamp", false)); + columnsType.put(COLUMN_LAST_ACTIVE_TIME, Fields.FIELD_TYPE_TIMESTAMP); + + columns.put(COLUMN_BACKEND_CONN_ID, new ColumnMeta(COLUMN_BACKEND_CONN_ID, "int(11)", false)); + columnsType.put(COLUMN_BACKEND_CONN_ID, Fields.FIELD_TYPE_LONG); + + columns.put(COLUMN_LOGIC_UPDATE, new ColumnMeta(COLUMN_LOGIC_UPDATE, "int(11)", false)); + columnsType.put(COLUMN_LOGIC_UPDATE, Fields.FIELD_TYPE_LONG); + + + } + + protected List> getRows() { + List> results = new ArrayList<>(); + ServerConfig conf = DbleServer.getInstance().getConfig(); + Map dbGroups = conf.getDbGroups(); + for (PhysicalDbGroup dbGroup : dbGroups.values()) { + if (dbGroup.getDbGroupConfig().isDelayDetection()) { + for (PhysicalDbInstance dbInstance : dbGroup.getDbInstances(true)) { + LinkedHashMap row = getRow(dbInstance); + if (!row.isEmpty()) { + results.add(row); + } + } + } + } + return results; + } + + private LinkedHashMap getRow(PhysicalDbInstance dbInstance) { + LinkedHashMap row = new LinkedHashMap<>(); + final MySQLHeartbeat heartbeat = dbInstance.getHeartbeat(); + if (Objects.isNull(heartbeat) || heartbeat.isStop()) { + return row; + } + DbInstanceConfig config = dbInstance.getConfig(); + row.put(COLUMN_DB_GROUP_NAME, dbInstance.getDbGroup().getGroupName()); + row.put(COLUMN_NAME, dbInstance.getName()); + row.put(COLUMN_HOST, config.getUrl()); + row.put(COLUMN_DELAY, String.valueOf(heartbeat.getSlaveBehindMaster())); + row.put(COLUMN_STATUS, String.valueOf(heartbeat.getStatus())); + row.put(COLUMN_MESSAGE, heartbeat.getMessage()); + row.put(COLUMN_LAST_ACTIVE_TIME, String.valueOf(heartbeat.getLastActiveTime())); + row.put(COLUMN_LOGIC_UPDATE, String.valueOf(heartbeat.getLogicUpdate())); + row.put(COLUMN_BACKEND_CONN_ID, String.valueOf(heartbeat.getHeartbeatConnId())); + return row; + } + + @Override + public int insertRows(List> rows) throws SQLException { + throw new SQLException("Access denied for table '" + tableName + "'", "42000", ErrorCode.ER_ACCESS_DENIED_ERROR); + } + + @Override + public int updateRows(Set> affectPks, LinkedHashMap values) throws SQLException { + if (values.size() != 1 || !values.containsKey(COLUMN_LOGIC_UPDATE)) { + throw new SQLException("only column '" + COLUMN_LOGIC_UPDATE + "' is writable", "42S22", ErrorCode.ER_ERROR_ON_WRITE); + } + final ReentrantReadWriteLock lock = DbleServer.getInstance().getConfig().getLock(); + lock.writeLock().lock(); + try { + int val = Integer.parseInt(values.get(COLUMN_LOGIC_UPDATE)); + ServerConfig conf = DbleServer.getInstance().getConfig(); + Map dbGroups = conf.getDbGroups(); + List instanceList = Lists.newArrayList(); + for (LinkedHashMap affectPk : affectPks) { + String groupName = affectPk.get(COLUMN_DB_GROUP_NAME); + String name = affectPk.get(COLUMN_NAME); + for (PhysicalDbGroup physicalDbGroup : dbGroups.values()) { + if (StringUtil.equals(groupName, physicalDbGroup.getGroupName()) && physicalDbGroup.getDbGroupConfig().isDelayDetection()) { + PhysicalDbInstance instance = physicalDbGroup.getDbInstances(true).stream().filter(dbInstance -> StringUtil.equals(name, dbInstance.getName()) && Objects.nonNull(dbInstance.getHeartbeat())).findFirst().get(); + MySQLHeartbeat delayDetection = instance.getHeartbeat(); + int logicUpdate = delayDetection.getLogicUpdate(); + if (val != logicUpdate + 1) { + throw new SQLException("parameter only increment is allowed to be 1", "42S22", ErrorCode.ER_TABLE_CANT_HANDLE_AUTO_INCREMENT); + } + instanceList.add(instance); + } + } + } + for (PhysicalDbInstance instance : instanceList) { + instance.getHeartbeat().stop("the management end is shut down manually"); + instance.getHeartbeat().start(instance.getHeartbeatRecoveryTime()); + instance.getHeartbeat().setLogicUpdate(val); + } + } finally { + lock.writeLock().unlock(); + } + return affectPks.size(); + } + + @Override + public int deleteRows(Set> affectPks) throws SQLException { + throw new SQLException("Access denied for table '" + tableName + "'", "42000", ErrorCode.ER_ACCESS_DENIED_ERROR); + } +} diff --git a/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java b/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java index 9a1010a11..07b2631ec 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java @@ -86,6 +86,11 @@ public static void execute(ManagerService service) { LOGGER.debug("just test ,not stop reload, catch exception", e); } } + try { + loader.createDelayDetectTable(); + } catch (Exception e) { + list.add(new ErrorInfo("Backend", "ERROR", e.getMessage())); + } try { String msg = ConfigUtil.getAndSyncKeyVariables(loader.getDbGroups(), false); if (msg != null) { diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java index ad092170c..51bf2927b 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java @@ -238,6 +238,8 @@ public static boolean reloadAll(final int loadAllMode, ReloadContext reloadConte } } + loader.createDelayDetectTable(); + boolean forceAllReload = false; if ((loadAllMode & ManagerParseConfig.OPTR_MODE) != 0) { @@ -285,6 +287,9 @@ private static boolean intelligentReloadAll(int loadAllMode, ConfigInitializer l } } checkTestConnIfNeed(loadAllMode, loader); + if (loader.isFullyConfigured()) { + loader.createDelayDetectTable(); + } Map newUsers = serverConfig.getUsers(); Map newSchemas = serverConfig.getSchemas(); @@ -394,6 +399,9 @@ private static boolean forceReloadAll(final int loadAllMode, ConfigInitializer l } } checkTestConnIfNeed(loadAllMode, loader); + if (loader.isFullyConfigured()) { + loader.createDelayDetectTable(); + } Map newUsers = serverConfig.getUsers(); Map newSchemas = serverConfig.getSchemas(); diff --git a/src/main/resources/db.dtd b/src/main/resources/db.dtd index 007a6bc88..d70e0081c 100644 --- a/src/main/resources/db.dtd +++ b/src/main/resources/db.dtd @@ -26,6 +26,8 @@ rwSplitMode NMTOKEN #REQUIRED name NMTOKEN #REQUIRED delayThreshold NMTOKEN #IMPLIED + delayPeriodMillis NMTOKEN #IMPLIED + delayDatabase NMTOKEN #IMPLIED disableHA NMTOKEN #IMPLIED>