Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,23 @@ SELECT * FROM df.metrics();

> **Note:** `df.metrics()` returns system-wide aggregate counts across all users and is omitted from an ordinary `df.grant_usage('role')`. It is granted automatically to pg_durable admins via `df.grant_usage('role', with_grant => true)`, or you can grant EXECUTE on `df.metrics()` directly to any role that may view cluster-wide pg_durable activity. Other users can call `df.list_instances()` to view a summary of their own workflows.

### Reconcile Runtime Drift (Admin Only)

```sql
-- Requires an explicit admin grant or superuser.
SELECT * FROM df.reconcile();
SELECT * FROM df.reconcile(0); -- no grace window, useful for tests
```

**Columns:** `duroxide_orphans_deleted`, `stuck_instances_failed`

`df.reconcile()` is a best-effort repair backstop. It deletes old runtime root
instances whose full `_duroxide` subtree has no matching `df.instances` row, and
marks stale non-terminal `df.instances` rows failed when there is no live runtime
instance and no queued start. The background worker keeps one built-in reconciler
loop running on `pg_durable.reconciler_cron` (default `*/5 * * * *`); set the GUC
to an empty string and restart to disable the built-in loop.

### Quick Status Check

`df.status()` and `df.result()` take an **`instance_id`** (returned by `df.start()`), **not** a label. Passing a label returns `NULL`.
Expand Down
2 changes: 1 addition & 1 deletion docs/E2E_TESTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ The test suite is organized into 23 files. Files `01`–`09` open with `SET SESS
| `14_database.sql` | Wrong-database `CREATE EXTENSION` rejection; `df.start(query, label, database)` multi-database routing |
| `15_rls.sql` | RLS on `df.instances` / `df.nodes` / `df.vars` — per-user visibility, cross-user cancel/signal denied, column-level UPDATE, superuser bypass, per-user variable isolation |
| `16_heartbeat.sql` | Worker heartbeat liveness — `df._worker_epoch.last_seen_at` advances over time |
| `26_reconcile_orphan_gc.sql` | `df.reconcile()` deletes orphaned runtime root subtrees and leaves healthy instances untouched |
| `52_node_id_collision_across_instances.sql` | Cross-instance node-ID collision — two instances own the same 8-hex node id; asserts composite-PK coexistence, that `(instance_id, id)` addresses exactly one row, `df.result()` is instance-scoped, and a scoped `update_node_status`-style UPDATE affects exactly one row (issue #129) |

### Build-Phase Specific
Expand Down Expand Up @@ -237,4 +238,3 @@ tail -f ~/.pgrx/17.log
Error: cargo pgrx install failed
```
→ Run `cargo build --features pg17` and then retry the test runner

18 changes: 17 additions & 1 deletion docs/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,23 @@ SELECT df.result('a1b2c3d4');

---

### df.reconcile([p_grace_seconds])

Repairs residual drift between `df.*` control-plane rows and the configured
duroxide provider schema. Admin-only; `EXECUTE` is revoked from `PUBLIC`.

| Parameter | Type | Auto-wrap | Description |
|-----------|------|-----------|-------------|
| `p_grace_seconds` | INTEGER | ❌ Literal | Minimum orphan/stuck age before repair; defaults to 60 |

```sql
SELECT * FROM df.reconcile();
```

Returns `duroxide_orphans_deleted` and `stuck_instances_failed`.

---

### df.instance_nodes(instance_id)

Returns one row per node in an instance's graph, with each node's stored physical
Expand Down Expand Up @@ -562,4 +579,3 @@ SHOW pg_durable.enable_superuser_instances;
```

**Security note:** Setting this GUC to `on` in a multi-tenant environment allows any role with `BYPASSRLS` to forge `submitted_by` to a superuser OID and execute arbitrary SQL as superuser. Keep `off` unless you have a specific need and understand the risk. See [docs/superuser_guc.md](superuser_guc.md) for the full threat analysis.

7 changes: 7 additions & 0 deletions docs/upgrade-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,13 @@ what the upgrade script handles, and any backward compatibility considerations.
- **Scenario B1 considerations:** The new `.so` remains compatible with v0.2.3 schemas that have not run `ALTER EXTENSION UPDATE`: existing catalog entries still bind `df.wait_for_completion` to `wait_for_completion_wrapper`, which is retained as a Rust shim to `df.await_instance`.
- **Scenario B2 considerations:** No data migration. Existing instances are unaffected; the upgrade only adds a SQL function binding.

#### `df.reconcile()` repair backstop
- **DDL change (df schema):** Adds admin-only `df.reconcile(integer)` (`SECURITY DEFINER`, `REVOKE EXECUTE ... FROM PUBLIC`) to delete orphaned duroxide instance subtrees and mark stuck `df.instances` rows failed. Fresh installs and the upgrade script define the same function.
- **Runtime change:** The background worker starts a built-in durable reconciler loop as the dedicated non-superuser role `df_reconciler`, using `pg_durable.reconciler_cron` (default `*/5 * * * *`; empty disables it) to schedule `SELECT * FROM df.reconcile()`.
- **Provider coupling:** `df.reconcile()` intentionally uses the configured duroxide schema SQL API and tables (`df.duroxide_schema()`, provider `instances`, `orchestrator_queue`, and `delete_instances_atomic`) because repair must inspect and delete provider-owned runtime rows.
- **Scenario A considerations:** Fresh-install and upgrade schemas must both expose `df.reconcile(integer)` with the same result shape and PUBLIC EXECUTE revoked.
- **Scenario B1/B2 considerations:** A new `.so` against older schemas simply lacks `df.reconcile()` until `ALTER EXTENSION UPDATE`; existing workflows are unaffected. The repair function is best-effort and additive.

#### #110 Remove df.debug_connection() (reclassified non-security cleanup)
- **DDL change (df schema):** The upgrade script `sql/pg_durable--0.2.3--0.2.4.sql` runs `DROP FUNCTION IF EXISTS df.debug_connection();`. Fresh v0.2.4 installs never create the function: its `#[pg_extern]` in `src/dsl.rs` is annotated `#[pg_extern(sql = false)]`, so pgrx emits no `CREATE FUNCTION` for it (the generated schema records `-- Skipped due to #[pgrx(sql = false)]`). The function returned the worker connection string (no credential) and is dropped as surface-reduction, because the worker role is already exposed to any role via native PostgreSQL channels — the world-readable `pg_durable.worker_role` GUC and `pg_stat_activity.usename` (see security-review item I-6); the remaining fields (database, host/port, schema) are connection-topology metadata, not secrets (the host comes from `PGHOST`, defaulting to loopback). Reclassified from security to cleanup; see issue #110.
- **Interaction with the `df.grant_usage()` simplification:** Earlier in this release `df.grant_usage()` carried `'df.debug_connection()'` in its explicit per-function allowlist (`func_sigs`), so dropping the function would have required editing that allowlist. The grant_usage simplification above (#242) removed the allowlist entirely in this same release, so the upgrade no longer needs any `grant_usage` change to account for the removed function — it simply drops `df.debug_connection()`.
Expand Down
44 changes: 34 additions & 10 deletions scripts/test-upgrade.sh
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ stop_server() {

cleanup_databases() {
"$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \
-c "DROP EXTENSION IF EXISTS pg_durable CASCADE;" 2>/dev/null || true
-c "DROP EXTENSION IF EXISTS pg_durable CASCADE;
DROP SCHEMA IF EXISTS _duroxide CASCADE;
DROP SCHEMA IF EXISTS duroxide CASCADE;" 2>/dev/null || true
}

cleanup() {
Expand Down Expand Up @@ -290,6 +292,10 @@ if [ -f "$DATA_DIR/postgresql.conf" ]; then
sed -i.bak '/^#*pg_durable.database/d' "$DATA_DIR/postgresql.conf"
echo "pg_durable.database = 'postgres'" >> "$DATA_DIR/postgresql.conf"
fi
if ! grep -q "^pg_durable.reconciler_cron = ''" "$DATA_DIR/postgresql.conf" 2>/dev/null; then
sed -i.bak '/^#*pg_durable.reconciler_cron/d' "$DATA_DIR/postgresql.conf"
echo "pg_durable.reconciler_cron = ''" >> "$DATA_DIR/postgresql.conf"
fi
if ! grep -q "^port = $PG_PORT$" "$DATA_DIR/postgresql.conf" 2>/dev/null; then
sed -i.bak '/^#*port = /d' "$DATA_DIR/postgresql.conf"
echo "port = $PG_PORT" >> "$DATA_DIR/postgresql.conf"
Expand Down Expand Up @@ -397,6 +403,27 @@ assert_sql_contains() {
fi
}

assert_sql_contains_eventually() {
local sql="$1"
local expected_fragment="$2"
local attempts="${3:-100}"
local result=""

for _ in $(seq 1 "$attempts"); do
result=$(run_sql_capture "$sql") || return 1
if [[ "$result" == *"$expected_fragment"* ]]; then
return 0
fi
sleep 0.1
done

echo ""
echo " SQL: $sql"
echo " Expected fragment: $expected_fragment"
echo " Got: $result"
return 1
}

assert_sql_empty() {
local sql="$1"
local result
Expand Down Expand Up @@ -468,8 +495,7 @@ create_extension_at_version() {
return 1
fi

"$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \
-c "DROP EXTENSION IF EXISTS pg_durable CASCADE;" >/dev/null 2>&1
cleanup_databases
"$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \
-v ON_ERROR_STOP=1 \
-c "CREATE EXTENSION pg_durable VERSION '${base_version}';" >/dev/null 2>&1
Expand Down Expand Up @@ -691,16 +717,14 @@ test_schema_upgrade() {
snapshot_schema "$tmpdir/upgraded.txt"

# Step 2: Fresh install at current version
"$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \
-c "DROP EXTENSION IF EXISTS pg_durable CASCADE;" >/dev/null 2>&1
cleanup_databases
"$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \
-v ON_ERROR_STOP=1 \
-c "CREATE EXTENSION pg_durable;" >/dev/null 2>&1
snapshot_schema "$tmpdir/fresh.txt"

# Clean up
"$PSQL" -h localhost -p "$PG_PORT" -U postgres -d "$PG_DB" \
-c "DROP EXTENSION IF EXISTS pg_durable CASCADE;" >/dev/null 2>&1
cleanup_databases

# Filter out PUBLIC grant rows (grant_table, grant_routine, grant_schema)
# from both snapshots before comparison. Grants to PUBLIC intentionally
Expand Down Expand Up @@ -857,7 +881,7 @@ test_b1_status_instance() {
}

test_b1_result() {
assert_sql_contains "SELECT df.result('${B1_INSTANCE_ID}');" "test_value"
assert_sql_contains_eventually "SELECT df.result('${B1_INSTANCE_ID}');" "test_value"
}

test_b1_status_nonexistent() {
Expand Down Expand Up @@ -957,14 +981,14 @@ test_b2_data_survives_upgrade() {

test_b2_pre_upgrade_instance_after_upgrade() {
assert_sql_equals "SELECT df.status('${B2_PRE_INSTANCE_ID}');" "completed" &&
assert_sql_contains "SELECT df.result('${B2_PRE_INSTANCE_ID}');" "b2_value" &&
assert_sql_contains_eventually "SELECT df.result('${B2_PRE_INSTANCE_ID}');" "b2_value" &&
assert_sql_equals "SELECT lower(status) FROM df.instance_info('${B2_PRE_INSTANCE_ID}');" "completed" &&
assert_sql_equals "SELECT EXISTS (SELECT 1 FROM df.list_instances() WHERE instance_id = '${B2_PRE_INSTANCE_ID}');" "t"
}

test_b2_inflight_work_after_upgrade() {
assert_sql_equals_ignoring_warnings "SELECT df.wait_for_completion('${B2_INFLIGHT_INSTANCE_ID}', 30);" "completed" &&
assert_sql_contains "SELECT df.result('${B2_INFLIGHT_INSTANCE_ID}');" "b2-running"
assert_sql_contains_eventually "SELECT df.result('${B2_INFLIGHT_INSTANCE_ID}');" "b2-running"
}

test_b2_new_data_after_upgrade() {
Expand Down
89 changes: 89 additions & 0 deletions sql/pg_durable--0.2.3--0.2.4.sql
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,95 @@ STRICT
LANGUAGE c
AS 'MODULE_PATHNAME', 'await_instance_wrapper';

-- ============================================================================
-- df.reconcile(): repair residual df.* / duroxide divergence.
--
-- Best-effort admin backstop: delete orphaned duroxide instance subtrees whose
-- root has no df.instances row, and mark stale df.instances rows failed when
-- the runtime has neither a live instance nor a queued start. Fresh installs
-- define the same function in src/lib.rs.
-- ============================================================================
CREATE FUNCTION df.reconcile(p_grace_seconds integer DEFAULT 60)
RETURNS TABLE(duroxide_orphans_deleted bigint, stuck_instances_failed bigint)
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $fn$
DECLARE
sch text := df.duroxide_schema();
orphan_ids text[];
deleted bigint := 0;
stuck bigint := 0;
BEGIN
-- 1) Delete orphaned duroxide subtrees: every duroxide instance whose ROOT
-- ancestor has no df.instances row and is older than the grace window.
-- We must gather the FULL subtree (root + all descendants) because
-- delete_instances_atomic refuses (even with force) to delete a parent
-- whose children are not also in the list. Sub-orchestrations (JOIN/RACE
-- branches, loop generations) have no df.instances row and would be
-- mis-detected as roots if we keyed on "no df row" alone, so the orphan
-- seed is restricted to parent_instance_id IS NULL.
-- Wrapped so a GC failure never aborts reconcile or kills the built-in
-- reconciler loop.
BEGIN
EXECUTE pg_catalog.format(
'WITH RECURSIVE orphan_root AS ( '
' SELECT d.instance_id '
' FROM %1$I.instances d '
' LEFT JOIN df.instances i ON i.id = d.instance_id '
' WHERE i.id IS NULL '
' AND d.parent_instance_id IS NULL '
' AND d.created_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) '
'), subtree AS ( '
' SELECT instance_id FROM orphan_root '
' UNION '
' SELECT c.instance_id FROM %1$I.instances c '
' JOIN subtree s ON c.parent_instance_id = s.instance_id '
') SELECT pg_catalog.array_agg(instance_id) FROM subtree',
sch)
INTO orphan_ids
USING p_grace_seconds;

IF orphan_ids IS NOT NULL AND pg_catalog.array_length(orphan_ids, 1) > 0 THEN
EXECUTE pg_catalog.format(
'SELECT instances_deleted FROM %I.delete_instances_atomic($1, $2)', sch)
INTO deleted
USING orphan_ids, true;
END IF;
EXCEPTION WHEN OTHERS THEN
deleted := 0;
RAISE WARNING 'pg_durable: reconcile orphan-GC pass failed: %', SQLERRM;
END;

-- 2) df.instances stuck non-terminal with no live duroxide instance and no
-- queued start (lost enqueue) -> mark failed. The duroxide queue row
-- persists (locked) until ack, and the instance row is created at ack, so
-- a healthy in-flight start always matches one of the NOT EXISTS guards
-- and is never failed here. Best-effort; wrapped like step 1.
BEGIN
EXECUTE pg_catalog.format(
'UPDATE df.instances i '
'SET status = ''failed'', updated_at = pg_catalog.now() '
'WHERE i.status IN (''pending'', ''running'') '
' AND i.updated_at < pg_catalog.now() - pg_catalog.make_interval(secs => $1) '
' AND NOT EXISTS (SELECT 1 FROM %1$I.instances d WHERE d.instance_id = i.id) '
' AND NOT EXISTS (SELECT 1 FROM %1$I.orchestrator_queue q WHERE q.instance_id = i.id)',
sch)
USING p_grace_seconds;
GET DIAGNOSTICS stuck = ROW_COUNT;
EXCEPTION WHEN OTHERS THEN
stuck := 0;
RAISE WARNING 'pg_durable: reconcile stuck-failover pass failed: %', SQLERRM;
END;

duroxide_orphans_deleted := deleted;
stuck_instances_failed := stuck;
RETURN NEXT;
END;
$fn$;

REVOKE EXECUTE ON FUNCTION df.reconcile(integer) FROM PUBLIC;

-- ============================================================================
-- Promote df.nodes to a composite primary key (instance_id, id) (issue #129).
--
Expand Down
23 changes: 6 additions & 17 deletions src/activities/update_node_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,24 @@

use duroxide::ActivityContext;
use sqlx::{PgPool, Postgres, QueryBuilder};
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;

/// Activity name for registration and scheduling
pub const NAME: &str = "pg_durable::activity::update-node-status";

/// Process-global cache for whether df.nodes.status_details exists.
///
/// 0 = unknown, 1 = present, 2 = absent. The column is added by the
/// 0.2.3 → 0.2.4 upgrade; a binary newer than the schema (Scenario B1) must run
/// against an older schema that lacks it. We cache "present" permanently once
/// seen, but re-probe on "unknown"/"absent" so an in-place ALTER EXTENSION
/// UPDATE that adds the column is picked up without a worker restart.
static STATUS_DETAILS_COL: AtomicU8 = AtomicU8::new(0);

async fn status_details_present(pool: &PgPool) -> bool {
if STATUS_DETAILS_COL.load(Ordering::Relaxed) == 1 {
return true;
}
let present = sqlx::query_scalar::<_, bool>(
// The background worker process survives DROP/CREATE EXTENSION in upgrade
// tests and in real binary-only swaps. The df schema can therefore move
// from "has status_details" to "does not have status_details" without a
// process restart, so a process-global positive cache would be unsafe.
sqlx::query_scalar::<_, bool>(
"SELECT EXISTS (SELECT 1 FROM information_schema.columns \
WHERE table_schema = 'df' AND table_name = 'nodes' \
AND column_name = 'status_details')",
)
.fetch_one(pool)
.await
.unwrap_or(false);
STATUS_DETAILS_COL.store(if present { 1 } else { 2 }, Ordering::Relaxed);
present
.unwrap_or(false)
}

/// Update the status and optionally the result of a node in df.nodes.
Expand Down
5 changes: 1 addition & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ fn is_worker_ready() -> bool {
}

Spi::get_one_with_args::<bool>(
&format!(
"SELECT EXISTS(SELECT 1 FROM {}._worker_ready WHERE schema_version >= $1)",
schema
),
&format!("SELECT EXISTS(SELECT 1 FROM {schema}._worker_ready WHERE schema_version >= $1)"),
&[crate::WORKER_SCHEMA_VERSION.into()],
)
.ok()
Expand Down
4 changes: 2 additions & 2 deletions src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ pub fn await_instance(
"SELECT status FROM df.instances WHERE id = $1",
&[instance_id.into()],
)
.map_err(|e| format!("Failed to query status: {:?}", e))?;
.map_err(|e| format!("Failed to query status: {e:?}"))?;

if let Some(ref s) = status {
let s_lower = s.to_lowercase();
Expand All @@ -1220,7 +1220,7 @@ pub fn await_instance(
return Ok(s_lower);
}
} else {
return Err(format!("Instance not found: {}", instance_id).into());
return Err(format!("Instance not found: {instance_id}").into());
}

attempts += 1;
Expand Down
5 changes: 2 additions & 3 deletions src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ fn collect_nodes(
id_counter: &mut i32,
) -> String {
*id_counter += 1;
let node_id = format!("N{}", id_counter);
let node_id = format!("N{id_counter}");

// Recursively collect children first to get their IDs
let left_id = node
Expand Down Expand Up @@ -341,8 +341,7 @@ fn load_nodes_from_table(table: &str, instance_id: Option<&str>) -> HashMap<Stri
let (sql, args): (String, Vec<pgrx::datum::DatumWithOid>) = if let Some(id) = instance_id {
(
format!(
"SELECT id, node_type, query, result_name, left_node, right_node, status, result::text, {status_details_expr} FROM {} WHERE instance_id = $1",
table
"SELECT id, node_type, query, result_name, left_node, right_node, status, result::text, {status_details_expr} FROM {table} WHERE instance_id = $1"
),
vec![id.into()],
)
Expand Down
Loading
Loading