Commit 3c5b8d0

* Apply general review feedback on naming and conventions
* Update "indexes" to "indices" per a review comment by @rayokota
* Introduce a local in-memory cache to avoid repeated indexExists calls to Elasticsearch (see the sketch below)
* Use a HashSet as the cache rather than a HashMap
* General code style cleanup
1 parent ce2c11a commit 3c5b8d0
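
Since the cache is the one behavioral change in this commit, a minimal sketch of the pattern follows. It is illustrative only: ElasticsearchGateway and its two methods are hypothetical stand-ins for the Jest IndicesExists and CreateIndex round trips inside JestElasticsearchClient; the HashSet cache and the shapes of indexExists and createIndices mirror the actual diff below.

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only: ElasticsearchGateway is a hypothetical seam
// standing in for the Jest actions issued by JestElasticsearchClient.
public class CachedIndexClient {

  // Indices known to exist. Only membership matters, which is why a
  // HashSet is enough and a HashMap would carry dead weight.
  private final Set<String> indexCache = new HashSet<>();
  private final ElasticsearchGateway gateway;

  public CachedIndexClient(ElasticsearchGateway gateway) {
    this.gateway = gateway;
  }

  // Mirrors indexExists() in the diff: a cache hit skips the network call.
  public boolean indexExists(String index) {
    if (indexCache.contains(index)) {
      return true;
    }
    return gateway.existsOnCluster(index);
  }

  // Mirrors createIndices() in the diff: once an index has been created
  // (or found to exist), remember it so later checks short-circuit.
  public void createIndices(Set<String> indices) {
    for (String index : indices) {
      if (!indexExists(index)) {
        gateway.createOnCluster(index);
      }
      indexCache.add(index);
    }
  }

  // Hypothetical interface; the real client executes Jest actions here.
  public interface ElasticsearchGateway {
    boolean existsOnCluster(String index);
    void createOnCluster(String index);
  }
}

The set only ever grows and is never invalidated, so an index deleted out of band would still be reported as present until the task restarts; in exchange, every existence check after the first becomes a local set lookup instead of an HTTP round trip.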

4 files changed (+57 additions, -33 deletions)

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 7 additions & 6 deletions
@@ -139,9 +139,9 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
       + " mapping conflict or a field name containing illegal characters. Valid options are "
       + "'ignore', 'warn', and 'fail'.";
 
-  public static final String CREATE_INDEX_AT_START_CONFIG = "create.index.at.start";
-  private static final String CREATE_INDEX_AT_START__DOC = "Create the Elasticsearch indexes at "
-      + " bootstrap time. This is useful when the indexes are a direct mapping "
+  public static final String AUTO_CREATE_INDICES_AT_START_CONFIG = "auto.create.indices.at.start";
+  private static final String AUTO_CREATE_INDICES_AT_START_DOC = "Auto create the Elasticsearch"
+      + " indices at bootstrap time. This is useful when the indices are a direct mapping "
       + " of the Kafka topics.";
 
   protected static ConfigDef baseConfigDef() {
@@ -254,15 +254,16 @@ private static void addConnectorConfigs(ConfigDef configDef) {
         Width.SHORT,
         "Read Timeout"
     ).define(
-        CREATE_INDEX_AT_START_CONFIG,
+        AUTO_CREATE_INDICES_AT_START_CONFIG,
         Type.BOOLEAN,
         true,
         Importance.LOW,
-        CREATE_INDEX_AT_START__DOC,
+        AUTO_CREATE_INDICES_AT_START_DOC,
         group,
         ++order,
         Width.SHORT,
-        "Create indexes at bootstrap time");
+        "Create indices at bootstrap time"
+    );
   }
 
   private static void addConversionConfigs(ConfigDef configDef) {
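
For reference, a sink configuration that opts out of index auto-creation could look like the properties snippet below. Only auto.create.indices.at.start comes from this commit; the other keys and values are ordinary connector boilerplate, assumed here purely for context:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=orders
connection.url=http://localhost:9200
type.name=kafka-connect
# Renamed in this commit from create.index.at.start; still defaults to true.
auto.create.indices.at.start=false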

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 5 additions & 5 deletions
@@ -40,7 +40,7 @@ public class ElasticsearchSinkTask extends SinkTask {
   private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
   private ElasticsearchWriter writer;
   private ElasticsearchClient client;
-  private Boolean createIndexAtStartTime;
+  private Boolean createIndicesAtStartTime;
 
   @Override
   public String version() {

@@ -91,8 +91,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
         config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
     boolean dropInvalidMessage =
         config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);
-    boolean createIndexAtStartTime =
-        config.getBoolean(ElasticsearchSinkConnectorConfig.CREATE_INDEX_AT_START_CONFIG);
+    boolean createIndicesAtStartTime =
+        config.getBoolean(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG);
 
     DataConverter.BehaviorOnNullValues behaviorOnNullValues =
         DataConverter.BehaviorOnNullValues.forValue(

@@ -139,7 +139,7 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
         .setBehaviorOnNullValues(behaviorOnNullValues)
         .setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);
 
-    this.createIndexAtStartTime = createIndexAtStartTime;
+    this.createIndicesAtStartTime = createIndicesAtStartTime;
 
     writer = builder.build();
     writer.start();

@@ -154,7 +154,7 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
   @Override
   public void open(Collection<TopicPartition> partitions) {
     log.debug("Opening the task for topic partitions: {}", partitions);
-    if (createIndexAtStartTime) {
+    if (createIndicesAtStartTime) {
       Set<String> topics = new HashSet<>();
       for (TopicPartition tp : partitions) {
         topics.add(tp.topic());
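
The last hunk above is cut off by the diff context window, so here is a sketch of how the guarded open() plausibly continues: gather the distinct topics of the assigned partitions and ask the writer to create the matching indices. The createIndicesForTopics name is a guess, not taken from the diff:

  @Override
  public void open(Collection<TopicPartition> partitions) {
    log.debug("Opening the task for topic partitions: {}", partitions);
    if (createIndicesAtStartTime) {
      // One Elasticsearch index per Kafka topic, so the distinct topic
      // names of the assignment are all that is needed.
      Set<String> topics = new HashSet<>();
      for (TopicPartition tp : partitions) {
        topics.add(tp.topic());
      }
      writer.createIndicesForTopics(topics); // hypothetical method name
    }
  }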

src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 13 additions & 7 deletions
@@ -20,12 +20,12 @@
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.gson.JsonObject;
-import io.confluent.connect.elasticsearch.bulk.BulkRequest;
 import io.confluent.connect.elasticsearch.ElasticsearchClient;
 import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig;
 import io.confluent.connect.elasticsearch.IndexableRecord;
 import io.confluent.connect.elasticsearch.Key;
 import io.confluent.connect.elasticsearch.Mapping;
+import io.confluent.connect.elasticsearch.bulk.BulkRequest;
 import io.confluent.connect.elasticsearch.bulk.BulkResponse;
 import io.searchbox.action.Action;
 import io.searchbox.action.BulkableAction;

@@ -44,17 +44,17 @@
 import io.searchbox.indices.IndicesExists;
 import io.searchbox.indices.mapping.GetMapping;
 import io.searchbox.indices.mapping.PutMapping;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JestElasticsearchClient implements ElasticsearchClient {
 

@@ -71,6 +71,8 @@ public class JestElasticsearchClient implements ElasticsearchClient {
   private final JestClient client;
   private final Version version;
 
+  private final Set<String> indexCache = new HashSet<>();
+
   // visible for testing
   public JestElasticsearchClient(JestClient client) {
     try {

@@ -187,6 +189,9 @@ public Version getVersion() {
   }
 
   private boolean indexExists(String index) {
+    if (indexCache.contains(index)) {
+      return true;
+    }
     Action action = new IndicesExists.Builder(index).build();
     try {
       JestResult result = client.execute(action);

@@ -209,6 +214,7 @@ public void createIndices(Set<String> indices) {
           throw new ConnectException("Could not create index '" + index + "'" + msg);
         }
       }
+      indexCache.add(index);
     } catch (IOException e) {
       throw new ConnectException(e);
     }

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 32 additions & 15 deletions
@@ -22,6 +22,12 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
 import org.junit.Test;

@@ -123,28 +129,39 @@ public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
     cluster.ensureAtLeastNumDataNodes(3);
     Map<String, String> props = createProps();
 
+    props.put(ElasticsearchSinkConnectorConfig.AUTO_CREATE_INDICES_AT_START_CONFIG, "false");
+
     ElasticsearchSinkTask task = new ElasticsearchSinkTask();
 
     String key = "key";
     Schema schema = createSchema();
     Struct record = createRecord(schema);
 
     SinkRecord sinkRecord = new SinkRecord(UNSEEN_TOPIC,
-        PARTITION_114,
-        Schema.STRING_SCHEMA,
-        key,
-        schema,
-        record,
-        0 );
+            PARTITION_114,
+            Schema.STRING_SCHEMA,
+            key,
+            schema,
+            record,
+            0 );
 
-    try {
-      task.start(props, client);
-      task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
-      task.put(Collections.singleton(sinkRecord));
-    } catch (Exception ex) {
-      fail("Record could not be written to elasticsearch due to non existing index");
-    } finally {
-      task.stop();
-    }
+    task.start(props, client);
+    task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
+    task.put(Collections.singleton(sinkRecord));
+    task.stop();
+
+    assertTrue(UNSEEN_TOPIC + " index created without errors ",
+        verifyIndexExist(cluster, UNSEEN_TOPIC.toLowerCase()));
+
+  }
+
+  private boolean verifyIndexExist(InternalTestCluster cluster, String ... indices) {
+    ActionFuture<IndicesExistsResponse> action = cluster
+        .client()
+        .admin()
+        .indices()
+        .exists(new IndicesExistsRequest(indices));
+
+    return action.actionGet().isExists();
   }
 }
