refactor: accumulator consumes EventFrame instead of inline JSON#52
Conversation
Replaces inline JSON parsing in process_sse_line with: - normalize_sse_line() from events/ module → typed EventFrame - New pub process_event(&EventFrame) method for direct consumption process_sse_line is now a thin wrapper — backwards compatible. process_event is pub for future StreamTee to call directly. No behavioral change — same output for same input. Pure refactor that wires the events/ module (PR vllm-project#49) into the executor (PR vllm-project#46). Signed-off-by: Ashwin Giridharan <girida@amazon.com>
- Change pub → pub(crate) for process_event (internal to crate only) - Match (SSEEventType::OutputItemAdded, EventPayload::OutputItemAdded) explicitly instead of wildcard (defensive correctness) - Same for (SSEEventType::OutputTextDelta, EventPayload::TextDelta) Signed-off-by: Ashwin Giridharan <girida@amazon.com>
5 new tests exercising the refactored process_event method directly: - ResponseCreated sets response_id - ResponseCreated with empty id doesn't overwrite - TextDelta accumulates and attaches to current message - ResponseCompleted extracts usage token counts - Unknown event types silently ignored Signed-off-by: Ashwin Giridharan <girida@amazon.com>
| pub(crate) fn process_event(&mut self, frame: &EventFrame) { | ||
| match (&frame.event_type, &frame.payload) { | ||
| (SSEEventType::ResponseCreated, EventPayload::Response { id, .. }) if !id.is_empty() => { | ||
| self.response_id.clone_from(id); |
There was a problem hiding this comment.
is the cloning necessary? can it be avoided?
There was a problem hiding this comment.
Yeah the clone is unavoidable here since we borrow &EventFrame (so StreamTee can inspect the frame after). clone_from at least reuses the existing buffer. If we later find accumulator is the sole consumer we can switch to taking the frame by value and destructure instead.
| let id = if item_id.is_empty() { | ||
| uuid7_str("msg_") | ||
| } else { | ||
| item_id.clone() |
There was a problem hiding this comment.
is item_id.clone() necessary here ? also could write in a single line instead of if/else braces.
There was a problem hiding this comment.
Same deal — need an owned String since we borrow the frame. Removed the &id in the OutputMessage::new call though (72e430a) so the String moves in directly instead of re-allocating from a ref. Re single line — rustfmt expands it back at our max_width so left it as-is.
maralbahari
left a comment
There was a problem hiding this comment.
@ashwing thank you for the refactor. just left some nit comments.
OutputMessage::new takes `impl Into<String>`, so passing the owned String directly avoids a second allocation from &String → String. Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Summary
Refactors
ResponseAccumulator::process_sse_lineto use the typedEventFramefrom theevents/module (PR #49) instead of inline JSON parsing. Pure behavioral refactor — same output for same input.Changes:
process_sse_lineis now a thin wrapper: callsnormalize_sse_line()→process_event()pub(crate) fn process_event(&EventFrame)— typed matching on(SSEEventType, EventPayload)pairsserde_json::Valuefield access, oldSSEEventTypeimport,deserialize_from_valueimportWhy: Enables
StreamTee(future) to callprocess_eventdirectly with pre-normalized frames — avoiding double-parsing when forwarding SSE to client while accumulating for tool detection.No behavioral change — existing cassette-based integration tests pass unchanged.
Test Plan
process_eventdirectly:ResponseCreatedsets response_idResponseCreatedwith empty id doesn't overwriteTextDeltaaccumulates and attaches to messageResponseCompletedextracts usagecargo clippy -- -D warningsclean