Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Fixed

- Record token usage from Qwen-Agent DashScope response metadata on streaming
and non-streaming chat spans.
- Roll up child LLM token usage to Qwen-Agent invoke-agent spans, preserve
nested agent spans, and record only the final agent answer as output.

## Version 0.6.0 (2026-06-03)

There are no changelog entries for this release.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
from opentelemetry.util.genai.types import Error

from .utils import (
_apply_usage_to_llm_invocation,
_convert_qwen_agent_final_output_messages,
_convert_qwen_messages_to_output_messages,
_create_agent_invocation,
_create_llm_invocation,
Expand All @@ -56,11 +58,14 @@
_react_step_counter: ContextVar[int] = ContextVar(
"qwen_react_step_counter", default=0
)
_active_agent_invocations: ContextVar[tuple[Any, ...]] = ContextVar(
"qwen_active_agent_invocations", default=()
)

# Reentrancy guards to prevent duplicate spans when Agent/BaseChatModel
# are abstract classes and subclass calls super() (Proxy/Wrapper scenarios).
_in_agent_run: ContextVar[bool] = ContextVar(
"_qwen_in_agent_run", default=False
_agent_run_instance_stack: ContextVar[tuple[int, ...]] = ContextVar(
"_qwen_agent_run_instance_stack", default=()
)
_in_chat: ContextVar[bool] = ContextVar("_qwen_in_chat", default=False)
_in_call_tool: ContextVar[bool] = ContextVar(
Expand All @@ -79,6 +84,42 @@ def _close_active_react_step(handler: ExtendedTelemetryHandler) -> None:
_react_step_invocation.set(None)


def _accumulate_llm_usage_on_active_agents(invocation: Any) -> None:
"""Roll up child LLM token usage onto active invoke_agent spans.

The rollup is intentionally transitive: a parent agent records the total
nested LLM cost of its run, so consumers should not sum agent spans to
calculate global token usage.
"""
active_agents = _active_agent_invocations.get()
if not active_agents:
return

for active_agent in active_agents:
if getattr(invocation, "input_tokens", None) is not None:
active_agent.input_tokens = (active_agent.input_tokens or 0) + (
invocation.input_tokens or 0
)
if getattr(invocation, "output_tokens", None) is not None:
active_agent.output_tokens = (active_agent.output_tokens or 0) + (
invocation.output_tokens or 0
)
if (
getattr(invocation, "usage_cache_read_input_tokens", None)
is not None
):
active_agent.usage_cache_read_input_tokens = (
active_agent.usage_cache_read_input_tokens or 0
) + (invocation.usage_cache_read_input_tokens or 0)
if (
getattr(invocation, "usage_cache_creation_input_tokens", None)
is not None
):
active_agent.usage_cache_creation_input_tokens = (
active_agent.usage_cache_creation_input_tokens or 0
) + (invocation.usage_cache_creation_input_tokens or 0)


def wrap_agent_run(
wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler
):
Expand All @@ -93,18 +134,20 @@ def wrap_agent_run(
"""
# Reentrancy guard: prevent duplicate spans in Proxy/Wrapper scenarios
# where a subclass calls super().run().
if _in_agent_run.get():
run_stack = _agent_run_instance_stack.get()
instance_id = id(instance)
if instance_id in run_stack:
yield from wrapped(*args, **kwargs)
return
run_token = _in_agent_run.set(True)
run_token = _agent_run_instance_stack.set(run_stack + (instance_id,))

messages = args[0] if args else kwargs.get("messages", [])

try:
invocation = _create_agent_invocation(instance, messages)
except Exception as e:
logger.debug(f"Failed to create agent invocation: {e}")
_in_agent_run.reset(run_token)
_agent_run_instance_stack.reset(run_token)
yield from wrapped(*args, **kwargs)
return

Expand All @@ -113,6 +156,9 @@ def wrap_agent_run(
mode_token = _react_mode.set(is_react)
counter_token = _react_step_counter.set(0)
step_token = _react_step_invocation.set(None)
active_agent_token = _active_agent_invocations.set(
_active_agent_invocations.get() + (invocation,)
)

handler.start_invoke_agent(invocation)

Expand All @@ -125,7 +171,7 @@ def wrap_agent_run(
# Extract output from last yielded response
if last_response:
invocation.output_messages = (
_convert_qwen_messages_to_output_messages(last_response)
_convert_qwen_agent_final_output_messages(last_response)
)

# Close the last react_step span before closing invoke_agent.
Expand Down Expand Up @@ -153,7 +199,8 @@ def wrap_agent_run(
_react_step_counter.reset(counter_token)
_react_step_invocation.reset(step_token)
_react_mode.reset(mode_token)
_in_agent_run.reset(run_token)
_active_agent_invocations.reset(active_agent_token)
_agent_run_instance_stack.reset(run_token)


def wrap_chat_model_chat(
Expand Down Expand Up @@ -206,6 +253,7 @@ def wrap_chat_model_chat(
else:
# Non-streaming: result is List[Message]
if result:
_apply_usage_to_llm_invocation(invocation, result)
invocation.output_messages = (
_convert_qwen_messages_to_output_messages(result)
)
Expand All @@ -225,6 +273,7 @@ def wrap_chat_model_chat(
invocation.finish_reasons = ["tool_calls"]
break

_accumulate_llm_usage_on_active_agents(invocation)
handler.stop_llm(invocation)
return result

Expand All @@ -246,6 +295,7 @@ def _wrap_streaming_llm_response(
if first_token:
invocation.monotonic_first_token_s = timeit.default_timer()
first_token = False
_apply_usage_to_llm_invocation(invocation, response)
last_response = response
yield response

Expand All @@ -269,6 +319,7 @@ def _wrap_streaming_llm_response(
invocation.finish_reasons = ["tool_calls"]
break

_accumulate_llm_usage_on_active_agents(invocation)
handler.stop_llm(invocation)

except GeneratorExit as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,183 @@ def _extract_content_text(content: Any) -> str:
return str(content) if content else ""


def _field_value(value: Any, *names: str) -> Any:
"""Read the first present field from a mapping or SDK response object."""
if value is None:
return None

for name in names:
if isinstance(value, dict):
if name in value:
return value[name]
continue

try:
attr_value = getattr(value, name)
except Exception:
attr_value = None
if attr_value is not None:
return attr_value

get_method = getattr(value, "get", None)
if callable(get_method):
try:
got_value = get_method(name)
except Exception:
got_value = None
if got_value is not None:
return got_value

return None


def _int_value(value: Any) -> Optional[int]:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None


def _usage_token_values(usage: Any) -> Dict[str, int]:
if usage is None:
return {}

input_tokens = _int_value(
_field_value(usage, "input_tokens", "prompt_tokens")
)
output_tokens = _int_value(
_field_value(usage, "output_tokens", "completion_tokens")
)
cache_read_tokens = _int_value(
_field_value(usage, "cache_read_input_tokens", "cached_prompt_tokens")
)
cache_creation_tokens = _int_value(
_field_value(usage, "cache_creation_input_tokens")
)

for detail_name in ("prompt_tokens_details", "input_tokens_details"):
details = _field_value(usage, detail_name)
if details is not None and cache_read_tokens is None:
cache_read_tokens = _int_value(
_field_value(details, "cached_tokens")
)

values: Dict[str, int] = {}
if input_tokens is not None:
values["input_tokens"] = input_tokens
if output_tokens is not None:
values["output_tokens"] = output_tokens
if cache_read_tokens is not None and cache_read_tokens > 0:
values["cache_read_input_tokens"] = cache_read_tokens
if cache_creation_tokens is not None and cache_creation_tokens > 0:
values["cache_creation_input_tokens"] = cache_creation_tokens

return values


def _usage_score(usage_values: Dict[str, int]) -> int:
return (usage_values.get("input_tokens") or 0) + (
usage_values.get("output_tokens") or 0
)


def _usage_sources(value: Any) -> List[Any]:
sources = []
usage = _field_value(value, "usage")
if usage is not None:
sources.append(usage)

extra = _field_value(value, "extra")
if extra is not None:
extra_usage = _field_value(extra, "usage", "usage_metadata")
if extra_usage is not None:
sources.append(extra_usage)

service_info = _field_value(extra, "model_service_info")
if service_info is not None:
sources.append(service_info)

service_info = _field_value(value, "model_service_info")
if service_info is not None:
sources.append(service_info)

return sources


def _extract_usage_values(value: Any, depth: int = 0) -> Dict[str, int]:
"""Extract token usage from qwen-agent Message/extra/model_service_info."""
if value is None or depth > 4:
return {}

best_values: Dict[str, int] = {}
values = _usage_token_values(value)
if values:
best_values = values

if isinstance(value, (list, tuple)):
for item in reversed(value):
item_values = _extract_usage_values(item, depth + 1)
if _usage_score(item_values) > _usage_score(best_values):
best_values = item_values
return best_values

for source in _usage_sources(value):
source_values = _extract_usage_values(source, depth + 1)
if _usage_score(source_values) > _usage_score(best_values):
best_values = source_values

return best_values


def _apply_usage_to_llm_invocation(
invocation: LLMInvocation, value: Any
) -> None:
"""Apply qwen-agent token usage metadata to an LLMInvocation.

Qwen-Agent stores DashScope responses under Message.extra["model_service_info"]
for both streaming and non-streaming calls. Streaming chunks can carry
cumulative usage, so only replace existing values when the candidate usage
has at least as many observed tokens as the current invocation.
"""
usage_values = _extract_usage_values(value)
if not usage_values:
return

current_score = (invocation.input_tokens or 0) + (
invocation.output_tokens or 0
)
if current_score and _usage_score(usage_values) < current_score:
return

if "input_tokens" in usage_values:
invocation.input_tokens = usage_values["input_tokens"]
if "output_tokens" in usage_values:
invocation.output_tokens = usage_values["output_tokens"]
if "cache_read_input_tokens" in usage_values:
invocation.usage_cache_read_input_tokens = usage_values[
"cache_read_input_tokens"
]
if "cache_creation_input_tokens" in usage_values:
invocation.usage_cache_creation_input_tokens = usage_values[
"cache_creation_input_tokens"
]


def apply_token_usage_from_qwen_messages(
invocation: LLMInvocation,
messages: Any,
) -> None:
"""Populate token usage from qwen-agent Message metadata.

Kept as a compatibility entrypoint for callers that used the previous
helper name; the instrumentation wrapper now calls the generic extractor
directly so it can process individual streaming chunks.
"""
_apply_usage_to_llm_invocation(invocation, messages)


def _convert_qwen_messages_to_input_messages(
messages: Any,
) -> List[InputMessage]:
Expand Down Expand Up @@ -291,6 +468,36 @@ def _convert_qwen_messages_to_output_messages(
return output_messages


def _convert_qwen_agent_final_output_messages(
messages: Any,
) -> List[OutputMessage]:
"""Convert only the final qwen-agent answer to GenAI OutputMessage format."""
if not messages:
return []

if not isinstance(messages, list):
messages = [messages]

for msg in reversed(messages):
try:
role = _field_value(msg, "role") or "assistant"
function_call = _field_value(msg, "function_call")
content = _field_value(msg, "content") or ""

if role in ("function", "tool") or function_call:
continue

text = _extract_content_text(content)
if text:
return _convert_qwen_messages_to_output_messages([msg])
except Exception as e:
logger.debug(f"Error extracting final agent output message: {e}")
continue

logger.debug("No final qwen-agent assistant text output message found")
return []


def _get_tool_definitions(
functions: Optional[List[Dict]],
) -> Optional[List[FunctionToolDefinition]]:
Expand Down
Loading
Loading