diff --git a/README.md b/README.md index 73d644b..c0ea61c 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ Below is the list of packages currently included in this repository. | [`packages/bub-acp-server`](./packages/bub-acp-server/README.md) | `acp-server` | Exposes Bub as an Agent Client Protocol agent with `bub acp serve` for ACP-compatible editors. | | [`packages/bub-tg-feed`](./packages/bub-tg-feed/README.md) | `tg-feed` | Provides an AMQP-based channel adapter for Telegram feed messages. | | [`packages/bub-schedule`](./packages/bub-schedule/README.md) | `schedule` | Provides scheduling channel/tools backed by APScheduler with a JSON job store. | +| [`packages/bub-tapestore-otel`](./packages/bub-tapestore-otel/README.md) | `tapestore-otel` | Wraps the active tape store and projects committed tape writes to OpenTelemetry through the OTLP HTTP exporter. | | [`packages/bub-tapestore-sqlalchemy`](./packages/bub-tapestore-sqlalchemy/README.md) | `tapestore-sqlalchemy` | Provides a SQLAlchemy-backed tape store for Bub conversation history. | | [`packages/bub-tapestore-sqlite`](./packages/bub-tapestore-sqlite/README.md) | `tapestore-sqlite` | Provides a SQLite-backed tape store for Bub conversation history. | | [`packages/bub-discord`](./packages/bub-discord/README.md) | `discord` | Provides a Discord channel adapter for Bub message IO. | diff --git a/packages/bub-tapestore-otel/README.md b/packages/bub-tapestore-otel/README.md new file mode 100644 index 0000000..4844bff --- /dev/null +++ b/packages/bub-tapestore-otel/README.md @@ -0,0 +1,77 @@ +# bub-tapestore-otel + +`bub-tapestore-otel` wraps the active Bub tape store and projects committed tape +writes to OpenTelemetry through the OTLP HTTP exporter. + +It is a transparent tape-store decorator: + +```text +Bub -> OTelTapeStore -> active TapeStore + -> OpenTelemetry / OTLP +``` + +The real tape backend can still be the builtin file store or another contrib +store such as SQLite, SQLAlchemy, or Redis. 
This package observes `append` and +`reset` calls after the real store succeeds, then emits best-effort spans. Export +failures are swallowed so telemetry cannot break tape persistence. + +## Configuration + +For local Phoenix: + +```bash +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://127.0.0.1:6006/v1/traces \ +OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf \ +uv run bub run "hello" +``` + +For local Jaeger: + +```bash +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://127.0.0.1:4318/v1/traces \ +OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=http/protobuf \ +uv run bub run "hello" +``` + +Plugin settings: + +| Variable | Default | Description | +| --- | --- | --- | +| `BUB_TAPESTORE_OTEL_ENABLED` | `true` | Wrap the active tape store. | +| `BUB_TAPESTORE_OTEL_SERVICE_NAME` | `bub` | OpenTelemetry `service.name` resource value. | +| `BUB_TAPESTORE_OTEL_AGENT_NAME` | `bub` | OpenTelemetry `gen_ai.agent.name` value. | + +OTLP exporter configuration stays on the standard OpenTelemetry environment +variables such as `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` and +`OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`. + +The projection is tape-first and uses separate namespaces for separate +semantic sources: + +- `gen_ai.*` attributes and span names follow the current OpenTelemetry GenAI + semantic conventions. +- `bub.*` attributes describe Bub-specific runtime facts such as tape identity, + tape entry boundaries, loop step number, step duration, and the runtime tool + name Bub actually executed. +- `openinference.*`, `llm.*`, `input.*`, and `output.*` attributes are emitted + as Phoenix/OpenInference compatibility attributes so Phoenix can classify and + render Bub spans usefully. They are not OpenTelemetry semantic-convention + attributes. 
+ +It emits these spans: + +- `invoke_agent bub` root span with `gen_ai.operation.name=invoke_agent`, + `gen_ai.agent.name=bub`, and `openinference.span.kind=AGENT` +- `bub.agent.step` framework span for each Bub loop turn, carrying custom + `bub.agent.step` and `bub.agent.step.duration_ms` attributes +- `chat <model>` child span with `gen_ai.operation.name=chat` and + `openinference.span.kind=LLM` +- `execute_tool <tool_name>` child spans with `gen_ai.operation.name=execute_tool`, + `gen_ai.tool.call.*`, `bub.tool.*`, and `openinference.span.kind=TOOL` + +All spans include `gen_ai.conversation.id` for trace correlation. Message +content, system prompt, token usage, model/provider metadata, and tool calls are +derived from committed tape entries and exported using OTel GenAI and +OpenInference attribute names. Bub loop turns do not currently have a dedicated +OTel GenAI semantic-convention attribute, so step numbering stays in the +`bub.*` namespace. diff --git a/packages/bub-tapestore-otel/pyproject.toml b/packages/bub-tapestore-otel/pyproject.toml new file mode 100644 index 0000000..282c7d3 --- /dev/null +++ b/packages/bub-tapestore-otel/pyproject.toml @@ -0,0 +1,22 @@ +[project] +name = "bub-tapestore-otel" +version = "0.1.0" +description = "OpenTelemetry projection layer for Bub tape stores" +readme = "README.md" +authors = [ + { name = "Chojan Shang", email = "psiace@apache.org" } +] +requires-python = ">=3.12" +dependencies = [ + "bub", + "opentelemetry-exporter-otlp-proto-http>=1.39.0", + "opentelemetry-sdk>=1.39.0", + "republic>=0.5.7", +] + +[project.entry-points.bub] +tapestore-otel = "bub_tapestore_otel.plugin:OTelTapeStorePlugin" + +[build-system] +requires = ["uv_build>=0.9.7,<0.10.0"] +build-backend = "uv_build" diff --git a/packages/bub-tapestore-otel/src/bub_tapestore_otel/__init__.py b/packages/bub-tapestore-otel/src/bub_tapestore_otel/__init__.py new file mode 100644 index 0000000..5be8726 --- /dev/null +++ 
b/packages/bub-tapestore-otel/src/bub_tapestore_otel/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + +__all__ = ["OTelTapeStorePlugin"] + +from bub_tapestore_otel.plugin import OTelTapeStorePlugin diff --git a/packages/bub-tapestore-otel/src/bub_tapestore_otel/exporter.py b/packages/bub-tapestore-otel/src/bub_tapestore_otel/exporter.py new file mode 100644 index 0000000..79d44ea --- /dev/null +++ b/packages/bub-tapestore-otel/src/bub_tapestore_otel/exporter.py @@ -0,0 +1,678 @@ +from __future__ import annotations + +import hashlib +import json +import threading +from collections.abc import Callable, Iterator, Mapping +from contextlib import contextmanager +from typing import Any + +from loguru import logger +from pydantic import BaseModel, ConfigDict, Field +from republic import TapeEntry + +FORCE_FLUSH_TIMEOUT_MS = 3_000 +DEFAULT_AGENT_NAME = "bub" +TERMINAL_STEP_STATUSES = frozenset({"ok", "error", "failed", "cancelled"}) +TRACER_NAME = "bub_tapestore_otel" + + +class TapeProjectionModel(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True) + + +class OTelTapeExporterSettings(TapeProjectionModel): + service_name: str = "bub" + agent_name: str = DEFAULT_AGENT_NAME + + +class OTelExporterRuntime(TapeProjectionModel): + provider: Any + tracer: Any + + +class ToolCall(TapeProjectionModel): + id: str + name: str + arguments: str + result: str | None = None + + +class TraceMessage(TapeProjectionModel): + role: str + content: str = "" + name: str | None = None + tool_call_id: str | None = None + tool_calls: tuple[ToolCall, ...] 
= () + + +class TraceProjection(TapeProjectionModel): + tape: str + entries: list[TapeEntry] + input_messages: list[TraceMessage] + output_messages: list[TraceMessage] + tool_calls: list[ToolCall] + output: str | None = None + provider: str | None = None + model: str | None = None + status: str | None = None + usage_input_tokens: int | None = None + usage_output_tokens: int | None = None + usage_total_tokens: int | None = None + duration_ms: int | float | None = None + + +class StepTrace(TraceProjection): + step: int + llm_attributes: dict[str, Any] = Field(default_factory=dict) + + +class TapeTrace(TraceProjection): + agent_name: str = DEFAULT_AGENT_NAME + system_prompt: str | None = None + prompt: str | None = None + steps: list[StepTrace] = Field(default_factory=list) + agent_attributes: dict[str, Any] = Field(default_factory=dict) + llm_attributes: dict[str, Any] = Field(default_factory=dict) + + +class OTelTapeExporter: + def __init__(self, settings: OTelTapeExporterSettings | None = None) -> None: + self._settings = settings or OTelTapeExporterSettings() + self._lock = threading.Lock() + self._pending: dict[str, list[TapeEntry]] = {} + self._runtime: OTelExporterRuntime | None = None + + def append(self, tape: str, entry: TapeEntry) -> None: + try: + self._append(tape, entry) + except Exception: + logger.opt(exception=True).warning("tapestore.otel.export_failed action=append tape={}", tape) + + def reset(self, tape: str) -> None: + try: + self._reset(tape) + except Exception: + logger.opt(exception=True).warning("tapestore.otel.export_failed action=reset tape={}", tape) + + def _ensure_exporter(self) -> OTelExporterRuntime: + with self._lock: + if self._runtime is None: + self._runtime = _build_otel_exporter_runtime(self._settings.service_name) + return self._runtime + + def _flush(self, runtime: OTelExporterRuntime) -> None: + runtime.provider.force_flush(timeout_millis=FORCE_FLUSH_TIMEOUT_MS) + + def _append(self, tape: str, entry: TapeEntry) -> None: + 
runtime = self._ensure_exporter() + batch = self._record_entry(tape, entry) + if batch is None: + return + _instrument_trace(build_tape_trace(tape, batch, agent_name=self._settings.agent_name), tracer=runtime.tracer) + self._flush(runtime) + + def _reset(self, tape: str) -> None: + runtime = self._ensure_exporter() + batch = self._pop_pending(tape) + if batch: + _instrument_trace(build_tape_trace(tape, batch, agent_name=self._settings.agent_name), tracer=runtime.tracer) + _instrument_reset(tape, tracer=runtime.tracer) + self._flush(runtime) + + def _record_entry(self, tape: str, entry: TapeEntry) -> list[TapeEntry] | None: + with self._lock: + entries = self._pending.setdefault(tape, []) + entries.append(entry) + if not _should_flush_batch(entry): + return None + return self._pending.pop(tape) + + def _pop_pending(self, tape: str) -> list[TapeEntry]: + with self._lock: + return self._pending.pop(tape, []) + + +def _build_otel_exporter_runtime(service_name: str) -> OTelExporterRuntime: + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + + provider = TracerProvider(resource=Resource.create({"service.name": service_name})) + provider.add_span_processor(_build_otel_span_processor()) + return OTelExporterRuntime(provider=provider, tracer=provider.get_tracer(TRACER_NAME)) + + +def _build_otel_span_processor() -> object: + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.trace.export import BatchSpanProcessor + + return BatchSpanProcessor(OTLPSpanExporter()) + + +def build_tape_trace(tape: str, entries: list[TapeEntry], *, agent_name: str = DEFAULT_AGENT_NAME) -> TapeTrace: + steps = [_build_step_trace(tape, step, index) for index, step in enumerate(_split_step_entries(entries), start=1)] + prompt_tokens, completion_tokens, total_tokens = _combined_usage(entries) + fields = _trace_projection_fields(tape, entries) + fields.update( + agent_name=agent_name, + 
system_prompt=_first_message_content(fields["input_messages"], "system"), + prompt=_first_prompt(entries), + usage_input_tokens=prompt_tokens, + usage_output_tokens=completion_tokens, + usage_total_tokens=total_tokens, + duration_ms=_valid_duration_ms(_combined_duration_ms(entries)), + steps=steps, + ) + trace = TapeTrace(**fields) + return _with_trace_attributes(trace) + + +def _build_step_trace(tape: str, entries: list[TapeEntry], index: int) -> StepTrace: + step_data = _last_event_data(entries, "loop.step") + step = StepTrace( + **_trace_projection_fields(tape, entries), + step=_step_number(step_data, index), + ) + return step.model_copy(update={"llm_attributes": _llm_attributes(step) | _step_attributes(step)}) + + +def _trace_projection_fields(tape: str, entries: list[TapeEntry]) -> dict[str, Any]: + run_data = _last_event_data(entries, "run") + step_data = _last_event_data(entries, "loop.step") + prompt = _first_prompt(entries) + messages, tool_calls = _extract_messages_and_tools(entries) + input_messages, output_messages = _split_input_output(messages, prompt, tool_calls) + output = _output_value(output_messages, tool_calls) + prompt_tokens, completion_tokens, total_tokens = _usage(run_data) + duration_ms = step_data.get("elapsed_ms") or run_data.get("elapsed_ms") + return { + "tape": tape, + "entries": entries, + "input_messages": input_messages, + "output_messages": output_messages, + "tool_calls": tool_calls, + "output": output, + "provider": _as_text(run_data.get("provider")), + "model": _as_text(run_data.get("model")), + "status": _as_text(step_data.get("status") or run_data.get("status")), + "usage_input_tokens": prompt_tokens, + "usage_output_tokens": completion_tokens, + "usage_total_tokens": total_tokens, + "duration_ms": _valid_duration_ms(duration_ms), + } + + +def _with_trace_attributes(trace: TapeTrace) -> TapeTrace: + agent_attributes = _genai_span_attributes(trace, operation_name="invoke_agent") + if trace.agent_name: + 
agent_attributes["gen_ai.agent.name"] = trace.agent_name + agent_attributes.update(_bub_batch_attributes(trace)) + agent_attributes.update(_openinference_span_attributes(trace, span_kind="AGENT")) + if trace.status: + agent_attributes["bub.tape.status"] = trace.status + + return trace.model_copy(update={"agent_attributes": agent_attributes, "llm_attributes": _llm_attributes(trace)}) + + +def _llm_attributes(projection: TraceProjection) -> dict[str, Any]: + attributes = _genai_span_attributes(projection, operation_name="chat") + attributes.update(_openinference_span_attributes(projection, span_kind="LLM")) + attributes.update(_openinference_messages("llm.input_messages", projection.input_messages)) + attributes.update(_openinference_messages("llm.output_messages", projection.output_messages)) + return attributes + + +def _genai_span_attributes(projection: TraceProjection, *, operation_name: str) -> dict[str, Any]: + attributes = _genai_conversation_attributes(projection.tape) | { + "gen_ai.operation.name": operation_name, + } + if projection.model: + attributes["gen_ai.request.model"] = projection.model + attributes["gen_ai.response.model"] = projection.model + if projection.provider: + attributes["gen_ai.provider.name"] = projection.provider + attributes.update(_genai_usage_attributes(projection)) + return attributes + + +def _genai_conversation_attributes(tape: str) -> dict[str, str]: + return {"gen_ai.conversation.id": tape} + + +def _genai_usage_attributes(projection: TraceProjection) -> dict[str, int]: + attributes = { + "gen_ai.usage.input_tokens": projection.usage_input_tokens, + "gen_ai.usage.output_tokens": projection.usage_output_tokens, + } + return {name: value for name, value in attributes.items() if value is not None} + + +def _openinference_span_attributes(projection: TraceProjection, *, span_kind: str) -> dict[str, Any]: + attributes = { + "openinference.span.kind": span_kind, + "input.value": _messages_text(projection.input_messages), + 
"output.value": projection.output or "", + } + if projection.model: + attributes["llm.model_name"] = projection.model + if projection.provider: + attributes["llm.provider"] = projection.provider + attributes.update(_openinference_usage_attributes(projection)) + return attributes + + +def _openinference_usage_attributes(projection: TraceProjection) -> dict[str, int]: + attributes = { + "llm.token_count.prompt": projection.usage_input_tokens, + "llm.token_count.completion": projection.usage_output_tokens, + "llm.token_count.total": projection.usage_total_tokens, + } + return {name: value for name, value in attributes.items() if value is not None} + + +def _step_attributes(step: StepTrace) -> dict[str, Any]: + attributes: dict[str, Any] = { + "bub.agent.step": step.step, + "bub.tape.batch.entries": len(step.entries), + } + if step.status: + attributes["bub.tape.status"] = step.status + if step.duration_ms is not None: + attributes["bub.agent.step.duration_ms"] = step.duration_ms + return attributes + + +def _bub_batch_attributes(projection: TraceProjection) -> dict[str, int]: + return {"bub.tape.batch.entries": len(projection.entries)} + + +def _split_step_entries(entries: list[TapeEntry]) -> list[list[TapeEntry]]: + steps: list[list[TapeEntry]] = [] + current: list[TapeEntry] = [] + + for entry in entries: + current.append(entry) + if entry.kind == "event" and _entry_name(entry) == "loop.step": + steps.append(current) + current = [] + + if current and not steps: + steps.append(current) + return steps + + +def _extract_messages_and_tools(entries: list[TapeEntry]) -> tuple[list[TraceMessage], list[ToolCall]]: + messages: list[TraceMessage] = [] + pending_calls: list[ToolCall] = [] + + for entry in entries: + if entry.kind == "system": + content = _as_text(entry.payload.get("content")) + if content: + messages.append(TraceMessage(role="system", content=content)) + elif entry.kind == "message": + message = _message_entry(entry) + if message is not None: + 
messages.append(message) + elif entry.kind == "tool_call": + calls = [_tool_call(call, index) for index, call in enumerate(_payload_list(entry, "calls"))] + pending_calls.extend(calls) + if calls: + messages.append(TraceMessage(role="assistant", tool_calls=tuple(calls))) + elif entry.kind == "tool_result": + results = _payload_list(entry, "results") + pending_calls = _attach_tool_results(pending_calls, results) + for index, result in enumerate(results): + tool_call = pending_calls[index] if index < len(pending_calls) else None + messages.append( + TraceMessage( + role="tool", + content=_attribute_text(result), + tool_call_id=tool_call.id if tool_call else None, + ) + ) + + return messages, pending_calls + + +def _message_entry(entry: TapeEntry) -> TraceMessage | None: + role = _as_text(entry.payload.get("role")) or "assistant" + content = _as_text(entry.payload.get("content")) or "" + name = _as_text(entry.payload.get("name")) + tool_call_id = _as_text(entry.payload.get("tool_call_id")) + raw_tool_calls = entry.payload.get("tool_calls") + tool_calls = () + if isinstance(raw_tool_calls, list): + tool_calls = tuple(_tool_call(call, index) for index, call in enumerate(raw_tool_calls)) + if not content and not tool_calls: + return None + return TraceMessage(role=role, content=content, name=name, tool_call_id=tool_call_id, tool_calls=tool_calls) + + +def _split_input_output( + messages: list[TraceMessage], prompt: str | None, tool_calls: list[ToolCall] +) -> tuple[list[TraceMessage], list[TraceMessage]]: + if (last_assistant_index := _last_message_index(messages, _has_assistant_content)) is not None: + return messages[:last_assistant_index], [messages[last_assistant_index]] + + if (last_tool_call_index := _last_message_index(messages, _has_assistant_tool_call)) is not None: + return messages[:last_tool_call_index], [messages[last_tool_call_index]] + + if messages: + return messages, [] + + if prompt: + return [TraceMessage(role="user", content=prompt)], [] + if 
tool_calls: + return [], [TraceMessage(role="assistant", tool_calls=tuple(tool_calls))] + return [], [] + + +def _last_message_index(messages: list[TraceMessage], predicate: Callable[[TraceMessage], bool]) -> int | None: + for index in range(len(messages) - 1, -1, -1): + if predicate(messages[index]): + return index + return None + + +def _has_assistant_content(message: TraceMessage) -> bool: + return message.role == "assistant" and bool(message.content) + + +def _has_assistant_tool_call(message: TraceMessage) -> bool: + return message.role == "assistant" and bool(message.tool_calls) + + +def _tool_call(raw: Any, index: int) -> ToolCall: + call = raw if isinstance(raw, dict) else {"value": raw} + function = call.get("function") + if isinstance(function, dict): + name = function.get("name") or call.get("name") or f"tool_{index}" + arguments = function.get("arguments") or call.get("arguments") or call.get("args") or {} + else: + name = call.get("name") or call.get("tool_name") or f"tool_{index}" + arguments = call.get("arguments") or call.get("args") or call.get("input") or {} + return ToolCall( + id=str(call.get("id") or call.get("tool_call_id") or f"tool-{index}"), + name=str(name), + arguments=_attribute_text(arguments), + ) + + +def _attach_tool_results(tool_calls: list[ToolCall], results: list[Any]) -> list[ToolCall]: + if not tool_calls: + return [] + updated: list[ToolCall] = [] + for index, call in enumerate(tool_calls): + result = _attribute_text(results[index]) if index < len(results) else call.result + updated.append(ToolCall(id=call.id, name=call.name, arguments=call.arguments, result=result)) + return updated + + +def _bub_tape_attributes(tape: str, entries: list[TapeEntry]) -> dict[str, Any]: + attributes: dict[str, Any] = { + "bub.tape.name": tape, + "bub.session.hash": _session_hash(tape), + } + if entries: + attributes.update({ + "bub.tape.entry.first_id": entries[0].id, + "bub.tape.entry.last_id": entries[-1].id, + "bub.tape.entry.first_date": 
entries[0].date, + "bub.tape.entry.last_date": entries[-1].date, + }) + return attributes + + +def _common_attributes(tape: str, entries: list[TapeEntry]) -> dict[str, Any]: + return _genai_conversation_attributes(tape) | _bub_tape_attributes(tape, entries) + + +def _openinference_messages(prefix: str, messages: list[TraceMessage]) -> dict[str, Any]: + attributes: dict[str, Any] = {} + for index, message in enumerate(messages): + base = f"{prefix}.{index}.message" + attributes[f"{base}.role"] = message.role + if message.content: + attributes[f"{base}.content"] = message.content + if message.name: + attributes[f"{base}.name"] = message.name + if message.tool_call_id: + attributes[f"{base}.tool_call_id"] = message.tool_call_id + for call_index, call in enumerate(message.tool_calls): + call_base = f"{base}.tool_calls.{call_index}.tool_call" + attributes[f"{call_base}.id"] = call.id + attributes[f"{call_base}.function.name"] = call.name + attributes[f"{call_base}.function.arguments"] = call.arguments + return attributes + + +def _tool_span_attributes(step: StepTrace, call: ToolCall) -> dict[str, Any]: + attributes = _genai_conversation_attributes(step.tape) | _bub_tape_attributes(step.tape, step.entries) + attributes.update(_step_attributes(step)) + attributes.update({ + "openinference.span.kind": "TOOL", + "gen_ai.operation.name": "execute_tool", + "gen_ai.tool.name": call.name, + "gen_ai.tool.call.id": call.id, + "gen_ai.tool.type": "function", + "gen_ai.tool.call.arguments": call.arguments, + "bub.tool.name": call.name, + "bub.tool.call.id": call.id, + "input.mime_type": "application/json", + "input.value": call.arguments, + "output.value": call.result or "", + }) + if call.result is not None: + attributes["gen_ai.tool.call.result"] = call.result + attributes["output.mime_type"] = "application/json" + return attributes + + +def _payload_list(entry: TapeEntry, key: str) -> list[Any]: + value = entry.payload.get(key) + return value if isinstance(value, list) else [] + 
+ +def _messages_text(messages: list[TraceMessage]) -> str: + return "\n".join(f"{message.role}: {message.content}" for message in messages if message.content) + + +def _first_message_content(messages: list[TraceMessage], role: str) -> str | None: + for message in messages: + if message.role == role and message.content: + return message.content + return None + + +def _output_value(messages: list[TraceMessage], tool_calls: list[ToolCall]) -> str | None: + content = "\n".join(message.content for message in messages if message.content) + if content: + return content + calls = [call for message in messages for call in message.tool_calls] or tool_calls + if calls: + return _compact_json([{"id": call.id, "name": call.name, "arguments": call.arguments} for call in calls]) + return None + + +def _first_prompt(entries: list[TapeEntry]) -> str | None: + for entry in entries: + data = _payload_data(entry) + prompt = data.get("prompt") + if isinstance(prompt, str): + return prompt + if isinstance(prompt, list): + return _attribute_text(prompt) + return None + + +def _last_event_data(entries: list[TapeEntry], name: str) -> dict[str, Any]: + for entry in reversed(entries): + if entry.kind == "event" and _entry_name(entry) == name: + return _payload_data(entry) + return {} + + +def _usage(data: dict[str, Any]) -> tuple[int | None, int | None, int | None]: + usage = data.get("usage") + if not isinstance(usage, dict): + return None, None, None + return ( + _int_or_none(usage.get("prompt_tokens") or usage.get("input_tokens")), + _int_or_none(usage.get("completion_tokens") or usage.get("output_tokens")), + _int_or_none(usage.get("total_tokens")), + ) + + +def _combined_usage(entries: list[TapeEntry]) -> tuple[int | None, int | None, int | None]: + totals = [0, 0, 0] + saw_usage = False + for entry in entries: + if entry.kind != "event" or _entry_name(entry) != "run": + continue + usage = _usage(_payload_data(entry)) + if all(value is None for value in usage): + continue + saw_usage = 
True + for index, value in enumerate(usage): + if value is not None: + totals[index] += value + if not saw_usage: + return None, None, None + return totals[0], totals[1], totals[2] + + +def _combined_duration_ms(entries: list[TapeEntry]) -> int | float | None: + total = 0 + saw_duration = False + for entry in entries: + if entry.kind != "event" or _entry_name(entry) != "loop.step": + continue + elapsed_ms = _payload_data(entry).get("elapsed_ms") + if isinstance(elapsed_ms, (int, float)) and not isinstance(elapsed_ms, bool): + total += elapsed_ms + saw_duration = True + return total if saw_duration else None + + +def _valid_duration_ms(value: object) -> int | float | None: + return value if isinstance(value, (int, float)) and not isinstance(value, bool) else None + + +def _step_number(data: dict[str, Any], fallback: int) -> int: + value = data.get("step") + return value if isinstance(value, int) and not isinstance(value, bool) else fallback + + +def _int_or_none(value: Any) -> int | None: + if isinstance(value, bool): + return None + if isinstance(value, int): + return value + if isinstance(value, float) and value.is_integer(): + return int(value) + return None + + +def _entry_name(entry: TapeEntry) -> str: + value = entry.payload.get("name") + return str(value) if value else entry.kind + + +def _payload_data(entry: TapeEntry) -> dict[str, Any]: + data = entry.payload.get("data") + return data if isinstance(data, dict) else {} + + +def _should_flush_batch(entry: TapeEntry) -> bool: + if entry.kind != "event": + return False + if _entry_name(entry) == "command": + return True + if _entry_name(entry) != "loop.step": + return False + return _is_terminal_step(entry) + + +def _is_terminal_step(entry: TapeEntry) -> bool: + status = _as_text(_payload_data(entry).get("status")) + return status in TERMINAL_STEP_STATUSES + + +def _session_hash(tape: str) -> str: + return hashlib.sha256(tape.encode("utf-8")).hexdigest()[:16] + + +def _as_text(value: Any) -> str | None: + if 
value is None: + return None + if isinstance(value, str): + return value + return _attribute_text(value) + + +def _attribute_text(value: Any) -> str: + if isinstance(value, str): + return value + return _compact_json(value) + + +def _compact_json(value: Any) -> str: + return json.dumps(value, ensure_ascii=False, separators=(",", ":"), default=str) + + +def _instrument_trace(trace: TapeTrace, *, tracer: Any) -> None: + from opentelemetry.trace import SpanKind + + with _otel_span(tracer, _agent_span_name(trace), kind=SpanKind.INTERNAL, attributes=trace.agent_attributes): + for step in trace.steps: + with _otel_span(tracer, "bub.agent.step", kind=SpanKind.INTERNAL, attributes=_step_span_attributes(step)): + with _otel_span(tracer, _llm_span_name(step), kind=SpanKind.CLIENT, attributes=step.llm_attributes): + pass + + for call in step.tool_calls: + with _otel_span( + tracer, + _tool_span_name(call), + kind=SpanKind.INTERNAL, + attributes=_tool_span_attributes(step, call), + ): + pass + + +def _llm_span_name(step: StepTrace) -> str: + return f"chat {step.model}" if step.model else "chat" + + +def _agent_span_name(trace: TapeTrace) -> str: + return f"invoke_agent {trace.agent_name}" if trace.agent_name else "invoke_agent" + + +def _tool_span_name(call: ToolCall) -> str: + return f"execute_tool {call.name or 'tool'}" + + +def _step_span_attributes(step: StepTrace) -> dict[str, Any]: + return _common_attributes(step.tape, step.entries) | _step_attributes(step) + + +def _instrument_reset(tape: str, *, tracer: Any) -> None: + from opentelemetry.trace import SpanKind + + with _otel_span( + tracer, + "bub.tape.reset", + kind=SpanKind.INTERNAL, + attributes={"bub.tape.name": tape, "bub.session.hash": _session_hash(tape)}, + ): + pass + + +@contextmanager +def _otel_span(tracer: Any, name: str, *, kind: object, attributes: Mapping[str, Any]) -> Iterator[None]: + with tracer.start_as_current_span(name, kind=kind, attributes=_otel_attributes(attributes)): + yield + + +def 
_otel_attributes(attributes: Mapping[str, Any]) -> dict[str, str | bool | int | float]: + return {name: value for name, value in attributes.items() if isinstance(value, (str, bool, int, float))} diff --git a/packages/bub-tapestore-otel/src/bub_tapestore_otel/plugin.py b/packages/bub-tapestore-otel/src/bub_tapestore_otel/plugin.py new file mode 100644 index 0000000..e364581 --- /dev/null +++ b/packages/bub-tapestore-otel/src/bub_tapestore_otel/plugin.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +import contextlib +from collections.abc import AsyncIterator, Iterator +from typing import Any + +import bub +from bub import BubFramework, hookimpl +from pydantic import Field +from pydantic_settings import SettingsConfigDict + +from bub_tapestore_otel.exporter import OTelTapeExporter, OTelTapeExporterSettings +from bub_tapestore_otel.store import OTelTapeStore + +CONFIG_NAME = "tapestore-otel" + + +@bub.config(name=CONFIG_NAME) +class OTelTapeStoreSettings(bub.Settings): + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + extra="ignore", + populate_by_name=True, + ) + + enabled: bool = Field(default=True, validation_alias="BUB_TAPESTORE_OTEL_ENABLED") + service_name: str = Field(default="bub", validation_alias="BUB_TAPESTORE_OTEL_SERVICE_NAME") + agent_name: str = Field(default="bub", validation_alias="BUB_TAPESTORE_OTEL_AGENT_NAME") + + +class OTelTapeStorePlugin: + def __init__(self, framework: BubFramework) -> None: + self.framework = framework + + @hookimpl(tryfirst=True) + def provide_tape_store(self) -> Any: + parent = self.framework._plugin_manager.subset_hook_caller( + "provide_tape_store", + remove_plugins=[self], + ) + store = parent() + settings = bub.ensure_config(OTelTapeStoreSettings) + if not settings.enabled: + return store + exporter = OTelTapeExporter( + OTelTapeExporterSettings( + service_name=settings.service_name, + agent_name=settings.agent_name, + ) + ) + return _wrap_store_result(store, exporter) + + 
+def _wrap_store_result(store: Any, exporter: OTelTapeExporter) -> Any: + if isinstance(store, AsyncIterator): + + @contextlib.asynccontextmanager + async def manager() -> AsyncIterator[OTelTapeStore]: + async with contextlib.asynccontextmanager(lambda: store)() as inner: + yield OTelTapeStore(inner, exporter) + + return manager() + + if isinstance(store, Iterator): + + @contextlib.contextmanager + def manager() -> Iterator[OTelTapeStore]: + with contextlib.contextmanager(lambda: store)() as inner: + yield OTelTapeStore(inner, exporter) + + return manager() + + return OTelTapeStore(store, exporter) diff --git a/packages/bub-tapestore-otel/src/bub_tapestore_otel/py.typed b/packages/bub-tapestore-otel/src/bub_tapestore_otel/py.typed new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/packages/bub-tapestore-otel/src/bub_tapestore_otel/py.typed @@ -0,0 +1 @@ + diff --git a/packages/bub-tapestore-otel/src/bub_tapestore_otel/store.py b/packages/bub-tapestore-otel/src/bub_tapestore_otel/store.py new file mode 100644 index 0000000..5c8be4f --- /dev/null +++ b/packages/bub-tapestore-otel/src/bub_tapestore_otel/store.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +from collections.abc import Iterable +from typing import Protocol + +from loguru import logger +from republic import AsyncTapeStore, TapeEntry, TapeQuery +from republic.tape import TapeStore +from republic.tape.store import is_async_tape_store + + +class TapeExporter(Protocol): + def append(self, tape: str, entry: TapeEntry) -> None: ... + + def reset(self, tape: str) -> None: ... 
class OTelTapeStore:
    """Async facade over a sync or async tape store that reports committed writes.

    Reads are forwarded unchanged. ``append`` and ``reset`` run against the
    wrapped store first; the exporter is notified only after that call has
    returned, and any exporter exception is logged as a warning instead of
    being raised.
    """

    def __init__(self, inner: TapeStore | AsyncTapeStore, exporter: TapeExporter) -> None:
        self._inner = inner
        self._exporter = exporter

    async def list_tapes(self) -> list[str]:
        """Return the tape names reported by the wrapped store."""
        backend = self._inner
        if not is_async_tape_store(backend):
            return backend.list_tapes()
        return await backend.list_tapes()

    async def fetch_all(self, query: TapeQuery[AsyncTapeStore]) -> Iterable[TapeEntry]:
        """Return the wrapped store's result for ``query``."""
        backend = self._inner
        if not is_async_tape_store(backend):
            return backend.fetch_all(query)
        return await backend.fetch_all(query)

    async def append(self, tape: str, entry: TapeEntry) -> None:
        """Append ``entry`` to the wrapped store, then notify the exporter."""
        backend = self._inner
        if not is_async_tape_store(backend):
            backend.append(tape, entry)
        else:
            await backend.append(tape, entry)
        # The broad catch is deliberate: a telemetry failure must not surface
        # as a failed write once the wrapped store has accepted the entry.
        try:
            self._exporter.append(tape, entry)
        except Exception:
            logger.opt(exception=True).warning("tapestore.otel.export_failed action=append tape={}", tape)

    async def reset(self, tape: str) -> None:
        """Reset ``tape`` in the wrapped store, then notify the exporter."""
        backend = self._inner
        if not is_async_tape_store(backend):
            backend.reset(tape)
        else:
            await backend.reset(tape)
        # Same best-effort rule as ``append``.
        try:
            self._exporter.reset(tape)
        except Exception:
            logger.opt(exception=True).warning("tapestore.otel.export_failed action=reset tape={}", tape)
"content": "say hello"}), + TapeEntry.message({"role": "assistant", "content": "hello"}), + TapeEntry.event( + "run", + data={ + "provider": "openai", + "model": "gpt-5-mini", + "usage": {"prompt_tokens": 11, "completion_tokens": 3, "total_tokens": 14}, + }, + ), + TapeEntry.event("loop.step", data={"status": "ok", "elapsed_ms": 125}), + ] + + trace = build_tape_trace("chat__1", entries) + + assert trace.agent_attributes["openinference.span.kind"] == "AGENT" + assert trace.agent_attributes["gen_ai.operation.name"] == "invoke_agent" + assert trace.agent_attributes["gen_ai.agent.name"] == "bub" + assert trace.agent_attributes["gen_ai.provider.name"] == "openai" + assert trace.agent_attributes["gen_ai.request.model"] == "gpt-5-mini" + assert trace.agent_attributes["gen_ai.conversation.id"] == "chat__1" + assert trace.agent_attributes["input.value"] == "system: system rules\nuser: say hello" + assert trace.agent_attributes["output.value"] == "hello" + + assert trace.llm_attributes["openinference.span.kind"] == "LLM" + assert trace.llm_attributes["gen_ai.operation.name"] == "chat" + assert trace.llm_attributes["gen_ai.provider.name"] == "openai" + assert trace.llm_attributes["gen_ai.request.model"] == "gpt-5-mini" + assert trace.llm_attributes["gen_ai.usage.input_tokens"] == 11 + assert trace.llm_attributes["gen_ai.usage.output_tokens"] == 3 + assert trace.llm_attributes["llm.token_count.total"] == 14 + assert trace.llm_attributes["llm.input_messages.0.message.role"] == "system" + assert trace.llm_attributes["llm.input_messages.0.message.content"] == "system rules" + assert trace.llm_attributes["llm.input_messages.1.message.role"] == "user" + assert trace.llm_attributes["llm.input_messages.1.message.content"] == "say hello" + assert trace.llm_attributes["llm.output_messages.0.message.role"] == "assistant" + assert trace.llm_attributes["llm.output_messages.0.message.content"] == "hello" + assert "gen_ai.input.messages" not in trace.llm_attributes + assert 
"gen_ai.output.messages" not in trace.llm_attributes + + +def test_build_tape_trace_exports_tool_calls_and_results() -> None: + entries = [ + TapeEntry.message({"role": "user", "content": "search docs"}), + TapeEntry.tool_call([{"id": "call_1", "name": "search", "arguments": {"query": "otel genai"}}]), + TapeEntry.tool_result([{"title": "OpenTelemetry GenAI"}]), + TapeEntry.event("loop.step", data={"status": "ok"}), + ] + + trace = build_tape_trace("agent__tools", entries) + + assert trace.tool_calls[0].id == "call_1" + assert trace.tool_calls[0].name == "search" + assert trace.tool_calls[0].arguments == '{"query":"otel genai"}' + assert trace.tool_calls[0].result == '{"title":"OpenTelemetry GenAI"}' + assert trace.llm_attributes["llm.output_messages.0.message.tool_calls.0.tool_call.id"] == "call_1" + assert trace.llm_attributes["llm.output_messages.0.message.tool_calls.0.tool_call.function.name"] == "search" + assert ( + trace.llm_attributes["llm.output_messages.0.message.tool_calls.0.tool_call.function.arguments"] + == '{"query":"otel genai"}' + ) + assert trace.steps[0].tool_calls[0].name == "search" + assert "llm.tools.0.tool.json_schema" not in trace.steps[0].llm_attributes + + +def test_build_tape_trace_groups_a_turn_into_steps() -> None: + entries = [ + TapeEntry.event("loop.step.start", data={"step": 1, "prompt": "first"}), + TapeEntry.message({"role": "user", "content": "first"}), + TapeEntry.tool_call([{"id": "call_1", "name": "search", "arguments": {"query": "otel"}}]), + TapeEntry.tool_result(["result"]), + TapeEntry.event( + "run", + data={ + "provider": "openai", + "model": "gpt-5-mini", + "usage": {"prompt_tokens": 10, "completion_tokens": 2, "total_tokens": 12}, + }, + ), + TapeEntry.event("loop.step", data={"step": 1, "status": "continue", "elapsed_ms": 100}), + TapeEntry.event("loop.step.start", data={"step": 2, "prompt": "second"}), + TapeEntry.message({"role": "assistant", "content": "done"}), + TapeEntry.event( + "run", + data={ + "provider": 
"openai", + "model": "gpt-5-mini", + "usage": {"prompt_tokens": 20, "completion_tokens": 4, "total_tokens": 24}, + }, + ), + TapeEntry.event("loop.step", data={"step": 2, "status": "ok", "elapsed_ms": 200}), + ] + + trace = build_tape_trace("agent__steps", entries) + + assert trace.usage_input_tokens == 30 + assert trace.usage_output_tokens == 6 + assert trace.usage_total_tokens == 36 + assert trace.duration_ms == 300 + assert [step.step for step in trace.steps] == [1, 2] + assert [step.status for step in trace.steps] == ["continue", "ok"] + assert trace.steps[0].tool_calls[0].name == "search" + assert trace.steps[1].output == "done" + + +def test_build_tape_trace_falls_back_to_prompt_when_messages_are_missing() -> None: + trace = build_tape_trace( + "prompt__1", + [ + TapeEntry.event("loop.step.start", data={"prompt": "plain prompt"}), + TapeEntry.event("loop.step", data={"status": "ok"}), + ], + ) + + assert trace.input_messages[0].role == "user" + assert trace.input_messages[0].content == "plain prompt" + assert trace.llm_attributes["llm.input_messages.0.message.content"] == "plain prompt" + + +def test_batch_flushes_on_completed_tape_turn_markers() -> None: + assert _should_flush_batch(TapeEntry.event("loop.step", data={"status": "ok"})) + assert _should_flush_batch(TapeEntry.event("loop.step", data={"status": "error"})) + assert _should_flush_batch(TapeEntry.event("command", data={})) + assert not _should_flush_batch(TapeEntry.event("loop.step", data={"status": "continue"})) + assert not _should_flush_batch(TapeEntry.event("loop.step.start", data={})) + + +def test_instrument_trace_nests_steps_and_tools_under_agent(monkeypatch) -> None: + spans: list[tuple[str, str | None, dict]] = [] + stack: list[str] = [] + + class FakeTracer: + @contextmanager + def start_as_current_span(self, name, **kwargs): + spans.append((name, stack[-1] if stack else None, kwargs["attributes"])) + stack.append(name) + try: + yield SimpleNamespace(get_span_context=lambda: object()) + 
finally: + stack.pop() + + trace = build_tape_trace( + "agent__nested", + [ + TapeEntry.message({"role": "user", "content": "search docs"}), + TapeEntry.tool_call([{"id": "call_1", "name": "search", "arguments": {"query": "otel"}}]), + TapeEntry.tool_result(["result"]), + TapeEntry.event("run", data={"provider": "openai", "model": "gpt-5-mini"}), + TapeEntry.event("loop.step", data={"step": 1, "status": "ok"}), + ], + ) + + _instrument_trace(trace, tracer=FakeTracer()) + + assert spans == [ + ("invoke_agent bub", None, trace.agent_attributes), + ("bub.agent.step", "invoke_agent bub", exporter._step_span_attributes(trace.steps[0])), + ("chat gpt-5-mini", "bub.agent.step", trace.steps[0].llm_attributes), + ( + "execute_tool search", + "bub.agent.step", + exporter._tool_span_attributes(trace.steps[0], trace.steps[0].tool_calls[0]), + ), + ] + assert spans[1][2]["bub.agent.step"] == 1 + assert spans[1][2]["gen_ai.conversation.id"] == "agent__nested" + assert spans[2][2]["bub.agent.step"] == 1 + assert spans[3][2]["gen_ai.tool.call.arguments"] == '{"query":"otel"}' + assert spans[3][2]["gen_ai.tool.call.result"] == "result" + assert spans[3][2]["bub.tool.name"] == "search" + + +def test_exporter_uses_span_processor_without_shutdown(monkeypatch) -> None: + calls: list[str] = [] + + class FakeProvider: + def force_flush(self, *, timeout_millis: int) -> None: + calls.append(f"force_flush:{timeout_millis}") + + fake_runtime = exporter.OTelExporterRuntime(provider=FakeProvider(), tracer=object()) + + monkeypatch.setattr(exporter, "_build_otel_exporter_runtime", lambda _service_name: calls.append("build_runtime") or fake_runtime) + monkeypatch.setattr( + exporter, + "_instrument_trace", + lambda _trace, *, tracer: calls.append(f"instrument_trace:{tracer is fake_runtime.tracer}"), + ) + + tape_exporter = OTelTapeExporter() + tape_exporter.append("tape-1", TapeEntry.message({"role": "user", "content": "hello"})) + tape_exporter.append("tape-1", TapeEntry.event("loop.step", 
def test_plugin_wraps_parent_tape_store(monkeypatch) -> None:
    """With the plugin enabled, the hook returns the parent store wrapped."""
    framework = Framework()
    manager = framework._plugin_manager
    otel_plugin = OTelTapeStorePlugin(framework)  # type: ignore[arg-type]
    manager.register(ParentPlugin(), "parent")
    manager.register(otel_plugin, "otel")

    def enabled_settings(_config_type):
        # Stand-in for bub.ensure_config so no real configuration is loaded.
        return OTelTapeStoreSettings(enabled=True)

    monkeypatch.setattr(plugin.bub, "ensure_config", enabled_settings)

    provided = manager.hook.provide_tape_store()

    assert isinstance(provided, OTelTapeStore)
    assert isinstance(provided._inner, ParentStore)
class MemoryStore:
    """Synchronous in-memory tape store used as the wrapped backend in tests."""

    def __init__(self) -> None:
        # Tape name -> entries in append order.
        self.entries: dict[str, list[TapeEntry]] = {}
        # Tape names passed to reset(), in call order.
        self.resets: list[str] = []

    def list_tapes(self) -> list[str]:
        """Return the known tape names in sorted order."""
        names = list(self.entries)
        names.sort()
        return names

    def reset(self, tape: str) -> None:
        """Record the reset and replace the tape's entries with an empty list."""
        self.resets.append(tape)
        self.entries[tape] = []

    def fetch_all(self, query: TapeQuery) -> Iterable[TapeEntry]:
        """Return a copy of the entries stored for ``query.tape``."""
        stored = self.entries.get(query.tape)
        if stored is None:
            return []
        return [*stored]

    def append(self, tape: str, entry: TapeEntry) -> None:
        """Add ``entry`` to ``tape``, creating the tape on first use."""
        bucket = self.entries.setdefault(tape, [])
        bucket.append(entry)
@pytest.mark.asyncio
async def test_export_errors_do_not_roll_back_inner_write() -> None:
    """A raising exporter leaves the already-written entry in the inner store."""
    backend = MemoryStore()
    command_entry = TapeEntry.event("command", data={})
    observed = OTelTapeStore(backend, FailingExporter())

    # Must not raise even though FailingExporter.append does.
    await observed.append("tape-1", command_entry)

    expected = {"tape-1": [command_entry]}
    assert backend.entries == expected
"packages/bub-session-prompt" }, + { name = "bub-tapestore-otel", editable = "packages/bub-tapestore-otel" }, { name = "bub-tapestore-redis", editable = "packages/bub-tapestore-redis" }, { name = "bub-tapestore-sqlalchemy", editable = "packages/bub-tapestore-sqlalchemy" }, { name = "bub-tapestore-sqlite", editable = "packages/bub-tapestore-sqlite" }, @@ -722,6 +725,25 @@ name = "bub-session-prompt" version = "0.1.0" source = { editable = "packages/bub-session-prompt" } +[[package]] +name = "bub-tapestore-otel" +version = "0.1.0" +source = { editable = "packages/bub-tapestore-otel" } +dependencies = [ + { name = "bub" }, + { name = "opentelemetry-exporter-otlp-proto-http" }, + { name = "opentelemetry-sdk" }, + { name = "republic" }, +] + +[package.metadata] +requires-dist = [ + { name = "bub", git = "https://github.com/bubbuild/bub.git" }, + { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.39.0" }, + { name = "opentelemetry-sdk", specifier = ">=1.39.0" }, + { name = "republic", specifier = ">=0.5.7" }, +] + [[package]] name = "bub-tapestore-redis" version = "0.1.0" @@ -1412,6 +1434,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/6f/d1/4adcfcb9c95e3d064c9f7aaf6cb3a4fc842d86115014b9d4094db4d465b5/google_re2-1.1.20251105-1-cp314-cp314-win_arm64.whl", hash = "sha256:1d27f3a2a947ec1f721d0f14f661108acfd4f4d34f357ce28db951cc036656e5", size = 643093, upload-time = "2025-11-05T14:58:05.761Z" }, ] +[[package]] +name = "googleapis-common-protos" +version = "1.75.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b5/c8/f439cffde755cffa462bfbb156278fa6f9d09119719af9814b858fd4f81f/googleapis_common_protos-1.75.0.tar.gz", hash = "sha256:53a062ff3c32552fbd62c11fe23768b78e4ddf0494d5e5fd97d3f4689c75fbbd", size = 151035, upload-time = "2026-05-07T08:04:49.423Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/e7/c8/e2645aa8ed02fd4c7a2f59d68783b65b1f3cbdfe39a6308e156509d1fee8/googleapis_common_protos-1.75.0-py3-none-any.whl", hash = "sha256:961ed60399c457ceb0ee8f285a84c870aabc9c6a832b9d37bb281b5bebde43ed", size = 300631, upload-time = "2026-05-07T08:03:30.345Z" }, +] + [[package]] name = "greenlet" version = "3.5.1" @@ -1536,6 +1570,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d2/23/408243171aa9aaba178d3e2559159c24c1171a641aa83b67bdd3394ead8e/idna-3.15-py3-none-any.whl", hash = "sha256:048adeaf8c2d788c40fee287673ccaa74c24ffd8dcf09ffa555a2fbb59f10ac8", size = 72340, upload-time = "2026-05-12T22:45:55.733Z" }, ] +[[package]] +name = "importlib-metadata" +version = "8.7.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "zipp" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f3/49/3b30cad09e7771a4982d9975a8cbf64f00d4a1ececb53297f1d9a7be1b10/importlib_metadata-8.7.1.tar.gz", hash = "sha256:49fef1ae6440c182052f407c8d34a68f72efc36db9ca90dc0113398f2fdde8bb", size = 57107, upload-time = "2025-12-21T10:00:19.278Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/5e/f8e9a1d23b9c20a551a8a02ea3637b4642e22c2626e3a13a9a29cdea99eb/importlib_metadata-8.7.1-py3-none-any.whl", hash = "sha256:5a1f80bf1daa489495071efbb095d75a634cf28a8bc299581244063b53176151", size = 27865, upload-time = "2025-12-21T10:00:18.329Z" }, +] + [[package]] name = "iniconfig" version = "2.3.0" @@ -2061,14 +2107,84 @@ wheels = [ [[package]] name = "opentelemetry-api" -version = "1.42.0" +version = "1.41.1" source = { registry = "https://pypi.org/simple" } dependencies = [ + { name = "importlib-metadata" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/86/ca/25288069c399be6769159d9fb7b1190b603537d82aad2fa2746a0cc2c8c6/opentelemetry_api-1.42.0.tar.gz", hash = "sha256:ea84c893ad177791d138e0349d6ceebd8d3bf006440900400ce220008dafc372", size = 72300, 
upload-time = "2026-05-19T09:46:29.885Z" } +sdist = { url = "https://files.pythonhosted.org/packages/fa/fc/b7564cbef36601aef0d6c9bc01f7badb64be8e862c2e1c3c5c3b43b53e4f/opentelemetry_api-1.41.1.tar.gz", hash = "sha256:0ad1814d73b875f84494387dae86ce0b12c68556331ce6ce8fe789197c949621", size = 71416, upload-time = "2026-04-24T13:15:38.262Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/29/59/3e7118ed140f76b0982ba4321bdaed1997a0473f9720de2d10788a577033/opentelemetry_api-1.41.1-py3-none-any.whl", hash = "sha256:a22df900e75c76dc08440710e51f52f1aa6b451b429298896023e60db5b3139f", size = 69007, upload-time = "2026-04-24T13:15:15.662Z" }, +] + +[[package]] +name = "opentelemetry-exporter-otlp-proto-common" +version = "1.41.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-proto" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ae/fa/f9e3bd3c4d692b3ce9a2880a167d1f79681a1bea11f00d5bf76adc03e6ea/opentelemetry_exporter_otlp_proto_common-1.41.1.tar.gz", hash = "sha256:0e253156ea9c36b0bd3d2440c5c9ba7dd1f3fb64ba7a08fc85fbac536b56e1fb", size = 20409, upload-time = "2026-04-24T13:15:40.924Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/1b/0b/be5daf659b82b525338fde371dfcfab09b606a19bb5620c37076964710ec/opentelemetry_api-1.42.0-py3-none-any.whl", hash = "sha256:558d88f88192a973579910ef6f2c13db47a268d5ec2e53e83e50e74a39a02922", size = 61310, upload-time = "2026-05-19T09:46:06.561Z" }, + { url = "https://files.pythonhosted.org/packages/29/48/bce76d3ea772b609757e9bc844e02ab408a6446609bf74fb562062ba6b71/opentelemetry_exporter_otlp_proto_common-1.41.1-py3-none-any.whl", hash = "sha256:10da74dad6a49344b9b7b21b6182e3060373a235fde1528616d5f01f92e66aa9", size = 18366, upload-time = "2026-04-24T13:15:18.917Z" }, +] + +[[package]] +name = "opentelemetry-exporter-otlp-proto-http" +version = "1.41.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = 
"googleapis-common-protos" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, + { name = "requests" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/33/5b/9d3c7f70cca10136ba82a81e738dee626c8e7fc61c6887ea9a58bf34c606/opentelemetry_exporter_otlp_proto_http-1.41.1.tar.gz", hash = "sha256:4747a9604c8550ab38c6fd6180e2fcb80de3267060bef2c306bad3cb443302bc", size = 24139, upload-time = "2026-04-24T13:15:42.977Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ba/4d/ef07ff2fc630849f2080ae0ae73a61f67257905b7ac79066640bfa0c5739/opentelemetry_exporter_otlp_proto_http-1.41.1-py3-none-any.whl", hash = "sha256:1a21e8f49c7a946d935551e90947d6c3eb39236723c6624401da0f33d68edcb4", size = 22673, upload-time = "2026-04-24T13:15:21.313Z" }, +] + +[[package]] +name = "opentelemetry-proto" +version = "1.41.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/99/e8/633c6d8a9c8840338b105907e55c32d3da1983abab5e52f899f72a82c3d1/opentelemetry_proto-1.41.1.tar.gz", hash = "sha256:4b9d2eb631237ea43b80e16c073af438554e32bc7e9e3f8ca4a9582f900020e5", size = 45670, upload-time = "2026-04-24T13:15:49.768Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e4/1e/5cd77035e3e82070e2265a63a760f715aacd3cb16dddc7efee913f297fcc/opentelemetry_proto-1.41.1-py3-none-any.whl", hash = "sha256:0496713b804d127a4147e32849fbaf5683fac8ee98550e8e7679cd706c289720", size = 72076, upload-time = "2026-04-24T13:15:32.542Z" }, +] + +[[package]] +name = "opentelemetry-sdk" +version = "1.41.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/58/d0/54ee30dab82fb0acda23d144502771ff76ef8728459c83c3e89ef9fb1825/opentelemetry_sdk-1.41.1.tar.gz", hash = "sha256:724b615e1215b5aeacda0abb8a6a8922c9a1853068948bd0bd225a56d0c792e6", size = 230180, upload-time = "2026-04-24T13:15:50.991Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b4/e7/a1420b698aad018e1cf60fdbaaccbe49021fb415e2a0d81c242f4c518f54/opentelemetry_sdk-1.41.1-py3-none-any.whl", hash = "sha256:edee379c126c1bce952b0c812b48fe8ff35b30df0eecf17e98afa4d598b7d85d", size = 180213, upload-time = "2026-04-24T13:15:33.767Z" }, +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9e/de/911ac9e309052aca1b20b2d5549d3db45d1011e1a610e552c6ccdd1b64f8/opentelemetry_semantic_conventions-0.62b1.tar.gz", hash = "sha256:c5cc6e04a7f8c7cdd30be2ed81499fa4e75bfbd52c9cb70d40af1f9cd3619802", size = 145750, upload-time = "2026-04-24T13:15:52.236Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/eb/a6/83dc2ab6fa397ee66fba04fe2e74bdf7be3b3870005359ceb7689103c058/opentelemetry_semantic_conventions-0.62b1-py3-none-any.whl", hash = "sha256:cf506938103d331fbb78eded0d9788095f7fd59016f2bda813c3324e5a74a93c", size = 231620, upload-time = "2026-04-24T13:15:35.454Z" }, ] [[package]] @@ -2265,6 +2381,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3a/ed/1cdcab6ba3d6ab7feca11fc14f0eeea80755bb53ef4e892079f31b10a25f/propcache-0.5.2-py3-none-any.whl", hash = "sha256:be1ddfcbb376e3de5d2e2db1d58d6d67463e6b4f9f040c000de8e300295465fe", size = 14036, upload-time = "2026-05-08T21:02:10.673Z" }, ] +[[package]] +name = "protobuf" +version = "6.33.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = 
"https://files.pythonhosted.org/packages/66/70/e908e9c5e52ef7c3a6c7902c9dfbb34c7e29c25d2f81ade3856445fd5c94/protobuf-6.33.6.tar.gz", hash = "sha256:a6768d25248312c297558af96a9f9c929e8c4cee0659cb07e780731095f38135", size = 444531, upload-time = "2026-03-18T19:05:00.988Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fc/9f/2f509339e89cfa6f6a4c4ff50438db9ca488dec341f7e454adad60150b00/protobuf-6.33.6-cp310-abi3-win32.whl", hash = "sha256:7d29d9b65f8afef196f8334e80d6bc1d5d4adedb449971fefd3723824e6e77d3", size = 425739, upload-time = "2026-03-18T19:04:48.373Z" }, + { url = "https://files.pythonhosted.org/packages/76/5d/683efcd4798e0030c1bab27374fd13a89f7c2515fb1f3123efdfaa5eab57/protobuf-6.33.6-cp310-abi3-win_amd64.whl", hash = "sha256:0cd27b587afca21b7cfa59a74dcbd48a50f0a6400cfb59391340ad729d91d326", size = 437089, upload-time = "2026-03-18T19:04:50.381Z" }, + { url = "https://files.pythonhosted.org/packages/5c/01/a3c3ed5cd186f39e7880f8303cc51385a198a81469d53d0fdecf1f64d929/protobuf-6.33.6-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:9720e6961b251bde64edfdab7d500725a2af5280f3f4c87e57c0208376aa8c3a", size = 427737, upload-time = "2026-03-18T19:04:51.866Z" }, + { url = "https://files.pythonhosted.org/packages/ee/90/b3c01fdec7d2f627b3a6884243ba328c1217ed2d978def5c12dc50d328a3/protobuf-6.33.6-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:e2afbae9b8e1825e3529f88d514754e094278bb95eadc0e199751cdd9a2e82a2", size = 324610, upload-time = "2026-03-18T19:04:53.096Z" }, + { url = "https://files.pythonhosted.org/packages/9b/ca/25afc144934014700c52e05103c2421997482d561f3101ff352e1292fb81/protobuf-6.33.6-cp39-abi3-manylinux2014_s390x.whl", hash = "sha256:c96c37eec15086b79762ed265d59ab204dabc53056e3443e702d2681f4b39ce3", size = 339381, upload-time = "2026-03-18T19:04:54.616Z" }, + { url = "https://files.pythonhosted.org/packages/16/92/d1e32e3e0d894fe00b15ce28ad4944ab692713f2e7f0a99787405e43533a/protobuf-6.33.6-cp39-abi3-manylinux2014_x86_64.whl", hash 
= "sha256:e9db7e292e0ab79dd108d7f1a94fe31601ce1ee3f7b79e0692043423020b0593", size = 323436, upload-time = "2026-03-18T19:04:55.768Z" }, + { url = "https://files.pythonhosted.org/packages/c4/72/02445137af02769918a93807b2b7890047c32bfb9f90371cbc12688819eb/protobuf-6.33.6-py3-none-any.whl", hash = "sha256:77179e006c476e69bf8e8ce866640091ec42e1beb80b213c3900006ecfba6901", size = 170656, upload-time = "2026-03-18T19:04:59.826Z" }, +] + [[package]] name = "py-key-value-aio" version = "0.4.4" @@ -3399,3 +3530,12 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/65/a4/ba80dccd3593ff1f01051a818694d07b58cb8232677ee9a22a5a1f93a9fc/yarl-1.24.2-cp314-cp314t-win_arm64.whl", hash = "sha256:e434a45ce2e7a947f951fc5a8944c8cc080b7e59f9c50ae80fd39107cf88126d", size = 91219, upload-time = "2026-05-19T21:31:01.934Z" }, { url = "https://files.pythonhosted.org/packages/fd/4d/4b880086bd0d3e034d25647be1d830afc3e3f610e98c4ab3490af6b1b6d5/yarl-1.24.2-py3-none-any.whl", hash = "sha256:2783d9226db8797636cd6896e4de81feed252d1db72265686c9558d97a4d94b9", size = 53576, upload-time = "2026-05-19T21:31:03.909Z" }, ] + +[[package]] +name = "zipp" +version = "4.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b9/d8/eab98a517c14134c0b2eb4e2387bc5f457334293ec5d2dd3857ec2966802/zipp-4.1.0.tar.gz", hash = "sha256:4cb57381f544315db7688e976e922a2b18cdb513d21cc194eb42232ba2a3e602", size = 26214, upload-time = "2026-05-18T20:08:57.967Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3a/13/547360d81e6d88d58492968ffda9f9542854f11310ee556fef14260cc886/zipp-4.1.0-py3-none-any.whl", hash = "sha256:25ad4e16390cd314347dd8f1de67a2ac538ae658ed4ab9db16029c07c188e97f", size = 10238, upload-time = "2026-05-18T20:08:57.045Z" }, +]