From e8d3250e405e8cbd44b2677b303ab43861d24ca8 Mon Sep 17 00:00:00 2001 From: mmercuri Date: Sat, 25 Apr 2026 22:34:26 -0700 Subject: [PATCH 1/2] feat(instrument): port LangGraph framework adapter (M2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit M2 fan-out work — completes the LangGraph adapter port that landed mechanically in the M1.C orchestration PR (#96) by adding: * Round-2 item 23: ``STRATIXLangGraphAdapter`` is now a deprecation alias served via PEP 562 ``__getattr__`` on the package. Accessing the old name returns ``LayerLensLangGraphAdapter`` and raises a ``DeprecationWarning`` pointing at the v2.0 removal — silent on bare ``import`` so existing star-imports stay quiet. * Round-2 item 20: ``get_adapter_info()`` now sets ``requires_pydantic = PydanticCompat.V2_ONLY`` explicitly (was V1_OR_V2 default), so the catalog manifest emitter reports the right compat badge regardless of which entrypoint it calls. * tests/instrument/adapters/frameworks/test_langgraph.py — 29 unit tests modelled after the SmolAgents / Langfuse adapter test pattern. Mocks LangGraph at the SDK shape level (``_FakeCompiledGraph`` with ``invoke`` / ``ainvoke``); covers lifecycle, info(), alias deprecation, graph wrapping (success + error), handoff detection, ``trace_node`` / ``trace_langgraph_tool`` decorators, ``wrap_llm_for_langgraph`` (sync + async), state adapter, and replay serialization. * samples/instrument/langgraph/main.py — expanded from 1 node to a 3-node ``planner -> researcher -> writer`` graph. LLM call inside the writer is mocked (no API key); ``HttpEventSink`` import is best-effort so the sample runs even before the M1.E transport PR (#98) lands. * samples/instrument/langgraph/README.md — usage, topology, expected output, "going to production" steps. * docs/adapters/frameworks-langgraph.md — clarified backward-compat section with the new DeprecationWarning behaviour and added a Pydantic compatibility section. Source: ateam/stratix/sdk/python/adapters/langgraph/ (~2,248 LOC, 7 files). Reference template: layerlens/instrument/adapters/frameworks/langchain/. Acceptance: * uv run pytest tests/instrument/adapters/frameworks/test_langgraph.py --> 29 passed * uv run mypy --strict src/layerlens/instrument/adapters/frameworks/langgraph --> Success: no issues found in 7 source files * uv run ruff check src/.../langgraph tests/.../test_langgraph.py --> All checks passed * test_lazy_imports + test_default_install guards still green * python samples/instrument/langgraph/main.py exits 0 with the 3-node graph result printed and event counts surfaced. Stacks on: feat/instrument-frameworks-orchestration (PR #96). --- docs/adapters/frameworks-langgraph.md | 16 +- samples/instrument/langgraph/README.md | 72 +++ samples/instrument/langgraph/main.py | 184 ++++-- .../adapters/frameworks/langgraph/__init__.py | 40 +- .../frameworks/langgraph/lifecycle.py | 6 + .../adapters/frameworks/test_langgraph.py | 534 ++++++++++++++++++ 6 files changed, 814 insertions(+), 38 deletions(-) create mode 100644 samples/instrument/langgraph/README.md create mode 100644 tests/instrument/adapters/frameworks/test_langgraph.py diff --git a/docs/adapters/frameworks-langgraph.md b/docs/adapters/frameworks-langgraph.md index c46fc9d3..7814bdc5 100644 --- a/docs/adapters/frameworks-langgraph.md +++ b/docs/adapters/frameworks-langgraph.md @@ -110,5 +110,17 @@ For platform-managed BYOK see `docs/adapters/byok.md` (atlas-app M1.B). from layerlens.instrument.adapters.frameworks.langgraph import STRATIXLangGraphAdapter ``` -`STRATIXLangGraphAdapter` is an alias for `LayerLensLangGraphAdapter` and will -be removed in v2.0. +`STRATIXLangGraphAdapter` is a deprecated alias for `LayerLensLangGraphAdapter`. +Accessing the alias raises a `DeprecationWarning` (visible under +`pytest -W error::DeprecationWarning`) and the symbol will be removed in +LayerLens v2.0. Update imports to `LayerLensLangGraphAdapter`. + +## Pydantic compatibility + +LangGraph >= 0.2 inherits `langchain-core`'s **Pydantic v2-only** requirement. +The adapter declares `requires_pydantic = PydanticCompat.V2_ONLY` on both the +class attribute and the `info()` / `get_adapter_info()` manifest payload, so +the catalog UI can warn users before they pin an incompatible runtime. +Importing `layerlens.instrument.adapters.frameworks.langgraph` under Pydantic +v1 raises a clear `RuntimeError` rather than letting LangChain raise an +opaque `ImportError` mid-callback. diff --git a/samples/instrument/langgraph/README.md b/samples/instrument/langgraph/README.md new file mode 100644 index 00000000..38819304 --- /dev/null +++ b/samples/instrument/langgraph/README.md @@ -0,0 +1,72 @@ +# LayerLens + LangGraph sample + +A self-contained 3-node LangGraph state machine wrapped with +`LayerLensLangGraphAdapter`. Designed as a smoke test for the adapter +plumbing and as a worked example of multi-agent tracing — no API keys, no +network round-trips, no external services required. + +## What it shows + +- **`adapter.wrap_graph(compiled)`** — proxies `invoke` / `ainvoke` and + emits `environment.config` + `agent.input` + `agent.output` per execution. +- **`agent.state.change`** — emitted whenever the canonical state hash + changes between snapshots (each of the three nodes mutates state, so + this fires once per run). +- **`HandoffDetector`** — attached to the adapter so node transitions + involving an `agent` slot in state emit `agent.handoff`. The standard + pattern for supervisor / multi-agent graphs. +- **`wrap_llm_for_langgraph(llm, adapter=...)`** — wraps a chat model so + each invocation also emits `model.invoke` (typed event with token + usage, model name, provider, latency). +- **Capture configuration** — uses `CaptureConfig.standard()` (L1, L3, + L4a, L5a, L6 enabled; raw payloads dropped). Swap for + `CaptureConfig.full()` to keep prompt / response content in the + emitted events. + +## Topology + +``` + planner -> researcher -> writer -> END +``` + +Each node is a pure Python function over the dict-typed graph state. The +`writer` node calls a `MockLLM.invoke(...)` so the sample is offline. + +## Run + +```bash +pip install 'layerlens[langgraph]' +cd samples/instrument/langgraph +python main.py +``` + +Expected tail of output: + +``` +Graph result: {'topic': 'agent observability', 'plan': [...], 'agent': 'writer', + 'research': 'facts about agent observability', + 'summary': 'Drafted summary using cached research.'} +Events emitted: {'agent.input': 1, 'agent.output': 1, 'agent.state.change': 1, + 'environment.config': 1, 'model.invoke': 1} +``` + +(Counts may include `agent.handoff` if a `HandoffDetector` is configured +to track the same agent slot the nodes write to.) + +## Going to production + +Three steps to turn this sample into a real instrumented agent: + +1. **Swap the LLM.** Replace `MockLLM()` with `ChatOpenAI(model=...)` + (or any provider) and pass it through `wrap_llm_for_langgraph(llm, + adapter=adapter)` so model calls emit typed `model.invoke` events. +2. **Ship telemetry.** Install the transport extra + (`pip install 'layerlens[transport-http]'`) and the sample will pick + up `HttpEventSink` automatically — events will POST to + `LAYERLENS_STRATIX_BASE_URL/telemetry/spans` using + `LAYERLENS_STRATIX_API_KEY`. +3. **Tighten capture.** Switch to `CaptureConfig.full()` if your + workspace allows raw prompt / response capture, or define a custom + `CaptureConfig` to gate specific event layers. + +See `docs/adapters/frameworks-langgraph.md` for the full reference. diff --git a/samples/instrument/langgraph/main.py b/samples/instrument/langgraph/main.py index 5bf1a4c8..a975fd40 100644 --- a/samples/instrument/langgraph/main.py +++ b/samples/instrument/langgraph/main.py @@ -1,20 +1,30 @@ -"""Sample: instrument a LangGraph state machine with LayerLens. +"""Sample: instrument a 3-node LangGraph state machine with LayerLens. -Builds a tiny one-node ``StateGraph``, wraps it with +Builds a ``planner -> researcher -> writer`` ``StateGraph``, wraps it with ``LayerLensLangGraphAdapter.wrap_graph``, and invokes it. The adapter emits -``environment.config`` + ``agent.input`` + ``agent.output`` (and -``agent.state.change`` if the state changed) which ship to atlas-app via -``HttpEventSink``. +``environment.config`` + ``agent.input`` + ``agent.output`` per execution +plus an ``agent.state.change`` for every node that mutates state. -The sample does not require an LLM provider — the node is a pure Python -function — so no API key is needed. This keeps the sample fast, free, and -network-independent so it can be used as a smoke test for the adapter -plumbing itself. +The LLM call inside the ``writer`` node is mocked — the sample is designed +to be self-contained, network-free, and runnable as a smoke test for the +adapter plumbing itself. To plug in a real provider, swap the +``MockLLM.invoke(...)`` call for ``ChatOpenAI(...).invoke(...)`` (with +``OPENAI_API_KEY`` set) and instrument it through ``wrap_llm_for_langgraph`` +so the call also emits ``model.invoke`` events. + +A ``HandoffDetector`` is attached to the adapter so each node transition +emits an ``agent.handoff`` event — the standard signal for multi-agent +LangGraph topologies. Run:: pip install 'layerlens[langgraph]' - python -m samples.instrument.langgraph.main + cd samples/instrument/langgraph && python main.py + +If the optional ``layerlens.instrument.transport.sink_http`` extra is +installed, telemetry ships to atlas-app over HTTP. Otherwise the sample +falls back to the in-memory event buffer that every adapter maintains for +local inspection / replay. """ from __future__ import annotations @@ -23,13 +33,102 @@ from typing import Any from layerlens.instrument.adapters._base import CaptureConfig -from layerlens.instrument.transport.sink_http import HttpEventSink -from layerlens.instrument.adapters.frameworks.langgraph import LayerLensLangGraphAdapter +from layerlens.instrument.adapters.frameworks.langgraph import ( + HandoffDetector, + LayerLensLangGraphAdapter, + wrap_llm_for_langgraph, +) + + +# --------------------------------------------------------------------------- +# Mock LLM — keeps the sample free of network / API keys. +# --------------------------------------------------------------------------- + + +class _MockLLMResponse: + def __init__(self, content: str) -> None: + self.content = content + self.type = "ai" + self.usage_metadata = {"input_tokens": 12, "output_tokens": 8} + + +class MockLLM: + """Tiny stand-in for a chat model. Real samples should pass a + ``langchain_openai.ChatOpenAI`` (or any provider with the same + ``invoke(messages) -> response`` shape) instead. + """ + + model_name = "mock-llm-1" + + def invoke(self, messages: Any, **kwargs: Any) -> _MockLLMResponse: + del messages, kwargs + return _MockLLMResponse("Drafted summary using cached research.") + + +# --------------------------------------------------------------------------- +# Three-node graph: planner -> researcher -> writer. +# --------------------------------------------------------------------------- + + +def _build_compiled_graph() -> Any: + """Construct a 3-node ``StateGraph`` and return the compiled form. + + Imported lazily so the file can still be imported as a module on a + machine that does not have the ``langgraph`` extra installed. + """ + from langgraph.graph import END, StateGraph + + def planner(state: dict[str, Any]) -> dict[str, Any]: + topic = state.get("topic", "untitled") + return {**state, "plan": [f"research:{topic}", f"write:{topic}"], "agent": "planner"} + + def researcher(state: dict[str, Any]) -> dict[str, Any]: + topic = state.get("topic", "untitled") + return {**state, "research": f"facts about {topic}", "agent": "researcher"} + + def writer(state: dict[str, Any], llm: MockLLM | None = None) -> dict[str, Any]: + # Real samples would call ``llm.invoke(...)`` with real prompts; we + # call the mock so the sample stays network-independent. + used_llm = llm or MockLLM() + response = used_llm.invoke( + [{"role": "user", "content": f"Summarize: {state.get('research')}"}], + ) + return {**state, "summary": response.content, "agent": "writer"} + + graph: StateGraph = StateGraph(dict) + graph.add_node("planner", planner) + graph.add_node("researcher", researcher) + graph.add_node("writer", writer) + graph.set_entry_point("planner") + graph.add_edge("planner", "researcher") + graph.add_edge("researcher", "writer") + graph.add_edge("writer", END) + return graph.compile() + + +# --------------------------------------------------------------------------- +# Optional HTTP sink — the M1.E transport PR ships ``HttpEventSink``. Until +# that lands the sample falls back to the in-memory event buffer that every +# adapter maintains. +# --------------------------------------------------------------------------- + + +def _build_sink() -> Any | None: + try: + from layerlens.instrument.transport.sink_http import HttpEventSink # type: ignore[import-not-found,unused-ignore] + except ImportError: + return None + return HttpEventSink( + adapter_name="langgraph", + path="/telemetry/spans", + max_batch=10, + flush_interval_s=1.0, + ) def main() -> int: try: - from langgraph.graph import END, StateGraph + compiled = _build_compiled_graph() except ImportError: print( "langgraph not installed. Install with:\n" @@ -38,35 +137,52 @@ def main() -> int: ) return 2 - sink = HttpEventSink( - adapter_name="langgraph", - path="/telemetry/spans", - max_batch=10, - flush_interval_s=1.0, - ) + # HandoffDetector tracks the ``agent`` slot in state and emits an + # ``agent.handoff`` event each time it changes — the standard pattern + # for multi-agent LangGraph topologies. + detector = HandoffDetector() + detector.register_agents("planner", "researcher", "writer") - adapter = LayerLensLangGraphAdapter(capture_config=CaptureConfig.standard()) - adapter.add_sink(sink) + adapter = LayerLensLangGraphAdapter( + capture_config=CaptureConfig.standard(), + handoff_detector=detector, + ) + sink = _build_sink() + if sink is not None: + adapter.add_sink(sink) adapter.connect() - def greet(state: dict[str, Any]) -> dict[str, Any]: - return {"messages": ["hi"], "count": state.get("count", 0) + 1} - - graph: StateGraph = StateGraph(dict) - graph.add_node("greet", greet) - graph.set_entry_point("greet") - graph.add_edge("greet", END) - compiled = graph.compile() + # Demonstrate the wrapped LLM helper too — even though the writer node + # constructs its own MockLLM internally, this shows the recommended + # production pattern: pass a ``TracedLLM`` so each model call emits + # ``model.invoke`` events alongside the graph events. + traced_llm = wrap_llm_for_langgraph(MockLLM(), adapter=adapter) + _ = traced_llm.invoke([{"role": "user", "content": "warm-up"}]) try: - traced = adapter.wrap_graph(compiled) - result = traced.invoke({"count": 0}) - print(f"Result: {result}") + traced_graph = adapter.wrap_graph(compiled) + result = traced_graph.invoke({"topic": "agent observability"}) + print(f"Graph result: {result}") + + # The adapter buffers every emitted event in ``_trace_events`` for + # local inspection / replay. Surface a quick summary so the sample + # is also useful as an offline smoke test. + events_by_type: dict[str, int] = {} + for evt in adapter._trace_events: # type: ignore[attr-defined] + events_by_type[evt["event_type"]] = events_by_type.get(evt["event_type"], 0) + 1 + print(f"Events emitted: {dict(sorted(events_by_type.items()))}") finally: - sink.close() + if sink is not None: + sink.close() adapter.disconnect() - print("Telemetry shipped. Check the LayerLens dashboard adapter health page.") + if sink is None: + print( + "Note: layerlens.instrument.transport.sink_http not installed; " + "events were buffered in-process only.", + ) + else: + print("Telemetry shipped. Check the LayerLens dashboard adapter health page.") return 0 diff --git a/src/layerlens/instrument/adapters/frameworks/langgraph/__init__.py b/src/layerlens/instrument/adapters/frameworks/langgraph/__init__.py index fa2d0c39..5477950c 100644 --- a/src/layerlens/instrument/adapters/frameworks/langgraph/__init__.py +++ b/src/layerlens/instrument/adapters/frameworks/langgraph/__init__.py @@ -54,5 +54,41 @@ def my_tool(state): ] -# Backward-compat aliases for users coming from ateam. -STRATIXLangGraphAdapter = LayerLensLangGraphAdapter # noqa: N816 - backward-compat alias for ateam users +# --------------------------------------------------------------------------- +# Backward-compat aliases (Round-2 deliberation item 23) +# --------------------------------------------------------------------------- +# Users porting from the ``ateam`` reference implementation imported the +# adapter under the ``STRATIX*`` name. We keep the symbol resolvable but +# raise a ``DeprecationWarning`` on access so callers see the v2.0 removal +# notice in their existing test runs without us breaking import-time +# behaviour. The warning fires via PEP 562 module-level ``__getattr__`` so +# it is emitted at *attribute access* time only — a bare module-level +# assignment would warn at every package import even when the alias is +# never referenced. + +import warnings as _warnings +from typing import Any as _Any + +_DEPRECATED_ALIASES: dict[str, tuple[str, _Any]] = { + "STRATIXLangGraphAdapter": ("LayerLensLangGraphAdapter", LayerLensLangGraphAdapter), +} + + +def __getattr__(name: str) -> _Any: + """Resolve deprecated ``STRATIX*`` aliases with a ``DeprecationWarning``. + + PEP 562 module-level ``__getattr__`` is invoked for attributes not + found via normal lookup. Aliases are kept out of ``__all__`` and out + of static binding so star-imports do not pull them and so the warning + fires only when a caller explicitly references the old name. + """ + if name in _DEPRECATED_ALIASES: + new_name, target = _DEPRECATED_ALIASES[name] + _warnings.warn( + f"`{name}` is deprecated and will be removed in layerlens v2.0; " + f"use `{new_name}` instead.", + DeprecationWarning, + stacklevel=2, + ) + return target + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/layerlens/instrument/adapters/frameworks/langgraph/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/langgraph/lifecycle.py index e0fbb915..3c42c730 100644 --- a/src/layerlens/instrument/adapters/frameworks/langgraph/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/langgraph/lifecycle.py @@ -158,6 +158,11 @@ def health_check(self) -> AdapterHealth: ) def get_adapter_info(self) -> AdapterInfo: + # Round-2 deliberation item 20: surface the v2-only requirement on + # both ``info()`` (the manifest wrapper that re-applies the class + # attribute) and ``get_adapter_info()`` (the subclass override) so + # the atlas-app catalog UI shows the correct compat badge regardless + # of which entrypoint the manifest emitter ends up calling. return AdapterInfo( name="LayerLensLangGraphAdapter", version=self.VERSION, @@ -171,6 +176,7 @@ def get_adapter_info(self) -> AdapterInfo: AdapterCapability.REPLAY, ], description="LayerLens adapter for LangGraph agent framework", + requires_pydantic=PydanticCompat.V2_ONLY, ) def serialize_for_replay(self) -> ReplayableTrace: diff --git a/tests/instrument/adapters/frameworks/test_langgraph.py b/tests/instrument/adapters/frameworks/test_langgraph.py new file mode 100644 index 00000000..81e55e49 --- /dev/null +++ b/tests/instrument/adapters/frameworks/test_langgraph.py @@ -0,0 +1,534 @@ +"""Unit tests for the LangGraph framework adapter. + +Mocked at the SDK shape level — no real LangGraph runtime needed. Each +test exercises one slice of the adapter surface using a duck-typed +``_FakeCompiledGraph`` that mimics ``langgraph.graph.StateGraph.compile()``. + +Coverage: + +* Lifecycle (connect → healthy → disconnect → disconnected). +* ``ADAPTER_CLASS`` registry export. +* ``info()`` / ``get_adapter_info()`` reports ``requires_pydantic = V2_ONLY`` + per Round-2 deliberation item 20. +* ``STRATIXLangGraphAdapter`` alias resolves to ``LayerLensLangGraphAdapter`` + and emits a ``DeprecationWarning`` per Round-2 deliberation item 23. +* ``wrap_graph(...).invoke(...)`` emits ``environment.config`` + + ``agent.input`` + ``agent.output`` (and ``agent.state.change`` when the + state hash changes). +* ``ainvoke(...)`` mirrors ``invoke(...)``. +* Errors during graph execution still emit ``agent.output`` with the + exception captured. +* ``HandoffDetector`` integration — node transitions emit ``agent.handoff``. +* ``trace_node`` / ``trace_langgraph_tool`` decorator emit the right + legacy events when used standalone. +* ``wrap_llm_for_langgraph`` / ``TracedLLM`` emits ``model.invoke``. +* ``serialize_for_replay`` returns a ``ReplayableTrace`` with the + framework string and ``capture_config`` payload. +""" + +from __future__ import annotations + +import asyncio +import warnings +from typing import Any, Dict, List + +import pytest + +from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig +from layerlens.instrument.adapters.frameworks.langgraph import ( + ADAPTER_CLASS, + TracedLLM, + NodeTracer, + HandoffDetector, + LangGraphStateAdapter, + LayerLensLangGraphAdapter, + trace_node, + detect_handoff, + trace_langgraph_tool, + wrap_llm_for_langgraph, +) +from layerlens.instrument.adapters._base.pydantic_compat import PydanticCompat + +# --------------------------------------------------------------------------- +# Helpers — minimal duck-typed LangGraph + STRATIX surfaces. +# --------------------------------------------------------------------------- + + +class _RecordingStratix: + """Duck-typed STRATIX client that captures ``emit(event_type, payload)``.""" + + def __init__(self) -> None: + self.events: List[Dict[str, Any]] = [] + + def emit(self, *args: Any, **kwargs: Any) -> None: + if len(args) == 2 and isinstance(args[0], str): + self.events.append({"event_type": args[0], "payload": args[1]}) + + +class _FakeCompiledGraph: + """Duck-typed ``StateGraph.compile()`` result used in tests. + + Calling ``invoke`` runs an in-process function that returns the new + state. ``ainvoke`` is provided as the async coroutine equivalent. + """ + + def __init__( + self, + name: str, + run: Any, + ) -> None: + self.name = name + self._run = run + + def invoke(self, state: Any, config: Any = None) -> Any: + return self._run(state) + + async def ainvoke(self, state: Any, config: Any = None) -> Any: + return self._run(state) + + +class _FakeLLM: + """Duck-typed LangChain chat model (``invoke`` / ``ainvoke``).""" + + model_name = "gpt-test" + + class _Response: + def __init__(self, content: str) -> None: + self.content = content + self.type = "ai" + self.usage_metadata = {"input_tokens": 4, "output_tokens": 2} + + def invoke(self, messages: Any, **kwargs: Any) -> "_FakeLLM._Response": + del messages, kwargs + return _FakeLLM._Response("hi from fake llm") + + async def ainvoke(self, messages: Any, **kwargs: Any) -> "_FakeLLM._Response": + del messages, kwargs + return _FakeLLM._Response("hi from fake llm async") + + +# --------------------------------------------------------------------------- +# Registry / metadata +# --------------------------------------------------------------------------- + + +def test_adapter_class_export() -> None: + """Registry lazy-loading expects ``ADAPTER_CLASS`` to point at the adapter.""" + assert ADAPTER_CLASS is LayerLensLangGraphAdapter + + +def test_framework_and_version_constants() -> None: + assert LayerLensLangGraphAdapter.FRAMEWORK == "langgraph" + assert LayerLensLangGraphAdapter.VERSION + + +def test_class_attribute_requires_pydantic_v2() -> None: + """Round-2 item 20: every framework adapter must declare its compat.""" + assert LayerLensLangGraphAdapter.requires_pydantic == PydanticCompat.V2_ONLY + + +def test_get_adapter_info_reports_v2_only() -> None: + """Round-2 item 20: catalog manifest reads ``info()`` — must report v2-only.""" + a = LayerLensLangGraphAdapter() + info = a.get_adapter_info() + assert info.framework == "langgraph" + assert info.name == "LayerLensLangGraphAdapter" + assert info.requires_pydantic == PydanticCompat.V2_ONLY + + +def test_info_wrapper_also_reports_v2_only() -> None: + """``info()`` (the BaseAdapter wrapper) must agree with the class attribute.""" + a = LayerLensLangGraphAdapter() + assert a.info().requires_pydantic == PydanticCompat.V2_ONLY + + +# --------------------------------------------------------------------------- +# Lifecycle +# --------------------------------------------------------------------------- + + +def test_lifecycle() -> None: + a = LayerLensLangGraphAdapter() + a.connect() + assert a.is_connected is True + assert a.status == AdapterStatus.HEALTHY + + health = a.health_check() + assert health.framework_name == "langgraph" + assert health.adapter_version == LayerLensLangGraphAdapter.VERSION + + a.disconnect() + assert a.is_connected is False + assert a.status == AdapterStatus.DISCONNECTED + + +def test_constructor_accepts_capture_config() -> None: + cfg = CaptureConfig.standard() + a = LayerLensLangGraphAdapter(capture_config=cfg) + assert a.capture_config is cfg + + +def test_constructor_legacy_kwargs_map_to_capture_config() -> None: + """Legacy ``emit_environment_config`` / ``emit_agent_code`` flags are honoured.""" + a = LayerLensLangGraphAdapter( + stratix_instance=_RecordingStratix(), + emit_environment_config=False, + emit_agent_code=True, + ) + assert a.capture_config.l4a_environment_config is False + assert a.capture_config.l2_agent_code is True + + +# --------------------------------------------------------------------------- +# Backward-compat alias (Round-2 item 23) +# --------------------------------------------------------------------------- + + +def test_stratix_alias_resolves_to_layerlens_class() -> None: + """The ``STRATIX*`` name still resolves for existing ateam users.""" + import layerlens.instrument.adapters.frameworks.langgraph as mod + + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + cls = mod.STRATIXLangGraphAdapter + assert cls is LayerLensLangGraphAdapter + + +def test_stratix_alias_emits_deprecation_warning() -> None: + """Round-2 item 23: alias access must raise ``DeprecationWarning``.""" + import layerlens.instrument.adapters.frameworks.langgraph as mod + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + _ = mod.STRATIXLangGraphAdapter + deprecation_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)] + assert len(deprecation_warnings) == 1 + msg = str(deprecation_warnings[0].message) + assert "STRATIXLangGraphAdapter" in msg + assert "LayerLensLangGraphAdapter" in msg + assert "v2.0" in msg + + +def test_unknown_attribute_still_raises_attribute_error() -> None: + """``__getattr__`` must NOT silently swallow unknown names.""" + import layerlens.instrument.adapters.frameworks.langgraph as mod + + with pytest.raises(AttributeError): + _ = mod.NoSuchSymbol # type: ignore[attr-defined] + + +# --------------------------------------------------------------------------- +# Graph wrapping → event emission +# --------------------------------------------------------------------------- + + +def test_wrap_graph_invoke_emits_lifecycle_events() -> None: + """Wrapping a compiled graph emits config + input + output events.""" + stratix = _RecordingStratix() + adapter = LayerLensLangGraphAdapter( + stratix_instance=stratix, + capture_config=CaptureConfig.full(), + ) + adapter.connect() + + graph = _FakeCompiledGraph( + name="test-graph", + run=lambda s: {**s, "result": "done"}, + ) + traced = adapter.wrap_graph(graph) + out = traced.invoke({"count": 0}) + + assert out == {"count": 0, "result": "done"} + types = [e["event_type"] for e in stratix.events] + assert "environment.config" in types + assert "agent.input" in types + assert "agent.output" in types + # State changed (added "result"), so a state-change event must fire. + assert "agent.state.change" in types + + +def test_wrap_graph_emits_state_change_only_when_hash_changes() -> None: + """Identity-mapping nodes should NOT emit ``agent.state.change``.""" + stratix = _RecordingStratix() + adapter = LayerLensLangGraphAdapter( + stratix_instance=stratix, + capture_config=CaptureConfig.full(), + ) + adapter.connect() + + graph = _FakeCompiledGraph(name="passthrough", run=lambda s: s) + traced = adapter.wrap_graph(graph) + traced.invoke({"k": "v"}) + + types = [e["event_type"] for e in stratix.events] + assert "agent.state.change" not in types + + +def test_wrap_graph_invoke_failure_emits_output_with_error() -> None: + """An exception inside the graph still emits ``agent.output`` with the error.""" + + def _boom(_: Any) -> Any: + raise RuntimeError("graph blew up") + + stratix = _RecordingStratix() + adapter = LayerLensLangGraphAdapter( + stratix_instance=stratix, + capture_config=CaptureConfig.full(), + ) + adapter.connect() + + graph = _FakeCompiledGraph(name="failing", run=_boom) + traced = adapter.wrap_graph(graph) + + with pytest.raises(RuntimeError): + traced.invoke({"x": 1}) + + out = next(e for e in stratix.events if e["event_type"] == "agent.output") + assert out["payload"]["error"] == "graph blew up" + + +def test_wrap_graph_ainvoke_emits_lifecycle_events() -> None: + """Async graph invocation emits the same lifecycle events as ``invoke``.""" + stratix = _RecordingStratix() + adapter = LayerLensLangGraphAdapter( + stratix_instance=stratix, + capture_config=CaptureConfig.full(), + ) + adapter.connect() + + graph = _FakeCompiledGraph( + name="async-graph", + run=lambda s: {**s, "answer": 42}, + ) + traced = adapter.wrap_graph(graph) + out = asyncio.run(traced.ainvoke({"q": "what?"})) + + assert out == {"q": "what?", "answer": 42} + types = [e["event_type"] for e in stratix.events] + assert "agent.input" in types + assert "agent.output" in types + + +def test_traced_graph_proxies_unknown_attributes_to_underlying_graph() -> None: + """Unknown attribute access on the wrapper falls through to the graph.""" + adapter = LayerLensLangGraphAdapter() + adapter.connect() + + graph = _FakeCompiledGraph(name="proxy", run=lambda s: s) + graph.custom_attr = "hello" # type: ignore[attr-defined] + traced = adapter.wrap_graph(graph) + assert traced.custom_attr == "hello" + + +# --------------------------------------------------------------------------- +# Handoff detection +# --------------------------------------------------------------------------- + + +def test_handoff_detector_emits_handoff_on_agent_change() -> None: + """Routing from one agent to another emits a single handoff event.""" + stratix = _RecordingStratix() + detector = HandoffDetector(stratix_instance=stratix) + detector.register_agents("researcher", "writer") + detector.set_current_agent("researcher") + + handoff = detector.detect_handoff("writer", state={"task": "summarize"}) + assert handoff is not None + assert handoff.from_agent == "researcher" + assert handoff.to_agent == "writer" + + handoffs = [e for e in stratix.events if e["event_type"] == "agent.handoff"] + assert len(handoffs) == 1 + assert handoffs[0]["payload"]["from_agent"] == "researcher" + assert handoffs[0]["payload"]["to_agent"] == "writer" + + +def test_handoff_detector_returns_none_on_same_agent() -> None: + """No event when staying with the same agent.""" + stratix = _RecordingStratix() + detector = HandoffDetector(stratix_instance=stratix) + detector.set_current_agent("a") + assert detector.detect_handoff("a") is None + assert all(e["event_type"] != "agent.handoff" for e in stratix.events) + + +def test_detect_handoff_helper_emits_when_agents_differ() -> None: + stratix = _RecordingStratix() + h = detect_handoff( + from_agent="a", + to_agent="b", + stratix_instance=stratix, + reason="testing", + ) + assert h is not None + handoffs = [e for e in stratix.events if e["event_type"] == "agent.handoff"] + assert len(handoffs) == 1 + assert handoffs[0]["payload"]["reason"] == "testing" + + +def test_adapter_with_handoff_detector_emits_on_node_transition() -> None: + """When attached, the detector fires on every ``on_node_start``.""" + stratix = _RecordingStratix() + detector = HandoffDetector(stratix_instance=stratix) + detector.register_agents("planner", "executor") + detector.set_current_agent("planner") + + adapter = LayerLensLangGraphAdapter( + stratix_instance=stratix, + capture_config=CaptureConfig.full(), + handoff_detector=detector, + ) + adapter.connect() + + execution = adapter.on_graph_start( + graph_id="g1", + execution_id="g1:1", + initial_state={"step": 0}, + ) + # First node: same as current agent, no handoff. + adapter.on_node_start(execution, "planner", {"step": 1}) + # Second node: triggers handoff. + adapter.on_node_start(execution, "executor", {"step": 2}) + + handoffs = [e for e in stratix.events if e["event_type"] == "agent.handoff"] + assert len(handoffs) == 1 + assert handoffs[0]["payload"]["to_agent"] == "executor" + + +# --------------------------------------------------------------------------- +# Standalone decorators +# --------------------------------------------------------------------------- + + +def test_trace_node_decorator_emits_state_change() -> None: + """``@trace_node(stratix)`` decorates a function and emits state change.""" + stratix = _RecordingStratix() + + @trace_node(stratix_instance=stratix) + def my_node(state: dict[str, Any]) -> dict[str, Any]: + return {**state, "touched": True} + + out = my_node({"x": 1}) + assert out == {"x": 1, "touched": True} + + state_changes = [e for e in stratix.events if e["event_type"] == "agent.state.change"] + assert len(state_changes) == 1 + assert state_changes[0]["payload"]["node_name"] == "my_node" + + +def test_node_tracer_no_state_change_when_state_unchanged() -> None: + """Identity nodes should not emit ``agent.state.change``.""" + stratix = _RecordingStratix() + tracer = NodeTracer(stratix_instance=stratix) + + @tracer.decorate + def passthrough(state: dict[str, Any]) -> dict[str, Any]: + return state + + passthrough({"k": "v"}) + assert all(e["event_type"] != "agent.state.change" for e in stratix.events) + + +def test_trace_langgraph_tool_decorator_emits_tool_call() -> None: + """``@trace_langgraph_tool`` emits ``tool.call`` per invocation.""" + stratix = _RecordingStratix() + + @trace_langgraph_tool(stratix_instance=stratix) + def search(query: str) -> str: + return f"results for {query}" + + out = search("python") + assert out == "results for python" + + tool_calls = [e for e in stratix.events if e["event_type"] == "tool.call"] + assert len(tool_calls) == 1 + assert tool_calls[0]["payload"]["tool_name"] == "search" + assert tool_calls[0]["payload"]["error"] is None + + +def test_trace_langgraph_tool_captures_exception() -> None: + stratix = _RecordingStratix() + + @trace_langgraph_tool(stratix_instance=stratix) + def failing(query: str) -> str: + raise ValueError(f"bad query: {query}") + + with pytest.raises(ValueError): + failing("x") + + tool_calls = [e for e in stratix.events if e["event_type"] == "tool.call"] + assert len(tool_calls) == 1 + assert "bad query" in tool_calls[0]["payload"]["error"] + + +# --------------------------------------------------------------------------- +# LLM wrapping +# --------------------------------------------------------------------------- + + +def test_wrap_llm_for_langgraph_emits_model_invoke() -> None: + """``TracedLLM`` emits ``model.invoke`` for each invocation.""" + stratix = _RecordingStratix() + llm = _FakeLLM() + traced = wrap_llm_for_langgraph(llm, stratix_instance=stratix) + assert isinstance(traced, TracedLLM) + + response = traced.invoke([{"role": "user", "content": "hi"}]) + assert response.content == "hi from fake llm" + + invokes = [e for e in stratix.events if e["event_type"] == "model.invoke"] + assert len(invokes) == 1 + payload = invokes[0]["payload"] + assert payload["model"] == "gpt-test" + assert payload["provider"] == "openai" or payload["provider"] == "unknown" + assert payload["error"] is None + + +def test_traced_llm_ainvoke_emits_model_invoke() -> None: + stratix = _RecordingStratix() + traced = TracedLLM(_FakeLLM(), stratix_instance=stratix) + + response = asyncio.run(traced.ainvoke([{"role": "user", "content": "hi"}])) + assert response.content == "hi from fake llm async" + + invokes = [e for e in stratix.events if e["event_type"] == "model.invoke"] + assert len(invokes) == 1 + + +# --------------------------------------------------------------------------- +# State adapter (sanity) +# --------------------------------------------------------------------------- + + +def test_state_adapter_get_hash_stable_across_calls() -> None: + sa = LangGraphStateAdapter() + h1 = sa.get_hash({"a": 1, "b": [1, 2, 3]}) + h2 = sa.get_hash({"b": [1, 2, 3], "a": 1}) + assert h1 == h2 # canonical (sort_keys) JSON + + +def test_state_adapter_diff_reports_changes() -> None: + sa = LangGraphStateAdapter() + before = sa.snapshot({"x": 1, "y": 2}) + after = sa.snapshot({"x": 1, "z": 3}) + diff = sa.diff(before, after) + assert "z" in diff["added"] + assert "y" in diff["removed"] + + +# --------------------------------------------------------------------------- +# Replay serialization +# --------------------------------------------------------------------------- + + +def test_serialize_for_replay_returns_replayable_trace() -> None: + adapter = LayerLensLangGraphAdapter( + stratix_instance=_RecordingStratix(), + capture_config=CaptureConfig.full(), + ) + adapter.connect() + + rt = adapter.serialize_for_replay() + assert rt.framework == "langgraph" + assert rt.adapter_name == "LayerLensLangGraphAdapter" + assert "capture_config" in rt.config From 7f8b3af52908a90c65e7d284390d738a72852836 Mon Sep 17 00:00:00 2001 From: mmercuri Date: Sun, 10 May 2026 08:39:08 -0700 Subject: [PATCH 2/2] fix(instrument): declare STREAMING capability for langgraph (closes PR #119 deferred) LangGraph wraps llm.stream / llm.astream (see frameworks/langgraph/llm.py) and accumulates per-chunk events in TracedLLM. The atlas-app catalog UI reads AdapterCapability.STREAMING off info() to surface streaming support, but the langgraph adapter only declared REPLAY here. PR #119 (brand leak + capability declarations) wired STREAMING for the six adapters that lived on its branch; langgraph was deferred because it lives on its own source-port branch (PR #100). This closes that deferral per CLAUDE.md item 5/11. Capabilities declared honestly per CLAUDE.md 'no fake claims': REPLAY matches serialize_for_replay (already present); STREAMING matches the wrapped llm.stream / llm.astream entry-points. Tests: added test_declares_replay_and_streaming_capabilities regression asserting both capabilities surface via adapter.info().capabilities. Verification: * uv run python -m pytest tests/instrument/adapters/frameworks/test_langgraph.py -x -> 30 passed --- .../adapters/frameworks/langgraph/lifecycle.py | 1 + .../instrument/adapters/frameworks/test_langgraph.py | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/src/layerlens/instrument/adapters/frameworks/langgraph/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/langgraph/lifecycle.py index 3c42c730..4e469dbb 100644 --- a/src/layerlens/instrument/adapters/frameworks/langgraph/lifecycle.py +++ b/src/layerlens/instrument/adapters/frameworks/langgraph/lifecycle.py @@ -174,6 +174,7 @@ def get_adapter_info(self) -> AdapterInfo: AdapterCapability.TRACE_STATE, AdapterCapability.TRACE_HANDOFFS, AdapterCapability.REPLAY, + AdapterCapability.STREAMING, ], description="LayerLens adapter for LangGraph agent framework", requires_pydantic=PydanticCompat.V2_ONLY, diff --git a/tests/instrument/adapters/frameworks/test_langgraph.py b/tests/instrument/adapters/frameworks/test_langgraph.py index 81e55e49..aaee9988 100644 --- a/tests/instrument/adapters/frameworks/test_langgraph.py +++ b/tests/instrument/adapters/frameworks/test_langgraph.py @@ -35,6 +35,7 @@ import pytest from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig +from layerlens.instrument.adapters._base.adapter import AdapterCapability from layerlens.instrument.adapters.frameworks.langgraph import ( ADAPTER_CLASS, TracedLLM, @@ -142,6 +143,16 @@ def test_info_wrapper_also_reports_v2_only() -> None: assert a.info().requires_pydantic == PydanticCompat.V2_ONLY +def test_declares_replay_and_streaming_capabilities() -> None: + """Catalog UI relies on declared capabilities; LangGraph wraps both + ``serialize_for_replay`` and a streaming entry-point (``llm.stream`` / + ``llm.astream``), so REPLAY and STREAMING must both be advertised.""" + a = LayerLensLangGraphAdapter() + caps = a.info().capabilities + assert AdapterCapability.REPLAY in caps + assert AdapterCapability.STREAMING in caps + + # --------------------------------------------------------------------------- # Lifecycle # ---------------------------------------------------------------------------