diff --git a/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py b/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py index e992d3a5fd..d85484cdb9 100644 --- a/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py +++ b/integrations/anthropic/src/haystack_integrations/components/generators/anthropic/chat/utils.py @@ -329,6 +329,7 @@ def _convert_anthropic_chunk_to_streaming_chunk( """ content = "" tool_calls = [] + reasoning = None start = False finish_reason = None index = getattr(chunk, "index", None) @@ -348,6 +349,12 @@ def _convert_anthropic_chunk_to_streaming_chunk( tool_name=chunk.content_block.name, ) ) + elif chunk.content_block.type == "thinking": + reasoning = ReasoningContent(reasoning_text="") + elif chunk.content_block.type == "redacted_thinking": + reasoning = ReasoningContent( + reasoning_text="", extra={"redacted_thinking": getattr(chunk.content_block, "data", "")} + ) # delta of a content block elif chunk.type == "content_block_delta": @@ -355,6 +362,10 @@ def _convert_anthropic_chunk_to_streaming_chunk( content = chunk.delta.text elif chunk.delta.type == "input_json_delta": tool_calls.append(ToolCallDelta(index=tool_call_index, arguments=chunk.delta.partial_json)) + elif chunk.delta.type == "thinking_delta": + reasoning = ReasoningContent(reasoning_text=chunk.delta.thinking) + elif chunk.delta.type == "signature_delta": + reasoning = ReasoningContent(reasoning_text="", extra={"signature": chunk.delta.signature}) # end of streaming message elif chunk.type == "message_delta": @@ -369,6 +380,7 @@ def _convert_anthropic_chunk_to_streaming_chunk( start=start, finish_reason=finish_reason, tool_calls=tool_calls if tool_calls else None, + reasoning=reasoning, meta=meta, ) @@ -377,75 +389,63 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten """ Process reasoning contents from a list of StreamingChunk objects into the Anthropic expected format. - :param chunks: List of StreamingChunk objects potentially containing reasoning contents. + Reads reasoning data from the dedicated StreamingChunk.reasoning field. - :returns: List of Anthropic formatted reasoning content dictionaries + :param chunks: List of StreamingChunk objects potentially containing reasoning contents. + :returns: A ReasoningContent object combining all reasoning chunks, or None if no reasoning was found. """ - formatted_reasoning_contents = [] + formatted_reasoning_contents: list[dict[str, Any]] = [] current_index = None content_block_text = "" content_block_signature = None content_block_redacted_thinking = None - content_block_index = None for chunk in chunks: - if (delta := chunk.meta.get("delta")) is not None: - if delta.get("type") == "thinking_delta" and delta.get("thinking") is not None: - content_block_index = chunk.meta.get("index", None) - if (content_block := chunk.meta.get("content_block")) is not None and content_block.get( - "type" - ) == "redacted_thinking": - content_block_index = chunk.meta.get("index", None) + if chunk.reasoning is None: + continue - # Start new group when index changes - if current_index is not None and content_block_index != current_index: - # Finalize current group - if content_block_text: - formatted_reasoning_contents.append( - { - "reasoning_content": { - "reasoning_text": {"text": content_block_text, "signature": content_block_signature}, - } - } - ) - if content_block_redacted_thinking: - formatted_reasoning_contents.append( - {"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}} - ) + # Only thinking text or redacted thinking trigger grouping. Signatures are metadata only + has_thinking_text = bool(chunk.reasoning.reasoning_text) + has_redacted_thinking = chunk.reasoning.extra.get("redacted_thinking") is not None - # Reset accumulators for new group + if not (has_thinking_text or has_redacted_thinking): + # This is a signature-only chunk. Accumulate it but don't start a new group. + if chunk.reasoning.extra.get("signature") is not None: + content_block_signature = chunk.reasoning.extra["signature"] + continue + + # Start new group when index changes + if current_index is not None and chunk.index != current_index: + _finalize_reasoning_group( + formatted_reasoning_contents, + content_block_text, + content_block_signature, + content_block_redacted_thinking, + ) content_block_text = "" content_block_signature = None content_block_redacted_thinking = None - # Accumulate content for current index - current_index = content_block_index - if (delta := chunk.meta.get("delta")) is not None: - if delta.get("type") == "thinking_delta" and delta.get("thinking") is not None: - content_block_text += delta.get("thinking", "") - if delta.get("type") == "signature_delta" and delta.get("signature") is not None: - content_block_signature = delta.get("signature", "") - if (content_block := chunk.meta.get("content_block")) is not None and content_block.get( - "type" - ) == "redacted_thinking": - content_block_redacted_thinking = content_block.get("data", "") + current_index = chunk.index + + # Accumulate reasoning content + if chunk.reasoning.reasoning_text: + content_block_text += chunk.reasoning.reasoning_text + if chunk.reasoning.extra.get("signature") is not None: + content_block_signature = chunk.reasoning.extra["signature"] + if chunk.reasoning.extra.get("redacted_thinking") is not None: + content_block_redacted_thinking = chunk.reasoning.extra["redacted_thinking"] # Finalize the last group if current_index is not None: - if content_block_text: - formatted_reasoning_contents.append( - { - "reasoning_content": { - "reasoning_text": {"text": content_block_text, "signature": content_block_signature}, - } - } - ) - if content_block_redacted_thinking: - formatted_reasoning_contents.append( - {"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}} - ) + _finalize_reasoning_group( + formatted_reasoning_contents, + content_block_text, + content_block_signature, + content_block_redacted_thinking, + ) - # Combine all reasoning texts into a single string for the main reasoning_text field + # Combine all reasoning texts final_reasoning_text = "" for content in formatted_reasoning_contents: if "reasoning_text" in content["reasoning_content"]: @@ -461,3 +461,31 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten if formatted_reasoning_contents else None ) + + +def _finalize_reasoning_group( + formatted_reasoning_contents: list[dict[str, Any]], + content_block_text: str, + content_block_signature: str | None, + content_block_redacted_thinking: str | None, +) -> None: + """ + Finalize a reasoning content group and append it to the formatted list. + + :param formatted_reasoning_contents: The list to append the finalized reasoning content to. + :param content_block_text: The accumulated reasoning text for the current group. + :param content_block_signature: The signature for the current reasoning group, if any. + :param content_block_redacted_thinking: The redacted thinking data for the current group, if any. + """ + if content_block_text: + formatted_reasoning_contents.append( + { + "reasoning_content": { + "reasoning_text": {"text": content_block_text, "signature": content_block_signature}, + } + } + ) + if content_block_redacted_thinking: + formatted_reasoning_contents.append( + {"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}} + ) diff --git a/integrations/anthropic/tests/test_utils.py b/integrations/anthropic/tests/test_utils.py index 1a2a604b2b..6e10739a5e 100644 --- a/integrations/anthropic/tests/test_utils.py +++ b/integrations/anthropic/tests/test_utils.py @@ -40,6 +40,7 @@ _convert_chat_completion_to_chat_message, _convert_image_content_to_anthropic_format, _convert_messages_to_anthropic_format, + _finalize_reasoning_group, ) @@ -149,6 +150,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason assert streaming_chunk.index == 0 assert streaming_chunk.tool_calls is None assert streaming_chunk.content == "" + assert streaming_chunk.reasoning is not None + assert streaming_chunk.reasoning.reasoning_text == "" # Test content_block_delta for reasoning reasoning_delta_chunk = RawContentBlockDeltaEvent( @@ -167,6 +170,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason assert streaming_chunk.index == 0 assert streaming_chunk.tool_calls is None assert not streaming_chunk.start + assert streaming_chunk.reasoning is not None + assert streaming_chunk.reasoning.reasoning_text == "The user is asking 2 questions." # Test content_block_delta for reasoning signature reasoning_signature_delta_chunk = RawContentBlockDeltaEvent( @@ -181,6 +186,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason assert streaming_chunk.component_info == component_info assert streaming_chunk.meta == reasoning_signature_delta_chunk.model_dump() assert streaming_chunk.content == "" + assert streaming_chunk.reasoning is not None + assert streaming_chunk.reasoning.extra.get("signature") == "1234567890" # Test content_block_start for text text_block_start_chunk = RawContentBlockStartEvent( @@ -929,3 +936,39 @@ def test_convert_message_to_anthropic_invalid(self): message = ChatMessage.from_tool(tool_result="result", origin=tool_call_null_id) with pytest.raises(ValueError): _convert_messages_to_anthropic_format([message]) + + def test_finalize_reasoning_group_with_thinking_text(self): + """Test that _finalize_reasoning_group appends a reasoning_text entry.""" + formatted: list = [] + _finalize_reasoning_group(formatted, "The user is asking about weather.", "sig123", None) + assert len(formatted) == 1 + assert formatted[0] == { + "reasoning_content": { + "reasoning_text": {"text": "The user is asking about weather.", "signature": "sig123"}, + } + } + + def test_finalize_reasoning_group_with_redacted_thinking(self): + """Test that _finalize_reasoning_group appends a redacted_thinking entry.""" + formatted: list = [] + _finalize_reasoning_group(formatted, "", None, "redacted_data_abc") + assert len(formatted) == 1 + assert formatted[0] == {"reasoning_content": {"redacted_thinking": "redacted_data_abc"}} + + def test_finalize_reasoning_group_with_both(self): + """Test that _finalize_reasoning_group appends both reasoning_text and redacted_thinking entries.""" + formatted: list = [] + _finalize_reasoning_group(formatted, "Some thinking.", "sig456", "redacted_xyz") + assert len(formatted) == 2 + assert formatted[0] == { + "reasoning_content": { + "reasoning_text": {"text": "Some thinking.", "signature": "sig456"}, + } + } + assert formatted[1] == {"reasoning_content": {"redacted_thinking": "redacted_xyz"}} + + def test_finalize_reasoning_group_with_empty_inputs(self): + """Test that _finalize_reasoning_group does nothing when all inputs are empty.""" + formatted: list = [] + _finalize_reasoning_group(formatted, "", None, None) + assert len(formatted) == 0