Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions temporalio/contrib/opentelemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""OpenTelemetry v2 integration for Temporal SDK.

This package provides OpenTelemetry tracing integration for Temporal workflows,
activities, and other operations. It includes automatic span creation and
propagation for distributed tracing.
"""

from temporalio.contrib.opentelemetry._interceptor import (
TracingInterceptor,
TracingWorkflowInboundInterceptor,
)
from temporalio.contrib.opentelemetry._otel_interceptor import OpenTelemetryInterceptor
from temporalio.contrib.opentelemetry._plugin import OpenTelemetryPlugin
from temporalio.contrib.opentelemetry._tracer_provider import init_tracer_provider

__all__ = [
"TracingInterceptor",
"TracingWorkflowInboundInterceptor",
"OpenTelemetryInterceptor",
"OpenTelemetryPlugin",
"init_tracer_provider",
]
74 changes: 74 additions & 0 deletions temporalio/contrib/opentelemetry/_id_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import random

from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.trace import (
INVALID_SPAN_ID,
INVALID_TRACE_ID,
)

import temporalio.workflow


def _get_workflow_random() -> random.Random | None:
if (
temporalio.workflow.in_workflow()
and not temporalio.workflow.unsafe.is_read_only()
):
# Cache the random on the workflow instance because this IdGenerator is created outside of the workflow
# but the random should be reseeded for each workflow task
if (
getattr(temporalio.workflow.instance(), "__temporal_otel_id_random", None)
is None
):
setattr(
temporalio.workflow.instance(),
"__temporal_otel_id_random",
temporalio.workflow.new_random(),
)
return getattr(temporalio.workflow.instance(), "__temporal_otel_id_random")

return None


class TemporalIdGenerator(IdGenerator):
"""OpenTelemetry ID generator that uses Temporal's deterministic random generator.

.. warning::
This class is experimental and may change in future versions.
Use with caution in production environments.

This generator uses Temporal's workflow-safe random number generator when
inside a workflow execution, ensuring deterministic span and trace IDs
across workflow replays. Falls back to standard random generation outside
of workflows.
"""

def __init__(self, id_generator: IdGenerator):
"""Initialize a TemporalIdGenerator."""
self._id_generator = id_generator

def generate_span_id(self) -> int:
"""Generate a span ID using Temporal's deterministic random when in workflow.

Returns:
A 64-bit span ID.
"""
if workflow_random := _get_workflow_random():
span_id = workflow_random.getrandbits(64)
while span_id == INVALID_SPAN_ID:
span_id = workflow_random.getrandbits(64)
return span_id
return self._id_generator.generate_span_id()

def generate_trace_id(self) -> int:
"""Generate a trace ID using Temporal's deterministic random when in workflow.

Returns:
A 128-bit trace ID.
"""
if workflow_random := _get_workflow_random():
trace_id = workflow_random.getrandbits(128)
while trace_id == INVALID_TRACE_ID:
trace_id = workflow_random.getrandbits(128)
return trace_id
return self._id_generator.generate_trace_id()
Original file line number Diff line number Diff line change
Expand Up @@ -826,43 +826,3 @@ def _carrier_to_nexus_headers(
else:
out[k] = v
return out


class workflow:
"""Contains static methods that are safe to call from within a workflow.

.. warning::
Using any other ``opentelemetry`` API could cause non-determinism.
"""

def __init__(self) -> None: # noqa: D107
raise NotImplementedError

@staticmethod
def completed_span(
name: str,
*,
attributes: opentelemetry.util.types.Attributes = None,
exception: Exception | None = None,
) -> None:
"""Create and end an OpenTelemetry span.

Note, this will only create and record when the workflow is not
replaying and if there is a current span (meaning the client started a
span and this interceptor is configured on the worker and the span is on
the context).

There is currently no way to create a long-running span or to create a
span that actually spans other code.

Args:
name: Name of the span.
attributes: Attributes to set on the span if any. Workflow ID and
run ID are automatically added.
exception: Optional exception to record on the span.
"""
interceptor = TracingWorkflowInboundInterceptor._from_context()
if interceptor:
interceptor._completed_span(
name, additional_attributes=attributes, exception=exception
)
Loading
Loading