From 3aafc98275b2844f04e8078cf5531b4b1f6501b2 Mon Sep 17 00:00:00 2001 From: UTKARSH698 Date: Mon, 22 Jun 2026 11:01:22 +0530 Subject: [PATCH 1/2] fix(redis): build ClusterPipeline span metadata from _execution_strategy in redis-py 6+ redis-py 6 refactored ClusterPipeline so that queued commands are no longer appended to `command_stack`; they now live on `_execution_strategy.command_queue`. As a result the redis instrumentor emitted ClusterPipeline spans with an empty DB_STATEMENT and a `db.redis.pipeline_length` of 0. `_build_span_meta_data_for_pipeline` now reads commands from `_execution_strategy.command_queue` when present, falling back to `command_stack` / `_command_stack` for non-cluster pipelines and older redis-py versions. Fixes #4084 --- .changelog/4084.fixed | 1 + .../instrumentation/redis/util.py | 17 +++-- .../tests/test_redis.py | 67 +++++++++++++++++++ 3 files changed, 80 insertions(+), 5 deletions(-) create mode 100644 .changelog/4084.fixed diff --git a/.changelog/4084.fixed b/.changelog/4084.fixed new file mode 100644 index 0000000000..a098e69312 --- /dev/null +++ b/.changelog/4084.fixed @@ -0,0 +1 @@ +`opentelemetry-instrumentation-redis`: extract `ClusterPipeline` commands from `_execution_strategy` in redis-py 6+ so pipeline span attributes include the queued commands instead of being empty diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py index 1eae0e624d..388a495fec 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py @@ -203,11 +203,18 @@ def _build_span_meta_data_for_pipeline( instance: PipelineInstance | AsyncPipelineInstance, ) -> tuple[list[Any], str, str]: try: - command_stack = ( - instance.command_stack - if hasattr(instance, "command_stack") - else instance._command_stack - ) + # redis-py 6+ ClusterPipeline no longer updates ``command_stack``; + # queued commands are tracked on ``_execution_strategy.command_queue``. + # Fall back to ``command_stack`` / ``_command_stack`` for non-cluster + # pipelines and older redis-py versions. + if hasattr(instance, "_execution_strategy") and hasattr( + instance._execution_strategy, "command_queue" + ): + command_stack = instance._execution_strategy.command_queue + elif hasattr(instance, "command_stack"): + command_stack = instance.command_stack + else: + command_stack = instance._command_stack cmds = [ _format_command_args(c.args if hasattr(c, "args") else c[0]) diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py index 5c38fc9db7..a9eec56ed9 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py @@ -22,6 +22,9 @@ _OpenTelemetrySemanticConventionStability, ) from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.instrumentation.redis.util import ( + _build_span_meta_data_for_pipeline, +) from opentelemetry.instrumentation.utils import suppress_instrumentation from opentelemetry.semconv._incubating.attributes.db_attributes import ( DB_REDIS_DATABASE_INDEX, @@ -1613,3 +1616,67 @@ def test_schema_url_combined_mode(self): call_args[1]["schema_url"], "https://opentelemetry.io/schemas/1.25.0", ) + + +class _FakeCommand: + def __init__(self, *args): + self.args = args + + +class _FakeExecutionStrategy: + def __init__(self, commands): + self.command_queue = commands + + +class _FakeClusterPipeline: + """Mimics redis-py 6+ ClusterPipeline: queued commands live on + ``_execution_strategy.command_queue`` while ``command_stack`` stays empty.""" + + def __init__(self, commands): + self.command_stack = [] + self._execution_strategy = _FakeExecutionStrategy(commands) + + +class _FakeLegacyPipeline: + def __init__(self, commands): + self.command_stack = commands + + +class TestBuildSpanMetaDataForPipeline(TestBase): + def test_cluster_pipeline_reads_execution_strategy(self): + # Regression test for issue #4084: redis-py 6+ ClusterPipeline no + # longer populates command_stack, so commands must be read from + # _execution_strategy.command_queue. + commands = [_FakeCommand("SET", "k1", "v1"), _FakeCommand("GET", "k1")] + instance = _FakeClusterPipeline(commands) + + command_stack, resource, span_name = ( + _build_span_meta_data_for_pipeline(instance) + ) + + self.assertEqual(len(command_stack), 2) + self.assertEqual(resource, "SET ? ?\nGET ?") + self.assertEqual(span_name, "SET GET") + + def test_legacy_pipeline_still_reads_command_stack(self): + commands = [_FakeCommand("SET", "k1", "v1")] + instance = _FakeLegacyPipeline(commands) + + command_stack, resource, span_name = ( + _build_span_meta_data_for_pipeline(instance) + ) + + self.assertEqual(len(command_stack), 1) + self.assertEqual(resource, "SET ? ?") + self.assertEqual(span_name, "SET") + + def test_empty_cluster_pipeline_falls_back_to_redis_span_name(self): + instance = _FakeClusterPipeline([]) + + command_stack, resource, span_name = ( + _build_span_meta_data_for_pipeline(instance) + ) + + self.assertEqual(command_stack, []) + self.assertEqual(resource, "") + self.assertEqual(span_name, "redis") From 8c0478d3ed4c68af0c364cb42fe1a38f6f821ae4 Mon Sep 17 00:00:00 2001 From: UTKARSH698 Date: Mon, 22 Jun 2026 11:02:58 +0530 Subject: [PATCH 2/2] chore(redis): rename changelog fragment to PR number --- .changelog/{4084.fixed => 4728.fixed} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename .changelog/{4084.fixed => 4728.fixed} (100%) diff --git a/.changelog/4084.fixed b/.changelog/4728.fixed similarity index 100% rename from .changelog/4084.fixed rename to .changelog/4728.fixed