diff --git a/src/layerlens/instrument/adapters/frameworks/pydantic_ai/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/pydantic_ai/lifecycle.py index 3dc974a..b594b4b 100644 --- a/src/layerlens/instrument/adapters/frameworks/pydantic_ai/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/pydantic_ai/lifecycle.py @@ -7,6 +7,30 @@ ModelRequestNode → model.invoke (L3) CallToolsNode → tool.call (L5a) AgentRun transitions → agent.state.change (Cross) + +Typed-event status (post PR #129 migration, bundle 5): + +* Every emission flows through :meth:`BaseAdapter.emit_event` with a + canonical Pydantic payload imported from + :mod:`layerlens.instrument._compat.events`. +* PydanticAI-specific provenance (``framework``, ``agent_name``, + ``timestamp_ns``, ``duration_ns``, ``tools``, ``result_type``, + ``system_prompt``, ``model``) is carried in the canonical model's + metadata / attributes / parameters / input slots. +* The ad-hoc ``agent.state.change`` ``run_complete`` / ``run_failed`` + marker emitted by :meth:`on_run_end` does NOT satisfy the canonical + :class:`AgentStateChangeEvent` ``before_hash`` / ``after_hash`` + contract (the run boundary has no real state mutation to hash). + Following the PR #151 ms_agent_framework precedent, the marker is + carried as ``run_status`` on :class:`AgentOutputEvent.metadata`, + preserving the cross-cutting completion signal without violating + the canonical schema. +* PydanticAI tools execute in-process (via the ``@agent.tool`` + decorator), so the L5a integration is :class:`IntegrationType.LIBRARY`. +* The handoff context hash is generated via SHA-256 over the context + string (or the empty string when no context is available) so the + canonical :class:`AgentHandoffEvent.handoff_context_hash` validator + always passes. 
""" from __future__ import annotations @@ -18,6 +42,18 @@ import threading from typing import Any +from layerlens.instrument._compat.events import ( + MessageRole, + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + EnvironmentType, + IntegrationType, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, +) from layerlens.instrument.adapters._base.adapter import ( AdapterInfo, BaseAdapter, @@ -31,6 +67,55 @@ logger = logging.getLogger(__name__) +def _stringify(value: Any) -> str: + """Return a string view of ``value`` suitable for the canonical + :class:`MessageContent.message` field. + + The canonical schema requires :class:`AgentInputEvent` and + :class:`AgentOutputEvent` to carry a ``message: str``. PydanticAI + delivers user prompts and run results as arbitrary Python objects + (strings, Pydantic ``BaseModel`` instances when ``result_type`` is + set, ``None``); this helper converts each to a (possibly empty) + string so the typed event always validates. The original payload + is preserved on :class:`MessageContent.metadata.raw_input` / + ``raw_output``. + """ + if value is None: + return "" + if isinstance(value, str): + return value + return str(value) + + +def _coerce_to_dict(value: Any) -> dict[str, Any]: + """Coerce ``value`` into a dict suitable for the canonical + :class:`ToolCallEvent` ``input`` / ``output`` slots. + + The canonical schema requires ``input: dict[str, Any]`` and + ``output: dict[str, Any] | None``. PydanticAI tool returns can be + arbitrary Python values (Pydantic models, scalars, dicts). This + helper wraps non-dict values in ``{"value": ...}`` so the + canonical slot is always satisfied. + """ + if value is None: + return {} + if isinstance(value, dict): + return dict(value) + return {"value": value} + + +def _sha256_of(value: str) -> str: + """Return a canonical ``sha256:`` hash string for ``value``. 
+ + The canonical schema's :class:`AgentHandoffEvent` requires + ``handoff_context_hash`` to start with ``sha256:`` and have a + 64-character hex tail. Centralising the format here ensures every + emit site uses the same wire format — including the empty-string + fallback used when PydanticAI has no handoff context to hash. + """ + return "sha256:" + hashlib.sha256(value.encode("utf-8")).hexdigest() + + class PydanticAIAdapter(BaseAdapter): """LayerLens adapter for PydanticAI.""" @@ -41,6 +126,13 @@ class PydanticAIAdapter(BaseAdapter): # There is no v1 path; the framework cannot be installed alongside v1. requires_pydantic = PydanticCompat.V2_ONLY + # Per-adapter ``extra="allow"`` decision: pydantic_ai targets the + # canonical 13-event taxonomy exclusively. Unknown event types must + # be rejected by the base adapter's typed-event validator, so this + # stays ``False``. See ``docs/adapters/typed-events.md`` for the + # opt-in policy. + ALLOW_UNREGISTERED_EVENTS: bool = False + def __init__( self, stratix: Any | None = None, @@ -196,43 +288,64 @@ def traced_run_sync(*args: Any, **kwargs: Any) -> Any: return traced_run_sync def _extract_run_usage(self, result: Any) -> None: - """Extract usage info from PydanticAI RunResult.""" + """Extract usage info from PydanticAI RunResult. + + Emits typed :class:`CostRecordEvent`, :class:`ModelInvokeEvent`, + and :class:`ToolCallEvent` payloads depending on what the + ``RunResult`` exposes. PydanticAI tool returns are in-process + Python callables (``@agent.tool``), so the integration is + :class:`IntegrationType.LIBRARY`. 
+ """ if result is None: return try: usage = getattr(result, "usage", None) or getattr(result, "_usage", None) if usage: - self.emit_dict_event( - "cost.record", - { - "framework": "pydantic_ai", - "tokens_prompt": getattr(usage, "request_tokens", None), - "tokens_completion": getattr(usage, "response_tokens", None), - "tokens_total": getattr(usage, "total_tokens", None), - }, + prompt_tokens = getattr(usage, "request_tokens", None) + completion_tokens = getattr(usage, "response_tokens", None) + total_tokens = getattr(usage, "total_tokens", None) + self.emit_event( + CostRecordEvent.create( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + tokens=total_tokens, + ) ) # Extract model invocation details all_messages = getattr(result, "all_messages", None) or [] for msg in all_messages: msg_kind = getattr(msg, "kind", None) if msg_kind == "response": - model = getattr(result, "model_name", None) - self.emit_dict_event( - "model.invoke", - { - "framework": "pydantic_ai", - "model": model, - "provider": self._detect_provider(model), - }, + model_raw = getattr(result, "model_name", None) + model_name = str(model_raw) if model_raw else "unknown" + provider = self._detect_provider(model_name) or "unknown" + self.emit_event( + ModelInvokeEvent.create( + provider=provider, + name=model_name, + version="unavailable", + parameters={"framework": "pydantic_ai"}, + ) ) elif msg_kind == "tool-return": - self.emit_dict_event( - "tool.call", - { - "framework": "pydantic_ai", - "tool_name": getattr(msg, "tool_name", "unknown"), - "tool_output": self._safe_serialize(getattr(msg, "content", None)), - }, + tool_name_raw = getattr(msg, "tool_name", None) + tool_name = str(tool_name_raw) if tool_name_raw else "unknown" + serialised_output = self._safe_serialize( + getattr(msg, "content", None) + ) + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialised_output) + if serialised_output is not None + else None + ) + self.emit_event( + ToolCallEvent.create( + 
name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data={"framework": "pydantic_ai"}, + output_data=output_data, + ) ) except Exception: logger.debug("Could not extract run usage", exc_info=True) @@ -240,6 +353,7 @@ def _extract_run_usage(self, result: Any) -> None: # --- Lifecycle Hooks --- def on_run_start(self, agent_name: str | None = None, input_data: Any = None) -> None: + """Emit a typed :class:`AgentInputEvent` when an agent run starts.""" if not self._connected: return try: @@ -247,14 +361,18 @@ def on_run_start(self, agent_name: str | None = None, input_data: Any = None) -> start_ns = time.time_ns() with self._adapter_lock: self._run_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "pydantic_ai", - "agent_name": agent_name, - "input": self._safe_serialize(input_data), - "timestamp_ns": start_ns, - }, + raw_input = self._safe_serialize(input_data) + self.emit_event( + AgentInputEvent.create( + message=_stringify(raw_input), + role=MessageRole.HUMAN, + metadata={ + "framework": "pydantic_ai", + "agent_name": agent_name, + "timestamp_ns": start_ns, + "raw_input": raw_input, + }, + ) ) except Exception: logger.warning("Error in on_run_start", exc_info=True) @@ -265,6 +383,20 @@ def on_run_end( output: Any = None, error: Exception | None = None, ) -> None: + """Emit a typed :class:`AgentOutputEvent` when an agent run ends. + + The previous adapter implementation also emitted a separate + ``agent.state.change`` payload carrying only an + ``event_subtype`` marker (``run_complete`` / ``run_failed``). + That payload did not satisfy the canonical + :class:`AgentStateChangeEvent` ``before_hash`` / ``after_hash`` + contract — the run boundary has no real state mutation to + hash. 
The post-migration mapping (matching the PR #151 + ms_agent_framework precedent) carries the same signal as + ``run_status`` on :class:`MessageContent.metadata`, preserving + the cross-cutting completion marker without violating the + canonical schema. + """ if not self._connected: return try: @@ -273,22 +405,21 @@ def on_run_end( with self._adapter_lock: start_ns = self._run_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - payload: dict[str, Any] = { + raw_output = self._safe_serialize(output) + metadata: dict[str, Any] = { "framework": "pydantic_ai", "agent_name": agent_name, - "output": self._safe_serialize(output), "duration_ns": duration_ns, + "raw_output": raw_output, + "run_status": "run_complete" if not error else "run_failed", } if error: - payload["error"] = str(error) - self.emit_dict_event("agent.output", payload) - self.emit_dict_event( - "agent.state.change", - { - "framework": "pydantic_ai", - "agent_name": agent_name, - "event_subtype": "run_complete" if not error else "run_failed", - }, + metadata["error"] = str(error) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(raw_output), + metadata=metadata, + ) ) except Exception: logger.warning("Error in on_run_end", exc_info=True) @@ -301,20 +432,35 @@ def on_tool_use( error: Exception | None = None, latency_ms: float | None = None, ) -> None: + """Emit a typed :class:`ToolCallEvent` for a tool invocation. + + PydanticAI tools are in-process Python callables decorated + with ``@agent.tool`` — this maps to + :class:`IntegrationType.LIBRARY`. Tool versions are not + surfaced by the framework, so ``version`` falls back to + ``"unavailable"`` per the canonical NORMATIVE rule. 
+ """ if not self._connected: return try: - payload: dict[str, Any] = { - "framework": "pydantic_ai", - "tool_name": tool_name, - "tool_input": self._safe_serialize(tool_input), - "tool_output": self._safe_serialize(tool_output), - } - if error: - payload["error"] = str(error) - if latency_ms is not None: - payload["latency_ms"] = latency_ms - self.emit_dict_event("tool.call", payload) + serialised_input = self._safe_serialize(tool_input) + serialised_output = self._safe_serialize(tool_output) + input_data = _coerce_to_dict(serialised_input) + input_data.setdefault("framework", "pydantic_ai") + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialised_output) if serialised_output is not None else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + error=str(error) if error else None, + latency_ms=latency_ms, + ) + ) except Exception: logger.warning("Error in on_tool_use", exc_info=True) @@ -327,39 +473,42 @@ def on_llm_call( latency_ms: float | None = None, messages: list[dict[str, str]] | None = None, ) -> None: + """Emit a typed :class:`ModelInvokeEvent` for an LLM call.""" if not self._connected: return try: - payload: dict[str, Any] = {"framework": "pydantic_ai"} - if provider: - payload["provider"] = provider - if model: - payload["model"] = model - if tokens_prompt is not None: - payload["tokens_prompt"] = tokens_prompt - if tokens_completion is not None: - payload["tokens_completion"] = tokens_completion - if latency_ms is not None: - payload["latency_ms"] = latency_ms + model_name = model or "unknown" + resolved_provider = provider or self._detect_provider(model_name) or "unknown" + input_messages: list[dict[str, str]] | None = None if self._capture_config.capture_content and messages: - payload["messages"] = messages - self.emit_dict_event("model.invoke", payload) + input_messages = messages + self.emit_event( + 
ModelInvokeEvent.create( + provider=resolved_provider, + name=model_name, + version="unavailable", + parameters={"framework": "pydantic_ai"}, + prompt_tokens=tokens_prompt, + completion_tokens=tokens_completion, + latency_ms=latency_ms, + input_messages=input_messages, + ) + ) except Exception: logger.warning("Error in on_llm_call", exc_info=True) def on_handoff(self, from_agent: str, to_agent: str, context: Any = None) -> None: + """Emit a typed :class:`AgentHandoffEvent` for an agent handoff.""" if not self._connected: return try: context_str = str(context) if context else "" - self.emit_dict_event( - "agent.handoff", - { - "from_agent": from_agent, - "to_agent": to_agent, - "reason": "pydantic_ai_handoff", - "context_hash": hashlib.sha256(context_str.encode()).hexdigest() if context_str else None, - }, + self.emit_event( + AgentHandoffEvent.create( + from_agent=from_agent, + to_agent=to_agent, + handoff_context_hash=_sha256_of(context_str), + ) ) except Exception: logger.warning("Error in on_handoff", exc_info=True) @@ -381,30 +530,46 @@ def _detect_provider(self, model: str | None) -> str | None: return None def _emit_agent_config(self, agent_name: str, agent: Any) -> None: + """Emit a typed :class:`EnvironmentConfigEvent` per agent. + + Idempotent per agent — only the first call for a given agent + name actually emits. PydanticAI agents run in a ``simulated`` + environment by default (the host application is responsible + for emitting the real ``cloud`` / ``on_prem`` environment + record). PydanticAI-specific provenance (``framework``, + ``agent_name``, ``model``, ``tools``, ``system_prompt``, + ``result_type``) lives on + :attr:`EnvironmentInfo.attributes`. 
+ """ with self._adapter_lock: if agent_name in self._seen_agents: return self._seen_agents.add(agent_name) - metadata: dict[str, Any] = { + attributes: dict[str, Any] = { "framework": "pydantic_ai", "agent_name": agent_name, } model = getattr(agent, "model", None) if model: - metadata["model"] = str(model) + attributes["model"] = str(model) system_prompt = getattr(agent, "system_prompt", None) if system_prompt and self._capture_config.capture_content: - metadata["system_prompt"] = str(system_prompt)[:500] + attributes["system_prompt"] = str(system_prompt)[:500] tools = getattr(agent, "_function_tools", None) or getattr(agent, "tools", None) if tools: if isinstance(tools, dict): - metadata["tools"] = list(tools.keys()) + attributes["tools"] = list(tools.keys()) else: - metadata["tools"] = [getattr(t, "name", str(t)) for t in tools] + attributes["tools"] = [getattr(t, "name", str(t)) for t in tools] result_type = getattr(agent, "result_type", None) if result_type: - metadata["result_type"] = str(result_type) - self.emit_dict_event("environment.config", metadata) + attributes["result_type"] = str(result_type) + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.SIMULATED, + attributes=attributes, + ) + ) def _safe_serialize(self, value: Any) -> Any: try: diff --git a/src/layerlens/instrument/adapters/frameworks/semantic_kernel/__init__.py b/src/layerlens/instrument/adapters/frameworks/semantic_kernel/__init__.py new file mode 100644 index 0000000..bb11927 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/semantic_kernel/__init__.py @@ -0,0 +1,16 @@ +""" +STRATIX Semantic Kernel Adapter + +Provides plugin invocation tracing, planner execution tracking, +and memory operation capture for Microsoft Semantic Kernel. 
+""" + +from __future__ import annotations + +from layerlens.instrument.adapters.frameworks.semantic_kernel.lifecycle import ( + SemanticKernelAdapter, +) + +ADAPTER_CLASS = SemanticKernelAdapter + +__all__ = ["SemanticKernelAdapter", "ADAPTER_CLASS"] diff --git a/src/layerlens/instrument/adapters/frameworks/semantic_kernel/filters.py b/src/layerlens/instrument/adapters/frameworks/semantic_kernel/filters.py new file mode 100644 index 0000000..2e30ba8 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/semantic_kernel/filters.py @@ -0,0 +1,259 @@ +""" +Semantic Kernel Filter Implementations + +Provides STRATIX-instrumented filter classes for the SK filter API: +- LayerLensFunctionFilter: Function invocation pre/post hooks +- LayerLensPromptRenderFilter: Prompt template rendering hooks +- LayerLensAutoFunctionFilter: Auto-invoked function hooks +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from layerlens.instrument.adapters.frameworks.semantic_kernel.lifecycle import SemanticKernelAdapter + +logger = logging.getLogger(__name__) + + +class LayerLensFunctionFilter: + """ + Intercepts SK function invocations via the FunctionInvocationFilter API. + + Captures plugin name, function name, arguments, result, and latency. + """ + + def __init__(self, adapter: SemanticKernelAdapter) -> None: + self._adapter = adapter + self._contexts: dict[int, dict[str, Any]] = {} + + async def __call__(self, context: Any, next: Any = None) -> None: + """SK filter callable interface: (context, next=...) 
-> Awaitable[None].""" + return await self.on_function_invocation(context, next) + + async def on_function_invocation( + self, + context: Any, + next_handler: Any = None, + ) -> None: + """Pre/post hook for function invocation.""" + plugin_name = self._extract_plugin_name(context) + function_name = self._extract_function_name(context) + arguments = self._extract_arguments(context) + + try: + trace_ctx = self._adapter.on_function_start( + plugin_name=plugin_name, + function_name=function_name, + arguments=arguments, + ) + except Exception: + logger.warning("Error in function start hook", exc_info=True) + trace_ctx = {} + + error = None + try: + if next_handler: + await next_handler(context) + except Exception as exc: + error = exc + raise + finally: + try: + result = self._extract_result(context) + self._adapter.on_function_end( + context=trace_ctx, + result=result, + error=error, + ) + except Exception: + logger.warning("Error in function end hook", exc_info=True) + + def on_function_invocation_sync( + self, + plugin_name: str, + function_name: str, + arguments: dict[str, Any] | None = None, + result: Any = None, + error: Exception | None = None, + ) -> None: + """Synchronous hook for testing and non-async usage.""" + try: + trace_ctx = self._adapter.on_function_start( + plugin_name=plugin_name, + function_name=function_name, + arguments=arguments, + ) + self._adapter.on_function_end( + context=trace_ctx, + result=result, + error=error, + ) + except Exception: + logger.warning("Error in sync function hook", exc_info=True) + + @staticmethod + def _extract_plugin_name(context: Any) -> str: + """Extract plugin name from SK invocation context.""" + if hasattr(context, "function"): + fn = context.function + return getattr(fn, "plugin_name", "") or getattr(fn, "skill_name", "") or "" + return getattr(context, "plugin_name", "") or "" + + @staticmethod + def _extract_function_name(context: Any) -> str: + if hasattr(context, "function"): + fn = context.function + return 
getattr(fn, "name", "") or "" + return getattr(context, "function_name", "") or "" + + @staticmethod + def _extract_arguments(context: Any) -> dict[str, Any] | None: + args = getattr(context, "arguments", None) + if args is None: + return None + if isinstance(args, dict): + return args + if hasattr(args, "items"): + return dict(args.items()) + return None + + @staticmethod + def _extract_result(context: Any) -> Any: + return getattr(context, "result", None) + + +class LayerLensPromptRenderFilter: + """ + Intercepts SK prompt rendering via the PromptRenderFilter API. + + Captures template text and rendered prompt string. + """ + + def __init__(self, adapter: SemanticKernelAdapter) -> None: + self._adapter = adapter + + async def __call__(self, context: Any, next: Any = None) -> None: + """SK filter callable interface.""" + return await self.on_prompt_render(context, next) + + async def on_prompt_render( + self, + context: Any, + next_handler: Any = None, + ) -> None: + """Pre/post hook for prompt rendering.""" + function_name = getattr(context, "function_name", None) or "" + template = getattr(context, "prompt_template", None) + + if next_handler: + await next_handler(context) + + try: + rendered = getattr(context, "rendered_prompt", None) + self._adapter.on_prompt_render( + template=str(template) if template else None, + rendered_prompt=str(rendered) if rendered else None, + function_name=function_name, + ) + except Exception: + logger.warning("Error in prompt render hook", exc_info=True) + + def on_prompt_render_sync( + self, + template: str | None = None, + rendered_prompt: str | None = None, + function_name: str | None = None, + ) -> None: + """Synchronous hook for testing.""" + try: + self._adapter.on_prompt_render( + template=template, + rendered_prompt=rendered_prompt, + function_name=function_name, + ) + except Exception: + logger.warning("Error in sync prompt render hook", exc_info=True) + + +class LayerLensAutoFunctionFilter: + """ + Intercepts 
LLM-initiated (auto-invoked) function calls via + the AutoFunctionInvocationFilter API. + + Marks all emitted events with auto_invoked=True. + """ + + def __init__(self, adapter: SemanticKernelAdapter) -> None: + self._adapter = adapter + + async def __call__(self, context: Any, next: Any = None) -> None: + """SK filter callable interface.""" + return await self.on_auto_function_invocation(context, next) + + async def on_auto_function_invocation( + self, + context: Any, + next_handler: Any = None, + ) -> None: + """Pre/post hook for auto-invoked functions.""" + plugin_name = LayerLensFunctionFilter._extract_plugin_name(context) + function_name = LayerLensFunctionFilter._extract_function_name(context) + arguments = LayerLensFunctionFilter._extract_arguments(context) + + try: + trace_ctx = self._adapter.on_function_start( + plugin_name=plugin_name, + function_name=function_name, + arguments=arguments, + auto_invoked=True, + ) + except Exception: + logger.warning("Error in auto function start hook", exc_info=True) + trace_ctx = {} + + error = None + try: + if next_handler: + await next_handler(context) + except Exception as exc: + error = exc + raise + finally: + try: + result = LayerLensFunctionFilter._extract_result(context) + self._adapter.on_function_end( + context=trace_ctx, + result=result, + error=error, + auto_invoked=True, + ) + except Exception: + logger.warning("Error in auto function end hook", exc_info=True) + + def on_auto_function_invocation_sync( + self, + plugin_name: str, + function_name: str, + arguments: dict[str, Any] | None = None, + result: Any = None, + error: Exception | None = None, + ) -> None: + """Synchronous hook for testing.""" + try: + trace_ctx = self._adapter.on_function_start( + plugin_name=plugin_name, + function_name=function_name, + arguments=arguments, + auto_invoked=True, + ) + self._adapter.on_function_end( + context=trace_ctx, + result=result, + error=error, + auto_invoked=True, + ) + except Exception: + logger.warning("Error 
in sync auto function hook", exc_info=True) diff --git a/src/layerlens/instrument/adapters/frameworks/semantic_kernel/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/semantic_kernel/lifecycle.py index 82de307..b74d81a 100644 --- a/src/layerlens/instrument/adapters/frameworks/semantic_kernel/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/semantic_kernel/lifecycle.py @@ -4,16 +4,59 @@ Provides the main SemanticKernelAdapter class. Instruments SK Kernel instances via the official filter API (FunctionInvocationFilter, PromptRenderFilter, AutoFunctionInvocationFilter). + +Typed-event status (post PR #129 migration, bundle 5): + +* Every emission flows through :meth:`BaseAdapter.emit_event` with a + canonical Pydantic payload imported from + :mod:`layerlens.instrument._compat.events`. +* Microsoft Semantic Kernel-specific provenance (``framework``, + ``plugin_name``, ``function_name``, ``invocation_seq``, + ``auto_invoked``, ``timestamp_ns``, ``backend_type``, + ``relevance_scores``, ``planner_type``) is carried in the canonical + model's metadata / attributes / parameters / input slots. +* SK plugins and skills execute in-process inside the host Python + runtime (``Kernel.invoke`` calls Python callables registered + through ``KernelFunction``). The L5a integration is therefore + :class:`IntegrationType.LIBRARY` for both function invocations and + memory operations. Memory backends (``qdrant``, ``redis``, etc.) + are recorded as ``backend_type`` provenance — the *call* is still + in-process even when the storage is remote. +* The ad-hoc ``agent.code`` event type (used for prompt rendering and + planner step boundaries) is NOT in the canonical 13-event taxonomy. + Following the PR #138 smolagents precedent, those boundaries are + re-mapped onto :class:`ToolLogicEvent` (L5b — tool business logic). 
+ The semantic fit is good: prompt rendering applies templating + rules to generate a final prompt; planner steps emit reasoning + rules (thought/action/observation) for plan execution. The L5b + ``description`` slot carries the operation summary and ``rules`` + carries the per-event detail (template preview, planner status, + etc.). Framework provenance (``framework``, ``event_subtype``, + ``function_name``, ``planner_type``, ``step_index``) is preserved + as additional rules entries via JSON-encoded key/value pairs. """ from __future__ import annotations +import json import time import uuid import logging import threading from typing import Any +from layerlens.instrument._compat.events import ( + MessageRole, + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + EnvironmentType, + IntegrationType, + AgentOutputEvent, + ModelInvokeEvent, + ToolLogicEvent, + EnvironmentConfigEvent, +) from layerlens.instrument.adapters._base.adapter import ( AdapterInfo, BaseAdapter, @@ -28,6 +71,59 @@ logger = logging.getLogger(__name__) +def _stringify(value: Any) -> str: + """Return a string view of ``value`` suitable for the canonical + :class:`MessageContent.message` field. + + The canonical schema requires :class:`AgentInputEvent` and + :class:`AgentOutputEvent` to carry a ``message: str``. Semantic + Kernel kernel arguments and results may be arbitrary Python + objects (``KernelArguments``, ``FunctionResult``, dicts, + ``None``); this helper converts each to a (possibly empty) + string so the typed event always validates. The original payload + is preserved on :class:`MessageContent.metadata.raw_input` / + ``raw_output``. + """ + if value is None: + return "" + if isinstance(value, str): + return value + return str(value) + + +def _coerce_to_dict(value: Any) -> dict[str, Any]: + """Coerce ``value`` into a dict suitable for the canonical + :class:`ToolCallEvent` ``input`` / ``output`` slots. 
+ + The canonical schema requires ``input: dict[str, Any]`` and + ``output: dict[str, Any] | None``. SK function/skill returns can + be arbitrary Python values. This helper wraps non-dict values in + ``{"value": ...}`` so the canonical slot is always satisfied. + """ + if value is None: + return {} + if isinstance(value, dict): + return dict(value) + return {"value": value} + + +def _kv_rule(key: str, value: Any) -> str: + """Format a key/value pair as a single rule entry for L5b events. + + The canonical :class:`ToolLogicInfo.rules` slot is ``list[str]``. + To preserve the framework's per-event provenance (function name, + planner status, step index, etc.) without losing structure, each + key/value pair is encoded as ``"{key}={json_encoded_value}"``. + Consumers of the trace stream can split on the first ``=`` to + recover the original pair. + """ + try: + encoded = json.dumps(value, default=str, ensure_ascii=False) + except (TypeError, ValueError): + encoded = json.dumps(str(value)) + return f"{key}={encoded}" + + +class SemanticKernelAdapter(BaseAdapter): """ Main adapter for integrating STRATIX with Microsoft Semantic Kernel. @@ -46,12 +142,20 @@ class SemanticKernelAdapter(BaseAdapter): VERSION = "0.1.0" # The adapter source files import nothing from ``pydantic`` directly # (verified by grep across ``frameworks/semantic_kernel/``). The - # adapter only registers SK filter callbacks and emits dict events; + # adapter only registers SK filter callbacks and emits typed events; # it never touches Semantic Kernel's own Pydantic models. SK 1.0+ is # internally Pydantic v2, but customers running older SK 0.x with # Pydantic v1 can still use this adapter. requires_pydantic = PydanticCompat.V1_OR_V2 + # Per-adapter ``extra="allow"`` decision: semantic_kernel targets + # the canonical 13-event taxonomy exclusively. Unknown event types + # must be rejected by the base adapter's typed-event validator, so + # this stays ``False``. 
The legacy ``agent.code`` event type is + # re-mapped onto :class:`ToolLogicEvent` rather than passed through + # as an unregistered event — see the module docstring. + ALLOW_UNREGISTERED_EVENTS: bool = False + def __init__( self, stratix: Any | None = None, @@ -186,6 +290,11 @@ def on_function_start( Handle function invocation start. Returns context dict for correlation with on_function_end. + + Emits a typed :class:`EnvironmentConfigEvent` on the first + encounter of a plugin (idempotent per plugin name) carrying + the SK plugin/function provenance on + :attr:`EnvironmentInfo.attributes`. """ with self._adapter_lock: self._invocation_count += 1 @@ -202,13 +311,15 @@ def on_function_start( with self._adapter_lock: if plugin_name not in self._seen_plugins: self._seen_plugins.add(plugin_name) - self.emit_dict_event( - "environment.config", - { - "framework": "semantic_kernel", - "plugin_name": plugin_name, - "function_name": function_name, - }, + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.SIMULATED, + attributes={ + "framework": "semantic_kernel", + "plugin_name": plugin_name, + "function_name": function_name, + }, + ) ) return context @@ -223,30 +334,47 @@ def on_function_end( """ Handle function invocation end. - Emits tool.call (L5a) for plugin functions. + Emits a typed :class:`ToolCallEvent` (L5a) for plugin + functions. SK plugin functions are in-process Python + callables registered through ``KernelFunction`` — the + L5a integration is :class:`IntegrationType.LIBRARY`. + Tool versions are not surfaced by SK so ``version`` + falls back to ``"unavailable"`` per the canonical + NORMATIVE rule. 
""" start_ns = context.get("start_ns", 0) - elapsed_ms = (time.time_ns() - start_ns) / 1_000_000 if start_ns else 0 + elapsed_ms = (time.time_ns() - start_ns) / 1_000_000 if start_ns else 0.0 + + plugin_name = context.get("plugin_name", "") + function_name = context.get("function_name", "") + tool_name = f"{plugin_name}.{function_name}" - payload: dict[str, Any] = { + input_data: dict[str, Any] = { "framework": "semantic_kernel", - "tool_name": f"{context.get('plugin_name', '')}.{context.get('function_name', '')}", - "plugin_name": context.get("plugin_name"), - "function_name": context.get("function_name"), - "latency_ms": elapsed_ms, + "plugin_name": plugin_name, + "function_name": function_name, "invocation_seq": context.get("invocation_seq"), } - if auto_invoked: - payload["auto_invoked"] = True + input_data["auto_invoked"] = True + output_data: dict[str, Any] | None = None if result is not None: - payload["result_preview"] = self._truncate(self._safe_serialize(result)) - - if error: - payload["error"] = str(error) - - self.emit_dict_event("tool.call", payload) + output_data = { + "result_preview": self._truncate(self._safe_serialize(result)), + } + + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + error=str(error) if error else None, + latency_ms=elapsed_ms, + ) + ) def on_prompt_render( self, @@ -257,20 +385,30 @@ def on_prompt_render( """ Handle prompt template rendering. - Emits agent.code (L2) for template rendering events. + Emits a typed :class:`ToolLogicEvent` (L5b) for template + rendering. The previous adapter implementation emitted an + ad-hoc ``agent.code`` event type that is NOT in the canonical + 13-event taxonomy. The post-migration mapping carries the + rendering operation as L5b business logic: prompt templating + applies a set of substitution rules to a template, which is + a clean fit for ``ToolLogicEvent``. 
The + :class:`ToolLogicInfo.description` slot carries a one-line + summary; per-event provenance (framework, function name, + previews) is encoded as key/value rule entries. """ - payload: dict[str, Any] = { - "framework": "semantic_kernel", - "event_subtype": "prompt_render", - } + rules: list[str] = [_kv_rule("framework", "semantic_kernel"), + _kv_rule("event_subtype", "prompt_render")] if function_name: - payload["function_name"] = function_name + rules.append(_kv_rule("function_name", function_name)) if template: - payload["template_preview"] = self._truncate(template, 500) + rules.append(_kv_rule("template_preview", self._truncate(template, 500))) if rendered_prompt: - payload["rendered_preview"] = self._truncate(rendered_prompt, 500) + rules.append(_kv_rule("rendered_preview", self._truncate(rendered_prompt, 500))) - self.emit_dict_event("agent.code", payload) + description = ( + f"prompt_render: {function_name}" if function_name else "prompt_render" + ) + self.emit_event(ToolLogicEvent.create(description=description, rules=rules)) def on_model_invoke( self, @@ -285,40 +423,46 @@ def on_model_invoke( """ Handle LLM call from SK service. - Emits model.invoke (L3) and cost.record (cross-cutting). + Emits a typed :class:`ModelInvokeEvent` (L3) and a paired + :class:`CostRecordEvent` (Cross) when token usage is known. + SK-specific provenance (``framework``, ``error``) is carried + on :attr:`ModelInfo.parameters`. 
""" - payload: dict[str, Any] = { - "framework": "semantic_kernel", - } - if provider: - payload["provider"] = provider - if model: - payload["model"] = model - if prompt_tokens is not None: - payload["prompt_tokens"] = prompt_tokens - if completion_tokens is not None: - payload["completion_tokens"] = completion_tokens - if latency_ms is not None: - payload["latency_ms"] = latency_ms + model_name = model or "unknown" + resolved_provider = provider or "azure_openai" + parameters: dict[str, Any] = {"framework": "semantic_kernel"} if error: - payload["error"] = error + parameters["error"] = error + input_messages: list[dict[str, str]] | None = None if self._capture_config.capture_content and messages: - payload["messages"] = messages - - self.emit_dict_event("model.invoke", payload) + input_messages = messages + + self.emit_event( + ModelInvokeEvent.create( + provider=resolved_provider, + name=model_name, + version="unavailable", + parameters=parameters, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=( + (prompt_tokens or 0) + (completion_tokens or 0) + if (prompt_tokens or completion_tokens) + else None + ), + latency_ms=latency_ms, + input_messages=input_messages, + ) + ) # Emit cost record if prompt_tokens or completion_tokens: - self.emit_dict_event( - "cost.record", - { - "framework": "semantic_kernel", - "provider": provider, - "model": model, - "prompt_tokens": prompt_tokens or 0, - "completion_tokens": completion_tokens or 0, - "total_tokens": (prompt_tokens or 0) + (completion_tokens or 0), - }, + self.emit_event( + CostRecordEvent.create( + prompt_tokens=prompt_tokens or 0, + completion_tokens=completion_tokens or 0, + tokens=(prompt_tokens or 0) + (completion_tokens or 0), + ) ) def on_planner_step( @@ -334,27 +478,34 @@ def on_planner_step( """ Handle planner execution step. - Emits agent.code (L2) for plan generation and step execution. 
+ Emits a typed :class:`ToolLogicEvent` (L5b) for plan + generation and step execution. Same rationale as + :meth:`on_prompt_render` — planner steps are a stream of + reasoning rules (thought / action / observation) that map + cleanly onto L5b's ``description`` + ``rules`` slots without + violating the canonical schema. """ - payload: dict[str, Any] = { - "framework": "semantic_kernel", - "event_subtype": "planner_step", - "planner_type": planner_type, - } + rules: list[str] = [ + _kv_rule("framework", "semantic_kernel"), + _kv_rule("event_subtype", "planner_step"), + _kv_rule("planner_type", planner_type), + ] if step_index is not None: - payload["step_index"] = step_index + rules.append(_kv_rule("step_index", step_index)) if plan is not None: - payload["plan_preview"] = self._truncate(str(plan), 1000) + rules.append(_kv_rule("plan_preview", self._truncate(str(plan), 1000))) if thought: - payload["thought"] = self._truncate(thought) + rules.append(_kv_rule("thought", self._truncate(thought))) if action: - payload["action"] = action + rules.append(_kv_rule("action", action)) if observation: - payload["observation"] = self._truncate(observation) + rules.append(_kv_rule("observation", self._truncate(observation))) if status: - payload["status"] = status + rules.append(_kv_rule("status", status)) - self.emit_dict_event("agent.code", payload) + step_label = f" step {step_index}" if step_index is not None else "" + description = f"planner_step: {planner_type}{step_label}" + self.emit_event(ToolLogicEvent.create(description=description, rules=rules)) def on_memory_operation( self, @@ -369,40 +520,62 @@ def on_memory_operation( """ Handle memory operation (save, search, get). - Emits tool.call (L5a) for memory operations. + Emits a typed :class:`ToolCallEvent` (L5a) for memory + operations. The SK ``MemoryStore`` API is invoked in-process + from the host runtime, so the integration is + :class:`IntegrationType.LIBRARY`. 
The remote storage backend + (``qdrant``, ``redis``, ``pinecone``, etc.) is recorded as + ``backend_type`` provenance on the canonical input dict — the + *call* is still in-process even when the storage is remote. """ - payload: dict[str, Any] = { + tool_name = f"memory.{operation}" + input_data: dict[str, Any] = { "framework": "semantic_kernel", - "tool_name": f"memory.{operation}", "operation": operation, } if collection: - payload["collection"] = collection + input_data["collection"] = collection if key: - payload["key"] = key + input_data["key"] = key if query: - payload["query_preview"] = self._truncate(query, 200) - if result_count is not None: - payload["result_count"] = result_count - if relevance_scores: - payload["relevance_scores"] = relevance_scores[:10] + input_data["query_preview"] = self._truncate(query, 200) if backend_type: - payload["backend_type"] = backend_type - - self.emit_dict_event("tool.call", payload) + input_data["backend_type"] = backend_type + + output_data: dict[str, Any] | None = None + if result_count is not None or relevance_scores: + output_data = {} + if result_count is not None: + output_data["result_count"] = result_count + if relevance_scores: + output_data["relevance_scores"] = relevance_scores[:10] + + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + ) + ) def on_kernel_invoke_start(self, input_text: Any = None) -> None: - """Handle kernel invocation start. Emits agent.input (L1).""" + """Handle kernel invocation start. 
Emits typed :class:`AgentInputEvent` (L1).""" with self._adapter_lock: self._kernel_start_ns = time.time_ns() - self.emit_dict_event( - "agent.input", - { - "framework": "semantic_kernel", - "input": self._safe_serialize(input_text), - "timestamp_ns": self._kernel_start_ns, - }, + raw_input = self._safe_serialize(input_text) + self.emit_event( + AgentInputEvent.create( + message=_stringify(raw_input), + role=MessageRole.HUMAN, + metadata={ + "framework": "semantic_kernel", + "timestamp_ns": self._kernel_start_ns, + "raw_input": raw_input, + }, + ) ) def on_kernel_invoke_end( @@ -410,24 +583,34 @@ def on_kernel_invoke_end( output: Any = None, error: Exception | None = None, ) -> None: - """Handle kernel invocation end. Emits agent.output (L1).""" + """Handle kernel invocation end. Emits typed :class:`AgentOutputEvent` (L1).""" end_ns = time.time_ns() duration_ns = end_ns - self._kernel_start_ns if self._kernel_start_ns else 0 - payload: dict[str, Any] = { + raw_output = self._safe_serialize(output) + metadata: dict[str, Any] = { "framework": "semantic_kernel", - "output": self._safe_serialize(output), "duration_ns": duration_ns, + "raw_output": raw_output, } if error: - payload["error"] = str(error) + metadata["error"] = str(error) - self.emit_dict_event("agent.output", payload) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(raw_output), + metadata=metadata, + ) + ) # --- Plugin discovery --- def _discover_plugins(self, kernel: Any) -> None: - """Discover and register plugins from the kernel.""" + """Discover and register plugins from the kernel. + + Emits one typed :class:`EnvironmentConfigEvent` per discovered + plugin (idempotent per plugin name). 
+ """ try: plugins = getattr(kernel, "plugins", None) if plugins is None: @@ -441,13 +624,15 @@ def _discover_plugins(self, kernel: Any) -> None: with self._adapter_lock: if name not in self._seen_plugins: self._seen_plugins.add(name) - self.emit_dict_event( - "environment.config", - { - "framework": "semantic_kernel", - "plugin_name": name, - "event_subtype": "plugin_registered", - }, + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.SIMULATED, + attributes={ + "framework": "semantic_kernel", + "plugin_name": name, + "event_subtype": "plugin_registered", + }, + ) ) except Exception: logger.debug("Error discovering SK plugins", exc_info=True) diff --git a/src/layerlens/instrument/adapters/frameworks/semantic_kernel/metadata.py b/src/layerlens/instrument/adapters/frameworks/semantic_kernel/metadata.py new file mode 100644 index 0000000..ee6275e --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/semantic_kernel/metadata.py @@ -0,0 +1,60 @@ +""" +Semantic Kernel Metadata Extraction + +Extracts plugin and kernel configuration metadata for environment.config events. 
+""" + +from __future__ import annotations + +import logging +from typing import Any + +logger = logging.getLogger(__name__) + + +class SKMetadataExtractor: + """Extract metadata from Semantic Kernel components.""" + + def extract_plugin_metadata(self, plugin: Any) -> dict[str, Any]: + """Extract metadata from a registered plugin.""" + metadata: dict[str, Any] = {} + try: + metadata["plugin_name"] = getattr(plugin, "name", str(plugin)) + metadata["description"] = getattr(plugin, "description", None) + + # Extract function names + functions = getattr(plugin, "functions", None) + if functions: # noqa: SIM102 + if isinstance(functions, dict) or hasattr(functions, "keys"): + metadata["function_names"] = list(functions.keys()) + except Exception: + logger.debug("Error extracting plugin metadata", exc_info=True) + return metadata + + def extract_kernel_metadata(self, kernel: Any) -> dict[str, Any]: + """Extract metadata from a Kernel instance.""" + metadata: dict[str, Any] = {} + try: + # Extract registered plugins + plugins = getattr(kernel, "plugins", None) + if plugins: + if isinstance(plugins, dict): + metadata["plugin_count"] = len(plugins) + metadata["plugin_names"] = list(plugins.keys()) + elif hasattr(plugins, "__len__"): + metadata["plugin_count"] = len(plugins) + + # Extract registered services + services = getattr(kernel, "services", None) + if services and isinstance(services, dict): + metadata["service_count"] = len(services) + metadata["service_types"] = [type(s).__name__ for s in services.values()] + + # Extract memory backend + memory = getattr(kernel, "memory", None) + if memory: + metadata["memory_backend"] = type(memory).__name__ + + except Exception: + logger.debug("Error extracting kernel metadata", exc_info=True) + return metadata diff --git a/src/layerlens/instrument/adapters/frameworks/strands/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/strands/lifecycle.py index b695506..78c32f0 100644 --- 
a/src/layerlens/instrument/adapters/frameworks/strands/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/strands/lifecycle.py @@ -8,6 +8,41 @@ Model invoke (Bedrock) -> model.invoke (L3) Conversation state -> agent.state.change (Cross) Cost (Bedrock pricing) -> cost.record (Cross) + +Typed-event status (post PR #129 migration, bundle 5): + +* Every emission flows through :meth:`BaseAdapter.emit_event` with a + canonical Pydantic payload imported from + :mod:`layerlens.instrument._compat.events`. +* AWS Strands-specific provenance (``framework``, ``agent_name``, + ``timestamp_ns``, ``duration_ns``, ``model``, ``tools``, + ``conversation_type``, ``turn_count``, ``system_prompt``) is + carried in the canonical model's metadata / attributes / + parameters / input slots. +* AWS Strands tool execution: Strands lets agents declare tools as + Python callables that run in the host runtime — even when the + underlying capability is an AWS service, the tool *call* is an + in-process Python invocation. The L5a integration is therefore + :class:`IntegrationType.LIBRARY`. (This deliberately differs + from the bedrock_agents adapter's :class:`IntegrationType.SERVICE` + mapping — bedrock_agents tool execution is performed by the + Bedrock service via Lambda action groups, not in-process.) +* The ad-hoc ``agent.state.change`` ``run_complete`` / ``run_failed`` + marker emitted by :meth:`on_run_end` does NOT satisfy the canonical + :class:`AgentStateChangeEvent` ``before_hash`` / ``after_hash`` + contract (the run boundary has no real state mutation to hash). + Following the PR #151 ms_agent_framework precedent, the marker is + carried as ``run_status`` on :class:`AgentOutputEvent.metadata`, + preserving the cross-cutting completion signal without violating + the canonical schema. 
+* The conversation-update ``agent.state.change`` emitted by + :meth:`_extract_run_details` carries Strands ``turn_count`` data + but not the canonical ``before_hash`` / ``after_hash`` (the + framework does not surface a hashable state snapshot at the + conversation boundary). It is mapped onto + :class:`AgentOutputEvent.metadata.conversation_state` so the + conversation-progress signal is preserved without violating the + canonical schema. """ from __future__ import annotations @@ -18,6 +53,17 @@ import threading from typing import Any +from layerlens.instrument._compat.events import ( + MessageRole, + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + EnvironmentType, + IntegrationType, + AgentOutputEvent, + ModelInvokeEvent, + EnvironmentConfigEvent, +) from layerlens.instrument.adapters._base.adapter import ( AdapterInfo, BaseAdapter, @@ -31,6 +77,41 @@ logger = logging.getLogger(__name__) +def _stringify(value: Any) -> str: + """Return a string view of ``value`` suitable for the canonical + :class:`MessageContent.message` field. + + The canonical schema requires :class:`AgentInputEvent` and + :class:`AgentOutputEvent` to carry a ``message: str``. AWS + Strands delivers user prompts and run outputs as arbitrary Python + objects (strings, message objects, ``None``); this helper + converts each to a (possibly empty) string so the typed event + always validates. The original payload is preserved on + :class:`MessageContent.metadata.raw_input` / ``raw_output``. + """ + if value is None: + return "" + if isinstance(value, str): + return value + return str(value) + + +def _coerce_to_dict(value: Any) -> dict[str, Any]: + """Coerce ``value`` into a dict suitable for the canonical + :class:`ToolCallEvent` ``input`` / ``output`` slots. + + The canonical schema requires ``input: dict[str, Any]`` and + ``output: dict[str, Any] | None``. Strands tool returns can be + arbitrary Python values. 
This helper wraps non-dict values in + ``{"value": ...}`` so the canonical slot is always satisfied. + """ + if value is None: + return {} + if isinstance(value, dict): + return dict(value) + return {"value": value} + + class StrandsAdapter(BaseAdapter): """LayerLens adapter for AWS Strands.""" @@ -38,10 +119,17 @@ class StrandsAdapter(BaseAdapter): VERSION = "0.1.0" # The adapter source has no direct ``pydantic`` imports (verified by # grep across ``frameworks/strands/``). Strands instrumentation hooks - # into agent callbacks and emits dict events without crossing the + # into agent callbacks and emits typed events without crossing the # framework's Pydantic boundary. requires_pydantic = PydanticCompat.V1_OR_V2 + # Per-adapter ``extra="allow"`` decision: strands targets the + # canonical 13-event taxonomy exclusively. Unknown event types + # must be rejected by the base adapter's typed-event validator, + # so this stays ``False``. See ``docs/adapters/typed-events.md`` + # for the opt-in policy. + ALLOW_UNREGISTERED_EVENTS: bool = False + def __init__( self, stratix: Any | None = None, @@ -179,7 +267,14 @@ def traced_call(*args: Any, **kwargs: Any) -> Any: return traced_call def _extract_run_details(self, agent: Any, result: Any) -> None: - """Extract tool calls, model invocations, and cost from run result.""" + """Extract tool calls, model invocations, and cost from a run result. + + Emits typed :class:`ModelInvokeEvent`, :class:`CostRecordEvent`, + :class:`ToolCallEvent`, and a conversation-state-change marker + (carried as :class:`AgentOutputEvent.metadata.conversation_state`) + when the framework surfaces the corresponding data on the run + result. 
+ """ if result is None: return try: @@ -187,13 +282,14 @@ def _extract_run_details(self, agent: Any, result: Any) -> None: model = getattr(agent, "model", None) or getattr(agent, "model_id", None) if model: model_name = str(model) - self.emit_dict_event( - "model.invoke", - { - "framework": "strands", - "model": model_name, - "provider": self._detect_provider(model_name), - }, + provider = self._detect_provider(model_name) or "bedrock" + self.emit_event( + ModelInvokeEvent.create( + provider=provider, + name=model_name, + version="unavailable", + parameters={"framework": "strands"}, + ) ) # Extract usage/token info from result @@ -202,48 +298,71 @@ def _extract_run_details(self, agent: Any, result: Any) -> None: tokens_prompt = getattr(usage, "inputTokens", None) or getattr(usage, "prompt_tokens", None) tokens_completion = getattr(usage, "outputTokens", None) or getattr(usage, "completion_tokens", None) tokens_total = getattr(usage, "totalTokens", None) or getattr(usage, "total_tokens", None) - self.emit_dict_event( - "cost.record", - { - "framework": "strands", - "model": str(model) if model else None, - "tokens_prompt": tokens_prompt, - "tokens_completion": tokens_completion, - "tokens_total": tokens_total, - }, + self.emit_event( + CostRecordEvent.create( + prompt_tokens=tokens_prompt, + completion_tokens=tokens_completion, + tokens=tokens_total, + ) ) - # Extract tool calls from result + # Extract tool calls from result. Strands tool-result entries + # come through as objects (with ``name``/``input``/``output`` + # attributes) or dicts; normalise both shapes here. 
tool_results = getattr(result, "tool_results", None) or [] for tr in tool_results: - self.emit_dict_event( - "tool.call", - { - "framework": "strands", - "tool_name": getattr(tr, "name", None) or tr.get("name", "unknown") - if isinstance(tr, dict) - else "unknown", - "tool_input": self._safe_serialize( - getattr(tr, "input", None) or (tr.get("input") if isinstance(tr, dict) else None) - ), - "tool_output": self._safe_serialize( - getattr(tr, "output", None) or (tr.get("output") if isinstance(tr, dict) else None) - ), - }, + if isinstance(tr, dict): + tool_name_raw: Any = tr.get("name", "unknown") + raw_input = tr.get("input") + raw_output = tr.get("output") + else: + tool_name_raw = getattr(tr, "name", None) or "unknown" + raw_input = getattr(tr, "input", None) + raw_output = getattr(tr, "output", None) + tool_name = str(tool_name_raw) if tool_name_raw else "unknown" + serialised_input = self._safe_serialize(raw_input) + serialised_output = self._safe_serialize(raw_output) + input_data = _coerce_to_dict(serialised_input) + input_data.setdefault("framework", "strands") + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialised_output) + if serialised_output is not None + else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + ) ) - # Emit conversation state change + # Emit conversation state change. The previous adapter emitted + # an ad-hoc agent.state.change payload with only + # event_subtype + turn_count. That payload did not satisfy + # the canonical AgentStateChangeEvent before_hash / after_hash + # contract — the framework does not surface a hashable state + # snapshot at the conversation boundary. The post-migration + # mapping carries the conversation-progress signal as + # AgentOutputEvent.metadata.conversation_state, preserving + # the marker without violating the canonical schema. 
conversation = getattr(agent, "conversation", None) or getattr(agent, "conversation_manager", None) if conversation: - turn_count = getattr(conversation, "turn_count", None) or len(getattr(conversation, "messages", [])) - self.emit_dict_event( - "agent.state.change", - { - "framework": "strands", - "agent_name": getattr(agent, "name", "strands_agent"), - "event_subtype": "conversation_update", - "turn_count": turn_count, - }, + turn_count = getattr(conversation, "turn_count", None) or len( + getattr(conversation, "messages", []) + ) + self.emit_event( + AgentOutputEvent.create( + message="", + metadata={ + "framework": "strands", + "agent_name": getattr(agent, "name", "strands_agent"), + "conversation_state": "conversation_update", + "turn_count": turn_count, + }, + ) ) except Exception: logger.debug("Could not extract run details", exc_info=True) @@ -251,7 +370,7 @@ def _extract_run_details(self, agent: Any, result: Any) -> None: # --- Lifecycle Hooks --- def on_run_start(self, agent_name: str | None = None, input_data: Any = None) -> None: - """Emit agent.input event when an agent run starts.""" + """Emit a typed :class:`AgentInputEvent` when an agent run starts.""" if not self._connected: return try: @@ -259,14 +378,18 @@ def on_run_start(self, agent_name: str | None = None, input_data: Any = None) -> start_ns = time.time_ns() with self._adapter_lock: self._run_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "strands", - "agent_name": agent_name, - "input": self._safe_serialize(input_data), - "timestamp_ns": start_ns, - }, + raw_input = self._safe_serialize(input_data) + self.emit_event( + AgentInputEvent.create( + message=_stringify(raw_input), + role=MessageRole.HUMAN, + metadata={ + "framework": "strands", + "agent_name": agent_name, + "timestamp_ns": start_ns, + "raw_input": raw_input, + }, + ) ) except Exception: logger.warning("Error in on_run_start", exc_info=True) @@ -277,7 +400,20 @@ def on_run_end( output: Any = None, 
error: Exception | None = None, ) -> None: - """Emit agent.output event when an agent run ends.""" + """Emit a typed :class:`AgentOutputEvent` when an agent run ends. + + The previous adapter implementation also emitted a separate + ``agent.state.change`` payload carrying only an + ``event_subtype`` marker (``run_complete`` / ``run_failed``). + That payload did not satisfy the canonical + :class:`AgentStateChangeEvent` ``before_hash`` / ``after_hash`` + contract — the run boundary has no real state mutation to + hash. The post-migration mapping (matching the PR #151 + ms_agent_framework precedent) carries the same signal as + ``run_status`` on :class:`MessageContent.metadata`, preserving + the cross-cutting completion marker without violating the + canonical schema. + """ if not self._connected: return try: @@ -286,22 +422,21 @@ def on_run_end( with self._adapter_lock: start_ns = self._run_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - payload: dict[str, Any] = { + raw_output = self._safe_serialize(output) + metadata: dict[str, Any] = { "framework": "strands", "agent_name": agent_name, - "output": self._safe_serialize(output), "duration_ns": duration_ns, + "raw_output": raw_output, + "run_status": "run_complete" if not error else "run_failed", } if error: - payload["error"] = str(error) - self.emit_dict_event("agent.output", payload) - self.emit_dict_event( - "agent.state.change", - { - "framework": "strands", - "agent_name": agent_name, - "event_subtype": "run_complete" if not error else "run_failed", - }, + metadata["error"] = str(error) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(raw_output), + metadata=metadata, + ) ) except Exception: logger.warning("Error in on_run_end", exc_info=True) @@ -314,21 +449,37 @@ def on_tool_use( error: Exception | None = None, latency_ms: float | None = None, ) -> None: - """Emit tool.call event for a tool invocation.""" + """Emit a typed :class:`ToolCallEvent` for a tool invocation. 
+ + AWS Strands tools execute as in-process Python callables in + the host runtime — even when the underlying capability is an + AWS service, the *call* boundary instrumented here is the + in-process invocation. Integration is therefore + :class:`IntegrationType.LIBRARY`. Tool versions are not + surfaced by Strands so ``version`` falls back to + ``"unavailable"`` per the canonical NORMATIVE rule. + """ if not self._connected: return try: - payload: dict[str, Any] = { - "framework": "strands", - "tool_name": tool_name, - "tool_input": self._safe_serialize(tool_input), - "tool_output": self._safe_serialize(tool_output), - } - if error: - payload["error"] = str(error) - if latency_ms is not None: - payload["latency_ms"] = latency_ms - self.emit_dict_event("tool.call", payload) + serialised_input = self._safe_serialize(tool_input) + serialised_output = self._safe_serialize(tool_output) + input_data = _coerce_to_dict(serialised_input) + input_data.setdefault("framework", "strands") + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialised_output) if serialised_output is not None else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + error=str(error) if error else None, + latency_ms=latency_ms, + ) + ) except Exception: logger.warning("Error in on_tool_use", exc_info=True) @@ -341,24 +492,32 @@ def on_llm_call( latency_ms: float | None = None, messages: list[dict[str, str]] | None = None, ) -> None: - """Emit model.invoke event for an LLM call.""" + """Emit a typed :class:`ModelInvokeEvent` for an LLM call. + + Strands defaults to AWS Bedrock when no provider is + explicitly supplied, matching the framework's primary + deployment target. 
+ """ if not self._connected: return try: - payload: dict[str, Any] = {"framework": "strands"} - if provider: - payload["provider"] = provider - if model: - payload["model"] = model - if tokens_prompt is not None: - payload["tokens_prompt"] = tokens_prompt - if tokens_completion is not None: - payload["tokens_completion"] = tokens_completion - if latency_ms is not None: - payload["latency_ms"] = latency_ms + model_name = model or "unknown" + resolved_provider = provider or self._detect_provider(model_name) or "bedrock" + input_messages: list[dict[str, str]] | None = None if self._capture_config.capture_content and messages: - payload["messages"] = messages - self.emit_dict_event("model.invoke", payload) + input_messages = messages + self.emit_event( + ModelInvokeEvent.create( + provider=resolved_provider, + name=model_name, + version="unavailable", + parameters={"framework": "strands"}, + prompt_tokens=tokens_prompt, + completion_tokens=tokens_completion, + latency_ms=latency_ms, + input_messages=input_messages, + ) + ) except Exception: logger.warning("Error in on_llm_call", exc_info=True) @@ -389,31 +548,50 @@ def _detect_provider(self, model: str | None) -> str | None: return "bedrock" # Default to Bedrock for Strands def _emit_agent_config(self, agent_name: str, agent: Any) -> None: - """Emit environment.config event for agent configuration on first encounter.""" + """Emit a typed :class:`EnvironmentConfigEvent` per agent. + + Idempotent per agent — only the first call for a given agent + name actually emits. Strands agents typically run in a + ``cloud`` environment (AWS), but the adapter cannot + confidently distinguish cloud vs. local execution, so the + canonical ``simulated`` env_type is used by default — the + host application is responsible for emitting the real + environment.config record. 
Strands-specific provenance + (``framework``, ``agent_name``, ``model``, ``tools``, + ``conversation_type``, ``system_prompt``) lives on + :attr:`EnvironmentInfo.attributes`. + """ with self._adapter_lock: if agent_name in self._seen_agents: return self._seen_agents.add(agent_name) - metadata: dict[str, Any] = { + attributes: dict[str, Any] = { "framework": "strands", "agent_name": agent_name, } model = getattr(agent, "model", None) or getattr(agent, "model_id", None) if model: - metadata["model"] = str(model) + attributes["model"] = str(model) system_prompt = getattr(agent, "system_prompt", None) if system_prompt and self._capture_config.capture_content: - metadata["system_prompt"] = str(system_prompt)[:500] + attributes["system_prompt"] = str(system_prompt)[:500] tools = getattr(agent, "tools", None) if tools: if isinstance(tools, dict): - metadata["tools"] = list(tools.keys()) + attributes["tools"] = list(tools.keys()) else: - metadata["tools"] = [getattr(t, "name", None) or getattr(t, "tool_name", str(t)) for t in tools] + attributes["tools"] = [ + getattr(t, "name", None) or getattr(t, "tool_name", str(t)) for t in tools + ] conversation = getattr(agent, "conversation", None) or getattr(agent, "conversation_manager", None) if conversation: - metadata["conversation_type"] = str(type(conversation).__name__) - self.emit_dict_event("environment.config", metadata) + attributes["conversation_type"] = str(type(conversation).__name__) + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.SIMULATED, + attributes=attributes, + ) + ) def _safe_serialize(self, value: Any) -> Any: """Safely serialize a value for event payloads.""" diff --git a/tests/instrument/adapters/frameworks/test_pydantic_ai_adapter.py b/tests/instrument/adapters/frameworks/test_pydantic_ai_adapter.py index 6c4c3d5..d0d4b7f 100644 --- a/tests/instrument/adapters/frameworks/test_pydantic_ai_adapter.py +++ b/tests/instrument/adapters/frameworks/test_pydantic_ai_adapter.py @@ 
-1,6 +1,13 @@ """Unit tests for the PydanticAI framework adapter. Mocked at the SDK shape level — no real ``pydantic_ai`` runtime needed. + +After the typed-event migration (PR #129 follow-up — bundle 5) every +emit site flows through :meth:`BaseAdapter.emit_event` with a canonical +Pydantic payload. The :class:`_RecordingStratix` stand-in below records +both shapes: the ``payload`` slot always carries a dict (model-dumped +if typed), and ``typed_payloads`` holds the original Pydantic instances +for tests that want to assert against the model surface. """ from __future__ import annotations @@ -10,6 +17,10 @@ import pytest +from layerlens._compat.pydantic import ( + BaseModel as _CompatBaseModel, + model_dump as _compat_model_dump, +) from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig from layerlens.instrument.adapters.frameworks.pydantic_ai import ( ADAPTER_CLASS, @@ -27,10 +38,22 @@ class _RecordingStratix: def __init__(self) -> None: self.events: List[Dict[str, Any]] = [] + # Hold strong references to the original typed payloads. + self.typed_payloads: List[Any] = [] def emit(self, *args: Any, **kwargs: Any) -> None: + # Two-arg legacy path: ``emit(event_type, payload_dict)``. if len(args) == 2 and isinstance(args[0], str): self.events.append({"event_type": args[0], "payload": args[1]}) + return + # Single-arg typed path: ``emit(payload_model[, privacy_level])``. 
+ if args and isinstance(args[0], _CompatBaseModel): + payload_model = args[0] + self.typed_payloads.append(payload_model) + event_type = getattr(payload_model, "event_type", "") + self.events.append( + {"event_type": event_type, "payload": _compat_model_dump(payload_model)} + ) class _FakeAgent: @@ -96,6 +119,7 @@ def test_instrument_agent_wraps_run_sync() -> None: def test_run_emits_input_and_output_events() -> None: + """Typed migration: agent_name lives at payload.content.metadata.""" stratix = _RecordingStratix() adapter = PydanticAIAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -111,11 +135,13 @@ def test_run_emits_input_and_output_events() -> None: assert "agent.output" in types out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert out["payload"]["agent_name"] == "planner" - assert out["payload"]["duration_ns"] >= 0 + metadata = out["payload"]["content"]["metadata"] + assert metadata["agent_name"] == "planner" + assert metadata["duration_ns"] >= 0 def test_run_failure_emits_output_with_error() -> None: + """Typed migration: error is on AgentOutputEvent.metadata, run_status=run_failed.""" stratix = _RecordingStratix() adapter = PydanticAIAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -126,12 +152,18 @@ def test_run_failure_emits_output_with_error() -> None: agent.run_sync("bad") out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert "error" in out["payload"] - assert "simulated failure" in out["payload"]["error"] + metadata = out["payload"]["content"]["metadata"] + assert "error" in metadata + assert "simulated failure" in metadata["error"] + assert metadata["run_status"] == "run_failed" def test_run_extracts_usage_and_messages() -> None: - """When the result has usage and a tool-return message, cost.record + tool.call fire.""" + """When the result has usage and a tool-return message, cost.record + tool.call fire. 
+ + Typed shape: tokens at payload.cost.tokens, model at payload.model.name, + tool name at payload.tool.name. + """ stratix = _RecordingStratix() adapter = PydanticAIAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -155,24 +187,58 @@ def test_run_extracts_usage_and_messages() -> None: assert "tool.call" in types cost = next(e for e in stratix.events if e["event_type"] == "cost.record") - assert cost["payload"]["tokens_total"] == 15 + assert cost["payload"]["cost"]["tokens"] == 15 + assert cost["payload"]["cost"]["prompt_tokens"] == 10 + assert cost["payload"]["cost"]["completion_tokens"] == 5 + + invoke = next(e for e in stratix.events if e["event_type"] == "model.invoke") + assert invoke["payload"]["model"]["name"] == "gpt-5" + assert invoke["payload"]["model"]["provider"] == "openai" + tool = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert tool["payload"]["tool_name"] == "calc" + assert tool["payload"]["tool"]["name"] == "calc" + assert tool["payload"]["tool"]["integration"] == "library" + # Scalar tool-return content is wrapped in {"value": ...}. 
+ assert tool["payload"]["output"] == {"value": 42} def test_on_handoff_emits_event_with_context_hash() -> None: + """Typed AgentHandoffEvent: handoff_context_hash is sha256:.""" stratix = _RecordingStratix() adapter = PydanticAIAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter.on_handoff(from_agent="a", to_agent="b", context="some context") evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") - assert evt["payload"]["from_agent"] == "a" - assert evt["payload"]["context_hash"] is not None + payload = evt["payload"] + assert payload["from_agent"] == "a" + assert payload["to_agent"] == "b" + assert payload["handoff_context_hash"].startswith("sha256:") + assert len(payload["handoff_context_hash"]) == 7 + 64 + + +def test_handoff_emits_canonical_hash_for_empty_context() -> None: + """Empty context still produces a well-formed sha256 hash.""" + stratix = _RecordingStratix() + adapter = PydanticAIAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + adapter.on_handoff(from_agent="a", to_agent="b", context=None) + + evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") + assert evt["payload"]["handoff_context_hash"].startswith("sha256:") def test_capture_config_gates_l1_agent_io() -> None: - """When l1_agent_io is disabled, agent.input/output do NOT fire (state.change still does).""" + """When l1_agent_io is disabled, agent.input/output do NOT fire. + + Post-migration: the previous adapter emitted an ad-hoc + ``agent.state.change`` payload alongside agent.output to carry a + run_complete / run_failed marker. That payload is no longer + emitted (it did not satisfy the canonical + AgentStateChangeEvent contract); the marker now lives on + AgentOutputEvent.metadata.run_status. So when l1_agent_io is + disabled, no agent.* events fire at all from on_run_*. 
+ """ stratix = _RecordingStratix() cfg = CaptureConfig(l1_agent_io=False) adapter = PydanticAIAdapter(stratix=stratix, capture_config=cfg) @@ -184,11 +250,13 @@ def test_capture_config_gates_l1_agent_io() -> None: types = [e["event_type"] for e in stratix.events] assert "agent.input" not in types assert "agent.output" not in types - # state.change is cross-cutting / always enabled. - assert "agent.state.change" in types + # agent.state.change is NO LONGER emitted post-migration — + # see the on_run_end docstring for the rationale. + assert "agent.state.change" not in types def test_environment_config_emits_once_per_agent() -> None: + """Typed migration: agent_name + tools live at payload.environment.attributes.""" stratix = _RecordingStratix() adapter = PydanticAIAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -199,8 +267,10 @@ def test_environment_config_emits_once_per_agent() -> None: configs = [e for e in stratix.events if e["event_type"] == "environment.config"] assert len(configs) == 1 - assert configs[0]["payload"]["agent_name"] == "a1" - assert configs[0]["payload"]["tools"] == ["search"] + attributes = configs[0]["payload"]["environment"]["attributes"] + assert attributes["agent_name"] == "a1" + assert attributes["tools"] == ["search"] + assert configs[0]["payload"]["environment"]["type"] == "simulated" def test_instrument_agent_helper() -> None: @@ -220,3 +290,97 @@ def test_serialize_for_replay() -> None: assert rt.framework == "pydantic_ai" assert rt.adapter_name == "PydanticAIAdapter" assert "capture_config" in rt.config + + +# --------------------------------------------------------------------------- +# Typed-event migration regression tests (PR #129 follow-up — bundle 5) +# --------------------------------------------------------------------------- + + +def test_pydantic_ai_emits_typed_payloads_only() -> None: + """Every emit site in pydantic_ai lifecycle.py is a typed + emit_event call. 
+ + Pins the post-migration contract: the recording stratix's + ``typed_payloads`` list grows for every emission and the legacy + two-arg dict path receives nothing. + """ + from layerlens.instrument._compat.events import ( + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, + ) + + stratix = _RecordingStratix() + adapter = PydanticAIAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + # Drive every emission path: instrument_agent triggers + # environment.config; run_sync drives agent.input/output and + # _extract_run_usage drives cost.record + model.invoke + tool.call. + usage = SimpleNamespace(request_tokens=10, response_tokens=5, total_tokens=15) + response_msg = SimpleNamespace(kind="response") + tool_msg = SimpleNamespace(kind="tool-return", tool_name="calc", content=42) + result = SimpleNamespace( + data="ok", + usage=usage, + all_messages=[response_msg, tool_msg], + model_name="gpt-5", + ) + agent = _FakeAgent(name="planner", model="gpt-5", result=result) + adapter.instrument_agent(agent) + agent.run_sync("hi") + + # Direct lifecycle hooks + adapter.on_tool_use("ext_tool", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="openai", model="gpt-5", tokens_prompt=5) + adapter.on_handoff(from_agent="planner", to_agent="executor", context="ctx") + + # Every captured payload is a Pydantic model instance — the legacy + # dict path was not used. 
+ assert stratix.typed_payloads, "expected typed payloads to be captured" + types_seen = {type(p) for p in stratix.typed_payloads} + assert AgentInputEvent in types_seen + assert AgentOutputEvent in types_seen + assert AgentHandoffEvent in types_seen + assert CostRecordEvent in types_seen + assert EnvironmentConfigEvent in types_seen + assert ModelInvokeEvent in types_seen + assert ToolCallEvent in types_seen + + +def test_pydantic_ai_emit_does_not_warn_after_migration() -> None: + """No DeprecationWarning fires from pydantic_ai lifecycle paths. + + The base adapter's ``emit_dict_event`` raises a DeprecationWarning + on every call. After migration, pydantic_ai lifecycle must never + trigger that warning. + """ + import warnings + + stratix = _RecordingStratix() + adapter = PydanticAIAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + + usage = SimpleNamespace(request_tokens=10, response_tokens=5, total_tokens=15) + response_msg = SimpleNamespace(kind="response") + tool_msg = SimpleNamespace(kind="tool-return", tool_name="calc", content=42) + result = SimpleNamespace( + data="ok", + usage=usage, + all_messages=[response_msg, tool_msg], + model_name="gpt-5", + ) + agent = _FakeAgent(name="planner", model="gpt-5", result=result) + adapter.instrument_agent(agent) + agent.run_sync("hi") + adapter.on_tool_use("t", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="openai", model="gpt-5", tokens_prompt=5) + adapter.on_handoff(from_agent="a", to_agent="b", context="ctx") diff --git a/tests/instrument/adapters/frameworks/test_semantic_kernel_adapter.py b/tests/instrument/adapters/frameworks/test_semantic_kernel_adapter.py index 6b4615c..b331d6e 100644 --- a/tests/instrument/adapters/frameworks/test_semantic_kernel_adapter.py +++ b/tests/instrument/adapters/frameworks/test_semantic_kernel_adapter.py @@ -5,12 +5,30 @@ suite of lifecycle 
hooks (``on_function_start``, ``on_model_invoke``, ``on_planner_step``, etc.) that are called by those filters. Tests exercise the lifecycle hooks directly + verify filter wiring. + +After the typed-event migration (PR #129 follow-up — bundle 5) every +emit site flows through :meth:`BaseAdapter.emit_event` with a canonical +Pydantic payload. The :class:`_RecordingStratix` stand-in below records +both shapes: the ``payload`` slot always carries a dict (model-dumped +if typed), and ``typed_payloads`` holds the original Pydantic instances +for tests that want to assert against the model surface. + +Notable mapping decision: the legacy ``agent.code`` event type used +for prompt rendering and planner steps is NOT in the canonical +13-event taxonomy. The migration re-maps those boundaries onto +:class:`ToolLogicEvent` (L5b — tool business logic). Test coverage +asserts the L5b fields (``description``, ``rules``) carry the +expected provenance. """ from __future__ import annotations from typing import Any, Dict, List +from layerlens._compat.pydantic import ( + BaseModel as _CompatBaseModel, + model_dump as _compat_model_dump, +) from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig from layerlens.instrument.adapters.frameworks.semantic_kernel import ( ADAPTER_CLASS, @@ -27,10 +45,22 @@ class _RecordingStratix: def __init__(self) -> None: self.events: List[Dict[str, Any]] = [] + # Hold strong references to the original typed payloads. + self.typed_payloads: List[Any] = [] def emit(self, *args: Any, **kwargs: Any) -> None: + # Two-arg legacy path: ``emit(event_type, payload_dict)``. if len(args) == 2 and isinstance(args[0], str): self.events.append({"event_type": args[0], "payload": args[1]}) + return + # Single-arg typed path: ``emit(payload_model[, privacy_level])``. 
+ if args and isinstance(args[0], _CompatBaseModel): + payload_model = args[0] + self.typed_payloads.append(payload_model) + event_type = getattr(payload_model, "event_type", "") + self.events.append( + {"event_type": event_type, "payload": _compat_model_dump(payload_model)} + ) class _FakeKernel: @@ -42,6 +72,15 @@ def add_filter(self, filter_type: str, filter_obj: Any) -> None: self._added_filters.append({"type": filter_type, "filter": filter_obj}) +def _rule_value(rules: List[str], key: str) -> str | None: + """Find the JSON-encoded value in the L5b rules list for a given key.""" + prefix = f"{key}=" + for rule in rules: + if rule.startswith(prefix): + return rule[len(prefix):] + return None + + def test_adapter_class_export() -> None: assert ADAPTER_CLASS is SemanticKernelAdapter @@ -65,6 +104,7 @@ def test_adapter_info_and_health() -> None: def test_instrument_kernel_registers_filters_and_discovers_plugins() -> None: + """Typed migration: plugin_name lives at payload.environment.attributes.""" stratix = _RecordingStratix() adapter = SemanticKernelAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -77,12 +117,13 @@ def test_instrument_kernel_registers_filters_and_discovers_plugins() -> None: # Plugin discovery emits environment.config events. 
configs = [e for e in stratix.events if e["event_type"] == "environment.config"] - plugin_names = {c["payload"].get("plugin_name") for c in configs} + plugin_names = {c["payload"]["environment"]["attributes"].get("plugin_name") for c in configs} assert "math" in plugin_names assert "search" in plugin_names def test_on_function_start_end_emits_tool_call() -> None: + """Typed migration: tool name lives at payload.tool.name; latency at payload.latency_ms.""" stratix = _RecordingStratix() adapter = SemanticKernelAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -91,13 +132,16 @@ def test_on_function_start_end_emits_tool_call() -> None: adapter.on_function_end(context=ctx, result=3) evt = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert evt["payload"]["tool_name"] == "math.add" - assert evt["payload"]["plugin_name"] == "math" - assert evt["payload"]["function_name"] == "add" - assert evt["payload"]["latency_ms"] >= 0 + payload = evt["payload"] + assert payload["tool"]["name"] == "math.add" + assert payload["tool"]["integration"] == "library" + assert payload["input"]["plugin_name"] == "math" + assert payload["input"]["function_name"] == "add" + assert payload["latency_ms"] >= 0 def test_on_model_invoke_emits_invoke_and_cost() -> None: + """Typed migration: model name at payload.model.name; tokens at payload.cost.tokens.""" stratix = _RecordingStratix() adapter = SemanticKernelAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -111,14 +155,26 @@ def test_on_model_invoke_emits_invoke_and_cost() -> None: ) invoke = next(e for e in stratix.events if e["event_type"] == "model.invoke") - assert invoke["payload"]["model"] == "gpt-5" - assert invoke["payload"]["latency_ms"] == 20.0 + inv_payload = invoke["payload"] + assert inv_payload["model"]["name"] == "gpt-5" + assert inv_payload["model"]["provider"] == "azure_openai" + assert inv_payload["latency_ms"] == 20.0 cost = next(e for e in 
stratix.events if e["event_type"] == "cost.record") - assert cost["payload"]["total_tokens"] == 15 + assert cost["payload"]["cost"]["tokens"] == 15 + assert cost["payload"]["cost"]["prompt_tokens"] == 10 + assert cost["payload"]["cost"]["completion_tokens"] == 5 + +def test_on_prompt_render_emits_tool_logic() -> None: + """Typed migration: agent.code → ToolLogicEvent (L5b). -def test_on_prompt_render_emits_agent_code() -> None: + The legacy adapter emitted an ad-hoc ``agent.code`` event for + prompt template rendering; that event type is NOT in the + canonical 13-event taxonomy. The post-migration mapping carries + rendering as L5b business logic with the rendering operation as + ``description`` and per-event provenance encoded as ``rules``. + """ stratix = _RecordingStratix() adapter = SemanticKernelAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -129,12 +185,21 @@ def test_on_prompt_render_emits_agent_code() -> None: function_name="greet", ) - evt = next(e for e in stratix.events if e["event_type"] == "agent.code") - assert evt["payload"]["event_subtype"] == "prompt_render" - assert evt["payload"]["function_name"] == "greet" + # agent.code is no longer emitted post-migration + types = [e["event_type"] for e in stratix.events] + assert "agent.code" not in types + assert "tool.logic" in types + + evt = next(e for e in stratix.events if e["event_type"] == "tool.logic") + payload = evt["payload"] + assert "prompt_render" in payload["logic"]["description"] + rules = payload["logic"]["rules"] + assert _rule_value(rules, "event_subtype") == '"prompt_render"' + assert _rule_value(rules, "function_name") == '"greet"' -def test_on_planner_step_emits_agent_code() -> None: +def test_on_planner_step_emits_tool_logic() -> None: + """Typed migration: agent.code → ToolLogicEvent for planner steps.""" stratix = _RecordingStratix() adapter = SemanticKernelAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -148,13 
+213,23 @@ def test_on_planner_step_emits_agent_code() -> None: status="completed", ) - evt = next(e for e in stratix.events if e["event_type"] == "agent.code") - assert evt["payload"]["event_subtype"] == "planner_step" - assert evt["payload"]["planner_type"] == "HandlebarsPlanner" - assert evt["payload"]["step_index"] == 1 + types = [e["event_type"] for e in stratix.events] + assert "agent.code" not in types + assert "tool.logic" in types + + evt = next(e for e in stratix.events if e["event_type"] == "tool.logic") + payload = evt["payload"] + assert "planner_step" in payload["logic"]["description"] + assert "HandlebarsPlanner" in payload["logic"]["description"] + rules = payload["logic"]["rules"] + assert _rule_value(rules, "event_subtype") == '"planner_step"' + assert _rule_value(rules, "planner_type") == '"HandlebarsPlanner"' + assert _rule_value(rules, "step_index") == "1" + assert _rule_value(rules, "action") == '"search"' def test_on_memory_operation_emits_tool_call() -> None: + """Typed migration: memory operations remain L5a tool.call (in-process API).""" stratix = _RecordingStratix() adapter = SemanticKernelAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -169,12 +244,15 @@ def test_on_memory_operation_emits_tool_call() -> None: ) evt = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert evt["payload"]["tool_name"] == "memory.search" - assert evt["payload"]["result_count"] == 3 - assert evt["payload"]["backend_type"] == "qdrant" + payload = evt["payload"] + assert payload["tool"]["name"] == "memory.search" + assert payload["tool"]["integration"] == "library" + assert payload["input"]["backend_type"] == "qdrant" + assert payload["output"]["result_count"] == 3 def test_on_kernel_invoke_start_end_emits_input_output() -> None: + """Typed migration: input/output text lives at payload.content.message.""" stratix = _RecordingStratix() adapter = SemanticKernelAdapter(stratix=stratix, 
capture_config=CaptureConfig.full()) adapter.connect() @@ -186,8 +264,8 @@ def test_on_kernel_invoke_start_end_emits_input_output() -> None: assert "agent.output" in types out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert out["payload"]["output"] == "world" - assert out["payload"]["duration_ns"] >= 0 + assert out["payload"]["content"]["message"] == "world" + assert out["payload"]["content"]["metadata"]["duration_ns"] >= 0 def test_capture_config_gates_l5a_tool_calls() -> None: @@ -216,3 +294,83 @@ def test_serialize_for_replay() -> None: assert rt.framework == "semantic_kernel" assert rt.adapter_name == "SemanticKernelAdapter" assert "capture_config" in rt.config + + +# --------------------------------------------------------------------------- +# Typed-event migration regression tests (PR #129 follow-up — bundle 5) +# --------------------------------------------------------------------------- + + +def test_semantic_kernel_emits_typed_payloads_only() -> None: + """Every emit site in semantic_kernel lifecycle.py is a typed + emit_event call. + + Pins the post-migration contract: the recording stratix's + ``typed_payloads`` list grows for every emission and the legacy + two-arg dict path receives nothing. 
+ """ + from layerlens.instrument._compat.events import ( + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + ToolLogicEvent, + AgentOutputEvent, + ModelInvokeEvent, + EnvironmentConfigEvent, + ) + + stratix = _RecordingStratix() + adapter = SemanticKernelAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + kernel = _FakeKernel(plugins={"math": object()}) + adapter.instrument_kernel(kernel) + + # Drive every emission path + ctx = adapter.on_function_start(plugin_name="math", function_name="add") + adapter.on_function_end(context=ctx, result=3) + adapter.on_prompt_render(template="hello", rendered_prompt="hi", function_name="greet") + adapter.on_model_invoke(provider="openai", model="gpt-5", prompt_tokens=10, completion_tokens=5) + adapter.on_planner_step(planner_type="HandlebarsPlanner", step_index=0, thought="t", action="a") + adapter.on_memory_operation(operation="search", collection="facts", query="q", result_count=3) + adapter.on_kernel_invoke_start(input_text="hello") + adapter.on_kernel_invoke_end(output="world") + + # Every captured payload is a Pydantic model instance — the legacy + # dict path was not used. + assert stratix.typed_payloads, "expected typed payloads to be captured" + types_seen = {type(p) for p in stratix.typed_payloads} + assert AgentInputEvent in types_seen + assert AgentOutputEvent in types_seen + assert CostRecordEvent in types_seen + assert EnvironmentConfigEvent in types_seen + assert ModelInvokeEvent in types_seen + assert ToolCallEvent in types_seen + assert ToolLogicEvent in types_seen + + +def test_semantic_kernel_emit_does_not_warn_after_migration() -> None: + """No DeprecationWarning fires from semantic_kernel lifecycle paths. + + The base adapter's ``emit_dict_event`` raises a DeprecationWarning + on every call. After migration, semantic_kernel lifecycle must + never trigger that warning. 
+ """ + import warnings + + stratix = _RecordingStratix() + adapter = SemanticKernelAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + kernel = _FakeKernel(plugins={"math": object()}) + adapter.instrument_kernel(kernel) + ctx = adapter.on_function_start(plugin_name="math", function_name="add") + adapter.on_function_end(context=ctx, result=3) + adapter.on_prompt_render(template="t", rendered_prompt="r", function_name="f") + adapter.on_model_invoke(provider="openai", model="gpt-5", prompt_tokens=10, completion_tokens=5) + adapter.on_planner_step(planner_type="P", step_index=0) + adapter.on_memory_operation(operation="search", collection="c", query="q", result_count=1) + adapter.on_kernel_invoke_start(input_text="hi") + adapter.on_kernel_invoke_end(output="bye") diff --git a/tests/instrument/adapters/frameworks/test_strands_adapter.py b/tests/instrument/adapters/frameworks/test_strands_adapter.py index ce728b7..e1a509d 100644 --- a/tests/instrument/adapters/frameworks/test_strands_adapter.py +++ b/tests/instrument/adapters/frameworks/test_strands_adapter.py @@ -2,6 +2,13 @@ Mocked at the SDK shape level — no real ``strands`` runtime needed. The adapter wraps ``invoke()`` (and ``__call__``); tests exercise ``invoke``. + +After the typed-event migration (PR #129 follow-up — bundle 5) every +emit site flows through :meth:`BaseAdapter.emit_event` with a canonical +Pydantic payload. The :class:`_RecordingStratix` stand-in below records +both shapes: the ``payload`` slot always carries a dict (model-dumped +if typed), and ``typed_payloads`` holds the original Pydantic instances +for tests that want to assert against the model surface. 
""" from __future__ import annotations @@ -11,6 +18,10 @@ import pytest +from layerlens._compat.pydantic import ( + BaseModel as _CompatBaseModel, + model_dump as _compat_model_dump, +) from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig from layerlens.instrument.adapters.frameworks.strands import ( ADAPTER_CLASS, @@ -28,10 +39,22 @@ class _RecordingStratix: def __init__(self) -> None: self.events: List[Dict[str, Any]] = [] + # Hold strong references to the original typed payloads. + self.typed_payloads: List[Any] = [] def emit(self, *args: Any, **kwargs: Any) -> None: + # Two-arg legacy path: ``emit(event_type, payload_dict)``. if len(args) == 2 and isinstance(args[0], str): self.events.append({"event_type": args[0], "payload": args[1]}) + return + # Single-arg typed path: ``emit(payload_model[, privacy_level])``. + if args and isinstance(args[0], _CompatBaseModel): + payload_model = args[0] + self.typed_payloads.append(payload_model) + event_type = getattr(payload_model, "event_type", "") + self.events.append( + {"event_type": event_type, "payload": _compat_model_dump(payload_model)} + ) class _FakeAgent: @@ -98,6 +121,7 @@ def test_instrument_agent_wraps_invoke() -> None: def test_invoke_emits_input_and_output_events() -> None: + """Typed migration: agent_name lives at payload.content.metadata.""" stratix = _RecordingStratix() adapter = StrandsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -113,11 +137,13 @@ def test_invoke_emits_input_and_output_events() -> None: assert "agent.output" in types out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert out["payload"]["agent_name"] == "planner" - assert out["payload"]["duration_ns"] >= 0 + metadata = out["payload"]["content"]["metadata"] + assert metadata["agent_name"] == "planner" + assert metadata["duration_ns"] >= 0 def test_invoke_extracts_usage_and_emits_cost() -> None: + """Typed migration: tokens at payload.cost.tokens; 
model name at payload.model.name.""" stratix = _RecordingStratix() adapter = StrandsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -133,10 +159,17 @@ def test_invoke_extracts_usage_and_emits_cost() -> None: assert "cost.record" in types cost = next(e for e in stratix.events if e["event_type"] == "cost.record") - assert cost["payload"]["tokens_total"] == 15 + assert cost["payload"]["cost"]["tokens"] == 15 + assert cost["payload"]["cost"]["prompt_tokens"] == 10 + assert cost["payload"]["cost"]["completion_tokens"] == 5 + + invoke = next(e for e in stratix.events if e["event_type"] == "model.invoke") + assert invoke["payload"]["model"]["name"] == "anthropic.claude-v2" + assert invoke["payload"]["model"]["provider"] == "bedrock" def test_invoke_failure_emits_output_with_error() -> None: + """Typed migration: error is on AgentOutputEvent.metadata, run_status=run_failed.""" stratix = _RecordingStratix() adapter = StrandsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -147,23 +180,38 @@ def test_invoke_failure_emits_output_with_error() -> None: agent.invoke("bad") out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert "error" in out["payload"] - assert "simulated failure" in out["payload"]["error"] + metadata = out["payload"]["content"]["metadata"] + assert "error" in metadata + assert "simulated failure" in metadata["error"] + assert metadata["run_status"] == "run_failed" def test_on_tool_use_emits_event() -> None: + """Typed migration: tool name at payload.tool.name; latency at payload.latency_ms.""" stratix = _RecordingStratix() adapter = StrandsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter.on_tool_use("calc", tool_input={"x": 1}, tool_output=2, latency_ms=12.3) evt = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert evt["payload"]["tool_name"] == "calc" + assert evt["payload"]["tool"]["name"] == "calc" + 
assert evt["payload"]["tool"]["integration"] == "library" assert evt["payload"]["latency_ms"] == 12.3 + # Scalar tool output is wrapped in {"value": ...}. + assert evt["payload"]["output"] == {"value": 2} def test_capture_config_gates_l3_model_metadata() -> None: - """When l3_model_metadata is disabled, model.invoke does NOT fire (state.change still does).""" + """When l3_model_metadata is disabled, model.invoke does NOT fire. + + Post-migration: the previous adapter emitted an ad-hoc + ``agent.state.change`` payload at the run boundary to carry a + run_complete / run_failed marker. That payload is no longer + emitted (it did not satisfy the canonical + AgentStateChangeEvent contract); the marker now lives on + AgentOutputEvent.metadata.run_status. So this test now asserts + only that model.invoke is gated and agent.input/output still fire. + """ stratix = _RecordingStratix() cfg = CaptureConfig(l3_model_metadata=False) adapter = StrandsAdapter(stratix=stratix, capture_config=cfg) @@ -175,11 +223,17 @@ def test_capture_config_gates_l3_model_metadata() -> None: types = [e["event_type"] for e in stratix.events] assert "model.invoke" not in types - # state.change is cross-cutting — always fires. - assert "agent.state.change" in types + # agent.state.change is no longer emitted post-migration + assert "agent.state.change" not in types + # Run boundary signal is preserved on AgentOutputEvent.metadata. 
+ assert "agent.input" in types + assert "agent.output" in types + out = next(e for e in stratix.events if e["event_type"] == "agent.output") + assert out["payload"]["content"]["metadata"]["run_status"] == "run_complete" def test_environment_config_emits_once_per_agent() -> None: + """Typed migration: tools live at payload.environment.attributes.""" stratix = _RecordingStratix() adapter = StrandsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -190,7 +244,39 @@ def test_environment_config_emits_once_per_agent() -> None: configs = [e for e in stratix.events if e["event_type"] == "environment.config"] assert len(configs) == 1 - assert configs[0]["payload"]["tools"] == ["search"] + attributes = configs[0]["payload"]["environment"]["attributes"] + assert attributes["tools"] == ["search"] + assert configs[0]["payload"]["environment"]["type"] == "simulated" + + +def test_conversation_state_carried_on_agent_output_metadata() -> None: + """When the agent has a conversation manager, conversation_state metadata fires. + + The previous adapter emitted an ad-hoc agent.state.change payload + with event_subtype=conversation_update + turn_count. That payload + did not satisfy the canonical AgentStateChangeEvent before_hash / + after_hash contract. The post-migration mapping carries the + conversation-progress signal as agent.output metadata. + """ + stratix = _RecordingStratix() + adapter = StrandsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + conversation = SimpleNamespace(turn_count=3, messages=[]) + agent = _FakeAgent(name="c1", model="claude", conversation=conversation) + adapter.instrument_agent(agent) + agent.invoke("hi") + + types = [e["event_type"] for e in stratix.events] + assert "agent.state.change" not in types + # Conversation state is carried on an agent.output emission. 
+ convo_outputs = [ + e for e in stratix.events + if e["event_type"] == "agent.output" + and e["payload"]["content"].get("metadata", {}).get("conversation_state") == "conversation_update" + ] + assert convo_outputs, "expected conversation_update marker on agent.output metadata" + assert convo_outputs[0]["payload"]["content"]["metadata"]["turn_count"] == 3 def test_instrument_agent_helper() -> None: @@ -210,3 +296,103 @@ def test_serialize_for_replay() -> None: assert rt.framework == "strands" assert rt.adapter_name == "StrandsAdapter" assert "capture_config" in rt.config + + +# --------------------------------------------------------------------------- +# Typed-event migration regression tests (PR #129 follow-up — bundle 5) +# --------------------------------------------------------------------------- + + +def test_strands_emits_typed_payloads_only() -> None: + """Every emit site in strands lifecycle.py is a typed + emit_event call. + + Pins the post-migration contract: the recording stratix's + ``typed_payloads`` list grows for every emission and the legacy + two-arg dict path receives nothing. + """ + from layerlens.instrument._compat.events import ( + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + AgentOutputEvent, + ModelInvokeEvent, + EnvironmentConfigEvent, + ) + + stratix = _RecordingStratix() + adapter = StrandsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + # Drive every emission path: instrument_agent triggers + # environment.config; invoke drives agent.input/output and + # _extract_run_details drives model.invoke + cost.record + tool.call. 
+ usage = SimpleNamespace(inputTokens=10, outputTokens=5, totalTokens=15) + tool_result = SimpleNamespace(name="calc", input={"x": 1}, output=2) + conversation = SimpleNamespace(turn_count=3, messages=[]) + result = SimpleNamespace( + content="ok", + text=None, + usage=usage, + tool_results=[tool_result], + ) + agent = _FakeAgent( + name="planner", + model="anthropic.claude-v2", + result=result, + conversation=conversation, + ) + adapter.instrument_agent(agent) + agent.invoke("hi") + + # Direct lifecycle hooks + adapter.on_tool_use("ext_tool", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="bedrock", model="claude", tokens_prompt=5) + + # Every captured payload is a Pydantic model instance — the legacy + # dict path was not used. + assert stratix.typed_payloads, "expected typed payloads to be captured" + types_seen = {type(p) for p in stratix.typed_payloads} + assert AgentInputEvent in types_seen + assert AgentOutputEvent in types_seen + assert CostRecordEvent in types_seen + assert EnvironmentConfigEvent in types_seen + assert ModelInvokeEvent in types_seen + assert ToolCallEvent in types_seen + + +def test_strands_emit_does_not_warn_after_migration() -> None: + """No DeprecationWarning fires from strands lifecycle paths. + + The base adapter's ``emit_dict_event`` raises a DeprecationWarning + on every call. After migration, strands lifecycle must never + trigger that warning. 
+ """ + import warnings + + stratix = _RecordingStratix() + adapter = StrandsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + + usage = SimpleNamespace(inputTokens=10, outputTokens=5, totalTokens=15) + tool_result = SimpleNamespace(name="calc", input={"x": 1}, output=2) + conversation = SimpleNamespace(turn_count=3, messages=[]) + result = SimpleNamespace( + content="ok", + text=None, + usage=usage, + tool_results=[tool_result], + ) + agent = _FakeAgent( + name="planner", + model="anthropic.claude-v2", + result=result, + conversation=conversation, + ) + adapter.instrument_agent(agent) + agent.invoke("hi") + adapter.on_tool_use("t", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="bedrock", model="claude", tokens_prompt=5)