Skip to content

Commit cc1e19b

Browse files
garyrussellartembilan
authored andcommitted
GH-1210: Add nack capability to MANUAL AckModes
Resolves #1210 * Remove unused variable * Fix race in tests * Fix race in test * Polishing - javadocs and docs
1 parent 04335d1 commit cc1e19b

File tree

6 files changed

+573
-7
lines changed

6 files changed

+573
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.springframework.kafka.support.Acknowledgment;
7171
import org.springframework.kafka.support.KafkaUtils;
7272
import org.springframework.kafka.support.LogIfLevelEnabled;
73+
import org.springframework.kafka.support.SeekUtils;
7374
import org.springframework.kafka.support.TopicPartitionOffset;
7475
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
7576
import org.springframework.kafka.support.TransactionSupport;
@@ -523,6 +524,10 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
523524

524525
private long lastAlertAt = this.lastReceive;
525526

527+
private long nackSleep = -1;
528+
529+
private int nackIndex;
530+
526531
private volatile boolean consumerPaused;
527532

528533
private volatile long lastPoll = System.currentTimeMillis();
@@ -1193,6 +1198,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
11931198

11941199
private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<ConsumerRecord<K, V>> recordList,
11951200
@SuppressWarnings(RAW_TYPES) Producer producer) throws InterruptedException {
1201+
11961202
if (this.wantsFullRecords) {
11971203
this.batchListener.onMessage(records,
11981204
this.isAnyManualAck
@@ -1203,6 +1209,21 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<Cons
12031209
else {
12041210
doInvokeBatchOnMessage(records, recordList);
12051211
}
1212+
List<ConsumerRecord<?, ?>> toSeek = null;
1213+
if (this.nackSleep >= 0) {
1214+
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
1215+
int index = 0;
1216+
toSeek = new ArrayList<>();
1217+
while (iterator.hasNext()) {
1218+
ConsumerRecord<K, V> next = iterator.next();
1219+
if (index++ >= this.nackIndex) {
1220+
toSeek.add(next);
1221+
}
1222+
else {
1223+
this.acks.put(next);
1224+
}
1225+
}
1226+
}
12061227
if (!this.isAnyManualAck && !this.autoCommit) {
12071228
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
12081229
this.acks.put(record);
@@ -1211,10 +1232,18 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<Cons
12111232
sendOffsetsToTransaction(producer);
12121233
}
12131234
}
1235+
if (this.nackSleep >= 0) {
1236+
if (!this.autoCommit) {
1237+
processCommits();
1238+
}
1239+
SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger);
1240+
this.nackSleep = -1;
1241+
}
12141242
}
12151243

12161244
private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
12171245
List<ConsumerRecord<K, V>> recordList) {
1246+
12181247
switch (this.listenerType) {
12191248
case ACKNOWLEDGING_CONSUMER_AWARE:
12201249
this.batchListener.onMessage(recordList,
@@ -1301,6 +1330,11 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
13011330
finally {
13021331
TransactionSupport.clearTransactionIdSuffix();
13031332
}
1333+
if (this.nackSleep >= 0) {
1334+
handleNack(records, record);
1335+
break;
1336+
}
1337+
13041338
}
13051339
}
13061340

@@ -1341,9 +1375,35 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
13411375
final ConsumerRecord<K, V> record = iterator.next();
13421376
this.logger.trace(() -> "Processing " + record);
13431377
doInvokeRecordListener(record, null, iterator);
1378+
if (this.nackSleep >= 0) {
1379+
handleNack(records, record);
1380+
break;
1381+
}
13441382
}
13451383
}
13461384

1385+
private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> record) {
1386+
if (!this.autoCommit && !this.isRecordAck) {
1387+
processCommits();
1388+
}
1389+
List<ConsumerRecord<?, ?>> list = new ArrayList<>();
1390+
Iterator<ConsumerRecord<K, V>> iterator2 = records.iterator();
1391+
while (iterator2.hasNext()) {
1392+
ConsumerRecord<K, V> next = iterator2.next();
1393+
if (next.equals(record) || list.size() > 0) {
1394+
list.add(next);
1395+
}
1396+
}
1397+
SeekUtils.doSeeks(list, this.consumer, null, true, (rec, ex) -> false, this.logger);
1398+
try {
1399+
Thread.sleep(this.nackSleep);
1400+
}
1401+
catch (@SuppressWarnings("unused") InterruptedException e) {
1402+
Thread.currentThread().interrupt();
1403+
}
1404+
this.nackSleep = -1;
1405+
}
1406+
13471407
/**
13481408
* Actually invoke the listener.
13491409
* @param record the record.
@@ -1399,7 +1459,9 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record,
13991459
checkDeser(record, ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER);
14001460
}
14011461
doInvokeOnMessage(record);
1402-
ackCurrent(record, producer);
1462+
if (this.nackSleep < 0) {
1463+
ackCurrent(record, producer);
1464+
}
14031465
}
14041466

14051467
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
@@ -1829,11 +1891,17 @@ private final class ConsumerAcknowledgment implements Acknowledgment {
18291891

18301892
@Override
18311893
public void acknowledge() {
1832-
Assert.state(ListenerConsumer.this.isAnyManualAck,
1833-
"A manual ackmode is required for an acknowledging listener");
18341894
processAck(this.record);
18351895
}
18361896

1897+
@Override
1898+
public void nack(long sleep) {
1899+
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
1900+
"nack() can only be called on the consumer thread");
1901+
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
1902+
ListenerConsumer.this.nackSleep = sleep;
1903+
}
1904+
18371905
@Override
18381906
public String toString() {
18391907
return "Acknowledgment for " + this.record;
@@ -1852,13 +1920,21 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
18521920

18531921
@Override
18541922
public void acknowledge() {
1855-
Assert.state(ListenerConsumer.this.isAnyManualAck,
1856-
"A manual ackmode is required for an acknowledging listener");
18571923
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(this.records)) {
18581924
processAck(record);
18591925
}
18601926
}
18611927

1928+
@Override
1929+
public void nack(int index, long sleep) {
1930+
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
1931+
"nack() can only be called on the consumer thread");
1932+
Assert.isTrue(sleep >= 0, "sleep cannot be negative");
1933+
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
1934+
ListenerConsumer.this.nackIndex = index;
1935+
ListenerConsumer.this.nackSleep = sleep;
1936+
}
1937+
18621938
@Override
18631939
public String toString() {
18641940
return "Acknowledgment for " + this.records;

spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,34 @@
2828
public interface Acknowledgment {
2929

3030
/**
31-
* Invoked when the message for which the acknowledgment has been created has been processed.
32-
* Calling this method implies that all the previous messages in the partition have been processed already.
31+
* Invoked when the record or batch for which the acknowledgment has been created has
32+
* been processed. Calling this method implies that all the previous messages in the
33+
* partition have been processed already.
3334
*/
3435
void acknowledge();
3536

37+
/**
38+
* Negatively acknowledge the current record - discard remaining records from the poll
39+
* and re-seek all partitions so that this record will be redelivered after the sleep
40+
* time. Must be called on the consumer thread.
41+
* @param sleep the time to sleep.
42+
* @since 2.3
43+
*/
44+
default void nack(long sleep) {
45+
throw new UnsupportedOperationException("nack(sleep) is not supported by this Acknowledgment");
46+
}
47+
48+
/**
49+
* Negatively acknowledge the record at an index in a batch - commit the offset(s) of
50+
* records before the index and re-seek the partitions so that the record at the index
51+
* and subsequent records will be redelivered after the sleep time. Must be called on
52+
* the consumer thread.
53+
* @param index the index of the failed record in the batch.
54+
* @param sleep the time to sleep.
55+
* @since 2.3
56+
*/
57+
default void nack(int index, long sleep) {
58+
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
59+
}
60+
3661
}

0 commit comments

Comments
 (0)