Skip to content

Commit 6d2bd6f

Browse files
garyrussellartembilan
authored andcommitted
Manage log.dir in the EmbeddedKafkaBroker
Resolves #194 Create the temporary directory in EKB instead of the broker to avoid `NoSuchFileException`s during shutdown. **cherry-pick to 2.4.x, 2.3.x, 2.2.x**
1 parent 2db5263 commit 6d2bd6f

File tree

1 file changed

+17
-0
lines changed

1 file changed

+17
-0
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.io.File;
2020
import java.io.IOException;
21+
import java.io.UncheckedIOException;
2122
import java.net.InetSocketAddress;
23+
import java.nio.file.Files;
2224
import java.time.Duration;
2325
import java.util.ArrayList;
2426
import java.util.Arrays;
@@ -30,6 +32,7 @@
3032
import java.util.Map;
3133
import java.util.Properties;
3234
import java.util.Set;
35+
import java.util.UUID;
3336
import java.util.concurrent.TimeUnit;
3437
import java.util.concurrent.atomic.AtomicBoolean;
3538
import java.util.stream.Collectors;
@@ -288,6 +291,7 @@ public void afterPropertiesSet() {
288291
}
289292
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
290293
this.kafkaServers.clear();
294+
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
291295
for (int i = 0; i < this.count; i++) {
292296
Properties brokerConfigProperties = createBrokerProperties(i);
293297
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
@@ -299,6 +303,9 @@ public void afterPropertiesSet() {
299303
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
300304
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
301305
}
306+
if (!userLogDir) {
307+
logDir(brokerConfigProperties);
308+
}
302309
KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
303310
this.kafkaServers.add(server);
304311
if (this.kafkaPorts[i] == 0) {
@@ -316,6 +323,16 @@ public void afterPropertiesSet() {
316323
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
317324
}
318325

326+
private void logDir(Properties brokerConfigProperties) {
327+
try {
328+
brokerConfigProperties.put(KafkaConfig.LogDirProp(),
329+
Files.createTempDirectory("spring.kafka." + UUID.randomUUID()).toString());
330+
}
331+
catch (IOException e) {
332+
throw new UncheckedIOException(e);
333+
}
334+
}
335+
319336
private void overrideExitMethods() {
320337
String exitMsg = "Exit.%s(%d, %s) called";
321338
Exit.setExitProcedure((statusCode, message) -> {

0 commit comments

Comments
 (0)