diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 1fbd6c9..745e49d 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -38,21 +38,3 @@ jobs: path: | **/build/reports/ **/build/test-results/ - -# test-sbt: -# runs-on: ubuntu-latest -# steps: -# - uses: actions/checkout@v4 -# - name: Set up JDK 21 -# uses: actions/setup-java@v4 -# with: -# cache: 'sbt' -# java-version: 21 -# distribution: 'temurin' -# - name: Setup Gradle -# uses: gradle/actions/setup-gradle@v3 -# - name: Setup sbt -# uses: sbt/setup-sbt@v1 -# - name: Build and test -# run: sbt -v publishLocalGradleDependencies ++test -# working-directory: ./tasks-scala diff --git a/.github/workflows/publish-release.yaml b/.github/workflows/publish-release.yaml index 95620da..f31efa4 100644 --- a/.github/workflows/publish-release.yaml +++ b/.github/workflows/publish-release.yaml @@ -2,6 +2,9 @@ name: Publish Release on: workflow_dispatch +permissions: + contents: write + jobs: publish-release: runs-on: ubuntu-latest @@ -12,6 +15,21 @@ jobs: with: java-version: 17 distribution: 'temurin' + - name: Generate API docs + run: ./gradlew -PbuildRelease=true dokkaGeneratePublicationHtml --no-daemon + - name: Prepare GitHub Pages content + run: | + VERSION=$(grep -E '^project\.version=' gradle.properties | cut -d= -f2 | tr -d '[:space:]') + mkdir -p "site/api/$VERSION" + cp -R build/dokka/. "site/api/$VERSION/" + ln -sfn "$VERSION" "site/api/current" + - name: Publish API docs to GitHub Pages + uses: peaceiris/actions-gh-pages@v4 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_branch: gh-pages + publish_dir: site + keep_files: true - name: Publish Snapshot run: ./gradlew -PbuildRelease=true build publish --no-daemon env: diff --git a/.gitignore b/.gitignore index 6b6a7b6..541cfbb 100644 --- a/.gitignore +++ b/.gitignore @@ -39,8 +39,8 @@ bin/ .DS_Store ### Kotlin 2? -/.kotlin -/kotlin-js-store/ +.kotlin +kotlin-js-store/ # sbt specific dist/* diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..862bfdc --- /dev/null +++ b/LICENSE @@ -0,0 +1,190 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + Copyright 2024 Alexandru Nedelcu + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/build.gradle.kts b/build.gradle.kts index 2d4f0db..8f03191 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -3,6 +3,7 @@ import com.github.benmanes.gradle.versions.updates.DependencyUpdatesTask val projectVersion = property("project.version").toString() plugins { + id("org.jetbrains.dokka") id("com.github.ben-manes.versions") } @@ -10,6 +11,43 @@ repositories { mavenCentral() } +buildscript { + dependencies { + classpath("org.jetbrains.dokka:dokka-base:2.1.0") + // classpath("org.jetbrains.dokka:kotlin-as-java-plugin:2.0.0") + } +} + +//dokka { +// dokkaPublications.html { +// outputDirectory.set(rootDir.resolve("build/dokka")) +// outputDirectory.set(file("build/dokka")) +// } +//} + +dokka { + dokkaPublications.html { + includes.from("docs/introduction.md") + outputDirectory.set(file("build/dokka")) + } + + pluginsConfiguration.html { + customAssets.from( + "docs/funfix-512.png", + "docs/favicon.ico" + ) + customStyleSheets.from("docs/logo-styles.css") + templatesDir.set(file("docs/dokka-templates")) + footerMessage.set("© Alexandru Nedelcu") + } +} + +dependencies { + dokka(project(":tasks-jvm")) + dokka(project(":tasks-kotlin")) + dokka(project(":tasks-kotlin-coroutines")) +} + tasks.named("dependencyUpdates").configure { fun isNonStable(version: String): Boolean { val stableKeyword = listOf("RELEASE", "FINAL", "GA").any { version.uppercase().contains(it) } diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 33e1ef6..f96debf 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -23,5 +23,10 @@ fun version(k: String) = dependencies { implementation(libs.gradle.versions.plugin) implementation(libs.vanniktech.publish.plugin) - implementation(libs.errorprone.gradle.plugin) + // removed errorprone plugin + // Provide plugins used by precompiled script plugins so their ids are available + implementation(libs.kotlin.gradle.plugin) + implementation(libs.kover.gradle.plugin) + implementation(libs.dokka.gradle.plugin) + implementation(libs.binary.compatibility.validator.plugin) } diff --git a/buildSrc/src/main/kotlin/tasks.java-project.gradle.kts b/buildSrc/src/main/kotlin/tasks.java-project.gradle.kts index 7bdb8a4..bb787e5 100644 --- a/buildSrc/src/main/kotlin/tasks.java-project.gradle.kts +++ b/buildSrc/src/main/kotlin/tasks.java-project.gradle.kts @@ -1,6 +1,20 @@ +import java.net.URI + plugins { `java-library` jacoco - id("net.ltgt.errorprone") + id("org.jetbrains.dokka") id("tasks.base") } + +dokka { + dokkaSourceSets.configureEach { + val tag = "v${project.version}" + val relativePath = project.projectDir.relativeTo(project.rootDir).invariantSeparatorsPath + sourceLink { + localDirectory.set(file("src")) + remoteUrl.set(URI("https://github.com/funfix/tasks/tree/${tag}/${relativePath}/src")) + remoteLineSuffix.set("#L") + } + } +} diff --git a/buildSrc/src/main/kotlin/tasks.kmp-project.gradle.kts b/buildSrc/src/main/kotlin/tasks.kmp-project.gradle.kts new file mode 100644 index 0000000..814e22f --- /dev/null +++ b/buildSrc/src/main/kotlin/tasks.kmp-project.gradle.kts @@ -0,0 +1,90 @@ +import org.jetbrains.kotlin.gradle.dsl.JvmTarget +import org.jetbrains.kotlin.gradle.tasks.KotlinCompile +import java.net.URI + +plugins { + id("org.jetbrains.kotlin.multiplatform") + id("org.jetbrains.kotlinx.kover") + id("org.jetbrains.dokka") + id("org.jetbrains.kotlinx.binary-compatibility-validator") + id("tasks.base") +} + +val dokkaOutputDir = layout.buildDirectory.dir("dokka").get().asFile + +dokka { + dokkaPublications.html { + outputDirectory.set(dokkaOutputDir) + } + + dokkaSourceSets.configureEach { + val tag = "v${project.version}" + val relativePath = project.projectDir.relativeTo(project.rootDir).invariantSeparatorsPath + sourceLink { + localDirectory.set(file("src")) + remoteUrl.set(URI("https://github.com/funfix/tasks/tree/${tag}/${relativePath}/src")) + remoteLineSuffix.set("#L") + } + } +} + +val deleteDokkaOutputDir by tasks.register("deleteDokkaOutputDirectory") { + delete(dokkaOutputDir) +} + +val javadocJar = tasks.register("javadocJar") { + archiveClassifier.set("javadoc") + duplicatesStrategy = DuplicatesStrategy.EXCLUDE + dependsOn(deleteDokkaOutputDir, tasks.named("dokkaGeneratePublicationHtml")) + from(dokkaOutputDir) +} + +java { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +kotlin { + jvm {} + + js(IR) { + browser { + testTask { + useKarma { + useChromeHeadless() + } + } + } + } + + tasks.withType { + sourceCompatibility = JavaVersion.VERSION_17.majorVersion + targetCompatibility = JavaVersion.VERSION_17.majorVersion + jvmToolchain { + languageVersion.set(JavaLanguageVersion.of(JavaVersion.VERSION_17.majorVersion)) + } + } + + tasks.withType { + compilerOptions { + // Set on a project-by-project basis + // explicitApiMode = ExplicitApiMode.Strict + // allWarningsAsErrors = true + jvmTarget.set(JvmTarget.JVM_17) + freeCompilerArgs.add("-jvm-default=enable") + } + kotlinJavaToolchain.toolchain.use( + javaLauncher = javaToolchains.launcherFor { + languageVersion = JavaLanguageVersion.of(JavaVersion.VERSION_17.majorVersion) + } + ) + } +} + +tasks.withType { + useJUnitPlatform() + javaLauncher = + javaToolchains.launcherFor { + languageVersion = JavaLanguageVersion.of(JavaVersion.VERSION_17.majorVersion) + } +} diff --git a/buildSrc/src/main/kotlin/tasks.versions.gradle.kts b/buildSrc/src/main/kotlin/tasks.versions.gradle.kts new file mode 100644 index 0000000..2dc36a2 --- /dev/null +++ b/buildSrc/src/main/kotlin/tasks.versions.gradle.kts @@ -0,0 +1,24 @@ +import com.github.benmanes.gradle.versions.updates.DependencyUpdatesTask + +plugins { + id("com.github.ben-manes.versions") +} + +// Configure the plugin's task with shared defaults for all projects that apply this precompiled plugin +tasks.named("dependencyUpdates").configure { + fun isNonStable(version: String): Boolean { + val stableKeyword = listOf("RELEASE", "FINAL", "GA").any { version.uppercase().contains(it) } + val regex = "^[0-9,.v-]+(-r)?$".toRegex() + val isStable = stableKeyword || regex.matches(version) + return isStable.not() + } + + rejectVersionIf { + isNonStable(candidate.version) && !isNonStable(currentVersion) + } + + checkForGradleUpdate = true + outputFormatter = "html" + outputDir = "build/dependencyUpdates" + reportfileName = "report" +} diff --git a/docs/dokka-templates/includes/page_metadata.ftl b/docs/dokka-templates/includes/page_metadata.ftl new file mode 100644 index 0000000..962e316 --- /dev/null +++ b/docs/dokka-templates/includes/page_metadata.ftl @@ -0,0 +1,6 @@ +<#macro display> + ${pageName} + <@template_cmd name="pathToRoot"> + + + diff --git a/docs/favicon.ico b/docs/favicon.ico new file mode 100644 index 0000000..0977e87 Binary files /dev/null and b/docs/favicon.ico differ diff --git a/docs/funfix-512.png b/docs/funfix-512.png new file mode 100644 index 0000000..a2a84a2 Binary files /dev/null and b/docs/funfix-512.png differ diff --git a/docs/introduction.md b/docs/introduction.md new file mode 100644 index 0000000..65eb2b0 --- /dev/null +++ b/docs/introduction.md @@ -0,0 +1,3 @@ +# Funfix Tasks + +This is a library meant for library authors that want to build libraries that work across Java, Scala, or Kotlin, without having to worry about interoperability with whatever method of I/O that the library is using under the hood. diff --git a/docs/logo-styles.css b/docs/logo-styles.css new file mode 100644 index 0000000..0c47ff9 --- /dev/null +++ b/docs/logo-styles.css @@ -0,0 +1,6 @@ +/* Override Dokka logo to use Funfix PNG */ +:root { + --dokka-logo-image-url: url('../images/funfix-512.png'); + --dokka-logo-height: 32px; + --dokka-logo-width: 32px; +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0876f18..9df7dc8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,21 +1,30 @@ [versions] -versions-plugin = "0.51.0" -publish-plugin = "0.29.0" -errorprone-plugin = "4.3.0" -errorprone = "2.41.0" -errorprone-nullaway = "0.12.8" - +binary-compatibility-validator = "0.16.2" +dokka = "2.1.0" jetbrains-annotations = "26.0.2" jspecify = "1.0.0" +junit-jupiter = "6.0.2" +kotlin = "2.3.0" +kover = "0.9.5" +lombok = "1.18.36" +publish-plugin = "0.29.0" +versions-plugin = "0.51.0" +kotlinx-coroutines = "1.10.2" [libraries] # Plugins specified in buildSrc/build.gradle.kts +binary-compatibility-validator-plugin = { module = "org.jetbrains.kotlinx.binary-compatibility-validator:org.jetbrains.kotlinx.binary-compatibility-validator.gradle.plugin", version.ref = "binary-compatibility-validator" } +dokka-gradle-plugin = { module = "org.jetbrains.dokka:dokka-gradle-plugin", version.ref = "dokka" } gradle-versions-plugin = { module = "com.github.ben-manes:gradle-versions-plugin", version.ref = "versions-plugin" } +kotlin-gradle-plugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } +kover-gradle-plugin = { module = "org.jetbrains.kotlinx:kover-gradle-plugin", version.ref = "kover" } vanniktech-publish-plugin = { module = "com.vanniktech:gradle-maven-publish-plugin", version.ref = "publish-plugin" } -errorprone-gradle-plugin = { module = "net.ltgt.gradle:gradle-errorprone-plugin", version.ref = "errorprone-plugin" } -errorprone-core = { module = "com.google.errorprone:error_prone_core", version.ref = "errorprone"} -errorprone-nullaway = { module = "com.uber.nullaway:nullaway", version.ref = "errorprone-nullaway" } # Actual libraries jetbrains-annotations = { module = "org.jetbrains:annotations", version.ref = "jetbrains-annotations" } jspecify = { module = "org.jspecify:jspecify", version.ref = "jspecify" } +junit-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-jupiter" } +kotlin-test = { module = "org.jetbrains.kotlin:kotlin-test", version.ref = "kotlin" } +kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinx-coroutines" } +kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "kotlinx-coroutines" } +lombok = { module = "org.projectlombok:lombok", version.ref = "lombok" } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ca025c8..37f78a6 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.3.1-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/settings.gradle.kts b/settings.gradle.kts index 98f724f..c425a36 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,6 +1,8 @@ rootProject.name = "tasks" include("tasks-jvm") +include("tasks-kotlin") +include("tasks-kotlin-coroutines") pluginManagement { repositories { diff --git a/tasks-jvm/build.gradle.kts b/tasks-jvm/build.gradle.kts index 0076d9e..1df7cb5 100644 --- a/tasks-jvm/build.gradle.kts +++ b/tasks-jvm/build.gradle.kts @@ -1,8 +1,6 @@ -import net.ltgt.gradle.errorprone.CheckSeverity -import net.ltgt.gradle.errorprone.errorprone - plugins { id("tasks.java-project") + id("tasks.versions") } mavenPublishing { @@ -15,12 +13,9 @@ mavenPublishing { dependencies { api(libs.jspecify) - errorprone(libs.errorprone.core) - errorprone(libs.errorprone.nullaway) - compileOnly(libs.jetbrains.annotations) - testImplementation(platform("org.junit:junit-bom:5.12.1")) + testImplementation(platform("org.junit:junit-bom:6.0.2")) testImplementation("org.junit.jupiter:junit-jupiter") testRuntimeOnly("org.junit.platform:junit-platform-launcher") } @@ -41,12 +36,7 @@ tasks.withType { "-Xlint:deprecation", // "-Werror" )) - - options.errorprone { - disableAllChecks.set(true) - check("NullAway", CheckSeverity.ERROR) - option("NullAway:AnnotatedPackages", "org.funfix") - } + } tasks.register("testsOn21") { diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java index ec7b4ed..ff0acc7 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java @@ -509,9 +509,9 @@ public void invoke(Continuation continuation) { cancellableRef.set(() -> { try { - cancellable.cancel(); - } finally { future.cancel(true); + } finally { + cancellable.cancel(); } }); } catch (Throwable e) { @@ -526,7 +526,7 @@ private static CompletableFuture getCompletableFuture( ) { CompletableFuture future = cancellableFuture.future(); future.whenComplete((value, error) -> { - if (error instanceof InterruptedException || error instanceof TaskCancellationException) { + if (error instanceof InterruptedException || error instanceof TaskCancellationException || error instanceof CancellationException) { callback.onCancellation(); } else if (error instanceof ExecutionException) { callback.onFailure(error.getCause() != null ? error.getCause() : error); diff --git a/tasks-kotlin-coroutines/api/tasks-kotlin-coroutines.api b/tasks-kotlin-coroutines/api/tasks-kotlin-coroutines.api new file mode 100644 index 0000000..86a457c --- /dev/null +++ b/tasks-kotlin-coroutines/api/tasks-kotlin-coroutines.api @@ -0,0 +1,9 @@ +public final class org/funfix/tasks/kotlin/CoroutinesJvmKt { + public static final fun fromSuspended (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun fromSuspended$default (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runSuspended (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun runSuspended$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runSuspended-A-R0woo (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static synthetic fun runSuspended-A-R0woo$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; +} + diff --git a/tasks-kotlin-coroutines/build.gradle.kts b/tasks-kotlin-coroutines/build.gradle.kts new file mode 100644 index 0000000..1175a8a --- /dev/null +++ b/tasks-kotlin-coroutines/build.gradle.kts @@ -0,0 +1,71 @@ +@file:OptIn(ExperimentalKotlinGradlePluginApi::class) + +import org.jetbrains.kotlin.gradle.ExperimentalKotlinGradlePluginApi +import org.jetbrains.kotlin.gradle.dsl.ExplicitApiMode + +plugins { + id("tasks.kmp-project") + id("tasks.versions") +} + +mavenPublishing { + pom { + name = "Tasks / Kotlin Coroutines" + description = "Integration with Kotlin's Coroutines" + } +} + +kotlin { + sourceSets { + val commonMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + + dependencies { + implementation(project(":tasks-kotlin")) + implementation(libs.kotlinx.coroutines.core) + } + } + + val commonTest by getting { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.coroutines.test) + } + } + + val jvmMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + + dependencies { + implementation(project(":tasks-jvm")) + implementation(project(":tasks-kotlin")) + implementation(libs.kotlinx.coroutines.core) + } + } + + val jvmTest by getting { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.coroutines.test) + } + } + + val jsMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + + dependencies { + implementation(project(":tasks-kotlin")) + implementation(libs.kotlinx.coroutines.core) + } + } + } +} diff --git a/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/coroutines.kt b/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/coroutines.kt new file mode 100644 index 0000000..1eedc3f --- /dev/null +++ b/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/coroutines.kt @@ -0,0 +1,40 @@ +package org.funfix.tasks.kotlin + +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * Similar with `runBlocking`, however this is a "suspended" function, + * to be executed in the context of kotlinx.coroutines. + * + * NOTES: + * - The `kotlinx.coroutines.CoroutineDispatcher`, made available via the + * "coroutine context", is used to execute the task, being passed to + * the task's implementation as an `Executor`. + * - The coroutine's cancellation protocol cooperates with that of [Task], + * so cancelling the coroutine will also cancel the task (including the + * possibility for back-pressuring on the fiber's completion after + * cancellation). + * + * @param executor is an override of the `Executor` to be used for executing + * the task. If `null`, the `Executor` will be derived from the + * `CoroutineDispatcher` + */ +public expect suspend fun Task.runSuspended( + executor: Executor? = null +): T + +/** + * See documentation for [Task.runSuspended]. + */ +public expect suspend fun PlatformTask.runSuspended( + executor: Executor? = null +): T + +/** + * Creates a [Task] from a suspended block of code. + */ +public expect suspend fun Task.Companion.fromSuspended( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + block: suspend () -> T +): Task diff --git a/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/internals.kt b/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/internals.kt new file mode 100644 index 0000000..1cef288 --- /dev/null +++ b/tasks-kotlin-coroutines/src/commonMain/kotlin/org/funfix/tasks/kotlin/internals.kt @@ -0,0 +1,15 @@ +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers +import kotlin.coroutines.ContinuationInterceptor +import kotlinx.coroutines.currentCoroutineContext + +/** + * Internal API: gets the current [CoroutineDispatcher] from the coroutine context. + */ +internal suspend fun currentDispatcher(): CoroutineDispatcher { + // Access the coroutineContext to get the ContinuationInterceptor + val continuationInterceptor = currentCoroutineContext()[ContinuationInterceptor] + return continuationInterceptor as? CoroutineDispatcher ?: Dispatchers.Default +} diff --git a/tasks-kotlin-coroutines/src/commonTest/kotlin/org/funfix/tasks/kotlin/CoroutinesCommonTest.kt b/tasks-kotlin-coroutines/src/commonTest/kotlin/org/funfix/tasks/kotlin/CoroutinesCommonTest.kt new file mode 100644 index 0000000..64a0901 --- /dev/null +++ b/tasks-kotlin-coroutines/src/commonTest/kotlin/org/funfix/tasks/kotlin/CoroutinesCommonTest.kt @@ -0,0 +1,97 @@ +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.awaitCancellation +import kotlinx.coroutines.async +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class CoroutinesCommonTest { + @Test + fun runSuspendedSuccess() = runTest { + val task = Task.fromAsync { _, cb -> + cb(Outcome.Success(42)) + Cancellable {} + } + + val result = task.runSuspended() + + assertEquals(42, result) + } + + @Test + fun runSuspendedFailure() = runTest { + val ex = RuntimeException("Boom") + val task = Task.fromAsync { _, cb -> + cb(Outcome.Failure(ex)) + Cancellable {} + } + + val thrown = assertFailsWith { task.runSuspended() } + + assertEquals("Boom", thrown.message) + } + + @Test + fun runSuspendedCancelsTaskToken() = runTest { + val cancelled = CompletableDeferred() + val started = CompletableDeferred() + val task = Task.fromAsync { _, cb -> + started.complete(Unit) + Cancellable { + cancelled.complete(Unit) + cb(Outcome.Cancellation) + } + } + + val deferred = async { task.runSuspended() } + started.await() + deferred.cancel() + + assertFailsWith { deferred.await() } + cancelled.await() + } + + @Test + fun fromSuspendedSuccess() = runTest { + val task = Task.fromSuspended { + 21 + 21 + } + val deferred = CompletableDeferred>() + task.runAsync { outcome -> deferred.complete(outcome) } + + assertEquals(Outcome.Success(42), deferred.await()) + } + + @Test + fun fromSuspendedFailure() = runTest { + val ex = RuntimeException("Boom") + val task = Task.fromSuspended { + throw ex + } + val deferred = CompletableDeferred>() + task.runAsync { outcome -> deferred.complete(outcome) } + + assertEquals(Outcome.Failure(ex), deferred.await()) + } + + @Test + fun fromSuspendedCancellation() = runTest { + val started = CompletableDeferred() + val task = Task.fromSuspended { + started.complete(Unit) + awaitCancellation() + } + val deferred = CompletableDeferred>() + val cancel = task.runAsync { outcome -> deferred.complete(outcome) } + + started.await() + cancel.cancel() + + assertEquals(Outcome.Cancellation, deferred.await()) + } + +} diff --git a/tasks-kotlin-coroutines/src/jsMain/kotlin/org/funfix/tasks/kotlin/coroutines.js.kt b/tasks-kotlin-coroutines/src/jsMain/kotlin/org/funfix/tasks/kotlin/coroutines.js.kt new file mode 100644 index 0000000..c76ac15 --- /dev/null +++ b/tasks-kotlin-coroutines/src/jsMain/kotlin/org/funfix/tasks/kotlin/coroutines.js.kt @@ -0,0 +1,115 @@ +@file:OptIn(DelicateCoroutinesApi::class) + +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.cancellation.CancellationException +import kotlin.coroutines.resumeWithException + +public actual suspend fun PlatformTask.runSuspended( + executor: Executor? +): T = run { + val executorOrDefault = executor ?: buildExecutor(currentDispatcher()) + suspendCancellableCoroutine { cont -> + val contCallback = cont.asCompletionCallback() + try { + val token = this.invoke(executorOrDefault, contCallback) + cont.invokeOnCancellation { + token.cancel() + } + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + contCallback(Outcome.Failure(e)) + } + } +} + +internal fun buildExecutor(dispatcher: CoroutineDispatcher): Executor = + DispatcherExecutor(dispatcher) + +internal fun buildCoroutineDispatcher( + @Suppress("UNUSED_PARAMETER") executor: Executor +): CoroutineDispatcher = + // Building this CoroutineDispatcher from an Executor is problematic, and there's no + // point in even trying on top of JS engines. + Dispatchers.Default + +private class DispatcherExecutor(val dispatcher: CoroutineDispatcher) : Executor { + override fun execute(command: Runnable) { + if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) { + dispatcher.dispatch( + EmptyCoroutineContext, + { command.run() } + ) + } else { + command.run() + } + } + + override fun toString(): String = + dispatcher.toString() +} + +internal fun CancellableContinuation.asCompletionCallback(): Callback { + var isActive = true + return { outcome -> + if (outcome is Outcome.Failure) { + UncaughtExceptionHandler.rethrowIfFatal(outcome.exception) + } + if (isActive) { + isActive = false + when (outcome) { + is Outcome.Success -> + resume(outcome.value) { _, _, _ -> + // on cancellation? + } + is Outcome.Failure -> + resumeWithException(outcome.exception) + is Outcome.Cancellation -> + resumeWithException(kotlinx.coroutines.CancellationException()) + } + } else if (outcome is Outcome.Failure) { + UncaughtExceptionHandler.logOrRethrow(outcome.exception) + } + } +} + +/** + * Creates a [Task] from a suspended block of code. + */ +public actual suspend fun Task.Companion.fromSuspended( + coroutineContext: CoroutineContext, + block: suspend () -> T +): Task = + Task.fromAsync { executor, callback -> + val job = GlobalScope.launch( + buildCoroutineDispatcher(executor) + coroutineContext + ) { + try { + val r = block() + callback(Outcome.Success(r)) + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + when (e) { + is CancellationException, is TaskCancellationException -> + callback(Outcome.Cancellation) + else -> + callback(Outcome.Failure(e)) + } + } + } + Cancellable { + job.cancel() + } + } + +public actual suspend fun Task.runSuspended(executor: Executor?): T = + asPlatform.runSuspended(executor) diff --git a/tasks-kotlin-coroutines/src/jsTest/kotlin/org/funfix/tasks/kotlin/CoroutinesJsTest.kt b/tasks-kotlin-coroutines/src/jsTest/kotlin/org/funfix/tasks/kotlin/CoroutinesJsTest.kt new file mode 100644 index 0000000..f939325 --- /dev/null +++ b/tasks-kotlin-coroutines/src/jsTest/kotlin/org/funfix/tasks/kotlin/CoroutinesJsTest.kt @@ -0,0 +1,26 @@ +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class CoroutinesJsTest { + @Test + fun runSuspendedUsesProvidedExecutor() = runTest { + var executed = false + val executor = Executor { command -> + executed = true + command.run() + } + val task = Task.fromAsync { exec, cb -> + exec.execute { cb(Outcome.Success(7)) } + Cancellable {} + } + + val result = task.runSuspended(executor) + + assertEquals(7, result) + assertTrue(executed) + } +} diff --git a/tasks-kotlin-coroutines/src/jvmMain/kotlin/org/funfix/tasks/kotlin/coroutines.jvm.kt b/tasks-kotlin-coroutines/src/jvmMain/kotlin/org/funfix/tasks/kotlin/coroutines.jvm.kt new file mode 100644 index 0000000..aff9371 --- /dev/null +++ b/tasks-kotlin-coroutines/src/jvmMain/kotlin/org/funfix/tasks/kotlin/coroutines.jvm.kt @@ -0,0 +1,114 @@ +@file:JvmName("CoroutinesJvmKt") +@file:OptIn(DelicateCoroutinesApi::class) + +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.asExecutor +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import org.funfix.tasks.jvm.CompletionCallback +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.cancellation.CancellationException +import kotlin.coroutines.resumeWithException +import org.funfix.tasks.jvm.Outcome + +public actual suspend fun PlatformTask.runSuspended(executor: Executor?): T = + run { + val executorOrDefault = executor ?: currentDispatcher().asExecutor() + suspendCancellableCoroutine { cont -> + val contCallback = CoroutineAsCompletionCallback(cont) + try { + val token = runAsync(executorOrDefault, contCallback) + cont.invokeOnCancellation { + token.cancel() + } + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + contCallback.onFailure(e) + } + } + } + +/** + * Internal API: wraps a [CancellableContinuation] into a [CompletionCallback]. + */ +internal class CoroutineAsCompletionCallback( + private val cont: CancellableContinuation +) : CompletionCallback { + private val isActive = AtomicBoolean(true) + + private inline fun completeWith(crossinline block: () -> Unit): Boolean = + if (isActive.getAndSet(false)) { + block() + true + } else { + false + } + + override fun onOutcome(outcome: Outcome) { + when (outcome) { + is Outcome.Success -> onSuccess(outcome.value) + is Outcome.Failure -> onFailure(outcome.exception) + is Outcome.Cancellation -> onCancellation() + } + } + + override fun onSuccess(value: T) { + completeWith { + cont.resume(value) { _, _, _ -> + // on cancellation? + } + } + } + + override fun onFailure(e: Throwable) { + if (!completeWith { + cont.resumeWithException(e) + }) { + UncaughtExceptionHandler.logOrRethrow(e) + } + } + + override fun onCancellation() { + completeWith { + cont.resumeWithException(kotlinx.coroutines.CancellationException()) + } + } +} + +public actual suspend fun Task.Companion.fromSuspended( + coroutineContext: CoroutineContext, + block: suspend () -> T +): Task = Task( + PlatformTask.fromAsync { executor, callback -> + val job = GlobalScope.launch( + executor.asCoroutineDispatcher() + coroutineContext + ) { + try { + val r = block() + callback.onSuccess(r) + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + when (e) { + is CancellationException, + is TaskCancellationException, + is InterruptedException -> + callback.onCancellation() + else -> + callback.onFailure(e) + } + } + } + Cancellable { + job.cancel() + } + } +) + +public actual suspend fun Task.runSuspended(executor: Executor?): T = + asPlatform.runSuspended(executor) diff --git a/tasks-kotlin-coroutines/src/jvmTest/kotlin/org/funfix/tasks/kotlin/CoroutinesJvmTest.kt b/tasks-kotlin-coroutines/src/jvmTest/kotlin/org/funfix/tasks/kotlin/CoroutinesJvmTest.kt new file mode 100644 index 0000000..998a1c6 --- /dev/null +++ b/tasks-kotlin-coroutines/src/jvmTest/kotlin/org/funfix/tasks/kotlin/CoroutinesJvmTest.kt @@ -0,0 +1,31 @@ +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext +import java.util.concurrent.Executors +import kotlin.test.Test +import kotlin.test.assertTrue + +class CoroutinesJvmTest { + @Test + fun runSuspendedUsesCurrentDispatcherWhenExecutorNull() = runTest { + val executor = Executors.newSingleThreadExecutor { r -> + val thread = Thread(r) + thread.isDaemon = true + thread.name = "coroutines-test-${thread.id}" + thread + } + val dispatcher = executor.asCoroutineDispatcher() + try { + val threadName = withContext(dispatcher) { + Task.fromBlockingIO { Thread.currentThread().name }.runSuspended() + } + + assertTrue(threadName.startsWith("coroutines-test-")) + } finally { + dispatcher.close() + executor.shutdown() + } + } +} diff --git a/tasks-kotlin/api/tasks-kotlin.api b/tasks-kotlin/api/tasks-kotlin.api new file mode 100644 index 0000000..fca333b --- /dev/null +++ b/tasks-kotlin/api/tasks-kotlin.api @@ -0,0 +1,129 @@ +public final class org/funfix/tasks/kotlin/ExecutorsJvmKt { + public static final fun getSharedIOExecutor ()Ljava/util/concurrent/Executor; + public static final fun getTrampolineExecutor ()Ljava/util/concurrent/Executor; +} + +public final class org/funfix/tasks/kotlin/Fiber : org/funfix/tasks/jvm/Cancellable { + public static final synthetic fun box-impl (Lorg/funfix/tasks/jvm/Fiber;)Lorg/funfix/tasks/kotlin/Fiber; + public fun cancel ()V + public static fun cancel-impl (Lorg/funfix/tasks/jvm/Fiber;)V + public static fun constructor-impl (Lorg/funfix/tasks/jvm/Fiber;)Lorg/funfix/tasks/jvm/Fiber; + public fun equals (Ljava/lang/Object;)Z + public static fun equals-impl (Lorg/funfix/tasks/jvm/Fiber;Ljava/lang/Object;)Z + public static final fun equals-impl0 (Lorg/funfix/tasks/jvm/Fiber;Lorg/funfix/tasks/jvm/Fiber;)Z + public final fun getAsPlatform ()Lorg/funfix/tasks/jvm/Fiber; + public fun hashCode ()I + public static fun hashCode-impl (Lorg/funfix/tasks/jvm/Fiber;)I + public fun toString ()Ljava/lang/String; + public static fun toString-impl (Lorg/funfix/tasks/jvm/Fiber;)Ljava/lang/String; + public final synthetic fun unbox-impl ()Lorg/funfix/tasks/jvm/Fiber; +} + +public final class org/funfix/tasks/kotlin/FiberJvmKt { + public static final fun asKotlin (Lorg/funfix/tasks/jvm/Fiber;)Lorg/funfix/tasks/jvm/Fiber; + public static final fun awaitAsync-bilpdk0 (Lorg/funfix/tasks/jvm/Fiber;Lkotlin/jvm/functions/Function1;)Lorg/funfix/tasks/jvm/Cancellable; + public static final fun awaitBlocking-eJoBjQM (Lorg/funfix/tasks/jvm/Fiber;)Ljava/lang/Object; + public static final fun awaitBlockingTimed-MI-qbaI (Lorg/funfix/tasks/jvm/Fiber;J)Ljava/lang/Object; + public static final fun getOutcomeOrNull-eJoBjQM (Lorg/funfix/tasks/jvm/Fiber;)Lorg/funfix/tasks/kotlin/Outcome; + public static final fun getResultOrThrow-eJoBjQM (Lorg/funfix/tasks/jvm/Fiber;)Ljava/lang/Object; + public static final fun joinAsync-bilpdk0 (Lorg/funfix/tasks/jvm/Fiber;Ljava/lang/Runnable;)Lorg/funfix/tasks/jvm/Cancellable; + public static final fun joinBlocking-eJoBjQM (Lorg/funfix/tasks/jvm/Fiber;)V + public static final fun joinBlockingTimed-MI-qbaI (Lorg/funfix/tasks/jvm/Fiber;J)V +} + +public abstract interface class org/funfix/tasks/kotlin/Outcome { + public static final field Companion Lorg/funfix/tasks/kotlin/Outcome$Companion; + public fun getOrThrow ()Ljava/lang/Object; +} + +public final class org/funfix/tasks/kotlin/Outcome$Cancellation : org/funfix/tasks/kotlin/Outcome { + public static final field INSTANCE Lorg/funfix/tasks/kotlin/Outcome$Cancellation; + public fun equals (Ljava/lang/Object;)Z + public synthetic fun getOrThrow ()Ljava/lang/Object; + public fun getOrThrow ()Ljava/lang/Void; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class org/funfix/tasks/kotlin/Outcome$Companion { + public final fun cancellation ()Lorg/funfix/tasks/kotlin/Outcome; + public final fun failure (Ljava/lang/Throwable;)Lorg/funfix/tasks/kotlin/Outcome; + public final fun success (Ljava/lang/Object;)Lorg/funfix/tasks/kotlin/Outcome; +} + +public final class org/funfix/tasks/kotlin/Outcome$DefaultImpls { + public static fun getOrThrow (Lorg/funfix/tasks/kotlin/Outcome;)Ljava/lang/Object; +} + +public final class org/funfix/tasks/kotlin/Outcome$Failure : org/funfix/tasks/kotlin/Outcome { + public fun (Ljava/lang/Throwable;)V + public final fun component1 ()Ljava/lang/Throwable; + public final fun copy (Ljava/lang/Throwable;)Lorg/funfix/tasks/kotlin/Outcome$Failure; + public static synthetic fun copy$default (Lorg/funfix/tasks/kotlin/Outcome$Failure;Ljava/lang/Throwable;ILjava/lang/Object;)Lorg/funfix/tasks/kotlin/Outcome$Failure; + public fun equals (Ljava/lang/Object;)Z + public final fun getException ()Ljava/lang/Throwable; + public synthetic fun getOrThrow ()Ljava/lang/Object; + public fun getOrThrow ()Ljava/lang/Void; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class org/funfix/tasks/kotlin/Outcome$Success : org/funfix/tasks/kotlin/Outcome { + public fun (Ljava/lang/Object;)V + public final fun component1 ()Ljava/lang/Object; + public final fun copy (Ljava/lang/Object;)Lorg/funfix/tasks/kotlin/Outcome$Success; + public static synthetic fun copy$default (Lorg/funfix/tasks/kotlin/Outcome$Success;Ljava/lang/Object;ILjava/lang/Object;)Lorg/funfix/tasks/kotlin/Outcome$Success; + public fun equals (Ljava/lang/Object;)Z + public fun getOrThrow ()Ljava/lang/Object; + public final fun getValue ()Ljava/lang/Object; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + +public final class org/funfix/tasks/kotlin/Task { + public static final field Companion Lorg/funfix/tasks/kotlin/Task$Companion; + public static final synthetic fun box-impl (Lorg/funfix/tasks/jvm/Task;)Lorg/funfix/tasks/kotlin/Task; + public static fun constructor-impl (Lorg/funfix/tasks/jvm/Task;)Lorg/funfix/tasks/jvm/Task; + public fun equals (Ljava/lang/Object;)Z + public static fun equals-impl (Lorg/funfix/tasks/jvm/Task;Ljava/lang/Object;)Z + public static final fun equals-impl0 (Lorg/funfix/tasks/jvm/Task;Lorg/funfix/tasks/jvm/Task;)Z + public static final fun getAsJava-impl (Lorg/funfix/tasks/jvm/Task;)Lorg/funfix/tasks/jvm/Task; + public final fun getAsPlatform ()Lorg/funfix/tasks/jvm/Task; + public fun hashCode ()I + public static fun hashCode-impl (Lorg/funfix/tasks/jvm/Task;)I + public fun toString ()Ljava/lang/String; + public static fun toString-impl (Lorg/funfix/tasks/jvm/Task;)Ljava/lang/String; + public final synthetic fun unbox-impl ()Lorg/funfix/tasks/jvm/Task; +} + +public final class org/funfix/tasks/kotlin/Task$Companion { +} + +public final class org/funfix/tasks/kotlin/TaskJvmKt { + public static final fun ensureRunningOnExecutor-EZXAkWY (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;)Lorg/funfix/tasks/jvm/Task; + public static synthetic fun ensureRunningOnExecutor-EZXAkWY$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;ILjava/lang/Object;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromAsync (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function2;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromBlockingFuture (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function0;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromBlockingIO (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function0;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromCancellableFuture (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function0;)Lorg/funfix/tasks/jvm/Task; + public static final fun fromCompletionStage (Lorg/funfix/tasks/kotlin/Task$Companion;Lkotlin/jvm/functions/Function0;)Lorg/funfix/tasks/jvm/Task; + public static final fun runAsync-A-R0woo (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/jvm/functions/Function1;)Lorg/funfix/tasks/jvm/Cancellable; + public static synthetic fun runAsync-A-R0woo$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lorg/funfix/tasks/jvm/Cancellable; + public static final fun runBlocking-EZXAkWY (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;)Ljava/lang/Object; + public static synthetic fun runBlocking-EZXAkWY$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runBlockingTimed-4GGJJa0 (Lorg/funfix/tasks/jvm/Task;JLjava/util/concurrent/Executor;)Ljava/lang/Object; + public static synthetic fun runBlockingTimed-4GGJJa0$default (Lorg/funfix/tasks/jvm/Task;JLjava/util/concurrent/Executor;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runFiber-EZXAkWY (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;)Lorg/funfix/tasks/jvm/Fiber; + public static synthetic fun runFiber-EZXAkWY$default (Lorg/funfix/tasks/jvm/Task;Ljava/util/concurrent/Executor;ILjava/lang/Object;)Lorg/funfix/tasks/jvm/Fiber; +} + +public final class org/funfix/tasks/kotlin/TaskKt { + public static final fun asKotlin (Lorg/funfix/tasks/jvm/Task;)Lorg/funfix/tasks/jvm/Task; +} + +public final class org/funfix/tasks/kotlin/UncaughtExceptionHandler { + public static final field INSTANCE Lorg/funfix/tasks/kotlin/UncaughtExceptionHandler; + public final fun logOrRethrow (Ljava/lang/Throwable;)V + public final fun rethrowIfFatal (Ljava/lang/Throwable;)V +} + diff --git a/tasks-kotlin/build.gradle.kts b/tasks-kotlin/build.gradle.kts new file mode 100644 index 0000000..0873570 --- /dev/null +++ b/tasks-kotlin/build.gradle.kts @@ -0,0 +1,60 @@ +@file:OptIn(ExperimentalKotlinGradlePluginApi::class) + +import org.jetbrains.kotlin.gradle.ExperimentalKotlinGradlePluginApi +import org.jetbrains.kotlin.gradle.dsl.ExplicitApiMode + +plugins { + id("tasks.kmp-project") + id("tasks.versions") +} + +mavenPublishing { + pom { + name = "Tasks / Kotlin" + description = "Integration with Kotlin Multiplatform" + } +} + +kotlin { + sourceSets { + val commonMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + } + + val commonTest by getting { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.coroutines.test) + } + } + + val jvmMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + + dependencies { + implementation(project(":tasks-jvm")) + compileOnly(libs.jetbrains.annotations) + } + } + + val jvmTest by getting { + dependencies { + implementation(libs.kotlin.test) + implementation(libs.kotlinx.coroutines.test) + } + } + + val jsMain by getting { + compilerOptions { + explicitApi = ExplicitApiMode.Strict + allWarningsAsErrors = true + } + } + } +} diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Cancellable.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Cancellable.kt new file mode 100644 index 0000000..d7b550c --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Cancellable.kt @@ -0,0 +1,22 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * Represents a non-blocking piece of logic that triggers the cancellation + * procedure of an asynchronous computation. + * + * MUST NOT block the calling thread. Interruption of the computation + * isn't guaranteed to have happened after this call returns. + * + * MUST BE idempotent, i.e. calling it multiple times should have the same + * effect as calling it once. + * + * MUST BE thread-safe. + */ +public expect fun interface Cancellable { + /** + * Triggers the cancellation of the computation. + */ + public fun cancel() +} diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Outcome.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Outcome.kt new file mode 100644 index 0000000..6f4dd4d --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Outcome.kt @@ -0,0 +1,59 @@ +package org.funfix.tasks.kotlin + +/** + * Represents the result of a computation. + * + * This is a union type that can signal: + * - a successful result, via [Outcome.Success] + * - a failure (with an exception), via [Outcome.Failure] + * - a cancelled computation, via [Outcome.Cancellation] + */ +public sealed interface Outcome { + public val orThrow: T + /** + * Returns the successful result of a computation, or throws an exception + * if the computation failed or was cancelled. + * + * @throws TaskCancellationException in case this is an [Outcome.Cancellation] + * @throws Throwable in case this is an [Outcome.Failure] + */ + @Throws(TaskCancellationException::class) + get() = + when (this) { + is Success -> value + is Failure -> throw exception + is Cancellation -> throw TaskCancellationException("Task was cancelled") + } + + /** + * Returned in case the task was successful. + */ + public data class Success(val value: T): Outcome + + /** + * Returned in case the task failed with an exception. + */ + public data class Failure(val exception: Throwable): Outcome + + /** + * Returned in case the task was cancelled. + */ + public data object Cancellation: Outcome + + public companion object { + /** + * Constructs a successful [Outcome] with the given value. + */ + public fun success(value: T): Outcome = Success(value) + + /** + * Constructs a failed [Outcome] with the given exception. + */ + public fun failure(e: Throwable): Outcome = Failure(e) + + /** + * Constructs a cancelled [Outcome]. + */ + public fun cancellation(): Outcome = Cancellation + } +} diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Task.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Task.kt new file mode 100644 index 0000000..f444713 --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/Task.kt @@ -0,0 +1,120 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * An alias for a platform-specific implementation that powers [Task]. + */ +public expect class PlatformTask + +/** + * Kotlin-specific callback type used for signaling the completion of running + * tasks. + */ +public typealias Callback = (Outcome) -> Unit + +/** + * A task is a computation that can be executed asynchronously. + * + * In the vocabulary of "reactive streams", this is a "cold data source", + * meaning that the computation hasn't executed yet, and when it will execute, + * the result won't get cached (memoized). In the vocabulary of + * "functional programming", this is a pure value, being somewhat equivalent + * to `IO`. + * + * This is designed to be a compile-time type that's going to be erased at + * runtime. Therefore, for the JVM at least, when using it in your APIs, it + * won't pollute it with Kotlin-specific wrappers. + */ +public expect value class Task public constructor( + public val asPlatform: PlatformTask +) { + // Companion object currently doesn't do anything, but we + // need to define one to make the class open for extensions. + public companion object +} + +/** + * Converts a platform task to a Kotlin task. + * + * E.g., can convert a `jvm.Task` to a `kotlin.Task`. + */ +public fun PlatformTask.asKotlin(): Task = + Task(this) + +/** + * Ensures that the task starts asynchronously and runs on the given executor, + * regardless of the `run` method that is used, or the injected executor in + * any of those methods. + * + * One example where this is useful is for blocking I/O operations, for + * ensuring that the task runs on the thread-pool meant for blocking I/O, + * regardless of what executor is passed to [runAsync]. + * + * Example: + * ```kotlin + * Task.fromBlockingIO { + * // Reads a file from disk + * Files.readString(Paths.get("file.txt")) + * }.ensureRunningOnExecutor( + * BlockingIOExecutor + * ) + * ``` + * + * Another use-case is for ensuring that the task runs asynchronously, on + * another thread. Otherwise, tasks may be able to execute on the current thread: + * + * ```kotlin + * val task = Task.fromBlockingIO { + * // Reads a file from disk + * Files.readString(Paths.get("file.txt")) + * } + * + * task + * // Ensuring the task runs on a different thread + * .ensureRunningOnExecutor() + * // Blocking the current thread for the result (JVM API) + * .runBlocking() + * ``` + * + * @param executor is the [Executor] used as an override. If `null`, then + * the executor injected (e.g., in [runAsync]) will be used. + */ +public expect fun Task.ensureRunningOnExecutor(executor: Executor? = null): Task + +/** + * Executes the task asynchronously. + * + * @param executor is the [Executor] to use for running the task + * @param callback is the callback given for signaling completion + * @return a [Cancellable] that can be used to cancel the running task + */ +public expect fun Task.runAsync( + executor: Executor? = null, + callback: Callback +): Cancellable + +/** + * Creates a task from an asynchronous computation, initiated on the current thread. + * + * This method ensures: + * 1. Idempotent cancellation + * 2. Trampolined execution to avoid stack-overflows + * + * The created task will execute the given function on the current + * thread, by using a "trampoline" to avoid stack overflows. This may + * be useful if the computation for initiating the async process is + * expected to be fast. If the computation can block the current + * thread, consider ensuring the task runs on a different executor + * (for example via [ensureRunningOnExecutor] or by wrapping the + * blocking portion in a `fromBlockingIO` task). + * + * @param start is the function that will trigger the async computation, + * injecting a callback that will be used to signal the result, and an + * executor that can be used for creating additional threads. + * + * @return a new task that will execute the given builder function upon execution + */ +public expect fun Task.Companion.fromAsync( + start: (Executor, Callback) -> Cancellable +): Task diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.kt new file mode 100644 index 0000000..eda1499 --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.kt @@ -0,0 +1,19 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * Utilities for handling uncaught exceptions. + */ +public expect object UncaughtExceptionHandler { + /** + * Used for filtering the fatal exceptions that should + * crash the process (e.g., `OutOfMemoryError`). + */ + public fun rethrowIfFatal(e: Throwable) + + /** + * Logs a caught exception, or rethrows it if it's fatal. + */ + public fun logOrRethrow(e: Throwable) +} diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/exceptions.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/exceptions.kt new file mode 100644 index 0000000..d29fa54 --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/exceptions.kt @@ -0,0 +1,24 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * An exception that is thrown when waiting for the result of a task + * that has been cancelled. + * + * Note, this is unlike the JVM's `InterruptedException` or Kotlin's + * `CancellationException`, which are thrown when the current thread or fiber is + * interrupted. This exception is thrown when waiting for the result of a task + * that has been cancelled concurrently, but this doesn't mean that the current + * thread or fiber was interrupted. + */ +public expect open class TaskCancellationException(message: String?): Exception { + public constructor() +} + +/** + * Exception thrown when trying to get the result of a fiber that + * hasn't completed yet. + */ +public expect class FiberNotCompletedException public constructor() : + Exception diff --git a/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/executors.kt b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/executors.kt new file mode 100644 index 0000000..db83c28 --- /dev/null +++ b/tasks-kotlin/src/commonMain/kotlin/org/funfix/tasks/kotlin/executors.kt @@ -0,0 +1,49 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * An [Executor] is an abstraction for a thread-pool or a single-threaded + * event-loop, used for running tasks. + * + * On the JVM, this is an alias for the `java.util.concurrent.Executor` + * interface. On top of JavaScript, one way to implement this is via + * `setTimeout`. + */ +public expect fun interface Executor { + public fun execute(command: Runnable) +} + +/** + * A simple interface for a task that can be executed asynchronously. + * + * On the JVM, this is an alias for the `java.lang.Runnable` interface. + */ +public expect fun interface Runnable { + public fun run() +} + +/** + * The global executor, used for running tasks that don't specify an + * explicit executor. + * + * On top of the JVM, this is powered by "virtual threads" (project loom), if + * the runtime supports it (Java 21+). Otherwise, it's an unlimited "cached" + * thread-pool. On top of JavaScript, blocking I/O operations are not possible + * in the browser, and discouraged in Node.js. JS runtimes don't have + * multi-threading with shared-memory concurrency, so this will be just a plain + * executor. + */ +public expect val SharedIOExecutor: Executor + +/** + * An [Executor] that runs tasks on the current thread. + * + * Uses a [trampoline](https://en.wikipedia.org/wiki/Trampoline_(computing)) + * to ensure that recursive calls don't blow the stack. + * + * Using this executor is useful for making asynchronous callbacks stack-safe. + * Note, however, that the tasks get executed on the current thread, immediately, + * even if the implementation guards against stack overflows. + */ +public expect val TrampolineExecutor: Executor diff --git a/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTestUtils.kt b/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTestUtils.kt new file mode 100644 index 0000000..aa186ec --- /dev/null +++ b/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTestUtils.kt @@ -0,0 +1,20 @@ +package org.funfix.tasks.kotlin + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.withContext +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +interface AsyncTestUtils { + fun runTest( + context: CoroutineContext = EmptyCoroutineContext, + testBody: suspend TestScope.() -> Unit + ) { + kotlinx.coroutines.test.runTest(context) { + withContext(Dispatchers.Unconfined) { + testBody() + } + } + } +} diff --git a/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTests.kt b/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTests.kt new file mode 100644 index 0000000..4e02a99 --- /dev/null +++ b/tasks-kotlin/src/commonTest/kotlin/org/funfix/tasks/kotlin/AsyncTests.kt @@ -0,0 +1,135 @@ +//package org.funfix.tasks.kotlin +// +//import kotlinx.coroutines.yield +//import kotlin.test.Test +//import kotlin.test.assertEquals +//import kotlin.test.fail +// +//class AsyncTests: AsyncTestUtils { +// @Test +// fun createAsync() = runTest { +// val task = taskFromAsync { executor, callback -> +// executor.execute { +// callback(Outcome.Success(1 + 1)) +// } +// EmptyCancellable +// } +// +// val r = task.executeSuspended() +// assertEquals(2, r) +// } +// +// @Test +// fun fromSuspendedHappy() = runTest { +// val task = taskFromSuspended { +// yield() +// 1 + 1 +// } +// +// val r = task.executeSuspended() +// assertEquals(2, r) +// } +// +// @Test +// fun fromSuspendedFailure() = runTest { +// val e = RuntimeException("Boom") +// val task = taskFromSuspended { +// yield() +// throw e +// } +// +// try { +// task.executeSuspended() +// fail("Should have thrown") +// } catch (e: RuntimeException) { +// assertEquals("Boom", e.message) +// } +// } +// +// @Test +// fun simpleSuspendedChaining() = runTest { +// val task = taskFromSuspended { +// yield() +// 1 + 1 +// } +// +// val task2 = taskFromSuspended { +// yield() +// task.executeSuspended() + 1 +// } +// +// val r = task2.executeSuspended() +// assertEquals(3, r) +// } +// +// @Test +// fun fiberChaining() = runTest { +// val task = taskFromSuspended { +// yield() +// 1 + 1 +// } +// +// val task2 = taskFromSuspended { +// yield() +// task.executeFiber().awaitSuspended() + 1 +// } +// +// val r = task2.executeSuspended() +// assertEquals(3, r) +// } +// +// @Test +// fun complexChaining() = runTest { +// val task = taskFromSuspended { +// yield() +// 1 + 1 +// } +// +// val task2 = taskFromSuspended { +// yield() +// task.executeSuspended() + 1 +// } +// +// val task3 = taskFromSuspended { +// yield() +// task2.executeFiber().awaitSuspended() + 1 +// } +// +// val task4 = taskFromSuspended { +// yield() +// val deferred = async { task3.executeSuspended() } +// deferred.await() + 1 +// } +// +// val r = task4.executeSuspended() +// assertEquals(5, r) +// } +// +// @Test +// fun cancellation() = runTest { +// val lock = Mutex() +// val latch = CompletableDeferred() +// val wasCancelled = CompletableDeferred() +// lock.lock() +// +// val job = async { +// taskFromSuspended { +// yield() +// latch.complete(Unit) +// try { +// lock.lock() +// } finally { +// wasCancelled.complete(Unit) +// lock.unlock() +// } +// }.executeSuspended() +// } +// +// withTimeout(5000) { latch.await() } +// job.cancel() +// +// withTimeout(5000) { +// wasCancelled.await() +// } +// } +//} diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Cancellable.js.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Cancellable.js.kt new file mode 100644 index 0000000..a90c1e0 --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Cancellable.js.kt @@ -0,0 +1,46 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +public actual fun interface Cancellable { + public actual fun cancel() + + public companion object { + public val empty: Cancellable = + Cancellable {} + } +} + +internal class MutableCancellable : Cancellable { + private var ref: State = State.Active(Cancellable.empty, 0) + + override fun cancel() { + when (val current = ref) { + is State.Active -> { + ref = State.Cancelled + current.token.cancel() + } + State.Cancelled -> return + } + } + + fun set(token: Cancellable) { + while (true) { + when (val current = ref) { + is State.Active -> { + ref = State.Active(token, current.order + 1) + return + } + is State.Cancelled -> { + token.cancel() + return + } + } + } + } + + private sealed interface State { + data class Active(val token: Cancellable, val order: Int) : State + data object Cancelled : State + } +} diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/CancellablePromise.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/CancellablePromise.kt new file mode 100644 index 0000000..c77e67f --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/CancellablePromise.kt @@ -0,0 +1,44 @@ +package org.funfix.tasks.kotlin + +import kotlin.js.Promise + +/** + * This is a wrapper around a JavaScript [Promise] with a + * [Cancellable] reference attached. + * + * A standard JavaScript [Promise] is not connected to its + * asynchronous task and cannot be cancelled. Thus, if we want to cancel + * a task, we need to keep a reference to a [Cancellable] object that + * can do the job. + * + * Contract: + * - [join] completes regardless of how the underlying computation ends, + * including when it gets cancelled via [cancellable]. + * - [join] does not reveal any outcome; it is only a completion signal. + * + * Example: + * ```kotlin + * val promise = Promise { resolve, _ -> resolve(1) } + * val join = Promise { resolve, _ -> + * promise.then({ resolve(Unit) }, { resolve(Unit) }) + * } + * val cp = CancellablePromise(promise, join, Cancellable.empty) + * cp.cancelAndJoin().then { /* completion observed */ } + * ``` + */ +public data class CancellablePromise( + val promise: Promise, + val join: Promise, + val cancellable: Cancellable +) { + /** + * Triggers cancellation and then waits for [join] to complete. + * + * Note that [join] completes regardless of whether [promise] + * resolves, rejects, or the computation is cancelled. + */ + public fun cancelAndJoin(): Promise { + cancellable.cancel() + return join + } +} diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Task.js.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Task.js.kt new file mode 100644 index 0000000..b61706c --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/Task.js.kt @@ -0,0 +1,259 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +import kotlin.js.Promise + +public actual class PlatformTask( + private val f: (Executor, (Outcome) -> Unit) -> Cancellable +) { + public operator fun invoke( + executor: Executor, + callback: (Outcome) -> Unit + ): Cancellable = + f(executor, callback) +} + +public actual value class Task public actual constructor( + public actual val asPlatform: PlatformTask +) { + public actual companion object +} + +public actual fun Task.runAsync( + executor: Executor?, + callback: (Outcome) -> Unit +): Cancellable { + val protected = callback.protect() + try { + return asPlatform.invoke( + executor ?: SharedIOExecutor, + protected + ) + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + protected(Outcome.failure(e)) + return Cancellable.empty + } +} + +public actual fun Task.Companion.fromAsync( + start: (Executor, Callback) -> Cancellable +): Task = + Task(PlatformTask { executor, cb -> + val cRef = MutableCancellable() + TrampolineExecutor.execute { + cRef.set(start(executor, cb)) + } + cRef + }) + +internal fun Callback.protect(): Callback { + var isWaiting = true + return { o -> + if (o is Outcome.Failure) { + UncaughtExceptionHandler.logOrRethrow(o.exception) + } + if (isWaiting) { + isWaiting = false + TrampolineExecutor.execute { + this@protect.invoke(o) + } + } + } +} + +public actual fun Task.ensureRunningOnExecutor(executor: Executor?): Task = + Task(PlatformTask { injectedExecutor, callback -> + val ec = executor ?: injectedExecutor + val cRef = MutableCancellable() + ec.execute { + val c = this@ensureRunningOnExecutor.asPlatform.invoke(ec, callback) + cRef.set(c) + } + cRef + }) + +private fun promiseFailure(cause: Any?): Throwable = + when (cause) { + is Throwable -> cause + null -> RuntimeException("Promise rejected") + else -> RuntimeException(cause.toString()) + } + +/** + * Creates a task from a JavaScript [Promise] builder. + * + * Contract: + * - The resulting task is not cancellable; cancellation does not + * affect the underlying promise. + * - Completion mirrors the promise outcome: resolve maps to + * [Outcome.Success], reject maps to [Outcome.Failure]. + * + * Example: + * ```kotlin + * val task = Task.fromPromise { + * Promise { resolve, _ -> resolve(1) } + * } + * ``` + */ +public fun Task.Companion.fromPromise( + builder: () -> Promise +): Task = + Task.fromAsync { _, callback -> + var isDone = false + try { + val promise = builder() + promise.then( + { value -> + if (!isDone) { + isDone = true + callback(Outcome.Success(value)) + } + }, + { error -> + if (!isDone) { + isDone = true + callback(Outcome.Failure(promiseFailure(error))) + } + } + ) + } catch (e: Throwable) { + callback(Outcome.Failure(e)) + } + Cancellable.empty + } + +/** + * Creates a task from a [CancellablePromise] builder. + * + * Contract: + * - Cancellation triggers the provided [Cancellable], but the resulting + * task only completes when [CancellablePromise.join] completes. + * - If [CancellablePromise.join] rejects, the task fails with that error. + * - Otherwise, if cancellation was requested, the task completes with + * [Outcome.Cancellation]. + * - Otherwise, the task mirrors [CancellablePromise.promise]. + * + * Example: + * ```kotlin + * val task = Task.fromCancellablePromise { + * val promise = Promise { resolve, _ -> resolve(1) } + * val join = Promise { resolve, _ -> + * promise.then({ resolve(Unit) }, { resolve(Unit) }) + * } + * CancellablePromise(promise, join, Cancellable.empty) + * } + * ``` + */ +public fun Task.Companion.fromCancellablePromise( + builder: () -> CancellablePromise +): Task = + Task.fromAsync { _, callback -> + var isDone = false + var isCancelled = false + var token: Cancellable = Cancellable.empty + try { + val value = builder() + token = value.cancellable + value.join.then( + { + if (!isDone) { + if (isCancelled) { + isDone = true + callback(Outcome.Cancellation) + } else { + value.promise.then( + { result -> + if (!isDone) { + isDone = true + callback(Outcome.Success(result)) + } + }, + { error -> + if (!isDone) { + isDone = true + callback(Outcome.Failure(promiseFailure(error))) + } + } + ) + } + } + }, + { error -> + if (!isDone) { + isDone = true + callback(Outcome.Failure(promiseFailure(error))) + } + } + ) + } catch (e: Throwable) { + callback(Outcome.Failure(e)) + } + Cancellable { + isCancelled = true + token.cancel() + } + } + +/** + * Executes the task and returns its result as a JavaScript [Promise]. + * + * Contract: + * - [Outcome.Success] resolves the promise. + * - [Outcome.Failure] rejects the promise with the original exception. + * - [Outcome.Cancellation] rejects with [TaskCancellationException]. + * + * Example: + * ```kotlin + * Task.fromAsync { _, cb -> + * cb(Outcome.Success(1)) + * Cancellable.empty + * }.runToPromise() + * ``` + */ +public fun Task.runToPromise(): Promise = + Promise { resolve, reject -> + runAsync { outcome -> + when (outcome) { + is Outcome.Success -> resolve(outcome.value) + is Outcome.Failure -> reject(outcome.exception) + is Outcome.Cancellation -> reject(TaskCancellationException()) + } + } + } + +/** + * Executes the task and returns a [CancellablePromise]. + * + * The resulting [CancellablePromise.join] completes regardless of whether + * the task succeeds, fails, or is cancelled. + * + * Example: + * ```kotlin + * val cp = Task.fromAsync { _, cb -> + * cb(Outcome.Success(1)) + * Cancellable.empty + * }.runToCancellablePromise() + * cp.cancelAndJoin() + * ``` + */ +public fun Task.runToCancellablePromise(): CancellablePromise { + var token: Cancellable = Cancellable.empty + val promise = Promise { resolve, reject -> + token = runAsync { outcome -> + when (outcome) { + is Outcome.Success -> resolve(outcome.value) + is Outcome.Failure -> reject(outcome.exception) + is Outcome.Cancellation -> reject(TaskCancellationException()) + } + } + } + val join = Promise { resolve, _ -> + promise.then( + { resolve(Unit) }, + { resolve(Unit) } + ) + } + return CancellablePromise(promise, join, Cancellable { token.cancel() }) +} diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/exceptions.js.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/exceptions.js.kt new file mode 100644 index 0000000..bb51e11 --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/exceptions.js.kt @@ -0,0 +1,13 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +public actual open class TaskCancellationException public actual constructor( + message: String? +): Exception(message) { + public actual constructor() : this(null) +} + +public actual class FiberNotCompletedException + public actual constructor(): Exception("Fiber not completed yet") + diff --git a/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/executors.js.kt b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/executors.js.kt new file mode 100644 index 0000000..ca602ae --- /dev/null +++ b/tasks-kotlin/src/jsMain/kotlin/org/funfix/tasks/kotlin/executors.js.kt @@ -0,0 +1,129 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +import org.w3c.dom.WindowOrWorkerGlobalScope +import kotlin.js.Promise + +public actual fun interface Runnable { + public actual fun run() +} + +public actual fun interface Executor { + public actual fun execute(command: Runnable) +} + +public actual val SharedIOExecutor: Executor + get() = JSExecutor + +public actual val TrampolineExecutor: Executor + get() = Trampoline + +private external val self: dynamic +private external val global: dynamic + +private val globalOrSelfDynamic = + (self ?: global)!! +private val globalOrSelf = + globalOrSelfDynamic.unsafeCast() + +private object JSExecutor: Executor { + class NotSupported(execType: ExecType): Exception( + "Executor type $execType is not supported on this runtime" + ) + + private var execType = + if (globalOrSelfDynamic.setInterval != null) ExecType.ViaSetInterval + else if (globalOrSelfDynamic.setTimeout != null) ExecType.ViaSetTimeout + else ExecType.Trampolined + + inline fun withExecType(execType: ExecType, block: () -> T): T { + when (execType) { + ExecType.ViaSetInterval -> + if (globalOrSelfDynamic.setInterval == null) throw NotSupported(execType) + ExecType.ViaSetTimeout -> + if (globalOrSelfDynamic.setTimeout == null) throw NotSupported(execType) + ExecType.Trampolined -> + Unit + } + val oldRef = this.execType + this.execType = execType + try { + return block() + } finally { + this.execType = oldRef + } + } + + fun yield(): Promise { + return Promise { resolve, _ -> + execute { + resolve(Unit) + } + } + } + + override fun execute(command: Runnable) { + val handler: () -> Unit = { + try { + command.run() + } catch (e: Exception) { + UncaughtExceptionHandler.logOrRethrow(e) + } + } + when (execType) { + ExecType.ViaSetInterval -> + globalOrSelf.setInterval(handler) + ExecType.ViaSetTimeout -> + globalOrSelf.setTimeout(handler, -1) + ExecType.Trampolined -> + Trampoline.execute(command) + } + } + + sealed interface ExecType { + data object ViaSetInterval: ExecType + data object ViaSetTimeout: ExecType + data object Trampolined: ExecType + } +} + +private object Trampoline: Executor { + private var queue: MutableList? = null + + private fun eventLoop() { + while (true) { + val current = queue + if (current.isNullOrEmpty()) { + return + } + val next = current.removeFirstOrNull() + try { + next?.run() + } catch (e: Exception) { + UncaughtExceptionHandler.logOrRethrow(e) + } + } + } + + override fun execute(command: Runnable) { + val current = queue ?: mutableListOf() + current.add(command) + queue = current + try { + eventLoop() + } finally { + queue = null + } + } +} + +public actual object UncaughtExceptionHandler { + public actual fun rethrowIfFatal(e: Throwable) { + // Can we do something here? + } + + public actual fun logOrRethrow(e: Throwable) { + console.error(e) + } +} diff --git a/tasks-kotlin/src/jsTest/kotlin/org/funfix/tasks/kotlin/SampleJsTest.kt b/tasks-kotlin/src/jsTest/kotlin/org/funfix/tasks/kotlin/SampleJsTest.kt new file mode 100644 index 0000000..8b67470 --- /dev/null +++ b/tasks-kotlin/src/jsTest/kotlin/org/funfix/tasks/kotlin/SampleJsTest.kt @@ -0,0 +1,11 @@ +package org.funfix.tasks.kotlin + +import kotlin.test.Test +import kotlin.test.assertEquals + +class SampleJsTest { + @Test + fun sampleJsTest() { + assertEquals(4, 2 + 2) + } +} diff --git a/tasks-kotlin/src/jsTest/kotlin/org/funfix/tests/TaskFromPromiseJsTest.kt b/tasks-kotlin/src/jsTest/kotlin/org/funfix/tests/TaskFromPromiseJsTest.kt new file mode 100644 index 0000000..68c057c --- /dev/null +++ b/tasks-kotlin/src/jsTest/kotlin/org/funfix/tests/TaskFromPromiseJsTest.kt @@ -0,0 +1,104 @@ +package org.funfix.tests + +import kotlin.js.Promise +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.yield +import org.funfix.tasks.kotlin.Cancellable +import org.funfix.tasks.kotlin.CancellablePromise +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromCancellablePromise +import org.funfix.tasks.kotlin.fromPromise +import org.funfix.tasks.kotlin.runAsync + +class TaskFromPromiseJsTest { + private fun joinOf(promise: Promise): Promise = + Promise { resolve, _ -> + promise.then( + { resolve(Unit) }, + { resolve(Unit) } + ) + } + + @Test + fun `fromPromise (success)`() = runTest { + val task = Task.fromPromise { + Promise { resolve, _ -> resolve(1) } + } + val deferred = CompletableDeferred>() + task.runAsync { outcome -> deferred.complete(outcome) } + assertEquals(Outcome.Success(1), deferred.await()) + } + + @Test + fun `fromPromise (failure)`() = runTest { + val ex = RuntimeException("Boom!") + val task = Task.fromPromise { + Promise { _, reject -> reject(ex) } + } + val deferred = CompletableDeferred>() + task.runAsync { outcome -> deferred.complete(outcome) } + assertEquals(Outcome.Failure(ex), deferred.await()) + } + + @Test + fun `fromPromise (cancellation waits for completion)`() = runTest { + lateinit var resolve: (Int) -> Unit + val task = Task.fromPromise { + Promise { res, _ -> resolve = res } + } + val deferred = CompletableDeferred>() + val cancel = task.runAsync { outcome -> deferred.complete(outcome) } + cancel.cancel() + yield() + assertFalse(deferred.isCompleted) + resolve(1) + assertEquals(Outcome.Success(1), deferred.await()) + } + + @Test + fun `fromCancellablePromise (success)`() = runTest { + val task = Task.fromCancellablePromise { + val promise = Promise { resolve, _ -> resolve(1) } + CancellablePromise(promise, joinOf(promise), Cancellable.empty) + } + val deferred = CompletableDeferred>() + task.runAsync { outcome -> deferred.complete(outcome) } + assertEquals(Outcome.Success(1), deferred.await()) + } + + @Test + fun `fromCancellablePromise (failure)`() = runTest { + val ex = RuntimeException("Boom!") + val task = Task.fromCancellablePromise { + val promise = Promise { _, reject -> reject(ex) } + CancellablePromise(promise, joinOf(promise), Cancellable.empty) + } + val deferred = CompletableDeferred>() + task.runAsync { outcome -> deferred.complete(outcome) } + assertEquals(Outcome.Failure(ex), deferred.await()) + } + + @Test + fun `fromCancellablePromise (cancellation waits and triggers token)`() = runTest { + lateinit var resolve: (Int) -> Unit + var cancelled = false + val task = Task.fromCancellablePromise { + val promise = Promise { res, _ -> resolve = res } + CancellablePromise(promise, joinOf(promise), Cancellable { cancelled = true }) + } + val deferred = CompletableDeferred>() + val cancel = task.runAsync { outcome -> deferred.complete(outcome) } + cancel.cancel() + yield() + assertTrue(cancelled) + assertFalse(deferred.isCompleted) + resolve(1) + assertEquals(Outcome.Cancellation, deferred.await()) + } +} diff --git a/tasks-kotlin/src/jsTest/kotlin/org/funfix/tests/TaskRunToPromiseJsTest.kt b/tasks-kotlin/src/jsTest/kotlin/org/funfix/tests/TaskRunToPromiseJsTest.kt new file mode 100644 index 0000000..0d2afc8 --- /dev/null +++ b/tasks-kotlin/src/jsTest/kotlin/org/funfix/tests/TaskRunToPromiseJsTest.kt @@ -0,0 +1,87 @@ +package org.funfix.tests + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.test.runTest +import org.funfix.tasks.kotlin.Cancellable +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.TaskCancellationException +import org.funfix.tasks.kotlin.fromAsync +import org.funfix.tasks.kotlin.runToCancellablePromise +import org.funfix.tasks.kotlin.runToPromise + +class TaskRunToPromiseJsTest { + @Test + fun `runToPromise (success)`() = runTest { + val task = Task.fromAsync { _, cb -> + cb(Outcome.Success(1)) + Cancellable.empty + } + val deferred = CompletableDeferred() + task.runToPromise().then({ value -> + deferred.complete(value) + }, { error -> + deferred.completeExceptionally(error) + }) + assertEquals(1, deferred.await()) + } + + @Test + fun `runToPromise (failure)`() = runTest { + val ex = RuntimeException("Boom!") + val task = Task.fromAsync { _, cb -> + cb(Outcome.Failure(ex)) + Cancellable.empty + } + val deferred = CompletableDeferred() + task.runToPromise().then({ _ -> + deferred.complete(RuntimeException("Unexpected")) + }, { error -> + deferred.complete(error) + }) + assertEquals(ex, deferred.await()) + } + + @Test + fun `runToPromise (cancellation)`() = runTest { + val task = Task.fromAsync { _, cb -> + cb(Outcome.Cancellation) + Cancellable.empty + } + val deferred = CompletableDeferred() + task.runToPromise().then({ _ -> + deferred.complete(RuntimeException("Unexpected")) + }, { error -> + deferred.complete(error) + }) + val error = deferred.await() + assertTrue(error is TaskCancellationException) + } + + @Test + fun `runToCancellablePromise cancelAndJoin`() = runTest { + val task = Task.fromAsync { _, cb -> + Cancellable { + cb(Outcome.Cancellation) + } + } + val cp = task.runToCancellablePromise() + val error = CompletableDeferred() + val joined = CompletableDeferred() + cp.promise.then({ _ -> + error.complete(RuntimeException("Unexpected")) + }, { err -> + error.complete(err) + }) + cp.cancelAndJoin().then({ + joined.complete(Unit) + }, { + joined.complete(Unit) + }) + joined.await() + assertTrue(error.await() is TaskCancellationException) + } +} diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Fiber.jvm.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Fiber.jvm.kt new file mode 100644 index 0000000..1d5d358 --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Fiber.jvm.kt @@ -0,0 +1,197 @@ +@file:JvmName("FiberJvmKt") + +package org.funfix.tasks.kotlin + +import org.jetbrains.annotations.Blocking +import org.jetbrains.annotations.NonBlocking +import java.util.concurrent.ExecutionException +import java.util.concurrent.TimeoutException +import kotlin.jvm.Throws +import kotlin.time.Duration +import kotlin.time.toJavaDuration + +public typealias PlatformFiber = org.funfix.tasks.jvm.Fiber + +/** + * A fiber is a running task being executed concurrently, and that can be + * joined/awaited or cancelled. + * + * This is the equivalent of Kotlin's `Deferred` type. + * + * This is designed to be a compile-time type that's going to be erased at + * runtime. Therefore, for the JVM at least, when using it in your APIs, it + * won't pollute it with Kotlin-specific wrappers. + */ +@JvmInline +public value class Fiber public constructor( + public val asPlatform: PlatformFiber +): Cancellable { + /** + * Cancels the fiber, which will eventually stop the running fiber (if + * it's still running), completing it via "cancellation". + * + * This manifests either in a [TaskCancellationException] being thrown by + * [resultOrThrow], or in the completion callback being triggered. + */ + @NonBlocking + override fun cancel(): Unit = asPlatform.cancel() +} + +/** + * Converts the source to a [Kotlin Fiber][Fiber]. + * + * E.g., can convert from a `jvm.Fiber` to a `kotlin.Fiber`. + */ +public fun PlatformFiber.asKotlin(): Fiber = + Fiber(this) + +/** + * Returns the result of the completed fiber. + * + * This method does not block for the result. In case the fiber is not + * completed, it throws [FiberNotCompletedException]. Therefore, by contract, + * it should be called only after the fiber was "joined". + * + * @return the result of the concurrent task, if successful. + * @throws TaskCancellationException if the task was cancelled concurrently, + * thus being completed via cancellation. + * @throws FiberNotCompletedException if the fiber is not completed yet. + * @throws Throwable if the task finished with an exception. + */ +public val Fiber.resultOrThrow: T + @NonBlocking + @Throws(TaskCancellationException::class, FiberNotCompletedException::class) + get() = asPlatform.resultOrThrow + +/** + * Returns the [Outcome] of the completed fiber, or `null` in case the + * fiber is not completed yet. + * + * This method does not block for the result. In case the fiber is not + * completed, it returns `null`. Therefore, it should be called after + * the fiber was "joined". + */ +public val Fiber.outcomeOrNull: Outcome? get() = + try { + Outcome.Success(asPlatform.resultOrThrow) + } catch (e: TaskCancellationException) { + Outcome.Cancellation + } catch (e: ExecutionException) { + Outcome.Failure(e.cause ?: e) + } catch (e: Throwable) { + UncaughtExceptionHandler.rethrowIfFatal(e) + Outcome.Failure(e) + } + +/** + * Waits until the fiber completes, and then runs the given callback to + * signal its completion. + * + * Completion includes cancellation. Triggering [Fiber.cancel] before + * [joinAsync] will cause the fiber to get cancelled, and then the + * "join" back-pressures on cancellation. + * + * @param onComplete is the callback to run when the fiber completes + * (successfully, or with failure, or cancellation) + */ +@NonBlocking +public fun Fiber.joinAsync(onComplete: Runnable): Cancellable = + asPlatform.joinAsync(onComplete) + +/** + * Waits until the fiber completes, and then runs the given callback + * to signal its completion. + * + * This method can be executed as many times as necessary, with the + * result of the `Fiber` being memoized. It can also be executed + * after the fiber has completed, in which case the callback will be + * executed immediately. + * + * @param callback will be called with the result when the fiber completes. + * + * @return a [Cancellable] that can be used to unregister the callback, + * in case the caller is no longer interested in the result. Note this + * does not cancel the fiber itself. + */ +@NonBlocking +public fun Fiber.awaitAsync(callback: Callback): Cancellable = + asPlatform.awaitAsync(callback.asJava()) + +/** + * Blocks the current thread until the fiber completes. + * + * This method does not return the outcome of the fiber. To check + * the outcome, use [resultOrThrow]. + * + * @throws InterruptedException if the current thread is interrupted, which + * will just stop waiting for the fiber, but will not cancel the running + * task. + */ +@Blocking +@Throws(InterruptedException::class) +public fun Fiber.joinBlocking(): Unit = + asPlatform.joinBlocking() + +/** + * Blocks the current thread until the fiber completes, then returns the + * result of the fiber. + * + * @throws InterruptedException if the current thread is interrupted, which + * will just stop waiting for the fiber, but will not cancel the running task. + * + * @throws TaskCancellationException if the fiber was cancelled concurrently. + * + * @throws Throwable if the task failed with an exception. + */ +@Blocking +@Throws(InterruptedException::class, TaskCancellationException::class) +public fun Fiber.awaitBlocking(): T = + try { + asPlatform.awaitBlocking() + } catch (e: ExecutionException) { + throw e.cause ?: e + } + +/** + * Blocks the current thread until the fiber completes, or until the + * timeout is reached. + * + * This method does not return the outcome of the fiber. To check the + * outcome, use [resultOrThrow]. + * + * @throws InterruptedException if the current thread is interrupted, which + * will just stop waiting for the fiber, but will not cancel the running + * task. + * + * @throws TimeoutException if the timeout is reached before the fiber + * completes. + */ +@Blocking +@Throws(InterruptedException::class, TimeoutException::class) +public fun Fiber.joinBlockingTimed(timeout: Duration): Unit = + asPlatform.joinBlockingTimed(timeout.toJavaDuration()) + +/** + * Blocks the current thread until the fiber completes, then returns the result of the fiber. + * + * @param timeout the maximum time to wait for the fiber to complete, before + * throwing a [TimeoutException]. + * + * @return the result of the fiber, if successful. + * + * @throws InterruptedException if the current thread is interrupted, which + * will just stop waiting for the fiber, but will not cancel the running + * task. + * @throws TimeoutException if the timeout is reached before the fiber completes. + * @throws TaskCancellationException if the fiber was cancelled concurrently. + * @throws Throwable if the task failed with an exception. + */ +@Blocking +@Throws(InterruptedException::class, TaskCancellationException::class, TimeoutException::class) +public fun Fiber.awaitBlockingTimed(timeout: Duration): T = + try { + asPlatform.awaitBlockingTimed(timeout.toJavaDuration()) + } catch (e: ExecutionException) { + throw e.cause ?: e + } + diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Task.jvm.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Task.jvm.kt new file mode 100644 index 0000000..0c8f59c --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/Task.jvm.kt @@ -0,0 +1,220 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") +@file:JvmName("TaskJvmKt") + +package org.funfix.tasks.kotlin + +import org.jetbrains.annotations.Blocking +import org.jetbrains.annotations.NonBlocking +import java.util.concurrent.CompletionStage +import java.util.concurrent.ExecutionException +import java.util.concurrent.Future +import java.util.concurrent.TimeoutException +import kotlin.jvm.Throws +import kotlin.time.Duration +import kotlin.time.toJavaDuration + +public actual typealias PlatformTask = org.funfix.tasks.jvm.Task + +@JvmInline +public actual value class Task public actual constructor( + public actual val asPlatform: PlatformTask +) { + /** + * Converts this task to a `jvm.Task`. + */ + public val asJava: PlatformTask get() = asPlatform + + public actual companion object +} + +@NonBlocking +public actual fun Task.ensureRunningOnExecutor(executor: Executor?): Task = + Task(when (executor) { + null -> asPlatform.ensureRunningOnExecutor() + else -> asPlatform.ensureRunningOnExecutor(executor) + }) + +@NonBlocking +public actual fun Task.runAsync( + executor: Executor?, + callback: Callback +): Cancellable = + when (executor) { + null -> asPlatform.runAsync(callback.asJava()) + else -> asPlatform.runAsync(executor, callback.asJava()) + } + +/** + * Executes the task concurrently and returns a [Fiber] that can be + * used to wait for the result or cancel the task. + * + * Similar to [runAsync], this method starts the execution on a different thread. + * + * @param executor is the [Executor] that may be used to run the task. + * + * @return a [Fiber] that can be used to wait for the outcome, + * or to cancel the running fiber. + */ +@NonBlocking +public fun Task.runFiber(executor: Executor? = null): Fiber = + Fiber(when (executor) { + null -> asPlatform.runFiber() + else -> asPlatform.runFiber(executor) + }) + +/** + * Executes the task and blocks until it completes, or the current thread gets + * interrupted (in which case the task is also cancelled). + * + * Given that the intention is to block the current thread for the result, the + * task starts execution on the current thread. + * + * @param executor the [Executor] that may be used to run the task. + * @return the successful result of the task. + * + * @throws InterruptedException if the current thread is interrupted, which also + * cancels the running task. Note that on interruption, the running concurrent + * task must also be interrupted, as this method always blocks for its + * interruption or completion. + * + * @throws Throwable if the task fails with an exception + */ +@Blocking +@Throws(InterruptedException::class) +public fun Task.runBlocking(executor: Executor? = null): T = + try { + when (executor) { + null -> asPlatform.runBlocking() + else -> asPlatform.runBlocking(executor) + } + } catch (e: ExecutionException) { + throw e.cause ?: e + } + +/** + * Executes the task and blocks until it completes, the timeout is reached, or + * the current thread is interrupted. + * + * **EXECUTION MODEL:** Execution starts on a different thread, by necessity, + * otherwise the execution could block the current thread indefinitely, without + * the possibility of interrupting the task after the timeout occurs. + * + * @param timeout the maximum time to wait for the task to complete before + * throwing a [TimeoutException]. + * + * @param executor the [Executor] that may be used to run the task. If one isn't + * provided, the execution will use [SharedIOExecutor] as the default. + * + * @return the successful result of the task. + * + * @throws InterruptedException if the current thread is interrupted. The + * running task is also cancelled, and this method does not return until + * `onCancel` is signaled. + * + * @throws TimeoutException if the task doesn't complete within the specified + * timeout. The running task is also cancelled on timeout, and this method does + * not return until `onCancel` is signaled. + * + * @throws Throwable if the task fails with an exception. + */ +@Blocking +@Throws(InterruptedException::class, TimeoutException::class) +public fun Task.runBlockingTimed( + timeout: Duration, + executor: Executor? = null +): T = + try { + when (executor) { + null -> asPlatform.runBlockingTimed(timeout.toJavaDuration()) + else -> asPlatform.runBlockingTimed(executor, timeout.toJavaDuration()) + } + } catch (e: ExecutionException) { + throw e.cause ?: e + } + +// Builders + +@NonBlocking +public actual fun Task.Companion.fromAsync( + start: (Executor, Callback) -> Cancellable +): Task = + Task(PlatformTask.fromAsync { executor, cb -> + start(executor, cb.asKotlin()) + }) + +/** + * Creates a task from a function executing blocking I/O. + * + * This uses Java's interruption protocol (i.e., [Thread.interrupt]) for + * cancelling the task. + */ +@NonBlocking +public fun Task.Companion.fromBlockingIO(block: () -> T): Task = + Task(PlatformTask.fromBlockingIO(block)) + +/** + * Creates a task from a [Future] builder. + * + * This is compatible with Java's interruption protocol and [Future.cancel], + * with the resulting task being cancellable. + * + * **NOTE:** Use [fromCompletionStage] for directly converting + * [java.util.concurrent.CompletableFuture] builders, because it is not possible + * to cancel such values, and the logic needs to reflect it. Better yet, use + * [fromCancellableFuture] for working with [CompletionStage] values that can be + * cancelled. + * + * @param builder is the function that will create the [Future] upon this task's + * execution. + * + * @return a new task that will complete with the result of the created `Future` + * upon execution + * + * @see fromCompletionStage + * @see fromCancellableFuture + */ +@NonBlocking +public fun Task.Companion.fromBlockingFuture(builder: () -> Future): Task = + Task(PlatformTask.fromBlockingFuture(builder)) + +/** + * Creates tasks from a builder of [CompletionStage]. + * + * **NOTE:** `CompletionStage` isn't cancellable, and the resulting task should + * reflect this (i.e., on cancellation, the listener should not receive an + * `onCancel` signal until the `CompletionStage` actually completes). + * + * Prefer using [fromCancellableFuture] for working with [CompletionStage] + * values that can be cancelled. + * + * @param builder is the function that will create the [CompletionStage] + * value. It's a builder because `Task` values are cold values + * (lazy, not executed yet). + * + * @return a new task that upon execution will complete with the result of + * the created `CancellableCompletionStage` + * + * @see fromCancellableFuture + */ +@NonBlocking +public fun Task.Companion.fromCompletionStage(builder: () -> CompletionStage): Task = + Task(PlatformTask.fromCompletionStage(builder)) + +/** + * Creates tasks from a builder of [CancellableFuture]. + * + * This is the recommended way to work with [CompletionStage] builders, because + * cancelling such values (e.g., [java.util.concurrent.CompletableFuture]) + * doesn't work for cancelling the connecting computation. As such, the user + * should provide an explicit [Cancellable] token that can be used. + * + * @param builder the function that will create the [CancellableFuture] value. + * It's a builder because [Task] values are cold values (lazy, not executed + * yet). + * + * @return a new task that upon execution will complete with the result of the + * created [CancellableFuture] + */ +@NonBlocking +public fun Task.Companion.fromCancellableFuture(builder: () -> CancellableFuture): Task = + Task(PlatformTask.fromCancellableFuture(builder)) diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.jvm.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.jvm.kt new file mode 100644 index 0000000..9d7679f --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/UncaughtExceptionHandler.jvm.kt @@ -0,0 +1,16 @@ +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +/** + * Utilities for handling uncaught exceptions. + */ +public actual object UncaughtExceptionHandler { + public actual fun rethrowIfFatal(e: Throwable) { + org.funfix.tasks.jvm.UncaughtExceptionHandler.rethrowIfFatal(e) + } + + public actual fun logOrRethrow(e: Throwable) { + org.funfix.tasks.jvm.UncaughtExceptionHandler.logOrRethrow(e) + } +} diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/aliases.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/aliases.kt new file mode 100644 index 0000000..2ef84a3 --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/aliases.kt @@ -0,0 +1,26 @@ +@file:JvmName("AliasesKt") +@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") + +package org.funfix.tasks.kotlin + +public actual typealias Cancellable = org.funfix.tasks.jvm.Cancellable + +/** + * A `CancellableFuture` is a tuple of a `CompletableFuture` and a `Cancellable` + * reference. + * + * It's used to model the result of asynchronous computations that can be + * cancelled. Needed because `CompletableFuture` doesn't actually support + * cancellation. It's similar to [Fiber], which should be preferred, because + * it's more principled. `CancellableFuture` is useful for interop with + * Java libraries that use `CompletableFuture`. + */ +public typealias CancellableFuture = org.funfix.tasks.jvm.CancellableFuture + +public actual typealias TaskCancellationException = org.funfix.tasks.jvm.TaskCancellationException + +public actual typealias FiberNotCompletedException = org.funfix.tasks.jvm.Fiber.NotCompletedException + +public actual typealias Runnable = java.lang.Runnable + +public actual typealias Executor = java.util.concurrent.Executor diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/executors.jvm.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/executors.jvm.kt new file mode 100644 index 0000000..1480b94 --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/executors.jvm.kt @@ -0,0 +1,11 @@ +@file:JvmName("ExecutorsJvmKt") + +package org.funfix.tasks.kotlin + +import org.funfix.tasks.jvm.TaskExecutors + +public actual val TrampolineExecutor: Executor + get() = TaskExecutors.trampoline() + +public actual val SharedIOExecutor: Executor + get() = TaskExecutors.sharedBlockingIO() diff --git a/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/internals.kt b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/internals.kt new file mode 100644 index 0000000..5d15ab3 --- /dev/null +++ b/tasks-kotlin/src/jvmMain/kotlin/org/funfix/tasks/kotlin/internals.kt @@ -0,0 +1,26 @@ +@file:JvmName("InternalsJvmKt") + +package org.funfix.tasks.kotlin + +import org.funfix.tasks.jvm.CompletionCallback + + +internal typealias KotlinCallback = (Outcome) -> Unit + +internal fun CompletionCallback.asKotlin(): KotlinCallback = + { outcome -> + when (outcome) { + is Outcome.Success -> this.onSuccess(outcome.value) + is Outcome.Failure -> this.onFailure(outcome.exception) + is Outcome.Cancellation -> this.onCancellation() + } + } + +internal fun KotlinCallback.asJava(): CompletionCallback = + CompletionCallback { outcome -> + when (outcome) { + is org.funfix.tasks.jvm.Outcome.Success -> this@asJava(Outcome.Success(outcome.value)) + is org.funfix.tasks.jvm.Outcome.Failure -> this@asJava(Outcome.Failure(outcome.exception)) + is org.funfix.tasks.jvm.Outcome.Cancellation -> this@asJava(Outcome.Cancellation) + } + } diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskEnsureRunningOnExecutorTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskEnsureRunningOnExecutorTest.kt new file mode 100644 index 0000000..2d68f79 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskEnsureRunningOnExecutorTest.kt @@ -0,0 +1,34 @@ +package org.funfix.tests + +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.ensureRunningOnExecutor +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.runBlocking +import org.junit.jupiter.api.Assertions.assertTrue +import java.util.concurrent.Executors +import kotlin.test.Test +import kotlin.test.assertEquals + +class TaskEnsureRunningOnExecutorTest { + @Test + @Suppress("DEPRECATION") + fun ensureRunningOnExecutorWorks() { + val ex = Executors.newCachedThreadPool { r -> + val th = Thread(r) + th.name = "my-thread-" + th.id + th + } + try { + val t = Task.fromBlockingIO { Thread.currentThread().name } + val n1 = t.runBlocking() + val n2 = t.ensureRunningOnExecutor().runBlocking() + val n3 = t.ensureRunningOnExecutor(ex).runBlocking() + + assertEquals(Thread.currentThread().name, n1) + assertTrue(n2.startsWith("tasks-io-"), "tasks-io") + assertTrue(n3.startsWith("my-thread-"), "my-thread") + } finally { + ex.shutdown() + } + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromAsyncTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromAsyncTest.kt new file mode 100644 index 0000000..79a1d37 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromAsyncTest.kt @@ -0,0 +1,56 @@ +package org.funfix.tests + +import org.funfix.tasks.kotlin.Cancellable +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromAsync +import org.funfix.tasks.kotlin.joinBlocking +import org.funfix.tasks.kotlin.outcomeOrNull +import org.funfix.tasks.kotlin.runBlocking +import org.funfix.tasks.kotlin.runFiber +import java.util.concurrent.CountDownLatch +import kotlin.test.Test +import kotlin.test.assertEquals + +class TaskFromAsyncTest { + @Test + fun `fromAsync happy path`() { + val task = Task.fromAsync { _, cb -> + cb(Outcome.success(1)) + Cancellable.getEmpty() + } + assertEquals(1, task.runBlocking()) + } + + @Test + fun `fromAsync with error`() { + val ex = RuntimeException("Boom!") + val task = Task.fromAsync { _, cb -> + cb(Outcome.failure(ex)) + Cancellable.getEmpty() + } + try { + task.runBlocking() + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `fromAsync can be cancelled`() { + val latch = CountDownLatch(1) + val task = Task.fromAsync { executor, cb -> + executor.execute { + latch.await() + cb(Outcome.Cancellation) + } + Cancellable { + latch.countDown() + } + } + val fiber = task.runFiber() + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromBlockingIOTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromBlockingIOTest.kt new file mode 100644 index 0000000..3493719 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromBlockingIOTest.kt @@ -0,0 +1,42 @@ +package org.funfix.tests + +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.joinBlocking +import org.funfix.tasks.kotlin.outcomeOrNull +import org.funfix.tasks.kotlin.runBlocking +import org.funfix.tasks.kotlin.runFiber +import kotlin.test.Test +import kotlin.test.assertEquals + +class TaskFromBlockingIOTest { + @Test + fun `fromBlockingIO (success)`() { + val task = Task.fromBlockingIO { 1 } + assertEquals(1, task.runBlocking()) + } + + @Test + fun `fromBlockingIO (failure)`() { + val ex = RuntimeException("Boom!") + val task = Task.fromBlockingIO { throw ex } + try { + task.runBlocking() + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `fromBlockingIO (cancellation)`() { + val task: Task = Task.fromBlockingIO { + Thread.sleep(10000) + 1 + } + val fiber = task.runFiber() + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromFutureTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromFutureTest.kt new file mode 100644 index 0000000..e6726a3 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskFromFutureTest.kt @@ -0,0 +1,169 @@ +package org.funfix.tests + +import org.funfix.tasks.jvm.Cancellable +import org.funfix.tasks.jvm.CancellableFuture +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromBlockingFuture +import org.funfix.tasks.kotlin.fromCancellableFuture +import org.funfix.tasks.kotlin.fromCompletionStage +import org.funfix.tasks.kotlin.joinBlocking +import org.funfix.tasks.kotlin.outcomeOrNull +import org.funfix.tasks.kotlin.runBlocking +import org.funfix.tasks.kotlin.runFiber +import java.util.concurrent.Callable +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.fail + +class TaskFromFutureTest { + @Test + fun `fromBlockingFuture (success)`() { + val ec = Executors.newCachedThreadPool() + try { + val task = Task.fromBlockingFuture { + ec.submit(Callable { 1 }) + } + assertEquals(1, task.runBlocking()) + } finally { + ec.shutdown() + } + } + + @Test + fun `fromBlockingFuture (failure)`() { + val ex = RuntimeException("Boom!") + val ec = Executors.newCachedThreadPool() + try { + val task = Task.fromBlockingFuture { + ec.submit { throw ex } + } + try { + task.runBlocking() + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } finally { + ec.shutdown() + } + } + + @Test + fun `fromBlockingFuture (cancellation)`() { + val ec = Executors.newCachedThreadPool() + val wasStarted = CountDownLatch(1) + val wasCancelled = CountDownLatch(1) + try { + val task = Task.fromBlockingFuture { + ec.submit(Callable { + wasStarted.countDown() + try { + Thread.sleep(10000) + 1 + } catch (e: InterruptedException) { + wasCancelled.countDown() + throw e + } + }) + } + val fiber = task.runFiber() + TimedAwait.latchAndExpectCompletion(wasStarted, "wasStarted") + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + TimedAwait.latchAndExpectCompletion(wasCancelled, "wasCancelled") + } finally { + ec.shutdown() + } + } + + @Test + fun `fromCompletionStage (success)`() { + val task = Task.fromCompletionStage { + CompletableFuture.supplyAsync { 1 } + } + assertEquals(1, task.runBlocking()) + } + + @Test + fun `fromCompletionStage (failure)`() { + val ex = RuntimeException("Boom!") + val task = Task.fromCompletionStage { + CompletableFuture.supplyAsync { + throw ex + } + } + try { + task.runBlocking() + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `fromCancellableFuture (success)`() { + val ec = Executors.newCachedThreadPool() + try { + val task = Task.fromCancellableFuture { + CancellableFuture( + CompletableFuture.supplyAsync { 1 }, + Cancellable.getEmpty() + ) + } + assertEquals(1, task.runBlocking()) + } finally { + ec.shutdown() + } + } + + @Test + fun `fromCancellableFuture (failure)`() { + val ex = RuntimeException("Boom!") + val ec = Executors.newCachedThreadPool() + try { + val task = Task.fromCancellableFuture { + CancellableFuture( + CompletableFuture.supplyAsync { + throw ex + }, + Cancellable.getEmpty() + ) + } + try { + task.runBlocking() + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } finally { + ec.shutdown() + } + } + + @Test + fun `fromCancellableFuture (cancellation)`() { + val wasStarted = CountDownLatch(1) + val wasCancelled = CountDownLatch(1) + val task = Task.fromCancellableFuture { + CancellableFuture( + CompletableFuture.supplyAsync { + wasStarted.countDown() + TimedAwait.latchNoExpectations(wasCancelled) + } + ) { + wasCancelled.countDown() + } + } + val fiber = task.runFiber() + TimedAwait.latchAndExpectCompletion(wasStarted, "wasStarted") + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + TimedAwait.latchAndExpectCompletion(wasCancelled, "wasCancelled") + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunAsyncTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunAsyncTest.kt new file mode 100644 index 0000000..be81a16 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunAsyncTest.kt @@ -0,0 +1,105 @@ +package org.funfix.tests + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.atomic.AtomicReference +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.fail +import org.funfix.tasks.jvm.TaskCancellationException +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.runAsync +import org.junit.jupiter.api.Assertions.assertTrue + +class TaskRunAsyncTest { + @Test + fun `runAsync (success)`() { + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + val task = Task.fromBlockingIO { 1 } + task.runAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Success(1), outcomeRef.get()) + assertEquals(1, outcomeRef.get()!!.orThrow) + } + + @Test + fun `runAsync (failure)`() { + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + val ex = RuntimeException("Boom!") + val task = Task.fromBlockingIO { throw ex } + task.runAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Failure(ex), outcomeRef.get()) + try { + outcomeRef.get()!!.orThrow + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runAsync (cancellation)`() { + val latch = CountDownLatch(1) + val wasStarted = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + val task = Task.fromBlockingIO { + wasStarted.countDown() + Thread.sleep(10000) + 1 + } + val cancel = task.runAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(wasStarted, "wasStarted") + cancel.cancel() + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Cancellation, outcomeRef.get()) + try { + outcomeRef.get()!!.orThrow + fail("Expected exception") + } catch (e: TaskCancellationException) { + // expected + } + } + + @Test + @Suppress("DEPRECATION") + fun `runAsync runs with given executor`() { + val ec = Executors.newCachedThreadPool { r -> + val t = Thread(r) + t.isDaemon = true + t.name = "my-thread-${t.id}" + t + } + try { + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + val task = Task.fromBlockingIO { + Thread.currentThread().name + } + task.runAsync(ec) { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + TimedAwait.latchAndExpectCompletion(latch) + assertTrue(outcomeRef.get()!!.orThrow.startsWith("my-thread-")) + } finally { + ec.shutdown() + } + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunBlockingTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunBlockingTest.kt new file mode 100644 index 0000000..5d8ae69 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunBlockingTest.kt @@ -0,0 +1,106 @@ +package org.funfix.tests + +import org.funfix.tasks.kotlin.SharedIOExecutor +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.runBlocking +import org.funfix.tasks.kotlin.runBlockingTimed +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.fail +import kotlin.time.toKotlinDuration + +class TaskRunBlockingTest { + @Test + fun `runBlocking (success)`() { + val task = Task.fromBlockingIO { 1 } + + assertEquals(1, task.runBlocking()) + assertEquals(1, task.runBlocking(SharedIOExecutor)) + } + + @Test + fun `runBlocking (failure)`() { + val ex = RuntimeException("Boom!") + val task = Task.fromBlockingIO { throw ex } + + try { + task.runBlocking() + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + + try { + task.runBlocking(SharedIOExecutor) + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runBlocking (cancellation)`() { + val task: Task = Task.fromBlockingIO { + throw InterruptedException() + } + try { + task.runBlocking() + } catch (e: InterruptedException) { + // expected + } + try { + task.runBlocking(SharedIOExecutor) + } catch (e: InterruptedException) { + // expected + } + } + + @Test + fun `runBlockingTimed (success)`() { + val task = Task.fromBlockingIO { 1 } + + assertEquals(1, task.runBlockingTimed( + TimedAwait.TIMEOUT.toKotlinDuration() + )) + assertEquals(1, task.runBlockingTimed( + TimedAwait.TIMEOUT.toKotlinDuration(), + SharedIOExecutor + )) + } + + @Test + fun `runBlockingTimed (failure)`() { + val ex = RuntimeException("Boom!") + val task = Task.fromBlockingIO { throw ex } + + try { + task.runBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + + try { + task.runBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration(), SharedIOExecutor) + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runBlockingTimed (cancellation)`() { + val task: Task = Task.fromBlockingIO { + throw InterruptedException() + } + try { + task.runBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + fail("Expected exception") + } catch (e: InterruptedException) { + // expected + } + try { + task.runBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration(), SharedIOExecutor) + fail("Expected exception") + } catch (e: InterruptedException) { + // expected + } + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunFiberTest.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunFiberTest.kt new file mode 100644 index 0000000..d4ecd20 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TaskRunFiberTest.kt @@ -0,0 +1,275 @@ +package org.funfix.tests + +import org.funfix.tasks.jvm.TaskCancellationException +import org.funfix.tasks.kotlin.Outcome +import org.funfix.tasks.kotlin.Task +import org.funfix.tasks.kotlin.awaitAsync +import org.funfix.tasks.kotlin.awaitBlocking +import org.funfix.tasks.kotlin.awaitBlockingTimed +import org.funfix.tasks.kotlin.fromBlockingIO +import org.funfix.tasks.kotlin.joinAsync +import org.funfix.tasks.kotlin.joinBlocking +import org.funfix.tasks.kotlin.joinBlockingTimed +import org.funfix.tasks.kotlin.outcomeOrNull +import org.funfix.tasks.kotlin.resultOrThrow +import org.funfix.tasks.kotlin.runFiber +import org.junit.jupiter.api.Test +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference +import kotlin.test.assertEquals +import kotlin.test.fail +import kotlin.time.toKotlinDuration + +class TaskRunFiberTest { + @Test + fun `runFiber + joinAsync (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.joinAsync { + outcomeRef.set(fiber.outcomeOrNull!!) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Success(1), outcomeRef.get()) + } + + @Test + fun `runFiber + joinAsync (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.joinAsync { + outcomeRef.set(fiber.outcomeOrNull!!) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Failure(ex), outcomeRef.get()) + } + + @Test + fun `runFiber + joinAsync (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.joinAsync { + outcomeRef.set(fiber.outcomeOrNull!!) + latch.countDown() + } + + fiber.cancel() + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Cancellation, outcomeRef.get()) + } + + @Test + fun `runFiber + awaitAsync (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.awaitAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Success(1), outcomeRef.get()) + } + + @Test + fun `runFiber + awaitAsync (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.awaitAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Failure(ex), outcomeRef.get()) + } + + @Test + fun `runFiber + awaitAsync (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + val latch = CountDownLatch(1) + val outcomeRef = AtomicReference?>(null) + fiber.awaitAsync { outcome -> + outcomeRef.set(outcome) + latch.countDown() + } + + fiber.cancel() + TimedAwait.latchAndExpectCompletion(latch) + assertEquals(Outcome.Cancellation, outcomeRef.get()) + } + + @Test + fun `runFiber + joinBlocking (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + fiber.joinBlocking() + assertEquals(1, fiber.resultOrThrow) + assertEquals(Outcome.Success(1), fiber.outcomeOrNull) + } + + @Test + fun `runFiber + joinBlocking (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + fiber.joinBlocking() + assertEquals(Outcome.Failure(ex), fiber.outcomeOrNull) + } + + @Test + fun `runFiber + joinBlocking (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + fiber.cancel() + fiber.joinBlocking() + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + } + + @Test + fun `runFiber + awaitBlocking (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + val result = fiber.awaitBlocking() + assertEquals(1, result) + } + + @Test + fun `runFiber + awaitBlocking (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + try { + fiber.awaitBlocking() + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runFiber + awaitBlocking (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + fiber.cancel() + try { + fiber.awaitBlocking() + fail("Expected exception") + } catch (e: TaskCancellationException) { + // expected + } + } + + @Test + fun `runFiber + joinBlockingTimed (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + fiber.joinBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + assertEquals(1, fiber.resultOrThrow) + assertEquals(Outcome.Success(1), fiber.outcomeOrNull) + } + + @Test + fun `runFiber + joinBlockingTimed (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + fiber.joinBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + assertEquals(Outcome.Failure(ex), fiber.outcomeOrNull) + } + + @Test + fun `runFiber + joinBlockingTimed (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + fiber.cancel() + fiber.joinBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + assertEquals(Outcome.Cancellation, fiber.outcomeOrNull) + } + + @Test + fun `runFiber + awaitBlockingTimed (success)`() { + val fiber = Task + .fromBlockingIO { 1 } + .runFiber() + + val result = fiber.awaitBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + assertEquals(1, result) + } + + @Test + fun `runFiber + awaitBlockingTimed (failure)`() { + val ex = RuntimeException("Boom!") + val fiber = Task + .fromBlockingIO { throw ex } + .runFiber() + + try { + fiber.awaitBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + fail("Expected exception") + } catch (e: RuntimeException) { + assertEquals(ex, e) + } + } + + @Test + fun `runFiber + awaitBlockingTimed (cancellation)`() { + val fiber = Task + .fromBlockingIO { Thread.sleep(10000) } + .runFiber() + + fiber.cancel() + try { + fiber.awaitBlockingTimed(TimedAwait.TIMEOUT.toKotlinDuration()) + fail("Expected exception") + } catch (e: TaskCancellationException) { + // expected + } + } +} diff --git a/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TimedAwait.kt b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TimedAwait.kt new file mode 100644 index 0000000..3b89538 --- /dev/null +++ b/tasks-kotlin/src/jvmTest/kotlin/org/funfix/tests/TimedAwait.kt @@ -0,0 +1,31 @@ +package org.funfix.tests + +import java.time.Duration +import java.util.concurrent.* + +object TimedAwait { + val TIMEOUT: Duration = + if (System.getenv("CI") != null) Duration.ofSeconds(20) + else Duration.ofSeconds(10) + + @Throws(InterruptedException::class) + fun latchNoExpectations(latch: CountDownLatch): Boolean = + latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) + + @Throws(InterruptedException::class) + fun latchAndExpectCompletion(latch: CountDownLatch) = latchAndExpectCompletion(latch, "latch") + + @Throws(InterruptedException::class) + fun latchAndExpectCompletion(latch: CountDownLatch, name: String) { + assert(latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { "$name.await" } + } + + @Throws(InterruptedException::class, TimeoutException::class) + fun future(future: Future<*>) { + try { + future.get(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) + } catch (e: ExecutionException) { + throw RuntimeException(e) + } + } +}