diff --git a/pom.xml b/pom.xml index 740f4e7cf..3dea2045e 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,7 @@ 1.75.0 4.1.124.Final 2.18.0 + 2.15.0 @@ -130,6 +131,27 @@ de.ruedigermoeller fst 2.57 + + + com.fasterxml.jackson.core + jackson-core + + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} junit diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java index 9faba18aa..a9f863636 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java @@ -56,9 +56,16 @@ private void delayCal(long delay, long delayThreshold) { heartbeat.setSlaveBehindMaster((int) delayVal); heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); } else { - // master and slave maybe switch - heartbeat.setSlaveBehindMaster(null); - heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR); + if (logic == 0) { + long updatedLogic = dbGroup.getLogicTimestamp().updateAndGet(current -> Math.max(current, delay)); + LOGGER.warn("delay detection rebased logic_timestamp to {} for dbGroup {}", updatedLogic, dbGroup.getGroupName()); + heartbeat.setSlaveBehindMaster(0); + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); + } else { + // master and slave maybe switch + heartbeat.setSlaveBehindMaster(null); + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR); + } } } } diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index 9410dd740..acc2c9510 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -84,7 +84,8 @@ public MySQLHeartbeat(PhysicalDbInstance dbInstance) { this.heartbeatTimeout = dbInstance.getDbGroupConfig().getHeartbeatTimeout(); this.isDelayDetection = dbInstance.getDbGroupConfig().isDelayDetection(); if (isDelayDetection) { - this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), dbInstance.getDbGroupConfig().getDelayDatabase()); + this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), + dbInstance.getDbGroupConfig().getDelayDatabase(), dbInstance.isReadInstance()); } else { this.heartbeatSQL = source.getDbGroupConfig().getHeartbeatSQL(); } @@ -181,12 +182,12 @@ public void heartbeat() { } } - private String getDetectorSql(String dbGroupName, String delayDatabase) { + private String getDetectorSql(String dbGroupName, String delayDatabase, boolean readInstance) { String[] str = {"dble", dbGroupName, SystemConfig.getInstance().getInstanceName()}; String sourceName = Joiner.on("_").join(str); String sqlTableName = delayDatabase + ".u_delay "; String detectorSql; - if (!source.isReadInstance()) { + if (!readInstance) { String update = "replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)"; detectorSql = convert(update, Lists.newArrayList(sqlTableName, sourceName)); } else { @@ -199,9 +200,14 @@ private String getDetectorSql(String dbGroupName, String delayDatabase) { private String convert(String template, List list) { StringBuilder sb = new StringBuilder(template); String replace = "?"; + int fromIndex = 0; for (String str : list) { - int index = sb.indexOf(replace); - sb.replace(index, index + 1, str); + int index = sb.indexOf(replace, fromIndex); + if (index < 0) { + throw new IllegalArgumentException("heartbeat sql template placeholder '?' not enough, template=" + template + ", values=" + list); + } + sb.replace(index, index + replace.length(), str); + fromIndex = index + str.length(); } return sb.toString(); } @@ -387,11 +393,17 @@ public long getHeartbeatTimeout() { } String getHeartbeatSQL() { - if (isDelayDetection && !source.isReadInstance()) { - return convert(heartbeatSQL, Lists.newArrayList(String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet()))); - } else { - return heartbeatSQL; + if (isDelayDetection) { + boolean readInstance = source.isReadInstance(); + String detectorSql = getDetectorSql(source.getDbGroupConfig().getName(), + source.getDbGroupConfig().getDelayDatabase(), readInstance); + if (!readInstance) { + return convert(detectorSql, Lists.newArrayList(String.valueOf(LocalDateTime.now()), + String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet()))); + } + return detectorSql; } + return heartbeatSQL; } public DbInstanceSyncRecorder getAsyncRecorder() {