Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion apps/decodex/src/orchestrator/operator_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,14 @@ fn handle_operator_state_endpoint_connection(
.lock()
.map_err(|error| eyre::eyre!("Operator state snapshot lock poisoned: {error}"))?
.clone();
let snapshot_json = published_snapshot
.snapshot_json
.as_ref()
.map(|snapshot_json| snapshot_json_with_live_account_control(snapshot_json));

build_operator_state_http_response_for_route(
route,
published_snapshot.snapshot_json.as_deref(),
snapshot_json.as_deref(),
published_snapshot.last_publish_unix_epoch,
OperatorSnapshotReadiness::SnapshotUnavailable,
)
Expand All @@ -282,6 +286,34 @@ fn handle_operator_state_endpoint_connection(
Ok(())
}

fn snapshot_json_with_live_account_control(snapshot_json: &[u8]) -> Vec<u8> {
let Ok(mut snapshot) = serde_json::from_slice::<Value>(snapshot_json) else {
return snapshot_json.to_vec();
};
let Some(snapshot_object) = snapshot.as_object_mut() else {
return snapshot_json.to_vec();
};

if !snapshot_object.contains_key("account_control") {
return snapshot_json.to_vec();
}

let account_control = global_codex_account_control_status();

snapshot_object.insert(
String::from("account_control"),
json!({
"mode": account_control.mode,
"account_selector": account_control.account_selector,
}),
);

match serde_json::to_vec(&snapshot) {
Ok(output) => output,
Err(_) => snapshot_json.to_vec(),
}
}

fn handle_operator_dashboard_websocket_connection(
mut stream: TcpStream,
request: &[u8],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#[test]
fn control_plane_snapshot_lists_disabled_registered_projects() {
let (_temp_dir, config, _workflow) = temp_project_layout();
let (temp_dir, config, _workflow) = temp_project_layout();
let _home_guard =
TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("home should be utf-8"));
let state_store = StateStore::open_in_memory().expect("state store should open");
let registration = ProjectRegistration::from_config(
config.service_id(),
Expand Down Expand Up @@ -32,8 +34,12 @@ fn control_plane_snapshot_lists_disabled_registered_projects() {

#[test]
fn control_plane_snapshot_aggregates_top_level_lanes_for_all_registered_projects() {
let (_active_temp_dir, active_config, _active_workflow) = temp_project_layout();
let (active_temp_dir, active_config, _active_workflow) = temp_project_layout();
let (_idle_temp_dir, idle_base_config, _idle_workflow) = temp_project_layout();
let _home_guard = TestEnvVarGuard::set(
"HOME",
active_temp_dir.path().to_str().expect("home should be utf-8"),
);
let state_store = StateStore::open_in_memory().expect("state store should open");
let issue = sample_issue_with_sort_fields(
"issue-active",
Expand Down
69 changes: 69 additions & 0 deletions apps/decodex/src/orchestrator/tests/operator/status/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,75 @@ fn operator_state_endpoint_reads_complete_headers_before_parsing() {
assert!(response.ends_with("{\"status\":\"ok\"}"));
}

#[test]
fn operator_state_endpoint_overlays_live_account_control_on_published_snapshot() {
const SNAPSHOT_UNIX_EPOCH: i64 = 1_774_000_000;

let temp_dir = TempDir::new().expect("temp dir should exist");
let _home_guard =
TestEnvVarGuard::set("HOME", temp_dir.path().to_str().expect("temp path should be UTF-8"));
let stale_snapshot = serde_json::json!({
"project_id": "all",
"account_control": {
"mode": "balanced",
"account_selector": null,
},
"accounts": [],
"active_runs": [],
"recent_runs": [],
"history_lanes": [],
"queued_candidates": [],
"worktrees": [],
"post_review_lanes": [],
});

runtime::write_global_fixed_account_selector(Some("copy@example.com"))
.expect("global account selector should write");

let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
let address = listener.local_addr().expect("listener address should resolve");
let snapshot = Arc::new(Mutex::new(PublishedOperatorSnapshot {
snapshot_json: Some(serde_json::to_vec(&stale_snapshot).expect("snapshot should serialize")),
last_publish_unix_epoch: Some(SNAPSHOT_UNIX_EPOCH),
}));
let state_store = Arc::new(StateStore::open_in_memory().expect("state store should open"));
let server_snapshot = Arc::clone(&snapshot);
let server_state_store = Arc::clone(&state_store);
let server = thread::spawn(move || {
let (stream, _) = listener.accept().expect("listener should accept a connection");
let dashboard_events = DashboardEventHub::default();

orchestrator::handle_operator_state_endpoint_connection(
stream,
&server_snapshot,
&dashboard_events,
&server_state_store,
Duration::from_secs(30),
)
.expect("handler should serve state");
});
let mut client = TcpStream::connect(address).expect("client should connect");
let mut response = String::new();

client
.write_all(b"GET /state HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
.expect("client should write state request");
client.shutdown(Shutdown::Write).expect("client should close the request body stream");
client.read_to_string(&mut response).expect("client should read response");
server.join().expect("server thread should complete");

let body = response.split_once("\r\n\r\n").expect("response should contain a body").1;
let served_snapshot: Value = serde_json::from_str(body).expect("body should be valid json");

assert!(response.starts_with("HTTP/1.1 200 OK\r\n"));
assert_eq!(served_snapshot["account_control"]["mode"], "fixed");
assert_eq!(served_snapshot["account_control"]["account_selector"], "copy@example.com");
assert_eq!(served_snapshot["project_id"], "all");
assert!(response.contains(&format!(
"X-Decodex-Snapshot-Unix-Epoch: {SNAPSHOT_UNIX_EPOCH}\r\n"
)));
}

#[test]
fn operator_state_endpoint_livez_ignores_poisoned_snapshot_lock() {
let listener = TcpListener::bind("127.0.0.1:0").expect("listener should bind");
Expand Down