3 changes: 2 additions & 1 deletion .changeset/asymmetric-handler-signatures.md
@@ -10,5 +10,6 @@ BREAKING: Asymmetric handler signatures - remove `run` key from step inputs

- Root steps: `(flowInput, ctx) => ...` - flow input directly as first param
- Dependent steps: `(deps, ctx) => ...` - only dependency outputs as first param
- Access flow input in dependent steps via `ctx.flowInput`
- Access flow input in dependent steps via `await ctx.flowInput` (async/lazy-loaded)
- Lazy loading prevents data duplication for map steps processing large arrays
- Enables functional composition and simplifies types for future subflows
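
For orientation, a minimal sketch of the two signatures, assuming the Flow DSL used in the examples below; the import path, flow slug, step names, and helper functions are illustrative, not part of this changeset:

```ts
import { Flow } from '@pgflow/dsl'; // import path assumed

type Input = { userId: string };

// Placeholder effects, just for the sketch
const fetchProfile = async (id: string) => ({ id, name: 'Ada' });
const saveProfile = async (id: string, name: string) => ({ status: 'success' as const });

export const ExampleFlow = new Flow<Input>({ slug: 'example_flow' })
  // Root step: (flowInput, ctx) => ... - flow input directly as first param
  .step({ slug: 'fetch' }, async (flowInput) => ({
    profile: await fetchProfile(flowInput.userId),
  }))
  // Dependent step: (deps, ctx) => ... - only dependency outputs as first param
  .step({ slug: 'persist', dependsOn: ['fetch'] }, async (deps, ctx) => {
    const flowInput = await ctx.flowInput; // async/lazy-loaded
    return saveProfile(flowInput.userId, deps.fetch.profile.name);
  });
```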
3 changes: 2 additions & 1 deletion pkgs/cli/examples/analyze_website.ts
@@ -54,7 +54,8 @@ const AnalyzeWebsite = new Flow<WebsiteAnalysisInput>({
.step(
{ slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] },
async (_deps, ctx) => {
console.log(`Saving results to database for: ${ctx.flowInput.url}`);
const flowInput = await ctx.flowInput;
console.log(`Saving results to database for: ${flowInput.url}`);
// In a real implementation, this would save to a database
return {
status: 'success',
23 changes: 22 additions & 1 deletion pkgs/cli/src/commands/install/create-example-worker.ts
@@ -4,7 +4,28 @@ import { log, confirm } from '@clack/prompts';
import chalk from 'chalk';
import { getVersion } from '../../utils/get-version.js';

const INDEX_TS_TEMPLATE = `import { EdgeWorker } from '@pgflow/edge-worker';
const INDEX_TS_TEMPLATE = `/**
* To run this worker locally:
*
* 1. Start the Edge Runtime (in one terminal):
* npx supabase functions serve --no-verify-jwt
*
* 2. Start the worker (in another terminal):
* curl http://localhost:54321/functions/v1/greet-user-worker
*
* 3. Trigger a flow run (in Supabase Studio SQL Editor):
* SELECT * FROM pgflow.start_flow(
* flow_slug => 'greetUser',
* input => '{"firstName": "Alice", "lastName": "Smith"}'::jsonb
* );
*
* 4. Check run status:
* SELECT * FROM pgflow.runs
* WHERE flow_slug = 'greetUser'
* ORDER BY started_at DESC
* LIMIT 1;
*/
import { EdgeWorker } from '@pgflow/edge-worker';
import { GreetUser } from '../../flows/greet-user.ts';

EdgeWorker.start(GreetUser);
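
The template imports GreetUser from ../../flows/greet-user.ts, which this diff does not include. Under the new asymmetric signatures, that flow would look roughly like the following hypothetical sketch; only the `greetUser` flow slug and the firstName/lastName input fields come from the template above, everything else is assumed:

```ts
import { Flow } from '@pgflow/dsl'; // import path assumed

type GreetInput = { firstName: string; lastName: string };

// Hypothetical flows/greet-user.ts matching the template's sample input;
// the real generated file may differ.
export const GreetUser = new Flow<GreetInput>({ slug: 'greetUser' })
  // Root step, so the flow input arrives directly as the first param
  .step({ slug: 'greeting' }, (flowInput) => ({
    message: `Hello, ${flowInput.firstName} ${flowInput.lastName}!`,
  }));
```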
6 changes: 4 additions & 2 deletions pkgs/client/__tests__/e2e/full-stack-dsl.test.ts
@@ -111,7 +111,8 @@ describe('Full Stack DSL Integration', () => {
tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('process');
expect(tasks[0].flow_input).toEqual(input);
// Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
expect(tasks[0].flow_input).toBeNull();
expect(tasks[0].input.fetch).toEqual(fetchOutput); // Critical: dependency output included

const processOutput = {
@@ -132,7 +133,8 @@ describe('Full Stack DSL Integration', () => {
tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('save');
expect(tasks[0].flow_input).toEqual(input);
// Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
expect(tasks[0].flow_input).toBeNull();

// The save step only depends on process, so it should only have process output
// This is correct behavior - transitive dependencies are not automatically included
6 changes: 4 additions & 2 deletions pkgs/client/__tests__/e2e/happy-path-e2e.test.ts
@@ -88,7 +88,8 @@ describe('Happy Path E2E Integration', () => {
tasks = await readAndStart(sql, sqlClient, testFlow.slug, 1, 5);
expect(tasks).toHaveLength(1);
expect(tasks[0].step_slug).toBe('process');
expect(tasks[0].flow_input).toEqual(input);
// Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
expect(tasks[0].flow_input).toBeNull();
expect(tasks[0].input.fetch).toEqual(fetchOutput);

const processOutput = {
@@ -127,7 +128,8 @@ describe('Happy Path E2E Integration', () => {

log('Save task input:', JSON.stringify(tasks[0].input, null, 2));

expect(tasks[0].flow_input).toEqual(input);
// Dependent steps have null flow_input (lazy loaded via ctx.flowInput)
expect(tasks[0].flow_input).toBeNull();
expect(tasks[0].input.process).toEqual(processOutput);
// Note: save step only depends on process, so fetch output is not included (correct behavior)
expect(tasks[0].input.fetch).toBeUndefined();
11 changes: 8 additions & 3 deletions pkgs/core/schemas/0120_function_start_tasks.sql
@@ -172,9 +172,14 @@ as $$
st.message_id as msg_id,
st.task_index as task_index,
-- flow_input: Original run input for worker context
-- This allows workers to access the original flow input directly
-- without parsing it from the constructed 'input' field
r.input as flow_input
-- Only included for root non-map steps to avoid data duplication.
-- Root map steps: flowInput IS the array, useless to include
-- Dependent steps: lazy load via ctx.flowInput when needed
CASE
WHEN step.step_type != 'map' AND step.deps_count = 0
THEN r.input
ELSE NULL
END as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
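
The CASE above reduces to a single predicate. A TypeScript sketch of the same rule, with field names mirroring the SQL columns:

```ts
// Mirrors the SQL CASE: flow_input is materialized only for root
// (deps_count = 0) non-map steps; every other task gets NULL and the
// worker lazy-loads the run input via ctx.flowInput when needed.
function includesFlowInput(step: { step_type: string; deps_count: number }): boolean {
  return step.step_type !== 'map' && step.deps_count === 0;
}
```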
6 changes: 5 additions & 1 deletion pkgs/core/src/types.ts
@@ -16,6 +16,10 @@ export type { Json };
* Same as pgflow.step_task_record type, but with not-null fields and type argument for payload.
* The input type is automatically inferred based on the step_slug using a discriminated union.
* This ensures that each step only receives inputs from its declared dependencies and the flow's run input.
*
* Note: flow_input is nullable because start_tasks only includes it for root non-map steps.
* For dependent and map steps, flow_input is NULL to avoid data duplication.
* Workers can access the original flow input via ctx.flowInput (lazy loaded).
*/
export type StepTaskRecord<TFlow extends AnyFlow> = {
[StepSlug in Extract<keyof ExtractFlowSteps<TFlow>, string>]: {
@@ -25,7 +29,7 @@ export type StepTaskRecord<TFlow extends AnyFlow> = {
task_index: number;
input: Simplify<StepInput<TFlow, StepSlug>>;
msg_id: number;
flow_input: ExtractFlowInput<TFlow>;
flow_input: ExtractFlowInput<TFlow> | null;
};
}[Extract<keyof ExtractFlowSteps<TFlow>, string>];

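
Consumers of StepTaskRecord now have to handle the null case. A sketch of the pattern, assuming these types are exported from the core package; the loadFlowInput callback is a hypothetical stand-in for however the worker fetches runs.input and is not part of this diff:

```ts
import type { AnyFlow, ExtractFlowInput, StepTaskRecord } from '@pgflow/core'; // path assumed

// Sketch only: resolve a task's flow input, whether materialized or lazy.
async function resolveFlowInput<TFlow extends AnyFlow>(
  task: StepTaskRecord<TFlow>,
  loadFlowInput: () => Promise<ExtractFlowInput<TFlow>>,
): Promise<ExtractFlowInput<TFlow>> {
  // Root non-map tasks arrive with flow_input populated; all others are NULL.
  return task.flow_input ?? (await loadFlowInput());
}
```

The hunk that follows applies the same conditional flow_input CASE to the generated Supabase migration.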
@@ -166,9 +166,14 @@ with tasks as (
st.message_id as msg_id,
st.task_index as task_index,
-- flow_input: Original run input for worker context
-- This allows workers to access the original flow input directly
-- without parsing it from the constructed 'input' field
r.input as flow_input
-- Only included for root non-map steps to avoid data duplication.
-- Root map steps: flowInput IS the array, useless to include
-- Dependent steps: lazy load via ctx.flowInput when needed
CASE
WHEN step.step_type != 'map' AND step.deps_count = 0
THEN r.input
ELSE NULL
END as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
4 changes: 2 additions & 2 deletions pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
h1:32eppXsursErjjXT7SfNzyRPfw4iiiL1Pnafyy4DSvk=
h1:SkrHj8RTAbqZRxs3/TvkZ0FV1l9SeNX2Cxba02DAPAk=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,4 +14,4 @@ h1:32eppXsursErjjXT7SfNzyRPfw4iiiL1Pnafyy4DSvk=
20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw=
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
20251223110504_pgflow_add_flow_input_column.sql h1:HJ9GJc4xh68JT82JINf946RS/jQNt9jDhvIH4VA40c8=
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
207 changes: 207 additions & 0 deletions pkgs/core/supabase/tests/start_tasks/conditional_flow_input.test.sql
@@ -0,0 +1,207 @@
begin;
select plan(6);
select pgflow_tests.reset_db();

-- =========================================================================
-- Test: Conditional flow_input in start_tasks
--
-- Only root non-map steps receive flow_input.
-- All other step types (root map, dependent non-map, dependent map) get NULL.
-- This optimization prevents data duplication for large array processing.
-- =========================================================================
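-- Decision table for flow_input in start_tasks:
--   step_type   deps_count   flow_input
--   non-map     0            r.input  (root non-map)
--   map         0            NULL     (flow input IS the array)
--   non-map     > 0          NULL     (lazy via ctx.flowInput)
--   map         > 0          NULL     (lazy via ctx.flowInput)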

-- Test 1: Root non-map step receives flow_input
select pgflow.create_flow('root_step_flow');
select pgflow.add_step('root_step_flow', 'root_step');
select pgflow.start_flow('root_step_flow', '{"user_id": "abc123"}'::jsonb);

select pgflow_tests.ensure_worker('root_step_flow');

with msgs as (
select * from pgmq.read_with_poll('root_step_flow', 10, 5, 1, 50) limit 1
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'root_step_flow',
(select ids from msg_ids),
'11111111-1111-1111-1111-111111111111'::uuid
)
)
select is(
(select flow_input from started_tasks),
'{"user_id": "abc123"}'::jsonb,
'Root non-map step should receive flow_input'
);

-- Test 2: Dependent non-map step receives NULL flow_input
select pgflow_tests.reset_db();
select pgflow.create_flow('dep_flow');
select pgflow.add_step('dep_flow', 'first');
select pgflow.add_step('dep_flow', 'second', ARRAY['first']);
select pgflow.start_flow('dep_flow', '{"original": "input"}'::jsonb);

select pgflow_tests.ensure_worker('dep_flow');

-- Complete first step
with poll_result as (
select * from pgflow_tests.read_and_start('dep_flow', 1, 1)
)
select pgflow.complete_task(
run_id,
step_slug,
task_index,
'{"first_result": "done"}'::jsonb
) from poll_result;

-- Start second step and verify flow_input is NULL
select pgflow_tests.ensure_worker('dep_flow', '22222222-2222-2222-2222-222222222222'::uuid);
with msgs as (
select * from pgmq.read_with_poll('dep_flow', 10, 5, 1, 50) limit 1
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'dep_flow',
(select ids from msg_ids),
'22222222-2222-2222-2222-222222222222'::uuid
)
)
select is(
(select flow_input from started_tasks),
NULL::jsonb,
'Dependent non-map step should receive NULL flow_input (lazy loaded)'
);

-- Test 3: Root map step receives NULL flow_input
-- (flowInput IS the array, so including it would be redundant; workers get each element via task.input)
select pgflow_tests.reset_db();
select pgflow.create_flow('root_map_flow');
select pgflow.add_step('root_map_flow', 'map_step', '{}', null, null, null, null, 'map');
select pgflow.start_flow('root_map_flow', '[1, 2, 3]'::jsonb);

select pgflow_tests.ensure_worker('root_map_flow');

with msgs as (
select * from pgmq.read_with_poll('root_map_flow', 10, 5, 3, 50) limit 1
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'root_map_flow',
(select ids from msg_ids),
'11111111-1111-1111-1111-111111111111'::uuid
)
)
select is(
(select flow_input from started_tasks limit 1),
NULL::jsonb,
'Root map step should receive NULL flow_input (flowInput is the array itself)'
);

-- Test 4: Dependent map step receives NULL flow_input
select pgflow_tests.reset_db();
select pgflow.create_flow('dep_map_flow');
select pgflow.add_step('dep_map_flow', 'fetch_items', '{}', null, null, null, null, 'single');
select pgflow.add_step('dep_map_flow', 'process_item', ARRAY['fetch_items'], null, null, null, null, 'map');
select pgflow.start_flow('dep_map_flow', '{"config": "value"}'::jsonb);

select pgflow_tests.ensure_worker('dep_map_flow');

-- Complete first step with array
with poll_result as (
select * from pgflow_tests.read_and_start('dep_map_flow', 1, 1)
)
select pgflow.complete_task(
run_id,
step_slug,
task_index,
'["a", "b", "c"]'::jsonb
) from poll_result;

-- Start map step and verify flow_input is NULL
select pgflow_tests.ensure_worker('dep_map_flow', '33333333-3333-3333-3333-333333333333'::uuid);
with msgs as (
select * from pgmq.read_with_poll('dep_map_flow', 10, 5, 3, 50) limit 1
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'dep_map_flow',
(select ids from msg_ids),
'33333333-3333-3333-3333-333333333333'::uuid
)
)
select is(
(select flow_input from started_tasks limit 1),
NULL::jsonb,
'Dependent map step should receive NULL flow_input (lazy loaded)'
);

-- Test 5: Multiple parallel root non-map steps all receive flow_input
select pgflow_tests.reset_db();
select pgflow.create_flow('parallel_flow');
select pgflow.add_step('parallel_flow', 'step1');
select pgflow.add_step('parallel_flow', 'step2'); -- parallel step, no deps
select pgflow.start_flow('parallel_flow', '{"batch": "test"}'::jsonb);

select pgflow_tests.ensure_worker('parallel_flow');

with msgs as (
select * from pgmq.read_with_poll('parallel_flow', 10, 5, 2, 50)
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'parallel_flow',
(select ids from msg_ids),
'11111111-1111-1111-1111-111111111111'::uuid
)
)
select ok(
(select bool_and(flow_input = '{"batch": "test"}'::jsonb) from started_tasks),
'All parallel root non-map steps should receive flow_input'
);

-- Test 6: Mixed batch - only root non-map steps receive flow_input
-- This tests a complex scenario with different step types in same batch
select pgflow_tests.reset_db();
select pgflow.create_flow('mixed_flow');
select pgflow.add_step('mixed_flow', 'root_single'); -- root non-map
select pgflow.add_step('mixed_flow', 'root_array'); -- root non-map (a 'single' step that simply returns an array)
select pgflow.start_flow('mixed_flow', '{"data": "value"}'::jsonb);

select pgflow_tests.ensure_worker('mixed_flow');

-- Both are root non-map steps, both should get flow_input
with msgs as (
select * from pgmq.read_with_poll('mixed_flow', 10, 5, 2, 50)
),
msg_ids as (
select array_agg(msg_id) as ids from msgs
),
started_tasks as (
select * from pgflow.start_tasks(
'mixed_flow',
(select ids from msg_ids),
'11111111-1111-1111-1111-111111111111'::uuid
)
)
select is(
(select count(*)::int from started_tasks where flow_input is not null),
2,
'Both root non-map steps should receive non-null flow_input'
);

select finish();
rollback;