diff --git a/.changeset/asymmetric-handler-signatures.md b/.changeset/asymmetric-handler-signatures.md
index d7dfcf932..61d815047 100644
--- a/.changeset/asymmetric-handler-signatures.md
+++ b/.changeset/asymmetric-handler-signatures.md
@@ -10,5 +10,6 @@ BREAKING: Asymmetric handler signatures - remove `run` key from step inputs
 - Root steps: `(flowInput, ctx) => ...` - flow input directly as first param
 - Dependent steps: `(deps, ctx) => ...` - only dependency outputs as first param
-- Access flow input in dependent steps via `ctx.flowInput`
+- Access flow input in dependent steps via `await ctx.flowInput` (async/lazy-loaded)
+- Lazy loading prevents data duplication for map steps processing large arrays
 - Enables functional composition and simplifies types for future subflows
diff --git a/pkgs/cli/examples/analyze_website.ts b/pkgs/cli/examples/analyze_website.ts
index f9ab67b86..2e667685e 100644
--- a/pkgs/cli/examples/analyze_website.ts
+++ b/pkgs/cli/examples/analyze_website.ts
@@ -54,7 +54,8 @@ const AnalyzeWebsite = new Flow({
   .step(
     { slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] },
     async (_deps, ctx) => {
-      console.log(`Saving results to database for: ${ctx.flowInput.url}`);
+      const flowInput = await ctx.flowInput;
+      console.log(`Saving results to database for: ${flowInput.url}`);
       // In a real implementation, this would save to a database
       return {
         status: 'success',
diff --git a/pkgs/cli/src/commands/install/create-example-worker.ts b/pkgs/cli/src/commands/install/create-example-worker.ts
index 137ccc722..ba2f23a74 100644
--- a/pkgs/cli/src/commands/install/create-example-worker.ts
+++ b/pkgs/cli/src/commands/install/create-example-worker.ts
@@ -4,7 +4,28 @@ import { log, confirm } from '@clack/prompts';
 import chalk from 'chalk';
 import { getVersion } from '../../utils/get-version.js';
 
-const INDEX_TS_TEMPLATE = `import { EdgeWorker } from '@pgflow/edge-worker';
+const INDEX_TS_TEMPLATE = `/**
+ * To run this worker locally:
+ *
+ * 1. Start the Edge Runtime (in one terminal):
+ *    npx supabase functions serve --no-verify-jwt
+ *
+ * 2. Start the worker (in another terminal):
+ *    curl http://localhost:54321/functions/v1/greet-user-worker
+ *
+ * 3. Trigger a flow run (in Supabase Studio SQL Editor):
+ *    SELECT * FROM pgflow.start_flow(
+ *      flow_slug => 'greetUser',
+ *      input => '{"firstName": "Alice", "lastName": "Smith"}'::jsonb
+ *    );
+ *
+ * 4. Check run status:
+ *    SELECT * FROM pgflow.runs
+ *    WHERE flow_slug = 'greetUser'
+ *    ORDER BY started_at DESC
+ *    LIMIT 1;
+ */
+import { EdgeWorker } from '@pgflow/edge-worker';
 import { GreetUser } from '../../flows/greet-user.ts';
 
 EdgeWorker.start(GreetUser);
diff --git a/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts b/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts
index 41f6e0489..dc210f8e5 100644
--- a/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts
+++ b/pkgs/client/__tests__/e2e/full-stack-dsl.test.ts
@@ -111,7 +111,8 @@ describe('Full Stack DSL Integration', () => {
     tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
     expect(tasks).toHaveLength(1);
     expect(tasks[0].step_slug).toBe('process');
-    expect(tasks[0].flow_input).toEqual(input);
+    // Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
+    expect(tasks[0].flow_input).toBeNull();
     expect(tasks[0].input.fetch).toEqual(fetchOutput); // Critical: dependency output included
 
     const processOutput = {
@@ -132,7 +133,8 @@ describe('Full Stack DSL Integration', () => {
     tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
     expect(tasks).toHaveLength(1);
     expect(tasks[0].step_slug).toBe('save');
-    expect(tasks[0].flow_input).toEqual(input);
+    // Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
+    expect(tasks[0].flow_input).toBeNull();
 
     // The save step only depends on process, so it should only have process output
     // This is correct behavior - transitive dependencies are not automatically included
diff --git a/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts b/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts
index c052a1378..084b94446 100644
--- a/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts
+++ b/pkgs/client/__tests__/e2e/happy-path-e2e.test.ts
@@ -88,7 +88,8 @@ describe('Happy Path E2E Integration', () => {
     tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
     expect(tasks).toHaveLength(1);
     expect(tasks[0].step_slug).toBe('process');
-    expect(tasks[0].flow_input).toEqual(input);
+    // Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
+    expect(tasks[0].flow_input).toBeNull();
     expect(tasks[0].input.fetch).toEqual(fetchOutput);
 
     const processOutput = {
@@ -127,7 +128,8 @@ describe('Happy Path E2E Integration', () => {
 
     log('Save task input:', JSON.stringify(tasks[0].input, null, 2));
 
-    expect(tasks[0].flow_input).toEqual(input);
+    // Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
+    expect(tasks[0].flow_input).toBeNull();
     expect(tasks[0].input.process).toEqual(processOutput);
     // Note: save step only depends on process, so fetch output is not included (correct behavior)
     expect(tasks[0].input.fetch).toBeUndefined();
diff --git a/pkgs/core/schemas/0120_function_start_tasks.sql b/pkgs/core/schemas/0120_function_start_tasks.sql
index 1e790b4cf..d212a6934 100644
--- a/pkgs/core/schemas/0120_function_start_tasks.sql
+++ b/pkgs/core/schemas/0120_function_start_tasks.sql
@@ -172,9 +172,14 @@ as $$
       st.message_id as msg_id,
       st.task_index as task_index,
       -- flow_input: Original run input for worker context
-      -- This allows workers to access the original flow input directly
-      -- without parsing it from the constructed 'input' field
-      r.input as flow_input
+      -- Only included for root non-map steps to avoid data duplication.
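+      -- (map runs create one task per array element; without the CASE below,
+      --  every task row would carry the full run input again)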
+      -- Root map steps: flowInput IS the array, redundant to include
+      -- Dependent steps: lazy load via ctx.flowInput when needed
+      CASE
+        WHEN step.step_type != 'map' AND step.deps_count = 0
+        THEN r.input
+        ELSE NULL
+      END as flow_input
     from tasks st
     join runs r on st.run_id = r.run_id
     join pgflow.steps step on
diff --git a/pkgs/core/src/types.ts b/pkgs/core/src/types.ts
index 32c7d0ed0..5789dbd8c 100644
--- a/pkgs/core/src/types.ts
+++ b/pkgs/core/src/types.ts
@@ -16,6 +16,10 @@ export type { Json };
  * Same as pgflow.step_task_record type, but with not-null fields and type argument for payload.
  * The input type is automatically inferred based on the step_slug using a discriminated union.
  * This ensures that each step only receives inputs from its declared dependencies and the flow's run input.
+ *
+ * Note: flow_input is nullable because start_tasks only includes it for root non-map steps.
+ * For dependent and map steps, flow_input is NULL to avoid data duplication.
+ * Workers can access the original flow input via ctx.flowInput (lazy loaded).
  */
 export type StepTaskRecord<TFlow extends AnyFlow> = {
   [StepSlug in Extract<keyof ExtractFlowSteps<TFlow>, string>]: {
@@ -25,7 +29,7 @@ export type StepTaskRecord<TFlow extends AnyFlow> = {
     task_index: number;
     input: Simplify<StepInput<TFlow, StepSlug>>;
     msg_id: number;
-    flow_input: ExtractFlowInput<TFlow>;
+    flow_input: ExtractFlowInput<TFlow> | null;
   };
 }[Extract<keyof ExtractFlowSteps<TFlow>, string>];
diff --git a/pkgs/core/supabase/migrations/20251223110504_pgflow_add_flow_input_column.sql b/pkgs/core/supabase/migrations/20251225163110_pgflow_add_flow_input_column.sql
similarity index 95%
rename from pkgs/core/supabase/migrations/20251223110504_pgflow_add_flow_input_column.sql
rename to pkgs/core/supabase/migrations/20251225163110_pgflow_add_flow_input_column.sql
index a3335d4cf..0fd0ff576 100644
--- a/pkgs/core/supabase/migrations/20251223110504_pgflow_add_flow_input_column.sql
+++ b/pkgs/core/supabase/migrations/20251225163110_pgflow_add_flow_input_column.sql
@@ -166,9 +166,14 @@ with tasks as (
       st.message_id as msg_id,
       st.task_index as task_index,
       -- flow_input: Original run input for worker context
-      -- This allows workers to access the original flow input directly
-      -- without parsing it from the constructed 'input' field
-      r.input as flow_input
+      -- Only included for root non-map steps to avoid data duplication.
+      -- Root map steps: flowInput IS the array, redundant to include
+      -- Dependent steps: lazy load via ctx.flowInput when needed
+      CASE
+        WHEN step.step_type != 'map' AND step.deps_count = 0
+        THEN r.input
+        ELSE NULL
+      END as flow_input
     from tasks st
     join runs r on st.run_id = r.run_id
     join pgflow.steps step on
diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum
index aeb82b508..1f9f6eb2a 100644
--- a/pkgs/core/supabase/migrations/atlas.sum
+++ b/pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
-h1:32eppXsursErjjXT7SfNzyRPfw4iiiL1Pnafyy4DSvk=
+h1:SkrHj8RTAbqZRxs3/TvkZ0FV1l9SeNX2Cxba02DAPAk=
 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,4 +14,4 @@
 20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
 20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw=
 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
-20251223110504_pgflow_add_flow_input_column.sql h1:HJ9GJc4xh68JT82JINf946RS/jQNt9jDhvIH4VA40c8=
+20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
diff --git a/pkgs/core/supabase/tests/start_tasks/conditional_flow_input.test.sql b/pkgs/core/supabase/tests/start_tasks/conditional_flow_input.test.sql
new file mode 100644
index 000000000..bffff444b
--- /dev/null
+++ b/pkgs/core/supabase/tests/start_tasks/conditional_flow_input.test.sql
@@ -0,0 +1,207 @@
+begin;
+select plan(6);
+select pgflow_tests.reset_db();
+
+-- =========================================================================
+-- Test: Conditional flow_input in start_tasks
+--
+-- Only root non-map steps receive flow_input.
+-- All other step types (root map, dependent non-map, dependent map) get NULL.
+-- This optimization prevents data duplication for large array processing.
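+--
+-- The CASE in start_tasks reduces to this matrix (illustrative summary):
+--
+--   step type          | deps_count | flow_input
+--   root non-map       | 0          | r.input
+--   root map           | 0          | NULL
+--   dependent non-map  | > 0        | NULL
+--   dependent map      | > 0        | NULL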
+-- =========================================================================
+
+-- Test 1: Root non-map step receives flow_input
+select pgflow.create_flow('root_step_flow');
+select pgflow.add_step('root_step_flow', 'root_step');
+select pgflow.start_flow('root_step_flow', '{"user_id": "abc123"}'::jsonb);
+
+select pgflow_tests.ensure_worker('root_step_flow');
+
+with msgs as (
+  select * from pgmq.read_with_poll('root_step_flow', 10, 5, 1, 50) limit 1
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'root_step_flow',
+    (select ids from msg_ids),
+    '11111111-1111-1111-1111-111111111111'::uuid
+  )
+)
+select is(
+  (select flow_input from started_tasks),
+  '{"user_id": "abc123"}'::jsonb,
+  'Root non-map step should receive flow_input'
+);
+
+-- Test 2: Dependent non-map step receives NULL flow_input
+select pgflow_tests.reset_db();
+select pgflow.create_flow('dep_flow');
+select pgflow.add_step('dep_flow', 'first');
+select pgflow.add_step('dep_flow', 'second', ARRAY['first']);
+select pgflow.start_flow('dep_flow', '{"original": "input"}'::jsonb);
+
+select pgflow_tests.ensure_worker('dep_flow');
+
+-- Complete first step
+with poll_result as (
+  select * from pgflow_tests.read_and_start('dep_flow', 1, 1)
+)
+select pgflow.complete_task(
+  run_id,
+  step_slug,
+  task_index,
+  '{"first_result": "done"}'::jsonb
+) from poll_result;
+
+-- Start second step and verify flow_input is NULL
+select pgflow_tests.ensure_worker('dep_flow', '22222222-2222-2222-2222-222222222222'::uuid);
+with msgs as (
+  select * from pgmq.read_with_poll('dep_flow', 10, 5, 1, 50) limit 1
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'dep_flow',
+    (select ids from msg_ids),
+    '22222222-2222-2222-2222-222222222222'::uuid
+  )
+)
+select is(
+  (select flow_input from started_tasks),
+  NULL::jsonb,
+  'Dependent non-map step should receive NULL flow_input (lazy loaded)'
+);
+
+-- Test 3: Root map step receives NULL flow_input
+-- (flowInput IS the array, redundant to include - workers get the element via task.input)
+select pgflow_tests.reset_db();
+select pgflow.create_flow('root_map_flow');
+select pgflow.add_step('root_map_flow', 'map_step', '{}', null, null, null, null, 'map');
+select pgflow.start_flow('root_map_flow', '[1, 2, 3]'::jsonb);
+
+select pgflow_tests.ensure_worker('root_map_flow');
+
+with msgs as (
+  select * from pgmq.read_with_poll('root_map_flow', 10, 5, 3, 50) limit 1
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'root_map_flow',
+    (select ids from msg_ids),
+    '11111111-1111-1111-1111-111111111111'::uuid
+  )
+)
+select is(
+  (select flow_input from started_tasks limit 1),
+  NULL::jsonb,
+  'Root map step should receive NULL flow_input (flowInput is the array itself)'
+);
+
+-- Test 4: Dependent map step receives NULL flow_input
+select pgflow_tests.reset_db();
+select pgflow.create_flow('dep_map_flow');
+select pgflow.add_step('dep_map_flow', 'fetch_items', '{}', null, null, null, null, 'single');
+select pgflow.add_step('dep_map_flow', 'process_item', ARRAY['fetch_items'], null, null, null, null, 'map');
+select pgflow.start_flow('dep_map_flow', '{"config": "value"}'::jsonb);
+
+select pgflow_tests.ensure_worker('dep_map_flow');
+
+-- Complete first step with array
+with poll_result as (
+  select * from pgflow_tests.read_and_start('dep_map_flow', 1, 1)
+)
+select pgflow.complete_task(
+  run_id,
+  step_slug,
+  task_index,
+  '["a", "b", "c"]'::jsonb
+) from poll_result;
+
+-- Start map step and verify flow_input is NULL
+select pgflow_tests.ensure_worker('dep_map_flow', '33333333-3333-3333-3333-333333333333'::uuid);
+with msgs as (
+  select * from pgmq.read_with_poll('dep_map_flow', 10, 5, 3, 50) limit 1
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'dep_map_flow',
+    (select ids from msg_ids),
+    '33333333-3333-3333-3333-333333333333'::uuid
+  )
+)
+select is(
+  (select flow_input from started_tasks limit 1),
+  NULL::jsonb,
+  'Dependent map step should receive NULL flow_input (lazy loaded)'
+);
+
+-- Test 5: Multiple parallel root non-map steps all receive flow_input
+select pgflow_tests.reset_db();
+select pgflow.create_flow('parallel_flow');
+select pgflow.add_step('parallel_flow', 'step1');
+select pgflow.add_step('parallel_flow', 'step2'); -- parallel step, no deps
+select pgflow.start_flow('parallel_flow', '{"batch": "test"}'::jsonb);
+
+select pgflow_tests.ensure_worker('parallel_flow');
+
+with msgs as (
+  select * from pgmq.read_with_poll('parallel_flow', 10, 5, 2, 50)
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'parallel_flow',
+    (select ids from msg_ids),
+    '11111111-1111-1111-1111-111111111111'::uuid
+  )
+)
+select ok(
+  (select bool_and(flow_input = '{"batch": "test"}'::jsonb) from started_tasks),
+  'All parallel root non-map steps should receive flow_input'
+);
+
+-- Test 6: Mixed batch - only root non-map steps receive flow_input
+-- This tests a complex scenario with different step types in same batch
+select pgflow_tests.reset_db();
+select pgflow.create_flow('mixed_flow');
+select pgflow.add_step('mixed_flow', 'root_single'); -- root non-map
+select pgflow.add_step('mixed_flow', 'root_array');  -- root non-map (array is just single that returns array)
+select pgflow.start_flow('mixed_flow', '{"data": "value"}'::jsonb);
+
+select pgflow_tests.ensure_worker('mixed_flow');
+
+-- Both are root non-map steps, both should get flow_input
+with msgs as (
+  select * from pgmq.read_with_poll('mixed_flow', 10, 5, 2, 50)
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'mixed_flow',
+    (select ids from msg_ids),
+    '11111111-1111-1111-1111-111111111111'::uuid
+  )
+)
+select is(
+  (select count(*)::int from started_tasks where flow_input is not null),
+  2,
+  'Both root non-map steps should receive non-null flow_input'
+);
+
+select finish();
+rollback;
diff --git a/pkgs/core/supabase/tests/start_tasks/returns_flow_input.test.sql b/pkgs/core/supabase/tests/start_tasks/returns_flow_input.test.sql
index 18d064676..cea8c936d 100644
--- a/pkgs/core/supabase/tests/start_tasks/returns_flow_input.test.sql
+++ b/pkgs/core/supabase/tests/start_tasks/returns_flow_input.test.sql
@@ -2,7 +2,15 @@ begin;
 select plan(5);
 select pgflow_tests.reset_db();
 
--- Test 1: Root step returns flow_input matching original run input
+-- =========================================================================
+-- Test: flow_input in start_tasks (conditional inclusion)
+--
+-- Only root non-map steps receive flow_input.
+-- Other step types (dependent, map) receive NULL for efficiency.
+-- Workers lazy-load flow_input via ctx.flowInput when needed.
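+-- A worker that needs the input later fetches it on demand (see
+-- FlowInputProvider), roughly: SELECT input FROM pgflow.runs WHERE run_id = ...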
+-- =========================================================================
+
+-- Test 1: Root non-map step returns flow_input matching original run input
 select pgflow.create_flow('simple_flow');
 select pgflow.add_step('simple_flow', 'root_step');
 select pgflow.start_flow('simple_flow', '{"user_id": "abc123", "config": {"debug": true}}'::jsonb);
@@ -25,10 +33,10 @@ started_tasks as (
 select is(
   (select flow_input from started_tasks),
   '{"user_id": "abc123", "config": {"debug": true}}'::jsonb,
-  'Root step flow_input should match original run input'
+  'Root non-map step flow_input should match original run input'
 );
 
--- Test 2: Dependent step also returns flow_input
+-- Test 2: Dependent step returns NULL flow_input (lazy loaded by worker)
 select pgflow_tests.reset_db();
 select pgflow.create_flow('dep_flow');
 select pgflow.add_step('dep_flow', 'first');
@@ -48,7 +56,7 @@ select pgflow.complete_task(
   '{"first_result": "done"}'::jsonb
 ) from poll_result;
 
--- Start second step and verify flow_input
+-- Start second step and verify flow_input is NULL
 select pgflow_tests.ensure_worker('dep_flow', '22222222-2222-2222-2222-222222222222'::uuid);
 with msgs as (
   select * from pgmq.read_with_poll('dep_flow', 10, 5, 1, 50) limit 1
@@ -65,12 +73,12 @@ started_tasks as (
 )
 select is(
   (select flow_input from started_tasks),
-  '{"original": "input"}'::jsonb,
-  'Dependent step flow_input should match original run input'
+  NULL::jsonb,
+  'Dependent step flow_input should be NULL (lazy loaded by worker)'
 );
 
--- Tests 3 & 4: Root map step returns flow_input (the array)
--- All map tasks should have same flow_input and it should be the original array
+-- Tests 3 & 4: Root map step returns NULL flow_input
+-- Map steps receive NULL because flowInput IS the array (redundant to duplicate)
 select pgflow_tests.reset_db();
 select pgflow.create_flow('map_flow');
 select pgflow.add_step('map_flow', 'map_step', '{}', null, null, null, null, 'map');
@@ -92,23 +100,23 @@ select * from pgflow.start_tasks(
   '11111111-1111-1111-1111-111111111111'::uuid
 );
 
--- Test 3: All map tasks should have same flow_input
+-- Test 3: All map tasks should have NULL flow_input (consistent)
 select is(
-  (select count(distinct flow_input)::int from map_started_tasks),
+  (select count(distinct coalesce(flow_input::text, 'NULL'))::int from map_started_tasks),
   1,
-  'All map tasks should have same flow_input'
+  'All map tasks should have consistent flow_input (all NULL)'
 );
 
--- Test 4: Map task flow_input is the original array, not the element
+-- Test 4: Map task flow_input is NULL (lazy loaded by worker)
 select is(
   (select flow_input from map_started_tasks limit 1),
-  '[1, 2, 3]'::jsonb,
-  'Map task flow_input should be original array, not individual element'
+  NULL::jsonb,
+  'Map task flow_input should be NULL (lazy loaded by worker)'
 );
 
 drop table map_started_tasks;
 
--- Test 5: Multiple tasks in batch all have correct flow_input
+-- Test 5: Multiple parallel root non-map tasks all have correct flow_input
 select pgflow_tests.reset_db();
 select pgflow.create_flow('multi_flow');
 select pgflow.add_step('multi_flow', 'step1');
@@ -133,7 +141,7 @@ started_tasks as (
 )
 select ok(
   (select bool_and(flow_input = '{"batch": "test"}'::jsonb) from started_tasks),
-  'All tasks in batch should have correct flow_input'
+  'All parallel root non-map tasks should have correct flow_input'
 );
 
 select finish();
diff --git a/pkgs/dsl/__tests__/integration/array-integration.test.ts b/pkgs/dsl/__tests__/integration/array-integration.test.ts
index 38f2724eb..bc98b2246 100644
--- a/pkgs/dsl/__tests__/integration/array-integration.test.ts
+++ b/pkgs/dsl/__tests__/integration/array-integration.test.ts
@@ -20,11 +20,12 @@ describe('Array Integration Tests', () => {
       }))
     )
     // Filter users based on criteria
-    .array({ slug: 'filtered_users', dependsOn: ['users'] }, (deps, ctx) =>
-      ctx.flowInput.includeInactive
+    .array({ slug: 'filtered_users', dependsOn: ['users'] }, async (deps, ctx) => {
+      const flowInput = await ctx.flowInput;
+      return flowInput.includeInactive
         ? deps.users
-        : deps.users.filter(user => user.active)
-    )
+        : deps.users.filter(user => user.active);
+    })
     // Calculate statistics from filtered users
     .step({ slug: 'stats', dependsOn: ['filtered_users'] }, (deps) => ({
       count: deps.filtered_users.length,
@@ -54,7 +55,7 @@ describe('Array Integration Tests', () => {
     const filteredHandler = dataFlow.getStepDefinition('filtered_users').handler;
     const filteredResult = await filteredHandler({
       users: usersResult
-    }, { flowInput: input });
+    }, { flowInput: Promise.resolve(input) } as any);
 
     const statsHandler = dataFlow.getStepDefinition('stats').handler;
     const statsResult = await statsHandler({
@@ -92,8 +93,9 @@ describe('Array Integration Tests', () => {
       }))
     )
     // Build level 1 from level 0
-    .array({ slug: 'level_1', dependsOn: ['level_0'] }, (deps, ctx) => {
-      if (ctx.flowInput.levels <= 1) return [];
+    .array({ slug: 'level_1', dependsOn: ['level_0'] }, async (deps, ctx) => {
+      const flowInput = await ctx.flowInput;
+      if (flowInput.levels <= 1) return [];
       return deps.level_0.map(item => ({
         id: item.id + 100,
         level: 1,
@@ -102,8 +104,9 @@ describe('Array Integration Tests', () => {
       }));
     })
     // Build level 2 from level 1
-    .array({ slug: 'level_2', dependsOn: ['level_1'] }, (deps, ctx) => {
-      if (ctx.flowInput.levels <= 2) return [];
+    .array({ slug: 'level_2', dependsOn: ['level_1'] }, async (deps, ctx) => {
+      const flowInput = await ctx.flowInput;
+      if (flowInput.levels <= 2) return [];
       return deps.level_1.map(item => ({
         id: item.id + 100,
         level: 2,
@@ -135,10 +138,10 @@ describe('Array Integration Tests', () => {
     const level0Result = await level0Handler(input);
 
     const level1Handler = pyramidFlow.getStepDefinition('level_1').handler;
-    const level1Result = await level1Handler({ level_0: level0Result }, { flowInput: input });
+    const level1Result = await level1Handler({ level_0: level0Result }, { flowInput: Promise.resolve(input) } as any);
 
     const level2Handler = pyramidFlow.getStepDefinition('level_2').handler;
-    const level2Result = await level2Handler({ level_1: level1Result }, { flowInput: input });
+    const level2Result = await level2Handler({ level_1: level1Result }, { flowInput: Promise.resolve(input) } as any);
 
     const summaryHandler = pyramidFlow.getStepDefinition('summary').handler;
     const summaryResult = await summaryHandler({
@@ -187,12 +190,13 @@ describe('Array Integration Tests', () => {
       }))
     )
     // Batch processing (regular step that groups array data)
-    .step({ slug: 'batches', dependsOn: ['raw_data', 'config'] }, (deps, ctx) => {
+    .step({ slug: 'batches', dependsOn: ['raw_data', 'config'] }, async (deps, ctx) => {
+      const flowInput = await ctx.flowInput;
       const batches = [];
-      for (let i = 0; i < deps.raw_data.length; i += ctx.flowInput.batchSize) {
+      for (let i = 0; i < deps.raw_data.length; i += flowInput.batchSize) {
         batches.push({
-          id: Math.floor(i / ctx.flowInput.batchSize),
-          items: deps.raw_data.slice(i, i + ctx.flowInput.batchSize),
+          id: Math.floor(i / flowInput.batchSize),
+          items: deps.raw_data.slice(i, i + flowInput.batchSize),
           mode: deps.config.processingMode
         });
       }
       return batches;
@@ -238,7 +242,7 @@ describe('Array Integration Tests', () => {
     const batchesResult = await batchesHandler({
       raw_data: rawDataResult,
       config: configResult
-    }, { flowInput: input });
+    }, { flowInput: Promise.resolve(input) } as any);
 
     const processedHandler = processingFlow.getStepDefinition('processed_batches').handler;
     const processedResult = await processedHandler({
diff --git a/pkgs/dsl/__tests__/runtime/array-method.test.ts b/pkgs/dsl/__tests__/runtime/array-method.test.ts
index 971586e46..0130861e7 100644
--- a/pkgs/dsl/__tests__/runtime/array-method.test.ts
+++ b/pkgs/dsl/__tests__/runtime/array-method.test.ts
@@ -255,9 +255,10 @@ describe('.array() method', () => {
   it('allows steps to depend on array steps', async () => {
     const testFlow = new Flow<{ multiplier: number }>({ slug: 'test_flow' })
       .array({ slug: 'numbers' }, () => [1, 2, 3])
-      .step({ slug: 'sum', dependsOn: ['numbers'] }, (deps, ctx) =>
-        deps.numbers.reduce((acc, n) => acc + n * ctx.flowInput.multiplier, 0)
-      );
+      .step({ slug: 'sum', dependsOn: ['numbers'] }, async (deps, ctx) => {
+        const flowInput = await ctx.flowInput;
+        return deps.numbers.reduce((acc, n) => acc + n * flowInput.multiplier, 0);
+      });
 
     const numbersDef = testFlow.getStepDefinition('numbers');
     const numbersResult = await numbersDef.handler({ multiplier: 2 });
@@ -265,7 +266,7 @@ describe('.array() method', () => {
     const sumDef = testFlow.getStepDefinition('sum');
     const sumResult = await sumDef.handler({
       numbers: numbersResult
-    }, { flowInput: { multiplier: 2 } });
+    }, { flowInput: Promise.resolve({ multiplier: 2 }) } as any);
 
     expect(numbersResult).toEqual([1, 2, 3]);
     expect(sumResult).toBe(12); // (1 + 2 + 3) * 2
@@ -274,9 +275,10 @@ describe('.array() method', () => {
   it('allows array steps to depend on other array steps', async () => {
     const testFlow = new Flow<{ factor: number }>({ slug: 'test_flow' })
      .array({ slug: 'base_numbers' }, () => [1, 2, 3])
-      .array({ slug: 'doubled', dependsOn: ['base_numbers'] }, (deps, ctx) =>
-        deps.base_numbers.map(n => n * ctx.flowInput.factor)
-      );
+      .array({ slug: 'doubled', dependsOn: ['base_numbers'] }, async (deps, ctx) => {
+        const flowInput = await ctx.flowInput;
+        return deps.base_numbers.map(n => n * flowInput.factor);
+      });
 
     const baseDef = testFlow.getStepDefinition('base_numbers');
     const baseResult = await baseDef.handler({ factor: 2 });
@@ -284,7 +286,7 @@ describe('.array() method', () => {
     const doubledDef = testFlow.getStepDefinition('doubled');
     const doubledResult = await doubledDef.handler({
       base_numbers: baseResult
-    }, { flowInput: { factor: 2 } });
+    }, { flowInput: Promise.resolve({ factor: 2 }) } as any);
 
     expect(baseResult).toEqual([1, 2, 3]);
     expect(doubledResult).toEqual([2, 4, 6]);
diff --git a/pkgs/dsl/__tests__/types/context-inference.test-d.ts b/pkgs/dsl/__tests__/types/context-inference.test-d.ts
index 7ca2fafe7..0d2ddf6e6 100644
--- a/pkgs/dsl/__tests__/types/context-inference.test-d.ts
+++ b/pkgs/dsl/__tests__/types/context-inference.test-d.ts
@@ -84,8 +84,8 @@ describe('Context Type Inference Tests', () => {
       expectTypeOf(deps.double.doubled).toEqualTypeOf();
       // And still has custom context
       expectTypeOf(context.multiplier).toEqualTypeOf();
-      // Access flow input via context.flowInput
-      expectTypeOf(context.flowInput.initial).toEqualTypeOf();
+      // Access flow input via context.flowInput (Promise type)
+      expectTypeOf(context.flowInput).toEqualTypeOf<Promise<{ initial: number }>>();
 
       return { formatted: String(deps.double.doubled) };
     });
diff --git a/pkgs/dsl/__tests__/types/extract-flow-leaf-steps.test-d.ts b/pkgs/dsl/__tests__/types/extract-flow-leaf-steps.test-d.ts
index 1c9f1487c..1767aa294 100644
--- a/pkgs/dsl/__tests__/types/extract-flow-leaf-steps.test-d.ts
+++ b/pkgs/dsl/__tests__/types/extract-flow-leaf-steps.test-d.ts
@@ -96,10 +96,13 @@ describe('ExtractFlowLeafSteps utility type', () => {
       .step({ slug: 'step3', dependsOn: ['step1'] }, (deps) => ({
         value: deps.step1.value - 1,
       }))
-      .step({ slug: 'step4', dependsOn: ['step2', 'step3'] }, (deps, ctx) => ({
-        sum: deps.step2.value + deps.step3.value,
-        original: ctx.flowInput.input,
-      }));
+      .step({ slug: 'step4', dependsOn: ['step2', 'step3'] }, async (deps, ctx) => {
+        const flowInput = await ctx.flowInput;
+        return {
+          sum: deps.step2.value + deps.step3.value,
+          original: flowInput.input,
+        };
+      });
 
     type LeafSteps = ExtractFlowLeafSteps;
diff --git a/pkgs/dsl/__tests__/types/extract-flow-output.test-d.ts b/pkgs/dsl/__tests__/types/extract-flow-output.test-d.ts
index a5b6ac280..b69410912 100644
--- a/pkgs/dsl/__tests__/types/extract-flow-output.test-d.ts
+++ b/pkgs/dsl/__tests__/types/extract-flow-output.test-d.ts
@@ -112,10 +112,13 @@ describe('ExtractFlowOutput utility type', () => {
       .step({ slug: 'step3', dependsOn: ['step1'] }, (deps) => ({
         value: deps.step1.value - 1,
       }))
-      .step({ slug: 'step4', dependsOn: ['step2', 'step3'] }, (deps, ctx) => ({
-        sum: deps.step2.value + deps.step3.value,
-        original: ctx.flowInput.input,
-      }));
+      .step({ slug: 'step4', dependsOn: ['step2', 'step3'] }, async (deps, ctx) => {
+        const flowInput = await ctx.flowInput;
+        return {
+          sum: deps.step2.value + deps.step3.value,
+          original: flowInput.input,
+        };
+      });
 
     type FlowOutput = ExtractFlowOutput;
diff --git a/pkgs/dsl/__tests__/types/map-method.test-d.ts b/pkgs/dsl/__tests__/types/map-method.test-d.ts
index 492c259af..cb52f264a 100644
--- a/pkgs/dsl/__tests__/types/map-method.test-d.ts
+++ b/pkgs/dsl/__tests__/types/map-method.test-d.ts
@@ -177,8 +177,8 @@ describe('.map() method type constraints', () => {
       // Let TypeScript infer the full context type
      expectTypeOf(context.env).toEqualTypeOf();
       expectTypeOf(context.shutdownSignal).toEqualTypeOf();
-      // Context should include flowInput for map steps
-      expectTypeOf(context.flowInput).toEqualTypeOf();
+      // Context should include flowInput as Promise for map steps
+      expectTypeOf(context.flowInput).toEqualTypeOf<Promise<string[]>>();
 
       return String(item);
     });
diff --git a/pkgs/dsl/__tests__/types/step-input.test-d.ts b/pkgs/dsl/__tests__/types/step-input.test-d.ts
index 4f13d485a..c95f47db5 100644
--- a/pkgs/dsl/__tests__/types/step-input.test-d.ts
+++ b/pkgs/dsl/__tests__/types/step-input.test-d.ts
@@ -181,8 +181,8 @@ describe('StepInput utility type (asymmetric)', () => {
       .step({ slug: 'process', dependsOn: ['fetch'] }, (deps, ctx) => {
         // deps should only contain dependencies
         expectTypeOf(deps).toMatchTypeOf<{ fetch: { data: string } }>();
-        // flowInput available via context
-        expectTypeOf(ctx.flowInput).toMatchTypeOf<{ userId: string }>();
+        // flowInput available via context as Promise
+        expectTypeOf(ctx.flowInput).toMatchTypeOf<Promise<{ userId: string }>>();
 
         return { result: deps.fetch.data };
       });
diff --git a/pkgs/dsl/__tests__/types/supabase-preset.test-d.ts b/pkgs/dsl/__tests__/types/supabase-preset.test-d.ts
index 77353949f..84451dcd9 100644
--- a/pkgs/dsl/__tests__/types/supabase-preset.test-d.ts
+++ b/pkgs/dsl/__tests__/types/supabase-preset.test-d.ts
@@ -177,8 +177,8 @@ describe('Supabase Flow Type Tests', () => {
       .step({ slug: 'format', dependsOn: ['multiply'] }, (deps, context) => {
         // Should have proper step input inference from dependencies (deps only, no run key)
         expectTypeOf(deps.multiply.result).toEqualTypeOf();
-        // Access flow input via context.flowInput
-        expectTypeOf(context.flowInput.initial).toEqualTypeOf();
+        // Access flow input via context.flowInput (Promise type)
+        expectTypeOf(context.flowInput).toEqualTypeOf<Promise<{ initial: number }>>();
 
         // Should still have all context
         expectTypeOf(context.sql).toEqualTypeOf();
diff --git a/pkgs/dsl/src/dsl.ts b/pkgs/dsl/src/dsl.ts
index 11c261c6e..b96fc43a3 100644
--- a/pkgs/dsl/src/dsl.ts
+++ b/pkgs/dsl/src/dsl.ts
@@ -302,11 +302,16 @@ export interface BaseContext {
   workerConfig: Readonly;
 }
 
-// Flow context extends base with stepTask and flowInput
-// TFlowInput is typed as Json by default but gets properly typed via method overloads
+/**
+ * Flow context extends base with stepTask and flowInput.
+ *
+ * Note: flowInput is a Promise to support lazy loading. Only root non-map steps
+ * receive flow_input from SQL; other step types lazy-load it on demand.
+ * Use `await ctx.flowInput` to access the original flow input.
+ */
 export interface FlowContext<TFlowInput extends Json = Json> extends BaseContext {
   stepTask: StepTaskRecord;
-  flowInput: TFlowInput;
+  flowInput: Promise<TFlowInput>;
 }
 
 // Generic context type helper (uses FlowContext for flow handlers)
diff --git a/pkgs/dsl/src/example-flow.ts b/pkgs/dsl/src/example-flow.ts
index 22fa98995..54fa3ad73 100644
--- a/pkgs/dsl/src/example-flow.ts
+++ b/pkgs/dsl/src/example-flow.ts
@@ -28,10 +28,11 @@ export const AnalyzeWebsite = new Flow({
   )
   .step(
     { slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] },
-    // Dependent step needing flowInput: access via ctx.flowInput
+    // Dependent step needing flowInput: access via await ctx.flowInput
     async (deps, ctx) => {
+      const flowInput = await ctx.flowInput;
       const results = await saveToDb({
-        websiteUrl: ctx.flowInput.url,
+        websiteUrl: flowInput.url,
         sentiment: deps.sentiment.score,
         summary: deps.summary.aiSummary,
       });
diff --git a/pkgs/edge-worker/src/core/context.ts b/pkgs/edge-worker/src/core/context.ts
index cba1a12e9..47d0e70c6 100644
--- a/pkgs/edge-worker/src/core/context.ts
+++ b/pkgs/edge-worker/src/core/context.ts
@@ -26,11 +26,17 @@ export interface MessageExecution {
   workerConfig: Readonly;
 }
 
+/**
+ * Execution context for step task handlers.
+ *
+ * Note: flowInput is a Promise to support lazy loading. Only root non-map steps
+ * receive flow_input from SQL; other step types lazy-load it on demand.
+ */
 export interface StepTaskExecution<TFlow extends AnyFlow> {
   rawMessage: PgmqMessageRecord;
   stepTask   : StepTaskRecord<TFlow>;
   workerConfig: Readonly;
-  flowInput: ExtractFlowInput<TFlow>;
+  flowInput: Promise<ExtractFlowInput<TFlow>>;
 }
 
 /** Message handler context for any platform */
@@ -49,11 +55,18 @@ export type StepTaskContext<
    3. UTILITIES
--------------------------------------------------------------------- */
 
+/**
+ * Combined task and message data from polling.
+ *
+ * Note: flowInput is nullable because start_tasks only provides it for root
+ * non-map steps. The executor will create a Promise that either resolves
+ * immediately (if provided) or lazy-loads from the runs table.
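+ * See FlowInputProvider for the per-run caching and request deduplication.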
+ */
 export interface StepTaskWithMessage<TFlow extends AnyFlow> {
   msg_id : number;
   message: PgmqMessageRecord;
   task   : StepTaskRecord<TFlow>;
-  flowInput: ExtractFlowInput<TFlow>;
+  flowInput: ExtractFlowInput<TFlow> | null;
 }
 
 import { deepClone, deepFreeze } from './deepUtils.js';
diff --git a/pkgs/edge-worker/src/flow/FlowInputProvider.ts b/pkgs/edge-worker/src/flow/FlowInputProvider.ts
new file mode 100644
index 000000000..38ce852b5
--- /dev/null
+++ b/pkgs/edge-worker/src/flow/FlowInputProvider.ts
@@ -0,0 +1,92 @@
+import type postgres from 'postgres';
+import type { AnyFlow, ExtractFlowInput } from '@pgflow/dsl';
+
+/**
+ * Provides lazy loading and caching of flow input for step handlers.
+ *
+ * When `start_tasks` returns `flow_input = NULL` (for dependent and map steps),
+ * handlers that need to access the original flow input can do so via
+ * `await ctx.flowInput`. This class:
+ *
+ * 1. Caches flow input per run_id to avoid duplicate fetches
+ * 2. Deduplicates concurrent requests for the same run_id
+ * 3. Populates the cache from task records when flow_input is provided
+ *
+ * This optimization reduces data transfer significantly for map steps
+ * processing large arrays, where each of potentially thousands of tasks
+ * would otherwise carry the full flow input.
+ */
+export class FlowInputProvider<TFlow extends AnyFlow> {
+  private cache = new Map<string, ExtractFlowInput<TFlow>>();
+  private pending = new Map<string, Promise<ExtractFlowInput<TFlow>>>();
+
+  constructor(private readonly sql: postgres.Sql) {}
+
+  /**
+   * Get flow input for a run, fetching from the DB if not cached.
+   * Concurrent requests for the same run_id share the same promise.
+   */
+  get(runId: string): Promise<ExtractFlowInput<TFlow>> {
+    // 1. Check cache
+    const cached = this.cache.get(runId);
+    if (cached !== undefined) {
+      return Promise.resolve(cached);
+    }
+
+    // 2. Dedupe concurrent awaits for same run_id
+    const pending = this.pending.get(runId);
+    if (pending) {
+      return pending;
+    }
+
+    // 3. Fetch and cache
+    const promise = this.fetchAndCache(runId);
+    this.pending.set(runId, promise);
+    return promise;
+  }
+
+  /**
+   * Populate cache from task record when flow_input is provided by SQL.
+   * Called by StepTaskPoller when processing root non-map steps.
+   */
+  populate(runId: string, flowInput: ExtractFlowInput<TFlow>): void {
+    if (!this.cache.has(runId)) {
+      this.cache.set(runId, flowInput);
+    }
+  }
+
+  /**
+   * Check if flow input is cached for a run.
+   * Useful for testing and debugging.
+   */
+  has(runId: string): boolean {
+    return this.cache.has(runId);
+  }
+
+  /**
+   * Clear all cached flow inputs.
+   * Useful for testing or when the worker restarts.
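+   * (cached inputs are otherwise retained for the provider's lifetime)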
+ */ + clear(): void { + this.cache.clear(); + this.pending.clear(); + } + + private async fetchAndCache(runId: string): Promise> { + try { + const [row] = await this.sql<{ input: ExtractFlowInput }[]>` + SELECT input FROM pgflow.runs WHERE run_id = ${runId} + `; + + if (!row) { + throw new Error(`Run not found: ${runId}`); + } + + const flowInput = row.input; + this.cache.set(runId, flowInput); + return flowInput; + } finally { + this.pending.delete(runId); + } + } +} diff --git a/pkgs/edge-worker/src/flow/StepTaskExecutor.ts b/pkgs/edge-worker/src/flow/StepTaskExecutor.ts index cc339d6ec..9f77fffeb 100644 --- a/pkgs/edge-worker/src/flow/StepTaskExecutor.ts +++ b/pkgs/edge-worker/src/flow/StepTaskExecutor.ts @@ -111,8 +111,8 @@ export class StepTaskExecutor(sql); + // Create StepTaskPoller with two-phase approach const pollerConfig: StepTaskPollerConfig = { batchSize: resolvedConfig.batchSize, @@ -122,7 +126,15 @@ export function createFlowWorker, signal: AbortSignal ): IExecutor => { + const runId = taskWithMessage.task.run_id; + + // Populate cache if flow_input was provided by SQL (root non-map steps only) + if (taskWithMessage.flowInput !== null) { + flowInputProvider.populate(runId, taskWithMessage.flowInput); + } + // Build context directly using platform resources + // flowInput is a Promise that either resolves immediately (cached) or lazy-loads const context: FlowContext & TResources = { // Core platform resources env: platformAdapter.env, @@ -132,7 +144,7 @@ export function createFlowWorker(params: { rawMessage, stepTask, workerConfig: defaultWorkerConfig, - flowInput, + flowInput: Promise.resolve(flowInput), // Supabase-specific resources (always present) sql, diff --git a/pkgs/edge-worker/tests/integration/flow/mapFlow.test.ts b/pkgs/edge-worker/tests/integration/flow/mapFlow.test.ts index a62636229..93f4fbd50 100644 --- a/pkgs/edge-worker/tests/integration/flow/mapFlow.test.ts +++ b/pkgs/edge-worker/tests/integration/flow/mapFlow.test.ts @@ -32,8 +32,9 @@ const DependentMapFlow = new Flow<{ prefix: string }>({ slug: 'test_dependent_ma }) .step({ slug: 'aggregate', dependsOn: ['uppercase'] }, async (deps, ctx) => { await delay(1); + const flowInput = await ctx.flowInput; return { - prefix: ctx.flowInput.prefix, + prefix: flowInput.prefix, processed: deps.uppercase, count: deps.uppercase.length }; diff --git a/pkgs/edge-worker/tests/unit/contextUtils.test.ts b/pkgs/edge-worker/tests/unit/contextUtils.test.ts index 2c76f24ea..d7b0bddc2 100644 --- a/pkgs/edge-worker/tests/unit/contextUtils.test.ts +++ b/pkgs/edge-worker/tests/unit/contextUtils.test.ts @@ -57,9 +57,9 @@ const mockStepTask = { step_slug: 'test-step', input: { run: { test: 'input' } }, msg_id: 123, - flow_input: mockFlowInput, + flow_input: mockFlowInput, // Can be actual value or null - test helper wraps in Promise task_index: 0 -} as StepTaskRecord; +} as unknown as StepTaskRecord; Deno.test('createSupabaseMessageContext - creates context with all Supabase resources', () => { const context = createSupabaseMessageContext({ @@ -102,7 +102,7 @@ Deno.test('createTestMessageContext - allows custom resources for testing', () = assertEquals((context as unknown as { customResource: string }).customResource, 'test-value'); }); -Deno.test('createSupabaseStepTaskContext - creates context with step task', () => { +Deno.test('createSupabaseStepTaskContext - creates context with step task', async () => { const context = createSupabaseStepTaskContext({ env: fullEnv, sql: mockSql, @@ -118,7 +118,9 @@ 
Deno.test('createSupabaseStepTaskContext - creates context with step task', () = assertEquals(context.shutdownSignal, mockAbortSignal); assertEquals(context.stepTask, mockStepTask); assertEquals(context.rawMessage, mockStepMessage); - assertEquals(context.flowInput, mockFlowInput); + + // flowInput is now a Promise + assertEquals(await context.flowInput, mockFlowInput); // Supabase client should always be present assertExists(context.supabase); diff --git a/pkgs/website/src/content/docs/news/pgflow-0-12-0-simpler-handler-signatures-for-flow-composition.mdx b/pkgs/website/src/content/docs/news/pgflow-0-12-0-simpler-handler-signatures-for-flow-composition.mdx index 9de8fd4e5..fcc533ec9 100644 --- a/pkgs/website/src/content/docs/news/pgflow-0-12-0-simpler-handler-signatures-for-flow-composition.mdx +++ b/pkgs/website/src/content/docs/news/pgflow-0-12-0-simpler-handler-signatures-for-flow-composition.mdx @@ -24,8 +24,8 @@ Handler signatures are now asymmetric based on step type: |-----------|--------|-------| | Root step | `(input) => input.run.xxx` | `(flowInput, ctx) => flowInput.xxx` | | Dependent step | `(input) => input.dep.xxx` | `(deps, ctx) => deps.dep.xxx` | -| Dependent (needs flowInput) | `(input) => input.run.xxx` | `(deps, ctx) => ctx.flowInput.xxx` | -| Map step | `(item) => ...` | `(item, ctx) => ctx.flowInput.xxx` | +| Dependent (needs flowInput) | `(input) => input.run.xxx` | `async (deps, ctx) => (await ctx.flowInput).xxx` | +| Map step | `(item) => ...` | `async (item, ctx) => (await ctx.flowInput).xxx` |
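
A minimal sketch of the new asymmetric signatures end to end, using the `Flow` builder as shown in the DSL tests above (the flow slug and input shape here are hypothetical, not taken from this changeset):

```ts
import { Flow } from '@pgflow/dsl';

// Hypothetical flow demonstrating the 0.12.0 handler signatures
const GreetFlow = new Flow<{ name: string }>({ slug: 'greet_flow' })
  // Root step: flow input arrives directly as the first parameter
  .step({ slug: 'fetch' }, (flowInput) => ({ greeting: `Hello, ${flowInput.name}` }))
  // Dependent step: first parameter carries only dependency outputs;
  // the original flow input sits behind a lazy-loaded Promise
  .step({ slug: 'save', dependsOn: ['fetch'] }, async (deps, ctx) => {
    const flowInput = await ctx.flowInput; // at most one runs-table fetch per run
    return { saved: deps.fetch.greeting, forUser: flowInput.name };
  });
```

Awaiting a pre-resolved Promise costs nothing for root non-map steps, while dependent and map steps trigger at most one `SELECT input FROM pgflow.runs` per run thanks to FlowInputProvider's cache and request deduplication.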