Skip to content

Commit ce2c11a

Browse files
committed
Add an option to disable index creation at bootstrap time, so indexes are instead created on demand each time a request to write a record is performed
1 parent 2713dd7 commit ce2c11a

File tree

4 files changed

+70
-10
lines changed

4 files changed

+70
-10
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
139139
+ " mapping conflict or a field name containing illegal characters. Valid options are "
140140
+ "'ignore', 'warn', and 'fail'.";
141141

142+
public static final String CREATE_INDEX_AT_START_CONFIG = "create.index.at.start";
143+
private static final String CREATE_INDEX_AT_START__DOC = "Create the Elasticsearch indexes at "
144+
+ " bootstrap time. This is useful when the indexes are a direct mapping "
145+
+ " of the Kafka topics.";
146+
142147
protected static ConfigDef baseConfigDef() {
143148
final ConfigDef configDef = new ConfigDef();
144149
addConnectorConfigs(configDef);
@@ -244,10 +249,20 @@ private static void addConnectorConfigs(ConfigDef configDef) {
244249
3000,
245250
Importance.LOW,
246251
READ_TIMEOUT_MS_CONFIG_DOC,
247-
group,
248-
++order,
249-
Width.SHORT,
250-
"Read Timeout");
252+
group,
253+
++order,
254+
Width.SHORT,
255+
"Read Timeout"
256+
).define(
257+
CREATE_INDEX_AT_START_CONFIG,
258+
Type.BOOLEAN,
259+
true,
260+
Importance.LOW,
261+
CREATE_INDEX_AT_START__DOC,
262+
group,
263+
++order,
264+
Width.SHORT,
265+
"Create indexes at bootstrap time");
251266
}
252267

253268
private static void addConversionConfigs(ConfigDef configDef) {

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class ElasticsearchSinkTask extends SinkTask {
4040
private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
4141
private ElasticsearchWriter writer;
4242
private ElasticsearchClient client;
43+
private Boolean createIndexAtStartTime;
4344

4445
@Override
4546
public String version() {
@@ -90,6 +91,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
9091
config.getInt(ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG);
9192
boolean dropInvalidMessage =
9293
config.getBoolean(ElasticsearchSinkConnectorConfig.DROP_INVALID_MESSAGE_CONFIG);
94+
boolean createIndexAtStartTime =
95+
config.getBoolean(ElasticsearchSinkConnectorConfig.CREATE_INDEX_AT_START_CONFIG);
9396

9497
DataConverter.BehaviorOnNullValues behaviorOnNullValues =
9598
DataConverter.BehaviorOnNullValues.forValue(
@@ -136,6 +139,8 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
136139
.setBehaviorOnNullValues(behaviorOnNullValues)
137140
.setBehaviorOnMalformedDoc(behaviorOnMalformedDoc);
138141

142+
this.createIndexAtStartTime = createIndexAtStartTime;
143+
139144
writer = builder.build();
140145
writer.start();
141146
} catch (ConfigException e) {
@@ -149,11 +154,13 @@ public void start(Map<String, String> props, ElasticsearchClient client) {
149154
@Override
150155
public void open(Collection<TopicPartition> partitions) {
151156
log.debug("Opening the task for topic partitions: {}", partitions);
152-
Set<String> topics = new HashSet<>();
153-
for (TopicPartition tp : partitions) {
154-
topics.add(tp.topic());
157+
if (createIndexAtStartTime) {
158+
Set<String> topics = new HashSet<>();
159+
for (TopicPartition tp : partitions) {
160+
topics.add(tp.topic());
161+
}
162+
writer.createIndicesForTopics(topics);
155163
}
156-
writer.createIndicesForTopics(topics);
157164
}
158165

159166
@Override

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ public void write(Collection<SinkRecord> records) {
253253
final boolean ignoreSchema =
254254
ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;
255255

256+
client.createIndices(Collections.singleton(index));
257+
256258
if (!ignoreSchema && !existingMappings.contains(index)) {
257259
try {
258260
if (Mapping.getMapping(client, index, type) == null) {

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
4242
private static final int PARTITION_113 = 113;
4343
private static final TopicPartition TOPIC_IN_CAPS_PARTITION = new TopicPartition(TOPIC_IN_CAPS, PARTITION_113);
4444

45+
private static final String UNSEEN_TOPIC = "UnseenTopic";
46+
private static final int PARTITION_114 = 114;
47+
private static final TopicPartition UNSEEN_TOPIC_PARTITION = new TopicPartition(UNSEEN_TOPIC, PARTITION_114);
48+
49+
4550
private Map<String, String> createProps() {
4651
Map<String, String> props = new HashMap<>();
4752
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
@@ -94,7 +99,38 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
9499
Struct record = createRecord(schema);
95100

96101
SinkRecord sinkRecord = new SinkRecord(TOPIC_IN_CAPS,
97-
PARTITION_113,
102+
PARTITION_113,
103+
Schema.STRING_SCHEMA,
104+
key,
105+
schema,
106+
record,
107+
0 );
108+
109+
try {
110+
task.start(props, client);
111+
task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
112+
task.put(Collections.singleton(sinkRecord));
113+
} catch (Exception ex) {
114+
fail("A topic name not in lowercase can not be used as index name in Elasticsearch");
115+
} finally {
116+
task.stop();
117+
}
118+
}
119+
120+
@Test
121+
public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
122+
InternalTestCluster cluster = ESIntegTestCase.internalCluster();
123+
cluster.ensureAtLeastNumDataNodes(3);
124+
Map<String, String> props = createProps();
125+
126+
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
127+
128+
String key = "key";
129+
Schema schema = createSchema();
130+
Struct record = createRecord(schema);
131+
132+
SinkRecord sinkRecord = new SinkRecord(UNSEEN_TOPIC,
133+
PARTITION_114,
98134
Schema.STRING_SCHEMA,
99135
key,
100136
schema,
@@ -106,7 +142,7 @@ public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
106142
task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
107143
task.put(Collections.singleton(sinkRecord));
108144
} catch (Exception ex) {
109-
fail("A topic name not in lowercase can not be used as index name in Elasticsearch");
145+
fail("Record could not be written to elasticsearch due to non existing index");
110146
} finally {
111147
task.stop();
112148
}

0 commit comments

Comments (0)