-
Notifications
You must be signed in to change notification settings - Fork 14
refactor: accumulator consumes EventFrame instead of inline JSON #52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
74890a9
eaca01f
6f08ef4
72e430a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,11 +12,12 @@ use std::sync::mpsc; | |
|
|
||
| use futures::{Stream, StreamExt}; | ||
|
|
||
| use crate::events::{EventFrame, EventPayload, SSEEventType, normalize_sse_line}; | ||
| use crate::executor::error::{ExecutorError, ExecutorResult}; | ||
| use crate::types::event::{MessageStatus, ResponseStatus, SSEEventType}; | ||
| use crate::types::event::{MessageStatus, ResponseStatus}; | ||
| use crate::types::io::{OutputItem, OutputMessage, OutputTextContent, ResponseUsage}; | ||
| use crate::types::request_response::{IncompleteDetails, ResponsePayload}; | ||
| use crate::utils::common::{deserialize_from_str, deserialize_from_value, deserialize_from_value_opt}; | ||
| use crate::utils::common::{deserialize_from_str, deserialize_from_value_opt}; | ||
| use crate::utils::uuid7_str; | ||
|
|
||
| /// Accumulates LLM response chunks from streaming or non-streaming sources. | ||
|
|
@@ -178,45 +179,43 @@ impl ResponseAccumulator { | |
| /// | ||
| /// Non-`data:` lines, `[DONE]`, and malformed JSON are silently skipped. | ||
| fn process_sse_line(&mut self, line: &str) { | ||
| let Some(data_str) = line.strip_prefix("data: ") else { | ||
| return; | ||
| }; | ||
| if data_str == "[DONE]" { | ||
| return; | ||
| if let Some(frame) = normalize_sse_line(line) { | ||
| self.process_event(&frame); | ||
| } | ||
| let Ok(json) = deserialize_from_str::<serde_json::Value>(data_str) else { | ||
| return; | ||
| }; | ||
| } | ||
|
|
||
| match json["type"] | ||
| .as_str() | ||
| .map_or(SSEEventType::Other, |s| s.parse().unwrap_or_default()) | ||
| { | ||
| SSEEventType::ResponseCreated => { | ||
| if let Some(id) = json["response"]["id"].as_str() { | ||
| self.response_id = id.to_string(); | ||
| } | ||
| /// Processes a typed [`EventFrame`], updating accumulator state. | ||
| /// | ||
| /// This is the core state machine — callers that already have a normalized | ||
| /// frame (e.g. [`StreamTee`](future)) can call this directly without | ||
| /// re-parsing from a raw line. | ||
| pub(crate) fn process_event(&mut self, frame: &EventFrame) { | ||
| match (&frame.event_type, &frame.payload) { | ||
| (SSEEventType::ResponseCreated, EventPayload::Response { id, .. }) if !id.is_empty() => { | ||
| self.response_id.clone_from(id); | ||
| } | ||
| SSEEventType::ResponseOutputItemAdded => { | ||
| (SSEEventType::OutputItemAdded, EventPayload::OutputItemAdded { item_id, .. }) => { | ||
| self.finalize_current_message(); | ||
| let item_id = json["item"]["id"] | ||
| .as_str() | ||
| .map_or_else(|| uuid7_str("msg_"), str::to_string); | ||
| self.current_message = Some(OutputMessage::new(&item_id, MessageStatus::InProgress.as_str())); | ||
| let id = if item_id.is_empty() { | ||
| uuid7_str("msg_") | ||
| } else { | ||
| item_id.clone() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same deal — need an owned String since we borrow the frame. Removed the |
||
| }; | ||
| self.current_message = Some(OutputMessage::new(id, MessageStatus::InProgress.as_str())); | ||
| } | ||
| SSEEventType::ResponseOutputTextDelta => { | ||
| if let Some(delta) = json["delta"].as_str() { | ||
| self.accumulated_text.push_str(delta); | ||
| } | ||
| (SSEEventType::OutputTextDelta, EventPayload::TextDelta { delta, .. }) => { | ||
| self.accumulated_text.push_str(delta); | ||
| } | ||
| SSEEventType::ResponseDone => { | ||
| (SSEEventType::ResponseCompleted, EventPayload::Response { usage, .. }) => { | ||
| self.finalize_current_message(); | ||
| self.status = ResponseStatus::Completed; | ||
| if let Ok(usage) = deserialize_from_value::<ResponseUsage>(json["response"]["usage"].clone()) { | ||
| self.usage = Some(usage); | ||
| if let Some(u) = usage { | ||
| if let Ok(parsed) = serde_json::from_value::<ResponseUsage>(u.clone()) { | ||
| self.usage = Some(parsed); | ||
| } | ||
| } | ||
| } | ||
| SSEEventType::Other => {} | ||
| _ => {} | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -326,4 +325,144 @@ mod tests { | |
| assert_eq!(MessageStatus::Completed.as_str(), "completed"); | ||
| assert_eq!(MessageStatus::InProgress.as_str(), "in_progress"); | ||
| } | ||
|
|
||
| // --- process_event tests (exercises the refactored path directly) --- | ||
|
|
||
| /// Feeding a `ResponseCreated` `EventFrame` sets the `response_id` on the accumulator. | ||
| #[test] | ||
| fn test_process_event_response_created_sets_id() { | ||
| let mut acc = ResponseAccumulator::new("resp_old".into(), None); | ||
| let frame = EventFrame { | ||
| event_type: SSEEventType::ResponseCreated, | ||
| payload: EventPayload::Response { | ||
| id: "resp_new".into(), | ||
| status: "in_progress".into(), | ||
| usage: None, | ||
| }, | ||
| sequence_number: Some(0), | ||
| }; | ||
|
|
||
| acc.process_event(&frame); | ||
| assert_eq!(acc.response_id, "resp_new"); | ||
| } | ||
|
|
||
| /// `ResponseCreated` with empty id should NOT overwrite the existing `response_id`. | ||
| #[test] | ||
| fn test_process_event_response_created_empty_id_no_overwrite() { | ||
| let mut acc = ResponseAccumulator::new("resp_keep".into(), None); | ||
| let frame = EventFrame { | ||
| event_type: SSEEventType::ResponseCreated, | ||
| payload: EventPayload::Response { | ||
| id: String::new(), | ||
| status: "in_progress".into(), | ||
| usage: None, | ||
| }, | ||
| sequence_number: Some(0), | ||
| }; | ||
|
|
||
| acc.process_event(&frame); | ||
| assert_eq!(acc.response_id, "resp_keep"); | ||
| } | ||
|
|
||
| /// `TextDelta` events accumulate text which gets attached to the current message. | ||
| #[test] | ||
| fn test_process_event_text_delta_accumulates() { | ||
| let mut acc = ResponseAccumulator::new("resp_1".into(), None); | ||
|
|
||
| // Start a message | ||
| acc.process_event(&EventFrame { | ||
| event_type: SSEEventType::OutputItemAdded, | ||
| payload: EventPayload::OutputItemAdded { | ||
| item_id: "msg_1".into(), | ||
| item_type: "message".into(), | ||
| output_index: 0, | ||
| name: None, | ||
| call_id: None, | ||
| }, | ||
| sequence_number: Some(1), | ||
| }); | ||
|
|
||
| // Feed deltas | ||
| acc.process_event(&EventFrame { | ||
| event_type: SSEEventType::OutputTextDelta, | ||
| payload: EventPayload::TextDelta { | ||
| delta: "Hello".into(), | ||
| item_id: "msg_1".into(), | ||
| output_index: 0, | ||
| content_index: 0, | ||
| }, | ||
| sequence_number: Some(2), | ||
| }); | ||
| acc.process_event(&EventFrame { | ||
| event_type: SSEEventType::OutputTextDelta, | ||
| payload: EventPayload::TextDelta { | ||
| delta: " world".into(), | ||
| item_id: "msg_1".into(), | ||
| output_index: 0, | ||
| content_index: 0, | ||
| }, | ||
| sequence_number: Some(3), | ||
| }); | ||
|
|
||
| // Finalize | ||
| acc.process_event(&EventFrame { | ||
| event_type: SSEEventType::ResponseCompleted, | ||
| payload: EventPayload::Response { | ||
| id: "resp_1".into(), | ||
| status: "completed".into(), | ||
| usage: None, | ||
| }, | ||
| sequence_number: Some(4), | ||
| }); | ||
|
|
||
| assert_eq!(acc.status, ResponseStatus::Completed); | ||
| assert_eq!(acc.output.len(), 1); | ||
| if let OutputItem::Message(msg) = &acc.output[0] { | ||
| assert_eq!(msg.content[0].text, "Hello world"); | ||
| } else { | ||
| panic!("expected Message"); | ||
| } | ||
| } | ||
|
|
||
| /// `ResponseCompleted` with usage extracts token counts correctly. | ||
| #[test] | ||
| fn test_process_event_completed_with_usage() { | ||
| let mut acc = ResponseAccumulator::new("resp_1".into(), None); | ||
| let frame = EventFrame { | ||
| event_type: SSEEventType::ResponseCompleted, | ||
| payload: EventPayload::Response { | ||
| id: "resp_1".into(), | ||
| status: "completed".into(), | ||
| usage: Some(serde_json::json!({ | ||
| "input_tokens": 10, | ||
| "output_tokens": 5, | ||
| "total_tokens": 15 | ||
| })), | ||
| }, | ||
| sequence_number: Some(9), | ||
| }; | ||
|
|
||
| acc.process_event(&frame); | ||
| assert_eq!(acc.status, ResponseStatus::Completed); | ||
| assert!(acc.usage.is_some()); | ||
| assert_eq!(acc.usage.unwrap().total_tokens, 15); | ||
| } | ||
|
|
||
| /// Unknown/unhandled event types are silently ignored — no panic or state change. | ||
| /// Verifies the wildcard `_ => {}` arm works correctly. | ||
| #[test] | ||
| fn test_process_event_unknown_payload_ignored() { | ||
| let mut acc = ResponseAccumulator::new("resp_1".into(), None); | ||
| let frame = EventFrame { | ||
| event_type: SSEEventType::ContentPartAdded, | ||
| payload: EventPayload::Raw(serde_json::json!({"type": "response.content_part.added"})), | ||
| sequence_number: Some(3), | ||
| }; | ||
|
|
||
| acc.process_event(&frame); | ||
| // No state change — still initial state | ||
| assert_eq!(acc.response_id, "resp_1"); | ||
| assert_eq!(acc.status, ResponseStatus::InProgress); | ||
| assert!(acc.output.is_empty()); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the
cloningnecessary? can it be avoided?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah the clone is unavoidable here since we borrow
&EventFrame(soStreamTeecan inspect the frame after).clone_fromat least reuses the existing buffer. If we later find accumulator is the sole consumer we can switch to taking the frame by value and destructure instead.