Skip to content

Commit 0b01713

Browse files
committed
fix: ensure that only one worker processing task
1 parent d6118db commit 0b01713

File tree

13 files changed

+250
-22
lines changed

13 files changed

+250
-22
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/taskiq-postgres?style=for-the-badge&logo=python)](https://pypi.org/project/taskiq-postgres/)
22
[![PyPI](https://img.shields.io/pypi/v/taskiq-postgres?style=for-the-badge&logo=pypi)](https://pypi.org/project/taskiq-postgres/)
3-
[![Checks](https://img.shields.io/github/check-runs/danfimov/taskiq-postgres/main?style=for-the-badge&logo=pytest)](https://github.com/danfimov/taskiq-postgres)
3+
[![Checks](https://img.shields.io/github/check-runs/danfimov/taskiq-postgres/main?nameFilter=Tests%20(3.12)&style=for-the-badge)](https://github.com/danfimov/taskiq-postgres)
44

55
<div align="center">
66
<a href="https://github.com/danfimov/taskiq-postgres/"><img src="https://raw.githubusercontent.com/danfimov/taskiq-postgres/main/assets/logo.png" width=400></a>

docs/contributing.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
---
2+
title: Contributing
3+
---

docs/tutorial/broker.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
---
2+
title: Broker
3+
---
4+
5+
To use broker with PostgreSQL you need to import broker and result backend from this library and provide a address for connection. For example, lets create a file `broker.py` with the following content:
6+
7+
=== "asyncpg"
8+
9+
```python
10+
import asyncio
11+
from taskiq_pg.asyncpg import AsyncpgResultBackend, AsyncpgBroker
12+
13+
14+
dsn = "postgres://postgres:postgres@localhost:5432/postgres"
15+
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
16+
17+
18+
@broker.task
19+
async def best_task_ever() -> None:
20+
"""Solve all problems in the world."""
21+
await asyncio.sleep(5.5)
22+
print("All problems are solved!")
23+
24+
25+
async def main():
26+
await broker.startup()
27+
task = await best_task_ever.kiq()
28+
print(await task.wait_result())
29+
await broker.shutdown()
30+
31+
32+
if __name__ == "__main__":
33+
asyncio.run(main())
34+
```
35+
36+
=== "psqlpy"
37+
38+
```python
39+
import asyncio
40+
from taskiq_pg.psqlpy import PSQLPyResultBackend, PSQLPyBroker
41+
42+
43+
dsn = "postgres://postgres:postgres@localhost:5432/postgres"
44+
broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn))
45+
46+
47+
@broker.task
48+
async def best_task_ever() -> None:
49+
"""Solve all problems in the world."""
50+
await asyncio.sleep(5.5)
51+
print("All problems are solved!")
52+
53+
54+
async def main():
55+
await broker.startup()
56+
task = await best_task_ever.kiq()
57+
print(await task.wait_result())
58+
await broker.shutdown()
59+
60+
61+
if __name__ == "__main__":
62+
asyncio.run(main())
63+
```
64+
65+
Then you can run this file with:
66+
67+
```bash
68+
python broker.py
69+
```

docs/tutorial/quickstart.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
---
2+
title: Getting Started
3+
---

docs/tutorial/schedule_source.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
---
2+
title: Schedule Source
3+
---

examples/example_with_broker.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""
2+
How to run:
3+
4+
1) Run worker in one terminal:
5+
uv run taskiq worker examples.example_with_broker:broker
6+
7+
2) Run this script in another terminal:
8+
uv run python -m examples.example_with_broker
9+
"""
10+
11+
import asyncio
12+
13+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
14+
15+
16+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
17+
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
18+
19+
20+
@broker.task("solve_all_problems")
21+
async def best_task_ever() -> None:
22+
"""Solve all problems in the world."""
23+
await asyncio.sleep(2)
24+
print("All problems are solved!")
25+
26+
27+
async def main():
28+
await broker.startup()
29+
task = await best_task_ever.kiq()
30+
print(await task.wait_result())
31+
await broker.shutdown()
32+
33+
34+
if __name__ == "__main__":
35+
asyncio.run(main())

mkdocs.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ edit_uri: edit/main/docs/
88
nav:
99
- Overview:
1010
- index.md
11+
- Tutorial:
12+
- tutorial/quickstart.md
13+
- tutorial/broker.md
14+
- tutorial/schedule_source.md
1115
- API:
1216
- reference.md
1317
- Contributing:

pyproject.toml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,19 @@ ignore = [
143143

144144
"S101", # assert usage
145145
"S311", # pseudo-random generators are not suitable for cryptographic purposes
146+
"S608",
147+
148+
"RUF",
146149
]
147150
"tests/test_linting.py" = [
148151
"S603", # subprocess usage
149152
]
153+
"examples/*" = [
154+
"T201",
155+
"D",
156+
"ANN",
157+
"INP001",
158+
]
150159

151160
[tool.ruff.lint.isort]
152161
known-local-folder = ["taskiq_pg"]

src/taskiq_pg/asyncpg/broker.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@
1010

1111
from taskiq_pg._internal.broker import BasePostgresBroker
1212
from taskiq_pg.asyncpg.queries import (
13+
CLAIM_MESSAGE_QUERY,
1314
CREATE_MESSAGE_TABLE_QUERY,
1415
DELETE_MESSAGE_QUERY,
1516
INSERT_MESSAGE_QUERY,
16-
SELECT_MESSAGE_QUERY,
1717
)
1818

1919

@@ -142,20 +142,14 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
142142
try:
143143
payload = await self._queue.get()
144144
message_id = int(payload)
145-
message_row = await self.read_conn.fetchrow(
146-
SELECT_MESSAGE_QUERY.format(self.table_name),
147-
message_id,
148-
)
149-
if message_row is None:
150-
logger.warning(
151-
"Message with id %s not found in database.",
145+
async with self.write_pool.acquire() as conn:
146+
claimed = await conn.fetchrow(
147+
CLAIM_MESSAGE_QUERY.format(self.table_name),
152148
message_id,
153149
)
150+
if claimed is None:
154151
continue
155-
if message_row.get("message") is None:
156-
msg = "Message row does not have 'message' column"
157-
raise ValueError(msg)
158-
message_str = message_row["message"]
152+
message_str = claimed["message"]
159153
if not isinstance(message_str, str):
160154
msg = "message is not a string"
161155
raise TypeError(msg)

src/taskiq_pg/asyncpg/queries.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
task_name VARCHAR NOT NULL,
3838
message TEXT NOT NULL,
3939
labels JSONB NOT NULL,
40+
status TEXT NOT NULL DEFAULT 'pending',
4041
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
4142
);
4243
"""
@@ -47,6 +48,6 @@
4748
RETURNING id
4849
"""
4950

50-
SELECT_MESSAGE_QUERY = "SELECT * FROM {} WHERE id = $1"
51+
CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message"
5152

5253
DELETE_MESSAGE_QUERY = "DELETE FROM {} WHERE id = $1"

0 commit comments

Comments
 (0)