From ed516d07f3d0a39485c4908c61f9a2db15090da8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=81=E5=B1=BF?= Date: Mon, 15 Jun 2026 00:54:23 +0800 Subject: [PATCH 1/3] fix: capture claude agent sdk session ids --- .../CHANGELOG.md | 4 + .../instrumentation/claude_agent_sdk/patch.py | 158 +++-- .../tests/test_session_capture.py | 562 ++++++++++++++++++ 3 files changed, 690 insertions(+), 34 deletions(-) create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md index 59b05e2b8..7836f9f0c 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Fixed + +- Capture Claude Agent SDK session IDs on agent, LLM, and tool spans. + ## Version 0.6.0 (2026-06-03) There are no changelog entries for this release. diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py index 8477b6950..ae2b4e56a 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py @@ -18,6 +18,7 @@ import time from typing import Any, Dict, List, Optional +from opentelemetry import baggage from opentelemetry import context as otel_context from opentelemetry.instrumentation.claude_agent_sdk.utils import ( extract_usage_from_result_message, @@ -27,7 +28,10 @@ from opentelemetry.trace import set_span_in_context from opentelemetry.util.genai.extended_handler import ( ExtendedTelemetryHandler, - get_extended_telemetry_handler, +) +from opentelemetry.util.genai.extended_semconv.gen_ai_extended_attributes import ( + GEN_AI_SESSION_ID, + GEN_AI_USER_ID, ) from opentelemetry.util.genai.extended_types import ( ExecuteToolInvocation, @@ -46,29 +50,72 @@ logger = logging.getLogger(__name__) -# Storage for tool runs managed by client (created from response stream) -# Key: tool_use_id, Value: tool_invocation -_client_managed_runs: Dict[str, ExecuteToolInvocation] = {} + +def _current_baggage_value(key: str) -> Optional[str]: + try: + value = baggage.get_baggage(key) + except Exception: + return None + if value is None: + return None + text = str(value).strip() + return text or None + + +def _entry_baggage_identity_attributes() -> Dict[str, str]: + attributes: Dict[str, str] = {} + session_id = _current_baggage_value(GEN_AI_SESSION_ID) + user_id = _current_baggage_value(GEN_AI_USER_ID) + if session_id: + attributes[GEN_AI_SESSION_ID] = session_id + if user_id: + attributes[GEN_AI_USER_ID] = user_id + return attributes + + +def _apply_session_identity( + invocation: Any, session_id: Optional[str] +) -> None: + """Apply Entry baggage identity first, then Claude's own session fallback.""" + entry_attributes = _entry_baggage_identity_attributes() + effective_session_id = ( + entry_attributes.get(GEN_AI_SESSION_ID) or session_id + ) + + if effective_session_id: + if hasattr(invocation, "conversation_id"): + invocation.conversation_id = effective_session_id + invocation.attributes[GEN_AI_SESSION_ID] = effective_session_id + + for key, value in entry_attributes.items(): + invocation.attributes[key] = value -def _clear_client_managed_runs() -> None: +def _set_session_id( + agent_invocation: InvokeAgentInvocation, session_id: Optional[str] +) -> None: + """Set Entry session id or Claude session id on an agent invocation.""" + _apply_session_identity(agent_invocation, session_id) + + +def _set_llm_session_id( + llm_invocation: LLMInvocation, session_id: Optional[str] +) -> None: + """Set Entry session id or Claude session id on an LLM invocation.""" + _apply_session_identity(llm_invocation, session_id) + + +def _clear_client_managed_runs( + handler: ExtendedTelemetryHandler, + client_managed_runs: Dict[str, ExecuteToolInvocation], +) -> None: """Clear all client-managed tool runs. This should be called when a conversation ends to avoid memory leaks and to clean up any orphaned tool runs. """ - global _client_managed_runs - - try: - handler = get_extended_telemetry_handler() - except Exception: - # If we can't get the handler (e.g., instrumentation not initialized), - # we still need to clear the tracking dictionary to prevent memory leaks. - _client_managed_runs.clear() - return - # End any orphaned tool runs - for tool_use_id, tool_invocation in list(_client_managed_runs.items()): + for tool_use_id, tool_invocation in list(client_managed_runs.items()): try: handler.fail_execute_tool( tool_invocation, @@ -83,7 +130,7 @@ def _clear_client_managed_runs() -> None: # Best effort cleanup: continue processing remaining tools. pass - _client_managed_runs.clear() + client_managed_runs.clear() def _extract_message_parts(msg: Any) -> List[Any]: @@ -112,6 +159,7 @@ def _create_tool_spans_from_message( handler: ExtendedTelemetryHandler, agent_invocation: InvokeAgentInvocation, active_task_stack: List[Any], + client_managed_runs: Dict[str, ExecuteToolInvocation], exclude_tool_names: Optional[List[str]] = None, ) -> None: """Create tool execution spans from ToolUseBlocks in an AssistantMessage. @@ -163,8 +211,11 @@ def _create_tool_spans_from_message( tool_call_arguments=tool_input, tool_description=tool_name, ) + _apply_session_identity( + tool_invocation, agent_invocation.conversation_id + ) handler.start_execute_tool(tool_invocation) - _client_managed_runs[tool_use_id] = tool_invocation + client_managed_runs[tool_use_id] = tool_invocation # If this is a Task tool, create a SubAgent span under it # https://platform.claude.com/docs/en/agent-sdk/python#task @@ -203,6 +254,10 @@ def _create_tool_spans_from_message( agent_description=task_description, input_messages=input_messages, ) + _set_session_id( + subagent_invocation, + agent_invocation.conversation_id, + ) # Start SubAgent span handler.start_invoke_agent(subagent_invocation) @@ -271,6 +326,7 @@ def _process_assistant_message( handler: ExtendedTelemetryHandler, collected_messages: List[Dict[str, Any]], active_task_stack: List[Any], + client_managed_runs: Dict[str, ExecuteToolInvocation], ) -> None: """Process AssistantMessage: create LLM turn, extract parts, create tool spans.""" parts = _extract_message_parts(msg) @@ -353,7 +409,11 @@ def _process_assistant_message( turn_tracker.close_llm_turn() _create_tool_spans_from_message( - msg, handler, agent_invocation, active_task_stack + msg, + handler, + agent_invocation, + active_task_stack, + client_managed_runs, ) @@ -363,6 +423,7 @@ def _process_user_message( handler: ExtendedTelemetryHandler, collected_messages: List[Dict[str, Any]], active_task_stack: List[Any], + client_managed_runs: Dict[str, ExecuteToolInvocation], ) -> None: """Process UserMessage: close tool spans, collect message content, mark next LLM start.""" user_parts: List[MessagePart] = [] @@ -376,8 +437,8 @@ def _process_user_message( if block_type == "ToolResultBlock": tool_use_id = getattr(block, "tool_use_id", None) - if tool_use_id and tool_use_id in _client_managed_runs: - tool_invocation = _client_managed_runs.pop(tool_use_id) + if tool_use_id and tool_use_id in client_managed_runs: + tool_invocation = client_managed_runs.pop(tool_use_id) # Set tool response tool_content = getattr(block, "content", None) @@ -533,7 +594,21 @@ def _process_system_message( if hasattr(msg, "data") and isinstance(msg.data, dict): session_id = msg.data.get("session_id") if session_id: - agent_invocation.conversation_id = session_id + _set_session_id(agent_invocation, session_id) + + +def _process_stream_event_message( + msg: Any, + agent_invocation: InvokeAgentInvocation, +) -> None: + """Process StreamEvent: extract session_id when streaming mode exposes it early.""" + session_id = getattr(msg, "session_id", None) + if not session_id: + event = getattr(msg, "event", None) + if isinstance(event, dict): + session_id = event.get("session_id") + + _set_session_id(agent_invocation, session_id) def _process_result_message( @@ -543,6 +618,8 @@ def _process_result_message( ) -> None: """Process ResultMessage: update session_id (fallback), token usage, and close any open LLM turn.""" + _set_session_id(agent_invocation, getattr(msg, "session_id", None)) + turn_tracker.set_session_id(agent_invocation.conversation_id) _update_token_usage(agent_invocation, turn_tracker, msg) if turn_tracker.current_llm_invocation: @@ -554,6 +631,7 @@ async def _process_agent_invocation_stream( handler: ExtendedTelemetryHandler, model: str, prompt: str, + session_id: Optional[str] = None, ) -> Any: """Unified handler for processing agent invocation stream. @@ -564,18 +642,15 @@ async def _process_agent_invocation_stream( provider=infer_provider_from_base_url(), agent_name="claude-agent", request_model=model, - conversation_id="", + conversation_id=None, input_messages=[ InputMessage(role="user", parts=[Text(content=prompt)]) ] if prompt else [], ) + _set_session_id(agent_invocation, session_id) - # Attach empty context to clear any previous context, ensuring each query - # creates an independent root trace. This is important for scenarios where - # multiple queries are called in the same script - each should have its own trace_id. - empty_context_token = otel_context.attach(otel_context.Context()) handler.start_invoke_agent(agent_invocation) query_start_time = time.time() @@ -589,6 +664,7 @@ async def _process_agent_invocation_stream( # When a Task tool is created, it's pushed here # When its ToolResultBlock is received, it's popped active_task_stack: List[Any] = [] + client_managed_runs: Dict[str, ExecuteToolInvocation] = {} try: async for msg in wrapped_stream: @@ -596,6 +672,8 @@ async def _process_agent_invocation_stream( if msg_type == "SystemMessage": _process_system_message(msg, agent_invocation) + elif msg_type == "StreamEvent": + _process_stream_event_message(msg, agent_invocation) elif msg_type == "AssistantMessage": _process_assistant_message( msg, @@ -606,6 +684,7 @@ async def _process_agent_invocation_stream( handler, collected_messages, active_task_stack, + client_managed_runs, ) elif msg_type == "UserMessage": _process_user_message( @@ -614,6 +693,7 @@ async def _process_agent_invocation_stream( handler, collected_messages, active_task_stack, + client_managed_runs, ) elif msg_type == "ResultMessage": _process_result_message(msg, agent_invocation, turn_tracker) @@ -648,11 +728,7 @@ async def _process_agent_invocation_stream( # Span closure failure should not break the application pass - # Detach empty context token to restore the original context. - # Note: stop_invoke_agent/fail_invoke_agent already detached invocation.context_token, - # which restored to empty context. Now we detach empty_context_token to restore further. - otel_context.detach(empty_context_token) - _clear_client_managed_runs() + _clear_client_managed_runs(handler, client_managed_runs) class AssistantTurnTracker: @@ -728,8 +804,8 @@ def start_llm_turn( # Add conversation_id (session_id) to LLM span attributes # This is a custom extension beyond standard GenAI semantic conventions if agent_invocation and agent_invocation.conversation_id: - llm_invocation.attributes["gen_ai.conversation.id"] = ( - agent_invocation.conversation_id + _set_llm_session_id( + llm_invocation, agent_invocation.conversation_id ) self.handler.start_llm(llm_invocation) @@ -774,6 +850,12 @@ def update_usage( if output_tokens is not None: target_invocation.output_tokens = output_tokens + def set_session_id(self, session_id: Optional[str]) -> None: + """Update the open LLM invocation with a late session id.""" + target_invocation = self.current_llm_invocation + if target_invocation: + _set_llm_session_id(target_invocation, session_id) + def close_llm_turn(self) -> None: """Close the current LLM invocation span.""" if self.current_llm_invocation: @@ -798,6 +880,7 @@ def wrap_claude_client_init(wrapped, instance, args, kwargs, handler=None): instance._otel_handler = handler instance._otel_prompt = None + instance._otel_session_id = None return result @@ -808,6 +891,10 @@ def wrap_claude_client_query(wrapped, instance, args, kwargs, handler=None): instance._otel_prompt = str( kwargs.get("prompt") or (args[0] if args else "") ) + session_id = kwargs.get("session_id") + if session_id is None and len(args) > 1: + session_id = args[1] + instance._otel_session_id = session_id return wrapped(*args, **kwargs) @@ -835,6 +922,7 @@ async def wrap_claude_client_receive_response( handler=handler, model=model, prompt=prompt, + session_id=getattr(instance, "_otel_session_id", None), ): yield msg @@ -852,11 +940,13 @@ async def wrap_query(wrapped, instance, args, kwargs, handler=None): model = get_model_from_options_or_env(options) prompt_str = str(prompt) if isinstance(prompt, str) else "" + session_id = getattr(options, "resume", None) if options else None async for message in _process_agent_invocation_stream( wrapped(*args, **kwargs), handler=handler, model=model, prompt=prompt_str, + session_id=session_id, ): yield message diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py new file mode 100644 index 000000000..c53b80b61 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py @@ -0,0 +1,562 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Session propagation tests for Claude Agent SDK instrumentation.""" + +import asyncio +from types import SimpleNamespace +from typing import Any + +import pytest + +from opentelemetry import baggage +from opentelemetry import context as otel_context +from opentelemetry.instrumentation.claude_agent_sdk.patch import ( + _process_agent_invocation_stream, + wrap_claude_client_query, + wrap_claude_client_receive_response, + wrap_query, +) +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAIAttributes, +) +from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler +from opentelemetry.util.genai.extended_semconv.gen_ai_extended_attributes import ( + GEN_AI_SESSION_ID, + GEN_AI_USER_ID, +) + + +class SystemMessage: + def __init__(self, session_id: str): + self.subtype = "init" + self.data = {"session_id": session_id} + + +class StreamEvent: + def __init__( + self, + session_id: str | None = None, + event_session_id: str | None = None, + ): + self.session_id = session_id + self.event = { + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "partial"}, + } + if event_session_id: + self.event["session_id"] = event_session_id + + +class AssistantMessage: + def __init__(self, content: list[Any], model: str = "claude-sonnet"): + self.content = content + self.model = model + self.parent_tool_use_id = None + self.error = None + + +class UserMessage: + def __init__( + self, + content: list[Any], + tool_use_result: dict[str, Any] | None = None, + ): + self.content = content + self.tool_use_result = tool_use_result + self.uuid = None + self.parent_tool_use_id = None + + +class ResultMessage: + def __init__(self, session_id: str | None = None): + self.subtype = "success" + self.duration_ms = 10 + self.duration_api_ms = 8 + self.is_error = False + self.num_turns = 1 + self.session_id = session_id + self.total_cost_usd = 0.01 + self.usage = {"input_tokens": 11, "output_tokens": 7} + self.result = "done" + self.structured_output = None + + +class TextBlock: + def __init__(self, text: str): + self.text = text + + +class ToolUseBlock: + def __init__( + self, tool_use_id: str, name: str, tool_input: dict[str, Any] + ): + self.id = tool_use_id + self.name = name + self.input = tool_input + + +class ToolResultBlock: + def __init__(self, tool_use_id: str, content: str): + self.tool_use_id = tool_use_id + self.content = content + self.is_error = False + + +async def _stream(messages): + for message in messages: + yield message + + +def _spans_by_operation(spans, operation): + return [ + span + for span in spans + if dict(span.attributes or {}).get( + GenAIAttributes.GEN_AI_OPERATION_NAME + ) + == operation + ] + + +async def _run_stream(tracer_provider, messages, session_id=None): + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + async for _ in _process_agent_invocation_stream( + wrapped_stream=_stream(messages), + handler=handler, + model="claude-sonnet", + prompt="inspect the project", + session_id=session_id, + ): + pass + + +@pytest.mark.asyncio +async def test_entry_baggage_session_overrides_claude_session( + tracer_provider, span_exporter +): + messages = [ + SystemMessage("claude-session"), + AssistantMessage([TextBlock("I will read a file.")]), + AssistantMessage( + [ToolUseBlock("toolu_1", "Read", {"file_path": "README.md"})] + ), + UserMessage([ToolResultBlock("toolu_1", "README content")]), + ResultMessage("claude-session"), + ] + ctx = baggage.set_baggage(GEN_AI_SESSION_ID, "entry-session") + ctx = baggage.set_baggage(GEN_AI_USER_ID, "entry-user", ctx) + token = otel_context.attach(ctx) + try: + await _run_stream(tracer_provider, messages) + finally: + otel_context.detach(token) + + spans = span_exporter.get_finished_spans() + agent_span = _spans_by_operation(spans, "invoke_agent")[0] + llm_span = _spans_by_operation(spans, "chat")[0] + tool_span = _spans_by_operation(spans, "execute_tool")[0] + + for span in (agent_span, llm_span, tool_span): + assert span.attributes[GEN_AI_SESSION_ID] == "entry-session" + assert span.attributes[GEN_AI_USER_ID] == "entry-user" + + +@pytest.mark.asyncio +async def test_system_session_propagates_to_agent_llm_and_tool( + tracer_provider, span_exporter +): + messages = [ + SystemMessage("sess-system"), + AssistantMessage([TextBlock("I will read a file.")]), + AssistantMessage( + [ToolUseBlock("toolu_1", "Read", {"file_path": "README.md"})] + ), + UserMessage([ToolResultBlock("toolu_1", "README content")]), + ResultMessage("sess-system"), + ] + + await _run_stream(tracer_provider, messages) + + spans = span_exporter.get_finished_spans() + agent_span = _spans_by_operation(spans, "invoke_agent")[0] + llm_span = _spans_by_operation(spans, "chat")[0] + tool_span = _spans_by_operation(spans, "execute_tool")[0] + + assert agent_span.attributes[GEN_AI_SESSION_ID] == "sess-system" + assert llm_span.attributes[GEN_AI_SESSION_ID] == "sess-system" + assert tool_span.attributes[GEN_AI_SESSION_ID] == "sess-system" + + +@pytest.mark.asyncio +async def test_result_session_sets_agent_span_when_no_init_message( + tracer_provider, span_exporter +): + messages = [ + AssistantMessage([TextBlock("answer")]), + ResultMessage("sess-result"), + ] + + await _run_stream(tracer_provider, messages) + + spans = span_exporter.get_finished_spans() + agent_span = _spans_by_operation(spans, "invoke_agent")[0] + llm_span = _spans_by_operation(spans, "chat")[0] + assert agent_span.attributes[GEN_AI_SESSION_ID] == "sess-result" + assert llm_span.attributes[GEN_AI_SESSION_ID] == "sess-result" + + +@pytest.mark.asyncio +async def test_stream_event_session_propagates_before_first_assistant_message( + tracer_provider, span_exporter +): + messages = [ + StreamEvent("sess-stream"), + AssistantMessage([TextBlock("streamed answer")]), + ResultMessage("sess-stream"), + ] + + await _run_stream(tracer_provider, messages) + + spans = span_exporter.get_finished_spans() + agent_span = _spans_by_operation(spans, "invoke_agent")[0] + llm_span = _spans_by_operation(spans, "chat")[0] + + assert agent_span.attributes[GEN_AI_SESSION_ID] == "sess-stream" + assert llm_span.attributes[GEN_AI_SESSION_ID] == "sess-stream" + + +@pytest.mark.asyncio +async def test_stream_event_dict_session_fallback( + tracer_provider, span_exporter +): + messages = [ + StreamEvent(event_session_id="sess-event-dict"), + AssistantMessage([TextBlock("streamed answer")]), + ResultMessage("sess-event-dict"), + ] + + await _run_stream(tracer_provider, messages) + + spans = span_exporter.get_finished_spans() + agent_span = _spans_by_operation(spans, "invoke_agent")[0] + llm_span = _spans_by_operation(spans, "chat")[0] + + assert agent_span.attributes[GEN_AI_SESSION_ID] == "sess-event-dict" + assert llm_span.attributes[GEN_AI_SESSION_ID] == "sess-event-dict" + + +@pytest.mark.asyncio +async def test_client_query_session_id_is_used_before_result_message( + tracer_provider, span_exporter +): + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + client = SimpleNamespace( + _otel_prompt=None, + _otel_session_id=None, + _otel_handler=handler, + options=SimpleNamespace(model="claude-sonnet"), + ) + + async def wrapped_query(*args, **kwargs): + return None + + async def wrapped_receive_response(): + yield AssistantMessage([TextBlock("answer")]) + yield ResultMessage(None) + + await wrap_claude_client_query( + wrapped_query, + client, + ("hello",), + {"session_id": "client-session"}, + handler=handler, + ) + + async for _ in wrap_claude_client_receive_response( + wrapped_receive_response, + client, + (), + {}, + handler=handler, + ): + pass + + agent_span = _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + )[0] + assert agent_span.attributes[GEN_AI_SESSION_ID] == "client-session" + + +@pytest.mark.asyncio +async def test_client_query_without_session_does_not_write_default_session( + tracer_provider, span_exporter +): + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + client = SimpleNamespace( + _otel_prompt=None, + _otel_session_id=None, + _otel_handler=handler, + options=SimpleNamespace(model="claude-sonnet"), + ) + + async def wrapped_query(*args, **kwargs): + return None + + async def wrapped_receive_response(): + yield AssistantMessage([TextBlock("answer")]) + yield ResultMessage(None) + + await wrap_claude_client_query( + wrapped_query, + client, + ("hello",), + {}, + handler=handler, + ) + + async for _ in wrap_claude_client_receive_response( + wrapped_receive_response, + client, + (), + {}, + handler=handler, + ): + pass + + agent_span = _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + )[0] + assert GEN_AI_SESSION_ID not in agent_span.attributes + assert GenAIAttributes.GEN_AI_CONVERSATION_ID not in agent_span.attributes + + +@pytest.mark.asyncio +async def test_standalone_query_resume_sets_initial_session( + tracer_provider, span_exporter +): + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + options = SimpleNamespace(model="claude-sonnet", resume="resume-session") + + async def wrapped_query(*args, **kwargs): + yield AssistantMessage([TextBlock("answer")]) + yield ResultMessage(None) + + async for _ in wrap_query( + wrapped_query, + None, + ("hello",), + {"options": options}, + handler=handler, + ): + pass + + agent_span = _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + )[0] + llm_span = _spans_by_operation(span_exporter.get_finished_spans(), "chat")[ + 0 + ] + + assert agent_span.attributes[GEN_AI_SESSION_ID] == "resume-session" + assert llm_span.attributes[GEN_AI_SESSION_ID] == "resume-session" + + +@pytest.mark.asyncio +async def test_wrap_query_sequential_calls_create_independent_root_traces( + tracer_provider, span_exporter +): + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + options = SimpleNamespace(model="claude-sonnet", resume="resume-session") + + async def wrapped_query(*args, **kwargs): + yield AssistantMessage([TextBlock("answer")]) + yield ResultMessage(None) + + for prompt in ("first", "second"): + async for _ in wrap_query( + wrapped_query, + None, + (prompt,), + {"options": options}, + handler=handler, + ): + pass + + agent_spans = _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + ) + + assert len(agent_spans) == 2 + assert all(span.parent is None for span in agent_spans) + assert len({span.context.trace_id for span in agent_spans}) == 2 + assert {span.attributes[GEN_AI_SESSION_ID] for span in agent_spans} == { + "resume-session" + } + + +@pytest.mark.asyncio +async def test_wrap_query_preserves_active_parent_context( + tracer_provider, span_exporter +): + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + options = SimpleNamespace(model="claude-sonnet", resume="parent-session") + tracer = tracer_provider.get_tracer(__name__) + + async def wrapped_query(*args, **kwargs): + yield AssistantMessage([TextBlock("answer")]) + yield ResultMessage(None) + + with tracer.start_as_current_span("caller-operation") as parent_span: + async for _ in wrap_query( + wrapped_query, + None, + ("inside parent",), + {"options": options}, + handler=handler, + ): + pass + + agent_span = _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + )[0] + + assert agent_span.parent is not None + assert agent_span.parent.span_id == parent_span.context.span_id + assert agent_span.context.trace_id == parent_span.context.trace_id + assert agent_span.attributes[GEN_AI_SESSION_ID] == "parent-session" + + +@pytest.mark.asyncio +async def test_task_subagent_inherits_session_id( + tracer_provider, span_exporter +): + messages = [ + SystemMessage("sess-task"), + AssistantMessage([TextBlock("I will delegate this.")]), + AssistantMessage( + [ + ToolUseBlock( + "toolu_task", + "Task", + { + "subagent_type": "code-reviewer", + "description": "review session handling", + "prompt": "check session propagation", + }, + ) + ] + ), + UserMessage( + [ToolResultBlock("toolu_task", "task result")], + tool_use_result={ + "agentId": "subagent-1", + "content": [{"type": "text", "text": "done"}], + "usage": {"input_tokens": 4, "output_tokens": 2}, + }, + ), + ResultMessage("sess-task"), + ] + + await _run_stream(tracer_provider, messages) + + invoke_agent_sessions = [ + span.attributes[GEN_AI_SESSION_ID] + for span in _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + ) + ] + tool_span = _spans_by_operation( + span_exporter.get_finished_spans(), "execute_tool" + )[0] + + assert invoke_agent_sessions.count("sess-task") == 2 + assert tool_span.attributes[GEN_AI_SESSION_ID] == "sess-task" + + +@pytest.mark.asyncio +async def test_sequential_streams_create_independent_root_traces( + tracer_provider, span_exporter +): + await _run_stream( + tracer_provider, + [ + SystemMessage("sess-first"), + AssistantMessage([TextBlock("first answer")]), + ResultMessage("sess-first"), + ], + ) + await _run_stream( + tracer_provider, + [ + SystemMessage("sess-second"), + AssistantMessage([TextBlock("second answer")]), + ResultMessage("sess-second"), + ], + ) + + agent_spans = _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + ) + root_agent_spans = [span for span in agent_spans if span.parent is None] + + assert len(root_agent_spans) == 2 + assert { + span.attributes[GEN_AI_SESSION_ID] for span in root_agent_spans + } == {"sess-first", "sess-second"} + assert len({span.context.trace_id for span in root_agent_spans}) == 2 + + +@pytest.mark.asyncio +async def test_parallel_streams_keep_session_ids_isolated( + tracer_provider, span_exporter +): + async def run(session_id): + await _run_stream( + tracer_provider, + [ + SystemMessage(session_id), + AssistantMessage([TextBlock(f"answer for {session_id}")]), + AssistantMessage( + [ + ToolUseBlock( + "toolu_shared", + "Read", + {"file_path": f"{session_id}.md"}, + ) + ] + ), + UserMessage( + [ToolResultBlock("toolu_shared", f"{session_id} content")] + ), + ResultMessage(session_id), + ], + ) + + await asyncio.gather(run("sess-a"), run("sess-b")) + + agent_spans = _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + ) + sessions = {span.attributes[GEN_AI_SESSION_ID] for span in agent_spans} + assert sessions == {"sess-a", "sess-b"} + assert len({span.context.trace_id for span in agent_spans}) == 2 + + tool_sessions = { + span.attributes[GEN_AI_SESSION_ID] + for span in _spans_by_operation( + span_exporter.get_finished_spans(), "execute_tool" + ) + } + assert tool_sessions == {"sess-a", "sess-b"} From aed5bb89d841312ba5439342b9f96c3f1667ef18 Mon Sep 17 00:00:00 2001 From: sipercai <53717475+sipercai@users.noreply.github.com> Date: Tue, 23 Jun 2026 13:45:30 +0800 Subject: [PATCH 2/3] docs: update claude agent sdk changelog --- .../loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md index 7836f9f0c..fb5a7a6e7 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md @@ -9,7 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Capture Claude Agent SDK session IDs on agent, LLM, and tool spans. +- Capture Claude Agent SDK session IDs on agent, LLM, and tool spans, and + preserve active caller context so SDK traces attach to existing caller spans + instead of being forced to independent roots. ## Version 0.6.0 (2026-06-03) From fff8fd35f707a30bcb01a199cb8865d9da882ee7 Mon Sep 17 00:00:00 2001 From: sipercai <53717475+sipercai@users.noreply.github.com> Date: Tue, 23 Jun 2026 20:06:11 +0800 Subject: [PATCH 3/3] fix: skip empty claude stream session updates --- .../instrumentation/claude_agent_sdk/patch.py | 4 ++++ .../tests/test_session_capture.py | 20 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py index ae2b4e56a..b0d6bcf32 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py @@ -608,6 +608,10 @@ def _process_stream_event_message( if isinstance(event, dict): session_id = event.get("session_id") + if not session_id: + # Entry baggage is already applied when the agent invocation starts. + return + _set_session_id(agent_invocation, session_id) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py index c53b80b61..d61f57cd6 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py @@ -22,6 +22,9 @@ from opentelemetry import baggage from opentelemetry import context as otel_context +from opentelemetry.instrumentation.claude_agent_sdk import ( + patch as claude_patch, +) from opentelemetry.instrumentation.claude_agent_sdk.patch import ( _process_agent_invocation_stream, wrap_claude_client_query, @@ -257,6 +260,23 @@ async def test_stream_event_dict_session_fallback( assert llm_span.attributes[GEN_AI_SESSION_ID] == "sess-event-dict" +def test_stream_event_without_session_skips_baggage_lookup(monkeypatch): + def fail_baggage_lookup(): + raise AssertionError("unexpected per-event baggage lookup") + + monkeypatch.setattr( + claude_patch, + "_entry_baggage_identity_attributes", + fail_baggage_lookup, + ) + agent_invocation = SimpleNamespace(conversation_id=None, attributes={}) + + claude_patch._process_stream_event_message(StreamEvent(), agent_invocation) + + assert agent_invocation.conversation_id is None + assert GEN_AI_SESSION_ID not in agent_invocation.attributes + + @pytest.mark.asyncio async def test_client_query_session_id_is_used_before_result_message( tracer_provider, span_exporter