Skip to content

Commit e7fe8a8

Browse files
committed
Added redis.insecure option to disable verification of TLS certificates for Redis connections. Resolves #10
1 parent bf5be65 commit e7fe8a8

File tree

3 files changed

+14
-9
lines changed

3 files changed

+14
-9
lines changed

src/main/java/com/redislabs/kafka/connect/common/RedisEnterpriseConfig.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
1919
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
20+
import io.lettuce.core.RedisURI;
2021
import org.apache.kafka.common.config.AbstractConfig;
2122
import org.apache.kafka.common.config.ConfigDef;
2223

@@ -28,30 +29,34 @@ public class RedisEnterpriseConfig extends AbstractConfig {
2829
private static final String REDIS_URI_DEFAULT = "redis://localhost:6379";
2930
private static final String REDIS_URI_DOC = "URI of the Redis Enterprise database to connect to, e.g. redis://redis-12000.redislabs.com:12000";
3031

31-
private final String redisUri;
32+
public static final String INSECURE = "redis.insecure";
33+
public static final String INSECURE_DEFAULT = "false";
34+
public static final String INSECURE_DOC = "Allow insecure connections (e.g. invalid certificates) to Redis Enterprise when using SSL.";
3235

3336
public RedisEnterpriseConfig(ConfigDef config, Map<?, ?> originals) {
3437
super(config, originals);
35-
redisUri = getString(REDIS_URI);
3638
}
3739

38-
public String getRedisUri() {
39-
return redisUri;
40+
public RedisURI getRedisURI() {
41+
RedisURI uri = RedisURI.create(getString(REDIS_URI));
42+
uri.setVerifyPeer(!getBoolean(INSECURE));
43+
return uri;
4044
}
4145

4246
protected static class RedisEnterpriseConfigDef extends ConfigDef {
4347

4448
protected RedisEnterpriseConfigDef() {
45-
defineRedisURI();
49+
defineConfigs();
4650
}
4751

4852
protected RedisEnterpriseConfigDef(ConfigDef base) {
4953
super(base);
50-
defineRedisURI();
54+
defineConfigs();
5155
}
5256

53-
private void defineRedisURI() {
57+
private void defineConfigs() {
5458
define(ConfigKeyBuilder.of(REDIS_URI, ConfigDef.Type.STRING).documentation(REDIS_URI_DOC).defaultValue(REDIS_URI_DEFAULT).importance(ConfigDef.Importance.HIGH).validator(Validators.validURI("redis", "rediss")).build());
59+
define(ConfigKeyBuilder.of(INSECURE, ConfigDef.Type.BOOLEAN).documentation(INSECURE_DOC).defaultValue(INSECURE_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
5560
}
5661

5762
}

src/main/java/com/redislabs/kafka/connect/sink/RedisEnterpriseSinkTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public String version() {
7878
@Override
7979
public void start(final Map<String, String> props) {
8080
config = new RedisEnterpriseSinkConfig(props);
81-
client = RedisClient.create(config.getRedisUri());
81+
client = RedisClient.create(config.getRedisURI());
8282
connection = client.connect();
8383
charset = config.getCharset();
8484
writer = OperationItemWriter.operation(operation()).codec(new ByteArrayCodec()).client(client).transactional(Boolean.TRUE.equals(config.isMultiexec())).build();

src/main/java/com/redislabs/kafka/connect/source/AbstractSourceRecordReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ protected AbstractSourceRecordReader(RedisEnterpriseSourceConfig sourceConfig) {
2424

2525
@Override
2626
public void open() {
27-
this.client = RedisModulesClient.create(sourceConfig.getRedisUri());
27+
this.client = RedisModulesClient.create(sourceConfig.getRedisURI());
2828
open(client);
2929
}
3030

0 commit comments

Comments
 (0)