From 564fdb17779581006466bd415ab4908b6502cbf8 Mon Sep 17 00:00:00 2001 From: Renee Vandervelde Date: Sat, 28 Dec 2024 14:24:11 -0600 Subject: [PATCH] Add `whenTrue` collector extension to flows --- CHANGELOG.md | 1 + coroutines/api/coroutines.api | 2 + .../com/inkapplications/coroutines/Flows.kt | 16 ++++++ .../coroutines/ongoing/Collectors.kt | 11 ++++ .../inkapplications/coroutines/FlowsTest.kt | 55 +++++++++++-------- 5 files changed, 63 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f9d4198..3d7d5cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ Change Log - Coroutines - `runRetryable` method for retrying an operation with optional delay strategies such as exponential backoff. + - `whenTrue` extension for collecting a flow only when a value is true. ### Changed diff --git a/coroutines/api/coroutines.api b/coroutines/api/coroutines.api index 059ccae..81b42f5 100644 --- a/coroutines/api/coroutines.api +++ b/coroutines/api/coroutines.api @@ -17,6 +17,7 @@ public final class com/inkapplications/coroutines/FlowsKt { public static final fun mapItemsCatching (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun onItemFailure (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun safeCollect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun whenTrue (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class com/inkapplications/coroutines/RetriesKt { @@ -79,6 +80,7 @@ public final class com/inkapplications/coroutines/RetryStrategy$Static : com/ink public final class com/inkapplications/coroutines/ongoing/CollectorsKt { public static final fun collect (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collectLatest (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun whenTrue (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public abstract interface class com/inkapplications/coroutines/ongoing/OngoingFlow { diff --git a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/Flows.kt b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/Flows.kt index 7e4a967..0944145 100644 --- a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/Flows.kt +++ b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/Flows.kt @@ -38,6 +38,22 @@ fun Flow.collectOn(scope: CoroutineScope, action: suspend (T) -> Unit): J } } +/** + * Collector that is invoked whenever the emitted value is true. + * + * This is run with a [collectLatest] operation, ensuring that if the + * value becomes false, the collector will be canceled. + * The value is also filtered with a [distinctUntilChanged] operation + * to avoid multiple invocations of the collector. + */ +suspend fun Flow.whenTrue( + collector: suspend () -> Unit, +) { + distinctUntilChanged().collectLatest { + if (it) collector() + } +} + /** * Map each item in the emitted lists for the flow. */ diff --git a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Collectors.kt b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Collectors.kt index d7434fe..b276467 100644 --- a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Collectors.kt +++ b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Collectors.kt @@ -1,5 +1,6 @@ package com.inkapplications.coroutines.ongoing +import com.inkapplications.coroutines.whenTrue import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collectLatest @@ -27,3 +28,13 @@ suspend inline fun OngoingFlow.collectLatest( asFlow().collectLatest { observer(it) } throw UnexpectedEndOfFlow() } + +/** + * @see whenTrue + */ +suspend inline fun OngoingFlow.whenTrue( + crossinline observer: suspend () -> Unit +): Nothing { + asFlow().whenTrue { observer() } + throw UnexpectedEndOfFlow() +} diff --git a/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt index 03dc36a..4cae1d9 100644 --- a/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt +++ b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt @@ -3,9 +3,8 @@ package com.inkapplications.coroutines import com.inkapplications.coroutines.doubles.Animal import com.inkapplications.coroutines.doubles.Plants import kotlinx.coroutines.* -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest import kotlin.test.Test import kotlin.test.assertEquals @@ -23,25 +22,37 @@ class FlowsTest { } -// @Test(expected = CancellationException::class) -// fun safeCollectCancels() { -// runBlocking { -// (1..5).asFlow().safeCollect { value -> -// if (value > 3) throw AssertionError("Flow should not be collected") -// if (value == 3) cancel() -// } -// } -// } - -// @Test -// fun collectOnTest() = runTest { -// val fixedThread = Thread() -// val scope = Executors.newSingleThreadExecutor { fixedThread }.asCoroutineDispatcher().let(::CoroutineScope) -// -// flowOf(1).collectOn(scope) { -// assertEquals(fixedThread, Thread.currentThread(), "Run on correct scope") -// } -// } + @Test + fun whenTrueTest() + { + runTest { + val flow = MutableSharedFlow() + var collected = 0 + + val job = launch { + flow.whenTrue { + ++collected + } + } + + runCurrent() + assertEquals(0, collected, "Does not run before emissions") + + flow.emit(false) + runCurrent() + assertEquals(0, collected, "Does not run when false") + + flow.emit(true) + runCurrent() + assertEquals(1, collected, "Runs when true") + + flow.emit(true) + runCurrent() + assertEquals(1, collected, "Does not re-run for duplicate event") + + job.cancelAndJoin() + } + } @Test fun mapEachTest() = runTest {