diff --git a/CHANGELOG.md b/CHANGELOG.md index e438577c7..fd788a3ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). `key_id || nonce(24) || ciphertext || tag(16)` blob layout with a one-key rotation window, and zeroized key material. Keyed from the vault `ZEPH_DURABLE_KEY`; see the new "Durable Journal Encryption" security reference page for the key-rotation policy. (#4945) +- `feat(durable)`: added the durable persistence engine to `zeph-durable`. `LocalBackend` owns a + dedicated `durable.db` pool (INV-14, schema applied via `zeph_db::run_migrations`), implements the + `Journal` trait (append, full and ranged reads, `finalize`, and a `prune` stub), AEAD-seals + payload-bearing entries through the injected `Option<Arc<dyn PayloadCipher>>` with the entry's + location bound as associated data, and stamps a keyed-BLAKE3 row HMAC over control entries when a + key is configured. The background `JournalWriter` actor decouples writes from the calling path: + buffered appends group-commit on a flush interval (dropped with a `WARN` under backpressure), + exactly-once appends flush all causally-preceding entries before committing (INV-4) and return + their `JournalSeq` over a oneshot bounded by `journal_ack_timeout_ms` (INV-12, FR-DE-11), and the + writer resumes from `MAX(seq)` on restart (FR-DE-12). The sealed `ExecutionBackend` trait and the + `DurableBackendEnum` enum dispatcher keep the backend surface closed with no `Box<dyn ExecutionBackend>` on the + hot path. Promise, timer, and checkpoint journal entries fail closed + (`DurableError::UnsupportedEntryKind`) until the promise/timer layer lands. 
(#4946) ### Fixed diff --git a/Cargo.lock b/Cargo.lock index e4b2407a7..7e91122e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10863,10 +10863,14 @@ version = "0.21.4" dependencies = [ "blake3", "bytes", + "metrics", "serde", "serde_json", + "tempfile", "thiserror 2.0.18", + "tokio", "toml 1.1.2+spec-1.1.0", + "tracing", "uuid", "zeph-db", ] diff --git a/crates/zeph-durable/Cargo.toml b/crates/zeph-durable/Cargo.toml index 6661f1457..17dd73854 100644 --- a/crates/zeph-durable/Cargo.toml +++ b/crates/zeph-durable/Cargo.toml @@ -22,13 +22,18 @@ postgres = ["zeph-db/postgres"] [dependencies] blake3.workspace = true bytes.workspace = true +metrics.workspace = true serde = { workspace = true, features = ["derive"] } thiserror.workspace = true +tokio = { workspace = true, features = ["macros", "sync", "time"] } +tracing.workspace = true uuid = { workspace = true, features = ["serde", "v7"] } zeph-db.workspace = true [dev-dependencies] serde_json.workspace = true +tempfile.workspace = true +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync", "time"] } toml.workspace = true [lints] diff --git a/crates/zeph-durable/README.md b/crates/zeph-durable/README.md index ad84f85bf..f759f9392 100644 --- a/crates/zeph-durable/README.md +++ b/crates/zeph-durable/README.md @@ -10,10 +10,12 @@ flow* of an execution (steps, promises, timers) so a crashed or interrupted run point of failure instead of restarting from scratch. > [!IMPORTANT] -> This crate is a **foundational scaffold** (spec-064, issue #4944). It currently exposes -> *type-level* building blocks only — there is **no execution behavior yet**. The journal writer, -> execution backends, replay cursor, and the durable step primitive land in follow-up issues of -> epic [#4707](https://github.com/bug-ops/zeph/issues/4707). +> This crate is **under active construction** (spec-064, epic +> [#4707](https://github.com/bug-ops/zeph/issues/4707)). 
The type-level foundation, the AEAD payload +> contract, and the persistence engine — `LocalBackend` (a dedicated `durable.db` pool), the +> background `JournalWriter` actor, and the sealed `ExecutionBackend` dispatcher — have landed. The +> `DurableContext` facade, the replay cursor, the promise/timer layer, and the consuming adapters +> land in follow-up issues of the epic. ## Overview @@ -36,15 +38,25 @@ a dedicated `durable.db` (SQLite) or a feature-gated Restate backend. enum, and `ExecutionStatus`. - **effect** — `EffectClass`, the per-step side-effect contract (`Idempotent` / `AtLeastOnce` / `ExactlyOnceGuarded`). +- **cipher** — the `PayloadCipher` AEAD seal/open contract, the `PayloadAad` location binding, and + the read-side `ensure_payload_within_limit` guard. The concrete cipher lives in a consuming crate + (INV-1). - **config** — pure-data `DurableConfig` and `RetentionPolicy` mirroring the `[durable]` TOML section, with spec defaults applied on deserialization. +- **backend** — the sealed `ExecutionBackend` trait, `BackendCapabilities`, the `DurableBackendEnum` + enum dispatcher, and `LocalBackend` (a dedicated `durable.db` pool implementing `Journal`, sealing + payloads through the injected cipher). +- **writer** — the background `JournalWriter` actor and its cloneable `JournalWriterHandle`: + group-commit for buffered appends, flush-before-commit ACKs for exactly-once entries, and + `MAX(seq)` restart resume. - **error** — the crate-wide `DurableError`. ## Architecture & invariants - **Layer 0, no business-logic dependencies (INV-1).** `zeph-durable` MUST NOT depend on `zeph-llm`, `zeph-memory`, `zeph-core`, `zeph-sanitizer`, or any business-layer crate. Its only - dependencies are `zeph-db` and `zeph-common`. + direct `zeph-*` dependency is `zeph-db`; the rest are infrastructure crates (`tokio`, `tracing`, + `metrics`, `bytes`, `blake3`, `serde`, `uuid`). The concrete payload cipher lives in `zeph-core`. 
- **Closed enums make illegal states unrepresentable.** Control entries (`EffectIntent`, `PromiseCreated`, `TimerArmed`) carry no payload field — a "control entry with payload" cannot be constructed. diff --git a/crates/zeph-durable/src/backend.rs b/crates/zeph-durable/src/backend.rs new file mode 100644 index 000000000..97c774891 --- /dev/null +++ b/crates/zeph-durable/src/backend.rs @@ -0,0 +1,159 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! The sealed execution-backend abstraction and its enum-dispatch front door. +//! +//! A backend is the persistence engine behind a durable execution: it journals control flow and, +//! for the local backend, owns the dedicated `durable.db` pool. The [`ExecutionBackend`] trait is +//! **sealed** (it requires [`crate::sealed::Sealed`]), so only backends declared inside this crate +//! can implement it. External crates never name a concrete backend; they hold a +//! [`DurableBackendEnum`] and dispatch through it. +//! +//! # Why enum dispatch instead of `Box<dyn ExecutionBackend>` +//! +//! The journal append path is hot. A trait object would force a virtual call and a heap allocation +//! per dispatch; [`DurableBackendEnum`] resolves the backend with a single `match` and no +//! allocation (the spec's NEVER list forbids `Box<dyn ExecutionBackend>` on the dispatch path). +//! Because the trait is sealed, adding methods to it later — when the `DurableContext`, promise, +//! and timer entry points land — is a non-breaking change. +//! +//! # Scope +//! +//! This module defines [`BackendCapabilities`], the sealed [`ExecutionBackend`] trait (with its +//! `capabilities` accessor), and the [`DurableBackendEnum`] dispatcher. The execution-open, +//! promise-resolution, and timer-scan methods named in the spec land alongside the +//! `DurableContext` (the trait can gain them without breaking callers). 
+ +use crate::config::RetentionPolicy; +use crate::error::DurableError; +use crate::ids::{ExecutionId, JournalSeq}; +use crate::journal::{ExecutionStatus, Journal, JournalEntry}; + +pub mod local; + +pub use local::LocalBackend; + +/// The capabilities a backend advertises so callers can adapt their journaling strategy. +/// +/// The replay cursor and the durable-step primitive read these flags to decide, for example, +/// whether parallel steps may journal concurrently or must be serialized into reserved-id order. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::BackendCapabilities; +/// +/// // The local backend journals parallel steps concurrently and stays in-process. +/// let caps = BackendCapabilities { +/// parallel_steps: true, +/// cross_process: false, +/// max_payload: 1_048_576, +/// }; +/// assert!(caps.parallel_steps); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct BackendCapabilities { + /// Whether the backend may record parallel steps concurrently. `false` (e.g. Restate) requires + /// the durable wrapper to serialize *recording* into reserved-`StepId` order. + pub parallel_steps: bool, + /// Whether the journal lives on a database shared across processes. Drives the INV-8 encryption + /// gate and row-level HMAC requirement. + pub cross_process: bool, + /// The maximum payload size, in bytes, the backend accepts on append. + pub max_payload: usize, +} + +/// A durable-execution persistence backend. +/// +/// `ExecutionBackend` is the closed set of journal engines Zeph ships. It is sealed via +/// [`crate::sealed::Sealed`]: external crates cannot implement it and must dispatch through +/// [`DurableBackendEnum`]. Every backend is also a [`Journal`], so the append/read/finalize/prune +/// surface is available uniformly. 
+/// +/// # Contract for implementors +/// +/// - [`capabilities`](ExecutionBackend::capabilities) MUST return a stable description of the +/// backend; callers cache it and adapt their journaling strategy to it. +/// - The [`Journal`] half MUST serialize writes through a single connection so appends receive a +/// monotonic [`JournalSeq`]. +/// +/// Additional entry points (execution open, promise resolution, timer scan) are added as the +/// higher layers land; because the trait is sealed, those additions do not break callers. +pub trait ExecutionBackend: Journal + Send + Sync + crate::sealed::Sealed { + /// Return this backend's stable capability description. + fn capabilities(&self) -> BackendCapabilities; +} + +/// Closed enum dispatch over the compiled-in backends. +/// +/// Construct it from a concrete backend and hand it across the crate boundary behind an `Arc`; +/// callers invoke the [`Journal`] and [`ExecutionBackend`] methods on the enum and the dispatch +/// resolves to the active variant with a single `match`. The enum is `#[non_exhaustive]`: the +/// feature-gated `Restate` variant joins it with the `restate` feature without breaking in-crate +/// matches. +/// +/// # Examples +/// +/// ``` +/// use std::sync::Arc; +/// use zeph_durable::{BackendCapabilities, DurableBackendEnum}; +/// +/// fn max_payload(backend: &DurableBackendEnum) -> usize { +/// use zeph_durable::ExecutionBackend as _; +/// backend.capabilities().max_payload +/// } +/// # let _ = max_payload; +/// ``` +#[derive(Debug)] +#[non_exhaustive] +pub enum DurableBackendEnum { + /// The always-compiled local backend journaling to a dedicated `durable.db`. 
+ Local(LocalBackend), +} + +impl crate::sealed::Sealed for DurableBackendEnum {} + +impl Journal for DurableBackendEnum { + async fn append(&self, entry: JournalEntry) -> Result { + match self { + Self::Local(backend) => backend.append(entry).await, + } + } + + async fn read_execution(&self, id: ExecutionId) -> Result, DurableError> { + match self { + Self::Local(backend) => backend.read_execution(id).await, + } + } + + async fn read_execution_range( + &self, + id: ExecutionId, + from_step_id: u32, + limit: usize, + ) -> Result, DurableError> { + match self { + Self::Local(backend) => backend.read_execution_range(id, from_step_id, limit).await, + } + } + + async fn finalize(&self, id: ExecutionId, status: ExecutionStatus) -> Result<(), DurableError> { + match self { + Self::Local(backend) => backend.finalize(id, status).await, + } + } + + async fn prune(&self, policy: &RetentionPolicy) -> Result { + match self { + Self::Local(backend) => backend.prune(policy).await, + } + } +} + +impl ExecutionBackend for DurableBackendEnum { + fn capabilities(&self) -> BackendCapabilities { + match self { + Self::Local(backend) => backend.capabilities(), + } + } +} diff --git a/crates/zeph-durable/src/backend/local.rs b/crates/zeph-durable/src/backend/local.rs new file mode 100644 index 000000000..18eb4f5d6 --- /dev/null +++ b/crates/zeph-durable/src/backend/local.rs @@ -0,0 +1,1045 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! The always-compiled local journal backend. +//! +//! [`LocalBackend`] owns a dedicated [`zeph_db::DbPool`] on its own `durable.db` file (INV-14): a +//! separate database keeps the high-write journal off the shared application pool, where +//! `BEGIN IMMEDIATE` contention would otherwise serialize unrelated writers. The schema lives in +//! `zeph-db/migrations/{sqlite,postgres}/` and is applied via [`zeph_db::run_migrations`]; the +//! backend owns no `.sql` files of its own. +//! +//! 
# Sealing and integrity +//! +//! Payload-bearing entries (currently [`EntryKind::StepResult`]) are AEAD-sealed through the +//! injected [`PayloadCipher`] before they touch the database, with the entry's location bound as +//! associated data so a sealed blob cannot be relocated to another step or execution. Control +//! entries (currently [`EntryKind::EffectIntent`]) carry no payload; when an HMAC key is configured +//! the backend stamps a keyed BLAKE3 row HMAC over their identity for shared-database deployments. +//! When no cipher is injected the payload is stored verbatim — a development-only posture gated by +//! [`DurableConfig::encryption_gate`](crate::DurableConfig::encryption_gate) at startup. +//! +//! # Scope +//! +//! This revision journals the step-execution entries — [`EntryKind::StepResult`] and +//! [`EntryKind::EffectIntent`] — that the durable step primitive records, plus execution +//! lifecycle (open and [`finalize`](Journal::finalize)) and the writer's restart anchor +//! (`max_seq`). Promise, timer, and checkpoint entries are journaled by the +//! promise/timer and retention layers; until then [`append`](Journal::append) of those kinds fails +//! closed with [`DurableError::UnsupportedEntryKind`] rather than dropping their state. The +//! retention sweep ([`prune`](Journal::prune)) is a no-op stub here. + +use std::fmt; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use bytes::Bytes; +use zeph_db::{DbPool, sql}; + +use crate::backend::{BackendCapabilities, ExecutionBackend}; +use crate::cipher::{EntryKindTag, PayloadAad, PayloadCipher, ensure_payload_within_limit}; +use crate::config::RetentionPolicy; +use crate::error::DurableError; +use crate::ids::{ExecutionId, ExecutionKind, IdempotencyKey, JournalSeq, StepId}; +use crate::journal::{EntryKind, ExecutionStatus, Journal, JournalEntry}; +use tracing::Instrument as _; + +/// Slack added to `max_payload_bytes` for the read-side size guard. 
+/// +/// The stored blob carries AEAD framing (key-id, extended nonce, tag) on top of the plaintext, so a +/// payload accepted at exactly the limit on write is slightly larger on read. The guard exists only +/// to reject absurdly large rows before allocation/decryption (INV-11), so a small fixed slack +/// above any real AEAD overhead keeps legitimate near-limit entries readable without weakening the +/// denial-of-service protection. +const SEAL_OVERHEAD_SLACK: u64 = 128; + +/// The always-compiled durable backend that journals to a dedicated `durable.db`. +/// +/// Construct it from a [`zeph_db::DbPool`] (or open one with [`LocalBackend::open`]), then attach an +/// optional [`PayloadCipher`] and HMAC key with the builder methods. Call [`LocalBackend::init`] +/// once before use to apply the schema migrations. +/// +/// # Examples +/// +/// ```no_run +/// # async fn run() -> Result<(), Box> { +/// use zeph_durable::LocalBackend; +/// +/// // 1 MiB payload ceiling, matching the spec default. +/// let backend = LocalBackend::open("durable.db", 1_048_576).await?; +/// backend.init().await?; +/// # Ok(()) } +/// ``` +pub struct LocalBackend { + pool: DbPool, + cipher: Option>, + hmac_key: Option<[u8; 32]>, + max_payload_bytes: u64, +} + +impl fmt::Debug for LocalBackend { + /// Redacts the cipher and HMAC key — never print key material or a cipher handle. + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LocalBackend") + .field("cipher", &self.cipher.as_ref().map(|_| "")) + .field("hmac_key", &self.hmac_key.as_ref().map(|_| "")) + .field("max_payload_bytes", &self.max_payload_bytes) + .finish_non_exhaustive() + } +} + +impl LocalBackend { + /// Wrap an existing [`zeph_db::DbPool`] as a local backend with the given payload ceiling. + /// + /// Call [`LocalBackend::init`] before any journal operation to apply the schema. 
Attach a + /// cipher and HMAC key with [`with_cipher`](Self::with_cipher) and + /// [`with_hmac_key`](Self::with_hmac_key). + #[must_use] + pub fn new(pool: DbPool, max_payload_bytes: u64) -> Self { + Self { + pool, + cipher: None, + hmac_key: None, + max_payload_bytes, + } + } + + /// Open (or create) a backend on a dedicated `durable.db` file (or `:memory:`). + /// + /// Connecting also applies the schema migrations, so a freshly opened backend is ready to use; + /// [`init`](Self::init) may still be called and is idempotent. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] if the pool cannot be opened or migrations fail. + pub async fn open(path: &str, max_payload_bytes: u64) -> Result { + let pool = zeph_db::DbConfig { + url: path.to_string(), + max_connections: 5, + pool_size: 5, + } + .connect() + .await + .map_err(|e| DurableError::storage("open", e))?; + Ok(Self::new(pool, max_payload_bytes)) + } + + /// Inject the AEAD payload cipher used to seal and open payload-bearing entries. + #[must_use] + pub fn with_cipher(mut self, cipher: Arc) -> Self { + self.cipher = Some(cipher); + self + } + + /// Configure the keyed-BLAKE3 HMAC key stamped over control entries on shared-database + /// deployments. + #[must_use] + pub fn with_hmac_key(mut self, key: [u8; 32]) -> Self { + self.hmac_key = Some(key); + self + } + + /// Borrow the underlying pool (for tests and adapters that need direct access). + #[must_use] + pub fn pool(&self) -> &DbPool { + &self.pool + } + + /// Apply the durable schema migrations to the backing pool. + /// + /// Idempotent: safe to call repeatedly. The schema is owned by `zeph-db`, not this crate. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] if a migration fails. 
+ pub async fn init(&self) -> Result<(), DurableError> { + zeph_db::run_migrations(&self.pool) + .await + .map_err(|e| DurableError::storage("init", e))?; + Ok(()) + } + + /// Ensure a `durable_executions` row exists for `id`, returning whether this is a resume. + /// + /// Inserts a fresh `running` row for a new execution (returning `false`) or detects an existing + /// row for a resumed one (returning `true`). The journal's foreign key requires this row before + /// any entry is appended, so callers open the execution first. + /// + /// Span: `durable.backend.open`. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] if the lookup or insert fails. + pub async fn open_execution( + &self, + id: ExecutionId, + kind: ExecutionKind, + ) -> Result { + let span = tracing::info_span!( + "durable.backend.open", + execution_id = %id.as_uuid(), + kind = kind.as_str(), + is_resume = tracing::field::Empty, + ); + async move { + let exec = id.as_uuid().to_string(); + let existing: Option<(String,)> = zeph_db::query_as(sql!( + "SELECT status FROM durable_executions WHERE execution_id = ?" + )) + .bind(&exec) + .fetch_optional(&self.pool) + .await + .map_err(|e| DurableError::storage("open", e))?; + if existing.is_some() { + tracing::Span::current().record("is_resume", true); + return Ok(true); + } + let now = now_unix_millis(); + zeph_db::query(sql!( + "INSERT INTO durable_executions + (execution_id, kind, status, created_at, updated_at, finalized_at) + VALUES (?, ?, 'running', ?, ?, NULL)" + )) + .bind(&exec) + .bind(kind.as_str()) + .bind(now) + .bind(now) + .execute(&self.pool) + .await + .map_err(|e| DurableError::storage("open", e))?; + tracing::Span::current().record("is_resume", false); + Ok(false) + } + .instrument(span) + .await + } + + /// Group-commit a batch of buffered entries in a single write transaction. 
+ /// + /// Used by the [`JournalWriter`](crate::JournalWriter) to amortize the WAL fsync across all + /// entries accumulated within a flush interval. Sealing and HMAC computation run before the + /// transaction opens, keeping CPU work off the write lock. The whole batch commits atomically; + /// a single malformed entry aborts the batch. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] on a database failure, or a per-entry error + /// ([`DurableError::PayloadTooLarge`], [`DurableError::UnsupportedEntryKind`], or a cipher + /// failure) if an entry cannot be prepared. + pub(crate) async fn append_batch(&self, entries: &[JournalEntry]) -> Result<(), DurableError> { + if entries.is_empty() { + return Ok(()); + } + let mut rows = Vec::with_capacity(entries.len()); + for entry in entries { + rows.push(self.prepare_row(entry)?); + } + // Evaluate the dialect-rewritten statement once (the postgres `sql!` leaks on each call), + // then reuse it for every row in the batch. + let insert = sql!( + "INSERT INTO durable_journal + (execution_id, step_id, entry_kind, idem_key, effect_class, payload, payload_version, hmac, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" + ); + let mut tx = zeph_db::begin_write(&self.pool) + .await + .map_err(|e| DurableError::storage("append_batch", e))?; + for row in rows { + zeph_db::query(insert) + .bind(row.execution_id) + .bind(row.step_id) + .bind(row.entry_kind) + .bind(row.idem_key) + .bind(row.effect_class) + .bind(row.payload) + .bind(row.payload_version) + .bind(row.hmac) + .bind(row.created_at) + .execute(&mut *tx) + .await + .map_err(|e| DurableError::storage("append_batch", e))?; + } + tx.commit() + .await + .map_err(|e| DurableError::storage("append_batch", e))?; + Ok(()) + } + + /// Read the highest committed [`JournalSeq`], or `None` for an empty journal. + /// + /// The [`JournalWriter`](crate::JournalWriter) calls this on (re)start to anchor itself at the + /// last durably-committed entry (FR-DE-12). 
Because `seq` is a database-assigned autoincrement, + /// resumed appends continue from `MAX(seq) + 1` with neither gap nor duplication. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] if the query fails. + pub(crate) async fn max_seq(&self) -> Result, DurableError> { + let max: Option = zeph_db::query_scalar(sql!("SELECT MAX(seq) FROM durable_journal")) + .fetch_one(&self.pool) + .await + .map_err(|e| DurableError::storage("max_seq", e))?; + Ok(max.map(JournalSeq::new)) + } + + /// Seal a plaintext payload, or pass it through verbatim when no cipher is configured. + fn seal_payload(&self, plaintext: &[u8], aad: &PayloadAad) -> Result, DurableError> { + match &self.cipher { + Some(cipher) => Ok(cipher.seal(plaintext, aad)?), + None => Ok(plaintext.to_vec()), + } + } + + /// Open a sealed payload, or copy it through verbatim when no cipher is configured. + fn open_payload(&self, sealed: &[u8], aad: &PayloadAad) -> Result { + match &self.cipher { + Some(cipher) => Ok(Bytes::from(cipher.open(sealed, aad)?)), + None => Ok(Bytes::copy_from_slice(sealed)), + } + } + + /// Compute the keyed-BLAKE3 row HMAC over a control entry's identity, when an HMAC key is set. + /// + /// Binds `(execution_id, step_id, entry_kind, idem_key?)` so a control row cannot be forged or + /// relocated on a shared database. Returns `None` when no key is configured (single-user local). 
+ fn control_hmac( + &self, + entry: &JournalEntry, + idem_key: Option<&IdempotencyKey>, + ) -> Option> { + let key = self.hmac_key.as_ref()?; + let mut input = Vec::with_capacity(16 + 4 + 16 + 32); + input.extend_from_slice(entry.execution_id.as_bytes()); + input.extend_from_slice(&entry.step_id.value().to_le_bytes()); + input.extend_from_slice(entry.entry.tag().as_bytes()); + if let Some(k) = idem_key { + input.extend_from_slice(k.as_bytes()); + } + Some(blake3::keyed_hash(key, &input).as_bytes().to_vec()) + } + + /// Derive the persisted column values for an entry, sealing payloads and stamping HMACs. + fn prepare_row(&self, entry: &JournalEntry) -> Result { + let execution_id = entry.execution_id.as_uuid().to_string(); + let step_id = i64::from(entry.step_id.value()); + let created_at = entry.created_at_ms; + let entry_kind = entry.entry.tag(); + match &entry.entry { + EntryKind::StepResult { + idempotency_key, + payload, + effect, + payload_version, + } => { + ensure_payload_within_limit(payload.len(), self.max_payload_bytes)?; + let aad = PayloadAad::new( + entry.execution_id, + entry.step_id, + EntryKindTag::StepResult, + Some(*idempotency_key), + ); + let sealed = self.seal_payload(payload.as_ref(), &aad)?; + Ok(JournalRow { + execution_id, + step_id, + entry_kind, + idem_key: Some(idempotency_key.as_bytes().to_vec()), + effect_class: Some(effect.as_str()), + payload: Some(sealed), + payload_version: Some(i32::from(*payload_version)), + hmac: None, + created_at, + }) + } + EntryKind::EffectIntent { + idempotency_key, + effect, + hmac: _, + } => { + // The backend is the HMAC keyholder; it stamps the row HMAC itself when configured + // and ignores any caller-supplied value. 
+ let hmac = self.control_hmac(entry, Some(idempotency_key)); + Ok(JournalRow { + execution_id, + step_id, + entry_kind, + idem_key: Some(idempotency_key.as_bytes().to_vec()), + effect_class: Some(effect.as_str()), + payload: None, + payload_version: None, + hmac, + created_at, + }) + } + EntryKind::PromiseCreated { .. } + | EntryKind::PromiseResolved { .. } + | EntryKind::TimerArmed { .. } + | EntryKind::TimerFired { .. } + | EntryKind::Checkpoint { .. } => { + Err(DurableError::UnsupportedEntryKind { kind: entry_kind }) + } + } + } + + /// Look up the owning execution's kind for read-time entry reconstruction. + async fn lookup_kind(&self, id: ExecutionId) -> Result { + let kind: Option = zeph_db::query_scalar(sql!( + "SELECT kind FROM durable_executions WHERE execution_id = ?" + )) + .bind(id.as_uuid().to_string()) + .fetch_optional(&self.pool) + .await + .map_err(|e| DurableError::storage("read", e))?; + let kind = kind.ok_or(DurableError::Decode { + context: "journaled entries reference a missing execution row", + })?; + ExecutionKind::from_tag(&kind).ok_or(DurableError::Decode { + context: "execution kind is not reconstructible (custom kind read-back unsupported)", + }) + } + + /// Reconstruct a [`JournalEntry`] from a stored row, opening sealed payloads. 
+ fn row_to_entry( + &self, + id: ExecutionId, + kind: ExecutionKind, + row: JournalRowRead, + ) -> Result { + let ( + seq, + step_id_raw, + entry_kind, + idem_key, + effect_class, + payload, + payload_version, + hmac, + created_at, + ) = row; + let step_id = + StepId::new( + u32::try_from(step_id_raw).map_err(|_| DurableError::Decode { + context: "step_id out of u32 range", + })?, + ); + let entry = match entry_kind.as_str() { + "step_result" => { + let idem_bytes = idem_key.ok_or(DurableError::Decode { + context: "step_result idem_key missing", + })?; + let idem_key = IdempotencyKey::from_bytes(slice_to_array32( + &idem_bytes, + "step_result idem_key", + )?); + let effect = effect_class + .as_deref() + .and_then(crate::EffectClass::from_tag) + .ok_or(DurableError::Decode { + context: "step_result effect_class missing or invalid", + })?; + let sealed = payload.ok_or(DurableError::Decode { + context: "step_result payload missing", + })?; + ensure_payload_within_limit( + sealed.len(), + self.max_payload_bytes.saturating_add(SEAL_OVERHEAD_SLACK), + )?; + let aad = PayloadAad::new(id, step_id, EntryKindTag::StepResult, Some(idem_key)); + let opened = self.open_payload(&sealed, &aad)?; + let version = u8::try_from(payload_version.unwrap_or(1)).map_err(|_| { + DurableError::Decode { + context: "payload_version out of u8 range", + } + })?; + EntryKind::StepResult { + idempotency_key: idem_key, + payload: opened, + effect, + payload_version: version, + } + } + "effect_intent" => { + let idem_bytes = idem_key.ok_or(DurableError::Decode { + context: "effect_intent idem_key missing", + })?; + let idem_key = IdempotencyKey::from_bytes(slice_to_array32( + &idem_bytes, + "effect_intent idem_key", + )?); + let effect = effect_class + .as_deref() + .and_then(crate::EffectClass::from_tag) + .ok_or(DurableError::Decode { + context: "effect_intent effect_class missing or invalid", + })?; + let hmac = hmac + .map(|bytes| slice_to_array32(&bytes, "effect_intent hmac")) + 
.transpose()?; + EntryKind::EffectIntent { + idempotency_key: idem_key, + effect, + hmac, + } + } + other => { + return Err(DurableError::UnsupportedEntryKind { + kind: static_entry_tag(other), + }); + } + }; + Ok(JournalEntry { + seq: Some(JournalSeq::new(seq)), + execution_id: id, + kind, + step_id, + entry, + created_at_ms: created_at, + }) + } + + /// Reconstruct every entry from a fetched row set, sharing one kind lookup. + async fn rows_to_entries( + &self, + id: ExecutionId, + rows: Vec, + ) -> Result, DurableError> { + if rows.is_empty() { + return Ok(Vec::new()); + } + let kind = self.lookup_kind(id).await?; + let mut entries = Vec::with_capacity(rows.len()); + for row in rows { + entries.push(self.row_to_entry(id, kind, row)?); + } + Ok(entries) + } +} + +impl Journal for LocalBackend { + async fn append(&self, entry: JournalEntry) -> Result { + let span = tracing::info_span!( + "durable.journal.append", + execution_id = %entry.execution_id.as_uuid(), + step_id = entry.step_id.value(), + entry_kind = entry.entry.tag(), + ); + async move { + let row = self.prepare_row(&entry)?; + let (seq,): (i64,) = zeph_db::query_as(sql!( + "INSERT INTO durable_journal + (execution_id, step_id, entry_kind, idem_key, effect_class, payload, payload_version, hmac, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ RETURNING seq" + )) + .bind(row.execution_id) + .bind(row.step_id) + .bind(row.entry_kind) + .bind(row.idem_key) + .bind(row.effect_class) + .bind(row.payload) + .bind(row.payload_version) + .bind(row.hmac) + .bind(row.created_at) + .fetch_one(&self.pool) + .await + .map_err(|e| DurableError::storage("append", e))?; + Ok(JournalSeq::new(seq)) + } + .instrument(span) + .await + } + + async fn read_execution(&self, id: ExecutionId) -> Result, DurableError> { + let span = tracing::info_span!( + "durable.journal.read", + execution_id = %id.as_uuid(), + step_count = tracing::field::Empty, + ); + async move { + let rows: Vec = zeph_db::query_as(sql!( + "SELECT seq, step_id, entry_kind, idem_key, effect_class, payload, payload_version, hmac, created_at + FROM durable_journal WHERE execution_id = ? ORDER BY seq" + )) + .bind(id.as_uuid().to_string()) + .fetch_all(&self.pool) + .await + .map_err(|e| DurableError::storage("read", e))?; + let entries = self.rows_to_entries(id, rows).await?; + tracing::Span::current().record("step_count", entries.len()); + Ok(entries) + } + .instrument(span) + .await + } + + async fn read_execution_range( + &self, + id: ExecutionId, + from_step_id: u32, + limit: usize, + ) -> Result, DurableError> { + let span = tracing::info_span!( + "durable.journal.read_segment", + execution_id = %id.as_uuid(), + from_step_id, + count = tracing::field::Empty, + ); + async move { + let rows: Vec = zeph_db::query_as(sql!( + "SELECT seq, step_id, entry_kind, idem_key, effect_class, payload, payload_version, hmac, created_at + FROM durable_journal WHERE execution_id = ? AND step_id >= ? ORDER BY step_id, seq LIMIT ?" 
+ )) + .bind(id.as_uuid().to_string()) + .bind(i64::from(from_step_id)) + .bind(i64::try_from(limit).unwrap_or(i64::MAX)) + .fetch_all(&self.pool) + .await + .map_err(|e| DurableError::storage("read_segment", e))?; + let entries = self.rows_to_entries(id, rows).await?; + tracing::Span::current().record("count", entries.len()); + Ok(entries) + } + .instrument(span) + .await + } + + async fn finalize(&self, id: ExecutionId, status: ExecutionStatus) -> Result<(), DurableError> { + let span = tracing::info_span!( + "durable.journal.finalize", + execution_id = %id.as_uuid(), + status = status.as_str(), + ); + async move { + let now = now_unix_millis(); + let finalized_at = (!status.is_running()).then_some(now); + let mut tx = zeph_db::begin_write(&self.pool) + .await + .map_err(|e| DurableError::storage("finalize", e))?; + zeph_db::query(sql!( + "UPDATE durable_executions SET status = ?, updated_at = ?, finalized_at = ? + WHERE execution_id = ?" + )) + .bind(status.as_str()) + .bind(now) + .bind(finalized_at) + .bind(id.as_uuid().to_string()) + .execute(&mut *tx) + .await + .map_err(|e| DurableError::storage("finalize", e))?; + tx.commit() + .await + .map_err(|e| DurableError::storage("finalize", e))?; + Ok(()) + } + .instrument(span) + .await + } + + async fn prune(&self, _policy: &RetentionPolicy) -> Result { + // The retention sweep lands with the compaction layer; this stub keeps the trait total and + // the span present for the trace-analysis loop. + let _span = tracing::info_span!("durable.journal.prune", deleted_count = 0u64).entered(); + Ok(0) + } +} + +impl crate::sealed::Sealed for LocalBackend {} + +impl ExecutionBackend for LocalBackend { + fn capabilities(&self) -> BackendCapabilities { + BackendCapabilities { + parallel_steps: true, + // The local backend is in-process on SQLite; a Postgres build talks to a shared server. 
+ cross_process: cfg!(feature = "postgres"), + max_payload: usize::try_from(self.max_payload_bytes).unwrap_or(usize::MAX), + } + } +} + +/// Column values for a single `durable_journal` row, ready to bind. +struct JournalRow { + execution_id: String, + step_id: i64, + entry_kind: &'static str, + idem_key: Option>, + effect_class: Option<&'static str>, + payload: Option>, + payload_version: Option, + hmac: Option>, + created_at: i64, +} + +/// A `durable_journal` row read back from storage, decoded dialect-agnostically. +/// +/// Columns are read as a positional tuple (the convention for crates that depend on `zeph-db` but +/// not `sqlx` directly, mirroring `zeph-scheduler`): integers decode as `i64`/`i32` and blobs as +/// `Vec`, which both backends satisfy through the same `sql!()`-rewritten query. The +/// field order matches the `SELECT` column list: +/// `(seq, step_id, entry_kind, idem_key, effect_class, payload, payload_version, hmac, created_at)`. +type JournalRowRead = ( + i64, + i64, + String, + Option>, + Option, + Option>, + Option, + Option>, + i64, +); + +/// Current Unix time in milliseconds, clamped into `i64` and never panicking. +fn now_unix_millis() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX)) +} + +/// Decode a stored blob into a fixed 32-byte array, failing closed on the wrong length. +fn slice_to_array32(bytes: &[u8], field: &'static str) -> Result<[u8; 32], DurableError> { + <[u8; 32]>::try_from(bytes).map_err(|_| DurableError::Decode { context: field }) +} + +/// Map a database `entry_kind` string to a `'static` tag for [`DurableError::UnsupportedEntryKind`]. 
+fn static_entry_tag(tag: &str) -> &'static str { + match tag { + "promise_created" => "promise_created", + "promise_resolved" => "promise_resolved", + "timer_armed" => "timer_armed", + "timer_fired" => "timer_fired", + "checkpoint" => "checkpoint", + _ => "unknown", + } +} + +// Backend tests open a real pool, so they run under the SQLite build (mirroring `zeph-scheduler`, +// whose `:memory:` pool is SQLite-specific). The dialect-agnostic `sql!()` SQL and `i64`/`Vec` +// column types are verified to compile under the Postgres feature; live Postgres parity is exercised +// by the `#[ignore]`d integration test below. +#[cfg(all(test, feature = "sqlite", not(feature = "postgres")))] +mod tests { + use super::*; + use crate::cipher::CipherError; + use crate::effect::EffectClass; + + /// An AAD-authenticated test cipher: a BLAKE3 tag over the AAD prefixes an XOR-masked payload, + /// so opening with a relocated/forged AAD fails authentication exactly like the real cipher. + struct XorCipher; + const XOR_MASK: u8 = 0x5A; + + impl PayloadCipher for XorCipher { + fn seal(&self, plaintext: &[u8], aad: &PayloadAad) -> Result, CipherError> { + let tag = blake3::hash(&aad.canonical_bytes()); + let mut out = tag.as_bytes()[..8].to_vec(); + out.extend(plaintext.iter().map(|b| b ^ XOR_MASK)); + Ok(out) + } + + fn open(&self, sealed: &[u8], aad: &PayloadAad) -> Result, CipherError> { + if sealed.len() < 8 { + return Err(CipherError::Malformed { + context: "sealed blob shorter than the aad tag", + }); + } + let expected = blake3::hash(&aad.canonical_bytes()); + if sealed[..8] != expected.as_bytes()[..8] { + return Err(CipherError::Authentication); + } + Ok(sealed[8..].iter().map(|b| b ^ XOR_MASK).collect()) + } + } + + async fn mem_backend(max_payload_bytes: u64) -> LocalBackend { + let backend = LocalBackend::open(":memory:", max_payload_bytes) + .await + .expect("open in-memory backend"); + backend.init().await.expect("apply migrations"); + backend + } + + fn step_result(exec: 
ExecutionId, step: u32, payload: &[u8]) -> JournalEntry { + let step_id = StepId::new(step); + JournalEntry { + seq: None, + execution_id: exec, + kind: ExecutionKind::AgentTurn, + step_id, + entry: EntryKind::StepResult { + idempotency_key: IdempotencyKey::derive(exec, step_id, b"tool:read"), + payload: Bytes::copy_from_slice(payload), + effect: EffectClass::Idempotent, + payload_version: 1, + }, + created_at_ms: 100, + } + } + + fn effect_intent(exec: ExecutionId, step: u32) -> JournalEntry { + let step_id = StepId::new(step); + JournalEntry { + seq: None, + execution_id: exec, + kind: ExecutionKind::AgentTurn, + step_id, + entry: EntryKind::EffectIntent { + idempotency_key: IdempotencyKey::derive(exec, step_id, b"transfer"), + effect: EffectClass::ExactlyOnceGuarded, + hmac: None, + }, + created_at_ms: 100, + } + } + + #[tokio::test] + async fn open_execution_is_fresh_then_resume() { + let backend = mem_backend(1_048_576).await; + let exec = ExecutionId::new(); + assert!( + !backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap() + ); + assert!( + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap() + ); + } + + #[tokio::test] + async fn append_and_read_round_trips_step_result() { + let backend = mem_backend(1_048_576).await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + + let seq = backend + .append(step_result(exec, 0, b"hello")) + .await + .unwrap(); + assert_eq!(seq.value(), 1, "first append takes seq 1"); + + let entries = backend.read_execution(exec).await.unwrap(); + assert_eq!(entries.len(), 1); + match &entries[0].entry { + EntryKind::StepResult { + payload, effect, .. 
+ } => { + assert_eq!(payload.as_ref(), b"hello"); + assert_eq!(*effect, EffectClass::Idempotent); + } + other => panic!("unexpected entry kind: {other:?}"), + } + assert_eq!(entries[0].seq, Some(seq)); + } + + #[tokio::test] + async fn cipher_seals_payload_at_rest_but_round_trips() { + let backend = mem_backend(1_048_576) + .await + .with_cipher(Arc::new(XorCipher)); + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + backend + .append(step_result(exec, 0, b"secret-payload")) + .await + .unwrap(); + + // The stored column is sealed, never the plaintext. + let (stored,): (Option>,) = zeph_db::query_as(sql!( + "SELECT payload FROM durable_journal WHERE execution_id = ?" + )) + .bind(exec.as_uuid().to_string()) + .fetch_one(backend.pool()) + .await + .unwrap(); + let stored = stored.expect("payload present"); + assert_ne!( + stored.as_slice(), + b"secret-payload", + "payload must be sealed at rest" + ); + + // Reading opens it back to the original plaintext. + let entries = backend.read_execution(exec).await.unwrap(); + match &entries[0].entry { + EntryKind::StepResult { payload, .. } => { + assert_eq!(payload.as_ref(), b"secret-payload"); + } + other => panic!("unexpected entry kind: {other:?}"), + } + } + + #[tokio::test] + async fn control_entry_hmac_is_stamped_only_when_keyed() { + let exec = ExecutionId::new(); + + let unkeyed = mem_backend(1_048_576).await; + unkeyed + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + unkeyed.append(effect_intent(exec, 0)).await.unwrap(); + match &unkeyed.read_execution(exec).await.unwrap()[0].entry { + EntryKind::EffectIntent { hmac, .. 
} => assert!(hmac.is_none()), + other => panic!("unexpected entry kind: {other:?}"), + } + + let keyed = mem_backend(1_048_576).await.with_hmac_key([7u8; 32]); + let exec2 = ExecutionId::new(); + keyed + .open_execution(exec2, ExecutionKind::AgentTurn) + .await + .unwrap(); + keyed.append(effect_intent(exec2, 0)).await.unwrap(); + match &keyed.read_execution(exec2).await.unwrap()[0].entry { + EntryKind::EffectIntent { hmac, .. } => { + assert!( + hmac.is_some(), + "keyed backend stamps a row HMAC over control entries" + ); + } + other => panic!("unexpected entry kind: {other:?}"), + } + } + + #[tokio::test] + async fn promise_and_timer_entries_fail_closed() { + let backend = mem_backend(1_048_576).await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + let timer = JournalEntry { + seq: None, + execution_id: exec, + kind: ExecutionKind::AgentTurn, + step_id: StepId::new(0), + entry: EntryKind::TimerArmed { + timer_id: crate::TimerId::new(), + due_at_ms: 1_000, + hmac: None, + }, + created_at_ms: 0, + }; + assert!(matches!( + backend.append(timer).await, + Err(DurableError::UnsupportedEntryKind { + kind: "timer_armed" + }) + )); + } + + #[tokio::test] + async fn payload_over_limit_is_rejected_fail_closed() { + let backend = mem_backend(8).await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + let big = vec![0u8; 64]; + assert!(matches!( + backend.append(step_result(exec, 0, &big)).await, + Err(DurableError::PayloadTooLarge { .. 
}) + )); + } + + #[tokio::test] + async fn finalize_marks_terminal_status_and_time() { + let backend = mem_backend(1_048_576).await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + backend + .finalize(exec, ExecutionStatus::Completed) + .await + .unwrap(); + + let (status, finalized): (String, Option) = zeph_db::query_as(sql!( + "SELECT status, finalized_at FROM durable_executions WHERE execution_id = ?" + )) + .bind(exec.as_uuid().to_string()) + .fetch_one(backend.pool()) + .await + .unwrap(); + assert_eq!(status, "completed"); + assert!(finalized.is_some(), "a terminal status stamps finalized_at"); + } + + #[tokio::test] + async fn max_seq_reflects_committed_appends() { + let backend = mem_backend(1_048_576).await; + assert_eq!( + backend.max_seq().await.unwrap(), + None, + "empty journal has no max seq" + ); + + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + for step in 0..3 { + backend.append(step_result(exec, step, b"x")).await.unwrap(); + } + assert_eq!(backend.max_seq().await.unwrap(), Some(JournalSeq::new(3))); + } + + #[tokio::test] + async fn append_batch_group_commits_every_entry() { + let backend = mem_backend(1_048_576).await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + let batch = vec![ + step_result(exec, 0, b"a"), + step_result(exec, 1, b"b"), + step_result(exec, 2, b"c"), + ]; + backend.append_batch(&batch).await.unwrap(); + assert_eq!(backend.read_execution(exec).await.unwrap().len(), 3); + } + + #[tokio::test] + async fn read_execution_range_bounds_the_segment() { + let backend = mem_backend(1_048_576).await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + for step in 0..5 { + backend.append(step_result(exec, step, b"x")).await.unwrap(); + } + let segment = 
backend.read_execution_range(exec, 2, 2).await.unwrap(); + assert_eq!(segment.len(), 2); + assert_eq!(segment[0].step_id, StepId::new(2)); + assert_eq!(segment[1].step_id, StepId::new(3)); + } + + #[tokio::test] + async fn capabilities_describe_the_local_profile() { + let backend = mem_backend(4096).await; + let caps = backend.capabilities(); + assert!(caps.parallel_steps); + assert!( + !caps.cross_process, + "the SQLite local backend is in-process" + ); + assert_eq!(caps.max_payload, 4096); + } +} diff --git a/crates/zeph-durable/src/effect.rs b/crates/zeph-durable/src/effect.rs index a485cd03d..f78757abb 100644 --- a/crates/zeph-durable/src/effect.rs +++ b/crates/zeph-durable/src/effect.rs @@ -46,6 +46,19 @@ impl EffectClass { Self::ExactlyOnceGuarded => "exactly_once_guarded", } } + + /// Parse the canonical `effect_class` column string back into an [`EffectClass`]. + /// + /// Returns `None` for an unrecognized tag so a corrupt journal row fails closed rather than + /// defaulting to a weaker effect class. + pub(crate) fn from_tag(tag: &str) -> Option { + match tag { + "idempotent" => Some(Self::Idempotent), + "at_least_once" => Some(Self::AtLeastOnce), + "exactly_once_guarded" => Some(Self::ExactlyOnceGuarded), + _ => None, + } + } } #[cfg(test)] diff --git a/crates/zeph-durable/src/error.rs b/crates/zeph-durable/src/error.rs index 83b4a2819..38bbea410 100644 --- a/crates/zeph-durable/src/error.rs +++ b/crates/zeph-durable/src/error.rs @@ -81,6 +81,47 @@ pub enum DurableError { /// `"shared-database"`). context: &'static str, }, + + /// A journal entry of a kind whose persistence is provided by a higher layer not yet wired into + /// this backend revision. Promise, timer, and checkpoint entries land with the promise/timer and + /// retention layers; until then the backend fails closed rather than silently dropping the + /// entry's kind-specific state. 
+ #[error("journal persistence for '{kind}' entries is not available in this backend revision")] + UnsupportedEntryKind { + /// The `entry_kind` tag of the entry whose persistence is deferred. + kind: &'static str, + }, + + /// A journal storage operation failed at the database layer (connection, migration, or query). + /// + /// The static `op` names the failing operation; the underlying database error is attached as + /// the error source. Per INV-5 the `Display` message carries only the operation name — the + /// boxed source never contains plaintext payloads, since every bind is ciphertext, a hash, or a + /// non-secret descriptor. + #[error("durable storage operation '{op}' failed")] + Storage { + /// The static name of the failing operation (e.g. `"append"`, `"finalize"`, `"open"`). + op: &'static str, + /// The underlying database error. + #[source] + source: Box, + }, +} + +impl DurableError { + /// Wrap a database-layer failure as a [`DurableError::Storage`] for the named operation. + /// + /// Used at every `zeph-db` call site so storage failures carry a stable, greppable operation + /// label while the original error remains reachable via [`std::error::Error::source`]. + pub(crate) fn storage( + op: &'static str, + source: impl Into>, + ) -> Self { + Self::Storage { + op, + source: source.into(), + } + } } #[cfg(test)] @@ -105,4 +146,16 @@ mod tests { }; assert!(err.to_string().contains("step 12")); } + + #[test] + fn storage_message_names_op_but_not_the_source_detail() { + let inner = std::io::Error::other("secret-bind-value"); + let err = DurableError::storage("append", inner); + let rendered = err.to_string(); + assert!(rendered.contains("append")); + // The top-line message is metadata-only: the source detail is reachable via `source()`, + // never inlined into Display (INV-5). 
+ assert!(!rendered.contains("secret-bind-value")); + assert!(std::error::Error::source(&err).is_some()); + } } diff --git a/crates/zeph-durable/src/ids.rs b/crates/zeph-durable/src/ids.rs index 7b6aaefe8..bfe526e19 100644 --- a/crates/zeph-durable/src/ids.rs +++ b/crates/zeph-durable/src/ids.rs @@ -193,6 +193,14 @@ impl IdempotencyKey { pub fn as_bytes(&self) -> &[u8; 32] { &self.0 } + + /// Reconstruct a key from its 32 stored bytes. + /// + /// Used by a journal backend to rebuild a key read back from storage; the bytes MUST originate + /// from a prior [`IdempotencyKey::as_bytes`] of a key produced by [`IdempotencyKey::derive`]. + pub(crate) fn from_bytes(bytes: [u8; 32]) -> Self { + Self(bytes) + } } /// Reference to an external-completion handle (HITL, A2A async, subagent result). @@ -307,6 +315,22 @@ impl ExecutionKind { Self::Custom(name) => name, } } + + /// Reconstruct a standard execution kind from its canonical column string. + /// + /// Returns `None` for an unrecognized tag. [`ExecutionKind::Custom`] cannot round-trip from + /// storage — its inner `&'static str` has no representation recoverable from a dynamic database + /// string — so a custom kind read back from the journal is reported as unrecognized rather than + /// silently coerced. + pub(crate) fn from_tag(tag: &str) -> Option { + match tag { + "agent_turn" => Some(Self::AgentTurn), + "dag_run" => Some(Self::DagRun), + "scheduled_job" => Some(Self::ScheduledJob), + "subagent_session" => Some(Self::SubagentSession), + _ => None, + } + } } #[cfg(test)] diff --git a/crates/zeph-durable/src/lib.rs b/crates/zeph-durable/src/lib.rs index 2519844c3..b0383f27a 100644 --- a/crates/zeph-durable/src/lib.rs +++ b/crates/zeph-durable/src/lib.rs @@ -17,9 +17,9 @@ //! business-layer crate (INV-1). Domain meaning lives in thin adapter modules inside each //! consuming crate. //! -//! # Scaffold scope +//! # Module map //! -//! This release establishes the type-level foundation only: +//! 
Type-level foundation: //! //! - [`ids`] — the journal-boundary newtypes ([`ExecutionId`], [`StepId`], [`JournalSeq`], //! [`IdempotencyKey`], [`PromiseId`], [`TimerId`]) and the [`ExecutionKind`] discriminator. @@ -32,8 +32,15 @@ //! `[durable]` TOML section. //! - [`error`] — the crate-wide [`DurableError`]. //! -//! No execution behavior exists yet: the journal writer, execution backends, replay cursor, and -//! the durable step primitive land in follow-up issues. +//! Persistence engine: +//! +//! - [`backend`] — the sealed [`ExecutionBackend`] trait, [`BackendCapabilities`], the +//! [`DurableBackendEnum`] enum dispatcher, and [`LocalBackend`] (a dedicated `durable.db` pool). +//! - [`writer`] — the background [`JournalWriter`] actor and its cloneable +//! [`JournalWriterHandle`]: group-commit for buffered appends, flush-before-commit ACKs for +//! exactly-once entries, and `MAX(seq)` restart resume. +//! +//! The replay cursor and the durable step primitive build on these in follow-up issues. //! //! # Schema ownership //! 
@@ -57,16 +64,19 @@ mod sealed; +pub mod backend; pub mod cipher; pub mod config; pub mod effect; pub mod error; pub mod ids; pub mod journal; +pub mod writer; #[doc(hidden)] pub use sealed::Sealed; +pub use backend::{BackendCapabilities, DurableBackendEnum, ExecutionBackend, LocalBackend}; pub use cipher::{ CipherError, EntryKindTag, PayloadAad, PayloadCipher, ensure_payload_within_limit, }; @@ -75,3 +85,4 @@ pub use effect::EffectClass; pub use error::DurableError; pub use ids::{ExecutionId, ExecutionKind, IdempotencyKey, JournalSeq, PromiseId, StepId, TimerId}; pub use journal::{EntryKind, ExecutionStatus, Journal, JournalEntry}; +pub use writer::{JournalWriter, JournalWriterHandle}; diff --git a/crates/zeph-durable/src/writer.rs b/crates/zeph-durable/src/writer.rs new file mode 100644 index 000000000..7cb1c9220 --- /dev/null +++ b/crates/zeph-durable/src/writer.rs @@ -0,0 +1,437 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! The background journal-writer actor. +//! +//! Every durable append routes through a single [`JournalWriter`] task so the calling path never +//! blocks on a database write and writes are serialized into a monotonic [`JournalSeq`]. Callers +//! hold a cheap, cloneable [`JournalWriterHandle`] and choose one of two durability classes: +//! +//! - **Buffered** ([`append_buffered`](JournalWriterHandle::append_buffered)) — fire-and-forget for +//! `Idempotent`/`AtLeastOnce` effects. Entries accumulate and group-commit on a flush interval, +//! amortizing the WAL fsync. On a full channel the entry is dropped with a `WARN`: a lost buffered +//! entry simply re-runs on resume, which is safe by class definition (the durability-on-return +//! guarantee, spec C-N1). +//! - **Acked** ([`append_acked`](JournalWriterHandle::append_acked)) — for `ExactlyOnceGuarded` +//! intents and results. The writer flushes all causally-preceding buffered entries first (INV-4), +//! 
commits the entry, and only then returns its [`JournalSeq`] over a oneshot. The call is bounded +//! by `journal_ack_timeout_ms`: a stalled or unreachable writer yields +//! [`DurableError::JournalUnavailable`] rather than blocking the agent loop (INV-12, FR-DE-11). +//! +//! # Supervision and restart +//! +//! [`JournalWriter::run`] is the actor future; the daemon spawns it under a `TaskSupervisor` +//! (spec-039). On every (re)start the writer reads `MAX(seq)` to anchor itself at the last +//! committed entry (FR-DE-12); because `seq` is database-assigned, resumed appends continue without +//! gap or duplication. The writer is bound to the local backend's `durable.db` — Restate journals +//! through its own SDK and does not use this actor. + +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::{mpsc, oneshot}; + +use crate::backend::local::LocalBackend; +use crate::config::DurableConfig; +use crate::error::DurableError; +use crate::ids::JournalSeq; +use crate::journal::{Journal, JournalEntry}; + +/// Bounded capacity of the writer's command channel (1024, per the spec's channel-capacity rule). +const CHANNEL_CAPACITY: usize = 1024; + +/// Maximum entries buffered before an early group-commit, bounding the actor's memory between ticks. +const MAX_BATCH: usize = 256; + +/// A command sent to the [`JournalWriter`] task. +/// +/// Buffered appends are fire-and-forget; acked appends and flushes carry a oneshot the calling task +/// awaits. This is the actor's internal protocol — callers use [`JournalWriterHandle`]. +pub(crate) enum JournalMsg { + /// Append an `Idempotent`/`AtLeastOnce` entry; group-committed, droppable under backpressure. + AppendBuffered(JournalEntry), + /// Append an `ExactlyOnceGuarded` entry; flushed-before-committed and acknowledged by seq. + AppendAcked( + JournalEntry, + oneshot::Sender>, + ), + /// Drain all buffered entries and acknowledge — a turn-boundary barrier.
+ Flush(oneshot::Sender<()>), +} + +/// The background actor that owns the write path to a [`LocalBackend`]'s `durable.db`. +/// +/// Construct it with [`JournalWriter::new`] (which also returns the handle), then drive it with +/// [`JournalWriter::run`] on a supervised task. +#[derive(Debug)] +pub struct JournalWriter { + backend: Arc, + rx: mpsc::Receiver, + flush_interval: Duration, + max_batch: usize, +} + +impl JournalWriter { + /// Build the writer and its cloneable handle from a backend and the durable configuration. + /// + /// The flush interval and ACK timeout are taken from `config`; the channel is bounded at the + /// spec capacity. Spawn [`JournalWriter::run`] to start processing. + /// + /// # Examples + /// + /// ```no_run + /// # async fn run() -> Result<(), Box> { + /// use std::sync::Arc; + /// use zeph_durable::{DurableConfig, LocalBackend, JournalWriter}; + /// + /// let backend = Arc::new(LocalBackend::open("durable.db", 1_048_576).await?); + /// backend.init().await?; + /// let (writer, handle) = JournalWriter::new(backend, &DurableConfig::default()); + /// let task = tokio::spawn(writer.run()); + /// // ... use `handle` to append; drop all handles to stop the writer ... + /// # let _ = (task, handle); + /// # Ok(()) } + /// ``` + #[must_use] + pub fn new(backend: Arc, config: &DurableConfig) -> (Self, JournalWriterHandle) { + let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY); + let handle = JournalWriterHandle { + tx, + ack_timeout: Duration::from_millis(config.journal_ack_timeout_ms), + }; + let writer = Self { + backend, + rx, + // Tokio's interval panics on a zero period; clamp to at least 1 ms. + flush_interval: Duration::from_millis(config.journal_flush_interval_ms.max(1)), + max_batch: MAX_BATCH, + }; + (writer, handle) + } + + /// Run the actor loop until every [`JournalWriterHandle`] is dropped. + /// + /// On entry the writer reads `MAX(seq)` to resume from the last committed entry (FR-DE-12). 
It + /// then group-commits buffered entries on each flush tick (or when the batch fills), flushes + /// before every acked commit (INV-4), and emits a `durable.journal.writer.queue_depth` gauge per + /// commit cycle. When the channel closes it drains any remaining buffered entries and returns, + /// so the supervisor can restart it cleanly. + pub async fn run(mut self) { + let resume = match self.backend.max_seq().await { + Ok(seq) => seq, + Err(error) => { + tracing::error!(%error, "journal writer could not read resume seq; starting at 0"); + None + } + }; + tracing::info!( + resume_seq = resume.map(JournalSeq::value), + "journal writer started" + ); + + let mut buffer: Vec = Vec::new(); + let mut flush = tokio::time::interval(self.flush_interval); + flush.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + tokio::select! { + maybe_msg = self.rx.recv() => match maybe_msg { + Some(JournalMsg::AppendBuffered(entry)) => { + buffer.push(entry); + if buffer.len() >= self.max_batch { + self.flush_buffer(&mut buffer).await; + } + } + Some(JournalMsg::AppendAcked(entry, reply)) => { + // INV-4: every causally-preceding buffered entry is durable before the + // exactly-once entry commits. + // NOTE(review): `flush_buffer` logs and drops the batch when the group-commit + // fails, so the acked entry below can still commit after its predecessors + // were lost. Confirm that INV-4 only requires ordering, or fail the acked + // append when the preceding flush fails. + self.flush_buffer(&mut buffer).await; + let result = self.backend.append(entry).await; + let _ = reply.send(result); + } + Some(JournalMsg::Flush(reply)) => { + // The reply carries no status: a failed group-commit is only logged by + // `flush_buffer`, and the caller is still acknowledged. + self.flush_buffer(&mut buffer).await; + let _ = reply.send(()); + } + None => { + self.flush_buffer(&mut buffer).await; + break; + } + }, + _ = flush.tick() => { + self.flush_buffer(&mut buffer).await; + } + } + } + tracing::info!("journal writer stopped"); + } + + /// Group-commit and clear the buffer, emitting the queue-depth gauge for the cycle. + /// + /// A failed group-commit drops the buffered entries with a `WARN` (they re-run safely on + /// resume) rather than wedging the actor.
+ async fn flush_buffer(&self, buffer: &mut Vec) { + if buffer.is_empty() { + return; + } + let depth = u32::try_from(buffer.len()).unwrap_or(u32::MAX); + metrics::gauge!("durable.journal.writer.queue_depth").set(f64::from(depth)); + if let Err(error) = self.backend.append_batch(buffer).await { + tracing::warn!( + %error, + dropped = buffer.len(), + "journal group-commit failed; buffered entries dropped (re-run safely on resume)" + ); + } + buffer.clear(); + } +} + +/// A cheap, cloneable handle to a [`JournalWriter`]. +/// +/// Cloning shares the same underlying channel; the writer stops once the last handle is dropped. +#[derive(Clone, Debug)] +pub struct JournalWriterHandle { + tx: mpsc::Sender, + ack_timeout: Duration, +} + +impl JournalWriterHandle { + /// Enqueue a buffered, fire-and-forget append. + /// + /// Returns immediately. On a full channel the entry is dropped with a `WARN` (acceptable for + /// `Idempotent`/`AtLeastOnce` effects, which re-run safely on resume); on a stopped writer it is + /// likewise dropped. Use [`append_acked`](Self::append_acked) when durability-on-return matters. + pub fn append_buffered(&self, entry: JournalEntry) { + match self.tx.try_send(JournalMsg::AppendBuffered(entry)) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + "journal writer channel full; dropping buffered entry (re-runs safely on resume)" + ); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + tracing::warn!("journal writer stopped; dropping buffered entry"); + } + } + } + + /// Append an exactly-once entry and await its committed [`JournalSeq`]. + /// + /// The writer flushes all causally-preceding buffered entries before committing this one + /// (INV-4). The whole round-trip is bounded by `journal_ack_timeout_ms`. 
+ /// + /// # Errors + /// + /// Returns [`DurableError::JournalUnavailable`] if the writer does not acknowledge within the + /// timeout or is unreachable, and propagates a backend [`DurableError`] if the commit itself + /// fails. The caller never blocks indefinitely (INV-12). + /// + /// A timeout does not cancel an entry that was already enqueued: the writer may still commit it + /// after this call has returned [`DurableError::JournalUnavailable`], so that error means the + /// outcome is unknown, not that the entry was rejected. + pub async fn append_acked(&self, entry: JournalEntry) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + let send_and_wait = async { + self.tx + .send(JournalMsg::AppendAcked(entry, reply_tx)) + .await + .map_err(|_| DurableError::JournalUnavailable)?; + match reply_rx.await { + Ok(result) => result, + Err(_) => Err(DurableError::JournalUnavailable), + } + }; + tokio::time::timeout(self.ack_timeout, send_and_wait) + .await + .unwrap_or(Err(DurableError::JournalUnavailable)) + } + + /// Drain all buffered entries to the database and await confirmation — a turn-boundary barrier. + /// + /// # Errors + /// + /// Returns [`DurableError::JournalUnavailable`] if the writer does not confirm within the + /// timeout or is unreachable. + /// + /// The writer confirms once it has attempted the group-commit; a failed commit is logged and its + /// entries dropped, so `Ok(())` does not guarantee that every buffered entry was persisted.
+ pub async fn flush(&self) -> Result<(), DurableError> { + let (reply_tx, reply_rx) = oneshot::channel(); + let send_and_wait = async { + self.tx + .send(JournalMsg::Flush(reply_tx)) + .await + .map_err(|_| DurableError::JournalUnavailable)?; + reply_rx.await.map_err(|_| DurableError::JournalUnavailable) + }; + tokio::time::timeout(self.ack_timeout, send_and_wait) + .await + .unwrap_or(Err(DurableError::JournalUnavailable)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::effect::EffectClass; + use crate::ids::{ExecutionId, ExecutionKind, IdempotencyKey, StepId}; + use crate::journal::EntryKind; + use bytes::Bytes; + + fn step_result(exec: ExecutionId, step: u32, payload: &[u8]) -> JournalEntry { + let step_id = StepId::new(step); + JournalEntry { + seq: None, + execution_id: exec, + kind: ExecutionKind::AgentTurn, + step_id, + entry: EntryKind::StepResult { + idempotency_key: IdempotencyKey::derive(exec, step_id, b"tool:read"), + payload: Bytes::copy_from_slice(payload), + effect: EffectClass::Idempotent, + payload_version: 1, + }, + created_at_ms: 100, + } + } + + #[tokio::test] + async fn append_acked_times_out_when_writer_is_stalled() { + // A live channel whose receiver is never polled: the oneshot is never answered, so the + // bounded ACK wait must elapse and surface JournalUnavailable rather than block forever. + let (tx, _rx) = mpsc::channel(4); + let handle = JournalWriterHandle { + tx, + ack_timeout: Duration::from_millis(50), + }; + let result = handle + .append_acked(step_result(ExecutionId::new(), 0, b"x")) + .await; + assert!(matches!(result, Err(DurableError::JournalUnavailable))); + } + + #[tokio::test] + async fn append_acked_errors_when_writer_is_gone() { + // Dropping the receiver closes the channel; the send fails fast (no need to wait the timeout). 
+ let (tx, rx) = mpsc::channel(4); + drop(rx); + let handle = JournalWriterHandle { + tx, + ack_timeout: Duration::from_secs(30), + }; + let result = handle + .append_acked(step_result(ExecutionId::new(), 0, b"x")) + .await; + assert!(matches!(result, Err(DurableError::JournalUnavailable))); + } + + #[tokio::test] + async fn append_buffered_drops_on_full_channel_without_blocking() { + let (tx, mut rx) = mpsc::channel(2); + let handle = JournalWriterHandle { + tx, + ack_timeout: Duration::from_millis(50), + }; + let exec = ExecutionId::new(); + // Three buffered sends into a capacity-2 channel: the third is dropped with a WARN, never + // blocks, and never errors (acceptable for re-runnable buffered entries). + handle.append_buffered(step_result(exec, 0, b"a")); + handle.append_buffered(step_result(exec, 1, b"b")); + handle.append_buffered(step_result(exec, 2, b"c")); + + let mut received = 0; + while rx.try_recv().is_ok() { + received += 1; + } + assert_eq!(received, 2, "the over-capacity buffered entry is dropped"); + } + + #[cfg(all(feature = "sqlite", not(feature = "postgres")))] + mod with_backend { + use super::*; + use crate::DurableConfig; + use crate::backend::local::LocalBackend; + use std::sync::Arc; + + async fn mem_backend() -> Arc { + let backend = LocalBackend::open(":memory:", 1_048_576).await.unwrap(); + backend.init().await.unwrap(); + Arc::new(backend) + } + + fn fast_config() -> DurableConfig { + DurableConfig { + journal_flush_interval_ms: 5, + journal_ack_timeout_ms: 2000, + ..DurableConfig::default() + } + } + + #[tokio::test] + async fn writer_group_commits_buffered_and_acks_exactly_once() { + let backend = mem_backend().await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + + let (writer, handle) = JournalWriter::new(backend.clone(), &fast_config()); + let task = tokio::spawn(writer.run()); + + handle.append_buffered(step_result(exec, 0, b"a")); + 
handle.append_buffered(step_result(exec, 1, b"b")); + // The acked append flushes the two buffered entries first (INV-4), then commits. + let seq = handle + .append_acked(step_result(exec, 2, b"c")) + .await + .unwrap(); + assert!(seq.value() >= 1); + handle.flush().await.unwrap(); + + let entries = backend.read_execution(exec).await.unwrap(); + assert_eq!( + entries.len(), + 3, + "all buffered and acked entries are durable" + ); + + drop(handle); + task.await.unwrap(); + } + + #[tokio::test] + async fn writer_resumes_from_max_seq_after_restart() { + let backend = mem_backend().await; + let exec = ExecutionId::new(); + backend + .open_execution(exec, ExecutionKind::AgentTurn) + .await + .unwrap(); + let config = fast_config(); + + // First writer commits three acked entries (seq 1, 2, 3), then stops. + let (writer1, handle1) = JournalWriter::new(backend.clone(), &config); + let task1 = tokio::spawn(writer1.run()); + for step in 0..3 { + handle1 + .append_acked(step_result(exec, step, b"x")) + .await + .unwrap(); + } + drop(handle1); + task1.await.unwrap(); + assert_eq!(backend.max_seq().await.unwrap(), Some(JournalSeq::new(3))); + + // A restarted writer resumes from MAX(seq); the next commit continues without a gap. + let (writer2, handle2) = JournalWriter::new(backend.clone(), &config); + let task2 = tokio::spawn(writer2.run()); + let seq4 = handle2 + .append_acked(step_result(exec, 3, b"y")) + .await + .unwrap(); + assert_eq!( + seq4.value(), + 4, + "resumed appends continue with neither gap nor duplication" + ); + + drop(handle2); + task2.await.unwrap(); + } + } +}