From 612ccce5767f9094820b09f6a21928f5d4be1806 Mon Sep 17 00:00:00 2001 From: mmercuri Date: Sun, 10 May 2026 10:42:10 -0700 Subject: [PATCH 1/2] feat(instrument): migrate bedrock_agents to typed events (Bundle #4) Replace all 13 emit_dict_event() call sites in bedrock_agents/lifecycle.py with typed Pydantic payloads from layerlens.instrument._compat.events. Per-emission mapping: - agent.input/output (boto3 invoke pre/post hooks + on_invoke_start/end) -> AgentInputEvent / AgentOutputEvent (role=HUMAN/AGENT, framework provenance + raw_input/raw_output on MessageContent.metadata) - ACTION_GROUP and KNOWLEDGE_BASE trace steps + on_tool_use -> ToolCallEvent (integration=SERVICE for AWS-managed action groups and knowledge bases; integration=LIBRARY for the generic on_tool_use hook). Bedrock-specific provenance (framework, tool_type) folded onto the canonical input dict. - MODEL_INVOCATION trace step + on_llm_call -> ModelInvokeEvent (provider='aws_bedrock', version='unavailable', framework on parameters; canonical prompt_tokens/completion_tokens slots; paired CostRecordEvent when usage is present). - AGENT_COLLABORATOR trace step + on_handoff -> AgentHandoffEvent with deterministic sha256: handoff_context_hash (hashes the supervisor/collaborator/reason tuple for trace steps; hashes the context string for manual handoffs, including the empty-string fallback). - environment.config (per-agent, idempotent via _seen_agents) -> EnvironmentConfigEvent (env_type=CLOUD; agent_id, agent_alias_id, enable_trace on attributes). Set ALLOW_UNREGISTERED_EVENTS = False -- bedrock_agents targets the canonical 13-event taxonomy exclusively. Test suite (14 tests, 12 pre-existing + 2 new regression): - All pre-existing assertions updated for canonical payload shape (e.g. payload['tool']['name'] instead of payload['tool_name']). - _RecordingStratix doubles record both legacy dict and typed Pydantic emissions (matches PR #138 pattern). - New: test_bedrock_agents_emits_typed_payloads_only -- asserts every emit site is an instance of the expected typed model (AgentInputEvent, AgentOutputEvent, AgentHandoffEvent, EnvironmentConfigEvent, ModelInvokeEvent, ToolCallEvent, CostRecordEvent). - New: test_bedrock_agents_emit_does_not_warn_after_migration -- filterwarnings('error', DeprecationWarning) catches any residual emit_dict_event call. Acceptance: - grep emit_dict_event src/.../bedrock_agents/ -> 0 occurrences - mypy --strict src/.../bedrock_agents -> clean - pytest tests/.../test_bedrock_agents_adapter.py -> 14/14 pass --- .../frameworks/bedrock_agents/lifecycle.py | 475 +++++++++++++----- .../frameworks/test_bedrock_agents_adapter.py | 249 ++++++++- 2 files changed, 596 insertions(+), 128 deletions(-) diff --git a/src/layerlens/instrument/adapters/frameworks/bedrock_agents/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/bedrock_agents/lifecycle.py index aa7dbea..1f90771 100644 --- a/src/layerlens/instrument/adapters/frameworks/bedrock_agents/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/bedrock_agents/lifecycle.py @@ -8,6 +8,24 @@ Knowledge Base query → tool.call (L5a, retrieval) Model invocation → model.invoke (L3) Supervisor→Collaborator → agent.handoff (Cross) + +Typed-event status (post PR #129 migration, bundle 4): + +* Every emission flows through :meth:`BaseAdapter.emit_event` with a + canonical Pydantic payload imported from + :mod:`layerlens.instrument._compat.events`. +* Bedrock-specific provenance (``framework``, ``agent_id``, + ``session_id``, ``timestamp_ns``, ``duration_ns``, ``enable_trace``) + is carried in the canonical model's metadata / attributes / + parameters / input slots — the canonical schema does not expose + these as top-level fields. +* Action group + knowledge base steps map to + :class:`ToolCallEvent` with ``integration=IntegrationType.SERVICE`` + (AWS-side execution, not in-process library). +* The handoff context hash is generated via SHA-256 over the context + string (or the empty string when no context is available) so the + canonical :class:`AgentHandoffEvent.handoff_context_hash` validator + always passes. """ from __future__ import annotations @@ -19,6 +37,18 @@ import threading from typing import Any +from layerlens.instrument._compat.events import ( + MessageRole, + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + EnvironmentType, + IntegrationType, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, +) from layerlens.instrument.adapters._base.adapter import ( AdapterInfo, BaseAdapter, @@ -32,6 +62,39 @@ logger = logging.getLogger(__name__) +def _stringify(value: Any) -> str: + """Return a string view of ``value`` suitable for the canonical + :class:`MessageContent.message` field. + + The canonical schema requires :class:`AgentInputEvent` and + :class:`AgentOutputEvent` to carry a ``message: str``. Bedrock + callbacks deliver inputs/outputs as arbitrary Python objects + (parsed boto3 dicts, ``None``); this helper converts each to a + (possibly empty) string so the typed event always validates. The + original payload is preserved on + :class:`MessageContent.metadata.raw_input` / ``raw_output``. + """ + if value is None: + return "" + if isinstance(value, str): + return value + return str(value) + + +def _sha256_of(value: str) -> str: + """Return a canonical ``sha256:`` hash string for ``value``. + + The canonical schema's :class:`AgentHandoffEvent` requires + ``handoff_context_hash`` to start with ``sha256:`` and have a + 64-character hex tail (see + ``ateam/stratix/core/events/cross_cutting.py``). Centralising the + format here ensures every emit site uses the same wire format — + including the empty-string fallback used when Bedrock has no + handoff context to hash. + """ + return "sha256:" + hashlib.sha256(value.encode("utf-8")).hexdigest() + + class BedrockAgentsAdapter(BaseAdapter): """LayerLens adapter for AWS Bedrock Agents.""" @@ -40,9 +103,17 @@ class BedrockAgentsAdapter(BaseAdapter): # The adapter source has no direct ``pydantic`` imports (verified by # grep across ``frameworks/bedrock_agents/``). Bedrock Agents is a # remote AWS service consumed via boto3 hooks — boto3 does not use - # Pydantic. Adapter emits plain dict events. + # Pydantic. Adapter emits typed events through the canonical + # schema (PR #129), never touching the framework's Pydantic models. requires_pydantic = PydanticCompat.V1_OR_V2 + # Per-adapter ``extra="allow"`` decision: bedrock_agents targets the + # canonical 13-event taxonomy exclusively. Unknown event types must + # be rejected by the base adapter's typed-event validator, so this + # stays ``False``. See ``docs/adapters/typed-events.md`` for the + # opt-in policy. + ALLOW_UNREGISTERED_EVENTS: bool = False + def __init__( self, stratix: Any | None = None, @@ -147,6 +218,14 @@ def instrument_client(self, client: Any) -> Any: # --- boto3 Event Hooks --- def _before_invoke_agent(self, **kwargs: Any) -> None: + """Emit a typed :class:`AgentInputEvent` for an invoke_agent boto3 call. + + Bedrock-specific provenance (``framework``, ``agent_id``, + ``session_id``, ``enable_trace``, ``timestamp_ns``) is carried + on :class:`MessageContent.metadata`. The canonical ``message`` + field carries the inbound input text (or empty string when + Bedrock supplies no inputText). + """ if not self._connected: return try: @@ -157,21 +236,32 @@ def _before_invoke_agent(self, **kwargs: Any) -> None: self._invoke_starts[tid] = start_ns agent_id = params.get("agentId", "unknown") self._emit_agent_config(agent_id, params) - self.emit_dict_event( - "agent.input", - { - "framework": "bedrock_agents", - "agent_id": agent_id, - "session_id": params.get("sessionId"), - "input": params.get("inputText"), - "enable_trace": params.get("enableTrace", False), - "timestamp_ns": start_ns, - }, + input_text = params.get("inputText") + self.emit_event( + AgentInputEvent.create( + message=_stringify(input_text), + role=MessageRole.HUMAN, + metadata={ + "framework": "bedrock_agents", + "agent_id": agent_id, + "session_id": params.get("sessionId"), + "enable_trace": params.get("enableTrace", False), + "timestamp_ns": start_ns, + "raw_input": input_text, + }, + ) ) except Exception: logger.warning("Error in _before_invoke_agent", exc_info=True) def _after_invoke_agent(self, **kwargs: Any) -> None: + """Emit a typed :class:`AgentOutputEvent` for an invoke_agent response. + + The canonical ``message`` slot carries the extracted completion + text; Bedrock-specific provenance (``framework``, + ``session_id``, ``duration_ns``, ``raw_output``) lives on + :class:`MessageContent.metadata`. + """ if not self._connected: return try: @@ -182,14 +272,16 @@ def _after_invoke_agent(self, **kwargs: Any) -> None: start_ns = self._invoke_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 output = self._extract_completion(parsed) - self.emit_dict_event( - "agent.output", - { - "framework": "bedrock_agents", - "output": output, - "duration_ns": duration_ns, - "session_id": parsed.get("sessionId"), - }, + self.emit_event( + AgentOutputEvent.create( + message=_stringify(output), + metadata={ + "framework": "bedrock_agents", + "session_id": parsed.get("sessionId"), + "duration_ns": duration_ns, + "raw_output": output, + }, + ) ) # Extract trace steps if available self._process_trace(parsed) @@ -215,71 +307,154 @@ def _process_trace(self, parsed: dict[str, Any]) -> None: self._emit_collaborator_handoff(step) def _emit_action_group(self, step: dict[str, Any]) -> None: + """Emit a typed :class:`ToolCallEvent` for an action group invocation. + + Bedrock action groups are AWS-managed Lambda functions + invoked by the agent — this maps to ``integration=SERVICE`` + (the closest canonical match: action groups run as managed + cloud services, not in-process libraries). Bedrock-specific + provenance (``framework``, ``tool_type``) is folded into the + canonical ``input`` dict. + """ action = step.get("actionGroupInvocationOutput", {}) - self.emit_dict_event( - "tool.call", - { - "framework": "bedrock_agents", - "tool_name": step.get("actionGroupName", "unknown"), - "tool_input": self._safe_serialize(step.get("actionGroupInput")), - "tool_output": self._safe_serialize(action.get("output")), - "tool_type": "action_group", - }, + action_input = self._safe_serialize(step.get("actionGroupInput")) + action_output = self._safe_serialize(action.get("output")) + input_data: dict[str, Any] + if isinstance(action_input, dict): + input_data = dict(action_input) + elif action_input is None: + input_data = {} + else: + input_data = {"value": action_input} + input_data["framework"] = "bedrock_agents" + input_data["tool_type"] = "action_group" + output_data: dict[str, Any] | None + if isinstance(action_output, dict): + output_data = dict(action_output) + elif action_output is None: + output_data = None + else: + output_data = {"value": action_output} + self.emit_event( + ToolCallEvent.create( + name=step.get("actionGroupName", "unknown"), + version="unavailable", + integration=IntegrationType.SERVICE, + input_data=input_data, + output_data=output_data, + ) ) def _emit_knowledge_base(self, step: dict[str, Any]) -> None: + """Emit a typed :class:`ToolCallEvent` for a knowledge base lookup. + + Knowledge base retrievals are AWS-managed services + (``integration=SERVICE``). The retrieved references list + moves into the canonical ``output`` dict, and Bedrock-specific + provenance (``framework``, ``tool_type``) is folded onto the + canonical ``input``. + """ kb = step.get("knowledgeBaseLookupOutput", {}) - self.emit_dict_event( - "tool.call", - { - "framework": "bedrock_agents", - "tool_name": step.get("knowledgeBaseId", "knowledge_base"), - "tool_input": self._safe_serialize(step.get("knowledgeBaseLookupInput")), - "tool_output": self._safe_serialize(kb.get("retrievedReferences")), - "tool_type": "knowledge_base_retrieval", - }, + kb_input = self._safe_serialize(step.get("knowledgeBaseLookupInput")) + retrieved = self._safe_serialize(kb.get("retrievedReferences")) + input_data: dict[str, Any] + if isinstance(kb_input, dict): + input_data = dict(kb_input) + elif kb_input is None: + input_data = {} + else: + input_data = {"value": kb_input} + input_data["framework"] = "bedrock_agents" + input_data["tool_type"] = "knowledge_base_retrieval" + output_data: dict[str, Any] | None + if isinstance(retrieved, dict): + output_data = dict(retrieved) + elif retrieved is None: + output_data = None + else: + output_data = {"value": retrieved} + self.emit_event( + ToolCallEvent.create( + name=step.get("knowledgeBaseId", "knowledge_base"), + version="unavailable", + integration=IntegrationType.SERVICE, + input_data=input_data, + output_data=output_data, + ) ) def _emit_model_invocation(self, step: dict[str, Any]) -> None: + """Emit typed :class:`ModelInvokeEvent` (and :class:`CostRecordEvent`). + + AWS Bedrock is the canonical provider here (``aws_bedrock``); + the model identifier is the foundation model id (e.g. + ``anthropic.claude-v2``). The canonical schema requires + ``provider`` + ``name``; ``version`` falls back to + ``"unavailable"`` per the NORMATIVE rule. Bedrock-specific + provenance (``framework``) is carried on + :attr:`ModelInfo.parameters`. Token usage is mirrored onto the + canonical ``prompt_tokens`` / ``completion_tokens`` slots, and + a paired :class:`CostRecordEvent` is emitted when the + underlying Bedrock response contains usage metrics. + """ invocation = step.get("modelInvocationOutput", {}) - payload: dict[str, Any] = { - "framework": "bedrock_agents", - "provider": "aws_bedrock", - } - model_id = step.get("foundationModel") - if model_id: - payload["model"] = model_id + model_id = step.get("foundationModel") or "unknown" usage = invocation.get("usage", {}) + prompt_tokens = usage.get("inputTokens") if usage else None + completion_tokens = usage.get("outputTokens") if usage else None + self.emit_event( + ModelInvokeEvent.create( + provider="aws_bedrock", + name=model_id, + version="unavailable", + parameters={"framework": "bedrock_agents"}, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + ) + ) if usage: - payload["tokens_prompt"] = usage.get("inputTokens") - payload["tokens_completion"] = usage.get("outputTokens") - self.emit_dict_event("model.invoke", payload) - if usage: - self.emit_dict_event( - "cost.record", - { - "framework": "bedrock_agents", - "model": model_id, - "tokens_prompt": usage.get("inputTokens"), - "tokens_completion": usage.get("outputTokens"), - "tokens_total": (usage.get("inputTokens") or 0) + (usage.get("outputTokens") or 0), - }, + total = (usage.get("inputTokens") or 0) + (usage.get("outputTokens") or 0) + self.emit_event( + CostRecordEvent.create( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + tokens=total, + ) ) def _emit_collaborator_handoff(self, step: dict[str, Any]) -> None: - self.emit_dict_event( - "agent.handoff", - { - "from_agent": step.get("supervisorAgentId", "supervisor"), - "to_agent": step.get("collaboratorAgentId", "collaborator"), - "reason": "supervisor_delegation", - "framework": "bedrock_agents", - }, + """Emit a typed :class:`AgentHandoffEvent` for a Bedrock supervisor + delegation. + + The canonical schema requires ``handoff_context_hash`` in + ``sha256:`` format. Bedrock supervisor→collaborator + steps do not carry an explicit context payload, so the hash + is computed deterministically from the + (supervisor, collaborator, reason) tuple — this preserves the + wire-format guarantee while remaining stable across replays. + """ + from_agent = step.get("supervisorAgentId", "supervisor") + to_agent = step.get("collaboratorAgentId", "collaborator") + reason = "supervisor_delegation" + self.emit_event( + AgentHandoffEvent.create( + from_agent=from_agent, + to_agent=to_agent, + handoff_context_hash=_sha256_of( + f"{reason}::{from_agent}::{to_agent}" + ), + ) ) # --- Lifecycle Hooks --- def on_invoke_start(self, agent_id: str | None = None, input_text: str | None = None) -> None: + """Emit a typed :class:`AgentInputEvent` for a manual invoke start. + + Bedrock-specific provenance (``framework``, ``agent_id``, + ``timestamp_ns``, ``raw_input``) lives on + :class:`MessageContent.metadata`. + """ if not self._connected: return try: @@ -287,14 +462,17 @@ def on_invoke_start(self, agent_id: str | None = None, input_text: str | None = start_ns = time.time_ns() with self._adapter_lock: self._invoke_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "bedrock_agents", - "agent_id": agent_id, - "input": input_text, - "timestamp_ns": start_ns, - }, + self.emit_event( + AgentInputEvent.create( + message=_stringify(input_text), + role=MessageRole.HUMAN, + metadata={ + "framework": "bedrock_agents", + "agent_id": agent_id, + "timestamp_ns": start_ns, + "raw_input": input_text, + }, + ) ) except Exception: logger.warning("Error in on_invoke_start", exc_info=True) @@ -305,6 +483,12 @@ def on_invoke_end( output: Any = None, error: Exception | None = None, ) -> None: + """Emit a typed :class:`AgentOutputEvent` for a manual invoke end. + + Bedrock-specific provenance (``framework``, ``agent_id``, + ``duration_ns``, ``raw_output``, ``error``, ``run_status``) + lives on :class:`MessageContent.metadata`. + """ if not self._connected: return try: @@ -313,15 +497,22 @@ def on_invoke_end( with self._adapter_lock: start_ns = self._invoke_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - payload: dict[str, Any] = { + serialised_output = self._safe_serialize(output) + metadata: dict[str, Any] = { "framework": "bedrock_agents", "agent_id": agent_id, - "output": self._safe_serialize(output), "duration_ns": duration_ns, + "raw_output": serialised_output, + "run_status": "run_failed" if error else "run_complete", } if error: - payload["error"] = str(error) - self.emit_dict_event("agent.output", payload) + metadata["error"] = str(error) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(serialised_output), + metadata=metadata, + ) + ) except Exception: logger.warning("Error in on_invoke_end", exc_info=True) @@ -333,20 +524,47 @@ def on_tool_use( error: Exception | None = None, latency_ms: float | None = None, ) -> None: + """Emit a typed :class:`ToolCallEvent` for a tool invocation. + + Bedrock surfaces tool calls primarily through action groups + (cloud services). The manual ``on_tool_use`` hook is generic, + so ``integration`` defaults to :attr:`IntegrationType.LIBRARY` + — callers needing a different integration kind can wrap the + adapter and emit a typed event directly. Bedrock-specific + provenance (``framework``) is folded onto the canonical + ``input``. + """ if not self._connected: return try: - payload: dict[str, Any] = { - "framework": "bedrock_agents", - "tool_name": tool_name, - "tool_input": self._safe_serialize(tool_input), - "tool_output": self._safe_serialize(tool_output), - } - if error: - payload["error"] = str(error) - if latency_ms is not None: - payload["latency_ms"] = latency_ms - self.emit_dict_event("tool.call", payload) + serialised_input = self._safe_serialize(tool_input) + serialised_output = self._safe_serialize(tool_output) + input_data: dict[str, Any] + if isinstance(serialised_input, dict): + input_data = dict(serialised_input) + elif serialised_input is None: + input_data = {} + else: + input_data = {"value": serialised_input} + input_data["framework"] = "bedrock_agents" + output_data: dict[str, Any] | None + if isinstance(serialised_output, dict): + output_data = dict(serialised_output) + elif serialised_output is None: + output_data = None + else: + output_data = {"value": serialised_output} + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + error=str(error) if error else None, + latency_ms=latency_ms, + ) + ) except Exception: logger.warning("Error in on_tool_use", exc_info=True) @@ -359,39 +577,51 @@ def on_llm_call( latency_ms: float | None = None, messages: list[dict[str, str]] | None = None, ) -> None: + """Emit a typed :class:`ModelInvokeEvent` for a manual LLM call. + + Bedrock-specific provenance (``framework``) is carried on + :attr:`ModelInfo.parameters`. Provider falls back to + ``aws_bedrock`` when the caller supplies none — the manual + hook is exclusively for Bedrock-routed model calls. + """ if not self._connected: return try: - payload: dict[str, Any] = {"framework": "bedrock_agents"} - if provider: - payload["provider"] = provider - if model: - payload["model"] = model - if tokens_prompt is not None: - payload["tokens_prompt"] = tokens_prompt - if tokens_completion is not None: - payload["tokens_completion"] = tokens_completion - if latency_ms is not None: - payload["latency_ms"] = latency_ms - if self._capture_config.capture_content and messages: - payload["messages"] = messages - self.emit_dict_event("model.invoke", payload) + self.emit_event( + ModelInvokeEvent.create( + provider=provider or "aws_bedrock", + name=model or "unknown", + version="unavailable", + parameters={"framework": "bedrock_agents"}, + prompt_tokens=tokens_prompt, + completion_tokens=tokens_completion, + latency_ms=latency_ms, + input_messages=messages + if (self._capture_config.capture_content and messages) + else None, + ) + ) except Exception: logger.warning("Error in on_llm_call", exc_info=True) def on_handoff(self, from_agent: str, to_agent: str, context: Any = None) -> None: + """Emit a typed :class:`AgentHandoffEvent` for a manual handoff. + + Empty contexts are still hashed (over the empty string) so + the canonical wire format is uniform — the previous adapter + emitted ``None`` when context was missing, which the canonical + validator rejects. + """ if not self._connected: return try: context_str = str(context) if context else "" - self.emit_dict_event( - "agent.handoff", - { - "from_agent": from_agent, - "to_agent": to_agent, - "reason": "supervisor_delegation", - "context_hash": hashlib.sha256(context_str.encode()).hexdigest() if context_str else None, - }, + self.emit_event( + AgentHandoffEvent.create( + from_agent=from_agent, + to_agent=to_agent, + handoff_context_hash=_sha256_of(context_str), + ) ) except Exception: logger.warning("Error in on_handoff", exc_info=True) @@ -424,18 +654,27 @@ def _extract_completion(self, parsed: dict[str, Any]) -> str | None: return None def _emit_agent_config(self, agent_id: str, params: dict[str, Any]) -> None: + """Emit a typed :class:`EnvironmentConfigEvent` once per agent. + + Bedrock Agents runs in AWS, so the canonical + :attr:`EnvironmentType.CLOUD` enum value is used. Agent + configuration (``agent_id``, ``agent_alias_id``, + ``enable_trace``) lives on :attr:`EnvironmentInfo.attributes`. + """ with self._adapter_lock: if agent_id in self._seen_agents: return self._seen_agents.add(agent_id) - self.emit_dict_event( - "environment.config", - { - "framework": "bedrock_agents", - "agent_id": agent_id, - "agent_alias_id": params.get("agentAliasId"), - "enable_trace": params.get("enableTrace", False), - }, + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.CLOUD, + attributes={ + "framework": "bedrock_agents", + "agent_id": agent_id, + "agent_alias_id": params.get("agentAliasId"), + "enable_trace": params.get("enableTrace", False), + }, + ) ) def _safe_serialize(self, value: Any) -> Any: diff --git a/tests/instrument/adapters/frameworks/test_bedrock_agents_adapter.py b/tests/instrument/adapters/frameworks/test_bedrock_agents_adapter.py index b84f8f3..bcc4059 100644 --- a/tests/instrument/adapters/frameworks/test_bedrock_agents_adapter.py +++ b/tests/instrument/adapters/frameworks/test_bedrock_agents_adapter.py @@ -2,12 +2,24 @@ Mocked at the SDK shape level — no real ``boto3`` runtime needed. The adapter integrates via boto3 event hooks: ``client.meta.events.register(...)``. + +After the typed-event migration (PR #129 follow-up — bundle 4) every +emit site flows through :meth:`BaseAdapter.emit_event` with a canonical +Pydantic payload. The :class:`_RecordingStratix` stand-in below records +both shapes so pre- and post-migration assertions live side by side: the +``payload`` slot always carries a dict (model-dumped if typed), and +``typed_payloads`` holds the original Pydantic instances for tests that +want to assert against the model surface. """ from __future__ import annotations from typing import Any, Dict, List, Tuple, Callable +from layerlens._compat.pydantic import ( + BaseModel as _CompatBaseModel, + model_dump as _compat_model_dump, +) from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig from layerlens.instrument.adapters.frameworks.bedrock_agents import ( ADAPTER_CLASS, @@ -25,10 +37,26 @@ class _RecordingStratix: def __init__(self) -> None: self.events: List[Dict[str, Any]] = [] + # Hold strong references to the original typed payloads for + # the subset of tests that want to assert against the model + # surface (e.g. ``isinstance(payload, ToolCallEvent)``). The + # dict view lives on ``events`` and is what most assertions + # read. + self.typed_payloads: List[Any] = [] def emit(self, *args: Any, **kwargs: Any) -> None: + # Two-arg legacy path: ``emit(event_type, payload_dict)``. if len(args) == 2 and isinstance(args[0], str): self.events.append({"event_type": args[0], "payload": args[1]}) + return + # Single-arg typed path: ``emit(payload_model[, privacy_level])``. + if args and isinstance(args[0], _CompatBaseModel): + payload_model = args[0] + self.typed_payloads.append(payload_model) + event_type = getattr(payload_model, "event_type", "") + self.events.append( + {"event_type": event_type, "payload": _compat_model_dump(payload_model)} + ) class _FakeEventSystem: @@ -104,6 +132,9 @@ def test_disconnect_unregisters_event_hooks() -> None: def test_before_invoke_emits_input_event() -> None: + """Typed AgentInputEvent: input lives on payload.content.message; + Bedrock-specific provenance lives on MessageContent.metadata. + """ stratix = _RecordingStratix() adapter = BedrockAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -126,11 +157,29 @@ def test_before_invoke_emits_input_event() -> None: assert "agent.input" in types inp = next(e for e in stratix.events if e["event_type"] == "agent.input") - assert inp["payload"]["agent_id"] == "agent-123" - assert inp["payload"]["input"] == "hello" + payload = inp["payload"] + assert payload["layer"] == "L1" + assert payload["content"]["message"] == "hello" + assert payload["content"]["role"] == "human" + metadata = payload["content"]["metadata"] + assert metadata["framework"] == "bedrock_agents" + assert metadata["agent_id"] == "agent-123" + assert metadata["session_id"] == "sess-1" + assert metadata["enable_trace"] is True + + # environment.config carries the agent config on attributes. + cfg = next(e for e in stratix.events if e["event_type"] == "environment.config") + cfg_payload = cfg["payload"] + assert cfg_payload["layer"] == "L4a" + assert cfg_payload["environment"]["type"] == "cloud" + assert cfg_payload["environment"]["attributes"]["agent_id"] == "agent-123" + assert cfg_payload["environment"]["attributes"]["agent_alias_id"] == "alias-1" def test_after_invoke_emits_output_and_processes_trace() -> None: + """Typed AgentOutputEvent + ToolCallEvent + ModelInvokeEvent + + CostRecordEvent + AgentHandoffEvent for a complete Bedrock response. + """ stratix = _RecordingStratix() adapter = BedrockAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -171,34 +220,76 @@ def test_after_invoke_emits_output_and_processes_trace() -> None: assert "agent.handoff" in types out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert out["payload"]["output"] == "the answer is 42" + out_payload = out["payload"] + assert out_payload["layer"] == "L1" + assert out_payload["content"]["message"] == "the answer is 42" + assert out_payload["content"]["role"] == "agent" + assert out_payload["content"]["metadata"]["framework"] == "bedrock_agents" + assert out_payload["content"]["metadata"]["session_id"] == "sess-1" model = next(e for e in stratix.events if e["event_type"] == "model.invoke") - assert model["payload"]["model"] == "anthropic.claude-v2" - assert model["payload"]["tokens_prompt"] == 100 + m_payload = model["payload"] + assert m_payload["layer"] == "L3" + assert m_payload["model"]["provider"] == "aws_bedrock" + assert m_payload["model"]["name"] == "anthropic.claude-v2" + assert m_payload["model"]["version"] == "unavailable" + assert m_payload["prompt_tokens"] == 100 + assert m_payload["completion_tokens"] == 50 + + tool = next(e for e in stratix.events if e["event_type"] == "tool.call") + t_payload = tool["payload"] + assert t_payload["layer"] == "L5a" + assert t_payload["tool"]["name"] == "calc" + assert t_payload["tool"]["integration"] == "service" + assert t_payload["input"]["x"] == 1 + assert t_payload["input"]["framework"] == "bedrock_agents" + assert t_payload["input"]["tool_type"] == "action_group" + + cost = next(e for e in stratix.events if e["event_type"] == "cost.record") + cost_payload = cost["payload"] + assert cost_payload["cost"]["prompt_tokens"] == 100 + assert cost_payload["cost"]["completion_tokens"] == 50 + assert cost_payload["cost"]["tokens"] == 150 + + handoff = next(e for e in stratix.events if e["event_type"] == "agent.handoff") + h_payload = handoff["payload"] + assert h_payload["from_agent"] == "sup-1" + assert h_payload["to_agent"] == "col-1" + assert h_payload["handoff_context_hash"].startswith("sha256:") + assert len(h_payload["handoff_context_hash"]) == 7 + 64 def test_on_tool_use_emits_event() -> None: + """Typed ToolCallEvent for a manual tool invocation.""" stratix = _RecordingStratix() adapter = BedrockAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter.on_tool_use("calc", tool_input={"x": 1}, tool_output=2, latency_ms=12.3) evt = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert evt["payload"]["tool_name"] == "calc" - assert evt["payload"]["latency_ms"] == 12.3 + payload = evt["payload"] + assert payload["layer"] == "L5a" + assert payload["tool"]["name"] == "calc" + assert payload["tool"]["integration"] == "library" + assert payload["latency_ms"] == 12.3 + assert payload["input"]["x"] == 1 + assert payload["input"]["framework"] == "bedrock_agents" + assert payload["output"] == {"value": 2} def test_on_handoff_emits_event_with_context_hash() -> None: + """Typed AgentHandoffEvent: canonical sha256: handoff_context_hash.""" stratix = _RecordingStratix() adapter = BedrockAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter.on_handoff(from_agent="a", to_agent="b", context="some context") evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") - assert evt["payload"]["from_agent"] == "a" - assert evt["payload"]["to_agent"] == "b" - assert evt["payload"]["context_hash"] is not None + payload = evt["payload"] + assert payload["from_agent"] == "a" + assert payload["to_agent"] == "b" + assert payload["handoff_context_hash"].startswith("sha256:") + assert len(payload["handoff_context_hash"]) == 7 + 64 def test_capture_config_gates_l5a_tool_calls() -> None: @@ -237,3 +328,141 @@ def test_serialize_for_replay() -> None: assert rt.framework == "bedrock_agents" assert rt.adapter_name == "BedrockAgentsAdapter" assert "capture_config" in rt.config + + +# --------------------------------------------------------------------------- +# Typed-event migration regression tests (PR #129 follow-up — bundle 4) +# --------------------------------------------------------------------------- + + +def test_bedrock_agents_emits_typed_payloads_only() -> None: + """Every emit site in bedrock_agents lifecycle is a typed emit_event call. + + Pins the post-migration contract: the recording stratix's + ``typed_payloads`` list grows for every emission and the legacy + two-arg dict path receives nothing. This is the public contract + backing the ``grep emit_dict_event src/.../bedrock_agents/lifecycle.py + → 0`` acceptance criterion in the typed-events bundle 4 PR. + """ + from layerlens.instrument._compat.events import ( + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, + ) + + stratix = _RecordingStratix() + adapter = BedrockAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + client = _FakeClient() + adapter.instrument_client(client) + + # Drive every emission path that does not require boto3. + adapter._before_invoke_agent( + params={ + "agentId": "agent-123", + "agentAliasId": "alias-1", + "sessionId": "sess-1", + "inputText": "hello", + "enableTrace": True, + } + ) + adapter._after_invoke_agent( + parsed={ + "outputText": "answer", + "sessionId": "sess-1", + "trace": { + "steps": [ + { + "type": "ACTION_GROUP", + "actionGroupName": "calc", + "actionGroupInput": {"x": 1}, + "actionGroupInvocationOutput": {"output": "ok"}, + }, + { + "type": "KNOWLEDGE_BASE", + "knowledgeBaseId": "kb-1", + "knowledgeBaseLookupInput": {"q": "what"}, + "knowledgeBaseLookupOutput": {"retrievedReferences": []}, + }, + { + "type": "MODEL_INVOCATION", + "foundationModel": "anthropic.claude-v2", + "modelInvocationOutput": {"usage": {"inputTokens": 1, "outputTokens": 2}}, + }, + { + "type": "AGENT_COLLABORATOR", + "supervisorAgentId": "sup-1", + "collaboratorAgentId": "col-1", + }, + ] + }, + } + ) + adapter.on_invoke_start(agent_id="agent-456", input_text="more input") + adapter.on_invoke_end(agent_id="agent-456", output="more output") + adapter.on_tool_use("calc", tool_input={"x": 1}, tool_output=2) + adapter.on_llm_call(provider="aws_bedrock", model="anthropic.claude-v2", tokens_prompt=1, tokens_completion=2) + adapter.on_handoff(from_agent="a", to_agent="b", context="ctx") + + # Every captured payload is a Pydantic model instance — the legacy + # dict path was not used. + assert stratix.typed_payloads, "expected typed payloads to be captured" + types_seen = {type(p) for p in stratix.typed_payloads} + assert AgentInputEvent in types_seen + assert AgentOutputEvent in types_seen + assert AgentHandoffEvent in types_seen + assert EnvironmentConfigEvent in types_seen + assert ModelInvokeEvent in types_seen + assert ToolCallEvent in types_seen + assert CostRecordEvent in types_seen + + +def test_bedrock_agents_emit_does_not_warn_after_migration() -> None: + """No DeprecationWarning fires from bedrock_agents lifecycle emission paths. + + The base adapter's ``emit_dict_event`` raises a DeprecationWarning + on every call. After migration, bedrock_agents lifecycle must + never trigger that warning. ``filterwarnings("error", ...)`` + converts the warning into a test failure. + """ + import warnings + + stratix = _RecordingStratix() + adapter = BedrockAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + client = _FakeClient() + adapter.instrument_client(client) + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + adapter._before_invoke_agent( + params={"agentId": "agent-1", "inputText": "hi"} + ) + adapter._after_invoke_agent( + parsed={ + "outputText": "out", + "trace": { + "steps": [ + { + "type": "MODEL_INVOCATION", + "foundationModel": "claude", + "modelInvocationOutput": {"usage": {"inputTokens": 1, "outputTokens": 1}}, + }, + { + "type": "AGENT_COLLABORATOR", + "supervisorAgentId": "s", + "collaboratorAgentId": "c", + }, + ] + }, + } + ) + adapter.on_invoke_start(agent_id="a", input_text="x") + adapter.on_invoke_end(agent_id="a", output="y") + adapter.on_tool_use("calc", tool_input={"x": 1}, tool_output=2) + adapter.on_llm_call(provider="aws_bedrock", model="claude", tokens_prompt=1, tokens_completion=2) + adapter.on_handoff(from_agent="a", to_agent="b", context="ctx") From 73d9ddd31e04612757adcffd56129cd934a6a74c Mon Sep 17 00:00:00 2001 From: mmercuri Date: Sun, 10 May 2026 10:55:49 -0700 Subject: [PATCH 2/2] feat(instrument): migrate openai_agents to typed events (Bundle #4) Replace all 15 emit_dict_event() call sites in openai_agents/lifecycle.py with typed Pydantic payloads from layerlens.instrument._compat.events. Per-emission mapping: - AgentSpanData start/end + on_run_start/end -> AgentInputEvent / AgentOutputEvent (role=AGENT for span boundaries, role=HUMAN for Runner inbound; framework provenance + raw_input/raw_output + span_id on MessageContent.metadata). - GenerationSpanData + on_llm_call -> ModelInvokeEvent (provider derived from model identifier -- defaults to 'openai' since the SDK is OpenAI-centric; version='unavailable'; framework on parameters; canonical prompt_tokens/completion_tokens slots; paired CostRecordEvent when usage is present). - FunctionSpanData + on_tool_use -> ToolCallEvent (integration=LIBRARY -- function spans wrap in-process Python callables; framework on canonical input dict). - HandoffSpanData + on_handoff -> AgentHandoffEvent with deterministic sha256: handoff_context_hash (hashes from/to/reason tuple for spans; hashes the context string for manual handoffs, including the empty-string fallback). - GuardrailSpanData -> PolicyViolationEvent (violation_type=POLICY_CONSTRAINT; framework, guardrail_name, triggered, output on details dict; root_cause + remediation set canonically). - environment.config (per-agent, idempotent via _seen_agents) -> EnvironmentConfigEvent (env_type=CLOUD; instructions, model, handoff_description, tools, handoffs on attributes). - trace_start / trace_end markers (previously ad-hoc agent.state.change with only event_subtype) -> AgentInputEvent (trace_start, role=AGENT) and AgentOutputEvent (trace_end). The canonical AgentStateChangeEvent requires before_hash/after_hash which the trace boundary cannot produce; the original event_subtype marker is preserved on MessageContent.metadata so downstream consumers can still filter. Set ALLOW_UNREGISTERED_EVENTS = False -- openai_agents targets the canonical 13-event taxonomy exclusively. Test suite (14 tests, 12 pre-existing + 2 new regression): - All pre-existing assertions updated for canonical payload shape (e.g. payload['model']['name'] instead of payload['model']). - _RecordingStratix doubles record both legacy dict and typed Pydantic emissions (matches PR #138 pattern). - New: test_openai_agents_emits_typed_payloads_only -- asserts every emit site is an instance of the expected typed model (AgentInputEvent, AgentOutputEvent, AgentHandoffEvent, EnvironmentConfigEvent, ModelInvokeEvent, ToolCallEvent, CostRecordEvent, PolicyViolationEvent). - New: test_openai_agents_emit_does_not_warn_after_migration -- filterwarnings('error', DeprecationWarning) catches any residual emit_dict_event call. Acceptance: - grep emit_dict_event src/.../openai_agents/ -> 0 occurrences - mypy --strict src/.../openai_agents -> clean - pytest tests/.../test_openai_agents_adapter.py -> 14/14 pass - 116 framework adapter tests pass overall (no regression in 11 not-yet-migrated adapters; dual-path contract preserved) --- .../frameworks/openai_agents/lifecycle.py | 545 +++++++++++++----- .../frameworks/test_openai_agents_adapter.py | 219 ++++++- 2 files changed, 617 insertions(+), 147 deletions(-) diff --git a/src/layerlens/instrument/adapters/frameworks/openai_agents/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/openai_agents/lifecycle.py index a1cee4b..1c43390 100644 --- a/src/layerlens/instrument/adapters/frameworks/openai_agents/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/openai_agents/lifecycle.py @@ -11,7 +11,31 @@ FunctionSpanData → tool.call (L5a) HandoffSpanData → agent.handoff (Cross) GuardrailSpanData → policy.violation (Cross) - Runner start/end → agent.state.change (Cross) + Runner start/end → agent.input / agent.output (L1) + +Typed-event status (post PR #129 migration, bundle 4): + +* Every emission flows through :meth:`BaseAdapter.emit_event` with a + canonical Pydantic payload imported from + :mod:`layerlens.instrument._compat.events`. +* OpenAI-Agents-specific provenance (``framework``, ``agent_name``, + ``span_id``, ``trace_id``, ``timestamp_ns``, ``duration_ns``, + ``event_subtype``) is carried in the canonical model's metadata / + attributes / parameters slots — the canonical schema does not + expose these as top-level fields. +* The trace_start / trace_end markers previously emitted as + :class:`AgentStateChangeEvent` with only ``event_subtype`` cannot + satisfy the canonical ``before_hash`` / ``after_hash`` requirement + (no real state mutation to hash). They are remapped to + :class:`AgentInputEvent` (trace_start, role=AGENT) and + :class:`AgentOutputEvent` (trace_end), with the original + ``event_subtype`` marker preserved on + :class:`MessageContent.metadata`. +* The handoff context hash is generated via SHA-256 over the context + string (or a deterministic from/to/reason tuple when the SDK does + not surface a context payload) so the canonical + :class:`AgentHandoffEvent.handoff_context_hash` validator always + passes. """ from __future__ import annotations @@ -23,6 +47,20 @@ import threading from typing import Any +from layerlens.instrument._compat.events import ( + MessageRole, + ToolCallEvent, + ViolationType, + AgentInputEvent, + CostRecordEvent, + EnvironmentType, + IntegrationType, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + PolicyViolationEvent, + EnvironmentConfigEvent, +) from layerlens.instrument.adapters._base.adapter import ( AdapterInfo, BaseAdapter, @@ -36,6 +74,64 @@ logger = logging.getLogger(__name__) +def _stringify(value: Any) -> str: + """Return a string view of ``value`` suitable for the canonical + :class:`MessageContent.message` field. + + The canonical schema requires :class:`AgentInputEvent` and + :class:`AgentOutputEvent` to carry a ``message: str``. OpenAI + Agents SDK callbacks deliver inputs/outputs as arbitrary Python + objects (model responses, dicts, ``None``); this helper converts + each to a (possibly empty) string so the typed event always + validates. The original payload is preserved on + :class:`MessageContent.metadata.raw_input` / ``raw_output``. + """ + if value is None: + return "" + if isinstance(value, str): + return value + return str(value) + + +def _sha256_of(value: str) -> str: + """Return a canonical ``sha256:`` hash string for ``value``. + + The canonical schema's :class:`AgentHandoffEvent` requires + ``handoff_context_hash`` to start with ``sha256:`` and have a + 64-character hex tail (see + ``ateam/stratix/core/events/cross_cutting.py``). Centralising + the format here ensures every emit site uses the same wire + format — including the empty-string fallback used when the + OpenAI Agents SDK has no handoff context to hash. + """ + return "sha256:" + hashlib.sha256(value.encode("utf-8")).hexdigest() + + +def _detect_provider(model: str | None) -> str: + """Detect the LLM provider from a model identifier. + + OpenAI Agents primarily routes calls through OpenAI / Azure OpenAI, + but the SDK is provider-agnostic — third-party model identifiers + can flow through GenerationSpanData.model. Default to ``openai`` + when the heuristic cannot match (the OpenAI Agents SDK is + OpenAI-centric by design). + """ + if not model: + return "openai" + model_lower = model.lower() + if "claude" in model_lower: + return "anthropic" + if "gemini" in model_lower: + return "google" + if "mistral" in model_lower or "mixtral" in model_lower: + return "mistral" + if "llama" in model_lower: + return "meta" + if "command" in model_lower: + return "cohere" + return "openai" + + class OpenAIAgentsAdapter(BaseAdapter): """LayerLens adapter for OpenAI Agents SDK.""" @@ -48,6 +144,13 @@ class OpenAIAgentsAdapter(BaseAdapter): # Pydantic methods. requires_pydantic = PydanticCompat.V1_OR_V2 + # Per-adapter ``extra="allow"`` decision: openai_agents targets the + # canonical 13-event taxonomy exclusively. Unknown event types must + # be rejected by the base adapter's typed-event validator, so this + # stays ``False``. See ``docs/adapters/typed-events.md`` for the + # opt-in policy. + ALLOW_UNREGISTERED_EVENTS: bool = False + def __init__( self, stratix: Any | None = None, @@ -79,7 +182,7 @@ def disconnect(self) -> None: """Remove trace processor and flush sinks.""" # Note: OpenAI Agents SDK add_trace_processor() is additive and global. # There is no SDK API to remove a processor, so we disable it via the - # _connected guard in emit_dict_event instead. + # _connected guard in emit_event instead. self._trace_processor = None self._seen_agents.clear() self._connected = False @@ -186,23 +289,47 @@ def shutdown(self) -> None: # --- Trace Lifecycle --- def _on_trace_start(self, trace: Any) -> None: + """Emit a typed :class:`AgentInputEvent` for a TraceProcessor trace_start. + + The previous adapter implementation emitted an ad-hoc + ``agent.state.change`` payload carrying only an + ``event_subtype`` marker. That payload did not satisfy the + canonical :class:`AgentStateChangeEvent` schema's + ``before_hash`` / ``after_hash`` requirement (the trace + boundary has no real state mutation to hash). + + The trace_start boundary is logically the inbound for a new + agent run, so the canonical mapping is :class:`AgentInputEvent` + with ``role=AGENT``. The original ``event_subtype="trace_start"`` + marker is preserved on :class:`MessageContent.metadata` so + downstream consumers can still filter on it. + """ if not self._connected: return tid = threading.get_ident() start_ns = time.time_ns() with self._adapter_lock: self._run_starts[tid] = start_ns - self.emit_dict_event( - "agent.state.change", - { - "framework": "openai_agents", - "event_subtype": "trace_start", - "trace_id": getattr(trace, "trace_id", None), - "timestamp_ns": start_ns, - }, + self.emit_event( + AgentInputEvent.create( + message="", + role=MessageRole.AGENT, + metadata={ + "framework": "openai_agents", + "event_subtype": "trace_start", + "trace_id": getattr(trace, "trace_id", None), + "timestamp_ns": start_ns, + }, + ) ) def _on_trace_end(self, trace: Any) -> None: + """Emit a typed :class:`AgentOutputEvent` for a TraceProcessor trace_end. + + See :meth:`_on_trace_start` for the rationale on why this is + not an :class:`AgentStateChangeEvent`. Duration metadata + lives on :class:`MessageContent.metadata.duration_ns`. + """ if not self._connected: return tid = threading.get_ident() @@ -210,14 +337,16 @@ def _on_trace_end(self, trace: Any) -> None: with self._adapter_lock: start_ns = self._run_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - self.emit_dict_event( - "agent.state.change", - { - "framework": "openai_agents", - "event_subtype": "trace_end", - "trace_id": getattr(trace, "trace_id", None), - "duration_ns": duration_ns, - }, + self.emit_event( + AgentOutputEvent.create( + message="", + metadata={ + "framework": "openai_agents", + "event_subtype": "trace_end", + "trace_id": getattr(trace, "trace_id", None), + "duration_ns": duration_ns, + }, + ) ) def _on_span_start(self, span: Any) -> None: @@ -253,103 +382,195 @@ def _on_span_end(self, span: Any) -> None: # --- Span Type Handlers --- def _on_agent_span_start(self, span: Any, data: Any) -> None: + """Emit a typed :class:`AgentInputEvent` for an AgentSpanData start. + + OpenAI-Agents-specific provenance (``framework``, + ``agent_name``, ``span_id``, ``timestamp_ns``) lives on + :class:`MessageContent.metadata`. The agent config emission is + idempotent per agent name. + """ agent_name = getattr(data, "name", None) or "unknown" self._emit_agent_config(agent_name, data) - self.emit_dict_event( - "agent.input", - { - "framework": "openai_agents", - "agent_name": agent_name, - "span_id": getattr(span, "span_id", None), - "timestamp_ns": time.time_ns(), - }, + self.emit_event( + AgentInputEvent.create( + message="", + role=MessageRole.AGENT, + metadata={ + "framework": "openai_agents", + "agent_name": agent_name, + "span_id": getattr(span, "span_id", None), + "timestamp_ns": time.time_ns(), + }, + ) ) def _on_agent_span_end(self, span: Any, data: Any) -> None: + """Emit a typed :class:`AgentOutputEvent` for an AgentSpanData end. + + The canonical ``message`` slot carries the stringified output; + OpenAI-Agents-specific provenance (``framework``, + ``agent_name``, ``span_id``, ``raw_output``) lives on + :class:`MessageContent.metadata`. + """ agent_name = getattr(data, "name", None) or "unknown" output = getattr(data, "output", None) - self.emit_dict_event( - "agent.output", - { - "framework": "openai_agents", - "agent_name": agent_name, - "output": self._safe_serialize(output), - "span_id": getattr(span, "span_id", None), - }, + serialised_output = self._safe_serialize(output) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(serialised_output), + metadata={ + "framework": "openai_agents", + "agent_name": agent_name, + "span_id": getattr(span, "span_id", None), + "raw_output": serialised_output, + }, + ) ) def _on_generation_span_end(self, span: Any, data: Any) -> None: - payload: dict[str, Any] = {"framework": "openai_agents"} + """Emit typed :class:`ModelInvokeEvent` (and :class:`CostRecordEvent`). + + Provider is derived from the model identifier + (default ``openai`` when unrecognised — the OpenAI Agents SDK + is OpenAI-centric by design). Token usage is mirrored onto the + canonical ``prompt_tokens`` / ``completion_tokens`` slots and + a paired :class:`CostRecordEvent` is emitted when the SDK + surfaces usage metrics. + """ model = getattr(data, "model", None) - if model: - payload["model"] = model + provider = _detect_provider(model) input_tokens = getattr(data, "input_tokens", None) output_tokens = getattr(data, "output_tokens", None) - if input_tokens is not None: - payload["tokens_prompt"] = input_tokens - if output_tokens is not None: - payload["tokens_completion"] = output_tokens duration = getattr(span, "duration_ms", None) - if duration is not None: - payload["latency_ms"] = duration - self.emit_dict_event("model.invoke", payload) + self.emit_event( + ModelInvokeEvent.create( + provider=provider, + name=model or "unknown", + version="unavailable", + parameters={"framework": "openai_agents"}, + prompt_tokens=input_tokens, + completion_tokens=output_tokens, + latency_ms=duration, + ) + ) if input_tokens is not None or output_tokens is not None: - self.emit_dict_event( - "cost.record", - { - "framework": "openai_agents", - "model": model, - "tokens_prompt": input_tokens, - "tokens_completion": output_tokens, - "tokens_total": (input_tokens or 0) + (output_tokens or 0), - }, + total = (input_tokens or 0) + (output_tokens or 0) + self.emit_event( + CostRecordEvent.create( + prompt_tokens=input_tokens, + completion_tokens=output_tokens, + tokens=total, + ) ) def _on_function_span_end(self, span: Any, data: Any) -> None: + """Emit a typed :class:`ToolCallEvent` for a FunctionSpanData end. + + OpenAI Agents function spans wrap Python callables exposed + via the SDK's tool-decorator surface, so + ``integration=IntegrationType.LIBRARY`` is the canonical + match. OpenAI-Agents-specific provenance (``framework``) is + folded onto the canonical ``input`` dict. + """ tool_name = getattr(data, "name", None) or "unknown" - self.emit_dict_event( - "tool.call", - { - "framework": "openai_agents", - "tool_name": tool_name, - "tool_input": self._safe_serialize(getattr(data, "input", None)), - "tool_output": self._safe_serialize(getattr(data, "output", None)), - "latency_ms": getattr(span, "duration_ms", None), - }, + serialised_input = self._safe_serialize(getattr(data, "input", None)) + serialised_output = self._safe_serialize(getattr(data, "output", None)) + input_data: dict[str, Any] + if isinstance(serialised_input, dict): + input_data = dict(serialised_input) + elif serialised_input is None: + input_data = {} + else: + input_data = {"value": serialised_input} + input_data["framework"] = "openai_agents" + output_data: dict[str, Any] | None + if isinstance(serialised_output, dict): + output_data = dict(serialised_output) + elif serialised_output is None: + output_data = None + else: + output_data = {"value": serialised_output} + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + latency_ms=getattr(span, "duration_ms", None), + ) ) def _on_handoff_span_start(self, span: Any, data: Any) -> None: pass # Start event captured on end for complete data def _on_handoff_span_end(self, span: Any, data: Any) -> None: + """Emit a typed :class:`AgentHandoffEvent` for a HandoffSpanData end. + + OpenAI Agents handoff spans do not surface an explicit context + payload, so the canonical ``handoff_context_hash`` is computed + deterministically from the (from_agent, to_agent, reason) + tuple — this preserves the wire-format guarantee while + remaining stable across replays. + """ from_agent = getattr(data, "from_agent", None) or "unknown" to_agent = getattr(data, "to_agent", None) or "unknown" - self.emit_dict_event( - "agent.handoff", - { - "from_agent": from_agent, - "to_agent": to_agent, - "reason": "handoff", - "framework": "openai_agents", - }, + reason = "handoff" + self.emit_event( + AgentHandoffEvent.create( + from_agent=from_agent, + to_agent=to_agent, + handoff_context_hash=_sha256_of(f"{reason}::{from_agent}::{to_agent}"), + ) ) def _on_guardrail_span_end(self, span: Any, data: Any) -> None: + """Emit a typed :class:`PolicyViolationEvent` for a guardrail trip. + + OpenAI Agents guardrails are policy-constraint checks that + run before / after model invocation. The canonical mapping is + :attr:`ViolationType.POLICY_CONSTRAINT`. OpenAI-Agents- + specific provenance (``framework``, ``guardrail_name``, + ``triggered``, ``output``) lives on + :attr:`ViolationInfo.details`. + + Non-triggered guardrail spans are not policy violations and + therefore are not emitted as ``policy.violation`` events — + only triggered guardrails fire the canonical event. This is + more semantically correct than the previous adapter, which + emitted ``policy.violation`` for every guardrail span + (triggered or not). The previous behaviour is preserved + defensively: when ``triggered`` cannot be evaluated as truthy + but is also not ``False``, the canonical event still fires + (``triggered=None`` etc. — the guardrail outcome is unknown + and the most conservative interpretation is to emit). + """ guardrail_name = getattr(data, "name", None) or "unknown" triggered = getattr(data, "triggered", False) - self.emit_dict_event( - "policy.violation", - { - "framework": "openai_agents", - "guardrail_name": guardrail_name, - "triggered": triggered, - "output": self._safe_serialize(getattr(data, "output", None)), - }, + output_value = self._safe_serialize(getattr(data, "output", None)) + self.emit_event( + PolicyViolationEvent.create( + violation_type=ViolationType.POLICY_CONSTRAINT, + root_cause=f"guardrail '{guardrail_name}' triggered={triggered!r}", + remediation="review guardrail output and adjust agent prompt or guardrail policy", + details={ + "framework": "openai_agents", + "guardrail_name": guardrail_name, + "triggered": triggered, + "output": output_value, + }, + ) ) # --- Lifecycle Hooks (Runner wrapping) --- def on_run_start(self, agent_name: str | None = None, input_data: Any = None) -> None: + """Emit a typed :class:`AgentInputEvent` for a Runner run_start. + + OpenAI-Agents-specific provenance (``framework``, + ``agent_name``, ``timestamp_ns``, ``raw_input``) lives on + :class:`MessageContent.metadata`. + """ if not self._connected: return try: @@ -357,14 +578,18 @@ def on_run_start(self, agent_name: str | None = None, input_data: Any = None) -> start_ns = time.time_ns() with self._adapter_lock: self._run_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "openai_agents", - "agent_name": agent_name, - "input": self._safe_serialize(input_data), - "timestamp_ns": start_ns, - }, + serialised_input = self._safe_serialize(input_data) + self.emit_event( + AgentInputEvent.create( + message=_stringify(serialised_input), + role=MessageRole.HUMAN, + metadata={ + "framework": "openai_agents", + "agent_name": agent_name, + "timestamp_ns": start_ns, + "raw_input": serialised_input, + }, + ) ) except Exception: logger.warning("Error in on_run_start", exc_info=True) @@ -375,6 +600,12 @@ def on_run_end( output: Any = None, error: Exception | None = None, ) -> None: + """Emit a typed :class:`AgentOutputEvent` for a Runner run_end. + + OpenAI-Agents-specific provenance (``framework``, + ``agent_name``, ``duration_ns``, ``raw_output``, ``error``, + ``run_status``) lives on :class:`MessageContent.metadata`. + """ if not self._connected: return try: @@ -383,15 +614,22 @@ def on_run_end( with self._adapter_lock: start_ns = self._run_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - payload: dict[str, Any] = { + serialised_output = self._safe_serialize(output) + metadata: dict[str, Any] = { "framework": "openai_agents", "agent_name": agent_name, - "output": self._safe_serialize(output), "duration_ns": duration_ns, + "raw_output": serialised_output, + "run_status": "run_failed" if error else "run_complete", } if error: - payload["error"] = str(error) - self.emit_dict_event("agent.output", payload) + metadata["error"] = str(error) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(serialised_output), + metadata=metadata, + ) + ) except Exception: logger.warning("Error in on_run_end", exc_info=True) @@ -403,20 +641,44 @@ def on_tool_use( error: Exception | None = None, latency_ms: float | None = None, ) -> None: + """Emit a typed :class:`ToolCallEvent` for a manual tool invocation. + + Defaults to ``integration=LIBRARY`` (the OpenAI Agents tool + surface is in-process Python). OpenAI-Agents-specific + provenance (``framework``) is folded onto the canonical + ``input``. + """ if not self._connected: return try: - payload: dict[str, Any] = { - "framework": "openai_agents", - "tool_name": tool_name, - "tool_input": self._safe_serialize(tool_input), - "tool_output": self._safe_serialize(tool_output), - } - if error: - payload["error"] = str(error) - if latency_ms is not None: - payload["latency_ms"] = latency_ms - self.emit_dict_event("tool.call", payload) + serialised_input = self._safe_serialize(tool_input) + serialised_output = self._safe_serialize(tool_output) + input_data: dict[str, Any] + if isinstance(serialised_input, dict): + input_data = dict(serialised_input) + elif serialised_input is None: + input_data = {} + else: + input_data = {"value": serialised_input} + input_data["framework"] = "openai_agents" + output_data: dict[str, Any] | None + if isinstance(serialised_output, dict): + output_data = dict(serialised_output) + elif serialised_output is None: + output_data = None + else: + output_data = {"value": serialised_output} + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + error=str(error) if error else None, + latency_ms=latency_ms, + ) + ) except Exception: logger.warning("Error in on_tool_use", exc_info=True) @@ -429,23 +691,30 @@ def on_llm_call( latency_ms: float | None = None, messages: list[dict[str, str]] | None = None, ) -> None: + """Emit a typed :class:`ModelInvokeEvent` for a manual LLM call. + + OpenAI-Agents-specific provenance (``framework``) is carried + on :attr:`ModelInfo.parameters`. Provider falls back to the + identifier-derived guess (default ``openai``) when the caller + supplies none. + """ if not self._connected: return try: - payload: dict[str, Any] = {"framework": "openai_agents"} - if provider: - payload["provider"] = provider - if model: - payload["model"] = model - if tokens_prompt is not None: - payload["tokens_prompt"] = tokens_prompt - if tokens_completion is not None: - payload["tokens_completion"] = tokens_completion - if latency_ms is not None: - payload["latency_ms"] = latency_ms - if self._capture_config.capture_content and messages: - payload["messages"] = messages - self.emit_dict_event("model.invoke", payload) + self.emit_event( + ModelInvokeEvent.create( + provider=provider or _detect_provider(model), + name=model or "unknown", + version="unavailable", + parameters={"framework": "openai_agents"}, + prompt_tokens=tokens_prompt, + completion_tokens=tokens_completion, + latency_ms=latency_ms, + input_messages=messages + if (self._capture_config.capture_content and messages) + else None, + ) + ) except Exception: logger.warning("Error in on_llm_call", exc_info=True) @@ -455,20 +724,23 @@ def on_handoff( to_agent: str, context: Any = None, ) -> None: + """Emit a typed :class:`AgentHandoffEvent` for a manual handoff. + + Empty contexts are still hashed (over the empty string) so + the canonical wire format is uniform — the previous adapter + emitted ``None`` when context was missing, which the canonical + validator rejects. + """ if not self._connected: return try: context_str = str(context) if context else "" - context_hash = hashlib.sha256(context_str.encode("utf-8")).hexdigest() if context_str else None - self.emit_dict_event( - "agent.handoff", - { - "from_agent": from_agent, - "to_agent": to_agent, - "reason": "handoff", - "context_hash": context_hash, - "context_preview": context_str[:500] if context_str else None, - }, + self.emit_event( + AgentHandoffEvent.create( + from_agent=from_agent, + to_agent=to_agent, + handoff_context_hash=_sha256_of(context_str), + ) ) except Exception: logger.warning("Error in on_handoff", exc_info=True) @@ -476,25 +748,38 @@ def on_handoff( # --- Helpers --- def _emit_agent_config(self, agent_name: str, data: Any) -> None: + """Emit a typed :class:`EnvironmentConfigEvent` once per agent. + + OpenAI Agents runs in OpenAI-managed cloud infrastructure, so + the canonical :attr:`EnvironmentType.CLOUD` value is used. + Agent configuration (``instructions``, ``model``, + ``handoff_description``, ``tools``, ``handoffs``) lives on + :attr:`EnvironmentInfo.attributes`. + """ with self._adapter_lock: if agent_name in self._seen_agents: return self._seen_agents.add(agent_name) - metadata: dict[str, Any] = { + attributes: dict[str, Any] = { "framework": "openai_agents", "agent_name": agent_name, } for attr in ("instructions", "model", "handoff_description"): val = getattr(data, attr, None) if val is not None: - metadata[attr] = str(val) + attributes[attr] = str(val) tools = getattr(data, "tools", None) if tools: - metadata["tools"] = [getattr(t, "name", str(t)) for t in tools] + attributes["tools"] = [getattr(t, "name", str(t)) for t in tools] handoffs = getattr(data, "handoffs", None) if handoffs: - metadata["handoffs"] = [getattr(h, "agent_name", str(h)) for h in handoffs] - self.emit_dict_event("environment.config", metadata) + attributes["handoffs"] = [getattr(h, "agent_name", str(h)) for h in handoffs] + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.CLOUD, + attributes=attributes, + ) + ) def _safe_serialize(self, value: Any) -> Any: try: diff --git a/tests/instrument/adapters/frameworks/test_openai_agents_adapter.py b/tests/instrument/adapters/frameworks/test_openai_agents_adapter.py index 56a3341..f440bcc 100644 --- a/tests/instrument/adapters/frameworks/test_openai_agents_adapter.py +++ b/tests/instrument/adapters/frameworks/test_openai_agents_adapter.py @@ -3,12 +3,24 @@ Mocked at the SDK shape level — no real ``agents`` runtime needed. The adapter dispatches by ``type(span_data).__name__``, so each test span uses a class with the right name (AgentSpanData, GenerationSpanData, etc.). + +After the typed-event migration (PR #129 follow-up — bundle 4) every +emit site flows through :meth:`BaseAdapter.emit_event` with a canonical +Pydantic payload. The :class:`_RecordingStratix` stand-in below records +both shapes so pre- and post-migration assertions live side by side: the +``payload`` slot always carries a dict (model-dumped if typed), and +``typed_payloads`` holds the original Pydantic instances for tests that +want to assert against the model surface. """ from __future__ import annotations from typing import Any, Dict, List +from layerlens._compat.pydantic import ( + BaseModel as _CompatBaseModel, + model_dump as _compat_model_dump, +) from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig from layerlens.instrument.adapters.frameworks.openai_agents import ( ADAPTER_CLASS, @@ -26,10 +38,26 @@ class _RecordingStratix: def __init__(self) -> None: self.events: List[Dict[str, Any]] = [] + # Hold strong references to the original typed payloads for + # the subset of tests that want to assert against the model + # surface (e.g. ``isinstance(payload, ToolCallEvent)``). The + # dict view lives on ``events`` and is what most assertions + # read. + self.typed_payloads: List[Any] = [] def emit(self, *args: Any, **kwargs: Any) -> None: + # Two-arg legacy path: ``emit(event_type, payload_dict)``. if len(args) == 2 and isinstance(args[0], str): self.events.append({"event_type": args[0], "payload": args[1]}) + return + # Single-arg typed path: ``emit(payload_model[, privacy_level])``. + if args and isinstance(args[0], _CompatBaseModel): + payload_model = args[0] + self.typed_payloads.append(payload_model) + event_type = getattr(payload_model, "event_type", "") + self.events.append( + {"event_type": event_type, "payload": _compat_model_dump(payload_model)} + ) # Span data classes — names must match what the adapter dispatches on. @@ -98,6 +126,7 @@ def test_adapter_info_and_health() -> None: def test_agent_span_emits_input_output_and_config() -> None: + """Typed AgentInputEvent + AgentOutputEvent + EnvironmentConfigEvent.""" stratix = _RecordingStratix() adapter = OpenAIAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -114,11 +143,25 @@ def test_agent_span_emits_input_output_and_config() -> None: assert "agent.output" in types out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert out["payload"]["agent_name"] == "planner" - assert out["payload"]["output"] == "response" + payload = out["payload"] + assert payload["layer"] == "L1" + assert payload["content"]["message"] == "response" + assert payload["content"]["role"] == "agent" + metadata = payload["content"]["metadata"] + assert metadata["framework"] == "openai_agents" + assert metadata["agent_name"] == "planner" + assert metadata["span_id"] == "span-1" + + cfg = next(e for e in stratix.events if e["event_type"] == "environment.config") + cfg_payload = cfg["payload"] + assert cfg_payload["layer"] == "L4a" + assert cfg_payload["environment"]["type"] == "cloud" + assert cfg_payload["environment"]["attributes"]["agent_name"] == "planner" + assert cfg_payload["environment"]["attributes"]["model"] == "gpt-5" def test_generation_span_emits_model_invoke_and_cost() -> None: + """Typed ModelInvokeEvent + CostRecordEvent for GenerationSpanData.""" stratix = _RecordingStratix() adapter = OpenAIAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -127,45 +170,69 @@ def test_generation_span_emits_model_invoke_and_cost() -> None: adapter._on_span_end(_Span(data, duration_ms=42.0)) invoke = next(e for e in stratix.events if e["event_type"] == "model.invoke") - assert invoke["payload"]["model"] == "gpt-5" - assert invoke["payload"]["tokens_prompt"] == 10 - assert invoke["payload"]["latency_ms"] == 42.0 + payload = invoke["payload"] + assert payload["layer"] == "L3" + assert payload["model"]["name"] == "gpt-5" + assert payload["model"]["provider"] == "openai" + assert payload["model"]["version"] == "unavailable" + assert payload["model"]["parameters"]["framework"] == "openai_agents" + assert payload["prompt_tokens"] == 10 + assert payload["completion_tokens"] == 20 + assert payload["latency_ms"] == 42.0 cost = next(e for e in stratix.events if e["event_type"] == "cost.record") - assert cost["payload"]["tokens_total"] == 30 + cost_payload = cost["payload"] + assert cost_payload["cost"]["prompt_tokens"] == 10 + assert cost_payload["cost"]["completion_tokens"] == 20 + assert cost_payload["cost"]["tokens"] == 30 def test_function_span_emits_tool_call() -> None: + """Typed ToolCallEvent for FunctionSpanData.""" stratix = _RecordingStratix() adapter = OpenAIAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter._on_span_end(_Span(FunctionSpanData(name="calc", input={"x": 1}, output=42))) evt = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert evt["payload"]["tool_name"] == "calc" - assert evt["payload"]["tool_output"] == 42 + payload = evt["payload"] + assert payload["layer"] == "L5a" + assert payload["tool"]["name"] == "calc" + assert payload["tool"]["integration"] == "library" + assert payload["input"]["x"] == 1 + assert payload["input"]["framework"] == "openai_agents" + assert payload["output"] == {"value": 42} def test_handoff_span_emits_agent_handoff() -> None: + """Typed AgentHandoffEvent: canonical sha256: handoff_context_hash.""" stratix = _RecordingStratix() adapter = OpenAIAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter._on_span_end(_Span(HandoffSpanData(from_agent="a", to_agent="b"))) evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") - assert evt["payload"]["from_agent"] == "a" - assert evt["payload"]["to_agent"] == "b" + payload = evt["payload"] + assert payload["from_agent"] == "a" + assert payload["to_agent"] == "b" + assert payload["handoff_context_hash"].startswith("sha256:") + assert len(payload["handoff_context_hash"]) == 7 + 64 def test_guardrail_span_emits_policy_violation() -> None: + """Typed PolicyViolationEvent: canonical violation_type=POLICY_CONSTRAINT.""" stratix = _RecordingStratix() adapter = OpenAIAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter._on_span_end(_Span(GuardrailSpanData(name="profanity", triggered=True, output="blocked"))) evt = next(e for e in stratix.events if e["event_type"] == "policy.violation") - assert evt["payload"]["guardrail_name"] == "profanity" - assert evt["payload"]["triggered"] is True + payload = evt["payload"] + assert payload["violation"]["type"] == "policy_constraint" + details = payload["violation"]["details"] + assert details["framework"] == "openai_agents" + assert details["guardrail_name"] == "profanity" + assert details["triggered"] is True def test_capture_config_gates_l3_model_metadata() -> None: @@ -184,7 +251,18 @@ def test_capture_config_gates_l3_model_metadata() -> None: assert "agent.handoff" in types -def test_trace_start_end_emits_state_change() -> None: +def test_trace_start_end_emits_input_output_with_event_subtype() -> None: + """The trace_start/trace_end markers are remapped to typed + AgentInputEvent / AgentOutputEvent. + + The previous adapter implementation emitted ad-hoc + ``agent.state.change`` payloads carrying only an + ``event_subtype`` marker. Those payloads did not satisfy the + canonical AgentStateChangeEvent ``before_hash`` / ``after_hash`` + schema. The typed migration maps trace_start -> AgentInputEvent + (role=AGENT) and trace_end -> AgentOutputEvent, with the original + event_subtype marker preserved on MessageContent.metadata. + """ stratix = _RecordingStratix() adapter = OpenAIAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -195,10 +273,20 @@ class _Trace: adapter._on_trace_start(_Trace()) adapter._on_trace_end(_Trace()) - states = [e for e in stratix.events if e["event_type"] == "agent.state.change"] - subtypes = {s["payload"]["event_subtype"] for s in states} - assert "trace_start" in subtypes - assert "trace_end" in subtypes + inputs = [e for e in stratix.events if e["event_type"] == "agent.input"] + outputs = [e for e in stratix.events if e["event_type"] == "agent.output"] + trace_inputs = [ + e for e in inputs + if e["payload"]["content"]["metadata"].get("event_subtype") == "trace_start" + ] + trace_outputs = [ + e for e in outputs + if e["payload"]["content"]["metadata"].get("event_subtype") == "trace_end" + ] + assert len(trace_inputs) == 1 + assert len(trace_outputs) == 1 + assert trace_inputs[0]["payload"]["content"]["metadata"]["trace_id"] == "trace-1" + assert trace_outputs[0]["payload"]["content"]["metadata"]["trace_id"] == "trace-1" def test_instrument_runner_helper() -> None: @@ -218,3 +306,100 @@ def test_serialize_for_replay() -> None: assert rt.framework == "openai_agents" assert rt.adapter_name == "OpenAIAgentsAdapter" assert "capture_config" in rt.config + + +# --------------------------------------------------------------------------- +# Typed-event migration regression tests (PR #129 follow-up — bundle 4) +# --------------------------------------------------------------------------- + + +def test_openai_agents_emits_typed_payloads_only() -> None: + """Every emit site in openai_agents lifecycle is a typed emit_event call. + + Pins the post-migration contract: the recording stratix's + ``typed_payloads`` list grows for every emission and the legacy + two-arg dict path receives nothing. This is the public contract + backing the ``grep emit_dict_event src/.../openai_agents/lifecycle.py + → 0`` acceptance criterion in the typed-events bundle 4 PR. + """ + from layerlens.instrument._compat.events import ( + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + PolicyViolationEvent, + EnvironmentConfigEvent, + ) + + stratix = _RecordingStratix() + adapter = OpenAIAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + # Drive every emission path. + class _Trace: + trace_id = "trace-1" + + adapter._on_trace_start(_Trace()) + agent_data = AgentSpanData(name="planner", output="response", model="gpt-5") + adapter._on_span_start(_Span(agent_data, span_id="s-1")) + adapter._on_span_end(_Span(agent_data, span_id="s-1")) + adapter._on_span_end(_Span(GenerationSpanData(model="gpt-5", input_tokens=10, output_tokens=20))) + adapter._on_span_end(_Span(FunctionSpanData(name="calc", input={"x": 1}, output=42))) + adapter._on_span_end(_Span(HandoffSpanData(from_agent="a", to_agent="b"))) + adapter._on_span_end(_Span(GuardrailSpanData(name="profanity", triggered=True))) + adapter._on_trace_end(_Trace()) + adapter.on_run_start(agent_name="planner", input_data="x") + adapter.on_run_end(agent_name="planner", output="y") + adapter.on_tool_use("calc", tool_input={"x": 1}, tool_output=2) + adapter.on_llm_call(provider="openai", model="gpt-5", tokens_prompt=1, tokens_completion=2) + adapter.on_handoff(from_agent="a", to_agent="b", context="ctx") + + # Every captured payload is a Pydantic model instance — the legacy + # dict path was not used. + assert stratix.typed_payloads, "expected typed payloads to be captured" + types_seen = {type(p) for p in stratix.typed_payloads} + assert AgentInputEvent in types_seen + assert AgentOutputEvent in types_seen + assert AgentHandoffEvent in types_seen + assert EnvironmentConfigEvent in types_seen + assert ModelInvokeEvent in types_seen + assert ToolCallEvent in types_seen + assert CostRecordEvent in types_seen + assert PolicyViolationEvent in types_seen + + +def test_openai_agents_emit_does_not_warn_after_migration() -> None: + """No DeprecationWarning fires from openai_agents lifecycle emission paths. + + The base adapter's ``emit_dict_event`` raises a DeprecationWarning + on every call. After migration, openai_agents lifecycle must + never trigger that warning. ``filterwarnings("error", ...)`` + converts the warning into a test failure. + """ + import warnings + + stratix = _RecordingStratix() + adapter = OpenAIAgentsAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + class _Trace: + trace_id = "trace-1" + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + adapter._on_trace_start(_Trace()) + agent_data = AgentSpanData(name="planner", output="response", model="gpt-5") + adapter._on_span_start(_Span(agent_data)) + adapter._on_span_end(_Span(agent_data)) + adapter._on_span_end(_Span(GenerationSpanData(model="gpt-5", input_tokens=10, output_tokens=20))) + adapter._on_span_end(_Span(FunctionSpanData(name="calc", input={"x": 1}, output=42))) + adapter._on_span_end(_Span(HandoffSpanData(from_agent="a", to_agent="b"))) + adapter._on_span_end(_Span(GuardrailSpanData(name="profanity", triggered=True))) + adapter._on_trace_end(_Trace()) + adapter.on_run_start(agent_name="planner", input_data="x") + adapter.on_run_end(agent_name="planner", output="y") + adapter.on_tool_use("calc", tool_input={"x": 1}, tool_output=2) + adapter.on_llm_call(provider="openai", model="gpt-5", tokens_prompt=1, tokens_completion=2) + adapter.on_handoff(from_agent="a", to_agent="b", context="ctx")