From 65b69dfb81168e16fd72832acdd2c4797099fa4f Mon Sep 17 00:00:00 2001 From: tjgreen42 <1738591+tjgreen42@users.noreply.github.com> Date: Wed, 1 Jul 2026 15:48:49 +0000 Subject: [PATCH] Reconcile df control-plane with the durable engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit df.start() writes its df rows in the caller's transaction but hands the workflow to the engine over a separate connection, so a rolled-back df.start() leaves the engine holding an inert record with no df row. Nothing reclaimed that engine state, and the engine records for removed terminal instances were never retired alongside their df rows. Turn the existing periodic pass that removes expired terminal instances into a reconciliation of the df control-plane and the engine. Each pass removes expired terminal instances (their df rows and engine records) and reclaims df-less orphaned engine records older than the retention window. Two Postmaster GUCs govern reconciliation: - pg_durable.retention_days (default 30) — how long terminal instances and their engine records are kept; also the age bound for reclaiming orphans. - pg_durable.reconcile_interval (default 3600s, 0 disables) — pass cadence. Covered by expired-instance and select_orphans unit tests and the 54_reconcile_orphans E2E test, which terminates its signal-wait survivor so no Running orchestration lingers in the shared data directory. The upgrade harness strips the reconcile phase's reconcile_interval/retention_days from postgresql.conf so neither leaks into the upgrade run. --- USER_GUIDE.md | 63 +++-- scripts/test-e2e-local.sh | 26 +- scripts/test-upgrade.sh | 10 +- src/lib.rs | 102 +++++-- src/types.rs | 11 + src/worker.rs | 373 +++++++++++++++++++------ tests/e2e/sql/54_reconcile_orphans.sql | 156 +++++++++++ 7 files changed, 613 insertions(+), 128 deletions(-) create mode 100644 tests/e2e/sql/54_reconcile_orphans.sql diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 4ce1b9b..be808c9 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1606,36 +1606,65 @@ SELECT started_at, last_seen_at, The background worker updates `last_seen_at` every ~5 seconds as part of its normal operation. -### Data Retention (Automatic Pruning) +### Automatic Reconciliation + +pg_durable tracks each workflow in its own `df` tables, while the durable engine +keeps workflow state in a separate schema. To keep those two stores consistent — +and to keep `df.instances`/`df.nodes` from growing without bound — the background +worker runs a **best-effort reconciliation pass** that does two things: + +- **Removes expired terminal instances.** Old **terminal** instances (status + `completed`, `failed`, or `cancelled`) and their `df.nodes` rows are deleted, + along with their engine records. Running and pending instances are **never** + removed, regardless of age. +- **Reclaims orphaned engine records.** `df.start()` writes the `df` rows in the + caller's transaction but hands the workflow to the engine over a separate + connection; if that transaction **rolls back**, the `df` rows vanish while the + engine keeps an inert record (it can never load its rolled-back graph, so it + ends up failed). Reconciliation deletes such df-less engine records once they + age past `retention_days`. Anything the engine is still tracking with a live + `df` row is left untouched. + +Two Postmaster-context GUCs govern it (set in `postgresql.conf`, restart to apply): -To keep `df.instances` and `df.nodes` from growing without bound, the background -worker runs a **best-effort pruning pass roughly once an hour** that deletes old -**terminal** instances (status `completed`, `failed`, or `cancelled`) and their -associated `df.nodes` rows. Running and pending instances are **never** pruned, -regardless of age. +```ini +# How often (seconds) a reconciliation pass runs. 0 disables reconciliation. +pg_durable.reconcile_interval = 3600 + +# Days a terminal instance is retained before reconciliation removes it (and its +# engine record); also the age bound for reclaiming orphaned engine records. +# 0 removes terminal instances as soon as the next pass runs. +pg_durable.retention_days = 30 +``` -The policy is currently fixed (no configuration GUC): +Retention combines that window with a fixed hard cap: - **Hard cap — at most 10,000 terminal instances are retained, regardless of age.** The newest 10,000 terminal instances are kept; any beyond that are - pruned even if they are only minutes old. -- **Retention window — 30 days.** Terminal instances older than 30 days are - pruned even if the table holds fewer than 10,000 of them. + removed even if they are only minutes old. (This cap is fixed, not a GUC.) +- **Retention window — `retention_days`.** Terminal instances older than the + window are removed even if the table holds fewer than 10,000 of them. Equivalently, a terminal instance is retained only while it is **both** among the -newest 10,000 terminal instances **and** less than 30 days old; otherwise it is -eligible for pruning. +newest 10,000 terminal instances **and** younger than `retention_days`; otherwise +it is eligible for removal. Notes: - "Age" is measured from `completed_at` when set (instances that reached `completed`), otherwise from `created_at`. `updated_at` is intentionally **not** used, because it is user-writable and would let a low-privilege user influence - pruning. -- Pruning runs in a single transaction with foreign-key constraints deferred: - matching `df.nodes` rows are deleted first, then the instance rows. -- Pruning is best-effort: if a pass fails it is logged and retried on the next - interval; it never stops workflow execution. + what is removed. +- A pass retires an instance's engine record **before** deleting its `df` rows, so + an interrupted pass can only leave the harmless direction — an engine record with + no `df` row (which the next pass reclaims) — never a `df` row without its engine + record. Because `df` and the engine are separate stores written by separate + transactions, they are therefore **eventually consistent**: read-only views like + `df.list_instances()` can briefly show a just-terminal row with empty + engine-derived columns until a later pass reconciles it. The artifact is + transient, self-healing, and never affects workflow execution or results. +- Reconciliation is best-effort: if a pass fails it is logged and retried on the + next interval; it never stops workflow execution. - If you need to retain terminal history beyond these limits (e.g. for auditing), copy the rows you care about into your own table before they age out. diff --git a/scripts/test-e2e-local.sh b/scripts/test-e2e-local.sh index 919ffa7..0fe28e8 100755 --- a/scripts/test-e2e-local.sh +++ b/scripts/test-e2e-local.sh @@ -59,6 +59,7 @@ DEFAULT_BUILD_PHASES=( "connlimit-backpressure" "connlimit-timeout" "connlimit-startup" + "reconcile" ) ALL_PHASES=( @@ -68,6 +69,7 @@ ALL_PHASES=( "connlimit-backpressure" "connlimit-timeout" "connlimit-startup" + "reconcile" "http-disabled" "http-allow-all" ) @@ -141,6 +143,9 @@ phase_label() { connlimit-startup) echo "connection limit startup validation" ;; + reconcile) + echo "reconcile orphans" + ;; http-disabled) echo "HTTP disabled (no http Cargo feature)" ;; @@ -170,6 +175,9 @@ phase_for_test() { 46_connection_limit_startup_validation) echo "connlimit-startup" ;; + 54_reconcile_orphans) + echo "reconcile" + ;; 47_http_dsl_disabled) echo "http-disabled" ;; @@ -301,7 +309,7 @@ set_conf_line() { } clear_connlimit_gucs() { - sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d' "$CONF_FILE" + sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d; /^[#[:space:]]*pg_durable\.reconcile_/d; /^[#[:space:]]*pg_durable\.retention_/d' "$CONF_FILE" } ensure_data_dir() { @@ -479,6 +487,17 @@ configure_phase() { set_conf_line "pg_durable.enable_superuser_instances" "on" set_conf_line "pg_durable.max_duroxide_connections" "1" ;; + reconcile) + set_conf_line "shared_preload_libraries" "'pg_durable'" + set_conf_line "pg_durable.worker_role" "'postgres'" + set_conf_line "pg_durable.database" "'postgres'" + set_conf_line "pg_durable.enable_superuser_instances" "on" + # Short reconcile cadence and zero retention so a pass acts within the + # test window instead of the conservative production defaults + # (retention_days=0 makes an aged-out orphan eligible at once). + set_conf_line "pg_durable.reconcile_interval" "2" + set_conf_line "pg_durable.retention_days" "0" + ;; http-disabled) set_conf_line "shared_preload_libraries" "'pg_durable'" set_conf_line "pg_durable.worker_role" "'postgres'" @@ -506,7 +525,7 @@ prepare_phase() { http-allow-all) build_extension_http_allow_all ;; - no-preload|standard|superuser-guc-off|connlimit-backpressure|connlimit-timeout|connlimit-startup) + no-preload|standard|superuser-guc-off|connlimit-backpressure|connlimit-timeout|connlimit-startup|reconcile) # Rebuild if previous phase changed the Cargo features if [ "$CURRENT_FEATURES" != "http-allow-test-domains" ]; then build_extension @@ -560,6 +579,9 @@ prepare_phase() { ;; connlimit-startup) ;; + reconcile) + wait_for_worker_ready + ;; http-disabled|http-allow-all) ensure_e2e_role wait_for_worker_ready diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 3e05e84..602fdc8 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -294,10 +294,12 @@ if [ -f "$DATA_DIR/postgresql.conf" ]; then sed -i.bak '/^#*port = /d' "$DATA_DIR/postgresql.conf" echo "port = $PG_PORT" >> "$DATA_DIR/postgresql.conf" fi - # Clear any connection-limit GUCs that E2E test phases (connlimit-*) may have - # left behind. Without this, max_duroxide_connections=1 causes the BGW to - # refuse to start, breaking the B1 wait-for-readiness check. - sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d' "$DATA_DIR/postgresql.conf" + # Clear any per-phase GUCs that E2E test phases may have left in the shared + # postgresql.conf. Without this, the connlimit-* phases' max_duroxide_connections=1 + # causes the BGW to refuse to start (breaking the B1 wait-for-readiness check), + # and the reconcile phase's aggressive reconcile_interval/retention_days would + # remove terminal instances mid-test and make df.result() flaky. + sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d; /^[#[:space:]]*pg_durable\.reconcile_/d; /^[#[:space:]]*pg_durable\.retention_/d' "$DATA_DIR/postgresql.conf" fi # If the server is already running, restart it so both the freshly installed diff --git a/src/lib.rs b/src/lib.rs index 482e7e1..6796bd0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,16 @@ pub static ENABLE_SUPERUSER_INSTANCES: GucSetting = GucSetting::::ne /// `docs/api-reference.md` and `USER_GUIDE.md`; update those mirrors if you change them. pub static LIST_INSTANCES_MAX_LIMIT: GucSetting = GucSetting::::new(1000); +/// Days a terminal ('completed'/'failed'/'cancelled') instance is retained +/// before reconciliation removes it and its engine record. The same age bound +/// governs when orphaned engine records left by a rolled-back `df.start()` are +/// reclaimed. `0` removes terminal instances as soon as the next pass runs. +pub static RETENTION_DAYS: GucSetting = GucSetting::::new(30); + +/// Seconds between background reconciliation passes. Each pass removes expired +/// terminal instances and reclaims orphaned engine records. `0` disables it. +pub static RECONCILE_INTERVAL: GucSetting = GucSetting::::new(3600); + // Module declarations pub mod activities; pub mod client; @@ -163,6 +173,28 @@ pub extern "C-unwind" fn _PG_init() { GucFlags::default(), ); + GucRegistry::define_int_guc( + c"pg_durable.retention_days", + c"Days a terminal instance is retained before reconciliation removes it", + c"Background reconciliation removes 'completed'/'failed'/'cancelled' df.instances rows (and their engine records) once they are older than this many days, subject to a fixed hard cap on the number retained. The same age bound governs when orphaned engine records left by a rolled-back df.start() are reclaimed. 0 removes terminal instances as soon as the next pass runs.", + &RETENTION_DAYS, + 0, + 36500, + GucContext::Postmaster, + GucFlags::default(), + ); + + GucRegistry::define_int_guc( + c"pg_durable.reconcile_interval", + c"Seconds between background reconciliation passes (0 disables reconciliation)", + c"Each background reconciliation pass removes expired terminal instances and reclaims orphaned engine records left by a rolled-back df.start(). Set to 0 to disable background reconciliation entirely.", + &RECONCILE_INTERVAL, + 0, + 86400, + GucContext::Postmaster, + GucFlags::default(), + ); + worker::register_background_worker(); } @@ -947,23 +979,23 @@ mod tests { ) } - async fn delete_prune_test_rows(pool: &sqlx::PgPool, id_list: &str) { - let mut tx = pool.begin().await.expect("begin cleanup transaction"); + async fn delete_expired_test_rows(pool: &sqlx::PgPool, id_list: &str) { + let mut tx = pool.begin().await.expect("begin teardown transaction"); sqlx::query("SET CONSTRAINTS ALL DEFERRED") .execute(&mut *tx) .await - .expect("defer cleanup constraints"); + .expect("defer teardown constraints"); sqlx::query(&format!( "DELETE FROM df.nodes WHERE instance_id IN ({id_list})" )) .execute(&mut *tx) .await - .expect("clean prune test nodes"); + .expect("clean test nodes"); sqlx::query(&format!("DELETE FROM df.instances WHERE id IN ({id_list})")) .execute(&mut *tx) .await - .expect("clean prune test instances"); - tx.commit().await.expect("commit cleanup"); + .expect("clean test instances"); + tx.commit().await.expect("commit teardown"); } // ======================================================================== @@ -1507,7 +1539,7 @@ mod tests { } #[pg_test] - fn test_prune_terminal_instances_respects_age_and_keep_count() { + fn test_expired_instance_removal_respects_age_and_keep_count() { let conn = test_database_connection_string(); let rt = tokio::runtime::Builder::new_current_thread() @@ -1531,7 +1563,7 @@ mod tests { .collect::>() .join(", "); - delete_prune_test_rows(&pool, &id_list).await; + delete_expired_test_rows(&pool, &id_list).await; let mut fixture_tx = pool.begin().await.expect("begin fixture transaction"); sqlx::query("SET CONSTRAINTS ALL DEFERRED") @@ -1544,10 +1576,10 @@ mod tests { -- by COALESCE(completed_at, created_at): -- aa261001 (1d, r1) -> keep (within cap, young) -- aa261002 (2d, r2) -> keep (within cap, young) - -- aa261003 (3d, r3) -> PRUNE by the hard cap even though it is + -- aa261003 (3d, r3) -> REMOVE by the hard cap even though it is -- only 3 days old (rank > max_keep) - -- aa261004 (40d, r4) -> PRUNE (beyond cap and past retention) - -- aa261005 (50d, r5) -> PRUNE (beyond cap and past retention) + -- aa261004 (40d, r4) -> REMOVE (beyond cap and past retention) + -- aa261005 (50d, r5) -> REMOVE (beyond cap and past retention) -- aa261006 (running) -> keep (non-terminal, never considered) -- aa261004/aa261005 are 'failed'/'cancelled' (NULL completed_at) with -- a forged far-future `updated_at`; they must still rank/age by their @@ -1556,9 +1588,9 @@ mod tests { VALUES ('aa261001', 'keep-recent-1', 'bb261001', 'completed', 1, true, false), ('aa261002', 'keep-recent-2', 'bb261002', 'completed', 2, true, false), - ('aa261003', 'prune-young-over-cap', 'bb261003', 'completed', 3, true, false), - ('aa261004', 'prune-old-failed-forged', 'bb261004', 'failed', 40, false, true), - ('aa261005', 'prune-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true), + ('aa261003', 'expire-young-over-cap', 'bb261003', 'completed', 3, true, false), + ('aa261004', 'expire-old-failed-forged', 'bb261004', 'failed', 40, false, true), + ('aa261005', 'expire-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true), ('aa261006', 'keep-running', 'bb261006', 'running', 90, false, false) ) INSERT INTO df.instances @@ -1582,7 +1614,7 @@ mod tests { ) .execute(&mut *fixture_tx) .await - .expect("insert prune instances"); + .expect("insert test instances"); sqlx::query( r#" @@ -1610,12 +1642,12 @@ mod tests { ) .execute(&mut *fixture_tx) .await - .expect("insert prune nodes"); + .expect("insert test nodes"); let stats = - crate::worker::prune_terminal_instances_transaction(&mut fixture_tx, 30, 2) + crate::worker::delete_expired_instances_transaction(&mut fixture_tx, 30, 2) .await - .expect("prune terminal instances"); + .expect("delete expired instances"); let remaining_instances: i64 = sqlx::query_scalar(&format!( "SELECT pg_catalog.count(*)::bigint FROM df.instances WHERE id IN ({id_list})" @@ -1644,19 +1676,37 @@ mod tests { fixture_tx .rollback() .await - .expect("rollback prune test fixture"); + .expect("rollback test fixture"); pool.close().await; stats }); - assert_eq!( - stats, - crate::worker::PruneStats { - instances_deleted: 3, - nodes_deleted: 3, - } - ); + assert_eq!(stats.instances_deleted, 3); + assert_eq!(stats.nodes_deleted, 3); + assert_eq!(stats.deleted_ids.len(), 3); + } + + #[pg_test] + fn test_select_orphans_excludes_present_and_sub_orchestrations() { + use std::collections::HashSet; + + // Failed engine instances reconciliation sees: a df-backed one, a genuine + // orphan (rolled-back df.start), and two sub-orchestrations the engine + // created internally for fan-out. + let failed = vec![ + "inst-backed".to_string(), + "inst-orphan".to_string(), + "sub::inst-backed::0".to_string(), + "inst-backed::sub::1".to_string(), + ]; + // Only inst-backed still has a df.instances row. + let present: HashSet = ["inst-backed".to_string()].into_iter().collect(); + + let orphans = crate::worker::select_orphans(failed, &present); + + // Only the df-less, non-sub instance is reclaimed. + assert_eq!(orphans, vec!["inst-orphan".to_string()]); } // ======================================================================== diff --git a/src/types.rs b/src/types.rs index b0ba264..4a6d78f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -57,6 +57,17 @@ pub fn get_execution_acquire_timeout() -> Duration { Duration::from_secs(crate::EXECUTION_ACQUIRE_TIMEOUT.get() as u64) } +/// Days a terminal instance is retained before reconciliation removes it and its +/// engine record; also the age bound for reclaiming orphaned engine records. +pub fn get_retention_days() -> i32 { + crate::RETENTION_DAYS.get() +} + +/// Interval between background reconciliation passes. Zero disables reconciliation. +pub fn get_reconcile_interval() -> Duration { + Duration::from_secs(crate::RECONCILE_INTERVAL.get() as u64) +} + /// Returns `true` when superuser-submitted instances are permitted. pub fn superuser_instances_enabled() -> bool { crate::ENABLE_SUPERUSER_INSTANCES.get() diff --git a/src/worker.rs b/src/worker.rs index 2bd3bf7..987a041 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -12,30 +12,32 @@ use std::sync::Arc; use std::time::Duration; use duroxide::runtime; +use duroxide::{Client, InstanceFilter}; use duroxide_pg::PostgresProvider; use tracing_subscriber::EnvFilter; use crate::registry::{create_activity_registry, create_orchestration_registry}; use crate::types::{ get_max_duroxide_connections, get_max_management_connections, get_max_user_connections, - postgres_connection_string, resolve_duroxide_schema_pool, worker_provider_config, + get_reconcile_interval, get_retention_days, postgres_connection_string, + resolve_duroxide_schema_pool, worker_provider_config, }; -const TERMINAL_INSTANCE_PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); // Retention policy for terminal ('completed'/'failed'/'cancelled') instances. -// A terminal instance is pruned when it is OUTSIDE the newest +// A terminal instance is removed when it is OUTSIDE the newest // TERMINAL_INSTANCE_MAX_KEEP rows (a hard cap enforced regardless of age) OR -// older than TERMINAL_INSTANCE_RETENTION_DAYS. Equivalently, an instance is -// retained only while it is BOTH among the newest TERMINAL_INSTANCE_MAX_KEEP -// terminal rows AND younger than the retention window. The number of retained -// terminal instances therefore never exceeds TERMINAL_INSTANCE_MAX_KEEP. -const TERMINAL_INSTANCE_RETENTION_DAYS: i32 = 30; +// older than pg_durable.retention_days. Equivalently, an instance is retained +// only while it is BOTH among the newest TERMINAL_INSTANCE_MAX_KEEP terminal rows +// AND younger than the retention window. The number of retained terminal +// instances therefore never exceeds TERMINAL_INSTANCE_MAX_KEEP. const TERMINAL_INSTANCE_MAX_KEEP: i64 = 10_000; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) struct PruneStats { +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ExpiredDeletion { pub instances_deleted: i64, pub nodes_deleted: i64, + /// Ids of the df.instances rows this deletion removed. + pub deleted_ids: Vec, } /// Initialize tracing subscriber for duroxide logs. @@ -211,7 +213,7 @@ async fn run_duroxide_runtime() { duroxide_schema ); - let Some(duroxide_runtime) = initialize_duroxide_runtime( + let Some((duroxide_runtime, duroxide_store)) = initialize_duroxide_runtime( &pg_conn_str, INIT_RETRY_INTERVAL, &mgmt_pool, @@ -247,6 +249,7 @@ async fn run_duroxide_runtime() { &poll_pool, &mgmt_pool, duroxide_runtime, + duroxide_store, EXTENSION_DROP_POLL_INTERVAL, SHUTDOWN_CHECK_INTERVAL, epoch_id.as_deref(), @@ -470,7 +473,7 @@ async fn initialize_duroxide_runtime( retry_interval: Duration, mgmt_pool: &sqlx::PgPool, schema_name: &str, -) -> Option> { +) -> Option<(Arc, Arc)> { log!("pg_durable: initializing duroxide runtime..."); // Control duroxide provider pool size via env var (the only mechanism @@ -553,11 +556,13 @@ async fn initialize_duroxide_runtime( create_activity_registry(Arc::new(mgmt_pool.clone()), user_semaphore.clone()); let orchestrations = create_orchestration_registry(); + let store_for_client = store.clone(); + let duroxide_runtime = runtime::Runtime::start_with_store(store, activities, orchestrations).await; log!("pg_durable: duroxide runtime started"); - return Some(duroxide_runtime); + return Some((duroxide_runtime, store_for_client)); } } @@ -645,37 +650,15 @@ async fn check_epoch_sentinel(pool: &sqlx::PgPool, epoch_id: &str) -> bool { matches!(result, Ok(Some(_))) } -async fn prune_terminal_instances(pool: &sqlx::PgPool) -> Result { - prune_terminal_instances_with_limits( - pool, - TERMINAL_INSTANCE_RETENTION_DAYS, - TERMINAL_INSTANCE_MAX_KEEP, - ) - .await -} - -pub(crate) async fn prune_terminal_instances_with_limits( - pool: &sqlx::PgPool, - retention_days: i32, - max_keep: i64, -) -> Result { - let mut tx = pool.begin().await?; - let stats = prune_terminal_instances_transaction(&mut tx, retention_days, max_keep).await?; - tx.commit().await?; - - Ok(stats) -} - -pub(crate) async fn prune_terminal_instances_transaction( +/// Expired terminal instances: those beyond the newest `max_keep` (a hard cap +/// enforced regardless of age) OR older than `retention_days`. Read-only — the +/// caller retires the engine records before deleting the df rows. +async fn select_expired_instance_ids_tx( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, retention_days: i32, max_keep: i64, -) -> Result { - sqlx::query("SET CONSTRAINTS ALL DEFERRED") - .execute(&mut **tx) - .await?; - - let (instances_deleted, nodes_deleted): (i64, i64) = sqlx::query_as( +) -> Result, sqlx::Error> { + let ids: Option> = sqlx::query_scalar( r#" WITH terminal_instances AS ( SELECT @@ -691,72 +674,147 @@ pub(crate) async fn prune_terminal_instances_transaction( -- is intentionally excluded: PUBLIC has column-level UPDATE on -- it, and for 'failed'/'cancelled' rows `completed_at` is NULL, -- so including `updated_at` would let a low-privilege user forge - -- the prune ordering/age. `completed_at` (set by the worker on + -- the removal ordering/age. `completed_at` (set by the worker on -- completion) and `created_at` (insert-time default, not -- grantable) are not user-writable. COALESCE(completed_at, created_at) AS terminal_at FROM df.instances WHERE status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed', 'cancelled']) ) ranked - ), - prune_candidates AS ( - SELECT id - FROM terminal_instances - -- Prune when the row is beyond the newest $1 terminal instances (a - -- hard cap enforced regardless of age) OR older than the retention - -- window ($2 days). Retained rows are thus always within the newest - -- $1 AND younger than the retention window, so the retained terminal - -- count never exceeds $1. - WHERE terminal_rank OPERATOR(pg_catalog.>) $1 - OR terminal_at OPERATOR(pg_catalog.<) - (pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int)) - ), - deleted_nodes AS ( + ) + -- Expired when the row is beyond the newest $1 terminal instances (a hard + -- cap enforced regardless of age) OR older than the retention window ($2 + -- days). Retained rows are thus always within the newest $1 AND younger than + -- the retention window, so the retained terminal count never exceeds $1. + SELECT pg_catalog.array_agg(id) + FROM terminal_instances + WHERE terminal_rank OPERATOR(pg_catalog.>) $1 + OR terminal_at OPERATOR(pg_catalog.<) + (pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int)) + "#, + ) + .bind(max_keep) + .bind(retention_days) + .fetch_one(&mut **tx) + .await?; + Ok(ids.unwrap_or_default()) +} + +/// Delete the df.nodes and df.instances rows for `ids` (nodes first, with the +/// circular same-instance FK deferred). Returns the counts deleted. +async fn delete_expired_instances_tx( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ids: &[String], +) -> Result { + if ids.is_empty() { + return Ok(ExpiredDeletion { + instances_deleted: 0, + nodes_deleted: 0, + deleted_ids: Vec::new(), + }); + } + sqlx::query("SET CONSTRAINTS ALL DEFERRED") + .execute(&mut **tx) + .await?; + + let (deleted_ids, nodes_deleted): (Option>, i64) = sqlx::query_as( + r#" + WITH deleted_nodes AS ( DELETE FROM df.nodes n - USING prune_candidates pc - WHERE n.instance_id OPERATOR(pg_catalog.=) pc.id + USING pg_catalog.unnest($1::text[]) AS t(id) + WHERE n.instance_id OPERATOR(pg_catalog.=) t.id RETURNING n.instance_id ), deleted_instances AS ( DELETE FROM df.instances i - USING prune_candidates pc - WHERE i.id OPERATOR(pg_catalog.=) pc.id + USING pg_catalog.unnest($1::text[]) AS t(id) + WHERE i.id OPERATOR(pg_catalog.=) t.id RETURNING i.id ) SELECT - (SELECT pg_catalog.count(*)::bigint FROM deleted_instances) AS instances_deleted, + (SELECT pg_catalog.array_agg(id) FROM deleted_instances) AS deleted_ids, (SELECT pg_catalog.count(*)::bigint FROM deleted_nodes) AS nodes_deleted "#, ) - .bind(max_keep) - .bind(retention_days) + .bind(ids) .fetch_one(&mut **tx) .await?; - Ok(PruneStats { - instances_deleted, + let deleted_ids = deleted_ids.unwrap_or_default(); + Ok(ExpiredDeletion { + instances_deleted: deleted_ids.len() as i64, nodes_deleted, + deleted_ids, }) } +/// Read the expired terminal instances. +async fn select_expired_instance_ids( + pool: &sqlx::PgPool, + retention_days: i32, + max_keep: i64, +) -> Result, sqlx::Error> { + let mut tx = pool.begin().await?; + let ids = select_expired_instance_ids_tx(&mut tx, retention_days, max_keep).await?; + tx.commit().await?; + Ok(ids) +} + +/// Delete the df rows for the given expired-instance ids. +async fn delete_expired_instances( + pool: &sqlx::PgPool, + ids: &[String], +) -> Result { + let mut tx = pool.begin().await?; + let stats = delete_expired_instances_tx(&mut tx, ids).await?; + tx.commit().await?; + Ok(stats) +} + +/// Select and delete the eligible terminal instances in a single transaction. +/// Used by tests; the background worker instead runs the two halves separately so +/// it can retire the engine records between them (see +/// `run_until_extension_dropped_or_shutdown`). +#[cfg(any(test, feature = "pg_test"))] +pub(crate) async fn delete_expired_instances_transaction( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + retention_days: i32, + max_keep: i64, +) -> Result { + let ids = select_expired_instance_ids_tx(tx, retention_days, max_keep).await?; + delete_expired_instances_tx(tx, &ids).await +} + async fn run_until_extension_dropped_or_shutdown( poll_pool: &sqlx::PgPool, maintenance_pool: &sqlx::PgPool, duroxide_runtime: Arc, + duroxide_store: Arc, drop_poll_interval: Duration, shutdown_check_interval: Duration, epoch_id: Option<&str>, ) { log!("pg_durable: processing durable functions..."); + let client = Client::new(duroxide_store.clone()); + let mut drop_check = tokio::time::interval(drop_poll_interval); drop_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - let mut prune_check = tokio::time::interval_at( - tokio::time::Instant::now() + TERMINAL_INSTANCE_PRUNE_INTERVAL, - TERMINAL_INSTANCE_PRUNE_INTERVAL, + // reconcile_interval=0 disables the sweep; the tick is then gated off, but + // interval_at still needs a positive period, so fall back to 1h. + let reconcile_interval = get_reconcile_interval(); + let reconcile_enabled = !reconcile_interval.is_zero(); + let effective_reconcile_interval = if reconcile_interval.is_zero() { + Duration::from_secs(60 * 60) + } else { + reconcile_interval + }; + let mut reconcile_check = tokio::time::interval_at( + tokio::time::Instant::now() + effective_reconcile_interval, + effective_reconcile_interval, ); - prune_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + reconcile_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { tokio::select! { @@ -777,19 +835,51 @@ async fn run_until_extension_dropped_or_shutdown( break; } } - _ = prune_check.tick() => { - match prune_terminal_instances(maintenance_pool).await { - Ok(stats) if stats.instances_deleted > 0 || stats.nodes_deleted > 0 => { - log!( - "pg_durable: pruned {} terminal instance(s) and {} node row(s)", - stats.instances_deleted, - stats.nodes_deleted - ); + _ = reconcile_check.tick(), if reconcile_enabled => { + let retention_days = get_retention_days(); + + // Engine-first: retire the engine record before the df row, so a + // failed pass only leaves the harmless direction — an engine record + // with no df row (invisible to df.list_instances, reclaimed below), + // never the reverse. The stores are thus eventually consistent. + match select_expired_instance_ids( + maintenance_pool, + retention_days, + TERMINAL_INSTANCE_MAX_KEEP, + ) + .await + { + Ok(candidates) if !candidates.is_empty() => { + // Only delete the df rows once the engine records are gone; + // if that fails, leave both in place and retry next pass. + if retire_engine_records(&client, &candidates).await { + match delete_expired_instances(maintenance_pool, &candidates).await { + Ok(stats) => { + if stats.instances_deleted > 0 || stats.nodes_deleted > 0 { + log!( + "pg_durable: removed {} expired instance(s) and {} node row(s)", + stats.instances_deleted, + stats.nodes_deleted + ); + } + } + Err(e) => { + log!("pg_durable: removing expired instances failed: {e}"); + } + } + } } Ok(_) => {} - Err(e) => { - log!("pg_durable: terminal instance pruning failed: {}", e); + Err(e) => log!("pg_durable: selecting expired instances failed: {e}"), + } + + let retention = Duration::from_secs(retention_days as u64 * 86_400); + match reclaim_orphaned_instances(maintenance_pool, &client, retention).await { + Ok(reclaimed) if reclaimed > 0 => { + log!("pg_durable: reclaimed {reclaimed} orphaned engine record(s)"); } + Ok(_) => {} + Err(e) => log!("pg_durable: reclaiming orphaned engine records failed: {e}"), } } } @@ -799,3 +889,128 @@ async fn run_until_extension_dropped_or_shutdown( duroxide_runtime.shutdown(Some(10_000)).await; log!("pg_durable: duroxide runtime shutdown complete"); } + +/// Maximum orphans one reconciliation pass reclaims, bounding a single tick's work. +const RECLAIM_BATCH: u32 = 1000; + +/// True for sub-orchestration instance ids, which the engine creates internally +/// (JOIN/RACE fan-out) and which legitimately have no df.instances row — +/// reconciliation must never delete them. +fn is_sub_orchestration(id: &str) -> bool { + id.starts_with("sub::") || id.contains("::sub::") +} + +/// Milliseconds-since-epoch cutoff for "older than `retention`". Used as the +/// engine's `completed_before` filter so an orphan younger than the retention +/// window is left alone. +fn retention_cutoff_ms(retention: Duration) -> u64 { + let since_epoch = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default(); + since_epoch.saturating_sub(retention).as_millis() as u64 +} + +/// From the engine's `Failed` instance ids and the set that still have a +/// df.instances row, select the orphans to reclaim: those with no df row and that +/// are not sub-orchestrations (the engine creates sub-orchestrations internally +/// for JOIN/RACE fan-out; they legitimately have no df row and must be kept). +pub(crate) fn select_orphans( + failed_ids: Vec, + present: &std::collections::HashSet, +) -> Vec { + failed_ids + .into_iter() + .filter(|id| !is_sub_orchestration(id) && !present.contains(id)) + .collect() +} + +/// Delete engine records the engine still holds but pg_durable no longer has a +/// df.instances row for, once they age past the `retention` window. Returns the +/// count reclaimed. +/// +/// Only `Failed` engine instances are considered: a rolled-back df.start() leaves +/// its engine instance `Failed` (it can never load its graph), and scanning only +/// `Failed` keeps this cheap — the (potentially large) `Completed` set is retired +/// by reconciliation ahead of removing its df rows instead (see the reconcile +/// branch). Sub-orchestrations are excluded, and the `completed_before` filter +/// enforces the retention window so an instance whose df commit is merely +/// in-flight or just-landed is never touched. +async fn reclaim_orphaned_instances( + pool: &sqlx::PgPool, + client: &Client, + retention: Duration, +) -> Result { + let candidates: Vec = client + .list_instances_by_status("Failed") + .await + .map_err(|e| format!("list failed instances: {e:?}"))?; + if candidates.is_empty() { + return Ok(0); + } + + // Keep only ids with no df.instances row (orphans). The worker pool bypasses + // RLS, so this sees every user's rows. + let present: std::collections::HashSet = + sqlx::query_scalar("SELECT id FROM df.instances WHERE id = ANY($1)") + .bind(&candidates) + .fetch_all(pool) + .await + .map_err(|e| format!("cross-check df.instances: {e}"))? + .into_iter() + .collect(); + + let mut orphans = select_orphans(candidates, &present); + if orphans.is_empty() { + return Ok(0); + } + orphans.truncate(RECLAIM_BATCH as usize); + + let result = client + .delete_instance_bulk(InstanceFilter { + instance_ids: Some(orphans), + completed_before: Some(retention_cutoff_ms(retention)), + limit: Some(RECLAIM_BATCH), + }) + .await + .map_err(|e| format!("delete_instance_bulk: {e:?}"))?; + Ok(result.instances_deleted) +} + +/// Best-effort deletion of duroxide engine records by id, ahead of deleting the +/// matching df rows. Returns `true` when it is safe to delete those df rows — +/// the engine delete succeeded, or there was nothing to delete. On failure it +/// returns `false` so the caller leaves the df rows in place and retries the +/// whole instance next pass rather than orphaning its engine record. +async fn retire_engine_records(client: &Client, ids: &[String]) -> bool { + let ids: Vec = ids + .iter() + .filter(|id| !is_sub_orchestration(id)) + .cloned() + .collect(); + if ids.is_empty() { + return true; + } + let limit = ids.len().min(u32::MAX as usize) as u32; + match client + .delete_instance_bulk(InstanceFilter { + instance_ids: Some(ids), + completed_before: None, + limit: Some(limit), + }) + .await + { + Ok(result) => { + if result.instances_deleted > 0 { + log!( + "pg_durable: retired {} engine record(s) ahead of removing the df rows", + result.instances_deleted + ); + } + true + } + Err(e) => { + log!("pg_durable: failed to retire engine records; deferring df removal: {e:?}"); + false + } + } +} diff --git a/tests/e2e/sql/54_reconcile_orphans.sql b/tests/e2e/sql/54_reconcile_orphans.sql new file mode 100644 index 0000000..b443669 --- /dev/null +++ b/tests/e2e/sql/54_reconcile_orphans.sql @@ -0,0 +1,156 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Reconciliation reclaims orphaned engine records: those the engine still holds +-- but pg_durable no longer has a df.instances row for. +-- +-- A rolled-back df.start() is the canonical case: df.start() hands the workflow +-- to the engine over a separate connection (which commits), then the caller's +-- transaction rolls back — so the df rows vanish but the engine keeps an inert +-- instance that can never load its (rolled-back) graph and ends up Failed. +-- Reconciliation reclaims such df-less engine records once they age past the +-- retention window, and must leave df-backed instances alone. +-- +-- Runs in the "reconcile" phase with pg_durable.reconcile_interval = 2 and +-- pg_durable.retention_days = 0 so a pass acts within the test window instead of +-- the conservative production defaults. Base connection is the superuser +-- (postgres); enable_superuser_instances is on. + +-- Helper: does the engine still know this instance? +CREATE OR REPLACE FUNCTION pg_temp.duroxide_has(p_id TEXT) +RETURNS boolean LANGUAGE plpgsql AS $$ +DECLARE + sch TEXT := df.duroxide_schema(); + n INT; +BEGIN + EXECUTE format('SELECT pg_catalog.count(*) FROM %I.get_instance_info($1)', sch) + INTO n USING p_id; + RETURN n > 0; +END $$; + +-- =========================================================================== +-- Scenario 1: a rolled-back df.start() leaves a df-less engine record that +-- reconciliation reclaims. +-- =========================================================================== + +-- Capture the id via \gset (a client variable survives the rollback), then stash +-- it in a temp table *after* the rollback so the DO block can read it (\gset +-- variables do not interpolate inside DO blocks). +BEGIN; +SELECT df.start('SELECT 1', 'orphan-probe') AS gid \gset +ROLLBACK; + +DROP TABLE IF EXISTS _orphan; +CREATE TEMP TABLE _orphan (id TEXT); +INSERT INTO _orphan VALUES (:'gid'); + +DO $$ +DECLARE + gid TEXT; + appeared BOOLEAN := FALSE; + gone BOOLEAN := FALSE; + attempts INT := 0; +BEGIN + SELECT id INTO gid FROM _orphan; + + -- The engine record materializes once the worker fails to load the + -- rolled-back graph (~5s). Confirm it exists so the reclaim assertion is real. + LOOP + IF pg_temp.duroxide_has(gid) THEN appeared := TRUE; EXIT; END IF; + EXIT WHEN attempts > 300; -- 30s + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF NOT appeared THEN + RAISE EXCEPTION 'TEST FAILED [orphan_reclaimed]: orphan % never materialized in the engine (cannot test reclaiming)', gid; + END IF; + + -- Reconciliation (reconcile_interval=2s, retention_days=0) must then reclaim it. + attempts := 0; + LOOP + IF NOT pg_temp.duroxide_has(gid) THEN gone := TRUE; EXIT; END IF; + EXIT WHEN attempts > 300; -- 30s + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF NOT gone THEN + RAISE EXCEPTION 'TEST FAILED [orphan_reclaimed]: reconciliation did not reclaim the df-less engine record %', gid; + END IF; + + RAISE NOTICE 'PASSED [orphan_reclaimed]: reconciliation reclaimed the df-less (orphan) engine record'; +END $$; + +DROP TABLE _orphan; + +-- =========================================================================== +-- Scenario 2: a live, df-backed instance is left alone (reconciliation is not so +-- aggressive that it touches instances pg_durable still tracks). A signal wait +-- keeps it Running — hence neither terminal (removal skips it) nor Failed +-- (reclamation skips it) — so it stays a live, df-backed instance across passes. +-- =========================================================================== + +DROP TABLE IF EXISTS _keep; +CREATE TEMP TABLE _keep (id TEXT); +INSERT INTO _keep SELECT df.start(df.wait_for_signal('keep-go'), 'keep'); + +DO $$ +DECLARE + kid TEXT; + status TEXT; + attempts INT := 0; +BEGIN + SELECT id INTO kid FROM _keep; + + -- Wait until the engine is actually running it, so duroxide_has is meaningful. + LOOP + SELECT s INTO status FROM df.status(kid) s; + EXIT WHEN lower(COALESCE(status, '')) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(COALESCE(status, 'pending')) <> 'running' THEN + RAISE EXCEPTION 'TEST FAILED [live_kept]: instance % did not reach running (status=%)', kid, status; + END IF; + + -- Give reconciliation several passes; a live df-backed instance must survive. + PERFORM pg_sleep(6); + IF NOT pg_temp.duroxide_has(kid) THEN + RAISE EXCEPTION 'TEST FAILED [live_kept]: reconciliation reclaimed a df-backed engine record % (too aggressive)', kid; + END IF; + IF NOT EXISTS (SELECT 1 FROM df.instances WHERE id OPERATOR(pg_catalog.=) kid) THEN + RAISE EXCEPTION 'TEST FAILED [live_kept]: reconciliation removed the live df row for %', kid; + END IF; + + RAISE NOTICE 'PASSED [live_kept]: reconciliation left the live df-backed instance intact'; +END $$; + +-- Release the survivor and wait until it leaves the Running state, so no live +-- engine orchestration lingers past this phase. A Running orchestration left in +-- the shared data directory would be resumed by the next server started against +-- it (e.g. the upgrade tests) and run activities against that server's schema. +SELECT df.signal((SELECT id FROM _keep), 'keep-go'); + +DO $$ +DECLARE + kid TEXT; + status TEXT; + attempts INT := 0; +BEGIN + SELECT id INTO kid FROM _keep; + LOOP + SELECT s INTO status FROM df.status(kid) s; + -- Terminal, or already removed by the retention_days=0 pass. + EXIT WHEN status IS NULL + OR lower(status) IN ('completed', 'failed', 'cancelled') + OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF status IS NOT NULL AND lower(status) = 'running' THEN + RAISE EXCEPTION 'TEST FAILED [live_kept]: survivor % never left running after signal', kid; + END IF; +END $$; + +DROP TABLE _keep; + +SELECT 'TEST PASSED: reconcile orphans' AS result;