Skip to content
80 changes: 80 additions & 0 deletions temporalio/_log_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Internal utilities for Temporal logging.

This module is internal and may change at any time.
"""

from __future__ import annotations

from collections.abc import Mapping, MutableMapping
from typing import Any, Literal

TemporalLogExtraMode = Literal["dict", "flatten"]
"""Mode controlling how Temporal context is added to log record extra.

Values:
dict: (default) Add context as a nested dictionary under a single key.
This is the original behavior. Suitable for logging handlers that
support nested structures.
flatten: Add each context field as a separate top-level key with a
namespaced prefix. Values that are not primitives (str/int/float/bool)
are converted to strings. This mode is recommended for OpenTelemetry
and other logging pipelines that require flat, scalar attributes.
"""


def _apply_temporal_context_to_extra(
extra: MutableMapping[str, Any],
*,
key: str,
ctx: Mapping[str, Any],
mode: TemporalLogExtraMode,
) -> None:
"""Apply temporal context to log record extra based on the configured mode.

Args:
extra: The mutable extra dict to update.
key: The base key (e.g., "temporal_workflow"). In dict mode this is
used directly. In flatten mode the prefix is derived by replacing
underscores with dots (e.g., "temporal.workflow").
ctx: The context mapping containing temporal fields.
mode: The mode controlling how context is added.
"""
if mode == "flatten":
prefix = key.replace("_", ".")
for k, v in ctx.items():
# Ensure value is a primitive type safe for OTel attributes
if not isinstance(v, (str, int, float, bool, type(None))):
v = str(v)
extra[f"{prefix}.{k}"] = v
else:
extra[key] = dict(ctx)


def _update_temporal_context_in_extra(
extra: MutableMapping[str, Any],
*,
key: str,
update_ctx: Mapping[str, Any],
mode: TemporalLogExtraMode,
) -> None:
"""Update existing temporal context in extra with additional fields.

This is used when adding update info to existing workflow context.

Args:
extra: The mutable extra dict to update.
key: The base key (e.g., "temporal_workflow"). In dict mode this is
used directly. In flatten mode the prefix is derived by replacing
underscores with dots (e.g., "temporal.workflow").
update_ctx: Additional context fields to add/update.
mode: The mode controlling how context is added.
"""
if mode == "flatten":
prefix = key.replace("_", ".")
for k, v in update_ctx.items():
# Ensure value is a primitive type safe for OTel attributes
if not isinstance(v, (str, int, float, bool, type(None))):
v = str(v)
extra[f"{prefix}.{k}"] = v
else:
extra.setdefault(key, {}).update(update_ctx)
13 changes: 12 additions & 1 deletion temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import temporalio.common
import temporalio.converter

from ._log_utils import TemporalLogExtraMode, _apply_temporal_context_to_extra
from .types import CallableType

if TYPE_CHECKING:
Expand Down Expand Up @@ -474,6 +475,10 @@ class LoggerAdapter(logging.LoggerAdapter):
value will be added to the ``extra`` dictionary with the entire
activity info, making it present on the ``LogRecord.__dict__`` for
use by others. Default is False.
temporal_extra_mode: Controls how activity context is added to log
``extra``. Default is ``"dict"`` (current behavior). Set to
``"flatten"`` for OpenTelemetry compatibility (scalar attributes
with ``temporal.activity.`` prefix).
"""

def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> None:
Expand All @@ -482,6 +487,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N
self.activity_info_on_message = True
self.activity_info_on_extra = True
self.full_activity_info_on_extra = False
self.temporal_extra_mode: TemporalLogExtraMode = "dict"

def process(
self, msg: Any, kwargs: MutableMapping[str, Any]
Expand All @@ -499,7 +505,12 @@ def process(
if self.activity_info_on_extra:
# Extra can be absent or None, this handles both
extra = kwargs.get("extra", None) or {}
extra["temporal_activity"] = context.logger_details
_apply_temporal_context_to_extra(
extra,
key="temporal_activity",
ctx=context.logger_details,
mode=self.temporal_extra_mode,
)
kwargs["extra"] = extra
if self.full_activity_info_on_extra:
# Extra can be absent or None, this handles both
Expand Down
24 changes: 22 additions & 2 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
import temporalio.workflow
from temporalio.nexus._util import ServiceHandlerT

from ._log_utils import (
TemporalLogExtraMode,
_apply_temporal_context_to_extra,
_update_temporal_context_in_extra,
)
from .types import (
AnyType,
CallableAsyncNoParam,
Expand Down Expand Up @@ -1619,6 +1624,10 @@ class LoggerAdapter(logging.LoggerAdapter):
use by others. Default is False.
log_during_replay: Boolean for whether logs should occur during replay.
Default is False.
temporal_extra_mode: Controls how workflow context is added to log
``extra``. Default is ``"dict"`` (current behavior). Set to
``"flatten"`` for OpenTelemetry compatibility (scalar attributes
with ``temporal.workflow.`` prefix).

Values added to ``extra`` are merged with the ``extra`` dictionary from a
logging call, with values from the logging call taking precedence. I.e. the
Expand All @@ -1632,6 +1641,7 @@ def __init__(self, logger: logging.Logger, extra: Mapping[str, Any] | None) -> N
self.workflow_info_on_extra = True
self.full_workflow_info_on_extra = False
self.log_during_replay = False
self.temporal_extra_mode: TemporalLogExtraMode = "dict"
self.disable_sandbox = False

def process(
Expand All @@ -1652,7 +1662,12 @@ def process(
if self.workflow_info_on_message:
msg_extra.update(workflow_details)
if self.workflow_info_on_extra:
extra["temporal_workflow"] = workflow_details
_apply_temporal_context_to_extra(
extra,
key="temporal_workflow",
ctx=workflow_details,
mode=self.temporal_extra_mode,
)
if self.full_workflow_info_on_extra:
extra["workflow_info"] = runtime.workflow_info()
update_info = current_update_info()
Expand All @@ -1661,7 +1676,12 @@ def process(
if self.workflow_info_on_message:
msg_extra.update(update_details)
if self.workflow_info_on_extra:
extra.setdefault("temporal_workflow", {}).update(update_details)
_update_temporal_context_in_extra(
extra,
key="temporal_workflow",
update_ctx=update_details,
mode=self.temporal_extra_mode,
)

kwargs["extra"] = {**extra, **(kwargs.get("extra") or {})}
if msg_extra:
Expand Down
Loading