From 5088b41c7ebf17804f864180bce75bf3ab38d792 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=81=E5=B1=BF?= Date: Tue, 16 Jun 2026 15:28:27 +0800 Subject: [PATCH] fix(qwen-agent): improve token and nested agent tracing --- .../CHANGELOG.md | 7 + .../instrumentation/qwen_agent/patch.py | 65 +++- .../instrumentation/qwen_agent/utils.py | 207 ++++++++++ .../tests/test_spans.py | 363 +++++++++++++++++- 4 files changed, 634 insertions(+), 8 deletions(-) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/CHANGELOG.md b/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/CHANGELOG.md index 14e4b8269..45eef220f 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/CHANGELOG.md +++ b/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Fixed + +- Record token usage from Qwen-Agent DashScope response metadata on streaming + and non-streaming chat spans. +- Roll up child LLM token usage to Qwen-Agent invoke-agent spans, preserve + nested agent spans, and record only the final agent answer as output. + ## Version 0.6.0 (2026-06-03) There are no changelog entries for this release. diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/patch.py b/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/patch.py index 7fcf0ff16..c395cd3f5 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/patch.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/patch.py @@ -37,6 +37,8 @@ from opentelemetry.util.genai.types import Error from .utils import ( + _apply_usage_to_llm_invocation, + _convert_qwen_agent_final_output_messages, _convert_qwen_messages_to_output_messages, _create_agent_invocation, _create_llm_invocation, @@ -56,11 +58,14 @@ _react_step_counter: ContextVar[int] = ContextVar( "qwen_react_step_counter", default=0 ) +_active_agent_invocations: ContextVar[tuple[Any, ...]] = ContextVar( + "qwen_active_agent_invocations", default=() +) # Reentrancy guards to prevent duplicate spans when Agent/BaseChatModel # are abstract classes and subclass calls super() (Proxy/Wrapper scenarios). -_in_agent_run: ContextVar[bool] = ContextVar( - "_qwen_in_agent_run", default=False +_agent_run_instance_stack: ContextVar[tuple[int, ...]] = ContextVar( + "_qwen_agent_run_instance_stack", default=() ) _in_chat: ContextVar[bool] = ContextVar("_qwen_in_chat", default=False) _in_call_tool: ContextVar[bool] = ContextVar( @@ -79,6 +84,42 @@ def _close_active_react_step(handler: ExtendedTelemetryHandler) -> None: _react_step_invocation.set(None) +def _accumulate_llm_usage_on_active_agents(invocation: Any) -> None: + """Roll up child LLM token usage onto active invoke_agent spans. + + The rollup is intentionally transitive: a parent agent records the total + nested LLM cost of its run, so consumers should not sum agent spans to + calculate global token usage. + """ + active_agents = _active_agent_invocations.get() + if not active_agents: + return + + for active_agent in active_agents: + if getattr(invocation, "input_tokens", None) is not None: + active_agent.input_tokens = (active_agent.input_tokens or 0) + ( + invocation.input_tokens or 0 + ) + if getattr(invocation, "output_tokens", None) is not None: + active_agent.output_tokens = (active_agent.output_tokens or 0) + ( + invocation.output_tokens or 0 + ) + if ( + getattr(invocation, "usage_cache_read_input_tokens", None) + is not None + ): + active_agent.usage_cache_read_input_tokens = ( + active_agent.usage_cache_read_input_tokens or 0 + ) + (invocation.usage_cache_read_input_tokens or 0) + if ( + getattr(invocation, "usage_cache_creation_input_tokens", None) + is not None + ): + active_agent.usage_cache_creation_input_tokens = ( + active_agent.usage_cache_creation_input_tokens or 0 + ) + (invocation.usage_cache_creation_input_tokens or 0) + + def wrap_agent_run( wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler ): @@ -93,10 +134,12 @@ def wrap_agent_run( """ # Reentrancy guard: prevent duplicate spans in Proxy/Wrapper scenarios # where a subclass calls super().run(). - if _in_agent_run.get(): + run_stack = _agent_run_instance_stack.get() + instance_id = id(instance) + if instance_id in run_stack: yield from wrapped(*args, **kwargs) return - run_token = _in_agent_run.set(True) + run_token = _agent_run_instance_stack.set(run_stack + (instance_id,)) messages = args[0] if args else kwargs.get("messages", []) @@ -104,7 +147,7 @@ def wrap_agent_run( invocation = _create_agent_invocation(instance, messages) except Exception as e: logger.debug(f"Failed to create agent invocation: {e}") - _in_agent_run.reset(run_token) + _agent_run_instance_stack.reset(run_token) yield from wrapped(*args, **kwargs) return @@ -113,6 +156,9 @@ def wrap_agent_run( mode_token = _react_mode.set(is_react) counter_token = _react_step_counter.set(0) step_token = _react_step_invocation.set(None) + active_agent_token = _active_agent_invocations.set( + _active_agent_invocations.get() + (invocation,) + ) handler.start_invoke_agent(invocation) @@ -125,7 +171,7 @@ def wrap_agent_run( # Extract output from last yielded response if last_response: invocation.output_messages = ( - _convert_qwen_messages_to_output_messages(last_response) + _convert_qwen_agent_final_output_messages(last_response) ) # Close the last react_step span before closing invoke_agent. @@ -153,7 +199,8 @@ def wrap_agent_run( _react_step_counter.reset(counter_token) _react_step_invocation.reset(step_token) _react_mode.reset(mode_token) - _in_agent_run.reset(run_token) + _active_agent_invocations.reset(active_agent_token) + _agent_run_instance_stack.reset(run_token) def wrap_chat_model_chat( @@ -206,6 +253,7 @@ def wrap_chat_model_chat( else: # Non-streaming: result is List[Message] if result: + _apply_usage_to_llm_invocation(invocation, result) invocation.output_messages = ( _convert_qwen_messages_to_output_messages(result) ) @@ -225,6 +273,7 @@ def wrap_chat_model_chat( invocation.finish_reasons = ["tool_calls"] break + _accumulate_llm_usage_on_active_agents(invocation) handler.stop_llm(invocation) return result @@ -246,6 +295,7 @@ def _wrap_streaming_llm_response( if first_token: invocation.monotonic_first_token_s = timeit.default_timer() first_token = False + _apply_usage_to_llm_invocation(invocation, response) last_response = response yield response @@ -269,6 +319,7 @@ def _wrap_streaming_llm_response( invocation.finish_reasons = ["tool_calls"] break + _accumulate_llm_usage_on_active_agents(invocation) handler.stop_llm(invocation) except GeneratorExit as e: diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/utils.py index 269358540..db6975ebc 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/utils.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/utils.py @@ -102,6 +102,183 @@ def _extract_content_text(content: Any) -> str: return str(content) if content else "" +def _field_value(value: Any, *names: str) -> Any: + """Read the first present field from a mapping or SDK response object.""" + if value is None: + return None + + for name in names: + if isinstance(value, dict): + if name in value: + return value[name] + continue + + try: + attr_value = getattr(value, name) + except Exception: + attr_value = None + if attr_value is not None: + return attr_value + + get_method = getattr(value, "get", None) + if callable(get_method): + try: + got_value = get_method(name) + except Exception: + got_value = None + if got_value is not None: + return got_value + + return None + + +def _int_value(value: Any) -> Optional[int]: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _usage_token_values(usage: Any) -> Dict[str, int]: + if usage is None: + return {} + + input_tokens = _int_value( + _field_value(usage, "input_tokens", "prompt_tokens") + ) + output_tokens = _int_value( + _field_value(usage, "output_tokens", "completion_tokens") + ) + cache_read_tokens = _int_value( + _field_value(usage, "cache_read_input_tokens", "cached_prompt_tokens") + ) + cache_creation_tokens = _int_value( + _field_value(usage, "cache_creation_input_tokens") + ) + + for detail_name in ("prompt_tokens_details", "input_tokens_details"): + details = _field_value(usage, detail_name) + if details is not None and cache_read_tokens is None: + cache_read_tokens = _int_value( + _field_value(details, "cached_tokens") + ) + + values: Dict[str, int] = {} + if input_tokens is not None: + values["input_tokens"] = input_tokens + if output_tokens is not None: + values["output_tokens"] = output_tokens + if cache_read_tokens is not None and cache_read_tokens > 0: + values["cache_read_input_tokens"] = cache_read_tokens + if cache_creation_tokens is not None and cache_creation_tokens > 0: + values["cache_creation_input_tokens"] = cache_creation_tokens + + return values + + +def _usage_score(usage_values: Dict[str, int]) -> int: + return (usage_values.get("input_tokens") or 0) + ( + usage_values.get("output_tokens") or 0 + ) + + +def _usage_sources(value: Any) -> List[Any]: + sources = [] + usage = _field_value(value, "usage") + if usage is not None: + sources.append(usage) + + extra = _field_value(value, "extra") + if extra is not None: + extra_usage = _field_value(extra, "usage", "usage_metadata") + if extra_usage is not None: + sources.append(extra_usage) + + service_info = _field_value(extra, "model_service_info") + if service_info is not None: + sources.append(service_info) + + service_info = _field_value(value, "model_service_info") + if service_info is not None: + sources.append(service_info) + + return sources + + +def _extract_usage_values(value: Any, depth: int = 0) -> Dict[str, int]: + """Extract token usage from qwen-agent Message/extra/model_service_info.""" + if value is None or depth > 4: + return {} + + best_values: Dict[str, int] = {} + values = _usage_token_values(value) + if values: + best_values = values + + if isinstance(value, (list, tuple)): + for item in reversed(value): + item_values = _extract_usage_values(item, depth + 1) + if _usage_score(item_values) > _usage_score(best_values): + best_values = item_values + return best_values + + for source in _usage_sources(value): + source_values = _extract_usage_values(source, depth + 1) + if _usage_score(source_values) > _usage_score(best_values): + best_values = source_values + + return best_values + + +def _apply_usage_to_llm_invocation( + invocation: LLMInvocation, value: Any +) -> None: + """Apply qwen-agent token usage metadata to an LLMInvocation. + + Qwen-Agent stores DashScope responses under Message.extra["model_service_info"] + for both streaming and non-streaming calls. Streaming chunks can carry + cumulative usage, so only replace existing values when the candidate usage + has at least as many observed tokens as the current invocation. + """ + usage_values = _extract_usage_values(value) + if not usage_values: + return + + current_score = (invocation.input_tokens or 0) + ( + invocation.output_tokens or 0 + ) + if current_score and _usage_score(usage_values) < current_score: + return + + if "input_tokens" in usage_values: + invocation.input_tokens = usage_values["input_tokens"] + if "output_tokens" in usage_values: + invocation.output_tokens = usage_values["output_tokens"] + if "cache_read_input_tokens" in usage_values: + invocation.usage_cache_read_input_tokens = usage_values[ + "cache_read_input_tokens" + ] + if "cache_creation_input_tokens" in usage_values: + invocation.usage_cache_creation_input_tokens = usage_values[ + "cache_creation_input_tokens" + ] + + +def apply_token_usage_from_qwen_messages( + invocation: LLMInvocation, + messages: Any, +) -> None: + """Populate token usage from qwen-agent Message metadata. + + Kept as a compatibility entrypoint for callers that used the previous + helper name; the instrumentation wrapper now calls the generic extractor + directly so it can process individual streaming chunks. + """ + _apply_usage_to_llm_invocation(invocation, messages) + + def _convert_qwen_messages_to_input_messages( messages: Any, ) -> List[InputMessage]: @@ -291,6 +468,36 @@ def _convert_qwen_messages_to_output_messages( return output_messages +def _convert_qwen_agent_final_output_messages( + messages: Any, +) -> List[OutputMessage]: + """Convert only the final qwen-agent answer to GenAI OutputMessage format.""" + if not messages: + return [] + + if not isinstance(messages, list): + messages = [messages] + + for msg in reversed(messages): + try: + role = _field_value(msg, "role") or "assistant" + function_call = _field_value(msg, "function_call") + content = _field_value(msg, "content") or "" + + if role in ("function", "tool") or function_call: + continue + + text = _extract_content_text(content) + if text: + return _convert_qwen_messages_to_output_messages([msg]) + except Exception as e: + logger.debug(f"Error extracting final agent output message: {e}") + continue + + logger.debug("No final qwen-agent assistant text output message found") + return [] + + def _get_tool_definitions( functions: Optional[List[Dict]], ) -> Optional[List[FunctionToolDefinition]]: diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/tests/test_spans.py b/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/tests/test_spans.py index dd42670cb..5ce33105d 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/tests/test_spans.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/tests/test_spans.py @@ -18,6 +18,8 @@ with the expected names, kinds, and attributes. """ +import json +from types import SimpleNamespace from unittest.mock import MagicMock, patch import pytest @@ -25,6 +27,7 @@ from qwen_agent.llm.base import BaseChatModel from qwen_agent.llm.schema import ContentItem, FunctionCall, Message +from opentelemetry.instrumentation.qwen_agent import QwenAgentInstrumentor from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAIAttributes, ) @@ -130,6 +133,50 @@ def test_non_stream_chat_creates_span(self, span_exporter, instrument): f"Expected 'dashscope', got attrs: {attrs}" ) + def test_non_stream_chat_records_token_usage( + self, span_exporter, instrument + ): + """Non-streaming chat() should record token usage from Message.extra.""" + model = _StubChatModel( + model="deepseek-v3", model_type="qwen_dashscope" + ) + + fake_response = [ + Message( + role="assistant", + content="4", + extra={ + "model_service_info": SimpleNamespace( + usage={ + "input_tokens": 21, + "output_tokens": 1, + "total_tokens": 22, + "prompt_tokens_details": {"cached_tokens": 4}, + } + ) + }, + ) + ] + + with patch.object( + _StubChatModel, + "_chat_no_stream", + return_value=fake_response, + ): + model.chat( + messages=[Message(role="user", content="What is 2+2?")], + stream=False, + ) + + spans = span_exporter.get_finished_spans() + chat_spans = [s for s in spans if s.name.startswith("chat")] + assert len(chat_spans) >= 1 + attrs = dict(chat_spans[0].attributes or {}) + assert attrs.get(GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS) == 21 + assert attrs.get(GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS) == 1 + assert attrs.get("gen_ai.usage.total_tokens") == 22 + assert attrs.get("gen_ai.usage.cache_read.input_tokens") == 4 + def test_stream_chat_creates_span(self, span_exporter, instrument): """Streaming chat() should create a chat span after the iterator is consumed.""" model = _StubChatModel(model="qwen-turbo", model_type="qwen_dashscope") @@ -162,6 +209,122 @@ def fake_stream(messages, **kwargs): assert span.name == "chat qwen-turbo" assert span.kind == SpanKind.CLIENT + def test_stream_chat_records_token_usage(self, span_exporter, instrument): + """Streaming chat() should record final cumulative token usage.""" + model = _StubChatModel( + model="deepseek-v3", model_type="qwen_dashscope" + ) + + chunk1 = [ + Message( + role="assistant", + content="The", + extra={ + "model_service_info": { + "usage": { + "input_tokens": 18, + "output_tokens": 1, + "total_tokens": 19, + } + } + }, + ) + ] + chunk2 = [ + Message( + role="assistant", + content="The answer is 4.", + extra={ + "model_service_info": { + "usage": { + "input_tokens": 18, + "output_tokens": 5, + "total_tokens": 23, + } + } + }, + ) + ] + + def fake_stream(messages, **kwargs): + yield chunk1 + yield chunk2 + + with patch.object( + _StubChatModel, + "_chat_stream", + side_effect=fake_stream, + ): + response_iter = model.chat( + messages=[Message(role="user", content="What is 2+2?")], + stream=True, + ) + list(response_iter) + + spans = span_exporter.get_finished_spans() + chat_spans = [s for s in spans if s.name.startswith("chat")] + assert len(chat_spans) >= 1 + attrs = dict(chat_spans[0].attributes or {}) + assert attrs.get(GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS) == 18 + assert attrs.get(GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS) == 5 + assert attrs.get("gen_ai.usage.total_tokens") == 23 + + def test_stream_chat_keeps_most_complete_usage( + self, span_exporter, instrument + ): + """Streaming chat() should keep the largest cumulative usage seen.""" + model = _StubChatModel( + model="deepseek-v3", model_type="qwen_dashscope" + ) + + chunk1 = [ + Message( + role="assistant", + content="The", + extra={ + "model_service_info": { + "usage": {"input_tokens": 18, "output_tokens": 1} + } + }, + ) + ] + chunk2 = [ + Message( + role="assistant", + content="The answer", + extra={ + "model_service_info": { + "usage": {"input_tokens": 18, "output_tokens": 5} + } + }, + ) + ] + chunk3 = [Message(role="assistant", content="The answer is 4.")] + + def fake_stream(messages, **kwargs): + yield chunk1 + yield chunk2 + yield chunk3 + + with patch.object( + _StubChatModel, + "_chat_stream", + side_effect=fake_stream, + ): + response_iter = model.chat( + messages=[Message(role="user", content="What is 2+2?")], + stream=True, + ) + list(response_iter) + + spans = span_exporter.get_finished_spans() + chat_spans = [s for s in spans if s.name.startswith("chat")] + assert len(chat_spans) >= 1 + attrs = dict(chat_spans[0].attributes or {}) + assert attrs.get(GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS) == 18 + assert attrs.get(GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS) == 5 + assert attrs.get("gen_ai.usage.total_tokens") == 23 + def test_chat_with_function_call_response(self, span_exporter, instrument): """Chat response containing a function_call should still produce a valid span.""" model = _StubChatModel(model="qwen-max", model_type="qwen_dashscope") @@ -334,6 +497,189 @@ def fake_run(messages, **kwargs): # The wrapper should produce exactly one span per run() call assert len(agent_spans) == 1 + def test_agent_run_records_only_final_output_message( + self, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, + monkeypatch, + ): + """Agent output should keep the final answer, not tool calls/results.""" + monkeypatch.setenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "SPAN_ONLY" + ) + instrumentor = QwenAgentInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + skip_dep_check=True, + ) + + llm = MagicMock() + llm.model = "qwen-plus" + llm.model_type = "qwen_dashscope" + + agent = _StubAgent.create(name="OnePilotBot", llm=llm) + + response_msgs = [ + Message( + role="assistant", + content="", + function_call=FunctionCall( + name="get_operational_snapshot", + arguments='{"incident_id": "INC-1"}', + ), + ), + Message( + role="function", + name="get_operational_snapshot", + content='{"p95_latency_ms": 1840}', + ), + Message( + role="assistant", + content="", + function_call=FunctionCall( + name="score_bundle_plan", + arguments='{"plan_name": "gray-rollout"}', + ), + ), + Message( + role="function", + name="score_bundle_plan", + content='{"score": 80}', + ), + Message(role="assistant", content="Final verdict: continue."), + ] + + def fake_run(messages, **kwargs): + yield response_msgs + + try: + with patch.object(_StubAgent, "_run", side_effect=fake_run): + list(agent.run([Message(role="user", content="diagnose")])) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + agent_spans = [s for s in spans if "invoke_agent" in s.name] + assert len(agent_spans) == 1 + + attrs = dict(agent_spans[0].attributes or {}) + output = json.loads(attrs["gen_ai.output.messages"]) + assert len(output) == 1 + assert output[0]["role"] == "assistant" + assert output[0]["finish_reason"] == "stop" + assert output[0]["parts"] == [ + {"content": "Final verdict: continue.", "type": "text"} + ] + + def test_agent_run_without_final_answer_skips_tool_output_messages( + self, + span_exporter, + tracer_provider, + logger_provider, + meter_provider, + monkeypatch, + ): + """Agent output should not fall back to intermediate tool messages.""" + monkeypatch.setenv( + "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT", "SPAN_ONLY" + ) + instrumentor = QwenAgentInstrumentor() + instrumentor.instrument( + tracer_provider=tracer_provider, + logger_provider=logger_provider, + meter_provider=meter_provider, + skip_dep_check=True, + ) + + llm = MagicMock() + llm.model = "qwen-plus" + llm.model_type = "qwen_dashscope" + + agent = _StubAgent.create(name="ToolOnlyBot", llm=llm) + + response_msgs = [ + Message( + role="assistant", + content="", + function_call=FunctionCall( + name="get_operational_snapshot", + arguments='{"incident_id": "INC-1"}', + ), + ), + Message( + role="function", + name="get_operational_snapshot", + content='{"p95_latency_ms": 1840}', + ), + ] + + def fake_run(messages, **kwargs): + yield response_msgs + + try: + with patch.object(_StubAgent, "_run", side_effect=fake_run): + list(agent.run([Message(role="user", content="diagnose")])) + finally: + instrumentor.uninstrument() + + spans = span_exporter.get_finished_spans() + agent_spans = [s for s in spans if "invoke_agent" in s.name] + assert len(agent_spans) == 1 + + attrs = dict(agent_spans[0].attributes or {}) + assert "gen_ai.output.messages" not in attrs + + def test_nested_agent_run_creates_child_invoke_agent_span( + self, span_exporter, instrument + ): + """Nested runs on different agent instances should not be suppressed.""" + parent_llm = MagicMock() + parent_llm.model = "qwen-turbo" + parent_llm.model_type = "qwen_dashscope" + child_llm = MagicMock() + child_llm.model = "qwen-plus" + child_llm.model_type = "qwen_dashscope" + + parent_agent = _StubAgent.create(name="ParentBot", llm=parent_llm) + child_agent = _StubAgent.create(name="ChildBot", llm=child_llm) + + def fake_run(self, messages, **kwargs): + if self is parent_agent: + yield from child_agent.run( + [Message(role="user", content="child task")] + ) + yield [Message(role="assistant", content="parent final")] + elif self is child_agent: + yield [Message(role="assistant", content="child final")] + else: + yield [Message(role="assistant", content="unexpected")] + + with patch.object( + _StubAgent, "_run", autospec=True, side_effect=fake_run + ): + results = list( + parent_agent.run([Message(role="user", content="parent task")]) + ) + + assert len(results) == 2 + + spans = span_exporter.get_finished_spans() + agent_spans = [s for s in spans if "invoke_agent" in s.name] + span_by_name = {s.name: s for s in agent_spans} + assert set(span_by_name) == { + "invoke_agent ParentBot", + "invoke_agent ChildBot", + } + + parent_span = span_by_name["invoke_agent ParentBot"] + child_span = span_by_name["invoke_agent ChildBot"] + assert child_span.parent is not None + assert child_span.parent.span_id == parent_span.context.span_id + # --------------------------------------------------------------------------- # Tool call span tests @@ -451,7 +797,17 @@ def test_agent_run_with_llm_call_produces_nested_spans( model = _StubChatModel(model="qwen-max", model_type="qwen_dashscope") agent = _StubAgent.create(name="NestBot", llm=model) - llm_response = [Message(role="assistant", content="The answer is 42.")] + llm_response = [ + Message( + role="assistant", + content="The answer is 42.", + extra={ + "model_service_info": { + "usage": {"input_tokens": 21, "output_tokens": 7} + } + }, + ) + ] def fake_run(messages, **kwargs): # Simulate the agent calling LLM internally @@ -480,6 +836,11 @@ def fake_run(messages, **kwargs): assert chat_span.parent is not None assert chat_span.parent.span_id == agent_span.context.span_id + agent_attrs = dict(agent_span.attributes or {}) + assert agent_attrs.get(GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS) == 21 + assert agent_attrs.get(GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS) == 7 + assert agent_attrs.get("gen_ai.usage.total_tokens") == 28 + def test_agent_run_with_tool_call_produces_nested_spans( self, span_exporter, instrument ):