From a8337236e09d414838411dcf4377f7c9c3538d60 Mon Sep 17 00:00:00 2001 From: tjgreen42 <1738591+tjgreen42@users.noreply.github.com> Date: Wed, 1 Jul 2026 15:02:06 +0000 Subject: [PATCH] Fail fast in df.start() when the engine hand-off fails df.start() writes its df rows in the caller's transaction and hands the workflow to the durable engine over a separate connection. A failed hand-off was swallowed (logged), so the caller committed an instance row for a workflow that never started and would never run. Raise on a failed hand-off instead, aborting the caller's transaction so the instance rows roll back with it. The caller gets a clear, retryable error rather than a silently stuck instance. The pgrx unit-test build does not run the background worker, so the hand-off always fails there; that build logs instead of aborting (matching the existing validate_database test-build carve-out) so df.start()'s graph construction stays unit-testable. Test: tests/e2e/sql/25_start_fail_fast.sql. --- src/dsl.rs | 18 +++++-- tests/e2e/sql/25_start_fail_fast.sql | 79 ++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 4 deletions(-) create mode 100644 tests/e2e/sql/25_start_fail_fast.sql diff --git a/src/dsl.rs b/src/dsl.rs index 9a3ef22..f250c43 100644 --- a/src/dsl.rs +++ b/src/dsl.rs @@ -1065,10 +1065,20 @@ pub fn start( &instance_id, &input_json, ) { - pgrx::log!( - "pg_durable: Warning - failed to start durable function: {}", - e - ); + // Fail fast: the durable engine could not accept the start, so abort the + // whole df.start() transaction rather than committing an instance row + // that would never run (a stuck instance nothing recovers). The df rows + // and this error are in the caller's transaction, so the abort rolls + // them back cleanly; the caller can retry. + // + // The pgrx unit-test build does not run the background worker (see the + // test-build note on validate_database in lib.rs), so the enqueue always + // fails there; in that build we log instead of aborting so df.start()'s + // graph construction stays unit-testable. + #[cfg(not(any(test, feature = "pg_test")))] + pgrx::error!("failed to start durable function: {e}"); + #[cfg(any(test, feature = "pg_test"))] + pgrx::log!("pg_durable: start enqueue failed (test build, ignored): {e}"); } instance_id diff --git a/tests/e2e/sql/25_start_fail_fast.sql b/tests/e2e/sql/25_start_fail_fast.sql new file mode 100644 index 0000000..c0c7a87 --- /dev/null +++ b/tests/e2e/sql/25_start_fail_fast.sql @@ -0,0 +1,79 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- df.start() must fail fast when the durable engine cannot accept the start. +-- +-- df.start() writes its df.instances/df.nodes rows in the caller's transaction +-- but hands the workflow to the engine over a separate connection. If that +-- hand-off fails, df.start() must abort the whole transaction rather than +-- commit an instance row that would never run (a "stuck" instance nothing +-- recovers). This test forces the hand-off to fail by marking the background +-- worker not-ready, then asserts df.start() raises and leaves no row behind. +-- +-- Readiness is toggled via ._worker_ready.schema_version (the same +-- signal the client's readiness probe reads). The original value is saved and +-- restored inside one DO block so a mid-test failure never leaves the server +-- wedged for later tests. + +DO $$ +DECLARE + dx_schema TEXT := df.duroxide_schema(); + orig INT; + raised BOOLEAN := FALSE; + leftover BOOLEAN; +BEGIN + -- Save the real readiness version, then break it so the client's readiness + -- probe reports "worker not ready" and the engine hand-off fails. + EXECUTE format('SELECT schema_version FROM %I._worker_ready LIMIT 1', dx_schema) INTO orig; + EXECUTE format('UPDATE %I._worker_ready SET schema_version = -1', dx_schema); + + -- df.start() should raise. The graph rows it inserted are rolled back with + -- the surrounding subtransaction when the exception is caught. + BEGIN + PERFORM df.start('SELECT 1', 'fail-fast-probe'); + EXCEPTION WHEN OTHERS THEN + raised := TRUE; + END; + + leftover := EXISTS (SELECT 1 FROM df.instances WHERE label = 'fail-fast-probe'); + + -- Restore readiness BEFORE asserting so a failed assertion never leaves the + -- worker marked not-ready for subsequent tests. + EXECUTE format('UPDATE %I._worker_ready SET schema_version = %L', dx_schema, orig); + + IF NOT raised THEN + RAISE EXCEPTION 'TEST FAILED [fail_fast]: df.start() did not raise when the engine could not accept the start'; + END IF; + IF leftover THEN + RAISE EXCEPTION 'TEST FAILED [fail_fast]: df.start() left a committed instance row after a failed start'; + END IF; + + RAISE NOTICE 'PASSED [fail_fast]: df.start() aborted and committed no instance row'; +END $$; + +-- Sanity: with readiness restored, a normal df.start() still works end-to-end. +SELECT public._e2e_wait_for_worker_ready(30); + +CREATE TEMP TABLE _ff_state (instance_id TEXT); +INSERT INTO _ff_state SELECT df.start('SELECT 1', 'fail-fast-ok'); + +DO $$ +DECLARE inst_id TEXT; status TEXT; attempts INT := 0; +BEGIN + SELECT instance_id INTO inst_id FROM _ff_state; + LOOP + SELECT s INTO status FROM df.status(inst_id) s; + EXIT WHEN lower(status) IN ('completed', 'failed', 'cancelled') OR attempts > 300; + PERFORM pg_sleep(0.1); + attempts := attempts + 1; + END LOOP; + + IF lower(COALESCE(status, 'pending')) <> 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [normal_start]: df.start() did not complete after readiness restored (status=%)', status; + END IF; + RAISE NOTICE 'PASSED [normal_start]: df.start() completes normally when the worker is ready'; +END $$; + +DROP TABLE _ff_state; + +SELECT 'TEST PASSED: start fail-fast' AS result;