23 changes: 18 additions & 5 deletions NOMENCLATURE_GUIDE.md
@@ -200,19 +200,32 @@ Slugs are unique text identifiers with specific rules:
 
 ## Data Flow Terms
 
+### Handler Signatures
+
+Step handlers use **asymmetric signatures** based on whether they have dependencies:
+
+**Root steps (no dependencies):**
+- First parameter: `flowInput` - the original flow input directly
+- Second parameter: `ctx` - context object (env, supabase, flowInput, etc.)
+
+**Dependent steps (with dependsOn):**
+- First parameter: `deps` - object with outputs from dependency steps
+- Second parameter: `ctx` - context object (includes `ctx.flowInput` if needed)
+
 ### Input Structure
 
-- `input.run` - Original flow input (available to all steps except map tasks)
-- `input.{stepName}` - Output from dependency step
+- `flowInput` - Original flow input (root step first parameter)
+- `deps.{stepName}` - Output from dependency step (dependent step first parameter)
+- `ctx.flowInput` - Original flow input via context (available in all steps)
 
 ### Map Step Input
 
 - Map step handlers receive **two parameters**:
   1. **item** - The assigned array element
-  2. **context** - BaseContext object (env, shutdownSignal, rawMessage, workerConfig)
+  2. **context** - BaseContext object (env, shutdownSignal, rawMessage, workerConfig, flowInput)
 - Map handlers do NOT receive FlowContext (no access to `context.stepTask`)
-- No access to `input.run` or other step dependencies via the item parameter
-- Context provides environment and worker metadata, but not flow-level input data
+- Access to original flow input via `context.flowInput`
+- Context provides environment, worker metadata, and flow-level input data
 
 ### Output Handling
 
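The asymmetric signatures described in this hunk can be sketched as plain functions. This is a minimal simulation with hypothetical handler bodies and a `Ctx` type reduced to just `flowInput` — not pgflow's actual runtime; only the parameter convention mirrors the guide.

```typescript
// Minimal sketch of the asymmetric signatures (hypothetical handler bodies;
// the real ctx carries env, supabase, etc. in addition to flowInput).
type Ctx = { flowInput: { url: string } };

// Root step (no dependsOn): first parameter is the flow input directly.
const scrape = (flowInput: { url: string }, ctx: Ctx) => ({
  html: `<html from ${flowInput.url}>`,
});

// Dependent step: first parameter holds dependency outputs keyed by slug;
// the original flow input is still reachable via ctx.flowInput.
const summarize = (deps: { scrape: { html: string } }, ctx: Ctx) => ({
  summary: deps.scrape.html.toUpperCase(),
  source: ctx.flowInput.url,
});

const ctx: Ctx = { flowInput: { url: 'https://example.com' } };
const scraped = scrape(ctx.flowInput, ctx);
const result = summarize({ scrape: scraped }, ctx);
console.log(result.source);
```

The point of the asymmetry: a root step has nothing upstream, so handing it the flow input directly avoids an artificial `{ run: ... }` wrapper, while dependent steps keep a deps-only first parameter and reach the flow input through context.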
14 changes: 7 additions & 7 deletions README.md
@@ -43,17 +43,17 @@ Then define your workflow ([full guide](https://pgflow.dev/get-started/installat
 import { Flow } from '@pgflow/dsl';
 
 new Flow<{ url: string }>({ slug: 'analyzeArticle' })
-  .step({ slug: 'scrape' }, (input) => scrapeWebsite(input.run.url))
-  .step({ slug: 'summarize', dependsOn: ['scrape'] }, (input) =>
-    summarize(input.scrape)
+  .step({ slug: 'scrape' }, (flowInput) => scrapeWebsite(flowInput.url))
+  .step({ slug: 'summarize', dependsOn: ['scrape'] }, (deps) =>
+    summarize(deps.scrape)
   )
-  .step({ slug: 'extractKeywords', dependsOn: ['scrape'] }, (input) =>
-    extractKeywords(input.scrape)
+  .step({ slug: 'extractKeywords', dependsOn: ['scrape'] }, (deps) =>
+    extractKeywords(deps.scrape)
   )
   .step(
     { slug: 'publish', dependsOn: ['summarize', 'extractKeywords'] },
-    (input) =>
-      publish({ summary: input.summarize, keywords: input.extractKeywords })
+    (deps) =>
+      publish({ summary: deps.summarize, keywords: deps.extractKeywords })
   );
 ```
 
10 changes: 5 additions & 5 deletions context7.json
@@ -15,18 +15,18 @@
   "excludeFiles": [],
   "rules": [
     "Flow definitions are immutable and declarative - Each `.step()`, `.array()` or `.map()` call returns a new Flow instance; describe what steps exist and how they connect, pgflow handles execution.",
-    "Step slugs identify steps and type their outputs - Each step has a unique `slug` used in `dependsOn` arrays and to access outputs via `input.stepName`, which is automatically typed as the step's handler return type.",
-    "Flow input type determines `input.run` in all steps - Define `Flow<InputType>` to type the original flow input accessible in ALL steps via `input.run` (e.g., `new Flow<{ url: string }>()` makes `input.run.url` available everywhere with full type safety).",
+    "Step slugs identify steps and type their outputs - Each step has a unique `slug` used in `dependsOn` arrays and to access outputs via `deps.stepName` in dependent steps, which is automatically typed as the step's handler return type.",
+    "Asymmetric handler signatures - Root steps receive `(flowInput, ctx)` where flowInput is the flow input directly; dependent steps receive `(deps, ctx)` where deps contains outputs from dependencies and `ctx.flowInput` provides the original flow input if needed.",
     "Root steps omit `dependsOn`, dependent steps require it - Steps with no dependencies omit `dependsOn` entirely; steps that depend on others specify them in the `dependsOn` array (e.g., `{ slug: 'summary', dependsOn: ['website'] }`).",
     "All inputs and outputs must be JSON-serializable - Step handlers must return plain objects, primitives, and arrays only; convert dates to ISO strings and avoid functions, class instances, or circular references.",
     "Use `.array()` for array-returning steps - The `.array()` method is a semantic wrapper around `.step()` that enforces array return types, useful for data collection that will be processed by map steps.",
     "Use `.map()` for parallel array processing - The `.map()` method creates a step that spawns N parallel tasks (one per array element), each with its own retry counter, returning an aggregated array of results.",
-    "Handler functions for `.map()` steps receive individual array elements - Unlike `.step()` and `.array()` handlers that receive the full `input` object, map handlers receive only the array element (e.g., `(item) => processItem(item)`).",
-    "Use `.array()` to prepare data for `.map()` handlers - When map handlers need access to `input.run` or outputs from other steps, create an intermediate `.array()` step that enriches each element with the required data.",
+    "Handler functions for `.map()` steps receive individual array elements - Unlike `.step()` and `.array()` handlers that receive flowInput or deps, map handlers receive only the array element (e.g., `(item, ctx) => processItem(item)`) with flowInput available via `ctx.flowInput`.",
+    "Use `.array()` to prepare data for `.map()` handlers - When map handlers need access to flowInput or outputs from other steps, create an intermediate `.array()` step that enriches each element with the required data.",
     "Map steps can only have a single dependency - Specify the array source via the `array` property (e.g., `{ slug: 'process', array: 'items' }`) and cannot use `dependsOn`.",
     "Root map steps omit `array:`, dependent maps require it - Root maps process flow input directly without `array:` property (e.g., `new Flow<string[]>().map({ slug: 'process' }, ...)`); dependent maps must specify `array:` to indicate which step's output to process.",
     "Steps with the same dependencies run in parallel - pgflow automatically maximizes parallelism by running independent steps concurrently once their dependencies are satisfied.",
-    "Handler functions receive `(input, context)` parameters - The second parameter provides platform resources like `context.sql`, `context.supabase`, `context.env`, `context.stepTask`, and `context.rawMessage` without requiring global imports or connection setup.",
+    "Handler functions receive context as second parameter - Root steps: `(flowInput, ctx)`, dependent steps: `(deps, ctx)`. Context provides platform resources like `ctx.sql`, `ctx.supabase`, `ctx.env`, `ctx.flowInput`, `ctx.stepTask`, and `ctx.rawMessage` without requiring global imports or connection setup.",
     "Flows must be compiled before execution - Run `npx pgflow@latest compile` to convert TypeScript flow definitions into SQL migrations that register the flow structure in the database.",
     "Registered flows are immutable - Once a flow is registered in the database, its structure cannot be modified; use versioning or deletion (development only) to make changes.",
     "Use camelCase for step slugs - Step slugs follow JavaScript naming conventions (e.g., `fetchData`, not `fetch_data`) since they become property names in TypeScript for accessing outputs.",
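The map-handler rule above can be illustrated with a small simulation. The item type and the reduced `BaseContext` shape are hypothetical, and the real fan-out happens inside pgflow's worker — `Array.prototype.map` merely stands in for it here.

```typescript
// Simulation of map-step dispatch: one handler call per array element.
// BaseContext is reduced to the fields relevant to the rule; the actual
// pgflow context also carries shutdownSignal, rawMessage, workerConfig, etc.
type BaseContext = {
  env: Record<string, string>;
  flowInput: { prefix: string };
};

// Map handlers receive (item, context) -- no deps object; the original
// flow input is reachable only via context.flowInput.
const processItem = (item: number, context: BaseContext): string =>
  `${context.flowInput.prefix}-${item * 2}`;

const context: BaseContext = { env: {}, flowInput: { prefix: 'job' } };
// The worker spawns one task per element, each with its own retry counter;
// a plain map over the array models that fan-out.
const results = [1, 2, 3].map((item) => processItem(item, context));
console.log(results);
```

When a map handler needs more than the element and the flow input (say, another step's output), the intermediate `.array()` enrichment step described in the rules is the way to thread that data in.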
2 changes: 1 addition & 1 deletion pkgs/client/__tests__/e2e/full-stack-dsl.test.ts
@@ -132,7 +132,7 @@ describe('Full Stack DSL Integration', () => {
 tasks = await readAndStart(sql, sqlClient, SimpleFlow.slug, 1, 5);
 expect(tasks).toHaveLength(1);
 expect(tasks[0].step_slug).toBe('save');
-expect(tasks[0].input.run).toEqual(input);
+expect(tasks[0].flow_input).toEqual(input);
 
 // The save step only depends on process, so it should only have process output
 // This is correct behavior - transitive dependencies are not automatically included
4 changes: 2 additions & 2 deletions pkgs/client/__tests__/e2e/happy-path-e2e.test.ts
@@ -126,8 +126,8 @@ describe('Happy Path E2E Integration', () => {
 expect(tasks[0].step_slug).toBe('save');
 
 log('Save task input:', JSON.stringify(tasks[0].input, null, 2));
-expect(tasks[0].input.run).toEqual(input);
-
+expect(tasks[0].flow_input).toEqual(input);
+
 expect(tasks[0].input.process).toEqual(processOutput);
 // Note: save step only depends on process, so fetch output is not included (correct behavior)
 expect(tasks[0].input.fetch).toBeUndefined();
18 changes: 8 additions & 10 deletions pkgs/core/schemas/0120_function_start_tasks.sql
@@ -154,21 +154,19 @@ as $$
 END
 
 -- -------------------- NON-MAP STEPS --------------------
--- Regular (non-map) steps receive ALL inputs as a structured object.
--- This includes the original run input plus all dependency outputs.
+-- Regular (non-map) steps receive dependency outputs as a structured object.
+-- Root steps (no dependencies) get empty object - they access flowInput via context.
+-- Dependent steps get only their dependency outputs.
 ELSE
--- Non-map steps get structured input with named keys
--- Example output: {
---   "run": {"original": "input"},
+-- Non-map steps get structured input with dependency keys only
+-- Example for dependent step: {
 --   "step1": {"output": "from_step1"},
 --   "step2": {"output": "from_step2"}
 -- }
+-- Example for root step: {}
 --
--- Build object with 'run' key containing original input
-jsonb_build_object('run', r.input) ||
--- Merge with deps_output which already has dependency outputs
--- deps_output format: {"dep1": output1, "dep2": output2, ...}
--- If no dependencies, defaults to empty object
+-- Note: flow_input is available separately in the returned record
+-- for workers to access via context.flowInput
 coalesce(dep_out.deps_output, '{}'::jsonb)
 END as input,
 st.message_id as msg_id,
@@ -148,21 +148,19 @@ with tasks as (
 END
 
 -- -------------------- NON-MAP STEPS --------------------
--- Regular (non-map) steps receive ALL inputs as a structured object.
--- This includes the original run input plus all dependency outputs.
+-- Regular (non-map) steps receive dependency outputs as a structured object.
+-- Root steps (no dependencies) get empty object - they access flowInput via context.
+-- Dependent steps get only their dependency outputs.
 ELSE
--- Non-map steps get structured input with named keys
--- Example output: {
---   "run": {"original": "input"},
+-- Non-map steps get structured input with dependency keys only
+-- Example for dependent step: {
 --   "step1": {"output": "from_step1"},
 --   "step2": {"output": "from_step2"}
 -- }
+-- Example for root step: {}
 --
--- Build object with 'run' key containing original input
-jsonb_build_object('run', r.input) ||
--- Merge with deps_output which already has dependency outputs
--- deps_output format: {"dep1": output1, "dep2": output2, ...}
--- If no dependencies, defaults to empty object
+-- Note: flow_input is available separately in the returned record
+-- for workers to access via context.flowInput
 coalesce(dep_out.deps_output, '{}'::jsonb)
 END as input,
 st.message_id as msg_id,
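The input-building rule in these two SQL hunks can be mirrored in a few lines of TypeScript. This is a sketch of the jsonb logic only, not the actual SQL or worker code; the function name and types are illustrative.

```typescript
// Mirrors the SQL above: a non-map step's input is just its dependency
// outputs (empty object for root steps); the flow input travels separately
// so workers can expose it as ctx.flowInput.
type Json = null | boolean | number | string | Json[] | { [k: string]: Json };

function buildTaskInput(
  depsOutput: Record<string, Json> | null
): Record<string, Json> {
  // Equivalent of: coalesce(dep_out.deps_output, '{}'::jsonb)
  return depsOutput ?? {};
}

// Dependent step: only dependency outputs, keyed by step slug -- no 'run' key.
const dependentInput = buildTaskInput({ step1: { output: 'from_step1' } });
// Root step: no dependencies, so the input object is empty.
const rootInput = buildTaskInput(null);
console.log(JSON.stringify(dependentInput), JSON.stringify(rootInput));
```

The design choice this encodes: dropping the merged `'run'` key keeps task inputs smaller and makes the deps object symmetric with the `deps` handler parameter, while the flow input is fetched once per run and delivered through context instead.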
4 changes: 2 additions & 2 deletions pkgs/core/supabase/migrations/atlas.sum
@@ -1,4 +1,4 @@
-h1:nPCLpuNY7xbRl3lNa/ns7GG1Uf3twA0VSK7DIVBhzsc=
+h1:32eppXsursErjjXT7SfNzyRPfw4iiiL1Pnafyy4DSvk=
 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -14,4 +14,4 @@ h1:nPCLpuNY7xbRl3lNa/ns7GG1Uf3twA0VSK7DIVBhzsc=
 20251130000000_pgflow_auto_compilation.sql h1:qs+3qq1Vsyo0ETzbxDnmkVtSUa6XHkd/K9wF/3W46jM=
 20251209074533_pgflow_worker_management.sql h1:ozFkYM1EvEH7dGvW+1pWgpzbwdWlwuQoddqLomL1GXw=
 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
-20251223052347_pgflow_add_flow_input_column.sql h1:RwQBPOMml4cCR6qeqd5kVQKhxXAbFqkHEJa0ruXYiTo=
+20251223110504_pgflow_add_flow_input_column.sql h1:HJ9GJc4xh68JT82JINf946RS/jQNt9jDhvIH4VA40c8=
@@ -42,10 +42,10 @@ begin
 end $$;
 
 -- Check that single_step receives aggregated array as input
+-- Dependent steps only get dependency outputs (no 'run' key)
 select is(
 (select input from pgflow_tests.read_and_start('test_agg', 1, 1)),
 jsonb_build_object(
-'run', '[10, 20, 30]'::jsonb,
 'map_step', jsonb_build_array(
 jsonb_build_object('result', 100),
 jsonb_build_object('result', 200),
@@ -89,10 +89,10 @@ begin
 end $$;
 
 -- Verify consumer receives both outputs correctly
+-- Dependent steps only get dependency outputs (no 'run' key)
 select is(
 (select input from pgflow_tests.read_and_start('test_mixed', 1, 1)),
 jsonb_build_object(
-'run', jsonb_build_object('config', 'test'),
 'single_src', jsonb_build_object('processed', true, 'value', 100),
 'map_src', jsonb_build_array('"A"', '"B"', '"C"') -- Strings are JSON encoded
 ),
@@ -99,10 +99,10 @@ begin
 end $$;
 
 -- Verify collector receives both aggregated arrays
+-- Dependent steps only get dependency outputs (no 'run' key)
 select is(
 (select input from pgflow_tests.read_and_start('test_multi_maps', 1, 1)),
 jsonb_build_object(
-'run', '{}'::jsonb,
 'map_a', jsonb_build_array(20, 40, 60),
 'map_b', jsonb_build_array('ALPHA', 'BETA')
 ),