diff --git a/CHANGELOG.md b/CHANGELOG.md index aaaf809..f9d4198 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ Change Log ========== +1.8.0 +----- + +### Added + + - Coroutines + - `OngoingFlow` construct for flows that do not end, along with applicable + transformation operators. + 1.7.0 ----- diff --git a/coroutines/api/coroutines.api b/coroutines/api/coroutines.api index 113744f..059ccae 100644 --- a/coroutines/api/coroutines.api +++ b/coroutines/api/coroutines.api @@ -76,3 +76,59 @@ public final class com/inkapplications/coroutines/RetryStrategy$Static : com/ink public fun toString ()Ljava/lang/String; } +public final class com/inkapplications/coroutines/ongoing/CollectorsKt { + public static final fun collect (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun collectLatest (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract interface class com/inkapplications/coroutines/ongoing/OngoingFlow { + public abstract fun asFlow ()Lkotlinx/coroutines/flow/Flow; +} + +public final class com/inkapplications/coroutines/ongoing/TransformationsKt { + public static final fun collectOn (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function2;)Ljava/lang/Void; + public static final fun combine (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function5;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun combine (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function4;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun combine (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function3;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun combine (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun combineApply (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun combineFlatten (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun combinePair (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun combineTriple (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun combineWith (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun distinctUntilChanged (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun drop (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;I)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun dropWhile (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun filter (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun filterItemFailure (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun filterItemNotNull (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun filterItemSuccess (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun filterItems (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun filterNotNull (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun first (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun flatMapConcat (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun flatMapLatest (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun map (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun mapItems (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun mapItemsCatching (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun mapLatest (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun onEach (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun onItemFailure (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun safeCollect (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun startWith (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Ljava/lang/Object;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun take (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;I)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun unsafeTransform (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; +} + +public final class com/inkapplications/coroutines/ongoing/UnexpectedEndOfFlow : java/lang/IllegalStateException { + public fun ()V + public fun (Ljava/lang/String;)V + public synthetic fun (Ljava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V +} + +public final class com/inkapplications/coroutines/ongoing/WrappedOngoingFlowKt { + public static final fun asOngoing (Lkotlinx/coroutines/flow/Flow;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun ongoingFlow (Lkotlin/jvm/functions/Function2;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; + public static final fun ongoingFlowOf ([Ljava/lang/Object;)Lcom/inkapplications/coroutines/ongoing/OngoingFlow; +} + diff --git a/coroutines/build.gradle.kts b/coroutines/build.gradle.kts index 759581d..97ceff7 100644 --- a/coroutines/build.gradle.kts +++ b/coroutines/build.gradle.kts @@ -12,7 +12,7 @@ kotlin { } } - val jvmTest by getting { + val commonTest by getting { dependencies { implementation(kotlin("test")) implementation(libs.kotlinx.coroutines.test) diff --git a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Collectors.kt b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Collectors.kt new file mode 100644 index 0000000..d7434fe --- /dev/null +++ b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Collectors.kt @@ -0,0 +1,29 @@ +package com.inkapplications.coroutines.ongoing + +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.collectLatest + + +/** + * Collects a flow that never returns. + * + * This is identical to [collect], but throws an exception if the flow ends. + */ +suspend inline fun OngoingFlow.collect( + crossinline observer: suspend (T) -> Unit +): Nothing { + asFlow().collect { observer(it) } + throw UnexpectedEndOfFlow() +} + +/** + * Collects a flow that never returns. + * + * This is identical to a Flow's [collectLatest], but throws an exception if the flow ends. + */ +suspend inline fun OngoingFlow.collectLatest( + crossinline observer: suspend (T) -> Unit +): Nothing { + asFlow().collectLatest { observer(it) } + throw UnexpectedEndOfFlow() +} diff --git a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/OngoingFlow.kt b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/OngoingFlow.kt new file mode 100644 index 0000000..ab2e614 --- /dev/null +++ b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/OngoingFlow.kt @@ -0,0 +1,17 @@ +package com.inkapplications.coroutines.ongoing + +import kotlinx.coroutines.flow.Flow + +/** + * An Ongoing flow is like a flow, but is not expected to end. + * + * This wraps a flow, discouraging accidental use of unsafe extensions + * for this type of data stream. + */ +interface OngoingFlow +{ + /** + * Convert thee ongoing flow to a standard flow. + */ + fun asFlow(): Flow +} diff --git a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Transformations.kt b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Transformations.kt new file mode 100644 index 0000000..6aee30d --- /dev/null +++ b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/Transformations.kt @@ -0,0 +1,359 @@ +package com.inkapplications.coroutines.ongoing + +import com.inkapplications.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.* + +/** + * Modify an ongoing flow temporarily as a standard flow. + * + * This allows standard flow operators to be applied to the OngoingFlow + * + * ongoingFlow.modify { + * filter { it != "foo" }.distinct() + * } + */ +inline fun OngoingFlow.unsafeTransform( + modifier: Flow.() -> Flow, +): OngoingFlow { + return asFlow().let(modifier).asOngoing() +} + +/** + * @see Flow.first + */ +suspend fun OngoingFlow.first(): T +{ + return asFlow().first() +} + +/** + * @see Flow.take + */ +fun OngoingFlow.take(count: Int): OngoingFlow +{ + return unsafeTransform { take(count) } +} + +/** + * @see Flow.drop + */ +fun OngoingFlow.drop(count: Int): OngoingFlow +{ + return unsafeTransform { drop(count) } +} + +/** + * @see Flow.dropWhile + */ +fun OngoingFlow.dropWhile( + predicate: (T) -> Boolean, +): OngoingFlow { + return unsafeTransform { dropWhile(predicate) } +} + +/** + * @see Flow.filterIsInstance + */ +inline fun OngoingFlow<*>.filterIsInstance(): OngoingFlow +{ + return unsafeTransform { filterIsInstance() } +} + +/** + * @see Flow.filter + */ +inline fun OngoingFlow.filter( + crossinline predicate: (T) -> Boolean, +): OngoingFlow { + return unsafeTransform { filter { predicate(it) } } +} + +/** + * @see Flow.map + */ +inline fun OngoingFlow.map( + crossinline mapper: suspend (T) -> R, +): OngoingFlow { + return unsafeTransform { map { mapper(it) } } +} + +/** + * @see Flow.mapLatest + */ +@OptIn(ExperimentalCoroutinesApi::class) +inline fun OngoingFlow.mapLatest( + crossinline mapper: suspend (T) -> R, +): OngoingFlow { + return unsafeTransform { mapLatest { mapper(it) } } +} + +/** + * @see Flow.flatMapLatest + */ +@OptIn(ExperimentalCoroutinesApi::class) +inline fun OngoingFlow.flatMapLatest( + crossinline mapper: suspend (T) -> Flow, +): OngoingFlow { + return unsafeTransform { flatMapLatest { mapper(it) } } +} + +/** + * @see Flow.flatMapConcat + */ +@OptIn(ExperimentalCoroutinesApi::class) +inline fun OngoingFlow.flatMapConcat( + crossinline mapper: (T) -> Flow, +): OngoingFlow { + return unsafeTransform { flatMapConcat { mapper(it) } } +} + +/** + * @see Flow.onEach + */ +inline fun OngoingFlow.onEach( + crossinline action: (T) -> Unit, +): OngoingFlow { + return unsafeTransform { onEach { action(it) } } +} + +/** + * @see Flow.combine + */ +fun OngoingFlow.combineWith( + other: OngoingFlow, + transform: (a: T1, b: T2) -> R, +): OngoingFlow { + return unsafeTransform { + this.combine(other.asFlow(), transform) + } +} + +/** + * @see Flow.filterNotNull + */ +fun OngoingFlow.filterNotNull(): OngoingFlow +{ + return unsafeTransform { filterNotNull() } +} + +/** + * @see Flow.distinctUntilChanged + */ +fun OngoingFlow.distinctUntilChanged(): OngoingFlow +{ + return unsafeTransform { distinctUntilChanged() } +} + +/** + * Emits an item at the start of a flow. + */ +fun OngoingFlow.startWith(item: T): OngoingFlow +{ + return unsafeTransform { onStart { emit(item) } } +} + +/** + * @see Flow.combinePair + */ +fun OngoingFlow.combinePair( + other: OngoingFlow, +): OngoingFlow> { + return unsafeTransform { + this.combinePair(other.asFlow()) + } +} + +/** + * @see Flow.combineTriple + */ +fun OngoingFlow>.combineTriple( + other: OngoingFlow, +): OngoingFlow> { + return unsafeTransform { + this.combineTriple(other.asFlow()) + } +} + +/** + * @see Flow.combineFlatten + */ +fun OngoingFlow>.combineFlatten( + other: OngoingFlow>, +): OngoingFlow> { + return unsafeTransform { + this.combineFlatten(other.asFlow()) + } +} + +/** + * @see Flow.safeCollect + */ +suspend fun OngoingFlow.safeCollect( + observer: suspend (T) -> Unit, +) { + asFlow().safeCollect(observer) + throw UnexpectedEndOfFlow() +} + +/** + * @see Flow.collectOn + */ +fun OngoingFlow.collectOn( + scope: CoroutineScope, + observer: suspend (T) -> Unit, +): Nothing { + asFlow().collectOn(scope, observer) + throw UnexpectedEndOfFlow() +} + +/** + * @see Flow.mapItems + */ +inline fun OngoingFlow>.mapItems( + crossinline mapping: suspend (T) -> R, +): OngoingFlow> { + return unsafeTransform { this.mapItems(mapping) } +} + +/** + * @see Flow.filterItems + */ +inline fun OngoingFlow>.filterItems( + crossinline predicate: suspend (T) -> Boolean, +): OngoingFlow> { + return unsafeTransform { this.filterItems(predicate) } +} + +/** + * @see Flow.filterItemIsInstance + */ +inline fun OngoingFlow>.filterItemIsInstance(): OngoingFlow> +{ + return unsafeTransform { this.filterItemIsInstance() } +} + +/** + * @see Flow.filterItemNotNull + */ +fun OngoingFlow>.filterItemNotNull(): OngoingFlow> +{ + return unsafeTransform { this.filterItemNotNull() } +} + +/** + * @see Flow.mapItemsCatching + */ +inline fun OngoingFlow>.mapItemsCatching( + crossinline mapping: (T) -> R, +): OngoingFlow>> { + return unsafeTransform { this.mapItemsCatching(mapping) } +} + +/** + * @see Flow.onItemFailure + */ +inline fun OngoingFlow>>.onItemFailure( + crossinline action: (Throwable) -> Unit, +): OngoingFlow>> { + return unsafeTransform { this.onItemFailure(action) } +} + +/** + * @see Flow.filterItemSuccess + */ +fun OngoingFlow>>.filterItemSuccess(): OngoingFlow> +{ + return unsafeTransform { this.filterItemSuccess() } +} + +/** + * @see Flow.filterItemFailure + */ +fun OngoingFlow>>.filterItemFailure(): OngoingFlow> +{ + return unsafeTransform { this.filterItemFailure() } +} + +/** + * @see Flow.combineApply + */ +inline fun OngoingFlow.combineApply( + other: Flow, + crossinline applicator: STATE.(ITEM) -> Unit, +): OngoingFlow { + return unsafeTransform { this.combineApply(other, applicator) } +} + +/** + * Analogue to [Flow.combine] for ongoing flows. + */ +fun combine( + flow1: OngoingFlow, + flow2: OngoingFlow, + transform: (a: T1, b: T2) -> R, +): OngoingFlow { + return combine( + flow1.asFlow(), + flow2.asFlow(), + transform + ).asOngoing() +} + +/** + * Analogue to [Flow.combine] for ongoing flows. + */ +fun combine( + flow1: OngoingFlow, + flow2: OngoingFlow, + flow3: OngoingFlow, + transform: (a: T1, b: T2, c: T3) -> R, +): OngoingFlow { + return combine( + flow1.asFlow(), + flow2.asFlow(), + flow3.asFlow(), + transform + ).asOngoing() +} + +/** + * Analogue to [Flow.combine] for ongoing flows. + */ +fun combine( + flow1: OngoingFlow, + flow2: OngoingFlow, + flow3: OngoingFlow, + flow4: OngoingFlow, + transform: (a: T1, b: T2, c: T3, d: T4) -> R, +): OngoingFlow { + return combine( + flow1.asFlow(), + flow2.asFlow(), + flow3.asFlow(), + flow4.asFlow(), + transform + ).asOngoing() +} + +/** + * Analogue to [Flow.combine] for ongoing flows. + */ +fun combine( + flow1: OngoingFlow, + flow2: OngoingFlow, + flow3: OngoingFlow, + flow4: OngoingFlow, + flow5: OngoingFlow, + transform: (a: T1, b: T2, c: T3, d: T4, e: T5) -> R, +): OngoingFlow { + return combine( + flow1.asFlow(), + flow2.asFlow(), + flow3.asFlow(), + flow4.asFlow(), + flow5.asFlow(), + transform + ).asOngoing() +} diff --git a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/UnexpectedEndOfFlow.kt b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/UnexpectedEndOfFlow.kt new file mode 100644 index 0000000..3d4f5ba --- /dev/null +++ b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/UnexpectedEndOfFlow.kt @@ -0,0 +1,10 @@ +package com.inkapplications.coroutines.ongoing + +/** + * Exception thrown when an [OngoingFlow] ends unexpectedly. + */ +class UnexpectedEndOfFlow( + message: String? = null, +): IllegalStateException( + message ?: "Unexpected end of flow" +) diff --git a/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/WrappedOngoingFlow.kt b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/WrappedOngoingFlow.kt new file mode 100644 index 0000000..17cd109 --- /dev/null +++ b/coroutines/src/commonMain/kotlin/com/inkapplications/coroutines/ongoing/WrappedOngoingFlow.kt @@ -0,0 +1,44 @@ +package com.inkapplications.coroutines.ongoing + +import kotlinx.coroutines.awaitCancellation +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.flow +import kotlin.jvm.JvmInline + +/** + * Wrapped implementation of an ongoing flow. + */ +@JvmInline +private value class WrappedOngoingFlow( + private val backing: Flow +): OngoingFlow { + override fun asFlow(): Flow = backing +} + +/** + * Convert an existing flow to an ongoing flow. + */ +fun Flow.asOngoing(): OngoingFlow = WrappedOngoingFlow(this) + +/** + * Create an ongoing flow. + */ +inline fun ongoingFlow( + crossinline builder: suspend FlowCollector.() -> Unit, +): OngoingFlow { + return flow { + builder(this) + awaitCancellation() + }.asOngoing() +} + +/** + * Create an ongoing flow from a set of items. + */ +fun ongoingFlowOf(vararg items: T): OngoingFlow +{ + return ongoingFlow { + items.forEach { emit(it) } + } +} diff --git a/coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt similarity index 83% rename from coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt rename to coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt index e3ee5e7..03dc36a 100644 --- a/coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt +++ b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/FlowsTest.kt @@ -1,16 +1,16 @@ package com.inkapplications.coroutines +import com.inkapplications.coroutines.doubles.Animal +import com.inkapplications.coroutines.doubles.Plants import kotlinx.coroutines.* import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.toList import kotlinx.coroutines.test.runTest -import java.util.concurrent.Executors import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertTrue -@OptIn(ExperimentalCoroutinesApi::class) class FlowsTest { @Test fun safeCollectTest() = runTest { @@ -23,25 +23,25 @@ class FlowsTest { } - @Test(expected = CancellationException::class) - fun safeCollectCancels() { - runBlocking { - (1..5).asFlow().safeCollect { value -> - if (value > 3) throw AssertionError("Flow should not be collected") - if (value == 3) cancel() - } - } - } - - @Test - fun collectOnTest() = runTest { - val fixedThread = Thread() - val scope = Executors.newSingleThreadExecutor { fixedThread }.asCoroutineDispatcher().let(::CoroutineScope) - - flowOf(1).collectOn(scope) { - assertEquals(fixedThread, Thread.currentThread(), "Run on correct scope") - } - } +// @Test(expected = CancellationException::class) +// fun safeCollectCancels() { +// runBlocking { +// (1..5).asFlow().safeCollect { value -> +// if (value > 3) throw AssertionError("Flow should not be collected") +// if (value == 3) cancel() +// } +// } +// } + +// @Test +// fun collectOnTest() = runTest { +// val fixedThread = Thread() +// val scope = Executors.newSingleThreadExecutor { fixedThread }.asCoroutineDispatcher().let(::CoroutineScope) +// +// flowOf(1).collectOn(scope) { +// assertEquals(fixedThread, Thread.currentThread(), "Run on correct scope") +// } +// } @Test fun mapEachTest() = runTest { @@ -74,15 +74,20 @@ class FlowsTest { @Test fun filterEachIsInstanceTest() = runTest { val initial = flowOf( - listOf(1.0, 2f), - listOf(3f, 4.0) + listOf(Animal.Dog, Plants.Flower), + listOf(Animal.Cat, Plants.Tree, Animal.Bird) ) - val result = initial.filterItemIsInstance().toList() + val result = initial.filterItemIsInstance().toList() assertEquals(2, result.size, "Each item has a 1:1 mapping") - assertEquals(listOf(2f), result[0], "Filter is applied to each result") - assertEquals(listOf(3f), result[1], "Filter is applied to each result") + val first = result[0] + assertEquals(1, first.size, "Filter only emits items of the correct type") + assertTrue(first[0] is Animal.Dog, "Filter only emits items of the correct type") + + val second = result[1] + assertEquals(2, second.size, "Filter only emits items of the correct type") + assertTrue(second.all { it is Animal }, "Filter only emits items of the correct type") } @Test diff --git a/coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/RetriesTest.kt b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/RetriesTest.kt similarity index 100% rename from coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/RetriesTest.kt rename to coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/RetriesTest.kt diff --git a/coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/RetryStrategyTest.kt b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/RetryStrategyTest.kt similarity index 100% rename from coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/RetryStrategyTest.kt rename to coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/RetryStrategyTest.kt diff --git a/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/doubles/DummyInstances.kt b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/doubles/DummyInstances.kt new file mode 100644 index 0000000..2bc8753 --- /dev/null +++ b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/doubles/DummyInstances.kt @@ -0,0 +1,17 @@ +package com.inkapplications.coroutines.doubles + +interface Organism + +interface Animal: Organism +{ + object Dog: Animal + object Cat: Animal + object Bird: Animal +} + +interface Plants: Organism +{ + object Tree: Plants + object Flower: Plants + object Grass: Plants +} diff --git a/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/CollectorsTest.kt b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/CollectorsTest.kt new file mode 100644 index 0000000..e0f65c4 --- /dev/null +++ b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/CollectorsTest.kt @@ -0,0 +1,114 @@ +package com.inkapplications.coroutines.ongoing + +import com.inkapplications.coroutines.doubles.Animal +import kotlinx.coroutines.async +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertSame +import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.minutes + +class CollectorsTest +{ + @Test + fun collect() + { + runTest { + val flow = channelFlow { + send(Animal.Bird) + awaitClose {} + } + val ongoing = flow.asOngoing() + val collected = MutableSharedFlow() + + val result = async { collected.first() } + val job = launch { ongoing.collect { collected.emit(it) } } + + assertEquals(Animal.Bird, result.await(), "Collected item is emitted") + job.cancelAndJoin() + } + } + + @Test + fun collectLatest() + { + runTest { + val flow = channelFlow { + send(Animal.Bird) + delay(1.minutes) + send(Animal.Cat) + awaitClose {} + } + val ongoing = flow.asOngoing() + val collected = MutableSharedFlow() + + val result = async { collected.first() } + val job = launch { + ongoing.collectLatest { + delay(2.minutes) + collected.emit(it) + } + } + + advanceUntilIdle() + assertSame(Animal.Cat, result.await(), "Newest item is collected") + job.cancelAndJoin() + } + } + + @Test + fun unexpectedEnd() + { + runTest { + val flow = flow { + emit(Animal.Bird) + } + val ongoing = flow.asOngoing() + + val job = async { + runCatching { + ongoing.collect { + delay(2.minutes) + } + } + } + + advanceUntilIdle() + assertTrue(job.await().isFailure, "Exception thrown if flow ends") + assertTrue(job.await().exceptionOrNull() is UnexpectedEndOfFlow, "Exception is UnexpectedEndOfFlow") + } + } + + @Test + fun latestUnexpectedEnd() + { + runTest { + val flow = flow { + emit(Animal.Bird) + } + val ongoing = flow.asOngoing() + + val job = async { + runCatching { + ongoing.collectLatest { + delay(2.minutes) + } + } + } + + advanceUntilIdle() + assertTrue(job.await().isFailure, "Exception thrown if flow ends") + assertTrue(job.await().exceptionOrNull() is UnexpectedEndOfFlow, "Exception is UnexpectedEndOfFlow") + } + } +} diff --git a/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/OngoingFlowTest.kt b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/OngoingFlowTest.kt new file mode 100644 index 0000000..4e69136 --- /dev/null +++ b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/OngoingFlowTest.kt @@ -0,0 +1,58 @@ +package com.inkapplications.coroutines.ongoing + +import com.inkapplications.coroutines.doubles.Animal +import kotlinx.coroutines.async +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runCurrent +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertSame +import kotlin.test.assertTrue + +class OngoingFlowTest +{ + @Test + fun convertFlow() + { + val flow = channelFlow { send(Animal.Bird) } + val ongoing = flow.asOngoing() + + assertSame(flow, ongoing.asFlow(), "Flow instance is preserved") + } + + @Test + fun ongoingFlowOf() + { + runTest { + val ongoing = ongoingFlowOf(Animal.Bird, Animal.Cat) + val collected = mutableListOf() + + val job = launch { ongoing.collect { collected += it } } + + runCurrent() + assertSame(Animal.Bird, collected[0], "First item is emitted") + assertSame(Animal.Cat, collected[1], "Second item is emitted") + assertTrue(job.isActive, "Flow does not close") + job.cancelAndJoin() + } + } + + @Test + fun ongoingFlowBuilder() + { + runTest { + val ongoing = ongoingFlow { emit(Animal.Bird); emit(Animal.Cat) } + val collected = mutableListOf() + + val job = launch { ongoing.collect { collected += it } } + + runCurrent() + assertSame(Animal.Bird, collected[0], "First item is emitted") + assertSame(Animal.Cat, collected[1], "Second item is emitted") + assertTrue(job.isActive, "Flow does not close") + job.cancelAndJoin() + } + } +} diff --git a/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/TransformationsTest.kt b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/TransformationsTest.kt new file mode 100644 index 0000000..566dc33 --- /dev/null +++ b/coroutines/src/commonTest/kotlin/com/inkapplications/coroutines/ongoing/TransformationsTest.kt @@ -0,0 +1,28 @@ +package com.inkapplications.coroutines.ongoing + +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runCurrent +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals + +class TransformationsTest +{ + @Test + fun unsafeTransform() + { + runTest { + val original = ongoingFlowOf(1, 2, 3) + val transformed = original.unsafeTransform { map { it * 2 } } + val collected = mutableListOf() + + val result = launch { transformed.collect { collected += it } } + + runCurrent() + assertEquals(listOf(2, 4, 6), collected, "Transformation is applied to each item") + result.cancelAndJoin() + } + } +} diff --git a/coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/JvmFlowsTest.kt b/coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/JvmFlowsTest.kt new file mode 100644 index 0000000..a559046 --- /dev/null +++ b/coroutines/src/jvmTest/kotlin/com/inkapplications/coroutines/JvmFlowsTest.kt @@ -0,0 +1,40 @@ +package com.inkapplications.coroutines + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import java.util.concurrent.Executors +import kotlin.coroutines.cancellation.CancellationException +import kotlin.test.Test +import kotlin.test.assertEquals + +class JvmFlowsTest +{ + @Test(expected = CancellationException::class) + fun safeCollectCancels() + { + runBlocking { + (1..5).asFlow().safeCollect { value -> + if (value > 3) throw AssertionError("Flow should not be collected") + if (value == 3) cancel() + } + } + } + + @Test + fun collectOnTest() + { + runTest { + val fixedThread = Thread() + val scope = Executors.newSingleThreadExecutor { fixedThread }.asCoroutineDispatcher().let(::CoroutineScope) + + flowOf(1).collectOn(scope) { + assertEquals(fixedThread, Thread.currentThread(), "Run on correct scope") + } + } + } +}