Skip to content

Commit 252ac8d

Browse files
authored
Merge pull request #51 from riferrei/add-ttl-support
Add Support for Key TTL Setting for Sink Connector
2 parents 83e3128 + 922e5cd commit 252ac8d

File tree

8 files changed

+970
-851
lines changed

8 files changed

+970
-851
lines changed

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
ARG CP_VERSION=7.2.0
1+
ARG CP_VERSION=7.7.0
22
ARG BASE_PREFIX=confluentinc
3-
ARG CONNECT_IMAGE=cp-server-connect
3+
ARG CONNECT_IMAGE=cp-kafka-connect
44

55
FROM openjdk:18-jdk-slim AS build
66
WORKDIR /root/redis-kafka-connect

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public enum RedisType {
3737
private final boolean multiExec;
3838
private final int waitReplicas;
3939
private final Duration waitTimeout;
40+
private final long keyTTL;
4041

4142
public RedisSinkConfig(Map<?, ?> originals) {
4243
super(new RedisSinkConfigDef(), originals);
@@ -48,6 +49,7 @@ public RedisSinkConfig(Map<?, ?> originals) {
4849
multiExec = Boolean.TRUE.equals(getBoolean(RedisSinkConfigDef.MULTIEXEC_CONFIG));
4950
waitReplicas = getInt(RedisSinkConfigDef.WAIT_REPLICAS_CONFIG);
5051
waitTimeout = Duration.ofMillis(getLong(RedisSinkConfigDef.WAIT_TIMEOUT_CONFIG));
52+
keyTTL = getLong(RedisSinkConfigDef.KEY_TTL_CONFIG);
5153
}
5254

5355
public Charset getCharset() {
@@ -78,12 +80,16 @@ public Duration getWaitTimeout() {
7880
return waitTimeout;
7981
}
8082

83+
public long getKeyTTL() {
84+
return keyTTL;
85+
}
86+
8187
@Override
8288
public int hashCode() {
8389
final int prime = 31;
8490
int result = super.hashCode();
8591
result = prime * result
86-
+ Objects.hash(charset, keyspace, separator, multiExec, type, waitReplicas, waitTimeout);
92+
+ Objects.hash(charset, keyspace, separator, multiExec, type, waitReplicas, waitTimeout, keyTTL);
8793
return result;
8894
}
8995

@@ -98,7 +104,7 @@ public boolean equals(Object obj) {
98104
RedisSinkConfig other = (RedisSinkConfig) obj;
99105
return Objects.equals(charset, other.charset) && Objects.equals(keyspace, other.keyspace)
100106
&& Objects.equals(separator, other.separator) && multiExec == other.multiExec && type == other.type
101-
&& waitReplicas == other.waitReplicas && waitTimeout == other.waitTimeout;
107+
&& waitReplicas == other.waitReplicas && waitTimeout == other.waitTimeout && keyTTL == other.keyTTL;
102108
}
103109

104110
}

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ public class RedisSinkConfigDef extends RedisConfigDef {
6161
public static final String TYPE_DOC = "Destination data structure: "
6262
+ String.join(",", Stream.of(RedisType.values()).map(RedisType::name).toArray(String[]::new));
6363

64+
public static final String KEY_TTL_CONFIG = "redis.key.ttl";
65+
66+
public static final String KEY_TTL_CONFIG_DEFAULT = "-1";
67+
68+
public static final String KEY_TTL_CONFIG_DOC = "Time to live in seconds for the key. If not set, the record will not expire.";
69+
6470
protected static final Set<RedisType> MULTI_EXEC_COMMANDS = Stream
6571
.of(RedisType.STREAM, RedisType.LIST, RedisType.SET, RedisType.ZSET).collect(Collectors.toSet());
6672

@@ -81,6 +87,7 @@ private void define() {
8187
define(MULTIEXEC_CONFIG, Type.BOOLEAN, MULTIEXEC_DEFAULT, Importance.MEDIUM, MULTIEXEC_DOC);
8288
define(WAIT_REPLICAS_CONFIG, Type.INT, WAIT_REPLICAS_DEFAULT, Importance.MEDIUM, WAIT_REPLICAS_DOC);
8389
define(WAIT_TIMEOUT_CONFIG, Type.LONG, WAIT_TIMEOUT_DEFAULT, Importance.MEDIUM, WAIT_TIMEOUT_DOC);
90+
define(KEY_TTL_CONFIG, Type.LONG, KEY_TTL_CONFIG_DEFAULT, Importance.MEDIUM, KEY_TTL_CONFIG_DOC);
8491
}
8592

8693
@Override

0 commit comments

Comments
 (0)