From ae90b87ff2d49a20da3236cb59cd006e88f74b2d Mon Sep 17 00:00:00 2001 From: tjgreen42 <1738591+tjgreen42@users.noreply.github.com> Date: Wed, 1 Jul 2026 03:51:48 +0000 Subject: [PATCH 1/2] Reconcile df.start() with the durable engine via committed intent df.start() previously wrote its df rows in the caller's transaction but started the workflow engine out-of-band on a separate connection. Those two writes could diverge: - Ghost: a rolled-back df.start() left the engine running a workflow with no df record behind it. - Stuck: a committed df instance whose engine start was lost never ran. Record the start as committed intent and let the background worker converge the engine to match, going only through duroxide's Rust API (no shared transaction, no touching duroxide's internal schema): - df.start() persists a start_input payload on df.instances and issues a transactional NOTIFY; it no longer starts the engine itself. - The worker LISTENs for instant starts and periodically sweeps for committed-but-unstarted instances the engine does not yet know about, starting each exactly once via an atomic pending->running claim. A rolled-back df.start() therefore starts nothing, and a committed instance is always eventually started. Schema (0.2.4): add df.instances.start_input JSONB (fresh install + upgrade script), extend the grant/revoke INSERT column list, and backfill the column grant to existing df-usage roles. Two Postmaster GUCs control the sweep: pg_durable.reconcile_interval (15s) and reconcile_grace (300s). Backward compatible: on a pre-0.2.4 schema df.start() keeps the inline start, and the worker re-probes for the column so an in-place ALTER EXTENSION UPDATE enables reconciliation without a restart. Tests: tests/e2e/sql/25_reconcile_start.sql exercises both failure modes (RED on main); upgrade test adds grant-backfill coverage. Docs: USER_GUIDE section, docs/spec-reconcile-start.md, upgrade-testing notes. --- USER_GUIDE.md | 60 +++++- docs/spec-reconcile-start.md | 126 +++++++++++++ docs/upgrade-testing.md | 6 + scripts/test-upgrade.sh | 28 +++ sql/pg_durable--0.2.3--0.2.4.sql | 60 +++++- src/dsl.rs | 138 ++++++++++---- src/lib.rs | 39 +++- src/types.rs | 11 ++ src/worker.rs | 270 ++++++++++++++++++++++++++- tests/e2e/sql/25_reconcile_start.sql | 111 +++++++++++ 10 files changed, 799 insertions(+), 50 deletions(-) create mode 100644 docs/spec-reconcile-start.md create mode 100644 tests/e2e/sql/25_reconcile_start.sql diff --git a/USER_GUIDE.md b/USER_GUIDE.md index a2b8c2c..b7717ba 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -23,9 +23,10 @@ pg_durable is a PostgreSQL extension that brings durable, fault-tolerant functio 13. [Monitoring](#monitoring) 14. [User Isolation & Privileges](#user-isolation--privileges) 15. [Connection Limits](#connection-limits) -16. [Troubleshooting](#troubleshooting) -17. [Quick Reference Card](#quick-reference-card) -18. [Appendix: Test Data Setup](#appendix-test-data-setup) +16. [Start Reconciliation](#start-reconciliation) +17. [Troubleshooting](#troubleshooting) +18. [Quick Reference Card](#quick-reference-card) +19. [Appendix: Test Data Setup](#appendix-test-data-setup) --- @@ -2012,6 +2013,59 @@ pg_durable.execution_acquire_timeout = 60 --- +## Start Reconciliation + +pg_durable keeps its own bookkeeping in the `df` schema, while the workflow +engine (duroxide) keeps running-workflow state in its own schema. `df.start()` +writes the `df` rows for a new instance **in your transaction** and then lets the +background worker start the engine from that committed record. This keeps the two +schemas in sync without sharing a transaction: + +- **If your transaction rolls back**, the `df` rows disappear and the engine was + never told to run — so a rolled-back `df.start()` never leaves a "ghost" + workflow running with no `df` record. +- **Once your transaction commits**, the instance is *always* eventually started, + even if the worker was briefly down when you called `df.start()`. + +Two mechanisms converge the state: + +1. **Instant path** — `df.start()` sends a transactional `NOTIFY` (delivered only + if the transaction commits). The worker is listening and starts the instance + immediately. +2. **Backstop sweep** — the worker periodically scans for committed-but-unstarted + instances (a missed notification) and starts any it finds that the engine does + not yet know about. + +Under normal operation the instant path starts workflows in milliseconds; the +sweep exists only to guarantee eventual convergence. + +### GUC Reference + +Both GUCs are **Postmaster-context** — set them in `postgresql.conf` and restart +PostgreSQL. + +```ini +# postgresql.conf + +# How often (seconds) the worker sweeps for committed-but-unstarted instances. +# Set to 0 to disable the periodic sweep (the instant NOTIFY path stays on). +pg_durable.reconcile_interval = 15 + +# Minimum age (seconds) a pending instance must reach before the sweep starts it. +# Keeps the sweep from racing the instant NOTIFY path for freshly-committed work. +pg_durable.reconcile_grace = 300 +``` + +The defaults suit most deployments. Lower `reconcile_grace` if you want the +backstop to recover missed starts sooner; raise `reconcile_interval` to reduce +background query load on very large `df.instances` tables. + +> **Note:** Reconciliation activates only on schema version 0.2.4 and later +> (when `df.instances.start_input` exists). On an older schema `df.start()` +> starts the engine inline instead, so there is nothing to reconcile. + +--- + ## Troubleshooting ### Extension Exists But Workflows Don't Start diff --git a/docs/spec-reconcile-start.md b/docs/spec-reconcile-start.md new file mode 100644 index 0000000..5c06117 --- /dev/null +++ b/docs/spec-reconcile-start.md @@ -0,0 +1,126 @@ +# Start Reconciliation Specification + +## The problem, in plain terms + +pg_durable stores two copies of "what workflows exist": + +- The **`df` schema** — pg_durable's own bookkeeping (`df.instances`, `df.nodes`), + written by `df.start()` in the caller's transaction. +- The **duroxide schema** — the workflow engine's runtime state (its queue and + instance history), which actually makes a workflow run. + +These are two separate systems. Historically `df.start()` wrote the `df` rows in +your transaction but told the engine to run the workflow **on a separate +connection**, which commits independently. That split caused two failure modes: + +1. **Ghost workflow (rollback leak).** You call `df.start()` and then your + transaction rolls back. Your `df` rows vanish — but the engine was already + told to run the workflow on that other connection, so it keeps running with no + `df` record behind it. + +2. **Stuck workflow (lost start).** The `df` rows commit, but the message telling + the engine to run never lands (worker was down, connection dropped). Now there + is a workflow "on paper" in `df.instances` that never executes. + +Both come from the same root cause: **the `df` write and the engine start happen +in two different transactions, with nothing reconciling them afterward.** + +## The approach + +Stop trying to force both writes into one transaction, and stop reaching into the +engine's internal SQL. Instead, treat the committed `df` row as the **source of +truth (the intent)** and let the background worker converge the engine to match, +going only through duroxide's **Rust API** (`Client::start_orchestration`, +`Client::get_orchestration_status`). + +`df.start()` now: + +1. Writes the instance rows **and a `start_input` payload** (the vars snapshot + + label captured at start time) into `df` — all in the caller's transaction. +2. Issues a transactional `NOTIFY pg_durable_start` (delivered **only** if the + transaction commits). +3. Does **not** start the engine itself. + +The background worker converges the engine two ways: + +- **Instant path.** It `LISTEN`s on `pg_durable_start`. On a notification it starts + the just-committed instance immediately (typically single-digit milliseconds). +- **Backstop sweep.** Every `reconcile_interval` seconds it scans for `pending` + instances older than `reconcile_grace` that the engine still does not know about + (`get_orchestration_status == NotFound`) and starts them. This covers a missed + notification (worker restart, dropped connection). + +Why this fixes both modes: + +- **Ghost:** the engine is only ever started by the worker from a *committed* `df` + row. A rolled-back `df.start()` leaves no row and delivers no notification, so + nothing is ever started. No ghost is possible. +- **Stuck:** even if the instant notification is lost, the sweep eventually finds + the committed-but-unstarted row and starts it. Convergence is guaranteed. + +### Why go through the Rust API instead of the engine's SQL + +The engine's SQL schema is an internal implementation detail, not its contract. +Its contract is "start an orchestration." Driving it through +`Client::start_orchestration` keeps pg_durable decoupled from how duroxide happens +to store state, and it works against any duroxide provider. The philosophical +framing (per review discussion): duroxide's model is async, at-least-once, +converge-later; Postgres's model is synchronous and transactional. pg_durable +lives on Postgres but follows duroxide's model, so it records intent +transactionally and converges the async engine afterward rather than pretending +the two can share one transaction. + +### Exactly-once start + +The instant path and the sweep can both notice the same instance. Start-once is +guaranteed by an atomic claim: the worker flips the row `pending -> running` with +`UPDATE ... WHERE status = 'pending' RETURNING start_input`. Only the one caller +that observes the row still `pending` proceeds to call the engine. If that engine +call fails, the claim is rolled back to `pending` so a later sweep retries. + +## Schema changes (0.2.4) + +- `df.instances.start_input JSONB` (nullable) — the `FunctionInput` captured at + `df.start()` time so the worker can replay the exact start payload. Rows written + before this column existed replay with an empty vars set. +- Added to the `df.grant_usage()` / `df.revoke_usage()` INSERT column list; the + upgrade script backfills `GRANT INSERT (start_input)` to existing df-usage roles. + +### GUCs (Postmaster-context) + +| GUC | Default | Purpose | +|-----|---------|---------| +| `pg_durable.reconcile_interval` | `15` | Seconds between backstop sweeps. `0` disables the sweep (instant path stays on). | +| `pg_durable.reconcile_grace` | `300` | Minimum age a pending instance must reach before the sweep starts it. | + +## Upgrade & Migration + +- **Backward compatibility (B1).** The new `.so` must run against un-upgraded + 0.2.2 / 0.2.3 schemas that lack `start_input`. `df.start()` gates on the + installed extension version: `>= 0.2.4` uses the new intent path; older uses the + legacy inline start. The worker checks for the `start_input` column before + running any reconciliation, and re-checks it on each sweep so an in-place + `ALTER EXTENSION UPDATE` (which does not restart the worker) enables + reconciliation mid-epoch. The `LISTEN` is always established because `df.start()` + only sends the notification once the column exists. +- **Upgrade script.** `sql/pg_durable--0.2.3--0.2.4.sql` adds the column (appended + last, matching the fresh-install column order), updates the grant/revoke, and + backfills the column grant to existing df roles. +- **No duroxide schema change** — this feature only uses the duroxide Rust API, so + it requires no changes to the embedded duroxide migrations. + +## Scope + +This change fixes the **start** direction (df has it, engine does not). The reverse +directions (`df.cancel()` / `df.signal()` rollback leaks, or engine-has-it / +df-does-not orphan cleanup) are out of scope and remain on the existing client +path. + +## Testing + +`tests/e2e/sql/25_reconcile_start.sql` exercises both failure modes end-to-end: + +- **Scenario 1 (ghost):** `BEGIN; SELECT df.start(...); ROLLBACK;` then assert the + duroxide runtime has no residue for that instance. (Verified RED on `main`.) +- **Scenario 2 (stuck):** commit a backdated `pending` instance directly (no + notification), then assert the worker's sweep starts and completes it. diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 8510bfe..102f5f8 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -254,6 +254,12 @@ what the upgrade script handles, and any backward compatibility considerations. - **Scenario B2 considerations:** No data migration. The `CREATE FUNCTION` adds catalog metadata only; `df.instances` rows are untouched. The new `created_at`/`completed_at` result columns are read from columns that already exist and are already populated on every prior install. - **Dependent-object note:** Because the upgrade only adds a function (no `DROP FUNCTION`), no customer-owned object that depends on the existing two-argument `df.list_instances` is affected — there is nothing to drop or repoint. +#### Start reconciliation — add `df.instances.start_input` (start-convergence) +- **DDL change (df schema):** Adds a nullable `df.instances.start_input JSONB` column that `df.start()` populates with the `FunctionInput` payload (instance id, label, vars snapshot, loop iteration) captured at start time, so the background worker can start the durable engine from committed intent rather than out-of-band on a separate connection. Fresh installs (`src/lib.rs`) declare the column **last** in `df.instances` (after `completed_at`); the upgrade script `sql/pg_durable--0.2.3--0.2.4.sql` runs `ALTER TABLE df.instances ADD COLUMN start_input JSONB`, which also appends it last — so both paths yield identical column ordinals. The column is added to the `df.grant_usage()` / `df.revoke_usage()` INSERT column list on both paths (`df.start()` runs as the caller and writes it), and the upgrade script backfills `GRANT INSERT (start_input)` to every role that already holds column-level INSERT on `df.instances` (enumerated via the `id` column's `attacl`, which every prior `df.grant_usage()` granted) so existing df users can keep calling `df.start()` without re-running `df.grant_usage()`. Two Postmaster GUCs are added (`pg_durable.reconcile_interval` default 15, `pg_durable.reconcile_grace` default 300). No duroxide schema change — reconciliation uses only the duroxide Rust `Client` API. +- **Scenario A considerations:** `ALTER TABLE ... ADD COLUMN start_input JSONB` appends the column at the last ordinal; the fresh-install `CREATE TABLE df.instances` places `start_input` last as well, so the post-upgrade catalog matches a fresh 0.2.4 install column-for-column. The grant/revoke bodies are `CREATE OR REPLACE`d identically on both paths. +- **Scenario B1 considerations:** The new `.so` must run against pre-0.2.4 schemas (0.2.2/0.2.3) that lack `start_input`. `df.start()` gates on the installed extension version (`start_input_supported()` → `>= 0.2.4`): on an older schema it takes the **legacy inline start path** (out-of-band `start_durable_function`, exactly as before), so no reference to the missing column is emitted. The background worker probes for the `start_input` column before running any reconciliation and **re-probes on each sweep tick** (monotonic false→true), so an in-place `ALTER EXTENSION UPDATE` — which does not restart the worker or start a new epoch — enables reconciliation mid-epoch without a restart. The `LISTEN pg_durable_start` is always established (schema-independent); a notification can only arrive once `df.start()` uses the intent path, which only happens after the column exists, so the notify handler never references a missing column. +- **Scenario B2 considerations:** No data migration of existing rows. Pre-upgrade `df.instances` rows have `start_input = NULL`; if such a row is ever reconciled, the worker synthesizes a minimal `{instance_id}` input (empty vars). After `ALTER EXTENSION UPDATE`, a new `df.start()` switches to the intent+NOTIFY path and the running worker starts it via the always-on listener (and the re-probing sweep as backstop). + ### v0.2.2 → v0.2.3 #### Rename duroxide provider schema to `_duroxide` for fresh installs diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 3e05e84..7dee8d0 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -923,6 +923,7 @@ echo "" B2_PRE_INSTANCE_ID="" B2_INFLIGHT_INSTANCE_ID="" B2_POST_INSTANCE_ID="" +B2_BACKFILL_ROLE="durable_b2_backfill_probe" test_b2_data_survives_upgrade() { # Step 1: Install previous version and create test data @@ -933,6 +934,15 @@ test_b2_data_survives_upgrade() { assert_sql_equals "SELECT df.clearvars();" "OK" || return 1 assert_sql_equals "SELECT df.setvar('b2_key', 'b2_value');" "OK" || return 1 + # Create a NON-superuser df role and grant it df usage on the PREVIOUS + # schema (which has no start_input column), so the 0.2.4 upgrade's grant + # backfill has a pre-existing target. test_b2_start_input_grant_backfill_ + # after_upgrade verifies this role can still call df.start() after upgrade. + run_sql_capture "DROP OWNED BY ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true + run_sql_capture "DROP ROLE IF EXISTS ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true + run_sql_capture "CREATE ROLE ${B2_BACKFILL_ROLE} LOGIN;" >/dev/null || return 1 + run_sql_capture "SELECT df.grant_usage('${B2_BACKFILL_ROLE}');" >/dev/null || return 1 + B2_PRE_INSTANCE_ID=$(run_sql_capture "SELECT df.start('INSERT INTO test_upgrade_b2_log (kind, msg) VALUES (''pre'', ''{b2_key}'') RETURNING msg', 'b2-pre-upgrade');") || return 1 B2_INFLIGHT_INSTANCE_ID=$(run_sql_capture "SELECT df.start(df.sleep(2) ~> 'SELECT ''b2-running'' AS value', 'b2-inflight');") || return 1 @@ -977,6 +987,23 @@ test_b2_new_data_after_upgrade() { assert_sql_equals "SELECT msg FROM test_upgrade_b2_log WHERE kind = 'post' ORDER BY id DESC LIMIT 1;" "new_value" } +test_b2_start_input_grant_backfill_after_upgrade() { + # The 0.2.4 upgrade adds df.instances.start_input and df.start() switches to + # the intent path (INSERT ... start_input). A role granted df usage BEFORE the + # upgrade only holds INSERT on the old columns, so without the upgrade's grant + # backfill its df.start() would fail with "permission denied for column + # start_input". B2_BACKFILL_ROLE was created and granted df usage on the + # PREVIOUS schema (see test_b2_data_survives_upgrade), so this asserts the + # backfill extended its INSERT privilege to the new column. + assert_sql_equals \ + "SELECT has_column_privilege('${B2_BACKFILL_ROLE}', 'df.instances', 'start_input', 'INSERT');" \ + "t" + local rc=$? + + run_sql_capture "DROP OWNED BY ${B2_BACKFILL_ROLE}; DROP ROLE IF EXISTS ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true + return $rc +} + test_b2_grant_usage_after_upgrade() { # Regression guard for #110: after ALTER EXTENSION UPDATE, df.debug_connection() # must be gone from the catalog. Scenario A only compares function name/args/ @@ -1012,6 +1039,7 @@ if [ "$HAS_COMPAT_PREV" = true ]; then run_test "B2: Pre-upgrade instance remains queryable" test_b2_pre_upgrade_instance_after_upgrade run_test "B2: In-flight work completes after upgrade" test_b2_inflight_work_after_upgrade run_test "B2: New data and execution after upgrade" test_b2_new_data_after_upgrade + run_test "B2: Pre-upgrade df role can df.start() after upgrade (start_input grant backfill)" test_b2_start_input_grant_backfill_after_upgrade run_test "B2: df.grant_usage() works and df.debug_connection() is gone after upgrade" test_b2_grant_usage_after_upgrade fi diff --git a/sql/pg_durable--0.2.3--0.2.4.sql b/sql/pg_durable--0.2.3--0.2.4.sql index 8ae8436..eb82ae9 100644 --- a/sql/pg_durable--0.2.3--0.2.4.sql +++ b/sql/pg_durable--0.2.3--0.2.4.sql @@ -89,7 +89,7 @@ BEGIN EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; - EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database, start_input) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT, INSERT, UPDATE, DELETE ON df.vars TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -154,7 +154,7 @@ BEGIN EXECUTE pg_catalog.format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role); - EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role); + EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database, start_input) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role); @@ -288,6 +288,62 @@ COMMENT ON COLUMN df.nodes.status_details IS '"execution_id": the orchestration instance_id::execution_id stamp recorded when the node last ' 'transitioned. df.instance_nodes() parses it to derive pending/skipped statuses; see USER_GUIDE.md.'; +-- ============================================================================ +-- Persisted start payload: add df.instances.start_input (start-convergence). +-- +-- df.start() now records the workflow's start input (the vars snapshot captured +-- at start time, plus label) in the SAME transaction that writes df.instances, +-- instead of calling the durable engine out-of-band on a separate connection. +-- The background worker starts the engine from this committed intent, so a +-- rolled-back df.start() leaves no engine "ghost" and a committed instance is +-- always eventually started. The column is nullable: rows written before this +-- upgrade have no snapshot, and the worker best-effort starts those with an +-- empty vars set. +-- +-- The column IS added to the user INSERT grant (df.start runs as the caller and +-- writes it). Backward compatibility (Scenario B1): the new .so gates the intent +-- path on the presence of this column (installed extension version >= 0.2.4) and +-- otherwise falls back to the pre-0.2.4 out-of-band start, so an un-upgraded +-- schema keeps working until this upgrade applies. +-- ============================================================================ +ALTER TABLE df.instances ADD COLUMN start_input JSONB; + +COMMENT ON COLUMN df.instances.start_input IS + 'FunctionInput JSON captured by df.start() (instance_id, label, vars snapshot, loop_iteration). ' + 'The background worker replays it to start the durable engine for this instance; see USER_GUIDE.md.'; + +-- Backfill the new column-level INSERT privilege to exactly the roles that +-- already hold column-level INSERT on df.instances (every df.grant_usage() call +-- grants INSERT on the id column), so existing df users can keep calling +-- df.start() after the new .so switches to the intent path -- without re-running +-- df.grant_usage(). Keying off the id column's attacl (rather than schema USAGE) +-- targets precisely the df.start() callers and preserves each grant's +-- grantability. +DO $$ +DECLARE + r RECORD; + grant_opt TEXT; +BEGIN + FOR r IN + SELECT + pg_catalog.quote_ident(pg_catalog.pg_get_userbyid(a.grantee)) AS grantee, + pg_catalog.bool_or(a.is_grantable) AS with_grant_option + FROM pg_catalog.pg_attribute att + JOIN pg_catalog.pg_class c ON c.oid OPERATOR(pg_catalog.=) att.attrelid + JOIN pg_catalog.pg_namespace n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace + CROSS JOIN LATERAL pg_catalog.aclexplode(att.attacl) AS a + WHERE n.nspname OPERATOR(pg_catalog.=) 'df' + AND c.relname OPERATOR(pg_catalog.=) 'instances' + AND att.attname OPERATOR(pg_catalog.=) 'id' + AND a.privilege_type OPERATOR(pg_catalog.=) 'INSERT' + AND a.grantee OPERATOR(pg_catalog.<>) 0 -- skip PUBLIC + GROUP BY a.grantee + LOOP + grant_opt := CASE WHEN r.with_grant_option THEN ' WITH GRANT OPTION' ELSE '' END; + EXECUTE pg_catalog.format('GRANT INSERT (start_input) ON df.instances TO %s', r.grantee) OPERATOR(pg_catalog.||) grant_opt; + END LOOP; +END $$; + -- ============================================================================ -- df.instance_nodes(): one row per node with derived status (PR #263). -- diff --git a/src/dsl.rs b/src/dsl.rs index 9a3ef22..85f082c 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -129,6 +129,19 @@ fn legacy_login_role_schema() -> bool { !owner_scoped_vars_enabled() } +/// Returns true when the installed schema has `df.instances.start_input` +/// (added in 0.2.4). When true, `df.start()` records the start payload in the +/// caller's transaction and lets the background worker start the engine from it +/// (the intent path). When false — a pre-0.2.4 schema reached by a newer `.so` +/// before `ALTER EXTENSION UPDATE` — `df.start()` falls back to the previous +/// out-of-band start so the extension keeps working until the upgrade applies. +fn start_input_supported() -> bool { + let extversion = installed_extension_version(); + parse_semver(&extversion) + .map(|v| v >= (0, 2, 4)) + .unwrap_or(false) +} + /// Sets a workflow variable. Must be called BEFORE df.start(), not inside a workflow. /// Variables are captured at df.start() and remain immutable during execution. /// Each user has their own variable namespace (owner = current_user). @@ -940,6 +953,37 @@ pub fn start( let legacy_login_role = legacy_login_role_schema(); + // When the installed schema has df.instances.start_input (>= 0.2.4) we take + // the "intent" path: persist the start payload on the instance row in this + // (the caller's) transaction and let the background worker start the engine + // from it after commit. On an older schema we fall back to the pre-0.2.4 + // out-of-band start at the end of this function. + let persist_start_input = start_input_supported(); + + // Capture vars from df.vars using the installed extension version as the + // compatibility boundary: pre-0.2.0 uses legacy global vars, 0.2.0+ uses + // owner-scoped vars. Captured before the instance row is inserted so the + // snapshot can be persisted as start_input in the same INSERT. + let vars_query = if owner_scoped_vars_enabled() { + "SELECT name, value FROM df.vars WHERE owner = quote_ident(current_user)::regrole" + } else { + "SELECT name, value FROM df.vars" + }; + + let vars: std::collections::HashMap = Spi::connect(|client| { + let mut vars = std::collections::HashMap::new(); + if let Ok(table) = client.select(vars_query, None, &[]) { + for row in table { + if let (Ok(Some(name)), Ok(Some(value))) = + (row.get::(1), row.get::(2)) + { + vars.insert(name, value); + } + } + } + vars + }); + // Pre-generate the root node's ID so the instance row can point at it up // front. The same-instance FK on root_node is DEFERRABLE INITIALLY // DEFERRED, so the referenced node row need not exist yet — insert_nodes @@ -984,6 +1028,32 @@ pub fn start( database_arg, ], ) + } else if persist_start_input { + // Intent path (>= 0.2.4): persist the start payload — the vars + // snapshot captured above plus the label — as start_input so the + // worker can start the engine for this instance after commit. + let input = FunctionInput { + instance_id: candidate.to_string(), + label: label.map(|s| s.to_string()), + vars: vars.clone(), + loop_iteration: 0, + }; + let start_input_json = + serde_json::to_string(&input).unwrap_or_else(|_| candidate.to_string()); + ( + "INSERT INTO df.instances (id, label, root_node, submitted_by, database, start_input) + VALUES ($1, $2, $3, $4::oid::regrole, $5, $6::jsonb) + ON CONFLICT (id) DO NOTHING + RETURNING id", + vec![ + candidate.into(), + label_arg, + root_id.as_str().into(), + current_user_oid.into(), + database_arg, + start_input_json.into(), + ], + ) } else { ( "INSERT INTO df.instances (id, label, root_node, submitted_by, database) @@ -1028,47 +1098,39 @@ pub fn start( &mut node_count, ); - // Capture vars from df.vars using the installed extension version as the - // compatibility boundary: pre-0.2.0 uses legacy global vars, 0.2.0+ uses - // owner-scoped vars. - let vars_query = if owner_scoped_vars_enabled() { - "SELECT name, value FROM df.vars WHERE owner = quote_ident(current_user)::regrole" + if persist_start_input { + // Intent path (>= 0.2.4): the start payload is already committed on the + // instance row. Nudge the worker to start the engine immediately via a + // transactional NOTIFY — delivered only if this transaction commits, so + // a rolled-back df.start() leaves no engine "ghost". If the NOTIFY is + // missed (worker not listening, restart), the periodic reconcile sweep + // starts the instance from its committed start_input as a backstop. + if let Err(e) = Spi::run_with_args( + "SELECT pg_notify('pg_durable_start', $1)", + &[instance_id.as_str().into()], + ) { + pgrx::log!("pg_durable: Warning - failed to notify start of {instance_id}: {e}"); + } } else { - "SELECT name, value FROM df.vars" - }; + // Legacy path (< 0.2.4 schema, no start_input column): start the engine + // out-of-band on a separate connection, as before. This can leave a + // ghost if the caller's transaction rolls back, but it preserves + // behavior on a schema that has not yet been upgraded (Scenario B1). + let input = FunctionInput { + instance_id: instance_id.clone(), + label: label.map(|s| s.to_string()), + vars, + loop_iteration: 0, + }; + let input_json = serde_json::to_string(&input).unwrap_or_else(|_| instance_id.clone()); - let vars: std::collections::HashMap = Spi::connect(|client| { - let mut vars = std::collections::HashMap::new(); - if let Ok(table) = client.select(vars_query, None, &[]) { - for row in table { - if let (Ok(Some(name)), Ok(Some(value))) = - (row.get::(1), row.get::(2)) - { - vars.insert(name, value); - } - } + if let Err(e) = start_durable_function( + crate::orchestrations::execute_function_graph::NAME, + &instance_id, + &input_json, + ) { + pgrx::log!("pg_durable: Warning - failed to start durable function: {e}"); } - vars - }); - - // Start the orchestration via duroxide - let input = FunctionInput { - instance_id: instance_id.clone(), - label: label.map(|s| s.to_string()), - vars, - loop_iteration: 0, - }; - let input_json = serde_json::to_string(&input).unwrap_or(instance_id.clone()); - - if let Err(e) = start_durable_function( - crate::orchestrations::execute_function_graph::NAME, - &instance_id, - &input_json, - ) { - pgrx::log!( - "pg_durable: Warning - failed to start durable function: {}", - e - ); } instance_id diff --git a/src/lib.rs b/src/lib.rs index 14c6f8c..76dab4c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,16 @@ pub static EXECUTION_ACQUIRE_TIMEOUT: GucSetting = GucSetting::::new(3 /// functions are explicitly desired. See docs/superuser_guc.md. pub static ENABLE_SUPERUSER_INSTANCES: GucSetting = GucSetting::::new(false); +/// How often (seconds) the background worker sweeps for committed `df.instances` +/// that the durable engine has not started yet, and starts them. `0` disables +/// the periodic sweep (the instant `NOTIFY`-driven path still runs). +pub static RECONCILE_INTERVAL: GucSetting = GucSetting::::new(15); + +/// Minimum age (seconds) a pending `df.instances` row must reach before the +/// periodic sweep will start it. Keeps the sweep from racing the instant +/// `NOTIFY` path for freshly-committed instances. +pub static RECONCILE_GRACE: GucSetting = GucSetting::::new(300); + // Module declarations pub mod activities; pub mod client; @@ -127,6 +137,28 @@ pub extern "C-unwind" fn _PG_init() { GucFlags::default(), ); + GucRegistry::define_int_guc( + c"pg_durable.reconcile_interval", + c"Seconds between background sweeps that start committed df.instances the engine hasn't started yet (0 disables the sweep)", + c"The instant NOTIFY-driven start path always runs; this sweep is the backstop that starts any pending instance whose start notification was missed.", + &RECONCILE_INTERVAL, + 0, + 86400, + GucContext::Postmaster, + GucFlags::default(), + ); + + GucRegistry::define_int_guc( + c"pg_durable.reconcile_grace", + c"Minimum age in seconds a pending df.instances row must reach before the sweep starts it", + c"Prevents the periodic sweep from racing the instant NOTIFY start path for freshly-committed instances.", + &RECONCILE_GRACE, + 0, + 86400, + GucContext::Postmaster, + GucFlags::default(), + ); + GucRegistry::define_bool_guc( c"pg_durable.enable_superuser_instances", c"Allow pg_durable instances whose submitted_by role is a PostgreSQL superuser", @@ -220,7 +252,8 @@ CREATE TABLE df.instances ( database TEXT, created_at TIMESTAMPTZ DEFAULT pg_catalog.now(), updated_at TIMESTAMPTZ DEFAULT pg_catalog.now(), - completed_at TIMESTAMPTZ + completed_at TIMESTAMPTZ, + start_input JSONB ); COMMENT ON COLUMN df.instances.submitted_by IS @@ -451,7 +484,7 @@ BEGIN EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; - EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database, start_input) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT, INSERT, UPDATE, DELETE ON df.vars TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -501,7 +534,7 @@ BEGIN EXECUTE pg_catalog.format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role); - EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role); + EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database, start_input) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role); diff --git a/src/types.rs b/src/types.rs index b0ba264..622f5a7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -57,6 +57,17 @@ pub fn get_execution_acquire_timeout() -> Duration { Duration::from_secs(crate::EXECUTION_ACQUIRE_TIMEOUT.get() as u64) } +/// Interval between background sweeps that start committed-but-unstarted +/// instances. Zero disables the periodic sweep (the instant NOTIFY path stays on). +pub fn get_reconcile_interval() -> Duration { + Duration::from_secs(crate::RECONCILE_INTERVAL.get() as u64) +} + +/// Minimum age a pending instance must reach before the periodic sweep starts it. +pub fn get_reconcile_grace() -> Duration { + Duration::from_secs(crate::RECONCILE_GRACE.get() as u64) +} + /// Returns `true` when superuser-submitted instances are permitted. pub fn superuser_instances_enabled() -> bool { crate::ENABLE_SUPERUSER_INSTANCES.get() diff --git a/src/worker.rs b/src/worker.rs index 2bd3bf7..e445f31 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -12,13 +12,15 @@ use std::sync::Arc; use std::time::Duration; use duroxide::runtime; +use duroxide::Client; use duroxide_pg::PostgresProvider; use tracing_subscriber::EnvFilter; use crate::registry::{create_activity_registry, create_orchestration_registry}; use crate::types::{ get_max_duroxide_connections, get_max_management_connections, get_max_user_connections, - postgres_connection_string, resolve_duroxide_schema_pool, worker_provider_config, + get_reconcile_grace, get_reconcile_interval, postgres_connection_string, + resolve_duroxide_schema_pool, worker_provider_config, }; const TERMINAL_INSTANCE_PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); @@ -211,7 +213,7 @@ async fn run_duroxide_runtime() { duroxide_schema ); - let Some(duroxide_runtime) = initialize_duroxide_runtime( + let Some((duroxide_runtime, duroxide_store)) = initialize_duroxide_runtime( &pg_conn_str, INIT_RETRY_INTERVAL, &mgmt_pool, @@ -247,6 +249,7 @@ async fn run_duroxide_runtime() { &poll_pool, &mgmt_pool, duroxide_runtime, + duroxide_store, EXTENSION_DROP_POLL_INTERVAL, SHUTDOWN_CHECK_INTERVAL, epoch_id.as_deref(), @@ -470,7 +473,7 @@ async fn initialize_duroxide_runtime( retry_interval: Duration, mgmt_pool: &sqlx::PgPool, schema_name: &str, -) -> Option> { +) -> Option<(Arc, Arc)> { log!("pg_durable: initializing duroxide runtime..."); // Control duroxide provider pool size via env var (the only mechanism @@ -553,11 +556,16 @@ async fn initialize_duroxide_runtime( create_activity_registry(Arc::new(mgmt_pool.clone()), user_semaphore.clone()); let orchestrations = create_orchestration_registry(); + // Keep a clone of the store Arc so the worker can build a duroxide + // Client (for start reconciliation) that shares the same provider as + // the runtime. Client::new is cheap — it just wraps the Arc. + let store_for_client = store.clone(); + let duroxide_runtime = runtime::Runtime::start_with_store(store, activities, orchestrations).await; log!("pg_durable: duroxide runtime started"); - return Some(duroxide_runtime); + return Some((duroxide_runtime, store_for_client)); } } @@ -743,6 +751,7 @@ async fn run_until_extension_dropped_or_shutdown( poll_pool: &sqlx::PgPool, maintenance_pool: &sqlx::PgPool, duroxide_runtime: Arc, + duroxide_store: Arc, drop_poll_interval: Duration, shutdown_check_interval: Duration, epoch_id: Option<&str>, @@ -758,6 +767,64 @@ async fn run_until_extension_dropped_or_shutdown( ); prune_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + // Start reconciliation converges df.instances with the durable engine: + // df.start() records the instance (with its start payload) and commits, then + // the engine is started separately from that committed intent. This keeps the + // two in sync without a shared transaction — a rolled-back df.start() leaves + // no engine "ghost", and a committed instance is always eventually started. + // + // Reconciliation only acts once the schema has df.instances.start_input (>= + // 0.2.4). On an older schema df.start() starts the engine inline instead + // (Scenario B1), so there is nothing to reconcile. That schema check is made + // dynamically (not cached for the epoch) because ALTER EXTENSION UPDATE can + // add the column mid-epoch without restarting this worker. + let client = Client::new(duroxide_store.clone()); + let reconcile_interval = get_reconcile_interval(); + let sweep_configured = !reconcile_interval.is_zero(); + // Monotonic false->true: once the column appears it never goes away within a + // major version, so we stop re-probing after the first positive result. + let mut start_input_present = has_start_input_column(maintenance_pool).await; + + // The instant path: a transactional NOTIFY from df.start() (delivered only on + // commit) wakes us to start the just-committed instance immediately. LISTEN is + // schema-independent, so we always establish it — df.start() only sends the + // notification once the column exists, so a notification implies it is safe to + // start (this also covers an in-place ALTER EXTENSION UPDATE mid-epoch). + let mut start_listener: Option = + match sqlx::postgres::PgListener::connect_with(maintenance_pool).await { + Ok(mut listener) => { + match listener.listen(START_NOTIFY_CHANNEL).await { + Ok(()) => { + log!("pg_durable: listening for start notifications on '{START_NOTIFY_CHANNEL}'"); + Some(listener) + } + Err(e) => { + log!("pg_durable: failed to LISTEN {START_NOTIFY_CHANNEL} (sweep still active): {e}"); + None + } + } + } + Err(e) => { + log!("pg_durable: failed to open start-notification listener (sweep still active): {e}"); + None + } + }; + + // The backstop path: periodically start any pending instance older than the + // grace period that the engine still does not know about (a missed NOTIFY). + // The interval must be non-zero even when the sweep is disabled; the branch + // precondition gates it off in that case. + let effective_interval = if reconcile_interval.is_zero() { + TERMINAL_INSTANCE_PRUNE_INTERVAL + } else { + reconcile_interval + }; + let mut reconcile_check = tokio::time::interval_at( + tokio::time::Instant::now() + effective_interval, + effective_interval, + ); + reconcile_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { tokio::select! { _ = tokio::time::sleep(shutdown_check_interval) => { @@ -792,6 +859,40 @@ async fn run_until_extension_dropped_or_shutdown( } } } + notification = recv_start_notification(&mut start_listener), if start_listener.is_some() => { + match notification { + Some(instance_id) => { + match try_start_instance(maintenance_pool, &client, &instance_id).await { + Ok(true) => log!("pg_durable: started instance {instance_id} (notify)"), + Ok(false) => {} + Err(e) => log!( + "pg_durable: notify-start of instance {instance_id} failed \ + (sweep will retry): {e}" + ), + } + } + None => { + // recv error: drop the listener and rely on the sweep. + log!("pg_durable: start-notification listener closed; relying on periodic sweep"); + start_listener = None; + } + } + } + _ = reconcile_check.tick(), if sweep_configured => { + // Re-probe until the column appears so an in-place ALTER EXTENSION + // UPDATE mid-epoch enables the sweep without a worker restart. + if !start_input_present { + start_input_present = has_start_input_column(maintenance_pool).await; + } + if start_input_present { + match reconcile_pending_starts(maintenance_pool, &client, get_reconcile_grace()).await { + (started, _) if started > 0 => { + log!("pg_durable: reconcile sweep started {started} pending instance(s)"); + } + _ => {} + } + } + } } } @@ -799,3 +900,164 @@ async fn run_until_extension_dropped_or_shutdown( duroxide_runtime.shutdown(Some(10_000)).await; log!("pg_durable: duroxide runtime shutdown complete"); } + +/// Channel used by `df.start()` (via `pg_notify`) to wake the worker so it can +/// start a just-committed instance immediately. +const START_NOTIFY_CHANNEL: &str = "pg_durable_start"; + +/// Await the next start notification, or park forever when there is no listener. +/// +/// Returns `Some(payload)` for a notification (the instance id), or `None` when +/// the listener connection errors — the caller then drops it and falls back to +/// the periodic sweep. The `None` (no-listener) arm is only reached when the +/// select! precondition wrongly lets it run; it parks so it never busy-loops. +async fn recv_start_notification( + listener: &mut Option, +) -> Option { + match listener { + Some(l) => match l.recv().await { + Ok(notification) => Some(notification.payload().to_string()), + Err(e) => { + log!("pg_durable: start-notification recv error: {e}"); + None + } + }, + None => std::future::pending().await, + } +} + +/// Returns true when the installed schema has `df.instances.start_input` +/// (added in 0.2.4). Start reconciliation is gated on this: an older schema has +/// no start payload to replay and df.start() starts the engine inline instead. +async fn has_start_input_column(pool: &sqlx::PgPool) -> bool { + match sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM information_schema.columns + WHERE table_schema = 'df' AND table_name = 'instances' + AND column_name = 'start_input')", + ) + .fetch_one(pool) + .await + { + Ok(present) => present, + Err(e) => { + log!("pg_durable: could not probe df.instances.start_input (reconcile disabled): {e}"); + false + } + } +} + +/// Periodic backstop: start every pending instance older than `grace` that the +/// engine still does not know about. Returns `(started, errors)`. +async fn reconcile_pending_starts( + pool: &sqlx::PgPool, + client: &Client, + grace: Duration, +) -> (u64, u64) { + let grace_secs = grace.as_secs() as f64; + let ids: Vec = match sqlx::query_scalar( + "SELECT id FROM df.instances + WHERE status = 'pending' + AND created_at < now() - make_interval(secs => $1)", + ) + .bind(grace_secs) + .fetch_all(pool) + .await + { + Ok(ids) => ids, + Err(e) => { + log!("pg_durable: reconcile sweep query failed: {e}"); + return (0, 0); + } + }; + + let mut started = 0u64; + let mut errors = 0u64; + for id in ids { + match try_start_instance(pool, client, &id).await { + Ok(true) => started += 1, + Ok(false) => {} + Err(e) => { + errors += 1; + log!("pg_durable: reconcile sweep could not start instance {id}: {e}"); + } + } + } + (started, errors) +} + +/// Start a single pending instance in the durable engine, exactly once. +/// +/// Returns `Ok(true)` when this call started the engine, `Ok(false)` when there +/// was nothing to do (the engine already knows the instance, or another +/// caller/sweep claimed it first), and `Err` on a provider/database error. +/// +/// Single-start is guaranteed by an atomic claim: the row is flipped +/// `pending -> running` with `RETURNING`, so only the one caller that observes +/// it still `pending` proceeds to start the engine. If the engine start then +/// fails, the claim is rolled back to `pending` so a later sweep can retry. +async fn try_start_instance( + pool: &sqlx::PgPool, + client: &Client, + instance_id: &str, +) -> Result { + use duroxide::OrchestrationStatus; + + // Does the engine already know this instance? Only start ones it does not + // (NotFound) — this also skips instances started via the legacy inline path. + match client.get_orchestration_status(instance_id).await { + Ok(OrchestrationStatus::NotFound) => {} + Ok(_) => return Ok(false), + Err(e) => return Err(format!("status probe failed: {e:?}")), + } + + // Atomically claim the row so exactly one caller starts the engine. + let claimed: Option> = sqlx::query_scalar( + "UPDATE df.instances SET status = 'running', updated_at = now() + WHERE id = $1 AND status = 'pending' + RETURNING start_input", + ) + .bind(instance_id) + .fetch_optional(pool) + .await + .map_err(|e| format!("claim failed: {e}"))?; + + let Some(start_input) = claimed else { + // Lost the race, or the row is no longer pending — nothing to do. + return Ok(false); + }; + + // Replay the payload captured at df.start() time. Legacy rows written before + // the payload existed start with an empty vars set. + let input_json = match start_input { + Some(value) => value.to_string(), + None => serde_json::json!({ "instance_id": instance_id }).to_string(), + }; + + match client + .start_orchestration( + instance_id, + crate::orchestrations::execute_function_graph::NAME, + input_json, + ) + .await + { + Ok(()) => Ok(true), + Err(e) => { + // Roll the claim back so the next sweep can retry this instance. + if let Err(revert) = sqlx::query( + "UPDATE df.instances SET status = 'pending', updated_at = now() + WHERE id = $1 AND status = 'running'", + ) + .bind(instance_id) + .execute(pool) + .await + { + log!( + "pg_durable: failed to roll back claim for instance {instance_id} \ + after a failed start: {revert}" + ); + } + Err(format!("start_orchestration failed: {e:?}")) + } + } +} diff --git a/tests/e2e/sql/25_reconcile_start.sql b/tests/e2e/sql/25_reconcile_start.sql new file mode 100644 index 0000000..7344e04 --- /dev/null +++ b/tests/e2e/sql/25_reconcile_start.sql @@ -0,0 +1,111 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests the core df <-> duroxide start-convergence problem this change fixes. +-- +-- pg_durable keeps its own bookkeeping in the `df` schema; the workflow engine +-- (duroxide) keeps the running-workflow state in its own schema. `df.start()` +-- writes df rows in the caller's transaction, but the two schemas are separate +-- systems, so historically they could disagree in two ways: +-- +-- Scenario 1 (ghost): you start a workflow, then your transaction rolls back. +-- The df rows vanish, but duroxide was already told to run it out-of-band on +-- a separate connection -> a running workflow with no df record. +-- +-- Scenario 2 (stuck): the df rows are committed, but duroxide never learned it +-- should run -> a workflow that exists on paper but never executes. +-- +-- The fix records intent in df and lets the background worker converge duroxide +-- to match: a rolled-back start tells duroxide nothing, and a committed-but-not- +-- yet-running instance is started by the worker. +-- +-- The base connection is the superuser (postgres); durable functions run as the +-- non-superuser df_e2e_user. The runtime schema is resolved via +-- df.duroxide_schema(). + +-- Helper (superuser): assert no residual runtime rows for an instance id. +CREATE OR REPLACE FUNCTION pg_temp.assert_no_duroxide_residue(p_id text) +RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + sch text := df.duroxide_schema(); + n int; +BEGIN + EXECUTE format( + 'SELECT (SELECT count(*) FROM %I.orchestrator_queue WHERE instance_id = $1) ' + ' + (SELECT count(*) FROM %I.instances WHERE instance_id = $1)', + sch, sch) + INTO n USING p_id; + IF n > 0 THEN + RAISE EXCEPTION 'TEST FAILED [start_rollback_ghost]: rolled-back df.start left % runtime row(s) for instance %', n, p_id; + END IF; + RAISE NOTICE 'PASSED [start_rollback_ghost]: no runtime residue for %', p_id; +END $$; + +-- =========================================================================== +-- Scenario 1: a rolled-back df.start() leaves no runtime "ghost". +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; +BEGIN; +SELECT df.start('SELECT 42', 'reconcile-start-rollback') AS rb_id \gset +ROLLBACK; +RESET SESSION AUTHORIZATION; + +-- Give any (buggy) out-of-band start a moment to surface before asserting. +SELECT pg_sleep(2); + +SELECT pg_temp.assert_no_duroxide_residue(:'rb_id'); + +-- =========================================================================== +-- Scenario 2: a committed df instance that the engine never heard about is +-- started by the worker (convergence backstop). +-- +-- We simulate "committed intent whose start never reached duroxide" by writing +-- df.instances/df.nodes directly (no df.start, no notification), backdated so it +-- is immediately eligible for the worker's reconcile sweep. On the unfixed code +-- nothing ever starts it and this scenario times out. +-- =========================================================================== + +DROP TABLE IF EXISTS reconcile_probe; +CREATE TABLE reconcile_probe (id SERIAL PRIMARY KEY, noted_at TIMESTAMPTZ DEFAULT now()); +GRANT INSERT ON reconcile_probe TO df_e2e_user; +GRANT USAGE, SELECT ON SEQUENCE reconcile_probe_id_seq TO df_e2e_user; + +-- Insert a minimal one-node graph owned by df_e2e_user, backdated past the +-- reconcile grace window. FKs between df.instances.root_node and df.nodes are +-- DEFERRABLE INITIALLY DEFERRED, so both rows can be inserted in one transaction. +BEGIN; +INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) +VALUES ('deadbee1', 'abadcafe', 'SQL', 'INSERT INTO reconcile_probe DEFAULT VALUES', + 'df_e2e_user'::regrole, 'postgres'); + +INSERT INTO df.instances (id, label, root_node, status, submitted_by, database, created_at, updated_at) +VALUES ('abadcafe', 'reconcile-start-backstop', 'deadbee1', 'pending', + 'df_e2e_user'::regrole, 'postgres', + now() - interval '10 minutes', now() - interval '10 minutes'); +COMMIT; + +DO $$ +DECLARE status TEXT; attempts INT := 0; +BEGIN + LOOP + SELECT s INTO status FROM df.status('abadcafe') s; + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled') OR attempts > 600; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + IF lower(COALESCE(status, 'pending')) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [start_backstop]: worker did not start committed instance abadcafe (status=%)', status; + END IF; + + IF NOT EXISTS (SELECT 1 FROM reconcile_probe) THEN + RAISE EXCEPTION 'TEST FAILED [start_backstop]: instance completed but its node never ran'; + END IF; + + RAISE NOTICE 'PASSED [start_backstop]: worker started and completed committed instance abadcafe'; +END $$; + +DROP TABLE reconcile_probe; + +SELECT 'TEST PASSED: reconcile start' AS result; From 368149edc4bb008a8ae4277848ab9278d349355f Mon Sep 17 00:00:00 2001 From: tjgreen42 <1738591+tjgreen42@users.noreply.github.com> Date: Wed, 1 Jul 2026 05:02:24 +0000 Subject: [PATCH 2/2] Harden start reconciliation per code review Address findings from the PR review: - Security (High): the worker replayed the caller-writable start_input verbatim, and the orchestration trusted the instance id embedded in it to load the graph (on the superuser pool, bypassing RLS). A low-privilege user could INSERT a crafted pending row whose start_input pointed at another user's instance and have the worker run the victim's workflow as the victim. The worker now forces the engine input's instance id to the claimed row id, and execute() uses the durable runtime's instance id (ctx.instance_id()) instead of the payload's. Added tests/e2e/sql/26_reconcile_security.sql. - Crash window (High): the claim (pending -> running) commits before the engine start, so a worker crash in between stranded the row in running forever (the pending-only sweep could never recover it). The sweep now also reconciles stale claims: a running row the engine reports NotFound for whose claim is older than the grace is re-claimed and re-started. Added a stale-running recovery scenario to 25_reconcile_start.sql. - Listener recovery (Medium): the start listener was created once and dropped permanently on any recv error, so with reconcile_interval=0 a single error silently stopped all starts. The backstop tick now re-establishes the listener when it drops, honoring the "interval=0 keeps the instant path" contract. - Bounded sweep (Low): the reconcile sweep now LIMITs its batch and checks for shutdown between iterations so a large backlog can't starve shutdown/drop detection. - Cancel race: if an instance is cancelled between the claim and the start, the worker re-issues the cancel to the engine so the workflow does not run while df reads cancelled. - Restart latency: a startup catch-up sweep (zero grace) starts instances whose NOTIFY was missed while the worker was down, instead of waiting out the grace. --- docs/spec-reconcile-start.md | 60 ++++- src/orchestrations/execute_function_graph.rs | 11 +- src/worker.rs | 251 ++++++++++++++----- tests/e2e/sql/25_reconcile_start.sql | 46 ++++ tests/e2e/sql/26_reconcile_security.sql | 117 +++++++++ 5 files changed, 406 insertions(+), 79 deletions(-) create mode 100644 tests/e2e/sql/26_reconcile_security.sql diff --git a/docs/spec-reconcile-start.md b/docs/spec-reconcile-start.md index 5c06117..7e2f431 100644 --- a/docs/spec-reconcile-start.md +++ b/docs/spec-reconcile-start.md @@ -56,7 +56,8 @@ Why this fixes both modes: row. A rolled-back `df.start()` leaves no row and delivers no notification, so nothing is ever started. No ghost is possible. - **Stuck:** even if the instant notification is lost, the sweep eventually finds - the committed-but-unstarted row and starts it. Convergence is guaranteed. + the committed-but-unstarted row and starts it. Convergence is guaranteed, + including the case where a worker dies mid-start (see "Exactly-once start"). ### Why go through the Rust API instead of the engine's SQL @@ -73,16 +74,41 @@ the two can share one transaction. ### Exactly-once start The instant path and the sweep can both notice the same instance. Start-once is -guaranteed by an atomic claim: the worker flips the row `pending -> running` with -`UPDATE ... WHERE status = 'pending' RETURNING start_input`. Only the one caller -that observes the row still `pending` proceeds to call the engine. If that engine -call fails, the claim is rolled back to `pending` so a later sweep retries. +guaranteed by an atomic claim: the worker flips the row to `running` with +`UPDATE ... RETURNING start_input`. Only the one caller that wins the claim calls +the engine. If that engine call fails, the claim is rolled back to `pending` so a +later sweep retries. + +The claim commits before the engine start (two systems, two transactions), so a +hard worker crash *between* the claim and the engine start would otherwise strand +the row in `running` forever — a state the `pending`-only sweep could never +recover, silently violating the convergence guarantee. To close this window the +sweep also reconciles **stale claims**: a `running` row the engine does not know +about (`get_orchestration_status == NotFound`) whose claim is older than the grace +is re-claimed and re-started. The `NotFound` gate ensures a genuinely-running +instance is never restarted, and the age guard avoids racing a start that is +merely mid-dispatch. + +### Trust boundary: `start_input` is caller-writable + +`df.instances.start_input` is written by the caller of `df.start()`, and the +orchestration selects which graph to load — and which role runs the SQL — from the +input's instance id. The worker must therefore **never trust the instance id +embedded in the payload**: it forces the engine input's instance id to the claimed +row id, and the orchestration independently uses the durable runtime's instance id +(`ctx.instance_id()`) rather than the payload's. Otherwise a low-privilege user +could INSERT a crafted `pending` row whose `start_input` points at another user's +instance and have the superuser worker run the victim's workflow as the victim. +The persisted vars/label are still replayed (they belong to the caller's own +instance), only the identity is re-derived from trusted state. ## Schema changes (0.2.4) - `df.instances.start_input JSONB` (nullable) — the `FunctionInput` captured at - `df.start()` time so the worker can replay the exact start payload. Rows written - before this column existed replay with an empty vars set. + `df.start()` time so the worker can replay the exact start payload. The worker + overrides the payload's instance id with the claimed row id before replaying it + (see the trust boundary above). Rows written before this column existed replay + with an empty vars set. - Added to the `df.grant_usage()` / `df.revoke_usage()` INSERT column list; the upgrade script backfills `GRANT INSERT (start_input)` to existing df-usage roles. @@ -111,16 +137,26 @@ call fails, the claim is rolled back to `pending` so a later sweep retries. ## Scope -This change fixes the **start** direction (df has it, engine does not). The reverse -directions (`df.cancel()` / `df.signal()` rollback leaks, or engine-has-it / -df-does-not orphan cleanup) are out of scope and remain on the existing client -path. +This change fixes the **start** direction (df has it, engine does not). The +reverse orphan-cleanup direction (engine-has-it / df-does-not) is out of scope. +`df.cancel()` / `df.signal()` remain on the existing client path; as a targeted +exception, the worker re-issues a cancel to the engine if an instance was moved to +`cancelled` between the claim and the start, so a cancel that races the deferred +start is not silently ignored. ## Testing -`tests/e2e/sql/25_reconcile_start.sql` exercises both failure modes end-to-end: +`tests/e2e/sql/25_reconcile_start.sql` exercises the failure modes end-to-end: - **Scenario 1 (ghost):** `BEGIN; SELECT df.start(...); ROLLBACK;` then assert the duroxide runtime has no residue for that instance. (Verified RED on `main`.) - **Scenario 2 (stuck):** commit a backdated `pending` instance directly (no notification), then assert the worker's sweep starts and completes it. +- **Scenario 3 (stale claim):** commit a backdated `running` instance the engine + never started (simulating a crash between the claim and the engine start), then + assert the sweep recovers and completes it. + +`tests/e2e/sql/26_reconcile_security.sql` constructs the cross-tenant attack — a +low-privilege role INSERTs a crafted `pending` row whose `start_input` points at a +victim instance and triggers it via `pg_notify` — and asserts the attacker's row +runs its own graph while the victim's protected table is never written. diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index d5752af..055471d 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -112,9 +112,18 @@ struct SubtreeEnvelope { /// rather than completing with a control-flow value. Callers should treat the returned /// `Err` strictly as a failure and must not add retry/recovery logic for break. pub async fn execute(ctx: OrchestrationContext, input_json: String) -> Result { - let input: FunctionInput = serde_json::from_str(&input_json) + let mut input: FunctionInput = serde_json::from_str(&input_json) .map_err(|e| format!("Invalid orchestration input: {e}"))?; + // Trust the durable runtime's instance id, never the (caller-writable) + // instance id embedded in the input payload. df.instances.start_input is + // written by the caller of df.start(), and the background worker replays it + // to start this orchestration; a forged instance_id in that payload must not + // be able to redirect graph/identity loading to a different instance. For + // every legitimate start the two ids are identical, so this is a no-op on the + // happy path and replay-safe for in-flight instances. + input.instance_id = ctx.instance_id(); + let label_info = input .label .as_ref() diff --git a/src/worker.rs b/src/worker.rs index e445f31..e7b99e1 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -790,39 +790,36 @@ async fn run_until_extension_dropped_or_shutdown( // schema-independent, so we always establish it — df.start() only sends the // notification once the column exists, so a notification implies it is safe to // start (this also covers an in-place ALTER EXTENSION UPDATE mid-epoch). - let mut start_listener: Option = - match sqlx::postgres::PgListener::connect_with(maintenance_pool).await { - Ok(mut listener) => { - match listener.listen(START_NOTIFY_CHANNEL).await { - Ok(()) => { - log!("pg_durable: listening for start notifications on '{START_NOTIFY_CHANNEL}'"); - Some(listener) - } - Err(e) => { - log!("pg_durable: failed to LISTEN {START_NOTIFY_CHANNEL} (sweep still active): {e}"); - None - } - } - } - Err(e) => { - log!("pg_durable: failed to open start-notification listener (sweep still active): {e}"); - None - } - }; + let mut start_listener = connect_start_listener(maintenance_pool).await; + + // Catch-up sweep: a NOTIFY is not durable, so any df.start() that committed + // while the worker was down (or before the listener was established) has no + // pending notification. Run one immediate sweep with zero grace so those + // already-committed rows start promptly instead of waiting out the grace + // period; there is no concurrent instant path racing these pre-existing rows + // at startup, and the atomic claim still makes it exactly-once against any + // notification that does arrive. + if start_input_present { + let (started, _) = + reconcile_pending_starts(maintenance_pool, &client, Duration::ZERO).await; + if started > 0 { + log!("pg_durable: startup reconcile started {started} pending instance(s)"); + } + } - // The backstop path: periodically start any pending instance older than the - // grace period that the engine still does not know about (a missed NOTIFY). - // The interval must be non-zero even when the sweep is disabled; the branch - // precondition gates it off in that case. - let effective_interval = if reconcile_interval.is_zero() { - TERMINAL_INSTANCE_PRUNE_INTERVAL + // The backstop tick: fires on a fixed cadence to (1) re-establish the start + // listener if it has dropped (so the instant path self-heals after a + // transient listener error) and (2) run the periodic sweep. The tick fires + // even when the sweep is disabled (reconcile_interval = 0) so the listener is + // still rebuilt — honoring the GUC contract that "0 disables the sweep, the + // instant NOTIFY path still runs". + let tick_interval = if reconcile_interval.is_zero() { + LISTENER_RECHECK_INTERVAL } else { reconcile_interval }; - let mut reconcile_check = tokio::time::interval_at( - tokio::time::Instant::now() + effective_interval, - effective_interval, - ); + let mut reconcile_check = + tokio::time::interval_at(tokio::time::Instant::now() + tick_interval, tick_interval); reconcile_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { @@ -862,7 +859,7 @@ async fn run_until_extension_dropped_or_shutdown( notification = recv_start_notification(&mut start_listener), if start_listener.is_some() => { match notification { Some(instance_id) => { - match try_start_instance(maintenance_pool, &client, &instance_id).await { + match try_start_instance(maintenance_pool, &client, &instance_id, get_reconcile_grace()).await { Ok(true) => log!("pg_durable: started instance {instance_id} (notify)"), Ok(false) => {} Err(e) => log!( @@ -872,24 +869,32 @@ async fn run_until_extension_dropped_or_shutdown( } } None => { - // recv error: drop the listener and rely on the sweep. - log!("pg_durable: start-notification listener closed; relying on periodic sweep"); + // recv error: drop the listener; the reconcile tick below + // re-establishes it so the instant path recovers. + log!("pg_durable: start-notification listener closed; will re-establish"); start_listener = None; } } } - _ = reconcile_check.tick(), if sweep_configured => { - // Re-probe until the column appears so an in-place ALTER EXTENSION - // UPDATE mid-epoch enables the sweep without a worker restart. - if !start_input_present { - start_input_present = has_start_input_column(maintenance_pool).await; + _ = reconcile_check.tick() => { + // Re-establish the instant path if the listener dropped on a + // transient recv error, so it recovers even when the sweep is off. + if start_listener.is_none() { + start_listener = connect_start_listener(maintenance_pool).await; } - if start_input_present { - match reconcile_pending_starts(maintenance_pool, &client, get_reconcile_grace()).await { - (started, _) if started > 0 => { - log!("pg_durable: reconcile sweep started {started} pending instance(s)"); + if sweep_configured { + // Re-probe until the column appears so an in-place ALTER EXTENSION + // UPDATE mid-epoch enables the sweep without a worker restart. + if !start_input_present { + start_input_present = has_start_input_column(maintenance_pool).await; + } + if start_input_present { + match reconcile_pending_starts(maintenance_pool, &client, get_reconcile_grace()).await { + (started, _) if started > 0 => { + log!("pg_durable: reconcile sweep started {started} pending instance(s)"); + } + _ => {} } - _ => {} } } } @@ -905,11 +910,43 @@ async fn run_until_extension_dropped_or_shutdown( /// start a just-committed instance immediately. const START_NOTIFY_CHANNEL: &str = "pg_durable_start"; +/// How often the backstop tick fires when the periodic sweep is disabled +/// (`reconcile_interval = 0`). The tick still runs at this cadence purely to +/// re-establish the start listener if it dropped, so the instant NOTIFY path +/// self-heals even with the sweep turned off. +const LISTENER_RECHECK_INTERVAL: Duration = Duration::from_secs(30); + +/// Open a dedicated connection and LISTEN on the start-notification channel. +/// Returns `None` (with a log) if the connection or LISTEN fails; the caller +/// falls back to the periodic sweep and retries the connection on the next tick. +async fn connect_start_listener(pool: &sqlx::PgPool) -> Option { + match sqlx::postgres::PgListener::connect_with(pool).await { + Ok(mut listener) => match listener.listen(START_NOTIFY_CHANNEL).await { + Ok(()) => { + log!("pg_durable: listening for start notifications on '{START_NOTIFY_CHANNEL}'"); + Some(listener) + } + Err(e) => { + log!( + "pg_durable: failed to LISTEN {START_NOTIFY_CHANNEL} (sweep still active): {e}" + ); + None + } + }, + Err(e) => { + log!( + "pg_durable: failed to open start-notification listener (sweep still active): {e}" + ); + None + } + } +} + /// Await the next start notification, or park forever when there is no listener. /// /// Returns `Some(payload)` for a notification (the instance id), or `None` when -/// the listener connection errors — the caller then drops it and falls back to -/// the periodic sweep. The `None` (no-listener) arm is only reached when the +/// the listener connection errors — the caller then drops it and the reconcile +/// tick re-establishes it. The `None` (no-listener) arm is only reached when the /// select! precondition wrongly lets it run; it parks so it never busy-loops. async fn recv_start_notification( listener: &mut Option, @@ -946,8 +983,18 @@ async fn has_start_input_column(pool: &sqlx::PgPool) -> bool { } } -/// Periodic backstop: start every pending instance older than `grace` that the -/// engine still does not know about. Returns `(started, errors)`. +/// Maximum instances one reconcile sweep tick processes, so a large backlog +/// cannot monopolize the worker's select! loop (starving shutdown/drop +/// detection). Remaining rows are picked up on subsequent ticks. +const RECONCILE_SWEEP_BATCH: i64 = 256; + +/// Periodic backstop. Starts (a) freshly-committed `pending` instances older than +/// `grace` whose start NOTIFY was missed, and (b) stale `running` instances whose +/// claim is older than `grace` but which the engine never actually started (a +/// worker that died between claiming and starting — see `try_start_instance`). +/// The `get_orchestration_status == NotFound` gate inside `try_start_instance` +/// ensures a genuinely-running instance is never restarted. Returns +/// `(started, errors)`. async fn reconcile_pending_starts( pool: &sqlx::PgPool, client: &Client, @@ -956,10 +1003,13 @@ async fn reconcile_pending_starts( let grace_secs = grace.as_secs() as f64; let ids: Vec = match sqlx::query_scalar( "SELECT id FROM df.instances - WHERE status = 'pending' - AND created_at < now() - make_interval(secs => $1)", + WHERE (status = 'pending' AND created_at < now() - make_interval(secs => $1)) + OR (status = 'running' AND updated_at < now() - make_interval(secs => $1)) + ORDER BY created_at + LIMIT $2", ) .bind(grace_secs) + .bind(RECONCILE_SWEEP_BATCH) .fetch_all(pool) .await { @@ -973,7 +1023,12 @@ async fn reconcile_pending_starts( let mut started = 0u64; let mut errors = 0u64; for id in ids { - match try_start_instance(pool, client, &id).await { + // Yield the select! loop promptly on shutdown instead of draining a + // potentially large backlog first. + if is_shutdown_requested() { + break; + } + match try_start_instance(pool, client, &id, grace).await { Ok(true) => started += 1, Ok(false) => {} Err(e) => { @@ -985,53 +1040,114 @@ async fn reconcile_pending_starts( (started, errors) } -/// Start a single pending instance in the durable engine, exactly once. +/// Build the durable-engine input for `instance_id` from its persisted +/// `start_input`, **forcing** the input's instance id to `instance_id`. +/// +/// `df.instances.start_input` is caller-writable, and the orchestration selects +/// which graph to load and which role to run as from the input's instance id, so +/// the worker must never trust the payload's own id — otherwise a caller could +/// craft a row whose payload points at another user's instance and have the +/// superuser worker start it. Legacy rows (NULL payload) start with an empty +/// vars set. +fn build_start_input(instance_id: &str, start_input: Option) -> String { + let mut value = match start_input { + Some(v @ serde_json::Value::Object(_)) => v, + _ => serde_json::json!({}), + }; + if let serde_json::Value::Object(ref mut map) = value { + map.insert( + "instance_id".to_string(), + serde_json::Value::String(instance_id.to_string()), + ); + } + value.to_string() +} + +/// If `instance_id` was moved to the terminal `cancelled` state between the claim +/// and the engine start, cancel the engine instance too so df and the engine +/// converge. `df.cancel()` enqueues its cancel before flipping the row, and that +/// cancel can be dropped as an orphan if it reaches the engine before any history +/// exists; without this the workflow would run to completion while df reads +/// `cancelled`. +async fn reconcile_cancel_if_needed(pool: &sqlx::PgPool, client: &Client, instance_id: &str) { + let cancelled: bool = + sqlx::query_scalar("SELECT status = 'cancelled' FROM df.instances WHERE id = $1") + .bind(instance_id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .unwrap_or(false); + + if cancelled { + if let Err(e) = client + .cancel_instance(instance_id, "cancelled before start (reconciled)") + .await + { + log!("pg_durable: failed to reconcile cancel for instance {instance_id}: {e:?}"); + } + } +} + +/// Start a single instance in the durable engine, exactly once. /// /// Returns `Ok(true)` when this call started the engine, `Ok(false)` when there /// was nothing to do (the engine already knows the instance, or another /// caller/sweep claimed it first), and `Err` on a provider/database error. /// -/// Single-start is guaranteed by an atomic claim: the row is flipped -/// `pending -> running` with `RETURNING`, so only the one caller that observes -/// it still `pending` proceeds to start the engine. If the engine start then -/// fails, the claim is rolled back to `pending` so a later sweep can retry. +/// Single-start is guaranteed by an atomic claim: the row is flipped to +/// `running` with `RETURNING`, so only the one caller that wins the claim starts +/// the engine. Two cases are claimable: +/// - a fresh `pending` row (the normal instant/backstop start), and +/// - a `running` row the engine does not know about whose claim is older than +/// `stale_running_grace` — a stale claim left by a worker that died between the +/// claim and the engine start. Re-claiming it retries the start, closing the +/// crash-between-claim-and-start window (the claim commits before the engine +/// enqueue, so a hard crash in that gap would otherwise strand the row in +/// `running` forever, which the `pending`-only sweep could never recover). +/// +/// If the engine start fails, the claim is rolled back to `pending` so a later +/// sweep can retry. async fn try_start_instance( pool: &sqlx::PgPool, client: &Client, instance_id: &str, + stale_running_grace: Duration, ) -> Result { use duroxide::OrchestrationStatus; // Does the engine already know this instance? Only start ones it does not - // (NotFound) — this also skips instances started via the legacy inline path. + // (NotFound) — this skips instances the engine is already running or has + // finished (including any started via the legacy inline path), so a + // genuinely-running `running` row is never restarted by the stale-claim path. match client.get_orchestration_status(instance_id).await { Ok(OrchestrationStatus::NotFound) => {} Ok(_) => return Ok(false), Err(e) => return Err(format!("status probe failed: {e:?}")), } - // Atomically claim the row so exactly one caller starts the engine. + // Atomically claim the row (see the doc comment for the two claimable cases). + let stale_secs = stale_running_grace.as_secs() as f64; let claimed: Option> = sqlx::query_scalar( "UPDATE df.instances SET status = 'running', updated_at = now() - WHERE id = $1 AND status = 'pending' + WHERE id = $1 + AND (status = 'pending' + OR (status = 'running' + AND updated_at < now() - make_interval(secs => $2))) RETURNING start_input", ) .bind(instance_id) + .bind(stale_secs) .fetch_optional(pool) .await .map_err(|e| format!("claim failed: {e}"))?; let Some(start_input) = claimed else { - // Lost the race, or the row is no longer pending — nothing to do. + // Lost the race, or the row is no longer startable (e.g. cancelled). return Ok(false); }; - // Replay the payload captured at df.start() time. Legacy rows written before - // the payload existed start with an empty vars set. - let input_json = match start_input { - Some(value) => value.to_string(), - None => serde_json::json!({ "instance_id": instance_id }).to_string(), - }; + let input_json = build_start_input(instance_id, start_input); match client .start_orchestration( @@ -1041,9 +1157,12 @@ async fn try_start_instance( ) .await { - Ok(()) => Ok(true), + Ok(()) => { + reconcile_cancel_if_needed(pool, client, instance_id).await; + Ok(true) + } Err(e) => { - // Roll the claim back so the next sweep can retry this instance. + // Roll the claim back so a later sweep can retry this instance. if let Err(revert) = sqlx::query( "UPDATE df.instances SET status = 'pending', updated_at = now() WHERE id = $1 AND status = 'running'", diff --git a/tests/e2e/sql/25_reconcile_start.sql b/tests/e2e/sql/25_reconcile_start.sql index 7344e04..f0c9f0a 100644 --- a/tests/e2e/sql/25_reconcile_start.sql +++ b/tests/e2e/sql/25_reconcile_start.sql @@ -106,6 +106,52 @@ BEGIN RAISE NOTICE 'PASSED [start_backstop]: worker started and completed committed instance abadcafe'; END $$; +-- =========================================================================== +-- Scenario 3: a stale "running" claim left by a worker that died between the +-- atomic claim (pending -> running) and the engine start is recovered. +-- +-- We simulate the crashed-mid-start state directly: an instance whose df row is +-- 'running' (as if the claim committed) but which the engine never started, +-- backdated so its claim is older than the reconcile grace window. The sweep +-- must notice the engine does not know it and re-start it. On code that only +-- sweeps 'pending' rows this instance is stranded forever and the scenario times +-- out. +-- =========================================================================== + +BEGIN; +INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) +VALUES ('deadbee2', 'badcab1e', 'SQL', 'INSERT INTO reconcile_probe DEFAULT VALUES', + 'df_e2e_user'::regrole, 'postgres'); + +INSERT INTO df.instances (id, label, root_node, status, submitted_by, database, start_input, created_at, updated_at) +VALUES ('badcab1e', 'reconcile-stale-running', 'deadbee2', 'running', + 'df_e2e_user'::regrole, 'postgres', '{"instance_id": "badcab1e", "vars": {}}'::jsonb, + now() - interval '10 minutes', now() - interval '10 minutes'); +COMMIT; + +DO $$ +DECLARE status TEXT; attempts INT := 0; probe_before INT; probe_after INT; +BEGIN + SELECT count(*) INTO probe_before FROM reconcile_probe; + LOOP + SELECT s INTO status FROM df.status('badcab1e') s; + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled') OR attempts > 600; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + IF lower(COALESCE(status, 'running')) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [stale_running_recovery]: worker did not recover stale-running instance badcab1e (status=%)', status; + END IF; + + SELECT count(*) INTO probe_after FROM reconcile_probe; + IF probe_after <= probe_before THEN + RAISE EXCEPTION 'TEST FAILED [stale_running_recovery]: instance completed but its node never ran'; + END IF; + + RAISE NOTICE 'PASSED [stale_running_recovery]: worker recovered stale-running instance badcab1e'; +END $$; + DROP TABLE reconcile_probe; SELECT 'TEST PASSED: reconcile start' AS result; diff --git a/tests/e2e/sql/26_reconcile_security.sql b/tests/e2e/sql/26_reconcile_security.sql new file mode 100644 index 0000000..bf484ce --- /dev/null +++ b/tests/e2e/sql/26_reconcile_security.sql @@ -0,0 +1,117 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Security regression test for start reconciliation. +-- +-- df.instances.start_input is caller-writable (df.start() runs as the caller and +-- writes it). The background worker replays it to start the durable engine, and +-- the orchestration selects which graph to load — and therefore which role runs +-- the SQL — from the instance id. If the worker/orchestration trusted the +-- instance id embedded in the payload, a low-privilege user could INSERT a +-- crafted pending row whose start_input points at ANOTHER user's instance and +-- have the superuser worker start it, running the victim's workflow as the +-- victim with attacker-chosen vars (cross-tenant execution / SQL injection). +-- +-- This test constructs exactly that attack across two roles and asserts it is +-- blocked: the attacker's row runs the attacker's own graph, and the victim's +-- protected table is never written by the attacker-triggered start. +-- +-- Base connection is the superuser (postgres); the crafted rows are inserted by +-- the two non-superuser roles themselves so RLS and column grants apply. All ids +-- are 8 lowercase-hex characters (enforced by the df.instances/df.nodes CHECKs). + +-- --- Setup (superuser) ------------------------------------------------------ +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'df_sec_victim') THEN + CREATE ROLE df_sec_victim LOGIN; + END IF; +END $$; +SELECT df.grant_usage('df_sec_victim'); + +DROP TABLE IF EXISTS sec_victim_secret; +DROP TABLE IF EXISTS sec_attacker_probe; +CREATE TABLE sec_victim_secret (marker TEXT); +CREATE TABLE sec_attacker_probe (marker TEXT); +-- Only the victim can write its secret table; only the attacker can write its +-- own probe table. If the redirect worked, the victim's node (running as the +-- victim) would write sec_victim_secret. +GRANT INSERT ON sec_victim_secret TO df_sec_victim; +GRANT INSERT ON sec_attacker_probe TO df_e2e_user; + +-- --- Victim creates an (unstarted) instance whose graph writes its secret ----- +-- Inserted directly and left pending: it is neither notified nor old enough for +-- the reconcile grace window, so it will not start on its own during the test. +SET SESSION AUTHORIZATION df_sec_victim; +BEGIN; +INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) +VALUES ('c0ffee0d', 'c0ffee00', 'SQL', + 'INSERT INTO sec_victim_secret VALUES (''victim-secret'')', + 'df_sec_victim'::regrole, 'postgres'); +INSERT INTO df.instances (id, label, root_node, submitted_by, database, start_input) +VALUES ('c0ffee00', 'sec-victim', 'c0ffee0d', + 'df_sec_victim'::regrole, 'postgres', + '{"instance_id": "c0ffee00", "vars": {}}'::jsonb); +COMMIT; +RESET SESSION AUTHORIZATION; + +-- --- Attacker crafts a row whose start_input points at the victim instance ---- +SET SESSION AUTHORIZATION df_e2e_user; +BEGIN; +INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) +VALUES ('facade0d', 'facade00', 'SQL', + 'INSERT INTO sec_attacker_probe VALUES (''attacker-ran'')', + 'df_e2e_user'::regrole, 'postgres'); +INSERT INTO df.instances (id, label, root_node, submitted_by, database, start_input) +VALUES ('facade00', 'sec-attacker', 'facade0d', + 'df_e2e_user'::regrole, 'postgres', + '{"instance_id": "c0ffee00", "vars": {"x": "; DROP TABLE sec_victim_secret; --"}}'::jsonb); +COMMIT; +-- Trigger the crafted row immediately via the public NOTIFY channel. +SELECT pg_notify('pg_durable_start', 'facade00'); +RESET SESSION AUTHORIZATION; + +-- --- Wait for the attacker instance to finish, then assert isolation held ----- +DO $$ +DECLARE status TEXT; attempts INT := 0; +BEGIN + LOOP + SELECT s INTO status FROM df.status('facade00') s; + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled') OR attempts > 600; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + -- The attacker instance must run ITS OWN graph (writing its own probe), not + -- the victim's — proving start_input.instance_id did not redirect loading. + IF lower(COALESCE(status, 'pending')) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [start_input_redirect]: attacker instance did not run its own graph (status=%)', status; + END IF; + IF NOT EXISTS (SELECT 1 FROM sec_attacker_probe WHERE marker = 'attacker-ran') THEN + RAISE EXCEPTION 'TEST FAILED [start_input_redirect]: attacker instance completed but its own node never ran'; + END IF; + + -- The victim's graph must NOT have executed as a side effect of starting the + -- attacker's instance. A non-empty secret table means the redirect worked. + IF EXISTS (SELECT 1 FROM sec_victim_secret) THEN + RAISE EXCEPTION 'TEST FAILED [start_input_redirect]: victim graph executed via attacker-controlled start_input (privilege escalation)'; + END IF; + + RAISE NOTICE 'PASSED [start_input_redirect]: crafted start_input could not redirect graph loading to the victim instance'; +END $$; + +-- --- Cleanup ---------------------------------------------------------------- +-- Mark the still-pending victim row terminal so the reconcile sweep never starts +-- it after this test (superuser UPDATE bypasses RLS/grants); terminal rows are +-- pruned by the worker. The attacker row is already terminal (completed). +UPDATE df.instances SET status = 'cancelled', updated_at = now() +WHERE id IN ('c0ffee00', 'facade00') AND status NOT IN ('completed', 'failed', 'cancelled'); +DROP TABLE sec_victim_secret; +DROP TABLE sec_attacker_probe; +-- DROP OWNED BY removes df_sec_victim's remaining grants so DROP ROLE succeeds; +-- the cancelled instance row (submitted_by = df_sec_victim) survives with a +-- dangling regrole, which the extension tolerates, and is pruned as terminal. +DROP OWNED BY df_sec_victim; +DROP ROLE IF EXISTS df_sec_victim; + +SELECT 'TEST PASSED: reconcile security' AS result;