feat: make flowInput lazy-loaded to prevent data duplication in map steps #560
Open: jumski wants to merge 1 commit into 12-24-add-changeset-and-news-for-assymetric-handlers from 12-25-feat_lazy_load_ctx.flowinput_to_prevent_data_duplication_for_map_steps
207 changes: 207 additions & 0 deletions
pkgs/core/supabase/tests/start_tasks/conditional_flow_input.test.sql
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,207 @@ | ||
| begin; | ||
| select plan(6); | ||
| select pgflow_tests.reset_db(); | ||
| | ||
| -- ========================================================================= | ||
| -- Test: Conditional flow_input in start_tasks | ||
| -- | ||
| -- Only root non-map steps receive flow_input. | ||
| -- All other step types (root map, dependent non-map, dependent map) get NULL. | ||
| -- This optimization prevents data duplication for large array processing. | ||
| -- ========================================================================= | ||
| | ||
| -- Test 1: Root non-map step receives flow_input | ||
| select pgflow.create_flow('root_step_flow'); | ||
| select pgflow.add_step('root_step_flow', 'root_step'); | ||
| select pgflow.start_flow('root_step_flow', '{"user_id": "abc123"}'::jsonb); | ||
| | ||
| select pgflow_tests.ensure_worker('root_step_flow'); | ||
| | ||
| with msgs as ( | ||
| select * from pgmq.read_with_poll('root_step_flow', 10, 5, 1, 50) limit 1 | ||
| ), | ||
| msg_ids as ( | ||
| select array_agg(msg_id) as ids from msgs | ||
| ), | ||
| started_tasks as ( | ||
| select * from pgflow.start_tasks( | ||
| 'root_step_flow', | ||
| (select ids from msg_ids), | ||
| '11111111-1111-1111-1111-111111111111'::uuid | ||
| ) | ||
| ) | ||
| select is( | ||
| (select flow_input from started_tasks), | ||
| '{"user_id": "abc123"}'::jsonb, | ||
| 'Root non-map step should receive flow_input' | ||
| ); | ||
| | ||
| -- Test 2: Dependent non-map step receives NULL flow_input | ||
| select pgflow_tests.reset_db(); | ||
| select pgflow.create_flow('dep_flow'); | ||
| select pgflow.add_step('dep_flow', 'first'); | ||
| select pgflow.add_step('dep_flow', 'second', ARRAY['first']); | ||
| select pgflow.start_flow('dep_flow', '{"original": "input"}'::jsonb); | ||
| | ||
| select pgflow_tests.ensure_worker('dep_flow'); | ||
| | ||
| -- Complete first step | ||
| with poll_result as ( | ||
| select * from pgflow_tests.read_and_start('dep_flow', 1, 1) | ||
| ) | ||
| select pgflow.complete_task( | ||
| run_id, | ||
| step_slug, | ||
| task_index, | ||
| '{"first_result": "done"}'::jsonb | ||
| ) from poll_result; | ||
| | ||
| -- Start second step and verify flow_input is NULL | ||
| select pgflow_tests.ensure_worker('dep_flow', '22222222-2222-2222-2222-222222222222'::uuid); | ||
| with msgs as ( | ||
| select * from pgmq.read_with_poll('dep_flow', 10, 5, 1, 50) limit 1 | ||
| ), | ||
| msg_ids as ( | ||
| select array_agg(msg_id) as ids from msgs | ||
| ), | ||
| started_tasks as ( | ||
| select * from pgflow.start_tasks( | ||
| 'dep_flow', | ||
| (select ids from msg_ids), | ||
| '22222222-2222-2222-2222-222222222222'::uuid | ||
| ) | ||
| ) | ||
| select is( | ||
| (select flow_input from started_tasks), | ||
| NULL::jsonb, | ||
| 'Dependent non-map step should receive NULL flow_input (lazy loaded)' | ||
| ); | ||
| | ||
| -- Test 3: Root map step receives NULL flow_input | ||
| -- (flowInput IS the array, useless to include - workers get element via task.input) | ||
| select pgflow_tests.reset_db(); | ||
| select pgflow.create_flow('root_map_flow'); | ||
| select pgflow.add_step('root_map_flow', 'map_step', '{}', null, null, null, null, 'map'); | ||
| select pgflow.start_flow('root_map_flow', '[1, 2, 3]'::jsonb); | ||
| | ||
| select pgflow_tests.ensure_worker('root_map_flow'); | ||
| | ||
| with msgs as ( | ||
| select * from pgmq.read_with_poll('root_map_flow', 10, 5, 3, 50) limit 1 | ||
| ), | ||
| msg_ids as ( | ||
| select array_agg(msg_id) as ids from msgs | ||
| ), | ||
| started_tasks as ( | ||
| select * from pgflow.start_tasks( | ||
| 'root_map_flow', | ||
| (select ids from msg_ids), | ||
| '11111111-1111-1111-1111-111111111111'::uuid | ||
| ) | ||
| ) | ||
| select is( | ||
| (select flow_input from started_tasks limit 1), | ||
| NULL::jsonb, | ||
| 'Root map step should receive NULL flow_input (flowInput is the array itself)' | ||
| ); | ||
| | ||
| -- Test 4: Dependent map step receives NULL flow_input | ||
| select pgflow_tests.reset_db(); | ||
| select pgflow.create_flow('dep_map_flow'); | ||
| select pgflow.add_step('dep_map_flow', 'fetch_items', '{}', null, null, null, null, 'single'); | ||
| select pgflow.add_step('dep_map_flow', 'process_item', ARRAY['fetch_items'], null, null, null, null, 'map'); | ||
| select pgflow.start_flow('dep_map_flow', '{"config": "value"}'::jsonb); | ||
| | ||
| select pgflow_tests.ensure_worker('dep_map_flow'); | ||
| | ||
| -- Complete first step with array | ||
| with poll_result as ( | ||
| select * from pgflow_tests.read_and_start('dep_map_flow', 1, 1) | ||
| ) | ||
| select pgflow.complete_task( | ||
| run_id, | ||
| step_slug, | ||
| task_index, | ||
| '["a", "b", "c"]'::jsonb | ||
| ) from poll_result; | ||
| | ||
| -- Start map step and verify flow_input is NULL | ||
| select pgflow_tests.ensure_worker('dep_map_flow', '33333333-3333-3333-3333-333333333333'::uuid); | ||
| with msgs as ( | ||
| select * from pgmq.read_with_poll('dep_map_flow', 10, 5, 3, 50) limit 1 | ||
| ), | ||
| msg_ids as ( | ||
| select array_agg(msg_id) as ids from msgs | ||
| ), | ||
| started_tasks as ( | ||
| select * from pgflow.start_tasks( | ||
| 'dep_map_flow', | ||
| (select ids from msg_ids), | ||
| '33333333-3333-3333-3333-333333333333'::uuid | ||
| ) | ||
| ) | ||
| select is( | ||
| (select flow_input from started_tasks limit 1), | ||
| NULL::jsonb, | ||
| 'Dependent map step should receive NULL flow_input (lazy loaded)' | ||
| ); | ||
| | ||
| -- Test 5: Multiple parallel root non-map steps all receive flow_input | ||
| select pgflow_tests.reset_db(); | ||
| select pgflow.create_flow('parallel_flow'); | ||
| select pgflow.add_step('parallel_flow', 'step1'); | ||
| select pgflow.add_step('parallel_flow', 'step2'); -- parallel step, no deps | ||
| select pgflow.start_flow('parallel_flow', '{"batch": "test"}'::jsonb); | ||
| | ||
| select pgflow_tests.ensure_worker('parallel_flow'); | ||
| | ||
| with msgs as ( | ||
| select * from pgmq.read_with_poll('parallel_flow', 10, 5, 2, 50) | ||
| ), | ||
| msg_ids as ( | ||
| select array_agg(msg_id) as ids from msgs | ||
| ), | ||
| started_tasks as ( | ||
| select * from pgflow.start_tasks( | ||
| 'parallel_flow', | ||
| (select ids from msg_ids), | ||
| '11111111-1111-1111-1111-111111111111'::uuid | ||
| ) | ||
| ) | ||
| select ok( | ||
| (select bool_and(flow_input = '{"batch": "test"}'::jsonb) from started_tasks), | ||
| 'All parallel root non-map steps should receive flow_input' | ||
| ); | ||
| | ||
| -- Test 6: Two root non-map steps in one batch both receive non-null flow_input | ||
| -- (complements Test 5 by counting non-null values instead of comparing them) | ||
| select pgflow_tests.reset_db(); | ||
| select pgflow.create_flow('mixed_flow'); | ||
| select pgflow.add_step('mixed_flow', 'root_single'); -- root non-map | ||
| select pgflow.add_step('mixed_flow', 'root_array'); -- root non-map (an array step is just a single step that returns an array) | ||
| select pgflow.start_flow('mixed_flow', '{"data": "value"}'::jsonb); | ||
| | ||
| select pgflow_tests.ensure_worker('mixed_flow'); | ||
| | ||
| -- Both are root non-map steps, both should get flow_input | ||
| with msgs as ( | ||
| select * from pgmq.read_with_poll('mixed_flow', 10, 5, 2, 50) | ||
| ), | ||
| msg_ids as ( | ||
| select array_agg(msg_id) as ids from msgs | ||
| ), | ||
| started_tasks as ( | ||
| select * from pgflow.start_tasks( | ||
| 'mixed_flow', | ||
| (select ids from msg_ids), | ||
| '11111111-1111-1111-1111-111111111111'::uuid | ||
| ) | ||
| ) | ||
| select is( | ||
| (select count(*)::int from started_tasks where flow_input is not null), | ||
| 2, | ||
| 'Both root non-map steps should receive non-null flow_input' | ||
| ); | ||
| | ||
| select finish(); | ||
| rollback; |
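
The rule these tests assert (only root non-map steps receive flow_input eagerly; every other step type gets NULL and must load it on demand) can be sketched on the worker side roughly as follows. This is a minimal illustration, not the PR's implementation: `StepInfo`, `includesFlowInput`, `lazy`, `makeFlowInputAccessor`, and the `fetchFlowInput` callback are hypothetical names introduced here, and the sketch assumes a NULL flow_input always means "not sent" rather than a flow whose input is JSON null.

```typescript
// Hypothetical shapes for illustration only; these are not pgflow's actual types.
type StepType = "single" | "map";

interface StepInfo {
  stepType: StepType;
  depsCount: number; // 0 means the step has no dependencies (a root step)
}

// Mirrors the rule the tests check: flow_input is sent eagerly
// only to root steps that are not map steps.
function includesFlowInput(step: StepInfo): boolean {
  return step.depsCount === 0 && step.stepType !== "map";
}

// Wraps an async loader so it is invoked at most once, on first access.
// Later calls reuse the same promise.
function lazy<T>(load: () => Promise<T>): () => Promise<T> {
  let cached: Promise<T> | undefined;
  return () => {
    if (cached === undefined) {
      cached = load();
    }
    return cached;
  };
}

// Builds a flowInput accessor. If the task row carried flow_input, it is
// returned directly; otherwise the assumed fetchFlowInput callback is
// called lazily and its result is cached.
function makeFlowInputAccessor<T>(
  eager: T | null,
  fetchFlowInput: () => Promise<T>,
): () => Promise<T> {
  return eager !== null ? () => Promise.resolve(eager) : lazy(fetchFlowInput);
}
```

With an accessor like this, a map step that never reads the flow input never triggers a fetch, and a step that reads it several times triggers only one.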