Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ You now know the core concepts: creating dockets, scheduling work with idempoten

Ready for more? Check out:

- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, and concurrency control
- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, concurrency control, debounce, and cooldown
- **[Dependency Injection](dependency-injection.md)** - Access current docket, custom dependencies, shared resources
- **[Testing with Docket](testing.md)** - Ergonomic testing utilities for unit and integration tests
- **[Task Design Patterns](task-patterns.md)** - Find & flood, task scattering, logging, and task chains
Expand Down
245 changes: 164 additions & 81 deletions docs/task-behaviors.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,18 @@ For more details on progress monitoring patterns and real-time observation, see

## Concurrency Control

Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections.
Docket provides fine-grained concurrency control that limits how many tasks can run at the same time, based on specific argument values. This is useful for protecting shared resources, avoiding overloading external services, and managing database connections.

### Basic Concurrency Limits
### Per-Argument Concurrency

Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments:
Annotate a parameter with `ConcurrencyLimit` to limit concurrency based on its value. Each distinct value gets its own independent limit:

```python
from typing import Annotated
from docket import ConcurrencyLimit

async def process_customer_data(
customer_id: int,
concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1)
customer_id: Annotated[int, ConcurrencyLimit(1)],
) -> None:
# Only one task per customer_id can run at a time
await update_customer_profile(customer_id)
Expand All @@ -325,11 +325,22 @@ async def process_customer_data(
# These will run sequentially for the same customer
await docket.add(process_customer_data)(customer_id=1001)
await docket.add(process_customer_data)(customer_id=1001)
await docket.add(process_customer_data)(customer_id=1001)

# But different customers can run concurrently
await docket.add(process_customer_data)(customer_id=2001) # Runs in parallel
await docket.add(process_customer_data)(customer_id=3001) # Runs in parallel
```

### Per-Task Concurrency

Instead of annotating an argument, pass `ConcurrencyLimit` as a parameter's default value to limit the total number of concurrent executions of a task, regardless of its arguments:

```python
async def expensive_computation(
input_data: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit(max_concurrent=3),
) -> None:
# At most 3 of these tasks can run at once across all arguments
await run_computation(input_data)
```

### Database Connection Pooling
Expand All @@ -338,9 +349,8 @@ Limit concurrent database operations to prevent overwhelming your database:

```python
async def backup_database_table(
db_name: str,
db_name: Annotated[str, ConcurrencyLimit(2)],
table_name: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2)
) -> None:
# Maximum 2 backup operations per database at once
await create_table_backup(db_name, table_name)
Expand All @@ -360,8 +370,7 @@ Protect external APIs from being overwhelmed:
```python
async def sync_user_with_external_service(
user_id: int,
service_name: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5)
service_name: Annotated[str, ConcurrencyLimit(5)],
) -> None:
# Limit to 5 concurrent API calls per external service
api_client = get_api_client(service_name)
Expand All @@ -374,84 +383,22 @@ await docket.add(sync_user_with_external_service)(456, "salesforce") # Will que
await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel
```

### File Processing Limits

Control concurrent file operations to manage disk I/O:

```python
async def process_media_file(
file_path: str,
operation_type: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3)
) -> None:
# Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes)
if operation_type == "video_transcode":
await transcode_video(file_path)
elif operation_type == "image_resize":
await resize_image(file_path)
elif operation_type == "audio_compress":
await compress_audio(file_path)

# Different operation types can run concurrently, but each type is limited
await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode")
await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode")
await docket.add(process_media_file)("/images/photo1.jpg", "image_resize") # Runs in parallel
```

### Custom Scopes

Use custom scopes to create independent concurrency limits:

```python
async def process_tenant_data(
tenant_id: str,
tenant_id: Annotated[str, ConcurrencyLimit(2, scope="tenant_operations")],
operation: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit(
"tenant_id",
max_concurrent=2,
scope="tenant_operations"
)
) -> None:
# Each tenant can have up to 2 concurrent operations
await perform_tenant_operation(tenant_id, operation)

async def process_global_data(
data_type: str,
concurrency: ConcurrencyLimit = ConcurrencyLimit(
"data_type",
max_concurrent=1,
scope="global_operations" # Separate from tenant operations
)
) -> None:
# Global operations have their own concurrency limits
await process_global_data_type(data_type)
```

### Multi-Level Concurrency

Combine multiple concurrency controls for complex scenarios:

```python
async def process_user_export(
user_id: int,
export_type: str,
region: str,
user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1),
type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3),
region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10)
) -> None:
# This task respects ALL concurrency limits:
# - Only 1 export per user at a time
# - Only 3 exports of each type globally
# - Only 10 exports per region
await generate_user_export(user_id, export_type, region)
```

**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start.

### Monitoring Concurrency

Concurrency limits are enforced using Redis sets, so you can monitor them:
Concurrency limits are enforced using Redis sorted sets, so you can monitor them:

```python
async def monitor_concurrency_usage() -> None:
Expand All @@ -467,14 +414,150 @@ async def monitor_concurrency_usage() -> None:
print(f"{key}: {count} active tasks")
```

### Tips
!!! note "Legacy default-parameter style"
Prior to 0.18, `ConcurrencyLimit` required passing the argument name as a
string: `ConcurrencyLimit("customer_id", max_concurrent=1)`. This style
still works, but `Annotated` is preferred — it avoids the string-name
duplication and is consistent with Debounce, Cooldown, and other
dependencies.

## Cooldown

Cooldown executes the first submission immediately, then drops duplicates within a window. If another submission arrives before the window expires, it's quietly dropped with an INFO-level log.

### Per-Task Cooldown

```python
from datetime import timedelta
from docket import Cooldown

async def process_webhooks(
cooldown: Cooldown = Cooldown(timedelta(seconds=30)),
) -> None:
events = await fetch_pending_webhook_events()
await process_events(events)

# First call starts immediately and sets a 30-second window
await docket.add(process_webhooks)()

# This one arrives 5 seconds later — quietly dropped
await docket.add(process_webhooks)()
```

### Per-Parameter Cooldown

Annotate a parameter with `Cooldown` to apply independent windows per value:

```python
from typing import Annotated

async def sync_customer(
customer_id: Annotated[int, Cooldown(timedelta(seconds=30))],
) -> None:
await refresh_customer_data(customer_id)

# First sync for customer 1001 starts immediately
await docket.add(sync_customer)(customer_id=1001)

# Duplicate for 1001 within 30s — dropped
await docket.add(sync_customer)(customer_id=1001)

# Different customer — runs immediately
await docket.add(sync_customer)(customer_id=2002)
```

## Debounce

1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint).
Debounce waits for submissions to settle before firing. When rapid-fire events arrive, only one task runs — after a quiet period equal to the settle window. This is the classic "trailing-edge" debounce: keep resetting the timer on each new event, then fire once things calm down.

### Per-Task Debounce

```python
from datetime import timedelta
from docket import Debounce

async def process_webhooks(
debounce: Debounce = Debounce(timedelta(seconds=5)),
) -> None:
events = await fetch_pending_webhook_events()
await process_events(events)

2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints.
# First submission becomes the "winner" and gets rescheduled
await docket.add(process_webhooks)()

3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts.
# More events arrive — they reset the settle timer but are dropped
await docket.add(process_webhooks)()
await docket.add(process_webhooks)()

# After 5 seconds of quiet, the winner proceeds
```

### Per-Parameter Debounce

Annotate a parameter with `Debounce` to get independent settle windows per value:

```python
from typing import Annotated

async def sync_customer(
customer_id: Annotated[int, Debounce(timedelta(seconds=5))],
) -> None:
await refresh_customer_data(customer_id)

# Each customer_id gets its own independent settle window
await docket.add(sync_customer)(customer_id=1001)
await docket.add(sync_customer)(customer_id=1001) # resets 1001's timer
await docket.add(sync_customer)(customer_id=2002) # independent window
```

4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays.
### Debounce vs. Cooldown

| | Cooldown | Debounce |
|---|---|---|
| **Behavior** | Execute first, drop duplicates | Wait for quiet, then execute |
| **Window anchored to** | First execution | Last submission |
| **Good for** | Deduplicating rapid-fire events | Batching bursts into one action |

### Multiple Cooldowns

You can annotate multiple parameters with `Cooldown` on the same task. Each gets its own independent window scoped to that parameter's value. A task must pass *all* of its cooldown checks to start — if any one blocks, the task is dropped:

```python
from typing import Annotated

async def sync_data(
customer_id: Annotated[int, Cooldown(timedelta(seconds=30))],
region: Annotated[str, Cooldown(timedelta(seconds=60))],
) -> None:
await refresh_data(customer_id, region)

# Runs immediately — both windows are clear
await docket.add(sync_data)(customer_id=1, region="us")

# Blocked — customer_id=1 is still in cooldown
await docket.add(sync_data)(customer_id=1, region="eu")

# Blocked — region="us" is still in cooldown
await docket.add(sync_data)(customer_id=2, region="us")

# Runs — both customer_id=2 and region="eu" are clear
await docket.add(sync_data)(customer_id=2, region="eu")
```

Only one `Debounce` is allowed per task — its reschedule mechanism requires a single settle window.

### Combining with Other Controls

Debounce, cooldown, and concurrency limits can all coexist on the same task. For example, this task combines a per-`order_id` concurrency limit with a task-wide cooldown:

```python
from typing import Annotated

async def process_order(
order_id: Annotated[int, ConcurrencyLimit(1)],
cooldown: Cooldown = Cooldown(timedelta(seconds=60)),
) -> None:
await finalize_order(order_id)
```

5. **Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively.
Each admission control is checked independently. A task must satisfy all of them to start.
2 changes: 1 addition & 1 deletion loq.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ max_lines = 750
# Source files that still need exceptions above 750
[[rules]]
path = "src/docket/worker.py"
max_lines = 1141
max_lines = 1150

[[rules]]
path = "src/docket/cli/__init__.py"
Expand Down
4 changes: 4 additions & 0 deletions src/docket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from .annotations import Logged
from .dependencies import (
ConcurrencyLimit,
Cooldown,
Cron,
Debounce,
CurrentDocket,
CurrentExecution,
CurrentWorker,
Expand All @@ -37,7 +39,9 @@
"__version__",
"Agenda",
"ConcurrencyLimit",
"Cooldown",
"Cron",
"Debounce",
"CurrentDocket",
"CurrentExecution",
"CurrentWorker",
Expand Down
4 changes: 4 additions & 0 deletions src/docket/dependencies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
format_duration,
)
from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit
from ._cooldown import Cooldown
from ._debounce import Debounce
from ._cron import Cron
from ._contextual import (
CurrentDocket,
Expand Down Expand Up @@ -83,6 +85,8 @@
"AdmissionBlocked",
"ConcurrencyBlocked",
"ConcurrencyLimit",
"Cooldown",
"Debounce",
"Cron",
"Perpetual",
"Progress",
Expand Down
18 changes: 17 additions & 1 deletion src/docket/dependencies/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,27 @@ class AdmissionBlocked(Exception):

This is the base exception for admission control mechanisms like
concurrency limits, rate limits, or health gates.

When ``reschedule`` is True (default), the worker re-queues the task
with a short delay. When False, the task is quietly acknowledged
and dropped with an INFO-level log (appropriate for debounce/cooldown
where retrying would just hit the same window).

``retry_delay`` overrides the default reschedule delay when set.
"""

def __init__(self, execution: Execution, reason: str = "admission control"):
def __init__(
self,
execution: Execution,
reason: str = "admission control",
*,
reschedule: bool = True,
retry_delay: timedelta | None = None,
):
self.execution = execution
self.reason = reason
self.reschedule = reschedule
self.retry_delay = retry_delay
super().__init__(f"Task {execution.key} blocked by {reason}")


Expand Down
Loading