Skip to content

Commit 03a5e42

Browse files
authored
GH-2038: Fix DelegatingByTopicSerialization Config
Resolves #2038 - Don't add duplicate delegates - Configure pre-loaded delegates * Fix duplicate check.
1 parent 4f44ad4 commit 03a5e42

File tree

2 files changed

+117
-3
lines changed

2 files changed

+117
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323
import java.util.Map.Entry;
24+
import java.util.Set;
2425
import java.util.concurrent.ConcurrentHashMap;
2526
import java.util.regex.Pattern;
2627

@@ -74,6 +75,8 @@ public abstract class DelegatingByTopicSerialization<T extends Closeable> implem
7475

7576
private final Map<Pattern, T> delegates = new ConcurrentHashMap<>();
7677

78+
private final Set<String> patterns = ConcurrentHashMap.newKeySet();
79+
7780
private T defaultDelegate;
7881

7982
private boolean forKeys;
@@ -84,7 +87,11 @@ public DelegatingByTopicSerialization() {
8487
}
8588

8689
public DelegatingByTopicSerialization(Map<Pattern, T> delegates, T defaultDelegate) {
90+
Assert.notNull(delegates, "'delegates' cannot be null");
91+
Assert.notNull(defaultDelegate, "'defaultDelegate' cannot be null");
8792
this.delegates.putAll(delegates);
93+
delegates.keySet().forEach(pattern -> Assert.isTrue(this.patterns.add(pattern.pattern()),
94+
"Duplicate pattern: " + pattern.pattern()));
8895
this.defaultDelegate = defaultDelegate;
8996
}
9097

@@ -98,6 +105,9 @@ public void setCaseSensitive(boolean caseSensitive) {
98105

99106
@SuppressWarnings(UNCHECKED)
100107
protected void configure(Map<String, ?> configs, boolean isKey) {
108+
if (this.delegates.size() > 0) {
109+
this.delegates.values().forEach(delegate -> configureDelegate(configs, isKey, delegate));
110+
}
101111
this.forKeys = isKey;
102112
Object insensitive = configs.get(CASE_SENSITIVE);
103113
if (insensitive instanceof String) {
@@ -107,7 +117,7 @@ else if (insensitive instanceof Boolean) {
107117
this.cased = (Boolean) insensitive;
108118
}
109119
String configKey = defaultKey();
110-
if (configKey != null) {
120+
if (configKey != null && configs.containsKey(configKey)) {
111121
buildDefault(configs, configKey, isKey, configs.get(configKey));
112122
}
113123
configKey = configKey();
@@ -146,6 +156,10 @@ private void processMap(Map<String, ?> configs, boolean isKey, String configKey,
146156
protected void build(Map<String, ?> configs, boolean isKey, String configKey, Object delegate, Pattern pattern) {
147157

148158
if (isInstance(delegate)) {
159+
if (pattern != null && !this.patterns.add(pattern.pattern())) {
160+
LOGGER.debug(() -> "Delegate already configured for " + pattern.pattern());
161+
return;
162+
}
149163
this.delegates.put(pattern, (T) delegate);
150164
configureDelegate(configs, isKey, (T) delegate);
151165
}
@@ -208,8 +222,9 @@ private Map<Pattern, T> createDelegates(String mappings, Map<String, ?> configs,
208222
return delegateMap;
209223
}
210224

225+
@Nullable
211226
private T createInstanceAndConfigure(Map<String, ?> configs, boolean isKey,
212-
Map<Pattern, T> delegates2, Pattern pattern, String className) {
227+
Map<Pattern, T> delegates2, @Nullable Pattern pattern, String className) {
213228

214229
try {
215230
Class<?> clazz = ClassUtils.forName(className.trim(), ClassUtils.getDefaultClassLoader());
@@ -240,6 +255,10 @@ else if (key instanceof String) {
240255
protected T instantiateAndConfigure(Map<String, ?> configs, boolean isKey, Map<Pattern, T> delegates2,
241256
@Nullable Pattern pattern, Class<?> clazz) {
242257

258+
if (pattern != null && !this.patterns.add(pattern.pattern())) {
259+
LOGGER.debug(() -> "Delegate already configured for " + pattern.pattern());
260+
return null;
261+
}
243262
try {
244263
@SuppressWarnings(UNCHECKED)
245264
T delegate = (T) clazz.getDeclaredConstructor().newInstance();
@@ -282,7 +301,7 @@ protected T findDelegate(String topic) {
282301
}
283302
if (delegate == null) {
284303
throw new IllegalStateException(
285-
"No serializer found for topic '" + topic + "'");
304+
"No (de)serializer found for topic '" + topic + "'");
286305
}
287306
return delegate;
288307
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
21+
22+
import java.util.Map;
23+
import java.util.regex.Pattern;
24+
25+
import org.apache.kafka.common.header.Headers;
26+
import org.apache.kafka.common.serialization.Deserializer;
27+
import org.apache.kafka.common.serialization.StringDeserializer;
28+
import org.junit.jupiter.api.Test;
29+
30+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
31+
import org.springframework.kafka.listener.ContainerProperties;
32+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
33+
import org.springframework.kafka.listener.MessageListener;
34+
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
35+
import org.springframework.kafka.test.context.EmbeddedKafka;
36+
import org.springframework.kafka.test.utils.KafkaTestUtils;
37+
38+
/**
39+
* @author Gary Russell
40+
* @since 2.8.1
41+
*
42+
*/
43+
@EmbeddedKafka(topics = SerializationIntegrationTests.DBTD_TOPIC)
44+
public class SerializationIntegrationTests {
45+
46+
public static final String DBTD_TOPIC = "dbtd";
47+
48+
@Test
49+
void configurePreLoadedDelegates() {
50+
Map<String, Object> consumerProps =
51+
KafkaTestUtils.consumerProps(DBTD_TOPIC, "false", EmbeddedKafkaCondition.getBroker());
52+
consumerProps.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG, DBTD_TOPIC + ":"
53+
+ TestDeserializer.class.getName());
54+
TestDeserializer testDeser = new TestDeserializer();
55+
DelegatingByTopicDeserializer delegating = new DelegatingByTopicDeserializer(
56+
Map.of(Pattern.compile(DBTD_TOPIC), testDeser), new StringDeserializer());
57+
DefaultKafkaConsumerFactory<String, Object> cFact =
58+
new DefaultKafkaConsumerFactory<String, Object>(consumerProps,
59+
new StringDeserializer(), delegating);
60+
ContainerProperties props = new ContainerProperties(DBTD_TOPIC);
61+
props.setMessageListener(mock(MessageListener.class));
62+
KafkaMessageListenerContainer<String, Object> container = new KafkaMessageListenerContainer<>(cFact, props);
63+
container.start();
64+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer.valueDeserializer"))
65+
.isSameAs(delegating);
66+
Map<?, ?> delegates = KafkaTestUtils.getPropertyValue(delegating, "delegates", Map.class);
67+
assertThat(delegates).hasSize(1);
68+
assertThat(delegates.values().iterator().next()).isSameAs(testDeser);
69+
assertThat(testDeser.configured).isTrue();
70+
container.stop();
71+
}
72+
73+
static class TestDeserializer implements Deserializer<Object> {
74+
75+
boolean configured;
76+
77+
@Override
78+
public void configure(Map<String, ?> configs, boolean isKey) {
79+
this.configured = true;
80+
}
81+
82+
@Override
83+
public Object deserialize(String topic, byte[] data) {
84+
return null;
85+
}
86+
87+
@Override
88+
public Object deserialize(String topic, Headers headers, byte[] data) {
89+
return Deserializer.super.deserialize(topic, headers, data);
90+
}
91+
92+
}
93+
94+
}
95+

0 commit comments

Comments
 (0)