From 1b088521508fdd78e7b83ce6a92c774955a602c5 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 27 Feb 2026 15:24:51 -0500 Subject: [PATCH 1/7] Add Debounce and Cooldown admission control dependencies Two new time-based admission controls for tasks: - **Debounce** (leading edge): "don't start this if one was recently started." Sets a Redis key with TTL on entry via `SET NX PX`. Good for deduplicating rapid-fire events like webhooks. - **Cooldown** (trailing edge): "don't start this if one recently succeeded." Checks for a key on entry, but only sets it on successful exit. Failed tasks don't trigger cooldown, so they can be retried immediately. Both work as default parameters (per-task) or via `Annotated` (per-parameter value), same pattern as `ConcurrencyLimit`. Also adds a `reschedule` flag to `AdmissionBlocked` so the worker knows whether to requeue blocked tasks (like ConcurrencyLimit does) or silently drop them (appropriate for debounce/cooldown where retrying would just hit the same window). Closes #322, closes #161. Co-Authored-By: Claude Opus 4.6 --- loq.toml | 2 +- src/docket/__init__.py | 4 + src/docket/dependencies/__init__.py | 6 + src/docket/dependencies/_base.py | 7 ++ src/docket/dependencies/_cooldown.py | 94 ++++++++++++++++ src/docket/dependencies/_debounce.py | 88 +++++++++++++++ src/docket/dependencies/_resolution.py | 2 +- src/docket/worker.py | 27 +++-- tests/test_cooldown.py | 150 +++++++++++++++++++++++++ tests/test_debounce.py | 123 ++++++++++++++++++++ 10 files changed, 492 insertions(+), 11 deletions(-) create mode 100644 src/docket/dependencies/_cooldown.py create mode 100644 src/docket/dependencies/_debounce.py create mode 100644 tests/test_cooldown.py create mode 100644 tests/test_debounce.py diff --git a/loq.toml b/loq.toml index 5b31587..a55406a 100644 --- a/loq.toml +++ b/loq.toml @@ -10,7 +10,7 @@ max_lines = 750 # Source files that still need exceptions above 750 [[rules]] path = "src/docket/worker.py" -max_lines = 1141 +max_lines = 1150 [[rules]] path = "src/docket/cli/__init__.py" diff --git a/src/docket/__init__.py b/src/docket/__init__.py index f2396ea..9c886ea 100644 --- a/src/docket/__init__.py +++ b/src/docket/__init__.py @@ -12,7 +12,9 @@ from .annotations import Logged from .dependencies import ( ConcurrencyLimit, + Cooldown, Cron, + Debounce, CurrentDocket, CurrentExecution, CurrentWorker, @@ -37,7 +39,9 @@ "__version__", "Agenda", "ConcurrencyLimit", + "Cooldown", "Cron", + "Debounce", "CurrentDocket", "CurrentExecution", "CurrentWorker", diff --git a/src/docket/dependencies/__init__.py b/src/docket/dependencies/__init__.py index 3fcf2ce..a86889d 100644 --- a/src/docket/dependencies/__init__.py +++ b/src/docket/dependencies/__init__.py @@ -19,6 +19,8 @@ format_duration, ) from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit +from ._cooldown import Cooldown, CooldownBlocked +from ._debounce import Debounce, DebounceBlocked from ._cron import Cron from ._contextual import ( CurrentDocket, @@ -83,6 +85,10 @@ "AdmissionBlocked", "ConcurrencyBlocked", "ConcurrencyLimit", + "Cooldown", + "CooldownBlocked", + "Debounce", + "DebounceBlocked", "Cron", "Perpetual", "Progress", diff --git a/src/docket/dependencies/_base.py b/src/docket/dependencies/_base.py index a085af2..43469c9 100644 --- a/src/docket/dependencies/_base.py +++ b/src/docket/dependencies/_base.py @@ -55,8 +55,15 @@ class AdmissionBlocked(Exception): This is the base exception for admission control mechanisms like concurrency limits, rate limits, or health gates. + + When ``reschedule`` is True (default), the worker re-queues the task + with a short delay. When False, the task is silently acknowledged + and dropped (appropriate for debounce/cooldown where re-trying would + just hit the same window). """ + reschedule: bool = True + def __init__(self, execution: Execution, reason: str = "admission control"): self.execution = execution self.reason = reason diff --git a/src/docket/dependencies/_cooldown.py b/src/docket/dependencies/_cooldown.py new file mode 100644 index 0000000..ff044ed --- /dev/null +++ b/src/docket/dependencies/_cooldown.py @@ -0,0 +1,94 @@ +"""Cooldown (trailing-edge) admission control dependency.""" + +from __future__ import annotations + +from datetime import timedelta +from types import TracebackType +from typing import TYPE_CHECKING, Any + +from ._base import AdmissionBlocked, Dependency, current_docket, current_execution + +if TYPE_CHECKING: # pragma: no cover + from ..execution import Execution + + +class CooldownBlocked(AdmissionBlocked): + """Raised when a task is blocked by cooldown.""" + + reschedule = False + + def __init__(self, execution: Execution, cooldown_key: str, window: timedelta): + self.cooldown_key = cooldown_key + self.window = window + reason = f"cooldown ({window}) on {cooldown_key}" + super().__init__(execution, reason=reason) + + +class Cooldown(Dependency["Cooldown"]): + """Trailing-edge cooldown: blocks execution if one recently succeeded. + + Checks for a Redis key on entry. If present, the task is blocked. + The key is only set on *successful* exit, so failed tasks don't + trigger the cooldown — they can be retried immediately. + + Works both as a default parameter and as ``Annotated`` metadata:: + + # Per-task: don't start if one succeeded in the last 60s + async def send_digest( + cooldown: Cooldown = Cooldown(timedelta(seconds=60)), + ) -> None: ... + + # Per-parameter: don't start for this customer if one succeeded in the last 60s + async def send_notification( + customer_id: Annotated[int, Cooldown(timedelta(seconds=60))], + ) -> None: ... + """ + + single: bool = True + + def __init__(self, window: timedelta, *, scope: str | None = None) -> None: + self.window = window + self.scope = scope + self._argument_name: str | None = None + self._argument_value: Any = None + + def bind_to_parameter(self, name: str, value: Any) -> Cooldown: + bound = Cooldown(self.window, scope=self.scope) + bound._argument_name = name + bound._argument_value = value + return bound + + def _cooldown_key(self, function_name: str) -> str: + scope = self.scope or current_docket.get().name + if self._argument_name is not None: + return f"{scope}:cooldown:{self._argument_name}:{self._argument_value}" + return f"{scope}:cooldown:{function_name}" + + async def __aenter__(self) -> Cooldown: + execution = current_execution.get() + docket = current_docket.get() + + self._key = self._cooldown_key(execution.function_name) + + async with docket.redis() as redis: + exists = await redis.exists(self._key) + + if exists: + raise CooldownBlocked(execution, self._key, self.window) + + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + if exc_type is not None: + return + + docket = current_docket.get() + window_ms = int(self.window.total_seconds() * 1000) + + async with docket.redis() as redis: + await redis.set(self._key, 1, px=window_ms) diff --git a/src/docket/dependencies/_debounce.py b/src/docket/dependencies/_debounce.py new file mode 100644 index 0000000..5235e2f --- /dev/null +++ b/src/docket/dependencies/_debounce.py @@ -0,0 +1,88 @@ +"""Debounce (leading-edge) admission control dependency.""" + +from __future__ import annotations + +from datetime import timedelta +from types import TracebackType +from typing import TYPE_CHECKING, Any + +from ._base import AdmissionBlocked, Dependency, current_docket, current_execution + +if TYPE_CHECKING: # pragma: no cover + from ..execution import Execution + + +class DebounceBlocked(AdmissionBlocked): + """Raised when a task is blocked by debounce.""" + + reschedule = False + + def __init__(self, execution: Execution, debounce_key: str, window: timedelta): + self.debounce_key = debounce_key + self.window = window + reason = f"debounce ({window}) on {debounce_key}" + super().__init__(execution, reason=reason) + + +class Debounce(Dependency["Debounce"]): + """Leading-edge debounce: blocks execution if one was recently started. + + Sets a Redis key on entry with a TTL equal to the window. If the key + already exists, the task is blocked via ``AdmissionBlocked``. + + Works both as a default parameter and as ``Annotated`` metadata:: + + # Per-task: don't start if one started in the last 30s + async def process_webhooks( + debounce: Debounce = Debounce(timedelta(seconds=30)), + ) -> None: ... + + # Per-parameter: don't start for this customer if one started in the last 30s + async def process_customer( + customer_id: Annotated[int, Debounce(timedelta(seconds=30))], + ) -> None: ... + """ + + single: bool = True + + def __init__(self, window: timedelta, *, scope: str | None = None) -> None: + self.window = window + self.scope = scope + self._argument_name: str | None = None + self._argument_value: Any = None + + def bind_to_parameter(self, name: str, value: Any) -> Debounce: + bound = Debounce(self.window, scope=self.scope) + bound._argument_name = name + bound._argument_value = value + return bound + + async def __aenter__(self) -> Debounce: + execution = current_execution.get() + docket = current_docket.get() + + scope = self.scope or docket.name + if self._argument_name is not None: + debounce_key = ( + f"{scope}:debounce:{self._argument_name}:{self._argument_value}" + ) + else: + debounce_key = f"{scope}:debounce:{execution.function_name}" + + window_ms = int(self.window.total_seconds() * 1000) + + async with docket.redis() as redis: + acquired = await redis.set(debounce_key, 1, nx=True, px=window_ms) + + if not acquired: + raise DebounceBlocked(execution, debounce_key, self.window) + + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + pass diff --git a/src/docket/dependencies/_resolution.py b/src/docket/dependencies/_resolution.py index c810c19..b7085f6 100644 --- a/src/docket/dependencies/_resolution.py +++ b/src/docket/dependencies/_resolution.py @@ -32,7 +32,7 @@ def get_single_dependency_parameter_of_type( for _, dependencies in get_annotation_dependencies(function).items(): for dependency in dependencies: if isinstance(dependency, dependency_type): - return dependency # type: ignore[return-value] + return dependency return None diff --git a/src/docket/worker.py b/src/docket/worker.py index 17ea7bc..3648ceb 100644 --- a/src/docket/worker.py +++ b/src/docket/worker.py @@ -528,15 +528,24 @@ async def process_completed_tasks() -> None: await task await ack_message(redis, message_id) except AdmissionBlocked as e: - logger.debug( - "🔒 Task %s blocked by admission control, rescheduling", - e.execution.key, - extra=log_context, - ) - e.execution.when = ( - datetime.now(timezone.utc) + ADMISSION_BLOCKED_RETRY_DELAY - ) - await e.execution.schedule(reschedule_message=message_id) + if e.reschedule: + logger.debug( + "⏳ Task %s blocked by admission control, rescheduling", + e.execution.key, + extra=log_context, + ) + e.execution.when = ( + datetime.now(timezone.utc) + ADMISSION_BLOCKED_RETRY_DELAY + ) + await e.execution.schedule(reschedule_message=message_id) + else: + logger.debug( + "⏭ Task %s blocked by admission control, dropping", + e.execution.key, + extra=log_context, + ) + await e.execution.mark_as_cancelled() + await ack_message(redis, message_id) async def ack_message(redis: Redis, message_id: RedisMessageID) -> None: logger.debug("Acknowledging message", extra=log_context) diff --git a/tests/test_cooldown.py b/tests/test_cooldown.py new file mode 100644 index 0000000..a7c9cca --- /dev/null +++ b/tests/test_cooldown.py @@ -0,0 +1,150 @@ +"""Tests for Cooldown dependency.""" + +from __future__ import annotations + +import asyncio +from datetime import timedelta +from typing import Annotated + +import pytest + +from docket import ConcurrencyLimit, Docket, Worker +from docket.dependencies import Cooldown + + +async def test_task_level_cooldown_blocks_after_success(docket: Docket, worker: Worker): + """Task-level cooldown blocks re-execution after a successful run.""" + results: list[str] = [] + + async def cooled_task( + cooldown: Cooldown = Cooldown(timedelta(seconds=5)), + ): + results.append("executed") + + await docket.add(cooled_task)() + await worker.run_until_finished() + assert results == ["executed"] + + await docket.add(cooled_task)() + await worker.run_until_finished() + + assert results == ["executed"] + + +async def test_task_level_cooldown_does_not_block_after_failure( + docket: Docket, worker: Worker +): + """Task-level cooldown does NOT block after failure (key not set).""" + attempts: list[int] = [] + call_count = 0 + + async def cooled_task( + cooldown: Cooldown = Cooldown(timedelta(seconds=5)), + ): + nonlocal call_count + call_count += 1 + attempts.append(call_count) + if call_count == 1: + raise RuntimeError("boom") + + await docket.add(cooled_task)() + await worker.run_until_finished() + assert attempts == [1] + + await docket.add(cooled_task)() + await worker.run_until_finished() + assert attempts == [1, 2] + + +async def test_task_level_cooldown_allows_after_window(docket: Docket, worker: Worker): + """Task-level cooldown allows execution after the window expires.""" + results: list[str] = [] + + async def cooled_task( + cooldown: Cooldown = Cooldown(timedelta(milliseconds=50)), + ): + results.append("executed") + + await docket.add(cooled_task)() + await worker.run_until_finished() + assert results == ["executed"] + + await asyncio.sleep(0.06) + + await docket.add(cooled_task)() + await worker.run_until_finished() + assert results == ["executed", "executed"] + + +async def test_per_parameter_cooldown_blocks_same_value(docket: Docket, worker: Worker): + """Per-parameter cooldown blocks same value, allows different values.""" + results: list[int] = [] + + async def cooled_task( + customer_id: Annotated[int, Cooldown(timedelta(seconds=5))], + ): + results.append(customer_id) + + await docket.add(cooled_task)(customer_id=1) + await worker.run_until_finished() + assert results == [1] + + await docket.add(cooled_task)(customer_id=1) + await docket.add(cooled_task)(customer_id=2) + worker.concurrency = 10 + await worker.run_until_finished() + + assert sorted(results) == [1, 2] + assert results.count(1) == 1 + + +async def test_cooldown_single_rejects_two(docket: Docket): + """single=True rejects two Cooldown on the same task.""" + with pytest.raises(ValueError, match="Only one Cooldown"): + + async def task( + a: Annotated[int, Cooldown(timedelta(seconds=1))], + b: Annotated[str, Cooldown(timedelta(seconds=2))], + ): ... # pragma: no cover + + await docket.add(task)(a=1, b="x") + + +async def test_cooldown_coexists_with_concurrency_limit(docket: Docket, worker: Worker): + """Cooldown + ConcurrencyLimit can coexist on the same task.""" + results: list[str] = [] + + async def task( + customer_id: Annotated[int, ConcurrencyLimit(1)], + cooldown: Cooldown = Cooldown(timedelta(milliseconds=50)), + ): + results.append(f"executed_{customer_id}") + + await docket.add(task)(customer_id=1) + await worker.run_until_finished() + assert results == ["executed_1"] + + +async def test_cooldown_key_cleaned_up_after_ttl(docket: Docket, worker: Worker): + """Redis key is cleaned up after TTL expires.""" + + async def cooled_task( + cooldown: Cooldown = Cooldown(timedelta(milliseconds=50)), + ): + pass + + await docket.add(cooled_task)() + await worker.run_until_finished() + + # Wait for TTL to expire + await asyncio.sleep(0.1) + + async with docket.redis() as redis: + # Scan for any cooldown keys — should all be expired + cooldown_keys: list[str] = [ + key + async for key in redis.scan_iter( # type: ignore[union-attr] + match=f"{docket.name}:cooldown:*" + ) + ] + assert cooldown_keys == [] diff --git a/tests/test_debounce.py b/tests/test_debounce.py new file mode 100644 index 0000000..beb920a --- /dev/null +++ b/tests/test_debounce.py @@ -0,0 +1,123 @@ +"""Tests for Debounce dependency.""" + +from __future__ import annotations + +import asyncio +from datetime import timedelta +from typing import Annotated + +import pytest + +from docket import ConcurrencyLimit, Docket, Worker +from docket.dependencies import Debounce + + +async def test_task_level_debounce_blocks_rapid_reexecution( + docket: Docket, worker: Worker +): + """Task-level debounce swallows duplicate execution within the window.""" + results: list[str] = [] + + async def debounced_task( + debounce: Debounce = Debounce(timedelta(seconds=5)), + ): + results.append("executed") + + await docket.add(debounced_task)() + await docket.add(debounced_task)() + + await worker.run_until_finished() + + assert results == ["executed"] + + +async def test_task_level_debounce_allows_after_window(docket: Docket, worker: Worker): + """Task-level debounce allows execution after the window expires.""" + results: list[str] = [] + + async def debounced_task( + debounce: Debounce = Debounce(timedelta(milliseconds=50)), + ): + results.append("executed") + + await docket.add(debounced_task)() + await worker.run_until_finished() + assert results == ["executed"] + + await asyncio.sleep(0.06) + + await docket.add(debounced_task)() + await worker.run_until_finished() + assert results == ["executed", "executed"] + + +async def test_per_parameter_debounce_blocks_same_value(docket: Docket, worker: Worker): + """Per-parameter debounce blocks same value, allows different values.""" + results: list[int] = [] + + async def debounced_task( + customer_id: Annotated[int, Debounce(timedelta(seconds=5))], + ): + results.append(customer_id) + + await docket.add(debounced_task)(customer_id=1) + await docket.add(debounced_task)(customer_id=1) + await docket.add(debounced_task)(customer_id=2) + + worker.concurrency = 10 + await worker.run_until_finished() + + assert sorted(results) == [1, 2] + assert results.count(1) == 1 + + +async def test_debounce_single_rejects_two(docket: Docket): + """single=True rejects two Debounce on the same task.""" + with pytest.raises(ValueError, match="Only one Debounce"): + + async def task( + a: Annotated[int, Debounce(timedelta(seconds=1))], + b: Annotated[str, Debounce(timedelta(seconds=2))], + ): ... # pragma: no cover + + await docket.add(task)(a=1, b="x") + + +async def test_debounce_coexists_with_concurrency_limit(docket: Docket, worker: Worker): + """Debounce + ConcurrencyLimit can coexist on the same task.""" + results: list[str] = [] + + async def task( + customer_id: Annotated[int, ConcurrencyLimit(1)], + debounce: Debounce = Debounce(timedelta(milliseconds=50)), + ): + results.append(f"executed_{customer_id}") + + await docket.add(task)(customer_id=1) + await worker.run_until_finished() + assert results == ["executed_1"] + + +async def test_debounce_key_cleaned_up_after_ttl(docket: Docket, worker: Worker): + """Redis key is cleaned up after TTL expires.""" + + async def debounced_task( + debounce: Debounce = Debounce(timedelta(milliseconds=50)), + ): + pass + + await docket.add(debounced_task)() + await worker.run_until_finished() + + # Wait for TTL to expire + await asyncio.sleep(0.1) + + async with docket.redis() as redis: + # Scan for any debounce keys — should all be expired + debounce_keys: list[str] = [ + key + async for key in redis.scan_iter( # type: ignore[union-attr] + match=f"{docket.name}:debounce:*" + ) + ] + assert debounce_keys == [] From cd38e0e24836fb0491cca3c284497da7d1cbc4ad Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 27 Feb 2026 15:29:02 -0500 Subject: [PATCH 2/7] Document Debounce, Cooldown, and update ConcurrencyLimit to Annotated style Adds docs for the new Debounce and Cooldown admission controls, and rewrites the ConcurrencyLimit section to use the Annotated style as the primary approach (with a backward-compat note for the old string-name style). Co-Authored-By: Claude Opus 4.6 --- docs/getting-started.md | 2 +- docs/task-behaviors.md | 224 +++++++++++++++++++++++++--------------- 2 files changed, 144 insertions(+), 82 deletions(-) diff --git a/docs/getting-started.md b/docs/getting-started.md index 4fa0afd..54c74c8 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -244,7 +244,7 @@ You now know the core concepts: creating dockets, scheduling work with idempoten Ready for more? Check out: -- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, and concurrency control +- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, concurrency control, debounce, and cooldown - **[Dependency Injection](dependency-injection.md)** - Access current docket, custom dependencies, shared resources - **[Testing with Docket](testing.md)** - Ergonomic testing utilities for unit and integration tests - **[Task Design Patterns](task-patterns.md)** - Find & flood, task scattering, logging, and task chains diff --git a/docs/task-behaviors.md b/docs/task-behaviors.md index b01128f..90203b2 100644 --- a/docs/task-behaviors.md +++ b/docs/task-behaviors.md @@ -305,18 +305,18 @@ For more details on progress monitoring patterns and real-time observation, see ## Concurrency Control -Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections. +Docket provides fine-grained concurrency control that limits how many tasks can run at the same time, based on specific argument values. This is useful for protecting shared resources, preventing overwhelming external services, and managing database connections. -### Basic Concurrency Limits +### Per-Argument Concurrency -Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments: +Use `Annotated` to limit concurrency based on a specific argument value. Each distinct value gets its own independent limit: ```python +from typing import Annotated from docket import ConcurrencyLimit async def process_customer_data( - customer_id: int, - concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1) + customer_id: Annotated[int, ConcurrencyLimit(1)], ) -> None: # Only one task per customer_id can run at a time await update_customer_profile(customer_id) @@ -325,11 +325,22 @@ async def process_customer_data( # These will run sequentially for the same customer await docket.add(process_customer_data)(customer_id=1001) await docket.add(process_customer_data)(customer_id=1001) -await docket.add(process_customer_data)(customer_id=1001) # But different customers can run concurrently await docket.add(process_customer_data)(customer_id=2001) # Runs in parallel -await docket.add(process_customer_data)(customer_id=3001) # Runs in parallel +``` + +### Per-Task Concurrency + +Use a default parameter to limit the total number of concurrent executions of a task, regardless of arguments: + +```python +async def expensive_computation( + input_data: str, + concurrency: ConcurrencyLimit = ConcurrencyLimit(max_concurrent=3), +) -> None: + # At most 3 of these tasks can run at once across all arguments + await run_computation(input_data) ``` ### Database Connection Pooling @@ -338,9 +349,8 @@ Limit concurrent database operations to prevent overwhelming your database: ```python async def backup_database_table( - db_name: str, + db_name: Annotated[str, ConcurrencyLimit(2)], table_name: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2) ) -> None: # Maximum 2 backup operations per database at once await create_table_backup(db_name, table_name) @@ -360,8 +370,7 @@ Protect external APIs from being overwhelmed: ```python async def sync_user_with_external_service( user_id: int, - service_name: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5) + service_name: Annotated[str, ConcurrencyLimit(5)], ) -> None: # Limit to 5 concurrent API calls per external service api_client = get_api_client(service_name) @@ -374,84 +383,22 @@ await docket.add(sync_user_with_external_service)(456, "salesforce") # Will que await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel ``` -### File Processing Limits - -Control concurrent file operations to manage disk I/O: - -```python -async def process_media_file( - file_path: str, - operation_type: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3) -) -> None: - # Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes) - if operation_type == "video_transcode": - await transcode_video(file_path) - elif operation_type == "image_resize": - await resize_image(file_path) - elif operation_type == "audio_compress": - await compress_audio(file_path) - -# Different operation types can run concurrently, but each type is limited -await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode") -await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode") -await docket.add(process_media_file)("/images/photo1.jpg", "image_resize") # Runs in parallel -``` - ### Custom Scopes Use custom scopes to create independent concurrency limits: ```python async def process_tenant_data( - tenant_id: str, + tenant_id: Annotated[str, ConcurrencyLimit(2, scope="tenant_operations")], operation: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit( - "tenant_id", - max_concurrent=2, - scope="tenant_operations" - ) ) -> None: # Each tenant can have up to 2 concurrent operations await perform_tenant_operation(tenant_id, operation) - -async def process_global_data( - data_type: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit( - "data_type", - max_concurrent=1, - scope="global_operations" # Separate from tenant operations - ) -) -> None: - # Global operations have their own concurrency limits - await process_global_data_type(data_type) ``` -### Multi-Level Concurrency - -Combine multiple concurrency controls for complex scenarios: - -```python -async def process_user_export( - user_id: int, - export_type: str, - region: str, - user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1), - type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3), - region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10) -) -> None: - # This task respects ALL concurrency limits: - # - Only 1 export per user at a time - # - Only 3 exports of each type globally - # - Only 10 exports per region - await generate_user_export(user_id, export_type, region) -``` - -**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start. - ### Monitoring Concurrency -Concurrency limits are enforced using Redis sets, so you can monitor them: +Concurrency limits are enforced using Redis sorted sets, so you can monitor them: ```python async def monitor_concurrency_usage() -> None: @@ -467,14 +414,129 @@ async def monitor_concurrency_usage() -> None: print(f"{key}: {count} active tasks") ``` -### Tips +!!! note "Legacy default-parameter style" + Prior to 0.18, `ConcurrencyLimit` required passing the argument name as a + string: `ConcurrencyLimit("customer_id", max_concurrent=1)`. This style + still works but `Annotated` is preferred — it avoids the string-name + duplication and is consistent with Debounce, Cooldown, and other + dependencies. + +## Debounce + +Debounce is a leading-edge admission control: if a task was recently started, duplicate submissions within the window are silently dropped. This is useful for deduplicating rapid-fire events like webhooks, where the same event may arrive multiple times in quick succession. + +### Per-Task Debounce + +Apply debounce to the whole task so only one execution can start within the window: + +```python +from datetime import timedelta +from docket import Debounce + +async def process_webhooks( + debounce: Debounce = Debounce(timedelta(seconds=30)), +) -> None: + events = await fetch_pending_webhook_events() + await process_events(events) + +# First call starts immediately and sets a 30-second window +await docket.add(process_webhooks)() + +# This one arrives 5 seconds later — silently dropped +await docket.add(process_webhooks)() +``` + +### Per-Parameter Debounce + +Use `Annotated` to debounce based on a specific argument value. Different values get independent windows: + +```python +from typing import Annotated + +async def sync_customer( + customer_id: Annotated[int, Debounce(timedelta(seconds=30))], +) -> None: + await refresh_customer_data(customer_id) + +# First sync for customer 1001 starts immediately +await docket.add(sync_customer)(customer_id=1001) + +# Duplicate for 1001 within 30s — dropped +await docket.add(sync_customer)(customer_id=1001) + +# Different customer — runs immediately +await docket.add(sync_customer)(customer_id=2002) +``` -1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint). +### How It Works -2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints. +On entry, debounce atomically sets a Redis key with a TTL equal to the window (`SET key 1 NX PX window_ms`). If the key already exists, the task is dropped without rescheduling. The key expires naturally after the window, so no cleanup is needed. -3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts. +## Cooldown + +Cooldown is a trailing-edge admission control: if a task recently _succeeded_, new submissions are dropped. Unlike debounce, the cooldown window only starts after a successful execution — failed tasks don't trigger it, so they can be retried immediately. + +### Per-Task Cooldown + +```python +from datetime import timedelta +from docket import Cooldown + +async def send_daily_digest( + cooldown: Cooldown = Cooldown(timedelta(minutes=30)), +) -> None: + digest = await build_digest() + await send_email(digest) + +# Runs and succeeds — starts a 30-minute cooldown +await docket.add(send_daily_digest)() + +# Within the cooldown window — silently dropped +await docket.add(send_daily_digest)() +``` -4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays. +If `send_daily_digest` raises an exception, no cooldown key is set and the task can be submitted again immediately. + +### Per-Parameter Cooldown + +```python +from typing import Annotated + +async def send_notification( + customer_id: Annotated[int, Cooldown(timedelta(minutes=5))], +) -> None: + await deliver_notification(customer_id) + +# Notification for customer 1001 — sends and starts cooldown +await docket.add(send_notification)(customer_id=1001) + +# Another for 1001 within 5 minutes — dropped +await docket.add(send_notification)(customer_id=1001) + +# Different customer — sends immediately +await docket.add(send_notification)(customer_id=2002) +``` + +### Debounce vs. Cooldown + +| | Debounce | Cooldown | +|---|---|---| +| **When does the window start?** | When the task _starts_ | When the task _succeeds_ | +| **Failed tasks** | Still block the window | Don't trigger cooldown | +| **Good for** | Deduplicating incoming events | Rate-limiting outgoing side effects | + +### Combining with Other Controls + +Debounce, cooldown, and concurrency limits can all coexist on the same task: + +```python +from typing import Annotated + +async def process_order( + order_id: Annotated[int, ConcurrencyLimit(1)], + cooldown: Cooldown = Cooldown(timedelta(seconds=60)), +) -> None: + await finalize_order(order_id) +``` -5. **Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively. +Each admission control is checked independently. A task must satisfy all of them to start. From 7ebc656eaf2961b1f5ca1e3073fff95c8ac18a72 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 27 Feb 2026 15:34:37 -0500 Subject: [PATCH 3/7] Fix docs wording: lead with the dependency name, not Annotated Co-Authored-By: Claude Opus 4.6 --- docs/task-behaviors.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/task-behaviors.md b/docs/task-behaviors.md index 90203b2..a2c0438 100644 --- a/docs/task-behaviors.md +++ b/docs/task-behaviors.md @@ -309,7 +309,7 @@ Docket provides fine-grained concurrency control that limits how many tasks can ### Per-Argument Concurrency -Use `Annotated` to limit concurrency based on a specific argument value. Each distinct value gets its own independent limit: +Annotate a parameter with `ConcurrencyLimit` to limit concurrency based on its value. Each distinct value gets its own independent limit: ```python from typing import Annotated @@ -448,7 +448,7 @@ await docket.add(process_webhooks)() ### Per-Parameter Debounce -Use `Annotated` to debounce based on a specific argument value. Different values get independent windows: +Annotate a parameter with `Debounce` to debounce based on its value. Different values get independent windows: ```python from typing import Annotated From 4138d299145e294f9548cb9673093fe4a27e73ed Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 27 Feb 2026 16:10:26 -0500 Subject: [PATCH 4/7] Rework Debounce/Cooldown semantics and add true debounce After reviewing what we shipped in 1b08852, we realized the names were backwards: what we called "Debounce" (execute first, drop duplicates) is really a cooldown, and we were missing true debounce (wait for things to settle, then fire once). - **Cooldown** (formerly Debounce): same SET NX PX logic, just renamed with `cooldown:` key prefix - **Debounce** (new): Lua script with winner + last_seen keys. First submission becomes the winner and bounces via reschedule until the settle window passes without new activity, then proceeds. Non-winners are immediately dropped. - Removed the old success-anchored Cooldown (check on entry, set on exit) - Added `retry_delay` to `AdmissionBlocked` so Debounce can tell the worker exactly how long to wait rather than using the fixed default Closes #322, closes #161. Co-Authored-By: Claude Opus 4.6 --- docs/task-behaviors.md | 85 +++++++-------- src/docket/dependencies/_base.py | 3 + src/docket/dependencies/_cooldown.py | 56 +++++----- src/docket/dependencies/_debounce.py | 150 ++++++++++++++++++++++----- src/docket/worker.py | 5 +- tests/test_cooldown.py | 39 ++----- tests/test_debounce.py | 46 ++++---- 7 files changed, 225 insertions(+), 159 deletions(-) diff --git a/docs/task-behaviors.md b/docs/task-behaviors.md index a2c0438..55c3ad6 100644 --- a/docs/task-behaviors.md +++ b/docs/task-behaviors.md @@ -421,20 +421,18 @@ async def monitor_concurrency_usage() -> None: duplication and is consistent with Debounce, Cooldown, and other dependencies. -## Debounce +## Cooldown -Debounce is a leading-edge admission control: if a task was recently started, duplicate submissions within the window are silently dropped. This is useful for deduplicating rapid-fire events like webhooks, where the same event may arrive multiple times in quick succession. +Cooldown executes the first submission immediately, then drops duplicates within a window. On entry it atomically sets a Redis key with a TTL (`SET key 1 NX PX window_ms`). If the key already exists the task is silently dropped. The key expires naturally after the window. -### Per-Task Debounce - -Apply debounce to the whole task so only one execution can start within the window: +### Per-Task Cooldown ```python from datetime import timedelta -from docket import Debounce +from docket import Cooldown async def process_webhooks( - debounce: Debounce = Debounce(timedelta(seconds=30)), + cooldown: Cooldown = Cooldown(timedelta(seconds=30)), ) -> None: events = await fetch_pending_webhook_events() await process_events(events) @@ -446,15 +444,15 @@ await docket.add(process_webhooks)() await docket.add(process_webhooks)() ``` -### Per-Parameter Debounce +### Per-Parameter Cooldown -Annotate a parameter with `Debounce` to debounce based on its value. Different values get independent windows: +Annotate a parameter with `Cooldown` to apply independent windows per value: ```python from typing import Annotated async def sync_customer( - customer_id: Annotated[int, Debounce(timedelta(seconds=30))], + customer_id: Annotated[int, Cooldown(timedelta(seconds=30))], ) -> None: await refresh_customer_data(customer_id) @@ -468,62 +466,65 @@ await docket.add(sync_customer)(customer_id=1001) await docket.add(sync_customer)(customer_id=2002) ``` -### How It Works - -On entry, debounce atomically sets a Redis key with a TTL equal to the window (`SET key 1 NX PX window_ms`). If the key already exists, the task is dropped without rescheduling. The key expires naturally after the window, so no cleanup is needed. - -## Cooldown +## Debounce -Cooldown is a trailing-edge admission control: if a task recently _succeeded_, new submissions are dropped. Unlike debounce, the cooldown window only starts after a successful execution — failed tasks don't trigger it, so they can be retried immediately. +Debounce waits for submissions to settle before firing. When rapid-fire events arrive, only one task runs — after a quiet period equal to the settle window. This is the classic "trailing-edge" debounce: keep resetting the timer on each new event, then fire once things calm down. -### Per-Task Cooldown +### Per-Task Debounce ```python from datetime import timedelta -from docket import Cooldown +from docket import Debounce -async def send_daily_digest( - cooldown: Cooldown = Cooldown(timedelta(minutes=30)), +async def process_webhooks( + debounce: Debounce = Debounce(timedelta(seconds=5)), ) -> None: - digest = await build_digest() - await send_email(digest) + events = await fetch_pending_webhook_events() + await process_events(events) -# Runs and succeeds — starts a 30-minute cooldown -await docket.add(send_daily_digest)() +# First submission becomes the "winner" and gets rescheduled +await docket.add(process_webhooks)() -# Within the cooldown window — silently dropped -await docket.add(send_daily_digest)() +# More events arrive — they reset the settle timer but are dropped +await docket.add(process_webhooks)() +await docket.add(process_webhooks)() + +# After 5 seconds of quiet, the winner proceeds ``` -If `send_daily_digest` raises an exception, no cooldown key is set and the task can be submitted again immediately. +### Per-Parameter Debounce -### Per-Parameter Cooldown +Annotate a parameter with `Debounce` to get independent settle windows per value: ```python from typing import Annotated -async def send_notification( - customer_id: Annotated[int, Cooldown(timedelta(minutes=5))], +async def sync_customer( + customer_id: Annotated[int, Debounce(timedelta(seconds=5))], ) -> None: - await deliver_notification(customer_id) + await refresh_customer_data(customer_id) + +# Each customer_id gets its own independent settle window +await docket.add(sync_customer)(customer_id=1001) +await docket.add(sync_customer)(customer_id=1001) # resets 1001's timer +await docket.add(sync_customer)(customer_id=2002) # independent window +``` -# Notification for customer 1001 — sends and starts cooldown -await docket.add(send_notification)(customer_id=1001) +### How It Works -# Another for 1001 within 5 minutes — dropped -await docket.add(send_notification)(customer_id=1001) +Debounce uses two Redis keys per scope — a **winner** key (which task gets to proceed) and a **last_seen** timestamp — managed by an atomic Lua script: -# Different customer — sends immediately -await docket.add(send_notification)(customer_id=2002) -``` +1. **No winner exists** — the task becomes the winner and gets rescheduled for the full settle window. +2. **Winner returns from reschedule** — if enough time has passed since the last submission, it proceeds. Otherwise it reschedules for the remaining time. +3. **Non-winner arrives** — updates the last_seen timestamp (resetting the settle timer) and is immediately dropped. ### Debounce vs. Cooldown -| | Debounce | Cooldown | +| | Cooldown | Debounce | |---|---|---| -| **When does the window start?** | When the task _starts_ | When the task _succeeds_ | -| **Failed tasks** | Still block the window | Don't trigger cooldown | -| **Good for** | Deduplicating incoming events | Rate-limiting outgoing side effects | +| **Behavior** | Execute first, drop duplicates | Wait for quiet, then execute | +| **Window anchored to** | First execution | Last submission | +| **Good for** | Deduplicating rapid-fire events | Batching bursts into one action | ### Combining with Other Controls diff --git a/src/docket/dependencies/_base.py b/src/docket/dependencies/_base.py index 43469c9..841ed15 100644 --- a/src/docket/dependencies/_base.py +++ b/src/docket/dependencies/_base.py @@ -60,9 +60,12 @@ class AdmissionBlocked(Exception): with a short delay. When False, the task is silently acknowledged and dropped (appropriate for debounce/cooldown where re-trying would just hit the same window). + + ``retry_delay`` overrides the default reschedule delay when set. """ reschedule: bool = True + retry_delay: timedelta | None = None def __init__(self, execution: Execution, reason: str = "admission control"): self.execution = execution diff --git a/src/docket/dependencies/_cooldown.py b/src/docket/dependencies/_cooldown.py index ff044ed..dde4fb0 100644 --- a/src/docket/dependencies/_cooldown.py +++ b/src/docket/dependencies/_cooldown.py @@ -1,4 +1,8 @@ -"""Cooldown (trailing-edge) admission control dependency.""" +"""Cooldown admission control dependency. + +Executes the first task immediately, then drops duplicates within the window. +Sets a Redis key on entry with TTL. +""" from __future__ import annotations @@ -25,22 +29,21 @@ def __init__(self, execution: Execution, cooldown_key: str, window: timedelta): class Cooldown(Dependency["Cooldown"]): - """Trailing-edge cooldown: blocks execution if one recently succeeded. + """Execute first, drop duplicates within window. - Checks for a Redis key on entry. If present, the task is blocked. - The key is only set on *successful* exit, so failed tasks don't - trigger the cooldown — they can be retried immediately. + Sets a Redis key on entry with a TTL equal to the window. If the key + already exists, the task is blocked and silently dropped. Works both as a default parameter and as ``Annotated`` metadata:: - # Per-task: don't start if one succeeded in the last 60s - async def send_digest( - cooldown: Cooldown = Cooldown(timedelta(seconds=60)), + # Per-task: don't start if one started in the last 30s + async def process_webhooks( + cooldown: Cooldown = Cooldown(timedelta(seconds=30)), ) -> None: ... - # Per-parameter: don't start for this customer if one succeeded in the last 60s - async def send_notification( - customer_id: Annotated[int, Cooldown(timedelta(seconds=60))], + # Per-parameter: don't start for this customer if one started in the last 30s + async def process_customer( + customer_id: Annotated[int, Cooldown(timedelta(seconds=30))], ) -> None: ... """ @@ -58,23 +61,25 @@ def bind_to_parameter(self, name: str, value: Any) -> Cooldown: bound._argument_value = value return bound - def _cooldown_key(self, function_name: str) -> str: - scope = self.scope or current_docket.get().name - if self._argument_name is not None: - return f"{scope}:cooldown:{self._argument_name}:{self._argument_value}" - return f"{scope}:cooldown:{function_name}" - async def __aenter__(self) -> Cooldown: execution = current_execution.get() docket = current_docket.get() - self._key = self._cooldown_key(execution.function_name) + scope = self.scope or docket.name + if self._argument_name is not None: + cooldown_key = ( + f"{scope}:cooldown:{self._argument_name}:{self._argument_value}" + ) + else: + cooldown_key = f"{scope}:cooldown:{execution.function_name}" + + window_ms = int(self.window.total_seconds() * 1000) async with docket.redis() as redis: - exists = await redis.exists(self._key) + acquired = await redis.set(cooldown_key, 1, nx=True, px=window_ms) - if exists: - raise CooldownBlocked(execution, self._key, self.window) + if not acquired: + raise CooldownBlocked(execution, cooldown_key, self.window) return self @@ -84,11 +89,4 @@ async def __aexit__( exc_value: BaseException | None, traceback: TracebackType | None, ) -> None: - if exc_type is not None: - return - - docket = current_docket.get() - window_ms = int(self.window.total_seconds() * 1000) - - async with docket.redis() as redis: - await redis.set(self._key, 1, px=window_ms) + pass diff --git a/src/docket/dependencies/_debounce.py b/src/docket/dependencies/_debounce.py index 5235e2f..99ad6ed 100644 --- a/src/docket/dependencies/_debounce.py +++ b/src/docket/dependencies/_debounce.py @@ -1,7 +1,13 @@ -"""Debounce (leading-edge) admission control dependency.""" +"""Debounce (trailing-edge / settle) admission control dependency. + +Waits for submissions to settle, then fires once. Uses two Redis keys +(winner + last_seen) so only one task bounces while the rest immediately +drop. +""" from __future__ import annotations +import time from datetime import timedelta from types import TracebackType from typing import TYPE_CHECKING, Any @@ -11,48 +17,115 @@ if TYPE_CHECKING: # pragma: no cover from ..execution import Execution +# Lua script for atomic debounce logic. +# +# KEYS[1] = winner key (holds the execution key of the chosen task) +# KEYS[2] = last_seen key (holds ms timestamp of most recent submission) +# ARGV[1] = my execution key +# ARGV[2] = settle window in milliseconds +# ARGV[3] = current time in milliseconds +# ARGV[4] = key TTL in milliseconds (settle * 10) +# +# Returns: {action, remaining_ms} +# action: 1=PROCEED, 2=RESCHEDULE, 3=DROP +# remaining_ms: ms until settle window expires (only for RESCHEDULE) +_DEBOUNCE_LUA = """ +local winner_key = KEYS[1] +local seen_key = KEYS[2] +local my_key = ARGV[1] +local settle_ms = tonumber(ARGV[2]) +local now_ms = tonumber(ARGV[3]) +local ttl_ms = tonumber(ARGV[4]) + +local winner = redis.call('GET', winner_key) + +if not winner then + -- No winner: I become winner, record last_seen = now + redis.call('SET', winner_key, my_key, 'PX', ttl_ms) + redis.call('SET', seen_key, tostring(now_ms), 'PX', ttl_ms) + return {2, settle_ms} +end + +if winner == my_key then + -- I'm the winner, returning from reschedule + local last_seen_str = redis.call('GET', seen_key) + local last_seen = tonumber(last_seen_str) or 0 + local elapsed = now_ms - last_seen + + if elapsed >= settle_ms then + -- Settled: clean up and proceed + redis.call('DEL', winner_key, seen_key) + return {1, 0} + else + -- Not settled yet: refresh TTLs and reschedule for remaining time + local remaining = settle_ms - elapsed + redis.call('PEXPIRE', winner_key, ttl_ms) + redis.call('PEXPIRE', seen_key, ttl_ms) + return {2, remaining} + end +end + +-- Someone else is the winner: update last_seen and refresh TTLs +redis.call('SET', seen_key, tostring(now_ms), 'PX', ttl_ms) +redis.call('PEXPIRE', winner_key, ttl_ms) +return {3, 0} +""" + +_ACTION_PROCEED = 1 +_ACTION_RESCHEDULE = 2 +_ACTION_DROP = 3 + class DebounceBlocked(AdmissionBlocked): """Raised when a task is blocked by debounce.""" - reschedule = False - - def __init__(self, execution: Execution, debounce_key: str, window: timedelta): + def __init__( + self, + execution: Execution, + debounce_key: str, + settle: timedelta, + *, + reschedule: bool, + retry_delay: timedelta | None = None, + ): self.debounce_key = debounce_key - self.window = window - reason = f"debounce ({window}) on {debounce_key}" + self.settle = settle + self.reschedule = reschedule # type: ignore[assignment] + self.retry_delay = retry_delay # type: ignore[assignment] + reason = f"debounce ({settle}) on {debounce_key}" super().__init__(execution, reason=reason) class Debounce(Dependency["Debounce"]): - """Leading-edge debounce: blocks execution if one was recently started. + """Wait for submissions to settle, then fire once. - Sets a Redis key on entry with a TTL equal to the window. If the key - already exists, the task is blocked via ``AdmissionBlocked``. + Uses two Redis keys per scope — a "winner" key (which execution gets + to proceed) and a "last_seen" timestamp. Only the winner bounces + via reschedule; all other submissions are immediately dropped. Works both as a default parameter and as ``Annotated`` metadata:: - # Per-task: don't start if one started in the last 30s + # Per-task: wait for 5s of quiet, then execute once async def process_webhooks( - debounce: Debounce = Debounce(timedelta(seconds=30)), + debounce: Debounce = Debounce(timedelta(seconds=5)), ) -> None: ... - # Per-parameter: don't start for this customer if one started in the last 30s - async def process_customer( - customer_id: Annotated[int, Debounce(timedelta(seconds=30))], + # Per-parameter: independent settle window per customer + async def sync_customer( + customer_id: Annotated[int, Debounce(timedelta(seconds=5))], ) -> None: ... """ single: bool = True - def __init__(self, window: timedelta, *, scope: str | None = None) -> None: - self.window = window + def __init__(self, settle: timedelta, *, scope: str | None = None) -> None: + self.settle = settle self.scope = scope self._argument_name: str | None = None self._argument_value: Any = None def bind_to_parameter(self, name: str, value: Any) -> Debounce: - bound = Debounce(self.window, scope=self.scope) + bound = Debounce(self.settle, scope=self.scope) bound._argument_name = name bound._argument_value = value return bound @@ -63,21 +136,46 @@ async def __aenter__(self) -> Debounce: scope = self.scope or docket.name if self._argument_name is not None: - debounce_key = ( - f"{scope}:debounce:{self._argument_name}:{self._argument_value}" - ) + base_key = f"{scope}:debounce:{self._argument_name}:{self._argument_value}" else: - debounce_key = f"{scope}:debounce:{execution.function_name}" + base_key = f"{scope}:debounce:{execution.function_name}" - window_ms = int(self.window.total_seconds() * 1000) + winner_key = f"{base_key}:winner" + seen_key = f"{base_key}:last_seen" + + settle_ms = int(self.settle.total_seconds() * 1000) + now_ms = int(time.time() * 1000) + ttl_ms = settle_ms * 10 async with docket.redis() as redis: - acquired = await redis.set(debounce_key, 1, nx=True, px=window_ms) + script = redis.register_script(_DEBOUNCE_LUA) + result: list[int] = await script( + keys=[winner_key, seen_key], + args=[execution.key, settle_ms, now_ms, ttl_ms], + ) + + action = result[0] + remaining_ms = result[1] - if not acquired: - raise DebounceBlocked(execution, debounce_key, self.window) + if action == _ACTION_PROCEED: + return self + + if action == _ACTION_RESCHEDULE: + raise DebounceBlocked( + execution, + base_key, + self.settle, + reschedule=True, + retry_delay=timedelta(milliseconds=remaining_ms), + ) - return self + # DROP + raise DebounceBlocked( + execution, + base_key, + self.settle, + reschedule=False, + ) async def __aexit__( self, diff --git a/src/docket/worker.py b/src/docket/worker.py index 3648ceb..6866425 100644 --- a/src/docket/worker.py +++ b/src/docket/worker.py @@ -529,14 +529,13 @@ async def process_completed_tasks() -> None: await ack_message(redis, message_id) except AdmissionBlocked as e: if e.reschedule: + delay = e.retry_delay or ADMISSION_BLOCKED_RETRY_DELAY logger.debug( "⏳ Task %s blocked by admission control, rescheduling", e.execution.key, extra=log_context, ) - e.execution.when = ( - datetime.now(timezone.utc) + ADMISSION_BLOCKED_RETRY_DELAY - ) + e.execution.when = datetime.now(timezone.utc) + delay await e.execution.schedule(reschedule_message=message_id) else: logger.debug( diff --git a/tests/test_cooldown.py b/tests/test_cooldown.py index a7c9cca..40ef219 100644 --- a/tests/test_cooldown.py +++ b/tests/test_cooldown.py @@ -12,8 +12,10 @@ from docket.dependencies import Cooldown -async def test_task_level_cooldown_blocks_after_success(docket: Docket, worker: Worker): - """Task-level cooldown blocks re-execution after a successful run.""" +async def test_task_level_cooldown_blocks_rapid_reexecution( + docket: Docket, worker: Worker +): + """Task-level cooldown drops duplicate execution within the window.""" results: list[str] = [] async def cooled_task( @@ -22,38 +24,11 @@ async def cooled_task( results.append("executed") await docket.add(cooled_task)() - await worker.run_until_finished() - assert results == ["executed"] - await docket.add(cooled_task)() - await worker.run_until_finished() - - assert results == ["executed"] - - -async def test_task_level_cooldown_does_not_block_after_failure( - docket: Docket, worker: Worker -): - """Task-level cooldown does NOT block after failure (key not set).""" - attempts: list[int] = [] - call_count = 0 - async def cooled_task( - cooldown: Cooldown = Cooldown(timedelta(seconds=5)), - ): - nonlocal call_count - call_count += 1 - attempts.append(call_count) - if call_count == 1: - raise RuntimeError("boom") - - await docket.add(cooled_task)() await worker.run_until_finished() - assert attempts == [1] - await docket.add(cooled_task)() - await worker.run_until_finished() - assert attempts == [1, 2] + assert results == ["executed"] async def test_task_level_cooldown_allows_after_window(docket: Docket, worker: Worker): @@ -86,11 +61,9 @@ async def cooled_task( results.append(customer_id) await docket.add(cooled_task)(customer_id=1) - await worker.run_until_finished() - assert results == [1] - await docket.add(cooled_task)(customer_id=1) await docket.add(cooled_task)(customer_id=2) + worker.concurrency = 10 await worker.run_until_finished() diff --git a/tests/test_debounce.py b/tests/test_debounce.py index beb920a..12c8149 100644 --- a/tests/test_debounce.py +++ b/tests/test_debounce.py @@ -1,4 +1,4 @@ -"""Tests for Debounce dependency.""" +"""Tests for Debounce dependency (true debounce: settle then fire).""" from __future__ import annotations @@ -12,59 +12,54 @@ from docket.dependencies import Debounce -async def test_task_level_debounce_blocks_rapid_reexecution( - docket: Docket, worker: Worker -): - """Task-level debounce swallows duplicate execution within the window.""" +async def test_single_submission_fires_after_settle(docket: Docket, worker: Worker): + """A single submission fires after the settle window, not immediately.""" results: list[str] = [] async def debounced_task( - debounce: Debounce = Debounce(timedelta(seconds=5)), + debounce: Debounce = Debounce(timedelta(milliseconds=50)), ): results.append("executed") await docket.add(debounced_task)() - await docket.add(debounced_task)() + # The winner gets rescheduled; run_until_finished processes it await worker.run_until_finished() - assert results == ["executed"] -async def test_task_level_debounce_allows_after_window(docket: Docket, worker: Worker): - """Task-level debounce allows execution after the window expires.""" +async def test_rapid_submissions_only_one_execution(docket: Docket, worker: Worker): + """Rapid submissions result in only one execution after settling.""" results: list[str] = [] async def debounced_task( - debounce: Debounce = Debounce(timedelta(milliseconds=50)), + debounce: Debounce = Debounce(timedelta(milliseconds=100)), ): results.append("executed") await docket.add(debounced_task)() - await worker.run_until_finished() - assert results == ["executed"] - - await asyncio.sleep(0.06) - await docket.add(debounced_task)() + await docket.add(debounced_task)() + await worker.run_until_finished() - assert results == ["executed", "executed"] + assert results == ["executed"] -async def test_per_parameter_debounce_blocks_same_value(docket: Docket, worker: Worker): - """Per-parameter debounce blocks same value, allows different values.""" +async def test_per_parameter_debounce_independent_windows( + docket: Docket, worker: Worker +): + """Per-parameter debounce has independent settle windows per value.""" results: list[int] = [] async def debounced_task( - customer_id: Annotated[int, Debounce(timedelta(seconds=5))], + customer_id: Annotated[int, Debounce(timedelta(milliseconds=50))], ): results.append(customer_id) - await docket.add(debounced_task)(customer_id=1) await docket.add(debounced_task)(customer_id=1) await docket.add(debounced_task)(customer_id=2) + await docket.add(debounced_task)(customer_id=1) - worker.concurrency = 10 await worker.run_until_finished() assert sorted(results) == [1, 2] @@ -98,8 +93,8 @@ async def task( assert results == ["executed_1"] -async def test_debounce_key_cleaned_up_after_ttl(docket: Docket, worker: Worker): - """Redis key is cleaned up after TTL expires.""" +async def test_debounce_keys_cleaned_up_after_execution(docket: Docket, worker: Worker): + """Winner and last_seen keys are cleaned up after the task proceeds.""" async def debounced_task( debounce: Debounce = Debounce(timedelta(milliseconds=50)), @@ -109,11 +104,10 @@ async def debounced_task( await docket.add(debounced_task)() await worker.run_until_finished() - # Wait for TTL to expire + # Wait for any residual TTLs await asyncio.sleep(0.1) async with docket.redis() as redis: - # Scan for any debounce keys — should all be expired debounce_keys: list[str] = [ key async for key in redis.scan_iter( # type: ignore[union-attr] From 3d3312fa0cfee03facb27235583841f06f1490e5 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 27 Feb 2026 16:28:54 -0500 Subject: [PATCH 5/7] Simplify admission control exceptions and relax Cooldown single restriction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop CooldownBlocked and DebounceBlocked in favor of raising AdmissionBlocked directly with reschedule/retry_delay as constructor arguments. Less ceremony, same behavior. Cooldown no longer enforces single=True — multiple per-parameter cooldowns on the same task are independent and work fine. Debounce keeps single=True because its reschedule mechanism means multiple debounces would loop forever. Also trims implementation details (Redis key patterns, Lua scripts) from the user-facing docs. Co-Authored-By: Claude Opus 4.6 --- docs/task-behaviors.md | 38 +++++++++++++++++++------- src/docket/dependencies/__init__.py | 6 ++--- src/docket/dependencies/_base.py | 14 +++++++--- src/docket/dependencies/_cooldown.py | 25 +++++------------ src/docket/dependencies/_debounce.py | 40 +++++----------------------- tests/test_cooldown.py | 33 ++++++++++++++++------- tests/test_debounce.py | 4 +-- 7 files changed, 78 insertions(+), 82 deletions(-) diff --git a/docs/task-behaviors.md b/docs/task-behaviors.md index 55c3ad6..dece4cb 100644 --- a/docs/task-behaviors.md +++ b/docs/task-behaviors.md @@ -423,7 +423,7 @@ async def monitor_concurrency_usage() -> None: ## Cooldown -Cooldown executes the first submission immediately, then drops duplicates within a window. On entry it atomically sets a Redis key with a TTL (`SET key 1 NX PX window_ms`). If the key already exists the task is silently dropped. The key expires naturally after the window. +Cooldown executes the first submission immediately, then drops duplicates within a window. If another submission arrives before the window expires, it's silently dropped. ### Per-Task Cooldown @@ -510,14 +510,6 @@ await docket.add(sync_customer)(customer_id=1001) # resets 1001's timer await docket.add(sync_customer)(customer_id=2002) # independent window ``` -### How It Works - -Debounce uses two Redis keys per scope — a **winner** key (which task gets to proceed) and a **last_seen** timestamp — managed by an atomic Lua script: - -1. **No winner exists** — the task becomes the winner and gets rescheduled for the full settle window. -2. **Winner returns from reschedule** — if enough time has passed since the last submission, it proceeds. Otherwise it reschedules for the remaining time. -3. **Non-winner arrives** — updates the last_seen timestamp (resetting the settle timer) and is immediately dropped. - ### Debounce vs. Cooldown | | Cooldown | Debounce | @@ -526,6 +518,34 @@ Debounce uses two Redis keys per scope — a **winner** key (which task gets to | **Window anchored to** | First execution | Last submission | | **Good for** | Deduplicating rapid-fire events | Batching bursts into one action | +### Multiple Cooldowns + +You can annotate multiple parameters with `Cooldown` on the same task. Each gets its own independent window scoped to that parameter's value. A task must pass *all* of its cooldown checks to start — if any one blocks, the task is dropped: + +```python +from typing import Annotated + +async def sync_data( + customer_id: Annotated[int, Cooldown(timedelta(seconds=30))], + region: Annotated[str, Cooldown(timedelta(seconds=60))], +) -> None: + await refresh_data(customer_id, region) + +# Runs immediately — both windows are clear +await docket.add(sync_data)(customer_id=1, region="us") + +# Blocked — customer_id=1 is still in cooldown +await docket.add(sync_data)(customer_id=1, region="eu") + +# Blocked — region="us" is still in cooldown +await docket.add(sync_data)(customer_id=2, region="us") + +# Runs — both customer_id=2 and region="eu" are clear +await docket.add(sync_data)(customer_id=2, region="eu") +``` + +Only one `Debounce` is allowed per task — its reschedule mechanism requires a single settle window. + ### Combining with Other Controls Debounce, cooldown, and concurrency limits can all coexist on the same task: diff --git a/src/docket/dependencies/__init__.py b/src/docket/dependencies/__init__.py index a86889d..a18c40a 100644 --- a/src/docket/dependencies/__init__.py +++ b/src/docket/dependencies/__init__.py @@ -19,8 +19,8 @@ format_duration, ) from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit -from ._cooldown import Cooldown, CooldownBlocked -from ._debounce import Debounce, DebounceBlocked +from ._cooldown import Cooldown +from ._debounce import Debounce from ._cron import Cron from ._contextual import ( CurrentDocket, @@ -86,9 +86,7 @@ "ConcurrencyBlocked", "ConcurrencyLimit", "Cooldown", - "CooldownBlocked", "Debounce", - "DebounceBlocked", "Cron", "Perpetual", "Progress", diff --git a/src/docket/dependencies/_base.py b/src/docket/dependencies/_base.py index 841ed15..1dcfbe8 100644 --- a/src/docket/dependencies/_base.py +++ b/src/docket/dependencies/_base.py @@ -64,12 +64,18 @@ class AdmissionBlocked(Exception): ``retry_delay`` overrides the default reschedule delay when set. """ - reschedule: bool = True - retry_delay: timedelta | None = None - - def __init__(self, execution: Execution, reason: str = "admission control"): + def __init__( + self, + execution: Execution, + reason: str = "admission control", + *, + reschedule: bool = True, + retry_delay: timedelta | None = None, + ): self.execution = execution self.reason = reason + self.reschedule = reschedule + self.retry_delay = retry_delay super().__init__(f"Task {execution.key} blocked by {reason}") diff --git a/src/docket/dependencies/_cooldown.py b/src/docket/dependencies/_cooldown.py index dde4fb0..1d7122e 100644 --- a/src/docket/dependencies/_cooldown.py +++ b/src/docket/dependencies/_cooldown.py @@ -8,25 +8,10 @@ from datetime import timedelta from types import TracebackType -from typing import TYPE_CHECKING, Any +from typing import Any from ._base import AdmissionBlocked, Dependency, current_docket, current_execution -if TYPE_CHECKING: # pragma: no cover - from ..execution import Execution - - -class CooldownBlocked(AdmissionBlocked): - """Raised when a task is blocked by cooldown.""" - - reschedule = False - - def __init__(self, execution: Execution, cooldown_key: str, window: timedelta): - self.cooldown_key = cooldown_key - self.window = window - reason = f"cooldown ({window}) on {cooldown_key}" - super().__init__(execution, reason=reason) - class Cooldown(Dependency["Cooldown"]): """Execute first, drop duplicates within window. @@ -47,8 +32,6 @@ async def process_customer( ) -> None: ... """ - single: bool = True - def __init__(self, window: timedelta, *, scope: str | None = None) -> None: self.window = window self.scope = scope @@ -79,7 +62,11 @@ async def __aenter__(self) -> Cooldown: acquired = await redis.set(cooldown_key, 1, nx=True, px=window_ms) if not acquired: - raise CooldownBlocked(execution, cooldown_key, self.window) + raise AdmissionBlocked( + execution, + reason=f"cooldown ({self.window}) on {cooldown_key}", + reschedule=False, + ) return self diff --git a/src/docket/dependencies/_debounce.py b/src/docket/dependencies/_debounce.py index 99ad6ed..7d61290 100644 --- a/src/docket/dependencies/_debounce.py +++ b/src/docket/dependencies/_debounce.py @@ -10,13 +10,10 @@ import time from datetime import timedelta from types import TracebackType -from typing import TYPE_CHECKING, Any +from typing import Any from ._base import AdmissionBlocked, Dependency, current_docket, current_execution -if TYPE_CHECKING: # pragma: no cover - from ..execution import Execution - # Lua script for atomic debounce logic. # # KEYS[1] = winner key (holds the execution key of the chosen task) @@ -76,26 +73,6 @@ _ACTION_DROP = 3 -class DebounceBlocked(AdmissionBlocked): - """Raised when a task is blocked by debounce.""" - - def __init__( - self, - execution: Execution, - debounce_key: str, - settle: timedelta, - *, - reschedule: bool, - retry_delay: timedelta | None = None, - ): - self.debounce_key = debounce_key - self.settle = settle - self.reschedule = reschedule # type: ignore[assignment] - self.retry_delay = retry_delay # type: ignore[assignment] - reason = f"debounce ({settle}) on {debounce_key}" - super().__init__(execution, reason=reason) - - class Debounce(Dependency["Debounce"]): """Wait for submissions to settle, then fire once. @@ -160,22 +137,17 @@ async def __aenter__(self) -> Debounce: if action == _ACTION_PROCEED: return self + reason = f"debounce ({self.settle}) on {base_key}" + if action == _ACTION_RESCHEDULE: - raise DebounceBlocked( + raise AdmissionBlocked( execution, - base_key, - self.settle, - reschedule=True, + reason=reason, retry_delay=timedelta(milliseconds=remaining_ms), ) # DROP - raise DebounceBlocked( - execution, - base_key, - self.settle, - reschedule=False, - ) + raise AdmissionBlocked(execution, reason=reason, reschedule=False) async def __aexit__( self, diff --git a/tests/test_cooldown.py b/tests/test_cooldown.py index 40ef219..1bb4b25 100644 --- a/tests/test_cooldown.py +++ b/tests/test_cooldown.py @@ -6,8 +6,6 @@ from datetime import timedelta from typing import Annotated -import pytest - from docket import ConcurrencyLimit, Docket, Worker from docket.dependencies import Cooldown @@ -71,16 +69,31 @@ async def cooled_task( assert results.count(1) == 1 -async def test_cooldown_single_rejects_two(docket: Docket): - """single=True rejects two Cooldown on the same task.""" - with pytest.raises(ValueError, match="Only one Cooldown"): +async def test_multiple_cooldowns_on_different_parameters( + docket: Docket, worker: Worker +): + """Multiple Cooldown annotations on different parameters are independent.""" + results: list[tuple[int, str]] = [] + + async def task( + customer_id: Annotated[int, Cooldown(timedelta(seconds=5))], + region: Annotated[str, Cooldown(timedelta(seconds=5))], + ): + results.append((customer_id, region)) + + await docket.add(task)(customer_id=1, region="us") + await docket.add(task)( + customer_id=1, region="eu" + ) # same customer, different region + await docket.add(task)( + customer_id=2, region="us" + ) # different customer, same region - async def task( - a: Annotated[int, Cooldown(timedelta(seconds=1))], - b: Annotated[str, Cooldown(timedelta(seconds=2))], - ): ... # pragma: no cover + worker.concurrency = 10 + await worker.run_until_finished() - await docket.add(task)(a=1, b="x") + # First call runs. Second is blocked by customer_id=1. Third is blocked by region="us". + assert results == [(1, "us")] async def test_cooldown_coexists_with_concurrency_limit(docket: Docket, worker: Worker): diff --git a/tests/test_debounce.py b/tests/test_debounce.py index 12c8149..fdfd292 100644 --- a/tests/test_debounce.py +++ b/tests/test_debounce.py @@ -66,8 +66,8 @@ async def debounced_task( assert results.count(1) == 1 -async def test_debounce_single_rejects_two(docket: Docket): - """single=True rejects two Debounce on the same task.""" +async def test_multiple_debounces_rejected(docket: Docket): + """Only one Debounce is allowed per task.""" with pytest.raises(ValueError, match="Only one Debounce"): async def task( From 88eece719baa3f66e4197d6502dcebfcd340a0f8 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 27 Feb 2026 16:32:39 -0500 Subject: [PATCH 6/7] Say "quietly dropped" instead of "silently dropped" for admission control Co-Authored-By: Claude Opus 4.6 --- docs/task-behaviors.md | 4 ++-- src/docket/dependencies/_base.py | 6 +++--- src/docket/dependencies/_cooldown.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/task-behaviors.md b/docs/task-behaviors.md index dece4cb..df472f7 100644 --- a/docs/task-behaviors.md +++ b/docs/task-behaviors.md @@ -423,7 +423,7 @@ async def monitor_concurrency_usage() -> None: ## Cooldown -Cooldown executes the first submission immediately, then drops duplicates within a window. If another submission arrives before the window expires, it's silently dropped. +Cooldown executes the first submission immediately, then drops duplicates within a window. If another submission arrives before the window expires, it's quietly dropped with an INFO-level log. ### Per-Task Cooldown @@ -440,7 +440,7 @@ async def process_webhooks( # First call starts immediately and sets a 30-second window await docket.add(process_webhooks)() -# This one arrives 5 seconds later — silently dropped +# This one arrives 5 seconds later — quietly dropped await docket.add(process_webhooks)() ``` diff --git a/src/docket/dependencies/_base.py b/src/docket/dependencies/_base.py index 1dcfbe8..146645b 100644 --- a/src/docket/dependencies/_base.py +++ b/src/docket/dependencies/_base.py @@ -57,9 +57,9 @@ class AdmissionBlocked(Exception): concurrency limits, rate limits, or health gates. When ``reschedule`` is True (default), the worker re-queues the task - with a short delay. When False, the task is silently acknowledged - and dropped (appropriate for debounce/cooldown where re-trying would - just hit the same window). + with a short delay. When False, the task is quietly acknowledged + and dropped with an INFO-level log (appropriate for debounce/cooldown + where re-trying would just hit the same window). ``retry_delay`` overrides the default reschedule delay when set. """ diff --git a/src/docket/dependencies/_cooldown.py b/src/docket/dependencies/_cooldown.py index 1d7122e..b72d22c 100644 --- a/src/docket/dependencies/_cooldown.py +++ b/src/docket/dependencies/_cooldown.py @@ -17,7 +17,7 @@ class Cooldown(Dependency["Cooldown"]): """Execute first, drop duplicates within window. Sets a Redis key on entry with a TTL equal to the window. If the key - already exists, the task is blocked and silently dropped. + already exists, the task is blocked and quietly dropped. Works both as a default parameter and as ``Annotated`` metadata:: From 15ae145af75c0bbf095a24948d55dd52fc96f819 Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 27 Feb 2026 16:43:19 -0500 Subject: [PATCH 7/7] Fix Redis Cluster CROSSSLOT error in debounce Lua script The debounce Lua script uses two keys (winner + last_seen) that need to land on the same cluster slot. Added a hash tag so Redis routes them together. Also fixed the multiple-cooldowns test to not depend on execution order. Co-Authored-By: Claude Opus 4.6 --- src/docket/dependencies/_debounce.py | 11 +++++++---- tests/test_cooldown.py | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/docket/dependencies/_debounce.py b/src/docket/dependencies/_debounce.py index 7d61290..97ebcb2 100644 --- a/src/docket/dependencies/_debounce.py +++ b/src/docket/dependencies/_debounce.py @@ -113,12 +113,15 @@ async def __aenter__(self) -> Debounce: scope = self.scope or docket.name if self._argument_name is not None: - base_key = f"{scope}:debounce:{self._argument_name}:{self._argument_value}" + hash_tag = f"{self._argument_name}:{self._argument_value}" + base_key = f"{scope}:debounce:{hash_tag}" else: - base_key = f"{scope}:debounce:{execution.function_name}" + hash_tag = execution.function_name + base_key = f"{scope}:debounce:{hash_tag}" - winner_key = f"{base_key}:winner" - seen_key = f"{base_key}:last_seen" + # Use a Redis hash tag {…} so both keys land on the same cluster slot + winner_key = f"{base_key}:{{{hash_tag}}}:winner" + seen_key = f"{base_key}:{{{hash_tag}}}:last_seen" settle_ms = int(self.settle.total_seconds() * 1000) now_ms = int(time.time() * 1000) diff --git a/tests/test_cooldown.py b/tests/test_cooldown.py index 1bb4b25..4d6941c 100644 --- a/tests/test_cooldown.py +++ b/tests/test_cooldown.py @@ -82,6 +82,8 @@ async def task( results.append((customer_id, region)) await docket.add(task)(customer_id=1, region="us") + await worker.run_until_finished() + await docket.add(task)( customer_id=1, region="eu" ) # same customer, different region @@ -89,7 +91,6 @@ async def task( customer_id=2, region="us" ) # different customer, same region - worker.concurrency = 10 await worker.run_until_finished() # First call runs. Second is blocked by customer_id=1. Third is blocked by region="us".