Skip to content

Commit 17b83fa

Browse files
committed
pass in spread object and reorganize workflowContext
1 parent 53ffbd9 commit 17b83fa

File tree

14 files changed

+389
-357
lines changed

14 files changed

+389
-357
lines changed

example/convex/_generated/api.d.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
import type * as admin from "../admin.js";
1212
import type * as example from "../example.js";
1313
import type * as nestedWorkflow from "../nestedWorkflow.js";
14+
import type * as passingSignals from "../passingSignals.js";
1415
import type * as transcription from "../transcription.js";
15-
import type * as userConfirmation_steps from "../userConfirmation/steps.js";
16-
import type * as userConfirmation_workflow from "../userConfirmation/workflow.js";
16+
import type * as userConfirmation from "../userConfirmation.js";
1717

1818
import type {
1919
ApiFromModules,
@@ -33,9 +33,9 @@ declare const fullApi: ApiFromModules<{
3333
admin: typeof admin;
3434
example: typeof example;
3535
nestedWorkflow: typeof nestedWorkflow;
36+
passingSignals: typeof passingSignals;
3637
transcription: typeof transcription;
37-
"userConfirmation/steps": typeof userConfirmation_steps;
38-
"userConfirmation/workflow": typeof userConfirmation_workflow;
38+
userConfirmation: typeof userConfirmation;
3939
}>;
4040
declare const fullApiWithMounts: typeof fullApi;
4141

example/convex/passingSignals.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { vWorkflowId, WorkflowManager } from "@convex-dev/workflow";
2+
import { components, internal } from "./_generated/api";
3+
import { internalMutation } from "./_generated/server";
4+
import { vEventId } from "../../src/types";
5+
6+
const workflow = new WorkflowManager(components.workflow);
7+
8+
export const signalBasedWorkflow = workflow.define({
9+
args: {},
10+
handler: async (ctx) => {
11+
console.log("Starting signal based workflow");
12+
for (let i = 0; i < 3; i++) {
13+
const signalId = await ctx.runMutation(
14+
internal.passingSignals.createSignal,
15+
{ workflowId: ctx.workflowId },
16+
);
17+
await ctx.awaitEvent({ id: signalId });
18+
console.log("Signal received", signalId);
19+
}
20+
console.log("All signals received");
21+
},
22+
});
23+
24+
export const createSignal = internalMutation({
25+
args: { workflowId: vWorkflowId },
26+
handler: async (ctx, args) => {
27+
const eventId = await workflow.createEvent(ctx, {
28+
name: "signal",
29+
workflowId: args.workflowId,
30+
});
31+
// You would normally store this eventId somewhere to be able to send the
32+
// signal later.
33+
await ctx.scheduler.runAfter(1000, internal.passingSignals.sendSignal, {
34+
eventId,
35+
});
36+
return eventId;
37+
},
38+
});
39+
40+
export const sendSignal = internalMutation({
41+
args: { eventId: vEventId("signal") },
42+
handler: async (ctx, args) => {
43+
await workflow.sendEvent(ctx, { id: args.eventId });
44+
},
45+
});

example/convex/userConfirmation.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import {
2+
defineEvent,
3+
vWorkflowId,
4+
WorkflowManager,
5+
} from "@convex-dev/workflow";
6+
import { v } from "convex/values";
7+
import { components, internal } from "./_generated/api";
8+
import { internalAction, internalMutation } from "./_generated/server";
9+
10+
export const approvalEvent = defineEvent({
11+
name: "approval" as const,
12+
validator: v.union(
13+
v.object({ approved: v.literal(true), choice: v.number() }),
14+
v.object({ approved: v.literal(false), reason: v.string() }),
15+
),
16+
});
17+
18+
const workflow = new WorkflowManager(components.workflow);
19+
20+
export const confirmationWorkflow = workflow.define({
21+
args: { prompt: v.string() },
22+
returns: v.string(),
23+
handler: async (ctx, args): Promise<string> => {
24+
console.log("Starting confirmation workflow");
25+
const proposals = await ctx.runAction(
26+
internal.userConfirmation.generateProposals,
27+
{ prompt: args.prompt },
28+
{ retry: true },
29+
);
30+
console.log("Proposals generated", proposals);
31+
const approval = await ctx.awaitEvent(approvalEvent);
32+
if (!approval.approved) {
33+
return "rejected: " + approval.reason;
34+
}
35+
const choice = proposals[approval.choice];
36+
console.log("Choice selected", choice);
37+
return choice;
38+
},
39+
});
40+
41+
export const generateProposals = internalAction({
42+
args: { prompt: v.string() },
43+
handler: async (_ctx, _args) => {
44+
// imagine this is a call to an LLM
45+
return ["proposal1", "proposal2", "proposal3"];
46+
},
47+
});
48+
49+
export const chooseProposal = internalMutation({
50+
args: { workflowId: vWorkflowId, choice: v.number() },
51+
handler: async (ctx, args) => {
52+
await workflow.sendEvent(ctx, {
53+
...approvalEvent,
54+
workflowId: args.workflowId,
55+
value: { approved: true, choice: args.choice },
56+
});
57+
return true;
58+
},
59+
});

example/convex/userConfirmation/steps.ts

Lines changed: 0 additions & 31 deletions
This file was deleted.

example/convex/userConfirmation/workflow.ts

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/client/events.ts

Lines changed: 0 additions & 35 deletions
This file was deleted.

src/client/index.ts

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type {
33
WorkpoolOptions,
44
WorkpoolRetryOptions,
55
} from "@convex-dev/workpool";
6+
import { parse } from "convex-helpers/validators";
67
import {
78
createFunctionHandle,
89
type FunctionArgs,
@@ -25,24 +26,22 @@ import type {
2526
import type { Step } from "../component/schema.js";
2627
import type {
2728
EventId,
28-
EventSpec,
2929
OnCompleteArgs,
3030
WorkflowId,
3131
WorkflowStep,
3232
} from "../types.js";
3333
import { safeFunctionName } from "./safeFunctionName.js";
34-
import type { OpaqueIds, WorkflowComponent, WorkflowCtx } from "./types.js";
34+
import type { OpaqueIds, WorkflowComponent } from "./types.js";
35+
import type { WorkflowCtx } from "./workflowContext.js";
3536
import { workflowMutation } from "./workflowMutation.js";
36-
import { parse } from "convex-helpers/validators";
3737

3838
export {
3939
vWorkflowId,
40-
type WorkflowId,
4140
vWorkflowStep,
41+
type WorkflowId,
4242
type WorkflowStep,
4343
} from "../types.js";
44-
export type { RunOptions, WorkflowCtx } from "./types.js";
45-
export { defineEvent } from "./events.js";
44+
export type { RunOptions, WorkflowCtx } from "./workflowContext.js";
4645

4746
export type CallbackOptions = {
4847
/**
@@ -263,51 +262,93 @@ export class WorkflowManager {
263262
/**
264263
* Send an event to a workflow.
265264
*
266-
* @param ctx - Either ctx from a mutation/action or a workflow step.
267-
* @param args - The event arguments.
265+
* @param ctx - From a mutation, action or workflow step.
266+
* @param args - Either send an event by its ID, or by name and workflow ID.
267+
* If you have a validator, you must provide a value.
268+
* If you provide an error string, awaiting the event will throw an error.
268269
*/
269270
async sendEvent<T = null, Name extends string = string>(
270271
ctx: RunMutationCtx,
271-
{
272-
workflowId,
273-
event,
274-
value,
275-
}: (
276-
| { workflowId: WorkflowId; event: EventSpec<Name, T> }
277-
| { workflowId?: undefined; event: EventSpec<Name, T> & { id: string } }
272+
args: (
273+
| { workflowId: WorkflowId; name: Name; id?: EventId<Name> }
274+
| { workflowId?: undefined; name?: Name; id: EventId<Name> }
278275
) &
279276
(
280-
| (T extends null
281-
? { value?: null; error?: undefined }
282-
: { value: T; error?: undefined })
277+
| { validator?: undefined; value?: T }
278+
| { validator: Validator<T, any, any>; value: T }
283279
| { error: string; value?: undefined }
284280
),
285281
): Promise<EventId<Name>> {
286-
let result = {
287-
kind: "success" as const,
288-
returnValue: event.validator ? parse(event.validator, value) : value,
289-
} satisfies RunResult;
282+
let result: RunResult =
283+
"error" in args
284+
? {
285+
kind: "failed",
286+
error: args.error,
287+
}
288+
: {
289+
kind: "success" as const,
290+
returnValue: args.validator
291+
? parse(args.validator, args.value)
292+
: "value" in args
293+
? args.value
294+
: null,
295+
};
290296
return (await ctx.runMutation(this.component.event.send, {
291-
eventId: event.id,
297+
eventId: args.id,
292298
result,
293-
name: event.name,
294-
workflowId: workflowId,
299+
name: args.name,
300+
workflowId: args.workflowId,
295301
workpoolOptions: this.options?.workpoolOptions,
296302
})) as EventId<Name>;
297303
}
298304

305+
/**
306+
* Create an event ahead of time, enabling awaiting a specific event by ID.
307+
* @param ctx - From an action, mutation or workflow step.
308+
* @param args - The name of the event and what workflow it belongs to.
309+
* @returns The event ID, which can be used to send the event or await it.
310+
*/
299311
async createEvent<Name extends string>(
300312
ctx: RunMutationCtx,
301-
component: WorkflowComponent,
302313
args: { name: Name; workflowId: WorkflowId },
303314
): Promise<EventId<Name>> {
304-
return (await ctx.runMutation(component.event.create, {
315+
return (await ctx.runMutation(this.component.event.create, {
305316
name: args.name,
306317
workflowId: args.workflowId,
307318
})) as EventId<Name>;
308319
}
309320
}
310321

322+
/**
323+
* Define an event specification: a name and a validator.
324+
* This helps share definitions between workflow.sendEvent and ctx.awaitEvent.
325+
* e.g.
326+
* ```ts
327+
* const approvalEvent = defineEvent({
328+
* name: "approval",
329+
* validator: v.object({ approved: v.boolean() }),
330+
* });
331+
* ```
332+
* Then you can await it in a workflow:
333+
* ```ts
334+
* const result = await ctx.awaitEvent(approvalEvent);
335+
* ```
336+
* And send from somewhere else:
337+
* ```ts
338+
* await workflow.sendEvent(ctx, {
339+
* ...approvalEvent,
340+
* workflowId,
341+
* value: { approved: true },
342+
* });
343+
* ```
344+
*/
345+
export function defineEvent<
346+
Name extends string,
347+
V extends Validator<unknown, "required", string>,
348+
>(spec: { name: Name; validator: V }) {
349+
return spec;
350+
}
351+
311352
type RunQueryCtx = {
312353
runQuery: GenericQueryCtx<GenericDataModel>["runQuery"];
313354
};

src/client/step.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import {
1717
journalEntrySize,
1818
valueSize,
1919
} from "../component/schema.js";
20-
import type { SchedulerOptions, WorkflowComponent } from "./types.js";
20+
import type { WorkflowComponent } from "./types.js";
2121
import { MAX_JOURNAL_SIZE } from "../shared.js";
22-
import type { EventId } from "../types.js";
22+
import type { EventId, SchedulerOptions } from "../types.js";
2323

2424
export type WorkerResult =
2525
| { type: "handlerDone"; runResult: RunResult }

0 commit comments

Comments
 (0)