diff --git a/README.md b/README.md index d4d6a61b..2cb2d620 100644 --- a/README.md +++ b/README.md @@ -75,10 +75,14 @@ Some examples require extra dependencies. See each sample's directory for specif * [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates. * [message_passing/safe_message_handlers](message_passing/safe_message_handlers/) - Safely handling updates and signals. * [message_passing/update_with_start/lazy_initialization](message_passing/update_with_start/lazy_initialization/) - Use update-with-start to update a Shopping Cart, starting it if it does not exist. +* [Nexus Messaging](nexus_messaging): Demonstrates how send signal, update and query messages through Nexus. + This contains two samples, one sending messages to an existing workflow and a second that creates a workflow through Nexus + and sends messages to it. * [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry. * [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`. * [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion. * [prometheus](prometheus) - Configure Prometheus metrics on clients/workers. +* [workflow_streams](workflow_streams) - Workflow-hosted durable event stream via `temporalio.contrib.workflow_streams`. **Experimental** * [pydantic_converter](pydantic_converter) - Data converter for using Pydantic models. * [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule. * [sentry](sentry) - Report errors to Sentry. diff --git a/hello_nexus/handler/service_handler.py b/hello_nexus/handler/service_handler.py index 1295abd1..812a5c5d 100644 --- a/hello_nexus/handler/service_handler.py +++ b/hello_nexus/handler/service_handler.py @@ -4,8 +4,6 @@ from __future__ import annotations -import uuid - import nexusrpc from temporalio import nexus @@ -33,7 +31,7 @@ async def my_workflow_run_operation( return await ctx.start_workflow( WorkflowStartedByNexusOperation.run, input, - id=str(uuid.uuid4()), + id=f"hello-nexus-workflow-{input.name}", ) # This is a Nexus operation that responds synchronously to all requests. That means diff --git a/nexus_cancel/handler/service_handler.py b/nexus_cancel/handler/service_handler.py index 92868510..5d75e8a6 100644 --- a/nexus_cancel/handler/service_handler.py +++ b/nexus_cancel/handler/service_handler.py @@ -1,8 +1,9 @@ """ Nexus service handler for the cancellation sample. -The hello operation is backed by a workflow, using the Nexus request ID as the -workflow ID for idempotency across retries. +The hello operation is backed by a workflow whose ID is derived from the +operation input (name + language), giving each fan-out branch a distinct, +meaningful business ID. """ from __future__ import annotations @@ -23,5 +24,5 @@ async def hello( return await ctx.start_workflow( HelloHandlerWorkflow.run, input, - id=ctx.request_id, + id=f"hello-handler-{input.name}-{input.language.name}", ) diff --git a/nexus_messaging/README.md b/nexus_messaging/README.md new file mode 100644 index 00000000..3103fc0f --- /dev/null +++ b/nexus_messaging/README.md @@ -0,0 +1,16 @@ +This sample shows how to expose a long-running Workflow's queries, updates, and signals as Nexus +operations. There are two self-contained examples, each in its own directory: + +| | `callerpattern/` | `ondemandpattern/` | +|--------------------------------|--------------------------------------|--------------------------------------------------------------| +| **Pattern** | Signal an existing Workflow | Create and run Workflows on demand, and send signals to them | +| **Who creates the Workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation | +| **Who knows the Workflow ID?** | Only the handler | The caller chooses and passes it in every operation | +| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` | + +Each directory is fully self-contained for clarity. The `GreetingWorkflow`, activity, and +`Language` enum are **identical** between the two -- only the Nexus service definition and its +handler implementation differ. This highlights that the same Workflow can be exposed through +Nexus in different ways depending on whether the caller needs lifecycle control. + +See each directory's README for running instructions. diff --git a/nexus_sync_operations/__init__.py b/nexus_messaging/__init__.py similarity index 100% rename from nexus_sync_operations/__init__.py rename to nexus_messaging/__init__.py diff --git a/nexus_messaging/callerpattern/README.md b/nexus_messaging/callerpattern/README.md new file mode 100644 index 00000000..9458ae43 --- /dev/null +++ b/nexus_messaging/callerpattern/README.md @@ -0,0 +1,56 @@ +## Caller pattern + +The handler worker starts a `GreetingWorkflow` for a User ID. +`NexusGreetingServiceHandler` holds that ID and routes every Nexus operation to it. +The caller's input does not have that Workflow ID as the caller doesn't know it -- but the caller +sends in the User ID, and `NexusGreetingServiceHandler` knows how to get the desired Workflow ID +from that User ID (see the `get_workflow_id` call). + +The handler worker uses the same `get_workflow_id` call to generate a Workflow ID from a Wser ID +when it launches the Workflow. + +The caller Workflow: +1. Queries for supported languages (`get_languages` -- backed by a `@workflow.query`) +2. Changes the language to Arabic (`set_language` -- backed by a `@workflow.update` that calls an activity) +3. Confirms the change via a second query (`get_language`) +4. Approves the Workflow (`approve` -- backed by a `@workflow.signal`) + +### Running + +Start a Temporal server: + +```bash +temporal server start-dev +``` + +Create the namespaces and Nexus endpoint: + +```bash +temporal operator namespace create --namespace nexus-messaging-handler-namespace +temporal operator namespace create --namespace nexus-messaging-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-messaging-nexus-endpoint \ + --target-namespace nexus-messaging-handler-namespace \ + --target-task-queue nexus-messaging-handler-task-queue +``` + +In one terminal, start the handler worker: + +```bash +uv run python -m nexus_messaging.callerpattern.handler.worker +``` + +In another terminal, run the following command to start the example: + +```bash +uv run python -m nexus_messaging.callerpattern.caller.app +``` + +Expected output: + +``` +Supported languages: [, ] +Language changed: ENGLISH -> ARABIC +Workflow approved +``` diff --git a/nexus_sync_operations/caller/__init__.py b/nexus_messaging/callerpattern/__init__.py similarity index 100% rename from nexus_sync_operations/caller/__init__.py rename to nexus_messaging/callerpattern/__init__.py diff --git a/nexus_sync_operations/handler/__init__.py b/nexus_messaging/callerpattern/caller/__init__.py similarity index 100% rename from nexus_sync_operations/handler/__init__.py rename to nexus_messaging/callerpattern/caller/__init__.py diff --git a/nexus_sync_operations/caller/app.py b/nexus_messaging/callerpattern/caller/app.py similarity index 73% rename from nexus_sync_operations/caller/app.py rename to nexus_messaging/callerpattern/caller/app.py index 375628d2..933dcd5d 100644 --- a/nexus_sync_operations/caller/app.py +++ b/nexus_messaging/callerpattern/caller/app.py @@ -6,15 +6,13 @@ from temporalio.envconfig import ClientConfig from temporalio.worker import Worker -from nexus_sync_operations.caller.workflows import CallerWorkflow +from nexus_messaging.callerpattern.caller.workflows import CallerWorkflow -NAMESPACE = "nexus-sync-operations-caller-namespace" -TASK_QUEUE = "nexus-sync-operations-caller-task-queue" +NAMESPACE = "nexus-messaging-caller-namespace" +TASK_QUEUE = "nexus-messaging-caller-task-queue" -async def execute_caller_workflow( - client: Optional[Client] = None, -) -> None: +async def execute_caller_workflow(client: Optional[Client] = None) -> None: if client is None: config = ClientConfig.load_client_connect_config() config.setdefault("target_host", "localhost:7233") @@ -28,7 +26,8 @@ async def execute_caller_workflow( ): log = await client.execute_workflow( CallerWorkflow.run, - id=str(uuid.uuid4()), + arg="user-1", + id=f"nexus-messaging-caller-{uuid.uuid4()}", task_queue=TASK_QUEUE, ) for line in log: diff --git a/nexus_messaging/callerpattern/caller/workflows.py b/nexus_messaging/callerpattern/caller/workflows.py new file mode 100644 index 00000000..04f79dad --- /dev/null +++ b/nexus_messaging/callerpattern/caller/workflows.py @@ -0,0 +1,73 @@ +""" +A caller workflow that executes Nexus operations. The caller does not have information +about how these operations are implemented by the Nexus service. +""" + +from temporalio import workflow +from temporalio.exceptions import ApplicationError + +from nexus_messaging.callerpattern.service import ( + ApproveInput, + GetLanguageInput, + GetLanguagesInput, + Language, + NexusGreetingService, + SetLanguageInput, +) + +NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" + + +@workflow.defn +class CallerWorkflow: + @workflow.run + async def run(self, user_id: str) -> list[str]: + log: list[str] = [] + nexus_client = workflow.create_nexus_client( + service=NexusGreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + # Call a Nexus operation backed by a query against the entity workflow. + # The workflow must already be running on the handler, otherwise you will + # get an error saying the workflow has already terminated. + languages_output = await nexus_client.execute_operation( + NexusGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=user_id), + ) + log.append(f"Supported languages: {languages_output.languages}") + workflow.logger.info("Supported languages: %s", languages_output.languages) + + # Following are examples for each of the three messaging types - + # update, query, then signal. + + # Call a Nexus operation backed by an update against the entity workflow. + previous_language = await nexus_client.execute_operation( + NexusGreetingService.set_language, + SetLanguageInput(language=Language.ARABIC, user_id=user_id), + ) + + # Call a Nexus operation backed by a query to confirm the language change. + current_language = await nexus_client.execute_operation( + NexusGreetingService.get_language, + GetLanguageInput(user_id=user_id), + ) + if current_language != Language.ARABIC: + raise ApplicationError(f"Expected language ARABIC, got {current_language}") + + log.append( + f"Language changed: {previous_language.name} -> {Language.ARABIC.name}" + ) + workflow.logger.info( + "Language changed from %s to %s", previous_language, Language.ARABIC + ) + + # Call a Nexus operation backed by a signal against the entity workflow. + await nexus_client.execute_operation( + NexusGreetingService.approve, + ApproveInput(name="caller", user_id=user_id), + ) + log.append("Workflow approved") + workflow.logger.info("Workflow approved") + + return log diff --git a/nexus_messaging/callerpattern/handler/__init__.py b/nexus_messaging/callerpattern/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_messaging/callerpattern/handler/activities.py b/nexus_messaging/callerpattern/handler/activities.py new file mode 100644 index 00000000..4031b34f --- /dev/null +++ b/nexus_messaging/callerpattern/handler/activities.py @@ -0,0 +1,22 @@ +import asyncio +from typing import Optional + +from temporalio import activity + +from nexus_messaging.callerpattern.service import Language + + +@activity.defn +async def call_greeting_service(language: Language) -> Optional[str]: + """Simulates a call to a remote greeting service. Returns None if unsupported.""" + greetings = { + Language.ARABIC: "\u0645\u0631\u062d\u0628\u0627 \u0628\u0627\u0644\u0639\u0627\u0644\u0645", + Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", + Language.ENGLISH: "Hello, world", + Language.FRENCH: "Bonjour, monde", + Language.HINDI: "\u0928\u092e\u0938\u094d\u0924\u0947 \u0926\u0941\u0928\u093f\u092f\u093e", + Language.PORTUGUESE: "Ol\u00e1 mundo", + Language.SPANISH: "Hola mundo", + } + await asyncio.sleep(0.2) + return greetings.get(language) diff --git a/nexus_messaging/callerpattern/handler/service_handler.py b/nexus_messaging/callerpattern/handler/service_handler.py new file mode 100644 index 00000000..cbc57ead --- /dev/null +++ b/nexus_messaging/callerpattern/handler/service_handler.py @@ -0,0 +1,80 @@ +""" +Nexus operation handler implementation for the entity pattern. Each operation receives a +user_id, which is mapped to a workflow ID. The operations are synchronous because queries +and updates against a running workflow complete quickly. +""" + +from __future__ import annotations + +import nexusrpc +from temporalio import nexus +from temporalio.client import WorkflowHandle + +from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow +from nexus_messaging.callerpattern.service import ( + ApproveInput, + ApproveOutput, + GetLanguageInput, + GetLanguagesInput, + GetLanguagesOutput, + Language, + NexusGreetingService, + SetLanguageInput, +) + +WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_" + + +def get_workflow_id(user_id: str) -> str: + """Map a user ID to a workflow ID. + + This example assumes you might have multiple workflows, one for each user. + If you had a single workflow for all users, you could remove this function, + remove the user_id from each input, and just use a single workflow ID. + """ + return f"{WORKFLOW_ID_PREFIX}{user_id}" + + +@nexusrpc.handler.service_handler(service=NexusGreetingService) +class NexusGreetingServiceHandler: + def _get_workflow_handle( + self, user_id: str + ) -> WorkflowHandle[GreetingWorkflow, str]: + return nexus.client().get_workflow_handle_for( + GreetingWorkflow.run, get_workflow_id(user_id) + ) + + @nexusrpc.handler.sync_operation + async def get_languages( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput + ) -> GetLanguagesOutput: + return await self._get_workflow_handle(input.user_id).query( + GreetingWorkflow.get_languages, input + ) + + @nexusrpc.handler.sync_operation + async def get_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguageInput + ) -> Language: + return await self._get_workflow_handle(input.user_id).query( + GreetingWorkflow.get_language + ) + + # Routes to set_language_using_activity (not set_language) so that new languages not + # already in the greetings map can be fetched via an activity. + @nexusrpc.handler.sync_operation + async def set_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput + ) -> Language: + return await self._get_workflow_handle(input.user_id).execute_update( + GreetingWorkflow.set_language_using_activity, input + ) + + @nexusrpc.handler.sync_operation + async def approve( + self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput + ) -> ApproveOutput: + await self._get_workflow_handle(input.user_id).signal( + GreetingWorkflow.approve, input + ) + return ApproveOutput() diff --git a/nexus_messaging/callerpattern/handler/worker.py b/nexus_messaging/callerpattern/handler/worker.py new file mode 100644 index 00000000..fa8e2c0f --- /dev/null +++ b/nexus_messaging/callerpattern/handler/worker.py @@ -0,0 +1,62 @@ +import asyncio +import logging +from typing import Optional + +from temporalio.client import Client +from temporalio.common import WorkflowIDConflictPolicy +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_messaging.callerpattern.handler.activities import call_greeting_service +from nexus_messaging.callerpattern.handler.service_handler import ( + NexusGreetingServiceHandler, + get_workflow_id, +) +from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow + +interrupt_event = asyncio.Event() + +NAMESPACE = "nexus-messaging-handler-namespace" +TASK_QUEUE = "nexus-messaging-handler-task-queue" +USER_ID = "user-1" + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + if client is None: + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + # Start the long-running entity workflow that backs the Nexus service, + # if not already running. + workflow_id = get_workflow_id(USER_ID) + await client.start_workflow( + GreetingWorkflow.run, + id=workflow_id, + task_queue=TASK_QUEUE, + id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, + ) + logging.info("Started greeting workflow: %s", workflow_id) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[GreetingWorkflow], + activities=[call_greeting_service], + nexus_service_handlers=[NexusGreetingServiceHandler()], + ): + logging.info("Handler worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_messaging/callerpattern/handler/workflows.py b/nexus_messaging/callerpattern/handler/workflows.py new file mode 100644 index 00000000..fb10d8c5 --- /dev/null +++ b/nexus_messaging/callerpattern/handler/workflows.py @@ -0,0 +1,87 @@ +""" +A long-running "entity" workflow that backs the NexusGreetingService Nexus operations. +The workflow exposes queries, an update, and a signal. These are private implementation +details of the Nexus service: the caller only interacts via Nexus operations. +""" + +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.exceptions import ApplicationError + +from nexus_messaging.callerpattern.handler.activities import call_greeting_service +from nexus_messaging.callerpattern.service import ( + ApproveInput, + GetLanguagesInput, + GetLanguagesOutput, + Language, + SetLanguageInput, +) + + +@workflow.defn +class GreetingWorkflow: + def __init__(self) -> None: + self.approved_for_release = False + self.greetings: dict[Language, str] = { + Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", + Language.ENGLISH: "Hello, world", + } + self.language = Language.ENGLISH + self.lock = asyncio.Lock() + + @workflow.run + async def run(self) -> str: + # Wait until approved and all in-flight update handlers have finished. + await workflow.wait_condition( + lambda: self.approved_for_release and workflow.all_handlers_finished() + ) + return self.greetings[self.language] + + @workflow.query + def get_languages(self, input: GetLanguagesInput) -> GetLanguagesOutput: + if input.include_unsupported: + languages = sorted(Language) + else: + languages = sorted(self.greetings) + return GetLanguagesOutput(languages=languages) + + @workflow.query + def get_language(self) -> Language: + return self.language + + @workflow.signal + def approve(self, input: ApproveInput) -> None: + workflow.logger.info("Approval signal received for user %s", input.user_id) + self.approved_for_release = True + + @workflow.update + def set_language(self, input: SetLanguageInput) -> Language: + workflow.logger.info("setLanguage update received for user %s", input.user_id) + previous_language, self.language = self.language, input.language + return previous_language + + @set_language.validator + def validate_set_language(self, input: SetLanguageInput) -> None: + if input.language not in self.greetings: + raise ValueError(f"{input.language.name} is not supported") + + # Changes the active language, calling an activity to fetch a greeting for new + # languages not already in the greetings map. + @workflow.update + async def set_language_using_activity(self, input: SetLanguageInput) -> Language: + if input.language not in self.greetings: + async with self.lock: + greeting = await workflow.execute_activity( + call_greeting_service, + input.language, + start_to_close_timeout=timedelta(seconds=10), + ) + if greeting is None: + raise ApplicationError( + f"Greeting service does not support {input.language.name}" + ) + self.greetings[input.language] = greeting + previous_language, self.language = self.language, input.language + return previous_language diff --git a/nexus_messaging/callerpattern/service.py b/nexus_messaging/callerpattern/service.py new file mode 100644 index 00000000..23a550fb --- /dev/null +++ b/nexus_messaging/callerpattern/service.py @@ -0,0 +1,67 @@ +""" +Nexus service definition for the caller (entity) pattern. Shared between the handler and +caller. The caller uses this to create a type-safe Nexus client; the handler implements +the operations. + +Every operation includes a user_id so the handler knows which entity workflow to target. +""" + +from dataclasses import dataclass +from enum import IntEnum + +import nexusrpc + + +class Language(IntEnum): + ARABIC = 1 + CHINESE = 2 + ENGLISH = 3 + FRENCH = 4 + HINDI = 5 + PORTUGUESE = 6 + SPANISH = 7 + + +@dataclass +class GetLanguagesInput: + include_unsupported: bool + user_id: str + + +@dataclass +class GetLanguagesOutput: + languages: list[Language] + + +@dataclass +class GetLanguageInput: + user_id: str + + +@dataclass +class SetLanguageInput: + language: Language + user_id: str + + +@dataclass +class ApproveInput: + name: str + user_id: str + + +@dataclass +class ApproveOutput: + pass + + +@nexusrpc.service +class NexusGreetingService: + # Returns the languages supported by the greeting workflow. + get_languages: nexusrpc.Operation[GetLanguagesInput, GetLanguagesOutput] + # Returns the currently active language. + get_language: nexusrpc.Operation[GetLanguageInput, Language] + # Changes the active language, returning the previous one. + set_language: nexusrpc.Operation[SetLanguageInput, Language] + # Approves the workflow, allowing it to complete. + approve: nexusrpc.Operation[ApproveInput, ApproveOutput] diff --git a/nexus_messaging/endpoint_description.md b/nexus_messaging/endpoint_description.md new file mode 100644 index 00000000..4184134b --- /dev/null +++ b/nexus_messaging/endpoint_description.md @@ -0,0 +1,14 @@ +## Services + +### [NexusGreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_messaging/callerpattern/service.py) (callerpattern) +- operation: `get_languages` +- operation: `get_language` +- operation: `set_language` +- operation: `approve` + +### [NexusRemoteGreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_messaging/ondemandpattern/service.py) (ondemandpattern) +- operation: `run_from_remote` +- operation: `get_languages` +- operation: `get_language` +- operation: `set_language` +- operation: `approve` diff --git a/nexus_messaging/ondemandpattern/README.md b/nexus_messaging/ondemandpattern/README.md new file mode 100644 index 00000000..b9ba67ff --- /dev/null +++ b/nexus_messaging/ondemandpattern/README.md @@ -0,0 +1,60 @@ +## On-demand pattern + +No Workflow is pre-started. The caller creates and controls Workflow instances through Nexus +operations. `NexusRemoteGreetingService` adds a `run_from_remote` operation that starts a new +`GreetingWorkflow`, and every other operation includes a `user_id` so the handler knows which +instance to target. + +The caller Workflow: +1. Starts two remote `GreetingWorkflow` instances via `run_from_remote` (backed by `workflow_run_operation`) +2. Queries each for supported languages +3. Changes the language on each (Arabic and Hindi) +4. Confirms the changes via queries +5. Approves both Workflows +6. Waits for each to complete and returns their results + +### Running + +Start a Temporal server: + +```bash +temporal server start-dev +``` + +Create the namespaces and Nexus endpoint: + +```bash +temporal operator namespace create --namespace nexus-messaging-handler-namespace +temporal operator namespace create --namespace nexus-messaging-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-messaging-nexus-endpoint \ + --target-namespace nexus-messaging-handler-namespace \ + --target-task-queue nexus-messaging-handler-task-queue +``` + +In one terminal, start the handler worker: + +```bash +uv run python -m nexus_messaging.ondemandpattern.handler.worker +``` + +In another terminal, run the following command to start the example: + +```bash +uv run python -m nexus_messaging.ondemandpattern.caller.app +``` + +Expected output: + +``` +started remote greeting workflow: UserId One +started remote greeting workflow: UserId Two +Supported languages for UserId One: [, ] +Supported languages for UserId Two: [, ] +UserId One changed language: ENGLISH -> ARABIC +UserId Two changed language: ENGLISH -> HINDI +Workflows approved +Workflow one result: مرحبا بالعالم +Workflow two result: नमस्ते दुनिया +``` diff --git a/nexus_messaging/ondemandpattern/__init__.py b/nexus_messaging/ondemandpattern/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_messaging/ondemandpattern/caller/__init__.py b/nexus_messaging/ondemandpattern/caller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_messaging/ondemandpattern/caller/app.py b/nexus_messaging/ondemandpattern/caller/app.py new file mode 100644 index 00000000..a1837bab --- /dev/null +++ b/nexus_messaging/ondemandpattern/caller/app.py @@ -0,0 +1,41 @@ +import asyncio +import uuid +from typing import Optional + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_messaging.ondemandpattern.caller.workflows import CallerRemoteWorkflow + +NAMESPACE = "nexus-messaging-caller-namespace" +TASK_QUEUE = "nexus-messaging-caller-remote-task-queue" + + +async def execute_caller_workflow(client: Optional[Client] = None) -> None: + if client is None: + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[CallerRemoteWorkflow], + ): + log = await client.execute_workflow( + CallerRemoteWorkflow.run, + id=f"nexus-messaging-remote-caller-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + for line in log: + print(line) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(execute_caller_workflow()) + except KeyboardInterrupt: + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_messaging/ondemandpattern/caller/workflows.py b/nexus_messaging/ondemandpattern/caller/workflows.py new file mode 100644 index 00000000..8dbbd4b2 --- /dev/null +++ b/nexus_messaging/ondemandpattern/caller/workflows.py @@ -0,0 +1,148 @@ +""" +A caller workflow that creates and controls workflow instances through Nexus operations. +Unlike the entity (callerpattern), no workflow is pre-started; the caller creates them +on demand via the run_from_remote operation. +""" + +from temporalio import workflow + +from nexus_messaging.ondemandpattern.service import ( + ApproveInput, + GetLanguageInput, + GetLanguagesInput, + Language, + NexusRemoteGreetingService, + RunFromRemoteInput, + SetLanguageInput, +) + +NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" + +REMOTE_WORKFLOW_ONE = "UserId One" +REMOTE_WORKFLOW_TWO = "UserId Two" + + +@workflow.defn +class CallerRemoteWorkflow: + def __init__(self) -> None: + self.nexus_client = workflow.create_nexus_client( + service=NexusRemoteGreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + @workflow.run + async def run(self) -> list[str]: + log: list[str] = [] + + # Each call is performed twice in this example. This assumes there are two + # users we want to process. The first calls start two workflows, one for each + # user. Subsequent calls perform different actions between the two users. + + # This is an async Nexus operation -- starts a workflow on the handler and + # returns a handle. Unlike the sync operations below, this does not block + # until the workflow completes. It is backed by workflow_run_operation on the + # handler side. + handle_one = await self.nexus_client.start_operation( + NexusRemoteGreetingService.run_from_remote, + RunFromRemoteInput(user_id=REMOTE_WORKFLOW_ONE), + ) + log.append(f"started remote greeting workflow: {REMOTE_WORKFLOW_ONE}") + workflow.logger.info("started remote greeting workflow %s", REMOTE_WORKFLOW_ONE) + + handle_two = await self.nexus_client.start_operation( + NexusRemoteGreetingService.run_from_remote, + RunFromRemoteInput(user_id=REMOTE_WORKFLOW_TWO), + ) + log.append(f"started remote greeting workflow: {REMOTE_WORKFLOW_TWO}") + workflow.logger.info("started remote greeting workflow %s", REMOTE_WORKFLOW_TWO) + + # Query the remote workflows for supported languages. + languages_output = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=REMOTE_WORKFLOW_ONE), + ) + log.append( + f"Supported languages for {REMOTE_WORKFLOW_ONE}: " + f"{languages_output.languages}" + ) + workflow.logger.info( + "supported languages are %s for workflow %s", + languages_output.languages, + REMOTE_WORKFLOW_ONE, + ) + + languages_output = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=REMOTE_WORKFLOW_TWO), + ) + log.append( + f"Supported languages for {REMOTE_WORKFLOW_TWO}: " + f"{languages_output.languages}" + ) + workflow.logger.info( + "supported languages are %s for workflow %s", + languages_output.languages, + REMOTE_WORKFLOW_TWO, + ) + + # Update the language on each remote workflow. + previous_language_one = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.set_language, + SetLanguageInput(language=Language.ARABIC, user_id=REMOTE_WORKFLOW_ONE), + ) + + previous_language_two = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.set_language, + SetLanguageInput(language=Language.HINDI, user_id=REMOTE_WORKFLOW_TWO), + ) + + # Confirm the changes by querying. + current_language = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.get_language, + GetLanguageInput(user_id=REMOTE_WORKFLOW_ONE), + ) + log.append( + f"{REMOTE_WORKFLOW_ONE} changed language: " + f"{previous_language_one.name} -> {current_language.name}" + ) + workflow.logger.info( + "Language changed from %s to %s for workflow %s", + previous_language_one, + current_language, + REMOTE_WORKFLOW_ONE, + ) + + current_language = await self.nexus_client.execute_operation( + NexusRemoteGreetingService.get_language, + GetLanguageInput(user_id=REMOTE_WORKFLOW_TWO), + ) + log.append( + f"{REMOTE_WORKFLOW_TWO} changed language: " + f"{previous_language_two.name} -> {current_language.name}" + ) + workflow.logger.info( + "Language changed from %s to %s for workflow %s", + previous_language_two, + current_language, + REMOTE_WORKFLOW_TWO, + ) + + # Approve both workflows so they can complete. + await self.nexus_client.execute_operation( + NexusRemoteGreetingService.approve, + ApproveInput(name="remote-caller", user_id=REMOTE_WORKFLOW_ONE), + ) + await self.nexus_client.execute_operation( + NexusRemoteGreetingService.approve, + ApproveInput(name="remote-caller", user_id=REMOTE_WORKFLOW_TWO), + ) + log.append("Workflows approved") + + # Wait for the remote workflows to finish and return their results. + result = await handle_one + log.append(f"Workflow one result: {result}") + + result = await handle_two + log.append(f"Workflow two result: {result}") + + return log diff --git a/nexus_messaging/ondemandpattern/handler/__init__.py b/nexus_messaging/ondemandpattern/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_messaging/ondemandpattern/handler/activities.py b/nexus_messaging/ondemandpattern/handler/activities.py new file mode 100644 index 00000000..ba028489 --- /dev/null +++ b/nexus_messaging/ondemandpattern/handler/activities.py @@ -0,0 +1,22 @@ +import asyncio +from typing import Optional + +from temporalio import activity + +from nexus_messaging.ondemandpattern.service import Language + + +@activity.defn +async def call_greeting_service(language: Language) -> Optional[str]: + """Simulates a call to a remote greeting service. Returns None if unsupported.""" + greetings = { + Language.ARABIC: "\u0645\u0631\u062d\u0628\u0627 \u0628\u0627\u0644\u0639\u0627\u0644\u0645", + Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", + Language.ENGLISH: "Hello, world", + Language.FRENCH: "Bonjour, monde", + Language.HINDI: "\u0928\u092e\u0938\u094d\u0924\u0947 \u0926\u0941\u0928\u093f\u092f\u093e", + Language.PORTUGUESE: "Ol\u00e1 mundo", + Language.SPANISH: "Hola mundo", + } + await asyncio.sleep(0.2) + return greetings.get(language) diff --git a/nexus_messaging/ondemandpattern/handler/service_handler.py b/nexus_messaging/ondemandpattern/handler/service_handler.py new file mode 100644 index 00000000..1351aae7 --- /dev/null +++ b/nexus_messaging/ondemandpattern/handler/service_handler.py @@ -0,0 +1,84 @@ +""" +Nexus operation handler for the on-demand pattern. Each operation receives the target +userId in its input, and run_from_remote starts a brand-new GreetingWorkflow. +""" + +from __future__ import annotations + +import nexusrpc +from temporalio import nexus +from temporalio.client import WorkflowHandle + +from nexus_messaging.ondemandpattern.handler.workflows import GreetingWorkflow +from nexus_messaging.ondemandpattern.service import ( + ApproveInput, + ApproveOutput, + GetLanguageInput, + GetLanguagesInput, + GetLanguagesOutput, + Language, + NexusRemoteGreetingService, + RunFromRemoteInput, + SetLanguageInput, +) + +WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_" + + +@nexusrpc.handler.service_handler(service=NexusRemoteGreetingService) +class NexusRemoteGreetingServiceHandler: + def _get_workflow_id(self, user_id: str) -> str: + return WORKFLOW_ID_PREFIX + user_id + + def _get_workflow_handle( + self, user_id: str + ) -> WorkflowHandle[GreetingWorkflow, str]: + return nexus.client().get_workflow_handle_for( + GreetingWorkflow.run, self._get_workflow_id(user_id) + ) + + # Starts a new GreetingWorkflow with the caller-specified user ID. + # This is an async Nexus operation backed by workflow_run_operation. + @nexus.workflow_run_operation + async def run_from_remote( + self, ctx: nexus.WorkflowRunOperationContext, input: RunFromRemoteInput + ) -> nexus.WorkflowHandle[str]: + return await ctx.start_workflow( + GreetingWorkflow.run, + id=self._get_workflow_id(input.user_id), + ) + + @nexusrpc.handler.sync_operation + async def get_languages( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput + ) -> GetLanguagesOutput: + return await self._get_workflow_handle(input.user_id).query( + GreetingWorkflow.get_languages, input + ) + + @nexusrpc.handler.sync_operation + async def get_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguageInput + ) -> Language: + return await self._get_workflow_handle(input.user_id).query( + GreetingWorkflow.get_language, + ) + + # Routes to set_language_using_activity so that new languages not already in the + # greetings map can be fetched via an activity. + @nexusrpc.handler.sync_operation + async def set_language( + self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput + ) -> Language: + return await self._get_workflow_handle(input.user_id).execute_update( + GreetingWorkflow.set_language_using_activity, input + ) + + @nexusrpc.handler.sync_operation + async def approve( + self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput + ) -> ApproveOutput: + await self._get_workflow_handle(input.user_id).signal( + GreetingWorkflow.approve, input + ) + return ApproveOutput() diff --git a/nexus_sync_operations/handler/worker.py b/nexus_messaging/ondemandpattern/handler/worker.py similarity index 58% rename from nexus_sync_operations/handler/worker.py rename to nexus_messaging/ondemandpattern/handler/worker.py index 97c8eb04..5eec9cfc 100644 --- a/nexus_sync_operations/handler/worker.py +++ b/nexus_messaging/ondemandpattern/handler/worker.py @@ -6,14 +6,16 @@ from temporalio.envconfig import ClientConfig from temporalio.worker import Worker -from message_passing.introduction.activities import call_greeting_service -from message_passing.introduction.workflows import GreetingWorkflow -from nexus_sync_operations.handler.service_handler import GreetingServiceHandler +from nexus_messaging.ondemandpattern.handler.activities import call_greeting_service +from nexus_messaging.ondemandpattern.handler.service_handler import ( + NexusRemoteGreetingServiceHandler, +) +from nexus_messaging.ondemandpattern.handler.workflows import GreetingWorkflow interrupt_event = asyncio.Event() -NAMESPACE = "nexus-sync-operations-handler-namespace" -TASK_QUEUE = "nexus-sync-operations-handler-task-queue" +NAMESPACE = "nexus-messaging-handler-namespace" +TASK_QUEUE = "nexus-messaging-handler-task-queue" async def main(client: Optional[Client] = None): @@ -25,20 +27,14 @@ async def main(client: Optional[Client] = None): config.setdefault("namespace", NAMESPACE) client = await Client.connect(**config) - # Create the nexus service handler instance, starting the long-running entity workflow that - # backs the Nexus service - greeting_service_handler = await GreetingServiceHandler.create( - "nexus-sync-operations-greeting-workflow", client, TASK_QUEUE - ) - async with Worker( client, task_queue=TASK_QUEUE, workflows=[GreetingWorkflow], activities=[call_greeting_service], - nexus_service_handlers=[greeting_service_handler], + nexus_service_handlers=[NexusRemoteGreetingServiceHandler()], ): - logging.info("Worker started, ctrl+c to exit") + logging.info("Handler worker started, ctrl+c to exit") await interrupt_event.wait() logging.info("Shutting down") diff --git a/nexus_messaging/ondemandpattern/handler/workflows.py b/nexus_messaging/ondemandpattern/handler/workflows.py new file mode 100644 index 00000000..99fc5728 --- /dev/null +++ b/nexus_messaging/ondemandpattern/handler/workflows.py @@ -0,0 +1,88 @@ +""" +A long-running "entity" workflow that backs the NexusRemoteGreetingService Nexus +operations. The workflow exposes queries, an update, and a signal. These are private +implementation details of the Nexus service: the caller only interacts via Nexus +operations. +""" + +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.exceptions import ApplicationError + +from nexus_messaging.ondemandpattern.handler.activities import call_greeting_service +from nexus_messaging.ondemandpattern.service import ( + ApproveInput, + GetLanguagesInput, + GetLanguagesOutput, + Language, + SetLanguageInput, +) + + +@workflow.defn +class GreetingWorkflow: + def __init__(self) -> None: + self.approved_for_release = False + self.greetings: dict[Language, str] = { + Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c", + Language.ENGLISH: "Hello, world", + } + self.language = Language.ENGLISH + self.lock = asyncio.Lock() + + @workflow.run + async def run(self) -> str: + # Wait until approved and all in-flight update handlers have finished. + await workflow.wait_condition( + lambda: self.approved_for_release and workflow.all_handlers_finished() + ) + return self.greetings[self.language] + + @workflow.query + def get_languages(self, input: GetLanguagesInput) -> GetLanguagesOutput: + if input.include_unsupported: + languages = sorted(Language) + else: + languages = sorted(self.greetings) + return GetLanguagesOutput(languages=languages) + + @workflow.query + def get_language(self) -> Language: + return self.language + + @workflow.signal + def approve(self, input: ApproveInput) -> None: + workflow.logger.info("Approval signal received for user %s", input.user_id) + self.approved_for_release = True + + @workflow.update + def set_language(self, input: SetLanguageInput) -> Language: + workflow.logger.info("setLanguage update received for user %s", input.user_id) + previous_language, self.language = self.language, input.language + return previous_language + + @set_language.validator + def validate_set_language(self, input: SetLanguageInput) -> None: + if input.language not in self.greetings: + raise ValueError(f"{input.language.name} is not supported") + + # Changes the active language, calling an activity to fetch a greeting for new + # languages not already in the greetings map. + @workflow.update + async def set_language_using_activity(self, input: SetLanguageInput) -> Language: + if input.language not in self.greetings: + async with self.lock: + greeting = await workflow.execute_activity( + call_greeting_service, + input.language, + start_to_close_timeout=timedelta(seconds=10), + ) + if greeting is None: + raise ApplicationError( + f"Greeting service does not support {input.language.name}" + ) + self.greetings[input.language] = greeting + previous_language, self.language = self.language, input.language + return previous_language diff --git a/nexus_messaging/ondemandpattern/service.py b/nexus_messaging/ondemandpattern/service.py new file mode 100644 index 00000000..8f347d32 --- /dev/null +++ b/nexus_messaging/ondemandpattern/service.py @@ -0,0 +1,72 @@ +""" +Nexus service definition for the on-demand pattern. Every operation includes a userId +so the caller controls which workflow instance is targeted. This also exposes a +run_from_remote operation that starts a new GreetingWorkflow. +""" + +from dataclasses import dataclass +from enum import IntEnum + +import nexusrpc + + +class Language(IntEnum): + ARABIC = 1 + CHINESE = 2 + ENGLISH = 3 + FRENCH = 4 + HINDI = 5 + PORTUGUESE = 6 + SPANISH = 7 + + +@dataclass +class RunFromRemoteInput: + user_id: str + + +@dataclass +class GetLanguagesInput: + include_unsupported: bool + user_id: str + + +@dataclass +class GetLanguagesOutput: + languages: list[Language] + + +@dataclass +class GetLanguageInput: + user_id: str + + +@dataclass +class SetLanguageInput: + language: Language + user_id: str + + +@dataclass +class ApproveInput: + name: str + user_id: str + + +@dataclass +class ApproveOutput: + pass + + +@nexusrpc.service +class NexusRemoteGreetingService: + # Starts a new GreetingWorkflow with the given workflow ID (asynchronous). + run_from_remote: nexusrpc.Operation[RunFromRemoteInput, str] + # Returns the languages supported by the specified workflow. + get_languages: nexusrpc.Operation[GetLanguagesInput, GetLanguagesOutput] + # Returns the currently active language of the specified workflow. + get_language: nexusrpc.Operation[GetLanguageInput, Language] + # Changes the active language on the specified workflow, returning the previous one. + set_language: nexusrpc.Operation[SetLanguageInput, Language] + # Approves the specified workflow, allowing it to complete. + approve: nexusrpc.Operation[ApproveInput, ApproveOutput] diff --git a/nexus_multiple_args/handler/service_handler.py b/nexus_multiple_args/handler/service_handler.py index c2ddfb92..8c00ee31 100644 --- a/nexus_multiple_args/handler/service_handler.py +++ b/nexus_multiple_args/handler/service_handler.py @@ -1,7 +1,5 @@ from __future__ import annotations -import uuid - import nexusrpc from temporalio import nexus @@ -32,7 +30,7 @@ async def hello( input.name, # First argument: name input.language, # Second argument: language ], - id=str(uuid.uuid4()), + id=f"hello-multi-args-{input.name}-{input.language}", ) diff --git a/nexus_sync_operations/README.md b/nexus_sync_operations/README.md deleted file mode 100644 index 10e266ec..00000000 --- a/nexus_sync_operations/README.md +++ /dev/null @@ -1,39 +0,0 @@ -This sample shows how to create a Nexus service that is backed by a long-running workflow and -exposes operations that execute updates and queries against that workflow. The long-running -workflow, and the updates/queries are private implementation detail of the nexus service: the caller -does not know how the operations are implemented. - -### Sample directory structure - -- [service.py](./service.py) - shared Nexus service definition -- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code -- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks. - - -### Instructions - -Start a Temporal server. (See the main samples repo [README](../README.md)). - -Run the following to create the caller and handler namespaces, and the Nexus endpoint: - -``` -temporal operator namespace create --namespace nexus-sync-operations-handler-namespace -temporal operator namespace create --namespace nexus-sync-operations-caller-namespace - -temporal operator nexus endpoint create \ - --name nexus-sync-operations-nexus-endpoint \ - --target-namespace nexus-sync-operations-handler-namespace \ - --target-task-queue nexus-sync-operations-handler-task-queue \ - --description-file nexus_sync_operations/endpoint_description.md -``` - -In one terminal, run the Temporal worker in the handler namespace: -``` -uv run nexus_sync_operations/handler/worker.py -``` - -In another terminal, run the Temporal worker in the caller namespace and start the caller -workflow: -``` -uv run nexus_sync_operations/caller/app.py -``` diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py deleted file mode 100644 index a358d764..00000000 --- a/nexus_sync_operations/caller/workflows.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -This is a workflow that calls nexus operations. The caller does not have information about how these -operations are implemented by the nexus service. -""" - -from temporalio import workflow - -from message_passing.introduction import Language -from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput - -with workflow.unsafe.imports_passed_through(): - from nexus_sync_operations.service import GreetingService - -NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint" - - -@workflow.defn -class CallerWorkflow: - @workflow.run - async def run(self) -> list[str]: - log = [] - nexus_client = workflow.create_nexus_client( - service=GreetingService, - endpoint=NEXUS_ENDPOINT, - ) - - # Get supported languages - supported_languages = await nexus_client.execute_operation( - GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) - ) - log.append(f"supported languages: {supported_languages}") - - # Set language - previous_language = await nexus_client.execute_operation( - GreetingService.set_language, - SetLanguageInput(language=Language.ARABIC), - ) - assert ( - await nexus_client.execute_operation(GreetingService.get_language, None) - == Language.ARABIC - ) - log.append( - f"language changed: {previous_language.name} -> {Language.ARABIC.name}" - ) - - return log diff --git a/nexus_sync_operations/endpoint_description.md b/nexus_sync_operations/endpoint_description.md deleted file mode 100644 index a33b60cf..00000000 --- a/nexus_sync_operations/endpoint_description.md +++ /dev/null @@ -1,4 +0,0 @@ -## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py) -- operation: `get_languages` -- operation: `get_language` -- operation: `set_language` diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py deleted file mode 100644 index 626948f0..00000000 --- a/nexus_sync_operations/handler/service_handler.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -This file demonstrates how to implement a Nexus service that is backed by a long-running workflow -and exposes operations that perform updates and queries against that workflow. -""" - -from __future__ import annotations - -import nexusrpc -from temporalio import nexus -from temporalio.client import Client, WorkflowHandle -from temporalio.common import WorkflowIDConflictPolicy - -from message_passing.introduction import Language -from message_passing.introduction.workflows import ( - GetLanguagesInput, - GreetingWorkflow, - SetLanguageInput, -) -from nexus_sync_operations.service import GreetingService - - -@nexusrpc.handler.service_handler(service=GreetingService) -class GreetingServiceHandler: - def __init__(self, workflow_id: str): - self.workflow_id = workflow_id - - @classmethod - async def create( - cls, workflow_id: str, client: Client, task_queue: str - ) -> GreetingServiceHandler: - # Start the long-running "entity" workflow, if it is not already running. - await client.start_workflow( - GreetingWorkflow.run, - id=workflow_id, - task_queue=task_queue, - id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, - ) - return cls(workflow_id) - - @property - def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]: - # In nexus operation handler code, nexus.client() is always available, returning a client - # connected to the handler namespace (it's the same client instance that your nexus worker - # is using to poll the server for nexus tasks). This client can be used to interact with the - # handler namespace, for example to send signals, queries, or updates. Remember however, - # that a sync_operation handler must return quickly (no more than a few seconds). To do - # long-running work in a nexus operation handler, use - # temporalio.nexus.workflow_run_operation (see the hello_nexus sample). - return nexus.client().get_workflow_handle_for( - GreetingWorkflow.run, self.workflow_id - ) - - # 👉 This is a handler for a nexus operation whose internal implementation involves executing a - # query against a long-running workflow that is private to the nexus service. - @nexusrpc.handler.sync_operation - async def get_languages( - self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput - ) -> list[Language]: - return await self.greeting_workflow_handle.query( - GreetingWorkflow.get_languages, input - ) - - # 👉 This is a handler for a nexus operation whose internal implementation involves executing a - # query against a long-running workflow that is private to the nexus service. - @nexusrpc.handler.sync_operation - async def get_language( - self, ctx: nexusrpc.handler.StartOperationContext, input: None - ) -> Language: - return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language) - - # 👉 This is a handler for a nexus operation whose internal implementation involves executing an - # update against a long-running workflow that is private to the nexus service. Although updates - # can run for an arbitrarily long time, when exposing an update via a nexus sync operation the - # update should execute quickly (sync operations must complete in under 10s). - @nexusrpc.handler.sync_operation - async def set_language( - self, - ctx: nexusrpc.handler.StartOperationContext, - input: SetLanguageInput, - ) -> Language: - return await self.greeting_workflow_handle.execute_update( - GreetingWorkflow.set_language_using_activity, input - ) diff --git a/nexus_sync_operations/service.py b/nexus_sync_operations/service.py deleted file mode 100644 index 3436d5f3..00000000 --- a/nexus_sync_operations/service.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -This module defines a Nexus service that exposes three operations. - -It is used by the nexus service handler to validate that the operation handlers implement the -correct input and output types, and by the caller workflow to create a type-safe client. It does not -contain the implementation of the operations; see nexus_sync_operations.handler.service_handler for -that. -""" - -import nexusrpc - -from message_passing.introduction import Language -from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput - - -@nexusrpc.service -class GreetingService: - get_languages: nexusrpc.Operation[GetLanguagesInput, list[Language]] - get_language: nexusrpc.Operation[None, Language] - set_language: nexusrpc.Operation[SetLanguageInput, Language] diff --git a/pyproject.toml b/pyproject.toml index 0927eea3..f57bc875 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ pydantic-converter = ["pydantic>=2.10.6,<3"] sentry = ["sentry-sdk>=2.13.0"] trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"] cloud-export-to-parquet = [ - "pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'", + "pandas>=2.3.3,<3 ; python_version >= '3.10' and python_version < '4.0'", "numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'", "boto3>=1.34.89,<2", "pyarrow>=19.0.1", diff --git a/tests/nexus_sync_operations/nexus_sync_operations_test.py b/tests/nexus_messaging/callerpattern_test.py similarity index 55% rename from tests/nexus_sync_operations/nexus_sync_operations_test.py rename to tests/nexus_messaging/callerpattern_test.py index d74168cb..d14b4c29 100644 --- a/tests/nexus_sync_operations/nexus_sync_operations_test.py +++ b/tests/nexus_messaging/callerpattern_test.py @@ -8,18 +8,18 @@ from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker -import nexus_sync_operations.handler.service_handler -import nexus_sync_operations.handler.worker -from message_passing.introduction import Language -from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput -from nexus_sync_operations.caller.workflows import CallerWorkflow +import nexus_messaging.callerpattern.handler.worker +from nexus_messaging.callerpattern.caller.workflows import CallerWorkflow +from nexus_messaging.callerpattern.service import ( + GetLanguageInput, + GetLanguagesInput, + Language, + NexusGreetingService, + SetLanguageInput, +) from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint -with workflow.unsafe.imports_passed_through(): - from nexus_sync_operations.service import GreetingService - - -NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint" +NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" @workflow.defn @@ -27,89 +27,90 @@ class TestCallerWorkflow: """Test workflow that calls Nexus operations and makes assertions.""" @workflow.run - async def run(self) -> None: + async def run(self, user_id: str) -> None: nexus_client = workflow.create_nexus_client( - service=GreetingService, + service=NexusGreetingService, endpoint=NEXUS_ENDPOINT, ) supported_languages = await nexus_client.execute_operation( - GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) + NexusGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=user_id), ) - assert supported_languages == [Language.CHINESE, Language.ENGLISH] + assert supported_languages.languages == [Language.CHINESE, Language.ENGLISH] initial_language = await nexus_client.execute_operation( - GreetingService.get_language, None + NexusGreetingService.get_language, + GetLanguageInput(user_id=user_id), ) assert initial_language == Language.ENGLISH previous_language = await nexus_client.execute_operation( - GreetingService.set_language, - SetLanguageInput(language=Language.CHINESE), + NexusGreetingService.set_language, + SetLanguageInput(language=Language.CHINESE, user_id=user_id), ) assert previous_language == Language.ENGLISH current_language = await nexus_client.execute_operation( - GreetingService.get_language, None + NexusGreetingService.get_language, + GetLanguageInput(user_id=user_id), ) assert current_language == Language.CHINESE previous_language = await nexus_client.execute_operation( - GreetingService.set_language, - SetLanguageInput(language=Language.ARABIC), + NexusGreetingService.set_language, + SetLanguageInput(language=Language.ARABIC, user_id=user_id), ) assert previous_language == Language.CHINESE current_language = await nexus_client.execute_operation( - GreetingService.get_language, None + NexusGreetingService.get_language, + GetLanguageInput(user_id=user_id), ) assert current_language == Language.ARABIC -async def test_nexus_sync_operations(client: Client, env: WorkflowEnvironment): +async def test_callerpattern(client: Client, env: WorkflowEnvironment): if env.supports_time_skipping: pytest.skip("Nexus tests don't work under the Java test server") await _run_caller_workflow(client, TestCallerWorkflow) -async def test_nexus_sync_operations_caller_workflow( - client: Client, env: WorkflowEnvironment -): - """ - Runs the CallerWorkflow from the sample to ensure it executes without errors. - """ +async def test_callerpattern_caller_workflow(client: Client, env: WorkflowEnvironment): + """Runs the CallerWorkflow from the sample to ensure it executes without errors.""" if env.supports_time_skipping: pytest.skip("Nexus tests don't work under the Java test server") await _run_caller_workflow(client, CallerWorkflow) -async def _run_caller_workflow(client: Client, workflow: Type): +async def _run_caller_workflow(client: Client, wf: Type): create_response = await create_nexus_endpoint( name=NEXUS_ENDPOINT, - task_queue=nexus_sync_operations.handler.worker.TASK_QUEUE, + task_queue=nexus_messaging.callerpattern.handler.worker.TASK_QUEUE, client=client, ) try: handler_worker_task = asyncio.create_task( - nexus_sync_operations.handler.worker.main(client) + nexus_messaging.callerpattern.handler.worker.main(client) ) try: async with Worker( client, task_queue="test-caller-task-queue", - workflows=[workflow], + workflows=[wf], ): await client.execute_workflow( - workflow.run, + wf.run, + arg="user-1", id=str(uuid.uuid4()), task_queue="test-caller-task-queue", ) finally: - nexus_sync_operations.handler.worker.interrupt_event.set() + nexus_messaging.callerpattern.handler.worker.interrupt_event.set() await handler_worker_task - nexus_sync_operations.handler.worker.interrupt_event.clear() + nexus_messaging.callerpattern.handler.worker.interrupt_event.clear() finally: await delete_nexus_endpoint( id=create_response.endpoint.id, diff --git a/tests/nexus_messaging/ondemandpattern_test.py b/tests/nexus_messaging/ondemandpattern_test.py new file mode 100644 index 00000000..75fd761c --- /dev/null +++ b/tests/nexus_messaging/ondemandpattern_test.py @@ -0,0 +1,130 @@ +import asyncio +import uuid +from typing import Type + +import pytest +from temporalio import workflow +from temporalio.client import Client +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +import nexus_messaging.ondemandpattern.handler.worker +from nexus_messaging.ondemandpattern.caller.workflows import CallerRemoteWorkflow +from nexus_messaging.ondemandpattern.service import ( + ApproveInput, + GetLanguageInput, + GetLanguagesInput, + Language, + NexusRemoteGreetingService, + RunFromRemoteInput, + SetLanguageInput, +) +from tests.helpers.nexus import create_nexus_endpoint, delete_nexus_endpoint + +NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint" + + +@workflow.defn +class TestCallerRemoteWorkflow: + """Test workflow that creates remote workflows and makes assertions.""" + + @workflow.run + async def run(self) -> None: + nexus_client = workflow.create_nexus_client( + service=NexusRemoteGreetingService, + endpoint=NEXUS_ENDPOINT, + ) + + workflow_id = f"test-remote-{workflow.uuid4()}" + + # Start a remote workflow. + handle = await nexus_client.start_operation( + NexusRemoteGreetingService.run_from_remote, + RunFromRemoteInput(user_id=workflow_id), + ) + + # Query for supported languages. + languages_output = await nexus_client.execute_operation( + NexusRemoteGreetingService.get_languages, + GetLanguagesInput(include_unsupported=False, user_id=workflow_id), + ) + assert languages_output.languages == [Language.CHINESE, Language.ENGLISH] + + # Check initial language. + initial_language = await nexus_client.execute_operation( + NexusRemoteGreetingService.get_language, + GetLanguageInput(user_id=workflow_id), + ) + assert initial_language == Language.ENGLISH + + # Set language. + previous_language = await nexus_client.execute_operation( + NexusRemoteGreetingService.set_language, + SetLanguageInput(language=Language.ARABIC, user_id=workflow_id), + ) + assert previous_language == Language.ENGLISH + + current_language = await nexus_client.execute_operation( + NexusRemoteGreetingService.get_language, + GetLanguageInput(user_id=workflow_id), + ) + assert current_language == Language.ARABIC + + # Approve and wait for result. + await nexus_client.execute_operation( + NexusRemoteGreetingService.approve, + ApproveInput(name="test", user_id=workflow_id), + ) + + result = await handle + assert "\u0645\u0631\u062d\u0628\u0627" in result # Arabic greeting + + +async def test_ondemandpattern(client: Client, env: WorkflowEnvironment): + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + await _run_caller_workflow(client, TestCallerRemoteWorkflow) + + +async def test_ondemandpattern_caller_workflow( + client: Client, env: WorkflowEnvironment +): + """Runs the CallerRemoteWorkflow from the sample to ensure it executes without errors.""" + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work under the Java test server") + + await _run_caller_workflow(client, CallerRemoteWorkflow) + + +async def _run_caller_workflow(client: Client, wf: Type): + create_response = await create_nexus_endpoint( + name=NEXUS_ENDPOINT, + task_queue=nexus_messaging.ondemandpattern.handler.worker.TASK_QUEUE, + client=client, + ) + try: + handler_worker_task = asyncio.create_task( + nexus_messaging.ondemandpattern.handler.worker.main(client) + ) + try: + async with Worker( + client, + task_queue="test-caller-remote-task-queue", + workflows=[wf], + ): + await client.execute_workflow( + wf.run, + id=str(uuid.uuid4()), + task_queue="test-caller-remote-task-queue", + ) + finally: + nexus_messaging.ondemandpattern.handler.worker.interrupt_event.set() + await handler_worker_task + nexus_messaging.ondemandpattern.handler.worker.interrupt_event.clear() + finally: + await delete_nexus_endpoint( + id=create_response.endpoint.id, + version=create_response.endpoint.version, + client=client, + ) diff --git a/uv.lock b/uv.lock index 3c9990eb..106627c3 100644 --- a/uv.lock +++ b/uv.lock @@ -1556,7 +1556,7 @@ wheels = [ [[package]] name = "pandas" -version = "2.3.1" +version = "2.3.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "numpy" }, @@ -1564,42 +1564,55 @@ dependencies = [ { name = "pytz" }, { name = "tzdata" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/d1/6f/75aa71f8a14267117adeeed5d21b204770189c0a0025acbdc03c337b28fc/pandas-2.3.1.tar.gz", hash = "sha256:0a95b9ac964fe83ce317827f80304d37388ea77616b1425f0ae41c9d2d0d7bb2", size = 4487493, upload-time = "2025-07-07T19:20:04.079Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/c4/ca/aa97b47287221fa37a49634532e520300088e290b20d690b21ce3e448143/pandas-2.3.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:22c2e866f7209ebc3a8f08d75766566aae02bcc91d196935a1d9e59c7b990ac9", size = 11542731, upload-time = "2025-07-07T19:18:12.619Z" }, - { url = "https://files.pythonhosted.org/packages/80/bf/7938dddc5f01e18e573dcfb0f1b8c9357d9b5fa6ffdee6e605b92efbdff2/pandas-2.3.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:3583d348546201aff730c8c47e49bc159833f971c2899d6097bce68b9112a4f1", size = 10790031, upload-time = "2025-07-07T19:18:16.611Z" }, - { url = "https://files.pythonhosted.org/packages/ee/2f/9af748366763b2a494fed477f88051dbf06f56053d5c00eba652697e3f94/pandas-2.3.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0f951fbb702dacd390561e0ea45cdd8ecfa7fb56935eb3dd78e306c19104b9b0", size = 11724083, upload-time = "2025-07-07T19:18:20.512Z" }, - { url = "https://files.pythonhosted.org/packages/2c/95/79ab37aa4c25d1e7df953dde407bb9c3e4ae47d154bc0dd1692f3a6dcf8c/pandas-2.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cd05b72ec02ebfb993569b4931b2e16fbb4d6ad6ce80224a3ee838387d83a191", size = 12342360, upload-time = "2025-07-07T19:18:23.194Z" }, - { url = "https://files.pythonhosted.org/packages/75/a7/d65e5d8665c12c3c6ff5edd9709d5836ec9b6f80071b7f4a718c6106e86e/pandas-2.3.1-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:1b916a627919a247d865aed068eb65eb91a344b13f5b57ab9f610b7716c92de1", size = 13202098, upload-time = "2025-07-07T19:18:25.558Z" }, - { url = "https://files.pythonhosted.org/packages/65/f3/4c1dbd754dbaa79dbf8b537800cb2fa1a6e534764fef50ab1f7533226c5c/pandas-2.3.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:fe67dc676818c186d5a3d5425250e40f179c2a89145df477dd82945eaea89e97", size = 13837228, upload-time = "2025-07-07T19:18:28.344Z" }, - { url = "https://files.pythonhosted.org/packages/3f/d6/d7f5777162aa9b48ec3910bca5a58c9b5927cfd9cfde3aa64322f5ba4b9f/pandas-2.3.1-cp310-cp310-win_amd64.whl", hash = "sha256:2eb789ae0274672acbd3c575b0598d213345660120a257b47b5dafdc618aec83", size = 11336561, upload-time = "2025-07-07T19:18:31.211Z" }, - { url = "https://files.pythonhosted.org/packages/76/1c/ccf70029e927e473a4476c00e0d5b32e623bff27f0402d0a92b7fc29bb9f/pandas-2.3.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2b0540963d83431f5ce8870ea02a7430adca100cec8a050f0811f8e31035541b", size = 11566608, upload-time = "2025-07-07T19:18:33.86Z" }, - { url = "https://files.pythonhosted.org/packages/ec/d3/3c37cb724d76a841f14b8f5fe57e5e3645207cc67370e4f84717e8bb7657/pandas-2.3.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:fe7317f578c6a153912bd2292f02e40c1d8f253e93c599e82620c7f69755c74f", size = 10823181, upload-time = "2025-07-07T19:18:36.151Z" }, - { url = "https://files.pythonhosted.org/packages/8a/4c/367c98854a1251940edf54a4df0826dcacfb987f9068abf3e3064081a382/pandas-2.3.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e6723a27ad7b244c0c79d8e7007092d7c8f0f11305770e2f4cd778b3ad5f9f85", size = 11793570, upload-time = "2025-07-07T19:18:38.385Z" }, - { url = "https://files.pythonhosted.org/packages/07/5f/63760ff107bcf5146eee41b38b3985f9055e710a72fdd637b791dea3495c/pandas-2.3.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3462c3735fe19f2638f2c3a40bd94ec2dc5ba13abbb032dd2fa1f540a075509d", size = 12378887, upload-time = "2025-07-07T19:18:41.284Z" }, - { url = "https://files.pythonhosted.org/packages/15/53/f31a9b4dfe73fe4711c3a609bd8e60238022f48eacedc257cd13ae9327a7/pandas-2.3.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:98bcc8b5bf7afed22cc753a28bc4d9e26e078e777066bc53fac7904ddef9a678", size = 13230957, upload-time = "2025-07-07T19:18:44.187Z" }, - { url = "https://files.pythonhosted.org/packages/e0/94/6fce6bf85b5056d065e0a7933cba2616dcb48596f7ba3c6341ec4bcc529d/pandas-2.3.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:4d544806b485ddf29e52d75b1f559142514e60ef58a832f74fb38e48d757b299", size = 13883883, upload-time = "2025-07-07T19:18:46.498Z" }, - { url = "https://files.pythonhosted.org/packages/c8/7b/bdcb1ed8fccb63d04bdb7635161d0ec26596d92c9d7a6cce964e7876b6c1/pandas-2.3.1-cp311-cp311-win_amd64.whl", hash = "sha256:b3cd4273d3cb3707b6fffd217204c52ed92859533e31dc03b7c5008aa933aaab", size = 11340212, upload-time = "2025-07-07T19:18:49.293Z" }, - { url = "https://files.pythonhosted.org/packages/46/de/b8445e0f5d217a99fe0eeb2f4988070908979bec3587c0633e5428ab596c/pandas-2.3.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:689968e841136f9e542020698ee1c4fbe9caa2ed2213ae2388dc7b81721510d3", size = 11588172, upload-time = "2025-07-07T19:18:52.054Z" }, - { url = "https://files.pythonhosted.org/packages/1e/e0/801cdb3564e65a5ac041ab99ea6f1d802a6c325bb6e58c79c06a3f1cd010/pandas-2.3.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:025e92411c16cbe5bb2a4abc99732a6b132f439b8aab23a59fa593eb00704232", size = 10717365, upload-time = "2025-07-07T19:18:54.785Z" }, - { url = "https://files.pythonhosted.org/packages/51/a5/c76a8311833c24ae61a376dbf360eb1b1c9247a5d9c1e8b356563b31b80c/pandas-2.3.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9b7ff55f31c4fcb3e316e8f7fa194566b286d6ac430afec0d461163312c5841e", size = 11280411, upload-time = "2025-07-07T19:18:57.045Z" }, - { url = "https://files.pythonhosted.org/packages/da/01/e383018feba0a1ead6cf5fe8728e5d767fee02f06a3d800e82c489e5daaf/pandas-2.3.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7dcb79bf373a47d2a40cf7232928eb7540155abbc460925c2c96d2d30b006eb4", size = 11988013, upload-time = "2025-07-07T19:18:59.771Z" }, - { url = "https://files.pythonhosted.org/packages/5b/14/cec7760d7c9507f11c97d64f29022e12a6cc4fc03ac694535e89f88ad2ec/pandas-2.3.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:56a342b231e8862c96bdb6ab97170e203ce511f4d0429589c8ede1ee8ece48b8", size = 12767210, upload-time = "2025-07-07T19:19:02.944Z" }, - { url = "https://files.pythonhosted.org/packages/50/b9/6e2d2c6728ed29fb3d4d4d302504fb66f1a543e37eb2e43f352a86365cdf/pandas-2.3.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:ca7ed14832bce68baef331f4d7f294411bed8efd032f8109d690df45e00c4679", size = 13440571, upload-time = "2025-07-07T19:19:06.82Z" }, - { url = "https://files.pythonhosted.org/packages/80/a5/3a92893e7399a691bad7664d977cb5e7c81cf666c81f89ea76ba2bff483d/pandas-2.3.1-cp312-cp312-win_amd64.whl", hash = "sha256:ac942bfd0aca577bef61f2bc8da8147c4ef6879965ef883d8e8d5d2dc3e744b8", size = 10987601, upload-time = "2025-07-07T19:19:09.589Z" }, - { url = "https://files.pythonhosted.org/packages/32/ed/ff0a67a2c5505e1854e6715586ac6693dd860fbf52ef9f81edee200266e7/pandas-2.3.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:9026bd4a80108fac2239294a15ef9003c4ee191a0f64b90f170b40cfb7cf2d22", size = 11531393, upload-time = "2025-07-07T19:19:12.245Z" }, - { url = "https://files.pythonhosted.org/packages/c7/db/d8f24a7cc9fb0972adab0cc80b6817e8bef888cfd0024eeb5a21c0bb5c4a/pandas-2.3.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:6de8547d4fdb12421e2d047a2c446c623ff4c11f47fddb6b9169eb98ffba485a", size = 10668750, upload-time = "2025-07-07T19:19:14.612Z" }, - { url = "https://files.pythonhosted.org/packages/0f/b0/80f6ec783313f1e2356b28b4fd8d2148c378370045da918c73145e6aab50/pandas-2.3.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:782647ddc63c83133b2506912cc6b108140a38a37292102aaa19c81c83db2928", size = 11342004, upload-time = "2025-07-07T19:19:16.857Z" }, - { url = "https://files.pythonhosted.org/packages/e9/e2/20a317688435470872885e7fc8f95109ae9683dec7c50be29b56911515a5/pandas-2.3.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ba6aff74075311fc88504b1db890187a3cd0f887a5b10f5525f8e2ef55bfdb9", size = 12050869, upload-time = "2025-07-07T19:19:19.265Z" }, - { url = "https://files.pythonhosted.org/packages/55/79/20d746b0a96c67203a5bee5fb4e00ac49c3e8009a39e1f78de264ecc5729/pandas-2.3.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:e5635178b387bd2ba4ac040f82bc2ef6e6b500483975c4ebacd34bec945fda12", size = 12750218, upload-time = "2025-07-07T19:19:21.547Z" }, - { url = "https://files.pythonhosted.org/packages/7c/0f/145c8b41e48dbf03dd18fdd7f24f8ba95b8254a97a3379048378f33e7838/pandas-2.3.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6f3bf5ec947526106399a9e1d26d40ee2b259c66422efdf4de63c848492d91bb", size = 13416763, upload-time = "2025-07-07T19:19:23.939Z" }, - { url = "https://files.pythonhosted.org/packages/b2/c0/54415af59db5cdd86a3d3bf79863e8cc3fa9ed265f0745254061ac09d5f2/pandas-2.3.1-cp313-cp313-win_amd64.whl", hash = "sha256:1c78cf43c8fde236342a1cb2c34bcff89564a7bfed7e474ed2fffa6aed03a956", size = 10987482, upload-time = "2025-07-07T19:19:42.699Z" }, - { url = "https://files.pythonhosted.org/packages/48/64/2fd2e400073a1230e13b8cd604c9bc95d9e3b962e5d44088ead2e8f0cfec/pandas-2.3.1-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:8dfc17328e8da77be3cf9f47509e5637ba8f137148ed0e9b5241e1baf526e20a", size = 12029159, upload-time = "2025-07-07T19:19:26.362Z" }, - { url = "https://files.pythonhosted.org/packages/d8/0a/d84fd79b0293b7ef88c760d7dca69828d867c89b6d9bc52d6a27e4d87316/pandas-2.3.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:ec6c851509364c59a5344458ab935e6451b31b818be467eb24b0fe89bd05b6b9", size = 11393287, upload-time = "2025-07-07T19:19:29.157Z" }, - { url = "https://files.pythonhosted.org/packages/50/ae/ff885d2b6e88f3c7520bb74ba319268b42f05d7e583b5dded9837da2723f/pandas-2.3.1-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:911580460fc4884d9b05254b38a6bfadddfcc6aaef856fb5859e7ca202e45275", size = 11309381, upload-time = "2025-07-07T19:19:31.436Z" }, - { url = "https://files.pythonhosted.org/packages/85/86/1fa345fc17caf5d7780d2699985c03dbe186c68fee00b526813939062bb0/pandas-2.3.1-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2f4d6feeba91744872a600e6edbbd5b033005b431d5ae8379abee5bcfa479fab", size = 11883998, upload-time = "2025-07-07T19:19:34.267Z" }, - { url = "https://files.pythonhosted.org/packages/81/aa/e58541a49b5e6310d89474333e994ee57fea97c8aaa8fc7f00b873059bbf/pandas-2.3.1-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:fe37e757f462d31a9cd7580236a82f353f5713a80e059a29753cf938c6775d96", size = 12704705, upload-time = "2025-07-07T19:19:36.856Z" }, - { url = "https://files.pythonhosted.org/packages/d5/f9/07086f5b0f2a19872554abeea7658200824f5835c58a106fa8f2ae96a46c/pandas-2.3.1-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:5db9637dbc24b631ff3707269ae4559bce4b7fd75c1c4d7e13f40edc42df4444", size = 13189044, upload-time = "2025-07-07T19:19:39.999Z" }, +sdist = { url = "https://files.pythonhosted.org/packages/33/01/d40b85317f86cf08d853a4f495195c73815fdf205eef3993821720274518/pandas-2.3.3.tar.gz", hash = "sha256:e05e1af93b977f7eafa636d043f9f94c7ee3ac81af99c13508215942e64c993b", size = 4495223, upload-time = "2025-09-29T23:34:51.853Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3d/f7/f425a00df4fcc22b292c6895c6831c0c8ae1d9fac1e024d16f98a9ce8749/pandas-2.3.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:376c6446ae31770764215a6c937f72d917f214b43560603cd60da6408f183b6c", size = 11555763, upload-time = "2025-09-29T23:16:53.287Z" }, + { url = "https://files.pythonhosted.org/packages/13/4f/66d99628ff8ce7857aca52fed8f0066ce209f96be2fede6cef9f84e8d04f/pandas-2.3.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:e19d192383eab2f4ceb30b412b22ea30690c9e618f78870357ae1d682912015a", size = 10801217, upload-time = "2025-09-29T23:17:04.522Z" }, + { url = "https://files.pythonhosted.org/packages/1d/03/3fc4a529a7710f890a239cc496fc6d50ad4a0995657dccc1d64695adb9f4/pandas-2.3.3-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:5caf26f64126b6c7aec964f74266f435afef1c1b13da3b0636c7518a1fa3e2b1", size = 12148791, upload-time = "2025-09-29T23:17:18.444Z" }, + { url = "https://files.pythonhosted.org/packages/40/a8/4dac1f8f8235e5d25b9955d02ff6f29396191d4e665d71122c3722ca83c5/pandas-2.3.3-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:dd7478f1463441ae4ca7308a70e90b33470fa593429f9d4c578dd00d1fa78838", size = 12769373, upload-time = "2025-09-29T23:17:35.846Z" }, + { url = "https://files.pythonhosted.org/packages/df/91/82cc5169b6b25440a7fc0ef3a694582418d875c8e3ebf796a6d6470aa578/pandas-2.3.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:4793891684806ae50d1288c9bae9330293ab4e083ccd1c5e383c34549c6e4250", size = 13200444, upload-time = "2025-09-29T23:17:49.341Z" }, + { url = "https://files.pythonhosted.org/packages/10/ae/89b3283800ab58f7af2952704078555fa60c807fff764395bb57ea0b0dbd/pandas-2.3.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:28083c648d9a99a5dd035ec125d42439c6c1c525098c58af0fc38dd1a7a1b3d4", size = 13858459, upload-time = "2025-09-29T23:18:03.722Z" }, + { url = "https://files.pythonhosted.org/packages/85/72/530900610650f54a35a19476eca5104f38555afccda1aa11a92ee14cb21d/pandas-2.3.3-cp310-cp310-win_amd64.whl", hash = "sha256:503cf027cf9940d2ceaa1a93cfb5f8c8c7e6e90720a2850378f0b3f3b1e06826", size = 11346086, upload-time = "2025-09-29T23:18:18.505Z" }, + { url = "https://files.pythonhosted.org/packages/c1/fa/7ac648108144a095b4fb6aa3de1954689f7af60a14cf25583f4960ecb878/pandas-2.3.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:602b8615ebcc4a0c1751e71840428ddebeb142ec02c786e8ad6b1ce3c8dec523", size = 11578790, upload-time = "2025-09-29T23:18:30.065Z" }, + { url = "https://files.pythonhosted.org/packages/9b/35/74442388c6cf008882d4d4bdfc4109be87e9b8b7ccd097ad1e7f006e2e95/pandas-2.3.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:8fe25fc7b623b0ef6b5009149627e34d2a4657e880948ec3c840e9402e5c1b45", size = 10833831, upload-time = "2025-09-29T23:38:56.071Z" }, + { url = "https://files.pythonhosted.org/packages/fe/e4/de154cbfeee13383ad58d23017da99390b91d73f8c11856f2095e813201b/pandas-2.3.3-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b468d3dad6ff947df92dcb32ede5b7bd41a9b3cceef0a30ed925f6d01fb8fa66", size = 12199267, upload-time = "2025-09-29T23:18:41.627Z" }, + { url = "https://files.pythonhosted.org/packages/bf/c9/63f8d545568d9ab91476b1818b4741f521646cbdd151c6efebf40d6de6f7/pandas-2.3.3-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b98560e98cb334799c0b07ca7967ac361a47326e9b4e5a7dfb5ab2b1c9d35a1b", size = 12789281, upload-time = "2025-09-29T23:18:56.834Z" }, + { url = "https://files.pythonhosted.org/packages/f2/00/a5ac8c7a0e67fd1a6059e40aa08fa1c52cc00709077d2300e210c3ce0322/pandas-2.3.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1d37b5848ba49824e5c30bedb9c830ab9b7751fd049bc7914533e01c65f79791", size = 13240453, upload-time = "2025-09-29T23:19:09.247Z" }, + { url = "https://files.pythonhosted.org/packages/27/4d/5c23a5bc7bd209231618dd9e606ce076272c9bc4f12023a70e03a86b4067/pandas-2.3.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:db4301b2d1f926ae677a751eb2bd0e8c5f5319c9cb3f88b0becbbb0b07b34151", size = 13890361, upload-time = "2025-09-29T23:19:25.342Z" }, + { url = "https://files.pythonhosted.org/packages/8e/59/712db1d7040520de7a4965df15b774348980e6df45c129b8c64d0dbe74ef/pandas-2.3.3-cp311-cp311-win_amd64.whl", hash = "sha256:f086f6fe114e19d92014a1966f43a3e62285109afe874f067f5abbdcbb10e59c", size = 11348702, upload-time = "2025-09-29T23:19:38.296Z" }, + { url = "https://files.pythonhosted.org/packages/9c/fb/231d89e8637c808b997d172b18e9d4a4bc7bf31296196c260526055d1ea0/pandas-2.3.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:6d21f6d74eb1725c2efaa71a2bfc661a0689579b58e9c0ca58a739ff0b002b53", size = 11597846, upload-time = "2025-09-29T23:19:48.856Z" }, + { url = "https://files.pythonhosted.org/packages/5c/bd/bf8064d9cfa214294356c2d6702b716d3cf3bb24be59287a6a21e24cae6b/pandas-2.3.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:3fd2f887589c7aa868e02632612ba39acb0b8948faf5cc58f0850e165bd46f35", size = 10729618, upload-time = "2025-09-29T23:39:08.659Z" }, + { url = "https://files.pythonhosted.org/packages/57/56/cf2dbe1a3f5271370669475ead12ce77c61726ffd19a35546e31aa8edf4e/pandas-2.3.3-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ecaf1e12bdc03c86ad4a7ea848d66c685cb6851d807a26aa245ca3d2017a1908", size = 11737212, upload-time = "2025-09-29T23:19:59.765Z" }, + { url = "https://files.pythonhosted.org/packages/e5/63/cd7d615331b328e287d8233ba9fdf191a9c2d11b6af0c7a59cfcec23de68/pandas-2.3.3-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:b3d11d2fda7eb164ef27ffc14b4fcab16a80e1ce67e9f57e19ec0afaf715ba89", size = 12362693, upload-time = "2025-09-29T23:20:14.098Z" }, + { url = "https://files.pythonhosted.org/packages/a6/de/8b1895b107277d52f2b42d3a6806e69cfef0d5cf1d0ba343470b9d8e0a04/pandas-2.3.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a68e15f780eddf2b07d242e17a04aa187a7ee12b40b930bfdd78070556550e98", size = 12771002, upload-time = "2025-09-29T23:20:26.76Z" }, + { url = "https://files.pythonhosted.org/packages/87/21/84072af3187a677c5893b170ba2c8fbe450a6ff911234916da889b698220/pandas-2.3.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:371a4ab48e950033bcf52b6527eccb564f52dc826c02afd9a1bc0ab731bba084", size = 13450971, upload-time = "2025-09-29T23:20:41.344Z" }, + { url = "https://files.pythonhosted.org/packages/86/41/585a168330ff063014880a80d744219dbf1dd7a1c706e75ab3425a987384/pandas-2.3.3-cp312-cp312-win_amd64.whl", hash = "sha256:a16dcec078a01eeef8ee61bf64074b4e524a2a3f4b3be9326420cabe59c4778b", size = 10992722, upload-time = "2025-09-29T23:20:54.139Z" }, + { url = "https://files.pythonhosted.org/packages/cd/4b/18b035ee18f97c1040d94debd8f2e737000ad70ccc8f5513f4eefad75f4b/pandas-2.3.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:56851a737e3470de7fa88e6131f41281ed440d29a9268dcbf0002da5ac366713", size = 11544671, upload-time = "2025-09-29T23:21:05.024Z" }, + { url = "https://files.pythonhosted.org/packages/31/94/72fac03573102779920099bcac1c3b05975c2cb5f01eac609faf34bed1ca/pandas-2.3.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:bdcd9d1167f4885211e401b3036c0c8d9e274eee67ea8d0758a256d60704cfe8", size = 10680807, upload-time = "2025-09-29T23:21:15.979Z" }, + { url = "https://files.pythonhosted.org/packages/16/87/9472cf4a487d848476865321de18cc8c920b8cab98453ab79dbbc98db63a/pandas-2.3.3-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e32e7cc9af0f1cc15548288a51a3b681cc2a219faa838e995f7dc53dbab1062d", size = 11709872, upload-time = "2025-09-29T23:21:27.165Z" }, + { url = "https://files.pythonhosted.org/packages/15/07/284f757f63f8a8d69ed4472bfd85122bd086e637bf4ed09de572d575a693/pandas-2.3.3-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:318d77e0e42a628c04dc56bcef4b40de67918f7041c2b061af1da41dcff670ac", size = 12306371, upload-time = "2025-09-29T23:21:40.532Z" }, + { url = "https://files.pythonhosted.org/packages/33/81/a3afc88fca4aa925804a27d2676d22dcd2031c2ebe08aabd0ae55b9ff282/pandas-2.3.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:4e0a175408804d566144e170d0476b15d78458795bb18f1304fb94160cabf40c", size = 12765333, upload-time = "2025-09-29T23:21:55.77Z" }, + { url = "https://files.pythonhosted.org/packages/8d/0f/b4d4ae743a83742f1153464cf1a8ecfafc3ac59722a0b5c8602310cb7158/pandas-2.3.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:93c2d9ab0fc11822b5eece72ec9587e172f63cff87c00b062f6e37448ced4493", size = 13418120, upload-time = "2025-09-29T23:22:10.109Z" }, + { url = "https://files.pythonhosted.org/packages/4f/c7/e54682c96a895d0c808453269e0b5928a07a127a15704fedb643e9b0a4c8/pandas-2.3.3-cp313-cp313-win_amd64.whl", hash = "sha256:f8bfc0e12dc78f777f323f55c58649591b2cd0c43534e8355c51d3fede5f4dee", size = 10993991, upload-time = "2025-09-29T23:25:04.889Z" }, + { url = "https://files.pythonhosted.org/packages/f9/ca/3f8d4f49740799189e1395812f3bf23b5e8fc7c190827d55a610da72ce55/pandas-2.3.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:75ea25f9529fdec2d2e93a42c523962261e567d250b0013b16210e1d40d7c2e5", size = 12048227, upload-time = "2025-09-29T23:22:24.343Z" }, + { url = "https://files.pythonhosted.org/packages/0e/5a/f43efec3e8c0cc92c4663ccad372dbdff72b60bdb56b2749f04aa1d07d7e/pandas-2.3.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:74ecdf1d301e812db96a465a525952f4dde225fdb6d8e5a521d47e1f42041e21", size = 11411056, upload-time = "2025-09-29T23:22:37.762Z" }, + { url = "https://files.pythonhosted.org/packages/46/b1/85331edfc591208c9d1a63a06baa67b21d332e63b7a591a5ba42a10bb507/pandas-2.3.3-cp313-cp313t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6435cb949cb34ec11cc9860246ccb2fdc9ecd742c12d3304989017d53f039a78", size = 11645189, upload-time = "2025-09-29T23:22:51.688Z" }, + { url = "https://files.pythonhosted.org/packages/44/23/78d645adc35d94d1ac4f2a3c4112ab6f5b8999f4898b8cdf01252f8df4a9/pandas-2.3.3-cp313-cp313t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:900f47d8f20860de523a1ac881c4c36d65efcb2eb850e6948140fa781736e110", size = 12121912, upload-time = "2025-09-29T23:23:05.042Z" }, + { url = "https://files.pythonhosted.org/packages/53/da/d10013df5e6aaef6b425aa0c32e1fc1f3e431e4bcabd420517dceadce354/pandas-2.3.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:a45c765238e2ed7d7c608fc5bc4a6f88b642f2f01e70c0c23d2224dd21829d86", size = 12712160, upload-time = "2025-09-29T23:23:28.57Z" }, + { url = "https://files.pythonhosted.org/packages/bd/17/e756653095a083d8a37cbd816cb87148debcfcd920129b25f99dd8d04271/pandas-2.3.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:c4fc4c21971a1a9f4bdb4c73978c7f7256caa3e62b323f70d6cb80db583350bc", size = 13199233, upload-time = "2025-09-29T23:24:24.876Z" }, + { url = "https://files.pythonhosted.org/packages/04/fd/74903979833db8390b73b3a8a7d30d146d710bd32703724dd9083950386f/pandas-2.3.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:ee15f284898e7b246df8087fc82b87b01686f98ee67d85a17b7ab44143a3a9a0", size = 11540635, upload-time = "2025-09-29T23:25:52.486Z" }, + { url = "https://files.pythonhosted.org/packages/21/00/266d6b357ad5e6d3ad55093a7e8efc7dd245f5a842b584db9f30b0f0a287/pandas-2.3.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:1611aedd912e1ff81ff41c745822980c49ce4a7907537be8692c8dbc31924593", size = 10759079, upload-time = "2025-09-29T23:26:33.204Z" }, + { url = "https://files.pythonhosted.org/packages/ca/05/d01ef80a7a3a12b2f8bbf16daba1e17c98a2f039cbc8e2f77a2c5a63d382/pandas-2.3.3-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6d2cefc361461662ac48810cb14365a365ce864afe85ef1f447ff5a1e99ea81c", size = 11814049, upload-time = "2025-09-29T23:27:15.384Z" }, + { url = "https://files.pythonhosted.org/packages/15/b2/0e62f78c0c5ba7e3d2c5945a82456f4fac76c480940f805e0b97fcbc2f65/pandas-2.3.3-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ee67acbbf05014ea6c763beb097e03cd629961c8a632075eeb34247120abcb4b", size = 12332638, upload-time = "2025-09-29T23:27:51.625Z" }, + { url = "https://files.pythonhosted.org/packages/c5/33/dd70400631b62b9b29c3c93d2feee1d0964dc2bae2e5ad7a6c73a7f25325/pandas-2.3.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:c46467899aaa4da076d5abc11084634e2d197e9460643dd455ac3db5856b24d6", size = 12886834, upload-time = "2025-09-29T23:28:21.289Z" }, + { url = "https://files.pythonhosted.org/packages/d3/18/b5d48f55821228d0d2692b34fd5034bb185e854bdb592e9c640f6290e012/pandas-2.3.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:6253c72c6a1d990a410bc7de641d34053364ef8bcd3126f7e7450125887dffe3", size = 13409925, upload-time = "2025-09-29T23:28:58.261Z" }, + { url = "https://files.pythonhosted.org/packages/a6/3d/124ac75fcd0ecc09b8fdccb0246ef65e35b012030defb0e0eba2cbbbe948/pandas-2.3.3-cp314-cp314-win_amd64.whl", hash = "sha256:1b07204a219b3b7350abaae088f451860223a52cfb8a6c53358e7948735158e5", size = 11109071, upload-time = "2025-09-29T23:32:27.484Z" }, + { url = "https://files.pythonhosted.org/packages/89/9c/0e21c895c38a157e0faa1fb64587a9226d6dd46452cac4532d80c3c4a244/pandas-2.3.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:2462b1a365b6109d275250baaae7b760fd25c726aaca0054649286bcfbb3e8ec", size = 12048504, upload-time = "2025-09-29T23:29:31.47Z" }, + { url = "https://files.pythonhosted.org/packages/d7/82/b69a1c95df796858777b68fbe6a81d37443a33319761d7c652ce77797475/pandas-2.3.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:0242fe9a49aa8b4d78a4fa03acb397a58833ef6199e9aa40a95f027bb3a1b6e7", size = 11410702, upload-time = "2025-09-29T23:29:54.591Z" }, + { url = "https://files.pythonhosted.org/packages/f9/88/702bde3ba0a94b8c73a0181e05144b10f13f29ebfc2150c3a79062a8195d/pandas-2.3.3-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a21d830e78df0a515db2b3d2f5570610f5e6bd2e27749770e8bb7b524b89b450", size = 11634535, upload-time = "2025-09-29T23:30:21.003Z" }, + { url = "https://files.pythonhosted.org/packages/a4/1e/1bac1a839d12e6a82ec6cb40cda2edde64a2013a66963293696bbf31fbbb/pandas-2.3.3-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2e3ebdb170b5ef78f19bfb71b0dc5dc58775032361fa188e814959b74d726dd5", size = 12121582, upload-time = "2025-09-29T23:30:43.391Z" }, + { url = "https://files.pythonhosted.org/packages/44/91/483de934193e12a3b1d6ae7c8645d083ff88dec75f46e827562f1e4b4da6/pandas-2.3.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:d051c0e065b94b7a3cea50eb1ec32e912cd96dba41647eb24104b6c6c14c5788", size = 12699963, upload-time = "2025-09-29T23:31:10.009Z" }, + { url = "https://files.pythonhosted.org/packages/70/44/5191d2e4026f86a2a109053e194d3ba7a31a2d10a9c2348368c63ed4e85a/pandas-2.3.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:3869faf4bd07b3b66a9f462417d0ca3a9df29a9f6abd5d0d0dbab15dac7abe87", size = 13202175, upload-time = "2025-09-29T23:31:59.173Z" }, ] [[package]] @@ -2620,7 +2633,7 @@ bedrock = [{ name = "boto3", specifier = ">=1.34.92,<2" }] cloud-export-to-parquet = [ { name = "boto3", specifier = ">=1.34.89,<2" }, { name = "numpy", marker = "python_full_version >= '3.10' and python_full_version < '3.13'", specifier = ">=1.26.0,<2" }, - { name = "pandas", marker = "python_full_version >= '3.10' and python_full_version < '4'", specifier = ">=2.2.2,<3" }, + { name = "pandas", marker = "python_full_version >= '3.10' and python_full_version < '4'", specifier = ">=2.3.3,<3" }, { name = "pyarrow", specifier = ">=19.0.1" }, ] dev = [ diff --git a/workflow_streams/README.md b/workflow_streams/README.md new file mode 100644 index 00000000..f06bed39 --- /dev/null +++ b/workflow_streams/README.md @@ -0,0 +1,120 @@ +# Workflow Streams + +> **Experimental.** These samples use +> `temporalio.contrib.workflow_streams`, which ships in +> `temporalio>=1.27.0`. The module is considered experimental and its +> API may change in future versions. + +`temporalio.contrib.workflow_streams` lets a workflow host a durable, +offset-addressed event channel. The workflow holds an append-only log; +external clients (activities, starters, web backends) publish to topics via +signals and subscribe via long-poll updates. This packages the +boilerplate — batching, offset tracking, topic filtering, +continue-as-new hand-off — into a reusable stream. + +This directory has five scenarios. The first four share one worker; +the fifth has its own worker because it needs the `openai` package +and an `OPENAI_API_KEY`. + +**Scenario 1 — basic publish/subscribe with heterogeneous topics:** + +* `workflows/order_workflow.py` — a workflow that hosts a + `WorkflowStream` and publishes status events as it processes an order. +* `activities/payment_activity.py` — an activity that publishes + intermediate progress to the stream via + `WorkflowStreamClient.from_within_activity()`. +* `run_publisher.py` — starts the workflow, subscribes to both topics, + decodes each by `item.topic`, and prints events as they arrive. + +**Scenario 2 — reconnecting subscriber:** + +* `workflows/pipeline_workflow.py` — a multi-stage pipeline that + publishes stage transitions over ~10 seconds, leaving room for a + consumer to disconnect and reconnect mid-run. +* `run_reconnecting_subscriber.py` — connects, reads a couple of + events, "disconnects," then reopens a fresh client and resumes via + `subscribe(from_offset=...)`. This is the central Workflow Streams + use case: a consumer can disappear (page refresh, server restart, + laptop closed) and resume later without missing events or seeing + duplicates. + +**Scenario 3 — external (non-Activity) publisher:** + +* `workflows/hub_workflow.py` — a passive workflow that does no work + of its own; it exists only to host a `WorkflowStream` and shut down + when signaled. +* `run_external_publisher.py` — starts the hub, then publishes events + into it from a plain Python coroutine using + `WorkflowStreamClient.create(client, workflow_id)`. A subscriber + task runs alongside; when the publisher is done it emits a sentinel + event and signals `HubWorkflow.close`. The shape that fits a + backend service or scheduled job pushing events into a workflow it + didn't itself start. + +**Scenario 4 — bounded log via `truncate()`:** + +* `workflows/ticker_workflow.py` — a long-running workflow that + publishes events at a fixed cadence and calls + `self.stream.truncate(...)` periodically to bound log growth, + keeping only the most recent N entries. +* `run_truncating_ticker.py` — runs a fast subscriber and a slow + subscriber side by side. The fast one keeps up and sees every + offset in order; the slow one falls behind a truncation and + silently jumps forward to the new base offset. The output makes + the trade visible: bounded log size in exchange for intermediate + events being invisible to slow consumers. + +**Scenario 5 — LLM streaming:** + +* `workflows/llm_workflow.py` — hosts a `WorkflowStream` and runs + `stream_completion` as a single activity. The workflow itself + does no streaming; the activity owns the non-deterministic OpenAI + call. +* `activities/llm_activity.py` — calls + `openai.AsyncOpenAI().chat.completions.create(stream=True)`, + publishes each token chunk on the `delta` topic, the final + accumulated text on `complete`, and a `RetryEvent` on `retry` + when running on attempt > 1. +* `run_llm.py` — subscribes to all three topics, renders deltas to + the terminal as they arrive, and on a `retry` event uses ANSI + escapes to rewind the printed output before the retried attempt + re-publishes. + +Scenario 5 runs on its own worker (`run_llm_worker.py`, on +`workflow-stream-llm-task-queue`) because it needs the `openai` +dependency and an `OPENAI_API_KEY`, and because killing this worker +mid-stream is the easiest way to demonstrate retry handling without +disrupting the other four scenarios. + +## Run it + +For scenarios 1–4, start the shared worker: + +```bash +uv run workflow_streams/run_worker.py +``` + +For scenario 5, install the extra, export the key, and start the +LLM worker: + +```bash +uv sync --group llm-stream +export OPENAI_API_KEY=... +uv run workflow_streams/run_llm_worker.py +``` + +Then in another terminal, pick a scenario: + +```bash +uv run workflow_streams/run_publisher.py # scenario 1 +uv run workflow_streams/run_reconnecting_subscriber.py # scenario 2 +uv run workflow_streams/run_external_publisher.py # scenario 3 +uv run workflow_streams/run_truncating_ticker.py # scenario 4 +uv run workflow_streams/run_llm.py # scenario 5 +``` + +To exercise scenario 5's retry path, kill `run_llm_worker.py` +(`Ctrl-C`) while output is streaming and start it again. The +activity's next attempt sends a `RetryEvent` first; the consumer +clears its on-screen output via ANSI escapes and re-renders from +scratch. diff --git a/workflow_streams/__init__.py b/workflow_streams/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/activities/__init__.py b/workflow_streams/activities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/activities/llm_activity.py b/workflow_streams/activities/llm_activity.py new file mode 100644 index 00000000..aebe80ec --- /dev/null +++ b/workflow_streams/activities/llm_activity.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +from datetime import timedelta + +from openai import AsyncOpenAI +from temporalio import activity +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.llm_shared import ( + TOPIC_COMPLETE, + TOPIC_DELTA, + TOPIC_RETRY, + LLMInput, + RetryEvent, + TextComplete, + TextDelta, +) + + +@activity.defn +async def stream_completion(input: LLMInput) -> str: + """Stream an LLM completion to the parent workflow's stream. + + Activity-as-publisher: each delta from the OpenAI streaming API is + pushed to the workflow's stream as a ``TextDelta`` event on the + ``delta`` topic. The accumulated full text returns as the + activity's result and is also published on the ``complete`` topic + as a terminator. On retry attempts (``activity.info().attempt > 1``) + a ``RetryEvent`` lands on the ``retry`` topic before the new + attempt's deltas, so consumers can reset their accumulated state + instead of concatenating the failed attempt's partial output with + the retried attempt's full output. + + No ``force_flush=True``: the 200ms ``batch_interval`` is fast + enough for an interactive feel, and the WorkflowStreamClient's + ``__aexit__`` cancels a sleeping flusher cleanly. + """ + stream_client = WorkflowStreamClient.from_within_activity( + batch_interval=timedelta(milliseconds=200), + ) + # Disable provider-side retries; let Temporal own retry policy at + # the activity layer. + openai_client = AsyncOpenAI(max_retries=0) + + async with stream_client: + deltas = stream_client.topic(TOPIC_DELTA, type=TextDelta) + complete = stream_client.topic(TOPIC_COMPLETE, type=TextComplete) + retry = stream_client.topic(TOPIC_RETRY, type=RetryEvent) + + attempt = activity.info().attempt + if attempt > 1: + retry.publish(RetryEvent(attempt=attempt)) + + full: list[str] = [] + oai_stream = await openai_client.chat.completions.create( + model=input.model, + messages=[{"role": "user", "content": input.prompt}], + stream=True, + ) + async for chunk in oai_stream: + if not chunk.choices: + continue + text = chunk.choices[0].delta.content + if not text: + continue + deltas.publish(TextDelta(text=text)) + full.append(text) + + full_text = "".join(full) + complete.publish(TextComplete(full_text=full_text)) + return full_text diff --git a/workflow_streams/activities/payment_activity.py b/workflow_streams/activities/payment_activity.py new file mode 100644 index 00000000..b3b2aa29 --- /dev/null +++ b/workflow_streams/activities/payment_activity.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import asyncio +from datetime import timedelta + +from temporalio import activity +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import TOPIC_PROGRESS, ProgressEvent + + +@activity.defn +async def charge_card(order_id: str) -> str: + """Pretend to charge a card, publishing progress to the parent workflow. + + `WorkflowStreamClient.from_within_activity()` reads the parent + workflow id and the Temporal client from the activity context, so + this activity can push events back without any wiring. + """ + client = WorkflowStreamClient.from_within_activity( + batch_interval=timedelta(milliseconds=200) + ) + async with client: + progress = client.topic(TOPIC_PROGRESS, type=ProgressEvent) + progress.publish(ProgressEvent(message="charging card...")) + await asyncio.sleep(1.0) + progress.publish( + ProgressEvent(message="card charged"), + ) + return f"charge-{order_id}" diff --git a/workflow_streams/llm_shared.py b/workflow_streams/llm_shared.py new file mode 100644 index 00000000..2780fd0b --- /dev/null +++ b/workflow_streams/llm_shared.py @@ -0,0 +1,44 @@ +"""Types and constants for the LLM-streaming scenario. + +Kept separate from ``shared.py`` because the other scenarios don't +use these — and this scenario runs on its own worker and task queue +so the ``openai`` dependency stays out of everyone else's path. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from temporalio.contrib.workflow_streams import WorkflowStreamState + +# Scenario 5 runs on its own worker so the openai dependency only +# matters for that scenario. +LLM_TASK_QUEUE = "workflow-stream-llm-task-queue" + +# Topics published by the activity. +TOPIC_DELTA = "delta" +TOPIC_COMPLETE = "complete" +TOPIC_RETRY = "retry" + + +@dataclass +class LLMInput: + prompt: str + model: str = "gpt-5-mini" + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class TextDelta: + text: str + + +@dataclass +class TextComplete: + full_text: str + + +@dataclass +class RetryEvent: + attempt: int diff --git a/workflow_streams/run_external_publisher.py b/workflow_streams/run_external_publisher.py new file mode 100644 index 00000000..8e7d38f8 --- /dev/null +++ b/workflow_streams/run_external_publisher.py @@ -0,0 +1,99 @@ +"""External publisher: a non-Activity process pushes events into a workflow. + +The two earlier scenarios publish from inside the workflow itself +(``OrderWorkflow``, ``PipelineWorkflow``) or from an Activity it runs +(``charge_card``). This scenario shows the third shape: a backend +service, scheduled job, or anything else with a Temporal ``Client`` +publishing into a *running* workflow it didn't start. Same factory as +the subscribe path — :py:meth:`WorkflowStreamClient.create` — used for +publishing instead. + +The script starts a ``HubWorkflow`` (which does no work of its own — +it exists only to host the stream), then runs a publisher and a +subscriber concurrently. When the publisher is done it signals +``HubWorkflow.close``, the workflow's run finishes, and the +subscriber's iterator exits normally. + +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_external_publisher.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_NEWS, + HubInput, + NewsEvent, +) +from workflow_streams.workflows.hub_workflow import HubWorkflow + +HEADLINES = [ + "rates held", + "merger announced", + "outage resolved", + "earnings beat", + "regulator opens probe", +] + +# In-band terminator the publisher emits before signaling close. The +# subscriber recognizes this value and stops polling — without an +# explicit terminator the consumer would have to rely on the workflow +# returning to break the iterator, which means racing the last item +# delivery against workflow completion. +DONE_HEADLINE = "__done__" + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-hub-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + HubWorkflow.run, + HubInput(hub_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + async def publish_news() -> None: + # WorkflowStreamClient.create takes a Temporal client and a + # workflow id — the same factory used elsewhere for subscribing. + # The async context manager batches publishes and flushes on + # exit; we additionally call flush() before signaling close so + # we know the events landed before the workflow shuts down. + producer = WorkflowStreamClient.create(client, workflow_id) + async with producer: + news = producer.topic(TOPIC_NEWS, type=NewsEvent) + for headline in HEADLINES: + news.publish(NewsEvent(headline=headline)) + print(f"[publisher] sent: {headline}") + await asyncio.sleep(0.5) + news.publish(NewsEvent(headline=DONE_HEADLINE), force_flush=True) + await producer.flush() + # Tell the hub it can stop. The subscriber has already broken + # out of its async-for loop on the sentinel above. + await handle.signal(HubWorkflow.close) + print("[publisher] signaled close") + + async def consume_news() -> None: + consumer = WorkflowStreamClient.create(client, workflow_id) + async for item in consumer.subscribe([TOPIC_NEWS], result_type=NewsEvent): + if item.data.headline == DONE_HEADLINE: + return + print(f"[subscriber] offset={item.offset}: {item.data.headline}") + + await asyncio.gather(publish_news(), consume_news()) + + result = await handle.result() + print(f"\nworkflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_llm.py b/workflow_streams/run_llm.py new file mode 100644 index 00000000..1b2a9d7a --- /dev/null +++ b/workflow_streams/run_llm.py @@ -0,0 +1,132 @@ +"""Stream LLM output to the terminal, handling retries. + +Starts an ``LLMWorkflow``, subscribes to its delta / complete / retry +topics, and renders the model's output to stdout as it arrives. On a +``RETRY`` event (the activity is on attempt > 1), the consumer rewinds +its rendered output with ANSI escapes and starts fresh — so a killed +worker doesn't leave a half-finished response stuck on screen +followed by the retried attempt's full output. + +Requires ``OPENAI_API_KEY`` in the environment and the ``llm-stream`` +extra:: + + uv sync --group llm-stream + export OPENAI_API_KEY=... + +Run the LLM worker first (``uv run workflow_streams/run_llm_worker.py``), +then:: + + uv run workflow_streams/run_llm.py + +To see retry handling in action, kill the LLM worker mid-stream +(Ctrl-C in its terminal) and start it again. The consumer will clear +its accumulated output on the ``RETRY`` event and re-render the +retried attempt's output from scratch. +""" + +from __future__ import annotations + +import asyncio +import sys +import uuid + +from temporalio.client import Client +from temporalio.common import RawValue +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.llm_shared import ( + LLM_TASK_QUEUE, + TOPIC_COMPLETE, + TOPIC_DELTA, + TOPIC_RETRY, + LLMInput, + RetryEvent, + TextComplete, + TextDelta, +) +from workflow_streams.workflows.llm_workflow import LLMWorkflow + +# Long enough that you can comfortably kill the worker mid-stream and +# watch the retry render. Adjust to taste. +DEFAULT_PROMPT = ( + "Write a 500-word comparison of Paxos, Raft, and Viewstamped " + "Replication for a new distributed-systems engineer. Cover the " + "core ideas, leader election, normal-case operation, " + "reconfiguration, and the practical tradeoffs that show up when " + "implementing each. Use short paragraphs." +) + + +# ANSI cursor save / restore. ``\033[s`` saves the current cursor +# position, ``\033[u`` restores it, ``\033[J`` clears from the cursor +# to the end of the screen. Save once before the first delta, and on +# RETRY restore + clear-to-end so the failed attempt's rendered output +# disappears regardless of how it was wrapped by the terminal. Save +# again afterwards so a second retry can rewind to the same point. +ANSI_SAVE = "\033[s" +ANSI_RESTORE_AND_CLEAR = "\033[u\033[J" + + +async def main() -> None: + client = await Client.connect("localhost:7233") + converter = client.data_converter.payload_converter + + workflow_id = f"workflow-stream-llm-{uuid.uuid4().hex[:8]}" + llm_input = LLMInput(prompt=DEFAULT_PROMPT) + handle = await client.start_workflow( + LLMWorkflow.run, + llm_input, + id=workflow_id, + task_queue=LLM_TASK_QUEUE, + ) + + # Print a header so the user sees something immediately. The + # response will start streaming below it once the first delta + # arrives — until then this is the only line on screen. + print( + f"[llm {workflow_id}] streaming response from {llm_input.model}, " + f"awaiting first token..." + ) + print() + sys.stdout.write(ANSI_SAVE) + sys.stdout.flush() + + stream = WorkflowStreamClient.create(client, workflow_id) + + # result_type=RawValue lets us dispatch on item.topic and decode + # against the right dataclass per topic. The loop ends either on + # the `complete` terminator (break) or because the iterator + # naturally exhausts when the workflow reaches a terminal state + # without one (activity exhausted retries, etc.). Either way the + # handle.result() below either returns the full text or raises + # the workflow's failure. + async for item in stream.subscribe( + [TOPIC_DELTA, TOPIC_RETRY, TOPIC_COMPLETE], + result_type=RawValue, + ): + if item.topic == TOPIC_RETRY: + evt = converter.from_payload(item.data.payload, RetryEvent) + sys.stdout.write(ANSI_RESTORE_AND_CLEAR) + sys.stdout.flush() + print(f"[retry attempt {evt.attempt}] resetting output") + print() + sys.stdout.write(ANSI_SAVE) + sys.stdout.flush() + elif item.topic == TOPIC_DELTA: + delta = converter.from_payload(item.data.payload, TextDelta) + sys.stdout.write(delta.text) + sys.stdout.flush() + elif item.topic == TOPIC_COMPLETE: + # The full text is also in the payload (and returned by + # the workflow), but the consumer has already rendered it + # incrementally. Just terminate the line. + converter.from_payload(item.data.payload, TextComplete) + print() + break + + result = await handle.result() + print(f"\n[workflow result: {len(result)} chars]") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_llm_worker.py b/workflow_streams/run_llm_worker.py new file mode 100644 index 00000000..2bad5991 --- /dev/null +++ b/workflow_streams/run_llm_worker.py @@ -0,0 +1,40 @@ +"""Worker for the LLM-streaming scenario. + +Runs separately from ``run_worker.py`` so the ``openai`` dependency +and the ``OPENAI_API_KEY`` requirement stay isolated to this one +scenario. Different task queue too — the other four samples won't +route work to this worker. + +Kill this worker mid-stream while ``run_llm.py`` is running to +trigger a retry: Temporal restarts the activity on the next worker +to come up, the activity publishes a ``RetryEvent`` on its second +attempt, and the consumer resets its rendered output. +""" + +from __future__ import annotations + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from workflow_streams.activities.llm_activity import stream_completion +from workflow_streams.llm_shared import LLM_TASK_QUEUE +from workflow_streams.workflows.llm_workflow import LLMWorkflow + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=LLM_TASK_QUEUE, + workflows=[LLMWorkflow], + activities=[stream_completion], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_publisher.py b/workflow_streams/run_publisher.py new file mode 100644 index 00000000..9f76ee41 --- /dev/null +++ b/workflow_streams/run_publisher.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.common import RawValue +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, +) +from workflow_streams.workflows.order_workflow import OrderWorkflow + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-order-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + OrderWorkflow.run, + OrderInput(order_id="order-1"), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + stream = WorkflowStreamClient.create(client, workflow_id) + converter = client.data_converter.payload_converter + + # Single iterator over both topics — avoids a cancellation race + # between two concurrent subscribers. result_type=RawValue + # delivers the underlying Payload so we can dispatch heterogeneous + # events on item.topic. The loop ends either on the in-band + # `complete` terminator (break) or because the iterator exhausts + # when the workflow reaches a terminal state without one (e.g. on + # failure). Either way we then await handle.result(), which raises + # if the workflow failed. + async for item in stream.subscribe( + [TOPIC_STATUS, TOPIC_PROGRESS], result_type=RawValue + ): + if item.topic == TOPIC_STATUS: + evt = converter.from_payload(item.data.payload, StatusEvent) + print(f"[status] {evt.kind}: order={evt.order_id}") + if evt.kind == "complete": + break + elif item.topic == TOPIC_PROGRESS: + progress = converter.from_payload(item.data.payload, ProgressEvent) + print(f"[progress] {progress.message}") + + result = await handle.result() + print(f"workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_reconnecting_subscriber.py b/workflow_streams/run_reconnecting_subscriber.py new file mode 100644 index 00000000..d0da5e32 --- /dev/null +++ b/workflow_streams/run_reconnecting_subscriber.py @@ -0,0 +1,164 @@ +"""Reconnecting subscriber: read a few events, disconnect, resume. + +Demonstrates the central Workflow Streams use case: a consumer can +disappear mid-stream — page refresh, server restart, laptop closed — +and resume later without missing events or seeing duplicates. The +event log lives in the Workflow, so the consumer just remembers where +it stopped. + +The script runs the pattern in two phases inside one process to keep +the demo short. The same code shape works across actual process +restarts because the resume offset is durable in the workflow, not in +the consumer. + +Output is one line per emit, with current stream stats in a left column +and a phase / event message in a right column. A background poller +calls ``WorkflowStreamClient.get_offset()`` for the whole demo and +emits a heartbeat line once a second so you can watch ``pending`` +(``available - processed``) grow while the consumer is disconnected +and shrink as phase 2 catches up. + +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_reconnecting_subscriber.py +""" + +from __future__ import annotations + +import asyncio +import uuid +from dataclasses import dataclass + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_STATUS, + PipelineInput, + StageEvent, +) +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow + +# Number of events read in phase 1 before simulating a disconnect. +# Picked small enough that the workflow is still running after. +PHASE_1_EVENTS = 2 + +# How long to stay disconnected. +DISCONNECT_SECONDS = 3.0 + +# Background poller cadence. The poller refreshes state.available this +# often and emits a heartbeat line once per HEARTBEAT_SECONDS. +POLL_INTERVAL_SECONDS = 0.25 +HEARTBEAT_SECONDS = 1.0 + +# Width of the stats column. Picked to fit the longest stats string. +LEFT_WIDTH = 30 + + +@dataclass +class State: + processed: int = 0 + available: int = 0 + + @property + def pending(self) -> int: + return max(0, self.available - self.processed) + + +def emit(state: State, message: str) -> None: + left = ( + f"proc={state.processed:>2} " + f"avail={state.available:>2} " + f"pend={state.pending:>2}" + ) + print(f"{left:<{LEFT_WIDTH}}│ {message}", flush=True) + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-pipeline-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + PipelineWorkflow.run, + PipelineInput(pipeline_id=workflow_id), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + + # In a production web backend the resume offset would live in + # durable storage keyed by (user_id, run_id) — a database row, a + # Redis key, etc. For an in-process demo a State.processed + # attribute works the same way. + state = State() + stream = WorkflowStreamClient.create(client, workflow_id) + emit(state, f"started {workflow_id}") + + stop = asyncio.Event() + + async def poller() -> None: + """Refresh state.available; emit a heartbeat line once a second.""" + loop = asyncio.get_running_loop() + last_emit = loop.time() + while not stop.is_set(): + try: + state.available = await stream.get_offset() + except Exception: + pass + now = loop.time() + if now - last_emit >= HEARTBEAT_SECONDS: + emit(state, "·") + last_emit = now + try: + await asyncio.wait_for(stop.wait(), timeout=POLL_INTERVAL_SECONDS) + except asyncio.TimeoutError: + pass + + poller_task = asyncio.create_task(poller()) + try: + # ---- Phase 1: connect, read a couple of events, "disconnect". + emit(state, "[phase 1] connecting") + seen = 0 + async for item in stream.subscribe([TOPIC_STATUS], result_type=StageEvent): + # Remember *one past* the offset just consumed: on resume we + # want the next unseen event, not the one we already showed. + state.processed = item.offset + 1 + emit(state, f" offset={item.offset:2d} stage={item.data.stage}") + seen += 1 + if seen >= PHASE_1_EVENTS: + break + emit(state, "[phase 1] disconnecting") + + # ---- Disconnect window: nobody reads. The workflow keeps + # publishing — `pend` grows on the heartbeat lines as the offset + # advances past `processed`. + await asyncio.sleep(DISCONNECT_SECONDS) + + # ---- Phase 2: brand-new client + stream, resume from saved + # offset. Same shape as a different process picking up where the + # first one left off. + emit(state, "[phase 2] reconnecting") + client2 = await Client.connect("localhost:7233") + stream2 = WorkflowStreamClient.create(client2, workflow_id) + async for item in stream2.subscribe( + [TOPIC_STATUS], + from_offset=state.processed, + result_type=StageEvent, + ): + state.processed = item.offset + 1 + emit(state, f" offset={item.offset:2d} stage={item.data.stage}") + if item.data.stage == "complete": + break + + result = await handle.result() + emit(state, f"workflow result: {result}") + finally: + stop.set() + try: + await poller_task + except asyncio.CancelledError: + pass + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_truncating_ticker.py b/workflow_streams/run_truncating_ticker.py new file mode 100644 index 00000000..26399447 --- /dev/null +++ b/workflow_streams/run_truncating_ticker.py @@ -0,0 +1,129 @@ +"""Truncating ticker: bounded log + slow vs. fast subscribers. + +The ``TickerWorkflow`` publishes ``count`` events at a fixed interval, +calling ``self.stream.truncate(...)`` periodically to bound log +growth. This script subscribes twice — once fast, once slow — and +prints them in two lanes so the trade is visible at a glance: + +* **Fast lane** (left). Keeps up. Sees every published offset. +* **Slow lane** (right). Sleeps between iterations. When a truncation + has dropped its position by the time it polls again, the iterator + silently jumps forward to the new base offset; the slow lane prints + a ``↪ jumped N → M (K dropped)`` marker for each gap and resumes + at the new offset. + +``truncate()`` is unilateral: the workflow does not know who is +subscribed and does not wait for them. The implicit alternative — +never truncating — keeps every event around forever, lets slow +consumers eventually catch up without losses, and pays for it in +unbounded workflow history. The truncation model is the opposite +trade: bounded log, at-best-effort delivery to slow consumers, no +backpressure on the publisher. Pair it with set-semantic events where +each event carries enough state to make missing the prior ones +recoverable. (If you actually need lossless delivery to slow +consumers, the workflow has to coordinate acknowledgements +explicitly — that is a different sample.) + +Run the worker first (``uv run workflow_streams/run_worker.py``), then:: + + uv run workflow_streams/run_truncating_ticker.py +""" + +from __future__ import annotations + +import asyncio +import uuid + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from workflow_streams.shared import ( + TASK_QUEUE, + TOPIC_TICK, + TickerInput, + TickEvent, +) +from workflow_streams.workflows.ticker_workflow import TickerWorkflow + +# Aggressive truncation so the log stays at most KEEP_LAST entries +# right after each truncation, which keeps the slow subscriber's +# per-poll batch tiny. Small batches + a slow per-event sleep mean the +# slow subscriber re-polls often, and most of those polls land after a +# truncation that has passed its position — so it sees several jumps +# during the run rather than one batched at the end. +TICKER_COUNT = 30 +INTERVAL_MS = 200 +TRUNCATE_EVERY = 2 +KEEP_LAST = 1 +SLOW_SUBSCRIBER_DELAY_S = 1.5 + +LANE_WIDTH = 32 +SEP = "│" + + +def emit_fast(message: str) -> None: + print(f"{message:<{LANE_WIDTH}} {SEP}", flush=True) + + +def emit_slow(message: str) -> None: + print(f"{' ' * LANE_WIDTH} {SEP} {message}", flush=True) + + +def emit_header() -> None: + rule = "─" * LANE_WIDTH + print( + f"{'fast (every event)':<{LANE_WIDTH}} {SEP} " + f"slow (sleeps {SLOW_SUBSCRIBER_DELAY_S}s between events)" + ) + print(f"{rule} {SEP} {rule}") + + +async def main() -> None: + client = await Client.connect("localhost:7233") + + workflow_id = f"workflow-stream-ticker-{uuid.uuid4().hex[:8]}" + handle = await client.start_workflow( + TickerWorkflow.run, + TickerInput( + count=TICKER_COUNT, + keep_last=KEEP_LAST, + truncate_every=TRUNCATE_EVERY, + interval_ms=INTERVAL_MS, + ), + id=workflow_id, + task_queue=TASK_QUEUE, + ) + stream = WorkflowStreamClient.create(client, workflow_id) + last_n = TICKER_COUNT - 1 + + emit_header() + + async def fast_subscriber() -> None: + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + emit_fast(f"offset={item.offset:>3} n={item.data.n}") + if item.data.n == last_n: + return + + async def slow_subscriber() -> None: + last_offset = -1 + async for item in stream.subscribe([TOPIC_TICK], result_type=TickEvent): + if last_offset >= 0 and item.offset > last_offset + 1: + gap = item.offset - last_offset - 1 + emit_slow( + f"↪ jumped offset={last_offset} → {item.offset} ({gap} dropped)" + ) + emit_slow(f"offset={item.offset:>3} n={item.data.n}") + last_offset = item.offset + if item.data.n == last_n: + return + await asyncio.sleep(SLOW_SUBSCRIBER_DELAY_S) + + await asyncio.gather(fast_subscriber(), slow_subscriber()) + + result = await handle.result() + print() + print(f"workflow result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/run_worker.py b/workflow_streams/run_worker.py new file mode 100644 index 00000000..8aa12edc --- /dev/null +++ b/workflow_streams/run_worker.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from workflow_streams.activities.payment_activity import charge_card +from workflow_streams.shared import TASK_QUEUE +from workflow_streams.workflows.hub_workflow import HubWorkflow +from workflow_streams.workflows.order_workflow import OrderWorkflow +from workflow_streams.workflows.pipeline_workflow import PipelineWorkflow +from workflow_streams.workflows.ticker_workflow import TickerWorkflow + + +async def main() -> None: + logging.basicConfig(level=logging.INFO) + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HubWorkflow, OrderWorkflow, PipelineWorkflow, TickerWorkflow], + activities=[charge_card], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/workflow_streams/shared.py b/workflow_streams/shared.py new file mode 100644 index 00000000..9bf5a4b7 --- /dev/null +++ b/workflow_streams/shared.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from temporalio.contrib.workflow_streams import WorkflowStreamState + +TASK_QUEUE = "workflow-stream-sample-task-queue" + +# Topics published by the workflow / activity. +TOPIC_STATUS = "status" +TOPIC_PROGRESS = "progress" +TOPIC_NEWS = "news" +TOPIC_TICK = "tick" + + +@dataclass +class OrderInput: + order_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StatusEvent: + kind: str + order_id: str + + +@dataclass +class ProgressEvent: + message: str + + +@dataclass +class PipelineInput: + pipeline_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class StageEvent: + stage: str + + +@dataclass +class HubInput: + hub_id: str + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class NewsEvent: + headline: str + + +@dataclass +class TickerInput: + count: int = 20 + keep_last: int = 3 + truncate_every: int = 5 + interval_ms: int = 400 + # Carries stream state across continue-as-new. None on a fresh start. + stream_state: WorkflowStreamState | None = None + + +@dataclass +class TickEvent: + n: int diff --git a/workflow_streams/workflows/__init__.py b/workflow_streams/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workflow_streams/workflows/hub_workflow.py b/workflow_streams/workflows/hub_workflow.py new file mode 100644 index 00000000..5dcc3c5f --- /dev/null +++ b/workflow_streams/workflows/hub_workflow.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import HubInput + + +@workflow.defn +class HubWorkflow: + """Passive stream host: starts up, waits, closes when told. + + Unlike OrderWorkflow or PipelineWorkflow, this workflow does no + work of its own — it exists only to host a ``WorkflowStream`` that + external publishers push events into and external subscribers read + from. The shape that fits a backend service or "event bus" pattern, + where the workflow owns durable state but the events come from + outside. + """ + + @workflow.init + def __init__(self, input: HubInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self._closed = False + + @workflow.run + async def run(self, input: HubInput) -> str: + await workflow.wait_condition(lambda: self._closed) + # The publisher publishes its own terminator into the stream + # before signaling close (see run_external_publisher.py). + # Hold the run open briefly so subscribers' final poll + # delivers any items still in the log. + await workflow.sleep(timedelta(milliseconds=500)) + return f"hub {input.hub_id} closed" + + @workflow.signal + def close(self) -> None: + self._closed = True diff --git a/workflow_streams/workflows/llm_workflow.py b/workflow_streams/workflows/llm_workflow.py new file mode 100644 index 00000000..b26cfbe4 --- /dev/null +++ b/workflow_streams/workflows/llm_workflow.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.common import RetryPolicy +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.llm_shared import LLMInput + +with workflow.unsafe.imports_passed_through(): + from workflow_streams.activities.llm_activity import stream_completion + + +@workflow.defn +class LLMWorkflow: + """Wrapper for an LLM-streaming activity. + + The workflow does no streaming of its own; it hosts the + `WorkflowStream` so external subscribers can attach by workflow + id, kicks off the streaming activity, and returns the full text + the activity produced. + + Streaming is delegated to the activity because the OpenAI call is + non-deterministic. If the activity fails partway through, Temporal + retries it (up to ``max_attempts``); the retried attempt + re-publishes from the start, so the consumer must reset on the + activity's ``RETRY`` event. See + `activities/llm_activity.py` and `run_llm.py`. + """ + + @workflow.init + def __init__(self, input: LLMInput) -> None: + # Construct the stream from `@workflow.init` so the + # publish-Signal handler is registered before any external + # publisher (the activity, here) tries to publish. + self.stream = WorkflowStream(prior_state=input.stream_state) + + @workflow.run + async def run(self, input: LLMInput) -> str: + result = await workflow.execute_activity( + stream_completion, + input, + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy(maximum_attempts=3), + ) + # Hold the run open briefly so the consumer's next poll + # delivers the activity's terminal `complete` event before the + # workflow exits and the in-memory log is gone. + await workflow.sleep(timedelta(milliseconds=500)) + return result diff --git a/workflow_streams/workflows/order_workflow.py b/workflow_streams/workflows/order_workflow.py new file mode 100644 index 00000000..099634cd --- /dev/null +++ b/workflow_streams/workflows/order_workflow.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_PROGRESS, + TOPIC_STATUS, + OrderInput, + ProgressEvent, + StatusEvent, +) + +with workflow.unsafe.imports_passed_through(): + from workflow_streams.activities.payment_activity import charge_card + + +@workflow.defn +class OrderWorkflow: + """Process a fake order, publishing status and progress events. + + The workflow itself publishes status changes; an activity it runs + publishes finer-grained progress events using a + `WorkflowStreamClient`. A single stream carries both topics — + subscribers can filter on the topic(s) they care about. + """ + + @workflow.init + def __init__(self, input: OrderInput) -> None: + # Construct the stream from @workflow.init so it can register + # signal/update/query handlers before the workflow accepts any + # messages. Threading prior_state lets the workflow survive + # continue-as-new without losing buffered items. + self.stream = WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StatusEvent) + self.progress = self.stream.topic(TOPIC_PROGRESS, type=ProgressEvent) + + @workflow.run + async def run(self, input: OrderInput) -> str: + self.status.publish(StatusEvent(kind="received", order_id=input.order_id)) + + charge_id = await workflow.execute_activity( + charge_card, + input.order_id, + start_to_close_timeout=timedelta(seconds=30), + ) + + self.status.publish(StatusEvent(kind="shipped", order_id=input.order_id)) + self.progress.publish(ProgressEvent(message=f"charge id: {charge_id}")) + self.status.publish(StatusEvent(kind="complete", order_id=input.order_id)) + # The "complete" status event above is the in-band terminator + # subscribers break on (see run_publisher.py). Hold the run + # open briefly so subscribers' next poll delivers it before + # this task returns and the in-memory log is gone. + await workflow.sleep(timedelta(milliseconds=500)) + return charge_id diff --git a/workflow_streams/workflows/pipeline_workflow.py b/workflow_streams/workflows/pipeline_workflow.py new file mode 100644 index 00000000..83336905 --- /dev/null +++ b/workflow_streams/workflows/pipeline_workflow.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_STATUS, + PipelineInput, + StageEvent, +) + + +@workflow.defn +class PipelineWorkflow: + """Multi-stage pipeline that publishes stage transitions over time. + + Stages are spaced out with ``workflow.sleep`` so a subscriber can + realistically disconnect partway through and reconnect without the + pipeline finishing in the meantime — the shape needed to demo the + "show up late and still see what happened" pattern. + """ + + @workflow.init + def __init__(self, input: PipelineInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self.status = self.stream.topic(TOPIC_STATUS, type=StageEvent) + + @workflow.run + async def run(self, input: PipelineInput) -> str: + stages = [ + "validating", + "loading data", + "transforming", + "writing output", + "verifying", + "complete", + ] + for stage in stages: + self.status.publish(StageEvent(stage=stage)) + if stage != "complete": + await workflow.sleep(timedelta(seconds=2)) + # The "complete" stage above is the in-band terminator + # subscribers break on. Hold the run open briefly so the final + # poll delivers it. + await workflow.sleep(timedelta(milliseconds=500)) + return f"pipeline {input.pipeline_id} done" diff --git a/workflow_streams/workflows/ticker_workflow.py b/workflow_streams/workflows/ticker_workflow.py new file mode 100644 index 00000000..c3f37b9f --- /dev/null +++ b/workflow_streams/workflows/ticker_workflow.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from datetime import timedelta + +from temporalio import workflow +from temporalio.contrib.workflow_streams import WorkflowStream + +from workflow_streams.shared import ( + TOPIC_TICK, + TickerInput, + TickEvent, +) + + +@workflow.defn +class TickerWorkflow: + """Long-running ticker that bounds its event log via ``truncate``. + + Long-running workflows that publish high volumes of events would + otherwise grow their event log unboundedly. This workflow shows + the truncation pattern: every ``truncate_every`` events, drop + everything except the last ``keep_last`` entries by calling + ``self.stream.truncate(safe_offset)``. + + Subscribers that fall behind a truncation jump forward to the new + base offset transparently (the iterator handles the + ``TruncatedOffset`` error internally), so consumers stay live but + may not see every intermediate event. That is the trade: bounded + log size in exchange for at-best-effort delivery to slow + consumers. + + To compute the truncation offset the workflow tracks its own + published count. ``WorkflowStream`` does not expose a workflow-side + head-offset accessor, but the running count plus the carried + ``base_offset`` (in continue-as-new chains) is sufficient. + """ + + @workflow.init + def __init__(self, input: TickerInput) -> None: + self.stream = WorkflowStream(prior_state=input.stream_state) + self.tick = self.stream.topic(TOPIC_TICK, type=TickEvent) + # Running count of events published by THIS run. To compute a + # global offset, add the prior_state's base_offset (omitted + # here — this sample doesn't continue-as-new). + self._published = 0 + + @workflow.run + async def run(self, input: TickerInput) -> str: + for n in range(input.count): + self.tick.publish(TickEvent(n=n)) + self._published += 1 + await workflow.sleep(timedelta(milliseconds=input.interval_ms)) + if ( + self._published % input.truncate_every == 0 + and self._published > input.keep_last + ): + # Drop everything except the last `keep_last` entries. + truncate_to = self._published - input.keep_last + self.stream.truncate(truncate_to) + # The final tick (n == count - 1) is the in-band terminator + # subscribers break on. ``keep_last`` guarantees that final + # offset survives the last truncation so even slow consumers + # eventually see it. Hold the run open briefly so the final + # poll delivers it. + await workflow.sleep(timedelta(milliseconds=500)) + return f"ticker emitted {self._published} events"