diff --git a/core/shared/src/main/scala/fs2/concurrent/Topic.scala b/core/shared/src/main/scala/fs2/concurrent/Topic.scala index b069bcfe6f..d60acc0031 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Topic.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Topic.scala @@ -60,6 +60,10 @@ abstract class Topic[F[_], A] { self => * Note: if `publish1` is called concurrently by multiple producers, * different subscribers may receive messages from different producers * in a different order. + * + * Note: if `publish1` returns `Left(Topic.Closed)`, it is possible + * that some subscribers received the event while others did not due + * to concurrent closure. */ def publish1(a: A): F[Either[Topic.Closed, Unit]] @@ -164,8 +168,18 @@ object Topic { case State.Closed() => Topic.closed.pure[F] case State.Active(subs, _) => - foreach(subs)(_.send(a).void) - .as(Topic.rightUnit) + subs.toList.foldLeftM(Topic.rightUnit) { + case (Left(Topic.Closed), _) => Topic.closed.pure[F] + case (Right(_), (_, chan)) => + chan.send(a).flatMap { + case Right(_) => Topic.rightUnit.pure[F] + case Left(_) => + state.get.map { + case State.Closed() => Topic.closed + case _ => Topic.rightUnit + } + } + } } def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] = diff --git a/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala b/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala index f2d889b92d..90e688428b 100644 --- a/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala @@ -218,7 +218,7 @@ class TopicSuite extends Fs2Suite { // https://github.com/typelevel/fs2/issues/3644 test( - "when publish1 returns success, subscribers must receive the event, even if the publish1 races with close".fail + "when publish1 returns success, subscribers must receive the event, even if the publish1 races with close" ) { val check: IO[Unit] = Topic[IO, String] @@ -235,16 +235,20 @@ class TopicSuite extends Fs2Suite { sub.compile.toList // all subscriptions must terminate, since the Topic was closed ) .map { eventss => - val expected: List[String] = - published match { - case Right(()) => - // publication succeeded, expecting singleton list with the event - List("foo") - case Left(Topic.Closed) => - // publication rejected, expecting empty list - Nil - } - eventss.foreach(events => assertEquals(events, expected)) + published match { + case Right(()) => + // publication succeeded, expecting singleton list with the event + val expected = List("foo") + eventss.foreach(events => assertEquals(events, expected)) + case Left(Topic.Closed) => + // publication rejected due to closure, some subscribers might have received it + eventss.foreach { events => + assert( + events == Nil || events == List("foo"), + s"Unexpected events: $events" + ) + } + } } } }