Skip to content

Commit 209884e

Browse files
committed
Working on adding Nexus messaging sample code
1 parent ce5d8dd commit 209884e

33 files changed

Lines changed: 1308 additions & 220 deletions

nexus_messaging/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
This sample shows how to expose a long-running workflow's queries, updates, and signals as Nexus
2+
operations. There are two self-contained examples, each in its own directory:
3+
4+
| | `callerpattern/` | `ondemandpattern/` |
5+
|---|---|---|
6+
| **Pattern** | Signal an existing workflow | Create and run workflows on demand, and send signals to them |
7+
| **Who creates the workflow?** | The handler worker starts it on boot | The caller starts it via a Nexus operation |
8+
| **Who knows the workflow ID?** | Only the handler | The caller chooses and passes it in every operation |
9+
| **Nexus service** | `NexusGreetingService` | `NexusRemoteGreetingService` |
10+
11+
Each directory is fully self-contained for clarity. The `GreetingWorkflow`, activity, and
12+
`Language` enum are **identical** between the two -- only the Nexus service definition and its
13+
handler implementation differ. This highlights that the same workflow can be exposed through
14+
Nexus in different ways depending on whether the caller needs lifecycle control.
15+
16+
See each directory's README for running instructions.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
## Entity pattern
2+
3+
The handler worker starts a `GreetingWorkflow` for a user ID.
4+
`NexusGreetingServiceHandler` holds that ID and routes every Nexus operation to it.
5+
The caller's input does not have that workflow ID as the caller doesn't know it -- but the caller
6+
sends in the User ID, and `NexusGreetingServiceHandler` knows how to get the desired workflow ID
7+
from that User ID (see the `get_workflow_id` call).
8+
9+
The handler worker uses the same `get_workflow_id` call to generate a workflow ID from a user ID
10+
when it launches the workflow.
11+
12+
The caller workflow:
13+
1. Queries for supported languages (`get_languages` -- backed by a `@workflow.query`)
14+
2. Changes the language to Arabic (`set_language` -- backed by a `@workflow.update` that calls an activity)
15+
3. Confirms the change via a second query (`get_language`)
16+
4. Approves the workflow (`approve` -- backed by a `@workflow.signal`)
17+
18+
### Sample directory structure
19+
20+
- [service.py](./service.py) - shared Nexus service definition
21+
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a starter
22+
- [handler](./handler) - Nexus operation handlers, together with a workflow used by the Nexus operations, and a worker that polls for workflow, activity, and Nexus tasks
23+
24+
### Running
25+
26+
Start a Temporal server:
27+
28+
```bash
29+
temporal server start-dev
30+
```
31+
32+
Create the namespaces and Nexus endpoint:
33+
34+
```bash
35+
temporal operator namespace create --namespace nexus-messaging-handler-namespace
36+
temporal operator namespace create --namespace nexus-messaging-caller-namespace
37+
38+
temporal operator nexus endpoint create \
39+
--name nexus-messaging-nexus-endpoint \
40+
--target-namespace nexus-messaging-handler-namespace \
41+
--target-task-queue nexus-messaging-handler-task-queue
42+
```
43+
44+
In one terminal, start the handler worker:
45+
46+
```bash
47+
uv run python -m nexus_messaging.callerpattern.handler.worker
48+
```
49+
50+
In another terminal, run the caller workflow:
51+
52+
```bash
53+
uv run python -m nexus_messaging.callerpattern.caller.app
54+
```
55+
56+
Expected output:
57+
58+
```
59+
Supported languages: [<Language.CHINESE: 2>, <Language.ENGLISH: 3>]
60+
Language changed: ENGLISH -> ARABIC
61+
Workflow approved
62+
```
File renamed without changes.

nexus_sync_operations/caller/app.py renamed to nexus_messaging/callerpattern/caller/app.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,13 @@
66
from temporalio.envconfig import ClientConfig
77
from temporalio.worker import Worker
88

9-
from nexus_sync_operations.caller.workflows import CallerWorkflow
9+
from nexus_messaging.callerpattern.caller.workflows import CallerWorkflow
1010

11-
NAMESPACE = "nexus-sync-operations-caller-namespace"
12-
TASK_QUEUE = "nexus-sync-operations-caller-task-queue"
11+
NAMESPACE = "nexus-messaging-caller-namespace"
12+
TASK_QUEUE = "nexus-messaging-caller-task-queue"
1313

1414

15-
async def execute_caller_workflow(
16-
client: Optional[Client] = None,
17-
) -> None:
15+
async def execute_caller_workflow(client: Optional[Client] = None) -> None:
1816
if client is None:
1917
config = ClientConfig.load_client_connect_config()
2018
config.setdefault("target_host", "localhost:7233")
@@ -28,7 +26,8 @@ async def execute_caller_workflow(
2826
):
2927
log = await client.execute_workflow(
3028
CallerWorkflow.run,
31-
id=str(uuid.uuid4()),
29+
arg="user-1",
30+
id=f"nexus-messaging-caller-{uuid.uuid4()}",
3231
task_queue=TASK_QUEUE,
3332
)
3433
for line in log:
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""
2+
A caller workflow that executes Nexus operations. The caller does not have information
3+
about how these operations are implemented by the Nexus service.
4+
"""
5+
6+
from temporalio import workflow
7+
from temporalio.exceptions import ApplicationError
8+
9+
with workflow.unsafe.imports_passed_through():
10+
from nexus_messaging.callerpattern.service import (
11+
ApproveInput,
12+
GetLanguageInput,
13+
GetLanguagesInput,
14+
Language,
15+
NexusGreetingService,
16+
SetLanguageInput,
17+
)
18+
19+
NEXUS_ENDPOINT = "nexus-messaging-nexus-endpoint"
20+
21+
22+
@workflow.defn
23+
class CallerWorkflow:
24+
@workflow.run
25+
async def run(self, user_id: str) -> list[str]:
26+
log: list[str] = []
27+
nexus_client = workflow.create_nexus_client(
28+
service=NexusGreetingService,
29+
endpoint=NEXUS_ENDPOINT,
30+
)
31+
32+
# Call a Nexus operation backed by a query against the entity workflow.
33+
# The workflow must already be running on the handler, otherwise you will
34+
# get an error saying the workflow has already terminated.
35+
languages_output = await nexus_client.execute_operation(
36+
NexusGreetingService.get_languages,
37+
GetLanguagesInput(include_unsupported=False, user_id=user_id),
38+
)
39+
log.append(f"Supported languages: {languages_output.languages}")
40+
workflow.logger.info("Supported languages: %s", languages_output.languages)
41+
42+
# Following are examples for each of the three messaging types -
43+
# update, query, then signal.
44+
45+
# Call a Nexus operation backed by an update against the entity workflow.
46+
previous_language = await nexus_client.execute_operation(
47+
NexusGreetingService.set_language,
48+
SetLanguageInput(language=Language.ARABIC, user_id=user_id),
49+
)
50+
51+
# Call a Nexus operation backed by a query to confirm the language change.
52+
current_language = await nexus_client.execute_operation(
53+
NexusGreetingService.get_language,
54+
GetLanguageInput(user_id=user_id),
55+
)
56+
if current_language != Language.ARABIC:
57+
raise ApplicationError(f"Expected language ARABIC, got {current_language}")
58+
59+
log.append(
60+
f"Language changed: {previous_language.name} -> {Language.ARABIC.name}"
61+
)
62+
workflow.logger.info(
63+
"Language changed from %s to %s", previous_language, Language.ARABIC
64+
)
65+
66+
# Call a Nexus operation backed by a signal against the entity workflow.
67+
await nexus_client.execute_operation(
68+
NexusGreetingService.approve,
69+
ApproveInput(name="caller", user_id=user_id),
70+
)
71+
log.append("Workflow approved")
72+
workflow.logger.info("Workflow approved")
73+
74+
return log

nexus_messaging/callerpattern/handler/__init__.py

Whitespace-only changes.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from temporalio import activity
5+
6+
from nexus_messaging.callerpattern.service import Language
7+
8+
9+
@activity.defn
10+
async def call_greeting_service(language: Language) -> Optional[str]:
11+
"""Simulates a call to a remote greeting service. Returns None if unsupported."""
12+
greetings = {
13+
Language.ARABIC: "\u0645\u0631\u062d\u0628\u0627 \u0628\u0627\u0644\u0639\u0627\u0644\u0645",
14+
Language.CHINESE: "\u4f60\u597d\uff0c\u4e16\u754c",
15+
Language.ENGLISH: "Hello, world",
16+
Language.FRENCH: "Bonjour, monde",
17+
Language.HINDI: "\u0928\u092e\u0938\u094d\u0924\u0947 \u0926\u0941\u0928\u093f\u092f\u093e",
18+
Language.PORTUGUESE: "Ol\u00e1 mundo",
19+
Language.SPANISH: "Hola mundo",
20+
}
21+
await asyncio.sleep(0.2)
22+
return greetings.get(language)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""
2+
Nexus operation handler implementation for the entity pattern. Each operation receives a
3+
user_id, which is mapped to a workflow ID. The operations are synchronous because queries
4+
and updates against a running workflow complete quickly.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import nexusrpc
10+
from temporalio import nexus
11+
from temporalio.client import WorkflowHandle
12+
13+
from nexus_messaging.callerpattern.handler.workflows import GreetingWorkflow
14+
from nexus_messaging.callerpattern.service import (
15+
ApproveInput,
16+
ApproveOutput,
17+
GetLanguageInput,
18+
GetLanguagesInput,
19+
GetLanguagesOutput,
20+
Language,
21+
NexusGreetingService,
22+
SetLanguageInput,
23+
)
24+
25+
WORKFLOW_ID_PREFIX = "GreetingWorkflow_for_"
26+
27+
28+
def get_workflow_id(user_id: str) -> str:
29+
"""Map a user ID to a workflow ID.
30+
31+
This example assumes you might have multiple workflows, one for each user.
32+
If you had a single workflow for all users, you could remove this function,
33+
remove the user_id from each input, and just use a single workflow ID.
34+
"""
35+
return f"{WORKFLOW_ID_PREFIX}{user_id}"
36+
37+
38+
@nexusrpc.handler.service_handler(service=NexusGreetingService)
39+
class NexusGreetingServiceHandler:
40+
def _get_workflow_handle(
41+
self, user_id: str
42+
) -> WorkflowHandle[GreetingWorkflow, str]:
43+
return nexus.client().get_workflow_handle_for(
44+
GreetingWorkflow.run, get_workflow_id(user_id)
45+
)
46+
47+
@nexusrpc.handler.sync_operation
48+
async def get_languages(
49+
self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput
50+
) -> GetLanguagesOutput:
51+
return await self._get_workflow_handle(input.user_id).query(
52+
GreetingWorkflow.get_languages, input
53+
)
54+
55+
@nexusrpc.handler.sync_operation
56+
async def get_language(
57+
self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguageInput
58+
) -> Language:
59+
return await self._get_workflow_handle(input.user_id).query(
60+
GreetingWorkflow.get_language
61+
)
62+
63+
# Routes to set_language_using_activity (not set_language) so that new languages not
64+
# already in the greetings map can be fetched via an activity.
65+
@nexusrpc.handler.sync_operation
66+
async def set_language(
67+
self, ctx: nexusrpc.handler.StartOperationContext, input: SetLanguageInput
68+
) -> Language:
69+
return await self._get_workflow_handle(input.user_id).execute_update(
70+
GreetingWorkflow.set_language_using_activity, input
71+
)
72+
73+
@nexusrpc.handler.sync_operation
74+
async def approve(
75+
self, ctx: nexusrpc.handler.StartOperationContext, input: ApproveInput
76+
) -> ApproveOutput:
77+
await self._get_workflow_handle(input.user_id).signal(
78+
GreetingWorkflow.approve, input
79+
)
80+
return ApproveOutput()

0 commit comments

Comments
 (0)