diff --git a/CHANGELOG.md b/CHANGELOG.md
index cc32d81d..518d6af8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Improvements
-* `.stream()` and `.stream_async()` now emit `` / `` tag boundaries around thinking content in all stream modes. With `content="text"`, concatenating all chunks produces well-formed output with thinking delimited by a single tag pair. With `content="all"`, tag boundary strings are yielded alongside typed `ContentThinking` objects, so downstream consumers can detect thinking boundaries without type inspection. (#294, #297)
+* `.stream()` and `.stream_async()` now handle thinking content differently by mode. With `content="text"`, thinking is suppressed entirely. With `content="all"`, thinking fragments are yielded as `ContentThinkingDelta` objects with a `phase` property (`"start"`, `"body"`, or `"end"`) that communicates block boundaries to downstream consumers without injecting synthetic strings into the stream. (#299, #297, #294)
* Updated default models across all providers to current generation: (#292)
* Anthropic: `claude-sonnet-4-6`
* Bedrock: `us.anthropic.claude-sonnet-4-6`
diff --git a/chatlas/_chat.py b/chatlas/_chat.py
index 6b1a2705..9beafd65 100644
--- a/chatlas/_chat.py
+++ b/chatlas/_chat.py
@@ -35,6 +35,7 @@
ContentJson,
ContentText,
ContentThinking,
+ ContentThinkingDelta,
ContentToolRequest,
ContentToolResult,
ToolInfo,
@@ -1157,7 +1158,7 @@ def stream(
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
) -> Generator[
- str | ContentThinking | ContentToolRequest | ContentToolResult, None, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None
]: ...
def stream(
@@ -1168,7 +1169,7 @@ def stream(
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
) -> Generator[
- str | ContentThinking | ContentToolRequest | ContentToolResult, None, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None
]:
"""
Generate a response from the chat in a streaming fashion.
@@ -1233,7 +1234,7 @@ class Person(BaseModel):
)
def wrapper() -> Generator[
- str | ContentThinking | ContentToolRequest | ContentToolResult, None, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None
]:
with display:
for chunk in generator:
@@ -1260,7 +1261,7 @@ async def stream_async(
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
) -> AsyncGenerator[
- str | ContentThinking | ContentToolRequest | ContentToolResult, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None
]: ...
async def stream_async(
@@ -1271,7 +1272,7 @@ async def stream_async(
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
) -> AsyncGenerator[
- str | ContentThinking | ContentToolRequest | ContentToolResult, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None
]:
"""
Generate a response from the chat in a streaming fashion asynchronously.
@@ -1332,7 +1333,7 @@ class Person(BaseModel):
display = self._markdown_display(echo=echo)
async def wrapper() -> AsyncGenerator[
- str | ContentThinking | ContentToolRequest | ContentToolResult, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None
]:
with display:
async for chunk in self._chat_impl_async(
@@ -2494,7 +2495,7 @@ def _chat_impl(
kwargs: Optional[SubmitInputArgsT] = None,
data_model: Optional[type[BaseModel]] = None,
) -> Generator[
- str | ContentThinking | ContentToolRequest | ContentToolResult, None, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None
]: ...
def _chat_impl(
@@ -2564,7 +2565,7 @@ def _chat_impl_async(
kwargs: Optional[SubmitInputArgsT] = None,
data_model: Optional[type[BaseModel]] = None,
) -> AsyncGenerator[
- str | ContentThinking | ContentToolRequest | ContentToolResult, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None
]: ...
async def _chat_impl_async(
@@ -2676,28 +2677,33 @@ def emit(text: str | Content):
for chunk in response:
content = self.provider.stream_content(chunk)
if content is not None:
- text = content_text(content)
- if text:
- is_thinking = isinstance(content, ContentThinking)
- if is_thinking and not inside_thinking:
- emit("\n")
- yield "\n"
- inside_thinking = True
- elif not is_thinking and inside_thinking:
- emit("\n\n\n")
- yield "\n\n\n"
- inside_thinking = False
-
- emit(text)
- if content_mode == "all" and is_thinking:
+ if is_thinking_delta(content) and not inside_thinking:
+ content = ContentThinkingDelta(
+ thinking=content.thinking, phase="start"
+ )
+ emit("\n")
+ inside_thinking = True
+ elif not is_thinking_delta(content) and inside_thinking:
+ emit("\n\n\n")
+ if content_mode == "all":
+ yield ContentThinkingDelta(thinking="", phase="end")
+ inside_thinking = False
+
+ if is_thinking_delta(content):
+ emit(content.thinking)
+ if content_mode == "all":
yield content
- else:
+ else:
+ text = content_text(content)
+ if text:
+ emit(text)
yield text
result = self.provider.stream_merge_chunks(result, chunk)
if inside_thinking:
emit("\n\n\n")
- yield "\n\n\n"
+ if content_mode == "all":
+ yield ContentThinkingDelta(thinking="", phase="end")
turn = self.provider.stream_turn(
result,
@@ -2796,28 +2802,33 @@ def emit(text: str | Content):
async for chunk in response:
content = self.provider.stream_content(chunk)
if content is not None:
- text = content_text(content)
- if text:
- is_thinking = isinstance(content, ContentThinking)
- if is_thinking and not inside_thinking:
- emit("\n")
- yield "\n"
- inside_thinking = True
- elif not is_thinking and inside_thinking:
- emit("\n\n\n")
- yield "\n\n\n"
- inside_thinking = False
-
- emit(text)
- if content_mode == "all" and is_thinking:
+ if is_thinking_delta(content) and not inside_thinking:
+ content = ContentThinkingDelta(
+ thinking=content.thinking, phase="start"
+ )
+ emit("\n")
+ inside_thinking = True
+ elif not is_thinking_delta(content) and inside_thinking:
+ emit("\n\n\n")
+ if content_mode == "all":
+ yield ContentThinkingDelta(thinking="", phase="end")
+ inside_thinking = False
+
+ if is_thinking_delta(content):
+ emit(content.thinking)
+ if content_mode == "all":
yield content
- else:
+ else:
+ text = content_text(content)
+ if text:
+ emit(text)
yield text
result = self.provider.stream_merge_chunks(result, chunk)
if inside_thinking:
emit("\n\n\n")
- yield "\n\n\n"
+ if content_mode == "all":
+ yield ContentThinkingDelta(thinking="", phase="end")
turn = self.provider.stream_turn(
result,
@@ -3292,8 +3303,14 @@ class ToolFailureWarning(RuntimeWarning):
warnings.simplefilter("always", ToolFailureWarning)
+def is_thinking_delta(content: Content) -> TypeGuard[ContentThinkingDelta]:
+ return isinstance(content, ContentThinkingDelta)
+
+
def content_text(content: Content) -> str:
"""Extract displayable text from a Content object."""
+ if isinstance(content, ContentThinkingDelta):
+ return content.thinking
if isinstance(content, ContentThinking):
return content.thinking
if isinstance(content, ContentText):
diff --git a/chatlas/_content.py b/chatlas/_content.py
index 1c88acd6..296bdda6 100644
--- a/chatlas/_content.py
+++ b/chatlas/_content.py
@@ -10,7 +10,6 @@
BaseModel,
ConfigDict,
Field,
- PrivateAttr,
field_serializer,
field_validator,
)
@@ -139,6 +138,7 @@ def from_tool(cls, tool: "Tool | ToolBuiltIn") -> "ToolInfo":
"json",
"pdf",
"thinking",
+ "thinking_delta",
"web_search_request",
"web_search_results",
"web_fetch_request",
@@ -631,20 +631,11 @@ class ContentThinking(Content):
thinking: str
extra: Optional[dict[str, Any]] = None
- _complete: bool = PrivateAttr(default=True)
content_type: ContentTypeEnum = "thinking"
- @classmethod
- def _as_chunk(cls, thinking: str) -> "ContentThinking":
- obj = cls.model_construct(thinking=thinking, content_type="thinking")
- obj._complete = False
- return obj
-
def __str__(self):
- if self._complete:
- return f"\n{self.thinking}\n\n"
- return self.thinking
+ return f"\n{self.thinking}\n\n"
def _repr_html_(self):
return str(self.tagify())
@@ -663,6 +654,30 @@ def tagify(self):
return HTML(html)
+class ContentThinkingDelta(Content):
+ """
+ A streaming fragment of thinking/reasoning content.
+
+ Emitted during streaming to represent a chunk of the model's thinking.
+ The ``phase`` attribute communicates block boundaries to downstream consumers.
+
+ Parameters
+ ----------
+ thinking
+ The thinking/reasoning text fragment.
+ phase
+ The phase of the thinking delta: ``"start"``, ``"body"``, or ``"end"``.
+ """
+
+ thinking: str
+ phase: Literal["start", "body", "end"] = "body"
+
+ content_type: ContentTypeEnum = "thinking_delta"
+
+ def __str__(self):
+ return self.thinking
+
+
class ContentToolRequestSearch(Content):
"""
A web search request from the model.
diff --git a/chatlas/_provider_anthropic.py b/chatlas/_provider_anthropic.py
index f7db8e63..60ff9fb1 100644
--- a/chatlas/_provider_anthropic.py
+++ b/chatlas/_provider_anthropic.py
@@ -26,6 +26,7 @@
ContentPDF,
ContentText,
ContentThinking,
+ ContentThinkingDelta,
ContentToolRequest,
ContentToolRequestFetch,
ContentToolRequestSearch,
@@ -468,7 +469,7 @@ def stream_content(self, chunk) -> Optional[Content]:
if chunk.delta.type == "text_delta":
return ContentText.model_construct(text=chunk.delta.text)
if chunk.delta.type == "thinking_delta":
- return ContentThinking._as_chunk(chunk.delta.thinking)
+ return ContentThinkingDelta(thinking=chunk.delta.thinking)
return None
def stream_merge_chunks(self, completion, chunk):
diff --git a/chatlas/_provider_google.py b/chatlas/_provider_google.py
index f74aa64f..585ab57b 100644
--- a/chatlas/_provider_google.py
+++ b/chatlas/_provider_google.py
@@ -15,6 +15,7 @@
ContentPDF,
ContentText,
ContentThinking,
+ ContentThinkingDelta,
ContentToolRequest,
ContentToolResult,
)
@@ -377,7 +378,7 @@ def stream_content(self, chunk) -> Optional[Content]:
if text is None:
return None
if getattr(part, "thought", None):
- return ContentThinking._as_chunk(text)
+ return ContentThinkingDelta(thinking=text)
return ContentText.model_construct(text=text)
def stream_merge_chunks(self, completion, chunk):
diff --git a/chatlas/_provider_openai.py b/chatlas/_provider_openai.py
index 0ff81b4b..e097f501 100644
--- a/chatlas/_provider_openai.py
+++ b/chatlas/_provider_openai.py
@@ -17,6 +17,7 @@
ContentPDF,
ContentText,
ContentThinking,
+ ContentThinkingDelta,
ContentToolRequest,
ContentToolRequestSearch,
ContentToolResult,
@@ -298,7 +299,7 @@ def stream_content(self, chunk) -> Optional[Content]:
return ContentText.model_construct(text=chunk.delta)
if chunk.type == "response.reasoning_summary_text.delta":
# https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/delta
- return ContentThinking._as_chunk(chunk.delta)
+ return ContentThinkingDelta(thinking=chunk.delta)
if chunk.type == "response.reasoning_summary_text.done":
# The thinking→text transition in _submit_turns already emits
# "\n\n\n" which provides the visual separator.
diff --git a/chatlas/types/__init__.py b/chatlas/types/__init__.py
index 782b7e11..4cb2cf4f 100644
--- a/chatlas/types/__init__.py
+++ b/chatlas/types/__init__.py
@@ -10,6 +10,8 @@
ContentImageRemote,
ContentJson,
ContentText,
+ ContentThinking,
+ ContentThinkingDelta,
ContentToolRequest,
ContentToolRequestFetch,
ContentToolRequestSearch,
@@ -32,6 +34,8 @@
"ContentImageRemote",
"ContentJson",
"ContentText",
+ "ContentThinking",
+ "ContentThinkingDelta",
"ContentToolRequest",
"ContentToolResult",
"ContentToolRequestFetch",
diff --git a/docs/_quarto.yml b/docs/_quarto.yml
index 3dcf40da..e77eff7e 100644
--- a/docs/_quarto.yml
+++ b/docs/_quarto.yml
@@ -202,6 +202,8 @@ quartodoc:
- types.ContentImageRemote
- types.ContentJson
- types.ContentText
+ - types.ContentThinking
+ - types.ContentThinkingDelta
- types.ContentToolRequest
- types.ContentToolResult
- types.ContentToolRequestSearch
diff --git a/tests/test_stream_thinking.py b/tests/test_stream_thinking.py
index a5ad76d0..6574dbbd 100644
--- a/tests/test_stream_thinking.py
+++ b/tests/test_stream_thinking.py
@@ -1,11 +1,11 @@
-"""Tests for streaming thinking tag boundary emission."""
+"""Tests for streaming thinking with ContentThinkingDelta."""
from collections.abc import Sequence
from typing import Optional
import pytest
from chatlas import Chat
-from chatlas._content import Content, ContentText, ContentThinking
+from chatlas._content import Content, ContentText, ContentThinkingDelta
from chatlas._provider import Provider
from chatlas._turn import AssistantTurn
@@ -83,32 +83,28 @@ def _make_chat(chunks: Sequence[Optional[Content]]) -> Chat:
class TestStreamThinkingText:
- """Tests for content='text' mode — tags should be yielded as string chunks."""
+ """Tests for content='text' mode — thinking is suppressed."""
- def test_thinking_then_text(self):
- """Streaming thinking → text produces proper tag boundaries."""
+ def test_thinking_suppressed(self):
chunks = [
- ContentThinking._as_chunk("step 1 "),
- ContentThinking._as_chunk("step 2"),
+ ContentThinkingDelta(thinking="step 1 "),
+ ContentThinkingDelta(thinking="step 2"),
ContentText.model_construct(text="Hello world"),
]
chat = _make_chat(chunks)
result = list(chat.stream("test"))
combined = "".join(result)
- assert combined == "\nstep 1 step 2\n\n\nHello world"
+ assert combined == "Hello world"
def test_thinking_only(self):
- """If stream ends during thinking, close tag is still emitted."""
chunks = [
- ContentThinking._as_chunk("reasoning here"),
+ ContentThinkingDelta(thinking="reasoning here"),
]
chat = _make_chat(chunks)
result = list(chat.stream("test"))
- combined = "".join(result)
- assert combined == "\nreasoning here\n\n\n"
+ assert result == []
def test_text_only(self):
- """No thinking chunks means no tags emitted."""
chunks = [
ContentText.model_construct(text="Just text"),
]
@@ -117,127 +113,107 @@ def test_text_only(self):
combined = "".join(result)
assert combined == "Just text"
- def test_tag_chunks_are_separate(self):
- """Opening and closing tags are yielded as separate chunks."""
- chunks = [
- ContentThinking._as_chunk("thought"),
- ContentText.model_construct(text="answer"),
- ]
- chat = _make_chat(chunks)
- result = list(chat.stream("test"))
- assert result[0] == "\n"
- assert result[1] == "thought"
- assert result[2] == "\n\n\n"
- assert result[3] == "answer"
-
class TestStreamThinkingAll:
- """Tests for content='all' mode — ContentThinking objects AND tag boundary strings yielded."""
+ """Tests for content='all' mode — ContentThinkingDelta objects yielded with phase."""
def test_thinking_then_text(self):
- """content='all' yields tag boundaries AND ContentThinking objects."""
chunks = [
- ContentThinking._as_chunk("step 1 "),
- ContentThinking._as_chunk("step 2"),
+ ContentThinkingDelta(thinking="step 1 "),
+ ContentThinkingDelta(thinking="step 2"),
ContentText.model_construct(text="Hello"),
]
chat = _make_chat(chunks)
result = list(chat.stream("test", content="all"))
- thinking_chunks = [x for x in result if isinstance(x, ContentThinking)]
- str_chunks = [x for x in result if isinstance(x, str)]
-
- assert len(thinking_chunks) == 2
+ thinking_chunks = [x for x in result if isinstance(x, ContentThinkingDelta)]
+ assert len(thinking_chunks) == 3
assert thinking_chunks[0].thinking == "step 1 "
+ assert thinking_chunks[0].phase == "start"
assert thinking_chunks[1].thinking == "step 2"
- assert "\n" in str_chunks
- assert "\n\n\n" in str_chunks
- assert "Hello" in str_chunks
+ assert thinking_chunks[1].phase == "body"
+ assert thinking_chunks[2].thinking == ""
+ assert thinking_chunks[2].phase == "end"
- def test_tag_boundaries_yielded(self):
- """content='all' mode SHOULD yield tag boundary strings."""
+ def test_phase_sequence(self):
chunks = [
- ContentThinking._as_chunk("thought"),
+ ContentThinkingDelta(thinking="thought"),
ContentText.model_construct(text="answer"),
]
chat = _make_chat(chunks)
result = list(chat.stream("test", content="all"))
- str_chunks = [x for x in result if isinstance(x, str)]
- assert "\n" in str_chunks
- assert "\n\n\n" in str_chunks
+ assert isinstance(result[0], ContentThinkingDelta)
+ assert result[0].phase == "start"
+ assert result[0].thinking == "thought"
+ assert isinstance(result[1], ContentThinkingDelta)
+ assert result[1].phase == "end"
+ assert result[1].thinking == ""
+ assert result[2] == "answer"
- def test_order_of_chunks(self):
- """content='all' mode: open tag, ContentThinking objects, close tag, then text."""
+ def test_thinking_only(self):
chunks = [
- ContentThinking._as_chunk("thought"),
- ContentText.model_construct(text="answer"),
+ ContentThinkingDelta(thinking="reasoning"),
]
chat = _make_chat(chunks)
result = list(chat.stream("test", content="all"))
- assert result[0] == "\n"
- assert isinstance(result[1], ContentThinking)
- assert result[1].thinking == "thought"
- assert result[2] == "\n\n\n"
- assert result[3] == "answer"
+ thinking_chunks = [x for x in result if isinstance(x, ContentThinkingDelta)]
+ assert len(thinking_chunks) == 2
+ assert thinking_chunks[0].phase == "start"
+ assert thinking_chunks[1].phase == "end"
- def test_str_on_chunk_has_no_tags(self):
- """Calling str() on yielded ContentThinking chunks should not wrap in tags."""
+ def test_str_on_delta_has_no_tags(self):
chunks = [
- ContentThinking._as_chunk("thought"),
+ ContentThinkingDelta(thinking="thought"),
ContentText.model_construct(text="answer"),
]
chat = _make_chat(chunks)
result = list(chat.stream("test", content="all"))
- thinking_chunks = [x for x in result if isinstance(x, ContentThinking)]
- assert len(thinking_chunks) == 1
- assert str(thinking_chunks[0]) == "thought"
+ thinking_chunks = [x for x in result if isinstance(x, ContentThinkingDelta)]
+ for chunk in thinking_chunks:
+ assert "" not in str(chunk)
+ assert "" not in str(chunk)
@pytest.mark.asyncio
class TestStreamThinkingAsync:
- """Tests for async streaming with thinking boundaries."""
+ """Tests for async streaming with thinking."""
- async def test_thinking_then_text_async(self):
- """Async streaming thinking → text produces proper tag boundaries."""
+ async def test_thinking_suppressed_text_async(self):
chunks = [
- ContentThinking._as_chunk("async thought "),
- ContentThinking._as_chunk("more"),
+ ContentThinkingDelta(thinking="async thought "),
+ ContentThinkingDelta(thinking="more"),
ContentText.model_construct(text="response"),
]
chat = _make_chat(chunks)
result = [chunk async for chunk in await chat.stream_async("test")]
combined = "".join(result)
- assert combined == "\nasync thought more\n\n\nresponse"
+ assert combined == "response"
async def test_thinking_only_async(self):
- """Async: close tag emitted even if stream ends during thinking."""
chunks = [
- ContentThinking._as_chunk("reasoning"),
+ ContentThinkingDelta(thinking="reasoning"),
]
chat = _make_chat(chunks)
result = [chunk async for chunk in await chat.stream_async("test")]
- combined = "".join(result)
- assert combined == "\nreasoning\n\n\n"
+ assert result == []
async def test_content_all_async(self):
- """Async content='all' yields tag boundaries AND ContentThinking objects."""
chunks = [
- ContentThinking._as_chunk("thought"),
+ ContentThinkingDelta(thinking="thought"),
ContentText.model_construct(text="answer"),
]
chat = _make_chat(chunks)
- result = [chunk async for chunk in await chat.stream_async("test", content="all")]
-
- thinking_chunks = [x for x in result if isinstance(x, ContentThinking)]
- str_chunks = [x for x in result if isinstance(x, str)]
+ result = [
+ chunk
+ async for chunk in await chat.stream_async("test", content="all")
+ ]
- assert len(thinking_chunks) == 1
+ thinking_chunks = [x for x in result if isinstance(x, ContentThinkingDelta)]
+ assert len(thinking_chunks) == 2
assert thinking_chunks[0].thinking == "thought"
- assert "\n" in str_chunks
- assert "\n\n\n" in str_chunks
- assert "answer" in str_chunks
- assert result[0] == "\n"
- assert result[2] == "\n\n\n"
+ assert thinking_chunks[0].phase == "start"
+ assert thinking_chunks[1].phase == "end"
+ assert "answer" in [x for x in result if isinstance(x, str)]