Skip to content

Commit d2bfca9

Browse files
committed
Fix idleBetweenPolls max value calculation
- Wrong default used when `max.poll.interval.ms` is not specified - 30s Vs 300s - Value set in `@KafkaListener.properties` ignored for this calculatio and default used instead - Also use actual default for `ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG` **I will back port - expecting conflicts** * Remove unused constant
1 parent 313852a commit d2bfca9

File tree

2 files changed

+38
-13
lines changed

2 files changed

+38
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
139139
private static final boolean MICROMETER_PRESENT = ClassUtils.isPresent(
140140
"io.micrometer.core.instrument.MeterRegistry", KafkaMessageListenerContainer.class.getClassLoader());
141141

142+
private static final Map<String, Object> CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues();
143+
142144
private final AbstractMessageListenerContainer<K, V> thisOrParentContainer;
143145

144146
private final TopicPartitionOffset[] topicPartitions;
@@ -466,8 +468,6 @@ public String toString() {
466468

467469
private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
468470

469-
private static final int SIXTY = 60;
470-
471471
private static final String UNCHECKED = "unchecked";
472472

473473
private static final String RAWTYPES = "rawtypes";
@@ -613,7 +613,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
613613

614614
@SuppressWarnings(UNCHECKED)
615615
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
616-
Properties consumerProperties = new Properties(this.containerProperties.getKafkaConsumerProperties());
616+
Properties consumerProperties = propertiesFromProperties();
617617
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
618618
this.autoCommit = determineAutoCommit(consumerProperties);
619619
this.consumer =
@@ -694,6 +694,20 @@ else if (listener instanceof MessageListener) {
694694
this.micrometerHolder = obtainMicrometerHolder();
695695
}
696696

697+
private Properties propertiesFromProperties() {
698+
Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties();
699+
Properties props = new Properties();
700+
props.putAll(propertyOverrides);
701+
Set<String> stringPropertyNames = propertyOverrides.stringPropertyNames();
702+
// User might have provided properties as defaults
703+
stringPropertyNames.forEach((name) -> {
704+
if (!props.contains(name)) {
705+
props.setProperty(name, propertyOverrides.getProperty(name));
706+
}
707+
});
708+
return props;
709+
}
710+
697711
private void checkGroupInstance(Properties properties, ConsumerFactory<K, V> consumerFactory) {
698712
String groupInstance = properties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
699713
if (!StringUtils.hasText(groupInstance)) {
@@ -753,9 +767,9 @@ else if (timeout instanceof String) {
753767
this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName()
754768
+ " in property '"
755769
+ ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
756-
+ "'; defaulting to 30 seconds.");
770+
+ "'; using Kafka default.");
757771
}
758-
return Duration.ofSeconds(SIXTY / 2).toMillis(); // Default 'max.poll.interval.ms' is 30 seconds
772+
return (int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
759773
}
760774
}
761775

@@ -822,12 +836,12 @@ else if (timeout instanceof String) {
822836
this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName()
823837
+ " in property '"
824838
+ ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG
825-
+ "'; defaulting to 60 seconds for sync commit timeouts");
839+
+ "'; defaulting to Kafka default for sync commit timeouts");
826840
}
827-
return Duration.ofSeconds(SIXTY);
841+
return Duration
842+
.ofMillis((int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
828843
}
829844
}
830-
831845
}
832846

833847
private Object findDeserializerClass(Map<String, Object> props, boolean isValue) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.springframework.kafka.listener.ContainerProperties;
9191
import org.springframework.kafka.listener.ContainerProperties.AckMode;
9292
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
93+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
9394
import org.springframework.kafka.listener.ListenerExecutionFailedException;
9495
import org.springframework.kafka.listener.MessageListenerContainer;
9596
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
@@ -223,6 +224,10 @@ public void testAnonymous() {
223224
List<?> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class);
224225
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId"))
225226
.isEqualTo(DEFAULT_TEST_GROUP_ID);
227+
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.maxPollInterval"))
228+
.isEqualTo(300000L);
229+
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.syncCommitTimeout"))
230+
.isEqualTo(Duration.ofSeconds(60));
226231
container.stop();
227232
}
228233

@@ -349,20 +354,24 @@ public void testAutoStartup() {
349354
assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout()).isNull();
350355
this.registry.start();
351356
assertThat(listenerContainer.isRunning()).isTrue();
352-
assertThat(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer)
357+
KafkaMessageListenerContainer<?, ?> kafkaMessageListenerContainer =
358+
((ConcurrentMessageListenerContainer<?, ?>) listenerContainer)
353359
.getContainers()
354-
.get(0)
360+
.get(0);
361+
assertThat(kafkaMessageListenerContainer
355362
.getContainerProperties().getSyncCommitTimeout())
356-
.isEqualTo(Duration.ofSeconds(60));
363+
.isEqualTo(Duration.ofSeconds(59));
357364
assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout())
358-
.isEqualTo(Duration.ofSeconds(60));
365+
.isEqualTo(Duration.ofSeconds(59));
359366
listenerContainer.stop();
360367
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.syncCommits", Boolean.class))
361368
.isFalse();
362369
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.commitCallback"))
363370
.isNotNull();
364371
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.consumerRebalanceListener"))
365372
.isNotNull();
373+
assertThat(KafkaTestUtils.getPropertyValue(kafkaMessageListenerContainer, "listenerConsumer.maxPollInterval"))
374+
.isEqualTo(301000L);
366375
}
367376

368377
@Test
@@ -1582,7 +1591,9 @@ static class Listener implements ConsumerSeekAware {
15821591
volatile String name;
15831592

15841593
@KafkaListener(id = "manualStart", topics = "manualStart",
1585-
containerFactory = "kafkaAutoStartFalseListenerContainerFactory")
1594+
containerFactory = "kafkaAutoStartFalseListenerContainerFactory",
1595+
properties = { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000",
1596+
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + ":59000" })
15861597
public void manualStart(String foo) {
15871598
}
15881599

0 commit comments

Comments
 (0)