From 8a211768dd1475118a9a646e1307b48a4ace1f8c Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Tue, 17 Mar 2026 14:30:43 +0000 Subject: [PATCH] Wrote test and fixed the bug for confalte chunk --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- .../test/scala/fs2/StreamConflateSuite.scala | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a3642794a8..2e92d47cf8 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -576,7 +576,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * The `chunkLimit` parameter controls backpressure on the source stream. */ def conflateChunks[F2[x] >: F[x]: Concurrent](chunkLimit: Int): Stream[F2, Chunk[O]] = - Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit)).flatMap { chan => + Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit - 1)).flatMap { chan => val producer = chunks.through(chan.sendAll) val consumer = chan.stream.chunks.map(_.combineAll) consumer.concurrently(producer) diff --git a/core/shared/src/test/scala/fs2/StreamConflateSuite.scala b/core/shared/src/test/scala/fs2/StreamConflateSuite.scala index c50b7c4451..cd7c9d9b84 100644 --- a/core/shared/src/test/scala/fs2/StreamConflateSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamConflateSuite.scala @@ -23,6 +23,7 @@ package fs2 import cats.effect.IO import cats.effect.testkit.TestControl +import cats.syntax.all._ import scala.concurrent.duration._ @@ -44,4 +45,23 @@ class StreamConflateSuite extends Fs2Suite { ) ) } + test("conflateChunks respects chunk limit") { + + (1 to 1000).toList.traverse_ { _ => + Stream(1, 2, 3, 4, 5, 6, 7) + .covary[IO] + .chunkLimit(1) + .unchunks + .conflateChunks(3) + .compile + .toList + .map { chunks => + assert( + chunks.forall(_.size <= 3), + s"Expected all chunks <= 3, but got: $chunks" + ) + } + } + } + }