Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions docs/adapters/memory-contract.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# Memory persistence contract for adapters

LayerLens adapters carry per-conversation, per-agent recall — episodic,
procedural, and semantic memory — alongside the trace events they emit.
This module ports the ad-hoc memory plumbing that the four mature
framework adapters (LangChain, AutoGen, CrewAI, Semantic Kernel) carry
on the `ateam` monorepo into a shared, replay-safe primitive that any
adapter on the `stratix-python` SDK can plug into. Without this
plumbing, lighter adapters behave as "goldfish agents" — every run
starts from a blank slate, which is the difference between a usable
production agent and a demo.

This document defines the binding contract every adapter that integrates
the recorder must satisfy. It is enforced at runtime by
`MemoryRecorder.__init__` (fail-fast on missing tenant), by
`MemoryRecorder.restore` (cross-tenant rejection + content-hash
integrity check), and at CI time by
`tests/instrument/adapters/_base/test_memory.py`.

Cross-pollination audit reference:
[`A:/tmp/adapter-cross-pollination-audit.md`](../../../tmp/adapter-cross-pollination-audit.md)
§2 item #1.

## The three buckets

The memory model is the canonical agent-memory split that appears
across the literature (LangChain memory module; CrewAI procedural
memory; AutoGen episodic/semantic split):

| Bucket | Lifetime | Bounded by | Eviction |
|----------------|-----------------------|------------------------|------------------------|
| **Episodic** | per-turn | `max_episodic` (200) | FIFO (oldest dropped) |
| **Procedural** | recurring patterns | `max_procedural` (16) | least-frequent + ties broken by oldest `last_seen_turn` |
| **Semantic** | long-lived facts | `max_semantic` (64) | least-recently-set |

* **Episodic** — per-turn `(input, output, error?, tools?, extra?)`
records, ordered by `turn_index`. The detector for procedural
patterns reads this stream.
* **Procedural** — derived: each entry is
`{"pattern": [[prev_turn_tools], [cur_turn_tools]], "count": int,
"last_seen_turn": int}`. Detected automatically from the recent
episodic window every time `record_turn` is called.
* **Semantic** — caller-controlled key/value store
(`set_semantic(key, value)`). Both keys and values are stringified.
Callers wanting structured semantic data should JSON-encode their
value.

All three buckets are **bounded** (CLAUDE.md "every cache must be
bounded"). The defaults are conservative; callers wanting a different
size construct the recorder with explicit `max_*` kwargs.

## The contract

1. **Every adapter owns exactly one `MemoryRecorder`.** Constructed in
`BaseAdapter.__init__` and exposed via `adapter.memory_recorder`
(read-only property). The recorder is bound to the same `org_id` as
the adapter — multi-tenancy is propagated.

2. **Construction without a tenant raises.** `MemoryRecorder(org_id="")`
raises `ValueError`. There is no "default" tenant, no blank
fallback. `BaseAdapter.__init__` already fails fast on missing
`org_id` (see [multi-tenancy.md](multi-tenancy.md)) so the recorder
inherits that guarantee.

3. **Every recorded turn is bounded.** A single oversized turn cannot
blow past the bucket caps: per-field strings longer than 8 KB are
truncated to a deterministic suffix (`<...truncated:orig_len=N>`).
The truncation is defence-in-depth, not a substitute for the
adapter-level truncation policy (cross-poll #3). Detection of
recurring tool patterns runs in O(window) per turn.

4. **Cross-tenant restore is prohibited.** `restore(snapshot)` raises
`ValueError` if `snapshot.org_id != recorder.org_id`. This mirrors
the `BaseAdapter.org_id` contract — a tenant-A snapshot cannot land
in a tenant-B recorder, even if both happen to share a process.

5. **Snapshots are tamper-evident.** `restore(snapshot)` recomputes the
SHA-256 content hash and rejects the snapshot if the recorded hash
does not match. Guards against accidentally-mutated dicts in
transit and against forged snapshots reconstructed without the
`MemorySnapshot` factory.

6. **Snapshots are replay-safe.** The round-trip
`snapshot() → restore() → snapshot()` produces a snapshot with the
identical `content_hash` (deterministic reconstruction). This is the
foundation of replay-safe memory: the replay engine restores the
recorder, then the adapter re-runs the agent and produces the same
next-turn snapshot. The `record_turn` method stamps a wall-clock
`timestamp_ns` into the new turn — replay engines suppress this
drift by capturing the original `timestamp_ns` from the source
trace and seeding the recorder's clock at restore time.

7. **Snapshot serialisation is dict-shaped.** `snapshot.to_dict()`
returns a JSON-serialisable dict; `MemorySnapshot.from_dict(data)`
round-trips. Adapters embed the dict under
`ReplayableTrace.metadata["memory_snapshot"]` so the replay engine
can reconstruct via `MemoryRecorder.restore(MemorySnapshot.from_dict(...))`.

8. **Recording is best-effort.** `BaseAdapter.record_memory_turn`
catches and logs all exceptions at DEBUG. A failure inside the
recorder MUST NOT propagate into the host framework's call stack —
tracing never breaks user code (CLAUDE.md). The trade-off is that a
recorder bug shows up as missing memory in the replay rather than a
crash in production.

9. **Thread-safe.** `record_turn`, `set_semantic`, `clear`, and
`restore` are all guarded by an internal lock. Many concurrent
`record_turn` calls produce a snapshot whose `episodic` indices
form an unbroken `1..N` sequence.

## Wiring at the lifecycle hook

Every adapter wires `record_memory_turn(...)` into its **agent-output
boundary** — the point at which the framework reports a completed
agent step / chat turn / invocation. The exact hook varies by
framework:

| Adapter | Hook | Episodic input | Tool list |
|---------------------|---------------------------------------------------------------------|---------------------------|--------------------------------------------------------------|
| `agno` | `Agent.run` / `arun` finally-block | `args[0] / kwargs["input"]`| `_collect_tool_names(agent, result)` from `result.messages` |
| `ms_agent_framework`| `Chat.invoke` / `invoke_stream` finally-block | `kwargs["input"]/["message"]`| `_collect_tool_names_from_messages(seen)` from streamed items|
| `openai_agents` | `_on_agent_span_end` (TraceProcessor) + `on_run_end` (Runner wrap) | cached at `_on_agent_span_start` per `span_id` | rolled up from `_on_function_span_end` per `parent_id` |
| `llama_index` | `_on_agent_step_end` | cached at `_on_agent_step_start` per thread id| rolled up from `_on_tool_call` per thread id |
| `google_adk` | `after_agent_callback` + `on_agent_end` | cached at `before_agent_callback` per thread id| rolled up from `after_tool_callback` per thread id |
| `bedrock_agents` | `_after_invoke_agent` (boto3 hook) | cached at `_before_invoke_agent` per thread id| rolled up from `_process_trace` action-group / KB step names |

Each adapter also embeds its memory snapshot in `serialize_for_replay`
output via `ReplayableTrace.metadata["memory_snapshot"] =
self.memory_snapshot_dict()` — so a downstream replay engine can
reconstruct the full episodic + procedural + semantic state before
re-execution.

## Honest scope disclosure (target adapter coverage)

The cross-pollination audit §2 item #1 enumerates **seven** target
adapters: `agno`, `ms_agent_framework`, `openai_agents`, `llama_index`,
`google_adk`, `bedrock_agents`, **`browser_use`**.

Six are wired in this PR. The seventh — `browser_use` — does not exist
on this branch's base (`feat/instrument-multitenancy-org-id-propagation`);
it lives on the parallel `feat/instrument-frameworks-browser-use-full`
history. It will be wired when that adapter is ported to this base or
when the histories merge. This follows the same honest-disclosure
pattern as PR #120 (state filters, which omitted `ms_agent_framework`
for the same reason — adapter not on its base).

For `browser_use`, the eventual wiring (per the cross-pollination
audit) will be:

* **Episodic** — page navigation events (`url`, `action`, `selector`)
per turn.
* **Procedural** — recurring `(prev_action, current_action)` patterns
(e.g. `"click[search]"` → `"type[query]"` → `"click[submit]"`).
* **Semantic** — long-lived page-content cache keyed by URL or DOM
hash, so a re-visit can short-circuit page reload during replay.

## Audit hooks

* **Construction failures** — `MemoryRecorder.__init__` raises with a
message naming the missing field (`"non-empty org_id"`,
`"bounded buffer sizes"`).
* **Cross-tenant restore** — raises with the explicit
`"Cross-tenant restore is prohibited (CLAUDE.md multi-tenancy)"`
message.
* **Tampered snapshots** — raises with
`"snapshot content_hash mismatch"` and includes the recorded vs
recomputed hashes for triage.
* **Best-effort recording failures** — logged at DEBUG via
`BaseAdapter.record_memory_turn` with `exc_info=True` so the failing
call site is preserved without escalating.

## Replay engine integration

A replay flow looks like:

```python
# Original run captures both events and memory.
adapter = AgnoAdapter(stratix=client, org_id="tenant-A")
# ... agent runs, on_run_end fires record_memory_turn() ...
trace = adapter.serialize_for_replay()
trace.metadata["memory_snapshot"] # serialised MemorySnapshot dict.

# Replay reconstructs the recorder before re-execution.
replay_adapter = AgnoAdapter(stratix=client, org_id="tenant-A")
snapshot = MemorySnapshot.from_dict(trace.metadata["memory_snapshot"])
replay_adapter.memory_recorder.restore(snapshot)
# Re-run the agent — it sees the original recall state.
```

The next-turn snapshot taken from `replay_adapter` will match the
original (modulo the wall-clock `timestamp_ns` of the new turn — see
contract item 6). This is what makes memory persistence "replay-safe":
the replay engine can drive an adapter through the same agent state
the original run reached.
12 changes: 12 additions & 0 deletions src/layerlens/instrument/adapters/_base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
TraceStoreSink,
IngestionPipelineSink,
)
from layerlens.instrument.adapters._base.memory import (
DEFAULT_MAX_EPISODIC,
DEFAULT_MAX_SEMANTIC,
DEFAULT_MAX_PROCEDURAL,
MemoryRecorder,
MemorySnapshot,
)
from layerlens.instrument.adapters._base.adapter import (
ORG_ID_FIELD,
AdapterInfo,
Expand Down Expand Up @@ -41,8 +48,13 @@
"AdapterStatus",
"BaseAdapter",
"CaptureConfig",
"DEFAULT_MAX_EPISODIC",
"DEFAULT_MAX_PROCEDURAL",
"DEFAULT_MAX_SEMANTIC",
"EventSink",
"IngestionPipelineSink",
"MemoryRecorder",
"MemorySnapshot",
"ORG_ID_FIELD",
"PydanticCompat",
"ReplayableTrace",
Expand Down
95 changes: 95 additions & 0 deletions src/layerlens/instrument/adapters/_base/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from layerlens.instrument.adapters._base.sinks import EventSink

from layerlens._compat.pydantic import Field, BaseModel, model_dump
from layerlens.instrument.adapters._base.memory import MemoryRecorder, MemorySnapshot
from layerlens.instrument.adapters._base.capture import (
ALWAYS_ENABLED_EVENT_TYPES,
CaptureConfig,
Expand Down Expand Up @@ -292,6 +293,18 @@ def __init__(
# the public API and may change in v2.
self._event_sinks: List["EventSink"] = list(event_sinks) if event_sinks else []

# Cross-poll #1: per-adapter memory recorder. Bound to the same
# tenant as the adapter — :class:`MemoryRecorder` enforces the
# match on every :meth:`MemoryRecorder.restore`. Adapters call
# :meth:`record_memory_turn` after each ``agent.output``-style
# event so cross-conversation recall (episodic / procedural /
# semantic) lives alongside the trace events. The recorder is
# included in :meth:`serialize_for_replay` output via
# :meth:`memory_snapshot_dict` so a replay engine can deterministically
# reconstruct the agent's memory state before re-execution. See
# ``docs/adapters/memory-contract.md`` for the contract.
self._memory_recorder: MemoryRecorder = MemoryRecorder(org_id=self._org_id)

# --- Sink management (public API) ---

def add_sink(self, sink: "EventSink") -> None:
Expand Down Expand Up @@ -397,6 +410,88 @@ def info(self) -> AdapterInfo:
base_info = base_info.copy(update={"requires_pydantic": self.requires_pydantic})
return base_info

# --- Memory persistence (cross-poll #1) ------------------------

@property
def memory_recorder(self) -> MemoryRecorder:
"""The adapter's bound :class:`MemoryRecorder`.

Constructed in :meth:`__init__` and tenant-scoped to the same
``org_id`` as the adapter. Adapters wire :meth:`record_memory_turn`
into their per-turn lifecycle hooks (typically alongside the
``agent.output`` emission).
"""
return self._memory_recorder

def record_memory_turn(
self,
*,
agent_name: Optional[str] = None,
input_data: Any = None,
output_data: Any = None,
error: Optional[str] = None,
tools: Optional[List[str]] = None,
extra: Optional[Dict[str, Any]] = None,
) -> None:
"""Record a completed turn in the per-adapter memory recorder.

Adapters call this from their ``on_run_end`` / ``after_agent_callback``
/ equivalent per-turn lifecycle hook *after* emitting the
``agent.output`` event. The recorder is thread-safe and bounded
(see :class:`MemoryRecorder` defaults), so repeated calls under
concurrent execution are safe and never grow without bound.

Failures are swallowed and logged at DEBUG: memory persistence is
a best-effort cross-cutting concern and must not break the host
framework's run path. CLAUDE.md "tracing never breaks user code".

Args:
agent_name: The agent that produced this turn.
input_data: Input the agent received.
output_data: Output the agent produced.
error: Optional error string if the turn failed.
tools: Optional list of tool names invoked during the turn.
extra: Optional adapter-specific metadata. Keys are sorted
inside the recorder for hash determinism.
"""
try:
self._memory_recorder.record_turn(
agent_name=agent_name,
input_data=input_data,
output_data=output_data,
error=error,
tools=tools,
extra=extra,
)
except Exception:
# Memory persistence is best-effort. A failure here must
# never propagate into the host framework's call stack.
logger.debug(
"MemoryRecorder.record_turn failed for adapter %s",
self.FRAMEWORK,
exc_info=True,
)

def memory_snapshot(self) -> MemorySnapshot:
"""Return the current immutable :class:`MemorySnapshot`.

Convenience wrapper for :meth:`MemoryRecorder.snapshot` so
callers do not need to reach into the private recorder.
"""
return self._memory_recorder.snapshot()

def memory_snapshot_dict(self) -> Dict[str, Any]:
"""Return the memory snapshot as a JSON-serialisable dict.

Adapters embed this in :meth:`serialize_for_replay`'s
:class:`ReplayableTrace` ``metadata`` under the
``"memory_snapshot"`` key so the replay engine can reconstruct
the recorder's state via :meth:`MemoryRecorder.restore` before
re-executing the agent. The shape is the public, content-addressable
contract documented in ``docs/adapters/memory-contract.md``.
"""
return self._memory_recorder.snapshot().to_dict()

@abstractmethod
def serialize_for_replay(self) -> ReplayableTrace:
"""Serialize the current trace data for replay."""
Expand Down
Loading