Skip to content

Commit 4c92e11

Browse files
authored
cats-effect 3.1.x for Rerunnable -> Sync + Clock + MonadCancel (#267)
* Add module 'catbird-effect3' for cats-effect 3.x * Exclude effect3 module from unidoc * Remove redundant tests * Update to cats-effect 3.1.1
1 parent a49d9d9 commit 4c92e11

File tree

8 files changed

+351
-2
lines changed

8 files changed

+351
-2
lines changed

build.sbt

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
val catsVersion = "2.6.1"
2+
3+
// For the transition period, we publish artifacts for both cats-effect 2.x and 3.x
24
val catsEffectVersion = "2.5.1"
5+
val catsEffect3Version = "3.1.1"
6+
37
val utilVersion = "21.5.0"
48
val finagleVersion = "21.5.0"
59

@@ -64,7 +68,7 @@ lazy val root = project
6468
.enablePlugins(GhpagesPlugin, ScalaUnidocPlugin)
6569
.settings(allSettings ++ noPublishSettings)
6670
.settings(
67-
(ScalaUnidoc / unidoc / unidocProjectFilter) := inAnyProject -- inProjects(benchmark),
71+
(ScalaUnidoc / unidoc / unidocProjectFilter) := inAnyProject -- inProjects(benchmark, effect3),
6872
addMappingsToSiteDir((ScalaUnidoc / packageDoc / mappings), docMappingsApiDir),
6973
git.remoteRepo := "git@github.com:travisbrown/catbird.git"
7074
)
@@ -77,7 +81,7 @@ lazy val root = project
7781
|import io.catbird.util._
7882
""".stripMargin
7983
)
80-
.aggregate(util, effect, finagle, benchmark)
84+
.aggregate(util, effect, effect3, finagle, benchmark)
8185
.dependsOn(util, effect, finagle)
8286

8387
lazy val util = project
@@ -104,6 +108,22 @@ lazy val effect = project
104108
)
105109
.dependsOn(util, util % "test->test")
106110

111+
lazy val effect3 = project
112+
.in(file("effect3"))
113+
.settings(moduleName := "catbird-effect3")
114+
.settings(allSettings)
115+
.settings(
116+
libraryDependencies ++= Seq(
117+
"org.typelevel" %% "cats-effect" % catsEffect3Version,
118+
"org.typelevel" %% "cats-effect-laws" % catsEffect3Version % Test,
119+
"org.typelevel" %% "cats-effect-testkit" % catsEffect3Version % Test
120+
),
121+
(Test / scalacOptions) ~= {
122+
_.filterNot(Set("-Yno-imports", "-Yno-predef"))
123+
}
124+
)
125+
.dependsOn(util, util % "test->test")
126+
107127
lazy val finagle = project
108128
.settings(moduleName := "catbird-finagle")
109129
.settings(allSettings)
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.catbird.util.effect
2+
3+
import cats.effect.kernel.{ MonadCancel, Outcome }
4+
import com.twitter.util.{ Future, Monitor }
5+
import io.catbird.util.FutureMonadError
6+
7+
import java.lang.Throwable
8+
9+
import scala.Unit
10+
11+
trait FutureInstances {
12+
implicit final val futureMonadCancelInstance
13+
: MonadCancel[Future, Throwable] with MonadCancel.Uncancelable[Future, Throwable] =
14+
new FutureMonadError with MonadCancel[Future, Throwable] with MonadCancel.Uncancelable[Future, Throwable] {
15+
16+
final override def forceR[A, B](fa: Future[A])(fb: Future[B]): Future[B] =
17+
fa.liftToTry.flatMap { resultA =>
18+
resultA.handle(Monitor.catcher)
19+
fb
20+
}
21+
22+
/**
23+
* Special implementation so exceptions in release are cought by the `Monitor`.
24+
*/
25+
final override def bracketCase[A, B](acquire: Future[A])(use: A => Future[B])(
26+
release: (A, Outcome[Future, Throwable, B]) => Future[Unit]
27+
): Future[B] =
28+
acquire
29+
.flatMap(a =>
30+
use(a).liftToTry
31+
.flatMap(result => release(a, tryToFutureOutcome(result)).handle(Monitor.catcher).map(_ => result))
32+
)
33+
.lowerFromTry
34+
}
35+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.catbird.util.effect
2+
3+
import cats.effect.Clock
4+
import cats.effect.kernel.{ MonadCancel, Outcome, Sync }
5+
import com.twitter.util.{ Future, Monitor }
6+
import io.catbird.util.{ Rerunnable, RerunnableMonadError }
7+
8+
import java.lang.Throwable
9+
import java.util.concurrent.TimeUnit
10+
import java.lang.System
11+
12+
import scala.Unit
13+
import scala.concurrent.duration.FiniteDuration
14+
15+
trait RerunnableInstances {
16+
implicit final val rerunnableInstance
17+
: Sync[Rerunnable] with Clock[Rerunnable] with MonadCancel.Uncancelable[Rerunnable, Throwable] =
18+
new RerunnableMonadError
19+
with Sync[Rerunnable]
20+
with Clock[Rerunnable]
21+
with MonadCancel.Uncancelable[Rerunnable, Throwable] {
22+
23+
final override def suspend[A](hint: Sync.Type)(thunk: => A): Rerunnable[A] =
24+
Rerunnable(thunk)
25+
26+
final override def realTime: Rerunnable[FiniteDuration] =
27+
Rerunnable(FiniteDuration(System.currentTimeMillis(), TimeUnit.MILLISECONDS))
28+
29+
final override def monotonic: Rerunnable[FiniteDuration] =
30+
Rerunnable(FiniteDuration(System.nanoTime(), TimeUnit.NANOSECONDS))
31+
32+
final override def forceR[A, B](fa: Rerunnable[A])(fb: Rerunnable[B]): Rerunnable[B] =
33+
fa.liftToTry.flatMap { resultA =>
34+
resultA.handle(Monitor.catcher)
35+
fb
36+
}
37+
38+
/**
39+
* Special implementation so exceptions in release are cought by the `Monitor`.
40+
*/
41+
final override def bracketCase[A, B](acquire: Rerunnable[A])(use: A => Rerunnable[B])(
42+
release: (A, Outcome[Rerunnable, Throwable, B]) => Rerunnable[Unit]
43+
): Rerunnable[B] = new Rerunnable[B] {
44+
final def run: Future[B] =
45+
acquire.run.flatMap { a =>
46+
val future = use(a).run
47+
future.transform(result =>
48+
release(a, tryToRerunnableOutcome(result)).run.handle(Monitor.catcher).flatMap(_ => future)
49+
)
50+
}
51+
}
52+
}
53+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.catbird.util
2+
3+
import cats.effect.{ Async, IO }
4+
import com.twitter.util.{ Future, Return, Throw, Try }
5+
6+
import java.lang.Throwable
7+
8+
import cats.effect.Outcome
9+
import cats.effect.kernel.Resource.ExitCase
10+
11+
import scala.util.{ Left, Right }
12+
13+
package object effect extends FutureInstances with RerunnableInstances {
14+
15+
/**
16+
* Converts the `Future` to `F`.
17+
*/
18+
def futureToAsync[F[_], A](fa: => Future[A])(implicit F: Async[F]): F[A] = F.async_ { k =>
19+
fa.respond {
20+
case Return(a) => k(Right(a))
21+
case Throw(err) => k(Left(err))
22+
}
23+
}
24+
25+
/**
26+
* Converts the `Rerunnable` to `IO`.
27+
*/
28+
final def rerunnableToIO[A](fa: Rerunnable[A]): IO[A] =
29+
futureToAsync[IO, A](fa.run)
30+
31+
/**
32+
* Convert a twitter-util Try to cats-effect ExitCase
33+
*/
34+
final def tryToExitCase[A](ta: Try[A]): ExitCase =
35+
ta match {
36+
case Return(_) => ExitCase.Succeeded
37+
case Throw(e) => ExitCase.Errored(e)
38+
}
39+
40+
/**
41+
* Convert a twitter-util Try to cats-effect Outcome for Rerunnable
42+
*/
43+
final def tryToRerunnableOutcome[A](ta: Try[A]): Outcome[Rerunnable, Throwable, A] =
44+
ta match {
45+
case Return(a) => Outcome.Succeeded(Rerunnable.const(a))
46+
case Throw(e) => Outcome.Errored(e)
47+
}
48+
49+
/**
50+
* Convert a twitter-util Try to cats-effect Outcome for Future
51+
*/
52+
final def tryToFutureOutcome[A](ta: Try[A]): Outcome[Future, Throwable, A] =
53+
ta match {
54+
case Return(a) => Outcome.Succeeded(Future.value(a))
55+
case Throw(e) => Outcome.Errored(e)
56+
}
57+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.catbird.util.effect
2+
3+
import cats.Eq
4+
import cats.effect.laws.MonadCancelTests
5+
import cats.instances.all._
6+
import cats.laws.discipline.MonadErrorTests
7+
import cats.laws.discipline.arbitrary._
8+
import com.twitter.conversions.DurationOps._
9+
import com.twitter.util.Future
10+
import io.catbird.util.{ ArbitraryInstances, EqInstances, futureEqWithFailure }
11+
import org.scalatest.funsuite.AnyFunSuite
12+
import org.scalatest.prop.Configuration
13+
import org.typelevel.discipline.scalatest.FunSuiteDiscipline
14+
15+
class FutureSuite
16+
extends AnyFunSuite
17+
with FunSuiteDiscipline
18+
with Configuration
19+
with ArbitraryInstances
20+
with EqInstances {
21+
22+
implicit def futureEq[A](implicit A: Eq[A]): Eq[Future[A]] =
23+
futureEqWithFailure(1.seconds)
24+
25+
checkAll("Future[Int]", MonadErrorTests[Future, Throwable].monadError[Int, Int, Int])
26+
checkAll("Future[Int]", MonadCancelTests[Future, Throwable].monadCancel[Int, Int, Int])
27+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.catbird.util.effect
2+
3+
import java.time.Instant
4+
import java.util.concurrent.TimeUnit
5+
6+
import cats.effect.Clock
7+
import com.twitter.util.Await
8+
import io.catbird.util.Rerunnable
9+
import org.scalatest.{ Outcome }
10+
import org.scalatest.concurrent.{ Eventually, IntegrationPatience }
11+
import org.scalatest.funsuite.FixtureAnyFunSuite
12+
13+
/**
14+
* We'll use `eventually` and a reasonably big tolerance here to prevent CI from failing if it is a bit slow.
15+
*
16+
* Technically the implementation is just an extremely thin wrapper around `System.currentTimeMillis()`
17+
* and `System.nanoTime()` so as long as the result is the same order of magnitude (and therefore the
18+
* unit-conversion is correct) we should be fine.
19+
*/
20+
class RerunnableClockSuite extends FixtureAnyFunSuite with Eventually with IntegrationPatience {
21+
22+
protected final class FixtureParam {
23+
def now: Instant = Instant.now()
24+
}
25+
26+
test("Retrieval of real time") { f =>
27+
eventually {
28+
val result = Await.result(
29+
Clock[Rerunnable].realTime.map(duration => Instant.ofEpochMilli(duration.toMillis)).run
30+
)
31+
32+
assert(java.time.Duration.between(result, f.now).abs().toMillis < 50)
33+
}
34+
}
35+
36+
test("Retrieval of monotonic time") { f =>
37+
eventually {
38+
val result = Await.result(
39+
Clock[Rerunnable].monotonic.map(duration => duration.toNanos).run
40+
)
41+
42+
val durationBetween = Math.abs(System.nanoTime() - result)
43+
assert(TimeUnit.MILLISECONDS.convert(durationBetween, TimeUnit.NANOSECONDS) < 5)
44+
}
45+
}
46+
47+
override protected def withFixture(test: OneArgTest): Outcome = withFixture(test.toNoArgTest(new FixtureParam))
48+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.catbird.util.effect
2+
3+
import cats.effect.MonadCancel
4+
import cats.effect.kernel.testkit.SyncTypeGenerators
5+
import cats.effect.laws.SyncTests
6+
import cats.instances.either._
7+
import cats.instances.int._
8+
import cats.instances.tuple._
9+
import cats.instances.unit._
10+
import cats.laws.discipline.arbitrary._
11+
import com.twitter.util.{ Await, Monitor, Throw }
12+
import io.catbird.util.{ ArbitraryInstances, EqInstances, Rerunnable }
13+
import org.scalatest.funsuite.AnyFunSuite
14+
import org.scalatest.prop.Configuration
15+
import org.typelevel.discipline.scalatest.FunSuiteDiscipline
16+
17+
class RerunnableSuite
18+
extends AnyFunSuite
19+
with FunSuiteDiscipline
20+
with Configuration
21+
with ArbitraryInstances
22+
with SyncTypeGenerators
23+
with EqInstances
24+
with Runners {
25+
26+
// This includes tests for Clock, MonadCancel, and MonadError
27+
checkAll("Rerunnable[Int]", SyncTests[Rerunnable].sync[Int, Int, Int])
28+
29+
test("Exceptions thrown by release are handled by Monitor") {
30+
val useException = new Exception("thrown by use")
31+
val releaseException = new Exception("thrown by release")
32+
33+
var monitoredException: Throwable = null
34+
val monitor = Monitor.mk { case e => monitoredException = e; true; }
35+
36+
val rerunnable = MonadCancel[Rerunnable, Throwable]
37+
.bracket(Rerunnable.Unit)(_ => Rerunnable.raiseError(useException))(_ => Rerunnable.raiseError(releaseException))
38+
.liftToTry
39+
40+
val result = Await.result(Monitor.using(monitor)(rerunnable.run))
41+
42+
assert(result == Throw(useException))
43+
assert(monitoredException == releaseException)
44+
}
45+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package io.catbird.util.effect
2+
3+
import cats.Eq
4+
import cats.effect.{ IO, Outcome, unsafe }
5+
import cats.effect.testkit.TestContext
6+
import cats.effect.unsafe.IORuntimeConfig
7+
import io.catbird.util.{ EqInstances, Rerunnable }
8+
import org.scalacheck.Prop
9+
10+
import scala.annotation.implicitNotFound
11+
import scala.concurrent.duration.FiniteDuration
12+
import scala.concurrent.duration._
13+
import scala.language.implicitConversions
14+
15+
/**
16+
* Test helpers mostly taken from the cats-effect IOSpec.
17+
*/
18+
trait Runners { self: EqInstances =>
19+
20+
implicit val ticker: Ticker = Ticker(TestContext())
21+
22+
implicit def eqIOA[A: Eq](implicit ticker: Ticker): Eq[IO[A]] =
23+
Eq.by(unsafeRun(_))
24+
25+
implicit def rerunnableEq[A](implicit A: Eq[A]): Eq[Rerunnable[A]] =
26+
Eq.by[Rerunnable[A], IO[A]](rerunnableToIO)
27+
28+
implicit def boolRunnings(rerunnableB: Rerunnable[Boolean])(implicit ticker: Ticker): Prop =
29+
Prop(unsafeRun(rerunnableToIO(rerunnableB)).fold(false, _ => false, _.getOrElse(false)))
30+
31+
def unsafeRun[A](ioa: IO[A])(implicit ticker: Ticker): Outcome[Option, Throwable, A] =
32+
try {
33+
var results: Outcome[Option, Throwable, A] = Outcome.Succeeded(None)
34+
35+
ioa.unsafeRunAsync {
36+
case Left(t) => results = Outcome.Errored(t)
37+
case Right(a) => results = Outcome.Succeeded(Some(a))
38+
}(unsafe.IORuntime(ticker.ctx, ticker.ctx, scheduler, () => (), IORuntimeConfig()))
39+
40+
ticker.ctx.tickAll(1.days)
41+
42+
results
43+
} catch {
44+
case t: Throwable =>
45+
t.printStackTrace()
46+
throw t
47+
}
48+
49+
def scheduler(implicit ticker: Ticker): unsafe.Scheduler =
50+
new unsafe.Scheduler {
51+
import ticker.ctx
52+
53+
def sleep(delay: FiniteDuration, action: Runnable): Runnable = {
54+
val cancel = ctx.schedule(delay, action)
55+
new Runnable { def run() = cancel() }
56+
}
57+
58+
def nowMillis() = ctx.now().toMillis
59+
def monotonicNanos() = ctx.now().toNanos
60+
}
61+
62+
@implicitNotFound("could not find an instance of Ticker; try using `in ticked { implicit ticker =>`")
63+
case class Ticker(ctx: TestContext)
64+
}

0 commit comments

Comments
 (0)