From a862d9149263b01eba2a776ff125deed0b37e589 Mon Sep 17 00:00:00 2001 From: npub1mn7jgtj4w2pd0g0zeuhxsa6jy6p0rewxz4kujt98my82ahfmp72sxjexk7 Date: Fri, 12 Jun 2026 14:10:26 -0400 Subject: [PATCH 1/2] fix(acp): remove channel-level death notices from handle_prompt_result MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Death/timeout notices are debugging signals that belong in the activity feed (emit_turn_error/observer), not as regular channel messages. Remove all publish_death_notice calls from handle_prompt_result — the observer path already fires for every error outcome and routes to the activity panel. Removes the thread_root extraction that was only consumed by these calls. Co-authored-by: Will Pfleger Signed-off-by: Will Pfleger --- crates/buzz-acp/src/lib.rs | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/crates/buzz-acp/src/lib.rs b/crates/buzz-acp/src/lib.rs index 7a0e7f3af..f2638440e 100644 --- a/crates/buzz-acp/src/lib.rs +++ b/crates/buzz-acp/src/lib.rs @@ -2079,15 +2079,6 @@ fn handle_prompt_result( .retain(|_, meta| meta.agent_index != agent_index); debug_assert_eq!(before, pool.task_map().len() + 1); - // Extract thread root from the batch before it's consumed by requeue. - // Used by death notices to thread the message into the original conversation. - let thread_root: Option = result - .batch - .as_ref() - .and_then(|b| b.events.first()) - .map(|e| queue::parse_thread_tags(&e.event)) - .and_then(|tags| tags.root_event_id); - // Requeue BEFORE mark_complete: requeue() sets retry_after with a future // deadline, and mark_complete() checks for it to decide whether to preserve // retry_counts. If mark_complete runs first, retry_counts is cleared and @@ -2176,11 +2167,6 @@ fn handle_prompt_result( }; emit_turn_error(death_message); - // Post a visible death notice to the channel so humans know why - // the agent went silent. - if let Some(ch) = channel_id { - relay.publish_death_notice(ch, death_message, thread_root.as_deref()); - } let index = result.agent.index; let slot_history = &mut crash_history[index]; if !spawn_respawn_task( @@ -2238,15 +2224,6 @@ fn handle_prompt_result( ); emit_turn_error(&e.to_string()); - // Post a visible death notice for transport errors too. - if let Some(ch) = channel_id { - relay.publish_death_notice( - ch, - "Agent connection lost (transport error)", - thread_root.as_deref(), - ); - } - let index = result.agent.index; let slot_history = &mut crash_history[index]; if !spawn_respawn_task( @@ -2270,6 +2247,7 @@ fn handle_prompt_result( "agent_returned (application error — pipe intact)" ); emit_turn_error(&e.to_string()); + pool.return_agent(result.agent); } } From 05f17da09f06771f169feecb0b22abee6d64cc04 Mon Sep 17 00:00:00 2001 From: npub1mn7jgtj4w2pd0g0zeuhxsa6jy6p0rewxz4kujt98my82ahfmp72sxjexk7 Date: Fri, 12 Jun 2026 17:31:07 -0400 Subject: [PATCH 2/2] test(acp): remove dead death-notice builder, pin feed-only error policy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the channel-notice call sites were removed, publish_death_notice and build_death_notice in relay.rs had zero callers — dead public API that left a path for re-introducing channel notices. Remove them, their EventBuild error variant, and the three builder unit tests. Add error_outcome_emission_tests pinning the policy that error-class outcomes surface only to the activity feed: handle_prompt_result takes no relay handle (channel silence is structural — re-adding a notice would have to re-add the parameter), and each error branch must emit exactly one turn_error observer event (asserted at runtime, red if any branch drops emit_turn_error). Co-authored-by: Will Pfleger Signed-off-by: Will Pfleger --- crates/buzz-acp/src/lib.rs | 168 ++++++++++++++++++++++++++++++++++- crates/buzz-acp/src/relay.rs | 119 ------------------------- 2 files changed, 165 insertions(+), 122 deletions(-) diff --git a/crates/buzz-acp/src/lib.rs b/crates/buzz-acp/src/lib.rs index f2638440e..93ce85637 100644 --- a/crates/buzz-acp/src/lib.rs +++ b/crates/buzz-acp/src/lib.rs @@ -1776,7 +1776,6 @@ async fn tokio_main() -> Result<()> { &respawn_tx, &mut respawn_tasks, observer.clone(), - &relay, ) == LoopAction::Exit { break; @@ -2071,7 +2070,6 @@ fn handle_prompt_result( respawn_tx: &mpsc::Sender, respawn_tasks: &mut tokio::task::JoinSet<()>, observer: Option, - relay: &HarnessRelay, ) -> LoopAction { let before = pool.task_map().len(); let agent_index = result.agent.index; @@ -2247,7 +2245,6 @@ fn handle_prompt_result( "agent_returned (application error — pipe intact)" ); emit_turn_error(&e.to_string()); - pool.return_agent(result.agent); } } @@ -3124,3 +3121,168 @@ mod build_mcp_servers_tests { ); } } + +#[cfg(test)] +mod error_outcome_emission_tests { + //! Pins the policy that error-class outcomes surface to the activity feed + //! and never to the channel: + //! + //! - Channel silence is enforced *structurally* — `handle_prompt_result` + //! takes no relay handle, so it has no way to post a channel message. A + //! future re-introduction of channel notices would have to add the relay + //! parameter back, which these tests' construction would then refuse to + //! compile against. + //! - Feed coverage is the regression-prone half and is asserted at runtime: + //! each error outcome must emit exactly one `turn_error` observer event. + //! If any branch drops its `emit_turn_error` call, the matching test goes + //! red. + + use super::*; + use crate::acp::{AcpClient, AcpError}; + use crate::observer::ObserverHandle; + use crate::pool::{AgentPool, OwnedAgent, PromptOutcome, PromptResult, PromptSource}; + use std::collections::HashSet; + + fn test_config() -> Config { + Config { + keys: nostr::Keys::generate(), + relay_url: "ws://localhost:3000".into(), + // `true` exits cleanly, so the async respawn fails fast and + // harmlessly off the JoinSet — irrelevant to the synchronous + // feed emission under test. + agent_command: "true".into(), + agent_args: vec![], + mcp_command: "test-mcp-server".into(), + idle_timeout_secs: config::DEFAULT_IDLE_TIMEOUT_SECS, + max_turn_duration_secs: 3600, + agents: 1, + heartbeat_interval_secs: 0, + heartbeat_prompt: None, + system_prompt: None, + initial_message: None, + subscribe_mode: config::SubscribeMode::All, + dedup_mode: config::DedupMode::Queue, + multiple_event_handling: config::MultipleEventHandling::Queue, + ignore_self: true, + kinds_override: None, + channels_override: None, + no_mention_filter: false, + config_path: std::path::PathBuf::from("./buzz-acp.toml"), + context_message_limit: 12, + max_turns_per_session: 0, + presence_enabled: true, + typing_enabled: true, + memory_enabled: false, + model: None, + permission_mode: config::PermissionMode::BypassPermissions, + respond_to: config::RespondTo::Anyone, + respond_to_allowlist: HashSet::new(), + persona_env_vars: vec![], + relay_observer: false, + agent_owner: None, + no_base_prompt: false, + base_prompt_content: None, + } + } + + /// Spawn a real but inert agent subprocess (`cat`) so the error paths have + /// an `OwnedAgent` to move into respawn or return to the pool. The error + /// branches never talk to the subprocess. + async fn dummy_agent(index: usize) -> OwnedAgent { + OwnedAgent { + index, + acp: AcpClient::spawn("cat", &[], &[]) + .await + .expect("spawn cat as inert agent"), + state: Default::default(), + model_capabilities: None, + desired_model: None, + // Error branches under test never read this; 1 is the legacy + // non-systemPrompt path, the simplest valid value. + protocol_version: 1, + } + } + + /// Drive one error outcome through `handle_prompt_result` and return how + /// many `turn_error` events it emitted to the observer feed. + async fn turn_errors_emitted_for(outcome: PromptOutcome) -> usize { + let agent = dummy_agent(0).await; + let mut pool = AgentPool::from_slots(vec![None]); + + // `handle_prompt_result` asserts it removes exactly one in-flight task + // for the completing agent (the slot was checked out, not idle). Mirror + // the real dispatch path by registering a TaskMeta keyed on a genuine + // `task::Id` — only obtainable from inside a spawned task. + let task_id = pool.join_set.spawn(async {}).id(); + pool.task_map_mut().insert( + task_id, + crate::pool::TaskMeta { + agent_index: 0, + channel_id: None, + recoverable_batch: None, + control_tx: None, + }, + ); + + let mut queue = EventQueue::new(config::DedupMode::Queue); + let config = test_config(); + let mut heartbeat_in_flight = false; + let removed_channels = HashSet::new(); + let mut crash_history = vec![SlotCircuit { + crash_times: Vec::new(), + open_until: None, + respawn_in_flight: false, + }]; + let (respawn_tx, _respawn_rx) = mpsc::channel(8); + let mut respawn_tasks = tokio::task::JoinSet::new(); + let observer = ObserverHandle::in_process(); + + let result = PromptResult { + agent, + source: PromptSource::Channel(Uuid::new_v4()), + outcome, + batch: None, + }; + + handle_prompt_result( + &mut pool, + &mut queue, + &config, + result, + &mut heartbeat_in_flight, + &removed_channels, + &mut crash_history, + &respawn_tx, + &mut respawn_tasks, + Some(observer.clone()), + ); + + observer + .snapshot() + .iter() + .filter(|e| e.kind == "turn_error") + .count() + } + + #[tokio::test] + async fn agent_exited_emits_exactly_one_feed_event() { + assert_eq!(turn_errors_emitted_for(PromptOutcome::AgentExited).await, 1); + } + + #[tokio::test] + async fn timeout_emits_exactly_one_feed_event() { + assert_eq!(turn_errors_emitted_for(PromptOutcome::Timeout).await, 1); + } + + #[tokio::test] + async fn transport_error_emits_exactly_one_feed_event() { + let io = AcpError::Io(std::io::Error::other("pipe broke")); + assert_eq!(turn_errors_emitted_for(PromptOutcome::Error(io)).await, 1); + } + + #[tokio::test] + async fn application_error_emits_exactly_one_feed_event() { + let app = AcpError::IdleTimeout(std::time::Duration::from_secs(1)); + assert_eq!(turn_errors_emitted_for(PromptOutcome::Error(app)).await, 1); + } +} diff --git a/crates/buzz-acp/src/relay.rs b/crates/buzz-acp/src/relay.rs index 79c8fe216..e4c357a61 100644 --- a/crates/buzz-acp/src/relay.rs +++ b/crates/buzz-acp/src/relay.rs @@ -327,9 +327,6 @@ pub enum RelayError { #[error("Unexpected message: {0}")] UnexpectedMessage(String), - - #[error("Event build error: {0}")] - EventBuild(String), } impl From for RelayError { @@ -778,44 +775,6 @@ impl HarnessRelay { Ok(event) } - /// Build a channel message (kind:9) for death notices — posted when the - /// agent session ends due to timeout or unexpected exit. - /// If `thread_root` is provided, the message is threaded as a reply. - pub fn build_death_notice( - &self, - channel_id: Uuid, - message: &str, - thread_root: Option<&str>, - ) -> Result { - use buzz_core::kind::KIND_STREAM_MESSAGE; - - let h_tag = Tag::parse(["h", &channel_id.to_string()]) - .map_err(|e| RelayError::EventBuild(e.to_string()))?; - let mut tags = vec![h_tag]; - if let Some(root_id) = thread_root { - let e_tag = Tag::parse(["e", root_id, "", "root"]) - .map_err(|e| RelayError::EventBuild(e.to_string()))?; - tags.push(e_tag); - } - let event = EventBuilder::new(Kind::Custom(KIND_STREAM_MESSAGE as u16), message) - .tags(tags) - .sign_with_keys(&self.keys)?; - Ok(event) - } - - /// Build and publish a death notice, logging any failure rather than - /// propagating it. Used by both the timeout and transport-error paths. - pub fn publish_death_notice(&self, channel_id: Uuid, message: &str, thread_root: Option<&str>) { - match self.build_death_notice(channel_id, message, thread_root) { - Ok(event) => { - if let Err(e) = self.try_publish_event(event) { - warn!("failed to publish death notice: {e}"); - } - } - Err(e) => warn!("failed to build death notice: {e}"), - } - } - /// Set the startup watermark timestamp (Finding #22). /// /// Call this once after `connect()` with the Unix timestamp captured just @@ -3519,82 +3478,4 @@ mod tests { "after backpressure removal, replay must be accepted" ); } - - // ── Death-notice builder (PR #935 fix: reply→root marker) ───────────────── - - /// Construct a minimal HarnessRelay for unit-testing `build_death_notice`. - /// Only `keys` is exercised; other fields are dummies. - fn make_test_relay() -> HarnessRelay { - let (cmd_tx, _cmd_rx) = mpsc::channel(1); - let (_event_tx, event_rx) = mpsc::channel(1); - HarnessRelay { - event_rx, - observer_control_rx: None, - cmd_tx, - http: reqwest::Client::new(), - relay_url: "ws://localhost:3000".into(), - keys: nostr::Keys::generate(), - auth_tag: None, - bg_handle: None, - } - } - - #[test] - fn death_notice_has_root_marker_not_reply() { - let relay = make_test_relay(); - let channel_id = Uuid::new_v4(); - let root_id = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"; - let event = relay - .build_death_notice(channel_id, "agent timed out", Some(root_id)) - .expect("build_death_notice should succeed"); - - // The NIP-10 e-tag must use "root" marker (not "reply"). - let e_tags: Vec<_> = event - .tags - .iter() - .filter(|t| t.as_slice().first().map(|s| s.as_str()) == Some("e")) - .collect(); - assert_eq!(e_tags.len(), 1, "expected exactly one e-tag"); - let e_tag = e_tags[0].as_slice(); - assert_eq!(e_tag[1], root_id, "e-tag should reference the root event"); - assert_eq!(e_tag[2], "", "relay hint should be empty"); - assert_eq!(e_tag[3], "root", "marker must be 'root', not 'reply'"); - } - - #[test] - fn death_notice_has_channel_h_tag() { - let relay = make_test_relay(); - let channel_id = Uuid::new_v4(); - let event = relay - .build_death_notice(channel_id, "agent crashed", None) - .expect("build_death_notice should succeed"); - - let h_tags: Vec<_> = event - .tags - .iter() - .filter(|t| t.as_slice().first().map(|s| s.as_str()) == Some("h")) - .collect(); - assert_eq!(h_tags.len(), 1, "expected exactly one h-tag"); - assert_eq!( - h_tags[0].as_slice()[1], - channel_id.to_string(), - "h-tag should contain the channel UUID" - ); - } - - #[test] - fn death_notice_without_thread_root_has_no_e_tag() { - let relay = make_test_relay(); - let channel_id = Uuid::new_v4(); - let event = relay - .build_death_notice(channel_id, "agent exited", None) - .expect("build_death_notice should succeed"); - - let e_tags: Vec<_> = event - .tags - .iter() - .filter(|t| t.as_slice().first().map(|s| s.as_str()) == Some("e")) - .collect(); - assert!(e_tags.is_empty(), "no e-tag when thread_root is None"); - } }