Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
!.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-instrumentation-aiokafka`: add `capture_experimental_span_attributes` option to gate `messaging.cluster.id` on producer and consumer spans
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class InstrumentKwargs(TypedDict, total=False):
tracer_provider: trace.TracerProvider
async_produce_hook: ProduceHookT
async_consume_hook: ConsumeHookT
capture_experimental_span_attributes: bool

class UninstrumentKwargs(TypedDict, total=False):
pass
Expand All @@ -147,6 +148,8 @@ def _instrument(self, **kwargs: Unpack[InstrumentKwargs]):
``tracer_provider``: a TracerProvider, defaults to global.
``async_produce_hook``: a callable to be executed just before producing a message
``async_consume_hook``: a callable to be executed just after consuming a message
``capture_experimental_span_attributes``: if True, emit experimental
``messaging.cluster.id`` on producer and consumer spans. Defaults to False.
"""
tracer_provider = kwargs.get("tracer_provider")

Expand All @@ -158,6 +161,10 @@ def _instrument(self, **kwargs: Unpack[InstrumentKwargs]):
if not iscoroutinefunction(async_consume_hook):
async_consume_hook = None

capture_experimental_span_attributes = kwargs.get(
"capture_experimental_span_attributes", False
)

tracer = trace.get_tracer(
__name__,
__version__,
Expand All @@ -168,17 +175,29 @@ def _instrument(self, **kwargs: Unpack[InstrumentKwargs]):
wrap_function_wrapper(
aiokafka.AIOKafkaProducer,
"send",
_wrap_send(tracer, async_produce_hook),
_wrap_send(
tracer,
async_produce_hook,
capture_experimental_span_attributes,
),
)
wrap_function_wrapper(
aiokafka.AIOKafkaConsumer,
"getone",
_wrap_getone(tracer, async_consume_hook),
_wrap_getone(
tracer,
async_consume_hook,
capture_experimental_span_attributes,
),
)
wrap_function_wrapper(
aiokafka.AIOKafkaConsumer,
"getmany",
_wrap_getmany(tracer, async_consume_hook),
_wrap_getmany(
tracer,
async_consume_hook,
capture_experimental_span_attributes,
),
)

def _uninstrument(self, **kwargs: Unpack[UninstrumentKwargs]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ async def __call__(

_LOG = getLogger(__name__)

_MESSAGING_CLUSTER_ID = "messaging.cluster.id"


def _extract_bootstrap_servers(
client: aiokafka.AIOKafkaClient,
Expand All @@ -90,6 +92,24 @@ def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str:
return client._client_id


def _extract_cluster_id_from_client(
client: aiokafka.AIOKafkaClient,
) -> str | None:
"""Read cluster ID from the aiokafka client's cached cluster metadata.

aiokafka sets AIOKafkaClient.cluster.cluster_id after the first successful
broker metadata response — no extra connection or background thread needed.
Returns None if metadata has not been received yet.
"""
try:
cluster_id = getattr(
getattr(client, "cluster", None), "cluster_id", None
)
return cluster_id if cluster_id else None
except Exception: # pylint: disable=broad-except
return None


def _extract_consumer_group(
consumer: aiokafka.AIOKafkaConsumer,
) -> str | None:
Expand Down Expand Up @@ -237,6 +257,7 @@ def _enrich_base_span(
topic: str,
partition: int | None,
key: str | None,
cluster_id: str | None = None,
) -> None:
span.set_attribute(
messaging_attributes.MESSAGING_SYSTEM,
Expand All @@ -259,6 +280,9 @@ def _enrich_base_span(
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key
)

if cluster_id is not None:
span.set_attribute(_MESSAGING_CLUSTER_ID, cluster_id)


def _enrich_send_span(
span: Span,
Expand All @@ -268,6 +292,7 @@ def _enrich_send_span(
topic: str,
partition: int | None,
key: str | None,
cluster_id: str | None = None,
) -> None:
if not span.is_recording():
return
Expand All @@ -279,6 +304,7 @@ def _enrich_send_span(
topic=topic,
partition=partition,
key=key,
cluster_id=cluster_id,
)

span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send")
Expand All @@ -298,6 +324,7 @@ def _enrich_getone_span(
partition: int | None,
key: str | None,
offset: int,
cluster_id: str | None = None,
) -> None:
if not span.is_recording():
return
Expand All @@ -309,6 +336,7 @@ def _enrich_getone_span(
topic=topic,
partition=partition,
key=key,
cluster_id=cluster_id,
)

if consumer_group is not None:
Expand Down Expand Up @@ -344,6 +372,7 @@ def _enrich_getmany_poll_span(
client_id: str,
consumer_group: str | None,
message_count: int,
cluster_id: str | None = None,
) -> None:
if not span.is_recording():
return
Expand All @@ -357,6 +386,9 @@ def _enrich_getmany_poll_span(
)
span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id)

if cluster_id is not None:
span.set_attribute(_MESSAGING_CLUSTER_ID, cluster_id)

if consumer_group is not None:
span.set_attribute(
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group
Expand Down Expand Up @@ -384,6 +416,7 @@ def _enrich_getmany_topic_span(
topic: str,
partition: int,
message_count: int,
cluster_id: str | None = None,
) -> None:
if not span.is_recording():
return
Expand All @@ -395,6 +428,7 @@ def _enrich_getmany_topic_span(
topic=topic,
partition=partition,
key=None,
cluster_id=cluster_id,
)

if consumer_group is not None:
Expand All @@ -420,7 +454,9 @@ def _get_span_name(operation: str, topic: str):


def _wrap_send( # type: ignore[reportUnusedFunction]
tracer: Tracer, async_produce_hook: ProduceHookT | None
tracer: Tracer,
async_produce_hook: ProduceHookT | None,
capture_experimental_span_attributes: bool = False,
) -> Callable[..., Awaitable[asyncio.Future[RecordMetadata]]]:
async def _traced_send(
func: AIOKafkaSendProto,
Expand All @@ -439,6 +475,11 @@ async def _traced_send(
client_id = _extract_client_id(instance.client)
key = _deserialize_key(_extract_send_key(args, kwargs))
partition = await _extract_send_partition(instance, args, kwargs)
cluster_id = (
_extract_cluster_id_from_client(instance.client)
if capture_experimental_span_attributes
else None
)
span_name = _get_span_name("send", topic)
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.PRODUCER
Expand All @@ -450,6 +491,7 @@ async def _traced_send(
topic=topic,
partition=partition,
key=key,
cluster_id=cluster_id,
)
propagate.inject(
headers,
Expand All @@ -461,8 +503,14 @@ async def _traced_send(
await async_produce_hook(span, args, kwargs)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)

return await func(*args, **kwargs)
result = await func(*args, **kwargs)
# After send(), broker has responded and metadata is populated.
# Set cluster_id now if it wasn't available at span start.
if capture_experimental_span_attributes:
cluster_id = _extract_cluster_id_from_client(instance.client)
if cluster_id is not None and span.is_recording():
span.set_attribute(_MESSAGING_CLUSTER_ID, cluster_id)
return result

return _traced_send

Expand All @@ -475,6 +523,7 @@ async def _create_consumer_span(
bootstrap_servers: str | list[str],
client_id: str,
consumer_group: str | None,
cluster_id: str | None,
args: tuple[aiokafka.TopicPartition, ...],
kwargs: dict[str, Any],
) -> trace.Span:
Expand All @@ -495,6 +544,7 @@ async def _create_consumer_span(
partition=record.partition,
key=_deserialize_key(record.key),
offset=record.offset,
cluster_id=cluster_id,
)
try:
if async_consume_hook is not None:
Expand All @@ -508,7 +558,9 @@ async def _create_consumer_span(


def _wrap_getone( # type: ignore[reportUnusedFunction]
tracer: Tracer, async_consume_hook: ConsumeHookT | None
tracer: Tracer,
async_consume_hook: ConsumeHookT | None,
capture_experimental_span_attributes: bool = False,
) -> Callable[..., Awaitable[aiokafka.ConsumerRecord[object, object]]]:
async def _traced_getone(
func: AIOKafkaGetOneProto,
Expand All @@ -522,6 +574,11 @@ async def _traced_getone(
bootstrap_servers = _extract_bootstrap_servers(instance._client)
client_id = _extract_client_id(instance._client)
consumer_group = _extract_consumer_group(instance)
cluster_id = (
_extract_cluster_id_from_client(instance._client)
if capture_experimental_span_attributes
else None
)

extracted_context = propagate.extract(
record.headers, getter=_aiokafka_getter
Expand All @@ -534,6 +591,7 @@ async def _traced_getone(
bootstrap_servers,
client_id,
consumer_group,
cluster_id,
args,
kwargs,
)
Expand All @@ -543,7 +601,9 @@ async def _traced_getone(


def _wrap_getmany( # type: ignore[reportUnusedFunction]
tracer: Tracer, async_consume_hook: ConsumeHookT | None
tracer: Tracer,
async_consume_hook: ConsumeHookT | None,
capture_experimental_span_attributes: bool = False,
) -> Callable[
...,
Awaitable[
Expand All @@ -567,6 +627,11 @@ async def _traced_getmany(
bootstrap_servers = _extract_bootstrap_servers(instance._client)
client_id = _extract_client_id(instance._client)
consumer_group = _extract_consumer_group(instance)
cluster_id = (
_extract_cluster_id_from_client(instance._client)
if capture_experimental_span_attributes
else None
)

span_name = _get_span_name(
"receive",
Expand All @@ -581,6 +646,7 @@ async def _traced_getmany(
client_id=client_id,
consumer_group=consumer_group,
message_count=sum(len(r) for r in records.values()),
cluster_id=cluster_id,
)

for topic, topic_records in records.items():
Expand All @@ -596,6 +662,7 @@ async def _traced_getmany(
topic=topic.topic,
partition=topic.partition,
message_count=len(topic_records),
cluster_id=cluster_id,
)

for record in topic_records:
Expand All @@ -610,6 +677,7 @@ async def _traced_getmany(
bootstrap_servers,
client_id,
consumer_group,
cluster_id,
args,
kwargs,
)
Expand Down
Loading
Loading