Skip to content

Commit b1f33aa

Browse files
garyrussell authored and artembilan committed
Don't commit on assignment with existing commit
Unnecessary to perform the redundant commit on assignment if there is already a committed offset for a partition. It is only necessary when using `latest` auto offset reset and when there is no current committed offset.
1 parent 4a57a8d commit b1f33aa

File tree

2 files changed

+85
-19
lines changed

2 files changed

+85
-19
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collection;
2424
import java.util.Collections;
2525
import java.util.HashMap;
26+
import java.util.HashSet;
2627
import java.util.Iterator;
2728
import java.util.LinkedHashSet;
2829
import java.util.LinkedList;
@@ -2353,25 +2354,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
23532354
}
23542355
ListenerConsumer.this.assignedPartitions.addAll(partitions);
23552356
if (ListenerConsumer.this.commitCurrentOnAssignment) {
2356-
// Commit initial positions - this is generally redundant but
2357-
// it protects us from the case when another consumer starts
2358-
// and rebalance would cause it to reset at the end
2359-
// see https://github.com/spring-projects/spring-kafka/issues/110
2360-
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
2361-
for (TopicPartition partition : partitions) {
2362-
try {
2363-
offsetsToCommit.put(partition,
2364-
new OffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
2365-
}
2366-
catch (NoOffsetForPartitionException e) {
2367-
ListenerConsumer.this.fatalError = true;
2368-
ListenerConsumer.this.logger.error(e, "No offset and no reset policy");
2369-
return;
2370-
}
2371-
}
2372-
if (offsetsToCommit.size() > 0) {
2373-
commitCurrentOffsets(offsetsToCommit);
2374-
}
2357+
collectAndCommitIfNecessary(partitions);
23752358
}
23762359
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
23772360
seekPartitions(partitions, false);
@@ -2384,6 +2367,32 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
23842367
}
23852368
}
23862369

2370+
private void collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
2371+
// Commit initial positions - this is generally redundant but
2372+
// it protects us from the case when another consumer starts
2373+
// and rebalance would cause it to reset at the end
2374+
// see https://github.com/spring-projects/spring-kafka/issues/110
2375+
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
2376+
Map<TopicPartition, OffsetAndMetadata> committed =
2377+
ListenerConsumer.this.consumer.committed(new HashSet<>(partitions));
2378+
for (TopicPartition partition : partitions) {
2379+
try {
2380+
if (committed.get(partition) == null) { // no existing commit for this group
2381+
offsetsToCommit.put(partition,
2382+
new OffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
2383+
}
2384+
}
2385+
catch (NoOffsetForPartitionException e) {
2386+
ListenerConsumer.this.fatalError = true;
2387+
ListenerConsumer.this.logger.error(e, "No offset and no reset policy");
2388+
return;
2389+
}
2390+
}
2391+
if (offsetsToCommit.size() > 0) {
2392+
commitCurrentOffsets(offsetsToCommit);
2393+
}
2394+
}
2395+
23872396
private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
23882397
ListenerConsumer.this.commitLogger.log(() -> "Committing on assignment: " + offsetsToCommit);
23892398
if (ListenerConsumer.this.transactionTemplate != null

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
4848
import org.apache.kafka.clients.consumer.ConsumerRecord;
4949
import org.apache.kafka.clients.consumer.ConsumerRecords;
50+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
5051
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
5152
import org.apache.kafka.clients.producer.Producer;
5253
import org.apache.kafka.common.TopicPartition;
@@ -527,6 +528,62 @@ void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {
527528
verify(consumer, never()).commitSync(any(), any());
528529
}
529530

531+
@Test
532+
void testNoInitialCommitIfAlreadyCommitted() throws InterruptedException {
533+
testInitialCommitIBasedOnCommitted(true);
534+
}
535+
536+
@Test
537+
void testInitialCommitIfNotAlreadyCommitted() throws InterruptedException {
538+
testInitialCommitIBasedOnCommitted(false);
539+
}
540+
541+
@SuppressWarnings({ "rawtypes", "unchecked" })
542+
private void testInitialCommitIBasedOnCommitted(boolean committed) throws InterruptedException {
543+
Consumer consumer = mock(Consumer.class);
544+
ConsumerRecords records = new ConsumerRecords<>(Collections.emptyMap());
545+
CountDownLatch latch = new CountDownLatch(1);
546+
willAnswer(inv -> {
547+
latch.countDown();
548+
Thread.sleep(50);
549+
return records;
550+
}).given(consumer).poll(any());
551+
TopicPartition tp0 = new TopicPartition("foo", 0);
552+
List<TopicPartition> assignments = Arrays.asList(tp0);
553+
willAnswer(invocation -> {
554+
((ConsumerRebalanceListener) invocation.getArgument(1))
555+
.onPartitionsAssigned(assignments);
556+
return null;
557+
}).given(consumer).subscribe(any(Collection.class), any());
558+
if (committed) {
559+
given(consumer.committed(Collections.singleton(tp0)))
560+
.willReturn(Collections.singletonMap(tp0, new OffsetAndMetadata(0L)));
561+
}
562+
else {
563+
given(consumer.committed(Collections.singleton(tp0)))
564+
.willReturn(Collections.singletonMap(tp0, null));
565+
}
566+
ConsumerFactory cf = mock(ConsumerFactory.class);
567+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
568+
given(cf.getConfigurationProperties())
569+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
570+
ContainerProperties containerProperties = new ContainerProperties("foo");
571+
containerProperties.setGroupId("grp");
572+
containerProperties.setMessageListener((MessageListener) rec -> { });
573+
containerProperties.setMissingTopicsFatal(false);
574+
containerProperties.setAssignmentCommitOption(AssignmentCommitOption.LATEST_ONLY);
575+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
576+
containerProperties);
577+
container.start();
578+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
579+
if (committed) {
580+
verify(consumer, never()).commitSync(any(), any());
581+
}
582+
else {
583+
verify(consumer).commitSync(any(), any());
584+
}
585+
}
586+
530587
public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {
531588

532589
private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

0 commit comments

Comments (0)