Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ jobs:

- name: Run test tool
continue-on-error: ${{ inputs.continueOnError == 'true' }}
uses: restatedev/e2e/sdk-tests@v1.0
uses: restatedev/e2e/sdk-tests@v2.1
with:
envVars: ${{ inputs.envVars }}
testArtifactOutput: ${{ inputs.testArtifactOutput != '' && inputs.testArtifactOutput || 'sdk-java-integration-test-report' }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import dev.restate.sdk.kotlin.*
import dev.restate.sdk.testservices.contracts.*
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Duration.Companion.milliseconds

class TestUtilsServiceImpl : TestUtilsService {
override suspend fun echo(input: String): String {
Expand All @@ -32,12 +31,6 @@ class TestUtilsServiceImpl : TestUtilsService {
return input
}

override suspend fun sleepConcurrently(millisDuration: List<Long>) {
val timers = millisDuration.map { timer("${it.milliseconds}ms", it.milliseconds) }.toList()

timers.awaitAll()
}

override suspend fun countExecutedSideEffects(increments: Int): Int {
val invokedSideEffects = AtomicInteger(0)

Expand All @@ -51,4 +44,12 @@ class TestUtilsServiceImpl : TestUtilsService {
override suspend fun cancelInvocation(invocationId: String) {
invocationHandle<Unit>(invocationId).cancel()
}

override suspend fun resolveSignal(req: TestUtilsService.ResolveSignalRequest) {
invocationHandle<Unit>(req.invocationId).signal(req.signalName).resolve(req.value)
}

override suspend fun rejectSignal(req: TestUtilsService.RejectSignalRequest) {
invocationHandle<Unit>(req.invocationId).signal(req.signalName).reject(req.reason)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,53 @@ class VirtualObjectCommandInterpreterImpl : VirtualObjectCommandInterpreter {
is VirtualObjectCommandInterpreter.AwaitOne -> {
result = it.command.toAwaitable().await()
}
is VirtualObjectCommandInterpreter.AwaitFirstCompleted -> {
val cmds = it.commands.map { it.toAwaitable() }
result =
try {
select { cmds.forEach { cmd -> cmd.onAwait { v -> v } } }.await()
} catch (e: TerminalException) {
throw e
}
}
is VirtualObjectCommandInterpreter.AwaitFirstSucceededOrAllFailed -> {
val cmds = it.commands.map { it.toAwaitable() }.toMutableList()
var lastError: TerminalException? = null
while (cmds.isNotEmpty()) {
@Suppress("UNCHECKED_CAST")
val completed = DurableFuture.any(cmds as List<DurableFuture<*>>).await()
try {
result = cmds[completed].await()
lastError = null
break
} catch (e: TerminalException) {
lastError = e
cmds.removeAt(completed)
}
}
if (lastError != null) {
throw lastError
}
}
is VirtualObjectCommandInterpreter.AwaitAllSucceededOrFirstFailed -> {
val cmds = it.commands.map { it.toAwaitable() }
// DurableFuture.all completes on first failure or when all succeed.
@Suppress("UNCHECKED_CAST") DurableFuture.all(cmds as List<DurableFuture<*>>).await()
result = cmds.map { c -> c.await() }.joinToString(separator = "|")
}
is VirtualObjectCommandInterpreter.AwaitAllCompleted -> {
val cmds = it.commands.map { it.toAwaitable() }
// Wait for all to settle (no fail-fast). Accomplish by individually awaiting each.
val parts = mutableListOf<String>()
for (cmd in cmds) {
try {
parts += "ok:${cmd.await()}"
} catch (e: TerminalException) {
parts += "err:${e.message ?: ""}"
}
}
result = parts.joinToString(separator = "|")
}
is VirtualObjectCommandInterpreter.GetEnvVariable -> {
result = runBlock { System.getenv(it.envName) ?: "" }
}
Expand Down Expand Up @@ -127,8 +174,11 @@ class VirtualObjectCommandInterpreterImpl : VirtualObjectCommandInterpreter {
runAsync<String>("should-fail-with-${this.reason}") {
throw TerminalException(this.reason)
}
is VirtualObjectCommandInterpreter.RunReturns ->
runAsync<String>("run-returns-${this.value}") { this.value }
is VirtualObjectCommandInterpreter.Sleep ->
timer("command-timer", this.timeoutMillis.milliseconds).map { "sleep" }
is VirtualObjectCommandInterpreter.CreateSignal -> signal<String>(this.signalName)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package dev.restate.sdk.testservices.contracts

import dev.restate.sdk.annotation.*
import kotlinx.serialization.Serializable

/** Collection of various utilities/corner cases scenarios used by tests */
@Service
Expand All @@ -26,9 +27,6 @@ interface TestUtilsService {
/** Just echo */
@Handler @Raw suspend fun rawEcho(@Raw input: ByteArray): ByteArray

/** Create timers and await them all. Durations in milliseconds */
@Handler suspend fun sleepConcurrently(millisDuration: List<Long>)

/**
* Invoke `ctx.run` incrementing a local variable counter (not a restate state key!).
*
Expand All @@ -40,4 +38,24 @@ interface TestUtilsService {

/** Cancel invocation using the context. */
@Handler suspend fun cancelInvocation(invocationId: String)

@Serializable
data class ResolveSignalRequest(
val invocationId: String,
val signalName: String,
val value: String,
)

/** Resolve a named signal on the target invocation with a string value. */
@Handler suspend fun resolveSignal(req: ResolveSignalRequest)

@Serializable
data class RejectSignalRequest(
val invocationId: String,
val signalName: String,
val reason: String,
)

/** Reject a named signal on the target invocation. */
@Handler suspend fun rejectSignal(req: RejectSignalRequest)
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ interface VirtualObjectCommandInterpreter {
@SerialName("runThrowTerminalException")
data class RunThrowTerminalException(val reason: String) : AwaitableCommand

// This is serialized as `{"type": "runReturns", ...}`
// Executes a ctx.run side effect that returns the given value.
@Serializable
@SerialName("runReturns")
data class RunReturns(val value: String) : AwaitableCommand

// This is serialized as `{"type": "createSignal", ...}`
// Awaits a named signal on the current invocation.
@Serializable
@SerialName("createSignal")
data class CreateSignal(val signalName: String) : AwaitableCommand

@Serializable sealed interface Command

// Returns the index of the one that completed first successfully
Expand All @@ -47,6 +59,27 @@ interface VirtualObjectCommandInterpreter {
// Returns the result
@Serializable @SerialName("awaitOne") data class AwaitOne(val command: AwaitableCommand) : Command

// Promise.any — returns the value of the first command to succeed.
// Throws with the last error if all commands fail.
@Serializable
@SerialName("awaitFirstSucceededOrAllFailed")
data class AwaitFirstSucceededOrAllFailed(val commands: List<AwaitableCommand>) : Command

// Promise.race — returns the value of the first command to settle (success or failure).
@Serializable
@SerialName("awaitFirstCompleted")
data class AwaitFirstCompleted(val commands: List<AwaitableCommand>) : Command

// Promise.all — pipe-joined values of all commands. Throws on first failure.
@Serializable
@SerialName("awaitAllSucceededOrFirstFailed")
data class AwaitAllSucceededOrFirstFailed(val commands: List<AwaitableCommand>) : Command

// Promise.allSettled — pipe-joined "ok:val" / "err:reason" entries. Never throws.
@Serializable
@SerialName("awaitAllCompleted")
data class AwaitAllCompleted(val commands: List<AwaitableCommand>) : Command

// This is serialized as `{"type": "awaitAwakeableOrTimeout", ...}`
// The timeout throws a terminal error with "await-timeout" string in it
@Serializable
Expand Down
Loading