Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -1606,36 +1606,65 @@ SELECT started_at, last_seen_at,

The background worker updates `last_seen_at` every ~5 seconds as part of its normal operation.

### Data Retention (Automatic Pruning)
### Automatic Reconciliation

pg_durable tracks each workflow in its own `df` tables, while the durable engine
keeps workflow state in a separate schema. To keep those two stores consistent —
and to keep `df.instances`/`df.nodes` from growing without bound — the background
worker runs a **best-effort reconciliation pass** that does two things:

- **Removes expired terminal instances.** Old **terminal** instances (status
`completed`, `failed`, or `cancelled`) and their `df.nodes` rows are deleted,
along with their engine records. Running and pending instances are **never**
removed, regardless of age.
- **Reclaims orphaned engine records.** `df.start()` writes the `df` rows in the
caller's transaction but hands the workflow to the engine over a separate
connection; if that transaction **rolls back**, the `df` rows vanish while the
engine keeps an inert record (it can never load its rolled-back graph, so it
ends up failed). Reconciliation deletes such df-less engine records once they
age past `retention_days`. Anything the engine is still tracking with a live
`df` row is left untouched.

Two Postmaster-context GUCs govern it (set in `postgresql.conf`, restart to apply):

To keep `df.instances` and `df.nodes` from growing without bound, the background
worker runs a **best-effort pruning pass roughly once an hour** that deletes old
**terminal** instances (status `completed`, `failed`, or `cancelled`) and their
associated `df.nodes` rows. Running and pending instances are **never** pruned,
regardless of age.
```ini
# How often (seconds) a reconciliation pass runs. 0 disables reconciliation.
pg_durable.reconcile_interval = 3600

# Days a terminal instance is retained before reconciliation removes it (and its
# engine record); also the age bound for reclaiming orphaned engine records.
# 0 removes terminal instances as soon as the next pass runs.
pg_durable.retention_days = 30
```

The policy is currently fixed (no configuration GUC):
Retention combines that window with a fixed hard cap:

- **Hard cap — at most 10,000 terminal instances are retained, regardless of
age.** The newest 10,000 terminal instances are kept; any beyond that are
pruned even if they are only minutes old.
- **Retention window — 30 days.** Terminal instances older than 30 days are
pruned even if the table holds fewer than 10,000 of them.
removed even if they are only minutes old. (This cap is fixed, not a GUC.)
- **Retention window — `retention_days`.** Terminal instances older than the
window are removed even if the table holds fewer than 10,000 of them.

Equivalently, a terminal instance is retained only while it is **both** among the
newest 10,000 terminal instances **and** less than 30 days old; otherwise it is
eligible for pruning.
newest 10,000 terminal instances **and** younger than `retention_days`; otherwise
it is eligible for removal.

Notes:

- "Age" is measured from `completed_at` when set (instances that reached
`completed`), otherwise from `created_at`. `updated_at` is intentionally **not**
used, because it is user-writable and would let a low-privilege user influence
pruning.
- Pruning runs in a single transaction with foreign-key constraints deferred:
matching `df.nodes` rows are deleted first, then the instance rows.
- Pruning is best-effort: if a pass fails it is logged and retried on the next
interval; it never stops workflow execution.
what is removed.
- A pass retires an instance's engine record **before** deleting its `df` rows, so
an interrupted pass can only leave the harmless direction — an engine record with
no `df` row (which the next pass reclaims) — never a `df` row without its engine
record. Because `df` and the engine are separate stores written by separate
transactions, they are therefore **eventually consistent**: read-only views like
`df.list_instances()` can briefly show a just-terminal row with empty
engine-derived columns until a later pass reconciles it. The artifact is
transient, self-healing, and never affects workflow execution or results.
- Reconciliation is best-effort: if a pass fails it is logged and retried on the
next interval; it never stops workflow execution.
- If you need to retain terminal history beyond these limits (e.g. for auditing),
copy the rows you care about into your own table before they age out.

Expand Down
26 changes: 24 additions & 2 deletions scripts/test-e2e-local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ DEFAULT_BUILD_PHASES=(
"connlimit-backpressure"
"connlimit-timeout"
"connlimit-startup"
"reconcile"
)

ALL_PHASES=(
Expand All @@ -68,6 +69,7 @@ ALL_PHASES=(
"connlimit-backpressure"
"connlimit-timeout"
"connlimit-startup"
"reconcile"
"http-disabled"
"http-allow-all"
)
Expand Down Expand Up @@ -141,6 +143,9 @@ phase_label() {
connlimit-startup)
echo "connection limit startup validation"
;;
reconcile)
echo "reconcile orphans"
;;
http-disabled)
echo "HTTP disabled (no http Cargo feature)"
;;
Expand Down Expand Up @@ -170,6 +175,9 @@ phase_for_test() {
46_connection_limit_startup_validation)
echo "connlimit-startup"
;;
54_reconcile_orphans)
echo "reconcile"
;;
47_http_dsl_disabled)
echo "http-disabled"
;;
Expand Down Expand Up @@ -301,7 +309,7 @@ set_conf_line() {
}

clear_connlimit_gucs() {
sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d' "$CONF_FILE"
sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d; /^[#[:space:]]*pg_durable\.reconcile_/d; /^[#[:space:]]*pg_durable\.retention_/d' "$CONF_FILE"
}

ensure_data_dir() {
Expand Down Expand Up @@ -479,6 +487,17 @@ configure_phase() {
set_conf_line "pg_durable.enable_superuser_instances" "on"
set_conf_line "pg_durable.max_duroxide_connections" "1"
;;
reconcile)
set_conf_line "shared_preload_libraries" "'pg_durable'"
set_conf_line "pg_durable.worker_role" "'postgres'"
set_conf_line "pg_durable.database" "'postgres'"
set_conf_line "pg_durable.enable_superuser_instances" "on"
# Short reconcile cadence and zero retention so a pass acts within the
# test window instead of the conservative production defaults
# (retention_days=0 makes an aged-out orphan eligible at once).
set_conf_line "pg_durable.reconcile_interval" "2"
set_conf_line "pg_durable.retention_days" "0"
;;
http-disabled)
set_conf_line "shared_preload_libraries" "'pg_durable'"
set_conf_line "pg_durable.worker_role" "'postgres'"
Expand Down Expand Up @@ -506,7 +525,7 @@ prepare_phase() {
http-allow-all)
build_extension_http_allow_all
;;
no-preload|standard|superuser-guc-off|connlimit-backpressure|connlimit-timeout|connlimit-startup)
no-preload|standard|superuser-guc-off|connlimit-backpressure|connlimit-timeout|connlimit-startup|reconcile)
# Rebuild if previous phase changed the Cargo features
if [ "$CURRENT_FEATURES" != "http-allow-test-domains" ]; then
build_extension
Expand Down Expand Up @@ -560,6 +579,9 @@ prepare_phase() {
;;
connlimit-startup)
;;
reconcile)
wait_for_worker_ready
;;
http-disabled|http-allow-all)
ensure_e2e_role
wait_for_worker_ready
Expand Down
10 changes: 6 additions & 4 deletions scripts/test-upgrade.sh
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,12 @@ if [ -f "$DATA_DIR/postgresql.conf" ]; then
sed -i.bak '/^#*port = /d' "$DATA_DIR/postgresql.conf"
echo "port = $PG_PORT" >> "$DATA_DIR/postgresql.conf"
fi
# Clear any connection-limit GUCs that E2E test phases (connlimit-*) may have
# left behind. Without this, max_duroxide_connections=1 causes the BGW to
# refuse to start, breaking the B1 wait-for-readiness check.
sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d' "$DATA_DIR/postgresql.conf"
# Clear any per-phase GUCs that E2E test phases may have left in the shared
# postgresql.conf. Without this, the connlimit-* phases' max_duroxide_connections=1
# causes the BGW to refuse to start (breaking the B1 wait-for-readiness check),
# and the reconcile phase's aggressive reconcile_interval/retention_days would
# remove terminal instances mid-test and make df.result() flaky.
sed -i.bak '/^[#[:space:]]*pg_durable\.max_/d; /^[#[:space:]]*pg_durable\.execution_/d; /^[#[:space:]]*pg_durable\.reconcile_/d; /^[#[:space:]]*pg_durable\.retention_/d' "$DATA_DIR/postgresql.conf"
fi

# If the server is already running, restart it so both the freshly installed
Expand Down
102 changes: 76 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ pub static ENABLE_SUPERUSER_INSTANCES: GucSetting<bool> = GucSetting::<bool>::ne
/// `docs/api-reference.md` and `USER_GUIDE.md`; update those mirrors if you change them.
pub static LIST_INSTANCES_MAX_LIMIT: GucSetting<i32> = GucSetting::<i32>::new(1000);

/// Days a terminal ('completed'/'failed'/'cancelled') instance is retained
/// before reconciliation removes it and its engine record. The same age bound
/// governs when orphaned engine records left by a rolled-back `df.start()` are
/// reclaimed. `0` removes terminal instances as soon as the next pass runs.
pub static RETENTION_DAYS: GucSetting<i32> = GucSetting::<i32>::new(30);

/// Seconds between background reconciliation passes. Each pass removes expired
/// terminal instances and reclaims orphaned engine records. `0` disables it.
pub static RECONCILE_INTERVAL: GucSetting<i32> = GucSetting::<i32>::new(3600);

// Module declarations
pub mod activities;
pub mod client;
Expand Down Expand Up @@ -163,6 +173,28 @@ pub extern "C-unwind" fn _PG_init() {
GucFlags::default(),
);

GucRegistry::define_int_guc(
c"pg_durable.retention_days",
c"Days a terminal instance is retained before reconciliation removes it",
c"Background reconciliation removes 'completed'/'failed'/'cancelled' df.instances rows (and their engine records) once they are older than this many days, subject to a fixed hard cap on the number retained. The same age bound governs when orphaned engine records left by a rolled-back df.start() are reclaimed. 0 removes terminal instances as soon as the next pass runs.",
&RETENTION_DAYS,
0,
36500,
GucContext::Postmaster,
GucFlags::default(),
);

GucRegistry::define_int_guc(
c"pg_durable.reconcile_interval",
c"Seconds between background reconciliation passes (0 disables reconciliation)",
c"Each background reconciliation pass removes expired terminal instances and reclaims orphaned engine records left by a rolled-back df.start(). Set to 0 to disable background reconciliation entirely.",
&RECONCILE_INTERVAL,
0,
86400,
GucContext::Postmaster,
GucFlags::default(),
);

worker::register_background_worker();
}

Expand Down Expand Up @@ -947,23 +979,23 @@ mod tests {
)
}

async fn delete_prune_test_rows(pool: &sqlx::PgPool, id_list: &str) {
let mut tx = pool.begin().await.expect("begin cleanup transaction");
async fn delete_expired_test_rows(pool: &sqlx::PgPool, id_list: &str) {
let mut tx = pool.begin().await.expect("begin teardown transaction");
sqlx::query("SET CONSTRAINTS ALL DEFERRED")
.execute(&mut *tx)
.await
.expect("defer cleanup constraints");
.expect("defer teardown constraints");
sqlx::query(&format!(
"DELETE FROM df.nodes WHERE instance_id IN ({id_list})"
))
.execute(&mut *tx)
.await
.expect("clean prune test nodes");
.expect("clean test nodes");
sqlx::query(&format!("DELETE FROM df.instances WHERE id IN ({id_list})"))
.execute(&mut *tx)
.await
.expect("clean prune test instances");
tx.commit().await.expect("commit cleanup");
.expect("clean test instances");
tx.commit().await.expect("commit teardown");
}

// ========================================================================
Expand Down Expand Up @@ -1507,7 +1539,7 @@ mod tests {
}

#[pg_test]
fn test_prune_terminal_instances_respects_age_and_keep_count() {
fn test_expired_instance_removal_respects_age_and_keep_count() {
let conn = test_database_connection_string();

let rt = tokio::runtime::Builder::new_current_thread()
Expand All @@ -1531,7 +1563,7 @@ mod tests {
.collect::<Vec<_>>()
.join(", ");

delete_prune_test_rows(&pool, &id_list).await;
delete_expired_test_rows(&pool, &id_list).await;

let mut fixture_tx = pool.begin().await.expect("begin fixture transaction");
sqlx::query("SET CONSTRAINTS ALL DEFERRED")
Expand All @@ -1544,10 +1576,10 @@ mod tests {
-- by COALESCE(completed_at, created_at):
-- aa261001 (1d, r1) -> keep (within cap, young)
-- aa261002 (2d, r2) -> keep (within cap, young)
-- aa261003 (3d, r3) -> PRUNE by the hard cap even though it is
-- aa261003 (3d, r3) -> REMOVE by the hard cap even though it is
-- only 3 days old (rank > max_keep)
-- aa261004 (40d, r4) -> PRUNE (beyond cap and past retention)
-- aa261005 (50d, r5) -> PRUNE (beyond cap and past retention)
-- aa261004 (40d, r4) -> REMOVE (beyond cap and past retention)
-- aa261005 (50d, r5) -> REMOVE (beyond cap and past retention)
-- aa261006 (running) -> keep (non-terminal, never considered)
-- aa261004/aa261005 are 'failed'/'cancelled' (NULL completed_at) with
-- a forged far-future `updated_at`; they must still rank/age by their
Expand All @@ -1556,9 +1588,9 @@ mod tests {
VALUES
('aa261001', 'keep-recent-1', 'bb261001', 'completed', 1, true, false),
('aa261002', 'keep-recent-2', 'bb261002', 'completed', 2, true, false),
('aa261003', 'prune-young-over-cap', 'bb261003', 'completed', 3, true, false),
('aa261004', 'prune-old-failed-forged', 'bb261004', 'failed', 40, false, true),
('aa261005', 'prune-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true),
('aa261003', 'expire-young-over-cap', 'bb261003', 'completed', 3, true, false),
('aa261004', 'expire-old-failed-forged', 'bb261004', 'failed', 40, false, true),
('aa261005', 'expire-old-cancelled-forged', 'bb261005', 'cancelled', 50, false, true),
('aa261006', 'keep-running', 'bb261006', 'running', 90, false, false)
)
INSERT INTO df.instances
Expand All @@ -1582,7 +1614,7 @@ mod tests {
)
.execute(&mut *fixture_tx)
.await
.expect("insert prune instances");
.expect("insert test instances");

sqlx::query(
r#"
Expand Down Expand Up @@ -1610,12 +1642,12 @@ mod tests {
)
.execute(&mut *fixture_tx)
.await
.expect("insert prune nodes");
.expect("insert test nodes");

let stats =
crate::worker::prune_terminal_instances_transaction(&mut fixture_tx, 30, 2)
crate::worker::delete_expired_instances_transaction(&mut fixture_tx, 30, 2)
.await
.expect("prune terminal instances");
.expect("delete expired instances");

let remaining_instances: i64 = sqlx::query_scalar(&format!(
"SELECT pg_catalog.count(*)::bigint FROM df.instances WHERE id IN ({id_list})"
Expand Down Expand Up @@ -1644,19 +1676,37 @@ mod tests {
fixture_tx
.rollback()
.await
.expect("rollback prune test fixture");
.expect("rollback test fixture");

pool.close().await;
stats
});

assert_eq!(
stats,
crate::worker::PruneStats {
instances_deleted: 3,
nodes_deleted: 3,
}
);
assert_eq!(stats.instances_deleted, 3);
assert_eq!(stats.nodes_deleted, 3);
assert_eq!(stats.deleted_ids.len(), 3);
}

#[pg_test]
fn test_select_orphans_excludes_present_and_sub_orchestrations() {
use std::collections::HashSet;

// Failed engine instances reconciliation sees: a df-backed one, a genuine
// orphan (rolled-back df.start), and two sub-orchestrations the engine
// created internally for fan-out.
let failed = vec![
"inst-backed".to_string(),
"inst-orphan".to_string(),
"sub::inst-backed::0".to_string(),
"inst-backed::sub::1".to_string(),
];
// Only inst-backed still has a df.instances row.
let present: HashSet<String> = ["inst-backed".to_string()].into_iter().collect();

let orphans = crate::worker::select_orphans(failed, &present);

// Only the df-less, non-sub instance is reclaimed.
assert_eq!(orphans, vec!["inst-orphan".to_string()]);
}

// ========================================================================
Expand Down
11 changes: 11 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ pub fn get_execution_acquire_timeout() -> Duration {
Duration::from_secs(crate::EXECUTION_ACQUIRE_TIMEOUT.get() as u64)
}

/// Days a terminal instance is retained before reconciliation removes it and its
/// engine record; also the age bound for reclaiming orphaned engine records.
pub fn get_retention_days() -> i32 {
crate::RETENTION_DAYS.get()
}

/// Interval between background reconciliation passes. Zero disables reconciliation.
pub fn get_reconcile_interval() -> Duration {
Duration::from_secs(crate::RECONCILE_INTERVAL.get() as u64)
}

/// Returns `true` when superuser-submitted instances are permitted.
pub fn superuser_instances_enabled() -> bool {
crate::ENABLE_SUPERUSER_INSTANCES.get()
Expand Down
Loading
Loading