Skip to content

Commit 678a21b

Browse files
garyrussellartembilan
authored andcommitted
GH-1196: Use close(Duration) instead of close()
Resolves #1196 Add `closeTimeout` to `KafkaTemplate` and `KafkaTransactionManager` (default 5s). Use a zero timeout if a transaction operation failed with a timeout.
1 parent 8ed5289 commit 678a21b

File tree

8 files changed

+184
-47
lines changed

8 files changed

+184
-47
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.kafka.common.PartitionInfo;
4343
import org.apache.kafka.common.TopicPartition;
4444
import org.apache.kafka.common.errors.ProducerFencedException;
45+
import org.apache.kafka.common.errors.TimeoutException;
4546
import org.apache.kafka.common.serialization.Serializer;
4647

4748
import org.springframework.beans.factory.DisposableBean;
@@ -123,8 +124,10 @@ public void setValueSerializer(Serializer<V> valueSerializer) {
123124
}
124125

125126
/**
126-
* The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked.
127-
* Specified in seconds; default {@value #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
127+
* The time to wait when physically closing the producer via the factory rather than
128+
* closing the producer itself (when {@link #reset()}, {@link #destroy() or
129+
* #closeProducerFor(String)} are invoked). Specified in seconds; default
130+
* {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
128131
* @param physicalCloseTimeout the timeout in seconds.
129132
* @since 1.0.7
130133
*/
@@ -179,7 +182,7 @@ public boolean transactionCapable() {
179182

180183
@SuppressWarnings("resource")
181184
@Override
182-
public void destroy() throws Exception { //NOSONAR
185+
public void destroy() {
183186
CloseSafeProducer<K, V> producer = this.producer;
184187
this.producer = null;
185188
if (producer != null) {
@@ -334,7 +337,7 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
334337

335338
private final String txId;
336339

337-
private volatile boolean txFailed;
340+
private volatile Exception txFailed;
338341

339342
CloseSafeProducer(Producer<K, V> delegate) {
340343
this(delegate, null, null);
@@ -402,7 +405,7 @@ public void beginTransaction() throws ProducerFencedException {
402405
if (logger.isErrorEnabled()) {
403406
logger.error("beginTransaction failed: " + this, e);
404407
}
405-
this.txFailed = true;
408+
this.txFailed = e;
406409
throw e;
407410
}
408411
}
@@ -426,7 +429,7 @@ public void commitTransaction() throws ProducerFencedException {
426429
if (logger.isErrorEnabled()) {
427430
logger.error("commitTransaction failed: " + this, e);
428431
}
429-
this.txFailed = true;
432+
this.txFailed = e;
430433
throw e;
431434
}
432435
}
@@ -443,7 +446,7 @@ public void abortTransaction() throws ProducerFencedException {
443446
if (logger.isErrorEnabled()) {
444447
logger.error("Abort failed: " + this, e);
445448
}
446-
this.txFailed = true;
449+
this.txFailed = e;
447450
throw e;
448451
}
449452
}
@@ -456,17 +459,16 @@ public void close() {
456459
@Override
457460
public void close(long timeout, TimeUnit unit) {
458461
if (this.cache != null) {
459-
if (this.txFailed) {
462+
long closeTimeout = this.txFailed instanceof TimeoutException || unit == null
463+
? 0L
464+
: timeout;
465+
if (this.txFailed != null) {
460466
if (logger.isWarnEnabled()) {
461-
logger.warn("Error during transactional operation; producer removed from cache; possible cause: "
462-
+ "broker restarted during transaction: " + this);
463-
}
464-
if (unit == null) {
465-
this.delegate.close();
466-
}
467-
else {
468-
this.delegate.close(timeout, unit);
467+
logger.warn("Error during transactional operation; producer removed from cache; "
468+
+ "possible cause: "
469+
+ "broker restarted during transaction: " + this);
469470
}
471+
this.delegate.close(closeTimeout, unit);
470472
if (this.consumerProducers != null) {
471473
removeConsumerProducer();
472474
}
@@ -476,12 +478,7 @@ public void close(long timeout, TimeUnit unit) {
476478
synchronized (this) {
477479
if (!this.cache.contains(this)
478480
&& !this.cache.offer(this)) {
479-
if (unit == null) {
480-
this.delegate.close();
481-
}
482-
else {
483-
this.delegate.close(timeout, unit);
484-
}
481+
this.delegate.close(closeTimeout, unit);
485482
}
486483
}
487484
}

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaResourceHolder.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.util.concurrent.TimeUnit;
20+
1921
import org.apache.kafka.clients.producer.Producer;
2022

2123
import org.springframework.transaction.support.ResourceHolderSupport;
24+
import org.springframework.util.Assert;
2225

2326
/**
2427
* Kafka resource holder, wrapping a Kafka producer. KafkaTransactionManager binds instances of this
@@ -33,12 +36,27 @@ public class KafkaResourceHolder<K, V> extends ResourceHolderSupport {
3336

3437
private final Producer<K, V> producer;
3538

39+
private final long closeTimeout;
40+
3641
/**
3742
* Construct an instance for the producer.
3843
* @param producer the producer.
3944
*/
4045
public KafkaResourceHolder(Producer<K, V> producer) {
46+
this(producer, ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT);
47+
}
48+
49+
/**
50+
* Construct an instance for the producer.
51+
* @param producer the producer.
52+
* @param closeTimeout the close timeout.
53+
* @since 1.3.11
54+
*/
55+
public KafkaResourceHolder(Producer<K, V> producer, long closeTimeout) {
56+
Assert.notNull(producer, "'producer' cannot be null");
57+
Assert.notNull(closeTimeout, "'closeTimeout' cannot be null");
4158
this.producer = producer;
59+
this.closeTimeout = closeTimeout;
4260
}
4361

4462
public Producer<K, V> getProducer() {
@@ -50,7 +68,7 @@ public void commit() {
5068
}
5169

5270
public void close() {
53-
this.producer.close();
71+
this.producer.close(this.closeTimeout, TimeUnit.MILLISECONDS);
5472
}
5573

5674
public void rollback() {

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.List;
2020
import java.util.Map;
21+
import java.util.concurrent.TimeUnit;
2122

2223
import org.apache.commons.logging.Log;
2324
import org.apache.commons.logging.LogFactory;
@@ -77,6 +78,7 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
7778

7879
private volatile ProducerListener<K, V> producerListener = new LoggingProducerListener<K, V>();
7980

81+
private final long closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;
8082

8183
/**
8284
* Create an instance using the supplied producer factory and autoFlush false.
@@ -149,6 +151,15 @@ public void setMessageConverter(RecordMessageConverter messageConverter) {
149151
this.messageConverter = messageConverter;
150152
}
151153

154+
/**
155+
* Set the maximum time to wait when closing a producer; default 5 seconds.
156+
* @param closeTimeout the close timeout.
157+
* @since 1.3.11
158+
*/
159+
public void setCloseTimeout(long closeTimeout) {
160+
setCloseTimeout(closeTimeout);
161+
}
162+
152163
@Override
153164
public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
154165
return send(this.defaultTopic, data);
@@ -329,9 +340,9 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
329340
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
330341
}
331342

332-
protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {
333-
if (!inLocalTx) {
334-
producer.close();
343+
protected void closeProducer(Producer<K, V> producer, boolean inTx) {
344+
if (!inTx) {
345+
producer.close(this.closeTimeout, TimeUnit.MILLISECONDS);
335346
}
336347
}
337348

@@ -409,7 +420,7 @@ private Producer<K, V> getTheProducer() {
409420
return producer;
410421
}
411422
KafkaResourceHolder<K, V> holder = ProducerFactoryUtils
412-
.getTransactionalResourceHolder(this.producerFactory);
423+
.getTransactionalResourceHolder(this.producerFactory, this.closeTimeout);
413424
return holder.getProducer();
414425
}
415426
else {

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.core;
1818

19+
import java.util.concurrent.TimeUnit;
20+
1921
import org.apache.kafka.clients.producer.Producer;
2022

2123
import org.springframework.transaction.support.ResourceHolderSynchronization;
@@ -34,6 +36,11 @@
3436
*/
3537
public final class ProducerFactoryUtils {
3638

39+
/**
40+
* The default close timeout (5 seconds) - package private to avoid later deprecation.
41+
*/
42+
static final long DEFAULT_CLOSE_TIMEOUT = 5000L;
43+
3744
private static ThreadLocal<String> groupIds = new ThreadLocal<>();
3845

3946
private ProducerFactoryUtils() {
@@ -50,6 +57,21 @@ private ProducerFactoryUtils() {
5057
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
5158
final ProducerFactory<K, V> producerFactory) {
5259

60+
return getTransactionalResourceHolder(producerFactory, DEFAULT_CLOSE_TIMEOUT);
61+
}
62+
63+
/**
64+
* Obtain a Producer that is synchronized with the current transaction, if any.
65+
* @param producerFactory the ProducerFactory to obtain a Channel for
66+
* @param closeTimeout the producer close timeout.
67+
* @param <K> the key type.
68+
* @param <V> the value type.
69+
* @return the resource holder.
70+
* @since 1.3.11
71+
*/
72+
public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
73+
final ProducerFactory<K, V> producerFactory, long closeTimeout) {
74+
5375
Assert.notNull(producerFactory, "ProducerFactory must not be null");
5476

5577
@SuppressWarnings("unchecked")
@@ -62,19 +84,19 @@ public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
6284
producer.beginTransaction();
6385
}
6486
catch (RuntimeException e) {
65-
producer.close();
87+
producer.close(closeTimeout, TimeUnit.MILLISECONDS);
6688
throw e;
6789
}
6890

69-
resourceHolder = new KafkaResourceHolder<K, V>(producer);
91+
resourceHolder = new KafkaResourceHolder<K, V>(producer, closeTimeout);
7092
bindResourceToTransaction(resourceHolder, producerFactory);
7193
}
7294
return resourceHolder;
7395
}
7496

7597
public static <K, V> void releaseResources(KafkaResourceHolder<K, V> resourceHolder) {
7698
if (resourceHolder != null) {
77-
resourceHolder.getProducer().close();
99+
resourceHolder.close();
78100
}
79101
}
80102

spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
2020
import org.springframework.kafka.core.KafkaResourceHolder;
21-
import org.springframework.kafka.core.KafkaTemplate;
2221
import org.springframework.kafka.core.ProducerFactory;
2322
import org.springframework.kafka.core.ProducerFactoryUtils;
2423
import org.springframework.transaction.CannotCreateTransactionException;
@@ -45,9 +44,9 @@
4544
*
4645
* <p>
4746
* Application code is required to retrieve the transactional Kafka resources via
48-
* {@link ProducerFactoryUtils#getTransactionalResourceHolder(ProducerFactory)}.
49-
* Spring's {@link KafkaTemplate} will auto detect a thread-bound Producer and
50-
* automatically participate in it.
47+
* {@link ProducerFactoryUtils#getTransactionalResourceHolder(ProducerFactory, java.time.Duration)}.
48+
* Spring's {@link org.springframework.kafka.core.KafkaTemplate KafkaTemplate} will auto
49+
* detect a thread-bound Producer and automatically participate in it.
5150
*
5251
* <p>
5352
* <b>The use of {@link DefaultKafkaProducerFactory} as a target for this transaction
@@ -68,8 +67,12 @@
6867
public class KafkaTransactionManager<K, V> extends AbstractPlatformTransactionManager
6968
implements ResourceTransactionManager {
7069

70+
private static final long DEFAULT_CLOSE_TIMEOUT = 5000L;
71+
7172
private final ProducerFactory<K, V> producerFactory;
7273

74+
private final long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
75+
7376
/**
7477
* Create a new KafkaTransactionManager, given a ConnectionFactory.
7578
* Transaction synchronization is turned off by default, as this manager might be used alongside a datastore-based
@@ -92,6 +95,15 @@ public ProducerFactory<K, V> getProducerFactory() {
9295
return this.producerFactory;
9396
}
9497

98+
/**
99+
* Set the maximum time to wait when closing a producer; default 5 seconds.
100+
* @param closeTimeout the close timeout.
101+
* @since 1.3.11
102+
*/
103+
public void setCloseTimeout(long closeTimeout) {
104+
setCloseTimeout(closeTimeout);
105+
}
106+
95107
@Override
96108
public Object getResourceFactory() {
97109
return getProducerFactory();
@@ -122,7 +134,8 @@ protected void doBegin(Object transaction, TransactionDefinition definition) {
122134
KafkaTransactionObject<K, V> txObject = (KafkaTransactionObject<K, V>) transaction;
123135
KafkaResourceHolder<K, V> resourceHolder = null;
124136
try {
125-
resourceHolder = ProducerFactoryUtils.getTransactionalResourceHolder(getProducerFactory());
137+
resourceHolder = ProducerFactoryUtils.getTransactionalResourceHolder(getProducerFactory(),
138+
this.closeTimeout);
126139
if (logger.isDebugEnabled()) {
127140
logger.debug("Created Kafka transaction on producer [" + resourceHolder.getProducer() + "]");
128141
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.util.HashMap;
2626
import java.util.concurrent.BlockingQueue;
27+
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicInteger;
2829

2930
import org.apache.kafka.clients.producer.Producer;
@@ -93,7 +94,7 @@ protected Producer createTransactionalProducer() {
9394
inOrder.verify(producer).send(any(), any());
9495
inOrder.verify(producer).commitTransaction();
9596
inOrder.verify(producer).beginTransaction();
96-
inOrder.verify(producer).close();
97+
inOrder.verify(producer).close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS);
9798
inOrder.verifyNoMoreInteractions();
9899
pf.destroy();
99100
}

0 commit comments

Comments
 (0)