
Commit 6b67eb2

[deps] Upgraded to LettuceMod 1.7.3
1 parent 607763e

9 files changed (+76, -24 lines)

pom.xml

Lines changed: 7 additions & 6 deletions

@@ -23,11 +23,11 @@
   <asciidoctorj.pdf.version>1.6.0</asciidoctorj.pdf.version>
   <jruby.version>9.2.17.0</jruby.version>
   <junit.version>5.7.1</junit.version>
-  <lettucemod.version>1.7.2</lettucemod.version>
+  <lettucemod.version>1.7.3</lettucemod.version>
   <lombok.version>1.18.20</lombok.version>
   <mockito.version>3.9.0</mockito.version>
   <reactor.version>3.4.6</reactor.version>
-  <spring-batch-redis.version>2.17.1</spring-batch-redis.version>
+  <spring-batch-redis.version>2.17.2</spring-batch-redis.version>
   <slf4j.version>1.7.30</slf4j.version>
   <testcontainers-redis.version>1.3.3</testcontainers-redis.version>
   <testcontainers.version>1.15.3</testcontainers.version>
@@ -269,7 +269,7 @@
   <descriptorRefs>
     <descriptorRef>jar-with-dependencies</descriptorRef>
   </descriptorRefs>
-  <finalName>${project.name}-${project.version}</finalName>
+  <finalName>${artifactId}-${project.version}</finalName>
   <appendAssemblyId>false</appendAssemblyId>
 </configuration>
 <executions>
@@ -287,9 +287,10 @@
   <artifactId>kafka-connect-maven-plugin</artifactId>
   <version>0.12.0</version>
   <configuration>
+    <title>Redis Enterprise Connector</title>
     <ownerUsername>redis</ownerUsername>
     <documentationUrl>https://${github.owner}.github.io/${github.repo}/</documentationUrl>
-    <ownerLogo>src/docs/asciidoc/images/redis_logo.svg</ownerLogo>
+    <ownerLogo>src/docs/asciidoc/images/redis_logo.png</ownerLogo>
     <ownerName>Redis</ownerName>
     <ownerUrl>https://redis.com</ownerUrl>
     <sourceUrl>${project.scm.url}</sourceUrl>
@@ -317,7 +318,7 @@
     <goal>kafka-connect</goal>
   </goals>
   <configuration>
-    <title>Redis Enterprise Kafka for Confluent Platform 5.x</title>
+    <description>Kafka Connect source and sink connectors for Redis Enterprise. It is for Confluent Plaform 5.0 and above</description>
     <version>cp5-${project.version}</version>
     <requirements>
       <requirement>Confluent Platform 5.0+</requirement>
@@ -332,7 +333,7 @@
     <goal>kafka-connect</goal>
   </goals>
   <configuration>
-    <title>Redis Enterprise Kafka for Confluent Platform 6.x</title>
+    <description>Kafka Connect source and sink connectors for Redis Enterprise. It is for Confluent Plaform 6.0 and above</description>
     <version>cp6-${project.version}</version>
     <requirements>
       <requirement>Confluent Platform 6.0+</requirement>

src/main/java/com/redis/kafka/connect/sink/RedisEnterpriseSinkConfig.java

Lines changed: 1 addition & 1 deletion

@@ -137,7 +137,7 @@ private DataType dataType(Map<String, String> props) {
     }

     public enum DataType {
-        HASH, STRING, STREAM, LIST, SET, ZSET
+        HASH, JSON, STRING, STREAM, LIST, SET, ZSET
     }

     public enum PushDirection {
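
The added JSON constant is the user-visible switch for the new RedisJSON target. A minimal sketch of how the sink could resolve it from connector properties; the property key "redis.type" and the STREAM fallback are assumptions for illustration only (the real key and default live in RedisEnterpriseSinkConfig, whose dataType(Map) method appears in the hunk header above):

import java.util.Locale;
import java.util.Map;

class DataTypeResolution {

    enum DataType { HASH, JSON, STRING, STREAM, LIST, SET, ZSET }

    // Sketch only: "redis.type" and the STREAM default are hypothetical.
    static DataType dataType(Map<String, String> props) {
        return DataType.valueOf(props.getOrDefault("redis.type", "STREAM").toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // With the new enum constant, "JSON" is now a valid value:
        System.out.println(dataType(Map.of("redis.type", "JSON"))); // prints JSON
    }
}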

src/main/java/com/redis/kafka/connect/sink/RedisEnterpriseSinkTask.java

Lines changed: 7 additions & 4 deletions

@@ -21,8 +21,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.redis.kafka.connect.RedisEnterpriseSinkConnector;
+import com.redis.lettucemod.RedisModulesClient;
 import io.lettuce.core.KeyValue;
-import io.lettuce.core.RedisClient;
 import io.lettuce.core.api.StatefulRedisConnection;
 import io.lettuce.core.codec.ByteArrayCodec;
 import org.apache.kafka.common.TopicPartition;
@@ -40,6 +40,7 @@
 import org.springframework.batch.item.redis.support.RedisOperation;
 import org.springframework.batch.item.redis.support.convert.ScoredValueConverter;
 import org.springframework.batch.item.redis.support.operation.Hset;
+import org.springframework.batch.item.redis.support.operation.JsonSet;
 import org.springframework.batch.item.redis.support.operation.Lpush;
 import org.springframework.batch.item.redis.support.operation.Rpush;
 import org.springframework.batch.item.redis.support.operation.Sadd;
@@ -63,7 +64,7 @@ public class RedisEnterpriseSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(RedisEnterpriseSinkTask.class);
     private static final String OFFSET_KEY_FORMAT = "com.redis.kafka.connect.sink.offset.%s.%s";

-    private RedisClient client;
+    private RedisModulesClient client;
     private RedisEnterpriseSinkConfig config;
     private Charset charset;
     private OperationItemWriter<byte[], byte[], SinkRecord> writer;
@@ -77,7 +78,7 @@ public String version() {
     @Override
     public void start(final Map<String, String> props) {
         config = new RedisEnterpriseSinkConfig(props);
-        client = RedisClient.create(config.getRedisURI());
+        client = RedisModulesClient.create(config.getRedisURI());
         connection = client.connect();
         charset = config.getCharset();
         writer = writer(client);
@@ -111,7 +112,7 @@ public void start(final Map<String, String> props) {
         }
     }

-    private OperationItemWriter<byte[],byte[], SinkRecord> writer(RedisClient client) {
+    private OperationItemWriter<byte[], byte[], SinkRecord> writer(RedisModulesClient client) {
         OperationItemWriter.OperationItemWriterBuilder<byte[], byte[], SinkRecord> builder = OperationItemWriter.client(client, new ByteArrayCodec()).operation(operation());
         if (Boolean.TRUE.equals(config.isMultiexec())) {
             return builder.transactional().build();
@@ -129,6 +130,8 @@ private RedisOperation<byte[], byte[], SinkRecord> operation() {
             return Xadd.key(this::collectionKey).body(this::map).build();
         case HASH:
             return Hset.key(this::key).map(this::map).del(this::isDelete).build();
+        case JSON:
+            return JsonSet.key(this::key).path(".".getBytes(charset)).value(this::value).del(this::isDelete).build();
         case STRING:
             return Set.key(this::key).value(this::value).del(this::isDelete).build();
         case LIST:
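
Taken together, the new JSON case issues a RedisJSON JSON.SET per sink record at the root path. A standalone sketch of the same pipeline, assuming the spring-batch-redis 2.17.x builder API exactly as it appears in the hunks above; the OperationItemWriter package, the converter lambdas (stand-ins for the task's key()/value()/isDelete() helpers), and the localhost URI are assumptions:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.connect.sink.SinkRecord;
import org.springframework.batch.item.redis.OperationItemWriter; // package assumed; the diff names only the class
import org.springframework.batch.item.redis.support.RedisOperation;
import org.springframework.batch.item.redis.support.operation.JsonSet;

import com.redis.lettucemod.RedisModulesClient;
import io.lettuce.core.codec.ByteArrayCodec;

class JsonSinkSketch {

    // Builds a writer equivalent to what the task creates for DataType.JSON.
    static OperationItemWriter<byte[], byte[], SinkRecord> jsonWriter(RedisModulesClient client) {
        RedisOperation<byte[], byte[], SinkRecord> jsonSet = JsonSet
                .key((SinkRecord r) -> String.valueOf(r.key()).getBytes(StandardCharsets.UTF_8))
                .path(".".getBytes(StandardCharsets.UTF_8)) // root path: each record replaces the whole document
                .value((SinkRecord r) -> String.valueOf(r.value()).getBytes(StandardCharsets.UTF_8))
                .del((SinkRecord r) -> r.value() == null)   // Kafka tombstone records delete the key
                .build();
        // When config.isMultiexec() is set, the task instead wraps the writes
        // in MULTI/EXEC via builder.transactional().build().
        return OperationItemWriter.client(client, new ByteArrayCodec()).operation(jsonSet).build();
    }
}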

src/main/java/com/redis/kafka/connect/source/AbstractSourceRecordReader.java

Lines changed: 2 additions & 3 deletions

@@ -1,7 +1,6 @@
 package com.redis.kafka.connect.source;

 import com.redis.lettucemod.RedisModulesClient;
-import io.lettuce.core.RedisClient;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -15,7 +14,7 @@ public abstract class AbstractSourceRecordReader<T> implements SourceRecordReade
     private static final Logger log = LoggerFactory.getLogger(AbstractSourceRecordReader.class);

     protected final RedisEnterpriseSourceConfig sourceConfig;
-    private RedisClient client;
+    private RedisModulesClient client;

     protected AbstractSourceRecordReader(RedisEnterpriseSourceConfig sourceConfig) {
         Assert.notNull(sourceConfig, "Source connector config must not be null");
@@ -28,7 +27,7 @@ public void open() {
         open(client);
     }

-    protected abstract void open(RedisClient client);
+    protected abstract void open(RedisModulesClient client);

     @Override
     public List<SourceRecord> poll() {

src/main/java/com/redis/kafka/connect/source/KeySourceRecordReader.java

Lines changed: 2 additions & 2 deletions

@@ -1,6 +1,6 @@
 package com.redis.kafka.connect.source;

-import io.lettuce.core.RedisClient;
+import com.redis.lettucemod.RedisModulesClient;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -35,7 +35,7 @@ public KeySourceRecordReader(RedisEnterpriseSourceConfig sourceConfig, Duration
     }

     @Override
-    protected void open(RedisClient client) {
+    protected void open(RedisModulesClient client) {
         reader = DataStructureItemReader.client(client).live().idleTimeout(idleTimeout).keyPatterns(sourceConfig.getKeyPatterns().toArray(new String[0])).build();
         reader.open(new ExecutionContext());
     }

src/main/java/com/redis/kafka/connect/source/StreamSourceRecordReader.java

Lines changed: 2 additions & 2 deletions

@@ -1,6 +1,6 @@
 package com.redis.kafka.connect.source;

-import io.lettuce.core.RedisClient;
+import com.redis.lettucemod.RedisModulesClient;
 import io.lettuce.core.StreamMessage;
 import io.lettuce.core.XReadArgs;
 import org.apache.kafka.connect.data.Schema;
@@ -38,7 +38,7 @@ public StreamSourceRecordReader(RedisEnterpriseSourceConfig sourceConfig, int ta
     }

     @Override
-    protected void open(RedisClient client) {
+    protected void open(RedisModulesClient client) {
         XReadArgs.StreamOffset<String> streamOffset = XReadArgs.StreamOffset.from(sourceConfig.getStreamName(), sourceConfig.getStreamOffset());
         reader = StreamItemReader.client(client).offset(streamOffset).block(Duration.ofMillis(sourceConfig.getStreamBlock())).count(sourceConfig.getBatchSize()).consumerGroup(sourceConfig.getStreamConsumerGroup()).consumer(consumer).build();
         reader.open(new ExecutionContext());

src/test/integration/java/com/redis/kafka/connect/AbstractRedisEnterpriseIT.java

Lines changed: 11 additions & 6 deletions

@@ -1,9 +1,11 @@
 package com.redis.kafka.connect;

-import com.redis.testcontainers.RedisContainer;
+import com.redis.lettucemod.RedisModulesClient;
+import com.redis.lettucemod.api.sync.RedisJSONCommands;
+import com.redis.lettucemod.cluster.RedisModulesClusterClient;
+import com.redis.testcontainers.RedisModulesContainer;
 import com.redis.testcontainers.RedisServer;
 import io.lettuce.core.AbstractRedisClient;
-import io.lettuce.core.RedisClient;
 import io.lettuce.core.api.StatefulConnection;
 import io.lettuce.core.api.StatefulRedisConnection;
 import io.lettuce.core.api.async.BaseRedisAsyncCommands;
@@ -17,7 +19,6 @@
 import io.lettuce.core.api.sync.RedisSortedSetCommands;
 import io.lettuce.core.api.sync.RedisStreamCommands;
 import io.lettuce.core.api.sync.RedisStringCommands;
-import io.lettuce.core.cluster.RedisClusterClient;
 import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -36,7 +37,7 @@
 public class AbstractRedisEnterpriseIT {

     @Container
-    private static final RedisContainer REDIS = new RedisContainer().withKeyspaceNotifications();
+    private static final RedisModulesContainer REDIS = new RedisModulesContainer();

     static Stream<RedisServer> redisServers() {
         return Stream.of(REDIS);
@@ -52,15 +53,15 @@ static Stream<RedisServer> redisServers() {
     public void setupEach() {
         for (RedisServer redis : redisServers().collect(Collectors.toList())) {
             if (redis.isCluster()) {
-                RedisClusterClient client = RedisClusterClient.create(redis.getRedisURI());
+                RedisModulesClusterClient client = RedisModulesClusterClient.create(redis.getRedisURI());
                 clients.put(redis, client);
                 StatefulRedisClusterConnection<String, String> connection = client.connect();
                 connections.put(redis, connection);
                 syncs.put(redis, connection.sync());
                 asyncs.put(redis, connection.async());
                 reactives.put(redis, connection.reactive());
             } else {
-                RedisClient client = RedisClient.create(redis.getRedisURI());
+                RedisModulesClient client = RedisModulesClient.create(redis.getRedisURI());
                 clients.put(redis, client);
                 StatefulRedisConnection<String, String> connection = client.connect();
                 connections.put(redis, connection);
@@ -98,6 +99,10 @@ protected RedisHashCommands<String, String> syncHash(RedisServer redis) {
         return ((RedisHashCommands<String, String>) sync(redis));
     }

+    protected RedisJSONCommands<String, String> syncJSON(RedisServer redis) {
+        return ((RedisJSONCommands<String, String>) sync(redis));
+    }
+
     protected RedisListCommands<String, String> syncList(RedisServer redis) {
         return ((RedisListCommands<String, String>) sync(redis));
     }

src/test/integration/java/com/redis/kafka/connect/RedisEnterpriseSinkTaskIT.java

Lines changed: 44 additions & 0 deletions

@@ -1,5 +1,7 @@
 package com.redis.kafka.connect;

+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.jcustenborder.kafka.connect.utils.SinkRecordHelper;
 import com.redis.kafka.connect.sink.RedisEnterpriseSinkConfig;
 import com.redis.kafka.connect.sink.RedisEnterpriseSinkTask;
@@ -8,6 +10,9 @@
 import io.lettuce.core.Range;
 import io.lettuce.core.ScoredValue;
 import io.lettuce.core.StreamMessage;
+import lombok.Builder;
+import lombok.Data;
+import lombok.Singular;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
@@ -96,6 +101,45 @@ public void putHash(RedisServer redis) {
         }
     }

+    @Data
+    @Builder
+    private static class Person {
+        private long id;
+        private String name;
+        @Singular
+        private Set<String> hobbies;
+        private Address address;
+    }
+
+    @Data
+    @Builder
+    private static class Address {
+        private String street;
+        private String city;
+        private String state;
+        private String zip;
+    }
+
+    @ParameterizedTest
+    @MethodSource("redisServers")
+    public void putJSON(RedisServer redis) throws JsonProcessingException {
+        String topic = "putJSON";
+        List<Person> persons = new ArrayList<>();
+        persons.add(Person.builder().id(1).name("Bodysnatch Cummerbund").address(Address.builder().city("New York").zip("10013").state("NY").street("150 Mott St").build()).hobby("Fishing").hobby("Singing").build());
+        persons.add(Person.builder().id(2).name("Buffalo Custardbath").address(Address.builder().city("Los Angeles").zip("90001").state("CA").street("123 Sunset Blvd").build()).hobby("Surfing").hobby("Piano").build());
+        persons.add(Person.builder().id(3).name("Bumblesnuff Crimpysnitch").address(Address.builder().city("Chicago").zip("60603").state("IL").street("100 S State St").build()).hobby("Skiing").hobby("Drums").build());
+        List<SinkRecord> records = new ArrayList<>();
+        ObjectMapper mapper = new ObjectMapper();
+        for (Person person : persons) {
+            records.add(SinkRecordHelper.write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, "person:" + person.getId()), new SchemaAndValue(Schema.STRING_SCHEMA, mapper.writeValueAsString(person))));
+        }
+        put(topic, RedisEnterpriseSinkConfig.DataType.JSON, redis, records);
+        for (Person person : persons) {
+            String json = syncJSON(redis).jsonGet("person:" + person.getId());
+            assertEquals(mapper.writeValueAsString(person), json);
+        }
+    }
+
     @ParameterizedTest
     @MethodSource("redisServers")
     public void putLpush(RedisServer redis) {
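
For context on what putJSON asserts: each Person is serialized with Jackson before being sent through the sink, and the document read back with JSON.GET must match that exact string. A hedged sketch of the serialization step, reusing the test's Lombok builders and assuming Jackson's default settings (under which bean properties follow field declaration order):

ObjectMapper mapper = new ObjectMapper();
Person person = Person.builder().id(1).name("Bodysnatch Cummerbund")
        .address(Address.builder().city("New York").zip("10013").state("NY").street("150 Mott St").build())
        .hobby("Fishing").hobby("Singing").build();
String json = mapper.writeValueAsString(person);
// Expected output (approximately, assuming default Jackson ordering):
// {"id":1,"name":"Bodysnatch Cummerbund","hobbies":["Fishing","Singing"],
//  "address":{"street":"150 Mott St","city":"New York","state":"NY","zip":"10013"}}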
