From 85bdaef15d2ed781b0b9e887f8041224e25e7629 Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Tue, 2 Jun 2026 11:50:24 +0530 Subject: [PATCH 01/12] implemented the kafka commit failure handling --- apache-kafka-4/pom.xml | 1 + .../batch/KafkaConsumerService.java | 60 ++++++ .../fixed/async/KafkaConsumerService.java | 172 ++++++++++++++++++ .../fixed/batch/KafkaConsumerService.java | 60 ++++++ .../sequential/KafkaConsumerService.java | 61 +++++++ .../sequential/KafkaConsumerService.java | 61 +++++++ .../batch/KafkaConsumerServiceLiveTest.java | 77 ++++++++ .../async/KafkaConsumerServiceLiveTest.java | 76 ++++++++ .../batch/KafkaConsumerServiceLiveTest.java | 75 ++++++++ .../KafkaConsumerServiceLiveTest.java | 75 ++++++++ .../KafkaConsumerServiceLiveTest.java | 77 ++++++++ 11 files changed, 795 insertions(+) create mode 100644 apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java create mode 100644 apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java create mode 100644 apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java create mode 100644 apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java create mode 100644 apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java create mode 100644 apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java create mode 100644 apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java create mode 100644 apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java create mode 100644 apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java create mode 100644 apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java diff --git a/apache-kafka-4/pom.xml b/apache-kafka-4/pom.xml index ffdd84b03754..1cad893182ff 100644 --- a/apache-kafka-4/pom.xml +++ b/apache-kafka-4/pom.xml @@ -100,6 +100,7 @@ + 21 3.9.0 1.19.3 1.19.3 diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java new file mode 100644 index 000000000000..b81c036297e9 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java @@ -0,0 +1,60 @@ +package com.baeldung.kafka.commitfailure.batch; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(250)); + if (records.isEmpty()) { + continue; + } + simulateDBCall(records); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBCall(ConsumerRecords records) { + try { + log.info("Simulating long running batch db update for records {}", records.count()); + Thread.sleep(1000L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java new file mode 100644 index 000000000000..7d9a2dafe47b --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java @@ -0,0 +1,172 @@ +package com.baeldung.kafka.commitfailure.fixed.async; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.StreamSupport; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + private final ExecutorService workers; + private final Map committableOffsets = new ConcurrentHashMap<>(); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + try { + commitOffsets(partitions); + } catch (Exception ex) { + log.error("Commit failed during rebalance {} {}", ex.getMessage(), ex, ex.getCause()); + } finally { + partitions.forEach(committableOffsets::remove); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + partitions.forEach(committableOffsets::remove); + } + }); + workers = Executors.newVirtualThreadPerTaskExecutor(); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + + List> futures = StreamSupport.stream(records.spliterator(), false) + .map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers) + .whenComplete((ignored, ex) -> { + if (ex == null) { + markComplete(record); + } else { + log.error("Failed offset and send to DLQ {} {} {}", record.offset(), record.key(), ex.getMessage()); + } + }) + .exceptionally(ex -> null)) + .toList(); + + try { + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) + .orTimeout(1000, TimeUnit.MILLISECONDS) + .join(); + } catch (CompletionException ex) { + log.error("Batch processing timed out or failed {} {}", ex.getMessage(), ex, ex.getCause()); + } + + commitOffsets(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + commitOffsets(); + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + try { + workers.shutdown(); + if (!workers.awaitTermination(60, TimeUnit.SECONDS)) { + workers.shutdownNow(); + } + } catch (InterruptedException ex) { + workers.shutdownNow(); + Thread.currentThread() + .interrupt(); + } + } + + private void simulateDBUpdate(ConsumerRecord record) { + try { + log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); + Thread.sleep(300L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } + + private void markComplete(ConsumerRecord record) { + TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1)) + .accumulateAndGet(record.offset() + 1, Math::max); + } + + private void commitOffsets() { + Map toCommit = new HashMap<>(); + + committableOffsets.forEach((tp, atomicOffset) -> { + long val = atomicOffset.get(); + if (val != -1L) { + toCommit.put(tp, new OffsetAndMetadata(val)); + } + }); + + if (toCommit.isEmpty()) { + return; + } + consumer.commitSync(toCommit); + toCommit.forEach((tp, meta) -> { + AtomicLong ref = committableOffsets.get(tp); + if (ref != null) { + ref.compareAndSet(meta.offset(), -1L); + } + }); + } + + private void commitOffsets(Collection partitions) { + Map toCommit = new HashMap<>(); + + partitions.forEach(tp -> { + AtomicLong ref = committableOffsets.get(tp); + if (ref != null) { + long val = ref.get(); + if (val != -1L) { + toCommit.put(tp, new OffsetAndMetadata(val)); + } + } + }); + + if (toCommit.isEmpty()) { + return; + } + consumer.commitSync(toCommit); + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java new file mode 100644 index 000000000000..7bc84ddd3948 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java @@ -0,0 +1,60 @@ +package com.baeldung.kafka.commitfailure.fixed.batch; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + simulateDBUpdate(records); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBUpdate(ConsumerRecords records) { + try { + log.info("Simulating long running batch db update {}", records.count()); + Thread.sleep(1000L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java new file mode 100644 index 000000000000..7d677f7d8cc1 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java @@ -0,0 +1,61 @@ +package com.baeldung.kafka.commitfailure.fixed.sequential; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + records.forEach(this::simulateDBUpdate); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBUpdate(ConsumerRecord record) { + try { + log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); + Thread.sleep(100L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java new file mode 100644 index 000000000000..078553600015 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java @@ -0,0 +1,61 @@ +package com.baeldung.kafka.commitfailure.sequential; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties props, String topic) { + this.consumer = new KafkaConsumer<>(props); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + records.forEach(this::simulateDBCall); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBCall(ConsumerRecord record) { + try { + log.info("Simulating a DB call - record key {} value {}", record.key(), record.value()); + Thread.sleep(100L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..2fa5679028bf --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,77 @@ +package com.baeldung.kafka.commitfailure.batch; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerThrowsCommitFailedException() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + for (int num = 0; num < 300; num++) { + producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); + producer.flush(); + } + } + + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).isInstanceOf(CommitFailedException.class); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-batch-consumer-app"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 500); + + return consumerProperties; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..0f6523cc2eaa --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,76 @@ +package com.baeldung.kafka.commitfailure.fixed.async; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerDoesNotThrowCommitFailedException() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + for (int num = 0; num < 100; num++) { + producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); + producer.flush(); + } + } + + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).doesNotThrowAnyException(); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-async-consumer-app"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000); + + return consumerProperties; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..25e3e5aa90fc --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,75 @@ +package com.baeldung.kafka.commitfailure.fixed.batch; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumesDoesNotThrowCommitFailedException() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + for (int num = 0; num < 300; num++) { + producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); + producer.flush(); + } + } + + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).doesNotThrowAnyException(); + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-batch-consumer-app-fixed"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000); + + return consumerProperties; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..e51ae03f379b --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,75 @@ +package com.baeldung.kafka.commitfailure.fixed.sequential; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesIsSent_whenConsumerIsRunning_thenConsumerDoesNotThrowsCommitFailedException() throws InterruptedException { + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + producer.send(new ProducerRecord<>("test-topic", "x1", "test")); + producer.flush(); + } + + countDownLatch.await(10, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).doesNotThrowAnyException(); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-seq-consumer-app-fixed"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300); + + return consumerProperties; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..f4648c8e0c0c --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,77 @@ +package com.baeldung.kafka.commitfailure.sequential; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFailedException() throws InterruptedException { + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + producer.send(new ProducerRecord<>("test-topic", "x1", "test1")); + producer.flush(); + } + + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).isInstanceOf(CommitFailedException.class); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "seq-consumer-app"); + consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 50); + + return consumerProperties; + } +} From 3a48a804a4771fc4d232f926abb38b3c3d31f788 Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Tue, 2 Jun 2026 13:47:24 +0530 Subject: [PATCH 02/12] log error update --- .../kafka/commitfailure/batch/KafkaConsumerService.java | 2 +- .../commitfailure/fixed/async/KafkaConsumerService.java | 6 +++--- .../commitfailure/fixed/batch/KafkaConsumerService.java | 2 +- .../fixed/sequential/KafkaConsumerService.java | 2 +- .../commitfailure/sequential/KafkaConsumerService.java | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java index b81c036297e9..343f3e584409 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java @@ -34,7 +34,7 @@ public void consume() { } } catch (WakeupException ex) { if (running.get()) { - log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + log.error("Error in the Kafka Consumer with exception", ex); throw ex; } } finally { diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java index 7d9a2dafe47b..0b09bdc03f64 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java @@ -42,7 +42,7 @@ public void onPartitionsRevoked(Collection partitions) { try { commitOffsets(partitions); } catch (Exception ex) { - log.error("Commit failed during rebalance {} {}", ex.getMessage(), ex, ex.getCause()); + log.error("Commit failed during rebalance", ex); } finally { partitions.forEach(committableOffsets::remove); } @@ -81,14 +81,14 @@ public void consume() { .orTimeout(1000, TimeUnit.MILLISECONDS) .join(); } catch (CompletionException ex) { - log.error("Batch processing timed out or failed {} {}", ex.getMessage(), ex, ex.getCause()); + log.error("Batch processing timed out or failed", ex); } commitOffsets(); } } catch (WakeupException ex) { if (running.get()) { - log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + log.error("Error in the Kafka Consumer with exception", ex); throw ex; } } finally { diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java index 7bc84ddd3948..7435da74f9bf 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java @@ -34,7 +34,7 @@ public void consume() { } } catch (WakeupException ex) { if (running.get()) { - log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + log.error("Error in the Kafka Consumer with exception", ex); throw ex; } } finally { diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java index 7d677f7d8cc1..af142471fb58 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java @@ -35,7 +35,7 @@ public void consume() { } } catch (WakeupException ex) { if (running.get()) { - log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + log.error("Error in the Kafka Consumer with exception", ex); throw ex; } } finally { diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java index 078553600015..39dea62a5a6e 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java @@ -35,7 +35,7 @@ public void consume() { } } catch (WakeupException ex) { if (running.get()) { - log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause()); + log.error("Error in the Kafka Consumer with exception", ex); throw ex; } } finally { From e9226de72d33ed88a51a04b330dc33b96179d12b Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Tue, 2 Jun 2026 18:34:30 +0530 Subject: [PATCH 03/12] log error update --- .../fixed/async/KafkaConsumerService.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java index 0b09bdc03f64..82c32fbcceb1 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java @@ -64,17 +64,7 @@ public void consume() { continue; } - List> futures = StreamSupport.stream(records.spliterator(), false) - .map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers) - .whenComplete((ignored, ex) -> { - if (ex == null) { - markComplete(record); - } else { - log.error("Failed offset and send to DLQ {} {} {}", record.offset(), record.key(), ex.getMessage()); - } - }) - .exceptionally(ex -> null)) - .toList(); + List> futures = processAsync(records); try { CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) @@ -112,6 +102,20 @@ public void shutdown() { } } + private List> processAsync(ConsumerRecords records) { + return StreamSupport.stream(records.spliterator(), false) + .map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers) + .whenComplete((ignored, ex) -> { + if (ex == null) { + markComplete(record); + } else { + log.error("Failed offset and send to DLQ {} {} {}", record.offset(), record.key(), ex.getMessage()); + } + }) + .exceptionally(ex -> null)) + .toList(); + } + private void simulateDBUpdate(ConsumerRecord record) { try { log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); From 93b3a415679b8c88c20d77009540862ddf54a21b Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Mon, 8 Jun 2026 16:54:30 +0530 Subject: [PATCH 04/12] code refactor --- .../fixed/async/KafkaConsumerService.java | 2 +- .../batch/KafkaConsumerServiceLiveTest.java | 7 +-- .../async/KafkaConsumerServiceLiveTest.java | 50 +++++++++++++------ .../batch/KafkaConsumerServiceLiveTest.java | 46 ++++++++++++----- .../KafkaConsumerServiceLiveTest.java | 43 +++++++++++----- .../KafkaConsumerServiceLiveTest.java | 13 +++-- 6 files changed, 114 insertions(+), 47 deletions(-) diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java index 82c32fbcceb1..2476b7d0e338 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java @@ -68,7 +68,7 @@ public void consume() { try { CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) - .orTimeout(1000, TimeUnit.MILLISECONDS) + .orTimeout(700, TimeUnit.MILLISECONDS) .join(); } catch (CompletionException ex) { log.error("Batch processing timed out or failed", ex); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java index 2fa5679028bf..069b8a224d1c 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java @@ -1,6 +1,7 @@ package com.baeldung.kafka.commitfailure.batch; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -42,11 +43,11 @@ void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerThrow try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { for (int num = 0; num < 300; num++) { producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); - producer.flush(); } + producer.flush(); } - countDownLatch.await(30, TimeUnit.SECONDS); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); assertThat(uncaughtException.get()).isInstanceOf(CommitFailedException.class); kafkaConsumerService.shutdown(); @@ -64,7 +65,7 @@ private static Properties getProducerConfig() { private static Properties getConsumerConfig() { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-batch-consumer-app"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java index 0f6523cc2eaa..43d66468703a 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java @@ -1,18 +1,27 @@ package com.baeldung.kafka.commitfailure.fixed.async; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.Map; import java.util.Properties; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.TimeoutException; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; @@ -26,27 +35,34 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerDoesNotThrowCommitFailedException() throws InterruptedException { - CountDownLatch countDownLatch = new CountDownLatch(1); - AtomicReference uncaughtException = new AtomicReference<>(); + void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerCommitsOffset() throws InterruptedException, ExecutionException, TimeoutException { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); Thread th = new Thread(kafkaConsumerService::consume); - th.setUncaughtExceptionHandler((thread, ex) -> { - uncaughtException.set(ex); - countDownLatch.countDown(); - }); th.start(); try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { for (int num = 0; num < 100; num++) { producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); - producer.flush(); } + producer.flush(); } - countDownLatch.await(30, TimeUnit.SECONDS); - assertThat(uncaughtException.get()).doesNotThrowAnyException(); + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + Map committedOffsets; + try (AdminClient adminClient = AdminClient.create(getAdminProps())) { + ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); + committedOffsets = result.partitionsToOffsetAndMetadata() + .get(); + } + assertThat(committedOffsets).isNotEmpty(); + assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); + assertEquals(100L, committedOffsets.get(new TopicPartition("test-topic", 0)) + .offset()); + }); kafkaConsumerService.shutdown(); } @@ -63,14 +79,20 @@ private static Properties getProducerConfig() { private static Properties getConsumerConfig() { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-async-consumer-app"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000); return consumerProperties; } + + private static Properties getAdminProps() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + return props; + } } diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java index 25e3e5aa90fc..3289814398b5 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java @@ -1,18 +1,27 @@ package com.baeldung.kafka.commitfailure.fixed.batch; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; @@ -26,27 +35,34 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumesDoesNotThrowCommitFailedException() throws InterruptedException { - CountDownLatch countDownLatch = new CountDownLatch(1); - AtomicReference uncaughtException = new AtomicReference<>(); - + void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerCommitsOffset() { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); Thread th = new Thread(kafkaConsumerService::consume); - th.setUncaughtExceptionHandler((thread, ex) -> { - uncaughtException.set(ex); - countDownLatch.countDown(); - }); th.start(); try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { for (int num = 0; num < 300; num++) { producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); - producer.flush(); } + producer.flush(); } - countDownLatch.await(30, TimeUnit.SECONDS); - assertThat(uncaughtException.get()).doesNotThrowAnyException(); + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + Map committedOffsets; + try (AdminClient adminClient = AdminClient.create(getAdminProps())) { + ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); + committedOffsets = result.partitionsToOffsetAndMetadata() + .get(); + } + assertThat(committedOffsets).isNotEmpty(); + assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); + assertEquals(300L, committedOffsets.get(new TopicPartition("test-topic", 0)) + .offset()); + }); + kafkaConsumerService.shutdown(); } @@ -62,7 +78,7 @@ private static Properties getProducerConfig() { private static Properties getConsumerConfig() { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-batch-consumer-app-fixed"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -72,4 +88,10 @@ private static Properties getConsumerConfig() { return consumerProperties; } + + private static Properties getAdminProps() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + return props; + } } diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java index e51ae03f379b..07755754174b 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java @@ -1,18 +1,25 @@ package com.baeldung.kafka.commitfailure.fixed.sequential; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.Map; import java.util.Properties; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; @@ -28,15 +35,8 @@ public class KafkaConsumerServiceLiveTest { @Test void givenProducerMessagesIsSent_whenConsumerIsRunning_thenConsumerDoesNotThrowsCommitFailedException() throws InterruptedException { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); - CountDownLatch countDownLatch = new CountDownLatch(1); - AtomicReference uncaughtException = new AtomicReference<>(); Thread th = new Thread(kafkaConsumerService::consume); - th.setUncaughtExceptionHandler((thread, ex) -> { - uncaughtException.set(ex); - countDownLatch.countDown(); - }); - th.start(); try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { @@ -44,8 +44,21 @@ void givenProducerMessagesIsSent_whenConsumerIsRunning_thenConsumerDoesNotThrows producer.flush(); } - countDownLatch.await(10, TimeUnit.SECONDS); - assertThat(uncaughtException.get()).doesNotThrowAnyException(); + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + Map committedOffsets; + try (AdminClient adminClient = AdminClient.create(getAdminProps())) { + ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); + committedOffsets = result.partitionsToOffsetAndMetadata() + .get(10, TimeUnit.SECONDS); + } + assertThat(committedOffsets).isNotEmpty(); + assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); + assertEquals(1L, committedOffsets.get(new TopicPartition("test-topic", 0)) + .offset()); + }); kafkaConsumerService.shutdown(); } @@ -62,7 +75,7 @@ private static Properties getProducerConfig() { private static Properties getConsumerConfig() { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-seq-consumer-app-fixed"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -72,4 +85,10 @@ private static Properties getConsumerConfig() { return consumerProperties; } + + private static Properties getAdminProps() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + return props; + } } diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java index f4648c8e0c0c..7ea37a3d9ee4 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java @@ -1,6 +1,7 @@ package com.baeldung.kafka.commitfailure.sequential; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -27,7 +28,8 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFailedException() throws InterruptedException { + void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFailedException() + throws InterruptedException { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); CountDownLatch countDownLatch = new CountDownLatch(1); AtomicReference uncaughtException = new AtomicReference<>(); @@ -45,8 +47,10 @@ void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFa producer.flush(); } - countDownLatch.await(30, TimeUnit.SECONDS); - assertThat(uncaughtException.get()).isInstanceOf(CommitFailedException.class); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(uncaughtException.get()).isNotNull() + .isInstanceOf(CommitFailedException.class); + assertThat(th.isAlive()).isFalse(); kafkaConsumerService.shutdown(); } @@ -63,8 +67,7 @@ private static Properties getProducerConfig() { private static Properties getConsumerConfig() { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "seq-consumer-app"); - consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); From e26bbac685c497058746fbb7602b34ce785f60bc Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Mon, 8 Jun 2026 17:35:24 +0530 Subject: [PATCH 05/12] code refactor for tests --- .../fixed/async/KafkaConsumerServiceLiveTest.java | 9 +++------ .../fixed/batch/KafkaConsumerServiceLiveTest.java | 4 ++-- .../fixed/sequential/KafkaConsumerServiceLiveTest.java | 5 ++--- .../sequential/KafkaConsumerServiceLiveTest.java | 1 - 4 files changed, 7 insertions(+), 12 deletions(-) diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java index 43d66468703a..0204d7c57c6c 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java @@ -1,14 +1,11 @@ package com.baeldung.kafka.commitfailure.fixed.async; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; @@ -35,7 +32,7 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerCommitsOffset() throws InterruptedException, ExecutionException, TimeoutException { + void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); Thread th = new Thread(kafkaConsumerService::consume); @@ -58,7 +55,7 @@ void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerCommitsOffse committedOffsets = result.partitionsToOffsetAndMetadata() .get(); } - assertThat(committedOffsets).isNotEmpty(); + assertNotNull(committedOffsets); assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); assertEquals(100L, committedOffsets.get(new TopicPartition("test-topic", 0)) .offset()); @@ -79,7 +76,7 @@ private static Properties getProducerConfig() { private static Properties getConsumerConfig() { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "-consumer-app"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java index 3289814398b5..92207bafa51e 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java @@ -35,7 +35,7 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerCommitsOffset() { + void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); Thread th = new Thread(kafkaConsumerService::consume); th.start(); @@ -57,7 +57,7 @@ void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerCommi committedOffsets = result.partitionsToOffsetAndMetadata() .get(); } - assertThat(committedOffsets).isNotEmpty(); + assertNotNull(committedOffsets); assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); assertEquals(300L, committedOffsets.get(new TopicPartition("test-topic", 0)) .offset()); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java index 07755754174b..a96d7c636ab5 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java @@ -1,6 +1,5 @@ package com.baeldung.kafka.commitfailure.fixed.sequential; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -33,7 +32,7 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessagesIsSent_whenConsumerIsRunning_thenConsumerDoesNotThrowsCommitFailedException() throws InterruptedException { + void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() throws InterruptedException { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); Thread th = new Thread(kafkaConsumerService::consume); @@ -54,7 +53,7 @@ void givenProducerMessagesIsSent_whenConsumerIsRunning_thenConsumerDoesNotThrows committedOffsets = result.partitionsToOffsetAndMetadata() .get(10, TimeUnit.SECONDS); } - assertThat(committedOffsets).isNotEmpty(); + assertNotNull(committedOffsets); assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); assertEquals(1L, committedOffsets.get(new TopicPartition("test-topic", 0)) .offset()); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java index 7ea37a3d9ee4..942dffc00a26 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java @@ -50,7 +50,6 @@ void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFa assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); assertThat(uncaughtException.get()).isNotNull() .isInstanceOf(CommitFailedException.class); - assertThat(th.isAlive()).isFalse(); kafkaConsumerService.shutdown(); } From 47045635a88208779fc07f5ef4c18a80a3b83a40 Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Mon, 8 Jun 2026 17:36:20 +0530 Subject: [PATCH 06/12] code refactor for tests --- .../fixed/batch/KafkaConsumerServiceLiveTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java index 92207bafa51e..bc31138f9982 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java @@ -1,14 +1,11 @@ package com.baeldung.kafka.commitfailure.fixed.batch; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import java.util.Map; import java.util.Properties; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; From bdbca8db7a055b747bee139713e6ff28527748ec Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Tue, 9 Jun 2026 16:18:13 +0530 Subject: [PATCH 07/12] test refactoring --- .../sequential/KafkaConsumerService.java | 2 +- .../batch/KafkaConsumerServiceLiveTest.java | 6 +++--- .../fixed/async/KafkaConsumerServiceLiveTest.java | 12 +++++++----- .../fixed/batch/KafkaConsumerServiceLiveTest.java | 10 +++++++--- .../sequential/KafkaConsumerServiceLiveTest.java | 15 +++++++++------ .../sequential/KafkaConsumerServiceLiveTest.java | 8 +++----- 6 files changed, 30 insertions(+), 23 deletions(-) diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java index 39dea62a5a6e..63d08c339361 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java @@ -51,7 +51,7 @@ public void shutdown() { private void simulateDBCall(ConsumerRecord record) { try { log.info("Simulating a DB call - record key {} value {}", record.key(), record.value()); - Thread.sleep(100L); + Thread.sleep(150L); } catch (InterruptedException ex) { Thread.currentThread() .interrupt(); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java index 069b8a224d1c..7c3b7cb12cc7 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java @@ -1,7 +1,6 @@ package com.baeldung.kafka.commitfailure.batch; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -47,8 +46,9 @@ void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerThrow producer.flush(); } - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(uncaughtException.get()).isInstanceOf(CommitFailedException.class); + countDownLatch.await(30, TimeUnit.SECONDS); + assertThat(uncaughtException.get()).isNotNull() + .isInstanceOf(CommitFailedException.class); kafkaConsumerService.shutdown(); } diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java index 0204d7c57c6c..a73fc99474c9 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java @@ -34,7 +34,6 @@ public class KafkaConsumerServiceLiveTest { @Test void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); - Thread th = new Thread(kafkaConsumerService::consume); th.start(); @@ -47,17 +46,20 @@ void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsComm Awaitility.await() .atMost(30, TimeUnit.SECONDS) - .pollInterval(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) .untilAsserted(() -> { + TopicPartition topicPartition = new TopicPartition("test-topic", 0); Map committedOffsets; + try (AdminClient adminClient = AdminClient.create(getAdminProps())) { ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); committedOffsets = result.partitionsToOffsetAndMetadata() .get(); } + assertNotNull(committedOffsets); - assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); - assertEquals(100L, committedOffsets.get(new TopicPartition("test-topic", 0)) + assertNotNull(committedOffsets.get(topicPartition)); + assertEquals(100L, committedOffsets.get(topicPartition) .offset()); }); @@ -76,7 +78,7 @@ private static Properties getProducerConfig() { private static Properties getConsumerConfig() { Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "-consumer-app"); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java index bc31138f9982..61910c000100 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java @@ -46,17 +46,21 @@ void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsComm Awaitility.await() .atMost(30, TimeUnit.SECONDS) - .pollInterval(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) .untilAsserted(() -> { + TopicPartition topicPartition = new TopicPartition("test-topic", 0); Map committedOffsets; + try (AdminClient adminClient = AdminClient.create(getAdminProps())) { ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); committedOffsets = result.partitionsToOffsetAndMetadata() .get(); } + assertNotNull(committedOffsets); - assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); - assertEquals(300L, committedOffsets.get(new TopicPartition("test-topic", 0)) + assertEquals(1, committedOffsets.size()); + assertNotNull(committedOffsets.get(topicPartition)); + assertEquals(300L, committedOffsets.get(topicPartition) .offset()); }); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java index a96d7c636ab5..7f79bad26756 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java @@ -32,9 +32,8 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() throws InterruptedException { + void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); - Thread th = new Thread(kafkaConsumerService::consume); th.start(); @@ -45,17 +44,21 @@ void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsComm Awaitility.await() .atMost(30, TimeUnit.SECONDS) - .pollInterval(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) .untilAsserted(() -> { + TopicPartition topicPartition = new TopicPartition("test-topic", 0); Map committedOffsets; + try (AdminClient adminClient = AdminClient.create(getAdminProps())) { ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); committedOffsets = result.partitionsToOffsetAndMetadata() - .get(10, TimeUnit.SECONDS); + .get(); } + assertNotNull(committedOffsets); - assertNotNull(committedOffsets.get(new TopicPartition("test-topic", 0))); - assertEquals(1L, committedOffsets.get(new TopicPartition("test-topic", 0)) + assertEquals(1, committedOffsets.size()); + assertNotNull(committedOffsets.get(topicPartition)); + assertEquals(1L, committedOffsets.get(topicPartition) .offset()); }); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java index 942dffc00a26..399ce1b23940 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java @@ -1,7 +1,6 @@ package com.baeldung.kafka.commitfailure.sequential; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -28,12 +27,11 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFailedException() - throws InterruptedException { - KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFailedException() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); AtomicReference uncaughtException = new AtomicReference<>(); + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); Thread th = new Thread(kafkaConsumerService::consume); th.setUncaughtExceptionHandler((thread, ex) -> { uncaughtException.set(ex); @@ -47,7 +45,7 @@ void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFa producer.flush(); } - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + countDownLatch.await(30, TimeUnit.SECONDS); assertThat(uncaughtException.get()).isNotNull() .isInstanceOf(CommitFailedException.class); From 6be6c9963aac90eb498bc990ccba9d28e6892382 Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Tue, 9 Jun 2026 16:25:10 +0530 Subject: [PATCH 08/12] remove redundant assertion --- .../commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java | 1 - .../fixed/sequential/KafkaConsumerServiceLiveTest.java | 1 - 2 files changed, 2 deletions(-) diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java index 61910c000100..fd93f0607167 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java @@ -58,7 +58,6 @@ void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsComm } assertNotNull(committedOffsets); - assertEquals(1, committedOffsets.size()); assertNotNull(committedOffsets.get(topicPartition)); assertEquals(300L, committedOffsets.get(topicPartition) .offset()); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java index 7f79bad26756..fbee18899612 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java @@ -56,7 +56,6 @@ void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsComm } assertNotNull(committedOffsets); - assertEquals(1, committedOffsets.size()); assertNotNull(committedOffsets.get(topicPartition)); assertEquals(1L, committedOffsets.get(topicPartition) .offset()); From 8b88ebc14ecbff4d803482528e76e38c1ed13071 Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Tue, 9 Jun 2026 16:33:49 +0530 Subject: [PATCH 09/12] assert include for countdownlatch --- .../commitfailure/batch/KafkaConsumerServiceLiveTest.java | 3 ++- .../commitfailure/sequential/KafkaConsumerServiceLiveTest.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java index 7c3b7cb12cc7..ec1dc24ff842 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java @@ -1,6 +1,7 @@ package com.baeldung.kafka.commitfailure.batch; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -46,7 +47,7 @@ void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerThrow producer.flush(); } - countDownLatch.await(30, TimeUnit.SECONDS); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); assertThat(uncaughtException.get()).isNotNull() .isInstanceOf(CommitFailedException.class); diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java index 399ce1b23940..4f42f9ab9759 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java @@ -1,6 +1,7 @@ package com.baeldung.kafka.commitfailure.sequential; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -45,7 +46,7 @@ void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFa producer.flush(); } - countDownLatch.await(30, TimeUnit.SECONDS); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); assertThat(uncaughtException.get()).isNotNull() .isInstanceOf(CommitFailedException.class); From efcc548bf4276758e08d4b0ff6e5a2330335578d Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Tue, 9 Jun 2026 22:57:38 +0530 Subject: [PATCH 10/12] format and time fix --- .../commitfailure/fixed/async/KafkaConsumerService.java | 6 +++--- .../fixed/sequential/KafkaConsumerService.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java index 2476b7d0e338..f091cba4cffe 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java @@ -68,7 +68,7 @@ public void consume() { try { CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) - .orTimeout(700, TimeUnit.MILLISECONDS) + .orTimeout(700L, TimeUnit.MILLISECONDS) .join(); } catch (CompletionException ex) { log.error("Batch processing timed out or failed", ex); @@ -129,8 +129,8 @@ private void simulateDBUpdate(ConsumerRecord record) { private void markComplete(ConsumerRecord record) { TopicPartition tp = new TopicPartition(record.topic(), record.partition()); - committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1)) - .accumulateAndGet(record.offset() + 1, Math::max); + committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1L)) + .accumulateAndGet(record.offset() + 1L, Math::max); } private void commitOffsets() { diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java index af142471fb58..0d19d3817d47 100644 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java @@ -51,7 +51,7 @@ public void shutdown() { private void simulateDBUpdate(ConsumerRecord record) { try { log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); - Thread.sleep(100L); + Thread.sleep(150L); } catch (InterruptedException ex) { Thread.currentThread() .interrupt(); From 613053099b90908049f3b7411d89d219768a1403 Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Fri, 12 Jun 2026 11:42:32 +0530 Subject: [PATCH 11/12] removed batch related service and tests --- .../batch/KafkaConsumerService.java | 60 ------------ .../fixed/batch/KafkaConsumerService.java | 60 ------------ .../batch/KafkaConsumerServiceLiveTest.java | 79 --------------- .../batch/KafkaConsumerServiceLiveTest.java | 97 ------------------- 4 files changed, 296 deletions(-) delete mode 100644 apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java delete mode 100644 apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java delete mode 100644 apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java delete mode 100644 apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java deleted file mode 100644 index 343f3e584409..000000000000 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerService.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.baeldung.kafka.commitfailure.batch; - -import java.time.Duration; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaConsumerService { - - private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); - private final KafkaConsumer consumer; - private final AtomicBoolean running = new AtomicBoolean(true); - - public KafkaConsumerService(Properties consumerProps, String topic) { - this.consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(List.of(topic)); - } - - public void consume() { - try { - while (running.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(250)); - if (records.isEmpty()) { - continue; - } - simulateDBCall(records); - consumer.commitSync(); - } - } catch (WakeupException ex) { - if (running.get()) { - log.error("Error in the Kafka Consumer with exception", ex); - throw ex; - } - } finally { - consumer.close(); - } - } - - public void shutdown() { - running.compareAndSet(true, false); - consumer.wakeup(); - } - - private void simulateDBCall(ConsumerRecords records) { - try { - log.info("Simulating long running batch db update for records {}", records.count()); - Thread.sleep(1000L); - } catch (InterruptedException ex) { - Thread.currentThread() - .interrupt(); - throw new RuntimeException(ex); - } - } -} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java deleted file mode 100644 index 7435da74f9bf..000000000000 --- a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerService.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.baeldung.kafka.commitfailure.fixed.batch; - -import java.time.Duration; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaConsumerService { - - private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); - private final KafkaConsumer consumer; - private final AtomicBoolean running = new AtomicBoolean(true); - - public KafkaConsumerService(Properties consumerProps, String topic) { - this.consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(List.of(topic)); - } - - public void consume() { - try { - while (running.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); - if (records.isEmpty()) { - continue; - } - simulateDBUpdate(records); - consumer.commitSync(); - } - } catch (WakeupException ex) { - if (running.get()) { - log.error("Error in the Kafka Consumer with exception", ex); - throw ex; - } - } finally { - consumer.close(); - } - } - - public void shutdown() { - running.compareAndSet(true, false); - consumer.wakeup(); - } - - private void simulateDBUpdate(ConsumerRecords records) { - try { - log.info("Simulating long running batch db update {}", records.count()); - Thread.sleep(1000L); - } catch (InterruptedException ex) { - Thread.currentThread() - .interrupt(); - throw new RuntimeException(ex); - } - } -} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java deleted file mode 100644 index ec1dc24ff842..000000000000 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/batch/KafkaConsumerServiceLiveTest.java +++ /dev/null @@ -1,79 +0,0 @@ -package com.baeldung.kafka.commitfailure.batch; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.kafka.clients.consumer.CommitFailedException; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -@Testcontainers -public class KafkaConsumerServiceLiveTest { - - @Container - private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); - - @Test - void givenProducerMessagesAreSent_whenConsumerIsRunningAsBatch_thenConsumerThrowsCommitFailedException() throws InterruptedException { - CountDownLatch countDownLatch = new CountDownLatch(1); - AtomicReference uncaughtException = new AtomicReference<>(); - - KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); - Thread th = new Thread(kafkaConsumerService::consume); - th.setUncaughtExceptionHandler((thread, ex) -> { - uncaughtException.set(ex); - countDownLatch.countDown(); - }); - th.start(); - - try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { - for (int num = 0; num < 300; num++) { - producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); - } - producer.flush(); - } - - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(uncaughtException.get()).isNotNull() - .isInstanceOf(CommitFailedException.class); - - kafkaConsumerService.shutdown(); - } - - private static Properties getProducerConfig() { - Properties producerProperties = new Properties(); - producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - return producerProperties; - } - - private static Properties getConsumerConfig() { - Properties consumerProperties = new Properties(); - consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); - consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); - consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 500); - - return consumerProperties; - } -} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java deleted file mode 100644 index fd93f0607167..000000000000 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/batch/KafkaConsumerServiceLiveTest.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.baeldung.kafka.commitfailure.fixed.batch; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.awaitility.Awaitility; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.utility.DockerImageName; - -@Testcontainers -public class KafkaConsumerServiceLiveTest { - - @Container - private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); - - @Test - void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { - KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); - Thread th = new Thread(kafkaConsumerService::consume); - th.start(); - - try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { - for (int num = 0; num < 300; num++) { - producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); - } - producer.flush(); - } - - Awaitility.await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(2, TimeUnit.SECONDS) - .untilAsserted(() -> { - TopicPartition topicPartition = new TopicPartition("test-topic", 0); - Map committedOffsets; - - try (AdminClient adminClient = AdminClient.create(getAdminProps())) { - ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); - committedOffsets = result.partitionsToOffsetAndMetadata() - .get(); - } - - assertNotNull(committedOffsets); - assertNotNull(committedOffsets.get(topicPartition)); - assertEquals(300L, committedOffsets.get(topicPartition) - .offset()); - }); - - kafkaConsumerService.shutdown(); - } - - private static Properties getProducerConfig() { - Properties producerProperties = new Properties(); - producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - return producerProperties; - } - - private static Properties getConsumerConfig() { - Properties consumerProperties = new Properties(); - consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); - consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); - consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000); - - return consumerProperties; - } - - private static Properties getAdminProps() { - Properties props = new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); - return props; - } -} From 9839c1fdf4fe3ac4020f339ae7bf46e602d56295 Mon Sep 17 00:00:00 2001 From: saikatcse03 Date: Fri, 12 Jun 2026 12:46:04 +0530 Subject: [PATCH 12/12] rename method --- .../fixed/sequential/KafkaConsumerServiceLiveTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java index fbee18899612..d5e9c4e45e7d 100644 --- a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java @@ -32,7 +32,7 @@ public class KafkaConsumerServiceLiveTest { private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); @Test - void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { + void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); Thread th = new Thread(kafkaConsumerService::consume); th.start();