Skip to content

Commit fa24461

Browse files
authored
feat: add optional methods for schedule sources (#21)
2 parents 59f46bf + 91ce7de commit fa24461

File tree

15 files changed

+433
-164
lines changed

15 files changed

+433
-164
lines changed

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ init: ## Install all project dependencies with extras
2929
@$(MAKE) check_venv
3030
@uv sync --all-extras
3131

32+
.PHONY: run_docs
33+
run_docs: ## Run documentation server
34+
@uv run mkdocs serve --livereload
35+
3236
.PHONY: run_infra
3337
run_infra: ## Run rabbitmq in docker for integration tests
3438
@docker compose -f docker-compose.yml up -d

docs/tutorial/schedule_source.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,85 @@
22
title: Schedule Source
33
---
44

5+
## Basic usage
6+
7+
The easiest way to schedule task with this library is to add `schedule` label to task. Schedule source will automatically
8+
parse this label and add new schedule to database on start of scheduler.
9+
10+
You can define your scheduled task like this:
11+
12+
```python
13+
import asyncio
14+
from taskiq import TaskiqScheduler
15+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource
16+
17+
18+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
19+
broker = AsyncpgBroker(dsn)
20+
scheduler = TaskiqScheduler(
21+
broker=broker,
22+
sources=[AsyncpgScheduleSource(
23+
dsn=dsn,
24+
broker=broker,
25+
)],
26+
)
27+
28+
29+
@broker.task(
30+
task_name="solve_all_problems",
31+
schedule=[
32+
{
33+
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
34+
"cron_offset": None, # type: str | None, can be omitted. For example "Europe/Berlin".
35+
"time": None, # type: datetime | None, either cron or time should be specified.
36+
"args": [], # type list[Any] | None, can be omitted.
37+
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
38+
"labels": {}, # type: dict[str, Any] | None, can be omitted.
39+
},
40+
],
41+
)
42+
async def best_task_ever() -> None:
43+
"""Solve all problems in the world."""
44+
await asyncio.sleep(2)
45+
print("All problems are solved!")
46+
```
47+
48+
49+
## Adding schedule in runtime
50+
51+
You can also add schedules in runtime using `add_schedule` method of the schedule source:
52+
53+
54+
```python
55+
import asyncio
56+
from taskiq import TaskiqScheduler, ScheduledTask
57+
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource
58+
59+
60+
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
61+
broker = AsyncpgBroker(dsn)
62+
schedule_source = AsyncpgScheduleSource(
63+
dsn=dsn,
64+
broker=broker,
65+
)
66+
scheduler = TaskiqScheduler(
67+
broker=broker,
68+
sources=[schedule_source],
69+
)
70+
71+
72+
@broker.task(
73+
task_name="solve_all_problems",
74+
)
75+
async def best_task_ever() -> None:
76+
"""Solve all problems in the world."""
77+
await asyncio.sleep(2)
78+
print("All problems are solved!")
79+
80+
# Call this function somewhere in your code to add new schedule
81+
async def add_new_schedule() -> None:
82+
await schedule_source.add_schedule(ScheduledTask(...))
83+
```
584

685
## Using multiple schedules
786

pyproject.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ lint = [
6969
"asyncpg-stubs>=0.30.2",
7070
]
7171
test = [
72+
"polyfactory>=2.22.2",
7273
"pytest>=8.4.2",
7374
"pytest-asyncio>=1.1.0",
7475
"pytest-cov>=7.0.0",
@@ -125,10 +126,8 @@ select = ["ALL"]
125126
ignore = [
126127
# TODO: enable this rules
127128
"TRY301",
128-
"PLR0913",
129-
"D401",
130129
"ANN401",
131-
"PERF203",
130+
# "PERF203",
132131

133132

134133
# boolean args
@@ -174,6 +173,9 @@ ignore = [
174173
"INP001",
175174
]
176175

176+
[tool.ruff.lint.pydocstyle]
177+
convention = "google"
178+
177179
[tool.ruff.lint.isort]
178180
known-local-folder = ["taskiq_pg"]
179181
lines-after-imports = 2

src/taskiq_pg/_internal/broker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
class BasePostgresBroker(AsyncBroker, abc.ABC):
1717
"""Base class for Postgres brokers."""
1818

19-
def __init__(
19+
def __init__( # noqa: PLR0913
2020
self,
2121
dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres",
2222
result_backend: AsyncResultBackend[_T] | None = None,

src/taskiq_pg/_internal/schedule_source.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
from __future__ import annotations
22

33
import typing as tp
4+
import uuid
5+
from logging import getLogger
46

7+
from pydantic import ValidationError
58
from taskiq import ScheduleSource
9+
from taskiq.scheduler.scheduled_task import ScheduledTask
610

711

812
if tp.TYPE_CHECKING:
913
from taskiq.abc.broker import AsyncBroker
1014

1115

16+
logger = getLogger("taskiq_pg")
17+
18+
1219
class BasePostgresScheduleSource(ScheduleSource):
1320
def __init__(
1421
self,
@@ -47,3 +54,44 @@ def dsn(self) -> str | None:
4754
if callable(self._dsn):
4855
return self._dsn()
4956
return self._dsn
57+
58+
def extract_scheduled_tasks_from_broker(self) -> list[ScheduledTask]:
59+
"""
60+
Extract schedules from tasks that were registered in broker.
61+
62+
Returns:
63+
A list of ScheduledTask instances extracted from the task's labels.
64+
"""
65+
scheduled_tasks_for_creation: list[ScheduledTask] = []
66+
for task_name, task in self._broker.get_all_tasks().items():
67+
if "schedule" not in task.labels:
68+
logger.debug("Task %s has no schedule, skipping", task_name)
69+
continue
70+
if not isinstance(task.labels["schedule"], list):
71+
logger.warning(
72+
"Schedule for task %s is not a list, skipping",
73+
task_name,
74+
)
75+
continue
76+
for schedule in task.labels["schedule"]:
77+
try:
78+
new_schedule = ScheduledTask.model_validate(
79+
{
80+
"task_name": task_name,
81+
"labels": schedule.get("labels", {}),
82+
"args": schedule.get("args", []),
83+
"kwargs": schedule.get("kwargs", {}),
84+
"schedule_id": str(uuid.uuid4()),
85+
"cron": schedule.get("cron", None),
86+
"cron_offset": schedule.get("cron_offset", None),
87+
"time": schedule.get("time", None),
88+
},
89+
)
90+
scheduled_tasks_for_creation.append(new_schedule)
91+
except ValidationError: # noqa: PERF203
92+
logger.exception(
93+
"Schedule for task %s is not valid, skipping",
94+
task_name,
95+
)
96+
continue
97+
return scheduled_tasks_for_creation

src/taskiq_pg/aiopg/queries.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,7 @@
5757
DELETE_ALL_SCHEDULES_QUERY = """
5858
DELETE FROM {};
5959
"""
60+
61+
DELETE_SCHEDULE_QUERY = """
62+
DELETE FROM {} WHERE id = %s;
63+
"""

src/taskiq_pg/aiopg/schedule_source.py

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
1-
import uuid
21
from logging import getLogger
32

43
from aiopg import Pool, create_pool
5-
from pydantic import ValidationError
64
from taskiq import ScheduledTask
75

86
from taskiq_pg import exceptions
97
from taskiq_pg._internal import BasePostgresScheduleSource
108
from taskiq_pg.aiopg.queries import (
119
CREATE_SCHEDULES_TABLE_QUERY,
1210
DELETE_ALL_SCHEDULES_QUERY,
11+
DELETE_SCHEDULE_QUERY,
1312
INSERT_SCHEDULE_QUERY,
1413
SELECT_SCHEDULES_QUERY,
1514
)
@@ -39,42 +38,6 @@ async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) ->
3938
],
4039
)
4140

42-
def _get_schedules_from_broker_tasks(self) -> list[ScheduledTask]:
43-
"""Extract schedules from the broker's registered tasks."""
44-
scheduled_tasks_for_creation: list[ScheduledTask] = []
45-
for task_name, task in self._broker.get_all_tasks().items():
46-
if "schedule" not in task.labels:
47-
logger.debug("Task %s has no schedule, skipping", task_name)
48-
continue
49-
if not isinstance(task.labels["schedule"], list):
50-
logger.warning(
51-
"Schedule for task %s is not a list, skipping",
52-
task_name,
53-
)
54-
continue
55-
for schedule in task.labels["schedule"]:
56-
try:
57-
new_schedule = ScheduledTask.model_validate(
58-
{
59-
"task_name": task_name,
60-
"labels": schedule.get("labels", {}),
61-
"args": schedule.get("args", []),
62-
"kwargs": schedule.get("kwargs", {}),
63-
"schedule_id": str(uuid.uuid4()),
64-
"cron": schedule.get("cron", None),
65-
"cron_offset": schedule.get("cron_offset", None),
66-
"time": schedule.get("time", None),
67-
},
68-
)
69-
scheduled_tasks_for_creation.append(new_schedule)
70-
except ValidationError:
71-
logger.exception(
72-
"Schedule for task %s is not valid, skipping",
73-
task_name,
74-
)
75-
continue
76-
return scheduled_tasks_for_creation
77-
7841
async def startup(self) -> None:
7942
"""
8043
Initialize the schedule source.
@@ -89,7 +52,7 @@ async def startup(self) -> None:
8952
)
9053
async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
9154
await cursor.execute(CREATE_SCHEDULES_TABLE_QUERY.format(self._table_name))
92-
scheduled_tasks_for_creation = self._get_schedules_from_broker_tasks()
55+
scheduled_tasks_for_creation = self.extract_scheduled_tasks_from_broker()
9356
await self._update_schedules_on_startup(scheduled_tasks_for_creation)
9457
except Exception as error:
9558
raise exceptions.DatabaseConnectionError(str(error)) from error
@@ -122,3 +85,42 @@ async def get_schedules(self) -> list["ScheduledTask"]:
12285
),
12386
)
12487
return schedules
88+
89+
async def add_schedule(self, schedule: "ScheduledTask") -> None:
90+
"""
91+
Add a new schedule.
92+
93+
Args:
94+
schedule: schedule to add.
95+
"""
96+
async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
97+
await cursor.execute(
98+
INSERT_SCHEDULE_QUERY.format(self._table_name),
99+
[
100+
schedule.schedule_id,
101+
schedule.task_name,
102+
schedule.model_dump_json(
103+
exclude={"schedule_id", "task_name"},
104+
),
105+
],
106+
)
107+
108+
async def delete_schedule(self, schedule_id: str) -> None:
109+
"""
110+
Method to delete schedule by id.
111+
112+
This is useful for schedule cancelation.
113+
114+
Args:
115+
schedule_id: id of schedule to delete.
116+
"""
117+
async with self._database_pool.acquire() as connection, connection.cursor() as cursor:
118+
await cursor.execute(
119+
DELETE_SCHEDULE_QUERY.format(self._table_name),
120+
[schedule_id],
121+
)
122+
123+
async def post_send(self, task: ScheduledTask) -> None:
124+
"""Delete a task after it's completed."""
125+
if task.time is not None:
126+
await self.delete_schedule(task.schedule_id)

src/taskiq_pg/asyncpg/queries.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,7 @@
7979
DELETE_ALL_SCHEDULES_QUERY = """
8080
DELETE FROM {};
8181
"""
82+
83+
DELETE_SCHEDULE_QUERY = """
84+
DELETE FROM {} WHERE id = $1;
85+
"""

0 commit comments

Comments
 (0)