1818import java .io .IOException ;
1919import java .util .ArrayList ;
2020import java .util .Collection ;
21+ import java .util .Collections ;
2122import java .util .HashMap ;
2223import java .util .LinkedHashMap ;
2324import java .util .List ;
2829import org .apache .kafka .common .TopicPartition ;
2930import org .apache .kafka .common .config .ConfigException ;
3031import org .apache .kafka .connect .data .Field ;
32+ import org .apache .kafka .connect .data .Schema ;
3133import org .apache .kafka .connect .data .Struct ;
3234import org .apache .kafka .connect .errors .ConnectException ;
3335import org .apache .kafka .connect .errors .DataException ;
36+ import org .apache .kafka .connect .json .JsonConverter ;
3437import org .apache .kafka .connect .sink .SinkRecord ;
3538import org .apache .kafka .connect .sink .SinkTask ;
39+ import org .apache .kafka .connect .storage .Converter ;
3640import org .slf4j .Logger ;
3741import org .slf4j .LoggerFactory ;
3842import org .springframework .batch .item .ExecutionContext ;
@@ -73,6 +77,7 @@ public class RedisEnterpriseSinkTask extends SinkTask {
7377 private RedisEnterpriseSinkConfig config ;
7478 private RedisItemWriter <byte [], byte [], SinkRecord > writer ;
7579 private StatefulRedisConnection <String , String > connection ;
80+ private Converter jsonConverter ;
7681
7782 @ Override
7883 public String version () {
@@ -84,6 +89,8 @@ public void start(final Map<String, String> props) {
8489 config = new RedisEnterpriseSinkConfig (props );
8590 client = RedisModulesClient .create (config .getRedisURI ());
8691 connection = client .connect ();
92+ jsonConverter = new JsonConverter ();
93+ jsonConverter .configure (Collections .singletonMap ("schemas.enable" , "false" ), false );
8794 writer = writer (client ).build ();
8895 writer .open (new ExecutionContext ());
8996 final java .util .Set <TopicPartition > assignment = this .context .assignment ();
@@ -139,7 +146,7 @@ private RedisOperation<byte[], byte[], SinkRecord> operation() {
139146 return Hset .<byte [], byte [], SinkRecord >key (this ::key ).map (this ::map ).del (this ::isDelete ).build ();
140147 case JSON :
141148 return JsonSet .<byte [], byte [], SinkRecord >key (this ::key ).path ("." .getBytes (config .getCharset ()))
142- .value (this ::value ).del (this ::isDelete ).build ();
149+ .value (this ::jsonValue ).del (this ::isDelete ).build ();
143150 case STRING :
144151 return Set .<byte [], byte [], SinkRecord >key (this ::key ).value (this ::value ).del (this ::isDelete ).build ();
145152 case STREAM :
@@ -166,6 +173,16 @@ private byte[] value(SinkRecord record) {
166173 return bytes ("value" , record .value ());
167174 }
168175
176+ private byte [] jsonValue (SinkRecord record ) {
177+ if (record .value () == null ) {
178+ return null ;
179+ }
180+ Schema schema = record .valueSchema ();
181+ Object value = record .value ();
182+
183+ return jsonConverter .fromConnectData (record .topic (), schema , value );
184+ }
185+
169186 private Long longMember (SinkRecord record ) {
170187 Object key = record .key ();
171188 if (key == null ) {
0 commit comments