diff --git a/src/layerlens/instrument/adapters/_base/adapter.py b/src/layerlens/instrument/adapters/_base/adapter.py index b20ccb9..ef1161a 100644 --- a/src/layerlens/instrument/adapters/_base/adapter.py +++ b/src/layerlens/instrument/adapters/_base/adapter.py @@ -32,6 +32,10 @@ ALWAYS_ENABLED_EVENT_TYPES, CaptureConfig, ) +from layerlens.instrument.adapters._base.logging import ( + TenantContextLogAdapter, + get_tenant_logger, +) from layerlens.instrument.adapters._base.pydantic_compat import PydanticCompat # Forward reference: EventSink is defined in sinks.py, which itself does not @@ -43,6 +47,60 @@ logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# OTel ↔ SDK org_id correlation (Gap 3) +# --------------------------------------------------------------------------- + +# OTel span attribute name used to correlate distributed traces with the +# SDK's tenant binding. Picked under the ``layerlens.*`` namespace so it +# does not clash with the OTel semantic conventions and so atlas-app / +# downstream span processors can index on it directly. +_OTEL_ORG_ID_ATTR: str = "layerlens.org_id" + + +def _set_current_span_org_id(org_id: str) -> None: + """Stamp ``layerlens.org_id`` onto the currently-active OTel span. + + No-op in any of the following conditions: + + * OpenTelemetry is not installed (``opentelemetry`` import fails). + * No span is currently active (``trace.get_current_span()`` returns + the no-op ``INVALID_SPAN``). + * The active span is not recording (``span.is_recording()`` is + ``False``). + * The OTel API raises (defensive — we never let logging / + observability take down the adapter hot path). + + The attribute is set under the ``layerlens.*`` prefix so it can + coexist with OTel semantic-convention attributes + (``service.namespace`` etc.) without collision. Downstream span + processors (atlas-app, Tempo, Jaeger) can index on this attribute + to filter spans by tenant — closing the cross-correlation gap + between the SDK's per-event ``org_id`` and OTel's distributed + span graph. + """ + try: + from opentelemetry import trace as _otel_trace + except ImportError: + return + try: + span = _otel_trace.get_current_span() + # ``get_current_span`` always returns a span object (the no-op + # ``INVALID_SPAN`` when no context exists). Defer to the + # ``is_recording`` predicate so non-recording / sampled-out + # spans do not pay the attribute-set cost. + is_recording = getattr(span, "is_recording", None) + if callable(is_recording) and not is_recording(): + return + span.set_attribute(_OTEL_ORG_ID_ATTR, org_id) + except Exception: # noqa: BLE001 — observability MUST never raise + # Defensive: if the OTel API misbehaves under any condition, + # we drop the attribute set rather than fault the emit path. + logger.debug( + "Failed to stamp %s onto active OTel span", _OTEL_ORG_ID_ATTR, exc_info=True + ) + + # --------------------------------------------------------------------------- # Enums & Models # --------------------------------------------------------------------------- @@ -278,13 +336,19 @@ def __init__( self._connected = False self._status: AdapterStatus = AdapterStatus.DISCONNECTED - # Circuit breaker state (protected by _lock). + # Circuit breaker state (protected by _lock). The lock and + # counter are *per-instance*, so they inherit the adapter's + # single-tenant binding — tenant A's circuit cannot trip + # tenant B's adapter even though both share the BaseAdapter + # class. See ``test_cache_tenant_isolation.py``. self._lock = threading.Lock() self._error_count = 0 self._circuit_open = False self._circuit_opened_at: float = 0.0 - # Collected events for replay serialization. + # Collected events for replay serialization. Per-instance, so + # cross-tenant trace contamination is structurally impossible + # for the in-memory event buffer. self._trace_events: List[Dict[str, Any]] = [] # Pluggable event sinks for persistence / export. Use add_sink / @@ -292,6 +356,14 @@ def __init__( # the public API and may change in v2. self._event_sinks: List["EventSink"] = list(event_sinks) if event_sinks else [] + # Per-instance tenant-aware logger (Gap 4). All adapter log + # lines now carry ``org_id`` in record extras AND in a + # ``[org_id=...]`` message prefix. Subclasses inherit this via + # ``self.tlogger`` and should prefer it over plain ``logging``. + self._tlogger: TenantContextLogAdapter = get_tenant_logger( + type(self).__module__, self._org_id + ) + # --- Sink management (public API) --- def add_sink(self, sink: "EventSink") -> None: @@ -350,6 +422,19 @@ def org_id(self) -> str: """ return self._org_id + @property + def tlogger(self) -> TenantContextLogAdapter: + """Per-instance tenant-aware logger. + + Wraps a standard :class:`logging.Logger` named by the subclass + module and stamps ``org_id`` into every record's ``extra`` dict + AND prefixes the message with ``[org_id=...]``. Subclasses + SHOULD use this in place of ``logging.getLogger(__name__)`` so + adapter log lines carry tenant context end-to-end (Gap 4 of the + multi-tenancy hardening contract). + """ + return self._tlogger + # --- Abstract lifecycle methods --- @abstractmethod @@ -496,6 +581,12 @@ def emit_event( payload = self._stamp_org_id(payload) + # Cross-correlate the SDK's per-event tenant binding with the + # currently-active OTel span so distributed-trace consumers can + # filter spans by tenant. No-op when OTel is absent or when no + # recording span is active. (Gap 3.) + _set_current_span_org_id(self._org_id) + try: if privacy_level is not None: self._stratix.emit(payload, privacy_level) @@ -532,6 +623,10 @@ def emit_dict_event( payload = self._stamp_org_id(payload) + # Cross-correlate with the active OTel span (see emit_event for + # the full rationale). Same no-op semantics. (Gap 3.) + _set_current_span_org_id(self._org_id) + try: self._stratix.emit(event_type, payload) self._post_emit_success(event_type, payload) diff --git a/src/layerlens/instrument/adapters/_base/logging.py b/src/layerlens/instrument/adapters/_base/logging.py new file mode 100644 index 0000000..ddd0c17 --- /dev/null +++ b/src/layerlens/instrument/adapters/_base/logging.py @@ -0,0 +1,128 @@ +"""Tenant-aware logging utilities for LayerLens adapters. + +Provides :class:`TenantContextLogAdapter` (a thin +:class:`logging.LoggerAdapter` subclass) and the convenience constructor +:func:`get_tenant_logger`. Every log record produced through the adapter +is enriched with the bound tenant's ``org_id`` in two surfaces: + +* the ``extra`` keyword propagated to ``logging.LogRecord.__dict__`` so + log handlers (JSON formatters, OTel log exporters, structlog + processors) can promote it to a structured field; +* the ``"[org_id=...] "`` prefix on the formatted message so plain-text + log lines carry the tenant binding without any handler config. + +This implements Gap 4 of the multi-tenancy hardening contract +(CLAUDE.md "EVERY data operation must be scoped by tenant"). Adapter +log lines previously omitted ``org_id``, making per-tenant log +filtering and incident triage impossible. After this change every +emission, circuit-breaker state change, and sink dispatch failure logs +carries the tenant context end-to-end. + +Thread-safety +------------- +The Python ``logging`` module is process-wide thread-safe; this adapter +adds no shared mutable state, only a per-instance ``org_id`` string. +Multiple adapters bound to different tenants can therefore share a +single underlying ``logging.Logger`` without leaking org_id across +instances. +""" + +from __future__ import annotations + +import logging +from typing import Any, Tuple, Mapping, MutableMapping + +# Reserved key used both inside ``extra`` and as the formatted prefix. +# Matches :data:`layerlens.instrument.adapters._base.adapter.ORG_ID_FIELD` +# — the same canonical key used everywhere in the platform. +_ORG_ID_KEY: str = "org_id" + + +class TenantContextLogAdapter(logging.LoggerAdapter): # type: ignore[type-arg] + """:class:`logging.LoggerAdapter` that injects ``org_id`` into every record. + + Construction takes a base :class:`logging.Logger` and the tenant's + ``org_id``. Each ``debug`` / ``info`` / ``warning`` / ``error`` / + ``critical`` call: + + 1. Adds ``org_id`` to the record's ``extra`` dict so structured + handlers see it as a first-class field. + 2. Prepends ``"[org_id=] "`` to the message body so flat-text + log lines carry the tenant binding without handler config. + + The adapter is **per-instance** — adapters bound to different + tenants must use distinct :class:`TenantContextLogAdapter` instances + even when they share the same underlying logger name. The + :func:`get_tenant_logger` factory enforces this by always creating a + fresh instance. + + Example:: + + log = TenantContextLogAdapter(logging.getLogger(__name__), org_id="org-A") + log.warning("circuit breaker open") + # record.extra["org_id"] == "org-A" + # formatted message: "[org_id=org-A] circuit breaker open" + """ + + def __init__(self, logger: logging.Logger, org_id: str) -> None: + if not isinstance(org_id, str) or not org_id.strip(): + raise ValueError( + "TenantContextLogAdapter requires a non-empty org_id " + "string. Construction without a tenant binding violates " + "the multi-tenancy contract — see CLAUDE.md." + ) + super().__init__(logger, {_ORG_ID_KEY: org_id}) + self._org_id: str = org_id + + @property + def org_id(self) -> str: + """The tenant ``org_id`` bound to this log adapter.""" + return self._org_id + + def process( + self, + msg: Any, + kwargs: MutableMapping[str, Any], + ) -> Tuple[Any, MutableMapping[str, Any]]: + """Inject ``org_id`` into the record's ``extra`` and prefix the message. + + ``kwargs["extra"]`` is merged (caller-supplied keys win EXCEPT + for ``org_id``, which the adapter always stamps to its bound + tenant — caller cannot override the tenant binding via a stray + ``extra={"org_id": ...}`` argument). + """ + # Merge caller extras with the tenant binding. Tenant binding + # always wins to prevent caller-supplied org_id from + # impersonating a different tenant in log records. + existing_extra: Mapping[str, Any] = kwargs.get("extra") or {} + merged_extra: dict[str, Any] = dict(existing_extra) + merged_extra[_ORG_ID_KEY] = self._org_id + kwargs["extra"] = merged_extra + + # Prefix the message so plain-text handlers also surface the + # tenant binding. Use repr-safe brackets to avoid collision with + # other bracketed prefixes in adapter log lines. + return f"[{_ORG_ID_KEY}={self._org_id}] {msg}", kwargs + + +def get_tenant_logger(name: str, org_id: str) -> TenantContextLogAdapter: + """Construct a :class:`TenantContextLogAdapter` for ``name`` bound to ``org_id``. + + Convenience wrapper that mirrors the shape of + :func:`logging.getLogger` so adapter modules can swap a plain + ``logger = logging.getLogger(__name__)`` for + ``logger = get_tenant_logger(__name__, self._org_id)`` with a + one-line change. + + The underlying logger is shared across calls with the same name (as + with :func:`logging.getLogger`); only the adapter wrapper is fresh + per call so each adapter instance carries its own tenant binding. + """ + base = logging.getLogger(name) + return TenantContextLogAdapter(base, org_id) + + +__all__ = [ + "TenantContextLogAdapter", + "get_tenant_logger", +] diff --git a/src/layerlens/instrument/adapters/_base/sinks.py b/src/layerlens/instrument/adapters/_base/sinks.py index 764e7bb..26d69be 100644 --- a/src/layerlens/instrument/adapters/_base/sinks.py +++ b/src/layerlens/instrument/adapters/_base/sinks.py @@ -17,12 +17,22 @@ ``layerlens.instrument.transport`` and is added in a later milestone. Ported from ``ateam/stratix/sdk/python/adapters/sinks.py``. + +Multi-tenancy +------------- +Sinks accept events from multiple :class:`BaseAdapter` instances bound +to different tenants. Buffering and flushing are partitioned by +``org_id`` so a single tenant's burst cannot starve another tenant's +events nor displace them via global eviction. See +:class:`IngestionPipelineSink` for the per-tenant buffer cap contract +(CLAUDE.md "EVERY data operation must be scoped by tenant"). """ from __future__ import annotations import uuid import logging +import threading from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional from datetime import datetime, timezone @@ -214,6 +224,15 @@ def close(self) -> None: ) +_DEFAULT_MAX_PER_TENANT_BUFFER_SIZE = 1000 +"""Default per-tenant buffer cap for :class:`IngestionPipelineSink`. + +Caps each tenant's buffer independently so a single noisy tenant cannot +exhaust memory or displace events from quieter tenants. See +``MaxPerTenantBufferSize`` parameter on :class:`IngestionPipelineSink`. +""" + + class IngestionPipelineSink(EventSink): """Sink that feeds events into a duck-typed ingestion pipeline. @@ -222,9 +241,34 @@ class IngestionPipelineSink(EventSink): Supports two modes: - * **immediate** (default): each event is ingested as a single-item batch. - * **buffered**: events are collected and ingested on - :meth:`flush` / :meth:`close`. + * **immediate** (default): each event is ingested as a single-item batch + keyed by the per-event ``org_id``. + * **buffered**: events are partitioned by ``org_id`` into per-tenant + buffers and ingested on :meth:`flush` / :meth:`close`. Each tenant's + buffer is flushed in its own ``ingest()`` call so one tenant's + backlog cannot delay another tenant's events. + + Multi-tenancy + ------------- + Buffering is partitioned by ``org_id`` (Gap 2 of the multi-tenancy + hardening contract). Per-tenant invariants: + + * **Isolation:** each tenant gets its own buffer dict slot. A burst + from tenant A never appears in tenant B's flush. + * **Bounded eviction:** ``max_per_tenant_buffer_size`` (default + ``1000``) caps each tenant independently. When tenant A overflows, + A's oldest events drop FIFO — tenant B's events are untouched. + Drops are counted per-tenant in :attr:`dropped_per_tenant`. + * **Observability:** :meth:`buffer_size_per_tenant` returns a + snapshot ``dict[org_id, int]`` for the + ``sink_per_tenant_buffer_size{org_id}`` gauge consumers. + + Thread-safety + ------------- + ``send`` / ``flush`` / ``close`` are safe to call concurrently from + any thread. All buffer mutations are serialized via an internal + lock; per-tenant ingest calls are made *outside* the lock to avoid + holding it across slow IO. """ def __init__( @@ -233,19 +277,56 @@ def __init__( trace_id: Optional[str] = None, tenant_id: str = "default", buffered: bool = False, + max_per_tenant_buffer_size: int = _DEFAULT_MAX_PER_TENANT_BUFFER_SIZE, ) -> None: + if max_per_tenant_buffer_size <= 0: + raise ValueError( + "max_per_tenant_buffer_size must be > 0; got " + f"{max_per_tenant_buffer_size}. The cap is per-tenant — " + "use a small positive value, never zero." + ) self._pipeline = pipeline self._trace_id = trace_id or str(uuid.uuid4()) self._tenant_id = tenant_id self._buffered = buffered - self._buffer: List[Dict[str, Any]] = [] + # Per-tenant buffers — keyed by org_id. A buffer for tenant A + # is never flushed under tenant B's binding. + self._buffers: Dict[str, List[Dict[str, Any]]] = {} + # Per-tenant drop counters, surfaced as + # ``sink_per_tenant_dropped{org_id}``. + self._dropped_per_tenant: Dict[str, int] = {} + self._max_per_tenant_buffer_size = max_per_tenant_buffer_size self._sequence_id = 0 self._closed = False + # Single mutex covers buffer / counter / sequence-id mutation. + # Per-tenant ingest IO happens outside the lock. + self._lock = threading.Lock() @property def trace_id(self) -> str: return self._trace_id + @property + def max_per_tenant_buffer_size(self) -> int: + """The per-tenant buffer cap. Read-only after construction.""" + return self._max_per_tenant_buffer_size + + def buffer_size_per_tenant(self) -> Dict[str, int]: + """Snapshot of currently-buffered event counts per tenant. + + Suitable for the ``sink_per_tenant_buffer_size{org_id}`` gauge. + Returned dict is a defensive copy; mutating it does not affect + sink state. + """ + with self._lock: + return {org_id: len(buf) for org_id, buf in self._buffers.items()} + + @property + def dropped_per_tenant(self) -> Dict[str, int]: + """Snapshot of drop counts per tenant (FIFO eviction on overflow).""" + with self._lock: + return dict(self._dropped_per_tenant) + def _format_event( self, event_type: str, @@ -257,16 +338,20 @@ def _format_event( ``org_id`` is propagated both as a top-level field (for sinks that read it directly) and inside the payload (already stamped - upstream by :meth:`BaseAdapter._stamp_org_id`). + upstream by :meth:`BaseAdapter._stamp_org_id`). Sequence-id + increment is performed under the sink lock so concurrent senders + do not collide. """ - self._sequence_id += 1 + with self._lock: + self._sequence_id += 1 + seq = self._sequence_id ts = datetime.fromtimestamp(timestamp_ns / 1e9, tz=UTC) return { "event_type": event_type, "trace_id": self._trace_id, "timestamp": ts.isoformat(), "span_id": str(uuid.uuid4()), - "sequence_id": self._sequence_id, + "sequence_id": seq, "event_id": str(uuid.uuid4()), "org_id": org_id, "payload": payload if isinstance(payload, dict) else {"raw": str(payload)}, @@ -284,16 +369,17 @@ def send( return formatted = self._format_event(event_type, payload, timestamp_ns, org_id) + # Per-event org_id is the source of truth for multi-tenant + # ingest. Empty values fall back to the sink-level legacy + # ``tenant_id`` only to preserve backward compatibility — adapter + # emissions always carry a non-empty ``org_id`` post-PR #118. + effective_org = org_id or self._tenant_id if self._buffered: - self._buffer.append(formatted) + self._enqueue(effective_org, formatted) else: try: - # Honour the per-event org_id when present — falls back - # to the sink-level tenant_id only for legacy callers - # that did not configure one. Per-event org_id is the - # source of truth for multi-tenant ingest. - self._pipeline.ingest([formatted], tenant_id=org_id or self._tenant_id) + self._pipeline.ingest([formatted], tenant_id=effective_org) except Exception: logger.debug( "IngestionPipelineSink.send() failed for event %s", @@ -301,18 +387,55 @@ def send( exc_info=True, ) + def _enqueue(self, org_id: str, formatted: Dict[str, Any]) -> None: + """Append an event to ``org_id``'s buffer with FIFO overflow drop. + + When the per-tenant buffer is at the cap, the OLDEST event in + THAT TENANT's buffer is dropped — never another tenant's. The + drop counter is bumped in :attr:`dropped_per_tenant`. + """ + with self._lock: + buf = self._buffers.setdefault(org_id, []) + if len(buf) >= self._max_per_tenant_buffer_size: + # FIFO eviction confined to THIS tenant's buffer. + # Other tenants' buffers are not inspected or modified. + buf.pop(0) + self._dropped_per_tenant[org_id] = self._dropped_per_tenant.get(org_id, 0) + 1 + logger.debug( + "IngestionPipelineSink dropped oldest event for tenant %s (cap=%d)", + org_id, + self._max_per_tenant_buffer_size, + ) + buf.append(formatted) + def flush(self) -> None: - if not self._buffer: - return - try: - self._pipeline.ingest(list(self._buffer), tenant_id=self._tenant_id) - except Exception: - logger.debug( - "IngestionPipelineSink.flush() failed for %d events", - len(self._buffer), - exc_info=True, - ) - self._buffer.clear() + """Flush every tenant's buffer in its own ``ingest()`` call. + + Each tenant's batch is sent under that tenant's ``org_id`` — one + tenant's slow downstream cannot block another's flush, because + the calls are sequenced on a fresh per-tenant snapshot taken + under the lock and the ingest IO happens outside it. + """ + with self._lock: + # Take a snapshot per tenant and clear each buffer atomically + # so concurrent ``send`` after this point starts a fresh + # buffer for that tenant. + snapshot: Dict[str, List[Dict[str, Any]]] = {} + for org_id, buf in self._buffers.items(): + if buf: + snapshot[org_id] = buf + self._buffers[org_id] = [] + + for org_id, batch in snapshot.items(): + try: + self._pipeline.ingest(list(batch), tenant_id=org_id) + except Exception: + logger.debug( + "IngestionPipelineSink.flush() failed for %d events (tenant=%s)", + len(batch), + org_id, + exc_info=True, + ) def close(self) -> None: if self._closed: diff --git a/tests/instrument/adapters/_base/test_cache_tenant_isolation.py b/tests/instrument/adapters/_base/test_cache_tenant_isolation.py new file mode 100644 index 0000000..ae9cdb6 --- /dev/null +++ b/tests/instrument/adapters/_base/test_cache_tenant_isolation.py @@ -0,0 +1,455 @@ +"""In-memory cache tenant-isolation tests (Gap 1). + +Every per-instance cache held by :class:`BaseAdapter` (circuit breaker +counters, error count, ``_trace_events`` buffer, sink registry, +``_circuit_open`` flag, opened-at timestamp) must be tenant-scoped. +Because adapters fail-fast at construction with a single ``org_id`` +binding (PR #118), the **per-instance** lifetime of every cache +inherits that single-tenant scope. + +This suite enforces that contract empirically — not by trusting the +type system but by: + +* Constructing two adapters bound to different tenants in the same + process and asserting their state never bleeds. +* Driving sustained concurrent emission from multiple threads — one per + tenant — and proving every event in tenant A's recorded stream is + tagged with tenant A and never B (or vice versa). +* Tripping tenant A's circuit breaker and verifying tenant B's adapter + remains HEALTHY and emit-able. +* Replacing one adapter's sink list and verifying the other adapter's + sinks are untouched. + +Background +---------- +A 2026-04-25 multi-tenancy audit +(``A:/tmp/adapter-depth-audit.md`` cross-cutting finding #1) flagged +that the in-memory caches in :class:`BaseAdapter` were never tested for +cross-tenant contamination. PR #118 added construction-time +tenant binding; this suite closes the audit by proving the runtime +state behaves accordingly under contention. +""" + +from __future__ import annotations + +import threading +from types import SimpleNamespace +from typing import Any, Dict, List, Tuple + +import pytest + +from layerlens.instrument.adapters._base.sinks import EventSink +from layerlens.instrument.adapters._base.adapter import ( + ORG_ID_FIELD, + AdapterInfo, + BaseAdapter, + AdapterHealth, + AdapterStatus, + ReplayableTrace, +) + +# --------------------------------------------------------------------------- +# Test doubles +# --------------------------------------------------------------------------- + + +class _RecordingStratix: + """Stratix double that records every emit call thread-safely.""" + + def __init__(self, org_id: str) -> None: + self.org_id = org_id + self._lock = threading.Lock() + self._events: List[Tuple[Any, ...]] = [] + + def emit(self, *args: Any, **kwargs: Any) -> None: + with self._lock: + self._events.append(args) + + @property + def events(self) -> List[Tuple[Any, ...]]: + with self._lock: + return list(self._events) + + +class _RecordingSink(EventSink): + """Sink that records every event seen, with thread-safe append.""" + + def __init__(self) -> None: + self._lock = threading.Lock() + self._received: List[Dict[str, Any]] = [] + + def send( + self, + event_type: str, + payload: Dict[str, Any], + timestamp_ns: int, + *, + org_id: str, + ) -> None: + with self._lock: + self._received.append({"event_type": event_type, "payload": payload, "org_id": org_id}) + + def flush(self) -> None: + pass + + def close(self) -> None: + pass + + @property + def received(self) -> List[Dict[str, Any]]: + with self._lock: + return list(self._received) + + +class _FailingStratix: + """Stratix double whose emit always raises — used to trip the breaker.""" + + def __init__(self, org_id: str) -> None: + self.org_id = org_id + self.calls = 0 + + def emit(self, *args: Any, **kwargs: Any) -> None: + self.calls += 1 + raise RuntimeError("synthetic emit failure") + + +class _MinimalAdapter(BaseAdapter): + FRAMEWORK = "test" + VERSION = "0.0.0" + + def connect(self) -> None: + self._connected = True + self._status = AdapterStatus.HEALTHY + + def disconnect(self) -> None: + self._connected = False + self._status = AdapterStatus.DISCONNECTED + + def health_check(self) -> AdapterHealth: + return AdapterHealth( + status=self._status, + framework_name=self.FRAMEWORK, + adapter_version=self.VERSION, + ) + + def get_adapter_info(self) -> AdapterInfo: + return AdapterInfo(name="MinimalAdapter", version=self.VERSION, framework=self.FRAMEWORK) + + def serialize_for_replay(self) -> ReplayableTrace: + return ReplayableTrace( + adapter_name="MinimalAdapter", + framework=self.FRAMEWORK, + trace_id="trace-test", + events=list(self._trace_events), + ) + + +# --------------------------------------------------------------------------- +# Per-instance cache scoping +# --------------------------------------------------------------------------- + + +def test_trace_events_buffer_is_per_instance() -> None: + """``_trace_events`` is a per-instance list — never shared across tenants.""" + a = _MinimalAdapter(stratix=_RecordingStratix("org-A")) + b = _MinimalAdapter(stratix=_RecordingStratix("org-B")) + a.connect() + b.connect() + + a.emit_dict_event("tool.call", {"tool_name": "calc-A"}) + a.emit_dict_event("tool.call", {"tool_name": "calc-A2"}) + b.emit_dict_event("tool.call", {"tool_name": "calc-B"}) + + trace_a = a.serialize_for_replay() + trace_b = b.serialize_for_replay() + + assert len(trace_a.events) == 2 + assert len(trace_b.events) == 1 + # Cross-check the org_id stamp — every event in A's trace is org-A, + # every event in B's trace is org-B. + assert all(e[ORG_ID_FIELD] == "org-A" for e in trace_a.events) + assert all(e[ORG_ID_FIELD] == "org-B" for e in trace_b.events) + + +def test_circuit_breaker_state_is_per_instance() -> None: + """Tripping tenant A's breaker leaves tenant B HEALTHY.""" + failing = _FailingStratix("org-A") + healthy = _RecordingStratix("org-B") + a = _MinimalAdapter(stratix=failing) + b = _MinimalAdapter(stratix=healthy) + a.connect() + b.connect() + + # Drive 12 emit failures into A — exceeds the threshold (10). + for i in range(12): + a.emit_dict_event("tool.call", {"i": i}) + + # A is now in ERROR with circuit OPEN — but B is untouched. + assert a._circuit_open is True + assert a._status == AdapterStatus.ERROR + assert a._error_count >= 10 + + assert b._circuit_open is False + assert b._status == AdapterStatus.HEALTHY + assert b._error_count == 0 + + # B can still emit normally even though A's circuit is open. + b.emit_dict_event("tool.call", {"tool_name": "B-still-works"}) + assert len(healthy.events) == 1 + + +def test_sink_registry_is_per_instance() -> None: + """Sinks added to A are NOT visible on B.""" + sink_a = _RecordingSink() + sink_b = _RecordingSink() + a = _MinimalAdapter(stratix=_RecordingStratix("org-A"), event_sinks=[sink_a]) + b = _MinimalAdapter(stratix=_RecordingStratix("org-B"), event_sinks=[sink_b]) + a.connect() + b.connect() + + a.emit_dict_event("tool.call", {"tool_name": "to-A"}) + b.emit_dict_event("tool.call", {"tool_name": "to-B"}) + + assert len(sink_a.received) == 1 + assert sink_a.received[0]["org_id"] == "org-A" + + assert len(sink_b.received) == 1 + assert sink_b.received[0]["org_id"] == "org-B" + + +def test_remove_sink_does_not_affect_other_adapter() -> None: + """Mutating one adapter's sink list never touches another's.""" + shared_sink = _RecordingSink() + a = _MinimalAdapter(stratix=_RecordingStratix("org-A"), event_sinks=[shared_sink]) + b = _MinimalAdapter(stratix=_RecordingStratix("org-B"), event_sinks=[shared_sink]) + + # Even though we passed the SAME sink instance into both adapters, + # removing it from A leaves B's reference intact. + assert a.remove_sink(shared_sink) is True + assert shared_sink in b.sinks + assert shared_sink not in a.sinks + + +def test_lock_is_per_instance() -> None: + """Each adapter holds its own ``_lock`` — no global serialization.""" + a = _MinimalAdapter(org_id="org-A") + b = _MinimalAdapter(org_id="org-B") + assert a._lock is not b._lock + + +def test_org_id_property_remains_immutable_per_instance() -> None: + """Adapter's bound ``org_id`` cannot be silently swapped at runtime.""" + a = _MinimalAdapter(org_id="org-A") + b = _MinimalAdapter(org_id="org-B") + assert a.org_id == "org-A" + assert b.org_id == "org-B" + # Mutating B does not affect A even via attribute write. + b._org_id = "org-B-CHANGED" + assert a.org_id == "org-A" + + +# --------------------------------------------------------------------------- +# Concurrent emission stress — proves no cross-tenant pollution under load +# --------------------------------------------------------------------------- + + +def _emit_burst(adapter: BaseAdapter, label: str, n: int) -> None: + """Emit ``n`` events through ``adapter``, each labelled distinctly.""" + for i in range(n): + adapter.emit_dict_event("tool.call", {"tool_name": f"{label}-{i}"}) + + +def test_concurrent_emission_two_tenants_no_cross_contamination() -> None: + """Two threads, two tenants, 500 events each — no leak in either direction.""" + stratix_a = _RecordingStratix("org-A") + stratix_b = _RecordingStratix("org-B") + sink_a = _RecordingSink() + sink_b = _RecordingSink() + adapter_a = _MinimalAdapter(stratix=stratix_a, event_sinks=[sink_a]) + adapter_b = _MinimalAdapter(stratix=stratix_b, event_sinks=[sink_b]) + adapter_a.connect() + adapter_b.connect() + + n = 500 + t_a = threading.Thread(target=_emit_burst, args=(adapter_a, "A", n)) + t_b = threading.Thread(target=_emit_burst, args=(adapter_b, "B", n)) + t_a.start() + t_b.start() + t_a.join(timeout=10) + t_b.join(timeout=10) + assert not t_a.is_alive(), "tenant A burst thread hung" + assert not t_b.is_alive(), "tenant B burst thread hung" + + # A's stratix saw exactly n org-A events and zero org-B events. + a_events = stratix_a.events + b_events = stratix_b.events + assert len(a_events) == n + assert len(b_events) == n + assert all(p[ORG_ID_FIELD] == "org-A" for _, p in a_events) + assert all(p[ORG_ID_FIELD] == "org-B" for _, p in b_events) + assert not any(p[ORG_ID_FIELD] == "org-B" for _, p in a_events) + assert not any(p[ORG_ID_FIELD] == "org-A" for _, p in b_events) + + # Sinks observed the same isolation. + assert all(r["org_id"] == "org-A" for r in sink_a.received) + assert all(r["org_id"] == "org-B" for r in sink_b.received) + + +def test_concurrent_emission_three_tenants_isolated_under_contention() -> None: + """Three concurrent tenants, each with their own adapter, see only their events.""" + tenants = ["org-X", "org-Y", "org-Z"] + stratix_per_tenant = {t: _RecordingStratix(t) for t in tenants} + adapters = {t: _MinimalAdapter(stratix=stratix_per_tenant[t]) for t in tenants} + for a in adapters.values(): + a.connect() + + n = 200 + threads = [ + threading.Thread(target=_emit_burst, args=(adapters[t], t, n)) for t in tenants + ] + for th in threads: + th.start() + for th in threads: + th.join(timeout=10) + assert not th.is_alive() + + for t in tenants: + events = stratix_per_tenant[t].events + assert len(events) == n + # Every event in this tenant's stream is correctly tagged. + assert all(p[ORG_ID_FIELD] == t for _, p in events) + # Cross-checks: no other tenant's id appears anywhere. + for other in tenants: + if other == t: + continue + assert not any(p[ORG_ID_FIELD] == other for _, p in events), ( + f"tenant {other}'s id leaked into {t}'s event stream" + ) + + +def test_concurrent_emission_does_not_corrupt_trace_event_buffer() -> None: + """High-concurrency emit calls produce a consistent trace_events count.""" + stratix = _RecordingStratix("org-S") + adapter = _MinimalAdapter(stratix=stratix) + adapter.connect() + + n_per_thread = 100 + n_threads = 8 + + def _worker() -> None: + for i in range(n_per_thread): + adapter.emit_dict_event("tool.call", {"i": i}) + + threads = [threading.Thread(target=_worker) for _ in range(n_threads)] + for th in threads: + th.start() + for th in threads: + th.join(timeout=10) + assert not th.is_alive() + + # Trace event buffer length == total emissions. Even though events + # may interleave, none are lost or duplicated. + expected = n_per_thread * n_threads + assert len(adapter._trace_events) == expected + # Every record carries the same (correct) tenant. + assert all(e[ORG_ID_FIELD] == "org-S" for e in adapter._trace_events) + + +# --------------------------------------------------------------------------- +# "Eviction respects tenant scope" — applies to per-instance trace_events too +# --------------------------------------------------------------------------- + + +def test_disconnect_does_not_clear_other_adapter_state() -> None: + """Disconnecting tenant A leaves tenant B's events / sinks intact.""" + stratix_a = _RecordingStratix("org-A") + stratix_b = _RecordingStratix("org-B") + sink_b = _RecordingSink() + a = _MinimalAdapter(stratix=stratix_a) + b = _MinimalAdapter(stratix=stratix_b, event_sinks=[sink_b]) + a.connect() + b.connect() + + a.emit_dict_event("tool.call", {"x": 1}) + b.emit_dict_event("tool.call", {"y": 1}) + + a.disconnect() + + # B's recorded state is untouched. + assert len(b._trace_events) == 1 + assert b._trace_events[0][ORG_ID_FIELD] == "org-B" + assert sink_b.received[0]["org_id"] == "org-B" + # B can still emit after A disconnects. + b.emit_dict_event("tool.call", {"y": 2}) + assert len(stratix_b.events) == 2 + + +def test_no_class_level_mutable_caches_on_base_adapter() -> None: + """BaseAdapter MUST NOT carry class-level mutable state that could leak. + + A class-level ``dict`` or ``list`` shared across instances would + silently fan a tenant A write into a tenant B read — exactly the + bug Gap 1 forbids. This guard fails loudly if a future commit adds + one. + """ + # Allowlist of class-level non-callable attrs that are explicitly + # immutable / type-level. + allowed_class_attrs = {"FRAMEWORK", "VERSION", "requires_pydantic"} + for name, value in vars(BaseAdapter).items(): + if name.startswith("_") or callable(value) or isinstance(value, (property, staticmethod, classmethod)): + continue + if name in allowed_class_attrs: + continue + # A bare list/dict/set at class level would be cross-instance + # shared state — banned. + assert not isinstance(value, (list, dict, set)), ( + f"BaseAdapter.{name} is a class-level mutable container " + f"({type(value).__name__}) — would leak across tenants" + ) + + +def test_two_adapters_share_no_module_globals_for_state() -> None: + """The adapter module exposes constants but no mutable state singletons. + + Module-level mutable state (a global dict, a global list) keyed + without ``org_id`` would defeat the per-instance binding. The + constants (``_CIRCUIT_BREAKER_THRESHOLD`` etc.) are immutable + primitives and therefore safe. + """ + import layerlens.instrument.adapters._base.adapter as adapter_mod + + forbidden_types = (list, dict, set, bytearray) + leaks: list[str] = [] + for name, value in vars(adapter_mod).items(): + if name.startswith("__"): + continue + # Skip imported / type / function objects. + if isinstance(value, type) or callable(value): + continue + # Skip the null sentinel — it has no mutable surface. + if name == "_NULL_STRATIX": + continue + if isinstance(value, forbidden_types): + leaks.append(f"{name}={type(value).__name__}") + + assert not leaks, ( + "adapter module exposes mutable global containers that could " + f"silently aggregate cross-tenant state: {leaks}" + ) + + +# --------------------------------------------------------------------------- +# Smoke checks ensuring fail-fast still applies (no Gap 1 regression) +# --------------------------------------------------------------------------- + + +def test_construction_without_org_id_still_fail_fasts() -> None: + """Gap 1 hardening MUST NOT regress PR #118's fail-fast guarantee.""" + with pytest.raises(ValueError, match="non-empty org_id"): + _MinimalAdapter() + + +def test_blank_stratix_org_id_still_rejected() -> None: + """A blank ``stratix.org_id`` still raises (no silent fallback).""" + with pytest.raises(ValueError, match="non-empty org_id"): + _MinimalAdapter(stratix=SimpleNamespace(org_id="")) diff --git a/tests/instrument/adapters/_base/test_otel_correlation.py b/tests/instrument/adapters/_base/test_otel_correlation.py new file mode 100644 index 0000000..f75127b --- /dev/null +++ b/tests/instrument/adapters/_base/test_otel_correlation.py @@ -0,0 +1,330 @@ +"""OTel ↔ SDK ``org_id`` cross-correlation tests (Gap 3). + +When a :class:`BaseAdapter` emits an event, the SDK's per-event +``org_id`` is stamped onto the **currently-active** OpenTelemetry span +as the ``layerlens.org_id`` attribute. This closes the cross-system +correlation gap: every distributed-trace span produced inside an +adapter call can be filtered by tenant. + +Properties enforced +------------------- + +* When OTel is installed AND a recording span is active, every + ``emit_event`` / ``emit_dict_event`` sets ``layerlens.org_id`` on + that span to the adapter's bound tenant. +* When the active span is the no-op span (no SDK installed / no + TracerProvider configured), the attribute set is a no-op — the + adapter's hot path never raises. +* When ``span.is_recording()`` returns False, the attribute set is + skipped (paying for it on a sampled-out span is wasteful). +* Two adapters bound to different tenants emitting under the same + parent span overwrite the attribute with each emit — the LAST emit's + tenant wins per span (which is the expected semantics: a span scopes + one logical operation by one tenant). +* Adapter errors do not block the OTel attribute set (defensive try / + except around the OTel API). + +Background +---------- +The 2026-04-25 cross-cutting audit (gap #3) identified that the SDK's +``org_id`` was nowhere visible in OTel spans, making per-tenant +distributed-trace queries impossible from atlas-app and external +backends (Tempo / Jaeger / Honeycomb). This file pins the fix. +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Tuple + +import pytest +from opentelemetry import trace as otel_trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from layerlens.instrument.adapters._base.adapter import ( + AdapterInfo, + BaseAdapter, + AdapterHealth, + AdapterStatus, + ReplayableTrace, + _set_current_span_org_id, +) + +# --------------------------------------------------------------------------- +# OTel test harness — single global TracerProvider, per-test exporter +# --------------------------------------------------------------------------- +# +# ``trace.set_tracer_provider`` accepts the first call only (subsequent +# calls log "Overriding of current TracerProvider is not allowed" and +# silently ignore the new provider). We therefore install one provider +# at module load and attach a fresh InMemorySpanExporter per test via +# :func:`memory_exporter` — the exporter is the per-test surface, not +# the provider itself. + +_PROVIDER = TracerProvider() +otel_trace.set_tracer_provider(_PROVIDER) + + +@pytest.fixture() +def memory_exporter() -> InMemorySpanExporter: + """Attach a fresh in-memory exporter to the module-global provider.""" + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + _PROVIDER.add_span_processor(processor) + try: + yield exporter + finally: + exporter.clear() + # SDK does not expose a remove_span_processor; shut the + # processor down so it stops receiving span events. + processor.shutdown() + + +@pytest.fixture() +def tracer() -> otel_trace.Tracer: + """Tracer scoped to the test module.""" + return _PROVIDER.get_tracer("layerlens.tests.otel_correlation") + + +# --------------------------------------------------------------------------- +# Test doubles +# --------------------------------------------------------------------------- + + +class _RecordingStratix: + def __init__(self, org_id: str) -> None: + self.org_id = org_id + self.events: List[Tuple[Any, ...]] = [] + + def emit(self, *args: Any, **kwargs: Any) -> None: + self.events.append(args) + + +class _MinimalAdapter(BaseAdapter): + FRAMEWORK = "test" + VERSION = "0.0.0" + + def connect(self) -> None: + self._connected = True + self._status = AdapterStatus.HEALTHY + + def disconnect(self) -> None: + self._connected = False + self._status = AdapterStatus.DISCONNECTED + + def health_check(self) -> AdapterHealth: + return AdapterHealth( + status=self._status, + framework_name=self.FRAMEWORK, + adapter_version=self.VERSION, + ) + + def get_adapter_info(self) -> AdapterInfo: + return AdapterInfo(name="MinimalAdapter", version=self.VERSION, framework=self.FRAMEWORK) + + def serialize_for_replay(self) -> ReplayableTrace: + return ReplayableTrace( + adapter_name="MinimalAdapter", + framework=self.FRAMEWORK, + trace_id="trace-test", + events=list(self._trace_events), + ) + + +def _attrs(span: Any) -> Dict[str, Any]: + """Extract the attributes dict from a captured span (cross-version safe).""" + raw = getattr(span, "attributes", None) or {} + return dict(raw) + + +# --------------------------------------------------------------------------- +# Cross-correlation under an active span +# --------------------------------------------------------------------------- + + +def test_emit_dict_event_stamps_org_id_on_active_span( + memory_exporter: InMemorySpanExporter, + tracer: otel_trace.Tracer, +) -> None: + """``emit_dict_event`` sets ``layerlens.org_id`` on the active span.""" + stratix = _RecordingStratix("org-A") + adapter = _MinimalAdapter(stratix=stratix) + adapter.connect() + + with tracer.start_as_current_span("auth-check") as span: + span.set_attribute("auth.org_id", "org-A") + adapter.emit_dict_event("tool.call", {"tool_name": "calc"}) + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + attrs = _attrs(spans[0]) + assert attrs.get("layerlens.org_id") == "org-A" + # Pre-existing auth span attribute is preserved. + assert attrs.get("auth.org_id") == "org-A" + + +def test_emit_event_stamps_org_id_on_active_span( + memory_exporter: InMemorySpanExporter, + tracer: otel_trace.Tracer, +) -> None: + """``emit_event`` (typed payload path) also stamps the OTel span.""" + + class _TypedPayload: + def __init__(self) -> None: + self.event_type = "model.invoke" + self.model = "gpt-5" + + stratix = _RecordingStratix("org-B") + adapter = _MinimalAdapter(stratix=stratix) + adapter.connect() + + with tracer.start_as_current_span("invoke"): + adapter.emit_event(_TypedPayload()) + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert _attrs(spans[0]).get("layerlens.org_id") == "org-B" + + +def test_two_adapters_for_different_tenants_overwrite_per_span( + memory_exporter: InMemorySpanExporter, + tracer: otel_trace.Tracer, +) -> None: + """Two adapters under the same parent span — last emit's tenant wins. + + One span scopes one logical operation by one tenant. A correctly- + configured caller never multiplexes tenants under one span; this + test simply pins the behaviour so misuse is observable rather than + silently merging tenants. + """ + a = _MinimalAdapter(stratix=_RecordingStratix("org-A")) + b = _MinimalAdapter(stratix=_RecordingStratix("org-B")) + a.connect() + b.connect() + + with tracer.start_as_current_span("shared-parent"): + a.emit_dict_event("tool.call", {"x": 1}) + b.emit_dict_event("tool.call", {"y": 1}) + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert _attrs(spans[0]).get("layerlens.org_id") == "org-B" + + +def test_each_span_carries_its_own_tenant_attribute( + memory_exporter: InMemorySpanExporter, + tracer: otel_trace.Tracer, +) -> None: + """Two sequential spans, two adapters: each span tagged with its own tenant.""" + a = _MinimalAdapter(stratix=_RecordingStratix("org-A")) + b = _MinimalAdapter(stratix=_RecordingStratix("org-B")) + a.connect() + b.connect() + + with tracer.start_as_current_span("op-A"): + a.emit_dict_event("tool.call", {"x": 1}) + with tracer.start_as_current_span("op-B"): + b.emit_dict_event("tool.call", {"y": 1}) + + spans = memory_exporter.get_finished_spans() + by_name = {s.name: _attrs(s) for s in spans} + assert by_name["op-A"].get("layerlens.org_id") == "org-A" + assert by_name["op-B"].get("layerlens.org_id") == "org-B" + + +# --------------------------------------------------------------------------- +# No-op behaviour when no span / no recording span +# --------------------------------------------------------------------------- + + +def test_emit_outside_span_does_not_raise( + memory_exporter: InMemorySpanExporter, +) -> None: + """Emit with no active span finishes without raising and produces no spans.""" + stratix = _RecordingStratix("org-A") + adapter = _MinimalAdapter(stratix=stratix) + adapter.connect() + + # No span context — the OTel call is a no-op (INVALID_SPAN). + adapter.emit_dict_event("tool.call", {"tool_name": "ok"}) + + # OTel returns a tuple; convert for portable comparison. + assert list(memory_exporter.get_finished_spans()) == [] + # SDK still saw the event. + assert len(stratix.events) == 1 + + +def test_set_current_span_org_id_is_safe_when_otel_unavailable( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When OTel import fails, the helper silently no-ops.""" + import builtins + + real_import = builtins.__import__ + + def fake_import(name: str, *args: Any, **kwargs: Any) -> Any: + if name == "opentelemetry" or name.startswith("opentelemetry."): + raise ImportError("simulated missing opentelemetry") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", fake_import) + + # No exception raised even without OTel. + _set_current_span_org_id("org-A") + + +def test_non_recording_span_skipped( + memory_exporter: InMemorySpanExporter, +) -> None: + """A non-recording span (sampled out) is skipped — no attribute set.""" + + class _FakeSpan: + attributes: Dict[str, Any] = {} + + def is_recording(self) -> bool: + return False + + def set_attribute(self, key: str, value: Any) -> None: + # If we got here, the recording check failed — record it. + type(self).attributes[key] = value + + fake_span = _FakeSpan() + + # Patch get_current_span to return our fake non-recording span. + real_get = otel_trace.get_current_span + try: + otel_trace.get_current_span = lambda *args, **kwargs: fake_span # type: ignore[assignment] + _set_current_span_org_id("org-A") + finally: + otel_trace.get_current_span = real_get # type: ignore[assignment] + + assert "layerlens.org_id" not in _FakeSpan.attributes + + +def test_otel_set_attribute_failure_does_not_block_emit( + memory_exporter: InMemorySpanExporter, + tracer: otel_trace.Tracer, +) -> None: + """If set_attribute raises, the emit path still completes.""" + stratix = _RecordingStratix("org-A") + adapter = _MinimalAdapter(stratix=stratix) + adapter.connect() + + class _BoomSpan: + def is_recording(self) -> bool: + return True + + def set_attribute(self, *args: Any, **kwargs: Any) -> None: + raise RuntimeError("boom") + + real_get = otel_trace.get_current_span + try: + otel_trace.get_current_span = lambda *args, **kwargs: _BoomSpan() # type: ignore[assignment] + # Must not raise. + adapter.emit_dict_event("tool.call", {"tool_name": "ok"}) + finally: + otel_trace.get_current_span = real_get # type: ignore[assignment] + + # SDK still emitted the event despite the OTel failure. + assert len(stratix.events) == 1 diff --git a/tests/instrument/adapters/_base/test_sinks_per_tenant.py b/tests/instrument/adapters/_base/test_sinks_per_tenant.py new file mode 100644 index 0000000..38eb829 --- /dev/null +++ b/tests/instrument/adapters/_base/test_sinks_per_tenant.py @@ -0,0 +1,336 @@ +"""Per-tenant stream isolation tests for :class:`IngestionPipelineSink`. + +Implements Gap 2 of the multi-tenancy hardening contract: a single +sink instance servicing multiple tenants must partition its buffer by +``org_id``, cap each tenant independently, and never let one tenant's +burst displace another's events. + +Properties enforced +------------------- + +* **Per-tenant buffers:** events from tenant A go to a different list + slot than tenant B's. Flushing A never moves B's events. +* **Per-tenant cap (FIFO eviction):** when tenant A overflows the cap, + A's OLDEST event is dropped — B's buffer (and its events) are + untouched. The drop is recorded in :attr:`dropped_per_tenant` keyed + by A's ``org_id``. +* **Per-tenant flush calls:** :meth:`flush` issues one ``ingest()`` + call per tenant with that tenant's ``org_id`` — never a mixed batch. +* **Concurrent isolation:** N threads emitting on N tenants in + parallel produce per-tenant batches with no cross-contamination, + even under contention with a small cap that triggers eviction. +* **Observability:** :meth:`buffer_size_per_tenant` reports a + per-tenant snapshot suitable for the + ``sink_per_tenant_buffer_size{org_id}`` gauge. + +Background +---------- +PR #118 added per-event ``org_id`` propagation but +:class:`IngestionPipelineSink` still kept a single buffer. A noisy +tenant could (a) starve quieter tenants by filling a global cap and +(b) cause flush batches to mix tenants — defeating downstream RLS. +This file pins the hardened contract. +""" + +from __future__ import annotations + +import threading +from typing import Any, Dict, List, Tuple + +import pytest + +from layerlens.instrument.adapters._base.sinks import IngestionPipelineSink + + +class _RecordingPipeline: + """Pipeline double that captures every ``ingest()`` call. + + Each call appears as ``(events_list_copy, tenant_id)``. Thread-safe + so the test suite can drive concurrent senders. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._calls: List[Tuple[List[Dict[str, Any]], str]] = [] + + def ingest(self, events: List[Dict[str, Any]], tenant_id: str) -> None: + with self._lock: + self._calls.append(([dict(e) for e in events], tenant_id)) + + @property + def calls(self) -> List[Tuple[List[Dict[str, Any]], str]]: + with self._lock: + return list(self._calls) + + def calls_for(self, tenant_id: str) -> List[Tuple[List[Dict[str, Any]], str]]: + return [(events, tid) for events, tid in self.calls if tid == tenant_id] + + +def _emit(sink: IngestionPipelineSink, org_id: str, n: int, prefix: str = "ev") -> None: + """Drive ``n`` events into ``sink`` bound to ``org_id``.""" + for i in range(n): + sink.send( + "tool.call", + {"tool_name": f"{prefix}-{i}", "org_id": org_id}, + timestamp_ns=1_700_000_000_000_000_000 + i, + org_id=org_id, + ) + + +# --------------------------------------------------------------------------- +# Cross-tenant burst isolation (immediate mode) +# --------------------------------------------------------------------------- + + +def test_immediate_mode_keys_each_event_by_org_id() -> None: + """Immediate (unbuffered) mode routes per-event ``org_id`` to ``ingest``.""" + pipeline = _RecordingPipeline() + sink = IngestionPipelineSink(pipeline=pipeline, buffered=False) + + _emit(sink, "org-A", 3) + _emit(sink, "org-B", 2) + + a_calls = pipeline.calls_for("org-A") + b_calls = pipeline.calls_for("org-B") + assert len(a_calls) == 3 + assert len(b_calls) == 2 + # No mixed batch: every immediate call is for a single tenant. + for events, tenant in pipeline.calls: + assert len(events) == 1 + assert all(e["org_id"] == tenant for e in events) + + +# --------------------------------------------------------------------------- +# Per-tenant buffer (buffered mode) +# --------------------------------------------------------------------------- + + +def test_buffered_mode_partitions_buffers_per_tenant() -> None: + """Each tenant has its own buffer; ``buffer_size_per_tenant`` reflects this.""" + pipeline = _RecordingPipeline() + sink = IngestionPipelineSink(pipeline=pipeline, buffered=True) + + _emit(sink, "org-A", 5) + _emit(sink, "org-B", 2) + + sizes = sink.buffer_size_per_tenant() + assert sizes == {"org-A": 5, "org-B": 2} + # Nothing flushed yet. + assert pipeline.calls == [] + + +def test_flush_emits_one_ingest_call_per_tenant() -> None: + """Flushing a multi-tenant buffered sink yields exactly one batch per tenant.""" + pipeline = _RecordingPipeline() + sink = IngestionPipelineSink(pipeline=pipeline, buffered=True) + + _emit(sink, "org-A", 4, prefix="A") + _emit(sink, "org-B", 3, prefix="B") + _emit(sink, "org-C", 2, prefix="C") + + sink.flush() + + by_tenant = {tenant: events for events, tenant in pipeline.calls} + assert set(by_tenant.keys()) == {"org-A", "org-B", "org-C"} + assert len(by_tenant["org-A"]) == 4 + assert len(by_tenant["org-B"]) == 3 + assert len(by_tenant["org-C"]) == 2 + + # Every batch contains ONLY events for its tenant — no leak. + for tenant, events in by_tenant.items(): + assert all(e["org_id"] == tenant for e in events), ( + f"batch for tenant {tenant} contained another tenant's event" + ) + + # Buffers are now empty. + assert sink.buffer_size_per_tenant() == {"org-A": 0, "org-B": 0, "org-C": 0} + + +def test_per_tenant_cap_drops_only_overflowing_tenants_oldest_event() -> None: + """A burst from tenant A FIFO-evicts ONLY A's oldest events.""" + pipeline = _RecordingPipeline() + sink = IngestionPipelineSink( + pipeline=pipeline, + buffered=True, + max_per_tenant_buffer_size=3, + ) + + # Tenant B sits below the cap. + _emit(sink, "org-B", 2, prefix="B") + # Tenant A overflows: cap=3, send 5 → 2 events dropped (the oldest). + _emit(sink, "org-A", 5, prefix="A") + + sizes = sink.buffer_size_per_tenant() + # B is unaffected by A's overflow. + assert sizes["org-B"] == 2 + # A is at the cap (not above). + assert sizes["org-A"] == 3 + # A's drop counter == 2; B's drop counter is absent or 0. + drops = sink.dropped_per_tenant + assert drops.get("org-A") == 2 + assert "org-B" not in drops or drops["org-B"] == 0 + + # Flush — A gets exactly 3 events: A-2, A-3, A-4 (oldest two dropped). + sink.flush() + by_tenant = {tenant: events for events, tenant in pipeline.calls} + a_names = [e["payload"]["tool_name"] for e in by_tenant["org-A"]] + assert a_names == ["A-2", "A-3", "A-4"] + # B's events are intact and untouched. + b_names = [e["payload"]["tool_name"] for e in by_tenant["org-B"]] + assert b_names == ["B-0", "B-1"] + + +def test_max_per_tenant_buffer_size_must_be_positive() -> None: + """A non-positive cap is rejected at construction (no silent passthrough).""" + with pytest.raises(ValueError, match="must be > 0"): + IngestionPipelineSink(pipeline=_RecordingPipeline(), max_per_tenant_buffer_size=0) + with pytest.raises(ValueError, match="must be > 0"): + IngestionPipelineSink(pipeline=_RecordingPipeline(), max_per_tenant_buffer_size=-1) + + +# --------------------------------------------------------------------------- +# Concurrency: per-tenant burst isolation under contention +# --------------------------------------------------------------------------- + + +def test_concurrent_bursts_two_tenants_partition_correctly() -> None: + """Two threads emitting under contention produce isolated per-tenant batches.""" + pipeline = _RecordingPipeline() + sink = IngestionPipelineSink( + pipeline=pipeline, + buffered=True, + max_per_tenant_buffer_size=10_000, + ) + + n = 500 + + def _drive(org_id: str) -> None: + for i in range(n): + sink.send( + "tool.call", + {"tool_name": f"{org_id}-{i}", "org_id": org_id}, + timestamp_ns=1_700_000_000_000_000_000 + i, + org_id=org_id, + ) + + t_a = threading.Thread(target=_drive, args=("org-A",)) + t_b = threading.Thread(target=_drive, args=("org-B",)) + t_a.start() + t_b.start() + t_a.join(timeout=10) + t_b.join(timeout=10) + assert not t_a.is_alive() and not t_b.is_alive() + + sizes = sink.buffer_size_per_tenant() + assert sizes["org-A"] == n + assert sizes["org-B"] == n + + sink.flush() + by_tenant: Dict[str, List[Dict[str, Any]]] = {} + for events, tenant in pipeline.calls: + by_tenant.setdefault(tenant, []).extend(events) + + # Every event in A's flush is tagged with A. No B leakage in either direction. + assert len(by_tenant["org-A"]) == n + assert len(by_tenant["org-B"]) == n + assert all(e["org_id"] == "org-A" for e in by_tenant["org-A"]) + assert all(e["org_id"] == "org-B" for e in by_tenant["org-B"]) + + +def test_noisy_tenant_does_not_evict_quiet_tenants_buffer() -> None: + """One tenant flooding past the cap evicts ONLY its own events.""" + pipeline = _RecordingPipeline() + cap = 50 + sink = IngestionPipelineSink( + pipeline=pipeline, + buffered=True, + max_per_tenant_buffer_size=cap, + ) + + # Quiet tenant fills its buffer with a small burst. + _emit(sink, "org-quiet", 5, prefix="Q") + quiet_baseline = sink.buffer_size_per_tenant()["org-quiet"] + assert quiet_baseline == 5 + + # Noisy tenant in parallel drives 10x the cap, FIFO-evicts itself. + noise = cap * 10 + + def _flood() -> None: + for i in range(noise): + sink.send( + "tool.call", + {"tool_name": f"NOISE-{i}", "org_id": "org-noisy"}, + timestamp_ns=1_700_000_000_000_000_000 + i, + org_id="org-noisy", + ) + + th = threading.Thread(target=_flood) + th.start() + th.join(timeout=15) + assert not th.is_alive() + + # Quiet tenant's buffer is unchanged. + assert sink.buffer_size_per_tenant()["org-quiet"] == quiet_baseline + # Noisy tenant capped exactly at cap. + assert sink.buffer_size_per_tenant()["org-noisy"] == cap + # Drops counted ONLY against the noisy tenant. + drops = sink.dropped_per_tenant + assert drops.get("org-noisy") == noise - cap + assert "org-quiet" not in drops or drops["org-quiet"] == 0 + + +# --------------------------------------------------------------------------- +# Observability surface +# --------------------------------------------------------------------------- + + +def test_buffer_size_per_tenant_returns_defensive_copy() -> None: + """Caller mutation of the returned dict does not affect sink state.""" + pipeline = _RecordingPipeline() + sink = IngestionPipelineSink(pipeline=pipeline, buffered=True) + _emit(sink, "org-X", 2) + + snapshot = sink.buffer_size_per_tenant() + snapshot["org-X"] = 99999 + snapshot["org-OTHER"] = 42 + + # Sink's internal state is unaffected. + fresh = sink.buffer_size_per_tenant() + assert fresh == {"org-X": 2} + + +def test_dropped_per_tenant_returns_defensive_copy() -> None: + """Caller mutation of dropped counts dict does not affect sink state.""" + pipeline = _RecordingPipeline() + sink = IngestionPipelineSink( + pipeline=pipeline, + buffered=True, + max_per_tenant_buffer_size=2, + ) + _emit(sink, "org-Q", 5) + + snapshot = sink.dropped_per_tenant + snapshot["org-Q"] = 0 + snapshot["org-bogus"] = 42 + + fresh = sink.dropped_per_tenant + assert fresh == {"org-Q": 3} + + +def test_close_flushes_all_per_tenant_buffers() -> None: + """``close()`` is equivalent to a final per-tenant flush across every tenant.""" + pipeline = _RecordingPipeline() + sink = IngestionPipelineSink(pipeline=pipeline, buffered=True) + + _emit(sink, "org-A", 3) + _emit(sink, "org-B", 1) + + sink.close() + + by_tenant = {tenant: events for events, tenant in pipeline.calls} + assert sorted(by_tenant.keys()) == ["org-A", "org-B"] + assert len(by_tenant["org-A"]) == 3 + assert len(by_tenant["org-B"]) == 1 + # Subsequent send becomes a no-op (sink is closed). + _emit(sink, "org-C", 1) + assert "org-C" not in {tenant for _, tenant in pipeline.calls} diff --git a/tests/instrument/adapters/_base/test_tenant_logger.py b/tests/instrument/adapters/_base/test_tenant_logger.py new file mode 100644 index 0000000..3cf173f --- /dev/null +++ b/tests/instrument/adapters/_base/test_tenant_logger.py @@ -0,0 +1,271 @@ +"""Tenant-aware logging propagation tests (Gap 4). + +Verifies that :class:`TenantContextLogAdapter` and the +:func:`get_tenant_logger` factory: + +* Inject ``org_id`` into every record's ``extra`` dict so structured + log handlers (JSON formatters, OTel log exporters) see it as a + first-class field. +* Prepend ``[org_id=] `` to the formatted message body so plain + log lines also carry the tenant binding. +* Refuse construction without a non-empty ``org_id`` + (matches :class:`BaseAdapter`'s fail-fast contract). +* Are per-instance — two adapters bound to different tenants log with + their respective bindings even when they share the same underlying + logger name. +* Override caller-supplied ``extra={"org_id": ...}`` to prevent + callers from impersonating another tenant in log records. +* Are wired into :class:`BaseAdapter` via the ``tlogger`` property so + subclass code can drop in ``self.tlogger`` for ``logging.getLogger``. + +Background +---------- +Per the 2026-04-25 cross-cutting audit (gap #4), adapter log lines +omitted ``org_id`` entirely. Tenant-A and tenant-B circuit breaker +events were indistinguishable in shared log streams, making per-tenant +incident triage impossible. This file pins the fix. +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List + +import pytest + +from layerlens.instrument.adapters._base.adapter import ( + AdapterInfo, + BaseAdapter, + AdapterHealth, + AdapterStatus, + ReplayableTrace, +) +from layerlens.instrument.adapters._base.logging import ( + TenantContextLogAdapter, + get_tenant_logger, +) + +# --------------------------------------------------------------------------- +# Test doubles +# --------------------------------------------------------------------------- + + +class _RecordingHandler(logging.Handler): + """Capture every emitted ``LogRecord`` in a list for inspection.""" + + def __init__(self) -> None: + super().__init__(level=logging.DEBUG) + self.records: List[logging.LogRecord] = [] + + def emit(self, record: logging.LogRecord) -> None: + self.records.append(record) + + +class _RecordingStratix: + def __init__(self, org_id: str) -> None: + self.org_id = org_id + + def emit(self, *args: Any, **kwargs: Any) -> None: + pass + + +class _MinimalAdapter(BaseAdapter): + FRAMEWORK = "test" + VERSION = "0.0.0" + + def connect(self) -> None: + self._connected = True + self._status = AdapterStatus.HEALTHY + + def disconnect(self) -> None: + self._connected = False + self._status = AdapterStatus.DISCONNECTED + + def health_check(self) -> AdapterHealth: + return AdapterHealth( + status=self._status, + framework_name=self.FRAMEWORK, + adapter_version=self.VERSION, + ) + + def get_adapter_info(self) -> AdapterInfo: + return AdapterInfo(name="MinimalAdapter", version=self.VERSION, framework=self.FRAMEWORK) + + def serialize_for_replay(self) -> ReplayableTrace: + return ReplayableTrace( + adapter_name="MinimalAdapter", + framework=self.FRAMEWORK, + trace_id="trace-test", + events=list(self._trace_events), + ) + + +@pytest.fixture() +def handler() -> _RecordingHandler: + """Attach a fresh capture handler to the root logger for the test.""" + h = _RecordingHandler() + root = logging.getLogger() + # Save and restore level so we don't pollute other tests. + saved_level = root.level + root.setLevel(logging.DEBUG) + root.addHandler(h) + try: + yield h + finally: + root.removeHandler(h) + root.setLevel(saved_level) + + +# --------------------------------------------------------------------------- +# Construction-time fail-fast +# --------------------------------------------------------------------------- + + +def test_construction_rejects_empty_org_id() -> None: + """An empty ``org_id`` is rejected at construction (CLAUDE.md fail-fast).""" + base = logging.getLogger("layerlens.tests.tenant_logger.empty") + with pytest.raises(ValueError, match="non-empty org_id"): + TenantContextLogAdapter(base, "") + + +def test_construction_rejects_whitespace_org_id() -> None: + """A whitespace-only ``org_id`` is rejected as well.""" + base = logging.getLogger("layerlens.tests.tenant_logger.ws") + with pytest.raises(ValueError, match="non-empty org_id"): + TenantContextLogAdapter(base, " ") + + +def test_construction_rejects_non_string_org_id() -> None: + """A non-string ``org_id`` is rejected.""" + base = logging.getLogger("layerlens.tests.tenant_logger.notstr") + with pytest.raises(ValueError, match="non-empty org_id"): + TenantContextLogAdapter(base, None) # type: ignore[arg-type] + + +def test_org_id_property_returns_bound_value() -> None: + """The read-only property reports the binding.""" + base = logging.getLogger("layerlens.tests.tenant_logger.prop") + log = TenantContextLogAdapter(base, "org-prop") + assert log.org_id == "org-prop" + + +# --------------------------------------------------------------------------- +# Record extras propagation +# --------------------------------------------------------------------------- + + +def test_extra_carries_org_id_on_every_record(handler: _RecordingHandler) -> None: + """Every log call attaches ``org_id`` to the record extras.""" + log = get_tenant_logger("layerlens.tests.tenant_logger.extras", "org-A") + + log.warning("circuit breaker open") + log.error("retry exhausted") + + assert len(handler.records) == 2 + for rec in handler.records: + assert getattr(rec, "org_id", None) == "org-A", ( + f"record missing org_id extra: {rec.__dict__}" + ) + + +def test_message_is_prefixed_with_org_id(handler: _RecordingHandler) -> None: + """The formatted message body includes the ``[org_id=...]`` prefix.""" + log = get_tenant_logger("layerlens.tests.tenant_logger.prefix", "org-B") + log.info("emit succeeded") + + assert len(handler.records) == 1 + formatted = handler.records[0].getMessage() + assert formatted.startswith("[org_id=org-B] ") + assert "emit succeeded" in formatted + + +def test_caller_supplied_extra_cannot_overwrite_tenant(handler: _RecordingHandler) -> None: + """A caller cannot pass ``extra={"org_id": "OTHER"}`` and impersonate a tenant.""" + log = get_tenant_logger("layerlens.tests.tenant_logger.spoof", "org-A") + + log.warning("attempted impersonation", extra={"org_id": "org-B-SPOOFED", "request_id": "r1"}) + + assert len(handler.records) == 1 + rec = handler.records[0] + assert rec.org_id == "org-A", "tenant binding overwritten by caller" + # Other caller-supplied extras are preserved. + assert rec.request_id == "r1" + + +# --------------------------------------------------------------------------- +# Per-instance binding (no leak across adapters / tenants) +# --------------------------------------------------------------------------- + + +def test_two_loggers_bound_to_different_tenants_do_not_leak( + handler: _RecordingHandler, +) -> None: + """Two TenantContextLogAdapter instances on the same logger keep distinct bindings.""" + log_a = get_tenant_logger("layerlens.tests.tenant_logger.shared", "org-A") + log_b = get_tenant_logger("layerlens.tests.tenant_logger.shared", "org-B") + + log_a.info("from A") + log_b.info("from B") + + assert len(handler.records) == 2 + by_msg: Dict[str, str] = {} + for rec in handler.records: + msg = rec.getMessage() + if "from A" in msg: + by_msg["A"] = rec.org_id + elif "from B" in msg: + by_msg["B"] = rec.org_id + assert by_msg == {"A": "org-A", "B": "org-B"} + + +def test_get_tenant_logger_returns_distinct_adapter_instances() -> None: + """Each call returns a fresh adapter even for the same logger name.""" + log_a = get_tenant_logger("layerlens.tests.tenant_logger.distinct", "org-A") + log_b = get_tenant_logger("layerlens.tests.tenant_logger.distinct", "org-A") + assert log_a is not log_b + # But underlying logger is the same singleton — getLogger semantics. + assert log_a.logger is log_b.logger + + +# --------------------------------------------------------------------------- +# BaseAdapter wiring +# --------------------------------------------------------------------------- + + +def test_base_adapter_exposes_tlogger_bound_to_its_org_id(handler: _RecordingHandler) -> None: + """Adapter's ``tlogger`` is bound to the same ``org_id`` as ``adapter.org_id``.""" + adapter = _MinimalAdapter(stratix=_RecordingStratix("org-W")) + assert isinstance(adapter.tlogger, TenantContextLogAdapter) + assert adapter.tlogger.org_id == "org-W" + + adapter.tlogger.warning("hello from adapter") + + relevant = [r for r in handler.records if "hello from adapter" in r.getMessage()] + assert len(relevant) == 1 + assert relevant[0].org_id == "org-W" + + +def test_two_adapters_have_distinct_tloggers(handler: _RecordingHandler) -> None: + """Two adapters bound to different tenants produce separate log bindings.""" + a = _MinimalAdapter(stratix=_RecordingStratix("org-A")) + b = _MinimalAdapter(stratix=_RecordingStratix("org-B")) + + a.tlogger.info("from-A-adapter") + b.tlogger.info("from-B-adapter") + + a_recs = [r for r in handler.records if "from-A-adapter" in r.getMessage()] + b_recs = [r for r in handler.records if "from-B-adapter" in r.getMessage()] + assert len(a_recs) == 1 + assert len(b_recs) == 1 + assert a_recs[0].org_id == "org-A" + assert b_recs[0].org_id == "org-B" + + +def test_tlogger_message_prefix_visible_in_formatted_output(handler: _RecordingHandler) -> None: + """The ``[org_id=...]`` prefix is present in plain-text formatted output.""" + adapter = _MinimalAdapter(org_id="org-Z") + adapter.tlogger.error("disconnected") + + relevant = [r for r in handler.records if "disconnected" in r.getMessage()] + assert len(relevant) == 1 + assert relevant[0].getMessage().startswith("[org_id=org-Z] ")