diff --git a/pkgs/core/schemas/0040_types.sql b/pkgs/core/schemas/0040_types.sql
index 1bd799fc4..102a6cf01 100644
--- a/pkgs/core/schemas/0040_types.sql
+++ b/pkgs/core/schemas/0040_types.sql
@@ -5,5 +5,6 @@ create type pgflow.step_task_record as (
   step_slug text,
   input jsonb,
   msg_id bigint,
-  task_index int
+  task_index int,
+  flow_input jsonb
 );
diff --git a/pkgs/core/schemas/0120_function_start_tasks.sql b/pkgs/core/schemas/0120_function_start_tasks.sql
index 3824bad78..ccdfdfea6 100644
--- a/pkgs/core/schemas/0120_function_start_tasks.sql
+++ b/pkgs/core/schemas/0120_function_start_tasks.sql
@@ -172,7 +172,11 @@ as $$
       coalesce(dep_out.deps_output, '{}'::jsonb)
   END as input,
   st.message_id as msg_id,
-  st.task_index as task_index
+  st.task_index as task_index,
+  -- flow_input: Original run input for worker context
+  -- This allows workers to access the original flow input directly
+  -- without parsing it from the constructed 'input' field
+  r.input as flow_input
 from tasks st
 join runs r on st.run_id = r.run_id
 join pgflow.steps step on
diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts
index 3d7456c87..d37bdfa40 100644
--- a/pkgs/core/src/database-types.ts
+++ b/pkgs/core/src/database-types.ts
@@ -613,6 +613,7 @@ export type Database = {
           input: Json | null
           msg_id: number | null
           task_index: number | null
+          flow_input: Json | null
         }
       }
     }
diff --git a/pkgs/core/supabase/migrations/20251223052347_pgflow_add_flow_input_column.sql b/pkgs/core/supabase/migrations/20251223052347_pgflow_add_flow_input_column.sql
new file mode 100644
index 000000000..bbcf4915d
--- /dev/null
+++ b/pkgs/core/supabase/migrations/20251223052347_pgflow_add_flow_input_column.sql
@@ -0,0 +1,182 @@
+-- Modify "step_task_record" composite type
+ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "flow_input" jsonb;
+-- Modify "start_tasks" function
+CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
+with tasks as (
+  select
+    task.flow_slug,
+    task.run_id,
+    task.step_slug,
+    task.task_index,
+    task.message_id
+  from pgflow.step_tasks as task
+  join pgflow.runs r on r.run_id = task.run_id
+  where task.flow_slug = start_tasks.flow_slug
+    and task.message_id = any(msg_ids)
+    and task.status = 'queued'
+    -- MVP: Don't start tasks on failed runs
+    and r.status != 'failed'
+),
+start_tasks_update as (
+  update pgflow.step_tasks
+  set
+    attempts_count = attempts_count + 1,
+    status = 'started',
+    started_at = now(),
+    last_worker_id = worker_id
+  from tasks
+  where step_tasks.message_id = tasks.message_id
+    and step_tasks.flow_slug = tasks.flow_slug
+    and step_tasks.status = 'queued'
+),
+runs as (
+  select
+    r.run_id,
+    r.input
+  from pgflow.runs r
+  where r.run_id in (select run_id from tasks)
+),
+deps as (
+  select
+    st.run_id,
+    st.step_slug,
+    dep.dep_slug,
+    -- Aggregate map outputs or use single output
+    CASE
+      WHEN dep_step.step_type = 'map' THEN
+        -- Aggregate all task outputs ordered by task_index
+        -- Use COALESCE to return empty array if no tasks
+        (SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
+         FROM pgflow.step_tasks dt
+         WHERE dt.run_id = st.run_id
+           AND dt.step_slug = dep.dep_slug
+           AND dt.status = 'completed')
+      ELSE
+        -- Single step: use the single task output
+        dep_task.output
+    END as dep_output
+  from tasks st
+  join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
+  join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
+  left join pgflow.step_tasks dep_task on
+    dep_task.run_id = st.run_id and
+    dep_task.step_slug = dep.dep_slug and
+    dep_task.status = 'completed'
+    and dep_step.step_type = 'single' -- Only join for single steps
+),
+deps_outputs as (
+  select
+    d.run_id,
+    d.step_slug,
+    jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
+    count(*) as dep_count
+  from deps d
+  group by d.run_id, d.step_slug
+),
+timeouts as (
+  select
+    task.message_id,
+    task.flow_slug,
+    coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
+  from tasks task
+  join pgflow.flows flow on flow.flow_slug = task.flow_slug
+  join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
+),
+-- Batch update visibility timeouts for all messages
+set_vt_batch as (
+  select pgflow.set_vt_batch(
+    start_tasks.flow_slug,
+    array_agg(t.message_id order by t.message_id),
+    array_agg(t.vt_delay order by t.message_id)
+  )
+  from timeouts t
+)
+select
+  st.flow_slug,
+  st.run_id,
+  st.step_slug,
+  -- ==========================================
+  -- INPUT CONSTRUCTION LOGIC
+  -- ==========================================
+  -- This nested CASE statement determines how to construct the input
+  -- for each task based on the step type (map vs non-map).
+  --
+  -- The fundamental difference:
+  -- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
+  -- - Non-map steps: Receive structured objects with named keys
+  --   (e.g., {"run": {...}, "dependency1": {...}})
+  -- ==========================================
+  CASE
+    -- -------------------- MAP STEPS --------------------
+    -- Map steps process arrays element-by-element.
+    -- Each task receives ONE element from the array at its task_index position.
+    WHEN step.step_type = 'map' THEN
+      -- Map steps get raw array elements without any wrapper object
+      CASE
+        -- ROOT MAP: Gets array from run input
+        -- Example: run input = [1, 2, 3]
+        --          task 0 gets: 1
+        --          task 1 gets: 2
+        --          task 2 gets: 3
+        WHEN step.deps_count = 0 THEN
+          -- Root map (deps_count = 0): no dependencies, reads from run input.
+          -- Extract the element at task_index from the run's input array.
+          -- Note: If run input is not an array, this will return NULL
+          -- and the flow will fail (validated in start_flow).
+          jsonb_array_element(r.input, st.task_index)
+
+        -- DEPENDENT MAP: Gets array from its single dependency
+        -- Example: dependency output = ["a", "b", "c"]
+        --          task 0 gets: "a"
+        --          task 1 gets: "b"
+        --          task 2 gets: "c"
+        ELSE
+          -- Has dependencies (should be exactly 1 for map steps).
+          -- Extract the element at task_index from the dependency's output array.
+          --
+          -- Why the subquery with jsonb_each?
+          -- - The dependency outputs a raw array: [1, 2, 3]
+          -- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
+          -- - We need to unwrap and get just the array value
+          -- - Map steps have exactly 1 dependency (enforced by add_step)
+          -- - So jsonb_each will return exactly 1 row
+          -- - We extract the 'value' which is the raw array [1, 2, 3]
+          -- - Then get the element at task_index from that array
+          (SELECT jsonb_array_element(value, st.task_index)
+           FROM jsonb_each(dep_out.deps_output)
+           LIMIT 1)
+      END
+
+    -- -------------------- NON-MAP STEPS --------------------
+    -- Regular (non-map) steps receive ALL inputs as a structured object.
+    -- This includes the original run input plus all dependency outputs.
+    ELSE
+      -- Non-map steps get structured input with named keys
+      -- Example output: {
+      --   "run": {"original": "input"},
+      --   "step1": {"output": "from_step1"},
+      --   "step2": {"output": "from_step2"}
+      -- }
+      --
+      -- Build object with 'run' key containing original input
+      jsonb_build_object('run', r.input) ||
+      -- Merge with deps_output which already has dependency outputs
+      -- deps_output format: {"dep1": output1, "dep2": output2, ...}
+      -- If no dependencies, defaults to empty object
+      coalesce(dep_out.deps_output, '{}'::jsonb)
+  END as input,
+  st.message_id as msg_id,
+  st.task_index as task_index,
+  -- flow_input: Original run input for worker context
+  -- This allows workers to access the original flow input directly
+  -- without parsing it from the constructed 'input' field
+  r.input as flow_input
+from tasks st
+join runs r on st.run_id = r.run_id
+join pgflow.steps step on
+  step.flow_slug = st.flow_slug and
+  step.step_slug = st.step_slug
+left join deps_outputs dep_out on
+  dep_out.run_id = st.run_id and
+  dep_out.step_slug = st.step_slug
+$$;
diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum
index b50746007..1757e7d0b 100644
--- a/pkgs/core/supabase/migrations/atlas.sum
+++ b/pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
-h1:yUEa4QnI0kEuEtxWNiNgRgf9GENvG1FkUmzwlu0vLm0=
+h1:nPCLpuNY7xbRl3lNa/ns7GG1Uf3twA0VSK7DIVBhzsc=
 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,3 +14,4 @@ h1:yUEa4QnI0kEuEtxWNiNgRgf9GENvG1FkUmzwlu0vLm0=
 20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
 20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw=
 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
+20251223052347_pgflow_add_flow_input_column.sql h1:RwQBPOMml4cCR6qeqd5kVQKhxXAbFqkHEJa0ruXYiTo=
diff --git a/pkgs/core/supabase/tests/start_tasks/returns_flow_input.test.sql b/pkgs/core/supabase/tests/start_tasks/returns_flow_input.test.sql
new file mode 100644
index 000000000..18d064676
--- /dev/null
+++ b/pkgs/core/supabase/tests/start_tasks/returns_flow_input.test.sql
@@ -0,0 +1,140 @@
+begin;
+select plan(5);
+select pgflow_tests.reset_db();
+
+-- Test 1: Root step returns flow_input matching original run input
+select pgflow.create_flow('simple_flow');
+select pgflow.add_step('simple_flow', 'root_step');
+select pgflow.start_flow('simple_flow', '{"user_id": "abc123", "config": {"debug": true}}'::jsonb);
+
+select pgflow_tests.ensure_worker('simple_flow');
+
+with msgs as (
+  select * from pgmq.read_with_poll('simple_flow', 10, 5, 1, 50) limit 1
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'simple_flow',
+    (select ids from msg_ids),
+    '11111111-1111-1111-1111-111111111111'::uuid
+  )
+)
+select is(
+  (select flow_input from started_tasks),
+  '{"user_id": "abc123", "config": {"debug": true}}'::jsonb,
+  'Root step flow_input should match original run input'
+);
+
+-- Test 2: Dependent step also returns flow_input
+select pgflow_tests.reset_db();
+select pgflow.create_flow('dep_flow');
+select pgflow.add_step('dep_flow', 'first');
+select pgflow.add_step('dep_flow', 'second', ARRAY['first']);
+select pgflow.start_flow('dep_flow', '{"original": "input"}'::jsonb);
+
+select pgflow_tests.ensure_worker('dep_flow');
+
+-- Complete first step
+with poll_result as (
+  select * from pgflow_tests.read_and_start('dep_flow', 1, 1)
+)
+select pgflow.complete_task(
+  run_id,
+  step_slug,
+  task_index,
+  '{"first_result": "done"}'::jsonb
+) from poll_result;
+
+-- Start second step and verify flow_input
+select pgflow_tests.ensure_worker('dep_flow', '22222222-2222-2222-2222-222222222222'::uuid);
+with msgs as (
+  select * from pgmq.read_with_poll('dep_flow', 10, 5, 1, 50) limit 1
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'dep_flow',
+    (select ids from msg_ids),
+    '22222222-2222-2222-2222-222222222222'::uuid
+  )
+)
+select is(
+  (select flow_input from started_tasks),
+  '{"original": "input"}'::jsonb,
+  'Dependent step flow_input should match original run input'
+);
+
+-- Tests 3 & 4: Root map step returns flow_input (the array)
+-- All map tasks should have same flow_input and it should be the original array
+select pgflow_tests.reset_db();
+select pgflow.create_flow('map_flow');
+select pgflow.add_step('map_flow', 'map_step', '{}', null, null, null, null, 'map');
+select pgflow.start_flow('map_flow', '[1, 2, 3]'::jsonb);
+
+select pgflow_tests.ensure_worker('map_flow');
+
+-- Save map tasks to temp table so we can run multiple assertions
+create temp table map_started_tasks as
+with msgs as (
+  select * from pgmq.read_with_poll('map_flow', 10, 5, 3, 50) order by msg_id
+),
+msg_ids as (
+  select array_agg(msg_id order by msg_id) as ids from msgs
+)
+select * from pgflow.start_tasks(
+  'map_flow',
+  (select ids from msg_ids),
+  '11111111-1111-1111-1111-111111111111'::uuid
+);
+
+-- Test 3: All map tasks should have same flow_input
+select is(
+  (select count(distinct flow_input)::int from map_started_tasks),
+  1,
+  'All map tasks should have same flow_input'
+);
+
+-- Test 4: Map task flow_input is the original array, not the element
+select is(
+  (select flow_input from map_started_tasks limit 1),
+  '[1, 2, 3]'::jsonb,
+  'Map task flow_input should be original array, not individual element'
+);
+
+drop table map_started_tasks;
+
+-- Test 5: Multiple tasks in batch all have correct flow_input
+select pgflow_tests.reset_db();
+select pgflow.create_flow('multi_flow');
+select pgflow.add_step('multi_flow', 'step1');
+select pgflow.add_step('multi_flow', 'step2'); -- parallel step, no deps
+select pgflow.start_flow('multi_flow', '{"batch": "test"}'::jsonb);
+
+select pgflow_tests.ensure_worker('multi_flow');
+
+-- Both tasks should be queued, read both
+with msgs as (
+  select * from pgmq.read_with_poll('multi_flow', 10, 5, 2, 50)
+),
+msg_ids as (
+  select array_agg(msg_id) as ids from msgs
+),
+started_tasks as (
+  select * from pgflow.start_tasks(
+    'multi_flow',
+    (select ids from msg_ids),
+    '11111111-1111-1111-1111-111111111111'::uuid
+  )
+)
+select ok(
+  (select bool_and(flow_input = '{"batch": "test"}'::jsonb) from started_tasks),
+  'All tasks in batch should have correct flow_input'
+);
+
+select finish();
+rollback;
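
Worker-side usage (illustrative sketch, not part of the diff): the new flow_input column lets a worker hand the original run input to step handlers without reverse-engineering it from the constructed input field. In the TypeScript below, only the StepTaskRecord fields mirror the step_task_record composite type above; the executeTask helper and HandlerContext shape are hypothetical, not pgflow's actual worker API.

// Supabase-style Json type, inlined so the sketch is self-contained.
type Json = string | number | boolean | null | { [key: string]: Json } | Json[];

// Mirrors pgflow.step_task_record after this migration.
interface StepTaskRecord {
  flow_slug: string;
  run_id: string;
  step_slug: string;
  input: Json;      // per-task input: raw array element for map steps,
                    // {"run": ..., "<dep>": ...} object for non-map steps
  msg_id: number;
  task_index: number;
  flow_input: Json; // original runs.input, identical for every task in the run
}

// Hypothetical context a worker could pass to step handlers.
interface HandlerContext {
  flowInput: Json;  // surfaced directly from task.flow_input
  taskIndex: number;
}

type StepHandler = (input: Json, ctx: HandlerContext) => Promise<Json>;

// Hypothetical dispatch: a map-step handler still receives only its single
// array element as input, but can now also see the whole original run input.
async function executeTask(task: StepTaskRecord, handler: StepHandler): Promise<Json> {
  return handler(task.input, {
    flowInput: task.flow_input,
    taskIndex: task.task_index,
  });
}

For a root map started with [1, 2, 3], task 0's input is 1 while its flow_input is [1, 2, 3], the same distinction tests 3 and 4 assert above.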