140 changes: 138 additions & 2 deletions README.md
@@ -54,6 +54,7 @@ informal introduction to the features and their implementation.
- [Data Conversion](#data-conversion)
- [Pydantic Support](#pydantic-support)
- [Custom Type Data Conversion](#custom-type-data-conversion)
- [External Payload Storage](#external-payload-storage)
- [Workers](#workers)
- [Workflows](#workflows)
- [Definition](#definition)
@@ -309,8 +310,9 @@ other_ns_client = Client(**config)

Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
`temporalio.converter.DataConverter` can be set via the `data_converter` parameter of the `Client` constructor. Data
-converters are a combination of payload converters, payload codecs, and failure converters. Payload converters convert
-Python values to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
+converters are a combination of payload converters, external payload storage, payload codecs, and failure converters. Payload
+converters convert Python values to/from serialized bytes. External payload storage optionally stores and retrieves payloads
+to/from external storage services using drivers. Payload codecs convert bytes to bytes (e.g. for compression or encryption).
Failure converters convert exceptions to/from serialized failures.

The default data converter supports converting multiple types including:
@@ -455,6 +457,140 @@ my_data_converter = dataclasses.replace(

Now `IPv4Address` can be used in type hints including collections, optionals, etc.

##### External Payload Storage

⚠️ **External payload storage support is currently at an experimental release stage.** ⚠️

External payload storage allows large payloads to be offloaded to an external storage service (such as Amazon S3) rather than stored inline in workflow history. This is useful when workflows or activities work with data that would otherwise exceed Temporal's payload size limits.

External payload storage is configured via the `external_storage` parameter on `DataConverter`, which accepts a `temporalio.extstore.StorageConfig` instance. Any driver used to store payloads must also be configured on the component that retrieves them — for example, if the client stores workflow inputs using a driver, the worker must include that driver in its `StorageConfig.drivers` list to retrieve them.

The simplest setup uses a single storage driver:

```python
import dataclasses
from temporalio.client import Client
from temporalio.converter import DataConverter
from temporalio.extstore import StorageConfig

driver = MyDriver()  # any StorageDriver implementation; see "Custom Drivers" below

client = await Client.connect(
"localhost:7233",
data_converter=dataclasses.replace(
DataConverter.default,
external_storage=StorageConfig(drivers=[driver]),
),
)
```

Some things to note about external payload storage:

* Only payloads that meet or exceed `StorageConfig.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal.
* External payload storage applies transparently to workflow inputs/outputs, activity inputs/outputs, signals, updates, queries, and failure details.
* The `DataConverter`'s `payload_codec` (if configured) is applied to the *reference* payload stored in workflow history, not to the externally stored bytes. To encrypt or compress the bytes handed to a driver, use `StorageConfig.payload_codec`.
* Setting `StorageConfig.payload_size_threshold` to `None` causes every payload to be considered for external payload storage regardless of size.
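
Because payloads offloaded by the client must be retrievable by the worker (and vice versa), both sides typically share one data converter. A minimal wiring sketch — `MyDriver` is a hypothetical `StorageDriver` implementation like the one in the Custom Drivers section below, and `MyWorkflow`/`my_activity` are placeholders:

```python
import dataclasses

from temporalio.client import Client
from temporalio.converter import DataConverter
from temporalio.extstore import StorageConfig
from temporalio.worker import Worker

converter = dataclasses.replace(
    DataConverter.default,
    external_storage=StorageConfig(drivers=[MyDriver()]),
)

client = await Client.connect("localhost:7233", data_converter=converter)

# The worker uses the client's data converter, so it can both retrieve
# payloads stored via MyDriver and offload new ones with the same driver.
worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
)
await worker.run()
```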

###### Multiple Drivers and Driver Selection

When multiple storage backends are needed, list all drivers in `StorageConfig.drivers` and provide a `driver_selector` to control which driver stores new payloads. Any driver in the list not chosen for storing is still available for retrieval, which is useful when migrating between storage backends.

```python
from temporalio.extstore import StorageConfig

options = StorageConfig(
drivers=[hot_driver, cold_driver],
driver_selector=lambda context, payload: (
hot_driver if payload.ByteSize() < 5 * 1024 * 1024 else cold_driver
),
)
```

For stateful or class-based selection logic, implement a callable class. If it also implements `temporalio.converter.WithSerializationContext`, it will receive workflow or activity context (namespace, workflow ID, etc.) at serialization time, just like a driver or payload codec:

```python
from typing_extensions import Self
import temporalio.converter
import temporalio.extstore
from temporalio.api.common.v1 import Payload

class FeatureFlaggedDriverSelector(temporalio.converter.WithSerializationContext):
def __init__(self, driver: temporalio.extstore.StorageDriver, enabled: bool = False):
self._driver = driver
self._enabled = enabled

def __call__(
self, _context: temporalio.extstore.StorageDriverContext, _payload: Payload
) -> temporalio.extstore.StorageDriver | None:
return self._driver if self._enabled else None

def with_context(self, context: temporalio.converter.SerializationContext) -> Self:
workflow_id = None
if isinstance(context, temporalio.converter.WorkflowSerializationContext) and context.workflow_id:
workflow_id = context.workflow_id
elif isinstance(context, temporalio.converter.ActivitySerializationContext) and context.workflow_id:
workflow_id = context.workflow_id

return FeatureFlaggedDriverSelector(
self._driver, FeatureFlaggedDriverSelector.feature_flag_is_on(workflow_id)
)

@staticmethod
def feature_flag_is_on(workflow_id: str | None) -> bool:
"""Mock implementation of a feature flag based on a workflow ID."""
return workflow_id is not None and len(workflow_id) % 2 == 0
```
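
The selector above is then passed as `driver_selector` when building the storage options (here `driver` is assumed to be an already-constructed `StorageDriver` instance):

```python
from temporalio.extstore import StorageConfig

options = StorageConfig(
    drivers=[driver],
    driver_selector=FeatureFlaggedDriverSelector(driver),
)
```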

Some things to note about driver selection:

* When no `driver_selector` is set, the first driver in `StorageConfig.drivers` is always used for storing.
* Returning `None` from a selector leaves the payload stored inline in workflow history rather than offloading it.
* The driver returned by the selector must be registered in `StorageConfig.drivers`. If it is not, a `RuntimeError` is raised.
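
Combined with the first-driver default, a storage-backend migration needs no selector at all — a sketch, assuming `new_driver` and `old_driver` are configured `StorageDriver` instances:

```python
from temporalio.extstore import StorageConfig

# New payloads are stored by new_driver (first in the list); payloads
# previously offloaded by old_driver remain retrievable because it is
# still registered in the drivers list.
options = StorageConfig(drivers=[new_driver, old_driver])
```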

###### Custom Drivers

Implement `temporalio.extstore.StorageDriver` to integrate with an external storage system:

```python
from collections.abc import Sequence
from temporalio.extstore import StorageDriver, StorageDriverClaim, StorageDriverContext
from temporalio.api.common.v1 import Payload

class MyDriver(StorageDriver):
def __init__(self, driver_name: str | None = None):
self._driver_name = driver_name or "my-org:driver:my-driver"

def name(self) -> str:
return self._driver_name

async def store(
self, context: StorageDriverContext, payloads: Sequence[Payload]
) -> list[StorageDriverClaim]:
claims = []
for payload in payloads:
key = await my_storage.put(payload.SerializeToString())
claims.append(StorageDriverClaim(data={"key": key}))
return claims

async def retrieve(
self, context: StorageDriverContext, claims: Sequence[StorageDriverClaim]
) -> list[Payload]:
payloads = []
for claim in claims:
data = await my_storage.get(claim.data["key"])
p = Payload()
p.ParseFromString(data)
payloads.append(p)
return payloads
```
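
`my_storage` above stands in for your storage system's client; all the driver needs from it is async `put`/`get` of bytes keyed by a string. A dict-backed stand-in (hypothetical, for local experimentation only — a real driver would talk to a service like S3) could look like:

```python
import asyncio
import uuid

class InMemoryStorage:
    """Dict-backed stand-in for an external blob store (testing only)."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    async def put(self, data: bytes) -> str:
        # Generate a unique key, store the blob, and return the key,
        # which the driver embeds in its StorageDriverClaim.
        key = uuid.uuid4().hex
        self._blobs[key] = data
        return key

    async def get(self, key: str) -> bytes:
        return self._blobs[key]

async def main() -> None:
    storage = InMemoryStorage()
    key = await storage.put(b"payload bytes")
    assert await storage.get(key) == b"payload bytes"

asyncio.run(main())
```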

Some things to note about implementing a custom driver:

* `store` and `retrieve` must return lists of the same length as their respective input sequences.
* `StorageDriver.name()` must return a string that is unique among all drivers in `StorageConfig.drivers`. This name is embedded in the reference payload stored in workflow history and used to look up the correct driver during retrieval — changing it after payloads have been stored will break retrieval.
* `StorageDriver.type()` is automatically implemented to return the name of the class. This can be overridden in subclasses but must remain consistent across all instances of the subclass.
* Implement `temporalio.converter.WithSerializationContext` on your driver to receive workflow or activity context (namespace, workflow ID, activity ID, etc.) at serialization time.

### Workers

Workers host workflows and/or activities. Here's how to run a worker:
7 changes: 3 additions & 4 deletions temporalio/bridge/worker.py
@@ -303,10 +303,9 @@ async def decode_activation(
decode_headers: bool,
) -> None:
"""Decode all payloads in the activation."""
-    if data_converter._decode_payload_has_effect:
-        await CommandAwarePayloadVisitor(
-            skip_search_attributes=True, skip_headers=not decode_headers
-        ).visit(_Visitor(data_converter._decode_payload_sequence), activation)
+    await CommandAwarePayloadVisitor(
+        skip_search_attributes=True, skip_headers=not decode_headers
+    ).visit(_Visitor(data_converter._decode_payload_sequence), activation)


async def encode_completion(
57 changes: 44 additions & 13 deletions temporalio/converter.py
Expand Up @@ -21,6 +21,7 @@
from itertools import zip_longest
from logging import getLogger
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Literal,
@@ -44,6 +45,9 @@
import temporalio.exceptions
import temporalio.types

if TYPE_CHECKING:
    import temporalio.extstore  # avoid circular import at runtime

> **Contributor review comment:** Unless you are hitting a bug, I'd be ok just importing this regardless of whether type checking (even if at the moment it is only for type checking)

if sys.version_info < (3, 11):
# Python's datetime.fromisoformat doesn't support certain formats pre-3.11
from dateutil import parser # type: ignore
@@ -1359,15 +1363,25 @@ class DataConverter(WithSerializationContext):
payload_limits: PayloadLimitsConfig = PayloadLimitsConfig()
"""Settings for payload size limits."""

external_storage: temporalio.extstore.StorageConfig | None = None
"""Options for external storage. If None, external storage is disabled.

.. warning::
This API is experimental.
"""

default: ClassVar[DataConverter]
"""Singleton default data converter."""

_storage_impl: temporalio.extstore._StorageImpl = dataclasses.field(init=False)

_payload_error_limits: _ServerPayloadErrorLimits | None = None
"""Server-reported limits for payloads."""

def __post_init__(self) -> None: # noqa: D105
object.__setattr__(self, "payload_converter", self.payload_converter_class())
object.__setattr__(self, "failure_converter", self.failure_converter_class())
self._reset_external_storage_middleware()

async def encode(
self, values: Sequence[Any]
@@ -1445,27 +1459,44 @@ def with_context(self, context: SerializationContext) -> Self:
payload_converter = self.payload_converter
payload_codec = self.payload_codec
failure_converter = self.failure_converter
external_storage = self.external_storage
if isinstance(payload_converter, WithSerializationContext):
payload_converter = payload_converter.with_context(context)
if isinstance(payload_codec, WithSerializationContext):
payload_codec = payload_codec.with_context(context)
if isinstance(failure_converter, WithSerializationContext):
failure_converter = failure_converter.with_context(context)
if isinstance(external_storage, WithSerializationContext):
external_storage = external_storage.with_context(context)
if all(
new is orig
for new, orig in [
(payload_converter, self.payload_converter),
(payload_codec, self.payload_codec),
(failure_converter, self.failure_converter),
(external_storage, self.external_storage),
]
):
return self
cloned = dataclasses.replace(self)
object.__setattr__(cloned, "payload_converter", payload_converter)
object.__setattr__(cloned, "payload_codec", payload_codec)
object.__setattr__(cloned, "failure_converter", failure_converter)
object.__setattr__(cloned, "external_storage", external_storage)
cloned._reset_external_storage_middleware(context)
return cloned

def _reset_external_storage_middleware(
self, context: SerializationContext | None = None
) -> None:
import temporalio.extstore # lazy import to avoid circular dependency

object.__setattr__(
self,
"_external_storage_middleware",
temporalio.extstore._StorageImpl(self.external_storage, context),
)

def _with_payload_error_limits(
self, limits: _ServerPayloadErrorLimits | None
) -> DataConverter:
@@ -1523,48 +1554,48 @@ async def _encode_memo_existing(
async def _encode_payload(
self, payload: temporalio.api.common.v1.Payload
) -> temporalio.api.common.v1.Payload:
payload = await self._storage_impl.store_payload(payload)
if self.payload_codec:
payload = (await self.payload_codec.encode([payload]))[0]
self._validate_payload_limits([payload])
return payload

async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
await self._storage_impl.store_payloads(payloads)
if self.payload_codec:
await self.payload_codec.encode_wrapper(payloads)
self._validate_payload_limits(payloads.payloads)

async def _encode_payload_sequence(
self, payloads: Sequence[temporalio.api.common.v1.Payload]
) -> list[temporalio.api.common.v1.Payload]:
-        encoded_payloads = list(payloads)
+        result = await self._storage_impl.store_payload_sequence(payloads)
         if self.payload_codec:
-            encoded_payloads = await self.payload_codec.encode(encoded_payloads)
-        self._validate_payload_limits(encoded_payloads)
-        return encoded_payloads
+            result = await self.payload_codec.encode(result)
+        self._validate_payload_limits(result)
+        return result

async def _decode_payload(
self, payload: temporalio.api.common.v1.Payload
) -> temporalio.api.common.v1.Payload:
if self.payload_codec:
payload = (await self.payload_codec.decode([payload]))[0]
payload = await self._storage_impl.retrieve_payload(payload)
return payload

async def _decode_payloads(self, payloads: temporalio.api.common.v1.Payloads):
if self.payload_codec:
await self.payload_codec.decode_wrapper(payloads)
await self._storage_impl.retrieve_payloads(payloads)

async def _decode_payload_sequence(
self, payloads: Sequence[temporalio.api.common.v1.Payload]
) -> list[temporalio.api.common.v1.Payload]:
-        if not self.payload_codec:
-            return list(payloads)
-        return await self.payload_codec.decode(payloads)
-
-    # Temporary shortcircuit detection while the _decode_* methods may no-op if
-    # a payload codec is not configured. Remove once those paths have more to them.
-    @property
-    def _decode_payload_has_effect(self) -> bool:
-        return self.payload_codec is not None
+        result = list(payloads)
+        if self.payload_codec:
+            result = await self.payload_codec.decode(result)
+        result = await self._storage_impl.retrieve_payload_sequence(result)
+        return result

@staticmethod
async def _apply_to_failure_payloads(