diff --git a/docs/getting-started.md b/docs/getting-started.md index 4fa0afd..54c74c8 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -244,7 +244,7 @@ You now know the core concepts: creating dockets, scheduling work with idempoten Ready for more? Check out: -- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, and concurrency control +- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, concurrency control, debounce, and cooldown - **[Dependency Injection](dependency-injection.md)** - Access current docket, custom dependencies, shared resources - **[Testing with Docket](testing.md)** - Ergonomic testing utilities for unit and integration tests - **[Task Design Patterns](task-patterns.md)** - Find & flood, task scattering, logging, and task chains diff --git a/docs/task-behaviors.md b/docs/task-behaviors.md index b01128f..df472f7 100644 --- a/docs/task-behaviors.md +++ b/docs/task-behaviors.md @@ -305,18 +305,18 @@ For more details on progress monitoring patterns and real-time observation, see ## Concurrency Control -Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections. +Docket provides fine-grained concurrency control that limits how many tasks can run at the same time, based on specific argument values. This is useful for protecting shared resources, preventing overwhelming external services, and managing database connections. -### Basic Concurrency Limits +### Per-Argument Concurrency -Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments: +Annotate a parameter with `ConcurrencyLimit` to limit concurrency based on its value. Each distinct value gets its own independent limit: ```python +from typing import Annotated from docket import ConcurrencyLimit async def process_customer_data( - customer_id: int, - concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1) + customer_id: Annotated[int, ConcurrencyLimit(1)], ) -> None: # Only one task per customer_id can run at a time await update_customer_profile(customer_id) @@ -325,11 +325,22 @@ async def process_customer_data( # These will run sequentially for the same customer await docket.add(process_customer_data)(customer_id=1001) await docket.add(process_customer_data)(customer_id=1001) -await docket.add(process_customer_data)(customer_id=1001) # But different customers can run concurrently await docket.add(process_customer_data)(customer_id=2001) # Runs in parallel -await docket.add(process_customer_data)(customer_id=3001) # Runs in parallel +``` + +### Per-Task Concurrency + +Use a default parameter to limit the total number of concurrent executions of a task, regardless of arguments: + +```python +async def expensive_computation( + input_data: str, + concurrency: ConcurrencyLimit = ConcurrencyLimit(max_concurrent=3), +) -> None: + # At most 3 of these tasks can run at once across all arguments + await run_computation(input_data) ``` ### Database Connection Pooling @@ -338,9 +349,8 @@ Limit concurrent database operations to prevent overwhelming your database: ```python async def backup_database_table( - db_name: str, + db_name: Annotated[str, ConcurrencyLimit(2)], table_name: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2) ) -> None: # Maximum 2 backup operations per database at once await create_table_backup(db_name, table_name) @@ -360,8 +370,7 @@ Protect external APIs from being overwhelmed: ```python async def sync_user_with_external_service( user_id: int, - service_name: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5) + service_name: Annotated[str, ConcurrencyLimit(5)], ) -> None: # Limit to 5 concurrent API calls per external service api_client = get_api_client(service_name) @@ -374,84 +383,22 @@ await docket.add(sync_user_with_external_service)(456, "salesforce") # Will que await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel ``` -### File Processing Limits - -Control concurrent file operations to manage disk I/O: - -```python -async def process_media_file( - file_path: str, - operation_type: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3) -) -> None: - # Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes) - if operation_type == "video_transcode": - await transcode_video(file_path) - elif operation_type == "image_resize": - await resize_image(file_path) - elif operation_type == "audio_compress": - await compress_audio(file_path) - -# Different operation types can run concurrently, but each type is limited -await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode") -await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode") -await docket.add(process_media_file)("/images/photo1.jpg", "image_resize") # Runs in parallel -``` - ### Custom Scopes Use custom scopes to create independent concurrency limits: ```python async def process_tenant_data( - tenant_id: str, + tenant_id: Annotated[str, ConcurrencyLimit(2, scope="tenant_operations")], operation: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit( - "tenant_id", - max_concurrent=2, - scope="tenant_operations" - ) ) -> None: # Each tenant can have up to 2 concurrent operations await perform_tenant_operation(tenant_id, operation) - -async def process_global_data( - data_type: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit( - "data_type", - max_concurrent=1, - scope="global_operations" # Separate from tenant operations - ) -) -> None: - # Global operations have their own concurrency limits - await process_global_data_type(data_type) ``` -### Multi-Level Concurrency - -Combine multiple concurrency controls for complex scenarios: - -```python -async def process_user_export( - user_id: int, - export_type: str, - region: str, - user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1), - type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3), - region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10) -) -> None: - # This task respects ALL concurrency limits: - # - Only 1 export per user at a time - # - Only 3 exports of each type globally - # - Only 10 exports per region - await generate_user_export(user_id, export_type, region) -``` - -**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start. - ### Monitoring Concurrency -Concurrency limits are enforced using Redis sets, so you can monitor them: +Concurrency limits are enforced using Redis sorted sets, so you can monitor them: ```python async def monitor_concurrency_usage() -> None: @@ -467,14 +414,150 @@ async def monitor_concurrency_usage() -> None: print(f"{key}: {count} active tasks") ``` -### Tips +!!! note "Legacy default-parameter style" + Prior to 0.18, `ConcurrencyLimit` required passing the argument name as a + string: `ConcurrencyLimit("customer_id", max_concurrent=1)`. This style + still works but `Annotated` is preferred — it avoids the string-name + duplication and is consistent with Debounce, Cooldown, and other + dependencies. + +## Cooldown + +Cooldown executes the first submission immediately, then drops duplicates within a window. If another submission arrives before the window expires, it's quietly dropped with an INFO-level log. + +### Per-Task Cooldown + +```python +from datetime import timedelta +from docket import Cooldown + +async def process_webhooks( + cooldown: Cooldown = Cooldown(timedelta(seconds=30)), +) -> None: + events = await fetch_pending_webhook_events() + await process_events(events) + +# First call starts immediately and sets a 30-second window +await docket.add(process_webhooks)() + +# This one arrives 5 seconds later — quietly dropped +await docket.add(process_webhooks)() +``` + +### Per-Parameter Cooldown + +Annotate a parameter with `Cooldown` to apply independent windows per value: + +```python +from typing import Annotated + +async def sync_customer( + customer_id: Annotated[int, Cooldown(timedelta(seconds=30))], +) -> None: + await refresh_customer_data(customer_id) + +# First sync for customer 1001 starts immediately +await docket.add(sync_customer)(customer_id=1001) + +# Duplicate for 1001 within 30s — dropped +await docket.add(sync_customer)(customer_id=1001) + +# Different customer — runs immediately +await docket.add(sync_customer)(customer_id=2002) +``` + +## Debounce -1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint). +Debounce waits for submissions to settle before firing. When rapid-fire events arrive, only one task runs — after a quiet period equal to the settle window. This is the classic "trailing-edge" debounce: keep resetting the timer on each new event, then fire once things calm down. + +### Per-Task Debounce + +```python +from datetime import timedelta +from docket import Debounce + +async def process_webhooks( + debounce: Debounce = Debounce(timedelta(seconds=5)), +) -> None: + events = await fetch_pending_webhook_events() + await process_events(events) -2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints. +# First submission becomes the "winner" and gets rescheduled +await docket.add(process_webhooks)() -3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts. +# More events arrive — they reset the settle timer but are dropped +await docket.add(process_webhooks)() +await docket.add(process_webhooks)() + +# After 5 seconds of quiet, the winner proceeds +``` + +### Per-Parameter Debounce + +Annotate a parameter with `Debounce` to get independent settle windows per value: + +```python +from typing import Annotated + +async def sync_customer( + customer_id: Annotated[int, Debounce(timedelta(seconds=5))], +) -> None: + await refresh_customer_data(customer_id) + +# Each customer_id gets its own independent settle window +await docket.add(sync_customer)(customer_id=1001) +await docket.add(sync_customer)(customer_id=1001) # resets 1001's timer +await docket.add(sync_customer)(customer_id=2002) # independent window +``` -4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays. +### Debounce vs. Cooldown + +| | Cooldown | Debounce | +|---|---|---| +| **Behavior** | Execute first, drop duplicates | Wait for quiet, then execute | +| **Window anchored to** | First execution | Last submission | +| **Good for** | Deduplicating rapid-fire events | Batching bursts into one action | + +### Multiple Cooldowns + +You can annotate multiple parameters with `Cooldown` on the same task. Each gets its own independent window scoped to that parameter's value. A task must pass *all* of its cooldown checks to start — if any one blocks, the task is dropped: + +```python +from typing import Annotated + +async def sync_data( + customer_id: Annotated[int, Cooldown(timedelta(seconds=30))], + region: Annotated[str, Cooldown(timedelta(seconds=60))], +) -> None: + await refresh_data(customer_id, region) + +# Runs immediately — both windows are clear +await docket.add(sync_data)(customer_id=1, region="us") + +# Blocked — customer_id=1 is still in cooldown +await docket.add(sync_data)(customer_id=1, region="eu") + +# Blocked — region="us" is still in cooldown +await docket.add(sync_data)(customer_id=2, region="us") + +# Runs — both customer_id=2 and region="eu" are clear +await docket.add(sync_data)(customer_id=2, region="eu") +``` + +Only one `Debounce` is allowed per task — its reschedule mechanism requires a single settle window. + +### Combining with Other Controls + +Debounce, cooldown, and concurrency limits can all coexist on the same task: + +```python +from typing import Annotated + +async def process_order( + order_id: Annotated[int, ConcurrencyLimit(1)], + cooldown: Cooldown = Cooldown(timedelta(seconds=60)), +) -> None: + await finalize_order(order_id) +``` -5. **Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively. +Each admission control is checked independently. A task must satisfy all of them to start. diff --git a/loq.toml b/loq.toml index 5b31587..a55406a 100644 --- a/loq.toml +++ b/loq.toml @@ -10,7 +10,7 @@ max_lines = 750 # Source files that still need exceptions above 750 [[rules]] path = "src/docket/worker.py" -max_lines = 1141 +max_lines = 1150 [[rules]] path = "src/docket/cli/__init__.py" diff --git a/src/docket/__init__.py b/src/docket/__init__.py index f2396ea..9c886ea 100644 --- a/src/docket/__init__.py +++ b/src/docket/__init__.py @@ -12,7 +12,9 @@ from .annotations import Logged from .dependencies import ( ConcurrencyLimit, + Cooldown, Cron, + Debounce, CurrentDocket, CurrentExecution, CurrentWorker, @@ -37,7 +39,9 @@ "__version__", "Agenda", "ConcurrencyLimit", + "Cooldown", "Cron", + "Debounce", "CurrentDocket", "CurrentExecution", "CurrentWorker", diff --git a/src/docket/dependencies/__init__.py b/src/docket/dependencies/__init__.py index 3fcf2ce..a18c40a 100644 --- a/src/docket/dependencies/__init__.py +++ b/src/docket/dependencies/__init__.py @@ -19,6 +19,8 @@ format_duration, ) from ._concurrency import ConcurrencyBlocked, ConcurrencyLimit +from ._cooldown import Cooldown +from ._debounce import Debounce from ._cron import Cron from ._contextual import ( CurrentDocket, @@ -83,6 +85,8 @@ "AdmissionBlocked", "ConcurrencyBlocked", "ConcurrencyLimit", + "Cooldown", + "Debounce", "Cron", "Perpetual", "Progress", diff --git a/src/docket/dependencies/_base.py b/src/docket/dependencies/_base.py index a085af2..146645b 100644 --- a/src/docket/dependencies/_base.py +++ b/src/docket/dependencies/_base.py @@ -55,11 +55,27 @@ class AdmissionBlocked(Exception): This is the base exception for admission control mechanisms like concurrency limits, rate limits, or health gates. + + When ``reschedule`` is True (default), the worker re-queues the task + with a short delay. When False, the task is quietly acknowledged + and dropped with an INFO-level log (appropriate for debounce/cooldown + where re-trying would just hit the same window). + + ``retry_delay`` overrides the default reschedule delay when set. """ - def __init__(self, execution: Execution, reason: str = "admission control"): + def __init__( + self, + execution: Execution, + reason: str = "admission control", + *, + reschedule: bool = True, + retry_delay: timedelta | None = None, + ): self.execution = execution self.reason = reason + self.reschedule = reschedule + self.retry_delay = retry_delay super().__init__(f"Task {execution.key} blocked by {reason}") diff --git a/src/docket/dependencies/_cooldown.py b/src/docket/dependencies/_cooldown.py new file mode 100644 index 0000000..b72d22c --- /dev/null +++ b/src/docket/dependencies/_cooldown.py @@ -0,0 +1,79 @@ +"""Cooldown admission control dependency. + +Executes the first task immediately, then drops duplicates within the window. +Sets a Redis key on entry with TTL. +""" + +from __future__ import annotations + +from datetime import timedelta +from types import TracebackType +from typing import Any + +from ._base import AdmissionBlocked, Dependency, current_docket, current_execution + + +class Cooldown(Dependency["Cooldown"]): + """Execute first, drop duplicates within window. + + Sets a Redis key on entry with a TTL equal to the window. If the key + already exists, the task is blocked and quietly dropped. + + Works both as a default parameter and as ``Annotated`` metadata:: + + # Per-task: don't start if one started in the last 30s + async def process_webhooks( + cooldown: Cooldown = Cooldown(timedelta(seconds=30)), + ) -> None: ... + + # Per-parameter: don't start for this customer if one started in the last 30s + async def process_customer( + customer_id: Annotated[int, Cooldown(timedelta(seconds=30))], + ) -> None: ... + """ + + def __init__(self, window: timedelta, *, scope: str | None = None) -> None: + self.window = window + self.scope = scope + self._argument_name: str | None = None + self._argument_value: Any = None + + def bind_to_parameter(self, name: str, value: Any) -> Cooldown: + bound = Cooldown(self.window, scope=self.scope) + bound._argument_name = name + bound._argument_value = value + return bound + + async def __aenter__(self) -> Cooldown: + execution = current_execution.get() + docket = current_docket.get() + + scope = self.scope or docket.name + if self._argument_name is not None: + cooldown_key = ( + f"{scope}:cooldown:{self._argument_name}:{self._argument_value}" + ) + else: + cooldown_key = f"{scope}:cooldown:{execution.function_name}" + + window_ms = int(self.window.total_seconds() * 1000) + + async with docket.redis() as redis: + acquired = await redis.set(cooldown_key, 1, nx=True, px=window_ms) + + if not acquired: + raise AdmissionBlocked( + execution, + reason=f"cooldown ({self.window}) on {cooldown_key}", + reschedule=False, + ) + + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + pass diff --git a/src/docket/dependencies/_debounce.py b/src/docket/dependencies/_debounce.py new file mode 100644 index 0000000..97ebcb2 --- /dev/null +++ b/src/docket/dependencies/_debounce.py @@ -0,0 +1,161 @@ +"""Debounce (trailing-edge / settle) admission control dependency. + +Waits for submissions to settle, then fires once. Uses two Redis keys +(winner + last_seen) so only one task bounces while the rest immediately +drop. +""" + +from __future__ import annotations + +import time +from datetime import timedelta +from types import TracebackType +from typing import Any + +from ._base import AdmissionBlocked, Dependency, current_docket, current_execution + +# Lua script for atomic debounce logic. +# +# KEYS[1] = winner key (holds the execution key of the chosen task) +# KEYS[2] = last_seen key (holds ms timestamp of most recent submission) +# ARGV[1] = my execution key +# ARGV[2] = settle window in milliseconds +# ARGV[3] = current time in milliseconds +# ARGV[4] = key TTL in milliseconds (settle * 10) +# +# Returns: {action, remaining_ms} +# action: 1=PROCEED, 2=RESCHEDULE, 3=DROP +# remaining_ms: ms until settle window expires (only for RESCHEDULE) +_DEBOUNCE_LUA = """ +local winner_key = KEYS[1] +local seen_key = KEYS[2] +local my_key = ARGV[1] +local settle_ms = tonumber(ARGV[2]) +local now_ms = tonumber(ARGV[3]) +local ttl_ms = tonumber(ARGV[4]) + +local winner = redis.call('GET', winner_key) + +if not winner then + -- No winner: I become winner, record last_seen = now + redis.call('SET', winner_key, my_key, 'PX', ttl_ms) + redis.call('SET', seen_key, tostring(now_ms), 'PX', ttl_ms) + return {2, settle_ms} +end + +if winner == my_key then + -- I'm the winner, returning from reschedule + local last_seen_str = redis.call('GET', seen_key) + local last_seen = tonumber(last_seen_str) or 0 + local elapsed = now_ms - last_seen + + if elapsed >= settle_ms then + -- Settled: clean up and proceed + redis.call('DEL', winner_key, seen_key) + return {1, 0} + else + -- Not settled yet: refresh TTLs and reschedule for remaining time + local remaining = settle_ms - elapsed + redis.call('PEXPIRE', winner_key, ttl_ms) + redis.call('PEXPIRE', seen_key, ttl_ms) + return {2, remaining} + end +end + +-- Someone else is the winner: update last_seen and refresh TTLs +redis.call('SET', seen_key, tostring(now_ms), 'PX', ttl_ms) +redis.call('PEXPIRE', winner_key, ttl_ms) +return {3, 0} +""" + +_ACTION_PROCEED = 1 +_ACTION_RESCHEDULE = 2 +_ACTION_DROP = 3 + + +class Debounce(Dependency["Debounce"]): + """Wait for submissions to settle, then fire once. + + Uses two Redis keys per scope — a "winner" key (which execution gets + to proceed) and a "last_seen" timestamp. Only the winner bounces + via reschedule; all other submissions are immediately dropped. + + Works both as a default parameter and as ``Annotated`` metadata:: + + # Per-task: wait for 5s of quiet, then execute once + async def process_webhooks( + debounce: Debounce = Debounce(timedelta(seconds=5)), + ) -> None: ... + + # Per-parameter: independent settle window per customer + async def sync_customer( + customer_id: Annotated[int, Debounce(timedelta(seconds=5))], + ) -> None: ... + """ + + single: bool = True + + def __init__(self, settle: timedelta, *, scope: str | None = None) -> None: + self.settle = settle + self.scope = scope + self._argument_name: str | None = None + self._argument_value: Any = None + + def bind_to_parameter(self, name: str, value: Any) -> Debounce: + bound = Debounce(self.settle, scope=self.scope) + bound._argument_name = name + bound._argument_value = value + return bound + + async def __aenter__(self) -> Debounce: + execution = current_execution.get() + docket = current_docket.get() + + scope = self.scope or docket.name + if self._argument_name is not None: + hash_tag = f"{self._argument_name}:{self._argument_value}" + base_key = f"{scope}:debounce:{hash_tag}" + else: + hash_tag = execution.function_name + base_key = f"{scope}:debounce:{hash_tag}" + + # Use a Redis hash tag {…} so both keys land on the same cluster slot + winner_key = f"{base_key}:{{{hash_tag}}}:winner" + seen_key = f"{base_key}:{{{hash_tag}}}:last_seen" + + settle_ms = int(self.settle.total_seconds() * 1000) + now_ms = int(time.time() * 1000) + ttl_ms = settle_ms * 10 + + async with docket.redis() as redis: + script = redis.register_script(_DEBOUNCE_LUA) + result: list[int] = await script( + keys=[winner_key, seen_key], + args=[execution.key, settle_ms, now_ms, ttl_ms], + ) + + action = result[0] + remaining_ms = result[1] + + if action == _ACTION_PROCEED: + return self + + reason = f"debounce ({self.settle}) on {base_key}" + + if action == _ACTION_RESCHEDULE: + raise AdmissionBlocked( + execution, + reason=reason, + retry_delay=timedelta(milliseconds=remaining_ms), + ) + + # DROP + raise AdmissionBlocked(execution, reason=reason, reschedule=False) + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + pass diff --git a/src/docket/dependencies/_resolution.py b/src/docket/dependencies/_resolution.py index c810c19..b7085f6 100644 --- a/src/docket/dependencies/_resolution.py +++ b/src/docket/dependencies/_resolution.py @@ -32,7 +32,7 @@ def get_single_dependency_parameter_of_type( for _, dependencies in get_annotation_dependencies(function).items(): for dependency in dependencies: if isinstance(dependency, dependency_type): - return dependency # type: ignore[return-value] + return dependency return None diff --git a/src/docket/worker.py b/src/docket/worker.py index 17ea7bc..6866425 100644 --- a/src/docket/worker.py +++ b/src/docket/worker.py @@ -528,15 +528,23 @@ async def process_completed_tasks() -> None: await task await ack_message(redis, message_id) except AdmissionBlocked as e: - logger.debug( - "🔒 Task %s blocked by admission control, rescheduling", - e.execution.key, - extra=log_context, - ) - e.execution.when = ( - datetime.now(timezone.utc) + ADMISSION_BLOCKED_RETRY_DELAY - ) - await e.execution.schedule(reschedule_message=message_id) + if e.reschedule: + delay = e.retry_delay or ADMISSION_BLOCKED_RETRY_DELAY + logger.debug( + "⏳ Task %s blocked by admission control, rescheduling", + e.execution.key, + extra=log_context, + ) + e.execution.when = datetime.now(timezone.utc) + delay + await e.execution.schedule(reschedule_message=message_id) + else: + logger.debug( + "⏭ Task %s blocked by admission control, dropping", + e.execution.key, + extra=log_context, + ) + await e.execution.mark_as_cancelled() + await ack_message(redis, message_id) async def ack_message(redis: Redis, message_id: RedisMessageID) -> None: logger.debug("Acknowledging message", extra=log_context) diff --git a/tests/test_cooldown.py b/tests/test_cooldown.py new file mode 100644 index 0000000..4d6941c --- /dev/null +++ b/tests/test_cooldown.py @@ -0,0 +1,137 @@ +"""Tests for Cooldown dependency.""" + +from __future__ import annotations + +import asyncio +from datetime import timedelta +from typing import Annotated + +from docket import ConcurrencyLimit, Docket, Worker +from docket.dependencies import Cooldown + + +async def test_task_level_cooldown_blocks_rapid_reexecution( + docket: Docket, worker: Worker +): + """Task-level cooldown drops duplicate execution within the window.""" + results: list[str] = [] + + async def cooled_task( + cooldown: Cooldown = Cooldown(timedelta(seconds=5)), + ): + results.append("executed") + + await docket.add(cooled_task)() + await docket.add(cooled_task)() + + await worker.run_until_finished() + + assert results == ["executed"] + + +async def test_task_level_cooldown_allows_after_window(docket: Docket, worker: Worker): + """Task-level cooldown allows execution after the window expires.""" + results: list[str] = [] + + async def cooled_task( + cooldown: Cooldown = Cooldown(timedelta(milliseconds=50)), + ): + results.append("executed") + + await docket.add(cooled_task)() + await worker.run_until_finished() + assert results == ["executed"] + + await asyncio.sleep(0.06) + + await docket.add(cooled_task)() + await worker.run_until_finished() + assert results == ["executed", "executed"] + + +async def test_per_parameter_cooldown_blocks_same_value(docket: Docket, worker: Worker): + """Per-parameter cooldown blocks same value, allows different values.""" + results: list[int] = [] + + async def cooled_task( + customer_id: Annotated[int, Cooldown(timedelta(seconds=5))], + ): + results.append(customer_id) + + await docket.add(cooled_task)(customer_id=1) + await docket.add(cooled_task)(customer_id=1) + await docket.add(cooled_task)(customer_id=2) + + worker.concurrency = 10 + await worker.run_until_finished() + + assert sorted(results) == [1, 2] + assert results.count(1) == 1 + + +async def test_multiple_cooldowns_on_different_parameters( + docket: Docket, worker: Worker +): + """Multiple Cooldown annotations on different parameters are independent.""" + results: list[tuple[int, str]] = [] + + async def task( + customer_id: Annotated[int, Cooldown(timedelta(seconds=5))], + region: Annotated[str, Cooldown(timedelta(seconds=5))], + ): + results.append((customer_id, region)) + + await docket.add(task)(customer_id=1, region="us") + await worker.run_until_finished() + + await docket.add(task)( + customer_id=1, region="eu" + ) # same customer, different region + await docket.add(task)( + customer_id=2, region="us" + ) # different customer, same region + + await worker.run_until_finished() + + # First call runs. Second is blocked by customer_id=1. Third is blocked by region="us". + assert results == [(1, "us")] + + +async def test_cooldown_coexists_with_concurrency_limit(docket: Docket, worker: Worker): + """Cooldown + ConcurrencyLimit can coexist on the same task.""" + results: list[str] = [] + + async def task( + customer_id: Annotated[int, ConcurrencyLimit(1)], + cooldown: Cooldown = Cooldown(timedelta(milliseconds=50)), + ): + results.append(f"executed_{customer_id}") + + await docket.add(task)(customer_id=1) + await worker.run_until_finished() + assert results == ["executed_1"] + + +async def test_cooldown_key_cleaned_up_after_ttl(docket: Docket, worker: Worker): + """Redis key is cleaned up after TTL expires.""" + + async def cooled_task( + cooldown: Cooldown = Cooldown(timedelta(milliseconds=50)), + ): + pass + + await docket.add(cooled_task)() + await worker.run_until_finished() + + # Wait for TTL to expire + await asyncio.sleep(0.1) + + async with docket.redis() as redis: + # Scan for any cooldown keys — should all be expired + cooldown_keys: list[str] = [ + key + async for key in redis.scan_iter( # type: ignore[union-attr] + match=f"{docket.name}:cooldown:*" + ) + ] + assert cooldown_keys == [] diff --git a/tests/test_debounce.py b/tests/test_debounce.py new file mode 100644 index 0000000..fdfd292 --- /dev/null +++ b/tests/test_debounce.py @@ -0,0 +1,117 @@ +"""Tests for Debounce dependency (true debounce: settle then fire).""" + +from __future__ import annotations + +import asyncio +from datetime import timedelta +from typing import Annotated + +import pytest + +from docket import ConcurrencyLimit, Docket, Worker +from docket.dependencies import Debounce + + +async def test_single_submission_fires_after_settle(docket: Docket, worker: Worker): + """A single submission fires after the settle window, not immediately.""" + results: list[str] = [] + + async def debounced_task( + debounce: Debounce = Debounce(timedelta(milliseconds=50)), + ): + results.append("executed") + + await docket.add(debounced_task)() + + # The winner gets rescheduled; run_until_finished processes it + await worker.run_until_finished() + assert results == ["executed"] + + +async def test_rapid_submissions_only_one_execution(docket: Docket, worker: Worker): + """Rapid submissions result in only one execution after settling.""" + results: list[str] = [] + + async def debounced_task( + debounce: Debounce = Debounce(timedelta(milliseconds=100)), + ): + results.append("executed") + + await docket.add(debounced_task)() + await docket.add(debounced_task)() + await docket.add(debounced_task)() + + await worker.run_until_finished() + assert results == ["executed"] + + +async def test_per_parameter_debounce_independent_windows( + docket: Docket, worker: Worker +): + """Per-parameter debounce has independent settle windows per value.""" + results: list[int] = [] + + async def debounced_task( + customer_id: Annotated[int, Debounce(timedelta(milliseconds=50))], + ): + results.append(customer_id) + + await docket.add(debounced_task)(customer_id=1) + await docket.add(debounced_task)(customer_id=2) + await docket.add(debounced_task)(customer_id=1) + + await worker.run_until_finished() + + assert sorted(results) == [1, 2] + assert results.count(1) == 1 + + +async def test_multiple_debounces_rejected(docket: Docket): + """Only one Debounce is allowed per task.""" + with pytest.raises(ValueError, match="Only one Debounce"): + + async def task( + a: Annotated[int, Debounce(timedelta(seconds=1))], + b: Annotated[str, Debounce(timedelta(seconds=2))], + ): ... # pragma: no cover + + await docket.add(task)(a=1, b="x") + + +async def test_debounce_coexists_with_concurrency_limit(docket: Docket, worker: Worker): + """Debounce + ConcurrencyLimit can coexist on the same task.""" + results: list[str] = [] + + async def task( + customer_id: Annotated[int, ConcurrencyLimit(1)], + debounce: Debounce = Debounce(timedelta(milliseconds=50)), + ): + results.append(f"executed_{customer_id}") + + await docket.add(task)(customer_id=1) + await worker.run_until_finished() + assert results == ["executed_1"] + + +async def test_debounce_keys_cleaned_up_after_execution(docket: Docket, worker: Worker): + """Winner and last_seen keys are cleaned up after the task proceeds.""" + + async def debounced_task( + debounce: Debounce = Debounce(timedelta(milliseconds=50)), + ): + pass + + await docket.add(debounced_task)() + await worker.run_until_finished() + + # Wait for any residual TTLs + await asyncio.sleep(0.1) + + async with docket.redis() as redis: + debounce_keys: list[str] = [ + key + async for key in redis.scan_iter( # type: ignore[union-attr] + match=f"{docket.name}:debounce:*" + ) + ] + assert debounce_keys == []