diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index 234e0f98..89da2f7c 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -264,10 +264,14 @@ fn handle_operator_state_endpoint_connection( .lock() .map_err(|error| eyre::eyre!("Operator state snapshot lock poisoned: {error}"))? .clone(); + let snapshot_json = published_snapshot + .snapshot_json + .as_ref() + .map(|snapshot_json| snapshot_json_with_live_account_control(snapshot_json)); build_operator_state_http_response_for_route( route, - published_snapshot.snapshot_json.as_deref(), + snapshot_json.as_deref(), published_snapshot.last_publish_unix_epoch, OperatorSnapshotReadiness::SnapshotUnavailable, ) @@ -282,6 +286,34 @@ fn handle_operator_state_endpoint_connection( Ok(()) } +fn snapshot_json_with_live_account_control(snapshot_json: &[u8]) -> Vec { + let Ok(mut snapshot) = serde_json::from_slice::(snapshot_json) else { + return snapshot_json.to_vec(); + }; + let Some(snapshot_object) = snapshot.as_object_mut() else { + return snapshot_json.to_vec(); + }; + + if !snapshot_object.contains_key("account_control") { + return snapshot_json.to_vec(); + } + + let account_control = global_codex_account_control_status(); + + snapshot_object.insert( + String::from("account_control"), + json!({ + "mode": account_control.mode, + "account_selector": account_control.account_selector, + }), + ); + + match serde_json::to_vec(&snapshot) { + Ok(output) => output, + Err(_) => snapshot_json.to_vec(), + } +} + fn handle_operator_dashboard_websocket_connection( mut stream: TcpStream, request: &[u8], diff --git a/apps/decodex/src/orchestrator/tests/operator/status/control_plane.rs b/apps/decodex/src/orchestrator/tests/operator/status/control_plane.rs index bbfe06eb..a0ae94c6 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/control_plane.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/control_plane.rs @@ -1,6 +1,8 @@ #[test] fn control_plane_snapshot_lists_disabled_registered_projects() { - let (_temp_dir, config, _workflow) = temp_project_layout(); + let (temp_dir, config, _workflow) = temp_project_layout(); + let _home_guard = + TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("home should be utf-8")); let state_store = StateStore::open_in_memory().expect("state store should open"); let registration = ProjectRegistration::from_config( config.service_id(), @@ -32,8 +34,12 @@ fn control_plane_snapshot_lists_disabled_registered_projects() { #[test] fn control_plane_snapshot_aggregates_top_level_lanes_for_all_registered_projects() { - let (_active_temp_dir, active_config, _active_workflow) = temp_project_layout(); + let (active_temp_dir, active_config, _active_workflow) = temp_project_layout(); let (_idle_temp_dir, idle_base_config, _idle_workflow) = temp_project_layout(); + let _home_guard = TestEnvVarGuard::set( + "HOME", + active_temp_dir.path().to_str().expect("home should be utf-8"), + ); let state_store = StateStore::open_in_memory().expect("state store should open"); let issue = sample_issue_with_sort_fields( "issue-active", diff --git a/apps/decodex/src/orchestrator/tests/operator/status/http.rs b/apps/decodex/src/orchestrator/tests/operator/status/http.rs index d480716b..5514bcce 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/http.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/http.rs @@ -1201,6 +1201,75 @@ fn operator_state_endpoint_reads_complete_headers_before_parsing() { assert!(response.ends_with("{\"status\":\"ok\"}")); } +#[test] +fn operator_state_endpoint_overlays_live_account_control_on_published_snapshot() { + const SNAPSHOT_UNIX_EPOCH: i64 = 1_774_000_000; + + let temp_dir = TempDir::new().expect("temp dir should exist"); + let _home_guard = + TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("temp path should be UTF-8")); + let stale_snapshot = serde_json::json!({ + "project_id": "all", + "account_control": { + "mode": "balanced", + "account_selector": null, + }, + "accounts": [], + "active_runs": [], + "recent_runs": [], + "history_lanes": [], + "queued_candidates": [], + "worktrees": [], + "post_review_lanes": [], + }); + + runtime::write_global_fixed_account_selector(Some("copy@example.com")) + .expect("global account selector should write"); + + let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind"); + let address = listener.local_addr().expect("listener address should resolve"); + let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot { + snapshot_json: Some(serde_json::to_vec(&stale_snapshot).expect("snapshot should serialize")), + last_publish_unix_epoch: Some(SNAPSHOT_UNIX_EPOCH), + })); + let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open")); + let server_snapshot = Arc::clone(&snapshot); + let server_state_store = Arc::clone(&state_store); + let server = thread::spawn(move || { + let (stream, _) = listener.accept().expect("listener should accept a connection"); + let dashboard_events = DashboardEventHub::default(); + + orchestrator::handle_operator_state_endpoint_connection( + stream, + &server_snapshot, + &dashboard_events, + &server_state_store, + Duration::from_secs(30), + ) + .expect("handler should serve state"); + }); + let mut client = TcpStream::connect(address).expect("client should connect"); + let mut response = String::new(); + + client + .write_all(b"GET /state HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") + .expect("client should write state request"); + client.shutdown(Shutdown::Write).expect("client should close the request body stream"); + client.read_to_string(&mut response).expect("client should read response"); + server.join().expect("server thread should complete"); + + let body = response.split_once("\r\n\r\n").expect("response should contain a body").1; + let served_snapshot: Value = serde_json::from_str(body).expect("body should be valid json"); + + assert!(response.starts_with("HTTP/1.1 200 OK\r\n")); + assert_eq!(served_snapshot["account_control"]["mode"], "fixed"); + assert_eq!(served_snapshot["account_control"]["account_selector"], "copy@example.com"); + assert_eq!(served_snapshot["project_id"], "all"); + assert!(response.contains(&format!( + "X-Decodex-Snapshot-Unix-Epoch: {SNAPSHOT_UNIX_EPOCH}\r\n" + ))); +} + #[test] fn operator_state_endpoint_livez_ignores_poisoned_snapshot_lock() { let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");