diff --git a/README.md b/README.md index d76af2f7..755648ef 100644 --- a/README.md +++ b/README.md @@ -194,7 +194,8 @@ The governing workflow lives at `docs/runbook/local-github-signal-workflow.md`. `decodex serve` owns the local operator listener. It serves the operator dashboard from `GET /` and `GET /dashboard`; published snapshots, active-run updates, and local dashboard controls flow through the `/dashboard/control` WebSocket. The HTTP surface is -kept to dashboard pages/assets and `GET /livez`. +kept to dashboard pages/assets, `GET /livez`, and the local account-control API used by +Decodex App. For dashboard UI development, use the mock operator dashboard server: diff --git a/apps/decodex-app/Sources/DecodexApp/DecodexServerBridge.swift b/apps/decodex-app/Sources/DecodexApp/DecodexServerBridge.swift index d8420290..5ef6c10c 100644 --- a/apps/decodex-app/Sources/DecodexApp/DecodexServerBridge.swift +++ b/apps/decodex-app/Sources/DecodexApp/DecodexServerBridge.swift @@ -11,7 +11,10 @@ actor DecodexServerBridge { private let defaultBaseURL = URL(string: "http://127.0.0.1:8912")! private let defaultListenAddress = "127.0.0.1:8912" + private let liveCheckFreshness: TimeInterval = 5 private var serverBaseURL: URL? + private var liveCheckBaseURL: URL? + private var liveCheckedAt: Date? private var startedProcess: Process? 
func run(_ request: AppBridgeRequest, as type: T.Type) async throws -> T { @@ -20,6 +23,24 @@ actor DecodexServerBridge { } let baseURL = try await ensureServer() + do { + return try await send(route, baseURL: baseURL, as: type) + } catch let error as DecodexAppBridgeError { + throw error + } catch let error as DecodingError { + throw error + } catch { + clearLive(baseURL) + + return try await send(route, baseURL: try await ensureServer(), as: type) + } + } + + private func send( + _ route: ServerRoute, + baseURL: URL, + as type: T.Type + ) async throws -> T { let url = routeURL(baseURL: baseURL, route: route) var urlRequest = URLRequest(url: url) @@ -47,22 +68,30 @@ actor DecodexServerBridge { ) } + noteLive(baseURL) + return try JSONDecoder().decode(type, from: data) } private func ensureServer() async throws -> URL { + if let serverBaseURL, hasFreshLiveCheck(serverBaseURL) { + return serverBaseURL + } + if let serverBaseURL, await isLive(serverBaseURL) { + noteLive(serverBaseURL) + return serverBaseURL } if let configured = configuredServerURL(), await isLive(configured) { - serverBaseURL = configured + noteLive(configured) return configured } if await isLive(defaultBaseURL) { - serverBaseURL = defaultBaseURL + noteLive(defaultBaseURL) return defaultBaseURL } @@ -71,7 +100,7 @@ actor DecodexServerBridge { for _ in 0..<40 { if await isLive(defaultBaseURL) { - serverBaseURL = defaultBaseURL + noteLive(defaultBaseURL) return defaultBaseURL } @@ -88,6 +117,30 @@ actor DecodexServerBridge { return value.isEmpty ? 
nil : URL(string: value) } + private func hasFreshLiveCheck(_ baseURL: URL) -> Bool { + guard liveCheckBaseURL == baseURL, let liveCheckedAt else { + return false + } + + return Date().timeIntervalSince(liveCheckedAt) < liveCheckFreshness + } + + private func noteLive(_ baseURL: URL) { + serverBaseURL = baseURL + liveCheckBaseURL = baseURL + liveCheckedAt = Date() + } + + private func clearLive(_ baseURL: URL) { + if serverBaseURL == baseURL { + serverBaseURL = nil + } + if liveCheckBaseURL == baseURL { + liveCheckBaseURL = nil + liveCheckedAt = nil + } + } + private func isLive(_ baseURL: URL) async -> Bool { let url = baseURL.appendingPathComponent("livez") var request = URLRequest(url: url) @@ -117,7 +170,6 @@ actor DecodexServerBridge { "serve", "--api-only", "--listen-address", defaultListenAddress, - "--interval", "30s", ] process.standardOutput = nullDevice process.standardError = nullDevice diff --git a/apps/decodex/src/cli.rs b/apps/decodex/src/cli.rs index 546b723a..a4fa2c03 100644 --- a/apps/decodex/src/cli.rs +++ b/apps/decodex/src/cli.rs @@ -374,9 +374,10 @@ impl RunCommand { #[derive(Debug, Args)] struct ServeCommand { - /// Poll interval between control-plane ticks, for example `60s` or `5m`. - #[arg(long, value_name = "INTERVAL", default_value = "60s", value_parser = parse_duration_arg)] - interval: Duration, + /// Poll interval between control-plane ticks, for example `60s` or `5m`; unavailable with + /// `--api-only`. + #[arg(long, value_name = "INTERVAL", value_parser = parse_duration_arg)] + interval: Option, /// Operator UI listen address. #[arg(long, value_name = "ADDR", default_value = "127.0.0.1:8912")] listen_address: String, @@ -386,9 +387,19 @@ struct ServeCommand { } impl ServeCommand { fn run(&self, config_path: Option<&Path>) -> crate::prelude::Result<()> { + if self.api_only && self.interval.is_some() { + eyre::bail!( + "serve --api-only does not accept --interval because API-only mode does not poll projects." 
+ ); + } + orchestrator::run_control_plane(ServeRequest { config_path, - poll_interval: self.interval, + poll_interval: if self.api_only { + None + } else { + Some(self.interval.unwrap_or_else(|| Duration::from_secs(60))) + }, listen_address: &self.listen_address, api_only: self.api_only, }) @@ -868,7 +879,7 @@ mod tests { assert!(matches!( cli.command, Command::Serve(ServeCommand { interval, listen_address, api_only }) - if interval == Duration::from_secs(30) + if interval == Some(Duration::from_secs(30)) && listen_address == "127.0.0.1:9000" && !api_only )); @@ -878,7 +889,24 @@ mod tests { fn parses_serve_api_only() { let cli = Cli::parse_from(["decodex", "serve", "--api-only"]); - assert!(matches!(cli.command, Command::Serve(ServeCommand { api_only: true, .. }))); + assert!(matches!( + cli.command, + Command::Serve(ServeCommand { interval: None, api_only: true, .. }) + )); + } + + #[test] + fn rejects_serve_api_only_with_interval() { + let cli = Cli::parse_from(["decodex", "serve", "--api-only", "--interval", "30s"]); + let Command::Serve(command) = cli.command else { + panic!("expected serve command"); + }; + let error = + command.run(None).expect_err("api-only serve must reject interval configuration"); + let message = error.to_string(); + + assert!(message.contains("--api-only")); + assert!(message.contains("--interval")); } #[test] diff --git a/apps/decodex/src/orchestrator/entrypoints.rs b/apps/decodex/src/orchestrator/entrypoints.rs index 7754df3c..a39bd33d 100644 --- a/apps/decodex/src/orchestrator/entrypoints.rs +++ b/apps/decodex/src/orchestrator/entrypoints.rs @@ -102,19 +102,61 @@ pub(crate) fn run_once(request: RunOnceRequest<'_>) -> Result<()> { } pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> { - if request.poll_interval.is_zero() { - eyre::bail!("serve interval must be greater than zero."); - } if request.api_only && request.config_path.is_some() { eyre::bail!( "serve --api-only does not accept --config because it must 
not register or poll projects." ); } + if request.api_only && request.poll_interval.is_some() { + eyre::bail!( + "serve --api-only does not accept --interval because API-only mode does not poll projects." + ); + } validate_daemon_runtime()?; let state_store = Arc::new(runtime::open_runtime_store()?); + if request.api_only { + let operator_state_endpoint = + OperatorStateEndpoint::start(request.listen_address, Arc::clone(&state_store))?; + let runtime_db_path = runtime::runtime_db_path()?; + let global_config_path = runtime::global_config_path()?; + let project_config_dir = runtime::project_config_dir()?; + let snapshot = run_control_plane_api_only_tick(&state_store)?; + + if let Err(error) = operator_state_endpoint.publish_snapshot(&snapshot) { + let _ = error; + + tracing::warn!( + "Operator snapshot publish failed; sensitive runtime details were withheld from control-plane logs." + ); + } + + tracing::info!( + listen_address = %operator_state_endpoint.listen_address(), + path = OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH, + ws_path = OPERATOR_DASHBOARD_WS_ENDPOINT_PATH, + api_only = true, + runtime_db_path = %runtime_db_path.display(), + global_config_path = %global_config_path.display(), + project_config_dir = %project_config_dir.display(), + "Starting Decodex API-only operator endpoint." + ); + + loop { + thread::park(); + } + } + + let poll_interval = request + .poll_interval + .unwrap_or_else(|| Duration::from_secs(60)); + + if poll_interval.is_zero() { + eyre::bail!("serve interval must be greater than zero."); + } + if let Some(config_path) = request.config_path { let Some(config_path) = resolve_config_path(Some(config_path), &state_store)? 
else { eyre::bail!( @@ -133,11 +175,11 @@ pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> { let mut project_runtimes: HashMap = HashMap::new(); tracing::info!( - poll_interval_s = request.poll_interval.as_secs(), + poll_interval_s = poll_interval.as_secs(), listen_address = %operator_state_endpoint.listen_address(), path = OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH, ws_path = OPERATOR_DASHBOARD_WS_ENDPOINT_PATH, - api_only = request.api_only, + api_only = false, runtime_db_path = %runtime_db_path.display(), global_config_path = %global_config_path.display(), project_config_dir = %project_config_dir.display(), @@ -146,11 +188,7 @@ pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> { loop { let tick_started_at = Instant::now(); - let snapshot = if request.api_only { - run_control_plane_api_only_tick(&state_store)? - } else { - run_control_plane_tick(&state_store, &mut project_runtimes)? - }; + let snapshot = run_control_plane_tick(&state_store, &mut project_runtimes)?; if let Err(error) = operator_state_endpoint.publish_snapshot(&snapshot) { let _ = error; @@ -160,7 +198,7 @@ pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> { ); } - sleep_until_next_tick(request.poll_interval, tick_started_at); + sleep_until_next_tick(poll_interval, tick_started_at); } } diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 29ffb9ef..8eee5019 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -30,7 +30,7 @@ pub(crate) struct RunOnceRequest<'a> { /// Multi-project local control-plane daemon request. 
pub(crate) struct ServeRequest<'a> { pub(crate) config_path: Option<&'a Path>, - pub(crate) poll_interval: Duration, + pub(crate) poll_interval: Option, pub(crate) listen_address: &'a str, pub(crate) api_only: bool, } diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 35dca40c..1ff6a976 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -391,7 +391,7 @@ After a process restart, recent-run history, active lease ownership, retained po - During an active run, operator snapshots must expose `thread_id` as soon as the Codex thread exists, plus monotonically advancing `event_count`, `last_event_type`, and `last_event_at` once protocol events are recorded. These fields may be hydrated either from the current process journal or from the active lane's `.decodex-run-activity` marker when `status` is running in a separate process. - `thread_id = null` is expected only before the worker creates the Codex thread for the current run. `event_count = 0`, `last_event_type = null`, and `last_event_at = null` are expected only before the first protocol event for that same run. After the thread exists and protocol activity has started, those empty values indicate missing hydration rather than normal progress. - Operator snapshots may expose an additive `protocol_activity` object derived from app-server structured messages for the current run. The object stays local/operator-only and should summarize turn status, waiting reason, rate-limit status, and a compact recent event list for high-value app-server activity such as `turn/started`, `turn/completed`, plan updates, diff updates, item start/completion, command output deltas, server request responses, account updates, and rate-limit updates. Missing `protocol_activity` means no structured summary was captured yet; consumers must continue to rely on the older `event_count`, `last_event_type`, `last_event_at`, thread fields, and `child_agent_activity` fields when it is absent. 
-- The operator snapshot transport must stay local/operator-only. `decodex serve` exposes the human-facing operator console from the canonical HTTP `GET /` and `GET /dashboard` routes, serves only the necessary dashboard assets and `GET /livez` liveness probe over HTTP, and delivers published snapshots, active-run activity, and dashboard control acknowledgements through the local `GET /dashboard/control` WebSocket upgrade. +- The operator snapshot transport must stay local/operator-only. `decodex serve` exposes the human-facing operator console from the canonical HTTP `GET /` and `GET /dashboard` routes, serves only the necessary dashboard assets, the `GET /livez` liveness probe, and the local account-control API over HTTP, and delivers published snapshots, active-run activity, and dashboard control acknowledgements through the local `GET /dashboard/control` WebSocket upgrade. - `GET /livez` is only a process- and listener-level liveness probe. It must not claim control-plane tick freshness or forward progress by itself. - The dashboard must not depend on a separate HTTP snapshot or readiness endpoint; snapshot freshness belongs to the WebSocket-delivered snapshot payload and the browser connection state. - Reconciliation must mark locally active run attempts as `interrupted` when their