diff --git a/CHANGELOG.md b/CHANGELOG.md index beb9feee3..314775e04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). hard cap (100%) aborts with `DurableError::StepCapExceeded`. A resume replays folded steps from the checkpoint snapshot — preserving each step's idempotency key for the divergence guard — without re-running their operations. (#4948) +- `feat(durable)`: wired the durable execution layer into all mandatory integration points (spec-064 + C6). A new `zeph durable` CLI group (`list`/`show`/`inspect`/`prune`/`resume`) connects directly to + `durable.db` with no running agent; output is redacted by default (INV-5) and `--reveal` decrypts + through the vault-resolved `ZEPH_DURABLE_KEY` (FR-DE-07/FR-DE-08). The `[durable]` config section is + now part of the root `Config`, the `--init` wizard generates and stores `ZEPH_DURABLE_KEY` in the + age vault (never inline), and an additive, idempotent `--migrate-config` step adds `[durable]` + (default-off) to existing configs. A ratatui `DurableView` (command-palette `durable`, `D` key) + shows in-flight executions with mandatory status spinners (spec-011), fed by a read-only poll task. + The pure-data `DurableConfig`/`RetentionPolicy`/`DurableBackend` moved to `zeph-config` (single + source of truth, re-exported by `zeph-durable`); the AEAD enforcement gate `encryption_gate` is now + a free function in `zeph-durable`. 
(#4949) ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 41466c8ef..4b78589f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10475,6 +10475,7 @@ dependencies = [ "zeph-context", "zeph-core", "zeph-db", + "zeph-durable", "zeph-experiments", "zeph-gateway", "zeph-index", @@ -10874,6 +10875,7 @@ dependencies = [ "toml 1.1.2+spec-1.1.0", "tracing", "uuid", + "zeph-config", "zeph-db", "zeroize", ] diff --git a/Cargo.toml b/Cargo.toml index 8baf9d45f..235efc480 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -262,6 +262,7 @@ sqlite = [ "zeph-tools/sqlite", "zeph-scheduler?/sqlite", "zeph-core/sqlite", + "zeph-durable/sqlite", ] postgres = [ "zeph-db/postgres", @@ -272,6 +273,7 @@ postgres = [ "zeph-tools/postgres", "zeph-scheduler?/postgres", "zeph-core/postgres", + "zeph-durable/postgres", ] [dependencies] @@ -322,6 +324,7 @@ zeph-config.workspace = true zeph-context.workspace = true zeph-core.workspace = true zeph-db.workspace = true +zeph-durable.workspace = true zeph-experiments.workspace = true zeph-gateway = { workspace = true, optional = true } zeph-index.workspace = true diff --git a/book/src/reference/cli.md b/book/src/reference/cli.md index c30ee4bea..1e26c91e2 100644 --- a/book/src/reference/cli.md +++ b/book/src/reference/cli.md @@ -24,6 +24,7 @@ zeph [OPTIONS] [COMMAND] | `sessions` | Manage ACP session history — list, show, delete (requires `acp` feature) | | `schedule` | Manage cron-based scheduled jobs — list, add, remove, show (requires `scheduler` feature; see [Scheduler](../concepts/scheduler.md)) | | `db` | Database management — run migrations, check status (see [Database Abstraction](../concepts/database.md)) | +| `durable` | Inspect the durable execution journal — list, show, inspect, prune, resume (see [Durable Journal Encryption](security/durable-encryption.md)) | | `migrate-config` | Add missing config parameters as commented-out blocks and reformat the file (see [Migrate Config](../guides/migrate-config.md)) | | `worktree` | Manage background 
sub-agent git worktrees — list active, remove stale (requires `[worktree] enabled = true`; see [Worktree Isolation](../guides/worktree.md)) | @@ -43,6 +44,28 @@ zeph db migrate # apply pending migrations zeph db migrate --status # check what would be applied ``` +### `zeph durable` + +Inspect the durable execution journal directly — no running agent process is +required. Output is **redacted by default** (INV-5): payload bytes and resolver +tokens are shown only with `--reveal`, which decrypts through the vault-resolved +`ZEPH_DURABLE_KEY`. + +| Subcommand | Description | +|------------|-------------| +| `durable list [--status <status>] [--kind <kind>] [--limit <n>]` | List executions, newest first | +| `durable show <id> [--reveal]` | Show an execution's journal entries (metadata only by default) | +| `durable inspect <id> --step <n> [--reveal]` | Inspect a single step entry | +| `durable prune [--dry-run]` | Sweep terminal executions past their TTL | +| `durable resume <id>` | Report resume state for an execution | + +```bash +zeph durable list --status running # in-flight executions +zeph durable show <id> # redacted journal entries +zeph durable show <id> --reveal # decrypted payloads (prints a warning) +zeph durable prune --dry-run # how many would be pruned +``` + ### `zeph init` Generate a `config.toml` through a guided wizard. diff --git a/book/src/reference/configuration.md b/book/src/reference/configuration.md index fd3aefa77..280fed552 100644 --- a/book/src/reference/configuration.md +++ b/book/src/reference/configuration.md @@ -794,6 +794,32 @@ self_check = false # Enable MARCH Proposer+Checker self- [cli.loop] min_interval_secs = 5 # Minimum loop interval in seconds (default: 5) max_iterations = 1000 # Max repetitions before loop auto-stops (default: 1000) + +# Durable execution layer (spec-064). Opt-in, default-off. When enabled, the +# agent journals control flow to a dedicated durable.db for crash-resume. +# The AEAD key is vault-only (ZEPH_DURABLE_KEY); see Durable Journal Encryption. 
+[durable] +enabled = false # Master opt-in (default: false — current behavior) +backend = "local" # "local" (durable.db) or "restate" (server feature) +encrypt_payload = true # AEAD-encrypt payloads (dev-only override; see security docs) +agent_turns = true # Wrap agent-loop steps when enabled +orchestration = true # Journal /plan resume replan budget when enabled +scheduler = true # Exactly-once scheduler job fire when enabled +subagent = true # Durable promise for subagent spawn/await when enabled +journal_flush_interval_ms = 10 # Group-commit interval for buffered appends (ms) +journal_ack_timeout_ms = 5000 # Acknowledged-append timeout before non-durable degrade (ms) +max_steps_per_execution = 10000 # In-execution step cap (soft fold 90%, hard abort 100%) +max_payload_bytes = 1048576 # Max payload size, enforced on append + read (1 MiB) +promise_poll_interval_secs = 2 # DB fallback poll interval for parked promises (s) +max_parked_promises = 1000 # Above this, promise resolution falls back to polling + +[durable.retention] +ttl_completed_secs = 604800 # Prune completed executions older than this (7 days) +ttl_failed_secs = 2592000 # Prune failed/aborted executions older than this (30 days) +max_executions = 10000 # LRU cap on stored executions +max_journal_bytes = 1073741824 # Journal size cap in bytes (1 GiB) +prune_batch_size = 500 # Rows deleted per transaction during a sweep +prune_interval_secs = 3600 # Background prune poll interval (s) ``` ### Provider Entry Fields diff --git a/book/src/reference/security/durable-encryption.md b/book/src/reference/security/durable-encryption.md index 8c0cd3ecf..16e39eef1 100644 --- a/book/src/reference/security/durable-encryption.md +++ b/book/src/reference/security/durable-encryption.md @@ -35,16 +35,21 @@ rather than decrypted into a bogus result. The cipher key is resolved from the age vault under the key name `ZEPH_DURABLE_KEY`, never from inline TOML or environment variables (the standard -Zeph vault contract). 
It must be exactly **32 bytes** of high-entropy key -material. +Zeph vault contract). It is exactly **32 bytes** of high-entropy key material, +**base64-encoded** for storage as a vault string value. -Generate and store it once: +The easiest path is the configuration wizard: `zeph --init` generates a fresh +key and stores it in the age vault automatically when you enable durable +execution. To generate and store it manually instead: ```bash -# Generate 32 random bytes and store them in the age vault. -head -c 32 /dev/urandom | zeph vault set ZEPH_DURABLE_KEY --stdin +# Generate 32 random bytes, base64-encode them, and store in the age vault. +head -c 32 /dev/urandom | base64 | zeph vault set ZEPH_DURABLE_KEY --stdin ``` +Inspect a journal with decrypted payloads using `zeph durable show <id> +--reveal`, which resolves and decodes this key. + ## Encryption requirement (`encrypt_payload`) AEAD encryption is **on by default** (`[durable].encrypt_payload = true`). diff --git a/config/default.toml b/config/default.toml index a12b610cd..6bc68626a 100644 --- a/config/default.toml +++ b/config/default.toml @@ -1545,3 +1545,50 @@ provider_persistence = true # command = "osascript -e 'display notification \"Task complete\" with title \"Zeph\"'" # timeout_secs = 3 # fail_closed = false + +# ---------------------------------------------------------------------------- +# Durable execution layer (spec-064). Opt-in, default-off. When enabled, the +# agent journals control flow to a dedicated durable.db so a crashed or +# interrupted execution can resume at the point of failure instead of +# restarting. Inspect with `zeph durable`. The AEAD key is vault-only +# (ZEPH_DURABLE_KEY) and never written inline here. +# ---------------------------------------------------------------------------- +[durable] +# Master opt-in. false = current behavior, no journal opened. +enabled = false +# Journal backend: "local" (dedicated durable.db) | "restate" (server feature). 
+backend = "local" +# Encrypt payloads with AEAD. Disabling is a dev-only override (forbidden for +# non-local backends and shared databases). +encrypt_payload = true +# Per-adapter opt-in (only take effect when enabled = true). +agent_turns = true +orchestration = true +scheduler = true +subagent = true +# Group-commit interval for buffered appends (ms). +journal_flush_interval_ms = 10 +# Acknowledged-append timeout before degrading to non-durable mode (ms). +journal_ack_timeout_ms = 5000 +# In-execution step cap (soft fold at 90%, hard abort at 100%). +max_steps_per_execution = 10000 +# Maximum payload size in bytes, enforced on append and read (1 MiB). +max_payload_bytes = 1048576 +# Database fallback poll interval for parked promises (seconds). +promise_poll_interval_secs = 2 +# Above this many parked promises, resolution falls back to pure polling. +max_parked_promises = 1000 + +[durable.retention] +# Prune completed executions older than this (seconds, 7 days). +ttl_completed_secs = 604800 +# Prune failed/aborted executions older than this (seconds, 30 days). +ttl_failed_secs = 2592000 +# LRU cap on stored executions. +max_executions = 10000 +# Size cap on the journal in bytes (1 GiB). +max_journal_bytes = 1073741824 +# Rows deleted per transaction during a prune sweep. +prune_batch_size = 500 +# Background prune poll interval (seconds). +prune_interval_secs = 3600 diff --git a/crates/zeph-config/src/durable.rs b/crates/zeph-config/src/durable.rs new file mode 100644 index 000000000..9508c95da --- /dev/null +++ b/crates/zeph-config/src/durable.rs @@ -0,0 +1,206 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Pure-data configuration for the durable execution layer (`[durable]`). +//! +//! These types mirror the `[durable]` TOML section and are the single source of truth for the +//! durable execution configuration. They live in `zeph-config` (alongside every other subsystem +//! 
config) so the aggregate [`Config`](crate::Config) can hold them without forcing the heavy +//! `zeph-db`/`sqlx` dependency tree of `zeph-durable` onto the config layer. The `zeph-durable` +//! crate re-exports these types and applies the AEAD enforcement policy (the `encryption_gate`) +//! on top of them. +//! +//! Every field carries a spec default via the container-level `#[serde(default)]` attribute backed +//! by [`Default`], so deserializing an empty table yields a fully-populated, spec-compliant +//! configuration. No credentials appear inline — the AEAD key and any Restate endpoints are +//! resolved from the vault by key name (spec-038 vault contract), never stored here. + +use serde::{Deserialize, Serialize}; + +/// Which journal backend an execution uses. +/// +/// `Restate` is only meaningful when the `restate` feature and an external Restate server are +/// available; the variant is accepted in configuration regardless so a config can be authored +/// ahead of the backend being compiled in. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum DurableBackend { + /// Dedicated `durable.db` SQLite/Postgres file managed in-process. The default. + #[default] + Local, + /// External Restate server (feature-gated, server deployments only). + Restate, +} + +/// Configuration for the durable execution layer (`[durable]`). +/// +/// # Examples +/// +/// ``` +/// use zeph_config::DurableConfig; +/// +/// // An empty table deserializes to the spec defaults. 
+/// let cfg: DurableConfig = toml::from_str("").unwrap(); +/// assert!(!cfg.enabled); +/// assert_eq!(cfg.journal_ack_timeout_ms, 5000); +/// assert_eq!(cfg.max_payload_bytes, 1_048_576); +/// ``` +#[allow(clippy::struct_excessive_bools)] // config struct — boolean flags are idiomatic for TOML-deserialized configuration +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct DurableConfig { + /// Master opt-in. When `false`, no journal is opened and behavior is identical to a build + /// without the durable layer. + pub enabled: bool, + /// Selected journal backend. + pub backend: DurableBackend, + /// Encrypt payloads with AEAD. A `false` value is a development-only override (it emits a + /// startup warning) and is forbidden for non-local backends (INV-8). + pub encrypt_payload: bool, + /// P1 adapter: wrap agent-loop steps in durable steps. + pub agent_turns: bool, + /// P2 adapter: journal the orchestration `/plan resume` replan budget. + pub orchestration: bool, + /// P3 adapter: exactly-once scheduler job fire. + pub scheduler: bool, + /// P4 adapter: durable promise for subagent spawn/await. + pub subagent: bool, + /// Group-commit interval for buffered appends, in milliseconds. + pub journal_flush_interval_ms: u64, + /// Timeout for an acknowledged append before degrading to non-durable mode, in milliseconds. + pub journal_ack_timeout_ms: u64, + /// In-execution step cap (soft fold at 90%, hard abort at 100%). + pub max_steps_per_execution: u32, + /// Maximum payload size in bytes, enforced on both append and read. + pub max_payload_bytes: u64, + /// Database fallback poll interval for parked promises, in seconds. + pub promise_poll_interval_secs: u64, + /// Above this many parked promises, resolution falls back to pure polling. + pub max_parked_promises: u32, + /// Journal retention and compaction policy (`[durable.retention]`). 
+ pub retention: RetentionPolicy, +} + +impl Default for DurableConfig { + fn default() -> Self { + Self { + enabled: false, + backend: DurableBackend::Local, + encrypt_payload: true, + agent_turns: true, + orchestration: true, + scheduler: true, + subagent: true, + journal_flush_interval_ms: 10, + journal_ack_timeout_ms: 5000, + max_steps_per_execution: 10_000, + max_payload_bytes: 1_048_576, + promise_poll_interval_secs: 2, + max_parked_promises: 1000, + retention: RetentionPolicy::default(), + } + } +} + +/// Journal retention and compaction policy (`[durable.retention]`). +/// +/// Drives the background prune sweep, which never runs on the dispatch hot path. +/// +/// # Examples +/// +/// ``` +/// use zeph_config::RetentionPolicy; +/// +/// let policy = RetentionPolicy::default(); +/// assert_eq!(policy.ttl_completed_secs, 604_800); // 7 days +/// assert_eq!(policy.ttl_failed_secs, 2_592_000); // 30 days +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct RetentionPolicy { + /// Prune completed executions older than this, in seconds. + pub ttl_completed_secs: u64, + /// Prune failed or aborted executions older than this, in seconds. + pub ttl_failed_secs: u64, + /// LRU cap on the number of stored executions. + pub max_executions: u64, + /// Size cap on the journal in bytes; exceeding it triggers an LRU sweep. + pub max_journal_bytes: u64, + /// Rows deleted per transaction during a prune sweep; the task yields between batches. + pub prune_batch_size: u64, + /// Background prune poll interval, in seconds. 
+ pub prune_interval_secs: u64, +} + +impl Default for RetentionPolicy { + fn default() -> Self { + Self { + ttl_completed_secs: 604_800, + ttl_failed_secs: 2_592_000, + max_executions: 10_000, + max_journal_bytes: 1_073_741_824, + prune_batch_size: 500, + prune_interval_secs: 3600, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_table_yields_every_spec_default() { + let cfg: DurableConfig = toml::from_str("").unwrap(); + assert!(!cfg.enabled); + assert_eq!(cfg.backend, DurableBackend::Local); + assert!(cfg.encrypt_payload); + assert!(cfg.agent_turns); + assert!(cfg.orchestration); + assert!(cfg.scheduler); + assert!(cfg.subagent); + assert_eq!(cfg.journal_flush_interval_ms, 10); + assert_eq!(cfg.journal_ack_timeout_ms, 5000); + assert_eq!(cfg.max_steps_per_execution, 10_000); + assert_eq!(cfg.max_payload_bytes, 1_048_576); + assert_eq!(cfg.promise_poll_interval_secs, 2); + assert_eq!(cfg.max_parked_promises, 1000); + } + + #[test] + fn empty_table_yields_retention_defaults() { + let cfg: DurableConfig = toml::from_str("").unwrap(); + assert_eq!(cfg.retention.ttl_completed_secs, 604_800); + assert_eq!(cfg.retention.ttl_failed_secs, 2_592_000); + assert_eq!(cfg.retention.max_executions, 10_000); + assert_eq!(cfg.retention.max_journal_bytes, 1_073_741_824); + assert_eq!(cfg.retention.prune_batch_size, 500); + assert_eq!(cfg.retention.prune_interval_secs, 3600); + } + + #[test] + fn default_impl_matches_serde_default() { + let from_toml: DurableConfig = toml::from_str("").unwrap(); + assert_eq!(from_toml, DurableConfig::default()); + } + + #[test] + fn partial_table_overrides_only_named_fields() { + let cfg: DurableConfig = toml::from_str( + r#" + enabled = true + backend = "restate" + + [retention] + prune_batch_size = 999 + "#, + ) + .unwrap(); + assert!(cfg.enabled); + assert_eq!(cfg.backend, DurableBackend::Restate); + // Untouched fields keep their defaults. 
+ assert_eq!(cfg.journal_ack_timeout_ms, 5000); + assert_eq!(cfg.retention.prune_batch_size, 999); + assert_eq!(cfg.retention.ttl_completed_secs, 604_800); + } +} diff --git a/crates/zeph-config/src/lib.rs b/crates/zeph-config/src/lib.rs index 1592e0118..7414d8948 100644 --- a/crates/zeph-config/src/lib.rs +++ b/crates/zeph-config/src/lib.rs @@ -78,6 +78,7 @@ pub mod cocoon; mod de_helpers; pub mod defaults; pub mod dump_format; +pub mod durable; mod env; pub mod error; pub mod execution; @@ -126,6 +127,7 @@ pub use defaults::{ is_legacy_default_skills_path, is_legacy_default_sqlite_path, }; pub use dump_format::DumpFormat; +pub use durable::{DurableBackend, DurableConfig, RetentionPolicy}; pub use execution::{EnvironmentConfig, ExecutionConfig}; pub use experiment::{ AdaptOrchConfig, ExperimentConfig, ExperimentSchedule, FailureStrategy, OrchestrationConfig, diff --git a/crates/zeph-config/src/migrate/infra.rs b/crates/zeph-config/src/migrate/infra.rs index d9e4e002c..f0560a314 100644 --- a/crates/zeph-config/src/migrate/infra.rs +++ b/crates/zeph-config/src/migrate/infra.rs @@ -568,3 +568,56 @@ pub fn migrate_worktree_git_timeout(toml_src: &str) -> Result Result { + let commented_present = toml_src.lines().any(|l| l.trim() == "# [durable]"); + if section_header_present(toml_src, "durable") || commented_present { + return Ok(MigrationResult { + output: toml_src.to_owned(), + changed_count: 0, + sections_changed: Vec::new(), + }); + } + + let _doc = toml_src.parse::()?; + + let block = "\n# Native durable execution layer (spec-064, #4949). 
Opt-in, default-off.\n\ + # [durable]\n\ + # enabled = false\n\ + # backend = \"local\"\n\ + # encrypt_payload = true\n\ + # agent_turns = true\n\ + # orchestration = true\n\ + # scheduler = true\n\ + # subagent = true\n\ + # journal_flush_interval_ms = 10\n\ + # journal_ack_timeout_ms = 5000\n\ + # max_steps_per_execution = 10000\n\ + # max_payload_bytes = 1048576\n\ + # promise_poll_interval_secs = 2\n\ + # max_parked_promises = 1000\n\ + #\n\ + # [durable.retention]\n\ + # ttl_completed_secs = 604800\n\ + # ttl_failed_secs = 2592000\n\ + # max_executions = 10000\n\ + # max_journal_bytes = 1073741824\n\ + # prune_batch_size = 500\n\ + # prune_interval_secs = 3600\n"; + let output = format!("{}{}", toml_src.trim_end(), block); + Ok(MigrationResult { + output, + changed_count: 1, + sections_changed: vec!["durable".to_owned()], + }) +} diff --git a/crates/zeph-config/src/migrate/mod.rs b/crates/zeph-config/src/migrate/mod.rs index 001d7b84f..6641ba757 100644 --- a/crates/zeph-config/src/migrate/mod.rs +++ b/crates/zeph-config/src/migrate/mod.rs @@ -589,19 +589,19 @@ mod steps; use steps::{ MigrateAcpSubagentsConfig, MigrateAgentBudgetHint, MigrateAgentRetryToToolsRetry, MigrateAutodreamConfig, MigrateCocoonProviderNotice, MigrateCocoonShowBalance, - MigrateCompressionPredictorConfig, MigrateDatabaseUrl, MigrateEgressConfig, - MigrateEmbedProviderRename, MigrateFidelityTimeoutDefaults, MigrateFiveSignalConfig, - MigrateFocusAutoConsolidateMinWindow, MigrateForgettingConfig, MigrateGoalsConfig, - MigrateGonkagateToGonka, MigrateHooksPermissionDeniedConfig, MigrateHooksTurnComplete, - MigrateLlmStreamLimits, MigrateMagicDocsConfig, MigrateMcpElicitationConfig, - MigrateMcpMaxConnectAttempts, MigrateMcpRetryAndToolTimeout, MigrateMcpTrustLevels, - MigrateMemoryGraph, MigrateMemoryHebbian, MigrateMemoryHebbianConsolidation, - MigrateMemoryHebbianSpread, MigrateMemoryPersonaConfig, MigrateMemoryReasoning, - MigrateMemoryReasoningJudge, MigrateMemoryRetrieval, 
MigrateMemoryRetrievalQueryBias, - MigrateMicrocompactConfig, MigrateOrchestrationPersistence, MigrateOrchestratorProvider, - MigrateOtelFilter, MigratePlannerModelToProvider, MigrateProviderMaxConcurrent, - MigrateQdrantApiKey, MigrateQualityConfig, MigrateSandboxConfig, MigrateSandboxEgressFilter, - MigrateSchedulerDaemon, MigrateSessionPersistProviderOverrides, + MigrateCompressionPredictorConfig, MigrateDatabaseUrl, MigrateDurableConfig, + MigrateEgressConfig, MigrateEmbedProviderRename, MigrateFidelityTimeoutDefaults, + MigrateFiveSignalConfig, MigrateFocusAutoConsolidateMinWindow, MigrateForgettingConfig, + MigrateGoalsConfig, MigrateGonkagateToGonka, MigrateHooksPermissionDeniedConfig, + MigrateHooksTurnComplete, MigrateLlmStreamLimits, MigrateMagicDocsConfig, + MigrateMcpElicitationConfig, MigrateMcpMaxConnectAttempts, MigrateMcpRetryAndToolTimeout, + MigrateMcpTrustLevels, MigrateMemoryGraph, MigrateMemoryHebbian, + MigrateMemoryHebbianConsolidation, MigrateMemoryHebbianSpread, MigrateMemoryPersonaConfig, + MigrateMemoryReasoning, MigrateMemoryReasoningJudge, MigrateMemoryRetrieval, + MigrateMemoryRetrievalQueryBias, MigrateMicrocompactConfig, MigrateOrchestrationPersistence, + MigrateOrchestratorProvider, MigrateOtelFilter, MigratePlannerModelToProvider, + MigrateProviderMaxConcurrent, MigrateQdrantApiKey, MigrateQualityConfig, MigrateSandboxConfig, + MigrateSandboxEgressFilter, MigrateSchedulerDaemon, MigrateSessionPersistProviderOverrides, MigrateSessionProviderPersistence, MigrateSessionRecapConfig, MigrateShellTransactional, MigrateSttToProvider, MigrateSupervisorConfig, MigrateTelemetryConfig, MigrateToolsCompressionConfig, MigrateTraceMetadata, MigrateVigilConfig, MigrateWorktreeConfig, @@ -703,6 +703,8 @@ pub static MIGRATIONS: std::sync::LazyLock> Box::new(MigrateWorktreeGitTimeout), // Step 56 — add [llm.stream_limits] commented advisory notice (#4750) Box::new(MigrateLlmStreamLimits), + // Step 57 — add [durable] execution-layer section, 
default-off (spec-064, #4949) + Box::new(MigrateDurableConfig), ] }); diff --git a/crates/zeph-config/src/migrate/steps.rs b/crates/zeph-config/src/migrate/steps.rs index 471128535..3fe3e4065 100644 --- a/crates/zeph-config/src/migrate/steps.rs +++ b/crates/zeph-config/src/migrate/steps.rs @@ -25,21 +25,21 @@ use super::{ MigrateError, Migration, MigrationResult, migrate_acp_subagents_config, migrate_agent_budget_hint, migrate_agent_retry_to_tools_retry, migrate_autodream_config, migrate_cocoon_provider_notice, migrate_cocoon_show_balance, - migrate_compression_predictor_config, migrate_database_url, migrate_egress_config, - migrate_embed_provider_rename, migrate_fidelity_timeout_defaults, migrate_five_signal_config, - migrate_focus_auto_consolidate_min_window, migrate_forgetting_config, migrate_goals_config, - migrate_hooks_permission_denied_config, migrate_hooks_turn_complete_config, - migrate_llm_stream_limits, migrate_magic_docs_config, migrate_mcp_elicitation_config, - migrate_mcp_max_connect_attempts, migrate_mcp_retry_and_tool_timeout, migrate_mcp_trust_levels, - migrate_memory_graph_config, migrate_memory_hebbian_config, - migrate_memory_hebbian_consolidation_config, migrate_memory_hebbian_spread_config, - migrate_memory_persona_config, migrate_memory_reasoning_config, - migrate_memory_reasoning_judge_config, migrate_memory_retrieval_config, - migrate_memory_retrieval_query_bias, migrate_microcompact_config, - migrate_orchestration_orchestrator_provider, migrate_orchestration_persistence, - migrate_otel_filter, migrate_planner_model_to_provider, migrate_provider_max_concurrent, - migrate_qdrant_api_key, migrate_quality_config, migrate_sandbox_config, - migrate_sandbox_egress_filter, migrate_scheduler_daemon_config, + migrate_compression_predictor_config, migrate_database_url, migrate_durable_config, + migrate_egress_config, migrate_embed_provider_rename, migrate_fidelity_timeout_defaults, + migrate_five_signal_config, 
migrate_focus_auto_consolidate_min_window, + migrate_forgetting_config, migrate_goals_config, migrate_hooks_permission_denied_config, + migrate_hooks_turn_complete_config, migrate_llm_stream_limits, migrate_magic_docs_config, + migrate_mcp_elicitation_config, migrate_mcp_max_connect_attempts, + migrate_mcp_retry_and_tool_timeout, migrate_mcp_trust_levels, migrate_memory_graph_config, + migrate_memory_hebbian_config, migrate_memory_hebbian_consolidation_config, + migrate_memory_hebbian_spread_config, migrate_memory_persona_config, + migrate_memory_reasoning_config, migrate_memory_reasoning_judge_config, + migrate_memory_retrieval_config, migrate_memory_retrieval_query_bias, + migrate_microcompact_config, migrate_orchestration_orchestrator_provider, + migrate_orchestration_persistence, migrate_otel_filter, migrate_planner_model_to_provider, + migrate_provider_max_concurrent, migrate_qdrant_api_key, migrate_quality_config, + migrate_sandbox_config, migrate_sandbox_egress_filter, migrate_scheduler_daemon_config, migrate_session_persist_provider_overrides, migrate_session_provider_persistence, migrate_session_recap_config, migrate_shell_transactional, migrate_stt_to_provider, migrate_supervisor_config, migrate_telemetry_config, migrate_tools_compression_config, @@ -664,3 +664,14 @@ impl Migration for MigrateLlmStreamLimits { migrate_llm_stream_limits(toml_src) } } + +pub(super) struct MigrateDurableConfig; +impl Migration for MigrateDurableConfig { + fn name(&self) -> &'static str { + "migrate_durable_config" + } + + fn apply(&self, toml_src: &str) -> Result { + migrate_durable_config(toml_src) + } +} diff --git a/crates/zeph-config/src/migrate/tests.rs b/crates/zeph-config/src/migrate/tests.rs index 7b3498260..e50e5f337 100644 --- a/crates/zeph-config/src/migrate/tests.rs +++ b/crates/zeph-config/src/migrate/tests.rs @@ -9,8 +9,8 @@ use super::*; fn migrations_registry_has_all_steps() { assert_eq!( MIGRATIONS.len(), - 56, - "MIGRATIONS registry must contain all 56 
sequential steps" + 57, + "MIGRATIONS registry must contain all 57 sequential steps" ); for m in MIGRATIONS.iter() { assert!( @@ -1599,7 +1599,7 @@ fn migrate_focus_auto_consolidate_noop_when_only_commented_section() { #[test] fn registry_has_fifty_entries() { - assert_eq!(MIGRATIONS.len(), 56); + assert_eq!(MIGRATIONS.len(), 57); } #[test] @@ -1638,7 +1638,7 @@ fn registry_is_idempotent_on_empty_input() { #[test] fn registry_preserves_order_matches_dispatch() { - // Names must follow the documented step order (steps 1–56). + // Names must follow the documented step order (steps 1–57). let expected = [ "migrate_stt_to_provider", "migrate_planner_model_to_provider", @@ -1696,6 +1696,7 @@ fn registry_preserves_order_matches_dispatch() { "migrate_worktree_config", "migrate_worktree_git_timeout", "migrate_llm_stream_limits", + "migrate_durable_config", ]; let actual: Vec<&str> = MIGRATIONS.iter().map(|m| m.name()).collect(); assert_eq!(actual, expected); @@ -2217,6 +2218,65 @@ fn step_54_does_not_skip_when_worktree_in_value() { ); } +// ── migrate_durable_config tests (spec-064, #4949) ─────────────────────── + +#[test] +fn step_57_inserts_durable_section_on_fresh_config() { + let input = "[agent]\nmax_turns = 10\n"; + let result = migrate_durable_config(input).unwrap(); + assert_eq!(result.changed_count, 1); + assert!( + result.output.contains("# [durable]"), + "should insert commented [durable] section" + ); + assert!( + result.output.contains("# [durable.retention]"), + "should include the retention sub-table" + ); + assert!( + result.output.contains("# enabled = false"), + "durable migration is default-off" + ); +} + +#[test] +fn step_57_is_idempotent_when_durable_present() { + let input = "[durable]\nenabled = true\n"; + let result = migrate_durable_config(input).unwrap(); + assert_eq!(result.changed_count, 0); + assert_eq!( + result.output.matches("[durable]").count(), + 1, + "should not duplicate [durable]" + ); +} + +#[test] +fn 
step_57_is_idempotent_across_repeated_runs() { + let input = "[agent]\nmax_turns = 10\n"; + let once = migrate_durable_config(input).unwrap(); + let twice = migrate_durable_config(&once.output).unwrap(); + assert_eq!( + twice.changed_count, 0, + "second run must not re-insert the commented block" + ); + assert_eq!( + twice.output.matches("# [durable]").count(), + 1, + "running twice must not duplicate the durable block" + ); +} + +#[test] +fn step_57_does_not_skip_when_durable_in_value() { + let input = "[agent]\ndescription = \"the [durable] layer\"\n"; + let result = migrate_durable_config(input).unwrap(); + assert_eq!( + result.changed_count, 1, + "[durable] in a value must not suppress migration" + ); +} + // ── migrate_worktree_git_timeout tests (#4704) ─────────────────────────── #[test] diff --git a/crates/zeph-config/src/root.rs b/crates/zeph-config/src/root.rs index 1e49cfc5f..2e792435a 100644 --- a/crates/zeph-config/src/root.rs +++ b/crates/zeph-config/src/root.rs @@ -136,6 +136,9 @@ pub struct Config { /// Git worktree isolation configuration. #[serde(default)] pub worktree: crate::worktree::WorktreeConfig, + /// Durable execution layer configuration (`[durable]`). + #[serde(default)] + pub durable: crate::durable::DurableConfig, } /// Secrets resolved from the vault at runtime. 
@@ -338,6 +341,7 @@ impl Default for Config { cocoon: CocoonConfig::default(), secrets: ResolvedSecrets::default(), worktree: crate::worktree::WorktreeConfig::default(), + durable: crate::durable::DurableConfig::default(), } } } diff --git a/crates/zeph-core/src/config.rs b/crates/zeph-core/src/config.rs index 5338bec5c..d006be87c 100644 --- a/crates/zeph-core/src/config.rs +++ b/crates/zeph-core/src/config.rs @@ -32,6 +32,7 @@ pub use zeph_config::{ MicrocompactConfig, PersonaConfig, TrajectoryConfig, TreeConfig, }; pub use zeph_config::{DiagnosticSeverity, DiagnosticsConfig, HoverConfig, LspConfig}; +pub use zeph_config::{DurableBackend, DurableConfig, RetentionPolicy}; pub use zeph_config::{QualityConfig, TriggerPolicy}; pub use zeph_config::{TelemetryBackend, TelemetryConfig}; diff --git a/crates/zeph-core/src/durable.rs b/crates/zeph-core/src/durable.rs index ac88d1f1b..a7380dd3e 100644 --- a/crates/zeph-core/src/durable.rs +++ b/crates/zeph-core/src/durable.rs @@ -47,6 +47,14 @@ const NONCE_END: usize = KEY_ID_LEN + NONCE_LEN; /// Smallest valid sealed blob: `key_id || nonce || tag` (empty ciphertext). const MIN_SEALED_LEN: usize = NONCE_END + TAG_LEN; +/// The key-id byte stamped on every payload sealed with the current `ZEPH_DURABLE_KEY`. +/// +/// `seal` writes this as the leading byte and `open` selects the current key by it. Both the +/// agent-loop engine and the `zeph durable --reveal` CLI build the cipher with this id so a sealed +/// blob round-trips. Rotating to a fresh key bumps the id and registers the old one as the previous +/// slot ([`XChaCha20Poly1305Cipher::with_previous`]). +pub const DURABLE_KEY_ID: u8 = 0; + /// Failure constructing an [`XChaCha20Poly1305Cipher`] from raw vault bytes. #[derive(Debug, thiserror::Error)] #[non_exhaustive] @@ -59,6 +67,9 @@ pub enum CipherKeyError { /// The length of the supplied key material. actual: usize, }, + /// The vault-resolved key string was not valid base64. 
+ #[error("durable cipher key is not valid base64")] + MalformedEncoding, } /// One key registered with the cipher, addressed by its on-disk key-id byte. @@ -126,6 +137,34 @@ impl XChaCha20Poly1305Cipher { Ok(Self::new(key_id, array)) } + /// Construct the current cipher from the base64-encoded `ZEPH_DURABLE_KEY` vault value. + /// + /// This is the single decode path shared by the agent-loop engine and the `zeph durable + /// --reveal` CLI; both use [`DURABLE_KEY_ID`] so a sealed blob round-trips. The key is generated + /// in this same encoding by [`generate_durable_key_b64`]. + /// + /// # Errors + /// + /// Returns [`CipherKeyError::MalformedEncoding`] when `b64_key` is not valid base64, or + /// [`CipherKeyError::InvalidKeyLength`] when the decoded key is not exactly 32 bytes. + /// + /// # Examples + /// + /// ``` + /// use zeph_core::durable::{XChaCha20Poly1305Cipher, generate_durable_key_b64}; + /// + /// let key = generate_durable_key_b64(); + /// assert!(XChaCha20Poly1305Cipher::from_vault_b64(&key).is_ok()); + /// assert!(XChaCha20Poly1305Cipher::from_vault_b64("not base64!").is_err()); + /// ``` + pub fn from_vault_b64(b64_key: &str) -> Result { + use base64::Engine as _; + let bytes = base64::engine::general_purpose::STANDARD + .decode(b64_key.trim()) + .map_err(|_| CipherKeyError::MalformedEncoding)?; + Self::from_vault_bytes(DURABLE_KEY_ID, &bytes) + } + /// Register a previous key for the rotation window. /// /// `open` will select this key for blobs whose leading key-id byte matches `key_id`; `seal` @@ -150,6 +189,26 @@ impl XChaCha20Poly1305Cipher { } } +/// Generate a fresh random 32-byte durable payload key, base64-encoded for vault storage. +/// +/// Stored under `ZEPH_DURABLE_KEY` (never inline in TOML); decode it back with +/// [`XChaCha20Poly1305Cipher::from_vault_b64`]. Drawn from the OS CSPRNG. 
+/// +/// # Examples +/// +/// ``` +/// use zeph_core::durable::{generate_durable_key_b64, XChaCha20Poly1305Cipher}; +/// +/// let key = generate_durable_key_b64(); +/// assert!(XChaCha20Poly1305Cipher::from_vault_b64(&key).is_ok()); +/// ``` +#[must_use] +pub fn generate_durable_key_b64() -> String { + use base64::Engine as _; + let key = XChaCha20Poly1305::generate_key(&mut OsRng); + base64::engine::general_purpose::STANDARD.encode(key.as_slice()) +} + impl PayloadCipher for XChaCha20Poly1305Cipher { fn seal(&self, plaintext: &[u8], aad: &PayloadAad) -> Result, CipherError> { let aad_bytes = aad.canonical_bytes(); diff --git a/crates/zeph-durable/Cargo.toml b/crates/zeph-durable/Cargo.toml index ed6747b44..5ec81aaf2 100644 --- a/crates/zeph-durable/Cargo.toml +++ b/crates/zeph-durable/Cargo.toml @@ -30,6 +30,7 @@ thiserror.workspace = true tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] } tracing.workspace = true uuid = { workspace = true, features = ["serde", "v7", "v8"] } +zeph-config.workspace = true zeph-db.workspace = true zeroize.workspace = true diff --git a/crates/zeph-durable/src/backend.rs b/crates/zeph-durable/src/backend.rs index e8768fb55..cc426d338 100644 --- a/crates/zeph-durable/src/backend.rs +++ b/crates/zeph-durable/src/backend.rs @@ -39,6 +39,56 @@ pub mod local; pub use local::LocalBackend; +/// A read-only summary of a single durable execution, for operability surfaces. +/// +/// Returned by [`LocalBackend::list_executions`]. It carries only the execution-level metadata that +/// the `zeph durable list` CLI and the TUI `DurableView` display — never payload bytes or resolver +/// tokens (INV-5 redaction). The `kind` is the raw column tag (an [`ExecutionKind::Custom`] cannot +/// round-trip to a typed value, so the stored string is exposed verbatim for display). 
+/// +/// [`ExecutionKind::Custom`]: crate::ExecutionKind::Custom +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ExecutionSummary { + /// The execution identity. + pub execution_id: ExecutionId, + /// The canonical kind tag as stored (`agent_turn`, `dag_run`, …, or a custom literal). + pub kind: String, + /// The current execution status. + pub status: ExecutionStatus, + /// Creation time, Unix epoch milliseconds. + pub created_at_ms: i64, + /// Last-update time, Unix epoch milliseconds. + pub updated_at_ms: i64, + /// Finalization time, Unix epoch milliseconds; `None` while the execution is non-terminal. + pub finalized_at_ms: Option, + /// Number of journal entries recorded for this execution. + pub step_count: u64, +} + +/// A redaction-safe view of one journal entry, for the `zeph durable show`/`inspect` CLI. +/// +/// Returned by [`LocalBackend::read_execution_redacted`]. It deliberately excludes the payload bytes +/// and full idempotency key — only the metadata the spec's INV-5 redaction rule permits in default +/// output. To see decrypted payloads a caller must opt in via `--reveal`, which reads through the +/// AEAD cipher with [`Journal::read_execution`](crate::Journal::read_execution) instead. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RedactedEntry { + /// Global append sequence. + pub seq: i64, + /// The step this entry belongs to. + pub step_id: crate::ids::StepId, + /// The raw `entry_kind` column tag (`step_result`, `effect_intent`, …). + pub entry_kind: String, + /// The effect-class tag, when the entry carries one. + pub effect_class: Option, + /// Hex of the first 8 bytes of the idempotency key, when present (INV-5 prefix only). + pub idem_key_prefix: Option, + /// Size in bytes of the stored (AEAD-sealed) payload; `0` for control entries. + pub payload_len: u64, + /// Creation time, Unix epoch milliseconds. + pub created_at_ms: i64, +} + /// The capabilities a backend advertises so callers can adapt their journaling strategy. 
/// /// The replay cursor and the durable-step primitive read these flags to decide, for example, diff --git a/crates/zeph-durable/src/backend/local.rs b/crates/zeph-durable/src/backend/local.rs index a8ade614e..d3b9ad80a 100644 --- a/crates/zeph-durable/src/backend/local.rs +++ b/crates/zeph-durable/src/backend/local.rs @@ -17,7 +17,7 @@ //! entries (currently [`EntryKind::EffectIntent`]) carry no payload; when an HMAC key is configured //! the backend stamps a keyed BLAKE3 row HMAC over their identity for shared-database deployments. //! When no cipher is injected the payload is stored verbatim — a development-only posture gated by -//! [`DurableConfig::encryption_gate`](crate::DurableConfig::encryption_gate) at startup. +//! [`encryption_gate`](crate::encryption_gate) at startup. //! //! # Scope //! @@ -33,13 +33,14 @@ //! ([`prune`](Journal::prune)) is a no-op stub here. use std::fmt; +use std::fmt::Write as _; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use bytes::Bytes; use zeph_db::{DbPool, sql}; -use crate::backend::{BackendCapabilities, ExecutionBackend}; +use crate::backend::{BackendCapabilities, ExecutionBackend, ExecutionSummary, RedactedEntry}; use crate::cipher::{EntryKindTag, PayloadAad, PayloadCipher, ensure_payload_within_limit}; use crate::config::RetentionPolicy; use crate::error::DurableError; @@ -61,6 +62,28 @@ use tracing::Instrument as _; /// denial-of-service protection. const SEAL_OVERHEAD_SLACK: u64 = 128; +/// Row shape returned by the `list_executions` query. +type ExecutionRow = (String, String, String, i64, i64, Option, i64); + +/// Row shape returned by the `read_execution_redacted` query. +type RedactedRow = ( + i64, + i64, + String, + Option>, + Option, + Option, + i64, +); + +/// Render the first 8 bytes of an idempotency key as a lowercase hex prefix (INV-5). 
+fn idem_key_prefix(bytes: &[u8]) -> String { + bytes.iter().take(8).fold(String::new(), |mut acc, b| { + let _ = write!(acc, "{b:02x}"); + acc + }) +} + /// The always-compiled durable backend that journals to a dedicated `durable.db`. /// /// Construct it from a [`zeph_db::DbPool`] (or open one with [`LocalBackend::open`]), then attach an @@ -173,6 +196,144 @@ impl LocalBackend { Ok(()) } + /// List execution summaries for operability surfaces (the `zeph durable` CLI and TUI). + /// + /// Returns at most `limit` executions, newest first, optionally filtered by `status` and `kind` + /// (each is matched against the raw column tag; `None` disables that filter). Only execution-level + /// metadata is read — never payload bytes or resolver tokens (INV-5). The per-execution step + /// count is the number of journal entries recorded for it. + /// + /// Span: `durable.backend.list`. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] if the query fails, or [`DurableError::Decode`] if a stored + /// id or status cannot be reconstructed (schema corruption — the `status` column is + /// `CHECK`-constrained, so this is a fail-closed guard rather than a routine path). + pub async fn list_executions( + &self, + status: Option<&str>, + kind: Option<&str>, + limit: i64, + ) -> Result, DurableError> { + let span = tracing::info_span!( + "durable.backend.list", + status = status.unwrap_or("*"), + kind = kind.unwrap_or("*"), + count = tracing::field::Empty, + ); + async move { + // `COALESCE(?, col)` keeps a single positional bind per filter and lets the column type + // drive the bind type, so the same literal works on both SQLite and Postgres without a + // cast on the `?` placeholder. 
+ let rows: Vec = + zeph_db::query_as(sql!( + "SELECT + e.execution_id, + e.kind, + e.status, + e.created_at, + e.updated_at, + e.finalized_at, + (SELECT COUNT(*) FROM durable_journal j WHERE j.execution_id = e.execution_id) + FROM durable_executions e + WHERE e.status = COALESCE(?, e.status) + AND e.kind = COALESCE(?, e.kind) + ORDER BY e.created_at DESC + LIMIT ?" + )) + .bind(status) + .bind(kind) + .bind(limit) + .fetch_all(&self.pool) + .await + .map_err(|e| DurableError::storage("list", e))?; + tracing::Span::current().record("count", rows.len()); + rows.into_iter() + .map(|(id, kind, status, created, updated, finalized, steps)| { + Ok(ExecutionSummary { + execution_id: parse_execution_id(&id)?, + kind, + status: ExecutionStatus::from_tag(&status).ok_or(DurableError::Decode { + context: "execution status is not a recognized CHECK-constrained value", + })?, + created_at_ms: created, + updated_at_ms: updated, + finalized_at_ms: finalized, + step_count: steps.max(0).cast_unsigned(), + }) + }) + .collect() + } + .instrument(span) + .await + } + + /// Read one execution's journal entries as redaction-safe metadata, without decrypting payloads. + /// + /// Unlike [`read_execution`](Journal::read_execution), this never touches the cipher, so it works + /// against a journal whose AEAD key is unavailable and never exposes plaintext (INV-5). It backs + /// the default (redacted) `zeph durable show`/`inspect` output. Entries are returned in append + /// order. + /// + /// Span: `durable.backend.read_redacted`. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] if the query fails. + pub async fn read_execution_redacted( + &self, + id: ExecutionId, + ) -> Result, DurableError> { + let exec = id.as_uuid().to_string(); + let rows: Vec = zeph_db::query_as(sql!( + "SELECT seq, step_id, entry_kind, idem_key, effect_class, LENGTH(payload), created_at + FROM durable_journal WHERE execution_id = ? 
ORDER BY seq" + )) + .bind(&exec) + .fetch_all(&self.pool) + .await + .map_err(|e| DurableError::storage("read_redacted", e))?; + Ok(rows + .into_iter() + .map( + |(seq, step, entry_kind, idem, effect_class, payload_len, created)| RedactedEntry { + seq, + step_id: StepId::new(u32::try_from(step).unwrap_or(0)), + entry_kind, + effect_class, + idem_key_prefix: idem.as_deref().map(idem_key_prefix), + payload_len: payload_len.unwrap_or(0).max(0).cast_unsigned(), + created_at_ms: created, + }, + ) + .collect()) + } + + /// Count terminal executions a [`prune`](Journal::prune) sweep would delete under `policy`. + /// + /// Read-only: backs `zeph durable prune --dry-run`. It applies the same TTL cutoffs as the + /// delete path, so the count is exactly what a real sweep would remove now. + /// + /// # Errors + /// + /// Returns [`DurableError::Storage`] if the query fails. + pub async fn count_prunable(&self, policy: &RetentionPolicy) -> Result { + let cutoffs = crate::retention::PruneCutoffs::from_policy(policy, now_unix_millis()); + let (count,): (i64,) = zeph_db::query_as(sql!( + "SELECT COUNT(*) FROM durable_executions + WHERE finalized_at IS NOT NULL + AND ( (status = 'completed' AND finalized_at <= ?) + OR (status IN ('failed', 'aborted') AND finalized_at <= ?) )" + )) + .bind(cutoffs.completed_before_ms) + .bind(cutoffs.failed_before_ms) + .fetch_one(&self.pool) + .await + .map_err(|e| DurableError::storage("count_prunable", e))?; + Ok(count.max(0).cast_unsigned()) + } + /// Ensure a `durable_executions` row exists for `id`, returning whether this is a resume. 
/// /// Inserts a fresh `running` row for a new execution (returning `false`) or detects an existing @@ -1440,6 +1601,69 @@ mod tests { ); } + #[tokio::test] + async fn list_executions_summarizes_and_filters() { + let backend = mem_backend(1_048_576).await; + let turn = ExecutionId::new(); + let dag = ExecutionId::new(); + backend + .open_execution(turn, ExecutionKind::AgentTurn) + .await + .unwrap(); + backend + .open_execution(dag, ExecutionKind::DagRun) + .await + .unwrap(); + backend.append(step_result(turn, 0, b"a")).await.unwrap(); + backend.append(step_result(turn, 1, b"b")).await.unwrap(); + backend.append(step_result(dag, 0, b"c")).await.unwrap(); + backend + .finalize(turn, ExecutionStatus::Completed) + .await + .unwrap(); + + // Unfiltered: both executions with their per-execution step counts. + let all = backend.list_executions(None, None, 10).await.unwrap(); + assert_eq!(all.len(), 2); + + let turn_row = all + .iter() + .find(|e| e.execution_id == turn) + .expect("turn present"); + assert_eq!(turn_row.kind, "agent_turn"); + assert_eq!(turn_row.status, ExecutionStatus::Completed); + assert_eq!(turn_row.step_count, 2); + assert!(turn_row.finalized_at_ms.is_some()); + + let dag_row = all + .iter() + .find(|e| e.execution_id == dag) + .expect("dag present"); + assert_eq!(dag_row.status, ExecutionStatus::Running); + assert_eq!(dag_row.step_count, 1); + assert!(dag_row.finalized_at_ms.is_none()); + + // Status filter narrows to the still-running execution. + let running = backend + .list_executions(Some("running"), None, 10) + .await + .unwrap(); + assert_eq!(running.len(), 1); + assert_eq!(running[0].execution_id, dag); + + // Kind filter narrows to the DAG execution. + let dags = backend + .list_executions(None, Some("dag_run"), 10) + .await + .unwrap(); + assert_eq!(dags.len(), 1); + assert_eq!(dags[0].execution_id, dag); + + // Limit caps the result set. 
+ let one = backend.list_executions(None, None, 1).await.unwrap(); + assert_eq!(one.len(), 1); + } + #[tokio::test] async fn append_and_read_round_trips_step_result() { let backend = mem_backend(1_048_576).await; diff --git a/crates/zeph-durable/src/cipher.rs b/crates/zeph-durable/src/cipher.rs index a95e43bd7..dfcc7f14c 100644 --- a/crates/zeph-durable/src/cipher.rs +++ b/crates/zeph-durable/src/cipher.rs @@ -57,7 +57,7 @@ const AAD_FORMAT_V1: u8 = 1; /// A `PayloadCipher` is the only component permitted to see plaintext payload bytes. It is injected /// into a backend as `Option>`: `None` disables encryption (a development /// override permitted only for a single-user local backend, see -/// [`DurableConfig::encryption_gate`](crate::DurableConfig::encryption_gate)). +/// [`encryption_gate`](crate::encryption_gate)). /// /// # Contract for implementors /// diff --git a/crates/zeph-durable/src/config.rs b/crates/zeph-durable/src/config.rs index 4188776c8..94382cb53 100644 --- a/crates/zeph-durable/src/config.rs +++ b/crates/zeph-durable/src/config.rs @@ -1,107 +1,30 @@ // SPDX-FileCopyrightText: 2026 Andrei G // SPDX-License-Identifier: MIT OR Apache-2.0 -//! Pure-data configuration mirroring the `[durable]` TOML section. +//! Durable configuration and the AEAD enforcement gate. //! -//! Every field carries a spec default via the container-level `#[serde(default)]` attribute backed -//! by [`Default`], so deserializing an empty table yields a fully-populated, spec-compliant -//! configuration. No credentials appear inline — the AEAD key and any Restate endpoints are -//! resolved from the vault by key name (spec-038 vault contract), never stored here. +//! The pure-data configuration types ([`DurableConfig`], [`RetentionPolicy`], [`DurableBackend`]) +//! live in `zeph-config` so the aggregate [`Config`](zeph_config::Config) can hold them without +//! pulling this crate's `zeph-db`/`sqlx` dependency tree onto the config layer. They are re-exported +//! 
here for ergonomic access from the engine APIs that consume them ([`DurableContext`] and +//! [`JournalWriter`]). +//! +//! On top of the data, this module owns the **security policy**: [`encryption_gate`] evaluates the +//! INV-8 AEAD requirement for a deployment. The policy lives next to [`DurableError`] and the cipher +//! contract (in this crate), not with the pure data. +//! +//! [`DurableContext`]: crate::DurableContext +//! [`JournalWriter`]: crate::JournalWriter -use serde::{Deserialize, Serialize}; +pub use zeph_config::{DurableBackend, DurableConfig, RetentionPolicy}; use crate::error::DurableError; -/// Which journal backend an execution uses. -/// -/// `Restate` is only meaningful when the `restate` feature and an external Restate server are -/// available; the variant is accepted in configuration regardless so a config can be authored -/// ahead of the backend being compiled in. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum DurableBackend { - /// Dedicated `durable.db` SQLite/Postgres file managed in-process. The default. - #[default] - Local, - /// External Restate server (feature-gated, server deployments only). - Restate, -} - -/// Configuration for the durable execution layer (`[durable]`). -/// -/// # Examples -/// -/// ``` -/// use zeph_durable::DurableConfig; -/// -/// // An empty table deserializes to the spec defaults. -/// let cfg: DurableConfig = toml::from_str("").unwrap(); -/// assert!(!cfg.enabled); -/// assert_eq!(cfg.journal_ack_timeout_ms, 5000); -/// assert_eq!(cfg.max_payload_bytes, 1_048_576); -/// ``` -#[allow(clippy::struct_excessive_bools)] // config struct — boolean flags are idiomatic for TOML-deserialized configuration -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(default)] -pub struct DurableConfig { - /// Master opt-in. 
When `false`, no journal is opened and behavior is identical to a build - /// without the durable layer. - pub enabled: bool, - /// Selected journal backend. - pub backend: DurableBackend, - /// Encrypt payloads with AEAD. A `false` value is a development-only override (it emits a - /// startup warning) and is forbidden for non-local backends (INV-8). - pub encrypt_payload: bool, - /// P1 adapter: wrap agent-loop steps in durable steps. - pub agent_turns: bool, - /// P2 adapter: journal the orchestration `/plan resume` replan budget. - pub orchestration: bool, - /// P3 adapter: exactly-once scheduler job fire. - pub scheduler: bool, - /// P4 adapter: durable promise for subagent spawn/await. - pub subagent: bool, - /// Group-commit interval for buffered appends, in milliseconds. - pub journal_flush_interval_ms: u64, - /// Timeout for an acknowledged append before degrading to non-durable mode, in milliseconds. - pub journal_ack_timeout_ms: u64, - /// In-execution step cap (soft fold at 90%, hard abort at 100%). - pub max_steps_per_execution: u32, - /// Maximum payload size in bytes, enforced on both append and read. - pub max_payload_bytes: u64, - /// Database fallback poll interval for parked promises, in seconds. - pub promise_poll_interval_secs: u64, - /// Above this many parked promises, resolution falls back to pure polling. - pub max_parked_promises: u32, - /// Journal retention and compaction policy (`[durable.retention]`). 
- pub retention: RetentionPolicy, -} - -impl Default for DurableConfig { - fn default() -> Self { - Self { - enabled: false, - backend: DurableBackend::Local, - encrypt_payload: true, - agent_turns: true, - orchestration: true, - scheduler: true, - subagent: true, - journal_flush_interval_ms: 10, - journal_ack_timeout_ms: 5000, - max_steps_per_execution: 10_000, - max_payload_bytes: 1_048_576, - promise_poll_interval_secs: 2, - max_parked_promises: 1000, - retention: RetentionPolicy::default(), - } - } -} - /// Outcome of evaluating the INV-8 AEAD requirement for a deployment. /// -/// Returned by [`DurableConfig::encryption_gate`]. The error case -/// ([`DurableError::EncryptionRequired`]) covers the forbidden combinations; this enum distinguishes -/// the two *permitted* outcomes so the caller can act on the development-override warning. +/// Returned by [`encryption_gate`]. The error case ([`DurableError::EncryptionRequired`]) covers the +/// forbidden combinations; this enum distinguishes the two *permitted* outcomes so the caller can act +/// on the development-override warning. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EncryptionGate { /// AEAD payload encryption is enabled — proceed normally. @@ -111,144 +34,73 @@ pub enum EncryptionGate { DisabledLocalWarn, } -impl DurableConfig { - /// Evaluate whether the configured `encrypt_payload` setting is permitted for this deployment - /// (INV-8). - /// - /// AEAD is default-on. Disabling it is a development-only override that is permitted **only** for - /// a single-user local backend on a non-shared database. `shared_db` MUST be `true` whenever the - /// journal lives on a multi-client database (Postgres, or any file shared across processes), - /// where the DB-file trust boundary does not hold. - /// - /// # Errors - /// - /// Returns [`DurableError::EncryptionRequired`] when `encrypt_payload = false` is combined with - /// a non-local backend or a shared database. 
- /// - /// # Examples - /// - /// ``` - /// use zeph_durable::{DurableConfig, EncryptionGate}; - /// - /// // Default config keeps AEAD on regardless of deployment. - /// let cfg = DurableConfig::default(); - /// assert_eq!(cfg.encryption_gate(true).unwrap(), EncryptionGate::Enabled); - /// - /// // Disabling AEAD is tolerated only on a single-user local backend. - /// let dev = DurableConfig { encrypt_payload: false, ..DurableConfig::default() }; - /// assert_eq!(dev.encryption_gate(false).unwrap(), EncryptionGate::DisabledLocalWarn); - /// assert!(dev.encryption_gate(true).is_err(), "forbidden on a shared database"); - /// ``` - pub fn encryption_gate(&self, shared_db: bool) -> Result { - if self.encrypt_payload { - return Ok(EncryptionGate::Enabled); - } - if self.backend != DurableBackend::Local { - return Err(DurableError::EncryptionRequired { context: "restate" }); - } - if shared_db { - return Err(DurableError::EncryptionRequired { - context: "shared-database", - }); - } - Ok(EncryptionGate::DisabledLocalWarn) - } -} - -/// Journal retention and compaction policy (`[durable.retention]`). +/// Evaluate whether the configured `encrypt_payload` setting is permitted for this deployment +/// (INV-8). +/// +/// AEAD is default-on. Disabling it is a development-only override that is permitted **only** for a +/// single-user local backend on a non-shared database. `shared_db` MUST be `true` whenever the +/// journal lives on a multi-client database (Postgres, or any file shared across processes), where +/// the DB-file trust boundary does not hold. /// -/// Drives the background prune sweep, which never runs on the dispatch hot path. +/// # Errors +/// +/// Returns [`DurableError::EncryptionRequired`] when `encrypt_payload = false` is combined with a +/// non-local backend or a shared database. 
/// /// # Examples /// /// ``` -/// use zeph_durable::RetentionPolicy; +/// use zeph_durable::{DurableBackend, DurableConfig, EncryptionGate, encryption_gate}; +/// +/// // Default config keeps AEAD on regardless of deployment. +/// let cfg = DurableConfig::default(); +/// assert_eq!(encryption_gate(&cfg, true).unwrap(), EncryptionGate::Enabled); /// -/// let policy = RetentionPolicy::default(); -/// assert_eq!(policy.ttl_completed_secs, 604_800); // 7 days -/// assert_eq!(policy.ttl_failed_secs, 2_592_000); // 30 days +/// // Disabling AEAD is tolerated only on a single-user local backend. +/// let dev = DurableConfig { encrypt_payload: false, ..DurableConfig::default() }; +/// assert_eq!(encryption_gate(&dev, false).unwrap(), EncryptionGate::DisabledLocalWarn); +/// assert!(encryption_gate(&dev, true).is_err(), "forbidden on a shared database"); /// ``` -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(default)] -pub struct RetentionPolicy { - /// Prune completed executions older than this, in seconds. - pub ttl_completed_secs: u64, - /// Prune failed or aborted executions older than this, in seconds. - pub ttl_failed_secs: u64, - /// LRU cap on the number of stored executions. - pub max_executions: u64, - /// Size cap on the journal in bytes; exceeding it triggers an LRU sweep. - pub max_journal_bytes: u64, - /// Rows deleted per transaction during a prune sweep; the task yields between batches. - pub prune_batch_size: u64, - /// Background prune poll interval, in seconds. 
- pub prune_interval_secs: u64, -} - -impl Default for RetentionPolicy { - fn default() -> Self { - Self { - ttl_completed_secs: 604_800, - ttl_failed_secs: 2_592_000, - max_executions: 10_000, - max_journal_bytes: 1_073_741_824, - prune_batch_size: 500, - prune_interval_secs: 3600, - } +pub fn encryption_gate( + cfg: &DurableConfig, + shared_db: bool, +) -> Result { + if cfg.encrypt_payload { + return Ok(EncryptionGate::Enabled); + } + if cfg.backend != DurableBackend::Local { + return Err(DurableError::EncryptionRequired { context: "restate" }); + } + if shared_db { + return Err(DurableError::EncryptionRequired { + context: "shared-database", + }); } + Ok(EncryptionGate::DisabledLocalWarn) } #[cfg(test)] mod tests { use super::*; - #[test] - fn empty_table_yields_every_spec_default() { - let cfg: DurableConfig = toml::from_str("").unwrap(); - assert!(!cfg.enabled); - assert_eq!(cfg.backend, DurableBackend::Local); - assert!(cfg.encrypt_payload); - assert!(cfg.agent_turns); - assert!(cfg.orchestration); - assert!(cfg.scheduler); - assert!(cfg.subagent); - assert_eq!(cfg.journal_flush_interval_ms, 10); - assert_eq!(cfg.journal_ack_timeout_ms, 5000); - assert_eq!(cfg.max_steps_per_execution, 10_000); - assert_eq!(cfg.max_payload_bytes, 1_048_576); - assert_eq!(cfg.promise_poll_interval_secs, 2); - assert_eq!(cfg.max_parked_promises, 1000); - } - - #[test] - fn empty_table_yields_retention_defaults() { - let cfg: DurableConfig = toml::from_str("").unwrap(); - assert_eq!(cfg.retention.ttl_completed_secs, 604_800); - assert_eq!(cfg.retention.ttl_failed_secs, 2_592_000); - assert_eq!(cfg.retention.max_executions, 10_000); - assert_eq!(cfg.retention.max_journal_bytes, 1_073_741_824); - assert_eq!(cfg.retention.prune_batch_size, 500); - assert_eq!(cfg.retention.prune_interval_secs, 3600); - } - - #[test] - fn default_impl_matches_serde_default() { - let from_toml: DurableConfig = toml::from_str("").unwrap(); - assert_eq!(from_toml, DurableConfig::default()); - } - #[test] 
fn encryption_gate_passes_when_aead_enabled() { let cfg = DurableConfig::default(); assert!(cfg.encrypt_payload); - assert_eq!(cfg.encryption_gate(false).unwrap(), EncryptionGate::Enabled); - assert_eq!(cfg.encryption_gate(true).unwrap(), EncryptionGate::Enabled); + assert_eq!( + encryption_gate(&cfg, false).unwrap(), + EncryptionGate::Enabled + ); + assert_eq!( + encryption_gate(&cfg, true).unwrap(), + EncryptionGate::Enabled + ); let restate = DurableConfig { backend: DurableBackend::Restate, ..DurableConfig::default() }; assert_eq!( - restate.encryption_gate(true).unwrap(), + encryption_gate(&restate, true).unwrap(), EncryptionGate::Enabled ); } @@ -261,7 +113,7 @@ mod tests { ..DurableConfig::default() }; assert_eq!( - cfg.encryption_gate(false).unwrap(), + encryption_gate(&cfg, false).unwrap(), EncryptionGate::DisabledLocalWarn ); } @@ -274,7 +126,7 @@ mod tests { ..DurableConfig::default() }; assert!(matches!( - local_shared.encryption_gate(true), + encryption_gate(&local_shared, true), Err(DurableError::EncryptionRequired { context: "shared-database" }) @@ -286,28 +138,8 @@ mod tests { ..DurableConfig::default() }; assert!(matches!( - restate.encryption_gate(false), + encryption_gate(&restate, false), Err(DurableError::EncryptionRequired { context: "restate" }) )); } - - #[test] - fn partial_table_overrides_only_named_fields() { - let cfg: DurableConfig = toml::from_str( - r#" - enabled = true - backend = "restate" - - [retention] - prune_batch_size = 999 - "#, - ) - .unwrap(); - assert!(cfg.enabled); - assert_eq!(cfg.backend, DurableBackend::Restate); - // Untouched fields keep their defaults. 
- assert_eq!(cfg.journal_ack_timeout_ms, 5000); - assert_eq!(cfg.retention.prune_batch_size, 999); - assert_eq!(cfg.retention.ttl_completed_secs, 604_800); - } } diff --git a/crates/zeph-durable/src/ids.rs b/crates/zeph-durable/src/ids.rs index efc3c178e..23426f2e1 100644 --- a/crates/zeph-durable/src/ids.rs +++ b/crates/zeph-durable/src/ids.rs @@ -87,6 +87,30 @@ impl ExecutionId { pub(crate) fn from_uuid(uuid: Uuid) -> Self { Self(uuid) } + + /// Parse a canonical UUID string into an execution identity. + /// + /// Used by operability surfaces (the `zeph durable` CLI, the TUI) that accept a user-supplied + /// execution id. A fresh execution always uses [`ExecutionId::new`]; this is for addressing an + /// existing one. + /// + /// # Errors + /// + /// Returns the underlying [`uuid::Error`] when `s` is not a valid UUID. + /// + /// # Examples + /// + /// ``` + /// use zeph_durable::ExecutionId; + /// + /// let id = ExecutionId::new(); + /// let parsed = ExecutionId::parse_str(&id.as_uuid().to_string()).unwrap(); + /// assert_eq!(parsed, id); + /// assert!(ExecutionId::parse_str("not-a-uuid").is_err()); + /// ``` + pub fn parse_str(s: &str) -> Result { + Ok(Self::from_uuid(Uuid::parse_str(s)?)) + } } impl Default for ExecutionId { diff --git a/crates/zeph-durable/src/journal.rs b/crates/zeph-durable/src/journal.rs index 430265e12..a66f081d2 100644 --- a/crates/zeph-durable/src/journal.rs +++ b/crates/zeph-durable/src/journal.rs @@ -61,6 +61,22 @@ impl ExecutionStatus { } } + /// Reconstruct a status from its canonical `status`-column string. + /// + /// Returns `None` for an unrecognized tag. The `durable_executions.status` column carries a + /// `CHECK` constraint over exactly these four values, so a `None` indicates schema corruption + /// or drift rather than a routine miss; callers should fail closed. 
+ #[must_use] + pub fn from_tag(tag: &str) -> Option { + match tag { + "running" => Some(Self::Running), + "completed" => Some(Self::Completed), + "failed" => Some(Self::Failed), + "aborted" => Some(Self::Aborted), + _ => None, + } + } + /// Whether the execution is still in flight (not yet in a terminal state). #[must_use] pub fn is_running(self) -> bool { diff --git a/crates/zeph-durable/src/lib.rs b/crates/zeph-durable/src/lib.rs index 08c10d44d..8e7f64857 100644 --- a/crates/zeph-durable/src/lib.rs +++ b/crates/zeph-durable/src/lib.rs @@ -28,8 +28,8 @@ //! - [`cipher`] — the [`PayloadCipher`] AEAD contract, [`PayloadAad`] binding, and the read-side //! `max_payload` guard. The concrete cipher lives in a consuming crate (INV-1). //! - [`effect`] — the [`EffectClass`] side-effect contract referenced by journal entries. -//! - [`config`] — the pure-data [`DurableConfig`] and [`RetentionPolicy`] mirroring the -//! `[durable]` TOML section. +//! - [`config`] — re-exports the pure-data [`DurableConfig`] and [`RetentionPolicy`] (which live in +//! `zeph-config`) and owns the [`encryption_gate`](crate::encryption_gate) AEAD enforcement policy. //! - [`error`] — the crate-wide [`DurableError`]. //! //! 
Persistence engine: @@ -91,11 +91,14 @@ pub mod writer; #[doc(hidden)] pub use sealed::Sealed; -pub use backend::{BackendCapabilities, DurableBackendEnum, ExecutionBackend, LocalBackend}; +pub use backend::{ + BackendCapabilities, DurableBackendEnum, ExecutionBackend, ExecutionSummary, LocalBackend, + RedactedEntry, +}; pub use cipher::{ CipherError, EntryKindTag, PayloadAad, PayloadCipher, ensure_payload_within_limit, }; -pub use config::{DurableBackend, DurableConfig, EncryptionGate, RetentionPolicy}; +pub use config::{DurableBackend, DurableConfig, EncryptionGate, RetentionPolicy, encryption_gate}; pub use effect::{EffectClass, EffectIntentSubClass, OnAmbiguous}; pub use error::DurableError; pub use handle::{DurableContext, ParallelScope}; diff --git a/crates/zeph-durable/src/retention.rs b/crates/zeph-durable/src/retention.rs index 5d69b95bc..4c401d2c6 100644 --- a/crates/zeph-durable/src/retention.rs +++ b/crates/zeph-durable/src/retention.rs @@ -288,7 +288,7 @@ pub(crate) struct PruneCutoffs { } impl PruneCutoffs { - fn from_policy(policy: &RetentionPolicy, now_ms: i64) -> Self { + pub(crate) fn from_policy(policy: &RetentionPolicy, now_ms: i64) -> Self { let completed = i64::try_from(policy.ttl_completed_secs.saturating_mul(1000)).unwrap_or(i64::MAX); let failed = i64::try_from(policy.ttl_failed_secs.saturating_mul(1000)).unwrap_or(i64::MAX); diff --git a/crates/zeph-tui/src/app/draw.rs b/crates/zeph-tui/src/app/draw.rs index 2d598c7f1..1a9945264 100644 --- a/crates/zeph-tui/src/app/draw.rs +++ b/crates/zeph-tui/src/app/draw.rs @@ -163,6 +163,16 @@ impl App { ); } + // Overlay durable panel over the subagents slot when `D` key is active (spec-064, #4949). + if self.active_panel == Panel::Durable { + widgets::durable::render( + &self.durable_snapshot, + frame, + layout.subagents, + &mut self.durable_list_state, + ); + } + // Overlay task registry over the subagents slot when `/tasks` is toggled. 
if self.show_task_panel { if self.task_supervisor.is_some() { diff --git a/crates/zeph-tui/src/app/events.rs b/crates/zeph-tui/src/app/events.rs index 6238353f1..1b7c33da5 100644 --- a/crates/zeph-tui/src/app/events.rs +++ b/crates/zeph-tui/src/app/events.rs @@ -244,6 +244,9 @@ impl App { AgentEvent::FleetSnapshot(snapshot) => { self.fleet_snapshot = snapshot; } + AgentEvent::DurableSnapshot(snapshot) => { + self.durable_snapshot = snapshot; + } } } diff --git a/crates/zeph-tui/src/app/keys.rs b/crates/zeph-tui/src/app/keys.rs index 404384a81..7118fa1a2 100644 --- a/crates/zeph-tui/src/app/keys.rs +++ b/crates/zeph-tui/src/app/keys.rs @@ -237,6 +237,9 @@ impl App { TuiCommand::FleetPanel => { self.active_panel = Panel::Fleet; } + TuiCommand::DurablePanel => { + self.active_panel = Panel::Durable; + } TuiCommand::CocoonStatus => { self.push_system_message("Querying Cocoon sidecar...".to_owned()); let _ = self.user_input_tx.try_send("/cocoon status".to_owned()); @@ -808,7 +811,8 @@ impl App { Panel::Memory => Panel::Resources, Panel::Resources => Panel::SubAgents, Panel::SubAgents | Panel::Tasks => Panel::Fleet, - Panel::Fleet => Panel::Chat, + Panel::Fleet => Panel::Durable, + Panel::Durable => Panel::Chat, }; } KeyCode::Char('l') if key.modifiers.contains(KeyModifiers::CONTROL) => { @@ -828,6 +832,9 @@ impl App { KeyCode::Char('f') => { self.active_panel = Panel::Fleet; } + KeyCode::Char('D') => { + self.active_panel = Panel::Durable; + } KeyCode::Char('a') => { self.active_panel = Panel::SubAgents; // Auto-select first agent if nothing selected yet. diff --git a/crates/zeph-tui/src/app/mod.rs b/crates/zeph-tui/src/app/mod.rs index e1f26d1f8..3bd91ec6d 100644 --- a/crates/zeph-tui/src/app/mod.rs +++ b/crates/zeph-tui/src/app/mod.rs @@ -59,6 +59,8 @@ pub enum Panel { Tasks, /// The fleet session overview panel (side column). Fleet, + /// The durable execution journal panel (side column). + Durable, } /// Discriminates what the main chat area is currently displaying. 
@@ -417,6 +419,10 @@ pub struct App { pub(crate) fleet_snapshot: crate::widgets::fleet::FleetSnapshot, /// List scroll state for the fleet panel. pub(crate) fleet_list_state: ratatui::widgets::ListState, + /// Cached durable execution data for the durable panel (spec-064, #4949). + pub(crate) durable_snapshot: crate::widgets::durable::DurableSnapshot, + /// List scroll state for the durable panel. + pub(crate) durable_list_state: ratatui::widgets::ListState, } mod draw; diff --git a/crates/zeph-tui/src/app/state.rs b/crates/zeph-tui/src/app/state.rs index c1cc86f13..5f6ae3135 100644 --- a/crates/zeph-tui/src/app/state.rs +++ b/crates/zeph-tui/src/app/state.rs @@ -87,6 +87,8 @@ impl App { clipboard: crate::clipboard::ClipboardHandle::new(), fleet_snapshot: crate::widgets::fleet::FleetSnapshot::default(), fleet_list_state: ratatui::widgets::ListState::default(), + durable_snapshot: crate::widgets::durable::DurableSnapshot::default(), + durable_list_state: ratatui::widgets::ListState::default(), } } diff --git a/crates/zeph-tui/src/app/tests.rs b/crates/zeph-tui/src/app/tests.rs index 5c2f38d63..c0c73f511 100644 --- a/crates/zeph-tui/src/app/tests.rs +++ b/crates/zeph-tui/src/app/tests.rs @@ -182,6 +182,9 @@ fn tab_cycles_panels() { app.handle_event(AppEvent::Key(tab)); assert_eq!(app.active_panel, Panel::Fleet); + app.handle_event(AppEvent::Key(tab)); + assert_eq!(app.active_panel, Panel::Durable); + app.handle_event(AppEvent::Key(tab)); assert_eq!(app.active_panel, Panel::Chat); } diff --git a/crates/zeph-tui/src/command.rs b/crates/zeph-tui/src/command.rs index 09cd8f733..44a5a9cbc 100644 --- a/crates/zeph-tui/src/command.rs +++ b/crates/zeph-tui/src/command.rs @@ -122,6 +122,8 @@ pub enum TuiCommand { CopyLastAssistant, // Fleet session overview (#3884) FleetPanel, + // Durable execution journal (spec-064, #4949) + DurablePanel, // Worktree subsystem (#4679) WorktreeList, WorktreeClean, @@ -242,6 +244,13 @@ fn build_view_commands() -> Vec { shortcut: Some("f"), 
command: TuiCommand::FleetPanel, }, + CommandEntry { + id: "durable", + label: "Durable: show durable executions", + category: "view", + shortcut: Some("D"), + command: TuiCommand::DurablePanel, + }, ] } @@ -879,7 +888,7 @@ mod tests { #[test] fn registry_has_correct_count() { - assert_eq!(command_registry().len(), 21); + assert_eq!(command_registry().len(), 22); } #[test] diff --git a/crates/zeph-tui/src/event.rs b/crates/zeph-tui/src/event.rs index 22fe8627a..3a513e900 100644 --- a/crates/zeph-tui/src/event.rs +++ b/crates/zeph-tui/src/event.rs @@ -240,6 +240,8 @@ pub enum AgentEvent { ContextEstimate(usize), /// Updated fleet snapshot from the background DB poll task (#3884). FleetSnapshot(crate::widgets::fleet::FleetSnapshot), + /// Updated durable execution snapshot from the background poll task (spec-064, #4949). + DurableSnapshot(crate::widgets::durable::DurableSnapshot), } /// Blocking event pump that forwards terminal events to the async [`AppEvent`] channel. diff --git a/crates/zeph-tui/src/widgets/durable.rs b/crates/zeph-tui/src/widgets/durable.rs new file mode 100644 index 000000000..3b4a7042f --- /dev/null +++ b/crates/zeph-tui/src/widgets/durable.rs @@ -0,0 +1,156 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! TUI durable execution panel (spec-064, #4949): a live table of durable executions. +//! +//! Renders only redaction-safe metadata (INV-5) — execution id, kind, status, step count, age — +//! never payload bytes. Data arrives as a [`DurableSnapshot`] via +//! [`AgentEvent::DurableSnapshot`](crate::AgentEvent::DurableSnapshot), refreshed by the binary's +//! durable poll task. When the journal is unreachable (durable execution disabled or the file is +//! missing) the panel shows the [`STATUS_UNAVAILABLE`] message. 
+ +use ratatui::Frame; +use ratatui::layout::Rect; +use ratatui::style::{Color, Modifier, Style}; +use ratatui::text::{Line, Span}; +use ratatui::widgets::{Block, Borders, List, ListItem, ListState}; + +use crate::theme::Theme; + +/// Status message: a crashed execution is replaying its journal (spec-011). +pub const STATUS_REPLAYING: &str = "Replaying execution…"; +/// Status message: a background retention sweep is running (spec-011). +pub const STATUS_PRUNING: &str = "Pruning journal…"; +/// Status message: a durable promise is parked awaiting an external completion (spec-011). +pub const STATUS_AWAITING: &str = "Awaiting external completion…"; +/// Status message: the journal is unreachable, so the agent runs without durability (spec-011). +pub const STATUS_UNAVAILABLE: &str = "Journal unavailable — non-durable mode"; + +/// One durable execution, display-ready. Carries no payload bytes (INV-5 redaction). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DurableRow { + /// First 8 characters of the execution UUID. + pub id_short: String, + /// The execution kind tag (`agent_turn`, `dag_run`, …). + pub kind: String, + /// The execution status (`running`, `completed`, `failed`, `aborted`). + pub status: String, + /// Number of journal entries recorded for the execution. + pub step_count: u64, + /// Seconds since the execution was created. + pub age_secs: u64, +} + +/// Snapshot of the durable journal for the panel, refreshed by the binary's poll task. +#[derive(Debug, Clone, Default)] +pub struct DurableSnapshot { + /// Whether the journal could be reached. `false` renders [`STATUS_UNAVAILABLE`]. + pub available: bool, + /// Executions, newest first. + pub executions: Vec, +} + +fn status_color(status: &str) -> Color { + match status { + "running" => Color::Green, + "completed" => Color::DarkGray, + "failed" => Color::Red, + "aborted" => Color::Yellow, + _ => Color::White, + } +} + +/// Render a coarse age such as `12s`, `5m`, `3h`, `2d`. 
+fn fmt_age(secs: u64) -> String { + match secs { + s if s < 60 => format!("{s}s"), + s if s < 3_600 => format!("{}m", s / 60), + s if s < 86_400 => format!("{}h", s / 3_600), + s => format!("{}d", s / 86_400), + } +} + +/// Render the durable executions panel. `list_state` tracks scroll position. +pub fn render( + snapshot: &DurableSnapshot, + frame: &mut Frame, + area: Rect, + list_state: &mut ListState, +) { + let theme = Theme::default(); + let block = Block::default() + .borders(Borders::ALL) + .border_style(theme.panel_border) + .title(Span::styled( + " Durable [D] ", + Style::default().add_modifier(Modifier::BOLD), + )); + + let placeholder = if !snapshot.available { + Some((STATUS_UNAVAILABLE, Color::Yellow)) + } else if snapshot.executions.is_empty() { + Some(("No durable executions recorded.", Color::DarkGray)) + } else { + None + }; + + if let Some((text, color)) = placeholder { + let inner = block.inner(area); + frame.render_widget(block, area); + let msg = ratatui::widgets::Paragraph::new(text).style(Style::default().fg(color)); + frame.render_widget(msg, inner); + return; + } + + let header = ListItem::new(Line::from(vec![Span::styled( + format!( + " {:<10} {:<16}{:<10}{:>6} {:>6}", + "ID", "KIND", "STATUS", "STEPS", "AGE" + ), + Style::default().add_modifier(Modifier::BOLD), + )])); + + let mut items: Vec = vec![header]; + for (i, row) in snapshot.executions.iter().enumerate() { + let selected = list_state.selected() == Some(i + 1); + let base = if selected { + Style::default().add_modifier(Modifier::REVERSED) + } else { + Style::default() + }; + let color = status_color(&row.status); + let line = Line::from(vec![ + Span::styled(format!(" {:<10} ", row.id_short), base), + Span::styled(format!("{:<16}", row.kind), base), + Span::styled(format!("{:<10}", row.status), base.fg(color)), + Span::styled(format!("{:>6}", row.step_count), base), + Span::styled(format!(" {:>6}", fmt_age(row.age_secs)), base), + ]); + items.push(ListItem::new(line)); + } + + 
let list = List::new(items) + .block(block) + .highlight_style(Style::default().add_modifier(Modifier::REVERSED)); + frame.render_stateful_widget(list, area, list_state); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn fmt_age_scales_units() { + assert_eq!(fmt_age(12), "12s"); + assert_eq!(fmt_age(300), "5m"); + assert_eq!(fmt_age(7_200), "2h"); + assert_eq!(fmt_age(172_800), "2d"); + } + + #[test] + fn status_color_maps_known_states() { + assert_eq!(status_color("running"), Color::Green); + assert_eq!(status_color("failed"), Color::Red); + assert_eq!(status_color("mystery"), Color::White); + } +} diff --git a/crates/zeph-tui/src/widgets/mod.rs b/crates/zeph-tui/src/widgets/mod.rs index 7d045553a..357f4380e 100644 --- a/crates/zeph-tui/src/widgets/mod.rs +++ b/crates/zeph-tui/src/widgets/mod.rs @@ -7,6 +7,7 @@ pub mod compaction_badge; pub mod confirm; pub mod context_gauge; pub mod diff; +pub mod durable; pub mod elicitation; pub mod file_picker; pub mod fleet; diff --git a/specs/064-durable-execution/spec.md b/specs/064-durable-execution/spec.md index 23287d291..f4600d694 100644 --- a/specs/064-durable-execution/spec.md +++ b/specs/064-durable-execution/spec.md @@ -246,7 +246,7 @@ helper; deferred post-v1. Both options comply with 031. ``` zeph-durable (Layer 0 — infrastructure, analogous to zeph-db) -Cargo.toml # dep: zeph-db, zeph-common, serde, thiserror, tokio, tracing, blake3, chacha20poly1305 +Cargo.toml # dep: zeph-config, zeph-db, serde, thiserror, tokio, tracing, blake3 (chacha20poly1305 lives in zeph-core) src/ lib.rs # pub re-exports; crate-level //! 
docs; sealed module sealed.rs # Sealed marker (private supertrait for ExecutionBackend) @@ -264,8 +264,9 @@ src/ replay.rs # ReplayCursor, ReplayDivergence check, range-read cursor writer.rs # JournalWriter actor, JournalMsg enum, group-commit, ACK protocol cipher.rs # PayloadCipher trait, PayloadAad, CipherError - retention.rs # RetentionPolicy, compaction/prune, in-execution step cap - config.rs # DurableConfig (pure-data, mirrors [durable] TOML section) + retention.rs # compaction/prune, in-execution step cap + config.rs # re-exports DurableConfig/RetentionPolicy/DurableBackend from zeph-config; + # owns the EncryptionGate + encryption_gate AEAD policy (free fn) error.rs # DurableError (thiserror) # NO migrations/ directory — durable schema files live in zeph-db/migrations/{sqlite,postgres}/ # and are applied via zeph_db::run_migrations against the dedicated durable.db pool. @@ -288,6 +289,18 @@ and NO `sqlx::migrate!`. The `JobStore` precedent cited in rev-C.3 covers the *p `JobStore::init` calls `zeph_db::run_migrations` — it does not own schema files (`zeph-scheduler/src/store.rs:137`; its schema is `051_scheduler_jobs.sql` inside `zeph-db`). +**Config placement (C6 #4949 reconciliation).** The pure-data `DurableConfig`, `RetentionPolicy`, +and `DurableBackend` live in `zeph-config` — the single source of truth for every subsystem's +config, exactly like `OrchestrationConfig`. This keeps the aggregate `Config` free of the +`zeph-db`/`sqlx` dependency tree (12 leaf crates depend on `zeph-config` without `zeph-db`; pulling +it in would be a workspace-wide compile regression). `zeph-durable` depends on `zeph-config` and +re-exports those types, so `zeph_durable::DurableConfig` still resolves and the engine APIs +(`DurableContext`, `JournalWriter`) consume the same type the root `Config` holds — no duplication, +no conversion. 
The AEAD enforcement policy (`EncryptionGate` + `encryption_gate`, a free function +returning `DurableError`) stays in `zeph-durable` next to the cipher contract and error type +(data/policy separation). `zeph-config` is pure-data and below `zeph-durable` in the layer DAG, so +this introduces no cycle and does not violate INV-1 (`zeph-config` is not a business-layer crate). + --- ## Core Types diff --git a/src/cli.rs b/src/cli.rs index 42055740a..1e80d4964 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -400,6 +400,11 @@ pub(crate) enum Command { #[command(subcommand)] command: DbCommand, }, + /// Inspect the durable execution journal (connects directly to `durable.db`) + Durable { + #[command(subcommand)] + command: DurableCommand, + }, /// ACP sub-agent client commands #[cfg(feature = "acp")] Acp { @@ -522,6 +527,56 @@ pub(crate) enum DbCommand { Migrate, } +/// Durable execution journal subcommands. +/// +/// All subcommands connect directly to `durable.db`; no running agent process is required. Default +/// output is redacted (INV-5) — payload bytes and resolver tokens are shown only with `--reveal`. +#[derive(Subcommand)] +pub(crate) enum DurableCommand { + /// List durable executions, newest first + List { + /// Filter by status: running | completed | failed | aborted + #[arg(long)] + status: Option, + /// Filter by execution kind (e.g. 
`agent_turn`, `dag_run`, `scheduled_job`, `subagent_session`) + #[arg(long)] + kind: Option, + /// Maximum number of executions to display + #[arg(long, default_value = "50")] + limit: i64, + }, + /// Show the journal entries of one execution (payload redacted by default) + Show { + /// Execution id (UUID) + id: String, + /// Decrypt and print payload bytes (prints a warning first; requires `ZEPH_DURABLE_KEY`) + #[arg(long)] + reveal: bool, + }, + /// Inspect a single journal step entry + Inspect { + /// Execution id (UUID) + id: String, + /// Step index to inspect + #[arg(long)] + step: u32, + /// Decrypt and print payload bytes (prints a warning first; requires `ZEPH_DURABLE_KEY`) + #[arg(long)] + reveal: bool, + }, + /// Force a retention sweep over terminal executions past their TTL + Prune { + /// Report how many executions would be pruned without deleting anything + #[arg(long)] + dry_run: bool, + }, + /// Trigger a manual replay of a supported execution + Resume { + /// Execution id (UUID) + id: String, + }, +} + /// Typed session status filter for the `agents fleet` sub-command. #[derive(Clone, Copy, Debug, ValueEnum)] pub(crate) enum FleetStatus { diff --git a/src/commands/durable.rs b/src/commands/durable.rs new file mode 100644 index 000000000..ea987acc3 --- /dev/null +++ b/src/commands/durable.rs @@ -0,0 +1,316 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Handler for the `zeph durable` CLI command group. +//! +//! Connects directly to the dedicated `durable.db` journal — no running agent process is required +//! (FR-DE-08). Default output is redacted: payload bytes and resolver tokens are shown only with +//! `--reveal`, which decrypts through the vault-resolved `ZEPH_DURABLE_KEY` (INV-5, FR-DE-07). 
+ +use std::path::Path; +use std::sync::Arc; + +use zeph_core::config::Config; +use zeph_core::durable::XChaCha20Poly1305Cipher; +use zeph_core::vault::AgeVaultProvider; +use zeph_durable::{ExecutionId, Journal, LocalBackend}; + +use crate::cli::DurableCommand; + +/// Printed before any decrypted payload is shown, so the operator knows plaintext is on screen. +const REVEAL_WARNING: &str = + "WARNING: --reveal decrypts and prints payload bytes in cleartext. Do not share this output."; + +/// Resolve the dedicated `durable.db` path: a sibling of the main database file. +/// +/// The durable journal lives in its own file on its own pool (INV-14), next to `memory.sqlite_path`. +/// Shared with the TUI durable poll task so both target the same file. +pub(crate) fn resolve_durable_db_url(config: &Config) -> String { + let main = config.memory.sqlite_path.as_str(); + match Path::new(main).parent() { + Some(dir) if !dir.as_os_str().is_empty() => { + dir.join("durable.db").to_string_lossy().into_owned() + } + _ => "durable.db".to_owned(), + } +} + +/// Format a Unix-epoch-millisecond timestamp as a readable UTC string, falling back to the raw value. +fn fmt_ts(ms: i64) -> String { + chrono::DateTime::from_timestamp_millis(ms).map_or_else( + || ms.to_string(), + |dt| dt.format("%Y-%m-%d %H:%M:%S").to_string(), + ) +} + +/// Load the AEAD cipher from the vault-stored `ZEPH_DURABLE_KEY` for `--reveal`. 
+fn load_durable_cipher() -> anyhow::Result { + let dir = zeph_core::vault::default_vault_dir(); + let provider = AgeVaultProvider::load(&dir.join("vault-key.txt"), &dir.join("secrets.age")) + .map_err(|e| anyhow::anyhow!("failed to load vault: {e}"))?; + let key = provider.get("ZEPH_DURABLE_KEY").ok_or_else(|| { + anyhow::anyhow!("ZEPH_DURABLE_KEY not found in vault; cannot --reveal payloads") + })?; + XChaCha20Poly1305Cipher::from_vault_b64(key) + .map_err(|e| anyhow::anyhow!("invalid ZEPH_DURABLE_KEY: {e}")) +} + +/// Open the local durable backend, attaching the AEAD cipher when `reveal` is set. +/// +/// Returns `Ok(None)` when no journal file exists yet (a friendly signal that durable execution has +/// not run on this deployment), so the caller can print guidance instead of creating an empty file. +async fn open_backend(config: &Config, reveal: bool) -> anyhow::Result> { + let url = resolve_durable_db_url(config); + if url != ":memory:" && !Path::new(&url).exists() { + println!( + "No durable journal at {url}.\n\ + Durable execution may be disabled; enable it with `[durable] enabled = true`." + ); + return Ok(None); + } + let backend = LocalBackend::open(&url, config.durable.max_payload_bytes) + .await + .map_err(|e| anyhow::anyhow!("failed to open durable journal: {e}"))?; + backend + .init() + .await + .map_err(|e| anyhow::anyhow!("failed to initialize durable schema: {e}"))?; + if reveal { + let cipher = load_durable_cipher()?; + Ok(Some(backend.with_cipher(Arc::new(cipher)))) + } else { + Ok(Some(backend)) + } +} + +/// Dispatch a `zeph durable` subcommand. +/// +/// # Errors +/// +/// Returns an error if the config cannot be loaded, the journal cannot be opened, or a query fails. 
+pub(crate) async fn handle_durable_command( + cmd: DurableCommand, + config_path: Option<&Path>, +) -> anyhow::Result<()> { + let config_file = crate::bootstrap::resolve_config_path(config_path); + let config = Config::load(&config_file).unwrap_or_default(); + + match cmd { + DurableCommand::List { + status, + kind, + limit, + } => { + let Some(backend) = open_backend(&config, false).await? else { + return Ok(()); + }; + let rows = backend + .list_executions(status.as_deref(), kind.as_deref(), limit) + .await + .map_err(|e| anyhow::anyhow!("failed to list executions: {e}"))?; + if rows.is_empty() { + println!("No durable executions match."); + return Ok(()); + } + println!( + "{:<36} {:<18} {:<10} {:>6} CREATED", + "EXECUTION ID", "KIND", "STATUS", "STEPS" + ); + println!("{}", "-".repeat(96)); + for row in &rows { + println!( + "{:<36} {:<18} {:<10} {:>6} {}", + row.execution_id.as_uuid(), + row.kind, + row.status.as_str(), + row.step_count, + fmt_ts(row.created_at_ms), + ); + } + } + + DurableCommand::Show { id, reveal } => { + let exec = ExecutionId::parse_str(&id) + .map_err(|e| anyhow::anyhow!("invalid execution id '{id}': {e}"))?; + let Some(backend) = open_backend(&config, reveal).await? else { + return Ok(()); + }; + show_entries(&backend, exec, reveal, None).await?; + } + + DurableCommand::Inspect { id, step, reveal } => { + let exec = ExecutionId::parse_str(&id) + .map_err(|e| anyhow::anyhow!("invalid execution id '{id}': {e}"))?; + let Some(backend) = open_backend(&config, reveal).await? else { + return Ok(()); + }; + show_entries(&backend, exec, reveal, Some(step)).await?; + } + + DurableCommand::Prune { dry_run } => { + let Some(backend) = open_backend(&config, false).await? 
else { + return Ok(()); + }; + let policy = &config.durable.retention; + if dry_run { + let n = backend + .count_prunable(policy) + .await + .map_err(|e| anyhow::anyhow!("failed to count prunable executions: {e}"))?; + println!("Dry run: {n} terminal execution(s) past TTL would be pruned."); + } else { + let n = backend + .prune(policy) + .await + .map_err(|e| anyhow::anyhow!("failed to prune journal: {e}"))?; + println!("Pruned {n} execution(s)."); + } + } + + DurableCommand::Resume { id } => { + let exec = ExecutionId::parse_str(&id) + .map_err(|e| anyhow::anyhow!("invalid execution id '{id}': {e}"))?; + let Some(backend) = open_backend(&config, false).await? else { + return Ok(()); + }; + let entries = backend + .read_execution_redacted(exec) + .await + .map_err(|e| anyhow::anyhow!("failed to read execution: {e}"))?; + if entries.is_empty() { + println!("No journal entries found for execution {id}."); + return Ok(()); + } + // Resume semantics are owned by the per-kind adapters (epic #4707, A1-A4); none are wired + // in this build. Report state honestly rather than pretending to replay. + println!( + "Execution {id} has {} journaled step(s).\n\ + Automatic resume is performed by the agent process for supported execution kinds; \ + standalone CLI replay is not available in this build (durable adapters A1-A4).", + entries.len() + ); + } + } + + Ok(()) +} + +/// Show one execution's entries, optionally filtered to a single `step`, redacted unless `reveal`. +/// +/// `reveal` reads through the AEAD cipher (decrypted payloads) and prints a warning first; otherwise +/// only redaction-safe metadata is shown (INV-5). +/// +/// # Errors +/// +/// Returns an error if the journal read fails. 
+async fn show_entries( + backend: &LocalBackend, + exec: ExecutionId, + reveal: bool, + step: Option, +) -> anyhow::Result<()> { + if reveal { + println!("{REVEAL_WARNING}\n"); + let mut entries = backend + .read_execution(exec) + .await + .map_err(|e| anyhow::anyhow!("failed to read execution: {e}"))?; + if let Some(s) = step { + entries.retain(|e| e.step_id.value() == s); + } + if entries.is_empty() { + println!("No matching journal entry."); + } else { + print_revealed(&entries); + } + } else { + let mut entries = backend + .read_execution_redacted(exec) + .await + .map_err(|e| anyhow::anyhow!("failed to read execution: {e}"))?; + if let Some(s) = step { + entries.retain(|e| e.step_id.value() == s); + } + if entries.is_empty() { + println!("No matching journal entry."); + } else { + print_redacted(&entries); + } + } + Ok(()) +} + +/// Print redaction-safe entry metadata (default output, INV-5). +fn print_redacted(entries: &[zeph_durable::RedactedEntry]) { + if entries.is_empty() { + println!("No journal entries."); + return; + } + println!( + "{:>6} {:>6} {:<14} {:<20} {:>10} {:<18} CREATED", + "SEQ", "STEP", "ENTRY KIND", "EFFECT CLASS", "BYTES", "IDEM KEY" + ); + println!("{}", "-".repeat(96)); + for e in entries { + println!( + "{:>6} {:>6} {:<14} {:<20} {:>10} {:<18} {}", + e.seq, + e.step_id.value(), + e.entry_kind, + e.effect_class.as_deref().unwrap_or("-"), + e.payload_len, + e.idem_key_prefix.as_deref().unwrap_or("-"), + fmt_ts(e.created_at_ms), + ); + } +} + +/// Print decrypted entry payloads (`--reveal` only). +fn print_revealed(entries: &[zeph_durable::JournalEntry]) { + use zeph_durable::EntryKind; + if entries.is_empty() { + println!("No journal entries."); + return; + } + for e in entries { + let step = e.step_id.value(); + match &e.entry { + EntryKind::StepResult { + payload, effect, .. 
+ } => { + println!( + "step {step} [step_result, {effect:?}] {} bytes:", + payload.len() + ); + println!(" {}", String::from_utf8_lossy(payload)); + } + other => { + println!("step {step} [{}] (no payload)", other.tag()); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn durable_db_is_sibling_of_sqlite_path() { + let mut config = Config::default(); + config.memory.sqlite_path = "/data/zeph/zeph.db".to_owned(); + assert_eq!(resolve_durable_db_url(&config), "/data/zeph/durable.db"); + } + + #[test] + fn durable_db_bare_filename_when_path_has_no_parent() { + let mut config = Config::default(); + config.memory.sqlite_path = "zeph.db".to_owned(); + assert_eq!(resolve_durable_db_url(&config), "durable.db"); + } + + #[test] + fn fmt_ts_formats_epoch_millis_as_utc() { + assert_eq!(fmt_ts(0), "1970-01-01 00:00:00"); + } +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs index f77833242..bb4d11de2 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -11,6 +11,7 @@ pub(crate) mod classifiers; pub(crate) mod cocoon; pub(crate) mod db; pub(crate) mod doctor; +pub(crate) mod durable; #[cfg(feature = "gonka")] pub(crate) mod gonka; pub(crate) mod ingest; diff --git a/src/init/durable.rs b/src/init/durable.rs new file mode 100644 index 000000000..bcacb1752 --- /dev/null +++ b/src/init/durable.rs @@ -0,0 +1,113 @@ +// SPDX-FileCopyrightText: 2026 Andrei G +// SPDX-License-Identifier: MIT OR Apache-2.0 + +//! Wizard step for the durable execution layer (spec-064, #4949). +//! +//! Prompts whether to enable durable execution, the backend, and optional retention overrides, and +//! generates a fresh AEAD `ZEPH_DURABLE_KEY`. The key is stored in the age vault during the review +//! step ([`store_durable_key`]), never written inline in the config TOML (vault contract spec-038). 
+
+use dialoguer::{Confirm, Input, Select};
+use zeph_core::config::DurableBackend;
+
+use super::WizardState;
+
+/// Run the durable-execution wizard step.
+///
+/// Asks whether to enable durable execution. When declined, only `state.durable.enabled` is
+/// updated. When accepted, the chosen backend is recorded, the three retention values are
+/// optionally prompted for (each defaulting to the value already in `state`), and the key returned
+/// by `generate_durable_key_b64` is kept in `state.durable_key_b64`.
+///
+/// # Errors
+///
+/// Returns an error if a prompt cannot be read.
+pub(super) fn step_durable(state: &mut WizardState) -> anyhow::Result<()> {
+    println!("== Durable Execution ==\n");
+
+    let enabled = Confirm::new()
+        .with_prompt("Enable durable execution (crash-resumable agent turns)?")
+        .default(false)
+        .interact()?;
+    state.durable.enabled = enabled;
+    if !enabled {
+        println!();
+        return Ok(());
+    }
+
+    // Index 0 is the local backend, index 1 is Restate.
+    let choices = ["local (dedicated durable.db)", "restate (external server)"];
+    let picked = Select::new()
+        .with_prompt("Durable backend")
+        .items(choices)
+        .default(0)
+        .interact()?;
+    state.durable.backend = match picked {
+        1 => DurableBackend::Restate,
+        _ => DurableBackend::Local,
+    };
+
+    let wants_custom_retention = Confirm::new()
+        .with_prompt("Customize retention (TTL and size caps)?")
+        .default(false)
+        .interact()?;
+    if wants_custom_retention {
+        let retention = &mut state.durable.retention;
+        retention.ttl_completed_secs = Input::new()
+            .with_prompt("Completed-execution TTL (seconds)")
+            .default(retention.ttl_completed_secs)
+            .interact_text()?;
+        retention.ttl_failed_secs = Input::new()
+            .with_prompt("Failed/aborted-execution TTL (seconds)")
+            .default(retention.ttl_failed_secs)
+            .interact_text()?;
+        retention.max_executions = Input::new()
+            .with_prompt("Maximum stored executions")
+            .default(retention.max_executions)
+            .interact_text()?;
+    }
+
+    // Only held in wizard state here; the review step writes it to the vault, and it is never
+    // placed inline in the TOML.
+    let key_b64 = zeph_core::durable::generate_durable_key_b64();
+    state.durable_key_b64 = Some(key_b64);
+    println!("Generated a new ZEPH_DURABLE_KEY (stored in the age vault during review).");
+    println!();
+    Ok(())
+}
+
+/// Store the generated `ZEPH_DURABLE_KEY` in the age vault (INV-5 / vault contract spec-038).
+///
+/// Does nothing unless durable execution is enabled and the wizard generated a key. When the
+/// selected vault backend is anything other than `age`, prints guidance and writes nothing. For
+/// the age backend, the vault is initialized first if either `vault-key.txt` or `secrets.age` is
+/// missing from the default vault directory; the key is then set and the vault saved.
+///
+/// # Errors
+///
+/// Returns an error if the vault cannot be initialized, loaded, or saved.
+pub(super) fn store_durable_key(state: &WizardState) -> anyhow::Result<()> {
+    use zeph_core::vault::AgeVaultProvider;
+
+    // Nothing to store unless a key was generated and durable execution is on.
+    let key_b64 = match state.durable_key_b64.as_deref() {
+        Some(k) if state.durable.enabled => k,
+        _ => return Ok(()),
+    };
+
+    if state.vault_backend != "age" {
+        println!(
+            "Durable execution stores its AEAD key in the age vault. Re-run `zeph --init` with the \
+             age backend, then the key will be stored automatically."
+        );
+        return Ok(());
+    }
+
+    let vault_dir = zeph_core::vault::default_vault_dir();
+    let key_file = vault_dir.join("vault-key.txt");
+    let secrets_file = vault_dir.join("secrets.age");
+
+    // Initialize when either the key file or the secrets file is missing.
+    let vault_ready = key_file.exists() && secrets_file.exists();
+    if !vault_ready {
+        AgeVaultProvider::init_vault(&vault_dir)
+            .map_err(|e| anyhow::anyhow!("failed to initialize age vault: {e}"))?;
+    }
+
+    let mut vault = AgeVaultProvider::load(&key_file, &secrets_file)
+        .map_err(|e| anyhow::anyhow!("failed to load age vault: {e}"))?;
+    vault.set_secret_mut("ZEPH_DURABLE_KEY".to_owned(), key_b64.to_owned());
+    vault
+        .save()
+        .map_err(|e| anyhow::anyhow!("failed to save age vault: {e}"))?;
+
+    println!(
+        "Stored ZEPH_DURABLE_KEY in the age vault ({}).",
+        secrets_file.display()
+    );
+    Ok(())
+}
diff --git a/src/init/mod.rs b/src/init/mod.rs
index 89ebfceba..1a84d3590 100644
--- a/src/init/mod.rs
+++ b/src/init/mod.rs
@@ -17,6 +17,7 @@ use zeph_subagent::def::{MemoryScope, PermissionMode};
 use zeroize::Zeroizing;
 
 pub(super) mod agents;
+pub(super) mod durable;
 pub(super) mod llm;
 pub(super) mod mcp;
 pub(super) mod memory;
@@ -24,6 +25,7 @@ pub(super) mod security;
 pub(super) mod worktree;
 
 use agents::{step_agents, step_learning,
step_orchestration, step_router}; +use durable::step_durable; use llm::step_llm; use mcp::{step_mcp_discovery, step_mcp_remote, step_mcpls, write_mcpls_config}; use memory::{step_context_compression, step_memory}; @@ -263,6 +265,13 @@ pub(crate) struct WizardState { pub(crate) worktree_enabled: bool, pub(crate) worktree_bg_isolation: BgIsolation, pub(crate) worktree_base_ref: WorktreeBaseRef, + // Durable execution layer (spec-064, #4949) + /// The durable section as configured by the wizard (the AEAD key is stored separately in the + /// vault, never inline). + pub(crate) durable: zeph_core::config::DurableConfig, + /// A freshly generated base64 `ZEPH_DURABLE_KEY`, set when durable execution is enabled. Written + /// to the age vault during review, never serialized into the config TOML. + pub(crate) durable_key_b64: Option, } impl Default for WizardState { @@ -442,6 +451,8 @@ impl Default for WizardState { worktree_enabled: false, worktree_bg_isolation: BgIsolation::Worktree, worktree_base_ref: WorktreeBaseRef::Head, + durable: zeph_core::config::DurableConfig::default(), + durable_key_b64: None, } } } @@ -490,6 +501,7 @@ pub fn run(output: Option) -> anyhow::Result<()> { step_update_check(&mut state)?; step_scheduler(&mut state)?; step_orchestration(&mut state)?; + step_durable(&mut state)?; step_daemon(&mut state)?; step_acp(&mut state)?; step_mcpls(&mut state)?; @@ -909,6 +921,9 @@ pub(crate) fn build_config(state: &WizardState) -> Config { config.worktree.base_ref = state.worktree_base_ref.clone(); } + // Durable execution layer (spec-064, #4949). The AEAD key lives only in the vault. 
+ config.durable = state.durable.clone(); + match state.detector_mode.as_deref() { Some("judge") => { config.skills.learning.detector_mode = zeph_core::config::DetectorMode::Judge; @@ -1659,6 +1674,7 @@ fn step_review_and_write(state: &WizardState, output: Option) -> anyhow write_mcpls_config(state, &path)?; } + durable::store_durable_key(state)?; print_secrets_instructions(state); print_next_steps(state, &path); diff --git a/src/runner.rs b/src/runner.rs index c20fa211f..244b624be 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -558,6 +558,15 @@ pub(crate) async fn run(cli: Cli) -> anyhow::Result<()> { } }; } + Some(Command::Durable { + command: durable_cmd, + }) => { + return crate::commands::durable::handle_durable_command( + durable_cmd, + cli.config.as_deref(), + ) + .await; + } #[cfg(feature = "bench")] Some(Command::Bench { command: bench_cmd }) => { return crate::commands::bench::handle_bench_command(&bench_cmd, cli.config.as_deref()) diff --git a/src/tui_bridge.rs b/src/tui_bridge.rs index 02a0d1925..9e289914c 100644 --- a/src/tui_bridge.rs +++ b/src/tui_bridge.rs @@ -315,6 +315,16 @@ pub(crate) async fn run_tui_agent( forwarders.spawn(fleet_poll_task(db_path, fleet_cfg, agent_tx.clone())); } + if params.config.durable.enabled { + let durable_cfg = params.config.durable.clone(); + let durable_url = crate::commands::durable::resolve_durable_db_url(params.config); + forwarders.spawn(durable_poll_task( + durable_url, + durable_cfg, + agent_tx.clone(), + )); + } + if let Some(tool_rx) = params.tool_rx { forwarders.spawn(forward_tool_events_to_tui(tool_rx, agent_tx.clone())); } @@ -710,3 +720,95 @@ pub(crate) async fn fleet_poll_task( } } } + +/// Refresh interval for the TUI durable executions panel, in seconds. +#[cfg(feature = "tui")] +const DURABLE_REFRESH_SECS: u64 = 5; + +/// Periodically poll the durable journal and forward snapshots to the TUI (spec-064, #4949). +/// +/// Read-only: it never mutates the journal. 
Runs until the `tx` channel closes (agent exited). Spawned
+/// only when `[durable] enabled = true`; otherwise the panel keeps its default state and renders the
+/// "non-durable mode" message.
+///
+/// Each tick publishes a transient status message, queries `list_executions`, clears the status,
+/// and forwards the rows as a `DurableSnapshot` with `available: true`. If the journal cannot be
+/// opened, a default snapshot is sent once and the task returns; a failed `init()` or
+/// `list_executions` call is only logged.
+#[cfg(feature = "tui")]
+pub(crate) async fn durable_poll_task(
+    db_url: String,
+    cfg: zeph_config::DurableConfig,
+    tx: tokio::sync::mpsc::Sender<zeph_tui::AgentEvent>,
+) {
+    use zeph_tui::widgets::durable::{DurableRow, DurableSnapshot};
+
+    let backend = match zeph_durable::LocalBackend::open(&db_url, cfg.max_payload_bytes).await {
+        Ok(b) => b,
+        Err(e) => {
+            tracing::warn!(error = %e, "durable poll: failed to open journal; panel shows non-durable mode");
+            // Best effort: the receiver may already be gone, in which case there is nothing to do.
+            let _ = tx
+                .send(zeph_tui::AgentEvent::DurableSnapshot(
+                    DurableSnapshot::default(),
+                ))
+                .await;
+            return;
+        }
+    };
+    // NOTE(review): this task is documented as read-only; confirm `init()` does not write when the
+    // schema already exists.
+    if let Err(e) = backend.init().await {
+        tracing::warn!(error = %e, "durable poll: schema init failed");
+    }
+
+    let mut ticker = tokio::time::interval(Duration::from_secs(DURABLE_REFRESH_SECS));
+    // Skip missed ticks rather than firing a burst of catch-up polls.
+    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
+    loop {
+        ticker.tick().await;
+
+        // A failed send means the receiver was dropped, so stop polling.
+        if tx
+            .send(zeph_tui::AgentEvent::Status(
+                "Refreshing durable journal...".to_owned(),
+            ))
+            .await
+            .is_err()
+        {
+            break;
+        }
+
+        // The final argument (200) is presumably the row limit — verify against `list_executions`.
+        let rows = match backend.list_executions(None, None, 200).await {
+            Ok(r) => r,
+            Err(e) => {
+                tracing::debug!(error = %e, "durable poll: list_executions failed");
+                // Clear the status message even on failure, then retry on the next tick.
+                let _ = tx.send(zeph_tui::AgentEvent::Status(String::new())).await;
+                continue;
+            }
+        };
+
+        let _ = tx.send(zeph_tui::AgentEvent::Status(String::new())).await;
+
+        let now_ms = chrono::Utc::now().timestamp_millis();
+        let executions: Vec<DurableRow> = rows
+            .into_iter()
+            .map(|r| DurableRow {
+                // First eight characters of the UUID's string form.
+                id_short: r
+                    .execution_id
+                    .as_uuid()
+                    .to_string()
+                    .chars()
+                    .take(8)
+                    .collect(),
+                kind: r.kind,
+                status: r.status.as_str().to_owned(),
+                step_count: r.step_count,
+                // Saturate instead of overflowing on a corrupt timestamp; a creation time in the
+                // future clamps to an age of 0.
+                age_secs: (now_ms.saturating_sub(r.created_at_ms).max(0) / 1000).cast_unsigned(),
+            })
+            .collect();
+
+        let snapshot = DurableSnapshot {
+            available: true,
+            executions,
+        };
+        if tx
+            .send(zeph_tui::AgentEvent::DurableSnapshot(snapshot))
+            .await
+            .is_err()
+        {
+            break;
+        }
+    }
+}