diff --git a/src/layerlens/instrument/adapters/_base/__init__.py b/src/layerlens/instrument/adapters/_base/__init__.py new file mode 100644 index 00000000..c2775780 --- /dev/null +++ b/src/layerlens/instrument/adapters/_base/__init__.py @@ -0,0 +1,28 @@ +"""Shared base infrastructure for all instrument adapters. + +This package re-exports the public surface of the base adapter contract +(``AdapterInfo``, ``BaseAdapter``) and the resilience helpers used by +framework adapters to wrap callbacks in try/except boundaries so that +observability code never breaks the user's framework. +""" + +from __future__ import annotations + +from ._core import AdapterInfo, BaseAdapter +from .resilience import ( + DEFAULT_FAILURE_THRESHOLD, + HealthStatus, + ResilienceTracker, + get_default_for, + resilient_callback, +) + +__all__ = [ + "AdapterInfo", + "BaseAdapter", + "DEFAULT_FAILURE_THRESHOLD", + "HealthStatus", + "ResilienceTracker", + "get_default_for", + "resilient_callback", +] diff --git a/src/layerlens/instrument/adapters/_base.py b/src/layerlens/instrument/adapters/_base/_core.py similarity index 100% rename from src/layerlens/instrument/adapters/_base.py rename to src/layerlens/instrument/adapters/_base/_core.py diff --git a/src/layerlens/instrument/adapters/_base/resilience.py b/src/layerlens/instrument/adapters/_base/resilience.py new file mode 100644 index 00000000..0b57ab3f --- /dev/null +++ b/src/layerlens/instrument/adapters/_base/resilience.py @@ -0,0 +1,394 @@ +"""Per-callback try/except resilience wrapper for adapter callbacks. + +Mature framework adapters (CrewAI, AutoGen, OpenAI Agents, Google ADK, +Strands, Bedrock Agents) wrap every observability callback in a +try/except boundary so an exception in the adapter never escapes back +into the framework's own execution path. Lighter adapters historically +relied on outer wrappers — meaning a single bug in our callback could +crash a customer's agent run. 
log = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Public constants & enums
# ---------------------------------------------------------------------------


DEFAULT_FAILURE_THRESHOLD: int = 5
"""Number of resilience failures before an adapter is marked DEGRADED.

Chosen as a balance between fast detection (catch persistent bugs in
adapter wiring quickly) and not flapping on transient framework quirks
(a single bad event from a flaky upstream shouldn't degrade the entire
adapter). Adapters can override this via ``ResilienceTracker(threshold=...)``.
"""


_TRACEBACK_TRUNCATION: int = 4000
"""Maximum characters of formatted traceback to log per failure.

Prevents log spam from huge tracebacks (deep async stacks under
LangGraph or LlamaIndex can produce >10kB tracebacks per failure).
"""

# Suffix appended to a truncated traceback. Its length is subtracted from
# the cap so the logged text never exceeds ``_TRACEBACK_TRUNCATION``.
_TRACEBACK_TRUNCATION_MARKER: str = "\n... [traceback truncated]"

# Maximum characters of "<ExcType>: <message>" kept as the last error.
_LAST_ERROR_MAX_CHARS: int = 500

# Maximum number of per-callback counters reported by ``as_metadata``.
_MAX_CALLBACKS_IN_METADATA: int = 20


class HealthStatus(str, enum.Enum):
    """Adapter health states surfaced via ``adapter_info().metadata``."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"


# ---------------------------------------------------------------------------
# Default-value table for known callbacks
# ---------------------------------------------------------------------------
#
# Many framework callback APIs require the callback to RETURN something
# (not just produce side-effects). For example:
#   * Pydantic-AI ``after_model_request`` is expected to return the
#     (possibly-mutated) response object — returning ``None`` would replace
#     the LLM response with ``None`` and break the agent.
#   * Pydantic-AI ``before_tool_execute`` returns the (possibly-mutated)
#     args tuple — returning ``None`` would erase the tool args.
#   * Google ADK plugin callbacks are documented as returning ``None``
#     (no override semantics) — ``None`` is the correct default.
#   * Strands hook callbacks return ``None``.
#   * boto3 event-system handlers (Bedrock Agents) return ``None``.
#
# When a callback that needs a passthrough (e.g. Pydantic-AI mutating
# hooks) raises, returning ``None`` would corrupt the framework's data
# flow. Adapters can pass ``passthrough_arg`` to ``resilient_callback``
# so the wrapper returns that argument's value instead of the default.

_DEFAULTS: Dict[str, Any] = {
    # Google ADK plugin callbacks — all return None (no override hook).
    "before_run_callback": None,
    "after_run_callback": None,
    "before_agent_callback": None,
    "after_agent_callback": None,
    "before_model_callback": None,
    "after_model_callback": None,
    "on_model_error_callback": None,
    "before_tool_callback": None,
    "after_tool_callback": None,
    "on_tool_error_callback": None,
    "on_event_callback": None,
    # Strands hooks — sync, return None.
    "_on_agent_initialized": None,
    "_on_before_invocation": None,
    "_on_after_invocation": None,
    "_on_before_model": None,
    "_on_after_model": None,
    "_on_before_tool": None,
    "_on_after_tool": None,
    # OpenAI Agents TracingProcessor — return None.
    "on_trace_start": None,
    "on_trace_end": None,
    "on_span_start": None,
    "on_span_end": None,
    "shutdown": None,
    "force_flush": None,
    # boto3 event handlers (Bedrock Agents).
    "_before_invoke": None,
    "_after_invoke": None,
}


def get_default_for(callback_name: str) -> Any:
    """Return the framework-expected default for *callback_name*, or ``None``.

    Unknown callback names also yield ``None``. For callbacks that must
    return a passthrough value (mutating hooks), use ``resilient_callback``
    with ``passthrough_arg`` instead of this table.
    """
    return _DEFAULTS.get(callback_name)


# ---------------------------------------------------------------------------
# Failure tracker
# ---------------------------------------------------------------------------


def _safe_str(exc: BaseException) -> str:
    """Return ``str(exc)``, or a placeholder if ``__str__`` itself raises."""
    try:
        return str(exc)
    except Exception:  # noqa: BLE001 — a broken __str__ must not escape
        return f"<unprintable {type(exc).__name__} object>"


def _describe_exception(exc: BaseException) -> str:
    """Return a bounded ``"<ExcType>: <message>"`` summary of *exc*."""
    return f"{type(exc).__name__}: {_safe_str(exc)}"[:_LAST_ERROR_MAX_CHARS]


class ResilienceTracker:
    """Per-adapter failure counter + degraded-health surface.

    Each framework adapter instantiates one tracker (in ``__init__``).
    ``resilient_callback`` records failures via :meth:`record_failure`.
    The adapter's ``adapter_info()`` reports current health via
    :meth:`health_status` and a snapshot of recent failures via
    :meth:`as_metadata`.

    All state is read and written under a single ``threading.Lock`` so
    callbacks firing from several threads can share one tracker.
    """

    def __init__(
        self,
        adapter_name: str,
        threshold: int = DEFAULT_FAILURE_THRESHOLD,
    ) -> None:
        """Create a tracker for *adapter_name*.

        Raises:
            ValueError: if *threshold* is smaller than 1.
        """
        if threshold < 1:
            raise ValueError("threshold must be >= 1")
        self._adapter_name = adapter_name
        self._threshold = threshold
        self._lock = threading.Lock()
        self._total_failures: int = 0
        self._per_callback_failures: Dict[str, int] = {}
        self._last_error: Optional[str] = None
        self._last_callback: Optional[str] = None

    # -- recording --------------------------------------------------------

    def record_failure(self, callback_name: str, exc: BaseException) -> None:
        """Atomically record a failed callback invocation.

        The error summary is built *before* taking the lock and with a
        guarded ``str()`` so an exception whose ``__str__`` raises can
        neither escape nor leave the counters half-updated.
        """
        description = _describe_exception(exc)
        with self._lock:
            self._total_failures += 1
            self._per_callback_failures[callback_name] = self._per_callback_failures.get(callback_name, 0) + 1
            self._last_callback = callback_name
            self._last_error = description

    def reset(self) -> None:
        """Clear all failure state. Adapters call this on ``disconnect()``."""
        with self._lock:
            self._total_failures = 0
            self._per_callback_failures.clear()
            self._last_error = None
            self._last_callback = None

    # -- queries ----------------------------------------------------------

    @property
    def total_failures(self) -> int:
        """Total number of failures recorded since creation or last reset."""
        with self._lock:
            return self._total_failures

    @property
    def threshold(self) -> int:
        """Failure count at which the adapter is reported as DEGRADED."""
        return self._threshold

    def _status_locked(self) -> HealthStatus:
        """Compute the health status; the caller must hold ``self._lock``."""
        if self._total_failures >= self._threshold:
            return HealthStatus.DEGRADED
        return HealthStatus.HEALTHY

    def health_status(self) -> HealthStatus:
        """Return DEGRADED once total failures reach the threshold."""
        with self._lock:
            return self._status_locked()

    def as_metadata(self) -> Dict[str, Any]:
        """Snapshot for inclusion in ``adapter_info().metadata``.

        Always contains the status, total and threshold keys; the
        per-callback breakdown and last error/callback are added only
        once at least one failure has been recorded.
        """
        with self._lock:
            data: Dict[str, Any] = {
                "resilience_status": self._status_locked().value,
                "resilience_failures_total": self._total_failures,
                "resilience_failure_threshold": self._threshold,
            }
            if self._per_callback_failures:
                # Cap the breakdown so metadata payloads don't explode for
                # adapters with many distinct callbacks.
                top = sorted(
                    self._per_callback_failures.items(),
                    key=lambda kv: kv[1],
                    reverse=True,
                )[:_MAX_CALLBACKS_IN_METADATA]
                data["resilience_failures_by_callback"] = dict(top)
            if self._last_error is not None:
                data["resilience_last_error"] = self._last_error
            if self._last_callback is not None:
                data["resilience_last_callback"] = self._last_callback
            return data


# ---------------------------------------------------------------------------
# The decorator
# ---------------------------------------------------------------------------


F = TypeVar("F", bound=Callable[..., Any])


def resilient_callback(
    *,
    callback_name: Optional[str] = None,
    default: Any = None,
    passthrough_arg: Optional[str] = None,
    logger: Optional[logging.Logger] = None,
) -> Callable[[F], F]:
    """Wrap an adapter method so observability errors never escape.

    The decorator must be applied to *instance methods* of an adapter
    class. Both plain and ``async def`` methods are supported; for a
    coroutine function the protection covers the awaited body, not just
    the creation of the coroutine object. The adapter SHOULD expose:

    * ``self.name`` (falls back to the class name) for logging context
    * ``self._resilience`` (a :class:`ResilienceTracker`) for failure
      recording

    On exception inside the wrapped method:

    1. The exception is caught (``Exception`` only — ``KeyboardInterrupt``,
       ``SystemExit`` and other ``BaseException`` subclasses propagate).
    2. ``self._resilience.record_failure(name, exc)`` is invoked.
    3. The exception is logged at WARNING level with a truncated traceback.
    4. The wrapper returns the value of the argument named
       *passthrough_arg* when it was supplied, otherwise *default*.

    A secondary error raised while recording or logging the failure is
    itself swallowed (and reported at DEBUG on this module's logger).

    Parameters
    ----------
    callback_name:
        Name to use in failure tracking and log records. Defaults to the
        wrapped function's ``__name__``.
    default:
        Value to return when the wrapped method raises.
    passthrough_arg:
        If set, the wrapper returns the value of this argument (looked
        up in *kwargs* first, then matched positionally) on failure.
        When the argument was not supplied, *default* is used.
    logger:
        Logger to emit failure messages to. Defaults to the module
        logger of the wrapped function.
    """

    def _decorate(func: F) -> F:
        import inspect  # function-scope: only needed at decoration time

        cb_name = callback_name or func.__name__
        # The wrapped function's module is the right logger context for
        # warnings (so users can mute one adapter's resilience warnings
        # without muting all of them).
        bound_logger = logger or logging.getLogger(func.__module__)

        def _recover(adapter: Any, args: Any, kwargs: Dict[str, Any], exc: Exception) -> Any:
            """Record/log *exc*, then compute the wrapper's return value."""
            try:
                _on_failure(adapter, cb_name=cb_name, exc=exc, bound_logger=bound_logger)
            except Exception:  # noqa: BLE001 — the failure path must not raise either
                log.debug("layerlens: error while handling failure in %s", cb_name, exc_info=True)
            return _resolve_return_value(
                args=args,
                kwargs=kwargs,
                func=func,
                passthrough_arg=passthrough_arg,
                default=default,
            )

        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def _async_wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
                try:
                    return await func(self, *args, **kwargs)
                except Exception as exc:  # noqa: BLE001 — intentional broad catch
                    return _recover(self, args, kwargs, exc)

            return cast(F, _async_wrapper)

        @functools.wraps(func)
        def _wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            try:
                return func(self, *args, **kwargs)
            except Exception as exc:  # noqa: BLE001 — intentional broad catch
                return _recover(self, args, kwargs, exc)

        return cast(F, _wrapper)

    return _decorate


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _on_failure(
    adapter: Any,
    *,
    cb_name: str,
    exc: BaseException,
    bound_logger: logging.Logger,
) -> None:
    """Record + log a callback failure on *adapter*'s resilience tracker.

    Best-effort: if the adapter doesn't have a tracker (programming
    error), the failure is still logged so the user sees it.
    """
    adapter_name = getattr(adapter, "name", None) or type(adapter).__name__
    tracker = getattr(adapter, "_resilience", None)
    if isinstance(tracker, ResilienceTracker):
        tracker.record_failure(cb_name, exc)

    tb = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    if len(tb) > _TRACEBACK_TRUNCATION:
        # Reserve room for the marker so the result honours the cap exactly.
        keep = max(_TRACEBACK_TRUNCATION - len(_TRACEBACK_TRUNCATION_MARKER), 0)
        tb = tb[:keep] + _TRACEBACK_TRUNCATION_MARKER

    bound_logger.warning(
        "layerlens: resilient_callback caught exception in %s.%s: %s\n%s",
        adapter_name,
        cb_name,
        # Pre-rendered so a raising ``__str__`` cannot break log formatting.
        _safe_str(exc),
        tb,
    )


def _resolve_return_value(
    *,
    args: tuple[Any, ...],
    kwargs: Dict[str, Any],
    func: Callable[..., Any],
    passthrough_arg: Optional[str],
    default: Any,
) -> Any:
    """Compute the value to return when a wrapped callback raises.

    If *passthrough_arg* names a parameter that was actually supplied,
    return its value. Otherwise return *default*.

    *args* holds the positional arguments WITHOUT ``self``; the
    function's first positional parameter is assumed to be ``self``.
    """
    if not passthrough_arg:
        return default

    # Keyword-supplied arguments need no signature inspection.
    if passthrough_arg in kwargs:
        return kwargs[passthrough_arg]

    import inspect  # function-scope: only needed on the failure path

    # ``inspect.signature`` follows ``__wrapped__``, so the lookup also
    # works when the callback sits underneath another decorator.
    try:
        parameters = inspect.signature(func).parameters.values()
    except (TypeError, ValueError):
        return default

    positional = [
        param.name
        for param in parameters
        if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD)
    ]
    try:
        # Shift by one to skip ``self``, which is not part of *args*.
        slot = positional.index(passthrough_arg) - 1
    except ValueError:
        return default
    if 0 <= slot < len(args):
        return args[slot]
    return default
def adapter_info(self) -> AdapterInfo:
    """Describe this adapter, including a live resilience snapshot.

    The adapter's own metadata is copied first and the tracker's
    ``as_metadata()`` keys are layered on top, so monitoring code can read
    ``adapter_info().metadata['resilience_status']`` without each subclass
    having to add it.
    """
    snapshot: Dict[str, Any] = dict(self._metadata)
    # Resilience keys are applied last, so they win on a key clash.
    snapshot.update(self._resilience.as_metadata())
    return AdapterInfo(
        name=self.name,
        adapter_type="framework",
        connected=self._connected,
        metadata=snapshot,
    )
@resilient_callback(callback_name="_before_invoke")
def _before_invoke(self, **kwargs: Any) -> None:
    """boto3 "before" hook: open a run and emit the ``agent.input`` event.

    Does nothing while the adapter is disconnected. Any exception raised
    here is absorbed by ``@resilient_callback``.
    """
    if not self._connected:
        return

    request = kwargs.get("params", {})
    agent = request.get("agentId", "unknown")

    # Open the run and start the latency timer before emitting anything.
    self._begin_run()
    self._start_timer("invoke")
    self._emit_agent_config(agent, request)

    root_span = self._get_root_span()
    event = self._payload(
        agent_id=agent,
        session_id=request.get("sessionId"),
        enable_trace=request.get("enableTrace", False),
    )
    self._set_if_capturing(event, "input", request.get("inputText"))
    self._emit(
        "agent.input",
        event,
        span_id=root_span,
        parent_span_id=None,
        span_name="bedrock.invoke_agent",
    )


def _after_invoke(self, **kwargs: Any) -> None:
    """boto3 "after" hook: emit output telemetry, then always end the run.

    ``_end_run()`` MUST run regardless of telemetry failures (otherwise
    collector/span ContextVars leak across boto3 calls), so the
    ``finally`` lives here at the outer level while the fallible work is
    delegated to the ``@resilient_callback``-wrapped body.
    """
    if self._connected:
        try:
            self._after_invoke_body(**kwargs)
        finally:
            self._end_run()


@resilient_callback(callback_name="_after_invoke")
def _after_invoke_body(self, **kwargs: Any) -> None:
    """Emit ``agent.output`` for the parsed response and process its steps."""
    response = kwargs.get("parsed", {})
    elapsed_ms = self._stop_timer("invoke")
    completion = _extract_completion(response)

    root_span = self._get_root_span()
    event = self._payload(session_id=response.get("sessionId"))
    if elapsed_ms is not None:
        event["latency_ms"] = elapsed_ms
    self._set_if_capturing(event, "output", completion)
    self._emit(
        "agent.output",
        event,
        span_id=root_span,
        parent_span_id=None,
        span_name="bedrock.invoke_agent",
    )

    for trace_step in _collect_steps(response):
        self._process_step(trace_step)
parent_span_id=self._run_span_id, span_name=f"agent:{name}") + @resilient_callback(callback_name="_on_after_agent") def _on_after_agent(self, agent: Any, callback_context: Any) -> None: name = _agent_name(agent) latency_ms = self._tock(f"agent:{name}") @@ -225,10 +230,12 @@ def _on_after_agent(self, agent: Any, callback_context: Any) -> None: # Model lifecycle handlers # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_before_model") def _on_before_model(self, callback_context: Any, llm_request: Any) -> None: agent_name = getattr(callback_context, "agent_name", None) or "unknown" self._tick(f"model:{agent_name}") + @resilient_callback(callback_name="_on_after_model") def _on_after_model(self, callback_context: Any, llm_response: Any) -> None: agent_name = getattr(callback_context, "agent_name", None) or "unknown" latency_ms = self._tock(f"model:{agent_name}") @@ -267,6 +274,7 @@ def _on_after_model(self, callback_context: Any, llm_response: Any) -> None: cost_payload["model"] = str(model) self._fire("cost.record", cost_payload, span_id=span_id, parent_span_id=parent) + @resilient_callback(callback_name="_on_model_error") def _on_model_error(self, callback_context: Any, llm_request: Any, error: Exception) -> None: agent_name = getattr(callback_context, "agent_name", None) or "unknown" self._tock(f"model:{agent_name}") # clear timer @@ -280,11 +288,13 @@ def _on_model_error(self, callback_context: Any, llm_request: Any, error: Except # Tool lifecycle handlers # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_before_tool") def _on_before_tool(self, tool: Any, tool_args: Any, tool_context: Any) -> None: tool_name = getattr(tool, "name", None) or "unknown" call_id = getattr(tool_context, "function_call_id", None) or tool_name self._tick(f"tool:{call_id}") + @resilient_callback(callback_name="_on_after_tool") def _on_after_tool(self, tool: Any, 
tool_args: Any, tool_context: Any, result: Any) -> None: tool_name = getattr(tool, "name", None) or "unknown" call_id = getattr(tool_context, "function_call_id", None) or tool_name @@ -303,6 +313,7 @@ def _on_after_tool(self, tool: Any, tool_args: Any, tool_context: Any, result: A self._set_if_capturing(result_payload, "output", safe_serialize(result)) self._fire("tool.result", result_payload, span_id=span_id, parent_span_id=parent, span_name=f"tool:{tool_name}") + @resilient_callback(callback_name="_on_tool_error") def _on_tool_error(self, tool: Any, tool_args: Any, tool_context: Any, error: Exception) -> None: tool_name = getattr(tool, "name", None) or "unknown" call_id = getattr(tool_context, "function_call_id", None) or tool_name @@ -317,6 +328,7 @@ def _on_tool_error(self, tool: Any, tool_args: Any, tool_context: Any, error: Ex # Event callback # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_event") def _on_event(self, invocation_context: Any, event: Any) -> None: # Detect agent handoffs from event actions actions = getattr(event, "actions", None) @@ -378,86 +390,58 @@ def _make_plugin(adapter: GoogleADKAdapter) -> Any: if _BasePlugin is None: raise ImportError("google-adk is required for GoogleADKAdapter") - class _LayerLensPlugin(_BasePlugin): + # The adapter's ``_on_*`` methods are wrapped with ``@resilient_callback`` + # which catches exceptions, logs them, and increments the resilience + # tracker — so the plugin shims can call them directly without any + # additional try/except. The shims still need to ``return None`` + # (the ADK plugin contract requires None to mean "don't override"). 
class _LayerLensPlugin(_BasePlugin):  # type: ignore[misc, valid-type]
    """ADK plugin that forwards each lifecycle callback to the adapter.

    The adapter's ``_on_*`` handlers are wrapped with
    ``@resilient_callback``, so the shims call them directly with no
    try/except of their own. Every shim returns ``None`` explicitly,
    which the ADK plugin contract treats as "don't override".
    """

    def __init__(self) -> None:
        super().__init__(name="layerlens")

    async def before_run_callback(self, *, invocation_context: Any) -> None:
        adapter._on_before_run(invocation_context)
        return None

    async def after_run_callback(self, *, invocation_context: Any) -> None:
        adapter._on_after_run(invocation_context)
        # Explicit for consistency with the other shims.
        return None

    async def before_agent_callback(self, *, agent: Any, callback_context: Any) -> None:
        adapter._on_before_agent(agent, callback_context)
        return None

    async def after_agent_callback(self, *, agent: Any, callback_context: Any) -> None:
        adapter._on_after_agent(agent, callback_context)
        return None

    async def before_model_callback(self, *, callback_context: Any, llm_request: Any) -> None:
        adapter._on_before_model(callback_context, llm_request)
        return None

    async def after_model_callback(self, *, callback_context: Any, llm_response: Any) -> None:
        adapter._on_after_model(callback_context, llm_response)
        return None

    async def on_model_error_callback(self, *, callback_context: Any, llm_request: Any, error: Exception) -> None:
        adapter._on_model_error(callback_context, llm_request, error)
        return None

    async def before_tool_callback(self, *, tool: Any, tool_args: Any, tool_context: Any) -> None:
        adapter._on_before_tool(tool, tool_args, tool_context)
        return None

    async def after_tool_callback(self, *, tool: Any, tool_args: Any, tool_context: Any, result: Any) -> None:
        adapter._on_after_tool(tool, tool_args, tool_context, result)
        return None

    async def on_tool_error_callback(
        self, *, tool: Any, tool_args: Any, tool_context: Any, error: Exception
    ) -> None:
        adapter._on_tool_error(tool, tool_args, tool_context, error)
        return None

    async def on_event_callback(self, *, invocation_context: Any, event: Any) -> None:
        adapter._on_event(invocation_context, event)
        return None
.._base import resilient_callback from ._utils import safe_serialize from ._base_framework import FrameworkAdapter from ..._capture_config import CaptureConfig @@ -70,15 +71,13 @@ def _on_disconnect(self) -> None: # Span handlers (called by _LayerLensSpan._finish) # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_span_end") def _on_span_end(self, span: _LayerLensSpan) -> None: elapsed_ms = (time.time_ns() - span._start_ns) / 1_000_000 - try: - if span._is_pipeline: - self._on_pipeline_end(span, elapsed_ms) - elif span._operation_name == "haystack.component.run": - self._on_component_end(span, elapsed_ms) - except Exception: - log.warning("layerlens: error emitting Haystack span", exc_info=True) + if span._is_pipeline: + self._on_pipeline_end(span, elapsed_ms) + elif span._operation_name == "haystack.component.run": + self._on_component_end(span, elapsed_ms) def _on_pipeline_end(self, span: _LayerLensSpan, elapsed_ms: float) -> None: tags = span._all_tags() diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse.py b/src/layerlens/instrument/adapters/frameworks/langfuse.py index cdfe4611..35886f3a 100644 --- a/src/layerlens/instrument/adapters/frameworks/langfuse.py +++ b/src/layerlens/instrument/adapters/frameworks/langfuse.py @@ -4,6 +4,7 @@ import logging from typing import Any, Dict, List, Optional +from .._base import resilient_callback from ._utils import truncate, new_span_id from ..._collector import TraceCollector from ._base_framework import FrameworkAdapter @@ -174,15 +175,14 @@ def import_traces( imported = 0 for trace_summary in traces: - try: - self._import_single_trace(trace_summary) + # ``_import_single_trace`` is wrapped with @resilient_callback + # — a malformed trace becomes a logged warning + failure + # counter increment, NOT a halt of the batch import. Track + # success via the success counter we maintain manually. 
+ before = self._resilience.total_failures + self._import_single_trace(trace_summary) + if self._resilience.total_failures == before: imported += 1 - except Exception: - log.warning( - "layerlens: failed to import Langfuse trace %s", - trace_summary.get("id", "?"), - exc_info=True, - ) # Advance cursor to the most recent trace timestamp latest = traces[0].get("updatedAt") or traces[0].get("timestamp") @@ -192,6 +192,7 @@ def import_traces( log.info("layerlens: imported %d Langfuse traces", imported) return imported + @resilient_callback(callback_name="_import_single_trace") def _import_single_trace(self, trace_summary: Dict[str, Any]) -> None: """Fetch a full trace and emit events via TraceCollector.""" trace_id = trace_summary["id"] @@ -219,48 +220,17 @@ def _import_single_trace(self, trace_summary: Dict[str, Any]) -> None: span_name=trace.get("name"), ) - # Process observations (generations, spans, events) + # Process observations (generations, spans, events). Inner call + # is wrapped with @resilient_callback — a malformed observation + # is logged + counted, not propagated. observations = trace.get("observations", []) for obs in observations: - try: - self._import_observation(collector, obs, root_span_id) - except Exception: - log.warning( - "layerlens: failed to import observation %s", - obs.get("id", "?"), - exc_info=True, - ) + self._import_observation(collector, obs, root_span_id) - # Scores (Langfuse "annotations") — both human annotations and LLM-as-judge - # scores land in the same collection. Emit them as evaluation.result so - # the migration path preserves all grading signal. + # Scores (Langfuse "annotations") — wrapped in a resilient + # helper so one bad score doesn't abort the whole trace import. 
for score in trace.get("scores", []) or []: - try: - score_payload: Dict[str, Any] = { - "framework": "langfuse", - "langfuse_trace_id": trace_id, - "name": score.get("name"), - "value": score.get("value"), - "source": score.get("source"), - "data_type": score.get("dataType"), - "observation_id": score.get("observationId"), - } - comment = score.get("comment") - if comment: - score_payload["comment"] = truncate(str(comment), max_len=2000) - # Session clustering: Langfuse groups related traces via sessionId. - # Carry it through so downstream session-level analytics work. - session_id = score.get("sessionId") or trace.get("sessionId") - if session_id: - score_payload["session_id"] = session_id - collector.emit( - "evaluation.result", - score_payload, - span_id=new_span_id(), - parent_span_id=root_span_id, - ) - except Exception: - log.warning("layerlens: failed to import score", exc_info=True) + self._import_score(collector, trace, trace_id, root_span_id, score) # Emit agent.output from trace output trace_output = trace.get("output") @@ -283,6 +253,41 @@ def _import_single_trace(self, trace_summary: Dict[str, Any]) -> None: collector.flush() + @resilient_callback(callback_name="_import_score") + def _import_score( + self, + collector: TraceCollector, + trace: Dict[str, Any], + trace_id: str, + root_span_id: str, + score: Dict[str, Any], + ) -> None: + """Emit one Langfuse score as a LayerLens evaluation.result event.""" + score_payload: Dict[str, Any] = { + "framework": "langfuse", + "langfuse_trace_id": trace_id, + "name": score.get("name"), + "value": score.get("value"), + "source": score.get("source"), + "data_type": score.get("dataType"), + "observation_id": score.get("observationId"), + } + comment = score.get("comment") + if comment: + score_payload["comment"] = truncate(str(comment), max_len=2000) + # Session clustering: Langfuse groups related traces via sessionId. + # Carry it through so downstream session-level analytics work. 
+ session_id = score.get("sessionId") or trace.get("sessionId") + if session_id: + score_payload["session_id"] = session_id + collector.emit( + "evaluation.result", + score_payload, + span_id=new_span_id(), + parent_span_id=root_span_id, + ) + + @resilient_callback(callback_name="_import_observation") def _import_observation( self, collector: TraceCollector, @@ -472,21 +477,26 @@ def export_traces( exported = 0 for trace_id, events in events_by_trace.items(): - try: - batch = self._build_ingestion_batch(trace_id, events) - if batch: - self._post_ingestion(batch) - exported += 1 - except Exception: - log.warning( - "layerlens: failed to export trace %s to Langfuse", - trace_id, - exc_info=True, - ) + before = self._resilience.total_failures + self._export_single_trace(trace_id, events) + if self._resilience.total_failures == before: + exported += 1 log.info("layerlens: exported %d traces to Langfuse", exported) return exported + @resilient_callback(callback_name="_export_single_trace") + def _export_single_trace(self, trace_id: str, events: List[Dict[str, Any]]) -> None: + """Build + POST a single trace's ingestion batch. + + Wrapped with @resilient_callback so a single bad trace doesn't + abort the rest of the batch export. The success/failure of each + trace is tracked via the resilience tracker. 
+ """ + batch = self._build_ingestion_batch(trace_id, events) + if batch: + self._post_ingestion(batch) + def _build_ingestion_batch( self, trace_id: str, diff --git a/src/layerlens/instrument/adapters/frameworks/llamaindex.py b/src/layerlens/instrument/adapters/frameworks/llamaindex.py index 53f1071f..008e02a4 100644 --- a/src/layerlens/instrument/adapters/frameworks/llamaindex.py +++ b/src/layerlens/instrument/adapters/frameworks/llamaindex.py @@ -4,6 +4,7 @@ import logging from typing import Any, Dict, List, Optional +from .._base import resilient_callback from ._utils import safe_serialize from ..._collector import TraceCollector from ._base_framework import FrameworkAdapter @@ -166,6 +167,7 @@ def _flush_all(self) -> None: # Span lifecycle (called by the thin span handler) # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_span_enter") def _on_span_enter(self, id_: str, parent_span_id: Optional[str]) -> Any: with self._lock: span = _BaseSpan(id_=id_, parent_id=parent_span_id) @@ -175,6 +177,7 @@ def _on_span_enter(self, id_: str, parent_span_id: Optional[str]) -> Any: self._collectors[id_] = TraceCollector(self._client, self._config) return span + @resilient_callback(callback_name="_on_span_exit") def _on_span_exit(self, id_: str) -> Any: with self._lock: span = self._open_spans.get(id_) @@ -184,6 +187,7 @@ def _on_span_exit(self, id_: str) -> Any: collector.flush() return span + @resilient_callback(callback_name="_on_span_drop") def _on_span_drop(self, id_: str) -> Any: return self._on_span_exit(id_) # same cleanup @@ -191,23 +195,29 @@ def _on_span_drop(self, id_: str) -> Any: # Event dispatch (called by the thin event handler) # ------------------------------------------------------------------ + @resilient_callback(callback_name="_handle_event") def _handle_event(self, event: Any) -> None: - try: - handler_name = self._EVENT_DISPATCH.get(type(event).__name__) - if handler_name is not None: - 
getattr(self, handler_name)(event) - except Exception: - log.warning("layerlens: error in LlamaIndex event handler", exc_info=True) + # Per-event handlers are individually wrapped (defense-in-depth) + # so each failed handler is recorded with its real name in the + # resilience tracker rather than being aggregated under + # ``_handle_event``. The outer wrapper here covers any failure + # in the dispatch/lookup logic itself (unknown event class, + # ``getattr`` raising, etc.). + handler_name = self._EVENT_DISPATCH.get(type(event).__name__) + if handler_name is not None: + getattr(self, handler_name)(event) # ------------------------------------------------------------------ # LLM Chat # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_llm_chat_start") def _on_llm_chat_start(self, event: Any) -> None: span_id = getattr(event, "span_id", None) if span_id: self._llm_start_times[span_id] = time.time() + @resilient_callback(callback_name="_on_llm_chat_end") def _on_llm_chat_end(self, event: Any) -> None: span_id = getattr(event, "span_id", None) response = getattr(event, "response", None) @@ -246,11 +256,13 @@ def _on_llm_chat_end(self, event: Any) -> None: # LLM Completion # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_llm_completion_start") def _on_llm_completion_start(self, event: Any) -> None: span_id = getattr(event, "span_id", None) if span_id: self._llm_start_times[span_id] = time.time() + @resilient_callback(callback_name="_on_llm_completion_end") def _on_llm_completion_end(self, event: Any) -> None: span_id = getattr(event, "span_id", None) response = getattr(event, "response", None) @@ -289,6 +301,7 @@ def _on_llm_completion_end(self, event: Any) -> None: # Tool calls # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_tool_call") def _on_tool_call(self, event: Any) -> None: span_id = 
getattr(event, "span_id", None) tool = getattr(event, "tool", None) @@ -310,6 +323,7 @@ def _on_tool_call(self, event: Any) -> None: # Retrieval # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_retrieval_start") def _on_retrieval_start(self, event: Any) -> None: span_id = getattr(event, "span_id", None) payload = self._payload(tool_name="retrieval") @@ -319,6 +333,7 @@ def _on_retrieval_start(self, event: Any) -> None: payload["input"] = str(query) self._fire("tool.call", payload, span_id=span_id, span_name="retrieval") + @resilient_callback(callback_name="_on_retrieval_end") def _on_retrieval_end(self, event: Any) -> None: span_id = getattr(event, "span_id", None) nodes = getattr(event, "nodes", None) @@ -333,6 +348,7 @@ def _on_retrieval_end(self, event: Any) -> None: # Embeddings # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_embedding_start") def _on_embedding_start(self, event: Any) -> None: # When L3 model metadata is suppressed, skip the costly embedding serialization # — bulk ingestion runs fire thousands of these events and the collector @@ -346,6 +362,7 @@ def _on_embedding_start(self, event: Any) -> None: payload["model"] = model self._fire("model.invoke", payload, span_id=span_id, span_name="embedding") + @resilient_callback(callback_name="_on_embedding_end") def _on_embedding_end(self, event: Any) -> None: if not self._config.l3_model_metadata: return @@ -380,6 +397,7 @@ def _on_embedding_end(self, event: Any) -> None: # Query # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_query_start") def _on_query_start(self, event: Any) -> None: span_id = getattr(event, "span_id", None) payload = self._payload() @@ -389,6 +407,7 @@ def _on_query_start(self, event: Any) -> None: payload["input"] = str(query) self._fire("agent.input", payload, span_id=span_id, span_name="query") + 
@resilient_callback(callback_name="_on_query_end") def _on_query_end(self, event: Any) -> None: span_id = getattr(event, "span_id", None) payload = self._payload(status="ok") @@ -402,6 +421,7 @@ def _on_query_end(self, event: Any) -> None: # Agent steps # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_agent_step_start") def _on_agent_step_start(self, event: Any) -> None: span_id = getattr(event, "span_id", None) payload = self._payload() @@ -414,6 +434,7 @@ def _on_agent_step_start(self, event: Any) -> None: payload["input"] = safe_serialize(step_input) self._fire("agent.input", payload, span_id=span_id, span_name="agent_step") + @resilient_callback(callback_name="_on_agent_step_end") def _on_agent_step_end(self, event: Any) -> None: span_id = getattr(event, "span_id", None) payload = self._payload(status="ok") @@ -427,6 +448,7 @@ def _on_agent_step_end(self, event: Any) -> None: # Rerank # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_rerank_start") def _on_rerank_start(self, event: Any) -> None: span_id = getattr(event, "span_id", None) payload = self._payload(tool_name="rerank") @@ -438,6 +460,7 @@ def _on_rerank_start(self, event: Any) -> None: payload["top_n"] = top_n self._fire("tool.call", payload, span_id=span_id, span_name="rerank") + @resilient_callback(callback_name="_on_rerank_end") def _on_rerank_end(self, event: Any) -> None: span_id = getattr(event, "span_id", None) payload = self._payload(tool_name="rerank") @@ -450,6 +473,7 @@ def _on_rerank_end(self, event: Any) -> None: # Exceptions # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_exception") def _on_exception(self, event: Any) -> None: span_id = getattr(event, "span_id", None) exc = getattr(event, "exception", None) diff --git a/src/layerlens/instrument/adapters/frameworks/openai_agents.py 
b/src/layerlens/instrument/adapters/frameworks/openai_agents.py index 9acd00c0..a354824c 100644 --- a/src/layerlens/instrument/adapters/frameworks/openai_agents.py +++ b/src/layerlens/instrument/adapters/frameworks/openai_agents.py @@ -4,6 +4,7 @@ from typing import Any, Dict, Optional from datetime import datetime +from .._base import resilient_callback from ._utils import safe_serialize from ..._context import RunState, _current_run, _current_collector from ..._collector import TraceCollector @@ -83,52 +84,46 @@ def _on_disconnect(self) -> None: # TracingProcessor interface # ------------------------------------------------------------------ + @resilient_callback(callback_name="on_trace_start") def on_trace_start(self, trace: Any) -> None: - try: - # OA manages multiple concurrent traces from one processor, - # so we create RunState directly instead of using _begin_run - # (which would pollute ContextVars for the next trace). - run = RunState( - collector=TraceCollector(self._client, self._config), - root_span_id=self._new_span_id(), - ) - with self._lock: - self._trace_runs[trace.trace_id] = run - except Exception: - log.warning("layerlens: error in on_trace_start", exc_info=True) + # OA manages multiple concurrent traces from one processor, + # so we create RunState directly instead of using _begin_run + # (which would pollute ContextVars for the next trace). 
+ run = RunState( + collector=TraceCollector(self._client, self._config), + root_span_id=self._new_span_id(), + ) + with self._lock: + self._trace_runs[trace.trace_id] = run + @resilient_callback(callback_name="on_trace_end") def on_trace_end(self, trace: Any) -> None: - try: - with self._lock: - run = self._trace_runs.pop(trace.trace_id, None) - if run is not None: - run.collector.flush() - except Exception: - log.warning("layerlens: error in on_trace_end", exc_info=True) + with self._lock: + run = self._trace_runs.pop(trace.trace_id, None) + if run is not None: + run.collector.flush() def on_span_start(self, span: Any) -> None: pass + @resilient_callback(callback_name="on_span_end") def on_span_end(self, span: Any) -> None: + with self._lock: + run = self._trace_runs.get(span.trace_id) + if run is None: + return + + # Temporarily set both ContextVars so _emit and providers work. + run_token = _current_run.set(run) + col_token = _current_collector.set(run.collector) try: - with self._lock: - run = self._trace_runs.get(span.trace_id) - if run is None: - return - - # Temporarily set both ContextVars so _emit and providers work. 
- run_token = _current_run.set(run) - col_token = _current_collector.set(run.collector) - try: - span_type = getattr(span.span_data, "type", None) or "" - handler_name = self._SPAN_HANDLERS.get(span_type) - if handler_name is not None: - getattr(self, handler_name)(span) - finally: - _current_collector.reset(col_token) - _current_run.reset(run_token) - except Exception: - log.warning("layerlens: error handling OpenAI Agents span", exc_info=True) + span_type = getattr(span.span_data, "type", None) or "" + handler_name = self._SPAN_HANDLERS.get(span_type) + if handler_name is not None: + getattr(self, handler_name)(span) + finally: + _current_collector.reset(col_token) + _current_run.reset(run_token) def shutdown(self) -> None: pass diff --git a/src/layerlens/instrument/adapters/frameworks/pydantic_ai.py b/src/layerlens/instrument/adapters/frameworks/pydantic_ai.py index 63517dd9..a97913eb 100644 --- a/src/layerlens/instrument/adapters/frameworks/pydantic_ai.py +++ b/src/layerlens/instrument/adapters/frameworks/pydantic_ai.py @@ -3,6 +3,7 @@ import logging from typing import Any, Dict, Optional +from .._base import resilient_callback from ._utils import safe_serialize from ._base_framework import FrameworkAdapter from ..._capture_config import CaptureConfig @@ -98,6 +99,7 @@ def _register_hooks(self, hooks: Any) -> None: # Run lifecycle hooks # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_before_run") def _on_before_run(self, ctx: Any) -> None: self._begin_run() root = self._get_root_span() @@ -141,6 +143,7 @@ def _on_before_run(self, ctx: Any) -> None: ) self._start_timer("run") + @resilient_callback(callback_name="_on_after_run", passthrough_arg="result") def _on_after_run(self, ctx: Any, *, result: Any) -> Any: latency_ms = self._stop_timer("run") root = self._get_root_span() @@ -176,6 +179,16 @@ def _on_after_run(self, ctx: Any, *, result: Any) -> Any: return result def _on_run_error(self, ctx: Any, 
*, error: BaseException) -> None: + # Telemetry is best-effort; we MUST always re-raise the + # framework's original error or PydanticAI loses its error + # propagation contract. Keep telemetry inside a resilient + # helper so adapter-side bugs can never swallow the framework + # error. + self._emit_run_error_telemetry(ctx, error=error) + raise error + + @resilient_callback(callback_name="_on_run_error") + def _emit_run_error_telemetry(self, ctx: Any, *, error: BaseException) -> None: latency_ms = self._stop_timer("run") root = self._get_root_span() agent_name = self._get_agent_name(ctx) @@ -196,12 +209,12 @@ def _on_run_error(self, ctx: Any, *, error: BaseException) -> None: ) self._end_run() - raise error # ------------------------------------------------------------------ # Model request hooks # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_after_model_request", passthrough_arg="response") def _on_after_model_request( self, ctx: Any, @@ -240,18 +253,31 @@ def _on_model_request_error( *, request_context: Any, error: Exception, + ) -> None: + # Telemetry first (resilient), THEN re-raise the framework's + # error so PydanticAI's own error propagation is preserved. 
+ self._emit_model_request_error_telemetry(ctx, request_context=request_context, error=error) + raise error + + @resilient_callback(callback_name="_on_model_request_error") + def _emit_model_request_error_telemetry( + self, + ctx: Any, + *, + request_context: Any, + error: Exception, ) -> None: payload = self._payload( error=str(error), error_type=type(error).__name__, ) self._emit("agent.error", payload) - raise error # ------------------------------------------------------------------ # Tool execution hooks # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_before_tool_execute", passthrough_arg="args") def _on_before_tool_execute( self, ctx: Any, @@ -269,6 +295,7 @@ def _on_before_tool_execute( self._start_timer(f"tool:{call_id}") return args + @resilient_callback(callback_name="_on_after_tool_execute", passthrough_arg="result") def _on_after_tool_execute( self, ctx: Any, @@ -301,6 +328,21 @@ def _on_tool_execute_error( tool_def: Any, args: Any, error: Exception, + ) -> None: + # Telemetry first (resilient), THEN re-raise the framework's + # error so PydanticAI can propagate the tool failure. 
+ self._emit_tool_execute_error_telemetry(ctx, call=call, tool_def=tool_def, args=args, error=error) + raise error + + @resilient_callback(callback_name="_on_tool_execute_error") + def _emit_tool_execute_error_telemetry( + self, + ctx: Any, + *, + call: Any, + tool_def: Any, + args: Any, + error: Exception, ) -> None: tool_name = getattr(call, "tool_name", "unknown") call_id = getattr(call, "id", None) or tool_name @@ -316,12 +358,12 @@ def _on_tool_execute_error( error_type=type(error).__name__, ) self._emit("agent.error", payload) - raise error # ------------------------------------------------------------------ # Streaming hooks (pydantic-ai >= 0.5) # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_stream_chunk") def _on_stream_chunk(self, ctx: Any, *, chunk: Any, **_kwargs: Any) -> None: """Accumulate streaming chunks on the RunState; aggregated at stream end.""" run = self._get_run() @@ -330,6 +372,7 @@ def _on_stream_chunk(self, ctx: Any, *, chunk: Any, **_kwargs: Any) -> None: buf = run.data.setdefault("stream_buffer", []) buf.append(chunk) + @resilient_callback(callback_name="_on_after_stream") def _on_after_stream(self, ctx: Any, *, response: Any = None, **_kwargs: Any) -> None: run = self._get_run() if run is None: diff --git a/src/layerlens/instrument/adapters/frameworks/smolagents.py b/src/layerlens/instrument/adapters/frameworks/smolagents.py index 0e9c1e87..0b77f203 100644 --- a/src/layerlens/instrument/adapters/frameworks/smolagents.py +++ b/src/layerlens/instrument/adapters/frameworks/smolagents.py @@ -4,6 +4,7 @@ import logging from typing import Any, Dict, List, Optional +from .._base import resilient_callback from ._utils import safe_serialize from ..._collector import TraceCollector from ._base_framework import FrameworkAdapter @@ -190,6 +191,7 @@ def _end_trace(self) -> None: # Run lifecycle handlers # ------------------------------------------------------------------ + 
@resilient_callback(callback_name="_on_run_start") def _on_run_start(self, agent: Any, task: Any) -> None: span_id = self._new_span_id() with self._lock: @@ -219,6 +221,7 @@ def _on_run_start(self, agent: Any, task: Any) -> None: self._set_if_capturing(payload, "input", safe_serialize(task)) self._fire("agent.input", payload, span_id=span_id, span_name=agent_name) + @resilient_callback(callback_name="_on_run_end") def _on_run_end(self, agent: Any, result: Any, error: Optional[Exception]) -> None: latency_ms = self._tock("run") span_id = self._run_span_id or self._new_span_id() @@ -232,6 +235,7 @@ def _on_run_end(self, agent: Any, result: Any, error: Optional[Exception]) -> No self._fire("agent.output", payload, span_id=span_id, span_name=agent_name) self._end_trace() + @resilient_callback(callback_name="_on_run_error") def _on_run_error(self, agent: Any, exc: Exception) -> None: agent_name = _agent_name(agent) self._fire( @@ -244,18 +248,15 @@ def _on_run_error(self, agent: Any, exc: Exception) -> None: # Step handlers (registered as step_callbacks) # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_action_step") def _on_action_step(self, step: Any, agent: Any = None) -> None: - try: - self._handle_action_step(step, agent) - except Exception: - log.warning("layerlens: error in SmolAgents action step handler", exc_info=True) + self._handle_action_step(step, agent) + @resilient_callback(callback_name="_on_planning_step") def _on_planning_step(self, step: Any, agent: Any = None) -> None: - try: - self._handle_planning_step(step, agent) - except Exception: - log.warning("layerlens: error in SmolAgents planning step handler", exc_info=True) + self._handle_planning_step(step, agent) + @resilient_callback(callback_name="_on_final_answer_step") def _on_final_answer_step(self, step: Any, agent: Any = None) -> None: pass # run wrapper handles final output + flush diff --git 
a/src/layerlens/instrument/adapters/frameworks/strands.py b/src/layerlens/instrument/adapters/frameworks/strands.py index 25cec465..60168f4c 100644 --- a/src/layerlens/instrument/adapters/frameworks/strands.py +++ b/src/layerlens/instrument/adapters/frameworks/strands.py @@ -4,6 +4,7 @@ import logging from typing import Any, Dict, Optional +from .._base import resilient_callback from ._utils import safe_serialize from ..._collector import TraceCollector from ._base_framework import FrameworkAdapter @@ -174,78 +175,71 @@ def _end_trace(self) -> None: # Hook handlers # ------------------------------------------------------------------ + @resilient_callback(callback_name="_on_agent_initialized") def _on_agent_initialized(self, event: Any) -> None: """Sync-only callback fired when an agent is constructed.""" - try: - agent = event.agent - name = _agent_name(agent) - self._emit_agent_config(name, agent) - except Exception: - log.warning("layerlens: error in Strands agent_initialized", exc_info=True) + agent = event.agent + name = _agent_name(agent) + self._emit_agent_config(name, agent) + @resilient_callback(callback_name="_on_before_invocation") def _on_before_invocation(self, event: Any) -> None: - try: - agent = event.agent - name = _agent_name(agent) - span_id = self._new_span_id() - with self._lock: - self._collector = TraceCollector(self._client, self._config) - self._run_span_id = span_id - self._current_agent_name = name - self._tick("run") - - # Re-emit config if we haven't seen this agent yet - self._emit_agent_config(name, agent) - - payload = self._payload(agent_name=name) - model_id = _model_id(agent) - if model_id: - payload["model"] = model_id + agent = event.agent + name = _agent_name(agent) + span_id = self._new_span_id() + with self._lock: + self._collector = TraceCollector(self._client, self._config) + self._run_span_id = span_id + self._current_agent_name = name + self._tick("run") + + # Re-emit config if we haven't seen this agent yet + 
self._emit_agent_config(name, agent) - messages = getattr(event, "messages", None) - self._set_if_capturing(payload, "input", safe_serialize(messages)) - self._fire("agent.input", payload, span_id=span_id, span_name=name) - except Exception: - log.warning("layerlens: error in Strands before_invocation", exc_info=True) + payload = self._payload(agent_name=name) + model_id = _model_id(agent) + if model_id: + payload["model"] = model_id + + messages = getattr(event, "messages", None) + self._set_if_capturing(payload, "input", safe_serialize(messages)) + self._fire("agent.input", payload, span_id=span_id, span_name=name) + @resilient_callback(callback_name="_on_after_invocation") def _on_after_invocation(self, event: Any) -> None: - try: - agent = event.agent - name = _agent_name(agent) - latency_ms = self._tock("run") - span_id = self._run_span_id or self._new_span_id() - - payload = self._payload(agent_name=name) - if latency_ms is not None: - payload["duration_ns"] = int(latency_ms * 1_000_000) - - result = getattr(event, "result", None) - if result is not None: - stop_reason = getattr(result, "stop_reason", None) - if stop_reason: - payload["stop_reason"] = str(stop_reason) - - message = getattr(result, "message", None) - self._set_if_capturing(payload, "output", safe_serialize(message)) - - # Emit per-cycle cost.record events matched to model spans. - # accumulated_usage updates AFTER AfterModelCallEvent fires, - # so we read per-cycle tokens here instead. 
- self._emit_per_cycle_tokens(agent) - - self._fire("agent.output", payload, span_id=span_id, span_name=name) - self._end_trace() - except Exception: - log.warning("layerlens: error in Strands after_invocation", exc_info=True) + agent = event.agent + name = _agent_name(agent) + latency_ms = self._tock("run") + span_id = self._run_span_id or self._new_span_id() + + payload = self._payload(agent_name=name) + if latency_ms is not None: + payload["duration_ns"] = int(latency_ms * 1_000_000) + + result = getattr(event, "result", None) + if result is not None: + stop_reason = getattr(result, "stop_reason", None) + if stop_reason: + payload["stop_reason"] = str(stop_reason) + message = getattr(result, "message", None) + self._set_if_capturing(payload, "output", safe_serialize(message)) + + # Emit per-cycle cost.record events matched to model spans. + # accumulated_usage updates AFTER AfterModelCallEvent fires, + # so we read per-cycle tokens here instead. + self._emit_per_cycle_tokens(agent) + + self._fire("agent.output", payload, span_id=span_id, span_name=name) + self._end_trace() + + @resilient_callback(callback_name="_on_before_model") def _on_before_model(self, event: Any) -> None: - try: - agent = event.agent - name = _agent_name(agent) - self._tick(f"model:{name}") - except Exception: - log.warning("layerlens: error in Strands before_model", exc_info=True) + agent = event.agent + name = _agent_name(agent) + self._tick(f"model:{name}") + @resilient_callback(callback_name="_on_after_model") def _on_after_model(self, event: Any) -> None: """Emit model.invoke with timing and error info. @@ -253,95 +247,88 @@ def _on_after_model(self, event: Any) -> None: accumulated_usage AFTER this hook fires. Tokens are emitted per-cycle from _on_after_invocation using the cycle data. 
""" - try: - agent = event.agent - name = _agent_name(agent) - latency_ms = self._tock(f"model:{name}") + agent = event.agent + name = _agent_name(agent) + latency_ms = self._tock(f"model:{name}") - model_id = _model_id(agent) - payload = self._payload() - if model_id: - payload["model"] = model_id - - if latency_ms is not None: - payload["latency_ms"] = latency_ms - - exception = getattr(event, "exception", None) - if exception is not None: - payload["error"] = str(exception) - payload["error_type"] = type(exception).__name__ - - stop_response = getattr(event, "stop_response", None) - if stop_response is not None: - stop_reason = getattr(stop_response, "stop_reason", None) - if stop_reason: - payload["stop_reason"] = str(stop_reason) - - parent = self._run_span_id - span_id = self._new_span_id() - self._fire("model.invoke", payload, span_id=span_id, parent_span_id=parent) - with self._lock: - self._model_span_ids.append(span_id) - except Exception: - log.warning("layerlens: error in Strands after_model", exc_info=True) + model_id = _model_id(agent) + payload = self._payload() + if model_id: + payload["model"] = model_id + if latency_ms is not None: + payload["latency_ms"] = latency_ms + + exception = getattr(event, "exception", None) + if exception is not None: + payload["error"] = str(exception) + payload["error_type"] = type(exception).__name__ + + stop_response = getattr(event, "stop_response", None) + if stop_response is not None: + stop_reason = getattr(stop_response, "stop_reason", None) + if stop_reason: + payload["stop_reason"] = str(stop_reason) + + parent = self._run_span_id + span_id = self._new_span_id() + self._fire("model.invoke", payload, span_id=span_id, parent_span_id=parent) + with self._lock: + self._model_span_ids.append(span_id) + + @resilient_callback(callback_name="_on_before_tool") def _on_before_tool(self, event: Any) -> None: - try: - tool_use = event.tool_use - tool_name = ( - tool_use.get("name", "unknown") if isinstance(tool_use, dict) 
else getattr(tool_use, "name", "unknown") - ) - tool_id = ( - tool_use.get("toolUseId", tool_name) - if isinstance(tool_use, dict) - else getattr(tool_use, "toolUseId", tool_name) - ) - self._tick(f"tool:{tool_id}") - except Exception: - log.warning("layerlens: error in Strands before_tool", exc_info=True) + tool_use = event.tool_use + tool_name = ( + tool_use.get("name", "unknown") if isinstance(tool_use, dict) else getattr(tool_use, "name", "unknown") + ) + tool_id = ( + tool_use.get("toolUseId", tool_name) + if isinstance(tool_use, dict) + else getattr(tool_use, "toolUseId", tool_name) + ) + self._tick(f"tool:{tool_id}") + @resilient_callback(callback_name="_on_after_tool") def _on_after_tool(self, event: Any) -> None: - try: - tool_use = event.tool_use - tool_name = ( - tool_use.get("name", "unknown") if isinstance(tool_use, dict) else getattr(tool_use, "name", "unknown") - ) - tool_id = ( - tool_use.get("toolUseId", tool_name) - if isinstance(tool_use, dict) - else getattr(tool_use, "toolUseId", tool_name) - ) - tool_input = tool_use.get("input", None) if isinstance(tool_use, dict) else getattr(tool_use, "input", None) - latency_ms = self._tock(f"tool:{tool_id}") - - parent = self._run_span_id - span_id = self._new_span_id() - - call_payload = self._payload(tool_name=tool_name) - self._set_if_capturing(call_payload, "input", safe_serialize(tool_input)) - if latency_ms is not None: - call_payload["latency_ms"] = latency_ms - self._fire("tool.call", call_payload, span_id=span_id, parent_span_id=parent, span_name=f"tool:{tool_name}") - - result = getattr(event, "result", None) - result_payload = self._payload(tool_name=tool_name) - if result is not None: - status = result.get("status", None) if isinstance(result, dict) else getattr(result, "status", None) - if status: - result_payload["status"] = str(status) - content = result.get("content", None) if isinstance(result, dict) else getattr(result, "content", None) - self._set_if_capturing(result_payload, "output", 
safe_serialize(content)) - - exception = getattr(event, "exception", None) - if exception is not None: - result_payload["error"] = str(exception) - result_payload["error_type"] = type(exception).__name__ - - self._fire( - "tool.result", result_payload, span_id=span_id, parent_span_id=parent, span_name=f"tool:{tool_name}" - ) - except Exception: - log.warning("layerlens: error in Strands after_tool", exc_info=True) + tool_use = event.tool_use + tool_name = ( + tool_use.get("name", "unknown") if isinstance(tool_use, dict) else getattr(tool_use, "name", "unknown") + ) + tool_id = ( + tool_use.get("toolUseId", tool_name) + if isinstance(tool_use, dict) + else getattr(tool_use, "toolUseId", tool_name) + ) + tool_input = tool_use.get("input", None) if isinstance(tool_use, dict) else getattr(tool_use, "input", None) + latency_ms = self._tock(f"tool:{tool_id}") + + parent = self._run_span_id + span_id = self._new_span_id() + + call_payload = self._payload(tool_name=tool_name) + self._set_if_capturing(call_payload, "input", safe_serialize(tool_input)) + if latency_ms is not None: + call_payload["latency_ms"] = latency_ms + self._fire("tool.call", call_payload, span_id=span_id, parent_span_id=parent, span_name=f"tool:{tool_name}") + + result = getattr(event, "result", None) + result_payload = self._payload(tool_name=tool_name) + if result is not None: + status = result.get("status", None) if isinstance(result, dict) else getattr(result, "status", None) + if status: + result_payload["status"] = str(status) + content = result.get("content", None) if isinstance(result, dict) else getattr(result, "content", None) + self._set_if_capturing(result_payload, "output", safe_serialize(content)) + + exception = getattr(event, "exception", None) + if exception is not None: + result_payload["error"] = str(exception) + result_payload["error_type"] = type(exception).__name__ + + self._fire( + "tool.result", result_payload, span_id=span_id, parent_span_id=parent, 
span_name=f"tool:{tool_name}" + ) # ------------------------------------------------------------------ # Helpers diff --git a/tests/instrument/adapters/_base/__init__.py b/tests/instrument/adapters/_base/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/instrument/adapters/_base/test_per_adapter_resilience.py b/tests/instrument/adapters/_base/test_per_adapter_resilience.py new file mode 100644 index 00000000..6ae9a57b --- /dev/null +++ b/tests/instrument/adapters/_base/test_per_adapter_resilience.py @@ -0,0 +1,537 @@ +"""Per-adapter resilience smoke tests for all 10 lighter framework adapters. + +These tests instantiate each adapter, force a callback to raise (by +sabotaging an inner helper), and assert: + +1. The exception does NOT propagate (framework would crash otherwise). +2. The resilience tracker recorded the failure. +3. After enough failures, ``adapter_info().metadata['resilience_status']`` + is ``"degraded"``. + +This is the per-adapter complement to ``test_resilience.py``, which +covers the decorator + tracker mechanics in isolation. + +Adapters covered (10 lighter): + agno, llamaindex, google_adk, strands, pydantic_ai, + smolagents, bedrock_agents, openai_agents, haystack, langfuse. + +Each adapter is exercised against a CALLBACK that exists on the +instance unconditionally (no need to construct framework-specific +fixture objects). The actual callback bodies use ``self._payload(...)``, +``self._fire(...)`` or similar internal helpers that we monkey-patch +to force a failure deterministically. +""" + +from __future__ import annotations + +from typing import Any, Dict +from unittest.mock import Mock + +import pytest + +from layerlens.instrument._context import ( + _current_run, + _current_span_id, + _current_collector, +) +from layerlens.instrument.adapters._base import DEFAULT_FAILURE_THRESHOLD + + +@pytest.fixture(autouse=True) +def _isolate_context_vars(): + """Ensure ContextVar state is clean before AND after every test. 
+ + Several callbacks under test (pydantic_ai._on_before_run, + bedrock_agents._before_invoke) intentionally call _begin_run() — + when the test then forces those callbacks to fail, the ContextVar + tokens pushed by _begin_run are NOT popped (because the failure + happens after the push). Without per-test cleanup those leaked + tokens corrupt subsequent tests in the same process (notably + ``tests/instrument/test_trace_context.py``). + """ + # Snapshot current state (likely None) and force a clean baseline. + run_token = _current_run.set(None) + col_token = _current_collector.set(None) + span_token = _current_span_id.set(None) + try: + yield + finally: + # Hard reset — tests in this module are not expected to leave + # any persistent run/collector/span state. + for var, token in ( + (_current_run, run_token), + (_current_collector, col_token), + (_current_span_id, span_token), + ): + try: + var.reset(token) + except (ValueError, LookupError): + var.set(None) + + +class _Boom(Exception): + """Sentinel exception type used to verify the right error was caught.""" + + +def _force_payload_failure(adapter: Any) -> None: + """Sabotage ``adapter._payload`` so any callback that touches it raises.""" + + def _raise(*args: Any, **kwargs: Any) -> Dict[str, Any]: + raise _Boom("simulated framework callback failure") + + adapter._payload = _raise # type: ignore[method-assign] + + +def _force_fire_failure(adapter: Any) -> None: + """Sabotage ``adapter._fire`` for adapters whose callbacks call _fire directly.""" + + def _raise(*args: Any, **kwargs: Any) -> None: + raise _Boom("simulated _fire failure") + + adapter._fire = _raise # type: ignore[method-assign] + + +def _force_emit_failure(adapter: Any) -> None: + """Sabotage ``adapter._emit`` for adapters whose callbacks call _emit directly.""" + + def _raise(*args: Any, **kwargs: Any) -> None: + raise _Boom("simulated _emit failure") + + adapter._emit = _raise # type: ignore[method-assign] + + +# 
--------------------------------------------------------------------------- +# agno +# --------------------------------------------------------------------------- + + +class TestAgnoResilience: + def test_on_run_start_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.agno import AgnoAdapter + + adapter = AgnoAdapter(Mock()) + _force_payload_failure(adapter) + # Must not raise. + result = adapter._on_run_start(Mock(), "input") + assert result is None + assert adapter._resilience.total_failures == 1 + + def test_on_run_end_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.agno import AgnoAdapter + + adapter = AgnoAdapter(Mock()) + _force_payload_failure(adapter) + result = adapter._on_run_end(Mock(), Mock(), None) + assert result is None + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# llamaindex +# --------------------------------------------------------------------------- + + +class TestLlamaIndexResilience: + def test_on_query_start_failure_caught(self) -> None: + pytest.importorskip("llama_index.core") + from layerlens.instrument.adapters.frameworks.llamaindex import LlamaIndexAdapter + + adapter = LlamaIndexAdapter(Mock()) + _force_payload_failure(adapter) + event = Mock() + event.span_id = "span-1" + result = adapter._on_query_start(event) + assert result is None + assert adapter._resilience.total_failures == 1 + + def test_handle_event_unknown_type_no_op(self) -> None: + pytest.importorskip("llama_index.core") + from layerlens.instrument.adapters.frameworks.llamaindex import LlamaIndexAdapter + + adapter = LlamaIndexAdapter(Mock()) + # Unknown event class — handler lookup returns None, no exception. + adapter._handle_event(object()) + # No failure recorded — unknown types are a no-op, not an error. 
+ assert adapter._resilience.total_failures == 0 + + def test_on_span_enter_failure_caught(self) -> None: + pytest.importorskip("llama_index.core") + from layerlens.instrument.adapters.frameworks.llamaindex import LlamaIndexAdapter + + adapter = LlamaIndexAdapter(Mock()) + + # Sabotage open_spans dict access by replacing _open_spans with + # an object that raises on __setitem__. + class _Bad: + def __setitem__(self, key: Any, value: Any) -> None: + raise _Boom("dict broken") + + def get(self, key: Any, default: Any = None) -> Any: + return default + + def __contains__(self, key: Any) -> bool: + return False + + adapter._open_spans = _Bad() # type: ignore[assignment] + result = adapter._on_span_enter("id-1", None) + # Default for span lifecycle is None — LlamaIndex tolerates it. + assert result is None + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# google_adk +# --------------------------------------------------------------------------- + + +class TestGoogleAdkResilience: + def test_on_before_run_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.google_adk import GoogleADKAdapter + + adapter = GoogleADKAdapter(Mock()) + _force_payload_failure(adapter) + adapter._on_before_run(Mock()) + assert adapter._resilience.total_failures == 1 + + def test_on_after_run_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.google_adk import GoogleADKAdapter + + adapter = GoogleADKAdapter(Mock()) + _force_payload_failure(adapter) + adapter._on_after_run(Mock()) + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# strands +# --------------------------------------------------------------------------- + + +class TestStrandsResilience: + def test_on_before_invocation_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.strands import 
StrandsAdapter + + adapter = StrandsAdapter(Mock()) + _force_payload_failure(adapter) + # Build a minimal event shim — the wrapped callback will raise + # when it tries to call _payload, which our sabotage replaces. + event = Mock() + event.agent = Mock() + adapter._on_before_invocation(event) + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# pydantic_ai +# --------------------------------------------------------------------------- + + +class TestPydanticAiResilience: + def test_on_before_run_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.pydantic_ai import PydanticAIAdapter + + adapter = PydanticAIAdapter(Mock()) + _force_payload_failure(adapter) + # _on_before_run calls _begin_run() then _payload — the latter + # raises and must be caught. + adapter._on_before_run(Mock()) + assert adapter._resilience.total_failures == 1 + + def test_on_after_model_request_passthrough_returns_response(self) -> None: + # Critical: when _on_after_model_request raises, the wrapper + # MUST return the original response object (passthrough_arg= + # "response") otherwise the agent's LLM response becomes None + # and the agent crashes downstream. + from layerlens.instrument.adapters.frameworks.pydantic_ai import PydanticAIAdapter + + adapter = PydanticAIAdapter(Mock()) + _force_emit_failure(adapter) + sentinel_response = Mock(name="response_object") + result = adapter._on_after_model_request( + Mock(), + request_context=Mock(), + response=sentinel_response, + ) + assert result is sentinel_response + assert adapter._resilience.total_failures == 1 + + def test_on_before_tool_execute_passthrough_returns_args(self) -> None: + from layerlens.instrument.adapters.frameworks.pydantic_ai import PydanticAIAdapter + + adapter = PydanticAIAdapter(Mock()) + + # Sabotage _start_timer (used inside _on_before_tool_execute). 
+ def _raise(*a: Any, **kw: Any) -> None: + raise _Boom("timer broken") + + adapter._start_timer = _raise # type: ignore[method-assign] + sentinel_args = ("a", "b", "c") + result = adapter._on_before_tool_execute( + Mock(), + call=Mock(), + tool_def=Mock(), + args=sentinel_args, + ) + assert result == sentinel_args + assert adapter._resilience.total_failures == 1 + + def test_run_error_re_raises_framework_error(self) -> None: + # The error-callback path MUST always re-raise the framework's + # original error — even when our telemetry helper raises. The + # framework's contract requires the error to propagate. + from layerlens.instrument.adapters.frameworks.pydantic_ai import PydanticAIAdapter + + adapter = PydanticAIAdapter(Mock()) + _force_emit_failure(adapter) # telemetry will fail + + framework_error = ValueError("framework's own error") + with pytest.raises(ValueError, match="framework's own error"): + adapter._on_run_error(Mock(), error=framework_error) + + # Telemetry helper failure was caught + recorded; re-raise still happened. + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# smolagents +# --------------------------------------------------------------------------- + + +class TestSmolAgentsResilience: + def test_on_action_step_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.smolagents import SmolAgentsAdapter + + adapter = SmolAgentsAdapter(Mock()) + + # Sabotage _handle_action_step itself. 
+ def _raise(*a: Any, **kw: Any) -> None: + raise _Boom("handler broken") + + adapter._handle_action_step = _raise # type: ignore[method-assign] + adapter._on_action_step(Mock(), Mock()) + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# bedrock_agents +# --------------------------------------------------------------------------- + + +class TestBedrockAgentsResilience: + def test_before_invoke_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.bedrock_agents import BedrockAgentsAdapter + + adapter = BedrockAgentsAdapter(Mock()) + adapter._connected = True # bypass the early ``not connected`` return + _force_payload_failure(adapter) + # boto3 invokes hooks with kwargs only. + adapter._before_invoke(params={"agentId": "id-1", "sessionId": "s-1"}) + assert adapter._resilience.total_failures == 1 + + def test_after_invoke_finally_runs_even_on_failure(self) -> None: + # The outer _after_invoke wraps the inner body in try/finally so + # _end_run() always fires — critical for releasing ContextVars. + from layerlens.instrument.adapters.frameworks.bedrock_agents import BedrockAgentsAdapter + + adapter = BedrockAgentsAdapter(Mock()) + adapter._connected = True + # Set up a run scope so _end_run has something to clean up. + adapter._begin_run() + _force_payload_failure(adapter) + + end_run_called = [] + original_end_run = adapter._end_run + + def _spy_end_run() -> None: + end_run_called.append(True) + original_end_run() + + adapter._end_run = _spy_end_run # type: ignore[method-assign] + adapter._after_invoke(parsed={"sessionId": "s-1"}) + # _end_run fired despite the body raising. 
+ assert end_run_called == [True] + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# openai_agents +# --------------------------------------------------------------------------- + + +class TestOpenAiAgentsResilience: + def test_on_trace_start_failure_caught(self) -> None: + # If the SDK isn't installed, we still want to test the resilience + # wiring — but the class can't be instantiated because the parent + # TracingProcessor isn't available. Skip in that case. + pytest.importorskip("agents") + from layerlens.instrument.adapters.frameworks.openai_agents import OpenAIAgentsAdapter + + adapter = OpenAIAgentsAdapter(Mock()) + + # Sabotage RunState construction by patching the lock to raise. + def _raise_on_acquire(*a: Any, **kw: Any) -> Any: + raise _Boom("lock broken") + + adapter._lock = Mock() + adapter._lock.__enter__ = _raise_on_acquire + adapter._lock.__exit__ = lambda *a: None + + trace = Mock() + trace.trace_id = "t-1" + adapter.on_trace_start(trace) + assert adapter._resilience.total_failures == 1 + + def test_on_trace_end_failure_caught(self) -> None: + pytest.importorskip("agents") + from layerlens.instrument.adapters.frameworks.openai_agents import OpenAIAgentsAdapter + + adapter = OpenAIAgentsAdapter(Mock()) + + # Sabotage the trace_runs dict. 
+ class _Bad: + def pop(self, *a: Any, **kw: Any) -> Any: + raise _Boom("dict broken") + + adapter._trace_runs = _Bad() # type: ignore[assignment] + trace = Mock() + trace.trace_id = "t-1" + adapter.on_trace_end(trace) + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# haystack +# --------------------------------------------------------------------------- + + +class TestHaystackResilience: + def test_on_span_end_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.haystack import ( + HaystackAdapter, + _LayerLensSpan, + ) + + adapter = HaystackAdapter(Mock()) + # Sabotage the _on_pipeline_end branch. + def _raise(*a: Any, **kw: Any) -> None: + raise _Boom("pipeline broken") + + adapter._on_pipeline_end = _raise # type: ignore[method-assign] + + span = _LayerLensSpan( + adapter, + "haystack.pipeline.run", + "span-1", + None, + {}, + is_pipeline=True, + ) + adapter._on_span_end(span) + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# langfuse +# --------------------------------------------------------------------------- + + +class TestLangfuseResilience: + def test_import_observation_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.langfuse import LangfuseAdapter + + adapter = LangfuseAdapter(Mock()) + + # Force the inner branch to raise — _import_generation is called + # for type=GENERATION, _import_span for SPAN; sabotage the SPAN + # path with a malformed obs. + collector = Mock() + bad_obs = {"type": "SPAN", "id": "obs-1"} + + # Sabotage _import_span specifically. 
+ def _raise(*a: Any, **kw: Any) -> None: + raise _Boom("span import broken") + + adapter._import_span = _raise # type: ignore[method-assign] + adapter._import_observation(collector, bad_obs, "root-span") + assert adapter._resilience.total_failures == 1 + + def test_import_score_failure_caught(self) -> None: + from layerlens.instrument.adapters.frameworks.langfuse import LangfuseAdapter + + adapter = LangfuseAdapter(Mock()) + collector = Mock() + # collector.emit raises — our score importer must catch. + collector.emit.side_effect = _Boom("collector broken") + adapter._import_score( + collector, + {"sessionId": "s-1"}, + "trace-1", + "root-span", + {"name": "quality", "value": 0.9}, + ) + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# Health degradation across all 10 adapters +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "module_path, class_name", + [ + ("layerlens.instrument.adapters.frameworks.agno", "AgnoAdapter"), + ("layerlens.instrument.adapters.frameworks.smolagents", "SmolAgentsAdapter"), + ("layerlens.instrument.adapters.frameworks.google_adk", "GoogleADKAdapter"), + ("layerlens.instrument.adapters.frameworks.strands", "StrandsAdapter"), + ("layerlens.instrument.adapters.frameworks.pydantic_ai", "PydanticAIAdapter"), + ("layerlens.instrument.adapters.frameworks.bedrock_agents", "BedrockAgentsAdapter"), + ("layerlens.instrument.adapters.frameworks.haystack", "HaystackAdapter"), + ("layerlens.instrument.adapters.frameworks.langfuse", "LangfuseAdapter"), + ], +) +def test_adapter_health_degrades_on_repeated_failures(module_path: str, class_name: str) -> None: + """Every lighter adapter exposes resilience health via adapter_info().metadata.""" + import importlib + + module = importlib.import_module(module_path) + adapter_cls = getattr(module, class_name) + adapter = adapter_cls(Mock()) + + # Hit the threshold 
by recording failures directly on the tracker + # (faster than driving each adapter's specific callback path; this + # test is purely about the metadata surface). + for _ in range(DEFAULT_FAILURE_THRESHOLD): + adapter._resilience.record_failure("synthetic", _Boom("threshold test")) + + info = adapter.adapter_info() + assert info.metadata["resilience_status"] == "degraded" + assert info.metadata["resilience_failures_total"] == DEFAULT_FAILURE_THRESHOLD + + +# --------------------------------------------------------------------------- +# llamaindex / openai_agents handled separately because they need +# llama_index_core / agents installed at test time. Use importorskip. +# --------------------------------------------------------------------------- + + +def test_llamaindex_health_degrades_on_repeated_failures() -> None: + pytest.importorskip("llama_index.core") + from layerlens.instrument.adapters.frameworks.llamaindex import LlamaIndexAdapter + + adapter = LlamaIndexAdapter(Mock()) + for _ in range(DEFAULT_FAILURE_THRESHOLD): + adapter._resilience.record_failure("synthetic", _Boom("threshold")) + info = adapter.adapter_info() + assert info.metadata["resilience_status"] == "degraded" + + +def test_openai_agents_health_degrades_on_repeated_failures() -> None: + pytest.importorskip("agents") + from layerlens.instrument.adapters.frameworks.openai_agents import OpenAIAgentsAdapter + + adapter = OpenAIAgentsAdapter(Mock()) + for _ in range(DEFAULT_FAILURE_THRESHOLD): + adapter._resilience.record_failure("synthetic", _Boom("threshold")) + info = adapter.adapter_info() + assert info.metadata["resilience_status"] == "degraded" diff --git a/tests/instrument/adapters/_base/test_resilience.py b/tests/instrument/adapters/_base/test_resilience.py new file mode 100644 index 00000000..5c1f730c --- /dev/null +++ b/tests/instrument/adapters/_base/test_resilience.py @@ -0,0 +1,500 @@ +"""Tests for the per-callback resilience wrapper. 
+ +Covers ``ResilienceTracker``, ``resilient_callback``, ``get_default_for``, +``HealthStatus``, and the ``adapter_info().metadata`` integration on +``FrameworkAdapter`` subclasses. + +Every test asserts a behaviour that prevents observability code from +breaking the framework's own execution path. +""" + +from __future__ import annotations + +import logging +import threading +from typing import Any, Dict +from unittest.mock import Mock + +import pytest + +from layerlens.instrument.adapters._base import ( + DEFAULT_FAILURE_THRESHOLD, + AdapterInfo, + BaseAdapter, + HealthStatus, + ResilienceTracker, + get_default_for, + resilient_callback, +) +from layerlens.instrument.adapters.frameworks._base_framework import FrameworkAdapter + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + + +class _Boom(Exception): + """Sentinel error type so tests can assert the right exception was caught.""" + + +class _DummyAdapter: + """Minimal adapter shape — provides ``name`` and ``_resilience`` only.""" + + name = "dummy" + + def __init__(self) -> None: + self._resilience = ResilienceTracker(self.name) + + @resilient_callback(callback_name="my_callback", default="DEFAULT") + def my_callback(self, value: Any) -> Any: + if value == "raise": + raise _Boom("dummy failure") + return f"ok:{value}" + + @resilient_callback(callback_name="passthrough_cb", passthrough_arg="value") + def passthrough_cb(self, value: Any) -> Any: + if value == "raise": + raise _Boom("passthrough failure") + return f"transformed:{value}" + + @resilient_callback(callback_name="kw_passthrough", passthrough_arg="payload") + def kw_passthrough(self, *, payload: Any) -> Any: + if payload == "raise": + raise _Boom("kw passthrough failure") + return {"wrapped": payload} + + +class _MinimalFramework(FrameworkAdapter): + """Real FrameworkAdapter subclass for integration tests.""" + + name = 
"test-framework" + + def _on_connect(self, target: Any = None, **kwargs: Any) -> None: + return None + + @resilient_callback(callback_name="emit_thing") + def emit_thing(self, value: int) -> None: + if value < 0: + raise _Boom(f"negative value: {value}") + + +# --------------------------------------------------------------------------- +# get_default_for +# --------------------------------------------------------------------------- + + +class TestGetDefaultFor: + def test_known_callback_returns_none(self) -> None: + # All registered callbacks default to None — the framework default + # for void-callback APIs (Strands hooks, Google ADK plugins, boto3). + assert get_default_for("on_trace_start") is None + assert get_default_for("_before_invoke") is None + assert get_default_for("after_run_callback") is None + + def test_unknown_callback_returns_none(self) -> None: + # Unknown callback names also return None — the safe default. If a + # callback needs a non-None default, the adapter must pass it + # explicitly via @resilient_callback(default=...). 
+ assert get_default_for("does_not_exist") is None + assert get_default_for("") is None + + +# --------------------------------------------------------------------------- +# ResilienceTracker +# --------------------------------------------------------------------------- + + +class TestResilienceTracker: + def test_starts_healthy_with_zero_failures(self) -> None: + tracker = ResilienceTracker("test") + assert tracker.total_failures == 0 + assert tracker.health_status() == HealthStatus.HEALTHY + + def test_threshold_validation(self) -> None: + with pytest.raises(ValueError): + ResilienceTracker("test", threshold=0) + with pytest.raises(ValueError): + ResilienceTracker("test", threshold=-1) + + def test_record_failure_increments_counter(self) -> None: + tracker = ResilienceTracker("test") + tracker.record_failure("cb1", _Boom("first")) + assert tracker.total_failures == 1 + tracker.record_failure("cb1", _Boom("second")) + tracker.record_failure("cb2", _Boom("third")) + assert tracker.total_failures == 3 + + def test_health_degrades_after_threshold(self) -> None: + tracker = ResilienceTracker("test", threshold=3) + tracker.record_failure("cb", _Boom("a")) + tracker.record_failure("cb", _Boom("b")) + assert tracker.health_status() == HealthStatus.HEALTHY + tracker.record_failure("cb", _Boom("c")) + assert tracker.health_status() == HealthStatus.DEGRADED + + def test_metadata_snapshot(self) -> None: + tracker = ResilienceTracker("test", threshold=2) + tracker.record_failure("cb1", _Boom("oops")) + tracker.record_failure("cb2", ValueError("bad value")) + snap = tracker.as_metadata() + assert snap["resilience_status"] == HealthStatus.DEGRADED.value + assert snap["resilience_failures_total"] == 2 + assert snap["resilience_failure_threshold"] == 2 + # Per-callback breakdown carries the top failure counts. + assert snap["resilience_failures_by_callback"] == {"cb1": 1, "cb2": 1} + # Last error preserved (truncated) for triage. 
+ assert snap["resilience_last_callback"] == "cb2" + assert "ValueError" in snap["resilience_last_error"] + + def test_reset_clears_state(self) -> None: + tracker = ResilienceTracker("test") + tracker.record_failure("cb", _Boom("x")) + assert tracker.total_failures == 1 + tracker.reset() + assert tracker.total_failures == 0 + assert tracker.health_status() == HealthStatus.HEALTHY + snap = tracker.as_metadata() + assert snap["resilience_failures_total"] == 0 + assert "resilience_last_error" not in snap + + def test_thread_safety(self) -> None: + # Many threads recording failures concurrently must not lose any + # increments — observability code commonly fires from worker + # threads (CrewAI, AutoGen group chat, Bedrock boto3 hooks). + tracker = ResilienceTracker("test", threshold=DEFAULT_FAILURE_THRESHOLD) + per_thread_count = 100 + thread_count = 8 + + def _worker() -> None: + for _ in range(per_thread_count): + tracker.record_failure("cb", _Boom("concurrent")) + + threads = [threading.Thread(target=_worker) for _ in range(thread_count)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert tracker.total_failures == per_thread_count * thread_count + + +# --------------------------------------------------------------------------- +# resilient_callback decorator +# --------------------------------------------------------------------------- + + +class TestResilientCallbackDecorator: + def test_returns_normal_value_on_success(self) -> None: + adapter = _DummyAdapter() + assert adapter.my_callback("hello") == "ok:hello" + assert adapter._resilience.total_failures == 0 + + def test_returns_default_on_exception(self) -> None: + adapter = _DummyAdapter() + # Without the wrapper, the call would raise — instead it returns + # the framework's expected default and the framework continues. 
+ result = adapter.my_callback("raise") + assert result == "DEFAULT" + + def test_failure_counter_incremented(self) -> None: + adapter = _DummyAdapter() + adapter.my_callback("raise") + adapter.my_callback("raise") + adapter.my_callback("ok-1") # this one succeeds + adapter.my_callback("raise") + assert adapter._resilience.total_failures == 3 + + def test_exception_is_logged_with_context(self, caplog: pytest.LogCaptureFixture) -> None: + adapter = _DummyAdapter() + with caplog.at_level(logging.WARNING): + adapter.my_callback("raise") + + # Adapter name + callback name + traceback all surfaced. + assert any( + "dummy" in rec.message and "my_callback" in rec.message and "_Boom" in rec.message + for rec in caplog.records + ) + + def test_exception_does_not_propagate(self) -> None: + adapter = _DummyAdapter() + # The whole point: the framework calling this method MUST NOT see + # _Boom — that would crash the user's agent. + try: + adapter.my_callback("raise") + except _Boom: + pytest.fail("resilient_callback let the exception escape") + + def test_passthrough_arg_returns_positional_value(self) -> None: + adapter = _DummyAdapter() + # On failure the wrapper returns the passthrough arg's value + # rather than the default — critical for mutating hooks + # (Pydantic-AI ``after_model_request`` returns ``response``). + assert adapter.passthrough_cb("raise") == "raise" + # On success, the original return value flows through. + assert adapter.passthrough_cb("ok") == "transformed:ok" + + def test_passthrough_arg_returns_keyword_value(self) -> None: + adapter = _DummyAdapter() + assert adapter.kw_passthrough(payload="raise") == "raise" + assert adapter.kw_passthrough(payload="data") == {"wrapped": "data"} + + def test_health_degrades_after_repeated_failures(self) -> None: + adapter = _DummyAdapter() + # Default threshold is 5; after 5 consecutive failures the + # adapter reports DEGRADED so monitoring can alert. 
+ assert adapter._resilience.health_status() == HealthStatus.HEALTHY + for _ in range(DEFAULT_FAILURE_THRESHOLD): + adapter.my_callback("raise") + assert adapter._resilience.health_status() == HealthStatus.DEGRADED + + def test_keyboard_interrupt_propagates(self) -> None: + # We must NEVER swallow KeyboardInterrupt / SystemExit / + # GeneratorExit — those are control-flow signals, not bugs. + class _CtrlCAdapter: + name = "ctrlc" + + def __init__(self) -> None: + self._resilience = ResilienceTracker(self.name) + + @resilient_callback(callback_name="cb") + def cb(self) -> None: + raise KeyboardInterrupt("user pressed Ctrl-C") + + adapter = _CtrlCAdapter() + with pytest.raises(KeyboardInterrupt): + adapter.cb() + + def test_works_without_resilience_tracker_attribute( + self, caplog: pytest.LogCaptureFixture + ) -> None: + # If an adapter forgets to set up _resilience, the wrapper still + # logs and returns the default — never crashes the framework. + class _NoTracker: + name = "no_tracker" + + @resilient_callback(callback_name="cb", default="OK") + def cb(self) -> str: + raise _Boom("no tracker") + + adapter = _NoTracker() + with caplog.at_level(logging.WARNING): + assert adapter.cb() == "OK" + assert any("_Boom" in rec.message for rec in caplog.records) + + def test_logger_uses_module_of_decorated_function( + self, caplog: pytest.LogCaptureFixture + ) -> None: + # Failures are logged via the wrapped function's module logger so + # users can mute one adapter's resilience warnings without + # silencing all of them. + adapter = _DummyAdapter() + with caplog.at_level(logging.WARNING, logger=__name__): + adapter.my_callback("raise") + # Our test module's logger captured the warning. 
+ assert any(rec.name == __name__ for rec in caplog.records) + + +# --------------------------------------------------------------------------- +# Integration with FrameworkAdapter +# --------------------------------------------------------------------------- + + +class TestFrameworkAdapterIntegration: + def test_framework_adapter_owns_resilience_tracker(self) -> None: + adapter = _MinimalFramework(Mock()) + assert isinstance(adapter._resilience, ResilienceTracker) + assert adapter._resilience.total_failures == 0 + + def test_adapter_info_surfaces_resilience_metadata(self) -> None: + adapter = _MinimalFramework(Mock()) + info: AdapterInfo = adapter.adapter_info() + meta = info.metadata + assert meta["resilience_status"] == "healthy" + assert meta["resilience_failures_total"] == 0 + assert meta["resilience_failure_threshold"] == DEFAULT_FAILURE_THRESHOLD + + def test_adapter_info_reports_degraded_after_failures(self) -> None: + adapter = _MinimalFramework(Mock()) + for _ in range(DEFAULT_FAILURE_THRESHOLD): + adapter.emit_thing(-1) # raises inside the wrapped method + info = adapter.adapter_info() + assert info.metadata["resilience_status"] == "degraded" + assert info.metadata["resilience_failures_total"] == DEFAULT_FAILURE_THRESHOLD + + def test_disconnect_resets_resilience(self) -> None: + adapter = _MinimalFramework(Mock()) + adapter.connect() + adapter.emit_thing(-1) + adapter.emit_thing(-2) + assert adapter._resilience.total_failures == 2 + adapter.disconnect() + assert adapter._resilience.total_failures == 0 + + def test_callback_failure_does_not_break_framework(self) -> None: + adapter = _MinimalFramework(Mock()) + adapter.connect() + # Simulating "framework fires our callback" — the callback throws + # but the framework's call-site sees no exception, just None. 
+ result = adapter.emit_thing(-99) + assert result is None + assert adapter._resilience.total_failures == 1 + + +# --------------------------------------------------------------------------- +# Public surface re-exports +# --------------------------------------------------------------------------- + + +class TestPackageExports: + def test_base_package_re_exports_resilience_helpers(self) -> None: + from layerlens.instrument.adapters._base import ( + DEFAULT_FAILURE_THRESHOLD as T1, + AdapterInfo as A1, + BaseAdapter as B1, + HealthStatus as H1, + ResilienceTracker as R1, + get_default_for as G1, + resilient_callback as RC1, + ) + + # Sanity — every public symbol resolves and is the right kind. + assert A1 is AdapterInfo + assert B1 is BaseAdapter + assert R1 is ResilienceTracker + assert RC1 is resilient_callback + assert H1 is HealthStatus + assert T1 == DEFAULT_FAILURE_THRESHOLD + assert G1 is get_default_for + + +# --------------------------------------------------------------------------- +# Decorator preserves function metadata +# --------------------------------------------------------------------------- + + +class TestDecoratorMetadata: + def test_wrapped_function_keeps_name_and_docstring(self) -> None: + class _A: + name = "x" + _resilience = ResilienceTracker("x") + + @resilient_callback(callback_name="cb") + def cb(self) -> None: + """My docstring.""" + pass + + # functools.wraps preserves __name__ and __doc__ — important for + # frameworks that introspect handlers by name (boto3 event system + # uses handler identity for unregister()). + assert _A.cb.__name__ == "cb" + assert _A.cb.__doc__ == "My docstring." 
+ + +# --------------------------------------------------------------------------- +# End-to-end: verifying the per-adapter failure scenario +# --------------------------------------------------------------------------- + + +class TestPerAdapterCallbackException: + """Simulate a callback exception on each lighter adapter and assert + the framework continues unaffected. + + Each test instantiates the adapter, monkey-patches one of its + callback methods to raise, then invokes the callback and asserts: + + 1. No exception escaped (framework would crash otherwise). + 2. The resilience tracker incremented its failure counter. + 3. Repeated failures cross the threshold and degrade adapter health. + """ + + @pytest.mark.parametrize( + "module_path, class_name, callback_name, callback_args", + [ + ( + "layerlens.instrument.adapters.frameworks.agno", + "AgnoAdapter", + "_on_run_start", + (Mock(), "input"), + ), + ( + "layerlens.instrument.adapters.frameworks.agno", + "AgnoAdapter", + "_on_run_end", + (Mock(), Mock(), None), + ), + ( + "layerlens.instrument.adapters.frameworks.smolagents", + "SmolAgentsAdapter", + "_on_run_start", + (Mock(), "task"), + ), + ( + "layerlens.instrument.adapters.frameworks.smolagents", + "SmolAgentsAdapter", + "_on_run_end", + (Mock(), Mock(), None), + ), + ( + "layerlens.instrument.adapters.frameworks.smolagents", + "SmolAgentsAdapter", + "_on_run_error", + (Mock(), _Boom("framework error")), + ), + ( + "layerlens.instrument.adapters.frameworks.smolagents", + "SmolAgentsAdapter", + "_on_action_step", + (Mock(), Mock()), + ), + ], + ) + def test_callback_exception_caught_and_counted( + self, + module_path: str, + class_name: str, + callback_name: str, + callback_args: tuple, + ) -> None: + import importlib + + module = importlib.import_module(module_path) + adapter_cls = getattr(module, class_name) + adapter = adapter_cls(Mock()) + + # Force the underlying body to raise by sabotaging an inner + # helper the callback always calls. 
The simplest way is to patch + # ``adapter._payload`` to raise — every callback uses it. + original_payload = adapter._payload + + def _raise_on_payload(*args: Any, **kwargs: Any) -> Dict[str, Any]: + raise _Boom("simulated callback failure") + + adapter._payload = _raise_on_payload # type: ignore[method-assign] + + try: + cb = getattr(adapter, callback_name) + # Must not raise — that's the entire resilience contract. + cb(*callback_args) + # At least one failure was recorded by the resilience tracker. + assert adapter._resilience.total_failures >= 1 + finally: + adapter._payload = original_payload # type: ignore[method-assign] + + def test_repeated_failures_degrade_adapter(self) -> None: + # Use agno as the proxy — same wiring applies to all 10 lighter + # adapters because they all inherit from FrameworkAdapter and use + # @resilient_callback on their entry points. + from layerlens.instrument.adapters.frameworks.agno import AgnoAdapter + + adapter = AgnoAdapter(Mock()) + + def _raise_on_payload(*args: Any, **kwargs: Any) -> Dict[str, Any]: + raise _Boom("persistent failure") + + adapter._payload = _raise_on_payload # type: ignore[method-assign] + + for _ in range(DEFAULT_FAILURE_THRESHOLD): + adapter._on_run_start(Mock(), "input") + + info = adapter.adapter_info() + assert info.metadata["resilience_status"] == "degraded" diff --git a/tests/instrument/adapters/frameworks/test_langfuse.py b/tests/instrument/adapters/frameworks/test_langfuse.py index 86c50ff2..70aca581 100644 --- a/tests/instrument/adapters/frameworks/test_langfuse.py +++ b/tests/instrument/adapters/frameworks/test_langfuse.py @@ -222,13 +222,21 @@ def test_adapter_info_returns_correct_metadata(self, connected_adapter): assert info.name == "langfuse" assert info.adapter_type == "framework" assert info.connected is True - assert info.metadata == {"host": "https://test.langfuse.com"} + # The Langfuse-specific ``host`` metadata must be present; resilience + # health metadata is added by 
FrameworkAdapter.adapter_info() to + # every framework adapter — assert presence of both surfaces. + assert info.metadata["host"] == "https://test.langfuse.com" + assert info.metadata["resilience_status"] == "healthy" def test_adapter_info_disconnected(self, mock_client): adapter = LangfuseAdapter(mock_client) info = adapter.adapter_info() assert info.connected is False - assert info.metadata == {} + # Disconnected adapters expose only the resilience health surface + # (no per-adapter metadata since connect() never populated it). + assert info.metadata.get("host") is None + assert info.metadata["resilience_status"] == "healthy" + assert info.metadata["resilience_failures_total"] == 0 # ===================================================================