diff --git a/USER_GUIDE.md b/USER_GUIDE.md index ad2611b..2dd0632 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1535,6 +1535,23 @@ SELECT * FROM df.metrics(); > **Note:** `df.metrics()` returns system-wide aggregate counts across all users and is omitted from an ordinary `df.grant_usage('role')`. It is granted automatically to pg_durable admins via `df.grant_usage('role', with_grant => true)`, or you can grant EXECUTE on `df.metrics()` directly to any role that may view cluster-wide pg_durable activity. Other users can call `df.list_instances()` to view a summary of their own workflows. +### Reconcile Runtime Drift (Admin Only) + +```sql +-- Requires an explicit admin grant or superuser. +SELECT * FROM df.reconcile(); +SELECT * FROM df.reconcile(0); -- no grace window, useful for tests +``` + +**Columns:** `duroxide_orphans_deleted`, `stuck_instances_failed` + +`df.reconcile()` is a best-effort repair backstop. It deletes old runtime root +instances whose full `_duroxide` subtree has no matching `df.instances` row, and +marks stale non-terminal `df.instances` rows failed when there is no live runtime +instance and no queued start. The background worker keeps one built-in reconciler +loop running on `pg_durable.reconciler_cron` (default `*/5 * * * *`); set the GUC +to an empty string and restart to disable the built-in loop. + ### Quick Status Check `df.status()` and `df.result()` take an **`instance_id`** (returned by `df.start()`), **not** a label. Passing a label returns `NULL`. diff --git a/docs/E2E_TESTING.md b/docs/E2E_TESTING.md index e832388..75baa4f 100644 --- a/docs/E2E_TESTING.md +++ b/docs/E2E_TESTING.md @@ -71,6 +71,7 @@ The test suite is organized into 23 files. Files `01`–`09` open with `SET SESS | `14_database.sql` | Wrong-database `CREATE EXTENSION` rejection; `df.start(query, label, database)` multi-database routing | | `15_rls.sql` | RLS on `df.instances` / `df.nodes` / `df.vars` — per-user visibility, cross-user cancel/signal denied, column-level UPDATE, superuser bypass, per-user variable isolation | | `16_heartbeat.sql` | Worker heartbeat liveness — `df._worker_epoch.last_seen_at` advances over time | +| `26_reconcile_orphan_gc.sql` | `df.reconcile()` deletes orphaned runtime root subtrees and leaves healthy instances untouched | | `52_node_id_collision_across_instances.sql` | Cross-instance node-ID collision — two instances own the same 8-hex node id; asserts composite-PK coexistence, that `(instance_id, id)` addresses exactly one row, `df.result()` is instance-scoped, and a scoped `update_node_status`-style UPDATE affects exactly one row (issue #129) | ### Build-Phase Specific @@ -237,4 +238,3 @@ tail -f ~/.pgrx/17.log Error: cargo pgrx install failed ``` → Run `cargo build --features pg17` and then retry the test runner - diff --git a/docs/api-reference.md b/docs/api-reference.md index 61eaaa5..303825e 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -351,6 +351,23 @@ SELECT df.result('a1b2c3d4'); --- +### df.reconcile([p_grace_seconds]) + +Repairs residual drift between `df.*` control-plane rows and the configured +duroxide provider schema. Admin-only; `EXECUTE` is revoked from `PUBLIC`. + +| Parameter | Type | Auto-wrap | Description | +|-----------|------|-----------|-------------| +| `p_grace_seconds` | INTEGER | ❌ Literal | Minimum orphan/stuck age before repair; defaults to 60 | + +```sql +SELECT * FROM df.reconcile(); +``` + +Returns `duroxide_orphans_deleted` and `stuck_instances_failed`. + +--- + ### df.instance_nodes(instance_id) Returns one row per node in an instance's graph, with each node's stored physical @@ -562,4 +579,3 @@ SHOW pg_durable.enable_superuser_instances; ``` **Security note:** Setting this GUC to `on` in a multi-tenant environment allows any role with `BYPASSRLS` to forge `submitted_by` to a superuser OID and execute arbitrary SQL as superuser. Keep `off` unless you have a specific need and understand the risk. See [docs/superuser_guc.md](superuser_guc.md) for the full threat analysis. - diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md index 9b29b0e..4e8375b 100644 --- a/docs/upgrade-testing.md +++ b/docs/upgrade-testing.md @@ -221,6 +221,13 @@ what the upgrade script handles, and any backward compatibility considerations. - **Scenario B1 considerations:** The new `.so` remains compatible with v0.2.3 schemas that have not run `ALTER EXTENSION UPDATE`: existing catalog entries still bind `df.wait_for_completion` to `wait_for_completion_wrapper`, which is retained as a Rust shim to `df.await_instance`. - **Scenario B2 considerations:** No data migration. Existing instances are unaffected; the upgrade only adds a SQL function binding. +#### `df.reconcile()` repair backstop +- **DDL change (df schema):** Adds admin-only `df.reconcile(integer)` (`SECURITY DEFINER`, `REVOKE EXECUTE ... FROM PUBLIC`) to delete orphaned duroxide instance subtrees and mark stuck `df.instances` rows failed. Fresh installs and the upgrade script define the same function. +- **Runtime change:** The background worker starts a built-in durable reconciler loop as the dedicated non-superuser role `df_reconciler`, using `pg_durable.reconciler_cron` (default `*/5 * * * *`; empty disables it) to schedule `SELECT * FROM df.reconcile()`. +- **Provider coupling:** `df.reconcile()` intentionally uses the configured duroxide schema SQL API and tables (`df.duroxide_schema()`, provider `instances`, `orchestrator_queue`, and `delete_instances_atomic`) because repair must inspect and delete provider-owned runtime rows. +- **Scenario A considerations:** Fresh-install and upgrade schemas must both expose `df.reconcile(integer)` with the same result shape and PUBLIC EXECUTE revoked. +- **Scenario B1/B2 considerations:** A new `.so` against older schemas simply lacks `df.reconcile()` until `ALTER EXTENSION UPDATE`; existing workflows are unaffected. The repair function is best-effort and additive. + #### #110 Remove df.debug_connection() (reclassified non-security cleanup) - **DDL change (df schema):** The upgrade script `sql/pg_durable--0.2.3--0.2.4.sql` runs `DROP FUNCTION IF EXISTS df.debug_connection();`. Fresh v0.2.4 installs never create the function: its `#[pg_extern]` in `src/dsl.rs` is annotated `#[pg_extern(sql = false)]`, so pgrx emits no `CREATE FUNCTION` for it (the generated schema records `-- Skipped due to #[pgrx(sql = false)]`). The function returned the worker connection string (no credential) and is dropped as surface-reduction, because the worker role is already exposed to any role via native PostgreSQL channels — the world-readable `pg_durable.worker_role` GUC and `pg_stat_activity.usename` (see security-review item I-6); the remaining fields (database, host/port, schema) are connection-topology metadata, not secrets (the host comes from `PGHOST`, defaulting to loopback). Reclassified from security to cleanup; see issue #110. - **Interaction with the `df.grant_usage()` simplification:** Earlier in this release `df.grant_usage()` carried `'df.debug_connection()'` in its explicit per-function allowlist (`func_sigs`), so dropping the function would have required editing that allowlist. The grant_usage simplification above (#242) removed the allowlist entirely in this same release, so the upgrade no longer needs any `grant_usage` change to account for the removed function — it simply drops `df.debug_connection()`. diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 3e05e84..8f5af24 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -241,7 +241,9 @@ stop_server() { cleanup_databases() { "$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \ - -c "DROP EXTENSION IF EXISTS pg_durable CASCADE;" 2>/dev/null || true + -c "DROP EXTENSION IF EXISTS pg_durable CASCADE; + DROP SCHEMA IF EXISTS _duroxide CASCADE; + DROP SCHEMA IF EXISTS duroxide CASCADE;" 2>/dev/null || true } cleanup() { @@ -290,6 +292,10 @@ if [ -f "$DATA_DIR/postgresql.conf" ]; then sed -i.bak '/^#*pg_durable.database/d' "$DATA_DIR/postgresql.conf" echo "pg_durable.database = 'postgres'" >> "$DATA_DIR/postgresql.conf" fi + if ! grep -q "^pg_durable.reconciler_cron = ''" "$DATA_DIR/postgresql.conf" 2>/dev/null; then + sed -i.bak '/^#*pg_durable.reconciler_cron/d' "$DATA_DIR/postgresql.conf" + echo "pg_durable.reconciler_cron = ''" >> "$DATA_DIR/postgresql.conf" + fi if ! grep -q "^port = $PG_PORT$" "$DATA_DIR/postgresql.conf" 2>/dev/null; then sed -i.bak '/^#*port = /d' "$DATA_DIR/postgresql.conf" echo "port = $PG_PORT" >> "$DATA_DIR/postgresql.conf" @@ -397,6 +403,27 @@ assert_sql_contains() { fi } +assert_sql_contains_eventually() { + local sql="$1" + local expected_fragment="$2" + local attempts="${3:-100}" + local result="" + + for _ in $(seq 1 "$attempts"); do + result=$(run_sql_capture "$sql") || return 1 + if [[ "$result" == *"$expected_fragment"* ]]; then + return 0 + fi + sleep 0.1 + done + + echo "" + echo " SQL: $sql" + echo " Expected fragment: $expected_fragment" + echo " Got: $result" + return 1 +} + assert_sql_empty() { local sql="$1" local result @@ -468,8 +495,7 @@ create_extension_at_version() { return 1 fi - "$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \ - -c "DROP EXTENSION IF EXISTS pg_durable CASCADE;" >/dev/null 2>&1 + cleanup_databases "$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \ -v ON_ERROR_STOP=1 \ -c "CREATE EXTENSION pg_durable VERSION '${base_version}';" >/dev/null 2>&1 @@ -691,16 +717,14 @@ test_schema_upgrade() { snapshot_schema "$tmpdir/upgraded.txt" # Step 2: Fresh install at current version - "$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \ - -c "DROP EXTENSION IF EXISTS pg_durable CASCADE;" >/dev/null 2>&1 + cleanup_databases "$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \ -v ON_ERROR_STOP=1 \ -c "CREATE EXTENSION pg_durable;" >/dev/null 2>&1 snapshot_schema "$tmpdir/fresh.txt" # Clean up - "$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \ - -c "DROP EXTENSION IF EXISTS pg_durable CASCADE;" >/dev/null 2>&1 + cleanup_databases # Filter out PUBLIC grant rows (grant_table, grant_routine, grant_schema) # from both snapshots before comparison. Grants to PUBLIC intentionally @@ -857,7 +881,7 @@ test_b1_status_instance() { } test_b1_result() { - assert_sql_contains "SELECT df.result('${B1_INSTANCE_ID}');" "test_value" + assert_sql_contains_eventually "SELECT df.result('${B1_INSTANCE_ID}');" "test_value" } test_b1_status_nonexistent() { @@ -957,14 +981,14 @@ test_b2_data_survives_upgrade() { test_b2_pre_upgrade_instance_after_upgrade() { assert_sql_equals "SELECT df.status('${B2_PRE_INSTANCE_ID}');" "completed" && - assert_sql_contains "SELECT df.result('${B2_PRE_INSTANCE_ID}');" "b2_value" && + assert_sql_contains_eventually "SELECT df.result('${B2_PRE_INSTANCE_ID}');" "b2_value" && assert_sql_equals "SELECT lower(status) FROM df.instance_info('${B2_PRE_INSTANCE_ID}');" "completed" && assert_sql_equals "SELECT EXISTS (SELECT 1 FROM df.list_instances() WHERE instance_id = '${B2_PRE_INSTANCE_ID}');" "t" } test_b2_inflight_work_after_upgrade() { assert_sql_equals_ignoring_warnings "SELECT df.wait_for_completion('${B2_INFLIGHT_INSTANCE_ID}', 30);" "completed" && - assert_sql_contains "SELECT df.result('${B2_INFLIGHT_INSTANCE_ID}');" "b2-running" + assert_sql_contains_eventually "SELECT df.result('${B2_INFLIGHT_INSTANCE_ID}');" "b2-running" } test_b2_new_data_after_upgrade() { diff --git a/sql/pg_durable--0.2.3--0.2.4.sql b/sql/pg_durable--0.2.3--0.2.4.sql index 70be4cc..40153a6 100644 --- a/sql/pg_durable--0.2.3--0.2.4.sql +++ b/sql/pg_durable--0.2.3--0.2.4.sql @@ -182,6 +182,95 @@ STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'await_instance_wrapper'; +-- ============================================================================ +-- df.reconcile(): repair residual df.* / duroxide divergence. +-- +-- Best-effort admin backstop: delete orphaned duroxide instance subtrees whose +-- root has no df.instances row, and mark stale df.instances rows failed when +-- the runtime has neither a live instance nor a queued start. Fresh installs +-- define the same function in src/lib.rs. +-- ============================================================================ +CREATE FUNCTION df.reconcile(p_grace_seconds integer DEFAULT 60) +RETURNS TABLE(duroxide_orphans_deleted bigint, stuck_instances_failed bigint) +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + orphan_ids text[]; + deleted bigint := 0; + stuck bigint := 0; +BEGIN + -- 1) Delete orphaned duroxide subtrees: every duroxide instance whose ROOT + -- ancestor has no df.instances row and is older than the grace window. + -- We must gather the FULL subtree (root + all descendants) because + -- delete_instances_atomic refuses (even with force) to delete a parent + -- whose children are not also in the list. Sub-orchestrations (JOIN/RACE + -- branches, loop generations) have no df.instances row and would be + -- mis-detected as roots if we keyed on "no df row" alone, so the orphan + -- seed is restricted to parent_instance_id IS NULL. + -- Wrapped so a GC failure never aborts reconcile or kills the built-in + -- reconciler loop. + BEGIN + EXECUTE pg_catalog.format( + 'WITH RECURSIVE orphan_root AS ( ' + ' SELECT d.instance_id ' + ' FROM %1$I.instances d ' + ' LEFT JOIN df.instances i ON i.id = d.instance_id ' + ' WHERE i.id IS NULL ' + ' AND d.parent_instance_id IS NULL ' + ' AND d.created_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) ' + '), subtree AS ( ' + ' SELECT instance_id FROM orphan_root ' + ' UNION ' + ' SELECT c.instance_id FROM %1$I.instances c ' + ' JOIN subtree s ON c.parent_instance_id = s.instance_id ' + ') SELECT pg_catalog.array_agg(instance_id) FROM subtree', + sch) + INTO orphan_ids + USING p_grace_seconds; + + IF orphan_ids IS NOT NULL AND pg_catalog.array_length(orphan_ids, 1) > 0 THEN + EXECUTE pg_catalog.format( + 'SELECT instances_deleted FROM %I.delete_instances_atomic($1, $2)', sch) + INTO deleted + USING orphan_ids, true; + END IF; + EXCEPTION WHEN OTHERS THEN + deleted := 0; + RAISE WARNING 'pg_durable: reconcile orphan-GC pass failed: %', SQLERRM; + END; + + -- 2) df.instances stuck non-terminal with no live duroxide instance and no + -- queued start (lost enqueue) -> mark failed. The duroxide queue row + -- persists (locked) until ack, and the instance row is created at ack, so + -- a healthy in-flight start always matches one of the NOT EXISTS guards + -- and is never failed here. Best-effort; wrapped like step 1. + BEGIN + EXECUTE pg_catalog.format( + 'UPDATE df.instances i ' + 'SET status = ''failed'', updated_at = pg_catalog.now() ' + 'WHERE i.status IN (''pending'', ''running'') ' + ' AND i.updated_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) ' + ' AND NOT EXISTS (SELECT 1 FROM %1$I.instances d WHERE d.instance_id = i.id) ' + ' AND NOT EXISTS (SELECT 1 FROM %1$I.orchestrator_queue q WHERE q.instance_id = i.id)', + sch) + USING p_grace_seconds; + GET DIAGNOSTICS stuck = ROW_COUNT; + EXCEPTION WHEN OTHERS THEN + stuck := 0; + RAISE WARNING 'pg_durable: reconcile stuck-failover pass failed: %', SQLERRM; + END; + + duroxide_orphans_deleted := deleted; + stuck_instances_failed := stuck; + RETURN NEXT; +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df.reconcile(integer) FROM PUBLIC; + -- ============================================================================ -- Promote df.nodes to a composite primary key (instance_id, id) (issue #129). -- diff --git a/src/activities/update_node_status.rs b/src/activities/update_node_status.rs index ab2978d..a7c86f7 100644 --- a/src/activities/update_node_status.rs +++ b/src/activities/update_node_status.rs @@ -5,35 +5,24 @@ use duroxide::ActivityContext; use sqlx::{PgPool, Postgres, QueryBuilder}; -use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; /// Activity name for registration and scheduling pub const NAME: &str = "pg_durable::activity::update-node-status"; -/// Process-global cache for whether df.nodes.status_details exists. -/// -/// 0 = unknown, 1 = present, 2 = absent. The column is added by the -/// 0.2.3 → 0.2.4 upgrade; a binary newer than the schema (Scenario B1) must run -/// against an older schema that lacks it. We cache "present" permanently once -/// seen, but re-probe on "unknown"/"absent" so an in-place ALTER EXTENSION -/// UPDATE that adds the column is picked up without a worker restart. -static STATUS_DETAILS_COL: AtomicU8 = AtomicU8::new(0); - async fn status_details_present(pool: &PgPool) -> bool { - if STATUS_DETAILS_COL.load(Ordering::Relaxed) == 1 { - return true; - } - let present = sqlx::query_scalar::<_, bool>( + // The background worker process survives DROP/CREATE EXTENSION in upgrade + // tests and in real binary-only swaps. The df schema can therefore move + // from "has status_details" to "does not have status_details" without a + // process restart, so a process-global positive cache would be unsafe. + sqlx::query_scalar::<_, bool>( "SELECT EXISTS (SELECT 1 FROM information_schema.columns \ WHERE table_schema = 'df' AND table_name = 'nodes' \ AND column_name = 'status_details')", ) .fetch_one(pool) .await - .unwrap_or(false); - STATUS_DETAILS_COL.store(if present { 1 } else { 2 }, Ordering::Relaxed); - present + .unwrap_or(false) } /// Update the status and optionally the result of a node in df.nodes. diff --git a/src/client.rs b/src/client.rs index 3d10dfb..01ae597 100644 --- a/src/client.rs +++ b/src/client.rs @@ -56,10 +56,7 @@ fn is_worker_ready() -> bool { } Spi::get_one_with_args::( - &format!( - "SELECT EXISTS(SELECT 1 FROM {}._worker_ready WHERE schema_version >= $1)", - schema - ), + &format!("SELECT EXISTS(SELECT 1 FROM {schema}._worker_ready WHERE schema_version >= $1)"), &[crate::WORKER_SCHEMA_VERSION.into()], ) .ok() diff --git a/src/dsl.rs b/src/dsl.rs index 9a3ef22..fbbbbb8 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -1205,7 +1205,7 @@ pub fn await_instance( "SELECT status FROM df.instances WHERE id = $1", &[instance_id.into()], ) - .map_err(|e| format!("Failed to query status: {:?}", e))?; + .map_err(|e| format!("Failed to query status: {e:?}"))?; if let Some(ref s) = status { let s_lower = s.to_lowercase(); @@ -1220,7 +1220,7 @@ pub fn await_instance( return Ok(s_lower); } } else { - return Err(format!("Instance not found: {}", instance_id).into()); + return Err(format!("Instance not found: {instance_id}").into()); } attempts += 1; diff --git a/src/explain.rs b/src/explain.rs index df14c62..923131e 100644 --- a/src/explain.rs +++ b/src/explain.rs @@ -290,7 +290,7 @@ fn collect_nodes( id_counter: &mut i32, ) -> String { *id_counter += 1; - let node_id = format!("N{}", id_counter); + let node_id = format!("N{id_counter}"); // Recursively collect children first to get their IDs let left_id = node @@ -341,8 +341,7 @@ fn load_nodes_from_table(table: &str, instance_id: Option<&str>) -> HashMap) = if let Some(id) = instance_id { ( format!( - "SELECT id, node_type, query, result_name, left_node, right_node, status, result::text, {status_details_expr} FROM {} WHERE instance_id = $1", - table + "SELECT id, node_type, query, result_name, left_node, right_node, status, result::text, {status_details_expr} FROM {table} WHERE instance_id = $1" ), vec![id.into()], ) diff --git a/src/lib.rs b/src/lib.rs index 33795ed..c3aad99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,12 @@ pub static EXECUTION_ACQUIRE_TIMEOUT: GucSetting = GucSetting::::new(3 /// functions are explicitly desired. See docs/superuser_guc.md. pub static ENABLE_SUPERUSER_INSTANCES: GucSetting = GucSetting::::new(false); +/// Cron schedule for the built-in durable reconciler. +/// The background worker ensures one reconciler instance per cluster, running on +/// this schedule. Set to an empty string to disable the built-in reconciler. +pub static RECONCILER_CRON: GucSetting> = + GucSetting::>::new(Some(c"*/5 * * * *")); + // Module declarations pub mod activities; pub mod client; @@ -136,6 +142,15 @@ pub extern "C-unwind" fn _PG_init() { GucFlags::SUPERUSER_ONLY, ); + GucRegistry::define_string_guc( + c"pg_durable.reconciler_cron", + c"Cron schedule for the built-in durable reconciler (df.reconcile()); empty disables it", + c"The background worker keeps one reconciler instance running on this schedule to repair residual df.* / duroxide divergence. Empty string disables the built-in reconciler. Requires server restart to change.", + &RECONCILER_CRON, + GucContext::Postmaster, + GucFlags::default(), + ); + worker::register_background_worker(); } @@ -534,6 +549,114 @@ REVOKE EXECUTE ON FUNCTION df.revoke_usage(text) FROM PUBLIC; requires = ["create_tables", dsl::http, monitoring::metrics] ); +// ============================================================================ +// Reconciler — repair residual df.* / duroxide divergence +// ============================================================================ +// +// Some divergence can arise from legacy non-atomic starts, fallback paths, or +// crashes mid-execution. df.reconcile() is a best-effort garbage collector for +// that residue. It is admin-only (EXECUTE revoked from PUBLIC) and SECURITY +// DEFINER so it can touch the duroxide-owned tables and all users' df.instances. +// +// The background worker keeps one reconciler instance running automatically, +// dogfooding pg_durable as a durable cron loop: +// df.start(df.loop(df.seq('SELECT * FROM df.reconcile()', +// df.wait_for_schedule())), +// 'df_reconciler') +// submitted by the dedicated non-superuser role df_reconciler. Set +// pg_durable.reconciler_cron = '' to disable it. +// +// Only ROOT duroxide instances (parent_instance_id IS NULL) are considered — +// sub-orchestrations (JOIN/RACE branches, loop generations) intentionally have +// no df.instances row and must not be collected. A grace window avoids racing +// legitimately in-flight operations. +extension_sql!( + r#" +CREATE FUNCTION df.reconcile(p_grace_seconds integer DEFAULT 60) +RETURNS TABLE(duroxide_orphans_deleted bigint, stuck_instances_failed bigint) +LANGUAGE plpgsql +SECURITY DEFINER +SET search_path = pg_catalog, pg_temp +AS $fn$ +DECLARE + sch text := df.duroxide_schema(); + orphan_ids text[]; + deleted bigint := 0; + stuck bigint := 0; +BEGIN + -- 1) Delete orphaned duroxide subtrees: every duroxide instance whose ROOT + -- ancestor has no df.instances row and is older than the grace window. + -- We must gather the FULL subtree (root + all descendants) because + -- delete_instances_atomic refuses (even with force) to delete a parent + -- whose children are not also in the list. Sub-orchestrations (JOIN/RACE + -- branches, loop generations) have no df.instances row and would be + -- mis-detected as roots if we keyed on "no df row" alone, so the orphan + -- seed is restricted to parent_instance_id IS NULL. + -- Wrapped so a GC failure never aborts reconcile or kills the built-in + -- reconciler loop. + BEGIN + EXECUTE pg_catalog.format( + 'WITH RECURSIVE orphan_root AS ( ' + ' SELECT d.instance_id ' + ' FROM %1$I.instances d ' + ' LEFT JOIN df.instances i ON i.id = d.instance_id ' + ' WHERE i.id IS NULL ' + ' AND d.parent_instance_id IS NULL ' + ' AND d.created_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) ' + '), subtree AS ( ' + ' SELECT instance_id FROM orphan_root ' + ' UNION ' + ' SELECT c.instance_id FROM %1$I.instances c ' + ' JOIN subtree s ON c.parent_instance_id = s.instance_id ' + ') SELECT pg_catalog.array_agg(instance_id) FROM subtree', + sch) + INTO orphan_ids + USING p_grace_seconds; + + IF orphan_ids IS NOT NULL AND pg_catalog.array_length(orphan_ids, 1) > 0 THEN + EXECUTE pg_catalog.format( + 'SELECT instances_deleted FROM %I.delete_instances_atomic($1, $2)', sch) + INTO deleted + USING orphan_ids, true; + END IF; + EXCEPTION WHEN OTHERS THEN + deleted := 0; + RAISE WARNING 'pg_durable: reconcile orphan-GC pass failed: %', SQLERRM; + END; + + -- 2) df.instances stuck non-terminal with no live duroxide instance and no + -- queued start (lost enqueue) -> mark failed. The duroxide queue row + -- persists (locked) until ack, and the instance row is created at ack, so + -- a healthy in-flight start always matches one of the NOT EXISTS guards + -- and is never failed here. Best-effort; wrapped like step 1. + BEGIN + EXECUTE pg_catalog.format( + 'UPDATE df.instances i ' + 'SET status = ''failed'', updated_at = pg_catalog.now() ' + 'WHERE i.status IN (''pending'', ''running'') ' + ' AND i.updated_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) ' + ' AND NOT EXISTS (SELECT 1 FROM %1$I.instances d WHERE d.instance_id = i.id) ' + ' AND NOT EXISTS (SELECT 1 FROM %1$I.orchestrator_queue q WHERE q.instance_id = i.id)', + sch) + USING p_grace_seconds; + GET DIAGNOSTICS stuck = ROW_COUNT; + EXCEPTION WHEN OTHERS THEN + stuck := 0; + RAISE WARNING 'pg_durable: reconcile stuck-failover pass failed: %', SQLERRM; + END; + + duroxide_orphans_deleted := deleted; + stuck_instances_failed := stuck; + RETURN NEXT; +END; +$fn$; + +REVOKE EXECUTE ON FUNCTION df.reconcile(integer) FROM PUBLIC; +"#, + name = "reconcile", + requires = ["create_tables"] +); + // ============================================================================ // Extension Validation (must run before duroxide schema creation) // ============================================================================ @@ -973,8 +1096,7 @@ mod tests { // Test that is_durofut recognizes it assert!( Durofut::is_durofut(&sleep_json), - "Durofut::is_durofut should recognize SLEEP node JSON: {}", - sleep_json + "Durofut::is_durofut should recognize SLEEP node JSON: {sleep_json}" ); // Test that ensure doesn't wrap it in SQL @@ -2766,9 +2888,8 @@ mod tests { use crate::types::evaluate_condition; // Simulates a SQL condition query that returns zero rows let empty_result = r#"{"rows":[],"row_count":0}"#; - assert_eq!( - evaluate_condition(empty_result).unwrap(), - false, + assert!( + !evaluate_condition(empty_result).unwrap(), "Empty result set should evaluate as false for conditions" ); } @@ -2777,9 +2898,8 @@ mod tests { fn test_evaluate_condition_single_true_row() { use crate::types::evaluate_condition; let result = r#"{"rows":[{"col":true}],"row_count":1}"#; - assert_eq!( + assert!( evaluate_condition(result).unwrap(), - true, "Single row with true value should be truthy" ); } @@ -2788,9 +2908,8 @@ mod tests { fn test_evaluate_condition_single_false_row() { use crate::types::evaluate_condition; let result = r#"{"rows":[{"col":false}],"row_count":1}"#; - assert_eq!( - evaluate_condition(result).unwrap(), - false, + assert!( + !evaluate_condition(result).unwrap(), "Single row with false value should be falsy" ); } @@ -2800,9 +2919,8 @@ mod tests { use crate::types::evaluate_condition; // A query like SELECT count(*) FROM empty_table returns 0 let result = r#"{"rows":[{"count":0}],"row_count":1}"#; - assert_eq!( - evaluate_condition(result).unwrap(), - false, + assert!( + !evaluate_condition(result).unwrap(), "Row with zero value should be falsy" ); } diff --git a/src/monitoring.rs b/src/monitoring.rs index 12f2872..54bbf30 100644 --- a/src/monitoring.rs +++ b/src/monitoring.rs @@ -135,8 +135,7 @@ pub fn list_instances( let batch_sql = format!( "SELECT gi.instance_id, gi.orchestration_name, gi.current_execution_id, gi.output \ FROM unnest($1::text[]) AS t(id) \ - CROSS JOIN LATERAL {schema}.get_instance_info(t.id) AS gi", - schema = provider_schema + CROSS JOIN LATERAL {provider_schema}.get_instance_info(t.id) AS gi" ); let rows = match sqlx::query_as::<_, (String, String, i64, Option)>(&batch_sql) diff --git a/src/ssrf.rs b/src/ssrf.rs index 7ceab95..40469b1 100644 --- a/src/ssrf.rs +++ b/src/ssrf.rs @@ -222,9 +222,8 @@ pub fn validate_url_allowlist(url: &str) -> Result<(), String> { } Err(format!( - "Blocked: '{}' is not in the allowed endpoint list. \ - Only requests to approved Azure service domains are permitted.", - host + "Blocked: '{host}' is not in the allowed endpoint list. \ + Only requests to approved Azure service domains are permitted." )) } } diff --git a/src/types.rs b/src/types.rs index b0ba264..9affd2a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -71,10 +71,9 @@ pub fn is_role_superuser_oid(role_oid: pgrx::pg_sys::Oid) -> Result Ok(v), - Ok(None) => Err(format!("role oid {} not found in pg_roles", role_oid)), + Ok(None) => Err(format!("role oid {role_oid} not found in pg_roles")), Err(e) => Err(format!( - "superuser check failed for role oid {}: {}", - role_oid, e + "superuser check failed for role oid {role_oid}: {e}" )), } } @@ -87,8 +86,8 @@ pub async fn is_role_superuser_name(pool: &sqlx::PgPool, role_name: &str) -> Res .bind(role_name) .fetch_optional(pool) .await - .map_err(|e| format!("superuser check failed for role '{}': {}", role_name, e)) - .and_then(|opt| opt.ok_or_else(|| format!("role '{}' not found in pg_roles", role_name))) + .map_err(|e| format!("superuser check failed for role '{role_name}': {e}")) + .and_then(|opt| opt.ok_or_else(|| format!("role '{role_name}' not found in pg_roles"))) } /// Maximum nesting depth for workflow graphs. Prevents stack overflow from @@ -159,8 +158,7 @@ fn normalize_role_name_for_connection(user: &str) -> Result, String if !user.starts_with('"') { if user.ends_with('"') { return Err(format!( - "Invalid role name '{}': unexpected trailing double quote in connection username", - user + "Invalid role name '{user}': unexpected trailing double quote in connection username" )); } return Ok(Cow::Borrowed(user)); @@ -168,8 +166,7 @@ fn normalize_role_name_for_connection(user: &str) -> Result, String if !user.ends_with('"') || user.len() < 2 { return Err(format!( - "Invalid role name '{}': unterminated quoted identifier in connection username", - user + "Invalid role name '{user}': unterminated quoted identifier in connection username" )); } @@ -190,8 +187,7 @@ fn normalize_role_name_for_connection(user: &str) -> Result, String } return Err(format!( - "Invalid quoted role name '{}': expected doubled double quotes inside identifier", - user + "Invalid quoted role name '{user}': expected doubled double quotes inside identifier" )); } @@ -249,7 +245,7 @@ pub async fn connect_as_user( sqlx::query("SET df.in_workflow = 'true'") .execute(&mut conn) .await - .map_err(|e| format!("SET df.in_workflow failed: {}", e))?; + .map_err(|e| format!("SET df.in_workflow failed: {e}"))?; Ok(conn) } @@ -485,8 +481,7 @@ pub fn validate_result_name(name: &str) -> Result<(), String> { let parsed = parse_identifier(name); if parsed.len() != name.len() { return Err(format!( - "result name '{}' is not a valid identifier — must match [a-zA-Z_][a-zA-Z0-9_]*", - name + "result name '{name}' is not a valid identifier — must match [a-zA-Z_][a-zA-Z0-9_]*" )); } Ok(()) @@ -998,8 +993,7 @@ impl Durofut { } let marker = Spi::get_one::(&format!( - "SELECT pg_catalog.current_setting('{}', true)", - NON_FUTURE_HELPER_GUC + "SELECT pg_catalog.current_setting('{NON_FUTURE_HELPER_GUC}', true)" )) .ok() .flatten()?; @@ -1014,8 +1008,7 @@ impl Durofut { fn non_future_helper_error(helper_name: &str) -> String { format!( - "{} cannot be used as a workflow step. Call {} before df.start().", - helper_name, helper_name + "{helper_name} cannot be used as a workflow step. Call {helper_name} before df.start()." ) } @@ -1026,7 +1019,7 @@ impl Durofut { /// Fallible deserialization from JSON. Preferred over `from_json()` in /// production code paths where corrupted data must not crash the worker. pub fn try_from_json(s: &str) -> Result { - serde_json::from_str(s).map_err(|e| format!("failed to deserialize Durofut: {}", e)) + serde_json::from_str(s).map_err(|e| format!("failed to deserialize Durofut: {e}")) } /// Deserialize from JSON, panicking on failure. @@ -1084,8 +1077,7 @@ impl Durofut { if VALID_NODE_TYPES.contains(&nt) { // Valid node_type but malformed structure return Err(format!( - "Malformed Durofut JSON with node_type '{}': {}", - nt, serde_err + "Malformed Durofut JSON with node_type '{nt}': {serde_err}" )); } return Err(format!( @@ -1118,16 +1110,14 @@ impl Durofut { *node_count += 1; if *node_count > MAX_GRAPH_NODES { return Err(format!( - "Workflow exceeds maximum node count of {}. \ - Simplify the workflow or break it into multiple instances.", - MAX_GRAPH_NODES + "Workflow exceeds maximum node count of {MAX_GRAPH_NODES}. \ + Simplify the workflow or break it into multiple instances." )); } if depth > MAX_GRAPH_DEPTH { return Err(format!( - "Graph exceeds maximum nesting depth of {}. \ - Simplify the workflow or break it into multiple instances.", - MAX_GRAPH_DEPTH + "Graph exceeds maximum nesting depth of {MAX_GRAPH_DEPTH}. \ + Simplify the workflow or break it into multiple instances." )); } if !VALID_NODE_TYPES.contains(&self.node_type.as_str()) { diff --git a/src/worker.rs b/src/worker.rs index 2bd3bf7..b22c37c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -243,6 +243,10 @@ async fn run_duroxide_runtime() { } }; + // Ensure the built-in durable reconciler is running for this epoch. + // Idempotent: starts one only if none is pending/running. + ensure_reconciler(&mgmt_pool).await; + run_until_extension_dropped_or_shutdown( &poll_pool, &mgmt_pool, @@ -258,6 +262,106 @@ async fn run_duroxide_runtime() { poll_pool.close().await; } +/// Built-in durable reconciler. Ensures one reconciler instance is running per +/// cluster: an infinite durable loop that calls `df.reconcile()` on the +/// `pg_durable.reconciler_cron` schedule. +/// +/// Idempotent: does nothing if a reconciler is already pending/running, and +/// (re)starts one if it has died. Called once per epoch and then periodically +/// from the steady-state poll loop, so a reconciler that fails mid-epoch is +/// restarted within the poll interval rather than only at the next epoch. The +/// instance is submitted by a dedicated non-superuser role (`df_reconciler`) +/// granted only df usage plus EXECUTE on the SECURITY DEFINER `df.reconcile()`. +async fn ensure_reconciler(pool: &sqlx::PgPool) { + const ROLE: &str = "df_reconciler"; + const LABEL: &str = "df_reconciler"; + + let cron = crate::RECONCILER_CRON + .get() + .map(|c| c.to_string_lossy().into_owned()) + .unwrap_or_default(); + let cron = cron.trim().to_string(); + if cron.is_empty() { + return; + } + + // Runs as the worker role (bypasses RLS). Use submitted_by, not label, as + // the singleton key because labels are user-controlled. + let already_running: i64 = match sqlx::query_scalar( + "SELECT count(*) FROM df.instances \ + WHERE submitted_by::text = $1 AND status IN ('pending', 'running')", + ) + .bind(ROLE) + .fetch_one(pool) + .await + { + Ok(n) => n, + Err(e) => { + log!("pg_durable: reconciler liveness check failed: {}", e); + return; + } + }; + if already_running > 0 { + return; + } + + if let Err(e) = ensure_reconciler_role(pool, ROLE).await { + log!("pg_durable: failed to provision reconciler role: {}", e); + return; + } + + // Start the durable loop as the dedicated role on a single connection so the + // SET ROLE applies to df.start() (which captures submitted_by = current_user). + let started = async { + let mut conn = pool.acquire().await?; + sqlx::query(&format!("SET ROLE {ROLE}")) + .execute(&mut *conn) + .await?; + let res = sqlx::query( + "SELECT df.start(\ + df.loop(df.seq('SELECT * FROM df.reconcile()', df.wait_for_schedule($1))), \ + $2)", + ) + .bind(&cron) + .bind(LABEL) + .execute(&mut *conn) + .await; + let _ = sqlx::query("RESET ROLE").execute(&mut *conn).await; + res.map(|_| ()) + } + .await; + + match started { + Ok(()) => log!("pg_durable: started built-in reconciler (cron='{cron}')"), + Err(e) => log!("pg_durable: failed to start built-in reconciler: {}", e), + } +} + +/// Create the dedicated non-superuser reconciler role (if absent) and grant it +/// df usage plus EXECUTE on df.reconcile(). Idempotent. +async fn ensure_reconciler_role(pool: &sqlx::PgPool, role: &str) -> Result<(), sqlx::Error> { + // role is a fixed compile-time constant, so the format! is injection-safe. + sqlx::query(&format!( + "DO $$ BEGIN \ + IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = '{role}') THEN \ + CREATE ROLE {role} LOGIN; \ + END IF; \ + END $$;" + )) + .execute(pool) + .await?; + sqlx::query("SELECT df.grant_usage($1)") + .bind(role) + .execute(pool) + .await?; + sqlx::query(&format!( + "GRANT EXECUTE ON FUNCTION df.reconcile(integer) TO {role}" + )) + .execute(pool) + .await?; + Ok(()) +} + async fn wait_for_extension_creation(poll_pool: &sqlx::PgPool, poll_interval: Duration) -> bool { log!("pg_durable: waiting for CREATE EXTENSION pg_durable..."); @@ -346,7 +450,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = '{schema}' + WHERE n.nspname = '{schema_name}' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP TRIGGER ' || r.trigger_name || ' ON ' || r.table_name; @@ -364,7 +468,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = '{schema}' + WHERE n.nspname = '{schema_name}' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP FUNCTION ' || r.sig; END LOOP; @@ -381,7 +485,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = '{schema}' AND c.relkind = 'r' + WHERE n.nspname = '{schema_name}' AND c.relkind = 'r' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP TABLE ' || r.name; END LOOP; @@ -400,7 +504,7 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = '{schema}' AND c.relkind = 'i' + WHERE n.nspname = '{schema_name}' AND c.relkind = 'i' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP INDEX ' || r.name; END LOOP; @@ -417,12 +521,11 @@ BEGIN JOIN pg_extension e ON e.oid = d.refobjid AND e.extname = 'pg_durable' - WHERE n.nspname = '{schema}' AND c.relkind = 'S' + WHERE n.nspname = '{schema_name}' AND c.relkind = 'S' LOOP EXECUTE 'ALTER EXTENSION pg_durable DROP SEQUENCE ' || r.name; END LOOP; -END $$"#, - schema = schema_name +END $$"# )) .execute(pool) .await?; @@ -585,40 +688,34 @@ async fn write_epoch_sentinel(pool: &sqlx::PgPool) -> Result Result<(), sqlx::Error> { sqlx::query(&format!( - "CREATE TABLE IF NOT EXISTS {schema}._worker_ready ( + "CREATE TABLE IF NOT EXISTS {schema_name}._worker_ready ( sentinel BOOLEAN PRIMARY KEY DEFAULT TRUE, CONSTRAINT only_one_sentinel CHECK (sentinel), schema_version INT NOT NULL, initialized_at TIMESTAMPTZ NOT NULL DEFAULT now() - )", - schema = schema_name + )" )) .execute(pool) .await?; // Allow non-superuser sessions to read the readiness record via // is_worker_ready() which runs SPI in the caller's security context. + sqlx::query(&format!("GRANT USAGE ON SCHEMA {schema_name} TO PUBLIC")) + .execute(pool) + .await?; sqlx::query(&format!( - "GRANT USAGE ON SCHEMA {schema} TO PUBLIC", - schema = schema_name - )) - .execute(pool) - .await?; - sqlx::query(&format!( - "GRANT SELECT ON {schema}._worker_ready TO PUBLIC", - schema = schema_name + "GRANT SELECT ON {schema_name}._worker_ready TO PUBLIC" )) .execute(pool) .await?; sqlx::query(&format!( - "INSERT INTO {schema}._worker_ready (sentinel, schema_version, initialized_at) \ + "INSERT INTO {schema_name}._worker_ready (sentinel, schema_version, initialized_at) \ VALUES (TRUE, $1, now()) \ ON CONFLICT (sentinel) DO UPDATE SET \ schema_version = EXCLUDED.schema_version, \ initialized_at = EXCLUDED.initialized_at \ - WHERE {schema}._worker_ready.schema_version != EXCLUDED.schema_version", - schema = schema_name + WHERE {schema_name}._worker_ready.schema_version != EXCLUDED.schema_version" )) .bind(crate::WORKER_SCHEMA_VERSION) .execute(pool) @@ -776,6 +873,11 @@ async fn run_until_extension_dropped_or_shutdown( log!("pg_durable: epoch sentinel gone — extension dropped or recreated"); break; } + + // Self-heal: re-assert the built-in reconciler so a loop that + // failed/cancelled mid-epoch gets restarted without waiting for + // the next extension epoch. + ensure_reconciler(maintenance_pool).await; } _ = prune_check.tick() => { match prune_terminal_instances(maintenance_pool).await { diff --git a/tests/e2e/sql/06_http_and_ssrf.sql b/tests/e2e/sql/06_http_and_ssrf.sql index 561ee54..d333f4a 100644 --- a/tests/e2e/sql/06_http_and_ssrf.sql +++ b/tests/e2e/sql/06_http_and_ssrf.sql @@ -263,85 +263,84 @@ END $$; DROP TABLE _test_http_vars; SELECT df.clearvars(); --- === Test: 19_github_api === +-- === Test: 19_http_json_api_loop === -DROP TABLE IF EXISTS github_commits; -CREATE TABLE github_commits ( +DROP TABLE IF EXISTS http_loop_results; +CREATE TABLE http_loop_results ( id SERIAL PRIMARY KEY, - sha TEXT UNIQUE, - author TEXT, - message TEXT, - committed_at TIMESTAMPTZ, + url TEXT UNIQUE, + source TEXT, + user_agent TEXT, fetched_at TIMESTAMPTZ DEFAULT now() ); SELECT df.clearvars(); -SELECT df.setvar('github_url', 'https://api.github.com/repos/microsoft/duroxide/commits?per_page=5'); +SELECT df.setvar('api_url', 'https://httpbingo.org/get?source=pg_durable'); -CREATE TEMP TABLE _test_github (instance_id TEXT); +CREATE TEMP TABLE _test_http_loop (instance_id TEXT); -INSERT INTO _test_github SELECT df.start( +INSERT INTO _test_http_loop SELECT df.start( @> ( (df.http( - '{github_url}', + '{api_url}', 'GET', NULL, - '{"Accept": "application/vnd.github.v3+json", "User-Agent": "pg_durable-test"}'::jsonb + '{"Accept": "application/json", "User-Agent": "pg_durable-test"}'::jsonb ) |=> 'response') - ~> 'INSERT INTO github_commits (sha, author, message, committed_at) - SELECT - c->>''sha'', - c->''commit''->''author''->>''name'', - c->''commit''->>''message'', - (c->''commit''->''author''->>''date'')::timestamptz - FROM jsonb_array_elements(($response::jsonb->>''body'')::jsonb) AS c - ON CONFLICT (sha) DO UPDATE SET + ~> 'INSERT INTO http_loop_results (url, source, user_agent) + SELECT + body->>''url'', + body->''args''->>''source'', + body->''headers''->>''User-Agent'' + FROM (SELECT ($response::jsonb->>''body'')::jsonb AS body) s + WHERE ($response::jsonb->>''ok'')::boolean + ON CONFLICT (url) DO UPDATE SET fetched_at = now() - RETURNING sha' + RETURNING url' ~> df.wait_for_schedule('*/30 * * * *') ), - 'github-commit-sync' + 'http-json-loop-sync' ); DO $$ DECLARE inst_id TEXT; status TEXT; - commit_count INT; + result_count INT; attempts INT := 0; BEGIN - SELECT instance_id INTO inst_id FROM _test_github; - RAISE NOTICE 'Testing GitHub API with vars and loop: %', inst_id; + SELECT instance_id INTO inst_id FROM _test_http_loop; + RAISE NOTICE 'Testing HTTP JSON API with vars and loop: %', inst_id; LOOP SELECT s INTO status FROM df.status(inst_id) s; - SELECT COUNT(*) INTO commit_count FROM github_commits; - EXIT WHEN commit_count > 0 OR lower(status) = 'failed' OR attempts > 300; + SELECT COUNT(*) INTO result_count FROM http_loop_results; + EXIT WHEN result_count > 0 OR lower(status) = 'failed' OR attempts > 300; PERFORM pg_sleep(0.1); attempts := attempts + 1; END LOOP; IF lower(status) = 'failed' THEN - RAISE EXCEPTION 'TEST FAILED: GitHub API fetch status = %', status; + RAISE EXCEPTION 'TEST FAILED: HTTP JSON API fetch status = %', status; END IF; - SELECT COUNT(*) INTO commit_count FROM github_commits; - RAISE NOTICE 'Fetched % commits from GitHub', commit_count; + SELECT COUNT(*) INTO result_count FROM http_loop_results; + RAISE NOTICE 'Fetched % HTTP JSON row(s)', result_count; - IF commit_count = 0 THEN - RAISE EXCEPTION 'TEST FAILED: No commits fetched from GitHub API'; + IF result_count = 0 THEN + RAISE EXCEPTION 'TEST FAILED: No rows fetched from HTTP JSON API'; END IF; PERFORM df.cancel(inst_id, 'Test completed - cancelling scheduled loop'); RAISE NOTICE 'Cancelled scheduled loop after successful first iteration'; - RAISE NOTICE 'TEST PASSED: github_api_with_vars_and_loop'; + RAISE NOTICE 'TEST PASSED: http_json_api_with_vars_and_loop'; END $$; -SELECT sha, author, committed_at, LEFT(message, 50) AS message FROM github_commits ORDER BY committed_at DESC; +SELECT url, source, user_agent FROM http_loop_results ORDER BY fetched_at DESC; -DROP TABLE _test_github; -DROP TABLE github_commits; +DROP TABLE _test_http_loop; +DROP TABLE http_loop_results; SELECT df.clearvars(); -- === Test: 36_ssrf_protection === diff --git a/tests/e2e/sql/26_reconcile_orphan_gc.sql b/tests/e2e/sql/26_reconcile_orphan_gc.sql new file mode 100644 index 0000000..f8e6589 --- /dev/null +++ b/tests/e2e/sql/26_reconcile_orphan_gc.sql @@ -0,0 +1,139 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Tests: df.reconcile() repairs leftover df/_duroxide drift. +-- +-- df.reconcile() deletes orphaned ROOT runtime instances (no df.instances row, +-- older than the grace window), gathering each orphan's full subtree so the +-- delete is accepted, and leaves healthy instances untouched. +-- +-- Without the change df.reconcile() does not exist, so the calls below raise +-- undefined_function and the test fails. +-- +-- _duroxide is read as the superuser (postgres); durable functions run as +-- df_e2e_user. + +-- Helper (superuser): number of direct children (sub-orchestrations) of a root. +CREATE OR REPLACE FUNCTION pg_temp.duroxide_child_count(p_root text) +RETURNS bigint LANGUAGE plpgsql AS $$ +DECLARE sch text := df.duroxide_schema(); n bigint; +BEGIN + EXECUTE format('SELECT count(*) FROM %I.instances WHERE parent_instance_id = $1', sch) + INTO n USING p_root; + RETURN n; +END $$; + +-- Helper (superuser): size of the full instance subtree rooted at p_root. +CREATE OR REPLACE FUNCTION pg_temp.duroxide_subtree_count(p_root text) +RETURNS bigint LANGUAGE plpgsql AS $$ +DECLARE sch text := df.duroxide_schema(); n bigint; +BEGIN + EXECUTE format( + 'WITH RECURSIVE t AS ( ' + ' SELECT instance_id FROM %1$I.instances WHERE instance_id = $1 ' + ' UNION ' + ' SELECT c.instance_id FROM %1$I.instances c JOIN t ON c.parent_instance_id = t.instance_id ' + ') SELECT count(*) FROM t', sch) + INTO n USING p_root; + RETURN n; +END $$; + +-- =========================================================================== +-- Scenario 1: a planted root orphan WITH a running subtree is fully collected +-- =========================================================================== + +-- Start a parallel JOIN: each branch runs as its own sub-orchestration, which +-- has no df.instances row of its own. Orphaning the root then forces reconcile +-- to gather the FULL subtree (root + children) -- deleting only the root would be +-- refused by delete_instances_atomic, leaving the orphan behind. Long branches +-- keep the children running for the duration of the test. +SET SESSION AUTHORIZATION df_e2e_user; +CREATE TEMP TABLE _t_orphan (instance_id TEXT); +INSERT INTO _t_orphan +SELECT df.start('SELECT pg_sleep(30)' & 'SELECT pg_sleep(30)', 'reconcile-orphan-victim'); +RESET SESSION AUTHORIZATION; + +-- Wait until the worker has materialized the root AND at least one child, so the +-- subtree genuinely exists when we reconcile. +DO $$ +DECLARE inst_id TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_orphan; + LOOP + EXIT WHEN pg_temp.duroxide_child_count(inst_id) >= 1 OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF pg_temp.duroxide_child_count(inst_id) < 1 THEN + RAISE EXCEPTION 'Setup: orphan victim never spawned a child sub-orchestration'; + END IF; + RAISE NOTICE 'Setup: orphan victim % has % child sub-orchestration(s)', inst_id, pg_temp.duroxide_child_count(inst_id); +END $$; + +-- Orphan it: drop the control-plane rows (superuser bypasses row security). +-- df.instances and df.nodes reference each other, but the FKs are +-- DEFERRABLE INITIALLY DEFERRED, so deleting both in one transaction is fine. +BEGIN; +DELETE FROM df.instances WHERE id = (SELECT instance_id FROM _t_orphan); +DELETE FROM df.nodes WHERE instance_id = (SELECT instance_id FROM _t_orphan); +COMMIT; + +-- Reconcile with a zero grace window: the orphaned root AND its subtree must be +-- collected. +SELECT df.reconcile(0); + +DO $$ +DECLARE inst_id TEXT; remaining bigint; +BEGIN + SELECT instance_id INTO inst_id FROM _t_orphan; + remaining := pg_temp.duroxide_subtree_count(inst_id); + IF remaining > 0 THEN + RAISE EXCEPTION 'TEST FAILED: df.reconcile left % subtree row(s) for orphaned root % (subtree not gathered)', remaining, inst_id; + END IF; + RAISE NOTICE 'PASSED [orphan_subtree_collected]: root % and all children removed', inst_id; +END $$; + +DROP TABLE _t_orphan; + +-- =========================================================================== +-- Scenario 2: a healthy, running instance is left untouched +-- =========================================================================== + +SET SESSION AUTHORIZATION df_e2e_user; +CREATE TEMP TABLE _t_live (instance_id TEXT); +INSERT INTO _t_live SELECT df.start('SELECT pg_sleep(3)', 'reconcile-live'); +RESET SESSION AUTHORIZATION; + +-- Ensure it is running before reconciling. +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _t_live; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(status) <> 'running' THEN + RAISE EXCEPTION 'Setup: live instance never reached running (status=%)', status; + END IF; +END $$; + +-- Reconcile must not disturb a healthy instance (it has a df.instances row). +SELECT df.reconcile(0); + +DO $$ +DECLARE inst_id TEXT; status TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _t_live; + SELECT df.wait_for_completion(inst_id, 30) INTO status; + IF lower(status) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED: df.reconcile disturbed a healthy instance % (status=%)', inst_id, status; + END IF; + RAISE NOTICE 'PASSED [live_untouched]: % completed normally despite reconcile', inst_id; +END $$; + +DROP TABLE _t_live; + +SELECT 'TEST PASSED: reconcile orphan GC' AS result;