Commit 34a2723

Merge branch '11.1.x' into pr_merge_from_11_0_x_to_11_1_x
2 parents: f159b7a + 0ff91e0

32 files changed: +3311 −488 lines

README.md

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 # Kafka Connect Elasticsearch Connector
 [![FOSSA Status](https://app.fossa.io/api/projects/git%2Bhttps%3A%2F%2Fgithub.com%2Fconfluentinc%2Fkafka-connect-elasticsearch.svg?type=shield)](https://app.fossa.io/projects/git%2Bhttps%3A%2F%2Fgithub.com%2Fconfluentinc%2Fkafka-connect-elasticsearch?ref=badge_shield)
 
+Changelog for this connector can be found [here](https://docs.confluent.io/kafka-connect-elasticsearch/current/changelog.html).
 
 kafka-connect-elasticsearch is a [Kafka Connector](http://kafka.apache.org/documentation.html#connect)
 for copying data between Kafka and Elasticsearch.

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
     <!-- switch statements on types exceed maximum complexity -->
     <suppress
         checks="(CyclomaticComplexity)"
-        files="Mapping.java"
+        files="(Mapping|DataConverter).java"
     />
 
     <suppress

pom.xml

Lines changed: 37 additions & 14 deletions
@@ -10,7 +10,7 @@
 
     <groupId>io.confluent</groupId>
     <artifactId>kafka-connect-elasticsearch</artifactId>
-    <version>11.0.15-SNAPSHOT</version>
+    <version>11.1.20-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>kafka-connect-elasticsearch</name>
     <organization>
@@ -34,7 +34,7 @@
         <connection>scm:git:git://github.com/confluentinc/kafka-connect-elasticsearch.git</connection>
         <developerConnection>scm:git:git@github.com:confluentinc/kafka-connect-elasticsearch.git</developerConnection>
         <url>https://github.com/confluentinc/kafka-connect-elasticsearch</url>
-        <tag>11.0.x</tag>
+        <tag>11.1.x</tag>
     </scm>
 
     <properties>
@@ -47,7 +47,9 @@
         <maven.release.plugin.version>2.5.3</maven.release.plugin.version>
         <hadoop.version>3.3.0</hadoop.version>
         <apacheds-jdbm1.version>2.0.0-M2</apacheds-jdbm1.version>
-        <jackson.databind.version>2.10.5.1</jackson.databind.version>
+        <!-- TODO: Remove the version pin after releasing https://github.com/confluentinc/common/pull/494 -->
+        <jackson.databind.version>2.15.0</jackson.databind.version>
+        <jackson.version>2.15.0</jackson.version>
         <!-- temporary fix by pinning the version until we upgrade to a version of common that contains this or newer version.
              See https://github.com/confluentinc/common/pull/332 for details -->
         <dependency.check.version>6.1.6</dependency.check.version>
@@ -60,7 +62,7 @@
         <repository>
             <id>confluent</id>
             <name>Confluent</name>
-            <url>http://packages.confluent.io/maven/</url>
+            <url>https://packages.confluent.io/maven/</url>
         </repository>
     </repositories>
 
@@ -84,11 +86,17 @@
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
             <version>${es.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.yaml</groupId>
+                    <artifactId>snakeyaml</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-api</artifactId>
-            <version>2.16.0</version>
+            <version>2.17.1</version>
         </dependency>
         <!-- pin jackson-dataformat-cbor for CVE - the version comes from confluentinc/common -->
         <dependency>
@@ -101,13 +109,6 @@
             <artifactId>commons-codec</artifactId>
             <version>1.15</version>
         </dependency>
-        <!-- pin snakeyaml for CVE -->
-        <dependency>
-            <groupId>org.yaml</groupId>
-            <artifactId>snakeyaml</artifactId>
-            <version>1.27</version>
-        </dependency>
-        <!-- pin httpclient for CVE -->
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
@@ -282,11 +283,33 @@
 
     <dependencyManagement>
         <dependencies>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-annotations</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
                 <artifactId>jackson-databind</artifactId>
                 <version>${jackson.databind.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson</groupId>
+                <artifactId>jackson-bom</artifactId>
+                <version>${jackson.version}</version>
+                <scope>import</scope>
+                <type>pom</type>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.dataformat</groupId>
+                <artifactId>jackson-dataformat-cbor</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -341,7 +364,7 @@
                             <limit>
                                 <counter>INSTRUCTION</counter>
                                 <value>COVEREDRATIO</value>
-                                <minimum>0.80</minimum>
+                                <minimum>0.85</minimum>
                             </limit>
                             <limit>
                                 <counter>BRANCH</counter>
@@ -376,7 +399,7 @@
                     </execution>
                     <execution>
                         <id>report</id>
-                        <phase>verify</phase>
+                        <phase>test</phase>
                         <goals>
                             <goal>report</goal>
                         </goals>
src/main/java/io/confluent/connect/elasticsearch/AsyncOffsetTracker.java

Lines changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
/*
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Confluent Community License (the "License"); you may not use
 * this file except in compliance with the License. You may obtain a copy of the
 * License at
 *
 * http://www.confluent.io/confluent-community-license
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */

package io.confluent.connect.elasticsearch;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.stream.Collectors.toMap;

/**
 * An asynchronous implementation of <code>OffsetTracker</code>.
 *
 * <p>Since ElasticsearchClient can potentially process multiple batches asynchronously for the same
 * partition, if we don't want to wait for all in-flight batches at the end of the put call
 * (or flush/preCommit), we need to keep track of the highest offset that is safe to commit.
 * For now, we do that at the individual record level because batching is handled by BulkProcessor,
 * and we don't have control over grouping/ordering.
 */
class AsyncOffsetTracker implements OffsetTracker {

  private static final Logger log = LoggerFactory.getLogger(AsyncOffsetTracker.class);

  private final Map<TopicPartition, Map<Long, OffsetState>> offsetsByPartition = new HashMap<>();
  private final Map<TopicPartition, Long> maxOffsetByPartition = new HashMap<>();

  private final AtomicLong numEntries = new AtomicLong();
  private final SinkTaskContext context;

  public AsyncOffsetTracker(SinkTaskContext context) {
    this.context = context;
  }

  static class AsyncOffsetState implements OffsetState {

    private final long offset;
    private volatile boolean processed;

    AsyncOffsetState(long offset) {
      this.offset = offset;
    }

    @Override
    public void markProcessed() {
      processed = true;
    }

    @Override
    public boolean isProcessed() {
      return processed;
    }

    @Override
    public long offset() {
      return offset;
    }
  }

  /**
   * Partitions are no longer owned; release all related resources.
   * @param topicPartitions partitions to close
   */
  @Override
  public synchronized void closePartitions(Collection<TopicPartition> topicPartitions) {
    topicPartitions.forEach(tp -> {
      Map<Long, OffsetState> offsets = offsetsByPartition.remove(tp);
      if (offsets != null) {
        numEntries.getAndAdd(-offsets.size());
      }
      maxOffsetByPartition.remove(tp);
    });
  }

  /**
   * This method assumes that new records are added in offset order.
   * Older records can be re-added, and the same offset state object will be returned if its
   * offset hasn't been reported yet.
   * @param sinkRecord record to add
   * @return offset state record that can be used to mark the record as processed
   */
  @Override
  public synchronized OffsetState addPendingRecord(
      SinkRecord sinkRecord
  ) {
    log.trace("Adding pending record");
    TopicPartition tp = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
    if (!context.assignment().contains(tp)) {
      String msg = String.format("Found a topic name '%s' that doesn't match assigned partitions."
          + " Connector doesn't support topic mutating SMTs", sinkRecord.topic());
      throw new ConnectException(msg);
    }
    Long partitionMax = maxOffsetByPartition.get(tp);
    if (partitionMax == null || sinkRecord.kafkaOffset() > partitionMax) {
      numEntries.incrementAndGet();
      return offsetsByPartition
          // Insertion order needs to be maintained
          .computeIfAbsent(tp, key -> new LinkedHashMap<>())
          .computeIfAbsent(sinkRecord.kafkaOffset(), AsyncOffsetState::new);
    } else {
      return new AsyncOffsetState(sinkRecord.kafkaOffset());
    }
  }

  /**
   * @return overall number of entries currently in memory.
   */
  @Override
  public long numOffsetStateEntries() {
    return numEntries.get();
  }

  /**
   * Move offsets to the highest we can.
   */
  @Override
  public synchronized void updateOffsets() {
    log.trace("Updating offsets");
    offsetsByPartition.forEach(((topicPartition, offsets) -> {
      Long max = maxOffsetByPartition.get(topicPartition);
      boolean newMaxFound = false;
      Iterator<OffsetState> iterator = offsets.values().iterator();
      while (iterator.hasNext()) {
        OffsetState offsetState = iterator.next();
        if (offsetState.isProcessed()) {
          iterator.remove();
          numEntries.decrementAndGet();
          if (max == null || offsetState.offset() > max) {
            max = offsetState.offset();
            newMaxFound = true;
          }
        } else {
          break;
        }
      }
      if (newMaxFound) {
        maxOffsetByPartition.put(topicPartition, max);
      }
    }));
    log.trace("Updated offsets, num entries: {}", numEntries);
  }

  /**
   * @param currentOffsets current offsets from a task
   * @return offsets to commit
   */
  @Override
  public synchronized Map<TopicPartition, OffsetAndMetadata> offsets(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets
  ) {
    return maxOffsetByPartition.entrySet().stream()
        .collect(toMap(
            Map.Entry::getKey,
            // The offsets you commit are the offsets of the messages you want to read next
            // (not the offsets of the messages you did read last)
            e -> new OffsetAndMetadata(e.getValue() + 1)));
  }
}
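
For context, below is a minimal usage sketch showing how a sink task could drive this tracker across put() and preCommit(). It is not part of this commit: the class name AsyncOffsetTrackerUsageSketch and the asyncSend callback are hypothetical placeholders for whatever asynchronous bulk client actually writes the records, while AsyncOffsetTracker, OffsetTracker, and OffsetState come from this merge (only AsyncOffsetTracker is shown in full here).

```java
// Hypothetical sketch, not part of this commit. Lives in the same package because
// AsyncOffsetTracker is package-private.
package io.confluent.connect.elasticsearch;

import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

class AsyncOffsetTrackerUsageSketch {

  private final OffsetTracker tracker;

  AsyncOffsetTrackerUsageSketch(SinkTaskContext context) {
    // The Connect runtime supplies the SinkTaskContext; the tracker uses it to
    // validate that records belong to currently assigned partitions.
    this.tracker = new AsyncOffsetTracker(context);
  }

  /** Called from SinkTask.put(): register every record before handing it off asynchronously. */
  void put(Collection<SinkRecord> records, Consumer<Runnable> asyncSend) {
    for (SinkRecord record : records) {
      OffsetState state = tracker.addPendingRecord(record);
      // The bulk client should run this callback once the record is durably indexed.
      asyncSend.accept(state::markProcessed);
    }
  }

  /** Called from SinkTask.preCommit(): advance the watermark and return safe offsets. */
  Map<TopicPartition, OffsetAndMetadata> preCommit(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    tracker.updateOffsets();
    return tracker.offsets(currentOffsets);
  }
}
```

The design point the sketch illustrates: markProcessed() can fire from any thread once a record is written, while updateOffsets() only advances each partition's watermark up to the first still-unprocessed record (offsets are kept in insertion order), so the committed offsets never skip in-flight data.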
