Skip to content

Commit dbb621f

Browse files
committed
Merge branch '3.2.x' into 3.3.x
2 parents e7a821d + e52099d commit dbb621f

File tree

2 files changed

+54
-10
lines changed

2 files changed

+54
-10
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,7 @@ public void write(Collection<SinkRecord> records) {
225225
continue;
226226
}
227227

228-
final String indexOverride = topicToIndexMap.get(sinkRecord.topic());
229-
final String index = indexOverride != null ? indexOverride : sinkRecord.topic();
228+
final String index = convertTopicToIndexName(sinkRecord.topic());
230229
final boolean ignoreKey = ignoreKeyTopics.contains(sinkRecord.topic()) || this.ignoreKey;
231230
final boolean ignoreSchema = ignoreSchemaTopics.contains(sinkRecord.topic()) || this.ignoreSchema;
232231

@@ -261,6 +260,17 @@ public void write(Collection<SinkRecord> records) {
261260
}
262261
}
263262

263+
/**
264+
* Return the expected index name for a given topic, using the configured mapping or the topic name. Elasticsearch
265+
* <a href="https://github.com/elastic/elasticsearch/issues/29420">accepts only lowercase index names</a>.
266+
*/
267+
private String convertTopicToIndexName(String topic) {
268+
final String indexOverride = topicToIndexMap.get(topic);
269+
String index = indexOverride != null ? indexOverride : topic.toLowerCase();
270+
log.debug("Topic '{}' was translated as index '{}'", topic, index);
271+
return index;
272+
}
273+
264274
public void flush() {
265275
bulkProcessor.flush(flushTimeoutMs);
266276
}
@@ -309,12 +319,7 @@ public void createIndicesForTopics(Set<String> assignedTopics) {
309319
private Set<String> indicesForTopics(Set<String> assignedTopics) {
310320
final Set<String> indices = new HashSet<>();
311321
for (String topic : assignedTopics) {
312-
final String index = topicToIndexMap.get(topic);
313-
if (index != null) {
314-
indices.add(index);
315-
} else {
316-
indices.add(topic);
317-
}
322+
indices.add(convertTopicToIndexName(topic));
318323
}
319324
return indices;
320325
}

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,30 @@
1818

1919
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
2020

21+
import org.apache.kafka.common.TopicPartition;
2122
import org.apache.kafka.connect.data.Schema;
2223
import org.apache.kafka.connect.data.Struct;
2324
import org.apache.kafka.connect.sink.SinkRecord;
2425
import org.elasticsearch.test.ESIntegTestCase;
2526
import org.elasticsearch.test.InternalTestCluster;
2627
import org.junit.Test;
2728

28-
import java.util.ArrayList;
2929
import java.util.Arrays;
30-
import java.util.Collection;
3130
import java.util.HashMap;
3231
import java.util.HashSet;
3332
import java.util.Map;
33+
import java.util.Collection;
34+
import java.util.ArrayList;
35+
import java.util.Collections;
36+
3437

3538
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
3639
public class ElasticsearchSinkTaskTest extends ElasticsearchSinkTestBase {
3740

41+
private static final String TOPIC_IN_CAPS = "AnotherTopicInCaps";
42+
private static final int PARTITION_113 = 113;
43+
private static final TopicPartition TOPIC_IN_CAPS_PARTITION = new TopicPartition(TOPIC_IN_CAPS, PARTITION_113);
44+
3845
private Map<String, String> createProps() {
3946
Map<String, String> props = new HashMap<>();
4047
props.put(ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG, TYPE);
@@ -72,4 +79,36 @@ public void testPutAndFlush() throws Exception {
7279
verifySearchResults(records, true, false);
7380
}
7481

82+
@Test
83+
public void testCreateAndWriteToIndexForTopicWithUppercaseCharacters() {
84+
// We should as well test that writing a record with a previously un seen record will create
85+
// an index following the required elasticsearch requirements of lowercasing.
86+
InternalTestCluster cluster = ESIntegTestCase.internalCluster();
87+
cluster.ensureAtLeastNumDataNodes(3);
88+
Map<String, String> props = createProps();
89+
90+
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
91+
92+
String key = "key";
93+
Schema schema = createSchema();
94+
Struct record = createRecord(schema);
95+
96+
SinkRecord sinkRecord = new SinkRecord(TOPIC_IN_CAPS,
97+
PARTITION_113,
98+
Schema.STRING_SCHEMA,
99+
key,
100+
schema,
101+
record,
102+
0 );
103+
104+
try {
105+
task.start(props, client);
106+
task.open(new HashSet<>(Collections.singletonList(TOPIC_IN_CAPS_PARTITION)));
107+
task.put(Collections.singleton(sinkRecord));
108+
} catch (Exception ex) {
109+
fail("A topic name not in lowercase can not be used as index name in Elasticsearch");
110+
} finally {
111+
task.stop();
112+
}
113+
}
75114
}

0 commit comments

Comments
 (0)