diff --git a/Cargo.lock b/Cargo.lock index 42e29a7d..0d6e0de7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2067,6 +2067,7 @@ dependencies = [ name = "jp_conversation" version = "0.1.0" dependencies = [ + "assert_matches", "chrono", "indexmap", "insta", diff --git a/crates/jp_conversation/Cargo.toml b/crates/jp_conversation/Cargo.toml index 1a23b089..8fc0ad7d 100644 --- a/crates/jp_conversation/Cargo.toml +++ b/crates/jp_conversation/Cargo.toml @@ -16,17 +16,17 @@ version.workspace = true jp_attachment = { workspace = true } jp_config = { workspace = true } jp_id = { workspace = true } -jp_serde = { workspace = true } - +base64 = { workspace = true, features = ["std"] } +chrono = { workspace = true } indexmap = { workspace = true } quick-xml = { workspace = true, features = ["serialize"] } serde = { workspace = true } serde_json = { workspace = true, features = ["preserve_order"] } thiserror = { workspace = true } -chrono = { workspace = true } tracing = { workspace = true } [dev-dependencies] +assert_matches = { workspace = true } insta = { workspace = true, features = ["json"] } test-log = { workspace = true } diff --git a/crates/jp_conversation/src/conversation.rs b/crates/jp_conversation/src/conversation.rs index b61aa294..5312d753 100644 --- a/crates/jp_conversation/src/conversation.rs +++ b/crates/jp_conversation/src/conversation.rs @@ -5,9 +5,8 @@ use std::{fmt, str::FromStr}; use chrono::{DateTime, Utc}; use jp_id::{ Id, NANOSECONDS_PER_DECISECOND, - parts::{GlobalId, TargetId, Variant}, + parts::{TargetId, Variant}, }; -use jp_serde::skip_if; use serde::{Deserialize, Serialize}; use crate::error::{Error, Result}; @@ -28,7 +27,7 @@ pub struct Conversation { /// Whether the conversation is stored in the user or workspace storage. // TODO: rename to `user_local` - #[serde(default, rename = "local", skip_serializing_if = "skip_if::is_false")] + #[serde(default, rename = "local", skip_serializing_if = "std::ops::Not::not")] pub user: bool, /// When the conversation expires. @@ -110,7 +109,7 @@ pub struct ConversationId(#[serde(with = "jp_id::serde")] DateTime); impl fmt::Debug for ConversationId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("ConversationId") - .field(&self.to_string()) + .field(&self.as_deciseconds()) .finish() } } @@ -236,10 +235,6 @@ impl Id for ConversationId { fn target_id(&self) -> TargetId { self.as_deciseconds().to_string().into() } - - fn global_id(&self) -> GlobalId { - jp_id::global::get().into() - } } impl fmt::Display for ConversationId { diff --git a/crates/jp_conversation/src/event.rs b/crates/jp_conversation/src/event.rs index 66a12734..63e361a3 100644 --- a/crates/jp_conversation/src/event.rs +++ b/crates/jp_conversation/src/event.rs @@ -3,6 +3,7 @@ mod chat; mod inquiry; mod tool_call; +mod turn; use std::fmt; @@ -13,8 +14,12 @@ use serde_json::{Map, Value}; pub use self::{ chat::{ChatRequest, ChatResponse}, - inquiry::{InquiryAnswerType, InquiryQuestion, InquiryRequest, InquiryResponse, InquirySource}, + inquiry::{ + InquiryAnswerType, InquiryId, InquiryQuestion, InquiryRequest, InquiryResponse, + InquirySource, SelectOption, + }, tool_call::{ToolCallRequest, ToolCallResponse}, + turn::TurnStart, }; /// A single event in a conversation. @@ -32,11 +37,7 @@ pub struct ConversationEvent { pub kind: EventKind, /// Additional opaque metadata associated with the event. - #[serde( - default, - skip_serializing_if = "Map::is_empty", - with = "jp_serde::repr::base64_json_map" - )] + #[serde(default, skip_serializing_if = "Map::is_empty")] pub metadata: Map, } @@ -305,12 +306,43 @@ impl ConversationEvent { _ => None, } } + + /// Returns `true` if the event is a [`TurnStart`]. + #[must_use] + pub const fn is_turn_start(&self) -> bool { + matches!(self.kind, EventKind::TurnStart(_)) + } + + /// Returns a reference to the [`TurnStart`], if applicable. + #[must_use] + pub const fn as_turn_start(&self) -> Option<&TurnStart> { + match &self.kind { + EventKind::TurnStart(turn_start) => Some(turn_start), + _ => None, + } + } + + /// Consumes the event and returns the [`TurnStart`], if applicable. + #[must_use] + pub fn into_turn_start(self) -> Option { + match self.kind { + EventKind::TurnStart(turn_start) => Some(turn_start), + _ => None, + } + } } /// A type of event in a conversation. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum EventKind { + /// A turn start event. + /// + /// This event marks the beginning of a new turn in the conversation. A turn + /// groups together a user's chat request through the assistant's final + /// response, including any intermediate tool calls. + TurnStart(TurnStart), + /// A chat request event. /// /// This event is usually triggered by the user, but can also be @@ -353,6 +385,22 @@ pub enum EventKind { InquiryResponse(InquiryResponse), } +impl EventKind { + /// Returns the name of the event kind. + #[must_use] + pub const fn as_str(&self) -> &str { + match self { + Self::TurnStart(_) => "TurnStart", + Self::ChatRequest(_) => "ChatRequest", + Self::ChatResponse(_) => "ChatResponse", + Self::ToolCallRequest(_) => "ToolCallRequest", + Self::ToolCallResponse(_) => "ToolCallResponse", + Self::InquiryRequest(_) => "InquiryRequest", + Self::InquiryResponse(_) => "InquiryResponse", + } + } +} + impl From for EventKind { fn from(request: ChatRequest) -> Self { Self::ChatRequest(request) @@ -389,6 +437,12 @@ impl From for EventKind { } } +impl From for EventKind { + fn from(turn_start: TurnStart) -> Self { + Self::TurnStart(turn_start) + } +} + impl From for ConversationEvent { fn from(request: ChatRequest) -> Self { Self::now(request) @@ -424,3 +478,9 @@ impl From for ConversationEvent { Self::now(response) } } + +impl From for ConversationEvent { + fn from(turn_start: TurnStart) -> Self { + Self::now(turn_start) + } +} diff --git a/crates/jp_conversation/src/event/chat.rs b/crates/jp_conversation/src/event/chat.rs index a76e946d..43f9584b 100644 --- a/crates/jp_conversation/src/event/chat.rs +++ b/crates/jp_conversation/src/event/chat.rs @@ -3,19 +3,31 @@ use std::{fmt, ops}; use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; /// A chat request event - the user's query or message. /// /// This represents the user's side of a conversation turn. #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct ChatRequest { - /// The user's query or message content + /// The user's query or message content. pub content: String, + + /// Optional JSON schema constraining the assistant's response format. + /// + /// When present, providers set their native structured output + /// configuration and the assistant's response is emitted as + /// `ChatResponse::Structured` instead of `ChatResponse::Message`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub schema: Option>, } impl From for ChatRequest { fn from(content: String) -> Self { - Self { content } + Self { + content, + schema: None, + } } } @@ -23,6 +35,7 @@ impl From<&str> for ChatRequest { fn from(content: &str) -> Self { Self { content: content.to_owned(), + schema: None, } } } @@ -55,7 +68,7 @@ impl ops::DerefMut for ChatRequest { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(untagged, rename_all = "snake_case")] pub enum ChatResponse { - /// A standard message response + /// A standard message response. Message { /// The message content. message: String, @@ -67,6 +80,17 @@ pub enum ChatResponse { /// The reasoning content. reasoning: String, }, + + /// Structured JSON response conforming to the schema from the + /// preceding `ChatRequest`. + Structured { + /// The structured JSON value. + /// + /// After flush, this is the parsed JSON (object, array, etc.). + /// During streaming, individual parts carry `Value::String` + /// chunks that are concatenated by the `EventBuilder`. + data: Value, + }, } impl ChatResponse { @@ -86,45 +110,127 @@ impl ChatResponse { } } - /// Returns the content of the response, either the message or the - /// reasoning. + /// Creates a new structured response. #[must_use] - pub fn content(&self) -> &str { - match self { - Self::Message { message, .. } => message, - Self::Reasoning { reasoning, .. } => reasoning, - } + pub fn structured(data: impl Into) -> Self { + Self::Structured { data: data.into() } + } + + /// Returns `true` if the response is a message. + #[must_use] + pub const fn is_message(&self) -> bool { + matches!(self, Self::Message { .. }) + } + + /// Returns `true` if the response is reasoning. + #[must_use] + pub const fn is_reasoning(&self) -> bool { + matches!(self, Self::Reasoning { .. }) + } + + /// Returns `true` if the response is structured data. + #[must_use] + pub const fn is_structured(&self) -> bool { + matches!(self, Self::Structured { .. }) } - /// Returns a mutable reference to the content of the response, either the - /// message or the reasoning. + /// Returns a reference to the structured JSON data, if applicable. #[must_use] - pub const fn content_mut(&mut self) -> &mut String { + pub const fn as_structured_data(&self) -> Option<&Value> { match self { - Self::Message { message, .. } => message, - Self::Reasoning { reasoning, .. } => reasoning, + Self::Structured { data } => Some(data), + _ => None, } } - /// Consumes the response and returns the content, either the message or - /// the reasoning. + /// Consumes the response and returns the structured JSON data, if + /// applicable. #[must_use] - pub fn into_content(self) -> String { + pub fn into_structured_data(self) -> Option { match self { - Self::Message { message, .. } => message, - Self::Reasoning { reasoning, .. } => reasoning, + Self::Structured { data } => Some(data), + _ => None, } } +} - /// Returns `true` if the response is a message. - #[must_use] - pub const fn is_message(&self) -> bool { - matches!(self, Self::Message { .. }) +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + use crate::{ConversationEvent, EventKind}; + + #[test] + fn chat_request_with_schema_roundtrip() { + let request = ChatRequest { + content: "Extract contacts".into(), + schema: Some(Map::from_iter([("type".into(), json!("object"))])), + }; + + let json = serde_json::to_value(&request).unwrap(); + assert_eq!(json["content"], "Extract contacts"); + assert_eq!(json["schema"]["type"], "object"); + + let deserialized: ChatRequest = serde_json::from_value(json).unwrap(); + assert_eq!(deserialized, request); } - /// Returns `true` if the response is reasoning. - #[must_use] - pub const fn is_reasoning(&self) -> bool { - matches!(self, Self::Reasoning { .. }) + #[test] + fn chat_request_without_schema_omits_field() { + let request = ChatRequest::from("hello"); + let json = serde_json::to_value(&request).unwrap(); + assert!(json.get("schema").is_none()); + } + + #[test] + fn old_chat_request_json_deserializes_with_schema_none() { + let json = json!({ "content": "hello" }); + let request: ChatRequest = serde_json::from_value(json).unwrap(); + assert_eq!(request.content, "hello"); + assert!(request.schema.is_none()); + } + + #[test] + fn structured_response_roundtrip() { + let event = ConversationEvent::now(ChatResponse::structured(json!({"name": "Alice"}))); + let json = serde_json::to_value(&event).unwrap(); + assert_eq!(json["data"]["name"], "Alice"); + + let deserialized: ConversationEvent = serde_json::from_value(json).unwrap(); + let resp = deserialized.as_chat_response().unwrap(); + assert!(resp.is_structured()); + assert_eq!(resp.as_structured_data(), Some(&json!({"name": "Alice"}))); + } + + #[test] + fn untagged_deserialization_distinguishes_variants() { + let msg_json = json!({ "message": "hello" }); + let msg: ChatResponse = serde_json::from_value(msg_json).unwrap(); + assert!(msg.is_message()); + + let reason_json = json!({ "reasoning": "let me think" }); + let reason: ChatResponse = serde_json::from_value(reason_json).unwrap(); + assert!(reason.is_reasoning()); + + let structured_json = json!({ "data": { "key": "value" } }); + let structured: ChatResponse = serde_json::from_value(structured_json).unwrap(); + assert!(structured.is_structured()); + } + + #[test] + fn structured_within_event_kind_roundtrip() { + let kind = EventKind::ChatResponse(ChatResponse::structured(json!([1, 2, 3]))); + let json = serde_json::to_value(&kind).unwrap(); + assert_eq!(json["type"], "chat_response"); + assert_eq!(json["data"], json!([1, 2, 3])); + + let deserialized: EventKind = serde_json::from_value(json).unwrap(); + match deserialized { + EventKind::ChatResponse(ChatResponse::Structured { data }) => { + assert_eq!(data, json!([1, 2, 3])); + } + other => panic!("expected Structured, got {other:?}"), + } } } diff --git a/crates/jp_conversation/src/event/inquiry.rs b/crates/jp_conversation/src/event/inquiry.rs index 16c8ec10..bcf4d3a9 100644 --- a/crates/jp_conversation/src/event/inquiry.rs +++ b/crates/jp_conversation/src/event/inquiry.rs @@ -1,8 +1,47 @@ //! See [`InquiryRequest`] and [`InquiryResponse`]. +use std::fmt; + use serde::{Deserialize, Serialize}; use serde_json::Value; +/// Opaque identifier for an inquiry. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct InquiryId(String); + +impl InquiryId { + /// Creates a new inquiry ID. + #[must_use] + pub fn new(id: impl Into) -> Self { + Self(id.into()) + } + + /// Returns the ID as a string slice. + #[must_use] + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for InquiryId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +impl From for InquiryId { + fn from(s: String) -> Self { + Self(s) + } +} + +impl From<&str> for InquiryId { + fn from(s: &str) -> Self { + Self(s.to_owned()) + } +} + /// An inquiry request event - requesting additional input or clarification. /// /// This event can be triggered by tools, the assistant, or even the user, @@ -10,10 +49,11 @@ use serde_json::Value; /// pause execution and wait for a corresponding `InquiryResponse` event. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct InquiryRequest { - /// Unique identifier for this inquiry. + /// Identifier for this inquiry. /// /// This must match the `id` in the corresponding `InquiryResponse`. - pub id: String, + /// The caller determines the ID convention. + pub id: InquiryId, /// The source of the inquiry (who is asking). pub source: InquirySource, @@ -25,9 +65,9 @@ pub struct InquiryRequest { impl InquiryRequest { /// Creates a new inquiry request. #[must_use] - pub const fn new(id: String, source: InquirySource, question: InquiryQuestion) -> Self { + pub fn new(id: impl Into, source: InquirySource, question: InquiryQuestion) -> Self { Self { - id, + id: id.into(), source, question, } @@ -98,9 +138,16 @@ impl InquiryQuestion { Self::new(text, InquiryAnswerType::Boolean) } - /// Creates a new select inquiry question. + /// Creates a new select inquiry question from `SelectOption`s. + #[must_use] + pub const fn select(text: String, options: Vec) -> Self { + Self::new(text, InquiryAnswerType::Select { options }) + } + + /// Creates a new select inquiry question from plain values (no descriptions). #[must_use] - pub const fn select(text: String, options: Vec) -> Self { + pub fn select_values(text: String, values: impl IntoIterator) -> Self { + let options = values.into_iter().map(SelectOption::from).collect(); Self::new(text, InquiryAnswerType::Select { options }) } @@ -119,17 +166,56 @@ pub enum InquiryAnswerType { Boolean, /// Select from predefined options. - /// - /// The options can be any JSON value (strings, numbers, booleans, etc.). Select { /// The available options to choose from. - options: Vec, + options: Vec, }, /// Free-form text input. Text, } +/// A single option in a select inquiry. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct SelectOption { + /// The value returned when this option is selected. + pub value: Value, + + /// Human-readable description of this option, used as help text in + /// interactive prompts and as context for assistant-targeted inquiries. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub description: Option, +} + +impl SelectOption { + /// Creates a new select option with a value and description. + #[must_use] + pub fn new(value: impl Into, description: impl Into) -> Self { + Self { + value: value.into(), + description: Some(description.into()), + } + } +} + +impl From for SelectOption { + fn from(value: Value) -> Self { + Self { + value, + description: None, + } + } +} + +impl From<&str> for SelectOption { + fn from(s: &str) -> Self { + Self { + value: s.into(), + description: None, + } + } +} + /// An inquiry response event - the answer to an inquiry request. /// /// This event MUST be in response to an `InquiryRequest` event, with a @@ -137,14 +223,14 @@ pub enum InquiryAnswerType { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct InquiryResponse { /// ID matching the corresponding `InquiryRequest`. - pub id: String, + pub id: InquiryId, /// The answer provided. /// /// The shape of this value depends on the `answer_type` of the /// corresponding inquiry: /// - `Boolean`: `Value::Bool` - /// - `Select`: `Value::String` (one of the options) + /// - `Select`: one of the option values /// - `Text`: `Value::String` pub answer: Value, } @@ -152,25 +238,28 @@ pub struct InquiryResponse { impl InquiryResponse { /// Creates a new inquiry response. #[must_use] - pub const fn new(id: String, answer: Value) -> Self { - Self { id, answer } + pub fn new(id: impl Into, answer: Value) -> Self { + Self { + id: id.into(), + answer, + } } /// Creates a new boolean inquiry response. #[must_use] - pub const fn boolean(id: String, answer: bool) -> Self { + pub fn boolean(id: impl Into, answer: bool) -> Self { Self::new(id, Value::Bool(answer)) } /// Creates a new select inquiry response. #[must_use] - pub fn select(id: String, answer: impl Into) -> Self { + pub fn select(id: impl Into, answer: impl Into) -> Self { Self::new(id, answer.into()) } /// Creates a new text inquiry response. #[must_use] - pub const fn text(id: String, answer: String) -> Self { + pub fn text(id: impl Into, answer: String) -> Self { Self::new(id, Value::String(answer)) } @@ -199,10 +288,44 @@ mod tests { use super::*; + #[test] + fn test_inquiry_id_display() { + let id = InquiryId::new("fs_modify_file.__permission__"); + assert_eq!(id.to_string(), "fs_modify_file.__permission__"); + assert_eq!(id.as_str(), "fs_modify_file.__permission__"); + } + + #[test] + fn test_inquiry_id_equality_and_hash() { + use std::collections::HashMap; + + let id1 = InquiryId::new("same"); + let id2 = InquiryId::new("same"); + let id3 = InquiryId::new("different"); + + assert_eq!(id1, id2); + assert_ne!(id1, id3); + + let mut map = HashMap::new(); + map.insert(id1, "value"); + assert_eq!(map.get(&id2), Some(&"value")); + assert_eq!(map.get(&id3), None); + } + + #[test] + fn test_inquiry_id_serialization() { + let id = InquiryId::new("test-id"); + let json = serde_json::to_value(&id).unwrap(); + assert_eq!(json, "test-id"); // transparent serialization + + let deserialized: InquiryId = serde_json::from_value(json).unwrap(); + assert_eq!(deserialized, id); + } + #[test] fn test_inquiry_request_serialization() { let request = InquiryRequest::new( - "test-id".to_string(), + "test-id", InquirySource::Tool { name: "file_editor".to_string(), }, @@ -224,7 +347,7 @@ mod tests { #[test] fn test_inquiry_response_serialization() { - let response = InquiryResponse::boolean("test-id".to_string(), true); + let response = InquiryResponse::boolean("test-id", true); let json = serde_json::to_value(&response).unwrap(); assert_eq!(json["id"], "test-id"); @@ -236,54 +359,70 @@ mod tests { #[test] fn test_inquiry_question_types() { - // Boolean let q = InquiryQuestion::boolean("Confirm?".to_string()); assert!(matches!(q.answer_type, InquiryAnswerType::Boolean)); - // Select with strings let q = InquiryQuestion::select("Choose one:".to_string(), vec![ - "option1".into(), - "option2".into(), + SelectOption::new("y", "yes"), + SelectOption::new("n", "no"), ]); - assert!(matches!(q.answer_type, InquiryAnswerType::Select { .. })); + if let InquiryAnswerType::Select { options } = &q.answer_type { + assert_eq!(options.len(), 2); + assert_eq!(options[0].value, "y"); + assert_eq!(options[1].value, "n"); + assert_eq!(options[0].description.as_deref(), Some("yes")); + assert_eq!(options[1].description.as_deref(), Some("no")); + } else { + panic!("Expected Select variant"); + } - // Select with integers - let q = InquiryQuestion::select("Choose a number:".to_string(), vec![ + let q = InquiryQuestion::select_values("Pick:".to_string(), vec![ Value::Number(1.into()), Value::Number(2.into()), - Value::Number(3.into()), ]); - if let InquiryAnswerType::Select { options } = q.answer_type { - assert_eq!(options.len(), 3); - assert_eq!(options[0], 1); - assert_eq!(options[1], 2); - assert_eq!(options[2], 3); + if let InquiryAnswerType::Select { options } = &q.answer_type { + assert_eq!(options.len(), 2); + assert_eq!(options[0].value, 1); + assert_eq!(options[1].value, 2); + assert!(options[0].description.is_none()); + assert!(options[1].description.is_none()); } else { panic!("Expected Select variant"); } - // Text let q = InquiryQuestion::text("Enter name:".to_string()); assert!(matches!(q.answer_type, InquiryAnswerType::Text)); } + #[test] + fn test_select_option_serialization() { + let opt = SelectOption::new("y", "Run tool"); + let json = serde_json::to_value(&opt).unwrap(); + assert_eq!(json["value"], "y"); + assert_eq!(json["description"], "Run tool"); + + let opt_no_desc = SelectOption::from("n"); + let json = serde_json::to_value(&opt_no_desc).unwrap(); + assert_eq!(json["value"], "n"); + assert!(json.get("description").is_none()); + + let deserialized: SelectOption = serde_json::from_value(json).unwrap(); + assert_eq!(deserialized, opt_no_desc); + } + #[test] fn test_inquiry_response_helpers() { - let response = InquiryResponse::boolean("id".to_string(), true); + let response = InquiryResponse::boolean("id", true); assert_eq!(response.as_bool(), Some(true)); assert_eq!(response.as_str(), None); - let response = InquiryResponse::text("id".to_string(), "hello".to_string()); - assert_eq!(response.as_bool(), None); + let response = InquiryResponse::text("id", "hello".to_string()); assert_eq!(response.as_str(), Some("hello")); - assert_eq!(response.as_string(), Some("hello".to_string())); - // Select with string - let response = InquiryResponse::select("id".to_string(), "option1"); + let response = InquiryResponse::select("id", "option1"); assert_eq!(response.as_str(), Some("option1")); - // Select with integer - let response = InquiryResponse::select("id".to_string(), 42); + let response = InquiryResponse::select("id", 42); assert_eq!(response.answer, 42); } } diff --git a/crates/jp_conversation/src/event/tool_call.rs b/crates/jp_conversation/src/event/tool_call.rs index 4a84d0f3..094365fd 100644 --- a/crates/jp_conversation/src/event/tool_call.rs +++ b/crates/jp_conversation/src/event/tool_call.rs @@ -36,12 +36,6 @@ impl Serialize for ToolCallRequest { where Ser: Serializer, { - #[derive(Serialize)] - #[serde(transparent)] - struct Wrapper<'a>( - #[serde(with = "jp_serde::repr::base64_json_map")] &'a Map, - ); - let mut arguments = self.arguments.clone(); let tool_answers = arguments .remove("tool_answers") @@ -59,10 +53,10 @@ impl Serialize for ToolCallRequest { state.serialize_field("id", &self.id)?; state.serialize_field("name", &self.name)?; - state.serialize_field("arguments", &Wrapper(&arguments))?; + state.serialize_field("arguments", &arguments)?; if !tool_answers.is_empty() { - state.serialize_field("tool_answers", &Wrapper(&tool_answers))?; + state.serialize_field("tool_answers", &tool_answers)?; } state.end() @@ -79,9 +73,9 @@ impl<'de> Deserialize<'de> for ToolCallRequest { struct Helper { id: String, name: String, - #[serde(default, with = "jp_serde::repr::base64_json_map")] + #[serde(default)] arguments: Map, - #[serde(default, with = "jp_serde::repr::base64_json_map")] + #[serde(default)] tool_answers: Map, } @@ -143,7 +137,6 @@ impl Serialize for ToolCallResponse { #[allow(clippy::allow_attributes, clippy::missing_docs_in_private_items)] struct Helper<'a> { id: &'a str, - #[serde(with = "jp_serde::repr::base64_string")] content: &'a str, is_error: bool, } @@ -172,7 +165,6 @@ impl<'de> Deserialize<'de> for ToolCallResponse { #[allow(clippy::allow_attributes, clippy::missing_docs_in_private_items)] struct Helper { id: String, - #[serde(with = "jp_serde::repr::base64_string")] content: String, is_error: bool, } diff --git a/crates/jp_conversation/src/event/turn.rs b/crates/jp_conversation/src/event/turn.rs new file mode 100644 index 00000000..1d2c360d --- /dev/null +++ b/crates/jp_conversation/src/event/turn.rs @@ -0,0 +1,21 @@ +//! Turn-related event types. + +use serde::{Deserialize, Serialize}; + +/// Marks the beginning of a new turn in the conversation. +/// +/// A turn groups together the sequence of events from a user's chat request +/// through the assistant's final response, including any intermediate tool +/// calls. It corresponds to a single `jp query` invocation. +/// +/// The timestamp on the enclosing [`ConversationEvent`] records when the +/// turn started. +/// +/// A [`ChatRequest`] event does NOT mark the beginning of a turn. During a +/// turn, a user might interrupt the assistant with a [`ChatRequest`], but this +/// happens within the context of a single turn. +/// +/// [`ConversationEvent`]: super::ConversationEvent +/// [`ChatRequest`]: super::ChatRequest +#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)] +pub struct TurnStart; diff --git a/crates/jp_conversation/src/event_builder.rs b/crates/jp_conversation/src/event_builder.rs new file mode 100644 index 00000000..b031961f --- /dev/null +++ b/crates/jp_conversation/src/event_builder.rs @@ -0,0 +1,336 @@ +//! Event accumulation for the query stream pipeline. +//! +//! The [`EventBuilder`] accumulates streamed `jp_llm::Event` chunks from the +//! LLM into complete [`ConversationEvent`]s. It uses index-based buffering +//! where each index represents one logical event. +//! +//! # Event Model +//! +//! LLM providers stream events with an index: +//! +//! ```text +//! Part { index: 0, ChatResponse::Reasoning("Let ") } +//! Part { index: 0, ChatResponse::Reasoning("me think") } +//! Flush { index: 0 } → Reasoning complete, pushed to stream +//! +//! Part { index: 1, ChatResponse::Message("The ") } +//! Part { index: 1, ChatResponse::Message("answer") } +//! Flush { index: 1 } → Message complete, pushed to stream +//! ``` +//! +//! # Key Properties +//! +//! - **Index-based grouping**: Parts with the same index are accumulated +//! together +//! - **Flush boundary**: `Flush { index }` signals that all parts for that +//! index are complete and should be merged into a single `ConversationEvent` +//! - **Order preservation**: Flush events arrive in index order +//! - **Tool calls may be multi-part**: Providers that stream tool calls +//! incrementally (e.g. Anthropic) emit an initial Part with name + id +//! when the tool call starts, followed by a final Part with the parsed +//! arguments once all JSON chunks have arrived. The Flush after the last +//! Part marks the tool call as complete. + +use std::collections::{HashMap, hash_map::Entry}; + +use indexmap::IndexMap; +use serde_json::{Map, Value}; +use tracing::warn; + +use crate::{ + ConversationEvent, EventKind, + event::{ChatResponse, ToolCallRequest}, +}; + +/// Accumulates streamed events into complete [`ConversationEvent`]s. +pub struct EventBuilder { + /// Index-based buffers for accumulating partial events. + buffers: HashMap, + + /// Metadata accumulated from `jp_llm::Event::Part`s, keyed by stream index. + metadata: HashMap>, +} + +impl EventBuilder { + /// Creates a new empty event builder. + #[must_use] + pub fn new() -> Self { + Self { + buffers: HashMap::new(), + metadata: HashMap::new(), + } + } + + /// Returns the partial content accumulated in unflushed buffers. + /// + /// This is used when the user interrupts streaming and chooses to continue + /// with assistant prefill. The partial content is injected into the next + /// request so the LLM can continue from where it left off. + /// + /// Returns `None` if there's no meaningful partial content. Structured + /// buffers are excluded — partial JSON isn't useful for prefill. + #[must_use] + pub fn peek_partial_content(&self) -> Option { + if self.buffers.is_empty() { + return None; + } + + // Collect content from all buffers, sorted by index for deterministic + // output + let mut indices: Vec<_> = self.buffers.keys().copied().collect(); + indices.sort_unstable(); + + let mut parts = Vec::new(); + for index in indices { + if let Some(buffer) = self.buffers.get(&index) { + match buffer { + IndexBuffer::Reasoning { content } | IndexBuffer::Message { content } + if !content.is_empty() => + { + parts.push(content.clone()); + } + // Tool calls, structured buffers, empty content - skip + _ => {} + } + } + } + + if parts.is_empty() { + None + } else { + Some(parts.join("")) + } + } + + /// Handles a partial event from the LLM stream. + /// + /// Accumulates the event content into the buffer for the given index. + pub fn handle_part(&mut self, index: usize, event: ConversationEvent) { + // Accumulate metadata from each part (e.g. thinking signatures). + if !event.metadata.is_empty() { + self.metadata + .entry(index) + .or_default() + .extend(event.metadata); + } + + match event.kind { + EventKind::ChatResponse(ChatResponse::Reasoning { reasoning }) => { + match self.buffers.entry(index) { + Entry::Occupied(mut e) => match e.get_mut().as_reasoning_mut() { + Some(content) => content.push_str(&reasoning), + None => warn_mismatch(e.get(), "Reasoning"), + }, + Entry::Vacant(e) => { + e.insert(IndexBuffer::Reasoning { content: reasoning }); + } + } + } + EventKind::ChatResponse(ChatResponse::Message { message }) => { + match self.buffers.entry(index) { + Entry::Occupied(mut e) => match e.get_mut().as_message_mut() { + Some(content) => content.push_str(&message), + None => warn_mismatch(e.get(), "Message"), + }, + Entry::Vacant(e) => { + e.insert(IndexBuffer::Message { content: message }); + } + } + } + EventKind::ToolCallRequest(request) => match self.buffers.entry(index) { + Entry::Occupied(mut e) => e.get_mut().merge_tool_call(request), + Entry::Vacant(e) => { + e.insert(IndexBuffer::ToolCall { request }); + } + }, + EventKind::ChatResponse(ChatResponse::Structured { data }) => { + let Value::String(chunk) = data else { + warn!("Structured part with non-string data; ignoring"); + return; + }; + + match self.buffers.entry(index) { + Entry::Occupied(mut e) => match e.get_mut().as_structured_mut() { + Some(content) => content.push_str(&chunk), + None => warn_mismatch(e.get(), "Structured"), + }, + Entry::Vacant(e) => { + e.insert(IndexBuffer::Structured { content: chunk }); + } + } + } + EventKind::ChatRequest(_) + | EventKind::ToolCallResponse(_) + | EventKind::InquiryRequest(_) + | EventKind::InquiryResponse(_) + | EventKind::TurnStart(_) => {} + } + } + + /// Flushes the buffer for the given index, producing a complete + /// [`ConversationEvent`]. + /// + /// Returns `None` if the index had no buffered content (or was a + /// whitespace-only message that was dropped). + pub fn handle_flush( + &mut self, + index: usize, + metadata: IndexMap, + ) -> Option { + let buffer = self.buffers.remove(&index)?; + + let mut event = match buffer { + IndexBuffer::Reasoning { content } => { + ConversationEvent::now(ChatResponse::Reasoning { reasoning: content }) + } + // Skip whitespace-only messages. These appear when the LLM + // emits blank text content blocks (e.g. "\n\n" between + // interleaved thinking blocks). + IndexBuffer::Message { content } if content.trim().is_empty() => return None, + IndexBuffer::Message { content } => { + ConversationEvent::now(ChatResponse::Message { message: content }) + } + IndexBuffer::ToolCall { request } => ConversationEvent::now(request), + IndexBuffer::Structured { content } => { + let data = serde_json::from_str::(&content).unwrap_or_else(|e| { + warn!("Failed to parse structured response JSON: {e}"); + Value::String(content) + }); + ConversationEvent::now(ChatResponse::Structured { data }) + } + }; + + // Merge metadata accumulated from Part events (e.g. thinking + // signatures that arrive via SignatureDelta). + if let Some(part_metadata) = self.metadata.remove(&index) { + event.metadata.extend(part_metadata); + } + + // Merge metadata from the Flush event itself. + event.metadata.extend(metadata); + + Some(event) + } + + /// Flushes all remaining buffers. + /// + /// This is used when the stream ends (e.g. on `jp_llm::Event::Finished`) to + /// ensure any partially accumulated events are not silently dropped. + #[expect( + clippy::needless_collect, + reason = "collect breaks the borrow on self.buffers" + )] + pub fn drain(&mut self) -> Vec { + let indices: Vec = self.buffers.keys().copied().collect(); + indices + .into_iter() + .filter_map(|index| self.handle_flush(index, IndexMap::new())) + .collect() + } +} + +impl Default for EventBuilder { + fn default() -> Self { + Self::new() + } +} + +/// Buffer for accumulating partial events by type. +enum IndexBuffer { + /// Accumulates reasoning content. + Reasoning { + /// The reasoning content accumulated so far. + content: String, + }, + /// Accumulates message content. + Message { + /// The message content accumulated so far. + content: String, + }, + /// Accumulates a tool call request (may be multi-part). + ToolCall { + /// The tool call request accumulated so far. + request: ToolCallRequest, + }, + /// Accumulates streamed JSON chunks for a structured response. + /// + /// During streaming, providers emit `ChatResponse::Structured` parts + /// with `Value::String` chunks. On flush, the concatenated string is + /// parsed into a `Value`. If parsing fails, the raw string is preserved. + Structured { + /// The JSON string accumulated so far. + content: String, + }, +} + +impl IndexBuffer { + /// Merges an incoming tool call request into this buffer. + /// + /// First non-empty value wins for id and name; arguments are extended. + /// Logs a warning if the buffer is not a `ToolCall`. + fn merge_tool_call(&mut self, incoming: ToolCallRequest) { + let Self::ToolCall { request } = self else { + warn!( + buffer_type = self.as_str(), + "Expected ToolCall buffer; ignoring merge." + ); + return; + }; + + if request.id.is_empty() && !incoming.id.is_empty() { + request.id = incoming.id; + } + if request.name.is_empty() && !incoming.name.is_empty() { + request.name = incoming.name; + } + request.arguments.extend(incoming.arguments); + } + + /// Returns a mutable reference to the reasoning buffer content, if any. + const fn as_reasoning_mut(&mut self) -> Option<&mut String> { + match self { + Self::Reasoning { content } => Some(content), + _ => None, + } + } + + /// Returns a mutable reference to the message buffer content, if any. + const fn as_message_mut(&mut self) -> Option<&mut String> { + match self { + Self::Message { content } => Some(content), + _ => None, + } + } + + /// Returns a mutable reference to the structured buffer content, if any. + const fn as_structured_mut(&mut self) -> Option<&mut String> { + match self { + Self::Structured { content } => Some(content), + _ => None, + } + } + + /// Returns the name of the buffer type. + #[must_use] + const fn as_str(&self) -> &str { + match self { + Self::Reasoning { .. } => "Reasoning", + Self::Message { .. } => "Message", + Self::ToolCall { .. } => "ToolCall", + Self::Structured { .. } => "Structured", + } + } +} + +/// Logs a warning when a part's type doesn't match the existing buffer. +fn warn_mismatch(buffer: &IndexBuffer, incoming: &str) { + warn!( + buffer_type = buffer.as_str(), + incoming_type = incoming, + "Mismatched event type for index; ignoring." + ); +} + +#[cfg(test)] +#[path = "event_builder_tests.rs"] +mod tests; diff --git a/crates/jp_conversation/src/event_builder_tests.rs b/crates/jp_conversation/src/event_builder_tests.rs new file mode 100644 index 00000000..0e4eb5c0 --- /dev/null +++ b/crates/jp_conversation/src/event_builder_tests.rs @@ -0,0 +1,544 @@ +use assert_matches::assert_matches; +use serde_json::json; + +use super::*; +use crate::EventKind; + +#[test] +fn test_accumulates_reasoning_chunks() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: "Hello ".into(), + }), + ); + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: "world".into(), + }), + ); + let event = builder.handle_flush(0, IndexMap::new()).unwrap(); + + assert_matches!( + &event.kind, + EventKind::ChatResponse(ChatResponse::Reasoning { reasoning }) + if reasoning == "Hello world" + ); +} + +#[test] +fn test_accumulates_message_chunks() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 1, + ConversationEvent::now(ChatResponse::Message { + message: "Hello ".into(), + }), + ); + builder.handle_part( + 1, + ConversationEvent::now(ChatResponse::Message { + message: "world".into(), + }), + ); + let event = builder.handle_flush(1, IndexMap::new()).unwrap(); + + match &event.kind { + EventKind::ChatResponse(ChatResponse::Message { message }) => { + assert_eq!(message, "Hello world"); + } + _ => panic!("Expected message event"), + } +} + +#[test] +fn test_handles_tool_call() { + let mut builder = EventBuilder::new(); + + let request = ToolCallRequest { + id: "call_1".into(), + name: "test_tool".into(), + arguments: serde_json::Map::new(), + }; + + builder.handle_part(2, ConversationEvent::now(request)); + let event = builder.handle_flush(2, IndexMap::new()).unwrap(); + + let req = event.as_tool_call_request().expect("expected a tool call"); + assert_eq!(req.name, "test_tool"); +} + +#[test] +fn test_merges_multi_part_tool_call() { + let mut builder = EventBuilder::new(); + + // First Part: name + id, empty arguments (from content_block_start) + builder.handle_part( + 1, + ConversationEvent::now(ToolCallRequest { + id: "call_42".into(), + name: "fs_create_file".into(), + arguments: serde_json::Map::new(), + }), + ); + + // Second Part: arguments only (from content_block_stop after JSON + // aggregation) + let mut args = serde_json::Map::new(); + args.insert("path".into(), "src/main.rs".into()); + args.insert("content".into(), "fn main() {}".into()); + builder.handle_part( + 1, + ConversationEvent::now(ToolCallRequest { + id: "call_42".into(), + name: "fs_create_file".into(), + arguments: args, + }), + ); + + let event = builder.handle_flush(1, IndexMap::new()).unwrap(); + + let req = event.as_tool_call_request().expect("expected a tool call"); + assert_eq!(req.id, "call_42"); + assert_eq!(req.name, "fs_create_file"); + assert_eq!(req.arguments.len(), 2); + assert_eq!(req.arguments["path"], "src/main.rs"); + assert_eq!(req.arguments["content"], "fn main() {}"); +} + +#[test] +fn test_multi_part_tool_call_first_write_wins_for_id_and_name() { + let mut builder = EventBuilder::new(); + + // First Part with id+name + builder.handle_part( + 0, + ConversationEvent::now(ToolCallRequest { + id: "first_id".into(), + name: "first_name".into(), + arguments: serde_json::Map::new(), + }), + ); + + // Second Part with different id+name (should be ignored for id/name) + let mut args = serde_json::Map::new(); + args.insert("key".into(), "value".into()); + builder.handle_part( + 0, + ConversationEvent::now(ToolCallRequest { + id: "second_id".into(), + name: "second_name".into(), + arguments: args, + }), + ); + + let event = builder.handle_flush(0, IndexMap::new()).unwrap(); + + let req = event.as_tool_call_request().expect("expected a tool call"); + + // First non-empty wins + assert_eq!(req.id, "first_id"); + assert_eq!(req.name, "first_name"); + // Arguments are extended + assert_eq!(req.arguments["key"], "value"); +} + +#[test] +fn test_interleaved_indices() { + let mut builder = EventBuilder::new(); + + // Index 0: Message + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Message { + message: "Part 1".into(), + }), + ); + // Index 1: Reasoning + builder.handle_part( + 1, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: "Reasoning".into(), + }), + ); + // Index 0: Message continues + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Message { + message: " Part 2".into(), + }), + ); + + // Flush 1 first + let event1 = builder.handle_flush(1, IndexMap::new()).unwrap(); + // Flush 0 second + let event2 = builder.handle_flush(0, IndexMap::new()).unwrap(); + + assert_matches!( + &event1.kind, + EventKind::ChatResponse(ChatResponse::Reasoning { reasoning }) + if reasoning == "Reasoning" + ); + assert_matches!( + &event2.kind, + EventKind::ChatResponse(ChatResponse::Message { message }) + if message == "Part 1 Part 2" + ); +} + +#[test] +fn test_metadata_preserved_on_flush() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Message { + message: "Hello".into(), + }), + ); + + let mut metadata = IndexMap::new(); + metadata.insert("tokens".to_string(), Value::Number(100.into())); + + let event = builder.handle_flush(0, metadata).unwrap(); + + assert_eq!( + event.metadata.get("tokens"), + Some(&Value::Number(100.into())) + ); +} + +/// Regression test: metadata arriving on individual `Part` events (e.g. +/// Anthropic thinking signatures via `SignatureDelta`) must be preserved +/// through aggregation and appear on the flushed event. +#[test] +fn test_part_metadata_accumulated_through_flush() { + let mut builder = EventBuilder::new(); + + // First part: reasoning content, no metadata. + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: "Let me think...".into(), + }), + ); + + // Second part: empty reasoning content with signature metadata + // (simulates Anthropic's SignatureDelta). + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: String::new(), + }) + .with_metadata_field("anthropic_thinking_signature", "sig_abc123"), + ); + + // Flush with no additional metadata. + let event = builder.handle_flush(0, IndexMap::new()).unwrap(); + + // Content should be accumulated. + assert_matches!( + &event.kind, + EventKind::ChatResponse(ChatResponse::Reasoning { reasoning }) + if reasoning == "Let me think..." + ); + + // Signature metadata from the Part should be present. + assert_eq!( + event.metadata.get("anthropic_thinking_signature"), + Some(&Value::String("sig_abc123".into())) + ); +} + +/// Both part metadata and flush metadata should be merged. +#[test] +fn test_part_and_flush_metadata_merged() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: "thinking".into(), + }) + .with_metadata_field("from_part", "part_value"), + ); + + let mut flush_metadata = IndexMap::new(); + flush_metadata.insert( + "from_flush".to_string(), + Value::String("flush_value".into()), + ); + + let event = builder.handle_flush(0, flush_metadata).unwrap(); + + assert_eq!( + event.metadata.get("from_part"), + Some(&Value::String("part_value".into())) + ); + assert_eq!( + event.metadata.get("from_flush"), + Some(&Value::String("flush_value".into())) + ); +} + +#[test] +fn test_whitespace_only_message_not_persisted() { + let mut builder = EventBuilder::new(); + + // Simulate Anthropic emitting "\n\n" as a text content block + // between interleaved thinking blocks. + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Message { + message: "\n\n".into(), + }), + ); + assert!(builder.handle_flush(0, IndexMap::new()).is_none()); +} + +#[test] +fn test_ignores_mismatched_event_type() { + let mut builder = EventBuilder::new(); + + // Start with Reasoning + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: "Thinking...".into(), + }), + ); + + // Try to append Message to same index (should be ignored) + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Message { + message: "Hello".into(), + }), + ); + + let event = builder.handle_flush(0, IndexMap::new()).unwrap(); + assert_matches!( + &event.kind, + EventKind::ChatResponse(ChatResponse::Reasoning { reasoning }) + if reasoning == "Thinking..." + ); +} + +#[test] +fn test_ignores_irrelevant_event_kinds() { + let mut builder = EventBuilder::new(); + + // ChatRequest should be ignored + builder.handle_part( + 0, + ConversationEvent::now(crate::EventKind::ChatRequest(crate::event::ChatRequest { + content: String::new(), + schema: None, + })), + ); + + // Flush should produce nothing because nothing was buffered + assert!(builder.handle_flush(0, IndexMap::new()).is_none()); +} + +#[test] +fn test_peek_partial_content_empty() { + let builder = EventBuilder::new(); + assert_eq!(builder.peek_partial_content(), None); +} + +#[test] +fn test_peek_partial_content_single_buffer() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Message { + message: "Hello ".into(), + }), + ); + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Message { + message: "world".into(), + }), + ); + + assert_eq!( + builder.peek_partial_content(), + Some("Hello world".to_string()) + ); +} + +#[test] +fn test_peek_partial_content_multiple_buffers() { + let mut builder = EventBuilder::new(); + + // Index 0: Reasoning + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: "Let me think".into(), + }), + ); + // Index 1: Message + builder.handle_part( + 1, + ConversationEvent::now(ChatResponse::Message { + message: "The answer is".into(), + }), + ); + + // Should concatenate in index order + assert_eq!( + builder.peek_partial_content(), + Some("Let me thinkThe answer is".to_string()) + ); +} + +#[test] +fn test_peek_partial_content_after_partial_flush() { + let mut builder = EventBuilder::new(); + + // Index 0: Reasoning (will be flushed) + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Reasoning { + reasoning: "Thinking...".into(), + }), + ); + // Index 1: Message (will remain unflushed) + builder.handle_part( + 1, + ConversationEvent::now(ChatResponse::Message { + message: "Partial answer".into(), + }), + ); + + // Flush index 0 only + builder.handle_flush(0, IndexMap::new()); + + // Only index 1 should remain + assert_eq!( + builder.peek_partial_content(), + Some("Partial answer".to_string()) + ); +} + +// --- Structured response (Phase 2) --- + +#[test] +fn test_accumulates_structured_chunks() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Structured { + data: Value::String("{\"name".into()), + }), + ); + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Structured { + data: Value::String("\": \"Alice\"}".into()), + }), + ); + let event = builder.handle_flush(0, IndexMap::new()).unwrap(); + + let resp = event.as_chat_response().unwrap(); + assert_eq!(resp.as_structured_data(), Some(&json!({"name": "Alice"}))); +} + +#[test] +fn test_structured_malformed_json_falls_back_to_string() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Structured { + data: Value::String("{\"truncated".into()), + }), + ); + let event = builder.handle_flush(0, IndexMap::new()).unwrap(); + + let resp = event.as_chat_response().unwrap(); + assert_eq!( + resp.as_structured_data(), + Some(&Value::String("{\"truncated".into())) + ); +} + +#[test] +fn test_structured_preserves_metadata() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Structured { + data: Value::String("{}".into()), + }) + .with_metadata_field("provider", "anthropic"), + ); + + let mut flush_meta = IndexMap::new(); + flush_meta.insert("tokens".into(), json!(42)); + let event = builder.handle_flush(0, flush_meta).unwrap(); + + assert_eq!( + event.metadata.get("provider"), + Some(&Value::String("anthropic".into())) + ); + assert_eq!(event.metadata.get("tokens"), Some(&json!(42))); +} + +#[test] +fn test_structured_ignores_non_string_part() { + let mut builder = EventBuilder::new(); + + // A part with a non-string Value should be silently ignored. + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Structured { + data: json!({"already": "parsed"}), + }), + ); + assert!(builder.handle_flush(0, IndexMap::new()).is_none()); +} + +#[test] +fn test_structured_not_included_in_peek_partial_content() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Structured { + data: Value::String("{\"partial".into()), + }), + ); + + // Structured buffers are not useful for assistant prefill. + assert_eq!(builder.peek_partial_content(), None); +} + +#[test] +fn test_structured_array_response() { + let mut builder = EventBuilder::new(); + + builder.handle_part( + 0, + ConversationEvent::now(ChatResponse::Structured { + data: Value::String("[\"title one\",\"title two\"]".into()), + }), + ); + let event = builder.handle_flush(0, IndexMap::new()).unwrap(); + + let resp = event.as_chat_response().unwrap(); + assert_eq!( + resp.as_structured_data(), + Some(&json!(["title one", "title two"])) + ); +} diff --git a/crates/jp_conversation/src/lib.rs b/crates/jp_conversation/src/lib.rs index ce253f6e..80ed1c07 100644 --- a/crates/jp_conversation/src/lib.rs +++ b/crates/jp_conversation/src/lib.rs @@ -30,6 +30,8 @@ pub mod conversation; pub mod error; pub mod event; +pub mod event_builder; +pub(crate) mod storage; pub mod stream; pub mod thread; diff --git a/crates/jp_conversation/src/snapshots/jp_conversation__stream__tests__conversation_stream_serialization_roundtrip-2.snap b/crates/jp_conversation/src/snapshots/jp_conversation__stream__tests__conversation_stream_serialization_roundtrip-2.snap index 2e820dfe..0123c072 100644 --- a/crates/jp_conversation/src/snapshots/jp_conversation__stream__tests__conversation_stream_serialization_roundtrip-2.snap +++ b/crates/jp_conversation/src/snapshots/jp_conversation__stream__tests__conversation_stream_serialization_roundtrip-2.snap @@ -21,6 +21,11 @@ expression: "&stream" "stop_words": [], "other": {} } + }, + "request": { + "max_retries": 0, + "base_backoff_ms": 0, + "max_backoff_secs": 0 } }, "conversation": { @@ -34,6 +39,7 @@ expression: "&stream" "run": "ask", "result": "unattended", "style": { + "hidden": false, "inline_results": { "truncate": { "lines": 10 @@ -61,8 +67,25 @@ expression: "&stream" "reasoning": { "display": "full" }, + "streaming": { + "progress": { + "show": false, + "delay_secs": 0, + "interval_ms": 0 + } + }, "tool_call": { - "show": false + "show": false, + "progress": { + "show": false, + "delay_secs": 0, + "interval_ms": 0 + }, + "preparing": { + "show": false, + "delay_secs": 0, + "interval_ms": 0 + } }, "typewriter": { "text_delay": { diff --git a/crates/jp_conversation/src/snapshots/jp_conversation__stream__tests__conversation_stream_serialization_roundtrip.snap b/crates/jp_conversation/src/snapshots/jp_conversation__stream__tests__conversation_stream_serialization_roundtrip.snap index 9fa9eaa4..8b40533c 100644 --- a/crates/jp_conversation/src/snapshots/jp_conversation__stream__tests__conversation_stream_serialization_roundtrip.snap +++ b/crates/jp_conversation/src/snapshots/jp_conversation__stream__tests__conversation_stream_serialization_roundtrip.snap @@ -21,6 +21,11 @@ expression: "&stream" "stop_words": [], "other": {} } + }, + "request": { + "max_retries": 0, + "base_backoff_ms": 0, + "max_backoff_secs": 0 } }, "conversation": { @@ -34,6 +39,7 @@ expression: "&stream" "run": "ask", "result": "unattended", "style": { + "hidden": false, "inline_results": { "truncate": { "lines": 10 @@ -61,8 +67,25 @@ expression: "&stream" "reasoning": { "display": "full" }, + "streaming": { + "progress": { + "show": false, + "delay_secs": 0, + "interval_ms": 0 + } + }, "tool_call": { - "show": false + "show": false, + "progress": { + "show": false, + "delay_secs": 0, + "interval_ms": 0 + }, + "preparing": { + "show": false, + "delay_secs": 0, + "interval_ms": 0 + } }, "typewriter": { "text_delay": { diff --git a/crates/jp_conversation/src/storage.rs b/crates/jp_conversation/src/storage.rs new file mode 100644 index 00000000..3edeca9f --- /dev/null +++ b/crates/jp_conversation/src/storage.rs @@ -0,0 +1,161 @@ +//! Base64 encoding/decoding for storage-persisted event fields. +//! +//! This module encodes select content fields (tool arguments, tool response +//! content, metadata) so that raw conversation text doesn't appear in plain +//! text on disk - keeping it out of `grep` and editor search results. +//! +//! The encoding is applied during [`InternalEvent`] serialization and reversed +//! during deserialization. The inner event types serialize as plain text. +//! +//! [`InternalEvent`]: crate::stream::InternalEvent + +use base64::{Engine as _, engine::general_purpose::STANDARD}; +use serde_json::{Map, Value}; + +use crate::event::EventKind; + +/// Which encoding to apply to a given field. +enum Field { + /// Base64-encode the string value itself. + String(&'static str), + + /// Base64-encode all string values within a JSON map, recursively. + Map(&'static str), +} + +impl Field { + /// Encode the field in the given value. + fn encode(&self, value: &mut Map) { + match self { + Self::String(key) => { + if let Some(v) = value.get_mut(*key) { + encode_string(v); + } + } + Self::Map(key) => { + if let Some(v) = value.get_mut(*key) { + encode_map_strings(v); + } + } + } + } +} + +/// Encode content fields for a conversation event being written to storage. +/// +/// The `kind` is used to select the correct field mapping via an exhaustive +/// match — adding a new [`EventKind`] variant will produce a compiler error +/// here, forcing the developer to decide which fields (if any) need encoding. +pub fn encode_event(value: &mut Value, kind: &EventKind) { + let Some(obj) = value.as_object_mut() else { + return; + }; + + // Metadata is present on all events. + if let Some(v) = obj.get_mut("metadata") { + encode_map_strings(v); + } + + // Each variant is listed explicitly so adding a new EventKind forces a + // decision about which fields (if any) need encoding. + // + // NOTE: don't forget to update `decode_event_value` when adding new + // variants! + let fields: &[Field] = match kind { + EventKind::ToolCallRequest(_) => &[Field::Map("arguments"), Field::Map("tool_answers")], + EventKind::ToolCallResponse(_) => &[Field::String("content")], + EventKind::TurnStart(_) + | EventKind::ChatRequest(_) + | EventKind::ChatResponse(_) + | EventKind::InquiryRequest(_) + | EventKind::InquiryResponse(_) => &[], + }; + + for field in fields { + field.encode(obj); + } +} + +/// Decode base64-encoded storage fields from a raw event JSON value. +/// +/// This uses the `type` tag to determine which fields to decode, mirroring the +/// encoding in [`encode_event`]. +pub fn decode_event_value(value: &mut Value) { + let Some(obj) = value.as_object_mut() else { + return; + }; + + // Metadata is present on all events. + if let Some(v) = obj.get_mut("metadata") { + decode_map_strings(v); + } + + let tag = obj.get("type").and_then(Value::as_str).unwrap_or_default(); + + match tag { + "tool_call_request" => { + if let Some(v) = obj.get_mut("arguments") { + decode_map_strings(v); + } + if let Some(v) = obj.get_mut("tool_answers") { + decode_map_strings(v); + } + } + "tool_call_response" => { + if let Some(v) = obj.get_mut("content") { + decode_string(v); + } + } + "turn_start" | "chat_request" | "chat_response" | "inquiry_request" + | "inquiry_response" => {} + _ => panic!("unknown event kind: {tag}"), + } +} + +/// Base64-encode a single JSON string value in place. +fn encode_string(value: &mut Value) { + if let Value::String(s) = value { + *s = STANDARD.encode(s.as_bytes()); + } +} + +/// Decode a base64-encoded JSON string value in place. Non-base64 strings are +/// left untouched. +fn decode_string(value: &mut Value) { + if let Value::String(s) = value + && let Ok(bytes) = STANDARD.decode(s.as_bytes()) + && let Ok(decoded) = String::from_utf8(bytes) + { + *s = decoded; + } +} + +/// Recursively base64-encode all string values in a JSON tree. +fn encode_map_strings(value: &mut Value) { + match value { + Value::String(s) => *s = STANDARD.encode(s.as_bytes()), + Value::Array(arr) => arr.iter_mut().for_each(encode_map_strings), + Value::Object(obj) => obj.values_mut().for_each(encode_map_strings), + _ => {} + } +} + +/// Recursively decode all base64-encoded string values in a JSON tree. +fn decode_map_strings(value: &mut Value) { + match value { + Value::String(s) => { + if let Ok(bytes) = STANDARD.decode(s.as_bytes()) + && let Ok(decoded) = String::from_utf8(bytes) + { + *s = decoded; + } + } + Value::Array(arr) => arr.iter_mut().for_each(decode_map_strings), + Value::Object(obj) => obj.values_mut().for_each(decode_map_strings), + _ => {} + } +} + +#[cfg(test)] +#[path = "storage_tests.rs"] +mod tests; diff --git a/crates/jp_conversation/src/storage_tests.rs b/crates/jp_conversation/src/storage_tests.rs new file mode 100644 index 00000000..8473c393 --- /dev/null +++ b/crates/jp_conversation/src/storage_tests.rs @@ -0,0 +1,75 @@ +use serde_json::json; + +use super::*; + +#[test] +fn encode_decode_tool_call_request() { + let mut value = json!({ + "type": "tool_call_request", + "id": "tc_1", + "name": "read_file", + "arguments": {"path": "src/main.rs"}, + "metadata": {"model": "gpt-4"} + }); + + let kind = EventKind::ToolCallRequest(crate::event::ToolCallRequest::new( + String::new(), + String::new(), + serde_json::Map::default(), + )); + + encode_event(&mut value, &kind); + + // Structural fields are untouched. + assert_eq!(value["id"], "tc_1"); + assert_eq!(value["name"], "read_file"); + + // Content fields are encoded. + assert_ne!(value["arguments"]["path"], "src/main.rs"); + assert_ne!(value["metadata"]["model"], "gpt-4"); + + // Round-trip. + decode_event_value(&mut value); + assert_eq!(value["arguments"]["path"], "src/main.rs"); + assert_eq!(value["metadata"]["model"], "gpt-4"); +} + +#[test] +fn encode_decode_tool_call_response() { + let mut value = json!({ + "type": "tool_call_response", + "id": "tc_1", + "content": "file contents here", + "is_error": false + }); + + let kind = EventKind::ToolCallResponse(crate::event::ToolCallResponse { + id: String::new(), + result: Ok(String::new()), + }); + + encode_event(&mut value, &kind); + assert_ne!(value["content"], "file contents here"); + assert_eq!(value["id"], "tc_1"); + + decode_event_value(&mut value); + assert_eq!(value["content"], "file contents here"); +} + +#[test] +fn events_without_content_fields_are_unchanged() { + let original = json!({ + "type": "chat_request", + "content": "hello world", + "timestamp": "2025-01-01T00:00:00Z" + }); + let mut value = original.clone(); + + let kind = EventKind::ChatRequest(crate::event::ChatRequest::from("hello world")); + encode_event(&mut value, &kind); + + assert_eq!(value["content"], "hello world"); + + decode_event_value(&mut value); + assert_eq!(value, original); +} diff --git a/crates/jp_conversation/src/stream.rs b/crates/jp_conversation/src/stream.rs index 59432558..55b202e0 100644 --- a/crates/jp_conversation/src/stream.rs +++ b/crates/jp_conversation/src/stream.rs @@ -2,20 +2,27 @@ use std::sync::Arc; -use chrono::{DateTime, TimeZone as _, Utc}; +use chrono::{DateTime, Utc}; use jp_config::{AppConfig, Config as _, PartialAppConfig, PartialConfig as _}; -use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value}; +use serde::{Deserialize, Serialize, Serializer}; +use serde_json::Value; use tracing::error; -use crate::event::{ - ChatRequest, ChatResponse, ConversationEvent, EventKind, InquiryRequest, InquiryResponse, - ToolCallRequest, ToolCallResponse, +use crate::{ + event::{ + ChatRequest, ChatResponse, ConversationEvent, EventKind, InquiryId, InquiryRequest, + InquiryResponse, ToolCallRequest, ToolCallResponse, TurnStart, + }, + storage::{decode_event_value, encode_event}, }; /// An internal representation of events in a conversation stream. -#[derive(Debug, Clone, Serialize, PartialEq)] -#[serde(tag = "type", rename_all = "snake_case")] +/// +/// This type handles base64-encoding of content fields (tool arguments, tool +/// response content, metadata) during serialization, and decoding during +/// deserialization. This keeps the encoding concern isolated to the storage +/// layer — the inner [`ConversationEvent`] types serialize as plain text. +#[derive(Debug, Clone, PartialEq)] pub enum InternalEvent { /// The configuration state of the conversation is updated. /// @@ -28,12 +35,40 @@ pub enum InternalEvent { /// Any non-config events before the first `ConfigDelta` event are /// considered to have the default configuration. ConfigDelta(ConfigDelta), - // ConfigDelta(Box), /// An event in the conversation stream. - #[serde(untagged)] Event(Box), } +impl Serialize for InternalEvent { + fn serialize(&self, serializer: S) -> Result { + match self { + Self::ConfigDelta(delta) => { + #[derive(Serialize)] + struct Tagged<'a> { + #[serde(rename = "type")] + tag: &'static str, + #[serde(flatten)] + inner: &'a ConfigDelta, + } + + Tagged { + tag: "config_delta", + inner: delta, + } + .serialize(serializer) + } + Self::Event(event) => { + let mut value = + serde_json::to_value(event.as_ref()).map_err(serde::ser::Error::custom)?; + + // Base64-encode storage fields. + encode_event(&mut value, &event.kind); + value.serialize(serializer) + } + } + } +} + impl InternalEvent { /// Create a new [`InternalEvent::ConfigDelta`]. pub fn config_delta(delta: impl Into) -> Self { @@ -386,6 +421,20 @@ impl ConversationStream { self } + /// Push a [`ConversationEvent`] of type [`EventKind::TurnStart`] onto + /// the stream. + pub fn add_turn_start(&mut self) { + self.push(ConversationEvent::now(TurnStart)); + } + + /// Add a [`ConversationEvent`] of type [`EventKind::TurnStart`] onto the + /// stream. + #[must_use] + pub fn with_turn_start(mut self) -> Self { + self.add_turn_start(); + self + } + /// Returns the last [`ConversationEvent`] in the stream, wrapped in a /// [`ConversationEventWithConfigRef`], containing the [`PartialAppConfig`] /// at the time the event was added. @@ -438,6 +487,27 @@ impl ConversationStream { } } + /// Similar to [`Self::pop`], but only pops if the predicate returns `true`. + pub fn pop_if( + &mut self, + f: impl Fn(&ConversationEvent) -> bool, + ) -> Option { + if !self + .events + .iter() + .rev() + .find_map(|event| match event { + InternalEvent::Event(event) => Some(f(event)), + InternalEvent::ConfigDelta(_) => None, + }) + .unwrap_or(false) + { + return None; + } + + self.pop() + } + /// Retains only the [`ConversationEvent`]s that pass the predicate. /// /// This does NOT remove the [`ConfigDelta`]s. @@ -453,6 +523,277 @@ impl ConversationStream { self.events.clear(); } + /// Repairs structural invariants that may be violated after arbitrary + /// filtering (e.g. `--from`/`--until` on fork). + /// + /// Specifically: + /// 1. Drops conversation events before the first [`ChatRequest`], + /// preserving [`ConfigDelta`]s and [`TurnStart`]s. + /// 2. Removes orphaned [`ToolCallResponse`]s whose matching + /// [`ToolCallRequest`] is missing. + /// 3. Injects synthetic error [`ToolCallResponse`]s for + /// [`ToolCallRequest`]s that lack a matching response. + /// 4. Removes orphaned [`InquiryResponse`]s whose matching + /// [`InquiryRequest`] is missing. + /// 5. Removes orphaned [`InquiryRequest`]s whose matching + /// [`InquiryResponse`] is missing. + /// 6. Normalizes [`TurnStart`] events: ensures the stream begins + /// with exactly one `TurnStart` and re-indexes all turn starts + /// to a zero-based sequence. + pub fn sanitize(&mut self) { + self.drop_leading_non_user_events(); + self.remove_orphaned_tool_call_responses(); + self.sanitize_orphaned_tool_calls(); + self.remove_orphaned_inquiry_responses(); + self.remove_orphaned_inquiry_requests(); + self.normalize_turn_starts(); + } + + /// Drops conversation events before the first [`ChatRequest`] that + /// would be invalid as leading content (e.g. assistant responses, + /// tool call results). [`ConfigDelta`]s and [`TurnStart`]s are + /// preserved — config deltas maintain configuration state, and turn + /// markers are invisible to providers but useful for `--last`. + fn drop_leading_non_user_events(&mut self) { + let Some(pos) = self + .events + .iter() + .position(|e| matches!(e, InternalEvent::Event(event) if event.is_chat_request())) + else { + return; + }; + + let mut idx = 0; + self.events.retain(|event| { + let i = idx; + idx += 1; + if i >= pos { + return true; + } + match event { + InternalEvent::ConfigDelta(_) => true, + InternalEvent::Event(e) => e.is_turn_start(), + } + }); + } + + /// Removes [`ToolCallResponse`]s whose ID doesn't match any + /// [`ToolCallRequest`] in the stream. + fn remove_orphaned_tool_call_responses(&mut self) { + let request_ids: Vec = self + .events + .iter() + .filter_map(InternalEvent::as_event) + .filter_map(|e| e.as_tool_call_request()) + .map(|r| r.id.clone()) + .collect(); + + self.events.retain(|event| { + if let Some(event) = event.as_event() + && let Some(response) = event.as_tool_call_response() + { + return request_ids.contains(&response.id); + } + true + }); + } + + /// Removes [`InquiryResponse`]s whose ID doesn't match any + /// [`InquiryRequest`] in the stream. + fn remove_orphaned_inquiry_responses(&mut self) { + let request_ids: Vec = self + .events + .iter() + .filter_map(InternalEvent::as_event) + .filter_map(|e| e.as_inquiry_request()) + .map(|r| r.id.clone()) + .collect(); + + self.events.retain(|event| { + if let Some(event) = event.as_event() + && let Some(response) = event.as_inquiry_response() + { + return request_ids.contains(&response.id); + } + true + }); + } + + /// Removes [`InquiryRequest`]s whose ID doesn't match any + /// [`InquiryResponse`] in the stream. + fn remove_orphaned_inquiry_requests(&mut self) { + let response_ids: Vec = self + .events + .iter() + .filter_map(InternalEvent::as_event) + .filter_map(|e| e.as_inquiry_response()) + .map(|r| r.id.clone()) + .collect(); + + self.events.retain(|event| { + if let Some(event) = event.as_event() + && let Some(request) = event.as_inquiry_request() + { + return response_ids.contains(&request.id); + } + true + }); + } + + /// Ensures the stream has exactly one leading [`TurnStart`] and that + /// all `TurnStart` indices form a zero-based sequence. + /// + /// After filtering, the stream may have multiple stale `TurnStart`s + /// from earlier turns piled up at the front, or gaps in the index + /// sequence. This step: + /// - Inserts a `TurnStart(0)` if the stream has events but no + /// leading `TurnStart`. + /// - Removes duplicate `TurnStart`s that precede the first + /// `ChatRequest` (keeping only the last one). + /// - Re-indexes all `TurnStart` events to `0, 1, 2, …`. + fn normalize_turn_starts(&mut self) { + if self + .events + .iter() + .all(|e| !matches!(e, InternalEvent::Event(event) if !event.is_turn_start())) + { + // Stream has no non-TurnStart events, nothing to normalize. + return; + } + + // Find the position of the first ChatRequest. + let first_chat_pos = self + .events + .iter() + .position(|e| matches!(e, InternalEvent::Event(event) if event.is_chat_request())); + + // Remove all but the last TurnStart before the first ChatRequest. + // This collapses multiple stale turn markers from filtered turns + // into a single one. + if let Some(chat_pos) = first_chat_pos { + let leading_turn_starts: Vec = self.events[..chat_pos] + .iter() + .enumerate() + .filter(|(_, e)| matches!(e, InternalEvent::Event(event) if event.is_turn_start())) + .map(|(i, _)| i) + .collect(); + + if leading_turn_starts.len() > 1 { + // Keep the last one, remove the rest. + let to_remove: Vec = + leading_turn_starts[..leading_turn_starts.len() - 1].to_vec(); + let mut idx = 0; + self.events.retain(|_| { + let i = idx; + idx += 1; + !to_remove.contains(&i) + }); + } + } + + // Ensure there's a TurnStart before the first ChatRequest. + let first_event_is_turn_start = + self.events + .iter() + .any(|e| matches!(e, InternalEvent::Event(event) if event.is_turn_start())) + && self.events.iter().position( + |e| matches!(e, InternalEvent::Event(event) if event.is_turn_start()), + ) < self.events.iter().position( + |e| matches!(e, InternalEvent::Event(event) if event.is_chat_request()), + ); + + if !first_event_is_turn_start { + // Find where to insert (right before the first ChatRequest, + // or at position 0 if there are no ChatRequests). + let insert_pos = self + .events + .iter() + .position(|e| matches!(e, InternalEvent::Event(event) if event.is_chat_request())) + .unwrap_or(0); + + let timestamp = self + .events + .get(insert_pos) + .and_then(InternalEvent::as_event) + .map_or(DateTime::::UNIX_EPOCH, |e| e.timestamp); + + self.events.insert( + insert_pos, + InternalEvent::Event(Box::new(ConversationEvent::new(TurnStart, timestamp))), + ); + } + } + + /// Injects synthetic [`ToolCallResponse`]s for any [`ToolCallRequest`]s + /// that lack a matching response. + /// + /// This can happen when the user interrupts tool execution (e.g. Ctrl+C + /// → "save & exit") after the request has been streamed but before + /// responses are recorded. Providers such as Anthropic reject streams + /// where a `tool_use` block has no corresponding `tool_result`. + /// + /// The synthetic responses carry an error message explaining the + /// interruption, preserving the context that a tool call was attempted. + pub fn sanitize_orphaned_tool_calls(&mut self) { + // Collect IDs that already have a response. + let mut response_ids: Vec = Vec::new(); + for event in &self.events { + if let Some(event) = event.as_event() + && let EventKind::ToolCallResponse(resp) = &event.kind + { + response_ids.push(resp.id.clone()); + } + } + + // Walk the events to find orphaned request positions. + // Collect (index, id) pairs for requests that lack a response. + #[expect(clippy::needless_collect, reason = "borrow checker")] + let orphans: Vec<(usize, String, DateTime)> = self + .events + .iter() + .enumerate() + .filter_map(|(i, event)| { + event.as_event().and_then(|event| { + event.as_tool_call_request().and_then(|request| { + (!response_ids.contains(&request.id)) + .then(|| (i, request.id.clone(), event.timestamp)) + }) + }) + }) + .collect(); + + // Insert synthetic responses directly after each orphaned request. + // Iterate in reverse so earlier indices remain valid. + for (pos, id, timestamp) in orphans.into_iter().rev() { + let response = InternalEvent::Event(Box::new(ConversationEvent::new( + ToolCallResponse { + id, + result: Err("Tool call was interrupted.".to_string()), + }, + timestamp, + ))); + self.events.insert(pos + 1, response); + } + } + + /// Removes a trailing [`TurnStart`] event if it is the last + /// conversation event in the stream. + /// + /// This cleans up empty turns that can occur when the turn loop errors + /// out before any real events are added after the turn marker. + pub fn trim_trailing_empty_turn(&mut self) { + // Walk backwards past any config deltas to find the last real event. + if let Some(pos) = self + .events + .iter() + .rposition(|e| matches!(e, InternalEvent::Event(_))) + && let InternalEvent::Event(ref event) = self.events[pos] + && event.is_turn_start() + { + self.events.remove(pos); + } + } + /// Returns an iterator over the events in the stream, wrapped in a /// [`ConversationEventWithConfigRef`], containing the /// [`PartialAppConfig`] at the time the event was added. @@ -482,6 +823,8 @@ impl ConversationStream { #[doc(hidden)] #[must_use] pub fn new_test() -> Self { + use chrono::TimeZone as _; + Self { base_config: AppConfig::new_test().into(), events: vec![], @@ -873,170 +1216,42 @@ pub enum StreamError { Config(#[from] jp_config::ConfigError), } -// A custom deserializer for `InternalEvent` that allows us to avoid serde -// allocations when trying to match `untagged` enum variants. +// A custom deserializer for `InternalEvent` that avoids serde allocations +// when trying to match `untagged` enum variants. // -// This essentially "flattens" the `InternalEvent` enum into one where no -// untagged variants exist, allowing serde to match the correct variant based on -// the `type` field. +// Deserializes to a JSON `Value` first, then dispatches on the `type` tag. +// This avoids the allocation overhead serde incurs when trying each variant +// of an untagged enum. Base64-encoded storage fields are decoded before the +// final deserialization into typed events. // -// We then convert the `InternalEventFlattened` back into the original -// `InternalEvent` type. -// -// `cargo dhat` had shown this to be a hotspot in the codebase, so we're -// optimizing it for performance. -#[allow(clippy::allow_attributes, clippy::missing_docs_in_private_items)] +// `cargo dhat` had shown the untagged approach to be a hotspot. impl<'de> Deserialize<'de> for InternalEvent { - #[expect(clippy::too_many_lines)] fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { - #[derive(Deserialize)] - #[serde(tag = "type", rename_all = "snake_case")] - enum InternalEventFlattened { - ConfigDelta(ConfigDelta), - - ChatRequest { - #[serde(deserialize_with = "crate::deserialize_dt")] - timestamp: DateTime, - #[serde(default, with = "jp_serde::repr::base64_json_map")] - metadata: Map, - #[serde(flatten)] - data: ChatRequest, - }, - ChatResponse { - #[serde(deserialize_with = "crate::deserialize_dt")] - timestamp: DateTime, - #[serde(default, with = "jp_serde::repr::base64_json_map")] - metadata: Map, - #[serde(flatten)] - data: ChatResponse, - }, - ToolCallRequest { - #[serde(deserialize_with = "crate::deserialize_dt")] - timestamp: DateTime, - #[serde(default, with = "jp_serde::repr::base64_json_map")] - metadata: Map, - #[serde(flatten)] - data: ToolCallRequest, - }, - ToolCallResponse { - #[serde(deserialize_with = "crate::deserialize_dt")] - timestamp: DateTime, - #[serde(default, with = "jp_serde::repr::base64_json_map")] - metadata: Map, - #[serde(flatten)] - data: ToolCallResponse, - }, - InquiryRequest { - #[serde(deserialize_with = "crate::deserialize_dt")] - timestamp: DateTime, - #[serde(default, with = "jp_serde::repr::base64_json_map")] - metadata: Map, - #[serde(flatten)] - data: InquiryRequest, - }, - InquiryResponse { - #[serde(deserialize_with = "crate::deserialize_dt")] - timestamp: DateTime, - #[serde(default, with = "jp_serde::repr::base64_json_map")] - metadata: Map, - #[serde(flatten)] - data: InquiryResponse, - }, - } + let mut value = Value::deserialize(deserializer)?; - let event = match InternalEventFlattened::deserialize(deserializer)? { - InternalEventFlattened::ConfigDelta(d) => return Ok(Self::ConfigDelta(d)), + let tag = value + .get("type") + .and_then(Value::as_str) + .unwrap_or_default(); - InternalEventFlattened::ChatRequest { - timestamp, - metadata, - data, - } => ConversationEvent { - timestamp, - metadata, - kind: data.into(), - }, - InternalEventFlattened::ChatResponse { - timestamp, - metadata, - data, - } => ConversationEvent { - timestamp, - metadata, - kind: data.into(), - }, - InternalEventFlattened::ToolCallRequest { - timestamp, - metadata, - data, - } => ConversationEvent { - timestamp, - metadata, - kind: data.into(), - }, - InternalEventFlattened::ToolCallResponse { - timestamp, - metadata, - data, - } => ConversationEvent { - timestamp, - metadata, - kind: data.into(), - }, - InternalEventFlattened::InquiryRequest { - timestamp, - metadata, - data, - } => ConversationEvent { - timestamp, - metadata, - kind: data.into(), - }, - InternalEventFlattened::InquiryResponse { - timestamp, - metadata, - data, - } => ConversationEvent { - timestamp, - metadata, - kind: data.into(), - }, - }; + if tag == "config_delta" { + return serde_json::from_value(value) + .map(Self::ConfigDelta) + .map_err(serde::de::Error::custom); + } - Ok(Self::Event(Box::new(event))) + // Decode base64-encoded storage fields before deserializing. + decode_event_value(&mut value); + + serde_json::from_value(value) + .map(|e| Self::Event(Box::new(e))) + .map_err(serde::de::Error::custom) } } #[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_conversation_stream_serialization_roundtrip() { - let mut stream = ConversationStream::new_test(); - - insta::assert_json_snapshot!(&stream); - - stream - .events - .push(InternalEvent::Event(Box::new(ConversationEvent::new( - ChatRequest::from("foo"), - Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), - )))); - - stream - .events - .push(InternalEvent::Event(Box::new(ConversationEvent::new( - ChatResponse::message("bar"), - Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 0).unwrap(), - )))); - - insta::assert_json_snapshot!(&stream); - let json = serde_json::to_string(&stream).unwrap(); - let stream2 = serde_json::from_str::(&json).unwrap(); - assert_eq!(stream, stream2); - } -} +#[path = "stream_tests.rs"] +mod tests; diff --git a/crates/jp_conversation/src/stream_tests.rs b/crates/jp_conversation/src/stream_tests.rs new file mode 100644 index 00000000..e07d272b --- /dev/null +++ b/crates/jp_conversation/src/stream_tests.rs @@ -0,0 +1,488 @@ +use chrono::TimeZone as _; +use serde_json::{Map, Value}; + +use super::*; +use crate::event::{InquiryQuestion, InquirySource}; + +#[test] +fn test_sanitize_orphaned_tool_calls_injects_directly_after_request() { + let mut stream = ConversationStream::new_test(); + + stream.add_chat_request("Hello"); + stream.add_tool_call_request(ToolCallRequest { + id: "orphan_1".into(), + name: "some_tool".into(), + arguments: Map::new(), + }); + stream.add_chat_response(ChatResponse::message("trailing")); + + stream.sanitize_orphaned_tool_calls(); + + // Verify the response was injected. + let response = stream.find_tool_call_response("orphan_1"); + assert!(response.is_some(), "Expected synthetic response for orphan"); + assert!(response.unwrap().result.is_err()); + assert!(response.unwrap().content().contains("interrupted")); + + // Verify ordering: request must be immediately followed by + // its response — no events in between. + let events: Vec<_> = stream.iter().collect(); + let req_pos = events + .iter() + .position(|e| { + e.event + .as_tool_call_request() + .is_some_and(|r| r.id == "orphan_1") + }) + .unwrap(); + let resp_pos = events + .iter() + .position(|e| { + e.event + .as_tool_call_response() + .is_some_and(|r| r.id == "orphan_1") + }) + .unwrap(); + assert_eq!( + resp_pos, + req_pos + 1, + "Response must be directly after request" + ); +} + +#[test] +fn test_sanitize_orphaned_tool_calls_leaves_matched_alone() { + let mut stream = ConversationStream::new_test(); + + stream.add_tool_call_request(ToolCallRequest { + id: "matched_1".into(), + name: "tool".into(), + arguments: Map::new(), + }); + stream.add_tool_call_response(ToolCallResponse { + id: "matched_1".into(), + result: Ok("ok".into()), + }); + + let len_before = stream.len(); + stream.sanitize_orphaned_tool_calls(); + assert_eq!(stream.len(), len_before, "No extra events should be added"); +} + +#[test] +fn test_sanitize_orphaned_tool_calls_handles_partial_set() { + let mut stream = ConversationStream::new_test(); + + // Two parallel requests, only 'a' has a response. + stream.add_tool_call_request(ToolCallRequest { + id: "a".into(), + name: "tool".into(), + arguments: Map::new(), + }); + stream.add_tool_call_request(ToolCallRequest { + id: "b".into(), + name: "tool".into(), + arguments: Map::new(), + }); + stream.add_tool_call_response(ToolCallResponse { + id: "a".into(), + result: Ok("ok".into()), + }); + + stream.sanitize_orphaned_tool_calls(); + + // 'b' should get a synthetic response directly after its request. + let events: Vec<_> = stream.iter().collect(); + let req_b = events + .iter() + .position(|e| e.event.as_tool_call_request().is_some_and(|r| r.id == "b")) + .unwrap(); + let resp_b = events + .iter() + .position(|e| e.event.as_tool_call_response().is_some_and(|r| r.id == "b")) + .unwrap(); + assert_eq!( + resp_b, + req_b + 1, + "Synthetic response for 'b' must follow its request" + ); + + // 'a' should still have its original response. + assert_eq!(stream.find_tool_call_response("a").unwrap().content(), "ok"); +} + +#[test] +fn test_trim_trailing_empty_turn_removes_lone_turn_start() { + let mut stream = ConversationStream::new_test(); + stream.add_turn_start(); + assert_eq!(stream.len(), 1); + + stream.trim_trailing_empty_turn(); + assert_eq!(stream.len(), 0); +} + +#[test] +fn test_trim_trailing_empty_turn_keeps_non_empty_turn() { + let mut stream = ConversationStream::new_test(); + stream.add_turn_start(); + stream.add_chat_request("Hello"); + assert_eq!(stream.len(), 2); + + stream.trim_trailing_empty_turn(); + assert_eq!(stream.len(), 2, "Turn with events should not be removed"); +} + +#[test] +fn test_conversation_stream_serialization_roundtrip() { + let mut stream = ConversationStream::new_test(); + + insta::assert_json_snapshot!(&stream); + + stream + .events + .push(InternalEvent::Event(Box::new(ConversationEvent::new( + ChatRequest::from("foo"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )))); + + stream + .events + .push(InternalEvent::Event(Box::new(ConversationEvent::new( + ChatResponse::message("bar"), + Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 0).unwrap(), + )))); + + insta::assert_json_snapshot!(&stream); + let json = serde_json::to_string(&stream).unwrap(); + let stream2 = serde_json::from_str::(&json).unwrap(); + assert_eq!(stream, stream2); +} + +#[test] +fn test_serialization_base64_encodes_tool_call_fields() { + let mut stream = ConversationStream::new_test(); + + let mut args = Map::new(); + args.insert("path".into(), Value::String("src/main.rs".into())); + + stream.add_tool_call_request(ToolCallRequest { + id: "tc1".into(), + name: "read_file".into(), + arguments: args, + }); + stream.add_tool_call_response(ToolCallResponse { + id: "tc1".into(), + result: Ok("file contents here".into()), + }); + + // Serialize to JSON (as storage would). + let json = serde_json::to_string(&stream).unwrap(); + + // The raw JSON should NOT contain the plain-text values — they + // should be base64-encoded. + assert!( + !json.contains("src/main.rs"), + "Tool arguments should be base64-encoded on disk" + ); + assert!( + !json.contains("file contents here"), + "Tool response content should be base64-encoded on disk" + ); + + // Roundtrip should recover the original values. + let stream2 = serde_json::from_str::(&json).unwrap(); + + let req = stream2 + .iter() + .find_map(|e| e.event.as_tool_call_request()) + .unwrap(); + assert_eq!(req.arguments["path"], "src/main.rs"); + + let resp = stream2.find_tool_call_response("tc1").unwrap(); + assert_eq!(resp.content(), "file contents here"); +} + +#[test] +fn test_sanitize_drops_leading_non_user_events() { + let mut stream = ConversationStream::new_test(); + + // Simulate a fork where --from cut into the middle of a turn: + // the ChatRequest was removed but the response and subsequent + // events remain. + stream.push(ConversationEvent::new( + ChatResponse::message("orphaned response"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatRequest::from("real question"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("real answer"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 2).unwrap(), + )); + + stream.sanitize(); + + // TurnStart injected + ChatRequest + ChatResponse + assert_eq!(stream.len(), 3); + assert!( + stream.first().unwrap().event.is_turn_start(), + "Stream should start with TurnStart after sanitize" + ); +} + +#[test] +fn test_sanitize_removes_orphaned_tool_call_response() { + let mut stream = ConversationStream::new_test(); + + // --from removed the ToolCallRequest but kept the response. + stream.push(ConversationEvent::new( + ChatRequest::from("question"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )); + stream.push(ConversationEvent::new( + ToolCallResponse { + id: "orphan".into(), + result: Ok("data".into()), + }, + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("answer"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 2).unwrap(), + )); + + stream.sanitize(); + + // TurnStart injected + ChatRequest + ChatResponse + assert_eq!(stream.len(), 3); + assert!( + stream.find_tool_call_response("orphan").is_none(), + "Orphaned ToolCallResponse should be removed" + ); +} + +#[test] +fn test_sanitize_removes_orphaned_inquiry_response() { + let mut stream = ConversationStream::new_test(); + + stream.push(ConversationEvent::new( + ChatRequest::from("question"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )); + stream.push(ConversationEvent::new( + InquiryResponse::boolean("orphan", true), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("answer"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 2).unwrap(), + )); + + stream.sanitize(); + + // TurnStart injected + ChatRequest + ChatResponse + assert_eq!(stream.len(), 3); + let has_inquiry_response = stream.iter().any(|e| e.event.is_inquiry_response()); + assert!( + !has_inquiry_response, + "Orphaned InquiryResponse should be removed" + ); +} + +#[test] +fn test_sanitize_removes_orphaned_inquiry_request() { + let mut stream = ConversationStream::new_test(); + + stream.push(ConversationEvent::new( + ChatRequest::from("question"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )); + stream.push(ConversationEvent::new( + InquiryRequest::new( + "orphan", + InquirySource::Assistant, + InquiryQuestion::boolean("proceed?".into()), + ), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("answer"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 2).unwrap(), + )); + + stream.sanitize(); + + // TurnStart injected + ChatRequest + ChatResponse + assert_eq!(stream.len(), 3); + let has_inquiry_request = stream.iter().any(|e| e.event.is_inquiry_request()); + assert!( + !has_inquiry_request, + "Orphaned InquiryRequest should be removed" + ); +} + +#[test] +fn test_sanitize_keeps_matched_pairs_intact() { + let mut stream = ConversationStream::new_test(); + + stream.push(ConversationEvent::new( + ChatRequest::from("question"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )); + stream.push(ConversationEvent::new( + ToolCallRequest { + id: "tc1".into(), + name: "read_file".into(), + arguments: Map::new(), + }, + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + stream.push(ConversationEvent::new( + ToolCallResponse { + id: "tc1".into(), + result: Ok("contents".into()), + }, + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 2).unwrap(), + )); + stream.push(ConversationEvent::new( + InquiryRequest::new( + "iq1", + InquirySource::Tool { + name: "read_file".into(), + }, + InquiryQuestion::boolean("proceed?".into()), + ), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 3).unwrap(), + )); + stream.push(ConversationEvent::new( + InquiryResponse::boolean("iq1", true), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 4).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("done"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 5).unwrap(), + )); + + stream.sanitize(); + + // TurnStart injected + 6 original events + assert_eq!( + stream.len(), + 7, + "All matched events should be preserved (plus injected TurnStart)" + ); +} + +#[test] +fn test_sanitize_handles_from_cutting_through_tool_call() { + let mut stream = ConversationStream::new_test(); + + // Simulates --from removing the ToolCallRequest but keeping + // its response, plus the subsequent turn. + stream.push(ConversationEvent::new( + ToolCallResponse { + id: "cut".into(), + result: Ok("data".into()), + }, + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("based on that tool..."), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 2).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatRequest::from("next question"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 3).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("next answer"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 4).unwrap(), + )); + + stream.sanitize(); + + // Leading ToolCallResponse and ChatResponse dropped, + // orphaned ToolCallResponse also removed. + // TurnStart injected + ChatRequest + ChatResponse + assert_eq!(stream.len(), 3); + assert!(stream.first().unwrap().event.is_turn_start()); +} + +#[test] +fn test_sanitize_deduplicates_leading_turn_starts() { + let mut stream = ConversationStream::new_test(); + + // Simulate --from keeping TurnStarts from multiple filtered turns + // that all precede the first ChatRequest. + stream.add_turn_start(); + stream.add_turn_start(); + stream.add_turn_start(); + stream.push(ConversationEvent::new( + ChatRequest::from("hello"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("hi"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + + stream.sanitize(); + + let turn_count = stream.iter().filter(|e| e.event.is_turn_start()).count(); + assert_eq!(turn_count, 1, "Should have exactly one TurnStart"); +} + +#[test] +fn test_sanitize_reindexes_turn_starts() { + let mut stream = ConversationStream::new_test(); + + // Two turns with non-sequential indices (from filtering). + stream.add_turn_start(); + stream.push(ConversationEvent::new( + ChatRequest::from("first"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("reply"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + stream.add_turn_start(); + stream.push(ConversationEvent::new( + ChatRequest::from("second"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 2).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("reply2"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 3).unwrap(), + )); + + stream.sanitize(); + + let turn_count = stream.iter().filter(|e| e.event.is_turn_start()).count(); + assert_eq!(turn_count, 2); +} + +#[test] +fn test_sanitize_noop_on_healthy_stream() { + let mut stream = ConversationStream::new_test(); + + stream.add_turn_start(); + stream.push(ConversationEvent::new( + ChatRequest::from("hello"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(), + )); + stream.push(ConversationEvent::new( + ChatResponse::message("hi"), + Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 1).unwrap(), + )); + + let len_before = stream.len(); + stream.sanitize(); + assert_eq!( + stream.len(), + len_before, + "Healthy stream should be unchanged" + ); +} diff --git a/crates/jp_conversation/src/thread.rs b/crates/jp_conversation/src/thread.rs index c21aeee4..2065a237 100644 --- a/crates/jp_conversation/src/thread.rs +++ b/crates/jp_conversation/src/thread.rs @@ -1,7 +1,7 @@ //! See [`Thread`]. use jp_attachment::Attachment; -use jp_config::assistant::instructions::InstructionsConfig; +use jp_config::assistant::sections::SectionConfig; use quick_xml::se::TextFormat; use serde::Serialize; use tracing::trace; @@ -17,8 +17,8 @@ pub struct ThreadBuilder { /// See [`Thread::system_prompt`]. pub system_prompt: Option, - /// See [`Thread::instructions`]. - pub instructions: Vec, + /// See [`Thread::sections`]. + pub sections: Vec, /// See [`Thread::attachments`]. pub attachments: Vec, @@ -33,7 +33,7 @@ impl ThreadBuilder { pub const fn new() -> Self { Self { system_prompt: None, - instructions: Vec::new(), + sections: Vec::new(), attachments: Vec::new(), events: None, } @@ -46,17 +46,17 @@ impl ThreadBuilder { self } - /// Set the instructions for the thread. + /// Set the sections for the thread. #[must_use] - pub fn with_instructions(mut self, instructions: Vec) -> Self { - self.instructions = instructions; + pub fn with_sections(mut self, sections: Vec) -> Self { + self.sections = sections; self } - /// Add an instruction to the thread. + /// Add a section to the thread. #[must_use] - pub fn add_instruction(mut self, instruction: InstructionsConfig) -> Self { - self.instructions.push(instruction); + pub fn add_section(mut self, section: SectionConfig) -> Self { + self.sections.push(section); self } @@ -74,16 +74,6 @@ impl ThreadBuilder { self } - // /// Add an event to the thread. - // /// - // /// If no [`ConversationStream`] exists, a default one is created. - // #[must_use] - // pub fn with_event(mut self, event: impl Into) -> Self { - // let mut stream = self.events.take().unwrap_or_default(); - // stream.push(event); - // self.with_events(stream) - // } - /// Build the thread. /// /// # Errors @@ -92,7 +82,7 @@ impl ThreadBuilder { pub fn build(self) -> Result { let Self { system_prompt, - instructions, + sections, attachments, events, } = self; @@ -102,7 +92,7 @@ impl ThreadBuilder { Ok(Thread { system_prompt, - instructions, + sections, attachments, events, }) @@ -118,8 +108,11 @@ pub struct Thread { /// The system prompt to use. pub system_prompt: Option, - /// The instructions to use. - pub instructions: Vec, + /// The sections to include after the system prompt. + /// + /// Each section is rendered via [`SectionConfig::render()`] before + /// being sent to the provider. + pub sections: Vec, /// The attachments to use. pub attachments: Vec, @@ -131,6 +124,8 @@ pub struct Thread { impl Thread { /// Convert the thread into a list of messages. /// + /// Sections are rendered to strings via [`SectionConfig::render()`]. + /// /// # Errors /// /// Returns an error if XML serialization fails. @@ -146,7 +141,7 @@ impl Thread { { let Self { system_prompt, - instructions, + sections, attachments, events, } = self; @@ -158,8 +153,8 @@ impl Thread { parts.push(system_prompt); } - for instruction in &instructions { - parts.push(instruction.try_to_xml()?); + for section in §ions { + parts.push(section.render()); } if !attachments.is_empty() {