diff --git a/transact/src/main/java/dev/dbos/transact/DBOS.java b/transact/src/main/java/dev/dbos/transact/DBOS.java index 70cccf9d..3de0a453 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOS.java +++ b/transact/src/main/java/dev/dbos/transact/DBOS.java @@ -19,6 +19,7 @@ import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.StepOptions; import dev.dbos.transact.workflow.Workflow; @@ -144,7 +145,13 @@ private void registerClassWorkflows( String name = wfTag.name().isEmpty() ? method.getName() : wfTag.name(); workflowRegistry.register( - className, name, target, instanceName, method, wfTag.maxRecoveryAttempts()); + className, + name, + target, + instanceName, + method, + wfTag.maxRecoveryAttempts(), + wfTag.serializationStrategy()); return name; } @@ -545,6 +552,17 @@ public static T getResult(@NonNull String workflowId) t return executor("getWorkflowStatus").getWorkflowStatus(workflowId); } + /** + * Get the serialization format of the current workflow context. + * + * @return the serialization format name (e.g., "portable_json", "java_jackson"), or null if not + * in a workflow context or using default serialization + */ + public static @Nullable SerializationStrategy getSerialization() { + var ctx = DBOSContextHolder.get(); + return ctx != null ? ctx.getSerialization() : null; + } + /** * Send a message to a workflow * @@ -558,8 +576,33 @@ public static void send( @NonNull Object message, @NonNull String topic, @Nullable String idempotencyKey) { + send(destinationId, message, topic, idempotencyKey, null); + } + + /** + * Send a message to a workflow with serialization strategy + * + * @param destinationId recipient of the message + * @param message message to be sent + * @param topic topic to which the message is send + * @param idempotencyKey optional idempotency key for exactly-once send + * @param serialization serialization strategy to use (null for default) + */ + public static void send( + @NonNull String destinationId, + @NonNull Object message, + @NonNull String topic, + @Nullable String idempotencyKey, + @Nullable SerializationStrategy serialization) { + if (serialization == null) serialization = SerializationStrategy.DEFAULT; executor("send") - .send(destinationId, message, topic, instance().internalWorkflowsService, idempotencyKey); + .send( + destinationId, + message, + topic, + instance().internalWorkflowsService, + idempotencyKey, + serialization); } /** @@ -571,7 +614,7 @@ public static void send( */ public static void send( @NonNull String destinationId, @NonNull Object message, @NonNull String topic) { - DBOS.send(destinationId, message, topic, null); + DBOS.send(destinationId, message, topic, null, null); } /** @@ -586,13 +629,29 @@ public static void send( } /** - * Call within a workflow to publish a key value pair + * Call within a workflow to publish a key value pair. Uses the workflow's serialization format. * * @param key identifier for published data * @param value data that is published */ public static void setEvent(@NonNull String key, @NonNull Object value) { - executor("setEvent").setEvent(key, value); + setEvent(key, value, null); + } + + /** + * Call within a workflow to publish a key value pair with a specific serialization strategy. + * + * @param key identifier for published data + * @param value data that is published + * @param serialization serialization strategy to use (null to use workflow's default) + */ + public static void setEvent( + @NonNull String key, @NonNull Object value, @Nullable SerializationStrategy serialization) { + // If no explicit serialization specified, use the workflow context's serialization + if (serialization == null) { + serialization = getSerialization(); + } + executor("setEvent").setEvent(key, value, serialization); } /** diff --git a/transact/src/main/java/dev/dbos/transact/DBOSClient.java b/transact/src/main/java/dev/dbos/transact/DBOSClient.java index 8066937c..a364e7f9 100644 --- a/transact/src/main/java/dev/dbos/transact/DBOSClient.java +++ b/transact/src/main/java/dev/dbos/transact/DBOSClient.java @@ -3,8 +3,11 @@ import dev.dbos.transact.database.Result; import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.execution.DBOSExecutor; +import dev.dbos.transact.json.PortableWorkflowException; +import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; +import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.Timeout; import dev.dbos.transact.workflow.WorkflowHandle; @@ -15,6 +18,7 @@ import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; @@ -123,7 +127,8 @@ public record EnqueueOptions( @Nullable Instant deadline, @Nullable String deduplicationId, @Nullable Integer priority, - @Nullable String queuePartitionKey) { + @Nullable String queuePartitionKey, + @Nullable SerializationStrategy serialization) { public EnqueueOptions { if (Objects.requireNonNull(workflowName, "EnqueueOptions workflowName must not be null") @@ -169,7 +174,7 @@ public record EnqueueOptions( /** Construct `EnqueueOptions` with a minimum set of required options */ public EnqueueOptions( @NonNull String className, @NonNull String workflowName, @NonNull String queueName) { - this(workflowName, queueName, className, "", null, null, null, null, null, null, null); + this(workflowName, queueName, className, "", null, null, null, null, null, null, null, null); } /** @@ -190,7 +195,8 @@ public EnqueueOptions( this.deadline, this.deduplicationId, this.priority, - this.queuePartitionKey); + this.queuePartitionKey, + this.serialization); } /** @@ -212,7 +218,8 @@ public EnqueueOptions( this.deadline, this.deduplicationId, this.priority, - this.queuePartitionKey); + this.queuePartitionKey, + this.serialization); } /** @@ -234,7 +241,8 @@ public EnqueueOptions( this.deadline, this.deduplicationId, this.priority, - this.queuePartitionKey); + this.queuePartitionKey, + this.serialization); } /** @@ -256,7 +264,8 @@ public EnqueueOptions( this.deadline, this.deduplicationId, this.priority, - this.queuePartitionKey); + this.queuePartitionKey, + this.serialization); } /** @@ -278,7 +287,8 @@ public EnqueueOptions( deadline, this.deduplicationId, this.priority, - this.queuePartitionKey); + this.queuePartitionKey, + this.serialization); } /** @@ -300,7 +310,8 @@ public EnqueueOptions( this.deadline, deduplicationId, this.priority, - this.queuePartitionKey); + this.queuePartitionKey, + this.serialization); } /** @@ -322,7 +333,8 @@ public EnqueueOptions( this.deadline, this.deduplicationId, this.priority, - this.queuePartitionKey); + this.queuePartitionKey, + this.serialization); } /** @@ -343,7 +355,8 @@ public EnqueueOptions( this.deadline, this.deduplicationId, priority, - this.queuePartitionKey); + this.queuePartitionKey, + this.serialization); } /** @@ -366,7 +379,33 @@ public EnqueueOptions( this.deadline, this.deduplicationId, this.priority, - partitionKey); + partitionKey, + this.serialization); + } + + /** + * Specify the serialization strategy for the workflow arguments. + * + * @param serialization The serialization strategy ({@link SerializationStrategy#PORTABLE} for + * cross-language compatibility, {@link SerializationStrategy#NATIVE} for Java-specific, or + * {@link SerializationStrategy#DEFAULT} for the default behavior) + * @return New `EnqueueOptions` with the serialization strategy set + */ + public @NonNull EnqueueOptions withSerialization( + @Nullable SerializationStrategy serialization) { + return new EnqueueOptions( + this.workflowName, + this.queueName, + this.className, + this.instanceName, + this.workflowId, + this.appVersion, + this.timeout, + this.deadline, + this.deduplicationId, + this.priority, + this.queuePartitionKey, + serialization); } /** @@ -392,6 +431,9 @@ public EnqueueOptions( public @NonNull WorkflowHandle enqueueWorkflow( @NonNull EnqueueOptions options, @Nullable Object[] args) { + String serializationFormat = + options.serialization() != null ? options.serialization().formatName() : null; + return DBOSExecutor.enqueueWorkflow( Objects.requireNonNull( options.workflowName(), "EnqueueOptions workflowName must not be null"), @@ -409,7 +451,8 @@ public EnqueueOptions( options.priority, options.queuePartitionKey, false, - false), + false, + serializationFormat), null, null, null, @@ -417,6 +460,73 @@ public EnqueueOptions( systemDatabase); } + /** + * Enqueue a workflow using portable JSON serialization. This method is intended for + * cross-language workflow initiation where the workflow function definition may not be available + * in Java. + * + * @param Return type of workflow function + * @param options `DBOSClient.EnqueueOptions` for enqueuing the workflow + * @param positionalArgs Positional arguments to pass to the workflow function + * @param namedArgs Optional named arguments (for workflows that support them, e.g., Python + * kwargs) + * @return WorkflowHandle for retrieving workflow ID, status, and results + */ + public @NonNull WorkflowHandle enqueuePortableWorkflow( + @NonNull EnqueueOptions options, + @Nullable Object[] positionalArgs, + @Nullable Map namedArgs) { + + String workflowId = + Objects.requireNonNullElseGet(options.workflowId(), () -> UUID.randomUUID().toString()); + + // Serialize arguments in portable format + SerializationUtil.SerializedResult serializedArgs = + SerializationUtil.serializeArgs( + positionalArgs, namedArgs, SerializationUtil.PORTABLE, null); + + // Create workflow status directly with portable serialization + var statusBuilder = + WorkflowStatusInternal.builder(workflowId, WorkflowState.ENQUEUED) + .name(options.workflowName()) + .className(options.className()) + .instanceName(Objects.requireNonNullElse(options.instanceName(), "")) + .queueName(options.queueName()) + .inputs(serializedArgs.serializedValue()) + .serialization(serializedArgs.serialization()) + .createdAt(System.currentTimeMillis()) + .deduplicationId(options.deduplicationId()) + .priority(Objects.requireNonNullElse(options.priority(), 0)) + .queuePartitionKey(options.queuePartitionKey()) + .appVersion(options.appVersion()); + + if (options.timeout() != null) { + statusBuilder.timeoutMs(options.timeout().toMillis()); + } + if (options.deadline() != null) { + statusBuilder.deadlineEpochMs(options.deadline().toEpochMilli()); + } + + var status = statusBuilder.build(); + + systemDatabase.initWorkflowStatus(status, null, false, false); + + return new WorkflowHandleClient<>(workflowId); + } + + /** Options for sending a message. */ + public record SendOptions(@Nullable SerializationStrategy serialization) { + /** Create SendOptions with default serialization. */ + public static SendOptions defaults() { + return new SendOptions(SerializationStrategy.DEFAULT); + } + + /** Create SendOptions with portable JSON serialization. */ + public static SendOptions portable() { + return new SendOptions(SerializationStrategy.PORTABLE); + } + } + /** * Send a message to a workflow * @@ -430,17 +540,42 @@ public void send( @NonNull Object message, @NonNull String topic, @Nullable String idempotencyKey) { + send(destinationId, message, topic, idempotencyKey, null); + } + + /** + * Send a message to a workflow with serialization options + * + * @param destinationId workflowId of the workflow to receive the message + * @param message Message contents + * @param topic Topic for the message + * @param idempotencyKey If specified, use the value to ensure exactly-once send semantics + * @param options Optional send options including serialization type + */ + public void send( + @NonNull String destinationId, + @NonNull Object message, + @NonNull String topic, + @Nullable String idempotencyKey, + @Nullable SendOptions options) { if (idempotencyKey == null) { idempotencyKey = UUID.randomUUID().toString(); } var workflowId = "%s-%s".formatted(destinationId, idempotencyKey); + String serializationFormat = + (options != null && options.serialization() != null) + ? options.serialization().formatName() + : null; + var status = WorkflowStatusInternal.builder(workflowId, WorkflowState.SUCCESS) .name("temp_workflow-send-client") + .serialization( + serializationFormat != null ? serializationFormat : SerializationUtil.NATIVE) .build(); systemDatabase.initWorkflowStatus(status, null, false, false); - systemDatabase.send(status.workflowId(), 0, destinationId, message, topic); + systemDatabase.send(status.workflowId(), 0, destinationId, message, topic, serializationFormat); } /** diff --git a/transact/src/main/java/dev/dbos/transact/context/DBOSContext.java b/transact/src/main/java/dev/dbos/transact/context/DBOSContext.java index a2cc3e0e..58699d08 100644 --- a/transact/src/main/java/dev/dbos/transact/context/DBOSContext.java +++ b/transact/src/main/java/dev/dbos/transact/context/DBOSContext.java @@ -1,6 +1,7 @@ package dev.dbos.transact.context; import dev.dbos.transact.StartWorkflowOptions; +import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.Timeout; import java.time.Duration; @@ -21,6 +22,7 @@ public class DBOSContext { private final WorkflowInfo parent; private final Duration timeout; private final Instant deadline; + private SerializationStrategy serialization; // private StepStatus stepStatus; @@ -30,14 +32,25 @@ public DBOSContext() { parent = null; timeout = null; deadline = null; + serialization = SerializationStrategy.DEFAULT; } public DBOSContext(String workflowId, WorkflowInfo parent, Duration timeout, Instant deadline) { + this(workflowId, parent, timeout, deadline, null); + } + + public DBOSContext( + String workflowId, + WorkflowInfo parent, + Duration timeout, + Instant deadline, + SerializationStrategy serialization) { this.workflowId = workflowId; this.functionId = 0; this.parent = parent; this.timeout = timeout; this.deadline = deadline; + this.serialization = serialization; } public DBOSContext( @@ -54,6 +67,7 @@ public DBOSContext( this.parent = other.parent; this.timeout = other.timeout; this.deadline = other.deadline; + this.serialization = other.serialization; } public boolean isInWorkflow() { @@ -122,6 +136,14 @@ public Instant getDeadline() { return deadline; } + public SerializationStrategy getSerialization() { + return serialization; + } + + public void setSerializationStrategy(SerializationStrategy strat) { + this.serialization = strat; + } + public static String workflowId() { var ctx = DBOSContextHolder.get(); return ctx == null ? null : ctx.workflowId; diff --git a/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java b/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java index 290454e8..85b410a2 100644 --- a/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/NotificationsDAO.java @@ -4,6 +4,7 @@ import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; import dev.dbos.transact.exceptions.DBOSWorkflowExecutionConflictException; import dev.dbos.transact.json.JSONUtil; +import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.workflow.internal.StepResult; import java.sql.Connection; @@ -39,7 +40,12 @@ void speedUpPollingForTest() { } void send( - String workflowUuid, int functionId, String destinationUuid, Object message, String topic) + String workflowUuid, + int functionId, + String destinationUuid, + Object message, + String topic, + String serialization) throws SQLException { var startTime = System.currentTimeMillis(); @@ -71,17 +77,22 @@ void send( finalTopic); } - // Insert notification + // Serialize the message using the specified format + SerializationUtil.SerializedResult serializedMsg = + SerializationUtil.serializeValue(message, serialization, null); + + // Insert notification with serialization format final String sql = """ - INSERT INTO %s.notifications (destination_uuid, topic, message) VALUES (?, ?, ?) + INSERT INTO %s.notifications (destination_uuid, topic, message, serialization) VALUES (?, ?, ?, ?) """ .formatted(this.schema); try (PreparedStatement stmt = conn.prepareStatement(sql)) { stmt.setString(1, destinationUuid); stmt.setString(2, finalTopic); - stmt.setString(3, JSONUtil.serialize(message)); + stmt.setString(3, serializedMsg.serializedValue()); + stmt.setString(4, serializedMsg.serialization()); stmt.executeUpdate(); } catch (SQLException e) { // Foreign key violation @@ -92,7 +103,7 @@ void send( } // Record operation result - var output = new StepResult(workflowUuid, functionId, functionName); + var output = new StepResult(workflowUuid, functionId, functionName, null, null, null, null); StepsDAO.recordStepResultTxn( output, startTime, System.currentTimeMillis(), conn, this.schema); @@ -128,8 +139,8 @@ Object recv( if (recordedOutput != null) { logger.debug("Replaying recv, id: {}, topic: {}", functionId, finalTopic); if (recordedOutput.output() != null) { - Object[] dSerOut = JSONUtil.deserializeToArray(recordedOutput.output()); - return dSerOut == null ? null : dSerOut[0]; + return SerializationUtil.deserializeValue( + recordedOutput.output(), recordedOutput.serialization(), null); } else { throw new RuntimeException("No output recorded in the last recv"); } @@ -213,7 +224,7 @@ Object recv( final String sql = """ WITH oldest_entry AS ( - SELECT destination_uuid, topic, message, created_at_epoch_ms + SELECT destination_uuid, topic, message, serialization, created_at_epoch_ms FROM %1$s.notifications WHERE destination_uuid = ? AND topic = ? ORDER BY created_at_epoch_ms ASC @@ -223,11 +234,11 @@ WITH oldest_entry AS ( WHERE destination_uuid = (SELECT destination_uuid FROM oldest_entry) AND topic = (SELECT topic FROM oldest_entry) AND created_at_epoch_ms = (SELECT created_at_epoch_ms FROM oldest_entry) - RETURNING message + RETURNING message, serialization """ .formatted(this.schema); - Object[] recvdSermessage = null; + Object recvdMessage = null; try (PreparedStatement stmt = conn.prepareStatement(sql)) { stmt.setString(1, workflowUuid); stmt.setString(2, finalTopic); @@ -235,15 +246,17 @@ WITH oldest_entry AS ( try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { String serializedMessage = rs.getString("message"); - recvdSermessage = JSONUtil.deserializeToArray(serializedMessage); + String serialization = rs.getString("serialization"); + recvdMessage = + SerializationUtil.deserializeValue(serializedMessage, serialization, null); } } } // Record operation result - Object toSave = recvdSermessage == null ? null : recvdSermessage[0]; + Object toSave = recvdMessage; StepResult output = - new StepResult(workflowUuid, functionId, functionName) + new StepResult(workflowUuid, functionId, functionName, null, null, null, null) .withOutput(JSONUtil.serialize(toSave)); StepsDAO.recordStepResultTxn( output, startTime, System.currentTimeMillis(), conn, this.schema); @@ -259,14 +272,19 @@ WITH oldest_entry AS ( } private void setEvent( - Connection conn, String workflowId, int functionId, String key, String message) + Connection conn, + String workflowId, + int functionId, + String key, + String message, + String serialization) throws SQLException { final String eventSql = """ - INSERT INTO %s.workflow_events (workflow_uuid, key, value) - VALUES (?, ?, ?) + INSERT INTO %s.workflow_events (workflow_uuid, key, value, serialization) + VALUES (?, ?, ?, ?) ON CONFLICT (workflow_uuid, key) - DO UPDATE SET value = EXCLUDED.value + DO UPDATE SET value = EXCLUDED.value, serialization = EXCLUDED.serialization """ .formatted(this.schema); @@ -274,15 +292,16 @@ ON CONFLICT (workflow_uuid, key) stmt.setString(1, workflowId); stmt.setString(2, key); stmt.setString(3, message); + stmt.setString(4, serialization); stmt.executeUpdate(); } final String eventHistorySql = """ - INSERT INTO %s.workflow_events_history (workflow_uuid, function_id, key, value) - VALUES (?, ?, ?, ?) + INSERT INTO %s.workflow_events_history (workflow_uuid, function_id, key, value, serialization) + VALUES (?, ?, ?, ?, ?) ON CONFLICT (workflow_uuid, key, function_id) - DO UPDATE SET value = EXCLUDED.value + DO UPDATE SET value = EXCLUDED.value, serialization = EXCLUDED.serialization """ .formatted(this.schema); @@ -291,16 +310,26 @@ ON CONFLICT (workflow_uuid, key, function_id) stmt.setInt(2, functionId); stmt.setString(3, key); stmt.setString(4, message); + stmt.setString(5, serialization); stmt.executeUpdate(); } } - void setEvent(String workflowId, int functionId, String key, Object message, boolean asStep) + void setEvent( + String workflowId, + int functionId, + String key, + Object message, + boolean asStep, + String serialization) throws SQLException { var startTime = System.currentTimeMillis(); String functionName = "DBOS.setEvent"; - String serializedMessage = JSONUtil.serialize(message); + + // Serialize the message using the specified format + SerializationUtil.SerializedResult serializedResult = + SerializationUtil.serializeValue(message, serialization, null); try (Connection conn = dataSource.getConnection()) { conn.setAutoCommit(false); @@ -321,11 +350,18 @@ void setEvent(String workflowId, int functionId, String key, Object message, boo } } - this.setEvent(conn, workflowId, functionId, key, serializedMessage); + this.setEvent( + conn, + workflowId, + functionId, + key, + serializedResult.serializedValue(), + serializedResult.serialization()); if (asStep) { // Record the operation result - StepResult output = new StepResult(workflowId, functionId, functionName); + StepResult output = + new StepResult(workflowId, functionId, functionName, null, null, null, null); StepsDAO.recordStepResultTxn( output, startTime, System.currentTimeMillis(), conn, this.schema); } @@ -361,8 +397,8 @@ Object getEvent( if (recordedOutput != null) { logger.debug("Replaying getEvent, id: {}, key: {}", callerCtx.functionId(), key); if (recordedOutput.output() != null) { - Object[] outputArray = JSONUtil.deserializeToArray(recordedOutput.output()); - return outputArray == null ? null : outputArray[0]; + return SerializationUtil.deserializeValue( + recordedOutput.output(), recordedOutput.serialization(), null); } else { throw new RuntimeException("No output recorded in the last getEvent"); } @@ -382,7 +418,7 @@ Object getEvent( Object value = null; final String sql = """ - SELECT value FROM %s.workflow_events WHERE workflow_uuid = ? AND key = ? + SELECT value, serialization FROM %s.workflow_events WHERE workflow_uuid = ? AND key = ? """ .formatted(this.schema); @@ -405,8 +441,8 @@ Object getEvent( try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { String serializedValue = rs.getString("value"); - Object[] valueArray = JSONUtil.deserializeToArray(serializedValue); - value = valueArray == null ? null : valueArray[0]; + String serialization = rs.getString("serialization"); + value = SerializationUtil.deserializeValue(serializedValue, serialization, null); hasExistingNotification = true; } } @@ -445,7 +481,14 @@ Object getEvent( // Record the output if it's in a workflow if (callerCtx != null) { StepResult output = - new StepResult(callerCtx.workflowId(), callerCtx.functionId(), functionName) + new StepResult( + callerCtx.workflowId(), + callerCtx.functionId(), + functionName, + null, + null, + null, + null) .withOutput(JSONUtil.serialize(value)); StepsDAO.recordStepResultTxn( dataSource, output, startTime, System.currentTimeMillis(), this.schema); diff --git a/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java b/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java index b67ebaeb..30aca478 100644 --- a/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/StepsDAO.java @@ -3,6 +3,7 @@ import dev.dbos.transact.exceptions.*; import dev.dbos.transact.internal.DebugTriggers; import dev.dbos.transact.json.JSONUtil; +import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.workflow.ErrorResult; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.WorkflowState; @@ -157,7 +158,7 @@ static StepResult checkStepExecutionTxn( String operationOutputSql = """ - SELECT output, error, function_name + SELECT output, error, function_name, serialization FROM %s.operation_outputs WHERE workflow_uuid = ? AND function_id = ? """ @@ -174,8 +175,10 @@ static StepResult checkStepExecutionTxn( String output = rs.getString("output"); String error = rs.getString("error"); recordedFunctionName = rs.getString("function_name"); + String serialization = rs.getString("serialization"); recordedResult = - new StepResult(workflowId, functionId, recordedFunctionName, output, error, null); + new StepResult( + workflowId, functionId, recordedFunctionName, output, error, null, serialization); } } } @@ -202,7 +205,7 @@ List listWorkflowSteps(Connection connection, String workflowId) throw final String sql = """ - SELECT function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms + SELECT function_id, function_name, output, error, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms, serialization FROM %s.operation_outputs WHERE workflow_uuid = ? ORDER BY function_id; @@ -225,12 +228,13 @@ List listWorkflowSteps(Connection connection, String workflowId) throw String childWorkflowId = rs.getString("child_workflow_id"); Long startedAt = rs.getObject("started_at_epoch_ms", Long.class); Long completedAt = rs.getObject("completed_at_epoch_ms", Long.class); + String serialization = rs.getString("serialization"); // Deserialize output if present - Object[] output = null; + Object outputVal = null; if (outputData != null) { try { - output = JSONUtil.deserializeToArray(outputData); + outputVal = SerializationUtil.deserializeValue(outputData, serialization, null); } catch (Exception e) { throw new RuntimeException( "Failed to deserialize output for function " + functionId, e); @@ -238,8 +242,7 @@ List listWorkflowSteps(Connection connection, String workflowId) throw } // Deserialize error if present - ErrorResult stepError = ErrorResult.deserialize(errorData); - Object outputVal = output != null ? output[0] : null; + ErrorResult stepError = ErrorResult.deserialize(errorData, serialization, null); steps.add( new StepInfo( functionId, @@ -248,7 +251,8 @@ List listWorkflowSteps(Connection connection, String workflowId) throw stepError, childWorkflowId, startedAt, - completedAt)); + completedAt, + serialization)); } } } @@ -290,8 +294,10 @@ static Duration durableSleepDuration( if (recordedOutput.output() == null) { throw new IllegalStateException("No recorded timeout for sleep"); } - Object[] dser = JSONUtil.deserializeToArray(recordedOutput.output()); - endTime = (long) dser[0]; + Object deserialized = + SerializationUtil.deserializeValue( + recordedOutput.output(), recordedOutput.serialization(), null); + endTime = ((Number) deserialized).longValue(); } else { logger.debug( "Running sleep, workflow {}, id: {}, duration: {}", workflowUuid, functionId, duration); @@ -299,7 +305,7 @@ static Duration durableSleepDuration( try { StepResult output = - new StepResult(workflowUuid, functionId, functionName) + new StepResult(workflowUuid, functionId, functionName, null, null, null, null) .withOutput(JSONUtil.serialize(endTime)); recordStepResultTxn(dataSource, output, startTime, (long) endTime, schema); } catch (DBOSWorkflowExecutionConflictException e) { diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index f9d4f1c8..67763e63 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -4,6 +4,7 @@ import dev.dbos.transact.config.DBOSConfig; import dev.dbos.transact.exceptions.*; import dev.dbos.transact.json.JSONUtil; +import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.workflow.ExportedWorkflow; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; @@ -344,11 +345,17 @@ public Optional checkChildWorkflow(String workflowUuid, int functionId) } public void send( - String workflowId, int functionId, String destinationId, Object message, String topic) { + String workflowId, + int functionId, + String destinationId, + Object message, + String topic, + String serialization) { dbRetry( () -> { - notificationsDAO.send(workflowId, functionId, destinationId, message, topic); + notificationsDAO.send( + workflowId, functionId, destinationId, message, topic, serialization); return null; }); } @@ -363,11 +370,16 @@ public Object recv( } public void setEvent( - String workflowId, int functionId, String key, Object message, boolean asStep) { + String workflowId, + int functionId, + String key, + Object message, + boolean asStep, + String serialization) { dbRetry( () -> { - notificationsDAO.setEvent(workflowId, functionId, key, message, asStep); + notificationsDAO.setEvent(workflowId, functionId, key, message, asStep, serialization); return null; }); } @@ -584,7 +596,8 @@ public boolean patch(String workflowId, int functionId, String patchName) { try (Connection conn = dataSource.getConnection()) { var checkpointName = getCheckpointName(conn, workflowId, functionId); if (checkpointName == null) { - var output = new StepResult(workflowId, functionId, patchName); + var output = + new StepResult(workflowId, functionId, patchName, null, null, null, null); StepsDAO.recordStepResultTxn( output, System.currentTimeMillis(), null, conn, this.schema); return true; @@ -672,7 +685,7 @@ List getWorkflowChildrenInternal(String workflowId) throws SQLException List listWorkflowEvents(Connection conn, String workflowId) throws SQLException { var sql = """ - SELECT key, value + SELECT key, value, serialization FROM %s.workflow_events WHERE workflow_uuid = ? """ @@ -685,7 +698,8 @@ List listWorkflowEvents(Connection conn, String workflowId) throw while (rs.next()) { var key = rs.getString("key"); var value = rs.getString("value"); - events.add(new WorkflowEvent(key, value)); + var serialization = rs.getString("serialization"); + events.add(new WorkflowEvent(key, value, serialization)); } } } @@ -696,7 +710,7 @@ List listWorkflowEventHistory(Connection conn, String work throws SQLException { var sql = """ - SELECT key, value, function_id + SELECT key, value, function_id, serialization FROM %s.workflow_events_history WHERE workflow_uuid = ? """ @@ -710,7 +724,8 @@ List listWorkflowEventHistory(Connection conn, String work var key = rs.getString("key"); var value = rs.getString("value"); var stepId = rs.getInt("function_id"); - history.add(new WorkflowEventHistory(key, value, stepId)); + var serialization = rs.getString("serialization"); + history.add(new WorkflowEventHistory(key, value, stepId, serialization)); } } } @@ -720,7 +735,7 @@ List listWorkflowEventHistory(Connection conn, String work List listWorkflowStreams(Connection conn, String workflowId) throws SQLException { var sql = """ - SELECT key, value, "offset", function_id + SELECT key, value, "offset", function_id, serialization FROM %s.streams WHERE workflow_uuid = ? """ @@ -735,7 +750,8 @@ List listWorkflowStreams(Connection conn, String workflowId) thr var value = rs.getString("value"); var offset = rs.getInt("offset"); var stepId = rs.getInt("function_id"); - streams.add(new WorkflowStream(key, value, offset, stepId)); + var serialization = rs.getString("serialization"); + streams.add(new WorkflowStream(key, value, offset, stepId, serialization)); } } } @@ -780,9 +796,9 @@ public void importWorkflow(List workflows) { created_at, updated_at, started_at_epoch_ms, queue_name, deduplication_id, priority, queue_partition_key, workflow_timeout_ms, workflow_deadline_epoch_ms, - recovery_attempts, forked_from, parent_workflow_id + recovery_attempts, forked_from, parent_workflow_id, serialization ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ) """ .formatted(this.schema); @@ -792,9 +808,10 @@ public void importWorkflow(List workflows) { INSERT INTO %s.operation_outputs ( workflow_uuid, function_id, function_name, output, error, child_workflow_id, - started_at_epoch_ms, completed_at_epoch_ms + started_at_epoch_ms, completed_at_epoch_ms, + serialization ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ?, ? ) """ .formatted(this.schema); @@ -802,9 +819,9 @@ public void importWorkflow(List workflows) { var eventSQL = """ INSERT INTO %s.workflow_events ( - workflow_uuid, key, value + workflow_uuid, key, value, serialization ) VALUES ( - ?, ?, ? + ?, ?, ?, ? ) """ .formatted(this.schema); @@ -812,9 +829,9 @@ public void importWorkflow(List workflows) { var eventHistorySQL = """ INSERT INTO %s.workflow_events_history ( - workflow_uuid, key, value, function_id + workflow_uuid, key, value, function_id, serialization ) VALUES ( - ?, ?, ?, ? + ?, ?, ?, ?, ? ) """ .formatted(this.schema); @@ -822,9 +839,9 @@ public void importWorkflow(List workflows) { var streamsSQL = """ INSERT INTO %s.streams ( - workflow_uuid, key, value, function_id, offset + workflow_uuid, key, value, function_id, offset, serialization ) VALUES ( - ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ? ) """ .formatted(this.schema); @@ -852,11 +869,26 @@ public void importWorkflow(List workflows) { ? null : JSONUtil.serializeArray(status.authenticatedRoles())); stmt.setString( - 9, status.output() == null ? null : JSONUtil.serialize(status.output())); + 9, + status.output() == null + ? null + : SerializationUtil.serializeValue( + status.output(), status.serialization(), null) + .serializedValue()); stmt.setString( - 10, status.error() == null ? null : status.error().serializedError()); + 10, + status.error() == null + ? null + : SerializationUtil.serializeError( + status.error().throwable(), status.serialization(), null) + .serializedValue()); stmt.setString( - 11, status.input() == null ? null : JSONUtil.serializeArray(status.input())); + 11, + status.input() == null + ? null + : SerializationUtil.serializeArgs( + status.input(), null, status.serialization(), null) + .serializedValue()); stmt.setString(12, status.executorId()); stmt.setString(13, status.appVersion()); stmt.setString(14, status.appId()); @@ -872,6 +904,7 @@ public void importWorkflow(List workflows) { stmt.setObject(24, status.recoveryAttempts()); stmt.setString(25, status.forkedFrom()); stmt.setString(26, status.parentWorkflowId()); + stmt.setString(27, status.serialization()); stmt.executeUpdate(); } @@ -882,11 +915,17 @@ public void importWorkflow(List workflows) { stmt.setInt(2, step.functionId()); stmt.setString(3, step.functionName()); stmt.setString( - 4, step.output() == null ? null : JSONUtil.serialize(step.output())); + 4, + step.output() == null + ? null + : SerializationUtil.serializeValue( + step.output(), step.serialization(), null) + .serializedValue()); stmt.setString(5, step.error() == null ? null : step.error().serializedError()); stmt.setString(6, step.childWorkflowId()); stmt.setObject(7, step.startedAtEpochMs()); stmt.setObject(8, step.completedAtEpochMs()); + stmt.setString(9, step.serialization()); stmt.executeUpdate(); } @@ -897,6 +936,7 @@ public void importWorkflow(List workflows) { stmt.setString(1, status.workflowId()); stmt.setString(2, event.key()); stmt.setString(3, event.value()); + stmt.setString(4, event.serialization()); stmt.executeUpdate(); } @@ -908,6 +948,7 @@ public void importWorkflow(List workflows) { stmt.setString(2, history.key()); stmt.setString(3, history.value()); stmt.setInt(4, history.stepId()); + stmt.setString(5, history.serialization()); stmt.executeUpdate(); } @@ -920,6 +961,7 @@ public void importWorkflow(List workflows) { stmt.setString(3, stream.value()); stmt.setInt(4, stream.stepId()); stmt.setInt(5, stream.offset()); + stmt.setString(6, stream.serialization()); stmt.executeUpdate(); } diff --git a/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java b/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java index 80bbe26d..0dfb8bc6 100644 --- a/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java @@ -3,7 +3,9 @@ import dev.dbos.transact.Constants; import dev.dbos.transact.exceptions.*; import dev.dbos.transact.internal.DebugTriggers; +import dev.dbos.transact.json.DBOSSerializer; import dev.dbos.transact.json.JSONUtil; +import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.workflow.ErrorResult; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; @@ -91,7 +93,11 @@ WorkflowInitResult initWorkflowStatus( throw new DBOSMaxRecoveryAttemptsExceededException(initStatus.workflowId(), maxRetries); } return new WorkflowInitResult( - initStatus.workflowId(), resRow.status(), resRow.deadlineEpochMs(), false); + initStatus.workflowId(), + resRow.status(), + resRow.deadlineEpochMs(), + false, + resRow.serialization()); } // Upsert above already set executor assignment and incremented the recovery attempt @@ -120,7 +126,11 @@ WorkflowInitResult initWorkflowStatus( } return new WorkflowInitResult( - initStatus.workflowId(), resRow.status(), resRow.deadlineEpochMs(), true); + initStatus.workflowId(), + resRow.status(), + resRow.deadlineEpochMs(), + true, + resRow.serialization()); } finally { if (shouldCommit) { @@ -142,6 +152,7 @@ static record InsertWorkflowResult( String queueName, Long timeoutMs, Long deadlineEpochMs, + String serialization, String ownerXid) {} /** @@ -170,8 +181,8 @@ InsertWorkflowResult insertWorkflowStatus( executor_id, application_version, application_id, created_at, updated_at, recovery_attempts, workflow_timeout_ms, workflow_deadline_epoch_ms, - parent_workflow_id, owner_xid - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + parent_workflow_id, owner_xid, serialization + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (workflow_uuid) DO UPDATE SET recovery_attempts = CASE @@ -185,7 +196,7 @@ ON CONFLICT (workflow_uuid) THEN workflow_status.executor_id ELSE EXCLUDED.executor_id END - RETURNING recovery_attempts, status, name, class_name, config_name, queue_name, workflow_timeout_ms, workflow_deadline_epoch_ms, owner_xid + RETURNING recovery_attempts, status, name, class_name, config_name, queue_name, workflow_timeout_ms, workflow_deadline_epoch_ms, owner_xid, serialization """ .formatted(this.schema); @@ -225,7 +236,8 @@ ON CONFLICT (workflow_uuid) stmt.setString(22, status.parentWorkflowId()); stmt.setObject(23, ownerXid); - stmt.setInt(24, incrementAttempts ? 1 : 0); + stmt.setString(24, status.serialization()); + stmt.setInt(25, incrementAttempts ? 1 : 0); try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { @@ -239,6 +251,7 @@ ON CONFLICT (workflow_uuid) rs.getString("queue_name"), rs.getObject("workflow_timeout_ms", Long.class), rs.getObject("workflow_deadline_epoch_ms", Long.class), + rs.getString("serialization"), rs.getString("owner_xid")); return result; @@ -329,7 +342,7 @@ WorkflowStatus getWorkflowStatus(Connection conn, String workflowId) throws SQLE SELECT workflow_uuid, status, name, class_name, config_name, - inputs, output, error, + inputs, output, error, serialization, queue_name, deduplication_id, priority, queue_partition_key, executor_id, application_version, application_id, authenticated_user, assumed_role, authenticated_roles, @@ -345,7 +358,7 @@ WorkflowStatus getWorkflowStatus(Connection conn, String workflowId) throws SQLE stmt.setString(1, workflowId); try (var rs = stmt.executeQuery()) { if (rs.next()) { - return resultsToWorkflowStatus(rs, true, true); + return resultsToWorkflowStatus(rs, true, true, null); } } } @@ -387,6 +400,9 @@ List listWorkflows(ListWorkflowsInput input) throws SQLException if (loadOutput) { sqlBuilder.append(", output, error"); } + if (loadInput || loadOutput) { + sqlBuilder.append(", serialization"); + } sqlBuilder.append(" FROM %s.workflow_status ".formatted(this.schema)); @@ -506,7 +522,7 @@ List listWorkflows(ListWorkflowsInput input) throws SQLException try (ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { - WorkflowStatus info = resultsToWorkflowStatus(rs, loadInput, loadOutput); + WorkflowStatus info = resultsToWorkflowStatus(rs, loadInput, loadOutput, null); workflows.add(info); } } @@ -516,27 +532,29 @@ List listWorkflows(ListWorkflowsInput input) throws SQLException } private static WorkflowStatus resultsToWorkflowStatus( - ResultSet rs, boolean loadInput, boolean loadOutput) throws SQLException { + ResultSet rs, boolean loadInput, boolean loadOutput, DBOSSerializer serializer) + throws SQLException { var workflow_uuid = rs.getString("workflow_uuid"); String authenticatedRolesJson = rs.getString("authenticated_roles"); String serializedInput = loadInput ? rs.getString("inputs") : null; String serializedOutput = loadOutput ? rs.getString("output") : null; String serializedError = loadOutput ? rs.getString("error") : null; - ErrorResult err = ErrorResult.deserialize(serializedError); + String serialization = loadInput || loadOutput ? rs.getString("serialization") : null; + var err = ErrorResult.deserialize(serializedError, serialization, null); WorkflowStatus info = new WorkflowStatus( workflow_uuid, rs.getString("status"), rs.getString("name"), - rs.getString("class_name"), - rs.getString("config_name"), + Objects.requireNonNullElse(rs.getString("class_name"), ""), + Objects.requireNonNullElse(rs.getString("config_name"), ""), rs.getString("authenticated_user"), rs.getString("assumed_role"), (authenticatedRolesJson != null) ? (String[]) JSONUtil.deserializeToArray(authenticatedRolesJson) : null, - (serializedInput != null) ? JSONUtil.deserializeToArray(serializedInput) : null, - (serializedOutput != null) ? JSONUtil.deserializeToArray(serializedOutput)[0] : null, + SerializationUtil.deserializePositionalArgs(serializedInput, serialization, serializer), + SerializationUtil.deserializeValue(serializedOutput, serialization, serializer), err, rs.getString("executor_id"), rs.getObject("created_at", Long.class), @@ -552,7 +570,8 @@ private static WorkflowStatus resultsToWorkflowStatus( rs.getObject("priority", Integer.class), rs.getString("queue_partition_key"), rs.getString("forked_from"), - rs.getString("parent_workflow_id")); + rs.getString("parent_workflow_id"), + rs.getString("serialization")); return info; } @@ -595,7 +614,7 @@ Result awaitWorkflowResult(String workflowId) throws SQLException { final String sql = """ - SELECT status, output, error + SELECT status, output, error, serialization FROM %s.workflow_status WHERE workflow_uuid = ? """ @@ -610,16 +629,18 @@ Result awaitWorkflowResult(String workflowId) throws SQLException { try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { String status = rs.getString("status"); + String serialization = rs.getString("serialization"); switch (WorkflowState.valueOf(status.toUpperCase())) { case SUCCESS: String output = rs.getString("output"); - Object[] oArray = JSONUtil.deserializeToArray(output); - return Result.success((T) oArray[0]); + Object outputValue = + SerializationUtil.deserializeValue(output, serialization, null); + return Result.success((T) outputValue); case ERROR: String error = rs.getString("error"); - Throwable t = JSONUtil.deserializeAppException(error); + Throwable t = SerializationUtil.deserializeError(error, serialization, null); return Result.failure(t); case CANCELLED: throw new DBOSAwaitedWorkflowCancelledException(workflowId); @@ -650,7 +671,9 @@ void recordChildWorkflow( long startTime) throws SQLException { - var result = new StepResult(parentId, functionId, functionName).withChildWorkflowId(childId); + var result = + new StepResult(parentId, functionId, functionName, null, null, null, null) + .withChildWorkflowId(childId); try (Connection connection = dataSource.getConnection()) { StepsDAO.recordStepResultTxn(result, null, null, connection, schema); } @@ -835,8 +858,8 @@ private static void insertForkedWorkflowStatus( """ INSERT INTO %s.workflow_status ( workflow_uuid, status, name, class_name, config_name, application_version, application_id, - authenticated_user, authenticated_roles, assumed_role, queue_name, inputs, workflow_timeout_ms, forked_from - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + authenticated_user, authenticated_roles, assumed_role, queue_name, inputs, workflow_timeout_ms, forked_from, serialization + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ .formatted(schema); @@ -852,9 +875,14 @@ private static void insertForkedWorkflowStatus( stmt.setString(9, JSONUtil.serializeArray(originalStatus.authenticatedRoles())); stmt.setString(10, originalStatus.assumedRole()); stmt.setString(11, Constants.DBOS_INTERNAL_QUEUE); - stmt.setString(12, JSONUtil.serializeArray(originalStatus.input())); + stmt.setString( + 12, + SerializationUtil.serializeArgs( + originalStatus.input(), null, originalStatus.serialization(), null) + .serializedValue()); stmt.setObject(13, timeoutMS); stmt.setString(14, originalWorkflowId); + stmt.setString(15, originalStatus.serialization()); stmt.executeUpdate(); } @@ -871,8 +899,8 @@ private static void copyOperationOutputs( String stepOutputsSql = """ INSERT INTO %1$s.operation_outputs - (workflow_uuid, function_id, output, error, function_name, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms) - SELECT ? as workflow_uuid, function_id, output, error, function_name, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms + (workflow_uuid, function_id, output, error, function_name, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms, serialization) + SELECT ? as workflow_uuid, function_id, output, error, function_name, child_workflow_id, started_at_epoch_ms, completed_at_epoch_ms, serialization FROM %1$s.operation_outputs WHERE workflow_uuid = ? AND function_id < ? """ @@ -889,8 +917,8 @@ private static void copyOperationOutputs( var eventHistorySql = """ INSERT INTO %1$s.workflow_events_history - (workflow_uuid, function_id, key, value) - SELECT ? as workflow_uuid, function_id, key, value + (workflow_uuid, function_id, key, value, serialization) + SELECT ? as workflow_uuid, function_id, key, value, serialization FROM %1$s.workflow_events_history WHERE workflow_uuid = ? AND function_id < ? """ @@ -907,8 +935,8 @@ private static void copyOperationOutputs( var eventSql = """ INSERT INTO %1$s.workflow_events - (workflow_uuid, key, value) - SELECT ?, weh1.key, weh1.value + (workflow_uuid, key, value, serialization) + SELECT ?, weh1.key, weh1.value, weh1.serialization FROM %1$s.workflow_events_history weh1 WHERE weh1.workflow_uuid = ? AND weh1.function_id = ( diff --git a/transact/src/main/java/dev/dbos/transact/database/WorkflowInitResult.java b/transact/src/main/java/dev/dbos/transact/database/WorkflowInitResult.java index 77286278..dea26bed 100644 --- a/transact/src/main/java/dev/dbos/transact/database/WorkflowInitResult.java +++ b/transact/src/main/java/dev/dbos/transact/database/WorkflowInitResult.java @@ -3,7 +3,11 @@ import java.util.Objects; public record WorkflowInitResult( - String workflowId, String status, Long deadlineEpochMS, boolean shouldExecuteOnThisExecutor) { + String workflowId, + String status, + Long deadlineEpochMS, + boolean shouldExecuteOnThisExecutor, + String serialization) { public Long deadlineEpochMS() { return Objects.requireNonNullElse(deadlineEpochMS, 0L); } diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index 5053a753..929435c3 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -25,10 +25,12 @@ import dev.dbos.transact.internal.DBOSInvocationHandler; import dev.dbos.transact.internal.Invocation; import dev.dbos.transact.json.JSONUtil; +import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.tempworkflows.InternalWorkflowsService; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.StepInfo; import dev.dbos.transact.workflow.StepOptions; import dev.dbos.transact.workflow.Timeout; @@ -408,18 +410,17 @@ public void fireAlertHandler(String name, String message, Map me } private static void postInvokeWorkflowResult( - SystemDatabase systemDatabase, String workflowId, Object result) { + SystemDatabase systemDatabase, String workflowId, Object result, String serialization) { - String resultString = JSONUtil.serialize(result); - systemDatabase.recordWorkflowOutput(workflowId, resultString); + var serialized = SerializationUtil.serializeValue(result, serialization, null); + systemDatabase.recordWorkflowOutput(workflowId, serialized.serializedValue()); } private static void postInvokeWorkflowError( - SystemDatabase systemDatabase, String workflowId, Throwable error) { + SystemDatabase systemDatabase, String workflowId, Throwable error, String serialization) { - String errorString = JSONUtil.serializeAppException(error); - - systemDatabase.recordWorkflowError(workflowId, errorString); + var serialized = SerializationUtil.serializeError(error, serialization, null); + systemDatabase.recordWorkflowError(workflowId, serialized.serializedValue()); } /** This does not retry */ @@ -452,7 +453,7 @@ public T callFunctionAsStep( String jsonError = JSONUtil.serializeAppException(e); StepResult r = new StepResult( - ctx.getWorkflowId(), nextFuncId, functionName, null, jsonError, childWfId); + ctx.getWorkflowId(), nextFuncId, functionName, null, jsonError, childWfId, null); systemDatabase.recordStepResultTxn(r, startTime); } throw (E) e; @@ -461,7 +462,8 @@ public T callFunctionAsStep( // Record the successful result String jsonOutput = JSONUtil.serialize(functionResult); StepResult o = - new StepResult(ctx.getWorkflowId(), nextFuncId, functionName, jsonOutput, null, childWfId); + new StepResult( + ctx.getWorkflowId(), nextFuncId, functionName, jsonOutput, null, childWfId, null); systemDatabase.recordStepResultTxn(o, startTime); return functionResult; @@ -491,10 +493,12 @@ public T runStepInternal( private T handleExistingResult(StepResult result, String functionName) throws E { if (result.output() != null) { - Object[] resArray = JSONUtil.deserializeToArray(result.output()); - return resArray == null ? null : (T) resArray[0]; + Object outputValue = + SerializationUtil.deserializeValue(result.output(), result.serialization(), null); + return (T) outputValue; } else if (result.error() != null) { - Throwable t = JSONUtil.deserializeAppException(result.error()); + Throwable t = + SerializationUtil.deserializeError(result.error(), result.serialization(), null); if (t instanceof Exception) { throw (E) t; } else { @@ -546,13 +550,15 @@ public T runStepInternal( if (recordedResult != null) { String output = recordedResult.output(); if (output != null) { - Object[] stepO = JSONUtil.deserializeToArray(output); - return stepO == null ? null : (T) stepO[0]; + Object outputValue = + SerializationUtil.deserializeValue(output, recordedResult.serialization(), null); + return (T) outputValue; } String error = recordedResult.error(); if (error != null) { - var throwable = JSONUtil.deserializeAppException(error); + var throwable = + SerializationUtil.deserializeError(error, recordedResult.serialization(), null); if (!(throwable instanceof Exception)) throw new RuntimeException(throwable.getMessage(), throwable); throw (E) throwable; @@ -597,7 +603,8 @@ public T runStepInternal( if (eThrown == null) { StepResult stepResult = - new StepResult(workflowId, stepFunctionId, stepName, serializedOutput, null, childWfId); + new StepResult( + workflowId, stepFunctionId, stepName, serializedOutput, null, childWfId, null); systemDatabase.recordStepResultTxn(stepResult, startTime); return result; } else { @@ -608,7 +615,8 @@ public T runStepInternal( stepName, null, JSONUtil.serializeAppException(eThrown), - childWfId); + childWfId, + null); systemDatabase.recordStepResultTxn(stepResult, startTime); throw (E) eThrown; } @@ -716,7 +724,8 @@ public void send( Object message, String topic, InternalWorkflowsService internalWorkflowsService, - String idempotencyKey) { + String idempotencyKey, + SerializationStrategy serialization) { DBOSContext ctx = DBOSContextHolder.get(); if (ctx.isInStep()) { @@ -726,7 +735,7 @@ public void send( var sendWfid = idempotencyKey == null ? null : "%s-%s".formatted(destinationId, idempotencyKey); try (var wfid = new WorkflowOptions(sendWfid).setContext()) { - internalWorkflowsService.sendWorkflow(destinationId, message, topic); + internalWorkflowsService.sendWorkflow(destinationId, message, topic, serialization); } return; } @@ -737,7 +746,13 @@ public void send( } int stepFunctionId = ctx.getAndIncrementFunctionId(); - systemDatabase.send(ctx.getWorkflowId(), stepFunctionId, destinationId, message, topic); + systemDatabase.send( + ctx.getWorkflowId(), + stepFunctionId, + destinationId, + message, + topic, + serialization.formatName()); } /** @@ -762,7 +777,7 @@ public Object recv(String topic, Duration timeout) { ctx.getWorkflowId(), stepFunctionId, timeoutFunctionId, topic, timeout); } - public void setEvent(String key, Object value) { + public void setEvent(String key, Object value, SerializationStrategy serialization) { logger.debug("Received setEvent for key {}", key); DBOSContext ctx = DBOSContextHolder.get(); @@ -770,9 +785,18 @@ public void setEvent(String key, Object value) { throw new IllegalStateException("DBOS.setEvent() must be called from a workflow."); } + if (serialization == null || serialization.equals(SerializationStrategy.DEFAULT)) { + if (ctx.getSerialization() != null) { + serialization = ctx.getSerialization(); + } else { + serialization = SerializationStrategy.DEFAULT; + } + } + var asStep = !ctx.isInStep(); var stepId = ctx.isInStep() ? ctx.getCurrentFunctionId() : ctx.getAndIncrementFunctionId(); - systemDatabase.setEvent(ctx.getWorkflowId(), stepId, key, value, asStep); + systemDatabase.setEvent( + ctx.getWorkflowId(), stepId, key, value, asStep, serialization.formatName()); } public Object getEvent(String workflowId, String key, Duration timeout) { @@ -940,7 +964,8 @@ public record ExecutionOptions( Integer priority, String queuePartitionKey, boolean isRecoveryRequest, - boolean isDequeuedRequest) { + boolean isDequeuedRequest, + String serialization) { public ExecutionOptions { if (timeout instanceof Timeout.Explicit explicit) { if (explicit.value().isNegative() || explicit.value().isZero()) { @@ -970,7 +995,7 @@ public record ExecutionOptions( } public ExecutionOptions(String workflowId, Duration timeout, Instant deadline) { - this(workflowId, Timeout.of(timeout), deadline, null, null, null, null, false, false); + this(workflowId, Timeout.of(timeout), deadline, null, null, null, null, false, false, null); } public ExecutionOptions asRecoveryRequest() { @@ -983,7 +1008,8 @@ public ExecutionOptions asRecoveryRequest() { this.priority, this.queuePartitionKey, true, - false); + false, + this.serialization); } public ExecutionOptions asDequeuedRequest() { @@ -996,7 +1022,22 @@ public ExecutionOptions asDequeuedRequest() { this.priority, this.queuePartitionKey, false, - true); + true, + this.serialization); + } + + public ExecutionOptions withSerialization(String serialization) { + return new ExecutionOptions( + this.workflowId, + this.timeout, + this.deadline, + this.queueName, + this.deduplicationId, + this.priority, + this.queuePartitionKey, + this.isRecoveryRequest, + this.isDequeuedRequest, + serialization); } public Duration timeoutDuration() { @@ -1024,7 +1065,8 @@ public WorkflowHandle startWorkflow( options.priority(), options.queuePartitionKey(), false, - false); + false, + null); return executeWorkflow(regWorkflow, args, execOptions, null); } @@ -1075,7 +1117,8 @@ public WorkflowHandle startWorkflow( options.priority(), options.queuePartitionKey(), false, - false); + false, + null); return executeWorkflow(workflow, invocation.args(), execOptions, parent); } @@ -1116,6 +1159,9 @@ public WorkflowHandle invokeWorkflow( } var options = new ExecutionOptions(workflowId, timeout, deadline); + if (workflow.serializationStrategy() != null) { + options = options.withSerialization(workflow.serializationStrategy().formatName()); + } return executeWorkflow(workflow, args, options, parent); } @@ -1139,7 +1185,9 @@ public WorkflowHandle executeWorkflowById( throw new DBOSWorkflowFunctionNotFoundException(workflowId, wfName); } - var options = new ExecutionOptions(workflowId, status.timeout(), status.deadline()); + var options = + new ExecutionOptions(workflowId, status.timeout(), status.deadline()) + .withSerialization(status.serialization()); if (isRecoveryRequest) options = options.asRecoveryRequest(); if (isDequeuedRequest) options = options.asDequeuedRequest(); return executeWorkflow(workflow, inputs, options, null); @@ -1155,11 +1203,19 @@ private WorkflowHandle executeWorkflow( } } + if (options.serialization() == null) { + if (workflow.serializationStrategy() != null) { + options = options.withSerialization(workflow.serializationStrategy().formatName()); + } + } + Integer maxRetries = workflow.maxRecoveryAttempts() > 0 ? workflow.maxRecoveryAttempts() : null; + final var foptions = options; + if (options.queueName() != null) { - var queue = queues.stream().filter(q -> q.name().equals(options.queueName())).findFirst(); + var queue = queues.stream().filter(q -> q.name().equals(foptions.queueName())).findFirst(); if (queue.isPresent()) { if (queue.get().partitionedEnabled() && options.queuePartitionKey() == null) { throw new IllegalArgumentException( @@ -1197,8 +1253,7 @@ private WorkflowHandle executeWorkflow( if (workflowId.isEmpty()) { throw new IllegalArgumentException("workflowId cannot be empty"); } - WorkflowInitResult initResult = null; - initResult = + WorkflowInitResult initResult = preInvokeWorkflow( systemDatabase, workflow.name(), @@ -1218,7 +1273,8 @@ private WorkflowHandle executeWorkflow( options.timeoutDuration(), options.deadline(), options.isRecoveryRequest, - options.isDequeuedRequest); + options.isDequeuedRequest, + options.serialization()); if (!initResult.shouldExecuteOnThisExecutor()) { return retrieveWorkflow(workflowId); } @@ -1237,10 +1293,17 @@ private WorkflowHandle executeWorkflow( if (res != null) throw new DBOSWorkflowExecutionConflictException(workflowId); try { logger.debug( - "executeWorkflow task {}({}) {}", workflow.fullyQualifiedName(), args, options); + "executeWorkflow task {}({}) {}", workflow.fullyQualifiedName(), args, foptions); DBOSContextHolder.set( - new DBOSContext(workflowId, parent, options.timeoutDuration(), options.deadline())); + new DBOSContext( + workflowId, + parent, + foptions.timeoutDuration(), + foptions.deadline(), + SerializationUtil.PORTABLE.equals(initResult.serialization()) + ? SerializationStrategy.PORTABLE + : SerializationStrategy.DEFAULT)); if (Thread.currentThread().isInterrupted()) { logger.debug("executeWorkflow task interrupted before workflow.invoke"); return null; @@ -1250,7 +1313,8 @@ private WorkflowHandle executeWorkflow( logger.debug("executeWorkflow task interrupted before postInvokeWorkflowResult"); return null; } - postInvokeWorkflowResult(systemDatabase, workflowId, result); + postInvokeWorkflowResult( + systemDatabase, workflowId, result, initResult.serialization()); return result; } catch (DBOSWorkflowExecutionConflictException e) { // don't persist execution conflict exception @@ -1275,7 +1339,7 @@ private WorkflowHandle executeWorkflow( throw new DBOSAwaitedWorkflowCancelledException(workflowId); } - postInvokeWorkflowError(systemDatabase, workflowId, actual); + postInvokeWorkflowError(systemDatabase, workflowId, actual, initResult.serialization()); throw e; } finally { DBOSContextHolder.clear(); @@ -1353,7 +1417,8 @@ public static WorkflowHandle enqueueWorkflow( options.timeoutDuration(), options.deadline(), options.isRecoveryRequest, - options.isDequeuedRequest); + options.isDequeuedRequest, + options.serialization()); return new WorkflowHandleDBPoll(workflowId); } catch (DBOSWorkflowExecutionConflictException e) { logger.debug("Workflow execution conflict for workflowId {}", workflowId); @@ -1384,12 +1449,16 @@ private static WorkflowInitResult preInvokeWorkflow( Duration timeout, Instant deadline, boolean isRecoveryRequest, - boolean isDequeuedRequest) { + boolean isDequeuedRequest, + String serialization) { if (inputs == null) { inputs = new Object[0]; } - String inputString = JSONUtil.serializeArray(inputs); + // Serialize inputs using the specified serialization format + var serializedArgs = SerializationUtil.serializeArgs(inputs, null, serialization, null); + String inputString = serializedArgs.serializedValue(); + String actualSerialization = serializedArgs.serialization(); var startTime = System.currentTimeMillis(); WorkflowState status = queueName == null ? WorkflowState.PENDING : WorkflowState.ENQUEUED; @@ -1427,7 +1496,8 @@ private static WorkflowInitResult preInvokeWorkflow( null, timeoutMs, deadlineEpochMs, - parentWorkflow != null ? parentWorkflow.workflowId() : null); + parentWorkflow != null ? parentWorkflow.workflowId() : null, + actualSerialization); WorkflowInitResult[] initResult = {null}; initResult[0] = diff --git a/transact/src/main/java/dev/dbos/transact/execution/RegisteredWorkflow.java b/transact/src/main/java/dev/dbos/transact/execution/RegisteredWorkflow.java index a3cca21c..c18df692 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/RegisteredWorkflow.java +++ b/transact/src/main/java/dev/dbos/transact/execution/RegisteredWorkflow.java @@ -1,5 +1,7 @@ package dev.dbos.transact.execution; +import dev.dbos.transact.workflow.SerializationStrategy; + import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Objects; @@ -10,7 +12,8 @@ public record RegisteredWorkflow( String instanceName, Object target, Method workflowMethod, - int maxRecoveryAttempts) { + int maxRecoveryAttempts, + SerializationStrategy serializationStrategy) { public RegisteredWorkflow { Objects.requireNonNull(name, "workflow name must not be null"); diff --git a/transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java b/transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java index cf433007..61df42d1 100644 --- a/transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java +++ b/transact/src/main/java/dev/dbos/transact/internal/WorkflowRegistry.java @@ -3,6 +3,7 @@ import dev.dbos.transact.execution.RegisteredWorkflow; import dev.dbos.transact.execution.RegisteredWorkflowInstance; import dev.dbos.transact.execution.SchedulerService; +import dev.dbos.transact.workflow.SerializationStrategy; import java.lang.reflect.Method; import java.util.Map; @@ -29,12 +30,19 @@ public void register( Object target, String instanceName, Method method, - int maxRecoveryAttempts) { + int maxRecoveryAttempts, + SerializationStrategy serializationStrategy) { var fqName = RegisteredWorkflow.fullyQualifiedName(className, instanceName, workflowName); var regWorkflow = new RegisteredWorkflow( - workflowName, className, instanceName, target, method, maxRecoveryAttempts); + workflowName, + className, + instanceName, + target, + method, + maxRecoveryAttempts, + serializationStrategy); SchedulerService.validateScheduledWorkflow(regWorkflow); var previous = wfRegistry.putIfAbsent(fqName, regWorkflow); diff --git a/transact/src/main/java/dev/dbos/transact/json/DBOSJavaSerializer.java b/transact/src/main/java/dev/dbos/transact/json/DBOSJavaSerializer.java new file mode 100644 index 00000000..5e48711a --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/json/DBOSJavaSerializer.java @@ -0,0 +1,35 @@ +package dev.dbos.transact.json; + +/** + * Native Java serializer using Jackson with type information. This is the default serializer for + * Java DBOS applications. + */ +public class DBOSJavaSerializer implements DBOSSerializer { + + public static final String NAME = "java_jackson"; + + public static final DBOSJavaSerializer INSTANCE = new DBOSJavaSerializer(); + + public DBOSJavaSerializer() {} + + @Override + public String name() { + return NAME; + } + + @Override + public String stringify(Object value, boolean noHistoricalWrapper) { + if (noHistoricalWrapper) return JSONUtil.serializeArray((Object[]) value); + return JSONUtil.serialize(value); + } + + @Override + public Object parse(String text, boolean noHistoricalWrapper) { + if (text == null) { + return null; + } + var vi = JSONUtil.deserializeToArray(text); + if (noHistoricalWrapper) return vi; + return vi[0]; + } +} diff --git a/transact/src/main/java/dev/dbos/transact/json/DBOSPortableSerializer.java b/transact/src/main/java/dev/dbos/transact/json/DBOSPortableSerializer.java new file mode 100644 index 00000000..ec993d27 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/json/DBOSPortableSerializer.java @@ -0,0 +1,178 @@ +package dev.dbos.transact.json; + +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +/** + * Portable JSON serializer that produces output compatible with any language. Does not include + * Java-specific type information. + * + *

Dates are serialized as ISO-8601 strings. Maps and Sets are serialized as plain JSON + * objects/arrays. Does not preserve Java class information. + */ +public class DBOSPortableSerializer implements DBOSSerializer { + + public static final String NAME = "portable_json"; + + public static final DBOSPortableSerializer INSTANCE = new DBOSPortableSerializer(); + + private final ObjectMapper mapper; + + public DBOSPortableSerializer() { + this.mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + // Write dates as ISO-8601 strings for portability + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Override + public String name() { + return NAME; + } + + @Override + public String stringify(Object value, boolean _noHistoricalWrapper) { + try { + return mapper.writeValueAsString(toPortable(value)); + } catch (JsonProcessingException e) { + throw new JSONUtil.JsonRuntimeException(e); + } + } + + @Override + public Object parse(String text, boolean _noHistoricalWrapper) { + if (text == null) { + return null; + } + try { + return mapper.readValue(text, Object.class); + } catch (JsonProcessingException e) { + throw new JSONUtil.JsonRuntimeException(e); + } + } + + /** Serialize workflow arguments in portable format. */ + public String stringifyArgs(Object[] positionalArgs, Map namedArgs) { + JsonWorkflowArgs args = + new JsonWorkflowArgs(positionalArgs, namedArgs != null ? toPortableMap(namedArgs) : null); + try { + return mapper.writeValueAsString(args); + } catch (JsonProcessingException e) { + throw new JSONUtil.JsonRuntimeException(e); + } + } + + /** Deserialize workflow arguments from portable format. */ + public JsonWorkflowArgs parseArgs(String text) { + if (text == null) { + return null; + } + try { + return mapper.readValue(text, JsonWorkflowArgs.class); + } catch (JsonProcessingException e) { + throw new JSONUtil.JsonRuntimeException(e); + } + } + + /** Serialize an error in portable format. */ + public String stringifyError(Throwable error) { + JsonWorkflowErrorData errorData = + new JsonWorkflowErrorData( + error.getClass().getSimpleName(), + error.getMessage(), + error instanceof PortableWorkflowException pwe ? pwe.getCode() : null, + error instanceof PortableWorkflowException pwe ? pwe.getData() : null); + try { + return mapper.writeValueAsString(errorData); + } catch (JsonProcessingException e) { + throw new JSONUtil.JsonRuntimeException(e); + } + } + + /** Deserialize an error from portable format. */ + public PortableWorkflowException parseError(String text) { + if (text == null) { + return null; + } + try { + JsonWorkflowErrorData errorData = mapper.readValue(text, JsonWorkflowErrorData.class); + return PortableWorkflowException.fromErrorData(errorData); + } catch (JsonProcessingException e) { + throw new JSONUtil.JsonRuntimeException(e); + } + } + + /** + * Convert a value to its portable representation. - Dates become ISO-8601 strings - Other objects + * pass through (Jackson handles them) + */ + @SuppressWarnings("unchecked") + private Object toPortable(Object value) { + if (value == null) { + return null; + } + + // Convert dates to ISO-8601 strings + if (value instanceof Date date) { + return DateTimeFormatter.ISO_INSTANT.format(date.toInstant()); + } + if (value instanceof Instant instant) { + return DateTimeFormatter.ISO_INSTANT.format(instant); + } + if (value instanceof OffsetDateTime odt) { + return DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(odt); + } + if (value instanceof ZonedDateTime zdt) { + return DateTimeFormatter.ISO_ZONED_DATE_TIME.format(zdt); + } + + // Convert arrays recursively + if (value instanceof Object[] array) { + return toPortableArray(array); + } + + // Convert lists recursively + if (value instanceof List list) { + return list.stream().map(this::toPortable).toList(); + } + + // Convert maps recursively + if (value instanceof Map map) { + return toPortableMap((Map) map); + } + + // Errors become error data + if (value instanceof Throwable t) { + return new JsonWorkflowErrorData(t.getClass().getSimpleName(), t.getMessage()); + } + + return value; + } + + private Object[] toPortableArray(Object[] array) { + Object[] result = new Object[array.length]; + for (int i = 0; i < array.length; i++) { + result[i] = toPortable(array[i]); + } + return result; + } + + @SuppressWarnings("unchecked") + private Map toPortableMap(Map map) { + java.util.HashMap result = new java.util.HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + result.put(entry.getKey(), toPortable(entry.getValue())); + } + return result; + } +} diff --git a/transact/src/main/java/dev/dbos/transact/json/DBOSSerializer.java b/transact/src/main/java/dev/dbos/transact/json/DBOSSerializer.java new file mode 100644 index 00000000..88ba9794 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/json/DBOSSerializer.java @@ -0,0 +1,31 @@ +package dev.dbos.transact.json; + +/** + * Generic serializer interface for DBOS. Implementations must be able to serialize any value to a + * string and deserialize it back. + */ +public interface DBOSSerializer { + /** + * Return a name for the serialization format. This name is stored in the database to identify how + * data was serialized. + */ + String name(); + + /** + * Serialize a value to a string. + * + * @param value The value to serialize + * @param noHistoricalWrapper The value is not expected to have a wrapper enclosing array + * @return The serialized string representation + */ + String stringify(Object value, boolean noHistoricalWrapper); + + /** + * Deserialize a string back to a value. + * + * @param text A serialized string (potentially null) + * @param noHistoricalWrapper The value is not expected to have a wrapper enclosing array + * @return The deserialized value, or null if the input was null + */ + Object parse(String text, boolean noHistoricalWrapper); +} diff --git a/transact/src/main/java/dev/dbos/transact/json/JsonWorkflowArgs.java b/transact/src/main/java/dev/dbos/transact/json/JsonWorkflowArgs.java new file mode 100644 index 00000000..4374d4a7 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/json/JsonWorkflowArgs.java @@ -0,0 +1,18 @@ +package dev.dbos.transact.json; + +import java.util.Map; + +/** + * Portable representation of workflow arguments. This format can be serialized/deserialized by any + * language. + */ +public record JsonWorkflowArgs(Object[] positionalArgs, Map namedArgs) { + + public JsonWorkflowArgs() { + this(null, null); + } + + public JsonWorkflowArgs(Object[] positionalArgs) { + this(positionalArgs, null); + } +} diff --git a/transact/src/main/java/dev/dbos/transact/json/JsonWorkflowErrorData.java b/transact/src/main/java/dev/dbos/transact/json/JsonWorkflowErrorData.java new file mode 100644 index 00000000..bc671505 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/json/JsonWorkflowErrorData.java @@ -0,0 +1,12 @@ +package dev.dbos.transact.json; + +/** + * Portable representation of workflow errors. This format can be serialized/deserialized by any + * language. + */ +public record JsonWorkflowErrorData(String name, String message, Object code, Object data) { + + public JsonWorkflowErrorData(String name, String message) { + this(name, message, null, null); + } +} diff --git a/transact/src/main/java/dev/dbos/transact/json/PortableWorkflowException.java b/transact/src/main/java/dev/dbos/transact/json/PortableWorkflowException.java new file mode 100644 index 00000000..48dbfb20 --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/json/PortableWorkflowException.java @@ -0,0 +1,45 @@ +package dev.dbos.transact.json; + +/** + * Exception that can be serialized and deserialized portably across languages. Used when + * deserializing errors from the portable JSON format. + */ +public class PortableWorkflowException extends RuntimeException { + private final String errorName; + private final Object code; + private final Object data; + + public PortableWorkflowException(String message, String errorName, Object code, Object data) { + super(message); + this.errorName = errorName; + this.code = code; + this.data = data; + } + + public PortableWorkflowException(String message, String errorName) { + this(message, errorName, null, null); + } + + public String getErrorName() { + return errorName; + } + + public Object getCode() { + return code; + } + + public Object getData() { + return data; + } + + /** Create from portable error data. */ + public static PortableWorkflowException fromErrorData(JsonWorkflowErrorData errorData) { + return new PortableWorkflowException( + errorData.message(), errorData.name(), errorData.code(), errorData.data()); + } + + /** Convert to portable error data. */ + public JsonWorkflowErrorData toErrorData() { + return new JsonWorkflowErrorData(errorName, getMessage(), code, data); + } +} diff --git a/transact/src/main/java/dev/dbos/transact/json/SerializationUtil.java b/transact/src/main/java/dev/dbos/transact/json/SerializationUtil.java new file mode 100644 index 00000000..7aa641fa --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/json/SerializationUtil.java @@ -0,0 +1,278 @@ +package dev.dbos.transact.json; + +import java.util.Map; +import java.util.Objects; + +/** + * Utility class for serialization and deserialization with support for multiple formats. + * + *

This class handles the logic of choosing the appropriate serializer based on the serialization + * format stored in the database. It supports: + * + *

    + *
  • {@code portable_json} - Portable format compatible with any language + *
  • {@code java_serialize} - Native Java format with type information + *
  • Custom serializers registered by the application + *
+ */ +public final class SerializationUtil { + + /** Serialization format for portable JSON (cross-language compatible). */ + public static final String PORTABLE = DBOSPortableSerializer.NAME; + + /** Serialization format for native Java serialization. */ + public static final String NATIVE = DBOSJavaSerializer.NAME; + + private SerializationUtil() {} + + // ============ Value Serialization ============ + + /** + * Serialize a value using the specified format. + * + * @param value the value to serialize + * @param format the serialization format ("portable_json", "java_jackson", custom name, null) + * @param customSerializer optional custom serializer (used if format is not portable/native) + * @return the serialized result containing the serialized string and the serializer name + */ + public static SerializedResult serializeValue( + Object value, String format, DBOSSerializer customSerializer) { + + if (PORTABLE.equals(format)) { + String serialized = DBOSPortableSerializer.INSTANCE.stringify(value, false); + return new SerializedResult(serialized, DBOSPortableSerializer.NAME); + } + + if (NATIVE.equals(format)) { + String serialized = DBOSJavaSerializer.INSTANCE.stringify(value, false); + return new SerializedResult(serialized, DBOSJavaSerializer.NAME); + } + + // Default / custom + DBOSSerializer serializer = + customSerializer != null ? customSerializer : DBOSJavaSerializer.INSTANCE; + if (format != null && !serializer.name().equals(format)) { + throw new IllegalArgumentException("Serializer is not available"); + } + String serialized = serializer.stringify(value, false); + return new SerializedResult(serialized, serializer.name()); + } + + /** + * Deserialize a value using the serialization format stored with it. + * + * @param serializedValue the serialized string + * @param serialization the serialization format name (from DB column) + * @param customSerializer optional custom serializer + * @return the deserialized value + */ + public static Object deserializeValue( + String serializedValue, String serialization, DBOSSerializer customSerializer) { + + if (serializedValue == null) { + return null; + } + + if (DBOSPortableSerializer.NAME.equals(serialization)) { + return DBOSPortableSerializer.INSTANCE.parse(serializedValue, false); + } + + if (DBOSJavaSerializer.NAME.equals(serialization)) { + return DBOSJavaSerializer.INSTANCE.parse(serializedValue, false); + } + + DBOSSerializer serializer = customSerializer; + if (serializer == null) serializer = DBOSJavaSerializer.INSTANCE; + if (serialization != null && !serializer.name().equals(serialization)) { + throw new IllegalArgumentException("Serialization is not available"); + } + + return serializer.parse(serializedValue, false); + } + + // ============ Arguments Serialization ============ + + /** + * Serialize workflow arguments using the specified format. + * + * @param positionalArgs the positional arguments + * @param namedArgs the named arguments (only supported for portable format) + * @param serialization the serialization format + * @param customSerializer optional custom serializer + * @return the serialized result + */ + public static SerializedResult serializeArgs( + Object[] positionalArgs, + Map namedArgs, + String serialization, + DBOSSerializer customSerializer) { + + if (PORTABLE.equals(serialization)) { + String serialized = DBOSPortableSerializer.INSTANCE.stringifyArgs(positionalArgs, namedArgs); + return new SerializedResult(serialized, DBOSPortableSerializer.NAME); + } + + if (namedArgs != null && !namedArgs.isEmpty()) { + throw new IllegalArgumentException( + "Serialization format '" + serialization + "' does not support named arguments"); + } + + if (NATIVE.equals(serialization)) { + String serialized = DBOSJavaSerializer.INSTANCE.stringify(positionalArgs, true); + return new SerializedResult(serialized, DBOSJavaSerializer.NAME); + } + + DBOSSerializer serializer = customSerializer; + if (serializer == null) serializer = DBOSJavaSerializer.INSTANCE; + if (serialization != null && !serializer.name().equals(serialization)) { + throw new IllegalArgumentException("Serialization is not available"); + } + + String serialized = serializer.stringify(positionalArgs, true); + return new SerializedResult(serialized, serializer.name()); + } + + /** + * Deserialize workflow arguments (positional only). + * + * @param serializedValue the serialized string + * @param serialization the serialization format name + * @param customSerializer optional custom serializer + * @return the positional arguments array + */ + public static Object[] deserializePositionalArgs( + String serializedValue, String serialization, DBOSSerializer customSerializer) { + + if (serializedValue == null) { + return new Object[0]; + } + + if (DBOSPortableSerializer.NAME.equals(serialization)) { + JsonWorkflowArgs args = DBOSPortableSerializer.INSTANCE.parseArgs(serializedValue); + if (args == null || args.positionalArgs() == null) { + return new Object[0]; + } + return args.positionalArgs(); + } + + if (DBOSJavaSerializer.NAME.equals(serialization) || serialization == null) { + return (Object[]) DBOSJavaSerializer.INSTANCE.parse(serializedValue, true); + } + + DBOSSerializer serializer = customSerializer; + if (serializer == null) serializer = DBOSJavaSerializer.INSTANCE; + if (serialization != null && !serializer.name().equals(serialization)) { + throw new IllegalArgumentException("Serialization is not available"); + } + + return (Object[]) serializer.parse(serializedValue, true); + } + + // ============ Error Serialization ============ + + /** + * Serialize an error using the specified format. + * + * @param error the error to serialize + * @param format the serialization format + * @param customSerializer optional custom serializer + * @return the serialized result + */ + public static SerializedResult serializeError( + Throwable error, String format, DBOSSerializer customSerializer) { + + if (PORTABLE.equals(format)) { + String serialized = DBOSPortableSerializer.INSTANCE.stringifyError(error); + return new SerializedResult(serialized, DBOSPortableSerializer.NAME); + } + + if (NATIVE.equals(format) || format == null) { + // Use the existing Java error serialization + String serialized = JSONUtil.serializeAppException(error); + return new SerializedResult(serialized, DBOSJavaSerializer.NAME); + } + + // Custom serializer - use native Java format + String serialized = JSONUtil.serializeAppException(error); + DBOSSerializer serializer = + customSerializer != null ? customSerializer : DBOSJavaSerializer.INSTANCE; + return new SerializedResult(serialized, serializer.name()); + } + + /** + * Deserialize an error. + * + * @param serializedValue the serialized string + * @param serialization the serialization format name + * @param customSerializer optional custom serializer + * @return the deserialized throwable + */ + public static Throwable deserializeError( + String serializedValue, String serialization, DBOSSerializer customSerializer) { + + if (serializedValue == null) { + return null; + } + + if (DBOSPortableSerializer.NAME.equals(serialization)) { + return DBOSPortableSerializer.INSTANCE.parseError(serializedValue); + } + + if (DBOSJavaSerializer.NAME.equals(serialization) || serialization == null) { + return JSONUtil.deserializeAppException(serializedValue); + } + + // Try custom or fall back to Java + try { + return JSONUtil.deserializeAppException(serializedValue); + } catch (Exception e) { + // Return a generic exception with the message + return new RuntimeException("Deserialization failed for format: " + serialization, e); + } + } + + /** + * Safely parse a value, returning the raw string if parsing fails. Used for introspection methods + * that may encounter old or undeserializable data. + */ + public static Object safeParse( + String serializedValue, String serialization, DBOSSerializer customSerializer) { + try { + return deserializeValue(serializedValue, serialization, customSerializer); + } catch (Exception e) { + return serializedValue; + } + } + + /** Safely parse arguments, returning the raw string if parsing fails. */ + public static Object safeParseArgs( + String serializedValue, String serialization, DBOSSerializer customSerializer) { + try { + return deserializePositionalArgs(serializedValue, serialization, customSerializer); + } catch (Exception e) { + return serializedValue; + } + } + + /** Safely parse an error, returning a RuntimeException with the raw message if parsing fails. */ + public static Throwable safeParseError( + String serializedValue, String serialization, DBOSSerializer customSerializer) { + try { + return deserializeError(serializedValue, serialization, customSerializer); + } catch (Exception e) { + return new RuntimeException(serializedValue); + } + } + + /** + * Result of a serialization operation, containing both the serialized string and the name of the + * serializer used (to be stored in the DB). + */ + /** Result of serialization, containing the serialized string and the format used. */ + public static record SerializedResult(String serializedValue, String serialization) { + public SerializedResult { + Objects.requireNonNull(serializedValue); + // serialization can be null for backward compatibility (default format) + } + } +} diff --git a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java index 4abb6bca..f079c024 100644 --- a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java +++ b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java @@ -236,7 +236,8 @@ public static List getMigrations(String schema) { migration5, migration6, migration7, - migration8); + migration8, + migration9); return migrations.stream().map(m -> m.formatted(schema)).toList(); } @@ -407,4 +408,14 @@ FOREIGN KEY (workflow_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) ALTER TABLE %1$s."workflow_status" ADD COLUMN "parent_workflow_id" TEXT DEFAULT NULL; CREATE INDEX "idx_workflow_status_parent_workflow_id" ON %1$s."workflow_status" ("parent_workflow_id"); """; + + static final String migration9 = + """ + ALTER TABLE %1$s."workflow_status" ADD COLUMN "serialization" TEXT DEFAULT NULL; + ALTER TABLE %1$s."notifications" ADD COLUMN "serialization" TEXT DEFAULT NULL; + ALTER TABLE %1$s."workflow_events" ADD COLUMN "serialization" TEXT DEFAULT NULL; + ALTER TABLE %1$s."workflow_events_history" ADD COLUMN "serialization" TEXT DEFAULT NULL; + ALTER TABLE %1$s."operation_outputs" ADD COLUMN "serialization" TEXT DEFAULT NULL; + ALTER TABLE %1$s."streams" ADD COLUMN "serialization" TEXT DEFAULT NULL; + """; } diff --git a/transact/src/main/java/dev/dbos/transact/tempworkflows/InternalWorkflowsService.java b/transact/src/main/java/dev/dbos/transact/tempworkflows/InternalWorkflowsService.java index 6cd3ebd7..c3e9e07b 100644 --- a/transact/src/main/java/dev/dbos/transact/tempworkflows/InternalWorkflowsService.java +++ b/transact/src/main/java/dev/dbos/transact/tempworkflows/InternalWorkflowsService.java @@ -1,6 +1,9 @@ package dev.dbos.transact.tempworkflows; +import dev.dbos.transact.workflow.SerializationStrategy; + public interface InternalWorkflowsService { - void sendWorkflow(String destinationId, Object message, String topic); + void sendWorkflow( + String destinationId, Object message, String topic, SerializationStrategy serialization); } diff --git a/transact/src/main/java/dev/dbos/transact/tempworkflows/InternalWorkflowsServiceImpl.java b/transact/src/main/java/dev/dbos/transact/tempworkflows/InternalWorkflowsServiceImpl.java index 4b898a66..4b49f184 100644 --- a/transact/src/main/java/dev/dbos/transact/tempworkflows/InternalWorkflowsServiceImpl.java +++ b/transact/src/main/java/dev/dbos/transact/tempworkflows/InternalWorkflowsServiceImpl.java @@ -1,11 +1,14 @@ package dev.dbos.transact.tempworkflows; import dev.dbos.transact.DBOS; +import dev.dbos.transact.workflow.SerializationStrategy; import dev.dbos.transact.workflow.Workflow; public class InternalWorkflowsServiceImpl implements InternalWorkflowsService { @Workflow(name = "internalSendWorkflow") - public void sendWorkflow(String destinationId, Object message, String topic) { - DBOS.send(destinationId, message, topic); + public void sendWorkflow( + String destinationId, Object message, String topic, SerializationStrategy serialization) { + + DBOS.send(destinationId, message, topic, null, serialization); } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/ErrorResult.java b/transact/src/main/java/dev/dbos/transact/workflow/ErrorResult.java index de57c32b..6b14cfa2 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/ErrorResult.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/ErrorResult.java @@ -1,26 +1,26 @@ package dev.dbos.transact.workflow; +import dev.dbos.transact.json.DBOSSerializer; import dev.dbos.transact.json.JSONUtil; public record ErrorResult( String className, String message, String serializedError, Throwable throwable) { - public static ErrorResult fromThrowable(Throwable error) { - if (error != null) { - var serializedError = JSONUtil.serializeAppException(error); - return deserialize(serializedError); - } else { + public static ErrorResult fromThrowable( + Throwable error, String serialization, DBOSSerializer serializer) { + if (error == null) { return null; } + var serializedError = JSONUtil.serializeAppException(error); + return deserialize(serializedError, serialization, serializer); } - public static ErrorResult deserialize(String serializedError) { - if (serializedError != null) { - var wrapper = JSONUtil.deserializeAppExceptionWrapper(serializedError); - Throwable throwable = JSONUtil.deserializeAppException(serializedError); - return new ErrorResult(wrapper.type, wrapper.message, serializedError, throwable); - } else { - return null; - } + public static ErrorResult deserialize( + String serializedError, String serialization, DBOSSerializer serializer) { + if (serializedError == null) return null; + + var wrapper = JSONUtil.deserializeAppExceptionWrapper(serializedError); + Throwable throwable = JSONUtil.deserializeAppException(serializedError); + return new ErrorResult(wrapper.type, wrapper.message, serializedError, throwable); } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/SerializationStrategy.java b/transact/src/main/java/dev/dbos/transact/workflow/SerializationStrategy.java new file mode 100644 index 00000000..59980faf --- /dev/null +++ b/transact/src/main/java/dev/dbos/transact/workflow/SerializationStrategy.java @@ -0,0 +1,52 @@ +package dev.dbos.transact.workflow; + +import dev.dbos.transact.json.SerializationUtil; + +/** + * Serialization strategy for workflow arguments, results, events, messages, and streams. + * + *

This enum represents the strategic choice of serialization format at the client level. The + * actual serialization format name used in the database is determined by the strategy: + * + *

    + *
  • {@link #PORTABLE} - Uses portable JSON format for cross-language compatibility + *
  • {@link #NATIVE} - Explicitly uses the native format for this language + *
  • {@link #DEFAULT} - Uses the default format for this language (native Java serialization, + * unless context dictates portable) + *
+ */ +public enum SerializationStrategy { + /** + * Use the default serialization for this language. For Java, this is the native Java + * serialization format ({@code java_jackson}), except if the running workflow is portable. + */ + DEFAULT(null), + + /** + * Use portable JSON serialization ({@code portable_json}). This format is compatible across + * languages and should be used when workflows may be initiated or consumed by applications + * written in different languages (e.g., TypeScript, Python). + */ + PORTABLE(SerializationUtil.PORTABLE), + + /** + * Explicitly use the native serialization format for this language. For Java, this is {@code + * java_jackson}. + */ + NATIVE(SerializationUtil.NATIVE); + + private final String formatName; + + SerializationStrategy(String formatName) { + this.formatName = formatName; + } + + /** + * Get the serialization format name to use in the database. + * + * @return the format name, or null for DEFAULT (which lets the lower layers decide) + */ + public String formatName() { + return formatName; + } +} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/StepInfo.java b/transact/src/main/java/dev/dbos/transact/workflow/StepInfo.java index deb08363..a0af78df 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/StepInfo.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/StepInfo.java @@ -7,4 +7,5 @@ public record StepInfo( ErrorResult error, String childWorkflowId, Long startedAtEpochMs, - Long completedAtEpochMs) {} + Long completedAtEpochMs, + String serialization) {} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/Workflow.java b/transact/src/main/java/dev/dbos/transact/workflow/Workflow.java index 746f6e46..eaf7f43d 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/Workflow.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/Workflow.java @@ -11,4 +11,6 @@ String name() default ""; int maxRecoveryAttempts() default -1; + + SerializationStrategy serializationStrategy() default SerializationStrategy.DEFAULT; } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowEvent.java b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowEvent.java index ef0ddd40..47443446 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowEvent.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowEvent.java @@ -1,3 +1,3 @@ package dev.dbos.transact.workflow; -public record WorkflowEvent(String key, String value) {} +public record WorkflowEvent(String key, String value, String serialization) {} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowEventHistory.java b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowEventHistory.java index e5d3895c..90646b36 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowEventHistory.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowEventHistory.java @@ -1,3 +1,3 @@ package dev.dbos.transact.workflow; -public record WorkflowEventHistory(String key, String value, int stepId) {} +public record WorkflowEventHistory(String key, String value, int stepId, String serialization) {} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStatus.java b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStatus.java index 3a52773b..030f5cf5 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStatus.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStatus.java @@ -31,7 +31,8 @@ public record WorkflowStatus( Integer priority, String queuePartitionKey, String forkedFrom, - String parentWorkflowId) { + String parentWorkflowId, + String serialization) { @com.fasterxml.jackson.annotation.JsonProperty(access = JsonProperty.Access.READ_ONLY) public Instant deadline() { diff --git a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStream.java b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStream.java index aed8d105..301209d6 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStream.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStream.java @@ -1,3 +1,4 @@ package dev.dbos.transact.workflow; -public record WorkflowStream(String key, String value, int offset, int stepId) {} +public record WorkflowStream( + String key, String value, int offset, int stepId, String serialization) {} diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/StepResult.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/StepResult.java index 175ed8a8..f3e6f3e4 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/internal/StepResult.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/StepResult.java @@ -6,21 +6,28 @@ public record StepResult( String functionName, String output, String error, - String childWorkflowId) { + String childWorkflowId, + String serialization) { public StepResult(String workflowId, int stepId, String functionName) { - this(workflowId, stepId, functionName, null, null, null); + this(workflowId, stepId, functionName, null, null, null, null); } public StepResult withOutput(String v) { - return new StepResult(workflowId, stepId, functionName, v, error, childWorkflowId); + return new StepResult( + workflowId, stepId, functionName, v, error, childWorkflowId, serialization); } public StepResult withError(String v) { - return new StepResult(workflowId, stepId, functionName, output, v, childWorkflowId); + return new StepResult( + workflowId, stepId, functionName, output, v, childWorkflowId, serialization); } public StepResult withChildWorkflowId(String v) { - return new StepResult(workflowId, stepId, functionName, output, error, v); + return new StepResult(workflowId, stepId, functionName, output, error, v, serialization); + } + + public StepResult withSerialization(String v) { + return new StepResult(workflowId, stepId, functionName, output, error, childWorkflowId, v); } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowStatusInternal.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowStatusInternal.java index fd130f7a..017e22e4 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowStatusInternal.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowStatusInternal.java @@ -27,12 +27,13 @@ public record WorkflowStatusInternal( Long startedAt, Long timeoutMs, Long deadlineEpochMs, - String parentWorkflowId) { + String parentWorkflowId, + String serialization) { public WorkflowStatusInternal() { this( null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null, null); } public WorkflowStatusInternal(String workflowUUID, WorkflowState state) { @@ -61,6 +62,7 @@ public WorkflowStatusInternal(String workflowUUID, WorkflowState state) { null, null, null, + null, null); } @@ -90,6 +92,7 @@ public static class Builder { private Long startedAt; private Long timeoutMs; private Long deadlineEpochMs; + private String serialization; public Builder workflowId(String workflowId) { this.workflowId = workflowId; @@ -216,6 +219,11 @@ public Builder deadlineEpochMs(Long deadlineEpochMs) { return this; } + public Builder serialization(String serialization) { + this.serialization = serialization; + return this; + } + public WorkflowStatusInternal build() { return new WorkflowStatusInternal( workflowId, @@ -242,7 +250,8 @@ public WorkflowStatusInternal build() { startedAt, timeoutMs, deadlineEpochMs, - parentWorkflowId); + parentWorkflowId, + serialization); } } diff --git a/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java b/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java index e5f204f4..6c6e09b6 100644 --- a/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java +++ b/transact/src/test/java/dev/dbos/transact/admin/AdminServerTest.java @@ -481,12 +481,22 @@ public void listSteps() throws IOException { List steps = new ArrayList<>(); for (int i = 0; i < 3; i++) { var step = - new StepInfo(i, "step-%d".formatted(i), "output-%d".formatted(i), null, null, null, null); + new StepInfo( + i, "step-%d".formatted(i), "output-%d".formatted(i), null, null, null, null, null); steps.add(step); } - steps.add(new StepInfo(3, "step-3", null, null, "child-wfid-3", null, null)); + steps.add(new StepInfo(3, "step-3", null, null, "child-wfid-3", null, null, null)); var error = new RuntimeException("error-4"); - steps.add(new StepInfo(4, "step-4", null, ErrorResult.fromThrowable(error), null, null, null)); + steps.add( + new StepInfo( + 4, + "step-4", + null, + ErrorResult.fromThrowable(error, null, null), + null, + null, + null, + null)); when(mockDB.listWorkflowSteps(any())).thenReturn(steps); diff --git a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java index c3273d51..b42972c7 100644 --- a/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java +++ b/transact/src/test/java/dev/dbos/transact/conductor/ConductorTest.java @@ -365,7 +365,8 @@ public void onWebsocketMessage(WebSocket conn, Framedata frame) { for (int j = 0; j < 1024; j++) { stringBuilder.append(characters.charAt(random.nextInt(characters.length()))); } - steps.add(new StepInfo(i, "function" + i, stringBuilder.toString(), null, null, null, null)); + steps.add( + new StepInfo(i, "function" + i, stringBuilder.toString(), null, null, null, null, null)); } when(mockExec.listWorkflowSteps("large-wf")).thenReturn(steps); @@ -1095,11 +1096,11 @@ public void canListSteps() throws Exception { String workflowId = "workflow-id-1"; List steps = new ArrayList<>(); - steps.add(new StepInfo(0, "function1", null, null, null, null, null)); - steps.add(new StepInfo(1, "function2", null, null, null, null, null)); - steps.add(new StepInfo(2, "function3", null, null, null, null, null)); - steps.add(new StepInfo(3, "function4", null, null, null, null, null)); - steps.add(new StepInfo(4, "function5", null, null, null, null, null)); + steps.add(new StepInfo(0, "function1", null, null, null, null, null, null)); + steps.add(new StepInfo(1, "function2", null, null, null, null, null, null)); + steps.add(new StepInfo(2, "function3", null, null, null, null, null, null)); + steps.add(new StepInfo(3, "function4", null, null, null, null, null, null)); + steps.add(new StepInfo(4, "function5", null, null, null, null, null, null)); when(mockExec.listWorkflowSteps(workflowId)).thenReturn(steps); @@ -1578,13 +1579,14 @@ private static ExportedWorkflow createTestExportedWorkflow(int index) { null, null, currentTime + (i * 1000), - currentTime + ((i + 1) * 1000))); + currentTime + ((i + 1) * 1000), + null)); } int eventCount = (int) (Math.random() * 8) + 2; List events = new ArrayList<>(); for (int i = 0; i < eventCount; i++) { - events.add(new WorkflowEvent(prefix + "event" + (i + 1), prefix + "value" + (i + 1))); + events.add(new WorkflowEvent(prefix + "event" + (i + 1), prefix + "value" + (i + 1), null)); } int historyCount = (int) (Math.random() * 8) + 2; @@ -1594,7 +1596,7 @@ private static ExportedWorkflow createTestExportedWorkflow(int index) { String eventKey = eventCount > 0 ? prefix + "event" + ((i % eventCount) + 1) : prefix + "event" + (i + 1); eventHistory.add( - new WorkflowEventHistory(eventKey, prefix + "historyvalue" + (i + 1), stepId)); + new WorkflowEventHistory(eventKey, prefix + "historyvalue" + (i + 1), stepId, null)); } int streamCount = (int) (Math.random() * 8) + 2; @@ -1603,7 +1605,8 @@ private static ExportedWorkflow createTestExportedWorkflow(int index) { int stepId = i % Math.max(1, stepCount); // Distribute across available steps int offset = i % 3; // Vary offset between 0-2 String streamKey = prefix + "stream" + ((i % 3) + 1); // Use 3 different stream keys - streams.add(new WorkflowStream(streamKey, prefix + "streamvalue" + (i + 1), offset, stepId)); + streams.add( + new WorkflowStream(streamKey, prefix + "streamvalue" + (i + 1), offset, stepId, null)); } return new ExportedWorkflow(status, steps, events, eventHistory, streams); diff --git a/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java b/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java index b7ddd588..c075b539 100644 --- a/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java +++ b/transact/src/test/java/dev/dbos/transact/execution/DBOSExecutorTest.java @@ -9,7 +9,7 @@ import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.exceptions.DBOSNonExistentWorkflowException; import dev.dbos.transact.exceptions.DBOSWorkflowFunctionNotFoundException; -import dev.dbos.transact.json.JSONUtil; +import dev.dbos.transact.json.SerializationUtil; import dev.dbos.transact.utils.DBUtils; import dev.dbos.transact.workflow.*; @@ -330,7 +330,8 @@ public void sleepRecovery() throws Exception { long currenttime = System.currentTimeMillis(); long newEndtime = (currenttime + 2000); - String endTimeAsJson = JSONUtil.serialize(newEndtime); + String endTimeAsJson = + SerializationUtil.serializeValue(newEndtime, null, null).serializedValue(); DBUtils.updateStepEndTime(dataSource, wfid, steps.get(0).functionId(), endTimeAsJson); diff --git a/transact/src/test/java/dev/dbos/transact/json/PortableSerializationTest.java b/transact/src/test/java/dev/dbos/transact/json/PortableSerializationTest.java new file mode 100644 index 00000000..ac1762fa --- /dev/null +++ b/transact/src/test/java/dev/dbos/transact/json/PortableSerializationTest.java @@ -0,0 +1,656 @@ +package dev.dbos.transact.json; + +import static org.junit.jupiter.api.Assertions.*; + +import dev.dbos.transact.DBOS; +import dev.dbos.transact.DBOSClient; +import dev.dbos.transact.StartWorkflowOptions; +import dev.dbos.transact.config.DBOSConfig; +import dev.dbos.transact.database.SystemDatabase; +import dev.dbos.transact.utils.DBUtils; +import dev.dbos.transact.workflow.Queue; +import dev.dbos.transact.workflow.SerializationStrategy; +import dev.dbos.transact.workflow.Workflow; +import dev.dbos.transact.workflow.WorkflowClassName; +import dev.dbos.transact.workflow.WorkflowHandle; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.Duration; +import java.util.UUID; + +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests for portable serialization format. These tests verify that workflows can be triggered via + * direct database inserts using the portable JSON format, simulating cross-language workflow + * initiation. + */ +@org.junit.jupiter.api.Timeout(value = 2, unit = java.util.concurrent.TimeUnit.MINUTES) +public class PortableSerializationTest { + + private static DBOSConfig dbosConfig; + private HikariDataSource dataSource; + + @BeforeAll + static void onetimeSetup() throws Exception { + PortableSerializationTest.dbosConfig = + DBOSConfig.defaultsFromEnv("portablesertest") + .withDatabaseUrl("jdbc:postgresql://localhost:5432/dbos_java_sys"); + } + + @BeforeEach + void beforeEachTest() throws SQLException { + DBUtils.recreateDB(dbosConfig); + dataSource = SystemDatabase.createDataSource(dbosConfig); + DBOS.reinitialize(dbosConfig); + } + + @AfterEach + void afterEachTest() throws Exception { + dataSource.close(); + DBOS.shutdown(); + } + + /** Workflow interface for portable serialization tests. */ + public interface PortableTestService { + String recvWorkflow(String topic, long timeoutMs); + } + + /** Implementation of the portable test workflow. */ + @WorkflowClassName("PortableTestService") + public static class PortableTestServiceImpl implements PortableTestService { + @Workflow(name = "recvWorkflow") + @Override + public String recvWorkflow(String topic, long timeoutMs) { + Object received = DBOS.recv(topic, Duration.ofMillis(timeoutMs)); + return "received:" + received; + } + } + + /** + * Tests that a workflow can be triggered via direct database insert using portable JSON format. + * This simulates the scenario where a workflow is initiated by another language. + */ + @Test + public void testDirectInsertPortable() throws Exception { + // Register queue and workflow + Queue testQueue = new Queue("testq"); + DBOS.registerQueue(testQueue); + + PortableTestService service = + DBOS.registerWorkflows(PortableTestService.class, new PortableTestServiceImpl()); + + DBOS.launch(); + + String workflowId = UUID.randomUUID().toString(); + + // Insert workflow_status directly with portable_json format + // The inputs are in portable format: { "positionalArgs": ["incoming", 30000] } + // where "incoming" is the topic and 30000 is the timeout in ms + try (Connection conn = dataSource.getConnection()) { + String insertWorkflowSql = + """ + INSERT INTO dbos.workflow_status( + workflow_uuid, + name, + class_name, + config_name, + queue_name, + status, + inputs, + created_at, + serialization + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """; + + try (PreparedStatement stmt = conn.prepareStatement(insertWorkflowSql)) { + stmt.setString(1, workflowId); + stmt.setString(2, "recvWorkflow"); // workflow name (from @Workflow annotation) + stmt.setString(3, "PortableTestService"); // class name alias (from @WorkflowClassName) + stmt.setString(4, null); + stmt.setString(5, "testq"); // queue name + stmt.setString(6, "ENQUEUED"); // status + // Portable JSON format for inputs: positionalArgs array with topic and timeout + stmt.setString(7, "{\"positionalArgs\":[\"incoming\",30000]}"); // inputs in portable format + stmt.setLong(8, System.currentTimeMillis()); // created_at + stmt.setString(9, "portable_json"); // serialization format + stmt.executeUpdate(); + } + + // Insert notification directly with portable_json format + String insertNotificationSql = + """ + INSERT INTO dbos.notifications( + destination_uuid, + topic, + message, + serialization + ) + VALUES (?, ?, ?, ?) + """; + + try (PreparedStatement stmt = conn.prepareStatement(insertNotificationSql)) { + stmt.setString(1, workflowId); + stmt.setString(2, "incoming"); // topic + stmt.setString( + 3, "\"HelloFromPortable\""); // message in portable JSON format (quoted string) + stmt.setString(4, "portable_json"); // serialization format + stmt.executeUpdate(); + } + } + + // Retrieve the workflow handle and await the result + WorkflowHandle handle = DBOS.retrieveWorkflow(workflowId); + String result = handle.getResult(); + + // Verify the result + assertEquals("received:HelloFromPortable", result); + + // Verify the workflow completed successfully + var status = handle.getStatus(); + assertEquals("SUCCESS", status.status()); + + // Verify the output was written in portable format + var row = DBUtils.getWorkflowRow(dataSource, workflowId); + assertNotNull(row); + assertEquals("portable_json", row.serialization()); + // The output should be in portable format (simple quoted string) + assertEquals("\"received:HelloFromPortable\"", row.output()); + } + + /** + * Tests that a workflow can be enqueued using DBOSClient.enqueueWorkflow with portable + * serialization type option. + */ + @Test + public void testClientEnqueueWithPortableSerialization() throws Exception { + // Register queue and workflow + Queue testQueue = new Queue("testq"); + DBOS.registerQueue(testQueue); + + PortableTestService service = + DBOS.registerWorkflows(PortableTestService.class, new PortableTestServiceImpl()); + + DBOS.launch(); + + // Create a DBOSClient + try (DBOSClient client = new DBOSClient(dataSource)) { + String workflowId = UUID.randomUUID().toString(); + + // Enqueue workflow using client with portable serialization + var options = + new DBOSClient.EnqueueOptions("PortableTestService", "recvWorkflow", "testq") + .withWorkflowId(workflowId) + .withSerialization(SerializationStrategy.PORTABLE); + + WorkflowHandle handle = + client.enqueueWorkflow(options, new Object[] {"incoming", 30000L}); + + // Send a message using portable serialization + client.send( + workflowId, "HelloFromClient", "incoming", null, DBOSClient.SendOptions.portable()); + + // Await the result + String result = handle.getResult(); + + // Verify the result + assertEquals("received:HelloFromClient", result); + + // Verify the workflow completed successfully + var status = handle.getStatus(); + assertEquals("SUCCESS", status.status()); + + // Verify the workflow was stored with portable serialization + var row = DBUtils.getWorkflowRow(dataSource, workflowId); + assertNotNull(row); + assertEquals("portable_json", row.serialization()); + } + } + + /** + * Tests that a workflow can be enqueued using DBOSClient.enqueuePortableWorkflow which uses + * portable JSON serialization by default without validation. + */ + @Test + public void testClientEnqueuePortableWorkflow() throws Exception { + // Register queue and workflow + Queue testQueue = new Queue("testq"); + DBOS.registerQueue(testQueue); + + PortableTestService service = + DBOS.registerWorkflows(PortableTestService.class, new PortableTestServiceImpl()); + + DBOS.launch(); + + // Create a DBOSClient + try (DBOSClient client = new DBOSClient(dataSource)) { + String workflowId = UUID.randomUUID().toString(); + + // Enqueue workflow using enqueuePortableWorkflow + var options = + new DBOSClient.EnqueueOptions("PortableTestService", "recvWorkflow", "testq") + .withWorkflowId(workflowId); + + // Use enqueuePortableWorkflow which defaults to portable serialization + var handle = + client.enqueuePortableWorkflow(options, new Object[] {"incoming", 30000L}, null); + + // Send a message using portable serialization + client.send( + workflowId, + "HelloFromPortableClient", + "incoming", + null, + DBOSClient.SendOptions.portable()); + + // Await the result + String result = handle.getResult(); + + // Verify the result + assertEquals("received:HelloFromPortableClient", result); + + // Verify the workflow completed successfully + var status = handle.getStatus(); + assertEquals("SUCCESS", status.status()); + + // Verify the workflow was stored with portable serialization + var row = DBUtils.getWorkflowRow(dataSource, workflowId); + assertNotNull(row); + assertEquals("portable_json", row.serialization()); + // The output should be in portable format + assertEquals("\"received:HelloFromPortableClient\"", row.output()); + } + } + + /** Workflow interface for testing setEvent and send with explicit serialization. */ + public interface ExplicitSerService { + String eventWorkflow(); + + void senderWorkflow(String targetId); + } + + /** Implementation that sets events with different serialization types. */ + @WorkflowClassName("ExplicitSerService") + public static class ExplicitSerServiceImpl implements ExplicitSerService { + @Workflow(name = "eventWorkflow") + @Override + public String eventWorkflow() { + // Set events with different serialization types + DBOS.setEvent("defaultEvent", "defaultValue"); + DBOS.setEvent("nativeEvent", "nativeValue", SerializationStrategy.NATIVE); + DBOS.setEvent("portableEvent", "portableValue", SerializationStrategy.PORTABLE); + return "done"; + } + + @Workflow(name = "senderWorkflow") + @Override + public void senderWorkflow(String targetId) { + // Send messages with different serialization types + DBOS.send(targetId, "defaultMsg", "defaultTopic"); + DBOS.send(targetId, "nativeMsg", "nativeTopic", null, SerializationStrategy.NATIVE); + DBOS.send(targetId, "portableMsg", "portableTopic", null, SerializationStrategy.PORTABLE); + } + } + + /** Implementation that sets events with different serialization types. */ + @WorkflowClassName("ExplicitSerServicePortable") + public static class ExplicitSerServicePortableImpl implements ExplicitSerService { + @Workflow(name = "eventWorkflow", serializationStrategy = SerializationStrategy.PORTABLE) + @Override + public String eventWorkflow() { + // Set events with different serialization types + DBOS.setEvent("defaultEvent", "defaultValue"); + DBOS.setEvent("nativeEvent", "nativeValue", SerializationStrategy.NATIVE); + DBOS.setEvent("portableEvent", "portableValue", SerializationStrategy.PORTABLE); + return "done"; + } + + @Workflow(name = "senderWorkflow") + @Override + public void senderWorkflow(String targetId) { + // Send messages with different serialization types + DBOS.send(targetId, "defaultMsg", "defaultTopic"); + DBOS.send(targetId, "nativeMsg", "nativeTopic", null, SerializationStrategy.NATIVE); + DBOS.send(targetId, "portableMsg", "portableTopic", null, SerializationStrategy.PORTABLE); + } + } + + /** Workflow that throws an error for testing portable error serialization. */ + public interface ErrorService { + void errorWorkflow(); + } + + @WorkflowClassName("ErrorService") + public static class ErrorServiceImpl implements ErrorService { + @Workflow(name = "errorWorkflow") + @Override + public void errorWorkflow() { + throw new RuntimeException("Workflow failed!"); + } + } + + /** + * Tests that DBOS.setEvent() with explicit SerializationStrategy correctly stores the + * serialization format in the database. + */ + @Test + public void testSetEventWithVaryingSerialization() throws Exception { + Queue testQueue = new Queue("testq"); + DBOS.registerQueue(testQueue); + var defsvc = DBOS.registerWorkflows(ExplicitSerService.class, new ExplicitSerServiceImpl()); + var portsvc = + DBOS.registerWorkflows(ExplicitSerService.class, new ExplicitSerServicePortableImpl()); + + DBOS.launch(); + + for (String sertype : new String[] {"defq", "portq", "defstart", "portstart"}) { + // Use DBOSClient to enqueue and run the workflow + try (DBOSClient client = new DBOSClient(dataSource)) { + String workflowId = UUID.randomUUID().toString(); + WorkflowHandle handle = null; + boolean isPortable = sertype.startsWith("port"); + + if (sertype.equals("defq")) { + var options = + new DBOSClient.EnqueueOptions("ExplicitSerService", "eventWorkflow", "testq") + .withWorkflowId(workflowId); + + handle = client.enqueueWorkflow(options, new Object[] {}); + } + if (sertype.equals("portq")) { + var options = + new DBOSClient.EnqueueOptions("ExplicitSerService", "eventWorkflow", "testq") + .withWorkflowId(workflowId) + .withSerialization(SerializationStrategy.PORTABLE); + + handle = client.enqueueWorkflow(options, new Object[] {}); + } + if (sertype.equals("defstart")) { + handle = + DBOS.startWorkflow( + () -> { + return defsvc.eventWorkflow(); + }, + new StartWorkflowOptions(workflowId)); + } + if (sertype.equals("portstart")) { + handle = + DBOS.startWorkflow( + () -> { + return portsvc.eventWorkflow(); + }, + new StartWorkflowOptions(workflowId)); + } + + String result = handle.getResult(); + assertEquals("done", result); + + // Check workflow's serialization + var wfRow = DBUtils.getWorkflowRow(dataSource, workflowId); + assertNotNull(wfRow); + var expectedSer = isPortable ? "portable_json" : "java_jackson"; + if (!expectedSer.equals(wfRow.serialization())) { + System.err.println("Expected serialization does not match in: " + sertype); + } + assertEquals(expectedSer, wfRow.serialization()); + + // Verify the events in the database have correct serialization + var events = DBUtils.getWorkflowEvents(dataSource, workflowId); + assertEquals(3, events.size()); + + // Find each event and verify serialization + var defaultEvent = events.stream().filter(e -> e.key().equals("defaultEvent")).findFirst(); + var nativeEvent = events.stream().filter(e -> e.key().equals("nativeEvent")).findFirst(); + var portableEvent = + events.stream().filter(e -> e.key().equals("portableEvent")).findFirst(); + + assertTrue(defaultEvent.isPresent()); + assertTrue(nativeEvent.isPresent()); + assertTrue(portableEvent.isPresent()); + + // Default setEvent inherits workflow's serialization + assertEquals(expectedSer, defaultEvent.get().serialization()); + // Native should have java_jackson (explicitly set) + assertEquals("java_jackson", nativeEvent.get().serialization()); + // Portable should have portable_json (explicitly set) + assertEquals("portable_json", portableEvent.get().serialization()); + + // Also verify the event history + var eventHistory = DBUtils.getWorkflowEventHistory(dataSource, workflowId); + assertEquals(3, eventHistory.size()); + + var defaultHist = + eventHistory.stream().filter(e -> e.key().equals("defaultEvent")).findFirst(); + var nativeHist = + eventHistory.stream().filter(e -> e.key().equals("nativeEvent")).findFirst(); + var portableHist = + eventHistory.stream().filter(e -> e.key().equals("portableEvent")).findFirst(); + + assertTrue(defaultHist.isPresent()); + assertTrue(nativeHist.isPresent()); + assertTrue(portableHist.isPresent()); + + assertEquals(expectedSer, defaultHist.get().serialization()); + assertEquals("java_jackson", nativeHist.get().serialization()); + assertEquals("portable_json", portableHist.get().serialization()); + } + } + } + + /** + * Tests that DBOS.send() with explicit SerializationStrategy correctly stores the serialization + * format in the notifications table. + */ + @Test + public void testSendWithExplicitSerialization() throws Exception { + Queue testQueue = new Queue("testq"); + DBOS.registerQueue(testQueue); + DBOS.registerWorkflows(ExplicitSerService.class, new ExplicitSerServiceImpl()); + + DBOS.launch(); + + // Create a target workflow to receive messages + String targetId = UUID.randomUUID().toString(); + + // Insert a dummy workflow to be the target (so FK constraint is satisfied) + try (Connection conn = dataSource.getConnection()) { + String insertSql = + """ + INSERT INTO dbos.workflow_status(workflow_uuid, name, class_name, config_name, status, created_at) + VALUES (?, 'dummy', 'Dummy', '', 'PENDING', ?) + """; + try (PreparedStatement stmt = conn.prepareStatement(insertSql)) { + stmt.setString(1, targetId); + stmt.setLong(2, System.currentTimeMillis()); + stmt.executeUpdate(); + } + } + + // Use DBOSClient to enqueue and run the sender workflow + try (DBOSClient client = new DBOSClient(dataSource)) { + String workflowId = UUID.randomUUID().toString(); + + var options = + new DBOSClient.EnqueueOptions("ExplicitSerService", "senderWorkflow", "testq") + .withWorkflowId(workflowId); + + WorkflowHandle handle = client.enqueueWorkflow(options, new Object[] {targetId}); + handle.getResult(); + + // Verify the notifications in the database have correct serialization + var notifications = DBUtils.getNotifications(dataSource, targetId); + assertEquals(3, notifications.size()); + + var defaultNotif = + notifications.stream().filter(n -> n.topic().equals("defaultTopic")).findFirst(); + var nativeNotif = + notifications.stream().filter(n -> n.topic().equals("nativeTopic")).findFirst(); + var portableNotif = + notifications.stream().filter(n -> n.topic().equals("portableTopic")).findFirst(); + + assertTrue(defaultNotif.isPresent()); + assertTrue(nativeNotif.isPresent()); + assertTrue(portableNotif.isPresent()); + + // Default should have native serialization (backward compatible) + assertEquals("java_jackson", defaultNotif.get().serialization()); + // Native should have java_jackson + assertEquals("java_jackson", nativeNotif.get().serialization()); + // Portable should have portable_json + assertEquals("portable_json", portableNotif.get().serialization()); + + // Also verify the message format + // Portable format wraps strings in quotes + assertEquals("\"portableMsg\"", portableNotif.get().message()); + } + } + + /** Simple workflow interface for event setting tests. */ + public interface EventSetterService { + String setEventWorkflow(); + } + + @WorkflowClassName("EventSetterService") + public static class EventSetterServiceImpl implements EventSetterService { + @Workflow(name = "setEventWorkflow") + @Override + public String setEventWorkflow() { + // Set event without explicit serialization - should inherit from workflow + // context + DBOS.setEvent("myKey", "myValue"); + return "eventSet"; + } + } + + /** + * Tests that a portable workflow (started via portable enqueue) uses portable serialization by + * default for setEvent when no explicit serialization is specified. + */ + @Test + public void testPortableWorkflowDefaultSerialization() throws Exception { + // Workflow that sets an event without explicit serialization + Queue testQueue = new Queue("testq"); + DBOS.registerQueue(testQueue); + + // Register a simple workflow that sets an event + DBOS.registerWorkflows(EventSetterService.class, new EventSetterServiceImpl()); + + DBOS.launch(); + + try (DBOSClient client = new DBOSClient(dataSource)) { + String workflowId = UUID.randomUUID().toString(); + + // Enqueue with portable serialization + var options = + new DBOSClient.EnqueueOptions("EventSetterService", "setEventWorkflow", "testq") + .withWorkflowId(workflowId) + .withSerialization(SerializationStrategy.PORTABLE); + + WorkflowHandle handle = client.enqueueWorkflow(options, new Object[] {}); + + // Wait for completion + String result = handle.getResult(); + assertEquals("eventSet", result); + + // Verify the workflow used portable serialization + var row = DBUtils.getWorkflowRow(dataSource, workflowId); + assertNotNull(row); + assertEquals("portable_json", row.serialization()); + + // Verify the event inherited portable serialization (since it was set without + // explicit type) + var events = DBUtils.getWorkflowEvents(dataSource, workflowId); + assertEquals(1, events.size()); + assertEquals("myKey", events.get(0).key()); + // Event should inherit workflow's portable serialization + assertEquals("portable_json", events.get(0).serialization()); + } + } + + /** Tests that errors thrown from portable workflows are stored in portable JSON format. */ + @Test + public void testPortableWorkflowErrorSerialization() throws Exception { + Queue testQueue = new Queue("testq"); + DBOS.registerQueue(testQueue); + + DBOS.registerWorkflows(ErrorService.class, new ErrorServiceImpl()); + + DBOS.launch(); + + try (DBOSClient client = new DBOSClient(dataSource)) { + String workflowId = UUID.randomUUID().toString(); + + // Enqueue with portable serialization + var options = + new DBOSClient.EnqueueOptions("ErrorService", "errorWorkflow", "testq") + .withWorkflowId(workflowId) + .withSerialization(SerializationStrategy.PORTABLE); + + WorkflowHandle handle = client.enqueueWorkflow(options, new Object[] {}); + + // Wait for completion - should throw + try { + handle.getResult(); + fail("Expected exception to be thrown"); + } catch (Exception e) { + // Expected + assertTrue(e.getMessage().contains("Workflow failed!")); + } + + // Verify the workflow stored error in portable format + var row = DBUtils.getWorkflowRow(dataSource, workflowId); + assertNotNull(row); + assertEquals("portable_json", row.serialization()); + assertEquals("ERROR", row.status()); + + // Verify error is in portable JSON format + assertNotNull(row.error()); + // Portable error format: {"name":"...", "message":"..."} + assertTrue(row.error().contains("\"name\"")); + assertTrue(row.error().contains("\"message\"")); + assertTrue(row.error().contains("Workflow failed!")); + } + } + + /** + * Tests that DBOSClient.getEvent can retrieve events set with different serialization formats. + */ + @Test + public void testClientGetEvent() throws Exception { + Queue testQueue = new Queue("testq"); + DBOS.registerQueue(testQueue); + DBOS.registerWorkflows(ExplicitSerService.class, new ExplicitSerServiceImpl()); + + DBOS.launch(); + + // Use DBOSClient to enqueue and run the workflow that sets events + try (DBOSClient client = new DBOSClient(dataSource)) { + String workflowId = UUID.randomUUID().toString(); + + var options = + new DBOSClient.EnqueueOptions("ExplicitSerService", "eventWorkflow", "testq") + .withWorkflowId(workflowId); + + WorkflowHandle handle = client.enqueueWorkflow(options, new Object[] {}); + String result = handle.getResult(); + assertEquals("done", result); + + // Get events with different serializations + Object defaultVal = client.getEvent(workflowId, "defaultEvent", Duration.ofSeconds(5)); + Object nativeVal = client.getEvent(workflowId, "nativeEvent", Duration.ofSeconds(5)); + Object portableVal = client.getEvent(workflowId, "portableEvent", Duration.ofSeconds(5)); + + // All should be retrievable regardless of serialization format + assertEquals("defaultValue", defaultVal); + assertEquals("nativeValue", nativeVal); + assertEquals("portableValue", portableVal); + } + } +} diff --git a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java index ae9c2983..89e0d8b4 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java +++ b/transact/src/test/java/dev/dbos/transact/utils/DBUtils.java @@ -275,7 +275,7 @@ public static List getStepRows( } } - public record Event(String key, String value) {} + public record Event(String key, String value, String serialization) {} public static List getWorkflowEvents(DataSource ds, String workflowId) throws SQLException { @@ -296,14 +296,15 @@ public static List getWorkflowEvents(DataSource ds, String workflowId, St while (rs.next()) { var key = rs.getString("key"); var value = rs.getString("value"); - rows.add(new Event(key, value)); + var serialization = rs.getString("serialization"); + rows.add(new Event(key, value, serialization)); } return rows; } } - public record EventHistory(int stepId, String key, String value) {} + public record EventHistory(int stepId, String key, String value, String serialization) {} public static List getWorkflowEventHistory(DataSource ds, String workflowId) throws SQLException { @@ -325,13 +326,44 @@ public static List getWorkflowEventHistory( var stepId = rs.getInt("function_id"); var key = rs.getString("key"); var value = rs.getString("value"); - rows.add(new EventHistory(stepId, key, value)); + var serialization = rs.getString("serialization"); + rows.add(new EventHistory(stepId, key, value, serialization)); } return rows; } } + public record Notification( + String destinationUuid, String topic, String message, String serialization) {} + + public static List getNotifications(DataSource ds, String destinationUuid) + throws SQLException { + return getNotifications(ds, destinationUuid, null); + } + + public static List getNotifications( + DataSource ds, String destinationUuid, String schema) throws SQLException { + schema = SystemDatabase.sanitizeSchema(schema); + var sql = + "SELECT * FROM %s.notifications WHERE destination_uuid = ? ORDER BY created_at_epoch_ms" + .formatted(schema); + try (var conn = ds.getConnection(); + var stmt = conn.prepareStatement(sql)) { + stmt.setString(1, destinationUuid); + try (var rs = stmt.executeQuery()) { + List rows = new ArrayList<>(); + while (rs.next()) { + var topic = rs.getString("topic"); + var message = rs.getString("message"); + var serialization = rs.getString("serialization"); + rows.add(new Notification(destinationUuid, topic, message, serialization)); + } + return rows; + } + } + } + public static boolean queueEntriesCleanedUp(DataSource ds) throws SQLException { return queueEntriesCleanedUp(ds, null); } diff --git a/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusBuilder.java b/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusBuilder.java index 4c93df1f..dffe970d 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusBuilder.java +++ b/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusBuilder.java @@ -40,6 +40,7 @@ public class WorkflowStatusBuilder { private Long deadlineEpochMs; private String forkedFrom; private String parentWorkflowId; + private String serialization; public WorkflowStatus build() { return new WorkflowStatus( @@ -68,7 +69,8 @@ public WorkflowStatus build() { priority, partitionKey, forkedFrom, - parentWorkflowId); + parentWorkflowId, + serialization); } public WorkflowStatusBuilder(String workflowId) { @@ -111,7 +113,7 @@ public WorkflowStatusBuilder output(Object output) { } public WorkflowStatusBuilder error(Throwable error) { - this.error = ErrorResult.fromThrowable(error); + this.error = ErrorResult.fromThrowable(error, this.serialization, null); return this; } @@ -204,4 +206,9 @@ public WorkflowStatusBuilder parentWorkflowId(String parentWorkflowId) { this.parentWorkflowId = parentWorkflowId; return this; } + + public WorkflowStatusBuilder serialization(String serialization) { + this.serialization = serialization; + return this; + } } diff --git a/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusRow.java b/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusRow.java index 900d1b5c..a6214cc2 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusRow.java +++ b/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusRow.java @@ -30,7 +30,8 @@ public record WorkflowStatusRow( Integer priority, String queuePartitionKey, String forkedFrom, - String parentWorkflowId) { + String parentWorkflowId, + String serialization) { public WorkflowStatusRow(ResultSet rs) throws SQLException { this( @@ -60,6 +61,7 @@ public WorkflowStatusRow(ResultSet rs) throws SQLException { rs.getObject("priority", Integer.class), rs.getString("queue_partition_key"), rs.getString("forked_from"), - rs.getString("parent_workflow_id")); + rs.getString("parent_workflow_id"), + rs.getString("serialization")); } }