Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ pg_durable is a PostgreSQL extension that brings durable, fault-tolerant functio
13. [Monitoring](#monitoring)
14. [User Isolation & Privileges](#user-isolation--privileges)
15. [Connection Limits](#connection-limits)
16. [Troubleshooting](#troubleshooting)
17. [Quick Reference Card](#quick-reference-card)
18. [Appendix: Test Data Setup](#appendix-test-data-setup)
16. [Start Reconciliation](#start-reconciliation)
17. [Troubleshooting](#troubleshooting)
18. [Quick Reference Card](#quick-reference-card)
19. [Appendix: Test Data Setup](#appendix-test-data-setup)

---

Expand Down Expand Up @@ -2012,6 +2013,59 @@ pg_durable.execution_acquire_timeout = 60

---

## Start Reconciliation

pg_durable keeps its own bookkeeping in the `df` schema, while the workflow
engine (duroxide) keeps running-workflow state in its own schema. `df.start()`
writes the `df` rows for a new instance **in your transaction** and then lets the
background worker start the engine from that committed record. This keeps the two
schemas in sync without sharing a transaction:

- **If your transaction rolls back**, the `df` rows disappear and the engine was
never told to run — so a rolled-back `df.start()` never leaves a "ghost"
workflow running with no `df` record.
- **Once your transaction commits**, the instance is *always* eventually started,
even if the worker was briefly down when you called `df.start()`.

Two mechanisms converge the state:

1. **Instant path** — `df.start()` sends a transactional `NOTIFY` (delivered only
if the transaction commits). The worker is listening and starts the instance
immediately.
2. **Backstop sweep** — the worker periodically scans for committed-but-unstarted
instances (a missed notification) and starts any it finds that the engine does
not yet know about.

Under normal operation the instant path starts workflows in milliseconds; the
sweep exists only to guarantee eventual convergence.

### GUC Reference

Both GUCs are **Postmaster-context** — set them in `postgresql.conf` and restart
PostgreSQL.

```ini
# postgresql.conf

# How often (seconds) the worker sweeps for committed-but-unstarted instances.
# Set to 0 to disable the periodic sweep (the instant NOTIFY path stays on).
pg_durable.reconcile_interval = 15

# Minimum age (seconds) a pending instance must reach before the sweep starts it.
# Keeps the sweep from racing the instant NOTIFY path for freshly-committed work.
pg_durable.reconcile_grace = 300
```

The defaults suit most deployments. Lower `reconcile_grace` if you want the
backstop to recover missed starts sooner; raise `reconcile_interval` to reduce
background query load on very large `df.instances` tables.

> **Note:** Reconciliation activates only on schema version 0.2.4 and later
> (when `df.instances.start_input` exists). On an older schema `df.start()`
> starts the engine inline instead, so there is nothing to reconcile.

---

## Troubleshooting

### Extension Exists But Workflows Don't Start
Expand Down
162 changes: 162 additions & 0 deletions docs/spec-reconcile-start.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# Start Reconciliation Specification

## The problem, in plain terms

pg_durable stores two copies of "what workflows exist":

- The **`df` schema** — pg_durable's own bookkeeping (`df.instances`, `df.nodes`),
written by `df.start()` in the caller's transaction.
- The **duroxide schema** — the workflow engine's runtime state (its queue and
instance history), which actually makes a workflow run.

These are two separate systems. Historically `df.start()` wrote the `df` rows in
your transaction but told the engine to run the workflow **on a separate
connection**, which commits independently. That split caused two failure modes:

1. **Ghost workflow (rollback leak).** You call `df.start()` and then your
transaction rolls back. Your `df` rows vanish — but the engine was already
told to run the workflow on that other connection, so it keeps running with no
`df` record behind it.

2. **Stuck workflow (lost start).** The `df` rows commit, but the message telling
the engine to run never lands (worker was down, connection dropped). Now there
is a workflow "on paper" in `df.instances` that never executes.

Both come from the same root cause: **the `df` write and the engine start happen
in two different transactions, with nothing reconciling them afterward.**

## The approach

Stop trying to force both writes into one transaction, and stop reaching into the
engine's internal SQL. Instead, treat the committed `df` row as the **source of
truth (the intent)** and let the background worker converge the engine to match,
going only through duroxide's **Rust API** (`Client::start_orchestration`,
`Client::get_orchestration_status`).

`df.start()` now:

1. Writes the instance rows **and a `start_input` payload** (the vars snapshot +
label captured at start time) into `df` — all in the caller's transaction.
2. Issues a transactional `NOTIFY pg_durable_start` (delivered **only** if the
transaction commits).
3. Does **not** start the engine itself.

The background worker converges the engine two ways:

- **Instant path.** It `LISTEN`s on `pg_durable_start`. On a notification it starts
the just-committed instance immediately (typically single-digit milliseconds).
- **Backstop sweep.** Every `reconcile_interval` seconds it scans for `pending`
instances older than `reconcile_grace` that the engine still does not know about
(`get_orchestration_status == NotFound`) and starts them. This covers a missed
notification (worker restart, dropped connection).

Why this fixes both modes:

- **Ghost:** the engine is only ever started by the worker from a *committed* `df`
row. A rolled-back `df.start()` leaves no row and delivers no notification, so
nothing is ever started. No ghost is possible.
- **Stuck:** even if the instant notification is lost, the sweep eventually finds
the committed-but-unstarted row and starts it. Convergence is guaranteed,
including the case where a worker dies mid-start (see "Exactly-once start").

### Why go through the Rust API instead of the engine's SQL

The engine's SQL schema is an internal implementation detail, not its contract.
Its contract is "start an orchestration." Driving it through
`Client::start_orchestration` keeps pg_durable decoupled from how duroxide happens
to store state, and it works against any duroxide provider. The philosophical
framing (per review discussion): duroxide's model is async, at-least-once,
converge-later; Postgres's model is synchronous and transactional. pg_durable
lives on Postgres but follows duroxide's model, so it records intent
transactionally and converges the async engine afterward rather than pretending
the two can share one transaction.

### Exactly-once start

The instant path and the sweep can both notice the same instance. Start-once is
guaranteed by an atomic claim: the worker flips the row to `running` with
`UPDATE ... RETURNING start_input`. Only the one caller that wins the claim calls
the engine. If that engine call fails, the claim is rolled back to `pending` so a
later sweep retries.

The claim commits before the engine start (two systems, two transactions), so a
hard worker crash *between* the claim and the engine start would otherwise strand
the row in `running` forever — a state the `pending`-only sweep could never
recover, silently violating the convergence guarantee. To close this window the
sweep also reconciles **stale claims**: a `running` row the engine does not know
about (`get_orchestration_status == NotFound`) whose claim is older than the grace
is re-claimed and re-started. The `NotFound` gate ensures a genuinely-running
instance is never restarted, and the age guard avoids racing a start that is
merely mid-dispatch.

### Trust boundary: `start_input` is caller-writable

`df.instances.start_input` is written by the caller of `df.start()`, and the
orchestration selects which graph to load — and which role runs the SQL — from the
input's instance id. The worker must therefore **never trust the instance id
embedded in the payload**: it forces the engine input's instance id to the claimed
row id, and the orchestration independently uses the durable runtime's instance id
(`ctx.instance_id()`) rather than the payload's. Otherwise a low-privilege user
could INSERT a crafted `pending` row whose `start_input` points at another user's
instance and have the superuser worker run the victim's workflow as the victim.
The persisted vars/label are still replayed (they belong to the caller's own
instance), only the identity is re-derived from trusted state.

## Schema changes (0.2.4)

- `df.instances.start_input JSONB` (nullable) — the `FunctionInput` captured at
`df.start()` time so the worker can replay the exact start payload. The worker
overrides the payload's instance id with the claimed row id before replaying it
(see the trust boundary above). Rows written before this column existed replay
with an empty vars set.
- Added to the `df.grant_usage()` / `df.revoke_usage()` INSERT column list; the
upgrade script backfills `GRANT INSERT (start_input)` to existing df-usage roles.

### GUCs (Postmaster-context)

| GUC | Default | Purpose |
|-----|---------|---------|
| `pg_durable.reconcile_interval` | `15` | Seconds between backstop sweeps. `0` disables the sweep (instant path stays on). |
| `pg_durable.reconcile_grace` | `300` | Minimum age a pending instance must reach before the sweep starts it. |

## Upgrade & Migration

- **Backward compatibility (B1).** The new `.so` must run against un-upgraded
0.2.2 / 0.2.3 schemas that lack `start_input`. `df.start()` gates on the
installed extension version: `>= 0.2.4` uses the new intent path; older uses the
legacy inline start. The worker checks for the `start_input` column before
running any reconciliation, and re-checks it on each sweep so an in-place
`ALTER EXTENSION UPDATE` (which does not restart the worker) enables
reconciliation mid-epoch. The `LISTEN` is always established because `df.start()`
only sends the notification once the column exists.
- **Upgrade script.** `sql/pg_durable--0.2.3--0.2.4.sql` adds the column (appended
last, matching the fresh-install column order), updates the grant/revoke, and
backfills the column grant to existing df roles.
- **No duroxide schema change** — this feature only uses the duroxide Rust API, so
it requires no changes to the embedded duroxide migrations.

## Scope

This change fixes the **start** direction (df has it, engine does not). The
reverse orphan-cleanup direction (engine-has-it / df-does-not) is out of scope.
`df.cancel()` / `df.signal()` remain on the existing client path; as a targeted
exception, the worker re-issues a cancel to the engine if an instance was moved to
`cancelled` between the claim and the start, so a cancel that races the deferred
start is not silently ignored.

## Testing

`tests/e2e/sql/25_reconcile_start.sql` exercises the failure modes end-to-end:

- **Scenario 1 (ghost):** `BEGIN; SELECT df.start(...); ROLLBACK;` then assert the
duroxide runtime has no residue for that instance. (Verified RED on `main`.)
- **Scenario 2 (stuck):** commit a backdated `pending` instance directly (no
notification), then assert the worker's sweep starts and completes it.
- **Scenario 3 (stale claim):** commit a backdated `running` instance the engine
never started (simulating a crash between the claim and the engine start), then
assert the sweep recovers and completes it.

`tests/e2e/sql/26_reconcile_security.sql` constructs the cross-tenant attack — a
low-privilege role INSERTs a crafted `pending` row whose `start_input` points at a
victim instance and triggers it via `pg_notify` — and asserts the attacker's row
runs its own graph while the victim's protected table is never written.
6 changes: 6 additions & 0 deletions docs/upgrade-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ what the upgrade script handles, and any backward compatibility considerations.
- **Scenario B2 considerations:** No data migration. The `CREATE FUNCTION` adds catalog metadata only; `df.instances` rows are untouched. The new `created_at`/`completed_at` result columns are read from columns that already exist and are already populated on every prior install.
- **Dependent-object note:** Because the upgrade only adds a function (no `DROP FUNCTION`), no customer-owned object that depends on the existing two-argument `df.list_instances` is affected — there is nothing to drop or repoint.

#### Start reconciliation — add `df.instances.start_input` (start-convergence)
- **DDL change (df schema):** Adds a nullable `df.instances.start_input JSONB` column that `df.start()` populates with the `FunctionInput` payload (instance id, label, vars snapshot, loop iteration) captured at start time, so the background worker can start the durable engine from committed intent rather than out-of-band on a separate connection. Fresh installs (`src/lib.rs`) declare the column **last** in `df.instances` (after `completed_at`); the upgrade script `sql/pg_durable--0.2.3--0.2.4.sql` runs `ALTER TABLE df.instances ADD COLUMN start_input JSONB`, which also appends it last — so both paths yield identical column ordinals. The column is added to the `df.grant_usage()` / `df.revoke_usage()` INSERT column list on both paths (`df.start()` runs as the caller and writes it), and the upgrade script backfills `GRANT INSERT (start_input)` to every role that already holds column-level INSERT on `df.instances` (enumerated via the `id` column's `attacl`, which every prior `df.grant_usage()` granted) so existing df users can keep calling `df.start()` without re-running `df.grant_usage()`. Two Postmaster GUCs are added (`pg_durable.reconcile_interval` default 15, `pg_durable.reconcile_grace` default 300). No duroxide schema change — reconciliation uses only the duroxide Rust `Client` API.
- **Scenario A considerations:** `ALTER TABLE ... ADD COLUMN start_input JSONB` appends the column at the last ordinal; the fresh-install `CREATE TABLE df.instances` places `start_input` last as well, so the post-upgrade catalog matches a fresh 0.2.4 install column-for-column. The grant/revoke bodies are `CREATE OR REPLACE`d identically on both paths.
- **Scenario B1 considerations:** The new `.so` must run against pre-0.2.4 schemas (0.2.2/0.2.3) that lack `start_input`. `df.start()` gates on the installed extension version (`start_input_supported()` → `>= 0.2.4`): on an older schema it takes the **legacy inline start path** (out-of-band `start_durable_function`, exactly as before), so no reference to the missing column is emitted. The background worker probes for the `start_input` column before running any reconciliation and **re-probes on each sweep tick** (monotonic false→true), so an in-place `ALTER EXTENSION UPDATE` — which does not restart the worker or start a new epoch — enables reconciliation mid-epoch without a restart. The `LISTEN pg_durable_start` is always established (schema-independent); a notification can only arrive once `df.start()` uses the intent path, which only happens after the column exists, so the notify handler never references a missing column.
- **Scenario B2 considerations:** No data migration of existing rows. Pre-upgrade `df.instances` rows have `start_input = NULL`; if such a row is ever reconciled, the worker synthesizes a minimal `{instance_id}` input (empty vars). After `ALTER EXTENSION UPDATE`, a new `df.start()` switches to the intent+NOTIFY path and the running worker starts it via the always-on listener (and the re-probing sweep as backstop).

### v0.2.2 → v0.2.3

#### Rename duroxide provider schema to `_duroxide` for fresh installs
Expand Down
28 changes: 28 additions & 0 deletions scripts/test-upgrade.sh
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ echo ""
B2_PRE_INSTANCE_ID=""
B2_INFLIGHT_INSTANCE_ID=""
B2_POST_INSTANCE_ID=""
B2_BACKFILL_ROLE="durable_b2_backfill_probe"

test_b2_data_survives_upgrade() {
# Step 1: Install previous version and create test data
Expand All @@ -933,6 +934,15 @@ test_b2_data_survives_upgrade() {
assert_sql_equals "SELECT df.clearvars();" "OK" || return 1
assert_sql_equals "SELECT df.setvar('b2_key', 'b2_value');" "OK" || return 1

# Create a NON-superuser df role and grant it df usage on the PREVIOUS
# schema (which has no start_input column), so the 0.2.4 upgrade's grant
# backfill has a pre-existing target. test_b2_start_input_grant_backfill_
# after_upgrade verifies this role can still call df.start() after upgrade.
run_sql_capture "DROP OWNED BY ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true
run_sql_capture "DROP ROLE IF EXISTS ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true
run_sql_capture "CREATE ROLE ${B2_BACKFILL_ROLE} LOGIN;" >/dev/null || return 1
run_sql_capture "SELECT df.grant_usage('${B2_BACKFILL_ROLE}');" >/dev/null || return 1

B2_PRE_INSTANCE_ID=$(run_sql_capture "SELECT df.start('INSERT INTO test_upgrade_b2_log (kind, msg) VALUES (''pre'', ''{b2_key}'') RETURNING msg', 'b2-pre-upgrade');") || return 1
B2_INFLIGHT_INSTANCE_ID=$(run_sql_capture "SELECT df.start(df.sleep(2) ~> 'SELECT ''b2-running'' AS value', 'b2-inflight');") || return 1

Expand Down Expand Up @@ -977,6 +987,23 @@ test_b2_new_data_after_upgrade() {
assert_sql_equals "SELECT msg FROM test_upgrade_b2_log WHERE kind = 'post' ORDER BY id DESC LIMIT 1;" "new_value"
}

test_b2_start_input_grant_backfill_after_upgrade() {
# The 0.2.4 upgrade adds df.instances.start_input and df.start() switches to
# the intent path (INSERT ... start_input). A role granted df usage BEFORE the
# upgrade only holds INSERT on the old columns, so without the upgrade's grant
# backfill its df.start() would fail with "permission denied for column
# start_input". B2_BACKFILL_ROLE was created and granted df usage on the
# PREVIOUS schema (see test_b2_data_survives_upgrade), so this asserts the
# backfill extended its INSERT privilege to the new column.
assert_sql_equals \
"SELECT has_column_privilege('${B2_BACKFILL_ROLE}', 'df.instances', 'start_input', 'INSERT');" \
"t"
local rc=$?

run_sql_capture "DROP OWNED BY ${B2_BACKFILL_ROLE}; DROP ROLE IF EXISTS ${B2_BACKFILL_ROLE};" >/dev/null 2>&1 || true
return $rc
}

test_b2_grant_usage_after_upgrade() {
# Regression guard for #110: after ALTER EXTENSION UPDATE, df.debug_connection()
# must be gone from the catalog. Scenario A only compares function name/args/
Expand Down Expand Up @@ -1012,6 +1039,7 @@ if [ "$HAS_COMPAT_PREV" = true ]; then
run_test "B2: Pre-upgrade instance remains queryable" test_b2_pre_upgrade_instance_after_upgrade
run_test "B2: In-flight work completes after upgrade" test_b2_inflight_work_after_upgrade
run_test "B2: New data and execution after upgrade" test_b2_new_data_after_upgrade
run_test "B2: Pre-upgrade df role can df.start() after upgrade (start_input grant backfill)" test_b2_start_input_grant_backfill_after_upgrade
run_test "B2: df.grant_usage() works and df.debug_connection() is gone after upgrade" test_b2_grant_usage_after_upgrade
fi

Expand Down
Loading
Loading