diff --git a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/source/RedisStreamSourceTask.java b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/source/RedisStreamSourceTask.java
index a18b7b5..3f63cbe 100644
--- a/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/source/RedisStreamSourceTask.java
+++ b/core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/source/RedisStreamSourceTask.java
@@ -51,8 +51,10 @@ public class RedisStreamSourceTask extends SourceTask {
     /**
      * The offsets that have been processed and that are to be acknowledged by the
      * reader in {@link RedisStreamSourceTask#commit()}.
+     * Using AtomicReference for atomic swap operations to prevent race conditions.
      */
-    private final List<Map<String, ?>> sourceOffsets = new ArrayList<>();
+    private final java.util.concurrent.atomic.AtomicReference<java.util.concurrent.ConcurrentLinkedQueue<Map<String, ?>>> sourceOffsets =
+            new java.util.concurrent.atomic.AtomicReference<>(new java.util.concurrent.ConcurrentLinkedQueue<>());
 
     private final Clock clock;
 
     private RedisStreamSourceConfig config;
@@ -111,19 +113,31 @@ private Optional<Map<String, Object>> offsetMap() {
     public void commitRecord(SourceRecord sourceRecord, RecordMetadata metadata) throws InterruptedException {
         Map<String, ?> currentOffset = sourceRecord.sourceOffset();
         if (currentOffset != null) {
-            sourceOffsets.add(currentOffset);
+            sourceOffsets.get().offer(currentOffset);
         }
     }
 
     @Override
     public void commit() throws InterruptedException {
         if (reader != null) {
-            String[] ids = sourceOffsets.stream().map(m -> (String) m.get(OFFSET_FIELD)).toArray(String[]::new);
-            try {
-                reader.ack(config.getStreamName(), ids);
-                sourceOffsets.clear();
-            } catch (Exception e) {
-                throw new ConnectException("Could not ack offsets to Redis", e);
+            java.util.concurrent.ConcurrentLinkedQueue<Map<String, ?>> currentQueue =
+                    sourceOffsets.getAndSet(new java.util.concurrent.ConcurrentLinkedQueue<>());
+
+            java.util.List<Map<String, ?>> offsetsToCommit = new java.util.ArrayList<>(currentQueue);
+
+            if (!offsetsToCommit.isEmpty()) {
+                try {
+                    String[] ids = offsetsToCommit.stream()
+                            .map(m -> (String) m.get(OFFSET_FIELD))
+                            .toArray(String[]::new);
+                    reader.ack(config.getStreamName(), ids);
+                } catch (Exception e) {
+                    java.util.concurrent.ConcurrentLinkedQueue<Map<String, ?>> newQueue = sourceOffsets.get();
+                    for (Map<String, ?> failedOffset : offsetsToCommit) {
+                        newQueue.offer(failedOffset);
+                    }
+                    throw new ConnectException("Could not ack offsets to Redis, offsets restored for retry", e);
+                }
             }
         }
     }
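
Reviewer note: below is a minimal, self-contained sketch of the swap-and-requeue pattern this diff applies, written outside the Kafka Connect API so it can be compiled and run on its own. OffsetBatcher, record, drainBatch, and requeue are hypothetical names used only for illustration; the AtomicReference/ConcurrentLinkedQueue usage is what mirrors the change above.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

public class OffsetBatcher {

    // Producer threads offer into the current queue (analogous to commitRecord).
    private final AtomicReference<ConcurrentLinkedQueue<String>> pending =
            new AtomicReference<>(new ConcurrentLinkedQueue<>());

    public void record(String offset) {
        pending.get().offer(offset);
    }

    // The committer thread atomically swaps in a fresh queue (analogous to
    // commit), so it never races with producers the way stream() + clear()
    // raced on the old List.
    public List<String> drainBatch() {
        return new ArrayList<>(pending.getAndSet(new ConcurrentLinkedQueue<>()));
    }

    // On ack failure, push the batch back so the next commit retries it.
    public void requeue(List<String> failed) {
        ConcurrentLinkedQueue<String> current = pending.get();
        failed.forEach(current::offer);
    }

    public static void main(String[] args) {
        OffsetBatcher batcher = new OffsetBatcher();
        batcher.record("1690000000000-0");
        batcher.record("1690000000000-1");
        System.out.println("acking " + batcher.drainBatch());
    }
}

The swap fixes the original bug, where offsets added between the stream() snapshot and clear() were silently dropped and never acked. One residual window remains in the pattern as sketched: a producer that has read the old queue reference can still offer after the committer drains it, leaving that entry un-acked. For a Redis stream consumer group that entry simply stays pending and is redelivered, so the failure mode degrades to at-least-once delivery rather than data loss.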