Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/harness_sdk/instrumentation/anthropic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ def _evaluate_invocation(invocation: LLMInvocation) -> None:
logger.debug("Anthropic span evaluation error: %s", err)


def _set_if_present(target: Any, attr: str, value: Any) -> None:
    """Assign ``value`` to ``target.attr`` unless either one is ``None``.

    A ``None`` target or a ``None`` value turns the call into a no-op, so
    callers can hand over optional objects and optional readings unchecked.
    """
    if target is None or value is None:
        return
    setattr(target, attr, value)


def _copy_usage_semconv(invocation: LLMInvocation, usage: Any) -> None:
    """Copy cache and reasoning token counts from ``usage`` onto ``invocation``.

    Cache counters are written as attributes on the object found under
    ``invocation._inference_invocation`` (nothing is written when that object
    is absent); the reasoning counter goes into ``invocation.attributes``.
    Readings that are missing or ``None`` are skipped.
    """
    if usage is None:
        return

    inner = getattr(invocation, "_inference_invocation", None)
    for counter in ("cache_read_input_tokens", "cache_creation_input_tokens"):
        # Same attribute name on both sides: read from usage, write to inner.
        _set_if_present(inner, counter, getattr(usage, counter, None))

    reasoning = getattr(usage, "reasoning_output_tokens", None)
    if reasoning is None:
        return
    invocation.attributes["gen_ai.usage.reasoning.output_tokens"] = reasoning


def _build_invocation(
handler: TelemetryHandler,
params: Any,
Expand Down Expand Up @@ -129,6 +156,7 @@ def _wrapper(
logger.debug("Anthropic Messages.create (sync): returning stream wrapper")
return _MessagesStreamWrapper(result, handler, invocation, capture_content)
set_invocation_response_attributes(invocation, result, capture_content)
_copy_usage_semconv(invocation, getattr(result, "usage", None))
handler.stop_llm(invocation)
logger.debug("Anthropic Messages.create (sync): complete")
return result
Expand Down Expand Up @@ -173,6 +201,7 @@ async def _wrapper(
logger.debug("Anthropic Messages.create (async): returning stream wrapper")
return _AsyncMessagesStreamWrapper(result, handler, invocation, capture_content)
set_invocation_response_attributes(invocation, result, capture_content)
_copy_usage_semconv(invocation, getattr(result, "usage", None))
handler.stop_llm(invocation)
logger.debug("Anthropic Messages.create (async): complete")
return result
Expand Down
117 changes: 103 additions & 14 deletions src/harness_sdk/instrumentation/openai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

import inspect
from typing import Any, Callable

import wrapt
Expand Down Expand Up @@ -49,6 +50,47 @@ def _get_handler() -> TelemetryHandler:
return TelemetryHandler()


def _create_chat_invocation(
    handler: TelemetryHandler,
    kwargs: dict[str, Any],
    instance: Any,
    capture_content: bool,
) -> Any:
    """Build a chat invocation with whichever helper signature is installed.

    When ``create_chat_invocation`` declares a ``handler`` parameter, it is
    called with the handler as first positional argument and its result is
    returned as-is. Otherwise the helper is called without the handler and
    its result is passed through ``handler.start_llm``.
    """
    accepts_handler = "handler" in inspect.signature(create_chat_invocation).parameters
    if not accepts_handler:
        return handler.start_llm(
            create_chat_invocation(kwargs, instance, capture_content=capture_content)
        )
    return create_chat_invocation(handler, kwargs, instance, capture_content)


def _stop_invocation(handler: TelemetryHandler, invocation: Any) -> None:
    """Finish ``invocation``, preferring its own ``stop`` method.

    Falls back to ``handler.stop_llm(invocation)`` when the invocation has no
    ``stop`` attribute.
    """
    if not hasattr(invocation, "stop"):
        handler.stop_llm(invocation)
        return
    invocation.stop()


def _fail_invocation(handler: TelemetryHandler, invocation: Any, error: Error) -> None:
    """Mark ``invocation`` as failed with ``error``, preferring its own ``fail``.

    Falls back to ``handler.fail_llm(invocation, error)`` when the invocation
    has no ``fail`` attribute.
    """
    if not hasattr(invocation, "fail"):
        handler.fail_llm(invocation, error)
        return
    invocation.fail(error)


def _chat_stream_wrapper(
    stream: Any,
    handler: TelemetryHandler,
    invocation: Any,
    capture_content: bool,
) -> Any:
    """Wrap ``stream`` in ``ChatStreamWrapper`` using the signature it exposes.

    The handler is passed as the second positional argument only when
    ``ChatStreamWrapper`` declares a ``handler`` parameter; otherwise it is
    left out of the call.
    """
    takes_handler = "handler" in inspect.signature(ChatStreamWrapper).parameters
    positional = (
        (stream, handler, invocation, capture_content)
        if takes_handler
        else (stream, invocation, capture_content)
    )
    return ChatStreamWrapper(*positional)


def _evaluate_invocation(invocation: Any) -> None:
"""Run Traceable policy evaluation against the live span; raise if blocked.

Expand Down Expand Up @@ -79,6 +121,47 @@ def _evaluate_invocation(invocation: Any) -> None:
logger.debug("OpenAI span evaluation error: %s", err)


def _get_value(obj: Any, key: str) -> Any:
    """Look up ``key`` on ``obj``, treating dicts and plain objects alike.

    Returns ``obj.get(key)`` for dict instances and ``getattr(obj, key, None)``
    for anything else; a ``None`` container always yields ``None``.
    """
    if isinstance(obj, dict):
        return obj.get(key)
    # Guard None explicitly so attributes that exist on NoneType are not returned.
    return None if obj is None else getattr(obj, key, None)


def _set_if_present(target: Any, attr: str, value: Any) -> None:
    """Set ``target.attr = value`` only when both target and value exist.

    Does nothing if ``target`` is ``None`` or ``value`` is ``None``; falsy
    values other than ``None`` (such as ``0``) are still written.
    """
    if target is None:
        return
    if value is None:
        return
    setattr(target, attr, value)


def _copy_usage_semconv(invocation: LLMInvocation, result: Any) -> None:
    """Populate optional GenAI usage attrs not modeled by LLMInvocation.

    Reads ``usage`` from ``result`` (attribute or dict key) and copies:

    * cache-read tokens: ``usage.cache_read_input_tokens``, falling back to
      ``usage.prompt_tokens_details.cached_tokens``;
    * cache-creation tokens: ``usage.cache_creation_input_tokens``, falling
      back to ``usage.prompt_tokens_details.cache_creation_tokens``;
    * reasoning tokens: ``usage.completion_tokens_details.reasoning_tokens``,
      stored in ``invocation.attributes`` under
      ``gen_ai.usage.reasoning.output_tokens``.

    Cache counters are set on ``invocation._inference_invocation`` when that
    attribute is present and truthy, otherwise on ``invocation`` itself.
    Readings that are ``None`` are skipped; nothing happens when ``usage`` is
    missing.
    """
    usage = _get_value(result, "usage")
    if usage is None:
        return

    prompt_details = _get_value(usage, "prompt_tokens_details")
    completion_details = _get_value(usage, "completion_tokens_details")
    inf = getattr(invocation, "_inference_invocation", None) or invocation

    # Fall back only when the primary reading is missing (None). An explicit 0
    # is a real measurement and must not be mistaken for "absent", which is
    # what a truthiness-based ``or`` fallback would do.
    cache_read = _get_value(usage, "cache_read_input_tokens")
    if cache_read is None:
        cache_read = _get_value(prompt_details, "cached_tokens")
    _set_if_present(inf, "cache_read_input_tokens", cache_read)

    cache_creation = _get_value(usage, "cache_creation_input_tokens")
    if cache_creation is None:
        cache_creation = _get_value(prompt_details, "cache_creation_tokens")
    _set_if_present(inf, "cache_creation_input_tokens", cache_creation)

    reasoning_tokens = _get_value(completion_details, "reasoning_tokens")
    if reasoning_tokens is not None:
        invocation.attributes["gen_ai.usage.reasoning.output_tokens"] = reasoning_tokens


def _make_chat_create_sync(handler: TelemetryHandler, capture_content: bool) -> Callable[..., Any]:
logger.debug("OpenAI Completions.create (sync): capture_content=%s", capture_content)

Expand All @@ -92,30 +175,33 @@ def _wrapper(
logger.debug("OpenAI Completions.create (sync): gen_ai disabled, passthrough")
return wrapped(*args, **kwargs)

invocation = handler.start_llm(
create_chat_invocation(kwargs, instance, capture_content=capture_content)
)
invocation = _create_chat_invocation(handler, kwargs, instance, capture_content)
try:
_evaluate_invocation(invocation)
except ControlEvaluationBlocked:
handler.fail_llm(invocation, Error(message="blocked", type=ControlEvaluationBlocked))
_fail_invocation(
handler, invocation, Error(message="blocked", type=ControlEvaluationBlocked)
)
raise
try:
result = wrapped(*args, **kwargs)
parsed_result = result.parse() if hasattr(result, "parse") else result
if is_streaming(kwargs):
logger.debug("OpenAI Completions.create (sync): returning stream wrapper")
invocation.attributes["gen_ai.request.streaming"] = True
return ChatStreamWrapper(parsed_result, handler, invocation, capture_content)
return _chat_stream_wrapper(
parsed_result, handler, invocation, capture_content
)
_set_response_properties(invocation, parsed_result, capture_content)
handler.stop_llm(invocation)
_copy_usage_semconv(invocation, parsed_result)
_stop_invocation(handler, invocation)
logger.debug("OpenAI Completions.create (sync): complete")
return result
except ControlEvaluationBlocked:
raise
except Exception as exc: # pylint: disable=broad-except
logger.debug("OpenAI Completions.create (sync): exception=%s", exc)
handler.fail_llm(invocation, Error(message=str(exc), type=type(exc)))
_fail_invocation(handler, invocation, Error(message=str(exc), type=type(exc)))
raise

return _wrapper
Expand All @@ -134,30 +220,33 @@ async def _wrapper(
logger.debug("OpenAI AsyncCompletions.create (async): gen_ai disabled, passthrough")
return await wrapped(*args, **kwargs)

invocation = handler.start_llm(
create_chat_invocation(kwargs, instance, capture_content=capture_content)
)
invocation = _create_chat_invocation(handler, kwargs, instance, capture_content)
try:
_evaluate_invocation(invocation)
except ControlEvaluationBlocked:
handler.fail_llm(invocation, Error(message="blocked", type=ControlEvaluationBlocked))
_fail_invocation(
handler, invocation, Error(message="blocked", type=ControlEvaluationBlocked)
)
raise
try:
result = await wrapped(*args, **kwargs)
parsed_result = result.parse() if hasattr(result, "parse") else result
if is_streaming(kwargs):
logger.debug("OpenAI AsyncCompletions.create (async): returning stream wrapper")
invocation.attributes["gen_ai.request.streaming"] = True
return ChatStreamWrapper(parsed_result, handler, invocation, capture_content)
return _chat_stream_wrapper(
parsed_result, handler, invocation, capture_content
)
_set_response_properties(invocation, parsed_result, capture_content)
handler.stop_llm(invocation)
_copy_usage_semconv(invocation, parsed_result)
_stop_invocation(handler, invocation)
logger.debug("OpenAI AsyncCompletions.create (async): complete")
return result
except ControlEvaluationBlocked:
raise
except Exception as exc: # pylint: disable=broad-except
logger.debug("OpenAI AsyncCompletions.create (async): exception=%s", exc)
handler.fail_llm(invocation, Error(message=str(exc), type=type(exc)))
_fail_invocation(handler, invocation, Error(message=str(exc), type=type(exc)))
raise

return _wrapper
Expand Down
20 changes: 15 additions & 5 deletions test/instrumentation/anthropic/test_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ def anthropic_instrumentor():
class _FakeUsage:
    """Stand-in usage payload with fixed token counters for the tests."""

    input_tokens = 2
    output_tokens = 4
    # Each cache counter is assigned once; earlier ``None`` assignments to the
    # same names were dead because they were immediately overwritten.
    cache_creation_input_tokens = 1
    cache_read_input_tokens = 3
    reasoning_output_tokens = 2


class _FakeMessage:
Expand Down Expand Up @@ -62,12 +63,16 @@ def test_anthropic_span_has_gen_ai_attributes(agent, exporter, anthropic_instrum
exporter.clear()
assert len(spans) == 1
attrs = spans[0].attributes
assert attrs.get("gen_ai.system") == "anthropic"
assert attrs.get("gen_ai.provider.name") == "anthropic"
assert "gen_ai.system" not in attrs
assert attrs.get("gen_ai.operation.name") == "chat"
assert attrs.get("gen_ai.request.model") == "claude-3-haiku-20240307"
assert attrs.get("gen_ai.response.id") == "msg_test"
assert attrs.get("gen_ai.usage.input_tokens") == 2
assert attrs.get("gen_ai.usage.output_tokens") == 4
assert attrs.get("gen_ai.usage.cache_read.input_tokens") == 3
assert attrs.get("gen_ai.usage.cache_creation.input_tokens") == 1
assert attrs.get("gen_ai.usage.reasoning.output_tokens") == 2


async def test_async_messages_create_span_has_gen_ai_attributes(agent, exporter, anthropic_instrumentor):
Expand All @@ -87,11 +92,15 @@ async def _fake(_self, *_args, **_kwargs):
exporter.clear()
assert len(spans) == 1
attrs = spans[0].attributes
assert attrs.get("gen_ai.system") == "anthropic"
assert attrs.get("gen_ai.provider.name") == "anthropic"
assert "gen_ai.system" not in attrs
assert attrs.get("gen_ai.request.model") == "claude-3-haiku-20240307"
assert attrs.get("gen_ai.response.id") == "msg_test"
assert attrs.get("gen_ai.usage.input_tokens") == 2
assert attrs.get("gen_ai.usage.output_tokens") == 4
assert attrs.get("gen_ai.usage.cache_read.input_tokens") == 3
assert attrs.get("gen_ai.usage.cache_creation.input_tokens") == 1
assert attrs.get("gen_ai.usage.reasoning.output_tokens") == 2



Expand Down Expand Up @@ -316,7 +325,8 @@ def test_messages_stream_sync_span_after_consume(agent, exporter, anthropic_inst
exporter.clear()
assert len(spans) == 1
attrs = spans[0].attributes
assert attrs.get("gen_ai.system") == "anthropic"
assert attrs.get("gen_ai.provider.name") == "anthropic"
assert "gen_ai.system" not in attrs
assert attrs.get("gen_ai.usage.output_tokens") == 5


Expand Down
8 changes: 8 additions & 0 deletions test/instrumentation/openai/openai_instrumentation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ class _FakeChoice:
class _FakeUsage:
    """Stand-in usage payload with fixed token counters and nested details."""

    prompt_tokens = 3
    completion_tokens = 7
    # Nested detail objects expose their counters as plain attributes.
    prompt_tokens_details = SimpleNamespace(cached_tokens=1, cache_creation_tokens=2)
    completion_tokens_details = SimpleNamespace(reasoning_tokens=4)


class _FakeChatCompletion:
Expand Down Expand Up @@ -72,6 +77,9 @@ def test_openai_span_has_gen_ai_attributes(agent, exporter, openai_instrumentor)
assert attrs.get("gen_ai.response.id") == "chatcmpl-test"
assert attrs.get("gen_ai.usage.input_tokens") == 3
assert attrs.get("gen_ai.usage.output_tokens") == 7
assert attrs.get("gen_ai.usage.cache_read.input_tokens") == 1
assert attrs.get("gen_ai.usage.cache_creation.input_tokens") == 2
assert attrs.get("gen_ai.usage.reasoning.output_tokens") == 4



Expand Down
Loading