From 7bbb961738eff9d8f8274d4ada80246f359cd623 Mon Sep 17 00:00:00 2001 From: guoaomen Date: Mon, 19 Jan 2026 10:38:25 +0800 Subject: [PATCH 1/2] fix[ATK-4637]: add Jackson dependencies and enhance MySQL delay detection logic --- pom.xml | 22 ++++++++++++++ .../backend/heartbeat/MySQLDelayDetector.java | 13 ++++++-- .../backend/heartbeat/MySQLHeartbeat.java | 30 +++++++++++++------ 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/pom.xml b/pom.xml index 740f4e7cfd..3dea2045ee 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,7 @@ 1.75.0 4.1.124.Final 2.18.0 + 2.15.0 @@ -130,6 +131,27 @@ de.ruedigermoeller fst 2.57 + + + com.fasterxml.jackson.core + jackson-core + + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-annotations + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} junit diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java index 9faba18aa7..501df8390c 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java @@ -56,9 +56,16 @@ private void delayCal(long delay, long delayThreshold) { heartbeat.setSlaveBehindMaster((int) delayVal); heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); } else { - // master and slave maybe switch - heartbeat.setSlaveBehindMaster(null); - heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR); + if (heartbeat.getStatus() != MySQLHeartbeat.OK_STATUS) { + long updatedLogic = dbGroup.getLogicTimestamp().updateAndGet(current -> Math.max(current, delay)); + LOGGER.warn("delay detection rebased logic_timestamp to {} for dbGroup {}", updatedLogic, dbGroup.getGroupName()); + heartbeat.setSlaveBehindMaster(0); + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); + } else { + // master and slave maybe switch + heartbeat.setSlaveBehindMaster(null); + heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR); + } } } } diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index 9410dd7404..acc2c95102 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -84,7 +84,8 @@ public MySQLHeartbeat(PhysicalDbInstance dbInstance) { this.heartbeatTimeout = dbInstance.getDbGroupConfig().getHeartbeatTimeout(); this.isDelayDetection = dbInstance.getDbGroupConfig().isDelayDetection(); if (isDelayDetection) { - this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), dbInstance.getDbGroupConfig().getDelayDatabase()); + this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), + dbInstance.getDbGroupConfig().getDelayDatabase(), dbInstance.isReadInstance()); } else { this.heartbeatSQL = source.getDbGroupConfig().getHeartbeatSQL(); } @@ -181,12 +182,12 @@ public void heartbeat() { } } - private String getDetectorSql(String dbGroupName, String delayDatabase) { + private String getDetectorSql(String dbGroupName, String delayDatabase, boolean readInstance) { String[] str = {"dble", dbGroupName, SystemConfig.getInstance().getInstanceName()}; String sourceName = Joiner.on("_").join(str); String sqlTableName = delayDatabase + ".u_delay "; String detectorSql; - if (!source.isReadInstance()) { + if (!readInstance) { String update = "replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)"; detectorSql = convert(update, Lists.newArrayList(sqlTableName, sourceName)); } else { @@ -199,9 +200,14 @@ private String getDetectorSql(String dbGroupName, String delayDatabase) { private String convert(String template, List list) { StringBuilder sb = new StringBuilder(template); String replace = "?"; + int fromIndex = 0; for (String str : list) { - int index = sb.indexOf(replace); - sb.replace(index, index + 1, str); + int index = sb.indexOf(replace, fromIndex); + if (index < 0) { + throw new IllegalArgumentException("heartbeat sql template placeholder '?' not enough, template=" + template + ", values=" + list); + } + sb.replace(index, index + replace.length(), str); + fromIndex = index + str.length(); } return sb.toString(); } @@ -387,11 +393,17 @@ public long getHeartbeatTimeout() { } String getHeartbeatSQL() { - if (isDelayDetection && !source.isReadInstance()) { - return convert(heartbeatSQL, Lists.newArrayList(String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet()))); - } else { - return heartbeatSQL; + if (isDelayDetection) { + boolean readInstance = source.isReadInstance(); + String detectorSql = getDetectorSql(source.getDbGroupConfig().getName(), + source.getDbGroupConfig().getDelayDatabase(), readInstance); + if (!readInstance) { + return convert(detectorSql, Lists.newArrayList(String.valueOf(LocalDateTime.now()), + String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet()))); + } + return detectorSql; } + return heartbeatSQL; } public DbInstanceSyncRecorder getAsyncRecorder() { From 1170f04b099f3ce0234ea865dad45ed05dbd16dd Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 19 Jan 2026 06:28:27 +0000 Subject: [PATCH 2/2] Gate delay rebase on init timestamp Co-authored-by: 859235917 <859235917@qq.com> --- .../actiontech/dble/backend/heartbeat/MySQLDelayDetector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java index 501df8390c..a9f863636b 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java @@ -56,7 +56,7 @@ private void delayCal(long delay, long delayThreshold) { heartbeat.setSlaveBehindMaster((int) delayVal); heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL); } else { - if (heartbeat.getStatus() != MySQLHeartbeat.OK_STATUS) { + if (logic == 0) { long updatedLogic = dbGroup.getLogicTimestamp().updateAndGet(current -> Math.max(current, delay)); LOGGER.warn("delay detection rebased logic_timestamp to {} for dbGroup {}", updatedLogic, dbGroup.getGroupName()); heartbeat.setSlaveBehindMaster(0);