Skip to content

Commit 5b2fd49

Browse files
committed
Fix Map Iteration
1 parent f1f687b commit 5b2fd49

File tree

2 files changed

+8
-9
lines changed

2 files changed

+8
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2003,15 +2003,12 @@ private void processTimestampSeeks() {
20032003
}
20042004
}
20052005
if (timestampSeeks != null) {
2006-
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer
2007-
.offsetsForTimes(timestampSeeks);
2008-
2009-
for (TopicPartition tp : offsetsForTimes.keySet()) {
2010-
OffsetAndTimestamp ot = offsetsForTimes.get(tp);
2006+
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(timestampSeeks);
2007+
offsetsForTimes.forEach((tp, ot) -> {
20112008
if (ot != null) {
20122009
this.consumer.seek(tp, ot.offset());
20132010
}
2014-
}
2011+
});
20152012
}
20162013
}
20172014

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,10 +372,12 @@ void testAsyncTimestampSeeks() throws InterruptedException {
372372
.collect(Collectors.toMap(tp -> tp, tp -> 200L)));
373373
given(consumer.offsetsForTimes(Collections.singletonMap(tp0, 42L)))
374374
.willReturn(Collections.singletonMap(tp0, new OffsetAndTimestamp(73L, 42L)));
375+
Map<TopicPartition, OffsetAndTimestamp> map = new HashMap<>(assignments.stream()
376+
.collect(Collectors.toMap(tp -> tp,
377+
tp -> new OffsetAndTimestamp(tp.equals(tp0) ? 73L : 92L, 43L))));
378+
map.put(new TopicPartition("foo", 5), null);
375379
given(consumer.offsetsForTimes(any()))
376-
.willReturn(assignments.stream()
377-
.collect(Collectors.toMap(tp -> tp,
378-
tp -> new OffsetAndTimestamp(tp.equals(tp0) ? 73L : 92L, 43L))));
380+
.willReturn(map);
379381
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
380382
.willReturn(consumer);
381383
ContainerProperties containerProperties = new ContainerProperties("foo");

0 commit comments

Comments
 (0)