diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 4ce1b9b..be808c9 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1606,36 +1606,65 @@ SELECT started_at, last_seen_at, The background worker updates `last_seen_at` every ~5 seconds as part of its normal operation. -### Data Retention (Automatic Pruning) +### Automatic Reconciliation + +pg_durable tracks each workflow in its own `df` tables, while the durable engine +keeps workflow state in a separate schema. To keep those two stores consistent — +and to keep `df.instances`/`df.nodes` from growing without bound — the background +worker runs a **best-effort reconciliation pass** that does two things: + +- **Removes expired terminal instances.** Old **terminal** instances (status + `completed`, `failed`, or `cancelled`) and their `df.nodes` rows are deleted, + along with their engine records. Running and pending instances are **never** + removed, regardless of age. +- **Reclaims orphaned engine records.** `df.start()` writes the `df` rows in the + caller's transaction but hands the workflow to the engine over a separate + connection; if that transaction **rolls back**, the `df` rows vanish while the + engine keeps an inert record (it can never load its rolled-back graph, so it + ends up failed). Reconciliation deletes such df-less engine records once they + age past `retention_days`. Anything the engine is still tracking with a live + `df` row is left untouched. + +Two Postmaster-context GUCs govern it (set in `postgresql.conf`, restart to apply): -To keep `df.instances` and `df.nodes` from growing without bound, the background -worker runs a **best-effort pruning pass roughly once an hour** that deletes old -**terminal** instances (status `completed`, `failed`, or `cancelled`) and their -associated `df.nodes` rows. Running and pending instances are **never** pruned, -regardless of age. +```ini +# How often (seconds) a reconciliation pass runs. 0 disables reconciliation. +pg_durable.reconcile_interval = 3600 + +# Days a terminal instance is retained before reconciliation removes it (and its +# engine record); also the age bound for reclaiming orphaned engine records. +# 0 removes terminal instances as soon as the next pass runs. +pg_durable.retention_days = 30 +``` -The policy is currently fixed (no configuration GUC): +Retention combines that window with a fixed hard cap: - **Hard cap — at most 10,000 terminal instances are retained, regardless of age.** The newest 10,000 terminal instances are kept; any beyond that are - pruned even if they are only minutes old. -- **Retention window — 30 days.** Terminal instances older than 30 days are - pruned even if the table holds fewer than 10,000 of them. + removed even if they are only minutes old. (This cap is fixed, not a GUC.) +- **Retention window — `retention_days`.** Terminal instances older than the + window are removed even if the table holds fewer than 10,000 of them. Equivalently, a terminal instance is retained only while it is **both** among the -newest 10,000 terminal instances **and** less than 30 days old; otherwise it is -eligible for pruning. +newest 10,000 terminal instances **and** younger than `retention_days`; otherwise +it is eligible for removal. Notes: - "Age" is measured from `completed_at` when set (instances that reached `completed`), otherwise from `created_at`. `updated_at` is intentionally **not** used, because it is user-writable and would let a low-privilege user influence - pruning. -- Pruning runs in a single transaction with foreign-key constraints deferred: - matching `df.nodes` rows are deleted first, then the instance rows. -- Pruning is best-effort: if a pass fails it is logged and retried on the next - interval; it never stops workflow execution. + what is removed. +- A pass retires an instance's engine record **before** deleting its `df` rows, so + an interrupted pass can only leave the harmless direction — an engine record with + no `df` row (which the next pass reclaims) — never a `df` row without its engine + record. Because `df` and the engine are separate stores written by separate + transactions, they are therefore **eventually consistent**: read-only views like + `df.list_instances()` can briefly show a just-terminal row with empty + engine-derived columns until a later pass reconciles it. The artifact is + transient, self-healing, and never affects workflow execution or results. +- Reconciliation is best-effort: if a pass fails it is logged and retried on the + next interval; it never stops workflow execution. - If you need to retain terminal history beyond these limits (e.g. for auditing), copy the rows you care about into your own table before they age out. diff --git a/scripts/test-e2e-local.sh b/scripts/test-e2e-local.sh index 919ffa7..0fe28e8 100755 --- a/scripts/test-e2e-local.sh +++ b/scripts/test-e2e-local.sh @@ -59,6 +59,7 @@ DEFAULT_BUILD_PHASES=( "connlimit-backpressure" "connlimit-timeout" "connlimit-startup" + "reconcile" ) ALL_PHASES=( @@ -68,6 +69,7 @@ ALL_PHASES=( "connlimit-backpressure" "connlimit-timeout" "connlimit-startup" + "reconcile" "http-disabled" "http-allow-all" ) @@ -141,6 +143,9 @@ phase_label() { connlimit-startup) echo "connection limit startup validation" ;; + reconcile) + echo "reconcile orphans" + ;; http-disabled) echo "HTTP disabled (no http Cargo feature)" ;; @@ -170,6 +175,9 @@ phase_for_test() { 46_connection_limit_startup_validation) echo "connlimit-startup" ;; + 54_reconcile_orphans) + echo "reconcile" + ;; 47_http_dsl_disabled) echo "http-disabled" ;; @@ -301,7 +309,7 @@ set_conf_line() { } clear_connlimit_gucs() { - sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d' "$CONF_FILE" + sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d; /^[#[:space:]]*pg_durable\.reconcile_/d; /^[#[:space:]]*pg_durable\.retention_/d' "$CONF_FILE" } ensure_data_dir() { @@ -479,6 +487,17 @@ configure_phase() { set_conf_line "pg_durable.enable_superuser_instances" "on" set_conf_line "pg_durable.max_duroxide_connections" "1" ;; + reconcile) + set_conf_line "shared_preload_libraries" "'pg_durable'" + set_conf_line "pg_durable.worker_role" "'postgres'" + set_conf_line "pg_durable.database" "'postgres'" + set_conf_line "pg_durable.enable_superuser_instances" "on" + # Short reconcile cadence and zero retention so a pass acts within the + # test window instead of the conservative production defaults + # (retention_days=0 makes an aged-out orphan eligible at once). + set_conf_line "pg_durable.reconcile_interval" "2" + set_conf_line "pg_durable.retention_days" "0" + ;; http-disabled) set_conf_line "shared_preload_libraries" "'pg_durable'" set_conf_line "pg_durable.worker_role" "'postgres'" @@ -506,7 +525,7 @@ prepare_phase() { http-allow-all) build_extension_http_allow_all ;; - no-preload|standard|superuser-guc-off|connlimit-backpressure|connlimit-timeout|connlimit-startup) + no-preload|standard|superuser-guc-off|connlimit-backpressure|connlimit-timeout|connlimit-startup|reconcile) # Rebuild if previous phase changed the Cargo features if [ "$CURRENT_FEATURES" != "http-allow-test-domains" ]; then build_extension @@ -560,6 +579,9 @@ prepare_phase() { ;; connlimit-startup) ;; + reconcile) + wait_for_worker_ready + ;; http-disabled|http-allow-all) ensure_e2e_role wait_for_worker_ready diff --git a/scripts/test-upgrade.sh b/scripts/test-upgrade.sh index 3e05e84..602fdc8 100755 --- a/scripts/test-upgrade.sh +++ b/scripts/test-upgrade.sh @@ -294,10 +294,12 @@ if [ -f "$DATA_DIR/postgresql.conf" ]; then sed -i.bak '/^#*port = /d' "$DATA_DIR/postgresql.conf" echo "port = $PG_PORT" >> "$DATA_DIR/postgresql.conf" fi - # Clear any connection-limit GUCs that E2E test phases (connlimit-*) may have - # left behind. Without this, max_duroxide_connections=1 causes the BGW to - # refuse to start, breaking the B1 wait-for-readiness check. - sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d' "$DATA_DIR/postgresql.conf" + # Clear any per-phase GUCs that E2E test phases may have left in the shared + # postgresql.conf. Without this, the connlimit-* phases' max_duroxide_connections=1 + # causes the BGW to refuse to start (breaking the B1 wait-for-readiness check), + # and the reconcile phase's aggressive reconcile_interval/retention_days would + # remove terminal instances mid-test and make df.result() flaky. + sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d; /^[#[:space:]]*pg_durable\.reconcile_/d; /^[#[:space:]]*pg_durable\.retention_/d' "$DATA_DIR/postgresql.conf" fi # If the server is already running, restart it so both the freshly installed diff --git a/src/lib.rs b/src/lib.rs index 482e7e1..6796bd0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,16 @@ pub static ENABLE_SUPERUSER_INSTANCES: GucSetting = GucSetting::::ne /// `docs/api-reference.md` and `USER_GUIDE.md`; update those mirrors if you change them. pub static LIST_INSTANCES_MAX_LIMIT: GucSetting = GucSetting::::new(1000); +/// Days a terminal ('completed'/'failed'/'cancelled') instance is retained +/// before reconciliation removes it and its engine record. The same age bound +/// governs when orphaned engine records left by a rolled-back `df.start()` are +/// reclaimed. `0` removes terminal instances as soon as the next pass runs. +pub static RETENTION_DAYS: GucSetting = GucSetting::::new(30); + +/// Seconds between background reconciliation passes. Each pass removes expired +/// terminal instances and reclaims orphaned engine records. `0` disables it. +pub static RECONCILE_INTERVAL: GucSetting = GucSetting::::new(3600); + // Module declarations pub mod activities; pub mod client; @@ -163,6 +173,28 @@ pub extern "C-unwind" fn _PG_init() { GucFlags::default(), ); + GucRegistry::define_int_guc( + c"pg_durable.retention_days", + c"Days a terminal instance is retained before reconciliation removes it", + c"Background reconciliation removes 'completed'/'failed'/'cancelled' df.instances rows (and their engine records) once they are older than this many days, subject to a fixed hard cap on the number retained. The same age bound governs when orphaned engine records left by a rolled-back df.start() are reclaimed. 0 removes terminal instances as soon as the next pass runs.", + &RETENTION_DAYS, + 0, + 36500, + GucContext::Postmaster, + GucFlags::default(), + ); + + GucRegistry::define_int_guc( + c"pg_durable.reconcile_interval", + c"Seconds between background reconciliation passes (0 disables reconciliation)", + c"Each background reconciliation pass removes expired terminal instances and reclaims orphaned engine records left by a rolled-back df.start(). Set to 0 to disable background reconciliation entirely.", + &RECONCILE_INTERVAL, + 0, + 86400, + GucContext::Postmaster, + GucFlags::default(), + ); + worker::register_background_worker(); } @@ -947,23 +979,23 @@ mod tests { ) } - async fn delete_prune_test_rows(pool: &sqlx::PgPool, id_list: &str) { - let mut tx = pool.begin().await.expect("begin cleanup transaction"); + async fn delete_expired_test_rows(pool: &sqlx::PgPool, id_list: &str) { + let mut tx = pool.begin().await.expect("begin teardown transaction"); sqlx::query("SET CONSTRAINTS ALL DEFERRED") .execute(&mut *tx) .await - .expect("defer cleanup constraints"); + .expect("defer teardown constraints"); sqlx::query(&format!( "DELETE FROM df.nodes WHERE instance_id IN ({id_list})" )) .execute(&mut *tx) .await - .expect("clean prune test nodes"); + .expect("clean test nodes"); sqlx::query(&format!("DELETE FROM df.instances WHERE id IN ({id_list})")) .execute(&mut *tx) .await - .expect("clean prune test instances"); - tx.commit().await.expect("commit cleanup"); + .expect("clean test instances"); + tx.commit().await.expect("commit teardown"); } // ======================================================================== @@ -1507,7 +1539,7 @@ mod tests { } #[pg_test] - fn test_prune_terminal_instances_respects_age_and_keep_count() { + fn test_expired_instance_removal_respects_age_and_keep_count() { let conn = test_database_connection_string(); let rt = tokio::runtime::Builder::new_current_thread() @@ -1531,7 +1563,7 @@ mod tests { .collect::>() .join(", "); - delete_prune_test_rows(&pool, &id_list).await; + delete_expired_test_rows(&pool, &id_list).await; let mut fixture_tx = pool.begin().await.expect("begin fixture transaction"); sqlx::query("SET CONSTRAINTS ALL DEFERRED") @@ -1544,10 +1576,10 @@ mod tests { -- by COALESCE(completed_at, created_at): -- aa261001 (1d, r1) -> keep (within cap, young) -- aa261002 (2d, r2) -> keep (within cap, young) - -- aa261003 (3d, r3) -> PRUNE by the hard cap even though it is + -- aa261003 (3d, r3) -> REMOVE by the hard cap even though it is -- only 3 days old (rank > max_keep) - -- aa261004 (40d, r4) -> PRUNE (beyond cap and past retention) - -- aa261005 (50d, r5) -> PRUNE (beyond cap and past retention) + -- aa261004 (40d, r4) -> REMOVE (beyond cap and past retention) + -- aa261005 (50d, r5) -> REMOVE (beyond cap and past retention) -- aa261006 (running) -> keep (non-terminal, never considered) -- aa261004/aa261005 are 'failed'/'cancelled' (NULL completed_at) with -- a forged far-future `updated_at`; they must still rank/age by their @@ -1556,9 +1588,9 @@ mod tests { VALUES ('aa261001', 'keep-recent-1', 'bb261001', 'completed', 1, true, false), ('aa261002', 'keep-recent-2', 'bb261002', 'completed', 2, true, false), - ('aa261003', 'prune-young-over-cap', 'bb261003', 'completed', 3, true, false), - ('aa261004', 'prune-old-failed-forged', 'bb261004', 'failed', 40, false, true), - ('aa261005', 'prune-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true), + ('aa261003', 'expire-young-over-cap', 'bb261003', 'completed', 3, true, false), + ('aa261004', 'expire-old-failed-forged', 'bb261004', 'failed', 40, false, true), + ('aa261005', 'expire-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true), ('aa261006', 'keep-running', 'bb261006', 'running', 90, false, false) ) INSERT INTO df.instances @@ -1582,7 +1614,7 @@ mod tests { ) .execute(&mut *fixture_tx) .await - .expect("insert prune instances"); + .expect("insert test instances"); sqlx::query( r#" @@ -1610,12 +1642,12 @@ mod tests { ) .execute(&mut *fixture_tx) .await - .expect("insert prune nodes"); + .expect("insert test nodes"); let stats = - crate::worker::prune_terminal_instances_transaction(&mut fixture_tx, 30, 2) + crate::worker::delete_expired_instances_transaction(&mut fixture_tx, 30, 2) .await - .expect("prune terminal instances"); + .expect("delete expired instances"); let remaining_instances: i64 = sqlx::query_scalar(&format!( "SELECT pg_catalog.count(*)::bigint FROM df.instances WHERE id IN ({id_list})" @@ -1644,19 +1676,37 @@ mod tests { fixture_tx .rollback() .await - .expect("rollback prune test fixture"); + .expect("rollback test fixture"); pool.close().await; stats }); - assert_eq!( - stats, - crate::worker::PruneStats { - instances_deleted: 3, - nodes_deleted: 3, - } - ); + assert_eq!(stats.instances_deleted, 3); + assert_eq!(stats.nodes_deleted, 3); + assert_eq!(stats.deleted_ids.len(), 3); + } + + #[pg_test] + fn test_select_orphans_excludes_present_and_sub_orchestrations() { + use std::collections::HashSet; + + // Failed engine instances reconciliation sees: a df-backed one, a genuine + // orphan (rolled-back df.start), and two sub-orchestrations the engine + // created internally for fan-out. + let failed = vec![ + "inst-backed".to_string(), + "inst-orphan".to_string(), + "sub::inst-backed::0".to_string(), + "inst-backed::sub::1".to_string(), + ]; + // Only inst-backed still has a df.instances row. + let present: HashSet = ["inst-backed".to_string()].into_iter().collect(); + + let orphans = crate::worker::select_orphans(failed, &present); + + // Only the df-less, non-sub instance is reclaimed. + assert_eq!(orphans, vec!["inst-orphan".to_string()]); } // ======================================================================== diff --git a/src/types.rs b/src/types.rs index b0ba264..4a6d78f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -57,6 +57,17 @@ pub fn get_execution_acquire_timeout() -> Duration { Duration::from_secs(crate::EXECUTION_ACQUIRE_TIMEOUT.get() as u64) } +/// Days a terminal instance is retained before reconciliation removes it and its +/// engine record; also the age bound for reclaiming orphaned engine records. +pub fn get_retention_days() -> i32 { + crate::RETENTION_DAYS.get() +} + +/// Interval between background reconciliation passes. Zero disables reconciliation. +pub fn get_reconcile_interval() -> Duration { + Duration::from_secs(crate::RECONCILE_INTERVAL.get() as u64) +} + /// Returns `true` when superuser-submitted instances are permitted. pub fn superuser_instances_enabled() -> bool { crate::ENABLE_SUPERUSER_INSTANCES.get() diff --git a/src/worker.rs b/src/worker.rs index 2bd3bf7..987a041 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -12,30 +12,32 @@ use std::sync::Arc; use std::time::Duration; use duroxide::runtime; +use duroxide::{Client, InstanceFilter}; use duroxide_pg::PostgresProvider; use tracing_subscriber::EnvFilter; use crate::registry::{create_activity_registry, create_orchestration_registry}; use crate::types::{ get_max_duroxide_connections, get_max_management_connections, get_max_user_connections, - postgres_connection_string, resolve_duroxide_schema_pool, worker_provider_config, + get_reconcile_interval, get_retention_days, postgres_connection_string, + resolve_duroxide_schema_pool, worker_provider_config, }; -const TERMINAL_INSTANCE_PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); // Retention policy for terminal ('completed'/'failed'/'cancelled') instances. -// A terminal instance is pruned when it is OUTSIDE the newest +// A terminal instance is removed when it is OUTSIDE the newest // TERMINAL_INSTANCE_MAX_KEEP rows (a hard cap enforced regardless of age) OR -// older than TERMINAL_INSTANCE_RETENTION_DAYS. Equivalently, an instance is -// retained only while it is BOTH among the newest TERMINAL_INSTANCE_MAX_KEEP -// terminal rows AND younger than the retention window. The number of retained -// terminal instances therefore never exceeds TERMINAL_INSTANCE_MAX_KEEP. -const TERMINAL_INSTANCE_RETENTION_DAYS: i32 = 30; +// older than pg_durable.retention_days. Equivalently, an instance is retained +// only while it is BOTH among the newest TERMINAL_INSTANCE_MAX_KEEP terminal rows +// AND younger than the retention window. The number of retained terminal +// instances therefore never exceeds TERMINAL_INSTANCE_MAX_KEEP. const TERMINAL_INSTANCE_MAX_KEEP: i64 = 10_000; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub(crate) struct PruneStats { +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ExpiredDeletion { pub instances_deleted: i64, pub nodes_deleted: i64, + /// Ids of the df.instances rows this deletion removed. + pub deleted_ids: Vec, } /// Initialize tracing subscriber for duroxide logs. @@ -211,7 +213,7 @@ async fn run_duroxide_runtime() { duroxide_schema ); - let Some(duroxide_runtime) = initialize_duroxide_runtime( + let Some((duroxide_runtime, duroxide_store)) = initialize_duroxide_runtime( &pg_conn_str, INIT_RETRY_INTERVAL, &mgmt_pool, @@ -247,6 +249,7 @@ async fn run_duroxide_runtime() { &poll_pool, &mgmt_pool, duroxide_runtime, + duroxide_store, EXTENSION_DROP_POLL_INTERVAL, SHUTDOWN_CHECK_INTERVAL, epoch_id.as_deref(), @@ -470,7 +473,7 @@ async fn initialize_duroxide_runtime( retry_interval: Duration, mgmt_pool: &sqlx::PgPool, schema_name: &str, -) -> Option> { +) -> Option<(Arc, Arc)> { log!("pg_durable: initializing duroxide runtime..."); // Control duroxide provider pool size via env var (the only mechanism @@ -553,11 +556,13 @@ async fn initialize_duroxide_runtime( create_activity_registry(Arc::new(mgmt_pool.clone()), user_semaphore.clone()); let orchestrations = create_orchestration_registry(); + let store_for_client = store.clone(); + let duroxide_runtime = runtime::Runtime::start_with_store(store, activities, orchestrations).await; log!("pg_durable: duroxide runtime started"); - return Some(duroxide_runtime); + return Some((duroxide_runtime, store_for_client)); } } @@ -645,37 +650,15 @@ async fn check_epoch_sentinel(pool: &sqlx::PgPool, epoch_id: &str) -> bool { matches!(result, Ok(Some(_))) } -async fn prune_terminal_instances(pool: &sqlx::PgPool) -> Result { - prune_terminal_instances_with_limits( - pool, - TERMINAL_INSTANCE_RETENTION_DAYS, - TERMINAL_INSTANCE_MAX_KEEP, - ) - .await -} - -pub(crate) async fn prune_terminal_instances_with_limits( - pool: &sqlx::PgPool, - retention_days: i32, - max_keep: i64, -) -> Result { - let mut tx = pool.begin().await?; - let stats = prune_terminal_instances_transaction(&mut tx, retention_days, max_keep).await?; - tx.commit().await?; - - Ok(stats) -} - -pub(crate) async fn prune_terminal_instances_transaction( +/// Expired terminal instances: those beyond the newest `max_keep` (a hard cap +/// enforced regardless of age) OR older than `retention_days`. Read-only — the +/// caller retires the engine records before deleting the df rows. +async fn select_expired_instance_ids_tx( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, retention_days: i32, max_keep: i64, -) -> Result { - sqlx::query("SET CONSTRAINTS ALL DEFERRED") - .execute(&mut **tx) - .await?; - - let (instances_deleted, nodes_deleted): (i64, i64) = sqlx::query_as( +) -> Result, sqlx::Error> { + let ids: Option> = sqlx::query_scalar( r#" WITH terminal_instances AS ( SELECT @@ -691,72 +674,147 @@ pub(crate) async fn prune_terminal_instances_transaction( -- is intentionally excluded: PUBLIC has column-level UPDATE on -- it, and for 'failed'/'cancelled' rows `completed_at` is NULL, -- so including `updated_at` would let a low-privilege user forge - -- the prune ordering/age. `completed_at` (set by the worker on + -- the removal ordering/age. `completed_at` (set by the worker on -- completion) and `created_at` (insert-time default, not -- grantable) are not user-writable. COALESCE(completed_at, created_at) AS terminal_at FROM df.instances WHERE status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed', 'cancelled']) ) ranked - ), - prune_candidates AS ( - SELECT id - FROM terminal_instances - -- Prune when the row is beyond the newest $1 terminal instances (a - -- hard cap enforced regardless of age) OR older than the retention - -- window ($2 days). Retained rows are thus always within the newest - -- $1 AND younger than the retention window, so the retained terminal - -- count never exceeds $1. - WHERE terminal_rank OPERATOR(pg_catalog.>) $1 - OR terminal_at OPERATOR(pg_catalog.<) - (pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int)) - ), - deleted_nodes AS ( + ) + -- Expired when the row is beyond the newest $1 terminal instances (a hard + -- cap enforced regardless of age) OR older than the retention window ($2 + -- days). Retained rows are thus always within the newest $1 AND younger than + -- the retention window, so the retained terminal count never exceeds $1. + SELECT pg_catalog.array_agg(id) + FROM terminal_instances + WHERE terminal_rank OPERATOR(pg_catalog.>) $1 + OR terminal_at OPERATOR(pg_catalog.<) + (pg_catalog.now() OPERATOR(pg_catalog.-) pg_catalog.make_interval(days => $2::int)) + "#, + ) + .bind(max_keep) + .bind(retention_days) + .fetch_one(&mut **tx) + .await?; + Ok(ids.unwrap_or_default()) +} + +/// Delete the df.nodes and df.instances rows for `ids` (nodes first, with the +/// circular same-instance FK deferred). Returns the counts deleted. +async fn delete_expired_instances_tx( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + ids: &[String], +) -> Result { + if ids.is_empty() { + return Ok(ExpiredDeletion { + instances_deleted: 0, + nodes_deleted: 0, + deleted_ids: Vec::new(), + }); + } + sqlx::query("SET CONSTRAINTS ALL DEFERRED") + .execute(&mut **tx) + .await?; + + let (deleted_ids, nodes_deleted): (Option>, i64) = sqlx::query_as( + r#" + WITH deleted_nodes AS ( DELETE FROM df.nodes n - USING prune_candidates pc - WHERE n.instance_id OPERATOR(pg_catalog.=) pc.id + USING pg_catalog.unnest($1::text[]) AS t(id) + WHERE n.instance_id OPERATOR(pg_catalog.=) t.id RETURNING n.instance_id ), deleted_instances AS ( DELETE FROM df.instances i - USING prune_candidates pc - WHERE i.id OPERATOR(pg_catalog.=) pc.id + USING pg_catalog.unnest($1::text[]) AS t(id) + WHERE i.id OPERATOR(pg_catalog.=) t.id RETURNING i.id ) SELECT - (SELECT pg_catalog.count(*)::bigint FROM deleted_instances) AS instances_deleted, + (SELECT pg_catalog.array_agg(id) FROM deleted_instances) AS deleted_ids, (SELECT pg_catalog.count(*)::bigint FROM deleted_nodes) AS nodes_deleted "#, ) - .bind(max_keep) - .bind(retention_days) + .bind(ids) .fetch_one(&mut **tx) .await?; - Ok(PruneStats { - instances_deleted, + let deleted_ids = deleted_ids.unwrap_or_default(); + Ok(ExpiredDeletion { + instances_deleted: deleted_ids.len() as i64, nodes_deleted, + deleted_ids, }) } +/// Read the expired terminal instances. +async fn select_expired_instance_ids( + pool: &sqlx::PgPool, + retention_days: i32, + max_keep: i64, +) -> Result, sqlx::Error> { + let mut tx = pool.begin().await?; + let ids = select_expired_instance_ids_tx(&mut tx, retention_days, max_keep).await?; + tx.commit().await?; + Ok(ids) +} + +/// Delete the df rows for the given expired-instance ids. +async fn delete_expired_instances( + pool: &sqlx::PgPool, + ids: &[String], +) -> Result { + let mut tx = pool.begin().await?; + let stats = delete_expired_instances_tx(&mut tx, ids).await?; + tx.commit().await?; + Ok(stats) +} + +/// Select and delete the eligible terminal instances in a single transaction. +/// Used by tests; the background worker instead runs the two halves separately so +/// it can retire the engine records between them (see +/// `run_until_extension_dropped_or_shutdown`). +#[cfg(any(test, feature = "pg_test"))] +pub(crate) async fn delete_expired_instances_transaction( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + retention_days: i32, + max_keep: i64, +) -> Result { + let ids = select_expired_instance_ids_tx(tx, retention_days, max_keep).await?; + delete_expired_instances_tx(tx, &ids).await +} + async fn run_until_extension_dropped_or_shutdown( poll_pool: &sqlx::PgPool, maintenance_pool: &sqlx::PgPool, duroxide_runtime: Arc, + duroxide_store: Arc, drop_poll_interval: Duration, shutdown_check_interval: Duration, epoch_id: Option<&str>, ) { log!("pg_durable: processing durable functions..."); + let client = Client::new(duroxide_store.clone()); + let mut drop_check = tokio::time::interval(drop_poll_interval); drop_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - let mut prune_check = tokio::time::interval_at( - tokio::time::Instant::now() + TERMINAL_INSTANCE_PRUNE_INTERVAL, - TERMINAL_INSTANCE_PRUNE_INTERVAL, + // reconcile_interval=0 disables the sweep; the tick is then gated off, but + // interval_at still needs a positive period, so fall back to 1h. + let reconcile_interval = get_reconcile_interval(); + let reconcile_enabled = !reconcile_interval.is_zero(); + let effective_reconcile_interval = if reconcile_interval.is_zero() { + Duration::from_secs(60 * 60) + } else { + reconcile_interval + }; + let mut reconcile_check = tokio::time::interval_at( + tokio::time::Instant::now() + effective_reconcile_interval, + effective_reconcile_interval, ); - prune_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + reconcile_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { tokio::select! { @@ -777,19 +835,51 @@ async fn run_until_extension_dropped_or_shutdown( break; } } - _ = prune_check.tick() => { - match prune_terminal_instances(maintenance_pool).await { - Ok(stats) if stats.instances_deleted > 0 || stats.nodes_deleted > 0 => { - log!( - "pg_durable: pruned {} terminal instance(s) and {} node row(s)", - stats.instances_deleted, - stats.nodes_deleted - ); + _ = reconcile_check.tick(), if reconcile_enabled => { + let retention_days = get_retention_days(); + + // Engine-first: retire the engine record before the df row, so a + // failed pass only leaves the harmless direction — an engine record + // with no df row (invisible to df.list_instances, reclaimed below), + // never the reverse. The stores are thus eventually consistent. + match select_expired_instance_ids( + maintenance_pool, + retention_days, + TERMINAL_INSTANCE_MAX_KEEP, + ) + .await + { + Ok(candidates) if !candidates.is_empty() => { + // Only delete the df rows once the engine records are gone; + // if that fails, leave both in place and retry next pass. + if retire_engine_records(&client, &candidates).await { + match delete_expired_instances(maintenance_pool, &candidates).await { + Ok(stats) => { + if stats.instances_deleted > 0 || stats.nodes_deleted > 0 { + log!( + "pg_durable: removed {} expired instance(s) and {} node row(s)", + stats.instances_deleted, + stats.nodes_deleted + ); + } + } + Err(e) => { + log!("pg_durable: removing expired instances failed: {e}"); + } + } + } } Ok(_) => {} - Err(e) => { - log!("pg_durable: terminal instance pruning failed: {}", e); + Err(e) => log!("pg_durable: selecting expired instances failed: {e}"), + } + + let retention = Duration::from_secs(retention_days as u64 * 86_400); + match reclaim_orphaned_instances(maintenance_pool, &client, retention).await { + Ok(reclaimed) if reclaimed > 0 => { + log!("pg_durable: reclaimed {reclaimed} orphaned engine record(s)"); } + Ok(_) => {} + Err(e) => log!("pg_durable: reclaiming orphaned engine records failed: {e}"), } } } @@ -799,3 +889,128 @@ async fn run_until_extension_dropped_or_shutdown( duroxide_runtime.shutdown(Some(10_000)).await; log!("pg_durable: duroxide runtime shutdown complete"); } + +/// Maximum orphans one reconciliation pass reclaims, bounding a single tick's work. +const RECLAIM_BATCH: u32 = 1000; + +/// True for sub-orchestration instance ids, which the engine creates internally +/// (JOIN/RACE fan-out) and which legitimately have no df.instances row — +/// reconciliation must never delete them. +fn is_sub_orchestration(id: &str) -> bool { + id.starts_with("sub::") || id.contains("::sub::") +} + +/// Milliseconds-since-epoch cutoff for "older than `retention`". Used as the +/// engine's `completed_before` filter so an orphan younger than the retention +/// window is left alone. +fn retention_cutoff_ms(retention: Duration) -> u64 { + let since_epoch = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default(); + since_epoch.saturating_sub(retention).as_millis() as u64 +} + +/// From the engine's `Failed` instance ids and the set that still have a +/// df.instances row, select the orphans to reclaim: those with no df row and that +/// are not sub-orchestrations (the engine creates sub-orchestrations internally +/// for JOIN/RACE fan-out; they legitimately have no df row and must be kept). +pub(crate) fn select_orphans( + failed_ids: Vec, + present: &std::collections::HashSet, +) -> Vec { + failed_ids + .into_iter() + .filter(|id| !is_sub_orchestration(id) && !present.contains(id)) + .collect() +} + +/// Delete engine records the engine still holds but pg_durable no longer has a +/// df.instances row for, once they age past the `retention` window. Returns the +/// count reclaimed. +/// +/// Only `Failed` engine instances are considered: a rolled-back df.start() leaves +/// its engine instance `Failed` (it can never load its graph), and scanning only +/// `Failed` keeps this cheap — the (potentially large) `Completed` set is retired +/// by reconciliation ahead of removing its df rows instead (see the reconcile +/// branch). Sub-orchestrations are excluded, and the `completed_before` filter +/// enforces the retention window so an instance whose df commit is merely +/// in-flight or just-landed is never touched. +async fn reclaim_orphaned_instances( + pool: &sqlx::PgPool, + client: &Client, + retention: Duration, +) -> Result { + let candidates: Vec = client + .list_instances_by_status("Failed") + .await + .map_err(|e| format!("list failed instances: {e:?}"))?; + if candidates.is_empty() { + return Ok(0); + } + + // Keep only ids with no df.instances row (orphans). The worker pool bypasses + // RLS, so this sees every user's rows. + let present: std::collections::HashSet = + sqlx::query_scalar("SELECT id FROM df.instances WHERE id = ANY($1)") + .bind(&candidates) + .fetch_all(pool) + .await + .map_err(|e| format!("cross-check df.instances: {e}"))? + .into_iter() + .collect(); + + let mut orphans = select_orphans(candidates, &present); + if orphans.is_empty() { + return Ok(0); + } + orphans.truncate(RECLAIM_BATCH as usize); + + let result = client + .delete_instance_bulk(InstanceFilter { + instance_ids: Some(orphans), + completed_before: Some(retention_cutoff_ms(retention)), + limit: Some(RECLAIM_BATCH), + }) + .await + .map_err(|e| format!("delete_instance_bulk: {e:?}"))?; + Ok(result.instances_deleted) +} + +/// Best-effort deletion of duroxide engine records by id, ahead of deleting the +/// matching df rows. Returns `true` when it is safe to delete those df rows — +/// the engine delete succeeded, or there was nothing to delete. On failure it +/// returns `false` so the caller leaves the df rows in place and retries the +/// whole instance next pass rather than orphaning its engine record. +async fn retire_engine_records(client: &Client, ids: &[String]) -> bool { + let ids: Vec = ids + .iter() + .filter(|id| !is_sub_orchestration(id)) + .cloned() + .collect(); + if ids.is_empty() { + return true; + } + let limit = ids.len().min(u32::MAX as usize) as u32; + match client + .delete_instance_bulk(InstanceFilter { + instance_ids: Some(ids), + completed_before: None, + limit: Some(limit), + }) + .await + { + Ok(result) => { + if result.instances_deleted > 0 { + log!( + "pg_durable: retired {} engine record(s) ahead of removing the df rows", + result.instances_deleted + ); + } + true + } + Err(e) => { + log!("pg_durable: failed to retire engine records; deferring df removal: {e:?}"); + false + } + } +} diff --git a/tests/e2e/sql/54_reconcile_orphans.sql b/tests/e2e/sql/54_reconcile_orphans.sql new file mode 100644 index 0000000..b443669 --- /dev/null +++ b/tests/e2e/sql/54_reconcile_orphans.sql @@ -0,0 +1,156 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Reconciliation reclaims orphaned engine records: those the engine still holds +-- but pg_durable no longer has a df.instances row for. +-- +-- A rolled-back df.start() is the canonical case: df.start() hands the workflow +-- to the engine over a separate connection (which commits), then the caller's +-- transaction rolls back — so the df rows vanish but the engine keeps an inert +-- instance that can never load its (rolled-back) graph and ends up Failed. +-- Reconciliation reclaims such df-less engine records once they age past the +-- retention window, and must leave df-backed instances alone. +-- +-- Runs in the "reconcile" phase with pg_durable.reconcile_interval = 2 and +-- pg_durable.retention_days = 0 so a pass acts within the test window instead of +-- the conservative production defaults. Base connection is the superuser +-- (postgres); enable_superuser_instances is on. + +-- Helper: does the engine still know this instance? +CREATE OR REPLACE FUNCTION pg_temp.duroxide_has(p_id TEXT) +RETURNS boolean LANGUAGE plpgsql AS $$ +DECLARE + sch TEXT := df.duroxide_schema(); + n INT; +BEGIN + EXECUTE format('SELECT pg_catalog.count(*) FROM %I.get_instance_info($1)', sch) + INTO n USING p_id; + RETURN n > 0; +END $$; + +-- =========================================================================== +-- Scenario 1: a rolled-back df.start() leaves a df-less engine record that +-- reconciliation reclaims. +-- =========================================================================== + +-- Capture the id via \gset (a client variable survives the rollback), then stash +-- it in a temp table *after* the rollback so the DO block can read it (\gset +-- variables do not interpolate inside DO blocks). +BEGIN; +SELECT df.start('SELECT 1', 'orphan-probe') AS gid \gset +ROLLBACK; + +DROP TABLE IF EXISTS _orphan; +CREATE TEMP TABLE _orphan (id TEXT); +INSERT INTO _orphan VALUES (:'gid'); + +DO $$ +DECLARE + gid TEXT; + appeared BOOLEAN := FALSE; + gone BOOLEAN := FALSE; + attempts INT := 0; +BEGIN + SELECT id INTO gid FROM _orphan; + + -- The engine record materializes once the worker fails to load the + -- rolled-back graph (~5s). Confirm it exists so the reclaim assertion is real. + LOOP + IF pg_temp.duroxide_has(gid) THEN appeared := TRUE; EXIT; END IF; + EXIT WHEN attempts > 300; -- 30s + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF NOT appeared THEN + RAISE EXCEPTION 'TEST FAILED [orphan_reclaimed]: orphan % never materialized in the engine (cannot test reclaiming)', gid; + END IF; + + -- Reconciliation (reconcile_interval=2s, retention_days=0) must then reclaim it. + attempts := 0; + LOOP + IF NOT pg_temp.duroxide_has(gid) THEN gone := TRUE; EXIT; END IF; + EXIT WHEN attempts > 300; -- 30s + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF NOT gone THEN + RAISE EXCEPTION 'TEST FAILED [orphan_reclaimed]: reconciliation did not reclaim the df-less engine record %', gid; + END IF; + + RAISE NOTICE 'PASSED [orphan_reclaimed]: reconciliation reclaimed the df-less (orphan) engine record'; +END $$; + +DROP TABLE _orphan; + +-- =========================================================================== +-- Scenario 2: a live, df-backed instance is left alone (reconciliation is not so +-- aggressive that it touches instances pg_durable still tracks). A signal wait +-- keeps it Running — hence neither terminal (removal skips it) nor Failed +-- (reclamation skips it) — so it stays a live, df-backed instance across passes. +-- =========================================================================== + +DROP TABLE IF EXISTS _keep; +CREATE TEMP TABLE _keep (id TEXT); +INSERT INTO _keep SELECT df.start(df.wait_for_signal('keep-go'), 'keep'); + +DO $$ +DECLARE + kid TEXT; + status TEXT; + attempts INT := 0; +BEGIN + SELECT id INTO kid FROM _keep; + + -- Wait until the engine is actually running it, so duroxide_has is meaningful. + LOOP + SELECT s INTO status FROM df.status(kid) s; + EXIT WHEN lower(COALESCE(status, '')) = 'running' OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF lower(COALESCE(status, 'pending')) <> 'running' THEN + RAISE EXCEPTION 'TEST FAILED [live_kept]: instance % did not reach running (status=%)', kid, status; + END IF; + + -- Give reconciliation several passes; a live df-backed instance must survive. + PERFORM pg_sleep(6); + IF NOT pg_temp.duroxide_has(kid) THEN + RAISE EXCEPTION 'TEST FAILED [live_kept]: reconciliation reclaimed a df-backed engine record % (too aggressive)', kid; + END IF; + IF NOT EXISTS (SELECT 1 FROM df.instances WHERE id OPERATOR(pg_catalog.=) kid) THEN + RAISE EXCEPTION 'TEST FAILED [live_kept]: reconciliation removed the live df row for %', kid; + END IF; + + RAISE NOTICE 'PASSED [live_kept]: reconciliation left the live df-backed instance intact'; +END $$; + +-- Release the survivor and wait until it leaves the Running state, so no live +-- engine orchestration lingers past this phase. A Running orchestration left in +-- the shared data directory would be resumed by the next server started against +-- it (e.g. the upgrade tests) and run activities against that server's schema. +SELECT df.signal((SELECT id FROM _keep), 'keep-go'); + +DO $$ +DECLARE + kid TEXT; + status TEXT; + attempts INT := 0; +BEGIN + SELECT id INTO kid FROM _keep; + LOOP + SELECT s INTO status FROM df.status(kid) s; + -- Terminal, or already removed by the retention_days=0 pass. + EXIT WHEN status IS NULL + OR lower(status) IN ('completed', 'failed', 'cancelled') + OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + IF status IS NOT NULL AND lower(status) = 'running' THEN + RAISE EXCEPTION 'TEST FAILED [live_kept]: survivor % never left running after signal', kid; + END IF; +END $$; + +DROP TABLE _keep; + +SELECT 'TEST PASSED: reconcile orphans' AS result;