From 6090a08f3e6533c69ddbffa077633495fa1b9c60 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Tue, 23 Dec 2025 11:50:11 +0100 Subject: [PATCH] migrate 48+ files from input.run to asymmetric handler signatures --- apps/demo/src/lib/data/flow-code.ts | 26 ++--- .../article_flow_worker/article_flow.ts | 20 ++-- .../install/create-flows-directory.test.ts | 2 +- pkgs/cli/examples/analyze_website.ts | 22 ++--- .../install/create-flows-directory.ts | 4 +- pkgs/cli/supabase/flows/test_flow_e2e.ts | 4 +- .../__tests__/e2e/full-stack-dsl.test.ts | 14 +-- .../__tests__/e2e/happy-path-e2e.test.ts | 4 +- .../__tests__/e2e/real-flow-execution.test.ts | 2 +- pkgs/dsl/tests/context-inference.test.ts | 10 +- .../docs/build/create-reusable-tasks.mdx | 6 +- .../docs/build/process-arrays-in-parallel.mdx | 26 ++--- .../src/content/docs/build/retrying-steps.mdx | 8 +- .../content/docs/build/validation-steps.mdx | 26 ++--- .../docs/concepts/how-pgflow-works.mdx | 22 ++--- .../src/content/docs/concepts/map-steps.mdx | 25 +++-- .../docs/concepts/understanding-flows.mdx | 98 ++++++++++--------- .../docs/get-started/flows/create-flow.mdx | 19 ++-- pkgs/website/src/content/docs/index.mdx | 16 +-- .../content/docs/reference/compile-api.mdx | 8 +- .../docs/reference/manual-installation.mdx | 4 +- .../tutorials/rag/automatic-embeddings.mdx | 12 +-- 22 files changed, 189 insertions(+), 189 deletions(-) diff --git a/apps/demo/src/lib/data/flow-code.ts b/apps/demo/src/lib/data/flow-code.ts index 22b3c12fe..1a794e658 100644 --- a/apps/demo/src/lib/data/flow-code.ts +++ b/apps/demo/src/lib/data/flow-code.ts @@ -26,50 +26,50 @@ export const FLOW_SECTIONS: Record = { fetchArticle: { code: ` .step( { slug: 'fetchArticle' }, - (input) => scrapeUrl(input.run.url) + (flowInput) => scrapeUrl(flowInput.url) )`, mobileCode: ` .step( { slug: 'fetchArticle' }, - (input) => scrapeUrl( - input.run.url + (flowInput) => scrapeUrl( + flowInput.url ) )` }, summarize: { code: ` .step( { slug: 'summarize', dependsOn: ['fetchArticle'] }, - (input) => summarize(schema, input.fetchArticle.content) + (deps) => summarize(schema, deps.fetchArticle.content) )`, mobileCode: ` .step( { slug: 'summarize', dependsOn: ['fetchArticle'] }, - (input) => summarize( + (deps) => summarize( schema, - input.fetchArticle.content + deps.fetchArticle.content ) )` }, extractKeywords: { code: ` .step( { slug: 'extractKeywords', dependsOn: ['fetchArticle'] }, - (input) => extractKeywords(input.fetchArticle.content) + (deps) => extractKeywords(deps.fetchArticle.content) )`, mobileCode: ` .step( { slug: 'extractKeywords', dependsOn: ['fetchArticle'] }, - (input) => extractKeywords( - input.fetchArticle.content + (deps) => extractKeywords( + deps.fetchArticle.content ) )` }, publish: { code: ` .step( { slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, - (input) => publishArticle(input.summarize, input.extractKeywords) + (deps) => publishArticle(deps.summarize, deps.extractKeywords) );`, mobileCode: ` .step( { @@ -79,9 +79,9 @@ export const FLOW_SECTIONS: Record = { 'extractKeywords' ] }, - (input) => publishArticle( - input.summarize, - input.extractKeywords + (deps) => publishArticle( + deps.summarize, + deps.extractKeywords ) );` } diff --git a/apps/demo/supabase/functions/article_flow_worker/article_flow.ts b/apps/demo/supabase/functions/article_flow_worker/article_flow.ts index 9ef955f2b..937e7d3b7 100644 --- a/apps/demo/supabase/functions/article_flow_worker/article_flow.ts +++ b/apps/demo/supabase/functions/article_flow_worker/article_flow.ts @@ -17,10 +17,10 @@ export default new Flow<{ url: string }>({ baseDelay: 1, maxAttempts: 2 }) - .step({ slug: 'fetchArticle' }, async (input) => { + .step({ slug: 'fetchArticle' }, async (flowInput) => { await sleep(SLEEP_MS); const startTime = Date.now(); - const result = await fetchArticle(input.run.url); + const result = await fetchArticle(flowInput.url); const durationMs = Date.now() - startTime; return { ...result, @@ -30,16 +30,12 @@ export default new Flow<{ url: string }>({ } }; }) - .step({ slug: 'summarize', dependsOn: ['fetchArticle'], baseDelay: 1 }, async (input) => - summarizeArticle(input.fetchArticle.content) + .step({ slug: 'summarize', dependsOn: ['fetchArticle'], baseDelay: 1 }, async (deps) => + summarizeArticle(deps.fetchArticle.content) ) - .step({ slug: 'extractKeywords', dependsOn: ['fetchArticle'] }, async (input) => - extractKeywords(input.fetchArticle.content) + .step({ slug: 'extractKeywords', dependsOn: ['fetchArticle'] }, async (deps) => + extractKeywords(deps.fetchArticle.content) ) - .step({ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, async (input) => - publishArticle( - input.summarize.summary, - input.summarize.sentiment, - input.extractKeywords.keywords - ) + .step({ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, async (deps) => + publishArticle(deps.summarize.summary, deps.summarize.sentiment, deps.extractKeywords.keywords) ); diff --git a/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts b/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts index 4dc942afc..f6ab23ab9 100644 --- a/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts +++ b/pkgs/cli/__tests__/commands/install/create-flows-directory.test.ts @@ -92,7 +92,7 @@ describe('createFlowsDirectory', () => { // Second step should depend on first expect(greetUserContent).toContain("dependsOn: ['fullName']"); // Second step should access result from first step - expect(greetUserContent).toContain('input.fullName'); + expect(greetUserContent).toContain('deps.fullName'); }); it('should not create files when they already exist', async () => { diff --git a/pkgs/cli/examples/analyze_website.ts b/pkgs/cli/examples/analyze_website.ts index e86fe694d..f9ab67b86 100644 --- a/pkgs/cli/examples/analyze_website.ts +++ b/pkgs/cli/examples/analyze_website.ts @@ -14,11 +14,11 @@ const AnalyzeWebsite = new Flow({ timeout: 10, }) // First step: scrape the website - .step({ slug: 'website' }, async (input) => { - console.log(`Scraping website: ${input.run.url}`); + .step({ slug: 'website' }, async (flowInput) => { + console.log(`Scraping website: ${flowInput.url}`); // In a real implementation, this would call an actual web scraper return { - content: `Sample content from ${input.run.url}`, + content: `Sample content from ${flowInput.url}`, title: 'Sample Website Title', links: ['https://example.com/page1', 'https://example.com/page2'], }; @@ -31,8 +31,8 @@ const AnalyzeWebsite = new Flow({ timeout: 30, maxAttempts: 5, }, - async (input) => { - console.log(`Analyzing sentiment for: ${input.website.title}`); + async (deps) => { + console.log(`Analyzing sentiment for: ${deps.website.title}`); // In a real implementation, this would call a sentiment analysis service return { score: 0.75, @@ -42,19 +42,19 @@ const AnalyzeWebsite = new Flow({ } ) // Third step: generate summary (depends on website step) - .step({ slug: 'summary', dependsOn: ['website'] }, async (input) => { - console.log(`Generating summary for: ${input.website.title}`); + .step({ slug: 'summary', dependsOn: ['website'] }, async (deps) => { + console.log(`Generating summary for: ${deps.website.title}`); // In a real implementation, this might use an AI service return { - aiSummary: `This is a summary of ${input.website.title}`, - wordCount: input.website.content.split(' ').length, + aiSummary: `This is a summary of ${deps.website.title}`, + wordCount: deps.website.content.split(' ').length, }; }) // Fourth step: save results to database (depends on sentiment and summary) .step( { slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] }, - async (input) => { - console.log(`Saving results to database for: ${input.run.url}`); + async (_deps, ctx) => { + console.log(`Saving results to database for: ${ctx.flowInput.url}`); // In a real implementation, this would save to a database return { status: 'success', diff --git a/pkgs/cli/src/commands/install/create-flows-directory.ts b/pkgs/cli/src/commands/install/create-flows-directory.ts index 0b2014451..def6baa61 100644 --- a/pkgs/cli/src/commands/install/create-flows-directory.ts +++ b/pkgs/cli/src/commands/install/create-flows-directory.ts @@ -21,11 +21,11 @@ export const GreetUser = new Flow({ }) .step( { slug: 'fullName' }, - (input) => \`\${input.run.firstName} \${input.run.lastName}\` + (flowInput) => \`\${flowInput.firstName} \${flowInput.lastName}\` ) .step( { slug: 'greeting', dependsOn: ['fullName'] }, - (input) => \`Hello, \${input.fullName}!\` + (deps) => \`Hello, \${deps.fullName}!\` ); `; diff --git a/pkgs/cli/supabase/flows/test_flow_e2e.ts b/pkgs/cli/supabase/flows/test_flow_e2e.ts index db7e56399..d7984b5a1 100644 --- a/pkgs/cli/supabase/flows/test_flow_e2e.ts +++ b/pkgs/cli/supabase/flows/test_flow_e2e.ts @@ -4,6 +4,6 @@ import { Flow } from '@pgflow/dsl'; export const TestFlowE2E = new Flow<{ value: string }>({ slug: 'test_flow_e2e', maxAttempts: 3, -}).step({ slug: 'step1' }, async (input) => ({ - result: `processed: ${input.run.value}`, +}).step({ slug: 'step1' }, async (flowInput) => ({ + result: `processed: ${flowInput.value}`, })); diff --git a/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts b/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts index c05728e08..9ba7e3283 100644 --- a/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts +++ b/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts @@ -19,20 +19,20 @@ describe('Full Stack DSL Integration', () => { const SimpleFlow = new Flow<{ url: string }>({ slug: 'simple_dag_test', }) - .step({ slug: 'fetch' }, async (_input) => ({ + .step({ slug: 'fetch' }, async (_flowInput) => ({ data: 'fetched content', status: 200, items: 10, })) - .step({ slug: 'process', dependsOn: ['fetch'] }, async (input) => ({ + .step({ slug: 'process', dependsOn: ['fetch'] }, async (deps) => ({ processed_data: 'cleaned and validated', - item_count: input.fetch.items * 2, + item_count: deps.fetch.items * 2, metadata: { stage: 'processed' }, })) - .step({ slug: 'save', dependsOn: ['process'] }, async (input) => ({ + .step({ slug: 'save', dependsOn: ['process'] }, async (deps) => ({ saved: true, record_id: 'rec_12345', - final_count: input.process.item_count, + final_count: deps.process.item_count, })); // Clean up flow data to ensure clean state @@ -95,7 +95,7 @@ describe('Full Stack DSL Integration', () => { let tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5); expect(tasks).toHaveLength(1); expect(tasks[0].step_slug).toBe('fetch'); - expect(tasks[0].input.run).toEqual(input); + expect(tasks[0].flow_input).toEqual(input); const fetchOutput = { data: 'fetched content', status: 200, items: 10 }; await sqlClient.completeTask(tasks[0], fetchOutput); @@ -111,7 +111,7 @@ describe('Full Stack DSL Integration', () => { tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5); expect(tasks).toHaveLength(1); expect(tasks[0].step_slug).toBe('process'); - expect(tasks[0].input.run).toEqual(input); + expect(tasks[0].flow_input).toEqual(input); expect(tasks[0].input.fetch).toEqual(fetchOutput); // Critical: dependency output included const processOutput = { diff --git a/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts b/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts index 362e7668b..a9db6ff40 100644 --- a/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts +++ b/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts @@ -73,7 +73,7 @@ describe('Happy Path E2E Integration', () => { let tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); expect(tasks).toHaveLength(1); expect(tasks[0].step_slug).toBe('fetch'); - expect(tasks[0].input.run).toEqual(input); + expect(tasks[0].flow_input).toEqual(input); const fetchOutput = { data: 'fetched content', status: 200, items: 10 }; await sqlClient.completeTask(tasks[0], fetchOutput); @@ -88,7 +88,7 @@ describe('Happy Path E2E Integration', () => { tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); expect(tasks).toHaveLength(1); expect(tasks[0].step_slug).toBe('process'); - expect(tasks[0].input.run).toEqual(input); + expect(tasks[0].flow_input).toEqual(input); expect(tasks[0].input.fetch).toEqual(fetchOutput); const processOutput = { diff --git a/pkgs/client/__tests__/e2e/real-flow-execution.test.ts b/pkgs/client/__tests__/e2e/real-flow-execution.test.ts index e5cbae6e2..a121c1919 100644 --- a/pkgs/client/__tests__/e2e/real-flow-execution.test.ts +++ b/pkgs/client/__tests__/e2e/real-flow-execution.test.ts @@ -41,7 +41,7 @@ describe('Real Flow Execution', () => { const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5); expect(tasks).toHaveLength(1); - expect(tasks[0].input.run).toEqual(input); + expect(tasks[0].flow_input).toEqual(input); // Complete task with complex nested output to verify JSON parsing const taskOutput = { diff --git a/pkgs/dsl/tests/context-inference.test.ts b/pkgs/dsl/tests/context-inference.test.ts index 1aacf8978..259e90292 100644 --- a/pkgs/dsl/tests/context-inference.test.ts +++ b/pkgs/dsl/tests/context-inference.test.ts @@ -77,10 +77,10 @@ interface ExplicitContext { } const ExplicitFlow = new Flow<{ userId: string }, ExplicitContext>({ slug: 'explicit-flow' }) - .step({ slug: 'get-user' }, async (input, context) => { + .step({ slug: 'get-user' }, async (flowInput, context) => { // All ExplicitContext properties should be available - const user = await context.sql.query(`SELECT * FROM users WHERE id = ${input.run.userId}`); - await context.cache.set(`user:${input.run.userId}`, JSON.stringify(user)); + const user = await context.sql.query(`SELECT * FROM users WHERE id = ${flowInput.userId}`); + await context.cache.set(`user:${flowInput.userId}`, JSON.stringify(user)); context.pubsub.publish('user-fetched'); return user; }); @@ -95,9 +95,9 @@ const _explicitTest: ExplicitFlowContext = { // Test 5: Mixed approach - explicit base with inferred additions const MixedFlow = new Flow<{ id: string }, { sql: TestSql }>({ slug: 'mixed-flow' }) - .step({ slug: 'query' }, async (input, context) => { + .step({ slug: 'query' }, async (flowInput, context) => { // Has sql from explicit type - return await context.sql.query(`SELECT * FROM items WHERE id = ${input.run.id}`); + return await context.sql.query(`SELECT * FROM items WHERE id = ${flowInput.id}`); }) .step({ slug: 'enhance' }, async (input, context: Context & { sql: TestSql, ai: { generate: () => string } }) => { // Adds ai requirement via inference diff --git a/pkgs/website/src/content/docs/build/create-reusable-tasks.mdx b/pkgs/website/src/content/docs/build/create-reusable-tasks.mdx index e22a070e3..985af32da 100644 --- a/pkgs/website/src/content/docs/build/create-reusable-tasks.mdx +++ b/pkgs/website/src/content/docs/build/create-reusable-tasks.mdx @@ -53,7 +53,7 @@ To create task functions that can be used across multiple flows without tight co Let the flow's step handler extract needed data from the flow context: - ```typescript "input.run.userId" + ```typescript "flowInput.userId" // Your task function async function fetchUserProfile(userId: string) { const response = await fetch(`https://api.example.com/users/${userId}`); @@ -62,8 +62,8 @@ To create task functions that can be used across multiple flows without tight co // Flow uses handler to adapt context to task parameters new Flow<{ userId: string }>({ slug: 'userFlow' }) - .step({ slug: 'profile' }, async (input) => - await fetchUserProfile(input.run.userId) + .step({ slug: 'profile' }, async (flowInput) => + await fetchUserProfile(flowInput.userId) ) ``` diff --git a/pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx b/pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx index 9b5a062e0..5829e9cd8 100644 --- a/pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx +++ b/pkgs/website/src/content/docs/build/process-arrays-in-parallel.mdx @@ -30,7 +30,7 @@ const ScrapeMultipleUrls = new Flow({ // Flow input must be array for ) .step( { slug: 'summary', dependsOn: ['scrapedPages'] }, - (input) => summarizeResults(input.scrapedPages) + (deps) => summarizeResults(deps.scrapedPages) ); // Usage (SQL): @@ -53,7 +53,7 @@ const CsvProcessor = new Flow<{ csvUrl: string }>({ }) .array( { slug: 'csvRows' }, - async (input) => await fetchAndParseCSV(input.run.csvUrl) + async (flowInput) => await fetchAndParseCSV(flowInput.csvUrl) ) .map( { slug: 'validatedRows', array: 'csvRows' }, @@ -65,23 +65,23 @@ const CsvProcessor = new Flow<{ csvUrl: string }>({ ) .step( { slug: 'saveResults', dependsOn: ['processedRows'] }, - async (input, context) => await saveProcessedData(input.processedRows, context) + async (deps, ctx) => await saveProcessedData(deps.processedRows, ctx) ); ``` ## Enriching Array Elements with Additional Data -Map handlers only receive individual array elements. If a handler needs access to the original flow input (`run`), outputs from other dependencies, or any additional data, include that data in the array elements via a previous step. +Map handlers only receive individual array elements. If a handler needs access to the original flow input, outputs from other dependencies, or any additional data, include that data in the array elements via a previous step. ### Problem: Map Handlers Can't Access Flow Context ```typescript "apiKey: string" del="id, ???" del="(id)" -// This won't work - map handler can't access input.run +// This won't work - map handler can't access ctx.flowInput const ProblemFlow = new Flow<{ apiKey: string, ids: string[] }>({ slug: 'problemFlow', }) .map({ slug: 'fetch' }, async (id) => { - // Can't access input.run.apiKey here! + // Can't access flowInput.apiKey here! return await fetchWithKey(id, ???); // No access to apiKey }); ``` @@ -94,11 +94,11 @@ const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({ }) .array( { slug: 'prepareItems' }, - (input) => { + (flowInput) => { // Include needed context in each element - return input.run.ids.map(id => ({ + return flowInput.ids.map(id => ({ id, - apiKey: input.run.apiKey + apiKey: flowInput.apiKey })); } ) @@ -111,7 +111,7 @@ const SolutionFlow = new Flow<{ apiKey: string, ids: string[] }>({ ); ``` -This pattern applies whenever a map handler needs any data beyond the array elements themselves. Add a step before the map that enriches the array elements with whatever data the handler needs - whether that's the original flow input (`run`), outputs from other dependencies, or both. +This pattern applies whenever a map handler needs any data beyond the array elements themselves. Add a step before the map that enriches the array elements with whatever data the handler needs - whether that's the original flow input, outputs from other dependencies, or both. :::note[Debugging Map Tasks] When debugging map steps, use [`context.stepTask.task_index`](/reference/context/#steptask) to identify which array element each task is processing. @@ -141,12 +141,12 @@ const EmptyHandling = new Flow<{}>({ ) .step( { slug: 'handleResults', dependsOn: ['enrichedItems'] }, - (input) => { + (deps) => { // This still executes to handle the empty result - if (input.enrichedItems.length === 0) { + if (deps.enrichedItems.length === 0) { return { message: 'No items to process' }; } - return { processed: input.enrichedItems.length }; + return { processed: deps.enrichedItems.length }; } ); ``` diff --git a/pkgs/website/src/content/docs/build/retrying-steps.mdx b/pkgs/website/src/content/docs/build/retrying-steps.mdx index 83d67e9ba..29c60acd5 100644 --- a/pkgs/website/src/content/docs/build/retrying-steps.mdx +++ b/pkgs/website/src/content/docs/build/retrying-steps.mdx @@ -33,7 +33,7 @@ Configure with retries: slug: 'fetchExternalData', maxAttempts: 5, // Retry transient failures baseDelay: 2, -}, async (input) => await fetchFromAPI(input.run.url)) +}, async (flowInput) => await fetchFromAPI(flowInput.url)) ``` ### Permanent Failures @@ -49,9 +49,9 @@ Configure without retries: .step({ slug: 'validInput', maxAttempts: 1, // No retries for validation -}, (input) => { - if (!input.run.email) throw new Error('Email required'); - return input.run; +}, (flowInput) => { + if (!flowInput.email) throw new Error('Email required'); + return flowInput; }) ``` diff --git a/pkgs/website/src/content/docs/build/validation-steps.mdx b/pkgs/website/src/content/docs/build/validation-steps.mdx index 7078f7084..f16cc9161 100644 --- a/pkgs/website/src/content/docs/build/validation-steps.mdx +++ b/pkgs/website/src/content/docs/build/validation-steps.mdx @@ -18,11 +18,11 @@ pgflow retries all exceptions based on `maxAttempts`. Without explicit validatio new Flow<{ email: string }>({ slug: 'sendEmail', maxAttempts: 5 }) .step( { slug: 'send' }, - async (input) => { - if (!input.run.email.includes('@')) { + async (flowInput) => { + if (!flowInput.email.includes('@')) { throw new Error('Invalid email'); // Retries 5 times! } - return await sendEmail(input.run.email); + return await sendEmail(flowInput.email); } ) ``` @@ -34,16 +34,16 @@ With explicit validation, failures stop immediately: new Flow<{ email: string }>({ slug: 'sendEmail' }) .step( { slug: 'validInput', maxAttempts: 1 }, - (input) => { - if (!input.run.email.includes('@')) { + (flowInput) => { + if (!flowInput.email.includes('@')) { throw new Error('Invalid email'); } - return input.run; + return flowInput; } ) .step( { slug: 'send', dependsOn: ['validInput'], maxAttempts: 5 }, - async (input) => await sendEmail(input.validInput.email) + async (deps) => await sendEmail(deps.validInput.email) ) ``` @@ -55,24 +55,24 @@ Validation steps should be fast, synchronous functions that check input format a // Good: Fast, synchronous validation .step( { slug: 'validOrder', maxAttempts: 1 }, - (input) => { - const { amount, items } = input.run; + (flowInput) => { + const { amount, items } = flowInput; if (amount <= 0) throw new Error('amount must be positive'); if (!items?.length) throw new Error('items cannot be empty'); - return input.run; + return flowInput; } ) // Bad: Async checks in validation .step( { slug: 'validCustomer', maxAttempts: 1 }, - async (input) => { + async (flowInput) => { // Database lookups belong in separate steps with retries - const exists = await checkCustomerExists(input.run.customerId); + const exists = await checkCustomerExists(flowInput.customerId); if (!exists) throw new Error('Customer not found'); - return input.run; + return flowInput; } ) ``` diff --git a/pkgs/website/src/content/docs/concepts/how-pgflow-works.mdx b/pkgs/website/src/content/docs/concepts/how-pgflow-works.mdx index 2dd3da799..f39737eb9 100644 --- a/pkgs/website/src/content/docs/concepts/how-pgflow-works.mdx +++ b/pkgs/website/src/content/docs/concepts/how-pgflow-works.mdx @@ -107,40 +107,40 @@ new Flow({ slug: "process_article" }) .step({ slug: "publish", dependsOn: ["summarize", "extractKeywords"] }, fn); ``` -pgflow uses dependencies to figure out execution order and parallelism. Steps with the same dependencies run in parallel. Return values become available as `input.` to dependent steps. +pgflow uses dependencies to figure out execution order and parallelism. Steps with the same dependencies run in parallel. Return values become available as `deps.` to dependent steps.
See full code -```typescript "input.run" "dependsOn: [\"fetchArticle\"]" "input.fetchArticle" "dependsOn: [\"summarize\", \"extractKeywords\"]" "input.summarize" "input.extractKeywords" +```typescript "flowInput.url" "dependsOn: [\"fetchArticle\"]" "deps.fetchArticle" "dependsOn: [\"summarize\", \"extractKeywords\"]" "deps.summarize" "deps.extractKeywords" type Input = { url: string }; new Flow({ slug: "process_article" }) .step( { slug: "fetchArticle" }, - async (input) => { - const response = await fetch(input.run.url); + async (flowInput) => { + const response = await fetch(flowInput.url); return response.text(); } ) .step( { slug: "summarize", dependsOn: ["fetchArticle"] }, - async (input) => { - return await openai.summarize(input.fetchArticle); + async (deps) => { + return await openai.summarize(deps.fetchArticle); } ) .step( { slug: "extractKeywords", dependsOn: ["fetchArticle"] }, - async (input) => { - return await openai.extractKeywords(input.fetchArticle); + async (deps) => { + return await openai.extractKeywords(deps.fetchArticle); } ) .step( { slug: "publish", dependsOn: ["summarize", "extractKeywords"] }, - async (input) => { + async (deps) => { await db.insert({ - summary: input.summarize, - keywords: input.extractKeywords + summary: deps.summarize, + keywords: deps.extractKeywords }); } ); diff --git a/pkgs/website/src/content/docs/concepts/map-steps.mdx b/pkgs/website/src/content/docs/concepts/map-steps.mdx index 8ced85918..6a78fd101 100644 --- a/pkgs/website/src/content/docs/concepts/map-steps.mdx +++ b/pkgs/website/src/content/docs/concepts/map-steps.mdx @@ -116,25 +116,24 @@ As map tasks complete, their outputs are collected: Map step handlers receive only the individual array element, not the full input object that regular step handlers receive: **Regular Step Handler:** -```typescript "input" -.step({ slug: 'regular' }, async (input) => { - // input contains: - // - input.run: The original flow input - // - input.dependencyName: Output from each dependency - return processAllData(input); +```typescript "deps" +.step({ slug: 'regular', dependsOn: ['source'] }, async (deps, ctx) => { + // deps contains outputs from dependencies + // ctx.flowInput provides access to the original flow input + return processAllData(deps.source); }) ``` **Map Step Handler:** ```typescript "item" -.map({ slug: 'mapStep', array: 'source' }, async (item) => { +.map({ slug: 'mapStep', array: 'source' }, async (item, ctx) => { // item is ONLY the individual array element - // No access to input.run or other dependencies + // ctx.flowInput provides access to the original flow input return processItem(item); }) ``` -This constraint keeps map handlers focused on transforming individual elements. If a map handler needs additional data (like flow input or other step outputs), include that data in the array elements via a previous step. Future versions will provide access to flow input via the handler's context parameter. +This constraint keeps map handlers focused on transforming individual elements. If a map handler needs additional data from other step outputs, include that data in the array elements via a previous step. The context parameter provides access to flow input when needed. ## Root Maps vs Dependent Maps @@ -169,7 +168,7 @@ Process another step's array output by specifying the `array` property: new Flow<{ searchQuery: string }>({ slug: 'searchPipeline' }) .array( { slug: 'searchResults' }, - async (input) => await searchAPI(input.run.searchQuery) + async (flowInput) => await searchAPI(flowInput.searchQuery) ) .map( { slug: 'processResults', array: 'searchResults' }, // Processes search output @@ -237,11 +236,11 @@ If your map handler needs data from other steps or must wait for additional depe slug: 'enrichedItems', dependsOn: ['items', 'config'] // Array steps CAN use dependsOn }, - (input) => { + (deps) => { // Enrich each item with the config data it needs - return input.items.map(item => ({ + return deps.items.map(item => ({ item, - apiKey: input.config.apiKey + apiKey: deps.config.apiKey })); } ) diff --git a/pkgs/website/src/content/docs/concepts/understanding-flows.mdx b/pkgs/website/src/content/docs/concepts/understanding-flows.mdx index 77a1bdf82..f9c0f6f0a 100644 --- a/pkgs/website/src/content/docs/concepts/understanding-flows.mdx +++ b/pkgs/website/src/content/docs/concepts/understanding-flows.mdx @@ -32,45 +32,47 @@ Each `.step()` call creates a new Flow instance without modifying the original, ## Understanding Step Inputs and Data Flow -In pgflow, **every step receives a unified `input` object** with two critical parts: +In pgflow, step handlers have **asymmetric signatures** based on whether they have dependencies: -1. **`input.run`** - The original flow input, available to ALL steps -2. **`input.{stepName}`** - Outputs from any dependency steps +1. **Root steps** (no dependencies) receive **flow input directly** as the first parameter +2. **Dependent steps** receive **dependency outputs** as the first parameter -:::note[The input.run Field] -Every step has access to the complete flow input via `input.run`, ensuring: -- Original flow parameters are accessible throughout the flow -- Data doesn't need to be manually forwarded through intermediate steps -- Steps can combine original input with processed data +Both step types receive a **context object** as the second parameter, which provides access to environment variables, the Supabase client, and the original flow input. + +:::note[Accessing Flow Input] +- Root steps: Access directly via the first parameter (e.g., `flowInput.url`) +- Dependent steps: Access via context (e.g., `ctx.flowInput.url`) + +This asymmetric design matches the domain - root and dependent steps ARE different. ::: Consider this example: -```typescript "input.run.url" "input.run.userId" "input.scrape.content" +```typescript "flowInput.url" "ctx.flowInput.userId" "deps.scrape.content" new Flow<{ url: string, userId: string }>({ slug: 'analyzeWebsite', }) .step( { slug: 'scrape' }, - async (input) => { - // Access to input.run.url and input.run.userId - return await scrapeWebsite(input.run.url); + async (flowInput) => { + // flowInput is the flow input directly + return await scrapeWebsite(flowInput.url); } ) .step( { slug: 'analyze', dependsOn: ['scrape'] }, - async (input) => { - // Still has access to input.run.userId - // Now also has access to input.scrape - return await analyzeContent(input.scrape.content); + async (deps, ctx) => { + // deps contains dependency outputs + // ctx.flowInput provides access to original flow input if needed + return await analyzeContent(deps.scrape.content); } ); ``` When this flow runs: 1. The flow receives an input object (e.g., `{ url: "example.com", userId: "123" }`) -2. Each step receives both the original input via `input.run` and the outputs of any dependency steps -3. Steps can combine original parameters with processed data from previous steps +2. Root steps receive the flow input directly as the first parameter +3. Dependent steps receive their dependencies as the first parameter, with flow input available via `ctx.flowInput` ## The Type System @@ -86,15 +88,15 @@ new Flow({ }) .step( { slug: 'scrape' }, - async (input) => { - // input.run is typed as WebsiteInput + async (flowInput) => { + // flowInput is typed as WebsiteInput return { content: "..." }; } ) .step( { slug: 'analyze', dependsOn: ['scrape'] }, - async (input) => { - // input.scrape is typed based on the scrape step's return type + async (deps) => { + // deps.scrape is typed based on the scrape step's return type return { analysis: "..." }; } ); @@ -102,12 +104,12 @@ new Flow({ The type system automatically: - Enforces the correct input type for the flow -- Makes the flow input available as `input.run` in every step +- Types the first parameter based on step type (root vs dependent) - Tracks each step's output type and makes it available to dependent steps - Provides IDE autocompletion and catches type errors at compile time :::tip[Type Safety] -TypeScript provides full autocompletion for both `input.run` properties and dependency outputs, allowing you to confidently access data without runtime errors. +TypeScript provides full autocompletion for flow input (first parameter for root steps) and dependency outputs (first parameter for dependent steps), allowing you to confidently access data without runtime errors. ::: ## Step Methods @@ -121,8 +123,8 @@ The standard method for adding steps to a flow. Each step processes input and re ```typescript .step( { slug: 'process', dependsOn: ['previous'] }, - async (input) => { - // Access input.run and input.previous + async (deps) => { + // Access deps.previous for dependency output return { result: 'processed' }; } ) @@ -142,7 +144,7 @@ A semantic wrapper around `.step()` that enforces array return types. Useful for // With dependencies .array( { slug: 'combine', dependsOn: ['source1', 'source2'] }, - async (input) => [...input.source1, ...input.source2] + async (deps) => [...deps.source1, ...deps.source2] ) ``` @@ -204,7 +206,7 @@ All step inputs and outputs MUST be serializable to JSON since pgflow stores the // GOOD: Fully serializable objects .step( { slug: 'processData' }, - async (input) => { + async () => { return { count: 42, items: ["apple", "banana"], @@ -219,7 +221,7 @@ All step inputs and outputs MUST be serializable to JSON since pgflow stores the // BAD: Non-serializable types will cause runtime errors .step( { slug: 'badExample' }, - async (input) => { + async () => { return { date: new Date(), // Use toISOString() instead regex: /test/, // Not serializable @@ -270,7 +272,7 @@ The TypeScript code is used only for definition and compilation, not for executi Here's a practical example showing how data flows efficiently through steps: -```typescript "input.run.userId" "input.run.reportType" "input.run.includeDetails" "input.run.reportType" {21,30,38} +```typescript "flowInput.userId" "ctx.flowInput.reportType" "ctx.flowInput.includeDetails" {21,30,38} // Flow with multiple input parameters type Input = { userId: string, @@ -284,35 +286,35 @@ new Flow({ // Step 1: Fetch user data .step( { slug: 'user' }, - async (input) => { - return await fetchUser(input.run.userId); + async (flowInput) => { + return await fetchUser(flowInput.userId); } ) // Steps 2 & 3: Process user data in parallel .step( { slug: 'activity', dependsOn: ['user'] }, - async (input) => { - // Uses input.run.reportType to determine timespan - const timespan = input.run.reportType === 'advanced' ? '1y' : '30d'; - return await getUserActivity(input.user.id, timespan); + async (deps, ctx) => { + // Uses ctx.flowInput.reportType to determine timespan + const timespan = ctx.flowInput.reportType === 'advanced' ? '1y' : '30d'; + return await getUserActivity(deps.user.id, timespan); } ) .step( { slug: 'preferences', dependsOn: ['user'] }, - async (input) => { - // Uses input.run.includeDetails parameter - return await getUserPreferences(input.user.id, input.run.includeDetails); + async (deps, ctx) => { + // Uses ctx.flowInput.includeDetails parameter + return await getUserPreferences(deps.user.id, ctx.flowInput.includeDetails); } ) // Step 4: Combine results .step( { slug: 'report', dependsOn: ['activity', 'preferences'] }, - async (input) => { + async (deps, ctx) => { return { - user: input.user, - activity: input.activity, - preferences: input.preferences, - reportType: input.run.reportType, // Original parameter still available + user: deps.user, + activity: deps.activity, + preferences: deps.preferences, + reportType: ctx.flowInput.reportType, // Original parameter still available generatedAt: new Date().toISOString() }; } @@ -320,7 +322,7 @@ new Flow({ ``` This example demonstrates: -1. **Original parameters available throughout** - Every step can access the input parameters +1. **Original parameters available via context** - Dependent steps access flow input via `ctx.flowInput` 2. **Conditional processing** - Steps adapt behavior based on original parameters 3. **No manual parameter forwarding needed** - The `user` step doesn't need to include original parameters 4. **Type safety throughout** - TypeScript ensures all data accesses are valid @@ -329,10 +331,12 @@ This example demonstrates: pgflow provides a powerful, type-safe way to define complex workflows: -- Every step receives the original flow input via `input.run` +- Root steps receive flow input directly as the first parameter +- Dependent steps receive dependency outputs as the first parameter +- The context object (second parameter) provides access to environment, supabase client, and flow input - TypeScript's type system ensures data flows correctly between steps - The functional approach keeps task implementation separate from flow orchestration - Parallel execution of independent steps is managed automatically - Dependencies between steps are handled transparently -The `input.run` design enables original parameters to be available throughout the workflow without manual forwarding, allowing steps to focus on their specific tasks while maintaining type safety. +The asymmetric handler signatures match the domain - root and dependent steps ARE different - while still providing full access to flow input when needed via the context object. diff --git a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx index 8d800e36d..5540ad46b 100644 --- a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx +++ b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx @@ -45,11 +45,11 @@ export const GreetUser = new Flow({ }) .step( { slug: 'fullName' }, - (input) => `${input.run.firstName} ${input.run.lastName}` + (flowInput) => `${flowInput.firstName} ${flowInput.lastName}` ) .step( { slug: 'greeting', dependsOn: ['fullName'] }, - (input) => `Hello, ${input.fullName}!` + (deps) => `Hello, ${deps.fullName}!` ); ``` @@ -64,7 +64,7 @@ type Input = { }; ``` -This defines what data the flow accepts when started. The input is always accessible via `input.run` in any step. +This defines what data the flow accepts when started. Root steps receive this directly, while dependent steps access it via `ctx.flowInput`. ### Flow constructor @@ -83,12 +83,12 @@ export const GreetUser = new Flow({ ```typescript .step( { slug: 'fullName' }, - (input) => `${input.run.firstName} ${input.run.lastName}` + (flowInput) => `${flowInput.firstName} ${flowInput.lastName}` ) ``` - **No dependencies**: This step runs immediately when the flow starts -- **`input.run`**: Contains the original flow input +- **`flowInput`**: Root steps receive the flow input directly as the first parameter - **Return value**: The string becomes available to dependent steps ### Second step: greeting @@ -96,12 +96,12 @@ export const GreetUser = new Flow({ ```typescript .step( { slug: 'greeting', dependsOn: ['fullName'] }, - (input) => `Hello, ${input.fullName}!` + (deps) => `Hello, ${deps.fullName}!` ) ``` - **`dependsOn`**: This step waits for `fullName` to complete -- **`input.fullName`**: Access the result from the `fullName` step by its slug +- **`deps.fullName`**: Access the result from the `fullName` step by its slug - **Final output**: When the last step completes, its result becomes the flow output ## Flow registration @@ -128,7 +128,7 @@ ControlPlane.serve(flows); ```typescript .step( { slug: 'shout', dependsOn: ['greeting'] }, - (input) => input.greeting.toUpperCase() + (deps) => deps.greeting.toUpperCase() ) ``` @@ -141,7 +141,8 @@ Recompiling **deletes the existing flow and all its run data**. See [Startup Com :::note[Key Concepts] -- **input.run**: Contains the original flow input, accessible in every step +- **Root steps**: Receive flow input directly as first parameter +- **Dependent steps**: Receive dependency outputs as first parameter, access flow input via `ctx.flowInput` - **step slugs**: Unique identifiers for each step - **dependsOn**: Specifies which steps must complete before a step can run - **JSON serialization**: All inputs and outputs must be JSON-serializable diff --git a/pkgs/website/src/content/docs/index.mdx b/pkgs/website/src/content/docs/index.mdx index 766a57a44..a87bb79c9 100644 --- a/pkgs/website/src/content/docs/index.mdx +++ b/pkgs/website/src/content/docs/index.mdx @@ -325,20 +325,20 @@ import { Flow } from 'npm:@pgflow/dsl'; new Flow<{ url: string }>({ slug: 'analyzeArticle' }) .step({ slug: 'fetchArticle' }, - (input) => scrapeWebsite(input.run.url) + (flowInput) => scrapeWebsite(flowInput.url) ) .step({ slug: 'summarize', dependsOn: ['fetchArticle'] }, - (input) => summarizeContent(input.fetchArticle) + (deps) => summarizeContent(deps.fetchArticle) ) .step({ slug: 'extractKeywords', dependsOn: ['fetchArticle'] }, - (input) => extractKeywords(input.fetchArticle) + (deps) => extractKeywords(deps.fetchArticle) ) .step({ slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, - (input) => publishArticle({ - url: input.run.url, - content: input.fetchArticle, - summary: input.summarize, - keywords: input.extractKeywords + (deps, ctx) => publishArticle({ + url: ctx.flowInput.url, + content: deps.fetchArticle, + summary: deps.summarize, + keywords: deps.extractKeywords }) ); ``` diff --git a/pkgs/website/src/content/docs/reference/compile-api.mdx b/pkgs/website/src/content/docs/reference/compile-api.mdx index 25e3f70b3..d572b4cc7 100644 --- a/pkgs/website/src/content/docs/reference/compile-api.mdx +++ b/pkgs/website/src/content/docs/reference/compile-api.mdx @@ -29,10 +29,10 @@ const MyFlow = new Flow({ maxAttempts: 3, timeout: 60, }) - .step({ slug: 'fetch' }, (input) => input.run.url) - .step({ slug: 'process', dependsOn: ['fetch'] }, (input) => ({ - url: input.run.url, - content: input.fetch, + .step({ slug: 'fetch' }, (flowInput) => flowInput.url) + .step({ slug: 'process', dependsOn: ['fetch'] }, (deps, ctx) => ({ + url: ctx.flowInput.url, + content: deps.fetch, })); const statements = compileFlow(MyFlow); diff --git a/pkgs/website/src/content/docs/reference/manual-installation.mdx b/pkgs/website/src/content/docs/reference/manual-installation.mdx index 3af30ecb0..d3ac7725b 100644 --- a/pkgs/website/src/content/docs/reference/manual-installation.mdx +++ b/pkgs/website/src/content/docs/reference/manual-installation.mdx @@ -120,11 +120,11 @@ export const GreetUser = new Flow({ }) .step( { slug: 'fullName' }, - (input) => `${input.run.firstName} ${input.run.lastName}` + (flowInput) => `${flowInput.firstName} ${flowInput.lastName}` ) .step( { slug: 'greeting', dependsOn: ['fullName'] }, - (input) => `Hello, ${input.fullName}!` + (deps) => `Hello, ${deps.fullName}!` ); ``` diff --git a/pkgs/website/src/content/docs/tutorials/rag/automatic-embeddings.mdx b/pkgs/website/src/content/docs/tutorials/rag/automatic-embeddings.mdx index b51a6fc98..f92b33418 100644 --- a/pkgs/website/src/content/docs/tutorials/rag/automatic-embeddings.mdx +++ b/pkgs/website/src/content/docs/tutorials/rag/automatic-embeddings.mdx @@ -219,16 +219,16 @@ type Input = { }; export const GenerateEmbeddings = new Flow({ slug: 'generateEmbeddings' }) - .array({ slug: 'chunks' }, (input) => splitChunks(input.run.content)) + .array({ slug: 'chunks' }, (flowInput) => splitChunks(flowInput.content)) .map({ slug: 'embeddings', array: 'chunks' }, (chunk) => generateEmbedding(chunk) ) - .step({ slug: 'save', dependsOn: ['chunks', 'embeddings'] }, (input, context) => + .step({ slug: 'save', dependsOn: ['chunks', 'embeddings'] }, (deps, ctx) => saveChunks({ - documentId: input.run.documentId, - chunks: input.chunks, - embeddings: input.embeddings, - }, context.supabase) + documentId: ctx.flowInput.documentId, + chunks: deps.chunks, + embeddings: deps.embeddings, + }, ctx.supabase) ); ```