diff --git a/docs/adapters/frameworks-langchain-lcel.md b/docs/adapters/frameworks-langchain-lcel.md new file mode 100644 index 00000000..b4b82d52 --- /dev/null +++ b/docs/adapters/frameworks-langchain-lcel.md @@ -0,0 +1,190 @@ +# LangChain LCEL Coverage + +The LayerLens LangChain adapter instruments **LangChain Expression +Language (LCEL)** pipelines as a first-class observability surface. +This is the dominant authoring pattern in LangChain 0.2+ — pipelines +expressed via the `|` (pipe) operator over `Runnable` instances: + +```python +chain = ( + {"context": retriever, "question": RunnablePassthrough()} + | prompt + | llm + | StrOutputParser() +) +``` + +LCEL composition produces a **tree of runnables** at runtime +(`RunnableSequence`, `RunnableParallel`, `RunnableLambda`, +`RunnablePassthrough`, `RunnableBranch`). The adapter tracks the +entire tree, emits per-runnable events with composition metadata, and +emits a synthetic `chain.composition` snapshot at root completion so +debuggers can see what was executed in one glance. 
+ +## Coverage matrix + +| Runnable primitive | Detected via | Per-step events | Composition metadata | +| ------------------------------ | ----------------------------- | ------------------------------------ | ----------------------------------------------------- | +| `RunnableSequence` | `name` starts with `RunnableSequence` | `agent.input`, `agent.output`, `agent.code` | `kind=sequence`; child positions `seq:step:N` | +| `RunnableParallel` | `name` starts with `RunnableParallel` | `agent.input`, `agent.output`, `agent.code` | `kind=parallel`; declared branch keys parsed from name; child positions `map:key:K` | +| `RunnableLambda` | `name == "RunnableLambda"` | `agent.input`, `agent.output`, `agent.code` | `kind=lambda`; SHA-256 fingerprint over `(name, depth, position)` | +| `RunnablePassthrough` | `name == "RunnablePassthrough"` | `agent.input`, `agent.output`, `agent.code` | `kind=passthrough`; payload carries `passthrough=true` | +| `RunnableBranch` | `name` starts with `RunnableBranch` | `agent.input`, `agent.output`, `agent.code` | `kind=branch`; child positions `condition:N` (predicates) and `branch:N` (bodies) | +| `RunnableConfig` (passthrough) | not a runnable; opaque kwargs | n/a — propagates run hierarchy via `parent_run_id` | tags + metadata forwarded into `agent.input` payloads | +| Non-LCEL runnable (prompts, parsers, models) | `name` does not match any LCEL primitive | Standard `model.invoke` / `tool.call` events fire as before; if observed under an LCEL parent, also emits `agent.code` with `kind=other` | parent linkage + composition position retained | + +## Event reference + +The adapter emits three LCEL-relevant event types in four shapes (`agent.code` appears in two forms): + +### `agent.input` (L1) — per runnable start + +Emitted when `on_chain_start` fires for an LCEL runnable. 
+ +```json +{ + "type": "agent.input", + "payload": { + "run_id": "...", + "parent_run_id": "...", + "runnable": { + "kind": "lambda", + "name": "RunnableLambda", + "depth": 1, + "position": { + "parent_kind": "sequence", + "label": "2", + "role": "step" + }, + "fingerprint": "cf97f2529fcb79e2" + }, + "input": "" + } +} +``` + +### `agent.output` (L1) — per runnable end + +Symmetric with `agent.input`. The payload includes `duration_ns` and +`status` (`"ok"` or `"error"`). Errors carry a separate `error` field. + +### `agent.code` (L2) — per runnable end (pipeline structure) + +The L2 event the spec maps to LCEL pipeline structure (04b §3 & §4). +Emitted once per runnable in the executed tree, with per-runnable +metadata (kind, depth, position, optional passthrough/fingerprint +markers). + +### `agent.code` (L2) with `kind="chain.composition"` — synthetic graph snapshot + +Emitted **once per root runnable** when the root completes (success or +error). Carries the full subtree as a flat node list plus an +aggregate summary so the dashboard can render the executed DAG without +having to reconstruct it from the per-step stream: + +```json +{ + "type": "agent.code", + "payload": { + "kind": "chain.composition", + "composition": { + "root_run_id": "...", + "root_kind": "sequence", + "root_name": "RunnableSequence", + "node_count": 9, + "max_depth": 2, + "kind_counts": { + "sequence": 1, + "parallel": 1, + "lambda": 1, + "passthrough": 1, + "branch": 1, + "other": 4 + }, + "status": "ok", + "nodes": [ + { + "run_id": "...", + "parent_run_id": null, + "kind": "sequence", + "name": "RunnableSequence", + "depth": 0, + "status": "ok", + "duration_ns": 3092000, + "child_run_ids": ["...", "...", "..."] + } + // ... 
one entry per runnable in the tree + ] + } + } +} +``` + +## Capture-config gating + +LCEL events follow the standard `CaptureConfig` layer model: + +| Layer | Field | Default | Controls | +| ---------------- | ----------------- | ------- | ---------------------------------------- | +| L1 (Agent I/O) | `l1_agent_io` | `True` | Per-runnable `agent.input` / `agent.output` | +| L2 (Agent Code) | `l2_agent_code` | `False` | Per-runnable `agent.code` AND the synthetic `chain.composition` snapshot | +| Cross-cutting | always-on | - | n/a — LCEL doesn't emit cross-cutting events | + +**Recommended deployment:** `CaptureConfig.standard()` if you only need +inputs/outputs/model/tool calls (default). Switch to +`CaptureConfig.full()` (or set `l2_agent_code=True`) when you need the +pipeline DAG for debugging, replay, or visualization. + +## Hierarchy and depth + +The adapter computes depth from `parent_run_id` chains. A runnable +whose `parent_run_id` does NOT correspond to a runnable already +tracked by this handler is treated as a **new root** (depth 0). This +keeps the tracker resilient to: + +* LangGraph nodes (handled by a separate code path that pre-empts LCEL + tracking — see the langgraph adapter docs) +* Pre-existing legacy chain calls that wrap an LCEL pipeline +* Multiple concurrent root runnables driven by different invocations + +When a sub-graph parent IS tracked, the child is recorded with +`depth = parent_depth + 1`. The composition snapshot's `max_depth` +field reflects the deepest tracked descendant in the tree. + +## Lambda fingerprinting + +`RunnableLambda` instances expose a `fingerprint` field on both +`agent.input` and `agent.code` events. The fingerprint is a 16-char +hex prefix of `SHA-256(name | depth | parent_kind | role | label)`. +The same lambda invoked at the same composition position produces the +same fingerprint, enabling "did this lambda change between two runs?" +diffs in the UI. 
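The fingerprint recipe above can be sketched with the standard library alone. This is a minimal illustration, not the adapter's implementation: the helper name `sketch_fingerprint`, the `|` join, the UTF-8 encoding, and the empty-string stand-in for a missing position are all assumptions, so digests produced here are not expected to match the ones the adapter emits.

```python
import hashlib


def sketch_fingerprint(name, depth, parent_kind=None, role=None, label=None):
    """Hypothetical helper: 16-char hex prefix of a SHA-256 digest."""
    # Assumption: fields are joined with "|" and UTF-8 encoded; a root
    # lambda (no position) contributes empty strings for the last three.
    material = "|".join([name, str(depth), parent_kind or "", role or "", label or ""])
    return hashlib.sha256(material.encode("utf-8")).hexdigest()[:16]


# Same name, depth, and composition position on two separate runs.
first_run = sketch_fingerprint("RunnableLambda", 1, "sequence", "step", "2")
second_run = sketch_fingerprint("RunnableLambda", 1, "sequence", "step", "2")
# The same lambda moved to a different step of the sequence.
moved = sketch_fingerprint("RunnableLambda", 1, "sequence", "step", "3")
print(first_run == second_run, first_run == moved)
```

Because only the name and position feed the hash, two runs agree when the lambda sits at the same place in the pipeline, and moving it to another step changes the value. Editing the lambda's body without moving it would leave the value unchanged, which is the limitation the next paragraph describes.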
+ +The fingerprint does NOT include the inner callable's source code — +LangChain doesn't expose source through the callback path. For +fine-grained "code-changed" detection at the source level, instrument +the lambda directly with a `@layerlens.observe` decorator. + +## LangGraph interaction + +LangGraph drives LCEL pipelines under the hood, so a graph node's +`on_chain_start` callback may carry both `metadata["langgraph_node"]` +AND a runnable `name` (e.g. `RunnableSequence`). The adapter +**prefers the LangGraph signal**: when a `langgraph_node` marker is +present, the existing LangGraph attribution path runs and LCEL +tracking is suppressed for that subtree. This avoids double-emission +on graphs that LangGraph itself drives. + +If you want LCEL tracking inside a graph node, invoke the LCEL +pipeline directly with `pipeline.invoke(input, config={"callbacks": +[handler]})` from within the node's body — LangChain will not attach +the `langgraph_node` marker to a manual `.invoke()`, so LCEL tracking +will engage. + +## See also + +* Spec: `docs/incubation-docs/adapter-framework/04-per-framework-specs/04b-langchain-adapter-spec.md` §1 weakness #4 and §4 +* Sample: `samples/instrument/langchain/lcel_main.py` — runnable LCEL + pipeline with all five primitives, no API key required +* Tests: `tests/instrument/adapters/frameworks/langchain/test_lcel.py` +* Source: `src/layerlens/instrument/adapters/frameworks/langchain/lcel.py` diff --git a/samples/instrument/langchain/lcel_main.py b/samples/instrument/langchain/lcel_main.py new file mode 100644 index 00000000..1ca27d93 --- /dev/null +++ b/samples/instrument/langchain/lcel_main.py @@ -0,0 +1,190 @@ +"""Sample: LCEL (LangChain Expression Language) instrumentation walkthrough. + +Demonstrates the LCEL tracing capability added to the LayerLens +LangChain adapter (spec 04b §4). 
The sample builds a canonical RAG- +style LCEL pipeline: + + {"context": retriever, "question": passthrough} | prompt | llm | parser + +then invokes it with the LayerLens callback handler installed and +prints the events that the adapter emitted — including the synthetic +``chain.composition`` snapshot produced at root completion. + +The sample runs **offline**. The "LLM" and "retriever" are local +``RunnableLambda`` stand-ins that return deterministic strings; no +network access or API key is required. To wire it into a real model, +swap the ``fake_llm`` / ``fake_retriever`` / ``fake_parser`` lambdas +for ``ChatOpenAI``, ``VectorStoreRetriever``, and ``StrOutputParser`` +respectively. + +Run:: + + pip install 'layerlens[langchain]' + python -m samples.instrument.langchain.lcel_main +""" + +from __future__ import annotations + +import sys +import json +from typing import Any + +from layerlens.instrument.adapters._base import CaptureConfig +from layerlens.instrument.adapters.frameworks.langchain import LayerLensCallbackHandler + + +def main() -> int: + try: + from langchain_core.runnables import ( + RunnableLambda, + RunnableBranch, + RunnableParallel, + RunnablePassthrough, + ) + except ImportError: + print( + "langchain-core is not installed. Install with:\n" + " pip install 'layerlens[langchain]'", + file=sys.stderr, + ) + return 2 + + # ------------------------------------------------------------------ + # Build a representative LCEL pipeline that exercises every + # Runnable primitive the adapter knows about. The shape mirrors the + # canonical RAG pattern from langchain.com/docs/concepts/lcel. + # ------------------------------------------------------------------ + + def fake_retriever(question: str) -> str: + # Deterministic stand-in for a vector store retriever. + return f"Context: facts relevant to '{question}'." 
+ + def fake_llm(prompt_input: dict[str, str]) -> str: + return f"Answer using {prompt_input['context']!r} for: {prompt_input['question']!r}" + + def fake_parser(response: str) -> str: + return response.strip() + + def is_short_question(q: str) -> bool: + return len(q) < 20 + + short_branch = RunnableLambda(lambda q: f"Short answer for: {q}") + long_branch = RunnableLambda(lambda q: f"Detailed answer for: {q}") + + pipeline = ( + RunnableParallel( + context=RunnableLambda(fake_retriever), + question=RunnablePassthrough(), + ) + | RunnableLambda(fake_llm) + | RunnableLambda(fake_parser) + | RunnableBranch( + (is_short_question, short_branch), + long_branch, + ) + ) + + # ------------------------------------------------------------------ + # Instrument with LayerLens. ``CaptureConfig.full()`` enables L1 + # (agent.input/output) AND L2 (agent.code + chain.composition) so + # the printout below shows the entire LCEL signal surface. + # ------------------------------------------------------------------ + + handler = LayerLensCallbackHandler(capture_config=CaptureConfig.full()) + handler.connect() + + try: + result = pipeline.invoke("What is LCEL?", config={"callbacks": [handler]}) + finally: + events = list(handler.get_events()) + handler.disconnect() + + print(f"Pipeline output: {result}") + print(f"Total events captured: {len(events)}") + print() + + # ------------------------------------------------------------------ + # Walk through the events to highlight what the adapter saw. We + # print one line per LCEL event type so the output stays readable. 
+ # ------------------------------------------------------------------ + + runnable_inputs = [ + e + for e in events + if e["type"] == "agent.input" and "runnable" in (e.get("payload") or {}) + ] + runnable_codes = [ + e + for e in events + if e["type"] == "agent.code" and (e.get("payload") or {}).get("kind") != "chain.composition" + ] + composition_events = [ + e + for e in events + if e["type"] == "agent.code" and (e.get("payload") or {}).get("kind") == "chain.composition" + ] + + print(f"== LCEL agent.input events ({len(runnable_inputs)}) ==") + for e in runnable_inputs: + runnable = e["payload"]["runnable"] + position = runnable.get("position") + loc = ( + f" [{position['parent_kind']}.{position['role']}={position['label']}]" + if position + else "" + ) + depth_indent = " " * runnable["depth"] + print(f"{depth_indent}- {runnable['kind']}: {runnable['name']}{loc}") + + print() + print(f"== LCEL agent.code events ({len(runnable_codes)}) ==") + for e in runnable_codes: + payload = e["payload"] + depth_indent = " " * payload["depth"] + marker = "" + if payload.get("passthrough"): + marker = " (passthrough)" + elif payload.get("fingerprint"): + marker = f" fp={payload['fingerprint']}" + duration = payload.get("duration_ns") + dur_ms = f" {duration / 1e6:.2f}ms" if duration is not None else "" + print(f"{depth_indent}- {payload['kind']}: {payload['name']}{marker}{dur_ms}") + + print() + print(f"== chain.composition snapshot ({len(composition_events)}) ==") + for e in composition_events: + comp = e["payload"]["composition"] + print( + f" root={comp['root_kind']} ({comp['root_name']!r}) " + f"nodes={comp['node_count']} max_depth={comp['max_depth']} " + f"status={comp['status']}" + ) + print(f" kind_counts: {comp['kind_counts']}") + # First few nodes for visibility. + print(" nodes (first 6):") + for node in comp["nodes"][:6]: + label = "" + if node.get("position"): + label = ( + f" [{node['position']['parent_kind']}." 
+ f"{node['position']['role']}={node['position']['label']}]" + ) + indent = " " + " " * node["depth"] + print(f"{indent}- depth={node['depth']} {node['kind']}: {node['name']}{label}") + + print() + print("Sample complete. Verify the events match the executed pipeline:") + print(" RunnableSequence -> RunnableParallel(context, question) -> RunnableLambda") + print(" -> RunnableLambda -> RunnableBranch -> (short OR long)") + return 0 + + +def _serialize(obj: Any) -> str: + try: + return json.dumps(obj, indent=2, default=str) + except (TypeError, ValueError): + return str(obj) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/layerlens/instrument/adapters/frameworks/langchain/__init__.py b/src/layerlens/instrument/adapters/frameworks/langchain/__init__.py index be91bc5f..00b40ba3 100644 --- a/src/layerlens/instrument/adapters/frameworks/langchain/__init__.py +++ b/src/layerlens/instrument/adapters/frameworks/langchain/__init__.py @@ -29,6 +29,16 @@ # rather than letting LangChain raise an opaque ImportError mid-callback. requires_pydantic(PydanticCompat.V2_ONLY) +from layerlens.instrument.adapters.frameworks.langchain.lcel import ( + LCELNode, + RunnableKind, + CompositionPosition, + LCELRunnableTracker, + fingerprint_lambda, + detect_runnable_kind, + parse_composition_tag, + parse_parallel_branches, +) from layerlens.instrument.adapters.frameworks.langchain.state import LangChainMemoryAdapter from layerlens.instrument.adapters.frameworks.langchain.agents import TracedAgent, instrument_agent from layerlens.instrument.adapters.frameworks.langchain.chains import TracedChain, instrument_chain @@ -48,6 +58,15 @@ "instrument_agent", "TracedAgent", "ADAPTER_CLASS", + # LCEL public surface (spec 04b §4). 
+ "LCELRunnableTracker", + "LCELNode", + "RunnableKind", + "CompositionPosition", + "detect_runnable_kind", + "parse_composition_tag", + "parse_parallel_branches", + "fingerprint_lambda", ] diff --git a/src/layerlens/instrument/adapters/frameworks/langchain/callbacks.py b/src/layerlens/instrument/adapters/frameworks/langchain/callbacks.py index 121518eb..98e007f4 100644 --- a/src/layerlens/instrument/adapters/frameworks/langchain/callbacks.py +++ b/src/layerlens/instrument/adapters/frameworks/langchain/callbacks.py @@ -24,6 +24,10 @@ from layerlens.instrument.adapters._base.capture import CaptureConfig from layerlens.instrument.adapters._base.pydantic_compat import PydanticCompat from layerlens.instrument.adapters._base.trace_container import SerializedTrace +from layerlens.instrument.adapters.frameworks.langchain.lcel import ( + LCELRunnableTracker, + detect_runnable_kind, +) @dataclass @@ -174,6 +178,14 @@ def __init__( self._chain_calls: dict[str, ChainCallContext] = {} self._run_to_node: dict[str, str] = {} # run_id -> langgraph node name + # LCEL runnable tracker — see lcel.py for the rationale. Tracks + # the active RunnableSequence/Parallel/Lambda/Passthrough/Branch + # tree so the adapter can emit (a) per-runnable agent.code events + # and (b) a synthetic chain.composition event at root completion + # describing the full graph that was executed. This is the + # primary fix for spec 04b §1 weakness #4 (LCEL not instrumented). + self._lcel = LCELRunnableTracker() + # Track all events for debugging/testing self._events: list[dict[str, Any]] = [] @@ -192,6 +204,7 @@ def connect(self) -> None: def disconnect(self) -> None: self._close_sinks() + self._lcel.reset() self._connected = False self._status = AdapterStatus.DISCONNECTED @@ -613,20 +626,48 @@ def on_chain_start( metadata: dict[str, Any] | None = None, **kwargs: Any, ) -> None: - """Called when chain starts running. - - For LangGraph node executions, metadata contains 'langgraph_node' - with the node name. 
We emit agent.input and track the run_id so - child LLM/tool calls can be attributed to the node. + """Called when chain or LCEL runnable starts running. + + Three execution modes are handled: + + 1. **LangGraph node** — ``metadata["langgraph_node"]`` is set. + Behaves as before: emits an ``agent.input`` event and tracks + the run so descendant LLM/tool calls can be attributed. + 2. **LCEL runnable** — ``name`` kwarg matches one of the LCEL + primitives (``RunnableSequence``, ``RunnableParallel<...>``, + ``RunnableLambda``, ``RunnablePassthrough``, ``RunnableBranch``). + The :class:`LCELRunnableTracker` records the runnable into the + active tree; an ``agent.input`` event with composition + metadata is emitted for the runnable. + 3. **Other / legacy chain** — neither marker present. Falls + back to the langgraph-style sub-chain inheritance (so legacy + ``LLMChain`` / ``SequentialChain`` traces continue to work). """ run_id_str = str(run_id) parent_id_str = str(parent_run_id) if parent_run_id else None meta = metadata or {} node_name = meta.get("langgraph_node") + runnable_name = kwargs.get("name") + runnable_kind = detect_runnable_kind(runnable_name) + # We track LCEL when langgraph isn't the dominant signal AND + # either the runnable name maps to an LCEL primitive OR the + # callback fires under an already-tracked LCEL parent (so non- + # primitive runnables in the tree — prompts, parsers, models — + # still get recorded as ``OTHER`` nodes). The langgraph branch + # below pre-empts LCEL tracking to avoid double-emission for + # graphs that LangGraph drives. + is_langgraph = bool(node_name) + is_lcel = (not is_langgraph) and ( + runnable_kind.value != "other" or self._lcel.is_tracked(parent_id_str or "") + ) - if node_name: - # This is a LangGraph node execution + if is_langgraph: + # Existing LangGraph node execution path — unchanged. 
+ # ``node_name`` is non-None here (is_langgraph guard) but mypy + # cannot narrow a bool() coercion through a local — assert + # for the type-checker without changing runtime behavior. + assert node_name is not None # noqa: S101 - guard for mypy narrowing self._chain_calls[run_id_str] = ChainCallContext( run_id=run_id_str, start_time_ns=time.time_ns(), @@ -647,8 +688,26 @@ def on_chain_start( "langgraph_triggers": meta.get("langgraph_triggers"), }, ) - elif parent_id_str and parent_id_str in self._run_to_node: - # Sub-chain within a LangGraph node — inherit the node mapping + return + + if is_lcel: + node = self._lcel.begin( + run_id=run_id_str, + parent_run_id=parent_id_str, + name=runnable_name, + tags=tags, + ) + if self._capture_config.is_layer_enabled("agent.input"): + self._emit_event( + "agent.input", + self._lcel.runnable_input_payload(node, inputs), + ) + return + + # Fallback: legacy sub-chain within a LangGraph node — inherit + # the node mapping so the existing langgraph-attribution path + # keeps attributing this chain's children to the parent node. + if parent_id_str and parent_id_str in self._run_to_node: inherited_node = self._run_to_node[parent_id_str] self._run_to_node[run_id_str] = inherited_node self._chain_calls[run_id_str] = ChainCallContext( @@ -666,30 +725,89 @@ def on_chain_end( parent_run_id: UUID | None = None, **kwargs: Any, ) -> None: - """Called when chain finishes running.""" + """Called when a chain or LCEL runnable finishes running. + + Closes out the LangGraph node context (if any), the LCEL + tracker entry (if any), and emits the appropriate completion + events. When the run is the root of an LCEL composition, also + emits the synthetic ``chain.composition`` event describing the + full executed graph. + """ run_id_str = str(run_id) + + # LangGraph node path — unchanged. 
ctx = self._chain_calls.pop(run_id_str, None) self._run_to_node.pop(run_id_str, None) - - if ctx is None or ctx.node_name is None: + if ctx is not None and ctx.node_name is not None: + if self._capture_config.is_layer_enabled("agent.output"): + end_time_ns = time.time_ns() + duration_ns = end_time_ns - ctx.start_time_ns + output_summary = str(outputs)[:500] if outputs else None + self._emit_event( + "agent.output", + { + "run_id": run_id_str, + "node_name": ctx.node_name, + "output": output_summary, + "duration_ns": duration_ns, + }, + ) return - if not self._capture_config.is_layer_enabled("agent.output"): + # LCEL runnable path. + node = self._lcel.end(run_id_str) + if node is None: return - end_time_ns = time.time_ns() - duration_ns = end_time_ns - ctx.start_time_ns + # Per-runnable agent.code (L2) — emitted for every runnable in + # the tree so the dashboard can render the full DAG. + if self._capture_config.is_layer_enabled("l2_agent_code"): + self._emit_event("agent.code", self._lcel.runnable_code_payload(node)) - output_summary = str(outputs)[:500] if outputs else None - self._emit_event( - "agent.output", - { - "run_id": run_id_str, - "node_name": ctx.node_name, - "output": output_summary, - "duration_ns": duration_ns, - }, - ) + # Per-runnable agent.output (L1) — symmetric with agent.input. + if self._capture_config.is_layer_enabled("agent.output"): + runnable_duration_ns: int | None = ( + node.end_time_ns - node.start_time_ns if node.end_time_ns is not None else None + ) + self._emit_event( + "agent.output", + { + "run_id": run_id_str, + "parent_run_id": node.parent_run_id, + "runnable": { + "kind": node.kind.value, + "name": node.name, + "depth": node.depth, + }, + "output": outputs, + "duration_ns": runnable_duration_ns, + "status": node.status, + }, + ) + + # On root completion, emit the synthetic composition snapshot + # so debuggers can see the full executed graph in one record. 
+ # The composition graph is an L2 (agent.code) artifact in the + # spec's L1-L6 mapping, so we publish it under that event type + # with ``kind="chain.composition"`` and the graph itself under + # a ``composition`` sub-key. No separate ``chain.composition`` + # event type is emitted here: the snapshot shares the L2 gate + # (``l2_agent_code``) with the per-runnable ``agent.code`` + # events above. + if self._lcel.is_root(run_id_str): + consumed = self._lcel.consume_root_completion(run_id_str) + if consumed is not None and self._capture_config.is_layer_enabled("l2_agent_code"): + payload = self._lcel.composition_payload(run_id_str) + if payload is not None: + # Snapshot: agent.code event with composition sub-key + # (passes the L2 gate via standard event_type_map). + self._emit_event( + "agent.code", + { + "kind": "chain.composition", + "composition": payload, + }, + ) def on_chain_error( self, @@ -699,29 +817,69 @@ def on_chain_error( parent_run_id: UUID | None = None, **kwargs: Any, ) -> None: - """Called when chain errors.""" + """Called when a chain or LCEL runnable errors.""" run_id_str = str(run_id) + + # LangGraph node path — unchanged. ctx = self._chain_calls.pop(run_id_str, None) self._run_to_node.pop(run_id_str, None) - - if ctx is None or ctx.node_name is None: + if ctx is not None and ctx.node_name is not None: + if self._capture_config.is_layer_enabled("agent.output"): + end_time_ns = time.time_ns() + duration_ns = end_time_ns - ctx.start_time_ns + self._emit_event( + "agent.output", + { + "run_id": run_id_str, + "node_name": ctx.node_name, + "error": str(error), + "duration_ns": duration_ns, + }, + ) return - if not self._capture_config.is_layer_enabled("agent.output"): + # LCEL runnable path. 
+ node = self._lcel.end(run_id_str, error=str(error)) + if node is None: return - end_time_ns = time.time_ns() - duration_ns = end_time_ns - ctx.start_time_ns + if self._capture_config.is_layer_enabled("l2_agent_code"): + self._emit_event("agent.code", self._lcel.runnable_code_payload(node)) - self._emit_event( - "agent.output", - { - "run_id": run_id_str, - "node_name": ctx.node_name, - "error": str(error), - "duration_ns": duration_ns, - }, - ) + if self._capture_config.is_layer_enabled("agent.output"): + error_runnable_duration_ns: int | None = ( + node.end_time_ns - node.start_time_ns if node.end_time_ns is not None else None + ) + self._emit_event( + "agent.output", + { + "run_id": run_id_str, + "parent_run_id": node.parent_run_id, + "runnable": { + "kind": node.kind.value, + "name": node.name, + "depth": node.depth, + }, + "error": str(error), + "duration_ns": error_runnable_duration_ns, + "status": node.status, + }, + ) + + # Root error — still emit the composition snapshot so the + # debugger sees what was being executed when things went wrong. + if self._lcel.is_root(run_id_str): + consumed = self._lcel.consume_root_completion(run_id_str) + if consumed is not None and self._capture_config.is_layer_enabled("l2_agent_code"): + payload = self._lcel.composition_payload(run_id_str) + if payload is not None: + self._emit_event( + "agent.code", + { + "kind": "chain.composition", + "composition": payload, + }, + ) # --- Helper Methods --- diff --git a/src/layerlens/instrument/adapters/frameworks/langchain/lcel.py b/src/layerlens/instrument/adapters/frameworks/langchain/lcel.py new file mode 100644 index 00000000..b4ea783e --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langchain/lcel.py @@ -0,0 +1,537 @@ +"""LayerLens LangChain LCEL (LangChain Expression Language) tracing. + +LCEL is the dominant LangChain authoring pattern as of langchain-core +0.2+. 
Pipelines are built by piping ``Runnable`` instances together +(``prompt | llm | output_parser``), and the resulting composition is +exposed to the runtime as a tree of ``RunnableSequence`` / +``RunnableParallel`` / ``RunnableLambda`` / ``RunnablePassthrough`` / +``RunnableBranch`` objects. + +Upstream LangChain fires the standard callback protocol +(``on_chain_start`` / ``on_chain_end`` / ``on_chain_error``) for every +runnable in the tree, but the adapter's existing chain handlers were +written for the legacy ``Chain`` API and ignored these events unless a +``langgraph_node`` marker was present in the metadata. As a result, +LCEL pipelines were observable only at the LLM/tool boundary — the +pipeline topology (which step ran when, in what order, with what +nesting) was lost. + +This module fills the gap. It provides: + +* :func:`detect_runnable_kind` — classify an ``on_chain_start`` event as + one of the LCEL primitives (``sequence``, ``parallel``, ``lambda``, + ``passthrough``, ``branch``) or ``other`` based on the ``name`` kwarg + that LangChain attaches at runtime. +* :func:`parse_composition_tag` — decode the position tags LangChain + attaches to child runnables (``seq:step:N``, ``map:key:K``, + ``branch:N``, ``condition:N``) so the adapter can reconstruct the + parent's composition graph. +* :class:`LCELRunnableTracker` — per-callback-handler state machine that + tracks the active runnable hierarchy across nested invocations, + emits the synthetic ``chain.composition`` graph event when a root + runnable completes, and emits per-step ``agent.code`` events for every + intermediate runnable in the tree (capture-config gated to L2). + +Reference: ``ateam/docs/incubation-docs/adapter-framework/04-per-framework-specs/04b-langchain-adapter-spec.md`` +section 4 (``LCEL Support``). 
+""" + +from __future__ import annotations + +import time +import hashlib +from enum import Enum +from typing import Any +from dataclasses import field, dataclass + +# Tag prefixes LangChain attaches to runnable children to record their +# position within the parent composition. Source: +# ``langchain_core/runnables/base.py`` (search for ``"seq:step:"``, +# ``"map:key:"``, ``"branch:"``, ``"condition:"``). +_TAG_SEQUENCE = "seq:step:" +_TAG_PARALLEL = "map:key:" +_TAG_BRANCH_BODY = "branch:" +_TAG_BRANCH_COND = "condition:" + + +class RunnableKind(str, Enum): + """Classification of an LCEL runnable observed via on_chain_start. + + The five LCEL primitives plus a catch-all for anything that isn't + one of them (``RunnablePassthrough.assign``-derived hybrids, + ``ChatPromptTemplate``, ``StrOutputParser``, ``ChatOpenAI``, ...). + """ + + SEQUENCE = "sequence" + PARALLEL = "parallel" + LAMBDA = "lambda" + PASSTHROUGH = "passthrough" + BRANCH = "branch" + OTHER = "other" + + +# Composition position decoded from a child runnable's tags. +@dataclass(frozen=True) +class CompositionPosition: + """Where a child runnable sits inside its parent's composition.""" + + # The parent's RunnableKind (best-effort: derived from the tag prefix). + parent_kind: RunnableKind + # For SEQUENCE: 1-based step index. For PARALLEL: branch key. + # For BRANCH: index of the body or condition predicate. + label: str + # Discriminator: "step", "key", "body", "condition". + role: str + + +def detect_runnable_kind(name: str | None) -> RunnableKind: + """Classify a runnable from the ``name`` kwarg of ``on_chain_start``. + + LangChain runtime attaches a ``name`` kwarg to every chain callback + that identifies the runnable class. 
For composed runnables the name + is a class-derived string: + + * ``RunnableSequence`` + * ``RunnableParallel<...>`` — with angle-bracketed branch keys + * ``RunnableLambda`` — or the inner function's ``__name__`` (only the literal ``RunnableLambda`` is matched below; other names fall through to ``OTHER``) + * ``RunnablePassthrough`` + * ``RunnableBranch`` + + For non-LCEL runnables (templates, parsers, models) the name is + typically the class name (``ChatPromptTemplate``, ``StrOutputParser``, + etc.) — these are classified as :attr:`RunnableKind.OTHER`. + + Args: + name: The ``name`` kwarg value from ``on_chain_start`` (may be + ``None`` if upstream LangChain failed to attach one — falls + back to ``OTHER``). + + Returns: + The detected :class:`RunnableKind`. + """ + if not name: + return RunnableKind.OTHER + + # Composite names can carry a suffix (e.g. ``RunnableParallel<...>``), + # so prefix checks are the safe primary discriminator for them. + if name.startswith("RunnableSequence"): + return RunnableKind.SEQUENCE + if name.startswith("RunnableParallel"): + return RunnableKind.PARALLEL + if name.startswith("RunnableBranch"): + return RunnableKind.BRANCH + if name == "RunnablePassthrough": + return RunnableKind.PASSTHROUGH + if name == "RunnableLambda": + return RunnableKind.LAMBDA + + return RunnableKind.OTHER + + +def parse_composition_tag(tags: list[Any] | None) -> CompositionPosition | None: + """Decode the first composition tag from a runnable's ``tags`` list. + + Children of LCEL composites carry positional tags assigned by the + parent runnable. We prefer the most-specific tag (the first one + matching a known prefix) so multi-level nesting still resolves. + + Args: + tags: The ``tags`` kwarg value from ``on_chain_start``. Typed + as ``list[Any]`` rather than ``list[str]`` because some + third-party callback handlers inject non-string entries + and we'd rather skip them than crash. 
+ + Returns: + A :class:`CompositionPosition` if any composition tag is found, + otherwise ``None``. 
+ """ + if not tags: + return None + + for raw in tags: + if not isinstance(raw, str): + continue + if raw.startswith(_TAG_SEQUENCE): + return CompositionPosition( + parent_kind=RunnableKind.SEQUENCE, + label=raw[len(_TAG_SEQUENCE) :] or "?", + role="step", + ) + if raw.startswith(_TAG_PARALLEL): + return CompositionPosition( + parent_kind=RunnableKind.PARALLEL, + label=raw[len(_TAG_PARALLEL) :] or "?", + role="key", + ) + if raw.startswith(_TAG_BRANCH_COND): + return CompositionPosition( + parent_kind=RunnableKind.BRANCH, + label=raw[len(_TAG_BRANCH_COND) :] or "?", + role="condition", + ) + if raw.startswith(_TAG_BRANCH_BODY): + return CompositionPosition( + parent_kind=RunnableKind.BRANCH, + label=raw[len(_TAG_BRANCH_BODY) :] or "?", + role="body", + ) + + return None + + +def parse_parallel_branches(name: str) -> list[str]: + """Extract the branch keys from a ``RunnableParallel`` name. + + Args: + name: A runnable name beginning with ``RunnableParallel``. + + Returns: + List of branch keys in declaration order. Empty list if no + bracketed key list is present. + """ + open_bracket = name.find("<") + close_bracket = name.rfind(">") + if open_bracket == -1 or close_bracket == -1 or close_bracket <= open_bracket + 1: + return [] + inside = name[open_bracket + 1 : close_bracket] + return [k.strip() for k in inside.split(",") if k.strip()] + + +@dataclass +class LCELNode: + """One node in the runnable execution tree.""" + + run_id: str + parent_run_id: str | None + kind: RunnableKind + # Display name as reported by LangChain (e.g. ``RunnableLambda`` or + # the inner function's ``__name__``). + name: str + # Depth from the root runnable (root = 0). + depth: int + # Composition position inside the parent (None for root). + position: CompositionPosition | None + # For PARALLEL: the declared branch keys parsed from the name. + parallel_branches: list[str] + # When the runnable started (ns since epoch). 
+ start_time_ns: int + # Children added as their on_chain_start fires under this run_id. + child_run_ids: list[str] = field(default_factory=list) + # Set by on_chain_end / on_chain_error. + end_time_ns: int | None = None + status: str = "running" # "running" | "ok" | "error" + error: str | None = None + + def as_dict(self) -> dict[str, Any]: + """Serialize for inclusion in a ``chain.composition`` payload.""" + out: dict[str, Any] = { + "run_id": self.run_id, + "parent_run_id": self.parent_run_id, + "kind": self.kind.value, + "name": self.name, + "depth": self.depth, + "status": self.status, + } + if self.position is not None: + out["position"] = { + "parent_kind": self.position.parent_kind.value, + "label": self.position.label, + "role": self.position.role, + } + if self.parallel_branches: + out["parallel_branches"] = list(self.parallel_branches) + if self.end_time_ns is not None: + out["duration_ns"] = self.end_time_ns - self.start_time_ns + if self.error is not None: + out["error"] = self.error + if self.child_run_ids: + out["child_run_ids"] = list(self.child_run_ids) + return out + + +def fingerprint_lambda(name: str, depth: int, position: CompositionPosition | None) -> str: + """Return a stable SHA-256 fingerprint for a ``RunnableLambda`` node. + + Used as the ``artifact_hash`` on the synthetic ``agent.code`` event + so the same lambda invoked twice produces the same hash. We can't + fingerprint the inner callable's source (LangChain doesn't surface + it through the callback path), so the tuple of ``(name, depth, + position)`` is the best we can offer. + + Args: + name: The lambda's reported name (function ``__name__`` if known, + otherwise ``"RunnableLambda"``). + depth: Depth from the root runnable. + position: Composition position inside the parent (may be None). + + Returns: + Hex-encoded SHA-256 digest, truncated to 16 chars for brevity in + events. 
+ """ + parts = [name, str(depth)] + if position is not None: + parts.extend([position.parent_kind.value, position.role, position.label]) + digest = hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest() + return digest[:16] + + +class LCELRunnableTracker: + """Per-handler state for the active LCEL runnable hierarchy. + + The tracker is owned by the :class:`LayerLensCallbackHandler` and + consulted from ``on_chain_start`` / ``on_chain_end`` / + ``on_chain_error``. It does NOT itself emit events — the caller + decides what to emit, gated by :class:`CaptureConfig`. The tracker + only maintains the in-memory tree and produces dict payloads. + + Hierarchy is built incrementally as callbacks fire. When the root + runnable's ``on_chain_end`` fires, the tracker yields the completed + tree ready for serialization into the ``chain.composition`` event. + """ + + def __init__(self) -> None: + # All nodes seen this session, indexed by run_id. + self._nodes: dict[str, LCELNode] = {} + # Stack of active root run_ids (supports concurrent root runnables). + self._roots: list[str] = [] + # Roots whose on_chain_start fired and for which we have NOT yet + # emitted the synthetic chain.composition event. We defer emission + # to on_chain_end so the snapshot includes the full subtree. 
+ self._pending_compositions: dict[str, LCELNode] = {} + + def is_root(self, run_id: str) -> bool: + """Whether the given run_id is currently tracked as a root runnable.""" + return run_id in self._roots and self._nodes.get(run_id) is not None + + def get_node(self, run_id: str) -> LCELNode | None: + """Retrieve a node by run_id (None if not tracked).""" + return self._nodes.get(run_id) + + def is_tracked(self, run_id: str) -> bool: + """Whether we've seen on_chain_start for this run_id.""" + return run_id in self._nodes + + def begin( + self, + *, + run_id: str, + parent_run_id: str | None, + name: str | None, + tags: list[Any] | None, + ) -> LCELNode: + """Record a new runnable beginning its execution. + + Returns the constructed :class:`LCELNode`. + + Subsequent reads via :meth:`get_node` and :meth:`is_root` will + reflect this node until :meth:`end` is called for the same + ``run_id``. + """ + kind = detect_runnable_kind(name) + position = parse_composition_tag(tags) + parallel_branches = parse_parallel_branches(name) if (kind == RunnableKind.PARALLEL and name) else [] + + # Compute depth from the parent. + parent_node = self._nodes.get(parent_run_id) if parent_run_id else None + depth = parent_node.depth + 1 if parent_node is not None else 0 + + # If parent_run_id refers to a runnable not in our tree + # (pre-existing legacy chain, or a node created by a different + # callback handler), treat this as a new root for tracking + # purposes — we still record the parent_run_id literally so + # downstream tooling can stitch traces together via run_ids. 
+ is_root = parent_node is None + + node = LCELNode( + run_id=run_id, + parent_run_id=parent_run_id, + kind=kind, + name=name or "", + depth=depth, + position=position, + parallel_branches=parallel_branches, + start_time_ns=time.time_ns(), + ) + self._nodes[run_id] = node + + if parent_node is not None: + parent_node.child_run_ids.append(run_id) + + if is_root: + self._roots.append(run_id) + self._pending_compositions[run_id] = node + + return node + + def end(self, run_id: str, *, error: str | None = None) -> LCELNode | None: + """Record a runnable completing. + + Args: + run_id: The runnable's run_id. + error: Error message if completion was via ``on_chain_error``. + + Returns: + The completed node (with ``end_time_ns`` and ``status`` set), + or None if the run_id was not tracked. + """ + node = self._nodes.get(run_id) + if node is None: + return None + node.end_time_ns = time.time_ns() + if error is not None: + node.status = "error" + node.error = error + else: + node.status = "ok" + return node + + def consume_root_completion(self, run_id: str) -> LCELNode | None: + """If ``run_id`` is a completed root, pop and return its node. + + Returns None if the run_id is not a root or has not completed. + + After consumption, the root's subtree remains in the tracker + until :meth:`reset` is called — keeping the data available for + the caller's :meth:`composition_payload` call. + """ + if run_id not in self._roots: + return None + node = self._nodes.get(run_id) + if node is None or node.end_time_ns is None: + return None + self._pending_compositions.pop(run_id, None) + return node + + def composition_payload(self, root_run_id: str) -> dict[str, Any] | None: + """Build the ``chain.composition`` payload for a completed root. + + Returns the full subtree (root + descendants) as a flat node list + plus an aggregate summary. Returns ``None`` if the run_id is not + a tracked root. 
+ """ + root = self._nodes.get(root_run_id) + if root is None or root_run_id not in self._roots: + return None + + # Depth-first walk of the subtree using an explicit stack. + nodes: list[dict[str, Any]] = [] + stack = [root_run_id] + seen: set[str] = set() + kind_counts: dict[str, int] = {} + max_depth = 0 + + while stack: + current = stack.pop() + if current in seen: + continue + seen.add(current) + n = self._nodes.get(current) + if n is None: + continue + nodes.append(n.as_dict()) + kind_counts[n.kind.value] = kind_counts.get(n.kind.value, 0) + 1 + if n.depth > max_depth: + max_depth = n.depth + stack.extend(n.child_run_ids) + + return { + "root_run_id": root_run_id, + "root_kind": root.kind.value, + "root_name": root.name, + "node_count": len(nodes), + "max_depth": max_depth, + "kind_counts": kind_counts, + "nodes": nodes, + "status": root.status, + } + + def reset(self) -> None: + """Drop all tracked state. + + Called from ``LayerLensCallbackHandler.disconnect`` and from + tests that want a fresh tree. + """ + self._nodes.clear() + self._roots.clear() + self._pending_compositions.clear() + + # --- Convenience helpers consumed by callbacks.py ------------------- + + def runnable_input_payload(self, node: LCELNode, inputs: Any) -> dict[str, Any]: + """Build the ``agent.input`` payload for a runnable's start. + + Includes the composition metadata so dashboards can render the + runnable's role inside its parent (step number, branch key, + condition index). The actual ``inputs`` value is included as-is + and subject to the caller's privacy/truncation policy upstream.
+ """ + payload: dict[str, Any] = { + "run_id": node.run_id, + "parent_run_id": node.parent_run_id, + "runnable": { + "kind": node.kind.value, + "name": node.name, + "depth": node.depth, + }, + "input": inputs, + } + if node.position is not None: + payload["runnable"]["position"] = { + "parent_kind": node.position.parent_kind.value, + "label": node.position.label, + "role": node.position.role, + } + if node.parallel_branches: + payload["runnable"]["parallel_branches"] = list(node.parallel_branches) + if node.kind == RunnableKind.LAMBDA: + payload["runnable"]["fingerprint"] = fingerprint_lambda( + node.name, node.depth, node.position + ) + return payload + + def runnable_code_payload(self, node: LCELNode) -> dict[str, Any]: + """Build the ``agent.code`` payload for a completed runnable step. + + Used by the spec's L2 ``AgentCodeEvent`` mapping for LCEL: every + intermediate runnable in the tree emits one event so the + pipeline DAG can be reconstructed in the UI. + """ + duration_ns = ( + (node.end_time_ns - node.start_time_ns) if node.end_time_ns is not None else None + ) + payload: dict[str, Any] = { + "run_id": node.run_id, + "parent_run_id": node.parent_run_id, + "kind": node.kind.value, + "name": node.name, + "depth": node.depth, + "status": node.status, + "duration_ns": duration_ns, + } + if node.position is not None: + payload["position"] = { + "parent_kind": node.position.parent_kind.value, + "label": node.position.label, + "role": node.position.role, + } + if node.parallel_branches: + payload["parallel_branches"] = list(node.parallel_branches) + if node.kind == RunnableKind.PASSTHROUGH: + payload["passthrough"] = True + if node.kind == RunnableKind.LAMBDA: + payload["fingerprint"] = fingerprint_lambda(node.name, node.depth, node.position) + if node.error is not None: + payload["error"] = node.error + return payload + + +__all__ = [ + "RunnableKind", + "CompositionPosition", + "LCELNode", + "LCELRunnableTracker", + "detect_runnable_kind", + 
"parse_composition_tag", + "parse_parallel_branches", + "fingerprint_lambda", +] diff --git a/tests/instrument/adapters/frameworks/langchain/__init__.py b/tests/instrument/adapters/frameworks/langchain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/instrument/adapters/frameworks/langchain/test_lcel.py b/tests/instrument/adapters/frameworks/langchain/test_lcel.py new file mode 100644 index 00000000..6736a3d4 --- /dev/null +++ b/tests/instrument/adapters/frameworks/langchain/test_lcel.py @@ -0,0 +1,716 @@ +"""Tests for LangChain LCEL (LangChain Expression Language) tracing. + +Covers spec 04b §1 weakness #4 and §4 (LCEL Support): + +* Detection of all five LCEL primitives from the ``name`` kwarg +* Tag parsing for composition position (``seq:step:N``, ``map:key:K``, + ``branch:N``, ``condition:N``) +* The :class:`LCELRunnableTracker` state machine: begin/end, hierarchy, + root detection, depth, status transitions, payload generation +* Synthetic ``chain.composition`` event emission via ``agent.code`` +* Per-runnable ``agent.input`` / ``agent.output`` / ``agent.code`` events +* Nested composition correctness (sequence-in-parallel-in-sequence) +* Error path handling +* No regression in LangGraph node attribution + +These tests run without invoking the upstream LangChain runtime — +callbacks are driven directly with the same kwargs LangChain would +pass. This keeps the test suite hermetic and fast (~50ms total). 
+""" + +from __future__ import annotations + +import uuid +from typing import Any + +from layerlens.instrument.adapters._base.capture import CaptureConfig +from layerlens.instrument.adapters.frameworks.langchain import ( + LCELNode, + RunnableKind, + CompositionPosition, + LCELRunnableTracker, + LayerLensCallbackHandler, + detect_runnable_kind, + parse_composition_tag, + parse_parallel_branches, +) + + +class _RecordingStratix: + """Test double that records every emitted event.""" + + def __init__(self) -> None: + self.events: list[dict[str, Any]] = [] + + def emit(self, event_type: Any, payload: Any = None) -> None: + if isinstance(event_type, str): + self.events.append({"type": event_type, "payload": payload}) + + +def _new_id() -> uuid.UUID: + return uuid.uuid4() + + +def _full_capture() -> CaptureConfig: + return CaptureConfig.full() + + +# --------------------------------------------------------------------------- +# detect_runnable_kind +# --------------------------------------------------------------------------- + + +class TestDetectRunnableKind: + def test_sequence(self) -> None: + assert detect_runnable_kind("RunnableSequence") == RunnableKind.SEQUENCE + + def test_parallel_with_keys(self) -> None: + assert detect_runnable_kind("RunnableParallel<context,question>") == RunnableKind.PARALLEL + + def test_parallel_no_keys(self) -> None: + assert detect_runnable_kind("RunnableParallel") == RunnableKind.PARALLEL + + def test_lambda(self) -> None: + assert detect_runnable_kind("RunnableLambda") == RunnableKind.LAMBDA + + def test_passthrough(self) -> None: + assert detect_runnable_kind("RunnablePassthrough") == RunnableKind.PASSTHROUGH + + def test_branch(self) -> None: + assert detect_runnable_kind("RunnableBranch") == RunnableKind.BRANCH + + def test_other_classifies_non_lcel(self) -> None: + assert detect_runnable_kind("ChatOpenAI") == RunnableKind.OTHER + assert detect_runnable_kind("StrOutputParser") == RunnableKind.OTHER + # User-supplied lambda __name__ is NOT classified 
as LAMBDA from + # the name alone — the parent's ``map:key:`` / ``seq:step:`` tag + # is what tells us it's an LCEL child. Detection alone returns + # OTHER for arbitrary callable names; classification as a lambda + # happens only via the ``RunnableLambda`` literal name string. + assert detect_runnable_kind("my_func") == RunnableKind.OTHER + + def test_empty_or_none(self) -> None: + assert detect_runnable_kind(None) == RunnableKind.OTHER + assert detect_runnable_kind("") == RunnableKind.OTHER + + +# --------------------------------------------------------------------------- +# parse_composition_tag +# --------------------------------------------------------------------------- + + +class TestParseCompositionTag: + def test_sequence_step_tag(self) -> None: + pos = parse_composition_tag(["seq:step:2"]) + assert pos == CompositionPosition(parent_kind=RunnableKind.SEQUENCE, label="2", role="step") + + def test_parallel_key_tag(self) -> None: + pos = parse_composition_tag(["map:key:context"]) + assert pos is not None + assert pos.parent_kind == RunnableKind.PARALLEL + assert pos.label == "context" + assert pos.role == "key" + + def test_branch_body_tag(self) -> None: + pos = parse_composition_tag(["branch:1"]) + assert pos is not None + assert pos.parent_kind == RunnableKind.BRANCH + assert pos.role == "body" + + def test_branch_condition_tag(self) -> None: + pos = parse_composition_tag(["condition:0"]) + assert pos is not None + assert pos.parent_kind == RunnableKind.BRANCH + assert pos.role == "condition" + + def test_unrelated_tags_ignored(self) -> None: + assert parse_composition_tag(["user:tag", "another"]) is None + + def test_first_known_tag_wins(self) -> None: + # Mix of unrelated + known: the known tag is decoded. 
+ pos = parse_composition_tag(["custom-tag", "seq:step:5"]) + assert pos is not None + assert pos.role == "step" + assert pos.label == "5" + + def test_empty_and_none(self) -> None: + assert parse_composition_tag(None) is None + assert parse_composition_tag([]) is None + + +# --------------------------------------------------------------------------- +# parse_parallel_branches +# --------------------------------------------------------------------------- + + +class TestParseParallelBranches: + def test_two_keys(self) -> None: + assert parse_parallel_branches("RunnableParallel<context,question>") == [ + "context", + "question", + ] + + def test_three_keys_with_whitespace(self) -> None: + assert parse_parallel_branches("RunnableParallel<a, b, c>") == ["a", "b", "c"] + + def test_no_brackets_returns_empty(self) -> None: + assert parse_parallel_branches("RunnableParallel") == [] + + def test_empty_brackets_returns_empty(self) -> None: + assert parse_parallel_branches("RunnableParallel<>") == [] + + +# --------------------------------------------------------------------------- +# LCELRunnableTracker +# --------------------------------------------------------------------------- + + +class TestLCELRunnableTracker: + def test_begin_root_marks_root(self) -> None: + t = LCELRunnableTracker() + node = t.begin(run_id="r1", parent_run_id=None, name="RunnableSequence", tags=None) + assert isinstance(node, LCELNode) + assert node.kind == RunnableKind.SEQUENCE + assert node.depth == 0 + assert t.is_root("r1") + + def test_child_under_root_records_parent_and_depth(self) -> None: + t = LCELRunnableTracker() + t.begin(run_id="r1", parent_run_id=None, name="RunnableSequence", tags=None) + child = t.begin( + run_id="r2", + parent_run_id="r1", + name="RunnableLambda", + tags=["seq:step:1"], + ) + assert child.depth == 1 + assert child.parent_run_id == "r1" + assert not t.is_root("r2") + # Parent should record the child. 
+ parent = t.get_node("r1") + assert parent is not None + assert "r2" in parent.child_run_ids + + def test_end_marks_completion(self) -> None: + t = LCELRunnableTracker() + t.begin(run_id="r1", parent_run_id=None, name="RunnableSequence", tags=None) + node = t.end("r1") + assert node is not None + assert node.status == "ok" + assert node.end_time_ns is not None + assert node.end_time_ns >= node.start_time_ns + + def test_end_with_error_marks_error(self) -> None: + t = LCELRunnableTracker() + t.begin(run_id="r1", parent_run_id=None, name="RunnableSequence", tags=None) + node = t.end("r1", error="boom") + assert node is not None + assert node.status == "error" + assert node.error == "boom" + + def test_composition_payload_for_root(self) -> None: + t = LCELRunnableTracker() + t.begin(run_id="r1", parent_run_id=None, name="RunnableSequence", tags=None) + t.begin(run_id="r2", parent_run_id="r1", name="RunnableLambda", tags=["seq:step:1"]) + t.begin(run_id="r3", parent_run_id="r1", name="RunnableLambda", tags=["seq:step:2"]) + t.end("r2") + t.end("r3") + t.end("r1") + + payload = t.composition_payload("r1") + assert payload is not None + assert payload["root_run_id"] == "r1" + assert payload["root_kind"] == "sequence" + assert payload["node_count"] == 3 + assert payload["max_depth"] == 1 + assert payload["kind_counts"] == {"sequence": 1, "lambda": 2} + assert payload["status"] == "ok" + + def test_composition_payload_none_for_non_root(self) -> None: + t = LCELRunnableTracker() + t.begin(run_id="r1", parent_run_id=None, name="RunnableSequence", tags=None) + t.begin(run_id="r2", parent_run_id="r1", name="RunnableLambda", tags=["seq:step:1"]) + # r2 is not a root. 
+ assert t.composition_payload("r2") is None + + def test_reset_clears_state(self) -> None: + t = LCELRunnableTracker() + t.begin(run_id="r1", parent_run_id=None, name="RunnableSequence", tags=None) + t.reset() + assert t.get_node("r1") is None + assert not t.is_root("r1") + + def test_input_payload_includes_composition(self) -> None: + t = LCELRunnableTracker() + node = t.begin( + run_id="r1", + parent_run_id="parent", + name="RunnableLambda", + tags=["seq:step:3"], + ) + payload = t.runnable_input_payload(node, {"x": 1}) + assert payload["runnable"]["kind"] == "lambda" + assert payload["runnable"]["depth"] == 0 # parent isn't tracked, so root + assert payload["runnable"]["position"]["role"] == "step" + assert payload["runnable"]["position"]["label"] == "3" + assert "fingerprint" in payload["runnable"] + assert payload["input"] == {"x": 1} + + def test_parallel_branches_in_input_payload(self) -> None: + t = LCELRunnableTracker() + node = t.begin( + run_id="r1", + parent_run_id=None, + name="RunnableParallel<context,question>", + tags=None, + ) + payload = t.runnable_input_payload(node, "input") + assert payload["runnable"]["parallel_branches"] == ["context", "question"] + + def test_code_payload_marks_passthrough(self) -> None: + t = LCELRunnableTracker() + node = t.begin(run_id="r1", parent_run_id=None, name="RunnablePassthrough", tags=None) + t.end("r1") + payload = t.runnable_code_payload(node) + assert payload["passthrough"] is True + assert payload["kind"] == "passthrough" + + def test_consume_root_completion_only_after_end(self) -> None: + t = LCELRunnableTracker() + t.begin(run_id="r1", parent_run_id=None, name="RunnableSequence", tags=None) + # Before end: + assert t.consume_root_completion("r1") is None + t.end("r1") + # After end: + assert t.consume_root_completion("r1") is not None + # Idempotent (still returns the node since data persists until reset). 
+ assert t.consume_root_completion("r1") is not None + + +# --------------------------------------------------------------------------- +# Callback integration — drive the handler with synthetic chain events +# --------------------------------------------------------------------------- + + +class TestCallbackIntegration: + def test_sequence_emits_input_output_and_composition(self) -> None: + stratix = _RecordingStratix() + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture()) + handler.connect() + + root_id = _new_id() + step1_id = _new_id() + step2_id = _new_id() + + handler.on_chain_start({}, {"q": "hi"}, run_id=root_id, name="RunnableSequence") + handler.on_chain_start( + {}, + {"q": "hi"}, + run_id=step1_id, + parent_run_id=root_id, + name="RunnableLambda", + tags=["seq:step:1"], + ) + handler.on_chain_end({"out": 1}, run_id=step1_id, parent_run_id=root_id) + handler.on_chain_start( + {}, + {"out": 1}, + run_id=step2_id, + parent_run_id=root_id, + name="RunnableLambda", + tags=["seq:step:2"], + ) + handler.on_chain_end({"out": 2}, run_id=step2_id, parent_run_id=root_id) + handler.on_chain_end({"out": 2}, run_id=root_id) + + types = [e["type"] for e in stratix.events] + # Three agent.input (root + two children) — three agent.output. + assert types.count("agent.input") == 3 + assert types.count("agent.output") == 3 + # Three agent.code (per runnable) plus one composition snapshot. 
+ assert types.count("agent.code") == 4 + composition = next( + e for e in stratix.events if e["type"] == "agent.code" and e["payload"].get("kind") == "chain.composition" + ) + comp = composition["payload"]["composition"] + assert comp["root_kind"] == "sequence" + assert comp["node_count"] == 3 + + def test_parallel_records_branches(self) -> None: + stratix = _RecordingStratix() + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture()) + handler.connect() + + root_id = _new_id() + a_id = _new_id() + b_id = _new_id() + + handler.on_chain_start( + {}, + {"x": 1}, + run_id=root_id, + name="RunnableParallel<context,question>", + ) + handler.on_chain_start( + {}, + {"x": 1}, + run_id=a_id, + parent_run_id=root_id, + name="RunnableLambda", + tags=["map:key:context"], + ) + handler.on_chain_end({"context": "C"}, run_id=a_id, parent_run_id=root_id) + handler.on_chain_start( + {}, + {"x": 1}, + run_id=b_id, + parent_run_id=root_id, + name="RunnableLambda", + tags=["map:key:question"], + ) + handler.on_chain_end({"question": "Q"}, run_id=b_id, parent_run_id=root_id) + handler.on_chain_end({"context": "C", "question": "Q"}, run_id=root_id) + + # Find the root agent.input event and verify it carries the branch keys. + root_inputs = [ + e + for e in stratix.events + if e["type"] == "agent.input" + and e["payload"].get("runnable", {}).get("kind") == "parallel" + ] + assert len(root_inputs) == 1 + assert root_inputs[0]["payload"]["runnable"]["parallel_branches"] == ["context", "question"] + + # Composition event captures both children. 
+ composition = next( + e for e in stratix.events if e["type"] == "agent.code" and e["payload"].get("kind") == "chain.composition" + ) + comp = composition["payload"]["composition"] + assert comp["kind_counts"]["lambda"] == 2 + + def test_lambda_fingerprint_present(self) -> None: + stratix = _RecordingStratix() + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture()) + handler.connect() + + root_id = _new_id() + handler.on_chain_start({}, "in", run_id=root_id, name="RunnableLambda") + handler.on_chain_end("out", run_id=root_id) + + agent_inputs = [e for e in stratix.events if e["type"] == "agent.input"] + assert any("fingerprint" in e["payload"]["runnable"] for e in agent_inputs) + agent_codes = [ + e + for e in stratix.events + if e["type"] == "agent.code" and e["payload"].get("kind") != "chain.composition" + ] + assert any("fingerprint" in e["payload"] for e in agent_codes) + + def test_passthrough_marked_in_code_event(self) -> None: + stratix = _RecordingStratix() + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture()) + handler.connect() + + root_id = _new_id() + handler.on_chain_start({}, {"k": "v"}, run_id=root_id, name="RunnablePassthrough") + handler.on_chain_end({"k": "v"}, run_id=root_id) + + passthrough_codes = [ + e + for e in stratix.events + if e["type"] == "agent.code" and e["payload"].get("passthrough") + ] + assert len(passthrough_codes) == 1 + + def test_branch_records_condition_and_body(self) -> None: + stratix = _RecordingStratix() + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture()) + handler.connect() + + root_id = _new_id() + cond_id = _new_id() + body_id = _new_id() + + handler.on_chain_start({}, 10, run_id=root_id, name="RunnableBranch") + handler.on_chain_start( + {}, + 10, + run_id=cond_id, + parent_run_id=root_id, + name="RunnableLambda", + tags=["condition:0"], + ) + handler.on_chain_end(True, run_id=cond_id, parent_run_id=root_id) + 
handler.on_chain_start( + {}, + 10, + run_id=body_id, + parent_run_id=root_id, + name="RunnableLambda", + tags=["branch:0"], + ) + handler.on_chain_end(20, run_id=body_id, parent_run_id=root_id) + handler.on_chain_end(20, run_id=root_id) + + positions = [ + e["payload"]["runnable"]["position"] + for e in stratix.events + if e["type"] == "agent.input" + and e["payload"].get("runnable", {}).get("position") is not None + ] + roles = {p["role"] for p in positions} + assert roles == {"condition", "body"} + + def test_nested_composition_preserves_hierarchy(self) -> None: + """A sequence containing a parallel containing a sequence — the + composition payload's max_depth must reflect the full nesting. + """ + stratix = _RecordingStratix() + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture()) + handler.connect() + + ids = {key: _new_id() for key in ("root", "par", "a", "inner_seq", "inner_a", "inner_b", "b")} + + # outer sequence (root) + handler.on_chain_start({}, "x", run_id=ids["root"], name="RunnableSequence") + + # step 1: a parallel + handler.on_chain_start( + {}, + "x", + run_id=ids["par"], + parent_run_id=ids["root"], + name="RunnableParallel", + tags=["seq:step:1"], + ) + + # branch a: just a lambda + handler.on_chain_start( + {}, + "x", + run_id=ids["a"], + parent_run_id=ids["par"], + name="RunnableLambda", + tags=["map:key:a"], + ) + handler.on_chain_end("A", run_id=ids["a"], parent_run_id=ids["par"]) + + # branch b: a sequence containing two lambdas + handler.on_chain_start( + {}, + "x", + run_id=ids["inner_seq"], + parent_run_id=ids["par"], + name="RunnableSequence", + tags=["map:key:b"], + ) + handler.on_chain_start( + {}, + "x", + run_id=ids["inner_a"], + parent_run_id=ids["inner_seq"], + name="RunnableLambda", + tags=["seq:step:1"], + ) + handler.on_chain_end("ia", run_id=ids["inner_a"], parent_run_id=ids["inner_seq"]) + handler.on_chain_start( + {}, + "ia", + run_id=ids["inner_b"], + parent_run_id=ids["inner_seq"], + 
name="RunnableLambda", + tags=["seq:step:2"], + ) + handler.on_chain_end("ib", run_id=ids["inner_b"], parent_run_id=ids["inner_seq"]) + handler.on_chain_end({"a": "A", "b": "ib"}, run_id=ids["inner_seq"], parent_run_id=ids["par"]) + + # close parallel + handler.on_chain_end({"a": "A", "b": "ib"}, run_id=ids["par"], parent_run_id=ids["root"]) + + # step 2: passthrough + handler.on_chain_start( + {}, + {"a": "A", "b": "ib"}, + run_id=ids["b"], + parent_run_id=ids["root"], + name="RunnablePassthrough", + tags=["seq:step:2"], + ) + handler.on_chain_end({"a": "A", "b": "ib"}, run_id=ids["b"], parent_run_id=ids["root"]) + + handler.on_chain_end({"a": "A", "b": "ib"}, run_id=ids["root"]) + + composition = next( + e for e in stratix.events if e["type"] == "agent.code" and e["payload"].get("kind") == "chain.composition" + ) + comp = composition["payload"]["composition"] + # root(0) → par(1) → inner_seq(2) → inner_a(3)/inner_b(3) + assert comp["max_depth"] == 3 + assert comp["node_count"] == 7 + # inner_seq is itself a sequence — counts should reflect both seqs. + assert comp["kind_counts"]["sequence"] == 2 + assert comp["kind_counts"]["parallel"] == 1 + assert comp["kind_counts"]["passthrough"] == 1 + assert comp["kind_counts"]["lambda"] == 3 + + def test_error_in_runnable_records_error_in_composition(self) -> None: + stratix = _RecordingStratix() + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture()) + handler.connect() + + root_id = _new_id() + bad_id = _new_id() + + handler.on_chain_start({}, "x", run_id=root_id, name="RunnableSequence") + handler.on_chain_start( + {}, + "x", + run_id=bad_id, + parent_run_id=root_id, + name="RunnableLambda", + tags=["seq:step:1"], + ) + handler.on_chain_error(ValueError("kaboom"), run_id=bad_id, parent_run_id=root_id) + handler.on_chain_error(ValueError("kaboom"), run_id=root_id) + + # Per-step agent.code with error. 
+ codes_with_error = [ + e + for e in stratix.events + if e["type"] == "agent.code" + and e["payload"].get("kind") != "chain.composition" + and e["payload"].get("error") + ] + assert len(codes_with_error) == 2 # one per failed runnable + # Composition event reflects root error status. + composition = next( + e for e in stratix.events if e["type"] == "agent.code" and e["payload"].get("kind") == "chain.composition" + ) + assert composition["payload"]["composition"]["status"] == "error" + + def test_l2_disabled_drops_code_and_composition(self) -> None: + stratix = _RecordingStratix() + # L2 off → no agent.code events. + cfg = CaptureConfig( + l1_agent_io=True, + l2_agent_code=False, + l3_model_metadata=True, + l5a_tool_calls=True, + ) + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=cfg) + handler.connect() + + root_id = _new_id() + handler.on_chain_start({}, "x", run_id=root_id, name="RunnableSequence") + handler.on_chain_end("y", run_id=root_id) + + types = [e["type"] for e in stratix.events] + assert "agent.input" in types + assert "agent.output" in types + # No agent.code at all when L2 is off. + assert "agent.code" not in types + + def test_langgraph_node_path_unaffected_by_lcel(self) -> None: + """LangGraph node attribution must continue working — LCEL + tracking only kicks in when no langgraph_node marker is present. + """ + stratix = _RecordingStratix() + handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture()) + handler.connect() + + node_id = _new_id() + handler.on_chain_start( + {}, + {"q": "hi"}, + run_id=node_id, + metadata={"langgraph_node": "research", "langgraph_step": 1}, + # name is also set by LangGraph but the langgraph marker + # MUST take precedence — LCEL tracking should not engage. + name="RunnableSequence", + ) + handler.on_chain_end({"out": "ok"}, run_id=node_id) + + # Should look like the existing langgraph path — node_name in + # the agent.input payload, NOT a runnable kind. 
+        agent_inputs = [e for e in stratix.events if e["type"] == "agent.input"]
+        assert len(agent_inputs) == 1
+        assert agent_inputs[0]["payload"]["node_name"] == "research"
+        assert "runnable" not in agent_inputs[0]["payload"]
+        # No composition snapshot (this was a langgraph node, not LCEL).
+        composition_events = [
+            e
+            for e in stratix.events
+            if e["type"] == "agent.code" and e["payload"].get("kind") == "chain.composition"
+        ]
+        assert composition_events == []
+
+    def test_disconnect_resets_lcel_tracker(self) -> None:
+        handler = LayerLensCallbackHandler(stratix=_RecordingStratix(), capture_config=_full_capture())
+        handler.connect()
+        handler.on_chain_start({}, "x", run_id=_new_id(), name="RunnableSequence")
+        # Tracker has at least one node prior to disconnect.
+        assert any(handler._lcel.get_node(rid) is not None for rid in handler._lcel._nodes)
+        handler.disconnect()
+        assert handler._lcel._nodes == {}
+
+    def test_runnable_config_propagates_via_kwargs(self) -> None:
+        """RunnableConfig is opaque to the adapter — propagation is
+        validated by ensuring callbacks fire with the same run_id under
+        the same parent_run_id when a config is in play. We simulate
+        that here by driving with explicit ids and asserting hierarchy.
+        """
+        stratix = _RecordingStratix()
+        handler = LayerLensCallbackHandler(stratix=stratix, capture_config=_full_capture())
+        handler.connect()
+
+        root_id = _new_id()
+        child_id = _new_id()
+
+        # The "config" RunnableConfig would surface as metadata + tags
+        # on the child — we validate the adapter forwards both.
+        handler.on_chain_start({}, "x", run_id=root_id, name="RunnableSequence")
+        handler.on_chain_start(
+            {},
+            "x",
+            run_id=child_id,
+            parent_run_id=root_id,
+            name="RunnableLambda",
+            tags=["seq:step:1", "user:tag:foo"],
+            metadata={"user_metadata_key": "value"},
+        )
+        handler.on_chain_end("y", run_id=child_id, parent_run_id=root_id)
+        handler.on_chain_end("y", run_id=root_id)
+
+        composition = next(
+            e for e in stratix.events if e["type"] == "agent.code" and e["payload"].get("kind") == "chain.composition"
+        )
+        # The composition should still see the child as a depth-1 lambda
+        # inside the root sequence — the user-supplied tag must NOT
+        # confuse position parsing.
+        nodes = composition["payload"]["composition"]["nodes"]
+        child_node = next(n for n in nodes if n["depth"] == 1)
+        assert child_node["kind"] == "lambda"
+        assert child_node["position"]["role"] == "step"
+        assert child_node["position"]["label"] == "1"
+
+
+# ---------------------------------------------------------------------------
+# fingerprint_lambda determinism
+# ---------------------------------------------------------------------------
+
+
+class TestFingerprintLambda:
+    def test_same_inputs_same_hash(self) -> None:
+        from layerlens.instrument.adapters.frameworks.langchain import fingerprint_lambda
+
+        a = fingerprint_lambda("my_func", 2, CompositionPosition(RunnableKind.SEQUENCE, "1", "step"))
+        b = fingerprint_lambda("my_func", 2, CompositionPosition(RunnableKind.SEQUENCE, "1", "step"))
+        assert a == b
+        assert len(a) == 16
+
+    def test_different_inputs_different_hash(self) -> None:
+        from layerlens.instrument.adapters.frameworks.langchain import fingerprint_lambda
+
+        a = fingerprint_lambda("foo", 1, None)
+        b = fingerprint_lambda("bar", 1, None)
+        assert a != b