Skip to content

Commit ce2658e

Browse files
committed
fix various pr commends from runkey stack
1 parent 878a4d1 commit ce2658e

File tree

14 files changed

+113
-71
lines changed

14 files changed

+113
-71
lines changed

pkgs/dsl/__tests__/types/dsl-types.test-d.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,24 @@ describe('Flow Type System Tests', () => {
7878
});
7979
});
8080

81+
describe('Empty dependsOn array handling', () => {
82+
it('should reject empty dependsOn array at compile time', () => {
83+
// Empty dependsOn: [] is semantically meaningless - if you have no dependencies,
84+
// simply omit dependsOn. Allowing it creates a type mismatch where:
85+
// - TypeScript infers deps as {} (empty object from never[])
86+
// - Runtime treats it as a root step and passes flowInput
87+
//
88+
// This test verifies that dependsOn: [] is rejected at compile time.
89+
new Flow<{ userId: string }>({ slug: 'test_flow' })
90+
.step({ slug: 'root' }, () => ({ value: 1 }))
91+
// @ts-expect-error - empty dependsOn array should be rejected
92+
.step({ slug: 'bad_step', dependsOn: [] }, (deps) => {
93+
// If this compiled, deps would be {} but runtime would pass { userId: string }
94+
return { result: deps };
95+
});
96+
});
97+
});
98+
8199
describe('Multi-level dependencies', () => {
82100
it('should correctly type multi-level dependencies', () => {
83101
new Flow<string>({ slug: 'test_flow' })

pkgs/dsl/src/dsl.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,12 +431,13 @@ export class Flow<
431431
>;
432432

433433
// Overload 2: Dependent step (with dependsOn) - receives deps, flowInput via context
434+
// Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time
434435
step<
435436
Slug extends string,
436437
Deps extends Extract<keyof Steps, string>,
437438
TOutput
438439
>(
439-
opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: Deps[] } & StepRuntimeOptions>,
440+
opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: [Deps, ...Deps[]] } & StepRuntimeOptions>,
440441
handler: (
441442
deps: { [K in Deps]: K extends keyof Steps ? Steps[K] : never },
442443
context: FlowContext<TEnv, TFlowInput> & TContext
@@ -537,12 +538,13 @@ export class Flow<
537538
>;
538539

539540
// Overload 2: Dependent array (with dependsOn) - receives deps, flowInput via context
541+
// Note: [Deps, ...Deps[]] requires at least one dependency - empty arrays are rejected at compile time
540542
array<
541543
Slug extends string,
542544
Deps extends Extract<keyof Steps, string>,
543545
TOutput extends readonly any[]
544546
>(
545-
opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: Deps[] } & StepRuntimeOptions>,
547+
opts: Simplify<{ slug: Slug extends keyof Steps ? never : Slug; dependsOn: [Deps, ...Deps[]] } & StepRuntimeOptions>,
546548
handler: (
547549
deps: { [K in Deps]: K extends keyof Steps ? Steps[K] : never },
548550
context: FlowContext<TEnv, TFlowInput> & TContext

pkgs/edge-worker/src/flow/StepTaskExecutor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ export class StepTaskExecutor<TFlow extends AnyFlow, TContext extends StepTaskHa
111111
// Map steps: SQL already extracted element
112112
handlerInput = this.stepTask.input;
113113
} else if (stepDef.dependencies.length === 0) {
114-
// Root single/array step: use flowInput from context (await the Promise)
115-
handlerInput = await this.context.flowInput;
114+
// Root single/array step: flow_input is guaranteed by SQL (start_tasks)
115+
handlerInput = this.stepTask.flow_input!;
116116
} else {
117117
// Dependent single/array step: use deps object from task input
118118
handlerInput = this.stepTask.input;

pkgs/website/src/code-examples/analyze_website_simplified.ts.raw

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ new Flow<{ url: string; user_id: string }>({
1515
)
1616
.step(
1717
{ slug: 'saveToDb', dependsOn: ['summary', 'tags'] },
18-
async (deps, ctx) =>
19-
await saveWebsite({
20-
user_id: ctx.flowInput.user_id,
21-
website_url: ctx.flowInput.url,
18+
async (deps, ctx) => {
19+
const flowInput = await ctx.flowInput;
20+
return saveWebsite({
21+
user_id: flowInput.user_id,
22+
website_url: flowInput.url,
2223
summary: deps.summary,
2324
tags: deps.tags,
24-
})
25+
});
26+
}
2527
);

pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,22 @@ const CsvProcessor = new Flow<{ csvUrl: string }>({
7171

7272
## Enriching Array Elements with Additional Data
7373

74-
Map handlers only receive individual array elements. If a handler needs access to the original flow input, outputs from other dependencies, or any additional data, include that data in the array elements via a previous step.
74+
Map handlers can access flow input via `await ctx.flowInput`, but for better performance, include needed data directly in the array elements via a previous step.
7575

76-
### Problem: Map Handlers Can't Access Flow Context
76+
### Inefficient: Awaiting flowInput in Each Map Task
7777

78-
```typescript "apiKey: string" del="id, ???" del="(id)"
79-
// This won't work - map handler can't access ctx.flowInput
80-
const ProblemFlow = new Flow<{ apiKey: string, ids: string[] }>({
81-
slug: 'problemFlow',
78+
```typescript "apiKey: string" del="await ctx.flowInput" del="(id, ctx)"
79+
// Works, but inefficient - fetches flowInput for each of N tasks
80+
const InefficientFlow = new Flow<{ apiKey: string, ids: string[] }>({
81+
slug: 'inefficientFlow',
8282
})
83-
.map({ slug: 'fetch' }, async (id) => {
84-
// Can't access flowInput.apiKey here!
85-
return await fetchWithKey(id, ???); // No access to apiKey
83+
.map({ slug: 'fetch' }, async (id, ctx) => {
84+
const flowInput = await ctx.flowInput; // N fetches for N items
85+
return await fetchWithKey(id, flowInput.apiKey);
8686
});
8787
```
8888

89-
### Solution: Enrich Array Elements
89+
### Better: Enrich Array Elements
9090

9191
```typescript ins={4-13} ins="item.id, item.apiKey" ins="(item)"
9292
const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({
@@ -113,6 +113,10 @@ const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({
113113

114114
This pattern applies whenever a map handler needs any data beyond the array elements themselves. Add a step before the map that enriches the array elements with whatever data the handler needs - whether that's the original flow input, outputs from other dependencies, or both.
115115

116+
:::tip[Why is ctx.flowInput async?]
117+
`ctx.flowInput` is lazy-loaded to prevent data duplication. For map steps processing thousands of items, including the full flow input in each task record would multiply storage and transfer costs. Instead, it's fetched on-demand when you `await` it. The enrichment pattern above is more efficient because it includes only the data each item actually needs.
118+
:::
119+
116120
:::note[Debugging Map Tasks]
117121
When debugging map steps, use [`context.stepTask.task_index`](/reference/context/#steptask) to identify which array element each task is processing.
118122
:::

pkgs/website/src/content/docs/concepts/map-steps.mdx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ Map step handlers receive only the individual array element, not the full input
119119
```typescript "deps"
120120
.step({ slug: 'regular', dependsOn: ['source'] }, async (deps, ctx) => {
121121
// deps contains outputs from dependencies
122-
// ctx.flowInput provides access to the original flow input
122+
// await ctx.flowInput provides access to the original flow input
123123
return processAllData(deps.source);
124124
})
125125
```
@@ -128,13 +128,17 @@ Map step handlers receive only the individual array element, not the full input
128128
```typescript "item"
129129
.map({ slug: 'mapStep', array: 'source' }, async (item, ctx) => {
130130
// item is ONLY the individual array element
131-
// ctx.flowInput provides access to the original flow input
131+
// await ctx.flowInput provides access to the original flow input
132132
return processItem(item);
133133
})
134134
```
135135

136136
This constraint keeps map handlers focused on transforming individual elements. If a map handler needs additional data from other step outputs, include that data in the array elements via a previous step. The context parameter provides access to flow input when needed.
137137

138+
:::tip[Accessing flowInput in map handlers]
139+
While `await ctx.flowInput` works in map handlers, it's lazy-loaded and fetched separately for each task. For better performance with large arrays, enrich your array elements with the data they need before mapping. See [Enriching Array Elements](/build/process-arrays-in-parallel/#enriching-array-elements-with-additional-data) for the recommended pattern.
140+
:::
141+
138142
## Root Maps vs Dependent Maps
139143

140144
Map steps operate in two modes:

pkgs/website/src/content/docs/concepts/understanding-flows.mdx

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ Both step types receive a **context object** as the second parameter, which prov
4141

4242
:::note[Accessing Flow Input]
4343
- Root steps: Access directly via the first parameter (e.g., `flowInput.url`)
44-
- Dependent steps: Access via context (e.g., `ctx.flowInput.url`)
44+
- Dependent steps: Access via context (e.g., `await ctx.flowInput` - it's lazy-loaded)
4545

4646
This asymmetric design matches the domain - root and dependent steps ARE different.
4747
:::
4848

4949
Consider this example:
5050

51-
```typescript "flowInput.url" "ctx.flowInput.userId" "deps.scrape.content"
51+
```typescript "flowInput.url" "await ctx.flowInput" "deps.scrape.content"
5252
new Flow<{ url: string, userId: string }>({
5353
slug: 'analyzeWebsite',
5454
})
@@ -63,7 +63,7 @@ new Flow<{ url: string, userId: string }>({
6363
{ slug: 'analyze', dependsOn: ['scrape'] },
6464
async (deps, ctx) => {
6565
// deps contains dependency outputs
66-
// ctx.flowInput provides access to original flow input if needed
66+
// await ctx.flowInput provides access to original flow input if needed
6767
return await analyzeContent(deps.scrape.content);
6868
}
6969
);
@@ -72,7 +72,7 @@ new Flow<{ url: string, userId: string }>({
7272
When this flow runs:
7373
1. The flow receives an input object (e.g., `{ url: "example.com", userId: "123" }`)
7474
2. Root steps receive the flow input directly as the first parameter
75-
3. Dependent steps receive their dependencies as the first parameter, with flow input available via `ctx.flowInput`
75+
3. Dependent steps receive their dependencies as the first parameter, with flow input available via `await ctx.flowInput`
7676

7777
## The Type System
7878

@@ -272,7 +272,7 @@ The TypeScript code is used only for definition and compilation, not for executi
272272

273273
Here's a practical example showing how data flows efficiently through steps:
274274

275-
```typescript "flowInput.userId" "ctx.flowInput.reportType" "ctx.flowInput.includeDetails" {21,30,38}
275+
```typescript "flowInput.userId" "await ctx.flowInput" {22-23,32-33,42-43}
276276
// Flow with multiple input parameters
277277
type Input = {
278278
userId: string,
@@ -294,35 +294,36 @@ new Flow<Input>({
294294
.step(
295295
{ slug: 'activity', dependsOn: ['user'] },
296296
async (deps, ctx) => {
297-
// Uses ctx.flowInput.reportType to determine timespan
298-
const timespan = ctx.flowInput.reportType === 'advanced' ? '1y' : '30d';
297+
const flowInput = await ctx.flowInput;
298+
const timespan = flowInput.reportType === 'advanced' ? '1y' : '30d';
299299
return await getUserActivity(deps.user.id, timespan);
300300
}
301301
)
302302
.step(
303303
{ slug: 'preferences', dependsOn: ['user'] },
304304
async (deps, ctx) => {
305-
// Uses ctx.flowInput.includeDetails parameter
306-
return await getUserPreferences(deps.user.id, ctx.flowInput.includeDetails);
305+
const flowInput = await ctx.flowInput;
306+
return await getUserPreferences(deps.user.id, flowInput.includeDetails);
307307
}
308308
)
309309
// Step 4: Combine results
310310
.step(
311311
{ slug: 'report', dependsOn: ['activity', 'preferences'] },
312312
async (deps, ctx) => {
313+
const flowInput = await ctx.flowInput;
313314
return {
314315
user: deps.user,
315316
activity: deps.activity,
316317
preferences: deps.preferences,
317-
reportType: ctx.flowInput.reportType, // Original parameter still available
318+
reportType: flowInput.reportType,
318319
generatedAt: new Date().toISOString()
319320
};
320321
}
321322
);
322323
```
323324

324325
This example demonstrates:
325-
1. **Original parameters available via context** - Dependent steps access flow input via `ctx.flowInput`
326+
1. **Original parameters available via context** - Dependent steps access flow input via `await ctx.flowInput`
326327
2. **Conditional processing** - Steps adapt behavior based on original parameters
327328
3. **No manual parameter forwarding needed** - The `user` step doesn't need to include original parameters
328329
4. **Type safety throughout** - TypeScript ensures all data accesses are valid

pkgs/website/src/content/docs/get-started/flows/create-flow.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ type Input = {
6262
};
6363
```
6464

65-
This defines what data the flow accepts when started. Root steps receive this directly, while dependent steps access it via `ctx.flowInput`.
65+
This defines what data the flow accepts when started. Root steps receive this directly, while dependent steps access it via `await ctx.flowInput`.
6666

6767
### Flow constructor
6868

@@ -113,7 +113,7 @@ export const GreetUser = new Flow<Input>({
113113
(deps) => ... // deps = { fullName: "John Doe" }
114114
```
115115

116-
Need flow input in a dependent step? Use `ctx.flowInput`.
116+
Need flow input in a dependent step? Use `await ctx.flowInput`.
117117
</Aside>
118118

119119
## Exporting flows
@@ -145,7 +145,7 @@ Recompiling **deletes the existing flow and all its run data**. See [Startup Com
145145

146146
:::note[Key Concepts]
147147
- **Root steps**: Receive flow input directly as first parameter
148-
- **Dependent steps**: Receive dependency outputs as first parameter, access flow input via `ctx.flowInput`
148+
- **Dependent steps**: Receive dependency outputs as first parameter, access flow input via `await ctx.flowInput`
149149
- **step slugs**: Unique identifiers for each step
150150
- **dependsOn**: Specifies which steps must complete before a step can run
151151
- **JSON serialization**: All inputs and outputs must be JSON-serializable

pkgs/website/src/content/docs/index.mdx

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -334,12 +334,15 @@ new Flow<{ url: string }>({ slug: 'analyzeArticle' })
334334
(deps) => extractKeywords(deps.fetchArticle)
335335
)
336336
.step({ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] },
337-
(deps, ctx) => publishArticle({
338-
url: ctx.flowInput.url,
339-
content: deps.fetchArticle,
340-
summary: deps.summarize,
341-
keywords: deps.extractKeywords
342-
})
337+
async (deps, ctx) => {
338+
const flowInput = await ctx.flowInput;
339+
return publishArticle({
340+
url: flowInput.url,
341+
content: deps.fetchArticle,
342+
summary: deps.summarize,
343+
keywords: deps.extractKeywords
344+
});
345+
}
343346
);
344347
```
345348

pkgs/website/src/content/docs/reference/compile-api.mdx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ const MyFlow = new Flow<Input>({
3030
timeout: 60,
3131
})
3232
.step({ slug: 'fetch' }, (flowInput) => flowInput.url)
33-
.step({ slug: 'process', dependsOn: ['fetch'] }, (deps, ctx) => ({
34-
url: ctx.flowInput.url,
35-
content: deps.fetch,
36-
}));
33+
.step({ slug: 'process', dependsOn: ['fetch'] }, async (deps, ctx) => {
34+
const flowInput = await ctx.flowInput;
35+
return { url: flowInput.url, content: deps.fetch };
36+
});
3737

3838
const statements = compileFlow(MyFlow);
3939
// Returns: string[]

0 commit comments

Comments
 (0)