Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def _instrument_channel_consumers(
consumer_callback = getattr(consumer_info, callback_attr, None)
if consumer_callback is None:
continue
if hasattr(consumer_callback, "_original_callback"):
continue
decorated_callback = utils._decorate_callback(
consumer_callback,
tracer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ def setUp(self) -> None:
self.channel = mock.MagicMock(spec=Channel)
consumer_info = mock.MagicMock()
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
setattr(consumer_info, callback_attr, mock.MagicMock())
setattr(consumer_info, callback_attr, self._consumer_callback)
self.blocking_channel._consumer_infos = {"consumer-tag": consumer_info}
self.channel._consumers = {"consumer-tag": consumer_info}
self.mock_callback = mock.MagicMock()

def _consumer_callback(self) -> None:
pass

def test_instrument_api(self) -> None:
instrumentation = PikaInstrumentor()
instrumentation.instrument()
Expand Down Expand Up @@ -155,6 +158,52 @@ def test_instrument_consumers_on_channel(
for callback in self.channel._consumers.values()
)

@mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback")
def test_instrument_consumers_skips_already_decorated_callbacks(
self, decorate_callback: mock.MagicMock
) -> None:
def consumer_callback_1() -> None:
pass

def consumer_callback_2() -> None:
pass

def decorate_callback_side_effect(
callback, tracer, consumer_tag, consume_hook
):
def decorated_callback() -> None:
pass

return decorated_callback

tracer = mock.MagicMock(spec=Tracer)
callback_attr = PikaInstrumentor.CONSUMER_CALLBACK_ATTR
consumer_info_1 = mock.Mock()
consumer_info_2 = mock.Mock()
setattr(consumer_info_1, callback_attr, consumer_callback_1)
setattr(consumer_info_2, callback_attr, consumer_callback_2)
channel = mock.MagicMock(spec=BlockingChannel)
channel._consumer_infos = {
"consumer-tag-1": consumer_info_1,
"consumer-tag-2": consumer_info_2,
}
decorate_callback.side_effect = decorate_callback_side_effect

PikaInstrumentor._instrument_channel_consumers(channel, tracer)
wrapped_callback_1 = getattr(consumer_info_1, callback_attr)
wrapped_callback_2 = getattr(consumer_info_2, callback_attr)
PikaInstrumentor._instrument_channel_consumers(channel, tracer)

self.assertEqual(decorate_callback.call_count, 2)
self.assertIs(getattr(consumer_info_1, callback_attr), wrapped_callback_1)
self.assertIs(getattr(consumer_info_2, callback_attr), wrapped_callback_2)
self.assertIs(
wrapped_callback_1._original_callback, consumer_callback_1
)
self.assertIs(
wrapped_callback_2._original_callback, consumer_callback_2
)

@mock.patch(
"opentelemetry.instrumentation.pika.utils._decorate_basic_publish"
)
Expand Down
Loading