Skip to content

Commit 5e45d04

Browse files
artembilangaryrussell
authored andcommitted
Avoid BeanDefinitionOverrideEx with EnableKafka
The current `KafkaBootstrapConfiguration` provides a couple beans which can be overridden in the target project, but at the same time the latest Spring Boot is going to fail with the `BeanDefinitionOverrideException` * Change `KafkaBootstrapConfiguration` to be as an `ImportBeanDefinitionRegistrar` instead of `@Configuration` * Check for bean definition existence before registering `KafkaListenerAnnotationBeanPostProcessor` and `KafkaListenerEndpointRegistry` in this `KafkaBootstrapConfiguration` * Deffer such a bean definition processing with the `KafkaListenerConfigurationSelector`
1 parent cc1e19b commit 5e45d04

File tree

4 files changed

+72
-16
lines changed

4 files changed

+72
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@
241241
*
242242
* @author Stephane Nicoll
243243
* @author Gary Russell
244+
* @author Artem Bilan
244245
*
245246
* @see KafkaListener
246247
* @see KafkaListenerAnnotationBeanPostProcessor
@@ -250,6 +251,6 @@
250251
@Target(ElementType.TYPE)
251252
@Retention(RetentionPolicy.RUNTIME)
252253
@Documented
253-
@Import(KafkaBootstrapConfiguration.class)
254+
@Import(KafkaListenerConfigurationSelector.class)
254255
public @interface EnableKafka {
255256
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaBootstrapConfiguration.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@
1616

1717
package org.springframework.kafka.annotation;
1818

19-
import org.springframework.beans.factory.config.BeanDefinition;
20-
import org.springframework.context.annotation.Bean;
21-
import org.springframework.context.annotation.Configuration;
22-
import org.springframework.context.annotation.Role;
19+
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
20+
import org.springframework.beans.factory.support.RootBeanDefinition;
21+
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
22+
import org.springframework.core.type.AnnotationMetadata;
2323
import org.springframework.kafka.config.KafkaListenerConfigUtils;
2424
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
2525

2626
/**
27-
* {@code @Configuration} class that registers a {@link KafkaListenerAnnotationBeanPostProcessor}
27+
* An {@link ImportBeanDefinitionRegistrar} class that registers a {@link KafkaListenerAnnotationBeanPostProcessor}
2828
* bean capable of processing Spring's @{@link KafkaListener} annotation. Also register
2929
* a default {@link KafkaListenerEndpointRegistry}.
3030
*
@@ -39,18 +39,21 @@
3939
* @see KafkaListenerEndpointRegistry
4040
* @see EnableKafka
4141
*/
42-
@Configuration(proxyBeanMethods = false)
43-
public class KafkaBootstrapConfiguration {
42+
public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
4443

45-
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
46-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
47-
public KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationProcessor() {
48-
return new KafkaListenerAnnotationBeanPostProcessor<>();
49-
}
44+
@Override
45+
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
46+
if (!registry.containsBeanDefinition(
47+
KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
48+
49+
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
50+
new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
51+
}
5052

51-
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
52-
public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry() {
53-
return new KafkaListenerEndpointRegistry();
53+
if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
54+
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
55+
new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
56+
}
5457
}
5558

5659
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import org.springframework.context.annotation.DeferredImportSelector;
20+
import org.springframework.core.annotation.Order;
21+
import org.springframework.core.type.AnnotationMetadata;
22+
23+
/**
24+
* A {@link DeferredImportSelector} implementation with the lowest order to import a
25+
* {@link KafkaBootstrapConfiguration} as late as possible.
26+
*
27+
* @author Artem Bilan
28+
*
29+
* @since 2.3
30+
*/
31+
@Order
32+
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {
33+
34+
@Override
35+
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
36+
return new String[] { KafkaBootstrapConfiguration.class.getName() };
37+
}
38+
39+
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import org.springframework.context.annotation.Configuration;
3535
import org.springframework.core.annotation.AliasFor;
3636
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
37+
import org.springframework.kafka.config.KafkaListenerConfigUtils;
3738
import org.springframework.kafka.config.KafkaListenerContainerFactory;
39+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
3840
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3941
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4042
import org.springframework.kafka.core.KafkaTemplate;
@@ -46,6 +48,8 @@
4648

4749
/**
4850
* @author Gary Russell
51+
* @author Artem Bilan
52+
*
4953
* @since 2.2
5054
*
5155
*/
@@ -59,10 +63,14 @@ public class AliasPropertiesTests {
5963
@Autowired
6064
private Config config;
6165

66+
@Autowired
67+
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
68+
6269
@Test
6370
public void testAliasFor() throws Exception {
6471
this.template.send("alias.tests", "foo");
6572
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue();
73+
assertThat(this.config.kafkaListenerEndpointRegistry()).isSameAs(this.kafkaListenerEndpointRegistry);
6674
}
6775

6876
@Configuration
@@ -71,6 +79,11 @@ public static class Config {
7179

7280
private final CountDownLatch latch = new CountDownLatch(1);
7381

82+
@Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
83+
public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
84+
return new KafkaListenerEndpointRegistry();
85+
}
86+
7487
@Bean
7588
public EmbeddedKafkaBroker embeddedKafka() {
7689
return new EmbeddedKafkaBroker(1, true, "alias.tests");

0 commit comments

Comments
 (0)