Skip to content

Commit 72f89a2

Browse files
committed
Fix manual acks with transactions
- send the offset to the transaction instead of committing via the consumer.
1 parent badcb8a commit 72f89a2

File tree

2 files changed

+55
-15
lines changed

2 files changed

+55
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
361361

362362
private boolean taskSchedulerExplicitlySet;
363363

364+
private Producer<?, ?> producer;
365+
364366
private volatile long lastPoll = System.currentTimeMillis();
365367

366368
@SuppressWarnings("unchecked")
@@ -776,7 +778,10 @@ private void ackImmediate(ConsumerRecord<K, V> record) {
776778
if (ListenerConsumer.this.logger.isDebugEnabled()) {
777779
ListenerConsumer.this.logger.debug("Committing: " + commits);
778780
}
779-
if (this.containerProperties.isSyncCommits()) {
781+
if (this.producer != null) {
782+
this.producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
783+
}
784+
else if (this.containerProperties.isSyncCommits()) {
780785
ListenerConsumer.this.consumer.commitSync(commits);
781786
}
782787
else {
@@ -821,7 +826,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
821826
Producer producer = null;
822827
if (ListenerConsumer.this.kafkaTxManager != null) {
823828
producer = ((KafkaResourceHolder) TransactionSynchronizationManager
824-
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())).getProducer();
829+
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
830+
.getProducer(); // NOSONAR nullable
831+
ListenerConsumer.this.producer = producer;
825832
}
826833
RuntimeException aborted = doInvokeBatchListener(records, recordList, producer);
827834
if (aborted != null) {
@@ -931,7 +938,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
931938
Producer producer = null;
932939
if (ListenerConsumer.this.kafkaTxManager != null) {
933940
producer = ((KafkaResourceHolder) TransactionSynchronizationManager
934-
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory())).getProducer();
941+
.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
942+
.getProducer(); // NOSONAR
943+
ListenerConsumer.this.producer = producer;
935944
}
936945
RuntimeException aborted = doInvokeRecordListener(record, producer);
937946
if (aborted != null) {
@@ -987,7 +996,9 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
987996
else {
988997
this.listener.onMessage(record);
989998
}
990-
ackCurrent(record, producer);
999+
if (!this.isManualImmediateAck) {
1000+
ackCurrent(record, producer);
1001+
}
9911002
}
9921003
catch (RuntimeException e) {
9931004
if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2018 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,7 +67,9 @@
6767
import org.springframework.kafka.core.KafkaTemplate;
6868
import org.springframework.kafka.core.ProducerFactory;
6969
import org.springframework.kafka.core.ProducerFactoryUtils;
70+
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
7071
import org.springframework.kafka.listener.config.ContainerProperties;
72+
import org.springframework.kafka.support.Acknowledgment;
7173
import org.springframework.kafka.support.TopicPartitionInitialOffset;
7274
import org.springframework.kafka.support.TransactionSupport;
7375
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -98,21 +100,28 @@ public class TransactionalContainerTests {
98100

99101
@Test
100102
public void testConsumeAndProduceTransactionKTM() throws Exception {
101-
testConsumeAndProduceTransactionGuts(false, false);
103+
testConsumeAndProduceTransactionGuts(false, false, AckMode.RECORD);
102104
}
103105

104106
@Test
105107
public void testConsumeAndProduceTransactionKCTM() throws Exception {
106-
testConsumeAndProduceTransactionGuts(true, false);
108+
testConsumeAndProduceTransactionGuts(true, false, AckMode.RECORD);
107109
}
108110

109111
@Test
110112
public void testConsumeAndProduceTransactionHandleError() throws Exception {
111-
testConsumeAndProduceTransactionGuts(false, true);
113+
testConsumeAndProduceTransactionGuts(false, true, AckMode.RECORD);
114+
}
115+
116+
@Test
117+
public void testConsumeAndProduceTransactionKTMManual() throws Exception {
118+
testConsumeAndProduceTransactionGuts(false, false, AckMode.MANUAL_IMMEDIATE);
112119
}
113120

114121
@SuppressWarnings({ "rawtypes", "unchecked" })
115-
private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError) throws Exception {
122+
private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError, AckMode ackMode)
123+
throws Exception {
124+
116125
Consumer consumer = mock(Consumer.class);
117126
final TopicPartition topicPartition = new TopicPartition("foo", 0);
118127
willAnswer(i -> {
@@ -122,14 +131,15 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
122131
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
123132
ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition,
124133
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value"))));
134+
ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap());
125135
final AtomicBoolean done = new AtomicBoolean();
126136
willAnswer(i -> {
127137
if (done.compareAndSet(false, true)) {
128138
return records;
129139
}
130140
else {
131141
Thread.sleep(500);
132-
return null;
142+
return empty;
133143
}
134144
}).given(consumer).poll(anyLong());
135145
ConsumerFactory cf = mock(ConsumerFactory.class);
@@ -149,15 +159,34 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
149159
}).given(pf).createProducer();
150160
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
151161
ContainerProperties props = new ContainerProperties("foo");
162+
props.setAckMode(ackMode);
152163
props.setGroupId("group");
153164
props.setTransactionManager(tm);
154165
final KafkaTemplate template = new KafkaTemplate(pf);
155-
props.setMessageListener((MessageListener) m -> {
156-
template.send("bar", "baz");
157-
if (handleError) {
158-
throw new RuntimeException("fail");
166+
if (AckMode.MANUAL_IMMEDIATE.equals(ackMode)) {
167+
class AckListener implements AcknowledgingMessageListener {
168+
// not a lambda https://bugs.openjdk.java.net/browse/JDK-8074381
169+
170+
@Override
171+
public void onMessage(Object data, Acknowledgment acknowledgment) {
172+
template.send("bar", "baz");
173+
if (handleError) {
174+
throw new RuntimeException("fail");
175+
}
176+
acknowledgment.acknowledge();
177+
}
178+
159179
}
160-
});
180+
props.setMessageListener(new AckListener());
181+
}
182+
else {
183+
props.setMessageListener((MessageListener) m -> {
184+
template.send("bar", "baz");
185+
if (handleError) {
186+
throw new RuntimeException("fail");
187+
}
188+
});
189+
}
161190
if (handleError) {
162191
props.setErrorHandler((e, data) -> {
163192
});

0 commit comments

Comments
 (0)