From 09cdf71602a74ff6d6a2c5c456804bd7828793e6 Mon Sep 17 00:00:00 2001 From: mmercuri Date: Sat, 25 Apr 2026 22:33:46 -0700 Subject: [PATCH 1/2] feat(instrument): port AutoGen framework adapter (M2 fan-out) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port the AutoGen framework adapter from the ateam reference implementation onto the new layerlens.instrument base layer. This is the M2 fan-out per-adapter PR for AutoGen — replaces the AutoGen slice of the larger orchestration bundle (PR #96) with a self-contained, mergeable unit. Source: A:/github/layerlens/ateam/stratix/sdk/python/adapters/autogen/ (~1,213 LOC, 6 files) Template: src/layerlens/instrument/adapters/frameworks/langchain/ Scope ----- - src/layerlens/instrument/adapters/frameworks/autogen/ - __init__.py — public surface + ADAPTER_CLASS + instrument_agents - lifecycle.py — AutoGenAdapter (BaseAdapter subclass) - wrappers.py — traced send/receive/generate_reply/execute_code - groupchat.py — GroupChatTracer (multi-agent turn management) - human_proxy.py — HumanProxyTracer (HITL classification) - metadata.py — environment.config payload extractor - tests/instrument/adapters/frameworks/test_autogen.py — 13 SDK-shape mocked tests (no pyautogen install required) - samples/instrument/autogen/main.py — runnable end-to-end demo with mocked LLM + duck-typed agent - docs/adapters/frameworks/autogen.md — install, quick-start, events emitted, capture-config guide - pyproject.toml - autogen = ["pyautogen>=0.2,<0.5; python_version >= '3.10'"] - ARG002 + pyright relaxation for monkey-patched framework code Adaptations from ateam source ----------------------------- - stratix.* imports → layerlens.* (BaseAdapter, CaptureConfig, PydanticCompat, MemoryEntry). - STRATIX docstrings/branding → LayerLens. - Monkey-patched private attributes (_stratix_tracer, _stratix_human_tracer, _stratix_original) gain _layerlens_* primary names with the _stratix_* aliases retained as deprecation shims for ateam-era consumers. - requires_pydantic explicit (V1_OR_V2) — adapter does not import pydantic directly. Acceptance ---------- - pytest tests/instrument/adapters/frameworks/test_autogen.py -x → 13 passed - mypy --strict src/layerlens/instrument/adapters/frameworks/autogen → Success: no issues found in 6 source files - ruff check src/layerlens/instrument/adapters/frameworks/autogen tests/instrument/adapters/frameworks/test_autogen.py → All checks passed - Lazy-import + default-install guards still pass for the frameworks surface (test_layerlens_import_does_not_pull_frameworks + test_instrument_import_does_not_pull_frameworks + test_default_install). - Sample runs end-to-end producing 15 events across 8 event types. Stacks on --------- feat/instrument-base-foundation (PR #93) — provides BaseAdapter, CaptureConfig, EventSink, AdapterRegistry, PydanticCompat, vendored event schemas. Cannot rebase off origin/main alone because the adapter literally subclasses BaseAdapter; merging this requires #93 first. Related ------- - Replaces the AutoGen slice of PR #96 (orchestration bundle) for the per-adapter fan-out review. - Sibling fan-out PRs ship LangChain, LangGraph, CrewAI, Agentforce, Langfuse separately. --- docs/adapters/frameworks/autogen.md | 117 ++++ pyproject.toml | 14 +- samples/instrument/autogen/__init__.py | 0 samples/instrument/autogen/main.py | 145 +++++ .../adapters/frameworks/__init__.py | 18 + .../adapters/frameworks/autogen/__init__.py | 56 ++ .../adapters/frameworks/autogen/groupchat.py | 176 +++++ .../frameworks/autogen/human_proxy.py | 144 +++++ .../adapters/frameworks/autogen/lifecycle.py | 603 ++++++++++++++++++ .../adapters/frameworks/autogen/metadata.py | 94 +++ .../adapters/frameworks/autogen/wrappers.py | 155 +++++ tests/instrument/adapters/__init__.py | 0 .../adapters/frameworks/__init__.py | 0 .../adapters/frameworks/test_autogen.py | 229 +++++++ 14 files changed, 1750 insertions(+), 1 deletion(-) create mode 100644 docs/adapters/frameworks/autogen.md create mode 100644 samples/instrument/autogen/__init__.py create mode 100644 samples/instrument/autogen/main.py create mode 100644 src/layerlens/instrument/adapters/frameworks/__init__.py create mode 100644 src/layerlens/instrument/adapters/frameworks/autogen/__init__.py create mode 100644 src/layerlens/instrument/adapters/frameworks/autogen/groupchat.py create mode 100644 src/layerlens/instrument/adapters/frameworks/autogen/human_proxy.py create mode 100644 src/layerlens/instrument/adapters/frameworks/autogen/lifecycle.py create mode 100644 src/layerlens/instrument/adapters/frameworks/autogen/metadata.py create mode 100644 src/layerlens/instrument/adapters/frameworks/autogen/wrappers.py create mode 100644 tests/instrument/adapters/__init__.py create mode 100644 tests/instrument/adapters/frameworks/__init__.py create mode 100644 tests/instrument/adapters/frameworks/test_autogen.py diff --git a/docs/adapters/frameworks/autogen.md b/docs/adapters/frameworks/autogen.md new file mode 100644 index 00000000..08d70762 --- /dev/null +++ b/docs/adapters/frameworks/autogen.md @@ -0,0 +1,117 @@ +# AutoGen framework adapter + +`layerlens.instrument.adapters.frameworks.autogen.AutoGenAdapter` instruments +[Microsoft AutoGen](https://github.com/microsoft/autogen) `ConversableAgent` +objects, capturing message exchange, LLM calls, code execution, and group-chat +turns. + +## Install + +```bash +pip install 'layerlens[autogen]' +``` + +Pulls `pyautogen>=0.2,<0.5`. + +## Quick start + +```python +from autogen import AssistantAgent, UserProxyAgent + +from layerlens import LayerLens +from layerlens.instrument.adapters.frameworks.autogen import ( + AutoGenAdapter, + instrument_agents, +) + +client = LayerLens() # picks up LAYERLENS_STRATIX_API_KEY / _BASE_URL + +adapter = AutoGenAdapter(stratix=client) +adapter.connect() + +assistant = AssistantAgent(name="assistant", llm_config={"model": "gpt-4o-mini"}) +user = UserProxyAgent(name="user", human_input_mode="NEVER", code_execution_config=False) + +adapter.connect_agents(assistant, user) +user.initiate_chat(assistant, message="What is 2+2?", max_turns=1) + +adapter.disconnect() +``` + +`instrument_agents(assistant, user, stratix=client)` is the one-line +equivalent of the `AutoGenAdapter(...).connect()` + `connect_agents(...)` +sequence above. + +For an offline / no-API-key demonstration, see +`samples/instrument/autogen/main.py` — it wires the adapter against an +in-process `EventSink` and a duck-typed agent so you can inspect the +emitted event stream without any external services. + +## What's wrapped + +`adapter.connect_agents(*agents)` monkey-patches the following on each +`ConversableAgent`: + +- `send` — emits `agent.handoff` for the outgoing message. +- `receive` — emits `agent.state.change` for the incoming message. +- `generate_reply` — emits `model.invoke` (with token usage when + available). +- `execute_code_blocks` — emits `tool.call` and `tool.environment` for + the code execution. + +The originals are stashed on the adapter and restored on `disconnect()`. +A `GroupChatTracer` wires similar hooks onto `GroupChatManager`, and a +`HumanProxyTracer` traces `get_human_input` for human-in-the-loop +proxies (emitting `agent.input` events with `role: "HUMAN"`). + +## Events emitted + +| Event | Layer | When | +|----------------------|----------------|--------------------------------------------------------------------------| +| `environment.config` | L4a | First time each agent is seen by `connect_agents`. | +| `agent.input` | L1 | `on_conversation_start` and human-input requests. | +| `agent.output` | L1 | `on_conversation_end` and group-chat termination. | +| `agent.handoff` | cross-cutting | Every `send` (carries `from_agent`, `to_agent`, `message_seq`). | +| `agent.state.change` | cross-cutting | Every `receive` (carries `agent`, `from_agent`, `message_preview`). | +| `agent.code` | L2 | Group-chat speaker selection (via `GroupChatTracer.on_speaker_selected`).| +| `tool.call` | L5a | Each `execute_code_blocks` call (`tool_name: "code_execution"`). | +| `tool.environment` | L5c | Each `execute_code_blocks` call (execution context details). | +| `model.invoke` | L3 | Each `generate_reply` (with token usage / latency / messages). | + +## AutoGen specifics + +- **Multi-agent attribution**: `agent_name`, `recipient_name`, and + `message_seq` (a monotonic counter) are included on every event so the + full chat can be reconstructed in order. +- **Group chats**: `GroupChatTracer` registers as a callback on + `GroupChatManager`, capturing the speaker-selection turns. Pass a + `GroupChatManager` to `connect_agents` alongside the participants. +- **Code execution**: when an agent runs code blocks, the language and + truncated code body emit `agent.code` (only if + `CaptureConfig.l2_agent_code` is enabled). + +## Capture config + +```python +from layerlens.instrument.adapters._base import CaptureConfig + +# Recommended. +adapter = AutoGenAdapter(capture_config=CaptureConfig.standard()) + +# Production-light: skip the verbose code-execution events. +adapter = AutoGenAdapter( + capture_config=CaptureConfig( + l1_agent_io=True, + l3_model_metadata=True, + l4a_environment_config=True, + l5a_tool_calls=True, + l2_agent_code=False, + ), +) +``` + +## BYOK + +AutoGen reads its `llm_config` to instantiate provider clients. The adapter +does not own those keys. For platform-managed BYOK see +`docs/adapters/byok.md` (atlas-app M1.B). diff --git a/pyproject.toml b/pyproject.toml index ae6d1dc7..8769b172 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,11 @@ classifiers = [ [project.optional-dependencies] cli = ["click>=8.0.0"] +# --- Instrument layer: framework adapters --- +# Adding any extra below MUST keep the default `pip install layerlens` +# install set unchanged. Verified by `tests/instrument/test_default_install.py`. +autogen = ["pyautogen>=0.2,<0.5; python_version >= '3.10'"] + [project.urls] Homepage = "https://github.com/LayerLens/stratix-python" Repository = "https://github.com/LayerLens/stratix-python" @@ -139,14 +144,21 @@ known-first-party = ["openai", "tests"] "tests/**.py" = ["T201", "T203", "ARG", "B007"] "examples/**.py" = ["T201", "T203"] "src/layerlens/cli/**" = ["T201", "T203"] +# Framework callbacks have signatures dictated by upstream — unused +# arguments are part of the contract, not a code smell. +"src/layerlens/instrument/adapters/frameworks/**.py" = ["ARG002"] [tool.pyright] include = ["src", "tests"] exclude = ["**/__pycache__"] reportMissingTypeStubs = false -# Less strict settings for tests and cli +# Less strict settings for tests, cli, and the dynamic-monkey-patching +# framework adapter code. mypy --strict stays strict for these dirs; +# pyright is relaxed here because it can't follow runtime attribute +# mutation that the framework instrumentation relies on. executionEnvironments = [ { root = "src/layerlens/cli", reportMissingImports = false, reportFunctionMemberAccess = false, reportCallIssue = false, reportArgumentType = false, reportAttributeAccessIssue = false }, + { root = "src/layerlens/instrument/adapters/frameworks", reportPossiblyUnbound = false, reportPossiblyUnboundVariable = false, reportCallIssue = false, reportAttributeAccessIssue = false, reportArgumentType = false, reportMissingImports = false, reportFunctionMemberAccess = false }, { root = "tests", reportGeneralTypeIssues = false, reportOptionalSubscript = false, reportOptionalMemberAccess = false, reportUntypedFunctionDecorator = false, reportUnknownArgumentType = false, reportUnknownMemberType = false, reportUnknownVariableType = false, reportUnnecessaryIsInstance = false, reportUnnecessaryComparison = false, reportArgumentType = false, reportCallIssue = false }, ] diff --git a/samples/instrument/autogen/__init__.py b/samples/instrument/autogen/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/samples/instrument/autogen/main.py b/samples/instrument/autogen/main.py new file mode 100644 index 00000000..0fa8ea4a --- /dev/null +++ b/samples/instrument/autogen/main.py @@ -0,0 +1,145 @@ +"""Sample: instrument an AutoGen-style two-agent exchange with LayerLens. + +This sample is fully self-contained — no network, no API keys, no +``pyautogen`` install required. It demonstrates the +``AutoGenAdapter`` lifecycle by: + +1. Defining two minimal duck-typed agents that mimic AutoGen's + ``ConversableAgent`` interface (``name``, ``send``, ``receive``, + ``generate_reply``). +2. Connecting them through ``AutoGenAdapter.connect_agents``. +3. Driving a single send / generate_reply / receive round-trip. +4. Routing the resulting events through an in-process + :class:`EventSink` and printing the captured stream. + +To wire this against the real AutoGen runtime, replace the +``_FakeAgent`` instances with ``autogen.AssistantAgent`` / +``autogen.UserProxyAgent`` and provide ``llm_config`` with real +``OPENAI_API_KEY`` / ``ANTHROPIC_API_KEY`` credentials. + +Run:: + + python -m samples.instrument.autogen.main +""" + +from __future__ import annotations + +from typing import Any, Dict, List + +from layerlens.instrument.adapters._base import CaptureConfig +from layerlens.instrument.adapters._base.sinks import EventSink +from layerlens.instrument.adapters.frameworks.autogen import ( + AutoGenAdapter, + instrument_agents, +) + + +class _PrintSink(EventSink): + """Sink that prints each event and keeps an in-memory log.""" + + def __init__(self) -> None: + self.events: List[Dict[str, Any]] = [] + + def send(self, event_type: str, payload: Dict[str, Any], timestamp_ns: int) -> None: + self.events.append({"event_type": event_type, "payload": payload, "ts_ns": timestamp_ns}) + print(f" [{event_type}] {sorted(payload.keys())}") + + def flush(self) -> None: + pass + + def close(self) -> None: + pass + + +class _RecordingStratix: + """LayerLens-shaped client stub that the adapter emits into.""" + + def __init__(self) -> None: + self.events: List[Dict[str, Any]] = [] + + def emit(self, *args: Any, **kwargs: Any) -> None: + if len(args) == 2 and isinstance(args[0], str): + self.events.append({"event_type": args[0], "payload": args[1]}) + + +class _FakeAgent: + """Minimal duck-typed AutoGen ``ConversableAgent`` for offline demos.""" + + def __init__( + self, + name: str, + system_message: str = "", + llm_config: Dict[str, Any] | None = None, + canned_reply: str = "", + ) -> None: + self.name = name + self.system_message = system_message + self.llm_config = llm_config + self.human_input_mode = "NEVER" + self._canned_reply = canned_reply + self.received: List[Any] = [] + + def send(self, message: Any, recipient: Any, **_kwargs: Any) -> Any: + recipient.receive(message, sender=self) + + def receive(self, message: Any, sender: Any, **_kwargs: Any) -> Any: + self.received.append({"from": getattr(sender, "name", "?"), "message": message}) + + def generate_reply(self, messages: Any = None, sender: Any = None, **_kwargs: Any) -> Any: + return self._canned_reply + + def execute_code_blocks(self, code_blocks: Any) -> Any: + return f"executed {len(code_blocks)} blocks" + + +def main() -> int: + print("=== AutoGen adapter sample (mocked LLM) ===") + + stratix = _RecordingStratix() + sink = _PrintSink() + + adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.add_sink(sink) + adapter.connect() + + assistant = _FakeAgent( + name="assistant", + system_message="You are a concise assistant.", + llm_config={"model": "gpt-4o-mini", "temperature": 0}, + canned_reply="2 + 2 is 4.", + ) + user = _FakeAgent(name="user", canned_reply="thanks") + + instrument_agents(assistant, user, stratix=stratix) + adapter.connect_agents(assistant, user) + + print("-- agent.send round-trip --") + user.send("What is 2 + 2?", recipient=assistant) + + print("-- generate_reply --") + reply = assistant.generate_reply( + messages=[{"role": "user", "content": "What is 2 + 2?"}], + sender=user, + ) + print(f" reply: {reply!r}") + + print("-- execute_code_blocks --") + assistant.execute_code_blocks([("python", "print('hi')")]) + + print("-- conversation lifecycle --") + adapter.on_conversation_start(initiator=user, message="What is 2 + 2?") + adapter.on_conversation_end(final_message=reply, termination_reason="auto_reply") + + adapter.disconnect() + + print(f"\nTotal events captured by sink: {len(sink.events)}") + print(f"Total events captured by stratix client: {len(stratix.events)}") + print("Event types:") + for ev in stratix.events: + print(f" - {ev['event_type']}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/layerlens/instrument/adapters/frameworks/__init__.py b/src/layerlens/instrument/adapters/frameworks/__init__.py new file mode 100644 index 00000000..91cec946 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/__init__.py @@ -0,0 +1,18 @@ +"""Framework adapters for the LayerLens Instrument layer. + +Each framework adapter wraps an agent / chain framework's lifecycle to +intercept agent runs, model invocations, tool calls, state changes, and +handoffs, emitting events through the LayerLens telemetry pipeline. + +Adapters available (loaded on demand via :class:`AdapterRegistry`): + +* ``autogen`` — Microsoft AutoGen (group chat + lifecycle) + +Other framework adapters (LangChain, LangGraph, CrewAI, Agentforce, +Langfuse, Semantic Kernel, OpenAI Agents, etc.) ship in sibling M2 +fan-out PRs. + +Importing this package does NOT import any framework SDK. +""" + +from __future__ import annotations diff --git a/src/layerlens/instrument/adapters/frameworks/autogen/__init__.py b/src/layerlens/instrument/adapters/frameworks/autogen/__init__.py new file mode 100644 index 00000000..6e44cc22 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/autogen/__init__.py @@ -0,0 +1,56 @@ +""" +LayerLens AutoGen Adapter + +Integrates LayerLens tracing with the Microsoft AutoGen framework. + +Usage: + from layerlens.instrument.adapters.frameworks.autogen import ( + AutoGenAdapter, + instrument_agents, + GroupChatTracer, + HumanProxyTracer, + ) + + adapter = AutoGenAdapter(stratix=stratix_instance) + adapter.connect() + adapter.connect_agents(agent1, agent2) +""" + +from __future__ import annotations + +from typing import Any + +from layerlens.instrument.adapters.frameworks.autogen.groupchat import GroupChatTracer +from layerlens.instrument.adapters.frameworks.autogen.lifecycle import AutoGenAdapter +from layerlens.instrument.adapters.frameworks.autogen.human_proxy import HumanProxyTracer + +# Registry lazy-loading convention +ADAPTER_CLASS = AutoGenAdapter + + +def instrument_agents( + *agents: Any, stratix: Any = None, capture_config: dict[str, Any] | None = None +) -> Any: + """ + Convenience function to instrument AutoGen agents with LayerLens tracing. + + Args: + *agents: AutoGen ConversableAgent instances + stratix: LayerLens SDK / client instance to receive emitted events + capture_config: CaptureConfig to use + + Returns: + List of instrumented agents + """ + adapter = AutoGenAdapter(stratix=stratix, capture_config=capture_config) # type: ignore[arg-type] + adapter.connect() + return adapter.connect_agents(*agents) + + +__all__ = [ + "AutoGenAdapter", + "GroupChatTracer", + "HumanProxyTracer", + "instrument_agents", + "ADAPTER_CLASS", +] diff --git a/src/layerlens/instrument/adapters/frameworks/autogen/groupchat.py b/src/layerlens/instrument/adapters/frameworks/autogen/groupchat.py new file mode 100644 index 00000000..00f04ef3 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/autogen/groupchat.py @@ -0,0 +1,176 @@ +""" +AutoGen GroupChat Tracing + +Traces GroupChat speaker selection and turn management for multi-agent +conversations. +""" + +from __future__ import annotations + +import time +import logging +import threading +from typing import TYPE_CHECKING, Any +from collections.abc import Callable + +if TYPE_CHECKING: + from layerlens.instrument.adapters.frameworks.autogen.lifecycle import AutoGenAdapter + +logger = logging.getLogger(__name__) + + +class GroupChatTracer: + """ + Traces GroupChat speaker selection and turn management. + + Wraps GroupChatManager to intercept speaker selection, message routing, + and termination detection. + """ + + def __init__(self, adapter: AutoGenAdapter) -> None: + self._adapter = adapter + self._lock = threading.Lock() + self._message_seq: int = 0 + self._original_run_chat: Callable[..., Any] | None = None + + @property + def message_seq(self) -> int: + return self._message_seq + + def wrap_manager(self, manager: Any) -> Any: + """ + Wrap a GroupChatManager with tracing. + + Args: + manager: An AutoGen GroupChatManager instance + + Returns: + The wrapped manager (same object, modified in-place) + """ + if hasattr(manager, "run_chat"): + self._original_run_chat = manager.run_chat + manager.run_chat = self._create_traced_run_chat(manager, manager.run_chat) + manager._layerlens_tracer = self + # Deprecated alias retained so consumers porting from the + # ateam / STRATIX-prefixed naming keep working. Remove in v2. + manager._stratix_tracer = self + return manager + + def on_speaker_selected( + self, + method: str | None = None, + candidates: list[str] | None = None, + chosen: str | None = None, + ) -> None: + """ + Record a speaker selection event. + + Emits agent.code (L2) dict event for the selection. + """ + try: + self._adapter.emit_dict_event( + "agent.code", + { + "framework": "autogen", + "event_subtype": "speaker_selection", + "method": method, + "candidates": candidates, + "chosen": chosen, + "message_seq": self._message_seq, + }, + ) + except Exception: + logger.warning("Error emitting speaker selection event", exc_info=True) + + def on_message_routed( + self, + from_agent: str, + to_agent: str, + message: Any = None, + ) -> None: + """ + Record a message routing event. + + Emits agent.handoff (cross-cutting). + """ + with self._lock: + self._message_seq += 1 + msg_seq = self._message_seq + try: + self._adapter.emit_dict_event( + "agent.handoff", + { + "framework": "autogen", + "from_agent": from_agent, + "to_agent": to_agent, + "reason": "groupchat_routing", + "message_seq": msg_seq, + }, + ) + except Exception: + logger.warning("Error emitting message routing event", exc_info=True) + + def on_termination( + self, + reason: str | None = None, + final_speaker: str | None = None, + ) -> None: + """ + Record conversation termination. + + Emits agent.output (L1). + """ + try: + self._adapter.emit_dict_event( + "agent.output", + { + "framework": "autogen", + "event_subtype": "groupchat_termination", + "termination_reason": reason, + "final_speaker": final_speaker, + "total_messages": self._message_seq, + }, + ) + except Exception: + logger.warning("Error emitting termination event", exc_info=True) + + def _create_traced_run_chat( + self, + manager: Any, + original: Callable[..., Any], + ) -> Callable[..., Any]: + """Create a traced version of run_chat.""" + tracer = self + + def traced_run_chat(*args: Any, **kwargs: Any) -> Any: + start_ns = time.time_ns() + + try: + tracer._adapter.emit_dict_event( + "agent.input", + { + "framework": "autogen", + "event_subtype": "groupchat_start", + "timestamp_ns": start_ns, + }, + ) + except Exception: + logger.warning("Error emitting groupchat start", exc_info=True) + + result = original(*args, **kwargs) + + try: + # Source had an orphan duration calc here with no side + # effect. Removed during port — termination signal is the + # only thing the original code dispatched. + tracer.on_termination( + reason="run_chat_complete", + final_speaker=None, + ) + except Exception: + logger.warning("Error emitting groupchat end", exc_info=True) + + return result + + traced_run_chat._layerlens_original = original # type: ignore[attr-defined] + return traced_run_chat diff --git a/src/layerlens/instrument/adapters/frameworks/autogen/human_proxy.py b/src/layerlens/instrument/adapters/frameworks/autogen/human_proxy.py new file mode 100644 index 00000000..608aaeb4 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/autogen/human_proxy.py @@ -0,0 +1,144 @@ +""" +AutoGen Human-in-the-Loop Tracing + +Traces human interactions through UserProxyAgent, capturing requests, +responses, and approval patterns. +""" + +from __future__ import annotations + +import time +import logging +import threading +from typing import TYPE_CHECKING, Any +from collections.abc import Callable + +if TYPE_CHECKING: + from layerlens.instrument.adapters.frameworks.autogen.lifecycle import AutoGenAdapter + +logger = logging.getLogger(__name__) + + +class HumanProxyTracer: + """ + Traces human interactions through UserProxyAgent. + + Wraps get_human_input() to capture human requests and responses, + measure response latency, and detect approval patterns. + """ + + def __init__(self, adapter: AutoGenAdapter) -> None: + self._adapter = adapter + self._lock = threading.Lock() + self._original_get_human_input: Callable[..., Any] | None = None + self._interaction_count: int = 0 + + @property + def interaction_count(self) -> int: + return self._interaction_count + + def wrap_agent(self, agent: Any) -> Any: + """ + Wrap a UserProxyAgent with human interaction tracing. + + Args: + agent: An AutoGen UserProxyAgent instance + + Returns: + The wrapped agent (same object, modified in-place) + """ + if hasattr(agent, "get_human_input"): + self._original_get_human_input = agent.get_human_input + agent.get_human_input = self._create_traced_get_human_input( + agent, agent.get_human_input + ) + agent._layerlens_human_tracer = self + # Deprecated alias retained so consumers porting from the + # ateam / STRATIX-prefixed naming keep working. Remove in v2. + agent._stratix_human_tracer = self + return agent + + def _create_traced_get_human_input( + self, + agent: Any, + original: Callable[..., Any], + ) -> Callable[..., Any]: + """Create a traced version of get_human_input.""" + tracer = self + + def traced_get_human_input(prompt: str = "", **kwargs: Any) -> str: + start_ns = time.time_ns() + with tracer._lock: + tracer._interaction_count += 1 + interaction_seq = tracer._interaction_count + + # Emit request event + try: + agent_name = getattr(agent, "name", str(agent)) + tracer._adapter.emit_dict_event( + "agent.input", + { + "framework": "autogen", + "role": "HUMAN", + "input_type": "human_input_request", + "agent": agent_name, + "prompt": prompt[:500] if prompt else "", + "interaction_seq": interaction_seq, + }, + ) + except Exception: + logger.warning("Error emitting human input request", exc_info=True) + + # Call original + response = original(prompt, **kwargs) + + # Emit response event + try: + elapsed_ms = (time.time_ns() - start_ns) / 1_000_000 + input_type = tracer._classify_input(response) + tracer._adapter.emit_dict_event( + "agent.input", + { + "framework": "autogen", + "role": "HUMAN", + "input_type": input_type, + "agent": agent_name, + "response_preview": response[:500] if response else "", + "response_latency_ms": elapsed_ms, + "interaction_seq": interaction_seq, + }, + ) + except Exception: + logger.warning("Error emitting human input response", exc_info=True) + + return response # type: ignore[no-any-return] + + traced_get_human_input._layerlens_original = original # type: ignore[attr-defined] + return traced_get_human_input + + def _classify_input(self, response: str) -> str: + """ + Classify the type of human input. + + Returns: + Input type string: "approval", "rejection", "auto_reply", + "custom_input", or "empty" + """ + if not response: + return "empty" + + lower = response.strip().lower() + + # Auto-reply detection + if lower in ("", "exit"): + return "auto_reply" + + # Approval patterns + if lower in ("y", "yes", "approve", "ok", "okay", "sure", "proceed", "continue"): + return "approval" + + # Rejection patterns + if lower in ("n", "no", "reject", "deny", "stop", "cancel", "abort"): + return "rejection" + + return "custom_input" diff --git a/src/layerlens/instrument/adapters/frameworks/autogen/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/autogen/lifecycle.py new file mode 100644 index 00000000..e3870987 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/autogen/lifecycle.py @@ -0,0 +1,603 @@ +""" +LayerLens AutoGen Lifecycle Hooks + +Provides the main AutoGenAdapter class with monkey-patch-based instrumentation +for AutoGen ConversableAgent instances. +""" + +from __future__ import annotations + +import time +import uuid +import logging +import threading +from typing import Any + +from layerlens.instrument.adapters._base.adapter import ( + AdapterInfo, + BaseAdapter, + AdapterHealth, + AdapterStatus, + ReplayableTrace, + AdapterCapability, +) +from layerlens.instrument.adapters._base.capture import CaptureConfig +from layerlens.instrument.adapters._base.pydantic_compat import PydanticCompat +from layerlens.instrument.adapters.frameworks.autogen.metadata import AutoGenAgentMetadataExtractor + +logger = logging.getLogger(__name__) + + +class AutoGenAdapter(BaseAdapter): + """ + Main adapter for integrating LayerLens with Microsoft AutoGen. + + Uses monkey-patching to intercept ConversableAgent methods (send, receive, + generate_reply, execute_code_blocks) and emit LayerLens telemetry events. + + Supports both new-style (stratix, capture_config) and legacy (stratix_instance) + constructor parameters. + + Usage: + adapter = AutoGenAdapter(stratix=stratix_instance) + adapter.connect() + adapter.connect_agents(agent1, agent2) + agent1.initiate_chat(agent2, message="Hello") + """ + + FRAMEWORK = "autogen" + VERSION = "0.1.0" + # The adapter source files import nothing from ``pydantic`` directly + # (verified by grep across ``frameworks/autogen/``). pyautogen 0.2.x + # supports both Pydantic majors; the adapter only monkey-patches + # ConversableAgent methods and emits dict events, never touching the + # framework's Pydantic models. + requires_pydantic = PydanticCompat.V1_OR_V2 + + def __init__( + self, + stratix: Any | None = None, + capture_config: CaptureConfig | None = None, + # Legacy param + stratix_instance: Any | None = None, + memory_service: Any | None = None, + ) -> None: + resolved_stratix = stratix or stratix_instance + super().__init__(stratix=resolved_stratix, capture_config=capture_config) + + self._metadata_extractor = AutoGenAgentMetadataExtractor() + self._adapter_lock = threading.Lock() + self._seen_agents: set[str] = set() + self._wrapped_agents: list[Any] = [] + self._originals: dict[int, dict[str, Any]] = {} # agent id -> original methods + self._message_seq: int = 0 + self._conversation_start_ns: int = 0 + self._framework_version: str | None = None + self._memory_service = memory_service + + # --- BaseAdapter lifecycle --- + + def connect(self) -> None: + """Verify AutoGen is importable and mark as connected.""" + try: + import autogen # type: ignore[import-not-found,unused-ignore] # noqa: F401 + + version = getattr(autogen, "__version__", "unknown") + logger.debug("AutoGen %s detected", version) + except ImportError: + logger.debug("AutoGen not installed; adapter usable in mock/test mode") + self._framework_version = self._detect_framework_version() + self._connected = True + self._status = AdapterStatus.HEALTHY + + def disconnect(self) -> None: + """Unwrap agents and disconnect.""" + for agent in self._wrapped_agents: + self._unwrap_agent(agent) + self._wrapped_agents.clear() + self._originals.clear() + self._connected = False + self._status = AdapterStatus.DISCONNECTED + + def health_check(self) -> AdapterHealth: + return AdapterHealth( + status=self._status, + framework_name=self.FRAMEWORK, + framework_version=self._framework_version, + adapter_version=self.VERSION, + error_count=self._error_count, + circuit_open=self._circuit_open, + ) + + def get_adapter_info(self) -> AdapterInfo: + return AdapterInfo( + name="AutoGenAdapter", + version=self.VERSION, + framework=self.FRAMEWORK, + framework_version=self._framework_version, + capabilities=[ + AdapterCapability.TRACE_TOOLS, + AdapterCapability.TRACE_MODELS, + AdapterCapability.TRACE_STATE, + AdapterCapability.TRACE_HANDOFFS, + ], + description="LayerLens adapter for Microsoft AutoGen framework", + ) + + def serialize_for_replay(self) -> ReplayableTrace: + return ReplayableTrace( + adapter_name="AutoGenAdapter", + framework=self.FRAMEWORK, + trace_id=str(uuid.uuid4()), + events=list(self._trace_events), + state_snapshots=[], + config={ + "capture_config": self._capture_config.model_dump(), + }, + ) + + # --- Agent wrapping --- + + def connect_agents(self, *agents: Any) -> list[Any]: + """ + Monkey-patch AutoGen agents with LayerLens tracing. + + Wraps send, receive, generate_reply, and execute_code_blocks methods. + Stores originals for unwrap on disconnect. + + Emits environment.config (L4a) on first encounter per agent. + + Args: + *agents: AutoGen ConversableAgent instances + + Returns: + List of wrapped agents (same objects, modified in-place) + """ + from layerlens.instrument.adapters.frameworks.autogen.wrappers import ( + create_traced_send, + create_traced_receive, + create_traced_execute_code, + create_traced_generate_reply, + ) + + result = [] + for agent in agents: + agent_id = id(agent) + if agent_id in self._originals: + result.append(agent) + continue + + originals: dict[str, Any] = {} + + # Wrap send + if hasattr(agent, "send"): + originals["send"] = agent.send + agent.send = create_traced_send(self, agent, agent.send) + + # Wrap receive + if hasattr(agent, "receive"): + originals["receive"] = agent.receive + agent.receive = create_traced_receive(self, agent, agent.receive) + + # Wrap generate_reply + if hasattr(agent, "generate_reply"): + originals["generate_reply"] = agent.generate_reply + agent.generate_reply = create_traced_generate_reply( + self, agent, agent.generate_reply + ) + + # Wrap execute_code_blocks + if hasattr(agent, "execute_code_blocks"): + originals["execute_code_blocks"] = agent.execute_code_blocks + agent.execute_code_blocks = create_traced_execute_code( + self, agent, agent.execute_code_blocks + ) + + self._originals[agent_id] = originals + self._wrapped_agents.append(agent) + + # Emit agent config on first encounter + self._emit_agent_config(agent) + + result.append(agent) + + return result + + def _unwrap_agent(self, agent: Any) -> None: + """Restore original methods on an agent.""" + agent_id = id(agent) + originals = self._originals.get(agent_id) + if not originals: + return + for method_name, original in originals.items(): + try: + setattr(agent, method_name, original) + except Exception: + logger.debug("Could not unwrap %s on agent", method_name, exc_info=True) + + # --- Lifecycle hooks (called by wrappers) --- + + def on_send( + self, + sender: Any, + message: Any, + recipient: Any, + ) -> None: + """ + Handle agent send. + + Emits agent.handoff (cross-cutting). + """ + with self._adapter_lock: + self._message_seq += 1 + msg_seq = self._message_seq + sender_name = getattr(sender, "name", str(sender)) + recipient_name = getattr(recipient, "name", str(recipient)) + + self.emit_dict_event( + "agent.handoff", + { + "framework": "autogen", + "from_agent": sender_name, + "to_agent": recipient_name, + "message_preview": self._truncate(self._message_content(message)), + "message_seq": msg_seq, + }, + ) + + def on_receive( + self, + receiver: Any, + message: Any, + sender: Any, + ) -> None: + """ + Handle agent receive. + + Emits agent.state.change (cross-cutting). + """ + receiver_name = getattr(receiver, "name", str(receiver)) + sender_name = getattr(sender, "name", str(sender)) if sender else None + + self.emit_dict_event( + "agent.state.change", + { + "framework": "autogen", + "agent": receiver_name, + "event_subtype": "message_received", + "from_agent": sender_name, + "message_preview": self._truncate(self._message_content(message)), + }, + ) + + def on_generate_reply( + self, + agent: Any, + messages: Any = None, + reply: Any = None, + latency_ms: float | None = None, + ) -> None: + """ + Handle reply generation. + + Emits model.invoke (L3). + """ + agent_name = getattr(agent, "name", str(agent)) + model = self._extract_model_name(agent) + + payload: dict[str, Any] = { + "framework": "autogen", + "agent": agent_name, + "model": model, + "reply_preview": self._truncate(self._message_content(reply)), + } + if latency_ms is not None: + payload["latency_ms"] = latency_ms + + # Extract token counts if available + token_usage = self._extract_token_usage_from_reply(reply) + if token_usage: + payload.update(token_usage) + + # Include messages for Prompt Lab extraction (gated by capture_content) + if self._capture_config.capture_content and messages: + normalized: list[dict[str, str]] = [] + # Prepend system message from agent config + sys_msg = self._extract_system_message(agent) + if sys_msg: + normalized.append({"role": "system", "content": self._truncate(sys_msg, 10_000)}) + if isinstance(messages, list): + for msg in messages: + if isinstance(msg, dict) and "role" in msg and "content" in msg: + normalized.append( + { + "role": str(msg["role"]), + "content": str(msg["content"])[:10_000], + } + ) + elif isinstance(msg, str): + normalized.append({"role": "user", "content": msg[:10_000]}) + if normalized: + payload["messages"] = normalized + + self.emit_dict_event("model.invoke", payload) + + def on_execute_code( + self, + agent: Any, + code_blocks: Any = None, + result: Any = None, + latency_ms: float | None = None, + ) -> None: + """ + Handle code execution. + + Emits tool.call (L5a) and tool.environment (L5c). + """ + agent_name = getattr(agent, "name", str(agent)) + + # tool.call for the code execution + self.emit_dict_event( + "tool.call", + { + "framework": "autogen", + "tool_name": "code_execution", + "agent": agent_name, + "code_blocks_count": len(code_blocks) if code_blocks else 0, + "result_preview": self._truncate(str(result)) if result else None, + "latency_ms": latency_ms, + }, + ) + + # tool.environment for execution environment details + self.emit_dict_event( + "tool.environment", + { + "framework": "autogen", + "agent": agent_name, + "execution_type": "code_block", + "code_blocks_count": len(code_blocks) if code_blocks else 0, + }, + ) + + def on_conversation_start( + self, + initiator: Any, + message: Any, + ) -> None: + """ + Handle conversation start. + + Emits agent.input (L1). + """ + with self._adapter_lock: + self._conversation_start_ns = time.time_ns() + initiator_name = getattr(initiator, "name", str(initiator)) + + self.emit_dict_event( + "agent.input", + { + "framework": "autogen", + "initiator": initiator_name, + "message": self._safe_serialize(message), + "timestamp_ns": self._conversation_start_ns, + }, + ) + + def on_conversation_end( + self, + final_message: Any = None, + termination_reason: str | None = None, + ) -> None: + """ + Handle conversation end. + + Emits agent.output (L1). + """ + end_ns = time.time_ns() + duration_ns = end_ns - self._conversation_start_ns if self._conversation_start_ns else 0 + + self.emit_dict_event( + "agent.output", + { + "framework": "autogen", + "final_message": self._safe_serialize(final_message), + "termination_reason": termination_reason, + "duration_ns": duration_ns, + }, + ) + + # --- Memory integration --- + + def on_message( + self, + agent_id: str, + message: Any, + ) -> None: + """Store an agent message as episodic memory. + + Only active when ``memory_service`` is provided. Failures are + logged and swallowed to avoid disrupting normal operation. + + Args: + agent_id: Identifier of the agent that sent/received the message. + message: The message content to store. + """ + if self._memory_service is None: + return + + try: + from layerlens.instrument._vendored.memory_models import MemoryEntry + + content = self._message_content(message) + timestamp = int(time.time()) + entry = MemoryEntry( + org_id=getattr(self._stratix, "org_id", ""), + agent_id=agent_id, + memory_type="episodic", + key=f"message_{timestamp}", + content=content[:4000], + importance=0.4, + metadata={"source": "autogen_adapter"}, + ) + self._memory_service.store(entry) + except Exception: + logger.debug( + "Failed to store episodic memory for agent %s", + agent_id, + exc_info=True, + ) + + def on_conversation_end_memory( + self, + agent_id: str, + summary: str, + ) -> None: + """Consolidate a conversation summary into semantic memory. + + Only active when ``memory_service`` is provided. Failures are + logged and swallowed. + + Args: + agent_id: Agent whose conversation to consolidate. + summary: High-level summary of the conversation. + """ + if self._memory_service is None: + return + + try: + from layerlens.instrument._vendored.memory_models import MemoryEntry + + timestamp = int(time.time()) + entry = MemoryEntry( + org_id=getattr(self._stratix, "org_id", ""), + agent_id=agent_id, + memory_type="semantic", + key=f"conversation_summary_{timestamp}", + content=summary[:4000], + importance=0.7, + metadata={"source": "autogen_adapter", "type": "conversation_consolidation"}, + ) + self._memory_service.store(entry) + except Exception: + logger.debug( + "Failed to store conversation summary for agent %s", + agent_id, + exc_info=True, + ) + + # --- Agent config emission --- + + def _emit_agent_config(self, agent: Any) -> None: + """Emit environment.config for an agent on first encounter.""" + name = getattr(agent, "name", None) or str(agent) + with self._adapter_lock: + if name in self._seen_agents: + return + self._seen_agents.add(name) + + metadata = self._metadata_extractor.extract(agent) + + self.emit_dict_event( + "environment.config", + { + "framework": "autogen", + **metadata, + }, + ) + + # --- Internal helpers --- + + def _safe_serialize(self, value: Any) -> Any: + """Safely serialize a value for events.""" + try: + if value is None: + return None + if hasattr(value, "model_dump"): + return value.model_dump() + if hasattr(value, "dict"): + return value.dict() + if isinstance(value, dict): + return dict(value) + if isinstance(value, (str, int, float, bool)): + return value + return str(value) + except Exception: + return str(value) + + def _message_content(self, message: Any) -> str: + """Extract string content from a message.""" + if message is None: + return "" + if isinstance(message, str): + return message + if isinstance(message, dict): + return str(message.get("content", message)) + return str(message) + + def _truncate(self, text: str, max_len: int = 500) -> str: + """Truncate text to max_len.""" + if len(text) <= max_len: + return text + return text[:max_len] + "..." + + def _extract_system_message(self, agent: Any) -> str | None: + """Extract system message from agent config.""" + try: + # AutoGen 0.2.x: agent.system_message + sys_msg = getattr(agent, "system_message", None) + if sys_msg: + return str(sys_msg) + # AutoGen 0.4+/agentchat: agent._system_messages + sys_msgs = getattr(agent, "_system_messages", None) + if sys_msgs and isinstance(sys_msgs, list) and sys_msgs: + first = sys_msgs[0] + content = getattr(first, "content", None) or str(first) + return str(content) + except Exception: + pass + return None + + def _extract_model_name(self, agent: Any) -> str | None: + """Extract model name from agent's llm_config.""" + try: + llm_config = getattr(agent, "llm_config", None) + if not llm_config or not isinstance(llm_config, dict): + return None + if "model" in llm_config: + return llm_config["model"] # type: ignore[no-any-return] + config_list = llm_config.get("config_list", []) + if config_list and isinstance(config_list[0], dict): + return config_list[0].get("model") + except Exception: + pass + return None + + def _extract_token_usage_from_reply(self, reply: Any) -> dict[str, Any] | None: + """Extract token usage from a reply if available.""" + if reply is None: + return None + try: + usage = getattr(reply, "usage", None) + if usage: + if isinstance(usage, dict): + return { + "tokens_prompt": usage.get("prompt_tokens"), + "tokens_completion": usage.get("completion_tokens"), + } + return { + "tokens_prompt": getattr(usage, "prompt_tokens", None), + "tokens_completion": getattr(usage, "completion_tokens", None), + } + except Exception: + pass + return None + + @staticmethod + def _detect_framework_version() -> str | None: + try: + import autogen # type: ignore[import-not-found,unused-ignore] + + return getattr(autogen, "__version__", None) + except ImportError: + return None diff --git a/src/layerlens/instrument/adapters/frameworks/autogen/metadata.py b/src/layerlens/instrument/adapters/frameworks/autogen/metadata.py new file mode 100644 index 00000000..2d9ab934 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/autogen/metadata.py @@ -0,0 +1,94 @@ +""" +AutoGen Agent Metadata Extraction + +Extracts agent metadata for L4a (environment.config) emission. +""" + +from __future__ import annotations + +import logging +from typing import Any + +logger = logging.getLogger(__name__) + + +class AutoGenAgentMetadataExtractor: + """Extracts AutoGen agent metadata for environment.config emission.""" + + def extract(self, agent: Any) -> dict[str, Any]: + """ + Extract metadata from an AutoGen ConversableAgent. + + Args: + agent: An AutoGen ConversableAgent instance + + Returns: + Dict of agent metadata + """ + metadata: dict[str, Any] = {} + + # Agent name + try: + metadata["name"] = getattr(agent, "name", str(agent)) + except Exception: + metadata["name"] = "" + + # System message + try: + system_message = getattr(agent, "system_message", None) + if system_message is not None: + metadata["system_message"] = ( + system_message[:500] if len(system_message) > 500 else system_message + ) + except Exception: + pass + + # Human input mode + try: + him = getattr(agent, "human_input_mode", None) + if him is not None: + metadata["human_input_mode"] = him + except Exception: + pass + + # LLM config + try: + llm_config = getattr(agent, "llm_config", None) + if llm_config and isinstance(llm_config, dict): + safe_config: dict[str, Any] = {} + if "model" in llm_config: + safe_config["model"] = llm_config["model"] + if "config_list" in llm_config: + models = [] + for cfg in llm_config["config_list"]: + if isinstance(cfg, dict) and "model" in cfg: + models.append(cfg["model"]) + if models: + safe_config["models"] = models + if "temperature" in llm_config: + safe_config["temperature"] = llm_config["temperature"] + metadata["llm_config"] = safe_config + except Exception: + pass + + # Max consecutive auto reply + try: + max_reply = getattr(agent, "max_consecutive_auto_reply", None) + if max_reply is not None: + metadata["max_consecutive_auto_reply"] = max_reply + except Exception: + pass + + # Code execution config + try: + code_config = getattr(agent, "code_execution_config", None) + if code_config and isinstance(code_config, dict): + safe_code_config: dict[str, Any] = {} + for key in ("work_dir", "use_docker", "timeout", "last_n_messages"): + if key in code_config: + safe_code_config[key] = code_config[key] + metadata["code_execution_config"] = safe_code_config + except Exception: + pass + + return metadata diff --git a/src/layerlens/instrument/adapters/frameworks/autogen/wrappers.py b/src/layerlens/instrument/adapters/frameworks/autogen/wrappers.py new file mode 100644 index 00000000..cfa23f9f --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/autogen/wrappers.py @@ -0,0 +1,155 @@ +""" +AutoGen Method Wrappers + +Creates traced versions of ConversableAgent methods that intercept calls +and route events to the AutoGenAdapter lifecycle hooks. + +All wrappers preserve the original method's behavior and handle adapter +exceptions silently to prevent tracing from breaking the application. +""" + +from __future__ import annotations + +import time +import logging +from typing import TYPE_CHECKING, Any +from collections.abc import Callable + +if TYPE_CHECKING: + from layerlens.instrument.adapters.frameworks.autogen.lifecycle import AutoGenAdapter + +logger = logging.getLogger(__name__) + + +def create_traced_send( + adapter: AutoGenAdapter, + agent: Any, + original_send: Callable[..., Any], +) -> Callable[..., Any]: + """ + Create a traced version of agent.send(). + + Captures the message being sent and the recipient, then delegates + to the original send method. + """ + + def traced_send(message: Any, recipient: Any, **kwargs: Any) -> Any: + try: + adapter.on_send(sender=agent, message=message, recipient=recipient) + except Exception: + logger.warning("Error in traced send pre-hook", exc_info=True) + + return original_send(message, recipient, **kwargs) + + traced_send._layerlens_original = original_send # type: ignore[attr-defined] + return traced_send + + +def create_traced_receive( + adapter: AutoGenAdapter, + agent: Any, + original_receive: Callable[..., Any], +) -> Callable[..., Any]: + """ + Create a traced version of agent.receive(). + + Captures the received message and sender, then delegates + to the original receive method. + """ + + def traced_receive(message: Any, sender: Any, **kwargs: Any) -> Any: + try: + adapter.on_receive(receiver=agent, message=message, sender=sender) + except Exception: + logger.warning("Error in traced receive pre-hook", exc_info=True) + + return original_receive(message, sender, **kwargs) + + traced_receive._layerlens_original = original_receive # type: ignore[attr-defined] + return traced_receive + + +def create_traced_generate_reply( + adapter: AutoGenAdapter, + agent: Any, + original_generate_reply: Callable[..., Any], +) -> Callable[..., Any]: + """ + Create a traced version of agent.generate_reply(). + + Captures timing and the generated reply, then delegates to the + original method. + """ + + def traced_generate_reply(messages: Any = None, sender: Any = None, **kwargs: Any) -> Any: + start_ns = time.time_ns() + error: Exception | None = None + + try: + reply = original_generate_reply(messages=messages, sender=sender, **kwargs) + except Exception as exc: + error = exc + reply = None + raise + finally: + try: + elapsed_ms = (time.time_ns() - start_ns) / 1_000_000 + if error is not None: + # Emit model.invoke with error information for failed calls + adapter.emit_dict_event( + "model.invoke", + { + "framework": "autogen", + "agent": getattr(agent, "name", str(agent)), + "model": adapter._extract_model_name(agent), + "latency_ms": elapsed_ms, + "error": str(error), + }, + ) + else: + adapter.on_generate_reply( + agent=agent, + messages=messages, + reply=reply, + latency_ms=elapsed_ms, + ) + except Exception: + logger.warning("Error in traced generate_reply post-hook", exc_info=True) + + return reply + + traced_generate_reply._layerlens_original = original_generate_reply # type: ignore[attr-defined] + return traced_generate_reply + + +def create_traced_execute_code( + adapter: AutoGenAdapter, + agent: Any, + original_execute_code: Callable[..., Any], +) -> Callable[..., Any]: + """ + Create a traced version of agent.execute_code_blocks(). + + Captures code blocks, execution result, and timing. + """ + + def traced_execute_code(code_blocks: Any, **kwargs: Any) -> Any: + start_ns = time.time_ns() + + result = original_execute_code(code_blocks, **kwargs) + + try: + elapsed_ms = (time.time_ns() - start_ns) / 1_000_000 + adapter.on_execute_code( + agent=agent, + code_blocks=code_blocks, + result=result, + latency_ms=elapsed_ms, + ) + except Exception: + logger.warning("Error in traced execute_code post-hook", exc_info=True) + + return result + + traced_execute_code._layerlens_original = original_execute_code # type: ignore[attr-defined] + return traced_execute_code diff --git a/tests/instrument/adapters/__init__.py b/tests/instrument/adapters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/instrument/adapters/frameworks/__init__.py b/tests/instrument/adapters/frameworks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/instrument/adapters/frameworks/test_autogen.py b/tests/instrument/adapters/frameworks/test_autogen.py new file mode 100644 index 00000000..a3cde4cb --- /dev/null +++ b/tests/instrument/adapters/frameworks/test_autogen.py @@ -0,0 +1,229 @@ +"""Unit tests for the AutoGen framework adapter. + +Mocked at the SDK shape level — no real ``autogen`` runtime needed. +""" + +from __future__ import annotations + +from typing import Any, Dict, List + +from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig +from layerlens.instrument.adapters.frameworks.autogen import ( + ADAPTER_CLASS, + AutoGenAdapter, + instrument_agents, +) + + +class _RecordingStratix: + def __init__(self) -> None: + self.events: List[Dict[str, Any]] = [] + + def emit(self, *args: Any, **kwargs: Any) -> None: + if len(args) == 2 and isinstance(args[0], str): + self.events.append({"event_type": args[0], "payload": args[1]}) + + +class _FakeAgent: + """Minimal duck-typed AutoGen ConversableAgent for tests.""" + + def __init__( + self, + name: str = "agent", + system_message: Any = None, + llm_config: Any = None, + ) -> None: + self.name = name + self.system_message = system_message + self.llm_config = llm_config + + def send(self, message: Any, recipient: Any, **kwargs: Any) -> Any: + return None + + def receive(self, message: Any, sender: Any, **kwargs: Any) -> Any: + return None + + def generate_reply(self, messages: Any = None, sender: Any = None, **kwargs: Any) -> Any: + return "reply" + + def execute_code_blocks(self, code_blocks: Any) -> Any: + return "exec result" + + +def test_adapter_class_export() -> None: + assert ADAPTER_CLASS is AutoGenAdapter + + +def test_lifecycle() -> None: + a = AutoGenAdapter() + a.connect() + assert a.status == AdapterStatus.HEALTHY + a.disconnect() + assert a.status == AdapterStatus.DISCONNECTED + + +def test_adapter_info_and_health() -> None: + a = AutoGenAdapter() + a.connect() + info = a.get_adapter_info() + assert info.framework == "autogen" + assert info.name == "AutoGenAdapter" + health = a.health_check() + assert health.framework_name == "autogen" + + +def test_connect_agents_wraps_methods_and_emits_config() -> None: + stratix = _RecordingStratix() + adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + agent = _FakeAgent(name="alice", llm_config={"model": "gpt-5"}) + adapter.connect_agents(agent) + + # Methods replaced. + assert agent.send.__name__ == "traced_send" + assert agent.receive.__name__ == "traced_receive" + assert agent.generate_reply.__name__ == "traced_generate_reply" + assert agent.execute_code_blocks.__name__ == "traced_execute_code" + + # environment.config emitted once for this agent. + configs = [e for e in stratix.events if e["event_type"] == "environment.config"] + assert len(configs) == 1 + + adapter.disconnect() + # Original methods restored. + assert agent.send.__name__ == "send" + + +def test_connect_agents_idempotent() -> None: + """Calling connect_agents twice with the same agent does not double-wrap.""" + stratix = _RecordingStratix() + adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + agent = _FakeAgent(name="alice") + adapter.connect_agents(agent) + adapter.connect_agents(agent) + + configs = [e for e in stratix.events if e["event_type"] == "environment.config"] + assert len(configs) == 1 + + +def test_on_send_emits_handoff() -> None: + stratix = _RecordingStratix() + adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + sender = _FakeAgent(name="alice") + recipient = _FakeAgent(name="bob") + adapter.on_send(sender=sender, message="hi there", recipient=recipient) + + evt = next(e for e in stratix.events if e["event_type"] == "agent.handoff") + assert evt["payload"]["from_agent"] == "alice" + assert evt["payload"]["to_agent"] == "bob" + assert evt["payload"]["message_seq"] == 1 + + +def test_on_receive_emits_state_change() -> None: + stratix = _RecordingStratix() + adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + receiver = _FakeAgent(name="bob") + sender = _FakeAgent(name="alice") + adapter.on_receive(receiver=receiver, message={"content": "hello"}, sender=sender) + + evt = next(e for e in stratix.events if e["event_type"] == "agent.state.change") + assert evt["payload"]["agent"] == "bob" + assert evt["payload"]["from_agent"] == "alice" + + +def test_on_generate_reply_emits_model_invoke() -> None: + stratix = _RecordingStratix() + adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + agent = _FakeAgent(name="alice", llm_config={"model": "gpt-5"}) + reply = type("Reply", (), {"usage": {"prompt_tokens": 10, "completion_tokens": 5}})() + adapter.on_generate_reply(agent=agent, messages=[{"role": "user", "content": "hi"}], reply=reply, latency_ms=42.0) + + evt = next(e for e in stratix.events if e["event_type"] == "model.invoke") + assert evt["payload"]["agent"] == "alice" + assert evt["payload"]["model"] == "gpt-5" + assert evt["payload"]["latency_ms"] == 42.0 + assert evt["payload"]["tokens_prompt"] == 10 + + +def test_on_execute_code_emits_tool_call_and_environment() -> None: + stratix = _RecordingStratix() + adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + agent = _FakeAgent(name="alice") + adapter.on_execute_code(agent=agent, code_blocks=[("python", "print(1)")], result="1\n", latency_ms=5.0) + + types = [e["event_type"] for e in stratix.events] + assert "tool.call" in types + assert "tool.environment" in types + tool = next(e for e in stratix.events if e["event_type"] == "tool.call") + assert tool["payload"]["tool_name"] == "code_execution" + assert tool["payload"]["code_blocks_count"] == 1 + + +def test_on_conversation_start_end_emits_input_output() -> None: + stratix = _RecordingStratix() + adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full()) + adapter.connect() + + initiator = _FakeAgent(name="alice") + adapter.on_conversation_start(initiator=initiator, message="start") + adapter.on_conversation_end(final_message="bye", termination_reason="max_rounds") + + types = [e["event_type"] for e in stratix.events] + assert "agent.input" in types + assert "agent.output" in types + + out = next(e for e in stratix.events if e["event_type"] == "agent.output") + assert out["payload"]["termination_reason"] == "max_rounds" + assert out["payload"]["duration_ns"] >= 0 + + +def test_capture_config_gates_l3_model_metadata() -> None: + stratix = _RecordingStratix() + cfg = CaptureConfig(l3_model_metadata=False) + adapter = AutoGenAdapter(stratix=stratix, capture_config=cfg) + adapter.connect() + + agent = _FakeAgent(name="alice", llm_config={"model": "gpt-5"}) + adapter.on_generate_reply(agent=agent, reply="hi") + sender = _FakeAgent(name="alice") + recipient = _FakeAgent(name="bob") + adapter.on_send(sender=sender, message="x", recipient=recipient) + + types = [e["event_type"] for e in stratix.events] + assert "model.invoke" not in types + # handoff (from on_send) is cross-cutting / always enabled. + assert "agent.handoff" in types + + +def test_instrument_agents_helper() -> None: + """Top-level convenience wraps multiple agents at once.""" + a = _FakeAgent(name="a") + b = _FakeAgent(name="b") + result = instrument_agents(a, b) + assert isinstance(result, list) + assert len(result) == 2 + # Both wrapped. + assert a.send.__name__ == "traced_send" + assert b.send.__name__ == "traced_send" + + +def test_serialize_for_replay() -> None: + adapter = AutoGenAdapter( + stratix=_RecordingStratix(), + capture_config=CaptureConfig.full(), + ) + adapter.connect() + rt = adapter.serialize_for_replay() + assert rt.framework == "autogen" + assert rt.adapter_name == "AutoGenAdapter" + assert "capture_config" in rt.config From b08d75e7a12e5ab6e3f2cdab747826a8e2d2f345 Mon Sep 17 00:00:00 2001 From: mmercuri Date: Sun, 10 May 2026 08:47:07 -0700 Subject: [PATCH 2/2] fix(instrument): declare REPLAY capability for autogen (closes PR #119 deferred) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AutoGenAdapter implements serialize_for_replay (returns a populated ReplayableTrace with events + state snapshots + capture config), but get_adapter_info().capabilities did not declare AdapterCapability.REPLAY. The atlas-app catalog UI reads that list to surface replay support, so customers were told they could not replay traces from AutoGen even though the adapter supports it. PR #119 (brand leak + capability declarations) wired REPLAY for the adapters that lived on its branch; AutoGen was deferred because it lives on its own source-port branch (PR #101). This closes that deferral per CLAUDE.md item 5/11. STREAMING is intentionally NOT declared. Per CLAUDE.md 'no fake claims', a capability is only declared if the adapter actually implements it. The AutoGen integration wraps send / receive / generate_reply / execute_code as discrete method calls rather than streaming entry-points — no per-chunk events flow through the adapter. Tests: added test_declares_replay_capability regression asserting REPLAY appears and STREAMING does NOT. Verification: * uv run --with pytest python -m pytest tests/instrument/adapters/frameworks/test_autogen.py -x -> 14 passed --- .../adapters/frameworks/autogen/lifecycle.py | 1 + .../adapters/frameworks/test_autogen.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/src/layerlens/instrument/adapters/frameworks/autogen/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/autogen/lifecycle.py index e3870987..d639000d 100644 --- a/src/layerlens/instrument/adapters/frameworks/autogen/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/autogen/lifecycle.py @@ -120,6 +120,7 @@ def get_adapter_info(self) -> AdapterInfo: AdapterCapability.TRACE_MODELS, AdapterCapability.TRACE_STATE, AdapterCapability.TRACE_HANDOFFS, + AdapterCapability.REPLAY, ], description="LayerLens adapter for Microsoft AutoGen framework", ) diff --git a/tests/instrument/adapters/frameworks/test_autogen.py b/tests/instrument/adapters/frameworks/test_autogen.py index a3cde4cb..0e1ca75a 100644 --- a/tests/instrument/adapters/frameworks/test_autogen.py +++ b/tests/instrument/adapters/frameworks/test_autogen.py @@ -8,6 +8,7 @@ from typing import Any, Dict, List from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig +from layerlens.instrument.adapters._base.adapter import AdapterCapability from layerlens.instrument.adapters.frameworks.autogen import ( ADAPTER_CLASS, AutoGenAdapter, @@ -72,6 +73,22 @@ def test_adapter_info_and_health() -> None: assert health.framework_name == "autogen" +def test_declares_replay_capability() -> None: + """Catalog UI relies on declared capabilities. AutoGen implements + ``serialize_for_replay`` (returns a populated ReplayableTrace), so the + REPLAY capability must surface via ``info().capabilities``. + + STREAMING is intentionally NOT declared: the AutoGen integration is + method-wrapping (send / receive / generate_reply / execute_code) + rather than per-chunk streaming. Per CLAUDE.md 'no fake claims', + the adapter does not declare a capability it does not implement. + """ + a = AutoGenAdapter() + caps = a.info().capabilities + assert AdapterCapability.REPLAY in caps + assert AdapterCapability.STREAMING not in caps + + def test_connect_agents_wraps_methods_and_emits_config() -> None: stratix = _RecordingStratix() adapter = AutoGenAdapter(stratix=stratix, capture_config=CaptureConfig.full())