Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/agentex/lib/core/tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
from agentex.types.span import Span
from agentex.lib.core.tracing.trace import Trace, AsyncTrace
from agentex.lib.core.tracing.tracer import Tracer, AsyncTracer
from agentex.lib.core.tracing.span_queue import (
AsyncSpanQueue,
get_default_span_queue,
shutdown_default_span_queue,
)

__all__ = ["Trace", "AsyncTrace", "Span", "Tracer", "AsyncTracer"]
__all__ = [
"Trace",
"AsyncTrace",
"Span",
"Tracer",
"AsyncTracer",
"AsyncSpanQueue",
"get_default_span_queue",
"shutdown_default_span_queue",
]
111 changes: 111 additions & 0 deletions src/agentex/lib/core/tracing/span_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from __future__ import annotations

import asyncio
from enum import Enum
from dataclasses import dataclass

from agentex.types.span import Span
from agentex.lib.utils.logging import make_logger
from agentex.lib.core.tracing.processors.tracing_processor_interface import (
AsyncTracingProcessor,
)

logger = make_logger(__name__)


class SpanEventType(str, Enum):
    """Lifecycle phase of a span event flowing through the queue."""

    # Lowercase string values so they can be interpolated into log messages.
    START = "start"
    END = "end"


@dataclass
class _SpanQueueItem:
    """Internal queue entry pairing one span lifecycle event with its processors.

    Created by ``AsyncSpanQueue.enqueue`` and consumed by the background
    drain loop, which dispatches the span to every processor's
    ``on_span_start`` / ``on_span_end`` hook depending on ``event_type``.
    """

    # Whether this entry represents a span-start or span-end event.
    event_type: SpanEventType
    # The span payload handed to each processor hook.
    span: Span
    # Processors that should receive this event.
    processors: list[AsyncTracingProcessor]


class AsyncSpanQueue:
    """Background FIFO queue for async span processing.

    Span events are enqueued synchronously (non-blocking) and processed
    sequentially by a background drain task. This keeps tracing HTTP calls
    off the critical request path while preserving start-before-end ordering.
    """

    def __init__(self) -> None:
        # Unbounded FIFO; FIFO order is what preserves start-before-end
        # per span, as long as callers enqueue start before end.
        self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue()
        # Lazily-started task that drains the queue; restarted if it dies.
        self._drain_task: asyncio.Task[None] | None = None
        # Once set, new events are dropped so shutdown() can finish draining.
        self._stopping = False

    def enqueue(
        self,
        event_type: SpanEventType,
        span: Span,
        processors: list[AsyncTracingProcessor],
    ) -> None:
        """Queue a span event for background processing without blocking.

        Never raises: the event is dropped (with a warning) when the queue
        is shutting down, or when there is no running event loop to host
        the drain task — tracing must not break the caller's request path.

        Args:
            event_type: Whether this is a span start or end event.
            span: The span to deliver to each processor.
            processors: Processors whose hooks should receive the span.
        """
        if self._stopping:
            logger.warning("Span queue is shutting down, dropping %s event for span %s", event_type.value, span.id)
            return
        try:
            self._ensure_drain_running()
        except RuntimeError:
            # asyncio.create_task() raises RuntimeError when no event loop is
            # running; with no drain task the item would sit in the queue
            # forever, so drop it rather than propagate into the caller.
            logger.warning(
                "No running event loop, dropping %s event for span %s", event_type.value, span.id
            )
            return
        self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors))

    def _ensure_drain_running(self) -> None:
        """(Re)start the background drain task if it is not currently running."""
        # Also covers a previous task that exited (e.g. cancelled by shutdown).
        if self._drain_task is None or self._drain_task.done():
            self._drain_task = asyncio.create_task(self._drain_loop())

    async def _drain_loop(self) -> None:
        """Process queued span events sequentially, forever.

        Processor failures are logged but never interrupt the loop, and every
        item is marked done in ``finally`` so ``shutdown()`` can join the queue.
        """
        while True:
            item = await self._queue.get()
            try:
                if item.event_type == SpanEventType.START:
                    coros = [p.on_span_start(item.span) for p in item.processors]
                else:
                    coros = [p.on_span_end(item.span) for p in item.processors]
                # Fan out to all processors concurrently; collect failures so
                # one broken processor cannot abort the others.
                results = await asyncio.gather(*coros, return_exceptions=True)
                for result in results:
                    # Check BaseException, not Exception: gather(return_exceptions=True)
                    # can also return e.g. CancelledError, which since Python 3.8
                    # is not an Exception subclass and would be silently dropped.
                    if isinstance(result, BaseException):
                        logger.error(
                            "Tracing processor error during %s for span %s",
                            item.event_type.value,
                            item.span.id,
                            exc_info=result,
                        )
            except Exception:
                logger.exception("Unexpected error in span queue drain loop for span %s", item.span.id)
            finally:
                self._queue.task_done()

    async def shutdown(self, timeout: float = 30.0) -> None:
        """Stop accepting new events and drain what is already queued.

        Args:
            timeout: Maximum seconds to wait for the queue to drain before
                cancelling the drain task and abandoning remaining items.
        """
        self._stopping = True
        # Fast path: nothing queued and no live drain task to cancel.
        if self._queue.empty() and (self._drain_task is None or self._drain_task.done()):
            return
        try:
            await asyncio.wait_for(self._queue.join(), timeout=timeout)
        except asyncio.TimeoutError:
            logger.warning(
                "Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize()
            )
        # The drain loop runs forever by design; cancel it now that the queue
        # is drained (or the timeout expired).
        if self._drain_task is not None and not self._drain_task.done():
            self._drain_task.cancel()
            try:
                await self._drain_task
            except asyncio.CancelledError:
                pass


_default_span_queue: AsyncSpanQueue | None = None


def get_default_span_queue() -> AsyncSpanQueue:
    """Return the process-wide span queue, creating it on first use."""
    global _default_span_queue
    queue = _default_span_queue
    if queue is None:
        queue = AsyncSpanQueue()
        _default_span_queue = queue
    return queue


async def shutdown_default_span_queue(timeout: float = 30.0) -> None:
    """Drain and discard the process-wide span queue, if one exists.

    Args:
        timeout: Maximum seconds to wait for the queue to drain.
    """
    global _default_span_queue
    queue = _default_span_queue
    if queue is None:
        return
    await queue.shutdown(timeout=timeout)
    # Clear only after a successful drain so a new queue is built lazily
    # on the next get_default_span_queue() call.
    _default_span_queue = None
17 changes: 10 additions & 7 deletions src/agentex/lib/core/tracing/trace.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import uuid
import asyncio
from typing import Any, AsyncGenerator
from datetime import UTC, datetime
from contextlib import contextmanager, asynccontextmanager
Expand All @@ -12,6 +11,11 @@
from agentex.types.span import Span
from agentex.lib.utils.logging import make_logger
from agentex.lib.utils.model_utils import recursive_model_dump
from agentex.lib.core.tracing.span_queue import (
SpanEventType,
AsyncSpanQueue,
get_default_span_queue,
)
from agentex.lib.core.tracing.processors.tracing_processor_interface import (
SyncTracingProcessor,
AsyncTracingProcessor,
Expand Down Expand Up @@ -173,17 +177,20 @@ def __init__(
processors: list[AsyncTracingProcessor],
client: AsyncAgentex,
trace_id: str | None = None,
span_queue: AsyncSpanQueue | None = None,
):
"""
Initialize a new trace with the specified trace ID.

Args:
trace_id: Required trace ID to use for this trace.
processors: Optional list of tracing processors to use for this trace.
span_queue: Optional span queue for background processing.
"""
self.processors = processors
self.client = client
self.trace_id = trace_id
self._span_queue = span_queue or get_default_span_queue()

async def start_span(
self,
Expand Down Expand Up @@ -225,9 +232,7 @@ async def start_span(
)

if self.processors:
await asyncio.gather(
*[processor.on_span_start(span) for processor in self.processors]
)
self._span_queue.enqueue(SpanEventType.START, span.model_copy(deep=True), self.processors)

return span

Expand All @@ -252,9 +257,7 @@ async def end_span(
span.data = recursive_model_dump(span.data) if span.data else None

if self.processors:
await asyncio.gather(
*[processor.on_span_end(span) for processor in self.processors]
)
self._span_queue.enqueue(SpanEventType.END, span.model_copy(deep=True), self.processors)

return span

Expand Down
5 changes: 4 additions & 1 deletion src/agentex/lib/core/tracing/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from agentex import Agentex, AsyncAgentex
from agentex.lib.core.tracing.trace import Trace, AsyncTrace
from agentex.lib.core.tracing.span_queue import AsyncSpanQueue
from agentex.lib.core.tracing.tracing_processor_manager import (
get_sync_tracing_processors,
get_async_tracing_processors,
Expand Down Expand Up @@ -55,12 +56,13 @@ def __init__(self, client: AsyncAgentex):
"""
self.client = client

def trace(self, trace_id: str | None = None) -> AsyncTrace:
def trace(self, trace_id: str | None = None, span_queue: AsyncSpanQueue | None = None) -> AsyncTrace:
"""
Create a new trace with the given trace ID.

Args:
trace_id: The trace ID to use.
span_queue: Optional span queue for background processing.

Returns:
A new AsyncTrace instance.
Expand All @@ -69,4 +71,5 @@ def trace(self, trace_id: str | None = None) -> AsyncTrace:
processors=get_async_tracing_processors(),
client=self.client,
trace_id=trace_id,
span_queue=span_queue,
)
6 changes: 5 additions & 1 deletion src/agentex/lib/sdk/fastacp/base/base_acp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables
from agentex.types.task_message_update import TaskMessageUpdate, StreamTaskMessageFull
from agentex.types.task_message_content import TaskMessageContent
from agentex.lib.core.tracing.span_queue import shutdown_default_span_queue
from agentex.lib.sdk.fastacp.base.constants import (
FASTACP_HEADER_SKIP_EXACT,
FASTACP_HEADER_SKIP_PREFIXES,
Expand Down Expand Up @@ -103,7 +104,10 @@ async def lifespan_context(app: FastAPI): # noqa: ARG001
else:
logger.warning("AGENTEX_BASE_URL not set, skipping agent registration")

yield
try:
yield
finally:
await shutdown_default_span_queue()

return lifespan_context

Expand Down
Loading
Loading