Skip to content

Commit 8e4d968

Browse files
garyrussellartembilan
authored andcommitted
Try/Catch around error recovery
- if a recoverer `BiConsumer` threw an exception, the seeks would not be performed - also add try/catch around after rollback processor calls ** Manual backport PR required due to new BackOff logic on master **
1 parent b46397c commit 8e4d968

File tree

3 files changed

+101
-9
lines changed

3 files changed

+101
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,24 +43,31 @@ class FailedRecordTracker {
4343

4444
private final BackOff backOff;
4545

46+
private final LogAccessor logger;
47+
4648
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
4749
LogAccessor logger) {
4850

4951
Assert.notNull(backOff, "'backOff' cannot be null");
5052
if (recoverer == null) {
51-
this.recoverer = (rec, thr) -> logger.error(thr, "Backoff " + this.failures.get().getBackOffExecution()
53+
FailedRecord failedRecord = this.failures.get();
54+
this.recoverer = (rec, thr) -> logger.error(thr, "Backoff "
55+
+ (failedRecord == null
56+
? "none"
57+
: failedRecord.getBackOffExecution())
5258
+ " exhausted for " + rec);
5359
}
5460
else {
5561
this.recoverer = recoverer;
5662
}
5763
this.noRetries = backOff.start().nextBackOff() == BackOffExecution.STOP;
5864
this.backOff = backOff;
65+
this.logger = logger;
5966
}
6067

6168
boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
6269
if (this.noRetries) {
63-
this.recoverer.accept(record, exception);
70+
recover(record, exception);
6471
return true;
6572
}
6673
FailedRecord failedRecord = this.failures.get();
@@ -79,12 +86,21 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
7986
return false;
8087
}
8188
else {
82-
this.recoverer.accept(record, exception);
89+
recover(record, exception);
8390
this.failures.remove();
8491
return true;
8592
}
8693
}
8794

95+
private void recover(ConsumerRecord<?, ?> record, Exception exception) {
96+
try {
97+
this.recoverer.accept(record, exception);
98+
}
99+
catch (Exception ex) {
100+
this.logger.error(ex, "Recoverer threw exception");
101+
}
102+
}
103+
88104
private boolean newFailure(ConsumerRecord<?, ?> record, FailedRecord failedRecord) {
89105
return !failedRecord.getTopic().equals(record.topic())
90106
|| failedRecord.getPartition() != record.partition()

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1133,11 +1133,16 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
11331133
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
11341134
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
11351135

1136-
if (recordList == null) {
1137-
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
1136+
try {
1137+
if (recordList == null) {
1138+
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, e, false);
1139+
}
1140+
else {
1141+
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
1142+
}
11381143
}
1139-
else {
1140-
afterRollbackProcessorToUse.process(recordList, this.consumer, e, false);
1144+
catch (Exception ex) {
1145+
this.logger.error(ex, "AfterRollbackProcessor threw exception");
11411146
}
11421147
}
11431148

@@ -1321,7 +1326,12 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
13211326
});
13221327
}
13231328
else {
1324-
afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true);
1329+
try {
1330+
afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true);
1331+
}
1332+
catch (Exception ex) {
1333+
this.logger.error(ex, "AfterRollbackProcessor threw exception");
1334+
}
13251335
}
13261336
}
13271337

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.atomic.AtomicBoolean;
4848
import java.util.concurrent.atomic.AtomicReference;
49+
import java.util.function.BiConsumer;
4950

5051
import org.apache.commons.logging.LogFactory;
5152
import org.apache.kafka.clients.consumer.Consumer;
@@ -112,9 +113,11 @@ public class TransactionalContainerTests {
112113

113114
private static String topic3DLT = "txTopic3.DLT";
114115

116+
private static String topic4 = "txTopic4";
117+
115118
@ClassRule
116119
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic1, topic2, topic3,
117-
topic3DLT)
120+
topic3DLT, topic4)
118121
.brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1")
119122
.brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");
120123

@@ -595,6 +598,69 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
595598
logger.info("Stop testMaxAttempts");
596599
}
597600

601+
@SuppressWarnings("unchecked")
602+
@Test
603+
public void testRollbackProcessorCrash() throws Exception {
604+
logger.info("Start testRollbackNoRetries");
605+
Map<String, Object> props = KafkaTestUtils.consumerProps("testRollbackNoRetries", "false", embeddedKafka);
606+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
607+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
608+
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
609+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
610+
ContainerProperties containerProps = new ContainerProperties(topic4);
611+
containerProps.setPollTimeout(10_000);
612+
613+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
614+
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
615+
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(senderProps);
616+
pf.setTransactionIdPrefix("noRetries.");
617+
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
618+
final CountDownLatch latch = new CountDownLatch(1);
619+
AtomicReference<String> data = new AtomicReference<>();
620+
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
621+
data.set(message.value());
622+
if (message.offset() == 0) {
623+
throw new RuntimeException("fail for no retry");
624+
}
625+
latch.countDown();
626+
});
627+
628+
@SuppressWarnings({ "rawtypes" })
629+
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
630+
containerProps.setTransactionManager(tm);
631+
KafkaMessageListenerContainer<Integer, String> container =
632+
new KafkaMessageListenerContainer<>(cf, containerProps);
633+
container.setBeanName("testRollbackNoRetries");
634+
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, ex) -> {
635+
throw new RuntimeException("arbp fail");
636+
};
637+
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
638+
spy(new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 0L)));
639+
container.setAfterRollbackProcessor(afterRollbackProcessor);
640+
final CountDownLatch stopLatch = new CountDownLatch(1);
641+
container.setApplicationEventPublisher(e -> {
642+
if (e instanceof ConsumerStoppedEvent) {
643+
stopLatch.countDown();
644+
}
645+
});
646+
container.start();
647+
648+
template.setDefaultTopic(topic4);
649+
template.executeInTransaction(t -> {
650+
RecordHeaders headers = new RecordHeaders(new RecordHeader[] { new RecordHeader("baz", "qux".getBytes()) });
651+
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic4, 0, 0, "foo", headers);
652+
template.send(record);
653+
template.sendDefault(0, 0, "bar");
654+
return null;
655+
});
656+
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
657+
assertThat(data.get()).isEqualTo("bar");
658+
container.stop();
659+
pf.destroy();
660+
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
661+
logger.info("Stop testRollbackNoRetries");
662+
}
663+
598664
@SuppressWarnings("serial")
599665
public static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {
600666

0 commit comments

Comments
 (0)