Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
7e1c4c7
Implement detach and reattach entity in JPA
saikatcse03 Dec 11, 2025
1b115b3
Merge remote-tracking branch 'origin/master'
saikatcse03 Dec 11, 2025
4368b19
Implement additional tests
saikatcse03 Dec 11, 2025
7178d0a
Merge branch 'eugenp:master' into master
saikatcse03 Dec 11, 2025
faebcfc
Merge branch 'eugenp:master' into master
saikatcse03 Jan 16, 2026
a7ae8df
implemented kafka offset reset
saikatcse03 Jan 18, 2026
edffad7
Merge remote-tracking branch 'origin/master'
saikatcse03 Jan 18, 2026
ca2ed3d
refactored code and test cases
saikatcse03 Jan 19, 2026
83a68a0
refactored code
saikatcse03 Jan 19, 2026
5b15352
refactored code
saikatcse03 Jan 19, 2026
3564b3d
refactored test code
saikatcse03 Jan 19, 2026
f9fae8e
refactored test code
saikatcse03 Jan 20, 2026
77ba154
refactored code
saikatcse03 Jan 20, 2026
7ca07b9
refactored code
saikatcse03 Jan 20, 2026
755d33f
refactored code
saikatcse03 Jan 21, 2026
46bdd7f
refactored code
saikatcse03 Jan 22, 2026
7d025bb
refactored code
saikatcse03 Jan 23, 2026
c7d4a5f
Merge branch 'eugenp:master' into master
saikatcse03 Feb 10, 2026
6a77f5a
API Versioning in Spring boot 4
saikatcse03 Feb 14, 2026
cf6c0e2
API Versioning in Spring boot 4 refactoring
saikatcse03 Feb 15, 2026
2e69dbb
update pom file
saikatcse03 Feb 15, 2026
bbde43f
update pom file
saikatcse03 Feb 15, 2026
b755840
resources file
saikatcse03 Feb 15, 2026
79e967d
refactor code
saikatcse03 Feb 15, 2026
54db6b6
refactor code
saikatcse03 Feb 15, 2026
a59ecab
refactor code
saikatcse03 Feb 15, 2026
e84baaa
refactor code
saikatcse03 Feb 16, 2026
3bea137
refactor tests
saikatcse03 Feb 16, 2026
50f25d9
add more test case
saikatcse03 Feb 16, 2026
f8aad08
update project to parent pom and test case refactor
saikatcse03 Feb 16, 2026
d80e698
update test names
saikatcse03 Feb 23, 2026
cd5899f
update test name and property for surefire
saikatcse03 Feb 23, 2026
fbc2e5d
Merge branch 'eugenp:master' into master
saikatcse03 Mar 17, 2026
70357fc
Merge branch 'eugenp:master' into master
saikatcse03 Mar 23, 2026
e2d958e
Implement virtual thread pinning examples
saikatcse03 Mar 31, 2026
ea2b2f6
remove c related files and rename tests
saikatcse03 Mar 31, 2026
67a3a2b
remove non required dependency
saikatcse03 Mar 31, 2026
9dcaba5
rename variable
saikatcse03 Mar 31, 2026
ebb6430
rename variable
saikatcse03 Mar 31, 2026
8a436e7
include maven-shade-plugin
saikatcse03 Mar 31, 2026
046055d
remove unused property
saikatcse03 Mar 31, 2026
e8f13da
update pom file with unused var removal
saikatcse03 Mar 31, 2026
deb50df
safe unlock handling
saikatcse03 Apr 21, 2026
fbcc2c5
rename variable
saikatcse03 Apr 21, 2026
706bf90
refactor method
saikatcse03 Apr 21, 2026
88cdf5a
Merge branch 'eugenp:master' into master
saikatcse03 May 12, 2026
a764823
kafka commit failed exception fix
saikatcse03 May 31, 2026
0b1b660
kafka commit failed exception fix
saikatcse03 May 31, 2026
d85ff9c
Merge remote-tracking branch 'origin/master'
saikatcse03 May 31, 2026
3ea6409
fixed test cases
saikatcse03 Jun 1, 2026
740b8e1
refactor code
saikatcse03 Jun 1, 2026
9d35541
refactor code
saikatcse03 Jun 1, 2026
2297e1c
refactor test
saikatcse03 Jun 1, 2026
502f647
refactor offset commit on rebalance
saikatcse03 Jun 1, 2026
44b9599
test refactor
saikatcse03 Jun 2, 2026
6e665dd
test refactor
saikatcse03 Jun 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apache-kafka-4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
</build>

<properties>
<java.version>21</java.version>
<kafka.version>3.9.0</kafka.version>
<testcontainers-kafka.version>1.19.3</testcontainers-kafka.version>
<testcontainers-jupiter.version>1.19.3</testcontainers-jupiter.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.baeldung.kafka.commitfailure.batch;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerService {

private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);

public KafkaConsumerService(Properties consumerProps, String topic) {
this.consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of(topic));
}

public void consume() {
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(250));
if (records.isEmpty()) {
continue;
}
simulateDBCall(records);
consumer.commitSync();
}
} catch (WakeupException ex) {
if (running.get()) {
log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause());
throw ex;
}
} finally {
consumer.close();
}
}

public void shutdown() {
running.compareAndSet(true, false);
consumer.wakeup();
}

private void simulateDBCall(ConsumerRecords<String, String> records) {
try {
log.info("Simulating long running batch db update for records {}", records.count());
Thread.sleep(1000L);
} catch (InterruptedException ex) {
Thread.currentThread()
.interrupt();
throw new RuntimeException(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package com.baeldung.kafka.commitfailure.fixed.async;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerService {

private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);
private final ExecutorService workers;
private final Map<TopicPartition, AtomicLong> committableOffsets = new ConcurrentHashMap<>();

public KafkaConsumerService(Properties consumerProps, String topic) {
this.consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
try {
commitOffsets(partitions);
} catch (Exception ex) {
log.error("Commit failed during rebalance {} {}", ex.getMessage(), ex, ex.getCause());
} finally {
partitions.forEach(committableOffsets::remove);
}
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach(committableOffsets::remove);
}
});
workers = Executors.newVirtualThreadPerTaskExecutor();
}

public void consume() {
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
continue;
}

List<CompletableFuture<Void>> futures = StreamSupport.stream(records.spliterator(), false)
.map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers)
.whenComplete((ignored, ex) -> {
if (ex == null) {
markComplete(record);
} else {
log.error("Failed offset and send to DLQ {} {} {}", record.offset(), record.key(), ex.getMessage());
}
})
.exceptionally(ex -> null))
.toList();

try {
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
.orTimeout(1000, TimeUnit.MILLISECONDS)
.join();
} catch (CompletionException ex) {
log.error("Batch processing timed out or failed {} {}", ex.getMessage(), ex, ex.getCause());
}

commitOffsets();
}
} catch (WakeupException ex) {
if (running.get()) {
log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause());
throw ex;
}
} finally {
commitOffsets();
consumer.close();
}
}

public void shutdown() {
running.compareAndSet(true, false);
consumer.wakeup();
try {
workers.shutdown();
if (!workers.awaitTermination(60, TimeUnit.SECONDS)) {
workers.shutdownNow();
}
} catch (InterruptedException ex) {
workers.shutdownNow();
Thread.currentThread()
.interrupt();
}
}

private void simulateDBUpdate(ConsumerRecord<String, String> record) {
try {
log.info("Simulating a db call - record key {} value {}", record.key(), record.value());
Thread.sleep(300L);
} catch (InterruptedException ex) {
Thread.currentThread()
.interrupt();
throw new RuntimeException(ex);
}
}

private void markComplete(ConsumerRecord<String, String> record) {
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1))
.accumulateAndGet(record.offset() + 1, Math::max);
}

private void commitOffsets() {
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();

committableOffsets.forEach((tp, atomicOffset) -> {
long val = atomicOffset.get();
if (val != -1L) {
toCommit.put(tp, new OffsetAndMetadata(val));
}
});

if (toCommit.isEmpty()) {
return;
}
consumer.commitSync(toCommit);
toCommit.forEach((tp, meta) -> {
AtomicLong ref = committableOffsets.get(tp);
if (ref != null) {
ref.compareAndSet(meta.offset(), -1L);
}
});
}

private void commitOffsets(Collection<TopicPartition> partitions) {
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();

partitions.forEach(tp -> {
AtomicLong ref = committableOffsets.get(tp);
if (ref != null) {
long val = ref.get();
if (val != -1L) {
toCommit.put(tp, new OffsetAndMetadata(val));
}
}
});

if (toCommit.isEmpty()) {
return;
}
consumer.commitSync(toCommit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.baeldung.kafka.commitfailure.fixed.batch;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerService {

private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);

public KafkaConsumerService(Properties consumerProps, String topic) {
this.consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of(topic));
}

public void consume() {
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
continue;
}
simulateDBUpdate(records);
consumer.commitSync();
}
} catch (WakeupException ex) {
if (running.get()) {
log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause());
throw ex;
}
} finally {
consumer.close();
}
}

public void shutdown() {
running.compareAndSet(true, false);
consumer.wakeup();
}

private void simulateDBUpdate(ConsumerRecords<String, String> records) {
try {
log.info("Simulating long running batch db update {}", records.count());
Thread.sleep(1000L);
} catch (InterruptedException ex) {
Thread.currentThread()
.interrupt();
throw new RuntimeException(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.baeldung.kafka.commitfailure.fixed.sequential;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerService {

private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);
private final KafkaConsumer<String, String> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);

public KafkaConsumerService(Properties consumerProps, String topic) {
this.consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of(topic));
}

public void consume() {
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) {
continue;
}
records.forEach(this::simulateDBUpdate);
consumer.commitSync();
}
} catch (WakeupException ex) {
if (running.get()) {
log.error("Error in the Kafka Consumer with exception {} {}", ex.getMessage(), ex, ex.getCause());
throw ex;
}
} finally {
consumer.close();
}
}

public void shutdown() {
running.compareAndSet(true, false);
consumer.wakeup();
}

private void simulateDBUpdate(ConsumerRecord<String, String> record) {
try {
log.info("Simulating a db call - record key {} value {}", record.key(), record.value());
Thread.sleep(100L);
} catch (InterruptedException ex) {
Thread.currentThread()
.interrupt();
throw new RuntimeException(ex);
}
}
}
Loading