Merged
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -47,6 +47,26 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
`DurableBackendEnum` enum dispatcher keep the backend surface closed with no `Box<dyn>` on the
hot path. Promise, timer, and checkpoint journal entries fail closed
(`DurableError::UnsupportedEntryKind`) until the promise/timer layer lands. (#4946)
- `feat(durable)`: added the durable step primitive and the `&self` `DurableContext` — the execution
heart every adapter wraps. `ctx.step()` runs an operation closure and journals its result; on
resume it replays the journaled result without re-invoking the closure (INV-10, FR-DE-02), and
`ctx.step_recorded()` / `StepOutcome::{Live, Replayed}` expose the live-vs-replayed distinction for
double-emit suppression. Step ids are assigned structurally via an `AtomicU32` (INV-2):
`parallel()` returns a `ParallelScope` whose children get contiguous, eagerly-assigned ids, so a
parallel batch's ids are independent of completion order. Before replaying a result the context
compares the journaled step's `IdempotencyKey` (a BLAKE3 structural fingerprint folding the step
name, effect, and op fingerprint) against the current descriptor's; a mismatch raises
`DurableError::ReplayDivergence`, marks the journal `aborted`, and disables replay so the execution
restarts fresh (INV-3, FR-DE-03). `StepDescriptor::exactly_once_guarded` enforces the
construction-time ambiguity rule — a destructive, security-relevant, money-moving, or custom
guarded step without an explicit `OnAmbiguous` is rejected with
`DurableError::AmbiguityPolicyRequired` (FR-DE-09). A guarded step commits its `EffectIntent`
(ACKed) before the closure runs and its `StepResult` (ACKed) after (FR-DE-04); a guarded effect
that already committed a result is recognized by an idempotency-key point lookup and not re-fired,
even on a post-divergence fresh run (INV-13). Every ambiguous-window resolution emits a mandatory
structured audit record (FR-DE-10), and a writer timeout degrades the step to non-durable mode
rather than blocking (INV-12). The `ReplayCursor` reads the journal in bounded step-range segments
(NFR-DE-02). The promise, timer, and retention layers land in follow-up issues. (#4947)
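
As a rough illustration of the structural id assignment this entry describes, the sketch below reserves a contiguous block of ids for a parallel batch before any child runs, so the ids cannot depend on completion order. `StepIdAllocator`, `next_id`, and `reserve_batch` are hypothetical names chosen for the example; they are not the crate's API, and the real `ParallelScope` may differ.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicU32, Ordering};

/// Hypothetical allocator used only for illustration (not a zeph-durable type).
pub struct StepIdAllocator {
    next: AtomicU32,
}

impl StepIdAllocator {
    pub fn new() -> Self {
        Self { next: AtomicU32::new(0) }
    }

    /// Assign the next sequential step id through a shared `&self` reference.
    pub fn next_id(&self) -> u32 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }

    /// Eagerly reserve `n` contiguous ids for a parallel batch.
    /// The whole range is claimed in one atomic step, before any child executes.
    /// Assumption: the counter stays far below `u32::MAX`, so `start + n` does not overflow.
    pub fn reserve_batch(&self, n: u32) -> Range<u32> {
        let start = self.next.fetch_add(n, Ordering::Relaxed);
        start..start + n
    }
}
```

Because the range is claimed in a single `fetch_add`, child `k` of the batch always receives `start + k`, whichever child happens to finish first.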

### Fixed

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/zeph-durable/Cargo.toml
@@ -24,13 +24,15 @@ blake3.workspace = true
bytes.workspace = true
metrics.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "sync", "time"] }
tracing.workspace = true
uuid = { workspace = true, features = ["serde", "v7"] }
zeph-db.workspace = true

[dev-dependencies]
futures.workspace = true
serde_json.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] }
47 changes: 42 additions & 5 deletions crates/zeph-durable/README.md
@@ -12,10 +12,11 @@ point of failure instead of restarting from scratch.
> [!IMPORTANT]
> This crate is **under active construction** (spec-064, epic
> [#4707](https://github.com/bug-ops/zeph/issues/4707)). The type-level foundation, the AEAD payload
-> contract, and the persistence engine — `LocalBackend` (a dedicated `durable.db` pool), the
-> background `JournalWriter` actor, and the sealed `ExecutionBackend` dispatcher — have landed. The
-> `DurableContext` facade, the replay cursor, the promise/timer layer, and the consuming adapters
-> land in follow-up issues of the epic.
+> contract, the persistence engine (`LocalBackend`, the background `JournalWriter` actor, the sealed
+> `ExecutionBackend` dispatcher), and the execution heart — the `&self` `DurableContext` with
+> deterministic step ids, the fingerprint-guarded replay cursor, the exactly-once intent/result
+> protocol, and `parallel()` batches — have landed. The promise/timer layer, journal retention, the
+> CLI/TUI integration, and the consuming adapters land in follow-up issues of the epic.

## Overview

@@ -37,7 +38,15 @@ a dedicated `durable.db` (SQLite) or a feature-gated Restate backend.
- **journal** — the `Journal` trait plus its data model: `JournalEntry`, the closed `EntryKind`
enum, and `ExecutionStatus`.
- **effect** — `EffectClass`, the per-step side-effect contract (`Idempotent` / `AtLeastOnce` /
-  `ExactlyOnceGuarded`).
+  `ExactlyOnceGuarded`), plus `EffectIntentSubClass` and the `OnAmbiguous` policy that govern the
+  ambiguous window.
- **step** — the durable step typestate: `StepDescriptor` (with the construction-time ambiguity
rule), `StepHandle` (exposes the idempotency key for boundary dedup), `StepError`, the
`Live`/`Replayed` `StepOutcome`, and the `DurableStep` record.
- **handle** — the `&self` `DurableContext` front door: `step()` / `step_recorded()` /
`parallel()`, deterministic `AtomicU32` step ids, a BLAKE3 replay-divergence guard, the
exactly-once intent/result protocol, and the `ParallelScope` for completion-order-independent
batches.
- **cipher** — the `PayloadCipher` AEAD seal/open contract, the `PayloadAad` location binding, and
the read-side `ensure_payload_within_limit` guard. The concrete cipher lives in a consuming crate
(INV-1).
@@ -124,6 +133,34 @@ assert_eq!(cfg.journal_ack_timeout_ms, 5_000);
assert_eq!(cfg.max_payload_bytes, 1_048_576);
```

A `DurableContext` wraps each unit of work in a step. A fresh run executes the closure and journals
its result; a resumed run replays the journaled result without re-running it. The closure receives a
`StepHandle` carrying the step's idempotency key for boundary deduplication:

```rust,ignore
use zeph_durable::{DurableContext, EffectIntentSubClass, OnAmbiguous, StepDescriptor};

// Read-only work is idempotent and replays for free.
let preview: String = ctx
    .step(StepDescriptor::idempotent("read_head", b"tool:read:/var/log".to_vec()),
        |_handle| async { Ok(read_first_line().await?) })
    .await?;

// A paid call is exactly-once-guarded: its intent is journaled before the call and its result
// after, and the idempotency key is forwarded to the provider for boundary dedup.
let reply: String = ctx
    .step(
        StepDescriptor::exactly_once_guarded(
            "llm_call",
            EffectIntentSubClass::CostBearingOrBoundaryIdempotent,
            Some(OnAmbiguous::Skip),
            b"llm:gpt:summarize".to_vec(),
        )?,
        |handle| async move { Ok(call_provider(handle.idempotency_key()).await?) },
    )
    .await?;
```
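
The replay behaviour described above can be sketched with a small in-memory model. This is an illustrative approximation under stated assumptions, not the crate's implementation: `SketchJournal`, `Outcome`, `SketchError`, and `fingerprint` are hypothetical names, the journal is a plain `HashMap`, the API is synchronous, and the standard library's `DefaultHasher` stands in for the BLAKE3 fingerprint the crate uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for the live-vs-replayed distinction.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Live(String),
    Replayed(String),
}

/// Hypothetical error raised when the recorded step no longer matches the code.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    ReplayDivergence { step_id: u32 },
}

/// Structural fingerprint of a step. Assumption: std hashing replaces BLAKE3 here.
pub fn fingerprint(name: &str, op: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    name.hash(&mut hasher);
    op.hash(&mut hasher);
    hasher.finish()
}

/// In-memory journal keyed by step id, storing (fingerprint, result).
#[derive(Default)]
pub struct SketchJournal {
    entries: HashMap<u32, (u64, String)>,
}

impl SketchJournal {
    /// Replay a journaled result if one exists and its fingerprint matches;
    /// otherwise run the closure once and record what it returned.
    pub fn step<F>(&mut self, step_id: u32, name: &str, op: &[u8], run: F) -> Result<Outcome, SketchError>
    where
        F: FnOnce() -> String,
    {
        let key = fingerprint(name, op);
        if let Some((recorded_key, value)) = self.entries.get(&step_id) {
            if *recorded_key != key {
                // The recorded step differs from the current descriptor: refuse to replay.
                return Err(SketchError::ReplayDivergence { step_id });
            }
            // Same step as before: return the stored value without calling `run`.
            return Ok(Outcome::Replayed(value.clone()));
        }
        let value = run();
        self.entries.insert(step_id, (key, value.clone()));
        Ok(Outcome::Live(value))
    }
}
```

In this model a second call with the same step id and descriptor returns `Outcome::Replayed` and never invokes the closure, while a changed name or op fingerprint yields `ReplayDivergence`. The sketch only reports the divergence; it does not model marking the journal aborted or restarting the execution.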

## MSRV

Rust **1.95** (Edition 2024, resolver 3).
40 changes: 38 additions & 2 deletions crates/zeph-durable/src/backend.rs
@@ -24,9 +24,11 @@
//! promise-resolution, and timer-scan methods named in the spec land alongside the
//! `DurableContext` (the trait can gain them without breaking callers).

use std::sync::Arc;

use crate::config::RetentionPolicy;
use crate::error::DurableError;
-use crate::ids::{ExecutionId, JournalSeq};
+use crate::ids::{ExecutionId, IdempotencyKey, JournalSeq};
use crate::journal::{ExecutionStatus, Journal, JournalEntry};

pub mod local;
@@ -82,6 +84,25 @@ pub struct BackendCapabilities {
pub trait ExecutionBackend: Journal + Send + Sync + crate::sealed::Sealed {
    /// Return this backend's stable capability description.
    fn capabilities(&self) -> BackendCapabilities;

    /// Look up a committed `StepResult` anywhere in an execution by its [`IdempotencyKey`].
    ///
    /// This is the point-lookup behind INV-13: after a [`DurableError::ReplayDivergence`] the
    /// execution restarts fresh, but a guarded effect that already committed its result must not
    /// re-fire. Before invoking a guarded operation the durable step consults this lookup; a `Some`
    /// result means the effect already succeeded and its journaled value is returned instead. The
    /// key uniquely locates the row via the `idx_durable_journal_idem_key` index, so the lookup is
    /// `O(log n)`.
    ///
    /// # Errors
    ///
    /// Returns [`DurableError::Decode`] if the located row cannot be reconstructed, or
    /// [`DurableError::Storage`] if the query fails.
    fn lookup_committed_result(
        &self,
        id: ExecutionId,
        idem_key: IdempotencyKey,
    ) -> impl std::future::Future<Output = Result<Option<JournalEntry>, DurableError>> + Send;
}

/// Closed enum dispatch over the compiled-in backends.
@@ -108,7 +129,12 @@ pub trait ExecutionBackend: Journal + Send + Sync + crate::sealed::Sealed {
#[non_exhaustive]
pub enum DurableBackendEnum {
    /// The always-compiled local backend journaling to a dedicated `durable.db`.
-    Local(LocalBackend),
+    ///
+    /// Held behind an [`Arc`] so the same backing instance can be shared with the
+    /// [`JournalWriter`](crate::JournalWriter) (which owns the write path) while this enum serves
+    /// the read path consumed by the [`ReplayCursor`](crate::DurableContext) — both observe one
+    /// `durable.db` pool.
+    Local(Arc<LocalBackend>),
}

impl crate::sealed::Sealed for DurableBackendEnum {}
@@ -156,4 +182,14 @@ impl ExecutionBackend for DurableBackendEnum {
            Self::Local(backend) => backend.capabilities(),
        }
    }

    async fn lookup_committed_result(
        &self,
        id: ExecutionId,
        idem_key: IdempotencyKey,
    ) -> Result<Option<JournalEntry>, DurableError> {
        match self {
            Self::Local(backend) => backend.lookup_committed_result(id, idem_key).await,
        }
    }
}
104 changes: 98 additions & 6 deletions crates/zeph-durable/src/backend/local.rs
@@ -23,11 +23,14 @@
//!
//! This revision journals the step-execution entries — [`EntryKind::StepResult`] and
//! [`EntryKind::EffectIntent`] — that the durable step primitive records, plus execution
-//! lifecycle (open and [`finalize`](Journal::finalize)) and the writer's restart anchor
-//! (`max_seq`). Promise, timer, and checkpoint entries are journaled by the
-//! promise/timer and retention layers; until then [`append`](Journal::append) of those kinds fails
-//! closed with [`DurableError::UnsupportedEntryKind`] rather than dropping their state. The
-//! retention sweep ([`prune`](Journal::prune)) is a no-op stub here.
+//! lifecycle (open and [`finalize`](Journal::finalize)), the writer's restart anchor (`max_seq`),
+//! and the idempotency-key point lookup
+//! ([`lookup_committed_result`](ExecutionBackend::lookup_committed_result)) that lets a guarded step
+//! recognize an already-committed effect after a replay divergence (INV-13). Promise, timer, and
+//! checkpoint entries are journaled by the promise/timer and retention layers; until then
+//! [`append`](Journal::append) of those kinds fails closed with
+//! [`DurableError::UnsupportedEntryKind`] rather than dropping their state. The retention sweep
+//! ([`prune`](Journal::prune)) is a no-op stub here.

use std::fmt;
use std::sync::Arc;
@@ -265,6 +268,49 @@ impl LocalBackend {
        Ok(())
    }

    /// Look up a committed `StepResult` anywhere in an execution by its [`IdempotencyKey`].
    ///
    /// Backs INV-13: a guarded effect that already committed its result must not re-fire after a
    /// replay divergence restarts the execution fresh. Returns the (opened) `StepResult` entry when
    /// one exists, or `None`. The `idx_durable_journal_idem_key` partial index makes this an
    /// `O(log n)` point lookup rather than a scan.
    ///
    /// Span: `durable.journal.lookup_idem`.
    ///
    /// # Errors
    ///
    /// Returns [`DurableError::Storage`] if the query fails, or [`DurableError::Decode`] if the
    /// located row cannot be reconstructed.
    pub(crate) async fn lookup_committed_result(
        &self,
        id: ExecutionId,
        idem_key: IdempotencyKey,
    ) -> Result<Option<JournalEntry>, DurableError> {
        let span = tracing::info_span!(
            "durable.journal.lookup_idem",
            execution_id = %id.as_uuid(),
            found = tracing::field::Empty,
        );
        async move {
            let rows: Vec<JournalRowRead> = zeph_db::query_as(sql!(
                "SELECT seq, step_id, entry_kind, idem_key, effect_class, payload, payload_version, hmac, created_at
                 FROM durable_journal
                 WHERE execution_id = ? AND idem_key = ? AND entry_kind = 'step_result'
                 ORDER BY seq LIMIT 1"
            ))
            .bind(id.as_uuid().to_string())
            .bind(idem_key.as_bytes().to_vec())
            .fetch_all(&self.pool)
            .await
            .map_err(|e| DurableError::storage("lookup_idem", e))?;
            let entry = self.rows_to_entries(id, rows).await?.into_iter().next();
            tracing::Span::current().record("found", entry.is_some());
            Ok(entry)
        }
        .instrument(span)
        .await
    }

    /// Read the highest committed [`JournalSeq`], or `None` for an empty journal.
    ///
    /// The [`JournalWriter`](crate::JournalWriter) calls this on (re)start to anchor itself at the
@@ -654,6 +700,14 @@ impl ExecutionBackend for LocalBackend {
            max_payload: usize::try_from(self.max_payload_bytes).unwrap_or(usize::MAX),
        }
    }

    async fn lookup_committed_result(
        &self,
        id: ExecutionId,
        idem_key: IdempotencyKey,
    ) -> Result<Option<JournalEntry>, DurableError> {
        LocalBackend::lookup_committed_result(self, id, idem_key).await
    }
}

/// Column values for a single `durable_journal` row, ready to bind.
@@ -689,7 +743,7 @@ type JournalRowRead = (
);

/// Current Unix time in milliseconds, clamped into `i64` and never panicking.
-fn now_unix_millis() -> i64 {
+pub(crate) fn now_unix_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX))
@@ -1031,6 +1085,44 @@ mod tests {
        assert_eq!(segment[1].step_id, StepId::new(3));
    }

    #[tokio::test]
    async fn lookup_committed_result_finds_by_idem_key() {
        let backend = mem_backend(1_048_576).await;
        let exec = ExecutionId::new();
        backend
            .open_execution(exec, ExecutionKind::AgentTurn)
            .await
            .unwrap();
        let entry = step_result(exec, 0, b"committed");
        let idem_key = match &entry.entry {
            EntryKind::StepResult {
                idempotency_key, ..
            } => *idempotency_key,
            other => panic!("unexpected entry kind: {other:?}"),
        };
        backend.append(entry).await.unwrap();

        let found = backend
            .lookup_committed_result(exec, idem_key)
            .await
            .unwrap()
            .expect("committed result is located by its idempotency key");
        match &found.entry {
            EntryKind::StepResult { payload, .. } => assert_eq!(payload.as_ref(), b"committed"),
            other => panic!("unexpected entry kind: {other:?}"),
        }

        // A key that was never committed yields nothing rather than erroring.
        let absent = IdempotencyKey::derive(exec, StepId::new(99), b"never");
        assert!(
            backend
                .lookup_committed_result(exec, absent)
                .await
                .unwrap()
                .is_none()
        );
    }

    #[tokio::test]
    async fn capabilities_describe_the_local_profile() {
        let backend = mem_backend(4096).await;