Skip to content

Commit b51a778

Browse files
garyrussellartembilan
authored andcommitted
Fix fatal early return from onPartitionsAssigned
Exit from the method when we encounter a fatal error. (Early exit was lost during refactoring).
1 parent 1810409 commit b51a778

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2354,7 +2354,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
23542354
}
23552355
ListenerConsumer.this.assignedPartitions.addAll(partitions);
23562356
if (ListenerConsumer.this.commitCurrentOnAssignment) {
2357-
collectAndCommitIfNecessary(partitions);
2357+
if (!collectAndCommitIfNecessary(partitions)) {
2358+
return;
2359+
}
23582360
}
23592361
if (ListenerConsumer.this.genericListener instanceof ConsumerSeekAware) {
23602362
seekPartitions(partitions, false);
@@ -2367,7 +2369,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
23672369
}
23682370
}
23692371

2370-
private void collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
2372+
private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
23712373
// Commit initial positions - this is generally redundant but
23722374
// it protects us from the case when another consumer starts
23732375
// and rebalance would cause it to reset at the end
@@ -2385,12 +2387,13 @@ private void collectAndCommitIfNecessary(Collection<TopicPartition> partitions)
23852387
catch (NoOffsetForPartitionException e) {
23862388
ListenerConsumer.this.fatalError = true;
23872389
ListenerConsumer.this.logger.error(e, "No offset and no reset policy");
2388-
return;
2390+
return false;
23892391
}
23902392
}
23912393
if (offsetsToCommit.size() > 0) {
23922394
commitCurrentOffsets(offsetsToCommit);
23932395
}
2396+
return true;
23942397
}
23952398

23962399
private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {

0 commit comments

Comments
 (0)