Skip to content

Commit 2460cc8

Browse files
authored
Merge pull request #286 from wicknicks/merge-275-into-5.0.x
Merge 275 into 5.0.x
2 parents 9bab0d6 + 429d5d7 commit 2460cc8

File tree

5 files changed

+101
-16
lines changed

5 files changed

+101
-16
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
149149
+ " mapping conflict or a field name containing illegal characters. Valid options are "
150150
+ "'ignore', 'warn', and 'fail'.";
151151

152+
public static final String AUTO_CREATE_INDICES_AT_START_CONFIG = "auto.create.indices.at.start";
153+
private static final String AUTO_CREATE_INDICES_AT_START_DOC = "Auto create the Elasticsearch"
154+
+ " indices at startup. This is useful when the indices are a direct mapping "
155+
+ " of the Kafka topics.";
156+
152157
protected static ConfigDef baseConfigDef() {
153158
final ConfigDef configDef = new ConfigDef();
154159
addConnectorConfigs(configDef);
@@ -274,10 +279,21 @@ private static void addConnectorConfigs(ConfigDef configDef) {
274279
3000,
275280
Importance.LOW,
276281
READ_TIMEOUT_MS_CONFIG_DOC,
277-
group,
278-
++order,
279-
Width.SHORT,
280-
"Read Timeout");
282+
group,
283+
++order,
284+
Width.SHORT,
285+
"Read Timeout"
286+
).define(
287+
AUTO_CREATE_INDICES_AT_START_CONFIG,
288+
Type.BOOLEAN,
289+
true,
290+
Importance.LOW,
291+
AUTO_CREATE_INDICES_AT_START_DOC,
292+
group,
293+
++order,
294+
Width.SHORT,
295+
"Create indices at startup"
296+
);
281297
}
282298

283299
private static void addConversionConfigs(ConfigDef configDef) {

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class ElasticsearchSinkTask extends SinkTask {
4040
private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
4141
private ElasticsearchWriter writer;
4242
private ElasticsearchClient client;
43+
private Boolean createIndicesAtStartTime;
4344

4445
@Override
4546
public String version() {
@@ -91,6 +92,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
9192
config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
9293
boolean dropInvalidMessage =
9394
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);
95+
boolean createIndicesAtStartTime =
96+
config.getBoolean(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG);
9497

9598
DataConverter.BehaviorOnNullValues behaviorOnNullValues =
9699
DataConverter.BehaviorOnNullValues.forValue(
@@ -137,6 +140,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
137140
.setBehaviorOnNullValues(behaviorOnNullValues)
138141
.setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);
139142

143+
this.createIndicesAtStartTime = createIndicesAtStartTime;
144+
140145
writer = builder.build();
141146
writer.start();
142147
} catch (ConfigException e) {
@@ -150,11 +155,13 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
150155
@Override
151156
public void open(Collection<TopicPartition> partitions) {
152157
log.debug("Opening the task for topic partitions: {}", partitions);
153-
Set<String> topics = new HashSet<>();
154-
for (TopicPartition tp : partitions) {
155-
topics.add(tp.topic());
158+
if (createIndicesAtStartTime) {
159+
Set<String> topics = new HashSet<>();
160+
for (TopicPartition tp : partitions) {
161+
topics.add(tp.topic());
162+
}
163+
writer.createIndicesForTopics(topics);
156164
}
157-
writer.createIndicesForTopics(topics);
158165
}
159166

160167
@Override

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ public void write(Collection<SinkRecord> records) {
253253
final boolean ignoreSchema =
254254
ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;
255255

256+
client.createIndices(Collections.singleton(index));
257+
256258
if (!ignoreSchema && !existingMappings.contains(index)) {
257259
try {
258260
if (Mapping.getMapping(client, index, type) == null) {

src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2121
import com.fasterxml.jackson.databind.node.ObjectNode;
2222
import com.google.gson.JsonObject;
23-
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
2423
import io.confluent.connect.elasticsearch.ElasticsearchClient;
2524
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
2625
import io.confluent.connect.elasticsearch.IndexableRecord;
2726
import io.confluent.connect.elasticsearch.Key;
2827
import io.confluent.connect.elasticsearch.Mapping;
28+
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
2929
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
3030
import io.searchbox.action.Action;
3131
import io.searchbox.action.BulkableAction;
@@ -54,6 +54,7 @@
5454

5555
import java.io.IOException;
5656
import java.util.ArrayList;
57+
import java.util.HashSet;
5758
import java.util.List;
5859
import java.util.Map;
5960
import java.util.Set;
@@ -74,6 +75,8 @@ public class JestElasticsearchClient implements ElasticsearchClient {
7475
private final JestClient client;
7576
private final Version version;
7677

78+
private final Set<String> indexCache = new HashSet<>();
79+
7780
// visible for testing
7881
public JestElasticsearchClient(JestClient client) {
7982
try {
@@ -216,7 +219,10 @@ public Version getVersion() {
216219
}
217220

218221
private boolean indexExists(String index) {
219-
Action<JestResult> action = new IndicesExists.Builder(index).build();
222+
if (indexCache.contains(index)) {
223+
return true;
224+
}
225+
Action<?> action = new IndicesExists.Builder(index).build();
220226
try {
221227
JestResult result = client.execute(action);
222228
return result.isSucceeded();
@@ -238,6 +244,7 @@ public void createIndices(Set<String> indices) {
238244
throw new ConnectException("Could not create index '" + index + "'" + msg);
239245
}
240246
}
247+
indexCache.add(index);
241248
} catch (IOException e) {
242249
throw new ConnectException(e);
243250
}

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222
import org.apache.kafka.connect.data.Schema;
2323
import org.apache.kafka.connect.data.Struct;
2424
import org.apache.kafka.connect.sink.SinkRecord;
25+
import org.elasticsearch.action.ActionFuture;
26+
import org.elasticsearch.action.ActionRequest;
27+
import org.elasticsearch.action.ActionRequestValidationException;
28+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
29+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
30+
import org.elasticsearch.action.index.IndexRequest;
2531
import org.elasticsearch.test.ESIntegTestCase;
2632
import org.elasticsearch.test.InternalTestCluster;
2733
import org.junit.Test;
@@ -42,6 +48,11 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
4248
private static final int PARTITION_113 = 113;
4349
private static final TopicPartition TOPIC_IN_CAPS_PARTITION = new TopicPartition(TOPIC_IN_CAPS, PARTITION_113);
4450

51+
private static final String UNSEEN_TOPIC = "UnseenTopic";
52+
private static final int PARTITION_114 = 114;
53+
private static final TopicPartition UNSEEN_TOPIC_PARTITION = new TopicPartition(UNSEEN_TOPIC, PARTITION_114);
54+
55+
4556
private Map<String, String> createProps() {
4657
Map<String, String> props = new HashMap<>();
4758
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
@@ -94,12 +105,12 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
94105
Struct record = createRecord(schema);
95106

96107
SinkRecord sinkRecord = new SinkRecord(TOPIC_IN_CAPS,
97-
PARTITION_113,
98-
Schema.STRING_SCHEMA,
99-
key,
100-
schema,
101-
record,
102-
0 );
108+
PARTITION_113,
109+
Schema.STRING_SCHEMA,
110+
key,
111+
schema,
112+
record,
113+
0 );
103114

104115
try {
105116
task.start(props, client);
@@ -111,4 +122,46 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
111122
task.stop();
112123
}
113124
}
125+
126+
@Test
127+
public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
128+
InternalTestCluster cluster = ESIntegTestCase.internalCluster();
129+
cluster.ensureAtLeastNumDataNodes(3);
130+
Map<String, String> props = createProps();
131+
132+
props.put(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG, "false");
133+
134+
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
135+
136+
String key = "key";
137+
Schema schema = createSchema();
138+
Struct record = createRecord(schema);
139+
140+
SinkRecord sinkRecord = new SinkRecord(UNSEEN_TOPIC,
141+
PARTITION_114,
142+
Schema.STRING_SCHEMA,
143+
key,
144+
schema,
145+
record,
146+
0 );
147+
148+
task.start(props, client);
149+
task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
150+
task.put(Collections.singleton(sinkRecord));
151+
task.stop();
152+
153+
assertTrue(UNSEEN_TOPIC + " index created without errors ",
154+
verifyIndexExist(cluster, UNSEEN_TOPIC.toLowerCase()));
155+
156+
}
157+
158+
private boolean verifyIndexExist(InternalTestCluster cluster, String ... indices) {
159+
ActionFuture<IndicesExistsResponse> action = cluster
160+
.client()
161+
.admin()
162+
.indices()
163+
.exists(new IndicesExistsRequest(indices));
164+
165+
return action.actionGet().isExists();
166+
}
114167
}

0 commit comments

Comments
 (0)