Commit a3747d4

Added key reader to source connector. Resolves #3
1 parent 83ecf0b commit a3747d4

15 files changed: +410, -175 lines

src/docs/asciidoc/source.adoc

Lines changed: 38 additions & 9 deletions
@@ -8,27 +8,56 @@ The {name} reads data from a Redis Enterprise database and publishes messages to
 The {name} includes the following features:
 
 * <<at-least-once-delivery,At least once delivery>>
+* <<tasks,Multiple tasks>>
+* <<key-reader,Key Reader>>
 * <<stream-reader,Stream Reader>>
 
 [[at-least-once-delivery]]
 === At least once delivery
 The {name} guarantees that records from the Kafka topic are delivered at least once.
 
+[[tasks]]
+=== Multiple Tasks
+Use the configuration property `tasks.max` to have the change stream handled by multiple tasks. The connector splits the work based on the number of configured key patterns. When the number of tasks is greater than the number of patterns, the number of patterns will be used instead.
+
+[[key-reader]]
+=== Key Reader
+In key reader mode, the {name} captures changes happening to keys in a Redis database and publishes keys and values to a Kafka topic. The data structure key will be mapped to the record key, and the value will be mapped to the record value.
+
+[IMPORTANT]
+.Supported Data Structures
+====
+The {name} supports the following data structures:
+
+* String: the Kafka record values will be strings
+* Hash: the Kafka record values will be maps (string key/value pairs)
+
+====
+
+[source,properties]
+----
+redis.keys.patterns=<glob> # <1>
+topic=<topic> # <2>
+----
+
+<1> Key portion of the pattern that will be used to listen to keyspace events. For example `foo:*` translates to pubsub channel `$$__$$keyspace@0$$__$$:foo:*` and will capture changes to keys `foo:1`, `foo:2`, etc. Use comma-separated values for multiple patterns (`foo:*,bar:*`).
+<2> Name of the destination topic.
+
 [[stream-reader]]
 === Stream Reader
 In stream reader mode, the {name} reads messages from a Redis stream and publishes to a Kafka topic.
 
 [source,properties]
 ----
-redis.stream.name=<stream name> # <1>
-redis.stream.offset=<stream offset> # <2>
-redis.stream.count=<count> # <3>
+redis.reader=STREAM # <1>
+redis.stream.name=<stream name> # <2>
+redis.stream.offset=<stream offset> # <3>
 redis.stream.block=<millis> # <4>
-topic.name=<topic> # <5>
+topic=<topic> # <5>
 ----
 
-<1> Name of the stream to read from
-<2> https://redis.io/commands/xread#incomplete-ids[Message ID] to start reading from (default: `0-0`)
-<3> Maximum number of stream messages to include in a single https://redis.io/commands/xread[XREAD] when polling for new data (default: `50`). This setting can be used to limit the amount of data buffered internally in the connector
-<4> Maximum duration in milliseconds to wait while https://redis.io/commands/xread[reading] from the stream (default: 100)
-<5> A format string for the destination topic name (default: `${stream}`), which may contain `${stream}` as a placeholder for the originating stream name. For example, `redis_${stream}` for the stream 'orders' will map to the topic name `redis_orders`.
+<1> Reader mode is `STREAM`.
+<2> Name of the stream to read from.
+<3> https://redis.io/commands/xread#incomplete-ids[Message ID] to start reading from (default: `0-0`).
+<4> Maximum duration in milliseconds to wait while https://redis.io/commands/xread[reading] from the stream (default: 100).
+<5> Name of the destination topic, which may contain `${stream}` as a placeholder for the originating stream name. For example, `redis_${stream}` for the stream 'orders' will map to the topic name `redis_orders`.
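
To make callout <1> concrete: the configured key pattern is appended to the `__keyspace@<db>__:` channel prefix, so key reader mode is effectively driven by Redis keyspace notifications. A minimal standalone sketch of such a subscription with plain Lettuce, assuming keyspace notifications are enabled on the server (e.g. `notify-keyspace-events KEA`); it illustrates only the channel mapping and is not the connector's implementation:

[source,java]
----
import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;

public class KeyspaceNotificationSketch {

  public static void main(String[] args) {
    RedisClient client = RedisClient.create("redis://localhost:6379");
    StatefulRedisPubSubConnection<String, String> pubSub = client.connectPubSub();
    pubSub.addListener(new RedisPubSubAdapter<String, String>() {
      @Override
      public void message(String pattern, String channel, String message) {
        // channel is e.g. "__keyspace@0__:foo:1", message is the event name, e.g. "set"
        System.out.printf("%s -> %s%n", channel, message);
      }
    });
    // The key pattern "foo:*" from redis.keys.patterns becomes this pubsub pattern
    pubSub.sync().psubscribe("__keyspace@0__:foo:*");
  }
}
----

With this running, `SET foo:1 bar` on database 0 would print something like `__keyspace@0__:foo:1 -> set`.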

src/main/java/com/redislabs/kafka/connect/RedisEnterpriseSourceConnector.java

Lines changed: 33 additions & 3 deletions
@@ -15,21 +15,32 @@
 import com.redislabs.kafka.connect.source.RedisEnterpriseSourceConfig;
 import com.redislabs.kafka.connect.source.RedisEnterpriseSourceTask;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class RedisEnterpriseSourceConnector extends SourceConnector {
 
   private Map<String, String> props;
+  private RedisEnterpriseSourceConfig config;
 
   @Override
   public void start(Map<String, String> props) {
     this.props = props;
+    try {
+      this.config = new RedisEnterpriseSourceConfig(props);
+    } catch (ConfigException configException) {
+      throw new ConnectException(configException);
+    }
   }
 
   @Override
@@ -39,7 +50,27 @@ public Class<? extends Task> taskClass() {
 
   @Override
   public List<Map<String, String>> taskConfigs(int maxTasks) {
-    return Collections.singletonList(props);
+    if (this.config.getReaderType() == RedisEnterpriseSourceConfig.ReaderType.KEYS) {
+      // Partition the configs based on channels
+      final List<List<String>> partitionedPatterns = ConnectorUtils
+          .groupPartitions(this.config.getKeyPatterns(), Math.min(this.config.getKeyPatterns().size(), maxTasks));
+
+      // Create task configs based on the partitions
+      return partitionedPatterns.stream().map(this::taskConfig).collect(Collectors.toList());
+    }
+    List<Map<String, String>> taskConfigs = new ArrayList<>();
+    for (int i = 0; i < maxTasks; i++) {
+      Map<String, String> taskConfig = new HashMap<>(this.props);
+      taskConfig.put(RedisEnterpriseSourceTask.TASK_ID, Integer.toString(i));
+      taskConfigs.add(taskConfig);
+    }
+    return taskConfigs;
+  }
+
+  private Map<String, String> taskConfig(List<String> patterns) {
+    final Map<String, String> taskConfig = new HashMap<>(this.config.originalsStrings());
+    taskConfig.put(RedisEnterpriseSourceConfig.KEY_PATTERNS, String.join(",", patterns));
+    return taskConfig;
   }
 
   @Override
@@ -51,7 +82,6 @@ public ConfigDef config() {
     return new RedisEnterpriseSourceConfig.RedisEnterpriseSourceConfigDef();
   }
 
-
   @Override
   public String version() {
     return AppInfoParser.getVersion();
src/main/java/com/redislabs/kafka/connect/sink/RedisEnterpriseSinkTask.java

Lines changed: 17 additions & 7 deletions
@@ -17,8 +17,9 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.jcustenborder.kafka.connect.utils.data.SinkOffsetState;
-import com.github.jcustenborder.kafka.connect.utils.data.TopicPartitionCounter;
 import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.redislabs.kafka.connect.RedisEnterpriseSinkConnector;
 import io.lettuce.core.KeyValue;
 import io.lettuce.core.RedisClient;
@@ -55,6 +56,8 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 public class RedisEnterpriseSinkTask extends SinkTask {
 
@@ -144,10 +147,10 @@ private Double score(SinkRecord record) {
     if (value == null) {
       return null;
     }
-    if (value instanceof Double) {
-      return (Double) value;
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
     }
-    throw new DataException("The value for the record must be float64 (Java double). Consider using a single message transformation to transform the data before it is written to Redis.");
+    throw new DataException("The value for the record must be a number. Consider using a single message transformation to transform the data before it is written to Redis.");
   }
 
   private boolean isDelete(SinkRecord record) {
@@ -224,11 +227,18 @@ public void put(final Collection<SinkRecord> records) {
     } catch (Exception e) {
      log.warn("Could not write {} records", records.size(), e);
     }
-    TopicPartitionCounter counter = new TopicPartitionCounter();
+    Map<TopicPartition, Long> data = new ConcurrentHashMap<>(100);
     for (SinkRecord record : records) {
-      counter.increment(record.topic(), record.kafkaPartition(), record.kafkaOffset());
+      Preconditions.checkState(!Strings.isNullOrEmpty(record.topic()), "topic cannot be null or empty.");
+      Preconditions.checkNotNull(record.kafkaPartition(), "partition cannot be null.");
+      Preconditions.checkState(record.kafkaOffset() >= 0, "offset must be greater than or equal 0.");
+      TopicPartition partition = new TopicPartition(record.topic(), record.kafkaPartition());
+      long current = data.getOrDefault(partition, Long.MIN_VALUE);
+      if (record.kafkaOffset() > current) {
+        data.put(partition, record.kafkaOffset());
+      }
     }
-    List<SinkOffsetState> offsetData = counter.offsetStates();
+    List<SinkOffsetState> offsetData = data.entrySet().stream().map(e -> SinkOffsetState.of(e.getKey(), e.getValue())).collect(Collectors.toList());
     if (!offsetData.isEmpty()) {
       Map<String, String> offsets = new LinkedHashMap<>(offsetData.size());
       for (SinkOffsetState e : offsetData) {
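
The replacement for `TopicPartitionCounter` above simply keeps the highest offset seen per `TopicPartition` before converting the map to `SinkOffsetState` entries. An equivalent, more compact form of that loop (a sketch, not what the commit uses) is `Map.merge` with `Math::max`:

[source,java]
----
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MaxOffsetSketch {

  // Same "keep the largest offset per partition" result as the loop in the diff
  static Map<TopicPartition, Long> maxOffsets(Collection<SinkRecord> records) {
    Map<TopicPartition, Long> data = new ConcurrentHashMap<>();
    for (SinkRecord record : records) {
      TopicPartition partition = new TopicPartition(record.topic(), record.kafkaPartition());
      data.merge(partition, record.kafkaOffset(), Math::max);
    }
    return data;
  }
}
----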

src/main/java/com/redislabs/kafka/connect/source/AbstractSourceRecordReader.java

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+package com.redislabs.kafka.connect.source;
+
+import com.redislabs.mesclun.RedisModulesClient;
+import io.lettuce.core.RedisClient;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSourceRecordReader<T> implements SourceRecordReader {
+
+  private static final Logger log = LoggerFactory.getLogger(AbstractSourceRecordReader.class);
+
+  protected final RedisEnterpriseSourceConfig sourceConfig;
+  private RedisClient client;
+
+  protected AbstractSourceRecordReader(RedisEnterpriseSourceConfig sourceConfig) {
+    Assert.notNull(sourceConfig, "Source connector config must not be null");
+    this.sourceConfig = sourceConfig;
+  }
+
+  @Override
+  public void open() {
+    this.client = RedisModulesClient.create(sourceConfig.getRedisUri());
+    open(client);
+  }
+
+  protected abstract void open(RedisClient client);
+
+  @Override
+  public List<SourceRecord> poll() {
+    List<T> records;
+    try {
+      records = doPoll();
+    } catch (Exception e) {
+      log.error("Could not read messages", e);
+      return null;
+    }
+    return records.stream().map(this::convert).collect(Collectors.toList());
+  }
+
+  protected abstract List<T> doPoll() throws Exception;
+
+  protected abstract SourceRecord convert(T input);
+
+  @Override
+  public void close() {
+    doClose();
+    client.shutdown();
+    client.getResources().shutdown();
+  }
+
+  protected abstract void doClose();
+}
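
`AbstractSourceRecordReader` is a small template: concrete readers supply `open(RedisClient)`, `doPoll()` and `convert(T)`, while the base class owns the client lifecycle and the poll-then-convert pipeline. A hypothetical minimal subclass, invented purely to show that contract (it is not part of the commit):

[source,java]
----
package com.redislabs.kafka.connect.source;

import io.lettuce.core.RedisClient;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical example subclass for illustration only
public class PingSourceRecordReader extends AbstractSourceRecordReader<String> {

  private final String topic;

  public PingSourceRecordReader(RedisEnterpriseSourceConfig sourceConfig) {
    super(sourceConfig);
    this.topic = sourceConfig.getTopicName();
  }

  @Override
  protected void open(RedisClient client) {
    // Acquire connections or readers here; the base class supplies the client
  }

  @Override
  protected List<String> doPoll() {
    return Collections.singletonList("pong"); // stand-in for real data
  }

  @Override
  protected SourceRecord convert(String input) {
    Map<String, ?> sourcePartition = Collections.emptyMap();
    Map<String, ?> sourceOffset = Collections.emptyMap();
    return new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, input);
  }

  @Override
  protected void doClose() {
    // Release anything acquired in open(RedisClient)
  }
}
----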

src/main/java/com/redislabs/kafka/connect/source/KeySourceRecordReader.java

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+package com.redislabs.kafka.connect.source;
+
+import io.lettuce.core.RedisClient;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.redis.DataStructureItemReader;
+import org.springframework.batch.item.redis.support.DataStructure;
+import org.springframework.batch.item.redis.support.LiveKeyValueItemReader;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStructure> {
+
+  private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
+  private static final Schema STRING_VALUE_SCHEMA = Schema.STRING_SCHEMA;
+  private static final String HASH_VALUE_SCHEMA_NAME = "com.redislabs.kafka.connect.HashEventValue";
+  private static final Schema HASH_VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).name(HASH_VALUE_SCHEMA_NAME);
+
+  private final int batchSize;
+  private final String topic;
+  private LiveKeyValueItemReader<DataStructure> reader;
+  private final Duration idleTimeout;
+
+  public KeySourceRecordReader(RedisEnterpriseSourceConfig sourceConfig, Duration idleTimeout) {
+    super(sourceConfig);
+    this.topic = sourceConfig.getTopicName();
+    this.batchSize = Math.toIntExact(sourceConfig.getBatchSize());
+    this.idleTimeout = idleTimeout;
+  }
+
+  @Override
+  protected void open(RedisClient client) {
+    reader = DataStructureItemReader.client(client).live().idleTimeout(idleTimeout).keyPatterns(sourceConfig.getKeyPatterns().toArray(new String[0])).build();
+    reader.open(new ExecutionContext());
+  }
+
+  @Override
+  protected List<DataStructure> doPoll() throws Exception {
+    return reader.read(batchSize);
+  }
+
+  @Override
+  protected SourceRecord convert(DataStructure input) {
+    Map<String, ?> sourcePartition = new HashMap<>();
+    Map<String, ?> sourceOffset = new HashMap<>();
+    return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, input.getKey(), schema(input), input.getValue(), Instant.now().getEpochSecond());
+  }
+
+  private Schema schema(DataStructure input) {
+    if (DataStructure.HASH.equals(input.getType())) {
+      return HASH_VALUE_SCHEMA;
+    }
+    return STRING_VALUE_SCHEMA;
+  }
+
+  @Override
+  protected void doClose() {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+}
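
Putting the pieces together, a source task would drive the key reader roughly as below. The `redis.uri` property name is an assumption (the real key is defined in `RedisEnterpriseSourceConfig`, which is not part of this diff); `redis.keys.patterns` and `topic` match the documentation above:

[source,java]
----
import com.redislabs.kafka.connect.source.KeySourceRecordReader;
import com.redislabs.kafka.connect.source.RedisEnterpriseSourceConfig;
import org.apache.kafka.connect.source.SourceRecord;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyReaderUsageSketch {

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("redis.uri", "redis://localhost:6379"); // assumed property name
    props.put("redis.keys.patterns", "foo:*");
    props.put("topic", "redis-keys");

    RedisEnterpriseSourceConfig config = new RedisEnterpriseSourceConfig(props);
    KeySourceRecordReader reader = new KeySourceRecordReader(config, Duration.ofMillis(100));

    reader.open();                              // connects and opens the live key reader
    List<SourceRecord> records = reader.poll(); // up to the configured batch size of key/value records
    reader.close();                             // shuts the Redis client down
  }
}
----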
