Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions apps/demo/src/lib/data/flow-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,50 +26,50 @@ export const FLOW_SECTIONS: Record<string, CodeSection> = {
fetchArticle: {
code: ` .step(
{ slug: 'fetchArticle' },
(input) => scrapeUrl(input.run.url)
(flowInput) => scrapeUrl(flowInput.url)
)`,
mobileCode: ` .step(
{ slug: 'fetchArticle' },
(input) => scrapeUrl(
input.run.url
(flowInput) => scrapeUrl(
flowInput.url
)
)`
},
summarize: {
code: ` .step(
{ slug: 'summarize', dependsOn: ['fetchArticle'] },
(input) => summarize(schema, input.fetchArticle.content)
(deps) => summarize(schema, deps.fetchArticle.content)
)`,
mobileCode: ` .step(
{
slug: 'summarize',
dependsOn: ['fetchArticle']
},
(input) => summarize(
(deps) => summarize(
schema,
input.fetchArticle.content
deps.fetchArticle.content
)
)`
},
extractKeywords: {
code: ` .step(
{ slug: 'extractKeywords', dependsOn: ['fetchArticle'] },
(input) => extractKeywords(input.fetchArticle.content)
(deps) => extractKeywords(deps.fetchArticle.content)
)`,
mobileCode: ` .step(
{
slug: 'extractKeywords',
dependsOn: ['fetchArticle']
},
(input) => extractKeywords(
input.fetchArticle.content
(deps) => extractKeywords(
deps.fetchArticle.content
)
)`
},
publish: {
code: ` .step(
{ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] },
(input) => publishArticle(input.summarize, input.extractKeywords)
(deps) => publishArticle(deps.summarize, deps.extractKeywords)
);`,
mobileCode: ` .step(
{
Expand All @@ -79,9 +79,9 @@ export const FLOW_SECTIONS: Record<string, CodeSection> = {
'extractKeywords'
]
},
(input) => publishArticle(
input.summarize,
input.extractKeywords
(deps) => publishArticle(
deps.summarize,
deps.extractKeywords
)
);`
}
Expand Down
20 changes: 8 additions & 12 deletions apps/demo/supabase/functions/article_flow_worker/article_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ export default new Flow<{ url: string }>({
baseDelay: 1,
maxAttempts: 2
})
.step({ slug: 'fetchArticle' }, async (input) => {
.step({ slug: 'fetchArticle' }, async (flowInput) => {
await sleep(SLEEP_MS);
const startTime = Date.now();
const result = await fetchArticle(input.run.url);
const result = await fetchArticle(flowInput.url);
const durationMs = Date.now() - startTime;
return {
...result,
Expand All @@ -30,16 +30,12 @@ export default new Flow<{ url: string }>({
}
};
})
.step({ slug: 'summarize', dependsOn: ['fetchArticle'], baseDelay: 1 }, async (input) =>
summarizeArticle(input.fetchArticle.content)
.step({ slug: 'summarize', dependsOn: ['fetchArticle'], baseDelay: 1 }, async (deps) =>
summarizeArticle(deps.fetchArticle.content)
)
.step({ slug: 'extractKeywords', dependsOn: ['fetchArticle'] }, async (input) =>
extractKeywords(input.fetchArticle.content)
.step({ slug: 'extractKeywords', dependsOn: ['fetchArticle'] }, async (deps) =>
extractKeywords(deps.fetchArticle.content)
)
.step({ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, async (input) =>
publishArticle(
input.summarize.summary,
input.summarize.sentiment,
input.extractKeywords.keywords
)
.step({ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, async (deps) =>
publishArticle(deps.summarize.summary, deps.summarize.sentiment, deps.extractKeywords.keywords)
);
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ describe('createFlowsDirectory', () => {
// Second step should depend on first
expect(greetUserContent).toContain("dependsOn: ['fullName']");
// Second step should access result from first step
expect(greetUserContent).toContain('input.fullName');
expect(greetUserContent).toContain('deps.fullName');
});

it('should not create files when they already exist', async () => {
Expand Down
22 changes: 11 additions & 11 deletions pkgs/cli/examples/analyze_website.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ const AnalyzeWebsite = new Flow<WebsiteAnalysisInput>({
timeout: 10,
})
// First step: scrape the website
.step({ slug: 'website' }, async (input) => {
console.log(`Scraping website: ${input.run.url}`);
.step({ slug: 'website' }, async (flowInput) => {
console.log(`Scraping website: ${flowInput.url}`);
// In a real implementation, this would call an actual web scraper
return {
content: `Sample content from ${input.run.url}`,
content: `Sample content from ${flowInput.url}`,
title: 'Sample Website Title',
links: ['https://example.com/page1', 'https://example.com/page2'],
};
Expand All @@ -31,8 +31,8 @@ const AnalyzeWebsite = new Flow<WebsiteAnalysisInput>({
timeout: 30,
maxAttempts: 5,
},
async (input) => {
console.log(`Analyzing sentiment for: ${input.website.title}`);
async (deps) => {
console.log(`Analyzing sentiment for: ${deps.website.title}`);
// In a real implementation, this would call a sentiment analysis service
return {
score: 0.75,
Expand All @@ -42,19 +42,19 @@ const AnalyzeWebsite = new Flow<WebsiteAnalysisInput>({
}
)
// Third step: generate summary (depends on website step)
.step({ slug: 'summary', dependsOn: ['website'] }, async (input) => {
console.log(`Generating summary for: ${input.website.title}`);
.step({ slug: 'summary', dependsOn: ['website'] }, async (deps) => {
console.log(`Generating summary for: ${deps.website.title}`);
// In a real implementation, this might use an AI service
return {
aiSummary: `This is a summary of ${input.website.title}`,
wordCount: input.website.content.split(' ').length,
aiSummary: `This is a summary of ${deps.website.title}`,
wordCount: deps.website.content.split(' ').length,
};
})
// Fourth step: save results to database (depends on sentiment and summary)
.step(
{ slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] },
async (input) => {
console.log(`Saving results to database for: ${input.run.url}`);
async (_deps, ctx) => {
console.log(`Saving results to database for: ${ctx.flowInput.url}`);
// In a real implementation, this would save to a database
return {
status: 'success',
Expand Down
4 changes: 2 additions & 2 deletions pkgs/cli/src/commands/install/create-flows-directory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ export const GreetUser = new Flow<Input>({
})
.step(
{ slug: 'fullName' },
(input) => \`\${input.run.firstName} \${input.run.lastName}\`
(flowInput) => \`\${flowInput.firstName} \${flowInput.lastName}\`
)
.step(
{ slug: 'greeting', dependsOn: ['fullName'] },
(input) => \`Hello, \${input.fullName}!\`
(deps) => \`Hello, \${deps.fullName}!\`
);
`;

Expand Down
4 changes: 2 additions & 2 deletions pkgs/cli/supabase/flows/test_flow_e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ import { Flow } from '@pgflow/dsl';
export const TestFlowE2E = new Flow<{ value: string }>({
slug: 'test_flow_e2e',
maxAttempts: 3,
}).step({ slug: 'step1' }, async (input) => ({
result: `processed: ${input.run.value}`,
}).step({ slug: 'step1' }, async (flowInput) => ({
result: `processed: ${flowInput.value}`,
}));
14 changes: 7 additions & 7 deletions pkgs/client/__tests__/e2e/full-stack-dsl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ describe('Full Stack DSL Integration', () => {
const SimpleFlow = new Flow<{ url: string }>({
slug: 'simple_dag_test',
})
.step({ slug: 'fetch' }, async (_input) => ({
.step({ slug: 'fetch' }, async (_flowInput) => ({
data: 'fetched content',
status: 200,
items: 10,
}))
.step({ slug: 'process', dependsOn: ['fetch'] }, async (input) => ({
.step({ slug: 'process', dependsOn: ['fetch'] }, async (deps) => ({
processed_data: 'cleaned and validated',
item_count: input.fetch.items * 2,
item_count: deps.fetch.items * 2,
metadata: { stage: 'processed' },
}))
.step({ slug: 'save', dependsOn: ['process'] }, async (input) => ({
.step({ slug: 'save', dependsOn: ['process'] }, async (deps) => ({
saved: true,
record_id: 'rec_12345',
final_count: input.process.item_count,
final_count: deps.process.item_count,
}));

// Clean up flow data to ensure clean state
Expand Down Expand Up @@ -95,7 +95,7 @@ describe('Full Stack DSL Integration', () => {
let tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('fetch');
expect(tasks[0].input.run).toEqual(input);
expect(tasks[0].flow_input).toEqual(input);

const fetchOutput = { data: 'fetched content', status: 200, items: 10 };
await sqlClient.completeTask(tasks[0], fetchOutput);
Expand All @@ -111,7 +111,7 @@ describe('Full Stack DSL Integration', () => {
tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('process');
expect(tasks[0].input.run).toEqual(input);
expect(tasks[0].flow_input).toEqual(input);
expect(tasks[0].input.fetch).toEqual(fetchOutput); // Critical: dependency output included

const processOutput = {
Expand Down
4 changes: 2 additions & 2 deletions pkgs/client/__tests__/e2e/happy-path-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe('Happy Path E2E Integration', () => {
let tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('fetch');
expect(tasks[0].input.run).toEqual(input);
expect(tasks[0].flow_input).toEqual(input);

const fetchOutput = { data: 'fetched content', status: 200, items: 10 };
await sqlClient.completeTask(tasks[0], fetchOutput);
Expand All @@ -88,7 +88,7 @@ describe('Happy Path E2E Integration', () => {
tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('process');
expect(tasks[0].input.run).toEqual(input);
expect(tasks[0].flow_input).toEqual(input);
expect(tasks[0].input.fetch).toEqual(fetchOutput);

const processOutput = {
Expand Down
2 changes: 1 addition & 1 deletion pkgs/client/__tests__/e2e/real-flow-execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('Real Flow Execution', () => {
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);

expect(tasks).toHaveLength(1);
expect(tasks[0].input.run).toEqual(input);
expect(tasks[0].flow_input).toEqual(input);

// Complete task with complex nested output to verify JSON parsing
const taskOutput = {
Expand Down
10 changes: 5 additions & 5 deletions pkgs/dsl/tests/context-inference.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ interface ExplicitContext {
}

const ExplicitFlow = new Flow<{ userId: string }, ExplicitContext>({ slug: 'explicit-flow' })
.step({ slug: 'get-user' }, async (input, context) => {
.step({ slug: 'get-user' }, async (flowInput, context) => {
// All ExplicitContext properties should be available
const user = await context.sql.query(`SELECT * FROM users WHERE id = ${input.run.userId}`);
await context.cache.set(`user:${input.run.userId}`, JSON.stringify(user));
const user = await context.sql.query(`SELECT * FROM users WHERE id = ${flowInput.userId}`);
await context.cache.set(`user:${flowInput.userId}`, JSON.stringify(user));
context.pubsub.publish('user-fetched');
return user;
});
Expand All @@ -95,9 +95,9 @@ const _explicitTest: ExplicitFlowContext = {

// Test 5: Mixed approach - explicit base with inferred additions
const MixedFlow = new Flow<{ id: string }, { sql: TestSql }>({ slug: 'mixed-flow' })
.step({ slug: 'query' }, async (input, context) => {
.step({ slug: 'query' }, async (flowInput, context) => {
// Has sql from explicit type
return await context.sql.query(`SELECT * FROM items WHERE id = ${input.run.id}`);
return await context.sql.query(`SELECT * FROM items WHERE id = ${flowInput.id}`);
})
.step({ slug: 'enhance' }, async (input, context: Context & { sql: TestSql, ai: { generate: () => string } }) => {
// Adds ai requirement via inference
Expand Down
6 changes: 3 additions & 3 deletions pkgs/website/src/content/docs/build/create-reusable-tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ To create task functions that can be used across multiple flows without tight co

Let the flow's step handler extract needed data from the flow context:

```typescript "input.run.userId"
```typescript "flowInput.userId"
// Your task function
async function fetchUserProfile(userId: string) {
const response = await fetch(`https://api.example.com/users/${userId}`);
Expand All @@ -62,8 +62,8 @@ To create task functions that can be used across multiple flows without tight co

// Flow uses handler to adapt context to task parameters
new Flow<{ userId: string }>({ slug: 'userFlow' })
.step({ slug: 'profile' }, async (input) =>
await fetchUserProfile(input.run.userId)
.step({ slug: 'profile' }, async (flowInput) =>
await fetchUserProfile(flowInput.userId)
)
```

Expand Down
26 changes: 13 additions & 13 deletions pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const ScrapeMultipleUrls = new Flow<string[]>({ // Flow input must be array for
)
.step(
{ slug: 'summary', dependsOn: ['scrapedPages'] },
(input) => summarizeResults(input.scrapedPages)
(deps) => summarizeResults(deps.scrapedPages)
);

// Usage (SQL):
Expand All @@ -53,7 +53,7 @@ const CsvProcessor = new Flow<{ csvUrl: string }>({
})
.array(
{ slug: 'csvRows' },
async (input) => await fetchAndParseCSV(input.run.csvUrl)
async (flowInput) => await fetchAndParseCSV(flowInput.csvUrl)
)
.map(
{ slug: 'validatedRows', array: 'csvRows' },
Expand All @@ -65,23 +65,23 @@ const CsvProcessor = new Flow<{ csvUrl: string }>({
)
.step(
{ slug: 'saveResults', dependsOn: ['processedRows'] },
async (input, context) => await saveProcessedData(input.processedRows, context)
async (deps, ctx) => await saveProcessedData(deps.processedRows, ctx)
);
```

## Enriching Array Elements with Additional Data

Map handlers only receive individual array elements. If a handler needs access to the original flow input (`run`), outputs from other dependencies, or any additional data, include that data in the array elements via a previous step.
Map handlers only receive individual array elements. If a handler needs access to the original flow input, outputs from other dependencies, or any additional data, include that data in the array elements via a previous step.

### Problem: Map Handlers Can't Access Flow Context

```typescript "apiKey: string" del="id, ???" del="(id)"
// This won't work - map handler can't access input.run
// This won't work - map handler can't access ctx.flowInput
const ProblemFlow = new Flow<{ apiKey: string, ids: string[] }>({
slug: 'problemFlow',
})
.map({ slug: 'fetch' }, async (id) => {
// Can't access input.run.apiKey here!
// Can't access flowInput.apiKey here!
return await fetchWithKey(id, ???); // No access to apiKey
});
```
Expand All @@ -94,11 +94,11 @@ const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({
})
.array(
{ slug: 'prepareItems' },
(input) => {
(flowInput) => {
// Include needed context in each element
return input.run.ids.map(id => ({
return flowInput.ids.map(id => ({
id,
apiKey: input.run.apiKey
apiKey: flowInput.apiKey
}));
}
)
Expand All @@ -111,7 +111,7 @@ const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({
);
```

This pattern applies whenever a map handler needs any data beyond the array elements themselves. Add a step before the map that enriches the array elements with whatever data the handler needs - whether that's the original flow input (`run`), outputs from other dependencies, or both.
This pattern applies whenever a map handler needs any data beyond the array elements themselves. Add a step before the map that enriches the array elements with whatever data the handler needs - whether that's the original flow input, outputs from other dependencies, or both.

:::note[Debugging Map Tasks]
When debugging map steps, use [`context.stepTask.task_index`](/reference/context/#steptask) to identify which array element each task is processing.
Expand Down Expand Up @@ -141,12 +141,12 @@ const EmptyHandling = new Flow<{}>({
)
.step(
{ slug: 'handleResults', dependsOn: ['enrichedItems'] },
(input) => {
(deps) => {
// This still executes to handle the empty result
if (input.enrichedItems.length === 0) {
if (deps.enrichedItems.length === 0) {
return { message: 'No items to process' };
}
return { processed: input.enrichedItems.length };
return { processed: deps.enrichedItems.length };
}
);
```
Expand Down
Loading
Loading