From c14135eae1bd361a149c3c335d96788f9c8a5bf5 Mon Sep 17 00:00:00 2001 From: Clifford Ressel Date: Fri, 29 May 2026 16:07:27 -0400 Subject: [PATCH 1/4] refactor(goal): replace hand-rolled MCP stack with rmcp; rename nori-goal -> nori-client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the hand-rolled HTTP/1.1 server (thread_goal_http_mcp.rs, ~213 LoC) and hand-dispatched JSON-RPC (thread_goal_mcp.rs) with rmcp's spec-compliant StreamableHttpService and typed #[tool] handlers in nori_client_mcp.rs, served over a loopback axum listener. Rename the advertised MCP server from `nori-goal` to `nori-client` — Nori's general harness-side channel to the agent, not a goal-only surface. --- nori-rs/Cargo.lock | 1 + nori-rs/acp/Cargo.toml | 10 + nori-rs/acp/src/backend/mod.rs | 9 +- nori-rs/acp/src/backend/nori_client_mcp.rs | 494 ++++++++++++++++ nori-rs/acp/src/backend/session.rs | 6 +- .../acp/src/backend/session_runtime_driver.rs | 9 +- nori-rs/acp/src/backend/spawn_and_relay.rs | 2 +- nori-rs/acp/src/backend/tests/part5.rs | 33 +- .../acp/src/backend/thread_goal_http_mcp.rs | 213 ------- nori-rs/acp/src/backend/thread_goal_mcp.rs | 528 ------------------ 10 files changed, 540 insertions(+), 765 deletions(-) create mode 100644 nori-rs/acp/src/backend/nori_client_mcp.rs delete mode 100644 nori-rs/acp/src/backend/thread_goal_http_mcp.rs delete mode 100644 nori-rs/acp/src/backend/thread_goal_mcp.rs diff --git a/nori-rs/Cargo.lock b/nori-rs/Cargo.lock index d68cb9c19..770ed7667 100644 --- a/nori-rs/Cargo.lock +++ b/nori-rs/Cargo.lock @@ -3626,6 +3626,7 @@ version = "0.0.0" dependencies = [ "agent-client-protocol-schema 0.11.6", "anyhow", + "axum", "base64", "chrono", "codex-core", diff --git a/nori-rs/acp/Cargo.toml b/nori-rs/acp/Cargo.toml index 26a80f797..c33e69874 100644 --- a/nori-rs/acp/Cargo.toml +++ b/nori-rs/acp/Cargo.toml @@ -14,6 +14,16 @@ workspace = true [dependencies] agent-client-protocol-schema = { workspace = true, features = ["unstable"] } anyhow = { workspace = true } +axum = { workspace = true, default-features = false, features = [ + "http1", + "tokio", +] } +rmcp = { workspace = true, features = [ + "macros", + "schemars", + "server", + "transport-streamable-http-server", +] } sacp = { workspace = true } base64 = { workspace = true } codex-core = { workspace = true } diff --git a/nori-rs/acp/src/backend/mod.rs b/nori-rs/acp/src/backend/mod.rs index 6bf3766e1..5705180d5 100644 --- a/nori-rs/acp/src/backend/mod.rs +++ b/nori-rs/acp/src/backend/mod.rs @@ -290,10 +290,10 @@ pub struct AcpBackend { /// Persistent goal for this ACP session. thread_goal_state: Arc>, /// True after the active ACP agent has successfully opened the backend-owned - /// `nori-goal` MCP endpoint. + /// `nori-client` MCP endpoint. goal_mcp_connected: Arc, - /// Loopback HTTP server exposing the backend-owned `nori-goal` MCP tools. - goal_mcp_http_server: Arc>>, + /// Loopback HTTP server exposing the backend-owned `nori-client` MCP tools. + goal_mcp_http_server: Arc>>, /// Transcript recorder cell used by local MCP tools created before the /// recorder's session ID is known. transcript_recorder_cell: Arc>>>, @@ -348,14 +348,13 @@ pub struct AcpBackend { } mod helpers; +mod nori_client_mcp; mod session; pub(crate) mod session_reducer; mod session_runtime_driver; mod spawn_and_relay; mod submit_and_ops; mod thread_goal; -mod thread_goal_http_mcp; -mod thread_goal_mcp; mod user_input; mod user_shell; use helpers::get_op_name; diff --git a/nori-rs/acp/src/backend/nori_client_mcp.rs b/nori-rs/acp/src/backend/nori_client_mcp.rs new file mode 100644 index 000000000..5deafb965 --- /dev/null +++ b/nori-rs/acp/src/backend/nori_client_mcp.rs @@ -0,0 +1,494 @@ +//! The backend-owned `nori-client` MCP server. +//! +//! This is Nori's harness-side channel to the external ACP agent: the single +//! loopback MCP server through which Nori exposes harness-specific tooling that +//! the ACP protocol itself does not (yet) provide. Today it carries the goal +//! tools (`get_goal`/`create_goal`/`update_goal`); it is named `nori-client` +//! rather than `nori-goal` because it is the general point of contact, not a +//! goal-only surface. +//! +//! The transport is rmcp's spec-compliant streamable-HTTP server, served on a +//! loopback port. The tools are typed `#[tool]` handlers — there is no +//! hand-rolled HTTP or JSON-RPC framing here. + +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use anyhow::Context as _; +use anyhow::Result; +use rmcp::ErrorData as McpError; +use rmcp::RoleServer; +use rmcp::ServerHandler; +use rmcp::handler::server::router::tool::ToolRouter; +use rmcp::handler::server::wrapper::Parameters; +use rmcp::model::CallToolResult; +use rmcp::model::Content; +use rmcp::model::Implementation; +use rmcp::model::InitializeRequestParam; +use rmcp::model::InitializeResult; +use rmcp::model::ProtocolVersion; +use rmcp::model::ServerCapabilities; +use rmcp::model::ServerInfo; +use rmcp::schemars; +use rmcp::service::RequestContext; +use rmcp::tool; +use rmcp::tool_handler; +use rmcp::tool_router; +use rmcp::transport::streamable_http_server::StreamableHttpService; +use rmcp::transport::streamable_http_server::session::local::LocalSessionManager; +use rmcp::transport::streamable_http_server::tower::StreamableHttpServerConfig; +use serde::Deserialize; +use tokio::net::TcpListener; +use tokio::sync::Mutex; + +use super::*; +use codex_protocol::protocol::ThreadGoalStatus; +use nori_protocol::ThreadGoalUpdated; + +const DUPLICATE_CREATE_GOAL_ERROR: &str = "cannot create a new goal because this thread already has a goal; use update_goal only when the existing goal is complete"; + +#[derive(Deserialize, schemars::JsonSchema)] +#[schemars(crate = "rmcp::schemars")] +#[serde(deny_unknown_fields)] +struct CreateGoalRequest { + /// Required. The concrete objective to start pursuing. This starts a new + /// active goal only when no goal is currently defined; if a goal already + /// exists, this tool fails. + objective: String, +} + +#[derive(Deserialize, schemars::JsonSchema)] +#[schemars(crate = "rmcp::schemars")] +#[serde(deny_unknown_fields)] +struct UpdateGoalRequest { + /// Required. Set to `complete` only when the objective is achieved and no + /// required work remains. Set to `blocked` only after the same blocking + /// condition has repeated and the agent is at an impasse. + status: GoalCompletion, +} + +/// The only goal transitions the agent may drive through `update_goal`. Other +/// statuses (active/paused/usage_limited/budget_limited) are owned by the user +/// or the backend, so they are intentionally not representable here. +#[derive(Deserialize, schemars::JsonSchema)] +#[schemars(crate = "rmcp::schemars")] +#[serde(rename_all = "lowercase")] +enum GoalCompletion { + Complete, + Blocked, +} + +impl From for ThreadGoalStatus { + fn from(value: GoalCompletion) -> Self { + match value { + GoalCompletion::Complete => ThreadGoalStatus::Complete, + GoalCompletion::Blocked => ThreadGoalStatus::Blocked, + } + } +} + +/// The Arcs the `nori-client` tools operate on, shared across every request the +/// streamable-HTTP transport spawns. Cloned (cheaply) into each per-session +/// [`NoriClientService`] by the service factory. +#[derive(Clone)] +pub(crate) struct NoriClientShared { + thread_goal_state: Arc>, + backend_event_tx: mpsc::Sender, + transcript_recorder: Arc>>>, + connected: Arc, +} + +impl NoriClientShared { + pub(crate) fn new( + thread_goal_state: Arc>, + backend_event_tx: mpsc::Sender, + transcript_recorder: Arc>>>, + connected: Arc, + ) -> Self { + Self { + thread_goal_state, + backend_event_tx, + transcript_recorder, + connected, + } + } +} + +/// One rmcp server handler instance. The transport constructs a fresh service +/// per session via the factory; all instances share the same [`NoriClientShared`] +/// Arcs, so goal state and the connected flag are global to the backend. +#[derive(Clone)] +pub(crate) struct NoriClientService { + shared: NoriClientShared, + tool_router: ToolRouter, +} + +#[tool_router] +impl NoriClientService { + fn new(shared: NoriClientShared) -> Self { + Self { + shared, + tool_router: Self::tool_router(), + } + } + + #[tool( + description = "Get the current goal for this thread, including status, token and elapsed-time usage." + )] + async fn get_goal(&self) -> Result { + let state = self.shared.thread_goal_state.lock().await; + let goal = state.snapshot(thread_goal::now_seconds()).map(goal_json); + Ok(tool_success(serde_json::json!({ "goal": goal }))) + } + + #[tool( + description = "Create a goal only when explicitly requested by the user or system/developer instructions; do not infer goals from ordinary tasks. Fails if a goal exists; use update_goal only for status." + )] + async fn create_goal( + &self, + Parameters(CreateGoalRequest { objective }): Parameters, + ) -> Result { + let objective = objective.trim().to_string(); + let result = { + let now = thread_goal::now_seconds(); + let mut state = self.shared.thread_goal_state.lock().await; + if state.snapshot(now).is_some() { + return Ok(tool_error(DUPLICATE_CREATE_GOAL_ERROR)); + } + state.set_objective(objective, Some(ThreadGoalStatus::Active), now) + }; + Ok(self.respond_with_goal(result).await) + } + + #[tool( + description = "Update the existing goal. Use this tool only to mark the goal achieved or genuinely blocked. Pause, resume, budget-limited, and usage-limited transitions are controlled by the user or system, not this tool." + )] + async fn update_goal( + &self, + Parameters(UpdateGoalRequest { status }): Parameters, + ) -> Result { + let result = { + let now = thread_goal::now_seconds(); + let mut state = self.shared.thread_goal_state.lock().await; + state.set_status(status.into(), now) + }; + Ok(self.respond_with_goal(result).await) + } + + /// Emit a `ThreadGoalUpdated` client event and render the tool reply for a + /// state mutation, or surface the validation error as a tool error. + async fn respond_with_goal( + &self, + result: Result, + ) -> CallToolResult { + match result { + Ok(goal) => { + let recorder = self.shared.transcript_recorder.lock().await.clone(); + emit_client_event( + &self.shared.backend_event_tx, + recorder.as_ref(), + ClientEvent::ThreadGoalUpdated(ThreadGoalUpdated { + goal: goal.clone().into_client_goal(), + }), + ) + .await; + tool_success(serde_json::json!({ "goal": goal_json(goal) })) + } + Err(err) => tool_error(err), + } + } +} + +#[tool_handler] +impl ServerHandler for NoriClientService { + fn get_info(&self) -> ServerInfo { + ServerInfo { + protocol_version: ProtocolVersion::V_2024_11_05, + capabilities: ServerCapabilities::builder().enable_tools().build(), + server_info: Implementation { + name: "nori-client".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + ..Default::default() + }, + instructions: None, + } + } + + /// Mark the agent connected on initialize. This flag is the sole gate that + /// lets hidden goal continuations chain (see the safety invariant in + /// `session_runtime_driver.rs`): an agent that never opens this server can + /// never reach `update_goal`, so it must not be allowed to self-chain. + async fn initialize( + &self, + request: InitializeRequestParam, + context: RequestContext, + ) -> Result { + self.shared.connected.store(true, Ordering::Relaxed); + if context.peer.peer_info().is_none() { + context.peer.set_peer_info(request); + } + Ok(self.get_info()) + } +} + +fn tool_success(body: serde_json::Value) -> CallToolResult { + CallToolResult::success(vec![Content::text(body.to_string())]) +} + +fn tool_error(message: impl Into) -> CallToolResult { + CallToolResult::error(vec![Content::text(message.into())]) +} + +fn goal_json(goal: thread_goal::ThreadGoalSnapshot) -> serde_json::Value { + serde_json::json!({ + "objective": goal.objective, + "status": status_label(goal.status), + "tokens_used": goal.tokens_used, + "token_budget": null, + "tokens_remaining": null, + "time_used_seconds": goal.time_used_seconds, + "created_at": goal.created_at, + "updated_at": goal.updated_at, + }) +} + +fn status_label(status: ThreadGoalStatus) -> &'static str { + match status { + ThreadGoalStatus::Active => "active", + ThreadGoalStatus::Paused => "paused", + ThreadGoalStatus::Blocked => "blocked", + ThreadGoalStatus::UsageLimited => "usage_limited", + ThreadGoalStatus::BudgetLimited => "budget_limited", + ThreadGoalStatus::Complete => "complete", + } +} + +/// Loopback `nori-client` MCP server. Bound on first use and torn down on drop. +pub(crate) struct NoriClientServer { + url: String, + abort_handle: tokio::task::AbortHandle, +} + +impl NoriClientServer { + pub(crate) async fn spawn(shared: NoriClientShared) -> Result { + let listener = TcpListener::bind(("127.0.0.1", 0)) + .await + .context("failed to bind local nori-client MCP server")?; + let url = format!("http://{}/mcp", listener.local_addr()?); + + let service = StreamableHttpService::new( + move || Ok(NoriClientService::new(shared.clone())), + Arc::new(LocalSessionManager::default()), + StreamableHttpServerConfig { + stateful_mode: false, + sse_keep_alive: None, + ..Default::default() + }, + ); + let router = axum::Router::new().nest_service("/mcp", service); + let task = tokio::spawn(async move { + if let Err(err) = axum::serve(listener, router).await { + tracing::debug!("nori-client MCP server stopped: {err}"); + } + }); + + Ok(Self { + url, + abort_handle: task.abort_handle(), + }) + } + + pub(crate) fn url(&self) -> &str { + &self.url + } +} + +impl Drop for NoriClientServer { + fn drop(&mut self) { + self.abort_handle.abort(); + } +} + +pub(super) async fn register_for_session( + connection: &SacpConnection, + mcp_servers: &mut Vec, + thread_goal_state: Arc>, + backend_event_tx: mpsc::Sender, + transcript_recorder: Arc>>>, + connected: Arc, + http_server: Arc>>, +) -> Result<()> { + connected.store(false, Ordering::Relaxed); + + if !connection.capabilities().mcp_capabilities.http { + return Ok(()); + } + + let mut server = http_server.lock().await; + if server.is_none() { + *server = Some( + NoriClientServer::spawn(NoriClientShared::new( + thread_goal_state, + backend_event_tx, + transcript_recorder, + Arc::clone(&connected), + )) + .await?, + ); + } + let Some(server) = server.as_ref() else { + return Ok(()); + }; + mcp_servers.push(acp::McpServer::Http(acp::McpServerHttp::new( + "nori-client", + server.url().to_string(), + ))); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + use serde_json::json; + + use super::*; + + fn service() -> NoriClientService { + let (backend_event_tx, _backend_event_rx) = mpsc::channel(8); + NoriClientService::new(NoriClientShared::new( + Arc::new(Mutex::new(thread_goal::ThreadGoalState::default())), + backend_event_tx, + Arc::new(Mutex::new(None)), + Arc::new(AtomicBool::new(false)), + )) + } + + fn tool_text(result: &CallToolResult) -> &str { + result.content[0] + .as_text() + .expect("tool result should contain text content") + .text + .as_str() + } + + fn is_error(result: &CallToolResult) -> bool { + result.is_error.unwrap_or(false) + } + + fn parsed_tool_text(result: &CallToolResult) -> serde_json::Value { + serde_json::from_str(tool_text(result)).expect("tool text should be json") + } + + async fn create(service: &NoriClientService, objective: &str) -> CallToolResult { + service + .create_goal(Parameters(CreateGoalRequest { + objective: objective.to_string(), + })) + .await + .expect("create_goal should not fail at the protocol level") + } + + async fn get(service: &NoriClientService) -> CallToolResult { + service + .get_goal() + .await + .expect("get_goal should not fail at the protocol level") + } + + async fn update(service: &NoriClientService, status: GoalCompletion) -> CallToolResult { + service + .update_goal(Parameters(UpdateGoalRequest { status })) + .await + .expect("update_goal should not fail at the protocol level") + } + + #[test] + fn get_info_advertises_nori_client_with_tools() { + let info = service().get_info(); + assert_eq!(info.server_info.name, "nori-client"); + assert!(info.capabilities.tools.is_some()); + } + + #[tokio::test] + async fn get_goal_returns_null_without_goal() { + let result = get(&service()).await; + assert!(!is_error(&result)); + assert_eq!(parsed_tool_text(&result), json!({ "goal": null })); + } + + #[tokio::test] + async fn create_goal_creates_active_goal_and_emits_event() { + let (backend_event_tx, mut backend_event_rx) = mpsc::channel(8); + let service = NoriClientService::new(NoriClientShared::new( + Arc::new(Mutex::new(thread_goal::ThreadGoalState::default())), + backend_event_tx, + Arc::new(Mutex::new(None)), + Arc::new(AtomicBool::new(false)), + )); + + let create_result = create(&service, "Ship the ACP goal bridge").await; + assert!(!is_error(&create_result)); + assert!(tool_text(&create_result).contains("Ship the ACP goal bridge")); + + let emitted_event = tokio::time::timeout( + std::time::Duration::from_millis(200), + backend_event_rx.recv(), + ) + .await + .expect("create_goal should emit a client event before timeout") + .expect("create_goal should emit a client event"); + match emitted_event { + BackendEvent::Client(ClientEvent::ThreadGoalUpdated(update)) => { + assert_eq!(update.goal.objective, "Ship the ACP goal bridge"); + } + other => panic!("expected thread goal update, got {other:?}"), + } + + let get_result = get(&service).await; + let goal = &parsed_tool_text(&get_result)["goal"]; + assert_eq!(goal["status"], "active"); + assert_eq!(goal["objective"], "Ship the ACP goal bridge"); + } + + #[tokio::test] + async fn create_goal_rejects_existing_goal() { + let service = service(); + assert!(!is_error(&create(&service, "First goal").await)); + + let second = create(&service, "Second goal").await; + assert!(is_error(&second)); + assert!(tool_text(&second).contains("already has a goal")); + + let goal = parsed_tool_text(&get(&service).await); + assert_eq!(goal["goal"]["objective"], "First goal"); + } + + #[tokio::test] + async fn update_goal_marks_goal_complete() { + let service = service(); + assert!(!is_error(&create(&service, "Finish completely").await)); + + let complete = update(&service, GoalCompletion::Complete).await; + assert!(!is_error(&complete)); + assert_eq!(parsed_tool_text(&complete)["goal"]["status"], "complete"); + } + + #[tokio::test] + async fn update_goal_marks_goal_blocked() { + let service = service(); + assert!(!is_error(&create(&service, "Finish carefully").await)); + + let blocked = update(&service, GoalCompletion::Blocked).await; + assert!(!is_error(&blocked)); + assert_eq!(parsed_tool_text(&blocked)["goal"]["status"], "blocked"); + } + + #[tokio::test] + async fn update_goal_reports_missing_goal() { + let result = update(&service(), GoalCompletion::Complete).await; + assert!(is_error(&result)); + assert!(tool_text(&result).contains("no goal exists")); + } +} diff --git a/nori-rs/acp/src/backend/session.rs b/nori-rs/acp/src/backend/session.rs index e03acca4e..936198a64 100644 --- a/nori-rs/acp/src/backend/session.rs +++ b/nori-rs/acp/src/backend/session.rs @@ -138,7 +138,7 @@ impl AcpBackend { &config.mcp_servers, config.mcp_oauth_credentials_store_mode, ); - thread_goal_mcp::register_for_session( + nori_client_mcp::register_for_session( &connection, &mut mcp_servers, Arc::clone(&thread_goal_state), @@ -187,7 +187,7 @@ impl AcpBackend { &config.mcp_servers, config.mcp_oauth_credentials_store_mode, ); - thread_goal_mcp::register_for_session( + nori_client_mcp::register_for_session( &connection, &mut mcp_servers, Arc::clone(&thread_goal_state), @@ -246,7 +246,7 @@ impl AcpBackend { &config.mcp_servers, config.mcp_oauth_credentials_store_mode, ); - thread_goal_mcp::register_for_session( + nori_client_mcp::register_for_session( &connection, &mut mcp_servers, Arc::clone(&thread_goal_state), diff --git a/nori-rs/acp/src/backend/session_runtime_driver.rs b/nori-rs/acp/src/backend/session_runtime_driver.rs index 1e776a4df..6c1a963a3 100644 --- a/nori-rs/acp/src/backend/session_runtime_driver.rs +++ b/nori-rs/acp/src/backend/session_runtime_driver.rs @@ -490,7 +490,7 @@ impl AcpBackend { &self.mcp_servers, self.mcp_oauth_credentials_store_mode, ); - if let Err(err) = thread_goal_mcp::register_for_session( + if let Err(err) = nori_client_mcp::register_for_session( &self.connection, &mut mcp_servers, Arc::clone(&self.thread_goal_state), @@ -538,6 +538,13 @@ impl AcpBackend { return; } + // Safety invariant: a hidden continuation may only chain after another + // hidden continuation once we have observed the agent actually connect to + // the `nori-client` MCP endpoint (`goal.connected`). Until then the agent + // has no way to mark the goal complete/blocked, so unbounded + // continuation→continuation chaining would loop forever. A user turn may + // always start one continuation; only continuation→continuation chaining + // requires the completion tool to be reachable. let can_chain_continuation = self .goal_mcp_connected .load(std::sync::atomic::Ordering::Relaxed); diff --git a/nori-rs/acp/src/backend/spawn_and_relay.rs b/nori-rs/acp/src/backend/spawn_and_relay.rs index 92b295691..95738d387 100644 --- a/nori-rs/acp/src/backend/spawn_and_relay.rs +++ b/nori-rs/acp/src/backend/spawn_and_relay.rs @@ -64,7 +64,7 @@ impl AcpBackend { &config.mcp_servers, config.mcp_oauth_credentials_store_mode, ); - thread_goal_mcp::register_for_session( + nori_client_mcp::register_for_session( &connection, &mut mcp_servers, Arc::clone(&thread_goal_state), diff --git a/nori-rs/acp/src/backend/tests/part5.rs b/nori-rs/acp/src/backend/tests/part5.rs index 784927d16..1c5fbe135 100644 --- a/nori-rs/acp/src/backend/tests/part5.rs +++ b/nori-rs/acp/src/backend/tests/part5.rs @@ -651,8 +651,8 @@ async fn goal_mcp_initialize_allows_chained_hidden_continuations() { .expect("Should receive SessionConfigured event"); let new_session = latest_logged_new_session(&wire_log_dir); - let goal_mcp_url = nori_goal_http_url(&new_session); - initialize_goal_mcp(&goal_mcp_url).await; + let goal_mcp_url = nori_client_http_url(&new_session); + initialize_nori_client_mcp(&goal_mcp_url).await; backend .submit(Op::ThreadGoalSet { @@ -850,10 +850,10 @@ async fn goal_mcp_server_is_advertised_to_http_mcp_agents() { matches!( server, acp::McpServer::Http(http) - if http.name == "nori-goal" && http.url.starts_with("http://127.0.0.1:") + if http.name == "nori-client" && http.url.starts_with("http://127.0.0.1:") ) }), - "expected session/new to advertise the local loopback nori-goal MCP server, got: {:?}", + "expected session/new to advertise the local loopback nori-client MCP server, got: {:?}", new_session.mcp_servers ); } @@ -869,10 +869,10 @@ async fn goal_mcp_server_is_not_advertised_without_http_mcp_capability() { new_session.mcp_servers.iter().all(|server| { !matches!( server, - acp::McpServer::Http(http) if http.name == "nori-goal" + acp::McpServer::Http(http) if http.name == "nori-client" ) }), - "expected session/new to omit nori-goal without HTTP MCP capability, got: {:?}", + "expected session/new to omit nori-client without HTTP MCP capability, got: {:?}", new_session.mcp_servers ); } @@ -914,10 +914,10 @@ async fn codex_agent_receives_loopback_http_goal_mcp_server() { matches!( server, acp::McpServer::Http(http) - if http.name == "nori-goal" && http.url.starts_with("http://127.0.0.1:") + if http.name == "nori-client" && http.url.starts_with("http://127.0.0.1:") ) }), - "expected session/new to advertise a real loopback HTTP nori-goal server for Codex ACP, got: {:?}", + "expected session/new to advertise a real loopback HTTP nori-client server for Codex ACP, got: {:?}", new_session.mcp_servers ); } @@ -1013,18 +1013,22 @@ fn count_logged_requests(log_dir: &std::path::Path, method: &str) -> usize { .count() } -fn nori_goal_http_url(new_session: &acp::NewSessionRequest) -> String { +fn nori_client_http_url(new_session: &acp::NewSessionRequest) -> String { new_session .mcp_servers .iter() .find_map(|server| match server { - acp::McpServer::Http(http) if http.name == "nori-goal" => Some(http.url.clone()), + acp::McpServer::Http(http) if http.name == "nori-client" => Some(http.url.clone()), _ => None, }) - .expect("session/new should advertise nori-goal HTTP MCP server") + .expect("session/new should advertise nori-client HTTP MCP server") } -async fn initialize_goal_mcp(url: &str) { +/// Drive a real MCP `initialize` against the loopback `nori-client` server, +/// mirroring how an external agent connects. The rmcp streamable-HTTP server is +/// spec-compliant, so the POST must send `Accept: application/json, +/// text/event-stream` and the result arrives as an SSE `data:` line. +async fn initialize_nori_client_mcp(url: &str) { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -1048,6 +1052,7 @@ async fn initialize_goal_mcp(url: &str) { let request = format!( "POST /{path} HTTP/1.1\r\n\ Host: {host_port}\r\n\ +Accept: application/json, text/event-stream\r\n\ Content-Type: application/json\r\n\ Content-Length: {}\r\n\ Connection: close\r\n\r\n\ @@ -1056,7 +1061,7 @@ Connection: close\r\n\r\n\ ); let mut stream = tokio::net::TcpStream::connect(host_port) .await - .expect("goal MCP HTTP server should accept initialize"); + .expect("nori-client MCP server should accept initialize"); stream .write_all(request.as_bytes()) .await @@ -1068,7 +1073,7 @@ Connection: close\r\n\r\n\ .expect("initialize response should read"); assert!( response.contains("200 OK") && response.contains("\"serverInfo\""), - "expected successful goal MCP initialize response, got: {response}" + "expected successful nori-client MCP initialize response, got: {response}" ); } diff --git a/nori-rs/acp/src/backend/thread_goal_http_mcp.rs b/nori-rs/acp/src/backend/thread_goal_http_mcp.rs deleted file mode 100644 index a0d11b6f2..000000000 --- a/nori-rs/acp/src/backend/thread_goal_http_mcp.rs +++ /dev/null @@ -1,213 +0,0 @@ -use anyhow::Context; -use anyhow::Result; -use serde_json::Value; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpListener; -use tokio::net::TcpStream; - -use super::thread_goal_mcp::ThreadGoalMcpBridge; - -pub(crate) struct GoalMcpHttpServer { - url: String, - abort_handle: tokio::task::AbortHandle, -} - -impl GoalMcpHttpServer { - pub(crate) async fn spawn(bridge: ThreadGoalMcpBridge) -> Result { - let listener = TcpListener::bind(("127.0.0.1", 0)) - .await - .context("failed to bind local goal MCP HTTP server")?; - let url = format!("http://{}/mcp", listener.local_addr()?); - let task = tokio::spawn(async move { - loop { - let Ok((stream, _addr)) = listener.accept().await else { - break; - }; - let bridge = bridge.clone(); - tokio::spawn(async move { - if let Err(err) = handle_connection(stream, bridge).await { - tracing::debug!("local goal MCP HTTP request failed: {err}"); - } - }); - } - }); - - Ok(Self { - url, - abort_handle: task.abort_handle(), - }) - } - - pub(crate) fn url(&self) -> &str { - &self.url - } -} - -impl Drop for GoalMcpHttpServer { - fn drop(&mut self) { - self.abort_handle.abort(); - } -} - -async fn handle_connection(mut stream: TcpStream, bridge: ThreadGoalMcpBridge) -> Result<()> { - let request = read_http_request(&mut stream).await?; - let (status, body) = match request { - HttpRequest::Post { body } => match serde_json::from_slice::(&body) { - Ok(value) => handle_json_rpc(bridge, value).await, - Err(err) => ( - "400 Bad Request", - Some(json_rpc_error( - Value::Null, - -32700, - format!("parse error: {err}"), - )), - ), - }, - HttpRequest::Options => ("204 No Content", None), - HttpRequest::Other => ( - "405 Method Not Allowed", - Some(serde_json::json!({ "error": "method not allowed" })), - ), - }; - write_http_response(&mut stream, status, body.as_ref()).await -} - -enum HttpRequest { - Post { body: Vec }, - Options, - Other, -} - -async fn read_http_request(stream: &mut TcpStream) -> Result { - let mut buffer = Vec::new(); - let mut chunk = [0_u8; 1024]; - let header_end = loop { - let read = stream.read(&mut chunk).await?; - if read == 0 { - anyhow::bail!("connection closed before HTTP headers"); - } - buffer.extend_from_slice(&chunk[..read]); - if let Some(header_end) = find_header_end(&buffer) { - break header_end; - } - }; - - let headers = String::from_utf8_lossy(&buffer[..header_end]); - let request_line = headers.lines().next().unwrap_or_default(); - if request_line.starts_with("OPTIONS ") { - return Ok(HttpRequest::Options); - } - if !request_line.starts_with("POST ") { - return Ok(HttpRequest::Other); - } - - let content_length = headers - .lines() - .find_map(|line| { - let (name, value) = line.split_once(':')?; - name.eq_ignore_ascii_case("content-length") - .then(|| value.trim().parse::().ok()) - .flatten() - }) - .unwrap_or(0); - let body_start = header_end + 4; - while buffer.len().saturating_sub(body_start) < content_length { - let read = stream.read(&mut chunk).await?; - if read == 0 { - anyhow::bail!("connection closed before HTTP body"); - } - buffer.extend_from_slice(&chunk[..read]); - } - - Ok(HttpRequest::Post { - body: buffer[body_start..body_start + content_length].to_vec(), - }) -} - -fn find_header_end(buffer: &[u8]) -> Option { - buffer.windows(4).position(|window| window == b"\r\n\r\n") -} - -async fn handle_json_rpc( - bridge: ThreadGoalMcpBridge, - value: Value, -) -> (&'static str, Option) { - if let Some(items) = value.as_array() { - let mut responses = Vec::new(); - for item in items { - if let Some(response) = handle_json_rpc_message(&bridge, item.clone()).await { - responses.push(response); - } - } - return if responses.is_empty() { - ("202 Accepted", None) - } else { - ("200 OK", Some(Value::Array(responses))) - }; - } - - match handle_json_rpc_message(&bridge, value).await { - Some(response) => ("200 OK", Some(response)), - None => ("202 Accepted", None), - } -} - -async fn handle_json_rpc_message(bridge: &ThreadGoalMcpBridge, message: Value) -> Option { - let id = message.get("id").cloned(); - let method = message - .get("method") - .and_then(Value::as_str) - .unwrap_or_default(); - let params = message - .get("params") - .cloned() - .unwrap_or_else(|| serde_json::json!({})); - - let id = id?; - let result = bridge.handle_mcp_request(method, params).await; - Some(serde_json::json!({ - "jsonrpc": "2.0", - "id": id, - "result": result, - })) -} - -fn json_rpc_error(id: Value, code: i64, message: String) -> Value { - serde_json::json!({ - "jsonrpc": "2.0", - "id": id, - "error": { - "code": code, - "message": message, - }, - }) -} - -async fn write_http_response( - stream: &mut TcpStream, - status: &str, - body: Option<&Value>, -) -> Result<()> { - let body = body.map(serde_json::to_vec).transpose()?; - let body_len = body.as_ref().map_or(0, Vec::len); - let content_type = if body.is_some() { - "application/json" - } else { - "text/plain" - }; - let headers = format!( - "HTTP/1.1 {status}\r\n\ -Access-Control-Allow-Origin: *\r\n\ -Access-Control-Allow-Headers: content-type, mcp-session-id, mcp-protocol-version\r\n\ -Access-Control-Allow-Methods: POST, OPTIONS\r\n\ -Content-Type: {content_type}\r\n\ -Content-Length: {body_len}\r\n\ -Connection: close\r\n\r\n" - ); - stream.write_all(headers.as_bytes()).await?; - if let Some(body) = body { - stream.write_all(&body).await?; - } - Ok(()) -} diff --git a/nori-rs/acp/src/backend/thread_goal_mcp.rs b/nori-rs/acp/src/backend/thread_goal_mcp.rs deleted file mode 100644 index 8bdd1e460..000000000 --- a/nori-rs/acp/src/backend/thread_goal_mcp.rs +++ /dev/null @@ -1,528 +0,0 @@ -use serde_json::Value; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use tokio::sync::Mutex; - -use super::*; -use crate::backend::thread_goal_http_mcp::GoalMcpHttpServer; -use codex_protocol::protocol::ThreadGoalStatus; -use nori_protocol::ThreadGoalUpdated; - -const GET_GOAL_TOOL_NAME: &str = "get_goal"; -const CREATE_GOAL_TOOL_NAME: &str = "create_goal"; -const UPDATE_GOAL_TOOL_NAME: &str = "update_goal"; - -const DUPLICATE_CREATE_GOAL_ERROR: &str = "cannot create a new goal because this thread already has a goal; use update_goal only when the existing goal is complete"; -const UPDATE_GOAL_STATUS_ERROR: &str = "update_goal can only mark the existing goal complete or blocked; pause, resume, budget-limited, and usage-limited status changes are controlled by the user or system"; - -#[derive(Clone)] -pub(crate) struct ThreadGoalMcpBridge { - thread_goal_state: Arc>, - backend_event_tx: mpsc::Sender, - transcript_recorder: Arc>>>, - connected: Arc, -} - -impl ThreadGoalMcpBridge { - pub(crate) fn new( - thread_goal_state: Arc>, - backend_event_tx: mpsc::Sender, - transcript_recorder: Arc>>>, - connected: Arc, - ) -> Self { - Self { - thread_goal_state, - backend_event_tx, - transcript_recorder, - connected, - } - } - - pub(crate) async fn handle_mcp_request(&self, method: &str, params: Value) -> Value { - match method { - "initialize" => { - self.connected.store(true, Ordering::Relaxed); - serde_json::json!({ - "protocolVersion": "2024-11-05", - "capabilities": { "tools": {} }, - "serverInfo": { "name": "nori-goal", "version": env!("CARGO_PKG_VERSION") } - }) - } - "tools/list" => serde_json::json!({ "tools": tools() }), - "tools/call" => self.handle_tool_call(params).await, - _ => tool_error(format!("unsupported goal MCP request: {method}")), - } - } - - async fn handle_tool_call(&self, params: Value) -> Value { - let name = params - .get("name") - .and_then(Value::as_str) - .unwrap_or_default(); - let arguments = params - .get("arguments") - .cloned() - .unwrap_or_else(|| serde_json::json!({})); - - match name { - GET_GOAL_TOOL_NAME => self.get_goal().await, - CREATE_GOAL_TOOL_NAME => self.create_goal(arguments).await, - UPDATE_GOAL_TOOL_NAME => self.update_goal(arguments).await, - "" => tool_error("tools/call requires a tool name"), - other => tool_error(format!("unknown goal tool: {other}")), - } - } - - async fn get_goal(&self) -> Value { - let state = self.thread_goal_state.lock().await; - let goal = state.snapshot(thread_goal::now_seconds()).map(goal_json); - tool_success(serde_json::json!({ "goal": goal })) - } - - async fn create_goal(&self, arguments: Value) -> Value { - if arguments.get("token_budget").is_some() { - return tool_error("token budgets are not supported by Nori ACP goals yet"); - } - - let Some(objective) = arguments.get("objective").and_then(Value::as_str) else { - return tool_error("create_goal requires objective"); - }; - let objective = objective.trim().to_string(); - let result = { - let now = thread_goal::now_seconds(); - let mut state = self.thread_goal_state.lock().await; - if state.snapshot(now).is_some() { - return tool_error(DUPLICATE_CREATE_GOAL_ERROR); - } - state.set_objective(objective, Some(ThreadGoalStatus::Active), now) - }; - - match result { - Ok(goal) => { - self.emit_goal_updated(goal.clone()).await; - tool_success(serde_json::json!({ "goal": goal_json(goal) })) - } - Err(err) => tool_error(err), - } - } - - async fn update_goal(&self, arguments: Value) -> Value { - let Some(status) = arguments.get("status").and_then(Value::as_str) else { - return tool_error("update_goal requires status"); - }; - let status = match status { - "complete" => ThreadGoalStatus::Complete, - "blocked" => ThreadGoalStatus::Blocked, - "active" | "paused" | "usage_limited" | "budget_limited" => { - return tool_error(UPDATE_GOAL_STATUS_ERROR); - } - other => return tool_error(format!("unsupported goal status: {other}")), - }; - - let result = { - let now = thread_goal::now_seconds(); - let mut state = self.thread_goal_state.lock().await; - state.set_status(status, now) - }; - - match result { - Ok(goal) => { - self.emit_goal_updated(goal.clone()).await; - tool_success(serde_json::json!({ "goal": goal_json(goal) })) - } - Err(err) => tool_error(err), - } - } - - async fn emit_goal_updated(&self, goal: thread_goal::ThreadGoalSnapshot) { - let recorder = self.transcript_recorder.lock().await.clone(); - emit_client_event( - &self.backend_event_tx, - recorder.as_ref(), - ClientEvent::ThreadGoalUpdated(ThreadGoalUpdated { - goal: goal.into_client_goal(), - }), - ) - .await; - } -} - -pub(super) async fn register_for_session( - connection: &SacpConnection, - mcp_servers: &mut Vec, - thread_goal_state: Arc>, - backend_event_tx: mpsc::Sender, - transcript_recorder: Arc>>>, - connected: Arc, - http_server: Arc>>, -) -> Result<()> { - connected.store(false, Ordering::Relaxed); - - if !supports_local_goal_mcp(connection) { - return Ok(()); - } - - let mut server = http_server.lock().await; - if server.is_none() { - *server = Some( - GoalMcpHttpServer::spawn(ThreadGoalMcpBridge::new( - thread_goal_state, - backend_event_tx, - transcript_recorder, - Arc::clone(&connected), - )) - .await?, - ); - } - let Some(server) = server.as_ref() else { - return Ok(()); - }; - mcp_servers.push(acp::McpServer::Http(acp::McpServerHttp::new( - "nori-goal", - server.url().to_string(), - ))); - - Ok(()) -} - -fn supports_local_goal_mcp(connection: &SacpConnection) -> bool { - connection.capabilities().mcp_capabilities.http -} - -fn tools() -> Vec { - vec![ - serde_json::json!({ - "name": GET_GOAL_TOOL_NAME, - "description": "Get the current goal for this thread, including status, token and elapsed-time usage.", - "inputSchema": { - "type": "object", - "properties": {}, - "required": [], - "additionalProperties": false - } - }), - serde_json::json!({ - "name": CREATE_GOAL_TOOL_NAME, - "description": format!( - "Create a goal only when explicitly requested by the user or system/developer instructions; do not infer goals from ordinary tasks. Fails if a goal exists; use {UPDATE_GOAL_TOOL_NAME} only for status." - ), - "inputSchema": { - "type": "object", - "properties": { - "objective": { - "type": "string", - "description": "Required. The concrete objective to start pursuing. This starts a new active goal only when no goal is currently defined; if a goal already exists, this tool fails." - } - }, - "required": ["objective"], - "additionalProperties": false - } - }), - serde_json::json!({ - "name": UPDATE_GOAL_TOOL_NAME, - "description": "Update the existing goal. Use this tool only to mark the goal achieved or genuinely blocked.", - "inputSchema": { - "type": "object", - "properties": { - "status": { - "type": "string", - "enum": ["complete", "blocked"], - "description": "Required. Set to complete only when the objective is achieved and no required work remains. Set to blocked only after the same blocking condition has repeated and the agent is at an impasse." - } - }, - "required": ["status"], - "additionalProperties": false - } - }), - ] -} - -fn tool_success(body: Value) -> Value { - tool_response(body, false) -} - -fn tool_error(message: impl Into) -> Value { - tool_response(Value::String(message.into()), true) -} - -fn tool_response(body: Value, is_error: bool) -> Value { - let text = match body { - Value::String(text) => text, - other => serde_json::to_string(&other) - .unwrap_or_else(|err| format!("failed to serialize goal MCP response: {err}")), - }; - serde_json::json!({ - "content": [{ "type": "text", "text": text }], - "isError": is_error - }) -} - -fn goal_json(goal: thread_goal::ThreadGoalSnapshot) -> Value { - serde_json::json!({ - "objective": goal.objective, - "status": status_label(goal.status), - "tokens_used": goal.tokens_used, - "token_budget": null, - "tokens_remaining": null, - "time_used_seconds": goal.time_used_seconds, - "created_at": goal.created_at, - "updated_at": goal.updated_at, - }) -} - -fn status_label(status: ThreadGoalStatus) -> &'static str { - match status { - ThreadGoalStatus::Active => "active", - ThreadGoalStatus::Paused => "paused", - ThreadGoalStatus::Blocked => "blocked", - ThreadGoalStatus::UsageLimited => "usage_limited", - ThreadGoalStatus::BudgetLimited => "budget_limited", - ThreadGoalStatus::Complete => "complete", - } -} - -#[cfg(test)] -mod tests { - use pretty_assertions::assert_eq; - use serde_json::json; - - use super::*; - - fn bridge() -> ThreadGoalMcpBridge { - let (backend_event_tx, _backend_event_rx) = mpsc::channel(8); - ThreadGoalMcpBridge::new( - Arc::new(Mutex::new(thread_goal::ThreadGoalState::default())), - backend_event_tx, - Arc::new(Mutex::new(None)), - Arc::new(AtomicBool::new(false)), - ) - } - - fn tool_text(response: &Value) -> &str { - response["content"][0]["text"] - .as_str() - .expect("tool response should contain text content") - } - - fn is_error(response: &Value) -> bool { - response["isError"].as_bool().unwrap_or(false) - } - - fn parsed_tool_text(response: &Value) -> Value { - serde_json::from_str(tool_text(response)).expect("tool text should be json") - } - - fn tool_by_name<'a>(response: &'a Value, name: &str) -> &'a Value { - response["tools"] - .as_array() - .expect("tools/list should return tools") - .iter() - .find(|tool| tool["name"] == name) - .expect("expected tool to be listed") - } - - #[tokio::test] - async fn goal_mcp_lists_codex_compatible_goal_tools() { - let response = bridge().handle_mcp_request("tools/list", json!({})).await; - - let get_goal = tool_by_name(&response, "get_goal"); - assert_eq!( - get_goal["inputSchema"]["additionalProperties"], - json!(false) - ); - assert_eq!( - tool_by_name(&response, "create_goal")["inputSchema"]["required"], - json!(["objective"]) - ); - assert_eq!( - tool_by_name(&response, "update_goal")["inputSchema"]["properties"]["status"]["enum"], - json!(["complete", "blocked"]) - ); - } - - #[tokio::test] - async fn get_goal_tool_returns_null_without_goal() { - let response = bridge() - .handle_mcp_request("tools/call", json!({ "name": "get_goal" })) - .await; - - assert!(!is_error(&response)); - assert_eq!(parsed_tool_text(&response), json!({ "goal": null })); - } - - #[tokio::test] - async fn create_goal_tool_creates_active_goal_and_get_goal_reads_it() { - let (backend_event_tx, mut backend_event_rx) = mpsc::channel(8); - let bridge = ThreadGoalMcpBridge::new( - Arc::new(Mutex::new(thread_goal::ThreadGoalState::default())), - backend_event_tx, - Arc::new(Mutex::new(None)), - Arc::new(AtomicBool::new(false)), - ); - - let create_response = bridge - .handle_mcp_request( - "tools/call", - json!({ - "name": "create_goal", - "arguments": { "objective": "Ship the ACP goal bridge" } - }), - ) - .await; - - assert!(!is_error(&create_response)); - assert!(tool_text(&create_response).contains("Ship the ACP goal bridge")); - let emitted_event = tokio::time::timeout( - std::time::Duration::from_millis(200), - backend_event_rx.recv(), - ) - .await - .expect("create_goal should emit a client event before timeout") - .expect("create_goal should emit a client event"); - match emitted_event { - BackendEvent::Client(ClientEvent::ThreadGoalUpdated(update)) => { - assert_eq!(update.goal.objective, "Ship the ACP goal bridge"); - } - other => panic!("expected thread goal update, got {other:?}"), - } - - let get_response = bridge - .handle_mcp_request("tools/call", json!({ "name": "get_goal" })) - .await; - assert!(!is_error(&get_response)); - let goal = &parsed_tool_text(&get_response)["goal"]; - assert_eq!(goal["status"], "active"); - assert_eq!(goal["objective"], "Ship the ACP goal bridge"); - } - - #[tokio::test] - async fn create_goal_tool_rejects_existing_goal() { - let bridge = bridge(); - let first_response = bridge - .handle_mcp_request( - "tools/call", - json!({ - "name": "create_goal", - "arguments": { "objective": "First goal" } - }), - ) - .await; - assert!(!is_error(&first_response)); - - let second_response = bridge - .handle_mcp_request( - "tools/call", - json!({ - "name": "create_goal", - "arguments": { "objective": "Second goal" } - }), - ) - .await; - - assert!(is_error(&second_response)); - assert!(tool_text(&second_response).contains("already has a goal")); - - let get_response = bridge - .handle_mcp_request("tools/call", json!({ "name": "get_goal" })) - .await; - assert_eq!( - parsed_tool_text(&get_response)["goal"]["objective"], - "First goal" - ); - } - - #[tokio::test] - async fn update_goal_tool_only_allows_complete_or_blocked() { - let bridge = bridge(); - let create_response = bridge - .handle_mcp_request( - "tools/call", - json!({ - "name": "create_goal", - "arguments": { "objective": "Finish carefully" } - }), - ) - .await; - assert!(!is_error(&create_response)); - - let paused_response = bridge - .handle_mcp_request( - "tools/call", - json!({ - "name": "update_goal", - "arguments": { "status": "paused" } - }), - ) - .await; - assert!(is_error(&paused_response)); - assert!( - tool_text(&paused_response).contains("only mark the existing goal complete or blocked") - ); - - let get_response = bridge - .handle_mcp_request("tools/call", json!({ "name": "get_goal" })) - .await; - assert_eq!(parsed_tool_text(&get_response)["goal"]["status"], "active"); - - let blocked_response = bridge - .handle_mcp_request( - "tools/call", - json!({ - "name": "update_goal", - "arguments": { "status": "blocked" } - }), - ) - .await; - assert!(!is_error(&blocked_response)); - assert_eq!( - parsed_tool_text(&blocked_response)["goal"]["status"], - "blocked" - ); - } - - #[tokio::test] - async fn update_goal_tool_marks_goal_complete() { - let bridge = bridge(); - let create_response = bridge - .handle_mcp_request( - "tools/call", - json!({ - "name": "create_goal", - "arguments": { "objective": "Finish completely" } - }), - ) - .await; - assert!(!is_error(&create_response)); - - let complete_response = bridge - .handle_mcp_request( - "tools/call", - json!({ - "name": "update_goal", - "arguments": { "status": "complete" } - }), - ) - .await; - assert!(!is_error(&complete_response)); - assert_eq!( - parsed_tool_text(&complete_response)["goal"]["status"], - "complete" - ); - } - - #[tokio::test] - async fn update_goal_tool_reports_missing_goal() { - let response = bridge() - .handle_mcp_request( - "tools/call", - json!({ - "name": "update_goal", - "arguments": { "status": "complete" } - }), - ) - .await; - - assert!(is_error(&response)); - assert!(tool_text(&response).contains("no goal exists")); - } -} From 7886f48ec209bb828c4a942e18f7a3164fc0149d Mon Sep 17 00:00:00 2001 From: Clifford Ressel Date: Fri, 29 May 2026 16:11:12 -0400 Subject: [PATCH 2/4] docs(goal): document nori-client rmcp server (was hand-rolled nori-goal) Update the acp Noridoc and the goal architecture-diagram note for R1: the loopback goal MCP server is now rmcp's StreamableHttpService with typed #[tool] handlers (nori_client_mcp.rs), and the advertised server is renamed nori-goal -> nori-client to reflect that it is Nori's general harness-side channel to the agent, with the goal tools as its first tenants. --- goal-command-architecture-diagrams.md | 27 ++++++++++++----------- nori-rs/acp/docs.md | 31 +++++++++++++-------------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/goal-command-architecture-diagrams.md b/goal-command-architecture-diagrams.md index d1c1b77ba..4670de2b6 100644 --- a/goal-command-architecture-diagrams.md +++ b/goal-command-architecture-diagrams.md @@ -6,7 +6,7 @@ goal complete or blocked. - In the raw Codex harness, goals are native session/runtime state. - In Nori CLI over ACP, goals are owned by the Nori ACP backend and projected - into an external ACP agent through prompt context plus a local `nori-goal` + into an external ACP agent through prompt context plus a local `nori-client` MCP server. ## Raw Codex Harness @@ -120,7 +120,7 @@ ThreadGoal.status changes; continuation loop stops ## Nori CLI Over ACP Nori keeps the user-facing goal state in the ACP backend. During ACP session -setup, it advertises a local `nori-goal` MCP server when the agent connection +setup, it advertises a local `nori-client` MCP server when the agent connection reports HTTP MCP support. Per turn, it sends goal context to the external ACP agent as prompt text, and the external agent marks completion/blocking through that local MCP server. @@ -137,11 +137,11 @@ sequenceDiagram participant Runtime as SessionRuntimeDriver participant ACP as ACP connection participant Agent as External ACP agent - participant MCP as local nori-goal MCP server + participant MCP as local nori-client MCP server - Backend->>MCP: ensure local nori-goal server exists - Backend->>ACP: session/new or session/load with mcpServers[nori-goal] - ACP-->>Agent: advertise nori-goal HTTP MCP server + Backend->>MCP: ensure local nori-client server exists + Backend->>ACP: session/new or session/load with mcpServers[nori-client] + ACP-->>Agent: advertise nori-client HTTP MCP server Agent->>MCP: connect and initialize goal tools User->>TUI: /goal or /goal resume @@ -191,7 +191,7 @@ ACP session setup | | if connection supports HTTP MCP v -Advertise local nori-goal HTTP MCP server +Advertise local nori-client HTTP MCP server | | external agent connects and initializes tools v @@ -232,7 +232,7 @@ External ACP agent keeps working | | when evidence proves done or blocked v -nori-goal.update_goal(status="complete" | "blocked") +nori-client.update_goal(status="complete" | "blocked") | v ThreadGoalState status changes; continuation loop stops @@ -249,11 +249,14 @@ ThreadGoalState status changes; continuation loop stops - Nori registers the local goal MCP server during ACP session setup/load and advertises it only when the connection reports HTTP MCP support: `acp/src/backend/spawn_and_relay.rs`, `acp/src/backend/session.rs`, and - `acp/src/backend/thread_goal_mcp.rs`. + `acp/src/backend/nori_client_mcp.rs`. - The ACP connection forwards `mcpServers` to the external agent when creating a session in `acp/src/connection/sacp_connection.rs`. -- The local `nori-goal` MCP server exposes `get_goal`, `create_goal`, and - `update_goal` in `acp/src/backend/thread_goal_mcp.rs`. +- The local `nori-client` MCP server exposes `get_goal`, `create_goal`, and + `update_goal` as typed rmcp `#[tool]` handlers on an rmcp `StreamableHttpService` + (served over a loopback `axum` listener) in `acp/src/backend/nori_client_mcp.rs`. + `nori-client` is Nori's general harness-side channel to the agent — the goal + tools are its first tenants, not the whole of it. ## Comparison @@ -263,7 +266,7 @@ ThreadGoalState status changes; continuation loop stops | Model-facing goal context | Hidden `GoalContext` response item | Prepended prompt text and hidden continuation prompt | | Continuation scheduler | `GoalRuntimeState::MaybeContinueIfIdle` | `SessionRuntimeDriver::maybe_submit_goal_continuation` | | Completion evaluator | The model self-audits against current evidence | The external ACP agent self-audits against current evidence | -| Completion actuator | Built-in Codex `update_goal` tool | Local `nori-goal` MCP `update_goal` tool | +| Completion actuator | Built-in Codex `update_goal` tool | Local `nori-client` MCP `update_goal` tool | | Context window | Same Codex thread/session history, compacted as needed | External ACP agent's session context, steered by Nori prompts | | Subagents | Separate Codex threads only when explicitly spawned | Determined by the external ACP agent, not by Nori goal state | diff --git a/nori-rs/acp/docs.md b/nori-rs/acp/docs.md index 90fafde55..4b686d6a0 100644 --- a/nori-rs/acp/docs.md +++ b/nori-rs/acp/docs.md @@ -27,7 +27,7 @@ The ACP crate serves as a bridge between: - The shared `codex-protocol` event stream, which is still used for control-plane signals such as warnings, hook output, prompt summaries, shutdown, and other app-level notifications - `SessionRuntime` in `@/nori-rs/nori-protocol/`, which is now the ACP backend's single source of truth for prompt state, load state, queued prompts, permission ownership, and final assistant-message assembly - Thread-goal operations from `@/nori-rs/protocol` and normalized goal events from `@/nori-rs/nori-protocol`, with backend storage and prompt transformation in `@/nori-rs/acp/src/backend/thread_goal.rs` -- Backend-owned local MCP tools that expose the same goal state to ACP agents through `@/nori-rs/acp/src/backend/thread_goal_mcp.rs` and the loopback HTTP server in `@/nori-rs/acp/src/backend/thread_goal_http_mcp.rs` +- The backend-owned `nori-client` MCP server in `@/nori-rs/acp/src/backend/nori_client_mcp.rs`, Nori's harness-side channel to the external ACP agent, which today exposes the same goal state to ACP agents as typed tools Key files: - `registry.rs` - Agent configuration and npm package detection @@ -35,8 +35,7 @@ Key files: - `translator.rs` - User input to ACP `ContentBlock` conversion and related parsing helpers - `backend/mod.rs` - Implements `ConversationClient` trait from codex-core and emits normalized ACP session events - `backend/thread_goal.rs` - Owns per-session `/goal` state, prompt goal-context formatting, transcript rehydration, and usage checkpoint updates -- `backend/thread_goal_mcp.rs` - Adapts backend-owned goal state into MCP tools for agents that advertise HTTP MCP support -- `backend/thread_goal_http_mcp.rs` - Exposes those backend-owned tools over a loopback HTTP endpoint for ACP adapters +- `backend/nori_client_mcp.rs` - Hosts the `nori-client` MCP server: typed `#[tool]` goal handlers on `NoriClientService` (an rmcp `ServerHandler`), served via rmcp's `StreamableHttpService` over a loopback `axum` listener (`NoriClientServer`) - `transcript_discovery.rs` - Discovers transcript files for external agents - `auto_worktree.rs` - Orchestrates automatic git worktree creation, eligibility checking, and summary-based renaming @@ -100,32 +99,32 @@ Metadata notifications that ACP permits while idle are treated as session-owned The runtime differentiates visible user work from backend-internal continuation work through `QueuedPromptKind` in `@/nori-rs/nori-protocol/src/session_runtime.rs`. Goal continuations are sent through the same reducer and ACP side-effect path as user prompts, so assistant deltas, tool activity, hooks, transcript assistant messages, usage updates, and completion events remain normal. Their prompt text is hidden from visible queue updates and from persisted user transcript entries, which keeps the user's transcript anchored to explicit user input while still letting the ACP session continue the active goal. -**Thread Goal State** (`backend/thread_goal.rs`, `backend/thread_goal_mcp.rs`, `backend/thread_goal_http_mcp.rs`, `backend/submit_and_ops.rs`, `backend/user_input.rs`, `backend/transcript.rs`): +**Thread Goal State** (`backend/thread_goal.rs`, `backend/nori_client_mcp.rs`, `backend/submit_and_ops.rs`, `backend/user_input.rs`, `backend/transcript.rs`): -The ACP backend owns the `/goal` feature as per-session state instead of delegating it to the ACP agent. The TUI sends typed `codex_protocol::protocol::Op::ThreadGoalGet`, `ThreadGoalSet`, and `ThreadGoalClear` operations; `submit_and_ops.rs` routes those operations directly to the backend goal handler; and successful mutations are emitted as `nori_protocol::ClientEvent::ThreadGoalUpdated` or `ThreadGoalCleared`. Eligible ACP agents can also interact with the same state through the backend-owned `nori-goal` local MCP server, which exposes `get_goal`, `create_goal`, and `update_goal`. +The ACP backend owns the `/goal` feature as per-session state instead of delegating it to the ACP agent. The TUI sends typed `codex_protocol::protocol::Op::ThreadGoalGet`, `ThreadGoalSet`, and `ThreadGoalClear` operations; `submit_and_ops.rs` routes those operations directly to the backend goal handler; and successful mutations are emitted as `nori_protocol::ClientEvent::ThreadGoalUpdated` or `ThreadGoalCleared`. Eligible ACP agents can also interact with the same state through the backend-owned `nori-client` local MCP server, which exposes the goal tools `get_goal`, `create_goal`, and `update_goal`. ``` @/nori-rs/tui/src/chatwidget/goal.rs -> @/nori-rs/protocol/src/protocol/mod.rs (typed Op) -> @/nori-rs/acp/src/backend/thread_goal.rs - -> @/nori-rs/acp/src/backend/thread_goal_mcp.rs (optional model-facing MCP) + -> @/nori-rs/acp/src/backend/nori_client_mcp.rs (optional model-facing MCP) -> @/nori-rs/nori-protocol/src/lib.rs (ClientEvent) -> @/nori-rs/tui/src/chatwidget/event_handlers.rs ``` `ThreadGoalState` tracks the current objective, lifecycle status, active elapsed time, accumulated goal token usage, and the latest ACP session-usage checkpoint. ACP usage updates add only positive deltas since the last checkpoint to goal-local `tokens_used`; if context-window usage drops after compaction or session reset, the already accumulated goal usage is preserved and the lower value becomes the next checkpoint. Only the `Active` status accrues active time; paused, blocked, usage-limited, budget-limited, and complete goals keep their accumulated time until they become active again. Objective validation is shared with `@/nori-rs/protocol/src/protocol/mod.rs` so the TUI and backend enforce the same acceptance rules, and goal status text uses the shared compact elapsed-time and SI-token formatters from `@/nori-rs/protocol/src/num_format.rs`. -`thread_goal_mcp.rs` is a bridge, not a second store. Its tools lock the same `ThreadGoalState` used by TUI `/goal` operations, return JSON snapshots shaped for model consumption, and emit the same `ThreadGoalUpdated` client event after mutations. The bridge records those emitted events through `@/nori-rs/acp/src/backend/transcript.rs` when a transcript recorder is available; session setup stores the recorder behind a shared cell because the goal MCP bridge can be created before all resume/create paths know the final transcript session id. +`nori_client_mcp.rs` is a bridge, not a second store. The goal tools are typed rmcp `#[tool]` handlers on `NoriClientService`; they lock the same `ThreadGoalState` used by TUI `/goal` operations, return JSON snapshots shaped for model consumption, and emit the same `ThreadGoalUpdated` client event after mutations. The bridge records those emitted events through `@/nori-rs/acp/src/backend/transcript.rs` when a transcript recorder is available; `NoriClientShared` stores the recorder behind a shared cell because the service is built before the session id is known. -The local `nori-goal` server is only advertised when `@/nori-rs/acp/src/backend/thread_goal_mcp.rs` sees HTTP MCP support from `@/nori-rs/acp/src/connection/mod.rs`. Nori advertises a real `http://127.0.0.1:/mcp` endpoint rather than an ACP pseudo-URL, because Codex ACP and Claude ACP both forward ACP `mcpServers` to their underlying clients as ordinary HTTP MCP server config. The loopback server is owned by the ACP backend and talks directly to the same in-memory goal state as `/goal`. +The local `nori-client` server is only advertised when `register_for_session` in `@/nori-rs/acp/src/backend/nori_client_mcp.rs` sees HTTP MCP support from `@/nori-rs/acp/src/connection/mod.rs`. Nori advertises a real `http://127.0.0.1:/mcp` endpoint rather than an ACP pseudo-URL, because Codex ACP and Claude ACP both forward ACP `mcpServers` to their underlying clients as ordinary HTTP MCP server config. The loopback server (`NoriClientServer`) is served by rmcp's spec-compliant `StreamableHttpService` (stateless mode) behind an `axum` listener; it is owned by the ACP backend (abort-on-drop) and talks directly to the same in-memory goal state as `/goal`. The server is named `nori-client` rather than `nori-goal` because it is Nori's general harness-side channel to the external ACP agent -- the single point of contact for harness-specific tooling the ACP protocol does not yet provide -- and the goal tools are simply its first tenants. -The model-facing MCP contract is intentionally narrower than the user-facing `/goal` command surface. `create_goal` creates a new active goal only when no goal exists, rejects token budgets for now, and delegates objective validation to `ThreadGoalState`. `update_goal` only lets an agent mark the existing goal `complete` or `blocked`; pause, resume, usage-limited, and budget-limited transitions remain controlled by the user or the backend system path. Errors are returned as MCP tool errors instead of changing state. +The model-facing tool contract is intentionally narrower than the user-facing `/goal` command surface. `create_goal` creates a new active goal only when no goal exists, rejects token budgets for now, and delegates objective validation to `ThreadGoalState`. `update_goal` takes a typed `complete`/`blocked` enum, so the advertised tool schema exposes only those two Codex-compatible statuses and any other value is rejected at deserialization; pause, resume, usage-limited, and budget-limited transitions remain controlled by the user or the backend system path. Errors are returned as MCP tool errors instead of changing state. Before user prompts are submitted to the ACP runtime, `user_input.rs` prepends the current goal as a structured `` block when a goal exists. Hook context is still applied before goal context, and compact summaries remain the outermost framing instruction, so resumed/compacted turns retain their existing prompt-ordering invariant while still carrying goal state to the agent. The prompt goal context and hidden continuation prompt use the same compact elapsed-time and token-count formatting as the visible TUI goal summary. -Agents that are not advertised the local `nori-goal` server still receive goal context through prompt transformation, an immediate hidden goal-continuation prompt when an active goal is set while the runtime is idle, and a single hidden goal-continuation prompt after visible user turns. The local MCP server is additive for capable agents so they can use structured goal tools; it is never required for goal context, transcript replay, usage accounting, or one-shot continuation behavior. +Agents that are not advertised the local `nori-client` server still receive goal context through prompt transformation, an immediate hidden goal-continuation prompt when an active goal is set while the runtime is idle, and a single hidden goal-continuation prompt after visible user turns. The local MCP server is additive for capable agents so they can use structured goal tools; it is never required for goal context, transcript replay, usage accounting, or one-shot continuation behavior. -After an active goal mutation or a visible user prompt completes with `StopReason::EndTurn`, `session_runtime_driver.rs` may submit a hidden goal-continuation prompt to the same ACP session. `thread_goal.rs` owns the continuation prompt text so it is derived from the current backend goal snapshot, not from TUI state or transcript text. The driver only starts a continuation when the goal is active, the reducer has returned to idle, and no queued user work remains. Chaining from one hidden `GoalContinuation` into another is gated on `goal_mcp_connected`, an `@/nori-rs/acp/src/backend/mod.rs` session flag that `thread_goal_mcp.rs` flips only after the local HTTP MCP server receives an `initialize` request. Agents without a connected goal MCP endpoint receive at most one hidden continuation per active goal mutation or visible user turn. +After an active goal mutation or a visible user prompt completes with `StopReason::EndTurn`, `session_runtime_driver.rs` may submit a hidden goal-continuation prompt to the same ACP session. `thread_goal.rs` owns the continuation prompt text so it is derived from the current backend goal snapshot, not from TUI state or transcript text. The driver only starts a continuation when the goal is active, the reducer has returned to idle, and no queued user work remains. Chaining from one hidden `GoalContinuation` into another is gated on `goal_mcp_connected`, an `@/nori-rs/acp/src/backend/mod.rs` session flag that `NoriClientService::initialize` (the rmcp `ServerHandler::initialize` hook) flips only after the local MCP server receives an `initialize` request. This is a safety invariant: until the agent has actually connected to the `nori-client` endpoint it has no way to mark the goal complete, so unbounded continuation-to-continuation chaining is not allowed. Agents without a connected goal MCP endpoint receive at most one hidden continuation per active goal mutation or visible user turn. Goal state is also part of the replay contract. `transcript.rs` passes Nori-owned goal update and clear events through replay, and `session.rs` seeds `ThreadGoalState` from those transcript-derived events before ACP session setup advertises local MCP tools. Server-side `session/load` can also emit ACP replay notifications while loading; those normalized client events are deferred until backend setup completes, then combined with the transcript replay events before rebuilding `ThreadGoalState`. This ordering matters because ACP agents replay their own session history, but they do not replay Nori-owned `ThreadGoalUpdated` events, so the transcript remains authoritative for goal state even when the agent emits load replay notifications. @@ -652,7 +651,7 @@ The ACP connection layer uses SACP v11 (`sacp` crate) to communicate with agent **Approval flow:** The `RequestPermissionRequest` handler translates the request to a Codex `ApprovalRequest`, sends it through the ordered inbox, and uses the SACP responder plus `ConnectionTo` to send the eventual review decision back without blocking the dispatch loop while the UI collects user input. -**MCP Server Forwarding and Backend-Owned Goal MCP** (`connection/mcp.rs`, `backend/thread_goal_http_mcp.rs`): +**MCP Server Forwarding and the Backend-Owned `nori-client` MCP Server** (`connection/mcp.rs`, `backend/nori_client_mcp.rs`): CLI-configured MCP servers (from `config.toml`) are converted to ACP schema types and passed to the agent via `NewSessionRequest.mcp_servers` at session creation time. The `to_sacp_mcp_servers()` function in `connection/mcp.rs` bridges `codex_core::config::types::McpServerConfig` to ACP `McpServer` values inside the transport adapter: @@ -670,13 +669,13 @@ Disabled servers (`enabled == false`) are filtered out before conversion. Enviro This means `to_sacp_mcp_servers()` has side effects (reads from keyring/file system) rather than being a pure config transformation. The `acp` crate depends on `codex-rmcp-client`'s `load_oauth_tokens` for this purpose. -`thread_goal_http_mcp.rs` is the backend-owned MCP escape hatch for goal tools. It binds a loopback TCP listener, serves the minimal JSON-RPC-over-HTTP requests the ACP adapters send for MCP initialization, tool listing, and tool calls, and appends an HTTP ACP MCP server entry to the same `mcp_servers` list used for configured servers. The server handle is stored on `AcpBackend`, so it lives for the backend conversation and is dropped with the backend. +`nori_client_mcp.rs` is the backend-owned MCP channel for harness-side tooling (currently the goal tools). It serves rmcp's spec-compliant `StreamableHttpService` (streamable-HTTP, stateless mode) over a loopback `axum` listener bound on `127.0.0.1`, and `register_for_session` appends an HTTP ACP MCP server entry named `nori-client` to the same `mcp_servers` list used for configured servers. The transport handles all MCP framing (initialize, tool listing, tool calls); there is no hand-rolled HTTP or JSON-RPC parsing in process. The `NoriClientServer` handle lives on `GoalRuntime`, aborts its serving task on drop, and is dropped with the backend. -The goal server uses a normal `http://127.0.0.1:/mcp` URL because current Codex and Claude ACP adapters forward ACP MCP server entries into their underlying clients as ordinary HTTP MCP config. Avoiding `acp:` keeps startup compatible with those adapters while still keeping the tool implementation in process. +The server uses a normal `http://127.0.0.1:/mcp` URL because current Codex and Claude ACP adapters forward ACP MCP server entries into their underlying clients as ordinary HTTP MCP config. Avoiding `acp:` keeps startup compatible with those adapters while still keeping the tool implementation in process. -ACP session setup paths build the MCP server list in two phases: first convert configured MCP servers with `to_sacp_mcp_servers()`, then let backend-owned features append local MCP servers when their own eligibility checks pass. The goal feature requires HTTP MCP support. This setup applies to resumed, fresh, fallback, and compaction-created sessions. Hook-only ACP sessions pass an empty list because hooks do not need user-configured or backend-owned MCP servers. +ACP session setup paths build the MCP server list in two phases: first convert configured MCP servers with `to_sacp_mcp_servers()`, then let backend-owned features append local MCP servers when their own eligibility checks pass. The `nori-client` server requires HTTP MCP support. This setup applies to resumed, fresh, fallback, and compaction-created sessions. Hook-only ACP sessions pass an empty list because hooks do not need user-configured or backend-owned MCP servers. -The local goal MCP server is intentionally additive. User-configured MCP servers are still forwarded normally, and ineligible agents simply do not receive the `nori-goal` loopback endpoint. Continuation chaining depends on the local MCP server actually being initialized for the current advertised endpoint, not just on HTTP MCP support or endpoint advertisement. +The local `nori-client` MCP server is intentionally additive. User-configured MCP servers are still forwarded normally, and ineligible agents simply do not receive the loopback endpoint. Continuation chaining depends on the local MCP server actually being initialized for the current advertised endpoint, not just on HTTP MCP support or endpoint advertisement. ### Transcript Persistence From 2ef2ef6817bf6c1c2ebd9eab747eed0662957c7f Mon Sep 17 00:00:00 2001 From: Clifford Ressel Date: Fri, 29 May 2026 16:33:59 -0400 Subject: [PATCH 3/4] test(goal): real rmcp-client round-trip; address review nits - Add real_mcp_client_round_trips_over_http: a real rmcp StreamableHttp client connects to the loopback nori-client server, verifies the initialize handshake flips the `connected` gate, lists tools, and round-trips create_goal/get_goal. Enables rmcp client features on the acp dev-dependency only (no new crate). - Rename GoalRuntime.http_server -> mcp_server for clarity. - Fix stale mock-acp-agent/docs.md reference to the deleted goal MCP modules. --- nori-rs/acp/Cargo.toml | 5 +- nori-rs/acp/src/backend/nori_client_mcp.rs | 72 ++++++++++++++++++++++ nori-rs/mock-acp-agent/docs.md | 2 +- 3 files changed, 77 insertions(+), 2 deletions(-) diff --git a/nori-rs/acp/Cargo.toml b/nori-rs/acp/Cargo.toml index c33e69874..34b260a5e 100644 --- a/nori-rs/acp/Cargo.toml +++ b/nori-rs/acp/Cargo.toml @@ -56,7 +56,10 @@ libc = { workspace = true } [dev-dependencies] filetime = "0.2" pretty_assertions = { workspace = true } -rmcp = { workspace = true } +rmcp = { workspace = true, features = [ + "client", + "transport-streamable-http-client-reqwest", +] } serial_test = { workspace = true } tempfile = { workspace = true } tokio-test = { workspace = true } diff --git a/nori-rs/acp/src/backend/nori_client_mcp.rs b/nori-rs/acp/src/backend/nori_client_mcp.rs index 5deafb965..353318288 100644 --- a/nori-rs/acp/src/backend/nori_client_mcp.rs +++ b/nori-rs/acp/src/backend/nori_client_mcp.rs @@ -491,4 +491,76 @@ mod tests { assert!(is_error(&result)); assert!(tool_text(&result).contains("no goal exists")); } + + /// End-to-end: a real spec-compliant rmcp client connects to the loopback + /// server over HTTP, exercising the streamable-HTTP transport, the + /// `initialize` handshake (which must flip `connected`), tool discovery, and + /// a `create_goal` round-trip — i.e. the path a real ACP agent takes. + #[tokio::test] + async fn real_mcp_client_round_trips_over_http() { + use rmcp::ServiceExt; + use rmcp::model::CallToolRequestParam; + use rmcp::transport::StreamableHttpClientTransport; + + let connected = Arc::new(AtomicBool::new(false)); + let state = Arc::new(Mutex::new(thread_goal::ThreadGoalState::default())); + let (backend_event_tx, _backend_event_rx) = mpsc::channel(8); + let server = NoriClientServer::spawn(NoriClientShared::new( + Arc::clone(&state), + backend_event_tx, + Arc::new(Mutex::new(None)), + Arc::clone(&connected), + )) + .await + .expect("nori-client server should spawn"); + + let transport = StreamableHttpClientTransport::from_uri(server.url().to_string()); + let client = () + .serve(transport) + .await + .expect("rmcp client should complete the initialize handshake"); + + assert!( + connected.load(Ordering::Relaxed), + "initialize handshake must flip the connected gate" + ); + + let tools = client + .list_all_tools() + .await + .expect("client should list tools"); + let tool_names: Vec<&str> = tools.iter().map(|tool| tool.name.as_ref()).collect(); + assert!(tool_names.contains(&"get_goal")); + assert!(tool_names.contains(&"create_goal")); + assert!(tool_names.contains(&"update_goal")); + + let create = client + .call_tool(CallToolRequestParam { + name: "create_goal".into(), + arguments: serde_json::json!({ "objective": "Round-trip over real MCP" }) + .as_object() + .cloned(), + }) + .await + .expect("create_goal call should succeed"); + assert_eq!(create.is_error, Some(false)); + + let get = client + .call_tool(CallToolRequestParam { + name: "get_goal".into(), + arguments: None, + }) + .await + .expect("get_goal call should succeed"); + let goal_text = get.content[0] + .as_text() + .expect("get_goal returns text") + .text + .as_str(); + let goal: serde_json::Value = serde_json::from_str(goal_text).expect("goal json"); + assert_eq!(goal["goal"]["objective"], "Round-trip over real MCP"); + assert_eq!(goal["goal"]["status"], "active"); + + client.cancel().await.expect("client should shut down"); + } } diff --git a/nori-rs/mock-acp-agent/docs.md b/nori-rs/mock-acp-agent/docs.md index 021d5edbb..42c7dff05 100644 --- a/nori-rs/mock-acp-agent/docs.md +++ b/nori-rs/mock-acp-agent/docs.md @@ -23,7 +23,7 @@ Used by `@/nori-rs/tui-pty-e2e/` for end-to-end integration testing. The mock ag **Session Lifecycle Testing**: Several env vars control `session/load` behavior for testing the resume path in `@/nori-rs/acp/src/backend/session.rs`: - `MOCK_AGENT_SUPPORT_LOAD_SESSION` -- when set, the agent advertises `load_session: true` in its capabilities during `initialize()` -- `MOCK_AGENT_MCP_HTTP` -- when set, the agent advertises HTTP MCP capability so `@/nori-rs/acp/src/backend/thread_goal_mcp.rs` and `@/nori-rs/acp/src/backend/thread_goal_http_mcp.rs` can be tested through the normal `session/new` MCP server advertisement path +- `MOCK_AGENT_MCP_HTTP` -- when set, the agent advertises HTTP MCP capability so the backend-owned `nori-client` MCP server in `@/nori-rs/acp/src/backend/nori_client_mcp.rs` can be tested through the normal `session/new` MCP server advertisement path - `MOCK_AGENT_LOAD_SESSION_FAIL` -- when set, the `load_session()` handler returns an error instead of succeeding, allowing tests to exercise the runtime-failure fallback path - `MOCK_AGENT_LOAD_SESSION_NOTIFICATION_COUNT` -- when set to an integer N, the `load_session()` handler sends N text-chunk notifications (via `send_text_chunk()`) before returning, simulating history replay with a configurable volume of events. Used to test the deferred-relay pattern in `resume_session()` that prevents deadlocks when the notification count exceeds the bounded `event_tx` channel capacity. From 428b5a5804ce8e55fb8317101935a88bbdf51f81 Mon Sep 17 00:00:00 2001 From: Clifford Ressel Date: Mon, 1 Jun 2026 13:20:01 -0400 Subject: [PATCH 4/4] chore(goal): tidy nori-client docs --- goal-command-architecture-diagrams.md | 2 +- nori-rs/acp/docs.md | 2 +- nori-rs/acp/src/backend/nori_client_mcp.rs | 4 ++-- nori-rs/acp/src/backend/session_runtime_driver.rs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/goal-command-architecture-diagrams.md b/goal-command-architecture-diagrams.md index 4670de2b6..712ece552 100644 --- a/goal-command-architecture-diagrams.md +++ b/goal-command-architecture-diagrams.md @@ -255,7 +255,7 @@ ThreadGoalState status changes; continuation loop stops - The local `nori-client` MCP server exposes `get_goal`, `create_goal`, and `update_goal` as typed rmcp `#[tool]` handlers on an rmcp `StreamableHttpService` (served over a loopback `axum` listener) in `acp/src/backend/nori_client_mcp.rs`. - `nori-client` is Nori's general harness-side channel to the agent — the goal + `nori-client` is Nori's general harness-side channel to the agent; the goal tools are its first tenants, not the whole of it. ## Comparison diff --git a/nori-rs/acp/docs.md b/nori-rs/acp/docs.md index 4b686d6a0..778099154 100644 --- a/nori-rs/acp/docs.md +++ b/nori-rs/acp/docs.md @@ -669,7 +669,7 @@ Disabled servers (`enabled == false`) are filtered out before conversion. Enviro This means `to_sacp_mcp_servers()` has side effects (reads from keyring/file system) rather than being a pure config transformation. The `acp` crate depends on `codex-rmcp-client`'s `load_oauth_tokens` for this purpose. -`nori_client_mcp.rs` is the backend-owned MCP channel for harness-side tooling (currently the goal tools). It serves rmcp's spec-compliant `StreamableHttpService` (streamable-HTTP, stateless mode) over a loopback `axum` listener bound on `127.0.0.1`, and `register_for_session` appends an HTTP ACP MCP server entry named `nori-client` to the same `mcp_servers` list used for configured servers. The transport handles all MCP framing (initialize, tool listing, tool calls); there is no hand-rolled HTTP or JSON-RPC parsing in process. The `NoriClientServer` handle lives on `GoalRuntime`, aborts its serving task on drop, and is dropped with the backend. +`nori_client_mcp.rs` is the backend-owned MCP channel for harness-side tooling (currently the goal tools). It serves rmcp's spec-compliant `StreamableHttpService` (streamable-HTTP, stateless mode) over a loopback `axum` listener bound on `127.0.0.1`, and `register_for_session` appends an HTTP ACP MCP server entry named `nori-client` to the same `mcp_servers` list used for configured servers. The transport handles all MCP framing (initialize, tool listing, tool calls); there is no hand-rolled HTTP or JSON-RPC parsing in process. The `NoriClientServer` handle lives on `AcpBackend`, aborts its serving task on drop, and is dropped with the backend. The server uses a normal `http://127.0.0.1:/mcp` URL because current Codex and Claude ACP adapters forward ACP MCP server entries into their underlying clients as ordinary HTTP MCP config. Avoiding `acp:` keeps startup compatible with those adapters while still keeping the tool implementation in process. diff --git a/nori-rs/acp/src/backend/nori_client_mcp.rs b/nori-rs/acp/src/backend/nori_client_mcp.rs index 353318288..f9325b85c 100644 --- a/nori-rs/acp/src/backend/nori_client_mcp.rs +++ b/nori-rs/acp/src/backend/nori_client_mcp.rs @@ -8,7 +8,7 @@ //! goal-only surface. //! //! The transport is rmcp's spec-compliant streamable-HTTP server, served on a -//! loopback port. The tools are typed `#[tool]` handlers — there is no +//! loopback port. The tools are typed `#[tool]` handlers; there is no //! hand-rolled HTTP or JSON-RPC framing here. use std::sync::Arc; @@ -495,7 +495,7 @@ mod tests { /// End-to-end: a real spec-compliant rmcp client connects to the loopback /// server over HTTP, exercising the streamable-HTTP transport, the /// `initialize` handshake (which must flip `connected`), tool discovery, and - /// a `create_goal` round-trip — i.e. the path a real ACP agent takes. + /// a `create_goal` round-trip, i.e. the path a real ACP agent takes. #[tokio::test] async fn real_mcp_client_round_trips_over_http() { use rmcp::ServiceExt; diff --git a/nori-rs/acp/src/backend/session_runtime_driver.rs b/nori-rs/acp/src/backend/session_runtime_driver.rs index 6c1a963a3..fbd16110e 100644 --- a/nori-rs/acp/src/backend/session_runtime_driver.rs +++ b/nori-rs/acp/src/backend/session_runtime_driver.rs @@ -542,8 +542,8 @@ impl AcpBackend { // hidden continuation once we have observed the agent actually connect to // the `nori-client` MCP endpoint (`goal.connected`). Until then the agent // has no way to mark the goal complete/blocked, so unbounded - // continuation→continuation chaining would loop forever. A user turn may - // always start one continuation; only continuation→continuation chaining + // continuation-to-continuation chaining would loop forever. A user turn may + // always start one continuation; only continuation-to-continuation chaining // requires the completion tool to be reachable. let can_chain_continuation = self .goal_mcp_connected