From 8e9e99eebab2fe0bf61b7adffaae6f1670cbebff Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:01:59 -0500 Subject: [PATCH 01/22] feat: add partial_reason to AssistantTurn and merge_content_text helper Add partial_reason field and is_partial property to AssistantTurn for marking incomplete turns on stream interruption. Add merge_content_text() helper to combine adjacent ContentText/ContentThinking fragments. Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 16 ++++++++++++++++ chatlas/_turn.py | 12 ++++++++++++ tests/test_chat.py | 41 +++++++++++++++++++++++++++++++++++++++++ tests/test_turns.py | 19 +++++++++++++++++++ 4 files changed, 88 insertions(+) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index cccc4941..54bef368 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -3261,6 +3261,22 @@ def content_text(content: Content) -> str: return str(content) +def merge_content_text(contents: Sequence[Content]) -> list[Content]: + """Merge adjacent ContentText (and ContentThinking) fragments.""" + if not contents: + return [] + merged: list[Content] = [contents[0]] + for item in contents[1:]: + last = merged[-1] + if isinstance(last, ContentText) and isinstance(item, ContentText): + merged[-1] = ContentText.model_construct(text=last.text + item.text) + elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking): + merged[-1] = ContentThinking(thinking=last.thinking + item.thinking) + else: + merged.append(item) + return merged + + def is_quarto(): return os.getenv("QUARTO_PYTHON", None) is not None diff --git a/chatlas/_turn.py b/chatlas/_turn.py index 2cdcbf60..58246c59 100644 --- a/chatlas/_turn.py +++ b/chatlas/_turn.py @@ -308,6 +308,9 @@ class AssistantTurn(Turn, Generic[CompletionT]): cost The cost of this turn in USD. This is computed when the turn is created based on the token usage and pricing information (including service tier). + partial_reason + If set, indicates this turn is incomplete (e.g., the stream was interrupted + or cancelled). The value describes the reason for the partial state. See Also -------- @@ -322,6 +325,12 @@ class AssistantTurn(Turn, Generic[CompletionT]): finish_reason: Optional[str] = None completion: Optional[CompletionT] = Field(default=None, exclude=True) cost: Optional[float] = None + partial_reason: Optional[str] = None + + @property + def is_partial(self) -> bool: + """Whether this turn is a partial (interrupted/cancelled) turn.""" + return self.partial_reason is not None @field_validator("tokens", mode="before") @classmethod @@ -339,6 +348,7 @@ def __init__( finish_reason: Optional[str] = None, completion: Optional[CompletionT] = None, cost: Optional[float] = None, + partial_reason: Optional[str] = None, **kwargs, ): if isinstance(tokens, list): @@ -353,6 +363,8 @@ def __init__( kwargs["completion"] = completion if cost is not None: kwargs["cost"] = cost + if partial_reason is not None: + kwargs["partial_reason"] = partial_reason super().__init__(contents, **kwargs) diff --git a/tests/test_chat.py b/tests/test_chat.py index 1ef25cba..787d12b2 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -606,3 +606,44 @@ def test_json_serialize_with_tool_failure(): assert restored_result.error is not None # After serialization, the Exception becomes a string representation assert "Something went wrong" in str(restored_result.error) + + +def test_merge_content_text(): + from chatlas._chat import merge_content_text + from chatlas._content import ContentText, ContentThinking + + # Adjacent ContentText fragments merge + contents = [ + ContentText.model_construct(text="a"), + ContentText.model_construct(text="b"), + ContentText.model_construct(text="c"), + ] + merged = merge_content_text(contents) + assert len(merged) == 1 + assert isinstance(merged[0], ContentText) + assert merged[0].text == "abc" + + # Non-text breaks the merge + contents = [ + ContentText.model_construct(text="a"), + ContentThinking(thinking="thought"), + ContentText.model_construct(text="b"), + ] + merged = merge_content_text(contents) + assert len(merged) == 3 + assert merged[0].text == "a" + assert isinstance(merged[1], ContentThinking) + assert merged[2].text == "b" + + # Adjacent ContentThinking fragments merge + contents = [ + ContentThinking(thinking="a"), + ContentThinking(thinking="b"), + ] + merged = merge_content_text(contents) + assert len(merged) == 1 + assert isinstance(merged[0], ContentThinking) + assert merged[0].thinking == "ab" + + # Empty list + assert merge_content_text([]) == [] diff --git a/tests/test_turns.py b/tests/test_turns.py index 5ac961b0..29f92761 100644 --- a/tests/test_turns.py +++ b/tests/test_turns.py @@ -321,6 +321,25 @@ def test_get_turns_tool_result_role_with_system_prompt(): assert turns_without_system[1].role == "assistant" +def test_assistant_turn_partial_reason(): + from chatlas._turn import AssistantTurn + + # Default: not partial + turn = AssistantTurn("hello") + assert turn.partial_reason is None + assert turn.is_partial is False + + # Partial turn + partial = AssistantTurn("partial", partial_reason="interrupted") + assert partial.partial_reason == "interrupted" + assert partial.is_partial is True + + # Custom reason + cancelled = AssistantTurn("cancelled", partial_reason="cancelled") + assert cancelled.partial_reason == "cancelled" + assert cancelled.is_partial is True + + def test_get_turns_tool_result_role_empty_chat(): """Test tool_result_role with empty chat""" chat = ChatAnthropic() From b042ad74cf496d850ac0c40cf5ba3359bfe0678c Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:09:08 -0500 Subject: [PATCH 02/22] feat: preserve partial turns on stream interruption via try/finally Restructure _submit_turns and _submit_turns_async to eagerly append a partial AssistantTurn to self._turns before streaming begins. On each chunk, content is appended to the partial turn in-place. On normal completion, the partial turn is replaced with the full turn. On interruption (GeneratorExit, KeyboardInterrupt, CancelledError), the finally block merges adjacent content fragments via merge_content_text(). Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 178 +++++++++++------- .../test_partial_turn_preserved_on_close.yaml | 174 +++++++++++++++++ ...partial_turn_preserved_on_close_async.yaml | 174 +++++++++++++++++ tests/test_chat.py | 55 ++++++ 4 files changed, 517 insertions(+), 64 deletions(-) create mode 100644 tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml create mode 100644 tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 54bef368..a4a02ab7 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -2658,28 +2658,53 @@ def emit(text: str | Content): kwargs=all_kwargs, ) - result = None - for chunk in response: - content = self.provider.stream_content(chunk) - if content is not None: - text = content_text(content) - if text: - emit(text) - if content_mode == "all" and isinstance( - content, ContentThinking - ): - yield content - else: - yield text - result = self.provider.stream_merge_chunks(result, chunk) - - turn = self.provider.stream_turn( - result, - has_data_model=data_model is not None, + partial_turn: AssistantTurn = AssistantTurn( + [], partial_reason="interrupted" ) - - if echo == "all": - emit_other_contents(turn, emit) + self._turns.extend([user_turn, partial_turn]) + turn_idx = len(self._turns) - 1 + + try: + result = None + for chunk in response: + content = self.provider.stream_content(chunk) + if content is not None: + text = content_text(content) + if text: + emit(text) + self._turns[turn_idx].contents.append(content) + if content_mode == "all" and isinstance( + content, ContentThinking + ): + yield content + else: + yield text + result = self.provider.stream_merge_chunks(result, chunk) + else: + # Normal completion — replace partial with full turn + turn = self.provider.stream_turn( + result, + has_data_model=data_model is not None, + ) + if echo == "all": + emit_other_contents(turn, emit) + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" + ) + if turn.tokens is None and turn.completion: + turn.tokens = self.provider.value_tokens(turn.completion) + if turn.cost is None and turn.completion: + turn.cost = self.provider.value_cost( + turn.completion, turn.tokens + ) + if turn.tokens is not None: + tokens_log(self.provider, turn.tokens) + self._turns[turn_idx] = turn + finally: + turn = self._turns[turn_idx] + if turn.is_partial: + turn.contents = merge_content_text(turn.contents) else: response = self.provider.chat_perform( @@ -2700,17 +2725,17 @@ def emit(text: str | Content): if echo == "all": emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost(turn.completion, turn.tokens) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) - self._turns.extend([user_turn, turn]) + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" + ) + if turn.tokens is None and turn.completion: + turn.tokens = self.provider.value_tokens(turn.completion) + if turn.cost is None and turn.completion: + turn.cost = self.provider.value_cost(turn.completion, turn.tokens) + if turn.tokens is not None: + tokens_log(self.provider, turn.tokens) + self._turns.extend([user_turn, turn]) @overload def _submit_turns_async( @@ -2764,28 +2789,53 @@ def emit(text: str | Content): kwargs=all_kwargs, ) - result = None - async for chunk in response: - content = self.provider.stream_content(chunk) - if content is not None: - text = content_text(content) - if text: - emit(text) - if content_mode == "all" and isinstance( - content, ContentThinking - ): - yield content - else: - yield text - result = self.provider.stream_merge_chunks(result, chunk) - - turn = self.provider.stream_turn( - result, - has_data_model=data_model is not None, + partial_turn: AssistantTurn = AssistantTurn( + [], partial_reason="interrupted" ) - - if echo == "all": - emit_other_contents(turn, emit) + self._turns.extend([user_turn, partial_turn]) + turn_idx = len(self._turns) - 1 + + try: + result = None + async for chunk in response: + content = self.provider.stream_content(chunk) + if content is not None: + text = content_text(content) + if text: + emit(text) + self._turns[turn_idx].contents.append(content) + if content_mode == "all" and isinstance( + content, ContentThinking + ): + yield content + else: + yield text + result = self.provider.stream_merge_chunks(result, chunk) + else: + # Normal completion — replace partial with full turn + turn = self.provider.stream_turn( + result, + has_data_model=data_model is not None, + ) + if echo == "all": + emit_other_contents(turn, emit) + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" + ) + if turn.tokens is None and turn.completion: + turn.tokens = self.provider.value_tokens(turn.completion) + if turn.cost is None and turn.completion: + turn.cost = self.provider.value_cost( + turn.completion, turn.tokens + ) + if turn.tokens is not None: + tokens_log(self.provider, turn.tokens) + self._turns[turn_idx] = turn + finally: + turn = self._turns[turn_idx] + if turn.is_partial: + turn.contents = merge_content_text(turn.contents) else: response = await self.provider.chat_perform_async( @@ -2806,17 +2856,17 @@ def emit(text: str | Content): if echo == "all": emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost(turn.completion, turn.tokens) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) - self._turns.extend([user_turn, turn]) + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" + ) + if turn.tokens is None and turn.completion: + turn.tokens = self.provider.value_tokens(turn.completion) + if turn.cost is None and turn.completion: + turn.cost = self.provider.value_cost(turn.completion, turn.tokens) + if turn.tokens is not None: + tokens_log(self.provider, turn.tokens) + self._turns.extend([user_turn, turn]) def _collect_all_kwargs( self, diff --git a/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml new file mode 100644 index 00000000..333030e9 --- /dev/null +++ b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close.yaml @@ -0,0 +1,174 @@ +interactions: +- request: + body: '{"input": [{"role": "user", "content": [{"type": "input_text", "text": + "\n What are the canonical colors of the ROYGBIV rainbow?\n Put + each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-4.1", + "store": false, "stream": true}' + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '251' + Content-Type: + - application/json + Host: + - api.openai.com + X-Stainless-Async: + - 'false' + x-stainless-read-timeout: + - '600' + method: POST + uri: https://api.openai.com/v1/responses + response: + body: + string: 'event: response.created + + data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.in_progress + + data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.output_item.added + + data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"in_progress","content":[],"role":"assistant"}} + + + event: response.content_part.added + + data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"VyLjtlp4pcHnl"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"VFBYwzpsUJWxH"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"GB7EheuPAI"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"n7WBEjbLB9yuQ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"tjVPBCfEwZ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"ThsRrqFkcHdBY"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"vpm43U0OMRn"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ce8I6O3Ympl5O"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"2qjVtLnv4y9z"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"RBKSpaeiKPIYL"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"50myY2IpRyTEZ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"zWfhh8RBqltML"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"s13DaAWMQbrhf"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"2QRAGHqzRIJ4Rhb"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"CWR5fXLEuEA"} + + + event: response.output_text.done + + data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]} + + + event: response.content_part.done + + data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}} + + + event: response.output_item.done + + data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}} + + + event: response.completed + + data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213200,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}} + + + ' + headers: + CF-RAY: + - 9b6c8d259a25eb65-ORD + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 31 Dec 2025 20:33:20 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-processing-ms: + - '59' + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '62' + status: + code: 200 + message: OK +version: 1 diff --git a/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml new file mode 100644 index 00000000..c3772615 --- /dev/null +++ b/tests/_vcr/test_chat/test_partial_turn_preserved_on_close_async.yaml @@ -0,0 +1,174 @@ +interactions: +- request: + body: '{"input": [{"role": "user", "content": [{"type": "input_text", "text": + "\n What are the canonical colors of the ROYGBIV rainbow?\n Put + each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-4.1", + "store": false, "stream": true}' + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '251' + Content-Type: + - application/json + Host: + - api.openai.com + X-Stainless-Async: + - async:asyncio + x-stainless-read-timeout: + - '600' + method: POST + uri: https://api.openai.com/v1/responses + response: + body: + string: 'event: response.created + + data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.in_progress + + data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.output_item.added + + data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"in_progress","content":[],"role":"assistant"}} + + + event: response.content_part.added + + data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"Fwrn6l3VdXbRX"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"1kJSXray3LSug"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"Eih8h9dEZy"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"tHQucrSzXW82d"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"d7XiKGH6ge"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Y8ax6kRokXoTk"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"lcahcSGcQIO"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ybguf4GM4Rxzd"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"jnjjMyDfW7YQ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"GCrzkYvw1d9OB"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"XCEuFUkogiu5Q"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"N6Rz9xT3o8JEi"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"2m15iglV7QFMR"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"WNlPYGJ0Q9ygCMr"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"ZcES9KNvLip"} + + + event: response.output_text.done + + data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]} + + + event: response.content_part.done + + data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}} + + + event: response.output_item.done + + data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}} + + + event: response.completed + + data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213201,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}} + + + ' + headers: + CF-RAY: + - 9b6c8d293c230cde-ORD + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 31 Dec 2025 20:33:20 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-processing-ms: + - '28' + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '31' + status: + code: 200 + message: OK +version: 1 diff --git a/tests/test_chat.py b/tests/test_chat.py index 787d12b2..79dcd607 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -608,6 +608,61 @@ def test_json_serialize_with_tool_failure(): assert "Something went wrong" in str(restored_result.error) +@pytest.mark.vcr +def test_partial_turn_preserved_on_close(): + """Closing a streaming generator mid-response preserves the partial turn.""" + chat = ChatOpenAI() + gen = chat.stream( + """ + What are the canonical colors of the ROYGBIV rainbow? + Put each colour on its own line. Don't use punctuation. + """ + ) + # Consume a few chunks then close + chunks = [] + for chunk in gen: + chunks.append(chunk) + if len(chunks) >= 3: + break + gen.close() + + turns = chat.get_turns() + assert len(turns) == 2 # user + partial assistant + assistant_turn = turns[1] + assert isinstance(assistant_turn, AssistantTurn) + assert assistant_turn.is_partial + assert assistant_turn.partial_reason == "interrupted" + assert len(assistant_turn.text) > 0 + + +@pytest.mark.vcr +@pytest.mark.asyncio +async def test_partial_turn_preserved_on_close_async(): + """Closing an async streaming generator mid-response preserves the partial turn.""" + chat = ChatOpenAI() + gen = await chat.stream_async( + """ + What are the canonical colors of the ROYGBIV rainbow? + Put each colour on its own line. Don't use punctuation. + """ + ) + # Consume a few chunks then close + chunks = [] + async for chunk in gen: + chunks.append(chunk) + if len(chunks) >= 3: + break + await gen.aclose() + + turns = chat.get_turns() + assert len(turns) == 2 # user + partial assistant + assistant_turn = turns[1] + assert isinstance(assistant_turn, AssistantTurn) + assert assistant_turn.is_partial + assert assistant_turn.partial_reason == "interrupted" + assert len(assistant_turn.text) > 0 + + def test_merge_content_text(): from chatlas._chat import merge_content_text from chatlas._content import ContentText, ContentThinking From d9ceae94396635db188679f39b9f9d52b5a5c5b2 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:10:35 -0500 Subject: [PATCH 03/22] feat: exclude partial turns from token accounting and cost Partial turns (from interrupted streams) have no token or cost data. Filter them out in get_cost() and get_tokens() to avoid errors. Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 14 +++++++++++++- tests/test_chat.py | 19 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index a4a02ab7..efb18665 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -399,6 +399,14 @@ def get_tokens(self) -> list[TokensDict]: turns = self.get_turns(include_system_prompt=False) + # Exclude trailing partial turn (and its preceding user turn) + while ( + len(turns) >= 2 + and isinstance(turns[-1], AssistantTurn) + and turns[-1].is_partial + ): + turns = turns[:-2] + if len(turns) == 0: return [] @@ -524,7 +532,11 @@ def get_cost( The cost of the chat, in USD. """ - assistant_turns = [t for t in self._turns if isinstance(t, AssistantTurn)] + assistant_turns = [ + t + for t in self._turns + if isinstance(t, AssistantTurn) and not t.is_partial + ] if len(assistant_turns) == 0: return 0.0 diff --git a/tests/test_chat.py b/tests/test_chat.py index 79dcd607..6f163bbb 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -663,6 +663,25 @@ async def test_partial_turn_preserved_on_close_async(): assert len(assistant_turn.text) > 0 +def test_partial_turns_excluded_from_cost(): + chat = ChatOpenAI() + chat.set_turns( + [ + UserTurn("hello"), + AssistantTurn("response", tokens=(10, 5, 0), cost=0.001), + UserTurn("more"), + AssistantTurn("partial", partial_reason="interrupted"), + ] + ) + # Cost should only include the complete turn + cost = chat.get_cost() + assert cost == 0.001 + + # get_tokens should skip partial + tokens = chat.get_tokens() + assert len(tokens) == 2 # user + assistant from the complete turn only + + def test_merge_content_text(): from chatlas._chat import merge_content_text from chatlas._content import ContentText, ContentThinking From 110c0823d0b4efdf52b9c63fb07a30fc58fb2837 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:11:28 -0500 Subject: [PATCH 04/22] feat: display partial turns with [interrupted] in Chat repr Partial assistant turns now show their partial_reason (e.g. [interrupted]) instead of token counts. Token/cost totals in the Chat header exclude partial turns. Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 22 ++++++++++++++-------- tests/__snapshots__/test_chat.ambr | 22 ++++++++++++++++++++++ tests/test_chat.py | 13 +++++++++++++ 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index efb18665..7a1b3646 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -3086,18 +3086,22 @@ def __str__(self): from ._repr import format_tokens turns = self.get_turns(include_system_prompt=True) - assistant_turns = [t for t in turns if isinstance(t, AssistantTurn)] + complete_assistant_turns = [ + t + for t in turns + if isinstance(t, AssistantTurn) and not t.is_partial + ] - # Sum tokens across assistant turns + # Sum tokens across complete assistant turns tokens: tuple[int, int, int] | None = None - if any(t.tokens for t in assistant_turns): + if any(t.tokens for t in complete_assistant_turns): tokens = ( - sum(t.tokens[0] for t in assistant_turns if t.tokens), - sum(t.tokens[1] for t in assistant_turns if t.tokens), - sum(t.tokens[2] for t in assistant_turns if t.tokens), + sum(t.tokens[0] for t in complete_assistant_turns if t.tokens), + sum(t.tokens[1] for t in complete_assistant_turns if t.tokens), + sum(t.tokens[2] for t in complete_assistant_turns if t.tokens), ) - costs = [t.cost for t in assistant_turns if t.cost is not None] + costs = [t.cost for t in complete_assistant_turns if t.cost is not None] total_cost = sum(costs) if costs else None res = f" + + ## User + + hello + + ## Assistant [input=10 output=5 cost=$0.0010] + + response + + ## User + + more + + ## Assistant [interrupted] + + partial response + + ''' +# --- diff --git a/tests/test_chat.py b/tests/test_chat.py index 6f163bbb..cf5b2e84 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -663,6 +663,19 @@ async def test_partial_turn_preserved_on_close_async(): assert len(assistant_turn.text) > 0 +def test_partial_turn_display(snapshot): + chat = ChatOpenAI() + chat.set_turns( + [ + UserTurn("hello"), + AssistantTurn("response", tokens=(10, 5, 0), cost=0.001), + UserTurn("more"), + AssistantTurn("partial response", partial_reason="interrupted"), + ] + ) + assert snapshot == repr(chat) + + def test_partial_turns_excluded_from_cost(): chat = ChatOpenAI() chat.set_turns( From b93e7c00b44f392d997963a707a2f1093dffbfcb Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:18:05 -0500 Subject: [PATCH 05/22] fix: resolve pyright errors in try/finally blocks Cast Content to ContentUnion for list append compatibility and merge_content_text results. Use isinstance check in finally block instead of accessing is_partial on Turn base type. Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 113 +++++++++++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 48 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 7a1b3646..4d7ca740 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -37,6 +37,7 @@ ContentThinking, ContentToolRequest, ContentToolResult, + ContentUnion, ToolInfo, ) from ._display import ( @@ -2684,7 +2685,9 @@ def emit(text: str | Content): text = content_text(content) if text: emit(text) - self._turns[turn_idx].contents.append(content) + self._turns[turn_idx].contents.append( + cast(ContentUnion, content) + ) if content_mode == "all" and isinstance( content, ContentThinking ): @@ -2692,31 +2695,37 @@ def emit(text: str | Content): else: yield text result = self.provider.stream_merge_chunks(result, chunk) - else: - # Normal completion — replace partial with full turn - turn = self.provider.stream_turn( - result, - has_data_model=data_model is not None, + + # Normal completion — replace partial with full turn + turn = self.provider.stream_turn( + result, + has_data_model=data_model is not None, + ) + if echo == "all": + emit_other_contents(turn, emit) + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" ) - if echo == "all": - emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost( - turn.completion, turn.tokens - ) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) - self._turns[turn_idx] = turn + if turn.tokens is None and turn.completion: + turn.tokens = self.provider.value_tokens(turn.completion) + if turn.cost is None and turn.completion: + turn.cost = self.provider.value_cost( + turn.completion, turn.tokens + ) + if turn.tokens is not None: + tokens_log(self.provider, turn.tokens) + self._turns[turn_idx] = turn finally: - turn = self._turns[turn_idx] - if turn.is_partial: - turn.contents = merge_content_text(turn.contents) + final_turn = self._turns[turn_idx] + if ( + isinstance(final_turn, AssistantTurn) + and final_turn.is_partial + ): + final_turn.contents = cast( + list[ContentUnion], + merge_content_text(final_turn.contents), + ) else: response = self.provider.chat_perform( @@ -2815,7 +2824,9 @@ def emit(text: str | Content): text = content_text(content) if text: emit(text) - self._turns[turn_idx].contents.append(content) + self._turns[turn_idx].contents.append( + cast(ContentUnion, content) + ) if content_mode == "all" and isinstance( content, ContentThinking ): @@ -2823,31 +2834,37 @@ def emit(text: str | Content): else: yield text result = self.provider.stream_merge_chunks(result, chunk) - else: - # Normal completion — replace partial with full turn - turn = self.provider.stream_turn( - result, - has_data_model=data_model is not None, + + # Normal completion — replace partial with full turn + turn = self.provider.stream_turn( + result, + has_data_model=data_model is not None, + ) + if echo == "all": + emit_other_contents(turn, emit) + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" ) - if echo == "all": - emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost( - turn.completion, turn.tokens - ) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) - self._turns[turn_idx] = turn + if turn.tokens is None and turn.completion: + turn.tokens = self.provider.value_tokens(turn.completion) + if turn.cost is None and turn.completion: + turn.cost = self.provider.value_cost( + turn.completion, turn.tokens + ) + if turn.tokens is not None: + tokens_log(self.provider, turn.tokens) + self._turns[turn_idx] = turn finally: - turn = self._turns[turn_idx] - if turn.is_partial: - turn.contents = merge_content_text(turn.contents) + final_turn = self._turns[turn_idx] + if ( + isinstance(final_turn, AssistantTurn) + and final_turn.is_partial + ): + final_turn.contents = cast( + list[ContentUnion], + merge_content_text(final_turn.contents), + ) else: response = await self.provider.chat_perform_async( From cd51551e6b094b83ea7ef8106ab846188486aea9 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:19:28 -0500 Subject: [PATCH 06/22] feat: add StreamController class for cooperative stream cancellation StreamController provides a simple cancel/reset/cancelled/reason API for cooperatively cancelling streaming responses. Exported from chatlas. Co-Authored-By: Claude Opus 4.6 --- chatlas/__init__.py | 2 ++ chatlas/_stream_controller.py | 60 +++++++++++++++++++++++++++++++++ tests/test_stream_controller.py | 29 ++++++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 chatlas/_stream_controller.py create mode 100644 tests/test_stream_controller.py diff --git a/chatlas/__init__.py b/chatlas/__init__.py index 91e25672..60a6f099 100644 --- a/chatlas/__init__.py +++ b/chatlas/__init__.py @@ -33,6 +33,7 @@ from ._provider_perplexity import ChatPerplexity from ._provider_portkey import ChatPortkey from ._provider_snowflake import ChatSnowflake +from ._stream_controller import StreamController from ._tokens import token_usage from ._tools import Tool, ToolBuiltIn, ToolRejectError from ._tools_builtin import tool_web_fetch, tool_web_search @@ -83,6 +84,7 @@ "interpolate", "interpolate_file", "Provider", + "StreamController", "token_usage", "Tool", "ToolBuiltIn", diff --git a/chatlas/_stream_controller.py b/chatlas/_stream_controller.py new file mode 100644 index 00000000..5f07203f --- /dev/null +++ b/chatlas/_stream_controller.py @@ -0,0 +1,60 @@ +from __future__ import annotations + + +class StreamController: + """ + Cooperative cancellation handle for streaming responses. + + Create a controller and pass it to + :meth:`~chatlas.Chat.stream` or :meth:`~chatlas.Chat.stream_async` + via the ``controller`` argument, then call :meth:`cancel` from + anywhere (e.g., a Shiny observer) to stop the stream after the + current chunk. + + The same controller can be reused across multiple streams. Call + :meth:`reset` to clear the cancelled state before starting a new + stream. + + Examples + -------- + ```python + from chatlas import ChatOpenAI, StreamController + + chat = ChatOpenAI() + ctrl = StreamController() + + i = 0 + for chunk in chat.stream("Write a story", controller=ctrl): + i += 1 + print(chunk, end="") + if i > 10: + ctrl.cancel() + + # Partial response is preserved in history + print(chat.get_turns()) + ``` + """ + + def __init__(self): + self._cancelled: bool = False + self._reason: str | None = None + + def cancel(self, reason: str = "cancelled") -> None: + """Cancel the stream. The reason is stored on the partial turn.""" + self._reason = reason + self._cancelled = True + + def reset(self) -> None: + """Clear the cancelled state and reason.""" + self._cancelled = False + self._reason = None + + @property + def cancelled(self) -> bool: + """Whether the controller has been cancelled.""" + return self._cancelled + + @property + def reason(self) -> str | None: + """The cancellation reason, or None if not cancelled.""" + return self._reason diff --git a/tests/test_stream_controller.py b/tests/test_stream_controller.py new file mode 100644 index 00000000..51005930 --- /dev/null +++ b/tests/test_stream_controller.py @@ -0,0 +1,29 @@ +from chatlas import StreamController + + +def test_stream_controller_initial_state(): + ctrl = StreamController() + assert ctrl.cancelled is False + assert ctrl.reason is None + + +def test_stream_controller_cancel(): + ctrl = StreamController() + ctrl.cancel() + assert ctrl.cancelled is True + assert ctrl.reason == "cancelled" + + +def test_stream_controller_cancel_with_reason(): + ctrl = StreamController() + ctrl.cancel(reason="timeout") + assert ctrl.cancelled is True + assert ctrl.reason == "timeout" + + +def test_stream_controller_reset(): + ctrl = StreamController() + ctrl.cancel(reason="timeout") + ctrl.reset() + assert ctrl.cancelled is False + assert ctrl.reason is None From 1a4e02f6dc0ff376ffa237c929be695928afdfed Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:23:07 -0500 Subject: [PATCH 07/22] feat: add controller parameter to stream() and stream_async() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thread StreamController through stream → _chat_impl → _submit_turns (and async equivalents). When controller.cancelled is True, the streaming loop breaks and the partial turn's reason is set from the controller. Also skips tool invocation when cancelled. Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 125 +++++++++---- .../test_stream_cancel_after_chunks.yaml | 174 ++++++++++++++++++ ...test_stream_cancel_after_chunks_async.yaml | 174 ++++++++++++++++++ tests/test_stream_controller.py | 53 +++++- 4 files changed, 485 insertions(+), 41 deletions(-) create mode 100644 tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml create mode 100644 tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 4d7ca740..d8d1f295 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -40,6 +40,7 @@ ContentUnion, ToolInfo, ) +from ._stream_controller import StreamController from ._display import ( EchoDisplayOptions, IPyMarkdownDisplay, @@ -1159,6 +1160,7 @@ def stream( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> Generator[str, None, None]: ... @overload @@ -1169,6 +1171,7 @@ def stream( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> Generator[str | ContentThinking | ContentToolRequest | ContentToolResult, None, None]: ... def stream( @@ -1178,6 +1181,7 @@ def stream( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> Generator[str | ContentThinking | ContentToolRequest | ContentToolResult, None, None]: """ Generate a response from the chat in a streaming fashion. @@ -1239,6 +1243,7 @@ class Person(BaseModel): content=content, kwargs=kwargs, data_model=data_model, + controller=controller, ) def wrapper() -> Generator[ @@ -1258,6 +1263,7 @@ async def stream_async( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str, None]: ... @overload @@ -1268,6 +1274,7 @@ async def stream_async( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str | ContentThinking | ContentToolRequest | ContentToolResult, None]: ... async def stream_async( @@ -1277,6 +1284,7 @@ async def stream_async( echo: EchoOptions = "none", data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str | ContentThinking | ContentToolRequest | ContentToolResult, None]: """ Generate a response from the chat in a streaming fashion asynchronously. @@ -1347,6 +1355,7 @@ async def wrapper() -> AsyncGenerator[ content=content, kwargs=kwargs, data_model=data_model, + controller=controller, ): yield chunk @@ -2487,6 +2496,7 @@ def _chat_impl( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> Generator[str, None, None]: ... @overload @@ -2498,6 +2508,7 @@ def _chat_impl( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> Generator[str | ContentThinking | ContentToolRequest | ContentToolResult, None, None]: ... def _chat_impl( @@ -2508,6 +2519,7 @@ def _chat_impl( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: user_turn_result: UserTurn | None = user_turn while user_turn_result is not None: @@ -2518,6 +2530,7 @@ def _chat_impl( data_model=data_model, kwargs=kwargs, content_mode=content, + controller=controller, ): yield chunk @@ -2525,6 +2538,10 @@ def _chat_impl( assert turn is not None user_turn_result = None + # Don't invoke tools if the stream was cancelled + if controller is not None and controller.cancelled: + break + all_results: list[ContentToolResult] = [] for x in turn.contents: if isinstance(x, ContentToolRequest): @@ -2555,6 +2572,7 @@ def _chat_impl_async( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str, None]: ... @overload @@ -2566,6 +2584,7 @@ def _chat_impl_async( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str | ContentThinking | ContentToolRequest | ContentToolResult, None]: ... async def _chat_impl_async( @@ -2576,6 +2595,7 @@ async def _chat_impl_async( stream: bool, kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, + controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: user_turn_result: UserTurn | None = user_turn while user_turn_result is not None: @@ -2586,6 +2606,7 @@ async def _chat_impl_async( data_model=data_model, kwargs=kwargs, content_mode=content, + controller=controller, ): yield chunk @@ -2593,6 +2614,10 @@ async def _chat_impl_async( assert turn is not None user_turn_result = None + # Don't invoke tools if the stream was cancelled + if controller is not None and controller.cancelled: + break + all_results: list[ContentToolResult] = [] for x in turn.contents: if isinstance(x, ContentToolRequest): @@ -2647,6 +2672,7 @@ def _submit_turns( data_model: type[BaseModel] | None = None, kwargs: Optional[SubmitInputArgsT] = None, content_mode: Literal["text", "all"] = "text", + controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: if any(isinstance(x, Tool) and x._is_async for x in self._tools.values()): raise ValueError("Cannot use async tools in a synchronous chat") @@ -2694,34 +2720,43 @@ def emit(text: str | Content): yield content else: yield text + if ( + controller is not None + and controller.cancelled + ): + break result = self.provider.stream_merge_chunks(result, chunk) - - # Normal completion — replace partial with full turn - turn = self.provider.stream_turn( - result, - has_data_model=data_model is not None, - ) - if echo == "all": - emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost( - turn.completion, turn.tokens + else: + # Normal completion — replace partial with full turn + turn = self.provider.stream_turn( + result, + has_data_model=data_model is not None, ) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) - self._turns[turn_idx] = turn + if echo == "all": + emit_other_contents(turn, emit) + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" + ) + if turn.tokens is None and turn.completion: + turn.tokens = self.provider.value_tokens( + turn.completion + ) + if turn.cost is None and turn.completion: + turn.cost = self.provider.value_cost( + turn.completion, turn.tokens + ) + if turn.tokens is not None: + tokens_log(self.provider, turn.tokens) + self._turns[turn_idx] = turn finally: final_turn = self._turns[turn_idx] if ( isinstance(final_turn, AssistantTurn) and final_turn.is_partial ): + if controller is not None and controller.cancelled: + final_turn.partial_reason = controller.reason final_turn.contents = cast( list[ContentUnion], merge_content_text(final_turn.contents), @@ -2789,6 +2824,7 @@ async def _submit_turns_async( data_model: type[BaseModel] | None = None, kwargs: Optional[SubmitInputArgsT] = None, content_mode: Literal["text", "all"] = "text", + controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: def emit(text: str | Content): self._echo_content(str(text)) @@ -2833,34 +2869,43 @@ def emit(text: str | Content): yield content else: yield text + if ( + controller is not None + and controller.cancelled + ): + break result = self.provider.stream_merge_chunks(result, chunk) - - # Normal completion — replace partial with full turn - turn = self.provider.stream_turn( - result, - has_data_model=data_model is not None, - ) - if echo == "all": - emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost( - turn.completion, turn.tokens + else: + # Normal completion — replace partial with full turn + turn = self.provider.stream_turn( + result, + has_data_model=data_model is not None, ) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) - self._turns[turn_idx] = turn + if echo == "all": + emit_other_contents(turn, emit) + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" + ) + if turn.tokens is None and turn.completion: + turn.tokens = self.provider.value_tokens( + turn.completion + ) + if turn.cost is None and turn.completion: + turn.cost = self.provider.value_cost( + turn.completion, turn.tokens + ) + if turn.tokens is not None: + tokens_log(self.provider, turn.tokens) + self._turns[turn_idx] = turn finally: final_turn = self._turns[turn_idx] if ( isinstance(final_turn, AssistantTurn) and final_turn.is_partial ): + if controller is not None and controller.cancelled: + final_turn.partial_reason = controller.reason final_turn.contents = cast( list[ContentUnion], merge_content_text(final_turn.contents), diff --git a/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml new file mode 100644 index 00000000..333030e9 --- /dev/null +++ b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks.yaml @@ -0,0 +1,174 @@ +interactions: +- request: + body: '{"input": [{"role": "user", "content": [{"type": "input_text", "text": + "\n What are the canonical colors of the ROYGBIV rainbow?\n Put + each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-4.1", + "store": false, "stream": true}' + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '251' + Content-Type: + - application/json + Host: + - api.openai.com + X-Stainless-Async: + - 'false' + x-stainless-read-timeout: + - '600' + method: POST + uri: https://api.openai.com/v1/responses + response: + body: + string: 'event: response.created + + data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.in_progress + + data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.output_item.added + + data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"in_progress","content":[],"role":"assistant"}} + + + event: response.content_part.added + + data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"VyLjtlp4pcHnl"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"VFBYwzpsUJWxH"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"GB7EheuPAI"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"n7WBEjbLB9yuQ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"tjVPBCfEwZ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"ThsRrqFkcHdBY"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"vpm43U0OMRn"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ce8I6O3Ympl5O"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"2qjVtLnv4y9z"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"RBKSpaeiKPIYL"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"50myY2IpRyTEZ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"zWfhh8RBqltML"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"s13DaAWMQbrhf"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"2QRAGHqzRIJ4Rhb"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"CWR5fXLEuEA"} + + + event: response.output_text.done + + data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]} + + + event: response.content_part.done + + data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}} + + + event: response.output_item.done + + data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}} + + + event: response.completed + + data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_04241dbaf3a847fc01695588904c60819c87095f38c2a25952","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213200,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[{"id":"msg_04241dbaf3a847fc01695588907c34819c939e24002aa91af2","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}} + + + ' + headers: + CF-RAY: + - 9b6c8d259a25eb65-ORD + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 31 Dec 2025 20:33:20 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-processing-ms: + - '59' + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '62' + status: + code: 200 + message: OK +version: 1 diff --git a/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml new file mode 100644 index 00000000..c3772615 --- /dev/null +++ b/tests/_vcr/test_stream_controller/test_stream_cancel_after_chunks_async.yaml @@ -0,0 +1,174 @@ +interactions: +- request: + body: '{"input": [{"role": "user", "content": [{"type": "input_text", "text": + "\n What are the canonical colors of the ROYGBIV rainbow?\n Put + each colour on its own line. Don''t use punctuation.\n "}]}], "model": "gpt-4.1", + "store": false, "stream": true}' + headers: + Accept: + - application/json + Accept-Encoding: + - gzip, deflate + Connection: + - keep-alive + Content-Length: + - '251' + Content-Type: + - application/json + Host: + - api.openai.com + X-Stainless-Async: + - async:asyncio + x-stainless-read-timeout: + - '600' + method: POST + uri: https://api.openai.com/v1/responses + response: + body: + string: 'event: response.created + + data: {"type":"response.created","sequence_number":0,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.in_progress + + data: {"type":"response.in_progress","sequence_number":1,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"in_progress","background":false,"completed_at":null,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"auto","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":null,"user":null,"metadata":{}}} + + + event: response.output_item.added + + data: {"type":"response.output_item.added","sequence_number":2,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"in_progress","content":[],"role":"assistant"}} + + + event: response.content_part.added + + data: {"type":"response.content_part.added","sequence_number":3,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":4,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Red","logprobs":[],"obfuscation":"Fwrn6l3VdXbRX"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":5,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"1kJSXray3LSug"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":6,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Orange","logprobs":[],"obfuscation":"Eih8h9dEZy"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":7,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"tHQucrSzXW82d"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":8,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Yellow","logprobs":[],"obfuscation":"d7XiKGH6ge"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":9,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Y8ax6kRokXoTk"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":10,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Green","logprobs":[],"obfuscation":"lcahcSGcQIO"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":11,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"Ybguf4GM4Rxzd"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":12,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Blue","logprobs":[],"obfuscation":"jnjjMyDfW7YQ"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":13,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"GCrzkYvw1d9OB"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":14,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"Ind","logprobs":[],"obfuscation":"XCEuFUkogiu5Q"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":15,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"igo","logprobs":[],"obfuscation":"N6Rz9xT3o8JEi"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":16,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":" \n","logprobs":[],"obfuscation":"2m15iglV7QFMR"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":17,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"V","logprobs":[],"obfuscation":"WNlPYGJ0Q9ygCMr"} + + + event: response.output_text.delta + + data: {"type":"response.output_text.delta","sequence_number":18,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"delta":"iolet","logprobs":[],"obfuscation":"ZcES9KNvLip"} + + + event: response.output_text.done + + data: {"type":"response.output_text.done","sequence_number":19,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet","logprobs":[]} + + + event: response.content_part.done + + data: {"type":"response.content_part.done","sequence_number":20,"item_id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}} + + + event: response.output_item.done + + data: {"type":"response.output_item.done","sequence_number":21,"output_index":0,"item":{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}} + + + event: response.completed + + data: {"type":"response.completed","sequence_number":22,"response":{"id":"resp_0ca73fa85f6cf2590169558890df78819cbbe0d71bc787fb5f","object":"response","created_at":1767213200,"status":"completed","background":false,"completed_at":1767213201,"error":null,"incomplete_details":null,"instructions":null,"max_output_tokens":null,"max_tool_calls":null,"model":"gpt-4.1-2025-04-14","output":[{"id":"msg_0ca73fa85f6cf259016955889120c0819c8612db972f509b24","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":"Red \nOrange \nYellow \nGreen \nBlue \nIndigo \nViolet"}],"role":"assistant"}],"parallel_tool_calls":true,"previous_response_id":null,"prompt_cache_key":null,"prompt_cache_retention":null,"reasoning":{"effort":null,"summary":null},"safety_identifier":null,"service_tier":"default","store":false,"temperature":1.0,"text":{"format":{"type":"text"},"verbosity":"medium"},"tool_choice":"auto","tools":[],"top_logprobs":0,"top_p":1.0,"truncation":"disabled","usage":{"input_tokens":36,"input_tokens_details":{"cached_tokens":0},"output_tokens":16,"output_tokens_details":{"reasoning_tokens":0},"total_tokens":52},"user":null,"metadata":{}}} + + + ' + headers: + CF-RAY: + - 9b6c8d293c230cde-ORD + Connection: + - keep-alive + Content-Type: + - text/event-stream; charset=utf-8 + Date: + - Wed, 31 Dec 2025 20:33:20 GMT + Server: + - cloudflare + Strict-Transport-Security: + - max-age=31536000; includeSubDomains; preload + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - nosniff + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-processing-ms: + - '28' + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '31' + status: + code: 200 + message: OK +version: 1 diff --git a/tests/test_stream_controller.py b/tests/test_stream_controller.py index 51005930..f20f7423 100644 --- a/tests/test_stream_controller.py +++ b/tests/test_stream_controller.py @@ -1,4 +1,6 @@ -from chatlas import StreamController +import pytest + +from chatlas import ChatOpenAI, StreamController def test_stream_controller_initial_state(): @@ -27,3 +29,52 @@ def test_stream_controller_reset(): ctrl.reset() assert ctrl.cancelled is False assert ctrl.reason is None + + +@pytest.mark.vcr +def test_stream_cancel_after_chunks(): + chat = ChatOpenAI() + ctrl = StreamController() + + chunks = [] + for chunk in chat.stream( + """ + What are the canonical colors of the ROYGBIV rainbow? + Put each colour on its own line. Don't use punctuation. + """, + controller=ctrl, + ): + chunks.append(chunk) + if len(chunks) >= 3: + ctrl.cancel() + + turns = chat.get_turns() + assert len(turns) == 2 + assert turns[1].is_partial + assert turns[1].partial_reason == "cancelled" + assert len(turns[1].text) > 0 + + +@pytest.mark.vcr +@pytest.mark.asyncio +async def test_stream_cancel_after_chunks_async(): + chat = ChatOpenAI() + ctrl = StreamController() + + chunks = [] + async for chunk in await chat.stream_async( + """ + What are the canonical colors of the ROYGBIV rainbow? + Put each colour on its own line. Don't use punctuation. + """, + controller=ctrl, + ): + chunks.append(chunk) + if len(chunks) >= 3: + ctrl.cancel() + + turns = chat.get_turns() + assert len(turns) == 2 + assert turns[1].is_partial + assert turns[1].partial_reason == "cancelled" + assert len(turns[1].text) > 0 From 1ed21fb271e4e7b58a8b7680a2e4485923de6158 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:23:53 -0500 Subject: [PATCH 08/22] feat: create internal StreamController for chat() and chat_async() Both chat() and chat_async() now create an internal StreamController and thread it through _chat_impl. This ensures the try/finally partial turn machinery is always active, even for non-streaming chat calls. Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index d8d1f295..a973c9f4 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -1083,6 +1083,7 @@ def chat( display = self._markdown_display(echo=echo) + controller = StreamController() response = ChatResponse( self._chat_impl( turn, @@ -1090,6 +1091,7 @@ def chat( content="text", stream=stream, kwargs=kwargs, + controller=controller, ) ) @@ -1136,6 +1138,7 @@ async def chat_async( display = self._markdown_display(echo=echo) + controller = StreamController() response = ChatResponseAsync( self._chat_impl_async( turn, @@ -1143,6 +1146,7 @@ async def chat_async( content="text", stream=stream, kwargs=kwargs, + controller=controller, ), ) From 6290ce4695374ab07f447bbdb38ec64ab470ed46 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 11:27:35 -0500 Subject: [PATCH 09/22] style: auto-format with ruff Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 42 +++++++++++++----------------------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index a973c9f4..799627ae 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -40,7 +40,6 @@ ContentUnion, ToolInfo, ) -from ._stream_controller import StreamController from ._display import ( EchoDisplayOptions, IPyMarkdownDisplay, @@ -51,6 +50,7 @@ from ._logging import log_tool_error from ._mcp_manager import MCPSessionManager from ._provider import ModelInfo, Provider, StandardModelParams, SubmitInputArgsT +from ._stream_controller import StreamController from ._tokens import tokens_log from ._tools import Tool, ToolBuiltIn, ToolRejectError from ._turn import AssistantTurn, SystemTurn, Turn, UserTurn, user_turn @@ -535,9 +535,7 @@ def get_cost( """ assistant_turns = [ - t - for t in self._turns - if isinstance(t, AssistantTurn) and not t.is_partial + t for t in self._turns if isinstance(t, AssistantTurn) and not t.is_partial ] if len(assistant_turns) == 0: @@ -2654,6 +2652,7 @@ def _submit_turns( data_model: type[BaseModel] | None = None, kwargs: Optional[SubmitInputArgsT] = None, content_mode: Literal["text"] = "text", + controller: StreamController | None = None, ) -> Generator[str, None, None]: ... @overload @@ -2666,6 +2665,7 @@ def _submit_turns( kwargs: Optional[SubmitInputArgsT] = None, *, content_mode: Literal["all"], + controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: ... def _submit_turns( @@ -2724,10 +2724,7 @@ def emit(text: str | Content): yield content else: yield text - if ( - controller is not None - and controller.cancelled - ): + if controller is not None and controller.cancelled: break result = self.provider.stream_merge_chunks(result, chunk) else: @@ -2743,9 +2740,7 @@ def emit(text: str | Content): f"Expected turn to be AssistantTurn, got {type(turn).__name__}" ) if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens( - turn.completion - ) + turn.tokens = self.provider.value_tokens(turn.completion) if turn.cost is None and turn.completion: turn.cost = self.provider.value_cost( turn.completion, turn.tokens @@ -2755,10 +2750,7 @@ def emit(text: str | Content): self._turns[turn_idx] = turn finally: final_turn = self._turns[turn_idx] - if ( - isinstance(final_turn, AssistantTurn) - and final_turn.is_partial - ): + if isinstance(final_turn, AssistantTurn) and final_turn.is_partial: if controller is not None and controller.cancelled: final_turn.partial_reason = controller.reason final_turn.contents = cast( @@ -2806,6 +2798,7 @@ def _submit_turns_async( data_model: type[BaseModel] | None = None, kwargs: Optional[SubmitInputArgsT] = None, content_mode: Literal["text"] = "text", + controller: StreamController | None = None, ) -> AsyncGenerator[str, None]: ... @overload @@ -2818,6 +2811,7 @@ def _submit_turns_async( kwargs: Optional[SubmitInputArgsT] = None, *, content_mode: Literal["all"], + controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: ... async def _submit_turns_async( @@ -2873,10 +2867,7 @@ def emit(text: str | Content): yield content else: yield text - if ( - controller is not None - and controller.cancelled - ): + if controller is not None and controller.cancelled: break result = self.provider.stream_merge_chunks(result, chunk) else: @@ -2892,9 +2883,7 @@ def emit(text: str | Content): f"Expected turn to be AssistantTurn, got {type(turn).__name__}" ) if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens( - turn.completion - ) + turn.tokens = self.provider.value_tokens(turn.completion) if turn.cost is None and turn.completion: turn.cost = self.provider.value_cost( turn.completion, turn.tokens @@ -2904,10 +2893,7 @@ def emit(text: str | Content): self._turns[turn_idx] = turn finally: final_turn = self._turns[turn_idx] - if ( - isinstance(final_turn, AssistantTurn) - and final_turn.is_partial - ): + if isinstance(final_turn, AssistantTurn) and final_turn.is_partial: if controller is not None and controller.cancelled: final_turn.partial_reason = controller.reason final_turn.contents = cast( @@ -3153,9 +3139,7 @@ def __str__(self): turns = self.get_turns(include_system_prompt=True) complete_assistant_turns = [ - t - for t in turns - if isinstance(t, AssistantTurn) and not t.is_partial + t for t in turns if isinstance(t, AssistantTurn) and not t.is_partial ] # Sum tokens across complete assistant turns From b37be747c112d1fdb7f012bc3b6d725dc03c808b Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 12:55:17 -0500 Subject: [PATCH 10/22] docs: document controller parameter on stream() and stream_async() Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 799627ae..3d373749 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -1210,6 +1210,14 @@ def stream( kwargs Additional keyword arguments to pass to the method used for requesting the response. + controller + A [](`~chatlas.StreamController`) for cooperative stream cancellation. + When provided, calling `controller.cancel()` stops the stream after the + current chunk and preserves the partial response in conversation history. + This is useful for building UIs (e.g., a "stop generating" button) where + you want to interrupt a response without losing what's been generated so + far. The same controller can be reused across multiple streams by calling + `controller.reset()` before starting a new stream. Returns ------- @@ -1313,6 +1321,14 @@ async def stream_async( kwargs Additional keyword arguments to pass to the method used for requesting the response. + controller + A [](`~chatlas.StreamController`) for cooperative stream cancellation. + When provided, calling `controller.cancel()` stops the stream after the + current chunk and preserves the partial response in conversation history. + This is useful for building UIs (e.g., a "stop generating" button) where + you want to interrupt a response without losing what's been generated so + far. The same controller can be reused across multiple streams by calling + `controller.reset()` before starting a new stream. Returns ------- From 0fef5fc301966666f7e64ce5544262a9821d91c3 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 13:34:07 -0500 Subject: [PATCH 11/22] fix: address review feedback on stream cancellation PR - Capture all content types (not just text) in partial turns so ContentToolRequest etc. aren't silently dropped on interruption - Default-create StreamController when none provided, eliminating all `if controller is not None` guards - Add comments explaining for/else + GeneratorExit interaction - Add thread-safety comment on StreamController.cancel() ordering - Return list[ContentUnion] from merge_content_text to avoid casts Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 68 ++++++++++++++++++++++------------- chatlas/_stream_controller.py | 4 +++ 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 3d373749..4b9d0595 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -1246,6 +1246,9 @@ class Person(BaseModel): display = self._markdown_display(echo=echo) + if controller is None: + controller = StreamController() + generator = self._chat_impl( turn, stream=True, @@ -1362,6 +1365,9 @@ class Person(BaseModel): display = self._markdown_display(echo=echo) + if controller is None: + controller = StreamController() + async def wrapper() -> AsyncGenerator[ str | ContentThinking | ContentToolRequest | ContentToolResult, None ]: @@ -2539,6 +2545,9 @@ def _chat_impl( data_model: Optional[type[BaseModel]] = None, controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: + if controller is None: + controller = StreamController() + user_turn_result: UserTurn | None = user_turn while user_turn_result is not None: for chunk in self._submit_turns( @@ -2557,7 +2566,7 @@ def _chat_impl( user_turn_result = None # Don't invoke tools if the stream was cancelled - if controller is not None and controller.cancelled: + if controller.cancelled: break all_results: list[ContentToolResult] = [] @@ -2615,6 +2624,9 @@ async def _chat_impl_async( data_model: Optional[type[BaseModel]] = None, controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: + if controller is None: + controller = StreamController() + user_turn_result: UserTurn | None = user_turn while user_turn_result is not None: async for chunk in self._submit_turns_async( @@ -2633,7 +2645,7 @@ async def _chat_impl_async( user_turn_result = None # Don't invoke tools if the stream was cancelled - if controller is not None and controller.cancelled: + if controller.cancelled: break all_results: list[ContentToolResult] = [] @@ -2694,6 +2706,9 @@ def _submit_turns( content_mode: Literal["text", "all"] = "text", controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: + if controller is None: + controller = StreamController() + if any(isinstance(x, Tool) and x._is_async for x in self._tools.values()): raise ValueError("Cannot use async tools in a synchronous chat") @@ -2725,23 +2740,26 @@ def emit(text: str | Content): try: result = None + # Note: this for/else relies on the fact that GeneratorExit + # (from gen.close()) is an exception, not a break — so it + # skips the else clause while still running the finally block. for chunk in response: content = self.provider.stream_content(chunk) if content is not None: + self._turns[turn_idx].contents.append( + cast(ContentUnion, content) + ) text = content_text(content) if text: emit(text) - self._turns[turn_idx].contents.append( - cast(ContentUnion, content) - ) if content_mode == "all" and isinstance( content, ContentThinking ): yield content else: yield text - if controller is not None and controller.cancelled: - break + if controller.cancelled: + break result = self.provider.stream_merge_chunks(result, chunk) else: # Normal completion — replace partial with full turn @@ -2767,12 +2785,9 @@ def emit(text: str | Content): finally: final_turn = self._turns[turn_idx] if isinstance(final_turn, AssistantTurn) and final_turn.is_partial: - if controller is not None and controller.cancelled: + if controller.cancelled: final_turn.partial_reason = controller.reason - final_turn.contents = cast( - list[ContentUnion], - merge_content_text(final_turn.contents), - ) + final_turn.contents = merge_content_text(final_turn.contents) else: response = self.provider.chat_perform( @@ -2840,6 +2855,9 @@ async def _submit_turns_async( content_mode: Literal["text", "all"] = "text", controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: + if controller is None: + controller = StreamController() + def emit(text: str | Content): self._echo_content(str(text)) @@ -2868,23 +2886,26 @@ def emit(text: str | Content): try: result = None + # Note: this for/else relies on the fact that GeneratorExit + # (from gen.aclose()) is an exception, not a break — so it + # skips the else clause while still running the finally block. async for chunk in response: content = self.provider.stream_content(chunk) if content is not None: + self._turns[turn_idx].contents.append( + cast(ContentUnion, content) + ) text = content_text(content) if text: emit(text) - self._turns[turn_idx].contents.append( - cast(ContentUnion, content) - ) if content_mode == "all" and isinstance( content, ContentThinking ): yield content else: yield text - if controller is not None and controller.cancelled: - break + if controller.cancelled: + break result = self.provider.stream_merge_chunks(result, chunk) else: # Normal completion — replace partial with full turn @@ -2910,12 +2931,9 @@ def emit(text: str | Content): finally: final_turn = self._turns[turn_idx] if isinstance(final_turn, AssistantTurn) and final_turn.is_partial: - if controller is not None and controller.cancelled: + if controller.cancelled: final_turn.partial_reason = controller.reason - final_turn.contents = cast( - list[ContentUnion], - merge_content_text(final_turn.contents), - ) + final_turn.contents = merge_content_text(final_turn.contents) else: response = await self.provider.chat_perform_async( @@ -3395,11 +3413,11 @@ def content_text(content: Content) -> str: return str(content) -def merge_content_text(contents: Sequence[Content]) -> list[Content]: +def merge_content_text(contents: Sequence[Content]) -> list[ContentUnion]: """Merge adjacent ContentText (and ContentThinking) fragments.""" if not contents: return [] - merged: list[Content] = [contents[0]] + merged: list[ContentUnion] = [cast(ContentUnion, contents[0])] for item in contents[1:]: last = merged[-1] if isinstance(last, ContentText) and isinstance(item, ContentText): @@ -3407,7 +3425,7 @@ def merge_content_text(contents: Sequence[Content]) -> list[Content]: elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking): merged[-1] = ContentThinking(thinking=last.thinking + item.thinking) else: - merged.append(item) + merged.append(cast(ContentUnion, item)) return merged diff --git a/chatlas/_stream_controller.py b/chatlas/_stream_controller.py index 5f07203f..523e2c81 100644 --- a/chatlas/_stream_controller.py +++ b/chatlas/_stream_controller.py @@ -41,6 +41,10 @@ def __init__(self): def cancel(self, reason: str = "cancelled") -> None: """Cancel the stream. The reason is stored on the partial turn.""" + # Set reason before cancelled so that readers who see cancelled=True + # will always find a reason. This is safe under CPython's GIL; on + # free-threaded builds the worst case is a benign extra iteration + # before the streaming loop notices the cancellation. self._reason = reason self._cancelled = True From 736c6bcdb382c732b9caf4f25cea5aae090f4655 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 14:01:38 -0500 Subject: [PATCH 12/22] docs: add changelog entries for stream cancellation and partial turns Co-Authored-By: Claude Opus 4.6 --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 897a5246..3491df71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### New features * The `.stream()` and `.stream_async()` methods now yield `ContentThinking` objects (instead of plain strings) for thinking/reasoning content when `content="all"`. This allows downstream packages like shinychat to provide specific UI for thinking content. (#276) +* New `StreamController` class for cooperative stream cancellation. Pass a controller to `.stream()` or `.stream_async()` and call `controller.cancel()` to stop the stream cleanly (e.g., from a Shiny "stop generating" button). The partial response is preserved in conversation history. (#279) +* When a stream is interrupted (closed early, cancelled, or errors), the accumulated content is now saved as a partial `AssistantTurn` so conversation state isn't lost. Partial turns display `[interrupted]` (or the cancellation reason) in the `Chat` repr and are excluded from token/cost accounting. (#279) ### Bug fixes From 9767a94604c4ebb0f75321c5c1a0753656088b61 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 14:24:39 -0500 Subject: [PATCH 13/22] feat: extract TurnAccumulator class for streaming turn lifecycle Introduces TurnAccumulator in chatlas/_turn_accumulator.py mirroring ellmer's R6 class, along with merge_content_text helper and full test coverage in tests/test_turn_accumulator.py. Co-Authored-By: Claude Sonnet 4.6 --- chatlas/_turn_accumulator.py | 81 ++++++++++++++++++++++++++++++++ tests/test_turn_accumulator.py | 86 ++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 chatlas/_turn_accumulator.py create mode 100644 tests/test_turn_accumulator.py diff --git a/chatlas/_turn_accumulator.py b/chatlas/_turn_accumulator.py new file mode 100644 index 00000000..1dd4a0fa --- /dev/null +++ b/chatlas/_turn_accumulator.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from typing import cast + +from ._content import Content, ContentText, ContentThinking, ContentUnion +from ._stream_controller import StreamController +from ._turn import AssistantTurn, Turn, UserTurn + + +def merge_content_text(contents: list[ContentUnion]) -> list[ContentUnion]: + """Merge adjacent ContentText (and ContentThinking) fragments.""" + if not contents: + return [] + merged: list[ContentUnion] = [contents[0]] + for item in contents[1:]: + last = merged[-1] + if isinstance(last, ContentText) and isinstance(item, ContentText): + merged[-1] = ContentText.model_construct(text=last.text + item.text) + elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking): + merged[-1] = ContentThinking(thinking=last.thinking + item.thinking) + else: + merged.append(item) + return merged + + +class TurnAccumulator: + """ + Manages the lifecycle of one streaming assistant turn. + + Mirrors ellmer's TurnAccumulator R6 class. The four stages are: + + 1. ``begin_turn(user_turn)`` — insert user + partial assistant into turns + 2. ``update_turn(content)`` — append streamed content to the partial turn + 3. ``complete_turn(turn)`` — replace partial with the full turn (skipped if cancelled) + 4. ``finalize_turn()`` — called from ``finally``; merges text fragments + and stamps the cancellation reason if the turn is still partial + """ + + def __init__( + self, + turns: list[Turn], + controller: StreamController, + ): + self._turns = turns + self._controller = controller + self._turn_idx: int | None = None + + def begin_turn(self, user_turn: UserTurn) -> None: + """Insert user turn and a partial assistant placeholder.""" + partial: AssistantTurn = AssistantTurn([], partial_reason="interrupted") + self._turns.extend([user_turn, partial]) + self._turn_idx = len(self._turns) - 1 + + def update_turn(self, content: Content) -> None: + """Append streamed content to the partial turn.""" + assert self._turn_idx is not None + self._turns[self._turn_idx].contents.append(cast(ContentUnion, content)) + + def complete_turn(self, turn: AssistantTurn) -> None: + """Replace the partial turn with the completed turn (no-op if cancelled).""" + assert self._turn_idx is not None + if self._controller.cancelled: + return + self._turns[self._turn_idx] = turn + + def finalize_turn(self) -> None: + """ + Safety net — called from ``finally``. + + If the turn is still partial (i.e., ``complete_turn`` was never called + or was skipped because of cancellation), merge adjacent text fragments + and stamp the cancellation reason. + """ + if self._turn_idx is None: + return + turn = self._turns[self._turn_idx] + if not isinstance(turn, AssistantTurn) or not turn.is_partial: + return + if self._controller.cancelled: + turn.partial_reason = self._controller.reason + turn.contents = merge_content_text(turn.contents) diff --git a/tests/test_turn_accumulator.py b/tests/test_turn_accumulator.py new file mode 100644 index 00000000..e55c8be1 --- /dev/null +++ b/tests/test_turn_accumulator.py @@ -0,0 +1,86 @@ +from chatlas._content import ContentText, ContentThinking, ContentToolRequest +from chatlas._turn import AssistantTurn, UserTurn +from chatlas._turn_accumulator import TurnAccumulator +from chatlas._stream_controller import StreamController + + +def test_begin_turn_inserts_partial(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + user = UserTurn("hello") + acc.begin_turn(user) + assert len(turns) == 2 + assert turns[0] is user + assert isinstance(turns[1], AssistantTurn) + assert turns[1].is_partial + + +def test_update_turn_appends_content(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + content = ContentText.model_construct(text="hi") + acc.update_turn(content) + assert len(turns[1].contents) == 1 + assert turns[1].contents[0].text == "hi" + + +def test_complete_turn_replaces_partial(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + full_turn = AssistantTurn("response", tokens=(10, 5, 0)) + acc.complete_turn(full_turn) + assert turns[1] is full_turn + assert not turns[1].is_partial + + +def test_complete_turn_skipped_when_cancelled(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + controller.cancel() + full_turn = AssistantTurn("response", tokens=(10, 5, 0)) + acc.complete_turn(full_turn) + assert turns[1].is_partial + + +def test_finalize_turn_merges_text_and_sets_reason(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + acc.update_turn(ContentText.model_construct(text="a")) + acc.update_turn(ContentText.model_construct(text="b")) + acc.finalize_turn() + assert turns[1].is_partial + assert turns[1].partial_reason == "interrupted" + assert len(turns[1].contents) == 1 + assert turns[1].contents[0].text == "ab" + + +def test_finalize_turn_uses_controller_reason(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + acc.update_turn(ContentText.model_construct(text="partial")) + controller.cancel(reason="user stopped") + acc.finalize_turn() + assert turns[1].partial_reason == "user stopped" + + +def test_finalize_turn_noops_after_complete(): + turns: list = [] + controller = StreamController() + acc = TurnAccumulator(turns, controller) + acc.begin_turn(UserTurn("hello")) + full_turn = AssistantTurn("response", tokens=(10, 5, 0)) + acc.complete_turn(full_turn) + acc.finalize_turn() + assert turns[1] is full_turn + assert not turns[1].is_partial From 2db64472a6e36682b93fd53d14d6ede21b74e57b Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 14:28:23 -0500 Subject: [PATCH 14/22] fix: address code review feedback on TurnAccumulator - Replace assert with RuntimeError for precondition checks - Narrow update_turn param to ContentUnion (removes cast) - Use model_construct for ContentThinking merge (consistency) - Remove unused ContentToolRequest import from tests Co-Authored-By: Claude Opus 4.6 --- chatlas/_turn_accumulator.py | 18 ++++++++++-------- tests/test_turn_accumulator.py | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/chatlas/_turn_accumulator.py b/chatlas/_turn_accumulator.py index 1dd4a0fa..9d56884d 100644 --- a/chatlas/_turn_accumulator.py +++ b/chatlas/_turn_accumulator.py @@ -1,8 +1,6 @@ from __future__ import annotations -from typing import cast - -from ._content import Content, ContentText, ContentThinking, ContentUnion +from ._content import ContentText, ContentThinking, ContentUnion from ._stream_controller import StreamController from ._turn import AssistantTurn, Turn, UserTurn @@ -17,7 +15,9 @@ def merge_content_text(contents: list[ContentUnion]) -> list[ContentUnion]: if isinstance(last, ContentText) and isinstance(item, ContentText): merged[-1] = ContentText.model_construct(text=last.text + item.text) elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking): - merged[-1] = ContentThinking(thinking=last.thinking + item.thinking) + merged[-1] = ContentThinking.model_construct( + thinking=last.thinking + item.thinking + ) else: merged.append(item) return merged @@ -51,14 +51,16 @@ def begin_turn(self, user_turn: UserTurn) -> None: self._turns.extend([user_turn, partial]) self._turn_idx = len(self._turns) - 1 - def update_turn(self, content: Content) -> None: + def update_turn(self, content: ContentUnion) -> None: """Append streamed content to the partial turn.""" - assert self._turn_idx is not None - self._turns[self._turn_idx].contents.append(cast(ContentUnion, content)) + if self._turn_idx is None: + raise RuntimeError("update_turn called before begin_turn") + self._turns[self._turn_idx].contents.append(content) def complete_turn(self, turn: AssistantTurn) -> None: """Replace the partial turn with the completed turn (no-op if cancelled).""" - assert self._turn_idx is not None + if self._turn_idx is None: + raise RuntimeError("complete_turn called before begin_turn") if self._controller.cancelled: return self._turns[self._turn_idx] = turn diff --git a/tests/test_turn_accumulator.py b/tests/test_turn_accumulator.py index e55c8be1..c3426917 100644 --- a/tests/test_turn_accumulator.py +++ b/tests/test_turn_accumulator.py @@ -1,4 +1,4 @@ -from chatlas._content import ContentText, ContentThinking, ContentToolRequest +from chatlas._content import ContentText, ContentThinking from chatlas._turn import AssistantTurn, UserTurn from chatlas._turn_accumulator import TurnAccumulator from chatlas._stream_controller import StreamController From 31696b03831af238dcce4985d740f9386e127950 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 14:34:07 -0500 Subject: [PATCH 15/22] refactor: rewrite _submit_turns streaming branches to use TurnAccumulator Delegates partial-turn lifecycle management to TurnAccumulator, replacing the inline for/else + partial turn index tracking with clean begin/update/ complete/finalize calls. Also closes the HTTP response in finally, drops the local merge_content_text (now in _turn_accumulator.py), and updates the test import accordingly. Co-Authored-By: Claude Sonnet 4.6 --- chatlas/_chat.py | 92 +++++++++++++++++----------------------------- tests/test_chat.py | 2 +- 2 files changed, 35 insertions(+), 59 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 4b9d0595..785bc374 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -52,6 +52,7 @@ from ._provider import ModelInfo, Provider, StandardModelParams, SubmitInputArgsT from ._stream_controller import StreamController from ._tokens import tokens_log +from ._turn_accumulator import TurnAccumulator from ._tools import Tool, ToolBuiltIn, ToolRejectError from ._turn import AssistantTurn, SystemTurn, Turn, UserTurn, user_turn from ._typing_extensions import TypedDict, TypeGuard @@ -2732,23 +2733,15 @@ def emit(text: str | Content): kwargs=all_kwargs, ) - partial_turn: AssistantTurn = AssistantTurn( - [], partial_reason="interrupted" - ) - self._turns.extend([user_turn, partial_turn]) - turn_idx = len(self._turns) - 1 + acc = TurnAccumulator(self._turns, controller) + acc.begin_turn(user_turn) try: result = None - # Note: this for/else relies on the fact that GeneratorExit - # (from gen.close()) is an exception, not a break — so it - # skips the else clause while still running the finally block. for chunk in response: content = self.provider.stream_content(chunk) if content is not None: - self._turns[turn_idx].contents.append( - cast(ContentUnion, content) - ) + acc.update_turn(cast(ContentUnion, content)) text = content_text(content) if text: emit(text) @@ -2758,11 +2751,11 @@ def emit(text: str | Content): yield content else: yield text - if controller.cancelled: - break + if controller.cancelled: + break result = self.provider.stream_merge_chunks(result, chunk) - else: - # Normal completion — replace partial with full turn + + if not controller.cancelled: turn = self.provider.stream_turn( result, has_data_model=data_model is not None, @@ -2781,13 +2774,14 @@ def emit(text: str | Content): ) if turn.tokens is not None: tokens_log(self.provider, turn.tokens) - self._turns[turn_idx] = turn + acc.complete_turn(turn) finally: - final_turn = self._turns[turn_idx] - if isinstance(final_turn, AssistantTurn) and final_turn.is_partial: - if controller.cancelled: - final_turn.partial_reason = controller.reason - final_turn.contents = merge_content_text(final_turn.contents) + acc.finalize_turn() + # response type is Iterable[Unknown]; hasattr guards runtime safety, + # cast(Any) needed because Pyright can't narrow through hasattr. + _r: Any = response + if hasattr(_r, "close"): + _r.close() else: response = self.provider.chat_perform( @@ -2878,23 +2872,15 @@ def emit(text: str | Content): kwargs=all_kwargs, ) - partial_turn: AssistantTurn = AssistantTurn( - [], partial_reason="interrupted" - ) - self._turns.extend([user_turn, partial_turn]) - turn_idx = len(self._turns) - 1 + acc = TurnAccumulator(self._turns, controller) + acc.begin_turn(user_turn) try: result = None - # Note: this for/else relies on the fact that GeneratorExit - # (from gen.aclose()) is an exception, not a break — so it - # skips the else clause while still running the finally block. async for chunk in response: content = self.provider.stream_content(chunk) if content is not None: - self._turns[turn_idx].contents.append( - cast(ContentUnion, content) - ) + acc.update_turn(cast(ContentUnion, content)) text = content_text(content) if text: emit(text) @@ -2904,11 +2890,11 @@ def emit(text: str | Content): yield content else: yield text - if controller.cancelled: - break + if controller.cancelled: + break result = self.provider.stream_merge_chunks(result, chunk) - else: - # Normal completion — replace partial with full turn + + if not controller.cancelled: turn = self.provider.stream_turn( result, has_data_model=data_model is not None, @@ -2927,13 +2913,19 @@ def emit(text: str | Content): ) if turn.tokens is not None: tokens_log(self.provider, turn.tokens) - self._turns[turn_idx] = turn + acc.complete_turn(turn) finally: - final_turn = self._turns[turn_idx] - if isinstance(final_turn, AssistantTurn) and final_turn.is_partial: - if controller.cancelled: - final_turn.partial_reason = controller.reason - final_turn.contents = merge_content_text(final_turn.contents) + acc.finalize_turn() + # response type is AsyncIterable[Unknown]; hasattr guards runtime + # safety, cast(Any) needed because Pyright can't narrow through hasattr. + _r: Any = response + if hasattr(_r, "aclose"): + await _r.aclose() + elif hasattr(_r, "close"): + if inspect.iscoroutinefunction(_r.close): + await _r.close() + else: + _r.close() else: response = await self.provider.chat_perform_async( @@ -3413,22 +3405,6 @@ def content_text(content: Content) -> str: return str(content) -def merge_content_text(contents: Sequence[Content]) -> list[ContentUnion]: - """Merge adjacent ContentText (and ContentThinking) fragments.""" - if not contents: - return [] - merged: list[ContentUnion] = [cast(ContentUnion, contents[0])] - for item in contents[1:]: - last = merged[-1] - if isinstance(last, ContentText) and isinstance(item, ContentText): - merged[-1] = ContentText.model_construct(text=last.text + item.text) - elif isinstance(last, ContentThinking) and isinstance(item, ContentThinking): - merged[-1] = ContentThinking(thinking=last.thinking + item.thinking) - else: - merged.append(cast(ContentUnion, item)) - return merged - - def is_quarto(): return os.getenv("QUARTO_PYTHON", None) is not None diff --git a/tests/test_chat.py b/tests/test_chat.py index cf5b2e84..c191878c 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -696,7 +696,7 @@ def test_partial_turns_excluded_from_cost(): def test_merge_content_text(): - from chatlas._chat import merge_content_text + from chatlas._turn_accumulator import merge_content_text from chatlas._content import ContentText, ContentThinking # Adjacent ContentText fragments merge From 383eae1e898904efea881bab32fe4c8e9bf0cce5 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 14:39:55 -0500 Subject: [PATCH 16/22] style: auto-format with ruff Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 785bc374..c46f3b18 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -52,9 +52,9 @@ from ._provider import ModelInfo, Provider, StandardModelParams, SubmitInputArgsT from ._stream_controller import StreamController from ._tokens import tokens_log -from ._turn_accumulator import TurnAccumulator from ._tools import Tool, ToolBuiltIn, ToolRejectError from ._turn import AssistantTurn, SystemTurn, Turn, UserTurn, user_turn +from ._turn_accumulator import TurnAccumulator from ._typing_extensions import TypedDict, TypeGuard from ._utils import MISSING, MISSING_TYPE, html_escape, wrap_async @@ -1175,7 +1175,9 @@ def stream( data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, controller: StreamController | None = None, - ) -> Generator[str | ContentThinking | ContentToolRequest | ContentToolResult, None, None]: ... + ) -> Generator[ + str | ContentThinking | ContentToolRequest | ContentToolResult, None, None + ]: ... def stream( self, @@ -1185,7 +1187,9 @@ def stream( data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, controller: StreamController | None = None, - ) -> Generator[str | ContentThinking | ContentToolRequest | ContentToolResult, None, None]: + ) -> Generator[ + str | ContentThinking | ContentToolRequest | ContentToolResult, None, None + ]: """ Generate a response from the chat in a streaming fashion. @@ -1289,7 +1293,9 @@ async def stream_async( data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, controller: StreamController | None = None, - ) -> AsyncGenerator[str | ContentThinking | ContentToolRequest | ContentToolResult, None]: ... + ) -> AsyncGenerator[ + str | ContentThinking | ContentToolRequest | ContentToolResult, None + ]: ... async def stream_async( self, @@ -1299,7 +1305,9 @@ async def stream_async( data_model: Optional[type[BaseModel]] = None, kwargs: Optional[SubmitInputArgsT] = None, controller: StreamController | None = None, - ) -> AsyncGenerator[str | ContentThinking | ContentToolRequest | ContentToolResult, None]: + ) -> AsyncGenerator[ + str | ContentThinking | ContentToolRequest | ContentToolResult, None + ]: """ Generate a response from the chat in a streaming fashion asynchronously. @@ -2534,7 +2542,9 @@ def _chat_impl( kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, controller: StreamController | None = None, - ) -> Generator[str | ContentThinking | ContentToolRequest | ContentToolResult, None, None]: ... + ) -> Generator[ + str | ContentThinking | ContentToolRequest | ContentToolResult, None, None + ]: ... def _chat_impl( self, @@ -2613,7 +2623,9 @@ def _chat_impl_async( kwargs: Optional[SubmitInputArgsT] = None, data_model: Optional[type[BaseModel]] = None, controller: StreamController | None = None, - ) -> AsyncGenerator[str | ContentThinking | ContentToolRequest | ContentToolResult, None]: ... + ) -> AsyncGenerator[ + str | ContentThinking | ContentToolRequest | ContentToolResult, None + ]: ... async def _chat_impl_async( self, From a2e145aad14623e61da85864af72c11f398de346 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 15:02:49 -0500 Subject: [PATCH 17/22] refactor: extract resolve_assistant_turn to deduplicate turn finalization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four copies of the validate-type/compute-tokens/compute-cost/log pattern (sync/async × streaming/non-streaming) consolidated into one function. Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 63 +++++++++++++++--------------------------------- 1 file changed, 19 insertions(+), 44 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index c46f3b18..b3ae2366 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -2774,18 +2774,7 @@ def emit(text: str | Content): ) if echo == "all": emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost( - turn.completion, turn.tokens - ) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) + turn = resolve_assistant_turn(self.provider, turn) acc.complete_turn(turn) finally: acc.finalize_turn() @@ -2814,16 +2803,7 @@ def emit(text: str | Content): if echo == "all": emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost(turn.completion, turn.tokens) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) + turn = resolve_assistant_turn(self.provider, turn) self._turns.extend([user_turn, turn]) @overload @@ -2913,18 +2893,7 @@ def emit(text: str | Content): ) if echo == "all": emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost( - turn.completion, turn.tokens - ) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) + turn = resolve_assistant_turn(self.provider, turn) acc.complete_turn(turn) finally: acc.finalize_turn() @@ -2958,16 +2927,7 @@ def emit(text: str | Content): if echo == "all": emit_other_contents(turn, emit) - if not isinstance(turn, AssistantTurn): - raise TypeError( - f"Expected turn to be AssistantTurn, got {type(turn).__name__}" - ) - if turn.tokens is None and turn.completion: - turn.tokens = self.provider.value_tokens(turn.completion) - if turn.cost is None and turn.completion: - turn.cost = self.provider.value_cost(turn.completion, turn.tokens) - if turn.tokens is not None: - tokens_log(self.provider, turn.tokens) + turn = resolve_assistant_turn(self.provider, turn) self._turns.extend([user_turn, turn]) def _collect_all_kwargs( @@ -3417,6 +3377,21 @@ def content_text(content: Content) -> str: return str(content) +def resolve_assistant_turn(provider: Provider, turn: Turn) -> AssistantTurn: + """Validate turn type, compute tokens and cost, and log usage.""" + if not isinstance(turn, AssistantTurn): + raise TypeError( + f"Expected turn to be AssistantTurn, got {type(turn).__name__}" + ) + if turn.tokens is None and turn.completion: + turn.tokens = provider.value_tokens(turn.completion) + if turn.cost is None and turn.completion: + turn.cost = provider.value_cost(turn.completion, turn.tokens) + if turn.tokens is not None: + tokens_log(provider, turn.tokens) + return turn + + def is_quarto(): return os.getenv("QUARTO_PYTHON", None) is not None From 1fd01a3b39a73a1162087decd8b612a07d60e86e Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 2 Apr 2026 15:43:41 -0500 Subject: [PATCH 18/22] =?UTF-8?q?fix:=20address=20code=20review=20?= =?UTF-8?q?=E2=80=94=20auto-reset=20stale=20controllers,=20fix=20partial?= =?UTF-8?q?=20turn=20filtering?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add _ensure_ready() to StreamController that warns and auto-resets if already cancelled (aligns with ellmer's as_controller() behavior) - Add _as_controller() helper, replacing redundant StreamController() creation at 6 call sites with one consistent pattern - Widen TurnAccumulator.update_turn to accept Content, removing 2 cast sites and the ContentUnion import from _chat.py - Fix get_tokens() to filter partial turns at any position in history, not just trailing (aligns with ellmer's discard approach) Co-Authored-By: Claude Opus 4.6 --- chatlas/_chat.py | 50 ++++++++++++++++++--------------- chatlas/_stream_controller.py | 12 ++++++++ chatlas/_turn_accumulator.py | 9 ++++-- tests/test_chat.py | 20 +++++++++++++ tests/test_stream_controller.py | 23 +++++++++++++++ 5 files changed, 89 insertions(+), 25 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index b3ae2366..f3cf6cdc 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -37,7 +37,6 @@ ContentThinking, ContentToolRequest, ContentToolResult, - ContentUnion, ToolInfo, ) from ._display import ( @@ -402,13 +401,18 @@ def get_tokens(self) -> list[TokensDict]: turns = self.get_turns(include_system_prompt=False) - # Exclude trailing partial turn (and its preceding user turn) - while ( - len(turns) >= 2 - and isinstance(turns[-1], AssistantTurn) - and turns[-1].is_partial - ): - turns = turns[:-2] + # Exclude partial assistant turns and their preceding user turns + # (partial turns have no token data) + filtered: list[Turn] = [] + i = 0 + while i < len(turns): + next_turn = turns[i + 1] if i + 1 < len(turns) else None + if isinstance(next_turn, AssistantTurn) and next_turn.is_partial: + i += 2 # skip user + partial assistant pair + else: + filtered.append(turns[i]) + i += 1 + turns = filtered if len(turns) == 0: return [] @@ -1251,8 +1255,7 @@ class Person(BaseModel): display = self._markdown_display(echo=echo) - if controller is None: - controller = StreamController() + controller = _as_controller(controller) generator = self._chat_impl( turn, @@ -1374,8 +1377,7 @@ class Person(BaseModel): display = self._markdown_display(echo=echo) - if controller is None: - controller = StreamController() + controller = _as_controller(controller) async def wrapper() -> AsyncGenerator[ str | ContentThinking | ContentToolRequest | ContentToolResult, None @@ -2556,8 +2558,7 @@ def _chat_impl( data_model: Optional[type[BaseModel]] = None, controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: - if controller is None: - controller = StreamController() + controller = _as_controller(controller) user_turn_result: UserTurn | None = user_turn while user_turn_result is not None: @@ -2637,8 +2638,7 @@ async def _chat_impl_async( data_model: Optional[type[BaseModel]] = None, controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: - if controller is None: - controller = StreamController() + controller = _as_controller(controller) user_turn_result: UserTurn | None = user_turn while user_turn_result is not None: @@ -2719,8 +2719,7 @@ def _submit_turns( content_mode: Literal["text", "all"] = "text", controller: StreamController | None = None, ) -> Generator[str | Content, None, None]: - if controller is None: - controller = StreamController() + controller = _as_controller(controller) if any(isinstance(x, Tool) and x._is_async for x in self._tools.values()): raise ValueError("Cannot use async tools in a synchronous chat") @@ -2753,7 +2752,7 @@ def emit(text: str | Content): for chunk in response: content = self.provider.stream_content(chunk) if content is not None: - acc.update_turn(cast(ContentUnion, content)) + acc.update_turn(content) text = content_text(content) if text: emit(text) @@ -2841,8 +2840,7 @@ async def _submit_turns_async( content_mode: Literal["text", "all"] = "text", controller: StreamController | None = None, ) -> AsyncGenerator[str | Content, None]: - if controller is None: - controller = StreamController() + controller = _as_controller(controller) def emit(text: str | Content): self._echo_content(str(text)) @@ -2872,7 +2870,7 @@ def emit(text: str | Content): async for chunk in response: content = self.provider.stream_content(chunk) if content is not None: - acc.update_turn(cast(ContentUnion, content)) + acc.update_turn(content) text = content_text(content) if text: emit(text) @@ -3377,6 +3375,14 @@ def content_text(content: Content) -> str: return str(content) +def _as_controller(controller: StreamController | None) -> StreamController: + """Ensure a non-None, ready-to-use StreamController.""" + if controller is None: + return StreamController() + controller._ensure_ready() + return controller + + def resolve_assistant_turn(provider: Provider, turn: Turn) -> AssistantTurn: """Validate turn type, compute tokens and cost, and log usage.""" if not isinstance(turn, AssistantTurn): diff --git a/chatlas/_stream_controller.py b/chatlas/_stream_controller.py index 523e2c81..60e246d3 100644 --- a/chatlas/_stream_controller.py +++ b/chatlas/_stream_controller.py @@ -53,6 +53,18 @@ def reset(self) -> None: self._cancelled = False self._reason = None + def _ensure_ready(self) -> None: + """Auto-reset if already cancelled (prevents stale controller bugs).""" + if self._cancelled: + import warnings + + warnings.warn( + "StreamController was already cancelled — resetting automatically. " + "Call controller.reset() explicitly to avoid this warning.", + stacklevel=3, + ) + self.reset() + @property def cancelled(self) -> bool: """Whether the controller has been cancelled.""" diff --git a/chatlas/_turn_accumulator.py b/chatlas/_turn_accumulator.py index 9d56884d..2d54eb3c 100644 --- a/chatlas/_turn_accumulator.py +++ b/chatlas/_turn_accumulator.py @@ -1,6 +1,6 @@ from __future__ import annotations -from ._content import ContentText, ContentThinking, ContentUnion +from ._content import Content, ContentText, ContentThinking, ContentUnion from ._stream_controller import StreamController from ._turn import AssistantTurn, Turn, UserTurn @@ -51,11 +51,14 @@ def begin_turn(self, user_turn: UserTurn) -> None: self._turns.extend([user_turn, partial]) self._turn_idx = len(self._turns) - 1 - def update_turn(self, content: ContentUnion) -> None: + def update_turn(self, content: Content) -> None: """Append streamed content to the partial turn.""" if self._turn_idx is None: raise RuntimeError("update_turn called before begin_turn") - self._turns[self._turn_idx].contents.append(content) + # Content is the base class; contents is typed as list[ContentUnion] + # (discriminated union). At runtime all Content subclasses are ContentUnion + # members, so the append is safe. + self._turns[self._turn_idx].contents.append(content) # type: ignore[arg-type] def complete_turn(self, turn: AssistantTurn) -> None: """Replace the partial turn with the completed turn (no-op if cancelled).""" diff --git a/tests/test_chat.py b/tests/test_chat.py index c191878c..2a570d9f 100644 --- a/tests/test_chat.py +++ b/tests/test_chat.py @@ -695,6 +695,26 @@ def test_partial_turns_excluded_from_cost(): assert len(tokens) == 2 # user + assistant from the complete turn only +def test_partial_turns_excluded_from_tokens_mid_conversation(): + """Partial turns in the middle of history (not just trailing) are excluded.""" + chat = ChatOpenAI() + chat.set_turns( + [ + UserTurn("hello"), + AssistantTurn("partial", partial_reason="interrupted"), + UserTurn("retry"), + AssistantTurn("response", tokens=(10, 5, 0), cost=0.001), + ] + ) + # Cost should only include the complete turn + cost = chat.get_cost() + assert cost == 0.001 + + # get_tokens should skip the partial pair entirely + tokens = chat.get_tokens() + assert len(tokens) == 2 # user + assistant from the complete turn only + + def test_merge_content_text(): from chatlas._turn_accumulator import merge_content_text from chatlas._content import ContentText, ContentThinking diff --git a/tests/test_stream_controller.py b/tests/test_stream_controller.py index f20f7423..7aad2fd0 100644 --- a/tests/test_stream_controller.py +++ b/tests/test_stream_controller.py @@ -1,3 +1,5 @@ +import warnings + import pytest from chatlas import ChatOpenAI, StreamController @@ -31,6 +33,27 @@ def test_stream_controller_reset(): assert ctrl.reason is None +def test_stream_controller_ensure_ready_warns_and_resets(): + ctrl = StreamController() + ctrl.cancel(reason="stale") + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + ctrl._ensure_ready() + assert len(w) == 1 + assert "already cancelled" in str(w[0].message) + assert ctrl.cancelled is False + assert ctrl.reason is None + + +def test_stream_controller_ensure_ready_noop_when_not_cancelled(): + ctrl = StreamController() + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + ctrl._ensure_ready() + assert len(w) == 0 + assert ctrl.cancelled is False + + @pytest.mark.vcr def test_stream_cancel_after_chunks(): chat = ChatOpenAI() From a63a0b8735ed19e3060ba4d489df4124266345f2 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 7 May 2026 20:03:15 -0500 Subject: [PATCH 19/22] refactor: move streaming content logic into TurnAccumulator Extract thinking-delta phase tracking and content emit/yield logic from the duplicated sync/async streaming loops into TurnAccumulator.process_content() and flush_thinking(). Also add cancellation section to the streaming docs. --- chatlas/_chat.py | 78 +++-------------------------- chatlas/_turn_accumulator.py | 97 +++++++++++++++++++++++++++++++----- docs/get-started/stream.qmd | 22 ++++++++ 3 files changed, 112 insertions(+), 85 deletions(-) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index c13e0242..12f5c11c 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -34,7 +34,6 @@ Content, ContentJson, ContentText, - ContentThinking, ContentThinkingDelta, ContentToolRequest, ContentToolResult, @@ -2752,41 +2751,15 @@ def emit(text: str | Content): try: result = None - inside_thinking = False - for chunk in response: if controller.cancelled: break content = self.provider.stream_content(chunk) if content is not None: - acc.update_turn(content) - if is_thinking_delta(content) and not inside_thinking: - content = ContentThinkingDelta( - thinking=content.thinking, phase="start" - ) - emit("\n") - inside_thinking = True - elif not is_thinking_delta(content) and inside_thinking: - emit("\n\n\n") - if content_mode == "all": - yield ContentThinkingDelta(thinking="", phase="end") - inside_thinking = False - - if is_thinking_delta(content): - emit(content.thinking) - if content_mode == "all": - yield content - else: - text = content_text(content) - if text: - emit(text) - yield text + yield from acc.process_content(content, content_mode, emit) result = self.provider.stream_merge_chunks(result, chunk) - if inside_thinking: - emit("\n\n\n") - if content_mode == "all": - yield ContentThinkingDelta(thinking="", phase="end") + yield from acc.flush_thinking(content_mode, emit) if not controller.cancelled: turn = self.provider.stream_turn( @@ -2887,41 +2860,17 @@ def emit(text: str | Content): try: result = None - inside_thinking = False - async for chunk in response: if controller.cancelled: break content = self.provider.stream_content(chunk) if content is not None: - acc.update_turn(content) - if is_thinking_delta(content) and not inside_thinking: - content = ContentThinkingDelta( - thinking=content.thinking, phase="start" - ) - emit("\n") - inside_thinking = True - elif not is_thinking_delta(content) and inside_thinking: - emit("\n\n\n") - if content_mode == "all": - yield ContentThinkingDelta(thinking="", phase="end") - inside_thinking = False - - if is_thinking_delta(content): - emit(content.thinking) - if content_mode == "all": - yield content - else: - text = content_text(content) - if text: - emit(text) - yield text + for item in acc.process_content(content, content_mode, emit): + yield item result = self.provider.stream_merge_chunks(result, chunk) - if inside_thinking: - emit("\n\n\n") - if content_mode == "all": - yield ContentThinkingDelta(thinking="", phase="end") + for item in acc.flush_thinking(content_mode, emit): + yield item if not controller.cancelled: turn = self.provider.stream_turn( @@ -3403,21 +3352,6 @@ class ToolFailureWarning(RuntimeWarning): warnings.simplefilter("always", ToolFailureWarning) -def is_thinking_delta(content: Content) -> TypeGuard[ContentThinkingDelta]: - return isinstance(content, ContentThinkingDelta) - - -def content_text(content: Content) -> str: - """Extract displayable text from a Content object.""" - if isinstance(content, ContentThinkingDelta): - return content.thinking - if isinstance(content, ContentThinking): - return content.thinking - if isinstance(content, ContentText): - return content.text - return str(content) - - def _as_controller(controller: StreamController | None) -> StreamController: """Ensure a non-None, ready-to-use StreamController.""" if controller is None: diff --git a/chatlas/_turn_accumulator.py b/chatlas/_turn_accumulator.py index 8ed0f7cd..c081ba68 100644 --- a/chatlas/_turn_accumulator.py +++ b/chatlas/_turn_accumulator.py @@ -1,6 +1,14 @@ from __future__ import annotations -from ._content import Content, ContentText, ContentThinking, ContentUnion +from typing import Callable, Literal, Sequence + +from ._content import ( + Content, + ContentText, + ContentThinking, + ContentThinkingDelta, + ContentUnion, +) from ._stream_controller import StreamController from ._turn import AssistantTurn, Turn, UserTurn @@ -28,12 +36,14 @@ class TurnAccumulator: """ Manages the lifecycle of one streaming assistant turn. - Mirrors ellmer's TurnAccumulator R6 class. The four stages are: + Mirrors ellmer's TurnAccumulator R6 class. The stages are: 1. ``begin_turn(user_turn)`` — insert user + partial assistant into turns - 2. ``update_turn(content)`` — append streamed content to the partial turn - 3. ``complete_turn(turn)`` — replace partial with the full turn (skipped if cancelled) - 4. ``finalize_turn()`` — called from ``finally``; merges text fragments + 2. ``process_content(content, ...)`` — append content, handle thinking + phase boundaries, emit display text, and return items to yield + 3. ``flush_thinking(...)`` — emit closing thinking tags after the loop + 4. ``complete_turn(turn)`` — replace partial with the full turn (skipped if cancelled) + 5. ``finalize_turn()`` — called from ``finally``; merges text fragments and stamps the cancellation reason if the turn is still partial """ @@ -45,6 +55,7 @@ def __init__( self._turns = turns self._controller = controller self._turn_idx: int | None = None + self._inside_thinking: bool = False def begin_turn(self, user_turn: UserTurn) -> None: """Insert user turn and a partial assistant placeholder.""" @@ -52,14 +63,54 @@ def begin_turn(self, user_turn: UserTurn) -> None: self._turns.extend([user_turn, partial]) self._turn_idx = len(self._turns) - 1 - def update_turn(self, content: Content) -> None: - """Append streamed content to the partial turn.""" - if self._turn_idx is None: - raise RuntimeError("update_turn called before begin_turn") - # Content is the base class; contents is typed as list[ContentUnion] - # (discriminated union). At runtime all Content subclasses are ContentUnion - # members, so the append is safe. - self._turns[self._turn_idx].contents.append(content) # type: ignore[arg-type] + def process_content( + self, + content: Content, + content_mode: Literal["text", "all"], + emit: Callable[[str | Content], None], + ) -> Sequence[str | Content]: + """Append content to the turn, emit display text, return items to yield.""" + self._update_turn(content) + + items: list[str | Content] = [] + + if isinstance(content, ContentThinkingDelta) and not self._inside_thinking: + content = ContentThinkingDelta( + thinking=content.thinking, phase="start" + ) + emit("\n") + self._inside_thinking = True + elif not isinstance(content, ContentThinkingDelta) and self._inside_thinking: + emit("\n\n\n") + if content_mode == "all": + items.append(ContentThinkingDelta(thinking="", phase="end")) + self._inside_thinking = False + + if isinstance(content, ContentThinkingDelta): + emit(content.thinking) + if content_mode == "all": + items.append(content) + else: + text = _content_text(content) + if text: + emit(text) + items.append(text) + + return items + + def flush_thinking( + self, + content_mode: Literal["text", "all"], + emit: Callable[[str | Content], None], + ) -> Sequence[str | Content]: + """Emit closing thinking tags if the stream ended mid-thinking.""" + if not self._inside_thinking: + return [] + self._inside_thinking = False + emit("\n\n\n") + if content_mode == "all": + return [ContentThinkingDelta(thinking="", phase="end")] + return [] def complete_turn(self, turn: AssistantTurn) -> None: """Replace the partial turn with the completed turn (no-op if cancelled).""" @@ -85,3 +136,23 @@ def finalize_turn(self) -> None: if self._controller.cancelled: turn.partial_reason = self._controller.reason turn.contents = merge_content_text(turn.contents) + + def _update_turn(self, content: Content) -> None: + """Append streamed content to the partial turn.""" + if self._turn_idx is None: + raise RuntimeError("_update_turn called before begin_turn") + # Content is the base class; contents is typed as list[ContentUnion] + # (discriminated union). At runtime all Content subclasses are ContentUnion + # members, so the append is safe. + self._turns[self._turn_idx].contents.append(content) # type: ignore[arg-type] + + +def _content_text(content: Content) -> str: + """Extract displayable text from a Content object.""" + if isinstance(content, ContentThinkingDelta): + return content.thinking + if isinstance(content, ContentThinking): + return content.thinking + if isinstance(content, ContentText): + return content.text + return str(content) diff --git a/docs/get-started/stream.qmd b/docs/get-started/stream.qmd index f5da3b62..228e37cc 100644 --- a/docs/get-started/stream.qmd +++ b/docs/get-started/stream.qmd @@ -78,6 +78,28 @@ for chunk in stream: ``` +## Cancelling a stream + +Use a [](`~chatlas.StreamController`) to stop a stream early while preserving what's been generated so far. Pass a controller to `.stream()`, then call `controller.cancel()` when you want to stop. + +```python +from chatlas import StreamController + +ctrl = StreamController() + +chunks = [] +for chunk in chat.stream("Write a long story", controller=ctrl): + chunks.append(chunk) + if len(chunks) >= 5: + ctrl.cancel() + +# The partial response is preserved in history +last_turn = chat.get_last_turn() +print(last_turn.text) +``` + +This is useful for building UIs with a "stop generating" button (e.g., in a [Shiny chatbot](chatbots.qmd)). The same controller can be reused across multiple streams by calling `controller.reset()` before starting a new stream. + ## Wrapping generators Sometimes it's useful to wrap the `.stream()` generator up into another generator function. From 5cb333d0b77f57ef71ddf82f95da738f71b75bcc Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 7 May 2026 20:21:48 -0500 Subject: [PATCH 20/22] docs: explain why streaming responses need explicit close() --- chatlas/_chat.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/chatlas/_chat.py b/chatlas/_chat.py index 12f5c11c..abd90292 100644 --- a/chatlas/_chat.py +++ b/chatlas/_chat.py @@ -2772,6 +2772,10 @@ def emit(text: str | Content): acc.complete_turn(turn) finally: acc.finalize_turn() + # Breaking out of `for chunk in response` doesn't close the + # provider's underlying HTTP connection — only the temporary + # iterator is released. Explicitly close so connections aren't + # held until GC collects the Stream object. _r: Any = response if hasattr(_r, "close"): _r.close() @@ -2883,6 +2887,9 @@ def emit(text: str | Content): acc.complete_turn(turn) finally: acc.finalize_turn() + # Same as sync path above, but async generator finalization + # (PEP 525) is even less deterministic — abandoned generators + # may not close until event loop shutdown. _r: Any = response if hasattr(_r, "aclose"): await _r.aclose() From 1602000996dc16922d86f34baecea8c418b50cd2 Mon Sep 17 00:00:00 2001 From: Carson Date: Thu, 7 May 2026 20:24:19 -0500 Subject: [PATCH 21/22] chore: move stream cancellation changelog entries to unreleased --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3fc90f2..1b9cfab8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### New features * `ChatOpenAICompletions()` (and providers built on it like `ChatDeepSeek`, `ChatOpenRouter`, etc.) now extracts `reasoning_content` from model responses as `ContentThinking` objects. A new `preserve_thinking` parameter controls whether reasoning content is sent back to the API in multi-turn conversations; it defaults to `False` but is set to `True` for `ChatDeepSeek` (required for V4 tool-calling) and `ChatOpenRouter` (recommended for quality). (#295) +* New `StreamController` class for cooperative stream cancellation. Pass a controller to `.stream()` or `.stream_async()` and call `controller.cancel()` to stop the stream cleanly (e.g., from a Shiny "stop generating" button). The partial response is preserved in conversation history. (#279) +* When a stream is interrupted (closed early, cancelled, or errors), the accumulated content is now saved as a partial `AssistantTurn` so conversation state isn't lost. Partial turns display `[interrupted]` (or the cancellation reason) in the `Chat` repr and are excluded from token/cost accounting. (#279) ### Improvements @@ -34,8 +36,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * New `ChatLMStudio()` provider for chatting with local models via [LM Studio](https://lmstudio.ai). (#280) * The `.stream()` and `.stream_async()` methods now yield `ContentThinking` objects (instead of plain strings) for thinking/reasoning content when `content="all"`. This allows downstream packages like shinychat to provide specific UI for thinking content. (#276) -* New `StreamController` class for cooperative stream cancellation. Pass a controller to `.stream()` or `.stream_async()` and call `controller.cancel()` to stop the stream cleanly (e.g., from a Shiny "stop generating" button). The partial response is preserved in conversation history. (#279) -* When a stream is interrupted (closed early, cancelled, or errors), the accumulated content is now saved as a partial `AssistantTurn` so conversation state isn't lost. Partial turns display `[interrupted]` (or the cancellation reason) in the `Chat` repr and are excluded from token/cost accounting. (#279) * Built-in tools (`tool_web_search()`, `tool_web_fetch()`) now include `description` and `annotations` properties, making their metadata consistent with user-defined tools created by `Tool()`. (#278) ### Bug fixes From 5b66d8dfcdfae2352d6da053f8c471b465735ee8 Mon Sep 17 00:00:00 2001 From: Carson Date: Fri, 8 May 2026 11:07:15 -0500 Subject: [PATCH 22/22] fix: use private _update_turn in TurnAccumulator tests --- tests/test_turn_accumulator.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_turn_accumulator.py b/tests/test_turn_accumulator.py index c3426917..3a28f6d9 100644 --- a/tests/test_turn_accumulator.py +++ b/tests/test_turn_accumulator.py @@ -1,4 +1,4 @@ -from chatlas._content import ContentText, ContentThinking +from chatlas._content import ContentText from chatlas._turn import AssistantTurn, UserTurn from chatlas._turn_accumulator import TurnAccumulator from chatlas._stream_controller import StreamController @@ -22,7 +22,7 @@ def test_update_turn_appends_content(): acc = TurnAccumulator(turns, controller) acc.begin_turn(UserTurn("hello")) content = ContentText.model_construct(text="hi") - acc.update_turn(content) + acc._update_turn(content) assert len(turns[1].contents) == 1 assert turns[1].contents[0].text == "hi" @@ -54,8 +54,8 @@ def test_finalize_turn_merges_text_and_sets_reason(): controller = StreamController() acc = TurnAccumulator(turns, controller) acc.begin_turn(UserTurn("hello")) - acc.update_turn(ContentText.model_construct(text="a")) - acc.update_turn(ContentText.model_construct(text="b")) + acc._update_turn(ContentText.model_construct(text="a")) + acc._update_turn(ContentText.model_construct(text="b")) acc.finalize_turn() assert turns[1].is_partial assert turns[1].partial_reason == "interrupted" @@ -68,7 +68,7 @@ def test_finalize_turn_uses_controller_reason(): controller = StreamController() acc = TurnAccumulator(turns, controller) acc.begin_turn(UserTurn("hello")) - acc.update_turn(ContentText.model_construct(text="partial")) + acc._update_turn(ContentText.model_construct(text="partial")) controller.cancel(reason="user stopped") acc.finalize_turn() assert turns[1].partial_reason == "user stopped"