@@ -227,7 +227,8 @@ public void shouldSendMultipleRecordsTransactionallyViaTemplateAndReceiveIt() {
 						.skip(expectedTotalRecordsCount - 1).findFirst();
 				assertThat(lastRecord.isPresent()).isEqualTo(true);
 				lastRecord.ifPresent(last -> assertThat(last.value())
-						.endsWith(String.valueOf(recordsCountInGroup * (int) Math.pow(10, transactionGroupsCount))));
+						.endsWith(
+								String.valueOf(recordsCountInGroup * (int) Math.pow(10, transactionGroupsCount))));
 			})
 			.expectComplete()
 			.verify(DEFAULT_VERIFY_TIMEOUT);
@@ -300,8 +301,9 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
 		StepVerifier.create(reactiveKafkaConsumerTemplate
 				.receiveExactlyOnce(reactiveKafkaProducerTemplate.transactionManager())
 				.concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, true))
-				.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager().abort().then(Mono.error(error)))
-		)
+				.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager()
+						.abort()
+						.then(Mono.error(error))))
 				.expectErrorMatches(throwable -> throwable instanceof KafkaException &&
 						throwable.getMessage().equals("TransactionalId reactive.transaction: Invalid transition " +
 								"attempted from state READY to state ABORTING_TRANSACTION"))
@@ -328,8 +330,9 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
 		StepVerifier.create(reactiveKafkaConsumerTemplate
 				.receiveExactlyOnce(reactiveKafkaProducerTemplate.transactionManager())
 				.concatMap(consumerRecordFlux -> sendAndCommit(consumerRecordFlux, false))
-				.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager().abort().then(Mono.error(error)))
-		)
+				.onErrorResume(error -> reactiveKafkaProducerTemplate.transactionManager()
+						.abort()
+						.then(Mono.error(error))))
 				.assertNext(senderResult -> {
 					assertThat(senderResult.correlationMetadata().intValue()).isEqualTo(DEFAULT_KEY);
 					assertThat(senderResult.recordMetadata().offset()).isGreaterThan(0);
@@ -347,7 +350,9 @@ public void shouldSendOneRecordTransactionallyViaTemplateAsSenderRecordAndReceiv
 				.verify(DEFAULT_VERIFY_TIMEOUT);
 	}
 
-	private Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, String>> fluxConsumerRecord, boolean failCommit) {
+	private Flux<SenderResult<Integer>> sendAndCommit(Flux<ConsumerRecord<Integer, String>> fluxConsumerRecord,
+			boolean failCommit) {
+
 		return reactiveKafkaProducerTemplate
 				.send(fluxConsumerRecord.map(this::toSenderRecord)
 						.concatWith(failCommit ?
@@ -360,7 +365,8 @@ private Publisher<? extends SenderRecord<Integer, String, Integer>> doThrowKafka
 	}
 
 	private SenderRecord<Integer, String, Integer> toSenderRecord(ConsumerRecord<Integer, String> record) {
-		return SenderRecord.create(REACTIVE_INT_KEY_TOPIC, record.partition(), null, record.key(), record.value() + "xyz", record.key());
+		return SenderRecord.create(REACTIVE_INT_KEY_TOPIC, record.partition(), null, record.key(),
+				record.value() + "xyz", record.key());
 	}
 
 }