Skip to content
Open
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.1
0.10.0-beta
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.redis.kafka.connect.operation;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.redis.lettucemod.api.async.RedisJSONAsyncCommands;
import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisKeyAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class JsonMerge<K, V, T> extends AbstractKeyWriteOperation<K, V, T> {
private static final Logger log = LoggerFactory.getLogger(JsonSet.class);

public static final String ROOT_PATH = "$";
private static final Function<Object, String> DEFAULT_PATH_FUNCTION = t -> ROOT_PATH;

private Function<T, String> pathFunction;
private Function<T, V> valueFunction;
private Function<T, Boolean> conditionFunction;

private final ObjectMapper mapper;

public JsonMerge() {
// Set default path function
this.pathFunction = (Function<T, String>) DEFAULT_PATH_FUNCTION;
this.mapper = new ObjectMapper();
this.mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
}

public void setPath(String path) {
this.pathFunction = t -> path;
}

public void setPathFunction(Function<T, String> pathFunction) {
this.pathFunction = pathFunction;
}

public void setValueFunction(Function<T, V> valueFunction) {
this.valueFunction = valueFunction;
}

public void setConditionFunction(Function<T, Boolean> conditionFunction) {
this.conditionFunction = conditionFunction;
}

protected RedisFuture<String> execute(BaseRedisAsyncCommands<K, V> commands, T item, K key) {
try {
String path = determinePath(item);
logPath(path);
V value = this.valueFunction.apply(item);
log.info("Value: {}", value);

if (conditionFunction.apply(item)) {
if (isPathSet()) {
return deleteJsonPath(commands, key, path);
} else {
return deleteKey(commands, key);
}
}
System.out.println("isPathSet() "+isPathSet());
// Perform JSON operation based on whether path is set
if (isPathSet()) {

return performJsonMerge(commands, key, path, value);
} else {
return performJsonSet(commands, key, value);
}
} catch (JsonProcessingException e) {
log.error("Error processing JSON", e);
return null;
} catch (Exception e) {
log.error("Error executing Redis command", e);
return null;
}
}

private RedisFuture<String> deleteKey(BaseRedisAsyncCommands<K, V> commands, K key) {
return ((RedisKeyAsyncCommands) commands).del(key);
}

private RedisFuture<String> deleteJsonPath(BaseRedisAsyncCommands<K, V> commands, K key, String path) {
return ((RedisJSONAsyncCommands) commands).jsonDel(key, path);
}

private String determinePath(T item) {
return this.pathFunction.apply(item);
}

private void logPath(String path) {
if (isPathSet()) {
log.info("Path is set to: {}", path);
} else {
log.info("Path is not set, using default: {}", ROOT_PATH);
}
}

@SuppressWarnings("unchecked")
private RedisFuture<String> performJsonMerge(BaseRedisAsyncCommands<K, V> commands, K key, String path, V value) throws JsonProcessingException {
// Convert empty JSON object
String emptyJson = mapper.writeValueAsString(new Object());
byte[] emptyJsonBytes = emptyJson.getBytes(StandardCharsets.UTF_8);

// Merge empty JSON object first
((RedisJSONAsyncCommands<K, V>) commands).jsonMerge(key, ROOT_PATH, (V) emptyJsonBytes);
// Merge actual value
return ((RedisJSONAsyncCommands<K, V>) commands).jsonMerge(key, path, value);
}

private RedisFuture<String> performJsonSet(BaseRedisAsyncCommands<K, V> commands, K key, V value) throws JsonProcessingException {
return ((RedisJSONAsyncCommands<K, V>) commands).jsonSet(key, ROOT_PATH, value);
}

private boolean isPathSet() {
return this.pathFunction != DEFAULT_PATH_FUNCTION;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.redis.kafka.connect.operation;

import com.redis.lettucemod.api.async.RedisJSONAsyncCommands;
import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisKeyAsyncCommands;

import java.util.function.Function;

public class JsonSet<K, V, T> extends AbstractKeyWriteOperation<K, V, T> {
public static final String ROOT_PATH = "$";
private Function<T, String> pathFunction = (t) -> {
return "$";
};
private Function<T, V> valueFunction;
private Function<T, Boolean> conditionFunction;

public JsonSet() {
}

public void setPath(String path) {
this.pathFunction = (t) -> {
return path;
};
}

public void setPathFunction(Function<T, String> path) {
this.pathFunction = path;
}

public void setValueFunction(Function<T, V> value) {
this.valueFunction = value;
}

public void setConditionFunction(Function<T, Boolean> function) {
this.conditionFunction = function;
}

protected RedisFuture<String> execute(BaseRedisAsyncCommands<K, V> commands, T item, K key) {
if (conditionFunction.apply(item)) {
return ((RedisKeyAsyncCommands)commands).del(new Object[]{key});
} else {
String path = (String)this.pathFunction.apply(item);
V value = this.valueFunction.apply(item);
return ((RedisJSONAsyncCommands)commands).jsonSet(key, path, value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.redis.kafka.connect.operation;

import com.redis.spring.batch.writer.operation.AbstractKeyWriteOperation;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisSetAsyncCommands;
import java.util.function.Function;

public class Sadd<K, V, T> extends AbstractKeyWriteOperation<K, V, T> {
private Function<T, V> valueFunction;
private Function<T, Boolean> conditionFunction;

public Sadd() {
}

public void setValueFunction(Function<T, V> function) {
this.valueFunction = function;
}

public void setConditionFunction(Function<T, Boolean> function) {
this.conditionFunction = function;
}

protected RedisFuture<Long> execute(BaseRedisAsyncCommands<K, V> commands, T item, K key) {
V value = valueFunction.apply(item);
if (conditionFunction.apply(item)) {
return ((RedisSetAsyncCommands<K, V>) commands).srem(key, value);
} else {
return ((RedisSetAsyncCommands<K, V>) commands).sadd(key, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class RedisSinkConfig extends RedisConfig {

public enum RedisCommand {
HSET, JSONSET, TSADD, SET, XADD, LPUSH, RPUSH, SADD, ZADD, DEL
HSET, JSONSET, JSONMERGE, TSADD, SET, XADD, LPUSH, RPUSH, SADD, ZADD, DEL
}

public static final RedisSinkConfigDef CONFIG = new RedisSinkConfigDef();
Expand All @@ -44,6 +44,10 @@ public enum RedisCommand {

private final Duration waitTimeout;

private final String jsonPath;

private final String fixedJsonPath;

public RedisSinkConfig(Map<?, ?> originals) {
super(new RedisSinkConfigDef(), originals);
String charsetName = getString(RedisSinkConfigDef.CHARSET_CONFIG).trim();
Expand All @@ -54,6 +58,14 @@ public RedisSinkConfig(Map<?, ?> originals) {
multiExec = Boolean.TRUE.equals(getBoolean(RedisSinkConfigDef.MULTIEXEC_CONFIG));
waitReplicas = getInt(RedisSinkConfigDef.WAIT_REPLICAS_CONFIG);
waitTimeout = Duration.ofMillis(getLong(RedisSinkConfigDef.WAIT_TIMEOUT_CONFIG));

if (command == RedisCommand.JSONMERGE) {
jsonPath = getString(RedisSinkConfigDef.JSON_PATH_CONFIG).trim();
fixedJsonPath = getString(RedisSinkConfigDef.FIXED_JSON_PATH_CONFIG).trim();
} else {
jsonPath = null;
fixedJsonPath = null;
}
}

public Charset getCharset() {
Expand Down Expand Up @@ -84,6 +96,15 @@ public Duration getWaitTimeout() {
return waitTimeout;
}

public String getJsonPath() {
return jsonPath;
}

public String getFixedJsonPath() {
return fixedJsonPath;
}


@Override
public int hashCode() {
final int prime = 31;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public class RedisSinkConfigDef extends RedisConfigDef {

public static final RedisCommand COMMAND_DEFAULT = RedisCommand.XADD;

public static final String JSON_PATH_CONFIG = "redis.json.path";

public static final String JSON_PATH_DEFAULT = "$";

public static final String JSON_PATH_DOC = "The JSON attribute in the header of the Kafka message based on which the JSON path will be dynamically set.";

public static final String FIXED_JSON_PATH_CONFIG = "redis.json.path.fixed";

public static final String FIXED_JSON_PATH_DEFAULT = "$";

public static final String FIXED_JSON_PATH_DOC = "The fixed JSON path to set within the value in the header of the Kafka message. This path will be used if the dynamic path is not present.";

public static final String COMMAND_DOC = "Destination data structure: "
+ String.join(",", Stream.of(RedisCommand.values()).map(RedisCommand::name).toArray(String[]::new));

Expand All @@ -83,6 +95,8 @@ private void define() {
define(MULTIEXEC_CONFIG, Type.BOOLEAN, MULTIEXEC_DEFAULT, Importance.MEDIUM, MULTIEXEC_DOC);
define(WAIT_REPLICAS_CONFIG, Type.INT, WAIT_REPLICAS_DEFAULT, Importance.MEDIUM, WAIT_REPLICAS_DOC);
define(WAIT_TIMEOUT_CONFIG, Type.LONG, WAIT_TIMEOUT_DEFAULT, Importance.MEDIUM, WAIT_TIMEOUT_DOC);
define(JSON_PATH_CONFIG, Type.STRING, JSON_PATH_DEFAULT, Importance.MEDIUM, JSON_PATH_DOC);
define(FIXED_JSON_PATH_CONFIG, Type.STRING, FIXED_JSON_PATH_DEFAULT, Importance.MEDIUM, FIXED_JSON_PATH_DOC);
}

@Override
Expand All @@ -92,6 +106,17 @@ public Map<String, ConfigValue> validateAll(Map<String, String> props) {
return results;
}
RedisCommand command = redisCommand(props);

// Ensure JSON_PATH_CONFIG and FIXED_JSON_PATH_CONFIG are only set if the command is JSONMERGE
if (command != RedisCommand.JSONMERGE) {
if (props.containsKey(JSON_PATH_CONFIG)) {
results.get(JSON_PATH_CONFIG).addErrorMessage("The JSON path configuration is not allowed unless the command is JSONMERGE.");
}
if (props.containsKey(FIXED_JSON_PATH_CONFIG)) {
results.get(FIXED_JSON_PATH_CONFIG).addErrorMessage("The fixed JSON path configuration is not allowed unless the command is JSONMERGE.");
}
}

String multiexec = props.getOrDefault(MULTIEXEC_CONFIG, MULTIEXEC_DEFAULT).trim();
if (multiexec.equalsIgnoreCase("true") && !MULTI_EXEC_COMMANDS.contains(command)) {
String supportedTypes = String.join(", ", MULTI_EXEC_COMMANDS.stream().map(Enum::name).toArray(String[]::new));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
Expand All @@ -56,10 +58,13 @@
import com.redis.spring.batch.writer.WriteOperation;
import com.redis.spring.batch.writer.operation.Del;
import com.redis.spring.batch.writer.operation.Hset;
import com.redis.spring.batch.writer.operation.JsonSet;
//import com.redis.spring.batch.writer.operation.JsonSet;
import com.redis.kafka.connect.operation.JsonSet;
import com.redis.kafka.connect.operation.JsonMerge;
import com.redis.spring.batch.writer.operation.Lpush;
import com.redis.spring.batch.writer.operation.Rpush;
import com.redis.spring.batch.writer.operation.Sadd;
//import com.redis.spring.batch.writer.operation.Sadd;
import com.redis.kafka.connect.operation.Sadd;
import com.redis.spring.batch.writer.operation.Set;
import com.redis.spring.batch.writer.operation.TsAdd;
import com.redis.spring.batch.writer.operation.Xadd;
Expand Down Expand Up @@ -166,7 +171,15 @@ private WriteOperation<byte[], byte[], SinkRecord> operation() {
JsonSet<byte[], byte[], SinkRecord> jsonSet = new JsonSet<>();
jsonSet.setKeyFunction(this::key);
jsonSet.setValueFunction(this::jsonValue);
jsonSet.setConditionFunction(this::isNullValue);
return jsonSet;
case JSONMERGE:
JsonMerge<byte[], byte[], SinkRecord> jsonMerge = new JsonMerge<>();
jsonMerge.setKeyFunction(this::key);
jsonMerge.setValueFunction(this::jsonValue);
jsonMerge.setConditionFunction(this::isNullValue);
jsonMerge.setPathFunction(this::determineJsonPath);
return jsonMerge;
case SET:
Set<byte[], byte[], SinkRecord> set = new Set<>();
set.setKeyFunction(this::key);
Expand All @@ -191,6 +204,7 @@ private WriteOperation<byte[], byte[], SinkRecord> operation() {
Sadd<byte[], byte[], SinkRecord> sadd = new Sadd<>();
sadd.setKeyFunction(this::collectionKey);
sadd.setValueFunction(this::member);
sadd.setConditionFunction(this::isNullValue);
return sadd;
case TSADD:
TsAdd<byte[], byte[], SinkRecord> tsAdd = new TsAdd<>();
Expand All @@ -211,6 +225,10 @@ private WriteOperation<byte[], byte[], SinkRecord> operation() {
}
}

private boolean isNullValue(SinkRecord sinkRecord) {
return sinkRecord.value() == null;
}

private byte[] value(SinkRecord sinkRecord) {
return bytes("value", sinkRecord.value());
}
Expand Down Expand Up @@ -307,6 +325,33 @@ private Map<byte[], byte[]> map(SinkRecord sinkRecord) {
throw new ConnectException("Unsupported source value type: " + sinkRecord.valueSchema().type().name());
}

private String determineJsonPath(SinkRecord record) {
try {
Headers headers = record.headers();
String jsonPath = getJsonPathFromHeader(headers, config.getJsonPath());

if (jsonPath == null) {
return config.getFixedJsonPath();
}

return jsonPath;
} catch (Exception e) {
log.error("Error determining JSON path: ", e);
return config.getFixedJsonPath();
}
}

private String getJsonPathFromHeader(Headers headers, String path) {
for (Header header : headers) {
System.out.println("path "+path);
System.out.println("header "+header);
if (header.key().equals(path)) {
return header.value().toString();
}
}
return null;
}

@Override
public void stop() {
if (writer != null) {
Expand Down
Loading