Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,41 @@
# Release Notes

## Latest

## 1.1.0 - 2026-04-15

### Features

* 🚀 **Parallel execution control (`allow_parallel`)**:
Add configurable `allow_parallel` setting to control whether the same task can run multiple times concurrently.
When set to `False`, the coordinator skips scheduling if the task is already running on any worker (checked via
Redis heartbeat key).
Supports three-level cascade resolution: **task > task group > global config**, the most specific non-null
value wins.
Available on `Config` (global default), `TaskGroup` (group-level override), `@task_group.add_task()` decorator, `task_group.add_dynamic_task()`, and the `POST /tasks` REST API. Dynamic task definitions include the setting in Redis persistence for restart survival.

### Internals
* ⬆️ Bump dependencies in uv.lock file, for dev purposes:
- anyio from 4.12.1 to 4.13.0
- charset-normalizer from 3.4.5 to 3.4.7
- coverage from 7.13.4 to 7.13.5
- fastapi from 0.135.1 to 0.135.3
- mkdocs-get-deps from 0.2.0 to 0.2.2
- mkdocs-material from 9.7.4 to 9.7.6
- platformdirs from 4.9.4 to 4.9.6
- pydantic from 2.12.5 to 2.13.0
- pydantic-core from 2.41.5 to 2.46.0
- pygments from 2.19.2 to 2.20.0
- pymdown-extensions from 10.21 to 10.21.2
- pytest from 9.0.2 to 9.0.3
- redis from 7.3.0 to 7.4.0
- requests from 2.32.5 to 2.33.1
- ruff from 0.15.5 to 0.15.10
- setuptools from 82.0.0 to 82.0.1
- starlette from 0.52.1 to 1.0.0
- termynal from 0.13.1 to 0.14.0
- ty from 0.0.21 to 0.0.29

## 1.0.0 - 2026-03-09

### Features
Expand Down Expand Up @@ -33,7 +69,7 @@
* ⬆️ Pre-commit bump uv-pre-commit from 0.9.7 to 0.9.18.
* ⬆️ Pre-commit bump ruff-pre-commit from 0.14.3 to 0.14.10.
* ⬆️ Bump dependencies in uv.lock file, for dev purposes:
* - annotated-doc added 0.0.4
- annotated-doc added 0.0.4
- anyio from 4.11.0 to 4.12.1
- backrefs from 6.1 to 6.2
- certifi from 2026.1.4 to 2026.2.25
Expand Down
1 change: 1 addition & 0 deletions docs/docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Built with production environments in mind, FastAPI Task Manager includes compre
**Built-in safeguards:**

- Exponential backoff retry with configurable delays and per-task overrides
- Per-task parallel execution control (`allow_parallel=False` to prevent overlapping runs)
- Task heartbeat monitoring for crash detection
- Automatic reconciliation of stale and failed tasks
- Health check endpoints for monitoring
Expand Down
5 changes: 4 additions & 1 deletion docs/docs/learn/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ Retrieve detailed information for all tasks, including running state and retry b
"retry_backoff": null,
"retry_backoff_max": null,
"dynamic": false,
"allow_parallel": null,
"task_group_name": "My Example Task Group",
"kwargs": null,
"function_name": null,
Expand Down Expand Up @@ -246,7 +247,8 @@ Create a new dynamic task from a registered function.
"high_priority": false,
"tags": ["reports"],
"retry_backoff": 5.0,
"retry_backoff_max": 120.0
"retry_backoff_max": 120.0,
"allow_parallel": false
}
```

Expand All @@ -262,6 +264,7 @@ Create a new dynamic task from a registered function.
| `tags` | `list[string]` | No | Tags for filtering |
| `retry_backoff` | `float` | No | Per-task initial backoff override |
| `retry_backoff_max` | `float` | No | Per-task max backoff override |
| `allow_parallel` | `bool \| null` | No | Allow concurrent executions (`null` = inherit from group/config) |

**Response model:** `DynamicTaskResponse`

Expand Down
17 changes: 17 additions & 0 deletions docs/docs/learn/dynamic-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,22 @@ curl -X POST http://localhost:8000/task-manager/tasks \
}'
```

### Disable Parallel Execution

Set `allow_parallel: false` to prevent a task from being scheduled while a previous execution is still running. This is checked via the Redis heartbeat key, so it works correctly across multiple workers.

```bash
curl -X POST http://localhost:8000/task-manager/tasks \
-H "Content-Type: application/json" \
-d '{
"task_group_name": "Reports",
"function_name": "send_report",
"cron_expression": "*/5 * * * *",
"name": "long_running_report",
"allow_parallel": false
}'
```

### Custom Task Names

If you don't provide a `name`, one is auto-generated from the function name plus a hash of the kwargs and cron expression. Providing explicit names makes tasks easier to manage and monitor.
Expand Down Expand Up @@ -189,6 +205,7 @@ The method accepts the same parameters available in the REST API:
| `tags` | `list[str]` | No | Tags for filtering |
| `retry_backoff` | `float` | No | Initial retry delay in seconds |
| `retry_backoff_max` | `float` | No | Maximum retry delay in seconds |
| `allow_parallel` | `bool \| None` | No | Allow concurrent executions (`None` = inherit from group/config) |

The method returns the created `Task` object and raises `RuntimeError` if the function is not registered or the task name is already taken.

Expand Down
81 changes: 67 additions & 14 deletions docs/docs/learn/getting_started/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ Default value is `500`.
## Redis Configuration

### Redis Host
{* ./docs_src/tutorial/configurations_py310.py ln[10] *}
{* ./docs_src/tutorial/configurations_py310.py ln[11] *}

The hostname or IP address of the Redis server. This is the only **required** configuration parameter with no default value.

### Redis Port
{* ./docs_src/tutorial/configurations_py310.py ln[11] *}
{* ./docs_src/tutorial/configurations_py310.py ln[12] *}

The port number of the Redis server.
Default value is `6379`.

### Redis Password
{* ./docs_src/tutorial/configurations_py310.py ln[12] *}
{* ./docs_src/tutorial/configurations_py310.py ln[13] *}

The password for authenticating with the Redis server. Set to `None` if your Redis instance does not require authentication.
Default value is `None`.

### Redis DB
{* ./docs_src/tutorial/configurations_py310.py ln[13] *}
{* ./docs_src/tutorial/configurations_py310.py ln[14] *}

The Redis database number to use. Redis supports multiple databases (0-15 by default).
Default value is `0`.
Expand All @@ -66,13 +66,13 @@ Default value is `0`.
These settings control the core task scheduling loop.

### Poll Interval
{* ./docs_src/tutorial/configurations_py310.py ln[15] *}
{* ./docs_src/tutorial/configurations_py310.py ln[16] *}

The interval (in seconds) between coordinator scheduling cycles. Lower values mean tasks are picked up faster but increase Redis load.
Default value is `0.1`.

### Worker Service Name
{* ./docs_src/tutorial/configurations_py310.py ln[16] *}
{* ./docs_src/tutorial/configurations_py310.py ln[17] *}

The service name used for worker identification. This appears in health check responses and logs to help identify which service a worker belongs to.
Default value is `"fastapi-task-manager"`.
Expand All @@ -84,7 +84,7 @@ Default value is `"fastapi-task-manager"`.
These settings control the Redis Streams-based task distribution system.

### Stream Block Timeout
{* ./docs_src/tutorial/configurations_py310.py ln[18] *}
{* ./docs_src/tutorial/configurations_py310.py ln[19] *}

The block timeout (in milliseconds) for `XREADGROUP` when consumers wait for new messages. Higher values reduce Redis round-trips but increase shutdown latency.
Default value is `1000`.
Expand All @@ -96,13 +96,13 @@ Default value is `1000`.
FastAPI Task Manager uses distributed leader election via Redis to ensure that only one instance schedules tasks at a time, while all instances can execute them.

### Leader Heartbeat Interval
{* ./docs_src/tutorial/configurations_py310.py ln[20] *}
{* ./docs_src/tutorial/configurations_py310.py ln[21] *}

The interval (in seconds) between leader lock renewals. The leader periodically renews its lock to signal it is still alive.
Default value is `3.0`.

### Leader Retry Interval
{* ./docs_src/tutorial/configurations_py310.py ln[21] *}
{* ./docs_src/tutorial/configurations_py310.py ln[22] *}

The interval (in seconds) between leadership acquisition attempts for follower instances. When a worker is not the leader, it tries to acquire leadership at this interval.
Default value is `5.0`.
Expand All @@ -114,7 +114,7 @@ Default value is `5.0`.
The reconciler detects and recovers stale or failed tasks. It runs only on the leader instance.

### Reconciliation Interval
{* ./docs_src/tutorial/configurations_py310.py ln[23] *}
{* ./docs_src/tutorial/configurations_py310.py ln[24] *}

The interval (in seconds) between reconciliation checks. The reconciler scans for overdue or stuck tasks at this interval.
Default value is `30`.
Expand All @@ -126,7 +126,7 @@ Default value is `30`.
When a task fails, FastAPI Task Manager applies exponential backoff to delay re-execution. This prevents rapid failure loops and gives external dependencies time to recover.

### Retry Backoff
{* ./docs_src/tutorial/configurations_py310.py ln[25] *}
{* ./docs_src/tutorial/configurations_py310.py ln[26] *}

The initial backoff delay (in seconds) after a task failure. The first retry will be delayed by this amount.
Default value is `1.0`.
Expand All @@ -136,7 +136,7 @@ This setting can be overridden on individual tasks via the `retry_backoff` param
////

### Retry Backoff Max
{* ./docs_src/tutorial/configurations_py310.py ln[26] *}
{* ./docs_src/tutorial/configurations_py310.py ln[27] *}

The maximum backoff delay (in seconds). The delay will never exceed this value, regardless of how many consecutive failures occur.
Default value is `60.0`.
Expand All @@ -146,19 +146,72 @@ This setting can be overridden on individual tasks via the `retry_backoff_max` p
////

### Retry Backoff Multiplier
{* ./docs_src/tutorial/configurations_py310.py ln[27] *}
{* ./docs_src/tutorial/configurations_py310.py ln[28] *}

The multiplier applied to the current delay after each consecutive failure. For example, with default settings: 1s, 2s, 4s, 8s, 16s, 32s, 60s (capped).
Default value is `2.0`.

---

## Parallel Execution Control

The `allow_parallel` setting controls whether the same task can run multiple times concurrently. When set to `False`, the coordinator will **skip scheduling** the task if it is already running on any worker. This is useful for long-running tasks where overlapping executions would be problematic.

This setting can be configured at **three levels**, with a cascade resolution order: **task > task group > global config**.

- If a task sets `allow_parallel`, that value is used.
- Otherwise, if the task group sets `allow_parallel`, that value is used.
- Otherwise, the global `Config.allow_parallel` value is used (default: `True`).

### Global (Config)

{* ./docs_src/tutorial/configurations_py310.py ln[9] *}

Sets the default for all tasks across the entire application. Default value is `True`.

### Task Group

```python
# All tasks in this group default to no parallel execution
my_tasks = TaskGroup(name="My Tasks", allow_parallel=False)
```

Sets the default for all tasks within the group. Set to `None` (the default) to inherit from the global config.

### Task

```python
# This specific task cannot run in parallel, even if the group allows it
@my_tasks.add_task(
"*/5 * * * *",
name="slow_sync",
allow_parallel=False,
)
async def slow_sync():
await asyncio.sleep(7)
```

Overrides both the group and global setting for this specific task. Set to `None` (the default) to inherit from the task group.

//// tip | Cascade example
With `Config(allow_parallel=True)` and `TaskGroup(allow_parallel=False)`:

- A task with `allow_parallel=True` **will** allow parallel runs (task wins)
- A task with `allow_parallel=None` **will not** allow parallel runs (inherits from group)
////

//// note | Multi-worker safe
This check uses the Redis `running` heartbeat key, which is shared across all workers. Even if the task is running on a different instance, the coordinator will correctly skip scheduling it.
////

---

## Running Heartbeat Configuration

While a task is executing, the worker periodically renews a heartbeat key in Redis. If a worker crashes, the key expires, signaling that the task is no longer being executed.

### Running Heartbeat Interval
{* ./docs_src/tutorial/configurations_py310.py ln[29] *}
{* ./docs_src/tutorial/configurations_py310.py ln[30] *}

The interval (in seconds) between heartbeat renewals while a task is executing. The worker renews the running key at this interval.
Default value is `3.0`.
1 change: 1 addition & 0 deletions docs_src/tutorial/configurations_py310.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
concurrent_tasks=2,
statistics_redis_expiration=432_000,
statistics_history_runs=30,
allow_parallel=True,
# --------- Redis config ---------
redis_host="localhost",
redis_port=6379,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "fastapi-task-manager"
version = "1.0.0.post2"
version = "1.1.0"
description = "A task manager for FastAPI. Robust Scheduling, Distributed Safety"
readme = "README.md"
authors = [
Expand Down
3 changes: 3 additions & 0 deletions src/fastapi_task_manager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ class Config(BaseModel):
concurrent_tasks: int = 2
statistics_redis_expiration: int = 432_000 # 5 days
statistics_history_runs: int = 500
# Global default for allowing parallel executions of the same task.
# Can be overridden at task group or individual task level.
allow_parallel: bool = True
# --------- End of app config variables ---------

# --------- Redis config variables ---------
Expand Down
22 changes: 21 additions & 1 deletion src/fastapi_task_manager/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ async def _is_task_due(self, task_group: TaskGroup, task: Task) -> bool:

A task is due if:
1. It's not disabled
2. Its next_run time is in the past (or not set)
2. It's not in backoff
3. If allow_parallel is False, it's not already running
4. Its next_run time is in the past (or not set)

Args:
task_group: The TaskGroup containing the task.
Expand All @@ -179,6 +181,24 @@ async def _is_task_due(self, task_group: TaskGroup, task: Task) -> bool:
)
return False

# Resolve allow_parallel with cascade: task > task_group > config
effective_allow_parallel = task.allow_parallel
if effective_allow_parallel is None:
effective_allow_parallel = task_group.allow_parallel
if effective_allow_parallel is None:
effective_allow_parallel = self._task_manager.config.allow_parallel

# When parallel execution is disabled, skip if the task is already running
if not effective_allow_parallel:
running_key = self._keys.running_task_key(task_group.name, task.name)
if await self._redis.exists(running_key):
logger.debug(
"Task %s/%s is already running and allow_parallel=False, skipping",
task_group.name,
task.name,
)
return False

# Check next_run time
next_run_b = await self._redis.get(keys.next_run)
if next_run_b is None:
Expand Down
1 change: 1 addition & 0 deletions src/fastapi_task_manager/schema/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ConfigResponse(BaseModel):
concurrent_tasks: int
statistics_history_runs: int
statistics_redis_expiration: int
allow_parallel: bool

# Runner
poll_interval: float
Expand Down
3 changes: 3 additions & 0 deletions src/fastapi_task_manager/schema/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class TaskBase(BaseModel):
retry_backoff_max: float | None = None
# Whether this task was created dynamically via API (vs. static decorator)
dynamic: bool = False
# Per-task override for parallel execution (None = inherit from task group / global config)
allow_parallel: bool | None = None


class TaskRun(BaseModel):
Expand Down Expand Up @@ -101,6 +103,7 @@ class CreateDynamicTaskRequest(BaseModel):
tags: list[str] | None = None
retry_backoff: float | None = None
retry_backoff_max: float | None = None
allow_parallel: bool | None = None


class DynamicTaskResponse(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions src/fastapi_task_manager/schema/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
class TaskGroup(BaseModel):
name: str
tags: list[str] | None = None
allow_parallel: bool | None = None
task_count: int = 0
Loading
Loading