diff --git a/Dockerfile b/Dockerfile index 4dc3847..7d9919d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ -ARG CP_VERSION=7.2.0 +ARG CP_VERSION=7.7.0 ARG BASE_PREFIX=confluentinc -ARG CONNECT_IMAGE=cp-server-connect +ARG CONNECT_IMAGE=cp-kafka-connect FROM openjdk:18-jdk-slim AS build WORKDIR /root/redis-kafka-connect diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java index 72fe5c9..92776a0 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java @@ -37,6 +37,7 @@ public enum RedisType { private final boolean multiExec; private final int waitReplicas; private final Duration waitTimeout; + private final long keyTTL; public RedisSinkConfig(Map originals) { super(new RedisSinkConfigDef(), originals); @@ -48,6 +49,7 @@ public RedisSinkConfig(Map originals) { multiExec = Boolean.TRUE.equals(getBoolean(RedisSinkConfigDef.MULTIEXEC_CONFIG)); waitReplicas = getInt(RedisSinkConfigDef.WAIT_REPLICAS_CONFIG); waitTimeout = Duration.ofMillis(getLong(RedisSinkConfigDef.WAIT_TIMEOUT_CONFIG)); + keyTTL = getLong(RedisSinkConfigDef.KEY_TTL_CONFIG); } public Charset getCharset() { @@ -78,12 +80,16 @@ public Duration getWaitTimeout() { return waitTimeout; } + public long getKeyTTL() { + return keyTTL; + } + @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); result = prime * result - + Objects.hash(charset, keyspace, separator, multiExec, type, waitReplicas, waitTimeout); + + Objects.hash(charset, keyspace, separator, multiExec, type, waitReplicas, waitTimeout, keyTTL); return result; } @@ -98,7 +104,7 @@ public boolean equals(Object obj) { RedisSinkConfig other = (RedisSinkConfig) obj; return Objects.equals(charset, other.charset) && Objects.equals(keyspace, other.keyspace) && Objects.equals(separator, other.separator) && multiExec == other.multiExec && type == other.type - && waitReplicas == other.waitReplicas && waitTimeout == other.waitTimeout; + && waitReplicas == other.waitReplicas && waitTimeout == other.waitTimeout && keyTTL == other.keyTTL; } } diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java index bbdf42a..e9e24af 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkConfigDef.java @@ -61,6 +61,12 @@ public class RedisSinkConfigDef extends RedisConfigDef { public static final String TYPE_DOC = "Destination data structure: " + String.join(",", Stream.of(RedisType.values()).map(RedisType::name).toArray(String[]::new)); + public static final String KEY_TTL_CONFIG = "redis.key.ttl"; + + public static final String KEY_TTL_CONFIG_DEFAULT = "-1"; + + public static final String KEY_TTL_CONFIG_DOC = "Time to live in seconds for the key. 
If not set, the record will not expire."; + protected static final Set MULTI_EXEC_COMMANDS = Stream .of(RedisType.STREAM, RedisType.LIST, RedisType.SET, RedisType.ZSET).collect(Collectors.toSet()); @@ -81,6 +87,7 @@ private void define() { define(MULTIEXEC_CONFIG, Type.BOOLEAN, MULTIEXEC_DEFAULT, Importance.MEDIUM, MULTIEXEC_DOC); define(WAIT_REPLICAS_CONFIG, Type.INT, WAIT_REPLICAS_DEFAULT, Importance.MEDIUM, WAIT_REPLICAS_DOC); define(WAIT_TIMEOUT_CONFIG, Type.LONG, WAIT_TIMEOUT_DEFAULT, Importance.MEDIUM, WAIT_TIMEOUT_DOC); + define(KEY_TTL_CONFIG, Type.LONG, KEY_TTL_CONFIG_DEFAULT, Importance.MEDIUM, KEY_TTL_CONFIG_DOC); } @Override diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java index b62433a..e4bb61a 100644 --- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java +++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/sink/RedisSinkTask.java @@ -15,19 +15,21 @@ */ package com.redis.kafka.connect.sink; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.function.Predicate; -import java.util.stream.Collector; -import java.util.stream.Collectors; - +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.redis.kafka.connect.common.ManifestVersionProvider; +import com.redis.lettucemod.RedisModulesUtils; +import com.redis.lettucemod.api.StatefulRedisModulesConnection; +import com.redis.lettucemod.timeseries.Sample; +import com.redis.spring.batch.item.redis.RedisItemWriter; +import com.redis.spring.batch.item.redis.common.Operation; +import com.redis.spring.batch.item.redis.writer.impl.*; +import com.redis.spring.batch.item.redis.writer.impl.Set; +import io.lettuce.core.*; +import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.codec.ByteArrayCodec; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; @@ -36,6 +38,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; @@ -46,347 +49,363 @@ import org.springframework.batch.item.ExecutionContext; import org.springframework.util.CollectionUtils; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.redis.kafka.connect.common.ManifestVersionProvider; -import com.redis.lettucemod.RedisModulesUtils; -import com.redis.lettucemod.api.StatefulRedisModulesConnection; -import com.redis.lettucemod.timeseries.Sample; -import com.redis.spring.batch.item.redis.RedisItemWriter; -import com.redis.spring.batch.item.redis.common.Operation; -import 
com.redis.spring.batch.item.redis.writer.impl.Del; -import com.redis.spring.batch.item.redis.writer.impl.Hset; -import com.redis.spring.batch.item.redis.writer.impl.JsonSet; -import com.redis.spring.batch.item.redis.writer.impl.Noop; -import com.redis.spring.batch.item.redis.writer.impl.Rpush; -import com.redis.spring.batch.item.redis.writer.impl.Sadd; -import com.redis.spring.batch.item.redis.writer.impl.Set; -import com.redis.spring.batch.item.redis.writer.impl.TsAdd; -import com.redis.spring.batch.item.redis.writer.impl.Xadd; -import com.redis.spring.batch.item.redis.writer.impl.Zadd; - -import io.lettuce.core.AbstractRedisClient; -import io.lettuce.core.KeyValue; -import io.lettuce.core.RedisCommandTimeoutException; -import io.lettuce.core.RedisConnectionException; -import io.lettuce.core.RedisFuture; -import io.lettuce.core.ScoredValue; -import io.lettuce.core.StreamMessage; -import io.lettuce.core.api.async.RedisAsyncCommands; -import io.lettuce.core.codec.ByteArrayCodec; +import java.util.*; +import java.util.Map.Entry; +import java.util.function.Predicate; +import java.util.stream.Collector; +import java.util.stream.Collectors; public class RedisSinkTask extends SinkTask { - private static final Logger log = LoggerFactory.getLogger(RedisSinkTask.class); - - private static final String OFFSET_KEY_FORMAT = "com.redis.kafka.connect.sink.offset.%s.%s"; - - private static final ObjectMapper objectMapper = objectMapper(); - - private static final Collector> offsetCollector = Collectors - .toMap(RedisSinkTask::offsetKey, RedisSinkTask::offsetValue); - - private RedisSinkConfig config; - - private AbstractRedisClient client; - private StatefulRedisModulesConnection connection; - private Converter jsonConverter; - private RedisItemWriter writer; - - @Override - public String version() { - return ManifestVersionProvider.getVersion(); - } - - private static ObjectMapper objectMapper() { - ObjectMapper mapper = new ObjectMapper(); - mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); - mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true); - mapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true); - return mapper; - } - - @Override - public void start(final Map props) { - config = new RedisSinkConfig(props); - jsonConverter = new JsonConverter(); - jsonConverter.configure(Collections.singletonMap("schemas.enable", "false"), false); - client = config.client(); - connection = RedisModulesUtils.connection(client); - writer = RedisItemWriter.operation(ByteArrayCodec.INSTANCE, new ConditionalDel(operation(), del())); - writer.setClient(client); - writer.setMultiExec(config.isMultiExec()); - writer.setWaitReplicas(config.getWaitReplicas()); - writer.setWaitTimeout(config.getWaitTimeout()); - writer.setPoolSize(config.getPoolSize()); - writer.open(new ExecutionContext()); - java.util.Set assignment = this.context.assignment(); - if (CollectionUtils.isEmpty(assignment)) { - return; - } - Map partitionOffsets = new HashMap<>(assignment.size()); - for (SinkOffsetState state : offsetStates(assignment)) { - partitionOffsets.put(state.topicPartition(), state.offset()); - log.info("Requesting offset {} for {}", state.offset(), state.topicPartition()); - } - for (TopicPartition topicPartition : assignment) { - partitionOffsets.putIfAbsent(topicPartition, 0L); - } - this.context.offset(partitionOffsets); - } - - private Operation del() { - switch (config.getType()) { - case HASH: - case JSON: - case STRING: - return new Del<>(this::key); - default: - return new 
Noop<>(); - } - } - - private class ConditionalDel implements Operation { - - private final Predicate delPredicate = r -> r.value() == null; - private final Operation write; - private final Operation del; - - public ConditionalDel(Operation delegate, - Operation remove) { - this.write = delegate; - this.del = remove; - } - - @Override - public List> execute(RedisAsyncCommands commands, - Chunk items) { - List> futures = new ArrayList<>(); - List toRemove = items.getItems().stream().filter(delPredicate).collect(Collectors.toList()); - futures.addAll(del.execute(commands, new Chunk<>(toRemove))); - List toWrite = items.getItems().stream().filter(delPredicate.negate()) - .collect(Collectors.toList()); - futures.addAll(write.execute(commands, new Chunk<>(toWrite))); - return futures; - } - - } - - private Collection offsetStates(java.util.Set assignment) { - String[] partitionKeys = assignment.stream().map(this::offsetKey).toArray(String[]::new); - List> values = connection.sync().mget(partitionKeys); - return values.stream().filter(KeyValue::hasValue).map(this::offsetState).collect(Collectors.toList()); - } - - private String offsetKey(TopicPartition partition) { - return offsetKey(partition.topic(), partition.partition()); - } - - private SinkOffsetState offsetState(KeyValue value) { - try { - return objectMapper.readValue(value.getValue(), SinkOffsetState.class); - } catch (JsonProcessingException e) { - throw new DataException("Could not parse sink offset state", e); - } - } - - private static String offsetKey(String topic, Integer partition) { - return String.format(OFFSET_KEY_FORMAT, topic, partition); - } - - private Operation operation() { - switch (config.getType()) { - case HASH: - return new Hset<>(this::key, this::map); - case JSON: - return new JsonSet<>(this::key, this::jsonValue); - case STRING: - return new Set<>(this::key, this::value); - case STREAM: - return new Xadd<>(this::collectionKey, this::streamMessages); - case LIST: - return new Rpush<>(this::collectionKey, this::members); - case SET: - return new Sadd<>(this::collectionKey, this::members); - case TIMESERIES: - return new TsAdd<>(this::collectionKey, this::samples); - case ZSET: - return new Zadd<>(this::collectionKey, this::scoredValues); - default: - throw new ConfigException(RedisSinkConfigDef.TYPE_CONFIG, config.getType()); - } - } - - private Collection> scoredValues(SinkRecord sinkRecord) { - return Arrays.asList(ScoredValue.just(doubleValue(sinkRecord), member(sinkRecord))); - } - - private Collection samples(SinkRecord sinkRecord) { - return Arrays.asList(Sample.of(longMember(sinkRecord), doubleValue(sinkRecord))); - } - - private byte[] value(SinkRecord sinkRecord) { - return bytes("value", sinkRecord.value()); - } - - private byte[] jsonValue(SinkRecord sinkRecord) { - Object value = sinkRecord.value(); - if (value instanceof byte[]) { - return (byte[]) value; - } - if (value instanceof String) { - return ((String) value).getBytes(config.getCharset()); - } - return jsonConverter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), value); - } - - private Long longMember(SinkRecord sinkRecord) { - Object key = sinkRecord.key(); - if (key == null) { - return null; - } - if (key instanceof Number) { - return ((Number) key).longValue(); - } - throw new DataException( - "The key for the record must be a number. 
Consider using a single message transformation to transform the data before it is written to Redis."); - } - - private Double doubleValue(SinkRecord sinkRecord) { - Object value = sinkRecord.value(); - if (value == null) { - return null; - } - if (value instanceof Number) { - return ((Number) value).doubleValue(); - } - throw new DataException( - "The value for the record must be a number. Consider using a single message transformation to transform the data before it is written to Redis."); - } - - private byte[] key(SinkRecord sinkRecord) { - if (config.getKeyspace().isEmpty()) { - return bytes("key", sinkRecord.key()); - } - String keyspace = keyspace(sinkRecord); - String key = keyspace + config.getSeparator() + String.valueOf(sinkRecord.key()); - return key.getBytes(config.getCharset()); - } - - private Collection members(SinkRecord sinkRecord) { - return Arrays.asList(member(sinkRecord)); - } - - private byte[] member(SinkRecord sinkRecord) { - return bytes("key", sinkRecord.key()); - } - - private String keyspace(SinkRecord sinkRecord) { - return config.getKeyspace().replace(RedisSinkConfigDef.TOKEN_TOPIC, sinkRecord.topic()); - } - - private byte[] bytes(String source, Object input) { - if (input instanceof byte[]) { - return (byte[]) input; - } - if (input instanceof String) { - return ((String) input).getBytes(config.getCharset()); - } - throw new DataException(String.format( - "The %s for the record must be a string or byte array. Consider using the StringConverter or ByteArrayConverter if the data is stored in Kafka in the format needed in Redis.", - source)); - } - - private byte[] collectionKey(SinkRecord sinkRecord) { - return keyspace(sinkRecord).getBytes(config.getCharset()); - } - - private Collection> streamMessages(SinkRecord sinkRecord) { - return Arrays.asList(new StreamMessage<>(collectionKey(sinkRecord), null, map(sinkRecord))); - } - - @SuppressWarnings("unchecked") - private Map map(SinkRecord sinkRecord) { - Object value = sinkRecord.value(); - if (value instanceof Struct) { - Map body = new LinkedHashMap<>(); - Struct struct = (Struct) value; - for (Field field : struct.schema().fields()) { - Object fieldValue = struct.get(field); - body.put(field.name().getBytes(config.getCharset()), - fieldValue == null ? 
null : fieldValue.toString().getBytes(config.getCharset())); - } - return body; - } - if (value instanceof Map) { - Map map = (Map) value; - Map body = new LinkedHashMap<>(); - for (Map.Entry e : map.entrySet()) { - body.put(e.getKey().getBytes(config.getCharset()), - String.valueOf(e.getValue()).getBytes(config.getCharset())); - } - return body; - } - throw new ConnectException("Unsupported source value type: " + sinkRecord.valueSchema().type().name()); - } - - @Override - public void stop() { - if (writer != null) { - writer.close(); - } - if (connection != null) { - connection.close(); - } - if (client != null) { - client.shutdown(); - client.getResources().shutdown(); - } - } - - @Override - public void put(final Collection records) { - log.debug("Processing {} records", records.size()); - try { - writer.write(new Chunk<>(new ArrayList<>(records))); - } catch (RedisConnectionException e) { - throw new RetriableException("Could not get connection to Redis", e); - } catch (RedisCommandTimeoutException e) { - throw new RetriableException("Timeout while writing sink records", e); - } catch (Exception e) { - throw new RetriableException("Could not write sink records", e); - } - log.info("Wrote {} records", records.size()); - } - - @Override - public void flush(Map currentOffsets) { - Map offsets = currentOffsets.entrySet().stream().map(this::offsetState) - .collect(offsetCollector); - log.trace("Writing offsets: {}", offsets); - try { - connection.sync().mset(offsets); - } catch (RedisCommandTimeoutException e) { - throw new RetriableException("Could not write offsets", e); - } - } - - private SinkOffsetState offsetState(Entry entry) { - return SinkOffsetState.of(entry.getKey(), entry.getValue().offset()); - } - - private static String offsetKey(SinkOffsetState state) { - return offsetKey(state.topic(), state.partition()); - } - - private static String offsetValue(SinkOffsetState state) { - try { - return objectMapper.writeValueAsString(state); - } catch (JsonProcessingException e) { - throw new DataException("Could not serialize sink offset state", e); - } - } + private static final Logger log = LoggerFactory.getLogger(RedisSinkTask.class); + + private static final String OFFSET_KEY_FORMAT = "com.redis.kafka.connect.sink.offset.%s.%s"; + + private static final ObjectMapper objectMapper = objectMapper(); + + private static final Collector> offsetCollector = Collectors + .toMap(RedisSinkTask::offsetKey, RedisSinkTask::offsetValue); + + private RedisSinkConfig config; + + private AbstractRedisClient client; + private StatefulRedisModulesConnection connection; + private Converter jsonConverter; + private RedisItemWriter writer; + + private static ObjectMapper objectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + mapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true); + mapper.configure(DeserializationFeature.USE_LONG_FOR_INTS, true); + return mapper; + } + + private static String offsetKey(String topic, Integer partition) { + return String.format(OFFSET_KEY_FORMAT, topic, partition); + } + + private static String offsetKey(SinkOffsetState state) { + return offsetKey(state.topic(), state.partition()); + } + + private static String offsetValue(SinkOffsetState state) { + try { + return objectMapper.writeValueAsString(state); + } catch (JsonProcessingException e) { + throw new DataException("Could not serialize sink offset state", e); + } + } + + @Override + public String version() { + return 
ManifestVersionProvider.getVersion(); + } + + @Override + public void start(final Map props) { + config = new RedisSinkConfig(props); + jsonConverter = new JsonConverter(); + jsonConverter.configure(Collections.singletonMap("schemas.enable", "false"), false); + client = config.client(); + connection = RedisModulesUtils.connection(client); + writer = RedisItemWriter.operation(ByteArrayCodec.INSTANCE, new ConditionalDel(operation(), del())); + writer.setClient(client); + writer.setMultiExec(config.isMultiExec()); + writer.setWaitReplicas(config.getWaitReplicas()); + writer.setWaitTimeout(config.getWaitTimeout()); + writer.setPoolSize(config.getPoolSize()); + writer.open(new ExecutionContext()); + java.util.Set assignment = this.context.assignment(); + if (CollectionUtils.isEmpty(assignment)) { + return; + } + Map partitionOffsets = new HashMap<>(assignment.size()); + for (SinkOffsetState state : offsetStates(assignment)) { + partitionOffsets.put(state.topicPartition(), state.offset()); + log.info("Requesting offset {} for {}", state.offset(), state.topicPartition()); + } + for (TopicPartition topicPartition : assignment) { + partitionOffsets.putIfAbsent(topicPartition, 0L); + } + this.context.offset(partitionOffsets); + } + + private Operation del() { + switch (config.getType()) { + case HASH: + case JSON: + case STRING: + return new Del<>(this::key); + default: + return new Noop<>(); + } + } + + private Collection offsetStates(java.util.Set assignment) { + String[] partitionKeys = assignment.stream().map(this::offsetKey).toArray(String[]::new); + List> values = connection.sync().mget(partitionKeys); + return values.stream().filter(KeyValue::hasValue).map(this::offsetState).collect(Collectors.toList()); + } + + private String offsetKey(TopicPartition partition) { + return offsetKey(partition.topic(), partition.partition()); + } + + private SinkOffsetState offsetState(KeyValue value) { + try { + return objectMapper.readValue(value.getValue(), SinkOffsetState.class); + } catch (JsonProcessingException e) { + throw new DataException("Could not parse sink offset state", e); + } + } + + private Operation operation() { + Operation oper; + + switch (config.getType()) { + case HASH -> oper = new Hset<>(this::key, this::map); + case JSON -> oper = new JsonSet<>(this::key, this::jsonValue); + case STRING -> oper = new Set<>(this::key, this::value); + case STREAM -> oper = new Xadd<>(this::collectionKey, this::streamMessages); + case LIST -> oper = new Rpush<>(this::collectionKey, this::members); + case SET -> oper = new Sadd<>(this::collectionKey, this::members); + case TIMESERIES -> oper = new TsAdd<>(this::collectionKey, this::samples); + case ZSET -> oper = new Zadd<>(this::collectionKey, this::scoredValues); + default -> throw new ConfigException(RedisSinkConfigDef.TYPE_CONFIG, config.getType()); + } + + return new TTLSupport(oper); + } + + private class TTLSupport implements Operation { + private final Operation delegate; + + public TTLSupport(Operation delegate) { + this.delegate = delegate; + } + + @Override + public List> execute(RedisAsyncCommands commands, Chunk items) { + List> futures = new ArrayList<>(delegate.execute(commands, items)); + + for (SinkRecord record : items) { + byte[] key = determineKey(record); + long ttlFromRecord = extractTTLFromHeaders(record); + + if (ttlFromRecord > 0) { + futures.add((RedisFuture) (RedisFuture) commands.expire(key, ttlFromRecord)); + } else if (config.getKeyTTL() > 0) { + futures.add((RedisFuture) (RedisFuture) commands.expire(key, 
config.getKeyTTL())); + } + } + + return futures; + } + + private byte[] determineKey(SinkRecord record) { + return switch (config.getType()) { + case STREAM, LIST, SET, ZSET, TIMESERIES -> collectionKey(record); + default -> key(record); + }; + } + + private long extractTTLFromHeaders(SinkRecord record) { + for (Header header : record.headers()) { + if (header.key().equals(RedisSinkConfigDef.KEY_TTL_CONFIG)) { + try { + return Long.parseLong(header.value().toString()); + } catch (NumberFormatException e) { + log.warn("Invalid TTL header value for record {}: {}", record, header.value()); + } + } + } + return -1L; + } + } + + private Collection> scoredValues(SinkRecord sinkRecord) { + return Arrays.asList(ScoredValue.just(doubleValue(sinkRecord), member(sinkRecord))); + } + + private Collection samples(SinkRecord sinkRecord) { + return Arrays.asList(Sample.of(longMember(sinkRecord), doubleValue(sinkRecord))); + } + + private byte[] value(SinkRecord sinkRecord) { + return bytes("value", sinkRecord.value()); + } + + private byte[] jsonValue(SinkRecord sinkRecord) { + Object value = sinkRecord.value(); + if (value instanceof byte[]) { + return (byte[]) value; + } + if (value instanceof String) { + return ((String) value).getBytes(config.getCharset()); + } + return jsonConverter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), value); + } + + private Long longMember(SinkRecord sinkRecord) { + Object key = sinkRecord.key(); + if (key == null) { + return null; + } + if (key instanceof Number) { + return ((Number) key).longValue(); + } + throw new DataException( + "The key for the record must be a number. Consider using a single message transformation to transform the data before it is written to Redis."); + } + + private Double doubleValue(SinkRecord sinkRecord) { + Object value = sinkRecord.value(); + if (value == null) { + return null; + } + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + throw new DataException( + "The value for the record must be a number. Consider using a single message transformation to transform the data before it is written to Redis."); + } + + private byte[] key(SinkRecord sinkRecord) { + if (config.getKeyspace().isEmpty()) { + return bytes("key", sinkRecord.key()); + } + String keyspace = keyspace(sinkRecord); + String key = keyspace + config.getSeparator() + String.valueOf(sinkRecord.key()); + return key.getBytes(config.getCharset()); + } + + private Collection members(SinkRecord sinkRecord) { + return Arrays.asList(member(sinkRecord)); + } + + private byte[] member(SinkRecord sinkRecord) { + return bytes("key", sinkRecord.key()); + } + + private String keyspace(SinkRecord sinkRecord) { + return config.getKeyspace().replace(RedisSinkConfigDef.TOKEN_TOPIC, sinkRecord.topic()); + } + + private byte[] bytes(String source, Object input) { + if (input instanceof byte[]) { + return (byte[]) input; + } + if (input instanceof String) { + return ((String) input).getBytes(config.getCharset()); + } + throw new DataException(String.format( + "The %s for the record must be a string or byte array. 
Consider using the StringConverter or ByteArrayConverter if the data is stored in Kafka in the format needed in Redis.", + source)); + } + + private byte[] collectionKey(SinkRecord sinkRecord) { + return keyspace(sinkRecord).getBytes(config.getCharset()); + } + + private Collection> streamMessages(SinkRecord sinkRecord) { + return Arrays.asList(new StreamMessage<>(collectionKey(sinkRecord), null, map(sinkRecord))); + } + + @SuppressWarnings("unchecked") + private Map map(SinkRecord sinkRecord) { + Object value = sinkRecord.value(); + if (value instanceof Struct) { + Map body = new LinkedHashMap<>(); + Struct struct = (Struct) value; + for (Field field : struct.schema().fields()) { + Object fieldValue = struct.get(field); + body.put(field.name().getBytes(config.getCharset()), + fieldValue == null ? null : fieldValue.toString().getBytes(config.getCharset())); + } + return body; + } + if (value instanceof Map) { + Map map = (Map) value; + Map body = new LinkedHashMap<>(); + for (Map.Entry e : map.entrySet()) { + body.put(e.getKey().getBytes(config.getCharset()), + String.valueOf(e.getValue()).getBytes(config.getCharset())); + } + return body; + } + throw new ConnectException("Unsupported source value type: " + sinkRecord.valueSchema().type().name()); + } + + @Override + public void stop() { + if (writer != null) { + writer.close(); + } + if (connection != null) { + connection.close(); + } + if (client != null) { + client.shutdown(); + client.getResources().shutdown(); + } + } + + @Override + public void put(final Collection records) { + log.debug("Processing {} records", records.size()); + try { + writer.write(new Chunk<>(new ArrayList<>(records))); + } catch (RedisConnectionException e) { + throw new RetriableException("Could not get connection to Redis", e); + } catch (RedisCommandTimeoutException e) { + throw new RetriableException("Timeout while writing sink records", e); + } catch (Exception e) { + throw new RetriableException("Could not write sink records", e); + } + log.info("Wrote {} records", records.size()); + } + + @Override + public void flush(Map currentOffsets) { + Map offsets = currentOffsets.entrySet().stream().map(this::offsetState) + .collect(offsetCollector); + log.trace("Writing offsets: {}", offsets); + try { + connection.sync().mset(offsets); + } catch (RedisCommandTimeoutException e) { + throw new RetriableException("Could not write offsets", e); + } + } + + private SinkOffsetState offsetState(Entry entry) { + return SinkOffsetState.of(entry.getKey(), entry.getValue().offset()); + } + + private class ConditionalDel implements Operation { + + private final Predicate delPredicate = r -> r.value() == null; + private final Operation write; + private final Operation del; + + public ConditionalDel(Operation delegate, + Operation remove) { + this.write = delegate; + this.del = remove; + } + + @Override + public List> execute(RedisAsyncCommands commands, + Chunk items) { + List> futures = new ArrayList<>(); + List toRemove = items.getItems().stream().filter(delPredicate).collect(Collectors.toList()); + futures.addAll(del.execute(commands, new Chunk<>(toRemove))); + List toWrite = items.getItems().stream().filter(delPredicate.negate()) + .collect(Collectors.toList()); + futures.addAll(write.execute(commands, new Chunk<>(toWrite))); + return futures; + } + + } } diff --git a/core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSinkIntegrationTests.java b/core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSinkIntegrationTests.java index 
974ba62..d85f76f 100644 --- a/core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSinkIntegrationTests.java +++ b/core/redis-kafka-connect/src/test/java/com/redis/kafka/connect/AbstractSinkIntegrationTests.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTaskContext; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.platform.commons.util.Preconditions; @@ -49,465 +50,494 @@ abstract class AbstractSinkIntegrationTests extends AbstractTestBase { - public static final int PARTITION = 1; - - public static final long OFFSET = 91283741L; - - public static final long TIMESTAMP = 1530286549123L; - - public static SinkRecord write(String topic, SchemaAndValue key, SchemaAndValue value) { - Preconditions.notNull(topic, "topic cannot be null"); - Preconditions.notNull(key, "key cannot be null."); - Preconditions.notNull(key.value(), "key cannot be null."); - Preconditions.notNull(value, "value cannot be null."); - Preconditions.notNull(value.value(), "value cannot be null."); - - return new SinkRecord(topic, PARTITION, key.schema(), key.value(), value.schema(), value.value(), OFFSET, - TIMESTAMP, TimestampType.CREATE_TIME); - } - - protected Map map(String... args) { - Assert.notNull(args, "Args cannot be null"); - Assert.isTrue(args.length % 2 == 0, "Args length is not a multiple of 2"); - Map body = new LinkedHashMap<>(); - for (int index = 0; index < args.length / 2; index++) { - body.put(args[index * 2], args[index * 2 + 1]); - } - return body; - } - - private RedisSinkTask task; - - @BeforeEach - public void createTask() { - task = new RedisSinkTask(); - } - - @AfterEach - public void stopTask() { - if (null != this.task) { - this.task.stop(); - } - } - - @Test - void emptyAssignment() { - SinkTaskContext taskContext = mock(SinkTaskContext.class); - when(taskContext.assignment()).thenReturn(ImmutableSet.of()); - this.task.initialize(taskContext); - this.task.start(ImmutableMap.of(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI())); - } - - @Test - void putEmpty() { - String topic = "putWrite"; - SinkTaskContext context = mock(SinkTaskContext.class); - when(context.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 1))); - this.task.initialize(context); - this.task.start(ImmutableMap.of(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI())); - this.task.put(ImmutableList.of()); - } - - @Test - void putHash() { - String topic = "hash"; - int count = 50; - Map> expected = new LinkedHashMap<>(count); - List records = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - Map map = map("field1", "This is field1 value" + i, "field2", "This is field2 value " + i); - expected.put("hash:" + i, map); - records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, i), - new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA), map))); - } - put(topic, RedisType.HASH, records); - for (String key : expected.keySet()) { - Map hash = expected.get(key); - Map actual = redisConnection.sync().hgetall(key); - assertEquals(hash, actual, String.format("Hash for key '%s' does not match.", key)); - } - } - - public static class Person { - - private long id; - - private String name; - - private Set hobbies = new HashSet<>(); - - private Address address; - - public long getId() { - return id; - } - - public void setId(long id) { - this.id = id; 
- } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public Set getHobbies() { - return hobbies; - } - - public void setHobbies(Set hobbies) { - this.hobbies = hobbies; - } - - public Address getAddress() { - return address; - } - - public void setAddress(Address address) { - this.address = address; - } - - @Override - public int hashCode() { - return Objects.hash(address, hobbies, id, name); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Person other = (Person) obj; - return Objects.equals(address, other.address) && Objects.equals(hobbies, other.hobbies) && id == other.id - && Objects.equals(name, other.name); - } - - } - - public static class Address { - - private String street; - - private String city; - - private String state; - - private String zip; - - public String getStreet() { - return street; - } - - public void setStreet(String street) { - this.street = street; - } - - public String getCity() { - return city; - } - - public void setCity(String city) { - this.city = city; - } - - public String getState() { - return state; - } - - public void setState(String state) { - this.state = state; - } - - public String getZip() { - return zip; - } - - public void setZip(String zip) { - this.zip = zip; - } - - @Override - public int hashCode() { - return Objects.hash(city, state, street, zip); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - Address other = (Address) obj; - return Objects.equals(city, other.city) && Objects.equals(state, other.state) - && Objects.equals(street, other.street) && Objects.equals(zip, other.zip); - } - - } - - @Test - void putJSON() throws JsonProcessingException { - String topic = "putJSON"; - List persons = new ArrayList<>(); - Person person1 = new Person(); - person1.setId(1); - person1.setName("Bodysnitch Canderbunt"); - person1.setHobbies(new HashSet<>(Arrays.asList("Fishing", "Singing"))); - Address address1 = new Address(); - address1.setCity("New York"); - address1.setZip("10013"); - address1.setState("NY"); - address1.setStreet("150 Mott St"); - person1.setAddress(address1); - persons.add(person1); - Person person2 = new Person(); - person2.setId(2); - person2.setName("Buffalo Custardbath"); - person2.setHobbies(new HashSet<>(Arrays.asList("Surfing", "Piano"))); - Address address2 = new Address(); - address2.setCity("Los Angeles"); - address2.setZip("90001"); - address2.setState("CA"); - address2.setStreet("123 Sunset Blvd"); - person2.setAddress(address2); - persons.add(person2); - Person person3 = new Person(); - person3.setId(3); - person3.setName("Bumblesnuff Crimpysnitch"); - person3.setHobbies(new HashSet<>(Arrays.asList("Skiing", "Drums"))); - Address address3 = new Address(); - address3.setCity("Chicago"); - address3.setZip("60603"); - address3.setState("IL"); - address3.setStreet("100 S State St"); - person3.setAddress(address3); - persons.add(person3); - List records = new ArrayList<>(); - ObjectMapper mapper = new ObjectMapper(); - for (Person person : persons) { - String json = mapper.writeValueAsString(person); - records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, person.getId()), - new SchemaAndValue(Schema.STRING_SCHEMA, json))); - } - put(topic, RedisType.JSON, records); - for (Person person : 
persons) { - String json = redisConnection.sync().jsonGet(topic + ":" + person.getId()).get(0).toString(); - assertEquals(person, mapper.readValue(json, Person.class)); - } - } - - @Test - void putTimeSeries() { - String topic = "putTimeSeries"; - int count = 50; - long startTime = System.currentTimeMillis() - count; - List expectedSamples = new ArrayList<>(); - List records = new ArrayList<>(); - for (int index = 1; index <= count; index++) { - long timestamp = startTime + index; - double value = index; - expectedSamples.add(Sample.of(timestamp, value)); - records.add(write(topic, new SchemaAndValue(Schema.INT64_SCHEMA, timestamp), - new SchemaAndValue(Schema.FLOAT64_SCHEMA, value))); - } - put(topic, RedisType.TIMESERIES, records); - List actualSamples = redisConnection.sync().tsRange(topic, TimeRange.unbounded()); - assertEquals(expectedSamples.size(), actualSamples.size()); - for (int index = 0; index < expectedSamples.size(); index++) { - Sample expectedSample = expectedSamples.get(index); - Sample actualSample = actualSamples.get(index); - assertEquals(expectedSample.getTimestamp(), actualSample.getTimestamp()); - assertEquals(expectedSample.getValue(), actualSample.getValue()); - } - } - - @Test - void putList() { - String topic = "putList"; - int count = 50; - List expected = new ArrayList<>(count); - List records = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - String member = "listmember:" + i; - expected.add(member); - records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, member), - new SchemaAndValue(Schema.STRING_SCHEMA, member))); - } - put(topic, RedisType.LIST, records); - assertEquals(expected, redisConnection.sync().lrange(topic, 0, -1)); - } - - @Test - void putSet() { - String topic = "putSet"; - int count = 50; - Set expected = new HashSet<>(count); - List records = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - String member = "setmember:" + i; - expected.add(member); - records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, member), - new SchemaAndValue(Schema.STRING_SCHEMA, member))); - } - put(topic, RedisType.SET, records); - Set members = redisConnection.sync().smembers(topic); - assertEquals(expected, members); - } - - @Test - void putStream() { - String topic = "putStream"; - int count = 50; - List> expected = new ArrayList<>(count); - List records = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - Map body = map("field1", "This is field1 value" + i, "field2", "This is field2 value " + i); - expected.add(body); - records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, "key" + i), - new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA), body))); - } - put(topic, RedisType.STREAM, records); - List> messages = redisConnection.sync().xrange(topic, Range.unbounded()); - assertEquals(records.size(), messages.size()); - for (int index = 0; index < messages.size(); index++) { - Map body = expected.get(index); - StreamMessage message = messages.get(index); - assertEquals(body, message.getBody(), String.format("Body for message #%s does not match.", index)); - } - } - - @Test - void putString() { - String topic = "string"; - int count = 50; - Map expected = new LinkedHashMap<>(count); - List records = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - String key = String.valueOf(i); - String value = "This is value " + i; - expected.put(topic + ":" + key, value); - records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, key), - new 
SchemaAndValue(Schema.STRING_SCHEMA, value))); - } - put(topic, RedisType.STRING, records); - String[] keys = expected.keySet().toArray(new String[0]); - List> actual = redisConnection.sync().mget(keys); - assertEquals(records.size(), actual.size()); - for (KeyValue keyValue : actual) { - assertEquals(expected.get(keyValue.getKey()), keyValue.getValue(), - String.format("Value for key '%s' does not match.", keyValue.getKey())); - } - } - - @Test - void setBytes() { - String topic = "setBytes"; - int count = 50; - Map expected = new LinkedHashMap<>(count); - List records = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - String key = topic + i; - String value = "This is value " + i; - expected.put(key, value); - records.add(write(topic, new SchemaAndValue(Schema.BYTES_SCHEMA, key.getBytes(StandardCharsets.UTF_8)), - new SchemaAndValue(Schema.BYTES_SCHEMA, value.getBytes(StandardCharsets.UTF_8)))); - } - put(topic, RedisType.STRING, records, RedisSinkConfigDef.KEYSPACE_CONFIG, ""); - String[] keys = expected.keySet().toArray(new String[0]); - List> actual = redisConnection.sync().mget(keys); - assertEquals(records.size(), actual.size()); - for (KeyValue keyValue : actual) { - assertEquals(expected.get(keyValue.getKey()), keyValue.getValue(), - String.format("Value for key '%s' does not match.", keyValue.getKey())); - } - } - - @Test - void putZset() { - String topic = "putZset"; - int count = 50; - List> expected = new ArrayList<>(count); - List records = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - String value = "zsetmember:" + i; - expected.add(ScoredValue.just(i, value)); - records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, value), - new SchemaAndValue(Schema.FLOAT64_SCHEMA, i))); - } - put(topic, RedisType.ZSET, records); - List> actual = redisConnection.sync().zrangeWithScores(topic, 0, -1); - expected.sort(Comparator.comparing(ScoredValue::getScore)); - assertEquals(expected, actual); - } - - public void put(String topic, RedisType type, List records, String... 
props) { - SinkTaskContext taskContext = mock(SinkTaskContext.class); - when(taskContext.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 1))); - task.initialize(taskContext); - Map propsMap = map(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI(), - RedisSinkConfigDef.TYPE_CONFIG, type.name()); - propsMap.putAll(map(props)); - task.start(propsMap); - task.put(records); - } - - public static SinkRecord sinkRecord(String topic, SchemaAndValue key, SchemaAndValue value) { - Preconditions.notNull(topic, "topic cannot be null"); - if (key == null) { - throw new DataException("key cannot be null."); - } - if (key.value() == null) { - throw new DataException("key cannot be null."); - } - - return new SinkRecord(topic, PARTITION, schema(key), value(key), schema(value), value(value), OFFSET, TIMESTAMP, - TimestampType.CREATE_TIME); - } - - private static Schema schema(SchemaAndValue value) { - if (value == null) { - return null; - } - return value.schema(); - } - - private static Object value(SchemaAndValue value) { - if (value == null) { - return null; - } - return value.value(); - } - - @Test - void putDelete() { - String topic = "putDelete"; - SinkTaskContext taskContext = mock(SinkTaskContext.class); - when(taskContext.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 1))); - this.task.initialize(taskContext); - this.task.start(ImmutableMap.of(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI(), - RedisSinkConfigDef.TYPE_CONFIG, RedisType.HASH.name())); - int count = 50; - Map expected = new LinkedHashMap<>(count); - List records = new ArrayList<>(count); - for (int i = 0; i < count; i++) { - final String value = "This is value " + i; - records.add(sinkRecord(topic, new SchemaAndValue(Schema.STRING_SCHEMA, i), null)); - expected.put(topic + ":" + i, value); - } - Map values = expected.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - redisConnection.sync().mset(values); - task.put(records); - String[] keys = expected.keySet().toArray(new String[0]); - long actual = redisConnection.sync().exists(keys); - assertEquals(0L, actual, "All of the keys should be removed from Redis."); - } + public static final int PARTITION = 1; + + public static final long OFFSET = 91283741L; + + public static final long TIMESTAMP = 1530286549123L; + + public static SinkRecord write(String topic, SchemaAndValue key, SchemaAndValue value) { + Preconditions.notNull(topic, "topic cannot be null"); + Preconditions.notNull(key, "key cannot be null."); + Preconditions.notNull(key.value(), "key cannot be null."); + Preconditions.notNull(value, "value cannot be null."); + Preconditions.notNull(value.value(), "value cannot be null."); + + return new SinkRecord(topic, PARTITION, key.schema(), key.value(), value.schema(), value.value(), OFFSET, TIMESTAMP, + TimestampType.CREATE_TIME); + } + + protected Map map(String... 
args) { + Assert.notNull(args, "Args cannot be null"); + Assert.isTrue(args.length % 2 == 0, "Args length is not a multiple of 2"); + Map body = new LinkedHashMap<>(); + for (int index = 0; index < args.length / 2; index++) { + body.put(args[index * 2], args[index * 2 + 1]); + } + return body; + } + + private RedisSinkTask task; + + @BeforeEach + public void createTask() { + task = new RedisSinkTask(); + } + + @AfterEach + public void stopTask() { + if (null != this.task) { + this.task.stop(); + } + } + + @Test + void emptyAssignment() { + SinkTaskContext taskContext = mock(SinkTaskContext.class); + when(taskContext.assignment()).thenReturn(ImmutableSet.of()); + this.task.initialize(taskContext); + this.task.start(ImmutableMap.of(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI())); + } + + @Test + void putEmpty() { + String topic = "putWrite"; + SinkTaskContext context = mock(SinkTaskContext.class); + when(context.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 1))); + this.task.initialize(context); + this.task.start(ImmutableMap.of(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI())); + this.task.put(ImmutableList.of()); + } + + @Test + void putHash() { + String topic = "hash"; + int count = 50; + Map> expected = new LinkedHashMap<>(count); + List records = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + Map map = map("field1", "This is field1 value" + i, "field2", "This is field2 value " + i); + expected.put("hash:" + i, map); + records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, i), + new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA), map))); + } + put(topic, RedisType.HASH, records); + for (String key : expected.keySet()) { + Map hash = expected.get(key); + Map actual = redisConnection.sync().hgetall(key); + assertEquals(hash, actual, String.format("Hash for key '%s' does not match.", key)); + } + } + + public static class Person { + + private long id; + + private String name; + + private Set hobbies = new HashSet<>(); + + private Address address; + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Set getHobbies() { + return hobbies; + } + + public void setHobbies(Set hobbies) { + this.hobbies = hobbies; + } + + public Address getAddress() { + return address; + } + + public void setAddress(Address address) { + this.address = address; + } + + @Override + public int hashCode() { + return Objects.hash(address, hobbies, id, name); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Person other = (Person) obj; + return Objects.equals(address, other.address) && Objects.equals(hobbies, other.hobbies) && id == other.id + && Objects.equals(name, other.name); + } + + } + + public static class Address { + + private String street; + + private String city; + + private String state; + + private String zip; + + public String getStreet() { + return street; + } + + public void setStreet(String street) { + this.street = street; + } + + public String getCity() { + return city; + } + + public void setCity(String city) { + this.city = city; + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public String getZip() { + return zip; + } + + public 
void setZip(String zip) { + this.zip = zip; + } + + @Override + public int hashCode() { + return Objects.hash(city, state, street, zip); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Address other = (Address) obj; + return Objects.equals(city, other.city) && Objects.equals(state, other.state) && Objects.equals(street, + other.street) && Objects.equals(zip, other.zip); + } + + } + + @Test + void putJSON() throws JsonProcessingException { + String topic = "putJSON"; + List persons = new ArrayList<>(); + Person person1 = new Person(); + person1.setId(1); + person1.setName("Bodysnitch Canderbunt"); + person1.setHobbies(new HashSet<>(Arrays.asList("Fishing", "Singing"))); + Address address1 = new Address(); + address1.setCity("New York"); + address1.setZip("10013"); + address1.setState("NY"); + address1.setStreet("150 Mott St"); + person1.setAddress(address1); + persons.add(person1); + Person person2 = new Person(); + person2.setId(2); + person2.setName("Buffalo Custardbath"); + person2.setHobbies(new HashSet<>(Arrays.asList("Surfing", "Piano"))); + Address address2 = new Address(); + address2.setCity("Los Angeles"); + address2.setZip("90001"); + address2.setState("CA"); + address2.setStreet("123 Sunset Blvd"); + person2.setAddress(address2); + persons.add(person2); + Person person3 = new Person(); + person3.setId(3); + person3.setName("Bumblesnuff Crimpysnitch"); + person3.setHobbies(new HashSet<>(Arrays.asList("Skiing", "Drums"))); + Address address3 = new Address(); + address3.setCity("Chicago"); + address3.setZip("60603"); + address3.setState("IL"); + address3.setStreet("100 S State St"); + person3.setAddress(address3); + persons.add(person3); + List records = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + for (Person person : persons) { + String json = mapper.writeValueAsString(person); + records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, person.getId()), + new SchemaAndValue(Schema.STRING_SCHEMA, json))); + } + put(topic, RedisType.JSON, records); + for (Person person : persons) { + String json = redisConnection.sync().jsonGet(topic + ":" + person.getId()).get(0).toString(); + assertEquals(person, mapper.readValue(json, Person.class)); + } + } + + @Test + void putTimeSeries() { + String topic = "putTimeSeries"; + int count = 50; + long startTime = System.currentTimeMillis() - count; + List expectedSamples = new ArrayList<>(); + List records = new ArrayList<>(); + for (int index = 1; index <= count; index++) { + long timestamp = startTime + index; + double value = index; + expectedSamples.add(Sample.of(timestamp, value)); + records.add(write(topic, new SchemaAndValue(Schema.INT64_SCHEMA, timestamp), + new SchemaAndValue(Schema.FLOAT64_SCHEMA, value))); + } + put(topic, RedisType.TIMESERIES, records); + List actualSamples = redisConnection.sync().tsRange(topic, TimeRange.unbounded()); + assertEquals(expectedSamples.size(), actualSamples.size()); + for (int index = 0; index < expectedSamples.size(); index++) { + Sample expectedSample = expectedSamples.get(index); + Sample actualSample = actualSamples.get(index); + assertEquals(expectedSample.getTimestamp(), actualSample.getTimestamp()); + assertEquals(expectedSample.getValue(), actualSample.getValue()); + } + } + + @Test + void putList() { + String topic = "putList"; + int count = 50; + List expected = new ArrayList<>(count); + List records = new ArrayList<>(count); + for (int i 
= 0; i < count; i++) {
+            String member = "listmember:" + i;
+            expected.add(member);
+            records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, member),
+                    new SchemaAndValue(Schema.STRING_SCHEMA, member)));
+        }
+        put(topic, RedisType.LIST, records);
+        assertEquals(expected, redisConnection.sync().lrange(topic, 0, -1));
+    }
+
+    @Test
+    void putSet() {
+        String topic = "putSet";
+        int count = 50;
+        Set<String> expected = new HashSet<>(count);
+        List<SinkRecord> records = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            String member = "setmember:" + i;
+            expected.add(member);
+            records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, member),
+                    new SchemaAndValue(Schema.STRING_SCHEMA, member)));
+        }
+        put(topic, RedisType.SET, records);
+        Set<String> members = redisConnection.sync().smembers(topic);
+        assertEquals(expected, members);
+    }
+
+    @Test
+    void putStream() {
+        String topic = "putStream";
+        int count = 50;
+        List<Map<String, String>> expected = new ArrayList<>(count);
+        List<SinkRecord> records = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            Map<String, String> body = map("field1", "This is field1 value" + i, "field2", "This is field2 value " + i);
+            expected.add(body);
+            records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, "key" + i),
+                    new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA), body)));
+        }
+        put(topic, RedisType.STREAM, records);
+        List<StreamMessage<String, String>> messages = redisConnection.sync().xrange(topic, Range.unbounded());
+        assertEquals(records.size(), messages.size());
+        for (int index = 0; index < messages.size(); index++) {
+            Map<String, String> body = expected.get(index);
+            StreamMessage<String, String> message = messages.get(index);
+            assertEquals(body, message.getBody(), String.format("Body for message #%s does not match.", index));
+        }
+    }
+
+    @Test
+    void putString() {
+        String topic = "string";
+        int count = 50;
+        Map<String, String> expected = new LinkedHashMap<>(count);
+        List<SinkRecord> records = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            String key = String.valueOf(i);
+            String value = "This is value " + i;
+            expected.put(topic + ":" + key, value);
+            records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, key),
+                    new SchemaAndValue(Schema.STRING_SCHEMA, value)));
+        }
+        put(topic, RedisType.STRING, records);
+        String[] keys = expected.keySet().toArray(new String[0]);
+        List<KeyValue<String, String>> actual = redisConnection.sync().mget(keys);
+        assertEquals(records.size(), actual.size());
+        for (KeyValue<String, String> keyValue : actual) {
+            assertEquals(expected.get(keyValue.getKey()), keyValue.getValue(),
+                    String.format("Value for key '%s' does not match.", keyValue.getKey()));
+        }
+    }
+
+    @Test
+    void setBytes() {
+        String topic = "setBytes";
+        int count = 50;
+        Map<String, String> expected = new LinkedHashMap<>(count);
+        List<SinkRecord> records = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            String key = topic + i;
+            String value = "This is value " + i;
+            expected.put(key, value);
+            records.add(write(topic, new SchemaAndValue(Schema.BYTES_SCHEMA, key.getBytes(StandardCharsets.UTF_8)),
+                    new SchemaAndValue(Schema.BYTES_SCHEMA, value.getBytes(StandardCharsets.UTF_8))));
+        }
+        put(topic, RedisType.STRING, records, RedisSinkConfigDef.KEYSPACE_CONFIG, "");
+        String[] keys = expected.keySet().toArray(new String[0]);
+        List<KeyValue<String, String>> actual = redisConnection.sync().mget(keys);
+        assertEquals(records.size(), actual.size());
+        for (KeyValue<String, String> keyValue : actual) {
+            assertEquals(expected.get(keyValue.getKey()), keyValue.getValue(),
+                    String.format("Value for key '%s' does not match.", keyValue.getKey()));
+        }
+    }
+
+    @Test
+    void putZset() {
+        String topic = "putZset";
+        int count = 50;
+        List<ScoredValue<String>> expected = new ArrayList<>(count);
+        List<SinkRecord> records = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            String value = "zsetmember:" + i;
+            expected.add(ScoredValue.just(i, value));
+            records.add(write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, value),
+                    new SchemaAndValue(Schema.FLOAT64_SCHEMA, i)));
+        }
+        put(topic, RedisType.ZSET, records);
+        List<ScoredValue<String>> actual = redisConnection.sync().zrangeWithScores(topic, 0, -1);
+        expected.sort(Comparator.comparing(ScoredValue::getScore));
+        assertEquals(expected, actual);
+    }
+
+    public void put(String topic, RedisType type, List<SinkRecord> records, String... props) {
+        SinkTaskContext taskContext = mock(SinkTaskContext.class);
+        when(taskContext.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 1)));
+        task.initialize(taskContext);
+        Map<String, String> propsMap = map(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI(),
+                RedisSinkConfigDef.TYPE_CONFIG, type.name());
+        propsMap.putAll(map(props));
+        task.start(propsMap);
+        task.put(records);
+    }
+
+    public static SinkRecord sinkRecord(String topic, SchemaAndValue key, SchemaAndValue value) {
+        Preconditions.notNull(topic, "topic cannot be null");
+        if (key == null) {
+            throw new DataException("key cannot be null.");
+        }
+        if (key.value() == null) {
+            throw new DataException("key value cannot be null.");
+        }
+
+        return new SinkRecord(topic, PARTITION, schema(key), value(key), schema(value), value(value), OFFSET, TIMESTAMP,
+                TimestampType.CREATE_TIME);
+    }
+
+    private static Schema schema(SchemaAndValue value) {
+        if (value == null) {
+            return null;
+        }
+        return value.schema();
+    }
+
+    private static Object value(SchemaAndValue value) {
+        if (value == null) {
+            return null;
+        }
+        return value.value();
+    }
+
+    @Test
+    void putDelete() {
+        String topic = "putDelete";
+        SinkTaskContext taskContext = mock(SinkTaskContext.class);
+        when(taskContext.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 1)));
+        this.task.initialize(taskContext);
+        this.task.start(
+                ImmutableMap.of(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI(), RedisSinkConfigDef.TYPE_CONFIG,
+                        RedisType.HASH.name()));
+        int count = 50;
+        Map<String, String> expected = new LinkedHashMap<>(count);
+        List<SinkRecord> records = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            final String value = "This is value " + i;
+            records.add(sinkRecord(topic, new SchemaAndValue(Schema.STRING_SCHEMA, i), null));
+            expected.put(topic + ":" + i, value);
+        }
+        Map<String, String> values = expected.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        redisConnection.sync().mset(values);
+        task.put(records);
+        String[] keys = expected.keySet().toArray(new String[0]);
+        long actual = redisConnection.sync().exists(keys);
+        assertEquals(0L, actual, "All of the keys should be removed from Redis.");
+    }
+
+    @Test
+    void putExpire() {
+        String topic = "putExpire";
+        SinkTaskContext taskContext = mock(SinkTaskContext.class);
+        when(taskContext.assignment()).thenReturn(ImmutableSet.of(new TopicPartition(topic, 1)));
+        this.task.initialize(taskContext);
+        this.task.start(
+                ImmutableMap.of(RedisSinkConfigDef.URI_CONFIG, getRedisServer().getRedisURI(), RedisSinkConfigDef.TYPE_CONFIG,
+                        RedisType.STRING.name(), RedisSinkConfigDef.KEY_TTL_CONFIG, "3600"));
+        int count = 50;
+        Map<String, String> expected = new LinkedHashMap<>(count);
+        List<SinkRecord> records = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            final String value = "This is value " + i;
+            records.add(sinkRecord(topic, new SchemaAndValue(Schema.STRING_SCHEMA, i),
+                    new SchemaAndValue(Schema.STRING_SCHEMA, value)));
+            expected.put(topic + ":" + i, value);
+        }
+        Map<String, String> values = expected.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        redisConnection.sync().mset(values);
+        task.put(records);
+        String[] keys = expected.keySet().toArray(new String[0]);
+        for (String key : keys) {
+            Assertions.assertEquals(3600, redisConnection.sync().ttl(key), 100);
+        }
+    }
 }
diff --git a/docker-compose.yml b/docker-compose.yml
index 7195a68..10e21df 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,45 +1,38 @@
 services:
-  zookeeper:
-    image: confluentinc/cp-zookeeper:7.2.0
-    hostname: zookeeper
-    container_name: zookeeper
-    ports:
-      - "2181:2181"
-    environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
-      ZOOKEEPER_TICK_TIME: 2000
 
   broker:
-    image: confluentinc/cp-server:7.2.0
+    image: confluentinc/cp-kafka:7.7.0
     hostname: broker
     container_name: broker
-    depends_on:
-      - zookeeper
     ports:
       - "9092:9092"
       - "9101:9101"
     environment:
-      KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
-      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
+      KAFKA_NODE_ID: 1
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
+      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
-      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
-      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
       KAFKA_JMX_PORT: 9101
       KAFKA_JMX_HOSTNAME: localhost
-      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
-      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
-      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
-      CONFLUENT_METRICS_ENABLE: 'true'
-      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
+      KAFKA_PROCESS_ROLES: 'broker,controller'
+      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
+      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
+      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
+      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
+      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
+      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
+      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
+    healthcheck:
+      test: echo srvr | nc broker 9092 || exit 1
+      interval: 5s
+      retries: 10
 
   schema-registry:
-    image: confluentinc/cp-schema-registry:7.2.0
+    image: confluentinc/cp-schema-registry:7.7.0
     hostname: schema-registry
     container_name: schema-registry
     depends_on:
@@ -50,9 +43,15 @@ services:
       SCHEMA_REGISTRY_HOST_NAME: schema-registry
       SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
       SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
+    healthcheck:
+      interval: 5s
+      retries: 10
+      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:8081
 
   connect:
     image: fieldengineering/redis-kafka-connect
+    build:
+      context: .
     hostname: connect
     container_name: connect
     depends_on:
@@ -80,9 +79,13 @@ services:
       CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
       CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
       CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
+    healthcheck:
+      interval: 5s
+      retries: 10
+      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:8083
 
   control-center:
-    image: confluentinc/cp-enterprise-control-center:7.2.0
+    image: confluentinc/cp-enterprise-control-center:7.7.0
     hostname: control-center
     container_name: control-center
     depends_on:
@@ -103,9 +106,13 @@ services:
       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
       CONFLUENT_METRICS_TOPIC_REPLICATION: 1
       PORT: 9021
+    healthcheck:
+      interval: 5s
+      retries: 10
+      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:9021
 
   ksqldb-server:
-    image: confluentinc/cp-ksqldb-server:7.2.0
+    image: confluentinc/cp-ksqldb-server:7.7.0
     hostname: ksqldb-server
     container_name: ksqldb-server
    depends_on:
@@ -126,9 +133,13 @@ services:
       KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
       KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
       KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
+    healthcheck:
+      interval: 5s
+      retries: 10
+      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:8088
 
   rest-proxy:
-    image: confluentinc/cp-kafka-rest:7.2.0
+    image: confluentinc/cp-kafka-rest:7.7.0
     depends_on:
       - broker
       - schema-registry
@@ -141,10 +152,20 @@ services:
       KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
       KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
       KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
-
+    healthcheck:
+      interval: 5s
+      retries: 10
+      test: curl --write-out 'HTTP %{http_code}' --fail --silent --output /dev/null http://localhost:8082
+
   redis:
     image: redis/redis-stack-server
     hostname: redis
     container_name: redis
     ports:
-      - "6379:6379"
\ No newline at end of file
+      - "6379:6379"
+    healthcheck:
+      test: [ "CMD-SHELL", "redis-cli ping | grep PONG" ]
+      interval: 10s
+      retries: 5
+      start_period: 5s
+      timeout: 5s
\ No newline at end of file
diff --git a/docs/guide/src/docs/asciidoc/sink.adoc b/docs/guide/src/docs/asciidoc/sink.adoc
index 5aa9386..e5d1bf6 100644
--- a/docs/guide/src/docs/asciidoc/sink.adoc
+++ b/docs/guide/src/docs/asciidoc/sink.adoc
@@ -266,6 +266,39 @@ value.converter.schemas.enable = <1>
 ----
 <1> Set to `true` if the JSON record structure has an attached schema
 
+[[_sink_ttl]]
+== TTL (Time To Live)
+
+The {name} supports setting TTL (Time To Live) for Redis keys to automatically expire data after a specified duration. TTL can be configured globally for all keys or per-message using Kafka headers.
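+
+The effective TTL for each record is resolved before the write: a valid `redis.key.ttl` header, when present, overrides the global setting. The sketch below illustrates that resolution in plain Java; it is not the connector's internal API, and the `TtlResolution` class name and `globalTtl` parameter are illustrative only. The precise rules are listed in the TTL Behavior section below.
+
+[source,java]
+----
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+class TtlResolution {
+
+    // Per-message header takes precedence over the global redis.key.ttl setting
+    static long effectiveTtl(SinkRecord record, long globalTtl) {
+        Header header = record.headers().lastWithName("redis.key.ttl");
+        if (header != null && header.value() != null) {
+            try {
+                return Long.parseLong(String.valueOf(header.value()));
+            } catch (NumberFormatException e) {
+                // Invalid header values are ignored; the global setting applies
+            }
+        }
+        return globalTtl; // -1 (the default) means no expiration
+    }
+}
+----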
+
+=== Global TTL Configuration
+
+Set a global TTL for all keys with the `redis.key.ttl` configuration property, for example one hour (3600 seconds):
+
+[source,properties]
+----
+redis.key.ttl = 3600
+----
+
+=== Per-Message TTL
+
+Include a `redis.key.ttl` header in your Kafka messages to set the TTL for individual records:
+
+[source,java]
+----
+ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
+record.headers().add("redis.key.ttl", "1800".getBytes()); // 30 minutes
+----
+
+=== TTL Behavior
+
+* TTL is applied using the Redis `EXPIRE` command after the data is written
+* Per-message TTL headers take precedence over the global configuration
+* TTL applies to all Redis data types
+* For collection types (STREAM, LIST, SET, ZSET, TIMESERIES), TTL is applied to the collection key
+* Invalid TTL values are logged and ignored
+* A TTL value of -1 (the default) means no expiration
+
 [[_sink_config]]
 == Configuration
 
@@ -275,12 +308,14 @@
 connector.class = com.redis.kafka.connect.RedisSinkConnector
 topics = <1>
 redis.uri = <2>
 redis.type = <3>
-key.converter = <4>
-value.converter = <5>
+redis.key.ttl = <4>
+key.converter = <5>
+value.converter = <6>
 ----
 <1> Kafka topics to read messsages from.
 <2> <<_sink_redis_client,Redis URI>>.
 <3> <<_sink_redis_command,Redis command>>.
-<4> <<_sink_key,Key converter>>.
-<5> <<_sink_value,Value converter>>.
+<4> <<_sink_ttl,TTL in seconds>> (optional, default: -1 for no expiration).
+<5> <<_sink_key,Key converter>>.
+<6> <<_sink_value,Value converter>>.
diff --git a/run.sh b/run.sh
index 15663fc..9926704 100755
--- a/run.sh
+++ b/run.sh
@@ -86,7 +86,8 @@ curl -X POST -H "Content-Type: application/json" --data '
     "redis.uri": "redis://redis:6379",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
-    "value.converter.schemas.enable": "false"
+    "value.converter.schemas.enable": "false",
+    "redis.key.ttl": "3600"
   }}' http://localhost:8083/connectors -w "\n"
 
 sleep 2
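
With the connector registered by run.sh (which now sets "redis.key.ttl": "3600"), the effect can be spot-checked from a small client program. The following is a minimal sketch using Lettuce; it assumes the docker-compose stack is running on localhost and that a record with key `key1` was produced to a topic named `mytopic` (both are placeholder names), so the sink wrote the Redis key `mytopic:key1` under the default `<topic>:<key>` keyspace.

[source,java]
----
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

public class TtlCheck {

    public static void main(String[] args) {
        // Connect to the Redis instance started by docker-compose.yml
        RedisClient client = RedisClient.create("redis://localhost:6379");
        try (StatefulRedisConnection<String, String> connection = client.connect()) {
            RedisCommands<String, String> commands = connection.sync();
            // Remaining TTL in seconds; expect a value between 0 and 3600 when redis.key.ttl=3600
            Long ttl = commands.ttl("mytopic:key1");
            System.out.println("TTL for mytopic:key1 = " + ttl);
        } finally {
            client.shutdown();
        }
    }
}
----

A value close to 3600 indicates the global setting was applied; a different positive value indicates a per-message header took precedence; -1 means the key exists with no TTL, and -2 means the key was not found.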