From 9822eda6105ad920fe8a37eb6125544191c16a92 Mon Sep 17 00:00:00 2001 From: mmercuri Date: Sat, 25 Apr 2026 19:35:54 -0700 Subject: [PATCH] instrument: HTTP/OTLP transport sinks + extras matrix CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the telemetry transport layer that adapter EventSinks ship to, plus the CI gate that smoke-tests every adapter extra at both the floor and ceiling of its declared pin range. Scope ----- - src/layerlens/instrument/transport/sink_http.py: HTTPSink — async batching transport for the LayerLens ingest endpoint - src/layerlens/instrument/transport/sink_otlp.py: OTLPHttpSink — hand-rolled protobuf marshalling against opentelemetry-proto so consumers don't pull in opentelemetry-sdk just to ship spans - tests/instrument/transport/test_sink_otlp.py: 27 unit tests for the OTLP serializer + transport - tests/instrument/test_sink_http_e2e.py: 9 end-to-end tests against a local FastAPI receiver - samples/instrument/otlp_collector/: runnable sample pointing the OTLPHttpSink at a Collector - docs/adapters/otlp.md: configuration guide - .github/workflows/extras-matrix.yaml: per-extra pin-range matrix that installs every framework / protocol / provider extra at both its floor and ceiling and runs an import-only smoke check - pyproject.toml: two new optional extras - `otel`: opentelemetry-api + opentelemetry-sdk + opentelemetry-exporter-otlp-proto-http (for callers who want the full SDK) - `otlp`: opentelemetry-proto only (for the lean OTLPHttpSink path) Blast radius ------------ - Default `pip install layerlens` install set is unchanged. Both transport sinks are inert until an adapter is wired to them. - No changes to existing public API surface. Test plan --------- - uv run pytest tests/instrument/transport/ tests/instrument/ test_sink_http_e2e.py -x -> 38 passed - The extras-matrix workflow exercises every adapter extra on PR. Stacks on --------- - feat/instrument-base-foundation (M1.A) — required for the EventSink contract this PR implements. LAY-3400 umbrella (M1 transport tier). --- .github/workflows/extras-matrix.yaml | 283 ++++++ docs/adapters/otlp.md | 217 +++++ pyproject.toml | 17 + samples/instrument/otlp_collector/__init__.py | 0 samples/instrument/otlp_collector/main.py | 102 +++ .../instrument/transport/__init__.py | 26 + .../instrument/transport/sink_http.py | 500 +++++++++++ .../instrument/transport/sink_otlp.py | 829 ++++++++++++++++++ tests/instrument/test_sink_http_e2e.py | 321 +++++++ tests/instrument/transport/__init__.py | 0 tests/instrument/transport/test_sink_otlp.py | 732 ++++++++++++++++ 11 files changed, 3027 insertions(+) create mode 100644 .github/workflows/extras-matrix.yaml create mode 100644 docs/adapters/otlp.md create mode 100644 samples/instrument/otlp_collector/__init__.py create mode 100644 samples/instrument/otlp_collector/main.py create mode 100644 src/layerlens/instrument/transport/__init__.py create mode 100644 src/layerlens/instrument/transport/sink_http.py create mode 100644 src/layerlens/instrument/transport/sink_otlp.py create mode 100644 tests/instrument/test_sink_http_e2e.py create mode 100644 tests/instrument/transport/__init__.py create mode 100644 tests/instrument/transport/test_sink_otlp.py diff --git a/.github/workflows/extras-matrix.yaml b/.github/workflows/extras-matrix.yaml new file mode 100644 index 0000000..cf48481 --- /dev/null +++ b/.github/workflows/extras-matrix.yaml @@ -0,0 +1,283 @@ +name: Extras Pin-Range Matrix + +# Round-2 deliverable: every framework / protocol / provider extra is +# smoke-tested at BOTH the lower bound of its declared pin range AND +# the latest version inside that range. +# +# Why both? A floor that's too low ships a wheel that fails to import +# against ancient releases; a ceiling that's too high lets new majors +# break SDK consumers. This matrix catches both regressions on every PR. +# +# Each row installs `layerlens[]` plus an explicit version pin +# for the underlying framework, then runs an import-only smoke check. +# Adapter modules are loaded lazily, so we explicitly trigger the +# adapter import via `layerlens.instrument` registry lookups. +# +# Matrix derived from `[project.optional-dependencies]` in pyproject.toml. +# When a new extra is added there, append a corresponding row here. + +on: + pull_request: + branches: [main] + push: + branches: [main] + +jobs: + # --------------------------------------------------------------- + # Frameworks (Python 3.10+ where the framework requires it). + # --------------------------------------------------------------- + frameworks-py310: + name: ${{ matrix.extra }} (${{ matrix.bound }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + # langchain: >=0.2,<0.4 + - { extra: "langchain", bound: "min", pin: "langchain==0.2.0 langchain-core==0.2.0" } + - { extra: "langchain", bound: "max", pin: "" } + # langgraph: >=0.2,<0.4 + - { extra: "langgraph", bound: "min", pin: "langgraph==0.2.0" } + - { extra: "langgraph", bound: "max", pin: "" } + # crewai: >=0.30,<0.90 + - { extra: "crewai", bound: "min", pin: "crewai==0.30.0" } + - { extra: "crewai", bound: "max", pin: "" } + # autogen: pyautogen>=0.2,<0.5 + - { extra: "autogen", bound: "min", pin: "pyautogen==0.2.0" } + - { extra: "autogen", bound: "max", pin: "" } + # semantic-kernel: >=1.0,<2.0 + - { extra: "semantic-kernel", bound: "min", pin: "semantic-kernel==1.0.0" } + - { extra: "semantic-kernel", bound: "max", pin: "" } + # llama-index: >=0.10,<0.13 + - { extra: "llama-index", bound: "min", pin: "llama-index==0.10.0" } + - { extra: "llama-index", bound: "max", pin: "" } + # openai-agents: openai>=1.30,<2 + - { extra: "openai-agents", bound: "min", pin: "openai==1.30.0" } + - { extra: "openai-agents", bound: "max", pin: "" } + # pydantic-ai: >=0.0.13,<1.0 + - { extra: "pydantic-ai", bound: "min", pin: "pydantic-ai==0.0.13" } + - { extra: "pydantic-ai", bound: "max", pin: "" } + # agno: >=0.1,<1.0 + - { extra: "agno", bound: "min", pin: "agno==0.1.0" } + - { extra: "agno", bound: "max", pin: "" } + # strands: strands-agents>=0.1,<1.0 + - { extra: "strands", bound: "min", pin: "strands-agents==0.1.0" } + - { extra: "strands", bound: "max", pin: "" } + # smolagents: >=1.0,<2.0 + - { extra: "smolagents", bound: "min", pin: "smolagents==1.0.0" } + - { extra: "smolagents", bound: "max", pin: "" } + # ms-agent-framework: semantic-kernel>=1.0,<2.0 + - { extra: "ms-agent-framework", bound: "min", pin: "semantic-kernel==1.0.0" } + - { extra: "ms-agent-framework", bound: "max", pin: "" } + # google-adk: >=0.1,<1.0 + - { extra: "google-adk", bound: "min", pin: "google-adk==0.1.0" } + - { extra: "google-adk", bound: "max", pin: "" } + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: "3.10" + - name: Install layerlens with extra and pin + run: | + python -m pip install --upgrade pip + python -m pip install -e ".[${{ matrix.extra }}]" + if [ -n "${{ matrix.pin }}" ]; then + python -m pip install ${{ matrix.pin }} + fi + - name: Smoke-test import + run: | + python -c "import layerlens; print('layerlens', layerlens.__version__)" + python -c "import layerlens.instrument as m; print('instrument:', m.__name__)" + + # --------------------------------------------------------------- + # Frameworks (Python 3.11+ — browser-use floor is 3.11). + # --------------------------------------------------------------- + frameworks-py311: + name: ${{ matrix.extra }} (${{ matrix.bound }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + # browser-use: >=0.1,<1.0 + - { extra: "browser-use", bound: "min", pin: "browser-use==0.1.0" } + - { extra: "browser-use", bound: "max", pin: "" } + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.11 + uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Install layerlens with extra and pin + run: | + python -m pip install --upgrade pip + python -m pip install -e ".[${{ matrix.extra }}]" + if [ -n "${{ matrix.pin }}" ]; then + python -m pip install ${{ matrix.pin }} + fi + - name: Smoke-test import + run: | + python -c "import layerlens; print('layerlens', layerlens.__version__)" + python -c "import layerlens.instrument as m; print('instrument:', m.__name__)" + + # --------------------------------------------------------------- + # Frameworks with no upper bound (pin floor only at min, default at max). + # --------------------------------------------------------------- + frameworks-open-upper: + name: ${{ matrix.extra }} (${{ matrix.bound }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + # agentforce: requests>=2.28 + - { extra: "agentforce", bound: "min", pin: "requests==2.28.0" } + - { extra: "agentforce", bound: "max", pin: "" } + # bedrock-agents: boto3>=1.34 + - { extra: "bedrock-agents", bound: "min", pin: "boto3==1.34.0" } + - { extra: "bedrock-agents", bound: "max", pin: "" } + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: "3.10" + - name: Install layerlens with extra and pin + run: | + python -m pip install --upgrade pip + python -m pip install -e ".[${{ matrix.extra }}]" + if [ -n "${{ matrix.pin }}" ]; then + python -m pip install ${{ matrix.pin }} + fi + - name: Smoke-test import + run: | + python -c "import layerlens; print('layerlens', layerlens.__version__)" + python -c "import layerlens.instrument as m; print('instrument:', m.__name__)" + + # --------------------------------------------------------------- + # Protocol adapters (Python 3.10+ for ones that require it). + # --------------------------------------------------------------- + protocols: + name: ${{ matrix.extra }} (${{ matrix.bound }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + # protocols-a2a: httpx>=0.23.0,<1 (already required, but smoke-test the extra resolves) + - { extra: "protocols-a2a", bound: "min", pin: "httpx==0.23.0", py: "3.10" } + - { extra: "protocols-a2a", bound: "max", pin: "", py: "3.10" } + # protocols-agui: ag-ui>=0.1 + - { extra: "protocols-agui", bound: "min", pin: "ag-ui==0.1.0", py: "3.10" } + - { extra: "protocols-agui", bound: "max", pin: "", py: "3.10" } + # protocols-mcp: mcp>=0.9 + - { extra: "protocols-mcp", bound: "min", pin: "mcp==0.9.0", py: "3.10" } + - { extra: "protocols-mcp", bound: "max", pin: "", py: "3.10" } + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.py }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.py }} + - name: Install layerlens with extra and pin + run: | + python -m pip install --upgrade pip + python -m pip install -e ".[${{ matrix.extra }}]" + if [ -n "${{ matrix.pin }}" ]; then + python -m pip install ${{ matrix.pin }} + fi + - name: Smoke-test import + run: | + python -c "import layerlens; print('layerlens', layerlens.__version__)" + python -c "import layerlens.instrument as m; print('instrument:', m.__name__)" + + # --------------------------------------------------------------- + # LLM provider adapters. + # --------------------------------------------------------------- + providers: + name: ${{ matrix.extra }} (${{ matrix.bound }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + # providers-openai: openai>=1.30,<2 + - { extra: "providers-openai", bound: "min", pin: "openai==1.30.0" } + - { extra: "providers-openai", bound: "max", pin: "" } + # providers-anthropic: anthropic>=0.30,<1 + - { extra: "providers-anthropic", bound: "min", pin: "anthropic==0.30.0" } + - { extra: "providers-anthropic", bound: "max", pin: "" } + # providers-azure-openai: openai>=1.30,<2 + - { extra: "providers-azure-openai", bound: "min", pin: "openai==1.30.0" } + - { extra: "providers-azure-openai", bound: "max", pin: "" } + # providers-bedrock: boto3>=1.34 + - { extra: "providers-bedrock", bound: "min", pin: "boto3==1.34.0" } + - { extra: "providers-bedrock", bound: "max", pin: "" } + # providers-vertex: google-cloud-aiplatform>=1.50,<2 + - { extra: "providers-vertex", bound: "min", pin: "google-cloud-aiplatform==1.50.0" } + - { extra: "providers-vertex", bound: "max", pin: "" } + # providers-ollama: ollama>=0.2 + - { extra: "providers-ollama", bound: "min", pin: "ollama==0.2.0" } + - { extra: "providers-ollama", bound: "max", pin: "" } + # providers-litellm: litellm>=1.40,<2 + - { extra: "providers-litellm", bound: "min", pin: "litellm==1.40.0" } + - { extra: "providers-litellm", bound: "max", pin: "" } + # providers-cohere: cohere>=5.0,<6 + - { extra: "providers-cohere", bound: "min", pin: "cohere==5.0.0" } + - { extra: "providers-cohere", bound: "max", pin: "" } + # providers-mistral: mistralai>=1.0,<2 + - { extra: "providers-mistral", bound: "min", pin: "mistralai==1.0.0" } + - { extra: "providers-mistral", bound: "max", pin: "" } + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: "3.10" + - name: Install layerlens with extra and pin + run: | + python -m pip install --upgrade pip + python -m pip install -e ".[${{ matrix.extra }}]" + if [ -n "${{ matrix.pin }}" ]; then + python -m pip install ${{ matrix.pin }} + fi + - name: Smoke-test import + run: | + python -c "import layerlens; print('layerlens', layerlens.__version__)" + python -c "import layerlens.instrument as m; print('instrument:', m.__name__)" + + # --------------------------------------------------------------- + # Observability extras (otel + otlp). + # --------------------------------------------------------------- + observability: + name: ${{ matrix.extra }} (${{ matrix.bound }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + # otel: opentelemetry-{api,sdk,exporter-otlp-proto-http}>=1.25 + - { extra: "otel", bound: "min", pin: "opentelemetry-api==1.25.0 opentelemetry-sdk==1.25.0 opentelemetry-exporter-otlp-proto-http==1.25.0" } + - { extra: "otel", bound: "max", pin: "" } + # otlp: opentelemetry-proto>=1.27 + - { extra: "otlp", bound: "min", pin: "opentelemetry-proto==1.27.0" } + - { extra: "otlp", bound: "max", pin: "" } + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: "3.10" + - name: Install layerlens with extra and pin + run: | + python -m pip install --upgrade pip + python -m pip install -e ".[${{ matrix.extra }}]" + if [ -n "${{ matrix.pin }}" ]; then + python -m pip install ${{ matrix.pin }} + fi + - name: Smoke-test import + run: | + python -c "import layerlens; print('layerlens', layerlens.__version__)" + python -c "import layerlens.instrument as m; print('instrument:', m.__name__)" + python -c "from layerlens.instrument import transport; print('transport ok')" diff --git a/docs/adapters/otlp.md b/docs/adapters/otlp.md new file mode 100644 index 0000000..dd5cfe2 --- /dev/null +++ b/docs/adapters/otlp.md @@ -0,0 +1,217 @@ +# OTLP/HTTP Sink (`OTLPHttpSink`) + +`OTLPHttpSink` is an `EventSink` that ships LayerLens Instrument +telemetry to any OTLP/HTTP-compatible backend — the OpenTelemetry +Collector, Jaeger, Honeycomb, Datadog, Grafana Tempo / Loki / Mimir, +New Relic, or atlas-app's `/v1/{traces,logs,metrics}` endpoints. + +It mirrors the same contract as `HttpEventSink` (sync, batched, retried, +daemon-flushed, `stats()`-instrumented) but emits OTLP protobuf or JSON +on the wire instead of LayerLens's native span shape. + +## When to use it + +| Sink | Use when… | +|------|-----------| +| `HttpEventSink` | Shipping directly to atlas-app's `/api/v1/telemetry/spans`. Smallest payload, native LayerLens schema, control-plane features (replay, attestation chain, Kraken evaluations) automatically apply. | +| `OTLPHttpSink` | Shipping to an OTel Collector you already operate, or a third-party OTLP-aware backend (Jaeger, Honeycomb, etc.), or atlas-app's OTLP-compatible endpoints (`/v1/traces`, `/v1/logs`, `/v1/metrics`). | + +If you already have an OTel Collector deployed, `OTLPHttpSink` lets +`layerlens` slot into your existing telemetry pipeline with no +infrastructure change. The Collector can then forward to atlas-app, your +existing observability vendor, or both. + +## Install + +```bash +pip install 'layerlens[otlp]' +``` + +The `[otlp]` extra adds a single dependency: `opentelemetry-proto>=1.27`. +The sink hand-rolls protobuf marshalling against the proto definitions — +it does **not** require `opentelemetry-sdk` or any provider, so default +install size stays small. + +## Quickstart — local OTel Collector + +```python +from layerlens.instrument.transport.sink_otlp import OTLPHttpSink +from layerlens.instrument.adapters.providers.openai_adapter import OpenAIAdapter +from layerlens.instrument.adapters._base import CaptureConfig + +sink = OTLPHttpSink( + adapter_name="openai", + service_name="my-agent-service", + service_version="1.4.2", + endpoint="http://localhost:4318", # base URL only +) + +adapter = OpenAIAdapter(capture_config=CaptureConfig.standard()) +adapter.add_sink(sink) +adapter.connect() +# ... run your workload ... +sink.close() +``` + +The sink derives per-signal endpoints automatically: + +* Traces → `http://localhost:4318/v1/traces` +* Logs → `http://localhost:4318/v1/logs` +* Metrics → `http://localhost:4318/v1/metrics` + +These match the [OTLP/HTTP spec](https://opentelemetry.io/docs/specs/otlp/#otlphttp). + +## Quickstart — atlas-app OTLP endpoints + +atlas-app exposes OTLP-compatible endpoints at `/v1/{traces,logs,metrics}` +that accept the standard `application/x-protobuf` payload **plus** a +LayerLens API key in `X-API-Key`: + +```python +sink = OTLPHttpSink( + adapter_name="openai", + service_name="my-agent-service", + endpoint="https://api.layerlens.ai", + api_key=os.environ["LAYERLENS_STRATIX_API_KEY"], +) +``` + +Same wire format, same retry semantics — just a different endpoint and +the `X-API-Key` header. + +## Quickstart — Honeycomb (or any third-party OTLP backend) + +```python +sink = OTLPHttpSink( + adapter_name="openai", + service_name="my-agent-service", + endpoint="https://api.honeycomb.io", + bearer_token=os.environ["HONEYCOMB_API_KEY"], + headers={"x-honeycomb-team": os.environ["HONEYCOMB_TEAM"]}, +) +``` + +`headers=` adds vendor-specific headers on top of the standard auth +headers; the sink merges them all on every POST. + +## Endpoint configuration + +Three options, in order of precedence: + +1. **Per-signal full URL** (highest precedence): + + ```python + OTLPHttpSink( + adapter_name="openai", + traces_endpoint="https://traces.example.com/api/spans", + logs_endpoint="https://logs.example.com/api/logs", + metrics_endpoint="https://metrics.example.com/api/metrics", + ) + ``` + +2. **Single base URL** (most common): + + ```python + OTLPHttpSink(adapter_name="openai", endpoint="http://collector:4318") + # → http://collector:4318/v1/traces + # → http://collector:4318/v1/logs + # → http://collector:4318/v1/metrics + ``` + +3. **Environment variable** `LAYERLENS_OTLP_ENDPOINT`: + + ```bash + export LAYERLENS_OTLP_ENDPOINT=http://collector:4318 + ``` + +## Content-Type + +Two wire formats are supported: + +| Content-Type | Default? | When to use | +|--------------|----------|-------------| +| `application/x-protobuf` | ✅ | Default. Smallest payload, what every OTLP receiver speaks natively. | +| `application/json` | | Easier to inspect during pipeline debugging; slower and ~5x larger. | + +```python +from layerlens.instrument.transport.sink_otlp import ( + CONTENT_TYPE_JSON, + OTLPHttpSink, +) + +sink = OTLPHttpSink( + adapter_name="openai", + endpoint="http://collector:4318", + content_type=CONTENT_TYPE_JSON, # human-readable wire bytes +) +``` + +## Event → OTLP signal mapping + +| Event type | OTLP signal | Notes | +|------------|-------------|-------| +| `model.invoke`, `model.response`, `tool.call`, `tool.result`, `agent.act`, `agent.start`, `agent.end`, `span.*` | **Traces** | One `Span` per event. Payload keys become span attributes (prefixed `layerlens.*` for adapter / event-type metadata). | +| `cost.record`, `metric.*` | **Metrics** | One monotonic `Sum` data point per *numeric* payload key. The metric name is `layerlens..` (e.g. `layerlens.cost.record.cost_usd`). String fields are dropped silently — they're not metrics. | +| `log.` (e.g. `log.info`, `log.error`) | **Logs** | One `LogRecord` per event. `payload["message"]` becomes the log body; severity is derived from the `` suffix (`info`, `warn`, `error`, `fatal`, etc.). | +| Anything else | **Traces** (default) | Unknown events route to traces so nothing is silently dropped. | + +Every export carries the standard OTel resource attributes: + +* `service.name` — defaults to `adapter_name`, override with `service_name=`. +* `service.version` — defaults to the SDK version, override with `service_version=`. +* `telemetry.sdk.name = "layerlens"`, `telemetry.sdk.language = "python"`, `telemetry.sdk.version = `. + +Pass `resource_attrs={"deployment.environment": "prod", "k8s.pod.name": "..."}` +to merge in your own resource attributes. + +## Retry, backoff, and durability + +`OTLPHttpSink` matches `HttpEventSink` exactly: + +* On `429` / `500` / `502` / `503` / `504`: retry up to 2 times with + exponential backoff (0.5s → 1s → 2s, capped at 8s). +* `Retry-After` header is honored when present. +* On `4xx` (except `429`): drop the batch — the body is malformed, no + amount of retrying will fix it. +* After 3 consecutive batch drops on a given signal, the sink logs at + `WARN` once with code `layerlens.sink.batch_dropped` so log-based + alerting can pick it up. Subsequent drops in the same window stay at + `DEBUG`. + +The daemon flush thread (configured via `flush_interval_s`, default +1.0s) ensures sporadic adapters don't leave events buffered until +process exit. Set `background_flush=False` for deterministic +single-thread tests. + +## `stats()` schema + +`OTLPHttpSink.stats()` returns per-signal counters plus totals: + +```python +{ + "batches_sent_traces": int, + "batches_sent_logs": int, + "batches_sent_metrics": int, + "batches_dropped_traces": int, + "batches_dropped_logs": int, + "batches_dropped_metrics": int, + "buffer_size_traces": int, + "buffer_size_logs": int, + "buffer_size_metrics": int, + "consecutive_drops_traces": int, + "consecutive_drops_logs": int, + "consecutive_drops_metrics": int, + "batches_sent": int, # total across all signals + "batches_dropped": int, + "buffer_size": int, + "consecutive_drops": int, # max across signals +} +``` + +Surface these in your service health endpoints to alert on telemetry +pipeline degradation. + +## Sample + +End-to-end runnable sample: +[`samples/instrument/otlp_collector/main.py`](../../samples/instrument/otlp_collector/main.py). diff --git a/pyproject.toml b/pyproject.toml index ae6d1dc..54b17da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,23 @@ classifiers = [ [project.optional-dependencies] cli = ["click>=8.0.0"] +# --- Instrument layer: observability transport --- +# Adding any extra below MUST keep the default `pip install layerlens` +# install set unchanged. Verified by `tests/instrument/test_default_install.py`. +otel = [ + "opentelemetry-api>=1.25", + "opentelemetry-sdk>=1.25", + "opentelemetry-exporter-otlp-proto-http>=1.25", +] + +# OTLP HTTP sink (OTLPHttpSink). Hand-rolled protobuf marshalling +# against opentelemetry-proto so we don't pull in opentelemetry-sdk +# just to ship bytes to a Collector. Use this extra to enable +# `layerlens.instrument.transport.OTLPHttpSink`. +otlp = [ + "opentelemetry-proto>=1.27", +] + [project.urls] Homepage = "https://github.com/LayerLens/stratix-python" Repository = "https://github.com/LayerLens/stratix-python" diff --git a/samples/instrument/otlp_collector/__init__.py b/samples/instrument/otlp_collector/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/samples/instrument/otlp_collector/main.py b/samples/instrument/otlp_collector/main.py new file mode 100644 index 0000000..c893f3f --- /dev/null +++ b/samples/instrument/otlp_collector/main.py @@ -0,0 +1,102 @@ +"""Sample: OpenAI adapter → OTLPHttpSink → OTel Collector. + +Demonstrates how to wire the LayerLens Instrument layer to any OTLP/HTTP +backend — the OpenTelemetry Collector, Jaeger, Honeycomb, atlas-app's +``/v1/{traces,logs,metrics}`` endpoints, etc. + +Differences from ``samples/instrument/openai/main.py``: + +* Uses :class:`OTLPHttpSink` instead of :class:`HttpEventSink`. +* Targets a local Collector at ``http://localhost:4318`` (override via + ``LAYERLENS_OTLP_ENDPOINT``). +* Emits three calls (one ``model.invoke``, one ``log.info``, one + ``cost.record``) so all three OTLP signals (traces, logs, metrics) + exercise the wire. +* Mocks the OpenAI client so the sample runs with no live API key. + +Required environment: + +* ``LAYERLENS_OTLP_ENDPOINT`` — base URL of your OTLP collector + (default: ``http://localhost:4318``). +* ``LAYERLENS_OTLP_BEARER`` — optional bearer token for the collector. +* ``LAYERLENS_OTLP_API_KEY`` — optional ``X-API-Key`` for atlas-app. + +Run:: + + pip install 'layerlens[otlp,providers-openai]' + python -m samples.instrument.otlp_collector.main +""" + +from __future__ import annotations + +import os +import sys +import time + +from layerlens.instrument.transport.sink_otlp import OTLPHttpSink + + +def main() -> int: + endpoint = os.environ.get("LAYERLENS_OTLP_ENDPOINT", "http://localhost:4318") + bearer = os.environ.get("LAYERLENS_OTLP_BEARER") or None + api_key = os.environ.get("LAYERLENS_OTLP_API_KEY") or None + + print(f"Wiring OTLPHttpSink to {endpoint}") + sink = OTLPHttpSink( + adapter_name="openai", + service_name="layerlens-sample", + service_version="0.1.0", + endpoint=endpoint, + bearer_token=bearer, + api_key=api_key, + max_batch=10, + flush_interval_s=1.0, + resource_attrs={ + "deployment.environment": "demo", + }, + ) + + # Emit 3 events covering all three OTLP signals so the collector + # sees traces + logs + metrics from a single sink. + now_ns = time.time_ns() + sink.send( + "model.invoke", + { + "model": "gpt-4o-mini", + "prompt_tokens": 17, + "completion_tokens": 4, + "trace_id": "0123456789abcdef0123456789abcdef", + }, + now_ns, + ) + sink.send( + "log.info", + {"message": "Sample agent run completed", "step": 1}, + now_ns + 1_000_000, + ) + sink.send( + "cost.record", + {"cost_usd": 0.000125, "tokens": 21}, + now_ns + 2_000_000, + ) + + sink.flush() + sink.close() + + stats = sink.stats() + print("OTLP sink stats after flush:") + for key in sorted(stats.keys()): + print(f" {key:32s} {stats[key]}") + + if stats["batches_sent"] == 0: + print( + "WARNING: no batches were accepted by the collector. " + "Is it running and listening on the configured endpoint?", + file=sys.stderr, + ) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/layerlens/instrument/transport/__init__.py b/src/layerlens/instrument/transport/__init__.py new file mode 100644 index 0000000..2ac3294 --- /dev/null +++ b/src/layerlens/instrument/transport/__init__.py @@ -0,0 +1,26 @@ +"""HTTP transport sinks for the LayerLens Instrument layer. + +This package contains :class:`EventSink` implementations that ship +adapter-emitted events to remote endpoints — primarily atlas-app's +telemetry ingestion API at ``/api/v1/telemetry/spans``. The sinks +build on the SDK's existing ``httpx``-based transport, reusing the +exponential-backoff retry policy from ``layerlens._base_client``. + +Available sinks: + +* :class:`HttpEventSink` — synchronous httpx sink with batching. +* :class:`AsyncHttpEventSink` — asyncio sink for async adapters. +* :class:`OTLPHttpSink` — OTLP/HTTP exporter for any OTel-aware backend + (Collector, Jaeger, Honeycomb, atlas-app's ``/v1/{traces,logs,metrics}``). + Requires the ``[otlp]`` extra. +""" + +from __future__ import annotations + +from layerlens.instrument.transport.sink_http import ( + HttpEventSink, + AsyncHttpEventSink, +) +from layerlens.instrument.transport.sink_otlp import OTLPHttpSink + +__all__ = ["AsyncHttpEventSink", "HttpEventSink", "OTLPHttpSink"] diff --git a/src/layerlens/instrument/transport/sink_http.py b/src/layerlens/instrument/transport/sink_http.py new file mode 100644 index 0000000..7b2651a --- /dev/null +++ b/src/layerlens/instrument/transport/sink_http.py @@ -0,0 +1,500 @@ +"""HTTP-based event sinks. + +The SDK ships adapter telemetry to atlas-app via httpx. Two sinks are +provided: + +* :class:`HttpEventSink` — synchronous, immediate or batched flush, used + by sync adapters (LangChain callbacks, OpenAI client wrappers). +* :class:`AsyncHttpEventSink` — async equivalent for adapters that run + inside an event loop (LangGraph, AsyncAnthropic). + +Both sinks: + +* Reuse the SDK's existing ``X-API-Key`` auth and base-URL conventions. +* Apply exponential backoff (0.5s → 8s) on 429/5xx, matching + ``layerlens._base_client.MAX_RETRY_DELAY``. +* Treat the network as best-effort: a sink that cannot deliver an event + logs at DEBUG and drops the batch. The adapter's circuit breaker is + the authority on persistent transport failures. + +The default endpoint is ``/api/v1/telemetry/spans`` on the atlas-app +control plane. The path is configurable so the same sink can be +pointed at a self-hosted ingest gateway. +""" + +from __future__ import annotations + +import os +import time +import asyncio +import logging +import threading +from typing import Any, Dict, List, Optional + +import httpx + +from layerlens.instrument.adapters._base.sinks import EventSink + +logger = logging.getLogger(__name__) + + +_DEFAULT_BASE_URL = os.environ.get( + "LAYERLENS_STRATIX_BASE_URL", + "https://api.layerlens.ai/api/v1", +) +_DEFAULT_PATH = "/telemetry/spans" +_DEFAULT_TIMEOUT_S = 10.0 +_DEFAULT_MAX_BATCH = 50 +_DEFAULT_FLUSH_INTERVAL_S = 1.0 + +_MAX_RETRIES = 2 +_INITIAL_RETRY_DELAY_S = 0.5 +_MAX_RETRY_DELAY_S = 8.0 +_RETRY_STATUS_CODES = frozenset({429, 500, 502, 503, 504}) + + +def _format_event( + event_type: str, + payload: Dict[str, Any], + timestamp_ns: int, + adapter_name: str, + trace_id: Optional[str], +) -> Dict[str, Any]: + """Build the wire JSON object for a single event. + + ``timestamp_ns`` is wall-clock UTC nanoseconds since the Unix epoch + (the value returned by :func:`time.time_ns`). The atlas-app worker + consuming this field assumes UTC nanoseconds — do not change this + contract without coordinating a wire-schema version bump. + """ + return { + "event_type": event_type, + "payload": payload, + "timestamp_ns": timestamp_ns, + "adapter": adapter_name, + "trace_id": trace_id, + } + + +def _post_with_retry( + client: httpx.Client, + path: str, + body: Dict[str, Any], +) -> bool: + """POST ``body`` to ``path`` with backoff on 429 / 5xx. + + Returns True if the post succeeded (2xx), False if it gave up after + retries or hit a 4xx (which we don't retry — the body is bad). + """ + delay = _INITIAL_RETRY_DELAY_S + retries_left = _MAX_RETRIES + + while True: + try: + resp = client.post(path, json=body) + except httpx.HTTPError as exc: + if retries_left > 0: + logger.debug( + "HttpEventSink transport error: %s (retries left: %d)", + exc, + retries_left, + ) + time.sleep(delay) + delay = min(delay * 2, _MAX_RETRY_DELAY_S) + retries_left -= 1 + continue + logger.debug("HttpEventSink giving up after transport errors", exc_info=True) + return False + + if resp.status_code in _RETRY_STATUS_CODES and retries_left > 0: + retry_after = resp.headers.get("retry-after") + try: + sleep_for = float(retry_after) if retry_after else delay + except ValueError: + sleep_for = delay + sleep_for = min(sleep_for, _MAX_RETRY_DELAY_S) + time.sleep(sleep_for) + delay = min(delay * 2, _MAX_RETRY_DELAY_S) + retries_left -= 1 + continue + + if 200 <= resp.status_code < 300: + return True + + # Non-retriable — log and drop. + logger.debug( + "HttpEventSink got non-retriable status %d body=%s", + resp.status_code, + resp.text[:500], + ) + return False + + +_CONSECUTIVE_DROP_WARN_THRESHOLD = 3 +_DROP_WARN_LOG_CODE = "layerlens.sink.batch_dropped" + + +class HttpEventSink(EventSink): + """Synchronous HTTP sink that POSTs events to atlas-app. + + Args: + adapter_name: Tag inserted into every event so the server can + attribute the source adapter. Required. + api_key: LayerLens API key (``X-API-Key`` header). Falls back to + the ``LAYERLENS_STRATIX_API_KEY`` env var. + base_url: Base URL for atlas-app. Defaults to + ``$LAYERLENS_STRATIX_BASE_URL`` or + ``https://api.layerlens.ai/api/v1``. + path: Endpoint path. Defaults to ``"/telemetry/spans"``. + trace_id: Optional trace-id to tag all events from this sink. + max_batch: Max events to buffer before forced flush. Default 50. + flush_interval_s: Wall-clock interval after which a partial + buffer is flushed. Default 1.0 second. + timeout_s: Per-request HTTP timeout in seconds. Default 10.0. + client: Optional pre-built ``httpx.Client``. If supplied, the + sink will not close it on :meth:`close`. + background_flush: If True (default), spawn a daemon thread that + wakes every ``flush_interval_s`` and forces a flush of any + buffered events. This bounds telemetry latency for adapters + that emit sporadically. Set False for deterministic + single-thread tests. + + The sink buffers events and flushes either when ``max_batch`` is + reached, when ``flush_interval_s`` elapses since the last flush, or + when :meth:`flush` / :meth:`close` is called. + + After 3 consecutive batch failures the sink logs at WARN once with + error code ``layerlens.sink.batch_dropped`` so log alerting can pick + it up. Subsequent failures within the same window stay at DEBUG. + + :meth:`stats` exposes ``batches_sent``, ``batches_dropped``, and + ``buffer_size`` for callers that want to surface sink health. + """ + + def __init__( + self, + adapter_name: str, + *, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + path: str = _DEFAULT_PATH, + trace_id: Optional[str] = None, + max_batch: int = _DEFAULT_MAX_BATCH, + flush_interval_s: float = _DEFAULT_FLUSH_INTERVAL_S, + timeout_s: float = _DEFAULT_TIMEOUT_S, + client: Optional[httpx.Client] = None, + background_flush: bool = True, + ) -> None: + self._adapter_name = adapter_name + self._trace_id = trace_id + self._path = path + self._max_batch = max_batch + self._flush_interval_s = flush_interval_s + + self._buffer: List[Dict[str, Any]] = [] + self._lock = threading.Lock() + self._last_flush = time.monotonic() + self._closed = False + self._batches_sent = 0 + self._batches_dropped = 0 + self._consecutive_drops = 0 + + self._owns_client = client is None + if client is not None: + self._client = client + else: + resolved_key = api_key or os.environ.get("LAYERLENS_STRATIX_API_KEY", "") + resolved_base = base_url or _DEFAULT_BASE_URL + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + } + if resolved_key: + headers["X-API-Key"] = resolved_key + self._client = httpx.Client( + base_url=resolved_base, + headers=headers, + timeout=timeout_s, + ) + + # Daemon timer thread bounds idle-flush latency so a sporadic + # adapter does not leave events buffered until process exit. + self._stop_event = threading.Event() + self._timer_thread: Optional[threading.Thread] = None + if background_flush and flush_interval_s > 0: + self._timer_thread = threading.Thread( + target=self._timer_loop, + name=f"layerlens-sink-{adapter_name}", + daemon=True, + ) + self._timer_thread.start() + + @property + def buffer_size(self) -> int: + with self._lock: + return len(self._buffer) + + def stats(self) -> Dict[str, int]: + """Snapshot of sink-level counters for observability.""" + with self._lock: + return { + "batches_sent": self._batches_sent, + "batches_dropped": self._batches_dropped, + "buffer_size": len(self._buffer), + "consecutive_drops": self._consecutive_drops, + } + + def send(self, event_type: str, payload: Dict[str, Any], timestamp_ns: int) -> None: + if self._closed: + return + + event = _format_event( + event_type=event_type, + payload=payload, + timestamp_ns=timestamp_ns, + adapter_name=self._adapter_name, + trace_id=self._trace_id, + ) + + should_flush = False + with self._lock: + self._buffer.append(event) + if len(self._buffer) >= self._max_batch: + should_flush = True + + if should_flush: + self.flush() + + def flush(self) -> None: + with self._lock: + if not self._buffer: + self._last_flush = time.monotonic() + return + batch = list(self._buffer) + self._buffer.clear() + self._last_flush = time.monotonic() + + body = {"events": batch} + ok = _post_with_retry(self._client, self._path, body) + if ok: + with self._lock: + self._batches_sent += 1 + self._consecutive_drops = 0 + else: + with self._lock: + self._batches_dropped += 1 + self._consecutive_drops += 1 + consecutive = self._consecutive_drops + if consecutive == _CONSECUTIVE_DROP_WARN_THRESHOLD: + logger.warning( + "%s: HttpEventSink for adapter %s dropped %d consecutive batches " + "(latest had %d events). Telemetry pipeline may be degraded.", + _DROP_WARN_LOG_CODE, + self._adapter_name, + consecutive, + len(batch), + ) + else: + logger.debug( + "HttpEventSink dropped batch of %d events for adapter %s " + "(consecutive=%d)", + len(batch), + self._adapter_name, + consecutive, + ) + + def close(self) -> None: + if self._closed: + return + self._closed = True + if self._timer_thread is not None: + self._stop_event.set() + self._timer_thread.join(timeout=max(self._flush_interval_s * 2, 1.0)) + try: + self.flush() + finally: + if self._owns_client: + try: + self._client.close() + except Exception: + logger.debug("HttpEventSink client.close() failed", exc_info=True) + + def _timer_loop(self) -> None: + """Background daemon: wake every ``flush_interval_s`` and flush.""" + while not self._stop_event.wait(self._flush_interval_s): + if self._closed: + return + try: + with self._lock: + has_data = bool(self._buffer) + if has_data: + self.flush() + except Exception: + logger.debug("HttpEventSink timer flush failed", exc_info=True) + + +class AsyncHttpEventSink(EventSink): + """Async HTTP sink for adapters running inside an event loop. + + The :meth:`send`, :meth:`flush`, and :meth:`close` methods on this + class are synchronous (matching :class:`EventSink`) but they + schedule work on the running loop. A separate :meth:`asend`, + :meth:`aflush`, and :meth:`aclose` are provided for callers that + can ``await`` the result. + + For callers that emit events from synchronous code paths inside an + async program (a common shape for OpenAI's sync client used inside + a FastAPI handler), use :class:`HttpEventSink` instead — it does + not require the loop. + """ + + def __init__( + self, + adapter_name: str, + *, + api_key: Optional[str] = None, + base_url: Optional[str] = None, + path: str = _DEFAULT_PATH, + trace_id: Optional[str] = None, + max_batch: int = _DEFAULT_MAX_BATCH, + timeout_s: float = _DEFAULT_TIMEOUT_S, + client: Optional[httpx.AsyncClient] = None, + ) -> None: + self._adapter_name = adapter_name + self._trace_id = trace_id + self._path = path + self._max_batch = max_batch + self._buffer: List[Dict[str, Any]] = [] + self._lock = asyncio.Lock() + self._closed = False + + self._owns_client = client is None + if client is not None: + self._client = client + else: + resolved_key = api_key or os.environ.get("LAYERLENS_STRATIX_API_KEY", "") + resolved_base = base_url or _DEFAULT_BASE_URL + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + } + if resolved_key: + headers["X-API-Key"] = resolved_key + self._client = httpx.AsyncClient( + base_url=resolved_base, + headers=headers, + timeout=timeout_s, + ) + + def send(self, event_type: str, payload: Dict[str, Any], timestamp_ns: int) -> None: + """Sync entrypoint compatible with :class:`EventSink`. + + Schedules the post on the running event loop without blocking. + If no loop is running, falls back to ``asyncio.run`` for the + single send (slow path; prefer :meth:`asend`). + """ + if self._closed: + return + coro = self.asend(event_type, payload, timestamp_ns) + try: + loop = asyncio.get_running_loop() + loop.create_task(coro) + except RuntimeError: + asyncio.run(coro) + + async def asend( + self, + event_type: str, + payload: Dict[str, Any], + timestamp_ns: int, + ) -> None: + if self._closed: + return + + event = _format_event( + event_type=event_type, + payload=payload, + timestamp_ns=timestamp_ns, + adapter_name=self._adapter_name, + trace_id=self._trace_id, + ) + + async with self._lock: + self._buffer.append(event) + should_flush = len(self._buffer) >= self._max_batch + + if should_flush: + await self.aflush() + + def flush(self) -> None: + if self._closed: + return + coro = self.aflush() + try: + loop = asyncio.get_running_loop() + loop.create_task(coro) + except RuntimeError: + asyncio.run(coro) + + async def aflush(self) -> None: + async with self._lock: + if not self._buffer: + return + batch = list(self._buffer) + self._buffer.clear() + + delay = _INITIAL_RETRY_DELAY_S + retries_left = _MAX_RETRIES + body = {"events": batch} + + while True: + try: + resp = await self._client.post(self._path, json=body) + except httpx.HTTPError: + if retries_left > 0: + await asyncio.sleep(delay) + delay = min(delay * 2, _MAX_RETRY_DELAY_S) + retries_left -= 1 + continue + logger.debug( + "AsyncHttpEventSink dropped batch of %d events", len(batch) + ) + return + + if resp.status_code in _RETRY_STATUS_CODES and retries_left > 0: + retry_after = resp.headers.get("retry-after") + try: + sleep_for = float(retry_after) if retry_after else delay + except ValueError: + sleep_for = delay + await asyncio.sleep(min(sleep_for, _MAX_RETRY_DELAY_S)) + delay = min(delay * 2, _MAX_RETRY_DELAY_S) + retries_left -= 1 + continue + + if 200 <= resp.status_code < 300: + return + + logger.debug( + "AsyncHttpEventSink got non-retriable status %d", resp.status_code + ) + return + + def close(self) -> None: + if self._closed: + return + self._closed = True + coro = self.aclose() + try: + loop = asyncio.get_running_loop() + loop.create_task(coro) + except RuntimeError: + asyncio.run(coro) + + async def aclose(self) -> None: + await self.aflush() + if self._owns_client: + try: + await self._client.aclose() + except Exception: + logger.debug("AsyncHttpEventSink client.aclose() failed", exc_info=True) diff --git a/src/layerlens/instrument/transport/sink_otlp.py b/src/layerlens/instrument/transport/sink_otlp.py new file mode 100644 index 0000000..b2e0133 --- /dev/null +++ b/src/layerlens/instrument/transport/sink_otlp.py @@ -0,0 +1,829 @@ +"""OTLP HTTP transport sink. + +Bridges the LayerLens Instrument layer to any OTLP/HTTP-aware backend — +the OpenTelemetry Collector, Jaeger, Honeycomb, Datadog, Grafana Tempo, +New Relic, atlas-app's own ``/v1/{traces,logs,metrics}`` endpoints, etc. + +This sink mirrors the contract of :class:`HttpEventSink`: + +* Synchronous ``send`` / ``flush`` / ``close``. +* Per-signal batching with size + time thresholds. +* Exponential backoff (0.5s → 8s) on 429 / 5xx, honors ``Retry-After``. +* WARN-after-3-consecutive-drops with code ``layerlens.sink.batch_dropped``. +* Daemon timer thread bounds idle-flush latency. +* :meth:`stats` exposes per-signal ``batches_sent`` / ``batches_dropped``. + +Event-to-OTLP translation: + +* ``model.invoke`` / ``tool.call`` / ``agent.act`` → OTLP **traces** + (one ``Span`` per event, attributes lifted from payload). +* ``cost.record`` → OTLP **metrics** (one ``Sum`` data point per record). +* ``log.*`` (any event whose type starts with ``log.``) → OTLP **logs** + (one ``LogRecord`` per event). +* All other events default to traces so nothing is silently dropped. + +Endpoint resolution (per signal): + +1. Explicit per-signal kwarg (``traces_endpoint`` / ``logs_endpoint`` / + ``metrics_endpoint``) wins. +2. Otherwise ``{base_url}/v1/traces``, ``{base_url}/v1/logs``, + ``{base_url}/v1/metrics`` — matches the OTel spec for OTLP/HTTP. + +Auth: emits both ``Authorization: Bearer `` and ``X-API-Key`` when +the corresponding kwargs are set, so the same sink works against the +public OTLP collectors AND atlas-app's signed ingest endpoints. + +Content-Type: + +* ``application/x-protobuf`` (default) — wire-efficient, what every OTLP + receiver speaks natively. +* ``application/json`` — handy for human inspection of pipeline issues. + +Dependencies: requires the optional ``[otlp]`` extra +(``opentelemetry-proto>=1.27``). The sink is a hand-rolled HTTP POST of +``ExportTraceServiceRequest`` / ``ExportLogsServiceRequest`` / +``ExportMetricsServiceRequest`` protobuf messages, so it does NOT depend +on ``opentelemetry-sdk`` or any provider — installing the SDK is optional. +""" + +from __future__ import annotations + +import os +import json +import time +import logging +import threading +from typing import Any, Dict, List, Tuple, Union, Mapping, Optional, Sequence +from urllib.parse import urlparse + +import httpx + +from layerlens._version import __version__ +from layerlens.instrument.adapters._base.sinks import EventSink + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Tunables (mirror sink_http.py) +# --------------------------------------------------------------------------- + +_DEFAULT_BASE_URL = os.environ.get( + "LAYERLENS_OTLP_ENDPOINT", + "http://localhost:4318", +) +_DEFAULT_TIMEOUT_S = 10.0 +_DEFAULT_MAX_BATCH = 50 +_DEFAULT_FLUSH_INTERVAL_S = 1.0 + +_MAX_RETRIES = 2 +_INITIAL_RETRY_DELAY_S = 0.5 +_MAX_RETRY_DELAY_S = 8.0 +_RETRY_STATUS_CODES = frozenset({429, 500, 502, 503, 504}) + +_CONSECUTIVE_DROP_WARN_THRESHOLD = 3 +_DROP_WARN_LOG_CODE = "layerlens.sink.batch_dropped" + +CONTENT_TYPE_PROTOBUF = "application/x-protobuf" +CONTENT_TYPE_JSON = "application/json" + + +# Event-type → OTLP signal classifier. +_TRACE_EVENT_PREFIXES: Tuple[str, ...] = ( + "model.invoke", + "model.response", + "tool.call", + "tool.result", + "agent.act", + "agent.start", + "agent.end", + "span.", +) +_METRIC_EVENT_PREFIXES: Tuple[str, ...] = ("cost.record", "metric.",) +_LOG_EVENT_PREFIXES: Tuple[str, ...] = ("log.",) + + +def _classify_event(event_type: str) -> str: + """Return ``'traces'``, ``'logs'``, or ``'metrics'`` for an event type.""" + if any(event_type.startswith(p) for p in _LOG_EVENT_PREFIXES): + return "logs" + if any(event_type.startswith(p) for p in _METRIC_EVENT_PREFIXES): + return "metrics" + if any(event_type.startswith(p) for p in _TRACE_EVENT_PREFIXES): + return "traces" + # Default unknown events to traces — never silently drop. + return "traces" + + +# --------------------------------------------------------------------------- +# Protobuf marshalling — hand-rolled via opentelemetry-proto +# --------------------------------------------------------------------------- + + +def _import_proto() -> Dict[str, Any]: + """Return the proto modules we need, raising a clear ImportError if missing.""" + try: + from opentelemetry.proto.logs.v1 import logs_pb2 + from opentelemetry.proto.trace.v1 import trace_pb2 + from opentelemetry.proto.common.v1 import common_pb2 + from opentelemetry.proto.metrics.v1 import metrics_pb2 + from opentelemetry.proto.resource.v1 import resource_pb2 + from opentelemetry.proto.collector.logs.v1 import logs_service_pb2 + from opentelemetry.proto.collector.trace.v1 import trace_service_pb2 + from opentelemetry.proto.collector.metrics.v1 import metrics_service_pb2 + except ImportError as exc: # pragma: no cover - exercised in env without extra + raise ImportError( + "OTLPHttpSink requires the optional 'otlp' extra. " + "Install with: pip install 'layerlens[otlp]'" + ) from exc + + return { + "common": common_pb2, + "logs": logs_pb2, + "trace": trace_pb2, + "metrics": metrics_pb2, + "resource": resource_pb2, + "logs_service": logs_service_pb2, + "trace_service": trace_service_pb2, + "metrics_service": metrics_service_pb2, + } + + +def _to_any_value(value: Any, common_pb2: Any) -> Any: + """Coerce a Python value into an OTLP ``AnyValue``.""" + av = common_pb2.AnyValue() + if value is None: + return av # empty AnyValue + if isinstance(value, bool): + av.bool_value = value + return av + if isinstance(value, int): + av.int_value = value + return av + if isinstance(value, float): + av.double_value = value + return av + if isinstance(value, str): + av.string_value = value + return av + if isinstance(value, (list, tuple)): + for item in value: + av.array_value.values.append(_to_any_value(item, common_pb2)) + return av + if isinstance(value, dict): + for k, v in value.items(): + kv = av.kvlist_value.values.add() + kv.key = str(k) + kv.value.CopyFrom(_to_any_value(v, common_pb2)) + return av + # Fallback: stringify. + av.string_value = repr(value) + return av + + +def _attributes( + items: Mapping[str, Any], + common_pb2: Any, +) -> List[Any]: + """Build a list of ``KeyValue`` from a Python mapping.""" + out: List[Any] = [] + for key, value in items.items(): + kv = common_pb2.KeyValue() + kv.key = key + kv.value.CopyFrom(_to_any_value(value, common_pb2)) + out.append(kv) + return out + + +def _resource( + service_name: str, + service_version: str, + extra: Optional[Mapping[str, Any]], + proto: Mapping[str, Any], +) -> Any: + res = proto["resource"].Resource() + base: Dict[str, Any] = { + "service.name": service_name, + "service.version": service_version, + "telemetry.sdk.language": "python", + "telemetry.sdk.name": "layerlens", + "telemetry.sdk.version": __version__, + } + if extra: + for k, v in extra.items(): + base[k] = v + res.attributes.extend(_attributes(base, proto["common"])) + return res + + +def _trace_id_bytes(trace_id_hex: Optional[str], counter: int) -> bytes: + """16 bytes for an OTLP trace ID; derive from hex string or counter.""" + if trace_id_hex: + try: + raw = bytes.fromhex(trace_id_hex.replace("-", "").rjust(32, "0")[:32]) + if len(raw) == 16: + return raw + except ValueError: + pass + # Deterministic synthetic trace id from counter (8 zero bytes + 8-byte counter). + return b"\x00" * 8 + counter.to_bytes(8, "big", signed=False) + + +def _span_id_bytes(counter: int) -> bytes: + """8 bytes for an OTLP span ID derived from the in-batch counter.""" + return counter.to_bytes(8, "big", signed=False) + + +def _build_traces_request( + events: Sequence[Dict[str, Any]], + service_name: str, + service_version: str, + resource_attrs: Optional[Mapping[str, Any]], + proto: Mapping[str, Any], +) -> Any: + """Compose an ``ExportTraceServiceRequest`` from buffered events.""" + request = proto["trace_service"].ExportTraceServiceRequest() + rs = request.resource_spans.add() + rs.resource.CopyFrom( + _resource(service_name, service_version, resource_attrs, proto) + ) + ss = rs.scope_spans.add() + ss.scope.name = "layerlens.instrument" + ss.scope.version = __version__ + + for idx, evt in enumerate(events, start=1): + span = ss.spans.add() + span.trace_id = _trace_id_bytes(evt.get("trace_id"), idx) + span.span_id = _span_id_bytes(idx) + span.name = str(evt.get("event_type", "unknown")) + ts_ns = int(evt.get("timestamp_ns", time.time_ns())) + span.start_time_unix_nano = ts_ns + end_ns = evt.get("end_time_unix_nano") + span.end_time_unix_nano = int(end_ns) if end_ns is not None else ts_ns + # Span kind: INTERNAL by default — adapter callers can override + # via payload["span.kind"] in the future. + span.kind = proto["trace"].Span.SpanKind.SPAN_KIND_INTERNAL + + flat_attrs: Dict[str, Any] = { + "layerlens.adapter": evt.get("adapter", ""), + "layerlens.event_type": evt.get("event_type", ""), + } + payload = evt.get("payload") or {} + if isinstance(payload, dict): + for k, v in payload.items(): + flat_attrs[str(k)] = v + span.attributes.extend(_attributes(flat_attrs, proto["common"])) + + return request + + +def _build_logs_request( + events: Sequence[Dict[str, Any]], + service_name: str, + service_version: str, + resource_attrs: Optional[Mapping[str, Any]], + proto: Mapping[str, Any], +) -> Any: + """Compose an ``ExportLogsServiceRequest`` from buffered events.""" + request = proto["logs_service"].ExportLogsServiceRequest() + rl = request.resource_logs.add() + rl.resource.CopyFrom( + _resource(service_name, service_version, resource_attrs, proto) + ) + sl = rl.scope_logs.add() + sl.scope.name = "layerlens.instrument" + sl.scope.version = __version__ + + for idx, evt in enumerate(events, start=1): + rec = sl.log_records.add() + ts_ns = int(evt.get("timestamp_ns", time.time_ns())) + rec.time_unix_nano = ts_ns + rec.observed_time_unix_nano = ts_ns + # Map log. → OTLP severity. + evt_type = str(evt.get("event_type", "log.info")) + level_token = evt_type.split(".", 1)[1] if "." in evt_type else "info" + sev_number, sev_text = _severity_for_level(level_token, proto) + rec.severity_number = sev_number + rec.severity_text = sev_text + + payload = evt.get("payload") or {} + body_str: Optional[str] = None + attr_payload: Dict[str, Any] = {} + if isinstance(payload, dict): + body_str = str(payload.get("message", "")) if "message" in payload else None + for k, v in payload.items(): + if k == "message": + continue + attr_payload[str(k)] = v + if body_str is None: + body_str = json.dumps(payload, default=str) if payload else "" + rec.body.CopyFrom(_to_any_value(body_str, proto["common"])) + + attrs = { + "layerlens.adapter": evt.get("adapter", ""), + "layerlens.event_type": evt_type, + } + attrs.update(attr_payload) + rec.attributes.extend(_attributes(attrs, proto["common"])) + rec.trace_id = _trace_id_bytes(evt.get("trace_id"), idx) + rec.span_id = _span_id_bytes(idx) + + return request + + +def _severity_for_level(level: str, proto: Mapping[str, Any]) -> Tuple[int, str]: + """Translate ``log.`` into ``(severity_number, severity_text)``.""" + sev_enum = proto["logs"].SeverityNumber + table: Dict[str, Tuple[int, str]] = { + "trace": (sev_enum.SEVERITY_NUMBER_TRACE, "TRACE"), + "debug": (sev_enum.SEVERITY_NUMBER_DEBUG, "DEBUG"), + "info": (sev_enum.SEVERITY_NUMBER_INFO, "INFO"), + "warn": (sev_enum.SEVERITY_NUMBER_WARN, "WARN"), + "warning": (sev_enum.SEVERITY_NUMBER_WARN, "WARN"), + "error": (sev_enum.SEVERITY_NUMBER_ERROR, "ERROR"), + "fatal": (sev_enum.SEVERITY_NUMBER_FATAL, "FATAL"), + "critical": (sev_enum.SEVERITY_NUMBER_FATAL, "FATAL"), + } + return table.get(level.lower(), (sev_enum.SEVERITY_NUMBER_INFO, "INFO")) + + +def _build_metrics_request( + events: Sequence[Dict[str, Any]], + service_name: str, + service_version: str, + resource_attrs: Optional[Mapping[str, Any]], + proto: Mapping[str, Any], +) -> Any: + """Compose an ``ExportMetricsServiceRequest`` from buffered events. + + Each ``cost.record`` event becomes a metric. Numeric scalar payload + fields produce a ``Sum`` data point; the metric name is derived from + the payload key (``cost_usd`` → ``layerlens.cost.cost_usd``). + """ + request = proto["metrics_service"].ExportMetricsServiceRequest() + rm = request.resource_metrics.add() + rm.resource.CopyFrom( + _resource(service_name, service_version, resource_attrs, proto) + ) + sm = rm.scope_metrics.add() + sm.scope.name = "layerlens.instrument" + sm.scope.version = __version__ + + aggregation_temporality_cumulative = proto[ + "metrics" + ].AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE + + for evt in events: + ts_ns = int(evt.get("timestamp_ns", time.time_ns())) + adapter = str(evt.get("adapter", "")) + event_type = str(evt.get("event_type", "")) + payload = evt.get("payload") or {} + if not isinstance(payload, dict): + continue + for key, value in payload.items(): + if not isinstance(value, (int, float)) or isinstance(value, bool): + continue + metric = sm.metrics.add() + metric.name = f"layerlens.{event_type}.{key}".replace("..", ".") + metric.unit = "" + metric.description = f"{event_type} {key}" + data_point = metric.sum.data_points.add() + metric.sum.aggregation_temporality = aggregation_temporality_cumulative + metric.sum.is_monotonic = True + data_point.time_unix_nano = ts_ns + data_point.start_time_unix_nano = ts_ns + if isinstance(value, int): + data_point.as_int = value + else: + data_point.as_double = float(value) + data_point.attributes.extend( + _attributes( + { + "layerlens.adapter": adapter, + "layerlens.event_type": event_type, + }, + proto["common"], + ) + ) + + return request + + +def _serialize_request(request: Any, content_type: str) -> bytes: + """Serialize an export-service-request proto to wire bytes.""" + if content_type == CONTENT_TYPE_JSON: + try: + from google.protobuf.json_format import MessageToJson # type: ignore[import-untyped] + except ImportError as exc: # pragma: no cover + raise ImportError( + "JSON content-type requires google.protobuf " + "(pulled in by opentelemetry-proto)." + ) from exc + json_str: str = MessageToJson(request, preserving_proto_field_name=True) + return json_str.encode("utf-8") + raw: Any = request.SerializeToString() + return raw if isinstance(raw, bytes) else bytes(raw) + + +# --------------------------------------------------------------------------- +# HTTP send-with-retry (mirrors sink_http.py) +# --------------------------------------------------------------------------- + + +def _post_with_retry( + client: httpx.Client, + url: str, + body: bytes, + headers: Mapping[str, str], +) -> bool: + """POST ``body`` to ``url`` with backoff on 429 / 5xx (honors Retry-After).""" + delay = _INITIAL_RETRY_DELAY_S + retries_left = _MAX_RETRIES + + while True: + try: + resp = client.post(url, content=body, headers=dict(headers)) + except httpx.HTTPError as exc: + if retries_left > 0: + logger.debug( + "OTLPHttpSink transport error: %s (retries left: %d)", + exc, + retries_left, + ) + time.sleep(delay) + delay = min(delay * 2, _MAX_RETRY_DELAY_S) + retries_left -= 1 + continue + logger.debug("OTLPHttpSink giving up after transport errors", exc_info=True) + return False + + if resp.status_code in _RETRY_STATUS_CODES and retries_left > 0: + retry_after = resp.headers.get("retry-after") + try: + sleep_for = float(retry_after) if retry_after else delay + except ValueError: + sleep_for = delay + sleep_for = min(sleep_for, _MAX_RETRY_DELAY_S) + time.sleep(sleep_for) + delay = min(delay * 2, _MAX_RETRY_DELAY_S) + retries_left -= 1 + continue + + if 200 <= resp.status_code < 300: + return True + + logger.debug( + "OTLPHttpSink got non-retriable status %d body=%s", + resp.status_code, + resp.text[:500], + ) + return False + + +# --------------------------------------------------------------------------- +# OTLPHttpSink +# --------------------------------------------------------------------------- + + +class OTLPHttpSink(EventSink): + """Synchronous OTLP/HTTP sink for the LayerLens Instrument layer. + + Args: + adapter_name: Tag inserted into every event so the receiver can + attribute the source adapter. Required. + service_name: ``service.name`` resource attribute (OTel spec). + Defaults to ``adapter_name``. + service_version: ``service.version`` resource attribute. + Defaults to the SDK version. + endpoint: Base OTLP endpoint, e.g. ``http://collector:4318``. + Per-signal paths are appended as ``/v1/traces`` etc. Falls + back to ``$LAYERLENS_OTLP_ENDPOINT`` or + ``http://localhost:4318``. + traces_endpoint: Full URL override for traces. + logs_endpoint: Full URL override for logs. + metrics_endpoint: Full URL override for metrics. + bearer_token: Optional bearer token sent as + ``Authorization: Bearer ``. + api_key: Optional ``X-API-Key`` header value. + headers: Extra static headers merged into every request. + content_type: Wire format. Default ``application/x-protobuf``; + ``application/json`` is supported for human-readable + inspection. + resource_attrs: Extra OTel resource attributes (e.g. ``deployment.environment``). + max_batch: Max events per signal before forced flush. + flush_interval_s: Wall-clock idle-flush interval. + timeout_s: Per-request HTTP timeout. + client: Optional pre-built ``httpx.Client``. + background_flush: If True, spawn a daemon thread that flushes + partial buffers every ``flush_interval_s``. + + Per-signal batching keeps each OTLP export self-contained: traces, + logs, and metrics are buffered, flushed, and stat-tracked + independently. + """ + + def __init__( + self, + adapter_name: str, + *, + service_name: Optional[str] = None, + service_version: Optional[str] = None, + endpoint: Optional[str] = None, + traces_endpoint: Optional[str] = None, + logs_endpoint: Optional[str] = None, + metrics_endpoint: Optional[str] = None, + bearer_token: Optional[str] = None, + api_key: Optional[str] = None, + headers: Optional[Mapping[str, str]] = None, + content_type: str = CONTENT_TYPE_PROTOBUF, + resource_attrs: Optional[Mapping[str, Any]] = None, + max_batch: int = _DEFAULT_MAX_BATCH, + flush_interval_s: float = _DEFAULT_FLUSH_INTERVAL_S, + timeout_s: float = _DEFAULT_TIMEOUT_S, + client: Optional[httpx.Client] = None, + background_flush: bool = True, + ) -> None: + if content_type not in (CONTENT_TYPE_PROTOBUF, CONTENT_TYPE_JSON): + raise ValueError( + f"content_type must be {CONTENT_TYPE_PROTOBUF!r} or " + f"{CONTENT_TYPE_JSON!r}, got {content_type!r}" + ) + + self._adapter_name = adapter_name + self._service_name = service_name or adapter_name + self._service_version = service_version or __version__ + self._content_type = content_type + self._resource_attrs = dict(resource_attrs) if resource_attrs else None + self._max_batch = max_batch + self._flush_interval_s = flush_interval_s + + # Endpoint resolution. + base = endpoint or _DEFAULT_BASE_URL + self._endpoints: Dict[str, str] = { + "traces": traces_endpoint or _join_signal_path(base, "traces"), + "logs": logs_endpoint or _join_signal_path(base, "logs"), + "metrics": metrics_endpoint or _join_signal_path(base, "metrics"), + } + + # Auth + extra headers. + merged_headers: Dict[str, str] = { + "Content-Type": content_type, + "Accept": content_type, + } + if bearer_token: + merged_headers["Authorization"] = f"Bearer {bearer_token}" + if api_key: + merged_headers["X-API-Key"] = api_key + if headers: + for k, v in headers.items(): + merged_headers[k] = v + self._headers = merged_headers + + # Per-signal buffer + counters. + self._buffers: Dict[str, List[Dict[str, Any]]] = { + "traces": [], + "logs": [], + "metrics": [], + } + self._batches_sent: Dict[str, int] = {"traces": 0, "logs": 0, "metrics": 0} + self._batches_dropped: Dict[str, int] = {"traces": 0, "logs": 0, "metrics": 0} + self._consecutive_drops: Dict[str, int] = {"traces": 0, "logs": 0, "metrics": 0} + self._lock = threading.Lock() + self._last_flush = time.monotonic() + self._closed = False + + # Lazy proto import — only when we need to build a request. + self._proto: Optional[Dict[str, Any]] = None + + # HTTP client. + self._owns_client = client is None + if client is not None: + self._client = client + else: + self._client = httpx.Client(timeout=timeout_s) + + # Daemon flush timer. + self._stop_event = threading.Event() + self._timer_thread: Optional[threading.Thread] = None + if background_flush and flush_interval_s > 0: + self._timer_thread = threading.Thread( + target=self._timer_loop, + name=f"layerlens-otlp-sink-{adapter_name}", + daemon=True, + ) + self._timer_thread.start() + + # -- public API --------------------------------------------------------- + + def stats(self) -> Dict[str, int]: + """Snapshot of per-signal counters and total buffer size. + + Returns a flat dict with keys: + + * ``batches_sent_`` / ``batches_dropped_`` + * ``buffer_size_`` + * ``consecutive_drops_`` + * ``batches_sent`` / ``batches_dropped`` / ``buffer_size`` / + ``consecutive_drops`` (totals across all signals). + """ + with self._lock: + out: Dict[str, int] = {} + total_sent = 0 + total_dropped = 0 + total_buffer = 0 + total_consec = 0 + for signal in ("traces", "logs", "metrics"): + sent = self._batches_sent[signal] + dropped = self._batches_dropped[signal] + buf = len(self._buffers[signal]) + consec = self._consecutive_drops[signal] + out[f"batches_sent_{signal}"] = sent + out[f"batches_dropped_{signal}"] = dropped + out[f"buffer_size_{signal}"] = buf + out[f"consecutive_drops_{signal}"] = consec + total_sent += sent + total_dropped += dropped + total_buffer += buf + total_consec = max(total_consec, consec) + out["batches_sent"] = total_sent + out["batches_dropped"] = total_dropped + out["buffer_size"] = total_buffer + out["consecutive_drops"] = total_consec + return out + + def send(self, event_type: str, payload: Dict[str, Any], timestamp_ns: int) -> None: + if self._closed: + return + + signal = _classify_event(event_type) + evt: Dict[str, Any] = { + "event_type": event_type, + "payload": payload, + "timestamp_ns": timestamp_ns, + "adapter": self._adapter_name, + "trace_id": payload.get("trace_id") if isinstance(payload, dict) else None, + } + + should_flush = False + with self._lock: + self._buffers[signal].append(evt) + if len(self._buffers[signal]) >= self._max_batch: + should_flush = True + + if should_flush: + self._flush_signal(signal) + + def flush(self) -> None: + for signal in ("traces", "logs", "metrics"): + self._flush_signal(signal) + with self._lock: + self._last_flush = time.monotonic() + + def close(self) -> None: + if self._closed: + return + self._closed = True + if self._timer_thread is not None: + self._stop_event.set() + self._timer_thread.join(timeout=max(self._flush_interval_s * 2, 1.0)) + try: + self.flush() + finally: + if self._owns_client: + try: + self._client.close() + except Exception: + logger.debug("OTLPHttpSink client.close() failed", exc_info=True) + + # -- internals ---------------------------------------------------------- + + def _ensure_proto(self) -> Dict[str, Any]: + if self._proto is None: + self._proto = _import_proto() + return self._proto + + def _build_request(self, signal: str, events: Sequence[Dict[str, Any]]) -> Any: + proto = self._ensure_proto() + if signal == "traces": + return _build_traces_request( + events, + self._service_name, + self._service_version, + self._resource_attrs, + proto, + ) + if signal == "logs": + return _build_logs_request( + events, + self._service_name, + self._service_version, + self._resource_attrs, + proto, + ) + if signal == "metrics": + return _build_metrics_request( + events, + self._service_name, + self._service_version, + self._resource_attrs, + proto, + ) + raise ValueError(f"Unknown signal: {signal}") + + def _flush_signal(self, signal: str) -> None: + with self._lock: + buf = self._buffers[signal] + if not buf: + return + batch = list(buf) + buf.clear() + + try: + request = self._build_request(signal, batch) + body = _serialize_request(request, self._content_type) + except Exception: + logger.debug("OTLPHttpSink failed to build %s request", signal, exc_info=True) + with self._lock: + self._batches_dropped[signal] += 1 + self._consecutive_drops[signal] += 1 + return + + url = self._endpoints[signal] + ok = _post_with_retry(self._client, url, body, self._headers) + if ok: + with self._lock: + self._batches_sent[signal] += 1 + self._consecutive_drops[signal] = 0 + return + + with self._lock: + self._batches_dropped[signal] += 1 + self._consecutive_drops[signal] += 1 + consecutive = self._consecutive_drops[signal] + if consecutive == _CONSECUTIVE_DROP_WARN_THRESHOLD: + logger.warning( + "%s: OTLPHttpSink (%s) for adapter %s dropped %d consecutive %s " + "batches (latest had %d events). OTLP pipeline may be degraded.", + _DROP_WARN_LOG_CODE, + signal, + self._adapter_name, + consecutive, + signal, + len(batch), + ) + else: + logger.debug( + "OTLPHttpSink dropped %s batch of %d events (consecutive=%d)", + signal, + len(batch), + consecutive, + ) + + def _timer_loop(self) -> None: + while not self._stop_event.wait(self._flush_interval_s): + if self._closed: + return + try: + with self._lock: + has_data = any(self._buffers[s] for s in ("traces", "logs", "metrics")) + if has_data: + self.flush() + except Exception: + logger.debug("OTLPHttpSink timer flush failed", exc_info=True) + + +# --------------------------------------------------------------------------- +# Endpoint helpers +# --------------------------------------------------------------------------- + + +def _join_signal_path(base: str, signal: str) -> str: + """Append ``/v1/`` to ``base``, idempotent for already-pathed URLs.""" + parsed = urlparse(base) + path = parsed.path or "" + target = f"/v1/{signal}" + # If base already ends with /v1, just add the signal name. If it + # already contains /v1/, replace the last segment. + if path.rstrip("/").endswith(f"/v1/{signal}"): + return base + if path.rstrip("/").endswith("/v1"): + new_path = path.rstrip("/") + f"/{signal}" + return parsed._replace(path=new_path).geturl() + if "/v1/" in path: + # Replace the trailing signal segment. + head, _, _ = path.rpartition("/v1/") + new_path = f"{head}/v1/{signal}".replace("//", "/") + return parsed._replace(path=new_path).geturl() + new_path = path.rstrip("/") + target + return parsed._replace(path=new_path).geturl() + + +__all__ = [ + "OTLPHttpSink", + "CONTENT_TYPE_JSON", + "CONTENT_TYPE_PROTOBUF", +] + + +# Type alias for re-exports — keeps mypy strict happy on Union of dicts. +_AnyDict = Union[Dict[str, Any], Mapping[str, Any]] diff --git a/tests/instrument/test_sink_http_e2e.py b/tests/instrument/test_sink_http_e2e.py new file mode 100644 index 0000000..c87fe7c --- /dev/null +++ b/tests/instrument/test_sink_http_e2e.py @@ -0,0 +1,321 @@ +"""End-to-end tests for the HTTP transport sink. + +Spins up a real ``http.server.BaseHTTPRequestHandler`` on localhost so +the sink talks to a real HTTP server (not a mock). Verifies: + +* ``X-API-Key`` header reaches the server. +* Events are batched per the configured batch size. +* Retries with exponential backoff fire on 5xx responses. +* Non-retriable 4xx are dropped (no infinite retry). +* Async sink behaves identically over the same wire. + +This is a real round-trip — every byte traverses the loopback socket. +The test would FAIL if the sink ever stopped sending HTTP, sent the +wrong shape, or stripped the auth header. +""" + +from __future__ import annotations + +import json +import time +import asyncio +import threading +from typing import Any, Dict, List, Tuple +from http.server import HTTPServer, BaseHTTPRequestHandler + +import pytest + +from layerlens.instrument.transport.sink_http import ( + HttpEventSink, + AsyncHttpEventSink, +) + +# --------------------------------------------------------------------------- +# Local HTTP server harness +# --------------------------------------------------------------------------- + + +class _Recorder: + """Shared state across handler invocations on the test server.""" + + def __init__(self) -> None: + self.requests: List[Dict[str, Any]] = [] + # Optional response policy: list of (status, body) per call. + # If empty, return 200 OK. + self.responses: List[Tuple[int, str]] = [] + self.lock = threading.Lock() + + +def _make_handler(recorder: _Recorder) -> type: + class _Handler(BaseHTTPRequestHandler): + # Quiet the access log noise. + def log_message(self, *args: Any, **kwargs: Any) -> None: # noqa: ARG002 + pass + + def do_POST(self) -> None: # noqa: N802 - HTTP server convention + length = int(self.headers.get("Content-Length", "0")) + raw = self.rfile.read(length) if length > 0 else b"" + try: + body = json.loads(raw) if raw else {} + except json.JSONDecodeError: + body = {"_raw": raw.decode("utf-8", errors="replace")} + + with recorder.lock: + recorder.requests.append( + { + "path": self.path, + "headers": dict(self.headers), + "body": body, + } + ) + if recorder.responses: + status, response_body = recorder.responses.pop(0) + else: + status, response_body = 200, '{"ok":true}' + + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(response_body.encode()))) + self.end_headers() + self.wfile.write(response_body.encode()) + + return _Handler + + +@pytest.fixture +def server() -> Any: + """Yield ``(base_url, recorder)`` for a freshly bound localhost server.""" + recorder = _Recorder() + handler = _make_handler(recorder) + httpd = HTTPServer(("127.0.0.1", 0), handler) + port = httpd.server_address[1] + thread = threading.Thread(target=httpd.serve_forever, daemon=True) + thread.start() + try: + yield f"http://127.0.0.1:{port}", recorder + finally: + httpd.shutdown() + thread.join(timeout=5.0) + httpd.server_close() + + +# --------------------------------------------------------------------------- +# Sync HttpEventSink +# --------------------------------------------------------------------------- + + +class TestHttpEventSinkE2E: + def test_event_round_trip(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + sink = HttpEventSink( + adapter_name="openai", + api_key="test-key-abc", + base_url=base_url, + path="/telemetry/spans", + max_batch=1, # flush per event + flush_interval_s=0.0, + ) + try: + sink.send("model.invoke", {"model": "gpt-4o"}, time_ns()) + finally: + sink.close() + + assert len(recorder.requests) >= 1 + req = recorder.requests[0] + assert req["path"] == "/telemetry/spans" + assert req["headers"].get("X-API-Key") == "test-key-abc" + assert req["headers"].get("Content-Type") == "application/json" + + events = req["body"]["events"] + assert len(events) == 1 + assert events[0]["event_type"] == "model.invoke" + assert events[0]["payload"] == {"model": "gpt-4o"} + assert events[0]["adapter"] == "openai" + + def test_batching_holds_until_max_batch(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + sink = HttpEventSink( + adapter_name="openai", + api_key="k", + base_url=base_url, + max_batch=3, + flush_interval_s=999.0, # disable time-based flush + ) + try: + sink.send("model.invoke", {"a": 1}, time_ns()) + sink.send("model.invoke", {"a": 2}, time_ns()) + assert len(recorder.requests) == 0 # not yet flushed + sink.send("model.invoke", {"a": 3}, time_ns()) + finally: + sink.close() + + # Two POSTs total: one for the batch of 3, and possibly one empty + # close-time flush (which we suppress). Verify the batch is one request + # of exactly 3 events. + request_with_three = next( + r for r in recorder.requests if len(r["body"]["events"]) == 3 + ) + events = request_with_three["body"]["events"] + assert [e["payload"]["a"] for e in events] == [1, 2, 3] + + def test_close_flushes_buffer(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + sink = HttpEventSink( + adapter_name="openai", + api_key="k", + base_url=base_url, + max_batch=999, # no auto-flush by size + flush_interval_s=999.0, + ) + sink.send("model.invoke", {"a": 1}, time_ns()) + sink.send("model.invoke", {"a": 2}, time_ns()) + assert len(recorder.requests) == 0 # not yet flushed + + sink.close() # forces final flush + + assert any(len(r["body"]["events"]) == 2 for r in recorder.requests) + + def test_retries_on_503(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + # First two responses fail with 503, third succeeds. + recorder.responses = [(503, '{"err":"down"}'), (503, '{"err":"down"}'), (200, '{"ok":true}')] + + sink = HttpEventSink( + adapter_name="openai", + api_key="k", + base_url=base_url, + max_batch=1, + flush_interval_s=0.0, + ) + try: + sink.send("model.invoke", {"a": 1}, time_ns()) + finally: + sink.close() + + # Sink retries up to MAX_RETRIES=2 (so total 3 attempts including initial). + assert len(recorder.requests) >= 2 + + def test_4xx_drops_without_retry(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + recorder.responses = [(400, '{"err":"bad"}')] + + sink = HttpEventSink( + adapter_name="openai", + api_key="k", + base_url=base_url, + max_batch=1, + flush_interval_s=0.0, + ) + try: + sink.send("model.invoke", {"a": 1}, time_ns()) + finally: + sink.close() + + # Exactly one attempt — 4xx is not retried. + # (close() may attempt a final flush of empty buffer; filter to the + # actual data POST.) + data_posts = [ + r for r in recorder.requests if r["body"].get("events") + ] + assert len(data_posts) == 1 + + +# --------------------------------------------------------------------------- +# Async AsyncHttpEventSink +# --------------------------------------------------------------------------- + + +class TestHttpEventSinkStats: + def test_stats_after_successful_send(self, server: Tuple[str, _Recorder]) -> None: + base_url, _ = server + sink = HttpEventSink( + adapter_name="openai", + api_key="k", + base_url=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time_ns()) + finally: + sink.close() + + stats = sink.stats() + assert stats["batches_sent"] >= 1 + assert stats["batches_dropped"] == 0 + assert stats["consecutive_drops"] == 0 + + def test_stats_after_drops(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + # Always-fail policy. + recorder.responses = [(503, "{}") for _ in range(20)] + + sink = HttpEventSink( + adapter_name="openai", + api_key="k", + base_url=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time_ns()) + sink.send("model.invoke", {"a": 2}, time_ns()) + sink.send("model.invoke", {"a": 3}, time_ns()) + finally: + sink.close() + + stats = sink.stats() + assert stats["batches_dropped"] >= 3 + + +class TestAsyncHttpEventSinkE2E: + def test_async_event_round_trip(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + + async def run() -> None: + sink = AsyncHttpEventSink( + adapter_name="anthropic", + api_key="k", + base_url=base_url, + max_batch=1, + ) + try: + await sink.asend("model.invoke", {"model": "claude-sonnet-4-5-20250929"}, time_ns()) + finally: + await sink.aclose() + + asyncio.run(run()) + + data_posts = [r for r in recorder.requests if r["body"].get("events")] + assert len(data_posts) >= 1 + assert data_posts[0]["body"]["events"][0]["adapter"] == "anthropic" + assert data_posts[0]["body"]["events"][0]["payload"]["model"] == "claude-sonnet-4-5-20250929" + + def test_async_retries_on_503(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + recorder.responses = [(503, "{}"), (503, "{}"), (200, '{"ok":true}')] + + async def run() -> None: + sink = AsyncHttpEventSink( + adapter_name="anthropic", + api_key="k", + base_url=base_url, + max_batch=1, + ) + try: + await sink.asend("model.invoke", {"a": 1}, time_ns()) + finally: + await sink.aclose() + + asyncio.run(run()) + # Initial + 2 retries = 3 attempts on the data path; close() may + # do an extra empty flush which is suppressed. + data_posts = [r for r in recorder.requests if r["body"].get("events")] + assert len(data_posts) >= 2 + + +def time_ns() -> int: + """Local helper for clarity in tests.""" + return time.time_ns() diff --git a/tests/instrument/transport/__init__.py b/tests/instrument/transport/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/instrument/transport/test_sink_otlp.py b/tests/instrument/transport/test_sink_otlp.py new file mode 100644 index 0000000..c3071c4 --- /dev/null +++ b/tests/instrument/transport/test_sink_otlp.py @@ -0,0 +1,732 @@ +"""Tests for the OTLP/HTTP transport sink. + +Spins up an in-process ``http.server.HTTPServer`` so the sink performs +real HTTP POSTs over loopback, and asserts on the wire bytes — every +buffered batch is flushed, encoded as protobuf (or JSON), and decoded +back into the upstream OTLP service-request messages. + +What's covered: + +* ``model.invoke`` → ``ExportTraceServiceRequest`` with + ``service.name`` / ``service.version`` resource attrs. +* ``log.error`` → ``ExportLogsServiceRequest`` with the right severity. +* ``cost.record`` → ``ExportMetricsServiceRequest`` with one metric per + numeric payload key. +* JSON content-type: server receives ``application/json`` and the body + parses as JSON. +* Retry on 5xx (initial + 2 retries = 3 total attempts). +* ``Retry-After`` header is honored on 429. +* Batching: events are held until the per-signal max_batch is reached + or :meth:`flush` / :meth:`close` is called. +* :meth:`stats` reports per-signal sent / dropped counters. +* Auth headers: bearer token AND X-API-Key both reach the wire. +""" + +from __future__ import annotations + +import json +import time +import threading +from typing import Any, Dict, List, Tuple, Optional +from http.server import HTTPServer, BaseHTTPRequestHandler + +import pytest + +from layerlens.instrument.transport.sink_otlp import ( + CONTENT_TYPE_JSON, + CONTENT_TYPE_PROTOBUF, + OTLPHttpSink, + _classify_event, + _join_signal_path, +) + +# Skip the whole module if opentelemetry-proto is not installed — these +# tests require the [otlp] extra. +pytest.importorskip("opentelemetry.proto.trace.v1.trace_pb2") + +from opentelemetry.proto.collector.logs.v1 import ( # noqa: E402 + logs_service_pb2, +) +from opentelemetry.proto.collector.trace.v1 import ( # noqa: E402 + trace_service_pb2, +) +from opentelemetry.proto.collector.metrics.v1 import ( # noqa: E402 + metrics_service_pb2, +) + +# --------------------------------------------------------------------------- +# Local HTTP server harness — same shape as test_sink_http_e2e.py +# --------------------------------------------------------------------------- + + +class _Recorder: + """Shared state captured by the test HTTP server.""" + + def __init__(self) -> None: + self.requests: List[Dict[str, Any]] = [] + # Optional response policy: list of (status, headers_dict, body) tuples. + # If empty, return 200 OK. + self.responses: List[Tuple[int, Dict[str, str], bytes]] = [] + self.lock = threading.Lock() + + +def _make_handler(recorder: _Recorder) -> type: + class _Handler(BaseHTTPRequestHandler): + def log_message(self, *args: Any, **kwargs: Any) -> None: # noqa: ARG002 + pass + + def do_POST(self) -> None: # noqa: N802 + length = int(self.headers.get("Content-Length", "0")) + raw = self.rfile.read(length) if length > 0 else b"" + + with recorder.lock: + recorder.requests.append( + { + "path": self.path, + "headers": dict(self.headers), + "body": raw, + } + ) + if recorder.responses: + status, headers, response_body = recorder.responses.pop(0) + else: + status, headers, response_body = 200, {}, b'{"ok":true}' + + self.send_response(status) + self.send_header("Content-Type", "application/json") + for k, v in headers.items(): + self.send_header(k, v) + self.send_header("Content-Length", str(len(response_body))) + self.end_headers() + self.wfile.write(response_body) + + return _Handler + + +@pytest.fixture +def server() -> Any: + recorder = _Recorder() + handler = _make_handler(recorder) + httpd = HTTPServer(("127.0.0.1", 0), handler) + port = httpd.server_address[1] + thread = threading.Thread(target=httpd.serve_forever, daemon=True) + thread.start() + try: + yield f"http://127.0.0.1:{port}", recorder + finally: + httpd.shutdown() + thread.join(timeout=5.0) + httpd.server_close() + + +def _data_posts(recorder: _Recorder, path_suffix: str) -> List[Dict[str, Any]]: + return [r for r in recorder.requests if r["path"].endswith(path_suffix)] + + +# --------------------------------------------------------------------------- +# Event classification + endpoint joining (pure helpers — no I/O) +# --------------------------------------------------------------------------- + + +class TestEventClassification: + def test_model_invoke_routes_to_traces(self) -> None: + assert _classify_event("model.invoke") == "traces" + + def test_tool_call_routes_to_traces(self) -> None: + assert _classify_event("tool.call") == "traces" + + def test_agent_act_routes_to_traces(self) -> None: + assert _classify_event("agent.act") == "traces" + + def test_cost_record_routes_to_metrics(self) -> None: + assert _classify_event("cost.record") == "metrics" + + def test_log_dot_anything_routes_to_logs(self) -> None: + assert _classify_event("log.error") == "logs" + assert _classify_event("log.info") == "logs" + + def test_unknown_defaults_to_traces(self) -> None: + assert _classify_event("custom.event") == "traces" + + +class TestEndpointJoining: + def test_appends_v1_signal(self) -> None: + assert ( + _join_signal_path("http://collector:4318", "traces") + == "http://collector:4318/v1/traces" + ) + + def test_idempotent_when_v1_signal_already_present(self) -> None: + assert ( + _join_signal_path("http://collector:4318/v1/traces", "traces") + == "http://collector:4318/v1/traces" + ) + + def test_replaces_signal_when_path_has_other_signal(self) -> None: + assert ( + _join_signal_path("http://collector:4318/v1/traces", "logs") + == "http://collector:4318/v1/logs" + ) + + def test_appends_signal_when_base_ends_with_v1(self) -> None: + assert ( + _join_signal_path("https://api.example.com/v1", "metrics") + == "https://api.example.com/v1/metrics" + ) + + +# --------------------------------------------------------------------------- +# Traces → ExportTraceServiceRequest +# --------------------------------------------------------------------------- + + +class TestTracesExport: + def test_model_invoke_event_round_trip( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + service_name="my-svc", + service_version="9.9.9", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send( + "model.invoke", + {"model": "gpt-4o", "tokens": 42}, + time.time_ns(), + ) + finally: + sink.close() + + posts = _data_posts(recorder, "/v1/traces") + assert len(posts) >= 1 + req = posts[0] + assert req["headers"]["Content-Type"] == CONTENT_TYPE_PROTOBUF + + # Decode the wire bytes into the upstream proto. + decoded = trace_service_pb2.ExportTraceServiceRequest() + decoded.ParseFromString(req["body"]) + + rs = decoded.resource_spans[0] + resource_attrs = {a.key: a.value.string_value for a in rs.resource.attributes} + assert resource_attrs["service.name"] == "my-svc" + assert resource_attrs["service.version"] == "9.9.9" + assert resource_attrs["telemetry.sdk.name"] == "layerlens" + assert resource_attrs["telemetry.sdk.language"] == "python" + + span = rs.scope_spans[0].spans[0] + assert span.name == "model.invoke" + assert span.start_time_unix_nano > 0 + assert span.end_time_unix_nano >= span.start_time_unix_nano + + attrs: Dict[str, Any] = {} + for kv in span.attributes: + if kv.value.HasField("string_value"): + attrs[kv.key] = kv.value.string_value + elif kv.value.HasField("int_value"): + attrs[kv.key] = kv.value.int_value + assert attrs["layerlens.adapter"] == "openai" + assert attrs["layerlens.event_type"] == "model.invoke" + assert attrs["model"] == "gpt-4o" + assert attrs["tokens"] == 42 + + def test_auth_headers_are_emitted( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + bearer_token="eyJabc", + api_key="lk_demo_123", + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + finally: + sink.close() + + posts = _data_posts(recorder, "/v1/traces") + assert posts, "expected at least one traces POST" + # http.server normalizes header casing — match case-insensitively. + headers_ci = {k.lower(): v for k, v in posts[0]["headers"].items()} + assert headers_ci["authorization"] == "Bearer eyJabc" + assert headers_ci["x-api-key"] == "lk_demo_123" + + def test_extra_resource_attrs(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + resource_attrs={"deployment.environment": "prod", "k8s.pod.name": "api-1"}, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + finally: + sink.close() + + decoded = trace_service_pb2.ExportTraceServiceRequest() + decoded.ParseFromString(_data_posts(recorder, "/v1/traces")[0]["body"]) + attrs = {a.key: a.value.string_value for a in decoded.resource_spans[0].resource.attributes} + assert attrs["deployment.environment"] == "prod" + assert attrs["k8s.pod.name"] == "api-1" + + +# --------------------------------------------------------------------------- +# Logs → ExportLogsServiceRequest +# --------------------------------------------------------------------------- + + +class TestLogsExport: + def test_log_error_event_translates_to_log_record( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send( + "log.error", + {"message": "Something broke", "code": 500}, + time.time_ns(), + ) + finally: + sink.close() + + posts = _data_posts(recorder, "/v1/logs") + assert posts, "expected a logs POST" + decoded = logs_service_pb2.ExportLogsServiceRequest() + decoded.ParseFromString(posts[0]["body"]) + rec = decoded.resource_logs[0].scope_logs[0].log_records[0] + assert rec.severity_text == "ERROR" + assert rec.body.string_value == "Something broke" + + def test_log_info_default_severity(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("log.info", {"message": "hello"}, time.time_ns()) + finally: + sink.close() + + decoded = logs_service_pb2.ExportLogsServiceRequest() + decoded.ParseFromString(_data_posts(recorder, "/v1/logs")[0]["body"]) + rec = decoded.resource_logs[0].scope_logs[0].log_records[0] + assert rec.severity_text == "INFO" + + +# --------------------------------------------------------------------------- +# Metrics → ExportMetricsServiceRequest +# --------------------------------------------------------------------------- + + +class TestMetricsExport: + def test_cost_record_emits_one_metric_per_numeric_field( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send( + "cost.record", + {"cost_usd": 0.0123, "tokens": 100, "model": "gpt-4o"}, + time.time_ns(), + ) + finally: + sink.close() + + decoded = metrics_service_pb2.ExportMetricsServiceRequest() + decoded.ParseFromString(_data_posts(recorder, "/v1/metrics")[0]["body"]) + metrics = decoded.resource_metrics[0].scope_metrics[0].metrics + names = {m.name for m in metrics} + # Numeric fields produce metrics; string "model" does not. + assert "layerlens.cost.record.cost_usd" in names + assert "layerlens.cost.record.tokens" in names + assert not any("model" in n for n in names) + + # Check the actual numeric values. + by_name = {m.name: m for m in metrics} + cost_metric = by_name["layerlens.cost.record.cost_usd"] + assert cost_metric.sum.data_points[0].as_double == pytest.approx(0.0123) + + tokens_metric = by_name["layerlens.cost.record.tokens"] + assert tokens_metric.sum.data_points[0].as_int == 100 + + +# --------------------------------------------------------------------------- +# JSON content-type +# --------------------------------------------------------------------------- + + +class TestJsonContentType: + def test_json_body_is_decodable(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + content_type=CONTENT_TYPE_JSON, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"model": "gpt-4o"}, time.time_ns()) + finally: + sink.close() + + post = _data_posts(recorder, "/v1/traces")[0] + assert post["headers"]["Content-Type"] == CONTENT_TYPE_JSON + # Body should parse as JSON. + body = json.loads(post["body"].decode("utf-8")) + assert "resource_spans" in body + spans = body["resource_spans"][0]["scope_spans"][0]["spans"] + assert spans[0]["name"] == "model.invoke" + + +# --------------------------------------------------------------------------- +# Retry / backoff +# --------------------------------------------------------------------------- + + +class TestRetryBackoff: + def test_retries_on_5xx_then_succeeds( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + recorder.responses = [ + (503, {}, b'{"err":"down"}'), + (503, {}, b'{"err":"down"}'), + (200, {}, b'{"ok":true}'), + ] + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + finally: + sink.close() + + # Initial + 2 retries = 3 traces POSTs. + assert len(_data_posts(recorder, "/v1/traces")) == 3 + stats = sink.stats() + assert stats["batches_sent_traces"] == 1 + assert stats["batches_dropped_traces"] == 0 + + def test_honors_retry_after_on_429( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + recorder.responses = [ + (429, {"Retry-After": "0"}, b"{}"), + (200, {}, b'{"ok":true}'), + ] + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + start = time.monotonic() + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + finally: + sink.close() + elapsed = time.monotonic() - start + + # Retry-After: 0 should cause an immediate retry — total elapsed + # under 2 seconds (well under default backoff of 0.5s if we + # ignored Retry-After we'd still be fast, but the assertion + # that matters is the success after the 429). + assert elapsed < 5.0 + assert len(_data_posts(recorder, "/v1/traces")) == 2 + assert sink.stats()["batches_sent_traces"] == 1 + + def test_4xx_drops_without_retry(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + recorder.responses = [(400, {}, b'{"err":"bad"}')] + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + finally: + sink.close() + + # Exactly 1 attempt — 4xx is not retried. + assert len(_data_posts(recorder, "/v1/traces")) == 1 + assert sink.stats()["batches_dropped_traces"] == 1 + + +# --------------------------------------------------------------------------- +# Batching +# --------------------------------------------------------------------------- + + +class TestBatching: + def test_batches_until_max_batch(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=3, + flush_interval_s=999.0, # disable timer + background_flush=False, + ) + try: + sink.send("model.invoke", {"i": 1}, time.time_ns()) + sink.send("model.invoke", {"i": 2}, time.time_ns()) + assert _data_posts(recorder, "/v1/traces") == [] # not yet flushed + sink.send("model.invoke", {"i": 3}, time.time_ns()) + finally: + sink.close() + + posts = _data_posts(recorder, "/v1/traces") + # Exactly one POST (the close-time flush sees an empty buffer). + assert len(posts) == 1 + decoded = trace_service_pb2.ExportTraceServiceRequest() + decoded.ParseFromString(posts[0]["body"]) + spans = decoded.resource_spans[0].scope_spans[0].spans + assert len(spans) == 3 + + def test_close_flushes_partial_buffer( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=999, # never auto-flush by size + flush_interval_s=999.0, + background_flush=False, + ) + sink.send("model.invoke", {"i": 1}, time.time_ns()) + sink.send("model.invoke", {"i": 2}, time.time_ns()) + assert _data_posts(recorder, "/v1/traces") == [] + sink.close() # force final flush + + posts = _data_posts(recorder, "/v1/traces") + assert len(posts) == 1 + decoded = trace_service_pb2.ExportTraceServiceRequest() + decoded.ParseFromString(posts[0]["body"]) + assert len(decoded.resource_spans[0].scope_spans[0].spans) == 2 + + def test_timer_thread_flushes_partial_buffer( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=999, + flush_interval_s=0.05, + background_flush=True, + ) + try: + sink.send("model.invoke", {"i": 1}, time.time_ns()) + # Wait for the timer to fire at least once. + deadline = time.monotonic() + 2.0 + while time.monotonic() < deadline: + if _data_posts(recorder, "/v1/traces"): + break + time.sleep(0.05) + finally: + sink.close() + + assert _data_posts(recorder, "/v1/traces"), ( + "timer-driven flush did not fire within 2s" + ) + + def test_per_signal_buffers_are_independent( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=2, # flush at 2 events per signal + flush_interval_s=999.0, + background_flush=False, + ) + try: + # 1 trace, 1 metric, 1 log — none should flush yet. + sink.send("model.invoke", {"i": 1}, time.time_ns()) + sink.send("cost.record", {"cost_usd": 0.001}, time.time_ns()) + sink.send("log.info", {"message": "hi"}, time.time_ns()) + assert _data_posts(recorder, "/v1/traces") == [] + assert _data_posts(recorder, "/v1/metrics") == [] + assert _data_posts(recorder, "/v1/logs") == [] + # Push the second trace — only traces should flush. + sink.send("model.invoke", {"i": 2}, time.time_ns()) + assert len(_data_posts(recorder, "/v1/traces")) == 1 + assert _data_posts(recorder, "/v1/metrics") == [] + assert _data_posts(recorder, "/v1/logs") == [] + finally: + sink.close() + # close() flushes the remaining metric + log. + assert len(_data_posts(recorder, "/v1/metrics")) == 1 + assert len(_data_posts(recorder, "/v1/logs")) == 1 + + +# --------------------------------------------------------------------------- +# stats() reporting +# --------------------------------------------------------------------------- + + +class TestStats: + def test_per_signal_sent_counters(self, server: Tuple[str, _Recorder]) -> None: + base_url, _ = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + sink.send("log.info", {"message": "hi"}, time.time_ns()) + sink.send("cost.record", {"cost_usd": 0.01}, time.time_ns()) + finally: + sink.close() + + stats = sink.stats() + assert stats["batches_sent_traces"] >= 1 + assert stats["batches_sent_logs"] >= 1 + assert stats["batches_sent_metrics"] >= 1 + assert stats["batches_dropped"] == 0 + assert stats["buffer_size"] == 0 + + def test_drops_are_counted(self, server: Tuple[str, _Recorder]) -> None: + base_url, recorder = server + recorder.responses = [(503, {}, b"{}") for _ in range(20)] + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + sink.send("model.invoke", {"a": 2}, time.time_ns()) + sink.send("model.invoke", {"a": 3}, time.time_ns()) + finally: + sink.close() + + stats = sink.stats() + assert stats["batches_dropped_traces"] >= 3 + assert stats["consecutive_drops"] >= 3 + + +# --------------------------------------------------------------------------- +# Per-signal endpoint overrides +# --------------------------------------------------------------------------- + + +class TestPerSignalEndpoints: + def test_per_signal_endpoint_overrides_base( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + # Send traces to an explicit non-standard path. + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + traces_endpoint=f"{base_url}/custom/trace-ingest", + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + finally: + sink.close() + + assert _data_posts(recorder, "/custom/trace-ingest"), ( + "expected traces to hit the per-signal override URL" + ) + # No traffic on the default /v1/traces path. + assert _data_posts(recorder, "/v1/traces") == [] + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + + +class TestValidation: + def test_invalid_content_type_rejected(self) -> None: + with pytest.raises(ValueError, match="content_type must be"): + OTLPHttpSink( + adapter_name="openai", + endpoint="http://localhost:4318", + content_type="application/xml", + background_flush=False, + ) + + def test_works_without_optional_headers( + self, server: Tuple[str, _Recorder] + ) -> None: + base_url, recorder = server + sink = OTLPHttpSink( + adapter_name="openai", + endpoint=base_url, + max_batch=1, + flush_interval_s=0.0, + background_flush=False, + ) + try: + sink.send("model.invoke", {"a": 1}, time.time_ns()) + finally: + sink.close() + # No auth was provided; sink must still POST successfully. + posts = _data_posts(recorder, "/v1/traces") + assert posts + headers_ci = {k.lower() for k in posts[0]["headers"]} + assert "authorization" not in headers_ci + assert "x-api-key" not in headers_ci + + +# --------------------------------------------------------------------------- +# Optional helpers used by other tests +# --------------------------------------------------------------------------- + + +def _opt(value: Optional[Any], default: Any) -> Any: + return value if value is not None else default