Skip to content

Commit e36c22c

Browse files
artembilansobychacko
authored andcommitted
GH-2981: Suppress abortTransaction exception
Fixes: #2981 When `producer.abortTransaction()` fails, the original exception is lost in `KafkaTemplate`. * Catch an exception on `producer.abortTransaction()` and `ex.addSuppressed(abortException)` **Cherry-pick to `3.0.x`**
1 parent 659b066 commit e36c22c

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -655,9 +655,14 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
655655
catch (SkipAbortException e) { // NOSONAR - exception flow control
656656
throw ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace
657657
}
658-
catch (Exception e) {
659-
producer.abortTransaction();
660-
throw e;
658+
catch (Exception ex) {
659+
try {
660+
producer.abortTransaction();
661+
}
662+
catch (Exception abortException) {
663+
ex.addSuppressed(abortException);
664+
}
665+
throw ex;
661666
}
662667
finally {
663668
this.producers.remove();

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -505,6 +505,31 @@ public void testAbort() {
505505
verify(producer, never()).commitTransaction();
506506
}
507507

508+
@Test
509+
public void abortFiledOriginalExceptionRethrown() {
510+
MockProducer<String, String> producer = spy(new MockProducer<>());
511+
producer.initTransactions();
512+
producer.abortTransactionException = new RuntimeException("abort failed");
513+
514+
ProducerFactory<String, String> pf = new MockProducerFactory<>((tx, id) -> producer, null);
515+
516+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
517+
template.setDefaultTopic(STRING_KEY_TOPIC);
518+
519+
assertThatExceptionOfType(RuntimeException.class)
520+
.isThrownBy(() ->
521+
template.executeInTransaction(t -> {
522+
throw new RuntimeException("intentional");
523+
}))
524+
.withMessage("intentional")
525+
.withStackTraceContaining("abort failed");
526+
527+
assertThat(producer.transactionCommitted()).isFalse();
528+
assertThat(producer.transactionAborted()).isFalse();
529+
assertThat(producer.closed()).isTrue();
530+
verify(producer, never()).commitTransaction();
531+
}
532+
508533
@Test
509534
public void testExecuteInTransactionNewInnerTx() {
510535
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)