Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Improvements

* `.stream()` and `.stream_async()` now emit `<thinking>` / `</thinking>` tag boundaries around thinking content. With `content="text"`, concatenating all chunks produces well-formed output with thinking delimited by a single tag pair. With `content="all"`, behavior is unchanged — typed `ContentThinking` objects are yielded without tag strings. (#294)
* Updated default models across all providers to current generation: (#292)
* Anthropic: `claude-sonnet-4-6`
* Bedrock: `us.anthropic.claude-sonnet-4-6`
Expand Down
40 changes: 34 additions & 6 deletions chatlas/_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2671,20 +2671,34 @@ def emit(text: str | Content):
)

result = None
inside_thinking = False

for chunk in response:
content = self.provider.stream_content(chunk)
if content is not None:
text = content_text(content)
if text:
is_thinking = isinstance(content, ContentThinking)
if is_thinking and not inside_thinking:
emit("<thinking>\n")
yield "<thinking>\n"
inside_thinking = True
elif not is_thinking and inside_thinking:
emit("\n</thinking>\n\n")
yield "\n</thinking>\n\n"
inside_thinking = False

emit(text)
if content_mode == "all" and isinstance(
content, ContentThinking
):
if content_mode == "all" and is_thinking:
yield content
else:
yield text
result = self.provider.stream_merge_chunks(result, chunk)

if inside_thinking:
emit("\n</thinking>\n\n")
yield "\n</thinking>\n\n"

turn = self.provider.stream_turn(
result,
has_data_model=data_model is not None,
Expand Down Expand Up @@ -2777,20 +2791,34 @@ def emit(text: str | Content):
)

result = None
inside_thinking = False

async for chunk in response:
content = self.provider.stream_content(chunk)
if content is not None:
text = content_text(content)
if text:
is_thinking = isinstance(content, ContentThinking)
if is_thinking and not inside_thinking:
emit("<thinking>\n")
yield "<thinking>\n"
inside_thinking = True
elif not is_thinking and inside_thinking:
emit("\n</thinking>\n\n")
yield "\n</thinking>\n\n"
inside_thinking = False

emit(text)
if content_mode == "all" and isinstance(
content, ContentThinking
):
if content_mode == "all" and is_thinking:
yield content
else:
yield text
result = self.provider.stream_merge_chunks(result, chunk)

if inside_thinking:
emit("\n</thinking>\n\n")
yield "\n</thinking>\n\n"

turn = self.provider.stream_turn(
result,
has_data_model=data_model is not None,
Expand Down
20 changes: 18 additions & 2 deletions chatlas/_content.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast

import orjson
from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator
from pydantic import (
BaseModel,
ConfigDict,
Field,
PrivateAttr,
field_serializer,
field_validator,
)

from ._typing_extensions import TypedDict

Expand Down Expand Up @@ -624,11 +631,20 @@ class ContentThinking(Content):

thinking: str
extra: Optional[dict[str, Any]] = None
_complete: bool = PrivateAttr(default=True)

content_type: ContentTypeEnum = "thinking"

    @classmethod
    def _as_chunk(cls, thinking: str) -> "ContentThinking":
        """Build a streaming-delta ``ContentThinking`` whose ``str()`` is untagged.

        Uses ``model_construct`` to bypass validation, then clears the
        private ``_complete`` flag so ``__str__`` returns the raw delta text
        without ``<thinking>`` tags — the stream loop emits a single tag
        pair around the whole run of deltas itself.
        """
        obj = cls.model_construct(thinking=thinking, content_type="thinking")
        obj._complete = False  # mark as a partial streamed delta
        return obj

    def __str__(self):
        # Complete thinking blocks render wrapped in a <thinking> tag pair;
        # streamed deltas (created via _as_chunk, which sets _complete=False)
        # render as bare text so the streaming loop can add one tag pair
        # around the entire run of deltas.
        if self._complete:
            return f"<thinking>\n{self.thinking}\n</thinking>\n"
        return self.thinking

def _repr_html_(self):
return str(self.tagify())
Expand Down
2 changes: 1 addition & 1 deletion chatlas/_provider_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ def stream_content(self, chunk) -> Optional[Content]:
if chunk.delta.type == "text_delta":
return ContentText.model_construct(text=chunk.delta.text)
if chunk.delta.type == "thinking_delta":
return ContentThinking(thinking=chunk.delta.thinking)
return ContentThinking._as_chunk(chunk.delta.thinking)
return None

def stream_merge_chunks(self, completion, chunk):
Expand Down
2 changes: 1 addition & 1 deletion chatlas/_provider_google.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ def stream_content(self, chunk) -> Optional[Content]:
if text is None:
return None
if getattr(part, "thought", None):
return ContentThinking(thinking=text)
return ContentThinking._as_chunk(text)
return ContentText.model_construct(text=text)

def stream_merge_chunks(self, completion, chunk):
Expand Down
8 changes: 4 additions & 4 deletions chatlas/_provider_openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,11 @@ def stream_content(self, chunk) -> Optional[Content]:
return ContentText.model_construct(text=chunk.delta)
if chunk.type == "response.reasoning_summary_text.delta":
# https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/delta
return ContentThinking(thinking=chunk.delta)
return ContentThinking._as_chunk(chunk.delta)
if chunk.type == "response.reasoning_summary_text.done":
# Separator between reasoning summary and response text
# https://platform.openai.com/docs/api-reference/responses-streaming/response/reasoning_summary_text/done
return ContentText.model_construct(text="\n\n")
# The thinking→text transition in _submit_turns already emits
# "\n</thinking>\n\n" which provides the visual separator.
return None
return None

def stream_merge_chunks(self, completion, chunk):
Expand Down
243 changes: 243 additions & 0 deletions tests/test_stream_thinking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
"""Tests for streaming thinking tag boundary emission."""

from collections.abc import Sequence
from typing import Optional

import pytest
from chatlas import Chat
from chatlas._content import Content, ContentText, ContentThinking
from chatlas._provider import Provider
from chatlas._turn import AssistantTurn


class FakeChunk:
    """A fake chunk that carries a Content object.

    Stands in for a provider's raw streaming chunk; ``FakeProvider.stream_content``
    simply unwraps the ``content`` attribute.
    """

    def __init__(self, content: Optional[Content]):
        # May be None, mirroring real providers that emit chunks without
        # renderable content.
        self.content = content


class FakeProvider(Provider):
    """Stub provider that replays a predetermined sequence of content chunks.

    Only the streaming entry points are functional; every other Provider
    hook is a minimal stub so a ``Chat`` can be driven without a backend.
    """

    def __init__(self, chunks: Sequence[Optional[Content]]):
        super().__init__(name="fake", model="fake-model")
        self._chunks = chunks

    def list_models(self):
        return []

    def chat_perform(self, *, stream, turns, tools, data_model, kwargs):
        # Only streaming is supported by this fake.
        if not stream:
            raise NotImplementedError
        return iter(FakeChunk(c) for c in self._chunks)

    async def chat_perform_async(self, *, stream, turns, tools, data_model, kwargs):
        # Only streaming is supported by this fake.
        if not stream:
            raise NotImplementedError

        async def _replay():
            for c in self._chunks:
                yield FakeChunk(c)

        return _replay()

    def stream_content(self, chunk) -> Optional[Content]:
        # Unwrap the Content carried by the FakeChunk.
        return chunk.content

    def stream_merge_chunks(self, completion, chunk):
        # Accumulation is irrelevant for these tests; keep a truthy dict.
        return completion or {}

    def stream_turn(self, completion, has_data_model):
        # Fixed terminal turn; tests only inspect the yielded chunks.
        return AssistantTurn(
            contents=[ContentText.model_construct(text="response")],
            tokens=None,
            completion=None,
        )

    def value_turn(self, completion, has_data_model):
        raise NotImplementedError

    def value_tokens(self, completion):
        return None

    def value_cost(self, completion, tokens=None):
        return None

    def token_count(self, *args, **kwargs):
        return 0

    async def token_count_async(self, *args, **kwargs):
        return 0

    def translate_model_params(self, *args, **kwargs):
        return {}

    def supported_model_params(self):
        return set()


def _make_chat(chunks: Sequence[Optional[Content]]) -> Chat:
    """Build a Chat wired to a FakeProvider that replays *chunks*."""
    return Chat(provider=FakeProvider(chunks))


class TestStreamThinkingText:
    """Tests for content='text' mode — tags should be yielded as string chunks."""

    def test_thinking_then_text(self):
        """Streaming thinking → text produces proper tag boundaries."""
        chat = _make_chat(
            [
                ContentThinking._as_chunk("step 1 "),
                ContentThinking._as_chunk("step 2"),
                ContentText.model_construct(text="Hello world"),
            ]
        )
        streamed = "".join(chat.stream("test"))
        assert streamed == "<thinking>\nstep 1 step 2\n</thinking>\n\nHello world"

    def test_thinking_only(self):
        """If stream ends during thinking, close tag is still emitted."""
        chat = _make_chat([ContentThinking._as_chunk("reasoning here")])
        streamed = "".join(chat.stream("test"))
        assert streamed == "<thinking>\nreasoning here\n</thinking>\n\n"

    def test_text_only(self):
        """No thinking chunks means no tags emitted."""
        chat = _make_chat([ContentText.model_construct(text="Just text")])
        streamed = "".join(chat.stream("test"))
        assert streamed == "Just text"

    def test_tag_chunks_are_separate(self):
        """Opening and closing tags are yielded as separate chunks."""
        chat = _make_chat(
            [
                ContentThinking._as_chunk("thought"),
                ContentText.model_construct(text="answer"),
            ]
        )
        pieces = list(chat.stream("test"))
        assert pieces[:4] == [
            "<thinking>\n",
            "thought",
            "\n</thinking>\n\n",
            "answer",
        ]


class TestStreamThinkingAll:
    """Tests for content='all' mode — ContentThinking objects AND tag boundary strings yielded."""

    def test_thinking_then_text(self):
        """content='all' yields tag boundaries AND ContentThinking objects."""
        chat = _make_chat(
            [
                ContentThinking._as_chunk("step 1 "),
                ContentThinking._as_chunk("step 2"),
                ContentText.model_construct(text="Hello"),
            ]
        )
        yielded = list(chat.stream("test", content="all"))

        typed = [c for c in yielded if isinstance(c, ContentThinking)]
        strings = [c for c in yielded if isinstance(c, str)]

        assert [t.thinking for t in typed] == ["step 1 ", "step 2"]
        assert "<thinking>\n" in strings
        assert "\n</thinking>\n\n" in strings
        assert "Hello" in strings

    def test_tag_boundaries_yielded(self):
        """content='all' mode SHOULD yield tag boundary strings."""
        chat = _make_chat(
            [
                ContentThinking._as_chunk("thought"),
                ContentText.model_construct(text="answer"),
            ]
        )
        yielded = list(chat.stream("test", content="all"))

        strings = [c for c in yielded if isinstance(c, str)]
        assert "<thinking>\n" in strings
        assert "\n</thinking>\n\n" in strings

    def test_order_of_chunks(self):
        """content='all' mode: open tag, ContentThinking objects, close tag, then text."""
        chat = _make_chat(
            [
                ContentThinking._as_chunk("thought"),
                ContentText.model_construct(text="answer"),
            ]
        )
        yielded = list(chat.stream("test", content="all"))

        assert yielded[0] == "<thinking>\n"
        assert isinstance(yielded[1], ContentThinking)
        assert yielded[1].thinking == "thought"
        assert yielded[2] == "\n</thinking>\n\n"
        assert yielded[3] == "answer"

    def test_str_on_chunk_has_no_tags(self):
        """Calling str() on yielded ContentThinking chunks should not wrap in tags."""
        chat = _make_chat(
            [
                ContentThinking._as_chunk("thought"),
                ContentText.model_construct(text="answer"),
            ]
        )
        yielded = list(chat.stream("test", content="all"))

        typed = [c for c in yielded if isinstance(c, ContentThinking)]
        assert len(typed) == 1
        assert str(typed[0]) == "thought"


@pytest.mark.asyncio
class TestStreamThinkingAsync:
    """Tests for async streaming with thinking boundaries."""

    async def test_thinking_then_text_async(self):
        """Async streaming thinking → text produces proper tag boundaries."""
        chat = _make_chat(
            [
                ContentThinking._as_chunk("async thought "),
                ContentThinking._as_chunk("more"),
                ContentText.model_construct(text="response"),
            ]
        )
        stream = await chat.stream_async("test")
        streamed = "".join([piece async for piece in stream])
        assert streamed == "<thinking>\nasync thought more\n</thinking>\n\nresponse"

    async def test_thinking_only_async(self):
        """Async: close tag emitted even if stream ends during thinking."""
        chat = _make_chat([ContentThinking._as_chunk("reasoning")])
        stream = await chat.stream_async("test")
        streamed = "".join([piece async for piece in stream])
        assert streamed == "<thinking>\nreasoning\n</thinking>\n\n"

    async def test_content_all_async(self):
        """Async content='all' yields tag boundaries AND ContentThinking objects."""
        chat = _make_chat(
            [
                ContentThinking._as_chunk("thought"),
                ContentText.model_construct(text="answer"),
            ]
        )
        stream = await chat.stream_async("test", content="all")
        yielded = [piece async for piece in stream]

        typed = [c for c in yielded if isinstance(c, ContentThinking)]
        strings = [c for c in yielded if isinstance(c, str)]

        assert len(typed) == 1
        assert typed[0].thinking == "thought"
        assert "<thinking>\n" in strings
        assert "\n</thinking>\n\n" in strings
        assert "answer" in strings
        assert yielded[0] == "<thinking>\n"
        assert yielded[2] == "\n</thinking>\n\n"