@@ -127,8 +127,6 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
127127
128128 private final ThreadLocal <CloseSafeProducer <K , V >> threadBoundProducers = new ThreadLocal <>();
129129
130- private final ThreadLocal <Integer > threadBoundProducerEpochs = new ThreadLocal <>();
131-
132130 private final AtomicInteger epoch = new AtomicInteger ();
133131
134132 private final AtomicInteger clientIdCounter = new AtomicInteger ();
@@ -402,25 +400,21 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
402400 }
403401 if (this .producerPerThread ) {
404402 CloseSafeProducer <K , V > tlProducer = this .threadBoundProducers .get ();
405- if (this .threadBoundProducerEpochs .get () == null ) {
406- this .threadBoundProducerEpochs .set (this .epoch .get ());
407- }
408- if (tlProducer != null && this .epoch .get () != this .threadBoundProducerEpochs .get ()) {
403+ if (tlProducer != null && this .epoch .get () != tlProducer .epoch ) {
409404 closeThreadBoundProducer ();
410405 tlProducer = null ;
411406 }
412407 if (tlProducer == null ) {
413408 tlProducer = new CloseSafeProducer <>(createKafkaProducer (), this ::removeProducer ,
414- this .physicalCloseTimeout );
409+ this .physicalCloseTimeout , this . epoch );
415410 this .threadBoundProducers .set (tlProducer );
416- this .threadBoundProducerEpochs .set (this .epoch .get ());
417411 }
418412 return tlProducer ;
419413 }
420414 synchronized (this ) {
421415 if (this .producer == null ) {
422416 this .producer = new CloseSafeProducer <>(createKafkaProducer (), this ::removeProducer ,
423- this .physicalCloseTimeout );
417+ this .physicalCloseTimeout , this . epoch );
424418 }
425419 return this .producer ;
426420 }
@@ -527,7 +521,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
527521 newProducer = createRawProducer (newProducerConfigs );
528522 newProducer .initTransactions ();
529523 return new CloseSafeProducer <>(newProducer , getCache (prefix ), remover ,
530- (String ) newProducerConfigs .get (ProducerConfig .TRANSACTIONAL_ID_CONFIG ), this .physicalCloseTimeout );
524+ (String ) newProducerConfigs .get (ProducerConfig .TRANSACTIONAL_ID_CONFIG ), this .physicalCloseTimeout ,
525+ this .epoch );
531526 }
532527
533528 protected Producer <K , V > createRawProducer (Map <String , Object > configs ) {
@@ -596,37 +591,57 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
596591
597592 private final Duration closeTimeout ;
598593
594+ final int epoch ; // NOSONAR
595+
596+ private final AtomicInteger factoryEpoch ;
597+
599598 private volatile Exception producerFailed ;
600599
601600 private volatile boolean closed ;
602601
603602 CloseSafeProducer (Producer <K , V > delegate , Consumer <CloseSafeProducer <K , V >> removeProducer ,
604603 Duration closeTimeout ) {
605604
606- this (delegate , null , removeProducer , null , closeTimeout );
605+ this (delegate , null , removeProducer , null , closeTimeout , new AtomicInteger ());
606+ Assert .isTrue (!(delegate instanceof CloseSafeProducer ), "Cannot double-wrap a producer" );
607+ }
608+
609+ CloseSafeProducer (Producer <K , V > delegate , Consumer <CloseSafeProducer <K , V >> removeProducer ,
610+ Duration closeTimeout , AtomicInteger epoch ) {
611+
612+ this (delegate , null , removeProducer , null , closeTimeout , epoch );
607613 Assert .isTrue (!(delegate instanceof CloseSafeProducer ), "Cannot double-wrap a producer" );
608614 }
609615
610616 CloseSafeProducer (Producer <K , V > delegate , BlockingQueue <CloseSafeProducer <K , V >> cache ,
611617 Duration closeTimeout ) {
612- this (delegate , cache , null , closeTimeout );
618+ this (delegate , cache , null , null , closeTimeout , new AtomicInteger () );
613619 }
614620
615621 CloseSafeProducer (Producer <K , V > delegate , BlockingQueue <CloseSafeProducer <K , V >> cache ,
616622 @ Nullable Consumer <CloseSafeProducer <K , V >> removeConsumerProducer , Duration closeTimeout ) {
617623
618- this (delegate , cache , removeConsumerProducer , null , closeTimeout );
624+ this (delegate , cache , removeConsumerProducer , null , closeTimeout , new AtomicInteger () );
619625 }
620626
621627 CloseSafeProducer (Producer <K , V > delegate , BlockingQueue <CloseSafeProducer <K , V >> cache ,
622628 @ Nullable Consumer <CloseSafeProducer <K , V >> removeProducer , @ Nullable String txId ,
623629 Duration closeTimeout ) {
624630
631+ this (delegate , cache , removeProducer , txId , closeTimeout , new AtomicInteger ());
632+ }
633+
634+ CloseSafeProducer (Producer <K , V > delegate , BlockingQueue <CloseSafeProducer <K , V >> cache ,
635+ @ Nullable Consumer <CloseSafeProducer <K , V >> removeProducer , @ Nullable String txId ,
636+ Duration closeTimeout , AtomicInteger epoch ) {
637+
625638 this .delegate = delegate ;
626639 this .cache = cache ;
627640 this .removeProducer = removeProducer ;
628641 this .txId = txId ;
629642 this .closeTimeout = closeTimeout ;
643+ this .epoch = epoch .get ();
644+ this .factoryEpoch = epoch ;
630645 LOGGER .debug (() -> "Created new Producer: " + this );
631646 }
632647
@@ -760,8 +775,8 @@ public void close(@Nullable Duration timeout) {
760775 else {
761776 if (this .cache != null && this .removeProducer == null ) { // dedicated consumer producers are not cached
762777 synchronized (this ) {
763- if (! this .cache . contains ( this )
764- && !this .cache .offer (this )) {
778+ if (this . epoch != this .factoryEpoch . get ( )
779+ || (! this . cache . contains ( this ) && !this .cache .offer (this ) )) {
765780 this .closed = true ;
766781 this .delegate .close (timeout );
767782 }
0 commit comments