diff --git a/docs/adapters/frameworks-langfuse.md b/docs/adapters/frameworks-langfuse.md new file mode 100644 index 00000000..e7562a9c --- /dev/null +++ b/docs/adapters/frameworks-langfuse.md @@ -0,0 +1,321 @@ +# Langfuse framework adapter + +`layerlens.instrument.adapters.frameworks.langfuse.LangfuseAdapter` is a +**bidirectional trace import / export pipeline** between LayerLens and a +[Langfuse](https://langfuse.com) instance (cloud or self-hosted). + +Unlike the runtime-wrapping framework adapters (LangChain, LangGraph, +CrewAI, AutoGen, etc.), the Langfuse adapter does not monkey-patch any +in-process Python SDK. Langfuse is itself a remote observability +backend, so the adapter speaks to its REST API in batch: + +* **Import**: pull historical Langfuse traces (with their nested + observations) into LayerLens canonical events. Useful for backfilling + an existing Langfuse-instrumented agent into LayerLens for replay, + scoring, or cost analytics without re-running the agent. +* **Export**: push LayerLens canonical events back into Langfuse so a + team that already standardised on Langfuse for inspection can + continue using their existing Langfuse UI workflow. +* **Bidirectional**: run import + export in a single sync cycle with + cursor-based incremental progress and per-trace loop prevention. + +## Install + +```bash +pip install 'layerlens[langfuse-importer]' +``` + +The `[langfuse-importer]` extra is **empty** — the adapter talks to a +remote REST surface via `urllib` from the Python stdlib and pulls no +additional dependencies. The extra is declared anyway so the adapter +remains explicitly opt-in and the install set for `pip install +layerlens` is unchanged. + +You will need a Langfuse instance and a key pair from +[**Langfuse → Settings → Project Settings → API Keys**](https://langfuse.com/docs/get-started): + +| Setting | Where to find it | +|----------------|-------------------------------------------------------| +| `public_key` | Langfuse UI → Project Settings → API Keys → "pk-lf-…" | +| `secret_key` | Langfuse UI → Project Settings → API Keys → "sk-lf-…" | +| `host` | Your Langfuse base URL (default `https://cloud.langfuse.com`) | + +The adapter authenticates via HTTP **Basic auth** +(`base64(public_key:secret_key)`) — there is no separate OAuth flow. + +## Quick start + +```python +from datetime import datetime, timezone + +from layerlens.instrument.adapters.frameworks.langfuse import LangfuseAdapter +from layerlens.instrument.adapters.frameworks.langfuse.config import ( + LangfuseConfig, + SyncDirection, +) + +config = LangfuseConfig( + public_key="pk-lf-...", + secret_key="sk-lf-...", + host="https://cloud.langfuse.com", + mode=SyncDirection.IMPORT, # or EXPORT, or BIDIRECTIONAL + page_size=50, + max_retries=3, +) + +adapter = LangfuseAdapter(stratix=my_stratix_client, config=config) +adapter.connect() # runs a health check against /api/public/health + +# --- Import: backfill traces from Langfuse into LayerLens --- +result = adapter.import_traces( + since=datetime(2026, 1, 1, tzinfo=timezone.utc), + tags=["production"], + limit=200, +) +print( + f"Imported {result.imported_count} traces, " + f"skipped {result.skipped_count}, failed {result.failed_count}" +) + +# --- Export: push LayerLens canonical events back to Langfuse --- +events_by_trace = { + "trace-001": [ + {"event_type": "agent.input", "trace_id": "trace-001", "timestamp": "...", "sequence_id": 0, "payload": {...}}, + {"event_type": "model.invoke", "trace_id": "trace-001", "timestamp": "...", "sequence_id": 1, "payload": {...}}, + {"event_type": "agent.output", "trace_id": "trace-001", "timestamp": "...", "sequence_id": 2, "payload": {...}}, + ], +} +export_result = adapter.export_traces(events_by_trace=events_by_trace) +print(f"Exported {export_result.exported_count} traces to Langfuse") + +adapter.disconnect() +``` + +A fully runnable, mocked end-to-end sample lives in +[`samples/instrument/langfuse/`](../../samples/instrument/langfuse/). + +## Pipeline architecture + +The adapter is split into per-concern modules under +`layerlens.instrument.adapters.frameworks.langfuse`: + +| Module | Purpose | +|-----------------|----------------------------------------------------------| +| `lifecycle.py` | `LangfuseAdapter` — `BaseAdapter` subclass, public API | +| `config.py` | `LangfuseConfig`, `SyncState`, `SyncResult`, enums | +| `client.py` | `LangfuseAPIClient` — stdlib-only HTTP client | +| `importer.py` | `TraceImporter` — Langfuse → LayerLens batch backfill | +| `exporter.py` | `TraceExporter` — LayerLens → Langfuse batch push | +| `mapper.py` | Bidirectional trace ↔ canonical event mapping | +| `sync.py` | `BidirectionalSync` — cursor-tracked combined cycle | + +### Import pipeline (Langfuse → LayerLens) + +``` +LangfuseAPIClient.get_all_traces(...) # paginated /api/public/traces + │ + ▼ (per trace summary) +TraceImporter._is_quarantined / dedup / loop-prevention checks + │ + ▼ +LangfuseAPIClient.get_trace(trace_id) # full trace + observations + │ + ▼ +LangfuseToLayerLensMapper.map_trace(...) # → list[canonical event dict] + │ + ▼ +stratix.emit(event_type, payload) # injected into LayerLens pipeline + │ + ▼ +SyncState.record_import(...) # cursor + dedup set updated +``` + +Loop prevention: traces previously exported by LayerLens are tagged with +both `layerlens-exported` (canonical) and `stratix-exported` (legacy +alias for backward compatibility). The importer skips any trace +carrying either tag so an export → import round-trip does not double- +ingest. + +### Export pipeline (LayerLens → Langfuse) + +``` +LayerLensToLangfuseMapper.map_events_to_trace(events, trace_id) + │ + ▼ +LangfuseAPIClient.ingestion_batch([...]) # single /api/public/ingestion call + │ (one trace-create + N {generation,span}-create events) + ▼ +SyncState.record_export(trace_id, datetime.now(UTC)) +``` + +Quarantine + retries: the `LangfuseAPIClient` retries `429` and `5xx` +with capped exponential backoff (1s → 16s, configurable via +`LangfuseConfig.max_retries`). Per-trace failures are tracked in +`SyncState.quarantined_trace_ids`; traces that fail three times are +removed from subsequent import cycles until `clear_quarantine()` is +called. + +## Event mapping + +### Langfuse → LayerLens + +| Langfuse | LayerLens (canonical) | +|----------------------------------------------|------------------------------------| +| `trace.input` | `agent.input` (L1) | +| `trace.output` | `agent.output` (L1) | +| `trace.metadata` | `environment.config` (L4a) | +| `observation` type=`GENERATION` | `model.invoke` (L3) | +| `observation` type=`GENERATION` + `totalCost`| `cost.record` (cross) | +| `observation` type=`SPAN` (tool/metadata) | `tool.call` (L5a) | +| `observation` type=`SPAN` (other) | `agent.code` (L2) | +| `observation` `level`=`ERROR`/`WARNING` | `policy.violation` (cross) | + +Trace-level Langfuse metadata (`sessionId`, `userId`, `tags`, `scores`) +is propagated onto every emitted canonical event under the +`metadata.langfuse_*` namespace. + +### LayerLens → Langfuse + +| LayerLens (canonical) | Langfuse | +|------------------------|---------------------------------------------| +| `agent.input` | `trace.input` (+ optionally `trace.name`) | +| `agent.output` | `trace.output` | +| `environment.config` | `trace.metadata.environment_config` | +| `model.invoke` | observation type=`GENERATION` | +| `cost.record` | `totalCost` attached to matching generation | +| `tool.call` | observation type=`SPAN`, `metadata.type=TOOL` | +| `agent.code` | observation type=`SPAN` | +| `agent.handoff` | observation type=`SPAN`, `metadata.type=HANDOFF` | +| `agent.state.change` | observation type=`SPAN`, `metadata.type=STATE_CHANGE` | + +Exported traces are tagged with both `layerlens-exported` and +`stratix-exported` for loop prevention (see above). + +## Capability matrix + +| Capability | Supported | Notes | +|--------------------------------------|--------------------|-----------------------------------------------------------------------| +| `AdapterCapability.TRACE_TOOLS` | yes | Langfuse `SPAN` observations (incl. tool spans) → `tool.call` | +| `AdapterCapability.TRACE_MODELS` | yes | Langfuse `GENERATION` observations → `model.invoke` + `cost.record` | +| `AdapterCapability.REPLAY` | yes | `serialize_for_replay()` returns a `ReplayableTrace` of emitted events | +| Real-time runtime hooks | no | Langfuse is a remote backend — there is no in-process SDK to patch | +| Incremental sync (cursor) | yes | `SyncState.last_import_cursor` / `last_export_cursor` | +| Per-trace deduplication | yes | `SyncState.imported_trace_ids` / `exported_trace_ids` | +| Per-trace quarantine after N fails | yes | Default 3 failures; configurable via `record_failure(max_failures=…)` | +| Loop prevention (export → import) | yes | `layerlens-exported` / `stratix-exported` tag round-trip | +| Tag / project filtering on import | yes | `LangfuseConfig.tag_filter` and `LangfuseConfig.project_filter` | +| Time-window filtering on import | yes | `since` parameter on `import_traces()` and `LangfuseConfig.since` | +| Conflict resolution | last-write-wins | Configurable via `LangfuseConfig.conflict_strategy` (also `MANUAL`) | +| Auto-retry on `429` / `5xx` | yes | Capped exponential backoff (1s → 16s) | +| `pip install layerlens` blast radius | zero | `[langfuse-importer]` extra is empty (stdlib only) | + +## Version compatibility + +| Component | Supported | Notes | +|--------------------------|------------------------------------|--------------------------------------------------------| +| Python | 3.9, 3.10, 3.11, 3.12, 3.13 | Same as the rest of `layerlens.instrument` | +| Langfuse server / cloud | API revision matching `/api/public/*` (Langfuse v2 / v3 generation) | Adapter calls only the **public** REST endpoints so it is forward-compatible across minor server versions | +| Pydantic | **v2 only** | `LangfuseConfig` uses `field_validator`; importing the adapter under Pydantic v1 raises a clear error from `pydantic_compat.requires_pydantic(...)` | +| Langfuse Python SDK | not required | The adapter does **not** depend on the `langfuse` Python package — it is a pure HTTP client | +| Networking | `urllib.request` (stdlib) | No `requests` / `httpx` dependency | + +`LangfuseAdapter` exposes its Pydantic compatibility hint as +`requires_pydantic = PydanticCompat.V2_ONLY` so the manifest emitter +can surface this in the atlas-app catalog UI before customers pin an +incompatible runtime. + +## Capture config + +The adapter respects the standard `CaptureConfig` filter from +`layerlens.instrument.adapters._base`: + +```python +from layerlens.instrument.adapters._base import CaptureConfig + +# Recommended for compliance backfills. +adapter = LangfuseAdapter( + config=config, + capture_config=CaptureConfig.standard(), +) + +# Hand-rolled — keep tokens / costs but redact prompt and completion content. +adapter = LangfuseAdapter( + config=config, + capture_config=CaptureConfig( + l3_model_metadata=True, + capture_content=False, + ), +) +``` + +The capture config is applied at the `BaseAdapter` emission layer, not +inside the Langfuse mapper, so the same redaction rules apply +identically to every adapter in the suite. + +## BYOK + +The Langfuse adapter does not own any model API keys — Langfuse never +sees a model provider key. The Langfuse `public_key` / `secret_key` +pair is intended to live in the platform's BYOK store +(`byok_credentials` table — see `docs/adapters/byok.md`) once that +M1.B work ships, alongside the credentials for the runtime-wrapping +adapters. + +## Replay + +`adapter.serialize_for_replay()` returns a `ReplayableTrace` containing +every event the adapter emitted during the current process. Replay is a +re-emit operation: the adapter does not re-fetch from Langfuse during +replay. + +```python +trace = adapter.serialize_for_replay() +# trace.events == list of canonical event dicts +# trace.metadata["sync_state"] == {"imported": N, "exported": M, "quarantined": K} +``` + +## Backward compatibility + +Users coming from the `ateam` / `stratix` package layout (pre-LayerLens +rename) can keep importing the legacy class name: + +```python +from layerlens.instrument.adapters.frameworks.langfuse import STRATIXLangfuseAdapter +``` + +`STRATIXLangfuseAdapter` is resolved via a PEP 562 module-level +`__getattr__` that emits a `DeprecationWarning` and returns +`LangfuseAdapter`. The alias will be removed in v2.0; new code should +import `LangfuseAdapter` directly. + +The mapper additionally tags exported traces with **both** +`layerlens-exported` (canonical) and `stratix-exported` (legacy), and +the importer recognises **both** tags for loop prevention. This allows +a deployment to upgrade the adapter without re-importing traces that +were exported by the previous release. + +## Operational notes + +* **Rate limits.** Langfuse Cloud enforces per-project request quotas; + the client retries `429` responses with capped exponential backoff + (1s → 16s). Tune `LangfuseConfig.max_retries` (default `3`) for + long-running backfills. +* **Cursor persistence.** `SyncState` is held in memory by default. To + resume an incremental sync across process restarts, call + `adapter.sync_state.model_dump()` on shutdown and re-hydrate the + state on the next start. +* **Pagination.** `LangfuseConfig.page_size` (default `50`) controls + how many traces are fetched per `/api/public/traces` request. The + importer transparently iterates until exhausted. +* **Quarantine.** Per-trace failures are tracked in + `SyncState.quarantined_trace_ids`; after three failures the trace is + skipped on future imports. Call + `adapter.sync_state.clear_quarantine(trace_id=...)` to retry a + specific trace, or `clear_quarantine()` (no argument) to clear all + quarantined traces. + +## See also + +* [`samples/instrument/langfuse/`](../../samples/instrument/langfuse/) — runnable mocked sample +* [`docs/adapters/frameworks-langchain.md`](frameworks-langchain.md) — runtime callback adapter for LangChain +* [`docs/adapters/byok.md`](byok.md) *(planned)* — credential storage for Langfuse keys diff --git a/samples/instrument/langfuse/README.md b/samples/instrument/langfuse/README.md new file mode 100644 index 00000000..a9894d47 --- /dev/null +++ b/samples/instrument/langfuse/README.md @@ -0,0 +1,60 @@ +# Langfuse sample + +Runnable end-to-end sample for the +`layerlens.instrument.adapters.frameworks.langfuse` adapter. + +The sample is **fully mocked** — every call to the Langfuse HTTP API is +intercepted in-process. It makes no network calls and requires no +Langfuse credentials. It exists to demonstrate the adapter's API +surface and act as a smoke test that the `[langfuse-importer]` extra +installs cleanly. + +## Install + +```bash +pip install 'layerlens[langfuse-importer]' +``` + +The `[langfuse-importer]` extra is intentionally empty — the Langfuse +adapter talks to a remote REST surface and uses only `urllib` from the +Python stdlib. No additional dependencies are pulled in. + +## Run + +```bash +python -m samples.instrument.langfuse.main +``` + +You should see three labeled flows print to stdout: + +* `[import]` — Backfill two synthetic Langfuse traces (one with a + `GENERATION` observation, one with a `TOOL` `SPAN`) into LayerLens + canonical events. Prints the per-event-type histogram. +* `[export]` — Push one synthetic LayerLens trace + (`agent.input` + `model.invoke` + `cost.record` + `agent.output`) + back into Langfuse via the batch ingestion endpoint and confirms + the loop-prevention tags (`layerlens-exported`, `stratix-exported`). +* `[bidirectional]` — Run a dry-run `sync()` in `BIDIRECTIONAL` mode. + +The sample exits 0 on success. + +## Live Langfuse smoke (optional) + +If you have a Langfuse instance, set these environment variables before +running and the sample will additionally exercise a single live +`connect()` against the API after the mocked flows complete: + +```bash +export LANGFUSE_PUBLIC_KEY="pk-lf-..." +export LANGFUSE_SECRET_KEY="sk-lf-..." +export LANGFUSE_HOST="https://cloud.langfuse.com" # default +``` + +The live smoke only runs the health check that `connect()` performs; +no traces are imported or exported against the live instance. + +## See also + +`docs/adapters/frameworks-langfuse.md` — install + usage guide, +import/export pipeline architecture, capability matrix, version +compatibility, and the `STRATIX → LayerLens` deprecation aliases. diff --git a/samples/instrument/langfuse/__init__.py b/samples/instrument/langfuse/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/samples/instrument/langfuse/main.py b/samples/instrument/langfuse/main.py new file mode 100644 index 00000000..acd2d22f --- /dev/null +++ b/samples/instrument/langfuse/main.py @@ -0,0 +1,455 @@ +"""Runnable sample: drive the Langfuse adapter end-to-end. + +This sample is **fully mocked** — every call to the Langfuse HTTP API is +intercepted in-process. No real network traffic is generated and no +Langfuse credentials are required. The sample exists to demonstrate the +adapter's API surface and act as a smoke test that the +``[langfuse-importer]`` extra installs cleanly. + +Three flows are exercised end-to-end: + + 1. ``[import]`` Backfill traces from Langfuse into LayerLens. The + Langfuse REST surface is replaced with a small in-process client + that returns two synthetic traces (one with a ``GENERATION`` + observation, one with a ``TOOL`` ``SPAN``). Each Langfuse trace + is mapped into LayerLens canonical events and emitted through a + recording stratix sink. + 2. ``[export]`` Push three LayerLens events (``agent.input``, + ``model.invoke``, ``agent.output``) for a single trace back into + Langfuse via the batch ingestion endpoint. The mocked client + records the ingestion payload so the sample can print a summary. + 3. ``[bidirectional]`` Run a combined import + export cycle through + the high-level ``adapter.sync()`` API and print the merged + ``SyncResult``. + +Run:: + + pip install 'layerlens[langfuse-importer]' # extra is empty (stdlib) + python -m samples.instrument.langfuse.main + +Exits 0 on success. + +Optional environment for an additional live ``connect()`` smoke against a +real Langfuse instance (skipped when not set): + +* ``LANGFUSE_PUBLIC_KEY`` +* ``LANGFUSE_SECRET_KEY`` +* ``LANGFUSE_HOST`` — defaults to ``https://cloud.langfuse.com`` +""" + +from __future__ import annotations + +import os +import sys +from typing import Any +from datetime import datetime, timezone + +from layerlens.instrument.adapters._base import CaptureConfig +from layerlens.instrument.adapters.frameworks.langfuse import LangfuseAdapter +from layerlens.instrument.adapters.frameworks.langfuse.config import ( + SyncDirection, + LangfuseConfig, +) + +UTC = timezone.utc # 3.9 / 3.10 compat alias for ``datetime.UTC`` + + +# --------------------------------------------------------------------------- +# In-process recording sink (stand-in for HttpEventSink / OTLP) +# --------------------------------------------------------------------------- + + +class _RecordingStratix: + """Stand-in for a real LayerLens client — records every emit() call.""" + + def __init__(self) -> None: + self.events: list[tuple[str, dict[str, Any]]] = [] + + def emit(self, *args: Any, **kwargs: Any) -> None: # noqa: ARG002 + if len(args) == 2 and isinstance(args[0], str): + self.events.append((args[0], args[1])) + + +# --------------------------------------------------------------------------- +# In-process Langfuse API client mock +# --------------------------------------------------------------------------- + + +class _MockLangfuseClient: + """Mock of ``LangfuseAPIClient`` with the methods the adapter calls. + + The adapter's importer, exporter, and sync orchestrator only call a + small surface of the real client — health_check, get_all_traces, + get_trace, ingestion_batch. We implement those with in-memory + fixture data so the sample is fully offline. + """ + + def __init__(self) -> None: + # Fixture: two Langfuse traces, one with a generation, one with a tool. + self._traces: list[dict[str, Any]] = [ + { + "id": "lf-trace-001", + "name": "support-agent", + "timestamp": "2026-04-25T10:00:00Z", + "endTime": "2026-04-25T10:00:05Z", + "input": {"question": "Where is my order?"}, + "output": {"answer": "Your order shipped on 2026-04-24."}, + "tags": ["production", "tier-pro"], + "metadata": {"customer_id": "cust-42"}, + "observations": [ + { + "id": "obs-gen-001", + "type": "GENERATION", + "name": "gpt-4o-mini", + "model": "gpt-4o-mini", + "startTime": "2026-04-25T10:00:01Z", + "endTime": "2026-04-25T10:00:04Z", + "usage": { + "promptTokens": 150, + "completionTokens": 32, + "totalTokens": 182, + }, + "totalCost": 0.000273, + "modelParameters": {"temperature": 0.0, "max_tokens": 64}, + } + ], + }, + { + "id": "lf-trace-002", + "name": "tool-using-agent", + "timestamp": "2026-04-25T10:05:00Z", + "endTime": "2026-04-25T10:05:02Z", + "input": {"question": "Look up order O-123"}, + "output": {"answer": "Found order O-123 (status=shipped)."}, + "tags": ["production"], + "observations": [ + { + "id": "obs-tool-001", + "type": "SPAN", + "name": "tool_lookup_order", + "startTime": "2026-04-25T10:05:00Z", + "endTime": "2026-04-25T10:05:01Z", + "input": {"order_id": "O-123"}, + "output": {"status": "shipped"}, + "metadata": {"type": "TOOL", "tool_name": "lookup_order"}, + } + ], + }, + ] + # Records every batch ingested via ``ingestion_batch`` — used to + # verify the export path round-trips correctly. + self.ingested_batches: list[list[dict[str, Any]]] = [] + + # --- Methods the adapter actually invokes --- + + def health_check(self) -> dict[str, Any]: + return {"status": "ok"} + + def get_all_traces( + self, + limit: int = 50, # noqa: ARG002 + tags: list[str] | None = None, + from_timestamp: datetime | None = None, # noqa: ARG002 + to_timestamp: datetime | None = None, # noqa: ARG002 + ) -> list[dict[str, Any]]: + out = self._traces + if tags: + tag_set = set(tags) + out = [t for t in out if tag_set.intersection(t.get("tags", []))] + # Trim to summary-shape (the real API returns summaries, not full bodies). + return [ + {"id": t["id"], "tags": t.get("tags", []), "timestamp": t["timestamp"]} + for t in out + ] + + def get_trace(self, trace_id: str) -> dict[str, Any]: + for t in self._traces: + if t["id"] == trace_id: + return t + raise KeyError(f"Unknown trace id {trace_id!r}") + + def ingestion_batch(self, events: list[dict[str, Any]]) -> dict[str, Any]: + # Real Langfuse responds with ``{"successes": [...], "errors": [...]}``. + self.ingested_batches.append(events) + return { + "successes": [{"id": e.get("id"), "status": 201} for e in events], + "errors": [], + } + + +# --------------------------------------------------------------------------- +# Adapter wiring +# --------------------------------------------------------------------------- + + +def _build_adapter(stratix: _RecordingStratix) -> tuple[LangfuseAdapter, _MockLangfuseClient]: + """Construct a LangfuseAdapter wired to the in-process mock client. + + We construct the adapter *without* a config so ``connect()`` does + not try to stand up a real ``LangfuseAPIClient`` (which would attempt + a live health check). We then swap the mock client in directly and + re-initialise the importer / exporter / sync sub-components — this + is the same wiring path ``connect(config=...)`` would normally run. + """ + from layerlens.instrument.adapters._base.adapter import AdapterStatus + from layerlens.instrument.adapters.frameworks.langfuse.sync import BidirectionalSync + from layerlens.instrument.adapters.frameworks.langfuse.exporter import TraceExporter + from layerlens.instrument.adapters.frameworks.langfuse.importer import TraceImporter + + adapter = LangfuseAdapter( + stratix=stratix, + capture_config=CaptureConfig.full(), + ) + adapter.connect() # no-config path → HEALTHY but no client + + mock_client = _MockLangfuseClient() + adapter._client = mock_client # type: ignore[assignment] + adapter._importer = TraceImporter(mock_client, adapter._sync_state) # type: ignore[arg-type] + adapter._exporter = TraceExporter(mock_client, adapter._sync_state) # type: ignore[arg-type] + adapter._sync = BidirectionalSync( + importer=adapter._importer, + exporter=adapter._exporter, + state=adapter._sync_state, + ) + adapter._langfuse_healthy = True + adapter._status = AdapterStatus.HEALTHY + return adapter, mock_client + + +# --------------------------------------------------------------------------- +# Flow 1 — Import traces from Langfuse into LayerLens +# --------------------------------------------------------------------------- + + +def _flow_import(adapter: LangfuseAdapter, stratix: _RecordingStratix) -> int: + print("[import] importing all traces from Langfuse...") + result = adapter.import_traces() + if result.errors: + print(f"[import] FAILED: {result.errors}", file=sys.stderr) + return 1 + + print( + f"[import] imported={result.imported_count} skipped={result.skipped_count} " + f"failed={result.failed_count} duration_ms={result.duration_ms:.1f}" + ) + + # Group emitted events by type for a tidy summary. + by_type: dict[str, int] = {} + for event_type, _payload in stratix.events: + by_type[event_type] = by_type.get(event_type, 0) + 1 + print(f"[import] events emitted by type: {dict(sorted(by_type.items()))}") + + if result.imported_count != 2: + print( + f"[import] expected 2 traces imported, got {result.imported_count}", + file=sys.stderr, + ) + return 1 + return 0 + + +# --------------------------------------------------------------------------- +# Flow 2 — Export LayerLens events back to Langfuse +# --------------------------------------------------------------------------- + + +def _flow_export(adapter: LangfuseAdapter, mock_client: _MockLangfuseClient) -> int: + print("[export] exporting one synthetic LayerLens trace to Langfuse...") + events_by_trace: dict[str, list[dict[str, Any]]] = { + "ll-trace-001": [ + { + "event_type": "agent.input", + "trace_id": "ll-trace-001", + "timestamp": "2026-04-25T11:00:00Z", + "sequence_id": 0, + "payload": { + "agent_id": "billing-agent", + "input_text": "Refund my last invoice", + "input": {"intent": "refund", "invoice_id": "INV-9001"}, + }, + }, + { + "event_type": "model.invoke", + "trace_id": "ll-trace-001", + "timestamp": "2026-04-25T11:00:00Z", + "sequence_id": 1, + "payload": { + "provider": "openai", + "model": "gpt-4o-mini", + "tokens_prompt": 200, + "tokens_completion": 40, + "tokens_total": 240, + "latency_ms": 1500, + }, + }, + { + "event_type": "cost.record", + "trace_id": "ll-trace-001", + "timestamp": "2026-04-25T11:00:00Z", + "sequence_id": 2, + "payload": { + "model": "gpt-4o-mini", + "cost_usd": 0.00036, + "tokens_prompt": 200, + "tokens_completion": 40, + }, + }, + { + "event_type": "agent.output", + "trace_id": "ll-trace-001", + "timestamp": "2026-04-25T11:00:01Z", + "sequence_id": 3, + "payload": { + "agent_id": "billing-agent", + "output_text": "Refund processed.", + "output": {"status": "refunded", "amount": 49.0}, + }, + }, + ], + } + + result = adapter.export_traces(events_by_trace=events_by_trace) + if result.errors: + print(f"[export] FAILED: {result.errors}", file=sys.stderr) + return 1 + + print( + f"[export] exported={result.exported_count} skipped={result.skipped_count} " + f"failed={result.failed_count} duration_ms={result.duration_ms:.1f}" + ) + print(f"[export] mock recorded {len(mock_client.ingested_batches)} ingestion batch(es)") + + # Confirm the batch contained a trace-create + at least one observation. + if not mock_client.ingested_batches: + print("[export] no batches ingested", file=sys.stderr) + return 1 + batch = mock_client.ingested_batches[-1] + types = sorted({e.get("type") for e in batch}) + print(f"[export] batch event types: {types}") + if "trace-create" not in types: + print("[export] missing trace-create event in batch", file=sys.stderr) + return 1 + + # Confirm the loop-prevention tag is present on the exported trace. + trace_event = next((e for e in batch if e.get("type") == "trace-create"), None) + if trace_event is None: + print("[export] no trace-create body found", file=sys.stderr) + return 1 + tags = trace_event.get("body", {}).get("tags", []) + if "layerlens-exported" not in tags: + print( + f"[export] expected ``layerlens-exported`` tag, got {tags}", + file=sys.stderr, + ) + return 1 + print(f"[export] loop-prevention tags: {tags}") + return 0 + + +# --------------------------------------------------------------------------- +# Flow 3 — Bidirectional sync() +# --------------------------------------------------------------------------- + + +def _flow_bidirectional(adapter: LangfuseAdapter) -> int: + print("[bidirectional] running a dry-run sync() in BIDIRECTIONAL mode...") + result = adapter.sync( + direction=SyncDirection.BIDIRECTIONAL, + dry_run=True, + events_by_trace={ + "ll-trace-002": [ + { + "event_type": "agent.input", + "trace_id": "ll-trace-002", + "timestamp": "2026-04-25T12:00:00Z", + "sequence_id": 0, + "payload": { + "agent_id": "qa-agent", + "input_text": "Run a smoke test.", + }, + } + ] + }, + ) + print( + f"[bidirectional] direction={result.direction.value} dry_run={result.dry_run} " + f"imported={result.imported_count} exported={result.exported_count} " + f"skipped={result.skipped_count} failed={result.failed_count}" + ) + return 0 + + +# --------------------------------------------------------------------------- +# Optional: live connect smoke (only if LANGFUSE_* env vars present) +# --------------------------------------------------------------------------- + + +def _have_langfuse_env() -> bool: + return bool(os.environ.get("LANGFUSE_PUBLIC_KEY")) and bool( + os.environ.get("LANGFUSE_SECRET_KEY") + ) + + +def _flow_live_connect_smoke() -> int: + cfg = LangfuseConfig( + public_key=os.environ["LANGFUSE_PUBLIC_KEY"], + secret_key=os.environ["LANGFUSE_SECRET_KEY"], + host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"), + ) + adapter = LangfuseAdapter(config=cfg, capture_config=CaptureConfig.standard()) + try: + adapter.connect() + status = adapter.get_status() + print( + f"[live-connect] connected={status['connected']} " + f"langfuse_healthy={status['langfuse_healthy']} host={status['host']}" + ) + finally: + adapter.disconnect() + return 0 + + +# --------------------------------------------------------------------------- +# Entrypoint +# --------------------------------------------------------------------------- + + +def main() -> int: + stratix = _RecordingStratix() + adapter, mock_client = _build_adapter(stratix) + + rc = _flow_import(adapter, stratix) + if rc: + adapter.disconnect() + return rc + + rc = _flow_export(adapter, mock_client) + if rc: + adapter.disconnect() + return rc + + rc = _flow_bidirectional(adapter) + if rc: + adapter.disconnect() + return rc + + state = adapter.sync_state + print( + f"[summary] sink recorded {len(stratix.events)} events; " + f"sync_state imported={len(state.imported_trace_ids)} " + f"exported={len(state.exported_trace_ids)} " + f"quarantined={len(state.quarantined_trace_ids)}" + ) + + adapter.disconnect() + + if _have_langfuse_env(): + rc = _flow_live_connect_smoke() + if rc: + return rc + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/emit_adapter_manifest.py b/scripts/emit_adapter_manifest.py index fd4c660f..874a9427 100644 --- a/scripts/emit_adapter_manifest.py +++ b/scripts/emit_adapter_manifest.py @@ -154,6 +154,10 @@ "cohere", "mistral", "smolagents", + # Frameworks promoted from ``lifecycle_preview`` once they ship a + # dedicated unit-test suite, a reference doc in ``docs/adapters/``, + # and a runnable mocked sample under ``samples/instrument/``. + "langfuse", } diff --git a/src/layerlens/instrument/adapters/frameworks/__init__.py b/src/layerlens/instrument/adapters/frameworks/__init__.py new file mode 100644 index 00000000..4cfd328f --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/__init__.py @@ -0,0 +1,32 @@ +"""Framework adapters for the LayerLens Instrument layer. + +Each framework adapter wraps an agent / chain framework's lifecycle to +intercept agent runs, model invocations, tool calls, state changes, and +handoffs, emitting events through the LayerLens telemetry pipeline. + +Adapters available (loaded on demand via :class:`AdapterRegistry`): + +* ``langchain`` — LangChain (callbacks + agent + chain + memory) +* ``langgraph`` — LangGraph (graph hooks + handoff detection + state) +* ``crewai`` — CrewAI (delegation + team metadata) +* ``autogen`` — AutoGen (group chat + lifecycle) +* ``agentforce`` — Salesforce Agentforce (auth, client, event mapping) +* ``semantic_kernel`` — Microsoft Semantic Kernel (filters + lifecycle) +* ``langfuse_importer`` — Langfuse trace import / export +* ``embedding`` — Embedding + vector store instrumentation +* ``openai_agents`` — OpenAI Agents SDK lifecycle +* ``ms_agent_framework`` — MS Agent Framework lifecycle +* ``agno`` — Agno lifecycle +* ``bedrock_agents`` — AWS Bedrock Agents lifecycle +* ``llama_index`` — LlamaIndex lifecycle +* ``google_adk`` — Google ADK lifecycle +* ``strands`` — Strands lifecycle +* ``benchmark_import`` — Benchmark replay-based ingestion +* ``pydantic_ai`` — Pydantic-AI lifecycle +* ``smolagents`` — SmolAgents (HuggingFace) lifecycle +* ``browser_use`` — Browser-Use lifecycle (placeholder; ported in M7) + +Importing this package does NOT import any framework SDK. +""" + +from __future__ import annotations diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse/__init__.py b/src/layerlens/instrument/adapters/frameworks/langfuse/__init__.py new file mode 100644 index 00000000..a79647ad --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langfuse/__init__.py @@ -0,0 +1,81 @@ +""" +LayerLens Langfuse Adapter + +Bidirectional trace sync between LayerLens and Langfuse. + +Unlike other adapters that wrap running code in real-time, the Langfuse +adapter is a data import/export pipeline that communicates with a remote +Langfuse HTTP API to pull/push traces in batch. + +Usage:: + + from layerlens.instrument.adapters.frameworks.langfuse import LangfuseAdapter + from layerlens.instrument.adapters.frameworks.langfuse.config import LangfuseConfig + + config = LangfuseConfig( + public_key="pk-...", + secret_key="sk-...", + ) + + adapter = LangfuseAdapter(stratix=stratix_instance, config=config) + adapter.connect() + + # Import traces from Langfuse into LayerLens + result = adapter.import_traces(since=datetime(2024, 1, 1)) + + # Export LayerLens traces to Langfuse + result = adapter.export_traces(events_by_trace={"trace-1": [...]}) + +Backward compatibility +---------------------- + +Users coming from ``ateam`` / ``stratix`` can keep importing the old +name ``STRATIXLangfuseAdapter``. Accessing it raises a +:class:`DeprecationWarning` (see PEP 562) and resolves to +:class:`LangfuseAdapter`. The alias will be removed in v2.0. +""" + +from __future__ import annotations + +import warnings +from typing import Any + +from layerlens.instrument.adapters._base.pydantic_compat import PydanticCompat, requires_pydantic + +# Round-2 deliberation item 20: ``frameworks/langfuse/config.py`` uses +# ``field_validator`` (v2-only); fail fast under v1 with a clear message +# instead of a confusing ImportError from config.py. +requires_pydantic(PydanticCompat.V2_ONLY) + +from layerlens.instrument.adapters.frameworks.langfuse.lifecycle import LangfuseAdapter + +# Registry lazy-loading convention +ADAPTER_CLASS = LangfuseAdapter + +__all__ = [ + "LangfuseAdapter", + "STRATIXLangfuseAdapter", + "ADAPTER_CLASS", +] + + +def __getattr__(name: str) -> Any: + """PEP 562 module-level ``__getattr__`` for deprecated aliases. + + Importing :class:`STRATIXLangfuseAdapter` raises a + :class:`DeprecationWarning` and resolves to :class:`LangfuseAdapter`. + This preserves backward compatibility with code written against the + legacy ``stratix.*`` package layout (pre-LayerLens rename) while + nudging callers toward the new name. + """ + if name == "STRATIXLangfuseAdapter": + warnings.warn( + "STRATIXLangfuseAdapter is a deprecated alias for " + "LangfuseAdapter and will be removed in v2.0. Import " + "LangfuseAdapter from " + "layerlens.instrument.adapters.frameworks.langfuse instead.", + DeprecationWarning, + stacklevel=2, + ) + return LangfuseAdapter + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse/client.py b/src/layerlens/instrument/adapters/frameworks/langfuse/client.py new file mode 100644 index 00000000..def3e8ea --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langfuse/client.py @@ -0,0 +1,293 @@ +""" +Langfuse API Client + +HTTP client for the Langfuse REST API using stdlib urllib. +Supports Basic auth, pagination, and exponential backoff. +""" + +from __future__ import annotations + +import json +import time +import base64 +import logging +import contextlib +from typing import Any +from datetime import datetime, timezone + +UTC = timezone.utc # Python 3.11+ has datetime.UTC; alias for 3.9/3.10 compat. +from urllib.error import URLError, HTTPError +from urllib.request import Request, urlopen + +logger = logging.getLogger(__name__) + +# Langfuse API rate limit: 429 responses trigger backoff +_DEFAULT_MAX_RETRIES = 3 +_BACKOFF_BASE_S = 1.0 +_BACKOFF_MAX_S = 16.0 +_REQUEST_TIMEOUT_S = 30 + + +class LangfuseAPIError(Exception): + """Raised when a Langfuse API call fails.""" + + def __init__(self, message: str, status_code: int | None = None, body: str = "") -> None: + super().__init__(message) + self.status_code = status_code + self.body = body + + +class LangfuseAPIClient: + """ + HTTP client for the Langfuse REST API. + + Uses Basic auth with base64(public_key:secret_key). + No external dependencies — built on stdlib urllib.request. + """ + + def __init__( + self, + public_key: str, + secret_key: str, + host: str = "https://cloud.langfuse.com", + max_retries: int = _DEFAULT_MAX_RETRIES, + timeout: int = _REQUEST_TIMEOUT_S, + ) -> None: + self._host = host.rstrip("/") + self._max_retries = max_retries + self._timeout = timeout + + # Basic auth header + credentials = f"{public_key}:{secret_key}" + encoded = base64.b64encode(credentials.encode()).decode() + self._auth_header = f"Basic {encoded}" + + # --- Public API --- + + def health_check(self) -> dict[str, Any]: + """Check Langfuse API health.""" + return self._request("GET", "/api/public/health") + + def list_traces( + self, + page: int = 1, + limit: int = 50, + order_by: str = "timestamp", + order: str = "DESC", + name: str | None = None, + tags: list[str] | None = None, + from_timestamp: datetime | None = None, + to_timestamp: datetime | None = None, + ) -> dict[str, Any]: + """ + List traces with pagination and filtering. + + Returns dict with 'data' (list of trace objects) and 'meta' (pagination info). + """ + params: dict[str, Any] = { + "page": page, + "limit": limit, + "orderBy": order_by, + "order": order, + } + if name: + params["name"] = name + if tags: + for tag in tags: + params.setdefault("tags", []).append(tag) + if from_timestamp: + params["fromTimestamp"] = from_timestamp.isoformat() + if to_timestamp: + params["toTimestamp"] = to_timestamp.isoformat() + + return self._request("GET", "/api/public/traces", params=params) + + def get_trace(self, trace_id: str) -> dict[str, Any]: + """Get a single trace with all observations.""" + return self._request("GET", f"/api/public/traces/{trace_id}") + + def list_observations( + self, + trace_id: str | None = None, + page: int = 1, + limit: int = 50, + type: str | None = None, + ) -> dict[str, Any]: + """List observations for a trace.""" + params: dict[str, Any] = {"page": page, "limit": limit} + if trace_id: + params["traceId"] = trace_id + if type: + params["type"] = type + return self._request("GET", "/api/public/observations", params=params) + + def create_trace(self, trace_data: dict[str, Any]) -> dict[str, Any]: + """Create a new trace in Langfuse.""" + return self._request( + "POST", + "/api/public/ingestion", + body={ + "batch": [ + { + "id": trace_data.get("id", ""), + "type": "trace-create", + "timestamp": datetime.now(UTC).isoformat(), + "body": trace_data, + } + ], + }, + ) + + def create_generation(self, generation_data: dict[str, Any]) -> dict[str, Any]: + """Create a generation observation.""" + return self._request( + "POST", + "/api/public/ingestion", + body={ + "batch": [ + { + "id": generation_data.get("id", ""), + "type": "generation-create", + "timestamp": datetime.now(UTC).isoformat(), + "body": generation_data, + } + ], + }, + ) + + def create_span(self, span_data: dict[str, Any]) -> dict[str, Any]: + """Create a span observation.""" + return self._request( + "POST", + "/api/public/ingestion", + body={ + "batch": [ + { + "id": span_data.get("id", ""), + "type": "span-create", + "timestamp": datetime.now(UTC).isoformat(), + "body": span_data, + } + ], + }, + ) + + def ingestion_batch(self, events: list[dict[str, Any]]) -> dict[str, Any]: + """Send a batch of ingestion events.""" + return self._request("POST", "/api/public/ingestion", body={"batch": events}) + + def get_all_traces( + self, + limit: int = 50, + tags: list[str] | None = None, + from_timestamp: datetime | None = None, + to_timestamp: datetime | None = None, + ) -> list[dict[str, Any]]: + """ + Fetch all traces with automatic pagination. + + Yields all pages until exhausted. + """ + all_traces: list[dict[str, Any]] = [] + page = 1 + while True: + result = self.list_traces( + page=page, + limit=limit, + tags=tags, + from_timestamp=from_timestamp, + to_timestamp=to_timestamp, + ) + data = result.get("data", []) + if not data: + break + all_traces.extend(data) + meta = result.get("meta", {}) + total_pages = meta.get("totalPages", 1) + if page >= total_pages: + break + page += 1 + return all_traces + + # --- Internal --- + + def _request( + self, + method: str, + path: str, + params: dict[str, Any] | None = None, + body: dict[str, Any] | None = None, + ) -> dict[str, Any]: + """Make an HTTP request with retry and backoff.""" + url = f"{self._host}{path}" + if params: + # Handle list params (e.g., tags) + query_parts = [] + for k, v in params.items(): + if isinstance(v, list): + for item in v: + query_parts.append(f"{k}={item}") + else: + query_parts.append(f"{k}={v}") + url = f"{url}?{'&'.join(query_parts)}" + + headers = { + "Authorization": self._auth_header, + "Content-Type": "application/json", + "Accept": "application/json", + } + + data = json.dumps(body).encode() if body else None + + last_error: Exception | None = None + for attempt in range(self._max_retries + 1): + try: + req = Request(url, data=data, headers=headers, method=method) + with urlopen(req, timeout=self._timeout) as resp: + resp_body = resp.read().decode() + if not resp_body: + return {} + return json.loads(resp_body) # type: ignore[no-any-return] + + except HTTPError as e: + status = e.code + error_body = "" + with contextlib.suppress(Exception): + error_body = e.read().decode() + + if status == 429 or status >= 500: + last_error = LangfuseAPIError( + f"HTTP {status}: {error_body}", status_code=status, body=error_body + ) + if attempt < self._max_retries: + delay = min(_BACKOFF_BASE_S * (2**attempt), _BACKOFF_MAX_S) + logger.debug( + "Langfuse API %s %s returned %d, retrying in %.1fs (attempt %d/%d)", + method, + path, + status, + delay, + attempt + 1, + self._max_retries, + ) + time.sleep(delay) + continue + raise LangfuseAPIError( # noqa: B904 + f"HTTP {status}: {error_body}", status_code=status, body=error_body + ) + + except URLError as e: + last_error = LangfuseAPIError(f"Connection error: {e}") + if attempt < self._max_retries: + delay = min(_BACKOFF_BASE_S * (2**attempt), _BACKOFF_MAX_S) + logger.debug( + "Langfuse API connection error, retrying in %.1fs (attempt %d/%d)", + delay, + attempt + 1, + self._max_retries, + ) + time.sleep(delay) + continue + raise last_error # noqa: B904 + + raise last_error or LangfuseAPIError("Max retries exceeded") diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse/config.py b/src/layerlens/instrument/adapters/frameworks/langfuse/config.py new file mode 100644 index 00000000..2c4bb963 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langfuse/config.py @@ -0,0 +1,143 @@ +""" +Langfuse Adapter Configuration Models + +Pydantic models for Langfuse adapter configuration, sync state tracking, +and sync result reporting. +""" + +from __future__ import annotations + +from enum import Enum # Python 3.11+ has StrEnum; using `(str, Enum)` for 3.9/3.10 compat. +from typing import Optional +from datetime import datetime + +from pydantic import Field, BaseModel, field_validator + + +class SyncDirection(str, Enum): + """Direction of synchronization.""" + + IMPORT = "import" + EXPORT = "export" + BIDIRECTIONAL = "bidirectional" + + +class ConflictStrategy(str, Enum): + """Strategy for resolving sync conflicts.""" + + LAST_WRITE_WINS = "last-write-wins" + MANUAL = "manual" + + +class LangfuseConfig(BaseModel): + """Configuration for the Langfuse adapter.""" + + public_key: str = Field(description="Langfuse public API key") + secret_key: str = Field(description="Langfuse secret API key") + host: str = Field( + default="https://cloud.langfuse.com", + description="Langfuse API host URL", + ) + mode: SyncDirection = Field( + default=SyncDirection.IMPORT, + description="Sync mode: import, export, or bidirectional", + ) + sync_interval_seconds: int = Field( + default=3600, + description="Auto-sync interval in seconds (0 = disabled)", + ) + project_filter: Optional[str] = Field( + default=None, + description="Filter by Langfuse project name", + ) + tag_filter: Optional[list[str]] = Field( + default=None, + description="Filter by trace tags", + ) + since: Optional[datetime] = Field( + default=None, + description="Only sync traces after this timestamp", + ) + conflict_strategy: ConflictStrategy = Field( + default=ConflictStrategy.LAST_WRITE_WINS, + description="Conflict resolution strategy", + ) + max_retries: int = Field(default=3, description="Max retries per API call") + page_size: int = Field(default=50, description="Page size for listing traces") + + @field_validator("host") + @classmethod + def strip_trailing_slash(cls, v: str) -> str: + return v.rstrip("/") + + +class SyncState(BaseModel): + """Tracks the state of a Langfuse sync session.""" + + last_import_cursor: Optional[datetime] = Field( + default=None, + description="Timestamp of the last imported trace", + ) + last_export_cursor: Optional[datetime] = Field( + default=None, + description="Timestamp of the last exported trace", + ) + imported_trace_ids: set[str] = Field( + default_factory=set, + description="Set of Langfuse trace IDs that have been imported", + ) + exported_trace_ids: set[str] = Field( + default_factory=set, + description="Set of LayerLens trace IDs that have been exported", + ) + quarantined_trace_ids: dict[str, int] = Field( + default_factory=dict, + description="Trace IDs that have failed repeatedly, mapped to failure count", + ) + + def record_import(self, trace_id: str, updated_at: datetime) -> None: + """Record a successful import.""" + self.imported_trace_ids.add(trace_id) + if self.last_import_cursor is None or updated_at > self.last_import_cursor: + self.last_import_cursor = updated_at + # Clear from quarantine on success + self.quarantined_trace_ids.pop(trace_id, None) + + def record_export(self, trace_id: str, updated_at: datetime) -> None: + """Record a successful export.""" + self.exported_trace_ids.add(trace_id) + if self.last_export_cursor is None or updated_at > self.last_export_cursor: + self.last_export_cursor = updated_at + + def record_failure(self, trace_id: str, max_failures: int = 3) -> bool: + """ + Record a failure for a trace. Returns True if the trace is now quarantined. + """ + count = self.quarantined_trace_ids.get(trace_id, 0) + 1 + self.quarantined_trace_ids[trace_id] = count + return count >= max_failures + + def is_quarantined(self, trace_id: str) -> bool: + """Check if a trace is quarantined (3+ failures).""" + return self.quarantined_trace_ids.get(trace_id, 0) >= 3 + + def clear_quarantine(self, trace_id: str | None = None) -> None: + """Clear quarantine for a specific trace or all traces.""" + if trace_id: + self.quarantined_trace_ids.pop(trace_id, None) + else: + self.quarantined_trace_ids.clear() + + +class SyncResult(BaseModel): + """Result of a sync operation.""" + + direction: SyncDirection + imported_count: int = Field(default=0) + exported_count: int = Field(default=0) + skipped_count: int = Field(default=0) + failed_count: int = Field(default=0) + quarantined_count: int = Field(default=0) + errors: list[str] = Field(default_factory=list) + duration_ms: float = Field(default=0.0) + dry_run: bool = Field(default=False) diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse/exporter.py b/src/layerlens/instrument/adapters/frameworks/langfuse/exporter.py new file mode 100644 index 00000000..0b0539b3 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langfuse/exporter.py @@ -0,0 +1,142 @@ +""" +Langfuse Trace Exporter + +Reverse-maps LayerLens events to Langfuse traces and pushes them via the API. +""" + +from __future__ import annotations + +import uuid +import logging +from typing import Any +from datetime import datetime, timezone + +UTC = timezone.utc # Python 3.11+ has datetime.UTC; alias for 3.9/3.10 compat. + +from layerlens.instrument.adapters.frameworks.langfuse.client import LangfuseAPIError, LangfuseAPIClient +from layerlens.instrument.adapters.frameworks.langfuse.config import SyncState, SyncResult, SyncDirection +from layerlens.instrument.adapters.frameworks.langfuse.mapper import LayerLensToLangfuseMapper + +logger = logging.getLogger(__name__) + + +class TraceExporter: + """ + Export pipeline for LayerLens -> Langfuse. + + Steps: + 1. Group LayerLens events by trace ID + 2. Reverse-map to Langfuse trace + observations + 3. Create trace and observations via Langfuse API + 4. Tag with ``layerlens-exported`` (and the legacy ``stratix-exported`` + alias) to prevent re-import on the next import cycle. + """ + + def __init__( + self, + client: LangfuseAPIClient, + state: SyncState, + ) -> None: + self._client = client + self._state = state + self._mapper = LayerLensToLangfuseMapper() + + def export_traces( + self, + events_by_trace: dict[str, list[dict[str, Any]]], + trace_ids: list[str] | None = None, + dry_run: bool = False, + ) -> SyncResult: + """ + Export LayerLens traces to Langfuse. + + Args: + events_by_trace: Dict mapping trace_id -> list of LayerLens event dicts. + trace_ids: Optional filter — only export these trace IDs. + dry_run: If True, count but don't actually export. + + Returns: + SyncResult with export statistics. + """ + result = SyncResult(direction=SyncDirection.EXPORT, dry_run=dry_run) + + ids_to_export = trace_ids or list(events_by_trace.keys()) + + for trace_id in ids_to_export: + events = events_by_trace.get(trace_id, []) + if not events: + result.skipped_count += 1 + continue + + # Loop prevention: skip traces that were imported from Langfuse + if trace_id in self._state.imported_trace_ids: + result.skipped_count += 1 + continue + + # Skip already exported + if trace_id in self._state.exported_trace_ids: + result.skipped_count += 1 + continue + + if dry_run: + result.exported_count += 1 + continue + + # Map LayerLens events to Langfuse structure + try: + langfuse_data = self._mapper.map_events_to_trace(events, trace_id=trace_id) + except Exception as e: + logger.warning("Failed to map trace %s for export: %s", trace_id, e) + result.failed_count += 1 + result.errors.append(f"Trace {trace_id} mapping: {e}") + continue + + # Push to Langfuse + try: + self._push_to_langfuse(langfuse_data) + except LangfuseAPIError as e: + logger.warning("Failed to export trace %s: %s", trace_id, e) + result.failed_count += 1 + result.errors.append(f"Trace {trace_id} export: {e}") + continue + + # Record success + self._state.record_export(trace_id, datetime.now(UTC)) + result.exported_count += 1 + + return result + + def _push_to_langfuse(self, langfuse_data: dict[str, Any]) -> None: + """Push a mapped trace + observations to Langfuse via batch ingestion.""" + trace_body = langfuse_data.get("trace", {}) + observations = langfuse_data.get("observations", []) + + # Build batch events + batch: list[dict[str, Any]] = [] + now = datetime.now(UTC).isoformat() + + # Trace create event + batch.append( + { + "id": str(uuid.uuid4()), + "type": "trace-create", + "timestamp": now, + "body": trace_body, + } + ) + + # Observation create events + for obs in observations: + obs_type = obs.get("type", "SPAN").upper() + event_type = "generation-create" if obs_type == "GENERATION" else "span-create" + + batch.append( + { + "id": str(uuid.uuid4()), + "type": event_type, + "timestamp": now, + "body": obs, + } + ) + + self._client.ingestion_batch(batch) diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse/importer.py b/src/layerlens/instrument/adapters/frameworks/langfuse/importer.py new file mode 100644 index 00000000..719b6d5f --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langfuse/importer.py @@ -0,0 +1,181 @@ +""" +Langfuse Trace Importer + +Fetches traces from Langfuse, maps them to LayerLens events, deduplicates, +and ingests via the LayerLens pipeline. +""" + +from __future__ import annotations + +import logging +from typing import Any +from datetime import datetime, timezone + +UTC = timezone.utc # Python 3.11+ has datetime.UTC; alias for 3.9/3.10 compat. + +from layerlens.instrument.adapters.frameworks.langfuse.client import LangfuseAPIError, LangfuseAPIClient +from layerlens.instrument.adapters.frameworks.langfuse.config import SyncState, SyncResult, SyncDirection +from layerlens.instrument.adapters.frameworks.langfuse.mapper import LangfuseToLayerLensMapper + +logger = logging.getLogger(__name__) + + +class TraceImporter: + """ + Import pipeline for Langfuse -> LayerLens. + + Steps: + 1. List traces from Langfuse (with filters) + 2. Fetch full trace with observations + 3. Map to LayerLens events + 4. Deduplicate against previously imported traces + 5. Ingest via LayerLens emit or pipeline + """ + + def __init__( + self, + client: LangfuseAPIClient, + state: SyncState, + ) -> None: + self._client = client + self._state = state + self._mapper = LangfuseToLayerLensMapper() + + def import_traces( + self, + stratix: Any | None = None, + since: datetime | None = None, + tags: list[str] | None = None, + limit: int | None = None, + dry_run: bool = False, + ) -> SyncResult: + """ + Import traces from Langfuse. + + Args: + stratix: LayerLens instance for event emission (or pipeline). + since: Only import traces after this timestamp. + tags: Filter by trace tags. + limit: Max number of traces to import. + dry_run: If True, count but don't actually import. + + Returns: + SyncResult with import statistics. + """ + result = SyncResult(direction=SyncDirection.IMPORT, dry_run=dry_run) + + # Fetch trace list + try: + traces = self._client.get_all_traces( + tags=tags, + from_timestamp=since, + ) + except LangfuseAPIError as e: + result.errors.append(f"Failed to list traces: {e}") + result.failed_count = 1 + return result + + if limit: + traces = traces[:limit] + + for trace_summary in traces: + trace_id = trace_summary.get("id", "") + + # Skip quarantined traces + if self._state.is_quarantined(trace_id): + result.quarantined_count += 1 + continue + + # Dedup: skip already imported (unless updated_at is newer) + if trace_id in self._state.imported_trace_ids: + result.skipped_count += 1 + continue + + # Skip traces exported by LayerLens (loop prevention). + # Recognise both the canonical ``layerlens-exported`` tag and + # the legacy ``stratix-exported`` alias so this importer + # remains compatible with traces that were exported by an + # older release before the LayerLens rename. + trace_tags = trace_summary.get("tags", []) or [] + if "layerlens-exported" in trace_tags or "stratix-exported" in trace_tags: + result.skipped_count += 1 + continue + + if dry_run: + result.imported_count += 1 + continue + + # Fetch full trace + try: + full_trace = self._client.get_trace(trace_id) + except LangfuseAPIError as e: + logger.warning("Failed to fetch trace %s: %s", trace_id, e) + is_quarantined = self._state.record_failure(trace_id) + if is_quarantined: + result.quarantined_count += 1 + result.failed_count += 1 + result.errors.append(f"Trace {trace_id}: {e}") + continue + + # Map to LayerLens events + try: + events = self._mapper.map_trace(full_trace) + except Exception as e: + logger.warning("Failed to map trace %s: %s", trace_id, e) + is_quarantined = self._state.record_failure(trace_id) + if is_quarantined: + result.quarantined_count += 1 + result.failed_count += 1 + result.errors.append(f"Trace {trace_id} mapping: {e}") + continue + + if not events: + result.skipped_count += 1 + continue + + # Ingest events + try: + self._ingest_events(events, stratix) + except Exception as e: + logger.warning("Failed to ingest trace %s: %s", trace_id, e) + is_quarantined = self._state.record_failure(trace_id) + if is_quarantined: + result.quarantined_count += 1 + result.failed_count += 1 + result.errors.append(f"Trace {trace_id} ingestion: {e}") + continue + + # Record success + updated_at = self._parse_timestamp( + full_trace.get("updatedAt", full_trace.get("timestamp")) + ) + self._state.record_import(trace_id, updated_at) + result.imported_count += 1 + + return result + + def _ingest_events( + self, + events: list[dict[str, Any]], + stratix: Any | None, + ) -> None: + """Ingest mapped events via LayerLens emit or pipeline.""" + if stratix is None or not bool(stratix): + return + + for event in events: + event_type = event.get("event_type", "") + payload = event.get("payload", {}) + stratix.emit(event_type, payload) + + @staticmethod + def _parse_timestamp(value: Any) -> datetime: + """Parse a timestamp string to datetime, or return now.""" + if isinstance(value, datetime): + return value + if isinstance(value, str): + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except (ValueError, TypeError): + pass + return datetime.now(UTC) diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse/lifecycle.py b/src/layerlens/instrument/adapters/frameworks/langfuse/lifecycle.py new file mode 100644 index 00000000..aeb8e636 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langfuse/lifecycle.py @@ -0,0 +1,338 @@ +""" +Langfuse Adapter Lifecycle + +Main LangfuseAdapter class extending BaseAdapter. +Manages connection, health, import/export, and sync operations. +""" + +from __future__ import annotations + +import time +import uuid +import logging +from typing import Any +from datetime import datetime, timezone + +UTC = timezone.utc # Python 3.11+ has datetime.UTC; alias for 3.9/3.10 compat. + +from layerlens.instrument.adapters._base.adapter import ( + AdapterInfo, + BaseAdapter, + AdapterHealth, + AdapterStatus, + ReplayableTrace, + AdapterCapability, +) +from layerlens.instrument.adapters._base.capture import CaptureConfig +from layerlens.instrument.adapters._base.pydantic_compat import PydanticCompat +from layerlens.instrument.adapters.frameworks.langfuse.sync import BidirectionalSync +from layerlens.instrument.adapters.frameworks.langfuse.client import LangfuseAPIError, LangfuseAPIClient +from layerlens.instrument.adapters.frameworks.langfuse.config import ( + SyncState, + SyncResult, + SyncDirection, + LangfuseConfig, +) +from layerlens.instrument.adapters.frameworks.langfuse.exporter import TraceExporter +from layerlens.instrument.adapters.frameworks.langfuse.importer import TraceImporter + +logger = logging.getLogger(__name__) + + +class LangfuseAdapter(BaseAdapter): + """ + LayerLens adapter for Langfuse integration. + + Unlike other adapters that wrap running code in real-time, the Langfuse + adapter is a data import/export pipeline that communicates with a remote + Langfuse HTTP API to pull/push traces in batch. + """ + + FRAMEWORK = "langfuse" + VERSION = "0.1.0" + # The adapter's own config layer + # (``frameworks/langfuse/config.py`` line 13) imports + # ``from pydantic import field_validator`` — a v2-only decorator. + # Pydantic v1 has ``validator``; ``field_validator`` was added in v2 + # (see pydantic v2 migration guide). Importing this adapter under v1 + # raises ``ImportError`` in config.py. + requires_pydantic = PydanticCompat.V2_ONLY + + def __init__( + self, + stratix: Any | None = None, + capture_config: CaptureConfig | None = None, + config: LangfuseConfig | None = None, + ) -> None: + super().__init__(stratix=stratix, capture_config=capture_config) + self._config: LangfuseConfig | None = config + self._client: LangfuseAPIClient | None = None + self._sync_state = SyncState() + self._importer: TraceImporter | None = None + self._exporter: TraceExporter | None = None + self._sync: BidirectionalSync | None = None + self._last_health_check: datetime | None = None + self._langfuse_healthy = False + + # --- BaseAdapter abstract methods --- + + def connect(self, config: LangfuseConfig | None = None) -> None: + """ + Connect to the Langfuse API. + + Creates the HTTP client and validates credentials with a health check. + """ + if config: + self._config = config + + if self._config is None: + # Connect without a config — adapter is usable but not connected to Langfuse + self._connected = True + self._status = AdapterStatus.HEALTHY + return + + self._client = LangfuseAPIClient( + public_key=self._config.public_key, + secret_key=self._config.secret_key, + host=self._config.host, + max_retries=self._config.max_retries, + ) + + # Validate credentials + try: + self._client.health_check() + self._langfuse_healthy = True + except LangfuseAPIError as e: + logger.warning("Langfuse health check failed: %s", e) + self._langfuse_healthy = False + + # Initialize sub-components + self._importer = TraceImporter(self._client, self._sync_state) + self._exporter = TraceExporter(self._client, self._sync_state) + self._sync = BidirectionalSync( + importer=self._importer, + exporter=self._exporter, + state=self._sync_state, + ) + + self._connected = True + self._status = AdapterStatus.HEALTHY if self._langfuse_healthy else AdapterStatus.DEGRADED + self._last_health_check = datetime.now(UTC) + + def disconnect(self) -> None: + """Disconnect from Langfuse.""" + self._client = None + self._importer = None + self._exporter = None + self._sync = None + self._connected = False + self._status = AdapterStatus.DISCONNECTED + self._langfuse_healthy = False + + def health_check(self) -> AdapterHealth: + """Return health status including Langfuse API reachability.""" + message = None + if self._client and self._connected: + try: + self._client.health_check() + self._langfuse_healthy = True + message = "Langfuse API reachable" + except LangfuseAPIError as e: + self._langfuse_healthy = False + message = f"Langfuse API unreachable: {e}" + self._status = AdapterStatus.DEGRADED + elif not self._config: + message = "No Langfuse config — adapter connected without remote API" + else: + message = "Not connected" + + return AdapterHealth( + status=self._status, + framework_name=self.FRAMEWORK, + framework_version=None, + adapter_version=self.VERSION, + message=message, + error_count=self._error_count, + circuit_open=self._circuit_open, + ) + + def get_adapter_info(self) -> AdapterInfo: + """Return metadata about this adapter.""" + return AdapterInfo( + name="LangfuseAdapter", + version=self.VERSION, + framework=self.FRAMEWORK, + framework_version=None, + capabilities=[ + AdapterCapability.TRACE_TOOLS, + AdapterCapability.TRACE_MODELS, + AdapterCapability.REPLAY, + ], + author="LayerLens Team", + description="Bidirectional trace sync between LayerLens and Langfuse", + ) + + def serialize_for_replay(self) -> ReplayableTrace: + """Serialize accumulated trace events for replay.""" + return ReplayableTrace( + adapter_name="LangfuseAdapter", + framework=self.FRAMEWORK, + trace_id=str(uuid.uuid4()), + events=list(self._trace_events), + config=self._config.model_dump() if self._config else {}, + metadata={ + "sync_state": { + "imported": len(self._sync_state.imported_trace_ids), + "exported": len(self._sync_state.exported_trace_ids), + "quarantined": len(self._sync_state.quarantined_trace_ids), + }, + }, + ) + + # --- Import/Export/Sync API --- + + def import_traces( + self, + since: datetime | None = None, + tags: list[str] | None = None, + limit: int | None = None, + dry_run: bool = False, + ) -> SyncResult: + """ + Import traces from Langfuse into LayerLens. + + Args: + since: Only import traces updated after this timestamp. + tags: Filter by Langfuse trace tags. + limit: Maximum number of traces to import. + dry_run: If True, report what would be imported without importing. + + Returns: + SyncResult with import statistics. + """ + if self._importer is None: + return SyncResult( + direction=SyncDirection.IMPORT, + errors=["Adapter not connected to Langfuse API"], + ) + + start_time = time.monotonic() + effective_since = since or (self._config.since if self._config else None) + effective_tags = tags or (self._config.tag_filter if self._config else None) + + result = self._importer.import_traces( + stratix=self._stratix, + since=effective_since, + tags=effective_tags, + limit=limit, + dry_run=dry_run, + ) + result.duration_ms = (time.monotonic() - start_time) * 1000 + return result + + def export_traces( + self, + events_by_trace: dict[str, list[dict[str, Any]]] | None = None, + trace_ids: list[str] | None = None, + dry_run: bool = False, + ) -> SyncResult: + """ + Export LayerLens traces to Langfuse. + + Args: + events_by_trace: Dict mapping trace_id -> list of LayerLens event dicts. + trace_ids: List of trace IDs to export (requires events_by_trace). + dry_run: If True, report what would be exported without exporting. + + Returns: + SyncResult with export statistics. + """ + if self._exporter is None: + return SyncResult( + direction=SyncDirection.EXPORT, + errors=["Adapter not connected to Langfuse API"], + ) + + start_time = time.monotonic() + result = self._exporter.export_traces( + events_by_trace=events_by_trace or {}, + trace_ids=trace_ids, + dry_run=dry_run, + ) + result.duration_ms = (time.monotonic() - start_time) * 1000 + return result + + def sync( + self, + direction: SyncDirection | None = None, + since: datetime | None = None, + dry_run: bool = False, + events_by_trace: dict[str, list[dict[str, Any]]] | None = None, + ) -> SyncResult: + """ + Run a sync cycle in the configured direction. + + Args: + direction: Override the configured sync direction. + since: Override the since timestamp. + dry_run: If True, report what would be synced without making changes. + events_by_trace: Required for export/bidirectional — LayerLens events to export. + + Returns: + SyncResult with combined statistics. + """ + if self._sync is None: + return SyncResult( + direction=direction or SyncDirection.IMPORT, + errors=["Adapter not connected to Langfuse API"], + ) + + effective_direction = direction or ( + self._config.mode if self._config else SyncDirection.IMPORT + ) + start_time = time.monotonic() + + result = self._sync.run( + stratix=self._stratix, + direction=effective_direction, + since=since, + dry_run=dry_run, + events_by_trace=events_by_trace or {}, + tags=self._config.tag_filter if self._config else None, + ) + result.duration_ms = (time.monotonic() - start_time) * 1000 + return result + + # --- State access --- + + @property + def sync_state(self) -> SyncState: + """Return the current sync state.""" + return self._sync_state + + @property + def config(self) -> LangfuseConfig | None: + """Return the current configuration.""" + return self._config + + def get_status(self) -> dict[str, Any]: + """Return a status summary for CLI/API use.""" + return { + "connected": self._connected, + "langfuse_healthy": self._langfuse_healthy, + "host": self._config.host if self._config else None, + "mode": self._config.mode.value if self._config else None, + "imported_traces": len(self._sync_state.imported_trace_ids), + "exported_traces": len(self._sync_state.exported_trace_ids), + "quarantined_traces": len(self._sync_state.quarantined_trace_ids), + "last_import_cursor": ( + self._sync_state.last_import_cursor.isoformat() + if self._sync_state.last_import_cursor + else None + ), + "last_export_cursor": ( + self._sync_state.last_export_cursor.isoformat() + if self._sync_state.last_export_cursor + else None + ), + } diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse/mapper.py b/src/layerlens/instrument/adapters/frameworks/langfuse/mapper.py new file mode 100644 index 00000000..a02d7b46 --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langfuse/mapper.py @@ -0,0 +1,631 @@ +""" +Langfuse <-> LayerLens Bidirectional Field Mapper + +Maps Langfuse trace/observation structures to LayerLens canonical events +and vice versa. +""" + +from __future__ import annotations + +import uuid +import logging +from typing import Any +from datetime import datetime, timezone + +UTC = timezone.utc # Python 3.11+ has datetime.UTC; alias for 3.9/3.10 compat. + +logger = logging.getLogger(__name__) + + +class LangfuseToLayerLensMapper: + """ + Maps Langfuse traces and observations to LayerLens canonical event dicts. + + Each Langfuse trace produces multiple LayerLens events: + - trace.input -> agent.input (L1) + - trace.output -> agent.output (L1) + - span -> agent.code (L2) or tool.call (L5a) + - generation -> model.invoke (L3) + cost.record (Cross) + - metadata -> environment.config (L4a) + - errors -> policy.violation (Cross) + """ + + def map_trace(self, trace: dict[str, Any]) -> list[dict[str, Any]]: + """ + Map a complete Langfuse trace (with observations) to LayerLens events. + + Args: + trace: Langfuse trace dict from the API, including nested observations. + + Returns: + List of LayerLens event dicts ready for ingestion. + """ + trace_id = trace.get("id", str(uuid.uuid4())) + events: list[dict[str, Any]] = [] + timestamp = trace.get("timestamp", datetime.now(UTC).isoformat()) + seq = 0 + + # Trace-level metadata (L4a) + metadata = trace.get("metadata") + if metadata: + events.append( + self._make_event( + event_type="environment.config", + trace_id=trace_id, + timestamp=timestamp, + sequence_id=seq, + payload={ + "config_type": "langfuse_trace_metadata", + "config": metadata, + "framework": "langfuse", + }, + langfuse_metadata=self._extract_trace_metadata(trace), + ) + ) + seq += 1 + + # Trace input -> agent.input (L1) + trace_input = trace.get("input") + if trace_input is not None: + events.append( + self._make_event( + event_type="agent.input", + trace_id=trace_id, + timestamp=timestamp, + sequence_id=seq, + payload={ + "agent_id": trace.get("name", "langfuse_agent"), + "input_text": self._to_str(trace_input), + "input": trace_input, + "framework": "langfuse", + }, + langfuse_metadata=self._extract_trace_metadata(trace), + ) + ) + seq += 1 + + # Sort observations by start_time for temporal ordering + observations = trace.get("observations", []) + observations = sorted( + observations, + key=lambda o: o.get("startTime", o.get("start_time", "")), + ) + + for obs in observations: + obs_events = self._map_observation(obs, trace_id, seq) + events.extend(obs_events) + seq += len(obs_events) + + # Trace output -> agent.output (L1) + trace_output = trace.get("output") + if trace_output is not None: + end_time = trace.get("endTime", trace.get("end_time", timestamp)) + events.append( + self._make_event( + event_type="agent.output", + trace_id=trace_id, + timestamp=end_time or timestamp, + sequence_id=seq, + payload={ + "agent_id": trace.get("name", "langfuse_agent"), + "output_text": self._to_str(trace_output), + "output": trace_output, + "framework": "langfuse", + }, + langfuse_metadata=self._extract_trace_metadata(trace), + ) + ) + + return events + + def _map_observation( + self, + obs: dict[str, Any], + trace_id: str, + start_seq: int, + ) -> list[dict[str, Any]]: + """Map a single Langfuse observation to LayerLens event(s).""" + obs_type = obs.get("type", "SPAN").upper() + timestamp = obs.get("startTime", obs.get("start_time", "")) + + if obs_type == "GENERATION": + return self._map_generation(obs, trace_id, timestamp, start_seq) + elif obs_type == "SPAN": + return self._map_span(obs, trace_id, timestamp, start_seq) + else: + # EVENT or unknown type — map as agent.code + return self._map_span(obs, trace_id, timestamp, start_seq) + + def _map_generation( + self, + obs: dict[str, Any], + trace_id: str, + timestamp: str, + seq: int, + ) -> list[dict[str, Any]]: + """Map a Langfuse generation to model.invoke + cost.record.""" + events: list[dict[str, Any]] = [] + + model = obs.get("model", obs.get("modelId")) + usage = obs.get("usage", obs.get("promptTokens")) + + # Compute latency + latency_ms = self._compute_latency_ms(obs) + + # Normalize token usage + if isinstance(usage, dict): + prompt_tokens = usage.get("promptTokens", usage.get("input", 0)) + completion_tokens = usage.get("completionTokens", usage.get("output", 0)) + total_tokens = usage.get("totalTokens", usage.get("total", 0)) + else: + prompt_tokens = obs.get("promptTokens", 0) + completion_tokens = obs.get("completionTokens", 0) + total_tokens = obs.get("totalTokens", 0) + + # model.invoke (L3) + payload: dict[str, Any] = { + "provider": "langfuse", + "model": model, + "tokens_prompt": prompt_tokens or 0, + "tokens_completion": completion_tokens or 0, + "tokens_total": total_tokens or (prompt_tokens or 0) + (completion_tokens or 0), + "framework": "langfuse", + } + if latency_ms is not None: + payload["latency_ms"] = latency_ms + + # Include model parameters if present + model_params = obs.get("modelParameters") + if model_params: + payload["parameters"] = model_params + + # Check for errors + level = obs.get("level", "").upper() + status_message = obs.get("statusMessage", "") + if level == "ERROR": + payload["error"] = status_message or "Generation error" + + events.append( + self._make_event( + event_type="model.invoke", + trace_id=trace_id, + timestamp=timestamp, + sequence_id=seq, + payload=payload, + ) + ) + + # cost.record (Cross-cutting) + total_cost = obs.get("totalCost", obs.get("calculatedTotalCost")) + if total_cost is not None and total_cost > 0: + events.append( + self._make_event( + event_type="cost.record", + trace_id=trace_id, + timestamp=timestamp, + sequence_id=seq + 1, + payload={ + "model": model, + "cost_usd": total_cost, + "tokens_prompt": prompt_tokens or 0, + "tokens_completion": completion_tokens or 0, + "framework": "langfuse", + }, + ) + ) + + # Error/warning observations -> policy.violation + if level in ("ERROR", "WARNING"): + events.append( + self._make_event( + event_type="policy.violation", + trace_id=trace_id, + timestamp=timestamp, + sequence_id=seq + len(events), + payload={ + "violation_type": "error" if level == "ERROR" else "warning", + "description": status_message or f"Generation {level.lower()}", + "source": "langfuse_observation", + "observation_id": obs.get("id"), + "framework": "langfuse", + }, + ) + ) + + return events + + def _map_span( + self, + obs: dict[str, Any], + trace_id: str, + timestamp: str, + seq: int, + ) -> list[dict[str, Any]]: + """Map a Langfuse span to tool.call or agent.code.""" + name = obs.get("name", "") + obs_input = obs.get("input") + obs_output = obs.get("output") + latency_ms = self._compute_latency_ms(obs) + level = obs.get("level", "").upper() + status_message = obs.get("statusMessage", "") + + # Determine if this is a tool call (metadata hint or naming convention) + metadata = obs.get("metadata", {}) or {} + is_tool = ( + metadata.get("type") == "TOOL" + or name.lower().startswith("tool_") + or name.lower().startswith("tool:") + or metadata.get("tool_name") + ) + + events: list[dict[str, Any]] = [] + + if is_tool: + # tool.call (L5a) + payload: dict[str, Any] = { + "tool_name": metadata.get("tool_name", name), + "framework": "langfuse", + } + if obs_input is not None: + payload["input"] = obs_input + if obs_output is not None: + payload["output"] = obs_output + if latency_ms is not None: + payload["latency_ms"] = latency_ms + if level == "ERROR": + payload["error"] = status_message or "Tool error" + + events.append( + self._make_event( + event_type="tool.call", + trace_id=trace_id, + timestamp=timestamp, + sequence_id=seq, + payload=payload, + ) + ) + else: + # agent.code (L2) + payload = { + "step_name": name, + "framework": "langfuse", + } + if obs_input is not None: + payload["input"] = obs_input + if obs_output is not None: + payload["output"] = obs_output + if latency_ms is not None: + payload["latency_ms"] = latency_ms + + events.append( + self._make_event( + event_type="agent.code", + trace_id=trace_id, + timestamp=timestamp, + sequence_id=seq, + payload=payload, + ) + ) + + # Error/warning -> policy.violation + if level in ("ERROR", "WARNING"): + events.append( + self._make_event( + event_type="policy.violation", + trace_id=trace_id, + timestamp=timestamp, + sequence_id=seq + 1, + payload={ + "violation_type": "error" if level == "ERROR" else "warning", + "description": status_message or f"Span {level.lower()}", + "source": "langfuse_observation", + "observation_id": obs.get("id"), + "framework": "langfuse", + }, + ) + ) + + return events + + # --- Helpers --- + + @staticmethod + def _make_event( + event_type: str, + trace_id: str, + timestamp: str, + sequence_id: int, + payload: dict[str, Any], + langfuse_metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + """Construct a normalized LayerLens event dict.""" + event: dict[str, Any] = { + "event_type": event_type, + "trace_id": trace_id, + "timestamp": timestamp, + "sequence_id": sequence_id, + "payload": payload, + } + if langfuse_metadata: + event["metadata"] = langfuse_metadata + return event + + @staticmethod + def _extract_trace_metadata(trace: dict[str, Any]) -> dict[str, Any]: + """Extract Langfuse-specific metadata from a trace.""" + meta: dict[str, Any] = { + "langfuse_trace_id": trace.get("id"), + } + if trace.get("sessionId"): + meta["langfuse_session_id"] = trace["sessionId"] + if trace.get("userId"): + meta["langfuse_user_id"] = trace["userId"] + if trace.get("tags"): + meta["langfuse_tags"] = trace["tags"] + if trace.get("scores"): + meta["langfuse_scores"] = trace["scores"] + return meta + + @staticmethod + def _compute_latency_ms(obs: dict[str, Any]) -> float | None: + """Compute latency from observation start/end times.""" + start = obs.get("startTime", obs.get("start_time")) + end = obs.get("endTime", obs.get("end_time")) + if not start or not end: + return None + try: + if isinstance(start, str): + start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) + else: + start_dt = start + if isinstance(end, str): + end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) + else: + end_dt = end + delta = end_dt - start_dt + return delta.total_seconds() * 1000 + except (ValueError, TypeError): + return None + + @staticmethod + def _to_str(value: Any) -> str: + """Convert a value to string representation.""" + if isinstance(value, str): + return value + if isinstance(value, dict): + import json + + return json.dumps(value) + return str(value) + + +class LayerLensToLangfuseMapper: + """ + Maps LayerLens canonical events back to Langfuse trace/observation structures. + + Used for exporting LayerLens traces to Langfuse. + """ + + def map_events_to_trace( + self, + events: list[dict[str, Any]], + trace_id: str | None = None, + ) -> dict[str, Any]: + """ + Map a list of LayerLens events to a Langfuse trace with observations. + + Returns a dict with 'trace' (trace body) and 'observations' (list of observations). + """ + trace_id = trace_id or str(uuid.uuid4()) + + # Tag with both the canonical ``layerlens-exported`` name and the + # legacy ``stratix-exported`` alias so traces remain discoverable + # by clients written against the pre-rename adapter. The importer + # likewise recognises both tags for loop prevention. + trace_body: dict[str, Any] = { + "id": trace_id, + "name": "layerlens-export", + "tags": ["layerlens-exported", "stratix-exported"], + "metadata": { + "layerlens_trace_id": trace_id, + "stratix_trace_id": trace_id, # deprecated alias + }, + } + observations: list[dict[str, Any]] = [] + + for event in events: + event_type = event.get("event_type", "") + payload = event.get("payload", {}) + timestamp = event.get("timestamp", datetime.now(UTC).isoformat()) + event.get("metadata", {}) + + if event_type == "agent.input": + trace_body["input"] = payload.get("input", payload.get("input_text")) + if not trace_body.get("name") or trace_body["name"] == "stratix-export": + agent_id = payload.get("agent_id") + if agent_id: + trace_body["name"] = agent_id + + elif event_type == "agent.output": + trace_body["output"] = payload.get("output", payload.get("output_text")) + + elif event_type == "model.invoke": + obs = self._make_generation(payload, timestamp, trace_id) + observations.append(obs) + + elif event_type == "tool.call": + obs = self._make_tool_span(payload, timestamp, trace_id) + observations.append(obs) + + elif event_type == "agent.code": + obs = self._make_default_span(payload, timestamp, trace_id) + observations.append(obs) + + elif event_type == "cost.record": + # Cost is attached to corresponding generation — find matching + self._attach_cost(observations, payload) + + elif event_type == "environment.config": + config = payload.get("config", {}) + existing_meta = trace_body.get("metadata", {}) + existing_meta["environment_config"] = config + trace_body["metadata"] = existing_meta + + elif event_type == "agent.handoff": + obs = self._make_handoff_span(payload, timestamp, trace_id) + observations.append(obs) + + elif event_type == "agent.state.change": + obs = self._make_state_span(payload, timestamp, trace_id) + observations.append(obs) + + return {"trace": trace_body, "observations": observations} + + @staticmethod + def _make_generation( + payload: dict[str, Any], + timestamp: str, + trace_id: str, + ) -> dict[str, Any]: + """Create a Langfuse generation observation from model.invoke event.""" + gen: dict[str, Any] = { + "id": str(uuid.uuid4()), + "traceId": trace_id, + "type": "GENERATION", + "name": payload.get("model", "unknown-model"), + "startTime": timestamp, + "model": payload.get("model"), + } + # Token usage + usage: dict[str, Any] = {} + if payload.get("tokens_prompt"): + usage["promptTokens"] = payload["tokens_prompt"] + if payload.get("tokens_completion"): + usage["completionTokens"] = payload["tokens_completion"] + if payload.get("tokens_total"): + usage["totalTokens"] = payload["tokens_total"] + if usage: + gen["usage"] = usage + + # Parameters + if payload.get("parameters"): + gen["modelParameters"] = payload["parameters"] + + # Latency -> end time + if payload.get("latency_ms"): + gen["endTime"] = timestamp # Approximate + + # Error + if payload.get("error"): + gen["level"] = "ERROR" + gen["statusMessage"] = payload["error"] + + return gen + + @staticmethod + def _make_tool_span( + payload: dict[str, Any], + timestamp: str, + trace_id: str, + ) -> dict[str, Any]: + """Create a Langfuse TOOL span from tool.call event.""" + span: dict[str, Any] = { + "id": str(uuid.uuid4()), + "traceId": trace_id, + "type": "SPAN", + "name": payload.get("tool_name", "unknown-tool"), + "startTime": timestamp, + "metadata": {"type": "TOOL"}, + } + if payload.get("input") is not None: + span["input"] = payload["input"] + if payload.get("output") is not None: + span["output"] = payload["output"] + if payload.get("error"): + span["level"] = "ERROR" + span["statusMessage"] = payload["error"] + return span + + @staticmethod + def _make_default_span( + payload: dict[str, Any], + timestamp: str, + trace_id: str, + ) -> dict[str, Any]: + """Create a Langfuse DEFAULT span from agent.code event.""" + span: dict[str, Any] = { + "id": str(uuid.uuid4()), + "traceId": trace_id, + "type": "SPAN", + "name": payload.get("step_name", "execution-step"), + "startTime": timestamp, + } + if payload.get("input") is not None: + span["input"] = payload["input"] + if payload.get("output") is not None: + span["output"] = payload["output"] + return span + + @staticmethod + def _make_handoff_span( + payload: dict[str, Any], + timestamp: str, + trace_id: str, + ) -> dict[str, Any]: + """Create a Langfuse span for agent.handoff event.""" + return { + "id": str(uuid.uuid4()), + "traceId": trace_id, + "type": "SPAN", + "name": f"handoff:{payload.get('from_agent', '?')}->{payload.get('to_agent', '?')}", + "startTime": timestamp, + "metadata": { + "type": "HANDOFF", + "from_agent": payload.get("from_agent"), + "to_agent": payload.get("to_agent"), + "context": payload.get("context"), + }, + } + + @staticmethod + def _make_state_span( + payload: dict[str, Any], + timestamp: str, + trace_id: str, + ) -> dict[str, Any]: + """Create a Langfuse span for agent.state.change event.""" + return { + "id": str(uuid.uuid4()), + "traceId": trace_id, + "type": "SPAN", + "name": f"state-change:{payload.get('state_type', 'unknown')}", + "startTime": timestamp, + "metadata": { + "type": "STATE_CHANGE", + "before": payload.get("before"), + "after": payload.get("after"), + }, + } + + @staticmethod + def _attach_cost( + observations: list[dict[str, Any]], + cost_payload: dict[str, Any], + ) -> None: + """Attach cost to the matching generation observation.""" + model = cost_payload.get("model") + cost_usd = cost_payload.get("cost_usd") + if cost_usd is None: + return + + # Find a matching generation by model name + for obs in reversed(observations): + if obs.get("type") == "GENERATION": # noqa: SIM102 + if model is None or obs.get("model") == model: + obs["totalCost"] = cost_usd + return + # No match — attach to last generation if any + for obs in reversed(observations): + if obs.get("type") == "GENERATION": + obs["totalCost"] = cost_usd + return diff --git a/src/layerlens/instrument/adapters/frameworks/langfuse/sync.py b/src/layerlens/instrument/adapters/frameworks/langfuse/sync.py new file mode 100644 index 00000000..2ad9653c --- /dev/null +++ b/src/layerlens/instrument/adapters/frameworks/langfuse/sync.py @@ -0,0 +1,89 @@ +""" +Langfuse Bidirectional Sync + +Coordinates import and export with cursor tracking and conflict resolution. +""" + +from __future__ import annotations + +import logging +from typing import Any +from datetime import datetime + +from layerlens.instrument.adapters.frameworks.langfuse.config import SyncState, SyncResult, SyncDirection +from layerlens.instrument.adapters.frameworks.langfuse.exporter import TraceExporter +from layerlens.instrument.adapters.frameworks.langfuse.importer import TraceImporter + +logger = logging.getLogger(__name__) + + +class BidirectionalSync: + """ + Orchestrates bidirectional sync between Langfuse and LayerLens. + + Uses cursor-based incremental sync to minimize API calls. + """ + + def __init__( + self, + importer: TraceImporter, + exporter: TraceExporter, + state: SyncState, + ) -> None: + self._importer = importer + self._exporter = exporter + self._state = state + + def run( + self, + stratix: Any | None = None, + direction: SyncDirection = SyncDirection.BIDIRECTIONAL, + since: datetime | None = None, + dry_run: bool = False, + events_by_trace: dict[str, list[dict[str, Any]]] | None = None, + tags: list[str] | None = None, + ) -> SyncResult: + """ + Run a sync cycle. + + Args: + stratix: LayerLens instance for event emission. + direction: Sync direction (import, export, or bidirectional). + since: Override since timestamp. + dry_run: If True, report what would happen without making changes. + events_by_trace: LayerLens events for export (required for export/bidirectional). + tags: Filter tags for import. + + Returns: + Combined SyncResult. + """ + result = SyncResult(direction=direction, dry_run=dry_run) + + # Import phase + if direction in (SyncDirection.IMPORT, SyncDirection.BIDIRECTIONAL): + effective_since = since or self._state.last_import_cursor + import_result = self._importer.import_traces( + stratix=stratix, + since=effective_since, + tags=tags, + dry_run=dry_run, + ) + result.imported_count = import_result.imported_count + result.skipped_count += import_result.skipped_count + result.failed_count += import_result.failed_count + result.quarantined_count += import_result.quarantined_count + result.errors.extend(import_result.errors) + + # Export phase + if direction in (SyncDirection.EXPORT, SyncDirection.BIDIRECTIONAL): # noqa: SIM102 + if events_by_trace: + export_result = self._exporter.export_traces( + events_by_trace=events_by_trace, + dry_run=dry_run, + ) + result.exported_count = export_result.exported_count + result.skipped_count += export_result.skipped_count + result.failed_count += export_result.failed_count + result.errors.extend(export_result.errors) + + return result diff --git a/tests/instrument/adapters/__init__.py b/tests/instrument/adapters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/instrument/adapters/frameworks/__init__.py b/tests/instrument/adapters/frameworks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/instrument/adapters/frameworks/test_langfuse_adapter.py b/tests/instrument/adapters/frameworks/test_langfuse_adapter.py new file mode 100644 index 00000000..0fc633a1 --- /dev/null +++ b/tests/instrument/adapters/frameworks/test_langfuse_adapter.py @@ -0,0 +1,162 @@ +"""Unit tests for the Langfuse framework adapter. + +Mocked at the SDK shape level — no real Langfuse API calls. + +Unlike runtime-wrapping adapters, the Langfuse adapter is a data +import/export pipeline. Tests focus on: + + * lifecycle (with and without config) + * connect-without-config edge case (adapter still healthy, no client) + * import/export return SyncResult with appropriate error message when + no client is configured + * SyncState tracking semantics + * health_check / get_status structural correctness + * serialize_for_replay returns proper ReplayableTrace +""" + +from __future__ import annotations + +from typing import Any, Dict, List + +from layerlens.instrument.adapters._base import AdapterStatus, CaptureConfig +from layerlens.instrument.adapters.frameworks.langfuse import ( + ADAPTER_CLASS, + LangfuseAdapter, +) +from layerlens.instrument.adapters.frameworks.langfuse.config import ( + SyncDirection, + LangfuseConfig, +) + + +class _RecordingStratix: + def __init__(self) -> None: + self.events: List[Dict[str, Any]] = [] + + def emit(self, *args: Any, **kwargs: Any) -> None: + if len(args) == 2 and isinstance(args[0], str): + self.events.append({"event_type": args[0], "payload": args[1]}) + + +def test_adapter_class_export() -> None: + assert ADAPTER_CLASS is LangfuseAdapter + + +def test_lifecycle_no_config() -> None: + """Adapter is usable without a Langfuse config — connects HEALTHY but no client.""" + a = LangfuseAdapter() + a.connect() + assert a.status == AdapterStatus.HEALTHY + assert a.is_connected is True + a.disconnect() + assert a.status == AdapterStatus.DISCONNECTED + + +def test_adapter_info_and_health() -> None: + a = LangfuseAdapter() + a.connect() + info = a.get_adapter_info() + assert info.framework == "langfuse" + assert info.name == "LangfuseAdapter" + health = a.health_check() + assert health.framework_name == "langfuse" + assert "No Langfuse config" in (health.message or "") + + +def test_import_returns_error_result_when_not_connected() -> None: + """Without a Langfuse client, import_traces returns an errored SyncResult.""" + a = LangfuseAdapter() + a.connect() + result = a.import_traces() + assert result.direction == SyncDirection.IMPORT + assert result.errors + assert "not connected" in result.errors[0].lower() + + +def test_export_returns_error_result_when_not_connected() -> None: + """Without a Langfuse client, export_traces returns an errored SyncResult.""" + a = LangfuseAdapter() + a.connect() + result = a.export_traces(events_by_trace={"trace-1": []}) + assert result.direction == SyncDirection.EXPORT + assert result.errors + assert "not connected" in result.errors[0].lower() + + +def test_sync_returns_error_result_when_not_connected() -> None: + """Without a Langfuse client, sync() returns an errored SyncResult.""" + a = LangfuseAdapter() + a.connect() + result = a.sync() + assert result.errors + assert "not connected" in result.errors[0].lower() + + +def test_sync_state_tracking() -> None: + """SyncState records imports/exports and updates cursors.""" + a = LangfuseAdapter() + a.connect() + state = a.sync_state + + from datetime import datetime, timezone + + UTC = timezone.utc # Python 3.11+ has datetime.UTC; alias for 3.9/3.10 compat. + + t0 = datetime(2024, 1, 1, tzinfo=UTC) + t1 = datetime(2024, 1, 2, tzinfo=UTC) + state.record_import("trace-1", t0) + state.record_import("trace-2", t1) + assert "trace-1" in state.imported_trace_ids + assert "trace-2" in state.imported_trace_ids + assert state.last_import_cursor == t1 + + +def test_get_status_structure() -> None: + """get_status returns a complete status dict.""" + a = LangfuseAdapter() + a.connect() + status = a.get_status() + assert "connected" in status + assert "langfuse_healthy" in status + assert "host" in status + assert "imported_traces" in status + assert "exported_traces" in status + assert "quarantined_traces" in status + + +def test_config_property_default_none() -> None: + """When no config provided, config property is None.""" + a = LangfuseAdapter() + a.connect() + assert a.config is None + + +def test_config_property_returns_provided_config() -> None: + """When config provided to constructor, it is exposed via the config property. + + This avoids ``connect(config=cfg)`` which would attempt an HTTP health + check against the (fake) host. The constructor path stores the config + without networking; ``connect()`` then runs without the health check + when ``self._config`` was set on a prior call we skip here. + """ + cfg = LangfuseConfig(public_key="pk-test", secret_key="sk-test", host="https://api/") + a = LangfuseAdapter(config=cfg) + assert a.config is cfg + # Trailing slash is stripped by the validator. + assert a.config.host == "https://api" + + +def test_capture_config_passes_through() -> None: + """The standard capture_config kwarg is accepted and stored.""" + cfg = CaptureConfig.full() + a = LangfuseAdapter(capture_config=cfg) + assert a.capture_config is cfg + + +def test_serialize_for_replay() -> None: + a = LangfuseAdapter(stratix=_RecordingStratix(), capture_config=CaptureConfig.full()) + a.connect() + rt = a.serialize_for_replay() + assert rt.framework == "langfuse" + assert rt.adapter_name == "LangfuseAdapter" + assert "sync_state" in rt.metadata