Skip to content

Commit 79b15be

Browse files
committed
Added support for TimeSeries
1 parent fa32ef4 commit 79b15be

12 files changed

+1492
-1459
lines changed

pom.xml

Lines changed: 409 additions & 423 deletions
Large diffs are not rendered by default.

src/main/java/com/redis/kafka/connect/sink/RedisEnterpriseSinkConfig.java

Lines changed: 132 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -15,132 +15,144 @@
1515
*/
1616
package com.redis.kafka.connect.sink;
1717

18-
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
19-
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
20-
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
21-
import com.redis.kafka.connect.common.RedisEnterpriseConfig;
22-
import org.apache.kafka.common.config.ConfigDef;
23-
import org.apache.kafka.common.config.ConfigValue;
24-
2518
import java.nio.charset.Charset;
2619
import java.util.Arrays;
2720
import java.util.HashSet;
2821
import java.util.Map;
2922
import java.util.Set;
3023

24+
import org.apache.kafka.common.config.ConfigDef;
25+
import org.apache.kafka.common.config.ConfigValue;
26+
27+
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
28+
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
29+
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
30+
import com.redis.kafka.connect.common.RedisEnterpriseConfig;
31+
3132
public class RedisEnterpriseSinkConfig extends RedisEnterpriseConfig {
3233

33-
public static final String TOKEN_TOPIC = "${topic}";
34-
35-
public static final String CHARSET = "redis.charset";
36-
public static final String CHARSET_DEFAULT = Charset.defaultCharset().name();
37-
public static final String CHARSET_DOC = "Character set to encode Redis key and value strings.";
38-
39-
public static final String KEY = "redis.key";
40-
public static final String KEY_DEFAULT = TOKEN_TOPIC;
41-
public static final String KEY_DOC = "A format string for the destination stream/set/zset/list key, which may contain '" + TOKEN_TOPIC + "' as a placeholder for the originating topic name.\nFor example, ``kafka_" + TOKEN_TOPIC + "`` for the topic 'orders' will map to the Redis key " + "'kafka_orders'.";
42-
43-
public static final String MULTIEXEC = "redis.multiexec";
44-
public static final String MULTIEXEC_DEFAULT = "false";
45-
public static final String MULTIEXEC_DOC = "Whether to execute Redis commands in multi/exec transactions.";
46-
47-
public static final String TYPE = "redis.type";
48-
public static final String TYPE_DEFAULT = DataType.STREAM.name();
49-
public static final String TYPE_DOC = "Destination data structure: " + ConfigUtils.enumValues(DataType.class);
50-
51-
public static final Set<DataType> MULTI_EXEC_TYPES = new HashSet<>(Arrays.asList(DataType.STREAM, DataType.LIST, DataType.SET, DataType.ZSET));
52-
53-
public static final String PUSH_DIRECTION = "redis.push.direction";
54-
public static final String PUSH_DIRECTION_DEFAULT = PushDirection.LEFT.name();
55-
public static final String PUSH_DIRECTION_DOC = "List push direction: " + PushDirection.LEFT + " (LPUSH) or " + PushDirection.RIGHT + " (RPUSH)";
56-
57-
private final Charset charset;
58-
private final DataType type;
59-
private final String keyFormat;
60-
private final PushDirection pushDirection;
61-
private final boolean multiexec;
62-
63-
public RedisEnterpriseSinkConfig(Map<?, ?> originals) {
64-
super(new RedisEnterpriseSinkConfigDef(), originals);
65-
String charsetName = getString(CHARSET).trim();
66-
charset = Charset.forName(charsetName);
67-
type = ConfigUtils.getEnum(DataType.class, this, TYPE);
68-
keyFormat = getString(KEY).trim();
69-
pushDirection = ConfigUtils.getEnum(PushDirection.class, this, PUSH_DIRECTION);
70-
multiexec = Boolean.TRUE.equals(getBoolean(MULTIEXEC));
71-
}
72-
73-
public Charset getCharset() {
74-
return charset;
75-
}
76-
77-
public DataType getType() {
78-
return type;
79-
}
80-
81-
public String getKeyFormat() {
82-
return keyFormat;
83-
}
84-
85-
public PushDirection getPushDirection() {
86-
return pushDirection;
87-
}
88-
89-
public boolean isMultiexec() {
90-
return multiexec;
91-
}
92-
93-
public static class RedisEnterpriseSinkConfigDef extends RedisEnterpriseConfigDef {
94-
95-
public RedisEnterpriseSinkConfigDef() {
96-
define();
97-
}
98-
99-
public RedisEnterpriseSinkConfigDef(ConfigDef base) {
100-
super(base);
101-
define();
102-
}
103-
104-
private void define() {
105-
define(ConfigKeyBuilder.of(CHARSET, ConfigDef.Type.STRING).documentation(CHARSET_DOC).defaultValue(CHARSET_DEFAULT).importance(ConfigDef.Importance.HIGH).build());
106-
define(ConfigKeyBuilder.of(TYPE, ConfigDef.Type.STRING).documentation(TYPE_DOC).defaultValue(TYPE_DEFAULT).importance(ConfigDef.Importance.HIGH).validator(Validators.validEnum(DataType.class)).build());
107-
define(ConfigKeyBuilder.of(KEY, ConfigDef.Type.STRING).documentation(KEY_DOC).defaultValue(KEY_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
108-
define(ConfigKeyBuilder.of(PUSH_DIRECTION, ConfigDef.Type.STRING).documentation(PUSH_DIRECTION_DOC).defaultValue(PUSH_DIRECTION_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
109-
define(ConfigKeyBuilder.of(MULTIEXEC, ConfigDef.Type.BOOLEAN).documentation(MULTIEXEC_DOC).defaultValue(MULTIEXEC_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
110-
}
111-
112-
@Override
113-
public Map<String, ConfigValue> validateAll(Map<String, String> props) {
114-
Map<String, ConfigValue> results = super.validateAll(props);
115-
if (results.values().stream().map(ConfigValue::errorMessages).anyMatch(l -> !l.isEmpty())) {
116-
return results;
117-
}
118-
DataType dataType = dataType(props);
119-
String multiexec = props.getOrDefault(MULTIEXEC, MULTIEXEC_DEFAULT).trim();
120-
if (multiexec.equalsIgnoreCase("true") && !MULTI_EXEC_TYPES.contains(dataType)) {
121-
String supportedTypes = String.join(", ", MULTI_EXEC_TYPES.stream().map(Enum::name).toArray(String[]::new));
122-
results.get(MULTIEXEC).addErrorMessage("multi/exec is only supported with these data structures: " + supportedTypes);
123-
}
124-
String charsetName = props.getOrDefault(CHARSET, CHARSET_DEFAULT).trim();
125-
try {
126-
Charset.forName(charsetName);
127-
} catch (Exception e) {
128-
results.get(CHARSET).addErrorMessage(e.getMessage());
129-
}
130-
return results;
131-
}
132-
133-
private DataType dataType(Map<String, String> props) {
134-
return DataType.valueOf(props.getOrDefault(TYPE, TYPE_DEFAULT));
135-
}
136-
137-
}
138-
139-
public enum DataType {
140-
HASH, JSON, STRING, STREAM, LIST, SET, ZSET
141-
}
142-
143-
public enum PushDirection {
144-
LEFT, RIGHT
145-
}
34+
public static final String TOKEN_TOPIC = "${topic}";
35+
36+
public static final String CHARSET = "redis.charset";
37+
public static final String CHARSET_DEFAULT = Charset.defaultCharset().name();
38+
public static final String CHARSET_DOC = "Character set to encode Redis key and value strings.";
39+
40+
public static final String KEY = "redis.key";
41+
public static final String KEY_DEFAULT = TOKEN_TOPIC;
42+
public static final String KEY_DOC = "A format string for the destination stream/set/zset/list key, which may contain '"
43+
+ TOKEN_TOPIC + "' as a placeholder for the originating topic name.\nFor example, ``kafka_" + TOKEN_TOPIC
44+
+ "`` for the topic 'orders' will map to the Redis key " + "'kafka_orders'.";
45+
46+
public static final String MULTIEXEC = "redis.multiexec";
47+
public static final String MULTIEXEC_DEFAULT = "false";
48+
public static final String MULTIEXEC_DOC = "Whether to execute Redis commands in multi/exec transactions.";
49+
50+
public static final String TYPE = "redis.type";
51+
public static final String TYPE_DEFAULT = DataType.STREAM.name();
52+
public static final String TYPE_DOC = "Destination data structure: " + ConfigUtils.enumValues(DataType.class);
53+
54+
public static final Set<DataType> MULTI_EXEC_TYPES = new HashSet<>(
55+
Arrays.asList(DataType.STREAM, DataType.LIST, DataType.SET, DataType.ZSET));
56+
57+
public static final String PUSH_DIRECTION = "redis.push.direction";
58+
public static final String PUSH_DIRECTION_DEFAULT = PushDirection.LEFT.name();
59+
public static final String PUSH_DIRECTION_DOC = "List push direction: " + PushDirection.LEFT + " (LPUSH) or "
60+
+ PushDirection.RIGHT + " (RPUSH)";
61+
62+
private final Charset charset;
63+
private final DataType type;
64+
private final String keyFormat;
65+
private final PushDirection pushDirection;
66+
private final boolean multiexec;
67+
68+
public RedisEnterpriseSinkConfig(Map<?, ?> originals) {
69+
super(new RedisEnterpriseSinkConfigDef(), originals);
70+
String charsetName = getString(CHARSET).trim();
71+
charset = Charset.forName(charsetName);
72+
type = ConfigUtils.getEnum(DataType.class, this, TYPE);
73+
keyFormat = getString(KEY).trim();
74+
pushDirection = ConfigUtils.getEnum(PushDirection.class, this, PUSH_DIRECTION);
75+
multiexec = Boolean.TRUE.equals(getBoolean(MULTIEXEC));
76+
}
77+
78+
public Charset getCharset() {
79+
return charset;
80+
}
81+
82+
public DataType getType() {
83+
return type;
84+
}
85+
86+
public String getKeyFormat() {
87+
return keyFormat;
88+
}
89+
90+
public PushDirection getPushDirection() {
91+
return pushDirection;
92+
}
93+
94+
public boolean isMultiexec() {
95+
return multiexec;
96+
}
97+
98+
public static class RedisEnterpriseSinkConfigDef extends RedisEnterpriseConfigDef {
99+
100+
public RedisEnterpriseSinkConfigDef() {
101+
define();
102+
}
103+
104+
public RedisEnterpriseSinkConfigDef(ConfigDef base) {
105+
super(base);
106+
define();
107+
}
108+
109+
private void define() {
110+
define(ConfigKeyBuilder.of(CHARSET, ConfigDef.Type.STRING).documentation(CHARSET_DOC)
111+
.defaultValue(CHARSET_DEFAULT).importance(ConfigDef.Importance.HIGH).build());
112+
define(ConfigKeyBuilder.of(TYPE, ConfigDef.Type.STRING).documentation(TYPE_DOC).defaultValue(TYPE_DEFAULT)
113+
.importance(ConfigDef.Importance.HIGH).validator(Validators.validEnum(DataType.class)).build());
114+
define(ConfigKeyBuilder.of(KEY, ConfigDef.Type.STRING).documentation(KEY_DOC).defaultValue(KEY_DEFAULT)
115+
.importance(ConfigDef.Importance.MEDIUM).build());
116+
define(ConfigKeyBuilder.of(PUSH_DIRECTION, ConfigDef.Type.STRING).documentation(PUSH_DIRECTION_DOC)
117+
.defaultValue(PUSH_DIRECTION_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
118+
define(ConfigKeyBuilder.of(MULTIEXEC, ConfigDef.Type.BOOLEAN).documentation(MULTIEXEC_DOC)
119+
.defaultValue(MULTIEXEC_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
120+
}
121+
122+
@Override
123+
public Map<String, ConfigValue> validateAll(Map<String, String> props) {
124+
Map<String, ConfigValue> results = super.validateAll(props);
125+
if (results.values().stream().map(ConfigValue::errorMessages).anyMatch(l -> !l.isEmpty())) {
126+
return results;
127+
}
128+
DataType dataType = dataType(props);
129+
String multiexec = props.getOrDefault(MULTIEXEC, MULTIEXEC_DEFAULT).trim();
130+
if (multiexec.equalsIgnoreCase("true") && !MULTI_EXEC_TYPES.contains(dataType)) {
131+
String supportedTypes = String.join(", ",
132+
MULTI_EXEC_TYPES.stream().map(Enum::name).toArray(String[]::new));
133+
results.get(MULTIEXEC)
134+
.addErrorMessage("multi/exec is only supported with these data structures: " + supportedTypes);
135+
}
136+
String charsetName = props.getOrDefault(CHARSET, CHARSET_DEFAULT).trim();
137+
try {
138+
Charset.forName(charsetName);
139+
} catch (Exception e) {
140+
results.get(CHARSET).addErrorMessage(e.getMessage());
141+
}
142+
return results;
143+
}
144+
145+
private DataType dataType(Map<String, String> props) {
146+
return DataType.valueOf(props.getOrDefault(TYPE, TYPE_DEFAULT));
147+
}
148+
149+
}
150+
151+
public enum DataType {
152+
HASH, JSON, TIMESERIES, STRING, STREAM, LIST, SET, ZSET
153+
}
154+
155+
public enum PushDirection {
156+
LEFT, RIGHT
157+
}
146158
}

0 commit comments

Comments
 (0)