
Commit 1d7cd2c

Merge branch '10.0.x' into 11.0.x
2 parents: 410cf14 + 413d493

File tree

5 files changed: +237 -13 lines


pom.xml

Lines changed: 32 additions & 1 deletion
@@ -230,6 +230,37 @@
       <version>2.30.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-avro-converter</artifactId>
+      <version>${confluent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-protobuf-converter</artifactId>
+      <version>${confluent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-json-schema-converter</artifactId>
+      <version>${confluent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${confluent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-schema-registry</artifactId>
+      <version>${confluent.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>

   <dependencyManagement>
@@ -535,4 +566,4 @@
     </build>
   </profile>
 </profiles>
-</project>
+</project>
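These additions pull in the three schema-registry-backed converters (Avro, Protobuf, JSON Schema) plus the schema-registry test jar, which supplies the embeddable RestApp server and the ClusterTestHarness.KAFKASTORE_TOPIC constant used by the new integration test below. As a rough sketch of what the test jar enables, an embedded registry can be started against a running Kafka cluster like this (the port, bootstrap address, and topic name are illustrative assumptions, not values from the commit; the constructor arguments mirror the usage in the new test):

    import io.confluent.kafka.schemaregistry.CompatibilityLevel;
    import io.confluent.kafka.schemaregistry.RestApp;

    import java.util.Properties;

    public class EmbeddedSchemaRegistrySketch {
      public static void main(String[] args) throws Exception {
        RestApp schemaRegistry = new RestApp(
            8081,                          // REST port (illustrative)
            null,                          // no ZooKeeper connect string
            "localhost:9092",              // Kafka cluster backing the schema store (illustrative)
            "_schemas",                    // topic the registry persists schemas in (illustrative)
            CompatibilityLevel.NONE.name,  // disable compatibility checks, as the test does
            true,                          // eligible to act as the primary registry instance
            new Properties());             // no extra registry configs
        schemaRegistry.start();
        System.out.println("Schema registry at " + schemaRegistry.restServer.getURI());
        schemaRegistry.stop();
      }
    }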

src/test/java/io/confluent/connect/elasticsearch/integration/BaseConnectorIT.java

Lines changed: 2 additions & 0 deletions
@@ -18,6 +18,7 @@
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;

+import io.confluent.kafka.schemaregistry.RestApp;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -36,6 +37,7 @@ public abstract class BaseConnectorIT {
   protected static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.MINUTES.toMillis(60);

   protected EmbeddedConnectCluster connect;
+  protected RestApp restApp;

   protected void startConnect() {
     connect = new EmbeddedConnectCluster.Builder()
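The new restApp field parallels connect as shared test state: it is populated only by subclasses that need a schema registry, such as the new ElasticsearchConnectorDataFormatIT further below, and stays null for suites that don't.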

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorBaseIT.java

Lines changed: 13 additions & 9 deletions
@@ -15,6 +15,9 @@

 package io.confluent.connect.elasticsearch.integration;

+import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
+import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
+import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.test.TestUtils;
@@ -27,17 +30,13 @@
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;

-import java.io.IOException;
 import java.net.ConnectException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;

-import io.confluent.connect.elasticsearch.ElasticsearchSinkConnector;
-import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
-import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
-
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
 import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG;
@@ -69,13 +68,19 @@ public class ElasticsearchConnectorBaseIT extends BaseConnectorIT {
   protected ElasticsearchHelperClient helperClient;
   protected Map<String, String> props;

+  @BeforeClass
+  public static void setupBeforeAll() throws Exception {
+    container = ElasticsearchContainer.fromSystemProperties();
+    container.start();
+  }
+
   @AfterClass
   public static void cleanupAfterAll() {
     container.close();
   }

   @Before
-  public void setup() {
+  public void setup() throws Exception {
     startConnect();
     connect.kafka().createTopic(TOPIC);

@@ -84,7 +89,7 @@ public void setup() {
   }

   @After
-  public void cleanup() throws IOException {
+  public void cleanup() throws Exception {
     stopConnect();

     if (container.isRunning()) {
@@ -103,7 +108,7 @@ protected Map<String, String> createProps() {
     Map<String, String> props = new HashMap<>();

     // generic configs
-    props.put(CONNECTOR_CLASS_CONFIG, ElasticsearchSinkConnector.class.getName());
+    props.put(CONNECTOR_CLASS_CONFIG, ElasticsearchSinkConnector.class.getSimpleName());
     props.put(TOPICS_CONFIG, TOPIC);
     props.put(TASKS_MAX_CONFIG, Integer.toString(TASKS_MAX));
     props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
@@ -124,7 +129,6 @@ protected void runSimpleTest(Map<String, String> props) throws Exception {

     // wait for tasks to spin up
     waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
-
     writeRecords(NUM_RECORDS);

     verifySearchResults(NUM_RECORDS);
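One subtle change above: CONNECTOR_CLASS_CONFIG now gets getSimpleName() rather than getName(). This works because the Connect runtime resolves connector classes by short alias as well as by fully qualified name, so both of the following configure the same plugin (a sketch only; alias resolution assumes the connector jar is visible to the worker's plugin loader):

    import org.apache.kafka.connect.runtime.ConnectorConfig;

    import java.util.HashMap;
    import java.util.Map;

    public class ConnectorAliasSketch {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // The fully qualified class name always resolves:
        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
            "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
        // The simple-name alias that getSimpleName() produces also resolves,
        // provided the plugin is on the worker's plugin path:
        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "ElasticsearchSinkConnector");
      }
    }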
src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorDataFormatIT.java

Lines changed: 187 additions & 0 deletions
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.elasticsearch.integration;
+
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.connect.json.JsonSchemaConverter;
+import io.confluent.connect.protobuf.ProtobufConverter;
+import io.confluent.kafka.schemaregistry.CompatibilityLevel;
+import io.confluent.kafka.schemaregistry.RestApp;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
+
+@RunWith(Parameterized.class)
+public class ElasticsearchConnectorDataFormatIT extends ElasticsearchConnectorBaseIT {
+
+  protected void startSchemaRegistry() throws Exception {
+    int port = findAvailableOpenPort();
+    restApp = new RestApp(port, null, connect.kafka().bootstrapServers(),
+        KAFKASTORE_TOPIC, CompatibilityLevel.NONE.name, true, new Properties());
+    restApp.start();
+  }
+
+  protected void stopSchemaRegistry() throws Exception {
+    restApp.stop();
+  }
+
+  protected void waitForSchemaRegistryToStart() throws InterruptedException {
+    TestUtils.waitForCondition(
+        () -> restApp.restServer.isRunning(),
+        CONNECTOR_STARTUP_DURATION_MS,
+        "Schema-registry server did not start in time."
+    );
+  }
+
+  private Converter converter;
+  private Class<? extends Converter> converterClass;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    startSchemaRegistry();
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    super.cleanup();
+    stopSchemaRegistry();
+  }
+
+  @Parameters
+  public static List<Class<? extends Converter>> data() {
+    return Arrays.asList(JsonSchemaConverter.class, ProtobufConverter.class, AvroConverter.class);
+  }
+
+  public ElasticsearchConnectorDataFormatIT(Class<? extends Converter> converter) throws Exception {
+    this.converterClass = converter;
+    this.converter = converterClass.getConstructor().newInstance();
+  }
+
+  @Test
+  public void testHappyPathDataFormat() throws Exception {
+    // configure connector configs and converter with the schema-registry address
+    props.put("value.converter", converterClass.getSimpleName());
+    props.put("value.converter.schema.registry.url", restApp.restServer.getURI().toString());
+    props.put("value.converter.scrub.invalid.names", "true");
+    converter.configure(Collections.singletonMap(
+        "schema.registry.url", restApp.restServer.getURI().toString()
+        ), false
+    );
+
+    // start the connector
+    connect.configureConnector(CONNECTOR_NAME, props);
+
+    // wait for schema-registry to spin up
+    waitForSchemaRegistryToStart();
+
+    // wait for tasks to spin up
+    waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);
+
+    // run test
+    writeRecords(NUM_RECORDS);
+    verifySearchResults(NUM_RECORDS);
+  }
+
+  @Override
+  protected void writeRecords(int numRecords) {
+    writeRecordsFromIndex(0, numRecords, converter);
+  }
+
+  protected void writeRecordsFromIndex(int start, int numRecords, Converter converter) {
+    // get the schema defined for the test
+    Schema schema = getRecordSchema();
+
+    // configure producer with default properties
+    KafkaProducer<byte[], byte[]> producer = configureProducer();
+
+    List<SchemaAndValue> recordsList = getRecords(schema, start, numRecords);
+
+    // produce records into the topic
+    produceRecords(producer, converter, recordsList, TOPIC);
+  }
+
+  private Integer findAvailableOpenPort() throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    }
+  }
+
+  protected List<SchemaAndValue> getRecords(Schema schema, int start, int numRecords) {
+    List<SchemaAndValue> recordList = new ArrayList<>();
+    for (int i = start; i < start + numRecords; i++) {
+      Struct struct = new Struct(schema);
+      struct.put("doc_num", i);
+      SchemaAndValue schemaAndValue = new SchemaAndValue(schema, struct);
+      recordList.add(schemaAndValue);
+    }
+    return recordList;
+  }
+
+  protected void produceRecords(
+      KafkaProducer<byte[], byte[]> producer,
+      Converter converter,
+      List<SchemaAndValue> recordsList,
+      String topic
+  ) {
+    for (int i = 0; i < recordsList.size(); i++) {
+      SchemaAndValue schemaAndValue = recordsList.get(i);
+      byte[] convertedStruct = converter.fromConnectData(topic, schemaAndValue.schema(), schemaAndValue.value());
+      ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, 0, String.valueOf(i).getBytes(), convertedStruct);
+      try {
+        producer.send(msg).get(TimeUnit.SECONDS.toMillis(120), TimeUnit.MILLISECONDS);
+      } catch (Exception e) {
+        throw new KafkaException("Could not produce message: " + msg, e);
+      }
+    }
+  }
+
+  protected KafkaProducer<byte[], byte[]> configureProducer() {
+    Map<String, Object> producerProps = new HashMap<>();
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connect.kafka().bootstrapServers());
+    return new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
+  }
+
+  protected Schema getRecordSchema() {
+    SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+    schemaBuilder.field("doc_num", Schema.INT32_SCHEMA);
+    return schemaBuilder.build();
+  }
+}
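For context on what each parameterized run exercises: the converter serializes a Connect Struct into the registry-backed wire format on the produce side, and the sink-side worker reverses it against the same registry. A standalone sketch of that round trip using AvroConverter (the registry URL is an illustrative assumption; in the test it comes from restApp):

    import io.confluent.connect.avro.AvroConverter;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.storage.Converter;

    import java.util.Collections;

    public class ConverterRoundTripSketch {
      public static void main(String[] args) {
        // Assumes a schema registry is reachable at this URL (illustrative).
        Converter converter = new AvroConverter();
        converter.configure(
            Collections.singletonMap("schema.registry.url", "http://localhost:8081"), false);

        Schema schema = SchemaBuilder.struct().field("doc_num", Schema.INT32_SCHEMA).build();
        Struct value = new Struct(schema).put("doc_num", 42);

        // fromConnectData registers the schema and emits the registry wire format.
        byte[] serialized = converter.fromConnectData("test-topic", schema, value);

        // toConnectData reverses it, which is what the sink-side worker does.
        SchemaAndValue roundTrip = converter.toConnectData("test-topic", serialized);
        System.out.println(roundTrip.value()); // Struct{doc_num=42}
      }
    }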

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchConnectorIT.java

Lines changed: 3 additions & 3 deletions
@@ -60,7 +60,7 @@ public static void setupBeforeAll() {
   }

   @Override
-  public void setup() {
+  public void setup() throws Exception {
     if (!container.isRunning()) {
       setupBeforeAll();
     }
@@ -161,8 +161,8 @@ public void testPrimitive() throws Exception {
     // wait for tasks to spin up
     waitForConnectorToStart(CONNECTOR_NAME, TASKS_MAX);

-    for (int i = 0; i < NUM_RECORDS; i++) {
-      connect.kafka().produce(TOPIC, String.valueOf(i), String.valueOf(i));
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      connect.kafka().produce(TOPIC, String.valueOf(i), String.valueOf(i));
     }

     waitForRecords(0);