Skip to content

Commit 3eab7c0

Browse files
committed
nexus_sync_operations
1 parent 326204a commit 3eab7c0

File tree

15 files changed

+355
-6
lines changed

15 files changed

+355
-6
lines changed

hello_nexus/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ call the operations from a workflow.
1212

1313
Start a Temporal server. (See the main samples repo [README](../README.md)).
1414

15-
Run the following:
15+
Run the following to create the caller and handler namespaces, and the Nexus endpoint:
1616

1717
```
1818
temporal operator namespace create --namespace hello-nexus-basic-handler-namespace

hello_nexus/service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
type-safe clients, and it is used by Nexus handlers to validate that they implement
1010
correctly-named operation handlers with the correct input and output types.
1111
12-
The service defined in this file features two operations: echo and hello.
12+
The service defined in this file exposes two operations: my_sync_operation and
13+
my_workflow_run_operation.
1314
"""
1415

1516
from dataclasses import dataclass

message_passing/introduction/starter.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ async def main(client: Optional[Client] = None):
3030
previous_language = await wf_handle.execute_update(
3131
GreetingWorkflow.set_language, Language.CHINESE
3232
)
33-
current_language = await wf_handle.query(GreetingWorkflow.get_language)
34-
print(f"language changed: {previous_language.name} -> {current_language.name}")
33+
assert await wf_handle.query(GreetingWorkflow.get_language) == Language.CHINESE
34+
print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}")
3535

3636
# 👉 Start an Update and then wait for it to complete
3737
update_handle = await wf_handle.start_update(
@@ -40,8 +40,8 @@ async def main(client: Optional[Client] = None):
4040
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
4141
)
4242
previous_language = await update_handle.result()
43-
current_language = await wf_handle.query(GreetingWorkflow.get_language)
44-
print(f"language changed: {previous_language.name} -> {current_language.name}")
43+
assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ARABIC
44+
print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}")
4545

4646
# 👉 Send a Signal
4747
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))

message_passing/introduction/workflows.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@ class GetLanguagesInput:
1616
include_unsupported: bool
1717

1818

19+
@dataclass
20+
class SetLanguageInput:
21+
language: Language
22+
23+
24+
@dataclass
25+
class SetLanguageUsingActivityInput:
26+
language: Language
27+
28+
1929
@dataclass
2030
class ApproveInput:
2131
name: str

nexus_sync_operations/README.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
This sample shows how to create a Nexus service that is backed by a long-running workflow and
2+
exposes operations that use signals, queries, and updates against that workflow.
3+
4+
### Sample directory structure
5+
6+
- [service.py](./service.py) - shared Nexus service definition
7+
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code
8+
- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks.
9+
10+
11+
### Instructions
12+
13+
Start a Temporal server. (See the main samples repo [README](../README.md)).
14+
15+
Run the following to create the caller and handler namespaces, and the Nexus endpoint:
16+
17+
```
18+
temporal operator namespace create --namespace nexus-sync-operations-handler-namespace
19+
temporal operator namespace create --namespace nexus-sync-operations-caller-namespace
20+
21+
temporal operator nexus endpoint create \
22+
--name nexus-sync-operations-nexus-endpoint \
23+
--target-namespace nexus-sync-operations-handler-namespace \
24+
--target-task-queue nexus-sync-operations-handler-task-queue \
25+
--description-file nexus_sync_operations/endpoint_description.md
26+
```
27+
28+
In one terminal, run the Temporal worker in the handler namespace:
29+
```
30+
uv run nexus_sync_operations/handler/worker.py
31+
```
32+
33+
In another terminal, run the Temporal worker in the caller namespace and start the caller
34+
workflow:
35+
```
36+
uv run nexus_sync_operations/caller/app.py
37+
```

nexus_sync_operations/__init__.py

Whitespace-only changes.

nexus_sync_operations/caller/__init__.py

Whitespace-only changes.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import asyncio
2+
import uuid
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import Worker
7+
8+
from nexus_sync_operations.caller.workflows import CallerWorkflow
9+
10+
NAMESPACE = "nexus-sync-operations-caller-namespace"
11+
TASK_QUEUE = "nexus-sync-operations-caller-task-queue"
12+
13+
14+
async def execute_caller_workflow(
15+
client: Optional[Client] = None,
16+
) -> None:
17+
client = client or await Client.connect(
18+
"localhost:7233",
19+
namespace=NAMESPACE,
20+
)
21+
22+
async with Worker(
23+
client,
24+
task_queue=TASK_QUEUE,
25+
workflows=[CallerWorkflow],
26+
):
27+
await client.execute_workflow(
28+
CallerWorkflow.run,
29+
id=str(uuid.uuid4()),
30+
task_queue=TASK_QUEUE,
31+
)
32+
33+
34+
if __name__ == "__main__":
35+
loop = asyncio.new_event_loop()
36+
try:
37+
loop.run_until_complete(execute_caller_workflow())
38+
except KeyboardInterrupt:
39+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from message_passing.introduction import Language
2+
from message_passing.introduction.workflows import (
3+
ApproveInput,
4+
GetLanguagesInput,
5+
SetLanguageInput,
6+
SetLanguageUsingActivityInput,
7+
)
8+
from temporalio import workflow
9+
10+
with workflow.unsafe.imports_passed_through():
11+
from nexus_sync_operations.service import GreetingService
12+
13+
NEXUS_ENDPOINT = "nexus-sync-operations-nexus-endpoint"
14+
15+
16+
@workflow.defn
17+
class CallerWorkflow:
18+
@workflow.run
19+
async def run(self) -> None:
20+
nexus_client = workflow.create_nexus_client(
21+
service=GreetingService,
22+
endpoint=NEXUS_ENDPOINT,
23+
)
24+
25+
# Get supported languages
26+
supported_languages = await nexus_client.execute_operation(
27+
GreetingService.get_languages, GetLanguagesInput(include_unsupported=False)
28+
)
29+
print(f"supported languages: {supported_languages}")
30+
31+
# Set language
32+
previous_language = await nexus_client.execute_operation(
33+
GreetingService.set_language, SetLanguageInput(language=Language.CHINESE)
34+
)
35+
assert (
36+
await nexus_client.execute_operation(GreetingService.get_language, None)
37+
== Language.CHINESE
38+
)
39+
print(f"language changed: {previous_language.name} -> {Language.CHINESE.name}")
40+
41+
# Set language using remote service
42+
previous_language = await nexus_client.execute_operation(
43+
GreetingService.set_language_using_activity,
44+
SetLanguageUsingActivityInput(language=Language.ARABIC),
45+
)
46+
assert (
47+
await nexus_client.execute_operation(GreetingService.get_language, None)
48+
== Language.ARABIC
49+
)
50+
print(f"language changed: {previous_language.name} -> {Language.ARABIC.name}")
51+
52+
# Approve
53+
await nexus_client.execute_operation(
54+
GreetingService.approve, ApproveInput(name="")
55+
)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py)
2+
- operation: `get_languages`
3+
- operation: `get_language`
4+
- operation: `set_language`
5+
- operation: `set_language_using_activity`
6+
- operation: `approve`

0 commit comments

Comments
 (0)