From 39eb0b71a210fc49e17c592d62efb80469b4f9b5 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 18 Jun 2026 12:09:46 +0200 Subject: [PATCH] Bump SDK test suite, with new signals tests --- .github/workflows/integration.yaml | 2 +- .../sdk/testservices/TestUtilsServiceImpl.kt | 15 +++--- .../VirtualObjectCommandInterpreterImpl.kt | 50 +++++++++++++++++++ .../contracts/TestUtilsService.kt | 24 +++++++-- .../VirtualObjectCommandInterpreter.kt | 33 ++++++++++++ 5 files changed, 113 insertions(+), 11 deletions(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 393fc83a..39ab6741 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -135,7 +135,7 @@ jobs: - name: Run test tool continue-on-error: ${{ inputs.continueOnError == 'true' }} - uses: restatedev/e2e/sdk-tests@v1.0 + uses: restatedev/e2e/sdk-tests@v2.1 with: envVars: ${{ inputs.envVars }} testArtifactOutput: ${{ inputs.testArtifactOutput != '' && inputs.testArtifactOutput || 'sdk-java-integration-test-report' }} diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/TestUtilsServiceImpl.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/TestUtilsServiceImpl.kt index 398cdcac..c4a29870 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/TestUtilsServiceImpl.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/TestUtilsServiceImpl.kt @@ -12,7 +12,6 @@ import dev.restate.sdk.kotlin.* import dev.restate.sdk.testservices.contracts.* import java.util.* import java.util.concurrent.atomic.AtomicInteger -import kotlin.time.Duration.Companion.milliseconds class TestUtilsServiceImpl : TestUtilsService { override suspend fun echo(input: String): String { @@ -32,12 +31,6 @@ class TestUtilsServiceImpl : TestUtilsService { return input } - override suspend fun sleepConcurrently(millisDuration: List) { - val timers = millisDuration.map { timer("${it.milliseconds}ms", it.milliseconds) }.toList() - - timers.awaitAll() - } - override suspend fun countExecutedSideEffects(increments: Int): Int { val invokedSideEffects = AtomicInteger(0) @@ -51,4 +44,12 @@ class TestUtilsServiceImpl : TestUtilsService { override suspend fun cancelInvocation(invocationId: String) { invocationHandle(invocationId).cancel() } + + override suspend fun resolveSignal(req: TestUtilsService.ResolveSignalRequest) { + invocationHandle(req.invocationId).signal(req.signalName).resolve(req.value) + } + + override suspend fun rejectSignal(req: TestUtilsService.RejectSignalRequest) { + invocationHandle(req.invocationId).signal(req.signalName).reject(req.reason) + } } diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/VirtualObjectCommandInterpreterImpl.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/VirtualObjectCommandInterpreterImpl.kt index 1ebdede1..9b12f218 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/VirtualObjectCommandInterpreterImpl.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/VirtualObjectCommandInterpreterImpl.kt @@ -62,6 +62,53 @@ class VirtualObjectCommandInterpreterImpl : VirtualObjectCommandInterpreter { is VirtualObjectCommandInterpreter.AwaitOne -> { result = it.command.toAwaitable().await() } + is VirtualObjectCommandInterpreter.AwaitFirstCompleted -> { + val cmds = it.commands.map { it.toAwaitable() } + result = + try { + select { cmds.forEach { cmd -> cmd.onAwait { v -> v } } }.await() + } catch (e: TerminalException) { + throw e + } + } + is VirtualObjectCommandInterpreter.AwaitFirstSucceededOrAllFailed -> { + val cmds = it.commands.map { it.toAwaitable() }.toMutableList() + var lastError: TerminalException? = null + while (cmds.isNotEmpty()) { + @Suppress("UNCHECKED_CAST") + val completed = DurableFuture.any(cmds as List>).await() + try { + result = cmds[completed].await() + lastError = null + break + } catch (e: TerminalException) { + lastError = e + cmds.removeAt(completed) + } + } + if (lastError != null) { + throw lastError + } + } + is VirtualObjectCommandInterpreter.AwaitAllSucceededOrFirstFailed -> { + val cmds = it.commands.map { it.toAwaitable() } + // DurableFuture.all completes on first failure or when all succeed. + @Suppress("UNCHECKED_CAST") DurableFuture.all(cmds as List>).await() + result = cmds.map { c -> c.await() }.joinToString(separator = "|") + } + is VirtualObjectCommandInterpreter.AwaitAllCompleted -> { + val cmds = it.commands.map { it.toAwaitable() } + // Wait for all to settle (no fail-fast). Accomplish by individually awaiting each. + val parts = mutableListOf() + for (cmd in cmds) { + try { + parts += "ok:${cmd.await()}" + } catch (e: TerminalException) { + parts += "err:${e.message ?: ""}" + } + } + result = parts.joinToString(separator = "|") + } is VirtualObjectCommandInterpreter.GetEnvVariable -> { result = runBlock { System.getenv(it.envName) ?: "" } } @@ -127,8 +174,11 @@ class VirtualObjectCommandInterpreterImpl : VirtualObjectCommandInterpreter { runAsync("should-fail-with-${this.reason}") { throw TerminalException(this.reason) } + is VirtualObjectCommandInterpreter.RunReturns -> + runAsync("run-returns-${this.value}") { this.value } is VirtualObjectCommandInterpreter.Sleep -> timer("command-timer", this.timeoutMillis.milliseconds).map { "sleep" } + is VirtualObjectCommandInterpreter.CreateSignal -> signal(this.signalName) } } diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/TestUtilsService.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/TestUtilsService.kt index ee966f81..f7d37aca 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/TestUtilsService.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/TestUtilsService.kt @@ -9,6 +9,7 @@ package dev.restate.sdk.testservices.contracts import dev.restate.sdk.annotation.* +import kotlinx.serialization.Serializable /** Collection of various utilities/corner cases scenarios used by tests */ @Service @@ -26,9 +27,6 @@ interface TestUtilsService { /** Just echo */ @Handler @Raw suspend fun rawEcho(@Raw input: ByteArray): ByteArray - /** Create timers and await them all. Durations in milliseconds */ - @Handler suspend fun sleepConcurrently(millisDuration: List) - /** * Invoke `ctx.run` incrementing a local variable counter (not a restate state key!). * @@ -40,4 +38,24 @@ interface TestUtilsService { /** Cancel invocation using the context. */ @Handler suspend fun cancelInvocation(invocationId: String) + + @Serializable + data class ResolveSignalRequest( + val invocationId: String, + val signalName: String, + val value: String, + ) + + /** Resolve a named signal on the target invocation with a string value. */ + @Handler suspend fun resolveSignal(req: ResolveSignalRequest) + + @Serializable + data class RejectSignalRequest( + val invocationId: String, + val signalName: String, + val reason: String, + ) + + /** Reject a named signal on the target invocation. */ + @Handler suspend fun rejectSignal(req: RejectSignalRequest) } diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/VirtualObjectCommandInterpreter.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/VirtualObjectCommandInterpreter.kt index ec962eb0..25f3a65d 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/VirtualObjectCommandInterpreter.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/contracts/VirtualObjectCommandInterpreter.kt @@ -32,6 +32,18 @@ interface VirtualObjectCommandInterpreter { @SerialName("runThrowTerminalException") data class RunThrowTerminalException(val reason: String) : AwaitableCommand + // This is serialized as `{"type": "runReturns", ...}` + // Executes a ctx.run side effect that returns the given value. + @Serializable + @SerialName("runReturns") + data class RunReturns(val value: String) : AwaitableCommand + + // This is serialized as `{"type": "createSignal", ...}` + // Awaits a named signal on the current invocation. + @Serializable + @SerialName("createSignal") + data class CreateSignal(val signalName: String) : AwaitableCommand + @Serializable sealed interface Command // Returns the index of the one that completed first successfully @@ -47,6 +59,27 @@ interface VirtualObjectCommandInterpreter { // Returns the result @Serializable @SerialName("awaitOne") data class AwaitOne(val command: AwaitableCommand) : Command + // Promise.any — returns the value of the first command to succeed. + // Throws with the last error if all commands fail. + @Serializable + @SerialName("awaitFirstSucceededOrAllFailed") + data class AwaitFirstSucceededOrAllFailed(val commands: List) : Command + + // Promise.race — returns the value of the first command to settle (success or failure). + @Serializable + @SerialName("awaitFirstCompleted") + data class AwaitFirstCompleted(val commands: List) : Command + + // Promise.all — pipe-joined values of all commands. Throws on first failure. + @Serializable + @SerialName("awaitAllSucceededOrFirstFailed") + data class AwaitAllSucceededOrFirstFailed(val commands: List) : Command + + // Promise.allSettled — pipe-joined "ok:val" / "err:reason" entries. Never throws. + @Serializable + @SerialName("awaitAllCompleted") + data class AwaitAllCompleted(val commands: List) : Command + // This is serialized as `{"type": "awaitAwakeableOrTimeout", ...}` // The timeout throws a terminal error with "await-timeout" string in it @Serializable