diff --git a/CHANGELOG.md b/CHANGELOG.md
index 87b67b78..1b9cfab8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### New features
* `ChatOpenAICompletions()` (and providers built on it like `ChatDeepSeek`, `ChatOpenRouter`, etc.) now extracts `reasoning_content` from model responses as `ContentThinking` objects. A new `preserve_thinking` parameter controls whether reasoning content is sent back to the API in multi-turn conversations; it defaults to `False` but is set to `True` for `ChatDeepSeek` (required for V4 tool-calling) and `ChatOpenRouter` (recommended for quality). (#295)
+* New `StreamController` class for cooperative stream cancellation. Pass a controller to `.stream()` or `.stream_async()` and call `controller.cancel()` to stop the stream cleanly (e.g., from a Shiny "stop generating" button). The partial response is preserved in conversation history. (#279)
+* When a stream is interrupted (closed early, cancelled, or errors), the accumulated content is now saved as a partial `AssistantTurn` so conversation state isn't lost. Partial turns display `[interrupted]` (or the cancellation reason) in the `Chat` repr and are excluded from token/cost accounting. (#279)
### Improvements
diff --git a/chatlas/__init__.py b/chatlas/__init__.py
index 880d0952..6f6073d5 100644
--- a/chatlas/__init__.py
+++ b/chatlas/__init__.py
@@ -34,6 +34,7 @@
from ._provider_perplexity import ChatPerplexity
from ._provider_portkey import ChatPortkey
from ._provider_snowflake import ChatSnowflake
+from ._stream_controller import StreamController
from ._tokens import token_usage
from ._tools import Tool, ToolBuiltIn, ToolRejectError
from ._tools_builtin import tool_web_fetch, tool_web_search
@@ -85,6 +86,7 @@
"interpolate",
"interpolate_file",
"Provider",
+ "StreamController",
"token_usage",
"Tool",
"ToolBuiltIn",
diff --git a/chatlas/_chat.py b/chatlas/_chat.py
index 9beafd65..abd90292 100644
--- a/chatlas/_chat.py
+++ b/chatlas/_chat.py
@@ -34,7 +34,6 @@
Content,
ContentJson,
ContentText,
- ContentThinking,
ContentThinkingDelta,
ContentToolRequest,
ContentToolResult,
@@ -50,9 +49,11 @@
from ._logging import log_tool_error
from ._mcp_manager import MCPSessionManager
from ._provider import ModelInfo, Provider, StandardModelParams, SubmitInputArgsT
+from ._stream_controller import StreamController
from ._tokens import tokens_log
from ._tools import Tool, ToolBuiltIn, ToolRejectError
from ._turn import AssistantTurn, SystemTurn, Turn, UserTurn, user_turn
+from ._turn_accumulator import TurnAccumulator
from ._typing_extensions import TypedDict, TypeGuard
from ._utils import MISSING, MISSING_TYPE, html_escape, wrap_async
@@ -400,6 +401,19 @@ def get_tokens(self) -> list[TokensDict]:
turns = self.get_turns(include_system_prompt=False)
+ # Exclude partial assistant turns and their preceding user turns
+ # (partial turns have no token data)
+ filtered: list[Turn] = []
+ i = 0
+ while i < len(turns):
+ next_turn = turns[i + 1] if i + 1 < len(turns) else None
+ if isinstance(next_turn, AssistantTurn) and next_turn.is_partial:
+ i += 2 # skip user + partial assistant pair
+ else:
+ filtered.append(turns[i])
+ i += 1
+ turns = filtered
+
if len(turns) == 0:
return []
@@ -525,7 +539,9 @@ def get_cost(
The cost of the chat, in USD.
"""
- assistant_turns = [t for t in self._turns if isinstance(t, AssistantTurn)]
+ assistant_turns = [
+ t for t in self._turns if isinstance(t, AssistantTurn) and not t.is_partial
+ ]
if len(assistant_turns) == 0:
return 0.0
@@ -1070,6 +1086,7 @@ def chat(
display = self._markdown_display(echo=echo)
+ controller = StreamController()
response = ChatResponse(
self._chat_impl(
turn,
@@ -1077,6 +1094,7 @@ def chat(
content="text",
stream=stream,
kwargs=kwargs,
+ controller=controller,
)
)
@@ -1123,6 +1141,7 @@ async def chat_async(
display = self._markdown_display(echo=echo)
+ controller = StreamController()
response = ChatResponseAsync(
self._chat_impl_async(
turn,
@@ -1130,6 +1149,7 @@ async def chat_async(
content="text",
stream=stream,
kwargs=kwargs,
+ controller=controller,
),
)
@@ -1147,6 +1167,7 @@ def stream(
echo: EchoOptions = "none",
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
+ controller: StreamController | None = None,
) -> Generator[str, None, None]: ...
@overload
@@ -1157,6 +1178,7 @@ def stream(
echo: EchoOptions = "none",
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
+ controller: StreamController | None = None,
) -> Generator[
str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None
]: ...
@@ -1168,6 +1190,7 @@ def stream(
echo: EchoOptions = "none",
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
+ controller: StreamController | None = None,
) -> Generator[
str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None
]:
@@ -1196,6 +1219,14 @@ def stream(
kwargs
Additional keyword arguments to pass to the method used for requesting
the response.
+ controller
+ A [](`~chatlas.StreamController`) for cooperative stream cancellation.
+ When provided, calling `controller.cancel()` stops the stream after the
+ current chunk and preserves the partial response in conversation history.
+ This is useful for building UIs (e.g., a "stop generating" button) where
+ you want to interrupt a response without losing what's been generated so
+ far. The same controller can be reused across multiple streams by calling
+ `controller.reset()` before starting a new stream.
Returns
-------
@@ -1224,6 +1255,8 @@ class Person(BaseModel):
display = self._markdown_display(echo=echo)
+ controller = _as_controller(controller)
+
generator = self._chat_impl(
turn,
stream=True,
@@ -1231,10 +1264,13 @@ class Person(BaseModel):
content=content,
kwargs=kwargs,
data_model=data_model,
+ controller=controller,
)
def wrapper() -> Generator[
- str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None
+ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult,
+ None,
+ None,
]:
with display:
for chunk in generator:
@@ -1250,6 +1286,7 @@ async def stream_async(
echo: EchoOptions = "none",
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
+ controller: StreamController | None = None,
) -> AsyncGenerator[str, None]: ...
@overload
@@ -1260,6 +1297,7 @@ async def stream_async(
echo: EchoOptions = "none",
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
+ controller: StreamController | None = None,
) -> AsyncGenerator[
str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None
]: ...
@@ -1271,6 +1309,7 @@ async def stream_async(
echo: EchoOptions = "none",
data_model: Optional[type[BaseModel]] = None,
kwargs: Optional[SubmitInputArgsT] = None,
+ controller: StreamController | None = None,
) -> AsyncGenerator[
str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None
]:
@@ -1299,6 +1338,14 @@ async def stream_async(
kwargs
Additional keyword arguments to pass to the method used for requesting
the response.
+ controller
+ A [](`~chatlas.StreamController`) for cooperative stream cancellation.
+ When provided, calling `controller.cancel()` stops the stream after the
+ current chunk and preserves the partial response in conversation history.
+ This is useful for building UIs (e.g., a "stop generating" button) where
+ you want to interrupt a response without losing what's been generated so
+ far. The same controller can be reused across multiple streams by calling
+ `controller.reset()` before starting a new stream.
Returns
-------
@@ -1332,6 +1379,8 @@ class Person(BaseModel):
display = self._markdown_display(echo=echo)
+ controller = _as_controller(controller)
+
async def wrapper() -> AsyncGenerator[
str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None
]:
@@ -1343,6 +1392,7 @@ async def wrapper() -> AsyncGenerator[
content=content,
kwargs=kwargs,
data_model=data_model,
+ controller=controller,
):
yield chunk
@@ -2483,6 +2533,7 @@ def _chat_impl(
stream: bool,
kwargs: Optional[SubmitInputArgsT] = None,
data_model: Optional[type[BaseModel]] = None,
+ controller: StreamController | None = None,
) -> Generator[str, None, None]: ...
@overload
@@ -2494,6 +2545,7 @@ def _chat_impl(
stream: bool,
kwargs: Optional[SubmitInputArgsT] = None,
data_model: Optional[type[BaseModel]] = None,
+ controller: StreamController | None = None,
) -> Generator[
str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None
]: ...
@@ -2506,7 +2558,10 @@ def _chat_impl(
stream: bool,
kwargs: Optional[SubmitInputArgsT] = None,
data_model: Optional[type[BaseModel]] = None,
+ controller: StreamController | None = None,
) -> Generator[str | Content, None, None]:
+ controller = _as_controller(controller)
+
user_turn_result: UserTurn | None = user_turn
while user_turn_result is not None:
for chunk in self._submit_turns(
@@ -2516,6 +2571,7 @@ def _chat_impl(
data_model=data_model,
kwargs=kwargs,
content_mode=content,
+ controller=controller,
):
yield chunk
@@ -2523,6 +2579,10 @@ def _chat_impl(
assert turn is not None
user_turn_result = None
+ # Don't invoke tools if the stream was cancelled
+ if controller.cancelled:
+ break
+
all_results: list[ContentToolResult] = []
for x in turn.contents:
if isinstance(x, ContentToolRequest):
@@ -2553,6 +2613,7 @@ def _chat_impl_async(
stream: bool,
kwargs: Optional[SubmitInputArgsT] = None,
data_model: Optional[type[BaseModel]] = None,
+ controller: StreamController | None = None,
) -> AsyncGenerator[str, None]: ...
@overload
@@ -2564,6 +2625,7 @@ def _chat_impl_async(
stream: bool,
kwargs: Optional[SubmitInputArgsT] = None,
data_model: Optional[type[BaseModel]] = None,
+ controller: StreamController | None = None,
) -> AsyncGenerator[
str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None
]: ...
@@ -2576,7 +2638,10 @@ async def _chat_impl_async(
stream: bool,
kwargs: Optional[SubmitInputArgsT] = None,
data_model: Optional[type[BaseModel]] = None,
+ controller: StreamController | None = None,
) -> AsyncGenerator[str | Content, None]:
+ controller = _as_controller(controller)
+
user_turn_result: UserTurn | None = user_turn
while user_turn_result is not None:
async for chunk in self._submit_turns_async(
@@ -2586,6 +2651,7 @@ async def _chat_impl_async(
data_model=data_model,
kwargs=kwargs,
content_mode=content,
+ controller=controller,
):
yield chunk
@@ -2593,6 +2659,10 @@ async def _chat_impl_async(
assert turn is not None
user_turn_result = None
+ # Don't invoke tools if the stream was cancelled
+ if controller.cancelled:
+ break
+
all_results: list[ContentToolResult] = []
for x in turn.contents:
if isinstance(x, ContentToolRequest):
@@ -2625,6 +2695,7 @@ def _submit_turns(
data_model: type[BaseModel] | None = None,
kwargs: Optional[SubmitInputArgsT] = None,
content_mode: Literal["text"] = "text",
+ controller: StreamController | None = None,
) -> Generator[str, None, None]: ...
@overload
@@ -2637,6 +2708,7 @@ def _submit_turns(
kwargs: Optional[SubmitInputArgsT] = None,
*,
content_mode: Literal["all"],
+ controller: StreamController | None = None,
) -> Generator[str | Content, None, None]: ...
def _submit_turns(
@@ -2647,7 +2719,10 @@ def _submit_turns(
data_model: type[BaseModel] | None = None,
kwargs: Optional[SubmitInputArgsT] = None,
content_mode: Literal["text", "all"] = "text",
+ controller: StreamController | None = None,
) -> Generator[str | Content, None, None]:
+ controller = _as_controller(controller)
+
if any(isinstance(x, Tool) and x._is_async for x in self._tools.values()):
raise ValueError("Cannot use async tools in a synchronous chat")
@@ -2671,47 +2746,39 @@ def emit(text: str | Content):
kwargs=all_kwargs,
)
- result = None
- inside_thinking = False
-
- for chunk in response:
- content = self.provider.stream_content(chunk)
- if content is not None:
- if is_thinking_delta(content) and not inside_thinking:
- content = ContentThinkingDelta(
- thinking=content.thinking, phase="start"
- )
- emit("\n")
- inside_thinking = True
- elif not is_thinking_delta(content) and inside_thinking:
- emit("\n\n\n")
- if content_mode == "all":
- yield ContentThinkingDelta(thinking="", phase="end")
- inside_thinking = False
-
- if is_thinking_delta(content):
- emit(content.thinking)
- if content_mode == "all":
- yield content
- else:
- text = content_text(content)
- if text:
- emit(text)
- yield text
- result = self.provider.stream_merge_chunks(result, chunk)
-
- if inside_thinking:
- emit("\n\n\n")
- if content_mode == "all":
- yield ContentThinkingDelta(thinking="", phase="end")
-
- turn = self.provider.stream_turn(
- result,
- has_data_model=data_model is not None,
- )
-
- if echo == "all":
- emit_other_contents(turn, emit)
+ acc = TurnAccumulator(self._turns, controller)
+ acc.begin_turn(user_turn)
+
+ try:
+ result = None
+ for chunk in response:
+ if controller.cancelled:
+ break
+ content = self.provider.stream_content(chunk)
+ if content is not None:
+ yield from acc.process_content(content, content_mode, emit)
+ result = self.provider.stream_merge_chunks(result, chunk)
+
+ yield from acc.flush_thinking(content_mode, emit)
+
+ if not controller.cancelled:
+ turn = self.provider.stream_turn(
+ result,
+ has_data_model=data_model is not None,
+ )
+ if echo == "all":
+ emit_other_contents(turn, emit)
+ turn = resolve_assistant_turn(self.provider, turn)
+ acc.complete_turn(turn)
+ finally:
+ acc.finalize_turn()
+ # Breaking out of `for chunk in response` doesn't close the
+ # provider's underlying HTTP connection — only the temporary
+ # iterator is released. Explicitly close so connections aren't
+ # held until GC collects the Stream object.
+ _r: Any = response
+ if hasattr(_r, "close"):
+ _r.close()
else:
response = self.provider.chat_perform(
@@ -2732,17 +2799,8 @@ def emit(text: str | Content):
if echo == "all":
emit_other_contents(turn, emit)
- if not isinstance(turn, AssistantTurn):
- raise TypeError(
- f"Expected turn to be AssistantTurn, got {type(turn).__name__}"
- )
- if turn.tokens is None and turn.completion:
- turn.tokens = self.provider.value_tokens(turn.completion)
- if turn.cost is None and turn.completion:
- turn.cost = self.provider.value_cost(turn.completion, turn.tokens)
- if turn.tokens is not None:
- tokens_log(self.provider, turn.tokens)
- self._turns.extend([user_turn, turn])
+ turn = resolve_assistant_turn(self.provider, turn)
+ self._turns.extend([user_turn, turn])
@overload
def _submit_turns_async(
@@ -2753,6 +2811,7 @@ def _submit_turns_async(
data_model: type[BaseModel] | None = None,
kwargs: Optional[SubmitInputArgsT] = None,
content_mode: Literal["text"] = "text",
+ controller: StreamController | None = None,
) -> AsyncGenerator[str, None]: ...
@overload
@@ -2765,6 +2824,7 @@ def _submit_turns_async(
kwargs: Optional[SubmitInputArgsT] = None,
*,
content_mode: Literal["all"],
+ controller: StreamController | None = None,
) -> AsyncGenerator[str | Content, None]: ...
async def _submit_turns_async(
@@ -2775,7 +2835,10 @@ async def _submit_turns_async(
data_model: type[BaseModel] | None = None,
kwargs: Optional[SubmitInputArgsT] = None,
content_mode: Literal["text", "all"] = "text",
+ controller: StreamController | None = None,
) -> AsyncGenerator[str | Content, None]:
+ controller = _as_controller(controller)
+
def emit(text: str | Content):
self._echo_content(str(text))
@@ -2796,47 +2859,45 @@ def emit(text: str | Content):
kwargs=all_kwargs,
)
- result = None
- inside_thinking = False
-
- async for chunk in response:
- content = self.provider.stream_content(chunk)
- if content is not None:
- if is_thinking_delta(content) and not inside_thinking:
- content = ContentThinkingDelta(
- thinking=content.thinking, phase="start"
- )
- emit("\n")
- inside_thinking = True
- elif not is_thinking_delta(content) and inside_thinking:
- emit("\n\n\n")
- if content_mode == "all":
- yield ContentThinkingDelta(thinking="", phase="end")
- inside_thinking = False
-
- if is_thinking_delta(content):
- emit(content.thinking)
- if content_mode == "all":
- yield content
+ acc = TurnAccumulator(self._turns, controller)
+ acc.begin_turn(user_turn)
+
+ try:
+ result = None
+ async for chunk in response:
+ if controller.cancelled:
+ break
+ content = self.provider.stream_content(chunk)
+ if content is not None:
+ for item in acc.process_content(content, content_mode, emit):
+ yield item
+ result = self.provider.stream_merge_chunks(result, chunk)
+
+ for item in acc.flush_thinking(content_mode, emit):
+ yield item
+
+ if not controller.cancelled:
+ turn = self.provider.stream_turn(
+ result,
+ has_data_model=data_model is not None,
+ )
+ if echo == "all":
+ emit_other_contents(turn, emit)
+ turn = resolve_assistant_turn(self.provider, turn)
+ acc.complete_turn(turn)
+ finally:
+ acc.finalize_turn()
+ # Same as sync path above, but async generator finalization
+ # (PEP 525) is even less deterministic — abandoned generators
+ # may not close until event loop shutdown.
+ _r: Any = response
+ if hasattr(_r, "aclose"):
+ await _r.aclose()
+ elif hasattr(_r, "close"):
+ if inspect.iscoroutinefunction(_r.close):
+ await _r.close()
else:
- text = content_text(content)
- if text:
- emit(text)
- yield text
- result = self.provider.stream_merge_chunks(result, chunk)
-
- if inside_thinking:
- emit("\n\n\n")
- if content_mode == "all":
- yield ContentThinkingDelta(thinking="", phase="end")
-
- turn = self.provider.stream_turn(
- result,
- has_data_model=data_model is not None,
- )
-
- if echo == "all":
- emit_other_contents(turn, emit)
+ _r.close()
else:
response = await self.provider.chat_perform_async(
@@ -2857,17 +2918,8 @@ def emit(text: str | Content):
if echo == "all":
emit_other_contents(turn, emit)
- if not isinstance(turn, AssistantTurn):
- raise TypeError(
- f"Expected turn to be AssistantTurn, got {type(turn).__name__}"
- )
- if turn.tokens is None and turn.completion:
- turn.tokens = self.provider.value_tokens(turn.completion)
- if turn.cost is None and turn.completion:
- turn.cost = self.provider.value_cost(turn.completion, turn.tokens)
- if turn.tokens is not None:
- tokens_log(self.provider, turn.tokens)
- self._turns.extend([user_turn, turn])
+ turn = resolve_assistant_turn(self.provider, turn)
+ self._turns.extend([user_turn, turn])
def _collect_all_kwargs(
self,
@@ -3075,18 +3127,20 @@ def __str__(self):
from ._repr import format_tokens
turns = self.get_turns(include_system_prompt=True)
- assistant_turns = [t for t in turns if isinstance(t, AssistantTurn)]
+ complete_assistant_turns = [
+ t for t in turns if isinstance(t, AssistantTurn) and not t.is_partial
+ ]
- # Sum tokens across assistant turns
+ # Sum tokens across complete assistant turns
tokens: tuple[int, int, int] | None = None
- if any(t.tokens for t in assistant_turns):
+ if any(t.tokens for t in complete_assistant_turns):
tokens = (
- sum(t.tokens[0] for t in assistant_turns if t.tokens),
- sum(t.tokens[1] for t in assistant_turns if t.tokens),
- sum(t.tokens[2] for t in assistant_turns if t.tokens),
+ sum(t.tokens[0] for t in complete_assistant_turns if t.tokens),
+ sum(t.tokens[1] for t in complete_assistant_turns if t.tokens),
+ sum(t.tokens[2] for t in complete_assistant_turns if t.tokens),
)
- costs = [t.cost for t in assistant_turns if t.cost is not None]
+ costs = [t.cost for t in complete_assistant_turns if t.cost is not None]
total_cost = sum(costs) if costs else None
        res = f"
-def is_thinking_delta(content: Content) -> TypeGuard[ContentThinkingDelta]:
- return isinstance(content, ContentThinkingDelta)
-
-
-def content_text(content: Content) -> str:
- """Extract displayable text from a Content object."""
- if isinstance(content, ContentThinkingDelta):
- return content.thinking
- if isinstance(content, ContentThinking):
- return content.thinking
- if isinstance(content, ContentText):
- return content.text
- return str(content)
+def _as_controller(controller: StreamController | None) -> StreamController:
+ """Ensure a non-None, ready-to-use StreamController."""
+ if controller is None:
+ return StreamController()
+ controller._ensure_ready()
+ return controller
+
+
+def resolve_assistant_turn(provider: Provider, turn: Turn) -> AssistantTurn:
+ """Validate turn type, compute tokens and cost, and log usage."""
+ if not isinstance(turn, AssistantTurn):
+ raise TypeError(f"Expected turn to be AssistantTurn, got {type(turn).__name__}")
+ if turn.tokens is None and turn.completion:
+ turn.tokens = provider.value_tokens(turn.completion)
+ if turn.cost is None and turn.completion:
+ turn.cost = provider.value_cost(turn.completion, turn.tokens)
+ if turn.tokens is not None:
+ tokens_log(provider, turn.tokens)
+ return turn
def is_quarto():
diff --git a/chatlas/_stream_controller.py b/chatlas/_stream_controller.py
new file mode 100644
index 00000000..655c063a
--- /dev/null
+++ b/chatlas/_stream_controller.py
@@ -0,0 +1,76 @@
+from __future__ import annotations
+
+import warnings
+
+
+class StreamController:
+ """
+ Cooperative cancellation handle for streaming responses.
+
+ Create a controller and pass it to
+ :meth:`~chatlas.Chat.stream` or :meth:`~chatlas.Chat.stream_async`
+ via the ``controller`` argument, then call :meth:`cancel` from
+ anywhere (e.g., a Shiny observer) to stop the stream after the
+ current chunk.
+
+ The same controller can be reused across multiple streams. Call
+ :meth:`reset` to clear the cancelled state before starting a new
+ stream.
+
+ Examples
+ --------
+ ```python
+ from chatlas import ChatOpenAI, StreamController
+
+ chat = ChatOpenAI()
+ ctrl = StreamController()
+
+ i = 0
+ for chunk in chat.stream("Write a story", controller=ctrl):
+ i += 1
+ print(chunk, end="")
+ if i > 10:
+ ctrl.cancel()
+
+ # Partial response is preserved in history
+ print(chat.get_turns())
+ ```
+ """
+
+ def __init__(self):
+ self._cancelled: bool = False
+ self._reason: str | None = None
+
+ def cancel(self, reason: str = "cancelled") -> None:
+ """Cancel the stream. The reason is stored on the partial turn."""
+ # Set reason before cancelled so that readers who see cancelled=True
+ # will always find a reason. This is safe under CPython's GIL; on
+ # free-threaded builds the worst case is a benign extra iteration
+ # before the streaming loop notices the cancellation.
+ self._reason = reason
+ self._cancelled = True
+
+ def reset(self) -> None:
+ """Clear the cancelled state and reason."""
+ self._cancelled = False
+ self._reason = None
+
+ def _ensure_ready(self) -> None:
+ """Auto-reset if already cancelled (prevents stale controller bugs)."""
+ if self._cancelled:
+ warnings.warn(
+ "StreamController was already cancelled — resetting automatically. "
+ "Call controller.reset() explicitly to avoid this warning.",
+ stacklevel=3,
+ )
+ self.reset()
+
+ @property
+ def cancelled(self) -> bool:
+ """Whether the controller has been cancelled."""
+ return self._cancelled
+
+ @property
+ def reason(self) -> str | None:
+ """The cancellation reason, or None if not cancelled."""
+ return self._reason
diff --git a/chatlas/_turn.py b/chatlas/_turn.py
index 2cdcbf60..58246c59 100644
--- a/chatlas/_turn.py
+++ b/chatlas/_turn.py
@@ -308,6 +308,9 @@ class AssistantTurn(Turn, Generic[CompletionT]):
cost
The cost of this turn in USD. This is computed when the turn is created
based on the token usage and pricing information (including service tier).
+ partial_reason
+ If set, indicates this turn is incomplete (e.g., the stream was interrupted
+ or cancelled). The value describes the reason for the partial state.
See Also
--------
@@ -322,6 +325,12 @@ class AssistantTurn(Turn, Generic[CompletionT]):
finish_reason: Optional[str] = None
completion: Optional[CompletionT] = Field(default=None, exclude=True)
cost: Optional[float] = None
+ partial_reason: Optional[str] = None
+
+ @property
+ def is_partial(self) -> bool:
+ """Whether this turn is a partial (interrupted/cancelled) turn."""
+ return self.partial_reason is not None
@field_validator("tokens", mode="before")
@classmethod
@@ -339,6 +348,7 @@ def __init__(
finish_reason: Optional[str] = None,
completion: Optional[CompletionT] = None,
cost: Optional[float] = None,
+ partial_reason: Optional[str] = None,
**kwargs,
):
if isinstance(tokens, list):
@@ -353,6 +363,8 @@ def __init__(
kwargs["completion"] = completion
if cost is not None:
kwargs["cost"] = cost
+ if partial_reason is not None:
+ kwargs["partial_reason"] = partial_reason
super().__init__(contents, **kwargs)
diff --git a/chatlas/_turn_accumulator.py b/chatlas/_turn_accumulator.py
new file mode 100644
index 00000000..c081ba68
--- /dev/null
+++ b/chatlas/_turn_accumulator.py
@@ -0,0 +1,158 @@
+from __future__ import annotations
+
+from typing import Callable, Literal, Sequence
+
+from ._content import (
+ Content,
+ ContentText,
+ ContentThinking,
+ ContentThinkingDelta,
+ ContentUnion,
+)
+from ._stream_controller import StreamController
+from ._turn import AssistantTurn, Turn, UserTurn
+
+
+def merge_content_text(contents: list[ContentUnion]) -> list[ContentUnion]:
+ """Merge adjacent ContentText (and ContentThinking) fragments."""
+ if not contents:
+ return []
+ merged: list[ContentUnion] = [contents[0]]
+ for item in contents[1:]:
+ last = merged[-1]
+ if isinstance(last, ContentText) and isinstance(item, ContentText):
+ merged[-1] = ContentText.model_construct(text=last.text + item.text)
+ elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking):
+ merged[-1] = ContentThinking.model_construct(
+ thinking=last.thinking + item.thinking,
+ extra=item.extra or last.extra,
+ )
+ else:
+ merged.append(item)
+ return merged
+
+
+class TurnAccumulator:
+ """
+ Manages the lifecycle of one streaming assistant turn.
+
+ Mirrors ellmer's TurnAccumulator R6 class. The stages are:
+
+ 1. ``begin_turn(user_turn)`` — insert user + partial assistant into turns
+ 2. ``process_content(content, ...)`` — append content, handle thinking
+ phase boundaries, emit display text, and return items to yield
+ 3. ``flush_thinking(...)`` — emit closing thinking tags after the loop
+ 4. ``complete_turn(turn)`` — replace partial with the full turn (skipped if cancelled)
+ 5. ``finalize_turn()`` — called from ``finally``; merges text fragments
+ and stamps the cancellation reason if the turn is still partial
+ """
+
+ def __init__(
+ self,
+ turns: list[Turn],
+ controller: StreamController,
+ ):
+ self._turns = turns
+ self._controller = controller
+ self._turn_idx: int | None = None
+ self._inside_thinking: bool = False
+
+ def begin_turn(self, user_turn: UserTurn) -> None:
+ """Insert user turn and a partial assistant placeholder."""
+ partial: AssistantTurn = AssistantTurn([], partial_reason="interrupted")
+ self._turns.extend([user_turn, partial])
+ self._turn_idx = len(self._turns) - 1
+
+ def process_content(
+ self,
+ content: Content,
+ content_mode: Literal["text", "all"],
+ emit: Callable[[str | Content], None],
+ ) -> Sequence[str | Content]:
+ """Append content to the turn, emit display text, return items to yield."""
+ self._update_turn(content)
+
+ items: list[str | Content] = []
+
+ if isinstance(content, ContentThinkingDelta) and not self._inside_thinking:
+ content = ContentThinkingDelta(
+ thinking=content.thinking, phase="start"
+ )
+ emit("\n")
+ self._inside_thinking = True
+ elif not isinstance(content, ContentThinkingDelta) and self._inside_thinking:
+ emit("\n\n\n")
+ if content_mode == "all":
+ items.append(ContentThinkingDelta(thinking="", phase="end"))
+ self._inside_thinking = False
+
+ if isinstance(content, ContentThinkingDelta):
+ emit(content.thinking)
+ if content_mode == "all":
+ items.append(content)
+ else:
+ text = _content_text(content)
+ if text:
+ emit(text)
+ items.append(text)
+
+ return items
+
+ def flush_thinking(
+ self,
+ content_mode: Literal["text", "all"],
+ emit: Callable[[str | Content], None],
+ ) -> Sequence[str | Content]:
+ """Emit closing thinking tags if the stream ended mid-thinking."""
+ if not self._inside_thinking:
+ return []
+ self._inside_thinking = False
+ emit("\n\n\n")
+ if content_mode == "all":
+ return [ContentThinkingDelta(thinking="", phase="end")]
+ return []
+
+ def complete_turn(self, turn: AssistantTurn) -> None:
+ """Replace the partial turn with the completed turn (no-op if cancelled)."""
+ if self._turn_idx is None:
+ raise RuntimeError("complete_turn called before begin_turn")
+ if self._controller.cancelled:
+ return
+ self._turns[self._turn_idx] = turn
+
+ def finalize_turn(self) -> None:
+ """
+ Safety net — called from ``finally``.
+
+ If the turn is still partial (i.e., ``complete_turn`` was never called
+ or was skipped because of cancellation), merge adjacent text fragments
+ and stamp the cancellation reason.
+ """
+ if self._turn_idx is None:
+ return
+ turn = self._turns[self._turn_idx]
+ if not isinstance(turn, AssistantTurn) or not turn.is_partial:
+ return
+ if self._controller.cancelled:
+ turn.partial_reason = self._controller.reason
+ turn.contents = merge_content_text(turn.contents)
+
+ def _update_turn(self, content: Content) -> None:
+ """Append streamed content to the partial turn."""
+ if self._turn_idx is None:
+ raise RuntimeError("_update_turn called before begin_turn")
+ # Content is the base class; contents is typed as list[ContentUnion]
+ # (discriminated union). At runtime all Content subclasses are ContentUnion
+ # members, so the append is safe.
+ self._turns[self._turn_idx].contents.append(content) # type: ignore[arg-type]
+
+
+def _content_text(content: Content) -> str:
+ """Extract displayable text from a Content object."""
+ if isinstance(content, ContentThinkingDelta):
+ return content.thinking
+ if isinstance(content, ContentThinking):
+ return content.thinking
+ if isinstance(content, ContentText):
+ return content.text
+ return str(content)
diff --git a/docs/get-started/stream.qmd b/docs/get-started/stream.qmd
index f5da3b62..228e37cc 100644
--- a/docs/get-started/stream.qmd
+++ b/docs/get-started/stream.qmd
@@ -78,6 +78,28 @@ for chunk in stream:
```
+## Cancelling a stream
+
+Use a [](`~chatlas.StreamController`) to stop a stream early while preserving what's been generated so far. Pass a controller to `.stream()`, then call `controller.cancel()` when you want to stop.
+
+```python
+from chatlas import StreamController
+
+ctrl = StreamController()
+
+chunks = []
+for chunk in chat.stream("Write a long story", controller=ctrl):
+ chunks.append(chunk)
+ if len(chunks) >= 5:
+ ctrl.cancel()
+
+# The partial response is preserved in history
+last_turn = chat.get_last_turn()
+print(last_turn.text)
+```
+
+This is useful for building UIs with a "stop generating" button (e.g., in a [Shiny chatbot](chatbots.qmd)). The same controller can be reused across multiple streams by calling `controller.reset()` before starting a new stream.
+
## Wrapping generators
Sometimes it's useful to wrap the `.stream()` generator up into another generator function.
diff --git a/tests/__snapshots__/test_chat.ambr b/tests/__snapshots__/test_chat.ambr
index 120ce5bd..05be2d2c 100644
--- a/tests/__snapshots__/test_chat.ambr
+++ b/tests/__snapshots__/test_chat.ambr
@@ -66,3 +66,25 @@
'''
# ---
+# name: test_partial_turn_display
+ '''
+
+
+ ## User
+
+ hello
+
+ ## Assistant [input=10 output=5 cost=$0.0010]
+
+ response
+
+ ## User
+
+ more
+
+ ## Assistant [interrupted]
+
+ partial response
+
+ '''
+# ---
diff --git a/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml
new file mode 100644
index 00000000..ba6c770d
--- /dev/null
+++ b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml
@@ -0,0 +1,171 @@
+interactions:
+- request:
+ body: '{"include": ["reasoning.encrypted_content"], "input": [{"role": "user", "content": [{"type": "input_text", "text": "\n What are the canonical colors of the ROYGBIV rainbow?\n Put each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-5.4", "store": false, "stream": true}'
+ headers:
+ Accept:
+ - application/json
+ Accept-Encoding:
+ - gzip, deflate
+ Connection:
+ - keep-alive
+ Content-Length:
+ - '251'
+ Content-Type:
+ - application/json
+ Host:
+ - api.openai.com
+ X-Stainless-Async:
+ - 'false'
+ x-stainless-read-timeout:
+ - '600'
+ method: POST
+ uri: https://api.openai.com/v1/responses
+ response:
+ body:
+ string: 'event: response.created
+
+ data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}}
+
+
+ event: response.in_progress
+
+ data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}}
+
+
+ event: response.output_item.added
+
+ data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"in_progress","content":[],"role":"assistant"}}
+
+
+ event: response.content_part.added
+
+ data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"VyLjtlp4pcHnl"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"VFBYwzpsUJWxH"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"GB7EheuPAI"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"n7WBEjbLB9yuQ"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"tjVPBCfEwZ"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"ThsRrqFkcHdBY"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"vpm43U0OMRn"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ce8I6O3Ympl5O"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"2qjVtLnv4y9z"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"RBKSpaeiKPIYL"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"50myY2IpRyTEZ"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"zWfhh8RBqltML"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"s13DaAWMQbrhf"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"2QRAGHqzRIJ4Rhb"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"CWR5fXLEuEA"}
+
+
+ event: response.output_text.done
+
+ data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]}
+
+
+ event: response.content_part.done
+
+ data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}}
+
+
+ event: response.output_item.done
+
+ data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}}
+
+
+ event: response.completed
+
+ data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213200,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}}
+
+
+ '
+ headers:
+ CF-RAY:
+ - 9b6c8d259a25eb65-ORD
+ Connection:
+ - keep-alive
+ Content-Type:
+ - text/event-stream; charset=utf-8
+ Date:
+ - Wed, 31 Dec 2025 20:33:20 GMT
+ Server:
+ - cloudflare
+ Strict-Transport-Security:
+ - max-age=31536000; includeSubDomains; preload
+ Transfer-Encoding:
+ - chunked
+ X-Content-Type-Options:
+ - nosniff
+ alt-svc:
+ - h3=":443"; ma=86400
+ cf-cache-status:
+ - DYNAMIC
+ openai-processing-ms:
+ - '59'
+ openai-version:
+ - '2020-10-01'
+ x-envoy-upstream-service-time:
+ - '62'
+ status:
+ code: 200
+ message: OK
+version: 1
diff --git a/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml
new file mode 100644
index 00000000..94c54738
--- /dev/null
+++ b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml
@@ -0,0 +1,171 @@
+interactions:
+- request:
+ body: '{"include": ["reasoning.encrypted_content"], "input": [{"role": "user", "content": [{"type": "input_text", "text": "\n What are the canonical colors of the ROYGBIV rainbow?\n Put each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-5.4", "store": false, "stream": true}'
+ headers:
+ Accept:
+ - application/json
+ Accept-Encoding:
+ - gzip, deflate
+ Connection:
+ - keep-alive
+ Content-Length:
+ - '251'
+ Content-Type:
+ - application/json
+ Host:
+ - api.openai.com
+ X-Stainless-Async:
+ - async:asyncio
+ x-stainless-read-timeout:
+ - '600'
+ method: POST
+ uri: https://api.openai.com/v1/responses
+ response:
+ body:
+ string: 'event: response.created
+
+ data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}}
+
+
+ event: response.in_progress
+
+ data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}}
+
+
+ event: response.output_item.added
+
+ data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"in_progress","content":[],"role":"assistant"}}
+
+
+ event: response.content_part.added
+
+ data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"Fwrn6l3VdXbRX"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"1kJSXray3LSug"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"Eih8h9dEZy"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"tHQucrSzXW82d"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"d7XiKGH6ge"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Y8ax6kRokXoTk"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"lcahcSGcQIO"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ybguf4GM4Rxzd"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"jnjjMyDfW7YQ"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"GCrzkYvw1d9OB"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"XCEuFUkogiu5Q"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"N6Rz9xT3o8JEi"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"2m15iglV7QFMR"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"WNlPYGJ0Q9ygCMr"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"ZcES9KNvLip"}
+
+
+ event: response.output_text.done
+
+ data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]}
+
+
+ event: response.content_part.done
+
+ data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}}
+
+
+ event: response.output_item.done
+
+ data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}}
+
+
+ event: response.completed
+
+ data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213201,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}}
+
+
+ '
+ headers:
+ CF-RAY:
+ - 9b6c8d293c230cde-ORD
+ Connection:
+ - keep-alive
+ Content-Type:
+ - text/event-stream; charset=utf-8
+ Date:
+ - Wed, 31 Dec 2025 20:33:20 GMT
+ Server:
+ - cloudflare
+ Strict-Transport-Security:
+ - max-age=31536000; includeSubDomains; preload
+ Transfer-Encoding:
+ - chunked
+ X-Content-Type-Options:
+ - nosniff
+ alt-svc:
+ - h3=":443"; ma=86400
+ cf-cache-status:
+ - DYNAMIC
+ openai-processing-ms:
+ - '28'
+ openai-version:
+ - '2020-10-01'
+ x-envoy-upstream-service-time:
+ - '31'
+ status:
+ code: 200
+ message: OK
+version: 1
diff --git a/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml
new file mode 100644
index 00000000..ba6c770d
--- /dev/null
+++ b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml
@@ -0,0 +1,171 @@
+interactions:
+- request:
+ body: '{"include": ["reasoning.encrypted_content"], "input": [{"role": "user", "content": [{"type": "input_text", "text": "\n What are the canonical colors of the ROYGBIV rainbow?\n Put each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-5.4", "store": false, "stream": true}'
+ headers:
+ Accept:
+ - application/json
+ Accept-Encoding:
+ - gzip, deflate
+ Connection:
+ - keep-alive
+ Content-Length:
+ - '251'
+ Content-Type:
+ - application/json
+ Host:
+ - api.openai.com
+ X-Stainless-Async:
+ - 'false'
+ x-stainless-read-timeout:
+ - '600'
+ method: POST
+ uri: https://api.openai.com/v1/responses
+ response:
+ body:
+ string: 'event: response.created
+
+ data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}}
+
+
+ event: response.in_progress
+
+ data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}}
+
+
+ event: response.output_item.added
+
+ data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"in_progress","content":[],"role":"assistant"}}
+
+
+ event: response.content_part.added
+
+ data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"VyLjtlp4pcHnl"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"VFBYwzpsUJWxH"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"GB7EheuPAI"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"n7WBEjbLB9yuQ"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"tjVPBCfEwZ"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"ThsRrqFkcHdBY"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"vpm43U0OMRn"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ce8I6O3Ympl5O"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"2qjVtLnv4y9z"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"RBKSpaeiKPIYL"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"50myY2IpRyTEZ"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"zWfhh8RBqltML"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"s13DaAWMQbrhf"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"2QRAGHqzRIJ4Rhb"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"CWR5fXLEuEA"}
+
+
+ event: response.output_text.done
+
+ data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]}
+
+
+ event: response.content_part.done
+
+ data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}}
+
+
+ event: response.output_item.done
+
+ data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}}
+
+
+ event: response.completed
+
+ data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213200,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}}
+
+
+ '
+ headers:
+ CF-RAY:
+ - 9b6c8d259a25eb65-ORD
+ Connection:
+ - keep-alive
+ Content-Type:
+ - text/event-stream; charset=utf-8
+ Date:
+ - Wed, 31 Dec 2025 20:33:20 GMT
+ Server:
+ - cloudflare
+ Strict-Transport-Security:
+ - max-age=31536000; includeSubDomains; preload
+ Transfer-Encoding:
+ - chunked
+ X-Content-Type-Options:
+ - nosniff
+ alt-svc:
+ - h3=":443"; ma=86400
+ cf-cache-status:
+ - DYNAMIC
+ openai-processing-ms:
+ - '59'
+ openai-version:
+ - '2020-10-01'
+ x-envoy-upstream-service-time:
+ - '62'
+ status:
+ code: 200
+ message: OK
+version: 1
diff --git a/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml
new file mode 100644
index 00000000..94c54738
--- /dev/null
+++ b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml
@@ -0,0 +1,171 @@
+interactions:
+- request:
+ body: '{"include": ["reasoning.encrypted_content"], "input": [{"role": "user", "content": [{"type": "input_text", "text": "\n What are the canonical colors of the ROYGBIV rainbow?\n Put each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-5.4", "store": false, "stream": true}'
+ headers:
+ Accept:
+ - application/json
+ Accept-Encoding:
+ - gzip, deflate
+ Connection:
+ - keep-alive
+ Content-Length:
+ - '251'
+ Content-Type:
+ - application/json
+ Host:
+ - api.openai.com
+ X-Stainless-Async:
+ - async:asyncio
+ x-stainless-read-timeout:
+ - '600'
+ method: POST
+ uri: https://api.openai.com/v1/responses
+ response:
+ body:
+ string: 'event: response.created
+
+ data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}}
+
+
+ event: response.in_progress
+
+ data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}}
+
+
+ event: response.output_item.added
+
+ data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"in_progress","content":[],"role":"assistant"}}
+
+
+ event: response.content_part.added
+
+ data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"Fwrn6l3VdXbRX"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"1kJSXray3LSug"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"Eih8h9dEZy"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"tHQucrSzXW82d"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"d7XiKGH6ge"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Y8ax6kRokXoTk"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"lcahcSGcQIO"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ybguf4GM4Rxzd"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"jnjjMyDfW7YQ"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"GCrzkYvw1d9OB"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"XCEuFUkogiu5Q"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"N6Rz9xT3o8JEi"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"2m15iglV7QFMR"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"WNlPYGJ0Q9ygCMr"}
+
+
+ event: response.output_text.delta
+
+ data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"ZcES9KNvLip"}
+
+
+ event: response.output_text.done
+
+ data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]}
+
+
+ event: response.content_part.done
+
+ data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}}
+
+
+ event: response.output_item.done
+
+ data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}}
+
+
+ event: response.completed
+
+ data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213201,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}}
+
+
+ '
+ headers:
+ CF-RAY:
+ - 9b6c8d293c230cde-ORD
+ Connection:
+ - keep-alive
+ Content-Type:
+ - text/event-stream; charset=utf-8
+ Date:
+ - Wed, 31 Dec 2025 20:33:20 GMT
+ Server:
+ - cloudflare
+ Strict-Transport-Security:
+ - max-age=31536000; includeSubDomains; preload
+ Transfer-Encoding:
+ - chunked
+ X-Content-Type-Options:
+ - nosniff
+ alt-svc:
+ - h3=":443"; ma=86400
+ cf-cache-status:
+ - DYNAMIC
+ openai-processing-ms:
+ - '28'
+ openai-version:
+ - '2020-10-01'
+ x-envoy-upstream-service-time:
+ - '31'
+ status:
+ code: 200
+ message: OK
+version: 1
diff --git a/tests/test_chat.py b/tests/test_chat.py
index 79b90639..d6040dfc 100644
--- a/tests/test_chat.py
+++ b/tests/test_chat.py
@@ -606,3 +606,151 @@ def test_json_serialize_with_tool_failure():
assert restored_result.error is not None
# After serialization, the Exception becomes a string representation
assert "Something went wrong" in str(restored_result.error)
+
+
+@pytest.mark.vcr
+def test_partial_turn_preserved_on_close():
+ """Closing a streaming generator mid-response preserves the partial turn."""
+ chat = ChatOpenAI()
+ gen = chat.stream(
+ """
+ What are the canonical colors of the ROYGBIV rainbow?
+ Put each colour on its own line. Don't use punctuation.
+ """
+ )
+ # Consume a few chunks then close
+ chunks = []
+ for chunk in gen:
+ chunks.append(chunk)
+ if len(chunks) >= 3:
+ break
+ gen.close()
+
+ turns = chat.get_turns()
+    assert len(turns) == 2  # user turn + partial assistant turn
+ assistant_turn = turns[1]
+ assert isinstance(assistant_turn, AssistantTurn)
+ assert assistant_turn.is_partial
+ assert assistant_turn.partial_reason == "interrupted"
+ assert len(assistant_turn.text) > 0
+
+
+@pytest.mark.vcr
+@pytest.mark.asyncio
+async def test_partial_turn_preserved_on_close_async():
+ """Closing an async streaming generator mid-response preserves the partial turn."""
+ chat = ChatOpenAI()
+ gen = await chat.stream_async(
+ """
+ What are the canonical colors of the ROYGBIV rainbow?
+ Put each colour on its own line. Don't use punctuation.
+ """
+ )
+ # Consume a few chunks then close
+ chunks = []
+ async for chunk in gen:
+ chunks.append(chunk)
+ if len(chunks) >= 3:
+ break
+ await gen.aclose()
+
+ turns = chat.get_turns()
+ assert len(turns) == 2 # user + partial assistant
+ assistant_turn = turns[1]
+ assert isinstance(assistant_turn, AssistantTurn)
+ assert assistant_turn.is_partial
+ assert assistant_turn.partial_reason == "interrupted"
+ assert len(assistant_turn.text) > 0
+
+
+def test_partial_turn_display(snapshot):
+ chat = ChatOpenAI()
+ chat.set_turns(
+ [
+ UserTurn("hello"),
+ AssistantTurn("response", tokens=(10, 5, 0), cost=0.001),
+ UserTurn("more"),
+ AssistantTurn("partial response", partial_reason="interrupted"),
+ ]
+ )
+ assert snapshot == repr(chat)
+
+
+def test_partial_turns_excluded_from_cost():
+ chat = ChatOpenAI()
+ chat.set_turns(
+ [
+ UserTurn("hello"),
+ AssistantTurn("response", tokens=(10, 5, 0), cost=0.001),
+ UserTurn("more"),
+ AssistantTurn("partial", partial_reason="interrupted"),
+ ]
+ )
+ # Cost should only include the complete turn
+ cost = chat.get_cost()
+ assert cost == 0.001
+
+    # get_tokens should skip the trailing partial turn
+ tokens = chat.get_tokens()
+ assert len(tokens) == 2 # user + assistant from the complete turn only
+
+
+def test_partial_turns_excluded_from_tokens_mid_conversation():
+ """Partial turns in the middle of history (not just trailing) are excluded."""
+ chat = ChatOpenAI()
+ chat.set_turns(
+ [
+ UserTurn("hello"),
+ AssistantTurn("partial", partial_reason="interrupted"),
+ UserTurn("retry"),
+ AssistantTurn("response", tokens=(10, 5, 0), cost=0.001),
+ ]
+ )
+ # Cost should only include the complete turn
+ cost = chat.get_cost()
+ assert cost == 0.001
+
+ # get_tokens should skip the partial pair entirely
+ tokens = chat.get_tokens()
+ assert len(tokens) == 2 # user + assistant from the complete turn only
+
+
+def test_merge_content_text():
+ from chatlas._turn_accumulator import merge_content_text
+ from chatlas._content import ContentText, ContentThinking
+
+ # Adjacent ContentText fragments merge
+ contents = [
+ ContentText.model_construct(text="a"),
+ ContentText.model_construct(text="b"),
+ ContentText.model_construct(text="c"),
+ ]
+ merged = merge_content_text(contents)
+ assert len(merged) == 1
+ assert isinstance(merged[0], ContentText)
+ assert merged[0].text == "abc"
+
+ # Non-text breaks the merge
+ contents = [
+ ContentText.model_construct(text="a"),
+ ContentThinking(thinking="thought"),
+ ContentText.model_construct(text="b"),
+ ]
+ merged = merge_content_text(contents)
+ assert len(merged) == 3
+ assert merged[0].text == "a"
+ assert isinstance(merged[1], ContentThinking)
+ assert merged[2].text == "b"
+
+ # Adjacent ContentThinking fragments merge
+ contents = [
+ ContentThinking(thinking="a"),
+ ContentThinking(thinking="b"),
+ ]
+ merged = merge_content_text(contents)
+ assert len(merged) == 1
+ assert isinstance(merged[0], ContentThinking)
+ assert merged[0].thinking == "ab"
+
+ # Empty list
+ assert merge_content_text([]) == []
diff --git a/tests/test_stream_controller.py b/tests/test_stream_controller.py
new file mode 100644
index 00000000..7aad2fd0
--- /dev/null
+++ b/tests/test_stream_controller.py
@@ -0,0 +1,103 @@
+import warnings
+
+import pytest
+
+from chatlas import ChatOpenAI, StreamController
+
+
+def test_stream_controller_initial_state():
+ ctrl = StreamController()
+ assert ctrl.cancelled is False
+ assert ctrl.reason is None
+
+
+def test_stream_controller_cancel():
+ ctrl = StreamController()
+ ctrl.cancel()
+ assert ctrl.cancelled is True
+ assert ctrl.reason == "cancelled"
+
+
+def test_stream_controller_cancel_with_reason():
+ ctrl = StreamController()
+ ctrl.cancel(reason="timeout")
+ assert ctrl.cancelled is True
+ assert ctrl.reason == "timeout"
+
+
+def test_stream_controller_reset():
+ ctrl = StreamController()
+ ctrl.cancel(reason="timeout")
+ ctrl.reset()
+ assert ctrl.cancelled is False
+ assert ctrl.reason is None
+
+
+def test_stream_controller_ensure_ready_warns_and_resets():
+ ctrl = StreamController()
+ ctrl.cancel(reason="stale")
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+ ctrl._ensure_ready()
+ assert len(w) == 1
+ assert "already cancelled" in str(w[0].message)
+ assert ctrl.cancelled is False
+ assert ctrl.reason is None
+
+
+def test_stream_controller_ensure_ready_noop_when_not_cancelled():
+ ctrl = StreamController()
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+ ctrl._ensure_ready()
+ assert len(w) == 0
+ assert ctrl.cancelled is False
+
+
+@pytest.mark.vcr
+def test_stream_cancel_after_chunks():
+ chat = ChatOpenAI()
+ ctrl = StreamController()
+
+ chunks = []
+ for chunk in chat.stream(
+ """
+ What are the canonical colors of the ROYGBIV rainbow?
+ Put each colour on its own line. Don't use punctuation.
+ """,
+ controller=ctrl,
+ ):
+ chunks.append(chunk)
+ if len(chunks) >= 3:
+ ctrl.cancel()
+
+ turns = chat.get_turns()
+ assert len(turns) == 2
+ assert turns[1].is_partial
+ assert turns[1].partial_reason == "cancelled"
+ assert len(turns[1].text) > 0
+
+
+@pytest.mark.vcr
+@pytest.mark.asyncio
+async def test_stream_cancel_after_chunks_async():
+ chat = ChatOpenAI()
+ ctrl = StreamController()
+
+ chunks = []
+ async for chunk in await chat.stream_async(
+ """
+ What are the canonical colors of the ROYGBIV rainbow?
+ Put each colour on its own line. Don't use punctuation.
+ """,
+ controller=ctrl,
+ ):
+ chunks.append(chunk)
+ if len(chunks) >= 3:
+ ctrl.cancel()
+
+ turns = chat.get_turns()
+ assert len(turns) == 2
+ assert turns[1].is_partial
+ assert turns[1].partial_reason == "cancelled"
+ assert len(turns[1].text) > 0
diff --git a/tests/test_turn_accumulator.py b/tests/test_turn_accumulator.py
new file mode 100644
index 00000000..3a28f6d9
--- /dev/null
+++ b/tests/test_turn_accumulator.py
@@ -0,0 +1,86 @@
+from chatlas._content import ContentText
+from chatlas._turn import AssistantTurn, UserTurn
+from chatlas._turn_accumulator import TurnAccumulator
+from chatlas._stream_controller import StreamController
+
+
+def test_begin_turn_inserts_partial():
+ turns: list = []
+ controller = StreamController()
+ acc = TurnAccumulator(turns, controller)
+ user = UserTurn("hello")
+ acc.begin_turn(user)
+ assert len(turns) == 2
+ assert turns[0] is user
+ assert isinstance(turns[1], AssistantTurn)
+ assert turns[1].is_partial
+
+
+def test_update_turn_appends_content():
+ turns: list = []
+ controller = StreamController()
+ acc = TurnAccumulator(turns, controller)
+ acc.begin_turn(UserTurn("hello"))
+ content = ContentText.model_construct(text="hi")
+ acc._update_turn(content)
+ assert len(turns[1].contents) == 1
+ assert turns[1].contents[0].text == "hi"
+
+
+def test_complete_turn_replaces_partial():
+ turns: list = []
+ controller = StreamController()
+ acc = TurnAccumulator(turns, controller)
+ acc.begin_turn(UserTurn("hello"))
+ full_turn = AssistantTurn("response", tokens=(10, 5, 0))
+ acc.complete_turn(full_turn)
+ assert turns[1] is full_turn
+ assert not turns[1].is_partial
+
+
+def test_complete_turn_skipped_when_cancelled():
+ turns: list = []
+ controller = StreamController()
+ acc = TurnAccumulator(turns, controller)
+ acc.begin_turn(UserTurn("hello"))
+ controller.cancel()
+ full_turn = AssistantTurn("response", tokens=(10, 5, 0))
+ acc.complete_turn(full_turn)
+ assert turns[1].is_partial
+
+
+def test_finalize_turn_merges_text_and_sets_reason():
+ turns: list = []
+ controller = StreamController()
+ acc = TurnAccumulator(turns, controller)
+ acc.begin_turn(UserTurn("hello"))
+ acc._update_turn(ContentText.model_construct(text="a"))
+ acc._update_turn(ContentText.model_construct(text="b"))
+ acc.finalize_turn()
+ assert turns[1].is_partial
+ assert turns[1].partial_reason == "interrupted"
+ assert len(turns[1].contents) == 1
+ assert turns[1].contents[0].text == "ab"
+
+
+def test_finalize_turn_uses_controller_reason():
+ turns: list = []
+ controller = StreamController()
+ acc = TurnAccumulator(turns, controller)
+ acc.begin_turn(UserTurn("hello"))
+ acc._update_turn(ContentText.model_construct(text="partial"))
+ controller.cancel(reason="user stopped")
+ acc.finalize_turn()
+ assert turns[1].partial_reason == "user stopped"
+
+
+def test_finalize_turn_noops_after_complete():
+ turns: list = []
+ controller = StreamController()
+ acc = TurnAccumulator(turns, controller)
+ acc.begin_turn(UserTurn("hello"))
+ full_turn = AssistantTurn("response", tokens=(10, 5, 0))
+ acc.complete_turn(full_turn)
+ acc.finalize_turn()
+ assert turns[1] is full_turn
+ assert not turns[1].is_partial
diff --git a/tests/test_turns.py b/tests/test_turns.py
index 5ac961b0..29f92761 100644
--- a/tests/test_turns.py
+++ b/tests/test_turns.py
@@ -321,6 +321,25 @@ def test_get_turns_tool_result_role_with_system_prompt():
assert turns_without_system[1].role == "assistant"
+def test_assistant_turn_partial_reason():
+ from chatlas._turn import AssistantTurn
+
+ # Default: not partial
+ turn = AssistantTurn("hello")
+ assert turn.partial_reason is None
+ assert turn.is_partial is False
+
+ # Partial turn
+ partial = AssistantTurn("partial", partial_reason="interrupted")
+ assert partial.partial_reason == "interrupted"
+ assert partial.is_partial is True
+
+ # Custom reason
+ cancelled = AssistantTurn("cancelled", partial_reason="cancelled")
+ assert cancelled.partial_reason == "cancelled"
+ assert cancelled.is_partial is True
+
+
def test_get_turns_tool_result_role_empty_chat():
"""Test tool_result_role with empty chat"""
chat = ChatAnthropic()