Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 165 additions & 25 deletions crates/buzz-acp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,6 @@ async fn tokio_main() -> Result<()> {
&respawn_tx,
&mut respawn_tasks,
observer.clone(),
&relay,
) == LoopAction::Exit
{
break;
Expand Down Expand Up @@ -2071,23 +2070,13 @@ fn handle_prompt_result(
respawn_tx: &mpsc::Sender<RespawnResult>,
respawn_tasks: &mut tokio::task::JoinSet<()>,
observer: Option<observer::ObserverHandle>,
relay: &HarnessRelay,
) -> LoopAction {
let before = pool.task_map().len();
let agent_index = result.agent.index;
pool.task_map_mut()
.retain(|_, meta| meta.agent_index != agent_index);
debug_assert_eq!(before, pool.task_map().len() + 1);

// Extract thread root from the batch before it's consumed by requeue.
// Used by death notices to thread the message into the original conversation.
let thread_root: Option<String> = result
.batch
.as_ref()
.and_then(|b| b.events.first())
.map(|e| queue::parse_thread_tags(&e.event))
.and_then(|tags| tags.root_event_id);

// Requeue BEFORE mark_complete: requeue() sets retry_after with a future
// deadline, and mark_complete() checks for it to decide whether to preserve
// retry_counts. If mark_complete runs first, retry_counts is cleared and
Expand Down Expand Up @@ -2176,11 +2165,6 @@ fn handle_prompt_result(
};
emit_turn_error(death_message);

// Post a visible death notice to the channel so humans know why
// the agent went silent.
if let Some(ch) = channel_id {
relay.publish_death_notice(ch, death_message, thread_root.as_deref());
}
let index = result.agent.index;
let slot_history = &mut crash_history[index];
if !spawn_respawn_task(
Expand Down Expand Up @@ -2238,15 +2222,6 @@ fn handle_prompt_result(
);
emit_turn_error(&e.to_string());

// Post a visible death notice for transport errors too.
if let Some(ch) = channel_id {
relay.publish_death_notice(
ch,
"Agent connection lost (transport error)",
thread_root.as_deref(),
);
}

let index = result.agent.index;
let slot_history = &mut crash_history[index];
if !spawn_respawn_task(
Expand Down Expand Up @@ -3146,3 +3121,168 @@ mod build_mcp_servers_tests {
);
}
}

#[cfg(test)]
mod error_outcome_emission_tests {
    //! Regression guard for *where* error-class prompt outcomes are reported:
    //! on the observer activity feed, and never in the channel.
    //!
    //! - **Channel silence** is a structural property. `handle_prompt_result`
    //!   receives no relay handle, so it cannot post a channel message. Adding
    //!   channel notices back would mean adding the relay parameter back, and
    //!   the call in this module would then stop compiling.
    //! - **Feed coverage** is the half that can silently regress, so it is
    //!   checked at runtime: every error outcome must produce exactly one
    //!   `turn_error` observer event. A branch that loses its
    //!   `emit_turn_error` call turns the corresponding test red.

    use super::*;
    use crate::acp::{AcpClient, AcpError};
    use crate::observer::ObserverHandle;
    use crate::pool::{AgentPool, OwnedAgent, PromptOutcome, PromptResult, PromptSource};
    use std::collections::HashSet;

    /// Build the single-agent `Config` shared by every test in this module.
    fn test_config() -> Config {
        Config {
            keys: nostr::Keys::generate(),
            relay_url: "ws://localhost:3000".into(),
            // The agent command is `true`: it exits cleanly, so the
            // asynchronous respawn fails fast and harmlessly on the JoinSet.
            // That has no bearing on the synchronous feed emission that these
            // tests observe.
            agent_command: "true".into(),
            agent_args: Vec::new(),
            mcp_command: "test-mcp-server".into(),
            idle_timeout_secs: config::DEFAULT_IDLE_TIMEOUT_SECS,
            max_turn_duration_secs: 3600,
            agents: 1,
            heartbeat_interval_secs: 0,
            heartbeat_prompt: None,
            system_prompt: None,
            initial_message: None,
            subscribe_mode: config::SubscribeMode::All,
            dedup_mode: config::DedupMode::Queue,
            multiple_event_handling: config::MultipleEventHandling::Queue,
            ignore_self: true,
            kinds_override: None,
            channels_override: None,
            no_mention_filter: false,
            config_path: std::path::PathBuf::from("./buzz-acp.toml"),
            context_message_limit: 12,
            max_turns_per_session: 0,
            presence_enabled: true,
            typing_enabled: true,
            memory_enabled: false,
            model: None,
            permission_mode: config::PermissionMode::BypassPermissions,
            respond_to: config::RespondTo::Anyone,
            respond_to_allowlist: HashSet::new(),
            persona_env_vars: Vec::new(),
            relay_observer: false,
            agent_owner: None,
            no_base_prompt: false,
            base_prompt_content: None,
        }
    }

    /// Create an `OwnedAgent` for slot `index`, backed by a real but inert
    /// `cat` subprocess.
    ///
    /// The error paths need an agent value to hand to respawn or back to the
    /// pool; they never exchange messages with the subprocess itself.
    async fn dummy_agent(index: usize) -> OwnedAgent {
        let acp = AcpClient::spawn("cat", &[], &[])
            .await
            .expect("spawn cat as inert agent");

        OwnedAgent {
            index,
            acp,
            state: Default::default(),
            model_capabilities: None,
            desired_model: None,
            // Not read by the error branches under test. 1 is the legacy
            // non-systemPrompt path, which is the simplest valid value.
            protocol_version: 1,
        }
    }

    /// Feed a single `outcome` through `handle_prompt_result` and report how
    /// many `turn_error` events ended up on the observer feed.
    async fn turn_errors_emitted_for(outcome: PromptOutcome) -> usize {
        let agent = dummy_agent(0).await;
        let mut pool = AgentPool::from_slots(vec![None]);

        // `handle_prompt_result` debug-asserts that it removes exactly one
        // in-flight task belonging to the completing agent. Reproduce the
        // dispatch bookkeeping by registering a `TaskMeta` under a genuine
        // `task::Id`, obtained here by spawning a no-op task on the pool's
        // JoinSet.
        let in_flight_id = pool.join_set.spawn(async {}).id();
        let in_flight_meta = crate::pool::TaskMeta {
            agent_index: 0,
            channel_id: None,
            recoverable_batch: None,
            control_tx: None,
        };
        pool.task_map_mut().insert(in_flight_id, in_flight_meta);

        let mut queue = EventQueue::new(config::DedupMode::Queue);
        let config = test_config();
        let mut heartbeat_in_flight = false;
        let removed_channels = HashSet::new();
        let mut crash_history = vec![SlotCircuit {
            crash_times: Vec::new(),
            open_until: None,
            respawn_in_flight: false,
        }];
        // The receiver is bound (not `_`) so the channel stays open for the
        // duration of the call.
        let (respawn_tx, _respawn_rx) = mpsc::channel(8);
        let mut respawn_tasks = tokio::task::JoinSet::new();
        let observer = ObserverHandle::in_process();

        let result = PromptResult {
            agent,
            source: PromptSource::Channel(Uuid::new_v4()),
            outcome,
            batch: None,
        };

        handle_prompt_result(
            &mut pool,
            &mut queue,
            &config,
            result,
            &mut heartbeat_in_flight,
            &removed_channels,
            &mut crash_history,
            &respawn_tx,
            &mut respawn_tasks,
            Some(observer.clone()),
        );

        let feed = observer.snapshot();
        feed.iter()
            .filter(|event| event.kind == "turn_error")
            .count()
    }

    /// An agent that exited mid-prompt is reported once on the feed.
    #[tokio::test]
    async fn agent_exited_emits_exactly_one_feed_event() {
        let emitted = turn_errors_emitted_for(PromptOutcome::AgentExited).await;
        assert_eq!(emitted, 1);
    }

    /// A timed-out turn is reported once on the feed.
    #[tokio::test]
    async fn timeout_emits_exactly_one_feed_event() {
        let emitted = turn_errors_emitted_for(PromptOutcome::Timeout).await;
        assert_eq!(emitted, 1);
    }

    /// A transport-level (`AcpError::Io`) failure is reported once on the feed.
    #[tokio::test]
    async fn transport_error_emits_exactly_one_feed_event() {
        let io = AcpError::Io(std::io::Error::other("pipe broke"));
        let emitted = turn_errors_emitted_for(PromptOutcome::Error(io)).await;
        assert_eq!(emitted, 1);
    }

    /// An application-level (`AcpError::IdleTimeout`) failure is reported once
    /// on the feed.
    #[tokio::test]
    async fn application_error_emits_exactly_one_feed_event() {
        let app = AcpError::IdleTimeout(std::time::Duration::from_secs(1));
        let emitted = turn_errors_emitted_for(PromptOutcome::Error(app)).await;
        assert_eq!(emitted, 1);
    }
}
119 changes: 0 additions & 119 deletions crates/buzz-acp/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,6 @@ pub enum RelayError {

#[error("Unexpected message: {0}")]
UnexpectedMessage(String),

#[error("Event build error: {0}")]
EventBuild(String),
}

impl From<nostr::event::builder::Error> for RelayError {
Expand Down Expand Up @@ -778,44 +775,6 @@ impl HarnessRelay {
Ok(event)
}

/// Assemble and sign the channel message (kind:9) used as a death notice,
/// i.e. the message posted when the agent session ends due to timeout or
/// unexpected exit.
///
/// The event always carries an `h` tag holding `channel_id`. When
/// `thread_root` is `Some`, an `e` tag with an empty relay hint and the
/// `root` marker is appended so the message is threaded as a reply.
///
/// # Errors
///
/// Returns [`RelayError::EventBuild`] when either tag fails to parse.
/// Signing failures are propagated through `?`.
pub fn build_death_notice(
    &self,
    channel_id: Uuid,
    message: &str,
    thread_root: Option<&str>,
) -> Result<Event, RelayError> {
    use buzz_core::kind::KIND_STREAM_MESSAGE;

    // The channel tag is parsed first so its error takes precedence.
    let channel = channel_id.to_string();
    let channel_tag = Tag::parse(["h", channel.as_str()])
        .map_err(|err| RelayError::EventBuild(err.to_string()))?;

    // Optional thread-root tag; `None` when the notice is not threaded.
    let root_tag = thread_root
        .map(|root_id| Tag::parse(["e", root_id, "", "root"]))
        .transpose()
        .map_err(|err| RelayError::EventBuild(err.to_string()))?;

    let tags: Vec<_> = std::iter::once(channel_tag).chain(root_tag).collect();

    let signed = EventBuilder::new(Kind::Custom(KIND_STREAM_MESSAGE as u16), message)
        .tags(tags)
        .sign_with_keys(&self.keys)?;
    Ok(signed)
}

/// Best-effort publication of a death notice.
///
/// Builds the notice with `build_death_notice` and hands it to
/// `try_publish_event`. A failure at either step is logged with `warn!` and
/// otherwise ignored, so this never returns an error to the caller.
pub fn publish_death_notice(&self, channel_id: Uuid, message: &str, thread_root: Option<&str>) {
    let event = match self.build_death_notice(channel_id, message, thread_root) {
        Ok(event) => event,
        Err(e) => {
            warn!("failed to build death notice: {e}");
            return;
        }
    };

    if let Err(e) = self.try_publish_event(event) {
        warn!("failed to publish death notice: {e}");
    }
}

/// Set the startup watermark timestamp (Finding #22).
///
/// Call this once after `connect()` with the Unix timestamp captured just
Expand Down Expand Up @@ -3519,82 +3478,4 @@ mod tests {
"after backpressure removal, replay must be accepted"
);
}

// ── Death-notice builder (PR #935 fix: reply→root marker) ─────────────────

/// Build a bare-bones `HarnessRelay` for unit tests of `build_death_notice`.
///
/// Only the freshly generated `keys` matter to those tests. The remaining
/// fields are placeholders: the channel counterparts are dropped when this
/// function returns, and no background task or auth tag is attached.
fn make_test_relay() -> HarnessRelay {
    let (cmd_tx, _cmd_rx) = mpsc::channel(1);
    let (_event_tx, event_rx) = mpsc::channel(1);

    let http = reqwest::Client::new();
    let keys = nostr::Keys::generate();

    HarnessRelay {
        event_rx,
        observer_control_rx: None,
        cmd_tx,
        http,
        relay_url: "ws://localhost:3000".into(),
        keys,
        auth_tag: None,
        bg_handle: None,
    }
}

/// A threaded death notice carries exactly one `e` tag, shaped as
/// `["e", <root id>, "", "root"]`.
#[test]
fn death_notice_has_root_marker_not_reply() {
    let root_id = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890";
    let relay = make_test_relay();
    let channel_id = Uuid::new_v4();

    let event = relay
        .build_death_notice(channel_id, "agent timed out", Some(root_id))
        .expect("build_death_notice should succeed");

    // The NIP-10 e-tag must use the "root" marker (not "reply").
    let e_tags: Vec<_> = event
        .tags
        .iter()
        .filter(|tag| tag.as_slice().first().map(|name| name.as_str()) == Some("e"))
        .collect();
    assert_eq!(e_tags.len(), 1, "expected exactly one e-tag");

    let fields = e_tags[0].as_slice();
    assert_eq!(fields[1], root_id, "e-tag should reference the root event");
    assert_eq!(fields[2], "", "relay hint should be empty");
    assert_eq!(fields[3], "root", "marker must be 'root', not 'reply'");
}

/// Every death notice carries exactly one `h` tag whose value is the
/// channel UUID rendered as a string.
#[test]
fn death_notice_has_channel_h_tag() {
    let relay = make_test_relay();
    let channel_id = Uuid::new_v4();

    let event = relay
        .build_death_notice(channel_id, "agent crashed", None)
        .expect("build_death_notice should succeed");

    let h_tags: Vec<_> = event
        .tags
        .iter()
        .filter(|tag| tag.as_slice().first().map(|name| name.as_str()) == Some("h"))
        .collect();
    assert_eq!(h_tags.len(), 1, "expected exactly one h-tag");

    let fields = h_tags[0].as_slice();
    assert_eq!(
        fields[1],
        channel_id.to_string(),
        "h-tag should contain the channel UUID"
    );
}

/// With no thread root supplied, the death notice must not contain any
/// `e` tag.
#[test]
fn death_notice_without_thread_root_has_no_e_tag() {
    let relay = make_test_relay();
    let channel_id = Uuid::new_v4();

    let event = relay
        .build_death_notice(channel_id, "agent exited", None)
        .expect("build_death_notice should succeed");

    let has_e_tag = event
        .tags
        .iter()
        .any(|tag| tag.as_slice().first().map(|name| name.as_str()) == Some("e"));
    assert!(!has_e_tag, "no e-tag when thread_root is None");
}
}