diff --git a/src/harness_sdk/instrumentation/anthropic/__init__.py b/src/harness_sdk/instrumentation/anthropic/__init__.py index 725ff55..03cb4b3 100644 --- a/src/harness_sdk/instrumentation/anthropic/__init__.py +++ b/src/harness_sdk/instrumentation/anthropic/__init__.py @@ -78,6 +78,33 @@ def _evaluate_invocation(invocation: LLMInvocation) -> None: logger.debug("Anthropic span evaluation error: %s", err) +def _set_if_present(target: Any, attr: str, value: Any) -> None: + if target is not None and value is not None: + setattr(target, attr, value) + + +def _copy_usage_semconv(invocation: LLMInvocation, usage: Any) -> None: + """Populate optional GenAI usage attrs not modeled by LLMInvocation.""" + if usage is None: + return + + inf = getattr(invocation, "_inference_invocation", None) + _set_if_present( + inf, + "cache_read_input_tokens", + getattr(usage, "cache_read_input_tokens", None), + ) + _set_if_present( + inf, + "cache_creation_input_tokens", + getattr(usage, "cache_creation_input_tokens", None), + ) + + reasoning_tokens = getattr(usage, "reasoning_output_tokens", None) + if reasoning_tokens is not None: + invocation.attributes["gen_ai.usage.reasoning.output_tokens"] = reasoning_tokens + + def _build_invocation( handler: TelemetryHandler, params: Any, @@ -129,6 +156,7 @@ def _wrapper( logger.debug("Anthropic Messages.create (sync): returning stream wrapper") return _MessagesStreamWrapper(result, handler, invocation, capture_content) set_invocation_response_attributes(invocation, result, capture_content) + _copy_usage_semconv(invocation, getattr(result, "usage", None)) handler.stop_llm(invocation) logger.debug("Anthropic Messages.create (sync): complete") return result @@ -173,6 +201,7 @@ async def _wrapper( logger.debug("Anthropic Messages.create (async): returning stream wrapper") return _AsyncMessagesStreamWrapper(result, handler, invocation, capture_content) set_invocation_response_attributes(invocation, result, capture_content) + _copy_usage_semconv(invocation, getattr(result, "usage", None)) handler.stop_llm(invocation) logger.debug("Anthropic Messages.create (async): complete") return result diff --git a/src/harness_sdk/instrumentation/openai/__init__.py b/src/harness_sdk/instrumentation/openai/__init__.py index 06a50a3..9c04015 100644 --- a/src/harness_sdk/instrumentation/openai/__init__.py +++ b/src/harness_sdk/instrumentation/openai/__init__.py @@ -16,6 +16,7 @@ from __future__ import annotations +import inspect from typing import Any, Callable import wrapt @@ -49,6 +50,47 @@ def _get_handler() -> TelemetryHandler: return TelemetryHandler() +def _create_chat_invocation( + handler: TelemetryHandler, + kwargs: dict[str, Any], + instance: Any, + capture_content: bool, +) -> Any: + """Support both old and new openai_v2 helper signatures.""" + params = inspect.signature(create_chat_invocation).parameters + if "handler" in params: + return create_chat_invocation(handler, kwargs, instance, capture_content) + return handler.start_llm( + create_chat_invocation(kwargs, instance, capture_content=capture_content) + ) + + +def _stop_invocation(handler: TelemetryHandler, invocation: Any) -> None: + if hasattr(invocation, "stop"): + invocation.stop() + return + handler.stop_llm(invocation) + + +def _fail_invocation(handler: TelemetryHandler, invocation: Any, error: Error) -> None: + if hasattr(invocation, "fail"): + invocation.fail(error) + return + handler.fail_llm(invocation, error) + + +def _chat_stream_wrapper( + stream: Any, + handler: TelemetryHandler, + invocation: Any, + capture_content: bool, +) -> Any: + params = inspect.signature(ChatStreamWrapper).parameters + if "handler" in params: + return ChatStreamWrapper(stream, handler, invocation, capture_content) + return ChatStreamWrapper(stream, invocation, capture_content) + + def _evaluate_invocation(invocation: Any) -> None: """Run Traceable policy evaluation against the live span; raise if blocked. @@ -79,6 +121,47 @@ def _evaluate_invocation(invocation: Any) -> None: logger.debug("OpenAI span evaluation error: %s", err) +def _get_value(obj: Any, key: str) -> Any: + if obj is None: + return None + if isinstance(obj, dict): + return obj.get(key) + return getattr(obj, key, None) + + +def _set_if_present(target: Any, attr: str, value: Any) -> None: + if target is not None and value is not None: + setattr(target, attr, value) + + +def _copy_usage_semconv(invocation: LLMInvocation, result: Any) -> None: + """Populate optional GenAI usage attrs not modeled by LLMInvocation.""" + usage = _get_value(result, "usage") + if usage is None: + return + + prompt_details = _get_value(usage, "prompt_tokens_details") + completion_details = _get_value(usage, "completion_tokens_details") + inf = getattr(invocation, "_inference_invocation", None) or invocation + + _set_if_present( + inf, + "cache_read_input_tokens", + _get_value(usage, "cache_read_input_tokens") + or _get_value(prompt_details, "cached_tokens"), + ) + _set_if_present( + inf, + "cache_creation_input_tokens", + _get_value(usage, "cache_creation_input_tokens") + or _get_value(prompt_details, "cache_creation_tokens"), + ) + + reasoning_tokens = _get_value(completion_details, "reasoning_tokens") + if reasoning_tokens is not None: + invocation.attributes["gen_ai.usage.reasoning.output_tokens"] = reasoning_tokens + + def _make_chat_create_sync(handler: TelemetryHandler, capture_content: bool) -> Callable[..., Any]: logger.debug("OpenAI Completions.create (sync): capture_content=%s", capture_content) @@ -92,13 +175,13 @@ def _wrapper( logger.debug("OpenAI Completions.create (sync): gen_ai disabled, passthrough") return wrapped(*args, **kwargs) - invocation = handler.start_llm( - create_chat_invocation(kwargs, instance, capture_content=capture_content) - ) + invocation = _create_chat_invocation(handler, kwargs, instance, capture_content) try: _evaluate_invocation(invocation) except ControlEvaluationBlocked: - handler.fail_llm(invocation, Error(message="blocked", type=ControlEvaluationBlocked)) + _fail_invocation( + handler, invocation, Error(message="blocked", type=ControlEvaluationBlocked) + ) raise try: result = wrapped(*args, **kwargs) @@ -106,16 +189,19 @@ def _wrapper( if is_streaming(kwargs): logger.debug("OpenAI Completions.create (sync): returning stream wrapper") invocation.attributes["gen_ai.request.streaming"] = True - return ChatStreamWrapper(parsed_result, handler, invocation, capture_content) + return _chat_stream_wrapper( + parsed_result, handler, invocation, capture_content + ) _set_response_properties(invocation, parsed_result, capture_content) - handler.stop_llm(invocation) + _copy_usage_semconv(invocation, parsed_result) + _stop_invocation(handler, invocation) logger.debug("OpenAI Completions.create (sync): complete") return result except ControlEvaluationBlocked: raise except Exception as exc: # pylint: disable=broad-except logger.debug("OpenAI Completions.create (sync): exception=%s", exc) - handler.fail_llm(invocation, Error(message=str(exc), type=type(exc))) + _fail_invocation(handler, invocation, Error(message=str(exc), type=type(exc))) raise return _wrapper @@ -134,13 +220,13 @@ async def _wrapper( logger.debug("OpenAI AsyncCompletions.create (async): gen_ai disabled, passthrough") return await wrapped(*args, **kwargs) - invocation = handler.start_llm( - create_chat_invocation(kwargs, instance, capture_content=capture_content) - ) + invocation = _create_chat_invocation(handler, kwargs, instance, capture_content) try: _evaluate_invocation(invocation) except ControlEvaluationBlocked: - handler.fail_llm(invocation, Error(message="blocked", type=ControlEvaluationBlocked)) + _fail_invocation( + handler, invocation, Error(message="blocked", type=ControlEvaluationBlocked) + ) raise try: result = await wrapped(*args, **kwargs) @@ -148,16 +234,19 @@ async def _wrapper( if is_streaming(kwargs): logger.debug("OpenAI AsyncCompletions.create (async): returning stream wrapper") invocation.attributes["gen_ai.request.streaming"] = True - return ChatStreamWrapper(parsed_result, handler, invocation, capture_content) + return _chat_stream_wrapper( + parsed_result, handler, invocation, capture_content + ) _set_response_properties(invocation, parsed_result, capture_content) - handler.stop_llm(invocation) + _copy_usage_semconv(invocation, parsed_result) + _stop_invocation(handler, invocation) logger.debug("OpenAI AsyncCompletions.create (async): complete") return result except ControlEvaluationBlocked: raise except Exception as exc: # pylint: disable=broad-except logger.debug("OpenAI AsyncCompletions.create (async): exception=%s", exc) - handler.fail_llm(invocation, Error(message=str(exc), type=type(exc))) + _fail_invocation(handler, invocation, Error(message=str(exc), type=type(exc))) raise return _wrapper diff --git a/test/instrumentation/anthropic/test_anthropic.py b/test/instrumentation/anthropic/test_anthropic.py index 6cacc8c..977edab 100644 --- a/test/instrumentation/anthropic/test_anthropic.py +++ b/test/instrumentation/anthropic/test_anthropic.py @@ -31,8 +31,9 @@ def anthropic_instrumentor(): class _FakeUsage: input_tokens = 2 output_tokens = 4 - cache_creation_input_tokens = None - cache_read_input_tokens = None + cache_creation_input_tokens = 1 + cache_read_input_tokens = 3 + reasoning_output_tokens = 2 class _FakeMessage: @@ -62,12 +63,16 @@ def test_anthropic_span_has_gen_ai_attributes(agent, exporter, anthropic_instrum exporter.clear() assert len(spans) == 1 attrs = spans[0].attributes - assert attrs.get("gen_ai.system") == "anthropic" + assert attrs.get("gen_ai.provider.name") == "anthropic" + assert "gen_ai.system" not in attrs assert attrs.get("gen_ai.operation.name") == "chat" assert attrs.get("gen_ai.request.model") == "claude-3-haiku-20240307" assert attrs.get("gen_ai.response.id") == "msg_test" assert attrs.get("gen_ai.usage.input_tokens") == 2 assert attrs.get("gen_ai.usage.output_tokens") == 4 + assert attrs.get("gen_ai.usage.cache_read.input_tokens") == 3 + assert attrs.get("gen_ai.usage.cache_creation.input_tokens") == 1 + assert attrs.get("gen_ai.usage.reasoning.output_tokens") == 2 async def test_async_messages_create_span_has_gen_ai_attributes(agent, exporter, anthropic_instrumentor): @@ -87,11 +92,15 @@ async def _fake(_self, *_args, **_kwargs): exporter.clear() assert len(spans) == 1 attrs = spans[0].attributes - assert attrs.get("gen_ai.system") == "anthropic" + assert attrs.get("gen_ai.provider.name") == "anthropic" + assert "gen_ai.system" not in attrs assert attrs.get("gen_ai.request.model") == "claude-3-haiku-20240307" assert attrs.get("gen_ai.response.id") == "msg_test" assert attrs.get("gen_ai.usage.input_tokens") == 2 assert attrs.get("gen_ai.usage.output_tokens") == 4 + assert attrs.get("gen_ai.usage.cache_read.input_tokens") == 3 + assert attrs.get("gen_ai.usage.cache_creation.input_tokens") == 1 + assert attrs.get("gen_ai.usage.reasoning.output_tokens") == 2 @@ -316,7 +325,8 @@ def test_messages_stream_sync_span_after_consume(agent, exporter, anthropic_inst exporter.clear() assert len(spans) == 1 attrs = spans[0].attributes - assert attrs.get("gen_ai.system") == "anthropic" + assert attrs.get("gen_ai.provider.name") == "anthropic" + assert "gen_ai.system" not in attrs assert attrs.get("gen_ai.usage.output_tokens") == 5 diff --git a/test/instrumentation/openai/openai_instrumentation_test.py b/test/instrumentation/openai/openai_instrumentation_test.py index b52a3fd..5b3f720 100644 --- a/test/instrumentation/openai/openai_instrumentation_test.py +++ b/test/instrumentation/openai/openai_instrumentation_test.py @@ -40,6 +40,11 @@ class _FakeChoice: class _FakeUsage: prompt_tokens = 3 completion_tokens = 7 + prompt_tokens_details = SimpleNamespace( + cached_tokens=1, + cache_creation_tokens=2, + ) + completion_tokens_details = SimpleNamespace(reasoning_tokens=4) class _FakeChatCompletion: @@ -72,6 +77,9 @@ def test_openai_span_has_gen_ai_attributes(agent, exporter, openai_instrumentor) assert attrs.get("gen_ai.response.id") == "chatcmpl-test" assert attrs.get("gen_ai.usage.input_tokens") == 3 assert attrs.get("gen_ai.usage.output_tokens") == 7 + assert attrs.get("gen_ai.usage.cache_read.input_tokens") == 1 + assert attrs.get("gen_ai.usage.cache_creation.input_tokens") == 2 + assert attrs.get("gen_ai.usage.reasoning.output_tokens") == 4