diff --git a/docs/spec-atomic-start.md b/docs/spec-atomic-start.md new file mode 100644 index 0000000..683ecf1 --- /dev/null +++ b/docs/spec-atomic-start.md @@ -0,0 +1,153 @@ +# Keeping the control plane and the runtime consistent + +**Status:** Implemented and validated. Atomic `df.start` / `df.cancel` / +`df.signal`, plus a reconciler that repairs leftover drift. +**Author:** pg_durable team +**Date:** June 2026 + +## Problem + +A durable function has state in two places: + +- **pg_durable control plane** — `df.nodes` and `df.instances`, written on the + caller's PostgreSQL transaction. +- **duroxide runtime** — `_duroxide` queue/history/instance state, previously + written out-of-band through a separate connection. + +Those writes were not atomic. The visible failure was a rolled-back `df.start()`: +`df.*` rows rolled back, but the duroxide enqueue survived. The worker then retried +an orchestration whose graph no longer existed, waited up to 5 seconds for rows that +would never appear, and left an orphan behind. We reproduced this with rolled-back +starts leaving `_duroxide.instances` rows that had no matching `df.instances` row. + +`df.cancel()` and `df.signal()` had the same transaction-boundary problem for their +runtime enqueue: a rollback of the caller's transaction did not roll back the runtime +work. + +## Decision + +Implement **prevent + repair**: + +1. **Prevent:** enqueue `df.start()`, `df.cancel()`, and `df.signal()` runtime work + inside the caller's transaction via SPI. The control-plane writes and runtime + enqueue now commit or roll back together. +2. **Repair:** run a lightweight reconciler that removes leftover runtime orphans + and marks stuck control-plane rows failed. This catches legacy/fallback drift and + crash-time drift that transaction-local enqueue cannot prevent. + +## Alternatives considered + +| Option | Summary | Decision | +|---|---|---| +| Single source of truth in `_duroxide` | Move all state into the runtime schema and rebuild pg_durable reads/security over it. | Too large and migration-heavy for this fix. | +| Run pg_durable writes inside duroxide's transaction | Let duroxide-pg perform the `df.*` writes on its worker-pool connection. | Atomic, but not with the caller's transaction; breaks identity capture and row security expectations. | +| Run duroxide enqueue inside the caller's transaction | Keep both stores and use SPI to enqueue runtime work next to `df.*` writes. | Chosen primary fix. | +| Async reconciler only | Tolerate drift and repair it later. | Useful backstop, but insufficient alone. | + +## Design overview + +`df.start()` still creates the graph and instance rows in `df.*`. The final enqueue +step now happens through SQL on the same transaction, so a rollback undoes both the +control-plane rows and the runtime queue row. `df.cancel()` and `df.signal()` follow +the same transaction rule. + +The runtime queue is not writable by ordinary users, so the enqueue goes through +private `SECURITY DEFINER` wrappers in the `df` schema. These wrappers are granted +through `df.grant_usage()`, build the runtime work items themselves, and perform +their own authorization checks before writing to `_duroxide`. + +The start wrapper is intentionally **not** a general-purpose privileged runtime +entrypoint. It only starts the root function-graph orchestration and validates that +the input targets the same instance id. Cancel/signal wrappers authorize against the +instance owner. + +### Direct duroxide-pg coupling + +This is the abstraction break in the design. Most pg_durable code talks to the +runtime through duroxide's Rust provider/client API. That API uses a separate +connection pool, so it cannot share the caller's backend transaction. + +To get caller-transaction atomicity, this PR calls the duroxide-pg SQL surface +directly (`enqueue_orchestrator_work`, `delete_instances_atomic`, and selected +runtime tables). That only works with a PostgreSQL-backed provider. In practice, +pg_durable is itself a PostgreSQL extension whose runtime state lives in the same +database, so a non-PG provider is not a meaningful deployment target; still, the +code probes for the SQL surface and falls back to the old out-of-band path when it +is absent. + +### Signals + +`df.signal()` fans out to the root instance and any running sub-orchestrations, +because a child branch may be the one waiting on the signal. + +Duroxide does not buffer external events until an orchestration is ready to receive +them. Therefore a signal sent before the root runtime row exists is rejected instead +of returning `OK` and being silently skipped. Once the runtime row exists, the +signal enqueue is atomic with the caller's transaction. + +### Reconciler + +`df.reconcile()` is an admin-only backstop. It: + +- deletes orphaned runtime **root** instances whose full subtree has no matching + `df.instances` row; and +- marks stuck `df.instances` rows failed when there is no live runtime instance and + no queued start. + +The background worker keeps one reconciler durable loop running per cluster on +`pg_durable.reconciler_cron` (default `*/5 * * * *`; empty disables it), submitted +by the dedicated non-superuser role `df_reconciler`. + +## Behavior changes + +- `df.start()`, `df.cancel()`, and `df.signal()` now participate in the caller's + transaction. For example, `BEGIN; SELECT df.start(...); ROLLBACK;` no longer + starts the workflow on the atomic path. +- If the duroxide-pg SQL surface or the `df` wrappers are missing, pg_durable logs + and falls back to the previous non-atomic client path. The fallback is not emitted + as a client-visible SQL `WARNING`, so scripts that capture `SELECT df.start(...)` + output remain compatible. +- `df.wait_for_schedule` now records an `utc_now` event before the timer so repeated + schedule waits compute the next cron tick each generation. This fixes a loop + busy-loop bug, but changes the recorded replay event sequence. + +## Upgrade and compatibility + +The wrappers and `df.reconcile()` are `df`-schema objects, so they are included in +both fresh-install SQL and the `0.2.3 -> 0.2.4` upgrade script. The upgrade script +also updates `df.grant_usage()` / `df.revoke_usage()` and backfills wrapper EXECUTE +privileges to existing roles that already had explicit `USAGE` on schema `df`. + +Binary backward compatibility is preserved for old schemas that have not yet run +`ALTER EXTENSION UPDATE`: the new binary uses the atomic path only when both the +duroxide-pg enqueue function and the `df._enqueue_orchestrator_*` wrappers exist; +otherwise it falls back to the old out-of-band client path. + +There is no table data migration. + +**Upgrade caveat:** drain or restart any in-flight instance already waiting in a +`WAIT_SCHEDULE` node during upgrade. Those histories may replay expecting the old +sequence and fail as nondeterministic after the `utc_now` change. + +## Validation + +Automated coverage added in this PR: + +- `24_atomic_rollback` — rolled-back start/cancel/signal leave no runtime effect; + signaling before runtime materialization is rejected. +- `25_enqueue_wrapper_authz` — wrapper authorization and start-wrapper hardening. +- `26_reconcile_orphan_gc` — orphan root with child sub-orchestrations is collected + as a full subtree; healthy instances remain untouched. +- `scripts/test-upgrade.sh` — schema equivalence, binary compatibility, data + compatibility, and existing-role wrapper-grant backfill. + +Manual/targeted validation included signal fan-out, cancel consistency, reconciler +liveness, cron scheduling, formatting, clippy, pgspot, and upgrade tests. + +## Remaining risks / follow-up + +- The direct duroxide-pg SQL dependency is intentional but should remain small and + well documented. Moving the privileged enqueue surface into duroxide-pg would be a + cleaner long-term boundary. +- Reconciler grace/cadence and role provisioning may need tuning after operational + experience. diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 4e8375b..4b29a0e 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -206,14 +206,23 @@ what the upgrade script handles, and any backward compatibility considerations. ### v0.2.3 → v0.2.4 #### Simplify `df.grant_usage()` — drop the explicit function allowlist -- **DDL change (df schema):** `df.grant_usage()` no longer loops over a hard-coded `func_sigs` array issuing `GRANT EXECUTE` per function. Fresh installs (`src/lib.rs`) and the upgrade script (`sql/pg_durable--0.2.3--0.2.4.sql`) both `CREATE OR REPLACE` the function with a body that grants `USAGE ON SCHEMA df` plus the table privileges, and conditionally grants `df.http()` / the admin helpers. The signature `df.grant_usage(text, boolean, boolean)` is unchanged. -- **DDL change (df schema):** `df.revoke_usage()` is made symmetric with the new `grant_usage()`. It no longer loops over every `df.*` function in `pg_proc` issuing `REVOKE EXECUTE` (which, post-simplification, only produced "no privileges could be revoked" warnings since ordinary functions are never granted per-function EXECUTE). The new body revokes only what `grant_usage()` grants: schema `USAGE`, EXECUTE on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`), and the table privileges. The signature `df.revoke_usage(text)` is unchanged. +- **DDL change (df schema):** `df.grant_usage()` no longer loops over a hard-coded `func_sigs` array issuing `GRANT EXECUTE` per function. Fresh installs (`src/lib.rs`) and the upgrade script (`sql/pg_durable--0.2.3--0.2.4.sql`) both `CREATE OR REPLACE` the function with a body that grants `USAGE ON SCHEMA df` plus the table privileges, conditionally grants `df.http()` / the admin helpers, and explicitly grants the new private enqueue wrappers (`df._enqueue_orchestrator_start`, `df._enqueue_orchestrator_cancel`, `df._enqueue_orchestrator_signal`) to ordinary df users. The signature `df.grant_usage(text, boolean, boolean)` is unchanged. +- **DDL change (df schema):** `df.revoke_usage()` is made symmetric with the new `grant_usage()`. It no longer loops over every `df.*` function in `pg_proc` issuing `REVOKE EXECUTE` (which, post-simplification, only produced "no privileges could be revoked" warnings since ordinary functions are never granted per-function EXECUTE). The new body revokes only what `grant_usage()` grants: schema `USAGE`, EXECUTE on the sensitive/admin functions, EXECUTE on the private enqueue wrappers, and the table privileges. The signature `df.revoke_usage(text)` is unchanged. - **Rationale:** The ordinary `df.*` functions retain PostgreSQL's default PUBLIC `EXECUTE`, so schema `USAGE` is the real access gate; the per-function grants/revokes were redundant. The sensitive functions have PUBLIC `EXECUTE` revoked at install time and were never in the allowlist, so their protection is unchanged. - **Behavioral note:** A newly added `df.*` function is now callable by any role with schema `USAGE` by default. To keep a future function private, `REVOKE EXECUTE ... FROM PUBLIC` at install time and grant it explicitly in `df.grant_usage()`. - **Legacy cleanup caveat:** A role that was granted under the *old* `grant_usage()` (explicit per-function EXECUTE) and is later revoked under the new `revoke_usage()` may retain inert EXECUTE entries on ordinary functions. These are harmless — revoking schema `USAGE` fully locks the role out — and clear on the next drop/regrant cycle. - **Scenario A considerations:** Signatures are identical on the fresh-install and upgrade paths (only the bodies differ), so the function-signature equivalence contract passes. - **Scenario B1/B2 considerations:** No schema/data migration and no new objects. The replaced bodies work against the existing schema and change no privileges already granted. +#### Atomic in-transaction enqueue wrappers + `df.reconcile()` +- **DDL change (df schema):** Adds three private `SECURITY DEFINER` wrappers: `df._enqueue_orchestrator_start(text, text, text)`, `df._enqueue_orchestrator_cancel(text, text)`, and `df._enqueue_orchestrator_signal(text, text, text)`. Fresh installs and the upgrade script create the same functions and `REVOKE EXECUTE ... FROM PUBLIC`; `df.grant_usage()` grants them explicitly to df users because `df.start()` / `df.cancel()` / `df.signal()` call them via SPI as the caller. The upgrade script also backfills these wrapper grants to roles that already had explicit `USAGE` on schema `df` before `ALTER EXTENSION UPDATE`, preserving grant option where present. +- **DDL change (df schema):** Adds admin-only `df.reconcile(integer)` (`SECURITY DEFINER`, `REVOKE EXECUTE ... FROM PUBLIC`) to delete orphaned duroxide instance subtrees and mark stuck `df.instances` rows failed. The background worker starts it through a built-in durable loop as the dedicated `df_reconciler` role. +- **Provider coupling:** These wrappers and `df.reconcile()` call the duroxide-pg SQL surface directly (`enqueue_orchestrator_work`, `delete_instances_atomic`, and `_duroxide` tables). This is intentional: only direct SQL via SPI can share the caller's transaction. Non-PG providers (or older schemas without the wrappers) use the legacy out-of-band fallback. +- **Scenario A considerations:** Fresh-install and upgrade schemas must expose the same new `df` functions and grants. The upgrade script creates the wrappers/reconciler, updates `grant_usage()`/`revoke_usage()`, and backfills existing df users in the same release, so upgraded users can call `df.start()` / `df.cancel()` / `df.signal()` immediately after `ALTER EXTENSION UPDATE`. +- **Scenario B1 considerations:** The new `.so` remains compatible with pre-upgrade 0.2.3 schemas because `df.start()` / `df.cancel()` / `df.signal()` use the in-transaction path only when both the duroxide-pg provider SQL function and the `df._enqueue_orchestrator_*` wrappers exist; otherwise they log and fall back to the old out-of-band client path. This fallback is not client-visible as a SQL `WARNING`, so it does not contaminate scripts that capture `SELECT df.start(...)` output. +- **Scenario B2 considerations:** No data migration. Existing instances and queued work keep their original behavior; the new atomic semantics apply to calls made after the schema has the wrappers. +- **In-flight schedule caveat:** This release also changes `df.wait_for_schedule` to compute from the orchestration's recorded clock. That fixes a loop busy-loop bug, but changes the recorded replay event sequence (`utc_now` is recorded before the timer). Any in-flight instance already waiting in a `WAIT_SCHEDULE` node when the binary changes should be drained or restarted during upgrade; otherwise it may replay expecting the old sequence and fail as nondeterministic. + #### Rename `df.wait_for_completion()` to `df.await_instance()` - **DDL change (df schema):** Adds `df.await_instance(text, integer)` as the canonical C binding for the helper formerly exposed as `df.wait_for_completion(text, integer)`. The old SQL function remains present and the new `.so` continues exporting `wait_for_completion_wrapper` as a shim, so existing customer scripts keep working. - **Grant behavior:** No explicit grant migration is required. PostgreSQL grants `EXECUTE` on newly created functions to `PUBLIC` by default, and `df.await_instance` is not a sensitive helper whose default PUBLIC grant is revoked. diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 8f5af24..73dec60 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -947,6 +947,7 @@ echo "" B2_PRE_INSTANCE_ID="" B2_INFLIGHT_INSTANCE_ID="" B2_POST_INSTANCE_ID="" +B2_PRE_GRANTED_ROLE="durable_b2_pre_grant" test_b2_data_survives_upgrade() { # Step 1: Install previous version and create test data @@ -957,6 +958,14 @@ test_b2_data_survives_upgrade() { assert_sql_equals "SELECT df.clearvars();" "OK" || return 1 assert_sql_equals "SELECT df.setvar('b2_key', 'b2_value');" "OK" || return 1 + # Role granted under the previous schema. The upgrade must backfill EXECUTE + # on new private wrappers so this existing df user can keep calling + # df.start()/df.cancel()/df.signal() without re-running df.grant_usage(). + run_sql_capture "DROP OWNED BY ${B2_PRE_GRANTED_ROLE};" >/dev/null 2>&1 || true + run_sql_capture "DROP ROLE IF EXISTS ${B2_PRE_GRANTED_ROLE};" >/dev/null 2>&1 || true + run_sql_capture "CREATE ROLE ${B2_PRE_GRANTED_ROLE} LOGIN;" >/dev/null || return 1 + run_sql_capture "SELECT df.grant_usage('${B2_PRE_GRANTED_ROLE}');" >/dev/null || return 1 + B2_PRE_INSTANCE_ID=$(run_sql_capture "SELECT df.start('INSERT INTO test_upgrade_b2_log (kind, msg) VALUES (''pre'', ''{b2_key}'') RETURNING msg', 'b2-pre-upgrade');") || return 1 B2_INFLIGHT_INSTANCE_ID=$(run_sql_capture "SELECT df.start(df.sleep(2) ~> 'SELECT ''b2-running'' AS value', 'b2-inflight');") || return 1 @@ -1001,6 +1010,12 @@ test_b2_new_data_after_upgrade() { assert_sql_equals "SELECT msg FROM test_upgrade_b2_log WHERE kind = 'post' ORDER BY id DESC LIMIT 1;" "new_value" } +test_b2_existing_grants_after_upgrade() { + assert_sql_equals "SELECT has_function_privilege('${B2_PRE_GRANTED_ROLE}', 'df._enqueue_orchestrator_start(text, text, text)', 'EXECUTE');" "t" && + assert_sql_equals "SELECT has_function_privilege('${B2_PRE_GRANTED_ROLE}', 'df._enqueue_orchestrator_cancel(text, text)', 'EXECUTE');" "t" && + assert_sql_equals "SELECT has_function_privilege('${B2_PRE_GRANTED_ROLE}', 'df._enqueue_orchestrator_signal(text, text, text)', 'EXECUTE');" "t" +} + test_b2_grant_usage_after_upgrade() { # Regression guard for #110: after ALTER EXTENSION UPDATE, df.debug_connection() # must be gone from the catalog. Scenario A only compares function name/args/ @@ -1029,6 +1044,7 @@ test_b2_grant_usage_after_upgrade() { # Clean up the probe role. run_sql_capture "DROP OWNED BY ${probe_role}; DROP ROLE IF EXISTS ${probe_role};" >/dev/null 2>&1 || true + run_sql_capture "DROP OWNED BY ${B2_PRE_GRANTED_ROLE}; DROP ROLE IF EXISTS ${B2_PRE_GRANTED_ROLE};" >/dev/null 2>&1 || true } if [ "$HAS_COMPAT_PREV" = true ]; then @@ -1036,6 +1052,7 @@ if [ "$HAS_COMPAT_PREV" = true ]; then run_test "B2: Pre-upgrade instance remains queryable" test_b2_pre_upgrade_instance_after_upgrade run_test "B2: In-flight work completes after upgrade" test_b2_inflight_work_after_upgrade run_test "B2: New data and execution after upgrade" test_b2_new_data_after_upgrade + run_test "B2: Existing df users retain wrapper privileges after upgrade" test_b2_existing_grants_after_upgrade run_test "B2: df.grant_usage() works and df.debug_connection() is gone after upgrade" test_b2_grant_usage_after_upgrade fi diff --git a/sql/pg_durable--0.2.3--0.2.4.sql b/sql/pg_durable--0.2.3--0.2.4.sql index 40153a6..ba8b90a 100644 --- a/sql/pg_durable--0.2.3--0.2.4.sql +++ b/sql/pg_durable--0.2.3--0.2.4.sql @@ -41,7 +41,10 @@ DROP FUNCTION IF EXISTS df.debug_connection(); -- The sensitive functions (df.http, df.grant_usage, df.revoke_usage) have -- PUBLIC EXECUTE revoked; df.http and the admin helpers are granted explicitly -- here when requested. The updated body also grants df.metrics() (system-wide --- aggregate counts) to with_grant => true admins. +-- aggregate counts) to with_grant => true admins. The new in-transaction +-- enqueue wrappers are also private (REVOKE FROM PUBLIC below) and are granted +-- explicitly to df users because df.start()/df.cancel()/df.signal() call them +-- via SPI as the calling role. -- -- Unlike a fresh 0.2.4 install, this upgrade does NOT revoke df.metrics()'s -- PUBLIC EXECUTE. Making df.metrics() private by default is a posture change for @@ -85,6 +88,14 @@ BEGIN EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df.metrics() TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; END IF; + -- In-transaction enqueue wrappers — SECURITY DEFINER, revoked from PUBLIC at + -- install. Granted unconditionally to every df user because df.start() / + -- df.cancel() / df.signal() call them via SPI as the calling role; their own + -- internal authorization checks gate access to other users' instances. + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + -- Table privileges EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -109,11 +120,11 @@ $fn$; -- the role out of every ordinary df.* function. -- -- The new body undoes exactly what grant_usage() grants: schema USAGE, EXECUTE --- on the sensitive functions (including df.metrics(), which grant_usage() grants --- to with_grant admins), and the table privileges. Note: a role granted under --- the OLD grant_usage() (explicit per-function EXECUTE) may retain inert EXECUTE --- entries on ordinary functions after this revoke; they are harmless because --- schema USAGE is gone. +-- on the sensitive/admin functions (including df.metrics() for with_grant +-- admins), EXECUTE on the new enqueue wrappers, and the table privileges. Note: +-- a role granted under the OLD grant_usage() (explicit per-function EXECUTE) may +-- retain inert EXECUTE entries on ordinary functions after this revoke; they are +-- harmless because schema USAGE is gone. -- ============================================================================ CREATE OR REPLACE FUNCTION df.revoke_usage(p_role TEXT) RETURNS VOID @@ -149,6 +160,24 @@ BEGIN NULL; END; + -- In-transaction enqueue wrappers (granted unconditionally by grant_usage()). + -- A delegated admin may not be the grantor of these; skip if so. + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + -- Table privileges. -- Column-level revokes must match the column-level grants from grant_usage(). EXECUTE pg_catalog.format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); @@ -183,13 +212,214 @@ LANGUAGE c AS 'MODULE_PATHNAME', 'await_instance_wrapper'; -- ============================================================================ --- df.reconcile(): repair residual df.* / duroxide divergence. +-- In-transaction enqueue wrappers + reconciler (atomic df.start/cancel/signal, +-- and df.reconcile()). +-- +-- df.start()/df.cancel()/df.signal() now enqueue the duroxide work item over SPI +-- inside the CALLER'S transaction (so a rollback undoes the enqueue), through +-- these SECURITY DEFINER wrappers. The orchestrator queue is owner-only, so the +-- wrappers perform the privileged INSERT; each builds the work item server-side +-- and authorizes the caller (df.start by brand-new-instance state; df.cancel / +-- df.signal by pg_has_role(session_user, , 'MEMBER')). They are revoked +-- from PUBLIC and granted to df users by df.grant_usage() (above). -- --- Best-effort admin backstop: delete orphaned duroxide instance subtrees whose --- root has no df.instances row, and mark stale df.instances rows failed when --- the runtime has neither a live instance nor a queued start. Fresh installs --- define the same function in src/lib.rs. +-- df.reconcile() is the admin-only backstop that deletes orphaned duroxide +-- instance subtrees with no df.instances row and fails stuck df.instances rows. +-- +-- The wrappers resolve the duroxide schema via df.duroxide_schema() (defined in +-- the 0.2.2→0.2.3 upgrade) and require the duroxide-pg provider; df.start / +-- df.cancel / df.signal fall back to the out-of-band path when it is absent, so +-- this upgrade is safe on a fresh '_duroxide' or a legacy 'duroxide' schema. -- ============================================================================ +CREATE FUNCTION df._enqueue_orchestrator_start( + p_instance_id text, + p_orchestration text, + p_input text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + work_item text; + v_blocked boolean; +BEGIN + -- This wrapper is not a generic privileged "start any orchestration" entry + -- point. df.start() passes the root graph-executor name and FunctionInput + -- JSON; reject anything else so a caller cannot use the SECURITY DEFINER + -- privilege to enqueue an internal sub-orchestration with crafted input. + IF p_orchestration OPERATOR(pg_catalog.<>) 'pg_durable::orchestration::execute-function-graph' THEN + RAISE EXCEPTION 'pg_durable: invalid start orchestration %', p_orchestration + USING ERRCODE = 'invalid_parameter_value'; + END IF; + + IF (p_input::jsonb ->> 'instance_id') IS DISTINCT FROM p_instance_id THEN + RAISE EXCEPTION 'pg_durable: start input instance_id does not match %', p_instance_id + USING ERRCODE = 'invalid_parameter_value'; + END IF; + + -- Authorization. This runs as the (privileged) definer, so it must not + -- trust the caller to only target their own instance. Permit the enqueue + -- only for the transaction that inserted a brand-new, not-yet-started + -- instance: a 'pending' df.instances row with no orchestrator-queue entry, + -- no duroxide instance, and an in-progress xmin visible to this transaction. + -- This preserves SECURITY DEFINER / SET ROLE df.start() semantics while + -- blocking a caller from starting another user's previously-committed + -- pending row. Checking pg_xact_status(xmin) rather than equality to + -- pg_current_xact_id() keeps PL/pgSQL exception subtransactions working. + -- The wrapper is safe because it also fixes the orchestration to the root + -- graph executor and validates the input instance id, so callers cannot + -- start internal orchestrations or target someone else's already-started + -- instance. + EXECUTE pg_catalog.format( + 'SELECT NOT EXISTS (SELECT 1 FROM df.instances i ' + ' WHERE i.id = $1 ' + ' AND i.status = ''pending'' ' + ' AND pg_catalog.pg_xact_status(i.xmin::text::xid8) = ''in progress'') ' + ' OR EXISTS (SELECT 1 FROM %I.orchestrator_queue q WHERE q.instance_id = $1) ' + ' OR EXISTS (SELECT 1 FROM %I.instances d WHERE d.instance_id = $1)', + sch, sch) + INTO v_blocked + USING p_instance_id; + + IF v_blocked THEN + RAISE EXCEPTION 'pg_durable: not authorized to enqueue a start for instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + -- Build the StartOrchestration work item server-side so the caller cannot + -- choose the work-item variant (no CancelInstance/ExternalRaised/etc.) or + -- target a different instance. Mirrors duroxide's WorkItem::StartOrchestration. + work_item := pg_catalog.json_build_object( + 'StartOrchestration', pg_catalog.json_build_object( + 'instance', p_instance_id, + 'orchestration', 'pg_durable::orchestration::execute-function-graph', + 'input', p_input, + 'version', NULL, + 'parent_instance', NULL, + 'parent_id', NULL, + 'execution_id', 1))::text; + + EXECUTE pg_catalog.format('SELECT %I.enqueue_orchestrator_work($1, $2, $3)', sch) + USING p_instance_id, work_item, pg_catalog.now(); +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) FROM PUBLIC; + +CREATE FUNCTION df._enqueue_orchestrator_cancel(p_instance_id text, p_reason text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + owner_oid oid; +BEGIN + SELECT i.submitted_by::oid INTO owner_oid FROM df.instances i WHERE i.id = p_instance_id; + IF owner_oid IS NULL OR NOT pg_catalog.pg_has_role(session_user, owner_oid, 'MEMBER') THEN + RAISE EXCEPTION 'pg_durable: not authorized to cancel instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + EXECUTE pg_catalog.format('SELECT %I.enqueue_orchestrator_work($1, $2, $3)', sch) + USING p_instance_id, + pg_catalog.json_build_object('CancelInstance', + pg_catalog.json_build_object('instance', p_instance_id, 'reason', p_reason))::text, + pg_catalog.now(); +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) FROM PUBLIC; + +CREATE FUNCTION df._enqueue_orchestrator_signal(p_instance_id text, p_name text, p_data text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + owner_oid oid; + root_exists boolean; +BEGIN + SELECT i.submitted_by::oid INTO owner_oid FROM df.instances i WHERE i.id = p_instance_id; + IF owner_oid IS NULL OR NOT pg_catalog.pg_has_role(session_user, owner_oid, 'MEMBER') THEN + RAISE EXCEPTION 'pg_durable: not authorized to signal instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + -- Duroxide does not buffer external events until an orchestration has a + -- pending subscription. If the root runtime row is not materialized yet, a + -- signal would be accepted but dropped before the workflow can observe it. + EXECUTE pg_catalog.format( + 'SELECT EXISTS (SELECT 1 FROM %I.instances WHERE instance_id = $1)', sch) + INTO root_exists + USING p_instance_id; + IF NOT root_exists THEN + RAISE EXCEPTION 'pg_durable: instance % is not ready to receive signals', p_instance_id + USING ERRCODE = 'object_not_in_prerequisite_state'; + END IF; + + -- Raise the event for the target instance and every RUNNING descendant + -- (a sub-orchestration — JOIN/RACE branch or loop generation — may be the one + -- waiting on the signal), mirroring the out-of-band fan-out. %1$I = schema. + EXECUTE pg_catalog.format( + 'INSERT INTO %1$I.orchestrator_queue (instance_id, work_item, visible_at, created_at) ' + 'SELECT t.instance_id, ' + ' pg_catalog.json_build_object(''ExternalRaised'', ' + ' pg_catalog.json_build_object(''instance'', t.instance_id, ''name'', $2, ''data'', $3))::text, ' + ' pg_catalog.now(), pg_catalog.now() ' + 'FROM ( ' + ' WITH RECURSIVE tree AS ( ' + ' SELECT i.instance_id, i.current_execution_id, true AS is_root ' + ' FROM %1$I.instances i WHERE i.instance_id = $1 ' + ' UNION ' + ' SELECT c.instance_id, c.current_execution_id, false ' + ' FROM %1$I.instances c JOIN tree p ON c.parent_instance_id = p.instance_id ' + ' ) ' + ' SELECT tr.instance_id ' + ' FROM tree tr ' + ' LEFT JOIN %1$I.executions e ' + ' ON e.instance_id = tr.instance_id AND e.execution_id = tr.current_execution_id ' + ' WHERE tr.is_root OR pg_catalog.lower(COALESCE(e.status, '''')) = ''running'' ' + ') t', + sch) + USING p_instance_id, p_name, p_data; +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) FROM PUBLIC; + +-- Backfill wrapper EXECUTE to roles that already had df usage before ALTER +-- EXTENSION UPDATE. New calls to df.grant_usage() grant these wrappers via the +-- function body above, but existing users would otherwise lose df.start() / +-- df.cancel() / df.signal() when the new .so chooses the atomic path. +DO $$ +DECLARE + r RECORD; + grant_opt TEXT; +BEGIN + FOR r IN + SELECT + pg_catalog.quote_ident(pg_catalog.pg_get_userbyid(a.grantee)) AS grantee, + pg_catalog.bool_or(a.is_grantable) AS with_grant_option + FROM pg_catalog.pg_namespace n + CROSS JOIN LATERAL pg_catalog.aclexplode(n.nspacl) AS a + WHERE n.nspname OPERATOR(pg_catalog.=) 'df' + AND a.privilege_type OPERATOR(pg_catalog.=) 'USAGE' + AND a.grantee OPERATOR(pg_catalog.<>) 0 -- skip PUBLIC + GROUP BY a.grantee + LOOP + grant_opt := CASE WHEN r.with_grant_option THEN ' WITH GRANT OPTION' ELSE '' END; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) TO %s', r.grantee) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) TO %s', r.grantee) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) TO %s', r.grantee) OPERATOR(pg_catalog.||) grant_opt; + END LOOP; +END $$; + CREATE FUNCTION df.reconcile(p_grace_seconds integer DEFAULT 60) RETURNS TABLE(duroxide_orphans_deleted bigint, stuck_instances_failed bigint) LANGUAGE plpgsql @@ -243,7 +473,7 @@ BEGIN END; -- 2) df.instances stuck non-terminal with no live duroxide instance and no - -- queued start (lost enqueue) -> mark failed. The duroxide queue row + -- queued start (lost enqueue) → mark failed. The duroxide queue row -- persists (locked) until ack, and the instance row is created at ack, so -- a healthy in-flight start always matches one of the NOT EXISTS guards -- and is never failed here. Best-effort; wrapped like step 1. @@ -406,7 +636,7 @@ CREATE FUNCTION df."instance_nodes"( "result" TEXT, /* core::option::Option */ "updated_at" timestamp with time zone /* core::option::Option */ ) -STRICT +STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'instance_nodes_wrapper'; @@ -426,6 +656,6 @@ CREATE FUNCTION df."instance_nodes"( "inferred_status_from_ancestor_id" TEXT, /* core::option::Option */ "updated_at" timestamp with time zone /* core::option::Option */ ) -STRICT +STRICT LANGUAGE c /* Rust */ AS 'MODULE_PATHNAME', 'instance_nodes_v2_wrapper'; diff --git a/src/client.rs b/src/client.rs index 01ae597..f11f5b9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -36,7 +36,7 @@ thread_local! { /// row, or has a `schema_version` below `WORKER_SCHEMA_VERSION`. This is a fast /// SPI read called once per session on the first call to any `df.*` function /// that needs the duroxide client. -fn is_worker_ready() -> bool { +pub(crate) fn is_worker_ready() -> bool { let schema = backend_duroxide_schema(); // First check if the readiness table exists via the catalogue. Querying @@ -191,7 +191,10 @@ async fn list_running_descendants(client: &Client, root_instance_id: &str) -> Ve descendants } -/// Start a durable function via the shared PostgreSQL store. +/// Start a durable function via the duroxide client (out-of-band, on the cached +/// pool). This is the provider-agnostic fallback used by df.start() when the +/// duroxide-pg SQL enqueue surface is not available; it is NOT atomic with the +/// caller's transaction. pub fn start_durable_function( function_name: &str, instance_id: &str, diff --git a/src/dsl.rs b/src/dsl.rs index fbbbbb8..dd30107 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -11,7 +11,6 @@ use std::str::FromStr; use std::cell::RefCell; use std::time::Instant; -use crate::client::start_durable_function; use crate::types::{ mark_non_future_helper_call, short_id, validate_result_name, Durofut, FunctionInput, }; @@ -624,9 +623,32 @@ pub fn signal(instance_id: &str, signal_name: &str, signal_data: default!(&str, pgrx::error!("Instance not found or access denied: {}", instance_id); } - match raise_external_event(instance_id, signal_name, &signal_data) { - Ok(_) => "OK".to_string(), - Err(e) => pgrx::error!("Failed to send signal: {}", e), + // Enqueue the ExternalRaised work item. Prefer the in-transaction SPI path + // (atomic with the caller's transaction; fans out to running descendants in + // the wrapper) when the duroxide-pg SQL surface is present; otherwise fall + // back to the out-of-band client path. + let schema = crate::types::backend_duroxide_schema(); + if in_tx_enqueue_supported(schema) { + if let Err(e) = Spi::run_with_args( + "SELECT df._enqueue_orchestrator_signal($1, $2, $3)", + &[ + instance_id.into(), + signal_name.into(), + signal_data.as_str().into(), + ], + ) { + pgrx::error!("Failed to send signal: {:?}", e); + } + "OK".to_string() + } else { + pgrx::log!( + "pg_durable: df.signal() is using the non-atomic fallback enqueue \ + (duroxide-pg SQL surface not detected)" + ); + match raise_external_event(instance_id, signal_name, &signal_data) { + Ok(_) => "OK".to_string(), + Err(e) => pgrx::error!("Failed to send signal: {}", e), + } } } @@ -634,6 +656,41 @@ pub fn signal(instance_id: &str, signal_name: &str, signal_data: default!(&str, // Orchestration Control Functions // ============================================================================ +/// True when both required pieces of the in-transaction enqueue path exist: +/// the duroxide-pg SQL enqueue surface in the provider schema and the df-schema +/// SECURITY DEFINER wrappers created by the extension/upgrade script. When +/// false (a non-pg provider, or a schema that predates the wrappers), callers +/// fall back to the out-of-band client path. The catalog probe never raises, so +/// it is safe in a backend session even when the schema is absent. +fn in_tx_enqueue_supported(schema: &str) -> bool { + Spi::get_one_with_args::( + "SELECT \ + EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = $1 \ + AND p.proname = 'enqueue_orchestrator_work') \ + AND EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' \ + AND p.proname = '_enqueue_orchestrator_start' \ + AND p.pronargs = 3) \ + AND EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' \ + AND p.proname = '_enqueue_orchestrator_cancel' \ + AND p.pronargs = 2) \ + AND EXISTS(SELECT 1 FROM pg_catalog.pg_proc p \ + JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace \ + WHERE n.nspname = 'df' \ + AND p.proname = '_enqueue_orchestrator_signal' \ + AND p.pronargs = 3)", + &[schema.into()], + ) + .ok() + .flatten() + .unwrap_or(false) +} + /// Maximum number of attempts to generate a collision-free random ID before /// giving up. The 8-hex ID space (`short_id`) makes collisions rare, so a small /// bound is plenty; exhausting it signals either an astronomically unlucky run @@ -1051,24 +1108,70 @@ pub fn start( vars }); - // Start the orchestration via duroxide + // Start the orchestration durably. let input = FunctionInput { instance_id: instance_id.clone(), label: label.map(|s| s.to_string()), vars, loop_iteration: 0, }; - let input_json = serde_json::to_string(&input).unwrap_or(instance_id.clone()); + let input_json = serde_json::to_string(&input).unwrap_or_else(|_| instance_id.clone()); + + // Prefer the atomic in-transaction enqueue (Option 3) when the duroxide-pg + // SQL provider is present; otherwise fall back to the provider-agnostic + // out-of-band client path (legacy behavior). + let schema = crate::types::backend_duroxide_schema(); + if in_tx_enqueue_supported(schema) { + // Option 3 (atomic start): enqueue the StartOrchestration work item via + // SPI inside the CALLER'S transaction, so the df.nodes/df.instances + // INSERTs above and this orchestrator-queue INSERT commit or roll back + // together. A rolled-back df.start() leaves no duroxide orphan, and the + // worker only observes the queue row after the df.* rows are visible + // (which also removes the load-function-graph "wait for commit" race). + // + // Fail clearly (and atomically — the df.* INSERTs roll back) if the + // worker has not yet initialised the duroxide schema. + if !crate::client::is_worker_ready() { + pgrx::error!( + "pg_durable background worker not yet initialized — try again in a moment" + ); + } - if let Err(e) = start_durable_function( - crate::orchestrations::execute_function_graph::NAME, - &instance_id, - &input_json, - ) { + // The SECURITY DEFINER wrapper builds the StartOrchestration work item + // server-side from these trusted arguments (the caller cannot choose the + // work-item variant or target a foreign instance) and performs the + // privileged orchestrator-queue INSERT on the caller's transaction. + // Raising on failure aborts the whole df.start atomically. + if let Err(e) = Spi::run_with_args( + "SELECT df._enqueue_orchestrator_start($1, $2, $3)", + &[ + instance_id.as_str().into(), + crate::orchestrations::execute_function_graph::NAME.into(), + input_json.as_str().into(), + ], + ) { + pgrx::error!("Failed to enqueue durable function start: {:?}", e); + } + } else { + // Fallback: no duroxide-pg SQL enqueue surface detected (a non-pg + // provider, or a schema predating the wrapper). Enqueue out-of-band via + // the duroxide client. NOTE: this path is NOT atomic with the caller's + // transaction — a rollback will NOT undo the start. Warn so the + // non-atomic semantics are observable. pgrx::log!( - "pg_durable: Warning - failed to start durable function: {}", - e + "pg_durable: df.start() is using the non-atomic fallback enqueue \ + (duroxide-pg SQL surface not detected); a rollback will not undo this start" ); + if let Err(e) = crate::client::start_durable_function( + crate::orchestrations::execute_function_graph::NAME, + &instance_id, + &input_json, + ) { + pgrx::log!( + "pg_durable: Warning - failed to start durable function: {}", + e + ); + } } instance_id @@ -1092,8 +1195,26 @@ pub fn cancel(instance_id: &str, reason: default!(&str, "'Cancelled by user'")) pgrx::error!("Instance not found or access denied: {}", instance_id); } - if let Err(e) = cancel_durable_function(instance_id, reason) { - return format!("Failed to cancel: {e}"); + // Enqueue the CancelInstance work item and flip the status mirror together. + // On the in-transaction path the enqueue and the status UPDATE commit + // atomically (no df.* / duroxide divergence on cancel); otherwise fall back + // to the out-of-band client path. + let schema = crate::types::backend_duroxide_schema(); + if in_tx_enqueue_supported(schema) { + if let Err(e) = Spi::run_with_args( + "SELECT df._enqueue_orchestrator_cancel($1, $2)", + &[instance_id.into(), reason.into()], + ) { + pgrx::error!("Failed to cancel: {:?}", e); + } + } else { + pgrx::log!( + "pg_durable: df.cancel() is using the non-atomic fallback enqueue \ + (duroxide-pg SQL surface not detected)" + ); + if let Err(e) = cancel_durable_function(instance_id, reason) { + return format!("Failed to cancel: {e}"); + } } // Update the instance status to 'cancelled' via SPI only when the instance is not diff --git a/src/lib.rs b/src/lib.rs index c3aad99..7b35905 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,7 +29,7 @@ pub static EXECUTION_ACQUIRE_TIMEOUT: GucSetting = GucSetting::::new(3 /// functions are explicitly desired. See docs/superuser_guc.md. pub static ENABLE_SUPERUSER_INSTANCES: GucSetting = GucSetting::::new(false); -/// Cron schedule for the built-in durable reconciler. +/// Cron schedule for the built-in durable reconciler (Option 4 / df.reconcile()). /// The background worker ensures one reconciler instance per cluster, running on /// this schedule. Set to an empty string to disable the built-in reconciler. pub static RECONCILER_CRON: GucSetting> = @@ -448,6 +448,14 @@ BEGIN EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df.metrics() TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; END IF; + -- In-transaction enqueue wrappers — SECURITY DEFINER, revoked from PUBLIC at + -- install. Granted unconditionally to every df user because df.start() / + -- df.cancel() / df.signal() call them via SPI as the calling role; their own + -- internal authorization checks gate access to other users' instances. + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + -- Table privileges EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -497,6 +505,24 @@ BEGIN NULL; END; + -- In-transaction enqueue wrappers (granted unconditionally by grant_usage()). + -- A delegated admin may not be the grantor of these; skip if so. + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + BEGIN + EXECUTE pg_catalog.format('REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) FROM %I CASCADE', p_role); + EXCEPTION WHEN insufficient_privilege THEN + NULL; + END; + -- Table privileges. -- Column-level revokes must match the column-level grants from grant_usage(). EXECUTE pg_catalog.format('REVOKE SELECT, INSERT, UPDATE, DELETE ON df.vars FROM %I CASCADE', p_role); @@ -550,16 +576,240 @@ REVOKE EXECUTE ON FUNCTION df.revoke_usage(text) FROM PUBLIC; ); // ============================================================================ -// Reconciler — repair residual df.* / duroxide divergence +// Atomic start enqueue (Option 3) — SECURITY DEFINER wrapper +// ============================================================================ +// +// df.start() enqueues the StartOrchestration work item by calling this wrapper +// via SPI, inside the caller's transaction. The duroxide orchestrator queue +// grants INSERT to its owner only, so the privileged INSERT must run as a +// definer that owns the duroxide tables; the call still commits/rolls back as +// part of the caller's transaction, giving an atomic df.start(). +// +// PROVIDER REQUIREMENT: this path only works with the duroxide-pg (SQL-backed) +// provider, because it calls the provider's `enqueue_orchestrator_work` SQL +// function directly. df.start() probes for that function and falls back to the +// provider-agnostic out-of-band client path when it is absent (see dsl::start). +// +// The schema is resolved dynamically via df.duroxide_schema() ('_duroxide' on +// fresh installs, legacy 'duroxide' on upgraded ones); %I quotes the identifier. +// The function is created by the worker's migrations (not this extension); +// plpgsql resolves it at call time. +// +// SECURITY: this wrapper is SECURITY DEFINER and granted to every df user via +// df.grant_usage() (df.start() runs SECURITY INVOKER and calls it via SPI). To +// avoid handing users an arbitrary-work-item enqueue primitive (which would let +// a caller forge CancelInstance/ExternalRaised/etc. against another user's +// instance), it (1) constructs the StartOrchestration work item server-side from +// the caller's arguments — the caller cannot choose the variant or a foreign +// target — and (2) authorizes the enqueue only for a brand-new, not-yet-started +// instance (pending df.instances row, no queue entry, no duroxide instance), +// which under atomic-start semantics is reachable only for the row df.start just +// inserted in the current transaction. Hardening direction (move the entrypoint +// into duroxide-pg, owned by the schema owner) is in docs/spec-atomic-start.md. +extension_sql!( + r#" +CREATE FUNCTION df._enqueue_orchestrator_start( + p_instance_id text, + p_orchestration text, + p_input text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + work_item text; + v_blocked boolean; +BEGIN + -- This wrapper is not a generic privileged "start any orchestration" entry + -- point. df.start() passes the root graph-executor name and FunctionInput + -- JSON; reject anything else so a caller cannot use the SECURITY DEFINER + -- privilege to enqueue an internal sub-orchestration with crafted input. + IF p_orchestration OPERATOR(pg_catalog.<>) 'pg_durable::orchestration::execute-function-graph' THEN + RAISE EXCEPTION 'pg_durable: invalid start orchestration %', p_orchestration + USING ERRCODE = 'invalid_parameter_value'; + END IF; + + IF (p_input::jsonb ->> 'instance_id') IS DISTINCT FROM p_instance_id THEN + RAISE EXCEPTION 'pg_durable: start input instance_id does not match %', p_instance_id + USING ERRCODE = 'invalid_parameter_value'; + END IF; + + -- Authorization. This runs as the (privileged) definer, so it must not + -- trust the caller to only target their own instance. Permit the enqueue + -- only for the transaction that inserted a brand-new, not-yet-started + -- instance: a 'pending' df.instances row with no orchestrator-queue entry, + -- no duroxide instance, and an in-progress xmin visible to this transaction. + -- This preserves SECURITY DEFINER / SET ROLE df.start() semantics while + -- blocking a caller from starting another user's previously-committed + -- pending row. Checking pg_xact_status(xmin) rather than equality to + -- pg_current_xact_id() keeps PL/pgSQL exception subtransactions working. + -- The wrapper is safe because it also fixes the orchestration to the root + -- graph executor and validates the input instance id, so callers cannot + -- start internal orchestrations or target someone else's already-started + -- instance. + EXECUTE pg_catalog.format( + 'SELECT NOT EXISTS (SELECT 1 FROM df.instances i ' + ' WHERE i.id = $1 ' + ' AND i.status = ''pending'' ' + ' AND pg_catalog.pg_xact_status(i.xmin::text::xid8) = ''in progress'') ' + ' OR EXISTS (SELECT 1 FROM %I.orchestrator_queue q WHERE q.instance_id = $1) ' + ' OR EXISTS (SELECT 1 FROM %I.instances d WHERE d.instance_id = $1)', + sch, sch) + INTO v_blocked + USING p_instance_id; + + IF v_blocked THEN + RAISE EXCEPTION 'pg_durable: not authorized to enqueue a start for instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + -- Build the StartOrchestration work item server-side so the caller cannot + -- choose the work-item variant (no CancelInstance/ExternalRaised/etc.) or + -- target a different instance. Mirrors duroxide's WorkItem::StartOrchestration. + work_item := pg_catalog.json_build_object( + 'StartOrchestration', pg_catalog.json_build_object( + 'instance', p_instance_id, + 'orchestration', 'pg_durable::orchestration::execute-function-graph', + 'input', p_input, + 'version', NULL, + 'parent_instance', NULL, + 'parent_id', NULL, + 'execution_id', 1))::text; + + EXECUTE pg_catalog.format('SELECT %I.enqueue_orchestrator_work($1, $2, $3)', sch) + USING p_instance_id, work_item, pg_catalog.now(); +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_start(text, text, text) FROM PUBLIC; +"#, + name = "atomic_start_enqueue", + requires = ["create_tables"] +); + +// ============================================================================ +// In-transaction signal / cancel enqueue (Part 1, extended) — SECURITY DEFINER +// ============================================================================ +// +// df.signal()/df.cancel() enqueue their work items (ExternalRaised / +// CancelInstance) via SPI through these wrappers, inside the caller's +// transaction, instead of out-of-band on the duroxide client pool. Both target +// an already-committed instance, so the start wrapper's "brand-new instance" +// guard does not apply; instead they authorize on ownership. +// +// AUTHORIZATION: these are SECURITY DEFINER (the orchestrator queue is +// owner-only) and granted to every df user via df.grant_usage(), so they must +// not let a caller signal/cancel a foreign instance. current_user is the definer +// here, so ownership is checked against session_user (the unforgeable +// authenticated role) plus membership in the instance's submitted_by — i.e. the +// caller's session must be able to act as the instance owner. The work items are +// built server-side from the arguments (no opaque caller-supplied work item). +extension_sql!( + r#" +CREATE FUNCTION df._enqueue_orchestrator_cancel(p_instance_id text, p_reason text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + owner_oid oid; +BEGIN + SELECT i.submitted_by::oid INTO owner_oid FROM df.instances i WHERE i.id = p_instance_id; + IF owner_oid IS NULL OR NOT pg_catalog.pg_has_role(session_user, owner_oid, 'MEMBER') THEN + RAISE EXCEPTION 'pg_durable: not authorized to cancel instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + EXECUTE pg_catalog.format('SELECT %I.enqueue_orchestrator_work($1, $2, $3)', sch) + USING p_instance_id, + pg_catalog.json_build_object('CancelInstance', + pg_catalog.json_build_object('instance', p_instance_id, 'reason', p_reason))::text, + pg_catalog.now(); +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_cancel(text, text) FROM PUBLIC; + +CREATE FUNCTION df._enqueue_orchestrator_signal(p_instance_id text, p_name text, p_data text) +RETURNS void +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + owner_oid oid; + root_exists boolean; +BEGIN + SELECT i.submitted_by::oid INTO owner_oid FROM df.instances i WHERE i.id = p_instance_id; + IF owner_oid IS NULL OR NOT pg_catalog.pg_has_role(session_user, owner_oid, 'MEMBER') THEN + RAISE EXCEPTION 'pg_durable: not authorized to signal instance %', p_instance_id + USING ERRCODE = 'insufficient_privilege'; + END IF; + + -- Duroxide does not buffer external events until an orchestration has a + -- pending subscription. If the root runtime row is not materialized yet, a + -- signal would be accepted but dropped before the workflow can observe it. + EXECUTE pg_catalog.format( + 'SELECT EXISTS (SELECT 1 FROM %I.instances WHERE instance_id = $1)', sch) + INTO root_exists + USING p_instance_id; + IF NOT root_exists THEN + RAISE EXCEPTION 'pg_durable: instance % is not ready to receive signals', p_instance_id + USING ERRCODE = 'object_not_in_prerequisite_state'; + END IF; + + -- Raise the event for the target instance and every RUNNING descendant + -- (a sub-orchestration — JOIN/RACE branch or loop generation — may be the one + -- waiting on the signal), mirroring the out-of-band fan-out. %1$I = schema. + EXECUTE pg_catalog.format( + 'INSERT INTO %1$I.orchestrator_queue (instance_id, work_item, visible_at, created_at) ' + 'SELECT t.instance_id, ' + ' pg_catalog.json_build_object(''ExternalRaised'', ' + ' pg_catalog.json_build_object(''instance'', t.instance_id, ''name'', $2, ''data'', $3))::text, ' + ' pg_catalog.now(), pg_catalog.now() ' + 'FROM ( ' + ' WITH RECURSIVE tree AS ( ' + ' SELECT i.instance_id, i.current_execution_id, true AS is_root ' + ' FROM %1$I.instances i WHERE i.instance_id = $1 ' + ' UNION ' + ' SELECT c.instance_id, c.current_execution_id, false ' + ' FROM %1$I.instances c JOIN tree p ON c.parent_instance_id = p.instance_id ' + ' ) ' + ' SELECT tr.instance_id ' + ' FROM tree tr ' + ' LEFT JOIN %1$I.executions e ' + ' ON e.instance_id = tr.instance_id AND e.execution_id = tr.current_execution_id ' + ' WHERE tr.is_root OR pg_catalog.lower(COALESCE(e.status, '''')) = ''running'' ' + ') t', + sch) + USING p_instance_id, p_name, p_data; +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df._enqueue_orchestrator_signal(text, text, text) FROM PUBLIC; +"#, + name = "atomic_signal_cancel_enqueue", + requires = ["create_tables"] +); + +// ============================================================================ +// Reconciler (Option 4) — repair residual df.* / duroxide divergence // ============================================================================ // -// Some divergence can arise from legacy non-atomic starts, fallback paths, or -// crashes mid-execution. df.reconcile() is a best-effort garbage collector for -// that residue. It is admin-only (EXECUTE revoked from PUBLIC) and SECURITY -// DEFINER so it can touch the duroxide-owned tables and all users' df.instances. +// Atomic df.start()/df.signal()/df.cancel() commit their duroxide enqueue with +// the caller's transaction, but some divergence can still arise: legacy orphans, +// the non-atomic fallback path, and crashes mid-execution. df.reconcile() is a +// best-effort garbage collector for that residue. It is +// admin-only (EXECUTE revoked from PUBLIC) and SECURITY DEFINER so it can touch +// the duroxide-owned tables and all users' df.instances. // -// The background worker keeps one reconciler instance running automatically, -// dogfooding pg_durable as a durable cron loop: +// The background worker keeps one reconciler instance running automatically +// (worker::ensure_reconciler), dogfooding pg_durable as a durable cron loop: // df.start(df.loop(df.seq('SELECT * FROM df.reconcile()', // df.wait_for_schedule())), // 'df_reconciler') @@ -625,7 +875,7 @@ BEGIN END; -- 2) df.instances stuck non-terminal with no live duroxide instance and no - -- queued start (lost enqueue) -> mark failed. The duroxide queue row + -- queued start (lost enqueue) → mark failed. The duroxide queue row -- persists (locked) until ack, and the instance row is created at ack, so -- a healthy in-flight start always matches one of the NOT EXISTS guards -- and is never failed here. Best-effort; wrapped like step 1. diff --git a/src/types.rs b/src/types.rs index 9affd2a..25395cb 100644 --- a/src/types.rs +++ b/src/types.rs @@ -365,25 +365,29 @@ pub fn worker_provider_config( config } -/// Calculate the duration until the next cron schedule match -pub fn calculate_cron_wait(cron_expr: &str) -> Result { +/// Calculate the duration until the next cron schedule match, relative to a +/// caller-supplied `now`. Used by the WAIT_SCHEDULE node executor with the +/// orchestration's deterministic clock so a looped `df.wait_for_schedule` waits +/// until the *next* tick each generation instead of replaying a fixed offset. +pub fn calculate_cron_wait_from(cron_expr: &str, now: DateTime) -> Result { let cron_with_seconds = format!("0 {cron_expr}"); let schedule = CronSchedule::from_str(&cron_with_seconds) .map_err(|e| format!("Invalid cron expression '{cron_expr}': {e}"))?; - let now: DateTime = Utc::now(); - let next = schedule - .upcoming(Utc) + .after(&now) .next() .ok_or_else(|| "No upcoming schedule found".to_string())?; - let duration = (next - now) + (next - now) .to_std() - .map_err(|_| "Failed to calculate wait duration".to_string())?; + .map_err(|_| "Failed to calculate wait duration".to_string()) +} - Ok(duration) +/// Calculate the duration until the next cron schedule match +pub fn calculate_cron_wait(cron_expr: &str) -> Result { + calculate_cron_wait_from(cron_expr, Utc::now()) } /// Evaluate a condition result to determine if it's truthy. diff --git a/tests/e2e/sql/07_signals.sql b/tests/e2e/sql/07_signals.sql index c093f69..a35f40d 100644 --- a/tests/e2e/sql/07_signals.sql +++ b/tests/e2e/sql/07_signals.sql @@ -15,6 +15,53 @@ CREATE TABLE signal_test_log ( created_at TIMESTAMPTZ DEFAULT now() ); +RESET SESSION AUTHORIZATION; + +CREATE OR REPLACE FUNCTION public.signal_test_send_until_terminal( + p_instance_id TEXT, + p_signal_name TEXT, + p_signal_data TEXT, + p_max_attempts INT DEFAULT 100 +) RETURNS VOID +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, public, pg_temp +AS $$ +DECLARE + status TEXT; + err_msg TEXT; + connstr TEXT := format('host=localhost dbname=postgres port=%s user=postgres', current_setting('port')); + attempts INT := 0; +BEGIN + LOOP + SELECT t.status INTO status + FROM dblink(connstr, format('SELECT df.status(%L)', p_instance_id)) + AS t(status TEXT); + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled'); + EXIT WHEN attempts > p_max_attempts; + + BEGIN + PERFORM 1 + FROM dblink( + connstr, + format('SELECT df.signal(%L, %L, %L)', p_instance_id, p_signal_name, p_signal_data) + ) AS t(result TEXT); + EXCEPTION + WHEN OTHERS THEN + GET STACKED DIAGNOSTICS err_msg = MESSAGE_TEXT; + IF err_msg NOT LIKE '%not ready to receive signals%' THEN + RAISE; + END IF; + END; + + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; +END $$; +GRANT EXECUTE ON FUNCTION public.signal_test_send_until_terminal(TEXT, TEXT, TEXT, INT) TO df_e2e_user; + +SET SESSION AUTHORIZATION df_e2e_user; + -- Test 1: Basic Signal Send/Receive CREATE TEMP TABLE _test_signal_basic (instance_id TEXT); @@ -53,18 +100,9 @@ END $$; DO $$ DECLARE inst_id TEXT; - status TEXT; - attempts INT := 0; BEGIN SELECT instance_id INTO inst_id FROM _test_signal_basic; - LOOP - SELECT s INTO status FROM df.status(inst_id) s; - EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled'); - EXIT WHEN attempts > 100; - PERFORM df.signal(inst_id, 'go', '{"value": 42}'); - PERFORM pg_sleep(0.1); - attempts := attempts + 1; - END LOOP; + PERFORM public.signal_test_send_until_terminal(inst_id, 'go', '{"value": 42}'); RAISE NOTICE 'Sent signal to %', inst_id; END $$; @@ -163,20 +201,12 @@ DO $$ DECLARE inst_id TEXT; status TEXT; - attempts INT := 0; BEGIN SELECT instance_id INTO inst_id FROM _test_signal_data; RAISE NOTICE 'Testing signal with data: %', inst_id; -- Retry the signal until consumed; see Test 1 / duroxide #154. - LOOP - SELECT s INTO status FROM df.status(inst_id) s; - EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled'); - EXIT WHEN attempts > 100; - PERFORM df.signal(inst_id, 'approval', '{"approved": true, "approver": "jane@acme.com"}'); - PERFORM pg_sleep(0.1); - attempts := attempts + 1; - END LOOP; + PERFORM public.signal_test_send_until_terminal(inst_id, 'approval', '{"approved": true, "approver": "jane@acme.com"}'); SELECT df.await_instance(inst_id, 10) INTO status; @@ -219,20 +249,12 @@ DO $$ DECLARE inst_id TEXT; status TEXT; - attempts INT := 0; BEGIN SELECT instance_id INTO inst_id FROM _test_signal_text; RAISE NOTICE 'Testing signal with plain text data: %', inst_id; -- Retry the signal until consumed; see Test 1 / duroxide #154. - LOOP - SELECT s INTO status FROM df.status(inst_id) s; - EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled'); - EXIT WHEN attempts > 100; - PERFORM df.signal(inst_id, 'plain_text', 'approve'); - PERFORM pg_sleep(0.1); - attempts := attempts + 1; - END LOOP; + PERFORM public.signal_test_send_until_terminal(inst_id, 'plain_text', 'approve'); SELECT df.await_instance(inst_id, 10) INTO status; @@ -282,18 +304,9 @@ INSERT INTO _test_signal_then_named SELECT df.start( DO $$ DECLARE inst_id TEXT; - status TEXT; - attempts INT := 0; BEGIN SELECT instance_id INTO inst_id FROM _test_signal_then_named; - LOOP - SELECT s INTO status FROM df.status(inst_id) s; - EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled'); - EXIT WHEN attempts > 100; - PERFORM df.signal(inst_id, 'test_approval_then', '{"approved": true}'); - PERFORM pg_sleep(0.1); - attempts := attempts + 1; - END LOOP; + PERFORM public.signal_test_send_until_terminal(inst_id, 'test_approval_then', '{"approved": true}'); END $$; DO $$ @@ -324,4 +337,5 @@ DROP TABLE _test_signal_then_named; DROP TABLE signal_test_log; RESET SESSION AUTHORIZATION; +DROP FUNCTION public.signal_test_send_until_terminal(TEXT, TEXT, TEXT, INT); SELECT 'ALL SIGNAL TESTS PASSED' AS result; diff --git a/tests/e2e/sql/11_cross_connection.sql b/tests/e2e/sql/11_cross_connection.sql index 6347b2e..8e879e3 100644 --- a/tests/e2e/sql/11_cross_connection.sql +++ b/tests/e2e/sql/11_cross_connection.sql @@ -56,15 +56,38 @@ DECLARE inst_id TEXT; connstr TEXT; result TEXT; + status TEXT; + err_msg TEXT; + attempts INT := 0; BEGIN SELECT instance_id INTO inst_id FROM _test_cross_signal; SELECT c.connstr INTO connstr FROM _dblink_conn c; - - SELECT * INTO result FROM dblink( - connstr, - format('SELECT df.signal(%L, ''external_trigger'', ''{"source": "other_connection", "value": 123}'')', inst_id) - ) AS t(result TEXT); - + + LOOP + SELECT * INTO status FROM dblink( + connstr, + format('SELECT df.status(%L)', inst_id) + ) AS t(status TEXT); + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled'); + EXIT WHEN attempts > 100; + + BEGIN + SELECT * INTO result FROM dblink( + connstr, + format('SELECT df.signal(%L, ''external_trigger'', ''{"source": "other_connection", "value": 123}'')', inst_id) + ) AS t(result TEXT); + EXCEPTION + WHEN OTHERS THEN + GET STACKED DIAGNOSTICS err_msg = MESSAGE_TEXT; + IF err_msg NOT LIKE '%not ready to receive signals%' THEN + RAISE; + END IF; + END; + + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + RAISE NOTICE 'Signal sent from other connection: %', result; END $$; diff --git a/tests/e2e/sql/24_atomic_rollback.sql b/tests/e2e/sql/24_atomic_rollback.sql new file mode 100644 index 0000000..d9de3d4 --- /dev/null +++ b/tests/e2e/sql/24_atomic_rollback.sql @@ -0,0 +1,207 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: df.start / df.cancel / df.signal enqueue on the CALLER'S transaction. +-- +-- Regression test for df/_duroxide divergence. Originally the duroxide enqueue +-- happened out-of-band on a separate connection that committed independently, so +-- rolling back the caller's transaction left the runtime store ahead of the +-- control plane: +-- * a rolled-back df.start() left an orphaned orchestration in _duroxide; +-- * a rolled-back df.cancel() still cancelled the (committed) instance; +-- * a rolled-back df.signal() still delivered the signal. +-- +-- With the in-transaction enqueue, a ROLLBACK undoes the runtime work too. Each +-- scenario below FAILS without that change. +-- +-- _duroxide is owned by the worker role, so its tables are read as the superuser +-- (postgres); the durable functions themselves run as the non-superuser +-- df_e2e_user. + +-- Helper (superuser): assert no _duroxide residue remains for an instance id. +-- Resolves the runtime schema dynamically (df.duroxide_schema()). +CREATE OR REPLACE FUNCTION pg_temp.assert_no_duroxide_residue(p_id text) +RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + sch text := df.duroxide_schema(); + n int; +BEGIN + EXECUTE format( + 'SELECT (SELECT count(*) FROM %I.orchestrator_queue WHERE instance_id = $1) ' + ' + (SELECT count(*) FROM %I.instances WHERE instance_id = $1)', + sch, sch) + INTO n USING p_id; + + IF n > 0 THEN + RAISE EXCEPTION 'TEST FAILED: rolled-back df.start left % _duroxide row(s) for instance % (non-atomic enqueue leaked)', n, p_id; + END IF; + RAISE NOTICE 'PASSED [start_rollback]: no _duroxide residue for %', p_id; +END $$; + +-- =========================================================================== +-- Scenario 1: a rolled-back df.start() leaves no _duroxide orphan +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; +BEGIN; +SELECT df.start('SELECT 42', 'atomic-rb-start') AS rb_id \gset +ROLLBACK; +RESET SESSION AUTHORIZATION; + +-- Give any (buggy) out-of-band enqueue a moment to surface before asserting. +SELECT pg_sleep(1); +SELECT pg_temp.assert_no_duroxide_residue(:'rb_id'); + +-- =========================================================================== +-- Scenario 2: a rolled-back df.cancel() does NOT cancel the instance +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; + +CREATE TEMP TABLE _t_cancel (instance_id TEXT); +INSERT INTO _t_cancel +SELECT df.start(df.loop(df.seq('SELECT 1', df.sleep(1))), 'atomic-rb-cancel'); + +-- Wait until the instance is genuinely running. +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_cancel; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'Scenario 2 setup: cancel victim never reached running (status=%)', status; + END IF; +END $$; + +-- Issue a cancel, then roll it back. +BEGIN; +SELECT df.cancel((SELECT instance_id FROM _t_cancel), 'rolled-back-cancel'); +ROLLBACK; + +-- The instance must NOT become cancelled: the cancel enqueue was rolled back. +-- (Without the change the out-of-band CancelInstance committed, so the worker +-- cancels the instance within ~1s.) +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_cancel; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + IF lower(status) = 'cancelled' THEN + RAISE EXCEPTION 'TEST FAILED: rolled-back df.cancel still cancelled instance % (non-atomic enqueue leaked)', inst_id; + END IF; + EXIT WHEN attempts > 50; -- watch for ~5s + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + RAISE NOTICE 'PASSED [cancel_rollback]: instance % still % after a rolled-back cancel', inst_id, status; +END $$; + +-- Cleanup: cancel the (infinite) loop for real. +SELECT df.cancel((SELECT instance_id FROM _t_cancel), 'cleanup'); +DROP TABLE _t_cancel; + +RESET SESSION AUTHORIZATION; + +-- =========================================================================== +-- Scenario 3: a rolled-back df.signal() does NOT deliver the signal +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; + +CREATE TEMP TABLE _t_signal (instance_id TEXT); +INSERT INTO _t_signal +SELECT df.start('SELECT 1' ~> (df.wait_for_signal('go') |=> 'sig') ~> 'SELECT 1', + 'atomic-rb-signal'); + +-- Wait until the instance is running (blocked on the signal). +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_signal; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'Scenario 3 setup: signal victim never reached running (status=%)', status; + END IF; +END $$; + +-- Send a signal, then roll it back. +BEGIN; +SELECT df.signal((SELECT instance_id FROM _t_signal), 'go', '{}'); +ROLLBACK; + +-- The instance must NOT complete: the signal enqueue was rolled back. +-- (Without the change the out-of-band ExternalRaised committed, so the instance +-- receives the signal and completes within ~1s.) +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_signal; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + IF lower(status) = 'completed' THEN + RAISE EXCEPTION 'TEST FAILED: rolled-back df.signal still delivered to instance % (non-atomic enqueue leaked)', inst_id; + END IF; + EXIT WHEN attempts > 50; -- watch for ~5s + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + RAISE NOTICE 'PASSED [signal_rollback]: instance % still % after a rolled-back signal', inst_id, status; +END $$; + +-- Cleanup: deliver the signal for real and let it finish. +SELECT df.signal((SELECT instance_id FROM _t_signal), 'go', '{}'); +DO $$ +DECLARE inst_id TEXT; status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _t_signal; + SELECT df.wait_for_completion(inst_id, 30) INTO status; +END $$; +DROP TABLE _t_signal; + +RESET SESSION AUTHORIZATION; + +-- =========================================================================== +-- Scenario 4: a signal sent before runtime materialization is rejected +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; + +DO $$ +DECLARE inst_id TEXT; +BEGIN + BEGIN + inst_id := df.start( + 'SELECT 1' ~> (df.wait_for_signal('go') |=> 'sig') ~> 'SELECT 1', + 'atomic-signal-before-runtime-ready' + ); + + -- The worker cannot have materialized the root _duroxide.instances row + -- yet (we are still inside the same backend statement/transaction). A + -- signal here would be accepted by the runtime but skipped before the + -- workflow subscribes, so df.signal must fail instead of returning OK. + PERFORM df.signal(inst_id, 'go', '{}'); + RAISE EXCEPTION 'TEST FAILED: df.signal accepted instance % before runtime materialization', inst_id; + EXCEPTION + WHEN object_not_in_prerequisite_state THEN + IF SQLERRM LIKE '%not ready to receive signals%' THEN + RAISE NOTICE 'PASSED [signal_before_runtime_ready_rejected]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: unexpected not-ready signal error: %', SQLERRM; + END IF; + END; +END $$; + +RESET SESSION AUTHORIZATION; + +SELECT 'TEST PASSED: atomic rollback (start/cancel/signal)' AS result; diff --git a/tests/e2e/sql/25_enqueue_wrapper_authz.sql b/tests/e2e/sql/25_enqueue_wrapper_authz.sql new file mode 100644 index 0000000..631179a --- /dev/null +++ b/tests/e2e/sql/25_enqueue_wrapper_authz.sql @@ -0,0 +1,246 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: authorization and hardening on the in-transaction enqueue wrappers. +-- +-- The df._enqueue_orchestrator_* functions are SECURITY DEFINER (the runtime +-- queue is writable by its owner only) and granted to every df user via +-- df.grant_usage(). They must therefore refuse to enqueue work against an +-- instance the caller does not own, and the start wrapper must not become a +-- generic "start any internal orchestration with arbitrary input" primitive. +-- Cancel/signal ownership is checked with pg_has_role(session_user, +-- , 'MEMBER'): session_user cannot be spoofed inside a +-- SECURITY DEFINER function, and membership lets a role that owns the instance +-- through SET ROLE still qualify. +-- +-- Without the change these wrappers do not exist, so the forge attempts below +-- raise undefined_function and the test fails. + +-- Fresh, non-superuser roles. (Superusers bypass pg_has_role, so the denial can +-- only be exercised by a non-superuser caller.) +DO $precleanup$ +DECLARE + inst_id TEXT; +BEGIN + -- A failed/interrupted prior run can leave a loop instance submitted by one + -- of these roles. Cancel it before DROP ROLE so the worker stops reconnecting + -- as that role while this test rebuilds privileges. + FOR inst_id IN + SELECT i.id + FROM df.instances i + JOIN pg_catalog.pg_roles r ON r.oid = i.submitted_by::oid + WHERE r.rolname IN ('authz_owner', 'authz_other') + AND i.status NOT IN ('completed', 'failed', 'cancelled') + LOOP + PERFORM df.cancel(inst_id, 'authz-test-precleanup'); + END LOOP; +END $precleanup$; + +SELECT pg_sleep(0.5); + +DO $setup$ +BEGIN + BEGIN DROP OWNED BY authz_owner; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP OWNED BY authz_other; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE authz_owner; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE authz_other; EXCEPTION WHEN undefined_object THEN NULL; END; +END $setup$; + +CREATE ROLE authz_owner LOGIN; +CREATE ROLE authz_other LOGIN; +SELECT df.grant_usage('authz_owner'); +SELECT df.grant_usage('authz_other'); + +-- The owner starts a long-running instance. +SET SESSION AUTHORIZATION authz_owner; +CREATE TEMP TABLE _t_authz (instance_id TEXT); +INSERT INTO _t_authz +SELECT df.start(df.loop(df.seq('SELECT 1', df.sleep(1))), 'authz-owner-inst'); +RESET SESSION AUTHORIZATION; + +-- Let the non-owner read the instance id (so the forge attempts can target it). +GRANT SELECT ON _t_authz TO authz_other; + +-- Wait until it is running. +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_authz; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'Setup: owner instance never reached running (status=%)', status; + END IF; +END $$; + +-- =========================================================================== +-- A non-owner cannot forge a cancel/signal against the owner's instance. +-- =========================================================================== + +SET SESSION AUTHORIZATION authz_other; +DO $$ +DECLARE inst_id TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _t_authz; + + -- Forge a cancel. + BEGIN + PERFORM df._enqueue_orchestrator_cancel(inst_id, 'forged'); + RAISE EXCEPTION 'TEST FAILED: authz_other was allowed to enqueue a cancel for owner instance %', inst_id; + EXCEPTION + WHEN insufficient_privilege THEN + IF SQLERRM LIKE '%not authorized%' THEN + RAISE NOTICE 'PASSED [cancel_forge_denied]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: cancel denied, but not by the wrapper authorization check: %', SQLERRM; + END IF; + WHEN undefined_function THEN + RAISE EXCEPTION 'TEST FAILED: df._enqueue_orchestrator_cancel is missing (change not present): %', SQLERRM; + END; + + -- Forge a signal. + BEGIN + PERFORM df._enqueue_orchestrator_signal(inst_id, 'go', '{}'); + RAISE EXCEPTION 'TEST FAILED: authz_other was allowed to enqueue a signal for owner instance %', inst_id; + EXCEPTION + WHEN insufficient_privilege THEN + IF SQLERRM LIKE '%not authorized%' THEN + RAISE NOTICE 'PASSED [signal_forge_denied]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: signal denied, but not by the wrapper authorization check: %', SQLERRM; + END IF; + WHEN undefined_function THEN + RAISE EXCEPTION 'TEST FAILED: df._enqueue_orchestrator_signal is missing (change not present): %', SQLERRM; + END; +END $$; +RESET SESSION AUTHORIZATION; + +-- =========================================================================== +-- A non-owner cannot forge a start against another user's pending instance. +-- =========================================================================== + +SET SESSION AUTHORIZATION authz_owner; +DO $$ +DECLARE + inst_id CONSTANT TEXT := 'badc0ffe'; + root_id CONSTANT TEXT := 'cafebabe'; +BEGIN + INSERT INTO df.instances (id, label, root_node, submitted_by, database) + VALUES (inst_id, 'authz-foreign-pending-start', root_id, current_user::regrole, 'postgres'); + + INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) + VALUES (root_id, inst_id, 'SQL', 'SELECT 1', current_user::regrole, 'postgres'); +END $$; +RESET SESSION AUTHORIZATION; + +SET SESSION AUTHORIZATION authz_other; +DO $$ +DECLARE inst_id CONSTANT TEXT := 'badc0ffe'; +BEGIN + BEGIN + PERFORM df._enqueue_orchestrator_start( + inst_id, + 'pg_durable::orchestration::execute-function-graph', + json_build_object('instance_id', inst_id)::text); + RAISE EXCEPTION 'TEST FAILED: authz_other was allowed to enqueue a start for owner pending instance %', inst_id; + EXCEPTION + WHEN insufficient_privilege THEN + IF SQLERRM LIKE '%not authorized%' THEN + RAISE NOTICE 'PASSED [start_forge_denied]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: start denied, but not by the wrapper authorization check: %', SQLERRM; + END IF; + WHEN undefined_function THEN + RAISE EXCEPTION 'TEST FAILED: df._enqueue_orchestrator_start is missing (change not present): %', SQLERRM; + END; +END $$; +RESET SESSION AUTHORIZATION; + +BEGIN; +DELETE FROM df.instances WHERE id = 'badc0ffe'; +DELETE FROM df.nodes WHERE instance_id = 'badc0ffe'; +COMMIT; + +-- =========================================================================== +-- A caller cannot use the start wrapper as a generic privileged entrypoint. +-- =========================================================================== + +SET SESSION AUTHORIZATION authz_other; +DO $$ +DECLARE + inst_id CONSTANT TEXT := 'feedf00d'; + root_id CONSTANT TEXT := 'deadbeef'; +BEGIN + -- Create a legitimate brand-new, pending instance owned by authz_other. The + -- start wrapper's state check should pass for this row; the test verifies + -- the wrapper still rejects a non-root internal orchestration name. + INSERT INTO df.instances (id, label, root_node, submitted_by, database) + VALUES (inst_id, 'authz-start-wrapper-hardening', root_id, current_user::regrole, 'postgres'); + + INSERT INTO df.nodes (id, instance_id, node_type, query, submitted_by, database) + VALUES (root_id, inst_id, 'SQL', 'SELECT 1', current_user::regrole, 'postgres'); + + BEGIN + PERFORM df._enqueue_orchestrator_start( + inst_id, + 'pg_durable::orchestration::execute-subtree', + json_build_object('instance_id', inst_id)::text); + RAISE EXCEPTION 'TEST FAILED: start wrapper accepted a non-root orchestration for instance %', inst_id; + EXCEPTION + WHEN invalid_parameter_value THEN + IF SQLERRM LIKE '%invalid start orchestration%' THEN + RAISE NOTICE 'PASSED [start_wrapper_rejects_internal_orchestration]: %', SQLERRM; + ELSE + RAISE EXCEPTION 'TEST FAILED: start wrapper rejected with an unexpected validation error: %', SQLERRM; + END IF; + WHEN undefined_function THEN + RAISE EXCEPTION 'TEST FAILED: df._enqueue_orchestrator_start is missing (change not present): %', SQLERRM; + END; +END $$; +RESET SESSION AUTHORIZATION; + +BEGIN; +DELETE FROM df.instances WHERE id = 'feedf00d'; +DELETE FROM df.nodes WHERE instance_id = 'feedf00d'; +COMMIT; + +-- =========================================================================== +-- The owner can still cancel its own instance (the authorized path works). +-- =========================================================================== + +SET SESSION AUTHORIZATION authz_owner; +SELECT df.cancel((SELECT instance_id FROM _t_authz), 'owner-cancel'); +RESET SESSION AUTHORIZATION; + +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_authz; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'cancelled' OR attempts > 100; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'cancelled' THEN + RAISE EXCEPTION 'TEST FAILED: owner could not cancel its own instance % (status=%)', inst_id, status; + END IF; + RAISE NOTICE 'PASSED [owner_cancel_allowed]: instance % cancelled by its owner', inst_id; +END $$; + +DROP TABLE _t_authz; + +-- Cleanup roles. +DO $cleanup$ +BEGIN + BEGIN DROP OWNED BY authz_owner; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP OWNED BY authz_other; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE authz_owner; EXCEPTION WHEN undefined_object THEN NULL; END; + BEGIN DROP ROLE authz_other; EXCEPTION WHEN undefined_object THEN NULL; END; +END $cleanup$; + +SELECT 'TEST PASSED: enqueue wrapper authorization' AS result;