Skip to content

Commit be7c015

Browse files
authored
Add Distributed Lock (#46)
Added a distributed lock
1 parent 1a7f76c commit be7c015

File tree

4 files changed

+207
-1
lines changed

4 files changed

+207
-1
lines changed

thingsdb/misc/README.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# Lock
2+
3+
This lock provides distributed mutual exclusion, allowing you to synchronize
4+
access to shared resources or critical sections of code across multiple
5+
independent Python programs or services, even if they are running on different
6+
machines.
7+
8+
It functions similarly to `asyncio.Lock()`, which is designed for concurrency
9+
within a single process, but extends this capability to a multi-process,
10+
multi-host environment by leveraging ThingsDB as its backend. This ensures that
11+
only one client can acquire the lock at any given time, preventing race
12+
conditions and maintaining data integrity in a distributed system.
13+
14+
The `timeout` parameter defines the maximum duration a lock can be held.
15+
If a client fails to explicitly release the lock (e.g., due to a crash),
16+
ThingsDB will automatically release it after this period, preventing deadlocks.
17+
Separately, the expression `queue_size * timeout` indicates the total maximum
18+
time a client will actively attempt to acquire the lock if it's currently
19+
unavailable.
20+
21+
Example code:
22+
23+
```python
24+
import asyncio
25+
from functools import partial
26+
from thingsdb.client import Client
27+
from thingsdb.misc import lock
28+
29+
30+
async def main():
31+
# ThingsDB client
32+
client = Client()
33+
34+
# Multiple locks may be created, make sure you give each lock a unique name
35+
mylock = partial(lock.lock, client=client, name='my-lock', timeout=5)
36+
37+
await client.connect('localhost')
38+
try:
39+
await client.authenticate('admin', 'pass')
40+
41+
# This will set-up a lock collection
42+
# It will only do work the first time, but no harm in keep calling
43+
await lock.setup(client)
44+
45+
# Wait for a lock
46+
async with mylock():
47+
print('In here')
48+
await asyncio.sleep(5.0) # simulate some work
49+
print('Done here')
50+
51+
finally:
52+
await client.close_and_wait()
53+
54+
55+
if __name__ == '__main__':
56+
asyncio.run(main())
57+
```
58+
59+
To observe the distributed lock in action, you can execute the example Python
60+
script simultaneously in multiple separate terminal windows.
61+
62+
You can determine if a specific distributed lock is currently held by using
63+
the `lock.locked()` asynchronous function.
64+
65+
To check the lock's status:
66+
67+
```python
68+
is_locked = await lock.locked(client, 'my-lock')
69+
```

thingsdb/misc/__init__.py

Whitespace-only changes.

thingsdb/misc/lock.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import asyncio
2+
from contextlib import asynccontextmanager
3+
from typing import AsyncGenerator
4+
from ..client import Client
5+
from ..room import Room, event
6+
7+
8+
async def setup(client: Client, collection: str = 'lock'):
9+
has_collection = await client.query("""//ti
10+
has_collection(name);
11+
""", name=collection, scope='/t')
12+
13+
if has_collection:
14+
return
15+
16+
await client.query("""//ti
17+
new_collection(name);
18+
""", name=collection, scope='/t')
19+
20+
await client.query("""//ti
21+
22+
set_type('Inner', {
23+
room: 'room',
24+
task: 'task',
25+
timeout: 'int',
26+
set_task: |this, lock_id| {
27+
this.task = task(
28+
datetime().move('seconds', this.timeout),
29+
|_, lock_id, room_id| wse(Lock(lock_id).release(room_id)),
30+
[lock_id, this.room.id()],
31+
);
32+
nil;
33+
},
34+
});
35+
36+
set_type('Lock', {
37+
queue: '[Inner]',
38+
go: |this| {
39+
if (!this.queue) return nil;
40+
inner = this.queue.first();
41+
inner.set_task(this.id());
42+
inner.room.set_name('go');
43+
inner.room.emit('go');
44+
},
45+
acquire: |this, timeout| {
46+
size = this.queue.len();
47+
immediately = size == 0;
48+
inner = Inner{timeout:,};
49+
this.queue.push(inner);
50+
immediately && inner.set_task(this.id());
51+
[immediately, inner.room.id(), size];
52+
},
53+
release: |this, room_id| try({
54+
if (this.queue.first().room.id() == room_id) {
55+
this.queue.first().task.del();
56+
this.queue.shift();
57+
this.go();
58+
} else {
59+
this.queue
60+
.remove(|inner| inner.room.id() == room_id)
61+
.each(|inner| inner.task.del());
62+
};
63+
nil;
64+
}),
65+
});
66+
67+
set_type('Root', {
68+
lock: 'thing<Lock>',
69+
version: 'int'
70+
});
71+
72+
new_procedure('acquire', |name, timeout| {
73+
.lock.get(name, .lock[name] = Lock{}).acquire(timeout);
74+
});
75+
76+
new_procedure('test', |room_id| {
77+
room(room_id).name() == 'go';
78+
});
79+
80+
new_procedure('locked', |name| {
81+
bool(.lock[name].queue);
82+
});
83+
84+
new_procedure('release', |name, room_id| {
85+
wse(.lock[name].release(room_id));
86+
});
87+
88+
.to_type('Root');
89+
""", scope=f'//{collection}')
90+
91+
92+
class _InnerRoom(Room):
93+
94+
future: asyncio.Future
95+
96+
def on_init(self) -> None:
97+
self.future = asyncio.Future()
98+
99+
async def on_join(self) -> None:
100+
# We might have missed the event during the join. If so, set the
101+
# future result to continue.
102+
ok = await self.client.run('test', self.id, scope=self.scope)
103+
if ok and not self.future.done():
104+
self.future.set_result(None)
105+
106+
@event('go')
107+
def on_go(self):
108+
if not self.future.done():
109+
self.future.set_result(None)
110+
111+
112+
@asynccontextmanager
113+
async def lock(client: Client, name: str,
114+
scope: str = '//lock',
115+
timeout: int = 60) -> AsyncGenerator[int, None]:
116+
117+
res: tuple[bool, int, int] = \
118+
await client.run('acquire', name, timeout, scope=scope)
119+
120+
immediately, room_id, size = res
121+
if not immediately:
122+
room = _InnerRoom(room_id, scope=scope)
123+
await room.join(client, wait=None)
124+
try:
125+
await asyncio.wait_for(room.future, timeout=timeout*size)
126+
except asyncio.TimeoutError:
127+
pass
128+
129+
try:
130+
yield room_id # Lock Id assigned to the 'as' target (not required)
131+
finally:
132+
await client.run('release', name, room_id, scope=scope)
133+
134+
135+
async def locked(client: Client, name: str, scope: str = '//lock') -> bool:
136+
res: bool = await client.run('locked', name, scope=scope)
137+
return res

thingsdb/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '1.1.6'
1+
__version__ = '1.1.7'

0 commit comments

Comments
 (0)