Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Change Log
- Coroutines
- `runRetryable` method for retrying an operation with optional delay
strategies such as exponential backoff.
- `whenTrue` extension for collecting a flow only when a value is true.

### Changed

Expand Down
2 changes: 2 additions & 0 deletions coroutines/api/coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public final class com/inkapplications/coroutines/FlowsKt {
public static final fun mapItemsCatching (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun onItemFailure (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun safeCollect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun whenTrue (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class com/inkapplications/coroutines/RetriesKt {
Expand Down Expand Up @@ -79,6 +80,7 @@ public final class com/inkapplications/coroutines/RetryStrategy$Static : com/ink
public final class com/inkapplications/coroutines/ongoing/CollectorsKt {
public static final fun collect (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectLatest (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun whenTrue (Lcom/inkapplications/coroutines/ongoing/OngoingFlow;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class com/inkapplications/coroutines/ongoing/OngoingFlow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ fun <T> Flow<T>.collectOn(scope: CoroutineScope, action: suspend (T) -> Unit): J
}
}

/**
* Collector that is invoked whenever the emitted value is true.
*
* This is run with a [collectLatest] operation, ensuring that if the
* value becomes false, the collector will be canceled.
* The value is also filtered with a [distinctUntilChanged] operation
* to avoid multiple invocations of the collector.
*/
suspend fun Flow<Boolean>.whenTrue(
collector: suspend () -> Unit,
) {
distinctUntilChanged().collectLatest {
if (it) collector()
}
}

/**
* Map each item in the emitted lists for the flow.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.inkapplications.coroutines.ongoing

import com.inkapplications.coroutines.whenTrue
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest

Expand Down Expand Up @@ -27,3 +28,13 @@ suspend inline fun <T> OngoingFlow<T>.collectLatest(
asFlow().collectLatest { observer(it) }
throw UnexpectedEndOfFlow()
}

/**
* @see whenTrue
*/
suspend inline fun OngoingFlow<Boolean>.whenTrue(
crossinline observer: suspend () -> Unit
): Nothing {
asFlow().whenTrue { observer() }
throw UnexpectedEndOfFlow()
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package com.inkapplications.coroutines
import com.inkapplications.coroutines.doubles.Animal
import com.inkapplications.coroutines.doubles.Plants
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
Expand All @@ -23,25 +22,37 @@ class FlowsTest {

}

// @Test(expected = CancellationException::class)
// fun safeCollectCancels() {
// runBlocking {
// (1..5).asFlow().safeCollect { value ->
// if (value > 3) throw AssertionError("Flow should not be collected")
// if (value == 3) cancel()
// }
// }
// }

// @Test
// fun collectOnTest() = runTest {
// val fixedThread = Thread()
// val scope = Executors.newSingleThreadExecutor { fixedThread }.asCoroutineDispatcher().let(::CoroutineScope)
//
// flowOf(1).collectOn(scope) {
// assertEquals(fixedThread, Thread.currentThread(), "Run on correct scope")
// }
// }
@Test
fun whenTrueTest()
{
runTest {
val flow = MutableSharedFlow<Boolean>()
var collected = 0

val job = launch {
flow.whenTrue {
++collected
}
}

runCurrent()
assertEquals(0, collected, "Does not run before emissions")

flow.emit(false)
runCurrent()
assertEquals(0, collected, "Does not run when false")

flow.emit(true)
runCurrent()
assertEquals(1, collected, "Runs when true")

flow.emit(true)
runCurrent()
assertEquals(1, collected, "Does not re-run for duplicate event")

job.cancelAndJoin()
}
}

@Test
fun mapEachTest() = runTest {
Expand Down
Loading