diff --git a/CHANGELOG.md b/CHANGELOG.md index 87b67b78..1b9cfab8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### New features * `ChatOpenAICompletions()` (and providers built on it like `ChatDeepSeek`, `ChatOpenRouter`, etc.) now extracts `reasoning_content` from model responses as `ContentThinking` objects. A new `preserve_thinking` parameter controls whether reasoning content is sent back to the API in multi-turn conversations; it defaults to `False` but is set to `True` for `ChatDeepSeek` (required for V4 tool-calling) and `ChatOpenRouter` (recommended for quality). (#295) +* New `StreamController` class for cooperative stream cancellation. Pass a controller to `.stream()` or `.stream_async()` and call `controller.cancel()` to stop the stream cleanly (e.g., from a Shiny "stop generating" button). The partial response is preserved in conversation history. (#279) +* When a stream is interrupted (closed early, cancelled, or errors), the accumulated content is now saved as a partial `AssistantTurn` so conversation state isn't lost. Partial turns display `[interrupted]` (or the cancellation reason) in the `Chat` repr and are excluded from token/cost accounting. (#279) ### Improvements diff --git a/chatlas/__init__.py b/chatlas/__init__.py index 880d0952..6f6073d5 100644 --- a/chatlas/__init__.py +++ b/chatlas/__init__.py @@ -34,6 +34,7 @@ from ._provider_perplexity import ChatPerplexity from ._provider_portkey import ChatPortkey from ._provider_snowflake import ChatSnowflake +from ._stream_controller import StreamController from ._tokens import token_usage from ._tools import Tool, ToolBuiltIn, ToolRejectError from ._tools_builtin import tool_web_fetch, tool_web_search @@ -85,6 +86,7 @@ "interpolate", "interpolate_file", "Provider", + "StreamController", "token_usage", "Tool", "ToolBuiltIn", diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 9beafd65..abd90292 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -34,7 +34,6 @@ Content, ContentJson, ContentText, - ContentThinking, ContentThinkingDelta, ContentToolRequest, ContentToolResult, @@ -50,9 +49,11 @@ from ._logging import log_tool_error from ._mcp_manager import MCPSessionManager from ._provider import ModelInfo, Provider, StandardModelParams, SubmitInputArgsT +from ._stream_controller import StreamController from ._tokens import tokens_log from ._tools import Tool, ToolBuiltIn, ToolRejectError from ._turn import AssistantTurn, SystemTurn, Turn, UserTurn, user_turn +from ._turn_accumulator import TurnAccumulator from ._typing_extensions import TypedDict, TypeGuard from ._utils import MISSING, MISSING_TYPE, html_escape, wrap_async @@ -400,6 +401,19 @@ def get_tokens(self) -> list[TokensDict]: turns = self.get_turns(include_system_prompt=False) + # Exclude partial assistant turns and their preceding user turns + # (partial turns have no token data) + filtered: list[Turn] = [] + i = 0 + while i < len(turns): + next_turn = turns[i + 1] if i + 1 < len(turns) else None + if isinstance(next_turn, AssistantTurn) and next_turn.is_partial: + i += 2 # skip user + partial assistant pair + else: + filtered.append(turns[i]) + i += 1 + turns = filtered + if len(turns) == 0: return [] @@ -525,7 +539,9 @@ def get_cost( The cost of the chat, in USD. """ - assistant_turns = [t for t in self._turns if isinstance(t, AssistantTurn)] + assistant_turns = [ + t for t in self._turns if isinstance(t, AssistantTurn) and not t.is_partial + ] if len(assistant_turns) == 0: return 0.0 @@ -1070,6 +1086,7 @@ def chat( display = self._markdown_display(echo=echo) + controller = StreamController() response = ChatResponse( self._chat_impl( turn, @@ -1077,6 +1094,7 @@ def chat( content="text", stream=stream, kwargs=kwargs, + controller=controller, ) ) @@ -1123,6 +1141,7 @@ async def chat_async( display = self._markdown_display(echo=echo) + controller = StreamController() response = ChatResponseAsync( self._chat_impl_async( turn, @@ -1130,6 +1149,7 @@ async def chat_async( content="text", stream=stream, kwargs=kwargs, + controller=controller, ), ) @@ -1147,6 +1167,7 @@ def stream( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> Generator[str, None, None]: ... @overload @@ -1157,6 +1178,7 @@ def stream( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> Generator[ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None ]: ... @@ -1168,6 +1190,7 @@ def stream( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> Generator[ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None ]: @@ -1196,6 +1219,14 @@ def stream( kwargs Additional keyword arguments to pass to the method used for requesting the response. + controller + A [](`~chatlas.StreamController`) for cooperative stream cancellation. + When provided, calling `controller.cancel()` stops the stream after the + current chunk and preserves the partial response in conversation history. + This is useful for building UIs (e.g., a "stop generating" button) where + you want to interrupt a response without losing what's been generated so + far. The same controller can be reused across multiple streams by calling + `controller.reset()` before starting a new stream. Returns ------- @@ -1224,6 +1255,8 @@ class Person(BaseModel): display = self._markdown_display(echo=echo) + controller = _as_controller(controller) + generator = self._chat_impl( turn, stream=True, @@ -1231,10 +1264,13 @@ class Person(BaseModel): content=content, kwargs=kwargs, data_model=data_model, + controller=controller, ) def wrapper() -> Generator[ - str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None + str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, + None, + None, ]: with display: for chunk in generator: @@ -1250,6 +1286,7 @@ async def stream_async( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str, None]: ... @overload @@ -1260,6 +1297,7 @@ async def stream_async( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None ]: ... @@ -1271,6 +1309,7 @@ async def stream_async( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None ]: @@ -1299,6 +1338,14 @@ async def stream_async( kwargs Additional keyword arguments to pass to the method used for requesting the response. + controller + A [](`~chatlas.StreamController`) for cooperative stream cancellation. + When provided, calling `controller.cancel()` stops the stream after the + current chunk and preserves the partial response in conversation history. + This is useful for building UIs (e.g., a "stop generating" button) where + you want to interrupt a response without losing what's been generated so + far. The same controller can be reused across multiple streams by calling + `controller.reset()` before starting a new stream. Returns ------- @@ -1332,6 +1379,8 @@ class Person(BaseModel): display = self._markdown_display(echo=echo) + controller = _as_controller(controller) + async def wrapper() -> AsyncGenerator[ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None ]: @@ -1343,6 +1392,7 @@ async def wrapper() -> AsyncGenerator[ content=content, kwargs=kwargs, data_model=data_model, + controller=controller, ): yield chunk @@ -2483,6 +2533,7 @@ def _chat_impl( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> Generator[str, None, None]: ... @overload @@ -2494,6 +2545,7 @@ def _chat_impl( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> Generator[ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None, None ]: ... @@ -2506,7 +2558,10 @@ def _chat_impl( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: + controller = _as_controller(controller) + user_turn_result: UserTurn | None = user_turn while user_turn_result is not None: for chunk in self._submit_turns( @@ -2516,6 +2571,7 @@ def _chat_impl( data_model=data_model, kwargs=kwargs, content_mode=content, + controller=controller, ): yield chunk @@ -2523,6 +2579,10 @@ def _chat_impl( assert turn is not None user_turn_result = None + # Don't invoke tools if the stream was cancelled + if controller.cancelled: + break + all_results: list[ContentToolResult] = [] for x in turn.contents: if isinstance(x, ContentToolRequest): @@ -2553,6 +2613,7 @@ def _chat_impl_async( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str, None]: ... @overload @@ -2564,6 +2625,7 @@ def _chat_impl_async( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[ str | ContentThinkingDelta | ContentToolRequest | ContentToolResult, None ]: ... @@ -2576,7 +2638,10 @@ async def _chat_impl_async( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: + controller = _as_controller(controller) + user_turn_result: UserTurn | None = user_turn while user_turn_result is not None: async for chunk in self._submit_turns_async( @@ -2586,6 +2651,7 @@ async def _chat_impl_async( data_model=data_model, kwargs=kwargs, content_mode=content, + controller=controller, ): yield chunk @@ -2593,6 +2659,10 @@ async def _chat_impl_async( assert turn is not None user_turn_result = None + # Don't invoke tools if the stream was cancelled + if controller.cancelled: + break + all_results: list[ContentToolResult] = [] for x in turn.contents: if isinstance(x, ContentToolRequest): @@ -2625,6 +2695,7 @@ def _submit_turns( data_model: type[BaseModel] | None = None, kwargs: Optional[SubmitInputArgsT] = None, content_mode: Literal["text"] = "text", + controller: StreamController | None = None, ) -> Generator[str, None, None]: ... @overload @@ -2637,6 +2708,7 @@ def _submit_turns( kwargs: Optional[SubmitInputArgsT] = None, *, content_mode: Literal["all"], + controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: ... def _submit_turns( @@ -2647,7 +2719,10 @@ def _submit_turns( data_model: type[BaseModel] | None = None, kwargs: Optional[SubmitInputArgsT] = None, content_mode: Literal["text", "all"] = "text", + controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: + controller = _as_controller(controller) + if any(isinstance(x, Tool) and x._is_async for x in self._tools.values()): raise ValueError("Cannot use async tools in a synchronous chat") @@ -2671,47 +2746,39 @@ def emit(text: str | Content): kwargs=all_kwargs, ) - result = None - inside_thinking = False - - for chunk in response: - content = self.provider.stream_content(chunk) - if content is not None: - if is_thinking_delta(content) and not inside_thinking: - content = ContentThinkingDelta( - thinking=content.thinking, phase="start" - ) - emit("\n") - inside_thinking = True - elif not is_thinking_delta(content) and inside_thinking: - emit("\n\n\n") - if content_mode == "all": - yield ContentThinkingDelta(thinking="", phase="end") - inside_thinking = False - - if is_thinking_delta(content): - emit(content.thinking) - if content_mode == "all": - yield content - else: - text = content_text(content) - if text: - emit(text) - yield text - result = self.provider.stream_merge_chunks(result, chunk) - - if inside_thinking: - emit("\n\n\n") - if content_mode == "all": - yield ContentThinkingDelta(thinking="", phase="end") - - turn = self.provider.stream_turn( - result, - has_data_model=data_model is not None, - ) - - if echo == "all": - emit_other_contents(turn, emit) + acc = TurnAccumulator(self._turns, controller) + acc.begin_turn(user_turn) + + try: + result = None + for chunk in response: + if controller.cancelled: + break + content = self.provider.stream_content(chunk) + if content is not None: + yield from acc.process_content(content, content_mode, emit) + result = self.provider.stream_merge_chunks(result, chunk) + + yield from acc.flush_thinking(content_mode, emit) + + if not controller.cancelled: + turn = self.provider.stream_turn( + result, + has_data_model=data_model is not None, + ) + if echo == "all": + emit_other_contents(turn, emit) + turn = resolve_assistant_turn(self.provider, turn) + acc.complete_turn(turn) + finally: + acc.finalize_turn() + # Breaking out of `for chunk in response` doesn't close the + # provider's underlying HTTP connection — only the temporary + # iterator is released. Explicitly close so connections aren't + # held until GC collects the Stream object. + _r: Any = response + if hasattr(_r, "close"): + _r.close() else: response = self.provider.chat_perform( @@ -2732,17 +2799,8 @@ def emit(text: str | Content): if echo == "all": emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost(turn.completion, turn.tokens) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) - self._turns.extend([user_turn, turn]) + turn = resolve_assistant_turn(self.provider, turn) + self._turns.extend([user_turn, turn]) @overload def _submit_turns_async( @@ -2753,6 +2811,7 @@ def _submit_turns_async( data_model: type[BaseModel] | None = None, kwargs: Optional[SubmitInputArgsT] = None, content_mode: Literal["text"] = "text", + controller: StreamController | None = None, ) -> AsyncGenerator[str, None]: ... @overload @@ -2765,6 +2824,7 @@ def _submit_turns_async( kwargs: Optional[SubmitInputArgsT] = None, *, content_mode: Literal["all"], + controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: ... async def _submit_turns_async( @@ -2775,7 +2835,10 @@ async def _submit_turns_async( data_model: type[BaseModel] | None = None, kwargs: Optional[SubmitInputArgsT] = None, content_mode: Literal["text", "all"] = "text", + controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: + controller = _as_controller(controller) + def emit(text: str | Content): self._echo_content(str(text)) @@ -2796,47 +2859,45 @@ def emit(text: str | Content): kwargs=all_kwargs, ) - result = None - inside_thinking = False - - async for chunk in response: - content = self.provider.stream_content(chunk) - if content is not None: - if is_thinking_delta(content) and not inside_thinking: - content = ContentThinkingDelta( - thinking=content.thinking, phase="start" - ) - emit("\n") - inside_thinking = True - elif not is_thinking_delta(content) and inside_thinking: - emit("\n\n\n") - if content_mode == "all": - yield ContentThinkingDelta(thinking="", phase="end") - inside_thinking = False - - if is_thinking_delta(content): - emit(content.thinking) - if content_mode == "all": - yield content + acc = TurnAccumulator(self._turns, controller) + acc.begin_turn(user_turn) + + try: + result = None + async for chunk in response: + if controller.cancelled: + break + content = self.provider.stream_content(chunk) + if content is not None: + for item in acc.process_content(content, content_mode, emit): + yield item + result = self.provider.stream_merge_chunks(result, chunk) + + for item in acc.flush_thinking(content_mode, emit): + yield item + + if not controller.cancelled: + turn = self.provider.stream_turn( + result, + has_data_model=data_model is not None, + ) + if echo == "all": + emit_other_contents(turn, emit) + turn = resolve_assistant_turn(self.provider, turn) + acc.complete_turn(turn) + finally: + acc.finalize_turn() + # Same as sync path above, but async generator finalization + # (PEP 525) is even less deterministic — abandoned generators + # may not close until event loop shutdown. + _r: Any = response + if hasattr(_r, "aclose"): + await _r.aclose() + elif hasattr(_r, "close"): + if inspect.iscoroutinefunction(_r.close): + await _r.close() else: - text = content_text(content) - if text: - emit(text) - yield text - result = self.provider.stream_merge_chunks(result, chunk) - - if inside_thinking: - emit("\n\n\n") - if content_mode == "all": - yield ContentThinkingDelta(thinking="", phase="end") - - turn = self.provider.stream_turn( - result, - has_data_model=data_model is not None, - ) - - if echo == "all": - emit_other_contents(turn, emit) + _r.close() else: response = await self.provider.chat_perform_async( @@ -2857,17 +2918,8 @@ def emit(text: str | Content): if echo == "all": emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost(turn.completion, turn.tokens) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) - self._turns.extend([user_turn, turn]) + turn = resolve_assistant_turn(self.provider, turn) + self._turns.extend([user_turn, turn]) def _collect_all_kwargs( self, @@ -3075,18 +3127,20 @@ def __str__(self): from ._repr import format_tokens turns = self.get_turns(include_system_prompt=True) - assistant_turns = [t for t in turns if isinstance(t, AssistantTurn)] + complete_assistant_turns = [ + t for t in turns if isinstance(t, AssistantTurn) and not t.is_partial + ] - # Sum tokens across assistant turns + # Sum tokens across complete assistant turns tokens: tuple[int, int, int] | None = None - if any(t.tokens for t in assistant_turns): + if any(t.tokens for t in complete_assistant_turns): tokens = ( - sum(t.tokens[0] for t in assistant_turns if t.tokens), - sum(t.tokens[1] for t in assistant_turns if t.tokens), - sum(t.tokens[2] for t in assistant_turns if t.tokens), + sum(t.tokens[0] for t in complete_assistant_turns if t.tokens), + sum(t.tokens[1] for t in complete_assistant_turns if t.tokens), + sum(t.tokens[2] for t in complete_assistant_turns if t.tokens), ) - costs = [t.cost for t in assistant_turns if t.cost is not None] + costs = [t.cost for t in complete_assistant_turns if t.cost is not None] total_cost = sum(costs) if costs else None res = f" TypeGuard[ContentThinkingDelta]: - return isinstance(content, ContentThinkingDelta) - - -def content_text(content: Content) -> str: - """Extract displayable text from a Content object.""" - if isinstance(content, ContentThinkingDelta): - return content.thinking - if isinstance(content, ContentThinking): - return content.thinking - if isinstance(content, ContentText): - return content.text - return str(content) +def _as_controller(controller: StreamController | None) -> StreamController: + """Ensure a non-None, ready-to-use StreamController.""" + if controller is None: + return StreamController() + controller._ensure_ready() + return controller + + +def resolve_assistant_turn(provider: Provider, turn: Turn) -> AssistantTurn: + """Validate turn type, compute tokens and cost, and log usage.""" + if not isinstance(turn, AssistantTurn): + raise TypeError(f"Expected turn to be AssistantTurn, got {type(turn).__name__}") + if turn.tokens is None and turn.completion: + turn.tokens = provider.value_tokens(turn.completion) + if turn.cost is None and turn.completion: + turn.cost = provider.value_cost(turn.completion, turn.tokens) + if turn.tokens is not None: + tokens_log(provider, turn.tokens) + return turn def is_quarto(): diff --git a/chatlas/_stream_controller.py b/chatlas/_stream_controller.py new file mode 100644 index 00000000..655c063a --- /dev/null +++ b/chatlas/_stream_controller.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +import warnings + + +class StreamController: + """ + Cooperative cancellation handle for streaming responses. + + Create a controller and pass it to + :meth:`~chatlas.Chat.stream` or :meth:`~chatlas.Chat.stream_async` + via the ``controller`` argument, then call :meth:`cancel` from + anywhere (e.g., a Shiny observer) to stop the stream after the + current chunk. + + The same controller can be reused across multiple streams. Call + :meth:`reset` to clear the cancelled state before starting a new + stream. + + Examples + -------- + ```python + from chatlas import ChatOpenAI, StreamController + + chat = ChatOpenAI() + ctrl = StreamController() + + i = 0 + for chunk in chat.stream("Write a story", controller=ctrl): + i += 1 + print(chunk, end="") + if i > 10: + ctrl.cancel() + + # Partial response is preserved in history + print(chat.get_turns()) + ``` + """ + + def __init__(self): + self._cancelled: bool = False + self._reason: str | None = None + + def cancel(self, reason: str = "cancelled") -> None: + """Cancel the stream. The reason is stored on the partial turn.""" + # Set reason before cancelled so that readers who see cancelled=True + # will always find a reason. This is safe under CPython's GIL; on + # free-threaded builds the worst case is a benign extra iteration + # before the streaming loop notices the cancellation. + self._reason = reason + self._cancelled = True + + def reset(self) -> None: + """Clear the cancelled state and reason.""" + self._cancelled = False + self._reason = None + + def _ensure_ready(self) -> None: + """Auto-reset if already cancelled (prevents stale controller bugs).""" + if self._cancelled: + warnings.warn( + "StreamController was already cancelled — resetting automatically. " + "Call controller.reset() explicitly to avoid this warning.", + stacklevel=3, + ) + self.reset() + + @property + def cancelled(self) -> bool: + """Whether the controller has been cancelled.""" + return self._cancelled + + @property + def reason(self) -> str | None: + """The cancellation reason, or None if not cancelled.""" + return self._reason diff --git a/chatlas/_turn.py b/chatlas/_turn.py index 2cdcbf60..58246c59 100644 --- a/chatlas/_turn.py +++ b/chatlas/_turn.py @@ -308,6 +308,9 @@ class AssistantTurn(Turn, Generic[CompletionT]): cost The cost of this turn in USD. This is computed when the turn is created based on the token usage and pricing information (including service tier). + partial_reason + If set, indicates this turn is incomplete (e.g., the stream was interrupted + or cancelled). The value describes the reason for the partial state. See Also -------- @@ -322,6 +325,12 @@ class AssistantTurn(Turn, Generic[CompletionT]): finish_reason: Optional[str] = None completion: Optional[CompletionT] = Field(default=None, exclude=True) cost: Optional[float] = None + partial_reason: Optional[str] = None + + @property + def is_partial(self) -> bool: + """Whether this turn is a partial (interrupted/cancelled) turn.""" + return self.partial_reason is not None @field_validator("tokens", mode="before") @classmethod @@ -339,6 +348,7 @@ def __init__( finish_reason: Optional[str] = None, completion: Optional[CompletionT] = None, cost: Optional[float] = None, + partial_reason: Optional[str] = None, **kwargs, ): if isinstance(tokens, list): @@ -353,6 +363,8 @@ def __init__( kwargs["completion"] = completion if cost is not None: kwargs["cost"] = cost + if partial_reason is not None: + kwargs["partial_reason"] = partial_reason super().__init__(contents, **kwargs) diff --git a/chatlas/_turn_accumulator.py b/chatlas/_turn_accumulator.py new file mode 100644 index 00000000..c081ba68 --- /dev/null +++ b/chatlas/_turn_accumulator.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +from typing import Callable, Literal, Sequence + +from ._content import ( + Content, + ContentText, + ContentThinking, + ContentThinkingDelta, + ContentUnion, +) +from ._stream_controller import StreamController +from ._turn import AssistantTurn, Turn, UserTurn + + +def merge_content_text(contents: list[ContentUnion]) -> list[ContentUnion]: + """Merge adjacent ContentText (and ContentThinking) fragments.""" + if not contents: + return [] + merged: list[ContentUnion] = [contents[0]] + for item in contents[1:]: + last = merged[-1] + if isinstance(last, ContentText) and isinstance(item, ContentText): + merged[-1] = ContentText.model_construct(text=last.text + item.text) + elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking): + merged[-1] = ContentThinking.model_construct( + thinking=last.thinking + item.thinking, + extra=item.extra or last.extra, + ) + else: + merged.append(item) + return merged + + +class TurnAccumulator: + """ + Manages the lifecycle of one streaming assistant turn. + + Mirrors ellmer's TurnAccumulator R6 class. The stages are: + + 1. ``begin_turn(user_turn)`` — insert user + partial assistant into turns + 2. ``process_content(content, ...)`` — append content, handle thinking + phase boundaries, emit display text, and return items to yield + 3. ``flush_thinking(...)`` — emit closing thinking tags after the loop + 4. ``complete_turn(turn)`` — replace partial with the full turn (skipped if cancelled) + 5. ``finalize_turn()`` — called from ``finally``; merges text fragments + and stamps the cancellation reason if the turn is still partial + """ + + def __init__( + self, + turns: list[Turn], + controller: StreamController, + ): + self._turns = turns + self._controller = controller + self._turn_idx: int | None = None + self._inside_thinking: bool = False + + def begin_turn(self, user_turn: UserTurn) -> None: + """Insert user turn and a partial assistant placeholder.""" + partial: AssistantTurn = AssistantTurn([], partial_reason="interrupted") + self._turns.extend([user_turn, partial]) + self._turn_idx = len(self._turns) - 1 + + def process_content( + self, + content: Content, + content_mode: Literal["text", "all"], + emit: Callable[[str | Content], None], + ) -> Sequence[str | Content]: + """Append content to the turn, emit display text, return items to yield.""" + self._update_turn(content) + + items: list[str | Content] = [] + + if isinstance(content, ContentThinkingDelta) and not self._inside_thinking: + content = ContentThinkingDelta( + thinking=content.thinking, phase="start" + ) + emit("\n") + self._inside_thinking = True + elif not isinstance(content, ContentThinkingDelta) and self._inside_thinking: + emit("\n\n\n") + if content_mode == "all": + items.append(ContentThinkingDelta(thinking="", phase="end")) + self._inside_thinking = False + + if isinstance(content, ContentThinkingDelta): + emit(content.thinking) + if content_mode == "all": + items.append(content) + else: + text = _content_text(content) + if text: + emit(text) + items.append(text) + + return items + + def flush_thinking( + self, + content_mode: Literal["text", "all"], + emit: Callable[[str | Content], None], + ) -> Sequence[str | Content]: + """Emit closing thinking tags if the stream ended mid-thinking.""" + if not self._inside_thinking: + return [] + self._inside_thinking = False + emit("\n\n\n") + if content_mode == "all": + return [ContentThinkingDelta(thinking="", phase="end")] + return [] + + def complete_turn(self, turn: AssistantTurn) -> None: + """Replace the partial turn with the completed turn (no-op if cancelled).""" + if self._turn_idx is None: + raise RuntimeError("complete_turn called before begin_turn") + if self._controller.cancelled: + return + self._turns[self._turn_idx] = turn + + def finalize_turn(self) -> None: + """ + Safety net — called from ``finally``. + + If the turn is still partial (i.e., ``complete_turn`` was never called + or was skipped because of cancellation), merge adjacent text fragments + and stamp the cancellation reason. + """ + if self._turn_idx is None: + return + turn = self._turns[self._turn_idx] + if not isinstance(turn, AssistantTurn) or not turn.is_partial: + return + if self._controller.cancelled: + turn.partial_reason = self._controller.reason + turn.contents = merge_content_text(turn.contents) + + def _update_turn(self, content: Content) -> None: + """Append streamed content to the partial turn.""" + if self._turn_idx is None: + raise RuntimeError("_update_turn called before begin_turn") + # Content is the base class; contents is typed as list[ContentUnion] + # (discriminated union). At runtime all Content subclasses are ContentUnion + # members, so the append is safe. + self._turns[self._turn_idx].contents.append(content) # type: ignore[arg-type] + + +def _content_text(content: Content) -> str: + """Extract displayable text from a Content object.""" + if isinstance(content, ContentThinkingDelta): + return content.thinking + if isinstance(content, ContentThinking): + return content.thinking + if isinstance(content, ContentText): + return content.text + return str(content) diff --git a/docs/get-started/stream.qmd b/docs/get-started/stream.qmd index f5da3b62..228e37cc 100644 --- a/docs/get-started/stream.qmd +++ b/docs/get-started/stream.qmd @@ -78,6 +78,28 @@ for chunk in stream: ``` +## Cancelling a stream + +Use a [](`~chatlas.StreamController`) to stop a stream early while preserving what's been generated so far. Pass a controller to `.stream()`, then call `controller.cancel()` when you want to stop. + +```python +from chatlas import StreamController + +ctrl = StreamController() + +chunks = [] +for chunk in chat.stream("Write a long story", controller=ctrl): + chunks.append(chunk) + if len(chunks) >= 5: + ctrl.cancel() + +# The partial response is preserved in history +last_turn = chat.get_last_turn() +print(last_turn.text) +``` + +This is useful for building UIs with a "stop generating" button (e.g., in a [Shiny chatbot](chatbots.qmd)). The same controller can be reused across multiple streams by calling `controller.reset()` before starting a new stream. + ## Wrapping generators Sometimes it's useful to wrap the `.stream()` generator up into another generator function. diff --git a/tests/__snapshots__/test_chat.ambr b/tests/__snapshots__/test_chat.ambr index 120ce5bd..05be2d2c 100644 --- a/tests/__snapshots__/test_chat.ambr +++ b/tests/__snapshots__/test_chat.ambr @@ -66,3 +66,25 @@ ''' # --- +# name: test_partial_turn_display + ''' + + + ## User + + hello + + ## Assistant [input=10 output=5 cost=$0.0010] + + response + + ## User + + more + + ## Assistant [interrupted] + + partial response + + ''' +# --- diff --git a/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml new file mode 100644 index 00000000..ba6c770d --- /dev/null +++ b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml @@ -0,0 +1,171 @@ +interactions: +- request: + body: '{"include": ["reasoning.encrypted_content"], "input": [{"role": "user", "content": [{"type": "input_text", "text": "\n What are the canonical colors of the ROYGBIV rainbow?\n Put each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-5.4", "store": false, "stream": true}' + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '251' + Content-Type: + - application/json + Host: + - api.openai.com + X-Stainless-Async: + - 'false' + x-stainless-read-timeout: + - '600' + method: POST + uri: https://api.openai.com/v1/responses + response: + body: + string: 'event: response.created + + data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.in_progress + + data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.output_item.added + + data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"in_progress","content":[],"role":"assistant"}} + + + event: response.content_part.added + + data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"VyLjtlp4pcHnl"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"VFBYwzpsUJWxH"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"GB7EheuPAI"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"n7WBEjbLB9yuQ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"tjVPBCfEwZ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"ThsRrqFkcHdBY"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"vpm43U0OMRn"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ce8I6O3Ympl5O"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"2qjVtLnv4y9z"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"RBKSpaeiKPIYL"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"50myY2IpRyTEZ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"zWfhh8RBqltML"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"s13DaAWMQbrhf"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"2QRAGHqzRIJ4Rhb"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"CWR5fXLEuEA"} + + + event: response.output_text.done + + data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]} + + + event: response.content_part.done + + data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}} + + + event: response.output_item.done + + data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}} + + + event: response.completed + + data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213200,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}} + + + ' + headers: + CF-RAY: + - 9b6c8d259a25eb65-ORD + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 31 Dec 2025 20:33:20 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-processing-ms: + - '59' + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '62' + status: + code: 200 + message: OK +version: 1 diff --git a/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml new file mode 100644 index 00000000..94c54738 --- /dev/null +++ b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml @@ -0,0 +1,171 @@ +interactions: +- request: + body: '{"include": ["reasoning.encrypted_content"], "input": [{"role": "user", "content": [{"type": "input_text", "text": "\n What are the canonical colors of the ROYGBIV rainbow?\n Put each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-5.4", "store": false, "stream": true}' + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '251' + Content-Type: + - application/json + Host: + - api.openai.com + X-Stainless-Async: + - async:asyncio + x-stainless-read-timeout: + - '600' + method: POST + uri: https://api.openai.com/v1/responses + response: + body: + string: 'event: response.created + + data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.in_progress + + data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.output_item.added + + data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"in_progress","content":[],"role":"assistant"}} + + + event: response.content_part.added + + data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"Fwrn6l3VdXbRX"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"1kJSXray3LSug"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"Eih8h9dEZy"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"tHQucrSzXW82d"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"d7XiKGH6ge"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Y8ax6kRokXoTk"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"lcahcSGcQIO"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ybguf4GM4Rxzd"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"jnjjMyDfW7YQ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"GCrzkYvw1d9OB"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"XCEuFUkogiu5Q"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"N6Rz9xT3o8JEi"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"2m15iglV7QFMR"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"WNlPYGJ0Q9ygCMr"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"ZcES9KNvLip"} + + + event: response.output_text.done + + data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]} + + + event: response.content_part.done + + data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}} + + + event: response.output_item.done + + data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}} + + + event: response.completed + + data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213201,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}} + + + ' + headers: + CF-RAY: + - 9b6c8d293c230cde-ORD + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 31 Dec 2025 20:33:20 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-processing-ms: + - '28' + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '31' + status: + code: 200 + message: OK +version: 1 diff --git a/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml new file mode 100644 index 00000000..ba6c770d --- /dev/null +++ b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml @@ -0,0 +1,171 @@ +interactions: +- request: + body: '{"include": ["reasoning.encrypted_content"], "input": [{"role": "user", "content": [{"type": "input_text", "text": "\n What are the canonical colors of the ROYGBIV rainbow?\n Put each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-5.4", "store": false, "stream": true}' + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '251' + Content-Type: + - application/json + Host: + - api.openai.com + X-Stainless-Async: + - 'false' + x-stainless-read-timeout: + - '600' + method: POST + uri: https://api.openai.com/v1/responses + response: + body: + string: 'event: response.created + + data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.in_progress + + data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.output_item.added + + data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"in_progress","content":[],"role":"assistant"}} + + + event: response.content_part.added + + data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"VyLjtlp4pcHnl"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"VFBYwzpsUJWxH"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"GB7EheuPAI"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"n7WBEjbLB9yuQ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"tjVPBCfEwZ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"ThsRrqFkcHdBY"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"vpm43U0OMRn"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ce8I6O3Ympl5O"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"2qjVtLnv4y9z"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"RBKSpaeiKPIYL"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"50myY2IpRyTEZ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"zWfhh8RBqltML"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"s13DaAWMQbrhf"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"2QRAGHqzRIJ4Rhb"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"CWR5fXLEuEA"} + + + event: response.output_text.done + + data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]} + + + event: response.content_part.done + + data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}} + + + event: response.output_item.done + + data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}} + + + event: response.completed + + data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213200,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}} + + + ' + headers: + CF-RAY: + - 9b6c8d259a25eb65-ORD + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 31 Dec 2025 20:33:20 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-processing-ms: + - '59' + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '62' + status: + code: 200 + message: OK +version: 1 diff --git a/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml new file mode 100644 index 00000000..94c54738 --- /dev/null +++ b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml @@ -0,0 +1,171 @@ +interactions: +- request: + body: '{"include": ["reasoning.encrypted_content"], "input": [{"role": "user", "content": [{"type": "input_text", "text": "\n What are the canonical colors of the ROYGBIV rainbow?\n Put each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-5.4", "store": false, "stream": true}' + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '251' + Content-Type: + - application/json + Host: + - api.openai.com + X-Stainless-Async: + - async:asyncio + x-stainless-read-timeout: + - '600' + method: POST + uri: https://api.openai.com/v1/responses + response: + body: + string: 'event: response.created + + data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.in_progress + + data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.output_item.added + + data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"in_progress","content":[],"role":"assistant"}} + + + event: response.content_part.added + + data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"Fwrn6l3VdXbRX"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"1kJSXray3LSug"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"Eih8h9dEZy"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"tHQucrSzXW82d"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"d7XiKGH6ge"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Y8ax6kRokXoTk"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"lcahcSGcQIO"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ybguf4GM4Rxzd"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"jnjjMyDfW7YQ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"GCrzkYvw1d9OB"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"XCEuFUkogiu5Q"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"N6Rz9xT3o8JEi"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"2m15iglV7QFMR"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"WNlPYGJ0Q9ygCMr"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"ZcES9KNvLip"} + + + event: response.output_text.done + + data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]} + + + event: response.content_part.done + + data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}} + + + event: response.output_item.done + + data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}} + + + event: response.completed + + data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213201,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-5.4-2025-04-14","output":[{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}} + + + ' + headers: + CF-RAY: + - 9b6c8d293c230cde-ORD + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 31 Dec 2025 20:33:20 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-processing-ms: + - '28' + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '31' + status: + code: 200 + message: OK +version: 1 diff --git a/tests/test_chat.py b/tests/test_chat.py index 79b90639..d6040dfc 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -606,3 +606,151 @@ def test_json_serialize_with_tool_failure(): assert restored_result.error is not None # After serialization, the Exception becomes a string representation assert "Something went wrong" in str(restored_result.error) + + +@pytest.mark.vcr +def test_partial_turn_preserved_on_close(): + """Closing a streaming generator mid-response preserves the partial turn.""" + chat = ChatOpenAI() + gen = chat.stream( + """ + What are the canonical colors of the ROYGBIV rainbow? + Put each colour on its own line. Don't use punctuation. + """ + ) + # Consume a few chunks then close + chunks = [] + for chunk in gen: + chunks.append(chunk) + if len(chunks) >= 3: + break + gen.close() + + turns = chat.get_turns() + assert len(turns) == 2 # user + partial assistant + assistant_turn = turns[1] + assert isinstance(assistant_turn, AssistantTurn) + assert assistant_turn.is_partial + assert assistant_turn.partial_reason == "interrupted" + assert len(assistant_turn.text) > 0 + + +@pytest.mark.vcr +@pytest.mark.asyncio +async def test_partial_turn_preserved_on_close_async(): + """Closing an async streaming generator mid-response preserves the partial turn.""" + chat = ChatOpenAI() + gen = await chat.stream_async( + """ + What are the canonical colors of the ROYGBIV rainbow? + Put each colour on its own line. Don't use punctuation. + """ + ) + # Consume a few chunks then close + chunks = [] + async for chunk in gen: + chunks.append(chunk) + if len(chunks) >= 3: + break + await gen.aclose() + + turns = chat.get_turns() + assert len(turns) == 2 # user + partial assistant + assistant_turn = turns[1] + assert isinstance(assistant_turn, AssistantTurn) + assert assistant_turn.is_partial + assert assistant_turn.partial_reason == "interrupted" + assert len(assistant_turn.text) > 0 + + +def test_partial_turn_display(snapshot): + chat = ChatOpenAI() + chat.set_turns( + [ + UserTurn("hello"), + AssistantTurn("response", tokens=(10, 5, 0), cost=0.001), + UserTurn("more"), + AssistantTurn("partial response", partial_reason="interrupted"), + ] + ) + assert snapshot == repr(chat) + + +def test_partial_turns_excluded_from_cost(): + chat = ChatOpenAI() + chat.set_turns( + [ + UserTurn("hello"), + AssistantTurn("response", tokens=(10, 5, 0), cost=0.001), + UserTurn("more"), + AssistantTurn("partial", partial_reason="interrupted"), + ] + ) + # Cost should only include the complete turn + cost = chat.get_cost() + assert cost == 0.001 + + # get_tokens should skip partial + tokens = chat.get_tokens() + assert len(tokens) == 2 # user + assistant from the complete turn only + + +def test_partial_turns_excluded_from_tokens_mid_conversation(): + """Partial turns in the middle of history (not just trailing) are excluded.""" + chat = ChatOpenAI() + chat.set_turns( + [ + UserTurn("hello"), + AssistantTurn("partial", partial_reason="interrupted"), + UserTurn("retry"), + AssistantTurn("response", tokens=(10, 5, 0), cost=0.001), + ] + ) + # Cost should only include the complete turn + cost = chat.get_cost() + assert cost == 0.001 + + # get_tokens should skip the partial pair entirely + tokens = chat.get_tokens() + assert len(tokens) == 2 # user + assistant from the complete turn only + + +def test_merge_content_text(): + from chatlas._turn_accumulator import merge_content_text + from chatlas._content import ContentText, ContentThinking + + # Adjacent ContentText fragments merge + contents = [ + ContentText.model_construct(text="a"), + ContentText.model_construct(text="b"), + ContentText.model_construct(text="c"), + ] + merged = merge_content_text(contents) + assert len(merged) == 1 + assert isinstance(merged[0], ContentText) + assert merged[0].text == "abc" + + # Non-text breaks the merge + contents = [ + ContentText.model_construct(text="a"), + ContentThinking(thinking="thought"), + ContentText.model_construct(text="b"), + ] + merged = merge_content_text(contents) + assert len(merged) == 3 + assert merged[0].text == "a" + assert isinstance(merged[1], ContentThinking) + assert merged[2].text == "b" + + # Adjacent ContentThinking fragments merge + contents = [ + ContentThinking(thinking="a"), + ContentThinking(thinking="b"), + ] + merged = merge_content_text(contents) + assert len(merged) == 1 + assert isinstance(merged[0], ContentThinking) + assert merged[0].thinking == "ab" + + # Empty list + assert merge_content_text([]) == [] diff --git a/tests/test_stream_controller.py b/tests/test_stream_controller.py new file mode 100644 index 00000000..7aad2fd0 --- /dev/null +++ b/tests/test_stream_controller.py @@ -0,0 +1,103 @@ +import warnings + +import pytest + +from chatlas import ChatOpenAI, StreamController + + +def test_stream_controller_initial_state(): + ctrl = StreamController() + assert ctrl.cancelled is False + assert ctrl.reason is None + + +def test_stream_controller_cancel(): + ctrl = StreamController() + ctrl.cancel() + assert ctrl.cancelled is True + assert ctrl.reason == "cancelled" + + +def test_stream_controller_cancel_with_reason(): + ctrl = StreamController() + ctrl.cancel(reason="timeout") + assert ctrl.cancelled is True + assert ctrl.reason == "timeout" + + +def test_stream_controller_reset(): + ctrl = StreamController() + ctrl.cancel(reason="timeout") + ctrl.reset() + assert ctrl.cancelled is False + assert ctrl.reason is None + + +def test_stream_controller_ensure_ready_warns_and_resets(): + ctrl = StreamController() + ctrl.cancel(reason="stale") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + ctrl._ensure_ready() + assert len(w) == 1 + assert "already cancelled" in str(w[0].message) + assert ctrl.cancelled is False + assert ctrl.reason is None + + +def test_stream_controller_ensure_ready_noop_when_not_cancelled(): + ctrl = StreamController() + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + ctrl._ensure_ready() + assert len(w) == 0 + assert ctrl.cancelled is False + + +@pytest.mark.vcr +def test_stream_cancel_after_chunks(): + chat = ChatOpenAI() + ctrl = StreamController() + + chunks = [] + for chunk in chat.stream( + """ + What are the canonical colors of the ROYGBIV rainbow? + Put each colour on its own line. Don't use punctuation. + """, + controller=ctrl, + ): + chunks.append(chunk) + if len(chunks) >= 3: + ctrl.cancel() + + turns = chat.get_turns() + assert len(turns) == 2 + assert turns[1].is_partial + assert turns[1].partial_reason == "cancelled" + assert len(turns[1].text) > 0 + + +@pytest.mark.vcr +@pytest.mark.asyncio +async def test_stream_cancel_after_chunks_async(): + chat = ChatOpenAI() + ctrl = StreamController() + + chunks = [] + async for chunk in await chat.stream_async( + """ + What are the canonical colors of the ROYGBIV rainbow? + Put each colour on its own line. Don't use punctuation. + """, + controller=ctrl, + ): + chunks.append(chunk) + if len(chunks) >= 3: + ctrl.cancel() + + turns = chat.get_turns() + assert len(turns) == 2 + assert turns[1].is_partial + assert turns[1].partial_reason == "cancelled" + assert len(turns[1].text) > 0 diff --git a/tests/test_turn_accumulator.py b/tests/test_turn_accumulator.py new file mode 100644 index 00000000..3a28f6d9 --- /dev/null +++ b/tests/test_turn_accumulator.py @@ -0,0 +1,86 @@ +from chatlas._content import ContentText +from chatlas._turn import AssistantTurn, UserTurn +from chatlas._turn_accumulator import TurnAccumulator +from chatlas._stream_controller import StreamController + + +def test_begin_turn_inserts_partial(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + user = UserTurn("hello") + acc.begin_turn(user) + assert len(turns) == 2 + assert turns[0] is user + assert isinstance(turns[1], AssistantTurn) + assert turns[1].is_partial + + +def test_update_turn_appends_content(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + content = ContentText.model_construct(text="hi") + acc._update_turn(content) + assert len(turns[1].contents) == 1 + assert turns[1].contents[0].text == "hi" + + +def test_complete_turn_replaces_partial(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + full_turn = AssistantTurn("response", tokens=(10, 5, 0)) + acc.complete_turn(full_turn) + assert turns[1] is full_turn + assert not turns[1].is_partial + + +def test_complete_turn_skipped_when_cancelled(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + controller.cancel() + full_turn = AssistantTurn("response", tokens=(10, 5, 0)) + acc.complete_turn(full_turn) + assert turns[1].is_partial + + +def test_finalize_turn_merges_text_and_sets_reason(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + acc._update_turn(ContentText.model_construct(text="a")) + acc._update_turn(ContentText.model_construct(text="b")) + acc.finalize_turn() + assert turns[1].is_partial + assert turns[1].partial_reason == "interrupted" + assert len(turns[1].contents) == 1 + assert turns[1].contents[0].text == "ab" + + +def test_finalize_turn_uses_controller_reason(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + acc._update_turn(ContentText.model_construct(text="partial")) + controller.cancel(reason="user stopped") + acc.finalize_turn() + assert turns[1].partial_reason == "user stopped" + + +def test_finalize_turn_noops_after_complete(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + full_turn = AssistantTurn("response", tokens=(10, 5, 0)) + acc.complete_turn(full_turn) + acc.finalize_turn() + assert turns[1] is full_turn + assert not turns[1].is_partial diff --git a/tests/test_turns.py b/tests/test_turns.py index 5ac961b0..29f92761 100644 --- a/tests/test_turns.py +++ b/tests/test_turns.py @@ -321,6 +321,25 @@ def test_get_turns_tool_result_role_with_system_prompt(): assert turns_without_system[1].role == "assistant" +def test_assistant_turn_partial_reason(): + from chatlas._turn import AssistantTurn + + # Default: not partial + turn = AssistantTurn("hello") + assert turn.partial_reason is None + assert turn.is_partial is False + + # Partial turn + partial = AssistantTurn("partial", partial_reason="interrupted") + assert partial.partial_reason == "interrupted" + assert partial.is_partial is True + + # Custom reason + cancelled = AssistantTurn("cancelled", partial_reason="cancelled") + assert cancelled.partial_reason == "cancelled" + assert cancelled.is_partial is True + + def test_get_turns_tool_result_role_empty_chat(): """Test tool_result_role with empty chat""" chat = ChatAnthropic()