Skip to content

Commit ef6c115

Browse files
garyrussellartembilan
authored andcommitted
GH-1646 Clear lastCommits after fixTxOffset
Resolves #1646 ``` Failed to correct transactional offset(s) java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer. ``` It is not clear how this happened since we remove revoked partitions from `lastCommits`. However, `lastCommits` should be cleared, even if successful, to avoid unnecessary processing if no records are received by the next poll. - add the `lastCommits` to the error log - clear `lastCommits` in a finally block **cherry-pick to 2.5.x**
1 parent 3a86d8d commit ef6c115

File tree

2 files changed

+6
-1
lines changed

2 files changed

+6
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1237,7 +1237,11 @@ private void fixTxOffsetsIfNeeded() {
12371237
}
12381238
}
12391239
catch (Exception e) {
1240-
this.logger.error(e, "Failed to correct transactional offset(s)");
1240+
this.logger.error(e, () -> "Failed to correct transactional offset(s): "
1241+
+ ListenerConsumer.this.lastCommits);
1242+
}
1243+
finally {
1244+
ListenerConsumer.this.lastCommits.clear();
12411245
}
12421246
}
12431247
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,7 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti
710710
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
711711
TopicPartition partition0 = new TopicPartition(topic, 0);
712712
assertThat(committed.get().get(partition0).offset()).isEqualTo(2L);
713+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.lastCommits", Map.class)).isEmpty();
713714
container.stop();
714715
pf.destroy();
715716
}

0 commit comments

Comments
 (0)