Skip to content

Commit 3f919bf

Browse files
committed
custom consumer properties
1 parent 7ed5e99 commit 3f919bf

File tree

8 files changed

+175
-25
lines changed

8 files changed

+175
-25
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.jupitertools.springtestkafka.expected;
2+
3+
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
import org.apache.kafka.clients.consumer.ConsumerConfig;
8+
import org.apache.kafka.common.serialization.StringDeserializer;
9+
10+
public class DefaultKafkaConsumerConfig implements TestConsumerConfig {
11+
12+
@Override
13+
public Map<String, Object> getProperties() {
14+
Map<String, Object> props = new HashMap<>();
15+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-name");
16+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
17+
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
18+
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
19+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
20+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
21+
return props;
22+
}
23+
}
Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,23 @@
11
package com.jupitertools.springtestkafka.expected;
22

3-
import java.util.HashMap;
43
import java.util.Map;
54

65
import org.apache.kafka.clients.consumer.ConsumerConfig;
7-
import org.apache.kafka.common.serialization.IntegerDeserializer;
8-
import org.apache.kafka.common.serialization.StringDeserializer;
96

107
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
118
import org.springframework.kafka.listener.ContainerProperties;
129
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
1310
import org.springframework.kafka.listener.MessageListener;
1411

15-
public class KafkaContainerFactory {
1612

13+
public class KafkaContainerFactory {
1714

1815
public KafkaMessageListenerContainer createContainer(String bootstrapProperties,
1916
String[] topicNames,
20-
MessageListener kafkaTestConsumer) {
17+
MessageListener kafkaTestConsumer,
18+
Class<? extends TestConsumerConfig> consumerConfigClass) {
2119

22-
Map<String, Object> props = consumerProps(bootstrapProperties);
20+
Map<String, Object> props = prepareConsumerProperties(consumerConfigClass, bootstrapProperties);
2321
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
2422

2523
ContainerProperties containerProps = new ContainerProperties(topicNames);
@@ -32,15 +30,17 @@ public KafkaMessageListenerContainer createContainer(String bootstrapProperties,
3230
return container;
3331
}
3432

35-
private Map<String, Object> consumerProps(String bootstrapProperties) {
36-
Map<String, Object> props = new HashMap<>();
37-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapProperties);
38-
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-name");
39-
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
40-
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
41-
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
42-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
43-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
44-
return props;
33+
34+
private Map<String, Object> prepareConsumerProperties(Class<? extends TestConsumerConfig> consumerConfigClass,
35+
String bootstrapProperties) {
36+
try {
37+
TestConsumerConfig consumerConfig = consumerConfigClass.getDeclaredConstructor().newInstance();
38+
Map<String, Object> properties = consumerConfig.getProperties();
39+
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapProperties);
40+
return properties;
41+
} catch (Throwable e) {
42+
e.printStackTrace();
43+
throw new RuntimeException("Unable to instantiate a class with TestConsumerConfig", e);
44+
}
4545
}
4646
}

src/main/java/com/jupitertools/springtestkafka/expected/KafkaExpectedMessagesExtension.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
/**
1919
* Junit5 extension to work with the data-sets in Kafka tests.
20-
*
20+
* <p>
2121
* see {@link EnableKafkaTest}
2222
* see {@link ExpectedMessages}
2323
* see {@link NoMessagesExpected}
@@ -34,7 +34,8 @@ public class KafkaExpectedMessagesExtension implements BeforeAllCallback,
3434
public void beforeAll(ExtensionContext extensionContext) throws Exception {
3535
String bootstrap = getBootstrapProperties(extensionContext);
3636
String[] topics = getTopics(extensionContext);
37-
kafkaMessageBroker = new KafkaMessageBroker(bootstrap, topics);
37+
Class<? extends TestConsumerConfig> consumerConfigClass = getConsumerConfigClass(extensionContext);
38+
kafkaMessageBroker = new KafkaMessageBroker(bootstrap, topics, consumerConfigClass);
3839
kafkaMessageBroker.start();
3940
}
4041

@@ -110,12 +111,10 @@ private void assertMessages(ExtensionContext extensionContext) {
110111
}
111112

112113

113-
114114
private String[] getTopics(ExtensionContext extensionContext) {
115-
116-
EnableKafkaTest enableKafkaTest = extensionContext.getRequiredTestClass()
117-
.getAnnotation(EnableKafkaTest.class);
118-
return enableKafkaTest.topics();
115+
return extensionContext.getRequiredTestClass()
116+
.getAnnotation(EnableKafkaTest.class)
117+
.topics();
119118
}
120119

121120

@@ -127,6 +126,12 @@ private String getBootstrapProperties(ExtensionContext extensionContext) {
127126
return bootsrap;
128127
}
129128

129+
private Class<? extends TestConsumerConfig> getConsumerConfigClass(ExtensionContext extensionContext) {
130+
return extensionContext.getRequiredTestClass()
131+
.getAnnotation(EnableKafkaTest.class)
132+
.testConsumerConfig();
133+
}
134+
130135
@Override
131136
public void beforeEach(ExtensionContext extensionContext) throws Exception {
132137
kafkaMessageBroker.reset();

src/main/java/com/jupitertools/springtestkafka/expected/KafkaMessageBroker.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ public class KafkaMessageBroker implements MessageBroker {
3131
private final ObjectMapper objectMapper;
3232

3333

34-
public KafkaMessageBroker(String bootstrapProperties, String[] topicNames) {
34+
public KafkaMessageBroker(String bootstrapProperties,
35+
String[] topicNames,
36+
Class<? extends TestConsumerConfig> consumerConfigClass) {
37+
3538
this.log = LoggerFactory.getLogger(KafkaMessageBroker.class);
3639
this.objectMapper = new ObjectMapper();
3740
this.topicDataHolder = new ConcurrentHashMap<>();
@@ -41,7 +44,8 @@ public KafkaMessageBroker(String bootstrapProperties, String[] topicNames) {
4144

4245
testConsumerContainer = kafkaContainerFactory.createContainer(bootstrapProperties,
4346
topicNames,
44-
kafkaTestConsumer);
47+
kafkaTestConsumer,
48+
consumerConfigClass);
4549
}
4650

4751
/**
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.jupitertools.springtestkafka.expected;
2+
3+
4+
import java.util.Map;
5+
6+
/**
7+
* The properties holder for Kafka test consumer.
8+
*
9+
* @author Korovin Anatoliy
10+
*/
11+
public interface TestConsumerConfig {
12+
13+
/**
14+
* @return the the map of properties for the test Kafka consumer ({@link org.springframework.kafka.core.DefaultKafkaConsumerFactory})
15+
*/
16+
Map<String, Object> getProperties();
17+
}

src/main/java/com/jupitertools/springtestkafka/expected/annotation/EnableKafkaTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import java.lang.annotation.RetentionPolicy;
77
import java.lang.annotation.Target;
88

9+
import com.jupitertools.springtestkafka.expected.DefaultKafkaConsumerConfig;
910
import com.jupitertools.springtestkafka.expected.KafkaExpectedMessagesExtension;
11+
import com.jupitertools.springtestkafka.expected.TestConsumerConfig;
1012
import org.junit.jupiter.api.extension.ExtendWith;
1113

1214
/**
@@ -29,4 +31,20 @@
2931
* @return com.jupitertools.springtestkafka.expected.annotation.ExpectedMessages should receive messages only from this topics
3032
*/
3133
String[] topics();
34+
35+
/**
36+
* The custom configuration provider for test Kafka Consumer.
37+
* <p>
38+
* You can declare specific properties for Kafka consumer in this file.
39+
* For example, if you need to use an Integer key deserialization(instead of
40+
* default StringDeserialization) then you can set all necessary properties
41+
* in a separate file, and declare it here.
42+
* <p>
43+
* Be careful, this file will be instantiated manually,
44+
* you cannot use dependency injection or make a private default
45+
* constructor of this file. Also, this file cannot be a static nested class.
46+
*
47+
* @return class type of {@link TestConsumerConfig}
48+
*/
49+
Class<? extends TestConsumerConfig> testConsumerConfig() default DefaultKafkaConsumerConfig.class;
3250
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.jupitertools.springtestkafka;
2+
3+
import java.util.Date;
4+
5+
import com.jupitertools.springtestkafka.expected.annotation.EnableKafkaTest;
6+
import com.jupitertools.springtestkafka.expected.annotation.ExpectedMessages;
7+
import lombok.AllArgsConstructor;
8+
import lombok.Builder;
9+
import lombok.Getter;
10+
import lombok.NoArgsConstructor;
11+
import lombok.Setter;
12+
import org.junit.jupiter.api.Test;
13+
14+
import org.springframework.beans.factory.annotation.Autowired;
15+
import org.springframework.boot.test.context.SpringBootTest;
16+
import org.springframework.kafka.core.KafkaTemplate;
17+
import org.springframework.test.context.TestPropertySource;
18+
19+
20+
@SpringBootTest
21+
@KafkaTestContainer
22+
@EnableKafkaTest(topics = "integer-key-test-topic",
23+
testConsumerConfig = IntegerKeyKafkaConsumerConfig.class)
24+
@TestPropertySource(properties = {
25+
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
26+
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer"
27+
})
28+
class CustomConsumerConfigTest {
29+
30+
@Autowired
31+
private KafkaTemplate<Integer, Object> kafkaTemplate;
32+
33+
@Test
34+
@ExpectedMessages(topic = "integer-key-test-topic", datasetFile = "/datasets/expected_without_class_ref.json")
35+
void withIntKeys() {
36+
kafkaTemplate.send("integer-key-test-topic", 1, Foo.builder().value("qwert").build());
37+
kafkaTemplate.send("integer-key-test-topic", 2, Bar.builder().name("baaark").build());
38+
}
39+
40+
@NoArgsConstructor
41+
@AllArgsConstructor
42+
@Builder
43+
@Getter
44+
@Setter
45+
static class Foo {
46+
private String value;
47+
}
48+
49+
@NoArgsConstructor
50+
@AllArgsConstructor
51+
@Builder
52+
@Getter
53+
@Setter
54+
static class Bar {
55+
private String name;
56+
private Date time;
57+
}
58+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.jupitertools.springtestkafka;
2+
3+
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
import com.jupitertools.springtestkafka.expected.TestConsumerConfig;
8+
import org.apache.kafka.clients.consumer.ConsumerConfig;
9+
import org.apache.kafka.common.serialization.IntegerDeserializer;
10+
import org.apache.kafka.common.serialization.StringDeserializer;
11+
12+
public class IntegerKeyKafkaConsumerConfig implements TestConsumerConfig {
13+
14+
@Override
15+
public Map<String, Object> getProperties() {
16+
Map<String, Object> props = new HashMap<>();
17+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-name");
18+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
19+
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
20+
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
21+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
22+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
23+
return props;
24+
}
25+
}

0 commit comments

Comments
 (0)