diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0d0bb9de..0d85ffed 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Improvements
+* `.stream()` and `.stream_async()` now emit `` / `` tag boundaries around thinking content. With `content="text"`, concatenating all chunks produces well-formed output with thinking delimited by a single tag pair. With `content="all"`, behavior is unchanged — typed `ContentThinking` objects are yielded without tag strings. (#294)
* Updated default models across all providers to current generation: (#292)
* Anthropic: `claude-sonnet-4-6`
* Bedrock: `us.anthropic.claude-sonnet-4-6`
diff --git a/chatlas/_chat.py b/chatlas/_chat.py
index fec8997f..6b1a2705 100644
--- a/chatlas/_chat.py
+++ b/chatlas/_chat.py
@@ -2671,20 +2671,34 @@ def emit(text: str | Content):
)
result = None
+ inside_thinking = False
+
for chunk in response:
content = self.provider.stream_content(chunk)
if content is not None:
text = content_text(content)
if text:
+ is_thinking = isinstance(content, ContentThinking)
+ if is_thinking and not inside_thinking:
+ emit("\n")
+ yield "\n"
+ inside_thinking = True
+ elif not is_thinking and inside_thinking:
+ emit("\n\n\n")
+ yield "\n\n\n"
+ inside_thinking = False
+
emit(text)
- if content_mode == "all" and isinstance(
- content, ContentThinking
- ):
+ if content_mode == "all" and is_thinking:
yield content
else:
yield text
result = self.provider.stream_merge_chunks(result, chunk)
+ if inside_thinking:
+ emit("\n\n\n")
+ yield "\n\n\n"
+
turn = self.provider.stream_turn(
result,
has_data_model=data_model is not None,
@@ -2777,20 +2791,34 @@ def emit(text: str | Content):
)
result = None
+ inside_thinking = False
+
async for chunk in response:
content = self.provider.stream_content(chunk)
if content is not None:
text = content_text(content)
if text:
+ is_thinking = isinstance(content, ContentThinking)
+ if is_thinking and not inside_thinking:
+ emit("\n")
+ yield "\n"
+ inside_thinking = True
+ elif not is_thinking and inside_thinking:
+ emit("\n\n\n")
+ yield "\n\n\n"
+ inside_thinking = False
+
emit(text)
- if content_mode == "all" and isinstance(
- content, ContentThinking
- ):
+ if content_mode == "all" and is_thinking:
yield content
else:
yield text
result = self.provider.stream_merge_chunks(result, chunk)
+ if inside_thinking:
+ emit("\n\n\n")
+ yield "\n\n\n"
+
turn = self.provider.stream_turn(
result,
has_data_model=data_model is not None,
diff --git a/chatlas/_content.py b/chatlas/_content.py
index 4fff0af5..1c88acd6 100644
--- a/chatlas/_content.py
+++ b/chatlas/_content.py
@@ -6,7 +6,14 @@
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast
import orjson
-from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator
+from pydantic import (
+ BaseModel,
+ ConfigDict,
+ Field,
+ PrivateAttr,
+ field_serializer,
+ field_validator,
+)
from ._typing_extensions import TypedDict
@@ -624,11 +631,20 @@ class ContentThinking(Content):
thinking: str
extra: Optional[dict[str, Any]] = None
+ _complete: bool = PrivateAttr(default=True)
content_type: ContentTypeEnum = "thinking"
+ @classmethod
+ def _as_chunk(cls, thinking: str) -> "ContentThinking":
+ obj = cls.model_construct(thinking=thinking, content_type="thinking")
+ obj._complete = False
+ return obj
+
def __str__(self):
- return f"\n{self.thinking}\n\n"
+ if self._complete:
+ return f"\n{self.thinking}\n\n"
+ return self.thinking
def _repr_html_(self):
return str(self.tagify())
diff --git a/chatlas/_provider_anthropic.py b/chatlas/_provider_anthropic.py
index cf8c07d1..f7db8e63 100644
--- a/chatlas/_provider_anthropic.py
+++ b/chatlas/_provider_anthropic.py
@@ -468,7 +468,7 @@ def stream_content(self, chunk) -> Optional[Content]:
if chunk.delta.type == "text_delta":
return ContentText.model_construct(text=chunk.delta.text)
if chunk.delta.type == "thinking_delta":
- return ContentThinking(thinking=chunk.delta.thinking)
+ return ContentThinking._as_chunk(chunk.delta.thinking)
return None
def stream_merge_chunks(self, completion, chunk):
diff --git a/chatlas/_provider_google.py b/chatlas/_provider_google.py
index f512071e..f74aa64f 100644
--- a/chatlas/_provider_google.py
+++ b/chatlas/_provider_google.py
@@ -377,7 +377,7 @@ def stream_content(self, chunk) -> Optional[Content]:
if text is None:
return None
if getattr(part, "thought", None):
- return ContentThinking(thinking=text)
+ return ContentThinking._as_chunk(text)
return ContentText.model_construct(text=text)
def stream_merge_chunks(self, completion, chunk):
diff --git a/chatlas/_provider_openai.py b/chatlas/_provider_openai.py
index 6f4f1292..0ff81b4b 100644
--- a/chatlas/_provider_openai.py
+++ b/chatlas/_provider_openai.py
@@ -298,11 +298,11 @@ def stream_content(self, chunk) -> Optional[Content]:
return ContentText.model_construct(text=chunk.delta)
if chunk.type == "response.reasoning_summary_text.delta":
# https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/delta
- return ContentThinking(thinking=chunk.delta)
+ return ContentThinking._as_chunk(chunk.delta)
if chunk.type == "response.reasoning_summary_text.done":
- # Separator between reasoning summary and response text
- # https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/done
- return ContentText.model_construct(text="\n\n")
+ # The thinking→text transition in _submit_turns already emits
+ # "\n\n\n" which provides the visual separator.
+ return None
return None
def stream_merge_chunks(self, completion, chunk):
diff --git a/tests/test_stream_thinking.py b/tests/test_stream_thinking.py
new file mode 100644
index 00000000..a5ad76d0
--- /dev/null
+++ b/tests/test_stream_thinking.py
@@ -0,0 +1,243 @@
+"""Tests for streaming thinking tag boundary emission."""
+
+from collections.abc import Sequence
+from typing import Optional
+
+import pytest
+from chatlas import Chat
+from chatlas._content import Content, ContentText, ContentThinking
+from chatlas._provider import Provider
+from chatlas._turn import AssistantTurn
+
+
+class FakeChunk:
+ """A fake chunk that carries a Content object."""
+
+ def __init__(self, content: Optional[Content]):
+ self.content = content
+
+
+class FakeProvider(Provider):
+ """Minimal provider that yields a predetermined sequence of content chunks."""
+
+ def __init__(self, chunks: Sequence[Optional[Content]]):
+ super().__init__(name="fake", model="fake-model")
+ self._chunks = chunks
+
+ def list_models(self):
+ return []
+
+ def chat_perform(self, *, stream, turns, tools, data_model, kwargs):
+ if stream:
+ return iter([FakeChunk(c) for c in self._chunks])
+ raise NotImplementedError
+
+ async def chat_perform_async(self, *, stream, turns, tools, data_model, kwargs):
+ if stream:
+
+ async def _gen():
+ for c in self._chunks:
+ yield FakeChunk(c)
+
+ return _gen()
+ raise NotImplementedError
+
+ def stream_content(self, chunk) -> Optional[Content]:
+ return chunk.content
+
+ def stream_merge_chunks(self, completion, chunk):
+ return completion or {}
+
+ def stream_turn(self, completion, has_data_model):
+ return AssistantTurn(
+ contents=[ContentText.model_construct(text="response")],
+ tokens=None,
+ completion=None,
+ )
+
+ def value_turn(self, completion, has_data_model):
+ raise NotImplementedError
+
+ def value_tokens(self, completion):
+ return None
+
+ def value_cost(self, completion, tokens=None):
+ return None
+
+ def token_count(self, *args, **kwargs):
+ return 0
+
+ async def token_count_async(self, *args, **kwargs):
+ return 0
+
+ def translate_model_params(self, *args, **kwargs):
+ return {}
+
+ def supported_model_params(self):
+ return set()
+
+
+def _make_chat(chunks: Sequence[Optional[Content]]) -> Chat:
+ provider = FakeProvider(chunks)
+ return Chat(provider=provider)
+
+
+class TestStreamThinkingText:
+ """Tests for content='text' mode — tags should be yielded as string chunks."""
+
+ def test_thinking_then_text(self):
+ """Streaming thinking → text produces proper tag boundaries."""
+ chunks = [
+ ContentThinking._as_chunk("step 1 "),
+ ContentThinking._as_chunk("step 2"),
+ ContentText.model_construct(text="Hello world"),
+ ]
+ chat = _make_chat(chunks)
+ result = list(chat.stream("test"))
+ combined = "".join(result)
+ assert combined == "\nstep 1 step 2\n\n\nHello world"
+
+ def test_thinking_only(self):
+ """If stream ends during thinking, close tag is still emitted."""
+ chunks = [
+ ContentThinking._as_chunk("reasoning here"),
+ ]
+ chat = _make_chat(chunks)
+ result = list(chat.stream("test"))
+ combined = "".join(result)
+ assert combined == "\nreasoning here\n\n\n"
+
+ def test_text_only(self):
+ """No thinking chunks means no tags emitted."""
+ chunks = [
+ ContentText.model_construct(text="Just text"),
+ ]
+ chat = _make_chat(chunks)
+ result = list(chat.stream("test"))
+ combined = "".join(result)
+ assert combined == "Just text"
+
+ def test_tag_chunks_are_separate(self):
+ """Opening and closing tags are yielded as separate chunks."""
+ chunks = [
+ ContentThinking._as_chunk("thought"),
+ ContentText.model_construct(text="answer"),
+ ]
+ chat = _make_chat(chunks)
+ result = list(chat.stream("test"))
+ assert result[0] == "\n"
+ assert result[1] == "thought"
+ assert result[2] == "\n\n\n"
+ assert result[3] == "answer"
+
+
+class TestStreamThinkingAll:
+ """Tests for content='all' mode — ContentThinking objects AND tag boundary strings yielded."""
+
+ def test_thinking_then_text(self):
+ """content='all' yields tag boundaries AND ContentThinking objects."""
+ chunks = [
+ ContentThinking._as_chunk("step 1 "),
+ ContentThinking._as_chunk("step 2"),
+ ContentText.model_construct(text="Hello"),
+ ]
+ chat = _make_chat(chunks)
+ result = list(chat.stream("test", content="all"))
+
+ thinking_chunks = [x for x in result if isinstance(x, ContentThinking)]
+ str_chunks = [x for x in result if isinstance(x, str)]
+
+ assert len(thinking_chunks) == 2
+ assert thinking_chunks[0].thinking == "step 1 "
+ assert thinking_chunks[1].thinking == "step 2"
+ assert "\n" in str_chunks
+ assert "\n\n\n" in str_chunks
+ assert "Hello" in str_chunks
+
+ def test_tag_boundaries_yielded(self):
+ """content='all' mode SHOULD yield tag boundary strings."""
+ chunks = [
+ ContentThinking._as_chunk("thought"),
+ ContentText.model_construct(text="answer"),
+ ]
+ chat = _make_chat(chunks)
+ result = list(chat.stream("test", content="all"))
+
+ str_chunks = [x for x in result if isinstance(x, str)]
+ assert "\n" in str_chunks
+ assert "\n\n\n" in str_chunks
+
+ def test_order_of_chunks(self):
+ """content='all' mode: open tag, ContentThinking objects, close tag, then text."""
+ chunks = [
+ ContentThinking._as_chunk("thought"),
+ ContentText.model_construct(text="answer"),
+ ]
+ chat = _make_chat(chunks)
+ result = list(chat.stream("test", content="all"))
+
+ assert result[0] == "\n"
+ assert isinstance(result[1], ContentThinking)
+ assert result[1].thinking == "thought"
+ assert result[2] == "\n\n\n"
+ assert result[3] == "answer"
+
+ def test_str_on_chunk_has_no_tags(self):
+ """Calling str() on yielded ContentThinking chunks should not wrap in tags."""
+ chunks = [
+ ContentThinking._as_chunk("thought"),
+ ContentText.model_construct(text="answer"),
+ ]
+ chat = _make_chat(chunks)
+ result = list(chat.stream("test", content="all"))
+
+ thinking_chunks = [x for x in result if isinstance(x, ContentThinking)]
+ assert len(thinking_chunks) == 1
+ assert str(thinking_chunks[0]) == "thought"
+
+
+@pytest.mark.asyncio
+class TestStreamThinkingAsync:
+ """Tests for async streaming with thinking boundaries."""
+
+ async def test_thinking_then_text_async(self):
+ """Async streaming thinking → text produces proper tag boundaries."""
+ chunks = [
+ ContentThinking._as_chunk("async thought "),
+ ContentThinking._as_chunk("more"),
+ ContentText.model_construct(text="response"),
+ ]
+ chat = _make_chat(chunks)
+ result = [chunk async for chunk in await chat.stream_async("test")]
+ combined = "".join(result)
+ assert combined == "\nasync thought more\n\n\nresponse"
+
+ async def test_thinking_only_async(self):
+ """Async: close tag emitted even if stream ends during thinking."""
+ chunks = [
+ ContentThinking._as_chunk("reasoning"),
+ ]
+ chat = _make_chat(chunks)
+ result = [chunk async for chunk in await chat.stream_async("test")]
+ combined = "".join(result)
+ assert combined == "\nreasoning\n\n\n"
+
+ async def test_content_all_async(self):
+ """Async content='all' yields tag boundaries AND ContentThinking objects."""
+ chunks = [
+ ContentThinking._as_chunk("thought"),
+ ContentText.model_construct(text="answer"),
+ ]
+ chat = _make_chat(chunks)
+ result = [chunk async for chunk in await chat.stream_async("test", content="all")]
+
+ thinking_chunks = [x for x in result if isinstance(x, ContentThinking)]
+ str_chunks = [x for x in result if isinstance(x, str)]
+
+ assert len(thinking_chunks) == 1
+ assert thinking_chunks[0].thinking == "thought"
+ assert "\n" in str_chunks
+ assert "\n\n\n" in str_chunks
+ assert "answer" in str_chunks
+ assert result[0] == "\n"
+ assert result[2] == "\n\n\n"