Skip to content

Commit 52277a4

Browse files
authored
Merge pull request #275 from purbon/fix/remove_index_at_creation41x
Disable index creation at bootstrap time
2 parents 78b05e0 + 52f4444 commit 52277a4

File tree

5 files changed

+105
-21
lines changed

5 files changed

+105
-21
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
139139
+ " mapping conflict or a field name containing illegal characters. Valid options are "
140140
+ "'ignore', 'warn', and 'fail'.";
141141

142+
public static final String AUTO_CREATE_INDICES_AT_START_CONFIG = "auto.create.indices.at.start";
143+
private static final String AUTO_CREATE_INDICES_AT_START_DOC = "Auto create the Elasticsearch"
144+
+ " indices at startup. This is useful when the indices are a direct mapping "
145+
+ " of the Kafka topics.";
146+
142147
protected static ConfigDef baseConfigDef() {
143148
final ConfigDef configDef = new ConfigDef();
144149
addConnectorConfigs(configDef);
@@ -244,10 +249,21 @@ private static void addConnectorConfigs(ConfigDef configDef) {
244249
3000,
245250
Importance.LOW,
246251
READ_TIMEOUT_MS_CONFIG_DOC,
247-
group,
248-
++order,
249-
Width.SHORT,
250-
"Read Timeout");
252+
group,
253+
++order,
254+
Width.SHORT,
255+
"Read Timeout"
256+
).define(
257+
AUTO_CREATE_INDICES_AT_START_CONFIG,
258+
Type.BOOLEAN,
259+
true,
260+
Importance.LOW,
261+
AUTO_CREATE_INDICES_AT_START_DOC,
262+
group,
263+
++order,
264+
Width.SHORT,
265+
"Create indices at startup"
266+
);
251267
}
252268

253269
private static void addConversionConfigs(ConfigDef configDef) {

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class ElasticsearchSinkTask extends SinkTask {
4040
private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
4141
private ElasticsearchWriter writer;
4242
private ElasticsearchClient client;
43+
private Boolean createIndicesAtStartTime;
4344

4445
@Override
4546
public String version() {
@@ -90,6 +91,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
9091
config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
9192
boolean dropInvalidMessage =
9293
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);
94+
boolean createIndicesAtStartTime =
95+
config.getBoolean(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG);
9396

9497
DataConverter.BehaviorOnNullValues behaviorOnNullValues =
9598
DataConverter.BehaviorOnNullValues.forValue(
@@ -136,6 +139,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
136139
.setBehaviorOnNullValues(behaviorOnNullValues)
137140
.setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);
138141

142+
this.createIndicesAtStartTime = createIndicesAtStartTime;
143+
139144
writer = builder.build();
140145
writer.start();
141146
} catch (ConfigException e) {
@@ -149,11 +154,13 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
149154
@Override
150155
public void open(Collection<TopicPartition> partitions) {
151156
log.debug("Opening the task for topic partitions: {}", partitions);
152-
Set<String> topics = new HashSet<>();
153-
for (TopicPartition tp : partitions) {
154-
topics.add(tp.topic());
157+
if (createIndicesAtStartTime) {
158+
Set<String> topics = new HashSet<>();
159+
for (TopicPartition tp : partitions) {
160+
topics.add(tp.topic());
161+
}
162+
writer.createIndicesForTopics(topics);
155163
}
156-
writer.createIndicesForTopics(topics);
157164
}
158165

159166
@Override

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ public void write(Collection<SinkRecord> records) {
253253
final boolean ignoreSchema =
254254
ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;
255255

256+
client.createIndices(Collections.singleton(index));
257+
256258
if (!ignoreSchema && !existingMappings.contains(index)) {
257259
try {
258260
if (Mapping.getMapping(client, index, type) == null) {

src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2121
import com.fasterxml.jackson.databind.node.ObjectNode;
2222
import com.google.gson.JsonObject;
23-
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
2423
import io.confluent.connect.elasticsearch.ElasticsearchClient;
2524
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
2625
import io.confluent.connect.elasticsearch.IndexableRecord;
2726
import io.confluent.connect.elasticsearch.Key;
2827
import io.confluent.connect.elasticsearch.Mapping;
28+
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
2929
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
3030
import io.searchbox.action.Action;
3131
import io.searchbox.action.BulkableAction;
@@ -44,17 +44,17 @@
4444
import io.searchbox.indices.IndicesExists;
4545
import io.searchbox.indices.mapping.GetMapping;
4646
import io.searchbox.indices.mapping.PutMapping;
47-
import org.apache.kafka.common.config.ConfigException;
48-
import org.apache.kafka.connect.data.Schema;
49-
import org.apache.kafka.connect.errors.ConnectException;
50-
import org.slf4j.Logger;
51-
import org.slf4j.LoggerFactory;
52-
5347
import java.io.IOException;
5448
import java.util.ArrayList;
49+
import java.util.HashSet;
5550
import java.util.List;
5651
import java.util.Map;
5752
import java.util.Set;
53+
import org.apache.kafka.common.config.ConfigException;
54+
import org.apache.kafka.connect.data.Schema;
55+
import org.apache.kafka.connect.errors.ConnectException;
56+
import org.slf4j.Logger;
57+
import org.slf4j.LoggerFactory;
5858

5959
public class JestElasticsearchClient implements ElasticsearchClient {
6060

@@ -71,6 +71,8 @@ public class JestElasticsearchClient implements ElasticsearchClient {
7171
private final JestClient client;
7272
private final Version version;
7373

74+
private final Set<String> indexCache = new HashSet<>();
75+
7476
// visible for testing
7577
public JestElasticsearchClient(JestClient client) {
7678
try {
@@ -187,6 +189,9 @@ public Version getVersion() {
187189
}
188190

189191
private boolean indexExists(String index) {
192+
if (indexCache.contains(index)) {
193+
return true;
194+
}
190195
Action action = new IndicesExists.Builder(index).build();
191196
try {
192197
JestResult result = client.execute(action);
@@ -209,6 +214,7 @@ public void createIndices(Set<String> indices) {
209214
throw new ConnectException("Could not create index '" + index + "'" + msg);
210215
}
211216
}
217+
indexCache.add(index);
212218
} catch (IOException e) {
213219
throw new ConnectException(e);
214220
}

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222
import org.apache.kafka.connect.data.Schema;
2323
import org.apache.kafka.connect.data.Struct;
2424
import org.apache.kafka.connect.sink.SinkRecord;
25+
import org.elasticsearch.action.ActionFuture;
26+
import org.elasticsearch.action.ActionRequest;
27+
import org.elasticsearch.action.ActionRequestValidationException;
28+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
29+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
30+
import org.elasticsearch.action.index.IndexRequest;
2531
import org.elasticsearch.test.ESIntegTestCase;
2632
import org.elasticsearch.test.InternalTestCluster;
2733
import org.junit.Test;
@@ -42,6 +48,11 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
4248
private static final int PARTITION_113 = 113;
4349
private static final TopicPartition TOPIC_IN_CAPS_PARTITION = new TopicPartition(TOPIC_IN_CAPS, PARTITION_113);
4450

51+
private static final String UNSEEN_TOPIC = "UnseenTopic";
52+
private static final int PARTITION_114 = 114;
53+
private static final TopicPartition UNSEEN_TOPIC_PARTITION = new TopicPartition(UNSEEN_TOPIC, PARTITION_114);
54+
55+
4556
private Map<String, String> createProps() {
4657
Map<String, String> props = new HashMap<>();
4758
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
@@ -94,12 +105,12 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
94105
Struct record = createRecord(schema);
95106

96107
SinkRecord sinkRecord = new SinkRecord(TOPIC_IN_CAPS,
97-
PARTITION_113,
98-
Schema.STRING_SCHEMA,
99-
key,
100-
schema,
101-
record,
102-
0 );
108+
PARTITION_113,
109+
Schema.STRING_SCHEMA,
110+
key,
111+
schema,
112+
record,
113+
0 );
103114

104115
try {
105116
task.start(props, client);
@@ -111,4 +122,46 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
111122
task.stop();
112123
}
113124
}
125+
126+
@Test
127+
public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
128+
InternalTestCluster cluster = ESIntegTestCase.internalCluster();
129+
cluster.ensureAtLeastNumDataNodes(3);
130+
Map<String, String> props = createProps();
131+
132+
props.put(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG, "false");
133+
134+
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
135+
136+
String key = "key";
137+
Schema schema = createSchema();
138+
Struct record = createRecord(schema);
139+
140+
SinkRecord sinkRecord = new SinkRecord(UNSEEN_TOPIC,
141+
PARTITION_114,
142+
Schema.STRING_SCHEMA,
143+
key,
144+
schema,
145+
record,
146+
0 );
147+
148+
task.start(props, client);
149+
task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
150+
task.put(Collections.singleton(sinkRecord));
151+
task.stop();
152+
153+
assertTrue(UNSEEN_TOPIC + " index created without errors ",
154+
verifyIndexExist(cluster, UNSEEN_TOPIC.toLowerCase()));
155+
156+
}
157+
158+
private boolean verifyIndexExist(InternalTestCluster cluster, String ... indices) {
159+
ActionFuture<IndicesExistsResponse> action = cluster
160+
.client()
161+
.admin()
162+
.indices()
163+
.exists(new IndicesExistsRequest(indices));
164+
165+
return action.actionGet().isExists();
166+
}
114167
}

0 commit comments

Comments
 (0)