
Commit 413d493

[CC-19402] SR testing using RestApp (#678)

* Add schema-registry ITs

1 parent: 1d11fd0

File tree: 7 files changed, +244 −20 lines


pom.xml

Lines changed: 32 additions & 1 deletion

@@ -202,6 +202,37 @@
       <version>${snakeyaml.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-avro-converter</artifactId>
+      <version>${confluent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-protobuf-converter</artifactId>
+      <version>${confluent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-json-schema-converter</artifactId>
+      <version>${confluent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${confluent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${confluent.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>

   <build>
@@ -497,4 +528,4 @@
     </build>
   </profile>
 </profiles>
-</project>
+</project>
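
Note on the two kafka-schema-registry entries: the embeddable RestApp test server ships in the kafka-schema-registry test-jar (hence the <type>test-jar</type> dependency), while the regular jar supplies its runtime classes. A minimal sketch of what these additions enable — the constructor arguments mirror the RestApp usage introduced later in this commit, and the "_schemas" topic name is a stand-in for the KAFKASTORE_TOPIC constant used there:

    import io.confluent.kafka.schemaregistry.CompatibilityLevel;
    import io.confluent.kafka.schemaregistry.RestApp;

    import java.net.ServerSocket;
    import java.util.Properties;

    class EmbeddedSchemaRegistrySketch {
      // Start an in-process Schema Registry against the given Kafka cluster,
      // on a dynamically chosen free port.
      static RestApp startSchemaRegistry(String bootstrapServers) throws Exception {
        int port;
        try (ServerSocket socket = new ServerSocket(0)) {
          port = socket.getLocalPort();
        }
        RestApp restApp = new RestApp(port, null, bootstrapServers,
            "_schemas", CompatibilityLevel.NONE.name, true, new Properties());
        restApp.start();
        return restApp;
      }
    }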

src/test/java/io/confluent/connect/elasticsearch/integration/BaseConnectorIT.java

Lines changed: 2 additions & 0 deletions

@@ -18,6 +18,7 @@
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;

+import io.confluent.kafka.schemaregistry.RestApp;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -36,6 +37,7 @@ public abstract class BaseConnectorIT {
   protected static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.MINUTES.toMillis(60);

   protected EmbeddedConnectCluster connect;
+  protected RestApp restApp;

   protected void startConnect() {
     connect = new EmbeddedConnectCluster.Builder()

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorBaseIT.java

Lines changed: 12 additions & 5 deletions

@@ -42,6 +42,7 @@
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;

 public class ElasticsearchConnectorBaseIT extends BaseConnectorIT {

@@ -56,13 +57,19 @@ public class ElasticsearchConnectorBaseIT extends BaseConnectorIT {
   protected ElasticsearchClient client;
   protected Map<String, String> props;

+  @BeforeClass
+  public static void setupBeforeAll() {
+    container = ElasticsearchContainer.fromSystemProperties();
+    container.start();
+  }
+
   @AfterClass
   public static void cleanupAfterAll() {
     container.close();
   }

   @Before
-  public void setup() {
+  public void setup() throws Exception {
     startConnect();
     connect.kafka().createTopic(TOPIC);

@@ -71,7 +78,7 @@ public void setup() {
   }

   @After
-  public void cleanup() throws IOException {
+  public void cleanup() throws Exception {
     stopConnect();
     client.deleteAll();
     client.close();
@@ -85,7 +92,7 @@ protected Map<String, String> createProps() {
     Map<String, String> props = new HashMap<>();

     // generic configs
-    props.put(CONNECTOR_CLASS_CONFIG, ElasticsearchSinkConnector.class.getName());
+    props.put(CONNECTOR_CLASS_CONFIG, ElasticsearchSinkConnector.class.getSimpleName());
     props.put(TOPICS_CONFIG, TOPIC);
     props.put(TASKS_MAX_CONFIG, Integer.toString(TASKS_MAX));
     props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
@@ -107,7 +114,6 @@ protected void runSimpleTest(Map<String, String> props) throws Exception {

     // wait for tasks to spin up
     waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
-
     writeRecords(NUM_RECORDS);

     verifySearchResults(NUM_RECORDS);
@@ -118,7 +124,7 @@ protected void writeRecords(int numRecords) {
   }

   protected void writeRecordsFromIndex(int start, int numRecords) {
-    for (int i = start; i < start + numRecords; i++) {
+    for (int i = start; i < start + numRecords; i++) {
       connect.kafka().produce(TOPIC, String.valueOf(i), String.format("{\"doc_num\":%d}", i));
     }
   }
@@ -156,4 +162,5 @@ protected void waitForRecords(int numRecords) throws InterruptedException {
         "Sufficient amount of document were not found in ES on time."
     );
   }
+
 }
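
Aside on the getName() → getSimpleName() switch above: Kafka Connect's plugin loader registers short class-name aliases alongside fully-qualified names, so either form of connector.class should resolve. A hedged illustration, not part of this diff:

    // Both forms are accepted by Connect's plugin alias resolution:
    props.put(CONNECTOR_CLASS_CONFIG, "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
    props.put(CONNECTOR_CLASS_CONFIG, "ElasticsearchSinkConnector");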

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorDataFormatIT.java

Lines changed: 194 additions & 0 deletions

@@ -0,0 +1,194 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.elasticsearch.integration;
+
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.connect.json.JsonSchemaConverter;
+import io.confluent.connect.protobuf.ProtobufConverter;
+import io.confluent.kafka.schemaregistry.CompatibilityLevel;
+import io.confluent.kafka.schemaregistry.RestApp;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
+
+@RunWith(Parameterized.class)
+public class ElasticsearchConnectorDataFormatIT extends ElasticsearchConnectorBaseIT {
+
+  protected void startSchemaRegistry() throws Exception {
+    int port = findAvailableOpenPort();
+    restApp = new RestApp(port, null, connect.kafka().bootstrapServers(),
+        KAFKASTORE_TOPIC, CompatibilityLevel.NONE.name, true, new Properties());
+    restApp.start();
+  }
+
+  protected void stopSchemaRegistry() throws Exception {
+    restApp.stop();
+  }
+
+  protected void waitForSchemaRegistryToStart() throws InterruptedException {
+    TestUtils.waitForCondition(
+        () -> restApp.restServer.isRunning(),
+        CONNECTOR_STARTUP_DURATION_MS,
+        "Schema-registry server did not start in time."
+    );
+  }
+
+  private Converter converter;
+  private Class<? extends Converter> converterClass;
+
+  @Override
+  public void setup() throws Exception {
+    startConnect();
+    startSchemaRegistry();
+    connect.kafka().createTopic(TOPIC);
+
+    props = createProps();
+    client = createClient();
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    stopConnect();
+    stopSchemaRegistry();
+    client.deleteAll();
+    client.close();
+  }
+
+  @Parameters
+  public static List<Class<? extends Converter>> data() {
+    return Arrays.asList(JsonSchemaConverter.class, ProtobufConverter.class, AvroConverter.class);
+  }
+
+  public ElasticsearchConnectorDataFormatIT(Class<? extends Converter> converter) throws Exception {
+    this.converterClass = converter;
+    this.converter = converterClass.getConstructor().newInstance();
+  }
+
+  @Test
+  public void testHappyPathDataFormat() throws Exception {
+    // configure configs and converter with schema-registry addr
+    props.put("value.converter", converterClass.getSimpleName());
+    props.put("value.converter.schema.registry.url", restApp.restServer.getURI().toString());
+    props.put("value.converter.scrub.invalid.names", "true");
+    converter.configure(Collections.singletonMap(
+        "schema.registry.url", restApp.restServer.getURI().toString()
+    ), false
+    );
+
+    // start the connector
+    connect.configureConnector(CONNECTOR_NAME, props);
+
+    // wait for schema-registry to spin up
+    waitForSchemaRegistryToStart();
+
+    // wait for tasks to spin up
+    waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
+
+    // run test
+    writeRecords(NUM_RECORDS);
+    verifySearchResults(NUM_RECORDS);
+  }
+
+  @Override
+  protected void writeRecords(int numRecords) {
+    writeRecordsFromIndex(0, numRecords, converter);
+  }
+
+  protected void writeRecordsFromIndex(int start, int numRecords, Converter converter) {
+    // get defined schema for the test
+    Schema schema = getRecordSchema();
+
+    // configure producer with default properties
+    KafkaProducer<byte[], byte[]> producer = configureProducer();
+
+    List<SchemaAndValue> recordsList = getRecords(schema, start, numRecords);
+
+    // produce records into topic
+    produceRecords(producer, converter, recordsList, TOPIC);
+  }
+
+  private Integer findAvailableOpenPort() throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    }
+  }
+
+  protected List<SchemaAndValue> getRecords(Schema schema, int start, int numRecords) {
+    List<SchemaAndValue> recordList = new ArrayList<>();
+    for (int i = start; i < start + numRecords; i++) {
+      Struct struct = new Struct(schema);
+      struct.put("doc_num", i);
+      SchemaAndValue schemaAndValue = new SchemaAndValue(schema, struct);
+      recordList.add(schemaAndValue);
+    }
+    return recordList;
+  }
+
+  protected void produceRecords(
+      KafkaProducer<byte[], byte[]> producer,
+      Converter converter,
+      List<SchemaAndValue> recordsList,
+      String topic
+  ) {
+    for (int i = 0; i < recordsList.size(); i++) {
+      SchemaAndValue schemaAndValue = recordsList.get(i);
+      byte[] convertedStruct = converter.fromConnectData(topic, schemaAndValue.schema(), schemaAndValue.value());
+      ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, 0, String.valueOf(i).getBytes(), convertedStruct);
+      try {
+        producer.send(msg).get(TimeUnit.SECONDS.toMillis(120), TimeUnit.MILLISECONDS);
+      } catch (Exception e) {
+        throw new KafkaException("Could not produce message: " + msg, e);
+      }
+    }
+  }
+
+  protected KafkaProducer<byte[], byte[]> configureProducer() {
+    Map<String, Object> producerProps = new HashMap<>();
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connect.kafka().bootstrapServers());
+    return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
+  }
+
+  protected Schema getRecordSchema() {
+    SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+    schemaBuilder.field("doc_num", Schema.INT32_SCHEMA);
+    return schemaBuilder.build();
+  }
+}
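
For context on the harness above: JUnit 4's Parameterized runner calls the @Parameters method once and constructs the test class for each returned element, so testHappyPathDataFormat runs three times, once per converter. A self-contained sketch of the same pattern, with a hypothetical class and parameter values:

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    import java.util.Arrays;
    import java.util.List;

    @RunWith(Parameterized.class)
    public class ParameterizedSketchTest {

      private final String format; // hypothetical parameter

      @Parameters
      public static List<String> data() {
        return Arrays.asList("avro", "protobuf", "json-schema");
      }

      // the runner invokes this constructor once per element of data()
      public ParameterizedSketchTest(String format) {
        this.format = format;
      }

      @Test
      public void runsOncePerParameter() {
        // executes three times, once for each format
        System.out.println("running with " + format);
      }
    }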

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorIT.java

Lines changed: 4 additions & 10 deletions

@@ -21,7 +21,6 @@
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG;
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;

@@ -31,11 +30,12 @@
 import io.confluent.connect.elasticsearch.Mapping;
 import io.confluent.connect.elasticsearch.TestUtils;
 import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.WriteMethod;
+
 import java.util.Collections;
+
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.test.IntegrationTest;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -46,12 +46,6 @@ public class ElasticsearchConnectorIT extends ElasticsearchConnectorBaseIT {

   private static Logger log = LoggerFactory.getLogger(ElasticsearchConnectorIT.class);

-  @BeforeClass
-  public static void setupBeforeAll() {
-    container = ElasticsearchContainer.fromSystemProperties();
-    container.start();
-  }
-
   @Test
   public void testChangeConfigsAndRestart() throws Exception {
     // run connector and write
@@ -124,8 +118,8 @@ public void testPrimitive() throws Exception {
     // wait for tasks to spin up
     waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);

-    for (int i = 0; i < NUM_RECORDS; i++) {
-      connect.kafka().produce(TOPIC, String.valueOf(i), String.valueOf(i));
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      connect.kafka().produce(TOPIC, String.valueOf(i), String.valueOf(i));
     }

     waitForRecords(0);

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorSecureIT.java

Lines changed: 0 additions & 1 deletion

@@ -17,7 +17,6 @@

 import io.confluent.common.utils.IntegrationTest;
 import io.confluent.connect.elasticsearch.ElasticsearchClient;
-import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
 import io.confluent.connect.elasticsearch.SecurityProtocol;
 import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
 import org.apache.kafka.common.config.SslConfigs;

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchContainer.java

Lines changed: 0 additions & 3 deletions

@@ -9,11 +9,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.UnknownHostException;
 import java.time.Duration;
 import java.util.concurrent.Future;
