|
1 | 1 | /* |
2 | | - * Copyright 2016-2020 the original author or authors. |
| 2 | + * Copyright 2016-2021 the original author or authors. |
3 | 3 | * |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
|
57 | 57 | import org.springframework.context.ApplicationListener; |
58 | 58 | import org.springframework.context.event.ContextStoppedEvent; |
59 | 59 | import org.springframework.core.log.LogAccessor; |
| 60 | +import org.springframework.kafka.KafkaException; |
60 | 61 | import org.springframework.kafka.support.TransactionSupport; |
61 | 62 | import org.springframework.lang.Nullable; |
62 | 63 | import org.springframework.util.Assert; |
@@ -519,7 +520,20 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix, |
519 | 520 | this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet()); |
520 | 521 | } |
521 | 522 | newProducer = createRawProducer(newProducerConfigs); |
522 | | - newProducer.initTransactions(); |
| 523 | + try { |
| 524 | + newProducer.initTransactions(); |
| 525 | + } |
| 526 | + catch (RuntimeException ex) { |
| 527 | + try { |
| 528 | + newProducer.close(this.physicalCloseTimeout); |
| 529 | + } |
| 530 | + catch (RuntimeException ex2) { |
| 531 | + KafkaException newEx = new KafkaException("initTransactions() failed and then close() failed", ex); |
| 532 | + newEx.addSuppressed(ex2); |
| 533 | + throw newEx; // NOSONAR - lost stack trace |
| 534 | + } |
| 535 | + throw new KafkaException("initTransactions() failed", ex); |
| 536 | + } |
523 | 537 | return new CloseSafeProducer<>(newProducer, getCache(prefix), remover, |
524 | 538 | (String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout, |
525 | 539 | this.epoch); |
|
0 commit comments