-
Notifications
You must be signed in to change notification settings - Fork 53.5k
[BAEL-6569] implement the Kafka commit failure handling #19239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
85bdaef
3a48a80
e9226de
93b3a41
e26bbac
4704563
bdbca8d
6be6c99
8b88ebc
efcc548
6130530
9839c1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,176 @@ | ||
| package com.baeldung.kafka.commitfailure.fixed.async; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.Collection; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Properties; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionException; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.stream.StreamSupport; | ||
|
|
||
| import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
| import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; | ||
| import org.apache.kafka.common.TopicPartition; | ||
| import org.apache.kafka.common.errors.WakeupException; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class KafkaConsumerService { | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); | ||
| private final KafkaConsumer<String, String> consumer; | ||
| private final AtomicBoolean running = new AtomicBoolean(true); | ||
| private final ExecutorService workers; | ||
| private final Map<TopicPartition, AtomicLong> committableOffsets = new ConcurrentHashMap<>(); | ||
|
|
||
/**
 * Creates the Kafka consumer, subscribes it to {@code topic} with a rebalance listener,
 * and creates the virtual-thread executor used to process records.
 *
 * @param consumerProps configuration passed straight to {@link KafkaConsumer}
 * @param topic         the single topic to subscribe to
 */
public KafkaConsumerService(Properties consumerProps, String topic) {
    this.consumer = new KafkaConsumer<>(consumerProps);

    ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
            try {
                commitOffsets(revoked);
            } catch (Exception ex) {
                // Best effort: a failed commit here is only logged, never rethrown.
                log.error("Commit failed during rebalance", ex);
            } finally {
                // Drop tracking for partitions this consumer no longer owns.
                for (TopicPartition partition : revoked) {
                    committableOffsets.remove(partition);
                }
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
            // Start newly assigned partitions with no pending offset.
            for (TopicPartition partition : assigned) {
                committableOffsets.remove(partition);
            }
        }
    };

    consumer.subscribe(List.of(topic), rebalanceListener);
    workers = Executors.newVirtualThreadPerTaskExecutor();
}
|
|
||
| public void consume() { | ||
| try { | ||
| while (running.get()) { | ||
| ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); | ||
| if (records.isEmpty()) { | ||
| continue; | ||
| } | ||
|
|
||
| List<CompletableFuture<Void>> futures = processAsync(records); | ||
|
|
||
| try { | ||
| CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) | ||
| .orTimeout(700L, TimeUnit.MILLISECONDS) | ||
| .join(); | ||
| } catch (CompletionException ex) { | ||
| log.error("Batch processing timed out or failed", ex); | ||
| } | ||
|
|
||
| commitOffsets(); | ||
| } | ||
| } catch (WakeupException ex) { | ||
| if (running.get()) { | ||
| log.error("Error in the Kafka Consumer with exception", ex); | ||
| throw ex; | ||
| } | ||
| } finally { | ||
| commitOffsets(); | ||
| consumer.close(); | ||
| } | ||
| } | ||
|
|
||
| public void shutdown() { | ||
| running.compareAndSet(true, false); | ||
| consumer.wakeup(); | ||
| try { | ||
| workers.shutdown(); | ||
| if (!workers.awaitTermination(60, TimeUnit.SECONDS)) { | ||
| workers.shutdownNow(); | ||
| } | ||
| } catch (InterruptedException ex) { | ||
| workers.shutdownNow(); | ||
| Thread.currentThread() | ||
| .interrupt(); | ||
| } | ||
| } | ||
|
|
||
| private List<CompletableFuture<Void>> processAsync(ConsumerRecords<String, String> records) { | ||
| return StreamSupport.stream(records.spliterator(), false) | ||
| .map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers) | ||
| .whenComplete((ignored, ex) -> { | ||
| if (ex == null) { | ||
| markComplete(record); | ||
| } else { | ||
| log.error("Failed offset and send to DLQ {} {} {}", record.offset(), record.key(), ex.getMessage()); | ||
| } | ||
| }) | ||
| .exceptionally(ex -> null)) | ||
| .toList(); | ||
| } | ||
|
|
||
| private void simulateDBUpdate(ConsumerRecord<String, String> record) { | ||
| try { | ||
| log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); | ||
| Thread.sleep(300L); | ||
| } catch (InterruptedException ex) { | ||
| Thread.currentThread() | ||
| .interrupt(); | ||
| throw new RuntimeException(ex); | ||
| } | ||
| } | ||
|
|
||
| private void markComplete(ConsumerRecord<String, String> record) { | ||
| TopicPartition tp = new TopicPartition(record.topic(), record.partition()); | ||
| committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1L)) | ||
| .accumulateAndGet(record.offset() + 1L, Math::max); | ||
| } | ||
|
|
||
| private void commitOffsets() { | ||
| Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>(); | ||
|
|
||
| committableOffsets.forEach((tp, atomicOffset) -> { | ||
| long val = atomicOffset.get(); | ||
| if (val != -1L) { | ||
| toCommit.put(tp, new OffsetAndMetadata(val)); | ||
| } | ||
| }); | ||
|
|
||
| if (toCommit.isEmpty()) { | ||
| return; | ||
| } | ||
| consumer.commitSync(toCommit); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can't this throw a CommitFailedException? Both the timeout and the deadline are set to 1 second, so there is no safety margin. We don't see it at the moment because the workload itself finishes in well under 1 second.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I did not realize this timeout is the same as the test poll timeout. Reduced it. Ideally, in production code, I would read the configured timeout and then derive this value from it with some buffer. |
||
| toCommit.forEach((tp, meta) -> { | ||
| AtomicLong ref = committableOffsets.get(tp); | ||
| if (ref != null) { | ||
| ref.compareAndSet(meta.offset(), -1L); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private void commitOffsets(Collection<TopicPartition> partitions) { | ||
| Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>(); | ||
|
|
||
| partitions.forEach(tp -> { | ||
| AtomicLong ref = committableOffsets.get(tp); | ||
| if (ref != null) { | ||
| long val = ref.get(); | ||
| if (val != -1L) { | ||
| toCommit.put(tp, new OffsetAndMetadata(val)); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| if (toCommit.isEmpty()) { | ||
| return; | ||
| } | ||
| consumer.commitSync(toCommit); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| package com.baeldung.kafka.commitfailure.fixed.sequential; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.List; | ||
| import java.util.Properties; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
| import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
| import org.apache.kafka.common.errors.WakeupException; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class KafkaConsumerService { | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); | ||
| private final KafkaConsumer<String, String> consumer; | ||
| private final AtomicBoolean running = new AtomicBoolean(true); | ||
|
|
||
| public KafkaConsumerService(Properties consumerProps, String topic) { | ||
| this.consumer = new KafkaConsumer<>(consumerProps); | ||
| consumer.subscribe(List.of(topic)); | ||
| } | ||
|
|
||
| public void consume() { | ||
| try { | ||
| while (running.get()) { | ||
| ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); | ||
| if (records.isEmpty()) { | ||
| continue; | ||
| } | ||
| records.forEach(this::simulateDBUpdate); | ||
| consumer.commitSync(); | ||
| } | ||
| } catch (WakeupException ex) { | ||
| if (running.get()) { | ||
| log.error("Error in the Kafka Consumer with exception", ex); | ||
| throw ex; | ||
| } | ||
| } finally { | ||
| consumer.close(); | ||
| } | ||
| } | ||
|
|
||
| public void shutdown() { | ||
| running.compareAndSet(true, false); | ||
| consumer.wakeup(); | ||
| } | ||
|
|
||
| private void simulateDBUpdate(ConsumerRecord<String, String> record) { | ||
| try { | ||
| log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); | ||
| Thread.sleep(150L); | ||
| } catch (InterruptedException ex) { | ||
| Thread.currentThread() | ||
| .interrupt(); | ||
| throw new RuntimeException(ex); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| package com.baeldung.kafka.commitfailure.sequential; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.List; | ||
| import java.util.Properties; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
|
|
||
| import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecords; | ||
| import org.apache.kafka.clients.consumer.KafkaConsumer; | ||
| import org.apache.kafka.common.errors.WakeupException; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class KafkaConsumerService { | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); | ||
| private final KafkaConsumer<String, String> consumer; | ||
| private final AtomicBoolean running = new AtomicBoolean(true); | ||
|
|
||
| public KafkaConsumerService(Properties props, String topic) { | ||
| this.consumer = new KafkaConsumer<>(props); | ||
| consumer.subscribe(List.of(topic)); | ||
| } | ||
|
|
||
| public void consume() { | ||
| try { | ||
| while (running.get()) { | ||
| ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); | ||
| if (records.isEmpty()) { | ||
| continue; | ||
| } | ||
| records.forEach(this::simulateDBCall); | ||
| consumer.commitSync(); | ||
| } | ||
| } catch (WakeupException ex) { | ||
| if (running.get()) { | ||
| log.error("Error in the Kafka Consumer with exception", ex); | ||
| throw ex; | ||
| } | ||
| } finally { | ||
| consumer.close(); | ||
| } | ||
| } | ||
|
|
||
| public void shutdown() { | ||
| running.compareAndSet(true, false); | ||
| consumer.wakeup(); | ||
| } | ||
|
|
||
| private void simulateDBCall(ConsumerRecord<String, String> record) { | ||
| try { | ||
| log.info("Simulating a DB call - record key {} value {}", record.key(), record.value()); | ||
| Thread.sleep(150L); | ||
| } catch (InterruptedException ex) { | ||
| Thread.currentThread() | ||
| .interrupt(); | ||
| throw new RuntimeException(ex); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this mask a CommitFailedException being caught?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. At this point a CommitFailedException only needs to be logged; there is no need to rethrow it, because that might fail the consumer process for its other partitions. In any case, the records whose offset commit failed will be polled again by another consumer in the group, so no other action is required here. Committing offsets during a rebalance is not always required and is only best effort.