Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ internal constructor(
)
.await()

object : BaseInvocationHandle<Res>(handlerContext, responseSerde) {
object : BaseInvocationHandle<Res>(this, responseSerde) {
override suspend fun invocationId(): String = invocationIdAsyncResult.poll().await()
}
}
Expand All @@ -136,7 +136,7 @@ internal constructor(
responseTypeTag: TypeTag<Res>,
): InvocationHandle<Res> =
resolveSerde<Res>(responseTypeTag).let { responseSerde ->
object : BaseInvocationHandle<Res>(handlerContext, responseSerde) {
object : BaseInvocationHandle<Res>(this, responseSerde) {
override suspend fun invocationId(): String = invocationId
}
}
Expand Down Expand Up @@ -200,6 +200,14 @@ internal constructor(
return AwakeableHandleImpl(this, id)
}

override suspend fun <T : Any> signal(name: String, typeTag: TypeTag<T>): DurableFuture<T> {
checkNotInsideRun()
val serde: Serde<T> = resolveSerde(typeTag)
return SingleDurableFutureImpl(handlerContext.signal(name).await()).simpleMap {
serde.deserialize(it)
}
}

override fun random(): RestateRandom {
return this.random
}
Expand Down
72 changes: 72 additions & 0 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,21 @@ sealed interface Context {
*/
fun awakeableHandle(id: String): AwakeableHandle

/**
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
*
* Signals are identified by `(invocationId, name)`. The resolution can arrive before or after the
* handler starts waiting on the signal — there's no need to pre-register.
*
* Another invocation can resolve or reject the signal using [signalHandle].
*
* @param name the signal name.
* @param typeTag the response type tag to use for deserializing the signal result.
* @return a [DurableFuture] that resolves to the signal value (or rejects with a
* [dev.restate.sdk.common.TerminalException]).
*/
suspend fun <T : Any> signal(name: String, typeTag: TypeTag<T>): DurableFuture<T>

/**
* Create a [RestateRandom] instance inherently predictable, seeded on the
* [dev.restate.sdk.common.InvocationId], which is not secret.
Expand Down Expand Up @@ -336,6 +351,15 @@ suspend inline fun <reified T : Any> Context.awakeable(): Awakeable<T> {
return this.awakeable(typeTag<T>())
}

/**
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
*
* @see Context.signal
*/
suspend inline fun <reified T : Any> Context.signal(name: String): DurableFuture<T> {
return this.signal(name, typeTag<T>())
}

/**
* This interface can be used only within shared handlers of virtual objects. It extends [Context]
* adding access to the virtual object instance key-value state storage.
Expand Down Expand Up @@ -629,6 +653,14 @@ sealed interface InvocationHandle<Res : Any?> {

/** @return the output of this invocation, if present. */
suspend fun output(): Output<Res>

/**
* Get a [SignalHandle] for resolving or rejecting a named signal on this invocation. The
* receiving handler can await on the signal using [Context.signal].
*
* @param name the signal name.
*/
suspend fun signal(name: String): SignalHandle
}

/**
Expand Down Expand Up @@ -677,6 +709,35 @@ suspend inline fun <reified T : Any> AwakeableHandle.resolve(payload: T) {
return this.resolve(typeTag<T>(), payload)
}

/**
* Handle to resolve or reject a named signal on a target invocation.
*
* Unlike awakeables, signals are identified by `(invocationId, name)` and do not need to be
* pre-registered: the resolution can arrive before or after the handler starts waiting.
*/
sealed interface SignalHandle {
/**
* Resolve the signal with the given value.
*
* @param typeTag used to serialize the result payload.
* @param payload the result payload.
*/
suspend fun <T : Any> resolve(typeTag: TypeTag<T>, payload: T)

/**
* Reject the signal with the given reason. The handler awaiting the signal will receive a
* terminal error with [reason] as the message.
*
* @param reason the rejection reason.
*/
suspend fun reject(reason: String)
}

/** Resolve the signal with the given value. */
suspend inline fun <reified T : Any> SignalHandle.resolve(payload: T) {
return this.resolve(typeTag<T>(), payload)
}

/**
* A [DurablePromise] is a durable, distributed version of a Kotlin's Deferred, or more commonly of
* a future/promise. Restate keeps track of the [DurablePromise] across restarts/failures.
Expand Down Expand Up @@ -965,6 +1026,17 @@ suspend fun awakeableHandle(id: String): AwakeableHandle {
return context().awakeableHandle(id)
}

/**
* Create a [DurableFuture] waiting on a named signal targeting the current invocation.
*
* @throws IllegalStateException if called outside of a Restate handler
* @see Context.signal
*/
@org.jetbrains.annotations.ApiStatus.Experimental
suspend inline fun <reified T : Any> signal(name: String): DurableFuture<T> {
return context().signal(name, typeTag<T>())
}

/**
* Get an [InvocationHandle] for an already existing invocation.
*
Expand Down
28 changes: 27 additions & 1 deletion sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/futures.kt
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,12 @@ internal constructor(

internal abstract class BaseInvocationHandle<Res>
internal constructor(
private val handlerContext: HandlerContext,
private val contextImpl: ContextImpl,
private val responseSerde: Serde<Res>,
) : InvocationHandle<Res> {
private val handlerContext: HandlerContext
get() = contextImpl.handlerContext

override suspend fun cancel() {
checkNotInsideRun()
val ignored = handlerContext.cancelInvocation(invocationId()).await()
Expand All @@ -214,6 +217,11 @@ internal constructor(
.simpleMap { it.map { responseSerde.deserialize(it) } }
.await()
}

override suspend fun signal(name: String): SignalHandle {
val resolvedId = invocationId()
return SignalHandleImpl(contextImpl, resolvedId, name)
}
}

internal class AwakeableImpl<T : Any?>
Expand All @@ -237,6 +245,24 @@ internal class AwakeableHandleImpl(val contextImpl: ContextImpl, val id: String)
}
}

internal class SignalHandleImpl(
val contextImpl: ContextImpl,
val invocationId: String,
val name: String,
) : SignalHandle {
override suspend fun <T : Any> resolve(typeTag: TypeTag<T>, payload: T) {
checkNotInsideRun()
contextImpl.handlerContext
.resolveSignal(invocationId, name, contextImpl.resolveAndSerialize(typeTag, payload))
.await()
}

override suspend fun reject(reason: String) {
checkNotInsideRun()
contextImpl.handlerContext.rejectSignal(invocationId, name, TerminalException(reason)).await()
}
}

internal class SelectClauseImpl<T>(override val durableFuture: DurableFuture<T>) : SelectClause<T>

@PublishedApi
Expand Down
30 changes: 30 additions & 0 deletions sdk-api/src/main/java/dev/restate/sdk/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,36 @@ default <T> Awakeable<T> awakeable(Class<T> clazz) {
*/
AwakeableHandle awakeableHandle(String id);

/**
* Create a {@link DurableFuture} waiting on a named signal targeting the current invocation.
*
* <p>Signals are identified by {@code (invocationId, name)}. The resolution can arrive before or
* after the handler starts waiting on the signal — there's no need to pre-register.
*
* <p>Another invocation can resolve or reject the signal using {@link
* SignalHandle#resolve(TypeTag, Object)} / {@link SignalHandle#reject(String)}.
*
* @param name the signal name.
* @param clazz the response type to use for deserializing the signal result. When using generic
* types, use {@link #signal(String, TypeTag)} instead.
* @return a {@link DurableFuture} that resolves to the signal value (or rejects with a {@link
* TerminalException}).
*/
default <T> DurableFuture<T> signal(String name, Class<T> clazz) {
return signal(name, TypeTag.of(clazz));
}

/**
* Create a {@link DurableFuture} waiting on a named signal targeting the current invocation.
*
* @param name the signal name.
* @param typeTag the response type tag to use for deserializing the signal result.
* @return a {@link DurableFuture} that resolves to the signal value (or rejects with a {@link
* TerminalException}).
* @see #signal(String, Class)
*/
<T> DurableFuture<T> signal(String name, TypeTag<T> typeTag);

/**
* Returns a deterministic random.
*
Expand Down
38 changes: 35 additions & 3 deletions sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public <T> Optional<T> get(StateKey<T> key) {
checkNotInsideRun();
return DurableFuture.fromAsyncResult(
Util.awaitCompletableFuture(handlerContext.get(key.name())), serviceExecutor)
.mapWithoutExecutor(opt -> opt.map(serdeFactory.create(key.serdeInfo())::deserialize))
.map(opt -> opt.map(serdeFactory.create(key.serdeInfo())::deserialize))
.await();
}

Expand Down Expand Up @@ -227,6 +227,30 @@ public Output<R> getOutput() {
serviceExecutor)
.await();
}

@Override
public SignalHandle signal(String name) {
String invocationId = invocationId();
return new SignalHandle() {
@Override
public <T> void resolve(TypeTag<T> typeTag, T payload) {
checkNotInsideRun();
Util.awaitCompletableFuture(
handlerContext.resolveSignal(
invocationId,
name,
Util.executeOrFail(
handlerContext, serdeFactory.create(typeTag)::serialize, payload)));
}

@Override
public void reject(String reason) {
checkNotInsideRun();
Util.awaitCompletableFuture(
handlerContext.rejectSignal(invocationId, name, new TerminalException(reason)));
}
};
}
}

@Override
Expand All @@ -249,7 +273,7 @@ public <T> DurableFuture<T> runAsync(
return DurableFuture.fromAsyncResult(
Util.awaitCompletableFuture(handlerContext.submitRun(name, runClosure)),
serviceExecutor)
.mapWithoutExecutor(serde::deserialize);
.map(serde::deserialize);
}

private <T> void executeRunAction(
Expand Down Expand Up @@ -325,6 +349,14 @@ public void reject(String reason) {
};
}

@Override
public <T> DurableFuture<T> signal(String name, TypeTag<T> typeTag) throws TerminalException {
checkNotInsideRun();
Serde<T> serde = serdeFactory.create(typeTag);
AsyncResult<Slice> result = Util.awaitCompletableFuture(handlerContext.signal(name));
return DurableFuture.fromAsyncResult(result, serviceExecutor).map(serde::deserialize);
}

@Override
public RestateRandom random() {
return this.random;
Expand All @@ -338,7 +370,7 @@ public DurableFuture<T> future() {
checkNotInsideRun();
AsyncResult<Slice> result = Util.awaitCompletableFuture(handlerContext.promise(key.name()));
return DurableFuture.fromAsyncResult(result, serviceExecutor)
.mapWithoutExecutor(serdeFactory.create(key.serdeInfo())::deserialize);
.map(serdeFactory.create(key.serdeInfo())::deserialize);
}

@Override
Expand Down
8 changes: 8 additions & 0 deletions sdk-api/src/main/java/dev/restate/sdk/InvocationHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,12 @@ public interface InvocationHandle<Res> {
* @return the output of this invocation, if present.
*/
Output<Res> getOutput();

/**
* Get a {@link SignalHandle} for resolving or rejecting a named signal on this invocation. The
* receiving handler can await on the signal using {@link Context#signal(String, Class)}.
*
* @param name the signal name.
*/
SignalHandle signal(String name);
}
48 changes: 48 additions & 0 deletions sdk-api/src/main/java/dev/restate/sdk/SignalHandle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk;

import dev.restate.serde.TypeTag;

/**
* Handle to resolve or reject a named signal on a target invocation. Acquired via {@link
* InvocationHandle#signal(String)}.
*
* <p>Unlike awakeables, signals are identified by {@code (invocationId, name)} and do not need to
* be pre-registered: the resolution can arrive before or after the handler starts waiting on the
* signal.
*/
public interface SignalHandle {

/**
* Resolve the signal with the given value.
*
* @param typeTag used to serialize the result payload.
* @param payload the result payload. MUST NOT be null.
*/
<T> void resolve(TypeTag<T> typeTag, T payload);

/**
* Resolve the signal with the given value.
*
* @param clazz used to serialize the result payload.
* @param payload the result payload. MUST NOT be null.
*/
default <T> void resolve(Class<T> clazz, T payload) {
resolve(TypeTag.of(clazz), payload);
}

/**
* Reject the signal with the given reason. The handler awaiting the signal will receive a
* terminal error with {@code reason} as the message.
*
* @param reason the rejection reason. MUST NOT be null.
*/
void reject(String reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ record Awakeable(String id, AsyncResult<Slice> asyncResult) {}

CompletableFuture<AsyncResult<Void>> rejectPromise(String key, TerminalException reason);

// ----- Named signals
//
// Signals are identified by (invocationId, name). Unlike awakeables, signals do not need to be
// pre-registered: the resolution can arrive before or after the handler starts waiting.

CompletableFuture<AsyncResult<Slice>> signal(String name);

CompletableFuture<Void> resolveSignal(String invocationId, String name, Slice payload);

CompletableFuture<Void> rejectSignal(String invocationId, String name, TerminalException reason);

CompletableFuture<Void> cancelInvocation(String invocationId);

CompletableFuture<AsyncResult<Slice>> attachInvocation(String invocationId);
Expand Down
Loading
Loading