From da5367792747b514cc72810f510d6743ab575215 Mon Sep 17 00:00:00 2001 From: wardli Date: Tue, 9 Dec 2025 15:54:12 +0800 Subject: [PATCH] [Bug]: TaskRuntime.stateLock and TableOptimizingProcess.lock can cause deadlocks. #4001 --- .../server/optimizing/OptimizingQueue.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index ec207705c7..187a074f13 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -564,9 +564,16 @@ private void ackTask(OptimizingTaskId taskId, OptimizerThread thread) { private void completeTask(OptimizerThread thread, OptimizingTaskResult result) { TaskRuntime taskRuntime = getTaskRuntime(result.getTaskId()); + // Complete taskRuntime first (this acquires the database lock, but not under + // TableOptimizingProcess.this.lock protection) + // This avoids deadlock: complete() is no longer called while this method holds + // TableOptimizingProcess.this.lock, so it cannot block on the database lock under that lock + taskRuntime.complete(thread, result); + // After taskRuntime.complete() returns (database lock is released), acquire + // TableOptimizingProcess.this.lock and call acceptResult lock.lock(); try { - taskRuntime.complete(thread, result); + acceptResult(taskRuntime); } finally { lock.unlock(); } @@ -891,7 +898,14 @@ private void persistAndSetCompleted(boolean success) { } private void cancelTasks() { + // Cancel all tasks first (this acquires the database lock, but not under + // TableOptimizingProcess.this.lock protection) taskMap.values().forEach(TaskRuntime::tryCanceling); + // Then call acceptResult for every task in taskMap, not only those just canceled. + // NOTE(review): TableOptimizingProcess.this.lock is not taken here; do callers hold it? + // This avoids deadlock by ensuring consistent lock acquisition 
order (acquire + // TableOptimizingProcess.this.lock first, then database lock) + taskMap.values().forEach(this::acceptResult); } private void loadTaskRuntimes(OptimizingProcess optimizingProcess) { @@ -905,7 +919,9 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) { Map inputs = TaskFilesPersistence.loadTaskInputs(processId); taskRuntimes.forEach( taskRuntime -> { - taskRuntime.getCompletedFuture().whenCompleted(() -> acceptResult(taskRuntime)); + // No whenCompleted callback is registered here: it would call acceptResult + // synchronously while the database lock is held, which can deadlock. + // acceptResult is called explicitly in completeTask and cancelTasks instead. taskRuntime .getTaskDescriptor() .setInput(inputs.get(taskRuntime.getTaskId().getTaskId())); @@ -935,7 +951,9 @@ private void loadTaskRuntimes(List taskDescriptors) { tableRuntime.getTableIdentifier(), taskRuntime.getTaskId(), taskRuntime.getSummary()); - taskRuntime.getCompletedFuture().whenCompleted(() -> acceptResult(taskRuntime)); + // No whenCompleted callback is registered here: it would call acceptResult synchronously + // while the database lock is held, which can deadlock. + // acceptResult is called explicitly in completeTask and cancelTasks instead. taskMap.put(taskRuntime.getTaskId(), taskRuntime); taskQueue.offer(taskRuntime); }