diff --git a/CHANGELOG.md b/CHANGELOG.md index cc32d81d..518d6af8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Improvements -* `.stream()` and `.stream_async()` now emit `` / `` tag boundaries around thinking content in all stream modes. With `content="text"`, concatenating all chunks produces well-formed output with thinking delimited by a single tag pair. With `content="all"`, tag boundary strings are yielded alongside typed `ContentThinking` objects, so downstream consumers can detect thinking boundaries without type inspection. (#294, #297) +* `.stream()` and `.stream_async()` now handle thinking content differently by mode. With `content="text"`, thinking is suppressed entirely. With `content="all"`, thinking fragments are yielded as `ContentThinkingDelta` objects with a `phase` property (`"start"`, `"body"`, or `"end"`) that communicates block boundaries to downstream consumers without injecting synthetic strings into the stream. (#299, #297, #294) * Updated default models across all providers to current generation: (#292) * Anthropic: `claude-sonnet-4-6` * Bedrock: `us.anthropic.claude-sonnet-4-6` diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 6b1a2705..9beafd65 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -35,6 +35,7 @@ ContentJson, ContentText, ContentThinking, + ContentThinkingDelta, ContentToolRequest, ContentToolResult, ToolInfo, @@ -1157,7 +1158,7 @@ def stream( data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, ) -> Generator[ - str | ContentThinking | ContentToolRequest | ContentToolResult, None, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None ]: ... def stream( @@ -1168,7 +1169,7 @@ def stream( data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, ) -> Generator[ - str | ContentThinking | ContentToolRequest | ContentToolResult, None, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None ]: """ Generate a response from the chat in a streaming fashion. @@ -1233,7 +1234,7 @@ class Person(BaseModel): ) def wrapper() -> Generator[ - str | ContentThinking | ContentToolRequest | ContentToolResult, None, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None ]: with display: for chunk in generator: @@ -1260,7 +1261,7 @@ async def stream_async( data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, ) -> AsyncGenerator[ - str | ContentThinking | ContentToolRequest | ContentToolResult, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None ]: ... async def stream_async( @@ -1271,7 +1272,7 @@ async def stream_async( data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, ) -> AsyncGenerator[ - str | ContentThinking | ContentToolRequest | ContentToolResult, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None ]: """ Generate a response from the chat in a streaming fashion asynchronously. @@ -1332,7 +1333,7 @@ class Person(BaseModel): display = self._markdown_display(echo=echo) async def wrapper() -> AsyncGenerator[ - str | ContentThinking | ContentToolRequest | ContentToolResult, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None ]: with display: async for chunk in self._chat_impl_async( @@ -2494,7 +2495,7 @@ def _chat_impl( kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, ) -> Generator[ - str | ContentThinking | ContentToolRequest | ContentToolResult, None, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None ]: ... def _chat_impl( @@ -2564,7 +2565,7 @@ def _chat_impl_async( kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, ) -> AsyncGenerator[ - str | ContentThinking | ContentToolRequest | ContentToolResult, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None ]: ... async def _chat_impl_async( @@ -2676,28 +2677,33 @@ def emit(text: str | Content): for chunk in response: content = self.provider.stream_content(chunk) if content is not None: - text = content_text(content) - if text: - is_thinking = isinstance(content, ContentThinking) - if is_thinking and not inside_thinking: - emit("\n") - yield "\n" - inside_thinking = True - elif not is_thinking and inside_thinking: - emit("\n\n\n") - yield "\n\n\n" - inside_thinking = False - - emit(text) - if content_mode == "all" and is_thinking: + if is_thinking_delta(content) and not inside_thinking: + content = ContentThinkingDelta( + thinking=content.thinking, phase="start" + ) + emit("\n") + inside_thinking = True + elif not is_thinking_delta(content) and inside_thinking: + emit("\n\n\n") + if content_mode == "all": + yield ContentThinkingDelta(thinking="", phase="end") + inside_thinking = False + + if is_thinking_delta(content): + emit(content.thinking) + if content_mode == "all": yield content - else: + else: + text = content_text(content) + if text: + emit(text) yield text result = self.provider.stream_merge_chunks(result, chunk) if inside_thinking: emit("\n\n\n") - yield "\n\n\n" + if content_mode == "all": + yield ContentThinkingDelta(thinking="", phase="end") turn = self.provider.stream_turn( result, @@ -2796,28 +2802,33 @@ def emit(text: str | Content): async for chunk in response: content = self.provider.stream_content(chunk) if content is not None: - text = content_text(content) - if text: - is_thinking = isinstance(content, ContentThinking) - if is_thinking and not inside_thinking: - emit("\n") - yield "\n" - inside_thinking = True - elif not is_thinking and inside_thinking: - emit("\n\n\n") - yield "\n\n\n" - inside_thinking = False - - emit(text) - if content_mode == "all" and is_thinking: + if is_thinking_delta(content) and not inside_thinking: + content = ContentThinkingDelta( + thinking=content.thinking, phase="start" + ) + emit("\n") + inside_thinking = True + elif not is_thinking_delta(content) and inside_thinking: + emit("\n\n\n") + if content_mode == "all": + yield ContentThinkingDelta(thinking="", phase="end") + inside_thinking = False + + if is_thinking_delta(content): + emit(content.thinking) + if content_mode == "all": yield content - else: + else: + text = content_text(content) + if text: + emit(text) yield text result = self.provider.stream_merge_chunks(result, chunk) if inside_thinking: emit("\n\n\n") - yield "\n\n\n" + if content_mode == "all": + yield ContentThinkingDelta(thinking="", phase="end") turn = self.provider.stream_turn( result, @@ -3292,8 +3303,14 @@ class ToolFailureWarning(RuntimeWarning): warnings.simplefilter("always", ToolFailureWarning) +def is_thinking_delta(content: Content) -> TypeGuard[ContentThinkingDelta]: + return isinstance(content, ContentThinkingDelta) + + def content_text(content: Content) -> str: """Extract displayable text from a Content object.""" + if isinstance(content, ContentThinkingDelta): + return content.thinking if isinstance(content, ContentThinking): return content.thinking if isinstance(content, ContentText): diff --git a/chatlas/_content.py b/chatlas/_content.py index 1c88acd6..296bdda6 100644 --- a/chatlas/_content.py +++ b/chatlas/_content.py @@ -10,7 +10,6 @@ BaseModel, ConfigDict, Field, - PrivateAttr, field_serializer, field_validator, ) @@ -139,6 +138,7 @@ def from_tool(cls, tool: "Tool | ToolBuiltIn") -> "ToolInfo": "json", "pdf", "thinking", + "thinking_delta", "web_search_request", "web_search_results", "web_fetch_request", @@ -631,20 +631,11 @@ class ContentThinking(Content): thinking: str extra: Optional[dict[str, Any]] = None - _complete: bool = PrivateAttr(default=True) content_type: ContentTypeEnum = "thinking" - @classmethod - def _as_chunk(cls, thinking: str) -> "ContentThinking": - obj = cls.model_construct(thinking=thinking, content_type="thinking") - obj._complete = False - return obj - def __str__(self): - if self._complete: - return f"\n{self.thinking}\n\n" - return self.thinking + return f"\n{self.thinking}\n\n" def _repr_html_(self): return str(self.tagify()) @@ -663,6 +654,30 @@ def tagify(self): return HTML(html) +class ContentThinkingDelta(Content): + """ + A streaming fragment of thinking/reasoning content. + + Emitted during streaming to represent a chunk of the model's thinking. + The ``phase`` attribute communicates block boundaries to downstream consumers. + + Parameters + ---------- + thinking + The thinking/reasoning text fragment. + phase + The phase of the thinking delta: ``"start"``, ``"body"``, or ``"end"``. + """ + + thinking: str + phase: Literal["start", "body", "end"] = "body" + + content_type: ContentTypeEnum = "thinking_delta" + + def __str__(self): + return self.thinking + + class ContentToolRequestSearch(Content): """ A web search request from the model. diff --git a/chatlas/_provider_anthropic.py b/chatlas/_provider_anthropic.py index f7db8e63..60ff9fb1 100644 --- a/chatlas/_provider_anthropic.py +++ b/chatlas/_provider_anthropic.py @@ -26,6 +26,7 @@ ContentPDF, ContentText, ContentThinking, + ContentThinkingDelta, ContentToolRequest, ContentToolRequestFetch, ContentToolRequestSearch, @@ -468,7 +469,7 @@ def stream_content(self, chunk) -> Optional[Content]: if chunk.delta.type == "text_delta": return ContentText.model_construct(text=chunk.delta.text) if chunk.delta.type == "thinking_delta": - return ContentThinking._as_chunk(chunk.delta.thinking) + return ContentThinkingDelta(thinking=chunk.delta.thinking) return None def stream_merge_chunks(self, completion, chunk): diff --git a/chatlas/_provider_google.py b/chatlas/_provider_google.py index f74aa64f..585ab57b 100644 --- a/chatlas/_provider_google.py +++ b/chatlas/_provider_google.py @@ -15,6 +15,7 @@ ContentPDF, ContentText, ContentThinking, + ContentThinkingDelta, ContentToolRequest, ContentToolResult, ) @@ -377,7 +378,7 @@ def stream_content(self, chunk) -> Optional[Content]: if text is None: return None if getattr(part, "thought", None): - return ContentThinking._as_chunk(text) + return ContentThinkingDelta(thinking=text) return ContentText.model_construct(text=text) def stream_merge_chunks(self, completion, chunk): diff --git a/chatlas/_provider_openai.py b/chatlas/_provider_openai.py index 0ff81b4b..e097f501 100644 --- a/chatlas/_provider_openai.py +++ b/chatlas/_provider_openai.py @@ -17,6 +17,7 @@ ContentPDF, ContentText, ContentThinking, + ContentThinkingDelta, ContentToolRequest, ContentToolRequestSearch, ContentToolResult, @@ -298,7 +299,7 @@ def stream_content(self, chunk) -> Optional[Content]: return ContentText.model_construct(text=chunk.delta) if chunk.type == "response.reasoning_summary_text.delta": # https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/delta - return ContentThinking._as_chunk(chunk.delta) + return ContentThinkingDelta(thinking=chunk.delta) if chunk.type == "response.reasoning_summary_text.done": # The thinking→text transition in _submit_turns already emits # "\n\n\n" which provides the visual separator. diff --git a/chatlas/types/__init__.py b/chatlas/types/__init__.py index 782b7e11..4cb2cf4f 100644 --- a/chatlas/types/__init__.py +++ b/chatlas/types/__init__.py @@ -10,6 +10,8 @@ ContentImageRemote, ContentJson, ContentText, + ContentThinking, + ContentThinkingDelta, ContentToolRequest, ContentToolRequestFetch, ContentToolRequestSearch, @@ -32,6 +34,8 @@ "ContentImageRemote", "ContentJson", "ContentText", + "ContentThinking", + "ContentThinkingDelta", "ContentToolRequest", "ContentToolResult", "ContentToolRequestFetch", diff --git a/docs/_quarto.yml b/docs/_quarto.yml index 3dcf40da..e77eff7e 100644 --- a/docs/_quarto.yml +++ b/docs/_quarto.yml @@ -202,6 +202,8 @@ quartodoc: - types.ContentImageRemote - types.ContentJson - types.ContentText + - types.ContentThinking + - types.ContentThinkingDelta - types.ContentToolRequest - types.ContentToolResult - types.ContentToolRequestSearch diff --git a/tests/test_stream_thinking.py b/tests/test_stream_thinking.py index a5ad76d0..6574dbbd 100644 --- a/tests/test_stream_thinking.py +++ b/tests/test_stream_thinking.py @@ -1,11 +1,11 @@ -"""Tests for streaming thinking tag boundary emission.""" +"""Tests for streaming thinking with ContentThinkingDelta.""" from collections.abc import Sequence from typing import Optional import pytest from chatlas import Chat -from chatlas._content import Content, ContentText, ContentThinking +from chatlas._content import Content, ContentText, ContentThinkingDelta from chatlas._provider import Provider from chatlas._turn import AssistantTurn @@ -83,32 +83,28 @@ def _make_chat(chunks: Sequence[Optional[Content]]) -> Chat: class TestStreamThinkingText: - """Tests for content='text' mode — tags should be yielded as string chunks.""" + """Tests for content='text' mode — thinking is suppressed.""" - def test_thinking_then_text(self): - """Streaming thinking → text produces proper tag boundaries.""" + def test_thinking_suppressed(self): chunks = [ - ContentThinking._as_chunk("step 1 "), - ContentThinking._as_chunk("step 2"), + ContentThinkingDelta(thinking="step 1 "), + ContentThinkingDelta(thinking="step 2"), ContentText.model_construct(text="Hello world"), ] chat = _make_chat(chunks) result = list(chat.stream("test")) combined = "".join(result) - assert combined == "\nstep 1 step 2\n\n\nHello world" + assert combined == "Hello world" def test_thinking_only(self): - """If stream ends during thinking, close tag is still emitted.""" chunks = [ - ContentThinking._as_chunk("reasoning here"), + ContentThinkingDelta(thinking="reasoning here"), ] chat = _make_chat(chunks) result = list(chat.stream("test")) - combined = "".join(result) - assert combined == "\nreasoning here\n\n\n" + assert result == [] def test_text_only(self): - """No thinking chunks means no tags emitted.""" chunks = [ ContentText.model_construct(text="Just text"), ] @@ -117,127 +113,107 @@ def test_text_only(self): combined = "".join(result) assert combined == "Just text" - def test_tag_chunks_are_separate(self): - """Opening and closing tags are yielded as separate chunks.""" - chunks = [ - ContentThinking._as_chunk("thought"), - ContentText.model_construct(text="answer"), - ] - chat = _make_chat(chunks) - result = list(chat.stream("test")) - assert result[0] == "\n" - assert result[1] == "thought" - assert result[2] == "\n\n\n" - assert result[3] == "answer" - class TestStreamThinkingAll: - """Tests for content='all' mode — ContentThinking objects AND tag boundary strings yielded.""" + """Tests for content='all' mode — ContentThinkingDelta objects yielded with phase.""" def test_thinking_then_text(self): - """content='all' yields tag boundaries AND ContentThinking objects.""" chunks = [ - ContentThinking._as_chunk("step 1 "), - ContentThinking._as_chunk("step 2"), + ContentThinkingDelta(thinking="step 1 "), + ContentThinkingDelta(thinking="step 2"), ContentText.model_construct(text="Hello"), ] chat = _make_chat(chunks) result = list(chat.stream("test", content="all")) - thinking_chunks = [x for x in result if isinstance(x, ContentThinking)] - str_chunks = [x for x in result if isinstance(x, str)] - - assert len(thinking_chunks) == 2 + thinking_chunks = [x for x in result if isinstance(x, ContentThinkingDelta)] + assert len(thinking_chunks) == 3 assert thinking_chunks[0].thinking == "step 1 " + assert thinking_chunks[0].phase == "start" assert thinking_chunks[1].thinking == "step 2" - assert "\n" in str_chunks - assert "\n\n\n" in str_chunks - assert "Hello" in str_chunks + assert thinking_chunks[1].phase == "body" + assert thinking_chunks[2].thinking == "" + assert thinking_chunks[2].phase == "end" - def test_tag_boundaries_yielded(self): - """content='all' mode SHOULD yield tag boundary strings.""" + def test_phase_sequence(self): chunks = [ - ContentThinking._as_chunk("thought"), + ContentThinkingDelta(thinking="thought"), ContentText.model_construct(text="answer"), ] chat = _make_chat(chunks) result = list(chat.stream("test", content="all")) - str_chunks = [x for x in result if isinstance(x, str)] - assert "\n" in str_chunks - assert "\n\n\n" in str_chunks + assert isinstance(result[0], ContentThinkingDelta) + assert result[0].phase == "start" + assert result[0].thinking == "thought" + assert isinstance(result[1], ContentThinkingDelta) + assert result[1].phase == "end" + assert result[1].thinking == "" + assert result[2] == "answer" - def test_order_of_chunks(self): - """content='all' mode: open tag, ContentThinking objects, close tag, then text.""" + def test_thinking_only(self): chunks = [ - ContentThinking._as_chunk("thought"), - ContentText.model_construct(text="answer"), + ContentThinkingDelta(thinking="reasoning"), ] chat = _make_chat(chunks) result = list(chat.stream("test", content="all")) - assert result[0] == "\n" - assert isinstance(result[1], ContentThinking) - assert result[1].thinking == "thought" - assert result[2] == "\n\n\n" - assert result[3] == "answer" + thinking_chunks = [x for x in result if isinstance(x, ContentThinkingDelta)] + assert len(thinking_chunks) == 2 + assert thinking_chunks[0].phase == "start" + assert thinking_chunks[1].phase == "end" - def test_str_on_chunk_has_no_tags(self): - """Calling str() on yielded ContentThinking chunks should not wrap in tags.""" + def test_str_on_delta_has_no_tags(self): chunks = [ - ContentThinking._as_chunk("thought"), + ContentThinkingDelta(thinking="thought"), ContentText.model_construct(text="answer"), ] chat = _make_chat(chunks) result = list(chat.stream("test", content="all")) - thinking_chunks = [x for x in result if isinstance(x, ContentThinking)] - assert len(thinking_chunks) == 1 - assert str(thinking_chunks[0]) == "thought" + thinking_chunks = [x for x in result if isinstance(x, ContentThinkingDelta)] + for chunk in thinking_chunks: + assert "" not in str(chunk) + assert "" not in str(chunk) @pytest.mark.asyncio class TestStreamThinkingAsync: - """Tests for async streaming with thinking boundaries.""" + """Tests for async streaming with thinking.""" - async def test_thinking_then_text_async(self): - """Async streaming thinking → text produces proper tag boundaries.""" + async def test_thinking_suppressed_text_async(self): chunks = [ - ContentThinking._as_chunk("async thought "), - ContentThinking._as_chunk("more"), + ContentThinkingDelta(thinking="async thought "), + ContentThinkingDelta(thinking="more"), ContentText.model_construct(text="response"), ] chat = _make_chat(chunks) result = [chunk async for chunk in await chat.stream_async("test")] combined = "".join(result) - assert combined == "\nasync thought more\n\n\nresponse" + assert combined == "response" async def test_thinking_only_async(self): - """Async: close tag emitted even if stream ends during thinking.""" chunks = [ - ContentThinking._as_chunk("reasoning"), + ContentThinkingDelta(thinking="reasoning"), ] chat = _make_chat(chunks) result = [chunk async for chunk in await chat.stream_async("test")] - combined = "".join(result) - assert combined == "\nreasoning\n\n\n" + assert result == [] async def test_content_all_async(self): - """Async content='all' yields tag boundaries AND ContentThinking objects.""" chunks = [ - ContentThinking._as_chunk("thought"), + ContentThinkingDelta(thinking="thought"), ContentText.model_construct(text="answer"), ] chat = _make_chat(chunks) - result = [chunk async for chunk in await chat.stream_async("test", content="all")] - - thinking_chunks = [x for x in result if isinstance(x, ContentThinking)] - str_chunks = [x for x in result if isinstance(x, str)] + result = [ + chunk + async for chunk in await chat.stream_async("test", content="all") + ] - assert len(thinking_chunks) == 1 + thinking_chunks = [x for x in result if isinstance(x, ContentThinkingDelta)] + assert len(thinking_chunks) == 2 assert thinking_chunks[0].thinking == "thought" - assert "\n" in str_chunks - assert "\n\n\n" in str_chunks - assert "answer" in str_chunks - assert result[0] == "\n" - assert result[2] == "\n\n\n" + assert thinking_chunks[0].phase == "start" + assert thinking_chunks[1].phase == "end" + assert "answer" in [x for x in result if isinstance(x, str)]