From ee766cb1a20554416ee0f3fa98ef4c0a8de2fb28 Mon Sep 17 00:00:00 2001 From: axelray-dev <110029405+axelray-dev@users.noreply.github.com> Date: Sun, 21 Jun 2026 18:00:26 +0800 Subject: [PATCH] fix(pika): skip already-decorated callbacks in _instrument_channel_consumers Addresses #4667: multiple basic_consume() calls on a single channel cause duplicate nested CONSUMER spans because callbacks get re-wrapped without checking if already decorated. Assisted-by: Codex --- .../instrumentation/pika/pika_instrumentor.py | 2 + .../tests/test_pika_instrumentation.py | 51 ++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index a83696a46d..24be559779 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -58,6 +58,8 @@ def _instrument_channel_consumers( consumer_callback = getattr(consumer_info, callback_attr, None) if consumer_callback is None: continue + if hasattr(consumer_callback, "_original_callback"): + continue decorated_callback = utils._decorate_callback( consumer_callback, tracer, diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 2219ac9dd5..93f54baebe 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -28,11 +28,14 @@ def setUp(self) -> None: self.channel = mock.MagicMock(spec=Channel) consumer_info = mock.MagicMock() callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR - setattr(consumer_info, callback_attr, mock.MagicMock()) + setattr(consumer_info, callback_attr, self._consumer_callback) self.blocking_channel._consumer_infos = {"consumer-tag": consumer_info} self.channel._consumers = {"consumer-tag": consumer_info} self.mock_callback = mock.MagicMock() + def _consumer_callback(self) -> None: + pass + def test_instrument_api(self) -> None: instrumentation = PikaInstrumentor() instrumentation.instrument() @@ -155,6 +158,52 @@ def test_instrument_consumers_on_channel( for callback in self.channel._consumers.values() ) + @mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback") + def test_instrument_consumers_skips_already_decorated_callbacks( + self, decorate_callback: mock.MagicMock + ) -> None: + def consumer_callback_1() -> None: + pass + + def consumer_callback_2() -> None: + pass + + def decorate_callback_side_effect( + callback, tracer, consumer_tag, consume_hook + ): + def decorated_callback() -> None: + pass + + return decorated_callback + + tracer = mock.MagicMock(spec=Tracer) + callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR + consumer_info_1 = mock.Mock() + consumer_info_2 = mock.Mock() + setattr(consumer_info_1, callback_attr, consumer_callback_1) + setattr(consumer_info_2, callback_attr, consumer_callback_2) + channel = mock.MagicMock(spec=BlockingChannel) + channel._consumer_infos = { + "consumer-tag-1": consumer_info_1, + "consumer-tag-2": consumer_info_2, + } + decorate_callback.side_effect = decorate_callback_side_effect + + PikaInstrumentor._instrument_channel_consumers(channel, tracer) + wrapped_callback_1 = getattr(consumer_info_1, callback_attr) + wrapped_callback_2 = getattr(consumer_info_2, callback_attr) + PikaInstrumentor._instrument_channel_consumers(channel, tracer) + + self.assertEqual(decorate_callback.call_count, 2) + self.assertIs(getattr(consumer_info_1, callback_attr), wrapped_callback_1) + self.assertIs(getattr(consumer_info_2, callback_attr), wrapped_callback_2) + self.assertIs( + wrapped_callback_1._original_callback, consumer_callback_1 + ) + self.assertIs( + wrapped_callback_2._original_callback, consumer_callback_2 + ) + @mock.patch( "opentelemetry.instrumentation.pika.utils._decorate_basic_publish" )