Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 170 additions & 31 deletions crates/agentic-core/src/executor/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use std::sync::mpsc;

use futures::{Stream, StreamExt};

use crate::events::{EventFrame, EventPayload, SSEEventType, normalize_sse_line};
use crate::executor::error::{ExecutorError, ExecutorResult};
use crate::types::event::{MessageStatus, ResponseStatus, SSEEventType};
use crate::types::event::{MessageStatus, ResponseStatus};
use crate::types::io::{OutputItem, OutputMessage, OutputTextContent, ResponseUsage};
use crate::types::request_response::{IncompleteDetails, ResponsePayload};
use crate::utils::common::{deserialize_from_str, deserialize_from_value, deserialize_from_value_opt};
use crate::utils::common::{deserialize_from_str, deserialize_from_value_opt};
use crate::utils::uuid7_str;

/// Accumulates LLM response chunks from streaming or non-streaming sources.
Expand Down Expand Up @@ -178,45 +179,43 @@ impl ResponseAccumulator {
///
/// Non-`data:` lines, `[DONE]`, and malformed JSON are silently skipped.
fn process_sse_line(&mut self, line: &str) {
let Some(data_str) = line.strip_prefix("data: ") else {
return;
};
if data_str == "[DONE]" {
return;
if let Some(frame) = normalize_sse_line(line) {
self.process_event(&frame);
}
let Ok(json) = deserialize_from_str::<serde_json::Value>(data_str) else {
return;
};
}

match json["type"]
.as_str()
.map_or(SSEEventType::Other, |s| s.parse().unwrap_or_default())
{
SSEEventType::ResponseCreated => {
if let Some(id) = json["response"]["id"].as_str() {
self.response_id = id.to_string();
}
/// Processes a typed [`EventFrame`], updating accumulator state.
///
/// This is the core state machine — callers that already have a normalized
/// frame (e.g. [`StreamTee`](future)) can call this directly without
/// re-parsing from a raw line.
pub(crate) fn process_event(&mut self, frame: &EventFrame) {
match (&frame.event_type, &frame.payload) {
(SSEEventType::ResponseCreated, EventPayload::Response { id, .. }) if !id.is_empty() => {
self.response_id.clone_from(id);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the cloning necessary? can it be avoided?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the clone is unavoidable here since we borrow &EventFrame (so StreamTee can inspect the frame after). clone_from at least reuses the existing buffer. If we later find accumulator is the sole consumer we can switch to taking the frame by value and destructure instead.

}
SSEEventType::ResponseOutputItemAdded => {
(SSEEventType::OutputItemAdded, EventPayload::OutputItemAdded { item_id, .. }) => {
self.finalize_current_message();
let item_id = json["item"]["id"]
.as_str()
.map_or_else(|| uuid7_str("msg_"), str::to_string);
self.current_message = Some(OutputMessage::new(&item_id, MessageStatus::InProgress.as_str()));
let id = if item_id.is_empty() {
uuid7_str("msg_")
} else {
item_id.clone()

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is item_id.clone() necessary here ? also could write in a single line instead of if/else braces.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same deal — need an owned String since we borrow the frame. Removed the &id in the OutputMessage::new call though (72e430a) so the String moves in directly instead of re-allocating from a ref. Re single line — rustfmt expands it back at our max_width so left it as-is.

};
self.current_message = Some(OutputMessage::new(id, MessageStatus::InProgress.as_str()));
}
SSEEventType::ResponseOutputTextDelta => {
if let Some(delta) = json["delta"].as_str() {
self.accumulated_text.push_str(delta);
}
(SSEEventType::OutputTextDelta, EventPayload::TextDelta { delta, .. }) => {
self.accumulated_text.push_str(delta);
}
SSEEventType::ResponseDone => {
(SSEEventType::ResponseCompleted, EventPayload::Response { usage, .. }) => {
self.finalize_current_message();
self.status = ResponseStatus::Completed;
if let Ok(usage) = deserialize_from_value::<ResponseUsage>(json["response"]["usage"].clone()) {
self.usage = Some(usage);
if let Some(u) = usage {
if let Ok(parsed) = serde_json::from_value::<ResponseUsage>(u.clone()) {
self.usage = Some(parsed);
}
}
}
SSEEventType::Other => {}
_ => {}
}
}

Expand Down Expand Up @@ -326,4 +325,144 @@ mod tests {
assert_eq!(MessageStatus::Completed.as_str(), "completed");
assert_eq!(MessageStatus::InProgress.as_str(), "in_progress");
}

// --- process_event tests (exercises the refactored path directly) ---

/// Feeding a `ResponseCreated` `EventFrame` sets the `response_id` on the accumulator.
#[test]
fn test_process_event_response_created_sets_id() {
let mut acc = ResponseAccumulator::new("resp_old".into(), None);
let frame = EventFrame {
event_type: SSEEventType::ResponseCreated,
payload: EventPayload::Response {
id: "resp_new".into(),
status: "in_progress".into(),
usage: None,
},
sequence_number: Some(0),
};

acc.process_event(&frame);
assert_eq!(acc.response_id, "resp_new");
}

/// `ResponseCreated` with empty id should NOT overwrite the existing `response_id`.
#[test]
fn test_process_event_response_created_empty_id_no_overwrite() {
let mut acc = ResponseAccumulator::new("resp_keep".into(), None);
let frame = EventFrame {
event_type: SSEEventType::ResponseCreated,
payload: EventPayload::Response {
id: String::new(),
status: "in_progress".into(),
usage: None,
},
sequence_number: Some(0),
};

acc.process_event(&frame);
assert_eq!(acc.response_id, "resp_keep");
}

/// `TextDelta` events accumulate text which gets attached to the current message.
#[test]
fn test_process_event_text_delta_accumulates() {
let mut acc = ResponseAccumulator::new("resp_1".into(), None);

// Start a message
acc.process_event(&EventFrame {
event_type: SSEEventType::OutputItemAdded,
payload: EventPayload::OutputItemAdded {
item_id: "msg_1".into(),
item_type: "message".into(),
output_index: 0,
name: None,
call_id: None,
},
sequence_number: Some(1),
});

// Feed deltas
acc.process_event(&EventFrame {
event_type: SSEEventType::OutputTextDelta,
payload: EventPayload::TextDelta {
delta: "Hello".into(),
item_id: "msg_1".into(),
output_index: 0,
content_index: 0,
},
sequence_number: Some(2),
});
acc.process_event(&EventFrame {
event_type: SSEEventType::OutputTextDelta,
payload: EventPayload::TextDelta {
delta: " world".into(),
item_id: "msg_1".into(),
output_index: 0,
content_index: 0,
},
sequence_number: Some(3),
});

// Finalize
acc.process_event(&EventFrame {
event_type: SSEEventType::ResponseCompleted,
payload: EventPayload::Response {
id: "resp_1".into(),
status: "completed".into(),
usage: None,
},
sequence_number: Some(4),
});

assert_eq!(acc.status, ResponseStatus::Completed);
assert_eq!(acc.output.len(), 1);
if let OutputItem::Message(msg) = &acc.output[0] {
assert_eq!(msg.content[0].text, "Hello world");
} else {
panic!("expected Message");
}
}

/// `ResponseCompleted` with usage extracts token counts correctly.
#[test]
fn test_process_event_completed_with_usage() {
let mut acc = ResponseAccumulator::new("resp_1".into(), None);
let frame = EventFrame {
event_type: SSEEventType::ResponseCompleted,
payload: EventPayload::Response {
id: "resp_1".into(),
status: "completed".into(),
usage: Some(serde_json::json!({
"input_tokens": 10,
"output_tokens": 5,
"total_tokens": 15
})),
},
sequence_number: Some(9),
};

acc.process_event(&frame);
assert_eq!(acc.status, ResponseStatus::Completed);
assert!(acc.usage.is_some());
assert_eq!(acc.usage.unwrap().total_tokens, 15);
}

/// Unknown/unhandled event types are silently ignored — no panic or state change.
/// Verifies the wildcard `_ => {}` arm works correctly.
#[test]
fn test_process_event_unknown_payload_ignored() {
let mut acc = ResponseAccumulator::new("resp_1".into(), None);
let frame = EventFrame {
event_type: SSEEventType::ContentPartAdded,
payload: EventPayload::Raw(serde_json::json!({"type": "response.content_part.added"})),
sequence_number: Some(3),
};

acc.process_event(&frame);
// No state change — still initial state
assert_eq!(acc.response_id, "resp_1");
assert_eq!(acc.status, ResponseStatus::InProgress);
assert!(acc.output.is_empty());
}
}