Skip to content

Commit ec98428

Browse files
committed
Fix Map Iteration
1 parent 0287046 commit ec98428

File tree

2 files changed

+8
-9
lines changed

2 files changed

+8
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,15 +2088,12 @@ private void processTimestampSeeks() {
20882088
}
20892089
}
20902090
if (timestampSeeks != null) {
2091-
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
2092-
.offsetsForTimes(timestampSeeks);
2093-
2094-
for (TopicPartition tp : offsetsForTimes.keySet()) {
2095-
OffsetAndTimestamp ot = offsetsForTimes.get(tp);
2091+
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(timestampSeeks);
2092+
offsetsForTimes.forEach((tp, ot) -> {
20962093
if (ot != null) {
20972094
this.consumer.seek(tp, ot.offset());
20982095
}
2099-
}
2096+
});
21002097
}
21012098
}
21022099

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,10 +372,12 @@ void testAsyncTimestampSeeks() throws InterruptedException {
372372
.collect(Collectors.toMap(tp -> tp, tp -> 200L)));
373373
given(consumer.offsetsForTimes(Collections.singletonMap(tp0, 42L)))
374374
.willReturn(Collections.singletonMap(tp0, new OffsetAndTimestamp(73L, 42L)));
375+
Map<TopicPartition, OffsetAndTimestamp> map = new HashMap<>(assignments.stream()
376+
.collect(Collectors.toMap(tp -> tp,
377+
tp -> new OffsetAndTimestamp(tp.equals(tp0) ? 73L : 92L, 43L))));
378+
map.put(new TopicPartition("foo", 5), null);
375379
given(consumer.offsetsForTimes(any()))
376-
.willReturn(assignments.stream()
377-
.collect(Collectors.toMap(tp -> tp,
378-
tp -> new OffsetAndTimestamp(tp.equals(tp0) ? 73L : 92L, 43L))));
380+
.willReturn(map);
379381
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
380382
.willReturn(consumer);
381383
ContainerProperties containerProperties = new ContainerProperties("foo");

0 commit comments

Comments
 (0)