feat: partial turn preservation and cooperative stream cancellation#279
Open
Add partial_reason field and is_partial property to AssistantTurn for marking incomplete turns on stream interruption. Add merge_content_text() helper to combine adjacent ContentText/ContentThinking fragments. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
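As a rough illustration of the fragment-merging idea, the sketch below collapses runs of adjacent text (or adjacent thinking) fragments and leaves everything else alone. `Text`, `Thinking`, `ToolRequest`, and `merge_adjacent` are hypothetical stand-ins written for this example; they are not the chatlas types or the actual `merge_content_text()` implementation.

```python
from dataclasses import dataclass


@dataclass
class Text:
    """Hypothetical stand-in for ContentText."""
    text: str


@dataclass
class Thinking:
    """Hypothetical stand-in for ContentThinking."""
    thinking: str


@dataclass
class ToolRequest:
    """Hypothetical stand-in for any non-text content type."""
    name: str


def merge_adjacent(contents: list) -> list:
    """Concatenate runs of adjacent Text (or adjacent Thinking) fragments."""
    merged: list = []
    for item in contents:
        prev = merged[-1] if merged else None
        if isinstance(item, Text) and isinstance(prev, Text):
            # Build a new object so the caller's fragments are not mutated.
            merged[-1] = Text(prev.text + item.text)
        elif isinstance(item, Thinking) and isinstance(prev, Thinking):
            merged[-1] = Thinking(prev.thinking + item.thinking)
        else:
            # A different type (or the first item) starts a new run.
            merged.append(item)
    return merged
```

Fragments separated by another content type are deliberately not merged, so the original ordering of the turn is preserved.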
Restructure _submit_turns and _submit_turns_async to eagerly append a partial AssistantTurn to self._turns before streaming begins. On each chunk, content is appended to the partial turn in-place. On normal completion, the partial turn is replaced with the full turn. On interruption (GeneratorExit, KeyboardInterrupt, CancelledError), the finally block merges adjacent content fragments via merge_content_text(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
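The eager-append plus try/finally pattern described in this commit could look roughly like the following. This is a minimal sketch with plain strings as chunks; `Turn` and `stream_into` are hypothetical names, and the real `_submit_turns` works on provider responses and content objects rather than strings.

```python
from dataclasses import dataclass, field
from typing import Iterable, Iterator, Optional


@dataclass
class Turn:
    """Hypothetical stand-in for AssistantTurn."""
    chunks: list = field(default_factory=list)
    partial_reason: Optional[str] = None

    @property
    def is_partial(self) -> bool:
        return self.partial_reason is not None


def stream_into(turns: list, source: Iterable[str]) -> Iterator[str]:
    # Append the turn before streaming so history always holds what arrived.
    turn = Turn(partial_reason="interrupted")
    index = len(turns)
    turns.append(turn)
    completed = False
    try:
        for chunk in source:
            turn.chunks.append(chunk)  # update the partial turn in place
            yield chunk
        completed = True
    finally:
        if completed:
            # Normal completion: replace the partial turn with a full one.
            turns[index] = Turn(chunks=list(turn.chunks))
        else:
            # GeneratorExit, KeyboardInterrupt, etc.: collapse the fragments.
            turn.chunks = ["".join(turn.chunks)]
```

Closing the generator early (for example with `gen.close()`) raises `GeneratorExit` at the suspended `yield`, so the `finally` block runs and the history keeps whatever was received.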
Partial turns (from interrupted streams) have no token or cost data. Filter them out in get_cost() and get_tokens() to avoid errors. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
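A sketch of the filtering, assuming a simplified turn that stores `(input, output)` token counts. `Turn` and `total_tokens` are hypothetical and only illustrate skipping partial turns wherever they sit in the history; they do not reproduce the `get_tokens()` or `get_cost()` signatures.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Turn:
    """Hypothetical stand-in for AssistantTurn."""
    tokens: Optional[Tuple[int, int]] = None  # (input, output)
    partial_reason: Optional[str] = None


def total_tokens(turns: list) -> Tuple[int, int]:
    """Sum (input, output) tokens, skipping partial turns at any position."""
    complete = [
        t for t in turns
        if t.partial_reason is None and t.tokens is not None
    ]
    return (
        sum(t.tokens[0] for t in complete),
        sum(t.tokens[1] for t in complete),
    )
```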
Partial assistant turns now show their partial_reason (e.g. [interrupted]) instead of token counts. Token/cost totals in the Chat header exclude partial turns. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Cast Content to ContentUnion for list append compatibility and merge_content_text results. Use isinstance check in finally block instead of accessing is_partial on Turn base type. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
StreamController provides a simple cancel/reset/cancelled/reason API for cooperatively cancelling streaming responses. Exported from chatlas. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
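For readers unfamiliar with the pattern, a cooperative cancellation flag with a cancel/reset/cancelled/reason surface might be sketched as below. `SketchController` is a hypothetical class for illustration, and the default reason string is an assumption; see `chatlas/_stream_controller.py` for the actual implementation.

```python
from typing import Optional


class SketchController:
    """Illustrative cooperative-cancellation flag; not the chatlas class."""

    def __init__(self) -> None:
        self._cancelled = False
        self._reason: Optional[str] = None

    def cancel(self, reason: str = "cancelled") -> None:
        # Store the reason before flipping the flag, so a reader that
        # observes cancelled == True also observes the reason.
        self._reason = reason
        self._cancelled = True

    def reset(self) -> None:
        self._cancelled = False
        self._reason = None

    @property
    def cancelled(self) -> bool:
        return self._cancelled

    @property
    def reason(self) -> Optional[str]:
        return self._reason
```

The controller never interrupts anything itself; the streaming loop is expected to poll `cancelled` and stop on its own, which is what makes the cancellation cooperative.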
Thread StreamController through stream → _chat_impl → _submit_turns (and async equivalents). When controller.cancelled is True, the streaming loop breaks and the partial turn's reason is set from the controller. Also skips tool invocation when cancelled. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
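The loop-level behaviour described here (break when cancelled, take the reason from the controller, skip tool invocation) can be sketched as follows. `consume` is a hypothetical function, and the controller is modelled with a `SimpleNamespace` that carries only `cancelled` and `reason` attributes.

```python
from types import SimpleNamespace
from typing import Callable, Iterable, List, Optional, Tuple


def consume(
    chunks: Iterable[str],
    controller: SimpleNamespace,
    on_chunk: Callable[[str], None],
) -> Tuple[List[str], Optional[str], bool]:
    """Return (received chunks, partial reason, whether tools would run)."""
    received: List[str] = []
    cancelled = False
    for chunk in chunks:
        # Check first, so a cancel issued while handling the previous
        # chunk takes effect before any further work is done.
        if controller.cancelled:
            cancelled = True
            break
        received.append(chunk)
        on_chunk(chunk)
    partial_reason = controller.reason if cancelled else None
    run_tools = not cancelled  # skip tool invocation when cancelled
    return received, partial_reason, run_tools
```

In this sketch a cancel that arrives after the final chunk goes unnoticed, because the flag is only polled at the start of each iteration.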
Both chat() and chat_async() now create an internal StreamController and thread it through _chat_impl. This ensures the try/finally partial turn machinery is always active, even for non-streaming chat calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Capture all content types (not just text) in partial turns so ContentToolRequest etc. aren't silently dropped on interruption
- Default-create StreamController when none provided, eliminating all `if controller is not None` guards
- Add comments explaining for/else + GeneratorExit interaction
- Add thread-safety comment on StreamController.cancel() ordering
- Return list[ContentUnion] from merge_content_text to avoid casts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Introduces TurnAccumulator in chatlas/_turn_accumulator.py mirroring ellmer's R6 class, along with merge_content_text helper and full test coverage in tests/test_turn_accumulator.py. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace assert with RuntimeError for precondition checks
- Narrow update_turn param to ContentUnion (removes cast)
- Use model_construct for ContentThinking merge (consistency)
- Remove unused ContentToolRequest import from tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ator Delegates partial-turn lifecycle management to TurnAccumulator, replacing the inline for/else + partial turn index tracking with clean begin/update/complete/finalize calls. Also closes the HTTP response in finally, drops the local merge_content_text (now in _turn_accumulator.py), and updates the test import accordingly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tion Four copies of the validate-type/compute-tokens/compute-cost/log pattern (sync/async × streaming/non-streaming) consolidated into one function. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…turn filtering

- Add _ensure_ready() to StreamController that warns and auto-resets if already cancelled (aligns with ellmer's as_controller() behavior)
- Add _as_controller() helper, replacing redundant StreamController() creation at 6 call sites with one consistent pattern
- Widen TurnAccumulator.update_turn to accept Content, removing 2 cast sites and the ContentUnion import from _chat.py
- Fix get_tokens() to filter partial turns at any position in history, not just trailing (aligns with ellmer's discard approach)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
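The warn-and-reset behaviour and the single normalising helper might look something like this. It is a sketch under assumptions: `SketchController`, `ensure_ready`, and `as_controller` are hypothetical names standing in for the private helpers mentioned above, and the warning text is invented for the example.

```python
import warnings
from typing import Optional


class SketchController:
    """Illustrative controller; not the chatlas StreamController."""

    def __init__(self) -> None:
        self.cancelled = False
        self.reason: Optional[str] = None

    def cancel(self, reason: str = "cancelled") -> None:
        self.reason = reason
        self.cancelled = True

    def ensure_ready(self) -> None:
        # Reusing an already-cancelled controller would stop the next
        # stream immediately, so warn and clear the stale state.
        if self.cancelled:
            warnings.warn(
                "Controller was already cancelled; resetting it.",
                stacklevel=2,
            )
            self.cancelled = False
            self.reason = None


def as_controller(
    controller: Optional[SketchController] = None,
) -> SketchController:
    """Return a usable controller, creating one when none is given."""
    if controller is None:
        return SketchController()
    controller.ensure_ready()
    return controller
```

Routing every entry point through one helper means callers never need an `if controller is not None` guard.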
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (2)
chatlas/_chat.py:1271
`stream()` wraps `_chat_impl()` but doesn't explicitly close the underlying generator if the caller closes the wrapper early. Because the partial-turn preservation relies on generator finalization (`finally` in `_submit_turns`), it's safer to ensure `generator.close()` is called in a `finally` block inside `wrapper()` so the partial turn (and provider response) are finalized deterministically (especially on non-refcounted Python implementations).
controller=controller,
)
def wrapper() -> Generator[
str | ContentThinking | ContentToolRequest | ContentToolResult, None, None
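The suggestion above amounts to something like the following sketch, in which a wrapper generator closes the generator it wraps in a `finally` block. `inner`, `wrapper`, and `events` are hypothetical names for this example and do not mirror the chatlas code.

```python
from typing import Generator, List

events: List[str] = []


def inner() -> Generator[str, None, None]:
    try:
        yield "a"
        yield "b"
    finally:
        # Stands in for partial-turn finalization and transport cleanup.
        events.append("inner finalized")


def wrapper(gen: Generator[str, None, None]) -> Generator[str, None, None]:
    try:
        for chunk in gen:
            yield chunk
    finally:
        # Close the wrapped generator explicitly rather than relying on
        # garbage collection; closing a finished generator is a no-op.
        gen.close()
```

With the explicit `close()`, the inner `finally` runs as soon as the wrapper is closed, even if another reference to the inner generator is still alive.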
chatlas/_chat.py:1386
Similar to the sync path: `stream_async()`'s wrapper doesn't explicitly ensure the underlying async generator is closed when the wrapper is closed early. Adding a `try`/`finally` that awaits the inner generator's `aclose()` (when available) would make partial-turn preservation and transport cleanup deterministic.
controller = _as_controller(controller)
async def wrapper() -> AsyncGenerator[
str | ContentThinking | ContentToolRequest | ContentToolResult, None
]:
with display:
async for chunk in self._chat_impl_async(
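An async counterpart of the suggestion could be sketched as below, awaiting `aclose()` on the wrapped async generator in a `finally` block. As before, `inner`, `wrapper`, `main`, and `events` are hypothetical names used only for illustration.

```python
import asyncio
from typing import AsyncGenerator, List

events: List[str] = []


async def inner() -> AsyncGenerator[str, None]:
    try:
        yield "a"
        yield "b"
    finally:
        # Stands in for partial-turn finalization and transport cleanup.
        events.append("inner finalized")


async def wrapper(
    gen: AsyncGenerator[str, None],
) -> AsyncGenerator[str, None]:
    try:
        async for chunk in gen:
            yield chunk
    finally:
        # Awaiting is allowed during cleanup; yielding here would not be.
        await gen.aclose()


async def main() -> List[str]:
    w = wrapper(inner())
    first = await w.__anext__()  # consume one chunk, then stop early
    await w.aclose()
    return [first, *events]


result = asyncio.run(main())
```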
cpsievert
commented
Apr 2, 2026
Resolve conflicts integrating ContentThinkingDelta streaming changes from main with stream cancellation/partial turn preservation. Also address PR feedback:

- Move cancellation check to top of loop iteration for responsiveness
- Move warnings import to top-level in _stream_controller.py
- Preserve extra metadata when merging ContentThinking fragments
- Update VCR cassettes for new default model (gpt-5.4)
Extract thinking-delta phase tracking and content emit/yield logic from the duplicated sync/async streaming loops into TurnAccumulator.process_content() and flush_thinking(). Also add cancellation section to the streaming docs.
Summary
PR 2 in the streaming improvements series (after #276). Python port of ellmer's tidyverse/ellmer#951. Adds:
- Partial turn preservation: content streamed before an interruption is kept as an `AssistantTurn` with `partial_reason` set, so conversation state isn't lost
- `StreamController`: A cooperative cancellation mechanism for `stream()` and `stream_async()`; callers can request the stream stop cleanly via `controller.cancel()`, which triggers the partial turn preservation path
- Partial turns show `[interrupted]` in the Chat repr; partial turns are excluded from token accounting and cost calculations

Changes
- `chatlas/_turn.py`: Added `partial_reason` field to `AssistantTurn` and `merge_content_text` helper
- `chatlas/_stream_controller.py`: New `StreamController` class for cooperative cancellation
- `chatlas/_chat.py`: `stream()`/`stream_async()` accept optional `controller` parameter; `_submit_turns`/`_submit_turns_async` wrap streaming in try/finally to preserve partial turns on interruption
- `chatlas/__init__.py`: Export `StreamController`

Test plan
- `make check-types` passes (0 errors)
- `[interrupted]` display in Chat repr