Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ def _convert_anthropic_chunk_to_streaming_chunk(
"""
content = ""
tool_calls = []
reasoning = None
start = False
finish_reason = None
index = getattr(chunk, "index", None)
Expand All @@ -348,13 +349,23 @@ def _convert_anthropic_chunk_to_streaming_chunk(
tool_name=chunk.content_block.name,
)
)
elif chunk.content_block.type == "thinking":
reasoning = ReasoningContent(reasoning_text="")
elif chunk.content_block.type == "redacted_thinking":
reasoning = ReasoningContent(
reasoning_text="", extra={"redacted_thinking": getattr(chunk.content_block, "data", "")}
)

# delta of a content block
elif chunk.type == "content_block_delta":
if chunk.delta.type == "text_delta":
content = chunk.delta.text
elif chunk.delta.type == "input_json_delta":
tool_calls.append(ToolCallDelta(index=tool_call_index, arguments=chunk.delta.partial_json))
elif chunk.delta.type == "thinking_delta":
reasoning = ReasoningContent(reasoning_text=chunk.delta.thinking)
elif chunk.delta.type == "signature_delta":
reasoning = ReasoningContent(reasoning_text="", extra={"signature": chunk.delta.signature})

# end of streaming message
elif chunk.type == "message_delta":
Expand All @@ -369,6 +380,7 @@ def _convert_anthropic_chunk_to_streaming_chunk(
start=start,
finish_reason=finish_reason,
tool_calls=tool_calls if tool_calls else None,
reasoning=reasoning,
meta=meta,
)

Expand All @@ -377,75 +389,63 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten
"""
Process reasoning contents from a list of StreamingChunk objects into the Anthropic expected format.

:param chunks: List of StreamingChunk objects potentially containing reasoning contents.
Reads reasoning data from the dedicated StreamingChunk.reasoning field.

:returns: List of Anthropic formatted reasoning content dictionaries
:param chunks: List of StreamingChunk objects potentially containing reasoning contents.
:returns: A ReasoningContent object combining all reasoning chunks, or None if no reasoning was found.
"""
formatted_reasoning_contents = []
formatted_reasoning_contents: list[dict[str, Any]] = []
current_index = None
content_block_text = ""
content_block_signature = None
content_block_redacted_thinking = None
content_block_index = None

for chunk in chunks:
if (delta := chunk.meta.get("delta")) is not None:
if delta.get("type") == "thinking_delta" and delta.get("thinking") is not None:
content_block_index = chunk.meta.get("index", None)
if (content_block := chunk.meta.get("content_block")) is not None and content_block.get(
"type"
) == "redacted_thinking":
content_block_index = chunk.meta.get("index", None)
if chunk.reasoning is None:
continue

# Start new group when index changes
if current_index is not None and content_block_index != current_index:
# Finalize current group
if content_block_text:
formatted_reasoning_contents.append(
{
"reasoning_content": {
"reasoning_text": {"text": content_block_text, "signature": content_block_signature},
}
}
)
if content_block_redacted_thinking:
formatted_reasoning_contents.append(
{"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}}
)
# Only thinking text or redacted thinking trigger grouping. Signatures are metadata only
has_thinking_text = bool(chunk.reasoning.reasoning_text)
has_redacted_thinking = chunk.reasoning.extra.get("redacted_thinking") is not None

# Reset accumulators for new group
if not (has_thinking_text or has_redacted_thinking):
# This is a signature-only chunk. Accumulate it but don't start a new group.
if chunk.reasoning.extra.get("signature") is not None:
content_block_signature = chunk.reasoning.extra["signature"]
continue

# Start new group when index changes
if current_index is not None and chunk.index != current_index:
_finalize_reasoning_group(
formatted_reasoning_contents,
content_block_text,
content_block_signature,
content_block_redacted_thinking,
)
content_block_text = ""
content_block_signature = None
content_block_redacted_thinking = None

# Accumulate content for current index
current_index = content_block_index
if (delta := chunk.meta.get("delta")) is not None:
if delta.get("type") == "thinking_delta" and delta.get("thinking") is not None:
content_block_text += delta.get("thinking", "")
if delta.get("type") == "signature_delta" and delta.get("signature") is not None:
content_block_signature = delta.get("signature", "")
if (content_block := chunk.meta.get("content_block")) is not None and content_block.get(
"type"
) == "redacted_thinking":
content_block_redacted_thinking = content_block.get("data", "")
current_index = chunk.index

# Accumulate reasoning content
if chunk.reasoning.reasoning_text:
content_block_text += chunk.reasoning.reasoning_text
if chunk.reasoning.extra.get("signature") is not None:
content_block_signature = chunk.reasoning.extra["signature"]
if chunk.reasoning.extra.get("redacted_thinking") is not None:
content_block_redacted_thinking = chunk.reasoning.extra["redacted_thinking"]

# Finalize the last group
if current_index is not None:
if content_block_text:
formatted_reasoning_contents.append(
{
"reasoning_content": {
"reasoning_text": {"text": content_block_text, "signature": content_block_signature},
}
}
)
if content_block_redacted_thinking:
formatted_reasoning_contents.append(
{"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}}
)
_finalize_reasoning_group(
formatted_reasoning_contents,
content_block_text,
content_block_signature,
content_block_redacted_thinking,
)

# Combine all reasoning texts into a single string for the main reasoning_text field
# Combine all reasoning texts
final_reasoning_text = ""
for content in formatted_reasoning_contents:
if "reasoning_text" in content["reasoning_content"]:
Expand All @@ -461,3 +461,31 @@ def _process_reasoning_contents(chunks: list[StreamingChunk]) -> ReasoningConten
if formatted_reasoning_contents
else None
)


def _finalize_reasoning_group(
formatted_reasoning_contents: list[dict[str, Any]],
content_block_text: str,
content_block_signature: str | None,
content_block_redacted_thinking: str | None,
) -> None:
"""
Finalize a reasoning content group and append it to the formatted list.

:param formatted_reasoning_contents: The list to append the finalized reasoning content to.
:param content_block_text: The accumulated reasoning text for the current group.
:param content_block_signature: The signature for the current reasoning group, if any.
:param content_block_redacted_thinking: The redacted thinking data for the current group, if any.
"""
if content_block_text:
formatted_reasoning_contents.append(
{
"reasoning_content": {
"reasoning_text": {"text": content_block_text, "signature": content_block_signature},
}
}
)
if content_block_redacted_thinking:
formatted_reasoning_contents.append(
{"reasoning_content": {"redacted_thinking": content_block_redacted_thinking}}
)
43 changes: 43 additions & 0 deletions integrations/anthropic/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
_convert_chat_completion_to_chat_message,
_convert_image_content_to_anthropic_format,
_convert_messages_to_anthropic_format,
_finalize_reasoning_group,
)


Expand Down Expand Up @@ -149,6 +150,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason
assert streaming_chunk.index == 0
assert streaming_chunk.tool_calls is None
assert streaming_chunk.content == ""
assert streaming_chunk.reasoning is not None
assert streaming_chunk.reasoning.reasoning_text == ""

# Test content_block_delta for reasoning
reasoning_delta_chunk = RawContentBlockDeltaEvent(
Expand All @@ -167,6 +170,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason
assert streaming_chunk.index == 0
assert streaming_chunk.tool_calls is None
assert not streaming_chunk.start
assert streaming_chunk.reasoning is not None
assert streaming_chunk.reasoning.reasoning_text == "The user is asking 2 questions."

# Test content_block_delta for reasoning signature
reasoning_signature_delta_chunk = RawContentBlockDeltaEvent(
Expand All @@ -181,6 +186,8 @@ def test_convert_anthropic_completion_chunks_with_multiple_tool_calls_and_reason
assert streaming_chunk.component_info == component_info
assert streaming_chunk.meta == reasoning_signature_delta_chunk.model_dump()
assert streaming_chunk.content == ""
assert streaming_chunk.reasoning is not None
assert streaming_chunk.reasoning.extra.get("signature") == "1234567890"

# Test content_block_start for text
text_block_start_chunk = RawContentBlockStartEvent(
Expand Down Expand Up @@ -929,3 +936,39 @@ def test_convert_message_to_anthropic_invalid(self):
message = ChatMessage.from_tool(tool_result="result", origin=tool_call_null_id)
with pytest.raises(ValueError):
_convert_messages_to_anthropic_format([message])

def test_finalize_reasoning_group_with_thinking_text(self):
"""Test that _finalize_reasoning_group appends a reasoning_text entry."""
formatted: list = []
_finalize_reasoning_group(formatted, "The user is asking about weather.", "sig123", None)
assert len(formatted) == 1
assert formatted[0] == {
"reasoning_content": {
"reasoning_text": {"text": "The user is asking about weather.", "signature": "sig123"},
}
}

def test_finalize_reasoning_group_with_redacted_thinking(self):
"""Test that _finalize_reasoning_group appends a redacted_thinking entry."""
formatted: list = []
_finalize_reasoning_group(formatted, "", None, "redacted_data_abc")
assert len(formatted) == 1
assert formatted[0] == {"reasoning_content": {"redacted_thinking": "redacted_data_abc"}}

def test_finalize_reasoning_group_with_both(self):
"""Test that _finalize_reasoning_group appends both reasoning_text and redacted_thinking entries."""
formatted: list = []
_finalize_reasoning_group(formatted, "Some thinking.", "sig456", "redacted_xyz")
assert len(formatted) == 2
assert formatted[0] == {
"reasoning_content": {
"reasoning_text": {"text": "Some thinking.", "signature": "sig456"},
}
}
assert formatted[1] == {"reasoning_content": {"redacted_thinking": "redacted_xyz"}}

def test_finalize_reasoning_group_with_empty_inputs(self):
"""Test that _finalize_reasoning_group does nothing when all inputs are empty."""
formatted: list = []
_finalize_reasoning_group(formatted, "", None, None)
assert len(formatted) == 0