Skip to content

Commit a110259

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-2069: Same factory for retryable and normal endpoints (#2108)
* GH-2069: Same factory for retryable and normal endpoints Resolves #2069 Originally the retryable topic's ListenerContainerFactory configuration would interfere with non-retryable topics. Now we do not set the feature's factory configurations if the container is not retryable. * Add and update javadoc as requested in code review
1 parent 2cb2adf commit a110259

File tree

7 files changed

+651
-19
lines changed

7 files changed

+651
-19
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,8 +668,6 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> templ
668668

669669
By default the RetryTopic configuration will use the provided factory from the `@KafkaListener` annotation, but you can specify a different one to be used to create the retry topic and dlt listener containers.
670670

671-
IMPORTANT: The provided factory will be configured for the retry topic functionality, so you should not use the same factory for both retrying and non-retrying topics. You can however share the same factory between many retry topic configurations.
672-
673671
For the `@RetryableTopic` annotation you can provide the factory's bean name, and using the `RetryTopicConfiguration` bean you can either provide the bean name or the instance itself.
674672

675673
====
@@ -703,6 +701,27 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
703701
----
704702
====
705703

704+
705+
IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics.
706+
707+
If you need to revert the factory configuration behavior to prior 2.8.3, you can replace the standard `RetryTopicConfigurer` bean and set `useLegacyFactoryConfigurer` to `true`, such as:
708+
709+
====
710+
[source, java]
711+
----
712+
713+
@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
714+
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
715+
ListenerContainerFactoryResolver containerFactoryResolver,
716+
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
717+
BeanFactory beanFactory,
718+
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
719+
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
720+
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
721+
return retryTopicConfigurer;
722+
}
723+
----
724+
706725
[[change-kboe-logging-level]]
707726
==== Changing KafkaBackOffException Logging Level
708727

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
import java.util.List;
2424
import java.util.Set;
2525
import java.util.function.Consumer;
26+
import java.util.regex.Pattern;
2627

2728
import org.apache.commons.logging.LogFactory;
2829

2930
import org.springframework.beans.factory.annotation.Qualifier;
3031
import org.springframework.core.log.LogAccessor;
3132
import org.springframework.kafka.KafkaException;
3233
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
34+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
35+
import org.springframework.kafka.config.KafkaListenerEndpoint;
3336
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
3437
import org.springframework.kafka.listener.CommonErrorHandler;
3538
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@@ -38,18 +41,23 @@
3841
import org.springframework.kafka.listener.DefaultErrorHandler;
3942
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
4043
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
44+
import org.springframework.kafka.support.TopicPartitionOffset;
4145
import org.springframework.util.Assert;
4246
import org.springframework.util.backoff.FixedBackOff;
4347

4448
/**
4549
*
46-
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory} with a
47-
* {@link DefaultErrorHandler}, the {@link DeadLetterPublishingRecoverer} created by
48-
* the {@link DeadLetterPublishingRecovererFactory}.
50+
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory} to add a
51+
* {@link DefaultErrorHandler} and the {@link DeadLetterPublishingRecoverer}
52+
* created by the {@link DeadLetterPublishingRecovererFactory}.
4953
*
50-
* Mind that the same factory can be used by many different
51-
* {@link org.springframework.kafka.annotation.RetryableTopic}s but should not be shared
52-
* with non retryable topics as some of their configurations will be overriden.
54+
* Also sets {@link ContainerProperties#setIdlePartitionEventInterval(Long)}
55+
* and {@link ContainerProperties#setPollTimeout(long)} if its defaults haven't
56+
* been overridden by the user.
57+
*
58+
* Since 2.8.3 these configurations don't interfere with the provided factory
59+
* instance itself, so the same factory instance can be shared among retryable and
60+
* non-retryable endpoints.
5361
*
5462
* @author Tomaz Fernandes
5563
* @since 2.7
@@ -95,20 +103,62 @@ public class ListenerContainerFactoryConfigurer {
95103
this.clock = clock;
96104
}
97105

106+
/**
107+
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory}.
108+
* @param containerFactory the factory instance to be configured.
109+
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
110+
* @return the configured factory instance.
111+
* @deprecated in favor of
112+
* {@link #decorateFactory(ConcurrentKafkaListenerContainerFactory, Configuration)}.
113+
*/
114+
@Deprecated
98115
public ConcurrentKafkaListenerContainerFactory<?, ?> configure(
99116
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
100117
return isCached(containerFactory)
101118
? containerFactory
102119
: addToCache(doConfigure(containerFactory, configuration.backOffValues));
103120
}
104121

122+
/**
123+
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory}.
124+
* Meant to be used for the main endpoint, this method ignores the provided backOff values.
125+
* @param containerFactory the factory instance to be configured.
126+
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
127+
* @return the configured factory instance.
128+
* @deprecated in favor of
129+
* {@link #decorateFactoryWithoutBackOffValues(ConcurrentKafkaListenerContainerFactory, Configuration)}.
130+
*/
131+
@Deprecated
105132
public ConcurrentKafkaListenerContainerFactory<?, ?> configureWithoutBackOffValues(
106133
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
107134
return isCached(containerFactory)
108135
? containerFactory
109136
: doConfigure(containerFactory, Collections.emptyList());
110137
}
111138

139+
/**
140+
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory}.
141+
* @param factory the factory instance to be decorated.
142+
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
143+
* @return the decorated factory instance.
144+
*/
145+
public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
146+
Configuration configuration) {
147+
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration.backOffValues);
148+
}
149+
150+
/**
151+
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory}.
152+
* Meant to be used for the main endpoint, this method ignores the provided backOff values.
153+
* @param factory the factory instance to be decorated.
154+
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
155+
* @return the decorated factory instance.
156+
*/
157+
public KafkaListenerContainerFactory<?> decorateFactoryWithoutBackOffValues(
158+
ConcurrentKafkaListenerContainerFactory<?, ?> factory, Configuration configuration) {
159+
return new RetryTopicListenerContainerFactoryDecorator(factory, Collections.emptyList());
160+
}
161+
112162
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
113163
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, List<Long> backOffValues) {
114164

@@ -217,6 +267,45 @@ private <T> T checkAndCast(Object obj, Class<T> clazz) {
217267
return (T) obj;
218268
}
219269

270+
private class RetryTopicListenerContainerFactoryDecorator implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<?, ?>> {
271+
272+
private final ConcurrentKafkaListenerContainerFactory<?, ?> delegate;
273+
private final List<Long> backOffValues;
274+
275+
RetryTopicListenerContainerFactoryDecorator(ConcurrentKafkaListenerContainerFactory<?, ?> delegate, List<Long> backOffValues) {
276+
this.delegate = delegate;
277+
this.backOffValues = backOffValues;
278+
}
279+
280+
@Override
281+
public ConcurrentMessageListenerContainer<?, ?> createListenerContainer(KafkaListenerEndpoint endpoint) {
282+
return decorate(this.delegate.createListenerContainer(endpoint));
283+
}
284+
285+
private ConcurrentMessageListenerContainer<?, ?> decorate(ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
286+
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.backOffValues);
287+
listenerContainer
288+
.setCommonErrorHandler(createErrorHandler(
289+
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create()));
290+
return listenerContainer;
291+
}
292+
293+
@Override
294+
public ConcurrentMessageListenerContainer<?, ?> createContainer(TopicPartitionOffset... topicPartitions) {
295+
return decorate(this.delegate.createContainer(topicPartitions));
296+
}
297+
298+
@Override
299+
public ConcurrentMessageListenerContainer<?, ?> createContainer(String... topics) {
300+
return decorate(this.delegate.createContainer(topics));
301+
}
302+
303+
@Override
304+
public ConcurrentMessageListenerContainer<?, ?> createContainer(Pattern topicPattern) {
305+
return decorate(this.delegate.createContainer(topicPattern));
306+
}
307+
}
308+
220309
static class Configuration {
221310

222311
private final List<Long> backOffValues;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ public class RetryTopicConfigurer {
220220

221221
private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
222222

223+
private boolean useLegacyFactoryConfigurer = false;
224+
223225
@Deprecated
224226
public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
225227
ListenerContainerFactoryResolver containerFactoryResolver,
@@ -229,6 +231,14 @@ public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
229231
this(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, new SuffixingRetryTopicNamesProviderFactory());
230232
}
231233

234+
/**
235+
* Create an instance with the provided properties.
236+
* @param destinationTopicProcessor the destination topic processor.
237+
* @param containerFactoryResolver the container factory resolver.
238+
* @param listenerContainerFactoryConfigurer the container factory configurer.
239+
* @param beanFactory the bean factory.
240+
* @param retryTopicNamesProviderFactory the retry topic names factory.
241+
*/
232242
@Autowired
233243
public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
234244
ListenerContainerFactoryResolver containerFactoryResolver,
@@ -298,7 +308,7 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
298308
RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context,
299309
DestinationTopic.Properties destinationTopicProperties) {
300310

301-
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
311+
KafkaListenerContainerFactory<?> resolvedFactory =
302312
destinationTopicProperties.isMainEndpoint()
303313
? resolveAndConfigureFactoryForMainEndpoint(factory, defaultFactoryBeanName, configuration)
304314
: resolveAndConfigureFactoryForRetryEndpoint(factory, defaultFactoryBeanName, configuration);
@@ -361,25 +371,32 @@ private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandl
361371
return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
362372
}
363373

364-
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForMainEndpoint(
374+
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(
365375
KafkaListenerContainerFactory<?> providedFactory,
366376
String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
367377
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver
368378
.resolveFactoryForMainEndpoint(providedFactory, defaultFactoryBeanName,
369379
configuration.forContainerFactoryResolver());
370-
return this.listenerContainerFactoryConfigurer
371-
.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());
380+
381+
return this.useLegacyFactoryConfigurer
382+
? this.listenerContainerFactoryConfigurer
383+
.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer())
384+
: this.listenerContainerFactoryConfigurer
385+
.decorateFactoryWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());
372386
}
373387

374-
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForRetryEndpoint(
388+
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpoint(
375389
KafkaListenerContainerFactory<?> providedFactory,
376390
String defaultFactoryBeanName,
377391
RetryTopicConfiguration configuration) {
378392
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
379393
this.containerFactoryResolver.resolveFactoryForRetryEndpoint(providedFactory, defaultFactoryBeanName,
380394
configuration.forContainerFactoryResolver());
381-
return this.listenerContainerFactoryConfigurer
382-
.configure(resolvedFactory, configuration.forContainerFactoryConfigurer());
395+
return this.useLegacyFactoryConfigurer
396+
? this.listenerContainerFactoryConfigurer.configure(resolvedFactory,
397+
configuration.forContainerFactoryConfigurer())
398+
: this.listenerContainerFactoryConfigurer
399+
.decorateFactory(resolvedFactory, configuration.forContainerFactoryConfigurer());
383400
}
384401

385402
private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
@@ -396,6 +413,17 @@ public static EndpointHandlerMethod createHandlerMethodWith(Object bean, Method
396413
return new EndpointHandlerMethod(bean, method);
397414
}
398415

416+
/**
417+
* Set to true if you want the {@link ListenerContainerFactoryConfigurer} to
418+
* behave as before 2.8.3.
419+
* @param useLegacyFactoryConfigurer Whether to use the legacy factory configuration.
420+
* @deprecated for removal after the deprecated legacy configuration methods are removed.
421+
*/
422+
@Deprecated
423+
public void useLegacyFactoryConfigurer(boolean useLegacyFactoryConfigurer) {
424+
this.useLegacyFactoryConfigurer = useLegacyFactoryConfigurer;
425+
}
426+
399427
public interface EndpointProcessor extends Consumer<MethodKafkaListenerEndpoint<?, ?>> {
400428

401429
default void process(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import static org.mockito.ArgumentMatchers.eq;
2323
import static org.mockito.BDDMockito.given;
2424
import static org.mockito.BDDMockito.then;
25+
import static org.mockito.BDDMockito.willReturn;
2526
import static org.mockito.Mockito.never;
2627
import static org.mockito.Mockito.times;
2728

@@ -46,6 +47,8 @@
4647

4748
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
4849
import org.springframework.kafka.config.ContainerCustomizer;
50+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
51+
import org.springframework.kafka.config.KafkaListenerEndpoint;
4952
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
5053
import org.springframework.kafka.listener.CommonErrorHandler;
5154
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@@ -132,6 +135,9 @@ class ListenerContainerFactoryConfigurerTests {
132135
@Mock
133136
private RetryTopicConfiguration configuration;
134137

138+
@Mock
139+
private KafkaListenerEndpoint endpoint;
140+
135141
private final long backOffValue = 2000L;
136142

137143
private final ListenerContainerFactoryConfigurer.Configuration lcfcConfiguration =
@@ -360,6 +366,44 @@ void shouldSetupMessageListenerAdapter() {
360366
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
361367
}
362368

369+
@Test
370+
void shouldDecorateFactory() {
371+
372+
// given
373+
given(container.getContainerProperties()).willReturn(containerProperties);
374+
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
375+
given(containerProperties.getMessageListener()).willReturn(listener);
376+
RecordHeaders headers = new RecordHeaders();
377+
headers.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP, originalTimestampBytes);
378+
given(data.headers()).willReturn(headers);
379+
String testListenerId = "testListenerId";
380+
given(container.getListenerId()).willReturn(testListenerId);
381+
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
382+
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
383+
384+
// when
385+
ListenerContainerFactoryConfigurer configurer =
386+
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
387+
deadLetterPublishingRecovererFactory, clock);
388+
configurer.setContainerCustomizer(configurerContainerCustomizer);
389+
KafkaListenerContainerFactory<?> factory = configurer
390+
.decorateFactory(containerFactory, configuration.forContainerFactoryConfigurer());
391+
factory.createListenerContainer(endpoint);
392+
393+
// then
394+
then(container).should(times(1)).setupMessageListener(listenerAdapterCaptor.capture());
395+
KafkaBackoffAwareMessageListenerAdapter<?, ?> listenerAdapter =
396+
(KafkaBackoffAwareMessageListenerAdapter<?, ?>) listenerAdapterCaptor.getValue();
397+
listenerAdapter.onMessage(data, ack, consumer);
398+
399+
then(this.kafkaConsumerBackoffManager).should(times(1))
400+
.createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
401+
assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
402+
then(listener).should(times(1)).onMessage(data, ack, consumer);
403+
404+
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
405+
}
406+
363407
@Test
364408
void shouldCacheFactoryInstances() {
365409

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -224,9 +224,9 @@ void shouldConfigureRetryEndpoints() {
224224

225225
willReturn(containerFactory).given(containerFactoryResolver).resolveFactoryForRetryEndpoint(containerFactory,
226226
defaultFactoryBeanName, factoryResolverConfig);
227-
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configure(containerFactory,
227+
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactory(containerFactory,
228228
lcfcConfiguration);
229-
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configureWithoutBackOffValues(containerFactory,
229+
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactoryWithoutBackOffValues(containerFactory,
230230
lcfcConfiguration);
231231

232232
RetryTopicConfigurer configurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver,

0 commit comments

Comments
 (0)