Skip to content

Commit b5c68ea

Browse files
authored
Merge pull request #1 from jupiter-tools/expected-messages
Expected messages
2 parents cf785ea + 3f919bf commit b5c68ea

20 files changed

+792
-9
lines changed

.travis.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@ script:
88
addons:
99
sonarcloud:
1010
organization: "antkorwin-github"
11-
token:
12-
secure: $SONAR_CLOUD_KEY
11+
token: $SONAR_CLOUD_KEY
1312

1413
after_success:
1514
- bash <(curl -s https://codecov.io/bash)

README.adoc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,68 @@ class MultipleContainerInOneTest {
6262
----
6363
<1> start a first Kafka test container and set a bootstrap servers of started container to default Spring Boot properties (`spring.kafka.bootstrap-servers`)
6464
<2> start one more Kafka container and set a bootstrap servers to specified property, exactly in this property you can read an actual value of bootstrap servers after run the application context.
65+
66+
67+
## A better way to test your data in integration tests
68+
69+
You can test received messages after test execution if you use JSON format.
70+
Just describe expected dataset for a test case by the using `@ExpectedDataSet` annotation:
71+
72+
[source, java]
73+
----
74+
@SpringBootTest
75+
@KafkaTestContainer
76+
@EnableKafkaTest(topics = {"test-topic", "another-topic"})
77+
class ExpectedMessagesTest {
78+
79+
@Autowired
80+
private KafkaTemplate<String, Object> kafkaTemplate;
81+
82+
83+
@Test
84+
@ExpectedMessages(topic = "test-topic", datasetFile = "/datasets/expected_event.json")
85+
void firstTopic() {
86+
kafkaTemplate.send("test-topic", new Foo("qwert"));
87+
kafkaTemplate.send("test-topic", new Bar("baaark"));
88+
}
89+
90+
@Test
91+
@ExpectedMessages(topic = "another-topic", datasetFile = "/datasets/expected_another_event.json")
92+
void anotherTopic() {
93+
kafkaTemplate.send("another-topic", Bar.builder().time(new Date()).build());
94+
}
95+
}
96+
----
97+
98+
And after test execution, we will wait for messages declared in this JSON file.
99+
The content of JSON dataset file:
100+
101+
[source, json]
102+
----
103+
{
104+
"com.kafkatest.example.events.Foo": [
105+
{
106+
"value": "qwert"
107+
}
108+
],
109+
"com.kafkatest.example.events.Bar": [
110+
{
111+
"name": "baaark"
112+
}
113+
]
114+
}
115+
----
116+
117+
Also, you can use `@NoMessagesExpected` to check the silence on air.
118+
Receiving any messages will fail the test case.
119+
120+
[source, java]
121+
----
122+
@Test
123+
@NoMessagesExpected(timeout = 3000)
124+
void silence() {
125+
126+
}
127+
----
128+
Waits during the timeout for messages in Kafka topics(set in `@EnableKafkaTest`),
129+
if receive something then throws an exception.

pom.xml

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
<groupId>com.jupiter-tools</groupId>
1313
<artifactId>spring-test-kafka</artifactId>
14-
<version>0.1</version>
14+
<version>0.2</version>
1515
<packaging>jar</packaging>
1616

1717
<name>spring-test-kafka</name>
@@ -83,12 +83,25 @@
8383
<artifactId>spring-boot-starter-logging</artifactId>
8484
</dependency>
8585

86+
<!-- import DataSets -->
87+
<dependency>
88+
<groupId>com.jupiter-tools</groupId>
89+
<artifactId>spring-test-core</artifactId>
90+
<version>0.4</version>
91+
</dependency>
92+
<!-- import DataSets -->
93+
94+
<dependency>
95+
<groupId>com.jupiter-tools</groupId>
96+
<artifactId>datasetroll</artifactId>
97+
<version>0.1</version>
98+
</dependency>
99+
86100
<!-- region TEST -->
87101
<dependency>
88102
<groupId>org.awaitility</groupId>
89103
<artifactId>awaitility</artifactId>
90104
<version>3.1.0</version>
91-
<scope>test</scope>
92105
</dependency>
93106
<dependency>
94107
<groupId>com.jupiter-tools</groupId>
@@ -113,19 +126,16 @@
113126
<groupId>org.junit.jupiter</groupId>
114127
<artifactId>junit-jupiter-api</artifactId>
115128
<version>${junit-jupiter.version}</version>
116-
<scope>test</scope>
117129
</dependency>
118130
<dependency>
119131
<groupId>org.junit.jupiter</groupId>
120132
<artifactId>junit-jupiter-engine</artifactId>
121133
<version>${junit-jupiter.version}</version>
122-
<scope>test</scope>
123134
</dependency>
124135
<dependency>
125136
<groupId>org.junit.platform</groupId>
126137
<artifactId>junit-platform-engine</artifactId>
127138
<version>${junit-platform.version}</version>
128-
<scope>test</scope>
129139
</dependency>
130140
<!-- endregion Junit 5 -->
131141
</dependencies>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.jupitertools.springtestkafka.expected;
2+
3+
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
7+
import org.apache.kafka.clients.consumer.ConsumerConfig;
8+
import org.apache.kafka.common.serialization.StringDeserializer;
9+
10+
public class DefaultKafkaConsumerConfig implements TestConsumerConfig {
11+
12+
@Override
13+
public Map<String, Object> getProperties() {
14+
Map<String, Object> props = new HashMap<>();
15+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-name");
16+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
17+
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
18+
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
19+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
20+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
21+
return props;
22+
}
23+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.jupitertools.springtestkafka.expected;
2+
3+
import java.util.Map;
4+
5+
import org.apache.kafka.clients.consumer.ConsumerConfig;
6+
7+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
8+
import org.springframework.kafka.listener.ContainerProperties;
9+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
10+
import org.springframework.kafka.listener.MessageListener;
11+
12+
13+
public class KafkaContainerFactory {
14+
15+
public KafkaMessageListenerContainer createContainer(String bootstrapProperties,
16+
String[] topicNames,
17+
MessageListener kafkaTestConsumer,
18+
Class<? extends TestConsumerConfig> consumerConfigClass) {
19+
20+
Map<String, Object> props = prepareConsumerProperties(consumerConfigClass, bootstrapProperties);
21+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
22+
23+
ContainerProperties containerProps = new ContainerProperties(topicNames);
24+
//new ContainerProperties(Pattern.compile("^.*$")); ? why this is'n working ?
25+
containerProps.setMessageListener(kafkaTestConsumer);
26+
27+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
28+
container.setBeanName("kafkaExpectedMessagesExtensionContainer");
29+
30+
return container;
31+
}
32+
33+
34+
private Map<String, Object> prepareConsumerProperties(Class<? extends TestConsumerConfig> consumerConfigClass,
35+
String bootstrapProperties) {
36+
try {
37+
TestConsumerConfig consumerConfig = consumerConfigClass.getDeclaredConstructor().newInstance();
38+
Map<String, Object> properties = consumerConfig.getProperties();
39+
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapProperties);
40+
return properties;
41+
} catch (Throwable e) {
42+
e.printStackTrace();
43+
throw new RuntimeException("Unable to instantiate a class with TestConsumerConfig", e);
44+
}
45+
}
46+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package com.jupitertools.springtestkafka.expected;
2+
3+
4+
import com.jupiter.tools.spring.test.core.expected.list.messages.AssertReceivedMessages;
5+
import com.jupiter.tools.spring.test.core.expected.list.messages.DataSetPreProcessor;
6+
import com.jupiter.tools.spring.test.core.expected.list.messages.ExpectedMessagesOptions;
7+
import com.jupitertools.springtestkafka.expected.annotation.EnableKafkaTest;
8+
import com.jupitertools.springtestkafka.expected.annotation.ExpectedMessages;
9+
import com.jupitertools.springtestkafka.expected.annotation.NoMessagesExpected;
10+
import org.junit.jupiter.api.Assertions;
11+
import org.junit.jupiter.api.extension.AfterEachCallback;
12+
import org.junit.jupiter.api.extension.BeforeAllCallback;
13+
import org.junit.jupiter.api.extension.BeforeEachCallback;
14+
import org.junit.jupiter.api.extension.ExtensionContext;
15+
16+
import org.springframework.test.context.junit.jupiter.SpringExtension;
17+
18+
/**
19+
* Junit5 extension to work with the data-sets in Kafka tests.
20+
* <p>
21+
* see {@link EnableKafkaTest}
22+
* see {@link ExpectedMessages}
23+
* see {@link NoMessagesExpected}
24+
*
25+
* @author Anatoliy Korovin
26+
*/
27+
public class KafkaExpectedMessagesExtension implements BeforeAllCallback,
28+
BeforeEachCallback,
29+
AfterEachCallback {
30+
31+
private KafkaMessageBroker kafkaMessageBroker;
32+
33+
@Override
34+
public void beforeAll(ExtensionContext extensionContext) throws Exception {
35+
String bootstrap = getBootstrapProperties(extensionContext);
36+
String[] topics = getTopics(extensionContext);
37+
Class<? extends TestConsumerConfig> consumerConfigClass = getConsumerConfigClass(extensionContext);
38+
kafkaMessageBroker = new KafkaMessageBroker(bootstrap, topics, consumerConfigClass);
39+
kafkaMessageBroker.start();
40+
}
41+
42+
43+
@Override
44+
public void afterEach(ExtensionContext extensionContext) throws Exception {
45+
46+
if (isExpectedSilence(extensionContext)) {
47+
assertSilence(extensionContext);
48+
return;
49+
}
50+
51+
if (isExpectedMessages(extensionContext)) {
52+
assertMessages(extensionContext);
53+
}
54+
}
55+
56+
private boolean isExpectedSilence(ExtensionContext extensionContext) {
57+
58+
NoMessagesExpected noMessagesExpected = extensionContext.getRequiredTestMethod()
59+
.getAnnotation(NoMessagesExpected.class);
60+
61+
ExpectedMessages expectedMessages = extensionContext.getRequiredTestMethod()
62+
.getAnnotation(ExpectedMessages.class);
63+
if (noMessagesExpected == null) {
64+
return false;
65+
}
66+
67+
if (expectedMessages != null) {
68+
throw new Error("Wrong expected state, you can use only one annotation: NoMessagesExpected or ExpectedMessages, simultaneously");
69+
}
70+
71+
return true;
72+
}
73+
74+
private void assertSilence(ExtensionContext context) {
75+
76+
NoMessagesExpected noMessagesExpected = context.getRequiredTestMethod()
77+
.getAnnotation(NoMessagesExpected.class);
78+
79+
ExpectedMessagesOptions options = ExpectedMessagesOptions.builder()
80+
.allQueues(getTopics(context))
81+
.timeout(noMessagesExpected.timeout())
82+
.build();
83+
84+
new AssertReceivedMessages(options, kafkaMessageBroker).doAssertSilence();
85+
}
86+
87+
private boolean isExpectedMessages(ExtensionContext extensionContext) {
88+
89+
ExpectedMessages expectedMessages = extensionContext.getRequiredTestMethod()
90+
.getAnnotation(ExpectedMessages.class);
91+
92+
return expectedMessages != null;
93+
}
94+
95+
96+
private void assertMessages(ExtensionContext extensionContext) {
97+
98+
ExpectedMessages expectedMessages = extensionContext.getRequiredTestMethod()
99+
.getAnnotation(ExpectedMessages.class);
100+
101+
DataSetPreProcessor dataSetPreProcessor = new UntypedDataSetPreProcessor();
102+
ExpectedMessagesOptions options = ExpectedMessagesOptions.builder()
103+
.messagesFile(expectedMessages.datasetFile())
104+
.queue(expectedMessages.topic())
105+
.timeout(expectedMessages.timeout())
106+
.actualDataSetPreProcessor(dataSetPreProcessor)
107+
.expectedDataSetPreProcessor(dataSetPreProcessor)
108+
.build();
109+
110+
new AssertReceivedMessages(options, kafkaMessageBroker).doAssert();
111+
}
112+
113+
114+
private String[] getTopics(ExtensionContext extensionContext) {
115+
return extensionContext.getRequiredTestClass()
116+
.getAnnotation(EnableKafkaTest.class)
117+
.topics();
118+
}
119+
120+
121+
private String getBootstrapProperties(ExtensionContext extensionContext) {
122+
String bootsrap = SpringExtension.getApplicationContext(extensionContext)
123+
.getEnvironment()
124+
.getProperty("spring.kafka.bootstrap-servers");
125+
Assertions.assertNotNull(bootsrap, "Not found Kafka bootstap host:port in properties");
126+
return bootsrap;
127+
}
128+
129+
private Class<? extends TestConsumerConfig> getConsumerConfigClass(ExtensionContext extensionContext) {
130+
return extensionContext.getRequiredTestClass()
131+
.getAnnotation(EnableKafkaTest.class)
132+
.testConsumerConfig();
133+
}
134+
135+
@Override
136+
public void beforeEach(ExtensionContext extensionContext) throws Exception {
137+
kafkaMessageBroker.reset();
138+
}
139+
}

0 commit comments

Comments
 (0)