From 97bcaf49d8c109b3b84225023376c90d5d3b27d5 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 2 Apr 2026 11:57:06 -0700 Subject: [PATCH 1/2] fix(tracing): Fix memory leak in SGP tracing processors SGPSyncTracingProcessor and SGPAsyncTracingProcessor accumulated spans in self._spans dict on every request but never removed them, since on_span_end() used dict.get() (read-only) instead of dict.pop() (read-and-remove). The only cleanup was in shutdown() which is never called. After this fix, spans are removed from the dict when they complete, preventing unbounded memory growth. --- .../processors/sgp_tracing_processor.py | 14 +- tests/lib/core/__init__.py | 0 tests/lib/core/tracing/__init__.py | 0 tests/lib/core/tracing/processors/__init__.py | 0 .../processors/test_sgp_tracing_processor.py | 158 ++++++++++++++++++ 5 files changed, 163 insertions(+), 9 deletions(-) create mode 100644 tests/lib/core/__init__.py create mode 100644 tests/lib/core/tracing/__init__.py create mode 100644 tests/lib/core/tracing/processors/__init__.py create mode 100644 tests/lib/core/tracing/processors/test_sgp_tracing_processor.py diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index b0c2a213a..2f94e7f87 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -30,7 +30,7 @@ def __init__(self, config: SGPTracingProcessorConfig): disabled = config.sgp_api_key == "" or config.sgp_account_id == "" tracing.init( SGPClient( - api_key=config.sgp_api_key, + api_key=config.sgp_api_key, account_id=config.sgp_account_id, base_url=config.sgp_base_url, ), @@ -72,11 +72,9 @@ def on_span_start(self, span: Span) -> None: @override def on_span_end(self, span: Span) -> None: - sgp_span = self._spans.get(span.id) + sgp_span = self._spans.pop(span.id, None) if sgp_span is None: - logger.warning( - f"Span {span.id} not found in stored spans, skipping span end" - ) + logger.warning(f"Span {span.id} not found in stored spans, skipping span end") return self._add_source_to_span(span) @@ -151,11 +149,9 @@ async def on_span_start(self, span: Span) -> None: @override async def on_span_end(self, span: Span) -> None: - sgp_span = self._spans.get(span.id) + sgp_span = self._spans.pop(span.id, None) if sgp_span is None: - logger.warning( - f"Span {span.id} not found in stored spans, skipping span end" - ) + logger.warning(f"Span {span.id} not found in stored spans, skipping span end") return self._add_source_to_span(span) diff --git a/tests/lib/core/__init__.py b/tests/lib/core/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/tracing/__init__.py b/tests/lib/core/tracing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/tracing/processors/__init__.py b/tests/lib/core/tracing/processors/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py new file mode 100644 index 000000000..793bf2495 --- /dev/null +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +import uuid +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock, patch + +from agentex.types.span import Span +from agentex.lib.types.tracing import SGPTracingProcessorConfig + +MODULE = "agentex.lib.core.tracing.processors.sgp_tracing_processor" + + +def _make_config() -> SGPTracingProcessorConfig: + return SGPTracingProcessorConfig( + sgp_api_key="test-key", + sgp_account_id="test-account", + ) + + +def _make_span(span_id: str | None = None) -> Span: + return Span( + id=span_id or str(uuid.uuid4()), + name="test-span", + start_time=datetime.now(UTC), + trace_id="trace-1", + ) + + +def _make_mock_sgp_span() -> MagicMock: + sgp_span = MagicMock() + sgp_span.to_request_params.return_value = {"mock": "params"} + sgp_span.start_time = None + sgp_span.end_time = None + sgp_span.output = None + sgp_span.metadata = None + return sgp_span + + +# --------------------------------------------------------------------------- +# Sync processor tests +# --------------------------------------------------------------------------- + + +class TestSGPSyncTracingProcessorMemoryLeak: + @patch(f"{MODULE}.EnvironmentVariables") + @patch(f"{MODULE}.flush_queue") + @patch(f"{MODULE}.create_span") + @patch(f"{MODULE}.SGPClient") + @patch(f"{MODULE}.tracing") + def _make_processor(self, mock_tracing, mock_sgp_client, mock_create_span, mock_flush, mock_env): + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + mock_create_span.side_effect = lambda **kwargs: _make_mock_sgp_span() + + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPSyncTracingProcessor, + ) + + return SGPSyncTracingProcessor(_make_config()), mock_create_span + + def test_spans_not_leaked_after_completed_lifecycle(self): + processor, _ = self._make_processor() + + for _ in range(100): + span = _make_span() + processor.on_span_start(span) + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + + assert len(processor._spans) == 0, ( + f"Expected 0 spans after 100 complete lifecycles, got {len(processor._spans)} — memory leak!" + ) + + def test_spans_present_during_active_lifecycle(self): + processor, _ = self._make_processor() + + span = _make_span() + processor.on_span_start(span) + assert len(processor._spans) == 1, "Span should be tracked while active" + + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + assert len(processor._spans) == 0, "Span should be removed after end" + + def test_span_end_for_unknown_span_is_noop(self): + processor, _ = self._make_processor() + + span = _make_span() + # End a span that was never started — should not raise + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + + assert len(processor._spans) == 0 + + +# --------------------------------------------------------------------------- +# Async processor tests +# --------------------------------------------------------------------------- + + +class TestSGPAsyncTracingProcessorMemoryLeak: + @staticmethod + def _make_processor(): + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span()) + + mock_async_client = MagicMock() + mock_async_client.spans.upsert_batch = AsyncMock() + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), \ + patch(f"{MODULE}.create_span", mock_create_span), \ + patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + # Wire up the mock client after construction (constructor stores it) + processor.sgp_async_client = mock_async_client + + # Keep create_span mock active for on_span_start calls + return processor, mock_create_span + + async def test_spans_not_leaked_after_completed_lifecycle(self): + processor, _ = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + for _ in range(100): + span = _make_span() + await processor.on_span_start(span) + span.end_time = datetime.now(UTC) + await processor.on_span_end(span) + + assert len(processor._spans) == 0, ( + f"Expected 0 spans after 100 complete lifecycles, got {len(processor._spans)} — memory leak!" + ) + + async def test_spans_present_during_active_lifecycle(self): + processor, _ = self._make_processor() + + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + span = _make_span() + await processor.on_span_start(span) + assert len(processor._spans) == 1, "Span should be tracked while active" + + span.end_time = datetime.now(UTC) + await processor.on_span_end(span) + assert len(processor._spans) == 0, "Span should be removed after end" + + async def test_span_end_for_unknown_span_is_noop(self): + processor, _ = self._make_processor() + + span = _make_span() + span.end_time = datetime.now(UTC) + await processor.on_span_end(span) + + assert len(processor._spans) == 0 From d74b8ce7ae84ed4f8bbe37622153c7e3722cb4b4 Mon Sep 17 00:00:00 2001 From: Stas Moreinis Date: Thu, 2 Apr 2026 17:40:38 -0700 Subject: [PATCH 2/2] fix(tests): Use context manager patches for sync processor tests @patch decorators on _make_processor expired before test bodies ran, so on_span_start/on_span_end hit the real create_span and flush. Refactored to @staticmethod with 'with patch(...)' context managers matching the async test class pattern. --- .../processors/test_sgp_tracing_processor.py | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 793bf2495..1acafa527 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -42,29 +42,34 @@ def _make_mock_sgp_span() -> MagicMock: class TestSGPSyncTracingProcessorMemoryLeak: - @patch(f"{MODULE}.EnvironmentVariables") - @patch(f"{MODULE}.flush_queue") - @patch(f"{MODULE}.create_span") - @patch(f"{MODULE}.SGPClient") - @patch(f"{MODULE}.tracing") - def _make_processor(self, mock_tracing, mock_sgp_client, mock_create_span, mock_flush, mock_env): + @staticmethod + def _make_processor(): + mock_env = MagicMock() mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) - mock_create_span.side_effect = lambda **kwargs: _make_mock_sgp_span() + mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span()) - from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( - SGPSyncTracingProcessor, - ) + with patch(f"{MODULE}.EnvironmentVariables", mock_env), \ + patch(f"{MODULE}.SGPClient"), \ + patch(f"{MODULE}.tracing"), \ + patch(f"{MODULE}.flush_queue"), \ + patch(f"{MODULE}.create_span", mock_create_span): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPSyncTracingProcessor, + ) + + processor = SGPSyncTracingProcessor(_make_config()) - return SGPSyncTracingProcessor(_make_config()), mock_create_span + return processor, mock_create_span def test_spans_not_leaked_after_completed_lifecycle(self): processor, _ = self._make_processor() - for _ in range(100): - span = _make_span() - processor.on_span_start(span) - span.end_time = datetime.now(UTC) - processor.on_span_end(span) + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + for _ in range(100): + span = _make_span() + processor.on_span_start(span) + span.end_time = datetime.now(UTC) + processor.on_span_end(span) assert len(processor._spans) == 0, ( f"Expected 0 spans after 100 complete lifecycles, got {len(processor._spans)} — memory leak!" @@ -73,13 +78,14 @@ def test_spans_not_leaked_after_completed_lifecycle(self): def test_spans_present_during_active_lifecycle(self): processor, _ = self._make_processor() - span = _make_span() - processor.on_span_start(span) - assert len(processor._spans) == 1, "Span should be tracked while active" + with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): + span = _make_span() + processor.on_span_start(span) + assert len(processor._spans) == 1, "Span should be tracked while active" - span.end_time = datetime.now(UTC) - processor.on_span_end(span) - assert len(processor._spans) == 0, "Span should be removed after end" + span.end_time = datetime.now(UTC) + processor.on_span_end(span) + assert len(processor._spans) == 0, "Span should be removed after end" def test_span_end_for_unknown_span_is_noop(self): processor, _ = self._make_processor()