1616package com .redis .kafka .connect .sink ;
1717
1818import java .io .IOException ;
19+ import java .time .Duration ;
1920import java .util .ArrayList ;
2021import java .util .Collection ;
2122import java .util .Collections ;
2930import org .apache .kafka .common .TopicPartition ;
3031import org .apache .kafka .common .config .ConfigException ;
3132import org .apache .kafka .connect .data .Field ;
32- import org .apache .kafka .connect .data .Schema ;
3333import org .apache .kafka .connect .data .Struct ;
3434import org .apache .kafka .connect .errors .ConnectException ;
3535import org .apache .kafka .connect .errors .DataException ;
4949import com .redis .kafka .connect .RedisEnterpriseSinkConnector ;
5050import com .redis .lettucemod .RedisModulesClient ;
5151import com .redis .spring .batch .RedisItemWriter ;
52- import com .redis .spring .batch .RedisItemWriter .OperationItemWriterBuilder ;
53- import com .redis .spring .batch .RedisItemWriter .RedisItemWriterBuilder ;
54- import com .redis .spring .batch .support . RedisOperation ;
55- import com .redis .spring .batch .support . convert .SampleConverter ;
56- import com .redis .spring .batch .support . convert . ScoredValueConverter ;
57- import com .redis .spring .batch .support .operation .Hset ;
58- import com .redis .spring .batch .support .operation .JsonSet ;
59- import com .redis .spring .batch .support .operation .Lpush ;
60- import com .redis .spring .batch .support .operation .Rpush ;
61- import com .redis .spring .batch .support .operation .Sadd ;
62- import com .redis .spring .batch .support .operation .Set ;
63- import com .redis .spring .batch .support .operation .TsAdd ;
64- import com .redis .spring .batch .support .operation .Xadd ;
65- import com .redis .spring .batch .support .operation .Zadd ;
52+ import com .redis .spring .batch .RedisItemWriter .OperationBuilder ;
53+ import com .redis .spring .batch .RedisItemWriter .WaitForReplication ;
54+ import com .redis .spring .batch .convert . SampleConverter ;
55+ import com .redis .spring .batch .convert .ScoredValueConverter ;
56+ import com .redis .spring .batch .writer . RedisOperation ;
57+ import com .redis .spring .batch .writer .operation .Hset ;
58+ import com .redis .spring .batch .writer .operation .JsonSet ;
59+ import com .redis .spring .batch .writer .operation .Lpush ;
60+ import com .redis .spring .batch .writer .operation .Rpush ;
61+ import com .redis .spring .batch .writer .operation .Sadd ;
62+ import com .redis .spring .batch .writer .operation .Set ;
63+ import com .redis .spring .batch .writer .operation .TsAdd ;
64+ import com .redis .spring .batch .writer .operation .Xadd ;
65+ import com .redis .spring .batch .writer .operation .Zadd ;
6666
6767import io .lettuce .core .KeyValue ;
6868import io .lettuce .core .api .StatefulRedisConnection ;
@@ -90,7 +90,7 @@ public void start(final Map<String, String> props) {
9090 client = RedisModulesClient .create (config .getRedisURI ());
9191 connection = client .connect ();
9292 jsonConverter = new JsonConverter ();
93- jsonConverter .configure (Collections .singletonMap ("schemas.enable" , "false" ), false );
93+ jsonConverter .configure (Collections .singletonMap ("schemas.enable" , "false" ), false );
9494 writer = writer (client ).build ();
9595 writer .open (new ExecutionContext ());
9696 final java .util .Set <TopicPartition > assignment = this .context .assignment ();
@@ -124,14 +124,15 @@ private Collection<SinkOffsetState> offsetStates(java.util.Set<TopicPartition> a
124124 return offsetStates ;
125125 }
126126
/**
 * Configures the Spring Batch Redis item writer used to persist sink records.
 * The writer operates on raw byte-array keys/values and delegates the actual
 * Redis command to the data-structure-specific operation selected by
 * {@code operation()}.
 *
 * @param client the Redis Modules client to write through
 * @return a writer builder, ready for {@code build()}
 */
private RedisItemWriter.Builder<byte[], byte[], SinkRecord> writer(RedisModulesClient client) {
	RedisItemWriter.Builder<byte[], byte[], SinkRecord> builder = new OperationBuilder<>(client,
			new ByteArrayCodec()).operation(operation());
	// Wrap each write batch in a MULTI/EXEC transaction when configured.
	if (Boolean.TRUE.equals(config.isMultiexec())) {
		builder.multiExec();
	}
	// Optionally block until the configured number of replicas acknowledge each
	// write, bounded by the configured timeout (milliseconds).
	if (config.getWaitReplicas() > 0) {
		builder.waitForReplication(WaitForReplication.builder().replicas(config.getWaitReplicas())
				.timeout(Duration.ofMillis(config.getWaitTimeout())).build());
	}
	return builder;
}
@@ -169,22 +170,26 @@ private RedisOperation<byte[], byte[], SinkRecord> operation() {
169170 }
170171 }
171172
172- private byte [] value (SinkRecord record ) {
173- return bytes ("value" , record .value ());
173+ private byte [] value (SinkRecord sinkRecord ) {
174+ return bytes ("value" , sinkRecord .value ());
174175 }
175176
176- private byte [] jsonValue (SinkRecord record ) {
177- if (record .value () == null ) {
177+ private byte [] jsonValue (SinkRecord sinkRecord ) {
178+ Object value = sinkRecord .value ();
179+ if (value == null ) {
178180 return null ;
179181 }
180- Schema schema = record .valueSchema ();
181- Object value = record .value ();
182-
183- return jsonConverter .fromConnectData (record .topic (), schema , value );
182+ if (value instanceof byte []) {
183+ return (byte []) value ;
184+ }
185+ if (value instanceof String ) {
186+ return ((String ) value ).getBytes (config .getCharset ());
187+ }
188+ return jsonConverter .fromConnectData (sinkRecord .topic (), sinkRecord .valueSchema (), value );
184189 }
185190
186- private Long longMember (SinkRecord record ) {
187- Object key = record .key ();
191+ private Long longMember (SinkRecord sinkRecord ) {
192+ Object key = sinkRecord .key ();
188193 if (key == null ) {
189194 return null ;
190195 }
@@ -195,8 +200,8 @@ private Long longMember(SinkRecord record) {
195200 "The key for the record must be a number. Consider using a single message transformation to transform the data before it is written to Redis." );
196201 }
197202
198- private Double doubleValue (SinkRecord record ) {
199- Object value = record .value ();
203+ private Double doubleValue (SinkRecord sinkRecord ) {
204+ Object value = sinkRecord .value ();
200205 if (value == null ) {
201206 return null ;
202207 }
@@ -207,25 +212,25 @@ private Double doubleValue(SinkRecord record) {
207212 "The value for the record must be a number. Consider using a single message transformation to transform the data before it is written to Redis." );
208213 }
209214
210- private boolean isDelete (SinkRecord record ) {
211- return record .value () == null ;
215+ private boolean isDelete (SinkRecord sinkRecord ) {
216+ return sinkRecord .value () == null ;
212217 }
213218
214- private byte [] key (SinkRecord record ) {
219+ private byte [] key (SinkRecord sinkRecord ) {
215220 if (config .getKeyspace ().isEmpty ()) {
216- return bytes ("key" , record .key ());
221+ return bytes ("key" , sinkRecord .key ());
217222 }
218- String keyspace = keyspace (record );
219- String key = keyspace + config .getSeparator () + String .valueOf (record .key ());
223+ String keyspace = keyspace (sinkRecord );
224+ String key = keyspace + config .getSeparator () + String .valueOf (sinkRecord .key ());
220225 return key .getBytes (config .getCharset ());
221226 }
222227
223- private byte [] member (SinkRecord record ) {
224- return bytes ("key" , record .key ());
228+ private byte [] member (SinkRecord sinkRecord ) {
229+ return bytes ("key" , sinkRecord .key ());
225230 }
226231
227- private String keyspace (SinkRecord record ) {
228- return config .getKeyspace ().replace (RedisEnterpriseSinkConfig .TOKEN_TOPIC , record .topic ());
232+ private String keyspace (SinkRecord sinkRecord ) {
233+ return config .getKeyspace ().replace (RedisEnterpriseSinkConfig .TOKEN_TOPIC , sinkRecord .topic ());
229234 }
230235
231236 private byte [] bytes (String source , Object input ) {
@@ -243,13 +248,13 @@ private byte[] bytes(String source, Object input) {
243248 source ));
244249 }
245250
246- private byte [] collectionKey (SinkRecord record ) {
247- return keyspace (record ).getBytes (config .getCharset ());
251+ private byte [] collectionKey (SinkRecord sinkRecord ) {
252+ return keyspace (sinkRecord ).getBytes (config .getCharset ());
248253 }
249254
250255 @ SuppressWarnings ("unchecked" )
251- private Map <byte [], byte []> map (SinkRecord record ) {
252- Object value = record .value ();
256+ private Map <byte [], byte []> map (SinkRecord sinkRecord ) {
257+ Object value = sinkRecord .value ();
253258 if (value == null ) {
254259 return null ;
255260 }
@@ -272,7 +277,7 @@ private Map<byte[], byte[]> map(SinkRecord record) {
272277 }
273278 return body ;
274279 }
275- throw new ConnectException ("Unsupported source value type: " + record .valueSchema ().type ().name ());
280+ throw new ConnectException ("Unsupported source value type: " + sinkRecord .valueSchema ().type ().name ());
276281 }
277282
278283 @ Override
0 commit comments