From 440342cb6e60d531e831f7402ef73853bb3375ad Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Tue, 23 Dec 2025 12:09:58 +0100 Subject: [PATCH] remove run key wrapper from step task input construction --- NOMENCLATURE_GUIDE.md | 23 +- README.md | 14 +- context7.json | 10 +- .../__tests__/e2e/full-stack-dsl.test.ts | 2 +- .../__tests__/e2e/happy-path-e2e.test.ts | 4 +- .../schemas/0120_function_start_tasks.sql | 18 +- ...23110504_pgflow_add_flow_input_column.sql} | 18 +- pkgs/core/supabase/migrations/atlas.sum | 4 +- .../basic_aggregation.test.sql | 2 +- .../mixed_dependencies.test.sql | 2 +- .../multiple_maps_to_single.test.sql | 2 +- pkgs/dsl/README.md | 83 ++--- .../integration/array-integration.test.ts | 291 +++++++++--------- .../__tests__/integration/map-flow.test.ts | 36 +-- .../__tests__/runtime/array-method.test.ts | 54 ++-- pkgs/dsl/__tests__/runtime/flow-shape.test.ts | 28 +- .../dsl/__tests__/runtime/map-compile.test.ts | 10 +- pkgs/dsl/__tests__/runtime/steps.test.ts | 13 +- .../analyze_website_simplified.ts.raw | 18 +- .../code-examples/landing-page-flow.ts.raw | 8 +- .../docs/comparisons/vercel-workflows.mdx | 8 +- .../docs/get-started/flows/create-flow.mdx | 27 +- .../docs/tutorials/ai-web-scraper/backend.mdx | 14 +- .../docs/tutorials/use-cases/chatbot.mdx | 22 +- .../docs/tutorials/use-cases/rag-pipeline.mdx | 20 +- .../tutorials/use-cases/structured-output.mdx | 12 +- 26 files changed, 372 insertions(+), 371 deletions(-) rename pkgs/core/supabase/migrations/{20251223052347_pgflow_add_flow_input_column.sql => 20251223110504_pgflow_add_flow_input_column.sql} (91%) diff --git a/NOMENCLATURE_GUIDE.md b/NOMENCLATURE_GUIDE.md index f19db5141..88c3ce90b 100644 --- a/NOMENCLATURE_GUIDE.md +++ b/NOMENCLATURE_GUIDE.md @@ -200,19 +200,32 @@ Slugs are unique text identifiers with specific rules: ## Data Flow Terms +### Handler Signatures + +Step handlers use **asymmetric signatures** based on whether they have dependencies: + +**Root steps (no dependencies):** +- First parameter: `flowInput` - the original flow input directly +- Second parameter: `ctx` - context object (env, supabase, flowInput, etc.) + +**Dependent steps (with dependsOn):** +- First parameter: `deps` - object with outputs from dependency steps +- Second parameter: `ctx` - context object (includes `ctx.flowInput` if needed) + ### Input Structure -- `input.run` - Original flow input (available to all steps except map tasks) -- `input.{stepName}` - Output from dependency step +- `flowInput` - Original flow input (root step first parameter) +- `deps.{stepName}` - Output from dependency step (dependent step first parameter) +- `ctx.flowInput` - Original flow input via context (available in all steps) ### Map Step Input - Map step handlers receive **two parameters**: 1. **item** - The assigned array element - 2. **context** - BaseContext object (env, shutdownSignal, rawMessage, workerConfig) + 2. 
**context** - BaseContext object (env, shutdownSignal, rawMessage, workerConfig, flowInput) - Map handlers do NOT receive FlowContext (no access to `context.stepTask`) -- No access to `input.run` or other step dependencies via the item parameter -- Context provides environment and worker metadata, but not flow-level input data +- Access to original flow input via `context.flowInput` +- Context provides environment, worker metadata, and flow-level input data ### Output Handling diff --git a/README.md b/README.md index 51a6984be..075b7e541 100644 --- a/README.md +++ b/README.md @@ -43,17 +43,17 @@ Then define your workflow ([full guide](https://pgflow.dev/get-started/installat import { Flow } from '@pgflow/dsl'; new Flow<{ url: string }>({ slug: 'analyzeArticle' }) - .step({ slug: 'scrape' }, (input) => scrapeWebsite(input.run.url)) - .step({ slug: 'summarize', dependsOn: ['scrape'] }, (input) => - summarize(input.scrape) + .step({ slug: 'scrape' }, (flowInput) => scrapeWebsite(flowInput.url)) + .step({ slug: 'summarize', dependsOn: ['scrape'] }, (deps) => + summarize(deps.scrape) ) - .step({ slug: 'extractKeywords', dependsOn: ['scrape'] }, (input) => - extractKeywords(input.scrape) + .step({ slug: 'extractKeywords', dependsOn: ['scrape'] }, (deps) => + extractKeywords(deps.scrape) ) .step( { slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] }, - (input) => - publish({ summary: input.summarize, keywords: input.extractKeywords }) + (deps) => + publish({ summary: deps.summarize, keywords: deps.extractKeywords }) ); ``` diff --git a/context7.json b/context7.json index 023186653..229aa9ce5 100644 --- a/context7.json +++ b/context7.json @@ -15,18 +15,18 @@ "excludeFiles": [], "rules": [ "Flow definitions are immutable and declarative - Each `.step()`, `.array()` or `.map()` call returns a new Flow instance; describe what steps exist and how they connect, pgflow handles execution.", - "Step slugs identify steps and type their outputs - Each step has a unique `slug` used in `dependsOn` arrays and to access outputs via `input.stepName`, which is automatically typed as the step's handler return type.", - "Flow input type determines `input.run` in all steps - Define `Flow` to type the original flow input accessible in ALL steps via `input.run` (e.g., `new Flow<{ url: string }>()` makes `input.run.url` available everywhere with full type safety).", + "Step slugs identify steps and type their outputs - Each step has a unique `slug` used in `dependsOn` arrays and to access outputs via `deps.stepName` in dependent steps, which is automatically typed as the step's handler return type.", + "Asymmetric handler signatures - Root steps receive `(flowInput, ctx)` where flowInput is the flow input directly; dependent steps receive `(deps, ctx)` where deps contains outputs from dependencies and `ctx.flowInput` provides the original flow input if needed.", "Root steps omit `dependsOn`, dependent steps require it - Steps with no dependencies omit `dependsOn` entirely; steps that depend on others specify them in the `dependsOn` array (e.g., `{ slug: 'summary', dependsOn: ['website'] }`).", "All inputs and outputs must be JSON-serializable - Step handlers must return plain objects, primitives, and arrays only; convert dates to ISO strings and avoid functions, class instances, or circular references.", "Use `.array()` for array-returning steps - The `.array()` method is a semantic wrapper around `.step()` that enforces array return types, useful for data collection that will be processed by map steps.", 
"Use `.map()` for parallel array processing - The `.map()` method creates a step that spawns N parallel tasks (one per array element), each with its own retry counter, returning an aggregated array of results.", - "Handler functions for `.map()` steps receive individual array elements - Unlike `.step()` and `.array()` handlers that receive the full `input` object, map handlers receive only the array element (e.g., `(item) => processItem(item)`).", - "Use `.array()` to prepare data for `.map()` handlers - When map handlers need access to `input.run` or outputs from other steps, create an intermediate `.array()` step that enriches each element with the required data.", + "Handler functions for `.map()` steps receive individual array elements - Unlike `.step()` and `.array()` handlers that receive flowInput or deps, map handlers receive only the array element (e.g., `(item, ctx) => processItem(item)`) with flowInput available via `ctx.flowInput`.", + "Use `.array()` to prepare data for `.map()` handlers - When map handlers need access to flowInput or outputs from other steps, create an intermediate `.array()` step that enriches each element with the required data.", "Map steps can only have a single dependency - Specify the array source via the `array` property (e.g., `{ slug: 'process', array: 'items' }`) and cannot use `dependsOn`.", "Root map steps omit `array:`, dependent maps require it - Root maps process flow input directly without `array:` property (e.g., `new Flow().map({ slug: 'process' }, ...)`); dependent maps must specify `array:` to indicate which step's output to process.", "Steps with the same dependencies run in parallel - pgflow automatically maximizes parallelism by running independent steps concurrently once their dependencies are satisfied.", - "Handler functions receive `(input, context)` parameters - The second parameter provides platform resources like `context.sql`, `context.supabase`, `context.env`, `context.stepTask`, and `context.rawMessage` without requiring global imports or connection setup.", + "Handler functions receive context as second parameter - Root steps: `(flowInput, ctx)`, dependent steps: `(deps, ctx)`. 
Context provides platform resources like `ctx.sql`, `ctx.supabase`, `ctx.env`, `ctx.flowInput`, `ctx.stepTask`, and `ctx.rawMessage` without requiring global imports or connection setup.", "Flows must be compiled before execution - Run `npx pgflow@latest compile` to convert TypeScript flow definitions into SQL migrations that register the flow structure in the database.", "Registered flows are immutable - Once a flow is registered in the database, its structure cannot be modified; use versioning or deletion (development only) to make changes.", "Use camelCase for step slugs - Step slugs follow JavaScript naming conventions (e.g., `fetchData`, not `fetch_data`) since they become property names in TypeScript for accessing outputs.", diff --git a/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts b/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts index 9ba7e3283..41f6e0489 100644 --- a/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts +++ b/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts @@ -132,7 +132,7 @@ describe('Full Stack DSL Integration', () => { tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5); expect(tasks).toHaveLength(1); expect(tasks[0].step_slug).toBe('save'); - expect(tasks[0].input.run).toEqual(input); + expect(tasks[0].flow_input).toEqual(input); // The save step only depends on process, so it should only have process output // This is correct behavior - transitive dependencies are not automatically included diff --git a/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts b/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts index a9db6ff40..c052a1378 100644 --- a/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts +++ b/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts @@ -126,8 +126,8 @@ describe('Happy Path E2E Integration', () => { expect(tasks[0].step_slug).toBe('save'); log('Save task input:', JSON.stringify(tasks[0].input, null, 2)); - - expect(tasks[0].input.run).toEqual(input); + + expect(tasks[0].flow_input).toEqual(input); expect(tasks[0].input.process).toEqual(processOutput); // Note: save step only depends on process, so fetch output is not included (correct behavior) expect(tasks[0].input.fetch).toBeUndefined(); diff --git a/pkgs/core/schemas/0120_function_start_tasks.sql b/pkgs/core/schemas/0120_function_start_tasks.sql index ccdfdfea6..1e790b4cf 100644 --- a/pkgs/core/schemas/0120_function_start_tasks.sql +++ b/pkgs/core/schemas/0120_function_start_tasks.sql @@ -154,21 +154,19 @@ as $$ END -- -------------------- NON-MAP STEPS -------------------- - -- Regular (non-map) steps receive ALL inputs as a structured object. - -- This includes the original run input plus all dependency outputs. + -- Regular (non-map) steps receive dependency outputs as a structured object. + -- Root steps (no dependencies) get empty object - they access flowInput via context. + -- Dependent steps get only their dependency outputs. 
ELSE - -- Non-map steps get structured input with named keys - -- Example output: { - -- "run": {"original": "input"}, + -- Non-map steps get structured input with dependency keys only + -- Example for dependent step: { -- "step1": {"output": "from_step1"}, -- "step2": {"output": "from_step2"} -- } + -- Example for root step: {} -- - -- Build object with 'run' key containing original input - jsonb_build_object('run', r.input) || - -- Merge with deps_output which already has dependency outputs - -- deps_output format: {"dep1": output1, "dep2": output2, ...} - -- If no dependencies, defaults to empty object + -- Note: flow_input is available separately in the returned record + -- for workers to access via context.flowInput coalesce(dep_out.deps_output, '{}'::jsonb) END as input, st.message_id as msg_id, diff --git a/pkgs/core/supabase/migrations/20251223052347_pgflow_add_flow_input_column.sql b/pkgs/core/supabase/migrations/20251223110504_pgflow_add_flow_input_column.sql similarity index 91% rename from pkgs/core/supabase/migrations/20251223052347_pgflow_add_flow_input_column.sql rename to pkgs/core/supabase/migrations/20251223110504_pgflow_add_flow_input_column.sql index bbcf4915d..a3335d4cf 100644 --- a/pkgs/core/supabase/migrations/20251223052347_pgflow_add_flow_input_column.sql +++ b/pkgs/core/supabase/migrations/20251223110504_pgflow_add_flow_input_column.sql @@ -148,21 +148,19 @@ with tasks as ( END -- -------------------- NON-MAP STEPS -------------------- - -- Regular (non-map) steps receive ALL inputs as a structured object. - -- This includes the original run input plus all dependency outputs. + -- Regular (non-map) steps receive dependency outputs as a structured object. + -- Root steps (no dependencies) get empty object - they access flowInput via context. + -- Dependent steps get only their dependency outputs. 
ELSE - -- Non-map steps get structured input with named keys - -- Example output: { - -- "run": {"original": "input"}, + -- Non-map steps get structured input with dependency keys only + -- Example for dependent step: { -- "step1": {"output": "from_step1"}, -- "step2": {"output": "from_step2"} -- } + -- Example for root step: {} -- - -- Build object with 'run' key containing original input - jsonb_build_object('run', r.input) || - -- Merge with deps_output which already has dependency outputs - -- deps_output format: {"dep1": output1, "dep2": output2, ...} - -- If no dependencies, defaults to empty object + -- Note: flow_input is available separately in the returned record + -- for workers to access via context.flowInput coalesce(dep_out.deps_output, '{}'::jsonb) END as input, st.message_id as msg_id, diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index 1757e7d0b..aeb82b508 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:nPCLpuNY7xbRl3lNa/ns7GG1Uf3twA0VSK7DIVBhzsc= +h1:32eppXsursErjjXT7SfNzyRPfw4iiiL1Pnafyy4DSvk= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -14,4 +14,4 @@ h1:nPCLpuNY7xbRl3lNa/ns7GG1Uf3twA0VSK7DIVBhzsc= 20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM= 20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw= 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ= -20251223052347_pgflow_add_flow_input_column.sql h1:RwQBPOMml4cCR6qeqd5kVQKhxXAbFqkHEJa0ruXYiTo= +20251223110504_pgflow_add_flow_input_column.sql h1:HJ9GJc4xh68JT82JINf946RS/jQNt9jDhvIH4VA40c8= diff --git a/pkgs/core/supabase/tests/map_output_aggregation/basic_aggregation.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/basic_aggregation.test.sql index 7688f26ba..0ac5b1472 100644 --- a/pkgs/core/supabase/tests/map_output_aggregation/basic_aggregation.test.sql +++ b/pkgs/core/supabase/tests/map_output_aggregation/basic_aggregation.test.sql @@ -42,10 +42,10 @@ begin end $$; -- Check that single_step receives aggregated array as input +-- Dependent steps only get dependency outputs (no 'run' key) select is( (select input from pgflow_tests.read_and_start('test_agg', 1, 1)), jsonb_build_object( - 'run', '[10, 20, 30]'::jsonb, 'map_step', jsonb_build_array( jsonb_build_object('result', 100), jsonb_build_object('result', 200), diff --git a/pkgs/core/supabase/tests/map_output_aggregation/mixed_dependencies.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/mixed_dependencies.test.sql index cebc13fcd..d7fef49d9 100644 --- a/pkgs/core/supabase/tests/map_output_aggregation/mixed_dependencies.test.sql +++ b/pkgs/core/supabase/tests/map_output_aggregation/mixed_dependencies.test.sql @@ -89,10 +89,10 @@ begin end $$; -- Verify consumer receives both outputs correctly +-- Dependent steps only get dependency outputs (no 'run' key) select is( (select input from pgflow_tests.read_and_start('test_mixed', 1, 1)), jsonb_build_object( - 'run', jsonb_build_object('config', 'test'), 'single_src', jsonb_build_object('processed', true, 'value', 100), 'map_src', jsonb_build_array('"A"', '"B"', '"C"') -- Strings are 
JSON encoded ), diff --git a/pkgs/core/supabase/tests/map_output_aggregation/multiple_maps_to_single.test.sql b/pkgs/core/supabase/tests/map_output_aggregation/multiple_maps_to_single.test.sql index 1b57ef019..4544293a8 100644 --- a/pkgs/core/supabase/tests/map_output_aggregation/multiple_maps_to_single.test.sql +++ b/pkgs/core/supabase/tests/map_output_aggregation/multiple_maps_to_single.test.sql @@ -99,10 +99,10 @@ begin end $$; -- Verify collector receives both aggregated arrays +-- Dependent steps only get dependency outputs (no 'run' key) select is( (select input from pgflow_tests.read_and_start('test_multi_maps', 1, 1)), jsonb_build_object( - 'run', '{}'::jsonb, 'map_a', jsonb_build_array(20, 40, 60), 'map_b', jsonb_build_array('ALPHA', 'BETA') ), diff --git a/pkgs/dsl/README.md b/pkgs/dsl/README.md index 6b9dfd648..644d1ba32 100644 --- a/pkgs/dsl/README.md +++ b/pkgs/dsl/README.md @@ -38,23 +38,23 @@ export const AnalyzeWebsite = new Flow({ }) .step( { slug: 'website' }, - async (input) => await scrapeWebsite(input.run.url) + async (flowInput) => await scrapeWebsite(flowInput.url) ) .step( { slug: 'sentiment', dependsOn: ['website'] }, - async (input) => await analyzeSentiment(input.website.content) + async (deps) => await analyzeSentiment(deps.website.content) ) .step( { slug: 'summary', dependsOn: ['website'] }, - async (input) => await summarizeWithAI(input.website.content) + async (deps) => await summarizeWithAI(deps.website.content) ) .step( { slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] }, - async (input) => { + async (deps, ctx) => { return await saveToDb({ - websiteUrl: input.run.url, - sentiment: input.sentiment.score, - summary: input.summary.aiSummary, + websiteUrl: ctx.flowInput.url, + sentiment: deps.sentiment.score, + summary: deps.summary.aiSummary, }); } ); @@ -62,16 +62,22 @@ export const AnalyzeWebsite = new Flow({ ### Understanding Data Flow -In pgflow, each step receives an `input` object that contains: +In pgflow, step handlers use **asymmetric signatures** based on whether they have dependencies: -1. **`input.run`** - The original flow input (available to all steps) -2. **`input.{stepName}`** - Outputs from dependency steps +**Root steps (no dependencies):** +- First parameter: `flowInput` - the original flow input directly +- Second parameter: `ctx` - context object (env, supabase, flowInput, etc.) + +**Dependent steps (with dependsOn):** +- First parameter: `deps` - object with outputs from dependency steps (`deps.{stepName}`) +- Second parameter: `ctx` - context object (includes `ctx.flowInput` if needed) This design ensures: -- Original flow parameters are accessible throughout the entire flow -- Data doesn't need to be manually forwarded through intermediate steps -- Steps can combine original input with processed data from previous steps +- Root steps receive flow input directly for clean, simple handlers +- Dependent steps focus on their dependencies without wrapping +- Original flow input is always accessible via `ctx.flowInput` when needed +- Steps can combine dependency outputs with original input via context ### Step Methods @@ -84,8 +90,9 @@ The standard method for adding steps to a flow. 
Each step processes input and re ```typescript .step( { slug: 'process', dependsOn: ['previous'] }, - async (input) => { - // Access input.run and input.previous + async (deps, ctx) => { + // Access deps.previous for dependency output + // Access ctx.flowInput if original flow input is needed return { result: 'processed' }; } ) @@ -105,7 +112,7 @@ A semantic wrapper around `.step()` that provides type enforcement for steps tha // With dependencies - combining data from multiple sources .array( { slug: 'combineResults', dependsOn: ['source1', 'source2'] }, - async (input) => [...input.source1, ...input.source2] + async (deps) => [...deps.source1, ...deps.source2] ) ``` @@ -200,9 +207,9 @@ new Flow<{}>({ slug: 'etlPipeline' }) .map({ slug: 'extract', array: 'scrape' }, (html) => { return extractData(html); }) - .step({ slug: 'aggregate', dependsOn: ['extract'] }, (input) => { - // input.extract is the aggregated array from all map tasks - return consolidateResults(input.extract); + .step({ slug: 'aggregate', dependsOn: ['extract'] }, (deps) => { + // deps.extract is the aggregated array from all map tasks + return consolidateResults(deps.extract); }); ``` @@ -218,9 +225,9 @@ Step handlers can optionally receive a second parameter - the **context object** ```typescript .step( { slug: 'saveToDb' }, - async (input, context) => { + async (flowInput, ctx) => { // Access platform resources through context - const result = await context.sql`SELECT * FROM users WHERE id = ${input.userId}`; + const result = await ctx.sql`SELECT * FROM users WHERE id = ${flowInput.userId}`; return result[0]; } ) @@ -230,40 +237,40 @@ Step handlers can optionally receive a second parameter - the **context object** All platforms provide these core resources: -- **`context.env`** - Environment variables (`Record`) -- **`context.shutdownSignal`** - AbortSignal for graceful shutdown handling -- **`context.rawMessage`** - Original pgmq message with metadata +- **`ctx.env`** - Environment variables (`Record`) +- **`ctx.flowInput`** - Original flow input (typed as the flow's input type) +- **`ctx.shutdownSignal`** - AbortSignal for graceful shutdown handling +- **`ctx.rawMessage`** - Original pgmq message with metadata ```typescript interface PgmqMessageRecord { msg_id: number; read_ct: number; enqueued_at: Date; vt: Date; - message: T; // <-- this is your 'input' + message: T; } ``` -- **`context.stepTask`** - Current step task details (flow handlers only) +- **`ctx.stepTask`** - Current step task details (flow handlers only) ```typescript interface StepTaskRecord { flow_slug: string; run_id: string; step_slug: string; - input: StepInput; // <-- this is handler 'input' msg_id: number; } ``` -- **`context.workerConfig`** - Resolved worker configuration with all defaults applied +- **`ctx.workerConfig`** - Resolved worker configuration with all defaults applied ```typescript // Provides access to worker settings like retry limits - const isLastAttempt = context.rawMessage.read_ct >= context.workerConfig.retry.limit; + const isLastAttempt = ctx.rawMessage.read_ct >= ctx.workerConfig.retry.limit; ``` #### Supabase Platform Resources When using the Supabase platform with EdgeWorker, additional resources are available: -- **`context.sql`** - PostgreSQL client (postgres.js) -- **`context.supabase`** - Supabase client with service role key for full database access +- **`ctx.sql`** - PostgreSQL client (postgres.js) +- **`ctx.supabase`** - Supabase client with service role key for full database access To use Supabase resources, import 
the `Flow` class from the Supabase preset: @@ -272,17 +279,17 @@ import { Flow } from '@pgflow/dsl/supabase'; const MyFlow = new Flow<{ userId: string }>({ slug: 'myFlow', -}).step({ slug: 'process' }, async (input, context) => { - // TypeScript knows context includes Supabase resources - const { data } = await context.supabase +}).step({ slug: 'process' }, async (flowInput, ctx) => { + // TypeScript knows ctx includes Supabase resources + const { data } = await ctx.supabase .from('users') .select('*') - .eq('id', input.userId); + .eq('id', flowInput.userId); // Use SQL directly - const stats = await context.sql` - SELECT COUNT(*) as total FROM events - WHERE user_id = ${input.userId} + const stats = await ctx.sql` + SELECT COUNT(*) as total FROM events + WHERE user_id = ${flowInput.userId} `; return { user: data[0], eventCount: stats[0].total }; diff --git a/pkgs/dsl/__tests__/integration/array-integration.test.ts b/pkgs/dsl/__tests__/integration/array-integration.test.ts index d31bf158d..38f2724eb 100644 --- a/pkgs/dsl/__tests__/integration/array-integration.test.ts +++ b/pkgs/dsl/__tests__/integration/array-integration.test.ts @@ -11,8 +11,8 @@ describe('Array Integration Tests', () => { const dataFlow = new Flow({ slug: 'user_processing_pipeline' }) // Fetch user data as array - .array({ slug: 'users' }, ({ run }) => - run.userIds.map(id => ({ + .array({ slug: 'users' }, (flowInput) => + flowInput.userIds.map(id => ({ id, name: `User ${id}`, active: id % 2 === 1, // Odd IDs are active @@ -20,27 +20,27 @@ describe('Array Integration Tests', () => { })) ) // Filter users based on criteria - .array({ slug: 'filtered_users', dependsOn: ['users'] }, ({ users, run }) => - run.includeInactive - ? users - : users.filter(user => user.active) + .array({ slug: 'filtered_users', dependsOn: ['users'] }, (deps, ctx) => + ctx.flowInput.includeInactive + ? deps.users + : deps.users.filter(user => user.active) ) // Calculate statistics from filtered users - .step({ slug: 'stats', dependsOn: ['filtered_users'] }, ({ filtered_users }) => ({ - count: filtered_users.length, - averageScore: filtered_users.length > 0 - ? filtered_users.reduce((sum, user) => sum + user.score, 0) / filtered_users.length + .step({ slug: 'stats', dependsOn: ['filtered_users'] }, (deps) => ({ + count: deps.filtered_users.length, + averageScore: deps.filtered_users.length > 0 + ? deps.filtered_users.reduce((sum, user) => sum + user.score, 0) / deps.filtered_users.length : 0, - activeCount: filtered_users.filter(user => user.active).length + activeCount: deps.filtered_users.filter(user => user.active).length })) // Generate reports array based on stats - .array({ slug: 'reports', dependsOn: ['stats', 'filtered_users'] }, ({ stats, filtered_users }) => - filtered_users.map(user => ({ + .array({ slug: 'reports', dependsOn: ['stats', 'filtered_users'] }, (deps) => + deps.filtered_users.map(user => ({ userId: user.id, userName: user.name, score: user.score, - percentile: stats.averageScore > 0 ? (user.score / stats.averageScore) * 100 : 0, - isAboveAverage: user.score > stats.averageScore + percentile: deps.stats.averageScore > 0 ? 
(user.score / deps.stats.averageScore) * 100 : 0, + isAboveAverage: user.score > deps.stats.averageScore })) ); @@ -49,23 +49,20 @@ describe('Array Integration Tests', () => { // Execute each step const usersHandler = dataFlow.getStepDefinition('users').handler; - const usersResult = await usersHandler({ run: input }); + const usersResult = await usersHandler(input); const filteredHandler = dataFlow.getStepDefinition('filtered_users').handler; - const filteredResult = await filteredHandler({ - run: input, - users: usersResult - }); + const filteredResult = await filteredHandler({ + users: usersResult + }, { flowInput: input }); const statsHandler = dataFlow.getStepDefinition('stats').handler; - const statsResult = await statsHandler({ - run: input, - filtered_users: filteredResult + const statsResult = await statsHandler({ + filtered_users: filteredResult }); const reportsHandler = dataFlow.getStepDefinition('reports').handler; - const reportsResult = await reportsHandler({ - run: input, + const reportsResult = await reportsHandler({ stats: statsResult, filtered_users: filteredResult }); @@ -87,17 +84,17 @@ describe('Array Integration Tests', () => { const pyramidFlow = new Flow({ slug: 'pyramid_builder' }) // Generate base level items - .array({ slug: 'level_0' }, ({ run }) => - Array(run.itemsPerLevel).fill(0).map((_, i) => ({ + .array({ slug: 'level_0' }, (flowInput) => + Array(flowInput.itemsPerLevel).fill(0).map((_, i) => ({ id: i, level: 0, value: i + 1 })) ) // Build level 1 from level 0 - .array({ slug: 'level_1', dependsOn: ['level_0'] }, ({ level_0, run }) => { - if (run.levels <= 1) return []; - return level_0.map(item => ({ + .array({ slug: 'level_1', dependsOn: ['level_0'] }, (deps, ctx) => { + if (ctx.flowInput.levels <= 1) return []; + return deps.level_0.map(item => ({ id: item.id + 100, level: 1, value: item.value * 2, @@ -105,9 +102,9 @@ describe('Array Integration Tests', () => { })); }) // Build level 2 from level 1 - .array({ slug: 'level_2', dependsOn: ['level_1'] }, ({ level_1, run }) => { - if (run.levels <= 2) return []; - return level_1.map(item => ({ + .array({ slug: 'level_2', dependsOn: ['level_1'] }, (deps, ctx) => { + if (ctx.flowInput.levels <= 2) return []; + return deps.level_1.map(item => ({ id: item.id + 100, level: 2, value: item.value * 2, @@ -115,18 +112,18 @@ describe('Array Integration Tests', () => { })); }) // Aggregate all levels - .step({ slug: 'summary', dependsOn: ['level_0', 'level_1', 'level_2'] }, - ({ level_0, level_1, level_2 }) => ({ - totalItems: level_0.length + level_1.length + level_2.length, + .step({ slug: 'summary', dependsOn: ['level_0', 'level_1', 'level_2'] }, + (deps) => ({ + totalItems: deps.level_0.length + deps.level_1.length + deps.level_2.length, totalValue: [ - ...level_0.map(item => item.value), - ...level_1.map(item => item.value), - ...level_2.map(item => item.value) + ...deps.level_0.map(item => item.value), + ...deps.level_1.map(item => item.value), + ...deps.level_2.map(item => item.value) ].reduce((sum, val) => sum + val, 0), levelCounts: { - level0: level_0.length, - level1: level_1.length, - level2: level_2.length + level0: deps.level_0.length, + level1: deps.level_1.length, + level2: deps.level_2.length } }) ); @@ -135,17 +132,16 @@ describe('Array Integration Tests', () => { const input = { levels: 3, itemsPerLevel: 2 }; const level0Handler = pyramidFlow.getStepDefinition('level_0').handler; - const level0Result = await level0Handler({ run: input }); + const level0Result = await level0Handler(input); const 
level1Handler = pyramidFlow.getStepDefinition('level_1').handler; - const level1Result = await level1Handler({ run: input, level_0: level0Result }); + const level1Result = await level1Handler({ level_0: level0Result }, { flowInput: input }); const level2Handler = pyramidFlow.getStepDefinition('level_2').handler; - const level2Result = await level2Handler({ run: input, level_1: level1Result }); + const level2Result = await level2Handler({ level_1: level1Result }, { flowInput: input }); const summaryHandler = pyramidFlow.getStepDefinition('summary').handler; const summaryResult = await summaryHandler({ - run: input, level_0: level0Result, level_1: level1Result, level_2: level2Result @@ -176,35 +172,35 @@ describe('Array Integration Tests', () => { const processingFlow = new Flow({ slug: 'mixed_processing' }) // Configuration step (regular) - .step({ slug: 'config' }, ({ run }) => ({ - source: run.dataSource, - maxItems: run.batchSize * 10, - processingMode: run.batchSize > 100 ? 'parallel' : 'sequential' + .step({ slug: 'config' }, (flowInput) => ({ + source: flowInput.dataSource, + maxItems: flowInput.batchSize * 10, + processingMode: flowInput.batchSize > 100 ? 'parallel' : 'sequential' })) // Generate data array based on config - .array({ slug: 'raw_data', dependsOn: ['config'] }, ({ config }) => - Array(Math.min(config.maxItems, 50)).fill(0).map((_, i) => ({ + .array({ slug: 'raw_data', dependsOn: ['config'] }, (deps) => + Array(Math.min(deps.config.maxItems, 50)).fill(0).map((_, i) => ({ id: i, - source: config.source, + source: deps.config.source, data: `item_${i}`, timestamp: Date.now() + i })) ) // Batch processing (regular step that groups array data) - .step({ slug: 'batches', dependsOn: ['raw_data', 'config'] }, ({ raw_data, config, run }) => { + .step({ slug: 'batches', dependsOn: ['raw_data', 'config'] }, (deps, ctx) => { const batches = []; - for (let i = 0; i < raw_data.length; i += run.batchSize) { + for (let i = 0; i < deps.raw_data.length; i += ctx.flowInput.batchSize) { batches.push({ - id: Math.floor(i / run.batchSize), - items: raw_data.slice(i, i + run.batchSize), - mode: config.processingMode + id: Math.floor(i / ctx.flowInput.batchSize), + items: deps.raw_data.slice(i, i + ctx.flowInput.batchSize), + mode: deps.config.processingMode }); } return { batches, count: batches.length }; }) // Process each batch into results array - .array({ slug: 'processed_batches', dependsOn: ['batches'] }, ({ batches }) => - batches.batches.map(batch => ({ + .array({ slug: 'processed_batches', dependsOn: ['batches'] }, (deps) => + deps.batches.batches.map(batch => ({ batchId: batch.id, processedCount: batch.items.length, mode: batch.mode, @@ -216,14 +212,14 @@ describe('Array Integration Tests', () => { })) ) // Final summary (regular step) - .step({ slug: 'final_summary', dependsOn: ['processed_batches', 'config'] }, - ({ processed_batches, config }) => ({ - totalBatches: processed_batches.length, - totalProcessedItems: processed_batches.reduce( + .step({ slug: 'final_summary', dependsOn: ['processed_batches', 'config'] }, + (deps) => ({ + totalBatches: deps.processed_batches.length, + totalProcessedItems: deps.processed_batches.reduce( (sum, batch) => sum + batch.processedCount, 0 ), - processingMode: config.processingMode, - allSuccessful: processed_batches.every(batch => + processingMode: deps.config.processingMode, + allSuccessful: deps.processed_batches.every(batch => batch.results.every(result => result.processed) ) }) @@ -233,27 +229,24 @@ describe('Array Integration Tests', () 
=> { const input = { dataSource: 'test-db', batchSize: 5 }; const configHandler = processingFlow.getStepDefinition('config').handler; - const configResult = await configHandler({ run: input }); + const configResult = await configHandler(input); const rawDataHandler = processingFlow.getStepDefinition('raw_data').handler; - const rawDataResult = await rawDataHandler({ run: input, config: configResult }); + const rawDataResult = await rawDataHandler({ config: configResult }); const batchesHandler = processingFlow.getStepDefinition('batches').handler; - const batchesResult = await batchesHandler({ - run: input, - raw_data: rawDataResult, - config: configResult - }); + const batchesResult = await batchesHandler({ + raw_data: rawDataResult, + config: configResult + }, { flowInput: input }); const processedHandler = processingFlow.getStepDefinition('processed_batches').handler; - const processedResult = await processedHandler({ - run: input, - batches: batchesResult + const processedResult = await processedHandler({ + batches: batchesResult }); const summaryHandler = processingFlow.getStepDefinition('final_summary').handler; - const summaryResult = await summaryHandler({ - run: input, + const summaryResult = await summaryHandler({ processed_batches: processedResult, config: configResult }); @@ -275,44 +268,43 @@ describe('Array Integration Tests', () => { const diamondFlow = new Flow({ slug: 'diamond_pattern' }) // Root step - .step({ slug: 'root' }, ({ run }) => ({ value: run.base, multiplier: 2 })) - + .step({ slug: 'root' }, (flowInput) => ({ value: flowInput.base, multiplier: 2 })) + // Two parallel array branches from root - .array({ slug: 'left_branch', dependsOn: ['root'] }, ({ root }) => - [root.value, root.value + 1, root.value + 2].map(val => ({ - value: val * root.multiplier, - branch: 'left' + .array({ slug: 'left_branch', dependsOn: ['root'] }, (deps) => + [deps.root.value, deps.root.value + 1, deps.root.value + 2].map(val => ({ + value: val * deps.root.multiplier, + branch: 'left' })) ) - .array({ slug: 'right_branch', dependsOn: ['root'] }, ({ root }) => - [root.value + 10, root.value + 20].map(val => ({ - value: val * root.multiplier, - branch: 'right' + .array({ slug: 'right_branch', dependsOn: ['root'] }, (deps) => + [deps.root.value + 10, deps.root.value + 20].map(val => ({ + value: val * deps.root.multiplier, + branch: 'right' })) ) - + // Merge both branches in final array step - .array({ slug: 'merged', dependsOn: ['left_branch', 'right_branch'] }, - ({ left_branch, right_branch }) => [ - ...left_branch.map(item => ({ ...item, merged: true })), - ...right_branch.map(item => ({ ...item, merged: true })) + .array({ slug: 'merged', dependsOn: ['left_branch', 'right_branch'] }, + (deps) => [ + ...deps.left_branch.map(item => ({ ...item, merged: true })), + ...deps.right_branch.map(item => ({ ...item, merged: true })) ] ); const input = { base: 5 }; const rootHandler = diamondFlow.getStepDefinition('root').handler; - const rootResult = await rootHandler({ run: input }); + const rootResult = await rootHandler(input); const leftHandler = diamondFlow.getStepDefinition('left_branch').handler; - const leftResult = await leftHandler({ run: input, root: rootResult }); + const leftResult = await leftHandler({ root: rootResult }); const rightHandler = diamondFlow.getStepDefinition('right_branch').handler; - const rightResult = await rightHandler({ run: input, root: rootResult }); + const rightResult = await rightHandler({ root: rootResult }); const mergedHandler = 
diamondFlow.getStepDefinition('merged').handler; - const mergedResult = await mergedHandler({ - run: input, + const mergedResult = await mergedHandler({ left_branch: leftResult, right_branch: rightResult }); @@ -335,15 +327,15 @@ describe('Array Integration Tests', () => { type Input = { seed: number }; const chainFlow = new Flow({ slug: 'deep_chain' }) - .array({ slug: 'generation_1' }, ({ run }) => - Array(3).fill(0).map((_, i) => ({ - id: i, - generation: 1, - value: run.seed + i + .array({ slug: 'generation_1' }, (flowInput) => + Array(3).fill(0).map((_, i) => ({ + id: i, + generation: 1, + value: flowInput.seed + i })) ) - .array({ slug: 'generation_2', dependsOn: ['generation_1'] }, ({ generation_1 }) => - generation_1.flatMap(parent => + .array({ slug: 'generation_2', dependsOn: ['generation_1'] }, (deps) => + deps.generation_1.flatMap(parent => Array(2).fill(0).map((_, i) => ({ id: parent.id * 2 + i, generation: 2, @@ -352,8 +344,8 @@ describe('Array Integration Tests', () => { })) ) ) - .array({ slug: 'generation_3', dependsOn: ['generation_2'] }, ({ generation_2 }) => - generation_2.flatMap(parent => + .array({ slug: 'generation_3', dependsOn: ['generation_2'] }, (deps) => + deps.generation_2.flatMap(parent => Array(2).fill(0).map((_, i) => ({ id: parent.id * 2 + i, generation: 3, @@ -363,33 +355,32 @@ describe('Array Integration Tests', () => { })) ) ) - .step({ slug: 'lineage_analysis', dependsOn: ['generation_1', 'generation_2', 'generation_3'] }, - ({ generation_1, generation_2, generation_3 }) => ({ - totalNodes: generation_1.length + generation_2.length + generation_3.length, - generationCounts: [generation_1.length, generation_2.length, generation_3.length], + .step({ slug: 'lineage_analysis', dependsOn: ['generation_1', 'generation_2', 'generation_3'] }, + (deps) => ({ + totalNodes: deps.generation_1.length + deps.generation_2.length + deps.generation_3.length, + generationCounts: [deps.generation_1.length, deps.generation_2.length, deps.generation_3.length], maxValue: Math.max( - ...generation_1.map(n => n.value), - ...generation_2.map(n => n.value), - ...generation_3.map(n => n.value) + ...deps.generation_1.map(n => n.value), + ...deps.generation_2.map(n => n.value), + ...deps.generation_3.map(n => n.value) ), - leafNodes: generation_3.length + leafNodes: deps.generation_3.length }) ); const input = { seed: 10 }; const gen1Handler = chainFlow.getStepDefinition('generation_1').handler; - const gen1Result = await gen1Handler({ run: input }); + const gen1Result = await gen1Handler(input); const gen2Handler = chainFlow.getStepDefinition('generation_2').handler; - const gen2Result = await gen2Handler({ run: input, generation_1: gen1Result }); + const gen2Result = await gen2Handler({ generation_1: gen1Result }); const gen3Handler = chainFlow.getStepDefinition('generation_3').handler; - const gen3Result = await gen3Handler({ run: input, generation_2: gen2Result }); + const gen3Result = await gen3Handler({ generation_2: gen2Result }); const analysisHandler = chainFlow.getStepDefinition('lineage_analysis').handler; const analysisResult = await analysisHandler({ - run: input, generation_1: gen1Result, generation_2: gen2Result, generation_3: gen3Result @@ -416,34 +407,32 @@ describe('Array Integration Tests', () => { type Input = { includeItems: boolean }; const emptyFlow = new Flow({ slug: 'empty_handling' }) - .array({ slug: 'conditional_items' }, ({ run }) => - run.includeItems ? [1, 2, 3] : [] + .array({ slug: 'conditional_items' }, (flowInput) => + flowInput.includeItems ? 
[1, 2, 3] : [] ) - .array({ slug: 'processed_items', dependsOn: ['conditional_items'] }, ({ conditional_items }) => - conditional_items.map(item => ({ processed: item * 2 })) + .array({ slug: 'processed_items', dependsOn: ['conditional_items'] }, (deps) => + deps.conditional_items.map(item => ({ processed: item * 2 })) ) - .step({ slug: 'summary', dependsOn: ['processed_items'] }, ({ processed_items }) => ({ - count: processed_items.length, - hasItems: processed_items.length > 0, - total: processed_items.reduce((sum, item) => sum + item.processed, 0) + .step({ slug: 'summary', dependsOn: ['processed_items'] }, (deps) => ({ + count: deps.processed_items.length, + hasItems: deps.processed_items.length > 0, + total: deps.processed_items.reduce((sum, item) => sum + item.processed, 0) })); // Test with empty array const emptyInput = { includeItems: false }; - + const conditionalHandler = emptyFlow.getStepDefinition('conditional_items').handler; - const conditionalResult = await conditionalHandler({ run: emptyInput }); + const conditionalResult = await conditionalHandler(emptyInput); const processedHandler = emptyFlow.getStepDefinition('processed_items').handler; - const processedResult = await processedHandler({ - run: emptyInput, - conditional_items: conditionalResult + const processedResult = await processedHandler({ + conditional_items: conditionalResult }); const summaryHandler = emptyFlow.getStepDefinition('summary').handler; - const summaryResult = await summaryHandler({ - run: emptyInput, - processed_items: processedResult + const summaryResult = await summaryHandler({ + processed_items: processedResult }); expect(conditionalResult).toEqual([]); @@ -459,46 +448,44 @@ describe('Array Integration Tests', () => { type Input = { delays: number[] }; const asyncFlow = new Flow({ slug: 'async_operations' }) - .array({ slug: 'async_data' }, async ({ run }) => { + .array({ slug: 'async_data' }, async (flowInput) => { // Simulate async operations with different delays - const promises = run.delays.map(async (delay, index) => { + const promises = flowInput.delays.map(async (delay, index) => { await new Promise(resolve => setTimeout(resolve, delay)); return { id: index, delay, completed: Date.now() }; }); return Promise.all(promises); }) - .array({ slug: 'validated_data', dependsOn: ['async_data'] }, async ({ async_data }) => { + .array({ slug: 'validated_data', dependsOn: ['async_data'] }, async (deps) => { // Simulate async validation await new Promise(resolve => setTimeout(resolve, 1)); - return async_data.filter(item => item.delay < 100).map(item => ({ + return deps.async_data.filter(item => item.delay < 100).map(item => ({ ...item, validated: true, validatedAt: Date.now() })); }) - .step({ slug: 'timing_analysis', dependsOn: ['validated_data'] }, ({ validated_data }) => ({ - validatedCount: validated_data.length, - averageDelay: validated_data.length > 0 - ? validated_data.reduce((sum, item) => sum + item.delay, 0) / validated_data.length + .step({ slug: 'timing_analysis', dependsOn: ['validated_data'] }, (deps) => ({ + validatedCount: deps.validated_data.length, + averageDelay: deps.validated_data.length > 0 + ? 
deps.validated_data.reduce((sum, item) => sum + item.delay, 0) / deps.validated_data.length : 0, - allValidated: validated_data.every(item => item.validated) + allValidated: deps.validated_data.every(item => item.validated) })); const input = { delays: [10, 50, 150, 30] }; // 150ms will be filtered out - + const asyncHandler = asyncFlow.getStepDefinition('async_data').handler; - const asyncResult = await asyncHandler({ run: input }); + const asyncResult = await asyncHandler(input); const validatedHandler = asyncFlow.getStepDefinition('validated_data').handler; - const validatedResult = await validatedHandler({ - run: input, - async_data: asyncResult + const validatedResult = await validatedHandler({ + async_data: asyncResult }); const timingHandler = asyncFlow.getStepDefinition('timing_analysis').handler; - const timingResult = await timingHandler({ - run: input, - validated_data: validatedResult + const timingResult = await timingHandler({ + validated_data: validatedResult }); expect(asyncResult).toHaveLength(4); diff --git a/pkgs/dsl/__tests__/integration/map-flow.test.ts b/pkgs/dsl/__tests__/integration/map-flow.test.ts index eab37972d..a94c1518a 100644 --- a/pkgs/dsl/__tests__/integration/map-flow.test.ts +++ b/pkgs/dsl/__tests__/integration/map-flow.test.ts @@ -12,10 +12,10 @@ describe('Map flow integration tests', () => { // Validate each normalized item return item.length > 0 && item.length < 100; }) - .step({ slug: 'summarize', dependsOn: ['validate'] }, (input) => ({ - total: input.validate.length, - valid: input.validate.filter(v => v).length, - invalid: input.validate.filter(v => !v).length + .step({ slug: 'summarize', dependsOn: ['validate'] }, (deps) => ({ + total: deps.validate.length, + valid: deps.validate.filter(v => v).length, + invalid: deps.validate.filter(v => !v).length })); const sql = compileFlow(flow); @@ -29,9 +29,9 @@ describe('Map flow integration tests', () => { it('should compile an ETL flow with array generation and mapping', () => { const flow = new Flow<{ sourceIds: string[] }>({ slug: 'etl_flow' }) - .array({ slug: 'fetch_data' }, async ({ run }) => { + .array({ slug: 'fetch_data' }, async (flowInput) => { // Simulating fetching data for each source ID - return run.sourceIds.map(id => ({ id, data: `data_${id}` })); + return flowInput.sourceIds.map(id => ({ id, data: `data_${id}` })); }) .map({ slug: 'transform', array: 'fetch_data' }, (record) => ({ ...record, @@ -43,10 +43,10 @@ describe('Map flow integration tests', () => { enriched: true, metadata: { processedAt: new Date().toISOString() } })) - .step({ slug: 'load', dependsOn: ['enrich'] }, async (input) => { + .step({ slug: 'load', dependsOn: ['enrich'] }, async (deps) => { // Final loading step return { - recordsProcessed: input.enrich.length, + recordsProcessed: deps.enrich.length, success: true }; }); @@ -64,8 +64,8 @@ describe('Map flow integration tests', () => { // Flow that processes nested arrays (e.g., matrix operations) const flow = new Flow({ slug: 'matrix_flow' }) .map({ slug: 'row_sums' }, (row) => row.reduce((a, b) => a + b, 0)) - .step({ slug: 'total_sum', dependsOn: ['row_sums'] }, (input) => - input.row_sums.reduce((a, b) => a + b, 0) + .step({ slug: 'total_sum', dependsOn: ['row_sums'] }, (deps) => + deps.row_sums.reduce((a, b) => a + b, 0) ); const sql = compileFlow(flow); @@ -130,12 +130,12 @@ describe('Map flow integration tests', () => { describe('type inference validation', () => { it('should correctly infer types through map chains', () => { const flow = new Flow<{ items: 
string[] }>({ slug: 'test' }) - .step({ slug: 'extract', dependsOn: [] }, ({ run }) => run.items) + .step({ slug: 'extract' }, (flowInput) => flowInput.items) .map({ slug: 'lengths', array: 'extract' }, (item) => item.length) .map({ slug: 'doubles', array: 'lengths' }, (len) => len * 2) - .step({ slug: 'sum', dependsOn: ['doubles'] }, (input) => { + .step({ slug: 'sum', dependsOn: ['doubles'] }, (deps) => { // Type checking - this should compile without errors - const total: number = input.doubles.reduce((a, b) => a + b, 0); + const total: number = deps.doubles.reduce((a, b) => a + b, 0); return total; }); @@ -190,13 +190,13 @@ describe('Map flow integration tests', () => { it('should handle multiple independent map chains', () => { const flow = new Flow<{ a: number[]; b: string[] }>({ slug: 'parallel' }) - .step({ slug: 'extract_a' }, ({ run }) => run.a) - .step({ slug: 'extract_b' }, ({ run }) => run.b) + .step({ slug: 'extract_a' }, (flowInput) => flowInput.a) + .step({ slug: 'extract_b' }, (flowInput) => flowInput.b) .map({ slug: 'process_a', array: 'extract_a' }, (n) => n * 2) .map({ slug: 'process_b', array: 'extract_b' }, (s) => s.toUpperCase()) - .step({ slug: 'combine', dependsOn: ['process_a', 'process_b'] }, (input) => ({ - numbers: input.process_a, - strings: input.process_b + .step({ slug: 'combine', dependsOn: ['process_a', 'process_b'] }, (deps) => ({ + numbers: deps.process_a, + strings: deps.process_b })); const sql = compileFlow(flow); diff --git a/pkgs/dsl/__tests__/runtime/array-method.test.ts b/pkgs/dsl/__tests__/runtime/array-method.test.ts index 9304d9978..971586e46 100644 --- a/pkgs/dsl/__tests__/runtime/array-method.test.ts +++ b/pkgs/dsl/__tests__/runtime/array-method.test.ts @@ -239,11 +239,11 @@ describe('.array() method', () => { it('allows array steps to access run input', async () => { const testFlow = new Flow<{ count: number }>({ slug: 'test_flow' }).array( { slug: 'numbers' }, - ({ run }) => Array(run.count).fill(0).map((_, i) => ({ index: i })) + (flowInput) => Array(flowInput.count).fill(0).map((_, i) => ({ index: i })) ); const numbersDef = testFlow.getStepDefinition('numbers'); - const result = await numbersDef.handler({ run: { count: 3 } }); + const result = await numbersDef.handler({ count: 3 }); expect(result).toEqual([ { index: 0 }, @@ -255,18 +255,17 @@ describe('.array() method', () => { it('allows steps to depend on array steps', async () => { const testFlow = new Flow<{ multiplier: number }>({ slug: 'test_flow' }) .array({ slug: 'numbers' }, () => [1, 2, 3]) - .step({ slug: 'sum', dependsOn: ['numbers'] }, ({ numbers, run }) => - numbers.reduce((acc, n) => acc + n * run.multiplier, 0) + .step({ slug: 'sum', dependsOn: ['numbers'] }, (deps, ctx) => + deps.numbers.reduce((acc, n) => acc + n * ctx.flowInput.multiplier, 0) ); const numbersDef = testFlow.getStepDefinition('numbers'); - const numbersResult = await numbersDef.handler({ run: { multiplier: 2 } }); + const numbersResult = await numbersDef.handler({ multiplier: 2 }); const sumDef = testFlow.getStepDefinition('sum'); - const sumResult = await sumDef.handler({ - run: { multiplier: 2 }, - numbers: numbersResult - }); + const sumResult = await sumDef.handler({ + numbers: numbersResult + }, { flowInput: { multiplier: 2 } }); expect(numbersResult).toEqual([1, 2, 3]); expect(sumResult).toBe(12); // (1 + 2 + 3) * 2 @@ -275,18 +274,17 @@ describe('.array() method', () => { it('allows array steps to depend on other array steps', async () => { const testFlow = new Flow<{ factor: number }>({ slug: 
'test_flow' }) .array({ slug: 'base_numbers' }, () => [1, 2, 3]) - .array({ slug: 'doubled', dependsOn: ['base_numbers'] }, ({ base_numbers, run }) => - base_numbers.map(n => n * run.factor) + .array({ slug: 'doubled', dependsOn: ['base_numbers'] }, (deps, ctx) => + deps.base_numbers.map(n => n * ctx.flowInput.factor) ); const baseDef = testFlow.getStepDefinition('base_numbers'); - const baseResult = await baseDef.handler({ run: { factor: 2 } }); + const baseResult = await baseDef.handler({ factor: 2 }); const doubledDef = testFlow.getStepDefinition('doubled'); const doubledResult = await doubledDef.handler({ - run: { factor: 2 }, base_numbers: baseResult - }); + }, { flowInput: { factor: 2 } }); expect(baseResult).toEqual([1, 2, 3]); expect(doubledResult).toEqual([2, 4, 6]); @@ -294,17 +292,16 @@ describe('.array() method', () => { it('allows array steps to depend on regular steps', async () => { const testFlow = new Flow<{ size: number }>({ slug: 'test_flow' }) - .step({ slug: 'config' }, ({ run }) => ({ length: run.size, prefix: 'item' })) - .array({ slug: 'items', dependsOn: ['config'] }, ({ config }) => - Array(config.length).fill(0).map((_, i) => `${config.prefix}_${i}`) + .step({ slug: 'config' }, (flowInput) => ({ length: flowInput.size, prefix: 'item' })) + .array({ slug: 'items', dependsOn: ['config'] }, (deps) => + Array(deps.config.length).fill(0).map((_, i) => `${deps.config.prefix}_${i}`) ); const configDef = testFlow.getStepDefinition('config'); - const configResult = await configDef.handler({ run: { size: 3 } }); + const configResult = await configDef.handler({ size: 3 }); const itemsDef = testFlow.getStepDefinition('items'); const itemsResult = await itemsDef.handler({ - run: { size: 3 }, config: configResult }); @@ -314,21 +311,20 @@ describe('.array() method', () => { it('supports multiple dependencies for array steps', async () => { const testFlow = new Flow<{ base: number }>({ slug: 'test_flow' }) - .step({ slug: 'multiplier' }, ({ run }) => run.base * 2) - .step({ slug: 'offset' }, ({ run }) => run.base + 10) - .array({ slug: 'combined', dependsOn: ['multiplier', 'offset'] }, ({ multiplier, offset }) => - [multiplier, offset, multiplier + offset] + .step({ slug: 'multiplier' }, (flowInput) => flowInput.base * 2) + .step({ slug: 'offset' }, (flowInput) => flowInput.base + 10) + .array({ slug: 'combined', dependsOn: ['multiplier', 'offset'] }, (deps) => + [deps.multiplier, deps.offset, deps.multiplier + deps.offset] ); const multiplierDef = testFlow.getStepDefinition('multiplier'); - const multiplierResult = await multiplierDef.handler({ run: { base: 5 } }); + const multiplierResult = await multiplierDef.handler({ base: 5 }); const offsetDef = testFlow.getStepDefinition('offset'); - const offsetResult = await offsetDef.handler({ run: { base: 5 } }); + const offsetResult = await offsetDef.handler({ base: 5 }); const combinedDef = testFlow.getStepDefinition('combined'); const combinedResult = await combinedDef.handler({ - run: { base: 5 }, multiplier: multiplierResult, offset: offsetResult }); @@ -343,15 +339,15 @@ describe('.array() method', () => { it('supports async array handlers', async () => { const testFlow = new Flow<{ delay: number }>({ slug: 'test_flow' }).array( { slug: 'async_data' }, - async ({ run }) => { + async (flowInput) => { // Simulate async operation - await new Promise(resolve => setTimeout(resolve, run.delay)); + await new Promise(resolve => setTimeout(resolve, flowInput.delay)); return [{ processed: true }, { processed: false }]; } ); const 
asyncDef = testFlow.getStepDefinition('async_data'); - const result = await asyncDef.handler({ run: { delay: 1 } }); + const result = await asyncDef.handler({ delay: 1 }); expect(result).toEqual([ { processed: true }, diff --git a/pkgs/dsl/__tests__/runtime/flow-shape.test.ts b/pkgs/dsl/__tests__/runtime/flow-shape.test.ts index dc5a1ac4b..6b7971568 100644 --- a/pkgs/dsl/__tests__/runtime/flow-shape.test.ts +++ b/pkgs/dsl/__tests__/runtime/flow-shape.test.ts @@ -52,7 +52,7 @@ describe('extractFlowShape', () => { it('should extract a single step with no dependencies', () => { const flow = new Flow({ slug: 'test_flow' }).step( { slug: 'step1' }, - ({ run }) => run.toUpperCase() + (flowInput) => flowInput.toUpperCase() ); const shape = extractFlowShape(flow); @@ -66,8 +66,8 @@ describe('extractFlowShape', () => { it('should extract step with dependencies', () => { const flow = new Flow({ slug: 'test_flow' }) - .step({ slug: 'step1' }, ({ run }) => run) - .step({ slug: 'step2', dependsOn: ['step1'] }, ({ step1 }) => step1); + .step({ slug: 'step1' }, (flowInput) => flowInput) + .step({ slug: 'step2', dependsOn: ['step1'] }, (deps) => deps.step1); const shape = extractFlowShape(flow); expect(shape.steps).toHaveLength(2); @@ -99,7 +99,7 @@ describe('extractFlowShape', () => { timeout: 30, startDelay: 100, }, - ({ run }) => run + (flowInput) => flowInput ); const shape = extractFlowShape(flow); @@ -120,7 +120,7 @@ describe('extractFlowShape', () => { it('should omit step options key when no options defined', () => { const flow = new Flow({ slug: 'test_flow' }).step( { slug: 'step1' }, - ({ run }) => run + (flowInput) => flowInput ); const shape = extractFlowShape(flow); @@ -137,7 +137,7 @@ describe('extractFlowShape', () => { // When only some options are set, only those should appear const flow = new Flow({ slug: 'test_flow', maxAttempts: 5 }).step( { slug: 'step1', timeout: 30 }, - ({ run }) => run + (flowInput) => flowInput ); const shape = extractFlowShape(flow); @@ -187,7 +187,7 @@ describe('extractFlowShape', () => { baseDelay: 5, timeout: 10, }) - .step({ slug: 'website' }, ({ run }) => ({ content: run.url })) + .step({ slug: 'website' }, (flowInput) => ({ content: flowInput.url })) .step( { slug: 'sentiment', dependsOn: ['website'], maxAttempts: 5, timeout: 30 }, () => ({ score: 0.8 }) @@ -475,12 +475,12 @@ describe('compareFlowShapes', () => { // but don't affect shape matching (runtime tunable via SQL) const flowA = new Flow({ slug: 'test_flow', maxAttempts: 3 }).step( { slug: 'step1', timeout: 60 }, - ({ run }) => run + (flowInput) => flowInput ); const flowB = new Flow({ slug: 'test_flow', maxAttempts: 10 }).step( { slug: 'step1', timeout: 300, startDelay: 100 }, - ({ run }) => run + (flowInput) => flowInput ); const shapeA = extractFlowShape(flowA); @@ -544,8 +544,8 @@ describe('compareFlowShapes', () => { it('should match shapes extracted from identical flows', () => { const createFlow = () => new Flow({ slug: 'test_flow', maxAttempts: 3 }) - .step({ slug: 'step1' }, ({ run }) => run) - .step({ slug: 'step2', dependsOn: ['step1'] }, ({ step1 }) => step1); + .step({ slug: 'step1' }, (flowInput) => flowInput) + .step({ slug: 'step2', dependsOn: ['step1'] }, (deps) => deps.step1); const shapeA = extractFlowShape(createFlow()); const shapeB = extractFlowShape(createFlow()); @@ -558,11 +558,11 @@ describe('compareFlowShapes', () => { it('should detect difference when step is added', () => { const flowA = new Flow({ slug: 'test_flow' }).step( { slug: 'step1' }, - ({ run }) => run + 
diff --git a/pkgs/dsl/__tests__/runtime/map-compile.test.ts b/pkgs/dsl/__tests__/runtime/map-compile.test.ts
index 66bb755f7..00275e17c 100644
--- a/pkgs/dsl/__tests__/runtime/map-compile.test.ts
+++ b/pkgs/dsl/__tests__/runtime/map-compile.test.ts
@@ -56,7 +56,7 @@ describe('compileFlow with map steps', () => {
   describe('dependent map compilation', () => {
     it('should compile dependent map with array dependency', () => {
       const flow = new Flow<{ count: number }>({ slug: 'test_flow' })
-        .array({ slug: 'nums' }, ({ run }) => Array(run.count).fill(0).map((_, i) => i))
+        .array({ slug: 'nums' }, (flowInput) => Array(flowInput.count).fill(0).map((_, i) => i))
         .map({ slug: 'double', array: 'nums' }, (n) => n * 2);
 
       const sql = compileFlow(flow);
@@ -93,8 +93,8 @@
     it('should compile flow with map and regular steps', () => {
       const flow = new Flow({ slug: 'test_flow' })
         .map({ slug: 'double' }, (n) => n * 2)
-        .step({ slug: 'sum', dependsOn: ['double'] }, (input) =>
-          input.double.reduce((a, b) => a + b, 0)
+        .step({ slug: 'sum', dependsOn: ['double'] }, (deps) =>
+          deps.double.reduce((a, b) => a + b, 0)
         );
 
       const sql = compileFlow(flow);
@@ -123,8 +123,8 @@
       const flow = new Flow({ slug: 'test_flow' })
         .array({ slug: 'generate' }, () => [1, 2, 3])
         .map({ slug: 'square', array: 'generate' }, (n) => n * n)
-        .step({ slug: 'total', dependsOn: ['square'] }, (input) => ({
-          sum: input.square.reduce((a, b) => a + b, 0)
+        .step({ slug: 'total', dependsOn: ['square'] }, (deps) => ({
+          sum: deps.square.reduce((a, b) => a + b, 0)
         }));
 
       const sql = compileFlow(flow);
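Map steps keep a first parameter distinct from both root and dependent steps: the handler sees one array element at a time, as the compile tests above show. A minimal sketch of the three parameter shapes side by side (slugs and arithmetic are illustrative; `ctx.flowInput` in map handlers follows this patch's nomenclature changes):

```typescript
import { Flow } from '@pgflow/dsl';

new Flow<{ offset: number }>({ slug: 'map_sketch' })
  // Root array step: first parameter would be the flow input (unused here).
  .array({ slug: 'nums' }, () => [1, 2, 3])
  // Map step: first parameter is a single element of the 'nums' array;
  // the original flow input stays reachable through the context.
  .map({ slug: 'shifted', array: 'nums' }, (n, ctx) => n + ctx.flowInput.offset)
  // Dependent step: first parameter is a deps object with the aggregated array.
  .step({ slug: 'total', dependsOn: ['shifted'] }, (deps) =>
    deps.shifted.reduce((a, b) => a + b, 0)
  );
```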
diff --git a/pkgs/dsl/__tests__/runtime/steps.test.ts b/pkgs/dsl/__tests__/runtime/steps.test.ts
index e8d701243..a675eea57 100644
--- a/pkgs/dsl/__tests__/runtime/steps.test.ts
+++ b/pkgs/dsl/__tests__/runtime/steps.test.ts
@@ -207,28 +207,27 @@ describe('Steps', () => {
   it('allows steps to access run input', async () => {
     const testFlow = new Flow<{ value: number }>({ slug: 'test_flow' }).step(
       { slug: 'step1' },
-      ({ run }) => ({ doubled: run.value * 2 })
+      (flowInput) => ({ doubled: flowInput.value * 2 })
     );
 
     const step1Def = testFlow.getStepDefinition('step1');
-    const result = await step1Def.handler({ run: { value: 5 } });
+    const result = await step1Def.handler({ value: 5 });
 
     expect(result).toEqual({ doubled: 10 });
   });
 
   it('allows steps to access dependencies', async () => {
     const testFlow = new Flow<{ value: number }>({ slug: 'test_flow' })
-      .step({ slug: 'step1' }, ({ run }) => ({ doubled: run.value * 2 }))
-      .step({ slug: 'step2', dependsOn: ['step1'] }, ({ step1 }) => ({
-        quadrupled: step1.doubled * 2,
+      .step({ slug: 'step1' }, (flowInput) => ({ doubled: flowInput.value * 2 }))
+      .step({ slug: 'step2', dependsOn: ['step1'] }, (deps) => ({
+        quadrupled: deps.step1.doubled * 2,
       }));
 
     const step1Def = testFlow.getStepDefinition('step1');
-    const step1Result = await step1Def.handler({ run: { value: 5 } });
+    const step1Result = await step1Def.handler({ value: 5 });
 
     const step2Def = testFlow.getStepDefinition('step2');
     const step2Result = await step2Def.handler({
-      run: { value: 5 },
       step1: step1Result,
     });
diff --git a/pkgs/website/src/code-examples/analyze_website_simplified.ts.raw b/pkgs/website/src/code-examples/analyze_website_simplified.ts.raw
index e11c386e2..25ac26920 100644
--- a/pkgs/website/src/code-examples/analyze_website_simplified.ts.raw
+++ b/pkgs/website/src/code-examples/analyze_website_simplified.ts.raw
@@ -1,25 +1,25 @@
-new Flow<{ url: string }>({
+new Flow<{ url: string; user_id: string }>({
   slug: 'analyze_website',
 })
   .step(
     { slug: 'website' },
-    async (input) => await scrapeWebsite(input.run.url)
+    async (flowInput) => await scrapeWebsite(flowInput.url)
   )
   .step(
     { slug: 'summary', dependsOn: ['website'] },
-    async (input) => await summarizeWithAI(input.website.content)
+    async (deps) => await summarizeWithAI(deps.website.content)
   )
   .step(
     { slug: 'tags', dependsOn: ['website'] },
-    async ({}) => await extractTags(input.website.content)
+    async (deps) => await extractTags(deps.website.content)
   )
   .step(
     { slug: 'saveToDb', dependsOn: ['summary', 'tags'] },
-    async ({ run, summary, tags }) =>
+    async (deps, ctx) =>
       await saveWebsite({
-        user_id: run.user_id,
-        website_url: run.url,
-        summary,
-        tags,
+        user_id: ctx.flowInput.user_id,
+        website_url: ctx.flowInput.url,
+        summary: deps.summary,
+        tags: deps.tags,
       })
   );
diff --git a/pkgs/website/src/code-examples/landing-page-flow.ts.raw b/pkgs/website/src/code-examples/landing-page-flow.ts.raw
index 3e8302cd9..19ee5c7c4 100644
--- a/pkgs/website/src/code-examples/landing-page-flow.ts.raw
+++ b/pkgs/website/src/code-examples/landing-page-flow.ts.raw
@@ -1,8 +1,8 @@
 const WebsiteFlow = new Flow<{ url: string }>({ slug: 'website' })
-  .step({ slug: 'fetch' }, input => scrape(input.run.url))
+  .step({ slug: 'fetch' }, flowInput => scrape(flowInput.url))
   .step({ slug: 'sentiment', dependsOn: ['fetch'] },
-    input => analyzeSentiment(input.fetch))
+    deps => analyzeSentiment(deps.fetch))
   .step({ slug: 'summary', dependsOn: ['fetch'] },
-    input => summarize(input.fetch))
+    deps => summarize(deps.fetch))
   .step({ slug: 'save', dependsOn: ['sentiment', 'summary'] },
-    input => save({ ...input.sentiment, ...input.summary }));
+    deps => save({ ...deps.sentiment, ...deps.summary }));
diff --git a/pkgs/website/src/content/docs/comparisons/vercel-workflows.mdx b/pkgs/website/src/content/docs/comparisons/vercel-workflows.mdx
index 0ece7425a..8740bda25 100644
--- a/pkgs/website/src/content/docs/comparisons/vercel-workflows.mdx
+++ b/pkgs/website/src/content/docs/comparisons/vercel-workflows.mdx
@@ -59,15 +59,15 @@ new Flow<{ userId: string }>({
 })
   .step(
     { slug: 'createUser' },
-    ({ run }) => ({ id: crypto.randomUUID(), email: run.userId })
+    (flowInput) => ({ id: crypto.randomUUID(), email: flowInput.userId })
   )
   .step(
     { slug: 'sendWelcome', dependsOn: ['createUser'] },
-    ({ createUser }) => sendEmail(createUser.email, 'Welcome!')
+    (deps) => sendEmail(deps.createUser.email, 'Welcome!')
   )
   .step(
-    { slug: 'sendCheckin', dependsOn: ['sendWelcome'], startDelay: 604800 },
-    ({ createUser }) => sendEmail(createUser.email, 'How are you doing?')
+    { slug: 'sendCheckin', dependsOn: ['sendWelcome', 'createUser'], startDelay: 604800 },
+    (deps) => sendEmail(deps.createUser.email, 'How are you doing?')
   );
 ```
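The `sendCheckin` hunk above is not cosmetic: once the `run` wrapper is gone, a handler's first parameter contains only the outputs of the steps listed in `dependsOn`, so reading `deps.createUser` requires declaring `createUser` as a dependency. A minimal sketch of the rule (slugs and handler bodies hypothetical):

```typescript
import { Flow } from '@pgflow/dsl';

new Flow<{ email: string }>({ slug: 'deps_sketch' })
  .step({ slug: 'createUser' }, (flowInput) => ({ email: flowInput.email }))
  .step({ slug: 'sendWelcome', dependsOn: ['createUser'] }, (deps) =>
    `welcome:${deps.createUser.email}`
  )
  // deps contains only declared dependencies; to read deps.createUser here,
  // 'createUser' must appear in dependsOn alongside 'sendWelcome'.
  .step(
    { slug: 'sendCheckin', dependsOn: ['sendWelcome', 'createUser'] },
    (deps) => `checkin:${deps.createUser.email}`
  );
```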
diff --git a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx
index 5540ad46b..bc609dbd6 100644
--- a/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx
+++ b/pkgs/website/src/content/docs/get-started/flows/create-flow.mdx
@@ -22,8 +22,6 @@ If you haven't run the example yet, start with the [Quickstart](/get-started/flo
   - greet-user.ts (flow definition)
   - index.ts (re-exports all flows)
 - functions
-  - pgflow
-    - index.ts (Control Plane)
   - greet-user-worker
     - index.ts (Edge Worker)
@@ -104,21 +102,26 @@ export const GreetUser = new Flow({
 - **`deps.fullName`**: Access the result from the `fullName` step by its slug
 - **Final output**: When the last step completes, its result becomes the flow output
 
-## Flow registration
-
-The `supabase/flows/index.ts` file re-exports all flows:
+
+
+## Exporting flows
 
-```typescript title="supabase/functions/pgflow/index.ts"
-import { ControlPlane } from '@pgflow/edge-worker';
-import * as flows from '../../flows/index.ts';
+Re-export each flow from `supabase/flows/index.ts`:
 
-ControlPlane.serve(flows);
+```typescript title="supabase/flows/index.ts"
+export { GreetUser } from './greet-user.ts';
 ```
 
 ## Try modifying the flow
diff --git a/pkgs/website/src/content/docs/tutorials/ai-web-scraper/backend.mdx b/pkgs/website/src/content/docs/tutorials/ai-web-scraper/backend.mdx
index 896a0e7aa..687138b8b 100644
--- a/pkgs/website/src/content/docs/tutorials/ai-web-scraper/backend.mdx
+++ b/pkgs/website/src/content/docs/tutorials/ai-web-scraper/backend.mdx
@@ -267,17 +267,17 @@ import saveToDb from "../tasks/saveToDb.ts";
 
 type Input = { url: string };
 
 export const AnalyzeWebsite = new Flow<Input>({ slug: "analyzeWebsite", maxAttempts: 3 })
-  .step({ slug: "website" }, ({ run }) => scrapeWebsite(run.url))
-  .step({ slug: "summary", dependsOn: ["website"] }, ({ website }) =>
-    summarize(website.content),
+  .step({ slug: "website" }, (flowInput) => scrapeWebsite(flowInput.url))
+  .step({ slug: "summary", dependsOn: ["website"] }, (deps) =>
+    summarize(deps.website.content),
   )
-  .step({ slug: "tags", dependsOn: ["website"] }, ({ website }) =>
-    extractTags(website.content),
+  .step({ slug: "tags", dependsOn: ["website"] }, (deps) =>
+    extractTags(deps.website.content),
   )
   .step(
     { slug: "saveToDb", dependsOn: ["summary", "tags"] },
-    ({ run, summary, tags }) =>
-      saveToDb({ website_url: run.url, summary, tags }),
+    (deps, ctx) =>
+      saveToDb({ website_url: ctx.flowInput.url, summary: deps.summary, tags: deps.tags }),
   );
 ```
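The `saveToDb` step above shows the other half of the migration: when a late step needs the original input, it reads `ctx.flowInput` instead of `input.run`. A minimal sketch of the fan-in pattern (slugs and handler bodies are placeholders):

```typescript
import { Flow } from '@pgflow/dsl';

new Flow<{ url: string }>({ slug: 'fan_in_sketch' })
  .step({ slug: 'left' }, (flowInput) => flowInput.url.length)
  .step({ slug: 'right' }, (flowInput) => flowInput.url.toUpperCase())
  .step({ slug: 'join', dependsOn: ['left', 'right'] }, (deps, ctx) => ({
    // ctx.flowInput is the same value the root steps received directly.
    source: ctx.flowInput.url,
    length: deps.left,
    upper: deps.right,
  }));
```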
diff --git a/pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx b/pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx
index 6fa79f014..4d0b5b1ad 100644
--- a/pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx
+++ b/pkgs/website/src/content/docs/tutorials/use-cases/chatbot.mdx
@@ -213,25 +213,25 @@ type Input = {
 };
 
 export default new Flow<Input>({ slug: 'chatbot' })
-  .step({ slug: 'history' }, ({ run }) =>
-    getHistory(run.conversationId)
+  .step({ slug: 'history' }, (flowInput) =>
+    getHistory(flowInput.conversationId)
   )
-  .step({ slug: 'knowledge' }, ({ run }) =>
-    retrieveKnowledge(run.message)
+  .step({ slug: 'knowledge' }, (flowInput) =>
+    retrieveKnowledge(flowInput.message)
   )
   .step(
     { slug: 'response', dependsOn: ['history', 'knowledge'] },
-    ({ run, history, knowledge }) =>
+    (deps, ctx) =>
       generateResponse({
-        message: run.message,
-        history,
-        knowledge,
+        message: ctx.flowInput.message,
+        history: deps.history,
+        knowledge: deps.knowledge,
       })
   )
-  .step({ slug: 'save', dependsOn: ['response'] }, ({ run, response }) =>
+  .step({ slug: 'save', dependsOn: ['response'] }, (deps, ctx) =>
     saveMessage({
-      conversationId: run.conversationId,
-      content: response,
+      conversationId: ctx.flowInput.conversationId,
+      content: deps.response,
     })
   );
 ```
diff --git a/pkgs/website/src/content/docs/tutorials/use-cases/rag-pipeline.mdx b/pkgs/website/src/content/docs/tutorials/use-cases/rag-pipeline.mdx
index af7d624e4..a68ec42f3 100644
--- a/pkgs/website/src/content/docs/tutorials/use-cases/rag-pipeline.mdx
+++ b/pkgs/website/src/content/docs/tutorials/use-cases/rag-pipeline.mdx
@@ -201,25 +201,25 @@ import generateAnswer from '../_tasks/generateAnswer.ts';
 
 type Input = { query: string };
 
 export default new Flow<Input>({ slug: 'ragSearch' })
-  .step({ slug: 'transform' }, ({ run }) =>
-    transformQuery(run.query)
+  .step({ slug: 'transform' }, (flowInput) =>
+    transformQuery(flowInput.query)
   )
-  .step({ slug: 'retrieve', dependsOn: ['transform'] }, ({ transform }) =>
+  .step({ slug: 'retrieve', dependsOn: ['transform'] }, (deps) =>
     retrieveDocuments({
-      query: transform.enhanced,
+      query: deps.transform.enhanced,
       limit: 20,
     })
   )
-  .step({ slug: 'rerank', dependsOn: ['retrieve', 'transform'] }, ({ run, retrieve }) =>
+  .step({ slug: 'rerank', dependsOn: ['retrieve', 'transform'] }, (deps, ctx) =>
     rerankResults({
-      query: run.query,
-      documents: retrieve,
+      query: ctx.flowInput.query,
+      documents: deps.retrieve,
     })
   )
-  .step({ slug: 'answer', dependsOn: ['rerank'] }, ({ run, rerank }) =>
+  .step({ slug: 'answer', dependsOn: ['rerank'] }, (deps, ctx) =>
     generateAnswer({
-      query: run.query,
-      context: rerank,
+      query: ctx.flowInput.query,
+      context: deps.rerank,
     })
   );
 ```
diff --git a/pkgs/website/src/content/docs/tutorials/use-cases/structured-output.mdx b/pkgs/website/src/content/docs/tutorials/use-cases/structured-output.mdx
index 230313a0b..dc98a9bef 100644
--- a/pkgs/website/src/content/docs/tutorials/use-cases/structured-output.mdx
+++ b/pkgs/website/src/content/docs/tutorials/use-cases/structured-output.mdx
@@ -125,14 +125,14 @@ import saveProduct from '../_tasks/saveProduct.ts';
 
 type Input = { text: string };
 
 export default new Flow<Input>({ slug: 'processProduct' })
-  .step({ slug: 'extract' }, ({ run }) =>
-    extractProduct(run.text)
+  .step({ slug: 'extract' }, (flowInput) =>
+    extractProduct(flowInput.text)
   )
-  .step({ slug: 'enrich', dependsOn: ['extract'] }, ({ extract }) =>
-    enrichProduct(extract)
+  .step({ slug: 'enrich', dependsOn: ['extract'] }, (deps) =>
+    enrichProduct(deps.extract)
   )
-  .step({ slug: 'save', dependsOn: ['enrich'] }, ({ enrich }) =>
-    saveProduct(enrich)
+  .step({ slug: 'save', dependsOn: ['enrich'] }, (deps) =>
+    saveProduct(deps.enrich)
   );
 ```
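Taken together, the documentation diffs reduce to one mechanical rewrite. A hedged before/after sketch of the whole change (`scrape` and `save` are hypothetical task functions, declared only to keep the sketch self-contained):

```typescript
import { Flow } from '@pgflow/dsl';

// Hypothetical task functions, not part of this patch.
declare function scrape(url: string): Promise<string>;
declare function save(url: string, content: string): Promise<void>;

// Before this patch: every handler received one input object with a `run` key.
//   .step({ slug: 'scrape' }, (input) => scrape(input.run.url))
//   .step({ slug: 'save', dependsOn: ['scrape'] }, (input) =>
//     save(input.run.url, input.scrape))

// After this patch: root steps take flowInput directly; dependent steps take
// deps plus a context whose flowInput carries the original run input.
new Flow<{ url: string }>({ slug: 'migration_sketch' })
  .step({ slug: 'scrape' }, (flowInput) => scrape(flowInput.url))
  .step({ slug: 'save', dependsOn: ['scrape'] }, (deps, ctx) =>
    save(ctx.flowInput.url, deps.scrape)
  );
```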