Skip to content

Commit d2c7fa6

Browse files
Merge branch '5.0.x' into 5.1.x
2 parents 8a141c8 + cb96f0c commit d2c7fa6

File tree

8 files changed

+500
-414
lines changed

8 files changed

+500
-414
lines changed

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,10 +406,6 @@ public static String[] names() {
406406
return result;
407407
}
408408

409-
public static BehaviorOnNullValues forValue(String value) {
410-
return valueOf(value.toUpperCase(Locale.ROOT));
411-
}
412-
413409
@Override
414410
public String toString() {
415411
return name().toLowerCase(Locale.ROOT);

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 459 additions & 278 deletions
Large diffs are not rendered by default.

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 20 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package io.confluent.connect.elasticsearch;
1717

18-
import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
1918
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
2019
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2120
import org.apache.kafka.common.TopicPartition;
@@ -27,9 +26,7 @@
2726
import org.slf4j.LoggerFactory;
2827

2928
import java.util.Collection;
30-
import java.util.HashMap;
3129
import java.util.HashSet;
32-
import java.util.List;
3330
import java.util.Map;
3431
import java.util.Set;
3532
import java.util.concurrent.TimeUnit;
@@ -58,61 +55,16 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
5855
log.info("Starting ElasticsearchSinkTask.");
5956

6057
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
61-
String type = config.getString(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG);
62-
boolean ignoreKey =
63-
config.getBoolean(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG);
64-
boolean ignoreSchema =
65-
config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);
66-
boolean useCompactMapEntries =
67-
config.getBoolean(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG);
68-
69-
70-
Map<String, String> topicToIndexMap =
71-
parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
72-
Set<String> topicIgnoreKey =
73-
new HashSet<>(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_KEY_IGNORE_CONFIG));
74-
Set<String> topicIgnoreSchema = new HashSet<>(
75-
config.getList(ElasticsearchSinkConnectorConfig.TOPIC_SCHEMA_IGNORE_CONFIG)
76-
);
77-
78-
long flushTimeoutMs =
79-
config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
80-
int maxBufferedRecords =
81-
config.getInt(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
82-
int batchSize =
83-
config.getInt(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
84-
long lingerMs =
85-
config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
86-
int maxInFlightRequests =
87-
config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
88-
long retryBackoffMs =
89-
config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
90-
int maxRetry =
91-
config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
92-
boolean dropInvalidMessage =
93-
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);
94-
boolean createIndicesAtStartTime =
95-
config.getBoolean(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG);
96-
97-
DataConverter.BehaviorOnNullValues behaviorOnNullValues =
98-
DataConverter.BehaviorOnNullValues.forValue(
99-
config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
100-
);
101-
102-
BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc =
103-
BulkProcessor.BehaviorOnMalformedDoc.forValue(
104-
config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG)
105-
);
10658

10759
// Calculate the maximum possible backoff time ...
10860
long maxRetryBackoffMs =
109-
RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
61+
RetryUtil.computeRetryWaitTimeInMillis(config.maxRetries(), config.retryBackoffMs());
11062
if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
11163
log.warn("This connector uses exponential backoff with jitter for retries, "
11264
+ "and using '{}={}' and '{}={}' results in an impractical but possible maximum "
11365
+ "backoff time greater than {} hours.",
114-
ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, maxRetry,
115-
ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
66+
ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, config.maxRetries(),
67+
ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs(),
11668
TimeUnit.MILLISECONDS.toHours(maxRetryBackoffMs));
11769
}
11870

@@ -123,23 +75,23 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
12375
}
12476

12577
ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
126-
.setType(type)
127-
.setIgnoreKey(ignoreKey, topicIgnoreKey)
128-
.setIgnoreSchema(ignoreSchema, topicIgnoreSchema)
129-
.setCompactMapEntries(useCompactMapEntries)
130-
.setTopicToIndexMap(topicToIndexMap)
131-
.setFlushTimoutMs(flushTimeoutMs)
132-
.setMaxBufferedRecords(maxBufferedRecords)
133-
.setMaxInFlightRequests(maxInFlightRequests)
134-
.setBatchSize(batchSize)
135-
.setLingerMs(lingerMs)
136-
.setRetryBackoffMs(retryBackoffMs)
137-
.setMaxRetry(maxRetry)
138-
.setDropInvalidMessage(dropInvalidMessage)
139-
.setBehaviorOnNullValues(behaviorOnNullValues)
140-
.setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);
141-
142-
this.createIndicesAtStartTime = createIndicesAtStartTime;
78+
.setType(config.type())
79+
.setIgnoreKey(config.ignoreKey(), config.ignoreKeyTopics())
80+
.setIgnoreSchema(config.ignoreSchema(), config.ignoreSchemaTopics())
81+
.setCompactMapEntries(config.useCompactMapEntries())
82+
.setTopicToIndexMap(config.topicToIndexMap())
83+
.setFlushTimoutMs(config.flushTimeoutMs())
84+
.setMaxBufferedRecords(config.maxBufferedRecords())
85+
.setMaxInFlightRequests(config.maxInFlightRequests())
86+
.setBatchSize(config.batchSize())
87+
.setLingerMs(config.lingerMs())
88+
.setRetryBackoffMs(config.retryBackoffMs())
89+
.setMaxRetry(config.maxRetries())
90+
.setDropInvalidMessage(config.dropInvalidMessage())
91+
.setBehaviorOnNullValues(config.behaviorOnNullValues())
92+
.setBehaviorOnMalformedDoc(config.behaviorOnMalformedDoc());
93+
94+
this.createIndicesAtStartTime = config.createIndicesAtStart();
14395

14496
writer = builder.build();
14597
writer.start();
@@ -190,16 +142,4 @@ public void stop() throws ConnectException {
190142
client.close();
191143
}
192144
}
193-
194-
private Map<String, String> parseMapConfig(List<String> values) {
195-
Map<String, String> map = new HashMap<>();
196-
for (String value : values) {
197-
String[] parts = value.split(":");
198-
String topic = parts[0];
199-
String type = parts[1];
200-
map.put(topic, type);
201-
}
202-
return map;
203-
}
204-
205145
}

src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -532,10 +532,6 @@ public static String[] names() {
532532
return result;
533533
}
534534

535-
public static BehaviorOnMalformedDoc forValue(String value) {
536-
return valueOf(value.toUpperCase(Locale.ROOT));
537-
}
538-
539535
@Override
540536
public String toString() {
541537
return name().toLowerCase(Locale.ROOT);

src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import io.searchbox.indices.mapping.PutMapping;
5050
import org.apache.http.HttpHost;
5151
import org.apache.kafka.common.config.ConfigException;
52-
import org.apache.kafka.common.config.types.Password;
5352
import org.apache.kafka.connect.data.Schema;
5453
import org.apache.kafka.connect.errors.ConnectException;
5554
import org.slf4j.Logger;
@@ -126,26 +125,18 @@ public JestElasticsearchClient(Map<String, String> props) {
126125
protected JestElasticsearchClient(Map<String, String> props, JestClientFactory factory) {
127126
try {
128127
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
129-
final int connTimeout = config.getInt(
130-
ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
131-
final int readTimeout = config.getInt(
132-
ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);
133-
134-
final String username = config.getString(
135-
ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG);
136-
final Password password = config.getPassword(
137-
ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG);
138-
139-
List<String> address =
140-
config.getList(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
141-
HttpClientConfig.Builder builder = new HttpClientConfig.Builder(address)
142-
.connTimeout(connTimeout)
143-
.readTimeout(readTimeout)
128+
129+
Set<String> addresses = config.connectionUrls();
130+
HttpClientConfig.Builder builder = new HttpClientConfig.Builder(addresses)
131+
.connTimeout(config.connectionTimeoutMs())
132+
.readTimeout(config.readTimeoutMs())
144133
.multiThreaded(true);
145-
if (username != null && password != null) {
146-
builder.defaultCredentials(username, password.value())
147-
.preemptiveAuthTargetHosts(address.stream()
148-
.map(addr -> HttpHost.create(addr)).collect(Collectors.toSet()));
134+
if (config.isAuthenticatedConnection()) {
135+
builder
136+
.defaultCredentials(config.username(), config.password().value())
137+
.preemptiveAuthTargetHosts(
138+
addresses.stream().map(addr -> HttpHost.create(addr)).collect(Collectors.toSet())
139+
);
149140
}
150141
HttpClientConfig httpClientConfig = builder.build();
151142
factory.setHttpClientConfig(httpClientConfig);
Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.confluent.connect.elasticsearch;
22

3-
import org.junit.Assert;
3+
import static org.junit.Assert.assertEquals;
4+
45
import org.junit.Before;
56
import org.junit.Test;
67

@@ -16,34 +17,22 @@ public void setup() {
1617
props = new HashMap<>();
1718
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, ElasticsearchSinkTestBase.TYPE);
1819
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
19-
props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
20+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "true");
2021
}
2122

2223
@Test
2324
public void testDefaultHttpTimeoutsConfig() {
2425
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
25-
Assert.assertEquals(
26-
config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
27-
(Integer) 3000
28-
);
29-
Assert.assertEquals(
30-
config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
31-
(Integer) 1000
32-
);
26+
assertEquals(config.readTimeoutMs(), 3000);
27+
assertEquals(config.connectionTimeoutMs(), 1000);
3328
}
3429

3530
@Test
3631
public void testSetHttpTimeoutsConfig() {
3732
props.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, "10000");
3833
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, "15000");
3934
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
40-
Assert.assertEquals(
41-
config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
42-
(Integer) 10000
43-
);
44-
Assert.assertEquals(
45-
config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
46-
(Integer) 15000
47-
);
35+
assertEquals(config.readTimeoutMs(), 10000);
36+
assertEquals(config.connectionTimeoutMs(), 15000);
4837
}
4938
}

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,8 @@
2222
import org.apache.kafka.connect.data.Struct;
2323
import org.apache.kafka.connect.sink.SinkRecord;
2424
import org.elasticsearch.action.ActionFuture;
25-
import org.elasticsearch.action.ActionRequest;
26-
import org.elasticsearch.action.ActionRequestValidationException;
2725
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
2826
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
29-
import org.elasticsearch.action.index.IndexRequest;
3027
import org.elasticsearch.test.ESIntegTestCase;
3128
import org.elasticsearch.test.InternalTestCluster;
3229
import org.junit.Test;
@@ -56,7 +53,7 @@ private Map<String, String> createProps() {
5653
Map<String, String> props = new HashMap<>();
5754
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
5855
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
59-
props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
56+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "true");
6057
return props;
6158
}
6259

@@ -128,7 +125,7 @@ public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
128125
cluster.ensureAtLeastNumDataNodes(3);
129126
Map<String, String> props = createProps();
130127

131-
props.put(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG, "false");
128+
props.put(ElasticsearchSinkConnectorConfig.CREATE_INDICES_AT_START_CONFIG, "false");
132129

133130
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
134131

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchSinkTaskIT.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717

1818
import static org.junit.Assert.assertNotNull;
1919

20-
import com.google.gson.JsonObject;
2120
import io.confluent.common.utils.IntegrationTest;
2221
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
2322
import io.confluent.connect.elasticsearch.ElasticsearchSinkTask;
24-
import io.confluent.connect.elasticsearch.Mapping;
25-
import io.confluent.connect.elasticsearch.TestUtils;
2623
import java.util.ArrayList;
2724
import java.util.Arrays;
2825
import java.util.Collection;
29-
import java.util.Collections;
3026
import java.util.HashMap;
3127
import java.util.HashSet;
3228
import java.util.Map;
@@ -43,7 +39,7 @@ private Map<String, String> createProps() {
4339
Map<String, String> props = new HashMap<>();
4440
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
4541
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
46-
props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
42+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "true");
4743
return props;
4844
}
4945

0 commit comments

Comments
 (0)