@@ -1196,8 +1196,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
11961196 return null ;
11971197 }
11981198
1199- private void invokeBatchOnMessage (final ConsumerRecords <K , V > records , List < ConsumerRecord < K , V >> recordList ,
1200- @ SuppressWarnings (RAW_TYPES ) Producer producer ) throws InterruptedException {
1199+ private void invokeBatchOnMessage (final ConsumerRecords <K , V > records , // NOSONAR - Cyclomatic Complexity
1200+ List < ConsumerRecord < K , V >> recordList , @ SuppressWarnings (RAW_TYPES ) Producer producer ) throws InterruptedException {
12011201
12021202 if (this .wantsFullRecords ) {
12031203 this .batchListener .onMessage (records ,
@@ -1211,16 +1211,14 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<Cons
12111211 }
12121212 List <ConsumerRecord <?, ?>> toSeek = null ;
12131213 if (this .nackSleep >= 0 ) {
1214- Iterator <ConsumerRecord <K , V >> iterator = records .iterator ();
12151214 int index = 0 ;
12161215 toSeek = new ArrayList <>();
1217- while (iterator .hasNext ()) {
1218- ConsumerRecord <K , V > next = iterator .next ();
1216+ for (ConsumerRecord <K , V > record : records ) {
12191217 if (index ++ >= this .nackIndex ) {
1220- toSeek .add (next );
1218+ toSeek .add (record );
12211219 }
12221220 else {
1223- this .acks .put (next );
1221+ this .acks .put (record );
12241222 }
12251223 }
12261224 }
@@ -1232,7 +1230,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, List<Cons
12321230 sendOffsetsToTransaction (producer );
12331231 }
12341232 }
1235- if (this . nackSleep >= 0 ) {
1233+ if (toSeek != null ) {
12361234 if (!this .autoCommit ) {
12371235 processCommits ();
12381236 }
0 commit comments