diff --git a/CHANGELOG.md b/CHANGELOG.md index fd788a3ad..73c698993 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,26 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). `DurableBackendEnum` enum dispatcher keep the backend surface closed with no `Box` on the hot path. Promise, timer, and checkpoint journal entries fail closed (`DurableError::UnsupportedEntryKind`) until the promise/timer layer lands. (#4946) +- `feat(durable)`: added the durable step primitive and the `&self` `DurableContext` — the execution + heart every adapter wraps. `ctx.step()` runs an operation closure and journals its result; on + resume it replays the journaled result without re-invoking the closure (INV-10, FR-DE-02), and + `ctx.step_recorded()` / `StepOutcome::{Live, Replayed}` expose the live-vs-replayed distinction for + double-emit suppression. Step ids are assigned structurally via an `AtomicU32` (INV-2): + `parallel()` returns a `ParallelScope` whose children get contiguous, eagerly-assigned ids, so a + parallel batch's ids are independent of completion order. Before replaying a result the context + compares the journaled step's `IdempotencyKey` (a BLAKE3 structural fingerprint folding the step + name, effect, and op fingerprint) against the current descriptor's; a mismatch raises + `DurableError::ReplayDivergence`, marks the journal `aborted`, and disables replay so the execution + restarts fresh (INV-3, FR-DE-03). `StepDescriptor::exactly_once_guarded` enforces the + construction-time ambiguity rule — a destructive, security-relevant, money-moving, or custom + guarded step without an explicit `OnAmbiguous` is rejected with + `DurableError::AmbiguityPolicyRequired` (FR-DE-09). A guarded step commits its `EffectIntent` + (ACKed) before the closure runs and its `StepResult` (ACKed) after (FR-DE-04); a guarded effect + that already committed a result is recognized by an idempotency-key point lookup and not re-fired, + even on a post-divergence fresh run (INV-13). Every ambiguous-window resolution emits a mandatory + structured audit record (FR-DE-10), and a writer timeout degrades the step to non-durable mode + rather than blocking (INV-12). The `ReplayCursor` reads the journal in bounded step-range segments + (NFR-DE-02). The promise, timer, and retention layers land in follow-up issues. (#4947) ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 7e91122e5..c7510f7e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10863,6 +10863,7 @@ version = "0.21.4" dependencies = [ "blake3", "bytes", + "futures", "metrics", "serde", "serde_json", diff --git a/crates/zeph-durable/Cargo.toml b/crates/zeph-durable/Cargo.toml index 17dd73854..4745eb4aa 100644 --- a/crates/zeph-durable/Cargo.toml +++ b/crates/zeph-durable/Cargo.toml @@ -24,6 +24,7 @@ blake3.workspace = true bytes.workspace = true metrics.workspace = true serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["macros", "sync", "time"] } tracing.workspace = true @@ -31,6 +32,7 @@ uuid = { workspace = true, features = ["serde", "v7"] } zeph-db.workspace = true [dev-dependencies] +futures.workspace = true serde_json.workspace = true tempfile.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] } diff --git a/crates/zeph-durable/README.md b/crates/zeph-durable/README.md index f759f9392..b4d63888d 100644 --- a/crates/zeph-durable/README.md +++ b/crates/zeph-durable/README.md @@ -12,10 +12,11 @@ point of failure instead of restarting from scratch. > [!IMPORTANT] > This crate is **under active construction** (spec-064, epic > [#4707](https://github.com/bug-ops/zeph/issues/4707)). The type-level foundation, the AEAD payload -> contract, and the persistence engine — `LocalBackend` (a dedicated `durable.db` pool), the -> background `JournalWriter` actor, and the sealed `ExecutionBackend` dispatcher — have landed. The -> `DurableContext` facade, the replay cursor, the promise/timer layer, and the consuming adapters -> land in follow-up issues of the epic. +> contract, the persistence engine (`LocalBackend`, the background `JournalWriter` actor, the sealed +> `ExecutionBackend` dispatcher), and the execution heart — the `&self` `DurableContext` with +> deterministic step ids, the fingerprint-guarded replay cursor, the exactly-once intent/result +> protocol, and `parallel()` batches — have landed. The promise/timer layer, journal retention, the +> CLI/TUI integration, and the consuming adapters land in follow-up issues of the epic. ## Overview @@ -37,7 +38,15 @@ a dedicated `durable.db` (SQLite) or a feature-gated Restate backend. - **journal** — the `Journal` trait plus its data model: `JournalEntry`, the closed `EntryKind` enum, and `ExecutionStatus`. - **effect** — `EffectClass`, the per-step side-effect contract (`Idempotent` / `AtLeastOnce` / - `ExactlyOnceGuarded`). + `ExactlyOnceGuarded`), plus `EffectIntentSubClass` and the `OnAmbiguous` policy that govern the + ambiguous window. +- **step** — the durable step typestate: `StepDescriptor` (with the construction-time ambiguity + rule), `StepHandle` (exposes the idempotency key for boundary dedup), `StepError`, the + `Live`/`Replayed` `StepOutcome`, and the `DurableStep` record. +- **handle** — the `&self` `DurableContext` front door: `step()` / `step_recorded()` / + `parallel()`, deterministic `AtomicU32` step ids, a BLAKE3 replay-divergence guard, the + exactly-once intent/result protocol, and the `ParallelScope` for completion-order-independent + batches. - **cipher** — the `PayloadCipher` AEAD seal/open contract, the `PayloadAad` location binding, and the read-side `ensure_payload_within_limit` guard. The concrete cipher lives in a consuming crate (INV-1). @@ -124,6 +133,34 @@ assert_eq!(cfg.journal_ack_timeout_ms, 5_000); assert_eq!(cfg.max_payload_bytes, 1_048_576); ``` +A `DurableContext` wraps each unit of work in a step. A fresh run executes the closure and journals +its result; a resumed run replays the journaled result without re-running it. The closure receives a +`StepHandle` carrying the step's idempotency key for boundary deduplication: + +```rust,ignore +use zeph_durable::{DurableContext, EffectIntentSubClass, OnAmbiguous, StepDescriptor}; + +// Read-only work is idempotent and replays for free. +let preview: String = ctx + .step(StepDescriptor::idempotent("read_head", b"tool:read:/var/log".to_vec()), + |_handle| async { Ok(read_first_line().await?) }) + .await?; + +// A paid call is exactly-once-guarded: its intent is journaled before the call and its result +// after, and the idempotency key is forwarded to the provider for boundary dedup. +let reply: String = ctx + .step( + StepDescriptor::exactly_once_guarded( + "llm_call", + EffectIntentSubClass::CostBearingOrBoundaryIdempotent, + Some(OnAmbiguous::Skip), + b"llm:gpt:summarize".to_vec(), + )?, + |handle| async move { Ok(call_provider(handle.idempotency_key()).await?) }, + ) + .await?; +``` + ## MSRV Rust **1.95** (Edition 2024, resolver 3). diff --git a/crates/zeph-durable/src/backend.rs b/crates/zeph-durable/src/backend.rs index 97c774891..67902bce1 100644 --- a/crates/zeph-durable/src/backend.rs +++ b/crates/zeph-durable/src/backend.rs @@ -24,9 +24,11 @@ //! promise-resolution, and timer-scan methods named in the spec land alongside the //! `DurableContext` (the trait can gain them without breaking callers). +use std::sync::Arc; + use crate::config::RetentionPolicy; use crate::error::DurableError; -use crate::ids::{ExecutionId, JournalSeq}; +use crate::ids::{ExecutionId, IdempotencyKey, JournalSeq}; use crate::journal::{ExecutionStatus, Journal, JournalEntry}; pub mod local; @@ -82,6 +84,25 @@ pub struct BackendCapabilities { pub trait ExecutionBackend: Journal + Send + Sync + crate::sealed::Sealed { /// Return this backend's stable capability description. fn capabilities(&self) -> BackendCapabilities; + + /// Look up a committed `StepResult` anywhere in an execution by its [`IdempotencyKey`]. + /// + /// This is the point-lookup behind INV-13: after a [`DurableError::ReplayDivergence`] the + /// execution restarts fresh, but a guarded effect that already committed its result must not + /// re-fire. Before invoking a guarded operation the durable step consults this lookup; a `Some` + /// result means the effect already succeeded and its journaled value is returned instead. The + /// key uniquely locates the row via the `idx_durable_journal_idem_key` index, so the lookup is + /// `O(log n)`. + /// + /// # Errors + /// + /// Returns [`DurableError::Decode`] if the located row cannot be reconstructed, or + /// [`DurableError::Storage`] if the query fails. + fn lookup_committed_result( + &self, + id: ExecutionId, + idem_key: IdempotencyKey, + ) -> impl std::future::Future, DurableError>> + Send; } /// Closed enum dispatch over the compiled-in backends. @@ -108,7 +129,12 @@ pub trait ExecutionBackend: Journal + Send + Sync + crate::sealed::Sealed { #[non_exhaustive] pub enum DurableBackendEnum { /// The always-compiled local backend journaling to a dedicated `durable.db`. - Local(LocalBackend), + /// + /// Held behind an [`Arc`] so the same backing instance can be shared with the + /// [`JournalWriter`](crate::JournalWriter) (which owns the write path) while this enum serves + /// the read path consumed by the [`ReplayCursor`](crate::DurableContext) — both observe one + /// `durable.db` pool. + Local(Arc), } impl crate::sealed::Sealed for DurableBackendEnum {} @@ -156,4 +182,14 @@ impl ExecutionBackend for DurableBackendEnum { Self::Local(backend) => backend.capabilities(), } } + + async fn lookup_committed_result( + &self, + id: ExecutionId, + idem_key: IdempotencyKey, + ) -> Result, DurableError> { + match self { + Self::Local(backend) => backend.lookup_committed_result(id, idem_key).await, + } + } } diff --git a/crates/zeph-durable/src/backend/local.rs b/crates/zeph-durable/src/backend/local.rs index 18eb4f5d6..2bc229664 100644 --- a/crates/zeph-durable/src/backend/local.rs +++ b/crates/zeph-durable/src/backend/local.rs @@ -23,11 +23,14 @@ //! //! This revision journals the step-execution entries — [`EntryKind::StepResult`] and //! [`EntryKind::EffectIntent`] — that the durable step primitive records, plus execution -//! lifecycle (open and [`finalize`](Journal::finalize)) and the writer's restart anchor -//! (`max_seq`). Promise, timer, and checkpoint entries are journaled by the -//! promise/timer and retention layers; until then [`append`](Journal::append) of those kinds fails -//! closed with [`DurableError::UnsupportedEntryKind`] rather than dropping their state. The -//! retention sweep ([`prune`](Journal::prune)) is a no-op stub here. +//! lifecycle (open and [`finalize`](Journal::finalize)), the writer's restart anchor (`max_seq`), +//! and the idempotency-key point lookup +//! ([`lookup_committed_result`](ExecutionBackend::lookup_committed_result)) that lets a guarded step +//! recognize an already-committed effect after a replay divergence (INV-13). Promise, timer, and +//! checkpoint entries are journaled by the promise/timer and retention layers; until then +//! [`append`](Journal::append) of those kinds fails closed with +//! [`DurableError::UnsupportedEntryKind`] rather than dropping their state. The retention sweep +//! ([`prune`](Journal::prune)) is a no-op stub here. use std::fmt; use std::sync::Arc; @@ -265,6 +268,49 @@ impl LocalBackend { Ok(()) } + /// Look up a committed `StepResult` anywhere in an execution by its [`IdempotencyKey`]. + /// + /// Backs INV-13: a guarded effect that already committed its result must not re-fire after a + /// replay divergence restarts the execution fresh. Returns the (opened) `StepResult` entry when + /// one exists, or `None`. The `idx_durable_journal_idem_key` partial index makes this an + /// `O(log n)` point lookup rather than a scan. + /// + /// Span: `durable.journal.lookup_idem`. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] if the query fails, or [`DurableError::Decode`] if the + /// located row cannot be reconstructed. + pub(crate) async fn lookup_committed_result( + &self, + id: ExecutionId, + idem_key: IdempotencyKey, + ) -> Result, DurableError> { + let span = tracing::info_span!( + "durable.journal.lookup_idem", + execution_id = %id.as_uuid(), + found = tracing::field::Empty, + ); + async move { + let rows: Vec = zeph_db::query_as(sql!( + "SELECT seq, step_id, entry_kind, idem_key, effect_class, payload, payload_version, hmac, created_at + FROM durable_journal + WHERE execution_id = ? AND idem_key = ? AND entry_kind = 'step_result' + ORDER BY seq LIMIT 1" + )) + .bind(id.as_uuid().to_string()) + .bind(idem_key.as_bytes().to_vec()) + .fetch_all(&self.pool) + .await + .map_err(|e| DurableError::storage("lookup_idem", e))?; + let entry = self.rows_to_entries(id, rows).await?.into_iter().next(); + tracing::Span::current().record("found", entry.is_some()); + Ok(entry) + } + .instrument(span) + .await + } + /// Read the highest committed [`JournalSeq`], or `None` for an empty journal. /// /// The [`JournalWriter`](crate::JournalWriter) calls this on (re)start to anchor itself at the @@ -654,6 +700,14 @@ impl ExecutionBackend for LocalBackend { max_payload: usize::try_from(self.max_payload_bytes).unwrap_or(usize::MAX), } } + + async fn lookup_committed_result( + &self, + id: ExecutionId, + idem_key: IdempotencyKey, + ) -> Result, DurableError> { + LocalBackend::lookup_committed_result(self, id, idem_key).await + } } /// Column values for a single `durable_journal` row, ready to bind. @@ -689,7 +743,7 @@ type JournalRowRead = ( ); /// Current Unix time in milliseconds, clamped into `i64` and never panicking. -fn now_unix_millis() -> i64 { +pub(crate) fn now_unix_millis() -> i64 { SystemTime::now() .duration_since(UNIX_EPOCH) .map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX)) @@ -1031,6 +1085,44 @@ mod tests { assert_eq!(segment[1].step_id, StepId::new(3)); } + #[tokio::test] + async fn lookup_committed_result_finds_by_idem_key() { + let backend = mem_backend(1_048_576).await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + let entry = step_result(exec, 0, b"committed"); + let idem_key = match &entry.entry { + EntryKind::StepResult { + idempotency_key, .. + } => *idempotency_key, + other => panic!("unexpected entry kind: {other:?}"), + }; + backend.append(entry).await.unwrap(); + + let found = backend + .lookup_committed_result(exec, idem_key) + .await + .unwrap() + .expect("committed result is located by its idempotency key"); + match &found.entry { + EntryKind::StepResult { payload, .. } => assert_eq!(payload.as_ref(), b"committed"), + other => panic!("unexpected entry kind: {other:?}"), + } + + // A key that was never committed yields nothing rather than erroring. + let absent = IdempotencyKey::derive(exec, StepId::new(99), b"never"); + assert!( + backend + .lookup_committed_result(exec, absent) + .await + .unwrap() + .is_none() + ); + } + #[tokio::test] async fn capabilities_describe_the_local_profile() { let backend = mem_backend(4096).await; diff --git a/crates/zeph-durable/src/effect.rs b/crates/zeph-durable/src/effect.rs index f78757abb..954a2d129 100644 --- a/crates/zeph-durable/src/effect.rs +++ b/crates/zeph-durable/src/effect.rs @@ -7,8 +7,15 @@ //! the exactly-once machinery: the replay cursor uses it to decide whether a journaled result may //! be returned without re-running the operation. //! -//! The ambiguity-policy types (`OnAmbiguous`, `EffectIntentSubClass`) and the construction-time -//! policy rule land alongside the durable step primitive in a follow-up issue. +//! For [`EffectClass::ExactlyOnceGuarded`] steps the contract is sharper: an +//! [`EffectIntentSubClass`] further classifies *what kind* of side effect the step performs, and an +//! [`OnAmbiguous`] policy decides what to do when a crash leaves the journal in the *ambiguous +//! window* — an `EffectIntent` committed, but no `StepResult`, so it is unknown whether the external +//! effect actually fired. The combination is enforced at construction time +//! ([`crate::StepDescriptor`]): a destructive, security-relevant, money-moving, or custom guarded +//! step that omits an explicit [`OnAmbiguous`] is rejected with +//! [`DurableError::AmbiguityPolicyRequired`](crate::DurableError::AmbiguityPolicyRequired), forcing +//! the safety decision to the call site rather than a silent runtime default (FR-DE-09). /// How a step's side effect behaves under replay. /// @@ -61,6 +68,104 @@ impl EffectClass { } } +/// What an [`EffectClass::ExactlyOnceGuarded`] step actually does, refining the ambiguity policy. +/// +/// The sub-class drives the construction-time policy rule: a guarded step whose effect is +/// destructive, security-relevant, money-moving, or custom MUST carry an explicit [`OnAmbiguous`] +/// (FR-DE-09); only a cost-bearing / boundary-idempotent effect gets a safe default +/// ([`OnAmbiguous::Skip`]). The sub-class is consumed when the descriptor is built and never stored +/// on the journal row — the persisted classification is the coarser [`EffectClass`]. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::EffectIntentSubClass; +/// +/// // A paid LLM call carrying a provider idempotency header is safe to skip on an ambiguous replay. +/// assert!(!EffectIntentSubClass::CostBearingOrBoundaryIdempotent.requires_explicit_policy()); +/// // A fund transfer must declare its ambiguity policy explicitly. +/// assert!(EffectIntentSubClass::MoneyMoving.requires_explicit_policy()); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum EffectIntentSubClass { + /// A paid or rate-limited boundary effect that the external service deduplicates by + /// idempotency key (e.g. a paid LLM call). Default ambiguity policy: [`OnAmbiguous::Skip`]. + CostBearingOrBoundaryIdempotent, + /// An irreversible mutation (file delete, record drop). Requires an explicit policy. + Destructive, + /// A permission or credential mutation. Requires an explicit policy. + SecurityRelevant, + /// A financial transfer. Requires an explicit policy. + MoneyMoving, + /// A caller-defined effect with no built-in default. Requires an explicit policy. + Custom, +} + +impl EffectIntentSubClass { + /// Whether a guarded step of this sub-class MUST be given an explicit [`OnAmbiguous`] policy. + /// + /// Only [`EffectIntentSubClass::CostBearingOrBoundaryIdempotent`] has a safe default; every + /// other sub-class forces the decision to the call site. + #[must_use] + pub fn requires_explicit_policy(self) -> bool { + !matches!(self, Self::CostBearingOrBoundaryIdempotent) + } + + /// Return the canonical lower-snake-case string for diagnostics and audit records. + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + Self::CostBearingOrBoundaryIdempotent => "cost_bearing_or_boundary_idempotent", + Self::Destructive => "destructive", + Self::SecurityRelevant => "security_relevant", + Self::MoneyMoving => "money_moving", + Self::Custom => "custom", + } + } +} + +/// What to do when a guarded step resumes inside the *ambiguous window*. +/// +/// The ambiguous window is the gap between committing the `EffectIntent` and committing the +/// `StepResult`: on resume the journal proves the effect was *about to* fire, but not whether it +/// did. The policy resolves that uncertainty. Every resolution emits a mandatory structured audit +/// record (FR-DE-10). +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::OnAmbiguous; +/// +/// assert_eq!(OnAmbiguous::Skip.as_str(), "skip"); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum OnAmbiguous { + /// Assume the effect happened. Safe for cost-bearing / boundary-idempotent effects, where the + /// external service deduplicates the re-issued operation by its idempotency key, so re-running + /// the closure cannot double-apply the effect (it is deduplicated at the boundary). + Skip, + /// Surface the ambiguity to the operator with [`DurableError::AmbiguousEffect`]. The required + /// choice for destructive and security-relevant effects, where guessing is unacceptable. + /// + /// [`DurableError::AmbiguousEffect`]: crate::DurableError::AmbiguousEffect + Fail, + /// Assume the effect did *not* happen and re-run the closure. For effects misclassified as + /// guarded that are in fact safe to repeat. + Rerun, +} + +impl OnAmbiguous { + /// Return the canonical lower-snake-case string used in the mandatory audit record. + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + Self::Skip => "skip", + Self::Fail => "fail", + Self::Rerun => "rerun", + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -74,4 +179,33 @@ mod tests { "exactly_once_guarded" ); } + + #[test] + fn only_cost_bearing_subclass_has_a_default_policy() { + assert!(!EffectIntentSubClass::CostBearingOrBoundaryIdempotent.requires_explicit_policy()); + for sub in [ + EffectIntentSubClass::Destructive, + EffectIntentSubClass::SecurityRelevant, + EffectIntentSubClass::MoneyMoving, + EffectIntentSubClass::Custom, + ] { + assert!( + sub.requires_explicit_policy(), + "{} must require an explicit ambiguity policy", + sub.as_str() + ); + } + } + + #[test] + fn subclass_and_policy_strings_are_stable() { + assert_eq!( + EffectIntentSubClass::CostBearingOrBoundaryIdempotent.as_str(), + "cost_bearing_or_boundary_idempotent" + ); + assert_eq!(EffectIntentSubClass::Destructive.as_str(), "destructive"); + assert_eq!(OnAmbiguous::Skip.as_str(), "skip"); + assert_eq!(OnAmbiguous::Fail.as_str(), "fail"); + assert_eq!(OnAmbiguous::Rerun.as_str(), "rerun"); + } } diff --git a/crates/zeph-durable/src/error.rs b/crates/zeph-durable/src/error.rs index 38bbea410..593eeeb0f 100644 --- a/crates/zeph-durable/src/error.rs +++ b/crates/zeph-durable/src/error.rs @@ -106,6 +106,38 @@ pub enum DurableError { #[source] source: Box, }, + + /// A step's operation closure returned an error on a fresh execution. The step did not complete, + /// so no `StepResult` is journaled; on a later resume the step re-runs (or, for a guarded effect, + /// its [`OnAmbiguous`](crate::OnAmbiguous) policy applies). The closure's own error is attached + /// as the source. + #[error("step '{step}' operation failed")] + StepFailed { + /// The name of the step whose operation closure failed. + step: &'static str, + /// The closure's underlying error. + #[source] + source: Box, + }, + + /// A guarded step resumed inside the ambiguous window (an `EffectIntent` is journaled but no + /// `StepResult`) and its policy is [`OnAmbiguous::Fail`](crate::OnAmbiguous::Fail): the layer + /// refuses to guess whether the irreversible effect fired and surfaces the decision to the + /// operator instead of re-running or skipping it. + #[error("step {step_id} resumed in the ambiguous window and its on_ambiguous policy is 'fail'")] + AmbiguousEffect { + /// The step caught in the ambiguous window. + step_id: StepId, + }, + + /// A step result could not be serialized into journal bytes before sealing. The step's value is + /// the consumer's serializable type, so this indicates a faulty `Serialize` implementation; it + /// fails closed rather than journaling a partial payload. Per INV-5 only the step name is named. + #[error("step '{step}' result could not be serialized for the journal")] + Serialize { + /// The name of the step whose result failed to serialize. + step: &'static str, + }, } impl DurableError { @@ -122,6 +154,17 @@ impl DurableError { source: source.into(), } } + + /// Wrap a step operation closure's failure as a [`DurableError::StepFailed`]. + /// + /// Keeps the originating error reachable via [`std::error::Error::source`] while the `Display` + /// line stays metadata-only (INV-5). + pub(crate) fn step_failed(step: &'static str, source: crate::step::StepError) -> Self { + Self::StepFailed { + step, + source: source.into_inner(), + } + } } #[cfg(test)] @@ -158,4 +201,27 @@ mod tests { assert!(!rendered.contains("secret-bind-value")); assert!(std::error::Error::source(&err).is_some()); } + + #[test] + fn step_failed_names_step_but_not_the_source_detail() { + let err = DurableError::step_failed( + "transfer_funds", + crate::step::StepError::new("secret-operation-detail"), + ); + let rendered = err.to_string(); + assert!(rendered.contains("transfer_funds")); + assert!(!rendered.contains("secret-operation-detail")); + assert!(std::error::Error::source(&err).is_some()); + } + + #[test] + fn ambiguous_and_serialize_messages_are_metadata_only() { + let ambiguous = DurableError::AmbiguousEffect { + step_id: StepId::new(4), + }; + assert!(ambiguous.to_string().contains("step 4")); + + let serialize = DurableError::Serialize { step: "persist" }; + assert!(serialize.to_string().contains("persist")); + } } diff --git a/crates/zeph-durable/src/handle.rs b/crates/zeph-durable/src/handle.rs new file mode 100644 index 000000000..957a6cf48 --- /dev/null +++ b/crates/zeph-durable/src/handle.rs @@ -0,0 +1,1012 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! The `&self` durable execution context. +//! +//! [`DurableContext`] is the front door to durable execution: a consumer wraps each unit of work in +//! [`step`](DurableContext::step) (or a [`parallel`](DurableContext::parallel) batch) and the +//! context journals it, replays it on resume, and enforces the exactly-once contract. The entire +//! surface is `&self` — step ids come from an [`AtomicU32`], so concurrent steps under a single +//! shared context are sound without a mutable borrow (system-invariants §10). +//! +//! # Deterministic step ids (INV-2) +//! +//! A step id is assigned the moment a step is *started*, in program order, via `fetch_add`. A +//! [`ParallelScope`] assigns each child's id eagerly when the child future is constructed (before +//! any of them is polled), so a parallel batch's ids are fixed by argument order and are independent +//! of completion order. The same program therefore re-derives the same ids on replay. +//! +//! # Replay and the divergence guard (INV-3) +//! +//! On resume the program re-runs and each step consults the `ReplayCursor`: +//! +//! - a committed result replays without invoking the closure (INV-10); +//! - an intent-only entry means the *ambiguous window* — the step's +//! [`OnAmbiguous`] policy decides (and a mandatory audit record is emitted, FR-DE-10); +//! - nothing journaled means run fresh. +//! +//! Before replaying a result the context compares the journaled step's [`IdempotencyKey`] — the +//! step's structural fingerprint — against the key derived from the *current* descriptor. A mismatch +//! is a [`DurableError::ReplayDivergence`]: the journal is marked `aborted` and replay is disabled +//! so the execution restarts fresh, never returning a result for a structurally different step. +//! +//! # Exactly-once and the ambiguous window (FR-DE-04, INV-13) +//! +//! A guarded step commits its `EffectIntent` (acknowledged) *before* the closure runs and its +//! `StepResult` (acknowledged) *after*. If a replay divergence forces a fresh run, a guarded effect +//! that already committed a result is recognized by its idempotency key (a point lookup) and its +//! journaled value is returned rather than re-firing the effect. + +use std::fmt::Write as _; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::time::SystemTime; + +use serde::Serialize; +use serde::de::DeserializeOwned; +use tracing::Instrument as _; + +use crate::backend::local::now_unix_millis; +use crate::backend::{DurableBackendEnum, ExecutionBackend as _}; +use crate::config::DurableConfig; +use crate::effect::{EffectClass, OnAmbiguous}; +use crate::error::DurableError; +use crate::ids::{ExecutionId, ExecutionKind, IdempotencyKey, StepId}; +use crate::journal::{EntryKind, ExecutionStatus, Journal as _, JournalEntry}; +use crate::replay::{DEFAULT_SEGMENT_STEPS, ReplayCursor, StepReplay}; +use crate::step::{ + DurableStep, PAYLOAD_VERSION, StepDescriptor, StepError, StepHandle, deserialize_result, + serialize_result, +}; +use crate::writer::JournalWriterHandle; + +/// The `&self` durable execution context handed to a consumer's program. +/// +/// Construct it with [`DurableContext::new`] from a shared backend (the read path) and a +/// [`JournalWriterHandle`] (the write path) — both bound to the same `durable.db`. A fresh +/// execution opens with `is_resume = false`; a resumed one with `is_resume = true`, which activates +/// the `ReplayCursor`. +#[derive(Debug)] +pub struct DurableContext { + execution_id: ExecutionId, + kind: ExecutionKind, + next_step: AtomicU32, + diverged: AtomicBool, + is_resume: bool, + cursor: ReplayCursor, + backend: Arc, + writer: JournalWriterHandle, + max_steps_per_execution: u32, + max_payload_bytes: u64, +} + +impl DurableContext { + /// Build a context for an execution. + /// + /// `backend` is the shared read path (the `ReplayCursor` reads segments through it) and must + /// be the same `durable.db` instance the `writer` commits to. `is_resume` activates replay: + /// pass `true` when reopening an execution that already has journal entries (as + /// [`LocalBackend::open_execution`](crate::LocalBackend::open_execution) reports), `false` for a + /// brand-new execution. + #[must_use] + pub fn new( + execution_id: ExecutionId, + kind: ExecutionKind, + is_resume: bool, + backend: Arc, + writer: JournalWriterHandle, + config: &DurableConfig, + ) -> Self { + let cursor = ReplayCursor::new(backend.clone(), execution_id, DEFAULT_SEGMENT_STEPS); + Self { + execution_id, + kind, + next_step: AtomicU32::new(0), + diverged: AtomicBool::new(false), + is_resume, + cursor, + backend, + writer, + max_steps_per_execution: config.max_steps_per_execution, + max_payload_bytes: config.max_payload_bytes, + } + } + + /// The execution this context drives. + #[must_use] + pub fn execution_id(&self) -> ExecutionId { + self.execution_id + } + + /// The execution's category. + #[must_use] + pub fn kind(&self) -> ExecutionKind { + self.kind + } + + /// Run a durable step, returning just its value. + /// + /// On a fresh execution the operation `op` runs and its result is journaled; on replay the + /// journaled result is returned and `op` is never invoked (INV-10). The closure receives a + /// [`StepHandle`] carrying the step's [`IdempotencyKey`] for boundary deduplication. + /// + /// Use [`step_recorded`](DurableContext::step_recorded) when the live/replayed distinction or the + /// step id matters. + /// + /// # Errors + /// + /// - [`DurableError::StepFailed`] if `op` returns an error on a fresh run. + /// - [`DurableError::ReplayDivergence`] if the journaled step at this position has a different + /// structural fingerprint (INV-3). + /// - [`DurableError::AmbiguousEffect`] for an [`OnAmbiguous::Fail`] guarded step caught in the + /// ambiguous window. + /// - [`DurableError::Serialize`] / [`DurableError::Decode`] on a payload codec failure, or a + /// storage error from the journal. + /// + /// # Examples + /// + /// ```no_run + /// # async fn run(ctx: &zeph_durable::DurableContext) -> Result<(), zeph_durable::DurableError> { + /// use zeph_durable::StepDescriptor; + /// + /// let lines: usize = ctx + /// .step(StepDescriptor::idempotent("count", b"tool:count".to_vec()), |_handle| async { + /// Ok(42) + /// }) + /// .await?; + /// assert_eq!(lines, 42); + /// # Ok(()) } + /// ``` + pub async fn step(&self, desc: StepDescriptor, op: F) -> Result + where + T: Serialize + DeserializeOwned + Send, + F: FnOnce(StepHandle) -> Fut + Send, + Fut: Future> + Send, + { + let step_id = self.assign_step_id(); + self.run_step_at(step_id, desc, op) + .await + .map(DurableStep::into_value) + } + + /// Run a durable step, returning the full [`DurableStep`] record (id, key, and outcome). + /// + /// # Errors + /// + /// Identical to [`step`](DurableContext::step). + pub async fn step_recorded( + &self, + desc: StepDescriptor, + op: F, + ) -> Result, DurableError> + where + T: Serialize + DeserializeOwned + Send, + F: FnOnce(StepHandle) -> Fut + Send, + Fut: Future> + Send, + { + let step_id = self.assign_step_id(); + self.run_step_at(step_id, desc, op).await + } + + /// Open a [`ParallelScope`] whose children receive contiguous, eagerly-assigned step ids. + /// + /// Construct the children synchronously (e.g. with a `Vec` or `.map(...).collect()`), then drive + /// them with `join_all`/`try_join_all`; their ids are fixed by construction order and are stable + /// across replay regardless of completion order (INV-2). + #[must_use] + pub fn parallel(&self) -> ParallelScope<'_> { + ParallelScope { ctx: self } + } + + /// Durable timer entry point — **not yet wired**; durable timers land with the promise/timer + /// layer (C5). + /// + /// # Errors + /// + /// Currently always returns [`DurableError::UnsupportedEntryKind`]: the backend does not yet + /// persist `timer_armed` entries. The signature is stable so consumers can target it ahead of + /// the timer layer. + #[allow( + clippy::unused_async, + reason = "stub: the C5 timer layer adds the awaiting body; the async signature is the stable surface" + )] + pub async fn sleep_until(&self, due: SystemTime) -> Result<(), DurableError> { + let due_ms = due + .duration_since(SystemTime::UNIX_EPOCH) + .map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX)); + tracing::debug!( + execution_id = %self.execution_id.as_uuid(), + due_ms, + "durable timers are not yet wired; sleep_until lands with the promise/timer layer" + ); + Err(DurableError::UnsupportedEntryKind { + kind: "timer_armed", + }) + } + + /// Assign the next deterministic step id (INV-2). + fn assign_step_id(&self) -> StepId { + StepId::new(self.next_step.fetch_add(1, Ordering::Relaxed)) + } + + /// Whether replay is currently consulted (a resume that has not diverged). + fn replay_active(&self) -> bool { + self.is_resume && !self.diverged.load(Ordering::Acquire) + } + + /// The core step state machine shared by the sequential and parallel entry points. + async fn run_step_at( + &self, + step_id: StepId, + desc: StepDescriptor, + op: F, + ) -> Result, DurableError> + where + T: Serialize + DeserializeOwned + Send, + F: FnOnce(StepHandle) -> Fut + Send, + Fut: Future> + Send, + { + if step_id.value() >= self.max_steps_per_execution { + return Err(DurableError::StepCapExceeded { + cap: self.max_steps_per_execution, + }); + } + let effect = desc.effect(); + let idem_key = + IdempotencyKey::derive(self.execution_id, step_id, &desc.fingerprint_input()); + + let span = tracing::info_span!( + "durable.step.run", + step_id = step_id.value(), + effect_class = effect.as_str(), + replayed = tracing::field::Empty, + ); + async move { + // 1) Sequential replay: consult the cursor for this position. + if self.replay_active() { + match self.cursor.lookup(step_id).await? { + StepReplay::Result(entry) => { + self.check_divergence(step_id, idem_key, &entry).await?; + let value = replay_value::(step_id, effect, &entry)?; + tracing::Span::current().record("replayed", true); + return Ok(DurableStep::replayed(step_id, idem_key, value)); + } + StepReplay::IntentOnly(entry) => { + self.check_divergence(step_id, idem_key, &entry).await?; + return self.resolve_ambiguous(step_id, idem_key, &desc, op).await; + } + StepReplay::Fresh => {} + } + } + + // 2) INV-13: a guarded effect that already committed a result must not re-fire, even on a + // fresh run that follows a divergence. A point lookup by idempotency key catches it. + if effect == EffectClass::ExactlyOnceGuarded + && let Some(entry) = self + .backend + .lookup_committed_result(self.execution_id, idem_key) + .await? + { + let value = replay_value::(step_id, effect, &entry)?; + tracing::Span::current().record("replayed", true); + return Ok(DurableStep::replayed(step_id, idem_key, value)); + } + + // 3) Fresh execution of the step. + tracing::Span::current().record("replayed", false); + if effect == EffectClass::ExactlyOnceGuarded { + // FR-DE-04: the intent is committed and ACKed before the effect fires. + let intent = self.intent_entry(step_id, idem_key, effect); + self.append_acked_degrading(intent, desc.name()).await?; + } + let value = self.run_op(op, step_id, idem_key, desc.name()).await?; + // Serialize before the journal await so no `&T` is held across it (that would force a + // `T: Sync` bound the consumer's value need not satisfy); the owned value moves into the + // returned record afterward. + let payload = serialize_result(&value, desc.name())?; + self.journal_result(payload, step_id, idem_key, effect, desc.name()) + .await?; + Ok(DurableStep::live(step_id, idem_key, value)) + } + .instrument(span) + .await + } + + /// Compare a journaled step's fingerprint against the current descriptor's; abort on mismatch. + async fn check_divergence( + &self, + step_id: StepId, + expected: IdempotencyKey, + entry: &JournalEntry, + ) -> Result<(), DurableError> { + // The idempotency key folds in the descriptor name, effect, and op fingerprint, so equality + // is the one-BLAKE3-compare fingerprint check the divergence guard requires (INV-3). + if entry.entry.idempotency_key() == Some(expected) { + return Ok(()); + } + self.on_divergence(step_id).await; + Err(DurableError::ReplayDivergence { step_id }) + } + + /// Mark the execution aborted and disable replay so it restarts fresh (FR-DE-03). + async fn on_divergence(&self, step_id: StepId) { + self.diverged.store(true, Ordering::Release); + tracing::warn!( + execution_id = %self.execution_id.as_uuid(), + step_id = step_id.value(), + "replay divergence detected; marking journal aborted and restarting fresh" + ); + if let Err(error) = self + .backend + .finalize(self.execution_id, ExecutionStatus::Aborted) + .await + { + tracing::warn!(%error, "failed to mark diverged execution aborted"); + } + } + + /// Apply the [`OnAmbiguous`] policy for a guarded step resumed in the ambiguous window. + async fn resolve_ambiguous( + &self, + step_id: StepId, + idem_key: IdempotencyKey, + desc: &StepDescriptor, + op: F, + ) -> Result, DurableError> + where + T: Serialize + DeserializeOwned + Send, + F: FnOnce(StepHandle) -> Fut + Send, + Fut: Future> + Send, + { + let effect = desc.effect(); + let policy = desc.on_ambiguous().unwrap_or(OnAmbiguous::Fail); + self.emit_ambiguous_audit(step_id, effect, idem_key, policy); + match policy { + // Fail: refuse to guess; surface the irreversible-effect uncertainty to the operator. + OnAmbiguous::Fail => Err(DurableError::AmbiguousEffect { step_id }), + // Skip re-runs the closure trusting the boundary to deduplicate the re-issued effect by + // its idempotency key; Rerun re-runs it assuming the effect never fired. At this layer + // both re-execute (the intent already exists, so it is not re-journaled) and the audit + // record above distinguishes which policy was applied. + OnAmbiguous::Skip | OnAmbiguous::Rerun => { + let value = self.run_op(op, step_id, idem_key, desc.name()).await?; + let payload = serialize_result(&value, desc.name())?; + self.journal_result(payload, step_id, idem_key, effect, desc.name()) + .await?; + Ok(DurableStep::live(step_id, idem_key, value)) + } + } + } + + /// Invoke the operation closure, mapping its failure to [`DurableError::StepFailed`]. + async fn run_op( + &self, + op: F, + step_id: StepId, + idem_key: IdempotencyKey, + name: &'static str, + ) -> Result + where + F: FnOnce(StepHandle) -> Fut + Send, + Fut: Future> + Send, + { + let handle = StepHandle::new(step_id, idem_key); + op(handle) + .await + .map_err(|err| DurableError::step_failed(name, err)) + } + + /// Journal a step's already-serialized result with the durability class its effect requires. + async fn journal_result( + &self, + payload: bytes::Bytes, + step_id: StepId, + idem_key: IdempotencyKey, + effect: EffectClass, + name: &'static str, + ) -> Result<(), DurableError> { + // INV-11 write-side guard: reject an oversized payload before it reaches the writer. + crate::cipher::ensure_payload_within_limit(payload.len(), self.max_payload_bytes)?; + let entry = JournalEntry { + seq: None, + execution_id: self.execution_id, + kind: self.kind, + step_id, + entry: EntryKind::StepResult { + idempotency_key: idem_key, + payload, + effect, + payload_version: PAYLOAD_VERSION, + }, + created_at_ms: now_unix_millis(), + }; + match effect { + // Exactly-once results are ACKed so durability-on-return holds (FR-DE-04). + EffectClass::ExactlyOnceGuarded => self.append_acked_degrading(entry, name).await, + // Buffered results group-commit; a crash before the flush simply re-runs the step. + EffectClass::Idempotent | EffectClass::AtLeastOnce => { + self.writer.append_buffered(entry); + Ok(()) + } + } + } + + /// Build an `EffectIntent` entry for a guarded step. + fn intent_entry( + &self, + step_id: StepId, + idem_key: IdempotencyKey, + effect: EffectClass, + ) -> JournalEntry { + JournalEntry { + seq: None, + execution_id: self.execution_id, + kind: self.kind, + step_id, + entry: EntryKind::EffectIntent { + idempotency_key: idem_key, + effect, + // The backend is the HMAC keyholder and stamps the row HMAC itself when configured. + hmac: None, + }, + created_at_ms: now_unix_millis(), + } + } + + /// ACK an append, degrading to non-durable mode on a writer timeout (INV-12) rather than failing. + async fn append_acked_degrading( + &self, + entry: JournalEntry, + name: &'static str, + ) -> Result<(), DurableError> { + match self.writer.append_acked(entry).await { + Ok(_) => Ok(()), + Err(DurableError::JournalUnavailable) => { + tracing::warn!( + step = name, + "journal writer unavailable; this step degrades to non-durable mode" + ); + Ok(()) + } + Err(error) => Err(error), + } + } + + /// Emit the mandatory structured audit record for an ambiguous-window resolution (FR-DE-10). + fn emit_ambiguous_audit( + &self, + step_id: StepId, + effect: EffectClass, + idem_key: IdempotencyKey, + policy: OnAmbiguous, + ) { + tracing::warn!( + target: "durable.audit", + execution_id = %self.execution_id.as_uuid(), + step_id = step_id.value(), + effect_class = effect.as_str(), + idem_key = %idem_key_hex8(idem_key), + on_ambiguous = policy.as_str(), + "durable step resumed in the ambiguous window; applying on_ambiguous policy" + ); + } +} + +/// A handle for spawning durable steps with eagerly-assigned, contiguous step ids. +/// +/// Returned by [`DurableContext::parallel`]. Each [`step`](ParallelScope::step) call assigns its id +/// synchronously, *before* returning the future, so building a batch of children fixes their ids in +/// construction order — completion order is then irrelevant (INV-2). +#[derive(Debug, Clone, Copy)] +pub struct ParallelScope<'a> { + ctx: &'a DurableContext, +} + +impl<'a> ParallelScope<'a> { + /// Construct a durable step future with its id assigned eagerly. + /// + /// The returned future runs (or replays) the step when awaited; its [`StepId`] is already fixed. + /// Collect the futures synchronously, then await them concurrently. + /// + /// # Errors + /// + /// The awaited future fails for the same reasons as [`DurableContext::step`]. + pub fn step( + &self, + desc: StepDescriptor, + op: F, + ) -> impl Future, DurableError>> + Send + 'a + where + T: Serialize + DeserializeOwned + Send + 'a, + F: FnOnce(StepHandle) -> Fut + Send + 'a, + Fut: Future> + Send + 'a, + { + let step_id = self.ctx.assign_step_id(); + let ctx = self.ctx; + async move { ctx.run_step_at(step_id, desc, op).await } + } +} + +/// Extract and decode a replayed step's value from its journaled `StepResult`. +fn replay_value( + step_id: StepId, + effect: EffectClass, + entry: &JournalEntry, +) -> Result { + let _span = tracing::info_span!( + "durable.step.replay", + step_id = step_id.value(), + effect_class = effect.as_str(), + ) + .entered(); + match &entry.entry { + EntryKind::StepResult { payload, .. } => deserialize_result(payload), + _ => Err(DurableError::Decode { + context: "replayed entry is not a step result", + }), + } +} + +/// Hex-encode the first 8 bytes of an idempotency key for an audit record. +/// +/// The key is a BLAKE3 hash, not secret material; the spec redaction rule shows only its first 8 +/// bytes in CLI/audit output (INV-5). +fn idem_key_hex8(key: IdempotencyKey) -> String { + let mut out = String::with_capacity(16); + for byte in &key.as_bytes()[..8] { + let _ = write!(out, "{byte:02x}"); + } + out +} + +#[cfg(all(test, feature = "sqlite", not(feature = "postgres")))] +mod tests { + use super::*; + use crate::backend::local::LocalBackend; + use crate::config::DurableConfig; + use crate::effect::EffectIntentSubClass; + use crate::writer::JournalWriter; + use std::pin::Pin; + use std::sync::atomic::AtomicU32; + use tokio::task::JoinHandle; + + /// A type-erased durable-step future, so a heterogeneous batch of step closures can share one + /// `Vec` for `join_all` (distinct closures otherwise produce distinct opaque future types). + type StepFut<'a> = + Pin, DurableError>> + Send + 'a>>; + + fn fast_config() -> DurableConfig { + DurableConfig { + journal_flush_interval_ms: 5, + journal_ack_timeout_ms: 2000, + ..DurableConfig::default() + } + } + + /// A running context over a fresh in-memory backend, with the writer task spawned. + struct Harness { + ctx: DurableContext, + backend: Arc, + writer_task: JoinHandle<()>, + handle: JournalWriterHandle, + } + + impl Harness { + async fn open(exec: ExecutionId, is_resume: bool) -> Self { + let local = Arc::new(LocalBackend::open(":memory:", 1_048_576).await.unwrap()); + local.init().await.unwrap(); + local + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + let (writer, handle) = JournalWriter::new(local.clone(), &fast_config()); + let writer_task = tokio::spawn(writer.run()); + let backend = Arc::new(DurableBackendEnum::Local(local.clone())); + let ctx = DurableContext::new( + exec, + ExecutionKind::AgentTurn, + is_resume, + backend, + handle.clone(), + &fast_config(), + ); + Self { + ctx, + backend: local, + writer_task, + handle, + } + } + + /// Reopen over the *same* backing journal to drive a resume run. + fn resume(&self) -> DurableContext { + let backend = Arc::new(DurableBackendEnum::Local(self.backend.clone())); + DurableContext::new( + self.ctx.execution_id, + ExecutionKind::AgentTurn, + true, + backend, + self.handle.clone(), + &fast_config(), + ) + } + + async fn shutdown(self) { + // Some tests build a second context (a resume / fresh run) that clones the writer + // handle; that clone keeps the channel open, so the writer never stops on its own. + // Abort it directly rather than awaiting a graceful drain (data was already flushed + // before any assertion that needed it). + self.writer_task.abort(); + let _ = self.writer_task.await; + } + } + + #[tokio::test] + async fn fresh_step_runs_op_and_journals_result() { + // FR-DE-01: a fresh step records its result in the journal. + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let value: u32 = h + .ctx + .step( + StepDescriptor::idempotent("count", b"op".to_vec()), + |_| async { Ok(7) }, + ) + .await + .unwrap(); + assert_eq!(value, 7); + h.handle.flush().await.unwrap(); + + let entries = h.backend.read_execution(exec).await.unwrap(); + assert_eq!(entries.len(), 1); + assert!(matches!(entries[0].entry, EntryKind::StepResult { .. })); + h.shutdown().await; + } + + #[tokio::test] + async fn replayed_idempotent_step_skips_op() { + // INV-10 / FR-DE-02: a replayed idempotent step returns the journaled value without re-running. + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let desc = || StepDescriptor::idempotent("count", b"op".to_vec()); + let first: u32 = h.ctx.step(desc(), |_| async { Ok(11) }).await.unwrap(); + assert_eq!(first, 11); + h.handle.flush().await.unwrap(); + + let resumed = h.resume(); + let ran_again = Arc::new(AtomicU32::new(0)); + let counter = ran_again.clone(); + let replayed: u32 = resumed + .step(desc(), move |_| { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + Ok(999) + } + }) + .await + .unwrap(); + assert_eq!( + replayed, 11, + "the journaled value is returned, not the new one" + ); + assert_eq!( + ran_again.load(Ordering::SeqCst), + 0, + "the operation closure must not run on replay" + ); + h.shutdown().await; + } + + #[tokio::test] + async fn guarded_step_commits_intent_before_result() { + // FR-DE-04: an EffectIntent is committed before op, a StepResult after. + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let desc = StepDescriptor::exactly_once_guarded( + "charge", + EffectIntentSubClass::CostBearingOrBoundaryIdempotent, + Some(OnAmbiguous::Skip), + b"op".to_vec(), + ) + .unwrap(); + let _: u32 = h.ctx.step(desc, |_| async { Ok(5) }).await.unwrap(); + h.handle.flush().await.unwrap(); + + let entries = h.backend.read_execution(exec).await.unwrap(); + let kinds: Vec<_> = entries.iter().map(|e| e.entry.tag()).collect(); + assert_eq!( + kinds, + vec!["effect_intent", "step_result"], + "intent is journaled before the result" + ); + h.shutdown().await; + } + + #[tokio::test] + async fn replay_divergence_on_fingerprint_mismatch() { + // INV-3 / FR-DE-03: a structurally different step at the same id aborts and restarts fresh. + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let _: u32 = h + .ctx + .step( + StepDescriptor::idempotent("count", b"v1".to_vec()), + |_| async { Ok(1) }, + ) + .await + .unwrap(); + h.handle.flush().await.unwrap(); + + let resumed = h.resume(); + // Same step position, different op fingerprint → different structural fingerprint. + let err = resumed + .step::( + StepDescriptor::idempotent("count", b"v2".to_vec()), + |_| async { Ok(2) }, + ) + .await + .unwrap_err(); + assert!(matches!(err, DurableError::ReplayDivergence { .. })); + + let (status,): (String,) = zeph_db::query_as(zeph_db::sql!( + "SELECT status FROM durable_executions WHERE execution_id = ?" + )) + .bind(exec.as_uuid().to_string()) + .fetch_one(h.backend.pool()) + .await + .unwrap(); + assert_eq!(status, "aborted", "the diverged journal is marked aborted"); + h.shutdown().await; + } + + #[tokio::test] + async fn ambiguous_window_fail_policy_surfaces_error() { + // FR-DE-14 path: an intent without a result, policy = Fail, must not re-fire. + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let step_id = StepId::new(0); + let idem = IdempotencyKey::derive( + exec, + step_id, + &StepDescriptor::exactly_once_guarded( + "delete", + EffectIntentSubClass::Destructive, + Some(OnAmbiguous::Fail), + b"op".to_vec(), + ) + .unwrap() + .fingerprint_input(), + ); + // Seed only the intent (the crash happened before the result committed). + h.backend + .append(JournalEntry { + seq: None, + execution_id: exec, + kind: ExecutionKind::AgentTurn, + step_id, + entry: EntryKind::EffectIntent { + idempotency_key: idem, + effect: EffectClass::ExactlyOnceGuarded, + hmac: None, + }, + created_at_ms: 0, + }) + .await + .unwrap(); + + let resumed = h.resume(); + let ran = Arc::new(AtomicU32::new(0)); + let counter = ran.clone(); + let err = resumed + .step::( + StepDescriptor::exactly_once_guarded( + "delete", + EffectIntentSubClass::Destructive, + Some(OnAmbiguous::Fail), + b"op".to_vec(), + ) + .unwrap(), + move |_| { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + Ok(1) + } + }, + ) + .await + .unwrap_err(); + assert!(matches!(err, DurableError::AmbiguousEffect { .. })); + assert_eq!( + ran.load(Ordering::SeqCst), + 0, + "a fail-policy ambiguous step must not re-fire the effect" + ); + h.shutdown().await; + } + + #[tokio::test] + async fn inv13_committed_guarded_result_is_not_refired() { + // INV-13: on a fresh run after divergence, an already-committed guarded result is returned + // via its idempotency key without re-firing. + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let desc = || { + StepDescriptor::exactly_once_guarded( + "transfer", + EffectIntentSubClass::MoneyMoving, + Some(OnAmbiguous::Fail), + b"op".to_vec(), + ) + .unwrap() + }; + let first: u32 = h.ctx.step(desc(), |_| async { Ok(500) }).await.unwrap(); + assert_eq!(first, 500); + h.handle.flush().await.unwrap(); + + // A "fresh run after divergence": replay is OFF, but the guarded point lookup must still find + // the committed result. Build a non-resume context over the same journal. + let backend = Arc::new(DurableBackendEnum::Local(h.backend.clone())); + let fresh = DurableContext::new( + exec, + ExecutionKind::AgentTurn, + false, + backend, + h.handle.clone(), + &fast_config(), + ); + let ran = Arc::new(AtomicU32::new(0)); + let counter = ran.clone(); + let value: u32 = fresh + .step(desc(), move |_| { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + Ok(0) + } + }) + .await + .unwrap(); + assert_eq!(value, 500, "the pre-committed guarded result is returned"); + assert_eq!( + ran.load(Ordering::SeqCst), + 0, + "the guarded effect must not re-fire" + ); + h.shutdown().await; + } + + #[tokio::test] + async fn parallel_step_ids_are_completion_order_independent() { + // INV-2: a parallel batch with shuffled completion order yields deterministic step ids. + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let scope = h.ctx.parallel(); + // Construct children in argument order; each gets its id eagerly at the `scope.step` call + // (before any future is polled). Box them so the differently-typed closures share one Vec. + let futures: Vec = vec![ + Box::pin(scope.step::( + StepDescriptor::idempotent("a", b"a".to_vec()), + |handle: StepHandle| async move { + // Finishes last despite being constructed first. + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + Ok(handle.step_id().value()) + }, + )), + Box::pin(scope.step::( + StepDescriptor::idempotent("b", b"b".to_vec()), + |handle: StepHandle| async move { + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + Ok(handle.step_id().value()) + }, + )), + Box::pin(scope.step::( + StepDescriptor::idempotent("c", b"c".to_vec()), + |handle: StepHandle| async move { Ok(handle.step_id().value()) }, + )), + ]; + let results = futures::future::try_join_all(futures).await.unwrap(); + let ids: Vec = results + .iter() + .map(DurableStep::step_id) + .map(StepId::value) + .collect(); + // Each child observed the id assigned at construction, regardless of completion order. + assert_eq!(ids, vec![0, 1, 2]); + h.shutdown().await; + } + + #[tokio::test] + async fn concurrent_steps_under_shared_ref_are_sound() { + // System-invariants §10: concurrent step() calls under a single &self assign unique ids and + // all journal successfully. + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let scope = h.ctx.parallel(); + let futures: Vec = (0..16) + .map(|i| { + Box::pin(scope.step::( + StepDescriptor::idempotent("worker", format!("op:{i}").into_bytes()), + move |handle: StepHandle| async move { Ok(handle.step_id().value()) }, + )) as StepFut + }) + .collect(); + let results = futures::future::try_join_all(futures).await.unwrap(); + let mut ids: Vec = results + .iter() + .map(DurableStep::step_id) + .map(StepId::value) + .collect(); + ids.sort_unstable(); + ids.dedup(); + assert_eq!(ids.len(), 16, "all 16 concurrent steps got unique ids"); + h.handle.flush().await.unwrap(); + assert_eq!(h.backend.read_execution(exec).await.unwrap().len(), 16); + h.shutdown().await; + } + + #[tokio::test] + async fn op_failure_surfaces_as_step_failed_without_journaling() { + let exec = ExecutionId::new(); + let h = Harness::open(exec, false).await; + let err = h + .ctx + .step::( + StepDescriptor::idempotent("boom", b"op".to_vec()), + |_| async { Err(StepError::new("op exploded")) }, + ) + .await + .unwrap_err(); + assert!(matches!(err, DurableError::StepFailed { step: "boom", .. })); + h.handle.flush().await.unwrap(); + assert!( + h.backend.read_execution(exec).await.unwrap().is_empty(), + "a failed step journals no result" + ); + h.shutdown().await; + } + + #[tokio::test] + async fn step_cap_is_enforced() { + let exec = ExecutionId::new(); + let local = Arc::new(LocalBackend::open(":memory:", 1_048_576).await.unwrap()); + local.init().await.unwrap(); + local + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + let (writer, handle) = JournalWriter::new(local.clone(), &fast_config()); + let task = tokio::spawn(writer.run()); + let backend = Arc::new(DurableBackendEnum::Local(local.clone())); + let ctx = DurableContext::new( + exec, + ExecutionKind::AgentTurn, + false, + backend, + handle.clone(), + &DurableConfig { + max_steps_per_execution: 1, + ..fast_config() + }, + ); + // Step id 0 is allowed; step id 1 exceeds the cap of 1. + ctx.step::( + StepDescriptor::idempotent("ok", b"op".to_vec()), + |_| async { Ok(0) }, + ) + .await + .unwrap(); + let err = ctx + .step::( + StepDescriptor::idempotent("over", b"op".to_vec()), + |_| async { Ok(0) }, + ) + .await + .unwrap_err(); + assert!(matches!(err, DurableError::StepCapExceeded { cap: 1 })); + drop(ctx); + drop(handle); + task.await.unwrap(); + } +} diff --git a/crates/zeph-durable/src/journal.rs b/crates/zeph-durable/src/journal.rs index 17953a605..430265e12 100644 --- a/crates/zeph-durable/src/journal.rs +++ b/crates/zeph-durable/src/journal.rs @@ -164,6 +164,28 @@ impl EntryKind { pub fn tag(&self) -> &'static str { self.tag_enum().as_str() } + + /// Return the entry's [`IdempotencyKey`], for the two step-bearing kinds that carry one. + /// + /// The replay-divergence guard (INV-3) compares the journaled key of a `StepResult` / + /// `EffectIntent` against the key freshly derived from the replayed descriptor; control and + /// promise/timer entries have no idempotency key and return `None`. + #[must_use] + pub fn idempotency_key(&self) -> Option { + match self { + Self::StepResult { + idempotency_key, .. + } + | Self::EffectIntent { + idempotency_key, .. + } => Some(*idempotency_key), + Self::PromiseCreated { .. } + | Self::PromiseResolved { .. } + | Self::TimerArmed { .. } + | Self::TimerFired { .. } + | Self::Checkpoint { .. } => None, + } + } } /// One ordered entry in a journal. diff --git a/crates/zeph-durable/src/lib.rs b/crates/zeph-durable/src/lib.rs index b0383f27a..5329203b0 100644 --- a/crates/zeph-durable/src/lib.rs +++ b/crates/zeph-durable/src/lib.rs @@ -40,7 +40,15 @@ //! [`JournalWriterHandle`]: group-commit for buffered appends, flush-before-commit ACKs for //! exactly-once entries, and `MAX(seq)` restart resume. //! -//! The replay cursor and the durable step primitive build on these in follow-up issues. +//! Execution surface: +//! +//! - [`step`] — the durable step typestate: [`StepDescriptor`] (with the construction-time +//! ambiguity rule), [`StepHandle`], [`StepError`], [`StepOutcome`], and [`DurableStep`]. +//! - [`handle`] — the `&self` [`DurableContext`] front door: deterministic step ids, replay with a +//! BLAKE3 divergence guard, the exactly-once intent/result protocol, and [`ParallelScope`] for +//! completion-order-independent parallel batches. +//! +//! The promise, timer, and retention layers build on these in follow-up issues. //! //! # Schema ownership //! @@ -62,6 +70,7 @@ //! assert_eq!(key, IdempotencyKey::derive(execution, StepId::new(0), b"tool:read_file")); //! ``` +mod replay; mod sealed; pub mod backend; @@ -69,8 +78,10 @@ pub mod cipher; pub mod config; pub mod effect; pub mod error; +pub mod handle; pub mod ids; pub mod journal; +pub mod step; pub mod writer; #[doc(hidden)] @@ -81,8 +92,10 @@ pub use cipher::{ CipherError, EntryKindTag, PayloadAad, PayloadCipher, ensure_payload_within_limit, }; pub use config::{DurableBackend, DurableConfig, EncryptionGate, RetentionPolicy}; -pub use effect::EffectClass; +pub use effect::{EffectClass, EffectIntentSubClass, OnAmbiguous}; pub use error::DurableError; +pub use handle::{DurableContext, ParallelScope}; pub use ids::{ExecutionId, ExecutionKind, IdempotencyKey, JournalSeq, PromiseId, StepId, TimerId}; pub use journal::{EntryKind, ExecutionStatus, Journal, JournalEntry}; +pub use step::{DurableStep, StepDescriptor, StepError, StepHandle, StepOutcome}; pub use writer::{JournalWriter, JournalWriterHandle}; diff --git a/crates/zeph-durable/src/replay.rs b/crates/zeph-durable/src/replay.rs new file mode 100644 index 000000000..5709e7c8e --- /dev/null +++ b/crates/zeph-durable/src/replay.rs @@ -0,0 +1,389 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! The replay cursor that walks a resumed execution's journal. +//! +//! On resume, the program re-runs from the start and re-derives the same [`StepId`] sequence +//! (INV-2). For each step the [`DurableContext`](crate::DurableContext) asks the [`ReplayCursor`] +//! what the journal already knows about that position: +//! +//! - [`StepReplay::Result`] — a committed `StepResult` exists; the step's value is replayed and the +//! operation closure is skipped. +//! - [`StepReplay::IntentOnly`] — only an `EffectIntent` exists (the *ambiguous window*); the step's +//! [`OnAmbiguous`](crate::OnAmbiguous) policy decides what to do. +//! - [`StepReplay::Fresh`] — nothing is journaled for this position; the step runs for the first +//! time. The first `Fresh` step is the resume point. +//! +//! # Bounded memory +//! +//! The cursor reads the journal in step-range *segments* (default +//! [`DEFAULT_SEGMENT_STEPS`]) via [`Journal::read_execution_range`], prefetching one segment ahead +//! as replay advances rather than loading the whole journal — `O(segment)` resident memory for a +//! resume (NFR-DE-02). Each step is journaled at most twice (an intent and a result), so a segment +//! reads `2 × segment_steps` rows; the last (possibly truncated) step group of a full batch is +//! deferred to the next read so a step is never observed half-loaded. A looked-up step is removed +//! from the resident window, so consumed entries do not accumulate. +//! +//! The cursor is consulted only on resume; a fresh execution never touches it. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use tokio::sync::Mutex; +use tracing::Instrument as _; + +use crate::backend::DurableBackendEnum; +use crate::error::DurableError; +use crate::ids::{ExecutionId, StepId}; +use crate::journal::{EntryKind, Journal as _, JournalEntry}; + +/// Default number of steps the cursor prefetches per segment read. +pub(crate) const DEFAULT_SEGMENT_STEPS: u32 = 100; + +/// What the journal knows about a [`StepId`] on resume. +#[derive(Debug)] +pub(crate) enum StepReplay { + /// A committed `StepResult` for this step (payload already opened by the backend). + Result(JournalEntry), + /// Only an `EffectIntent` for this step — the ambiguous window. + IntentOnly(JournalEntry), + /// Nothing journaled for this step: run it fresh. + Fresh, +} + +/// The journaled entries observed for a single step position. +#[derive(Debug, Default)] +struct LoadedStep { + result: Option, + intent: Option, +} + +/// Interior, lock-guarded loading state of a [`ReplayCursor`]. +#[derive(Debug)] +struct CursorState { + /// Resident window of loaded steps, keyed by `StepId` value; entries are removed on lookup. + loaded: BTreeMap, + /// The next step position a segment read should start from. + next_step_to_load: u32, + /// Whether the journal has been read to its end (a short final segment was seen). + exhausted: bool, +} + +/// A forward-walking, segment-buffered view of a resumed execution's journal. +/// +/// Built once when the execution is opened for resume and consulted per step. Cloning is not +/// supported — the cursor is owned by its [`DurableContext`](crate::DurableContext). +#[derive(Debug)] +pub(crate) struct ReplayCursor { + backend: Arc, + execution_id: ExecutionId, + segment_steps: u32, + // A tokio async mutex (intentionally held across the segment-read await): it serializes + // concurrent parallel-step lookups onto a single in-order loader so a segment is read once, + // never racing or double-reading. No other lock is taken while it is held, so it cannot + // deadlock. + state: Mutex, +} + +impl ReplayCursor { + /// Build a cursor over `execution_id`, prefetching `segment_steps` steps per read. + /// + /// Construction performs no I/O; the first segment is read lazily on the first + /// [`lookup`](ReplayCursor::lookup). + pub(crate) fn new( + backend: Arc, + execution_id: ExecutionId, + segment_steps: u32, + ) -> Self { + let _span = tracing::info_span!( + "durable.replay.cursor.build", + execution_id = %execution_id.as_uuid(), + ) + .entered(); + Self { + backend, + execution_id, + segment_steps: segment_steps.max(1), + state: Mutex::new(CursorState { + loaded: BTreeMap::new(), + next_step_to_load: 0, + exhausted: false, + }), + } + } + + /// Row budget for one segment read (each step is journaled at most twice). + fn segment_rows(&self) -> usize { + usize::try_from(self.segment_steps) + .unwrap_or(usize::MAX / 2) + .saturating_mul(2) + } + + /// Return what the journal knows about `step_id`, consuming it from the resident window. + /// + /// # Errors + /// + /// Returns a [`DurableError`] if a segment read fails or a stored entry cannot be decoded. + pub(crate) async fn lookup(&self, step_id: StepId) -> Result { + let step = step_id.value(); + let mut state = self.state.lock().await; + self.ensure_loaded_through(&mut state, step).await?; + Ok(match state.loaded.remove(&step) { + Some(LoadedStep { + result: Some(result), + .. + }) => StepReplay::Result(result), + Some(LoadedStep { + result: None, + intent: Some(intent), + }) => StepReplay::IntentOnly(intent), + Some(LoadedStep { + result: None, + intent: None, + }) + | None => StepReplay::Fresh, + }) + } + + /// Read forward until `step` is covered or the journal is exhausted. + async fn ensure_loaded_through( + &self, + state: &mut CursorState, + step: u32, + ) -> Result<(), DurableError> { + while !state.exhausted && state.next_step_to_load <= step { + self.load_segment(state).await?; + } + Ok(()) + } + + /// Read one segment from `state.next_step_to_load` and fold it into the resident window. + async fn load_segment(&self, state: &mut CursorState) -> Result<(), DurableError> { + let from = state.next_step_to_load; + let limit = self.segment_rows(); + let rows = async { + let rows = self + .backend + .read_execution_range(self.execution_id, from, limit) + .await?; + tracing::Span::current().record("count", rows.len()); + Ok::<_, DurableError>(rows) + } + .instrument(tracing::info_span!( + "durable.replay.cursor.read_segment", + from_step_id = from, + count = tracing::field::Empty, + )) + .await?; + + if rows.len() < limit { + // A short batch is the journal's tail: there is nothing past it to truncate. + let mut max_step = from; + for entry in rows { + max_step = max_step.max(entry.step_id.value()); + insert_entry(state, entry); + } + state.exhausted = true; + state.next_step_to_load = max_step.saturating_add(1); + return Ok(()); + } + + // A full batch may have split the final step group across the row limit. Defer that group + // and re-read it next time so a step is never observed with only part of its entries — + // unless the whole batch is a single step (impossible with ≥2-row budget per step), in + // which case insert it to guarantee forward progress. + let min_step = rows.iter().map(|e| e.step_id.value()).min().unwrap_or(from); + let max_step = rows.iter().map(|e| e.step_id.value()).max().unwrap_or(from); + if min_step == max_step { + for entry in rows { + insert_entry(state, entry); + } + state.next_step_to_load = max_step.saturating_add(1); + } else { + for entry in rows { + if entry.step_id.value() == max_step { + continue; + } + insert_entry(state, entry); + } + state.next_step_to_load = max_step; + } + Ok(()) + } +} + +/// Fold one journal entry into the resident window, keyed by its step position. +/// +/// Only the two step-bearing kinds are tracked; this revision journals no others, so any other kind +/// is ignored defensively. +fn insert_entry(state: &mut CursorState, entry: JournalEntry) { + let step = entry.step_id.value(); + match entry.entry { + EntryKind::StepResult { .. } => state.loaded.entry(step).or_default().result = Some(entry), + EntryKind::EffectIntent { .. } => { + state.loaded.entry(step).or_default().intent = Some(entry); + } + EntryKind::PromiseCreated { .. } + | EntryKind::PromiseResolved { .. } + | EntryKind::TimerArmed { .. } + | EntryKind::TimerFired { .. } + | EntryKind::Checkpoint { .. } => {} + } +} + +#[cfg(all(test, feature = "sqlite", not(feature = "postgres")))] +mod tests { + use super::*; + use crate::backend::local::LocalBackend; + use crate::effect::EffectClass; + use crate::ids::{ExecutionKind, IdempotencyKey}; + use bytes::Bytes; + + async fn backend_with(steps: &[(u32, bool)]) -> (Arc, ExecutionId) { + let local = LocalBackend::open(":memory:", 1_048_576).await.unwrap(); + local.init().await.unwrap(); + let exec = ExecutionId::new(); + local + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + for &(step, guarded) in steps { + let step_id = StepId::new(step); + let idem = IdempotencyKey::derive(exec, step_id, b"op"); + if guarded { + local + .append(JournalEntry { + seq: None, + execution_id: exec, + kind: ExecutionKind::AgentTurn, + step_id, + entry: EntryKind::EffectIntent { + idempotency_key: idem, + effect: EffectClass::ExactlyOnceGuarded, + hmac: None, + }, + created_at_ms: 0, + }) + .await + .unwrap(); + } + local + .append(JournalEntry { + seq: None, + execution_id: exec, + kind: ExecutionKind::AgentTurn, + step_id, + entry: EntryKind::StepResult { + idempotency_key: idem, + payload: Bytes::from_static(b"v"), + effect: if guarded { + EffectClass::ExactlyOnceGuarded + } else { + EffectClass::Idempotent + }, + payload_version: 1, + }, + created_at_ms: 0, + }) + .await + .unwrap(); + } + (Arc::new(DurableBackendEnum::Local(Arc::new(local))), exec) + } + + async fn intent_only(step: u32) -> (Arc, ExecutionId) { + let local = LocalBackend::open(":memory:", 1_048_576).await.unwrap(); + local.init().await.unwrap(); + let exec = ExecutionId::new(); + local + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + let step_id = StepId::new(step); + local + .append(JournalEntry { + seq: None, + execution_id: exec, + kind: ExecutionKind::AgentTurn, + step_id, + entry: EntryKind::EffectIntent { + idempotency_key: IdempotencyKey::derive(exec, step_id, b"op"), + effect: EffectClass::ExactlyOnceGuarded, + hmac: None, + }, + created_at_ms: 0, + }) + .await + .unwrap(); + (Arc::new(DurableBackendEnum::Local(Arc::new(local))), exec) + } + + #[tokio::test] + async fn lookup_classifies_result_intent_and_fresh() { + let (backend, exec) = backend_with(&[(0, false), (1, true)]).await; + let cursor = ReplayCursor::new(backend, exec, DEFAULT_SEGMENT_STEPS); + + assert!(matches!( + cursor.lookup(StepId::new(0)).await.unwrap(), + StepReplay::Result(_) + )); + // Step 1 has both an intent and a result → the result wins. + assert!(matches!( + cursor.lookup(StepId::new(1)).await.unwrap(), + StepReplay::Result(_) + )); + // Step 2 was never journaled → fresh. + assert!(matches!( + cursor.lookup(StepId::new(2)).await.unwrap(), + StepReplay::Fresh + )); + } + + #[tokio::test] + async fn lookup_reports_ambiguous_window_intent_only() { + let (backend, exec) = intent_only(0).await; + let cursor = ReplayCursor::new(backend, exec, DEFAULT_SEGMENT_STEPS); + assert!(matches!( + cursor.lookup(StepId::new(0)).await.unwrap(), + StepReplay::IntentOnly(_) + )); + } + + #[tokio::test] + async fn segmented_reads_cover_a_long_journal() { + // 25 idempotent steps with a tiny 2-step segment forces many segment reads, exercising the + // defer-last-step boundary handling. + let steps: Vec<(u32, bool)> = (0..25).map(|s| (s, false)).collect(); + let (backend, exec) = backend_with(&steps).await; + let cursor = ReplayCursor::new(backend, exec, 2); + for step in 0..25 { + assert!( + matches!( + cursor.lookup(StepId::new(step)).await.unwrap(), + StepReplay::Result(_) + ), + "step {step} should replay from the journal" + ); + } + assert!(matches!( + cursor.lookup(StepId::new(25)).await.unwrap(), + StepReplay::Fresh + )); + } + + #[tokio::test] + async fn out_of_order_lookups_within_a_segment_resolve() { + let steps: Vec<(u32, bool)> = (0..8).map(|s| (s, false)).collect(); + let (backend, exec) = backend_with(&steps).await; + let cursor = ReplayCursor::new(backend, exec, DEFAULT_SEGMENT_STEPS); + // Look up a higher step first, then a lower one — both must still resolve. + assert!(matches!( + cursor.lookup(StepId::new(5)).await.unwrap(), + StepReplay::Result(_) + )); + assert!(matches!( + cursor.lookup(StepId::new(2)).await.unwrap(), + StepReplay::Result(_) + )); + } +} diff --git a/crates/zeph-durable/src/step.rs b/crates/zeph-durable/src/step.rs new file mode 100644 index 000000000..ed1edbfd5 --- /dev/null +++ b/crates/zeph-durable/src/step.rs @@ -0,0 +1,565 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! The durable step primitive and its typestate. +//! +//! A *step* is the unit of durable progress: [`DurableContext::step`](crate::DurableContext::step) +//! runs an operation closure, journals its result, and on a later resume returns the journaled +//! result instead of re-running the closure. This module defines the types that describe and carry a +//! step: +//! +//! - [`StepDescriptor`] — the *what* of a step: its name, [`EffectClass`], ambiguity policy, and an +//! opaque operation fingerprint. Its constructors enforce the construction-time ambiguity rule +//! (FR-DE-09): a destructive, security-relevant, money-moving, or custom guarded step that omits +//! an [`OnAmbiguous`] policy is rejected with [`DurableError::AmbiguityPolicyRequired`]. +//! - [`StepHandle`] — handed to the operation closure so it can forward the step's +//! [`IdempotencyKey`] to an external service as an `Idempotency-Key` header for boundary dedup. +//! - [`StepError`] — the closure's failure channel: any error type the closure produces is wrapped +//! here without coupling the Layer-0 crate to a consumer's error enum (INV-1). +//! - [`StepOutcome`] — the `Live` / `Replayed` typestate that lets a consumer suppress +//! already-emitted side effects (e.g. re-printing assistant output) on replay. +//! - [`DurableStep`] — the recorded result of a step: its id, idempotency key, and outcome. +//! +//! The payload codec is JSON: a step value is serialized to bytes, length-checked, then handed to +//! the journal where the backend AEAD-seals it. The bytes are opaque to the journal — the durable +//! layer never inspects a domain type (INV-1). + +use std::fmt; +use std::marker::PhantomData; + +use bytes::Bytes; +use serde::Serialize; +use serde::de::DeserializeOwned; + +use crate::effect::{EffectClass, EffectIntentSubClass, OnAmbiguous}; +use crate::error::DurableError; +use crate::ids::{IdempotencyKey, StepId}; + +/// Wire-format version stamped on every sealed step payload. +/// +/// Stored in the `payload_version` journal column so a future codec change can be detected and +/// migrated rather than silently misread. +pub(crate) const PAYLOAD_VERSION: u8 = 1; + +/// The error channel for a step's operation closure. +/// +/// The durable layer is Layer-0 infrastructure and must not depend on any consumer's error enum +/// (INV-1), so a closure reports failure through this opaque wrapper. Construct it from any boxable +/// error (or a message) with [`StepError::new`]; the wrapped error stays reachable through +/// [`DurableError::StepFailed`]'s source. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::StepError; +/// +/// // From a message: +/// let _ = StepError::new("provider returned 503"); +/// // From a concrete error: +/// let io = std::io::Error::other("disk full"); +/// let _ = StepError::new(io); +/// ``` +pub struct StepError(Box); + +impl StepError { + /// Wrap any boxable error (including a `&str` or `String` message) as a step failure. + #[must_use] + pub fn new(source: impl Into>) -> Self { + Self(source.into()) + } + + /// Consume the wrapper, returning the boxed source error. + pub(crate) fn into_inner(self) -> Box { + self.0 + } +} + +impl fmt::Debug for StepError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("StepError").field(&self.0).finish() + } +} + +impl fmt::Display for StepError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +/// The description of a step: its identity, effect contract, and ambiguity policy. +/// +/// A descriptor is *what* a step is, independent of *when* it runs. The durable layer derives the +/// step's [`IdempotencyKey`] and replay-divergence fingerprint from the descriptor, so the same +/// program point must build an equal descriptor on every run — a structurally different descriptor +/// at a given [`StepId`] is a [`DurableError::ReplayDivergence`]. +/// +/// Build a descriptor through the effect-specific constructors; the +/// [`exactly_once_guarded`](StepDescriptor::exactly_once_guarded) constructor enforces the +/// construction-time ambiguity rule (FR-DE-09). +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::{EffectIntentSubClass, OnAmbiguous, StepDescriptor}; +/// +/// // A read-only step is idempotent and needs no ambiguity policy. +/// let read = StepDescriptor::idempotent("read_file", b"tool:read_file:/etc/hosts".to_vec()); +/// +/// // A destructive guarded step MUST declare its ambiguity policy or construction fails. +/// let delete = StepDescriptor::exactly_once_guarded( +/// "delete_file", +/// EffectIntentSubClass::Destructive, +/// Some(OnAmbiguous::Fail), +/// b"tool:delete_file:/tmp/x".to_vec(), +/// ); +/// assert!(delete.is_ok()); +/// +/// let unsafe_delete = StepDescriptor::exactly_once_guarded( +/// "delete_file", +/// EffectIntentSubClass::Destructive, +/// None, +/// b"tool:delete_file:/tmp/x".to_vec(), +/// ); +/// assert!(unsafe_delete.is_err(), "a destructive guarded step needs an explicit policy"); +/// ``` +#[derive(Debug, Clone)] +pub struct StepDescriptor { + name: &'static str, + effect: EffectClass, + on_ambiguous: Option, + op_fingerprint: Bytes, +} + +impl StepDescriptor { + /// Describe an [`EffectClass::Idempotent`] step (pure or naturally repeatable). + /// + /// A replayed idempotent step returns its journaled result and never re-invokes the closure + /// (INV-10). No ambiguity policy applies. + #[must_use] + pub fn idempotent(name: &'static str, op_fingerprint: impl Into) -> Self { + Self { + name, + effect: EffectClass::Idempotent, + on_ambiguous: None, + op_fingerprint: op_fingerprint.into(), + } + } + + /// Describe an [`EffectClass::AtLeastOnce`] step (safe to repeat under an ambiguous replay). + #[must_use] + pub fn at_least_once(name: &'static str, op_fingerprint: impl Into) -> Self { + Self { + name, + effect: EffectClass::AtLeastOnce, + on_ambiguous: None, + op_fingerprint: op_fingerprint.into(), + } + } + + /// Describe an [`EffectClass::ExactlyOnceGuarded`] step, enforcing the ambiguity-policy rule. + /// + /// The `sub_class` refines what the effect does; the resulting [`OnAmbiguous`] policy decides + /// what happens if a crash leaves the step in the ambiguous window. Only + /// [`EffectIntentSubClass::CostBearingOrBoundaryIdempotent`] has a safe default + /// ([`OnAmbiguous::Skip`]); every other sub-class requires an explicit policy. + /// + /// # Errors + /// + /// Returns [`DurableError::AmbiguityPolicyRequired`] when `sub_class` requires an explicit + /// policy ([`EffectIntentSubClass::requires_explicit_policy`]) but `on_ambiguous` is `None`. + pub fn exactly_once_guarded( + name: &'static str, + sub_class: EffectIntentSubClass, + on_ambiguous: Option, + op_fingerprint: impl Into, + ) -> Result { + let resolved = match on_ambiguous { + Some(policy) => policy, + None if sub_class.requires_explicit_policy() => { + return Err(DurableError::AmbiguityPolicyRequired { step: name }); + } + // Only the cost-bearing / boundary-idempotent sub-class reaches here: a paid call the + // external boundary deduplicates by idempotency key is safe to skip on ambiguity. + None => OnAmbiguous::Skip, + }; + Ok(Self { + name, + effect: EffectClass::ExactlyOnceGuarded, + on_ambiguous: Some(resolved), + op_fingerprint: op_fingerprint.into(), + }) + } + + /// The step's stable name (used in spans, audit records, and error messages). + #[must_use] + pub fn name(&self) -> &'static str { + self.name + } + + /// The step's effect class. + #[must_use] + pub fn effect(&self) -> EffectClass { + self.effect + } + + /// The resolved ambiguity policy, `Some` only for a guarded step. + #[must_use] + pub fn on_ambiguous(&self) -> Option { + self.on_ambiguous + } + + /// The opaque, non-secret operation fingerprint (INV-6). + #[must_use] + pub fn op_fingerprint(&self) -> &Bytes { + &self.op_fingerprint + } + + /// The length-delimited structural fingerprint fed to [`IdempotencyKey::derive`]. + /// + /// Folding `name` and `effect` in (each length-prefixed, the variable `op_fingerprint` last) + /// makes the derived idempotency key the step's structural identity: changing any of them at a + /// given [`StepId`] changes the key, which the replay cursor detects as a divergence (INV-3). + /// The framing is injective so distinct descriptors never collide. + pub(crate) fn fingerprint_input(&self) -> Vec { + let effect = self.effect.as_str(); + let mut input = + Vec::with_capacity(4 + self.name.len() + 4 + effect.len() + self.op_fingerprint.len()); + input.extend_from_slice(&u32_len(self.name.len()).to_le_bytes()); + input.extend_from_slice(self.name.as_bytes()); + input.extend_from_slice(&u32_len(effect.len()).to_le_bytes()); + input.extend_from_slice(effect.as_bytes()); + input.extend_from_slice(&self.op_fingerprint); + input + } +} + +/// A length cast that saturates rather than wrapping — fingerprint inputs are tiny, but the cast +/// must never silently truncate a (pathological) oversized field into a colliding length prefix. +fn u32_len(len: usize) -> u32 { + u32::try_from(len).unwrap_or(u32::MAX) +} + +/// A handle passed to a step's operation closure. +/// +/// Its purpose is boundary deduplication: the closure reads [`StepHandle::idempotency_key`] and +/// forwards it to an external service (e.g. as an `Idempotency-Key` header) so a re-issued call +/// after an ambiguous crash is deduplicated at the boundary. The handle is `Copy` and carries no +/// secret material. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::{ExecutionId, IdempotencyKey, StepHandle, StepId}; +/// +/// # fn demo(handle: StepHandle) { +/// // The closure can thread the idempotency key into an outbound request. +/// let _header_value = handle.idempotency_key(); +/// let _which_step = handle.step_id(); +/// # } +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct StepHandle { + step_id: StepId, + idempotency_key: IdempotencyKey, +} + +impl StepHandle { + /// Construct a handle for the closure (crate-internal; built by the durable context). + pub(crate) fn new(step_id: StepId, idempotency_key: IdempotencyKey) -> Self { + Self { + step_id, + idempotency_key, + } + } + + /// The step's deterministic position within its execution. + #[must_use] + pub fn step_id(&self) -> StepId { + self.step_id + } + + /// The step's idempotency key, suitable for forwarding to an external boundary for dedup. + #[must_use] + pub fn idempotency_key(&self) -> IdempotencyKey { + self.idempotency_key + } +} + +/// Whether a step's value came from a live run or from the journal. +/// +/// Both variants carry the same `T`; the discriminator lets a consumer suppress already-emitted +/// side effects on replay (the spec's `RuntimeLayer` double-print suppression) without the durable +/// layer knowing what those side effects are. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::StepOutcome; +/// +/// let live = StepOutcome::Live(7); +/// assert!(!live.was_replayed()); +/// assert_eq!(*live.get(), 7); +/// assert_eq!(live.into_inner(), 7); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StepOutcome { + /// The operation closure ran on this execution and produced the value. + Live(T), + /// The value was returned from the journal; the closure was not invoked. + Replayed(T), +} + +impl StepOutcome { + /// Whether the value was replayed from the journal rather than freshly computed. + #[must_use] + pub fn was_replayed(&self) -> bool { + matches!(self, Self::Replayed(_)) + } + + /// Borrow the contained value regardless of provenance. + #[must_use] + pub fn get(&self) -> &T { + match self { + Self::Live(value) | Self::Replayed(value) => value, + } + } + + /// Consume the outcome and return the contained value. + #[must_use] + pub fn into_inner(self) -> T { + match self { + Self::Live(value) | Self::Replayed(value) => value, + } + } +} + +/// The recorded result of a [`DurableContext::step`](crate::DurableContext::step) call. +/// +/// Bundles the step's deterministic identity (its [`StepId`] and [`IdempotencyKey`]) with the +/// [`StepOutcome`]. Most callers want only the value +/// ([`DurableContext::step`](crate::DurableContext::step) returns it directly); take a +/// `DurableStep` when the id, the key, or the live/replayed distinction matters. +/// +/// `T` is recorded only as a type witness — the value lives inside the [`StepOutcome`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct DurableStep { + step_id: StepId, + idempotency_key: IdempotencyKey, + outcome: StepOutcome, + _marker: PhantomData T>, +} + +impl DurableStep { + /// Build a record for a freshly-executed step. + pub(crate) fn live(step_id: StepId, idempotency_key: IdempotencyKey, value: T) -> Self { + Self { + step_id, + idempotency_key, + outcome: StepOutcome::Live(value), + _marker: PhantomData, + } + } + + /// Build a record for a step whose value was replayed from the journal. + pub(crate) fn replayed(step_id: StepId, idempotency_key: IdempotencyKey, value: T) -> Self { + Self { + step_id, + idempotency_key, + outcome: StepOutcome::Replayed(value), + _marker: PhantomData, + } + } + + /// The step's deterministic position within its execution. + #[must_use] + pub fn step_id(&self) -> StepId { + self.step_id + } + + /// The step's idempotency key. + #[must_use] + pub fn idempotency_key(&self) -> IdempotencyKey { + self.idempotency_key + } + + /// Whether the value was replayed from the journal. + #[must_use] + pub fn was_replayed(&self) -> bool { + self.outcome.was_replayed() + } + + /// Borrow the step's value. + #[must_use] + pub fn value(&self) -> &T { + self.outcome.get() + } + + /// Borrow the full outcome (value plus live/replayed provenance). + #[must_use] + pub fn outcome(&self) -> &StepOutcome { + &self.outcome + } + + /// Consume the record and return just the value. + #[must_use] + pub fn into_value(self) -> T { + self.outcome.into_inner() + } + + /// Consume the record and return the full outcome. + #[must_use] + pub fn into_outcome(self) -> StepOutcome { + self.outcome + } +} + +/// Serialize a step value into journal bytes (the codec is JSON; the journal seals these opaquely). +/// +/// # Errors +/// +/// Returns [`DurableError::Serialize`] if the value cannot be serialized. +pub(crate) fn serialize_result( + value: &T, + step: &'static str, +) -> Result { + serde_json::to_vec(value) + .map(Bytes::from) + .map_err(|_| DurableError::Serialize { step }) +} + +/// Deserialize journaled bytes back into a step value. +/// +/// # Errors +/// +/// Returns [`DurableError::Decode`] if the bytes cannot be decoded into `T`. +pub(crate) fn deserialize_result(bytes: &[u8]) -> Result { + serde_json::from_slice(bytes).map_err(|_| DurableError::Decode { + context: "step result payload could not be deserialized into its type", + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ids::ExecutionId; + + #[test] + fn guarded_destructive_requires_explicit_policy() { + let err = StepDescriptor::exactly_once_guarded( + "delete", + EffectIntentSubClass::Destructive, + None, + b"op".to_vec(), + ) + .unwrap_err(); + assert!(matches!( + err, + DurableError::AmbiguityPolicyRequired { step: "delete" } + )); + } + + #[test] + fn guarded_cost_bearing_defaults_to_skip() { + let desc = StepDescriptor::exactly_once_guarded( + "llm_call", + EffectIntentSubClass::CostBearingOrBoundaryIdempotent, + None, + b"op".to_vec(), + ) + .unwrap(); + assert_eq!(desc.on_ambiguous(), Some(OnAmbiguous::Skip)); + assert_eq!(desc.effect(), EffectClass::ExactlyOnceGuarded); + } + + #[test] + fn guarded_explicit_policy_overrides_default() { + let desc = StepDescriptor::exactly_once_guarded( + "llm_call", + EffectIntentSubClass::CostBearingOrBoundaryIdempotent, + Some(OnAmbiguous::Rerun), + b"op".to_vec(), + ) + .unwrap(); + assert_eq!(desc.on_ambiguous(), Some(OnAmbiguous::Rerun)); + } + + #[test] + fn non_guarded_descriptors_have_no_policy() { + assert_eq!( + StepDescriptor::idempotent("read", b"op".to_vec()).on_ambiguous(), + None + ); + assert_eq!( + StepDescriptor::at_least_once("enqueue", b"op".to_vec()).on_ambiguous(), + None + ); + } + + #[test] + fn fingerprint_input_is_injective_across_descriptor_fields() { + let base = StepDescriptor::idempotent("a", b"x".to_vec()).fingerprint_input(); + // A different name with a fingerprint that would naively concatenate to the same bytes must + // still differ thanks to the length framing. + let shifted = StepDescriptor::idempotent("ax", b"".to_vec()).fingerprint_input(); + assert_ne!(base, shifted); + // A different effect class changes the fingerprint even with identical name + op bytes. + let other_effect = StepDescriptor::at_least_once("a", b"x".to_vec()).fingerprint_input(); + assert_ne!(base, other_effect); + } + + #[test] + fn fingerprint_drives_idempotency_key_divergence() { + let exec = ExecutionId::new(); + let step = StepId::new(0); + let a = IdempotencyKey::derive( + exec, + step, + &StepDescriptor::idempotent("a", b"x".to_vec()).fingerprint_input(), + ); + let b = IdempotencyKey::derive( + exec, + step, + &StepDescriptor::idempotent("b", b"x".to_vec()).fingerprint_input(), + ); + assert_ne!( + a, b, + "a different descriptor derives a different idempotency key" + ); + } + + #[test] + fn step_outcome_and_durable_step_accessors() { + let key = IdempotencyKey::derive(ExecutionId::new(), StepId::new(2), b"op"); + let live = DurableStep::live(StepId::new(2), key, 41_u32); + assert_eq!(live.step_id(), StepId::new(2)); + assert_eq!(live.idempotency_key(), key); + assert!(!live.was_replayed()); + assert_eq!(*live.value(), 41); + assert!(matches!(live.outcome(), StepOutcome::Live(41))); + assert_eq!(live.into_value(), 41); + + let replayed = DurableStep::replayed(StepId::new(3), key, 7_u32); + assert!(replayed.was_replayed()); + assert!(matches!(replayed.into_outcome(), StepOutcome::Replayed(7))); + } + + #[test] + fn payload_codec_round_trips() { + let bytes = serialize_result(&vec![1_u32, 2, 3], "step").unwrap(); + let back: Vec = deserialize_result(&bytes).unwrap(); + assert_eq!(back, vec![1, 2, 3]); + } + + #[test] + fn deserialize_fails_closed_on_garbage() { + let err = deserialize_result::(b"not json").unwrap_err(); + assert!(matches!(err, DurableError::Decode { .. })); + } + + #[test] + fn step_error_wraps_message_and_concrete_error() { + assert_eq!(StepError::new("boom").to_string(), "boom"); + let io = std::io::Error::other("disk full"); + assert!(StepError::new(io).to_string().contains("disk full")); + } +}