diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d0bb9de..0d85ffed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Improvements +* `.stream()` and `.stream_async()` now emit `` / `` tag boundaries around thinking content. With `content="text"`, concatenating all chunks produces well-formed output with thinking delimited by a single tag pair. With `content="all"`, behavior is unchanged — typed `ContentThinking` objects are yielded without tag strings. (#294) * Updated default models across all providers to current generation: (#292) * Anthropic: `claude-sonnet-4-6` * Bedrock: `us.anthropic.claude-sonnet-4-6` diff --git a/chatlas/_chat.py b/chatlas/_chat.py index fec8997f..6b1a2705 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -2671,20 +2671,34 @@ def emit(text: str | Content): ) result = None + inside_thinking = False + for chunk in response: content = self.provider.stream_content(chunk) if content is not None: text = content_text(content) if text: + is_thinking = isinstance(content, ContentThinking) + if is_thinking and not inside_thinking: + emit("\n") + yield "\n" + inside_thinking = True + elif not is_thinking and inside_thinking: + emit("\n\n\n") + yield "\n\n\n" + inside_thinking = False + emit(text) - if content_mode == "all" and isinstance( - content, ContentThinking - ): + if content_mode == "all" and is_thinking: yield content else: yield text result = self.provider.stream_merge_chunks(result, chunk) + if inside_thinking: + emit("\n\n\n") + yield "\n\n\n" + turn = self.provider.stream_turn( result, has_data_model=data_model is not None, @@ -2777,20 +2791,34 @@ def emit(text: str | Content): ) result = None + inside_thinking = False + async for chunk in response: content = self.provider.stream_content(chunk) if content is not None: text = content_text(content) if text: + is_thinking = isinstance(content, ContentThinking) + if is_thinking and not inside_thinking: + emit("\n") + yield "\n" + inside_thinking = True + elif not is_thinking and inside_thinking: + emit("\n\n\n") + yield "\n\n\n" + inside_thinking = False + emit(text) - if content_mode == "all" and isinstance( - content, ContentThinking - ): + if content_mode == "all" and is_thinking: yield content else: yield text result = self.provider.stream_merge_chunks(result, chunk) + if inside_thinking: + emit("\n\n\n") + yield "\n\n\n" + turn = self.provider.stream_turn( result, has_data_model=data_model is not None, diff --git a/chatlas/_content.py b/chatlas/_content.py index 4fff0af5..1c88acd6 100644 --- a/chatlas/_content.py +++ b/chatlas/_content.py @@ -6,7 +6,14 @@ from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast import orjson -from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator +from pydantic import ( + BaseModel, + ConfigDict, + Field, + PrivateAttr, + field_serializer, + field_validator, +) from ._typing_extensions import TypedDict @@ -624,11 +631,20 @@ class ContentThinking(Content): thinking: str extra: Optional[dict[str, Any]] = None + _complete: bool = PrivateAttr(default=True) content_type: ContentTypeEnum = "thinking" + @classmethod + def _as_chunk(cls, thinking: str) -> "ContentThinking": + obj = cls.model_construct(thinking=thinking, content_type="thinking") + obj._complete = False + return obj + def __str__(self): - return f"\n{self.thinking}\n\n" + if self._complete: + return f"\n{self.thinking}\n\n" + return self.thinking def _repr_html_(self): return str(self.tagify()) diff --git a/chatlas/_provider_anthropic.py b/chatlas/_provider_anthropic.py index cf8c07d1..f7db8e63 100644 --- a/chatlas/_provider_anthropic.py +++ b/chatlas/_provider_anthropic.py @@ -468,7 +468,7 @@ def stream_content(self, chunk) -> Optional[Content]: if chunk.delta.type == "text_delta": return ContentText.model_construct(text=chunk.delta.text) if chunk.delta.type == "thinking_delta": - return ContentThinking(thinking=chunk.delta.thinking) + return ContentThinking._as_chunk(chunk.delta.thinking) return None def stream_merge_chunks(self, completion, chunk): diff --git a/chatlas/_provider_google.py b/chatlas/_provider_google.py index f512071e..f74aa64f 100644 --- a/chatlas/_provider_google.py +++ b/chatlas/_provider_google.py @@ -377,7 +377,7 @@ def stream_content(self, chunk) -> Optional[Content]: if text is None: return None if getattr(part, "thought", None): - return ContentThinking(thinking=text) + return ContentThinking._as_chunk(text) return ContentText.model_construct(text=text) def stream_merge_chunks(self, completion, chunk): diff --git a/chatlas/_provider_openai.py b/chatlas/_provider_openai.py index 6f4f1292..0ff81b4b 100644 --- a/chatlas/_provider_openai.py +++ b/chatlas/_provider_openai.py @@ -298,11 +298,11 @@ def stream_content(self, chunk) -> Optional[Content]: return ContentText.model_construct(text=chunk.delta) if chunk.type == "response.reasoning_summary_text.delta": # https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/delta - return ContentThinking(thinking=chunk.delta) + return ContentThinking._as_chunk(chunk.delta) if chunk.type == "response.reasoning_summary_text.done": - # Separator between reasoning summary and response text - # https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/done - return ContentText.model_construct(text="\n\n") + # The thinking→text transition in _submit_turns already emits + # "\n\n\n" which provides the visual separator. + return None return None def stream_merge_chunks(self, completion, chunk): diff --git a/tests/test_stream_thinking.py b/tests/test_stream_thinking.py new file mode 100644 index 00000000..a5ad76d0 --- /dev/null +++ b/tests/test_stream_thinking.py @@ -0,0 +1,243 @@ +"""Tests for streaming thinking tag boundary emission.""" + +from collections.abc import Sequence +from typing import Optional + +import pytest +from chatlas import Chat +from chatlas._content import Content, ContentText, ContentThinking +from chatlas._provider import Provider +from chatlas._turn import AssistantTurn + + +class FakeChunk: + """A fake chunk that carries a Content object.""" + + def __init__(self, content: Optional[Content]): + self.content = content + + +class FakeProvider(Provider): + """Minimal provider that yields a predetermined sequence of content chunks.""" + + def __init__(self, chunks: Sequence[Optional[Content]]): + super().__init__(name="fake", model="fake-model") + self._chunks = chunks + + def list_models(self): + return [] + + def chat_perform(self, *, stream, turns, tools, data_model, kwargs): + if stream: + return iter([FakeChunk(c) for c in self._chunks]) + raise NotImplementedError + + async def chat_perform_async(self, *, stream, turns, tools, data_model, kwargs): + if stream: + + async def _gen(): + for c in self._chunks: + yield FakeChunk(c) + + return _gen() + raise NotImplementedError + + def stream_content(self, chunk) -> Optional[Content]: + return chunk.content + + def stream_merge_chunks(self, completion, chunk): + return completion or {} + + def stream_turn(self, completion, has_data_model): + return AssistantTurn( + contents=[ContentText.model_construct(text="response")], + tokens=None, + completion=None, + ) + + def value_turn(self, completion, has_data_model): + raise NotImplementedError + + def value_tokens(self, completion): + return None + + def value_cost(self, completion, tokens=None): + return None + + def token_count(self, *args, **kwargs): + return 0 + + async def token_count_async(self, *args, **kwargs): + return 0 + + def translate_model_params(self, *args, **kwargs): + return {} + + def supported_model_params(self): + return set() + + +def _make_chat(chunks: Sequence[Optional[Content]]) -> Chat: + provider = FakeProvider(chunks) + return Chat(provider=provider) + + +class TestStreamThinkingText: + """Tests for content='text' mode — tags should be yielded as string chunks.""" + + def test_thinking_then_text(self): + """Streaming thinking → text produces proper tag boundaries.""" + chunks = [ + ContentThinking._as_chunk("step 1 "), + ContentThinking._as_chunk("step 2"), + ContentText.model_construct(text="Hello world"), + ] + chat = _make_chat(chunks) + result = list(chat.stream("test")) + combined = "".join(result) + assert combined == "\nstep 1 step 2\n\n\nHello world" + + def test_thinking_only(self): + """If stream ends during thinking, close tag is still emitted.""" + chunks = [ + ContentThinking._as_chunk("reasoning here"), + ] + chat = _make_chat(chunks) + result = list(chat.stream("test")) + combined = "".join(result) + assert combined == "\nreasoning here\n\n\n" + + def test_text_only(self): + """No thinking chunks means no tags emitted.""" + chunks = [ + ContentText.model_construct(text="Just text"), + ] + chat = _make_chat(chunks) + result = list(chat.stream("test")) + combined = "".join(result) + assert combined == "Just text" + + def test_tag_chunks_are_separate(self): + """Opening and closing tags are yielded as separate chunks.""" + chunks = [ + ContentThinking._as_chunk("thought"), + ContentText.model_construct(text="answer"), + ] + chat = _make_chat(chunks) + result = list(chat.stream("test")) + assert result[0] == "\n" + assert result[1] == "thought" + assert result[2] == "\n\n\n" + assert result[3] == "answer" + + +class TestStreamThinkingAll: + """Tests for content='all' mode — ContentThinking objects AND tag boundary strings yielded.""" + + def test_thinking_then_text(self): + """content='all' yields tag boundaries AND ContentThinking objects.""" + chunks = [ + ContentThinking._as_chunk("step 1 "), + ContentThinking._as_chunk("step 2"), + ContentText.model_construct(text="Hello"), + ] + chat = _make_chat(chunks) + result = list(chat.stream("test", content="all")) + + thinking_chunks = [x for x in result if isinstance(x, ContentThinking)] + str_chunks = [x for x in result if isinstance(x, str)] + + assert len(thinking_chunks) == 2 + assert thinking_chunks[0].thinking == "step 1 " + assert thinking_chunks[1].thinking == "step 2" + assert "\n" in str_chunks + assert "\n\n\n" in str_chunks + assert "Hello" in str_chunks + + def test_tag_boundaries_yielded(self): + """content='all' mode SHOULD yield tag boundary strings.""" + chunks = [ + ContentThinking._as_chunk("thought"), + ContentText.model_construct(text="answer"), + ] + chat = _make_chat(chunks) + result = list(chat.stream("test", content="all")) + + str_chunks = [x for x in result if isinstance(x, str)] + assert "\n" in str_chunks + assert "\n\n\n" in str_chunks + + def test_order_of_chunks(self): + """content='all' mode: open tag, ContentThinking objects, close tag, then text.""" + chunks = [ + ContentThinking._as_chunk("thought"), + ContentText.model_construct(text="answer"), + ] + chat = _make_chat(chunks) + result = list(chat.stream("test", content="all")) + + assert result[0] == "\n" + assert isinstance(result[1], ContentThinking) + assert result[1].thinking == "thought" + assert result[2] == "\n\n\n" + assert result[3] == "answer" + + def test_str_on_chunk_has_no_tags(self): + """Calling str() on yielded ContentThinking chunks should not wrap in tags.""" + chunks = [ + ContentThinking._as_chunk("thought"), + ContentText.model_construct(text="answer"), + ] + chat = _make_chat(chunks) + result = list(chat.stream("test", content="all")) + + thinking_chunks = [x for x in result if isinstance(x, ContentThinking)] + assert len(thinking_chunks) == 1 + assert str(thinking_chunks[0]) == "thought" + + +@pytest.mark.asyncio +class TestStreamThinkingAsync: + """Tests for async streaming with thinking boundaries.""" + + async def test_thinking_then_text_async(self): + """Async streaming thinking → text produces proper tag boundaries.""" + chunks = [ + ContentThinking._as_chunk("async thought "), + ContentThinking._as_chunk("more"), + ContentText.model_construct(text="response"), + ] + chat = _make_chat(chunks) + result = [chunk async for chunk in await chat.stream_async("test")] + combined = "".join(result) + assert combined == "\nasync thought more\n\n\nresponse" + + async def test_thinking_only_async(self): + """Async: close tag emitted even if stream ends during thinking.""" + chunks = [ + ContentThinking._as_chunk("reasoning"), + ] + chat = _make_chat(chunks) + result = [chunk async for chunk in await chat.stream_async("test")] + combined = "".join(result) + assert combined == "\nreasoning\n\n\n" + + async def test_content_all_async(self): + """Async content='all' yields tag boundaries AND ContentThinking objects.""" + chunks = [ + ContentThinking._as_chunk("thought"), + ContentText.model_construct(text="answer"), + ] + chat = _make_chat(chunks) + result = [chunk async for chunk in await chat.stream_async("test", content="all")] + + thinking_chunks = [x for x in result if isinstance(x, ContentThinking)] + str_chunks = [x for x in result if isinstance(x, str)] + + assert len(thinking_chunks) == 1 + assert thinking_chunks[0].thinking == "thought" + assert "\n" in str_chunks + assert "\n\n\n" in str_chunks + assert "answer" in str_chunks + assert result[0] == "\n" + assert result[2] == "\n\n\n"