diff --git a/transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java b/transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java index 5c48606f..b3f132dc 100644 --- a/transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java +++ b/transact/src/main/java/dev/dbos/transact/admin/ListQueuedWorkflowsRequest.java @@ -11,6 +11,7 @@ public record ListQueuedWorkflowsRequest( String end_time, String status, String fork_from, + String parent_workflow_id, String queue_name, Integer limit, Integer offset, @@ -37,6 +38,7 @@ public ListWorkflowsInput asInput() { queue_name, true, // queuesOnly: only list queued workflows null, // Executor IDs - fork_from); + fork_from, + parent_workflow_id); // parent workflow id } } diff --git a/transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java b/transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java index e5cf4aa4..c7094d80 100644 --- a/transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java +++ b/transact/src/main/java/dev/dbos/transact/admin/ListWorkflowsRequest.java @@ -14,6 +14,7 @@ public record ListWorkflowsRequest( String status, String application_version, String fork_from, + String parent_workflow_id, Integer limit, Integer offset, Boolean sort_desc, @@ -38,9 +39,10 @@ public ListWorkflowsInput asInput() { workflow_id_prefix, load_input, load_output, - null, - false, + null, // queueName + false, // queuesOnly null, // Executor IDs - fork_from); + fork_from, + parent_workflow_id); } } diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java index f5d69892..5a3e8cd9 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListQueuedWorkflowsRequest.java @@ -17,6 +17,7 @@ public static class Body { public String end_time; public String status; public String forked_from; + public String parent_workflow_id; public String queue_name; public Integer limit; public Integer offset; @@ -30,6 +31,7 @@ public static class Builder { private String end_time; private String status; private String forked_from; + private String parent_workflow_id; private String queue_name; private Integer limit; private Integer offset; @@ -61,6 +63,11 @@ public Builder forkedFrom(String forkedFrom) { return this; } + public Builder parentWorkflowId(String parentWorkflowId) { + this.parent_workflow_id = parentWorkflowId; + return this; + } + public Builder queueName(String queueName) { this.queue_name = queueName; return this; @@ -97,6 +104,7 @@ public ListQueuedWorkflowsRequest build(String requestId) { body.end_time = this.end_time; body.status = this.status; body.forked_from = this.forked_from; + body.parent_workflow_id = this.parent_workflow_id; body.queue_name = this.queue_name; body.limit = this.limit; body.offset = this.offset; @@ -117,6 +125,7 @@ public ListWorkflowsInput asInput() { .withEndTime(body.end_time != null ? OffsetDateTime.parse(body.end_time) : null) .withStatus(body.status) .withForkedFrom(body.forked_from) + .withParentWorkflowId(body.parent_workflow_id) .withQueueName(body.queue_name) .withLimit(body.limit) .withOffset(body.offset) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java index 191404ba..5bf7d722 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/ListWorkflowsRequest.java @@ -20,6 +20,7 @@ public static class Body { public String end_time; public String status; public String forked_from; + public String parent_workflow_id; public String application_version; public Integer limit; public Integer offset; @@ -36,6 +37,7 @@ public static class Builder { private String end_time; private String status; private String forked_from; + private String parent_workflow_id; private String application_version; private Integer limit; private Integer offset; @@ -83,6 +85,11 @@ public Builder forkedFrom(String forked_from) { return this; } + public Builder parentWorkflowId(String parentWorkflowId) { + this.parent_workflow_id = parentWorkflowId; + return this; + } + public Builder applicationVersion(String application_version) { this.application_version = application_version; return this; @@ -116,6 +123,7 @@ public ListWorkflowsRequest build(String requestId) { body.end_time = this.end_time; body.status = this.status; body.forked_from = this.forked_from; + body.parent_workflow_id = this.parent_workflow_id; body.application_version = this.application_version; body.limit = this.limit; body.offset = this.offset; @@ -136,6 +144,7 @@ public ListWorkflowsInput asInput() { .withEndTime(body.end_time != null ? OffsetDateTime.parse(body.end_time) : null) .withStatus(body.status) .withForkedFrom(body.forked_from) + .withParentWorkflowId(body.parent_workflow_id) .withApplicationVersion(body.application_version) .withLimit(body.limit) .withOffset(body.offset) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/protocol/WorkflowsOutput.java b/transact/src/main/java/dev/dbos/transact/conductor/protocol/WorkflowsOutput.java index b6c67aef..389ecc32 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/protocol/WorkflowsOutput.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/protocol/WorkflowsOutput.java @@ -31,12 +31,15 @@ public class WorkflowsOutput { public String Priority; public String QueuePartitionKey; public String ForkedFrom; + public String ParentWorkflowID; + public String DequeuedAt; public WorkflowsOutput(WorkflowStatus status) { Object[] input = status.input(); Object output = status.output(); Long createdAt = status.createdAt(); Long updatedAt = status.updatedAt(); + Long startedAt = status.startedAtEpochMs(); String[] authenticatedRoles = status.authenticatedRoles(); this.WorkflowUUID = status.workflowId(); @@ -69,5 +72,7 @@ public WorkflowsOutput(WorkflowStatus status) { this.Priority = Objects.requireNonNullElse(status.priority(), 0).toString(); this.QueuePartitionKey = status.queuePartitionKey(); this.ForkedFrom = status.forkedFrom(); + this.ParentWorkflowID = status.parentWorkflowId(); + this.DequeuedAt = startedAt != null ? Long.toString(startedAt) : null; } } diff --git a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java index c3fb87d4..f9d4f1c8 100644 --- a/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java +++ b/transact/src/main/java/dev/dbos/transact/database/SystemDatabase.java @@ -779,9 +779,10 @@ public void importWorkflow(List workflows) { executor_id, application_version, application_id, created_at, updated_at, started_at_epoch_ms, queue_name, deduplication_id, priority, queue_partition_key, - workflow_timeout_ms, workflow_deadline_epoch_ms, recovery_attempts, forked_from + workflow_timeout_ms, workflow_deadline_epoch_ms, + recovery_attempts, forked_from, parent_workflow_id ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ) """ .formatted(this.schema); @@ -870,6 +871,7 @@ public void importWorkflow(List workflows) { stmt.setObject(23, status.deadlineEpochMs()); stmt.setObject(24, status.recoveryAttempts()); stmt.setString(25, status.forkedFrom()); + stmt.setString(26, status.parentWorkflowId()); stmt.executeUpdate(); } diff --git a/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java b/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java index cd78c2c2..80bbe26d 100644 --- a/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java +++ b/transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java @@ -170,8 +170,8 @@ InsertWorkflowResult insertWorkflowStatus( executor_id, application_version, application_id, created_at, updated_at, recovery_attempts, workflow_timeout_ms, workflow_deadline_epoch_ms, - owner_xid - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + parent_workflow_id, owner_xid + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (workflow_uuid) DO UPDATE SET recovery_attempts = CASE @@ -222,9 +222,10 @@ ON CONFLICT (workflow_uuid) stmt.setObject(20, status.timeoutMs()); stmt.setObject(21, status.deadlineEpochMs()); + stmt.setString(22, status.parentWorkflowId()); - stmt.setObject(22, ownerXid); - stmt.setInt(23, incrementAttempts ? 1 : 0); + stmt.setObject(23, ownerXid); + stmt.setInt(24, incrementAttempts ? 1 : 0); try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { @@ -326,14 +327,15 @@ WorkflowStatus getWorkflowStatus(Connection conn, String workflowId) throws SQLE var sql = """ SELECT - workflow_uuid, status, forked_from, + workflow_uuid, status, name, class_name, config_name, inputs, output, error, queue_name, deduplication_id, priority, queue_partition_key, executor_id, application_version, application_id, authenticated_user, assumed_role, authenticated_roles, created_at, updated_at, recovery_attempts, started_at_epoch_ms, - workflow_timeout_ms, workflow_deadline_epoch_ms + workflow_timeout_ms, workflow_deadline_epoch_ms, + forked_from, parent_workflow_id FROM %s.workflow_status WHERE workflow_uuid = ? """ @@ -367,13 +369,14 @@ List listWorkflows(ListWorkflowsInput input) throws SQLException sqlBuilder.append( """ SELECT - workflow_uuid, status, forked_from, + workflow_uuid, status, name, class_name, config_name, queue_name, deduplication_id, priority, queue_partition_key, executor_id, application_version, application_id, authenticated_user, assumed_role, authenticated_roles, created_at, updated_at, recovery_attempts, started_at_epoch_ms, - workflow_timeout_ms, workflow_deadline_epoch_ms + workflow_timeout_ms, workflow_deadline_epoch_ms, + forked_from, parent_workflow_id """); var loadInput = input.loadInput() == null || input.loadInput(); @@ -413,6 +416,10 @@ List listWorkflows(ListWorkflowsInput input) throws SQLException whereConditions.add("forked_from = ?"); parameters.add(input.forkedFrom()); } + if (input.parentWorkflowId() != null) { + whereConditions.add("parent_workflow_id = ?"); + parameters.add(input.parentWorkflowId()); + } if (input.workflowIdPrefix() != null) { whereConditions.add("workflow_uuid LIKE ?"); // Append wildcard directly to the parameter value @@ -544,7 +551,8 @@ private static WorkflowStatus resultsToWorkflowStatus( rs.getString("deduplication_id"), rs.getObject("priority", Integer.class), rs.getString("queue_partition_key"), - rs.getString("forked_from")); + rs.getString("forked_from"), + rs.getString("parent_workflow_id")); return info; } diff --git a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java index 20b290cf..3319b546 100644 --- a/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java +++ b/transact/src/main/java/dev/dbos/transact/execution/DBOSExecutor.java @@ -1400,7 +1400,8 @@ private static WorkflowInitResult preInvokeWorkflow( null, null, timeoutMs, - deadlineEpochMs); + deadlineEpochMs, + parentWorkflow != null ? parentWorkflow.workflowId() : null); WorkflowInitResult[] initResult = {null}; initResult[0] = diff --git a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java index de48f940..4abb6bca 100644 --- a/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java +++ b/transact/src/main/java/dev/dbos/transact/migrations/MigrationManager.java @@ -228,7 +228,15 @@ static void runDbosMigrations(Connection conn, String schema, List migra public static List getMigrations(String schema) { Objects.requireNonNull(schema); var migrations = - List.of(migration1, migration2, migration3, migration4, migration5, migration6, migration7); + List.of( + migration1, + migration2, + migration3, + migration4, + migration5, + migration6, + migration7, + migration8); return migrations.stream().map(m -> m.formatted(schema)).toList(); } @@ -393,4 +401,10 @@ FOREIGN KEY (workflow_uuid) REFERENCES %1$s.workflow_status(workflow_uuid) """ ALTER TABLE %1$s."workflow_status" ADD COLUMN "owner_xid" VARCHAR(40) DEFAULT NULL """; + + static final String migration8 = + """ + ALTER TABLE %1$s."workflow_status" ADD COLUMN "parent_workflow_id" TEXT DEFAULT NULL; + CREATE INDEX "idx_workflow_status_parent_workflow_id" ON %1$s."workflow_status" ("parent_workflow_id"); + """; } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java b/transact/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java index 0a709f17..c5dbf863 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/ListWorkflowsInput.java @@ -28,12 +28,13 @@ public record ListWorkflowsInput( String queueName, Boolean queuesOnly, List executorIds, - String forkedFrom) { + String forkedFrom, + String parentWorkflowId) { public ListWorkflowsInput() { this( null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, - null, null, null, null); + null, null, null, null, null); } /** Restrict the returned workflows to those on the specified `workflowIds` list */ @@ -57,7 +58,8 @@ public ListWorkflowsInput withWorkflowIds(List workflowIds) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** @@ -94,7 +96,8 @@ public ListWorkflowsInput withStatuses(List status) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to those with a status of `status` */ @@ -134,7 +137,8 @@ public ListWorkflowsInput withStartTime(OffsetDateTime startTime) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to those initiated on or before `endTime` */ @@ -158,7 +162,8 @@ public ListWorkflowsInput withEndTime(OffsetDateTime endTime) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to those with the function name `workflowName` */ @@ -182,7 +187,8 @@ public ListWorkflowsInput withWorkflowName(String workflowName) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to those within the class named `className` */ @@ -206,7 +212,8 @@ public ListWorkflowsInput withClassName(String className) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to those within the instance named `instanceName` */ @@ -230,7 +237,8 @@ public ListWorkflowsInput withInstanceName(String instanceName) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to those run on app version `applicationVersion` */ @@ -254,7 +262,8 @@ public ListWorkflowsInput withApplicationVersion(String applicationVersion) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to those run by user `authenticatedUser` */ @@ -278,7 +287,8 @@ public ListWorkflowsInput withAuthenticatedUser(String authenticatedUser) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the number of returned workflows to `limit` */ @@ -302,7 +312,8 @@ public ListWorkflowsInput withLimit(Integer limit) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** @@ -329,7 +340,8 @@ public ListWorkflowsInput withOffset(Integer offset) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** @@ -356,7 +368,8 @@ public ListWorkflowsInput withSortDesc(Boolean sortDesc) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Filter returned workflows by a prefix of the workflow ID */ @@ -380,7 +393,8 @@ public ListWorkflowsInput withWorkflowIdPrefix(String workflowIdPrefix) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** If true, workflow inputs will be materialized and returned as part of the record */ @@ -404,7 +418,8 @@ public ListWorkflowsInput withLoadInput(Boolean loadInput) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** @@ -431,7 +446,8 @@ public ListWorkflowsInput withLoadOutput(Boolean loadOutput) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** @@ -458,7 +474,8 @@ public ListWorkflowsInput withQueueName(String queueName) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to only those run on queues. */ @@ -486,7 +503,8 @@ public ListWorkflowsInput withQueuesOnly(Boolean queuesOnly) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); } /** @@ -513,7 +531,8 @@ public ListWorkflowsInput withExecutorIds(List executorIds) { queueName, queuesOnly, executorIds == null ? null : List.copyOf(executorIds), - forkedFrom); + forkedFrom, + parentWorkflowId); } /** Restrict the returned workflows to those forked from the specified workflow ID */ @@ -537,7 +556,33 @@ public ListWorkflowsInput withForkedFrom(String forkedFrom) { queueName, queuesOnly, executorIds, - forkedFrom); + forkedFrom, + parentWorkflowId); + } + + /** Restrict the returned workflows to those with the specified parent workflow ID */ + public ListWorkflowsInput withParentWorkflowId(String parentWorkflowId) { + return new ListWorkflowsInput( + workflowIds, + status, + startTime, + endTime, + workflowName, + className, + instanceName, + applicationVersion, + authenticatedUser, + limit, + offset, + sortDesc, + workflowIdPrefix, + loadInput, + loadOutput, + queueName, + queuesOnly, + executorIds, + forkedFrom, + parentWorkflowId); } public ListWorkflowsInput withAddedWorkflowId(String workflowId) { diff --git a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStatus.java b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStatus.java index 52c5cae7..3a52773b 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStatus.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/WorkflowStatus.java @@ -30,7 +30,8 @@ public record WorkflowStatus( String deduplicationId, Integer priority, String queuePartitionKey, - String forkedFrom) { + String forkedFrom, + String parentWorkflowId) { @com.fasterxml.jackson.annotation.JsonProperty(access = JsonProperty.Access.READ_ONLY) public Instant deadline() { @@ -80,7 +81,8 @@ public boolean equals(Object obj) { && java.util.Objects.equals(deduplicationId, that.deduplicationId) && java.util.Objects.equals(priority, that.priority) && java.util.Objects.equals(queuePartitionKey, that.queuePartitionKey) - && java.util.Objects.equals(forkedFrom, that.forkedFrom); + && java.util.Objects.equals(forkedFrom, that.forkedFrom) + && java.util.Objects.equals(parentWorkflowId, that.parentWorkflowId); } @Override @@ -110,6 +112,7 @@ public int hashCode() { deduplicationId, priority, queuePartitionKey, - forkedFrom); + forkedFrom, + parentWorkflowId); } } diff --git a/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowStatusInternal.java b/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowStatusInternal.java index 65e5e3fd..fd130f7a 100644 --- a/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowStatusInternal.java +++ b/transact/src/main/java/dev/dbos/transact/workflow/internal/WorkflowStatusInternal.java @@ -26,12 +26,13 @@ public record WorkflowStatusInternal( Long recoveryAttempts, Long startedAt, Long timeoutMs, - Long deadlineEpochMs) { + Long deadlineEpochMs, + String parentWorkflowId) { public WorkflowStatusInternal() { this( null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, - null, null, null, null, null, null, null, null, null); + null, null, null, null, null, null, null, null, null, null); } public WorkflowStatusInternal(String workflowUUID, WorkflowState state) { @@ -59,11 +60,13 @@ public WorkflowStatusInternal(String workflowUUID, WorkflowState state) { null, null, null, + null, null); } public static class Builder { private String workflowId; + private String parentWorkflowId; private WorkflowState status; private String name; private String className; @@ -93,6 +96,11 @@ public Builder workflowId(String workflowId) { return this; } + public Builder parentWorkflowId(String parentWorkflowId) { + this.parentWorkflowId = parentWorkflowId; + return this; + } + public Builder status(WorkflowState status) { this.status = status; return this; @@ -233,7 +241,8 @@ public WorkflowStatusInternal build() { recoveryAttempts, startedAt, timeoutMs, - deadlineEpochMs); + deadlineEpochMs, + parentWorkflowId); } } diff --git a/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java b/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java index 40b72066..70f0078e 100644 --- a/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java +++ b/transact/src/test/java/dev/dbos/transact/invocation/DirectInvocationTest.java @@ -217,6 +217,8 @@ void directInvokeParent() throws Exception { assertNull(row1.timeoutMs()); assertNull(row0.deadlineEpochMs()); assertNull(row1.deadlineEpochMs()); + assertNull(row0.parentWorkflowId()); + assertEquals(row0.workflowId(), row1.parentWorkflowId()); var steps = DBUtils.getStepRows(dataSource, row0.workflowId()); assertEquals(1, steps.size()); @@ -278,6 +280,7 @@ void directInvokeParentSetWorkflowId() throws Exception { var row1 = rows.get(1); assertEquals(workflowId, row0.workflowId()); assertEquals(workflowId + "-0", row1.workflowId()); + assertEquals(workflowId, row1.parentWorkflowId()); } @Test diff --git a/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusBuilder.java b/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusBuilder.java index 34be02b7..4c93df1f 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusBuilder.java +++ b/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusBuilder.java @@ -39,6 +39,7 @@ public class WorkflowStatusBuilder { private Long timeoutMs; private Long deadlineEpochMs; private String forkedFrom; + private String parentWorkflowId; public WorkflowStatus build() { return new WorkflowStatus( @@ -66,7 +67,8 @@ public WorkflowStatus build() { deduplicationId, priority, partitionKey, - forkedFrom); + forkedFrom, + parentWorkflowId); } public WorkflowStatusBuilder(String workflowId) { @@ -197,4 +199,9 @@ public WorkflowStatusBuilder forkedFrom(String forkedFrom) { this.forkedFrom = forkedFrom; return this; } + + public WorkflowStatusBuilder parentWorkflowId(String parentWorkflowId) { + this.parentWorkflowId = parentWorkflowId; + return this; + } } diff --git a/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusRow.java b/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusRow.java index 2adf2042..900d1b5c 100644 --- a/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusRow.java +++ b/transact/src/test/java/dev/dbos/transact/utils/WorkflowStatusRow.java @@ -27,7 +27,10 @@ public record WorkflowStatusRow( String inputs, Long startedAtEpochMs, String deduplicationId, - Integer priority) { + Integer priority, + String queuePartitionKey, + String forkedFrom, + String parentWorkflowId) { public WorkflowStatusRow(ResultSet rs) throws SQLException { this( @@ -54,6 +57,9 @@ public WorkflowStatusRow(ResultSet rs) throws SQLException { rs.getString("inputs"), rs.getObject("started_at_epoch_ms", Long.class), rs.getString("deduplication_id"), - rs.getObject("priority", Integer.class)); + rs.getObject("priority", Integer.class), + rs.getString("queue_partition_key"), + rs.getString("forked_from"), + rs.getString("parent_workflow_id")); } }