diff --git a/src/layerlens/instrument/adapters/frameworks/google_adk/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/google_adk/lifecycle.py index 5de8ea3..1b4d7aa 100644 --- a/src/layerlens/instrument/adapters/frameworks/google_adk/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/google_adk/lifecycle.py @@ -9,6 +9,21 @@ BeforeToolCallback → tool.call start (L5a) AfterToolCallback → tool.call complete (L5a) transfer_to_agent → agent.handoff (Cross) + +Typed-event status (post PR #129 migration, bundle 3): + +* Every emission flows through :meth:`BaseAdapter.emit_event` with a + canonical Pydantic payload imported from + :mod:`layerlens.instrument._compat.events`. +* Google ADK-specific provenance (``framework``, ``agent_name``, + ``timestamp_ns``, ``session_id``, ``description``, ``instruction``) + is carried in the canonical model's metadata / attributes / + parameters slots — the canonical schema does not expose these as + top-level fields. +* The handoff context hash is generated via SHA-256 over the context + string (or the empty string when no context is available) so the + canonical :class:`AgentHandoffEvent.handoff_context_hash` validator + always passes. """ from __future__ import annotations @@ -20,6 +35,18 @@ import threading from typing import Any +from layerlens.instrument._compat.events import ( + MessageRole, + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + EnvironmentType, + IntegrationType, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, +) from layerlens.instrument.adapters._base.adapter import ( AdapterInfo, BaseAdapter, @@ -33,6 +60,97 @@ logger = logging.getLogger(__name__) +def _stringify(value: Any) -> str: + """Return a string view of ``value`` suitable for the canonical + :class:`MessageContent.message` field. + + The canonical schema requires :class:`AgentInputEvent` and + :class:`AgentOutputEvent` to carry a ``message: str``. ADK + callbacks deliver the underlying input/output as arbitrary Python + objects (Pydantic ``Content`` models with ``parts``, dicts, + ``None``); this helper converts each to a (possibly empty) string + so the typed event always validates. The original payload is + preserved on :class:`MessageContent.metadata.raw_input` / + ``raw_output``. + """ + if value is None: + return "" + if isinstance(value, str): + return value + if isinstance(value, dict): + # ADK Content payloads serialise to ``{"parts": [...], "role": ...}``; + # surface a flat string view of the parts when present. + parts = value.get("parts") + if isinstance(parts, list) and parts: + text_parts = [p.get("text", "") if isinstance(p, dict) else str(p) for p in parts] + return "".join(text_parts) + content = value.get("content") + if isinstance(content, str): + return content + return str(value) + + +def _sha256_of(value: str) -> str: + """Return a canonical ``sha256:`` hash string for ``value``. + + The canonical schema's :class:`AgentHandoffEvent` requires + ``handoff_context_hash`` to start with ``sha256:`` and have a + 64-character hex tail (see + ``ateam/stratix/core/events/cross_cutting.py``). Centralising the + format here ensures every emit site uses the same wire format — + including the empty-string fallback used when ADK has no + context to hash. + """ + return "sha256:" + hashlib.sha256(value.encode("utf-8")).hexdigest() + + +def _coerce_to_dict(value: Any) -> dict[str, Any]: + """Coerce ``value`` into a dict suitable for the canonical + :class:`ToolCallEvent` ``input`` / ``output`` slots. + + The canonical schema requires ``input: dict[str, Any]`` and + ``output: dict[str, Any] | None``. ADK tool callbacks deliver + arbitrary Python values (scalars, dicts, lists, dataclass-like + objects). This helper wraps non-dict values in ``{"value": ...}`` + so the canonical slot is always satisfied. ``None`` returns an + empty dict so the caller can pass it positionally without a guard. + """ + if value is None: + return {} + if isinstance(value, dict): + return dict(value) + return {"value": value} + + +def _detect_provider(model: str | None) -> str: + """Detect the LLM provider from a model identifier. + + Google ADK's primary models are Gemini, but ADK supports + third-party LLMs via LiteLLM. The canonical + :class:`ModelInvokeEvent` requires both ``provider`` and ``name``, + so this heuristic derives the provider from well-known model name + prefixes. Mirrors the agno reference implementation. Unknown + identifiers return ``"google"`` (the ADK default) since most ADK + deployments use Gemini. + """ + if not model: + return "google" + model_lower = model.lower() + if "gemini" in model_lower: + return "google" + if "gpt" in model_lower or "o1" in model_lower or "o3" in model_lower: + return "openai" + if "claude" in model_lower: + return "anthropic" + if "mistral" in model_lower or "mixtral" in model_lower: + return "mistral" + if "llama" in model_lower: + return "meta" + if "command" in model_lower: + return "cohere" + return "google" + + class GoogleADKAdapter(BaseAdapter): """LayerLens adapter for Google Agent Development Kit.""" @@ -40,10 +158,17 @@ class GoogleADKAdapter(BaseAdapter): VERSION = "0.1.0" # The adapter source has no direct ``pydantic`` imports (verified by # grep across ``frameworks/google_adk/``). The adapter only registers - # ADK's native 6-callback hooks and emits dict events; it never - # touches ADK's own Pydantic models. + # ADK's native 6-callback hooks and emits typed events through the + # canonical schema (PR #129); it never touches ADK's own Pydantic + # models. requires_pydantic = PydanticCompat.V1_OR_V2 + # Per-adapter ``extra="allow"`` decision: google_adk targets the + # canonical 13-event taxonomy exclusively. Unknown event types must + # be rejected by the base adapter's typed-event validator, so this + # stays ``False``. + ALLOW_UNREGISTERED_EVENTS: bool = False + def __init__( self, stratix: Any | None = None, @@ -148,14 +273,18 @@ def _before_agent_callback(self, callback_context: Any) -> Any: start_ns = time.time_ns() with self._adapter_lock: self._agent_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "google_adk", - "agent_name": agent_name, - "input": self._safe_serialize(getattr(callback_context, "user_content", None)), - "timestamp_ns": start_ns, - }, + raw_input = self._safe_serialize(getattr(callback_context, "user_content", None)) + self.emit_event( + AgentInputEvent.create( + message=_stringify(raw_input), + role=MessageRole.HUMAN, + metadata={ + "framework": "google_adk", + "agent_name": agent_name, + "timestamp_ns": start_ns, + "raw_input": raw_input, + }, + ) ) except Exception: logger.warning("Error in before_agent_callback", exc_info=True) @@ -171,14 +300,17 @@ def _after_agent_callback(self, callback_context: Any) -> Any: with self._adapter_lock: start_ns = self._agent_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - self.emit_dict_event( - "agent.output", - { - "framework": "google_adk", - "agent_name": agent_name, - "output": self._safe_serialize(getattr(callback_context, "agent_output", None)), - "duration_ns": duration_ns, - }, + raw_output = self._safe_serialize(getattr(callback_context, "agent_output", None)) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(raw_output), + metadata={ + "framework": "google_adk", + "agent_name": agent_name, + "duration_ns": duration_ns, + "raw_output": raw_output, + }, + ) ) except Exception: logger.warning("Error in after_agent_callback", exc_info=True) @@ -202,31 +334,40 @@ def _after_model_callback(self, callback_context: Any, llm_response: Any) -> Any tid = threading.get_ident() with self._adapter_lock: start_ns = self._model_call_starts.pop(tid, None) - latency_ms = None + latency_ms: float | None = None if start_ns: latency_ms = (time.time_ns() - start_ns) / 1_000_000 - payload: dict[str, Any] = {"framework": "google_adk"} - model = getattr(callback_context, "model", None) or getattr(llm_response, "model", None) - if model: - payload["model"] = str(model) - payload["provider"] = "google" + model_raw = getattr(callback_context, "model", None) or getattr( + llm_response, "model", None + ) + model_name = str(model_raw) if model_raw else "unknown" + provider = _detect_provider(model_name) usage = getattr(llm_response, "usage_metadata", None) + prompt_tokens = ( + getattr(usage, "prompt_token_count", None) if usage else None + ) + completion_tokens = ( + getattr(usage, "candidates_token_count", None) if usage else None + ) + self.emit_event( + ModelInvokeEvent.create( + provider=provider, + name=model_name, + version="unavailable", + parameters={"framework": "google_adk"}, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + latency_ms=latency_ms, + ) + ) if usage: - payload["tokens_prompt"] = getattr(usage, "prompt_token_count", None) - payload["tokens_completion"] = getattr(usage, "candidates_token_count", None) - if latency_ms is not None: - payload["latency_ms"] = latency_ms - self.emit_dict_event("model.invoke", payload) - if usage: - self.emit_dict_event( - "cost.record", - { - "framework": "google_adk", - "model": payload.get("model"), - "tokens_prompt": payload.get("tokens_prompt"), - "tokens_completion": payload.get("tokens_completion"), - "tokens_total": ((payload.get("tokens_prompt") or 0) + (payload.get("tokens_completion") or 0)), - }, + total_tokens = (prompt_tokens or 0) + (completion_tokens or 0) + self.emit_event( + CostRecordEvent.create( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + tokens=total_tokens, + ) ) except Exception: logger.warning("Error in after_model_callback", exc_info=True) @@ -256,18 +397,25 @@ def _after_tool_callback( call_id = f"{tool_name}_{id(tool_input)}" with self._adapter_lock: start_ns = self._tool_call_starts.pop(call_id, None) - latency_ms = None + latency_ms: float | None = None if start_ns: latency_ms = (time.time_ns() - start_ns) / 1_000_000 - self.emit_dict_event( - "tool.call", - { - "framework": "google_adk", - "tool_name": tool_name, - "tool_input": self._safe_serialize(tool_input), - "tool_output": self._safe_serialize(tool_output), - "latency_ms": latency_ms, - }, + serialized_input = self._safe_serialize(tool_input) + serialized_output = self._safe_serialize(tool_output) + input_data = _coerce_to_dict(serialized_input) + input_data.setdefault("framework", "google_adk") + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialized_output) if serialized_output is not None else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + latency_ms=latency_ms, + ) ) except Exception: logger.warning("Error in after_tool_callback", exc_info=True) @@ -283,14 +431,18 @@ def on_agent_start(self, agent_name: str | None = None, input_data: Any = None) start_ns = time.time_ns() with self._adapter_lock: self._agent_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "google_adk", - "agent_name": agent_name, - "input": self._safe_serialize(input_data), - "timestamp_ns": start_ns, - }, + raw_input = self._safe_serialize(input_data) + self.emit_event( + AgentInputEvent.create( + message=_stringify(raw_input), + role=MessageRole.HUMAN, + metadata={ + "framework": "google_adk", + "agent_name": agent_name, + "timestamp_ns": start_ns, + "raw_input": raw_input, + }, + ) ) except Exception: logger.warning("Error in on_agent_start", exc_info=True) @@ -309,15 +461,21 @@ def on_agent_end( with self._adapter_lock: start_ns = self._agent_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - payload: dict[str, Any] = { + raw_output = self._safe_serialize(output) + metadata: dict[str, Any] = { "framework": "google_adk", "agent_name": agent_name, - "output": self._safe_serialize(output), "duration_ns": duration_ns, + "raw_output": raw_output, } if error: - payload["error"] = str(error) - self.emit_dict_event("agent.output", payload) + metadata["error"] = str(error) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(raw_output), + metadata=metadata, + ) + ) except Exception: logger.warning("Error in on_agent_end", exc_info=True) @@ -326,15 +484,12 @@ def on_handoff(self, from_agent: str, to_agent: str, context: Any = None) -> Non return try: context_str = str(context) if context else "" - self.emit_dict_event( - "agent.handoff", - { - "from_agent": from_agent, - "to_agent": to_agent, - "reason": "transfer_to_agent", - "context_hash": hashlib.sha256(context_str.encode()).hexdigest() if context_str else None, - "context_preview": context_str[:500] if context_str else None, - }, + self.emit_event( + AgentHandoffEvent.create( + from_agent=from_agent, + to_agent=to_agent, + handoff_context_hash=_sha256_of(context_str), + ) ) except Exception: logger.warning("Error in on_handoff", exc_info=True) @@ -350,17 +505,24 @@ def on_tool_use( if not self._connected: return try: - payload: dict[str, Any] = { - "framework": "google_adk", - "tool_name": tool_name, - "tool_input": self._safe_serialize(tool_input), - "tool_output": self._safe_serialize(tool_output), - } - if error: - payload["error"] = str(error) - if latency_ms is not None: - payload["latency_ms"] = latency_ms - self.emit_dict_event("tool.call", payload) + serialized_input = self._safe_serialize(tool_input) + serialized_output = self._safe_serialize(tool_output) + input_data = _coerce_to_dict(serialized_input) + input_data.setdefault("framework", "google_adk") + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialized_output) if serialized_output is not None else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + error=str(error) if error else None, + latency_ms=latency_ms, + ) + ) except Exception: logger.warning("Error in on_tool_use", exc_info=True) @@ -376,20 +538,23 @@ def on_llm_call( if not self._connected: return try: - payload: dict[str, Any] = {"framework": "google_adk"} - if provider: - payload["provider"] = provider - if model: - payload["model"] = model - if tokens_prompt is not None: - payload["tokens_prompt"] = tokens_prompt - if tokens_completion is not None: - payload["tokens_completion"] = tokens_completion - if latency_ms is not None: - payload["latency_ms"] = latency_ms + model_name = model or "unknown" + resolved_provider = provider or _detect_provider(model_name) + input_messages: list[dict[str, str]] | None = None if self._capture_config.capture_content and messages: - payload["messages"] = messages - self.emit_dict_event("model.invoke", payload) + input_messages = messages + self.emit_event( + ModelInvokeEvent.create( + provider=resolved_provider, + name=model_name, + version="unavailable", + parameters={"framework": "google_adk"}, + prompt_tokens=tokens_prompt, + completion_tokens=tokens_completion, + latency_ms=latency_ms, + input_messages=input_messages, + ) + ) except Exception: logger.warning("Error in on_llm_call", exc_info=True) @@ -402,12 +567,21 @@ def _get_agent_name(self, callback_context: Any) -> str: return "unknown" def _emit_agent_config(self, agent_name: str, callback_context: Any) -> None: + """Emit a typed :class:`EnvironmentConfigEvent` per agent. + + Idempotent per agent — only the first call for a given agent + name actually emits. ADK's runtime is treated as a + ``simulated`` environment by default; the real production + environment (``cloud`` / ``on_prem``) is the responsibility of + the host application's environment.config emission, not this + framework adapter (mirrors the agno reference pattern). + """ with self._adapter_lock: if agent_name in self._seen_agents: return self._seen_agents.add(agent_name) agent = getattr(callback_context, "agent", None) - metadata: dict[str, Any] = { + attributes: dict[str, Any] = { "framework": "google_adk", "agent_name": agent_name, } @@ -415,17 +589,22 @@ def _emit_agent_config(self, agent_name: str, callback_context: Any) -> None: for attr in ("description", "instruction", "model"): val = getattr(agent, attr, None) if val is not None: - metadata[attr] = str(val) + attributes[attr] = str(val) tools = getattr(agent, "tools", None) if tools: - metadata["tools"] = [getattr(t, "name", str(t)) for t in tools] + attributes["tools"] = [getattr(t, "name", str(t)) for t in tools] sub_agents = getattr(agent, "sub_agents", None) if sub_agents: - metadata["sub_agents"] = [getattr(a, "name", str(a)) for a in sub_agents] + attributes["sub_agents"] = [getattr(a, "name", str(a)) for a in sub_agents] session = getattr(callback_context, "session", None) if session: - metadata["session_id"] = getattr(session, "id", None) - self.emit_dict_event("environment.config", metadata) + attributes["session_id"] = getattr(session, "id", None) + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.SIMULATED, + attributes=attributes, + ) + ) def _safe_serialize(self, value: Any) -> Any: try: diff --git a/src/layerlens/instrument/adapters/frameworks/llama_index/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/llama_index/lifecycle.py index acbf1a9..b070e16 100644 --- a/src/layerlens/instrument/adapters/frameworks/llama_index/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/llama_index/lifecycle.py @@ -9,6 +9,25 @@ Query/retrieval → tool.call (L5a, retrieval) Agent handoff → agent.handoff (Cross) Workflow event → agent.state.change (Cross) + +Typed-event status (post PR #129 migration, bundle 3): + +* Every emission flows through :meth:`BaseAdapter.emit_event` with a + canonical Pydantic payload imported from + :mod:`layerlens.instrument._compat.events`. +* LlamaIndex-specific provenance (``framework``, ``agent_name``, + ``step``, ``timestamp_ns``, ``tool_type``, ``result_count``) is + carried in the canonical model's metadata / attributes / parameters + / input slots — the canonical schema does not expose these as + top-level fields. +* The handoff context hash is generated via SHA-256 over the context + string (or the empty string when no context is available) so the + canonical :class:`AgentHandoffEvent.handoff_context_hash` validator + always passes. +* Retrieval events map onto :class:`ToolCallEvent` with + ``name="retrieval"`` and ``integration=IntegrationType.LIBRARY`` — + the canonical schema has no dedicated retrieval shape, but the + agno reference adapter follows the same convention. """ from __future__ import annotations @@ -20,6 +39,18 @@ import threading from typing import Any +from layerlens.instrument._compat.events import ( + MessageRole, + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + EnvironmentType, + IntegrationType, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, +) from layerlens.instrument.adapters._base.adapter import ( AdapterInfo, BaseAdapter, @@ -33,6 +64,94 @@ logger = logging.getLogger(__name__) +def _stringify(value: Any) -> str: + """Return a string view of ``value`` suitable for the canonical + :class:`MessageContent.message` field. + + The canonical schema requires :class:`AgentInputEvent` and + :class:`AgentOutputEvent` to carry a ``message: str``. LlamaIndex + delivers the underlying input/output as arbitrary Python objects + (Pydantic ``Response`` models with ``response`` attribute, dicts, + ``None``); this helper converts each to a (possibly empty) string + so the typed event always validates. The original payload is + preserved on :class:`MessageContent.metadata.raw_input` / + ``raw_output``. + """ + if value is None: + return "" + if isinstance(value, str): + return value + if isinstance(value, dict): + # LlamaIndex Response payloads serialise to ``{"response": ...}``; + # surface the response slot when present. + response = value.get("response") + if isinstance(response, str): + return response + content = value.get("content") + if isinstance(content, str): + return content + return str(value) + + +def _coerce_to_dict(value: Any) -> dict[str, Any]: + """Coerce ``value`` into a dict suitable for the canonical + :class:`ToolCallEvent` ``input`` / ``output`` slots. + + The canonical schema requires ``input: dict[str, Any]`` and + ``output: dict[str, Any] | None``. LlamaIndex tool events deliver + arbitrary Python values (scalars, dicts, lists, dataclass-like + objects). This helper wraps non-dict values in ``{"value": ...}`` + so the canonical slot is always satisfied. ``None`` returns an + empty dict so the caller can pass it positionally without a guard. + """ + if value is None: + return {} + if isinstance(value, dict): + return dict(value) + if isinstance(value, list): + return {"value": value} + return {"value": value} + + +def _sha256_of(value: str) -> str: + """Return a canonical ``sha256:`` hash string for ``value``. + + The canonical schema's :class:`AgentHandoffEvent` requires + ``handoff_context_hash`` to start with ``sha256:`` and have a + 64-character hex tail. Centralising the format here ensures every + emit site uses the same wire format — including the empty-string + fallback used when LlamaIndex has no context to hash. + """ + return "sha256:" + hashlib.sha256(value.encode("utf-8")).hexdigest() + + +def _detect_provider(model: str | None) -> str: + """Detect the LLM provider from a model identifier. + + LlamaIndex supports OpenAI, Anthropic, Google, Mistral, Cohere, + LiteLLM, etc. The canonical :class:`ModelInvokeEvent` requires + both ``provider`` and ``name``, so this heuristic derives the + provider from well-known model name prefixes. Unknown identifiers + return ``"unknown"`` per the canonical schema's NORMATIVE rule. + """ + if not model: + return "unknown" + model_lower = model.lower() + if "gpt" in model_lower or "o1" in model_lower or "o3" in model_lower: + return "openai" + if "claude" in model_lower: + return "anthropic" + if "gemini" in model_lower: + return "google" + if "mistral" in model_lower or "mixtral" in model_lower: + return "mistral" + if "llama" in model_lower: + return "meta" + if "command" in model_lower: + return "cohere" + return "unknown" + + class LlamaIndexAdapter(BaseAdapter): """LayerLens adapter for LlamaIndex.""" @@ -41,9 +160,16 @@ class LlamaIndexAdapter(BaseAdapter): # The adapter source has no direct ``pydantic`` imports (verified by # grep across ``frameworks/llama_index/``). LlamaIndex's # Instrumentation Module emits dict-shaped events that the adapter - # forwards without touching framework Pydantic models. + # forwards through the canonical schema (PR #129) without touching + # framework Pydantic models. requires_pydantic = PydanticCompat.V1_OR_V2 + # Per-adapter ``extra="allow"`` decision: llama_index targets the + # canonical 13-event taxonomy exclusively. Unknown event types must + # be rejected by the base adapter's typed-event validator, so this + # stays ``False``. + ALLOW_UNREGISTERED_EVENTS: bool = False + def __init__( self, stratix: Any | None = None, @@ -203,90 +329,145 @@ def _on_llm_start(self, event: Any) -> None: pass # Timing tracked on end def _on_llm_end(self, event: Any) -> None: - payload: dict[str, Any] = {"framework": "llama_index"} - model = getattr(event, "model", None) or getattr(event, "model_name", None) - if model: - payload["model"] = str(model) + """Emit a typed :class:`ModelInvokeEvent` and (optionally) + :class:`CostRecordEvent` for an LLM completion. + + LlamaIndex-specific provenance (``framework``) lives on + :attr:`ModelInfo.parameters`. Token usage is extracted from + the optional ``response.raw.usage`` slot LlamaIndex exposes + for OpenAI-compatible providers. + """ + model_raw = getattr(event, "model", None) or getattr(event, "model_name", None) + model_name = str(model_raw) if model_raw else "unknown" + provider = _detect_provider(model_name) + prompt_tokens: int | None = None + completion_tokens: int | None = None response = getattr(event, "response", None) if response: raw = getattr(response, "raw", None) if raw: usage = getattr(raw, "usage", None) if usage: - payload["tokens_prompt"] = getattr(usage, "prompt_tokens", None) - payload["tokens_completion"] = getattr(usage, "completion_tokens", None) - self.emit_dict_event("model.invoke", payload) - if "tokens_prompt" in payload or "tokens_completion" in payload: - self.emit_dict_event( - "cost.record", - { - "framework": "llama_index", - "model": payload.get("model"), - "tokens_prompt": payload.get("tokens_prompt"), - "tokens_completion": payload.get("tokens_completion"), - "tokens_total": (payload.get("tokens_prompt") or 0) + (payload.get("tokens_completion") or 0), - }, + prompt_tokens = getattr(usage, "prompt_tokens", None) + completion_tokens = getattr(usage, "completion_tokens", None) + self.emit_event( + ModelInvokeEvent.create( + provider=provider, + name=model_name, + version="unavailable", + parameters={"framework": "llama_index"}, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + ) + ) + if prompt_tokens is not None or completion_tokens is not None: + total_tokens = (prompt_tokens or 0) + (completion_tokens or 0) + self.emit_event( + CostRecordEvent.create( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + tokens=total_tokens, + ) ) def _on_tool_call(self, event: Any) -> None: - self.emit_dict_event( - "tool.call", - { - "framework": "llama_index", - "tool_name": getattr(event, "tool_name", None) or getattr(event, "name", "unknown"), - "tool_input": self._safe_serialize(getattr(event, "tool_input", None)), - "tool_output": self._safe_serialize(getattr(event, "tool_output", None)), - }, + """Emit a typed :class:`ToolCallEvent` for a LlamaIndex tool call.""" + tool_name_raw = getattr(event, "tool_name", None) or getattr(event, "name", "unknown") + tool_name: str = str(tool_name_raw) if tool_name_raw else "unknown" + serialized_input = self._safe_serialize(getattr(event, "tool_input", None)) + serialized_output = self._safe_serialize(getattr(event, "tool_output", None)) + input_data = _coerce_to_dict(serialized_input) + input_data.setdefault("framework", "llama_index") + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialized_output) if serialized_output is not None else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + ) ) def _on_retrieval_start(self, event: Any) -> None: pass # Tracked on end def _on_retrieval_end(self, event: Any) -> None: + """Emit a typed :class:`ToolCallEvent` for a retrieval result. + + Retrieval is modelled as a tool call with ``name="retrieval"`` + and ``integration=IntegrationType.LIBRARY``. Adapter-specific + provenance (``framework``, ``tool_type``, ``result_count``) + lives on the canonical ``input`` slot; the per-node scores + list lives on the canonical ``output`` slot. + """ nodes = getattr(event, "nodes", None) or [] - self.emit_dict_event( - "tool.call", - { - "framework": "llama_index", - "tool_name": "retrieval", - "tool_type": "retrieval", - "tool_output": self._safe_serialize([{"score": getattr(n, "score", None)} for n in nodes[:10]]), - "result_count": len(nodes), - }, + node_scores = self._safe_serialize( + [{"score": getattr(n, "score", None)} for n in nodes[:10]] + ) + self.emit_event( + ToolCallEvent.create( + name="retrieval", + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data={ + "framework": "llama_index", + "tool_type": "retrieval", + "result_count": len(nodes), + }, + output_data={"nodes": node_scores}, + ) ) def _on_agent_step_start(self, event: Any) -> None: + """Emit a typed :class:`AgentInputEvent` for agent step start. + + LlamaIndex-specific provenance (``framework``, ``agent_name``, + ``step``, ``timestamp_ns``) lives on + :class:`MessageContent.metadata`. The canonical ``message`` + field carries a string view of the step. + """ agent_name = getattr(event, "agent_id", None) or "llama_agent" self._emit_agent_config(agent_name, event) tid = threading.get_ident() start_ns = time.time_ns() with self._adapter_lock: self._agent_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "llama_index", - "agent_name": agent_name, - "step": getattr(event, "step", None), - "timestamp_ns": start_ns, - }, + step = getattr(event, "step", None) + self.emit_event( + AgentInputEvent.create( + message=_stringify(step), + role=MessageRole.HUMAN, + metadata={ + "framework": "llama_index", + "agent_name": agent_name, + "step": step, + "timestamp_ns": start_ns, + }, + ) ) def _on_agent_step_end(self, event: Any) -> None: + """Emit a typed :class:`AgentOutputEvent` for agent step end.""" agent_name = getattr(event, "agent_id", None) or "llama_agent" tid = threading.get_ident() end_ns = time.time_ns() with self._adapter_lock: start_ns = self._agent_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - self.emit_dict_event( - "agent.output", - { - "framework": "llama_index", - "agent_name": agent_name, - "output": self._safe_serialize(getattr(event, "response", None)), - "duration_ns": duration_ns, - }, + raw_output = self._safe_serialize(getattr(event, "response", None)) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(raw_output), + metadata={ + "framework": "llama_index", + "agent_name": agent_name, + "duration_ns": duration_ns, + "raw_output": raw_output, + }, + ) ) # --- Lifecycle Hooks --- @@ -299,14 +480,18 @@ def on_agent_start(self, agent_name: str | None = None, input_data: Any = None) start_ns = time.time_ns() with self._adapter_lock: self._agent_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "llama_index", - "agent_name": agent_name, - "input": self._safe_serialize(input_data), - "timestamp_ns": start_ns, - }, + raw_input = self._safe_serialize(input_data) + self.emit_event( + AgentInputEvent.create( + message=_stringify(raw_input), + role=MessageRole.HUMAN, + metadata={ + "framework": "llama_index", + "agent_name": agent_name, + "timestamp_ns": start_ns, + "raw_input": raw_input, + }, + ) ) except Exception: logger.warning("Error in on_agent_start", exc_info=True) @@ -325,15 +510,21 @@ def on_agent_end( with self._adapter_lock: start_ns = self._agent_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - payload: dict[str, Any] = { + raw_output = self._safe_serialize(output) + metadata: dict[str, Any] = { "framework": "llama_index", "agent_name": agent_name, - "output": self._safe_serialize(output), "duration_ns": duration_ns, + "raw_output": raw_output, } if error: - payload["error"] = str(error) - self.emit_dict_event("agent.output", payload) + metadata["error"] = str(error) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(raw_output), + metadata=metadata, + ) + ) except Exception: logger.warning("Error in on_agent_end", exc_info=True) @@ -348,17 +539,24 @@ def on_tool_use( if not self._connected: return try: - payload: dict[str, Any] = { - "framework": "llama_index", - "tool_name": tool_name, - "tool_input": self._safe_serialize(tool_input), - "tool_output": self._safe_serialize(tool_output), - } - if error: - payload["error"] = str(error) - if latency_ms is not None: - payload["latency_ms"] = latency_ms - self.emit_dict_event("tool.call", payload) + serialized_input = self._safe_serialize(tool_input) + serialized_output = self._safe_serialize(tool_output) + input_data = _coerce_to_dict(serialized_input) + input_data.setdefault("framework", "llama_index") + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialized_output) if serialized_output is not None else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + error=str(error) if error else None, + latency_ms=latency_ms, + ) + ) except Exception: logger.warning("Error in on_tool_use", exc_info=True) @@ -374,20 +572,23 @@ def on_llm_call( if not self._connected: return try: - payload: dict[str, Any] = {"framework": "llama_index"} - if provider: - payload["provider"] = provider - if model: - payload["model"] = model - if tokens_prompt is not None: - payload["tokens_prompt"] = tokens_prompt - if tokens_completion is not None: - payload["tokens_completion"] = tokens_completion - if latency_ms is not None: - payload["latency_ms"] = latency_ms + model_name = model or "unknown" + resolved_provider = provider or _detect_provider(model_name) + input_messages: list[dict[str, str]] | None = None if self._capture_config.capture_content and messages: - payload["messages"] = messages - self.emit_dict_event("model.invoke", payload) + input_messages = messages + self.emit_event( + ModelInvokeEvent.create( + provider=resolved_provider, + name=model_name, + version="unavailable", + parameters={"framework": "llama_index"}, + prompt_tokens=tokens_prompt, + completion_tokens=tokens_completion, + latency_ms=latency_ms, + input_messages=input_messages, + ) + ) except Exception: logger.warning("Error in on_llm_call", exc_info=True) @@ -396,14 +597,12 @@ def on_handoff(self, from_agent: str, to_agent: str, context: Any = None) -> Non return try: context_str = str(context) if context else "" - self.emit_dict_event( - "agent.handoff", - { - "from_agent": from_agent, - "to_agent": to_agent, - "reason": "agent_workflow_handoff", - "context_hash": hashlib.sha256(context_str.encode()).hexdigest() if context_str else None, - }, + self.emit_event( + AgentHandoffEvent.create( + from_agent=from_agent, + to_agent=to_agent, + handoff_context_hash=_sha256_of(context_str), + ) ) except Exception: logger.warning("Error in on_handoff", exc_info=True) @@ -411,18 +610,32 @@ def on_handoff(self, from_agent: str, to_agent: str, context: Any = None) -> Non # --- Helpers --- def _emit_agent_config(self, agent_name: str, event_or_agent: Any) -> None: + """Emit a typed :class:`EnvironmentConfigEvent` per agent. + + Idempotent per agent — only the first call for a given agent + name actually emits. LlamaIndex's runtime is treated as a + ``simulated`` environment by default; the real production + environment (``cloud`` / ``on_prem``) is the responsibility + of the host application's environment.config emission, not + this framework adapter (mirrors the agno reference pattern). + """ with self._adapter_lock: if agent_name in self._seen_agents: return self._seen_agents.add(agent_name) - metadata: dict[str, Any] = { + attributes: dict[str, Any] = { "framework": "llama_index", "agent_name": agent_name, } tools = getattr(event_or_agent, "tools", None) if tools: - metadata["tools"] = [getattr(t, "name", str(t)) for t in tools] - self.emit_dict_event("environment.config", metadata) + attributes["tools"] = [getattr(t, "name", str(t)) for t in tools] + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.SIMULATED, + attributes=attributes, + ) + ) def _safe_serialize(self, value: Any) -> Any: try: diff --git a/src/layerlens/instrument/adapters/frameworks/ms_agent_framework/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/ms_agent_framework/lifecycle.py index ea7c153..d738572 100644 --- a/src/layerlens/instrument/adapters/frameworks/ms_agent_framework/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/ms_agent_framework/lifecycle.py @@ -8,6 +8,27 @@ Tool call -> tool.call (L5a) Model call -> model.invoke (L3) Channel selection -> agent.state.change (Cross) + +Typed-event status (post PR #129 migration, bundle 3): + +* Every emission flows through :meth:`BaseAdapter.emit_event` with a + canonical Pydantic payload imported from + :mod:`layerlens.instrument._compat.events`. +* MS Agent Framework-specific provenance (``framework``, ``agent_name``, + ``chat_name``, ``chat_type``, ``timestamp_ns``, ``selection_strategy``, + ``termination_strategy``) is carried in the canonical model's + metadata / attributes / parameters slots. +* The ``agent.state.change`` "run_complete" / "run_failed" marker + emitted by :meth:`on_run_end` does not satisfy the canonical + :class:`AgentStateChangeEvent` ``before_hash`` / ``after_hash`` + contract (the run boundary has no real state mutation to hash). + It is mapped onto :class:`AgentOutputEvent` metadata as + ``run_status`` so the cross-cutting completion signal is preserved + without violating the canonical schema. +* The handoff context hash is generated via SHA-256 over the context + string (or the empty string when no context is available) so the + canonical :class:`AgentHandoffEvent.handoff_context_hash` validator + always passes. """ from __future__ import annotations @@ -19,6 +40,18 @@ import threading from typing import Any +from layerlens.instrument._compat.events import ( + MessageRole, + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + EnvironmentType, + IntegrationType, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, +) from layerlens.instrument.adapters._base.adapter import ( AdapterInfo, BaseAdapter, @@ -32,6 +65,60 @@ logger = logging.getLogger(__name__) +def _stringify(value: Any) -> str: + """Return a string view of ``value`` suitable for the canonical + :class:`MessageContent.message` field. + + The canonical schema requires :class:`AgentInputEvent` and + :class:`AgentOutputEvent` to carry a ``message: str``. MS Agent + Framework delivers the underlying input/output as arbitrary Python + objects (Pydantic ``ChatMessageContent`` models with ``content`` + / ``items``, dicts, ``None``); this helper converts each to a + (possibly empty) string so the typed event always validates. The + original payload is preserved on + :class:`MessageContent.metadata.raw_input` / ``raw_output``. + """ + if value is None: + return "" + if isinstance(value, str): + return value + if isinstance(value, dict): + # SK ChatMessageContent serialises with a ``content`` slot. + content = value.get("content") + if isinstance(content, str): + return content + return str(value) + + +def _coerce_to_dict(value: Any) -> dict[str, Any]: + """Coerce ``value`` into a dict suitable for the canonical + :class:`ToolCallEvent` ``input`` / ``output`` slots. + + The canonical schema requires ``input: dict[str, Any]`` and + ``output: dict[str, Any] | None``. SK function-call payloads + deliver arbitrary Python values. This helper wraps non-dict + values in ``{"value": ...}`` so the canonical slot is always + satisfied. + """ + if value is None: + return {} + if isinstance(value, dict): + return dict(value) + return {"value": value} + + +def _sha256_of(value: str) -> str: + """Return a canonical ``sha256:`` hash string for ``value``. + + The canonical schema's :class:`AgentHandoffEvent` requires + ``handoff_context_hash`` to start with ``sha256:`` and have a + 64-character hex tail. Centralising the format here ensures every + emit site uses the same wire format — including the empty-string + fallback used when the framework has no context to hash. + """ + return "sha256:" + hashlib.sha256(value.encode("utf-8")).hexdigest() + + class MSAgentAdapter(BaseAdapter): """LayerLens adapter for Microsoft Agent Framework.""" @@ -39,11 +126,18 @@ class MSAgentAdapter(BaseAdapter): VERSION = "0.1.0" # The adapter source has no direct ``pydantic`` imports (verified by # grep across ``frameworks/ms_agent_framework/``). The adapter wraps - # AgentChat.invoke() and emits dict events. The pyproject extra pulls + # AgentChat.invoke() and emits typed events through the canonical + # schema (PR #129). The pyproject extra pulls # ``semantic-kernel>=1.0,<2.0`` (SK 1.x is internally Pydantic v2) # but the adapter itself stays version-agnostic. requires_pydantic = PydanticCompat.V1_OR_V2 + # Per-adapter ``extra="allow"`` decision: ms_agent_framework targets + # the canonical 13-event taxonomy exclusively. Unknown event types + # must be rejected by the base adapter's typed-event validator, so + # this stays ``False``. + ALLOW_UNREGISTERED_EVENTS: bool = False + def __init__( self, stratix: Any | None = None, @@ -217,68 +311,93 @@ async def traced_invoke_stream(*args: Any, **kwargs: Any) -> Any: return traced_invoke_stream def _process_message(self, chat: Any, message: Any, current_agent: str) -> None: - """Process a chat message to extract tool calls, model info, and handoffs.""" + """Process a chat message to extract tool calls, model info, and handoffs. + + Emits typed :class:`AgentHandoffEvent`, :class:`ToolCallEvent`, + :class:`ModelInvokeEvent`, and :class:`CostRecordEvent` based + on the message shape. SK-specific provenance is carried on + canonical metadata / parameters / input slots. + """ try: # Detect agent turn transitions (handoffs in group chat) msg_agent_name = getattr(message, "agent_name", None) or getattr(message, "name", None) if msg_agent_name and msg_agent_name != current_agent: - self.emit_dict_event( - "agent.handoff", - { - "from_agent": current_agent, - "to_agent": msg_agent_name, - "reason": "group_chat_turn", - }, + self.emit_event( + AgentHandoffEvent.create( + from_agent=current_agent, + to_agent=msg_agent_name, + handoff_context_hash=_sha256_of(""), + ) ) # Extract tool calls from message items = getattr(message, "items", None) or [] for item in items: item_type = type(item).__name__ + tool_name_raw = getattr(item, "name", None) or getattr( + item, "function_name", "unknown" + ) + tool_name: str = str(tool_name_raw) if tool_name_raw else "unknown" if "FunctionCall" in item_type or "ToolCall" in item_type: - self.emit_dict_event( - "tool.call", - { - "framework": "ms_agent_framework", - "tool_name": getattr(item, "name", None) or getattr(item, "function_name", "unknown"), - "tool_input": self._safe_serialize(getattr(item, "arguments", None)), - }, + serialized_input = self._safe_serialize(getattr(item, "arguments", None)) + input_data = _coerce_to_dict(serialized_input) + input_data.setdefault("framework", "ms_agent_framework") + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + ) ) elif "FunctionResult" in item_type or "ToolResult" in item_type: - self.emit_dict_event( - "tool.call", - { - "framework": "ms_agent_framework", - "tool_name": getattr(item, "name", None) or getattr(item, "function_name", "unknown"), - "tool_output": self._safe_serialize(getattr(item, "result", None)), - }, + serialized_output = self._safe_serialize(getattr(item, "result", None)) + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialized_output) + if serialized_output is not None + else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data={"framework": "ms_agent_framework"}, + output_data=output_data, + ) ) # Extract model info from metadata metadata = getattr(message, "metadata", None) or {} if isinstance(metadata, dict): - model = metadata.get("model") or metadata.get("model_id") - if model: - self.emit_dict_event( - "model.invoke", - { - "framework": "ms_agent_framework", - "model": str(model), - "provider": self._detect_provider(str(model)), - }, + model_raw = metadata.get("model") or metadata.get("model_id") + if model_raw: + model_name = str(model_raw) + provider = self._detect_provider(model_name) or "azure_openai" + self.emit_event( + ModelInvokeEvent.create( + provider=provider, + name=model_name, + version="unavailable", + parameters={"framework": "ms_agent_framework"}, + ) ) usage = metadata.get("usage") if usage: - self.emit_dict_event( - "cost.record", - { - "framework": "ms_agent_framework", - "model": str(model) if model else None, - "tokens_prompt": getattr(usage, "prompt_tokens", None) - or (usage.get("prompt_tokens") if isinstance(usage, dict) else None), - "tokens_completion": getattr(usage, "completion_tokens", None) - or (usage.get("completion_tokens") if isinstance(usage, dict) else None), - }, + prompt_tokens = getattr(usage, "prompt_tokens", None) or ( + usage.get("prompt_tokens") if isinstance(usage, dict) else None + ) + completion_tokens = getattr(usage, "completion_tokens", None) or ( + usage.get("completion_tokens") if isinstance(usage, dict) else None + ) + self.emit_event( + CostRecordEvent.create( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + tokens=(prompt_tokens or 0) + (completion_tokens or 0) + if (prompt_tokens or completion_tokens) + else None, + ) ) except Exception: logger.debug("Could not process message", exc_info=True) @@ -286,7 +405,7 @@ def _process_message(self, chat: Any, message: Any, current_agent: str) -> None: # --- Lifecycle Hooks --- def on_run_start(self, agent_name: str | None = None, input_data: Any = None) -> None: - """Emit agent.input event when a chat invocation starts.""" + """Emit a typed :class:`AgentInputEvent` when a chat invocation starts.""" if not self._connected: return try: @@ -294,14 +413,18 @@ def on_run_start(self, agent_name: str | None = None, input_data: Any = None) -> start_ns = time.time_ns() with self._adapter_lock: self._run_starts[tid] = start_ns - self.emit_dict_event( - "agent.input", - { - "framework": "ms_agent_framework", - "agent_name": agent_name, - "input": self._safe_serialize(input_data), - "timestamp_ns": start_ns, - }, + raw_input = self._safe_serialize(input_data) + self.emit_event( + AgentInputEvent.create( + message=_stringify(raw_input), + role=MessageRole.HUMAN, + metadata={ + "framework": "ms_agent_framework", + "agent_name": agent_name, + "timestamp_ns": start_ns, + "raw_input": raw_input, + }, + ) ) except Exception: logger.warning("Error in on_run_start", exc_info=True) @@ -312,7 +435,19 @@ def on_run_end( output: Any = None, error: Exception | None = None, ) -> None: - """Emit agent.output event when a chat invocation ends.""" + """Emit a typed :class:`AgentOutputEvent` when a chat invocation ends. + + The previous adapter implementation also emitted a separate + ``agent.state.change`` payload carrying only an + ``event_subtype`` marker (``run_complete`` / ``run_failed``). + That payload did not satisfy the canonical + :class:`AgentStateChangeEvent` ``before_hash`` / ``after_hash`` + contract — the run boundary has no real state mutation to + hash. The post-migration mapping carries the same signal as + ``run_status`` on :class:`MessageContent.metadata`, preserving + the cross-cutting completion marker without violating the + canonical schema. + """ if not self._connected: return try: @@ -321,22 +456,21 @@ def on_run_end( with self._adapter_lock: start_ns = self._run_starts.pop(tid, 0) duration_ns = end_ns - start_ns if start_ns else 0 - payload: dict[str, Any] = { + raw_output = self._safe_serialize(output) + metadata: dict[str, Any] = { "framework": "ms_agent_framework", "agent_name": agent_name, - "output": self._safe_serialize(output), "duration_ns": duration_ns, + "raw_output": raw_output, + "run_status": "run_complete" if not error else "run_failed", } if error: - payload["error"] = str(error) - self.emit_dict_event("agent.output", payload) - self.emit_dict_event( - "agent.state.change", - { - "framework": "ms_agent_framework", - "agent_name": agent_name, - "event_subtype": "run_complete" if not error else "run_failed", - }, + metadata["error"] = str(error) + self.emit_event( + AgentOutputEvent.create( + message=_stringify(raw_output), + metadata=metadata, + ) ) except Exception: logger.warning("Error in on_run_end", exc_info=True) @@ -349,21 +483,28 @@ def on_tool_use( error: Exception | None = None, latency_ms: float | None = None, ) -> None: - """Emit tool.call event for a tool invocation.""" + """Emit a typed :class:`ToolCallEvent` for a tool invocation.""" if not self._connected: return try: - payload: dict[str, Any] = { - "framework": "ms_agent_framework", - "tool_name": tool_name, - "tool_input": self._safe_serialize(tool_input), - "tool_output": self._safe_serialize(tool_output), - } - if error: - payload["error"] = str(error) - if latency_ms is not None: - payload["latency_ms"] = latency_ms - self.emit_dict_event("tool.call", payload) + serialized_input = self._safe_serialize(tool_input) + serialized_output = self._safe_serialize(tool_output) + input_data = _coerce_to_dict(serialized_input) + input_data.setdefault("framework", "ms_agent_framework") + output_data: dict[str, Any] | None = ( + _coerce_to_dict(serialized_output) if serialized_output is not None else None + ) + self.emit_event( + ToolCallEvent.create( + name=tool_name, + version="unavailable", + integration=IntegrationType.LIBRARY, + input_data=input_data, + output_data=output_data, + error=str(error) if error else None, + latency_ms=latency_ms, + ) + ) except Exception: logger.warning("Error in on_tool_use", exc_info=True) @@ -376,41 +517,42 @@ def on_llm_call( latency_ms: float | None = None, messages: list[dict[str, str]] | None = None, ) -> None: - """Emit model.invoke event for an LLM call.""" + """Emit a typed :class:`ModelInvokeEvent` for an LLM call.""" if not self._connected: return try: - payload: dict[str, Any] = {"framework": "ms_agent_framework"} - if provider: - payload["provider"] = provider - if model: - payload["model"] = model - if tokens_prompt is not None: - payload["tokens_prompt"] = tokens_prompt - if tokens_completion is not None: - payload["tokens_completion"] = tokens_completion - if latency_ms is not None: - payload["latency_ms"] = latency_ms + model_name = model or "unknown" + resolved_provider = provider or self._detect_provider(model_name) or "azure_openai" + input_messages: list[dict[str, str]] | None = None if self._capture_config.capture_content and messages: - payload["messages"] = messages - self.emit_dict_event("model.invoke", payload) + input_messages = messages + self.emit_event( + ModelInvokeEvent.create( + provider=resolved_provider, + name=model_name, + version="unavailable", + parameters={"framework": "ms_agent_framework"}, + prompt_tokens=tokens_prompt, + completion_tokens=tokens_completion, + latency_ms=latency_ms, + input_messages=input_messages, + ) + ) except Exception: logger.warning("Error in on_llm_call", exc_info=True) def on_handoff(self, from_agent: str, to_agent: str, context: Any = None) -> None: - """Emit agent.handoff event for agent turn transitions.""" + """Emit a typed :class:`AgentHandoffEvent` for agent turn transitions.""" if not self._connected: return try: context_str = str(context) if context else "" - self.emit_dict_event( - "agent.handoff", - { - "from_agent": from_agent, - "to_agent": to_agent, - "reason": "group_chat_turn", - "context_hash": hashlib.sha256(context_str.encode()).hexdigest() if context_str else None, - }, + self.emit_event( + AgentHandoffEvent.create( + from_agent=from_agent, + to_agent=to_agent, + handoff_context_hash=_sha256_of(context_str), + ) ) except Exception: logger.warning("Error in on_handoff", exc_info=True) @@ -437,12 +579,19 @@ def _detect_provider(self, model: str | None) -> str | None: return "azure_openai" # Default for MS Agent Framework def _emit_chat_config(self, chat_name: str, chat: Any) -> None: - """Emit environment.config event for chat configuration on first encounter.""" + """Emit a typed :class:`EnvironmentConfigEvent` per chat. + + Idempotent per chat — only the first call for a given chat + name actually emits. SK chat instances run in a + ``simulated`` environment by default; the real production + environment (``cloud`` / ``on_prem``) is the responsibility + of the host application's environment.config emission. + """ with self._adapter_lock: if chat_name in self._seen_agents: return self._seen_agents.add(chat_name) - metadata: dict[str, Any] = { + attributes: dict[str, Any] = { "framework": "ms_agent_framework", "chat_name": chat_name, "chat_type": type(chat).__name__, @@ -450,29 +599,34 @@ def _emit_chat_config(self, chat_name: str, chat: Any) -> None: # Extract agents from group chat agents = getattr(chat, "agents", None) if agents: - metadata["agents"] = [getattr(a, "name", str(a)) for a in agents] + attributes["agents"] = [getattr(a, "name", str(a)) for a in agents] # Extract agent info from single chat agent = getattr(chat, "agent", None) if agent: - metadata["agent_name"] = getattr(agent, "name", str(agent)) + attributes["agent_name"] = getattr(agent, "name", str(agent)) instructions = getattr(agent, "instructions", None) if instructions and self._capture_config.capture_content: - metadata["instructions"] = str(instructions)[:500] + attributes["instructions"] = str(instructions)[:500] kernel = getattr(agent, "kernel", None) if kernel: plugins = getattr(kernel, "plugins", None) if plugins: - metadata["plugins"] = ( + attributes["plugins"] = ( list(plugins.keys()) if isinstance(plugins, dict) else [str(p) for p in plugins] ) # Selection strategy for group chats selection_strategy = getattr(chat, "selection_strategy", None) if selection_strategy: - metadata["selection_strategy"] = type(selection_strategy).__name__ + attributes["selection_strategy"] = type(selection_strategy).__name__ termination_strategy = getattr(chat, "termination_strategy", None) if termination_strategy: - metadata["termination_strategy"] = type(termination_strategy).__name__ - self.emit_dict_event("environment.config", metadata) + attributes["termination_strategy"] = type(termination_strategy).__name__ + self.emit_event( + EnvironmentConfigEvent.create( + env_type=EnvironmentType.SIMULATED, + attributes=attributes, + ) + ) def _safe_serialize(self, value: Any) -> Any: """Safely serialize a value for event payloads.""" diff --git a/tests/instrument/adapters/frameworks/test_google_adk_adapter.py b/tests/instrument/adapters/frameworks/test_google_adk_adapter.py index 2c21a4d..c6cc785 100644 --- a/tests/instrument/adapters/frameworks/test_google_adk_adapter.py +++ b/tests/instrument/adapters/frameworks/test_google_adk_adapter.py @@ -2,6 +2,14 @@ Mocked at the SDK shape level — no real ``google.adk`` runtime needed. The adapter integrates via 6 native callbacks (before/after agent/model/tool). + +After the typed-event migration (PR #129 follow-up — bundle 3) every +emit site flows through :meth:`BaseAdapter.emit_event` with a canonical +Pydantic payload. The :class:`_RecordingStratix` stand-in below records +both shapes so pre- and post-migration assertions live side by side: the +``payload`` slot always carries a dict (model-dumped if typed), and +``typed_payloads`` holds the original Pydantic instances for tests +that want to assert against the model surface. """ from __future__ import annotations @@ -9,6 +17,10 @@ from types import SimpleNamespace from typing import Any, Dict, List +from layerlens._compat.pydantic import ( + BaseModel as _CompatBaseModel, + model_dump as _compat_model_dump, +) from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig from layerlens.instrument.adapters.frameworks.google_adk import ( ADAPTER_CLASS, @@ -26,10 +38,24 @@ class _RecordingStratix: def __init__(self) -> None: self.events: List[Dict[str, Any]] = [] + # Hold strong references to the original typed payloads for the + # subset of tests that want to assert against the model surface + # (e.g. ``isinstance(payload, ToolCallEvent)``). + self.typed_payloads: List[Any] = [] def emit(self, *args: Any, **kwargs: Any) -> None: + # Two-arg legacy path: ``emit(event_type, payload_dict)``. if len(args) == 2 and isinstance(args[0], str): self.events.append({"event_type": args[0], "payload": args[1]}) + return + # Single-arg typed path: ``emit(payload_model[, privacy_level])``. + if args and isinstance(args[0], _CompatBaseModel): + payload_model = args[0] + self.typed_payloads.append(payload_model) + event_type = getattr(payload_model, "event_type", "") + self.events.append( + {"event_type": event_type, "payload": _compat_model_dump(payload_model)} + ) class _FakeAgent: @@ -96,6 +122,9 @@ def test_instrument_agent_attaches_callbacks() -> None: def test_before_after_agent_emits_input_output() -> None: + """Typed AgentInputEvent + AgentOutputEvent for the agent lifecycle. + ADK-specific provenance lives on MessageContent.metadata. + """ stratix = _RecordingStratix() adapter = GoogleADKAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -112,15 +141,22 @@ def test_before_after_agent_emits_input_output() -> None: assert "agent.output" in types inp = next(e for e in stratix.events if e["event_type"] == "agent.input") - assert inp["payload"]["agent_name"] == "planner" - assert inp["payload"]["input"] == "hello world" + inp_payload = inp["payload"] + assert inp_payload["layer"] == "L1" + assert inp_payload["content"]["message"] == "hello world" + assert inp_payload["content"]["metadata"]["agent_name"] == "planner" + assert inp_payload["content"]["metadata"]["framework"] == "google_adk" out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert out["payload"]["output"] == "response" - assert out["payload"]["duration_ns"] >= 0 + out_payload = out["payload"] + assert out_payload["content"]["message"] == "response" + assert out_payload["content"]["metadata"]["duration_ns"] >= 0 def test_after_model_emits_invoke_and_cost() -> None: + """Typed ModelInvokeEvent + CostRecordEvent. + Model name lives at payload.model.name; tokens at payload.cost.*. + """ stratix = _RecordingStratix() adapter = GoogleADKAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -135,15 +171,23 @@ def test_after_model_emits_invoke_and_cost() -> None: adapter._after_model_callback(callback_context, llm_response) invoke = next(e for e in stratix.events if e["event_type"] == "model.invoke") - assert invoke["payload"]["model"] == "gemini-2" - assert invoke["payload"]["provider"] == "google" - assert invoke["payload"]["tokens_prompt"] == 10 + inv_payload = invoke["payload"] + assert inv_payload["layer"] == "L3" + assert inv_payload["model"]["name"] == "gemini-2" + assert inv_payload["model"]["provider"] == "google" + assert inv_payload["model"]["version"] == "unavailable" + assert inv_payload["prompt_tokens"] == 10 + assert inv_payload["completion_tokens"] == 20 cost = next(e for e in stratix.events if e["event_type"] == "cost.record") - assert cost["payload"]["tokens_total"] == 30 + cost_payload = cost["payload"] + assert cost_payload["cost"]["prompt_tokens"] == 10 + assert cost_payload["cost"]["completion_tokens"] == 20 + assert cost_payload["cost"]["tokens"] == 30 def test_after_tool_emits_tool_call() -> None: + """Typed ToolCallEvent: tool name lives at payload.tool.name.""" stratix = _RecordingStratix() adapter = GoogleADKAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -153,21 +197,55 @@ def test_after_tool_emits_tool_call() -> None: adapter._after_tool_callback(SimpleNamespace(), "calc", inp, 42) evt = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert evt["payload"]["tool_name"] == "calc" - assert evt["payload"]["tool_output"] == 42 - assert evt["payload"]["latency_ms"] is not None + payload = evt["payload"] + assert payload["layer"] == "L5a" + assert payload["tool"]["name"] == "calc" + assert payload["tool"]["integration"] == "library" + assert payload["tool"]["version"] == "unavailable" + assert payload["latency_ms"] is not None + # Scalar tool_output is wrapped in {"value": ...} so the canonical + # ``output: dict`` slot is satisfied. + assert payload["output"] == {"value": 42} + assert payload["input"]["x"] == 1 + assert payload["input"]["framework"] == "google_adk" def test_on_handoff_emits_event_with_context_hash() -> None: + """Typed AgentHandoffEvent: handoff_context_hash is sha256:. + + The previous adapter's ``context_hash`` (bare hex) and + ``context_preview`` (top-level) fields are gone — the canonical + schema only declares ``handoff_context_hash`` (with strict + ``sha256:`` prefix validation). + """ stratix = _RecordingStratix() adapter = GoogleADKAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter.on_handoff(from_agent="a", to_agent="b", context="some context") evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") - assert evt["payload"]["from_agent"] == "a" - assert evt["payload"]["to_agent"] == "b" - assert evt["payload"]["context_hash"] is not None + payload = evt["payload"] + assert payload["from_agent"] == "a" + assert payload["to_agent"] == "b" + assert payload["handoff_context_hash"].startswith("sha256:") + # 7-char prefix + 64 hex chars = 71 chars total per the canonical + # validator in events_cross_cutting.py. + assert len(payload["handoff_context_hash"]) == 7 + 64 + + +def test_handoff_emits_canonical_hash_for_empty_context() -> None: + """Empty context still produces a well-formed sha256 hash. + + The previous adapter emitted ``context_hash=None`` when the + context was missing; the canonical schema rejects ``None``. + """ + stratix = _RecordingStratix() + adapter = GoogleADKAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + adapter.on_handoff(from_agent="a", to_agent="b", context=None) + + evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") + assert evt["payload"]["handoff_context_hash"].startswith("sha256:") def test_capture_config_gates_l3_model_metadata() -> None: @@ -191,6 +269,9 @@ def test_capture_config_gates_l3_model_metadata() -> None: def test_environment_config_emits_once_per_agent() -> None: + """Typed EnvironmentConfigEvent: provenance lives on + payload.environment.attributes. + """ stratix = _RecordingStratix() adapter = GoogleADKAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -203,7 +284,11 @@ def test_environment_config_emits_once_per_agent() -> None: configs = [e for e in stratix.events if e["event_type"] == "environment.config"] assert len(configs) == 1 - assert configs[0]["payload"]["agent_name"] == "a1" + # Canonical L4a schema: payload.environment.attributes is the dict + # that carries adapter-specific provenance (agent_name). + attributes = configs[0]["payload"]["environment"]["attributes"] + assert attributes["agent_name"] == "a1" + assert configs[0]["payload"]["environment"]["type"] == "simulated" def test_instrument_agent_helper() -> None: @@ -224,3 +309,102 @@ def test_serialize_for_replay() -> None: assert rt.framework == "google_adk" assert rt.adapter_name == "GoogleADKAdapter" assert "capture_config" in rt.config + + +# --------------------------------------------------------------------------- +# Typed-event migration regression tests (PR #129 follow-up — bundle 3) +# --------------------------------------------------------------------------- + + +def test_google_adk_lifecycle_emits_typed_payloads_only() -> None: + """Every emit site in google_adk lifecycle.py is a typed emit_event call. + + Pins the post-migration contract: the recording stratix's + ``typed_payloads`` list grows for every emission and the legacy + two-arg dict path receives nothing. This is the public contract + backing the ``grep emit_dict_event src/.../google_adk/ → 0`` + acceptance criterion in the typed-events bundle 3 PR. + """ + from layerlens.instrument._compat.events import ( + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, + ) + + stratix = _RecordingStratix() + adapter = GoogleADKAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + agent = _FakeAgent(name="planner", model="gemini-2", tools=[SimpleNamespace(name="search")]) + cb = SimpleNamespace(agent=agent, user_content="hello", agent_output="reply", session=None) + adapter._before_agent_callback(cb) + adapter._after_agent_callback(cb) + + model_cb = SimpleNamespace(model="gemini-2", agent=None) + adapter._before_model_callback(model_cb, SimpleNamespace()) + adapter._after_model_callback( + model_cb, + SimpleNamespace(usage_metadata=SimpleNamespace(prompt_token_count=10, candidates_token_count=5)), + ) + + inp = {"x": 1} + adapter._before_tool_callback(SimpleNamespace(), "calc", inp) + adapter._after_tool_callback(SimpleNamespace(), "calc", inp, 42) + + adapter.on_agent_start(agent_name="other", input_data="task") + adapter.on_agent_end(agent_name="other", output="done") + adapter.on_tool_use("ext_tool", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="google", model="gemini-2", tokens_prompt=5) + adapter.on_handoff(from_agent="planner", to_agent="executor", context="ctx") + + # Every captured payload is a Pydantic model instance — the legacy + # dict path was not used. + assert stratix.typed_payloads, "expected typed payloads to be captured" + types_seen = {type(p) for p in stratix.typed_payloads} + assert AgentInputEvent in types_seen + assert AgentOutputEvent in types_seen + assert AgentHandoffEvent in types_seen + assert CostRecordEvent in types_seen + assert EnvironmentConfigEvent in types_seen + assert ModelInvokeEvent in types_seen + assert ToolCallEvent in types_seen + + +def test_google_adk_emit_does_not_warn_after_migration() -> None: + """No DeprecationWarning fires from google_adk lifecycle emission paths. + + The base adapter's ``emit_dict_event`` raises a DeprecationWarning + on every call. After migration, google_adk lifecycle must never + trigger that warning. ``filterwarnings("error", ...)`` converts + the warning into a test failure. + """ + import warnings + + stratix = _RecordingStratix() + adapter = GoogleADKAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + agent = _FakeAgent(name="planner", model="gemini-2", tools=[SimpleNamespace(name="search")]) + cb = SimpleNamespace(agent=agent, user_content="hello", agent_output="reply", session=None) + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + adapter._before_agent_callback(cb) + adapter._after_agent_callback(cb) + adapter._before_model_callback(SimpleNamespace(model="gemini-2", agent=None), SimpleNamespace()) + adapter._after_model_callback( + SimpleNamespace(model="gemini-2", agent=None), + SimpleNamespace(usage_metadata=SimpleNamespace(prompt_token_count=10, candidates_token_count=5)), + ) + inp = {"x": 1} + adapter._before_tool_callback(SimpleNamespace(), "calc", inp) + adapter._after_tool_callback(SimpleNamespace(), "calc", inp, 42) + adapter.on_agent_start(agent_name="o", input_data="i") + adapter.on_agent_end(agent_name="o", output="o") + adapter.on_tool_use("t", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="google", model="gemini-2", tokens_prompt=5) + adapter.on_handoff(from_agent="a", to_agent="b", context="ctx") diff --git a/tests/instrument/adapters/frameworks/test_llama_index_adapter.py b/tests/instrument/adapters/frameworks/test_llama_index_adapter.py index 8365f35..1e08b2a 100644 --- a/tests/instrument/adapters/frameworks/test_llama_index_adapter.py +++ b/tests/instrument/adapters/frameworks/test_llama_index_adapter.py @@ -3,6 +3,14 @@ Mocked at the SDK shape level — no real ``llama_index`` runtime needed. Internal dispatch is by ``type(event).__name__``, so each test event uses a minimally-shaped class with the right name. + +After the typed-event migration (PR #129 follow-up — bundle 3) every +emit site flows through :meth:`BaseAdapter.emit_event` with a canonical +Pydantic payload. The :class:`_RecordingStratix` stand-in below records +both shapes so pre- and post-migration assertions live side by side: the +``payload`` slot always carries a dict (model-dumped if typed), and +``typed_payloads`` holds the original Pydantic instances for tests that +want to assert against the model surface. """ from __future__ import annotations @@ -10,6 +18,10 @@ from types import SimpleNamespace from typing import Any, Dict, List +from layerlens._compat.pydantic import ( + BaseModel as _CompatBaseModel, + model_dump as _compat_model_dump, +) from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig from layerlens.instrument.adapters.frameworks.llama_index import ( ADAPTER_CLASS, @@ -27,10 +39,22 @@ class _RecordingStratix: def __init__(self) -> None: self.events: List[Dict[str, Any]] = [] + # Hold strong references to the original typed payloads. + self.typed_payloads: List[Any] = [] def emit(self, *args: Any, **kwargs: Any) -> None: + # Two-arg legacy path: ``emit(event_type, payload_dict)``. if len(args) == 2 and isinstance(args[0], str): self.events.append({"event_type": args[0], "payload": args[1]}) + return + # Single-arg typed path: ``emit(payload_model[, privacy_level])``. + if args and isinstance(args[0], _CompatBaseModel): + payload_model = args[0] + self.typed_payloads.append(payload_model) + event_type = getattr(payload_model, "event_type", "") + self.events.append( + {"event_type": event_type, "payload": _compat_model_dump(payload_model)} + ) # Minimal classes shaped like LlamaIndex events. The adapter dispatches by @@ -89,6 +113,9 @@ def test_adapter_info_and_health() -> None: def test_handle_llm_end_emits_model_invoke_and_cost() -> None: + """Typed ModelInvokeEvent + CostRecordEvent. + Model name lives at payload.model.name; tokens at payload.cost.*. + """ stratix = _RecordingStratix() adapter = LlamaIndexAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -102,25 +129,42 @@ def test_handle_llm_end_emits_model_invoke_and_cost() -> None: assert "cost.record" in types invoke = next(e for e in stratix.events if e["event_type"] == "model.invoke") - assert invoke["payload"]["model"] == "gpt-5" - assert invoke["payload"]["tokens_prompt"] == 10 + inv_payload = invoke["payload"] + assert inv_payload["layer"] == "L3" + assert inv_payload["model"]["name"] == "gpt-5" + assert inv_payload["model"]["provider"] == "openai" + assert inv_payload["model"]["version"] == "unavailable" + assert inv_payload["prompt_tokens"] == 10 + assert inv_payload["completion_tokens"] == 5 cost = next(e for e in stratix.events if e["event_type"] == "cost.record") - assert cost["payload"]["tokens_total"] == 15 + cost_payload = cost["payload"] + assert cost_payload["cost"]["prompt_tokens"] == 10 + assert cost_payload["cost"]["completion_tokens"] == 5 + assert cost_payload["cost"]["tokens"] == 15 def test_handle_tool_call_event_emits_tool_call() -> None: + """Typed ToolCallEvent: tool name lives at payload.tool.name. + Scalar output is wrapped in {"value": ...}. + """ stratix = _RecordingStratix() adapter = LlamaIndexAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter._handle_event(ToolCallEvent(tool_name="calc", tool_input={"x": 1}, tool_output=2)) evt = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert evt["payload"]["tool_name"] == "calc" - assert evt["payload"]["tool_output"] == 2 + payload = evt["payload"] + assert payload["layer"] == "L5a" + assert payload["tool"]["name"] == "calc" + assert payload["tool"]["integration"] == "library" + assert payload["output"] == {"value": 2} def test_handle_retrieval_end_emits_retrieval_tool_call() -> None: + """Retrieval is mapped onto ToolCallEvent with tool.name='retrieval'. + Adapter-specific tool_type/result_count live on payload.input. + """ stratix = _RecordingStratix() adapter = LlamaIndexAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -128,11 +172,17 @@ def test_handle_retrieval_end_emits_retrieval_tool_call() -> None: adapter._handle_event(RetrievalEndEvent(nodes=nodes)) evt = next(e for e in stratix.events if e["event_type"] == "tool.call") - assert evt["payload"]["tool_type"] == "retrieval" - assert evt["payload"]["result_count"] == 2 + payload = evt["payload"] + assert payload["tool"]["name"] == "retrieval" + assert payload["input"]["tool_type"] == "retrieval" + assert payload["input"]["result_count"] == 2 def test_agent_step_start_end_emits_input_output_and_config() -> None: + """Typed AgentInputEvent + AgentOutputEvent + EnvironmentConfigEvent. + LlamaIndex-specific provenance lives on MessageContent.metadata + and EnvironmentInfo.attributes. + """ stratix = _RecordingStratix() adapter = LlamaIndexAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -146,20 +196,38 @@ def test_agent_step_start_end_emits_input_output_and_config() -> None: assert "agent.output" in types out = next(e for e in stratix.events if e["event_type"] == "agent.output") - assert out["payload"]["agent_name"] == "myagent" - assert out["payload"]["duration_ns"] >= 0 + out_payload = out["payload"] + assert out_payload["content"]["message"] == "result" + metadata = out_payload["content"]["metadata"] + assert metadata["agent_name"] == "myagent" + assert metadata["framework"] == "llama_index" + assert metadata["duration_ns"] >= 0 def test_on_handoff_emits_event_with_context_hash() -> None: + """Typed AgentHandoffEvent: handoff_context_hash is sha256:.""" stratix = _RecordingStratix() adapter = LlamaIndexAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter.on_handoff(from_agent="a", to_agent="b", context="some context") evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") - assert evt["payload"]["from_agent"] == "a" - assert evt["payload"]["to_agent"] == "b" - assert evt["payload"]["context_hash"] is not None + payload = evt["payload"] + assert payload["from_agent"] == "a" + assert payload["to_agent"] == "b" + assert payload["handoff_context_hash"].startswith("sha256:") + assert len(payload["handoff_context_hash"]) == 7 + 64 + + +def test_handoff_emits_canonical_hash_for_empty_context() -> None: + """Empty context still produces a well-formed sha256 hash.""" + stratix = _RecordingStratix() + adapter = LlamaIndexAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + adapter.on_handoff(from_agent="a", to_agent="b", context=None) + + evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") + assert evt["payload"]["handoff_context_hash"].startswith("sha256:") def test_capture_config_gates_l5a_tool_calls() -> None: @@ -203,3 +271,98 @@ def test_serialize_for_replay() -> None: assert rt.framework == "llama_index" assert rt.adapter_name == "LlamaIndexAdapter" assert "capture_config" in rt.config + + +# --------------------------------------------------------------------------- +# Typed-event migration regression tests (PR #129 follow-up — bundle 3) +# --------------------------------------------------------------------------- + + +def test_llama_index_lifecycle_emits_typed_payloads_only() -> None: + """Every emit site in llama_index lifecycle.py is a typed emit_event call. + + Pins the post-migration contract: the recording stratix's + ``typed_payloads`` list grows for every emission and the legacy + two-arg dict path receives nothing. + """ + from layerlens.instrument._compat.events import ( + AgentInputEvent, + CostRecordEvent, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, + ) + from layerlens.instrument._compat.events import ( + ToolCallEvent as CanonicalToolCallEvent, + ) + + stratix = _RecordingStratix() + adapter = LlamaIndexAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + # LLM end + raw = SimpleNamespace(usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5)) + response = SimpleNamespace(raw=raw) + adapter._handle_event(LLMChatEndEvent(model="gpt-5", response=response)) + + # Tool call + adapter._handle_event(ToolCallEvent(tool_name="calc", tool_input={"x": 1}, tool_output=2)) + + # Retrieval + adapter._handle_event(RetrievalEndEvent(nodes=[SimpleNamespace(score=0.9)])) + + # Agent step + adapter._handle_event(AgentRunStepStartEvent(agent_id="myagent", step=1)) + adapter._handle_event(AgentRunStepEndEvent(agent_id="myagent", response="ok")) + + # Direct lifecycle hooks + adapter.on_agent_start(agent_name="other", input_data="task") + adapter.on_agent_end(agent_name="other", output="done") + adapter.on_tool_use("ext_tool", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="openai", model="gpt-5", tokens_prompt=5) + adapter.on_handoff(from_agent="planner", to_agent="executor", context="ctx") + + # Every captured payload is a Pydantic model instance — the legacy + # dict path was not used. + assert stratix.typed_payloads, "expected typed payloads to be captured" + types_seen = {type(p) for p in stratix.typed_payloads} + assert AgentInputEvent in types_seen + assert AgentOutputEvent in types_seen + assert AgentHandoffEvent in types_seen + assert CostRecordEvent in types_seen + assert EnvironmentConfigEvent in types_seen + assert ModelInvokeEvent in types_seen + assert CanonicalToolCallEvent in types_seen + + +def test_llama_index_emit_does_not_warn_after_migration() -> None: + """No DeprecationWarning fires from llama_index lifecycle paths. + + The base adapter's ``emit_dict_event`` raises a DeprecationWarning + on every call. After migration, llama_index lifecycle must never + trigger that warning. + """ + import warnings + + stratix = _RecordingStratix() + adapter = LlamaIndexAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + raw = SimpleNamespace(usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5)) + response = SimpleNamespace(raw=raw) + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + adapter._handle_event(LLMChatEndEvent(model="gpt-5", response=response)) + adapter._handle_event( + ToolCallEvent(tool_name="calc", tool_input={"x": 1}, tool_output=2) + ) + adapter._handle_event(RetrievalEndEvent(nodes=[SimpleNamespace(score=0.9)])) + adapter._handle_event(AgentRunStepStartEvent(agent_id="myagent", step=1)) + adapter._handle_event(AgentRunStepEndEvent(agent_id="myagent", response="ok")) + adapter.on_agent_start(agent_name="o", input_data="i") + adapter.on_agent_end(agent_name="o", output="o") + adapter.on_tool_use("t", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="openai", model="gpt-5", tokens_prompt=5) + adapter.on_handoff(from_agent="a", to_agent="b", context="ctx") diff --git a/tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py b/tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py index 818049f..2731185 100644 --- a/tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py +++ b/tests/instrument/adapters/frameworks/test_ms_agent_framework_adapter.py @@ -3,6 +3,13 @@ Mocked at the SDK shape level — no real ``semantic_kernel.agents`` runtime needed. The adapter wraps ``invoke()`` async generators on chat instances; tests exercise ``_process_message`` and the lifecycle hooks directly. + +After the typed-event migration (PR #129 follow-up — bundle 3) every +emit site flows through :meth:`BaseAdapter.emit_event` with a canonical +Pydantic payload. The :class:`_RecordingStratix` stand-in below records +both shapes: the ``payload`` slot always carries a dict (model-dumped +if typed), and ``typed_payloads`` holds the original Pydantic instances +for tests that want to assert against the model surface. """ from __future__ import annotations @@ -10,6 +17,10 @@ from types import SimpleNamespace from typing import Any, Dict, List +from layerlens._compat.pydantic import ( + BaseModel as _CompatBaseModel, + model_dump as _compat_model_dump, +) from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig from layerlens.instrument.adapters.frameworks.ms_agent_framework import ( ADAPTER_CLASS, @@ -27,10 +38,22 @@ class _RecordingStratix: def __init__(self) -> None: self.events: List[Dict[str, Any]] = [] + # Hold strong references to the original typed payloads. + self.typed_payloads: List[Any] = [] def emit(self, *args: Any, **kwargs: Any) -> None: + # Two-arg legacy path: ``emit(event_type, payload_dict)``. if len(args) == 2 and isinstance(args[0], str): self.events.append({"event_type": args[0], "payload": args[1]}) + return + # Single-arg typed path: ``emit(payload_model[, privacy_level])``. + if args and isinstance(args[0], _CompatBaseModel): + payload_model = args[0] + self.typed_payloads.append(payload_model) + event_type = getattr(payload_model, "event_type", "") + self.events.append( + {"event_type": event_type, "payload": _compat_model_dump(payload_model)} + ) # Item types — name-driven dispatch in adapter @@ -85,6 +108,9 @@ def test_adapter_info_and_health() -> None: def test_instrument_chat_wraps_invoke_and_emits_config() -> None: + """Typed EnvironmentConfigEvent: chat_name lives at + payload.environment.attributes. + """ stratix = _RecordingStratix() adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -96,7 +122,9 @@ def test_instrument_chat_wraps_invoke_and_emits_config() -> None: assert chat.invoke_stream.__name__ == "traced_invoke_stream" cfg = next(e for e in stratix.events if e["event_type"] == "environment.config") - assert cfg["payload"]["chat_name"] == "planner-chat" + attributes = cfg["payload"]["environment"]["attributes"] + assert attributes["chat_name"] == "planner-chat" + assert cfg["payload"]["environment"]["type"] == "simulated" adapter.disconnect() # Restored. @@ -104,6 +132,7 @@ def test_instrument_chat_wraps_invoke_and_emits_config() -> None: def test_process_message_emits_handoff_on_agent_change() -> None: + """Typed AgentHandoffEvent: handoff_context_hash is sha256:.""" stratix = _RecordingStratix() adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -112,11 +141,16 @@ def test_process_message_emits_handoff_on_agent_change() -> None: adapter._process_message(_FakeChat(), msg, current_agent="alice") evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") - assert evt["payload"]["from_agent"] == "alice" - assert evt["payload"]["to_agent"] == "bob" + payload = evt["payload"] + assert payload["from_agent"] == "alice" + assert payload["to_agent"] == "bob" + assert payload["handoff_context_hash"].startswith("sha256:") def test_process_message_emits_tool_calls_from_function_items() -> None: + """Typed ToolCallEvent: tool name lives at payload.tool.name. + Scalar result is wrapped in {"value": ...}. + """ stratix = _RecordingStratix() adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -132,11 +166,16 @@ def test_process_message_emits_tool_calls_from_function_items() -> None: tool_calls = [e for e in stratix.events if e["event_type"] == "tool.call"] assert len(tool_calls) == 2 - assert tool_calls[0]["payload"]["tool_name"] == "calc" - assert tool_calls[1]["payload"]["tool_output"] == 42 + assert tool_calls[0]["payload"]["tool"]["name"] == "calc" + assert tool_calls[0]["payload"]["tool"]["integration"] == "library" + # Scalar result is wrapped in {"value": ...}. + assert tool_calls[1]["payload"]["output"] == {"value": 42} def test_process_message_emits_model_and_cost_from_metadata() -> None: + """Typed ModelInvokeEvent + CostRecordEvent. + Model name lives at payload.model.name; tokens at payload.cost.*. + """ stratix = _RecordingStratix() adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -148,12 +187,27 @@ def test_process_message_emits_model_and_cost_from_metadata() -> None: adapter._process_message(_FakeChat(), msg, current_agent="alice") invoke = next(e for e in stratix.events if e["event_type"] == "model.invoke") - assert invoke["payload"]["model"] == "gpt-5" - cost = next(e for e in stratix.events if e["event_type"] == "cost.record") - assert cost["payload"]["tokens_prompt"] == 10 - + inv_payload = invoke["payload"] + assert inv_payload["model"]["name"] == "gpt-5" + assert inv_payload["model"]["provider"] == "openai" -def test_on_run_start_end_emits_input_output_and_state() -> None: + cost = next(e for e in stratix.events if e["event_type"] == "cost.record") + assert cost["payload"]["cost"]["prompt_tokens"] == 10 + assert cost["payload"]["cost"]["completion_tokens"] == 5 + + +def test_on_run_start_end_emits_input_output() -> None: + """Typed migration: on_run_end → AgentOutputEvent with + run_status=run_complete on metadata. + + The previous adapter also emitted an ad-hoc + ``agent.state.change`` payload with ``event_subtype`` — + that did not satisfy the canonical + :class:`AgentStateChangeEvent` ``before_hash`` / ``after_hash`` + contract. The completion marker is now carried as + ``run_status`` on :class:`MessageContent.metadata` of the + :class:`AgentOutputEvent`. + """ stratix = _RecordingStratix() adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() @@ -164,7 +218,28 @@ def test_on_run_start_end_emits_input_output_and_state() -> None: types = [e["event_type"] for e in stratix.events] assert "agent.input" in types assert "agent.output" in types - assert "agent.state.change" in types + # agent.state.change is NO LONGER emitted — the completion marker + # is preserved on the agent.output metadata's run_status field. + assert "agent.state.change" not in types + + out = next(e for e in stratix.events if e["event_type"] == "agent.output") + metadata = out["payload"]["content"]["metadata"] + assert metadata["run_status"] == "run_complete" + + +def test_on_run_end_failure_carries_run_failed() -> None: + """When an error is supplied, run_status carries ``run_failed``.""" + stratix = _RecordingStratix() + adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + adapter.on_run_start(agent_name="planner", input_data="hi") + adapter.on_run_end(agent_name="planner", output=None, error=RuntimeError("boom")) + + out = next(e for e in stratix.events if e["event_type"] == "agent.output") + metadata = out["payload"]["content"]["metadata"] + assert metadata["run_status"] == "run_failed" + assert "boom" in metadata["error"] def test_capture_config_gates_l5a_tool_calls() -> None: @@ -187,14 +262,29 @@ def test_capture_config_gates_l5a_tool_calls() -> None: def test_on_handoff_emits_event_with_context_hash() -> None: + """Typed AgentHandoffEvent: handoff_context_hash is sha256:.""" stratix = _RecordingStratix() adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) adapter.connect() adapter.on_handoff(from_agent="a", to_agent="b", context="some context") evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") - assert evt["payload"]["from_agent"] == "a" - assert evt["payload"]["context_hash"] is not None + payload = evt["payload"] + assert payload["from_agent"] == "a" + assert payload["to_agent"] == "b" + assert payload["handoff_context_hash"].startswith("sha256:") + assert len(payload["handoff_context_hash"]) == 7 + 64 + + +def test_handoff_emits_canonical_hash_for_empty_context() -> None: + """Empty context still produces a well-formed sha256 hash.""" + stratix = _RecordingStratix() + adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + adapter.on_handoff(from_agent="a", to_agent="b", context=None) + + evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") + assert evt["payload"]["handoff_context_hash"].startswith("sha256:") def test_instrument_agent_helper() -> None: @@ -214,3 +304,98 @@ def test_serialize_for_replay() -> None: assert rt.framework == "ms_agent_framework" assert rt.adapter_name == "MSAgentAdapter" assert "capture_config" in rt.config + + +# --------------------------------------------------------------------------- +# Typed-event migration regression tests (PR #129 follow-up — bundle 3) +# --------------------------------------------------------------------------- + + +def test_ms_agent_framework_lifecycle_emits_typed_payloads_only() -> None: + """Every emit site in ms_agent_framework lifecycle.py is a typed + emit_event call. + + Pins the post-migration contract: the recording stratix's + ``typed_payloads`` list grows for every emission and the legacy + two-arg dict path receives nothing. + """ + from layerlens.instrument._compat.events import ( + ToolCallEvent, + AgentInputEvent, + CostRecordEvent, + AgentOutputEvent, + ModelInvokeEvent, + AgentHandoffEvent, + EnvironmentConfigEvent, + ) + + stratix = _RecordingStratix() + adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + chat = _FakeChat(name="planner-chat") + adapter.instrument_chat(chat) + + # Process a message with all the typed emission paths. + msg = SimpleNamespace( + agent_name="bob", + items=[ + FunctionCallContent(name="calc", arguments={"x": 1}), + FunctionResultContent(name="calc", result=42), + ], + metadata={"model": "gpt-5", "usage": {"prompt_tokens": 10, "completion_tokens": 5}}, + ) + adapter._process_message(chat, msg, current_agent="alice") + + # Direct lifecycle hooks + adapter.on_run_start(agent_name="planner", input_data="hi") + adapter.on_run_end(agent_name="planner", output="bye") + adapter.on_tool_use("ext_tool", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="openai", model="gpt-5", tokens_prompt=5) + adapter.on_handoff(from_agent="planner", to_agent="executor", context="ctx") + + # Every captured payload is a Pydantic model instance — the legacy + # dict path was not used. + assert stratix.typed_payloads, "expected typed payloads to be captured" + types_seen = {type(p) for p in stratix.typed_payloads} + assert AgentInputEvent in types_seen + assert AgentOutputEvent in types_seen + assert AgentHandoffEvent in types_seen + assert CostRecordEvent in types_seen + assert EnvironmentConfigEvent in types_seen + assert ModelInvokeEvent in types_seen + assert ToolCallEvent in types_seen + + +def test_ms_agent_framework_emit_does_not_warn_after_migration() -> None: + """No DeprecationWarning fires from ms_agent_framework lifecycle paths. + + The base adapter's ``emit_dict_event`` raises a DeprecationWarning + on every call. After migration, ms_agent_framework lifecycle must + never trigger that warning. + """ + import warnings + + stratix = _RecordingStratix() + adapter = MSAgentAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + chat = _FakeChat(name="planner-chat") + + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + adapter.instrument_chat(chat) + msg = SimpleNamespace( + agent_name="bob", + items=[ + FunctionCallContent(name="calc", arguments={"x": 1}), + FunctionResultContent(name="calc", result=42), + ], + metadata={"model": "gpt-5", "usage": {"prompt_tokens": 10, "completion_tokens": 5}}, + ) + adapter._process_message(chat, msg, current_agent="alice") + adapter.on_run_start(agent_name="p", input_data="i") + adapter.on_run_end(agent_name="p", output="o") + adapter.on_tool_use("t", tool_input={"a": 1}, tool_output="ok") + adapter.on_llm_call(provider="openai", model="gpt-5", tokens_prompt=5) + adapter.on_handoff(from_agent="a", to_agent="b", context="ctx")