diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md index fb5a7a6e7..a92f8f499 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + +- Capture `gen_ai.skill.name`, `gen_ai.skill.id`, `gen_ai.skill.description` + and `gen_ai.skill.version` on the `execute_tool` span of the built-in + `Skill` tool. Skill metadata is read best-effort from the project-level + `SKILL.md` frontmatter (located via `SystemMessage.data.cwd`); `skill.id` + is reported as `claude:project:`. Metadata read failures never + affect the SDK call. + ### Fixed - Capture Claude Agent SDK session IDs on agent, LLM, and tool spans, and diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py index b0d6bcf32..71d5c34b5 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/src/opentelemetry/instrumentation/claude_agent_sdk/patch.py @@ -15,6 +15,7 @@ """Patch functions for Claude Agent SDK instrumentation.""" import logging +import os import time from typing import Any, Dict, List, Optional @@ -133,6 +134,133 @@ def _clear_client_managed_runs( client_managed_runs.clear() +# The name of the Claude Agent SDK built-in tool that loads a Skill. +_SKILL_TOOL_NAME = "Skill" + +# skill id prefix for project-scoped Claude Agent SDK skills. +_SKILL_ID_PREFIX = "claude:project:" + + +def _read_skill_metadata(skill_md_path: str) -> Dict[str, str]: + """Best-effort read of a Skill's SKILL.md frontmatter. + + Returns a dict with any of ``name``/``description``/``version`` keys that + were present in the YAML frontmatter. On any error (missing file, parse + failure, ...) returns an empty dict so telemetry never breaks the SDK call. + """ + try: + with open(skill_md_path, "r", encoding="utf-8") as f: + content = f.read() + except Exception: + # Missing or unreadable SKILL.md is expected for non-project skills. + return {} + + return _parse_skill_frontmatter(content) + + +def _parse_skill_frontmatter(content: str) -> Dict[str, str]: + """Parse selected scalar fields from SKILL.md frontmatter. + + This intentionally avoids a runtime PyYAML dependency. Claude skill + frontmatter only needs simple top-level scalar fields for telemetry. + """ + try: + stripped = content.lstrip() + if not stripped.startswith("---"): + return {} + # Split off the leading ``---``; the next ``---`` closes the block. + after_open = stripped[3:] + end_index = after_open.find("\n---") + if end_index == -1: + # Frontmatter never closed; treat the remainder as the block. + frontmatter_text = after_open + else: + frontmatter_text = after_open[:end_index] + except Exception: + return {} + + metadata: Dict[str, str] = {} + wanted_keys = {"name", "description", "version"} + for raw_line in frontmatter_text.splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or ":" not in line: + continue + + key, value = line.split(":", 1) + key = key.strip() + if key not in wanted_keys: + continue + + value = value.strip() + if len(value) >= 2 and value[0] == value[-1] and value[0] in { + '"', + "'", + }: + value = value[1:-1] + if value: + metadata[key] = value + return metadata + + +def _apply_skill_metadata( + tool_invocation: ExecuteToolInvocation, + skill_name: str, + cwd: Optional[str], +) -> None: + """Attach ``gen_ai.skill.*`` attributes to a Skill load tool span. + + Reads the project-level ``SKILL.md`` frontmatter best-effort and fills in + ``skill_name``/``skill_id``/``skill_description``/``skill_version`` on the + invocation. Any failure is swallowed so the SDK call is never affected. + """ + if not skill_name: + return + + metadata: Dict[str, str] = {} + if cwd: + skill_md_path = os.path.join( + cwd, ".claude", "skills", skill_name, "SKILL.md" + ) + metadata = _read_skill_metadata(skill_md_path) + + # gen_ai.skill.name: prefer the requested tool input; frontmatter is + # supplemental metadata for description/version. + name = skill_name or metadata.get("name") + if not name: + return + tool_invocation.skill_name = name + tool_invocation.skill_id = f"{_SKILL_ID_PREFIX}{name}" + + description = metadata.get("description") + if description: + tool_invocation.skill_description = description + version = metadata.get("version") + if version: + tool_invocation.skill_version = version + + +def _apply_skill_fallback( + tool_invocation: ExecuteToolInvocation, + tool_use_result: Any, +) -> None: + """Best-effort fallback to recover skill_name before closing a Skill span. + + If ``skill_name`` was not captured at span start (e.g. cwd was unavailable + so SKILL.md could not be read), try ``UserMessage.tool_use_result.commandName`` + per the SDK's Skill tool result format. + """ + if tool_invocation.skill_name: + return + if not isinstance(tool_use_result, dict): + return + command_name = tool_use_result.get("commandName") + if command_name: + tool_invocation.skill_name = str(command_name) + tool_invocation.skill_id = ( + f"{_SKILL_ID_PREFIX}{command_name}" + ) + + def _extract_message_parts(msg: Any) -> List[Any]: """Extract parts (text + tool calls) from an AssistantMessage.""" parts = [] @@ -161,12 +289,17 @@ def _create_tool_spans_from_message( active_task_stack: List[Any], client_managed_runs: Dict[str, ExecuteToolInvocation], exclude_tool_names: Optional[List[str]] = None, + cwd: Optional[str] = None, ) -> None: """Create tool execution spans from ToolUseBlocks in an AssistantMessage. Tool spans are children of the active SubAgent span (if any), otherwise agent span. When a Task tool is created, it's pushed onto active_task_stack along with a SubAgent span. + For the built-in ``Skill`` tool, ``gen_ai.skill.*`` attributes are read + best-effort from the project-level ``SKILL.md`` frontmatter (located via + ``cwd``) and attached to the tool span. + The stack structure is: [{"task": ExecuteToolInvocation, "subagent": InvokeAgentInvocation}, ...] """ if not hasattr(msg, "content"): @@ -214,6 +347,26 @@ def _create_tool_spans_from_message( _apply_session_identity( tool_invocation, agent_invocation.conversation_id ) + + # Skill load: attach gen_ai.skill.* attributes best-effort + # from the project SKILL.md frontmatter. Failures here must + # never propagate to break the SDK call. + if tool_name == _SKILL_TOOL_NAME: + try: + skill_name = "" + if isinstance(tool_input, dict): + skill_name = str( + tool_input.get("skill") or "" + ) + _apply_skill_metadata( + tool_invocation, skill_name, cwd + ) + except Exception as e: + logger.warning( + f"Failed to read Skill metadata for " + f"'{tool_input}': {e}" + ) + handler.start_execute_tool(tool_invocation) client_managed_runs[tool_use_id] = tool_invocation @@ -327,6 +480,7 @@ def _process_assistant_message( collected_messages: List[Dict[str, Any]], active_task_stack: List[Any], client_managed_runs: Dict[str, ExecuteToolInvocation], + cwd: Optional[str] = None, ) -> None: """Process AssistantMessage: create LLM turn, extract parts, create tool spans.""" parts = _extract_message_parts(msg) @@ -414,6 +568,7 @@ def _process_assistant_message( agent_invocation, active_task_stack, client_managed_runs, + cwd=cwd, ) @@ -535,6 +690,18 @@ def _process_user_message( Error(message=error_msg, type=RuntimeError), ) else: + # Skill load: best-effort fallback to fill skill_name + # from the tool result if it wasn't captured at start. + if tool_invocation.tool_name == _SKILL_TOOL_NAME: + try: + _apply_skill_fallback( + tool_invocation, tool_use_result + ) + except Exception as e: + logger.warning( + f"Failed to apply Skill metadata " + f"fallback: {e}" + ) handler.stop_execute_tool(tool_invocation) if tool_use_id: @@ -583,18 +750,25 @@ def _process_user_message( def _process_system_message( msg: Any, agent_invocation: InvokeAgentInvocation, -) -> None: - """Process SystemMessage: extract session_id early in the stream. +) -> Optional[str]: + """Process SystemMessage: extract session_id and cwd early in the stream. SystemMessage appears at the beginning of the message stream and contains - the session_id in its data field. We extract it here so that it's available - for all subsequent LLM spans. + the session_id and cwd in its data field. We extract them here so they are + available for all subsequent spans (cwd is needed to locate project-level + SKILL.md files for Skill tool telemetry). + + Returns the cwd if present, otherwise ``None``. """ if hasattr(msg, "subtype") and msg.subtype == "init": if hasattr(msg, "data") and isinstance(msg.data, dict): session_id = msg.data.get("session_id") if session_id: _set_session_id(agent_invocation, session_id) + cwd = msg.data.get("cwd") + if cwd: + return str(cwd) + return None def _process_stream_event_message( @@ -670,12 +844,19 @@ async def _process_agent_invocation_stream( active_task_stack: List[Any] = [] client_managed_runs: Dict[str, ExecuteToolInvocation] = {} + # cwd captured from SystemMessage.data.cwd, used to locate project-level + # SKILL.md files for Skill tool telemetry. + session_cwd: Optional[str] = None + agent_closed = False + try: async for msg in wrapped_stream: msg_type = type(msg).__name__ if msg_type == "SystemMessage": - _process_system_message(msg, agent_invocation) + cwd = _process_system_message(msg, agent_invocation) + if cwd: + session_cwd = cwd elif msg_type == "StreamEvent": _process_stream_event_message(msg, agent_invocation) elif msg_type == "AssistantMessage": @@ -689,6 +870,7 @@ async def _process_agent_invocation_stream( collected_messages, active_task_stack, client_managed_runs, + cwd=session_cwd, ) elif msg_type == "UserMessage": _process_user_message( @@ -705,15 +887,23 @@ async def _process_agent_invocation_stream( yield msg handler.stop_invoke_agent(agent_invocation) + agent_closed = True - except Exception as e: + except BaseException as e: error_msg = str(e) - if agent_invocation.span: - agent_invocation.span.set_attribute("error.type", type(e).__name__) - agent_invocation.span.set_attribute("error.message", error_msg) - handler.fail_invoke_agent( - agent_invocation, error=Error(message=error_msg, type=type(e)) - ) + if not agent_closed: + if agent_invocation.span: + agent_invocation.span.set_attribute( + "error.type", type(e).__name__ + ) + agent_invocation.span.set_attribute( + "error.message", error_msg + ) + handler.fail_invoke_agent( + agent_invocation, + error=Error(message=error_msg, type=type(e)), + ) + agent_closed = True raise finally: diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/cassettes/test_skill_load.yaml b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/cassettes/test_skill_load.yaml new file mode 100644 index 000000000..241302fd8 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/cassettes/test_skill_load.yaml @@ -0,0 +1,76 @@ +description: 'Skill load: project-level probe-skill loaded via Skill tool' +prompt: Use the probe-skill Skill tool first. Then answer exactly PROBE_SKILL_MARKER and nothing else. +messages: +- type: SystemMessage + subtype: init + data: + type: system + subtype: init + cwd: __SKILL_CWD__ + session_id: skill-session-0001 + tools: + - Skill + - Bash + - Read + skills: + - probe-skill + mcp_servers: [] + model: qwen-plus + permissionMode: bypassPermissions + apiKeySource: ANTHROPIC_API_KEY + claude_code_version: 2.1.1 + output_style: default + agents: [] + slash_commands: [] + plugins: [] + uuid: skill-init-uuid +- type: AssistantMessage + model: qwen-plus + content: + - type: ToolUseBlock + id: call_skill_load_probe + name: Skill + input: + skill: probe-skill + parent_tool_use_id: null + error: null +- type: UserMessage + content: + - type: ToolResultBlock + tool_use_id: call_skill_load_probe + content: 'Launching skill: probe-skill' + is_error: false + uuid: skill-result-uuid + parent_tool_use_id: null + tool_use_result: + success: true + commandName: probe-skill +- type: AssistantMessage + model: qwen-plus + content: + - type: TextBlock + text: PROBE_SKILL_MARKER + parent_tool_use_id: null + error: null +- type: ResultMessage + subtype: success + duration_ms: 3210 + duration_api_ms: 9000 + is_error: false + num_turns: 2 + session_id: skill-session-0001 + total_cost_usd: 0.012 + usage: + input_tokens: 1024 + cache_creation_input_tokens: 0 + cache_read_input_tokens: 0 + output_tokens: 32 + server_tool_use: + web_search_requests: 0 + web_fetch_requests: 0 + service_tier: standard + cache_creation: + ephemeral_1h_input_tokens: 0 + ephemeral_5m_input_tokens: 0 + result: PROBE_SKILL_MARKER + structured_output: null diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py index d61f57cd6..14a3327dc 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_session_capture.py @@ -122,6 +122,11 @@ async def _stream(messages): yield message +async def _cancelled_stream(session_id): + yield SystemMessage(session_id) + await asyncio.sleep(60) + + def _spans_by_operation(spans, operation): return [ span @@ -538,6 +543,50 @@ async def test_sequential_streams_create_independent_root_traces( assert len({span.context.trace_id for span in root_agent_spans}) == 2 +@pytest.mark.asyncio +async def test_cancelled_stream_detaches_agent_context( + tracer_provider, span_exporter +): + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + + with pytest.raises((TimeoutError, asyncio.TimeoutError)): + async with asyncio.timeout(0.01): + async for _ in _process_agent_invocation_stream( + wrapped_stream=_cancelled_stream("sess-cancelled"), + handler=handler, + model="claude-sonnet", + prompt="this stream will be cancelled", + ): + pass + + await _run_stream( + tracer_provider, + [ + SystemMessage("sess-after-cancel"), + AssistantMessage([TextBlock("answer after cancellation")]), + ResultMessage("sess-after-cancel"), + ], + ) + + agent_spans = _spans_by_operation( + span_exporter.get_finished_spans(), "invoke_agent" + ) + cancelled_span = [ + span + for span in agent_spans + if span.attributes.get(GEN_AI_SESSION_ID) == "sess-cancelled" + ][0] + after_span = [ + span + for span in agent_spans + if span.attributes.get(GEN_AI_SESSION_ID) == "sess-after-cancel" + ][0] + + assert cancelled_span.attributes["error.type"] == "CancelledError" + assert after_span.parent is None + assert after_span.context.trace_id != cancelled_span.context.trace_id + + @pytest.mark.asyncio async def test_parallel_streams_keep_session_ids_isolated( tracer_provider, span_exporter diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_span_validation.py b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_span_validation.py index e53f84fe0..3755e0390 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_span_validation.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-claude-agent-sdk/tests/test_span_validation.py @@ -21,6 +21,7 @@ - Span hierarchy and timeline """ +import asyncio import json from pathlib import Path from typing import Any, AsyncIterator, Dict, List @@ -95,6 +96,7 @@ def create_mock_message_from_data(message_data: Dict[str, Any]) -> Any: mock_msg.uuid = message_data.get("uuid") mock_msg.parent_tool_use_id = message_data.get("parent_tool_use_id") + mock_msg.tool_use_result = message_data.get("tool_use_result") elif msg_type == "ResultMessage": mock_msg.subtype = message_data["subtype"] @@ -522,3 +524,330 @@ async def test_span_hierarchy_correctness( assert tool_span.parent.span_id != llm_span.context.span_id, ( "Tool span should not be a child of LLM span" ) + + +# ============================================================================ +# Tests - Skill Tool Span (gen_ai.skill.* attributes) +# ============================================================================ + + +def _write_probe_skill_md( + project_dir: Path, + skill_name: str = "probe-skill", + version: str = "1.2.3", +) -> str: + """Create a project-level probe SKILL.md and return its project dir.""" + skill_dir = project_dir / ".claude" / "skills" / skill_name + skill_dir.mkdir(parents=True, exist_ok=True) + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + "---\n" + f"name: {skill_name}\n" + f"description: Skill telemetry probe for {skill_name}.\n" + f"version: {version}\n" + "---\n\n" + "When this skill is loaded, answer exactly: PROBE_SKILL_MARKER\n", + encoding="utf-8", + ) + return str(project_dir) + + +def _skill_load_messages( + cwd: str, + skill_name: str = "probe-skill", + session_id: str = "skill-session-0001", + tool_use_id: str = "call_skill_load_probe", + marker: str = "PROBE_SKILL_MARKER", +) -> List[Dict[str, Any]]: + """Message sequence for a Skill load, modelled on the SDK message stream.""" + return [ + { + "type": "SystemMessage", + "subtype": "init", + "data": { + "type": "system", + "subtype": "init", + "cwd": cwd, + "session_id": session_id, + "tools": ["Skill", "Bash", "Read"], + "skills": [skill_name], + "model": "qwen-plus", + "permissionMode": "bypassPermissions", + "apiKeySource": "ANTHROPIC_API_KEY", + "claude_code_version": "2.1.1", + "output_style": "default", + "agents": [], + "slash_commands": [], + "plugins": [], + "mcp_servers": [], + "uuid": "skill-init-uuid", + }, + }, + { + "type": "AssistantMessage", + "model": "qwen-plus", + "content": [ + { + "type": "ToolUseBlock", + "id": tool_use_id, + "name": "Skill", + "input": {"skill": skill_name}, + } + ], + "parent_tool_use_id": None, + "error": None, + }, + { + "type": "UserMessage", + "content": [ + { + "type": "ToolResultBlock", + "tool_use_id": tool_use_id, + "content": f"Launching skill: {skill_name}", + "is_error": False, + } + ], + "uuid": "skill-result-uuid", + "parent_tool_use_id": None, + "tool_use_result": { + "success": True, + "commandName": skill_name, + }, + }, + { + "type": "AssistantMessage", + "model": "qwen-plus", + "content": [{"type": "TextBlock", "text": marker}], + "parent_tool_use_id": None, + "error": None, + }, + { + "type": "ResultMessage", + "subtype": "success", + "duration_ms": 3210, + "duration_api_ms": 9000, + "is_error": False, + "num_turns": 2, + "session_id": session_id, + "total_cost_usd": 0.012, + "usage": { + "input_tokens": 1024, + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + "output_tokens": 32, + "server_tool_use": { + "web_search_requests": 0, + "web_fetch_requests": 0, + }, + "service_tier": "standard", + "cache_creation": { + "ephemeral_1h_input_tokens": 0, + "ephemeral_5m_input_tokens": 0, + }, + }, + "result": marker, + "structured_output": None, + }, + ] + + +@pytest.mark.asyncio +async def test_skill_tool_span_attributes( + instrument, span_exporter, tracer_provider, tmp_path +): + """Verify gen_ai.skill.* attributes on a Skill load execute_tool span. + + Validates per the Skill telemetry spec: + 1. Exactly one gen_ai.tool.name=Skill execute_tool span exists. + 2. That span carries gen_ai.skill.name/id/description/version. + 3. skill id is ``claude:project:``. + 4. Metadata is read best-effort from the project SKILL.md frontmatter. + """ + from opentelemetry.instrumentation.claude_agent_sdk.patch import ( # noqa: PLC0415 + _process_agent_invocation_stream, + ) + from opentelemetry.semconv._incubating.attributes import ( # noqa: PLC0415 + gen_ai_attributes as GenAIAttributes, + ) + from opentelemetry.util.genai.extended_handler import ( # noqa: PLC0415 + ExtendedTelemetryHandler, + ) + + cwd = _write_probe_skill_md(tmp_path) + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + mock_stream = create_mock_stream_from_messages(_skill_load_messages(cwd)) + + async for _ in _process_agent_invocation_stream( + wrapped_stream=mock_stream, + handler=handler, + model="qwen-plus", + prompt=( + "Use the probe-skill Skill tool first. Then answer exactly " + "PROBE_SKILL_MARKER and nothing else." + ), + ): + pass + + spans = span_exporter.get_finished_spans() + + skill_tool_spans = [ + s + for s in spans + if dict(s.attributes or {}).get(GenAIAttributes.GEN_AI_OPERATION_NAME) + == "execute_tool" + and dict(s.attributes or {}).get(GenAIAttributes.GEN_AI_TOOL_NAME) + == "Skill" + ] + + # Pass criterion 2: exactly one gen_ai.tool.name=Skill execute_tool span. + assert len(skill_tool_spans) == 1, ( + f"Should capture exactly one Skill execute_tool span, got " + f"{len(skill_tool_spans)}" + ) + + tool_span = skill_tool_spans[0] + attrs = dict(tool_span.attributes or {}) + + # Pass criterion 3: span carries all four gen_ai.skill.* attributes. + assert attrs.get("gen_ai.skill.name") == "probe-skill" + assert attrs.get("gen_ai.skill.id") == "claude:project:probe-skill" + assert attrs.get("gen_ai.skill.description") == ( + "Skill telemetry probe for probe-skill." + ) + assert attrs.get("gen_ai.skill.version") == "1.2.3" + + # Tool span still carries the standard tool attributes. + assert attrs.get(GenAIAttributes.GEN_AI_TOOL_CALL_ID) == ( + "call_skill_load_probe" + ) + + +@pytest.mark.asyncio +async def test_skill_metadata_read_failure_does_not_break_sdk( + instrument, span_exporter, tracer_provider, tmp_path +): + """Skill metadata read failures must not affect the SDK call (best-effort). + + When cwd points nowhere useful (no SKILL.md), the Skill tool span is still + created with skill.name/id derived from the tool input; no exception escapes. + """ + from opentelemetry.instrumentation.claude_agent_sdk.patch import ( # noqa: PLC0415 + _process_agent_invocation_stream, + ) + from opentelemetry.semconv._incubating.attributes import ( # noqa: PLC0415 + gen_ai_attributes as GenAIAttributes, + ) + from opentelemetry.util.genai.extended_handler import ( # noqa: PLC0415 + ExtendedTelemetryHandler, + ) + + # cwd with no .claude/skills tree -> SKILL.md read returns empty best-effort + cwd = str(tmp_path) + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + mock_stream = create_mock_stream_from_messages(_skill_load_messages(cwd)) + + async for _ in _process_agent_invocation_stream( + wrapped_stream=mock_stream, + handler=handler, + model="qwen-plus", + prompt="Use the probe-skill Skill tool.", + ): + pass + + spans = span_exporter.get_finished_spans() + skill_tool_spans = [ + s + for s in spans + if dict(s.attributes or {}).get(GenAIAttributes.GEN_AI_OPERATION_NAME) + == "execute_tool" + and dict(s.attributes or {}).get(GenAIAttributes.GEN_AI_TOOL_NAME) + == "Skill" + ] + assert len(skill_tool_spans) == 1 + attrs = dict(skill_tool_spans[0].attributes or {}) + # name/id fall back to the requested skill; description/version absent. + assert attrs.get("gen_ai.skill.name") == "probe-skill" + assert attrs.get("gen_ai.skill.id") == "claude:project:probe-skill" + assert "gen_ai.skill.description" not in attrs + assert "gen_ai.skill.version" not in attrs + + +@pytest.mark.asyncio +async def test_parallel_skill_loads_keep_metadata_isolated( + instrument, span_exporter, tracer_provider, tmp_path +): + """Parallel streams with the same tool_use_id must not mix Skill spans.""" + from opentelemetry.instrumentation.claude_agent_sdk.patch import ( # noqa: PLC0415 + _process_agent_invocation_stream, + ) + from opentelemetry.semconv._incubating.attributes import ( # noqa: PLC0415 + gen_ai_attributes as GenAIAttributes, + ) + from opentelemetry.util.genai.extended_handler import ( # noqa: PLC0415 + ExtendedTelemetryHandler, + ) + from opentelemetry.util.genai.extended_semconv.gen_ai_extended_attributes import ( # noqa: PLC0415 + GEN_AI_SESSION_ID, + ) + + async def interleaved_stream(messages): + for message in messages: + await asyncio.sleep(0) + yield create_mock_message_from_data(message) + + async def run_case(skill_name: str, session_id: str, version: str) -> None: + project_dir = tmp_path / skill_name + cwd = _write_probe_skill_md( + project_dir, skill_name=skill_name, version=version + ) + messages = _skill_load_messages( + cwd, + skill_name=skill_name, + session_id=session_id, + tool_use_id="shared_skill_tool", + marker=f"{skill_name.upper()}_MARKER", + ) + handler = ExtendedTelemetryHandler(tracer_provider=tracer_provider) + + async for _ in _process_agent_invocation_stream( + wrapped_stream=interleaved_stream(messages), + handler=handler, + model="qwen-plus", + prompt=f"Use the {skill_name} Skill tool.", + ): + pass + + await asyncio.gather( + run_case("alpha-skill", "session-alpha", "1.0.0"), + run_case("beta-skill", "session-beta", "2.0.0"), + ) + + skill_tool_attrs = [] + for span in span_exporter.get_finished_spans(): + attrs = dict(span.attributes or {}) + if ( + attrs.get(GenAIAttributes.GEN_AI_OPERATION_NAME) + == "execute_tool" + and attrs.get(GenAIAttributes.GEN_AI_TOOL_NAME) == "Skill" + ): + skill_tool_attrs.append(attrs) + + assert len(skill_tool_attrs) == 2 + attrs_by_skill = { + attrs["gen_ai.skill.name"]: attrs for attrs in skill_tool_attrs + } + assert set(attrs_by_skill) == {"alpha-skill", "beta-skill"} + + assert attrs_by_skill["alpha-skill"]["gen_ai.skill.id"] == ( + "claude:project:alpha-skill" + ) + assert attrs_by_skill["alpha-skill"]["gen_ai.skill.version"] == "1.0.0" + assert attrs_by_skill["alpha-skill"][GEN_AI_SESSION_ID] == ( + "session-alpha" + ) + + assert attrs_by_skill["beta-skill"]["gen_ai.skill.id"] == ( + "claude:project:beta-skill" + ) + assert attrs_by_skill["beta-skill"]["gen_ai.skill.version"] == "2.0.0" + assert attrs_by_skill["beta-skill"][GEN_AI_SESSION_ID] == "session-beta"