From 0360f1c6f252649f42c66cf1a77798670e2ee8dd Mon Sep 17 00:00:00 2001 From: Nanook Date: Thu, 25 Jun 2026 17:47:12 +0000 Subject: [PATCH] feat: expose signal wait marker on instances --- docs/spec-security-model.md | 2 +- docs/upgrade-testing.md | 7 ++ sql/pg_durable--0.2.3--0.2.4.sql | 64 ++++++++++++- src/activities/update_instance_status.rs | 43 ++++++++- src/activities/update_node_status.rs | 110 +++++++++++++++++++++++ src/dsl.rs | 45 ++++++++-- src/lib.rs | 9 +- tests/e2e/sql/07_signals.sql | 32 ++++++- tests/e2e/sql/17_superuser_guc.sql | 2 +- 9 files changed, 296 insertions(+), 18 deletions(-) diff --git a/docs/spec-security-model.md b/docs/spec-security-model.md index c496185e..6bc87e63 100644 --- a/docs/spec-security-model.md +++ b/docs/spec-security-model.md @@ -184,7 +184,7 @@ SELECT df.start( **Threat**: User queries `df.instances` or `df.nodes` to see other users' durable functions. -**Mitigation (implemented)**: RLS policies on `df.instances` and `df.nodes` enforce per-user visibility using `submitted_by = current_user::regrole`. Auto-grants provide SELECT+INSERT on both tables and column-level `UPDATE (status, updated_at)` on instances (no DELETE). Ownership checks in `df.cancel()` and `df.signal()` prevent cross-user operations via the duroxide client. Monitoring functions (`df.list_instances()`, `df.instance_info()`, etc.) also enforce ownership. +**Mitigation (implemented)**: RLS policies on `df.instances` and `df.nodes` enforce per-user visibility using `submitted_by = current_user::regrole`. Auto-grants provide SELECT+INSERT on both tables and column-level `UPDATE` on runtime status columns for instances (no DELETE). Ownership checks in `df.cancel()` and `df.signal()` prevent cross-user operations via the duroxide client. Monitoring functions (`df.list_instances()`, `df.instance_info()`, etc.) also enforce ownership. See [rls.md](rls.md) for the full design, policy definitions, grant strategy, and decisions. diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 3008e633..1768171b 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -205,6 +205,13 @@ what the upgrade script handles, and any backward compatibility considerations. ### v0.2.3 → v0.2.4 +#### #239 Expose signal waits on `df.instances` +- **DDL change (df schema):** Adds nullable `df.instances.blocked_on_signal TEXT`. Fresh installs (`src/lib.rs`) create it directly; the upgrade script adds the column and its comment. Existing rows need no backfill: non-waiting instances remain `NULL`, and future SIGNAL node execution sets the marker when the node enters `running`. +- **Runtime behavior:** The `update-node-status` activity recomputes `blocked_on_signal` from the instance's currently running SIGNAL nodes whenever a SIGNAL node changes status. `update-instance-status` clears the column on terminal transitions (`completed`, `failed`, `cancelled`), and `df.cancel()` clears it with the cancellation status update. +- **Privilege behavior:** `df.grant_usage()` now grants `UPDATE (status, updated_at, blocked_on_signal)` on `df.instances`; `df.revoke_usage()` revokes the same column set. During upgrade, roles that already had column-level UPDATE on `status` or `updated_at` receive UPDATE on `blocked_on_signal`, preserving `WITH GRANT OPTION` only when all existing relevant runtime UPDATE grants were grantable. +- **Scenario B1 considerations:** New Rust code probes `pg_catalog.pg_attribute` before referencing `blocked_on_signal`, so the new `.so` remains valid against pre-upgrade schemas that do not yet have the column. `df.cancel()` similarly falls back to the old two-column UPDATE until the column exists. +- **Scenario B2 considerations:** The migration is additive and nullable. Existing instances, nodes, and vars are untouched; active SIGNAL nodes set the marker on their next node-status transition after the new schema is present. + #### Simplify `df.grant_usage()` — drop the explicit function allowlist - **DDL change (df schema):** `df.grant_usage()` no longer loops over a hard-coded `func_sigs` array issuing `GRANT EXECUTE` per function. Fresh installs (`src/lib.rs`) and the upgrade script (`sql/pg_durable--0.2.3--0.2.4.sql`) both `CREATE OR REPLACE` the function with a body that grants `USAGE ON SCHEMA df` plus the table privileges, and conditionally grants `df.http()` / the admin helpers. The signature `df.grant_usage(text, boolean, boolean)` is unchanged. - **DDL change (df schema):** `df.revoke_usage()` is made symmetric with the new `grant_usage()`. It no longer loops over every `df.*` function in `pg_proc` issuing `REVOKE EXECUTE` (which, post-simplification, only produced "no privileges could be revoked" warnings since ordinary functions are never granted per-function EXECUTE). The new body revokes only what `grant_usage()` grants: schema `USAGE`, EXECUTE on the sensitive functions (`df.http`, `df.grant_usage`, `df.revoke_usage`), and the table privileges. The signature `df.revoke_usage(text)` is unchanged. diff --git a/sql/pg_durable--0.2.3--0.2.4.sql b/sql/pg_durable--0.2.3--0.2.4.sql index 93380f42..36e01350 100644 --- a/sql/pg_durable--0.2.3--0.2.4.sql +++ b/sql/pg_durable--0.2.3--0.2.4.sql @@ -6,6 +6,66 @@ -- See docs/upgrade-testing.md for the upgrade-script and backward-compatibility -- requirements (Scenario A / B1 / B2). +-- ============================================================================ +-- Expose signal waits at the instance level (issue #239). +-- +-- `df.instances.blocked_on_signal` is set to the signal name while the instance +-- is parked on a SIGNAL node, and cleared when no SIGNAL wait remains or the +-- instance reaches a terminal state. Backfill is unnecessary: existing terminal +-- rows remain NULL, and newly executing SIGNAL nodes set the column when their +-- node status transitions to `running`. +-- +-- Preserve existing delegated permissions by granting UPDATE on the new column +-- to every role that already had UPDATE on df.instances.status or updated_at. +-- ============================================================================ +ALTER TABLE df.instances ADD COLUMN blocked_on_signal TEXT; + +COMMENT ON COLUMN df.instances.blocked_on_signal IS + 'Signal name while the instance is parked on a SIGNAL node; NULL otherwise'; + +DO $do$ +DECLARE + r RECORD; +BEGIN + FOR r IN + SELECT role_ident, pg_catalog.bool_and(is_grantable) AS all_grantable + FROM ( + SELECT + CASE + WHEN acl.grantee OPERATOR(pg_catalog.=) 0::oid THEN 'PUBLIC' + ELSE grantee_role.rolname + END AS role_ident, + acl.is_grantable + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid OPERATOR(pg_catalog.=) c.relnamespace + JOIN pg_catalog.pg_attribute a ON a.attrelid OPERATOR(pg_catalog.=) c.oid + CROSS JOIN LATERAL pg_catalog.aclexplode(a.attacl) acl + LEFT JOIN pg_catalog.pg_roles grantee_role ON grantee_role.oid OPERATOR(pg_catalog.=) acl.grantee + WHERE n.nspname OPERATOR(pg_catalog.=) 'df' + AND c.relname OPERATOR(pg_catalog.=) 'instances' + AND a.attname OPERATOR(pg_catalog.=) ANY (ARRAY['status', 'updated_at']) + AND acl.privilege_type OPERATOR(pg_catalog.=) 'UPDATE' + ) grants + WHERE role_ident IS NOT NULL + GROUP BY role_ident + LOOP + IF r.role_ident OPERATOR(pg_catalog.=) 'PUBLIC' THEN + EXECUTE 'GRANT UPDATE (blocked_on_signal) ON df.instances TO PUBLIC'; + ELSIF r.all_grantable THEN + EXECUTE pg_catalog.format( + 'GRANT UPDATE (blocked_on_signal) ON df.instances TO %I WITH GRANT OPTION', + r.role_ident + ); + ELSE + EXECUTE pg_catalog.format( + 'GRANT UPDATE (blocked_on_signal) ON df.instances TO %I', + r.role_ident + ); + END IF; + END LOOP; +END; +$do$; + -- ============================================================================ -- Remove df.debug_connection() (issue #110, reclassified non-security cleanup). -- @@ -87,7 +147,7 @@ BEGIN -- Table privileges EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; - EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at, blocked_on_signal) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -155,7 +215,7 @@ BEGIN EXECUTE pg_catalog.format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role); - EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role); + EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at, blocked_on_signal) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role); -- Schema access — the access gate for all ordinary df.* functions. diff --git a/src/activities/update_instance_status.rs b/src/activities/update_instance_status.rs index c467c872..4531ed5e 100644 --- a/src/activities/update_instance_status.rs +++ b/src/activities/update_instance_status.rs @@ -10,6 +10,26 @@ use std::sync::Arc; /// Activity name for registration and scheduling pub const NAME: &str = "pg_durable::activity::update-instance-status"; +/// New binaries can briefly run against an old extension schema before +/// ALTER EXTENSION UPDATE adds this column. +pub async fn instances_have_blocked_on_signal(pool: &PgPool) -> Result { + sqlx::query_scalar::<_, bool>( + "SELECT EXISTS ( + SELECT 1 + FROM pg_catalog.pg_attribute a + JOIN pg_catalog.pg_class c ON c.oid = a.attrelid + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'df' + AND c.relname = 'instances' + AND a.attname = 'blocked_on_signal' + AND NOT a.attisdropped + )", + ) + .fetch_one(pool) + .await + .map_err(|e| format!("Failed to inspect df.instances columns: {e}")) +} + /// Update the status of an instance in df.instances pub async fn execute( ctx: ActivityContext, @@ -26,10 +46,23 @@ pub async fn execute( "Updating instance {instance_id} status to {status}" )); + let has_blocked_on_signal = instances_have_blocked_on_signal(pool.as_ref()).await?; + // Never overwrite a terminal state ('completed', 'failed', 'cancelled') with any status. // This prevents a race where an in-flight activity (scheduled just before cancel was // processed) tries to flip the status back from 'cancelled' to 'running'/'completed'. - let query = if status == "completed" { + let clears_blocked_on_signal = + has_blocked_on_signal && matches!(status, "completed" | "failed" | "cancelled"); + + let query = if status == "completed" && clears_blocked_on_signal { + sqlx::query( + "UPDATE df.instances + SET status = $1, completed_at = now(), updated_at = now(), blocked_on_signal = NULL + WHERE id = $2 AND status NOT IN ('completed', 'failed', 'cancelled')", + ) + .bind(status) + .bind(instance_id) + } else if status == "completed" { sqlx::query( "UPDATE df.instances SET status = $1, completed_at = now(), updated_at = now() @@ -37,6 +70,14 @@ pub async fn execute( ) .bind(status) .bind(instance_id) + } else if clears_blocked_on_signal { + sqlx::query( + "UPDATE df.instances + SET status = $1, updated_at = now(), blocked_on_signal = NULL + WHERE id = $2 AND status NOT IN ('completed', 'failed', 'cancelled')", + ) + .bind(status) + .bind(instance_id) } else { sqlx::query( "UPDATE df.instances diff --git a/src/activities/update_node_status.rs b/src/activities/update_node_status.rs index ca751429..9f6351c1 100644 --- a/src/activities/update_node_status.rs +++ b/src/activities/update_node_status.rs @@ -5,8 +5,11 @@ use duroxide::ActivityContext; use sqlx::PgPool; +use sqlx::Row; use std::sync::Arc; +use crate::activities::update_instance_status::instances_have_blocked_on_signal; + /// Activity name for registration and scheduling pub const NAME: &str = "pg_durable::activity::update-node-status"; @@ -75,6 +78,11 @@ pub async fn execute( Ok(done) => { let rows = done.rows_affected(); if rows == 1 { + if let Err(e) = sync_instance_signal_wait(pool.as_ref(), instance_id, node_id).await + { + ctx.trace_info(&e); + return Err(e); + } Ok("Node status updated".to_string()) } else { // Exactly one row must match (instance_id, id). Anything else @@ -96,3 +104,105 @@ pub async fn execute( } } } + +async fn sync_instance_signal_wait( + pool: &PgPool, + instance_id: &str, + node_id: &str, +) -> Result<(), String> { + if !instances_have_blocked_on_signal(pool).await? { + return Ok(()); + } + + let row = sqlx::query( + "SELECT node_type + FROM df.nodes + WHERE id = $1 AND instance_id = $2", + ) + .bind(node_id) + .bind(instance_id) + .fetch_optional(pool) + .await + .map_err(|e| format!("Failed to load node for signal wait sync: {e}"))?; + + let Some(row) = row else { + return Err(format!( + "signal wait sync found no node {node_id} in instance {instance_id}" + )); + }; + + let node_type: String = row + .try_get("node_type") + .map_err(|e| format!("Failed to read node_type for signal wait sync: {e}"))?; + if node_type != "SIGNAL" { + return Ok(()); + } + + let mut tx = pool + .begin() + .await + .map_err(|e| format!("Failed to start signal wait sync transaction: {e}"))?; + + let instance_row = sqlx::query( + "SELECT 1 + FROM df.instances + WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled') + FOR UPDATE", + ) + .bind(instance_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| format!("Failed to lock instance for signal wait sync: {e}"))?; + + if instance_row.is_none() { + tx.commit() + .await + .map_err(|e| format!("Failed to finish signal wait sync transaction: {e}"))?; + return Ok(()); + } + + // Parallel branches can have overlapping SIGNAL waits. Recompute from the + // current running SIGNAL nodes instead of blindly clearing this node's name. + let running_signal_query = sqlx::query_scalar::<_, Option>( + "SELECT query + FROM df.nodes + WHERE instance_id = $1 + AND node_type = 'SIGNAL' + AND status = 'running' + ORDER BY updated_at DESC, id + LIMIT 1", + ) + .bind(instance_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| format!("Failed to find running SIGNAL node: {e}"))? + .flatten(); + + let blocked_on_signal = running_signal_query.and_then(|config_str| { + serde_json::from_str::(&config_str) + .ok() + .and_then(|config| { + config + .get("signal_name") + .and_then(|value| value.as_str()) + .map(str::to_string) + }) + }); + + sqlx::query( + "UPDATE df.instances + SET blocked_on_signal = $1, updated_at = now() + WHERE id = $2 AND status NOT IN ('completed', 'failed', 'cancelled')", + ) + .bind(blocked_on_signal) + .bind(instance_id) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to sync instance signal wait marker: {e}"))?; + + tx.commit() + .await + .map_err(|e| format!("Failed to finish signal wait sync transaction: {e}"))?; + + Ok(()) +} diff --git a/src/dsl.rs b/src/dsl.rs index 9a3ef226..8e276017 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -129,6 +129,24 @@ fn legacy_login_role_schema() -> bool { !owner_scoped_vars_enabled() } +fn instances_have_blocked_on_signal() -> bool { + Spi::get_one::( + "SELECT EXISTS ( + SELECT 1 + FROM pg_catalog.pg_attribute a + JOIN pg_catalog.pg_class c ON c.oid = a.attrelid + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = 'df' + AND c.relname = 'instances' + AND a.attname = 'blocked_on_signal' + AND NOT a.attisdropped + )", + ) + .ok() + .flatten() + .unwrap_or(false) +} + /// Sets a workflow variable. Must be called BEFORE df.start(), not inside a workflow. /// Variables are captured at df.start() and remain immutable during execution. /// Each user has their own variable namespace (owner = current_user). @@ -1101,13 +1119,26 @@ pub fn cancel(instance_id: &str, reason: default!(&str, "'Cancelled by user'")) // 1. Overwriting a 'completed' or 'failed' instance that finished before the cancel // signal was processed by duroxide. // 2. Calling df.cancel twice in a row (idempotent by guard). - // User has column-level UPDATE on (status, updated_at) with RLS restricting to own rows. - Spi::run_with_args( - "UPDATE df.instances SET status = 'cancelled', updated_at = pg_catalog.now() \ - WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled')", - &[instance_id.into()], - ) - .unwrap_or_else(|e| warning!("Failed to update instance status: {e}")); + // User has column-level UPDATE on runtime status columns with RLS restricting to own rows. + if instances_have_blocked_on_signal() { + Spi::run_with_args( + "UPDATE df.instances + SET status = 'cancelled', + blocked_on_signal = NULL, + updated_at = pg_catalog.now() + WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled')", + &[instance_id.into()], + ) + .unwrap_or_else(|e| warning!("Failed to update instance status: {e}")); + } else { + Spi::run_with_args( + "UPDATE df.instances + SET status = 'cancelled', updated_at = pg_catalog.now() + WHERE id = $1 AND status NOT IN ('completed', 'failed', 'cancelled')", + &[instance_id.into()], + ) + .unwrap_or_else(|e| warning!("Failed to update instance status: {e}")); + } format!("Instance {instance_id} cancelled: {reason}") } diff --git a/src/lib.rs b/src/lib.rs index 60fce559..8a21322d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -211,11 +211,14 @@ CREATE TABLE df.instances ( database TEXT, created_at TIMESTAMPTZ DEFAULT pg_catalog.now(), updated_at TIMESTAMPTZ DEFAULT pg_catalog.now(), - completed_at TIMESTAMPTZ + completed_at TIMESTAMPTZ, + blocked_on_signal TEXT ); COMMENT ON COLUMN df.instances.submitted_by IS 'Effective role (current_user) at df.start() time - used for connection authentication and SQL execution'; +COMMENT ON COLUMN df.instances.blocked_on_signal IS + 'Signal name while the instance is parked on a SIGNAL node; NULL otherwise'; -- Index for finding pending instances CREATE INDEX idx_instances_status ON df.instances(status); @@ -413,7 +416,7 @@ BEGIN -- Table privileges EXECUTE pg_catalog.format('GRANT SELECT ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; - EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; + EXECUTE pg_catalog.format('GRANT UPDATE (status, updated_at, blocked_on_signal) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT SELECT ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT INSERT (id, label, root_node, submitted_by, database) ON df.instances TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; EXECUTE pg_catalog.format('GRANT INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes TO %I', p_role) OPERATOR(pg_catalog.||) grant_opt; @@ -466,7 +469,7 @@ BEGIN EXECUTE pg_catalog.format('REVOKE INSERT (id, instance_id, node_type, query, result_name, left_node, right_node, submitted_by, database) ON df.nodes FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.nodes FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE INSERT (id, label, root_node, submitted_by, database) ON df.instances FROM %I CASCADE', p_role); - EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at) ON df.instances FROM %I CASCADE', p_role); + EXECUTE pg_catalog.format('REVOKE UPDATE (status, updated_at, blocked_on_signal) ON df.instances FROM %I CASCADE', p_role); EXECUTE pg_catalog.format('REVOKE SELECT ON df.instances FROM %I CASCADE', p_role); -- Schema access — the access gate for all ordinary df.* functions. diff --git a/tests/e2e/sql/07_signals.sql b/tests/e2e/sql/07_signals.sql index 544cbfeb..69e5cc64 100644 --- a/tests/e2e/sql/07_signals.sql +++ b/tests/e2e/sql/07_signals.sql @@ -34,15 +34,23 @@ DO $$ DECLARE inst_id TEXT; status TEXT; + blocked TEXT; BEGIN SELECT instance_id INTO inst_id FROM _test_signal_basic; - SELECT s INTO status FROM df.status(inst_id) s; + SELECT i.status, i.blocked_on_signal + INTO status, blocked + FROM df.instances i + WHERE i.id = inst_id; IF lower(status) = 'completed' THEN RAISE EXCEPTION 'TEST FAILED: workflow should be waiting for signal, not completed'; END IF; + + IF blocked IS DISTINCT FROM 'go' THEN + RAISE EXCEPTION 'TEST FAILED: blocked_on_signal = %, expected go', blocked; + END IF; - RAISE NOTICE 'Verified workflow is waiting (status: %)', status; + RAISE NOTICE 'Verified workflow is waiting (status: %, blocked_on_signal: %)', status, blocked; END $$; -- Send the signal @@ -60,6 +68,7 @@ DO $$ DECLARE inst_id TEXT; status TEXT; + blocked TEXT; BEGIN SELECT instance_id INTO inst_id FROM _test_signal_basic; RAISE NOTICE 'Testing basic signal: %', inst_id; @@ -69,6 +78,11 @@ BEGIN IF status != 'completed' THEN RAISE EXCEPTION 'TEST FAILED: basic signal status = %', status; END IF; + + SELECT blocked_on_signal INTO blocked FROM df.instances WHERE id = inst_id; + IF blocked IS NOT NULL THEN + RAISE EXCEPTION 'TEST FAILED: blocked_on_signal should clear after signal, got %', blocked; + END IF; IF NOT EXISTS ( SELECT 1 FROM signal_test_log @@ -96,7 +110,7 @@ DELETE FROM signal_test_log; CREATE TEMP TABLE _test_signal_timeout (instance_id TEXT); INSERT INTO _test_signal_timeout SELECT df.start( - df.wait_for_signal('never_arrives', 2) |=> 'sig' + df.wait_for_signal('never_arrives', 4) |=> 'sig' ~> 'INSERT INTO signal_test_log (msg, data) VALUES (''timeout_result'', $sig::jsonb)', 'test-signal-timeout' @@ -106,15 +120,27 @@ DO $$ DECLARE inst_id TEXT; status TEXT; + blocked TEXT; BEGIN SELECT instance_id INTO inst_id FROM _test_signal_timeout; RAISE NOTICE 'Testing signal timeout: %', inst_id; + PERFORM pg_sleep(1); + SELECT blocked_on_signal INTO blocked FROM df.instances WHERE id = inst_id; + IF blocked IS DISTINCT FROM 'never_arrives' THEN + RAISE EXCEPTION 'TEST FAILED: timeout blocked_on_signal = %, expected never_arrives', blocked; + END IF; + SELECT df.await_instance(inst_id, 10) INTO status; IF status != 'completed' THEN RAISE EXCEPTION 'TEST FAILED: signal timeout status = %', status; END IF; + + SELECT blocked_on_signal INTO blocked FROM df.instances WHERE id = inst_id; + IF blocked IS NOT NULL THEN + RAISE EXCEPTION 'TEST FAILED: blocked_on_signal should clear after timeout, got %', blocked; + END IF; IF NOT EXISTS ( SELECT 1 FROM signal_test_log diff --git a/tests/e2e/sql/17_superuser_guc.sql b/tests/e2e/sql/17_superuser_guc.sql index 008a5ebc..29f04e08 100644 --- a/tests/e2e/sql/17_superuser_guc.sql +++ b/tests/e2e/sql/17_superuser_guc.sql @@ -38,7 +38,7 @@ CREATE ROLE su_guc_forger LOGIN BYPASSRLS; SELECT df.grant_usage('su_guc_forger'); GRANT TEMPORARY ON DATABASE postgres TO su_guc_forger; -- The forger needs UPDATE on submitted_by to simulate the attack. --- df.grant_usage() only grants UPDATE (status, updated_at). +-- df.grant_usage() only grants UPDATE on runtime status columns. GRANT UPDATE (submitted_by) ON df.instances TO su_guc_forger; GRANT UPDATE (submitted_by) ON df.nodes TO su_guc_forger;