Skip to content

Commit e52099d

Browse files
authored
Merge pull request #251 from purbon/fix/elastic-lowercase-indexname
[Backport ready] Fix bug when topic names are not in all lowercase
2 parents 26984c3 + 72f996e commit e52099d

File tree

2 files changed

+54
-10
lines changed

2 files changed

+54
-10
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,7 @@ public ElasticsearchWriter build() {
190190

191191
public void write(Collection<SinkRecord> records) {
192192
for (SinkRecord sinkRecord : records) {
193-
final String indexOverride = topicToIndexMap.get(sinkRecord.topic());
194-
final String index = indexOverride != null ? indexOverride : sinkRecord.topic();
193+
final String index = convertTopicToIndexName(sinkRecord.topic());
195194
final boolean ignoreKey = ignoreKeyTopics.contains(sinkRecord.topic()) || this.ignoreKey;
196195
final boolean ignoreSchema = ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;
197196

@@ -213,6 +212,17 @@ public void write(Collection<SinkRecord> records) {
213212
}
214213
}
215214

215+
/**
216+
* Return the expected index name for a given topic, using the configured mapping or the topic name. Elasticsearch
217+
* <a href="https://github.com/elastic/elasticsearch/issues/29420">accepts only lowercase index names</a>.
218+
*/
219+
private String convertTopicToIndexName(String topic) {
220+
final String indexOverride = topicToIndexMap.get(topic);
221+
String index = indexOverride != null ? indexOverride : topic.toLowerCase();
222+
log.debug("Topic '{}' was translated as index '{}'", topic, index);
223+
return index;
224+
}
225+
216226
public void flush() {
217227
bulkProcessor.flush(flushTimeoutMs);
218228
}
@@ -261,12 +271,7 @@ public void createIndicesForTopics(Set<String> assignedTopics) {
261271
private Set<String> indicesForTopics(Set<String> assignedTopics) {
262272
final Set<String> indices = new HashSet<>();
263273
for (String topic : assignedTopics) {
264-
final String index = topicToIndexMap.get(topic);
265-
if (index != null) {
266-
indices.add(index);
267-
} else {
268-
indices.add(topic);
269-
}
274+
indices.add(convertTopicToIndexName(topic));
270275
}
271276
return indices;
272277
}

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,30 @@
1818

1919
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
2020

21+
import org.apache.kafka.common.TopicPartition;
2122
import org.apache.kafka.connect.data.Schema;
2223
import org.apache.kafka.connect.data.Struct;
2324
import org.apache.kafka.connect.sink.SinkRecord;
2425
import org.elasticsearch.test.ESIntegTestCase;
2526
import org.elasticsearch.test.InternalTestCluster;
2627
import org.junit.Test;
2728

28-
import java.util.ArrayList;
2929
import java.util.Arrays;
30-
import java.util.Collection;
3130
import java.util.HashMap;
3231
import java.util.HashSet;
3332
import java.util.Map;
33+
import java.util.Collection;
34+
import java.util.ArrayList;
35+
import java.util.Collections;
36+
3437

3538
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
3639
public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
3740

41+
private static final String TOPIC_IN_CAPS = "AnotherTopicInCaps";
42+
private static final int PARTITION_113 = 113;
43+
private static final TopicPartition TOPIC_IN_CAPS_PARTITION = new TopicPartition(TOPIC_IN_CAPS, PARTITION_113);
44+
3845
private Map<String, String> createProps() {
3946
Map<String, String> props = new HashMap<>();
4047
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
@@ -72,4 +79,36 @@ public void testPutAndFlush() throws Exception {
7279
verifySearchResults(records, true, false);
7380
}
7481

82+
@Test
83+
public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
84+
// We should as well test that writing a record with a previously un seen record will create
85+
// an index following the required elasticsearch requirements of lowercasing.
86+
InternalTestCluster cluster = ESIntegTestCase.internalCluster();
87+
cluster.ensureAtLeastNumDataNodes(3);
88+
Map<String, String> props = createProps();
89+
90+
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
91+
92+
String key = "key";
93+
Schema schema = createSchema();
94+
Struct record = createRecord(schema);
95+
96+
SinkRecord sinkRecord = new SinkRecord(TOPIC_IN_CAPS,
97+
PARTITION_113,
98+
Schema.STRING_SCHEMA,
99+
key,
100+
schema,
101+
record,
102+
0 );
103+
104+
try {
105+
task.start(props, client);
106+
task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
107+
task.put(Collections.singleton(sinkRecord));
108+
} catch (Exception ex) {
109+
fail("A topic name not in lowercase can not be used as index name in Elasticsearch");
110+
} finally {
111+
task.stop();
112+
}
113+
}
75114
}

0 commit comments

Comments
 (0)