From 1e5dc542b306f091f90346c5f4a380f4152693b1 Mon Sep 17 00:00:00 2001 From: yaowubarbara <113857460+yaowubarbara@users.noreply.github.com> Date: Tue, 17 Feb 2026 22:22:40 +0800 Subject: [PATCH 1/3] feat: use StreamingChunk.reasoning field for Anthropic thinking content Migrate Anthropic streaming to use the dedicated reasoning field instead of storing thinking content in StreamingChunk.meta. --- .../generators/anthropic/chat/utils.py | 114 ++++++++++-------- integrations/anthropic/tests/test_utils.py | 6 + 2 files changed, 68 insertions(+), 52 deletions(-) diff --git a/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py b/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py index e992d3a5fd..6708077e6f 100644 --- a/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py +++ b/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py @@ -329,6 +329,7 @@ def _convert_anthropic_chunk_to_streaming_chunk( """ content = "" tool_calls = [] + reasoning = None start = False finish_reason = None index = getattr(chunk, "index", None) @@ -348,6 +349,12 @@ def _convert_anthropic_chunk_to_streaming_chunk( tool_name=chunk.content_block.name, ) ) + elif chunk.content_block.type == "thinking": + reasoning = ReasoningContent(reasoning_text="") + elif chunk.content_block.type == "redacted_thinking": + reasoning = ReasoningContent( + reasoning_text="", extra={"redacted_thinking": getattr(chunk.content_block, "data", "")} + ) # delta of a content block elif chunk.type == "content_block_delta": @@ -355,6 +362,10 @@ def _convert_anthropic_chunk_to_streaming_chunk( content = chunk.delta.text elif chunk.delta.type == "input_json_delta": tool_calls.append(ToolCallDelta(index=tool_call_index, arguments=chunk.delta.partial_json)) + elif chunk.delta.type == "thinking_delta": + reasoning = ReasoningContent(reasoning_text=chunk.delta.thinking) + elif chunk.delta.type == "signature_delta": + reasoning = ReasoningContent(reasoning_text="", extra={"signature": chunk.delta.signature}) # end of streaming message elif chunk.type == "message_delta": @@ -369,6 +380,7 @@ def _convert_anthropic_chunk_to_streaming_chunk( start=start, finish_reason=finish_reason, tool_calls=tool_calls if tool_calls else None, + reasoning=reasoning, meta=meta, ) @@ -377,79 +389,56 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten """ Process reasoning contents from a list of StreamingChunk objects into the Anthropic expected format. - :param chunks: List of StreamingChunk objects potentially containing reasoning contents. + Reads reasoning data from the dedicated StreamingChunk.reasoning field. - :returns: List of Anthropic formatted reasoning content dictionaries + :param chunks: List of StreamingChunk objects potentially containing reasoning contents. + :returns: A ReasoningContent object combining all reasoning chunks, or None if no reasoning was found. """ - formatted_reasoning_contents = [] + formatted_reasoning_contents: list[dict[str, Any]] = [] current_index = None content_block_text = "" content_block_signature = None content_block_redacted_thinking = None - content_block_index = None for chunk in chunks: - if (delta := chunk.meta.get("delta")) is not None: - if delta.get("type") == "thinking_delta" and delta.get("thinking") is not None: - content_block_index = chunk.meta.get("index", None) - if (content_block := chunk.meta.get("content_block")) is not None and content_block.get( - "type" - ) == "redacted_thinking": - content_block_index = chunk.meta.get("index", None) + if chunk.reasoning is None: + continue # Start new group when index changes - if current_index is not None and content_block_index != current_index: - # Finalize current group - if content_block_text: - formatted_reasoning_contents.append( - { - "reasoning_content": { - "reasoning_text": {"text": content_block_text, "signature": content_block_signature}, - } - } - ) - if content_block_redacted_thinking: - formatted_reasoning_contents.append( - {"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}} - ) - - # Reset accumulators for new group + if current_index is not None and chunk.index != current_index: + _finalize_reasoning_group( + formatted_reasoning_contents, + content_block_text, + content_block_signature, + content_block_redacted_thinking, + ) content_block_text = "" content_block_signature = None content_block_redacted_thinking = None - # Accumulate content for current index - current_index = content_block_index - if (delta := chunk.meta.get("delta")) is not None: - if delta.get("type") == "thinking_delta" and delta.get("thinking") is not None: - content_block_text += delta.get("thinking", "") - if delta.get("type") == "signature_delta" and delta.get("signature") is not None: - content_block_signature = delta.get("signature", "") - if (content_block := chunk.meta.get("content_block")) is not None and content_block.get( - "type" - ) == "redacted_thinking": - content_block_redacted_thinking = content_block.get("data", "") + current_index = chunk.index + + # Accumulate reasoning content + if chunk.reasoning.reasoning_text: + content_block_text += chunk.reasoning.reasoning_text + if chunk.reasoning.extra.get("signature") is not None: + content_block_signature = chunk.reasoning.extra["signature"] + if chunk.reasoning.extra.get("redacted_thinking") is not None: + content_block_redacted_thinking = chunk.reasoning.extra["redacted_thinking"] # Finalize the last group if current_index is not None: - if content_block_text: - formatted_reasoning_contents.append( - { - "reasoning_content": { - "reasoning_text": {"text": content_block_text, "signature": content_block_signature}, - } - } - ) - if content_block_redacted_thinking: - formatted_reasoning_contents.append( - {"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}} - ) + _finalize_reasoning_group( + formatted_reasoning_contents, + content_block_text, + content_block_signature, + content_block_redacted_thinking, + ) - # Combine all reasoning texts into a single string for the main reasoning_text field + # Combine all reasoning texts final_reasoning_text = "" for content in formatted_reasoning_contents: if "reasoning_text" in content["reasoning_content"]: - # mypy somehow thinks that content["reasoning_content"]["reasoning_text"]["text"] can be of type None final_reasoning_text += content["reasoning_content"]["reasoning_text"]["text"] # type: ignore[operator] elif "redacted_thinking" in content["reasoning_content"]: final_reasoning_text += "[REDACTED]" @@ -461,3 +450,24 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten if formatted_reasoning_contents else None ) + + +def _finalize_reasoning_group( + formatted_reasoning_contents: list[dict[str, Any]], + content_block_text: str, + content_block_signature: str | None, + content_block_redacted_thinking: str | None, +) -> None: + """Finalize a reasoning content group and append it to the formatted list.""" + if content_block_text: + formatted_reasoning_contents.append( + { + "reasoning_content": { + "reasoning_text": {"text": content_block_text, "signature": content_block_signature}, + } + } + ) + if content_block_redacted_thinking: + formatted_reasoning_contents.append( + {"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}} + ) diff --git a/integrations/anthropic/tests/test_utils.py b/integrations/anthropic/tests/test_utils.py index 1a2a604b2b..e531136f4f 100644 --- a/integrations/anthropic/tests/test_utils.py +++ b/integrations/anthropic/tests/test_utils.py @@ -149,6 +149,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason assert streaming_chunk.index == 0 assert streaming_chunk.tool_calls is None assert streaming_chunk.content == "" + assert streaming_chunk.reasoning is not None + assert streaming_chunk.reasoning.reasoning_text == "" # Test content_block_delta for reasoning reasoning_delta_chunk = RawContentBlockDeltaEvent( @@ -167,6 +169,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason assert streaming_chunk.index == 0 assert streaming_chunk.tool_calls is None assert not streaming_chunk.start + assert streaming_chunk.reasoning is not None + assert streaming_chunk.reasoning.reasoning_text == "The user is asking 2 questions." # Test content_block_delta for reasoning signature reasoning_signature_delta_chunk = RawContentBlockDeltaEvent( @@ -181,6 +185,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason assert streaming_chunk.component_info == component_info assert streaming_chunk.meta == reasoning_signature_delta_chunk.model_dump() assert streaming_chunk.content == "" + assert streaming_chunk.reasoning is not None + assert streaming_chunk.reasoning.extra.get("signature") == "1234567890" # Test content_block_start for text text_block_start_chunk = RawContentBlockStartEvent( From 17711c9b619e266fa5bcb09410abb312b6ea96ce Mon Sep 17 00:00:00 2001 From: yaowubarbara <113857460+yaowubarbara@users.noreply.github.com> Date: Wed, 18 Feb 2026 10:49:30 +0800 Subject: [PATCH 2/3] fix: address review feedback for Anthropic streaming reasoning - Restore mypy comment for reasoning_text type inference - Add parameter descriptions to _finalize_reasoning_group docstring - Add unit tests for _finalize_reasoning_group - Add check to skip chunks with empty reasoning content --- .../generators/anthropic/chat/utils.py | 18 ++++++++- integrations/anthropic/tests/test_utils.py | 37 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py b/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py index 6708077e6f..e46513b9c0 100644 --- a/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py +++ b/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py @@ -404,6 +404,14 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten if chunk.reasoning is None: continue + # Skip chunks with empty reasoning content + if ( + not chunk.reasoning.reasoning_text + and not chunk.reasoning.extra.get("signature") + and not chunk.reasoning.extra.get("redacted_thinking") + ): + continue + # Start new group when index changes if current_index is not None and chunk.index != current_index: _finalize_reasoning_group( @@ -439,6 +447,7 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten final_reasoning_text = "" for content in formatted_reasoning_contents: if "reasoning_text" in content["reasoning_content"]: + # mypy somehow thinks that content["reasoning_content"]["reasoning_text"]["text"] can be of type None final_reasoning_text += content["reasoning_content"]["reasoning_text"]["text"] # type: ignore[operator] elif "redacted_thinking" in content["reasoning_content"]: final_reasoning_text += "[REDACTED]" @@ -458,7 +467,14 @@ def _finalize_reasoning_group( content_block_signature: str | None, content_block_redacted_thinking: str | None, ) -> None: - """Finalize a reasoning content group and append it to the formatted list.""" + """ + Finalize a reasoning content group and append it to the formatted list. + + :param formatted_reasoning_contents: The list to append the finalized reasoning content to. + :param content_block_text: The accumulated reasoning text for the current group. + :param content_block_signature: The signature for the current reasoning group, if any. + :param content_block_redacted_thinking: The redacted thinking data for the current group, if any. + """ if content_block_text: formatted_reasoning_contents.append( { diff --git a/integrations/anthropic/tests/test_utils.py b/integrations/anthropic/tests/test_utils.py index e531136f4f..6e10739a5e 100644 --- a/integrations/anthropic/tests/test_utils.py +++ b/integrations/anthropic/tests/test_utils.py @@ -40,6 +40,7 @@ _convert_chat_completion_to_chat_message, _convert_image_content_to_anthropic_format, _convert_messages_to_anthropic_format, + _finalize_reasoning_group, ) @@ -935,3 +936,39 @@ def test_convert_message_to_anthropic_invalid(self): message = ChatMessage.from_tool(tool_result="result", origin=tool_call_null_id) with pytest.raises(ValueError): _convert_messages_to_anthropic_format([message]) + + def test_finalize_reasoning_group_with_thinking_text(self): + """Test that _finalize_reasoning_group appends a reasoning_text entry.""" + formatted: list = [] + _finalize_reasoning_group(formatted, "The user is asking about weather.", "sig123", None) + assert len(formatted) == 1 + assert formatted[0] == { + "reasoning_content": { + "reasoning_text": {"text": "The user is asking about weather.", "signature": "sig123"}, + } + } + + def test_finalize_reasoning_group_with_redacted_thinking(self): + """Test that _finalize_reasoning_group appends a redacted_thinking entry.""" + formatted: list = [] + _finalize_reasoning_group(formatted, "", None, "redacted_data_abc") + assert len(formatted) == 1 + assert formatted[0] == {"reasoning_content": {"redacted_thinking": "redacted_data_abc"}} + + def test_finalize_reasoning_group_with_both(self): + """Test that _finalize_reasoning_group appends both reasoning_text and redacted_thinking entries.""" + formatted: list = [] + _finalize_reasoning_group(formatted, "Some thinking.", "sig456", "redacted_xyz") + assert len(formatted) == 2 + assert formatted[0] == { + "reasoning_content": { + "reasoning_text": {"text": "Some thinking.", "signature": "sig456"}, + } + } + assert formatted[1] == {"reasoning_content": {"redacted_thinking": "redacted_xyz"}} + + def test_finalize_reasoning_group_with_empty_inputs(self): + """Test that _finalize_reasoning_group does nothing when all inputs are empty.""" + formatted: list = [] + _finalize_reasoning_group(formatted, "", None, None) + assert len(formatted) == 0 From 9826c0e8f0cdb8f0605a83f4187417f17560112e Mon Sep 17 00:00:00 2001 From: yaowubarbara <113857460+yaowubarbara@users.noreply.github.com> Date: Wed, 18 Feb 2026 16:26:52 +0800 Subject: [PATCH 3/3] fix: treat signature-only chunks as metadata in reasoning processing Signature-only chunks now accumulate without triggering new content groups, matching the previous behavior more closely. --- .../components/generators/anthropic/chat/utils.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py b/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py index e46513b9c0..d85484cdb9 100644 --- a/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py +++ b/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py @@ -404,12 +404,14 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten if chunk.reasoning is None: continue - # Skip chunks with empty reasoning content - if ( - not chunk.reasoning.reasoning_text - and not chunk.reasoning.extra.get("signature") - and not chunk.reasoning.extra.get("redacted_thinking") - ): + # Only thinking text or redacted thinking trigger grouping. Signatures are metadata only + has_thinking_text = bool(chunk.reasoning.reasoning_text) + has_redacted_thinking = chunk.reasoning.extra.get("redacted_thinking") is not None + + if not (has_thinking_text or has_redacted_thinking): + # This is a signature-only chunk. Accumulate it but don't start a new group. + if chunk.reasoning.extra.get("signature") is not None: + content_block_signature = chunk.reasoning.extra["signature"] continue # Start new group when index changes