3 changes: 2 additions & 1 deletion pkgs/core/schemas/0040_types.sql
@@ -5,5 +5,6 @@ create type pgflow.step_task_record as (
step_slug text,
input jsonb,
msg_id bigint,
task_index int
task_index int,
flow_input jsonb
);
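Note: for reference, the full composite type after this change would read roughly as below. The flow_slug and run_id attributes are not visible in this hunk, so their types are assumed from how start_tasks selects and joins them; this is a sketch, not the literal file contents.

create type pgflow.step_task_record as (
flow_slug text, -- assumed: matches the start_tasks select list
run_id uuid, -- assumed: joined against runs.run_id
step_slug text,
input jsonb,
msg_id bigint,
task_index int,
flow_input jsonb -- new attribute added by this PR
);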
6 changes: 5 additions & 1 deletion pkgs/core/schemas/0120_function_start_tasks.sql
@@ -172,7 +172,11 @@ as $$
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id,
st.task_index as task_index
st.task_index as task_index,
-- flow_input: original run input, exposed for worker context
-- Lets workers read the flow's original input directly instead of
-- re-deriving it from the constructed 'input' field (which, for map
-- steps, contains only a single array element)
r.input as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
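A minimal sketch of the contract this adds, using hypothetical flow/step slugs and message ids that are not part of this diff: for a regular (non-map) step, the constructed input still wraps the run input under the 'run' key, while flow_input carries it verbatim.

-- hypothetical: a run started with '{"user_id": "abc123"}' whose dependency
-- 'fetch' completed with '{"rows": 3}'; message id 101 is assumed
select step_slug, input, flow_input
from pgflow.start_tasks('my_flow', array[101]::bigint[], gen_random_uuid());
-- expected shape of the returned row:
-- step_slug: process
-- input: {"run": {"user_id": "abc123"}, "fetch": {"rows": 3}}
-- flow_input: {"user_id": "abc123"}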
1 change: 1 addition & 0 deletions pkgs/core/src/database-types.ts
@@ -613,6 +613,7 @@ export type Database = {
input: Json | null
msg_id: number | null
task_index: number | null
flow_input: Json | null
}
}
}
182 changes: 182 additions & 0 deletions pkgs/core/supabase/migrations/20251223052347_pgflow_add_flow_input_column.sql
@@ -0,0 +1,182 @@
-- Modify "step_task_record" composite type
ALTER TYPE "pgflow"."step_task_record" ADD ATTRIBUTE "flow_input" jsonb;
-- Modify "start_tasks" function
CREATE OR REPLACE FUNCTION "pgflow"."start_tasks" ("flow_slug" text, "msg_ids" bigint[], "worker_id" uuid) RETURNS SETOF "pgflow"."step_task_record" LANGUAGE sql SET "search_path" = '' AS $$
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
join pgflow.runs r on r.run_id = task.run_id
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
-- MVP: Don't start tasks on failed runs
and r.status != 'failed'
),
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
-- Aggregate map outputs or use single output
CASE
WHEN dep_step.step_type = 'map' THEN
-- Aggregate all task outputs ordered by task_index
-- Use COALESCE to return empty array if no tasks
(SELECT COALESCE(jsonb_agg(dt.output ORDER BY dt.task_index), '[]'::jsonb)
FROM pgflow.step_tasks dt
WHERE dt.run_id = st.run_id
AND dt.step_slug = dep.dep_slug
AND dt.status = 'completed')
ELSE
-- Single step: use the single task output
dep_task.output
END as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.steps dep_step on dep_step.flow_slug = dep.flow_slug and dep_step.step_slug = dep.dep_slug
left join pgflow.step_tasks dep_task on
dep_task.run_id = st.run_id and
dep_task.step_slug = dep.dep_slug and
dep_task.status = 'completed'
and dep_step.step_type = 'single' -- Only join for single steps
),
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages
set_vt_batch as (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
)
select
st.flow_slug,
st.run_id,
st.step_slug,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
-- (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
-- task 0 gets: 1
-- task 1 gets: 2
-- task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)

-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
-- task 0 gets: "a"
-- task 1 gets: "b"
-- task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
END

-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive ALL inputs as a structured object.
-- This includes the original run input plus all dependency outputs.
ELSE
-- Non-map steps get structured input with named keys
-- Example output: {
-- "run": {"original": "input"},
-- "step1": {"output": "from_step1"},
-- "step2": {"output": "from_step2"}
-- }
--
-- Build object with 'run' key containing original input
jsonb_build_object('run', r.input) ||
-- Merge with deps_output which already has dependency outputs
-- deps_output format: {"dep1": output1, "dep2": output2, ...}
-- If no dependencies, defaults to empty object
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id,
st.task_index as task_index,
-- flow_input: original run input, exposed for worker context
-- Lets workers read the flow's original input directly instead of
-- re-deriving it from the constructed 'input' field (which, for map
-- steps, contains only a single array element)
r.input as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$;
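For map steps, a sketch of what the new column yields (hypothetical slugs and message ids; mirrors the test file below): every task of a root map over '[1, 2, 3]' receives one element as input but the whole original array as flow_input.

-- hypothetical: root map step started with '[1, 2, 3]', message ids 201-203 assumed
select task_index, input, flow_input
from pgflow.start_tasks('map_flow', array[201, 202, 203]::bigint[], gen_random_uuid());
-- expected shape:
-- task_index 0: input = 1, flow_input = [1, 2, 3]
-- task_index 1: input = 2, flow_input = [1, 2, 3]
-- task_index 2: input = 3, flow_input = [1, 2, 3]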
3 changes: 2 additions & 1 deletion pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
h1:yUEa4QnI0kEuEtxWNiNgRgf9GENvG1FkUmzwlu0vLm0=
h1:nPCLpuNY7xbRl3lNa/ns7GG1Uf3twA0VSK7DIVBhzsc=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,3 +14,4 @@ h1:yUEa4QnI0kEuEtxWNiNgRgf9GENvG1FkUmzwlu0vLm0=
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw=
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
20251223052347_pgflow_add_flow_input_column.sql h1:RwQBPOMml4cCR6qeqd5kVQKhxXAbFqkHEJa0ruXYiTo=
140 changes: 140 additions & 0 deletions pkgs/core/supabase/tests/start_tasks/returns_flow_input.test.sql
@@ -0,0 +1,140 @@
begin;
select plan(5);
select pgflow_tests.reset_db();

-- Test 1: Root step returns flow_input matching original run input
select pgflow.create_flow('simple_flow');
select pgflow.add_step('simple_flow', 'root_step');
select pgflow.start_flow('simple_flow', '{"user_id": "abc123", "config": {"debug": true}}'::jsonb);

select pgflow_tests.ensure_worker('simple_flow');

with msgs as (
select * from pgmq.read_with_poll('simple_flow', 10, 5, 1, 50) limit 1
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'simple_flow',
(select ids from msg_ids),
'11111111-1111-1111-1111-111111111111'::uuid
)
)
select is(
(select flow_input from started_tasks),
'{"user_id": "abc123", "config": {"debug": true}}'::jsonb,
'Root step flow_input should match original run input'
);

-- Test 2: Dependent step also returns flow_input
select pgflow_tests.reset_db();
select pgflow.create_flow('dep_flow');
select pgflow.add_step('dep_flow', 'first');
select pgflow.add_step('dep_flow', 'second', ARRAY['first']);
select pgflow.start_flow('dep_flow', '{"original": "input"}'::jsonb);

select pgflow_tests.ensure_worker('dep_flow');

-- Complete first step
with poll_result as (
select * from pgflow_tests.read_and_start('dep_flow', 1, 1)
)
select pgflow.complete_task(
run_id,
step_slug,
task_index,
'{"first_result": "done"}'::jsonb
) from poll_result;

-- Start second step and verify flow_input
select pgflow_tests.ensure_worker('dep_flow', '22222222-2222-2222-2222-222222222222'::uuid);
with msgs as (
select * from pgmq.read_with_poll('dep_flow', 10, 5, 1, 50) limit 1
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'dep_flow',
(select ids from msg_ids),
'22222222-2222-2222-2222-222222222222'::uuid
)
)
select is(
(select flow_input from started_tasks),
'{"original": "input"}'::jsonb,
'Dependent step flow_input should match original run input'
);

-- Tests 3 & 4: Root map step returns flow_input (the array)
-- All map tasks should have same flow_input and it should be the original array
select pgflow_tests.reset_db();
select pgflow.create_flow('map_flow');
select pgflow.add_step('map_flow', 'map_step', '{}', null, null, null, null, 'map');
select pgflow.start_flow('map_flow', '[1, 2, 3]'::jsonb);

select pgflow_tests.ensure_worker('map_flow');

-- Save map tasks to temp table so we can run multiple assertions
create temp table map_started_tasks as
with msgs as (
select * from pgmq.read_with_poll('map_flow', 10, 5, 3, 50) order by msg_id
),
msg_ids as (
select array_agg(msg_id order by msg_id) as ids from msgs
)
select * from pgflow.start_tasks(
'map_flow',
(select ids from msg_ids),
'11111111-1111-1111-1111-111111111111'::uuid
);

-- Test 3: All map tasks should have same flow_input
select is(
(select count(distinct flow_input)::int from map_started_tasks),
1,
'All map tasks should have same flow_input'
);

-- Test 4: Map task flow_input is the original array, not the element
select is(
(select flow_input from map_started_tasks limit 1),
'[1, 2, 3]'::jsonb,
'Map task flow_input should be original array, not individual element'
);

drop table map_started_tasks;

-- Test 5: Multiple tasks in batch all have correct flow_input
select pgflow_tests.reset_db();
select pgflow.create_flow('multi_flow');
select pgflow.add_step('multi_flow', 'step1');
select pgflow.add_step('multi_flow', 'step2'); -- parallel step, no deps
select pgflow.start_flow('multi_flow', '{"batch": "test"}'::jsonb);

select pgflow_tests.ensure_worker('multi_flow');

-- Both tasks should be queued, read both
with msgs as (
select * from pgmq.read_with_poll('multi_flow', 10, 5, 2, 50)
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'multi_flow',
(select ids from msg_ids),
'11111111-1111-1111-1111-111111111111'::uuid
)
)
select ok(
(select bool_and(flow_input = '{"batch": "test"}'::jsonb) from started_tasks),
'All tasks in batch should have correct flow_input'
);

select finish();
rollback;