diff --git a/apache-kafka-4/pom.xml b/apache-kafka-4/pom.xml index ffdd84b03754..1cad893182ff 100644 --- a/apache-kafka-4/pom.xml +++ b/apache-kafka-4/pom.xml @@ -100,6 +100,7 @@ + 21 3.9.0 1.19.3 1.19.3 diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java new file mode 100644 index 000000000000..f091cba4cffe --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerService.java @@ -0,0 +1,176 @@ +package com.baeldung.kafka.commitfailure.fixed.async; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.StreamSupport; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + private final ExecutorService workers; + private final Map committableOffsets = new ConcurrentHashMap<>(); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsRevoked(Collection partitions) { + try { + commitOffsets(partitions); + } catch (Exception ex) { + log.error("Commit failed during rebalance", ex); + } finally { + partitions.forEach(committableOffsets::remove); + } + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + partitions.forEach(committableOffsets::remove); + } + }); + workers = Executors.newVirtualThreadPerTaskExecutor(); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + + List> futures = processAsync(records); + + try { + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)) + .orTimeout(700L, TimeUnit.MILLISECONDS) + .join(); + } catch (CompletionException ex) { + log.error("Batch processing timed out or failed", ex); + } + + commitOffsets(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception", ex); + throw ex; + } + } finally { + commitOffsets(); + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + try { + workers.shutdown(); + if (!workers.awaitTermination(60, TimeUnit.SECONDS)) { + workers.shutdownNow(); + } + } catch (InterruptedException ex) { + workers.shutdownNow(); + Thread.currentThread() + .interrupt(); + } + } + + private List> processAsync(ConsumerRecords records) { + return StreamSupport.stream(records.spliterator(), false) + .map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers) + .whenComplete((ignored, ex) -> { + if (ex == null) { + markComplete(record); + } else { + log.error("Failed offset and send to DLQ {} {} {}", record.offset(), record.key(), ex.getMessage()); + } + }) + .exceptionally(ex -> null)) + .toList(); + } + + private void simulateDBUpdate(ConsumerRecord record) { + try { + log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); + Thread.sleep(300L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } + + private void markComplete(ConsumerRecord record) { + TopicPartition tp = new TopicPartition(record.topic(), record.partition()); + committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1L)) + .accumulateAndGet(record.offset() + 1L, Math::max); + } + + private void commitOffsets() { + Map toCommit = new HashMap<>(); + + committableOffsets.forEach((tp, atomicOffset) -> { + long val = atomicOffset.get(); + if (val != -1L) { + toCommit.put(tp, new OffsetAndMetadata(val)); + } + }); + + if (toCommit.isEmpty()) { + return; + } + consumer.commitSync(toCommit); + toCommit.forEach((tp, meta) -> { + AtomicLong ref = committableOffsets.get(tp); + if (ref != null) { + ref.compareAndSet(meta.offset(), -1L); + } + }); + } + + private void commitOffsets(Collection partitions) { + Map toCommit = new HashMap<>(); + + partitions.forEach(tp -> { + AtomicLong ref = committableOffsets.get(tp); + if (ref != null) { + long val = ref.get(); + if (val != -1L) { + toCommit.put(tp, new OffsetAndMetadata(val)); + } + } + }); + + if (toCommit.isEmpty()) { + return; + } + consumer.commitSync(toCommit); + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java new file mode 100644 index 000000000000..0d19d3817d47 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerService.java @@ -0,0 +1,61 @@ +package com.baeldung.kafka.commitfailure.fixed.sequential; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties consumerProps, String topic) { + this.consumer = new KafkaConsumer<>(consumerProps); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + records.forEach(this::simulateDBUpdate); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception", ex); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBUpdate(ConsumerRecord record) { + try { + log.info("Simulating a db call - record key {} value {}", record.key(), record.value()); + Thread.sleep(150L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java new file mode 100644 index 000000000000..63d08c339361 --- /dev/null +++ b/apache-kafka-4/src/main/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerService.java @@ -0,0 +1,61 @@ +package com.baeldung.kafka.commitfailure.sequential; + +import java.time.Duration; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerService { + + private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class); + private final KafkaConsumer consumer; + private final AtomicBoolean running = new AtomicBoolean(true); + + public KafkaConsumerService(Properties props, String topic) { + this.consumer = new KafkaConsumer<>(props); + consumer.subscribe(List.of(topic)); + } + + public void consume() { + try { + while (running.get()) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + if (records.isEmpty()) { + continue; + } + records.forEach(this::simulateDBCall); + consumer.commitSync(); + } + } catch (WakeupException ex) { + if (running.get()) { + log.error("Error in the Kafka Consumer with exception", ex); + throw ex; + } + } finally { + consumer.close(); + } + } + + public void shutdown() { + running.compareAndSet(true, false); + consumer.wakeup(); + } + + private void simulateDBCall(ConsumerRecord record) { + try { + log.info("Simulating a DB call - record key {} value {}", record.key(), record.value()); + Thread.sleep(150L); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + throw new RuntimeException(ex); + } + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..a73fc99474c9 --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/async/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,97 @@ +package com.baeldung.kafka.commitfailure.fixed.async; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + Thread th = new Thread(kafkaConsumerService::consume); + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + for (int num = 0; num < 100; num++) { + producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); + } + producer.flush(); + } + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .untilAsserted(() -> { + TopicPartition topicPartition = new TopicPartition("test-topic", 0); + Map committedOffsets; + + try (AdminClient adminClient = AdminClient.create(getAdminProps())) { + ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); + committedOffsets = result.partitionsToOffsetAndMetadata() + .get(); + } + + assertNotNull(committedOffsets); + assertNotNull(committedOffsets.get(topicPartition)); + assertEquals(100L, committedOffsets.get(topicPartition) + .offset()); + }); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000); + + return consumerProperties; + } + + private static Properties getAdminProps() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + return props; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..d5e9c4e45e7d --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/fixed/sequential/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,95 @@ +package com.baeldung.kafka.commitfailure.fixed.sequential; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() { + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + Thread th = new Thread(kafkaConsumerService::consume); + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + producer.send(new ProducerRecord<>("test-topic", "x1", "test")); + producer.flush(); + } + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .untilAsserted(() -> { + TopicPartition topicPartition = new TopicPartition("test-topic", 0); + Map committedOffsets; + + try (AdminClient adminClient = AdminClient.create(getAdminProps())) { + ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app"); + committedOffsets = result.partitionsToOffsetAndMetadata() + .get(); + } + + assertNotNull(committedOffsets); + assertNotNull(committedOffsets.get(topicPartition)); + assertEquals(1L, committedOffsets.get(topicPartition) + .offset()); + }); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300); + + return consumerProperties; + } + + private static Properties getAdminProps() { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + return props; + } +} diff --git a/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java new file mode 100644 index 000000000000..4f42f9ab9759 --- /dev/null +++ b/apache-kafka-4/src/test/java/com/baeldung/kafka/commitfailure/sequential/KafkaConsumerServiceLiveTest.java @@ -0,0 +1,78 @@ +package com.baeldung.kafka.commitfailure.sequential; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +@Testcontainers +public class KafkaConsumerServiceLiveTest { + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.9.0")); + + @Test + void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerThrowsCommitFailedException() throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference uncaughtException = new AtomicReference<>(); + + KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic"); + Thread th = new Thread(kafkaConsumerService::consume); + th.setUncaughtExceptionHandler((thread, ex) -> { + uncaughtException.set(ex); + countDownLatch.countDown(); + }); + + th.start(); + + try (KafkaProducer producer = new KafkaProducer<>(getProducerConfig())) { + producer.send(new ProducerRecord<>("test-topic", "x1", "test1")); + producer.flush(); + } + + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertThat(uncaughtException.get()).isNotNull() + .isInstanceOf(CommitFailedException.class); + + kafkaConsumerService.shutdown(); + } + + private static Properties getProducerConfig() { + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return producerProperties; + } + + private static Properties getConsumerConfig() { + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app"); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); + consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 50); + + return consumerProperties; + } +}