Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions docs/adapters/frameworks-langgraph.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,17 @@ For platform-managed BYOK see `docs/adapters/byok.md` (atlas-app M1.B).
from layerlens.instrument.adapters.frameworks.langgraph import STRATIXLangGraphAdapter
```

`STRATIXLangGraphAdapter` is an alias for `LayerLensLangGraphAdapter` and will
be removed in v2.0.
`STRATIXLangGraphAdapter` is a deprecated alias for `LayerLensLangGraphAdapter`.
Accessing the alias raises a `DeprecationWarning` (visible under
`pytest -W error::DeprecationWarning`) and the symbol will be removed in
LayerLens v2.0. Update imports to `LayerLensLangGraphAdapter`.

## Pydantic compatibility

LangGraph >= 0.2 inherits `langchain-core`'s **Pydantic v2-only** requirement.
The adapter declares `requires_pydantic = PydanticCompat.V2_ONLY` on both the
class attribute and the `info()` / `get_adapter_info()` manifest payload, so
the catalog UI can warn users before they pin an incompatible runtime.
Importing `layerlens.instrument.adapters.frameworks.langgraph` under Pydantic
v1 raises a clear `RuntimeError` rather than letting LangChain raise an
opaque `ImportError` mid-callback.
72 changes: 72 additions & 0 deletions samples/instrument/langgraph/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# LayerLens + LangGraph sample

A self-contained 3-node LangGraph state machine wrapped with
`LayerLensLangGraphAdapter`. Designed as a smoke test for the adapter
plumbing and as a worked example of multi-agent tracing — no API keys, no
network round-trips, no external services required.

## What it shows

- **`adapter.wrap_graph(compiled)`** — proxies `invoke` / `ainvoke` and
emits `environment.config` + `agent.input` + `agent.output` per execution.
- **`agent.state.change`** — emitted whenever the canonical state hash
changes between snapshots (each of the three nodes mutates state, so
this fires once per run).
- **`HandoffDetector`** — attached to the adapter so node transitions
involving an `agent` slot in state emit `agent.handoff`. The standard
pattern for supervisor / multi-agent graphs.
- **`wrap_llm_for_langgraph(llm, adapter=...)`** — wraps a chat model so
each invocation also emits `model.invoke` (typed event with token
usage, model name, provider, latency).
- **Capture configuration** — uses `CaptureConfig.standard()` (L1, L3,
L4a, L5a, L6 enabled; raw payloads dropped). Swap for
`CaptureConfig.full()` to keep prompt / response content in the
emitted events.

## Topology

```
planner -> researcher -> writer -> END
```

Each node is a pure Python function over the dict-typed graph state. The
`writer` node calls a `MockLLM.invoke(...)` so the sample is offline.

## Run

```bash
pip install 'layerlens[langgraph]'
cd samples/instrument/langgraph
python main.py
```

Expected tail of output:

```
Graph result: {'topic': 'agent observability', 'plan': [...], 'agent': 'writer',
'research': 'facts about agent observability',
'summary': 'Drafted summary using cached research.'}
Events emitted: {'agent.input': 1, 'agent.output': 1, 'agent.state.change': 1,
'environment.config': 1, 'model.invoke': 1}
```

(Counts may include `agent.handoff` if a `HandoffDetector` is configured
to track the same agent slot the nodes write to.)

## Going to production

Three steps to turn this sample into a real instrumented agent:

1. **Swap the LLM.** Replace `MockLLM()` with `ChatOpenAI(model=...)`
(or any provider) and pass it through `wrap_llm_for_langgraph(llm,
adapter=adapter)` so model calls emit typed `model.invoke` events.
2. **Ship telemetry.** Install the transport extra
(`pip install 'layerlens[transport-http]'`) and the sample will pick
up `HttpEventSink` automatically — events will POST to
`LAYERLENS_STRATIX_BASE_URL/telemetry/spans` using
`LAYERLENS_STRATIX_API_KEY`.
3. **Tighten capture.** Switch to `CaptureConfig.full()` if your
workspace allows raw prompt / response capture, or define a custom
`CaptureConfig` to gate specific event layers.

See `docs/adapters/frameworks-langgraph.md` for the full reference.
184 changes: 150 additions & 34 deletions samples/instrument/langgraph/main.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
"""Sample: instrument a LangGraph state machine with LayerLens.
"""Sample: instrument a 3-node LangGraph state machine with LayerLens.

Builds a tiny one-node ``StateGraph``, wraps it with
Builds a ``planner -> researcher -> writer`` ``StateGraph``, wraps it with
``LayerLensLangGraphAdapter.wrap_graph``, and invokes it. The adapter emits
``environment.config`` + ``agent.input`` + ``agent.output`` (and
``agent.state.change`` if the state changed) which ship to atlas-app via
``HttpEventSink``.
``environment.config`` + ``agent.input`` + ``agent.output`` per execution
plus an ``agent.state.change`` for every node that mutates state.

The sample does not require an LLM provider — the node is a pure Python
function — so no API key is needed. This keeps the sample fast, free, and
network-independent so it can be used as a smoke test for the adapter
plumbing itself.
The LLM call inside the ``writer`` node is mocked — the sample is designed
to be self-contained, network-free, and runnable as a smoke test for the
adapter plumbing itself. To plug in a real provider, swap the
``MockLLM.invoke(...)`` call for ``ChatOpenAI(...).invoke(...)`` (with
``OPENAI_API_KEY`` set) and instrument it through ``wrap_llm_for_langgraph``
so the call also emits ``model.invoke`` events.

A ``HandoffDetector`` is attached to the adapter so each node transition
emits an ``agent.handoff`` event — the standard signal for multi-agent
LangGraph topologies.

Run::

pip install 'layerlens[langgraph]'
python -m samples.instrument.langgraph.main
cd samples/instrument/langgraph && python main.py

If the optional ``layerlens.instrument.transport.sink_http`` extra is
installed, telemetry ships to atlas-app over HTTP. Otherwise the sample
falls back to the in-memory event buffer that every adapter maintains for
local inspection / replay.
"""

from __future__ import annotations
Expand All @@ -23,13 +33,102 @@
from typing import Any

from layerlens.instrument.adapters._base import CaptureConfig
from layerlens.instrument.transport.sink_http import HttpEventSink
from layerlens.instrument.adapters.frameworks.langgraph import LayerLensLangGraphAdapter
from layerlens.instrument.adapters.frameworks.langgraph import (
HandoffDetector,
LayerLensLangGraphAdapter,
wrap_llm_for_langgraph,
)


# ---------------------------------------------------------------------------
# Mock LLM — keeps the sample free of network / API keys.
# ---------------------------------------------------------------------------


class _MockLLMResponse:
def __init__(self, content: str) -> None:
self.content = content
self.type = "ai"
self.usage_metadata = {"input_tokens": 12, "output_tokens": 8}


class MockLLM:
"""Tiny stand-in for a chat model. Real samples should pass a
``langchain_openai.ChatOpenAI`` (or any provider with the same
``invoke(messages) -> response`` shape) instead.
"""

model_name = "mock-llm-1"

def invoke(self, messages: Any, **kwargs: Any) -> _MockLLMResponse:
del messages, kwargs
return _MockLLMResponse("Drafted summary using cached research.")


# ---------------------------------------------------------------------------
# Three-node graph: planner -> researcher -> writer.
# ---------------------------------------------------------------------------


def _build_compiled_graph() -> Any:
"""Construct a 3-node ``StateGraph`` and return the compiled form.

Imported lazily so the file can still be imported as a module on a
machine that does not have the ``langgraph`` extra installed.
"""
from langgraph.graph import END, StateGraph

def planner(state: dict[str, Any]) -> dict[str, Any]:
topic = state.get("topic", "untitled")
return {**state, "plan": [f"research:{topic}", f"write:{topic}"], "agent": "planner"}

def researcher(state: dict[str, Any]) -> dict[str, Any]:
topic = state.get("topic", "untitled")
return {**state, "research": f"facts about {topic}", "agent": "researcher"}

def writer(state: dict[str, Any], llm: MockLLM | None = None) -> dict[str, Any]:
# Real samples would call ``llm.invoke(...)`` with real prompts; we
# call the mock so the sample stays network-independent.
used_llm = llm or MockLLM()
response = used_llm.invoke(
[{"role": "user", "content": f"Summarize: {state.get('research')}"}],
)
return {**state, "summary": response.content, "agent": "writer"}

graph: StateGraph = StateGraph(dict)
graph.add_node("planner", planner)
graph.add_node("researcher", researcher)
graph.add_node("writer", writer)
graph.set_entry_point("planner")
graph.add_edge("planner", "researcher")
graph.add_edge("researcher", "writer")
graph.add_edge("writer", END)
return graph.compile()


# ---------------------------------------------------------------------------
# Optional HTTP sink — the M1.E transport PR ships ``HttpEventSink``. Until
# that lands the sample falls back to the in-memory event buffer that every
# adapter maintains.
# ---------------------------------------------------------------------------


def _build_sink() -> Any | None:
try:
from layerlens.instrument.transport.sink_http import HttpEventSink # type: ignore[import-not-found,unused-ignore]
except ImportError:
return None
return HttpEventSink(
adapter_name="langgraph",
path="/telemetry/spans",
max_batch=10,
flush_interval_s=1.0,
)


def main() -> int:
try:
from langgraph.graph import END, StateGraph
compiled = _build_compiled_graph()
except ImportError:
print(
"langgraph not installed. Install with:\n"
Expand All @@ -38,35 +137,52 @@ def main() -> int:
)
return 2

sink = HttpEventSink(
adapter_name="langgraph",
path="/telemetry/spans",
max_batch=10,
flush_interval_s=1.0,
)
# HandoffDetector tracks the ``agent`` slot in state and emits an
# ``agent.handoff`` event each time it changes — the standard pattern
# for multi-agent LangGraph topologies.
detector = HandoffDetector()
detector.register_agents("planner", "researcher", "writer")

adapter = LayerLensLangGraphAdapter(capture_config=CaptureConfig.standard())
adapter.add_sink(sink)
adapter = LayerLensLangGraphAdapter(
capture_config=CaptureConfig.standard(),
handoff_detector=detector,
)
sink = _build_sink()
if sink is not None:
adapter.add_sink(sink)
adapter.connect()

def greet(state: dict[str, Any]) -> dict[str, Any]:
return {"messages": ["hi"], "count": state.get("count", 0) + 1}

graph: StateGraph = StateGraph(dict)
graph.add_node("greet", greet)
graph.set_entry_point("greet")
graph.add_edge("greet", END)
compiled = graph.compile()
# Demonstrate the wrapped LLM helper too — even though the writer node
# constructs its own MockLLM internally, this shows the recommended
# production pattern: pass a ``TracedLLM`` so each model call emits
# ``model.invoke`` events alongside the graph events.
traced_llm = wrap_llm_for_langgraph(MockLLM(), adapter=adapter)
_ = traced_llm.invoke([{"role": "user", "content": "warm-up"}])

try:
traced = adapter.wrap_graph(compiled)
result = traced.invoke({"count": 0})
print(f"Result: {result}")
traced_graph = adapter.wrap_graph(compiled)
result = traced_graph.invoke({"topic": "agent observability"})
print(f"Graph result: {result}")

# The adapter buffers every emitted event in ``_trace_events`` for
# local inspection / replay. Surface a quick summary so the sample
# is also useful as an offline smoke test.
events_by_type: dict[str, int] = {}
for evt in adapter._trace_events: # type: ignore[attr-defined]
events_by_type[evt["event_type"]] = events_by_type.get(evt["event_type"], 0) + 1
print(f"Events emitted: {dict(sorted(events_by_type.items()))}")
finally:
sink.close()
if sink is not None:
sink.close()
adapter.disconnect()

print("Telemetry shipped. Check the LayerLens dashboard adapter health page.")
if sink is None:
print(
"Note: layerlens.instrument.transport.sink_http not installed; "
"events were buffered in-process only.",
)
else:
print("Telemetry shipped. Check the LayerLens dashboard adapter health page.")
return 0


Expand Down
40 changes: 38 additions & 2 deletions src/layerlens/instrument/adapters/frameworks/langgraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,41 @@ def my_tool(state):
]


# Backward-compat aliases for users coming from ateam.
STRATIXLangGraphAdapter = LayerLensLangGraphAdapter # noqa: N816 - backward-compat alias for ateam users
# ---------------------------------------------------------------------------
# Backward-compat aliases (Round-2 deliberation item 23)
# ---------------------------------------------------------------------------
# Users porting from the ``ateam`` reference implementation imported the
# adapter under the ``STRATIX*`` name. We keep the symbol resolvable but
# raise a ``DeprecationWarning`` on access so callers see the v2.0 removal
# notice in their existing test runs without us breaking import-time
# behaviour. The warning fires via PEP 562 module-level ``__getattr__`` so
# it is emitted at *attribute access* time only — a bare module-level
# assignment would warn at every package import even when the alias is
# never referenced.

import warnings as _warnings
from typing import Any as _Any

_DEPRECATED_ALIASES: dict[str, tuple[str, _Any]] = {
"STRATIXLangGraphAdapter": ("LayerLensLangGraphAdapter", LayerLensLangGraphAdapter),
}


def __getattr__(name: str) -> _Any:
"""Resolve deprecated ``STRATIX*`` aliases with a ``DeprecationWarning``.

PEP 562 module-level ``__getattr__`` is invoked for attributes not
found via normal lookup. Aliases are kept out of ``__all__`` and out
of static binding so star-imports do not pull them and so the warning
fires only when a caller explicitly references the old name.
"""
if name in _DEPRECATED_ALIASES:
new_name, target = _DEPRECATED_ALIASES[name]
_warnings.warn(
f"`{name}` is deprecated and will be removed in layerlens v2.0; "
f"use `{new_name}` instead.",
DeprecationWarning,
stacklevel=2,
)
return target
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ def health_check(self) -> AdapterHealth:
)

def get_adapter_info(self) -> AdapterInfo:
# Round-2 deliberation item 20: surface the v2-only requirement on
# both ``info()`` (the manifest wrapper that re-applies the class
# attribute) and ``get_adapter_info()`` (the subclass override) so
# the atlas-app catalog UI shows the correct compat badge regardless
# of which entrypoint the manifest emitter ends up calling.
return AdapterInfo(
name="LayerLensLangGraphAdapter",
version=self.VERSION,
Expand All @@ -169,8 +174,10 @@ def get_adapter_info(self) -> AdapterInfo:
AdapterCapability.TRACE_STATE,
AdapterCapability.TRACE_HANDOFFS,
AdapterCapability.REPLAY,
AdapterCapability.STREAMING,
],
description="LayerLens adapter for LangGraph agent framework",
requires_pydantic=PydanticCompat.V2_ONLY,
)

def serialize_for_replay(self) -> ReplayableTrace:
Expand Down
Loading