diff --git a/crates/agentic-core/src/executor/accumulator.rs b/crates/agentic-core/src/executor/accumulator.rs index 5b94e8e..f64b53d 100644 --- a/crates/agentic-core/src/executor/accumulator.rs +++ b/crates/agentic-core/src/executor/accumulator.rs @@ -12,11 +12,12 @@ use std::sync::mpsc; use futures::{Stream, StreamExt}; +use crate::events::{EventFrame, EventPayload, SSEEventType, normalize_sse_line}; use crate::executor::error::{ExecutorError, ExecutorResult}; -use crate::types::event::{MessageStatus, ResponseStatus, SSEEventType}; +use crate::types::event::{MessageStatus, ResponseStatus}; use crate::types::io::{OutputItem, OutputMessage, OutputTextContent, ResponseUsage}; use crate::types::request_response::{IncompleteDetails, ResponsePayload}; -use crate::utils::common::{deserialize_from_str, deserialize_from_value, deserialize_from_value_opt}; +use crate::utils::common::{deserialize_from_str, deserialize_from_value_opt}; use crate::utils::uuid7_str; /// Accumulates LLM response chunks from streaming or non-streaming sources. @@ -178,45 +179,43 @@ impl ResponseAccumulator { /// /// Non-`data:` lines, `[DONE]`, and malformed JSON are silently skipped. fn process_sse_line(&mut self, line: &str) { - let Some(data_str) = line.strip_prefix("data: ") else { - return; - }; - if data_str == "[DONE]" { - return; + if let Some(frame) = normalize_sse_line(line) { + self.process_event(&frame); } - let Ok(json) = deserialize_from_str::(data_str) else { - return; - }; + } - match json["type"] - .as_str() - .map_or(SSEEventType::Other, |s| s.parse().unwrap_or_default()) - { - SSEEventType::ResponseCreated => { - if let Some(id) = json["response"]["id"].as_str() { - self.response_id = id.to_string(); - } + /// Processes a typed [`EventFrame`], updating accumulator state. + /// + /// This is the core state machine — callers that already have a normalized + /// frame (e.g. [`StreamTee`](future)) can call this directly without + /// re-parsing from a raw line. + pub(crate) fn process_event(&mut self, frame: &EventFrame) { + match (&frame.event_type, &frame.payload) { + (SSEEventType::ResponseCreated, EventPayload::Response { id, .. }) if !id.is_empty() => { + self.response_id.clone_from(id); } - SSEEventType::ResponseOutputItemAdded => { + (SSEEventType::OutputItemAdded, EventPayload::OutputItemAdded { item_id, .. }) => { self.finalize_current_message(); - let item_id = json["item"]["id"] - .as_str() - .map_or_else(|| uuid7_str("msg_"), str::to_string); - self.current_message = Some(OutputMessage::new(&item_id, MessageStatus::InProgress.as_str())); + let id = if item_id.is_empty() { + uuid7_str("msg_") + } else { + item_id.clone() + }; + self.current_message = Some(OutputMessage::new(id, MessageStatus::InProgress.as_str())); } - SSEEventType::ResponseOutputTextDelta => { - if let Some(delta) = json["delta"].as_str() { - self.accumulated_text.push_str(delta); - } + (SSEEventType::OutputTextDelta, EventPayload::TextDelta { delta, .. }) => { + self.accumulated_text.push_str(delta); } - SSEEventType::ResponseDone => { + (SSEEventType::ResponseCompleted, EventPayload::Response { usage, .. }) => { self.finalize_current_message(); self.status = ResponseStatus::Completed; - if let Ok(usage) = deserialize_from_value::(json["response"]["usage"].clone()) { - self.usage = Some(usage); + if let Some(u) = usage { + if let Ok(parsed) = serde_json::from_value::(u.clone()) { + self.usage = Some(parsed); + } } } - SSEEventType::Other => {} + _ => {} } } @@ -326,4 +325,144 @@ mod tests { assert_eq!(MessageStatus::Completed.as_str(), "completed"); assert_eq!(MessageStatus::InProgress.as_str(), "in_progress"); } + + // --- process_event tests (exercises the refactored path directly) --- + + /// Feeding a `ResponseCreated` `EventFrame` sets the `response_id` on the accumulator. + #[test] + fn test_process_event_response_created_sets_id() { + let mut acc = ResponseAccumulator::new("resp_old".into(), None); + let frame = EventFrame { + event_type: SSEEventType::ResponseCreated, + payload: EventPayload::Response { + id: "resp_new".into(), + status: "in_progress".into(), + usage: None, + }, + sequence_number: Some(0), + }; + + acc.process_event(&frame); + assert_eq!(acc.response_id, "resp_new"); + } + + /// `ResponseCreated` with empty id should NOT overwrite the existing `response_id`. + #[test] + fn test_process_event_response_created_empty_id_no_overwrite() { + let mut acc = ResponseAccumulator::new("resp_keep".into(), None); + let frame = EventFrame { + event_type: SSEEventType::ResponseCreated, + payload: EventPayload::Response { + id: String::new(), + status: "in_progress".into(), + usage: None, + }, + sequence_number: Some(0), + }; + + acc.process_event(&frame); + assert_eq!(acc.response_id, "resp_keep"); + } + + /// `TextDelta` events accumulate text which gets attached to the current message. + #[test] + fn test_process_event_text_delta_accumulates() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + // Start a message + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "msg_1".into(), + item_type: "message".into(), + output_index: 0, + name: None, + call_id: None, + }, + sequence_number: Some(1), + }); + + // Feed deltas + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputTextDelta, + payload: EventPayload::TextDelta { + delta: "Hello".into(), + item_id: "msg_1".into(), + output_index: 0, + content_index: 0, + }, + sequence_number: Some(2), + }); + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputTextDelta, + payload: EventPayload::TextDelta { + delta: " world".into(), + item_id: "msg_1".into(), + output_index: 0, + content_index: 0, + }, + sequence_number: Some(3), + }); + + // Finalize + acc.process_event(&EventFrame { + event_type: SSEEventType::ResponseCompleted, + payload: EventPayload::Response { + id: "resp_1".into(), + status: "completed".into(), + usage: None, + }, + sequence_number: Some(4), + }); + + assert_eq!(acc.status, ResponseStatus::Completed); + assert_eq!(acc.output.len(), 1); + if let OutputItem::Message(msg) = &acc.output[0] { + assert_eq!(msg.content[0].text, "Hello world"); + } else { + panic!("expected Message"); + } + } + + /// `ResponseCompleted` with usage extracts token counts correctly. + #[test] + fn test_process_event_completed_with_usage() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + let frame = EventFrame { + event_type: SSEEventType::ResponseCompleted, + payload: EventPayload::Response { + id: "resp_1".into(), + status: "completed".into(), + usage: Some(serde_json::json!({ + "input_tokens": 10, + "output_tokens": 5, + "total_tokens": 15 + })), + }, + sequence_number: Some(9), + }; + + acc.process_event(&frame); + assert_eq!(acc.status, ResponseStatus::Completed); + assert!(acc.usage.is_some()); + assert_eq!(acc.usage.unwrap().total_tokens, 15); + } + + /// Unknown/unhandled event types are silently ignored — no panic or state change. + /// Verifies the wildcard `_ => {}` arm works correctly. + #[test] + fn test_process_event_unknown_payload_ignored() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + let frame = EventFrame { + event_type: SSEEventType::ContentPartAdded, + payload: EventPayload::Raw(serde_json::json!({"type": "response.content_part.added"})), + sequence_number: Some(3), + }; + + acc.process_event(&frame); + // No state change — still initial state + assert_eq!(acc.response_id, "resp_1"); + assert_eq!(acc.status, ResponseStatus::InProgress); + assert!(acc.output.is_empty()); + } }