Skip to content

Commit 4c1179c

Browse files
committed
docs: add example and docs with set_progress/get_progress
1 parent 880208a commit 4c1179c

File tree

3 files changed

+177
-0
lines changed

3 files changed

+177
-0
lines changed

docs/tutorial/result_backend.md

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
---
2+
title: Result Backend
3+
---
4+
5+
## Basic usage
6+
7+
You can store task results in Postgres using one of result backend classes from this package.
8+
9+
You can define your broker with result backend like this:
10+
11+
```python
12+
import asyncio
13+
from taskiq import TaskiqBroker
14+
# 1. Import AsyncpgBroker and AsyncpgResultBackend (or other result backend you want to use)
15+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
16+
17+
# 2. Define your broker with result backend
18+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
19+
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn=dsn)
20+
21+
# 3. Register task
22+
@broker.task(task_name="answer_for_everything")
23+
async def answer_for_everything() -> None:
24+
await asyncio.sleep(2)
25+
return 42
26+
27+
async def main():
28+
# 4. Start broker, call task and wait for result
29+
await broker.startup()
30+
task = await best_task_ever.kiq()
31+
print(await task.wait_result())
32+
await broker.shutdown()
33+
34+
35+
if __name__ == "__main__":
36+
asyncio.run(main())
37+
```
38+
39+
After running this code, you should see `42` printed in the console. Plus the result will be stored in the Postgres database in `taskiq_results` (by default).
40+
41+
## Customization
42+
43+
You can customize the result backend by providing additional parameters to the constructor.
44+
45+
- `keep_results` - whatever to keep results after they are fetched. Default is `True`. Suitable if you don't want to store results forever.
46+
- `table_name` - name of the table to store results in. Default is `taskiq_results`.
47+
- `field_for_task_id` - type of the field to store task IDs. Default is `VarChar`. But you can pick `Uuid` or `Text` if you want.
48+
- `serializer` - serializer to use for serializing results. Default is `PickleSerializer`. But if you want human readable results you can use `JsonSerializer` from `taskiq.serializers` for example.
49+
50+
## Task progress
51+
52+
You can also store task progress using result backend. To do this, you need to use `set_progress` method from `ProgressTracker`:
53+
54+
```python
55+
import asyncio
56+
from taskiq import TaskiqBroker
57+
# 1. Import AsyncpgBroker and AsyncpgResultBackend (or other result backend you want to use)
58+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
59+
60+
# 2. Define your broker with result backend
61+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
62+
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn=dsn)
63+
64+
# 3. Register task
65+
@broker.task("solve_all_problems")
66+
async def best_task_ever(
67+
progress_tracker: ProgressTracker[Any] = TaskiqDepends(), # noqa: B008
68+
) -> int:
69+
# 4. Set progress with state
70+
state_dict = {"start_message": "Starting to solve problems"}
71+
await progress_tracker.set_progress(TaskState.STARTED, state_dict)
72+
73+
await asyncio.sleep(2)
74+
75+
# You can also use custom states, but progress will be rewritten on each call (it's update not merge)
76+
state_dict.update({"halfway_message": "Halfway done!"})
77+
await progress_tracker.set_progress("halfway", state_dict)
78+
await progress_tracker.set_progress(TaskState.STARTED, state_dict)
79+
80+
await asyncio.sleep(2)
81+
82+
return 42
83+
84+
async def main():
85+
# 5. Start broker
86+
await broker.startup()
87+
task = await best_task_ever.kiq()
88+
89+
# 6. Check progress on start
90+
await asyncio.sleep(1)
91+
print(await task.get_progress())
92+
93+
# 7. Check progress on halfway
94+
await asyncio.sleep(2)
95+
print(await task.get_progress())
96+
97+
# 8. Get final result
98+
print(await task.wait_result())
99+
100+
await broker.shutdown()
101+
102+
103+
if __name__ == "__main__":
104+
asyncio.run(main())
105+
```
106+
107+
If you run this code, you should see something like this in the console:
108+
109+
```bash
110+
> uv run python -m examples.example_with_progress
111+
112+
state='STARTED' meta={'start_message': 'Starting to solve problems'}
113+
state='STARTED' meta={'start_message': 'Starting to solve problems', 'halfway_message': 'Halfway done!'}
114+
is_err=False log=None return_value=42 execution_time=4.01 labels={} error=None
115+
```

examples/example_with_progress.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
"""
2+
How to run:
3+
4+
1) Run worker in one terminal:
5+
uv run taskiq worker examples.example_with_progress:broker --workers 1
6+
7+
2) Run this script in another terminal:
8+
uv run python -m examples.example_with_progress
9+
"""
10+
11+
import asyncio
12+
from typing import Any
13+
14+
from taskiq import TaskiqDepends
15+
from taskiq.depends.progress_tracker import ProgressTracker, TaskState
16+
17+
from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend
18+
19+
20+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
21+
broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn))
22+
23+
24+
@broker.task("solve_all_problems")
25+
async def best_task_ever(
26+
progress_tracker: ProgressTracker[Any] = TaskiqDepends(), # noqa: B008
27+
) -> int:
28+
state_dict = {"start_message": "Starting to solve problems"}
29+
await progress_tracker.set_progress(TaskState.STARTED, state_dict)
30+
31+
await asyncio.sleep(2)
32+
33+
state_dict.update({"halfway_message": "Halfway done!"})
34+
await progress_tracker.set_progress("halfway", state_dict)
35+
await progress_tracker.set_progress(TaskState.STARTED, state_dict)
36+
37+
await asyncio.sleep(2)
38+
39+
return 42
40+
41+
42+
async def main():
43+
await broker.startup()
44+
task = await best_task_ever.kiq()
45+
46+
# check progress on start
47+
await asyncio.sleep(1)
48+
print(await task.get_progress())
49+
50+
# check progress on halfway
51+
await asyncio.sleep(2)
52+
print(await task.get_progress())
53+
54+
# get final result
55+
print(await task.wait_result())
56+
57+
await broker.shutdown()
58+
59+
60+
if __name__ == "__main__":
61+
asyncio.run(main())

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ nav:
99
- Overview:
1010
- index.md
1111
- Tutorial:
12+
- tutorial/result_backend.md
1213
- tutorial/schedule_source.md
1314
- tutorial/common_issues.md
1415
- API:

0 commit comments

Comments
 (0)