
Commit 6195850

[source] Made stream the default reader and removed key reader entry from docs
1 parent c6ea435 commit 6195850


8 files changed, +100 -75 lines changed


docker/run.sh

Lines changed: 0 additions & 1 deletion
@@ -103,7 +103,6 @@ curl -X POST -H "Content-Type: application/json" --data '
   "config": {
     "tasks.max":"1",
     "connector.class":"com.redislabs.kafka.connect.RedisEnterpriseSourceConnector",
-    "redis.reader":"STREAM",
     "redis.uri":"redis://redis:6379",
     "redis.stream.name":"mystream",
     "topic": "mystream"

src/docs/asciidoc/sink.adoc

Lines changed: 14 additions & 14 deletions
@@ -103,7 +103,7 @@ value.converter.schema.registry.url=http://localhost:8081
 [source,properties]
 ----
 value.converter=org.apache.kafka.connect.json.JsonConverter
-value.converter.schemas.enable=<true|false> # <1>
+value.converter.schemas.enable=<true|false> <1>
 ----

 <1> Set to `true` if the JSON record structure has an attached schema
@@ -123,8 +123,8 @@ Use the following properties to store Kafka records as Redis stream messages:
 [source,properties]
 ----
 redis.type=STREAM
-redis.key=<stream key> # <1>
-value.converter=<Avro or JSON> # <2>
+redis.key=<stream key> <1>
+value.converter=<Avro or JSON> <2>
 ----

 <1> <<collection-key,Stream key>>
@@ -136,8 +136,8 @@ Use the following properties to write Kafka records as Redis hashes:
 [source,properties]
 ----
 redis.type=HASH
-key.converter=<string or bytes> # <1>
-value.converter=<Avro or JSON> # <2>
+key.converter=<string or bytes> <1>
+value.converter=<Avro or JSON> <2>
 ----

 <1> <<key-string,String>> or <<key-bytes,bytes>>
@@ -149,8 +149,8 @@ Use the following properties to write Kafka records as Redis strings:
 [source,properties]
 ----
 redis.type=STRING
-key.converter=<string or bytes> # <1>
-value.converter=<string or bytes> # <2>
+key.converter=<string or bytes> <1>
+value.converter=<string or bytes> <2>
 ----

 <1> <<key-string,String>> or <<key-bytes,bytes>>
@@ -162,9 +162,9 @@ Use the following properties to add Kafka record keys to a Redis list:
 [source,properties]
 ----
 redis.type=LIST
-redis.key=<key name> # <1>
-key.converter=<string or bytes> # <2>
-redis.push.direction=<LEFT or RIGHT> # <3>
+redis.key=<key name> <1>
+key.converter=<string or bytes> <2>
+redis.push.direction=<LEFT or RIGHT> <3>
 ----

 <1> <<collection-key,List key>>
@@ -179,8 +179,8 @@ Use the following properties to add Kafka record keys to a Redis set:
 [source,properties]
 ----
 redis.type=SET
-redis.key=<key name> # <1>
-key.converter=<string or bytes> # <2>
+redis.key=<key name> <1>
+key.converter=<string or bytes> <2>
 ----

 <1> <<collection-key,Set key>>
@@ -194,8 +194,8 @@ Use the following properties to add Kafka record keys to a Redis sorted set:
 [source,properties]
 ----
 redis.type=ZSET
-redis.key=<key name> # <1>
-key.converter=<string or bytes> # <2>
+redis.key=<key name> <1>
+key.converter=<string or bytes> <2>
 ----

 <1> <<collection-key,Sorted set key>>
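
For readers skimming the sink changes above: the documented stream-sink properties can be collected into a config map the same way the integration tests in this repository build theirs. A minimal sketch with illustrative values only (the stream key `mystream` and the JSON converter choice are examples, not taken from this commit):

[source,java]
----
import java.util.HashMap;
import java.util.Map;

public class SinkStreamConfigSketch {
    public static void main(String[] args) {
        // Stream-sink properties from the section above, assembled as a map.
        Map<String, String> config = new HashMap<>();
        config.put("redis.type", "STREAM");                                           // target Redis data structure
        config.put("redis.key", "mystream");                                          // stream key (illustrative)
        config.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); // JSON record values
        config.put("value.converter.schemas.enable", "false");                        // records carry no attached schema
        System.out.println(config);
    }
}
----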

src/docs/asciidoc/source.adoc

Lines changed: 37 additions & 35 deletions
@@ -1,15 +1,14 @@
 = Source Connector Guide
 :name: Redis Enterprise Kafka Source Connector

-The {name} reads data from a Redis Enterprise database and publishes messages to a Kafka topic.
+The {name} reads from a Redis Enterprise stream and publishes messages to a Kafka topic.

 == Features

 The {name} includes the following features:

 * <<at-least-once-delivery,At least once delivery>>
 * <<tasks,Multiple tasks>>
-* <<key-reader,Key Reader>>
 * <<stream-reader,Stream Reader>>

 [[at-least-once-delivery]]
@@ -20,44 +19,47 @@ The {name} guarantees that records from the Kafka topic are delivered at least o
 === Multiple Tasks
 Use configuration property `tasks.max` to have the change stream handled by multiple tasks. The connector splits the work based on the number of configured key patterns. When the number of tasks is greater than the number of patterns, the number of patterns will be used instead.

-[[key-reader]]
-=== Key Reader
-In key reader mode, the {name} captures changes happening to keys in a Redis database and publishes keys and values to a Kafka topic. The data structure key will be mapped to the record key, and the value will be mapped to the record value.
-
-[IMPORTANT]
-.Supported Data Structures
-====
-The {name} supports the following data structures:
-
-* String: the Kafka record values will be strings
-* Hash: the Kafka record values will be maps (string key/value pairs)
-
-====
-
-[source,properties]
-----
-redis.keys.patterns=<glob> # <1>
-topic=<topic> # <2>
-----
-
-<1> Key portion of the pattern that will be used to listen to keyspace events. For example `foo:*` translates to pubsub channel `$$__$$keyspace@0$$__$$:foo:*` and will capture changes to keys `foo:1`, `foo:2`, etc. Use comma-separated values for multiple patterns (`foo:*,bar:*`)
-<2> Name of the destination topic.
+//
+//[[key-reader]]
+//=== Key Reader
+//In key reader mode, the {name} captures changes happening to keys in a Redis database and publishes keys and values to a Kafka topic. The data structure key will be mapped to the record key, and the value will be mapped to the record value.
+//
+//[IMPORTANT]
+//.Supported Data Structures
+//====
+//The {name} supports the following data structures:
+//
+//* String: the Kafka record values will be strings
+//* Hash: the Kafka record values will be maps (string key/value pairs)
+//
+//====
+//
+//[source,properties]
+//----
+//redis.keys.patterns=<glob> <1>
+//topic=<topic> <2>
+//----
+//
+//<1> Key portion of the pattern that will be used to listen to keyspace events. For example `foo:*` translates to pubsub channel `$$__$$keyspace@0$$__$$:foo:*` and will capture changes to keys `foo:1`, `foo:2`, etc. Use comma-separated values for multiple patterns (`foo:*,bar:*`)
+//<2> Name of the destination topic.

 [[stream-reader]]
 === Stream Reader
-In stream reader mode, the {name} reads messages from a Redis stream and publishes to a Kafka topic.
+The {name} reads messages from a stream and publishes to a Kafka topic. Reading is done through a consumer group so that <<multiple-tasks,multiple instances>> of the connector configured via the `tasks.max` property can consume messages in a round-robin fashion.

 [source,properties]
 ----
-redis.reader=STREAM # <1>
-redis.stream.name=<stream name> # <2>
-redis.stream.offset=<stream offset> # <3>
-redis.stream.block=<millis> # <4>
-topic=<topic> # <5>
+redis.stream.name=<name> <1>
+redis.stream.offset=<offset> <2>
+redis.stream.block=<millis> <3>
+redis.stream.consumer.group=<group> <4>
+redis.stream.consumer.name=<name> <5>
+topic=<name> <6>
 ----

-<1> Reader mode is STREAM
-<2> Name of the stream to read from.
-<3> https://redis.io/commands/xread#incomplete-ids[Message ID] to start reading from (default: `0-0`).
-<4> Maximum duration in milliseconds to wait while https://redis.io/commands/xread[reading] from the stream (default: 100).
-<5> Name of the destination topic which may contain `${stream}` as a placeholder for the originating stream name. For example, `redis_${stream}` for the stream 'orders' will map to the topic name `redis_orders`.
+<1> Name of the stream to read from.
+<2> https://redis.io/commands/xread#incomplete-ids[Message ID] to start reading from (default: `0-0`).
+<3> Maximum https://redis.io/commands/xread[XREAD] wait duration in milliseconds (default: `100`).
+<4> Name of the stream consumer group (default: `kafka-consumer-group`).
+<5> Name of the stream consumer (default: `consumer-${task}`). May contain `${task}` as a placeholder for the task id. For example, `foo${task}` and task `123` => consumer `foo123`.
+<6> Destination topic (default: `${stream}`). May contain `${stream}` as a placeholder for the originating stream name. For example, `redis_${stream}` and stream `orders` => topic `redis_orders`.
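
To make the new defaults concrete, here is a minimal stream-reader configuration built programmatically, in the same style as the integration tests. The stream name `orders` and the Redis URI are illustrative values, not taken from the commit; everything else relies on the defaults documented above:

[source,java]
----
import java.util.HashMap;
import java.util.Map;

public class SourceStreamConfigExample {
    public static void main(String[] args) {
        // Minimal stream-reader configuration: redis.reader no longer needs to be set
        // because STREAM is now the default. Offset, block, consumer group, consumer
        // name and topic all fall back to their documented defaults.
        Map<String, String> config = new HashMap<>();
        config.put("connector.class", "com.redislabs.kafka.connect.RedisEnterpriseSourceConnector");
        config.put("tasks.max", "1");
        config.put("redis.uri", "redis://localhost:6379"); // illustrative URI
        config.put("redis.stream.name", "orders");         // illustrative stream name
        // topic defaults to ${stream}, so messages from 'orders' land on the 'orders' topic
        System.out.println(config);
    }
}
----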

src/main/java/com/redislabs/kafka/connect/source/RedisEnterpriseSourceConfig.java

Lines changed: 21 additions & 17 deletions
@@ -21,20 +21,20 @@
 import com.redislabs.kafka.connect.common.RedisEnterpriseConfig;
 import org.apache.kafka.common.config.ConfigDef;

-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;

 public class RedisEnterpriseSourceConfig extends RedisEnterpriseConfig {

     public static final String TOKEN_STREAM = "${stream}";
+    public static final String TOKEN_TASK = "${task}";

     public static final String TOPIC = "topic";
-    public static final String TOPIC_DOC = String.format("Name of the destination topic. In stream reader mode the topic name may contain '%s' as a placeholder for the originating stream name. For example `redis_%s` for the stream 'orders' will map to the topic name 'redis_orders'.", TOKEN_STREAM, TOKEN_STREAM);
+    public static final String TOPIC_DEFAULT = TOKEN_STREAM;
+    public static final String TOPIC_DOC = String.format("Name of the destination topic, which may contain '%s' as a placeholder for the originating stream name. For example `redis_%s` for the stream 'orders' will map to the topic name 'redis_orders'.", TOKEN_STREAM, TOKEN_STREAM);

     public static final String READER_TYPE = "redis.reader";
-    public static final String READER_DEFAULT = ReaderType.KEYS.name();
+    public static final String READER_DEFAULT = ReaderType.STREAM.name();
     public static final String READER_DOC = "Source from which to read Redis records. " + ReaderType.KEYS + ": generate records from key events and respective values generated from write operations in the Redis database. " + ReaderType.STREAM + ": read messages from a Redis stream";

     public static final String BATCH_SIZE = "batch.size";
@@ -46,7 +46,6 @@ public class RedisEnterpriseSourceConfig extends RedisEnterpriseConfig {
     public static final String KEY_PATTERNS_DOC = "Keyspace glob-style patterns to subscribe to, comma-separated.";

     public static final String STREAM_NAME = "redis.stream.name";
-    public static final String STREAM_NAME_DEFAULT = "kafka:stream";
     public static final String STREAM_NAME_DOC = "Name of the Redis stream to read from";

     public static final String STREAM_OFFSET = "redis.stream.offset";
@@ -57,6 +56,11 @@ public class RedisEnterpriseSourceConfig extends RedisEnterpriseConfig {
     public static final String STREAM_CONSUMER_GROUP_DEFAULT = "kafka-consumer-group";
     public static final String STREAM_CONSUMER_GROUP_DOC = "Stream consumer group";

+    public static final String STREAM_CONSUMER_NAME = "redis.stream.consumer.name";
+    public static final String STREAM_CONSUMER_NAME_DEFAULT = "consumer-" + TOKEN_TASK;
+    public static final String STREAM_CONSUMER_NAME_DOC = "A format string for the stream consumer, which may contain '" + TOKEN_TASK + "' as a placeholder for the task id.\nFor example, 'consumer-" + TOKEN_TASK + "' for the task id '123' will map to the consumer name 'consumer-123'.";
+
+
     public static final String STREAM_BLOCK = "redis.stream.block";
     public static final long STREAM_BLOCK_DEFAULT = 100;
     public static final String STREAM_BLOCK_DOC = "The max amount of time in milliseconds to wait while polling for stream messages (XREAD [BLOCK milliseconds])";
@@ -66,6 +70,7 @@ public class RedisEnterpriseSourceConfig extends RedisEnterpriseConfig {
     private final String streamName;
     private final String streamOffset;
     private final String streamConsumerGroup;
+    private final String streamConsumerName;
     private final Long batchSize;
     private final Long streamBlock;
     private final String topicName;
@@ -79,16 +84,10 @@ public RedisEnterpriseSourceConfig(Map<?, ?> originals) {
         this.streamName = getString(STREAM_NAME);
         this.streamOffset = getString(STREAM_OFFSET);
         this.streamConsumerGroup = getString(STREAM_CONSUMER_GROUP);
+        this.streamConsumerName = getString(STREAM_CONSUMER_NAME);
         this.streamBlock = getLong(STREAM_BLOCK);
     }

-    private List<String> splitCommaSeparated(String string) {
-        if (string.trim().isEmpty()) {
-            return Collections.emptyList();
-        }
-        return Arrays.asList(string.split(","));
-    }
-
     public ReaderType getReaderType() {
         return readerType;
     }
@@ -117,6 +116,10 @@ public String getStreamConsumerGroup() {
         return streamConsumerGroup;
     }

+    public String getStreamConsumerName() {
+        return streamConsumerName;
+    }
+
     public String getTopicName() {
         return topicName;
     }
@@ -133,13 +136,14 @@ public RedisEnterpriseSourceConfigDef(ConfigDef base) {
     }

     private void define() {
-        define(ConfigKeyBuilder.of(TOPIC, ConfigDef.Type.STRING).importance(ConfigDef.Importance.MEDIUM).documentation(TOPIC_DOC).build());
-        define(ConfigKeyBuilder.of(READER_TYPE, ConfigDef.Type.STRING).documentation(READER_DOC).defaultValue(READER_DEFAULT).importance(ConfigDef.Importance.HIGH).validator(Validators.validEnum(ReaderType.class)).build());
-        define(ConfigKeyBuilder.of(KEY_PATTERNS, Type.LIST).documentation(KEY_PATTERNS_DOC).defaultValue(KEY_PATTERNS_DEFAULT).importance(Importance.MEDIUM).build());
-        define(ConfigKeyBuilder.of(STREAM_NAME, ConfigDef.Type.STRING).documentation(STREAM_NAME_DOC).defaultValue(STREAM_NAME_DEFAULT).importance(ConfigDef.Importance.HIGH).build());
+        define(ConfigKeyBuilder.of(TOPIC, ConfigDef.Type.STRING).defaultValue(TOPIC_DEFAULT).importance(ConfigDef.Importance.MEDIUM).documentation(TOPIC_DOC).build());
+        define(ConfigKeyBuilder.of(BATCH_SIZE, ConfigDef.Type.LONG).defaultValue(BATCH_SIZE_DEFAULT).importance(ConfigDef.Importance.LOW).documentation(BATCH_SIZE_DOC).validator(ConfigDef.Range.atLeast(1L)).build());
+        define(ConfigKeyBuilder.of(READER_TYPE, ConfigDef.Type.STRING).documentation(READER_DOC).defaultValue(READER_DEFAULT).importance(ConfigDef.Importance.HIGH).validator(Validators.validEnum(ReaderType.class)).internalConfig(true).build());
+        define(ConfigKeyBuilder.of(KEY_PATTERNS, Type.LIST).documentation(KEY_PATTERNS_DOC).defaultValue(KEY_PATTERNS_DEFAULT).importance(Importance.MEDIUM).internalConfig(true).build());
+        define(ConfigKeyBuilder.of(STREAM_NAME, ConfigDef.Type.STRING).documentation(STREAM_NAME_DOC).importance(ConfigDef.Importance.HIGH).build());
         define(ConfigKeyBuilder.of(STREAM_OFFSET, ConfigDef.Type.STRING).documentation(STREAM_OFFSET_DOC).defaultValue(STREAM_OFFSET_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
         define(ConfigKeyBuilder.of(STREAM_CONSUMER_GROUP, ConfigDef.Type.STRING).documentation(STREAM_CONSUMER_GROUP_DOC).defaultValue(STREAM_CONSUMER_GROUP_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
-        define(ConfigKeyBuilder.of(BATCH_SIZE, ConfigDef.Type.LONG).defaultValue(BATCH_SIZE_DEFAULT).importance(ConfigDef.Importance.LOW).documentation(BATCH_SIZE_DOC).validator(ConfigDef.Range.atLeast(1L)).build());
+        define(ConfigKeyBuilder.of(STREAM_CONSUMER_NAME, ConfigDef.Type.STRING).documentation(STREAM_CONSUMER_NAME_DOC).defaultValue(STREAM_CONSUMER_NAME_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
         define(ConfigKeyBuilder.of(STREAM_BLOCK, ConfigDef.Type.LONG).defaultValue(STREAM_BLOCK_DEFAULT).importance(ConfigDef.Importance.LOW).documentation(STREAM_BLOCK_DOC).validator(ConfigDef.Range.atLeast(1L)).build());
     }
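
A quick way to sanity-check the new defaults is to instantiate the config with only a URI and a stream name. A minimal sketch, assuming the parent `RedisEnterpriseConfig` enforces no other required properties (the URI and stream name values are illustrative):

[source,java]
----
import java.util.HashMap;
import java.util.Map;

import com.redislabs.kafka.connect.source.RedisEnterpriseSourceConfig;

public class SourceConfigDefaultsCheck {
    public static void main(String[] args) {
        // Only the URI and stream name are set; everything else should fall back to
        // the defaults introduced in this commit.
        Map<String, String> props = new HashMap<>();
        props.put(RedisEnterpriseSourceConfig.REDIS_URI, "redis://localhost:6379");
        props.put(RedisEnterpriseSourceConfig.STREAM_NAME, "orders");

        RedisEnterpriseSourceConfig config = new RedisEnterpriseSourceConfig(props);
        System.out.println(config.getReaderType());          // expected: STREAM (new default)
        System.out.println(config.getTopicName());           // expected: "${stream}"
        System.out.println(config.getStreamConsumerGroup()); // expected: "kafka-consumer-group"
        System.out.println(config.getStreamConsumerName());  // expected: "consumer-${task}"
    }
}
----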

src/main/java/com/redislabs/kafka/connect/source/StreamSourceRecordReader.java

Lines changed: 3 additions & 3 deletions
@@ -23,20 +23,20 @@ public class StreamSourceRecordReader extends AbstractSourceRecordReader<StreamM
     private static final String VALUE_SCHEMA_NAME = "com.redislabs.kafka.connect.StreamEventValue";
     private static final Schema VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
     private final String topic;
-    private final int taskId;
+    private final String consumer;

     private StreamItemReader reader;

     public StreamSourceRecordReader(RedisEnterpriseSourceConfig sourceConfig, int taskId) {
         super(sourceConfig);
         this.topic = sourceConfig.getTopicName().replace(RedisEnterpriseSourceConfig.TOKEN_STREAM, sourceConfig.getStreamName());
-        this.taskId = taskId;
+        this.consumer = sourceConfig.getStreamConsumerName().replace(RedisEnterpriseSourceConfig.TOKEN_TASK, String.valueOf(taskId));
     }

     @Override
     protected void open(RedisClient client) {
         XReadArgs.StreamOffset<String> streamOffset = XReadArgs.StreamOffset.from(sourceConfig.getStreamName(), sourceConfig.getStreamOffset());
-        reader = StreamItemReader.client(client).offset(streamOffset).block(Duration.ofMillis(sourceConfig.getStreamBlock())).count(sourceConfig.getBatchSize()).consumerGroup(sourceConfig.getStreamConsumerGroup()).consumer("consumer" + taskId).build();
+        reader = StreamItemReader.client(client).offset(streamOffset).block(Duration.ofMillis(sourceConfig.getStreamBlock())).count(sourceConfig.getBatchSize()).consumerGroup(sourceConfig.getStreamConsumerGroup()).consumer(consumer).build();
         reader.open(new ExecutionContext());
     }

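
The topic and consumer names are resolved by plain token substitution on the configured format strings. A small sketch of the same replacement in isolation, with illustrative values:

[source,java]
----
public class TokenSubstitutionExample {
    public static void main(String[] args) {
        // Same String.replace calls the reader performs on its format strings.
        String topicFormat = "redis_${stream}";
        String consumerFormat = "consumer-${task}";

        String topic = topicFormat.replace("${stream}", "orders");              // -> redis_orders
        String consumer = consumerFormat.replace("${task}", String.valueOf(0)); // -> consumer-0

        System.out.println(topic + " / " + consumer);
    }
}
----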

src/test/integration/java/com/redislabs/kafka/connect/RedisEnterpriseSourceConnectorIT.java

Lines changed: 3 additions & 0 deletions
@@ -20,18 +20,21 @@ public void taskConfigsReturnsPartitionedTaskConfigs(RedisServer server) {

         final Map<String, String> connectorConfig = new HashMap<>();
         connectorConfig.put(RedisEnterpriseSourceConfig.READER_TYPE, RedisEnterpriseSourceConfig.ReaderType.KEYS.name());
+        connectorConfig.put(RedisEnterpriseSourceConfig.STREAM_NAME, "dummy");
         connectorConfig.put(RedisEnterpriseSourceConfig.TOPIC, "mytopic");
         connectorConfig.put(RedisEnterpriseSourceConfig.REDIS_URI, server.getRedisURI());
         connectorConfig.put(RedisEnterpriseSourceConfig.KEY_PATTERNS, "a:*,b:*,c:*");

         final Map<String, String> expectedPartitionedConnectorConfigA = new HashMap<>();
         expectedPartitionedConnectorConfigA.put(RedisEnterpriseSourceConfig.READER_TYPE, RedisEnterpriseSourceConfig.ReaderType.KEYS.name());
+        expectedPartitionedConnectorConfigA.put(RedisEnterpriseSourceConfig.STREAM_NAME, "dummy");
         expectedPartitionedConnectorConfigA.put(RedisEnterpriseSourceConfig.TOPIC, "mytopic");
         expectedPartitionedConnectorConfigA.put(RedisEnterpriseSourceConfig.REDIS_URI, server.getRedisURI());
         expectedPartitionedConnectorConfigA.put(RedisEnterpriseSourceConfig.KEY_PATTERNS, "a:*,b:*");

         final Map<String, String> expectedPartitionedConnectorConfigB = new HashMap<>();
         expectedPartitionedConnectorConfigB.put(RedisEnterpriseSourceConfig.READER_TYPE, RedisEnterpriseSourceConfig.ReaderType.KEYS.name());
+        expectedPartitionedConnectorConfigB.put(RedisEnterpriseSourceConfig.STREAM_NAME, "dummy");
         expectedPartitionedConnectorConfigB.put(RedisEnterpriseSourceConfig.TOPIC, "mytopic");
         expectedPartitionedConnectorConfigB.put(RedisEnterpriseSourceConfig.REDIS_URI, server.getRedisURI());
         expectedPartitionedConnectorConfigB.put(RedisEnterpriseSourceConfig.KEY_PATTERNS, "c:*");

src/test/integration/java/com/redislabs/kafka/connect/RedisEnterpriseSourceTaskIT.java

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ public void pollStream(RedisServer redis) throws InterruptedException {
     @MethodSource("redisServers")
     public void pollKeys(RedisServer redis) throws InterruptedException {
         String topic = "mytopic";
-        startTask(redis, RedisEnterpriseSourceConfig.TOPIC, topic, RedisEnterpriseSourceTask.KEYS_IDLE_TIMEOUT, "800");
+        startTask(redis, RedisEnterpriseSourceConfig.READER_TYPE, RedisEnterpriseSourceConfig.ReaderType.KEYS.name(), RedisEnterpriseSourceConfig.STREAM_NAME, "dummy", RedisEnterpriseSourceConfig.TOPIC, topic, RedisEnterpriseSourceTask.KEYS_IDLE_TIMEOUT, "800");
         Thread.sleep(500);
         String stringKey = "key:1";
         String stringValue = "my string";
