From b1d59f2c8358e2b5c8b6eb3c5a89fd4529c77c8a Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Thu, 14 May 2026 22:55:56 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Switch dashboard snapshots to WebSocket","authority":"manual"} --- README.md | 13 +- apps/decodex/src/orchestrator.rs | 2 - apps/decodex/src/orchestrator/daemon.rs | 4 - apps/decodex/src/orchestrator/entrypoints.rs | 10 +- .../src/orchestrator/operator_dashboard.html | 280 ++++-------- .../decodex/src/orchestrator/operator_http.rs | 222 +++------- .../tests/operator/status/dashboard.rs | 113 ++--- .../tests/operator/status/http.rs | 399 +++++------------- .../tests/operator/status/running_lanes.rs | 2 +- .../tests/operator/status_support.rs | 5 +- apps/decodex/src/orchestrator/types.rs | 11 +- dev/operator-dashboard-mock.mjs | 73 ++-- docs/reference/operator-control-plane.md | 45 +- docs/reference/test-suite.md | 6 +- docs/runbook/recover-review-handoff.md | 2 +- docs/runbook/self-dogfood-pilot.md | 53 +-- docs/spec/runtime.md | 4 +- 17 files changed, 405 insertions(+), 839 deletions(-) diff --git a/README.md b/README.md index 6457fd8a..ebfea997 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ Repo-native agent orchestration and public Codex signal publishing. - Rust CLI and runtime for repo-native retained coding-agent lanes. - Explicit project registry under `~/.codex/decodex/projects//`. -- Local operator listener with a read-only dashboard at `/` and `/dashboard`, plus - `GET /state` for JSON snapshots. +- Local operator listener with a dashboard at `/` and `/dashboard`, WebSocket + snapshot/control traffic at `/dashboard/control`, and `GET /livez` for liveness. - Static Astro site that publishes GitHub-backed Codex change signals. - Deterministic GitHub signal pipeline for change bundles, release deltas, rendered signal entries, and content validation. @@ -185,11 +185,12 @@ The governing workflow lives at `docs/runbook/local-github-signal-workflow.md`. ## Operator Dashboard -`decodex serve` owns the local operator listener. It serves one read-only operator -console from `GET /` and `GET /dashboard`, plus the same JSON status snapshot model from -`GET /state`. +`decodex serve` owns the local operator listener. It serves the operator dashboard from +`GET /` and `GET /dashboard`; published snapshots, active-run updates, and local +dashboard controls flow through the `/dashboard/control` WebSocket. The HTTP surface is +kept to dashboard pages/assets and `GET /livez`. -For dashboard UI development, use the mock operator state server: +For dashboard UI development, use the mock operator dashboard server: ```sh node dev/operator-dashboard-mock.mjs --listen-address 127.0.0.1:57399 diff --git a/apps/decodex/src/orchestrator.rs b/apps/decodex/src/orchestrator.rs index 893ac5f5..42599f8a 100644 --- a/apps/decodex/src/orchestrator.rs +++ b/apps/decodex/src/orchestrator.rs @@ -78,9 +78,7 @@ const TRACKER_RATE_LIMIT_WARNING: &str = "tracker_rate_limited"; const OPERATOR_DASHBOARD_ENDPOINT_PATH: &str = "/"; const OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH: &str = "/dashboard"; const OPERATOR_DASHBOARD_WS_ENDPOINT_PATH: &str = "/dashboard/control"; -const OPERATOR_STATE_ENDPOINT_PATH: &str = "/state"; const OPERATOR_LIVE_ENDPOINT_PATH: &str = "/livez"; -const OPERATOR_READY_ENDPOINT_PATH: &str = "/readyz"; const OPERATOR_STATE_MAX_REQUEST_BYTES: usize = 8_192; const OPERATOR_DASHBOARD_WS_CLIENT_MESSAGE_MAX_BYTES: usize = 64 * 1_024; const OPERATOR_STATE_HEADER_TERMINATOR: &[u8] = b"\r\n\r\n"; diff --git a/apps/decodex/src/orchestrator/daemon.rs b/apps/decodex/src/orchestrator/daemon.rs index 99d41bc1..cb0d5803 100644 --- a/apps/decodex/src/orchestrator/daemon.rs +++ b/apps/decodex/src/orchestrator/daemon.rs @@ -209,10 +209,6 @@ where Ok(snapshot) } -fn operator_snapshot_ready_stale_after(poll_interval: Duration) -> Duration { - poll_interval.checked_mul(2).unwrap_or(Duration::MAX) -} - fn inspect_or_clear_active_children( active_children: &mut Vec, retry_queue: &mut RetryQueue, diff --git a/apps/decodex/src/orchestrator/entrypoints.rs b/apps/decodex/src/orchestrator/entrypoints.rs index 3259afdf..a33e7a7d 100644 --- a/apps/decodex/src/orchestrator/entrypoints.rs +++ b/apps/decodex/src/orchestrator/entrypoints.rs @@ -120,11 +120,8 @@ pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> { runtime::register_project_config(&state_store, &config_path, true)?; } - let operator_state_endpoint = OperatorStateEndpoint::start( - request.listen_address, - operator_snapshot_ready_stale_after(request.poll_interval), - Arc::clone(&state_store), - )?; + let operator_state_endpoint = + OperatorStateEndpoint::start(request.listen_address, Arc::clone(&state_store))?; let runtime_db_path = runtime::runtime_db_path()?; let global_config_path = runtime::global_config_path()?; let project_config_dir = runtime::project_config_dir()?; @@ -133,7 +130,8 @@ pub(crate) fn run_control_plane(request: ServeRequest<'_>) -> Result<()> { tracing::info!( poll_interval_s = request.poll_interval.as_secs(), listen_address = %operator_state_endpoint.listen_address(), - path = OPERATOR_STATE_ENDPOINT_PATH, + path = OPERATOR_DASHBOARD_ALIAS_ENDPOINT_PATH, + ws_path = OPERATOR_DASHBOARD_WS_ENDPOINT_PATH, runtime_db_path = %runtime_db_path.display(), global_config_path = %global_config_path.display(), project_config_dir = %project_config_dir.display(), diff --git a/apps/decodex/src/orchestrator/operator_dashboard.html b/apps/decodex/src/orchestrator/operator_dashboard.html index 78b63107..be5ef68a 100644 --- a/apps/decodex/src/orchestrator/operator_dashboard.html +++ b/apps/decodex/src/orchestrator/operator_dashboard.html @@ -3406,7 +3406,6 @@

Run History

diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index 89da2f7c..335e1149 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -21,13 +21,6 @@ const OPERATOR_DASHBOARD_LOGO_TOUCH_PNG: &[u8] = include_bytes!(concat!( static DASHBOARD_RUN_INTERRUPTER_FOR_TEST: Mutex> = Mutex::new(None); -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum OperatorSnapshotReadiness { - Ready, - SnapshotUnavailable, - SnapshotStale, -} - #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum OperatorRequestRoute { Dashboard, @@ -36,8 +29,6 @@ enum OperatorRequestRoute { DashboardLogoTouchPng, DashboardWs, Live, - Ready, - State, } enum DashboardClientFrame { @@ -160,7 +151,6 @@ fn run_operator_state_endpoint( snapshot: Arc>, dashboard_events: DashboardEventHub, state_store: Arc, - ready_stale_after: Duration, shutdown_rx: Receiver<()>, ) { loop { @@ -169,23 +159,22 @@ fn run_operator_state_endpoint( } match listener.accept() { - Ok((stream, _peer_addr)) => { - let connection_snapshot = Arc::clone(&snapshot); - let connection_dashboard_events = dashboard_events.clone(); - let connection_state_store = Arc::clone(&state_store); - - thread::spawn(move || { - if let Err(error) = handle_operator_state_endpoint_connection( - stream, - &connection_snapshot, - &connection_dashboard_events, - &connection_state_store, - ready_stale_after, - ) { - tracing::warn!(?error, "Operator state endpoint request failed."); - } - }); - }, + Ok((stream, _peer_addr)) => { + let connection_snapshot = Arc::clone(&snapshot); + let connection_dashboard_events = dashboard_events.clone(); + let connection_state_store = Arc::clone(&state_store); + + thread::spawn(move || { + if let Err(error) = handle_operator_state_endpoint_connection( + stream, + &connection_snapshot, + &connection_dashboard_events, + &connection_state_store, + ) { + tracing::warn!(?error, "Operator state endpoint request failed."); + } + }); + }, Err(error) if error.kind() == ErrorKind::WouldBlock => { thread::sleep(Duration::from_millis(20)); }, @@ -203,7 +192,6 @@ fn handle_operator_state_endpoint_connection( snapshot: &Arc>, dashboard_events: &DashboardEventHub, state_store: &Arc, - ready_stale_after: Duration, ) -> Result<()> { stream.set_read_timeout(Some(Duration::from_millis(250)))?; stream.set_write_timeout(Some(Duration::from_millis(250)))?; @@ -222,6 +210,7 @@ fn handle_operator_state_endpoint_connection( handle_operator_dashboard_websocket_connection( stream, &request, + snapshot, dashboard_events, state_store, )?; @@ -229,57 +218,7 @@ fn handle_operator_state_endpoint_connection( return Ok(()); } - let response = match route { - OperatorRequestRoute::Dashboard - | OperatorRequestRoute::DashboardIconPng - | OperatorRequestRoute::DashboardLogoIco - | OperatorRequestRoute::DashboardLogoTouchPng - | OperatorRequestRoute::Live => { - build_operator_state_http_response_for_route( - route, - None, - None, - OperatorSnapshotReadiness::Ready, - ) - }, - OperatorRequestRoute::Ready => { - let last_publish_unix_epoch = snapshot - .lock() - .map_err(|error| eyre::eyre!("Operator state snapshot lock poisoned: {error}"))? - .last_publish_unix_epoch; - - build_operator_state_http_response_for_route( - route, - None, - None, - operator_snapshot_readiness( - last_publish_unix_epoch, - OffsetDateTime::now_utc().unix_timestamp(), - ready_stale_after, - ), - ) - }, - OperatorRequestRoute::State => { - let published_snapshot = snapshot - .lock() - .map_err(|error| eyre::eyre!("Operator state snapshot lock poisoned: {error}"))? - .clone(); - let snapshot_json = published_snapshot - .snapshot_json - .as_ref() - .map(|snapshot_json| snapshot_json_with_live_account_control(snapshot_json)); - - build_operator_state_http_response_for_route( - route, - snapshot_json.as_deref(), - published_snapshot.last_publish_unix_epoch, - OperatorSnapshotReadiness::SnapshotUnavailable, - ) - }, - OperatorRequestRoute::DashboardWs => unreachable!( - "dashboard websocket route is handled before building one-shot responses" - ), - }; + let response = build_operator_state_http_response_for_route(route); stream.write_all(&response)?; @@ -317,6 +256,7 @@ fn snapshot_json_with_live_account_control(snapshot_json: &[u8]) -> Vec { fn handle_operator_dashboard_websocket_connection( mut stream: TcpStream, request: &[u8], + snapshot: &Arc>, dashboard_events: &DashboardEventHub, state_store: &Arc, ) -> Result<()> { @@ -344,14 +284,23 @@ fn handle_operator_dashboard_websocket_connection( &dashboard_control_ready_payload(&session.subscription), )?; + if let Some(payload) = dashboard_current_snapshot_event_payload(snapshot)? { + write_dashboard_websocket_event(&mut stream, "snapshot", &payload)?; + } + loop { for frame in read_dashboard_websocket_client_frames(&mut stream, &mut client_frame_buffer)? { match frame { DashboardClientFrame::Text(payload) => { - let response = - handle_dashboard_client_message(&mut session, state_store, &payload); + let response = handle_dashboard_client_message(&mut session, state_store, &payload); write_dashboard_websocket_event(&mut stream, "controlAck", &response)?; + + if dashboard_control_ack_should_push_snapshot(&response) + && let Some(payload) = dashboard_current_snapshot_event_payload(snapshot)? + { + write_dashboard_websocket_event(&mut stream, "snapshot", &payload)?; + } }, DashboardClientFrame::Close => return Ok(()), DashboardClientFrame::Ping(payload) => { @@ -462,6 +411,33 @@ fn build_operator_run_activity_event(state_store: &StateStore) -> Result>, +) -> Result> { + let published_snapshot = snapshot + .lock() + .map_err(|error| eyre::eyre!("Operator state snapshot lock poisoned: {error}"))? + .clone(); + let Some(snapshot_json) = published_snapshot.snapshot_json.as_ref() else { + return Ok(None); + }; + let snapshot_json = snapshot_json_with_live_account_control(snapshot_json); + let snapshot = serde_json::from_slice::(&snapshot_json)?; + + Ok(Some(json!({ + "snapshotPublishedAtUnixEpoch": published_snapshot.last_publish_unix_epoch, + "snapshot": snapshot, + }))) +} + +fn dashboard_control_ack_should_push_snapshot(ack: &Value) -> bool { + ack.get("accepted").and_then(Value::as_bool).unwrap_or(false) + && matches!( + ack.get("action").and_then(Value::as_str), + Some("selectAccount" | "clearAccountSelection") + ) +} + fn write_dashboard_websocket_event( stream: &mut TcpStream, event_type: &'static str, @@ -1258,31 +1234,6 @@ fn operator_http_header_contains_token(value: &str, token: &str) -> bool { .any(|candidate| candidate.trim().eq_ignore_ascii_case(token)) } -fn operator_snapshot_readiness( - last_publish_unix_epoch: Option, - now_unix_epoch: i64, - ready_stale_after: Duration, -) -> OperatorSnapshotReadiness { - let Some(last_publish_unix_epoch) = last_publish_unix_epoch else { - return OperatorSnapshotReadiness::SnapshotUnavailable; - }; - - if last_publish_unix_epoch > now_unix_epoch { - return OperatorSnapshotReadiness::SnapshotStale; - } - - let Some(snapshot_age_seconds) = now_unix_epoch.checked_sub(last_publish_unix_epoch) else { - return OperatorSnapshotReadiness::SnapshotStale; - }; - let ready_stale_after_seconds = i64::try_from(ready_stale_after.as_secs()).unwrap_or(i64::MAX); - - if snapshot_age_seconds <= ready_stale_after_seconds { - OperatorSnapshotReadiness::Ready - } else { - OperatorSnapshotReadiness::SnapshotStale - } -} - fn read_operator_state_request_headers(stream: &mut TcpStream) -> Result> { let mut request = Vec::with_capacity(1_024); @@ -1311,30 +1262,16 @@ fn read_operator_state_request_headers(stream: &mut TcpStream) -> Result } #[cfg(test)] -fn build_operator_state_http_response( - request: &[u8], - snapshot_json: Option<&[u8]>, - readiness: OperatorSnapshotReadiness, -) -> Result> { +fn build_operator_state_http_response(request: &[u8]) -> Result> { let route = match parse_operator_state_request_route(request) { Ok(route) => route, Err(response) => return Ok(response), }; - Ok(build_operator_state_http_response_for_route( - route, - snapshot_json, - None, - readiness, - )) + Ok(build_operator_state_http_response_for_route(route)) } -fn build_operator_state_http_response_for_route( - route: OperatorRequestRoute, - snapshot_json: Option<&[u8]>, - snapshot_last_publish_unix_epoch: Option, - readiness: OperatorSnapshotReadiness, -) -> Vec { +fn build_operator_state_http_response_for_route(route: OperatorRequestRoute) -> Vec { match route { OperatorRequestRoute::Dashboard => { http_response_bytes("200 OK", "text/html; charset=utf-8", OPERATOR_DASHBOARD_HTML.as_bytes()) @@ -1352,33 +1289,6 @@ fn build_operator_state_http_response_for_route( OperatorRequestRoute::Live => { http_response_bytes("200 OK", "text/plain; charset=utf-8", b"ok") }, - OperatorRequestRoute::Ready => match readiness { - OperatorSnapshotReadiness::Ready => { - http_response_bytes("200 OK", "text/plain; charset=utf-8", b"ready") - }, - OperatorSnapshotReadiness::SnapshotUnavailable => http_response_bytes( - "503 Service Unavailable", - "text/plain; charset=utf-8", - b"snapshot_unavailable", - ), - OperatorSnapshotReadiness::SnapshotStale => http_response_bytes( - "503 Service Unavailable", - "text/plain; charset=utf-8", - b"snapshot_stale", - ), - }, - OperatorRequestRoute::State => match snapshot_json { - Some(snapshot_json) => { - let headers = snapshot_response_headers(snapshot_last_publish_unix_epoch); - - http_response_bytes_with_headers("200 OK", "application/json", &headers, snapshot_json) - }, - None => http_response_bytes( - "503 Service Unavailable", - "text/plain; charset=utf-8", - b"operator snapshot unavailable", - ), - }, } } @@ -1433,8 +1343,6 @@ fn parse_operator_state_request_route( "/assets/logo-touch.png" => Ok(OperatorRequestRoute::DashboardLogoTouchPng), OPERATOR_DASHBOARD_WS_ENDPOINT_PATH => Ok(OperatorRequestRoute::DashboardWs), OPERATOR_LIVE_ENDPOINT_PATH => Ok(OperatorRequestRoute::Live), - OPERATOR_READY_ENDPOINT_PATH => Ok(OperatorRequestRoute::Ready), - OPERATOR_STATE_ENDPOINT_PATH => Ok(OperatorRequestRoute::State), _ => Err(http_response_bytes( "404 Not Found", "text/plain; charset=utf-8", @@ -1443,16 +1351,6 @@ fn parse_operator_state_request_route( } } -fn snapshot_response_headers( - last_publish_unix_epoch: Option, -) -> Vec<(&'static str, String)> { - last_publish_unix_epoch - .map(|last_publish_unix_epoch| { - vec![("X-Decodex-Snapshot-Unix-Epoch", last_publish_unix_epoch.to_string())] - }) - .unwrap_or_default() -} - fn http_response_bytes(status_line: &str, content_type: &str, body: &[u8]) -> Vec { http_response_bytes_with_headers(status_line, content_type, &[], body) } diff --git a/apps/decodex/src/orchestrator/tests/operator/status/dashboard.rs b/apps/decodex/src/orchestrator/tests/operator/status/dashboard.rs index 6bc973b1..393e20ef 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/dashboard.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/dashboard.rs @@ -6,8 +6,6 @@ fn dashboard_response() -> String { orchestrator::OPERATOR_DASHBOARD_ENDPOINT_PATH ) .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, ) .expect("dashboard response should build"), ) @@ -251,19 +249,7 @@ fn operator_dashboard_patches_active_run_cards_without_replacing_the_list() { #[test] fn operator_dashboard_child_bucket_rows_split_time_bars_from_event_diagnostics() { - let response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_DASHBOARD_ENDPOINT_PATH - ) - .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, - ) - .expect("dashboard response should build"), - ) - .expect("dashboard response should be utf-8"); + let response = dashboard_response(); assert!(response.contains("childBucketIsSubsecond")); assert!(response.contains("childBucketIsEventOnly")); @@ -1040,19 +1026,7 @@ fn operator_dashboard_omits_watch_and_project_pause_controls() { #[test] fn operator_dashboard_projects_keep_status_summary_compact() { - let response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_DASHBOARD_ENDPOINT_PATH - ) - .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, - ) - .expect("dashboard response should build"), - ) - .expect("dashboard response should be utf-8"); + let response = dashboard_response(); assert!(response.contains("function projectCapacitySummary(project)")); assert!(response.contains("function renderProjectStats(project)")); @@ -1322,19 +1296,7 @@ fn operator_dashboard_empty_lane_meta_uses_counts() { #[test] fn operator_dashboard_flow_counts_distinguish_intake_attention() { - let response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_DASHBOARD_ENDPOINT_PATH - ) - .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, - ) - .expect("dashboard response should build"), - ) - .expect("dashboard response should be utf-8"); + let response = dashboard_response(); assert!(response.contains("queuedCandidateNeedsAttention")); assert!(response.contains("intakeAttentionCount")); @@ -1410,25 +1372,14 @@ fn operator_dashboard_prioritizes_needs_attention_reason_over_retry_count() { #[test] fn operator_dashboard_header_shows_endpoint_and_snapshot_freshness() { - let response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_DASHBOARD_ENDPOINT_PATH - ) - .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, - ) - .expect("dashboard response should build"), - ) - .expect("dashboard response should be utf-8"); + let response = dashboard_response(); - assert!(response.contains("SNAPSHOT_PUBLISHED_HEADER")); - assert!(response.contains("x-decodex-snapshot-unix-epoch")); + assert!(response.contains("const DASHBOARD_WEBSOCKET_ENDPOINT = \"/dashboard/control\";")); + assert!(!response.contains("SNAPSHOT_PUBLISHED_HEADER")); + assert!(!response.contains("x-decodex-snapshot-unix-epoch")); assert!(!response.contains("function dashboardEndpointMeta(path)")); assert!(response.contains("function dashboardSocketUrl()")); - assert!(response.contains("function snapshotPublishedAtFromResponse(response)")); + assert!(!response.contains("function snapshotPublishedAtFromResponse(response)")); assert!(response.contains("function snapshotAgeSeconds(snapshotPublishedAt)")); assert!(response.contains("function snapshotFreshnessMeta(")); assert!(response.contains("function topbarReadinessLabel(label)")); @@ -1439,7 +1390,7 @@ fn operator_dashboard_header_shows_endpoint_and_snapshot_freshness() { assert!(response.contains("Transport${renderValueLink(\"WebSocket\", dashboardSocketUrl(), \"transport-link\") || escapeHtml(dashboardSocketUrl())}")); assert!(!response.contains("topbarStreamLabel(stream.label)")); - assert!(response.contains("Poll fallback: ${escapeHtml(ENDPOINTS.state)}")); + assert!(!response.contains("Poll fallback")); assert!(response.contains("Snapshot")); assert!(response.contains("case \"Snapshot ready\":")); @@ -1451,11 +1402,15 @@ fn operator_dashboard_header_shows_endpoint_and_snapshot_freshness() { assert!(response.contains("const snapshotFreshnessRow = snapshotFreshness")); assert!(response.contains("return null;")); assert!(response.contains("const staleByAge = ageSeconds != null && ageSeconds >= 30;")); - assert!(response.contains("const staleByReadiness = readiness.label === \"Snapshot stale\";")); + assert!(!response.contains("const staleByReadiness")); assert!(response.contains("data-tone=\"${escapeHtml(snapshotFreshness.tone)}\"")); assert!(response.contains("Published ${formatTimestamp(snapshotPublishedAt)}")); assert!(response.contains("formatRelativeTimestamp(snapshotPublishedAt)")); - assert!(response.contains("snapshotPublishedAt = stateResult.value.snapshotPublishedAt")); + assert!(response.contains("unixEpochSecondsToIso(payload.snapshotPublishedAtUnixEpoch)")); + assert!(!response.contains("stateResult.value.snapshotPublishedAt")); + assert!(!response.contains("readinessResponse")); + assert!(!response.contains("body: \"ready\"")); + assert!(!response.contains("polling active")); assert!( response.contains("renderHeader(snapshot, readiness, notices, snapshotPublishedAt, snapshotError)") ); @@ -1467,19 +1422,7 @@ fn operator_dashboard_header_shows_endpoint_and_snapshot_freshness() { #[test] fn operator_dashboard_active_freshness_prefers_live_activity_source() { - let response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_DASHBOARD_ENDPOINT_PATH - ) - .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, - ) - .expect("dashboard response should build"), - ) - .expect("dashboard response should be utf-8"); + let response = dashboard_response(); assert!(response.contains("function activeRunFreshness(run)")); assert!(response.contains("source: \"last_run_activity_at\"")); @@ -1549,6 +1492,8 @@ fn operator_dashboard_run_activity_preserves_snapshot_detail_fields() { assert!(response.contains("function mergeDashboardRunRecord(snapshotRun, activityRun)")); assert!(response.contains("function mergeDashboardActiveRuns(snapshot, activeRunRows)")); + assert!(response.contains("let dashboardLiveActiveRuns = [];")); + assert!(response.contains("function snapshotWithLiveRunActivity(snapshot)")); assert!(response.contains("\"issue_identifier\"")); assert!(response.contains("\"title\"")); assert!(response.contains("\"child_agent_activity\"")); @@ -1558,6 +1503,28 @@ fn operator_dashboard_run_activity_preserves_snapshot_detail_fields() { assert!( response.contains("const mergedActiveRuns = mergeDashboardActiveRuns(snapshot, activeRunRows);") ); + assert!(response.contains("dashboardLiveActiveRuns = payload.activeRuns")); + assert!(response.contains("snapshot: snapshotWithLiveRunActivity(payload.snapshot),")); assert!(response.contains("active_runs: mergedActiveRuns,")); assert!(!response.contains("active_runs: activeRunRows,")); } + +#[test] +fn operator_dashboard_uses_websocket_without_http_state_fallback() { + let response = dashboard_response(); + + assert!(response.contains("connectDashboardSocket();")); + assert!(response.contains("function startDashboardStream()")); + assert!(response.contains("startDashboardStream();")); + assert!(response.contains("if (!document.hidden && !dashboardSocketIsOpen()) {\n\t\t\t\t\tconnectDashboardSocket();")); + assert!(!response.contains("function scheduleDashboardHttpFallback")); + assert!(!response.contains("clearDashboardHttpFallback();")); + assert!(!response.contains("requestJson(")); + assert!(!response.contains("requestText(")); + assert!(!response.contains("fetch(")); + assert!(!response.contains("\"/state\"")); + assert!(!response.contains("\"/readyz\"")); + assert!(!response.contains("window.setInterval(refreshDashboard")); + assert!(!response.contains("function refreshDashboard")); + assert!(!response.contains("const REFRESH_INTERVAL_MS")); +} diff --git a/apps/decodex/src/orchestrator/tests/operator/status/http.rs b/apps/decodex/src/orchestrator/tests/operator/status/http.rs index 5514bcce..9376af3a 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/http.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/http.rs @@ -2,163 +2,6 @@ use std::net::SocketAddr; use crate::runtime; -#[test] -fn operator_state_endpoint_serves_snapshot_json() { - let (_temp_dir, config, _workflow) = temp_project_layout(); - let state_store = StateStore::open_in_memory().expect("state store should open"); - let issue = sample_issue("Todo", &[]); - let worktree_path = config.worktree_root().join("PUB-101"); - - state_store - .record_run_attempt("run-1", &issue.id, 1, "running") - .expect("run attempt should record"); - state_store - .upsert_lease("pubfi", &issue.id, "run-1", "In Progress") - .expect("lease should record"); - state_store - .upsert_worktree( - "pubfi", - &issue.id, - "x/pubfi-pub-101", - &worktree_path.display().to_string(), - ) - .expect("worktree should record"); - - state::write_run_protocol_activity_marker( - &worktree_path, - &ProtocolActivityMarker { - run_id: "run-1", - attempt_number: 1, - thread_id: Some("thread-1"), - turn_id: Some("turn-1"), - event_count: 4, - last_event_type: "item/tool/call/response", - child_agent_activity: Some(&ChildAgentActivitySummary { - buckets: vec![state::ChildAgentActivityBucket { - name: String::from("Browser/Image"), - wall_seconds: 41, - event_count: 4, - tool_call_count: 2, - input_tokens: 0, - output_tokens: 0, - output_bytes: 240_000, - }], - current_bucket: Some(String::from("Model")), - current_detail: Some(String::from("waiting after tool output")), - current_started_unix_epoch: None, - current_elapsed_seconds: Some(0), - wall_seconds: 693, - event_count: 4, - tool_call_count: 2, - input_tokens_current: Some(135_000), - input_tokens_max: Some(135_000), - input_tokens_cumulative: 6_510_000, - output_tokens_cumulative: 18_000, - largest_tool_output_bytes: Some(240_000), - largest_tool_output_tool: Some(String::from("view_image")), - large_output_warnings: vec![String::from( - "view_image repeated 2 large outputs; largest 240000 bytes", - )], - }), - protocol_activity: None, - }, - ) - .expect("child activity marker should write"); - - let snapshot = orchestrator::build_operator_status_snapshot(&config, &state_store, 10) - .expect("snapshot should build"); - let snapshot_json = serde_json::to_vec(&snapshot).expect("snapshot json should serialize"); - let response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_STATE_ENDPOINT_PATH - ) - .as_bytes(), - Some(snapshot_json.as_slice()), - OperatorSnapshotReadiness::Ready, - ) - .expect("response build should succeed"), - ) - .expect("response should be utf-8"); - let (status_line, body) = - response.split_once("\r\n").expect("response should contain a status line"); - let body = body.split_once("\r\n\r\n").expect("response should contain a body").1; - let served_snapshot: Value = serde_json::from_str(body).expect("body should be valid json"); - - assert_eq!(status_line, "HTTP/1.1 200 OK"); - assert_eq!(served_snapshot["project_id"], "pubfi"); - assert_eq!(served_snapshot["run_limit"], 10); - assert_eq!(served_snapshot["active_runs"][0]["run_id"], "run-1"); - assert_eq!(served_snapshot["active_runs"][0]["status"], "running"); - assert_eq!(served_snapshot["active_runs"][0]["attempt_status"], "running"); - assert_eq!(served_snapshot["active_runs"][0]["phase"], "executing"); - assert_eq!(served_snapshot["active_runs"][0]["queue_lease_state"], "held"); - assert_eq!(served_snapshot["active_runs"][0]["execution_liveness"], "process_alive"); - assert_eq!( - served_snapshot["active_runs"][0]["child_agent_activity"]["buckets"][0]["name"], - "Browser/Image" - ); - assert_eq!( - served_snapshot["active_runs"][0]["child_agent_activity"]["input_tokens_max"], - 135_000 - ); - assert_eq!(served_snapshot["queued_candidates"], Value::Array(Vec::new())); - assert_eq!(served_snapshot["worktrees"][0]["worktree_path"], ".worktrees/PUB-101"); -} - -#[test] -fn operator_state_endpoint_serializes_closed_queue_classification() { - let snapshot = OperatorStatusSnapshot { - project_id: String::from("pubfi"), - run_limit: 10, - warnings: Vec::new(), - connector_backoffs: Vec::new(), - projects: Vec::new(), - account_control: OperatorCodexAccountControlStatus { - mode: String::from("balanced"), - account_selector: None, - }, - accounts: Vec::new(), - active_runs: vec![], - recent_runs: vec![], - history_lanes: vec![], - queued_candidates: vec![orchestrator::OperatorQueuedIssueStatus { - issue_id: String::from("issue-closed"), - issue_identifier: String::from("PUB-104"), - title: String::from("Retire closed queue residue"), - state: String::from("Done"), - priority: Some(1), - created_at: String::from("2026-03-14T09:58:00Z"), - classification: String::from("closed"), - reason: String::from("terminal_state"), - attention: None, - blocker_identifiers: vec![], - }], - worktrees: vec![], - post_review_lanes: vec![], - }; - let snapshot_json = serde_json::to_vec(&snapshot).expect("snapshot json should serialize"); - let response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_STATE_ENDPOINT_PATH - ) - .as_bytes(), - Some(snapshot_json.as_slice()), - OperatorSnapshotReadiness::Ready, - ) - .expect("response build should succeed"), - ) - .expect("response should be utf-8"); - let body = response.split_once("\r\n\r\n").expect("response should contain a body").1; - let served_snapshot: Value = serde_json::from_str(body).expect("body should be valid json"); - - assert_eq!(served_snapshot["queued_candidates"][0]["classification"], "closed"); - assert_eq!(served_snapshot["queued_candidates"][0]["reason"], "terminal_state"); -} - #[test] fn operator_state_endpoint_serves_dashboard_html_from_root_and_dashboard_route() { for path in [ @@ -169,8 +12,6 @@ fn operator_state_endpoint_serves_dashboard_html_from_root_and_dashboard_route() orchestrator::build_operator_state_http_response( format!("GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, ) .expect("dashboard response should build"), ) @@ -200,7 +41,7 @@ fn operator_state_endpoint_serves_dashboard_html_from_root_and_dashboard_route() assert!(response.contains("notice-dock")); assert!(response.contains("Notices")); assert!(response.contains("notice-panel")); - assert!(response.contains("State fetch")); + assert!(response.contains("Snapshot stream")); assert!(response.contains("Snapshot warning")); assert!(response.contains("Tracker sync paused")); assert!(response.contains("connector_backoffs")); @@ -257,7 +98,7 @@ fn operator_state_endpoint_serves_dashboard_html_from_root_and_dashboard_route() assert!(!response.contains("Intake Pressure")); assert!(!response.contains("Landing Readiness")); - assert_dashboard_html_control_surface(response.as_str()); + assert_dashboard_html_control_surface(response.as_str()); assert!(!response.contains("Last updated: none")); assert!(!response.contains("Auto-refresh")); @@ -273,8 +114,6 @@ fn operator_state_endpoint_serves_dashboard_html_from_root_and_dashboard_route() fn assert_dashboard_html_control_surface(response: &str) { for required in [ - "/state", - "/readyz", "/dashboard/control", "WebSocket", "applyDashboardRunActivity", @@ -287,6 +126,8 @@ fn assert_dashboard_html_control_surface(response: &str) { assert!(response.contains(required), "missing required dashboard control marker `{required}`"); } for forbidden in [ + "/state", + "/readyz", "data-dashboard-control=\"focusProject\"", "data-dashboard-control=\"focusRun\"", "data-dashboard-control=\"pauseProject\"", @@ -312,8 +153,6 @@ fn operator_dashboard_uses_decodex_brand_icons() { orchestrator::OPERATOR_DASHBOARD_ENDPOINT_PATH ) .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, ) .expect("dashboard response should build"), ) @@ -338,8 +177,6 @@ fn operator_state_endpoint_serves_decodex_brand_assets() { let response = orchestrator::build_operator_state_http_response( format!("GET {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, ) .expect("asset response should build"); let header_end = response @@ -365,8 +202,6 @@ fn operator_state_endpoint_rejects_dashboard_websocket_without_upgrade() { orchestrator::OPERATOR_DASHBOARD_WS_ENDPOINT_PATH ) .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, ) .expect("dashboard websocket response should build"), ) @@ -395,7 +230,6 @@ fn operator_dashboard_websocket_pushes_broadcast_events() { &server_snapshot, &server_dashboard_events, &server_state_store, - Duration::from_secs(30), ) .expect("websocket handler should complete after client disconnect"); }); @@ -446,12 +280,63 @@ fn operator_dashboard_websocket_pushes_broadcast_events() { server.join().expect("server thread should complete"); } +#[test] +fn operator_dashboard_websocket_sends_current_snapshot_on_connect() { + const SNAPSHOT_UNIX_EPOCH: i64 = 1_774_000_000; + + let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind"); + let address = listener.local_addr().expect("listener address should resolve"); + let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot { + snapshot_json: Some(br#"{"project_id":"pubfi","active_runs":[]}"#.to_vec()), + last_publish_unix_epoch: Some(SNAPSHOT_UNIX_EPOCH), + })); + let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open")); + let dashboard_events = DashboardEventHub::default(); + let server_snapshot = Arc::clone(&snapshot); + let server_state_store = Arc::clone(&state_store); + let server_dashboard_events = dashboard_events.clone(); + let server = thread::spawn(move || { + let (stream, _) = listener.accept().expect("listener should accept a connection"); + + orchestrator::handle_operator_state_endpoint_connection( + stream, + &server_snapshot, + &server_dashboard_events, + &server_state_store, + ) + .expect("websocket handler should complete after client disconnect"); + }); + let (mut client, response, mut frame) = open_dashboard_websocket_client(address); + + assert!(response.starts_with("HTTP/1.1 101 Switching Protocols\r\n")); + + let initial_snapshot = read_websocket_json_until(&mut client, &mut frame, |payload| { + payload["type"] == "snapshot" + }); + + assert_eq!( + initial_snapshot["payload"]["snapshotPublishedAtUnixEpoch"], + SNAPSHOT_UNIX_EPOCH + ); + assert_eq!(initial_snapshot["payload"]["snapshot"]["project_id"], "pubfi"); + + drop(client); + + dashboard_events.close_clients_for_test(); + server.join().expect("server thread should complete"); +} + #[test] fn operator_dashboard_websocket_accepts_subscription_and_project_pause_control() { let (_temp_dir, config, _workflow) = temp_project_layout(); let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind"); let address = listener.local_addr().expect("listener address should resolve"); - let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot::default())); + let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot { + snapshot_json: Some( + br#"{"project_id":"pubfi","active_runs":[],"account_control":{"mode":"balanced","account_selector":null}}"#.to_vec(), + ), + last_publish_unix_epoch: Some(1_774_000_000), + })); let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open")); let registration = ProjectRegistration::from_config( config.service_id(), @@ -475,7 +360,6 @@ fn operator_dashboard_websocket_accepts_subscription_and_project_pause_control() &server_snapshot, &server_dashboard_events, &server_state_store, - Duration::from_secs(30), ) .expect("websocket handler should complete after client disconnect"); }); @@ -567,6 +451,16 @@ fn assert_dashboard_account_selection_controls( assert_eq!(account_ack["payload"]["accepted"], true); assert_eq!(account_ack["payload"]["status"], "fixed"); + + let account_snapshot = read_websocket_json_until(client, frame, |payload| { + payload["type"] == "snapshot" + && payload["payload"]["snapshot"]["account_control"]["mode"] == "fixed" + }); + + assert_eq!( + account_snapshot["payload"]["snapshot"]["account_control"]["account_selector"], + "copy@example.com" + ); assert_eq!( runtime::global_fixed_account_selector().expect("global account selector should read"), Some(String::from("copy@example.com")) @@ -590,6 +484,16 @@ fn assert_dashboard_account_selection_controls( assert_eq!(account_clear_ack["payload"]["accepted"], true); assert_eq!(account_clear_ack["payload"]["status"], "balanced"); + + let account_clear_snapshot = read_websocket_json_until(client, frame, |payload| { + payload["type"] == "snapshot" + && payload["payload"]["snapshot"]["account_control"]["mode"] == "balanced" + }); + + assert_eq!( + account_clear_snapshot["payload"]["snapshot"]["account_control"]["account_selector"], + Value::Null + ); assert_eq!( runtime::global_fixed_account_selector().expect("global account selector should read"), None @@ -614,7 +518,6 @@ fn operator_dashboard_websocket_controls_focus_and_clear_subscription() { &server_snapshot, &server_dashboard_events, &server_state_store, - Duration::from_secs(30), ) .expect("websocket handler should complete after client disconnect"); }); @@ -678,7 +581,6 @@ fn operator_dashboard_websocket_filters_run_activity_by_subscription() { &server_snapshot, &server_dashboard_events, &server_state_store, - Duration::from_secs(30), ) .expect("websocket handler should complete after client disconnect"); }); @@ -780,7 +682,6 @@ fn operator_dashboard_websocket_interrupt_control_stops_active_run_process() { &server_snapshot, &server_dashboard_events, &server_state_store, - Duration::from_secs(30), ) .expect("websocket handler should complete after client disconnect"); }); @@ -861,7 +762,6 @@ fn operator_dashboard_websocket_interrupt_control_reports_validation_errors() { &server_snapshot, &server_dashboard_events, &server_state_store, - Duration::from_secs(30), ) .expect("websocket handler should complete after client disconnect"); }); @@ -1156,14 +1056,9 @@ fn operator_dashboard_run_activity_event_summarizes_active_runs() { #[test] fn operator_state_endpoint_reads_complete_headers_before_parsing() { - const SNAPSHOT_UNIX_EPOCH: i64 = 1_774_000_000; - let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind"); let address = listener.local_addr().expect("listener address should resolve"); - let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot { - snapshot_json: Some(br#"{"status":"ok"}"#.to_vec()), - last_publish_unix_epoch: Some(SNAPSHOT_UNIX_EPOCH), - })); + let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot::default())); let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open")); let server_snapshot = Arc::clone(&snapshot); let server_state_store = Arc::clone(&state_store); @@ -1176,29 +1071,25 @@ fn operator_state_endpoint_reads_complete_headers_before_parsing() { &server_snapshot, &dashboard_events, &server_state_store, - Duration::from_secs(30), ) .expect("handler should accept segmented headers"); }); let mut client = TcpStream::connect(address).expect("client should connect"); let mut response = String::new(); - client.write_all(b"GET /st").expect("client should write first request fragment"); + client.write_all(b"GET /dash").expect("client should write first request fragment"); thread::sleep(Duration::from_millis(10)); client - .write_all(b"ate HTTP/1.1\r\nHost: localhost\r\n\r\n") + .write_all(b"board HTTP/1.1\r\nHost: localhost\r\n\r\n") .expect("client should write second request fragment"); client.shutdown(Shutdown::Write).expect("client should close the request body stream"); client.read_to_string(&mut response).expect("client should read response"); server.join().expect("server thread should complete"); assert!(response.starts_with("HTTP/1.1 200 OK\r\n")); - assert!(response.contains(&format!( - "X-Decodex-Snapshot-Unix-Epoch: {SNAPSHOT_UNIX_EPOCH}\r\n" - ))); - assert!(response.ends_with("{\"status\":\"ok\"}")); + assert!(response.contains("Decodex")); } #[test] @@ -1233,41 +1124,42 @@ fn operator_state_endpoint_overlays_live_account_control_on_published_snapshot() last_publish_unix_epoch: Some(SNAPSHOT_UNIX_EPOCH), })); let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open")); + let dashboard_events = DashboardEventHub::default(); let server_snapshot = Arc::clone(&snapshot); let server_state_store = Arc::clone(&state_store); + let server_dashboard_events = dashboard_events.clone(); let server = thread::spawn(move || { let (stream, _) = listener.accept().expect("listener should accept a connection"); - let dashboard_events = DashboardEventHub::default(); orchestrator::handle_operator_state_endpoint_connection( stream, &server_snapshot, - &dashboard_events, + &server_dashboard_events, &server_state_store, - Duration::from_secs(30), ) - .expect("handler should serve state"); + .expect("handler should serve websocket snapshot"); + }); + let (mut client, response, mut frame) = open_dashboard_websocket_client(address); + let served_snapshot = read_websocket_json_until(&mut client, &mut frame, |payload| { + payload["type"] == "snapshot" }); - let mut client = TcpStream::connect(address).expect("client should connect"); - let mut response = String::new(); - client - .write_all(b"GET /state HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") - .expect("client should write state request"); - client.shutdown(Shutdown::Write).expect("client should close the request body stream"); - client.read_to_string(&mut response).expect("client should read response"); - server.join().expect("server thread should complete"); + assert!(response.starts_with("HTTP/1.1 101 Switching Protocols\r\n")); + assert_eq!(served_snapshot["payload"]["snapshot"]["account_control"]["mode"], "fixed"); + assert_eq!( + served_snapshot["payload"]["snapshot"]["account_control"]["account_selector"], + "copy@example.com" + ); + assert_eq!(served_snapshot["payload"]["snapshot"]["project_id"], "all"); + assert_eq!( + served_snapshot["payload"]["snapshotPublishedAtUnixEpoch"], + SNAPSHOT_UNIX_EPOCH + ); - let body = response.split_once("\r\n\r\n").expect("response should contain a body").1; - let served_snapshot: Value = serde_json::from_str(body).expect("body should be valid json"); + drop(client); - assert!(response.starts_with("HTTP/1.1 200 OK\r\n")); - assert_eq!(served_snapshot["account_control"]["mode"], "fixed"); - assert_eq!(served_snapshot["account_control"]["account_selector"], "copy@example.com"); - assert_eq!(served_snapshot["project_id"], "all"); - assert!(response.contains(&format!( - "X-Decodex-Snapshot-Unix-Epoch: {SNAPSHOT_UNIX_EPOCH}\r\n" - ))); + dashboard_events.close_clients_for_test(); + server.join().expect("server thread should complete"); } #[test] @@ -1296,7 +1188,6 @@ fn operator_state_endpoint_livez_ignores_poisoned_snapshot_lock() { &server_snapshot, &dashboard_events, &server_state_store, - Duration::from_secs(30), ) .expect("live probe should not require snapshot lock"); }); @@ -1321,7 +1212,7 @@ fn operator_state_endpoint_livez_ignores_poisoned_snapshot_lock() { } #[test] -fn operator_state_endpoint_serves_liveness_and_readiness_probes() { +fn operator_state_endpoint_serves_only_liveness_probe() { let live_response = String::from_utf8( orchestrator::build_operator_state_http_response( format!( @@ -1329,8 +1220,6 @@ fn operator_state_endpoint_serves_liveness_and_readiness_probes() { orchestrator::OPERATOR_LIVE_ENDPOINT_PATH ) .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, ) .expect("live response should build"), ) @@ -1338,81 +1227,23 @@ fn operator_state_endpoint_serves_liveness_and_readiness_probes() { assert!(live_response.starts_with("HTTP/1.1 200 OK\r\n")); assert!(live_response.ends_with("ok")); - - let ready_unavailable = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_READY_ENDPOINT_PATH - ) - .as_bytes(), - None, - OperatorSnapshotReadiness::SnapshotUnavailable, - ) - .expect("ready response should build"), - ) - .expect("ready response should be utf-8"); - - assert!(ready_unavailable.starts_with("HTTP/1.1 503 Service Unavailable\r\n")); - assert!(ready_unavailable.ends_with("snapshot_unavailable")); - - let ready_response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_READY_ENDPOINT_PATH - ) - .as_bytes(), - Some(br#"{"status":"ok"}"#), - OperatorSnapshotReadiness::Ready, - ) - .expect("ready response should build"), - ) - .expect("ready response should be utf-8"); - - assert!(ready_response.starts_with("HTTP/1.1 200 OK\r\n")); - assert!(ready_response.ends_with("ready")); } #[test] -fn operator_state_endpoint_reports_stale_snapshots_as_not_ready() { - let stale_response = String::from_utf8( - orchestrator::build_operator_state_http_response( - format!( - "GET {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n", - orchestrator::OPERATOR_READY_ENDPOINT_PATH +fn operator_state_endpoint_rejects_removed_http_snapshot_routes() { + for removed_path in ["/state", "/readyz"] { + let response = String::from_utf8( + orchestrator::build_operator_state_http_response( + format!( + "GET {removed_path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n" + ) + .as_bytes(), ) - .as_bytes(), - Some(br#"{"status":"ok"}"#), - OperatorSnapshotReadiness::SnapshotStale, + .expect("removed route response should build"), ) - .expect("stale ready response should build"), - ) - .expect("stale ready response should be utf-8"); - - assert!(stale_response.starts_with("HTTP/1.1 503 Service Unavailable\r\n")); - assert!(stale_response.ends_with("snapshot_stale")); -} + .expect("removed route response should be utf-8"); -#[test] -fn operator_snapshot_readiness_handles_timestamp_edges() { - for (last_publish, now, threshold, expected) in [ - ( - Some(200), - 100, - Duration::from_secs(30), - OperatorSnapshotReadiness::SnapshotStale, - ), - ( - Some(100), - 101, - Duration::from_secs(u64::MAX), - OperatorSnapshotReadiness::Ready, - ), - ] { - assert_eq!( - orchestrator::operator_snapshot_readiness(last_publish, now, threshold), - expected - ); + assert!(response.starts_with("HTTP/1.1 404 Not Found\r\n")); + assert!(response.ends_with("not found")); } } diff --git a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs index b6cd70c8..f8b0b99f 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/running_lanes.rs @@ -278,7 +278,7 @@ fn idle_operator_status_snapshot_has_no_runtime_or_recovery_noise() { assert_eq!( snapshot_json[field], serde_json::json!([]), - "idle /state field {field} should serialize as an empty array", + "idle operator snapshot field {field} should serialize as an empty array", ); } diff --git a/apps/decodex/src/orchestrator/tests/operator/status_support.rs b/apps/decodex/src/orchestrator/tests/operator/status_support.rs index 7277105e..205f72b0 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status_support.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status_support.rs @@ -1,9 +1,6 @@ use std::{panic, slice}; -use orchestrator::{ - OperatorPostReviewLaneStatus, OperatorQueuedIssueStatus, OperatorSnapshotReadiness, - OperatorWorktreeStatus, -}; +use orchestrator::{OperatorPostReviewLaneStatus, OperatorQueuedIssueStatus, OperatorWorktreeStatus}; use serde_json::Value; fn successful_linear_execution_history_comments(issue: &TrackerIssue) -> Vec { diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 5adc2dd8..d8d4a0e8 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -515,8 +515,8 @@ struct ProjectDaemonRuntime { workflow_cache: Option, } -#[derive(Clone, Debug)] -struct TrackerConnectorBackoff { + #[derive(Clone, Debug)] + struct TrackerConnectorBackoff { until: Instant, reset_unix_epoch: i64, reset_source: &'static str, @@ -533,11 +533,7 @@ struct OperatorStateEndpoint { activity_thread: Option>, } impl OperatorStateEndpoint { - fn start( - listen_address: &str, - ready_stale_after: Duration, - state_store: Arc, - ) -> crate::prelude::Result { + fn start(listen_address: &str, state_store: Arc) -> crate::prelude::Result { let listener = TcpListener::bind(listen_address).map_err(|error| { eyre::eyre!("Failed to bind operator state endpoint on `{listen_address}`: {error}") })?; @@ -563,7 +559,6 @@ impl OperatorStateEndpoint { shared_snapshot, server_dashboard_events, server_state_store, - ready_stale_after, shutdown_rx, ); }); diff --git a/dev/operator-dashboard-mock.mjs b/dev/operator-dashboard-mock.mjs index 48422c64..5a11f7e0 100755 --- a/dev/operator-dashboard-mock.mjs +++ b/dev/operator-dashboard-mock.mjs @@ -7,17 +7,15 @@ import path from "node:path"; import { fileURLToPath } from "node:url"; const DEFAULT_LISTEN_ADDRESS = "127.0.0.1:57399"; -const DEFAULT_READY_STALE_SECONDS = 120; const repoRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), ".."); function parseArgs(argv) { const options = { - authDir: null, - dashboardHtml: path.join(repoRoot, "src/orchestrator/operator_dashboard.html"), - listenAddress: DEFAULT_LISTEN_ADDRESS, - readyStaleSeconds: DEFAULT_READY_STALE_SECONDS, - }; + authDir: null, + dashboardHtml: path.join(repoRoot, "src/orchestrator/operator_dashboard.html"), + listenAddress: DEFAULT_LISTEN_ADDRESS, + }; for (let index = 0; index < argv.length; index += 1) { const arg = argv[index]; @@ -41,14 +39,6 @@ function parseArgs(argv) { options.authDir = path.join(process.env.HOME || ".", ".codex"); continue; } - if (arg === "--ready-stale-seconds") { - options.readyStaleSeconds = Number(requiredValue(argv, (index += 1), arg)); - if (!Number.isFinite(options.readyStaleSeconds) || options.readyStaleSeconds <= 0) { - throw new Error("--ready-stale-seconds must be a positive number"); - } - continue; - } - throw new Error(`Unknown argument: ${arg}`); } @@ -67,14 +57,13 @@ function requiredValue(argv, index, flag) { function printHelp() { console.log(`Usage: node dev/operator-dashboard-mock.mjs [options] -Serves the real operator dashboard HTML with a comprehensive mock /state payload. +Serves the real operator dashboard HTML with mock WebSocket snapshot and activity events. Options: --listen-address HOST:PORT Bind address (default ${DEFAULT_LISTEN_ADDRESS}) --dashboard-html PATH Dashboard HTML path --use-codex-auth Load auth*.json accounts from ~/.codex --codex-auth-dir DIR Load auth*.json accounts from DIR - --ready-stale-seconds N /readyz freshness window (default ${DEFAULT_READY_STALE_SECONDS}) -h, --help Show this help `); } @@ -855,30 +844,12 @@ async function main() { send(response, 200, "text/html; charset=utf-8", html); return; } - if (url.pathname === "/livez") { - send(response, 200, "text/plain; charset=utf-8", "ok"); - return; - } - if (url.pathname === "/readyz") { - const stale = nowUnix() - lastPublishedAt > options.readyStaleSeconds; - send( - response, - stale ? 503 : 200, - "text/plain; charset=utf-8", - stale ? "snapshot_stale" : "ready", - ); - return; - } - if (url.pathname === "/state") { - lastPublishedAt = nowUnix(); - const snapshot = buildSnapshot(staticAccounts, fixedAccountSelector); - send(response, 200, "application/json", JSON.stringify(snapshot), { - "X-Decodex-Snapshot-Unix-Epoch": String(lastPublishedAt), - }); - return; - } + if (url.pathname === "/livez") { + send(response, 200, "text/plain; charset=utf-8", "ok"); + return; + } - send(response, 404, "text/plain; charset=utf-8", "not found"); + send(response, 404, "text/plain; charset=utf-8", "not found"); } catch (error) { send(response, 500, "text/plain; charset=utf-8", error?.message || "mock server error"); } @@ -907,6 +878,30 @@ async function main() { "", ].join("\r\n"), ); + sendWebSocketJson(socket, { + type: "controlReady", + payload: { + supportedActions: [ + "subscribe", + "focus", + "clearFocus", + "pauseProject", + "resumeProject", + "interruptRun", + "selectAccount", + "clearAccountSelection", + "ack", + ], + subscription: {}, + }, + }); + sendWebSocketJson(socket, { + type: "snapshot", + payload: { + snapshot: buildSnapshot(staticAccounts, fixedAccountSelector), + snapshotPublishedAtUnixEpoch: lastPublishedAt, + }, + }); let buffered = Buffer.alloc(0); socket.on("data", (chunk) => { diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index c2229d3c..721f74c7 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -67,11 +67,12 @@ Use `decodex diagnose --json` when an agent needs the current handoff index dire ## Operator Dashboard Sections -The browser dashboard is a read-only view over the same operator snapshot served at -`GET /state`. +The browser dashboard is a local view over the operator snapshot delivered by the +`GET /dashboard/control` WebSocket after the page loads from `GET /` or +`GET /dashboard`. The dashboard header also shows the browser origin being viewed and, when the -served state response carries a publish timestamp, the relative age of that +WebSocket snapshot payload carries a publish timestamp, the relative age of that snapshot so an operator can catch stale tabs or an old listener port quickly. Because the runtime SQLite DB is authoritative, dashboard sections describe current @@ -97,18 +98,18 @@ state is, by itself, evidence that the Linear tracker or GitHub connector failed; confirm the central project registry and service queue label before treating it as a connector problem. -The browser dashboard reads the complete published state from `GET /state` and may -also keep a local WebSocket open at `GET /dashboard/control`. `/state` remains the -authoritative reconciliation snapshot; the WebSocket pushes Decodex-owned snapshot -and active-lane activity updates sooner than the polling interval, and accepts the -local dashboard control protocol. The current browser UI keeps live updates unscoped -and exposes only an explicit stop control for active lanes with a known live child -process; project watch, project pause/resume, and manual retry controls are -intentionally not shown. The stop control signals the recorded child process for that -run, marks the local attempt interrupted, and releases the local queue lease. `ack` is -dashboard-local acknowledgement only. The socket is not a browser connection to Codex -app-server, GitHub, or Linear, and it does not make high-frequency protocol activity -durable outside the local operator surface. +The browser dashboard reads the complete published state from the local +`GET /dashboard/control` WebSocket. That socket is the dashboard authority for +published snapshots, active-lane activity updates, and local dashboard control +acknowledgements; there is no separate HTTP snapshot polling route. The current +browser UI keeps live updates unscoped and exposes explicit stop controls for active +lanes with a known live child process plus account-pool selection controls; project +watch, project pause/resume, and manual retry controls are intentionally not shown. +The stop control signals the recorded child process for that run, marks the local +attempt interrupted, and releases the local queue lease. `ack` is dashboard-local +acknowledgement only. The socket is not a browser connection to Codex app-server, +GitHub, or Linear, and it does not make high-frequency protocol activity durable +outside the local operator surface. | Section | Meaning | | --- | --- | @@ -153,8 +154,8 @@ Worktree visibility follows the owning dashboard section: runtime owner is gone or cannot explain it as active, review/landing, or queued work. -Every `/state` worktree row includes an `ownership` and `ownership_reason` that -distinguishes active-lane ownership, post-review ownership, queued attention, and +Every operator snapshot worktree row includes an `ownership` and `ownership_reason` +that distinguishes active-lane ownership, post-review ownership, queued attention, and cleanup-only local retention. A `Recovery Worktrees` row tells the operator to inspect the local path and either clean it up or recover local-only changes; it is not, by itself, evidence that the SQLite runtime store lost an active lane. When the tracker @@ -231,11 +232,11 @@ rate-limited, or unavailable. - Connector failures should appear as typed health/backoff state, not raw API error blobs in the main layout. -- When a tracker connector enters backoff, `/state` includes a `connector_backoffs` - entry with the affected `project_id`, `connector`, `sync_phase`, `quota_class`, - `reset_at`, `reset_unix_epoch`, `retry_after_seconds`, and operator `next_action`. - Running lanes should still render from local runtime DB state while external sync - is paused. +- When a tracker connector enters backoff, the published operator snapshot includes a + `connector_backoffs` entry with the affected `project_id`, `connector`, + `sync_phase`, `quota_class`, `reset_at`, `reset_unix_epoch`, + `retry_after_seconds`, and operator `next_action`. Running lanes should still render + from local runtime DB state while external sync is paused. - Linear writes should stay coarse: one run-start ledger, material progress checkpoints, PR-ready/handoff, blocked/failed, landed, done, and cleanup summaries. - Fine-grained retry budgets, raw attempts, heartbeat, child buckets, token pressure, diff --git a/docs/reference/test-suite.md b/docs/reference/test-suite.md index 9fdbb6af..fbd2496b 100644 --- a/docs/reference/test-suite.md +++ b/docs/reference/test-suite.md @@ -74,8 +74,8 @@ large catch-all test file unless the behavior crosses several of these stages. | `apps/decodex/src/orchestrator/tests/operator/status/text.rs` | 4 | Human-readable operator status text | | `apps/decodex/src/orchestrator/tests/operator/status/publishing.rs` | 6 | Snapshot publishing, degraded observers, and tracker backoff | | `apps/decodex/src/orchestrator/tests/operator/status/queue.rs` | 8 | Intake queue classifications and shared-claim visibility | -| `apps/decodex/src/orchestrator/tests/operator/status/http.rs` | 10 | Operator `/state`, `/livez`, readiness, and dashboard route responses | -| `apps/decodex/src/orchestrator/tests/operator/status/dashboard.rs` | 3 | Dashboard client rendering contracts | +| `apps/decodex/src/orchestrator/tests/operator/status/http.rs` | 17 | Operator dashboard HTTP pages/assets, `/livez`, WebSocket control, and removed snapshot-route responses | +| `apps/decodex/src/orchestrator/tests/operator/status/dashboard.rs` | 33 | Dashboard client rendering contracts | | `apps/decodex/src/orchestrator/tests/review_landing/status_support.rs` | 0 | Shared Review & Landing status fixtures | | `apps/decodex/src/orchestrator/tests/review_landing/status_rows.rs` | 18 | Review & Landing status rows and handoff lineage | | `apps/decodex/src/orchestrator/tests/review_landing/orchestration.rs` | 12 | Review orchestration, admin merge, and repair routing | @@ -139,7 +139,7 @@ protect different contracts in retained review lanes, status rows, and cleanup f These areas should stay dense unless the implementation contract changes: - `operator/status/` covers operator-facing JSON, text, dashboard, `/livez`, and - readiness behavior. These tests are noisy but protect the local control-plane surface. + WebSocket behavior. These tests are noisy but protect the local control-plane surface. - `review_landing/status_rows.rs` keeps both descendant handoff and lineage-rewrite cases because the production branch distinguishes accepted ancestry from rejected rewrites. - `intake/candidate_selection.rs` keeps planner, dispatch policy, and block-reason cases diff --git a/docs/runbook/recover-review-handoff.md b/docs/runbook/recover-review-handoff.md index ff59f930..e27b1f43 100644 --- a/docs/runbook/recover-review-handoff.md +++ b/docs/runbook/recover-review-handoff.md @@ -3,7 +3,7 @@ Purpose: Diagnose and explicitly repair retained review lanes that are blocked by a missing runtime DB handoff marker. -Use this when: `decodex status`, `/state`, or the dashboard shows a `Review & Landing` +Use this when: `decodex status` or the dashboard shows a `Review & Landing` lane blocked with `missing_review_handoff_record`. Do not use this for: healthy PR handoffs, review repair, landing, closeout, cleanup-only diff --git a/docs/runbook/self-dogfood-pilot.md b/docs/runbook/self-dogfood-pilot.md index c0f9b676..47d9a71f 100644 --- a/docs/runbook/self-dogfood-pilot.md +++ b/docs/runbook/self-dogfood-pilot.md @@ -84,7 +84,7 @@ project directory: - `decodex project list` should show the project config as `~/.codex/decodex/projects//project.toml`. -- `GET /state` or the operator UI should show no project backed by a flat `*.toml` +- `decodex status --json` or the operator UI should show no project backed by a flat `*.toml` config path inside a checkout or lane worktree. Runtime state now lives in the Decodex-owned SQLite database at `~/.codex/decodex/runtime.sqlite3`, and logs live under `~/.codex/decodex/logs/`. On restart, `decodex` reloads retained worktree knowledge and active-lane recovery intent from that database, then refreshes low-frequency Linear and GitHub state as connector budgets allow. @@ -321,21 +321,20 @@ wants to observe the self-bootstrap loop without reading source code. lsof -nP -iTCP:8912 -sTCP:LISTEN ps -p "$SERVE_PID" -o pid,lstart,command curl -fsS http://127.0.0.1:8912/livez - curl -fsS http://127.0.0.1:8912/readyz ``` Use the port from the active `--listen-address` if it is not `127.0.0.1:8912`. Treat a missing listener, a binary revision that does not match the current landed `HEAD`, or a `decodex serve` start time older than the latest runtime or dashboard landing as stale evidence. Restart `decodex serve` after reinstalling or after any - runtime/UI land, then rerun `/livez` and `/readyz` on the same port before applying - new queue labels. A browser tab left open on an old port is not evidence for the - current serve process. + runtime/UI land, then rerun `/livez` on the same port and reload the dashboard + before applying new queue labels. A browser tab left open on an old port is not + evidence for the current serve process. Installed-dashboard smoke checklist: after `decodex --version` matches the short `HEAD`, restart `decodex serve` from that installed binary. Verify `GET /livez` - and `GET /readyz` pass, then confirm `GET /state` and the browser dashboard agree - on project registration and visible lane counts before queueing work. + passes, then confirm `decodex status --json` and the browser dashboard agree on + project registration and visible lane counts before queueing work. 3. Confirm the installed binary can reach the Codex app-server boundary: @@ -419,13 +418,13 @@ wants to observe the self-bootstrap loop without reading source code. Restart observation checkpoint after a runtime fix: - Restart `decodex serve` from the same registered project setup and confirm - `GET /readyz` reports a fresh operator snapshot. If readiness is stale or missing, - wait one poll tick; if it stays stale, stop before queueing new issues. - - Check `GET /state` before applying new `decodex:queued:decodex` labels. The - snapshot should show the intended `decodex` project registration, no previous - completed lane counted as running capacity, and retained or recovery worktree - entries only when they correspond to a live PR, review, landing, or recovery - state. + `GET /livez` responds. Reload the dashboard and wait for a WebSocket snapshot; if + it stays missing after one poll tick, stop before queueing new issues. + - Check `decodex status --json` before applying new `decodex:queued:decodex` + labels. The snapshot should show the intended `decodex` project registration, no + previous completed lane counted as running capacity, and retained or recovery + worktree entries only when they correspond to a live PR, review, landing, or + recovery state. - In the dashboard, confirm `Projects`, `Intake Queue`, `Running Lanes`, `Review & Landing`, `Recovery Worktrees`, and `Run Ledger` agree with the same snapshot: the landed fix is visible through the latest run history, no stale @@ -435,17 +434,19 @@ wants to observe the self-bootstrap loop without reading source code. Post-land self-bootstrap observation checklist: - - Check `GET /readyz` after the landed runtime fix is running again. The snapshot - should be fresh; if it stays stale after one poll tick, stop the demo loop. - - Check `GET /state` before queueing follow-up work. Active and post-review counts - should match the visible `Running Lanes` and `Review & Landing` rows. + - Reload the dashboard after the landed runtime fix is running again. The WebSocket + snapshot should be fresh; if it stays stale after one poll tick, stop the demo + loop. + - Check `decodex status --json` before queueing follow-up work. Active and + post-review counts should match the visible `Running Lanes` and `Review & Landing` + rows. - Watch host CPU while the landed lane drains. CPU should return to the expected idle range after closeout instead of staying pinned with zero active work. - `Recovery Worktrees` should be empty except cleanup-only retained worktrees for landed or closed lanes waiting deterministic removal. - - Stop before applying new queue labels if `/state` disagrees with the dashboard, - active or post-review counts are inflated, CPU stays elevated, or retained rows - lack a cleanup-only reason. + - Stop before applying new queue labels if `decodex status --json` disagrees with + the dashboard, active or post-review counts are inflated, CPU stays elevated, or + retained rows lack a cleanup-only reason. Concurrent self-bootstrap observation checklist: @@ -498,8 +499,8 @@ wants to observe the self-bootstrap loop without reading source code. timing when the ledger recorded those fields; raw attempts and heartbeat details stay in the expanded debug view. To validate rich local closeout persistence after a lane completes, keep - `decodex serve --listen-address 127.0.0.1:8912` running and inspect the served - `GET /state` snapshot or the 8912 dashboard `Run Ledger`, not a replay of Linear + `decodex serve --listen-address 127.0.0.1:8912` running and inspect + `decodex status --json` or the 8912 dashboard `Run Ledger`, not a replay of Linear comments. The completed row should still expose the local `run_id`, attempt number, lifecycle status, PR or landing reference, closeout or attention reason, elapsed timing, and debug-only raw attempt or heartbeat details after the lane @@ -509,7 +510,7 @@ wants to observe the self-bootstrap loop without reading source code. real failed or interrupted retry. Use the post-XY-370 installed-binary docs-only canary, such as XY-371, to verify that this clean closeout path preserves the original handoff `run_id` with - `attempt_number = 1` and does not add an interrupted attempt to `GET /state` + `attempt_number = 1` and does not add an interrupted attempt to status/dashboard history. If closeout reports `attempt-2` without a real failed or interrupted retry, record that as failed canary evidence. Record the PR URL, merge commit, and closeout identity in Linear before treating the canary as healthy. @@ -569,11 +570,11 @@ Decodex is intentionally Unix-only, and the control plane relies on Unix file-de decodex serve --interval 60s --listen-address 127.0.0.1:8912 ``` -The listener serves one read-only operator console from the canonical `GET /` and `GET /dashboard` routes, the same JSON operator snapshot used by `cargo run -p decodex -- status --json` from `GET /state`, plus two tiny text probes on the same listener. The single console keeps `Projects`, `Running Lanes`, `Intake Queue`, `Review & Landing`, `Recovery Worktrees`, and `Run Ledger` visible together. Intake candidates that are already claimed by a running lane are shown as active queue echoes, capacity-bound candidates are shown as waiting rather than blocked, running lane worktrees stay with their owning lane, and retained/recovery worktrees remain folded until diagnostics are needed: +The listener serves the operator console from the canonical `GET /` and `GET /dashboard` routes, the same JSON operator snapshot used by `cargo run -p decodex -- status --json` through the `/dashboard/control` WebSocket, and the minimal `GET /livez` liveness probe on the same listener. The single console keeps `Projects`, `Running Lanes`, `Intake Queue`, `Review & Landing`, `Recovery Worktrees`, and `Run Ledger` visible together. Intake candidates that are already claimed by a running lane are shown as active queue echoes, capacity-bound candidates are shown as waiting rather than blocked, running lane worktrees stay with their owning lane, and retained/recovery worktrees remain folded until diagnostics are needed: - `GET /` or `GET /dashboard`: the same single-page operator console +- `GET /dashboard/control`: WebSocket transport for snapshots, live run activity, and local dashboard control acknowledgements - `GET /livez`: process-level liveness for the operator listener only -- `GET /readyz`: whether a current operator snapshot exists and is still fresh within the control-plane poll budget During `serve`, each poll tick now does two distinct things: diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 8bafce69..35dca40c 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -391,9 +391,9 @@ After a process restart, recent-run history, active lease ownership, retained po - During an active run, operator snapshots must expose `thread_id` as soon as the Codex thread exists, plus monotonically advancing `event_count`, `last_event_type`, and `last_event_at` once protocol events are recorded. These fields may be hydrated either from the current process journal or from the active lane's `.decodex-run-activity` marker when `status` is running in a separate process. - `thread_id = null` is expected only before the worker creates the Codex thread for the current run. `event_count = 0`, `last_event_type = null`, and `last_event_at = null` are expected only before the first protocol event for that same run. After the thread exists and protocol activity has started, those empty values indicate missing hydration rather than normal progress. - Operator snapshots may expose an additive `protocol_activity` object derived from app-server structured messages for the current run. The object stays local/operator-only and should summarize turn status, waiting reason, rate-limit status, and a compact recent event list for high-value app-server activity such as `turn/started`, `turn/completed`, plan updates, diff updates, item start/completion, command output deltas, server request responses, account updates, and rate-limit updates. Missing `protocol_activity` means no structured summary was captured yet; consumers must continue to rely on the older `event_count`, `last_event_type`, `last_event_at`, thread fields, and `child_agent_activity` fields when it is absent. -- The operator snapshot transport must remain read-only. `decodex serve` exposes a human-facing read-only operator console from the canonical HTTP `GET /` and `GET /dashboard` routes, that same snapshot model from HTTP `GET /state`, plus minimal `GET /livez` and `GET /readyz` probes on the same listener. +- The operator snapshot transport must stay local/operator-only. `decodex serve` exposes the human-facing operator console from the canonical HTTP `GET /` and `GET /dashboard` routes, serves only the necessary dashboard assets and `GET /livez` liveness probe over HTTP, and delivers published snapshots, active-run activity, and dashboard control acknowledgements through the local `GET /dashboard/control` WebSocket upgrade. - `GET /livez` is only a process- and listener-level liveness probe. It must not claim control-plane tick freshness or forward progress by itself. -- `GET /readyz` must fail closed until a current operator snapshot has been published, and it must return to not-ready once that published snapshot is stale relative to the control-plane poll budget instead of remaining green forever after the first successful tick. +- The dashboard must not depend on a separate HTTP snapshot or readiness endpoint; snapshot freshness belongs to the WebSocket-delivered snapshot payload and the browser connection state. - Reconciliation must mark locally active run attempts as `interrupted` when their stale lease is cleared, `terminated` when the tracker issue is already terminal, or `succeeded` for the matching recovered review-handoff lease exception above.