diff --git a/.changelog/4728.fixed b/.changelog/4728.fixed new file mode 100644 index 0000000000..a098e69312 --- /dev/null +++ b/.changelog/4728.fixed @@ -0,0 +1 @@ +`opentelemetry-instrumentation-redis`: extract `ClusterPipeline` commands from `_execution_strategy` in redis-py 6+ so pipeline span attributes include the queued commands instead of being empty diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py index 1eae0e624d..388a495fec 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/util.py @@ -203,11 +203,18 @@ def _build_span_meta_data_for_pipeline( instance: PipelineInstance | AsyncPipelineInstance, ) -> tuple[list[Any], str, str]: try: - command_stack = ( - instance.command_stack - if hasattr(instance, "command_stack") - else instance._command_stack - ) + # redis-py 6+ ClusterPipeline no longer updates ``command_stack``; + # queued commands are tracked on ``_execution_strategy.command_queue``. + # Fall back to ``command_stack`` / ``_command_stack`` for non-cluster + # pipelines and older redis-py versions. + if hasattr(instance, "_execution_strategy") and hasattr( + instance._execution_strategy, "command_queue" + ): + command_stack = instance._execution_strategy.command_queue + elif hasattr(instance, "command_stack"): + command_stack = instance.command_stack + else: + command_stack = instance._command_stack cmds = [ _format_command_args(c.args if hasattr(c, "args") else c[0]) diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py index 5c38fc9db7..a9eec56ed9 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py @@ -22,6 +22,9 @@ _OpenTelemetrySemanticConventionStability, ) from opentelemetry.instrumentation.redis import RedisInstrumentor +from opentelemetry.instrumentation.redis.util import ( + _build_span_meta_data_for_pipeline, +) from opentelemetry.instrumentation.utils import suppress_instrumentation from opentelemetry.semconv._incubating.attributes.db_attributes import ( DB_REDIS_DATABASE_INDEX, @@ -1613,3 +1616,67 @@ def test_schema_url_combined_mode(self): call_args[1]["schema_url"], "https://opentelemetry.io/schemas/1.25.0", ) + + +class _FakeCommand: + def __init__(self, *args): + self.args = args + + +class _FakeExecutionStrategy: + def __init__(self, commands): + self.command_queue = commands + + +class _FakeClusterPipeline: + """Mimics redis-py 6+ ClusterPipeline: queued commands live on + ``_execution_strategy.command_queue`` while ``command_stack`` stays empty.""" + + def __init__(self, commands): + self.command_stack = [] + self._execution_strategy = _FakeExecutionStrategy(commands) + + +class _FakeLegacyPipeline: + def __init__(self, commands): + self.command_stack = commands + + +class TestBuildSpanMetaDataForPipeline(TestBase): + def test_cluster_pipeline_reads_execution_strategy(self): + # Regression test for issue #4084: redis-py 6+ ClusterPipeline no + # longer populates command_stack, so commands must be read from + # _execution_strategy.command_queue. + commands = [_FakeCommand("SET", "k1", "v1"), _FakeCommand("GET", "k1")] + instance = _FakeClusterPipeline(commands) + + command_stack, resource, span_name = ( + _build_span_meta_data_for_pipeline(instance) + ) + + self.assertEqual(len(command_stack), 2) + self.assertEqual(resource, "SET ? ?\nGET ?") + self.assertEqual(span_name, "SET GET") + + def test_legacy_pipeline_still_reads_command_stack(self): + commands = [_FakeCommand("SET", "k1", "v1")] + instance = _FakeLegacyPipeline(commands) + + command_stack, resource, span_name = ( + _build_span_meta_data_for_pipeline(instance) + ) + + self.assertEqual(len(command_stack), 1) + self.assertEqual(resource, "SET ? ?") + self.assertEqual(span_name, "SET") + + def test_empty_cluster_pipeline_falls_back_to_redis_span_name(self): + instance = _FakeClusterPipeline([]) + + command_stack, resource, span_name = ( + _build_span_meta_data_for_pipeline(instance) + ) + + self.assertEqual(command_stack, []) + self.assertEqual(resource, "") + self.assertEqual(span_name, "redis")