From c04a7aaba6cd9c406e1f6624b2a58ec6dccbad9b Mon Sep 17 00:00:00 2001 From: "Andrei G." Date: Sat, 6 Jun 2026 20:06:07 +0200 Subject: [PATCH] feat(durable): scaffold zeph-durable crate, core newtypes, journal schema Add the Layer-0 zeph-durable crate (spec-064) as the foundation for the native durable execution layer. This first slice is type-level only, with no runtime behavior. - ids: ExecutionId/PromiseId/TimerId (UUIDv7), StepId, JournalSeq, and IdempotencyKey (BLAKE3 derive_key, domain-separated, length-delimited injective input), plus the ExecutionKind discriminator. Private fields, smart constructors, serde-round-trip stable. - journal: the Journal trait (native async via RPITIT + Send), JournalEntry, the closed EntryKind enum (control entries carry no payload, so illegal states are unrepresentable), and ExecutionStatus. - effect: the EffectClass per-step side-effect contract. - config: pure-data DurableConfig + RetentionPolicy with container-level serde defaults mirroring the [durable] TOML section. - error: DurableError (non_exhaustive, metadata-only messages). - sealed: Sealed token for the future backend hierarchy. Schema: durable_executions/journal/promises/timers added as numbered migrations 097-100 in zeph-db/migrations/{sqlite,postgres} (matching file counts, schema-equivalent). zeph-durable owns no .sql and no sqlx::migrate!; migrations are applied via zeph_db::run_migrations against the dedicated durable pool. No business-layer dependencies (INV-1). Migration ownership stays in zeph-db (INV-14). Part of #4707. Closes #4944. 
--- CHANGELOG.md | 15 + Cargo.lock | 14 + Cargo.toml | 1 + .../postgres/097_durable_executions.sql | 13 + .../postgres/098_durable_journal.sql | 27 ++ .../postgres/099_durable_promises.sql | 11 + .../postgres/100_durable_timers.sql | 10 + .../sqlite/097_durable_executions.sql | 13 + .../migrations/sqlite/098_durable_journal.sql | 27 ++ .../sqlite/099_durable_promises.sql | 11 + .../migrations/sqlite/100_durable_timers.sql | 10 + crates/zeph-durable/Cargo.toml | 35 ++ crates/zeph-durable/README.md | 121 ++++++ crates/zeph-durable/src/config.rs | 199 +++++++++ crates/zeph-durable/src/effect.rs | 64 +++ crates/zeph-durable/src/error.rs | 96 ++++ crates/zeph-durable/src/ids.rs | 411 ++++++++++++++++++ crates/zeph-durable/src/journal.rs | 336 ++++++++++++++ crates/zeph-durable/src/lib.rs | 71 +++ crates/zeph-durable/src/sealed.rs | 20 + 20 files changed, 1505 insertions(+) create mode 100644 crates/zeph-db/migrations/postgres/097_durable_executions.sql create mode 100644 crates/zeph-db/migrations/postgres/098_durable_journal.sql create mode 100644 crates/zeph-db/migrations/postgres/099_durable_promises.sql create mode 100644 crates/zeph-db/migrations/postgres/100_durable_timers.sql create mode 100644 crates/zeph-db/migrations/sqlite/097_durable_executions.sql create mode 100644 crates/zeph-db/migrations/sqlite/098_durable_journal.sql create mode 100644 crates/zeph-db/migrations/sqlite/099_durable_promises.sql create mode 100644 crates/zeph-db/migrations/sqlite/100_durable_timers.sql create mode 100644 crates/zeph-durable/Cargo.toml create mode 100644 crates/zeph-durable/README.md create mode 100644 crates/zeph-durable/src/config.rs create mode 100644 crates/zeph-durable/src/effect.rs create mode 100644 crates/zeph-durable/src/error.rs create mode 100644 crates/zeph-durable/src/ids.rs create mode 100644 crates/zeph-durable/src/journal.rs create mode 100644 crates/zeph-durable/src/lib.rs create mode 100644 crates/zeph-durable/src/sealed.rs diff --git a/CHANGELOG.md 
b/CHANGELOG.md index f8fb743e1..112c7ad45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] +### Added + +- `feat(durable)`: scaffolded the new Layer-0 `zeph-durable` crate (spec-064) — the foundation of + the native durable execution layer. This first slice is type-level only, with no runtime + behavior: journal-boundary newtypes (`ExecutionId`/`PromiseId`/`TimerId` as UUIDv7, `StepId`, + `JournalSeq`, `IdempotencyKey`, plus the `ExecutionKind` discriminator), the `Journal` trait and + its `JournalEntry`/`EntryKind`/`ExecutionStatus` data model, the `EffectClass` side-effect + contract, the pure-data `DurableConfig`/`RetentionPolicy` mirroring `[durable]` TOML (all + spec-default-backed), and the `DurableError` type. `IdempotencyKey::derive` uses BLAKE3 + `derive_key` with a domain-separation context and length-delimited (injective) input. The crate + is pure infrastructure with no business-layer dependencies (INV-1). The four `durable_*` schema + tables (`durable_executions`, `durable_journal`, `durable_promises`, `durable_timers`) were added + as numbered migrations `097`–`100` in both `zeph-db/migrations/sqlite/` and `.../postgres/`; + `zeph-durable` owns no `.sql` files and no `sqlx::migrate!` (INV-14). 
(#4944) + ### Fixed - `perf(acp)`: `list_directory` and `find_path` tools now offload their synchronous diff --git a/Cargo.lock b/Cargo.lock index 18db8392a..1320f9f4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10854,6 +10854,20 @@ dependencies = [ "zeph-common", ] +[[package]] +name = "zeph-durable" +version = "0.21.4" +dependencies = [ + "blake3", + "bytes", + "serde", + "serde_json", + "thiserror 2.0.18", + "toml 1.1.2+spec-1.1.0", + "uuid", + "zeph-db", +] + [[package]] name = "zeph-experiments" version = "0.21.4" diff --git a/Cargo.toml b/Cargo.toml index 7b2455a70..2e93316a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ base64 = "0.22.1" bech32 = "0.11.1" blake3 = "1.8.5" bytemuck = "1.25" +bytes = "1.11.1" candle-core = { version = "0.10.2", default-features = false } candle-nn = { version = "0.10.2", default-features = false } candle-transformers = { version = "0.10.2", default-features = false } diff --git a/crates/zeph-db/migrations/postgres/097_durable_executions.sql b/crates/zeph-db/migrations/postgres/097_durable_executions.sql new file mode 100644 index 000000000..fe856b1be --- /dev/null +++ b/crates/zeph-db/migrations/postgres/097_durable_executions.sql @@ -0,0 +1,13 @@ +-- Durable execution layer (spec-064, #4944): one row per durable execution. +-- Applied via zeph_db::run_migrations against the dedicated durable pool (INV-14). +-- The owning zeph-durable crate holds no .sql files and no sqlx::migrate!. +CREATE TABLE durable_executions ( + execution_id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + status TEXT NOT NULL CHECK(status IN ('running', 'completed', 'failed', 'aborted')), + created_at BIGINT NOT NULL, + updated_at BIGINT NOT NULL, + finalized_at BIGINT -- NULL until terminal; drives retention. 
+); + +CREATE INDEX idx_durable_exec_status_time ON durable_executions(status, finalized_at); diff --git a/crates/zeph-db/migrations/postgres/098_durable_journal.sql b/crates/zeph-db/migrations/postgres/098_durable_journal.sql new file mode 100644 index 000000000..2bc9dd7d3 --- /dev/null +++ b/crates/zeph-db/migrations/postgres/098_durable_journal.sql @@ -0,0 +1,27 @@ +-- Durable execution layer (spec-064, #4944): append-only journal entries. +-- payload holds AEAD-sealed bytes (nonce || ciphertext || tag); control entries leave it NULL. +CREATE TABLE durable_journal ( + seq BIGSERIAL PRIMARY KEY, -- global append order (durability anchor) + execution_id TEXT NOT NULL REFERENCES durable_executions(execution_id), + step_id BIGINT NOT NULL, + entry_kind TEXT NOT NULL, + idem_key BYTEA, -- IdempotencyKey (32B); NULL for non-step entries + effect_class TEXT, + payload BYTEA, -- AEAD-sealed; NULL for control entries + payload_version INTEGER, + hmac BYTEA, -- row-level HMAC for shared-DB / Restate + created_at BIGINT NOT NULL +); + +CREATE INDEX idx_durable_journal_exec_step + ON durable_journal(execution_id, step_id, seq); + +-- Enforce at most one committed result per step (defense in depth alongside the writer). +CREATE UNIQUE INDEX idx_durable_journal_result + ON durable_journal(execution_id, step_id) + WHERE entry_kind = 'step_result'; + +-- Efficient exactly-once intent lookup ("does this intent already exist?"). +CREATE INDEX idx_durable_journal_idem_key + ON durable_journal(execution_id, idem_key) + WHERE idem_key IS NOT NULL; diff --git a/crates/zeph-db/migrations/postgres/099_durable_promises.sql b/crates/zeph-db/migrations/postgres/099_durable_promises.sql new file mode 100644 index 000000000..d5a89ca60 --- /dev/null +++ b/crates/zeph-db/migrations/postgres/099_durable_promises.sql @@ -0,0 +1,11 @@ +-- Durable execution layer (spec-064, #4944): external-completion handles. +-- The 32-byte resolver token is never stored; only its BLAKE3 hash (INV-9). 
+CREATE TABLE durable_promises ( + promise_id TEXT PRIMARY KEY, + execution_id TEXT NOT NULL REFERENCES durable_executions(execution_id), + resolver_token_hash BYTEA NOT NULL, + resolved INTEGER NOT NULL DEFAULT 0, + payload BYTEA, + created_at BIGINT NOT NULL, + resolved_at BIGINT +); diff --git a/crates/zeph-db/migrations/postgres/100_durable_timers.sql b/crates/zeph-db/migrations/postgres/100_durable_timers.sql new file mode 100644 index 000000000..2d8ee6f43 --- /dev/null +++ b/crates/zeph-db/migrations/postgres/100_durable_timers.sql @@ -0,0 +1,10 @@ +-- Durable execution layer (spec-064, #4944): durable wakes persisted across restarts. +CREATE TABLE durable_timers ( + timer_id TEXT PRIMARY KEY, + execution_id TEXT NOT NULL REFERENCES durable_executions(execution_id), + due_at BIGINT NOT NULL, + fired INTEGER NOT NULL DEFAULT 0, + created_at BIGINT NOT NULL +); + +CREATE INDEX idx_durable_timers_due ON durable_timers(fired, due_at); diff --git a/crates/zeph-db/migrations/sqlite/097_durable_executions.sql b/crates/zeph-db/migrations/sqlite/097_durable_executions.sql new file mode 100644 index 000000000..2dc440c87 --- /dev/null +++ b/crates/zeph-db/migrations/sqlite/097_durable_executions.sql @@ -0,0 +1,13 @@ +-- Durable execution layer (spec-064, #4944): one row per durable execution. +-- Applied via zeph_db::run_migrations against the dedicated durable.db pool (INV-14). +-- The owning zeph-durable crate holds no .sql files and no sqlx::migrate!. +CREATE TABLE durable_executions ( + execution_id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + status TEXT NOT NULL CHECK(status IN ('running', 'completed', 'failed', 'aborted')), + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + finalized_at INTEGER -- NULL until terminal; drives retention. 
+); + +CREATE INDEX idx_durable_exec_status_time ON durable_executions(status, finalized_at); diff --git a/crates/zeph-db/migrations/sqlite/098_durable_journal.sql b/crates/zeph-db/migrations/sqlite/098_durable_journal.sql new file mode 100644 index 000000000..05a8fa861 --- /dev/null +++ b/crates/zeph-db/migrations/sqlite/098_durable_journal.sql @@ -0,0 +1,27 @@ +-- Durable execution layer (spec-064, #4944): append-only journal entries. +-- payload holds AEAD-sealed bytes (nonce || ciphertext || tag); control entries leave it NULL. +CREATE TABLE durable_journal ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, -- global append order (durability anchor) + execution_id TEXT NOT NULL REFERENCES durable_executions(execution_id), + step_id INTEGER NOT NULL, + entry_kind TEXT NOT NULL, + idem_key BLOB, -- IdempotencyKey (32B); NULL for non-step entries + effect_class TEXT, + payload BLOB, -- AEAD-sealed; NULL for control entries + payload_version INTEGER, + hmac BLOB, -- row-level HMAC for shared-DB / Restate + created_at INTEGER NOT NULL +); + +CREATE INDEX idx_durable_journal_exec_step + ON durable_journal(execution_id, step_id, seq); + +-- Enforce at most one committed result per step (defense in depth alongside the writer). +CREATE UNIQUE INDEX idx_durable_journal_result + ON durable_journal(execution_id, step_id) + WHERE entry_kind = 'step_result'; + +-- Efficient exactly-once intent lookup ("does this intent already exist?"). +CREATE INDEX idx_durable_journal_idem_key + ON durable_journal(execution_id, idem_key) + WHERE idem_key IS NOT NULL; diff --git a/crates/zeph-db/migrations/sqlite/099_durable_promises.sql b/crates/zeph-db/migrations/sqlite/099_durable_promises.sql new file mode 100644 index 000000000..6f96b4aad --- /dev/null +++ b/crates/zeph-db/migrations/sqlite/099_durable_promises.sql @@ -0,0 +1,11 @@ +-- Durable execution layer (spec-064, #4944): external-completion handles. +-- The 32-byte resolver token is never stored; only its BLAKE3 hash (INV-9). 
+CREATE TABLE durable_promises ( + promise_id TEXT PRIMARY KEY, + execution_id TEXT NOT NULL REFERENCES durable_executions(execution_id), + resolver_token_hash BLOB NOT NULL, + resolved INTEGER NOT NULL DEFAULT 0, + payload BLOB, + created_at INTEGER NOT NULL, + resolved_at INTEGER +); diff --git a/crates/zeph-db/migrations/sqlite/100_durable_timers.sql b/crates/zeph-db/migrations/sqlite/100_durable_timers.sql new file mode 100644 index 000000000..c42cfcf49 --- /dev/null +++ b/crates/zeph-db/migrations/sqlite/100_durable_timers.sql @@ -0,0 +1,10 @@ +-- Durable execution layer (spec-064, #4944): durable wakes persisted across restarts. +CREATE TABLE durable_timers ( + timer_id TEXT PRIMARY KEY, + execution_id TEXT NOT NULL REFERENCES durable_executions(execution_id), + due_at INTEGER NOT NULL, + fired INTEGER NOT NULL DEFAULT 0, + created_at INTEGER NOT NULL +); + +CREATE INDEX idx_durable_timers_due ON durable_timers(fired, due_at); diff --git a/crates/zeph-durable/Cargo.toml b/crates/zeph-durable/Cargo.toml new file mode 100644 index 000000000..6661f1457 --- /dev/null +++ b/crates/zeph-durable/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "zeph-durable" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +keywords.workspace = true +categories.workspace = true +publish.workspace = true +description = "Native durable execution layer for Zeph: journaled control flow with crash-resume" +readme = "README.md" + +[features] +# Backend selection forwarded to zeph-db (mutually exclusive, mirrors zeph-scheduler). +# default = ["sqlite"] keeps standalone `cargo check -p zeph-durable` and rust-analyzer working. 
+default = ["sqlite"] +sqlite = ["zeph-db/sqlite"] +postgres = ["zeph-db/postgres"] + +[dependencies] +blake3.workspace = true +bytes.workspace = true +serde = { workspace = true, features = ["derive"] } +thiserror.workspace = true +uuid = { workspace = true, features = ["serde", "v7"] } +zeph-db.workspace = true + +[dev-dependencies] +serde_json.workspace = true +toml.workspace = true + +[lints] +workspace = true diff --git a/crates/zeph-durable/README.md b/crates/zeph-durable/README.md new file mode 100644 index 000000000..ad84f85bf --- /dev/null +++ b/crates/zeph-durable/README.md @@ -0,0 +1,121 @@ +# zeph-durable + +[![Crates.io](https://img.shields.io/crates/v/zeph-durable)](https://crates.io/crates/zeph-durable) +[![docs.rs](https://img.shields.io/docsrs/zeph-durable)](https://docs.rs/zeph-durable) +[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](../../LICENSE) +[![MSRV](https://img.shields.io/badge/MSRV-1.95-blue)](https://www.rust-lang.org) + +Native durable execution layer for [Zeph](https://github.com/bug-ops/zeph) — journals the *control +flow* of an execution (steps, promises, timers) so a crashed or interrupted run can resume at the +point of failure instead of restarting from scratch. + +> [!IMPORTANT] +> This crate is a **foundational scaffold** (spec-064, issue #4944). It currently exposes +> *type-level* building blocks only — there is **no execution behavior yet**. The journal writer, +> execution backends, replay cursor, and the durable step primitive land in follow-up issues of +> epic [#4707](https://github.com/bug-ops/zeph/issues/4707). + +## Overview + +`zeph-durable` is a Layer-0 infrastructure crate, analogous to `zeph-db` and `zeph-common`. It is a +pure infrastructure primitive: it sees opaque serialized payloads, never domain types. Domain +meaning lives in thin adapter modules inside each consuming crate (the agent tool-loop, +orchestration, scheduler, and subagent layers). 
+ +The eventual design provides a `DurableContext` facade (`step()` / `parallel()` / `promise()` / +`sleep_until()`), an explicit `EffectClass` contract per step, a background journal-writer actor +with group-commit, AEAD payload encryption, and a fingerprint-guarded replay cursor — all backed by +a dedicated `durable.db` (SQLite) or a feature-gated Restate backend. + +## Key Modules + +- **ids** — journal-boundary newtypes: `ExecutionId` / `PromiseId` / `TimerId` (UUIDv7), `StepId`, + `JournalSeq`, `IdempotencyKey`, and the `ExecutionKind` discriminator. Private fields, smart + constructors, serde-round-trip stable. +- **journal** — the `Journal` trait plus its data model: `JournalEntry`, the closed `EntryKind` + enum, and `ExecutionStatus`. +- **effect** — `EffectClass`, the per-step side-effect contract (`Idempotent` / `AtLeastOnce` / + `ExactlyOnceGuarded`). +- **config** — pure-data `DurableConfig` and `RetentionPolicy` mirroring the `[durable]` TOML + section, with spec defaults applied on deserialization. +- **error** — the crate-wide `DurableError`. + +## Architecture & invariants + +- **Layer 0, no business-logic dependencies (INV-1).** `zeph-durable` MUST NOT depend on + `zeph-llm`, `zeph-memory`, `zeph-core`, `zeph-sanitizer`, or any business-layer crate. Its only + workspace dependency is `zeph-db`. +- **Closed enums make illegal states unrepresentable.** Control entries (`EffectIntent`, + `PromiseCreated`, `TimerArmed`) carry no payload field — a "control entry with payload" cannot be + constructed. +- **Domain-separated idempotency keys.** `IdempotencyKey::derive` uses BLAKE3 `derive_key` with a + fixed context string and length-delimited (injective) input, so an attacker-controlled + fingerprint cannot collide with a different `(execution_id, step_id)` pair. + +> [!NOTE] +> **Schema ownership (INV-14).** `zeph-durable` owns **no** `.sql` files and **no** +> `sqlx::migrate!`. 
The four `durable_*` tables (`durable_executions`, `durable_journal`, +> `durable_promises`, `durable_timers`) live as numbered migrations in +> `zeph-db/migrations/{sqlite,postgres}/` and are applied via `zeph_db::run_migrations` against a +> dedicated `durable.db` pool. + +## Installation + +This crate is an internal workspace member of Zeph. To use it from another workspace crate: + +```toml +[dependencies] +zeph-durable = { path = "../zeph-durable" } +# or with the postgres backend: +zeph-durable = { path = "../zeph-durable", default-features = false, features = ["postgres"] } +``` + +## Feature Flags + +Backend selection is forwarded to `zeph-db`; exactly one backend is active at a time. + +| Feature | Description | Default | +|---------|-------------|---------| +| `sqlite` | Enables the SQLite backend via `zeph-db/sqlite` | Yes | +| `postgres` | Enables the PostgreSQL backend via `zeph-db/postgres` | No | + +> [!WARNING] +> `sqlite` and `postgres` are mutually exclusive (enforced by `zeph-db`). Building with +> `--all-features` is intentionally unsupported — use `--features full` or `--features full,postgres`. 
+ +## Usage + +Idempotency keys are deterministic for a given `(execution, step, fingerprint)` and domain-separated +from any other BLAKE3 use: + +```rust +use zeph_durable::{ExecutionId, IdempotencyKey, StepId}; + +let execution = ExecutionId::new(); // fresh, time-ordered UUIDv7 + +let key = IdempotencyKey::derive(execution, StepId::new(0), b"tool:read_file"); +assert_eq!( + key, + IdempotencyKey::derive(execution, StepId::new(0), b"tool:read_file"), +); +``` + +Configuration deserializes from the `[durable]` TOML table with every field defaulted to its spec +value: + +```rust +use zeph_durable::DurableConfig; + +let cfg: DurableConfig = toml::from_str("").unwrap(); // empty table => all defaults +assert!(!cfg.enabled); +assert_eq!(cfg.journal_ack_timeout_ms, 5_000); +assert_eq!(cfg.max_payload_bytes, 1_048_576); +``` + +## MSRV + +Rust **1.95** (Edition 2024, resolver 3). + +## License + +MIT — see [LICENSE](../../LICENSE). diff --git a/crates/zeph-durable/src/config.rs b/crates/zeph-durable/src/config.rs new file mode 100644 index 000000000..c059857f8 --- /dev/null +++ b/crates/zeph-durable/src/config.rs @@ -0,0 +1,199 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Pure-data configuration mirroring the `[durable]` TOML section. +//! +//! Every field carries a spec default via the container-level `#[serde(default)]` attribute backed +//! by [`Default`], so deserializing an empty table yields a fully-populated, spec-compliant +//! configuration. No credentials appear inline — the AEAD key and any Restate endpoints are +//! resolved from the vault by key name (spec-038 vault contract), never stored here. + +use serde::{Deserialize, Serialize}; + +/// Which journal backend an execution uses. 
+/// +/// `Restate` is only meaningful when the `restate` feature and an external Restate server are +/// available; the variant is accepted in configuration regardless so a config can be authored +/// ahead of the backend being compiled in. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum DurableBackend { + /// Dedicated `durable.db` SQLite/Postgres file managed in-process. The default. + #[default] + Local, + /// External Restate server (feature-gated, server deployments only). + Restate, +} + +/// Configuration for the durable execution layer (`[durable]`). +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::DurableConfig; +/// +/// // An empty table deserializes to the spec defaults. +/// let cfg: DurableConfig = toml::from_str("").unwrap(); +/// assert!(!cfg.enabled); +/// assert_eq!(cfg.journal_ack_timeout_ms, 5000); +/// assert_eq!(cfg.max_payload_bytes, 1_048_576); +/// ``` +#[allow(clippy::struct_excessive_bools)] // config struct — boolean flags are idiomatic for TOML-deserialized configuration +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct DurableConfig { + /// Master opt-in. When `false`, no journal is opened and behavior is identical to a build + /// without the durable layer. + pub enabled: bool, + /// Selected journal backend. + pub backend: DurableBackend, + /// Encrypt payloads with AEAD. A `false` value is a development-only override (it emits a + /// startup warning) and is forbidden for non-local backends (INV-8). + pub encrypt_payload: bool, + /// P1 adapter: wrap agent-loop steps in durable steps. + pub agent_turns: bool, + /// P2 adapter: journal the orchestration `/plan resume` replan budget. + pub orchestration: bool, + /// P3 adapter: exactly-once scheduler job fire. + pub scheduler: bool, + /// P4 adapter: durable promise for subagent spawn/await. 
+ pub subagent: bool, + /// Group-commit interval for buffered appends, in milliseconds. + pub journal_flush_interval_ms: u64, + /// Timeout for an acknowledged append before degrading to non-durable mode, in milliseconds. + pub journal_ack_timeout_ms: u64, + /// In-execution step cap (soft fold at 90%, hard abort at 100%). + pub max_steps_per_execution: u32, + /// Maximum payload size in bytes, enforced on both append and read. + pub max_payload_bytes: u64, + /// Database fallback poll interval for parked promises, in seconds. + pub promise_poll_interval_secs: u64, + /// Above this many parked promises, resolution falls back to pure polling. + pub max_parked_promises: u32, + /// Journal retention and compaction policy (`[durable.retention]`). + pub retention: RetentionPolicy, +} + +impl Default for DurableConfig { + fn default() -> Self { + Self { + enabled: false, + backend: DurableBackend::Local, + encrypt_payload: true, + agent_turns: true, + orchestration: true, + scheduler: true, + subagent: true, + journal_flush_interval_ms: 10, + journal_ack_timeout_ms: 5000, + max_steps_per_execution: 10_000, + max_payload_bytes: 1_048_576, + promise_poll_interval_secs: 2, + max_parked_promises: 1000, + retention: RetentionPolicy::default(), + } + } +} + +/// Journal retention and compaction policy (`[durable.retention]`). +/// +/// Drives the background prune sweep, which never runs on the dispatch hot path. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::RetentionPolicy; +/// +/// let policy = RetentionPolicy::default(); +/// assert_eq!(policy.ttl_completed_secs, 604_800); // 7 days +/// assert_eq!(policy.ttl_failed_secs, 2_592_000); // 30 days +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct RetentionPolicy { + /// Prune completed executions older than this, in seconds. + pub ttl_completed_secs: u64, + /// Prune failed or aborted executions older than this, in seconds. 
+ pub ttl_failed_secs: u64, + /// LRU cap on the number of stored executions. + pub max_executions: u64, + /// Size cap on the journal in bytes; exceeding it triggers an LRU sweep. + pub max_journal_bytes: u64, + /// Rows deleted per transaction during a prune sweep; the task yields between batches. + pub prune_batch_size: u64, + /// Background prune poll interval, in seconds. + pub prune_interval_secs: u64, +} + +impl Default for RetentionPolicy { + fn default() -> Self { + Self { + ttl_completed_secs: 604_800, + ttl_failed_secs: 2_592_000, + max_executions: 10_000, + max_journal_bytes: 1_073_741_824, + prune_batch_size: 500, + prune_interval_secs: 3600, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_table_yields_every_spec_default() { + let cfg: DurableConfig = toml::from_str("").unwrap(); + assert!(!cfg.enabled); + assert_eq!(cfg.backend, DurableBackend::Local); + assert!(cfg.encrypt_payload); + assert!(cfg.agent_turns); + assert!(cfg.orchestration); + assert!(cfg.scheduler); + assert!(cfg.subagent); + assert_eq!(cfg.journal_flush_interval_ms, 10); + assert_eq!(cfg.journal_ack_timeout_ms, 5000); + assert_eq!(cfg.max_steps_per_execution, 10_000); + assert_eq!(cfg.max_payload_bytes, 1_048_576); + assert_eq!(cfg.promise_poll_interval_secs, 2); + assert_eq!(cfg.max_parked_promises, 1000); + } + + #[test] + fn empty_table_yields_retention_defaults() { + let cfg: DurableConfig = toml::from_str("").unwrap(); + assert_eq!(cfg.retention.ttl_completed_secs, 604_800); + assert_eq!(cfg.retention.ttl_failed_secs, 2_592_000); + assert_eq!(cfg.retention.max_executions, 10_000); + assert_eq!(cfg.retention.max_journal_bytes, 1_073_741_824); + assert_eq!(cfg.retention.prune_batch_size, 500); + assert_eq!(cfg.retention.prune_interval_secs, 3600); + } + + #[test] + fn default_impl_matches_serde_default() { + let from_toml: DurableConfig = toml::from_str("").unwrap(); + assert_eq!(from_toml, DurableConfig::default()); + } + + #[test] + fn 
partial_table_overrides_only_named_fields() { + let cfg: DurableConfig = toml::from_str( + r#" + enabled = true + backend = "restate" + + [retention] + prune_batch_size = 999 + "#, + ) + .unwrap(); + assert!(cfg.enabled); + assert_eq!(cfg.backend, DurableBackend::Restate); + // Untouched fields keep their defaults. + assert_eq!(cfg.journal_ack_timeout_ms, 5000); + assert_eq!(cfg.retention.prune_batch_size, 999); + assert_eq!(cfg.retention.ttl_completed_secs, 604_800); + } +} diff --git a/crates/zeph-durable/src/effect.rs b/crates/zeph-durable/src/effect.rs new file mode 100644 index 000000000..a485cd03d --- /dev/null +++ b/crates/zeph-durable/src/effect.rs @@ -0,0 +1,64 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! The per-step side-effect contract. +//! +//! [`EffectClass`] declares how a step's side effect behaves under replay. It is the foundation of +//! the exactly-once machinery: the replay cursor uses it to decide whether a journaled result may +//! be returned without re-running the operation. +//! +//! The ambiguity-policy types (`OnAmbiguous`, `EffectIntentSubClass`) and the construction-time +//! policy rule land alongside the durable step primitive in a follow-up issue. + +/// How a step's side effect behaves under replay. +/// +/// This classification is recorded with every step result so the replay cursor can reason about +/// re-execution safety without inspecting the payload. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::EffectClass; +/// +/// // A pure or naturally-idempotent step is safe to skip on replay. +/// assert_eq!(EffectClass::Idempotent.as_str(), "idempotent"); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum EffectClass { + /// The operation is pure or naturally idempotent: a replayed step returns the journaled result + /// and never invokes the operation closure again (INV-10). + Idempotent, + /// The operation tolerates being run more than once. 
On an ambiguous replay it may be re-run + /// without correctness loss, accepting at-least-once delivery. + AtLeastOnce, + /// The operation must run exactly once. It is fenced by an [`crate::IdempotencyKey`] and an + /// explicit ambiguity policy; a replayed guarded step never re-fires a committed effect. + ExactlyOnceGuarded, +} + +impl EffectClass { + /// Return the canonical lower-snake-case string used in the `effect_class` journal column. + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + Self::Idempotent => "idempotent", + Self::AtLeastOnce => "at_least_once", + Self::ExactlyOnceGuarded => "exactly_once_guarded", + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn effect_class_as_str_is_stable() { + assert_eq!(EffectClass::Idempotent.as_str(), "idempotent"); + assert_eq!(EffectClass::AtLeastOnce.as_str(), "at_least_once"); + assert_eq!( + EffectClass::ExactlyOnceGuarded.as_str(), + "exactly_once_guarded" + ); + } +} diff --git a/crates/zeph-durable/src/error.rs b/crates/zeph-durable/src/error.rs new file mode 100644 index 000000000..88c0b342b --- /dev/null +++ b/crates/zeph-durable/src/error.rs @@ -0,0 +1,96 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! The crate-wide error type. +//! +//! [`DurableError`] never carries payload bytes or resolver tokens in its messages (INV-5): every +//! variant reports metadata only, so an error can be logged without leaking sealed content. + +use crate::ids::StepId; + +/// An error raised by the durable execution layer. +/// +/// The enum is `#[non_exhaustive]`: follow-up issues add variants as runtime behavior lands, and +/// downstream `match` expressions must keep a wildcard arm. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum DurableError { + /// The replayed step's descriptor fingerprint did not match the fingerprint journaled for this + /// [`StepId`] (INV-3). 
The execution is discarded and restarted fresh rather than returning a + /// result for a structurally different step. + #[error("replay divergence at step {step_id}: journaled descriptor fingerprint mismatch")] + ReplayDivergence { + /// The step whose fingerprint diverged. + step_id: StepId, + }, + + /// A destructive or security-relevant `ExactlyOnceGuarded` step was constructed without an + /// explicit ambiguity policy. The safety decision must be made at the call site, not deferred + /// to a runtime default. + #[error("step '{step}' requires an explicit on_ambiguous policy for its effect class")] + AmbiguityPolicyRequired { + /// The name of the offending step descriptor. + step: &'static str, + }, + + /// The journal writer did not acknowledge an append within the configured timeout, or is + /// otherwise unreachable. The calling path degrades to non-durable mode rather than hanging + /// (INV-12). + #[error("journal writer unavailable: append was not acknowledged in time")] + JournalUnavailable, + + /// A payload exceeded the configured `max_payload_bytes` limit. Enforced on both append and + /// read; it fails closed and never panics (INV-11). + #[error("payload of {size} bytes exceeds the {max}-byte limit")] + PayloadTooLarge { + /// The size of the offending payload, in bytes. + size: u64, + /// The configured maximum payload size, in bytes. + max: u64, + }, + + /// A journal entry could not be decoded: corrupt, truncated, or written under an unknown wire + /// format version. Fails closed. + #[error("failed to decode journal entry: {context}")] + Decode { + /// A non-sensitive description of the decode failure. + context: &'static str, + }, + + /// AEAD authentication failed when opening a sealed payload: the entry was forged, moved to a + /// different step, or replayed under a different execution. Fails closed. 
+ #[error("replay integrity check failed: sealed payload did not authenticate")] + ReplayIntegrity, + + /// An execution exceeded the hard per-execution step cap and was aborted rather than allowed to + /// grow unboundedly. + #[error("execution exceeded the step cap of {cap} steps")] + StepCapExceeded { + /// The configured hard step cap. + cap: u32, + }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn messages_are_metadata_only() { + let err = DurableError::PayloadTooLarge { + size: 2_000_000, + max: 1_048_576, + }; + let rendered = err.to_string(); + assert!(rendered.contains("2000000")); + assert!(rendered.contains("1048576")); + } + + #[test] + fn replay_divergence_reports_step() { + let err = DurableError::ReplayDivergence { + step_id: StepId::new(12), + }; + assert!(err.to_string().contains("step 12")); + } +} diff --git a/crates/zeph-durable/src/ids.rs b/crates/zeph-durable/src/ids.rs new file mode 100644 index 000000000..7b6aaefe8 --- /dev/null +++ b/crates/zeph-durable/src/ids.rs @@ -0,0 +1,411 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Journal-boundary newtypes. +//! +//! Every identifier that crosses the journal boundary is a distinct newtype with private fields +//! and a smart constructor. No raw `String` or `i64` is passed across the API, which makes it +//! impossible to confuse, say, a [`JournalSeq`] with a [`StepId`]. Each newtype is serde-round-trip +//! stable so it can be persisted and reloaded without loss. + +use std::fmt; + +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +/// Domain-separation context for [`IdempotencyKey`] derivation. +/// +/// Passed to BLAKE3's `derive_key` mode so an idempotency key can never collide with a hash +/// produced for any other purpose, even under identical key material. +const IDEMPOTENCY_CONTEXT: &str = "zeph-durable v1 idempotency-key 2026"; + +/// Identifier of a single durable execution. 
+/// +/// Runtime-minted as a `UUIDv7` (time-ordered) at execution start. It is **never** consumer-supplied +/// for a fresh execution — a resumed execution reuses the persisted value, but a new one always +/// calls [`ExecutionId::new`]. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::ExecutionId; +/// +/// let a = ExecutionId::new(); +/// let b = ExecutionId::new(); +/// assert_ne!(a, b, "each execution gets a distinct identity"); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ExecutionId(Uuid); + +impl ExecutionId { + /// Mint a fresh, time-ordered execution identity. + #[must_use] + pub fn new() -> Self { + Self(Uuid::now_v7()) + } + + /// Return the underlying UUID. + #[must_use] + pub fn as_uuid(self) -> Uuid { + self.0 + } + + /// Return the 16 raw bytes of the underlying UUID. + #[must_use] + pub fn as_bytes(&self) -> &[u8; 16] { + self.0.as_bytes() + } +} + +impl Default for ExecutionId { + fn default() -> Self { + Self::new() + } +} + +/// Position of a step within an execution. +/// +/// Assigned at the moment a step is *called* (the Nth call in program order is `StepId(N)`), never +/// at completion, so the value is stable across replays regardless of concurrent completion order +/// (INV-2). Wraps a [`u32`]: an execution is capped well below `u32::MAX` steps by the retention +/// policy. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::StepId; +/// +/// let step = StepId::new(7); +/// assert_eq!(step.value(), 7); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct StepId(u32); + +impl StepId { + /// Wrap a raw step position. + /// + /// The value normally comes from the execution's atomic step counter; this constructor exists + /// for the journal backend and tests that reconstruct a persisted step. + #[must_use] + pub fn new(value: u32) -> Self { + Self(value) + } + + /// Return the raw step position. 
+ #[must_use] + pub fn value(self) -> u32 { + self.0 + } +} + +impl fmt::Display for StepId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} + +/// Global append order of a journal entry — the durability anchor. +/// +/// Assigned by the database (an autoincrement / `BIGSERIAL` column), so it is monotonically +/// increasing across all entries of all executions in a journal. Wraps an [`i64`] to match the +/// column type. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::JournalSeq; +/// +/// let first = JournalSeq::new(1); +/// let second = JournalSeq::new(2); +/// assert!(second > first); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +pub struct JournalSeq(i64); + +impl JournalSeq { + /// Wrap a database-assigned sequence number. + #[must_use] + pub fn new(value: i64) -> Self { + Self(value) + } + + /// Return the raw sequence number. + #[must_use] + pub fn value(self) -> i64 { + self.0 + } +} + +/// Domain-separated deduplication key for a non-idempotent effect. +/// +/// Derived with BLAKE3 in `derive_key` mode from `(execution_id, step_id, op_fingerprint)`. The +/// derivation is injective (length-delimited input) so an attacker-controlled `op_fingerprint` +/// cannot be crafted to collide with a different `(execution_id, step_id)` pair. The key is a +/// *deduplication discriminator only* — never the sole trust basis for skipping a guarded effect. 
+/// +/// # Examples +/// +/// ``` +/// use zeph_durable::{ExecutionId, IdempotencyKey, StepId}; +/// +/// let exec = ExecutionId::new(); +/// let a = IdempotencyKey::derive(exec, StepId::new(0), b"transfer:acct-7"); +/// let b = IdempotencyKey::derive(exec, StepId::new(0), b"transfer:acct-7"); +/// let c = IdempotencyKey::derive(exec, StepId::new(1), b"transfer:acct-7"); +/// assert_eq!(a, b, "same inputs derive the same key"); +/// assert_ne!(a, c, "a different step derives a different key"); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct IdempotencyKey([u8; 32]); + +impl IdempotencyKey { + /// Derive an idempotency key from the execution identity, step position, and an opaque + /// operation fingerprint. + /// + /// The fingerprint MUST be derived from non-secret descriptors only (e.g. a tool name and its + /// non-secret arguments); resolved secret material MUST NOT be passed here (INV-6). + /// + /// The input is length-delimited — `len(execution_id) || execution_id || len(step_id) || + /// step_id || op_fingerprint` — so the field boundaries are unambiguous and the derivation is + /// injective. The fixed BLAKE3 `derive_key` context string keeps these keys disjoint from any + /// other BLAKE3 use in the workspace. + #[must_use] + pub fn derive(execution_id: ExecutionId, step_id: StepId, op_fingerprint: &[u8]) -> Self { + let exec_bytes = execution_id.as_bytes(); + let step_bytes = step_id.value().to_le_bytes(); + debug_assert_eq!(exec_bytes.len(), 16, "UUID is always 16 bytes"); + debug_assert_eq!(step_bytes.len(), 4, "u32 is always 4 bytes"); + + // Length-prefix each fixed-width field (injective framing); the variable-length + // op_fingerprint is appended last, where its boundary is unambiguous. 
+ let mut input = Vec::with_capacity(4 + 16 + 4 + 4 + op_fingerprint.len()); + input.extend_from_slice(&16u32.to_le_bytes()); + input.extend_from_slice(exec_bytes); + input.extend_from_slice(&4u32.to_le_bytes()); + input.extend_from_slice(&step_bytes); + input.extend_from_slice(op_fingerprint); + + Self(blake3::derive_key(IDEMPOTENCY_CONTEXT, &input)) + } + + /// Return the 32 raw key bytes. + #[must_use] + pub fn as_bytes(&self) -> &[u8; 32] { + &self.0 + } +} + +/// Reference to an external-completion handle (HITL, A2A async, subagent result). +/// +/// A `PromiseId` is **not** a bearer capability: resolving a promise additionally requires a +/// separate high-entropy resolver token (INV-9). The id is a `UUIDv7` so it is unguessable for +/// practical purposes and time-ordered for indexing. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::PromiseId; +/// +/// let id = PromiseId::new(); +/// assert_ne!(id, PromiseId::new()); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct PromiseId(Uuid); + +impl PromiseId { + /// Mint a fresh promise identity. + #[must_use] + pub fn new() -> Self { + Self(Uuid::now_v7()) + } + + /// Return the underlying UUID. + #[must_use] + pub fn as_uuid(self) -> Uuid { + self.0 + } +} + +impl Default for PromiseId { + fn default() -> Self { + Self::new() + } +} + +/// Handle to a durable timer that wakes at a persisted instant, surviving process restarts. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::TimerId; +/// +/// let id = TimerId::new(); +/// assert_ne!(id, TimerId::new()); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct TimerId(Uuid); + +impl TimerId { + /// Mint a fresh timer identity. + #[must_use] + pub fn new() -> Self { + Self(Uuid::now_v7()) + } + + /// Return the underlying UUID. 
+ #[must_use] + pub fn as_uuid(self) -> Uuid { + self.0 + } +} + +impl Default for TimerId { + fn default() -> Self { + Self::new() + } +} + +/// Closed classification of what a durable execution represents. +/// +/// A closed enum (rather than a free-form string) prevents typos and lets the retention policy +/// reason about execution categories. The `Custom` variant carries a compile-time string literal +/// for execution kinds defined outside the standard set. +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::ExecutionKind; +/// +/// assert_eq!(ExecutionKind::AgentTurn.as_str(), "agent_turn"); +/// assert_eq!(ExecutionKind::Custom("nightly_sweep").as_str(), "nightly_sweep"); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ExecutionKind { + /// A single agent reasoning turn (the P1 adapter target). + AgentTurn, + /// An orchestration DAG run (the P2 adapter target). + DagRun, + /// A scheduler job fire (the P3 adapter target). + ScheduledJob, + /// A subagent session (the P4 adapter target). + SubagentSession, + /// A caller-defined execution kind identified by a compile-time literal. + Custom(&'static str), +} + +impl ExecutionKind { + /// Return the canonical lower-snake-case string used in the `kind` journal column. + /// + /// For [`ExecutionKind::Custom`] the inner literal is returned verbatim. 
+ #[must_use] + pub fn as_str(&self) -> &'static str { + match self { + Self::AgentTurn => "agent_turn", + Self::DagRun => "dag_run", + Self::ScheduledJob => "scheduled_job", + Self::SubagentSession => "subagent_session", + Self::Custom(name) => name, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn execution_id_new_is_unique() { + let a = ExecutionId::new(); + let b = ExecutionId::new(); + assert_ne!(a, b); + } + + #[test] + fn promise_and_timer_ids_are_unique() { + assert_ne!(PromiseId::new(), PromiseId::new()); + assert_ne!(TimerId::new(), TimerId::new()); + } + + #[test] + fn execution_id_serde_round_trip() { + let id = ExecutionId::new(); + let json = serde_json::to_string(&id).unwrap(); + let back: ExecutionId = serde_json::from_str(&json).unwrap(); + assert_eq!(id, back); + } + + #[test] + fn step_id_serde_round_trip_and_accessor() { + let step = StepId::new(42); + assert_eq!(step.value(), 42); + let json = serde_json::to_string(&step).unwrap(); + let back: StepId = serde_json::from_str(&json).unwrap(); + assert_eq!(step, back); + } + + #[test] + fn journal_seq_serde_round_trip_and_ordering() { + let seq = JournalSeq::new(99); + assert_eq!(seq.value(), 99); + assert!(JournalSeq::new(2) > JournalSeq::new(1)); + let json = serde_json::to_string(&seq).unwrap(); + let back: JournalSeq = serde_json::from_str(&json).unwrap(); + assert_eq!(seq, back); + } + + #[test] + fn promise_and_timer_serde_round_trip() { + let promise = PromiseId::new(); + let timer = TimerId::new(); + let pj = serde_json::to_string(&promise).unwrap(); + let tj = serde_json::to_string(&timer).unwrap(); + assert_eq!(promise, serde_json::from_str::<PromiseId>(&pj).unwrap()); + assert_eq!(timer, serde_json::from_str::<TimerId>(&tj).unwrap()); + } + + #[test] + fn idempotency_key_serde_round_trip() { + let key = IdempotencyKey::derive(ExecutionId::new(), StepId::new(3), b"op"); + let json = serde_json::to_string(&key).unwrap(); + let back: IdempotencyKey = 
serde_json::from_str(&json).unwrap(); + assert_eq!(key, back); + } + + #[test] + fn idempotency_key_is_deterministic() { + let exec = ExecutionId::new(); + let a = IdempotencyKey::derive(exec, StepId::new(5), b"tool:read"); + let b = IdempotencyKey::derive(exec, StepId::new(5), b"tool:read"); + assert_eq!(a, b); + } + + #[test] + fn idempotency_key_varies_with_each_input() { + let exec = ExecutionId::new(); + let other = ExecutionId::new(); + let base = IdempotencyKey::derive(exec, StepId::new(0), b"op"); + assert_ne!(base, IdempotencyKey::derive(other, StepId::new(0), b"op")); + assert_ne!(base, IdempotencyKey::derive(exec, StepId::new(1), b"op")); + assert_ne!(base, IdempotencyKey::derive(exec, StepId::new(0), b"op2")); + } + + #[test] + fn idempotency_key_framing_is_injective() { + // The length-delimited framing keeps the step_id/op_fingerprint boundary unambiguous: + // moving the step bytes into the fingerprint must change the derived key. A naive + // concatenation that merged the two fields could collide here. + let exec = ExecutionId::new(); + let with_step = IdempotencyKey::derive(exec, StepId::new(2), b""); + let with_fingerprint = IdempotencyKey::derive(exec, StepId::new(0), &2u32.to_le_bytes()); + assert_ne!(with_step, with_fingerprint); + } + + #[test] + fn execution_kind_as_str_is_stable() { + assert_eq!(ExecutionKind::AgentTurn.as_str(), "agent_turn"); + assert_eq!(ExecutionKind::DagRun.as_str(), "dag_run"); + assert_eq!(ExecutionKind::ScheduledJob.as_str(), "scheduled_job"); + assert_eq!(ExecutionKind::SubagentSession.as_str(), "subagent_session"); + assert_eq!(ExecutionKind::Custom("x").as_str(), "x"); + } +} diff --git a/crates/zeph-durable/src/journal.rs b/crates/zeph-durable/src/journal.rs new file mode 100644 index 000000000..6b94b4c46 --- /dev/null +++ b/crates/zeph-durable/src/journal.rs @@ -0,0 +1,336 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! 
The append-only journal abstraction and its data model. +//! +//! A [`Journal`] records the control flow of an execution as an ordered sequence of +//! [`JournalEntry`] values. Each entry is one [`EntryKind`] — a closed enum that makes illegal +//! states unrepresentable: control entries (effect intents, promise creation, timer arming) carry +//! no ciphertext payload field at all, so a "control entry with payload" cannot be constructed. +//! +//! This module defines the *types* only. The concrete journal backends, the writer actor, and the +//! replay cursor land in follow-up issues. + +use std::future::Future; + +use bytes::Bytes; + +use crate::config::RetentionPolicy; +use crate::effect::EffectClass; +use crate::error::DurableError; +use crate::ids::{ + ExecutionId, ExecutionKind, IdempotencyKey, JournalSeq, PromiseId, StepId, TimerId, +}; + +/// Terminal and in-flight status of a durable execution. +/// +/// Maps one-to-one to the `status` column `CHECK` constraint +/// (`'running' | 'completed' | 'failed' | 'aborted'`). +/// +/// # Examples +/// +/// ``` +/// use zeph_durable::ExecutionStatus; +/// +/// assert_eq!(ExecutionStatus::Completed.as_str(), "completed"); +/// assert!(ExecutionStatus::Running.is_running()); +/// assert!(!ExecutionStatus::Failed.is_running()); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ExecutionStatus { + /// The execution is in flight. + Running, + /// The execution finished successfully. + Completed, + /// The execution ended in an error. + Failed, + /// The execution was discarded (e.g. after a replay divergence or step-cap abort). + Aborted, +} + +impl ExecutionStatus { + /// Return the canonical string used in the `status` column. 
+ #[must_use] + pub fn as_str(self) -> &'static str { + match self { + Self::Running => "running", + Self::Completed => "completed", + Self::Failed => "failed", + Self::Aborted => "aborted", + } + } + + /// Whether the execution is still in flight (not yet in a terminal state). + #[must_use] + pub fn is_running(self) -> bool { + matches!(self, Self::Running) + } +} + +/// The kind of a single journal entry. +/// +/// A closed enum: an exhaustive `match` over its variants is required, which guarantees every +/// replay-relevant entry shape is handled. Only the variants that genuinely carry data +/// (`StepResult`, `PromiseResolved`, `Checkpoint`) own a `payload`/`snapshot` field; the control +/// entries hold identifiers and an optional row-level HMAC instead, so an illegal "control entry +/// with payload" is unrepresentable. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EntryKind { + /// The committed result of a completed step. The `payload` is AEAD-sealed + /// (`nonce || ciphertext || tag`). + StepResult { + /// Deduplication key for the step's effect. + idempotency_key: IdempotencyKey, + /// Sealed result bytes. + payload: Bytes, + /// How the step's side effect behaves under replay. + effect: EffectClass, + /// Wire-format version discriminator for the sealed payload. + payload_version: u8, + }, + /// An intent to run an exactly-once-guarded effect, journaled before the effect fires. + EffectIntent { + /// Deduplication key for the guarded effect. + idempotency_key: IdempotencyKey, + /// How the step's side effect behaves under replay. + effect: EffectClass, + /// Row-level HMAC for shared-DB / Restate deployments; `None` for single-user `SQLite`. + hmac: Option<[u8; 32]>, + }, + /// Creation of an external-completion promise. + PromiseCreated { + /// The new promise's identifier. + promise_id: PromiseId, + /// BLAKE3 hash of the 32-byte resolver token (the token itself is never journaled). 
+ resolver_token_hash: [u8; 32], + /// Row-level HMAC for shared-DB / Restate deployments; `None` for single-user `SQLite`. + hmac: Option<[u8; 32]>, + }, + /// Resolution of a previously-created promise with its sealed result. + PromiseResolved { + /// The resolved promise's identifier. + promise_id: PromiseId, + /// Sealed resolution bytes. + payload: Bytes, + }, + /// A durable timer was armed to fire at a persisted instant. + TimerArmed { + /// The armed timer's identifier. + timer_id: TimerId, + /// Wake instant, as Unix epoch milliseconds. + due_at_ms: i64, + /// Row-level HMAC for shared-DB / Restate deployments; `None` for single-user `SQLite`. + hmac: Option<[u8; 32]>, + }, + /// A previously-armed timer fired. + TimerFired { + /// The fired timer's identifier. + timer_id: TimerId, + }, + /// A checkpoint fold that compacts the idempotent prefix up to a step. + Checkpoint { + /// All steps strictly below this id are folded into the snapshot. + up_to_step: u32, + /// Sealed snapshot bytes. + snapshot: Bytes, + }, +} + +impl EntryKind { + /// Return the canonical string used in the `entry_kind` column. + /// + /// The `step_result` tag in particular is the predicate of the unique partial index that + /// enforces "at most one committed result per step". + #[must_use] + pub fn tag(&self) -> &'static str { + match self { + Self::StepResult { .. } => "step_result", + Self::EffectIntent { .. } => "effect_intent", + Self::PromiseCreated { .. } => "promise_created", + Self::PromiseResolved { .. } => "promise_resolved", + Self::TimerArmed { .. } => "timer_armed", + Self::TimerFired { .. } => "timer_fired", + Self::Checkpoint { .. } => "checkpoint", + } + } +} + +/// One ordered entry in a journal. +/// +/// `seq` is `None` before the entry is appended and `Some` once the database assigns its global +/// order. The remaining fields locate the entry within its execution. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct JournalEntry { + /// Global append order; `None` until the backend assigns it on append. + pub seq: Option<JournalSeq>, + /// The execution this entry belongs to. + pub execution_id: ExecutionId, + /// The category of the owning execution. + pub kind: ExecutionKind, + /// The step this entry is associated with. + pub step_id: StepId, + /// The entry payload. + pub entry: EntryKind, + /// Creation time, as Unix epoch milliseconds. + pub created_at_ms: i64, +} + +/// An append-only, ordered journal of execution control flow. +/// +/// Implementations are `Send + Sync` and route all writes through a dedicated connection so that +/// appends are serialized. The returned futures are `Send`, so a journal can be shared across +/// spawned tasks; the trait is consumed via enum dispatch, never as a trait object. +pub trait Journal: Send + Sync { + /// Append an entry and return its database-assigned global sequence number. + /// + /// # Errors + /// + /// Returns [`DurableError::JournalUnavailable`] if the write cannot be acknowledged in time, + /// or [`DurableError::PayloadTooLarge`] if a payload exceeds the configured limit. + fn append( + &self, + entry: JournalEntry, + ) -> impl Future<Output = Result<JournalSeq, DurableError>> + Send; + + /// Read every entry of an execution in append order. + /// + /// Intended for short executions; long executions use [`Journal::read_execution_range`] to + /// bound memory. + /// + /// # Errors + /// + /// Returns [`DurableError::Decode`] if a stored entry cannot be decoded, or + /// [`DurableError::JournalUnavailable`] if the journal cannot be read. + fn read_execution( + &self, + id: ExecutionId, + ) -> impl Future<Output = Result<Vec<JournalEntry>, DurableError>> + Send; + + /// Read up to `limit` entries of an execution starting at `from_step_id`. + /// + /// The replay cursor calls this repeatedly to walk a long execution with `O(segment)` memory. 
+ /// + /// # Errors + /// + /// Returns [`DurableError::Decode`] if a stored entry cannot be decoded, or + /// [`DurableError::JournalUnavailable`] if the journal cannot be read. + fn read_execution_range( + &self, + id: ExecutionId, + from_step_id: u32, + limit: usize, + ) -> impl Future<Output = Result<Vec<JournalEntry>, DurableError>> + Send; + + /// Transition an execution to a terminal status. + /// + /// # Errors + /// + /// Returns [`DurableError::JournalUnavailable`] if the transition cannot be committed. + fn finalize( + &self, + id: ExecutionId, + status: ExecutionStatus, + ) -> impl Future<Output = Result<(), DurableError>> + Send; + + /// Prune terminal executions according to `policy` and return the number of rows deleted. + /// + /// Runs exclusively on a background task — never on the dispatch hot path. + /// + /// # Errors + /// + /// Returns [`DurableError::JournalUnavailable`] if the prune sweep cannot complete. + fn prune( + &self, + policy: &RetentionPolicy, + ) -> impl Future<Output = Result<u64, DurableError>> + Send; +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ids::ExecutionId; + + fn sample_entry(entry: EntryKind) -> JournalEntry { + JournalEntry { + seq: None, + execution_id: ExecutionId::new(), + kind: ExecutionKind::AgentTurn, + step_id: StepId::new(0), + entry, + created_at_ms: 0, + } + } + + #[test] + fn entry_kind_match_is_exhaustive() { + let key = IdempotencyKey::derive(ExecutionId::new(), StepId::new(0), b"op"); + for entry in [ + EntryKind::StepResult { + idempotency_key: key, + payload: Bytes::from_static(b"x"), + effect: EffectClass::Idempotent, + payload_version: 1, + }, + EntryKind::EffectIntent { + idempotency_key: key, + effect: EffectClass::ExactlyOnceGuarded, + hmac: None, + }, + EntryKind::PromiseCreated { + promise_id: PromiseId::new(), + resolver_token_hash: [0u8; 32], + hmac: Some([1u8; 32]), + }, + EntryKind::PromiseResolved { + promise_id: PromiseId::new(), + payload: Bytes::new(), + }, + EntryKind::TimerArmed { + timer_id: TimerId::new(), + due_at_ms: 100, + hmac: None, + }, + EntryKind::TimerFired { 
+ timer_id: TimerId::new(), + }, + EntryKind::Checkpoint { + up_to_step: 3, + snapshot: Bytes::new(), + }, + ] { + // Exhaustive match — no wildcard arm — over every variant. + let tag = match &entry { + EntryKind::StepResult { .. } => "step_result", + EntryKind::EffectIntent { .. } => "effect_intent", + EntryKind::PromiseCreated { .. } => "promise_created", + EntryKind::PromiseResolved { .. } => "promise_resolved", + EntryKind::TimerArmed { .. } => "timer_armed", + EntryKind::TimerFired { .. } => "timer_fired", + EntryKind::Checkpoint { .. } => "checkpoint", + }; + assert_eq!(tag, entry.tag()); + } + } + + #[test] + fn journal_entry_is_clonable_and_comparable() { + let entry = sample_entry(EntryKind::TimerFired { + timer_id: TimerId::new(), + }); + assert_eq!(entry, entry.clone()); + } + + #[test] + fn execution_status_round_trips_through_str() { + for status in [ + ExecutionStatus::Running, + ExecutionStatus::Completed, + ExecutionStatus::Failed, + ExecutionStatus::Aborted, + ] { + assert!(!status.as_str().is_empty()); + } + assert!(ExecutionStatus::Running.is_running()); + assert!(!ExecutionStatus::Aborted.is_running()); + } +} diff --git a/crates/zeph-durable/src/lib.rs b/crates/zeph-durable/src/lib.rs new file mode 100644 index 000000000..0900c5830 --- /dev/null +++ b/crates/zeph-durable/src/lib.rs @@ -0,0 +1,71 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Native durable execution layer for Zeph. +//! +//! `zeph-durable` is a Layer-0 infrastructure crate — analogous to `zeph-db` and `zeph-common` — +//! that journals the *control flow* of an execution (individual steps, their inputs and outputs, +//! promises, and timers) so a crashed or interrupted execution can resume at the point of failure +//! rather than restart from scratch. +//! +//! # Architectural placement +//! +//! Consumers of this crate span several layers (`zeph-scheduler`, `zeph-subagent`, +//! 
`zeph-orchestration`, `zeph-agent-tools`), so the crate must sit at Layer 0. It is a pure +//! infrastructure primitive: it sees opaque serialized payloads, never domain types, and it +//! MUST NOT depend on `zeph-llm`, `zeph-memory`, `zeph-core`, `zeph-sanitizer`, or any +//! business-layer crate (INV-1). Domain meaning lives in thin adapter modules inside each +//! consuming crate. +//! +//! # Scaffold scope +//! +//! This release establishes the type-level foundation only: +//! +//! - [`ids`] — the journal-boundary newtypes ([`ExecutionId`], [`StepId`], [`JournalSeq`], +//! [`IdempotencyKey`], [`PromiseId`], [`TimerId`]) and the [`ExecutionKind`] discriminator. +//! - [`journal`] — the [`Journal`] trait plus the [`JournalEntry`] / [`EntryKind`] / +//! [`ExecutionStatus`] data model. +//! - [`effect`] — the [`EffectClass`] side-effect contract referenced by journal entries. +//! - [`config`] — the pure-data [`DurableConfig`] and [`RetentionPolicy`] mirroring the +//! `[durable]` TOML section. +//! - [`error`] — the crate-wide [`DurableError`]. +//! +//! No execution behavior exists yet: the journal writer, execution backends, replay cursor, and +//! the durable step primitive land in follow-up issues. +//! +//! # Schema ownership +//! +//! `zeph-durable` owns **no** `.sql` files and **no** `sqlx::migrate!`. All durable schema (the +//! four `durable_*` tables) lives as numbered migration files in +//! `zeph-db/migrations/{sqlite,postgres}/` and is applied via `zeph_db::run_migrations` against a +//! dedicated `durable.db` pool (INV-14). +//! +//! # Examples +//! +//! ``` +//! use zeph_durable::{ExecutionId, IdempotencyKey, StepId}; +//! +//! // Each execution gets a fresh, runtime-minted identity. +//! let execution = ExecutionId::new(); +//! +//! // Idempotency keys are domain-separated and deterministic for a given step. +//! let key = IdempotencyKey::derive(execution, StepId::new(0), b"tool:read_file"); +//! 
assert_eq!(key, IdempotencyKey::derive(execution, StepId::new(0), b"tool:read_file")); +//! ``` + +mod sealed; + +pub mod config; +pub mod effect; +pub mod error; +pub mod ids; +pub mod journal; + +#[doc(hidden)] +pub use sealed::Sealed; + +pub use config::{DurableBackend, DurableConfig, RetentionPolicy}; +pub use effect::EffectClass; +pub use error::DurableError; +pub use ids::{ExecutionId, ExecutionKind, IdempotencyKey, JournalSeq, PromiseId, StepId, TimerId}; +pub use journal::{EntryKind, ExecutionStatus, Journal, JournalEntry}; diff --git a/crates/zeph-durable/src/sealed.rs b/crates/zeph-durable/src/sealed.rs new file mode 100644 index 000000000..32cba2c8f --- /dev/null +++ b/crates/zeph-durable/src/sealed.rs @@ -0,0 +1,20 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Sealing token for the durable backend trait hierarchy. +//! +//! The execution-backend trait (landing in a later issue) is sealed so that only backends +//! declared inside `zeph-durable` — `LocalBackend` and the feature-gated `RestateBackend` — can +//! implement it. External crates select a backend through the `DurableBackendEnum` enum-dispatch +//! type, never by providing their own implementation. This keeps the backend surface closed and +//! makes adding backend methods a non-breaking change. + +/// Sealing supertrait for the durable backend hierarchy. +/// +/// # Stability +/// +/// This trait is `#[doc(hidden)]`. External crates MUST NOT implement it. Because no external +/// implementations can exist, the sealed traits that depend on it may gain methods without a +/// breaking change. +#[doc(hidden)] +pub trait Sealed {}