Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 97 additions & 2 deletions src/layerlens/instrument/adapters/_base/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
ALWAYS_ENABLED_EVENT_TYPES,
CaptureConfig,
)
from layerlens.instrument.adapters._base.logging import (
TenantContextLogAdapter,
get_tenant_logger,
)
from layerlens.instrument.adapters._base.pydantic_compat import PydanticCompat

# Forward reference: EventSink is defined in sinks.py, which itself does not
Expand All @@ -43,6 +47,60 @@
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# OTel ↔ SDK org_id correlation (Gap 3)
# ---------------------------------------------------------------------------

# Name of the OTel span attribute used to correlate distributed traces with
# the SDK's tenant binding. It lives under the ``layerlens.*`` namespace so
# it does not clash with the OTel semantic conventions and so atlas-app /
# downstream span processors can index on it directly.
_OTEL_ORG_ID_ATTR: str = "layerlens.org_id"


def _set_current_span_org_id(org_id: str) -> None:
    """Stamp ``layerlens.org_id`` onto the currently-active OTel span.

    This is strictly best-effort and never raises. It is a no-op when:

    * OpenTelemetry is not installed (the lazy import raises
      ``ImportError``) — the normal case for SDK users without OTel.
    * The OpenTelemetry import fails for any *other* reason (for example
      a broken or incompatible installation raising at import time). The
      failure is logged at debug level and swallowed.
    * The active span reports ``is_recording() == False``. This covers
      both sampled-out spans and the "no active span" case, where
      ``trace.get_current_span()`` hands back a non-recording span.
    * Any OTel API call raises. The failure is logged at debug level and
      swallowed so the adapter emit path is never faulted by
      observability code.

    The attribute uses the ``layerlens.*`` prefix so it can coexist with
    OTel semantic-convention attributes (``service.namespace`` etc.)
    without collision; the intent is that downstream span processors can
    filter spans by tenant using this attribute.

    Args:
        org_id: Tenant identifier to record on the span.
    """
    try:
        from opentelemetry import trace as _otel_trace
    except ImportError:
        # OTel is an optional dependency; its absence is expected and is
        # not worth a log line on every emit.
        return
    except Exception:  # noqa: BLE001 — observability MUST never raise
        # A present-but-broken OTel install can raise something other
        # than ImportError while importing. Treat it like any other OTel
        # failure instead of letting it escape into the emit path.
        logger.debug(
            "Failed to import OpenTelemetry while stamping %s",
            _OTEL_ORG_ID_ATTR,
            exc_info=True,
        )
        return

    try:
        span = _otel_trace.get_current_span()
        # ``get_current_span`` always returns a span object, so there is
        # no ``None`` check. Defer to ``is_recording`` so non-recording /
        # sampled-out spans do not pay the attribute-set cost. The
        # ``getattr`` guard tolerates span-like objects lacking the method.
        is_recording = getattr(span, "is_recording", None)
        if callable(is_recording) and not is_recording():
            return
        span.set_attribute(_OTEL_ORG_ID_ATTR, org_id)
    except Exception:  # noqa: BLE001 — observability MUST never raise
        # Defensive: if the OTel API misbehaves under any condition, drop
        # the attribute set rather than fault the emit path.
        logger.debug(
            "Failed to stamp %s onto active OTel span", _OTEL_ORG_ID_ATTR, exc_info=True
        )


# ---------------------------------------------------------------------------
# Enums & Models
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -278,20 +336,34 @@ def __init__(
self._connected = False
self._status: AdapterStatus = AdapterStatus.DISCONNECTED

# Circuit breaker state (protected by _lock).
# Circuit breaker state (protected by _lock). The lock and
# counter are *per-instance*, so they inherit the adapter's
# single-tenant binding — tenant A's circuit cannot trip
# tenant B's adapter even though both share the BaseAdapter
# class. See ``test_cache_tenant_isolation.py``.
self._lock = threading.Lock()
self._error_count = 0
self._circuit_open = False
self._circuit_opened_at: float = 0.0

# Collected events for replay serialization.
# Collected events for replay serialization. Per-instance, so
# cross-tenant trace contamination is structurally impossible
# for the in-memory event buffer.
self._trace_events: List[Dict[str, Any]] = []

# Pluggable event sinks for persistence / export. Use add_sink /
# remove_sink to mutate; direct list manipulation is not part of
# the public API and may change in v2.
self._event_sinks: List["EventSink"] = list(event_sinks) if event_sinks else []

# Per-instance tenant-aware logger (Gap 4). All adapter log
# lines now carry ``org_id`` in record extras AND in a
# ``[org_id=...]`` message prefix. Subclasses inherit this via
# ``self.tlogger`` and should prefer it over plain ``logging``.
self._tlogger: TenantContextLogAdapter = get_tenant_logger(
type(self).__module__, self._org_id
)

# --- Sink management (public API) ---

def add_sink(self, sink: "EventSink") -> None:
Expand Down Expand Up @@ -350,6 +422,19 @@ def org_id(self) -> str:
"""
return self._org_id

@property
def tlogger(self) -> TenantContextLogAdapter:
    """Return this adapter's tenant-bound logger.

    The returned :class:`TenantContextLogAdapter` is the one stored on
    the instance as ``_tlogger``; this property only exposes it
    read-only. Records logged through it carry ``org_id`` in their
    ``extra`` data and a ``[org_id=...]`` prefix on the message.

    Subclasses SHOULD log through ``self.tlogger`` rather than a plain
    ``logging.getLogger(__name__)`` logger so that adapter log lines
    keep their tenant context (Gap 4 of the multi-tenancy hardening
    contract).
    """
    return self._tlogger

# --- Abstract lifecycle methods ---

@abstractmethod
Expand Down Expand Up @@ -496,6 +581,12 @@ def emit_event(

payload = self._stamp_org_id(payload)

# Cross-correlate the SDK's per-event tenant binding with the
# currently-active OTel span so distributed-trace consumers can
# filter spans by tenant. No-op when OTel is absent or when no
# recording span is active. (Gap 3.)
_set_current_span_org_id(self._org_id)

try:
if privacy_level is not None:
self._stratix.emit(payload, privacy_level)
Expand Down Expand Up @@ -532,6 +623,10 @@ def emit_dict_event(

payload = self._stamp_org_id(payload)

# Cross-correlate with the active OTel span (see emit_event for
# the full rationale). Same no-op semantics. (Gap 3.)
_set_current_span_org_id(self._org_id)

try:
self._stratix.emit(event_type, payload)
self._post_emit_success(event_type, payload)
Expand Down
128 changes: 128 additions & 0 deletions src/layerlens/instrument/adapters/_base/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""Tenant-aware logging utilities for LayerLens adapters.

Provides :class:`TenantContextLogAdapter` (a thin
:class:`logging.LoggerAdapter` subclass) and the convenience constructor
:func:`get_tenant_logger`. Every log record produced through the adapter
is enriched with the bound tenant's ``org_id`` in two surfaces:

* the ``extra`` keyword propagated to ``logging.LogRecord.__dict__`` so
log handlers (JSON formatters, OTel log exporters, structlog
processors) can promote it to a structured field;
* the ``"[org_id=...] "`` prefix on the formatted message so plain-text
log lines carry the tenant binding without any handler config.

This implements Gap 4 of the multi-tenancy hardening contract
(CLAUDE.md "EVERY data operation must be scoped by tenant"). Adapter
log lines previously omitted ``org_id``, making per-tenant log
filtering and incident triage impossible. After this change every
emission, circuit-breaker state change, and sink dispatch failure log
line carries the tenant context end-to-end.

Thread-safety
-------------
The Python ``logging`` module is process-wide thread-safe; this adapter
adds no shared mutable state, only a per-instance ``org_id`` string.
Multiple adapters bound to different tenants can therefore share a
single underlying ``logging.Logger`` without leaking org_id across
instances.
"""

from __future__ import annotations

import logging
from typing import Any, Tuple, Mapping, MutableMapping

# Canonical tenant key. It names the field stamped into ``extra`` and is
# also the label shown inside the ``[org_id=...]`` message prefix.
# NOTE(review): meant to stay equal to ``ORG_ID_FIELD`` in
# ``layerlens.instrument.adapters._base.adapter``; that constant is not
# visible from this module, so confirm before changing either one.
_ORG_ID_KEY: str = "org_id"


class TenantContextLogAdapter(logging.LoggerAdapter):  # type: ignore[type-arg]
    """A :class:`logging.LoggerAdapter` that tags every record with a tenant.

    An instance wraps one :class:`logging.Logger` and one ``org_id``.
    Whenever a message is logged through it (``debug`` / ``info`` /
    ``warning`` / ``error`` / ``critical``), :meth:`process`:

    1. writes ``org_id`` into the call's ``extra`` mapping, so handlers
       can read it as an attribute of the resulting ``LogRecord``;
    2. prepends ``"[org_id=<value>] "`` to the message, so plain-text
       output shows the tenant without any handler configuration.

    The tenant binding is held per instance. Adapters for different
    tenants therefore need their own :class:`TenantContextLogAdapter`,
    even if they wrap the same underlying logger.

    Example::

        log = TenantContextLogAdapter(logging.getLogger(__name__), org_id="org-A")
        log.warning("circuit breaker open")
        # record.org_id == "org-A"
        # record.getMessage() == "[org_id=org-A] circuit breaker open"

    Raises:
        ValueError: if ``org_id`` is not a string, or is empty or
            whitespace-only.
    """

    def __init__(self, logger: logging.Logger, org_id: str) -> None:
        # Refuse to build a logger that has no tenant: a non-string, an
        # empty string and a whitespace-only string are all rejected.
        has_tenant = isinstance(org_id, str) and bool(org_id.strip())
        if not has_tenant:
            raise ValueError(
                "TenantContextLogAdapter requires a non-empty org_id string. "
                "Construction without a tenant binding violates the "
                "multi-tenancy contract — see CLAUDE.md."
            )
        # The value is stored exactly as given (it is not stripped).
        self._org_id: str = org_id
        super().__init__(logger, {_ORG_ID_KEY: org_id})

    @property
    def org_id(self) -> str:
        """Tenant ``org_id`` this adapter was constructed with."""
        return self._org_id

    def process(
        self,
        msg: Any,
        kwargs: MutableMapping[str, Any],
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """Stamp the tenant onto one logging call.

        Args:
            msg: The message object passed to the logging call.
            kwargs: Keyword arguments of the logging call. Modified in
                place: ``kwargs["extra"]`` is replaced by a new dict.

        Returns:
            ``(prefixed_message, kwargs)`` where ``prefixed_message`` is
            ``"[org_id=<bound tenant>] <msg>"`` and ``kwargs`` is the
            same mapping object that was passed in.

        Caller-supplied ``extra`` keys are preserved, except ``org_id``:
        that key is always overwritten with the bound tenant, so a stray
        ``extra={"org_id": ...}`` cannot relabel a record as belonging
        to another tenant.
        """
        tenant = self._org_id

        # Copy the caller's extras (if any) so their mapping is left
        # untouched, then force the tenant key to the bound value.
        caller_extra: Mapping[str, Any] = kwargs.get("extra") or {}
        stamped_extra = dict(caller_extra)
        stamped_extra.update({_ORG_ID_KEY: tenant})
        kwargs["extra"] = stamped_extra

        # Bracketed prefix so flat-text handlers show the tenant as well.
        # NOTE(review): the prefix becomes part of the %-format string
        # when the caller passes lazy args (``log.info("x=%s", x)``), so
        # an org_id containing ``%`` would break message formatting —
        # confirm org_ids can never contain that character.
        prefixed = "[{}={}] {}".format(_ORG_ID_KEY, tenant, msg)
        return prefixed, kwargs


def get_tenant_logger(name: str, org_id: str) -> TenantContextLogAdapter:
    """Build a tenant-bound logger for ``name``.

    Shaped like :func:`logging.getLogger` with one extra argument, so
    code that obtains a logger by name can switch to a tenant-aware one
    by also passing the tenant's ``org_id``.

    Args:
        name: Logger name, passed straight to :func:`logging.getLogger`.
        org_id: Tenant identifier to bind to the returned adapter.

    Returns:
        A new :class:`TenantContextLogAdapter` on every call. The wrapped
        :class:`logging.Logger` is whatever :func:`logging.getLogger`
        returns for ``name``, so it is shared between calls that use the
        same name; only the wrapper (and its tenant binding) is fresh.

    Raises:
        ValueError: propagated from :class:`TenantContextLogAdapter` when
            ``org_id`` is not a non-empty string.
    """
    return TenantContextLogAdapter(logging.getLogger(name), org_id)


# Names exported by ``from <this module> import *``: the adapter class
# and its convenience factory. ``_ORG_ID_KEY`` is deliberately left out.
__all__ = [
    "TenantContextLogAdapter",
    "get_tenant_logger",
]
Loading