Skip to content
60 changes: 56 additions & 4 deletions transact/src/main/java/dev/dbos/transact/DBOS.java
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,17 @@ public static <T, E extends Exception> T getResult(@NonNull String workflowId) t
return executor("getWorkflowStatus").getWorkflowStatus(workflowId);
}

/**
* Get the serialization format of the current workflow context.
*
* @return the serialization format name (e.g., "portable_json", "java_jackson"), or null if not
* in a workflow context or using default serialization
*/
public static @Nullable SerializationStrategy getSerialization() {
var ctx = DBOSContextHolder.get();
return ctx != null ? ctx.getSerialization() : null;
}

/**
* Send a message to a workflow
*
Expand All @@ -528,8 +539,33 @@ public static void send(
@NonNull Object message,
@NonNull String topic,
@Nullable String idempotencyKey) {
send(destinationId, message, topic, idempotencyKey, null);
}

/**
* Send a message to a workflow with serialization strategy
*
* @param destinationId recipient of the message
* @param message message to be sent
* @param topic topic to which the message is send
* @param idempotencyKey optional idempotency key for exactly-once send
* @param serialization serialization strategy to use (null for default)
*/
public static void send(
@NonNull String destinationId,
@NonNull Object message,
@NonNull String topic,
@Nullable String idempotencyKey,
@Nullable SerializationStrategy serialization) {
if (serialization == null) serialization = SerializationStrategy.DEFAULT;
executor("send")
.send(destinationId, message, topic, instance().internalWorkflowsService, idempotencyKey);
.send(
destinationId,
message,
topic,
instance().internalWorkflowsService,
idempotencyKey,
serialization);
}

/**
Expand All @@ -541,7 +577,7 @@ public static void send(
*/
public static void send(
@NonNull String destinationId, @NonNull Object message, @NonNull String topic) {
DBOS.send(destinationId, message, topic, null);
DBOS.send(destinationId, message, topic, null, null);
}

/**
Expand All @@ -556,13 +592,29 @@ public static void send(
}

/**
* Call within a workflow to publish a key value pair
* Call within a workflow to publish a key value pair. Uses the workflow's serialization format.
*
* @param key identifier for published data
* @param value data that is published
*/
public static void setEvent(@NonNull String key, @NonNull Object value) {
executor("setEvent").setEvent(key, value);
setEvent(key, value, null);
}

/**
* Call within a workflow to publish a key value pair with a specific serialization strategy.
*
* @param key identifier for published data
* @param value data that is published
* @param serialization serialization strategy to use (null to use workflow's default)
*/
public static void setEvent(
@NonNull String key, @NonNull Object value, @Nullable SerializationStrategy serialization) {
// If no explicit serialization specified, use the workflow context's serialization
if (serialization == null) {
serialization = getSerialization();
}
executor("setEvent").setEvent(key, value, serialization);
}

/**
Expand Down
167 changes: 154 additions & 13 deletions transact/src/main/java/dev/dbos/transact/DBOSClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import dev.dbos.transact.database.Result;
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.execution.DBOSExecutor;
import dev.dbos.transact.json.SerializationUtil;
import dev.dbos.transact.workflow.ForkOptions;
import dev.dbos.transact.workflow.ListWorkflowsInput;
import dev.dbos.transact.workflow.SerializationStrategy;
import dev.dbos.transact.workflow.StepInfo;
import dev.dbos.transact.workflow.Timeout;
import dev.dbos.transact.workflow.WorkflowHandle;
Expand All @@ -15,6 +17,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -123,7 +126,8 @@ public record EnqueueOptions(
@Nullable Instant deadline,
@Nullable String deduplicationId,
@Nullable Integer priority,
@Nullable String queuePartitionKey) {
@Nullable String queuePartitionKey,
@Nullable SerializationStrategy serialization) {

public EnqueueOptions {
if (Objects.requireNonNull(workflowName, "EnqueueOptions workflowName must not be null")
Expand Down Expand Up @@ -169,7 +173,7 @@ public record EnqueueOptions(
/** Construct `EnqueueOptions` with a minimum set of required options */
public EnqueueOptions(
@NonNull String className, @NonNull String workflowName, @NonNull String queueName) {
this(workflowName, queueName, className, "", null, null, null, null, null, null, null);
this(workflowName, queueName, className, "", null, null, null, null, null, null, null, null);
}

/**
Expand All @@ -190,7 +194,8 @@ public EnqueueOptions(
this.deadline,
this.deduplicationId,
this.priority,
this.queuePartitionKey);
this.queuePartitionKey,
this.serialization);
}

/**
Expand All @@ -212,7 +217,8 @@ public EnqueueOptions(
this.deadline,
this.deduplicationId,
this.priority,
this.queuePartitionKey);
this.queuePartitionKey,
this.serialization);
}

/**
Expand All @@ -234,7 +240,8 @@ public EnqueueOptions(
this.deadline,
this.deduplicationId,
this.priority,
this.queuePartitionKey);
this.queuePartitionKey,
this.serialization);
}

/**
Expand All @@ -256,7 +263,8 @@ public EnqueueOptions(
this.deadline,
this.deduplicationId,
this.priority,
this.queuePartitionKey);
this.queuePartitionKey,
this.serialization);
}

/**
Expand All @@ -278,7 +286,8 @@ public EnqueueOptions(
deadline,
this.deduplicationId,
this.priority,
this.queuePartitionKey);
this.queuePartitionKey,
this.serialization);
}

/**
Expand All @@ -300,7 +309,8 @@ public EnqueueOptions(
this.deadline,
deduplicationId,
this.priority,
this.queuePartitionKey);
this.queuePartitionKey,
this.serialization);
}

/**
Expand All @@ -322,7 +332,8 @@ public EnqueueOptions(
this.deadline,
this.deduplicationId,
this.priority,
this.queuePartitionKey);
this.queuePartitionKey,
this.serialization);
}

/**
Expand All @@ -343,7 +354,8 @@ public EnqueueOptions(
this.deadline,
this.deduplicationId,
priority,
this.queuePartitionKey);
this.queuePartitionKey,
this.serialization);
}

/**
Expand All @@ -366,7 +378,33 @@ public EnqueueOptions(
this.deadline,
this.deduplicationId,
this.priority,
partitionKey);
partitionKey,
this.serialization);
}

/**
* Specify the serialization strategy for the workflow arguments.
*
* @param serialization The serialization strategy ({@link SerializationStrategy#PORTABLE} for
* cross-language compatibility, {@link SerializationStrategy#NATIVE} for Java-specific, or
* {@link SerializationStrategy#DEFAULT} for the default behavior)
* @return New `EnqueueOptions` with the serialization strategy set
*/
public @NonNull EnqueueOptions withSerialization(
@Nullable SerializationStrategy serialization) {
return new EnqueueOptions(
this.workflowName,
this.queueName,
this.className,
this.instanceName,
this.workflowId,
this.appVersion,
this.timeout,
this.deadline,
this.deduplicationId,
this.priority,
this.queuePartitionKey,
serialization);
}

/**
Expand All @@ -392,6 +430,9 @@ public EnqueueOptions(
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> enqueueWorkflow(
@NonNull EnqueueOptions options, @Nullable Object[] args) {

String serializationFormat =
options.serialization() != null ? options.serialization().formatName() : null;

return DBOSExecutor.enqueueWorkflow(
Objects.requireNonNull(
options.workflowName(), "EnqueueOptions workflowName must not be null"),
Expand All @@ -409,14 +450,89 @@ public EnqueueOptions(
options.priority,
options.queuePartitionKey,
false,
false),
false,
serializationFormat),
null,
null,
null,
options.appVersion,
systemDatabase);
}

/**
* Enqueue a workflow using portable JSON serialization. This method is intended for
* cross-language workflow initiation where the workflow function definition may not be available
* in Java. Unlike {@link #enqueueWorkflow}, this method does not validate function names or
* arguments.
*
* @param <T> Return type of workflow function
* @param <E> Exception thrown by workflow function
* @param options `DBOSClient.EnqueueOptions` for enqueuing the workflow
* @param positionalArgs Positional arguments to pass to the workflow function
* @param namedArgs Optional named arguments (for workflows that support them, e.g., Python
* kwargs)
* @return WorkflowHandle for retrieving workflow ID, status, and results
*/
public <T, E extends Exception> @NonNull WorkflowHandle<T, E> enqueuePortableWorkflow(
@NonNull EnqueueOptions options,
@Nullable Object[] positionalArgs,
@Nullable Map<String, Object> namedArgs) {

String workflowId =
Objects.requireNonNullElseGet(options.workflowId(), () -> UUID.randomUUID().toString());

// Serialize arguments in portable format
SerializationUtil.SerializedResult serializedArgs =
SerializationUtil.serializeArgs(
positionalArgs, namedArgs, SerializationUtil.PORTABLE, null);

// Create workflow status directly with portable serialization
var statusBuilder =
WorkflowStatusInternal.builder(workflowId, WorkflowState.ENQUEUED)
.name(options.workflowName())
.className(options.className())
.instanceName(Objects.requireNonNullElse(options.instanceName(), ""))
.queueName(options.queueName())
.inputs(serializedArgs.serializedValue())
.serialization(serializedArgs.serialization())
.createdAt(System.currentTimeMillis())
.deduplicationId(options.deduplicationId())
.priority(Objects.requireNonNullElse(options.priority(), 0))
.queuePartitionKey(options.queuePartitionKey())
.appVersion(options.appVersion());

if (options.timeout() != null) {
statusBuilder.timeoutMs(options.timeout().toMillis());
}
if (options.deadline() != null) {
statusBuilder.deadlineEpochMs(options.deadline().toEpochMilli());
}

var status = statusBuilder.build();

systemDatabase.initWorkflowStatus(status, null, false, false);

return new WorkflowHandleClient<>(workflowId);
}

/** Options for sending a message. */
public record SendOptions(@Nullable SerializationStrategy serialization) {
/** Create SendOptions with default serialization. */
public static SendOptions defaults() {
return new SendOptions(SerializationStrategy.DEFAULT);
}

/** Create SendOptions with portable JSON serialization. */
public static SendOptions portable() {
return new SendOptions(SerializationStrategy.PORTABLE);
}

/** Create SendOptions with native Java serialization. */
public static SendOptions nativeSerialization() {
return new SendOptions(SerializationStrategy.NATIVE);
}
}

/**
* Send a message to a workflow
*
Expand All @@ -430,17 +546,42 @@ public void send(
@NonNull Object message,
@NonNull String topic,
@Nullable String idempotencyKey) {
send(destinationId, message, topic, idempotencyKey, null);
}

/**
* Send a message to a workflow with serialization options
*
* @param destinationId workflowId of the workflow to receive the message
* @param message Message contents
* @param topic Topic for the message
* @param idempotencyKey If specified, use the value to ensure exactly-once send semantics
* @param options Optional send options including serialization type
*/
public void send(
@NonNull String destinationId,
@NonNull Object message,
@NonNull String topic,
@Nullable String idempotencyKey,
@Nullable SendOptions options) {
if (idempotencyKey == null) {
idempotencyKey = UUID.randomUUID().toString();
}
var workflowId = "%s-%s".formatted(destinationId, idempotencyKey);

String serializationFormat =
(options != null && options.serialization() != null)
? options.serialization().formatName()
: null;

var status =
WorkflowStatusInternal.builder(workflowId, WorkflowState.SUCCESS)
.name("temp_workflow-send-client")
.serialization(
serializationFormat != null ? serializationFormat : SerializationUtil.NATIVE)
.build();
systemDatabase.initWorkflowStatus(status, null, false, false);
systemDatabase.send(status.workflowId(), 0, destinationId, message, topic);
systemDatabase.send(status.workflowId(), 0, destinationId, message, topic, serializationFormat);
}

/**
Expand Down
Loading
Loading