Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions core/shared/src/main/scala/fs2/concurrent/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ abstract class Topic[F[_], A] { self =>
* Note: if `publish1` is called concurrently by multiple producers,
* different subscribers may receive messages from different producers
* in a different order.
*
* Note: if `publish1` returns `Left(Topic.Closed)`, it is possible
* that some subscribers received the event while others did not due
* to concurrent closure.
*/
def publish1(a: A): F[Either[Topic.Closed, Unit]]

Expand Down Expand Up @@ -164,8 +168,18 @@ object Topic {
case State.Closed() =>
Topic.closed.pure[F]
case State.Active(subs, _) =>
foreach(subs)(_.send(a).void)
.as(Topic.rightUnit)
subs.toList.foldLeftM(Topic.rightUnit) {
case (Left(Topic.Closed), _) => Topic.closed.pure[F]
case (Right(_), (_, chan)) =>
chan.send(a).flatMap {
case Right(_) => Topic.rightUnit.pure[F]
case Left(_) =>
state.get.map {
case State.Closed() => Topic.closed
case _ => Topic.rightUnit
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor thing: I think constructing the effect in a right-associative way will lead to slightly faster execution for an IO-like F.

}

def subscribeAwait(maxQueued: Int): Resource[F, Stream[F, A]] =
Expand Down
26 changes: 15 additions & 11 deletions core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ class TopicSuite extends Fs2Suite {

// https://github.com/typelevel/fs2/issues/3644
test(
"when publish1 returns success, subscribers must receive the event, even if the publish1 races with close".fail
"when publish1 returns success, subscribers must receive the event, even if the publish1 races with close"
) {
val check: IO[Unit] =
Topic[IO, String]
Expand All @@ -235,16 +235,20 @@ class TopicSuite extends Fs2Suite {
sub.compile.toList // all subscriptions must terminate, since the Topic was closed
)
.map { eventss =>
val expected: List[String] =
published match {
case Right(()) =>
// publication succeeded, expecting singleton list with the event
List("foo")
case Left(Topic.Closed) =>
// publication rejected, expecting empty list
Nil
}
eventss.foreach(events => assertEquals(events, expected))
published match {
case Right(()) =>
// publication succeeded, expecting singleton list with the event
val expected = List("foo")
eventss.foreach(events => assertEquals(events, expected))
case Left(Topic.Closed) =>
// publication rejected due to closure, some subscribers might have received it
eventss.foreach { events =>
assert(
events == Nil || events == List("foo"),
s"Unexpected events: $events"
)
}
}
}
}
}
Expand Down
Loading