Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public record ListQueuedWorkflowsRequest(
String end_time,
String status,
String fork_from,
String parent_workflow_id,
String queue_name,
Integer limit,
Integer offset,
Expand All @@ -37,6 +38,7 @@ public ListWorkflowsInput asInput() {
queue_name,
true, // queuesOnly: only list queued workflows
null, // Executor IDs
fork_from);
fork_from,
parent_workflow_id); // parent workflow id
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public record ListWorkflowsRequest(
String status,
String application_version,
String fork_from,
String parent_workflow_id,
Integer limit,
Integer offset,
Boolean sort_desc,
Expand All @@ -38,9 +39,10 @@ public ListWorkflowsInput asInput() {
workflow_id_prefix,
load_input,
load_output,
null,
false,
null, // queueName
false, // queuesOnly
null, // Executor IDs
fork_from);
fork_from,
parent_workflow_id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public static class Body {
public String end_time;
public String status;
public String forked_from;
public String parent_workflow_id;
public String queue_name;
public Integer limit;
public Integer offset;
Expand All @@ -30,6 +31,7 @@ public static class Builder {
private String end_time;
private String status;
private String forked_from;
private String parent_workflow_id;
private String queue_name;
private Integer limit;
private Integer offset;
Expand Down Expand Up @@ -61,6 +63,11 @@ public Builder forkedFrom(String forkedFrom) {
return this;
}

public Builder parentWorkflowId(String parentWorkflowId) {
this.parent_workflow_id = parentWorkflowId;
return this;
}

public Builder queueName(String queueName) {
this.queue_name = queueName;
return this;
Expand Down Expand Up @@ -97,6 +104,7 @@ public ListQueuedWorkflowsRequest build(String requestId) {
body.end_time = this.end_time;
body.status = this.status;
body.forked_from = this.forked_from;
body.parent_workflow_id = this.parent_workflow_id;
body.queue_name = this.queue_name;
body.limit = this.limit;
body.offset = this.offset;
Expand All @@ -117,6 +125,7 @@ public ListWorkflowsInput asInput() {
.withEndTime(body.end_time != null ? OffsetDateTime.parse(body.end_time) : null)
.withStatus(body.status)
.withForkedFrom(body.forked_from)
.withParentWorkflowId(body.parent_workflow_id)
.withQueueName(body.queue_name)
.withLimit(body.limit)
.withOffset(body.offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public static class Body {
public String end_time;
public String status;
public String forked_from;
public String parent_workflow_id;
public String application_version;
public Integer limit;
public Integer offset;
Expand All @@ -36,6 +37,7 @@ public static class Builder {
private String end_time;
private String status;
private String forked_from;
private String parent_workflow_id;
private String application_version;
private Integer limit;
private Integer offset;
Expand Down Expand Up @@ -83,6 +85,11 @@ public Builder forkedFrom(String forked_from) {
return this;
}

public Builder parentWorkflowId(String parentWorkflowId) {
this.parent_workflow_id = parentWorkflowId;
return this;
}

public Builder applicationVersion(String application_version) {
this.application_version = application_version;
return this;
Expand Down Expand Up @@ -116,6 +123,7 @@ public ListWorkflowsRequest build(String requestId) {
body.end_time = this.end_time;
body.status = this.status;
body.forked_from = this.forked_from;
body.parent_workflow_id = this.parent_workflow_id;
body.application_version = this.application_version;
body.limit = this.limit;
body.offset = this.offset;
Expand All @@ -136,6 +144,7 @@ public ListWorkflowsInput asInput() {
.withEndTime(body.end_time != null ? OffsetDateTime.parse(body.end_time) : null)
.withStatus(body.status)
.withForkedFrom(body.forked_from)
.withParentWorkflowId(body.parent_workflow_id)
.withApplicationVersion(body.application_version)
.withLimit(body.limit)
.withOffset(body.offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ public class WorkflowsOutput {
public String Priority;
public String QueuePartitionKey;
public String ForkedFrom;
public String ParentWorkflowID;
public String DequeuedAt;

public WorkflowsOutput(WorkflowStatus status) {
Object[] input = status.input();
Object output = status.output();
Long createdAt = status.createdAt();
Long updatedAt = status.updatedAt();
Long startedAt = status.startedAtEpochMs();
String[] authenticatedRoles = status.authenticatedRoles();

this.WorkflowUUID = status.workflowId();
Expand Down Expand Up @@ -69,5 +72,7 @@ public WorkflowsOutput(WorkflowStatus status) {
this.Priority = Objects.requireNonNullElse(status.priority(), 0).toString();
this.QueuePartitionKey = status.queuePartitionKey();
this.ForkedFrom = status.forkedFrom();
this.ParentWorkflowID = status.parentWorkflowId();
this.DequeuedAt = startedAt != null ? Long.toString(startedAt) : null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -779,9 +779,10 @@ public void importWorkflow(List<ExportedWorkflow> workflows) {
executor_id, application_version, application_id,
created_at, updated_at, started_at_epoch_ms,
queue_name, deduplication_id, priority, queue_partition_key,
workflow_timeout_ms, workflow_deadline_epoch_ms, recovery_attempts, forked_from
workflow_timeout_ms, workflow_deadline_epoch_ms,
recovery_attempts, forked_from, parent_workflow_id
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
"""
.formatted(this.schema);
Expand Down Expand Up @@ -870,6 +871,7 @@ public void importWorkflow(List<ExportedWorkflow> workflows) {
stmt.setObject(23, status.deadlineEpochMs());
stmt.setObject(24, status.recoveryAttempts());
stmt.setString(25, status.forkedFrom());
stmt.setString(26, status.parentWorkflowId());

stmt.executeUpdate();
}
Expand Down
26 changes: 17 additions & 9 deletions transact/src/main/java/dev/dbos/transact/database/WorkflowDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ InsertWorkflowResult insertWorkflowStatus(
executor_id, application_version, application_id,
created_at, updated_at, recovery_attempts,
workflow_timeout_ms, workflow_deadline_epoch_ms,
owner_xid
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
parent_workflow_id, owner_xid
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (workflow_uuid)
DO UPDATE SET
recovery_attempts = CASE
Expand Down Expand Up @@ -222,9 +222,10 @@ ON CONFLICT (workflow_uuid)

stmt.setObject(20, status.timeoutMs());
stmt.setObject(21, status.deadlineEpochMs());
stmt.setString(22, status.parentWorkflowId());

stmt.setObject(22, ownerXid);
stmt.setInt(23, incrementAttempts ? 1 : 0);
stmt.setObject(23, ownerXid);
stmt.setInt(24, incrementAttempts ? 1 : 0);

try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
Expand Down Expand Up @@ -326,14 +327,15 @@ WorkflowStatus getWorkflowStatus(Connection conn, String workflowId) throws SQLE
var sql =
"""
SELECT
workflow_uuid, status, forked_from,
workflow_uuid, status,
name, class_name, config_name,
inputs, output, error,
queue_name, deduplication_id, priority, queue_partition_key,
executor_id, application_version, application_id,
authenticated_user, assumed_role, authenticated_roles,
created_at, updated_at, recovery_attempts, started_at_epoch_ms,
workflow_timeout_ms, workflow_deadline_epoch_ms
workflow_timeout_ms, workflow_deadline_epoch_ms,
forked_from, parent_workflow_id
FROM %s.workflow_status
WHERE workflow_uuid = ?
"""
Expand Down Expand Up @@ -367,13 +369,14 @@ List<WorkflowStatus> listWorkflows(ListWorkflowsInput input) throws SQLException
sqlBuilder.append(
"""
SELECT
workflow_uuid, status, forked_from,
workflow_uuid, status,
name, class_name, config_name,
queue_name, deduplication_id, priority, queue_partition_key,
executor_id, application_version, application_id,
authenticated_user, assumed_role, authenticated_roles,
created_at, updated_at, recovery_attempts, started_at_epoch_ms,
workflow_timeout_ms, workflow_deadline_epoch_ms
workflow_timeout_ms, workflow_deadline_epoch_ms,
forked_from, parent_workflow_id
""");

var loadInput = input.loadInput() == null || input.loadInput();
Expand Down Expand Up @@ -413,6 +416,10 @@ List<WorkflowStatus> listWorkflows(ListWorkflowsInput input) throws SQLException
whereConditions.add("forked_from = ?");
parameters.add(input.forkedFrom());
}
if (input.parentWorkflowId() != null) {
whereConditions.add("parent_workflow_id = ?");
parameters.add(input.parentWorkflowId());
}
if (input.workflowIdPrefix() != null) {
whereConditions.add("workflow_uuid LIKE ?");
// Append wildcard directly to the parameter value
Expand Down Expand Up @@ -544,7 +551,8 @@ private static WorkflowStatus resultsToWorkflowStatus(
rs.getString("deduplication_id"),
rs.getObject("priority", Integer.class),
rs.getString("queue_partition_key"),
rs.getString("forked_from"));
rs.getString("forked_from"),
rs.getString("parent_workflow_id"));
return info;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,8 @@ private static WorkflowInitResult preInvokeWorkflow(
null,
null,
timeoutMs,
deadlineEpochMs);
deadlineEpochMs,
parentWorkflow != null ? parentWorkflow.workflowId() : null);

WorkflowInitResult[] initResult = {null};
initResult[0] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,15 @@ static void runDbosMigrations(Connection conn, String schema, List<String> migra
public static List<String> getMigrations(String schema) {
Objects.requireNonNull(schema);
var migrations =
List.of(migration1, migration2, migration3, migration4, migration5, migration6, migration7);
List.of(
migration1,
migration2,
migration3,
migration4,
migration5,
migration6,
migration7,
migration8);
return migrations.stream().map(m -> m.formatted(schema)).toList();
}

Expand Down Expand Up @@ -393,4 +401,10 @@ FOREIGN KEY (workflow_uuid) REFERENCES %1$s.workflow_status(workflow_uuid)
"""
ALTER TABLE %1$s."workflow_status" ADD COLUMN "owner_xid" VARCHAR(40) DEFAULT NULL
""";

static final String migration8 =
"""
ALTER TABLE %1$s."workflow_status" ADD COLUMN "parent_workflow_id" TEXT DEFAULT NULL;
CREATE INDEX "idx_workflow_status_parent_workflow_id" ON %1$s."workflow_status" ("parent_workflow_id");
""";
}
Loading