diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index c437507f4b085..9038e7c3a7191 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -374,7 +374,7 @@ public void stopAllPipesWithCriticalExceptionAndTrackException( ///////////////////////// Heartbeat ///////////////////////// public void collectPipeMetaList(final TDataNodeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut( + if (!tryReadLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 726bc1b6a1f93..fcff317b3f337 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -125,6 +125,10 @@ protected boolean tryReadLockWithTimeOut(final long timeOutInSeconds) { } } + protected boolean tryReadLockWithTimeOutInMs(final long timeOutInMs) { + return tryReadLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs)); + } + protected void releaseReadLock() { pipeMetaKeeper.releaseReadLock(); } @@ -143,10 +147,18 @@ protected boolean tryWriteLockWithTimeOut(final long timeOutInSeconds) { } } + protected boolean tryWriteLockWithTimeOutInMs(final long timeOutInMs) { + return tryWriteLockWithTimeOut(convertMsToCeilSeconds(timeOutInMs)); + } + protected void releaseWriteLock() { pipeMetaKeeper.releaseWriteLock(); } + private long convertMsToCeilSeconds(final long timeOutInMs) { + return Math.max(1L, (Math.max(0L, timeOutInMs) + 999L) / 1000L); + } + ////////////////////////// Pipe Task Management Entry ////////////////////////// public TPushPipeMetaRespExceptionMessage handleSinglePipeMetaChanges( @@ -363,7 +375,7 @@ protected TPushPipeMetaRespExceptionMessage handleDropPipeInternal(final String public List handlePipeMetaChanges( final List pipeMetaListFromCoordinator) { - if (!tryWriteLockWithTimeOut( + if (!tryWriteLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return null; } @@ -1091,7 +1103,7 @@ private void stopAllPipesWithCriticalExceptionInternal(final int currentNodeId) public void collectPipeMetaList(final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { - if (!tryReadLockWithTimeOut( + if (!tryReadLockWithTimeOutInMs( CommonDescriptor.getInstance().getConfig().getDnConnectionTimeoutInMS() * 2L / 3)) { return; }