190 changes: 190 additions & 0 deletions docs/adapters/frameworks-langchain-lcel.md
@@ -0,0 +1,190 @@
# LangChain LCEL Coverage

The LayerLens LangChain adapter instruments **LangChain Expression
Language (LCEL)** pipelines as a first-class observability surface.
This is the dominant authoring pattern in LangChain 0.2+ — pipelines
expressed via the `|` (pipe) operator over `Runnable` instances:

```python
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
```

LCEL composition produces a **tree of runnables** at runtime
(`RunnableSequence`, `RunnableParallel`, `RunnableLambda`,
`RunnablePassthrough`, `RunnableBranch`). The adapter tracks the
entire tree, emits per-runnable events with composition metadata, and
emits a synthetic `chain.composition` snapshot at root completion so
debuggers can see what was executed in one glance.

## Coverage matrix

| Runnable primitive | Detected via | Per-step events | Composition metadata |
| ------------------------------ | ----------------------------- | ------------------------------------ | ----------------------------------------------------- |
| `RunnableSequence` | `name` starts with `RunnableSequence` | `agent.input`, `agent.output`, `agent.code` | `kind=sequence`; child positions `seq:step:N` |
| `RunnableParallel<a,b,...>` | `name` starts with `RunnableParallel` | `agent.input`, `agent.output`, `agent.code` | `kind=parallel`; declared branch keys parsed from name; child positions `map:key:K` |
| `RunnableLambda` | `name == "RunnableLambda"` | `agent.input`, `agent.output`, `agent.code` | `kind=lambda`; SHA-256 fingerprint over `(name, depth, position)` |
| `RunnablePassthrough` | `name == "RunnablePassthrough"` | `agent.input`, `agent.output`, `agent.code` | `kind=passthrough`; payload carries `passthrough=true` |
| `RunnableBranch` | `name` starts with `RunnableBranch` | `agent.input`, `agent.output`, `agent.code` | `kind=branch`; child positions `condition:N` (predicates) and `branch:N` (bodies) |
| `RunnableConfig` (passthrough) | not a runnable; opaque kwargs | n/a — propagates run hierarchy via `parent_run_id` | tags + metadata forwarded into `agent.input` payloads |
| Non-LCEL runnable (prompts, parsers, models) | `name` does not match any LCEL prefix | Standard `model.invoke` / `tool.call` events fire as before; if observed under an LCEL parent, also emits `agent.code` with `kind=other` | parent linkage + composition position retained |
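
The detection rules in the matrix can be sketched as a small name classifier. This is a minimal illustration, not the adapter's code: `classify_runnable` and `parallel_branch_keys` are hypothetical helpers, and the prefix strings are assumed from the primitive names in the first column.

```python
import re

# Hypothetical lookup tables, assumed from the "Detected via" column.
_EXACT_KINDS = {
    "RunnableLambda": "lambda",
    "RunnablePassthrough": "passthrough",
}
_PREFIX_KINDS = (
    ("RunnableSequence", "sequence"),
    ("RunnableParallel", "parallel"),
    ("RunnableBranch", "branch"),
)


def classify_runnable(name: str) -> str:
    """Map a callback-reported runnable name to a composition kind."""
    if name in _EXACT_KINDS:
        return _EXACT_KINDS[name]
    for prefix, kind in _PREFIX_KINDS:
        if name.startswith(prefix):
            return kind
    # Prompts, parsers, models, and anything else fall through to "other".
    return "other"


def parallel_branch_keys(name: str) -> list:
    """Extract declared keys from a name such as 'RunnableParallel<a,b>'."""
    match = re.fullmatch(r"RunnableParallel<(.*)>", name)
    if not match or not match.group(1):
        return []
    return [key.strip() for key in match.group(1).split(",")]
```

For example, `classify_runnable("RunnableParallel<context,question>")` yields `"parallel"`, and `parallel_branch_keys` on the same name yields the two declared keys.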

## Event reference

The adapter emits four LCEL-relevant events, spread across three event types:

### `agent.input` (L1) — per runnable start

Emitted when `on_chain_start` fires for an LCEL runnable.

```json
{
  "type": "agent.input",
  "payload": {
    "run_id": "...",
    "parent_run_id": "...",
    "runnable": {
      "kind": "lambda",
      "name": "RunnableLambda",
      "depth": 1,
      "position": {
        "parent_kind": "sequence",
        "label": "2",
        "role": "step"
      },
      "fingerprint": "cf97f2529fcb79e2"
    },
    "input": "<the input value passed to invoke()>"
  }
}
```
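
The `position` object in the payload above lines up with the position strings listed in the coverage matrix (`seq:step:N`, `map:key:K`, `condition:N`, `branch:N`). The sketch below shows one way such a string could be split into `parent_kind`, `role`, and `label`. It is an illustration only: `parse_position` is a hypothetical helper, and the `role` values other than `step` are assumptions, since only the sequence case appears in the example payload.

```python
from typing import Optional

# Hypothetical mapping: tag prefix -> (parent_kind, role).
# Only ("sequence", "step") is confirmed by the example payload;
# the other role names are assumed for illustration.
_TAG_RULES = (
    ("seq:step:", ("sequence", "step")),
    ("map:key:", ("parallel", "key")),
    ("condition:", ("branch", "condition")),
    ("branch:", ("branch", "branch")),
)


def parse_position(tag: str) -> Optional[dict]:
    """Split a position string into the fields of a `position` object."""
    for prefix, (parent_kind, role) in _TAG_RULES:
        if tag.startswith(prefix):
            return {
                "parent_kind": parent_kind,
                "label": tag[len(prefix):],
                "role": role,
            }
    # Not a recognized composition position.
    return None
```

Under these assumptions, `parse_position("seq:step:2")` reproduces the `position` object shown in the payload.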

### `agent.output` (L1) — per runnable end

Symmetric with `agent.input`. The payload includes `duration_ns` and
`status` (`"ok"` or `"error"`). Errors carry a separate `error` field.
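
As a consumer-side illustration, the sketch below collects failed runnables from a list of captured events. It assumes events are plain dicts with `type` and `payload` keys, and that `run_id`, `status`, and `error` sit at the top level of the `agent.output` payload. The exact layout is not shown in this document, so treat the field paths as assumptions; `failed_runs` is a hypothetical helper.

```python
def failed_runs(events: list) -> list:
    """Return (run_id, error) pairs for agent.output events with status 'error'."""
    failures = []
    for event in events:
        if event.get("type") != "agent.output":
            continue
        payload = event.get("payload") or {}
        # Assumed layout: status and error at the top level of the payload.
        if payload.get("status") == "error":
            failures.append((payload.get("run_id"), payload.get("error")))
    return failures
```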

### `agent.code` (L2) — per runnable end (pipeline structure)

This is the L2 event that the spec (04b §3 and §4) maps to LCEL pipeline
structure. It is emitted once per runnable in the executed tree and
carries per-runnable metadata: kind, depth, position, and, where
applicable, the passthrough or fingerprint marker.

### `agent.code` (L2) with `kind="chain.composition"` — synthetic graph snapshot

Emitted **once per root runnable** when the root completes (success or
error). Carries the full subtree as a flat node list plus an
aggregate summary so the dashboard can render the executed DAG without
having to reconstruct it from the per-step stream:

```json
{
  "type": "agent.code",
  "payload": {
    "kind": "chain.composition",
    "composition": {
      "root_run_id": "...",
      "root_kind": "sequence",
      "root_name": "RunnableSequence",
      "node_count": 9,
      "max_depth": 2,
      "kind_counts": {
        "sequence": 1,
        "parallel": 1,
        "lambda": 1,
        "passthrough": 1,
        "branch": 1,
        "other": 4
      },
      "status": "ok",
      "nodes": [
        {
          "run_id": "...",
          "parent_run_id": null,
          "kind": "sequence",
          "name": "RunnableSequence",
          "depth": 0,
          "status": "ok",
          "duration_ns": 3092000,
          "child_run_ids": ["...", "...", "..."]
        }
        // ... one entry per runnable in the tree
      ]
    }
  }
}
```
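
Because the snapshot carries the subtree as a flat node list, a consumer can rebuild the tree by following `child_run_ids` from `root_run_id`. The sketch below renders an indented outline from a `composition` dict shaped like the example above. `render_composition` is a hypothetical helper, not part of the adapter.

```python
def render_composition(composition: dict) -> list:
    """Render a composition snapshot as indented outline lines."""
    nodes = {node["run_id"]: node for node in composition["nodes"]}
    lines = []

    def walk(run_id: str) -> None:
        node = nodes[run_id]
        indent = "  " * node["depth"]
        lines.append(f"{indent}{node['kind']}: {node['name']} [{node['status']}]")
        for child_id in node.get("child_run_ids", []):
            # Skip children that are not present in the flat list.
            if child_id in nodes:
                walk(child_id)

    root_id = composition["root_run_id"]
    if root_id in nodes:
        walk(root_id)
    return lines
```

Calling `print("\n".join(render_composition(comp)))` on a snapshot gives a quick textual view of the executed DAG without replaying the per-step stream.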

## Capture-config gating

LCEL events follow the standard `CaptureConfig` layer model:

| Layer | Field | Default | Controls |
| ---------------- | ----------------- | ------- | ---------------------------------------- |
| L1 (Agent I/O) | `l1_agent_io` | `True` | Per-runnable `agent.input` / `agent.output` |
| L2 (Agent Code) | `l2_agent_code` | `False` | Per-runnable `agent.code` AND the synthetic `chain.composition` snapshot |
| Cross-cutting | always-on | - | n/a — LCEL doesn't emit cross-cutting events |

**Recommended deployment:** use `CaptureConfig.standard()` (the default)
if you only need inputs, outputs, and model/tool calls. Switch to
`CaptureConfig.full()` (or set `l2_agent_code=True`) when you need the
pipeline DAG for debugging, replay, or visualization.
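
The gating table can be read as a simple predicate over the two layer flags. The sketch below models only those two fields with a stand-in dataclass. `CaptureFlags` and `should_emit` are hypothetical names, not the real `CaptureConfig` API, and the treatment of non-LCEL event types is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CaptureFlags:
    """Hypothetical stand-in for the two CaptureConfig fields in the table."""
    l1_agent_io: bool = True
    l2_agent_code: bool = False


def should_emit(event_type: str, flags: CaptureFlags) -> bool:
    """Decide whether an LCEL event passes the layer gate."""
    if event_type in ("agent.input", "agent.output"):
        return flags.l1_agent_io
    if event_type == "agent.code":
        # Covers per-runnable agent.code and the chain.composition snapshot.
        return flags.l2_agent_code
    # Other event types are outside this sketch; assumed ungated here.
    return True
```

With the defaults, `agent.input` and `agent.output` pass while `agent.code` is dropped. Setting `l2_agent_code=True` lets both the per-runnable `agent.code` events and the composition snapshot through.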

## Hierarchy and depth

The adapter computes depth from `parent_run_id` chains. A runnable
whose `parent_run_id` does NOT correspond to a runnable already
tracked by this handler is treated as a **new root** (depth 0). This
keeps the tracker resilient to:

* LangGraph nodes (handled by a separate code path that pre-empts LCEL
tracking — see the langgraph adapter docs)
* Pre-existing legacy chain calls that wrap an LCEL pipeline
* Multiple concurrent root runnables driven by different invocations

When a sub-graph parent IS tracked, the child is recorded with
`depth = parent_depth + 1`. The composition snapshot's `max_depth`
field reflects the deepest tracked descendant in the tree.
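
The depth rule described in this section can be sketched with a dictionary keyed by run ID. This is a minimal illustration of the rule as stated, not the adapter's tracker. `DepthTracker` is a hypothetical class.

```python
from typing import Optional


class DepthTracker:
    """Track runnable depth from parent_run_id chains."""

    def __init__(self) -> None:
        self._depths = {}

    def start(self, run_id: str, parent_run_id: Optional[str]) -> int:
        """Record a runnable start and return its depth."""
        parent_depth = self._depths.get(parent_run_id)
        if parent_depth is None:
            # No parent, or a parent this tracker never saw: new root.
            depth = 0
        else:
            depth = parent_depth + 1
        self._depths[run_id] = depth
        return depth

    def max_depth(self) -> int:
        """Deepest tracked runnable, as reported in the snapshot."""
        return max(self._depths.values(), default=0)
```

A child whose `parent_run_id` is unknown (for example, one started under a LangGraph node or a legacy chain) comes back with depth 0 rather than raising, which is the resilience property described above.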

## Lambda fingerprinting

`RunnableLambda` instances expose a `fingerprint` field on both
`agent.input` and `agent.code` events. The fingerprint is a 16-char
hex prefix of `SHA-256(name | depth | parent_kind | role | label)`.
The same lambda invoked at the same composition position produces the
same fingerprint, enabling "did this lambda change between two runs?"
diffs in the UI.

The fingerprint does NOT include the inner callable's source code —
LangChain doesn't expose source through the callback path. For
fine-grained "code-changed" detection at the source level, instrument
the lambda directly with a `@layerlens.observe` decorator.
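
A position-based fingerprint of this shape can be sketched with `hashlib`. The field order follows the formula above, but the exact serialization (separator, encoding) is an assumption, so this sketch is not expected to reproduce the adapter's actual values. `lambda_fingerprint` is a hypothetical helper.

```python
import hashlib


def lambda_fingerprint(
    name: str, depth: int, parent_kind: str, role: str, label: str
) -> str:
    """Return a 16-char hex prefix of SHA-256 over the position fields."""
    # Assumed serialization: fields joined with "|" and encoded as UTF-8.
    material = "|".join([name, str(depth), parent_kind, role, label])
    return hashlib.sha256(material.encode("utf-8")).hexdigest()[:16]
```

Two calls with identical arguments return the same string, and changing any position field (for example, moving the lambda from step 2 to step 3) changes the result. Editing the lambda's body does not, which is the limitation noted above.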

## LangGraph interaction

LangGraph drives LCEL pipelines under the hood, so a graph node's
`on_chain_start` callback may carry both `metadata["langgraph_node"]`
AND a runnable `name` (e.g. `RunnableSequence`). The adapter
**prefers the LangGraph signal**: when a `langgraph_node` marker is
present, the existing LangGraph attribution path runs and LCEL
tracking is suppressed for that subtree. This avoids double-emission
on graphs that LangGraph itself drives.
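
The preference rule amounts to checking callback metadata before consulting the runnable name. The sketch below shows that decision for a single callback. `select_tracking_path` is a hypothetical function, and how the adapter carries the suppression down a subtree is not shown here.

```python
from typing import Any, Mapping, Optional


def select_tracking_path(metadata: Optional[Mapping[str, Any]]) -> str:
    """Pick the attribution path for one on_chain_start callback."""
    if metadata and "langgraph_node" in metadata:
        # LangGraph signal wins, even if the name looks like an LCEL runnable.
        return "langgraph"
    return "lcel"
```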

If you want LCEL tracking inside a graph node, invoke the LCEL
pipeline directly with `pipeline.invoke(input, config={"callbacks":
[handler]})` from within the node's body — LangChain will not attach
the `langgraph_node` marker to a manual `.invoke()`, so LCEL tracking
will engage.

## See also

* Spec: `docs/incubation-docs/adapter-framework/04-per-framework-specs/04b-langchain-adapter-spec.md` §1 weakness #4 and §4
* Sample: `samples/instrument/langchain/lcel_main.py` — runnable LCEL
pipeline with all five primitives, no API key required
* Tests: `tests/instrument/adapters/frameworks/langchain/test_lcel.py`
* Source: `src/layerlens/instrument/adapters/frameworks/langchain/lcel.py`
190 changes: 190 additions & 0 deletions samples/instrument/langchain/lcel_main.py
@@ -0,0 +1,190 @@
"""Sample: LCEL (LangChain Expression Language) instrumentation walkthrough.

Demonstrates the LCEL tracing capability added to the LayerLens
LangChain adapter (spec 04b §4). The sample builds a canonical RAG-
style LCEL pipeline:

    {"context": retriever, "question": passthrough} | prompt | llm | parser

then invokes it with the LayerLens callback handler installed and
prints the events that the adapter emitted — including the synthetic
``chain.composition`` snapshot produced at root completion.

The sample runs **offline**. The "LLM" and "retriever" are local
``RunnableLambda`` stand-ins that return deterministic strings; no
network access or API key is required. To wire it into a real model,
swap the ``fake_llm`` / ``fake_retriever`` / ``fake_parser`` lambdas
for ``ChatOpenAI``, ``VectorStoreRetriever``, and ``StrOutputParser``
respectively.

Run::

    pip install 'layerlens[langchain]'
    python -m samples.instrument.langchain.lcel_main
"""

from __future__ import annotations

import sys
import json
from typing import Any

from layerlens.instrument.adapters._base import CaptureConfig
from layerlens.instrument.adapters.frameworks.langchain import LayerLensCallbackHandler


def main() -> int:
    try:
        from langchain_core.runnables import (
            RunnableLambda,
            RunnableBranch,
            RunnableParallel,
            RunnablePassthrough,
        )
    except ImportError:
        print(
            "langchain-core is not installed. Install with:\n"
            " pip install 'layerlens[langchain]'",
            file=sys.stderr,
        )
        return 2

    # ------------------------------------------------------------------
    # Build a representative LCEL pipeline that exercises every
    # Runnable primitive the adapter knows about. The shape mirrors the
    # canonical RAG pattern from langchain.com/docs/concepts/lcel.
    # ------------------------------------------------------------------

    def fake_retriever(question: str) -> str:
        # Deterministic stand-in for a vector store retriever.
        return f"Context: facts relevant to '{question}'."

    def fake_llm(prompt_input: dict[str, str]) -> str:
        return f"Answer using {prompt_input['context']!r} for: {prompt_input['question']!r}"

    def fake_parser(response: str) -> str:
        return response.strip()

    def is_short_question(q: str) -> bool:
        return len(q) < 20

    short_branch = RunnableLambda(lambda q: f"Short answer for: {q}")
    long_branch = RunnableLambda(lambda q: f"Detailed answer for: {q}")

    pipeline = (
        RunnableParallel(
            context=RunnableLambda(fake_retriever),
            question=RunnablePassthrough(),
        )
        | RunnableLambda(fake_llm)
        | RunnableLambda(fake_parser)
        | RunnableBranch(
            (is_short_question, short_branch),
            long_branch,
        )
    )

    # ------------------------------------------------------------------
    # Instrument with LayerLens. ``CaptureConfig.full()`` enables L1
    # (agent.input/output) AND L2 (agent.code + chain.composition) so
    # the printout below shows the entire LCEL signal surface.
    # ------------------------------------------------------------------

    handler = LayerLensCallbackHandler(capture_config=CaptureConfig.full())
    handler.connect()

    try:
        result = pipeline.invoke("What is LCEL?", config={"callbacks": [handler]})
    finally:
        events = list(handler.get_events())
        handler.disconnect()

    print(f"Pipeline output: {result}")
    print(f"Total events captured: {len(events)}")
    print()

    # ------------------------------------------------------------------
    # Walk through the events to highlight what the adapter saw. We
    # print one line per LCEL event type so the output stays readable.
    # ------------------------------------------------------------------

    runnable_inputs = [
        e
        for e in events
        if e["type"] == "agent.input" and "runnable" in (e.get("payload") or {})
    ]
    runnable_codes = [
        e
        for e in events
        if e["type"] == "agent.code" and (e.get("payload") or {}).get("kind") != "chain.composition"
    ]
    composition_events = [
        e
        for e in events
        if e["type"] == "agent.code" and (e.get("payload") or {}).get("kind") == "chain.composition"
    ]

    print(f"== LCEL agent.input events ({len(runnable_inputs)}) ==")
    for e in runnable_inputs:
        runnable = e["payload"]["runnable"]
        position = runnable.get("position")
        loc = (
            f" [{position['parent_kind']}.{position['role']}={position['label']}]"
            if position
            else ""
        )
        depth_indent = " " * runnable["depth"]
        print(f"{depth_indent}- {runnable['kind']}: {runnable['name']}{loc}")

    print()
    print(f"== LCEL agent.code events ({len(runnable_codes)}) ==")
    for e in runnable_codes:
        payload = e["payload"]
        depth_indent = " " * payload["depth"]
        marker = ""
        if payload.get("passthrough"):
            marker = " (passthrough)"
        elif payload.get("fingerprint"):
            marker = f" fp={payload['fingerprint']}"
        duration = payload.get("duration_ns")
        dur_ms = f" {duration / 1e6:.2f}ms" if duration is not None else ""
        print(f"{depth_indent}- {payload['kind']}: {payload['name']}{marker}{dur_ms}")

    print()
    print(f"== chain.composition snapshot ({len(composition_events)}) ==")
    for e in composition_events:
        comp = e["payload"]["composition"]
        print(
            f" root={comp['root_kind']} ({comp['root_name']!r}) "
            f"nodes={comp['node_count']} max_depth={comp['max_depth']} "
            f"status={comp['status']}"
        )
        print(f" kind_counts: {comp['kind_counts']}")
        # First few nodes for visibility.
        print(" nodes (first 6):")
        for node in comp["nodes"][:6]:
            label = ""
            if node.get("position"):
                label = (
                    f" [{node['position']['parent_kind']}."
                    f"{node['position']['role']}={node['position']['label']}]"
                )
            indent = " " + " " * node["depth"]
            print(f"{indent}- depth={node['depth']} {node['kind']}: {node['name']}{label}")

    print()
    print("Sample complete. Verify the events match the executed pipeline:")
    print(" RunnableSequence -> RunnableParallel(context, question) -> RunnableLambda")
    print(" -> RunnableLambda -> RunnableBranch -> (short OR long)")
    return 0


def _serialize(obj: Any) -> str:
    try:
        return json.dumps(obj, indent=2, default=str)
    except (TypeError, ValueError):
        return str(obj)


if __name__ == "__main__":
    raise SystemExit(main())