Reconcile df.start() with the durable engine via committed intent#281
Closed
tjgreen42 wants to merge 2 commits into
Closed
Reconcile df.start() with the durable engine via committed intent#281tjgreen42 wants to merge 2 commits into
tjgreen42 wants to merge 2 commits into
Conversation
df.start() previously wrote its df rows in the caller's transaction but started the workflow engine out-of-band on a separate connection. Those two writes could diverge: - Ghost: a rolled-back df.start() left the engine running a workflow with no df record behind it. - Stuck: a committed df instance whose engine start was lost never ran. Record the start as committed intent and let the background worker converge the engine to match, going only through duroxide's Rust API (no shared transaction, no touching duroxide's internal schema): - df.start() persists a start_input payload on df.instances and issues a transactional NOTIFY; it no longer starts the engine itself. - The worker LISTENs for instant starts and periodically sweeps for committed-but-unstarted instances the engine does not yet know about, starting each exactly once via an atomic pending->running claim. A rolled-back df.start() therefore starts nothing, and a committed instance is always eventually started. Schema (0.2.4): add df.instances.start_input JSONB (fresh install + upgrade script), extend the grant/revoke INSERT column list, and backfill the column grant to existing df-usage roles. Two Postmaster GUCs control the sweep: pg_durable.reconcile_interval (15s) and reconcile_grace (300s). Backward compatible: on a pre-0.2.4 schema df.start() keeps the inline start, and the worker re-probes for the column so an in-place ALTER EXTENSION UPDATE enables reconciliation without a restart. Tests: tests/e2e/sql/25_reconcile_start.sql exercises both failure modes (RED on main); upgrade test adds grant-backfill coverage. Docs: USER_GUIDE section, docs/spec-reconcile-start.md, upgrade-testing notes.
Address findings from the PR review: - Security (High): the worker replayed the caller-writable start_input verbatim, and the orchestration trusted the instance id embedded in it to load the graph (on the superuser pool, bypassing RLS). A low-privilege user could INSERT a crafted pending row whose start_input pointed at another user's instance and have the worker run the victim's workflow as the victim. The worker now forces the engine input's instance id to the claimed row id, and execute() uses the durable runtime's instance id (ctx.instance_id()) instead of the payload's. Added tests/e2e/sql/26_reconcile_security.sql. - Crash window (High): the claim (pending -> running) commits before the engine start, so a worker crash in between stranded the row in running forever (the pending-only sweep could never recover it). The sweep now also reconciles stale claims: a running row the engine reports NotFound for whose claim is older than the grace is re-claimed and re-started. Added a stale-running recovery scenario to 25_reconcile_start.sql. - Listener recovery (Medium): the start listener was created once and dropped permanently on any recv error, so with reconcile_interval=0 a single error silently stopped all starts. The backstop tick now re-establishes the listener when it drops, honoring the "interval=0 keeps the instant path" contract. - Bounded sweep (Low): the reconcile sweep now LIMITs its batch and checks for shutdown between iterations so a large backlog can't starve shutdown/drop detection. - Cancel race: if an instance is cancelled between the claim and the start, the worker re-issues the cancel to the engine so the workflow does not run while df reads cancelled. - Restart latency: a startup catch-up sweep (zero grace) starts instances whose NOTIFY was missed while the worker was down, instead of waiting out the grace.
Contributor
Author
|
Superseding this approach. After review discussion we're pivoting away from deferring the start (persist-intent + worker-driven start) to a simpler model: keep the eager start in df.start() but propagate the enqueue error (fail-fast), and add a periodic background reaper that drives duroxide's retention/deletion API. The rolled-back-start 'ghost' is provably inert (the orchestration fails at graph-load before any node runs), so it only needs cleanup, not prevention — and that cleanup doubles as the duroxide-side retention pg_durable currently lacks. Replacement PRs to follow. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
df.start()writes itsdfrows in the caller's transaction but starts the workflow engine out-of-band on a separate connection. Those two writes can diverge:df.start()leaves the engine running a workflow with nodfrecord behind it.dfinstance whose engine start was lost (worker down, dropped connection) never runs.Root cause: the
dfwrite and the engine start happen in two different transactions with nothing reconciling them.Approach
Treat the committed
dfrow as the source of truth (intent) and let the background worker converge the engine to match, through duroxide's Rust API (Client::start_orchestration/get_orchestration_status).df.start()persists astart_inputpayload ondf.instancesand issues a transactionalNOTIFY; it no longer starts the engine itself.The
dfwrite and the engine start stay in separate transactions and are reconciled asynchronously: a rolled-backdf.start()starts nothing (no ghost), and a committed instance is always eventually started (no stuck).Exactly-once, crash recovery, and trust boundary
UPDATE ... RETURNING; only the winner starts the engine.running. The sweep therefore also recovers stale claims — arunningrow the engine reportsNotFoundfor whose claim is older than the grace is re-claimed and re-started.start_inputis caller-writable and the orchestration selects the graph/role from the input's instance id. The worker forces the engine input's instance id to the claimed row id, andexecute()uses the durable runtime'sctx.instance_id()instead of the payload's — so a craftedstart_inputcannot redirect the superuser worker to run another user's instance.reconcile_interval=0still keeps the instant path); the sweep is batch-bounded and yields to shutdown; a startup catch-up sweep recovers starts whose NOTIFY was missed during downtime.Changes
src/dsl.rs—df.start()intent path (persiststart_input, transactionalpg_notify); legacy inline start retained for pre-0.2.4 schemas (version-gated).src/worker.rs— duroxideClient,LISTEN+ reconcile sweep with stale-claim recovery, exactly-once claim, listener rebuild, bounded batches.src/orchestrations/execute_function_graph.rs— trustctx.instance_id()over the caller-supplied payload.df.instances.start_input JSONB(fresh install + upgrade script), grant/revoke + backfill for existing df roles. GUCspg_durable.reconcile_interval(15s) /reconcile_grace(300s).docs/spec-reconcile-start.md, USER_GUIDE section, upgrade-testing notes;tests/e2e/sql/25_reconcile_start.sql(ghost / stuck / stale-claim, RED on main),tests/e2e/sql/26_reconcile_security.sql(cross-tenant redirect blocked), upgrade grant-backfill coverage.Backward compatibility
New
.soruns against un-upgraded 0.2.2/0.2.3 schemas:df.start()falls back to the inline start until the column exists, and the worker re-probes so an in-placeALTER EXTENSION UPDATEenables reconciliation without a restart.Scope
Start-convergence only. The engine-orphan cleanup direction (engine has it, df does not) is out of scope;
df.cancel()/df.signal()stay on the existing path, with a targeted re-cancel if an instance is cancelled between the claim and the start.Testing
build·clippy(nightly-equivalent) ·fmt· unit 194 · E2E 38 · upgrade 37 — all green locally and in CI.