Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apache-kafka-4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
</build>

<properties>
<java.version>21</java.version>
<kafka.version>3.9.0</kafka.version>
<testcontainers-kafka.version>1.19.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.19.3</testcontainers-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package com.baeldung.kafka.commitfailure.fixed.async;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerService {

private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);
private final ExecutorService workers;
private final Map<TopicPartition, AtomicLong> committableOffsets = new ConcurrentHashMap<>();

public KafkaConsumerService(Properties consumerProps, String topic) {
this.consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
try {
commitOffsets(partitions);
} catch (Exception ex) {
log.error("Commit failed during rebalance", ex);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this mask a CommitFailedException being caught?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. At this point a CommitFailedException only needs to be logged; there is no need to rethrow it, because that might fail the consumer's processing of its other partitions. In any case, the records whose offset commit failed will be polled again by another consumer in the group, so no other action is required here. Committing offsets during a rebalance is not always required and is only best effort.

} finally {
partitions.forEach(committableOffsets::remove);
}
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach(committableOffsets::remove);
}
});
workers = Executors.newVirtualThreadPerTaskExecutor();
}

/**
 * Runs the poll loop until {@link #shutdown()} clears the {@code running} flag.
 *
 * <p>Each non-empty batch is handed to {@code processAsync}, the loop then waits up to
 * 700 ms for the whole batch to finish, and finally commits whatever offsets the workers
 * have marked as complete. A batch that times out or fails is logged and the loop still
 * commits the offsets that did complete.
 *
 * <p>On exit a last commit is attempted and the consumer is always closed, even when
 * that last commit throws.
 *
 * @throws WakeupException if the consumer is woken up while the service is still
 *         flagged as running, i.e. the wakeup was not caused by {@link #shutdown()}
 */
public void consume() {
    try {
        while (running.get()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) {
                continue;
            }

            List<CompletableFuture<Void>> futures = processAsync(records);

            try {
                // Bound the wait so a slow batch cannot stall the poll loop indefinitely.
                CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
                    .orTimeout(700L, TimeUnit.MILLISECONDS)
                    .join();
            } catch (CompletionException ex) {
                log.error("Batch processing timed out or failed", ex);
            }

            commitOffsets();
        }
    } catch (WakeupException ex) {
        // A wakeup after shutdown() is the expected way out of the loop; anything else is an error.
        if (running.get()) {
            log.error("Error in the Kafka Consumer with exception", ex);
            throw ex;
        }
    } finally {
        try {
            commitOffsets();
        } finally {
            // Close unconditionally: a failing final commit must not leak the consumer.
            consumer.close();
        }
    }
}

/**
 * Stops the service: clears the {@code running} flag, wakes the consumer up, and then
 * winds down the worker executor, waiting at most 60 seconds for in-flight tasks.
 *
 * <p>Tasks still running after the wait, or when the calling thread is interrupted while
 * waiting, are cancelled via {@code shutdownNow()}. The interrupt status of the caller is
 * restored in the latter case.
 */
public void shutdown() {
    running.set(false);
    consumer.wakeup();

    workers.shutdown();
    boolean drained;
    try {
        drained = workers.awaitTermination(60, TimeUnit.SECONDS);
    } catch (InterruptedException interrupted) {
        workers.shutdownNow();
        Thread.currentThread().interrupt();
        return;
    }

    if (!drained) {
        workers.shutdownNow();
    }
}

/**
 * Submits every record of the batch to the worker executor.
 *
 * @param records the batch returned by the latest poll
 * @return one future per record, in iteration order; each future always completes
 *         normally because failures are logged and then mapped to {@code null}
 */
private List<CompletableFuture<Void>> processAsync(ConsumerRecords<String, String> records) {
    return StreamSupport.stream(records.spliterator(), false)
        .map(this::dispatch)
        .toList();
}

/**
 * Runs the simulated DB update for one record on the worker executor, marks the record
 * as complete on success, and logs it on failure.
 */
private CompletableFuture<Void> dispatch(ConsumerRecord<String, String> record) {
    CompletableFuture<Void> work = CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers);

    CompletableFuture<Void> tracked = work.whenComplete((unused, failure) -> {
        if (failure != null) {
            log.error("Failed offset and send to DLQ {} {} {}", record.offset(), record.key(), failure.getMessage());
            return;
        }
        markComplete(record);
    });

    // Swallow the failure here so that one bad record does not fail the whole batch future.
    return tracked.exceptionally(failure -> null);
}

/**
 * Stands in for a database write: logs the record and then blocks for 300 ms.
 *
 * @param record the record being "persisted"
 * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
 *         interrupted while sleeping; the interrupt flag is restored first
 */
private void simulateDBUpdate(ConsumerRecord<String, String> record) {
    log.info("Simulating a db call - record key {} value {}", record.key(), record.value());
    try {
        TimeUnit.MILLISECONDS.sleep(300L);
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(interrupted);
    }
}

/**
 * Records that {@code record} has been processed by raising the committable offset of
 * its partition to {@code record.offset() + 1} if that is higher than the stored value.
 *
 * @param record the record whose processing finished successfully
 */
private void markComplete(ConsumerRecord<String, String> record) {
    long nextOffset = record.offset() + 1L;
    TopicPartition partition = new TopicPartition(record.topic(), record.partition());

    // -1 is the "nothing to commit" sentinel used by the commit methods.
    AtomicLong highest = committableOffsets.computeIfAbsent(partition, unused -> new AtomicLong(-1L));
    highest.updateAndGet(current -> Math.max(current, nextOffset));
}

private void commitOffsets() {
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();

committableOffsets.forEach((tp, atomicOffset) -> {
long val = atomicOffset.get();
if (val != -1L) {
toCommit.put(tp, new OffsetAndMetadata(val));
}
});

if (toCommit.isEmpty()) {
return;
}
consumer.commitSync(toCommit);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this throw a CommitFailedException? Both the timeout and deadline are set to 1 second so there is no safety margin

We don't see it at the moment because the workload itself finishes in well under 1 second

@saikatcse03 saikatcse03 Jun 9, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I did not realize this timeout was the same as the test poll timeout. I have reduced it. Ideally, in production code, I would read the configured timeout and derive this value from it with some buffer.

toCommit.forEach((tp, meta) -> {
AtomicLong ref = committableOffsets.get(tp);
if (ref != null) {
ref.compareAndSet(meta.offset(), -1L);
}
});
}

/**
 * Synchronously commits the tracked offsets of the given partitions only.
 *
 * <p>Partitions that have no tracked entry, or whose entry holds the {@code -1}
 * sentinel, are skipped. Nothing is sent when no partition has an offset to commit.
 *
 * @param partitions the partitions whose offsets should be committed
 */
private void commitOffsets(Collection<TopicPartition> partitions) {
    Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();

    for (TopicPartition partition : partitions) {
        AtomicLong tracked = committableOffsets.get(partition);
        if (tracked == null) {
            continue;
        }
        long next = tracked.get();
        if (next == -1L) {
            continue;
        }
        pending.put(partition, new OffsetAndMetadata(next));
    }

    if (!pending.isEmpty()) {
        consumer.commitSync(pending);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.baeldung.kafka.commitfailure.fixed.sequential;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Single-threaded Kafka consumer that processes each polled batch record by record and
 * commits synchronously after the batch.
 */
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
    private static final long DB_CALL_MILLIS = 150L;

    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Creates the consumer and subscribes it to a single topic.
     *
     * @param consumerProps configuration passed straight to {@link KafkaConsumer}
     * @param topic the topic to subscribe to
     */
    public KafkaConsumerService(Properties consumerProps, String topic) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(List.of(topic));
    }

    /**
     * Polls until {@link #shutdown()} is called, handling every record of a batch before
     * committing it. The consumer is closed when the loop ends for any reason.
     *
     * @throws WakeupException if the consumer is woken up while still flagged as running
     */
    public void consume() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> batch = consumer.poll(POLL_TIMEOUT);
                if (!batch.isEmpty()) {
                    for (ConsumerRecord<String, String> record : batch) {
                        simulateDBUpdate(record);
                    }
                    consumer.commitSync();
                }
            }
        } catch (WakeupException ex) {
            // A wakeup after shutdown() is the normal exit path and is swallowed.
            boolean requestedByShutdown = !running.get();
            if (!requestedByShutdown) {
                log.error("Error in the Kafka Consumer with exception", ex);
                throw ex;
            }
        } finally {
            consumer.close();
        }
    }

    /** Clears the running flag and wakes the consumer up so the poll loop can exit. */
    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }

    /**
     * Stands in for a database write: logs the record, then blocks for 150 ms. An
     * interrupt restores the thread's interrupt flag and is rethrown as a
     * {@link RuntimeException}.
     */
    private void simulateDBUpdate(ConsumerRecord<String, String> record) {
        log.info("Simulating a db call - record key {} value {}", record.key(), record.value());
        try {
            Thread.sleep(DB_CALL_MILLIS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.baeldung.kafka.commitfailure.sequential;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Single-threaded Kafka consumer that handles each polled batch sequentially and then
 * commits it with a synchronous commit.
 */
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100);
    private static final long DB_CALL_MILLIS = 150L;

    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Creates the consumer and subscribes it to a single topic.
     *
     * @param props configuration passed straight to {@link KafkaConsumer}
     * @param topic the topic to subscribe to
     */
    public KafkaConsumerService(Properties props, String topic) {
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of(topic));
    }

    /**
     * Polls until {@link #shutdown()} is called; every record of a non-empty batch goes
     * through the simulated DB call before the batch is committed. The consumer is
     * closed when the loop ends for any reason.
     *
     * @throws WakeupException if the consumer is woken up while still flagged as running
     */
    public void consume() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> batch = consumer.poll(POLL_TIMEOUT);
                if (!batch.isEmpty()) {
                    for (ConsumerRecord<String, String> record : batch) {
                        simulateDBCall(record);
                    }
                    consumer.commitSync();
                }
            }
        } catch (WakeupException ex) {
            // A wakeup after shutdown() is the normal exit path and is swallowed.
            boolean requestedByShutdown = !running.get();
            if (!requestedByShutdown) {
                log.error("Error in the Kafka Consumer with exception", ex);
                throw ex;
            }
        } finally {
            consumer.close();
        }
    }

    /** Clears the running flag and wakes the consumer up so the poll loop can exit. */
    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }

    /**
     * Stands in for a database call: logs the record, then blocks for 150 ms. An
     * interrupt restores the thread's interrupt flag and is rethrown as a
     * {@link RuntimeException}.
     */
    private void simulateDBCall(ConsumerRecord<String, String> record) {
        log.info("Simulating a DB call - record key {} value {}", record.key(), record.value());
        try {
            Thread.sleep(DB_CALL_MILLIS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }
}
Loading