Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/4728.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-instrumentation-redis`: extract `ClusterPipeline` commands from `_execution_strategy` in redis-py 6+ so pipeline span attributes include the queued commands instead of being empty
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,18 @@ def _build_span_meta_data_for_pipeline(
instance: PipelineInstance | AsyncPipelineInstance,
) -> tuple[list[Any], str, str]:
try:
command_stack = (
instance.command_stack
if hasattr(instance, "command_stack")
else instance._command_stack
)
# redis-py 6+ ClusterPipeline no longer updates ``command_stack``;
# queued commands are tracked on ``_execution_strategy.command_queue``.
# Fall back to ``command_stack`` / ``_command_stack`` for non-cluster
# pipelines and older redis-py versions.
if hasattr(instance, "_execution_strategy") and hasattr(
instance._execution_strategy, "command_queue"
):
command_stack = instance._execution_strategy.command_queue
elif hasattr(instance, "command_stack"):
command_stack = instance.command_stack
else:
command_stack = instance._command_stack

cmds = [
_format_command_args(c.args if hasattr(c, "args") else c[0])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
_OpenTelemetrySemanticConventionStability,
)
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.redis.util import (
_build_span_meta_data_for_pipeline,
)
from opentelemetry.instrumentation.utils import suppress_instrumentation
from opentelemetry.semconv._incubating.attributes.db_attributes import (
DB_REDIS_DATABASE_INDEX,
Expand Down Expand Up @@ -1613,3 +1616,67 @@ def test_schema_url_combined_mode(self):
call_args[1]["schema_url"],
"https://opentelemetry.io/schemas/1.25.0",
)


class _FakeCommand:
def __init__(self, *args):
self.args = args


class _FakeExecutionStrategy:
def __init__(self, commands):
self.command_queue = commands


class _FakeClusterPipeline:
"""Mimics redis-py 6+ ClusterPipeline: queued commands live on
``_execution_strategy.command_queue`` while ``command_stack`` stays empty."""

def __init__(self, commands):
self.command_stack = []
self._execution_strategy = _FakeExecutionStrategy(commands)


class _FakeLegacyPipeline:
def __init__(self, commands):
self.command_stack = commands


class TestBuildSpanMetaDataForPipeline(TestBase):
def test_cluster_pipeline_reads_execution_strategy(self):
# Regression test for issue #4084: redis-py 6+ ClusterPipeline no
# longer populates command_stack, so commands must be read from
# _execution_strategy.command_queue.
commands = [_FakeCommand("SET", "k1", "v1"), _FakeCommand("GET", "k1")]
instance = _FakeClusterPipeline(commands)

command_stack, resource, span_name = (
_build_span_meta_data_for_pipeline(instance)
)

self.assertEqual(len(command_stack), 2)
self.assertEqual(resource, "SET ? ?\nGET ?")
self.assertEqual(span_name, "SET GET")

def test_legacy_pipeline_still_reads_command_stack(self):
commands = [_FakeCommand("SET", "k1", "v1")]
instance = _FakeLegacyPipeline(commands)

command_stack, resource, span_name = (
_build_span_meta_data_for_pipeline(instance)
)

self.assertEqual(len(command_stack), 1)
self.assertEqual(resource, "SET ? ?")
self.assertEqual(span_name, "SET")

def test_empty_cluster_pipeline_falls_back_to_redis_span_name(self):
instance = _FakeClusterPipeline([])

command_stack, resource, span_name = (
_build_span_meta_data_for_pipeline(instance)
)

self.assertEqual(command_stack, [])
self.assertEqual(resource, "")
self.assertEqual(span_name, "redis")