Skip to content

Commit 00b265f

Browse files
committed
Merge branch '5.0.x' into 5.1.x
2 parents e867ddb + 2460cc8 commit 00b265f

File tree

5 files changed

+101
-16
lines changed

5 files changed

+101
-16
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
148148
+ " mapping conflict or a field name containing illegal characters. Valid options are "
149149
+ "'ignore', 'warn', and 'fail'.";
150150

151+
public static final String AUTO_CREATE_INDICES_AT_START_CONFIG = "auto.create.indices.at.start";
152+
private static final String AUTO_CREATE_INDICES_AT_START_DOC = "Auto create the Elasticsearch"
153+
+ " indices at startup. This is useful when the indices are a direct mapping "
154+
+ " of the Kafka topics.";
155+
151156
protected static ConfigDef baseConfigDef() {
152157
final ConfigDef configDef = new ConfigDef();
153158
addConnectorConfigs(configDef);
@@ -273,10 +278,21 @@ private static void addConnectorConfigs(ConfigDef configDef) {
273278
3000,
274279
Importance.LOW,
275280
READ_TIMEOUT_MS_CONFIG_DOC,
276-
group,
277-
++order,
278-
Width.SHORT,
279-
"Read Timeout");
281+
group,
282+
++order,
283+
Width.SHORT,
284+
"Read Timeout"
285+
).define(
286+
AUTO_CREATE_INDICES_AT_START_CONFIG,
287+
Type.BOOLEAN,
288+
true,
289+
Importance.LOW,
290+
AUTO_CREATE_INDICES_AT_START_DOC,
291+
group,
292+
++order,
293+
Width.SHORT,
294+
"Create indices at startup"
295+
);
280296
}
281297

282298
private static void addConversionConfigs(ConfigDef configDef) {

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class ElasticsearchSinkTask extends SinkTask {
3939
private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
4040
private ElasticsearchWriter writer;
4141
private ElasticsearchClient client;
42+
private Boolean createIndicesAtStartTime;
4243

4344
@Override
4445
public String version() {
@@ -90,6 +91,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
9091
config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
9192
boolean dropInvalidMessage =
9293
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);
94+
boolean createIndicesAtStartTime =
95+
config.getBoolean(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG);
9396

9497
DataConverter.BehaviorOnNullValues behaviorOnNullValues =
9598
DataConverter.BehaviorOnNullValues.forValue(
@@ -136,6 +139,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
136139
.setBehaviorOnNullValues(behaviorOnNullValues)
137140
.setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);
138141

142+
this.createIndicesAtStartTime = createIndicesAtStartTime;
143+
139144
writer = builder.build();
140145
writer.start();
141146
} catch (ConfigException e) {
@@ -149,11 +154,13 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
149154
@Override
150155
public void open(Collection<TopicPartition> partitions) {
151156
log.debug("Opening the task for topic partitions: {}", partitions);
152-
Set<String> topics = new HashSet<>();
153-
for (TopicPartition tp : partitions) {
154-
topics.add(tp.topic());
157+
if (createIndicesAtStartTime) {
158+
Set<String> topics = new HashSet<>();
159+
for (TopicPartition tp : partitions) {
160+
topics.add(tp.topic());
161+
}
162+
writer.createIndicesForTopics(topics);
155163
}
156-
writer.createIndicesForTopics(topics);
157164
}
158165

159166
@Override

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ public void write(Collection<SinkRecord> records) {
252252
final boolean ignoreSchema =
253253
ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;
254254

255+
client.createIndices(Collections.singleton(index));
256+
255257
if (!ignoreSchema && !existingMappings.contains(index)) {
256258
try {
257259
if (Mapping.getMapping(client, index, type) == null) {

src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
2020
import com.fasterxml.jackson.databind.node.ObjectNode;
2121
import com.google.gson.JsonObject;
22-
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
2322
import io.confluent.connect.elasticsearch.ElasticsearchClient;
2423
import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
2524
import io.confluent.connect.elasticsearch.IndexableRecord;
2625
import io.confluent.connect.elasticsearch.Key;
2726
import io.confluent.connect.elasticsearch.Mapping;
27+
import io.confluent.connect.elasticsearch.bulk.BulkRequest;
2828
import io.confluent.connect.elasticsearch.bulk.BulkResponse;
2929
import io.searchbox.action.Action;
3030
import io.searchbox.action.BulkableAction;
@@ -53,6 +53,7 @@
5353

5454
import java.io.IOException;
5555
import java.util.ArrayList;
56+
import java.util.HashSet;
5657
import java.util.List;
5758
import java.util.Map;
5859
import java.util.Set;
@@ -73,6 +74,8 @@ public class JestElasticsearchClient implements ElasticsearchClient {
7374
private final JestClient client;
7475
private final Version version;
7576

77+
private final Set<String> indexCache = new HashSet<>();
78+
7679
// visible for testing
7780
public JestElasticsearchClient(JestClient client) {
7881
try {
@@ -215,7 +218,10 @@ public Version getVersion() {
215218
}
216219

217220
private boolean indexExists(String index) {
218-
Action<JestResult> action = new IndicesExists.Builder(index).build();
221+
if (indexCache.contains(index)) {
222+
return true;
223+
}
224+
Action<?> action = new IndicesExists.Builder(index).build();
219225
try {
220226
JestResult result = client.execute(action);
221227
return result.isSucceeded();
@@ -237,6 +243,7 @@ public void createIndices(Set<String> indices) {
237243
throw new ConnectException("Could not create index '" + index + "'" + msg);
238244
}
239245
}
246+
indexCache.add(index);
240247
} catch (IOException e) {
241248
throw new ConnectException(e);
242249
}

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@
2121
import org.apache.kafka.connect.data.Schema;
2222
import org.apache.kafka.connect.data.Struct;
2323
import org.apache.kafka.connect.sink.SinkRecord;
24+
import org.elasticsearch.action.ActionFuture;
25+
import org.elasticsearch.action.ActionRequest;
26+
import org.elasticsearch.action.ActionRequestValidationException;
27+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
28+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
29+
import org.elasticsearch.action.index.IndexRequest;
2430
import org.elasticsearch.test.ESIntegTestCase;
2531
import org.elasticsearch.test.InternalTestCluster;
2632
import org.junit.Test;
@@ -41,6 +47,11 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
4147
private static final int PARTITION_113 = 113;
4248
private static final TopicPartition TOPIC_IN_CAPS_PARTITION = new TopicPartition(TOPIC_IN_CAPS, PARTITION_113);
4349

50+
private static final String UNSEEN_TOPIC = "UnseenTopic";
51+
private static final int PARTITION_114 = 114;
52+
private static final TopicPartition UNSEEN_TOPIC_PARTITION = new TopicPartition(UNSEEN_TOPIC, PARTITION_114);
53+
54+
4455
private Map<String, String> createProps() {
4556
Map<String, String> props = new HashMap<>();
4657
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
@@ -93,12 +104,12 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
93104
Struct record = createRecord(schema);
94105

95106
SinkRecord sinkRecord = new SinkRecord(TOPIC_IN_CAPS,
96-
PARTITION_113,
97-
Schema.STRING_SCHEMA,
98-
key,
99-
schema,
100-
record,
101-
0 );
107+
PARTITION_113,
108+
Schema.STRING_SCHEMA,
109+
key,
110+
schema,
111+
record,
112+
0 );
102113

103114
try {
104115
task.start(props, client);
@@ -110,4 +121,46 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
110121
task.stop();
111122
}
112123
}
124+
125+
@Test
126+
public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
127+
InternalTestCluster cluster = ESIntegTestCase.internalCluster();
128+
cluster.ensureAtLeastNumDataNodes(3);
129+
Map<String, String> props = createProps();
130+
131+
props.put(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG, "false");
132+
133+
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
134+
135+
String key = "key";
136+
Schema schema = createSchema();
137+
Struct record = createRecord(schema);
138+
139+
SinkRecord sinkRecord = new SinkRecord(UNSEEN_TOPIC,
140+
PARTITION_114,
141+
Schema.STRING_SCHEMA,
142+
key,
143+
schema,
144+
record,
145+
0 );
146+
147+
task.start(props, client);
148+
task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
149+
task.put(Collections.singleton(sinkRecord));
150+
task.stop();
151+
152+
assertTrue(UNSEEN_TOPIC + " index created without errors ",
153+
verifyIndexExist(cluster, UNSEEN_TOPIC.toLowerCase()));
154+
155+
}
156+
157+
private boolean verifyIndexExist(InternalTestCluster cluster, String ... indices) {
158+
ActionFuture<IndicesExistsResponse> action = cluster
159+
.client()
160+
.admin()
161+
.indices()
162+
.exists(new IndicesExistsRequest(indices));
163+
164+
return action.actionGet().isExists();
165+
}
113166
}

0 commit comments

Comments
 (0)