Skip to content

Commit 2ab9dfa

Browse files
committed
Add a test
1 parent 69751bb commit 2ab9dfa

File tree

4 files changed

+119
-2
lines changed

4 files changed

+119
-2
lines changed

resource_locking/resource_locking_workflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async def run(self, input: ResourceLockingWorkflowInput):
6060
async with ResourceAllocator.acquire_resource(
6161
already_acquired_resource=input.already_acquired_resource
6262
) as resource:
63-
for iteration in ["first", "second", "third"]:
63+
for iteration in ["first", "second"]:
6464
await workflow.execute_activity(
6565
use_resource,
6666
UseResourceActivityInput(resource.resource, iteration),

resource_locking/shared.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from dataclasses import dataclass, field
2-
from typing import Optional
32

43
LOCK_MANAGER_WORKFLOW_ID = "lock_manager"
54

tests/resource_locking/__init__.py

Whitespace-only changes.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import asyncio
2+
import uuid
3+
from collections import defaultdict
4+
from datetime import timedelta
5+
from typing import Any, Optional, Sequence
6+
7+
from temporalio import activity
8+
from temporalio.client import Client, WorkflowFailureError, WorkflowHandle
9+
from temporalio.common import WorkflowIDConflictPolicy
10+
from temporalio.worker import Worker
11+
12+
from resource_locking.lock_manager_workflow import (
13+
LockManagerWorkflow,
14+
LockManagerWorkflowInput,
15+
)
16+
from resource_locking.resource_allocator import ResourceAllocator
17+
from resource_locking.resource_locking_workflow import (
18+
ResourceLockingWorkflow,
19+
ResourceLockingWorkflowInput,
20+
UseResourceActivityInput,
21+
)
22+
from resource_locking.shared import LOCK_MANAGER_WORKFLOW_ID
23+
24+
TASK_QUEUE = "default"
25+
26+
27+
async def test_resource_locking_workflow(client: Client):
28+
# key is resource, value is a description of resource usage
29+
resource_usage: defaultdict[str, list[Sequence[str]]] = defaultdict(list)
30+
31+
# Mock out the activity to count executions
32+
@activity.defn(name="use_resource")
33+
async def use_resource_mock(input: UseResourceActivityInput) -> None:
34+
workflow_id = activity.info().workflow_id
35+
resource_usage[input.resource].append((workflow_id, "start"))
36+
# We need a small sleep here to bait out races
37+
await asyncio.sleep(0.05)
38+
resource_usage[input.resource].append((workflow_id, "end"))
39+
40+
resource_allocator = ResourceAllocator(client)
41+
42+
async with Worker(
43+
client,
44+
task_queue=TASK_QUEUE,
45+
workflows=[LockManagerWorkflow, ResourceLockingWorkflow],
46+
activities=[use_resource_mock, resource_allocator.send_acquire_signal],
47+
):
48+
await run_all_workflows(client)
49+
50+
# Did any workflow run in more than one place?
51+
workflow_id_to_resource: dict[str, str] = {}
52+
for resource, events in resource_usage.items():
53+
for workflow_id, event in events:
54+
if workflow_id in workflow_id_to_resource:
55+
existing_resource = workflow_id_to_resource[workflow_id]
56+
assert (
57+
existing_resource == resource
58+
), f"{workflow_id} ran on both {resource} and {existing_resource}"
59+
else:
60+
workflow_id_to_resource[workflow_id] = resource
61+
62+
# Did any resource have more than one workflow on it at a time?
63+
for resource, events in resource_usage.items():
64+
holder: Optional[str] = None
65+
for workflow_id, event in events:
66+
if event == "start":
67+
assert (
68+
holder is None
69+
), f"{workflow_id} started on {resource} held by {holder}"
70+
holder = workflow_id
71+
else:
72+
assert (
73+
holder == workflow_id
74+
), f"{workflow_id} ended on {resource} held by {holder}"
75+
holder = None
76+
77+
78+
async def run_all_workflows(client: Client):
79+
resource_locking_handles: list[WorkflowHandle[Any, Any]] = []
80+
for i in range(0, 8):
81+
input = ResourceLockingWorkflowInput(
82+
iteration_to_fail_after=None,
83+
should_continue_as_new=False,
84+
)
85+
if i == 0:
86+
input.should_continue_as_new = True
87+
if i == 1:
88+
input.iteration_to_fail_after = "first"
89+
90+
resource_locking_handle = await client.start_workflow(
91+
workflow=ResourceLockingWorkflow.run,
92+
arg=input,
93+
id=f"resource-locking-workflow-{i}",
94+
task_queue=TASK_QUEUE,
95+
)
96+
resource_locking_handles.append(resource_locking_handle)
97+
98+
# Add some resources
99+
lock_manager_handle = await client.start_workflow(
100+
workflow=LockManagerWorkflow.run,
101+
arg=LockManagerWorkflowInput(
102+
resources={},
103+
waiters=[],
104+
),
105+
id=LOCK_MANAGER_WORKFLOW_ID,
106+
task_queue="default",
107+
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
108+
start_signal="add_resources",
109+
start_signal_args=[["r_a", "r_b", "r_c"]],
110+
)
111+
112+
for resource_locking_handle in resource_locking_handles:
113+
try:
114+
await resource_locking_handle.result()
115+
except WorkflowFailureError:
116+
pass
117+
118+
await lock_manager_handle.terminate()

0 commit comments

Comments
 (0)