From 9f2453f4aae29ba6193042a828664dd1572ee79d Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Fri, 12 Jun 2026 20:53:47 +0000 Subject: [PATCH 1/3] Re-implement df.loop via sub-orchestration to fix non-root continue_as_new df.loop called continue_as_new inline in the main orchestration, so every new generation restarted from graph.root_node_id, re-executing prefix nodes on every iteration. Each df.loop() node now spawns a dedicated child sub-orchestration (execute_loop) that owns continue_as_new; the parent awaits it and runs any suffix nodes exactly once. Relies on duroxide PR #31 (parent link preserved across continue_as_new), pulled in as a git dependency until merged and released. Co-authored-by: copilot-swe-agent Co-authored-by: pinodeca <32303022+pinodeca@users.noreply.github.com> --- Cargo.lock | 4 +- Cargo.toml | 14 +- USER_GUIDE.md | 9 + docs/api-reference.md | 16 +- sql/pg_durable--0.2.3--0.2.4.sql | 11 +- src/node_status.rs | 73 ++- src/orchestrations/execute_function_graph.rs | 562 +++++++++++++++--- src/registry.rs | 4 + tests/e2e/sql/24_nonroot_loop.sql | 208 +++++++ tests/e2e/sql/25_loop_contains_join_race.sql | 152 +++++ tests/e2e/sql/26_loop_in_join_race_branch.sql | 239 ++++++++ tests/e2e/sql/53_inferred_status.sql | 81 +++ 12 files changed, 1282 insertions(+), 91 deletions(-) create mode 100644 tests/e2e/sql/24_nonroot_loop.sql create mode 100644 tests/e2e/sql/25_loop_contains_join_race.sql create mode 100644 tests/e2e/sql/26_loop_in_join_race_branch.sql diff --git a/Cargo.lock b/Cargo.lock index 8e07f322..93f7c86b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -689,8 +689,7 @@ checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" [[package]] name = "duroxide" version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9377f5bf81d9a8ce56a13f913f60ca0e0ba8a9464349c407e7d11c326488bf0c" +source = "git+https://github.com/microsoft/duroxide.git?branch=pinodeca%2Fcontinue-parent-link#6650410626110f5e29c32269bec0156b15d6ed7c" dependencies = [ "async-trait", "futures", @@ -702,6 +701,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8b11990f..47fd1f6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,14 @@ serde_json = { version = "1.0", features = ["preserve_order"] } uuid = { version = "1.0", features = ["v4", "serde"] } # duroxide integration -duroxide = "=0.1.29" +# Using git dependency on pinodeca/continue-parent-link branch which preserves the +# parent link when a sub-orchestration calls continue_as_new, unblocking the +# sub-orchestration approach for df.loop. +# Compatibility: duroxide-pg 0.1.34 has been verified to compile and run correctly +# against this branch (same public API as 0.1.29 — PR #31 is a runtime-only change). +# Once the branch is merged and a new duroxide release is published, revert both +# entries back to crates.io version pins as a compatible pair. +duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" } duroxide-pg = "=0.1.34" tokio = { version = "1", features = ["rt-multi-thread", "sync", "time"] } @@ -61,6 +68,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies] pgrx-tests = "=0.16.1" +# Override the crates.io duroxide with the git branch for both the direct dependency +# and duroxide-pg's transitive dependency on duroxide. +[patch.crates-io] +duroxide = { git = "https://github.com/microsoft/duroxide.git", branch = "pinodeca/continue-parent-link" } + [profile.dev] panic = "unwind" diff --git a/USER_GUIDE.md b/USER_GUIDE.md index be808c98..74fcdcd3 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -1065,6 +1065,15 @@ SELECT df.start( > ) > ``` +### How Loops Execute + +Each loop iteration advances via *continue-as-new*, which restarts the loop with fresh state while preserving durability. Where that restart happens depends on whether the loop is the **root** of the function: + +- A **root loop** (the outermost node, e.g. `df.start(df.loop(...))` or the `@>` prefix) runs inline on the function's own orchestration. There is no surrounding work to preserve, so each iteration simply restarts the function. +- A **non-root loop** (a loop with prefix/suffix nodes, or one nested inside a `df.if()`, JOIN (`&`), or RACE (`|`) branch) runs as its own **child sub-orchestration**. Only the loop body restarts on each iteration — any work *before* the loop runs exactly once and is never re-executed, and a loop nested in a parallel branch gets its own durable instance. + +This is transparent to your workflow; it only affects observability. The child sub-orchestration is an internal durable instance: it does **not** appear in `df.list_instances()` (which lists only the instances you started with `df.start()`). Instead, the loop node's status in `df.instance_nodes()` / `df.explain()` reflects the child's progress, so you observe the loop through its parent instance as usual. + ### Stopping a Loop Externally ```sql diff --git a/docs/api-reference.md b/docs/api-reference.md index 332686ca..82d8aa3c 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -425,10 +425,18 @@ Return columns: **`status_details` JSON contract.** Written by the worker through the `update-node-status` activity and stored verbatim in `df.nodes.status_details`: -- `execution_id` — the node's full segmented execution path, e.g. - `a1b2c3d4::1::7f9a0012::1`. Parse it positionally: the second `::`-token is the - root loop generation (used to detect superseded loop iterations), and the - trailing segments encode `JOIN`/`RACE` sub-orchestration lineage. +- `execution_id` — the node's full execution stamp, `{instance_path}::{generation}`, + e.g. `a1b2c3d4::1::7f9a0012::2`. The **last** `::`-token is the generation + (continue-as-new count) of the orchestration that transitioned the node, and the + preceding `{instance_path}` is that orchestration's instance id. `instance_path` + encodes sub-orchestration lineage: it starts with the root function instance id and + appends a `::{parent_generation}::{branch_or_loop_node_id}` segment for each nested + `JOIN`/`RACE` branch and each non-root `df.loop()` (which runs as its own child + sub-orchestration). Instance ids and node ids are 8-char hex and never contain `::`, + so the path is unambiguous. Supersession is evaluated **per scope**: a node is + superseded when a newer generation exists for its own `instance_path`, or when any + ancestor scope in its path has advanced to a newer generation. For a plain root-level + loop this reduces to the second `::`-token being the loop generation. `inferred_status` and `inferred_status_from_ancestor_id` are **computed at read time** and are not stored in `df.nodes.status_details`. diff --git a/sql/pg_durable--0.2.3--0.2.4.sql b/sql/pg_durable--0.2.3--0.2.4.sql index 8ae8436f..7fa5bcec 100644 --- a/sql/pg_durable--0.2.3--0.2.4.sql +++ b/sql/pg_durable--0.2.3--0.2.4.sql @@ -276,10 +276,13 @@ CREATE INDEX idx_instances_label ON df.instances(label, created_at DESC, id) WHE -- Upgrade ordering (in-flight instances): the worker's orchestration history -- changed shape in this release -- update_node_status activity inputs gained an -- execution_id field, and JOIN/RACE branch sub-orchestrations now use --- deterministic composed instance ids instead of auto-generated ones. duroxide --- replays by exact equality on recorded inputs/ids, so instances in flight across --- the upgrade cannot resume; drain or recreate them before upgrading (the same --- constraint documented for issue #129). +-- deterministic composed instance ids instead of auto-generated ones. Non-root +-- df.loop() nodes also run as their own child sub-orchestration (with a +-- deterministic composed instance id) that stamps the loop node directly, instead +-- of the parent stamping it inline. This adds no new DDL -- status_details already +-- covers the loop node's stamps. duroxide replays by exact equality on recorded +-- inputs/ids, so instances in flight across the upgrade cannot resume; drain or +-- recreate them before upgrading (the same constraint documented for issue #129). -- ============================================================================ ALTER TABLE df.nodes ADD COLUMN status_details JSONB; diff --git a/src/node_status.rs b/src/node_status.rs index a9e03c5e..db97e8bb 100644 --- a/src/node_status.rs +++ b/src/node_status.rs @@ -52,17 +52,48 @@ pub struct Inferred { pub from_ancestor_id: Option, } -/// Parse the loop generation (the second "::"-token) from a node's -/// status_details `execution_id` stamp. None when never stamped / unparseable. -fn gen_of(status_details: Option<&str>) -> Option { +/// Parse a node's `execution_id` stamp into `(instance_id, generation)`. +/// +/// The stamp is `{orchestration_instance_id}::{execution_id}`. The trailing token is the +/// generation written by the innermost orchestration (a loop advances it via +/// `continue_as_new`); everything before it is that orchestration's instance id (its +/// "scope"). Instance ids and node ids are 8-char hex, so splitting on the last "::" is +/// unambiguous. None when never stamped / unparseable. +fn stamp_of(status_details: Option<&str>) -> Option<(String, i64)> { let sd = status_details?; let v: serde_json::Value = serde_json::from_str(sd).ok()?; - v.get("execution_id")? - .as_str()? - .split("::") - .nth(1)? - .parse::() - .ok() + let eid = v.get("execution_id")?.as_str()?; + let (instance_id, generation) = eid.rsplit_once("::")?; + Some((instance_id.to_string(), generation.parse::().ok()?)) +} + +/// Whether a node stamped `(instance_id, gen)` belongs to a superseded generation. +/// +/// A node is superseded when a newer generation exists in its own scope, or when any +/// ancestor scope along its spawn lineage has advanced past the generation that spawned +/// this scope. A spawned scope's instance id is +/// `{parent_instance}::{parent_gen}::{child_root_node_id}` (see `subtree_instance_id` in +/// the orchestration), so the spawning generation is the second-to-last "::"-token and the +/// parent instance id is everything before it. The recursion makes a non-root loop body +/// (older inner generation) and a parallel branch spawned by an older loop generation both +/// read as superseded, while the root case (a single-token instance id) reduces to a plain +/// per-scope generation compare. +fn is_superseded(instance_id: &str, gen: i64, scope_max: &HashMap) -> bool { + if let Some(&m) = scope_max.get(instance_id) { + if gen < m { + return true; + } + } + let tokens: Vec<&str> = instance_id.split("::").collect(); + if tokens.len() < 3 { + // Root scope: no parent scope can supersede this one. + return false; + } + let parent_instance = tokens[..tokens.len() - 2].join("::"); + match tokens[tokens.len() - 2].parse::() { + Ok(parent_gen) => is_superseded(&parent_instance, parent_gen, scope_max), + Err(_) => false, + } } fn is_terminal(status: Option<&str>) -> bool { @@ -128,6 +159,20 @@ pub fn infer_statuses( let mut out: HashMap = HashMap::new(); let mut visited: HashSet = HashSet::new(); + // Per-scope current generation: the highest trailing generation stamped for each + // orchestration instance id (the stamp with its last "::"-token removed). A node is + // superseded when a newer generation exists for its own scope or for any ancestor + // scope along its spawn lineage (see `is_superseded`). + let mut scope_max: HashMap = HashMap::new(); + for n in nodes.values() { + if let Some((instance_id, generation)) = stamp_of(n.status_details()) { + let slot = scope_max.entry(instance_id).or_insert(i64::MIN); + if generation > *slot { + *slot = generation; + } + } + } + // Top-down walk from the root, carrying the nearest terminal ancestor and // highest-generation ancestor seen so far. if let Some(root) = root_node { @@ -141,11 +186,13 @@ pub fn infer_statuses( None => continue, // dangling reference (should not happen) }; - let gen_n = gen_of(n.status_details()); + let gen_n = stamp_of(n.status_details()).map(|(_, g)| g); let terminal = is_terminal(n.status()); - let superseded = match (gen_n, &max_gen_anc) { - (Some(g), Some((ag, _))) => *ag > g, - _ => false, + let superseded = match stamp_of(n.status_details()) { + Some((instance_id, generation)) => { + is_superseded(&instance_id, generation, &scope_max) + } + None => false, }; let (inferred, from_anc): (String, Option) = if terminal { diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs index d5752af6..f61593bf 100644 --- a/src/orchestrations/execute_function_graph.rs +++ b/src/orchestrations/execute_function_graph.rs @@ -345,6 +345,16 @@ async fn execute_function_node_with_vars( node_id, node.node_type )); + // A *non-root* loop runs as a child sub-orchestration so its `continue_as_new` restarts + // only the loop body (not the upstream prefix; #227) and a loop nested in a parallel + // branch gets its own durable instance (#233). The parent does NOT stamp the loop node + // running/terminal: the child sub-orchestration owns the node as its root and stamps it + // (see `execute_loop`). Intercept here, before any status stamping. A *root* loop falls + // through and runs inline via `execute_loop_node`. + if node.node_type.eq_ignore_ascii_case("loop") && node_id != graph.root_node_id { + return execute_loop_suborchestration(ctx, graph, node, node_id, results, exec_ctx).await; + } + // Stamp identifying which orchestration generation is transitioning this // node: "{orchestration_instance_id}::{execution_id}". For the root // orchestration this is "{df_instance_id}::{loop_generation}"; for a JOIN/RACE @@ -602,6 +612,335 @@ const LOOP_MIN_ITER_DURATION: Duration = Duration::from_secs(1); /// This prevents runaway infinite loops from consuming resources indefinitely. /// At the minimum 1-second rate limit, this allows ~27 hours of looping. const MAX_LOOP_ITERATIONS: u64 = 100_000; + +/// Orchestration name for the loop sub-orchestration. +/// +/// Each `df.loop()` node spawns a child orchestration under this name. The +/// child handles all iterations via `continue_as_new`; when the loop exits it +/// returns a `SubtreeEnvelope` to the parent. The parent link is preserved +/// across `continue_as_new` generations by duroxide (see duroxide PR #31), so +/// the parent orchestration is notified when the loop finally completes. +pub const LOOP_NAME: &str = "pg_durable::orchestration::execute-loop"; + +/// Build the `SubtreeEnvelope` a loop returns to its parent on exit. +/// +/// A loop always exits with a *normal* result: a `df.break()` inside the body is the loop's +/// own terminator (caught here as `NodeError::Break`), not a break that should unwind past +/// the loop, so the envelope is always tagged `Normal`. `execute_loop_node` merges `results` +/// back into the parent map via `parse_subtree_envelope`. +fn loop_exit_envelope(result: String, results: HashMap) -> Result { + let envelope = SubtreeEnvelope { + control: Some(SubtreeControl::Normal), + result, + results, + }; + serde_json::to_string(&envelope).map_err(|e| format!("Failed to serialize loop envelope: {e}")) +} + +/// Stamp the loop node's status from inside its own (`execute_loop`) sub-orchestration. +/// +/// The loop node is the *root* of this child instance, so the child owns its status +/// transitions: `running` at the start of every generation, then `completed`/`failed` on +/// exit. `stamp` is `{child_instance_id}::{execution_id}` — the same `execution_stamp` +/// scheme `execute_function_node_with_vars` uses — whose trailing `::`-token is this child's +/// `continue_as_new` generation. df.instance_nodes()/df.explain() and the write fence read +/// that token to fence stale writes and infer pending/skipped. +async fn stamp_loop_node( + ctx: &OrchestrationContext, + instance_id: &str, + loop_node_id: &str, + status: &str, + result: Option<&str>, + stamp: &str, +) { + let mut input = serde_json::json!({ + "node_id": loop_node_id, + "instance_id": instance_id, + "status": status, + "execution_id": stamp, + }); + if let Some(r) = result { + input["result"] = serde_json::Value::String(r.to_string()); + } + let _ = ctx + .schedule_activity(activities::update_node_status::NAME, input.to_string()) + .await; +} + +/// Run one iteration of a loop body (and its optional while-condition) for the +/// `execute_loop` sub-orchestration. +/// +/// Returns `Ok(Some(final_result))` when the loop should exit (a `df.break()` in the body, +/// or the while-condition evaluating false), `Ok(None)` when another iteration is needed, +/// and `Err` when the body or condition fails. Named results are stored on exit, mirroring +/// the inline root-loop path in `execute_loop_node`. +async fn run_loop_iteration( + ctx: &OrchestrationContext, + graph: &FunctionGraph, + node: &FunctionNode, + loop_node_id: &str, + body_id: &str, + results: &mut HashMap, + exec_ctx: &ExecutionContext, +) -> Result, String> { + // The loop is where `NodeError::Break` is caught: a break unwinds through the body via + // `?` and is converted here into the loop's normal exit value. A `Failure` propagates + // out of the sub-orchestration unchanged. + let body_result = + match execute_function_node_with_vars(ctx, graph, body_id, results, exec_ctx).await { + Ok(v) => v, + Err(NodeError::Break(break_value)) => { + ctx.trace_info(format!( + "Loop terminated by break with value: {break_value}" + )); + store_named_result(ctx, node, &break_value, results, "LOOP"); + return Ok(Some(break_value)); + } + Err(NodeError::Failure(e)) => return Err(e), + }; + + // While-condition: if present and false, exit the loop. + if let Some(ref config_str) = node.query { + let config: serde_json::Value = serde_json::from_str(config_str).map_err(|e| { + // M8: Malformed condition config should fail the loop rather than + // silently creating an infinite loop without exit condition. + format!("LOOP node {loop_node_id}: failed to parse condition config: {e}") + })?; + if let Some(condition_node_id) = config["condition_node"].as_str() { + ctx.trace_info("Evaluating loop condition"); + let condition_result = match execute_function_node_with_vars( + ctx, + graph, + condition_node_id, + results, + exec_ctx, + ) + .await + { + Ok(v) => v, + Err(NodeError::Break(break_value)) => { + store_named_result(ctx, node, &break_value, results, "LOOP"); + return Ok(Some(break_value)); + } + Err(NodeError::Failure(e)) => return Err(e), + }; + + // Parse condition result to check truthiness (uses evaluate_condition to extract boolean from SQL result) + let should_continue = evaluate_condition(&condition_result).unwrap_or(false); + ctx.trace_info(format!( + "Loop condition evaluated to: {condition_result} (continue={should_continue})" + )); + + if !should_continue { + ctx.trace_info("Loop condition false, exiting loop"); + store_named_result(ctx, node, &body_result, results, "LOOP"); + return Ok(Some(body_result)); + } + } + } + + Ok(None) +} + +/// Sub-orchestration that runs a single loop iteration and either returns or +/// calls `continue_as_new` for the next iteration. +/// +/// Input JSON: +/// ```json +/// { "instance_id": "...", "loop_node_id": "...", +/// "results": "", "vars": "", "label": "...", +/// "iteration": 0 } +/// ``` +/// +/// `load_function_graph` is called at the start of **every** generation +/// (including after `continue_as_new`) so that cross-iteration security +/// tampering is caught and the instance is failed — the same guarantee the +/// main `execute()` orchestration provides at its generation boundary. +/// +/// On loop exit the function returns a `SubtreeEnvelope` containing the final +/// result and any named results accumulated during the loop. +pub async fn execute_loop(ctx: OrchestrationContext, input_json: String) -> Result { + let input: serde_json::Value = serde_json::from_str(&input_json) + .map_err(|e| format!("Failed to parse ExecuteLoop input: {e}"))?; + + let instance_id = input["instance_id"] + .as_str() + .ok_or("Missing instance_id in ExecuteLoop input")? + .to_string(); + let loop_node_id = input["loop_node_id"] + .as_str() + .ok_or("Missing loop_node_id in ExecuteLoop input")? + .to_string(); + let results_json = input["results"] + .as_str() + .ok_or("Missing results in ExecuteLoop input")?; + + // Iteration counter threaded across continue_as_new generations (M7). + // Absent on the first generation (spawned by execute_loop_node), so default to 0. + let iteration = input["iteration"].as_u64().unwrap_or(0); + + // re-load the graph from the database on every generation — this re-validates + // submitted_by and catches cross-iteration security tampering. + let graph_json = ctx + .schedule_activity(activities::load_function_graph::NAME, instance_id.clone()) + .await?; + let graph: FunctionGraph = serde_json::from_str(&graph_json) + .map_err(|e| format!("Failed to parse graph in ExecuteLoop: {e}"))?; + + let mut results: HashMap = serde_json::from_str(results_json) + .map_err(|e| format!("Failed to parse results in ExecuteLoop: {e}"))?; + + let vars: HashMap = if let Some(vars_str) = input["vars"].as_str() { + serde_json::from_str(vars_str) + .map_err(|e| format!("Failed to parse vars in ExecuteLoop: {e}"))? + } else { + HashMap::new() + }; + let label: Option = input["label"].as_str().map(|s| s.to_string()); + + let exec_ctx = ExecutionContext { + vars, + label, + loop_iteration: iteration, + }; + + let node = graph + .nodes + .get(&loop_node_id) + .ok_or_else(|| format!("Loop node not found: {loop_node_id}"))?; + + let body_id = node + .left_node + .as_ref() + .ok_or_else(|| format!("LOOP node {loop_node_id} has no body"))? + .clone(); + + // Capture iteration start time for rate-limiting continue_as_new. + let iter_started = ctx.utc_now().await.ok(); + + // Stamp this loop node as running for the current generation. The loop node is the ROOT + // of this sub-orchestration, so it is owned and stamped here (not by the parent): + // running on each generation, completed/failed on exit. The stamp's instance id carries + // the composed lineage (subtree_instance_id), so the trailing `::`-token is this child's + // continue_as_new generation — what df.instance_nodes()/df.explain() and the write fence + // read to infer pending/skipped and to fence stale writes. + let execution_stamp = format!("{}::{}", ctx.instance_id(), ctx.execution_id()); + stamp_loop_node( + &ctx, + &instance_id, + &loop_node_id, + "running", + None, + &execution_stamp, + ) + .await; + + ctx.trace_info("Executing loop iteration"); + + match run_loop_iteration( + &ctx, + &graph, + node, + &loop_node_id, + &body_id, + &mut results, + &exec_ctx, + ) + .await + { + Ok(Some(final_result)) => { + stamp_loop_node( + &ctx, + &instance_id, + &loop_node_id, + "completed", + Some(&final_result), + &execution_stamp, + ) + .await; + return loop_exit_envelope(final_result, results); + } + Ok(None) => {} + Err(e) => { + stamp_loop_node( + &ctx, + &instance_id, + &loop_node_id, + "failed", + Some(&e), + &execution_stamp, + ) + .await; + return Err(e); + } + } + + ctx.trace_info("Continuing as new for next loop iteration"); + + // M7: Enforce maximum iteration count to prevent runaway infinite loops + let next_iteration = exec_ctx.loop_iteration + 1; + if next_iteration >= MAX_LOOP_ITERATIONS { + let e = format!( + "Loop exceeded maximum iteration count of {MAX_LOOP_ITERATIONS}. \ + Use df.break() to exit the loop or restructure the workflow." + ); + stamp_loop_node( + &ctx, + &instance_id, + &loop_node_id, + "failed", + Some(&e), + &execution_stamp, + ) + .await; + return Err(e); + } + + // Enforce a minimum per-iteration wall-clock duration to prevent busy-looping. + if let Some(started) = iter_started { + if let Ok(now) = ctx.utc_now().await { + let elapsed = now.duration_since(started).unwrap_or(Duration::ZERO); + if elapsed < LOOP_MIN_ITER_DURATION { + let deficit = LOOP_MIN_ITER_DURATION - elapsed; + ctx.trace_info(format!( + "Loop iteration took {elapsed:?} (< {LOOP_MIN_ITER_DURATION:?}); \ + adding {deficit:?} rate-limit delay" + )); + ctx.schedule_timer(deficit).await; + } + } + } + + // Another iteration needed: continue_as_new within this sub-orchestration. + // The parent orchestration keeps its awaiting handle because duroxide preserves + // the parent link across continue_as_new (duroxide PR #31). + ctx.trace_info(format!( + "Loop continuing with continue_as_new at node {loop_node_id}" + )); + let new_results_json = serde_json::to_string(&results) + .map_err(|e| format!("Failed to serialize updated results: {e}"))?; + let mut new_input = input.clone(); + new_input["results"] = serde_json::Value::String(new_results_json); + // Persist the incremented iteration counter for the next generation (M7). + new_input["iteration"] = serde_json::Value::Number(next_iteration.into()); + let new_input_json = serde_json::to_string(&new_input) + .map_err(|e| format!("Failed to serialize loop input: {e}"))?; + ctx.continue_as_new(new_input_json) + .await + .map(|_| String::new()) + .map_err(|e| format!("continue_as_new failed: {e:?}")) +} + +/// Execute a loop node. +/// +/// Only a *root* loop reaches this function: a non-root loop is intercepted in +/// `execute_function_node_with_vars` and delegated to a child sub-orchestration +/// (`execute_loop`) before any status stamping. A root loop runs inline because its +/// `continue_as_new` restarts the root orchestration, and with no upstream prefix to +/// re-execute, re-entering from the root lands back on this same loop node each +/// generation. Running inline also keeps the body's node stamps under the root df +/// instance id (so the second `::`-token of every stamp is the loop generation), which +/// df.instance_nodes()/df.explain() and the write fence rely on. async fn execute_loop_node( ctx: &OrchestrationContext, graph: &FunctionGraph, @@ -717,12 +1056,73 @@ async fn execute_loop_node( loop_iteration: next_iteration, }; - // duroxide 0.1.1: continue_as_new returns an awaitable future - return it directly - return ctx - .continue_as_new(serde_json::to_string(&new_input).unwrap_or(graph.instance_id.clone())) + // duroxide: continue_as_new returns an awaitable future - return it directly + ctx.continue_as_new(serde_json::to_string(&new_input).unwrap_or(graph.instance_id.clone())) .await .map(|_| body_result) - .map_err(|e| NodeError::Failure(format!("continue_as_new failed: {e:?}"))); + .map_err(|e| NodeError::Failure(format!("continue_as_new failed: {e:?}"))) +} + +/// Run a *non-root* loop as a child sub-orchestration. +/// +/// A non-root loop must not run inline: its `continue_as_new` would restart the parent +/// orchestration and re-execute the upstream prefix (#227), and a loop nested inside a +/// parallel branch needs its own durable instance (#233). The child (`execute_loop`) +/// advances iterations via its own `continue_as_new`, so the parent's prefix is preserved. +/// +/// The child instance id is built with `subtree_instance_id`, giving it the root loop +/// generation as the second `::`-token (for node-status inference and the write fence) and a +/// per-iteration-unique id (the parent execution id advances on each generation, so a loop +/// inside a parallel branch never collides across iterations). The loop node itself is the +/// root of this child instance and is stamped (running/completed/failed) there — NOT by the +/// parent — so the parent invokes this *before* any status stamping (see +/// `execute_function_node_with_vars`). +async fn execute_loop_suborchestration( + ctx: &OrchestrationContext, + graph: &FunctionGraph, + node: &FunctionNode, + node_id: &str, + results: &mut HashMap, + exec_ctx: &ExecutionContext, +) -> NodeResult { + // Validate the loop has a body before spawning the child sub-orchestration. + node.left_node + .as_ref() + .ok_or_else(|| format!("LOOP node {node_id} has no body"))?; + + let results_json = + serde_json::to_string(results).map_err(|e| format!("Failed to serialize results: {e}"))?; + let vars_json = serde_json::to_string(&exec_ctx.vars) + .map_err(|e| format!("Failed to serialize vars: {e}"))?; + + let loop_input = serde_json::json!({ + "instance_id": graph.instance_id, + "loop_node_id": node_id, + "results": results_json, + "vars": vars_json, + "label": exec_ctx.label, + "iteration": 0, + }) + .to_string(); + + ctx.trace_info(format!( + "Spawning loop sub-orchestration for node {node_id}" + )); + + let raw = ctx + .schedule_sub_orchestration_with_id( + LOOP_NAME, + subtree_instance_id(ctx, node_id), + loop_input, + ) + .await + .map_err(|e| format!("Loop sub-orchestration failed: {e}"))?; + + // Merge named results from the loop sub-orchestration back into the parent map and + // return the loop's final result. The loop always returns a `Normal` envelope (a break + // inside the body is the loop's own terminator), so `parse_subtree_envelope` will not + // re-raise a `NodeError::Break` here. + parse_subtree_envelope(&raw, "LOOP", results) } async fn execute_break_node( @@ -904,6 +1304,53 @@ fn parse_subtree_envelope( } } +/// Choose the child orchestration (name + input JSON) used to run a JOIN/RACE branch. +/// +/// A branch whose root node is a loop is spawned *directly* as an `execute_loop` child: the +/// loop is already the root of its own orchestration and advances via its own +/// `continue_as_new`, so wrapping it in an extra `execute_subtree` layer (which would then +/// itself spawn the loop child) is both redundant and wrong — it would create a third +/// sub-orchestration where only two are expected. Every other branch runs as an +/// `execute_subtree` child. Both children return a `SubtreeEnvelope`, so `parse_subtree_envelope` +/// handles the result identically regardless of which was chosen. +fn branch_child_orchestration( + graph: &FunctionGraph, + branch_node_id: &str, + graph_json: &str, + results_json: &str, + vars_json: &str, + label: &Option, +) -> (&'static str, String) { + let is_loop = graph + .nodes + .get(branch_node_id) + .map(|n| n.node_type.eq_ignore_ascii_case("loop")) + .unwrap_or(false); + + if is_loop { + let input = serde_json::json!({ + "instance_id": graph.instance_id, + "loop_node_id": branch_node_id, + "results": results_json, + "vars": vars_json, + "label": label, + "iteration": 0, + }) + .to_string(); + (LOOP_NAME, input) + } else { + let input = serde_json::json!({ + "graph": graph_json, + "node_id": branch_node_id, + "results": results_json, + "vars": vars_json, + "label": label, + }) + .to_string(); + (SUBTREE_NAME, input) + } +} + async fn execute_join_node( ctx: &OrchestrationContext, graph: &FunctionGraph, @@ -930,31 +1377,11 @@ async fn execute_join_node( let vars_json = serde_json::to_string(&exec_ctx.vars) .map_err(|e| format!("Failed to serialize vars: {e}"))?; - let left_input = serde_json::json!({ - "graph": graph_json, - "node_id": left_id, - "results": results_json, - "vars": vars_json, - "label": exec_ctx.label - }) - .to_string(); - - let right_input = serde_json::json!({ - "graph": graph_json, - "node_id": right_id, - "results": results_json, - "vars": vars_json, - "label": exec_ctx.label - }) - .to_string(); - - // Build list of branch inputs, each paired with its branch root node id so the - // sub-orchestration can be scheduled with a deterministic, generation-stamped - // instance id (see `subtree_instance_id`). - let mut branch_inputs: Vec<(String, String)> = vec![ - (left_id.clone(), left_input), - (right_id.clone(), right_input), - ]; + // Collect the branch root node ids (left, right, and any join3 extras). Each branch is + // spawned as its own child sub-orchestration with a deterministic, generation-stamped + // instance id (see `subtree_instance_id`) so its node stamps carry the root loop + // generation and remain unique across loop iterations. + let mut branch_ids: Vec = vec![left_id.clone(), right_id.clone()]; // Check for extra nodes (join3) if let Some(config_str) = &node.query { @@ -962,29 +1389,30 @@ async fn execute_join_node( if let Some(extra_nodes) = config["extra_nodes"].as_array() { for extra_node_val in extra_nodes { if let Some(extra_id) = extra_node_val.as_str() { - let extra_input = serde_json::json!({ - "graph": graph_json, - "node_id": extra_id, - "results": results_json, - "vars": vars_json, - "label": exec_ctx.label - }) - .to_string(); - branch_inputs.push((extra_id.to_string(), extra_input)); + branch_ids.push(extra_id.to_string()); } } } } } - // Schedule sub-orchestrations and collect DurableFutures. Each branch gets a - // deterministic `{parent}::{generation}::{branch_root}` instance id so its node - // stamps carry the root loop generation and remain unique across loop iterations. + // Schedule each branch. A branch whose root is a loop is spawned directly as an + // `execute_loop` child (it is already the root of its own orchestration, so it must not + // be wrapped in an extra `execute_subtree` layer); every other branch runs as an + // `execute_subtree` child. Both return a `SubtreeEnvelope`. let mut durable_futures = Vec::new(); - for (child_root, input) in branch_inputs { + for child_root in &branch_ids { + let (name, input) = branch_child_orchestration( + graph, + child_root, + &graph_json, + &results_json, + &vars_json, + &exec_ctx.label, + ); let fut = ctx.schedule_sub_orchestration_with_id( - SUBTREE_NAME, - subtree_instance_id(ctx, &child_root), + name, + subtree_instance_id(ctx, child_root), input, ); durable_futures.push(fut); @@ -1060,34 +1488,34 @@ async fn execute_race_node( let vars_json = serde_json::to_string(&exec_ctx.vars) .map_err(|e| format!("Failed to serialize vars: {e}"))?; - let left_input = serde_json::json!({ - "graph": graph_json, - "node_id": left_id, - "results": results_json, - "vars": vars_json, - "label": exec_ctx.label - }) - .to_string(); - - let right_input = serde_json::json!({ - "graph": graph_json, - "node_id": right_id, - "results": results_json, - "vars": vars_json, - "label": exec_ctx.label - }) - .to_string(); - - // Schedule sub-orchestrations with deterministic, generation-stamped instance - // ids so each branch's node stamps carry the root loop generation (see - // `subtree_instance_id`). + // Schedule each branch with a deterministic, generation-stamped instance id so its node + // stamps carry the root loop generation (see `subtree_instance_id`). A branch whose root + // is a loop is spawned directly as an `execute_loop` child (it is already the root of its + // own orchestration, so it must not be wrapped in an extra `execute_subtree` layer); + // every other branch runs as an `execute_subtree` child. + let (left_name, left_input) = branch_child_orchestration( + graph, + left_id, + &graph_json, + &results_json, + &vars_json, + &exec_ctx.label, + ); + let (right_name, right_input) = branch_child_orchestration( + graph, + right_id, + &graph_json, + &results_json, + &vars_json, + &exec_ctx.label, + ); let left_fut = ctx.schedule_sub_orchestration_with_id( - SUBTREE_NAME, + left_name, subtree_instance_id(ctx, left_id), left_input, ); let right_fut = ctx.schedule_sub_orchestration_with_id( - SUBTREE_NAME, + right_name, subtree_instance_id(ctx, right_id), right_input, ); diff --git a/src/registry.rs b/src/registry.rs index 03e5c73d..f20d128d 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -55,5 +55,9 @@ pub fn create_orchestration_registry() -> OrchestrationRegistry { orchestrations::execute_function_graph::SUBTREE_NAME, orchestrations::execute_function_graph::execute_subtree, ) + .register( + orchestrations::execute_function_graph::LOOP_NAME, + orchestrations::execute_function_graph::execute_loop, + ) .build() } diff --git a/tests/e2e/sql/24_nonroot_loop.sql b/tests/e2e/sql/24_nonroot_loop.sql new file mode 100644 index 00000000..fc412ba8 --- /dev/null +++ b/tests/e2e/sql/24_nonroot_loop.sql @@ -0,0 +1,208 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Regression tests for: df.loop continue_as_new restarts from root when loop is not root +-- (https://github.com/microsoft/pg_durable/issues/227) +-- +-- When a loop is not the root node of a function graph (i.e., there are prefix or suffix +-- nodes), continue_as_new must NOT re-execute the prefix or skip the suffix. Loops are +-- now executed as a scoped sub-orchestration so continue_as_new is scoped to the loop +-- child, leaving the parent graph orchestration parked. + +SET SESSION AUTHORIZATION df_e2e_user; + +-- === Test 1: Non-root loop — prefix runs once, body runs N times === +-- +-- Graph: INSERT into prefix_table ~> df.loop(INSERT into body_table ~> break after 3) +-- Expected: prefix_table has exactly 1 row after completion, body_table has exactly 3 rows. + +DROP TABLE IF EXISTS test_nonroot_prefix; +DROP TABLE IF EXISTS test_nonroot_body; +CREATE TABLE test_nonroot_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t1 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_nonroot_prefix DEFAULT VALUES', + df.loop( + 'INSERT INTO test_nonroot_body DEFAULT VALUES' + ~> ( + 'SELECT COUNT(*) >= 3 FROM test_nonroot_body' + ?> df.break() + !> df.sleep(1) + ) + ) + ), + 'test-nonroot-loop-prefix' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_body INT; +BEGIN + SELECT instance_id INTO v_id FROM _t1; + RAISE NOTICE 'Test 1 - non-root loop prefix: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_nonroot_prefix; + SELECT COUNT(*) INTO v_body FROM test_nonroot_body; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: prefix ran % time(s) (expected 1); ' + 'prefix nodes must not be re-executed on each loop iteration', v_prefix; + END IF; + + IF v_body != 3 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-prefix]: body ran % time(s) (expected 3)', v_body; + END IF; + + RAISE NOTICE 'PASSED: non-root loop prefix — prefix ran once, body ran 3 times'; +END $$; + +DROP TABLE _t1; +DROP TABLE test_nonroot_prefix; +DROP TABLE test_nonroot_body; + +-- === Test 2: Non-root loop — prefix once, body N times, suffix once === +-- +-- Graph: INSERT prefix ~> df.loop(body, break after 2) ~> INSERT suffix +-- Expected: prefix_table = 1 row, body_table = 2 rows, suffix_table = 1 row. + +DROP TABLE IF EXISTS test_nonroot2_prefix; +DROP TABLE IF EXISTS test_nonroot2_body; +DROP TABLE IF EXISTS test_nonroot2_suffix; +CREATE TABLE test_nonroot2_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot2_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_nonroot2_suffix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t2 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_nonroot2_prefix DEFAULT VALUES', + df.seq( + df.loop( + 'INSERT INTO test_nonroot2_body DEFAULT VALUES' + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_nonroot2_body' + ?> df.break() + !> df.sleep(1) + ) + ), + 'INSERT INTO test_nonroot2_suffix DEFAULT VALUES' + ) + ), + 'test-nonroot-loop-prefix-suffix' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_body INT; + v_suffix INT; +BEGIN + SELECT instance_id INTO v_id FROM _t2; + RAISE NOTICE 'Test 2 - non-root loop prefix+suffix: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_nonroot2_prefix; + SELECT COUNT(*) INTO v_body FROM test_nonroot2_body; + SELECT COUNT(*) INTO v_suffix FROM test_nonroot2_suffix; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: prefix ran % time(s) (expected 1)', v_prefix; + END IF; + + IF v_body != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: body ran % time(s) (expected 2)', v_body; + END IF; + + IF v_suffix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-suffix]: suffix ran % time(s) (expected 1)', v_suffix; + END IF; + + RAISE NOTICE 'PASSED: non-root loop prefix+suffix — prefix once, body twice, suffix once'; +END $$; + +DROP TABLE _t2; +DROP TABLE test_nonroot2_prefix; +DROP TABLE test_nonroot2_body; +DROP TABLE test_nonroot2_suffix; + +-- === Test 3: Non-root loop — named result from prefix available inside loop body === +-- +-- Verify that named results accumulated before the loop are still accessible +-- inside the loop body after continue_as_new (they are preserved in the loop +-- sub-orchestration's input). + +DROP TABLE IF EXISTS test_nonroot3_log; +CREATE TABLE test_nonroot3_log (id SERIAL, val TEXT, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t3 AS +SELECT df.start( + df.seq( + ('SELECT ''hello'' AS greeting' |=> 'prefix_result'), + df.loop( + ($$INSERT INTO test_nonroot3_log (val) + VALUES ($prefix_result) + RETURNING val$$ + |=> 'last_val') + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_nonroot3_log' + ?> df.break() + !> df.sleep(1) + ) + ) + ), + 'test-nonroot-loop-named-result' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_cnt INT; + v_val TEXT; +BEGIN + SELECT instance_id INTO v_id FROM _t3; + RAISE NOTICE 'Test 3 - non-root loop uses prefix named result: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_cnt FROM test_nonroot3_log; + IF v_cnt != 2 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected 2 rows, got %', v_cnt; + END IF; + + SELECT val INTO v_val FROM test_nonroot3_log ORDER BY id LIMIT 1; + IF v_val != 'hello' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-named]: expected ''hello'', got ''%''', v_val; + END IF; + + RAISE NOTICE 'PASSED: non-root loop uses prefix named result across iterations'; +END $$; + +DROP TABLE _t3; +DROP TABLE test_nonroot3_log; + +RESET SESSION AUTHORIZATION; +SELECT 'TEST PASSED' AS result; diff --git a/tests/e2e/sql/25_loop_contains_join_race.sql b/tests/e2e/sql/25_loop_contains_join_race.sql new file mode 100644 index 00000000..61b46cda --- /dev/null +++ b/tests/e2e/sql/25_loop_contains_join_race.sql @@ -0,0 +1,152 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Regression tests for: df.loop whose body CONTAINS a JOIN or RACE +-- (https://github.com/microsoft/pg_durable/issues/230) +-- +-- A loop iterates via continue_as_new. Each generation that runs a JOIN/RACE inside the +-- body spawns child sub-orchestrations whose instance ids are derived from the loop's +-- current execution (generation). If those child ids collided across generations the +-- second iteration would replay the first iteration's history ("instance already exists" / +-- stale subtree result). The loop must run >= 2 iterations and every branch must run once +-- per iteration. + +SET SESSION AUTHORIZATION df_e2e_user; + +-- === Test 1: JOIN inside a NON-ROOT loop body, >= 3 iterations === +-- +-- Graph: INSERT prefix ~> df.loop( (INSERT left & INSERT right) ~> break after 3 ) +-- Expected: prefix = 1 row, left = 3 rows, right = 3 rows (both branches every iteration). + +DROP TABLE IF EXISTS test_loopjoin_prefix; +DROP TABLE IF EXISTS test_loopjoin_left; +DROP TABLE IF EXISTS test_loopjoin_right; +CREATE TABLE test_loopjoin_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_loopjoin_left (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_loopjoin_right (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t1 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_loopjoin_prefix DEFAULT VALUES', + df.loop( + ( + 'INSERT INTO test_loopjoin_left DEFAULT VALUES' + & 'INSERT INTO test_loopjoin_right DEFAULT VALUES' + ) + ~> ( + 'SELECT COUNT(*) >= 3 FROM test_loopjoin_left' + ?> df.break() + !> df.sleep(1) + ) + ) + ), + 'test-loop-contains-join' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_left INT; + v_right INT; +BEGIN + SELECT instance_id INTO v_id FROM _t1; + RAISE NOTICE 'Test 1 - JOIN inside loop body: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [loop-join]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_loopjoin_prefix; + SELECT COUNT(*) INTO v_left FROM test_loopjoin_left; + SELECT COUNT(*) INTO v_right FROM test_loopjoin_right; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [loop-join]: prefix ran % time(s) (expected 1)', v_prefix; + END IF; + + IF v_left != 3 THEN + RAISE EXCEPTION 'TEST FAILED [loop-join]: left branch ran % time(s) (expected 3)', v_left; + END IF; + + IF v_right != 3 THEN + RAISE EXCEPTION 'TEST FAILED [loop-join]: right branch ran % time(s) (expected 3)', v_right; + END IF; + + RAISE NOTICE 'PASSED: JOIN inside loop body — both branches ran once per iteration (3 iterations)'; +END $$; + +DROP TABLE _t1; +DROP TABLE test_loopjoin_prefix; +DROP TABLE test_loopjoin_left; +DROP TABLE test_loopjoin_right; + +-- === Test 2: RACE inside a NON-ROOT loop body, >= 2 iterations === +-- +-- Graph: INSERT prefix ~> df.loop( body ~> race(fast-or-break, slow) ) +-- The race's first branch returns fast (no break) on iteration 1 and breaks on iteration 2, +-- so the loop runs exactly 2 iterations. The slow branch (pg_sleep) never wins. + +DROP TABLE IF EXISTS test_looprace_prefix; +DROP TABLE IF EXISTS test_looprace_log; +CREATE TABLE test_looprace_prefix (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_looprace_log (id SERIAL, iteration INT, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t2 AS +SELECT df.start( + df.seq( + 'INSERT INTO test_looprace_prefix DEFAULT VALUES', + df.loop( + 'INSERT INTO test_looprace_log (iteration) VALUES ((SELECT COALESCE(MAX(iteration), 0) + 1 FROM test_looprace_log))' + ~> df.race( + df.if( + 'SELECT COUNT(*) >= 2 FROM test_looprace_log', + df.break('race-done'), + 'SELECT 1' + ), + 'SELECT pg_sleep(30)' + ) + ) + ), + 'test-loop-contains-race' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_prefix INT; + v_iters INT; +BEGIN + SELECT instance_id INTO v_id FROM _t2; + RAISE NOTICE 'Test 2 - RACE inside loop body: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [loop-race]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_prefix FROM test_looprace_prefix; + SELECT COUNT(*) INTO v_iters FROM test_looprace_log; + + IF v_prefix != 1 THEN + RAISE EXCEPTION 'TEST FAILED [loop-race]: prefix ran % time(s) (expected 1)', v_prefix; + END IF; + + IF v_iters != 2 THEN + RAISE EXCEPTION 'TEST FAILED [loop-race]: loop ran % iteration(s) (expected 2)', v_iters; + END IF; + + RAISE NOTICE 'PASSED: RACE inside loop body — loop ran 2 iterations and exited on break'; +END $$; + +DROP TABLE _t2; +DROP TABLE test_looprace_prefix; +DROP TABLE test_looprace_log; + +SELECT 'TEST PASSED' AS result; diff --git a/tests/e2e/sql/26_loop_in_join_race_branch.sql b/tests/e2e/sql/26_loop_in_join_race_branch.sql new file mode 100644 index 00000000..73eb48e9 --- /dev/null +++ b/tests/e2e/sql/26_loop_in_join_race_branch.sql @@ -0,0 +1,239 @@ +-- Copyright (c) Microsoft Corporation. +-- Licensed under the PostgreSQL License. + +-- Regression tests for: df.loop nested INSIDE a JOIN or RACE branch +-- (https://github.com/microsoft/pg_durable/issues/233) +-- +-- A loop nested inside a parallel branch is a non-root loop, so it runs as its own child +-- sub-orchestration spawned by the branch's subtree orchestration. Its instance id embeds +-- the branch lineage, so it gets a distinct durable instance and continue_as_new restarts +-- only the loop body (not the branch). Before the fix this raised +-- "Missing graph in ExecuteSubtree input" / failed to iterate. + +SET SESSION AUTHORIZATION df_e2e_user; + +-- === Test 1: loop inside a JOIN branch, loop iterates >= 2 === +-- +-- Graph: df.loop(body, break after 2) & 'SELECT sibling' +-- Expected: completes, loop body ran exactly 2 times, sibling branch ran once. + +DROP TABLE IF EXISTS test_joinloop_body; +DROP TABLE IF EXISTS test_joinloop_sibling; +CREATE TABLE test_joinloop_body (id SERIAL, iteration INT, ts TIMESTAMPTZ DEFAULT clock_timestamp()); +CREATE TABLE test_joinloop_sibling (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t1 AS +SELECT df.start( + ( + df.loop( + 'INSERT INTO test_joinloop_body (iteration) VALUES ((SELECT COALESCE(MAX(iteration), 0) + 1 FROM test_joinloop_body))' + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_joinloop_body' + ?> df.break() + !> df.sleep(1) + ) + ) + & 'INSERT INTO test_joinloop_sibling DEFAULT VALUES' + ), + 'test-loop-in-join-branch' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_body INT; + v_sibling INT; +BEGIN + SELECT instance_id INTO v_id FROM _t1; + RAISE NOTICE 'Test 1 - loop inside JOIN branch: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [join-loop]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_body FROM test_joinloop_body; + SELECT COUNT(*) INTO v_sibling FROM test_joinloop_sibling; + + IF v_body != 2 THEN + RAISE EXCEPTION 'TEST FAILED [join-loop]: loop body ran % time(s) (expected 2)', v_body; + END IF; + + IF v_sibling != 1 THEN + RAISE EXCEPTION 'TEST FAILED [join-loop]: sibling branch ran % time(s) (expected 1)', v_sibling; + END IF; + + RAISE NOTICE 'PASSED: loop inside JOIN branch — loop iterated twice, sibling ran once'; +END $$; + +DROP TABLE _t1; +DROP TABLE test_joinloop_body; +DROP TABLE test_joinloop_sibling; + +-- === Test 2: loop inside a RACE branch, loop iterates >= 2 and wins === +-- +-- Graph: df.race( df.loop(body, break after 2), 'SELECT pg_sleep(30)' ) +-- The loop completes after 2 iterations (a few seconds) and wins the race against the slow +-- branch. Expected: completes, loop body ran exactly 2 times. + +DROP TABLE IF EXISTS test_raceloop_body; +CREATE TABLE test_raceloop_body (id SERIAL, iteration INT, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t2 AS +SELECT df.start( + df.race( + df.loop( + 'INSERT INTO test_raceloop_body (iteration) VALUES ((SELECT COALESCE(MAX(iteration), 0) + 1 FROM test_raceloop_body))' + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_raceloop_body' + ?> df.break() + !> df.sleep(1) + ) + ), + 'SELECT pg_sleep(30)' + ), + 'test-loop-in-race-branch' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_body INT; +BEGIN + SELECT instance_id INTO v_id FROM _t2; + RAISE NOTICE 'Test 2 - loop inside RACE branch: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [race-loop]: expected completed, got %', v_status; + END IF; + + SELECT COUNT(*) INTO v_body FROM test_raceloop_body; + + IF v_body != 2 THEN + RAISE EXCEPTION 'TEST FAILED [race-loop]: loop body ran % time(s) (expected 2)', v_body; + END IF; + + RAISE NOTICE 'PASSED: loop inside RACE branch — loop iterated twice and won the race'; +END $$; + +DROP TABLE _t2; +DROP TABLE test_raceloop_body; + +-- === Test 3: topology — race(loop, sleep) spawns 1 parent + 2 subs (loop not double-wrapped) === +-- +-- Same shape as Test 2 (a RACE at the root; left = df.loop(sleep), right = a sleep) but this +-- time we assert the *orchestration topology* rather than the behaviour: +-- * parent : the function-graph orchestration (RACE is its root node) +-- * loop sub : spawned DIRECTLY as an execute-loop child, because the loop is +-- already the root of its own branch orchestration +-- * right-branch sub : an execute-subtree wrapping the sleep +-- +-- The loop branch must NOT be double-wrapped (execute-subtree -> execute-loop): doing so +-- would create a third sub-orchestration for the single loop branch. We verify the loop's +-- own node is stamped by an orchestration instance that is a *direct* child of the parent, +-- i.e. `{parent}::1::{loop_node_id}` — not `{parent}::1::{loop_node_id}::1::{loop_node_id}`. +-- +-- The stamp recorded in df.nodes.status_details->>'execution_id' has the shape +-- `{orchestration_instance_id}::{execution_id}`, and an orchestration instance id is itself +-- `{parent}::{parent_execution_id}::{branch_root_node_id}` for a spawned child. + +DROP TABLE IF EXISTS test_topo_log; +CREATE TABLE test_topo_log (id SERIAL, iteration INT, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _t3 AS +SELECT df.start( + df.race( + df.loop( + 'INSERT INTO test_topo_log (iteration) VALUES ((SELECT COALESCE(MAX(iteration), 0) + 1 FROM test_topo_log))' + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_topo_log' + ?> df.break() + !> df.sleep(1) + ) + ), + 'SELECT pg_sleep(30)' + ), + 'test-loop-in-race-topology' +) AS instance_id; + +DO $$ +DECLARE + v_id TEXT; + v_status TEXT; + v_loop_node TEXT; + v_loop_scope TEXT; + v_body_scope TEXT; + v_expected TEXT; + v_distinct_orch INT; +BEGIN + SELECT instance_id INTO v_id FROM _t3; + RAISE NOTICE 'Test 3 - race(loop, sleep) topology: instance %', v_id; + + SELECT df.wait_for_completion(v_id, 90) INTO v_status; + IF v_status != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [topo]: expected completed, got %', v_status; + END IF; + + -- The loop node id is the root of the loop sub-orchestration. + SELECT id INTO v_loop_node + FROM df.nodes + WHERE instance_id = v_id AND node_type = 'LOOP'; + + -- Strip the trailing :: token from the loop node's stamp to recover the + -- loop sub-orchestration's instance id. + SELECT substring(status_details->>'execution_id' FROM '^(.*)::[0-9]+$') + INTO v_loop_scope + FROM df.nodes + WHERE instance_id = v_id AND id = v_loop_node; + + -- A directly-spawned loop child runs in `{parent}::1::{loop_node_id}`. A double-wrapped + -- loop (subtree -> loop) would instead read `{parent}::1::{loop_node_id}::1::{loop_node_id}`. + v_expected := v_id || '::1::' || v_loop_node; + IF v_loop_scope IS DISTINCT FROM v_expected THEN + RAISE EXCEPTION 'TEST FAILED [topo]: loop sub-orchestration id = % (expected %); loop was double-wrapped', + v_loop_scope, v_expected; + END IF; + + -- The loop body (the INSERT SQL node) must run inside the SAME loop sub-orchestration. + SELECT substring(status_details->>'execution_id' FROM '^(.*)::[0-9]+$') + INTO v_body_scope + FROM df.nodes + WHERE instance_id = v_id + AND node_type = 'SQL' + AND query LIKE 'INSERT INTO test_topo_log%'; + IF v_body_scope IS DISTINCT FROM v_loop_scope THEN + RAISE EXCEPTION 'TEST FAILED [topo]: loop body scope = % (expected loop scope %)', + v_body_scope, v_loop_scope; + END IF; + + -- Count distinct orchestration instances that stamped any node of this instance. + -- Correct topology = 3 (parent RACE + loop sub + right-branch subtree). A double-wrapped + -- loop would add a fourth (the redundant subtree wrapper around the loop), so <= 3 is the + -- regression guard. (The abandoned right branch reliably stamps its node 'running' before + -- the race is decided, so 3 is what we observe.) + SELECT count(DISTINCT substring(status_details->>'execution_id' FROM '^(.*)::[0-9]+$')) + INTO v_distinct_orch + FROM df.nodes + WHERE instance_id = v_id + AND status_details->>'execution_id' IS NOT NULL; + + IF v_distinct_orch > 3 THEN + RAISE EXCEPTION 'TEST FAILED [topo]: % distinct orchestration scopes (> 3 means the loop branch was double-wrapped)', v_distinct_orch; + END IF; + + IF v_distinct_orch < 2 THEN + RAISE EXCEPTION 'TEST FAILED [topo]: % distinct orchestration scopes (expected parent + loop sub at minimum)', v_distinct_orch; + END IF; + + RAISE NOTICE 'PASSED: race(loop, sleep) = 1 parent + 2 subs; loop spawned directly (% distinct scopes)', v_distinct_orch; +END $$; + +DROP TABLE _t3; +DROP TABLE test_topo_log; + +SELECT 'TEST PASSED' AS result; diff --git a/tests/e2e/sql/53_inferred_status.sql b/tests/e2e/sql/53_inferred_status.sql index f7b278fc..70ef6855 100644 --- a/tests/e2e/sql/53_inferred_status.sql +++ b/tests/e2e/sql/53_inferred_status.sql @@ -157,4 +157,85 @@ END $$; DROP TABLE _test_state; +-- === Test 4: non-root loop — untaken IF arm in the body is skipped, nothing stuck running === +-- +-- A non-root loop runs as a child sub-orchestration that re-stamps its body nodes every +-- generation under a scope whose trailing `::`-token is the loop's inner generation. The +-- derived status must be SCOPE-AWARE: after the loop completes, the untaken ELSE arm inside +-- the body reads 'skipped', the loop node reads 'completed', and no node is left inferred as +-- 'running' (older-generation body stamps are superseded, not surfaced as in-flight). + +DROP TABLE IF EXISTS test_inferred_loop_body; +CREATE TABLE test_inferred_loop_body (id SERIAL, ts TIMESTAMPTZ DEFAULT clock_timestamp()); + +CREATE TEMP TABLE _test_state (instance_id TEXT); + +INSERT INTO _test_state SELECT df.start( + df.seq( + 'SELECT 1', + df.loop( + df.if( + 'SELECT true', + 'INSERT INTO test_inferred_loop_body DEFAULT VALUES', + 'SELECT 999' + ) + ~> ( + 'SELECT COUNT(*) >= 2 FROM test_inferred_loop_body' + ?> df.break() + !> df.sleep(1) + ) + ) + ), + 'test-inferred-nonroot-loop' +); + +DO $$ +DECLARE + inst_id TEXT; + status TEXT; + skipped_count INT; + running_count INT; + loop_inferred TEXT; +BEGIN + SELECT instance_id INTO inst_id FROM _test_state; + SELECT df.wait_for_completion(inst_id, 90) INTO status; + + IF lower(status) != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-loop-inferred]: status = %', status; + END IF; + + -- The untaken ELSE arm ('SELECT 999') never runs in any generation → skipped. + SELECT count(*) INTO skipped_count + FROM df.instance_nodes(inst_id) + WHERE inferred_status = 'skipped'; + + IF skipped_count < 1 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-loop-inferred]: expected >= 1 skipped node, got %', skipped_count; + END IF; + + -- After completion nothing may be reported as still running (superseded older-generation + -- body stamps must not surface as in-flight). + SELECT count(*) INTO running_count + FROM df.instance_nodes(inst_id) + WHERE inferred_status = 'running'; + + IF running_count != 0 THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-loop-inferred]: % node(s) still inferred running after completion', running_count; + END IF; + + -- The loop node itself is stamped completed by its child sub-orchestration on exit. + SELECT n.inferred_status INTO loop_inferred + FROM df.instance_nodes(inst_id) n + WHERE lower(n.node_type) = 'loop'; + + IF loop_inferred != 'completed' THEN + RAISE EXCEPTION 'TEST FAILED [nonroot-loop-inferred]: loop node inferred_status = % (expected completed)', loop_inferred; + END IF; + + RAISE NOTICE 'TEST PASSED: inferred_status non-root-loop-scope-aware'; +END $$; + +DROP TABLE _test_state; +DROP TABLE test_inferred_loop_body; + SELECT 'TEST PASSED' AS result; From a497e145df103f3df0c12d5eaca631f07bd5a8ca Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Thu, 2 Jul 2026 18:49:37 +0000 Subject: [PATCH 2/3] test(e2e): renumber loop tests to 55-57 to avoid collision with main --- tests/e2e/sql/{24_nonroot_loop.sql => 55_nonroot_loop.sql} | 0 ...loop_contains_join_race.sql => 56_loop_contains_join_race.sql} | 0 ...op_in_join_race_branch.sql => 57_loop_in_join_race_branch.sql} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename tests/e2e/sql/{24_nonroot_loop.sql => 55_nonroot_loop.sql} (100%) rename tests/e2e/sql/{25_loop_contains_join_race.sql => 56_loop_contains_join_race.sql} (100%) rename tests/e2e/sql/{26_loop_in_join_race_branch.sql => 57_loop_in_join_race_branch.sql} (100%) diff --git a/tests/e2e/sql/24_nonroot_loop.sql b/tests/e2e/sql/55_nonroot_loop.sql similarity index 100% rename from tests/e2e/sql/24_nonroot_loop.sql rename to tests/e2e/sql/55_nonroot_loop.sql diff --git a/tests/e2e/sql/25_loop_contains_join_race.sql b/tests/e2e/sql/56_loop_contains_join_race.sql similarity index 100% rename from tests/e2e/sql/25_loop_contains_join_race.sql rename to tests/e2e/sql/56_loop_contains_join_race.sql diff --git a/tests/e2e/sql/26_loop_in_join_race_branch.sql b/tests/e2e/sql/57_loop_in_join_race_branch.sql similarity index 100% rename from tests/e2e/sql/26_loop_in_join_race_branch.sql rename to tests/e2e/sql/57_loop_in_join_race_branch.sql From 385dca82bffd1f7ea65d0c735251a8bacd0c44b6 Mon Sep 17 00:00:00 2001 From: Pino de Candia <32303022+pinodeca@users.noreply.github.com> Date: Thu, 2 Jul 2026 18:49:38 +0000 Subject: [PATCH 3/3] fix(worker): make node-status write fence scope-aware for nested loops The write fence compared only the root generation (2nd ::-token of the stamp), so a stale late write from an older child-loop generation could overwrite a newer generation's status/result/status_details. Replace the single-scope compare with stamp_lineage + a level-by-level lineage compare in incoming_stamp_is_superseded, mirroring node_status read-time inference but over two stamps directly (the activity sees only one existing row). Handles nested loops and never fences independent sibling branch scopes. --- src/activities/update_node_status.rs | 316 +++++++++++++++++++++------ 1 file changed, 253 insertions(+), 63 deletions(-) diff --git a/src/activities/update_node_status.rs b/src/activities/update_node_status.rs index ab2978d7..da31d376 100644 --- a/src/activities/update_node_status.rs +++ b/src/activities/update_node_status.rs @@ -36,6 +36,120 @@ async fn status_details_present(pool: &PgPool) -> bool { present } +fn execution_id_from_details(status_details: Option<&serde_json::Value>) -> Option<&str> { + status_details?.get("execution_id")?.as_str() +} + +/// Decompose a node stamp into its generation lineage and the branch node ids +/// spawned at each level. +/// +/// A stamp is `{root_instance}::{g0}::{node1}::{g1}::...::{nodeN}::{gN}`: the +/// root df instance id, then the root orchestration generation `g0`, then for +/// every spawned sub-orchestration the child root node id and that child's own +/// `continue_as_new` generation. `subtree_instance_id` composes exactly this +/// shape, so the returned `gens` reads root→innermost (`g0..gN`) and `nodes` +/// holds the branch node id taken to descend from each level to the next. +/// +/// Returns `None` for a malformed stamp (no generation, or an odd trailing +/// token count that is not `gen (node gen)*`), so a legacy/unparseable stamp +/// degrades to an unfenced write rather than silently dropping it. +fn stamp_lineage(execution_id: &str) -> Option<(Vec, Vec<&str>)> { + let tokens: Vec<&str> = execution_id.split("::").collect(); + // token[0] is the root df instance id; the remainder must be + // `gen (node gen)*`, i.e. an even total token count (>= 2). + if tokens.len() < 2 || !tokens.len().is_multiple_of(2) { + return None; + } + let mut gens = Vec::new(); + let mut nodes = Vec::new(); + let mut i = 1; + while i < tokens.len() { + gens.push(tokens[i].parse::().ok()?); + i += 1; + if i < tokens.len() { + nodes.push(tokens[i]); + i += 1; + } + } + Some((gens, nodes)) +} + +/// Whether the `incoming` write belongs to a superseded generation relative to +/// the `existing` stamp already on the row. +/// +/// Both stamps transition the SAME node, so they share a node lineage but may +/// carry different generations at each nesting level. Walk both lineages from +/// the root: the first level whose incoming generation is OLDER means the write +/// comes from a superseded ancestor (or its own older) generation and must be +/// fenced. If the generations tie at a level, the branch node id spawned next +/// decides — a different id is an independent sibling scope (one loop iteration's +/// parallel branch must never fence another's), an equal id descends, and a +/// lineage that ends is the same-or-deeper scope in the same generation and is +/// accepted. A newer incoming generation at any level is accepted. This is the +/// write-side mirror of the read-time inference in `node_status::is_superseded`, +/// but compares two stamps directly because the activity sees only one row. +fn incoming_stamp_is_superseded(incoming: &str, existing: Option<&str>) -> bool { + let Some(existing) = existing else { + return false; + }; + let (Some((inc_gens, inc_nodes)), Some((ex_gens, ex_nodes))) = + (stamp_lineage(incoming), stamp_lineage(existing)) + else { + return false; + }; + + let depth = inc_gens.len().min(ex_gens.len()); + for level in 0..depth { + if inc_gens[level] < ex_gens[level] { + return true; + } + if inc_gens[level] > ex_gens[level] { + return false; + } + // Generations tie at this level: an independent sibling scope (a + // different branch node id spawned next) is never fenced. + if let (Some(a), Some(b)) = (inc_nodes.get(level), ex_nodes.get(level)) { + if a != b { + return false; + } + } + } + false +} +fn push_status_update<'a>( + update: &mut QueryBuilder<'a, Postgres>, + status: &'a str, + result: Option<&'a str>, + execution_id: Option<&'a str>, + write_details: bool, +) { + update + .push("UPDATE df.nodes SET status = ") + .push_bind(status); + + if let Some(res) = result { + let json_result = serde_json::from_str::(res) + .unwrap_or_else(|_| serde_json::Value::String(res.to_string())); + update + .push(", result = ") + .push_bind(json_result) + .push("::jsonb"); + } else if status == "running" { + // When marking as running, clear any stale result from a previous loop + // iteration to satisfy nodes_result_status_chk + // (result IS NULL OR status IN ('completed', 'failed')). + update.push(", result = NULL"); + } + + if write_details { + let details = serde_json::json!({ "execution_id": execution_id }); + update + .push(", status_details = ") + .push_bind(details) + .push("::jsonb"); + } +} + /// Update the status and optionally the result of a node in df.nodes. pub async fn execute( ctx: ActivityContext, @@ -76,86 +190,81 @@ pub async fn execute( // pre-0.2.4 schema lacking it -- degrade to the plain status/result write). let write_details = execution_id.is_some() && status_details_present(pool.as_ref()).await; - // Fence value: the incoming root generation (second "::"-token of the stamp). - // When the stamp can't be parsed we pass i64::MAX so the fence always - // accepts (i.e. behaves as no fence) rather than silently dropping writes. - let incoming_gen: i64 = execution_id - .and_then(|s| s.split("::").nth(1)) - .and_then(|tok| tok.parse::().ok()) - .unwrap_or(i64::MAX); + let mut update = QueryBuilder::::new(""); + push_status_update(&mut update, status, result, execution_id, write_details); - let mut update = QueryBuilder::::new("UPDATE df.nodes SET status = "); - update.push_bind(status); + // Monotonic write fence: when status_details exists, first lock the row and compare the + // incoming stamp against the existing stamp using the same scope-lineage semantics as + // read-time status inference. A stale write is one whose own scope generation is older, + // or whose parent scope was spawned by an older ancestor generation. Equal-or-newer + // generations (including running -> terminal within the same generation) are accepted. + if write_details { + let mut tx = pool + .begin() + .await + .map_err(|e| format!("Failed to begin node status update transaction: {e}"))?; - if let Some(res) = result { - let json_result = serde_json::from_str::(res) - .unwrap_or_else(|_| serde_json::Value::String(res.to_string())); - update - .push(", result = ") - .push_bind(json_result) - .push("::jsonb"); - } else if status == "running" { - // When marking as running, clear any stale result from a previous loop - // iteration to satisfy nodes_result_status_chk - // (result IS NULL OR status IN ('completed', 'failed')). - update.push(", result = NULL"); - } + let existing_details = sqlx::query_scalar::<_, Option>( + "SELECT status_details FROM df.nodes WHERE id = $1 AND instance_id = $2 FOR UPDATE", + ) + .bind(node_id) + .bind(instance_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| format!("Failed to lock node status row: {e}"))?; + + let Some(existing_details) = existing_details else { + let err_msg = format!( + "update_node_status affected 0 rows: node {node_id} \ + not found in instance {instance_id}" + ); + ctx.trace_info(&err_msg); + return Err(err_msg); + }; + + let existing_execution_id = execution_id_from_details(existing_details.as_ref()); + if incoming_stamp_is_superseded(execution_id.unwrap_or_default(), existing_execution_id) { + return Ok("Node status write fenced (superseded by newer generation)".to_string()); + } - if write_details { - let details = serde_json::json!({ "execution_id": execution_id }); update - .push(", status_details = ") - .push_bind(details) - .push("::jsonb"); + .push(", updated_at = now() WHERE id = ") + .push_bind(node_id) + .push(" AND instance_id = ") + .push_bind(instance_id); + + let done = update + .build() + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to update node status: {e}"))?; + let rows = done.rows_affected(); + if rows != 1 { + let err_msg = format!( + "update_node_status affected {rows} row(s) for node {node_id} \ + in instance {instance_id} (expected exactly 1)" + ); + ctx.trace_info(&err_msg); + return Err(err_msg); + } + + tx.commit() + .await + .map_err(|e| format!("Failed to commit node status update transaction: {e}"))?; + return Ok("Node status updated".to_string()); } - // Monotonic write fence: reject a write carrying an OLDER root generation than - // the one already stamped on the row, so a stale loser/iteration drain can't - // clobber a newer generation's status. Equal-or-newer generations (including - // running -> terminal within the same generation) are accepted. update .push(", updated_at = now() WHERE id = ") .push_bind(node_id) .push(" AND instance_id = ") .push_bind(instance_id); - if write_details { - update - .push( - " AND (status_details IS NULL OR \ - COALESCE(NULLIF(split_part(status_details->>'execution_id', '::', 2), '')::bigint, 0) <= ", - ) - .push_bind(incoming_gen) - .push(")"); - } match update.build().execute(pool.as_ref()).await { Ok(done) => { let rows = done.rows_affected(); if rows == 1 { Ok("Node status updated".to_string()) - } else if write_details { - // Zero rows under an active fence is ambiguous: the row may exist - // but carry a NEWER generation (a legitimate fenced-out stale - // write), or the node may genuinely be missing. Distinguish the - // two so a fence rejection is not surfaced as a correctness error. - let exists = sqlx::query_scalar::<_, bool>( - "SELECT EXISTS (SELECT 1 FROM df.nodes WHERE id = $1 AND instance_id = $2)", - ) - .bind(node_id) - .bind(instance_id) - .fetch_one(pool.as_ref()) - .await - .unwrap_or(false); - if exists { - Ok("Node status write fenced (superseded by newer generation)".to_string()) - } else { - let err_msg = format!( - "update_node_status affected 0 rows: node {node_id} \ - not found in instance {instance_id}" - ); - ctx.trace_info(&err_msg); - Err(err_msg) - } } else { // No fence in play: exactly one row must match (instance_id, id). // Anything else (typically zero rows: a missing node or a @@ -175,3 +284,84 @@ pub async fn execute( } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn accepts_terminal_write_in_same_generation() { + assert!(!incoming_stamp_is_superseded( + "root::1::loop::2", + Some("root::1::loop::2") + )); + } + + #[test] + fn rejects_older_child_loop_generation() { + assert!(incoming_stamp_is_superseded( + "root::1::loop::1", + Some("root::1::loop::2") + )); + } + + #[test] + fn rejects_child_spawned_by_older_root_generation() { + assert!(incoming_stamp_is_superseded( + "root::1::loop::5", + Some("root::2") + )); + } + + #[test] + fn accepts_sibling_scope_in_same_parent_generation() { + assert!(!incoming_stamp_is_superseded( + "root::1::left::1", + Some("root::1::right::2") + )); + } + + #[test] + fn accepts_unparseable_stamp_as_unfenced_for_backward_compatibility() { + assert!(!incoming_stamp_is_superseded( + "legacy", + Some("root::1::loop::2") + )); + } + + #[test] + fn rejects_inner_loop_write_from_older_outer_generation() { + // Nested loops: an inner (non-root) loop whose outer loop advanced from + // generation 1 -> 2 is re-spawned under scope root::2::inner. A stale late + // write from the previous outer generation (root::1::inner::5) must be fenced + // out even though its own child generation (5) is numerically higher than the + // current row's child generation (1) -- the deciding factor is that its parent + // scope was spawned by the older outer generation. + assert!(incoming_stamp_is_superseded( + "root::1::inner::5", + Some("root::2::inner::1") + )); + } + + #[test] + fn accepts_newer_inner_loop_write_from_same_outer_generation() { + // Same nested shape, but the outer generation matches and the inner + // generation advanced: this is the current in-flight write and must be + // accepted, not fenced. + assert!(!incoming_stamp_is_superseded( + "root::2::inner::5", + Some("root::2::inner::1") + )); + } + + #[test] + fn accepts_sibling_branch_nested_under_same_generations() { + // Two parallel branches (left/right) spawned inside the same inner-loop + // iteration are independent scopes: neither may fence the other even + // though their trailing generations differ. + assert!(!incoming_stamp_is_superseded( + "root::1::inner::2::left::1", + Some("root::1::inner::2::right::9") + )); + } +}