diff --git a/apps/decodex/src/agent/app_server.rs b/apps/decodex/src/agent/app_server.rs index aa42fa58..13c666d1 100644 --- a/apps/decodex/src/agent/app_server.rs +++ b/apps/decodex/src/agent/app_server.rs @@ -3062,15 +3062,22 @@ fn wait_for_turn_completion( Some(target_turn_id), ), )?, - JsonRpcMessage::Response(_) | JsonRpcMessage::Error(_) => { - eyre::bail!( - "Received an unexpected JSON-RPC response while waiting for turn completion." - ); - }, + JsonRpcMessage::Response(_) => ignore_orphan_turn_json_rpc_response(), + JsonRpcMessage::Error(_) => reject_unexpected_turn_json_rpc_error()?, } } } +fn ignore_orphan_turn_json_rpc_response() { + tracing::debug!( + "Recorded and ignored orphan app-server JSON-RPC response while waiting for turn completion." + ); +} + +fn reject_unexpected_turn_json_rpc_error() -> crate::prelude::Result<()> { + eyre::bail!("Received an unexpected JSON-RPC error while waiting for turn completion."); +} + fn turn_wait_timeout_error( target_thread_id: &str, target_turn_id: &str, diff --git a/apps/decodex/src/agent/app_server/protocol.rs b/apps/decodex/src/agent/app_server/protocol.rs index 82ac4079..28d34124 100644 --- a/apps/decodex/src/agent/app_server/protocol.rs +++ b/apps/decodex/src/agent/app_server/protocol.rs @@ -4,23 +4,23 @@ use std::{ time::Duration, }; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize, de::Error}; use serde_json::Value; -use crate::{ - agent::{ - app_server::REQUEST_TIMEOUT, - json_rpc::{AppServerProcessEnv, JsonRpcConnection, JsonRpcRequest, WireMessage}, - tracker_tool_bridge::{DynamicToolCallResponse, DynamicToolHandler, DynamicToolSpec}, - }, - prelude::Result, +use crate::agent::{ + app_server::REQUEST_TIMEOUT, + json_rpc::{AppServerProcessEnv, JsonRpcConnection, JsonRpcRequest, WireMessage}, + tracker_tool_bridge::{DynamicToolCallResponse, DynamicToolHandler, DynamicToolSpec}, }; pub(super) struct AppServerClient { pub(super) connection: JsonRpcConnection, } impl AppServerClient { - pub(super) fn spawn(listen: &str, process_env: &AppServerProcessEnv) -> Result { + pub(super) fn spawn( + listen: &str, + process_env: &AppServerProcessEnv, + ) -> crate::prelude::Result { Ok(Self { connection: JsonRpcConnection::spawn_app_server(listen, process_env)? }) } @@ -28,7 +28,7 @@ impl AppServerClient { pub(super) fn initialize( &mut self, enable_experimental_api: bool, - ) -> Result { + ) -> crate::prelude::Result { self.initialize_with_handler(enable_experimental_api, |_connection, _message, request| { color_eyre::eyre::bail!( "Unexpected inbound JSON-RPC request `{}` while waiting for `initialize`.", @@ -41,9 +41,13 @@ impl AppServerClient { &mut self, enable_experimental_api: bool, handler: H, - ) -> Result + ) -> crate::prelude::Result where - H: FnMut(&mut JsonRpcConnection, &WireMessage, &JsonRpcRequest) -> Result<()>, + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, { self.connection.request_with_handler( "initialize", @@ -62,7 +66,7 @@ impl AppServerClient { ) } - pub(super) fn mark_initialized(&mut self) -> Result<()> { + pub(super) fn mark_initialized(&mut self) -> crate::prelude::Result<()> { self.connection.notify::("initialized", None) } @@ -70,9 +74,13 @@ impl AppServerClient { &mut self, params: LoginAccountParams, handler: H, - ) -> Result + ) -> crate::prelude::Result where - H: FnMut(&mut JsonRpcConnection, &WireMessage, &JsonRpcRequest) -> Result<()>, + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, { self.connection.request_with_handler( "account/login/start", @@ -86,7 +94,7 @@ impl AppServerClient { pub(super) fn start_thread( &mut self, params: ThreadStartRequest, - ) -> Result { + ) -> crate::prelude::Result { self.start_thread_with_handler(params, |_connection, _message, request| { color_eyre::eyre::bail!( "Unexpected inbound JSON-RPC request `{}` while waiting for `thread/start`.", @@ -99,9 +107,13 @@ impl AppServerClient { &mut self, params: ThreadStartRequest, handler: H, - ) -> Result + ) -> crate::prelude::Result where - H: FnMut(&mut JsonRpcConnection, &WireMessage, &JsonRpcRequest) -> Result<()>, + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, { self.connection.request_with_handler("thread/start", ¶ms, REQUEST_TIMEOUT, handler) } @@ -110,7 +122,7 @@ impl AppServerClient { pub(super) fn resume_thread( &mut self, params: ThreadResumeRequest, - ) -> Result { + ) -> crate::prelude::Result { self.resume_thread_with_handler(params, |_connection, _message, request| { color_eyre::eyre::bail!( "Unexpected inbound JSON-RPC request `{}` while waiting for `thread/resume`.", @@ -123,9 +135,13 @@ impl AppServerClient { &mut self, params: ThreadResumeRequest, handler: H, - ) -> Result + ) -> crate::prelude::Result where - H: FnMut(&mut JsonRpcConnection, &WireMessage, &JsonRpcRequest) -> Result<()>, + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, { self.connection.request_with_handler("thread/resume", ¶ms, REQUEST_TIMEOUT, handler) } @@ -133,12 +149,15 @@ impl AppServerClient { pub(super) fn archive_thread( &mut self, params: ThreadArchiveRequest, - ) -> Result { + ) -> crate::prelude::Result { self.connection.request("thread/archive", ¶ms, REQUEST_TIMEOUT) } #[allow(dead_code)] - pub(super) fn start_turn(&mut self, params: TurnStartRequest) -> Result { + pub(super) fn start_turn( + &mut self, + params: TurnStartRequest, + ) -> crate::prelude::Result { self.start_turn_with_handler(params, |_connection, _message, request| { color_eyre::eyre::bail!( "Unexpected inbound JSON-RPC request `{}` while waiting for `turn/start`.", @@ -151,9 +170,13 @@ impl AppServerClient { &mut self, params: TurnStartRequest, handler: H, - ) -> Result + ) -> crate::prelude::Result where - H: FnMut(&mut JsonRpcConnection, &WireMessage, &JsonRpcRequest) -> Result<()>, + H: FnMut( + &mut JsonRpcConnection, + &WireMessage, + &JsonRpcRequest, + ) -> crate::prelude::Result<()>, { self.connection.request_with_handler("turn/start", ¶ms, REQUEST_TIMEOUT, handler) } @@ -161,21 +184,27 @@ impl AppServerClient { pub(super) fn command_exec( &mut self, params: &CommandExecParams, - ) -> Result { + ) -> crate::prelude::Result { self.connection.request("command/exec", params, params.request_timeout()) } - pub(super) fn read_config(&mut self, params: &ConfigReadParams) -> Result { + pub(super) fn read_config( + &mut self, + params: &ConfigReadParams, + ) -> crate::prelude::Result { self.connection.request("config/read", params, REQUEST_TIMEOUT) } - pub(super) fn list_models(&mut self, params: &ModelListParams) -> Result { + pub(super) fn list_models( + &mut self, + params: &ModelListParams, + ) -> crate::prelude::Result { self.connection.request("model/list", params, REQUEST_TIMEOUT) } pub(super) fn read_model_provider_capabilities( &mut self, - ) -> Result { + ) -> crate::prelude::Result { self.connection.request( "modelProvider/capabilities/read", &ModelProviderCapabilitiesReadParams {}, @@ -183,11 +212,17 @@ impl AppServerClient { ) } - pub(super) fn list_skills(&mut self, params: &SkillsListParams) -> Result { + pub(super) fn list_skills( + &mut self, + params: &SkillsListParams, + ) -> crate::prelude::Result { self.connection.request("skills/list", params, REQUEST_TIMEOUT) } - pub(super) fn list_plugins(&mut self, params: &PluginListParams) -> Result { + pub(super) fn list_plugins( + &mut self, + params: &PluginListParams, + ) -> crate::prelude::Result { self.connection.request("plugin/list", params, REQUEST_TIMEOUT) } @@ -195,16 +230,19 @@ impl AppServerClient { &mut self, params: &ListMcpServerStatusParams, timeout: Duration, - ) -> Result { + ) -> crate::prelude::Result { self.connection.request("mcpServerStatus/list", params, timeout) } - pub(super) fn recv(&mut self, timeout: Option) -> Result { + pub(super) fn recv( + &mut self, + timeout: Option, + ) -> crate::prelude::Result { self.connection.recv(timeout) } #[allow(dead_code)] - pub(super) fn respond(&mut self, id: &Value, result: &R) -> Result<()> + pub(super) fn respond(&mut self, id: &Value, result: &R) -> crate::prelude::Result<()> where R: Serialize, { @@ -212,7 +250,12 @@ impl AppServerClient { } #[allow(dead_code)] - pub(super) fn respond_error(&mut self, id: &Value, code: i64, message: &str) -> Result<()> { + pub(super) fn respond_error( + &mut self, + id: &Value, + code: i64, + message: &str, + ) -> crate::prelude::Result<()> { self.connection.respond_error(id, code, message) } @@ -596,15 +639,36 @@ pub(super) struct TurnStatusPayload { pub(super) error: Option, } -#[derive(Debug, Deserialize)] +#[derive(Debug)] pub(super) struct TurnError { pub(super) message: String, - #[serde(rename = "codexErrorInfo")] pub(super) codex_error_info: Option, #[allow(dead_code)] - #[serde(rename = "additionalDetails")] pub(super) additional_details: Option, } +impl<'de> Deserialize<'de> for TurnError { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer)?; + let entries = value + .as_object() + .ok_or_else(|| Error::custom("expected app-server turn error object"))?; + let message = entries + .get("message") + .and_then(string_like_json_value) + .ok_or_else(|| Error::custom("expected app-server turn error message"))?; + let codex_error_info = entries + .get("codexErrorInfo") + .or_else(|| entries.get("codex_error_info")) + .and_then(string_like_json_value); + let additional_details = + entries.get("additionalDetails").or_else(|| entries.get("additional_details")).cloned(); + + Ok(Self { message, codex_error_info, additional_details }) + } +} #[derive(Debug, Eq, PartialEq, Deserialize)] #[serde(rename_all = "camelCase")] @@ -661,16 +725,40 @@ pub(super) struct TurnCompletedNotification { pub(super) turn: TurnStatusPayload, } -#[derive(Debug, Deserialize)] +#[derive(Debug)] pub(super) struct ErrorNotification { pub(super) error: TurnError, - #[serde(rename = "willRetry")] pub(super) will_retry: Option, - #[serde(rename = "threadId")] pub(super) thread_id: Option, - #[serde(rename = "turnId")] pub(super) turn_id: Option, } +impl<'de> Deserialize<'de> for ErrorNotification { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let value = Value::deserialize(deserializer)?; + let entries = value + .as_object() + .ok_or_else(|| Error::custom("expected app-server error notification object"))?; + let error_value = entries + .get("error") + .ok_or_else(|| Error::custom("expected app-server error notification error"))?; + let error = TurnError::deserialize(error_value.clone()).map_err(Error::custom)?; + let will_retry = + entries.get("willRetry").or_else(|| entries.get("will_retry")).and_then(Value::as_bool); + let thread_id = entries + .get("threadId") + .or_else(|| entries.get("thread_id")) + .and_then(string_like_json_value); + let turn_id = entries + .get("turnId") + .or_else(|| entries.get("turn_id")) + .and_then(string_like_json_value); + + Ok(Self { error, will_retry, thread_id, turn_id }) + } +} #[derive(Debug, Deserialize)] pub(super) struct DynamicToolCallParams { @@ -795,6 +883,25 @@ pub(super) enum UserInput { Text { text: String }, } +fn string_like_json_value(value: &Value) -> Option { + match value { + Value::String(text) if !text.is_empty() => Some(text.clone()), + Value::Number(number) => Some(number.to_string()), + Value::Bool(value) => Some(value.to_string()), + Value::Object(entries) => + ["message", "text", "id", "codexErrorInfo", "type", "kind", "code", "reason", "name"] + .iter() + .find_map(|key| entries.get(*key).and_then(string_like_json_value)) + .or_else(|| { + (entries.len() == 1) + .then(|| entries.values().next().and_then(string_like_json_value)) + .flatten() + }), + Value::Array(items) => items.iter().find_map(string_like_json_value), + _ => None, + } +} + fn externally_tagged_value_name(value: &Value) -> Option { match value { Value::String(value) if !value.is_empty() => Some(value.clone()), @@ -833,6 +940,31 @@ mod tests { assert_eq!(notification.will_retry, None); } + #[test] + fn error_notifications_accept_structured_string_fields() { + let notification: super::ErrorNotification = serde_json::from_value(serde_json::json!({ + "error": { + "message": { + "type": "streamDisconnected", + "message": "stream disconnected" + }, + "codexErrorInfo": { + "type": "transientNetworkError" + } + }, + "threadId": { "id": "thread-1" }, + "turnId": { "id": "turn-1" }, + "willRetry": true + })) + .expect("structured error notification should parse"); + + assert_eq!(notification.error.message, "stream disconnected"); + assert_eq!(notification.error.codex_error_info.as_deref(), Some("transientNetworkError")); + assert_eq!(notification.thread_id.as_deref(), Some("thread-1")); + assert_eq!(notification.turn_id.as_deref(), Some("turn-1")); + assert_eq!(notification.will_retry, Some(true)); + } + #[test] fn chatgpt_auth_tokens_login_uses_app_server_protocol_shape() { let value = serde_json::to_value(super::LoginAccountParams::ChatgptAuthTokens { diff --git a/apps/decodex/src/agent/app_server/tests.rs b/apps/decodex/src/agent/app_server/tests.rs index 0819ff79..a4cf273e 100644 --- a/apps/decodex/src/agent/app_server/tests.rs +++ b/apps/decodex/src/agent/app_server/tests.rs @@ -405,6 +405,166 @@ fn command_exec_health_check_uses_bounded_standalone_request() { assert!(value.get("permissionProfile").is_none()); } +#[test] +fn turn_completion_ignores_orphan_json_rpc_response() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let worktree_path = temp_dir.path().join("worktree"); + let marker_path = temp_dir.path().join("activity"); + let fake_bin_dir = install_fake_codex_script(&temp_dir, orphan_response_fake_codex_script()); + let path_env = env::var("PATH").unwrap_or_default(); + let _path_guard = + TestEnvVarGuard::set("PATH", &format!("{}:{path_env}", fake_bin_dir.display())); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let mut request = minimal_run_request(); + + fs::create_dir_all(&worktree_path).expect("worktree directory should create"); + + request.run_id = String::from("orphan-response-run"); + request.issue_id = String::from("orphan-response-issue"); + request.cwd = worktree_path.display().to_string(); + request.timeout = Duration::from_secs(5); + request.activity_marker_path = Some(marker_path.clone()); + + let result = super::execute_app_server_run(&request, &state_store) + .expect("orphan response during turn wait should not fail the run"); + + assert_eq!(result.thread_id, "thread-1"); + assert_eq!(result.turn_id, "turn-1"); + assert_eq!(result.final_output, "ORPHAN_OK"); + + let marker = state::read_run_activity_marker_snapshot(&marker_path) + .expect("marker snapshot should load") + .expect("marker snapshot should exist"); + let protocol_activity = + marker.protocol_activity().expect("protocol activity should be captured"); + + assert!(state_store.event_count(&request.run_id).expect("event count should load") > 0); + assert!( + protocol_activity.recent_events.iter().any(|event| event.event_type == "json-rpc/response") + ); + assert_eq!(marker.last_event_type(), Some("turn/completed")); +} + +fn install_fake_codex_script(temp_dir: &TempDir, script: &str) -> PathBuf { + let fake_bin_dir = temp_dir.path().join("fake-bin"); + let fake_codex_path = fake_bin_dir.join("codex"); + + fs::create_dir_all(&fake_bin_dir).expect("fake bin directory should create"); + fs::write(&fake_codex_path, script).expect("fake codex script should write"); + + let mut permissions = + fs::metadata(&fake_codex_path).expect("fake codex metadata should read").permissions(); + + #[cfg(unix)] + PermissionsExt::set_mode(&mut permissions, 0o755); + fs::set_permissions(&fake_codex_path, permissions) + .expect("fake codex script should be executable"); + + fake_bin_dir +} + +fn orphan_response_fake_codex_script() -> &'static str { + r#"#!/usr/bin/env python3 +import json +import os +import sys + +def send(value): + print(json.dumps(value), flush=True) + +for line in sys.stdin: + message = json.loads(line) + method = message.get("method") + message_id = message.get("id") + params = message.get("params") or {} + + def reply(result): + send({"id": message_id, "result": result}) + + if method == "initialize": + reply({ + "userAgent": "fake-codex", + "codexHome": os.environ["CODEX_HOME"], + "platformFamily": "unix", + "platformOs": "macos" + }) + elif method == "initialized": + continue + elif method == "config/read": + reply({"config": { + "model": "gpt-5.5", + "model_provider": "openai", + "approval_policy": {"type": "never"}, + "sandbox_mode": {"type": "dangerFullAccess"} + }}) + elif method == "model/list": + reply({"data": [{ + "id": "gpt-5.5", + "model": "gpt-5.5", + "displayName": "GPT-5.5", + "isDefault": True, + "hidden": False + }], "nextCursor": None}) + elif method == "modelProvider/capabilities/read": + reply({"imageGeneration": True, "namespaceTools": True, "webSearch": True}) + elif method == "skills/list": + cwd = params.get("cwds", [""])[0] + reply({"data": [{"cwd": cwd, "errors": [], "skills": [{ + "enabled": True, + "name": "fake-skill", + "scope": "user" + }]}]}) + elif method == "plugin/list": + reply({"marketplaces": [{"name": "fake", "plugins": [{ + "enabled": True, + "id": "fake-plugin", + "installed": True, + "name": "Fake Plugin" + }]}], "marketplaceLoadErrors": []}) + elif method == "mcpServerStatus/list": + reply({"data": [], "nextCursor": None}) + elif method == "thread/start": + cwd = params.get("cwd") + reply({ + "thread": {"id": "thread-1"}, + "model": "gpt-5.5", + "modelProvider": "openai", + "serviceTier": None, + "cwd": cwd, + "instructionSources": [], + "approvalPolicy": {"type": "never"}, + "approvalsReviewer": "user", + "sandbox": {"type": "dangerFullAccess"}, + "reasoningEffort": None + }) + elif method == "turn/start": + reply({"turn": {"id": "turn-1", "status": "running", "error": None}}) + send({"method": "thread/status/changed", "params": { + "threadId": "thread-1", + "status": {"type": "active", "activeFlags": []} + }}) + send({"method": "turn/started", "params": { + "threadId": "thread-1", + "turn": {"id": "turn-1", "status": "running", "error": None} + }}) + send({"id": 999, "result": {"late": True}}) + send({"method": "item/completed", "params": { + "threadId": "thread-1", + "turnId": "turn-1", + "item": {"type": "agentMessage", "text": "ORPHAN_OK"} + }}) + send({"method": "turn/completed", "params": { + "threadId": "thread-1", + "turn": {"id": "turn-1", "status": "completed", "error": None} + }}) + else: + send({"id": message_id, "error": { + "code": -32601, + "message": "unexpected method " + str(method) + }}) +"# +} + #[test] fn archive_thread_after_success_calls_app_server_archive_and_records_event() { let temp_dir = TempDir::new().expect("tempdir should create"); diff --git a/apps/decodex/src/agent/json_rpc.rs b/apps/decodex/src/agent/json_rpc.rs index 525e8cb6..c311c329 100644 --- a/apps/decodex/src/agent/json_rpc.rs +++ b/apps/decodex/src/agent/json_rpc.rs @@ -378,9 +378,20 @@ impl JsonRpcConnection { )); }, JsonRpcMessage::Request(request) => handle_request(self, &wire_message, request)?, - JsonRpcMessage::Response(_) | JsonRpcMessage::Error(_) => { + JsonRpcMessage::Response(response) => { + tracing::debug!( + method, + response_id = %response.id, + expected_id = %expected_id, + "Recorded and ignored orphan app-server JSON-RPC response while waiting for request." + ); + }, + JsonRpcMessage::Error(error) => { return Err(eyre::eyre!( - "Received an unexpected JSON-RPC response while waiting for `{method}`." + "Received an unexpected JSON-RPC error while waiting for `{method}`: id {} failed with {}: {}", + error.id, + error.error.code, + error.error.message )); }, } @@ -688,10 +699,9 @@ mod tests { path::PathBuf, process::{Command, Stdio}, sync::{Arc, Mutex, mpsc}, + time::Duration, }; - use serde_json::json; - use crate::agent::json_rpc::{ AppServerHomePreflightFailure, AppServerOutputTimeout, AppServerProcessEnv, AppServerTransportFailure, JsonRpcConnection, JsonRpcMessage, @@ -708,7 +718,7 @@ mod tests { match message.message { JsonRpcMessage::Notification(notification) => { assert_eq!(notification.method, "thread/status/changed"); - assert_eq!(notification.params["threadId"], json!("thread-1")); + assert_eq!(notification.params["threadId"], serde_json::json!("thread-1")); }, other => panic!("unexpected message: {other:?}"), } @@ -722,13 +732,31 @@ mod tests { match message.message { JsonRpcMessage::Response(response) => { - assert_eq!(response.id, json!(1)); - assert_eq!(response.result["userAgent"], json!("decodex-test")); + assert_eq!(response.id, serde_json::json!(1)); + assert_eq!(response.result["userAgent"], serde_json::json!("decodex-test")); }, other => panic!("unexpected message: {other:?}"), } } + #[test] + fn request_wait_ignores_orphan_response_before_expected_response() { + let mut connection = test_connection_with_messages([ + r#"{"id":99,"result":{"late":true}}"#, + r#"{"id":1,"result":{"ok":true}}"#, + ]); + let response: serde_json::Value = connection + .request_with_handler( + "thread/start", + &serde_json::json!({}), + Duration::from_secs(1), + |_, _, _| Ok(()), + ) + .expect("orphan response should not fail the pending request"); + + assert_eq!(response, serde_json::json!({"ok": true})); + } + #[test] fn app_server_command_inherits_noninteractive_git_environment() { let process_env = AppServerProcessEnv::with_github_credentials( @@ -936,4 +964,29 @@ mod tests { assert!(error.downcast_ref::().is_some()); } + + fn test_connection_with_messages(messages: [&str; N]) -> JsonRpcConnection { + let mut child = Command::new("sh") + .args(["-c", "cat >/dev/null"]) + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .expect("child process should spawn"); + let stdin = child.stdin.take().expect("child stdin should be captured"); + let (stdout_tx, stdout_rx) = mpsc::channel(); + + for message in messages { + stdout_tx.send(message.to_owned()).expect("test message should send"); + } + + JsonRpcConnection { + child, + stdin, + stdout_rx, + stderr_tail: Arc::new(Mutex::new(VecDeque::new())), + pending_messages: VecDeque::new(), + next_request_id: 1, + } + } }