From 97bcaf49d8c109b3b84225023376c90d5d3b27d5 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 2 Apr 2026 11:57:06 -0700 Subject: [PATCH 1/5] fix(tracing): Fix memory leak in SGP tracing processors SGPSyncTracingProcessor and SGPAsyncTracingProcessor accumulated spans in self._spans dict on every request but never removed them, since on_span_end() used dict.get() (read-only) instead of dict.pop() (read-and-remove). The only cleanup was in shutdown() which is never called. After this fix, spans are removed from the dict when they complete, preventing unbounded memory growth. --- .../processors/sgp_tracing_processor.py | 14 +- tests/lib/core/__init__.py | 0 tests/lib/core/tracing/__init__.py | 0 tests/lib/core/tracing/processors/__init__.py | 0 .../processors/test_sgp_tracing_processor.py | 158 ++++++++++++++++++ 5 files changed, 163 insertions(+), 9 deletions(-) create mode 100644 tests/lib/core/__init__.py create mode 100644 tests/lib/core/tracing/__init__.py create mode 100644 tests/lib/core/tracing/processors/__init__.py create mode 100644 tests/lib/core/tracing/processors/test_sgp_tracing_processor.py diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index b0c2a213a..2f94e7f87 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -30,7 +30,7 @@ def __init__(self, config: SGPTracingProcessorConfig): disabled = config.sgp_api_key == "" or config.sgp_account_id == "" tracing.init( SGPClient( - api_key=config.sgp_api_key, + api_key=config.sgp_api_key, account_id=config.sgp_account_id, base_url=config.sgp_base_url, ), @@ -72,11 +72,9 @@ def on_span_start(self, span: Span) -> None: @override def on_span_end(self, span: Span) -> None: - sgp_span = self._spans.get(span.id) + sgp_span = self._spans.pop(span.id, None) if sgp_span is None: - logger.warning( - f"Span {span.id} not found in stored spans, skipping span end" - ) + logger.warning(f"Span {span.id} not found in stored spans, skipping span end") return self._add_source_to_span(span) @@ -151,11 +149,9 @@ async def on_span_start(self, span: Span) -> None: @override async def on_span_end(self, span: Span) -> None: - sgp_span = self._spans.get(span.id) + sgp_span = self._spans.pop(span.id, None) if sgp_span is None: - logger.warning( - f"Span {span.id} not found in stored spans, skipping span end" - ) + logger.warning(f"Span {span.id} not found in stored spans, skipping span end") return self._add_source_to_span(span) diff --git a/tests/lib/core/__init__.py b/tests/lib/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/tracing/__init__.py b/tests/lib/core/tracing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/tracing/processors/__init__.py b/tests/lib/core/tracing/processors/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py new file mode 100644 index 000000000..793bf2495 --- /dev/null +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +import uuid +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock, patch + +from agentex.types.span import Span +from agentex.lib.types.tracing import SGPTracingProcessorConfig + +MODULE = "agentex.lib.core.tracing.processors.sgp_tracing_processor" + + +def _make_config() -> SGPTracingProcessorConfig: + return SGPTracingProcessorConfig( + sgp_api_key="test-key", + sgp_account_id="test-account", + ) + + +def _make_span(span_id: str | None = None) -> Span: + return Span( + id=span_id or str(uuid.uuid4()), + name="test-span", + start_time=datetime.now(UTC), + trace_id="trace-1", + ) + + +def _make_mock_sgp_span() -> MagicMock: + sgp_span = MagicMock() + sgp_span.to_request_params.return_value = {"mock": "params"} + sgp_span.start_time = None + sgp_span.end_time = None + sgp_span.output = None + sgp_span.metadata = None + return sgp_span + + +# --------------------------------------------------------------------------- +# Sync processor tests +# --------------------------------------------------------------------------- + + +class TestSGPSyncTracingProcessorMemoryLeak: + @patch(f"{MODULE}.EnvironmentVariables") + @patch(f"{MODULE}.flush_queue") + @patch(f"{MODULE}.create_span") + @patch(f"{MODULE}.SGPClient") + @patch(f"{MODULE}.tracing") + def _make_processor(self, mock_tracing, mock_sgp_client, mock_create_span, mock_flush, mock_env): + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + mock_create_span.side_effect = lambda **kwargs: _make_mock_sgp_span() + + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPSyncTracingProcessor, + ) + + return SGPSyncTracingProcessor(_make_config()), mock_create_span + + def test_spans_not_leaked_after_completed_lifecycle(self): + processor, _ = self._make_processor() + + for _ in range(100): + span = _make_span() + processor.on_span_start(span) + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + + assert len(processor._spans) == 0, ( + f"Expected 0 spans after 100 complete lifecycles, got {len(processor._spans)} — memory leak!" + ) + + def test_spans_present_during_active_lifecycle(self): + processor, _ = self._make_processor() + + span = _make_span() + processor.on_span_start(span) + assert len(processor._spans) == 1, "Span should be tracked while active" + + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + assert len(processor._spans) == 0, "Span should be removed after end" + + def test_span_end_for_unknown_span_is_noop(self): + processor, _ = self._make_processor() + + span = _make_span() + # End a span that was never started — should not raise + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + + assert len(processor._spans) == 0 + + +# --------------------------------------------------------------------------- +# Async processor tests +# --------------------------------------------------------------------------- + + +class TestSGPAsyncTracingProcessorMemoryLeak: + @staticmethod + def _make_processor(): + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span()) + + mock_async_client = MagicMock() + mock_async_client.spans.upsert_batch = AsyncMock() + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), \ + patch(f"{MODULE}.create_span", mock_create_span), \ + patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + # Wire up the mock client after construction (constructor stores it) + processor.sgp_async_client = mock_async_client + + # Keep create_span mock active for on_span_start calls + return processor, mock_create_span + + async def test_spans_not_leaked_after_completed_lifecycle(self): + processor, _ = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + for _ in range(100): + span = _make_span() + await processor.on_span_start(span) + span.end_time = datetime.now(UTC) + await processor.on_span_end(span) + + assert len(processor._spans) == 0, ( + f"Expected 0 spans after 100 complete lifecycles, got {len(processor._spans)} — memory leak!" + ) + + async def test_spans_present_during_active_lifecycle(self): + processor, _ = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + span = _make_span() + await processor.on_span_start(span) + assert len(processor._spans) == 1, "Span should be tracked while active" + + span.end_time = datetime.now(UTC) + await processor.on_span_end(span) + assert len(processor._spans) == 0, "Span should be removed after end" + + async def test_span_end_for_unknown_span_is_noop(self): + processor, _ = self._make_processor() + + span = _make_span() + span.end_time = datetime.now(UTC) + await processor.on_span_end(span) + + assert len(processor._spans) == 0 From 26402d0b085100574b7cde26d412220ed6c3c63d Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 2 Apr 2026 17:19:53 -0700 Subject: [PATCH 2/5] feat(tracing): Add background queue for async span processing AsyncTrace.start_span() and end_span() previously awaited processor HTTP calls inline, adding 8 blocking round-trips per request. This moves processor calls into a background FIFO queue so callers enqueue and return immediately. - Add AsyncSpanQueue with sequential drain loop, error logging, and graceful shutdown with configurable timeout - Wire shutdown into FastAPI lifespan teardown in base_acp_server - FIFO ordering preserves start-before-end invariant required by SGPAsyncTracingProcessor's internal _spans dict --- src/agentex/lib/core/tracing/__init__.py | 16 +- src/agentex/lib/core/tracing/span_queue.py | 111 ++++++++++ src/agentex/lib/core/tracing/trace.py | 17 +- src/agentex/lib/core/tracing/tracer.py | 5 +- .../lib/sdk/fastacp/base/base_acp_server.py | 6 +- tests/lib/core/tracing/test_span_queue.py | 197 ++++++++++++++++++ 6 files changed, 342 insertions(+), 10 deletions(-) create mode 100644 src/agentex/lib/core/tracing/span_queue.py create mode 100644 tests/lib/core/tracing/test_span_queue.py diff --git a/src/agentex/lib/core/tracing/__init__.py b/src/agentex/lib/core/tracing/__init__.py index 9f91f9cec..639f3ba8e 100644 --- a/src/agentex/lib/core/tracing/__init__.py +++ b/src/agentex/lib/core/tracing/__init__.py @@ -1,5 +1,19 @@ from agentex.types.span import Span from agentex.lib.core.tracing.trace import Trace, AsyncTrace from agentex.lib.core.tracing.tracer import Tracer, AsyncTracer +from agentex.lib.core.tracing.span_queue import ( + AsyncSpanQueue, + get_default_span_queue, + shutdown_default_span_queue, +) -__all__ = ["Trace", "AsyncTrace", "Span", "Tracer", "AsyncTracer"] +__all__ = [ + "Trace", + "AsyncTrace", + "Span", + "Tracer", + "AsyncTracer", + "AsyncSpanQueue", + "get_default_span_queue", + "shutdown_default_span_queue", +] diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py new file mode 100644 index 000000000..e881cc1da --- /dev/null +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import asyncio +from enum import Enum +from dataclasses import dataclass + +from agentex.types.span import Span +from agentex.lib.utils.logging import make_logger +from agentex.lib.core.tracing.processors.tracing_processor_interface import ( + AsyncTracingProcessor, +) + +logger = make_logger(__name__) + + +class SpanEventType(str, Enum): + START = "start" + END = "end" + + +@dataclass +class _SpanQueueItem: + event_type: SpanEventType + span: Span + processors: list[AsyncTracingProcessor] + + +class AsyncSpanQueue: + """Background FIFO queue for async span processing. + + Span events are enqueued synchronously (non-blocking) and processed + sequentially by a background drain task. This keeps tracing HTTP calls + off the critical request path while preserving start-before-end ordering. + """ + + def __init__(self) -> None: + self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue() + self._drain_task: asyncio.Task[None] | None = None + self._stopping = False + + def enqueue( + self, + event_type: SpanEventType, + span: Span, + processors: list[AsyncTracingProcessor], + ) -> None: + if self._stopping: + logger.warning("Span queue is shutting down, dropping %s event for span %s", event_type.value, span.id) + return + self._ensure_drain_running() + self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors)) + + def _ensure_drain_running(self) -> None: + if self._drain_task is None or self._drain_task.done(): + self._drain_task = asyncio.create_task(self._drain_loop()) + + async def _drain_loop(self) -> None: + while True: + item = await self._queue.get() + try: + if item.event_type == SpanEventType.START: + coros = [p.on_span_start(item.span) for p in item.processors] + else: + coros = [p.on_span_end(item.span) for p in item.processors] + results = await asyncio.gather(*coros, return_exceptions=True) + for result in results: + if isinstance(result, Exception): + logger.error( + "Tracing processor error during %s for span %s", + item.event_type.value, + item.span.id, + exc_info=result, + ) + except Exception: + logger.exception("Unexpected error in span queue drain loop for span %s", item.span.id) + finally: + self._queue.task_done() + + async def shutdown(self, timeout: float = 30.0) -> None: + self._stopping = True + if self._queue.empty() and (self._drain_task is None or self._drain_task.done()): + return + try: + await asyncio.wait_for(self._queue.join(), timeout=timeout) + except asyncio.TimeoutError: + logger.warning( + "Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize() + ) + if self._drain_task is not None and not self._drain_task.done(): + self._drain_task.cancel() + try: + await self._drain_task + except asyncio.CancelledError: + pass + + +_default_span_queue: AsyncSpanQueue | None = None + + +def get_default_span_queue() -> AsyncSpanQueue: + global _default_span_queue + if _default_span_queue is None: + _default_span_queue = AsyncSpanQueue() + return _default_span_queue + + +async def shutdown_default_span_queue(timeout: float = 30.0) -> None: + global _default_span_queue + if _default_span_queue is not None: + await _default_span_queue.shutdown(timeout=timeout) + _default_span_queue = None diff --git a/src/agentex/lib/core/tracing/trace.py b/src/agentex/lib/core/tracing/trace.py index 2ba1d489e..5a758ed40 100644 --- a/src/agentex/lib/core/tracing/trace.py +++ b/src/agentex/lib/core/tracing/trace.py @@ -1,7 +1,6 @@ from __future__ import annotations import uuid -import asyncio from typing import Any, AsyncGenerator from datetime import UTC, datetime from contextlib import contextmanager, asynccontextmanager @@ -12,6 +11,11 @@ from agentex.types.span import Span from agentex.lib.utils.logging import make_logger from agentex.lib.utils.model_utils import recursive_model_dump +from agentex.lib.core.tracing.span_queue import ( + SpanEventType, + AsyncSpanQueue, + get_default_span_queue, +) from agentex.lib.core.tracing.processors.tracing_processor_interface import ( SyncTracingProcessor, AsyncTracingProcessor, @@ -173,6 +177,7 @@ def __init__( processors: list[AsyncTracingProcessor], client: AsyncAgentex, trace_id: str | None = None, + span_queue: AsyncSpanQueue | None = None, ): """ Initialize a new trace with the specified trace ID. @@ -180,10 +185,12 @@ def __init__( Args: trace_id: Required trace ID to use for this trace. processors: Optional list of tracing processors to use for this trace. + span_queue: Optional span queue for background processing. """ self.processors = processors self.client = client self.trace_id = trace_id + self._span_queue = span_queue or get_default_span_queue() async def start_span( self, @@ -225,9 +232,7 @@ async def start_span( ) if self.processors: - await asyncio.gather( - *[processor.on_span_start(span) for processor in self.processors] - ) + self._span_queue.enqueue(SpanEventType.START, span, self.processors) return span @@ -252,9 +257,7 @@ async def end_span( span.data = recursive_model_dump(span.data) if span.data else None if self.processors: - await asyncio.gather( - *[processor.on_span_end(span) for processor in self.processors] - ) + self._span_queue.enqueue(SpanEventType.END, span, self.processors) return span diff --git a/src/agentex/lib/core/tracing/tracer.py b/src/agentex/lib/core/tracing/tracer.py index da77bec95..3af79977e 100644 --- a/src/agentex/lib/core/tracing/tracer.py +++ b/src/agentex/lib/core/tracing/tracer.py @@ -2,6 +2,7 @@ from agentex import Agentex, AsyncAgentex from agentex.lib.core.tracing.trace import Trace, AsyncTrace +from agentex.lib.core.tracing.span_queue import AsyncSpanQueue from agentex.lib.core.tracing.tracing_processor_manager import ( get_sync_tracing_processors, get_async_tracing_processors, @@ -55,12 +56,13 @@ def __init__(self, client: AsyncAgentex): """ self.client = client - def trace(self, trace_id: str | None = None) -> AsyncTrace: + def trace(self, trace_id: str | None = None, span_queue: AsyncSpanQueue | None = None) -> AsyncTrace: """ Create a new trace with the given trace ID. Args: trace_id: The trace ID to use. + span_queue: Optional span queue for background processing. Returns: A new AsyncTrace instance. @@ -69,4 +71,5 @@ def trace(self, trace_id: str | None = None) -> AsyncTrace: processors=get_async_tracing_processors(), client=self.client, trace_id=trace_id, + span_queue=span_queue, ) diff --git a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py index b625eaa1c..56507ade5 100644 --- a/src/agentex/lib/sdk/fastacp/base/base_acp_server.py +++ b/src/agentex/lib/sdk/fastacp/base/base_acp_server.py @@ -32,6 +32,7 @@ from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables from agentex.types.task_message_update import TaskMessageUpdate, StreamTaskMessageFull from agentex.types.task_message_content import TaskMessageContent +from agentex.lib.core.tracing.span_queue import shutdown_default_span_queue from agentex.lib.sdk.fastacp.base.constants import ( FASTACP_HEADER_SKIP_EXACT, FASTACP_HEADER_SKIP_PREFIXES, @@ -103,7 +104,10 @@ async def lifespan_context(app: FastAPI): # noqa: ARG001 else: logger.warning("AGENTEX_BASE_URL not set, skipping agent registration") - yield + try: + yield + finally: + await shutdown_default_span_queue() return lifespan_context diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py new file mode 100644 index 000000000..c6a558a22 --- /dev/null +++ b/tests/lib/core/tracing/test_span_queue.py @@ -0,0 +1,197 @@ +from __future__ import annotations + +import time +import uuid +import asyncio +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock, patch + +from agentex.types.span import Span +from agentex.lib.core.tracing.span_queue import SpanEventType, AsyncSpanQueue + + +def _make_span(span_id: str | None = None) -> Span: + return Span( + id=span_id or str(uuid.uuid4()), + name="test-span", + start_time=datetime.now(UTC), + trace_id="trace-1", + ) + + +def _make_processor(**overrides: AsyncMock) -> AsyncMock: + proc = AsyncMock() + proc.on_span_start = overrides.get("on_span_start", AsyncMock()) + proc.on_span_end = overrides.get("on_span_end", AsyncMock()) + return proc + + +class TestAsyncSpanQueueNonBlocking: + async def test_enqueue_does_not_block(self): + started = asyncio.Event() + + async def slow_start(span: Span) -> None: + started.set() + await asyncio.sleep(1.0) + + slow_processor = _make_processor( + on_span_start=AsyncMock(side_effect=slow_start), + ) + queue = AsyncSpanQueue() + span = _make_span() + + start = time.monotonic() + queue.enqueue(SpanEventType.START, span, [slow_processor]) + elapsed = time.monotonic() - start + + assert elapsed < 0.01, f"enqueue took {elapsed:.3f}s — should be instant" + + # Wait for the processor to start (proves it was called) + await asyncio.wait_for(started.wait(), timeout=2.0) + await queue.shutdown() + + +class TestAsyncSpanQueueOrdering: + async def test_fifo_ordering_preserved(self): + call_log: list[tuple[str, str]] = [] + + async def record_start(span: Span) -> None: + call_log.append(("start", span.id)) + + async def record_end(span: Span) -> None: + call_log.append(("end", span.id)) + + proc = _make_processor( + on_span_start=AsyncMock(side_effect=record_start), + on_span_end=AsyncMock(side_effect=record_end), + ) + queue = AsyncSpanQueue() + + span_a = _make_span("span-a") + span_b = _make_span("span-b") + + queue.enqueue(SpanEventType.START, span_a, [proc]) + queue.enqueue(SpanEventType.END, span_a, [proc]) + queue.enqueue(SpanEventType.START, span_b, [proc]) + queue.enqueue(SpanEventType.END, span_b, [proc]) + + await queue.shutdown() + + assert call_log == [ + ("start", "span-a"), + ("end", "span-a"), + ("start", "span-b"), + ("end", "span-b"), + ] + + +class TestAsyncSpanQueueErrorHandling: + async def test_error_in_processor_does_not_stop_drain(self): + call_count = 0 + + async def failing_start(span: Span) -> None: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise RuntimeError("simulated failure") + + proc = _make_processor( + on_span_start=AsyncMock(side_effect=failing_start), + ) + queue = AsyncSpanQueue() + + queue.enqueue(SpanEventType.START, _make_span(), [proc]) + queue.enqueue(SpanEventType.START, _make_span(), [proc]) + + await queue.shutdown() + + assert call_count == 2, "Second event should still be processed after first fails" + + +class TestAsyncSpanQueueShutdown: + async def test_shutdown_drains_remaining_items(self): + processed: list[str] = [] + + async def track(span: Span) -> None: + processed.append(span.id) + + proc = _make_processor(on_span_start=AsyncMock(side_effect=track)) + queue = AsyncSpanQueue() + + for i in range(5): + queue.enqueue(SpanEventType.START, _make_span(f"span-{i}"), [proc]) + + await queue.shutdown() + + assert len(processed) == 5 + + async def test_shutdown_timeout(self): + async def stuck_start(span: Span) -> None: + await asyncio.sleep(60) + + stuck_processor = _make_processor( + on_span_start=AsyncMock(side_effect=stuck_start), + ) + queue = AsyncSpanQueue() + queue.enqueue(SpanEventType.START, _make_span(), [stuck_processor]) + + # Give the drain loop a moment to pick up the item + await asyncio.sleep(0.05) + + start = time.monotonic() + await queue.shutdown(timeout=0.1) + elapsed = time.monotonic() - start + + assert elapsed < 1.0, f"shutdown should not hang — took {elapsed:.1f}s" + + async def test_enqueue_after_shutdown_is_dropped(self): + proc = _make_processor() + queue = AsyncSpanQueue() + await queue.shutdown() + + queue.enqueue(SpanEventType.START, _make_span(), [proc]) + + proc.on_span_start.assert_not_called() + + +class TestAsyncSpanQueueIntegration: + async def test_integration_with_async_trace(self): + call_log: list[tuple[str, str]] = [] + + async def record_start(span: Span) -> None: + call_log.append(("start", span.id)) + + async def record_end(span: Span) -> None: + call_log.append(("end", span.id)) + + proc = _make_processor( + on_span_start=AsyncMock(side_effect=record_start), + on_span_end=AsyncMock(side_effect=record_end), + ) + queue = AsyncSpanQueue() + + # Patch get_async_tracing_processors to return our mock + with patch( + "agentex.lib.core.tracing.trace.get_default_span_queue", + return_value=queue, + ): + from agentex.lib.core.tracing.trace import AsyncTrace + + mock_client = MagicMock() + trace = AsyncTrace( + processors=[proc], + client=mock_client, + trace_id="test-trace", + span_queue=queue, + ) + + async with trace.span("test-operation") as span: + span.output = {"result": "ok"} + + await queue.shutdown() + + assert len(call_log) == 2 + assert call_log[0][0] == "start" + assert call_log[1][0] == "end" + # Same span ID for both events + assert call_log[0][1] == call_log[1][1] From 67ed1567b23af084298dfe673ee809370dd673fe Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 2 Apr 2026 17:40:38 -0700 Subject: [PATCH 3/5] fix(tests): Use context manager patches for sync processor tests @patch decorators on _make_processor expired before test bodies ran, so on_span_start/on_span_end hit the real create_span and flush. Refactored to @staticmethod with 'with patch(...)' context managers matching the async test class pattern. --- .../processors/test_sgp_tracing_processor.py | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 793bf2495..1acafa527 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -42,29 +42,34 @@ def _make_mock_sgp_span() -> MagicMock: class TestSGPSyncTracingProcessorMemoryLeak: - @patch(f"{MODULE}.EnvironmentVariables") - @patch(f"{MODULE}.flush_queue") - @patch(f"{MODULE}.create_span") - @patch(f"{MODULE}.SGPClient") - @patch(f"{MODULE}.tracing") - def _make_processor(self, mock_tracing, mock_sgp_client, mock_create_span, mock_flush, mock_env): + @staticmethod + def _make_processor(): + mock_env = MagicMock() mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) - mock_create_span.side_effect = lambda **kwargs: _make_mock_sgp_span() + mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span()) - from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( - SGPSyncTracingProcessor, - ) + with patch(f"{MODULE}.EnvironmentVariables", mock_env), \ + patch(f"{MODULE}.SGPClient"), \ + patch(f"{MODULE}.tracing"), \ + patch(f"{MODULE}.flush_queue"), \ + patch(f"{MODULE}.create_span", mock_create_span): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPSyncTracingProcessor, + ) + + processor = SGPSyncTracingProcessor(_make_config()) - return SGPSyncTracingProcessor(_make_config()), mock_create_span + return processor, mock_create_span def test_spans_not_leaked_after_completed_lifecycle(self): processor, _ = self._make_processor() - for _ in range(100): - span = _make_span() - processor.on_span_start(span) - span.end_time = datetime.now(UTC) - processor.on_span_end(span) + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + for _ in range(100): + span = _make_span() + processor.on_span_start(span) + span.end_time = datetime.now(UTC) + processor.on_span_end(span) assert len(processor._spans) == 0, ( f"Expected 0 spans after 100 complete lifecycles, got {len(processor._spans)} — memory leak!" @@ -73,13 +78,14 @@ def test_spans_not_leaked_after_completed_lifecycle(self): def test_spans_present_during_active_lifecycle(self): processor, _ = self._make_processor() - span = _make_span() - processor.on_span_start(span) - assert len(processor._spans) == 1, "Span should be tracked while active" + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + span = _make_span() + processor.on_span_start(span) + assert len(processor._spans) == 1, "Span should be tracked while active" - span.end_time = datetime.now(UTC) - processor.on_span_end(span) - assert len(processor._spans) == 0, "Span should be removed after end" + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + assert len(processor._spans) == 0, "Span should be removed after end" def test_span_end_for_unknown_span_is_noop(self): processor, _ = self._make_processor() From 84b3bca1b2e9c3b231da629832a7b832d43e2f0c Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 2 Apr 2026 18:01:12 -0700 Subject: [PATCH 4/5] fix(tests): Satisfy pyright type check for span output assignment Use explicit dict[str, object] annotation to avoid invariance error when assigning to Span.output (Dict[str, object] | ... | None). --- tests/lib/core/tracing/test_span_queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py index c6a558a22..1f39fb25d 100644 --- a/tests/lib/core/tracing/test_span_queue.py +++ b/tests/lib/core/tracing/test_span_queue.py @@ -186,7 +186,8 @@ async def record_end(span: Span) -> None: ) async with trace.span("test-operation") as span: - span.output = {"result": "ok"} + output: dict[str, object] = {"result": "ok"} + span.output = output await queue.shutdown() From 1a4f0b779340271789222611d035bb1edb182146 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 2 Apr 2026 18:35:03 -0700 Subject: [PATCH 5/5] fix(tracing): Deep-copy spans before enqueuing to background queue Processors like SGPAsyncTracingProcessor mutate span.data in-place via _add_source_to_span. With async background processing, this raced with the caller who holds a reference to the same span object. Deep-copying via model_copy(deep=True) decouples the two. --- src/agentex/lib/core/tracing/trace.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/agentex/lib/core/tracing/trace.py b/src/agentex/lib/core/tracing/trace.py index 5a758ed40..7925df7fc 100644 --- a/src/agentex/lib/core/tracing/trace.py +++ b/src/agentex/lib/core/tracing/trace.py @@ -232,7 +232,7 @@ async def start_span( ) if self.processors: - self._span_queue.enqueue(SpanEventType.START, span, self.processors) + self._span_queue.enqueue(SpanEventType.START, span.model_copy(deep=True), self.processors) return span @@ -257,7 +257,7 @@ async def end_span( span.data = recursive_model_dump(span.data) if span.data else None if self.processors: - self._span_queue.enqueue(SpanEventType.END, span, self.processors) + self._span_queue.enqueue(SpanEventType.END, span.model_copy(deep=True), self.processors) return span