Skip to content

Commit 701c1f3

Browse files
authored
feat: add ability to store and get progress (#36)
2 parents e65856e + 040a4c7 commit 701c1f3

21 files changed

+1010
-417
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ repos:
2121
- id: end-of-file-fixer
2222

2323
- repo: https://github.com/crate-ci/typos
24-
rev: v1.38.1
24+
rev: v1.40.0
2525
hooks:
2626
- id: typos
2727

docs/tutorial/result_backend.md

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
---
2+
title: Result Backend
3+
---
4+
5+
## Basic usage
6+
7+
You can store task results in Postgres using one of result backend classes from this package.
8+
9+
You can define your broker with result backend like this:
10+
11+
```python
12+
import asyncio
13+
from taskiq import TaskiqBroker
14+
# 1. Import AsyncpgBroker and AsyncpgResultBackend (or other result backend you want to use)
15+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
16+
17+
# 2. Define your broker with result backend
18+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
19+
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn=dsn)
20+
21+
# 3. Register task
22+
@broker.task(task_name="answer_for_everything")
23+
async def answer_for_everything() -> None:
24+
await asyncio.sleep(2)
25+
return 42
26+
27+
async def main():
28+
# 4. Start broker, call task and wait for result
29+
await broker.startup()
30+
task = await best_task_ever.kiq()
31+
print(await task.wait_result())
32+
await broker.shutdown()
33+
34+
35+
if __name__ == "__main__":
36+
asyncio.run(main())
37+
```
38+
39+
After running this code, you should see `42` printed in the console. Plus the result will be stored in the Postgres database in `taskiq_results` (by default).
40+
41+
## Customization
42+
43+
You can customize the result backend by providing additional parameters to the constructor.
44+
45+
- `keep_results` - whatever to keep results after they are fetched. Default is `True`. Suitable if you don't want to store results forever.
46+
- `table_name` - name of the table to store results in. Default is `taskiq_results`.
47+
- `field_for_task_id` - type of the field to store task IDs. Default is `VarChar`. But you can pick `Uuid` or `Text` if you want.
48+
- `serializer` - serializer to use for serializing results. Default is `PickleSerializer`. But if you want human readable results you can use `JsonSerializer` from `taskiq.serializers` for example.
49+
50+
## Task progress
51+
52+
You can also store task progress using result backend. To do this, you need to use `set_progress` method from `ProgressTracker`:
53+
54+
```python
55+
import asyncio
56+
from taskiq import TaskiqBroker
57+
# 1. Import AsyncpgBroker and AsyncpgResultBackend (or other result backend you want to use)
58+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
59+
60+
# 2. Define your broker with result backend
61+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
62+
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn=dsn)
63+
64+
# 3. Register task
65+
@broker.task("solve_all_problems")
66+
async def best_task_ever(
67+
progress_tracker: ProgressTracker[Any] = TaskiqDepends(), # noqa: B008
68+
) -> int:
69+
# 4. Set progress with state
70+
state_dict = {"start_message": "Starting to solve problems"}
71+
await progress_tracker.set_progress(TaskState.STARTED, state_dict)
72+
73+
await asyncio.sleep(2)
74+
75+
# You can also use custom states, but progress will be rewritten on each call (it's update not merge)
76+
state_dict.update({"halfway_message": "Halfway done!"})
77+
await progress_tracker.set_progress("halfway", state_dict)
78+
await progress_tracker.set_progress(TaskState.STARTED, state_dict)
79+
80+
await asyncio.sleep(2)
81+
82+
return 42
83+
84+
async def main():
85+
# 5. Start broker
86+
await broker.startup()
87+
task = await best_task_ever.kiq()
88+
89+
# 6. Check progress on start
90+
await asyncio.sleep(1)
91+
print(await task.get_progress())
92+
93+
# 7. Check progress on halfway
94+
await asyncio.sleep(2)
95+
print(await task.get_progress())
96+
97+
# 8. Get final result
98+
print(await task.wait_result())
99+
100+
await broker.shutdown()
101+
102+
103+
if __name__ == "__main__":
104+
asyncio.run(main())
105+
```
106+
107+
If you run this code, you should see something like this in the console:
108+
109+
```bash
110+
> uv run python -m examples.example_with_progress
111+
112+
state='STARTED' meta={'start_message': 'Starting to solve problems'}
113+
state='STARTED' meta={'start_message': 'Starting to solve problems', 'halfway_message': 'Halfway done!'}
114+
is_err=False log=None return_value=42 execution_time=4.01 labels={} error=None
115+
```

examples/example_with_progress.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""
2+
How to run:
3+
4+
1) Run worker in one terminal:
5+
uv run taskiq worker examples.example_with_progress:broker --workers 1
6+
7+
2) Run this script in another terminal:
8+
uv run python -m examples.example_with_progress
9+
"""
10+
11+
import asyncio
12+
from typing import Any
13+
14+
from taskiq import TaskiqDepends
15+
from taskiq.depends.progress_tracker import ProgressTracker, TaskState
16+
17+
from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend
18+
19+
20+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
21+
broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn))
22+
23+
24+
@broker.task("solve_all_problems")
25+
async def best_task_ever(
26+
progress_tracker: ProgressTracker[Any] = TaskiqDepends(), # noqa: B008
27+
) -> int:
28+
state_dict = {"start_message": "Starting to solve problems"}
29+
await progress_tracker.set_progress(TaskState.STARTED, state_dict)
30+
31+
await asyncio.sleep(2)
32+
33+
state_dict.update({"halfway_message": "Halfway done!"})
34+
await progress_tracker.set_progress("halfway", state_dict)
35+
await progress_tracker.set_progress(TaskState.STARTED, state_dict)
36+
37+
await asyncio.sleep(2)
38+
39+
return 42
40+
41+
42+
async def main():
43+
await broker.startup()
44+
task = await best_task_ever.kiq()
45+
46+
# check progress on start
47+
await asyncio.sleep(1)
48+
print(await task.get_progress())
49+
50+
# check progress on halfway
51+
await asyncio.sleep(2)
52+
print(await task.get_progress())
53+
54+
# get final result
55+
print(await task.wait_result())
56+
57+
await broker.shutdown()
58+
59+
60+
if __name__ == "__main__":
61+
asyncio.run(main())

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ nav:
99
- Overview:
1010
- index.md
1111
- Tutorial:
12+
- tutorial/result_backend.md
1213
- tutorial/schedule_source.md
1314
- tutorial/common_issues.md
1415
- API:

pyproject.toml

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,48 +46,48 @@ aiopg = [
4646
"aiopg>=1.4.0",
4747
]
4848
asyncpg = [
49-
"asyncpg>=0.30.0",
49+
"asyncpg>=0.31.0",
5050
]
5151
psqlpy = [
52-
"psqlpy>=0.11.6",
52+
"psqlpy>=0.11.11",
5353
]
5454
psycopg = [
55-
"psycopg[binary,pool]>=3.2.10",
55+
"psycopg[binary,pool]>=3.3.2",
5656
]
5757

5858
[dependency-groups]
5959
dev = [
6060
{include-group = "lint"},
6161
{include-group = "test"},
6262
{include-group = "docs"},
63-
"prek>=0.2.8",
63+
"prek>=0.2.19",
6464
]
6565
lint = [
66-
"ruff>=0.14.0",
67-
"bandit>=1.8.6",
66+
"ruff>=0.14.8",
67+
"bandit>=1.9.2",
6868
"codespell>=2.4.1",
69-
"zizmor>=1.15.2",
69+
"zizmor>=1.18.0",
7070
# type check
71-
"mypy>=1.18.1",
72-
"asyncpg-stubs>=0.30.2",
71+
"mypy>=1.19.0",
72+
"asyncpg-stubs>=0.31.0",
7373
]
7474
test = [
75-
"polyfactory>=2.22.2",
76-
"pytest>=8.4.2",
77-
"pytest-asyncio>=1.1.0",
75+
"polyfactory>=3.1.0",
76+
"pytest>=9.0.1",
77+
"pytest-asyncio>=1.3.0",
7878
"pytest-cov>=7.0.0",
7979
# for database in tests
8080
"sqlalchemy-utils>=0.42.0",
8181
# for faster asyncio loop in tests
8282
"uvloop>=0.22.1",
8383
]
8484
docs = [
85-
"mkdocs-material>=9.6.22",
86-
"mkdocstrings-python>=1.18.2",
85+
"mkdocs-material>=9.7.0",
86+
"mkdocstrings-python>=2.0.1",
8787
]
8888

8989
[build-system]
90-
requires = ["uv_build>=0.8.14,<0.9.0"]
90+
requires = ["uv_build>=0.9,<0.10"]
9191
build-backend = "uv_build"
9292

9393
[tool.uv.build-backend]

src/taskiq_pg/_internal/result_backend.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
1-
from __future__ import annotations
2-
31
import abc
42
import typing as tp
53

64
from taskiq import AsyncResultBackend
5+
from taskiq.abc.serializer import TaskiqSerializer
76
from taskiq.serializers import PickleSerializer
87

98

10-
if tp.TYPE_CHECKING:
11-
from taskiq.abc.serializer import TaskiqSerializer
12-
13-
149
ReturnType = tp.TypeVar("ReturnType")
1510

1611

src/taskiq_pg/aiopg/queries.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,40 @@
11
CREATE_TABLE_QUERY = """
22
CREATE TABLE IF NOT EXISTS {} (
33
task_id {} UNIQUE,
4-
result BYTEA
4+
result BYTEA,
5+
progress BYTEA
56
)
67
"""
78

9+
ADD_PROGRESS_COLUMN_QUERY = """
10+
ALTER TABLE {} ADD COLUMN IF NOT EXISTS progress BYTEA;
11+
"""
12+
813
CREATE_INDEX_QUERY = """
914
CREATE INDEX IF NOT EXISTS {}_task_id_idx ON {} USING HASH (task_id)
1015
"""
1116

1217
INSERT_RESULT_QUERY = """
13-
INSERT INTO {} VALUES (%s, %s)
18+
INSERT INTO {} VALUES (%s, %s, NULL)
1419
ON CONFLICT (task_id)
1520
DO UPDATE
1621
SET result = %s
1722
"""
1823

24+
INSERT_PROGRESS_QUERY = """
25+
INSERT INTO {} VALUES (%s, NULL, %s)
26+
ON CONFLICT (task_id)
27+
DO UPDATE
28+
SET progress = %s
29+
"""
30+
31+
SELECT_PROGRESS_QUERY = """
32+
SELECT progress FROM {} WHERE task_id = %s
33+
"""
34+
1935
IS_RESULT_EXISTS_QUERY = """
2036
SELECT EXISTS(
21-
SELECT 1 FROM {} WHERE task_id = %s
37+
SELECT 1 FROM {} WHERE task_id = %s and result IS NOT NULL
2238
)
2339
"""
2440

0 commit comments

Comments
 (0)