diff --git a/USER_GUIDE.md b/USER_GUIDE.md index a2b8c2c..b7717ba 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -23,9 +23,10 @@ pg_durable is a PostgreSQL extension that brings durable, fault-tolerant functio 13. [Monitoring](#monitoring) 14. [User Isolation & Privileges](#user-isolation--privileges) 15. [Connection Limits](#connection-limits) -16. [Troubleshooting](#troubleshooting) -17. [Quick Reference Card](#quick-reference-card) -18. [Appendix: Test Data Setup](#appendix-test-data-setup) +16. [Start Reconciliation](#start-reconciliation) +17. [Troubleshooting](#troubleshooting) +18. [Quick Reference Card](#quick-reference-card) +19. [Appendix: Test Data Setup](#appendix-test-data-setup) --- @@ -2012,6 +2013,59 @@ pg_durable.execution_acquire_timeout = 60 --- +## Start Reconciliation + +pg_durable keeps its own bookkeeping in the `df` schema, while the workflow +engine (duroxide) keeps running-workflow state in its own schema. `df.start()` +writes the `df` rows for a new instance **in your transaction** and then lets the +background worker start the engine from that committed record. This keeps the two +schemas in sync without sharing a transaction: + +- **If your transaction rolls back**, the `df` rows disappear and the engine was + never told to run — so a rolled-back `df.start()` never leaves a "ghost" + workflow running with no `df` record. +- **Once your transaction commits**, the instance is *always* eventually started, + even if the worker was briefly down when you called `df.start()`. + +Two mechanisms converge the state: + +1. **Instant path** — `df.start()` sends a transactional `NOTIFY` (delivered only + if the transaction commits). The worker is listening and starts the instance + immediately. +2. **Backstop sweep** — the worker periodically scans for committed-but-unstarted + instances (a missed notification) and starts any it finds that the engine does + not yet know about. 
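The backstop sweep's selection rule can be sketched as a small filter. This is a minimal illustration, not pg_durable's worker code: `PendingRow`, `EngineStatus`, and `sweep_candidates` are hypothetical names, the engine lookup is passed in as a closure, and the grace period corresponds to the `pg_durable.reconcile_grace` setting described in the GUC reference.

```rust
use std::time::Duration;

/// Hypothetical two-state view of what the engine reports for an instance.
/// The real engine status type is assumed to have more variants than this.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EngineStatus {
    NotFound,
    Known,
}

/// Hypothetical projection of a committed, still-pending instance row.
pub struct PendingRow {
    pub id: String,
    pub age: Duration,
}

/// Returns the ids the sweep would start: rows that have reached the grace
/// period and that the engine does not know about yet.
pub fn sweep_candidates<'a, F>(
    rows: &'a [PendingRow],
    grace: Duration,
    engine_status: F,
) -> Vec<&'a str>
where
    F: Fn(&str) -> EngineStatus,
{
    rows.iter()
        // Skip fresh rows so the sweep does not race the instant NOTIFY path.
        .filter(|row| row.age >= grace)
        // Skip rows the engine already knows about.
        .filter(|row| engine_status(row.id.as_str()) == EngineStatus::NotFound)
        .map(|row| row.id.as_str())
        .collect()
}
```

A row younger than the grace period is left for the instant path, and a row the engine already knows about is never started twice by the sweep.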
+ +Under normal operation the instant path starts workflows in milliseconds; the +sweep exists only to guarantee eventual convergence. + +### GUC Reference + +Both GUCs are **Postmaster-context** — set them in `postgresql.conf` and restart +PostgreSQL. + +```ini +# postgresql.conf + +# How often (seconds) the worker sweeps for committed-but-unstarted instances. +# Set to 0 to disable the periodic sweep (the instant NOTIFY path stays on). +pg_durable.reconcile_interval = 15 + +# Minimum age (seconds) a pending instance must reach before the sweep starts it. +# Keeps the sweep from racing the instant NOTIFY path for freshly-committed work. +pg_durable.reconcile_grace = 300 +``` + +The defaults suit most deployments. Lower `reconcile_grace` if you want the +backstop to recover missed starts sooner; raise `reconcile_interval` to reduce +background query load on very large `df.instances` tables. + +> **Note:** Reconciliation activates only on schema version 0.2.4 and later +> (when `df.instances.start_input` exists). On an older schema `df.start()` +> starts the engine inline instead, so there is nothing to reconcile. + +--- + ## Troubleshooting ### Extension Exists But Workflows Don't Start diff --git a/docs/spec-reconcile-start.md b/docs/spec-reconcile-start.md new file mode 100644 index 0000000..7e2f431 --- /dev/null +++ b/docs/spec-reconcile-start.md @@ -0,0 +1,162 @@ +# Start Reconciliation Specification + +## The problem, in plain terms + +pg_durable stores two copies of "what workflows exist": + +- The **`df` schema** — pg_durable's own bookkeeping (`df.instances`, `df.nodes`), + written by `df.start()` in the caller's transaction. +- The **duroxide schema** — the workflow engine's runtime state (its queue and + instance history), which actually makes a workflow run. + +These are two separate systems. 
Historically `df.start()` wrote the `df` rows in +your transaction but told the engine to run the workflow **on a separate +connection**, which commits independently. That split caused two failure modes: + +1. **Ghost workflow (rollback leak).** You call `df.start()` and then your + transaction rolls back. Your `df` rows vanish — but the engine was already + told to run the workflow on that other connection, so it keeps running with no + `df` record behind it. + +2. **Stuck workflow (lost start).** The `df` rows commit, but the message telling + the engine to run never lands (worker was down, connection dropped). Now there + is a workflow "on paper" in `df.instances` that never executes. + +Both come from the same root cause: **the `df` write and the engine start happen +in two different transactions, with nothing reconciling them afterward.** + +## The approach + +Stop trying to force both writes into one transaction, and stop reaching into the +engine's internal SQL. Instead, treat the committed `df` row as the **source of +truth (the intent)** and let the background worker converge the engine to match, +going only through duroxide's **Rust API** (`Client::start_orchestration`, +`Client::get_orchestration_status`). + +`df.start()` now: + +1. Writes the instance rows **and a `start_input` payload** (the vars snapshot + + label captured at start time) into `df` — all in the caller's transaction. +2. Issues a transactional `NOTIFY pg_durable_start` (delivered **only** if the + transaction commits). +3. Does **not** start the engine itself. + +The background worker converges the engine two ways: + +- **Instant path.** It `LISTEN`s on `pg_durable_start`. On a notification it starts + the just-committed instance immediately (typically single-digit milliseconds). 
+- **Backstop sweep.** Every `reconcile_interval` seconds it scans for `pending` + instances older than `reconcile_grace` that the engine still does not know about + (`get_orchestration_status == NotFound`) and starts them. This covers a missed + notification (worker restart, dropped connection). + +Why this fixes both modes: + +- **Ghost:** the engine is only ever started by the worker from a *committed* `df` + row. A rolled-back `df.start()` leaves no row and delivers no notification, so + nothing is ever started. No ghost is possible. +- **Stuck:** even if the instant notification is lost, the sweep eventually finds + the committed-but-unstarted row and starts it. Convergence is guaranteed, + including the case where a worker dies mid-start (see "Exactly-once start"). + +### Why go through the Rust API instead of the engine's SQL + +The engine's SQL schema is an internal implementation detail, not its contract. +Its contract is "start an orchestration." Driving it through +`Client::start_orchestration` keeps pg_durable decoupled from how duroxide happens +to store state, and it works against any duroxide provider. The philosophical +framing (per review discussion): duroxide's model is async, at-least-once, +converge-later; Postgres's model is synchronous and transactional. pg_durable +lives on Postgres but follows duroxide's model, so it records intent +transactionally and converges the async engine afterward rather than pretending +the two can share one transaction. + +### Exactly-once start + +The instant path and the sweep can both notice the same instance. Start-once is +guaranteed by an atomic claim: the worker flips the row to `running` with +`UPDATE ... RETURNING start_input`. Only the one caller that wins the claim calls +the engine. If that engine call fails, the claim is rolled back to `pending` so a +later sweep retries. 
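The claim-then-start rule above can be modeled in memory. This is a sketch under stated assumptions, not the worker's implementation: `Instances`, `claim`, `release`, and `try_start` are hypothetical names, a `HashMap` stands in for `df.instances`, and the engine call is a closure. In the real system the claim is the `UPDATE ... RETURNING start_input` statement described above.

```rust
use std::collections::HashMap;

/// Row states that matter for the start claim (illustrative subset).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Running,
}

/// In-memory stand-in for `df.instances`: id -> (status, start_input).
#[derive(Default)]
pub struct Instances {
    rows: HashMap<String, (Status, String)>,
}

impl Instances {
    /// Records a committed row in the `pending` state.
    pub fn insert_pending(&mut self, id: &str, start_input: &str) {
        self.rows
            .insert(id.to_string(), (Status::Pending, start_input.to_string()));
    }

    /// Models the atomic claim: flip `pending` to `running` and return the
    /// payload. Only the first caller gets `Some`; later callers get `None`.
    pub fn claim(&mut self, id: &str) -> Option<String> {
        match self.rows.get_mut(id) {
            Some((status, input)) if *status == Status::Pending => {
                *status = Status::Running;
                Some(input.clone())
            }
            _ => None,
        }
    }

    /// Models the rollback after a failed engine call.
    pub fn release(&mut self, id: &str) {
        if let Some((status, _)) = self.rows.get_mut(id) {
            *status = Status::Pending;
        }
    }

    /// Current status of a row, if it exists.
    pub fn status(&self, id: &str) -> Option<Status> {
        self.rows.get(id).map(|(status, _)| *status)
    }
}

/// Claims the row and, only if the claim is won, calls the engine once.
/// Returns `true` when this caller started the instance.
pub fn try_start<F>(rows: &mut Instances, id: &str, start_engine: F) -> bool
where
    F: FnOnce(&str) -> Result<(), String>,
{
    let input = match rows.claim(id) {
        Some(input) => input,
        None => return false, // another path already claimed the row
    };
    match start_engine(&input) {
        Ok(()) => true,
        Err(_) => {
            // Put the row back so a later sweep can retry the start.
            rows.release(id);
            false
        }
    }
}
```

If the instant path and the sweep both call `try_start` for the same id, only the first call reaches the engine. A failed engine call leaves the row `pending` again, which is what lets a later sweep retry it.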
+ +The claim commits before the engine start (two systems, two transactions), so a +hard worker crash *between* the claim and the engine start would otherwise strand +the row in `running` forever — a state the `pending`-only sweep could never +recover, silently violating the convergence guarantee. To close this window the +sweep also reconciles **stale claims**: a `running` row the engine does not know +about (`get_orchestration_status == NotFound`) whose claim is older than the grace +is re-claimed and re-started. The `NotFound` gate ensures a genuinely-running +instance is never restarted, and the age guard avoids racing a start that is +merely mid-dispatch. + +### Trust boundary: `start_input` is caller-writable + +`df.instances.start_input` is written by the caller of `df.start()`, and the +orchestration selects which graph to load — and which role runs the SQL — from the +input's instance id. The worker must therefore **never trust the instance id +embedded in the payload**: it forces the engine input's instance id to the claimed +row id, and the orchestration independently uses the durable runtime's instance id +(`ctx.instance_id()`) rather than the payload's. Otherwise a low-privilege user +could INSERT a crafted `pending` row whose `start_input` points at another user's +instance and have the superuser worker run the victim's workflow as the victim. +The persisted vars/label are still replayed (they belong to the caller's own +instance), only the identity is re-derived from trusted state. + +## Schema changes (0.2.4) + +- `df.instances.start_input JSONB` (nullable) — the `FunctionInput` captured at + `df.start()` time so the worker can replay the exact start payload. The worker + overrides the payload's instance id with the claimed row id before replaying it + (see the trust boundary above). Rows written before this column existed replay + with an empty vars set. 
+- Added to the `df.grant_usage()` / `df.revoke_usage()` INSERT column list; the + upgrade script backfills `GRANT INSERT (start_input)` to existing df-usage roles. + +### GUCs (Postmaster-context) + +| GUC | Default | Purpose | +|-----|---------|---------| +| `pg_durable.reconcile_interval` | `15` | Seconds between backstop sweeps. `0` disables the sweep (instant path stays on). | +| `pg_durable.reconcile_grace` | `300` | Minimum age a pending instance must reach before the sweep starts it. | + +## Upgrade & Migration + +- **Backward compatibility (B1).** The new `.so` must run against un-upgraded + 0.2.2 / 0.2.3 schemas that lack `start_input`. `df.start()` gates on the + installed extension version: `>= 0.2.4` uses the new intent path; older uses the + legacy inline start. The worker checks for the `start_input` column before + running any reconciliation, and re-checks it on each sweep so an in-place + `ALTER EXTENSION UPDATE` (which does not restart the worker) enables + reconciliation mid-epoch. The `LISTEN` is always established because `df.start()` + only sends the notification once the column exists. +- **Upgrade script.** `sql/pg_durable--0.2.3--0.2.4.sql` adds the column (appended + last, matching the fresh-install column order), updates the grant/revoke, and + backfills the column grant to existing df roles. +- **No duroxide schema change** — this feature only uses the duroxide Rust API, so + it requires no changes to the embedded duroxide migrations. + +## Scope + +This change fixes the **start** direction (df has it, engine does not). The +reverse orphan-cleanup direction (engine-has-it / df-does-not) is out of scope. +`df.cancel()` / `df.signal()` remain on the existing client path; as a targeted +exception, the worker re-issues a cancel to the engine if an instance was moved to +`cancelled` between the claim and the start, so a cancel that races the deferred +start is not silently ignored. 
+ +## Testing + +`tests/e2e/sql/25_reconcile_start.sql` exercises the failure modes end-to-end: + +- **Scenario 1 (ghost):** `BEGIN; SELECT df.start(...); ROLLBACK;` then assert the + duroxide runtime has no residue for that instance. (Verified RED on `main`.) +- **Scenario 2 (stuck):** commit a backdated `pending` instance directly (no + notification), then assert the worker's sweep starts and completes it. +- **Scenario 3 (stale claim):** commit a backdated `running` instance the engine + never started (simulating a crash between the claim and the engine start), then + assert the sweep recovers and completes it. + +`tests/e2e/sql/26_reconcile_security.sql` constructs the cross-tenant attack — a +low-privilege role INSERTs a crafted `pending` row whose `start_input` points at a +victim instance and triggers it via `pg_notify` — and asserts the attacker's row +runs its own graph while the victim's protected table is never written. diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 8510bfe..102f5f8 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -254,6 +254,12 @@ what the upgrade script handles, and any backward compatibility considerations. - **Scenario B2 considerations:** No data migration. The `CREATE FUNCTION` adds catalog metadata only; `df.instances` rows are untouched. The new `created_at`/`completed_at` result columns are read from columns that already exist and are already populated on every prior install. - **Dependent-object note:** Because the upgrade only adds a function (no `DROP FUNCTION`), no customer-owned object that depends on the existing two-argument `df.list_instances` is affected — there is nothing to drop or repoint. 
+#### Start reconciliation — add `df.instances.start_input` (start-convergence) +- **DDL change (df schema):** Adds a nullable `df.instances.start_input JSONB` column that `df.start()` populates with the `FunctionInput` payload (instance id, label, vars snapshot, loop iteration) captured at start time, so the background worker can start the durable engine from committed intent rather than out-of-band on a separate connection. Fresh installs (`src/lib.rs`) declare the column **last** in `df.instances` (after `completed_at`); the upgrade script `sql/pg_durable--0.2.3--0.2.4.sql` runs `ALTER TABLE df.instances ADD COLUMN start_input JSONB`, which also appends it last — so both paths yield identical column ordinals. The column is added to the `df.grant_usage()` / `df.revoke_usage()` INSERT column list on both paths (`df.start()` runs as the caller and writes it), and the upgrade script backfills `GRANT INSERT (start_input)` to every role that already holds column-level INSERT on `df.instances` (enumerated via the `id` column's `attacl`, which every prior `df.grant_usage()` granted) so existing df users can keep calling `df.start()` without re-running `df.grant_usage()`. Two Postmaster GUCs are added (`pg_durable.reconcile_interval` default 15, `pg_durable.reconcile_grace` default 300). No duroxide schema change — reconciliation uses only the duroxide Rust `Client` API. +- **Scenario A considerations:** `ALTER TABLE ... ADD COLUMN start_input JSONB` appends the column at the last ordinal; the fresh-install `CREATE TABLE df.instances` places `start_input` last as well, so the post-upgrade catalog matches a fresh 0.2.4 install column-for-column. The grant/revoke bodies are `CREATE OR REPLACE`d identically on both paths. +- **Scenario B1 considerations:** The new `.so` must run against pre-0.2.4 schemas (0.2.2/0.2.3) that lack `start_input`. 
`df.start()` gates on the installed extension version (`start_input_supported()` → `>= 0.2.4`): on an older schema it takes the **legacy inline start path** (out-of-band `start_durable_function`, exactly as before), so no reference to the missing column is emitted. The background worker probes for the `start_input` column before running any reconciliation and **re-probes on each sweep tick** (monotonic false→true), so an in-place `ALTER EXTENSION UPDATE` — which does not restart the worker or start a new epoch — enables reconciliation mid-epoch without a restart. The `LISTEN pg_durable_start` is always established (schema-independent); a notification can only arrive once `df.start()` uses the intent path, which only happens after the column exists, so the notify handler never references a missing column. +- **Scenario B2 considerations:** No data migration of existing rows. Pre-upgrade `df.instances` rows have `start_input = NULL`; if such a row is ever reconciled, the worker synthesizes a minimal `{instance_id}` input (empty vars). After `ALTER EXTENSION UPDATE`, a new `df.start()` switches to the intent+NOTIFY path and the running worker starts it via the always-on listener (and the re-probing sweep as backstop). 
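The version gate described in the B1 considerations can be sketched as follows. This is an illustration only: the `parse_semver` body here is a hypothetical stand-in (the real helper is not shown in this diff), and this `start_input_supported` takes the version string as a parameter rather than reading the installed extension version.

```rust
/// Hypothetical stand-in for the `parse_semver` helper: accepts exactly
/// `major.minor.patch` with numeric parts and rejects anything else.
pub fn parse_semver(version: &str) -> Option<(u32, u32, u32)> {
    let mut parts = version.trim().split('.');
    let major = parts.next()?.parse().ok()?;
    let minor = parts.next()?.parse().ok()?;
    let patch = parts.next()?.parse().ok()?;
    if parts.next().is_some() {
        return None; // more than three components
    }
    Some((major, minor, patch))
}

/// Mirrors the gate's shape: the intent path is enabled only when the version
/// parses and is at least 0.2.4. An unparseable version falls back to the
/// legacy path.
pub fn start_input_supported(installed_version: &str) -> bool {
    parse_semver(installed_version)
        .map(|v| v >= (0, 2, 4))
        .unwrap_or(false)
}
```

Tuple comparison is lexicographic, so `0.10.0` compares as newer than `0.2.4`, which a plain string comparison would get wrong.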
+ ### v0.2.2 → v0.2.3 #### Rename duroxide provider schema to `_duroxide` for fresh installs diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 3e05e84..7dee8d0 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -923,6 +923,7 @@ echo "" B2_PRE_INSTANCE_ID="" B2_INFLIGHT_INSTANCE_ID="" B2_POST_INSTANCE_ID="" +B2_BACKFILL_ROLE="durable_b2_backfill_probe" test_b2_data_survives_upgrade() { # Step 1: Install previous version and create test data @@ -933,6 +934,15 @@ test_b2_data_survives_upgrade() { assert_sql_equals "SELECT df.clearvars();" "OK" || return 1 assert_sql_equals "SELECT df.setvar('b2_key', 'b2_value');" "OK" || return 1 + # Create a NON-superuser df role and grant it df usage on the PREVIOUS + # schema (which has no start_input column), so the 0.2.4 upgrade's grant + # backfill has a pre-existing target. test_b2_start_input_grant_backfill_ + # after_upgrade verifies this role can still call df.start() after upgrade. + run_sql_capture "DROP OWNED BY ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true + run_sql_capture "DROP ROLE IF EXISTS ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true + run_sql_capture "CREATE ROLE ${B2_BACKFILL_ROLE} LOGIN;" >/dev/null || return 1 + run_sql_capture "SELECT df.grant_usage('${B2_BACKFILL_ROLE}');" >/dev/null || return 1 + B2_PRE_INSTANCE_ID=$(run_sql_capture "SELECT df.start('INSERT INTO test_upgrade_b2_log (kind, msg) VALUES (''pre'', ''{b2_key}'') RETURNING msg', 'b2-pre-upgrade');") || return 1 B2_INFLIGHT_INSTANCE_ID=$(run_sql_capture "SELECT df.start(df.sleep(2) ~> 'SELECT ''b2-running'' AS value', 'b2-inflight');") || return 1 @@ -977,6 +987,23 @@ test_b2_new_data_after_upgrade() { assert_sql_equals "SELECT msg FROM test_upgrade_b2_log WHERE kind = 'post' ORDER BY id DESC LIMIT 1;" "new_value" } +test_b2_start_input_grant_backfill_after_upgrade() { + # The 0.2.4 upgrade adds df.instances.start_input and df.start() switches to + # the intent path (INSERT ... start_input). 
A role granted df usage BEFORE the + # upgrade only holds INSERT on the old columns, so without the upgrade's grant + # backfill its df.start() would fail with "permission denied for column + # start_input". B2_BACKFILL_ROLE was created and granted df usage on the + # PREVIOUS schema (see test_b2_data_survives_upgrade), so this asserts the + # backfill extended its INSERT privilege to the new column. + assert_sql_equals \ + "SELECT has_column_privilege('${B2_BACKFILL_ROLE}', 'df.instances', 'start_input', 'INSERT');" \ + "t" + local rc=$? + + run_sql_capture "DROP OWNED BY ${B2_BACKFILL_ROLE}; DROP ROLE IF EXISTS ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true + return $rc +} + test_b2_grant_usage_after_upgrade() { # Regression guard for #110: after ALTER EXTENSION UPDATE, df.debug_connection() # must be gone from the catalog. Scenario A only compares function name/args/ @@ -1012,6 +1039,7 @@ if [ "$HAS_COMPAT_PREV" = true ]; then run_test "B2: Pre-upgrade instance remains queryable" test_b2_pre_upgrade_instance_after_upgrade run_test "B2: In-flight work completes after upgrade" test_b2_inflight_work_after_upgrade run_test "B2: New data and execution after upgrade" test_b2_new_data_after_upgrade + run_test "B2: Pre-upgrade df role can df.start() after upgrade (start_input grant backfill)" test_b2_start_input_grant_backfill_after_upgrade run_test "B2: df.grant_usage() works and df.debug_connection() is gone after upgrade" test_b2_grant_usage_after_upgrade fi diff --git a/sql/pg_durable--0.2.3--0.2.4.sql b/sql/pg_durable--0.2.3--0.2.4.sql index 8ae8436..eb82ae9 100644 --- a/sql/pg_durable--0.2.3--0.2.4.sql +++ b/sql/pg_durable--0.2.3--0.2.4.sql @@ -89,7 +89,7 @@ BEGIN EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT ON df.nodes TO %I', p_role) 
OPERATOR(pg_catalog.||) grant_opt; - EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database, start_input) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT, INSERT, UPDATE, DELETE ON df.vars TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -154,7 +154,7 @@ BEGIN EXECUTE pg_catalog.format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role); - EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role); + EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database, start_input) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role); @@ -288,6 +288,62 @@ COMMENT ON COLUMN df.nodes.status_details IS '"execution_id": the orchestration instance_id::execution_id stamp recorded when the node last ' 'transitioned. df.instance_nodes() parses it to derive pending/skipped statuses; see USER_GUIDE.md.'; +-- ============================================================================ +-- Persisted start payload: add df.instances.start_input (start-convergence). 
+-- +-- df.start() now records the workflow's start input (the vars snapshot captured +-- at start time, plus label) in the SAME transaction that writes df.instances, +-- instead of calling the durable engine out-of-band on a separate connection. +-- The background worker starts the engine from this committed intent, so a +-- rolled-back df.start() leaves no engine "ghost" and a committed instance is +-- always eventually started. The column is nullable: rows written before this +-- upgrade have no snapshot, and the worker best-effort starts those with an +-- empty vars set. +-- +-- The column IS added to the user INSERT grant (df.start runs as the caller and +-- writes it). Backward compatibility (Scenario B1): the new .so gates the intent +-- path on the presence of this column (installed extension version >= 0.2.4) and +-- otherwise falls back to the pre-0.2.4 out-of-band start, so an un-upgraded +-- schema keeps working until this upgrade applies. +-- ============================================================================ +ALTER TABLE df.instances ADD COLUMN start_input JSONB; + +COMMENT ON COLUMN df.instances.start_input IS + 'FunctionInput JSON captured by df.start() (instance_id, label, vars snapshot, loop_iteration). ' + 'The background worker replays it to start the durable engine for this instance; see USER_GUIDE.md.'; + +-- Backfill the new column-level INSERT privilege to exactly the roles that +-- already hold column-level INSERT on df.instances (every df.grant_usage() call +-- grants INSERT on the id column), so existing df users can keep calling +-- df.start() after the new .so switches to the intent path -- without re-running +-- df.grant_usage(). Keying off the id column's attacl (rather than schema USAGE) +-- targets precisely the df.start() callers and preserves each grant's +-- grantability. 
+DO $$ +DECLARE + r RECORD; + grant_opt TEXT; +BEGIN + FOR r IN + SELECT + pg_catalog.quote_ident(pg_catalog.pg_get_userbyid(a.grantee)) AS grantee, + pg_catalog.bool_or(a.is_grantable) AS with_grant_option + FROM pg_catalog.pg_attribute att + JOIN pg_catalog.pg_class c ON c.oid OPERATOR(pg_catalog.=) att.attrelid + JOIN pg_catalog.pg_namespace n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace + CROSS JOIN LATERAL pg_catalog.aclexplode(att.attacl) AS a + WHERE n.nspname OPERATOR(pg_catalog.=) 'df' + AND c.relname OPERATOR(pg_catalog.=) 'instances' + AND att.attname OPERATOR(pg_catalog.=) 'id' + AND a.privilege_type OPERATOR(pg_catalog.=) 'INSERT' + AND a.grantee OPERATOR(pg_catalog.<>) 0 -- skip PUBLIC + GROUP BY a.grantee + LOOP + grant_opt := CASE WHEN r.with_grant_option THEN ' WITH GRANT OPTION' ELSE '' END; + EXECUTE pg_catalog.format('GRANT INSERT (start_input) ON df.instances TO %s', r.grantee) OPERATOR(pg_catalog.||) grant_opt; + END LOOP; +END $$; + -- ============================================================================ -- df.instance_nodes(): one row per node with derived status (PR #263). -- diff --git a/src/dsl.rs b/src/dsl.rs index 9a3ef22..85f082c 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -129,6 +129,19 @@ fn legacy_login_role_schema() -> bool { !owner_scoped_vars_enabled() } +/// Returns true when the installed schema has `df.instances.start_input` +/// (added in 0.2.4). When true, `df.start()` records the start payload in the +/// caller's transaction and lets the background worker start the engine from it +/// (the intent path). When false — a pre-0.2.4 schema reached by a newer `.so` +/// before `ALTER EXTENSION UPDATE` — `df.start()` falls back to the previous +/// out-of-band start so the extension keeps working until the upgrade applies. +fn start_input_supported() -> bool { + let extversion = installed_extension_version(); + parse_semver(&extversion) + .map(|v| v >= (0, 2, 4)) + .unwrap_or(false) +} + /// Sets a workflow variable. 
Must be called BEFORE df.start(), not inside a workflow. /// Variables are captured at df.start() and remain immutable during execution. /// Each user has their own variable namespace (owner = current_user). @@ -940,6 +953,37 @@ pub fn start( let legacy_login_role = legacy_login_role_schema(); + // When the installed schema has df.instances.start_input (>= 0.2.4) we take + // the "intent" path: persist the start payload on the instance row in this + // (the caller's) transaction and let the background worker start the engine + // from it after commit. On an older schema we fall back to the pre-0.2.4 + // out-of-band start at the end of this function. + let persist_start_input = start_input_supported(); + + // Capture vars from df.vars using the installed extension version as the + // compatibility boundary: pre-0.2.0 uses legacy global vars, 0.2.0+ uses + // owner-scoped vars. Captured before the instance row is inserted so the + // snapshot can be persisted as start_input in the same INSERT. + let vars_query = if owner_scoped_vars_enabled() { + "SELECT name, value FROM df.vars WHERE owner = quote_ident(current_user)::regrole" + } else { + "SELECT name, value FROM df.vars" + }; + + let vars: std::collections::HashMap<String, String> = Spi::connect(|client| { + let mut vars = std::collections::HashMap::new(); + if let Ok(table) = client.select(vars_query, None, &[]) { + for row in table { + if let (Ok(Some(name)), Ok(Some(value))) = + (row.get::<String>(1), row.get::<String>(2)) + { + vars.insert(name, value); + } + } + } + vars + }); + // Pre-generate the root node's ID so the instance row can point at it up // front. 
The same-instance FK on root_node is DEFERRABLE INITIALLY // DEFERRED, so the referenced node row need not exist yet — insert_nodes @@ -984,6 +1028,32 @@ pub fn start( database_arg, ], ) + } else if persist_start_input { + // Intent path (>= 0.2.4): persist the start payload — the vars + // snapshot captured above plus the label — as start_input so the + // worker can start the engine for this instance after commit. + let input = FunctionInput { + instance_id: candidate.to_string(), + label: label.map(|s| s.to_string()), + vars: vars.clone(), + loop_iteration: 0, + }; + let start_input_json = + serde_json::to_string(&input).unwrap_or_else(|_| candidate.to_string()); + ( + "INSERT INTO df.instances (id, label, root_node, submitted_by, database, start_input) + VALUES ($1, $2, $3, $4::oid::regrole, $5, $6::jsonb) + ON CONFLICT (id) DO NOTHING + RETURNING id", + vec![ + candidate.into(), + label_arg, + root_id.as_str().into(), + current_user_oid.into(), + database_arg, + start_input_json.into(), + ], + ) } else { ( "INSERT INTO df.instances (id, label, root_node, submitted_by, database) @@ -1028,47 +1098,39 @@ pub fn start( &mut node_count, ); - // Capture vars from df.vars using the installed extension version as the - // compatibility boundary: pre-0.2.0 uses legacy global vars, 0.2.0+ uses - // owner-scoped vars. - let vars_query = if owner_scoped_vars_enabled() { - "SELECT name, value FROM df.vars WHERE owner = quote_ident(current_user)::regrole" + if persist_start_input { + // Intent path (>= 0.2.4): the start payload is already committed on the + // instance row. Nudge the worker to start the engine immediately via a + // transactional NOTIFY — delivered only if this transaction commits, so + // a rolled-back df.start() leaves no engine "ghost". If the NOTIFY is + // missed (worker not listening, restart), the periodic reconcile sweep + // starts the instance from its committed start_input as a backstop. 
+ if let Err(e) = Spi::run_with_args( + "SELECT pg_notify('pg_durable_start', $1)", + &[instance_id.as_str().into()], + ) { + pgrx::log!("pg_durable: Warning - failed to notify start of {instance_id}: {e}"); + } } else { - "SELECT name, value FROM df.vars" - }; + // Legacy path (< 0.2.4 schema, no start_input column): start the engine + // out-of-band on a separate connection, as before. This can leave a + // ghost if the caller's transaction rolls back, but it preserves + // behavior on a schema that has not yet been upgraded (Scenario B1). + let input = FunctionInput { + instance_id: instance_id.clone(), + label: label.map(|s| s.to_string()), + vars, + loop_iteration: 0, + }; + let input_json = serde_json::to_string(&input).unwrap_or_else(|_| instance_id.clone()); - let vars: std::collections::HashMap<String, String> = Spi::connect(|client| { - let mut vars = std::collections::HashMap::new(); - if let Ok(table) = client.select(vars_query, None, &[]) { - for row in table { - if let (Ok(Some(name)), Ok(Some(value))) = - (row.get::<String>(1), row.get::<String>(2)) - { - vars.insert(name, value); - } - } + if let Err(e) = start_durable_function( + crate::orchestrations::execute_function_graph::NAME, + &instance_id, + &input_json, + ) { + pgrx::log!("pg_durable: Warning - failed to start durable function: {e}"); } - vars - }); - - // Start the orchestration via duroxide - let input = FunctionInput { - instance_id: instance_id.clone(), - label: label.map(|s| s.to_string()), - vars, - loop_iteration: 0, - }; - let input_json = serde_json::to_string(&input).unwrap_or(instance_id.clone()); - - if let Err(e) = start_durable_function( - crate::orchestrations::execute_function_graph::NAME, - &instance_id, - &input_json, - ) { - pgrx::log!( - "pg_durable: Warning - failed to start durable function: {}", - e - ); } instance_id diff --git a/src/lib.rs b/src/lib.rs index 14c6f8c..76dab4c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,16 @@ pub static EXECUTION_ACQUIRE_TIMEOUT: GucSetting<i32> = 
GucSetting::<i32>::new(3 /// functions are explicitly desired. See docs/superuser_guc.md. pub static ENABLE_SUPERUSER_INSTANCES: GucSetting<bool> = GucSetting::<bool>::new(false); +/// How often (seconds) the background worker sweeps for committed `df.instances` +/// that the durable engine has not started yet, and starts them. `0` disables +/// the periodic sweep (the instant `NOTIFY`-driven path still runs). +pub static RECONCILE_INTERVAL: GucSetting<i32> = GucSetting::<i32>::new(15); + +/// Minimum age (seconds) a pending `df.instances` row must reach before the +/// periodic sweep will start it. Keeps the sweep from racing the instant +/// `NOTIFY` path for freshly-committed instances. +pub static RECONCILE_GRACE: GucSetting<i32> = GucSetting::<i32>::new(300); + // Module declarations pub mod activities; pub mod client; @@ -127,6 +137,28 @@ pub extern "C-unwind" fn _PG_init() { GucFlags::default(), ); + GucRegistry::define_int_guc( + c"pg_durable.reconcile_interval", + c"Seconds between background sweeps that start committed df.instances the engine hasn't started yet (0 disables the sweep)", + c"The instant NOTIFY-driven start path always runs; this sweep is the backstop that starts any pending instance whose start notification was missed.", + &RECONCILE_INTERVAL, + 0, + 86400, + GucContext::Postmaster, + GucFlags::default(), + ); + + GucRegistry::define_int_guc( + c"pg_durable.reconcile_grace", + c"Minimum age in seconds a pending df.instances row must reach before the sweep starts it", + c"Prevents the periodic sweep from racing the instant NOTIFY start path for freshly-committed instances.", + &RECONCILE_GRACE, + 0, + 86400, + GucContext::Postmaster, + GucFlags::default(), + ); + GucRegistry::define_bool_guc( c"pg_durable.enable_superuser_instances", c"Allow pg_durable instances whose submitted_by role is a PostgreSQL superuser", @@ -220,7 +252,8 @@ CREATE TABLE df.instances ( database TEXT, created_at TIMESTAMPTZ DEFAULT pg_catalog.now(), updated_at TIMESTAMPTZ DEFAULT pg_catalog.now(), - 
completed_at TIMESTAMPTZ + completed_at TIMESTAMPTZ, + start_input JSONB ); COMMENT ON COLUMN df.instances.submitted_by IS @@ -451,7 +484,7 @@ BEGIN EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; - EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database, start_input) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT, INSERT, UPDATE, DELETE ON df.vars TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -501,7 +534,7 @@ BEGIN EXECUTE pg_catalog.format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role); - EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role); + EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database, start_input) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role); diff 
--git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index d5752af..055471d 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -112,9 +112,18 @@ struct SubtreeEnvelope { /// rather than completing with a control-flow value. Callers should treat the returned /// `Err` strictly as a failure and must not add retry/recovery logic for break. pub async fn execute(ctx: OrchestrationContext, input_json: String) -> Result { - let input: FunctionInput = serde_json::from_str(&input_json) + let mut input: FunctionInput = serde_json::from_str(&input_json) .map_err(|e| format!("Invalid orchestration input: {e}"))?; + // Trust the durable runtime's instance id, never the (caller-writable) + // instance id embedded in the input payload. df.instances.start_input is + // written by the caller of df.start(), and the background worker replays it + // to start this orchestration; a forged instance_id in that payload must not + // be able to redirect graph/identity loading to a different instance. For + // every legitimate start the two ids are identical, so this is a no-op on the + // happy path and replay-safe for in-flight instances. + input.instance_id = ctx.instance_id(); + let label_info = input .label .as_ref() diff --git a/src/types.rs b/src/types.rs index b0ba264..622f5a7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -57,6 +57,17 @@ pub fn get_execution_acquire_timeout() -> Duration { Duration::from_secs(crate::EXECUTION_ACQUIRE_TIMEOUT.get() as u64) } +/// Interval between background sweeps that start committed-but-unstarted +/// instances. Zero disables the periodic sweep (the instant NOTIFY path stays on). +pub fn get_reconcile_interval() -> Duration { + Duration::from_secs(crate::RECONCILE_INTERVAL.get() as u64) +} + +/// Minimum age a pending instance must reach before the periodic sweep starts it. 
+pub fn get_reconcile_grace() -> Duration { + Duration::from_secs(crate::RECONCILE_GRACE.get() as u64) +} + /// Returns `true` when superuser-submitted instances are permitted. pub fn superuser_instances_enabled() -> bool { crate::ENABLE_SUPERUSER_INSTANCES.get() diff --git a/src/worker.rs b/src/worker.rs index 2bd3bf7..e7b99e1 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -12,13 +12,15 @@ use std::sync::Arc; use std::time::Duration; use duroxide::runtime; +use duroxide::Client; use duroxide_pg::PostgresProvider; use tracing_subscriber::EnvFilter; use crate::registry::{create_activity_registry, create_orchestration_registry}; use crate::types::{ get_max_duroxide_connections, get_max_management_connections, get_max_user_connections, - postgres_connection_string, resolve_duroxide_schema_pool, worker_provider_config, + get_reconcile_grace, get_reconcile_interval, postgres_connection_string, + resolve_duroxide_schema_pool, worker_provider_config, }; const TERMINAL_INSTANCE_PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); @@ -211,7 +213,7 @@ async fn run_duroxide_runtime() { duroxide_schema ); - let Some(duroxide_runtime) = initialize_duroxide_runtime( + let Some((duroxide_runtime, duroxide_store)) = initialize_duroxide_runtime( &pg_conn_str, INIT_RETRY_INTERVAL, &mgmt_pool, @@ -247,6 +249,7 @@ async fn run_duroxide_runtime() { &poll_pool, &mgmt_pool, duroxide_runtime, + duroxide_store, EXTENSION_DROP_POLL_INTERVAL, SHUTDOWN_CHECK_INTERVAL, epoch_id.as_deref(), @@ -470,7 +473,7 @@ async fn initialize_duroxide_runtime( retry_interval: Duration, mgmt_pool: &sqlx::PgPool, schema_name: &str, -) -> Option> { +) -> Option<(Arc, Arc)> { log!("pg_durable: initializing duroxide runtime..."); // Control duroxide provider pool size via env var (the only mechanism @@ -553,11 +556,16 @@ async fn initialize_duroxide_runtime( create_activity_registry(Arc::new(mgmt_pool.clone()), user_semaphore.clone()); let orchestrations = create_orchestration_registry(); + // Keep a 
clone of the store Arc so the worker can build a duroxide + // Client (for start reconciliation) that shares the same provider as + // the runtime. Client::new is cheap — it just wraps the Arc. + let store_for_client = store.clone(); + let duroxide_runtime = runtime::Runtime::start_with_store(store, activities, orchestrations).await; log!("pg_durable: duroxide runtime started"); - return Some(duroxide_runtime); + return Some((duroxide_runtime, store_for_client)); } } @@ -743,6 +751,7 @@ async fn run_until_extension_dropped_or_shutdown( poll_pool: &sqlx::PgPool, maintenance_pool: &sqlx::PgPool, duroxide_runtime: Arc, + duroxide_store: Arc, drop_poll_interval: Duration, shutdown_check_interval: Duration, epoch_id: Option<&str>, @@ -758,6 +767,61 @@ async fn run_until_extension_dropped_or_shutdown( ); prune_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + // Start reconciliation converges df.instances with the durable engine: + // df.start() records the instance (with its start payload) and commits, then + // the engine is started separately from that committed intent. This keeps the + // two in sync without a shared transaction — a rolled-back df.start() leaves + // no engine "ghost", and a committed instance is always eventually started. + // + // Reconciliation only acts once the schema has df.instances.start_input (>= + // 0.2.4). On an older schema df.start() starts the engine inline instead + // (Scenario B1), so there is nothing to reconcile. That schema check is made + // dynamically (not cached for the epoch) because ALTER EXTENSION UPDATE can + // add the column mid-epoch without restarting this worker. + let client = Client::new(duroxide_store.clone()); + let reconcile_interval = get_reconcile_interval(); + let sweep_configured = !reconcile_interval.is_zero(); + // Monotonic false->true: once the column appears it never goes away within a + // major version, so we stop re-probing after the first positive result. 
+ let mut start_input_present = has_start_input_column(maintenance_pool).await; + + // The instant path: a transactional NOTIFY from df.start() (delivered only on + // commit) wakes us to start the just-committed instance immediately. LISTEN is + // schema-independent, so we always establish it — df.start() only sends the + // notification once the column exists, so a notification implies it is safe to + // start (this also covers an in-place ALTER EXTENSION UPDATE mid-epoch). + let mut start_listener = connect_start_listener(maintenance_pool).await; + + // Catch-up sweep: a NOTIFY is not durable, so any df.start() that committed + // while the worker was down (or before the listener was established) has no + // pending notification. Run one immediate sweep with zero grace so those + // already-committed rows start promptly instead of waiting out the grace + // period; there is no concurrent instant path racing these pre-existing rows + // at startup, and the atomic claim still makes it exactly-once against any + // notification that does arrive. + if start_input_present { + let (started, _) = + reconcile_pending_starts(maintenance_pool, &client, Duration::ZERO).await; + if started > 0 { + log!("pg_durable: startup reconcile started {started} pending instance(s)"); + } + } + + // The backstop tick: fires on a fixed cadence to (1) re-establish the start + // listener if it has dropped (so the instant path self-heals after a + // transient listener error) and (2) run the periodic sweep. The tick fires + // even when the sweep is disabled (reconcile_interval = 0) so the listener is + // still rebuilt — honoring the GUC contract that "0 disables the sweep, the + // instant NOTIFY path still runs". 
+ let tick_interval = if reconcile_interval.is_zero() { + LISTENER_RECHECK_INTERVAL + } else { + reconcile_interval + }; + let mut reconcile_check = + tokio::time::interval_at(tokio::time::Instant::now() + tick_interval, tick_interval); + reconcile_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { tokio::select! { _ = tokio::time::sleep(shutdown_check_interval) => { @@ -792,6 +856,48 @@ async fn run_until_extension_dropped_or_shutdown( } } } + notification = recv_start_notification(&mut start_listener), if start_listener.is_some() => { + match notification { + Some(instance_id) => { + match try_start_instance(maintenance_pool, &client, &instance_id, get_reconcile_grace()).await { + Ok(true) => log!("pg_durable: started instance {instance_id} (notify)"), + Ok(false) => {} + Err(e) => log!( + "pg_durable: notify-start of instance {instance_id} failed \ + (sweep will retry): {e}" + ), + } + } + None => { + // recv error: drop the listener; the reconcile tick below + // re-establishes it so the instant path recovers. + log!("pg_durable: start-notification listener closed; will re-establish"); + start_listener = None; + } + } + } + _ = reconcile_check.tick() => { + // Re-establish the instant path if the listener dropped on a + // transient recv error, so it recovers even when the sweep is off. + if start_listener.is_none() { + start_listener = connect_start_listener(maintenance_pool).await; + } + if sweep_configured { + // Re-probe until the column appears so an in-place ALTER EXTENSION + // UPDATE mid-epoch enables the sweep without a worker restart. 
+ if !start_input_present { + start_input_present = has_start_input_column(maintenance_pool).await; + } + if start_input_present { + match reconcile_pending_starts(maintenance_pool, &client, get_reconcile_grace()).await { + (started, _) if started > 0 => { + log!("pg_durable: reconcile sweep started {started} pending instance(s)"); + } + _ => {} + } + } + } + } } } @@ -799,3 +905,278 @@ async fn run_until_extension_dropped_or_shutdown( duroxide_runtime.shutdown(Some(10_000)).await; log!("pg_durable: duroxide runtime shutdown complete"); } + +/// Channel used by `df.start()` (via `pg_notify`) to wake the worker so it can +/// start a just-committed instance immediately. +const START_NOTIFY_CHANNEL: &str = "pg_durable_start"; + +/// How often the backstop tick fires when the periodic sweep is disabled +/// (`reconcile_interval = 0`). The tick still runs at this cadence purely to +/// re-establish the start listener if it dropped, so the instant NOTIFY path +/// self-heals even with the sweep turned off. +const LISTENER_RECHECK_INTERVAL: Duration = Duration::from_secs(30); + +/// Open a dedicated connection and LISTEN on the start-notification channel. +/// Returns `None` (with a log) if the connection or LISTEN fails; the caller +/// falls back to the periodic sweep and retries the connection on the next tick. +async fn connect_start_listener(pool: &sqlx::PgPool) -> Option<sqlx::postgres::PgListener> { + match sqlx::postgres::PgListener::connect_with(pool).await { + Ok(mut listener) => match listener.listen(START_NOTIFY_CHANNEL).await { + Ok(()) => { + log!("pg_durable: listening for start notifications on '{START_NOTIFY_CHANNEL}'"); + Some(listener) + } + Err(e) => { + log!( + "pg_durable: failed to LISTEN {START_NOTIFY_CHANNEL} (sweep still active): {e}" + ); + None + } + }, + Err(e) => { + log!( + "pg_durable: failed to open start-notification listener (sweep still active): {e}" + ); + None + } + } +} + +/// Await the next start notification, or park forever when there is no listener.
+/// +/// Returns `Some(payload)` for a notification (the instance id), or `None` when +/// the listener connection errors — the caller then drops it and the reconcile +/// tick re-establishes it. The `None` (no-listener) arm is only reached when the +/// select! precondition wrongly lets it run; it parks so it never busy-loops. +async fn recv_start_notification( + listener: &mut Option<sqlx::postgres::PgListener>, +) -> Option<String> { + match listener { + Some(l) => match l.recv().await { + Ok(notification) => Some(notification.payload().to_string()), + Err(e) => { + log!("pg_durable: start-notification recv error: {e}"); + None + } + }, + None => std::future::pending().await, + } +} + +/// Returns true when the installed schema has `df.instances.start_input` +/// (added in 0.2.4). Start reconciliation is gated on this: an older schema has +/// no start payload to replay and df.start() starts the engine inline instead. +async fn has_start_input_column(pool: &sqlx::PgPool) -> bool { + match sqlx::query_scalar::<_, bool>( + "SELECT EXISTS(SELECT 1 FROM information_schema.columns + WHERE table_schema = 'df' AND table_name = 'instances' + AND column_name = 'start_input')", + ) + .fetch_one(pool) + .await + { + Ok(present) => present, + Err(e) => { + log!("pg_durable: could not probe df.instances.start_input (reconcile disabled): {e}"); + false + } + } +} + +/// Maximum instances one reconcile sweep tick processes, so a large backlog +/// cannot monopolize the worker's select! loop (starving shutdown/drop +/// detection). Remaining rows are picked up on subsequent ticks. +const RECONCILE_SWEEP_BATCH: i64 = 256; + +/// Periodic backstop. Starts (a) freshly-committed `pending` instances older than +/// `grace` whose start NOTIFY was missed, and (b) stale `running` instances whose +/// claim is older than `grace` but which the engine never actually started (a +/// worker that died between claiming and starting — see `try_start_instance`).
+/// The `get_orchestration_status == NotFound` gate inside `try_start_instance` +/// ensures a genuinely-running instance is never restarted. Returns +/// `(started, errors)`. +async fn reconcile_pending_starts( + pool: &sqlx::PgPool, + client: &Client, + grace: Duration, +) -> (u64, u64) { + let grace_secs = grace.as_secs() as f64; + let ids: Vec<String> = match sqlx::query_scalar( + "SELECT id FROM df.instances + WHERE (status = 'pending' AND created_at < now() - make_interval(secs => $1)) + OR (status = 'running' AND updated_at < now() - make_interval(secs => $1)) + ORDER BY created_at + LIMIT $2", + ) + .bind(grace_secs) + .bind(RECONCILE_SWEEP_BATCH) + .fetch_all(pool) + .await + { + Ok(ids) => ids, + Err(e) => { + log!("pg_durable: reconcile sweep query failed: {e}"); + return (0, 0); + } + }; + + let mut started = 0u64; + let mut errors = 0u64; + for id in ids { + // Yield the select! loop promptly on shutdown instead of draining a + // potentially large backlog first. + if is_shutdown_requested() { + break; + } + match try_start_instance(pool, client, &id, grace).await { + Ok(true) => started += 1, + Ok(false) => {} + Err(e) => { + errors += 1; + log!("pg_durable: reconcile sweep could not start instance {id}: {e}"); + } + } + } + (started, errors) +} + +/// Build the durable-engine input for `instance_id` from its persisted +/// `start_input`, **forcing** the input's instance id to `instance_id`. +/// +/// `df.instances.start_input` is caller-writable, and the orchestration selects +/// which graph to load and which role to run as from the input's instance id, so +/// the worker must never trust the payload's own id — otherwise a caller could +/// craft a row whose payload points at another user's instance and have the +/// superuser worker start it. Legacy rows (NULL payload) start with an empty +/// vars set.
+fn build_start_input(instance_id: &str, start_input: Option<serde_json::Value>) -> String { + let mut value = match start_input { + Some(v @ serde_json::Value::Object(_)) => v, + _ => serde_json::json!({}), + }; + if let serde_json::Value::Object(ref mut map) = value { + map.insert( + "instance_id".to_string(), + serde_json::Value::String(instance_id.to_string()), + ); + } + value.to_string() +} + +/// If `instance_id` was moved to the terminal `cancelled` state between the claim +/// and the engine start, cancel the engine instance too so df and the engine +/// converge. `df.cancel()` enqueues its cancel before flipping the row, and that +/// cancel can be dropped as an orphan if it reaches the engine before any history +/// exists; without this the workflow would run to completion while df reads +/// `cancelled`. +async fn reconcile_cancel_if_needed(pool: &sqlx::PgPool, client: &Client, instance_id: &str) { + let cancelled: bool = + sqlx::query_scalar("SELECT status = 'cancelled' FROM df.instances WHERE id = $1") + .bind(instance_id) + .fetch_optional(pool) + .await + .ok() + .flatten() + .unwrap_or(false); + + if cancelled { + if let Err(e) = client + .cancel_instance(instance_id, "cancelled before start (reconciled)") + .await + { + log!("pg_durable: failed to reconcile cancel for instance {instance_id}: {e:?}"); + } + } +} + +/// Start a single instance in the durable engine, exactly once. +/// +/// Returns `Ok(true)` when this call started the engine, `Ok(false)` when there +/// was nothing to do (the engine already knows the instance, or another +/// caller/sweep claimed it first), and `Err` on a provider/database error. +/// +/// Single-start is guaranteed by an atomic claim: the row is flipped to +/// `running` with `RETURNING`, so only the one caller that wins the claim starts +/// the engine.
Two cases are claimable: +/// - a fresh `pending` row (the normal instant/backstop start), and +/// - a `running` row the engine does not know about whose claim is older than +/// `stale_running_grace` — a stale claim left by a worker that died between the +/// claim and the engine start. Re-claiming it retries the start, closing the +/// crash-between-claim-and-start window (the claim commits before the engine +/// enqueue, so a hard crash in that gap would otherwise strand the row in +/// `running` forever, which the `pending`-only sweep could never recover). +/// +/// If the engine start fails, the claim is rolled back to `pending` so a later +/// sweep can retry. +async fn try_start_instance( + pool: &sqlx::PgPool, + client: &Client, + instance_id: &str, + stale_running_grace: Duration, +) -> Result<bool, String> { + use duroxide::OrchestrationStatus; + + // Does the engine already know this instance? Only start ones it does not + // (NotFound) — this skips instances the engine is already running or has + // finished (including any started via the legacy inline path), so a + // genuinely-running `running` row is never restarted by the stale-claim path. + match client.get_orchestration_status(instance_id).await { + Ok(OrchestrationStatus::NotFound) => {} + Ok(_) => return Ok(false), + Err(e) => return Err(format!("status probe failed: {e:?}")), + } + + // Atomically claim the row (see the doc comment for the two claimable cases). + let stale_secs = stale_running_grace.as_secs() as f64; + let claimed: Option<Option<serde_json::Value>> = sqlx::query_scalar( + "UPDATE df.instances SET status = 'running', updated_at = now() + WHERE id = $1 + AND (status = 'pending' + OR (status = 'running' + AND updated_at < now() - make_interval(secs => $2))) + RETURNING start_input", + ) + .bind(instance_id) + .bind(stale_secs) + .fetch_optional(pool) + .await + .map_err(|e| format!("claim failed: {e}"))?; + + let Some(start_input) = claimed else { + // Lost the race, or the row is no longer startable (e.g. cancelled).
+ return Ok(false); + }; + + let input_json = build_start_input(instance_id, start_input); + + match client + .start_orchestration( + instance_id, + crate::orchestrations::execute_function_graph::NAME, + input_json, + ) + .await + { + Ok(()) => { + reconcile_cancel_if_needed(pool, client, instance_id).await; + Ok(true) + } + Err(e) => { + // Roll the claim back so a later sweep can retry this instance. + if let Err(revert) = sqlx::query( + "UPDATE df.instances SET status = 'pending', updated_at = now() + WHERE id = $1 AND status = 'running'", + ) + .bind(instance_id) + .execute(pool) + .await + { + log!( + "pg_durable: failed to roll back claim for instance {instance_id} \ + after a failed start: {revert}" + ); + } + Err(format!("start_orchestration failed: {e:?}")) + } + } +} diff --git a/tests/e2e/sql/25_reconcile_start.sql b/tests/e2e/sql/25_reconcile_start.sql new file mode 100644 index 0000000..f0c9f0a --- /dev/null +++ b/tests/e2e/sql/25_reconcile_start.sql @@ -0,0 +1,157 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests the core df <-> duroxide start-convergence problem this change fixes. +-- +-- pg_durable keeps its own bookkeeping in the `df` schema; the workflow engine +-- (duroxide) keeps the running-workflow state in its own schema. `df.start()` +-- writes df rows in the caller's transaction, but the two schemas are separate +-- systems, so historically they could disagree in two ways: +-- +-- Scenario 1 (ghost): you start a workflow, then your transaction rolls back. +-- The df rows vanish, but duroxide was already told to run it out-of-band on +-- a separate connection -> a running workflow with no df record. +-- +-- Scenario 2 (stuck): the df rows are committed, but duroxide never learned it +-- should run -> a workflow that exists on paper but never executes. 
+-- +-- The fix records intent in df and lets the background worker converge duroxide +-- to match: a rolled-back start tells duroxide nothing, and a committed-but-not- +-- yet-running instance is started by the worker. +-- +-- The base connection is the superuser (postgres); durable functions run as the +-- non-superuser df_e2e_user. The runtime schema is resolved via +-- df.duroxide_schema(). + +-- Helper (superuser): assert no residual runtime rows for an instance id. +CREATE OR REPLACE FUNCTION pg_temp.assert_no_duroxide_residue(p_id text) +RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + sch text := df.duroxide_schema(); + n int; +BEGIN + EXECUTE format( + 'SELECT (SELECT count(*) FROM %I.orchestrator_queue WHERE instance_id = $1) ' + ' + (SELECT count(*) FROM %I.instances WHERE instance_id = $1)', + sch, sch) + INTO n USING p_id; + IF n > 0 THEN + RAISE EXCEPTION 'TEST FAILED [start_rollback_ghost]: rolled-back df.start left % runtime row(s) for instance %', n, p_id; + END IF; + RAISE NOTICE 'PASSED [start_rollback_ghost]: no runtime residue for %', p_id; +END $$; + +-- =========================================================================== +-- Scenario 1: a rolled-back df.start() leaves no runtime "ghost". +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; +BEGIN; +SELECT df.start('SELECT 42', 'reconcile-start-rollback') AS rb_id \gset +ROLLBACK; +RESET SESSION AUTHORIZATION; + +-- Give any (buggy) out-of-band start a moment to surface before asserting. +SELECT pg_sleep(2); + +SELECT pg_temp.assert_no_duroxide_residue(:'rb_id'); + +-- =========================================================================== +-- Scenario 2: a committed df instance that the engine never heard about is +-- started by the worker (convergence backstop). 
+-- +-- We simulate "committed intent whose start never reached duroxide" by writing +-- df.instances/df.nodes directly (no df.start, no notification), backdated so it +-- is immediately eligible for the worker's reconcile sweep. On the unfixed code +-- nothing ever starts it and this scenario times out. +-- =========================================================================== + +DROP TABLE IF EXISTS reconcile_probe; +CREATE TABLE reconcile_probe (id SERIAL PRIMARY KEY, noted_at TIMESTAMPTZ DEFAULT now()); +GRANT INSERT ON reconcile_probe TO df_e2e_user; +GRANT USAGE, SELECT ON SEQUENCE reconcile_probe_id_seq TO df_e2e_user; + +-- Insert a minimal one-node graph owned by df_e2e_user, backdated past the +-- reconcile grace window. FKs between df.instances.root_node and df.nodes are +-- DEFERRABLE INITIALLY DEFERRED, so both rows can be inserted in one transaction. +BEGIN; +INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) +VALUES ('deadbee1', 'abadcafe', 'SQL', 'INSERT INTO reconcile_probe DEFAULT VALUES', + 'df_e2e_user'::regrole, 'postgres'); + +INSERT INTO df.instances (id, label, root_node, status, submitted_by, database, created_at, updated_at) +VALUES ('abadcafe', 'reconcile-start-backstop', 'deadbee1', 'pending', + 'df_e2e_user'::regrole, 'postgres', + now() - interval '10 minutes', now() - interval '10 minutes'); +COMMIT; + +DO $$ +DECLARE status TEXT; attempts INT := 0; +BEGIN + LOOP + SELECT s INTO status FROM df.status('abadcafe') s; + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled') OR attempts > 600; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + IF lower(COALESCE(status, 'pending')) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [start_backstop]: worker did not start committed instance abadcafe (status=%)', status; + END IF; + + IF NOT EXISTS (SELECT 1 FROM reconcile_probe) THEN + RAISE EXCEPTION 'TEST FAILED [start_backstop]: instance completed but its node never ran'; + END 
IF; + + RAISE NOTICE 'PASSED [start_backstop]: worker started and completed committed instance abadcafe'; +END $$; + +-- =========================================================================== +-- Scenario 3: a stale "running" claim left by a worker that died between the +-- atomic claim (pending -> running) and the engine start is recovered. +-- +-- We simulate the crashed-mid-start state directly: an instance whose df row is +-- 'running' (as if the claim committed) but which the engine never started, +-- backdated so its claim is older than the reconcile grace window. The sweep +-- must notice the engine does not know it and re-start it. On code that only +-- sweeps 'pending' rows this instance is stranded forever and the scenario times +-- out. +-- =========================================================================== + +BEGIN; +INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) +VALUES ('deadbee2', 'badcab1e', 'SQL', 'INSERT INTO reconcile_probe DEFAULT VALUES', + 'df_e2e_user'::regrole, 'postgres'); + +INSERT INTO df.instances (id, label, root_node, status, submitted_by, database, start_input, created_at, updated_at) +VALUES ('badcab1e', 'reconcile-stale-running', 'deadbee2', 'running', + 'df_e2e_user'::regrole, 'postgres', '{"instance_id": "badcab1e", "vars": {}}'::jsonb, + now() - interval '10 minutes', now() - interval '10 minutes'); +COMMIT; + +DO $$ +DECLARE status TEXT; attempts INT := 0; probe_before INT; probe_after INT; +BEGIN + SELECT count(*) INTO probe_before FROM reconcile_probe; + LOOP + SELECT s INTO status FROM df.status('badcab1e') s; + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled') OR attempts > 600; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + IF lower(COALESCE(status, 'running')) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [stale_running_recovery]: worker did not recover stale-running instance badcab1e (status=%)', status; + END IF; + + SELECT count(*) 
INTO probe_after FROM reconcile_probe; + IF probe_after <= probe_before THEN + RAISE EXCEPTION 'TEST FAILED [stale_running_recovery]: instance completed but its node never ran'; + END IF; + + RAISE NOTICE 'PASSED [stale_running_recovery]: worker recovered stale-running instance badcab1e'; +END $$; + +DROP TABLE reconcile_probe; + +SELECT 'TEST PASSED: reconcile start' AS result; diff --git a/tests/e2e/sql/26_reconcile_security.sql b/tests/e2e/sql/26_reconcile_security.sql new file mode 100644 index 0000000..bf484ce --- /dev/null +++ b/tests/e2e/sql/26_reconcile_security.sql @@ -0,0 +1,117 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Security regression test for start reconciliation. +-- +-- df.instances.start_input is caller-writable (df.start() runs as the caller and +-- writes it). The background worker replays it to start the durable engine, and +-- the orchestration selects which graph to load — and therefore which role runs +-- the SQL — from the instance id. If the worker/orchestration trusted the +-- instance id embedded in the payload, a low-privilege user could INSERT a +-- crafted pending row whose start_input points at ANOTHER user's instance and +-- have the superuser worker start it, running the victim's workflow as the +-- victim with attacker-chosen vars (cross-tenant execution / SQL injection). +-- +-- This test constructs exactly that attack across two roles and asserts it is +-- blocked: the attacker's row runs the attacker's own graph, and the victim's +-- protected table is never written by the attacker-triggered start. +-- +-- Base connection is the superuser (postgres); the crafted rows are inserted by +-- the two non-superuser roles themselves so RLS and column grants apply. All ids +-- are 8 lowercase-hex characters (enforced by the df.instances/df.nodes CHECKs). 
+ +-- --- Setup (superuser) ------------------------------------------------------ +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'df_sec_victim') THEN + CREATE ROLE df_sec_victim LOGIN; + END IF; +END $$; +SELECT df.grant_usage('df_sec_victim'); + +DROP TABLE IF EXISTS sec_victim_secret; +DROP TABLE IF EXISTS sec_attacker_probe; +CREATE TABLE sec_victim_secret (marker TEXT); +CREATE TABLE sec_attacker_probe (marker TEXT); +-- Only the victim can write its secret table; only the attacker can write its +-- own probe table. If the redirect worked, the victim's node (running as the +-- victim) would write sec_victim_secret. +GRANT INSERT ON sec_victim_secret TO df_sec_victim; +GRANT INSERT ON sec_attacker_probe TO df_e2e_user; + +-- --- Victim creates an (unstarted) instance whose graph writes its secret ----- +-- Inserted directly and left pending: it is neither notified nor old enough for +-- the reconcile grace window, so it will not start on its own during the test. 
+SET SESSION AUTHORIZATION df_sec_victim; +BEGIN; +INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) +VALUES ('c0ffee0d', 'c0ffee00', 'SQL', + 'INSERT INTO sec_victim_secret VALUES (''victim-secret'')', + 'df_sec_victim'::regrole, 'postgres'); +INSERT INTO df.instances (id, label, root_node, submitted_by, database, start_input) +VALUES ('c0ffee00', 'sec-victim', 'c0ffee0d', + 'df_sec_victim'::regrole, 'postgres', + '{"instance_id": "c0ffee00", "vars": {}}'::jsonb); +COMMIT; +RESET SESSION AUTHORIZATION; + +-- --- Attacker crafts a row whose start_input points at the victim instance ---- +SET SESSION AUTHORIZATION df_e2e_user; +BEGIN; +INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) +VALUES ('facade0d', 'facade00', 'SQL', + 'INSERT INTO sec_attacker_probe VALUES (''attacker-ran'')', + 'df_e2e_user'::regrole, 'postgres'); +INSERT INTO df.instances (id, label, root_node, submitted_by, database, start_input) +VALUES ('facade00', 'sec-attacker', 'facade0d', + 'df_e2e_user'::regrole, 'postgres', + '{"instance_id": "c0ffee00", "vars": {"x": "; DROP TABLE sec_victim_secret; --"}}'::jsonb); +COMMIT; +-- Trigger the crafted row immediately via the public NOTIFY channel. +SELECT pg_notify('pg_durable_start', 'facade00'); +RESET SESSION AUTHORIZATION; + +-- --- Wait for the attacker instance to finish, then assert isolation held ----- +DO $$ +DECLARE status TEXT; attempts INT := 0; +BEGIN + LOOP + SELECT s INTO status FROM df.status('facade00') s; + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled') OR attempts > 600; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + -- The attacker instance must run ITS OWN graph (writing its own probe), not + -- the victim's — proving start_input.instance_id did not redirect loading. 
+ IF lower(COALESCE(status, 'pending')) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [start_input_redirect]: attacker instance did not run its own graph (status=%)', status; + END IF; + IF NOT EXISTS (SELECT 1 FROM sec_attacker_probe WHERE marker = 'attacker-ran') THEN + RAISE EXCEPTION 'TEST FAILED [start_input_redirect]: attacker instance completed but its own node never ran'; + END IF; + + -- The victim's graph must NOT have executed as a side effect of starting the + -- attacker's instance. A non-empty secret table means the redirect worked. + IF EXISTS (SELECT 1 FROM sec_victim_secret) THEN + RAISE EXCEPTION 'TEST FAILED [start_input_redirect]: victim graph executed via attacker-controlled start_input (privilege escalation)'; + END IF; + + RAISE NOTICE 'PASSED [start_input_redirect]: crafted start_input could not redirect graph loading to the victim instance'; +END $$; + +-- --- Cleanup ---------------------------------------------------------------- +-- Mark the still-pending victim row terminal so the reconcile sweep never starts +-- it after this test (superuser UPDATE bypasses RLS/grants); terminal rows are +-- pruned by the worker. The attacker row is already terminal (completed). +UPDATE df.instances SET status = 'cancelled', updated_at = now() +WHERE id IN ('c0ffee00', 'facade00') AND status NOT IN ('completed', 'failed', 'cancelled'); +DROP TABLE sec_victim_secret; +DROP TABLE sec_attacker_probe; +-- DROP OWNED BY removes df_sec_victim's remaining grants so DROP ROLE succeeds; +-- the cancelled instance row (submitted_by = df_sec_victim) survives with a +-- dangling regrole, which the extension tolerates, and is pruned as terminal. +DROP OWNED BY df_sec_victim; +DROP ROLE IF EXISTS df_sec_victim; + +SELECT 'TEST PASSED: reconcile security' AS result;
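The atomic claim that `try_start_instance` relies on can be modeled in memory. What follows is a minimal sketch and not code from this change: `Row`, `Status`, and `try_claim` are hypothetical names, and the engine-side `NotFound` probe that precedes the real claim is left out. It assumes the claim predicate is the one in the `UPDATE ... RETURNING start_input` statement: a `pending` row is always claimable, and a `running` row is claimable only when its `updated_at` is strictly older than the grace period.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the `status` column of a `df.instances` row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Pending,
    Running,
    Cancelled,
}

/// Hypothetical stand-in for one row: its status and the age of `updated_at`.
#[derive(Debug, Clone, Copy)]
struct Row {
    status: Status,
    since_update: Duration,
}

/// Models the claim predicate (an assumption based on the UPDATE's WHERE clause):
/// `pending` is always claimable; `running` only once the previous claim is
/// strictly older than `grace`. A successful claim leaves the row `running`
/// with a fresh timestamp, as the UPDATE would.
fn try_claim(row: &mut Row, grace: Duration) -> bool {
    let claimable = match row.status {
        Status::Pending => true,
        Status::Running => row.since_update > grace,
        Status::Cancelled => false,
    };
    if claimable {
        row.status = Status::Running;
        row.since_update = Duration::ZERO;
    }
    claimable
}

fn main() {
    let grace = Duration::from_secs(300);

    // A fresh pending row: the first claimer wins, the second loses the race.
    let mut fresh = Row { status: Status::Pending, since_update: Duration::from_secs(1) };
    println!("first claim wins: {}", try_claim(&mut fresh, grace));
    println!("second claim wins: {}", try_claim(&mut fresh, grace));

    // A running row whose claim is 10 minutes old is treated as stale.
    let mut stale = Row { status: Status::Running, since_update: Duration::from_secs(600) };
    println!("stale claim recovered: {}", try_claim(&mut stale, grace));

    // A cancelled row is never claimable, however old it is.
    let mut cancelled = Row { status: Status::Cancelled, since_update: Duration::from_secs(600) };
    println!("cancelled row claimed: {}", try_claim(&mut cancelled, grace));
}
```

Because a winning claim resets the timestamp, a second caller arriving immediately afterwards sees a fresh `running` row and backs off, which appears to be what keeps the NOTIFY path and the sweep from starting the same instance twice.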