Skip to content

Commit 28feb9d

Browse files
author
Lev Zemlyanov
committed
Merge branch '5.3.x' into 5.4.x
2 parents d9d9e68 + 72c5314 commit 28feb9d

File tree

8 files changed

+557
-468
lines changed

8 files changed

+557
-468
lines changed

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -428,10 +428,6 @@ public static String[] names() {
428428
return result;
429429
}
430430

431-
public static BehaviorOnNullValues forValue(String value) {
432-
return valueOf(value.toUpperCase(Locale.ROOT));
433-
}
434-
435431
@Override
436432
public String toString() {
437433
return name().toLowerCase(Locale.ROOT);

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 500 additions & 313 deletions
Large diffs are not rendered by default.

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 21 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
package io.confluent.connect.elasticsearch;
1717

18-
import io.confluent.connect.elasticsearch.bulk.BulkProcessor;
1918
import io.confluent.connect.elasticsearch.jest.JestElasticsearchClient;
2019
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2120
import org.apache.kafka.common.TopicPartition;
@@ -27,9 +26,7 @@
2726
import org.slf4j.LoggerFactory;
2827

2928
import java.util.Collection;
30-
import java.util.HashMap;
3129
import java.util.HashSet;
32-
import java.util.List;
3330
import java.util.Map;
3431
import java.util.Set;
3532
import java.util.concurrent.TimeUnit;
@@ -58,61 +55,16 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
5855
log.info("Starting ElasticsearchSinkTask");
5956

6057
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
61-
String type = config.getString(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG);
62-
boolean ignoreKey =
63-
config.getBoolean(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG);
64-
boolean ignoreSchema =
65-
config.getBoolean(ElasticsearchSinkConnectorConfig.SCHEMA_IGNORE_CONFIG);
66-
boolean useCompactMapEntries =
67-
config.getBoolean(ElasticsearchSinkConnectorConfig.COMPACT_MAP_ENTRIES_CONFIG);
68-
69-
70-
Map<String, String> topicToIndexMap =
71-
parseMapConfig(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_INDEX_MAP_CONFIG));
72-
Set<String> topicIgnoreKey =
73-
new HashSet<>(config.getList(ElasticsearchSinkConnectorConfig.TOPIC_KEY_IGNORE_CONFIG));
74-
Set<String> topicIgnoreSchema = new HashSet<>(
75-
config.getList(ElasticsearchSinkConnectorConfig.TOPIC_SCHEMA_IGNORE_CONFIG)
76-
);
77-
78-
long flushTimeoutMs =
79-
config.getLong(ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG);
80-
int maxBufferedRecords =
81-
config.getInt(ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG);
82-
int batchSize =
83-
config.getInt(ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG);
84-
long lingerMs =
85-
config.getLong(ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG);
86-
int maxInFlightRequests =
87-
config.getInt(ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
88-
long retryBackoffMs =
89-
config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
90-
int maxRetry =
91-
config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
92-
boolean dropInvalidMessage =
93-
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);
94-
boolean createIndicesAtStartTime =
95-
config.getBoolean(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG);
96-
97-
DataConverter.BehaviorOnNullValues behaviorOnNullValues =
98-
DataConverter.BehaviorOnNullValues.forValue(
99-
config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG)
100-
);
101-
102-
BulkProcessor.BehaviorOnMalformedDoc behaviorOnMalformedDoc =
103-
BulkProcessor.BehaviorOnMalformedDoc.forValue(
104-
config.getString(ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG)
105-
);
10658

10759
// Calculate the maximum possible backoff time ...
10860
long maxRetryBackoffMs =
109-
RetryUtil.computeRetryWaitTimeInMillis(maxRetry, retryBackoffMs);
61+
RetryUtil.computeRetryWaitTimeInMillis(config.maxRetries(), config.retryBackoffMs());
11062
if (maxRetryBackoffMs > RetryUtil.MAX_RETRY_TIME_MS) {
11163
log.warn("This connector uses exponential backoff with jitter for retries, "
11264
+ "and using '{}={}' and '{}={}' results in an impractical but possible maximum "
11365
+ "backoff time greater than {} hours.",
114-
ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, maxRetry,
115-
ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
66+
ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG, config.maxRetries(),
67+
ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs(),
11668
TimeUnit.MILLISECONDS.toHours(maxRetryBackoffMs));
11769
}
11870

@@ -123,29 +75,29 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
12375
}
12476

12577
ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
126-
.setType(type)
127-
.setIgnoreKey(ignoreKey, topicIgnoreKey)
128-
.setIgnoreSchema(ignoreSchema, topicIgnoreSchema)
129-
.setCompactMapEntries(useCompactMapEntries)
130-
.setTopicToIndexMap(topicToIndexMap)
131-
.setFlushTimoutMs(flushTimeoutMs)
132-
.setMaxBufferedRecords(maxBufferedRecords)
133-
.setMaxInFlightRequests(maxInFlightRequests)
134-
.setBatchSize(batchSize)
135-
.setLingerMs(lingerMs)
136-
.setRetryBackoffMs(retryBackoffMs)
137-
.setMaxRetry(maxRetry)
138-
.setDropInvalidMessage(dropInvalidMessage)
139-
.setBehaviorOnNullValues(behaviorOnNullValues)
140-
.setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);
141-
142-
this.createIndicesAtStartTime = createIndicesAtStartTime;
78+
.setType(config.type())
79+
.setIgnoreKey(config.ignoreKey(), config.ignoreKeyTopics())
80+
.setIgnoreSchema(config.ignoreSchema(), config.ignoreSchemaTopics())
81+
.setCompactMapEntries(config.useCompactMapEntries())
82+
.setTopicToIndexMap(config.topicToIndexMap())
83+
.setFlushTimoutMs(config.flushTimeoutMs())
84+
.setMaxBufferedRecords(config.maxBufferedRecords())
85+
.setMaxInFlightRequests(config.maxInFlightRequests())
86+
.setBatchSize(config.batchSize())
87+
.setLingerMs(config.lingerMs())
88+
.setRetryBackoffMs(config.retryBackoffMs())
89+
.setMaxRetry(config.maxRetries())
90+
.setDropInvalidMessage(config.dropInvalidMessage())
91+
.setBehaviorOnNullValues(config.behaviorOnNullValues())
92+
.setBehaviorOnMalformedDoc(config.behaviorOnMalformedDoc());
93+
94+
this.createIndicesAtStartTime = config.createIndicesAtStart();
14395

14496
writer = builder.build();
14597
writer.start();
14698
log.info(
14799
"Started ElasticsearchSinkTask, will {} records with null values ('{}')",
148-
behaviorOnNullValues,
100+
config.behaviorOnNullValues().name(),
149101
ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG
150102
);
151103
} catch (ConfigException e) {
@@ -195,16 +147,4 @@ public void stop() throws ConnectException {
195147
client.close();
196148
}
197149
}
198-
199-
private Map<String, String> parseMapConfig(List<String> values) {
200-
Map<String, String> map = new HashMap<>();
201-
for (String value : values) {
202-
String[] parts = value.split(":");
203-
String topic = parts[0];
204-
String type = parts[1];
205-
map.put(topic, type);
206-
}
207-
return map;
208-
}
209-
210150
}

src/main/java/io/confluent/connect/elasticsearch/bulk/BulkProcessor.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -584,10 +584,6 @@ public static String[] names() {
584584
return result;
585585
}
586586

587-
public static BehaviorOnMalformedDoc forValue(String value) {
588-
return valueOf(value.toUpperCase(Locale.ROOT));
589-
}
590-
591587
@Override
592588
public String toString() {
593589
return name().toLowerCase(Locale.ROOT);

src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 21 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
5757
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
5858
import org.apache.kafka.common.config.ConfigException;
59-
import org.apache.kafka.common.config.types.Password;
6059
import org.apache.kafka.common.network.Mode;
6160
import org.apache.kafka.common.security.ssl.SslFactory;
6261
import org.apache.kafka.common.utils.SystemTime;
@@ -149,12 +148,10 @@ protected JestElasticsearchClient(Map<String, String> props, JestClientFactory f
149148
factory.setHttpClientConfig(getClientConfig(config));
150149
this.client = factory.getObject();
151150
this.version = getServerVersion();
152-
this.writeMethod = WriteMethod.forValue(
153-
config.getString(ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG));
154-
this.retryBackoffMs =
155-
config.getLong(ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG);
156-
this.maxRetries = config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
157-
this.timeout = config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);
151+
this.writeMethod = config.writeMethod();
152+
this.retryBackoffMs = config.retryBackoffMs();
153+
this.maxRetries = config.maxRetries();
154+
this.timeout = config.readTimeoutMs();
158155
} catch (IOException e) {
159156
throw new ConnectException(
160157
"Couldn't start ElasticsearchSinkTask due to connection error:",
@@ -170,44 +167,35 @@ protected JestElasticsearchClient(Map<String, String> props, JestClientFactory f
170167

171168
// Visible for Testing
172169
public static HttpClientConfig getClientConfig(ElasticsearchSinkConnectorConfig config) {
173-
final int connTimeout = config.getInt(
174-
ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
175-
final int readTimeout = config.getInt(
176-
ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG);
177-
178-
final String username = config.getString(
179-
ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG);
180-
final Password password = config.getPassword(
181-
ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG);
182-
List<String> address = config.getList(
183-
ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG);
184-
185-
final int maxInFlightRequests = config.getInt(
186-
ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG);
187170

171+
Set<String> addresses = config.connectionUrls();
188172
HttpClientConfig.Builder builder =
189-
new HttpClientConfig.Builder(address)
190-
.connTimeout(connTimeout)
191-
.readTimeout(readTimeout)
192-
.defaultMaxTotalConnectionPerRoute(maxInFlightRequests)
173+
new HttpClientConfig.Builder(addresses)
174+
.connTimeout(config.connectionTimeoutMs())
175+
.readTimeout(config.readTimeoutMs())
176+
.defaultMaxTotalConnectionPerRoute(config.maxInFlightRequests())
193177
.multiThreaded(true);
194-
if (username != null && password != null) {
195-
builder.defaultCredentials(username, password.value())
196-
.preemptiveAuthTargetHosts(address.stream()
197-
.map(addr -> HttpHost.create(addr)).collect(Collectors.toSet()));
178+
if (config.isAuthenticatedConnection()) {
179+
builder.defaultCredentials(config.username(), config.password().value())
180+
.preemptiveAuthTargetHosts(
181+
addresses.stream().map(addr -> HttpHost.create(addr)).collect(Collectors.toSet())
182+
);
198183
}
199184

200185
if (config.secured()) {
201-
log.info("Using secured connection to {}", address);
186+
log.info("Using secured connection to {}", addresses);
202187
configureSslContext(builder, config);
203188
} else {
204-
log.info("Using unsecured connection to {}", address);
189+
log.info("Using unsecured connection to {}", addresses);
205190
}
206191
return builder.build();
207192
}
208193

209-
private static void configureSslContext(HttpClientConfig.Builder builder,
210-
ElasticsearchSinkConnectorConfig config) {
194+
private static void configureSslContext(
195+
HttpClientConfig.Builder builder,
196+
ElasticsearchSinkConnectorConfig config
197+
) {
198+
211199
SslFactory kafkaSslFactory = new SslFactory(Mode.CLIENT, null, false);
212200
kafkaSslFactory.configure(config.sslConfigs());
213201

@@ -649,10 +637,6 @@ public static String[] names() {
649637
return new String[] {INSERT.toString(), UPSERT.toString()};
650638
}
651639

652-
public static WriteMethod forValue(String value) {
653-
return valueOf(value.toUpperCase(Locale.ROOT));
654-
}
655-
656640
@Override
657641
public String toString() {
658642
return name().toLowerCase(Locale.ROOT);

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfigTest.java

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package io.confluent.connect.elasticsearch;
22

3+
import static org.junit.Assert.assertEquals;
4+
35
import org.apache.kafka.common.config.types.Password;
4-
import org.junit.Assert;
56
import org.junit.Before;
67
import org.junit.Test;
78

@@ -20,35 +21,23 @@ public void setup() {
2021
props = new HashMap<>();
2122
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, ElasticsearchSinkTestBase.TYPE);
2223
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
23-
props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
24+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "true");
2425
}
2526

2627
@Test
2728
public void testDefaultHttpTimeoutsConfig() {
2829
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
29-
Assert.assertEquals(
30-
config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
31-
(Integer) 3000
32-
);
33-
Assert.assertEquals(
34-
config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
35-
(Integer) 1000
36-
);
30+
assertEquals(config.readTimeoutMs(), 3000);
31+
assertEquals(config.connectionTimeoutMs(), 1000);
3732
}
3833

3934
@Test
4035
public void testSetHttpTimeoutsConfig() {
4136
props.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, "10000");
4237
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, "15000");
4338
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
44-
Assert.assertEquals(
45-
config.getInt(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG),
46-
(Integer) 10000
47-
);
48-
Assert.assertEquals(
49-
config.getInt(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG),
50-
(Integer) 15000
51-
);
39+
assertEquals(config.readTimeoutMs(), 10000);
40+
assertEquals(config.connectionTimeoutMs(), 15000);
5241
}
5342

5443
@Test
@@ -59,17 +48,17 @@ public void testSslConfigs() {
5948
props.put("elastic.https.ssl.truststore.password", "opensesame2");
6049
ElasticsearchSinkConnectorConfig config = new ElasticsearchSinkConnectorConfig(props);
6150
Map<String, Object> sslConfigs = config.sslConfigs();
62-
Assert.assertTrue(sslConfigs.size() > 0);
63-
Assert.assertEquals(
51+
assertTrue(sslConfigs.size() > 0);
52+
assertEquals(
6453
new Password("opensesame"),
6554
sslConfigs.get("ssl.keystore.password")
6655
);
67-
Assert.assertEquals(
56+
assertEquals(
6857
new Password("opensesame2"),
6958
sslConfigs.get("ssl.truststore.password")
7059
);
71-
Assert.assertEquals("/path", sslConfigs.get("ssl.keystore.location"));
72-
Assert.assertEquals("/path2", sslConfigs.get("ssl.truststore.location"));
60+
assertEquals("/path", sslConfigs.get("ssl.keystore.location"));
61+
assertEquals("/path2", sslConfigs.get("ssl.truststore.location"));
7362
}
7463

7564
@Test

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,8 @@
2222
import org.apache.kafka.connect.data.Struct;
2323
import org.apache.kafka.connect.sink.SinkRecord;
2424
import org.elasticsearch.action.ActionFuture;
25-
import org.elasticsearch.action.ActionRequest;
26-
import org.elasticsearch.action.ActionRequestValidationException;
2725
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
2826
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
29-
import org.elasticsearch.action.index.IndexRequest;
3027
import org.elasticsearch.test.ESIntegTestCase;
3128
import org.elasticsearch.test.InternalTestCluster;
3229
import org.junit.Test;
@@ -56,7 +53,7 @@ private Map<String, String> createProps() {
5653
Map<String, String> props = new HashMap<>();
5754
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
5855
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, "localhost");
59-
props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
56+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "true");
6057
props.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, "3000");
6158
return props;
6259
}
@@ -129,7 +126,7 @@ public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
129126
cluster.ensureAtLeastNumDataNodes(3);
130127
Map<String, String> props = createProps();
131128

132-
props.put(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG, "false");
129+
props.put(ElasticsearchSinkConnectorConfig.CREATE_INDICES_AT_START_CONFIG, "false");
133130

134131
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
135132

src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchSinkTaskIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private Map<String, String> createProps() {
5757
Map<String, String> props = new HashMap<>();
5858
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
5959
props.put(ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG, container.getConnectionUrl());
60-
props.put(ElasticsearchSinkConnectorConfig.KEY_IGNORE_CONFIG, "true");
60+
props.put(ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG, "true");
6161
return props;
6262
}
6363

0 commit comments

Comments
 (0)