diff --git a/temporalio/contrib/opentelemetry/__init__.py b/temporalio/contrib/opentelemetry/__init__.py new file mode 100644 index 000000000..f2aed9e29 --- /dev/null +++ b/temporalio/contrib/opentelemetry/__init__.py @@ -0,0 +1,22 @@ +"""OpenTelemetry v2 integration for Temporal SDK. + +This package provides OpenTelemetry tracing integration for Temporal workflows, +activities, and other operations. It includes automatic span creation and +propagation for distributed tracing. +""" + +from temporalio.contrib.opentelemetry._interceptor import ( + TracingInterceptor, + TracingWorkflowInboundInterceptor, +) +from temporalio.contrib.opentelemetry._otel_interceptor import OpenTelemetryInterceptor +from temporalio.contrib.opentelemetry._plugin import OpenTelemetryPlugin +from temporalio.contrib.opentelemetry._tracer_provider import init_tracer_provider + +__all__ = [ + "TracingInterceptor", + "TracingWorkflowInboundInterceptor", + "OpenTelemetryInterceptor", + "OpenTelemetryPlugin", + "init_tracer_provider", +] diff --git a/temporalio/contrib/opentelemetry/_id_generator.py b/temporalio/contrib/opentelemetry/_id_generator.py new file mode 100644 index 000000000..f5b2c73e5 --- /dev/null +++ b/temporalio/contrib/opentelemetry/_id_generator.py @@ -0,0 +1,74 @@ +import random + +from opentelemetry.sdk.trace.id_generator import IdGenerator +from opentelemetry.trace import ( + INVALID_SPAN_ID, + INVALID_TRACE_ID, +) + +import temporalio.workflow + + +def _get_workflow_random() -> random.Random | None: + if ( + temporalio.workflow.in_workflow() + and not temporalio.workflow.unsafe.is_read_only() + ): + # Cache the random on the workflow instance because this IdGenerator is created outside of the workflow + # but the random should be reseeded for each workflow task + if ( + getattr(temporalio.workflow.instance(), "__temporal_otel_id_random", None) + is None + ): + setattr( + temporalio.workflow.instance(), + "__temporal_otel_id_random", + temporalio.workflow.new_random(), + ) + return getattr(temporalio.workflow.instance(), "__temporal_otel_id_random") + + return None + + +class TemporalIdGenerator(IdGenerator): + """OpenTelemetry ID generator that uses Temporal's deterministic random generator. + + .. warning:: + This class is experimental and may change in future versions. + Use with caution in production environments. + + This generator uses Temporal's workflow-safe random number generator when + inside a workflow execution, ensuring deterministic span and trace IDs + across workflow replays. Falls back to standard random generation outside + of workflows. + """ + + def __init__(self, id_generator: IdGenerator): + """Initialize a TemporalIdGenerator.""" + self._id_generator = id_generator + + def generate_span_id(self) -> int: + """Generate a span ID using Temporal's deterministic random when in workflow. + + Returns: + A 64-bit span ID. + """ + if workflow_random := _get_workflow_random(): + span_id = workflow_random.getrandbits(64) + while span_id == INVALID_SPAN_ID: + span_id = workflow_random.getrandbits(64) + return span_id + return self._id_generator.generate_span_id() + + def generate_trace_id(self) -> int: + """Generate a trace ID using Temporal's deterministic random when in workflow. + + Returns: + A 128-bit trace ID. 
+ """ + if workflow_random := _get_workflow_random(): + trace_id = workflow_random.getrandbits(128) + while trace_id == INVALID_TRACE_ID: + trace_id = workflow_random.getrandbits(128) + return trace_id + return self._id_generator.generate_trace_id() diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry/_interceptor.py similarity index 96% rename from temporalio/contrib/opentelemetry.py rename to temporalio/contrib/opentelemetry/_interceptor.py index ef1e52bb2..a72b44f19 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry/_interceptor.py @@ -826,43 +826,3 @@ def _carrier_to_nexus_headers( else: out[k] = v return out - - -class workflow: - """Contains static methods that are safe to call from within a workflow. - - .. warning:: - Using any other ``opentelemetry`` API could cause non-determinism. - """ - - def __init__(self) -> None: # noqa: D107 - raise NotImplementedError - - @staticmethod - def completed_span( - name: str, - *, - attributes: opentelemetry.util.types.Attributes = None, - exception: Exception | None = None, - ) -> None: - """Create and end an OpenTelemetry span. - - Note, this will only create and record when the workflow is not - replaying and if there is a current span (meaning the client started a - span and this interceptor is configured on the worker and the span is on - the context). - - There is currently no way to create a long-running span or to create a - span that actually spans other code. - - Args: - name: Name of the span. - attributes: Attributes to set on the span if any. Workflow ID and - run ID are automatically added. - exception: Optional exception to record on the span. - """ - interceptor = TracingWorkflowInboundInterceptor._from_context() - if interceptor: - interceptor._completed_span( - name, additional_attributes=attributes, exception=exception - ) diff --git a/temporalio/contrib/opentelemetry/_otel_interceptor.py b/temporalio/contrib/opentelemetry/_otel_interceptor.py new file mode 100644 index 000000000..5c34cf95a --- /dev/null +++ b/temporalio/contrib/opentelemetry/_otel_interceptor.py @@ -0,0 +1,626 @@ +"""OpenTelemetry interceptor that creates/propagates spans.""" + +from __future__ import annotations + +from collections.abc import Iterator, Mapping +from contextlib import contextmanager +from typing import ( + Any, + NoReturn, + TypeAlias, +) + +import nexusrpc.handler +import opentelemetry.baggage.propagation +import opentelemetry.context +import opentelemetry.propagators.composite +import opentelemetry.propagators.textmap +import opentelemetry.trace +import opentelemetry.trace.propagation.tracecontext +import opentelemetry.util.types +from opentelemetry.context import Context +from opentelemetry.trace import ( + Status, + StatusCode, + Tracer, +) +from typing_extensions import Protocol + +import temporalio.activity +import temporalio.api.common.v1 +import temporalio.client +import temporalio.contrib.opentelemetry.workflow +import temporalio.converter +import temporalio.worker +import temporalio.workflow +from temporalio import workflow +from temporalio.exceptions import ApplicationError, ApplicationErrorCategory + +# OpenTelemetry dynamically, lazily chooses its context implementation at +# runtime. When first accessed, they use pkg_resources.iter_entry_points + load. +# The load uses built-in open() which we don't allow in sandbox mode at runtime, +# only import time. 
Therefore if the first use of a OTel context is inside the +# sandbox, which it may be for a workflow worker, this will fail. So instead we +# eagerly reference it here to force loading at import time instead of lazily. +opentelemetry.context.get_current() + +default_text_map_propagator = opentelemetry.propagators.composite.CompositePropagator( + [ + opentelemetry.trace.propagation.tracecontext.TraceContextTextMapPropagator(), + opentelemetry.baggage.propagation.W3CBaggagePropagator(), + ] +) +"""Default text map propagator used by :py:class:`TracingInterceptor`.""" + +_CarrierDict: TypeAlias = dict[str, opentelemetry.propagators.textmap.CarrierValT] + + +def _context_to_headers( + headers: Mapping[str, temporalio.api.common.v1.Payload], +) -> Mapping[str, temporalio.api.common.v1.Payload]: + carrier: _CarrierDict = {} + default_text_map_propagator.inject(carrier) + if carrier: + headers = { + **headers, + "_tracer-data": temporalio.converter.PayloadConverter.default.to_payloads( + [carrier] + )[0], + } + return headers + + +def _context_to_nexus_headers(headers: Mapping[str, str]) -> Mapping[str, str]: + carrier: _CarrierDict = {} + default_text_map_propagator.inject(carrier) + if carrier: + out = {**headers} if headers else {} + for k, v in carrier.items(): + if isinstance(v, list): + out[k] = ",".join(v) + else: + out[k] = v + return out + else: + return headers + + +def _headers_to_context( + headers: Mapping[str, temporalio.api.common.v1.Payload], +) -> Context: + context_header = headers.get("_tracer-data") + if context_header: + context_carrier: _CarrierDict = ( + temporalio.converter.PayloadConverter.default.from_payloads( + [context_header] + )[0] + ) + + context = default_text_map_propagator.extract(context_carrier) + else: + context = opentelemetry.context.Context() + return context + + +def _nexus_headers_to_context(headers: Mapping[str, str]) -> Context: + context = default_text_map_propagator.extract(headers) + return context + + +def _ensure_tracer(tracer: Tracer) -> None: + """We use a custom uuid generator for spans to ensure that changes to user code workflow.random usage + do not affect tracing and vice versa. 
+ """ + instance = workflow.instance() + if not hasattr(instance, "__temporal_opentelemetry_tracer"): + setattr( + workflow.instance(), + "__temporal_opentelemetry_tracer", + tracer, + ) + + +@contextmanager +def _maybe_span( + tracer: Tracer, + name: str, + *, + add_temporal_spans: bool, + attributes: opentelemetry.util.types.Attributes, + kind: opentelemetry.trace.SpanKind, + context: Context | None = None, +) -> Iterator[None]: + if not add_temporal_spans: + yield + return + + token = opentelemetry.context.attach(context) if context else None + try: + span_factory = ( + temporalio.contrib.opentelemetry.workflow.tracer().start_as_current_span + if workflow.in_workflow() + else tracer.start_as_current_span + ) + with span_factory( + name, + attributes=attributes, + kind=kind, + context=context, + set_status_on_exception=False, + ) as span: + try: + yield + except Exception as exc: + if ( + not isinstance(exc, ApplicationError) + or exc.category != ApplicationErrorCategory.BENIGN + ): + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + raise + finally: + if token and context is opentelemetry.context.get_current(): + opentelemetry.context.detach(token) + + +class OpenTelemetryInterceptor( + temporalio.client.Interceptor, temporalio.worker.Interceptor +): + """Interceptor that supports client and worker OpenTelemetry span creation + and propagation. + + .. warning:: + This class is experimental and may change in future versions. + Use with caution in production environments. + + This should be created and used for ``interceptors`` on the + :py:meth:`temporalio.client.Client.connect` call to apply to all client + calls and worker calls using that client. To only apply to workers, set as + worker creation option instead of in client. + """ + + def __init__( # type: ignore[reportMissingSuperCall] + self, + tracer: opentelemetry.trace.Tracer, + add_temporal_spans: bool = False, + ) -> None: + """Initialize a OpenTelemetry tracing interceptor.""" + self._tracer = tracer + self._add_temporal_spans = add_temporal_spans + + def intercept_client( + self, next: temporalio.client.OutboundInterceptor + ) -> temporalio.client.OutboundInterceptor: + """Implementation of + :py:meth:`temporalio.client.Interceptor.intercept_client`. + """ + return _TracingClientOutboundInterceptor( + next, self._tracer, self._add_temporal_spans + ) + + def intercept_activity( + self, next: temporalio.worker.ActivityInboundInterceptor + ) -> temporalio.worker.ActivityInboundInterceptor: + """Implementation of + :py:meth:`temporalio.worker.Interceptor.intercept_activity`. + """ + return _TracingActivityInboundInterceptor( + next, self._tracer, self._add_temporal_spans + ) + + def workflow_interceptor_class( + self, input: temporalio.worker.WorkflowInterceptorClassInput + ) -> type[_TracingWorkflowInboundInterceptor]: + """Implementation of + :py:meth:`temporalio.worker.Interceptor.workflow_interceptor_class`. + """ + tracer = self._tracer + + class InterceptorWithState(_TracingWorkflowInboundInterceptor): + _add_temporal_spans = self._add_temporal_spans + + def get_tracer(self): + return tracer + + return InterceptorWithState + + def intercept_nexus_operation( + self, next: temporalio.worker.NexusOperationInboundInterceptor + ) -> temporalio.worker.NexusOperationInboundInterceptor: + """Implementation of + :py:meth:`temporalio.worker.Interceptor.intercept_nexus_operation`. 
+ """ + return _TracingNexusOperationInboundInterceptor( + next, self._tracer, self._add_temporal_spans + ) + + +class _TracingClientOutboundInterceptor(temporalio.client.OutboundInterceptor): + def __init__( + self, + next: temporalio.client.OutboundInterceptor, + tracer: Tracer, + add_temporal_spans: bool, + ) -> None: + super().__init__(next) + self._add_temporal_spans = add_temporal_spans + self._tracer = tracer + + async def start_workflow( + self, input: temporalio.client.StartWorkflowInput + ) -> temporalio.client.WorkflowHandle[Any, Any]: + prefix = ( + "StartWorkflow" if not input.start_signal else "SignalWithStartWorkflow" + ) + with _maybe_span( + self._tracer, + f"{prefix}:{input.workflow}", + add_temporal_spans=self._add_temporal_spans, + attributes={"temporalWorkflowID": input.id}, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().start_workflow(input) + + async def query_workflow(self, input: temporalio.client.QueryWorkflowInput) -> Any: + with _maybe_span( + self._tracer, + f"QueryWorkflow:{input.query}", + add_temporal_spans=self._add_temporal_spans, + attributes={"temporalWorkflowID": input.id}, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().query_workflow(input) + + async def signal_workflow( + self, input: temporalio.client.SignalWorkflowInput + ) -> None: + with _maybe_span( + self._tracer, + f"SignalWorkflow:{input.signal}", + add_temporal_spans=self._add_temporal_spans, + attributes={"temporalWorkflowID": input.id}, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().signal_workflow(input) + + async def start_workflow_update( + self, input: temporalio.client.StartWorkflowUpdateInput + ) -> temporalio.client.WorkflowUpdateHandle[Any]: + with _maybe_span( + self._tracer, + f"StartWorkflowUpdate:{input.update}", + add_temporal_spans=self._add_temporal_spans, + attributes={"temporalWorkflowID": input.id}, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().start_workflow_update(input) + + async def start_update_with_start_workflow( + self, input: temporalio.client.StartWorkflowUpdateWithStartInput + ) -> temporalio.client.WorkflowUpdateHandle[Any]: + attrs = { + "temporalWorkflowID": input.start_workflow_input.id, + } + if input.update_workflow_input.update_id is not None: + attrs["temporalUpdateID"] = input.update_workflow_input.update_id + + with _maybe_span( + self._tracer, + f"StartUpdateWithStartWorkflow:{input.start_workflow_input.workflow}", + add_temporal_spans=self._add_temporal_spans, + attributes=attrs, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.start_workflow_input.headers = _context_to_headers( + input.start_workflow_input.headers + ) + input.update_workflow_input.headers = _context_to_headers( + input.update_workflow_input.headers + ) + return await super().start_update_with_start_workflow(input) + + +class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): + def __init__( + self, + next: temporalio.worker.ActivityInboundInterceptor, + tracer: Tracer, + add_temporal_spans: bool, + ) -> None: + super().__init__(next) + self._tracer = tracer + self._add_temporal_spans = add_temporal_spans + + async def execute_activity( + self, input: temporalio.worker.ExecuteActivityInput + ) -> Any: + context = 
_headers_to_context(input.headers) + token = opentelemetry.context.attach(context) + try: + info = temporalio.activity.info() + with _maybe_span( + self._tracer, + f"RunActivity:{info.activity_type}", + add_temporal_spans=self._add_temporal_spans, + attributes={ + "temporalWorkflowID": info.workflow_id, + "temporalRunID": info.workflow_run_id, + "temporalActivityID": info.activity_id, + }, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await super().execute_activity(input) + finally: + if context is opentelemetry.context.get_current(): + opentelemetry.context.detach(token) + + +class _TracingNexusOperationInboundInterceptor( + temporalio.worker.NexusOperationInboundInterceptor +): + def __init__( + self, + next: temporalio.worker.NexusOperationInboundInterceptor, + tracer: Tracer, + add_temporal_spans: bool, + ) -> None: + super().__init__(next) + self._tracer = tracer + self._add_temporal_spans = add_temporal_spans + + @contextmanager + def _top_level_context(self, headers: Mapping[str, str]) -> Iterator[None]: + context = _nexus_headers_to_context(headers) + token = opentelemetry.context.attach(context) + try: + yield + finally: + if context is opentelemetry.context.get_current(): + opentelemetry.context.detach(token) + + async def execute_nexus_operation_start( + self, input: temporalio.worker.ExecuteNexusOperationStartInput + ) -> ( + nexusrpc.handler.StartOperationResultSync[Any] + | nexusrpc.handler.StartOperationResultAsync + ): + with self._top_level_context(input.ctx.headers): + with _maybe_span( + self._tracer, + f"RunStartNexusOperationHandler:{input.ctx.service}/{input.ctx.operation}", + add_temporal_spans=self._add_temporal_spans, + attributes={}, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await self.next.execute_nexus_operation_start(input) + + async def execute_nexus_operation_cancel( + self, input: temporalio.worker.ExecuteNexusOperationCancelInput + ) -> None: + with self._top_level_context(input.ctx.headers): + with _maybe_span( + self._tracer, + f"RunCancelNexusOperationHandler:{input.ctx.service}/{input.ctx.operation}", + add_temporal_spans=self._add_temporal_spans, + attributes={}, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await self.next.execute_nexus_operation_cancel(input) + + +class _InputWithHeaders(Protocol): + headers: Mapping[str, temporalio.api.common.v1.Payload] + + +class _TracingWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor): + """Tracing interceptor for workflow calls.""" + + _add_temporal_spans: bool = False + + def __init__(self, next: temporalio.worker.WorkflowInboundInterceptor) -> None: + """Initialize a tracing workflow interceptor.""" + super().__init__(next) + + def get_tracer(self) -> Tracer: + raise NotImplementedError() + + def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.init`. 
+ """ + super().init( + _TracingWorkflowOutboundInterceptor( + outbound, self.get_tracer(), self._add_temporal_spans + ) + ) + + @contextmanager + def _workflow_maybe_span(self, name: str) -> Iterator[None]: + info = temporalio.workflow.info() + attributes: dict[str, opentelemetry.util.types.AttributeValue] = { + "temporalWorkflowID": info.workflow_id, + "temporalRunID": info.run_id, + } + with _maybe_span( + self.get_tracer(), + name, + add_temporal_spans=self._add_temporal_spans, + attributes=attributes, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + yield + + async def execute_workflow( + self, input: temporalio.worker.ExecuteWorkflowInput + ) -> Any: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.execute_workflow`. + """ + _ensure_tracer(self.get_tracer()) + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"RunWorkflow:{temporalio.workflow.info().workflow_type}" + ): + return await super().execute_workflow(input) + + async def handle_signal(self, input: temporalio.worker.HandleSignalInput) -> None: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.handle_signal`. + """ + _ensure_tracer(self.get_tracer()) + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"HandleSignal:{input.signal}", + ): + await super().handle_signal(input) + + async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.handle_query`. + """ + _ensure_tracer(self.get_tracer()) + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"HandleQuery:{input.query}", + ): + return await super().handle_query(input) + + def handle_update_validator( + self, input: temporalio.worker.HandleUpdateInput + ) -> None: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.handle_update_validator`. + """ + _ensure_tracer(self.get_tracer()) + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"ValidateUpdate:{input.update}", + ): + super().handle_update_validator(input) + + async def handle_update_handler( + self, input: temporalio.worker.HandleUpdateInput + ) -> Any: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.handle_update_handler`. 
+ """ + _ensure_tracer(self.get_tracer()) + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"HandleUpdate:{input.update}", + ): + return await super().handle_update_handler(input) + + @contextmanager + def _top_level_workflow_context(self, input: _InputWithHeaders) -> Iterator[None]: + context = _headers_to_context(input.headers) + token = opentelemetry.context.attach(context) + try: + yield + finally: + if context is opentelemetry.context.get_current(): + opentelemetry.context.detach(token) + + +class _TracingWorkflowOutboundInterceptor( + temporalio.worker.WorkflowOutboundInterceptor +): + def __init__( + self, + next: temporalio.worker.WorkflowOutboundInterceptor, + tracer: Tracer, + add_temporal_spans: bool, + ) -> None: + super().__init__(next) + self._tracer = tracer + self._add_temporal_spans = add_temporal_spans + + @contextmanager + def _workflow_maybe_span( + self, name: str, kind: opentelemetry.trace.SpanKind + ) -> Iterator[None]: + info = temporalio.workflow.info() + attributes: dict[str, opentelemetry.util.types.AttributeValue] = { + "temporalWorkflowID": info.workflow_id, + "temporalRunID": info.run_id, + } + with _maybe_span( + self._tracer, + name, + add_temporal_spans=self._add_temporal_spans, + attributes=attributes, + kind=kind, + ): + yield + + def continue_as_new(self, input: temporalio.worker.ContinueAsNewInput) -> NoReturn: + input.headers = _context_to_headers(input.headers) + super().continue_as_new(input) + + async def signal_child_workflow( + self, input: temporalio.worker.SignalChildWorkflowInput + ) -> None: + with self._workflow_maybe_span( + f"SignalChildWorkflow:{input.signal}", + kind=opentelemetry.trace.SpanKind.SERVER, + ): + input.headers = _context_to_headers(input.headers) + await super().signal_child_workflow(input) + + async def signal_external_workflow( + self, input: temporalio.worker.SignalExternalWorkflowInput + ) -> None: + with self._workflow_maybe_span( + f"SignalExternalWorkflow:{input.signal}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + await super().signal_external_workflow(input) + + def start_activity( + self, input: temporalio.worker.StartActivityInput + ) -> temporalio.workflow.ActivityHandle: + with self._workflow_maybe_span( + f"StartActivity:{input.activity}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return super().start_activity(input) + + async def start_child_workflow( + self, input: temporalio.worker.StartChildWorkflowInput + ) -> temporalio.workflow.ChildWorkflowHandle: + with self._workflow_maybe_span( + f"StartChildWorkflow:{input.workflow}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().start_child_workflow(input) + + def start_local_activity( + self, input: temporalio.worker.StartLocalActivityInput + ) -> temporalio.workflow.ActivityHandle: + with self._workflow_maybe_span( + f"StartActivity:{input.activity}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return super().start_local_activity(input) + + async def start_nexus_operation( + self, input: temporalio.worker.StartNexusOperationInput[Any, Any] + ) -> temporalio.workflow.NexusOperationHandle[Any]: + with self._workflow_maybe_span( + f"StartNexusOperation:{input.service}/{input.operation_name}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = 
_context_to_nexus_headers(input.headers or {}) + return await super().start_nexus_operation(input) diff --git a/temporalio/contrib/opentelemetry/_plugin.py b/temporalio/contrib/opentelemetry/_plugin.py new file mode 100644 index 000000000..8a1c09ce9 --- /dev/null +++ b/temporalio/contrib/opentelemetry/_plugin.py @@ -0,0 +1,48 @@ +from opentelemetry.trace import get_tracer_provider + +from temporalio.contrib.opentelemetry import OpenTelemetryInterceptor +from temporalio.contrib.opentelemetry._tracer_provider import ReplaySafeTracerProvider +from temporalio.plugin import SimplePlugin + + +class OpenTelemetryPlugin(SimplePlugin): + """OpenTelemetry plugin for Temporal SDK. + + .. warning:: + This class is experimental and may change in future versions. + Use with caution in production environments. + + This plugin integrates OpenTelemetry tracing with the Temporal SDK, providing + automatic span creation for workflows, activities, and other Temporal operations. + It uses the new OpenTelemetryInterceptor implementation. + + Unlike the prior TracingInterceptor, this allows for accurate duration spans and parenting inside a workflow + with temporalio.contrib.opentelemetry.workflow.tracer() + + When starting traces on the client side, you can use OpenTelemetryPlugin.provider() to trace to the same + exporters provided. If you don't, ensure that some provider is globally registered or the client side + traces will not be propagated to the workflow. + """ + + def __init__(self, *, add_temporal_spans: bool = False): + """Initialize the OpenTelemetry plugin. + + Args: + add_temporal_spans: Whether to add additional Temporal-specific spans + for operations like StartWorkflow, RunWorkflow, etc. + """ + provider = get_tracer_provider() + if not isinstance(provider, ReplaySafeTracerProvider): + raise ValueError( + "When using OpenTelemetryPlugin, the global trace provider must be a ReplaySafeTracerProvider. Use init_tracer_provider to create one." + ) + + interceptors = [ + OpenTelemetryInterceptor(provider.get_tracer(__name__), add_temporal_spans) + ] + + super().__init__( + "OpenTelemetryPlugin", + client_interceptors=interceptors, + worker_interceptors=interceptors, + ) diff --git a/temporalio/contrib/opentelemetry/_processor.py b/temporalio/contrib/opentelemetry/_processor.py new file mode 100644 index 000000000..30245d458 --- /dev/null +++ b/temporalio/contrib/opentelemetry/_processor.py @@ -0,0 +1,36 @@ +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +from temporalio import workflow + + +class TemporalSpanProcessor(BatchSpanProcessor): + """A span processor that handles Temporal workflow replay semantics. + + This processor ensures that spans are only exported when workflows actually + complete, not during intermediate replays. This is crucial for maintaining + correct telemetry data when using OpenAI agents within Temporal workflows. 
+ + Example usage: + from opentelemetry.sdk import trace as trace_sdk + from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + from temporalio.contrib.openai_agents._temporal_trace_provider import TemporalIdGenerator + from temporalio.contrib.openai_agents._otel import TemporalSpanProcessor + from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor + + exporter = InMemorySpanExporter() + provider = trace_sdk.TracerProvider(id_generator=TemporalIdGenerator()) + provider.add_span_processor(TemporalSpanProcessor(exporter)) + OpenAIAgentsInstrumentor().instrument(tracer_provider=provider) + """ + + def on_end(self, span: ReadableSpan) -> None: + """Handle span end events, skipping export during workflow replay. + + Args: + span: The span that has ended. + """ + if workflow.in_workflow() and workflow.unsafe.is_replaying_history_events(): + # Skip exporting spans during workflow replay to avoid duplicate telemetry + return + super().on_end(span) diff --git a/temporalio/contrib/opentelemetry/_tracer_provider.py b/temporalio/contrib/opentelemetry/_tracer_provider.py new file mode 100644 index 000000000..853a03208 --- /dev/null +++ b/temporalio/contrib/opentelemetry/_tracer_provider.py @@ -0,0 +1,258 @@ +from collections.abc import Iterator, Mapping, Sequence + +import opentelemetry.sdk.trace as trace_sdk +from opentelemetry.context import Context +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import ( + ConcurrentMultiSpanProcessor, + SpanLimits, + SynchronousMultiSpanProcessor, + sampling, +) +from opentelemetry.sdk.trace.id_generator import IdGenerator, RandomIdGenerator +from opentelemetry.trace import ( + Link, + Span, + SpanContext, + SpanKind, + Status, + StatusCode, + Tracer, + TracerProvider, + use_span, +) +from opentelemetry.util import types +from opentelemetry.util._decorator import _agnosticcontextmanager + +from temporalio import workflow +from temporalio.contrib.opentelemetry._id_generator import TemporalIdGenerator + + +class _ReplaySafeSpan(Span): + def __init__(self, span: Span): + self._span = span + + def end(self, end_time: int | None = None) -> None: + if workflow.in_workflow() and workflow.unsafe.is_replaying_history_events(): + # Skip ending spans during workflow replay to avoid duplicate telemetry + return + self._span.end(end_time=end_time) + + def get_span_context(self) -> SpanContext: + return self._span.get_span_context() + + def set_attributes(self, attributes: Mapping[str, types.AttributeValue]) -> None: + self._span.set_attributes(attributes) + + def set_attribute(self, key: str, value: types.AttributeValue) -> None: + self._span.set_attribute(key, value) + + def add_event( + self, + name: str, + attributes: types.Attributes = None, + timestamp: int | None = None, + ) -> None: + self._span.add_event(name, attributes, timestamp) + + def update_name(self, name: str) -> None: + self._span.update_name(name) + + def is_recording(self) -> bool: + return self._span.is_recording() + + def set_status( + self, status: Status | StatusCode, description: str | None = None + ) -> None: + self._span.set_status(status, description) + + def record_exception( + self, + exception: BaseException, + attributes: types.Attributes = None, + timestamp: int | None = None, + escaped: bool = False, + ) -> None: + self._span.record_exception(exception, attributes, timestamp, escaped) + + +class _ReplaySafeTracer(Tracer): + def __init__(self, tracer: Tracer): + self._tracer = tracer + + def start_span( + 
self, + name: str, + context: Context | None = None, + kind: SpanKind = SpanKind.INTERNAL, + attributes: types.Attributes = None, + links: Sequence[Link] | None = None, + start_time: int | None = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + ) -> "Span": + span = self._tracer.start_span( + name, + context, + kind, + attributes, + links, + start_time or workflow.time_ns(), + record_exception, + set_status_on_exception, + ) + return _ReplaySafeSpan(span) + + @_agnosticcontextmanager + def start_as_current_span( + self, + name: str, + context: Context | None = None, + kind: SpanKind = SpanKind.INTERNAL, + attributes: types.Attributes = None, + links: Sequence[Link] | None = None, + start_time: int | None = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + ) -> Iterator["Span"]: + if workflow.in_workflow() and workflow.unsafe.is_replaying_history_events(): + start_time = start_time or workflow.time_ns() + span = self._tracer.start_span( + name, + context, + kind, + attributes, + links, + start_time, + record_exception, + set_status_on_exception, + ) + span = _ReplaySafeSpan(span) + with use_span( + span, + end_on_exit=end_on_exit, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) as span: + yield span + + +class ReplaySafeTracerProvider(TracerProvider): + """A tracer provider that is safe for use during workflow replay. + + .. warning:: + This class is experimental and may change in future versions. + Use with caution in production environments. + + This tracer provider wraps an OpenTelemetry TracerProvider and ensures + that telemetry operations are safe during workflow replay by using + replay-safe spans and tracers. + """ + + def __init__(self, tracer_provider: trace_sdk.TracerProvider): + """Initialize the replay-safe tracer provider. + + Args: + tracer_provider: The underlying OpenTelemetry TracerProvider to wrap. + Must use a _TemporalIdGenerator for replay safety. + + Raises: + ValueError: If the tracer provider doesn't use a _TemporalIdGenerator. + """ + if not isinstance(tracer_provider.id_generator, TemporalIdGenerator): + raise ValueError( + "ReplaySafeTracerProvider should only be used with a TemporalIdGenerator for replay safety. The given TracerProvider doesnt use one." + ) + self._tracer_provider = tracer_provider + + def add_span_processor(self, span_processor: trace_sdk.SpanProcessor) -> None: + """Add a span processor to the underlying tracer provider. + + Args: + span_processor: The span processor to add. + """ + self._tracer_provider.add_span_processor(span_processor) + + def shutdown(self) -> None: + """Shutdown the underlying tracer provider.""" + self._tracer_provider.shutdown() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + """Force flush the underlying tracer provider. + + Args: + timeout_millis: Timeout in milliseconds. + + Returns: + True if flush was successful, False otherwise. + """ + return self._tracer_provider.force_flush(timeout_millis) + + def get_tracer( + self, + instrumenting_module_name: str, + instrumenting_library_version: str | None = None, + schema_url: str | None = None, + attributes: types.Attributes | None = None, + ) -> Tracer: + """Get a replay-safe tracer from the underlying provider. + + Args: + instrumenting_module_name: The name of the instrumenting module. + instrumenting_library_version: The version of the instrumenting library. + schema_url: The schema URL for the tracer. 
+ attributes: Additional attributes for the tracer. + + Returns: + A replay-safe tracer instance. + """ + tracer = self._tracer_provider.get_tracer( + instrumenting_module_name, + instrumenting_library_version, + schema_url, + attributes, + ) + return _ReplaySafeTracer(tracer) + + +def init_tracer_provider( + sampler: sampling.Sampler | None = None, + resource: Resource | None = None, + shutdown_on_exit: bool = True, + active_span_processor: SynchronousMultiSpanProcessor + | ConcurrentMultiSpanProcessor + | None = None, + id_generator: IdGenerator | None = None, + span_limits: SpanLimits | None = None, +) -> ReplaySafeTracerProvider: + """Initialize a replay-safe tracer provider. + + .. warning:: + This function is experimental and may change in future versions. + Use with caution in production environments. + + Creates a new TracerProvider with a TemporalIdGenerator for replay safety + and wraps it in a ReplaySafeTracerProvider. + + Args: + sampler: The sampler to use for sampling spans. + resource: The resource to associate with the tracer provider. + shutdown_on_exit: Whether to shutdown the provider on exit. + active_span_processor: The active span processor to use. + id_generator: The ID generator to wrap with TemporalIdGenerator. + span_limits: The span limits to apply. + + Returns: + A replay-safe tracer provider instance. + """ + generator = TemporalIdGenerator(id_generator or RandomIdGenerator()) + provider = trace_sdk.TracerProvider( + sampler=sampler, + resource=resource, + shutdown_on_exit=shutdown_on_exit, + active_span_processor=active_span_processor, + span_limits=span_limits, + id_generator=generator, + ) + return ReplaySafeTracerProvider(provider) diff --git a/temporalio/contrib/opentelemetry/workflow.py b/temporalio/contrib/opentelemetry/workflow.py new file mode 100644 index 000000000..5b2e6c001 --- /dev/null +++ b/temporalio/contrib/opentelemetry/workflow.py @@ -0,0 +1,86 @@ +"""OpenTelemetry workflow utilities for Temporal SDK. + +This module provides workflow-safe OpenTelemetry span creation and context +management utilities for use within Temporal workflows. All functions in +this module are designed to work correctly during workflow replay. +""" + +from __future__ import annotations + +import warnings + +import opentelemetry.util.types +from opentelemetry.trace import ( + Tracer, +) + +import temporalio.workflow +from temporalio.contrib.opentelemetry import TracingWorkflowInboundInterceptor +from temporalio.exceptions import ApplicationError + + +def _try_get_tracer() -> Tracer | None: + tracer = getattr( + temporalio.workflow.instance(), "__temporal_opentelemetry_tracer", None + ) + if tracer is not None and not isinstance(tracer, Tracer): + raise ApplicationError( + "Failed to get temporal OpenTelemetry tracer from workflow. It was present but not a Tracer. This is unexpected." + ) + return tracer + + +def tracer(): + """Get an OpenTelemetry Tracer which functions inside a Temporal workflow. + + .. warning:: + This function is experimental and may change in future versions. + Use with caution in production environments. + """ + tracer = _try_get_tracer() + if tracer is None or not isinstance(tracer, Tracer): + raise ApplicationError( + "Failed to get temporal OpenTelemetry tracer from workflow. You may not have registered the OpenTelemetryPlugin." + ) + + return tracer + + +def completed_span( + name: str, + *, + attributes: opentelemetry.util.types.Attributes = None, + exception: Exception | None = None, +) -> None: + """Create and end an OpenTelemetry span. 
+ + Note, this will only create and record when the workflow is not + replaying and if there is a current span (meaning the client started a + span and this interceptor is configured on the worker and the span is on + the context). + + To create a long-running span or to create a span that actually spans other code use OpenTelemetryPlugin and tracer(). + + Args: + name: Name of the span. + attributes: Attributes to set on the span if any. Workflow ID and + run ID are automatically added. + exception: Optional exception to record on the span. + """ + # Check for v2 Tracer first + if tracer := _try_get_tracer(): + warnings.warn( + "When using OpenTelemetryPlugin, you should prefer workflow.tracer().", + DeprecationWarning, + ) + span = tracer.start_span(name, attributes=attributes) + if exception: + span.record_exception(exception) + span.end() + return + + interceptor = TracingWorkflowInboundInterceptor._from_context() + if interceptor: + interceptor._completed_span( + name, additional_attributes=attributes, exception=exception + ) diff --git a/temporalio/plugin.py b/temporalio/plugin.py index db917e337..b0f358143 100644 --- a/temporalio/plugin.py +++ b/temporalio/plugin.py @@ -153,8 +153,29 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig: interceptors = _resolve_append_parameter( config.get("interceptors"), self.worker_interceptors ) - if interceptors is not None: - config["interceptors"] = interceptors + + client = config.get("client") + + # Don't add new worker interceptors which are already registered in the client. + if ( + self.worker_interceptors + and not callable(self.worker_interceptors) + and client + ): + new_interceptors = list(config.get("interceptors") or []) + for interceptor in self.worker_interceptors: + client_interceptors = client.config(active_config=True).get( + "interceptors" + ) + if not client_interceptors or not interceptor in client_interceptors: + new_interceptors.append(interceptor) + config["interceptors"] = new_interceptors + else: + interceptors = _resolve_append_parameter( + config.get("interceptors"), self.worker_interceptors + ) + if interceptors is not None: + config["interceptors"] = interceptors failure_exception_types = _resolve_append_parameter( config.get("workflow_failure_exception_types"), diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 3f55e19eb..a3bbec549 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1232,6 +1232,9 @@ def workflow_is_replaying(self) -> bool: def workflow_is_replaying_history_events(self) -> bool: return self._is_replaying and not self._in_query_or_validator + def workflow_is_read_only(self) -> bool: + return self._read_only + def workflow_memo(self) -> Mapping[str, Any]: if self._untyped_converted_memo is None: self._untyped_converted_memo = { diff --git a/temporalio/workflow.py b/temporalio/workflow.py index fb1753ed8..441dac6be 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -741,6 +741,9 @@ def workflow_is_replaying(self) -> bool: ... @abstractmethod def workflow_is_replaying_history_events(self) -> bool: ... + @abstractmethod + def workflow_is_read_only(self) -> bool: ... + @abstractmethod def workflow_memo(self) -> Mapping[str, Any]: ... @@ -1516,6 +1519,18 @@ def is_replaying_history_events() -> bool: """ return _Runtime.current().workflow_is_replaying_history_events() + @staticmethod + def is_read_only() -> bool: + """Whether the workflow is currently in read-only mode. 
+ + Read-only mode occurs during queries and update validators where + side effects are not allowed. + + Returns: + True if the workflow is in read-only mode, False otherwise. + """ + return _Runtime.current().workflow_is_read_only() + @staticmethod def is_sandbox_unrestricted() -> bool: """Whether the current block of code is not restricted via sandbox. diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/opentelemetry/test_opentelemetry.py similarity index 100% rename from tests/contrib/test_opentelemetry.py rename to tests/contrib/opentelemetry/test_opentelemetry.py diff --git a/tests/contrib/opentelemetry/test_opentelemetry_plugin.py b/tests/contrib/opentelemetry/test_opentelemetry_plugin.py new file mode 100644 index 000000000..d803f1cb7 --- /dev/null +++ b/tests/contrib/opentelemetry/test_opentelemetry_plugin.py @@ -0,0 +1,470 @@ +import logging +import uuid +from datetime import timedelta +from typing import Any + +import nexusrpc +import opentelemetry.trace +import pytest +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.trace import get_tracer +from opentelemetry.util._once import Once + +import temporalio.contrib.opentelemetry.workflow +from temporalio import activity, nexus, workflow +from temporalio.client import Client +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, init_tracer_provider +from temporalio.testing import WorkflowEnvironment + +# Import the dump_spans function from the original opentelemetry test +from tests.contrib.test_opentelemetry import dump_spans +from tests.helpers import new_worker +from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name + +logger = logging.getLogger(__name__) + + +def print_otel_spans(spans: tuple[ReadableSpan, ...]): + print( + "\n".join( + [ + str( + { + "Name": span.name, + "Id": span.context.span_id if span.context else None, + "Parent": span.parent.span_id if span.parent else None, + } + ) + for span in spans + ] + ) + ) + + +@pytest.fixture +def reset_otel_tracer_provider(): + """Reset OpenTelemetry tracer provider state to allow multiple test runs.""" + opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() + opentelemetry.trace._TRACER_PROVIDER = None + yield + opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() + opentelemetry.trace._TRACER_PROVIDER = None + + +@activity.defn +async def simple_no_context_activity() -> str: + with get_tracer(__name__).start_as_current_span("Activity"): + pass + return "success" + + +@workflow.defn +class SimpleNexusWorkflow: + @workflow.run + async def run(self, input: str) -> str: + return f"nexus-result-{input}" + + +@nexusrpc.handler.service_handler +class ComprehensiveNexusService: + @nexus.workflow_run_operation + async def test_operation( + self, ctx: nexus.WorkflowRunOperationContext, input: str + ) -> nexus.WorkflowHandle[str]: + return await ctx.start_workflow( + SimpleNexusWorkflow.run, + input, + id=f"nexus-wf-{ctx.request_id}", + ) + + +@workflow.defn +class BasicTraceWorkflow: + @workflow.run + async def run(self): + print("\nWorkflow Run") + tracer = temporalio.contrib.opentelemetry.workflow.tracer() + print("Workflow tracer: ", tracer) + temporalio.contrib.opentelemetry.workflow.completed_span("Completed Span") + with tracer.start_as_current_span("Hello World"): + await workflow.execute_activity( + simple_no_context_activity, + 
start_to_close_timeout=timedelta(seconds=10), + ) + await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + span = tracer.start_span("Not context") + with tracer.start_as_current_span("Inner"): + await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + span.end() + return + + +async def test_otel_tracing_basic(client: Client, reset_otel_tracer_provider: Any): # type: ignore[reportUnusedParameter] + exporter = InMemorySpanExporter() + provider = init_tracer_provider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + opentelemetry.trace.set_tracer_provider(provider) + + new_config = client.config() + new_config["plugins"] = [OpenTelemetryPlugin()] + new_client = Client(**new_config) + + async with new_worker( + new_client, + BasicTraceWorkflow, + activities=[simple_no_context_activity], + max_cached_workflows=0, + ) as worker: + tracer = get_tracer(__name__) + + with tracer.start_as_current_span("Research workflow"): + workflow_handle = await new_client.start_workflow( + BasicTraceWorkflow.run, + id=f"research-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=120), + ) + await workflow_handle.result() + + spans = exporter.get_finished_spans() + print_otel_spans(spans) + print(dump_spans(spans, with_attributes=False)) + # assert len(spans) == 7 + + expected_hierarchy = [ + "Research workflow", + " Completed Span", + " Hello World", + " Activity", + " Activity", + " Inner", + " Activity", + " Not context", + ] + + # Verify the span hierarchy matches expectations + actual_hierarchy = dump_spans(spans, with_attributes=False) + assert ( + actual_hierarchy == expected_hierarchy + ), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}" + + +@workflow.defn +class ComprehensiveWorkflow: + def __init__(self) -> None: + self._signal_count = 0 + self._update_completed = False + self._nexus_result: str = "" + + @workflow.run + async def run(self, actions: list[str]) -> dict[str, str]: + results = {} + tracer = temporalio.contrib.opentelemetry.workflow.tracer() + with tracer.start_as_current_span("MainWorkflow"): + for action in actions: + if action == "activity": + with tracer.start_as_current_span("ActivitySection"): + result = await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + results["activity"] = result + + elif action == "local_activity": + with tracer.start_as_current_span("LocalActivitySection"): + result = await workflow.execute_local_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + results["local_activity"] = result + + elif action == "child_workflow": + with tracer.start_as_current_span("ChildWorkflowSection"): + child_handle = await workflow.start_child_workflow( + BasicTraceWorkflow.run, + id=f"child-{workflow.info().workflow_id}", + ) + await child_handle + results["child_workflow"] = "completed" + + elif action == "timer": + with tracer.start_as_current_span("TimerSection"): + await workflow.sleep(0.01) + results["timer"] = "completed" + + elif action == "wait_signal": + with tracer.start_as_current_span("WaitSignalSection"): + await workflow.wait_condition(lambda: self._signal_count > 0) + results["wait_signal"] = ( + f"received_{self._signal_count}_signals" + ) + + elif action == "wait_update": + with tracer.start_as_current_span("WaitUpdateSection"): + await 
workflow.wait_condition(lambda: self._update_completed) + results["wait_update"] = "update_received" + + elif action == "nexus": + with tracer.start_as_current_span("NexusSection"): + nexus_client = workflow.create_nexus_client( + endpoint=make_nexus_endpoint_name( + workflow.info().task_queue + ), + service=ComprehensiveNexusService, + ) + nexus_handle = await nexus_client.start_operation( + operation=ComprehensiveNexusService.test_operation, + input="test-input", + ) + nexus_result = await nexus_handle + results["nexus"] = nexus_result + + elif action == "continue_as_new": + with tracer.start_as_current_span("ContinueAsNewSection"): + if ( + len(results) > 0 + ): # Only continue as new if we've done some work + workflow.continue_as_new( + [] + ) # Empty actions to finish quickly + results["continue_as_new"] = "prepared" + + return results + + @workflow.query + def get_status(self) -> dict[str, Any]: + return { + "signal_count": self._signal_count, + "update_completed": self._update_completed, + } + + @workflow.signal + def notify(self, message: str) -> None: # type: ignore[reportUnusedParameter] + self._signal_count += 1 + + @workflow.update + def update_status(self, status: str) -> str: + self._update_completed = True + return f"updated_to_{status}" + + @update_status.validator + def validate_update_status(self, status: str) -> None: + if not status: + raise ValueError("Status cannot be empty") + + +async def test_opentelemetry_comprehensive_tracing( + client: Client, + env: WorkflowEnvironment, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): + """Test OpenTelemetry v2 integration across all workflow operations.""" + if env.supports_time_skipping: + pytest.skip("Fails on java test server.") + + exporter = InMemorySpanExporter() + provider = init_tracer_provider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + opentelemetry.trace.set_tracer_provider(provider) + + new_config = client.config() + new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)] + new_client = Client(**new_config) + + async with new_worker( + new_client, + ComprehensiveWorkflow, + BasicTraceWorkflow, # For child workflow + SimpleNexusWorkflow, # For Nexus operation + activities=[simple_no_context_activity], + nexus_service_handlers=[ComprehensiveNexusService()], + max_cached_workflows=0, + ) as worker: + # Create Nexus endpoint for this task queue + await create_nexus_endpoint(worker.task_queue, new_client) + + with get_tracer(__name__).start_as_current_span("ComprehensiveTest") as span: + span.set_attribute("test.type", "comprehensive") + + # Start workflow with various actions + workflow_handle = await new_client.start_workflow( + ComprehensiveWorkflow.run, + [ + "activity", + "local_activity", + "child_workflow", + "timer", + "nexus", + "wait_signal", + "wait_update", + ], + id=f"comprehensive-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=120), + ) + + logger.info(f"Comprehensive workflow query") + + # Test query + status = await workflow_handle.query(ComprehensiveWorkflow.get_status) + assert status["signal_count"] == 0 + + logger.info(f"Comprehensive workflow signal") + + # Test signal + await workflow_handle.signal(ComprehensiveWorkflow.notify, "test-signal-1") + await workflow_handle.signal(ComprehensiveWorkflow.notify, "test-signal-2") + + logger.info(f"Comprehensive workflow update") + + # Test update + update_result = await workflow_handle.execute_update( + ComprehensiveWorkflow.update_status, 
"active" + ) + assert update_result == "updated_to_active" + + logger.info(f"Comprehensive workflow get result") + + # Get final result + result = await workflow_handle.result() + + # Verify results + expected_keys = { + "activity", + "local_activity", + "child_workflow", + "timer", + "nexus", + "wait_signal", + "wait_update", + } + assert all(key in result for key in expected_keys) + assert result["activity"] == "success" + assert result["local_activity"] == "success" + assert result["child_workflow"] == "completed" + assert result["timer"] == "completed" + assert result["nexus"] == "nexus-result-test-input" + assert result["wait_signal"] == "received_2_signals" + assert result["wait_update"] == "update_received" + + spans = exporter.get_finished_spans() + + # Note: Even though we call signal twice, dump_spans() deduplicates signal spans + # as they "can duplicate in rare situations" according to the original test + + expected_hierarchy = [ + "ComprehensiveTest", + " StartWorkflow:ComprehensiveWorkflow", + " RunWorkflow:ComprehensiveWorkflow", + " MainWorkflow", + " ActivitySection", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " LocalActivitySection", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " ChildWorkflowSection", + " StartChildWorkflow:BasicTraceWorkflow", + " RunWorkflow:BasicTraceWorkflow", + " Completed Span", + " Hello World", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " Inner", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " Not context", + " TimerSection", + " NexusSection", + " StartNexusOperation:ComprehensiveNexusService/test_operation", + " RunStartNexusOperationHandler:ComprehensiveNexusService/test_operation", + " StartWorkflow:SimpleNexusWorkflow", + " RunWorkflow:SimpleNexusWorkflow", + " WaitSignalSection", + " WaitUpdateSection", + " QueryWorkflow:get_status", + " HandleQuery:get_status", + " SignalWorkflow:notify", + " HandleSignal:notify", + " StartWorkflowUpdate:update_status", + " ValidateUpdate:update_status", + " HandleUpdate:update_status", + ] + + # Verify the span hierarchy matches expectations + actual_hierarchy = dump_spans(spans, with_attributes=False) + assert ( + actual_hierarchy == expected_hierarchy + ), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}" + + +async def test_otel_tracing_with_added_spans( + client: Client, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): + exporter = InMemorySpanExporter() + provider = init_tracer_provider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + opentelemetry.trace.set_tracer_provider(provider) + + plugin = OpenTelemetryPlugin(add_temporal_spans=True) + new_config = client.config() + new_config["plugins"] = [plugin] + new_client = Client(**new_config) + + async with new_worker( + new_client, + BasicTraceWorkflow, + activities=[simple_no_context_activity], + max_cached_workflows=0, + ) as worker: + with get_tracer(__name__).start_as_current_span("Research workflow"): + workflow_handle = await new_client.start_workflow( + BasicTraceWorkflow.run, + id=f"research-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=120), + ) + 
await workflow_handle.result() + + spans = exporter.get_finished_spans() + + expected_hierarchy = [ + "Research workflow", + " StartWorkflow:BasicTraceWorkflow", + " RunWorkflow:BasicTraceWorkflow", + " Completed Span", + " Hello World", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " Inner", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " Not context", + ] + + # Verify the span hierarchy matches expectations + actual_hierarchy = dump_spans(spans, with_attributes=False) + assert ( + actual_hierarchy == expected_hierarchy + ), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}"
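
Usage sketch (not part of the patch): the wiring below is distilled from `test_otel_tracing_basic` and the `OpenTelemetryPlugin` docstring above, showing how a reviewer might exercise the new package end to end. The server address, task queue, and the `GreetingWorkflow`/`greet` definitions are illustrative placeholders, `ConsoleSpanExporter` stands in for whatever exporter is actually configured, and passing `plugins=` to `Client.connect` assumes the plugin support that `SimplePlugin` builds on; the tests instead copy `client.config()` and set `config["plugins"]`.

import asyncio
from datetime import timedelta

import opentelemetry.trace
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

import temporalio.contrib.opentelemetry.workflow as otel_workflow
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, init_tracer_provider
from temporalio.worker import Worker


@activity.defn
async def greet(name: str) -> str:
    # Placeholder activity used only for this sketch.
    return f"Hello, {name}!"


@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        # Workflow-safe tracer supplied by the plugin; spans started here are
        # parented under the workflow's context and suppressed during replay.
        with otel_workflow.tracer().start_as_current_span("Greet"):
            return await workflow.execute_activity(
                greet, name, start_to_close_timeout=timedelta(seconds=10)
            )


async def main() -> None:
    # The global provider must be the replay-safe provider *before* the plugin
    # is constructed, since OpenTelemetryPlugin validates it at init time.
    provider = init_tracer_provider()
    provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
    opentelemetry.trace.set_tracer_provider(provider)

    # Assumes Client.connect accepts `plugins`; the client interceptor is
    # reused by workers created from this client (see temporalio/plugin.py).
    client = await Client.connect(
        "localhost:7233",  # placeholder server address
        plugins=[OpenTelemetryPlugin(add_temporal_spans=True)],
    )

    async with Worker(
        client,
        task_queue="otel-example",  # placeholder task queue
        workflows=[GreetingWorkflow],
        activities=[greet],
    ):
        result = await client.execute_workflow(
            GreetingWorkflow.run,
            "Temporal",
            id="otel-example-wf",
            task_queue="otel-example",
        )
        print(result)


if __name__ == "__main__":
    asyncio.run(main())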