diff --git a/pom.xml b/pom.xml
index 740f4e7cf..3dea2045e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,6 +26,7 @@
1.75.0
4.1.124.Final
2.18.0
+ 2.15.0
@@ -130,6 +131,27 @@
de.ruedigermoeller
fst
2.57
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${jackson.version}
junit
diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java
index 9faba18aa..a9f863636 100644
--- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java
+++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDelayDetector.java
@@ -56,9 +56,16 @@ private void delayCal(long delay, long delayThreshold) {
heartbeat.setSlaveBehindMaster((int) delayVal);
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL);
} else {
- // master and slave maybe switch
- heartbeat.setSlaveBehindMaster(null);
- heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR);
+ if (logic == 0) {
+ long updatedLogic = dbGroup.getLogicTimestamp().updateAndGet(current -> Math.max(current, delay));
+ LOGGER.warn("delay detection rebased logic_timestamp to {} for dbGroup {}", updatedLogic, dbGroup.getGroupName());
+ heartbeat.setSlaveBehindMaster(0);
+ heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL);
+ } else {
+ // master and slave maybe switch
+ heartbeat.setSlaveBehindMaster(null);
+ heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR);
+ }
}
}
}
diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java
index 9410dd740..acc2c9510 100644
--- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java
+++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java
@@ -84,7 +84,8 @@ public MySQLHeartbeat(PhysicalDbInstance dbInstance) {
this.heartbeatTimeout = dbInstance.getDbGroupConfig().getHeartbeatTimeout();
this.isDelayDetection = dbInstance.getDbGroupConfig().isDelayDetection();
if (isDelayDetection) {
- this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), dbInstance.getDbGroupConfig().getDelayDatabase());
+ this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(),
+ dbInstance.getDbGroupConfig().getDelayDatabase(), dbInstance.isReadInstance());
} else {
this.heartbeatSQL = source.getDbGroupConfig().getHeartbeatSQL();
}
@@ -181,12 +182,12 @@ public void heartbeat() {
}
}
- private String getDetectorSql(String dbGroupName, String delayDatabase) {
+ private String getDetectorSql(String dbGroupName, String delayDatabase, boolean readInstance) {
String[] str = {"dble", dbGroupName, SystemConfig.getInstance().getInstanceName()};
String sourceName = Joiner.on("_").join(str);
String sqlTableName = delayDatabase + ".u_delay ";
String detectorSql;
- if (!source.isReadInstance()) {
+ if (!readInstance) {
String update = "replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)";
detectorSql = convert(update, Lists.newArrayList(sqlTableName, sourceName));
} else {
@@ -199,9 +200,14 @@ private String getDetectorSql(String dbGroupName, String delayDatabase) {
private String convert(String template, List list) {
StringBuilder sb = new StringBuilder(template);
String replace = "?";
+ int fromIndex = 0;
for (String str : list) {
- int index = sb.indexOf(replace);
- sb.replace(index, index + 1, str);
+ int index = sb.indexOf(replace, fromIndex);
+ if (index < 0) {
+ throw new IllegalArgumentException("heartbeat sql template placeholder '?' not enough, template=" + template + ", values=" + list);
+ }
+ sb.replace(index, index + replace.length(), str);
+ fromIndex = index + str.length();
}
return sb.toString();
}
@@ -387,11 +393,17 @@ public long getHeartbeatTimeout() {
}
String getHeartbeatSQL() {
- if (isDelayDetection && !source.isReadInstance()) {
- return convert(heartbeatSQL, Lists.newArrayList(String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())));
- } else {
- return heartbeatSQL;
+ if (isDelayDetection) {
+ boolean readInstance = source.isReadInstance();
+ String detectorSql = getDetectorSql(source.getDbGroupConfig().getName(),
+ source.getDbGroupConfig().getDelayDatabase(), readInstance);
+ if (!readInstance) {
+ return convert(detectorSql, Lists.newArrayList(String.valueOf(LocalDateTime.now()),
+ String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())));
+ }
+ return detectorSql;
}
+ return heartbeatSQL;
}
public DbInstanceSyncRecorder getAsyncRecorder() {