5757import reactor .kafka .sender .SenderOptions ;
5858import reactor .kafka .sender .SenderRecord ;
5959import reactor .kafka .sender .SenderResult ;
60+ import reactor .kafka .sender .internals .DefaultKafkaSender ;
6061import reactor .test .StepVerifier ;
6162
6263/**
@@ -281,6 +282,7 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
281282
282283 @ LogLevels (categories = "reactor.kafka.receiver.internals.ConsumerEventLoop" , level = "TRACE" ,
283284 classes = { JUnitUtils .class , LogLevelsCondition .class ,
285+ DefaultKafkaSender .class ,
284286 ReactiveKafkaProducerTemplateTransactionIntegrationTests .class })
285287 @ Test
286288 public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiveItExactlyOnce () {
@@ -293,21 +295,20 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
293295 .expectComplete ()
294296 .verify ();
295297
296- StepVerifier .create (reactiveKafkaConsumerTemplate
297- .receiveExactlyOnce (this .reactiveKafkaProducerTemplate .transactionManager ())
298- .concatMap (consumerRecordFlux -> sendAndCommit (consumerRecordFlux , false ))
299- .onErrorResume (error -> this .reactiveKafkaProducerTemplate .transactionManager ()
300- .abort ()
301- .then (Mono .error (error ))))
298+ StepVerifier .create (
299+ reactiveKafkaConsumerTemplate
300+ .receiveExactlyOnce (this .reactiveKafkaProducerTemplate .transactionManager ())
301+ .concatMap (consumerRecordFlux -> sendAndCommit (consumerRecordFlux , false )))
302302 .assertNext (senderResult -> {
303303 assertThat (senderResult .correlationMetadata ().intValue ()).isEqualTo (DEFAULT_KEY );
304304 assertThat (senderResult .recordMetadata ().offset ()).isGreaterThan (0 );
305305 })
306306 .thenCancel ()
307307 .verify (DEFAULT_VERIFY_TIMEOUT );
308308
309- StepVerifier .create (reactiveKafkaConsumerTemplate
310- .receive ().doOnNext (receiverRecord -> receiverRecord .receiverOffset ().acknowledge ()))
309+ StepVerifier .create (
310+ reactiveKafkaConsumerTemplate
311+ .receive ().doOnNext (receiverRecord -> receiverRecord .receiverOffset ().acknowledge ()))
311312 .assertNext (receiverRecord -> {
312313 logger .info (ListenerUtils .recordToString (receiverRecord , true ));
313314 assertThat (receiverRecord .value ()).isEqualTo (DEFAULT_VALUE + "xyz" );
0 commit comments