Skip to content

Commit 8a11149

Browse files
committed
Added redis.wait.replicas and redis.wait.timeout options. Resolves
#12
1 parent 79b15be commit 8a11149

File tree

10 files changed

+359
-264
lines changed

10 files changed

+359
-264
lines changed

src/main/java/com/redis/kafka/connect/RedisEnterpriseSourceConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
6969

7070
private Map<String, String> taskConfig(List<String> patterns) {
7171
final Map<String, String> taskConfig = new HashMap<>(this.config.originalsStrings());
72-
taskConfig.put(RedisEnterpriseSourceConfig.KEY_PATTERNS, String.join(",", patterns));
72+
taskConfig.put(RedisEnterpriseSourceConfig.KEY_PATTERNS_CONFIG, String.join(",", patterns));
7373
return taskConfig;
7474
}
7575

src/main/java/com/redis/kafka/connect/common/RedisEnterpriseConfig.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525

2626
public class RedisEnterpriseConfig extends AbstractConfig {
2727

28-
public static final String REDIS_URI = "redis.uri";
28+
public static final String REDIS_URI_CONFIG = "redis.uri";
2929
private static final String REDIS_URI_DEFAULT = "redis://localhost:6379";
3030
private static final String REDIS_URI_DOC = "URI of the Redis Enterprise database to connect to, e.g. redis://redis-12000.redis.com:12000";
3131

32-
public static final String INSECURE = "redis.insecure";
32+
public static final String INSECURE_CONFIG = "redis.insecure";
3333
public static final String INSECURE_DEFAULT = "false";
3434
public static final String INSECURE_DOC = "Allow insecure connections (e.g. invalid certificates) to Redis Enterprise when using SSL.";
3535

@@ -38,8 +38,8 @@ public RedisEnterpriseConfig(ConfigDef config, Map<?, ?> originals) {
3838
}
3939

4040
public RedisURI getRedisURI() {
41-
RedisURI uri = RedisURI.create(getString(REDIS_URI));
42-
uri.setVerifyPeer(!getBoolean(INSECURE));
41+
RedisURI uri = RedisURI.create(getString(REDIS_URI_CONFIG));
42+
uri.setVerifyPeer(!getBoolean(INSECURE_CONFIG));
4343
return uri;
4444
}
4545

@@ -55,8 +55,8 @@ protected RedisEnterpriseConfigDef(ConfigDef base) {
5555
}
5656

5757
private void defineConfigs() {
58-
define(ConfigKeyBuilder.of(REDIS_URI, ConfigDef.Type.STRING).documentation(REDIS_URI_DOC).defaultValue(REDIS_URI_DEFAULT).importance(ConfigDef.Importance.HIGH).validator(Validators.validURI("redis", "rediss")).build());
59-
define(ConfigKeyBuilder.of(INSECURE, ConfigDef.Type.BOOLEAN).documentation(INSECURE_DOC).defaultValue(INSECURE_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
58+
define(ConfigKeyBuilder.of(REDIS_URI_CONFIG, ConfigDef.Type.STRING).documentation(REDIS_URI_DOC).defaultValue(REDIS_URI_DEFAULT).importance(ConfigDef.Importance.HIGH).validator(Validators.validURI("redis", "rediss")).build());
59+
define(ConfigKeyBuilder.of(INSECURE_CONFIG, ConfigDef.Type.BOOLEAN).documentation(INSECURE_DOC).defaultValue(INSECURE_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
6060
}
6161

6262
}

src/main/java/com/redis/kafka/connect/sink/RedisEnterpriseSinkConfig.java

Lines changed: 74 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Arrays;
2020
import java.util.HashSet;
2121
import java.util.Map;
22+
import java.util.Objects;
2223
import java.util.Set;
2324

2425
import org.apache.kafka.common.config.ConfigDef;
@@ -31,30 +32,46 @@
3132

3233
public class RedisEnterpriseSinkConfig extends RedisEnterpriseConfig {
3334

35+
public enum DataType {
36+
HASH, JSON, TIMESERIES, STRING, STREAM, LIST, SET, ZSET
37+
}
38+
39+
public enum PushDirection {
40+
LEFT, RIGHT
41+
}
42+
3443
public static final String TOKEN_TOPIC = "${topic}";
3544

36-
public static final String CHARSET = "redis.charset";
45+
public static final String CHARSET_CONFIG = "redis.charset";
3746
public static final String CHARSET_DEFAULT = Charset.defaultCharset().name();
3847
public static final String CHARSET_DOC = "Character set to encode Redis key and value strings.";
3948

40-
public static final String KEY = "redis.key";
49+
public static final String KEY_CONFIG = "redis.key";
4150
public static final String KEY_DEFAULT = TOKEN_TOPIC;
4251
public static final String KEY_DOC = "A format string for the destination stream/set/zset/list key, which may contain '"
4352
+ TOKEN_TOPIC + "' as a placeholder for the originating topic name.\nFor example, ``kafka_" + TOKEN_TOPIC
4453
+ "`` for the topic 'orders' will map to the Redis key " + "'kafka_orders'.";
4554

46-
public static final String MULTIEXEC = "redis.multiexec";
55+
public static final String MULTIEXEC_CONFIG = "redis.multiexec";
4756
public static final String MULTIEXEC_DEFAULT = "false";
4857
public static final String MULTIEXEC_DOC = "Whether to execute Redis commands in multi/exec transactions.";
4958

50-
public static final String TYPE = "redis.type";
59+
public static final String WAIT_REPLICAS_CONFIG = "redis.wait.replicas";
60+
public static final String WAIT_REPLICAS_DEFAULT = "0";
61+
public static final String WAIT_REPLICAS_DOC = "Number of replicas to wait for. Use 0 to disable waiting for replicas.";
62+
63+
public static final String WAIT_TIMEOUT_CONFIG = "redis.wait.timeout";
64+
public static final String WAIT_TIMEOUT_DEFAULT = "1000";
65+
public static final String WAIT_TIMEOUT_DOC = "Timeout for WAIT command.";
66+
67+
public static final String TYPE_CONFIG = "redis.type";
5168
public static final String TYPE_DEFAULT = DataType.STREAM.name();
5269
public static final String TYPE_DOC = "Destination data structure: " + ConfigUtils.enumValues(DataType.class);
5370

54-
public static final Set<DataType> MULTI_EXEC_TYPES = new HashSet<>(
71+
protected static final Set<DataType> MULTI_EXEC_TYPES = new HashSet<>(
5572
Arrays.asList(DataType.STREAM, DataType.LIST, DataType.SET, DataType.ZSET));
5673

57-
public static final String PUSH_DIRECTION = "redis.push.direction";
74+
public static final String PUSH_DIRECTION_CONFIG = "redis.push.direction";
5875
public static final String PUSH_DIRECTION_DEFAULT = PushDirection.LEFT.name();
5976
public static final String PUSH_DIRECTION_DOC = "List push direction: " + PushDirection.LEFT + " (LPUSH) or "
6077
+ PushDirection.RIGHT + " (RPUSH)";
@@ -64,15 +81,19 @@ public class RedisEnterpriseSinkConfig extends RedisEnterpriseConfig {
6481
private final String keyFormat;
6582
private final PushDirection pushDirection;
6683
private final boolean multiexec;
84+
private final int waitReplicas;
85+
private final long waitTimeout;
6786

6887
public RedisEnterpriseSinkConfig(Map<?, ?> originals) {
6988
super(new RedisEnterpriseSinkConfigDef(), originals);
70-
String charsetName = getString(CHARSET).trim();
89+
String charsetName = getString(CHARSET_CONFIG).trim();
7190
charset = Charset.forName(charsetName);
72-
type = ConfigUtils.getEnum(DataType.class, this, TYPE);
73-
keyFormat = getString(KEY).trim();
74-
pushDirection = ConfigUtils.getEnum(PushDirection.class, this, PUSH_DIRECTION);
75-
multiexec = Boolean.TRUE.equals(getBoolean(MULTIEXEC));
91+
type = ConfigUtils.getEnum(DataType.class, this, TYPE_CONFIG);
92+
keyFormat = getString(KEY_CONFIG).trim();
93+
pushDirection = ConfigUtils.getEnum(PushDirection.class, this, PUSH_DIRECTION_CONFIG);
94+
multiexec = Boolean.TRUE.equals(getBoolean(MULTIEXEC_CONFIG));
95+
waitReplicas = getInt(WAIT_REPLICAS_CONFIG);
96+
waitTimeout = getLong(WAIT_TIMEOUT_CONFIG);
7697
}
7798

7899
public Charset getCharset() {
@@ -95,6 +116,14 @@ public boolean isMultiexec() {
95116
return multiexec;
96117
}
97118

119+
public int getWaitReplicas() {
120+
return waitReplicas;
121+
}
122+
123+
public long getWaitTimeout() {
124+
return waitTimeout;
125+
}
126+
98127
public static class RedisEnterpriseSinkConfigDef extends RedisEnterpriseConfigDef {
99128

100129
public RedisEnterpriseSinkConfigDef() {
@@ -107,16 +136,20 @@ public RedisEnterpriseSinkConfigDef(ConfigDef base) {
107136
}
108137

109138
private void define() {
110-
define(ConfigKeyBuilder.of(CHARSET, ConfigDef.Type.STRING).documentation(CHARSET_DOC)
139+
define(ConfigKeyBuilder.of(CHARSET_CONFIG, ConfigDef.Type.STRING).documentation(CHARSET_DOC)
111140
.defaultValue(CHARSET_DEFAULT).importance(ConfigDef.Importance.HIGH).build());
112-
define(ConfigKeyBuilder.of(TYPE, ConfigDef.Type.STRING).documentation(TYPE_DOC).defaultValue(TYPE_DEFAULT)
141+
define(ConfigKeyBuilder.of(TYPE_CONFIG, ConfigDef.Type.STRING).documentation(TYPE_DOC).defaultValue(TYPE_DEFAULT)
113142
.importance(ConfigDef.Importance.HIGH).validator(Validators.validEnum(DataType.class)).build());
114-
define(ConfigKeyBuilder.of(KEY, ConfigDef.Type.STRING).documentation(KEY_DOC).defaultValue(KEY_DEFAULT)
143+
define(ConfigKeyBuilder.of(KEY_CONFIG, ConfigDef.Type.STRING).documentation(KEY_DOC).defaultValue(KEY_DEFAULT)
115144
.importance(ConfigDef.Importance.MEDIUM).build());
116-
define(ConfigKeyBuilder.of(PUSH_DIRECTION, ConfigDef.Type.STRING).documentation(PUSH_DIRECTION_DOC)
145+
define(ConfigKeyBuilder.of(PUSH_DIRECTION_CONFIG, ConfigDef.Type.STRING).documentation(PUSH_DIRECTION_DOC)
117146
.defaultValue(PUSH_DIRECTION_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
118-
define(ConfigKeyBuilder.of(MULTIEXEC, ConfigDef.Type.BOOLEAN).documentation(MULTIEXEC_DOC)
147+
define(ConfigKeyBuilder.of(MULTIEXEC_CONFIG, ConfigDef.Type.BOOLEAN).documentation(MULTIEXEC_DOC)
119148
.defaultValue(MULTIEXEC_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
149+
define(ConfigKeyBuilder.of(WAIT_REPLICAS_CONFIG, ConfigDef.Type.INT).documentation(WAIT_REPLICAS_DOC)
150+
.defaultValue(WAIT_REPLICAS_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
151+
define(ConfigKeyBuilder.of(WAIT_TIMEOUT_CONFIG, ConfigDef.Type.LONG).documentation(WAIT_TIMEOUT_DOC)
152+
.defaultValue(WAIT_TIMEOUT_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
120153
}
121154

122155
@Override
@@ -126,33 +159,49 @@ public Map<String, ConfigValue> validateAll(Map<String, String> props) {
126159
return results;
127160
}
128161
DataType dataType = dataType(props);
129-
String multiexec = props.getOrDefault(MULTIEXEC, MULTIEXEC_DEFAULT).trim();
162+
String multiexec = props.getOrDefault(MULTIEXEC_CONFIG, MULTIEXEC_DEFAULT).trim();
130163
if (multiexec.equalsIgnoreCase("true") && !MULTI_EXEC_TYPES.contains(dataType)) {
131164
String supportedTypes = String.join(", ",
132165
MULTI_EXEC_TYPES.stream().map(Enum::name).toArray(String[]::new));
133-
results.get(MULTIEXEC)
166+
results.get(MULTIEXEC_CONFIG)
134167
.addErrorMessage("multi/exec is only supported with these data structures: " + supportedTypes);
135168
}
136-
String charsetName = props.getOrDefault(CHARSET, CHARSET_DEFAULT).trim();
169+
String charsetName = props.getOrDefault(CHARSET_CONFIG, CHARSET_DEFAULT).trim();
137170
try {
138171
Charset.forName(charsetName);
139172
} catch (Exception e) {
140-
results.get(CHARSET).addErrorMessage(e.getMessage());
173+
results.get(CHARSET_CONFIG).addErrorMessage(e.getMessage());
141174
}
142175
return results;
143176
}
144177

145178
private DataType dataType(Map<String, String> props) {
146-
return DataType.valueOf(props.getOrDefault(TYPE, TYPE_DEFAULT));
179+
return DataType.valueOf(props.getOrDefault(TYPE_CONFIG, TYPE_DEFAULT));
147180
}
148181

149182
}
150183

151-
public enum DataType {
152-
HASH, JSON, TIMESERIES, STRING, STREAM, LIST, SET, ZSET
184+
@Override
185+
public int hashCode() {
186+
final int prime = 31;
187+
int result = super.hashCode();
188+
result = prime * result
189+
+ Objects.hash(charset, keyFormat, multiexec, pushDirection, type, waitReplicas, waitTimeout);
190+
return result;
153191
}
154192

155-
public enum PushDirection {
156-
LEFT, RIGHT
193+
@Override
194+
public boolean equals(Object obj) {
195+
if (this == obj)
196+
return true;
197+
if (!super.equals(obj))
198+
return false;
199+
if (getClass() != obj.getClass())
200+
return false;
201+
RedisEnterpriseSinkConfig other = (RedisEnterpriseSinkConfig) obj;
202+
return Objects.equals(charset, other.charset) && Objects.equals(keyFormat, other.keyFormat)
203+
&& multiexec == other.multiexec && pushDirection == other.pushDirection && type == other.type
204+
&& waitReplicas == other.waitReplicas && waitTimeout == other.waitTimeout;
157205
}
206+
158207
}

src/main/java/com/redis/kafka/connect/sink/RedisEnterpriseSinkTask.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void start(final Map<String, String> props) {
8787
client = RedisModulesClient.create(config.getRedisURI());
8888
connection = client.connect();
8989
charset = config.getCharset();
90-
writer = writer(client);
90+
writer = writer(client).build();
9191
writer.open(new ExecutionContext());
9292
final java.util.Set<TopicPartition> assignment = this.context.assignment();
9393
if (!assignment.isEmpty()) {
@@ -120,13 +120,16 @@ private Collection<SinkOffsetState> offsetStates(java.util.Set<TopicPartition> a
120120
return offsetStates;
121121
}
122122

123-
private RedisItemWriter<byte[], byte[], SinkRecord> writer(RedisModulesClient client) {
124-
RedisItemWriterBuilder<byte[], byte[], SinkRecord> writerBuilder = new OperationItemWriterBuilder<>(client,
123+
private RedisItemWriterBuilder<byte[], byte[], SinkRecord> writer(RedisModulesClient client) {
124+
RedisItemWriterBuilder<byte[], byte[], SinkRecord> builder = new OperationItemWriterBuilder<>(client,
125125
new ByteArrayCodec()).operation(operation());
126126
if (Boolean.TRUE.equals(config.isMultiexec())) {
127-
writerBuilder.multiExec();
127+
builder.multiExec();
128128
}
129-
return writerBuilder.build();
129+
if (config.getWaitReplicas() > 0) {
130+
builder.waitForReplication(config.getWaitReplicas(), config.getWaitTimeout());
131+
}
132+
return builder;
130133
}
131134

132135
private String offsetKey(String topic, Integer partition) {
@@ -158,7 +161,7 @@ private RedisOperation<byte[], byte[], SinkRecord> operation() {
158161
return Zadd.<byte[], byte[], SinkRecord>key(this::collectionKey)
159162
.value(new ScoredValueConverter<>(this::key, this::doubleValue)).build();
160163
default:
161-
throw new ConfigException(RedisEnterpriseSinkConfig.TYPE, config.getType());
164+
throw new ConfigException(RedisEnterpriseSinkConfig.TYPE_CONFIG, config.getType());
162165
}
163166
}
164167

0 commit comments

Comments
 (0)