Skip to content

Commit e813218

Browse files
artembilansobychacko
authored andcommitted
GH-2943: Fix KT.clusterId for concurrency (#2944)
Fixes: #2943 The `if (this.kafkaAdmin != null && this.clusterId == null) {` condition might be always true for concurrent threads (especially virtual). Therefore, all of those threads are calling `this.kafkaAdmin.clusterId()` making unnecessary network chats to Kafka broker * Surround `this.kafkaAdmin.clusterId()` call with `Lock` **Cherry-pick to `3.0.x`**
1 parent 654c531 commit e813218

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.concurrent.CompletableFuture;
2929
import java.util.concurrent.ExecutionException;
3030
import java.util.concurrent.Future;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
3133
import java.util.function.Function;
3234

3335
import org.apache.commons.logging.LogFactory;
@@ -117,6 +119,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo
117119

118120
private final Map<String, String> micrometerTags = new HashMap<>();
119121

122+
private final Lock clusterIdLock = new ReentrantLock();
123+
120124
private String beanName = "kafkaTemplate";
121125

122126
private ApplicationContext applicationContext;
@@ -500,7 +504,15 @@ else if (this.micrometerEnabled) {
500504
@Nullable
501505
private String clusterId() {
502506
if (this.kafkaAdmin != null && this.clusterId == null) {
503-
this.clusterId = this.kafkaAdmin.clusterId();
507+
this.clusterIdLock.lock();
508+
try {
509+
if (this.clusterId == null) {
510+
this.clusterId = this.kafkaAdmin.clusterId();
511+
}
512+
}
513+
finally {
514+
this.clusterIdLock.unlock();
515+
}
504516
}
505517
return this.clusterId;
506518
}

0 commit comments

Comments
 (0)