From a4af226684076792572de455643f1302f45518ba Mon Sep 17 00:00:00 2001 From: Chris Guidry Date: Fri, 20 Feb 2026 17:45:50 -0500 Subject: [PATCH] Polish docs: reorganize pages and clean up AI filler The docs had grown into a couple of monolith pages (advanced-patterns.md at 1100 lines, dependencies.md at 540 lines) and production.md was padded with generic ops advice (capacity planning formulas, blue-green deployment instructions, Grafana dashboards) that doesn't belong in project docs. Reorganization: - Split advanced-patterns.md into task-behaviors.md, task-patterns.md, and observability.md - Split dependencies.md into dependency-injection.md and moved behavioral deps (retries, timeouts, progress, concurrency) into task-behaviors.md - Updated nav, mkdocs.yml, and cross-references accordingly production.md cleanup: - Cut Redis Streams Architecture internals (creates implied contracts) - Cut generic sections: capacity planning, deployment strategies, dead letter handling, scaling considerations, monitoring checklist, Grafana dashboard, tuning recipes - Moved CLI strike examples into the Striking section - Added Signal Handling section for k8s/container orchestrator users - Removed Error Handling section (already covered in task-behaviors and getting-started) Consistency pass across all docs: - Removed closing filler paragraphs from task-behaviors.md and testing.md - Fixed "comprehensive" wording in getting-started.md and testing.md - Updated stale production.md description in getting-started.md nav Co-Authored-By: Claude Opus 4.6 --- .github/workflows/docs.yml | 2 +- .readthedocs.yaml | 2 +- README.md | 2 +- docs/advanced-patterns.md | 1106 ---------------------------------- docs/dependencies.md | 539 ----------------- docs/dependency-injection.md | 366 +++++++++++ docs/getting-started.md | 16 +- docs/index.md | 15 +- docs/observability.md | 349 +++++++++++ docs/production.md | 250 ++------ docs/task-behaviors.md | 480 +++++++++++++++ docs/task-patterns.md | 245 
++++++++ docs/testing.md | 4 +- mkdocs.yml | 20 +- pyproject.toml | 3 - uv.lock | 12 +- 16 files changed, 1544 insertions(+), 1867 deletions(-) delete mode 100644 docs/advanced-patterns.md delete mode 100644 docs/dependencies.md create mode 100644 docs/dependency-injection.md create mode 100644 docs/observability.md create mode 100644 docs/task-behaviors.md create mode 100644 docs/task-patterns.md diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index db07a248..ba805a3c 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -32,7 +32,7 @@ jobs: cache-dependency-glob: "pyproject.toml" - name: Install dependencies - run: uv sync --group docs + run: uv sync - name: Build documentation run: uv run zensical build diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 13260224..d2cf5090 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -5,7 +5,7 @@ build: python: "3.12" commands: - pip install uv - - uv sync --group docs + - uv sync - uv run zensical build - mkdir -p $READTHEDOCS_OUTPUT/html - cp -r site/* $READTHEDOCS_OUTPUT/html/ diff --git a/README.md b/README.md index 7d259b24..7711cbec 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,7 @@ time. To work on the documentation locally: ```bash -uv sync --group docs +uv sync uv run zensical serve ``` diff --git a/docs/advanced-patterns.md b/docs/advanced-patterns.md deleted file mode 100644 index 9a2f0ffd..00000000 --- a/docs/advanced-patterns.md +++ /dev/null @@ -1,1106 +0,0 @@ -# Advanced Task Patterns - -Docket is made for building complex distributed systems, and the patterns below highlight some of the original use cases for Docket. - -## Perpetual Tasks - -Perpetual tasks automatically reschedule themselves, making them well-suited for recurring work like health checks, data synchronization, or periodic cleanup operations. 
- -### Basic Perpetual Tasks - -```python -from docket import Perpetual - -async def health_check_service( - service_url: str, - perpetual: Perpetual = Perpetual(every=timedelta(minutes=5)) -) -> None: - try: - response = await http_client.get(f"{service_url}/health") - response.raise_for_status() - print(f"✓ {service_url} is healthy") - except Exception as e: - print(f"✗ {service_url} failed health check: {e}") - await send_alert(f"Service {service_url} is down") - -# Schedule the task once, it will run every 5 minutes forever -await docket.add(health_check_service)("https://api.example.com") -``` - -After each execution, the task automatically schedules itself to run again after the specified interval. - -### Automatic Startup - -Perpetual tasks can start themselves automatically when a worker sees them, without needing to be explicitly scheduled: - -```python -async def background_cleanup( - perpetual: Perpetual = Perpetual( - every=timedelta(hours=1), - automatic=True - ) -) -> None: - deleted_count = await cleanup_old_records() - print(f"Cleaned up {deleted_count} old records") - -# Just register the task - no need to schedule it -docket.register(background_cleanup) - -# When a worker starts, it will automatically begin running this task -# The task key will be the function name: "background_cleanup" -``` - -### Self-Canceling Tasks - -Perpetual tasks can stop themselves when their work is done: - -```python -async def monitor_deployment( - deployment_id: str, - perpetual: Perpetual = Perpetual(every=timedelta(seconds=30)) -) -> None: - status = await check_deployment_status(deployment_id) - - if status in ["completed", "failed"]: - await notify_deployment_finished(deployment_id, status) - perpetual.cancel() # Stop monitoring this deployment - return - - print(f"Deployment {deployment_id} status: {status}") -``` - -### Dynamic Parameters - -Perpetual tasks can change their arguments or timing for the next execution: - -```python -async def 
adaptive_rate_limiter( - api_endpoint: str, - requests_per_minute: int = 60, - perpetual: Perpetual = Perpetual(every=timedelta(minutes=1)) -) -> None: - # Check current API load - current_load = await check_api_load(api_endpoint) - - if current_load > 0.8: # High load - new_rate = max(30, requests_per_minute - 10) - perpetual.every = timedelta(seconds=30) # Check more frequently - print(f"High load detected, reducing rate to {new_rate} req/min") - else: # Normal load - new_rate = min(120, requests_per_minute + 5) - perpetual.every = timedelta(minutes=1) # Normal check interval - print(f"Normal load, increasing rate to {new_rate} req/min") - - # Schedule next run with updated parameters - perpetual.perpetuate(api_endpoint, new_rate) -``` - -### Error Resilience - -Perpetual tasks automatically reschedule themselves regardless of success or failure: - -```python -async def resilient_sync( - source_url: str, - perpetual: Perpetual = Perpetual(every=timedelta(minutes=15)) -) -> None: - # This will ALWAYS reschedule, whether it succeeds or fails - await sync_data_from_source(source_url) - print(f"Successfully synced data from {source_url}") -``` - -You don't need try/except blocks to ensure rescheduling - Docket handles this automatically. Whether the task completes successfully or raises an exception, the next execution will be scheduled according to the `every` interval. 
- -### Find & Flood Pattern - -A common perpetual task pattern is "find & flood" - a single perpetual task that periodically discovers work to do, then creates many smaller tasks to handle the actual work: - -```python -from docket import CurrentDocket, Perpetual - -async def find_pending_orders( - docket: Docket = CurrentDocket(), - perpetual: Perpetual = Perpetual(every=timedelta(minutes=1)) -) -> None: - # Find all orders that need processing - pending_orders = await database.fetch_pending_orders() - - # Flood the queue with individual processing tasks - for order in pending_orders: - await docket.add(process_single_order)(order.id) - - print(f"Queued {len(pending_orders)} orders for processing") - -async def process_single_order(order_id: int) -> None: - # Handle one specific order - await process_order_payment(order_id) - await update_inventory(order_id) - await send_confirmation_email(order_id) -``` - -This pattern separates discovery (finding work) from execution (doing work), allowing for better load distribution and fault isolation. The perpetual task stays lightweight and fast, while the actual work is distributed across many workers. - -## Task Scattering with Agenda - -For "find-and-flood" workloads, you often want to distribute a batch of tasks over time rather than scheduling them all immediately. The `Agenda` class collects related tasks and scatters them evenly across a time window. 
- -### Basic Scattering - -```python -from datetime import timedelta -from docket import Agenda, Docket - -async def process_item(item_id: int) -> None: - await perform_expensive_operation(item_id) - await update_database(item_id) - -async with Docket() as docket: - # Build an agenda of tasks - agenda = Agenda() - for item_id in range(1, 101): # 100 items to process - agenda.add(process_item)(item_id) - - # Scatter them evenly over 50 minutes to avoid overwhelming the system - executions = await agenda.scatter(docket, over=timedelta(minutes=50)) - print(f"Scheduled {len(executions)} tasks over 50 minutes") -``` - -Tasks are distributed evenly across the time window. For 100 tasks over 50 minutes, they'll be scheduled approximately 30 seconds apart. - -### Jitter for Thundering Herd Prevention - -Add random jitter to prevent multiple processes from scheduling identical work at exactly the same times: - -```python -# Scatter with ±30 second jitter around each scheduled time -await agenda.scatter( - docket, - over=timedelta(minutes=50), - jitter=timedelta(seconds=30) -) -``` - -### Future Scatter Windows - -Schedule the entire batch to start at a specific time in the future: - -```python -from datetime import datetime, timezone - -# Start scattering in 2 hours, spread over 30 minutes -start_time = datetime.now(timezone.utc) + timedelta(hours=2) -await agenda.scatter( - docket, - start=start_time, - over=timedelta(minutes=30) -) -``` - -### Mixed Task Types - -Agendas can contain different types of tasks: - -```python -async def send_email(user_id: str, template: str) -> None: - await email_service.send(user_id, template) - -async def update_analytics(event_data: dict[str, str]) -> None: - await analytics_service.track(event_data) - -# Create a mixed agenda -agenda = Agenda() -agenda.add(process_item)(item_id=1001) -agenda.add(send_email)("user123", "welcome") -agenda.add(update_analytics)({"event": "signup", "user": "user123"}) -agenda.add(process_item)(item_id=1002) 
- -# All tasks will be scattered in the order they were added -await agenda.scatter(docket, over=timedelta(minutes=10)) -``` - -### Single Task Positioning - -When scattering a single task, it's positioned at the midpoint of the time window: - -```python -agenda = Agenda() -agenda.add(process_item)(item_id=42) - -# This task will be scheduled 5 minutes from now (middle of 10-minute window) -await agenda.scatter(docket, over=timedelta(minutes=10)) -``` - -### Agenda Reusability - -Agendas can be reused for multiple scatter operations: - -```python -# Create a reusable template -daily_cleanup_agenda = Agenda() -daily_cleanup_agenda.add(cleanup_temp_files)() -daily_cleanup_agenda.add(compress_old_logs)() -daily_cleanup_agenda.add(update_metrics)() - -# Use it multiple times with different timing -await daily_cleanup_agenda.scatter(docket, over=timedelta(hours=1)) - -# Later, scatter the same tasks over a different window -tomorrow = datetime.now(timezone.utc) + timedelta(days=1) -await daily_cleanup_agenda.scatter( - docket, - start=tomorrow, - over=timedelta(minutes=30) -) -``` - -### Failure Behavior - -Keep in mind that, if an error occurs during scheduling, some tasks may have already been scheduled successfully: - -```python -agenda = Agenda() -agenda.add(valid_task)("arg1") -agenda.add(valid_task)("arg2") -agenda.add("nonexistent_task")("arg3") # This will cause an error -agenda.add(valid_task)("arg4") - -try: - await agenda.scatter(docket, over=timedelta(minutes=10)) -except KeyError: - # The first two tasks were scheduled successfully - # The error prevented the fourth task from being scheduled - pass -``` - -## Striking and Restoring Tasks - -Striking allows you to temporarily disable tasks without redeploying code. This is invaluable for incident response, gradual rollouts, or handling problematic customers. 
- -### Striking Entire Task Types - -Disable all instances of a specific task: - -```python -# Disable all order processing during maintenance -await docket.strike(process_order) - -# Orders added during this time won't be processed -await docket.add(process_order)(order_id=12345) # Won't run -await docket.add(process_order)(order_id=67890) # Won't run - -# Re-enable when ready -await docket.restore(process_order) -``` - -### Striking by Parameter Values - -Disable tasks based on their arguments using comparison operators: - -```python -# Block all tasks for a problematic customer -await docket.strike(None, "customer_id", "==", "12345") - -# Block low-priority work during high load -await docket.strike(process_order, "priority", "<=", "low") - -# Block all orders above a certain value during fraud investigation -await docket.strike(process_payment, "amount", ">", 10000) - -# Later, restore them -await docket.restore(None, "customer_id", "==", "12345") -await docket.restore(process_order, "priority", "<=", "low") -``` - -Supported operators include `==`, `!=`, `<`, `<=`, `>`, `>=`. 
- -### Striking Specific Task-Parameter Combinations - -Target very specific scenarios: - -```python -# Block only high-value orders for a specific customer -await docket.strike(process_order, "customer_id", "==", "12345") -await docket.strike(process_order, "amount", ">", 1000) - -# This order won't run (blocked customer) -await docket.add(process_order)(customer_id="12345", amount=500) - -# This order won't run (blocked customer AND high amount) -await docket.add(process_order)(customer_id="12345", amount=2000) - -# This order WILL run (different customer) -await docket.add(process_order)(customer_id="67890", amount=2000) -``` - -Striking is useful for incident response when you need to quickly disable failing tasks, customer management to block problematic accounts, gradual rollouts where you disable features for certain parameters, load management during high traffic, and debugging to isolate specific scenarios. - -## Advanced Logging and Debugging - -### Argument Logging - -Control which task arguments appear in logs using the `Logged` annotation: - -```python -from typing import Annotated -from docket import Logged - -async def process_payment( - customer_id: Annotated[str, Logged], # Will be logged - credit_card: str, # Won't be logged - amount: Annotated[float, Logged()] = 0.0, # Will be logged - trace_id: Annotated[str, Logged] = "unknown" # Will be logged -) -> None: - # Process the payment... - pass - -# Log output will show: -# process_payment('12345', credit_card=..., amount=150.0, trace_id='abc-123') -``` - -### Collection Length Logging - -For large collections, log just their size instead of contents: - -```python -async def bulk_update_users( - user_ids: Annotated[list[str], Logged(length_only=True)], - metadata: Annotated[dict[str, str], Logged(length_only=True)], - options: Annotated[set[str], Logged(length_only=True)] -) -> None: - # Process users... 
- pass - -# Log output will show: -# bulk_update_users([len 150], metadata={len 5}, options={len 3}) -``` - -This prevents logs from being overwhelmed with large data structures while still providing useful information. - -### Task Context Logging - -Use `TaskLogger` for structured logging with task context: - -```python -from logging import Logger, LoggerAdapter -from docket import TaskLogger - -async def complex_data_pipeline( - dataset_id: str, - logger: LoggerAdapter[Logger] = TaskLogger() -) -> None: - logger.info("Starting data pipeline", extra={"dataset_id": dataset_id}) - - try: - await extract_data(dataset_id) - logger.info("Data extraction completed") - - await transform_data(dataset_id) - logger.info("Data transformation completed") - - await load_data(dataset_id) - logger.info("Data loading completed") - - except Exception as e: - logger.error("Pipeline failed", extra={"error": str(e)}) - raise -``` - -The logger automatically includes task context like the task name, key, and worker information. 
- -### Built-in Utility Tasks - -Docket provides helpful debugging tasks: - -```python -from docket import tasks - -# Simple trace logging -await docket.add(tasks.trace)("System startup completed") -await docket.add(tasks.trace)("Processing batch 123") - -# Intentional failures for testing error handling -await docket.add(tasks.fail)("Testing error notification system") -``` - -These are particularly useful for: - -- Marking milestones in complex workflows -- Testing monitoring and alerting systems -- Debugging task execution order -- Creating synthetic load for testing - -## Task Chain Patterns - -### Sequential Processing - -Create chains of related tasks that pass data forward: - -```python -async def download_data( - url: str, - docket: Docket = CurrentDocket() -) -> None: - file_path = await download_file(url) - await docket.add(validate_data)(file_path) - -async def validate_data( - file_path: str, - docket: Docket = CurrentDocket() -) -> None: - if await is_valid_data(file_path): - await docket.add(process_data)(file_path) - else: - await docket.add(handle_invalid_data)(file_path) - -async def process_data(file_path: str) -> None: - # Final processing step - await transform_and_store(file_path) -``` - -### Fan-out Processing - -Break large tasks into parallel subtasks: - -```python -async def process_large_dataset( - dataset_id: str, - docket: Docket = CurrentDocket() -) -> None: - chunk_ids = await split_dataset_into_chunks(dataset_id) - - # Schedule parallel processing of all chunks - for chunk_id in chunk_ids: - await docket.add(process_chunk)(dataset_id, chunk_id) - - # Schedule a task to run after all chunks should be done - estimated_completion = datetime.now(timezone.utc) + timedelta(hours=2) - await docket.add( - finalize_dataset, - when=estimated_completion, - key=f"finalize-{dataset_id}" - )(dataset_id, len(chunk_ids)) - -async def process_chunk(dataset_id: str, chunk_id: str) -> None: - await process_data_chunk(dataset_id, chunk_id) - await 
mark_chunk_complete(dataset_id, chunk_id) -``` - -### Conditional Workflows - -Tasks can make decisions about what work to schedule next: - -```python -async def analyze_user_behavior( - user_id: str, - docket: Docket = CurrentDocket() -) -> None: - behavior_data = await collect_user_behavior(user_id) - - if behavior_data.indicates_churn_risk(): - await docket.add(create_retention_campaign)(user_id) - elif behavior_data.indicates_upsell_opportunity(): - await docket.add(create_upsell_campaign)(user_id) - elif behavior_data.indicates_satisfaction(): - # Schedule a follow-up check in 30 days - future_check = datetime.now(timezone.utc) + timedelta(days=30) - await docket.add( - analyze_user_behavior, - when=future_check, - key=f"behavior-check-{user_id}" - )(user_id) -``` - -## Concurrency Control - -Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections. 
- -### Basic Concurrency Limits - -Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments: - -```python -from docket import ConcurrencyLimit - -async def process_customer_data( - customer_id: int, - concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1) -) -> None: - # Only one task per customer_id can run at a time - await update_customer_profile(customer_id) - await recalculate_customer_metrics(customer_id) - -# These will run sequentially for the same customer -await docket.add(process_customer_data)(customer_id=1001) -await docket.add(process_customer_data)(customer_id=1001) -await docket.add(process_customer_data)(customer_id=1001) - -# But different customers can run concurrently -await docket.add(process_customer_data)(customer_id=2001) # Runs in parallel -await docket.add(process_customer_data)(customer_id=3001) # Runs in parallel -``` - -### Database Connection Pooling - -Limit concurrent database operations to prevent overwhelming your database: - -```python -async def backup_database_table( - db_name: str, - table_name: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2) -) -> None: - # Maximum 2 backup operations per database at once - await create_table_backup(db_name, table_name) - await verify_backup_integrity(db_name, table_name) - -# Schedule many backup tasks - only 2 per database will run concurrently -tables = ["users", "orders", "products", "analytics", "logs"] -for table in tables: - await docket.add(backup_database_table)("production", table) - await docket.add(backup_database_table)("staging", table) -``` - -### API Rate Limiting - -Protect external APIs from being overwhelmed: - -```python -async def sync_user_with_external_service( - user_id: int, - service_name: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5) -) -> None: - # Limit to 5 concurrent API calls per external service - api_client = 
get_api_client(service_name) - user_data = await fetch_user_data(user_id) - await api_client.sync_user(user_data) - -# These respect per-service limits -await docket.add(sync_user_with_external_service)(123, "salesforce") -await docket.add(sync_user_with_external_service)(456, "salesforce") # Will queue if needed -await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel -``` - -### File Processing Limits - -Control concurrent file operations to manage disk I/O: - -```python -async def process_media_file( - file_path: str, - operation_type: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3) -) -> None: - # Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes) - if operation_type == "video_transcode": - await transcode_video(file_path) - elif operation_type == "image_resize": - await resize_image(file_path) - elif operation_type == "audio_compress": - await compress_audio(file_path) - -# Different operation types can run concurrently, but each type is limited -await docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode") -await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode") -await docket.add(process_media_file)("/images/photo1.jpg", "image_resize") # Runs in parallel -``` - -### Custom Scopes - -Use custom scopes to create independent concurrency limits: - -```python -async def process_tenant_data( - tenant_id: str, - operation: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit( - "tenant_id", - max_concurrent=2, - scope="tenant_operations" - ) -) -> None: - # Each tenant can have up to 2 concurrent operations - await perform_tenant_operation(tenant_id, operation) - -async def process_global_data( - data_type: str, - concurrency: ConcurrencyLimit = ConcurrencyLimit( - "data_type", - max_concurrent=1, - scope="global_operations" # Separate from tenant operations - ) -) -> None: - # Global operations have their 
own concurrency limits - await process_global_data_type(data_type) -``` - -### Multi-Level Concurrency - -Combine multiple concurrency controls for complex scenarios: - -```python -async def process_user_export( - user_id: int, - export_type: str, - region: str, - user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1), - type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3), - region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10) -) -> None: - # This task respects ALL concurrency limits: - # - Only 1 export per user at a time - # - Only 3 exports of each type globally - # - Only 10 exports per region - await generate_user_export(user_id, export_type, region) -``` - -**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start. - -### Monitoring Concurrency - -Concurrency limits are enforced using Redis sets, so you can monitor them: - -```python -async def monitor_concurrency_usage() -> None: - async with docket.redis() as redis: - # Check how many tasks are running for a specific limit - active_count = await redis.scard("docket:concurrency:customer_id:1001") - print(f"Customer 1001 has {active_count} active tasks") - - # List all active concurrency keys - keys = await redis.keys("docket:concurrency:*") - for key in keys: - count = await redis.scard(key) - print(f"{key}: {count} active tasks") -``` - -### Best Practices - -1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint). - -2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints. - -3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts. - -4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays. - -5. 
**Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively. - -Concurrency control helps you build robust systems that respect resource limits while maintaining high throughput for independent operations. - -## Task State and Progress Monitoring - -Docket provides comprehensive execution state tracking, progress monitoring, and result persistence. These features enable you to observe task execution in real-time, report progress to users, and retrieve results from completed tasks. - -### High-Level Design - -Understanding how Docket tracks and stores task execution information helps when building observable systems. - -#### Execution State Machine - -Every task execution transitions through a well-defined lifecycle: - -``` -SCHEDULED → QUEUED → RUNNING → COMPLETED - ↘ FAILED -``` - -- **SCHEDULED**: Task is scheduled and waiting in the queue for its execution time -- **QUEUED**: Task has been moved to the stream and is ready to be claimed by a worker -- **RUNNING**: Task is currently being executed by a worker -- **COMPLETED**: Task execution finished successfully -- **FAILED**: Task execution failed - -State transitions are atomic and published via Redis pub/sub for real-time monitoring. 
- -#### Redis Data Model - -Docket stores execution state and progress in Redis with automatic cleanup: - -**Execution State** (`{docket}:runs:{key}`): - -- Stored as Redis hash containing state, timestamps, worker name, error messages, and result keys -- TTL controlled by `execution_ttl` (default: 15 minutes) -- Setting `execution_ttl=0` skips state persistence entirely for maximum throughput - -**Progress Data** (`{docket}:progress:{key}`): - -- Stored as Redis hash with `current`, `total`, `message`, and `updated_at` fields -- Ephemeral data deleted when task completes -- Updated atomically with `increment()` for thread-safe progress tracking - -**Result Storage**: - -- Results stored using `py-key-value-aio` library (RedisStore or MemoryStore) -- Serialized with cloudpickle and base64-encoded for reliability -- TTL matches `execution_ttl` for consistent cleanup -- Skipped when `execution_ttl=0` or task returns `None` - -#### Pub/Sub Event System - -Docket publishes real-time events for state transitions and progress updates: - -**State Events** (channel: `{docket}:state:{key}`): -```python -{ - "type": "state", - "key": "task-key", - "state": "running", - "when": "2025-01-15T10:30:00Z", - "worker": "worker-1", - "started_at": "2025-01-15T10:30:05Z", - "completed_at": None, - "error": None -} -``` - -**Progress Events** (channel: `{docket}:progress:{key}`): -```python -{ - "type": "progress", - "key": "task-key", - "current": 45, - "total": 100, - "message": "Processing records...", - "updated_at": "2025-01-15T10:30:10Z" -} -``` - -These events enable real-time dashboards, progress bars, and monitoring systems to track task execution without polling. 
- -### Tracking Execution State - -Access the current state of any task execution: - -```python -from docket import Docket -from docket.execution import ExecutionState - -async with Docket() as docket: - # Schedule a task - execution = await docket.add(process_order)(order_id=12345) - - # Check initial state - print(f"State: {execution.state}") # ExecutionState.QUEUED - - # Later, sync with Redis to get current state - await execution.sync() - print(f"State: {execution.state}") # May be RUNNING or COMPLETED - - # Check specific states - if execution.state == ExecutionState.COMPLETED: - print(f"Task completed at {execution.completed_at}") - elif execution.state == ExecutionState.FAILED: - print(f"Task failed: {execution.error}") - elif execution.state == ExecutionState.RUNNING: - print(f"Task running on {execution.worker} since {execution.started_at}") -``` - -### Reporting Task Progress - -Tasks can report their progress to provide visibility into long-running operations: - -```python -from docket import Progress -from docket.execution import ExecutionProgress - -async def import_customer_records( - file_path: str, - progress: ExecutionProgress = Progress() -) -> None: - # Read the data source - records = await load_records(file_path) - - # Set the total number of items - await progress.set_total(len(records)) - await progress.set_message("Starting import") - - # Process records one by one - for i, record in enumerate(records, 1): - await import_record(record) - - # Update progress - await progress.increment() - await progress.set_message(f"Imported record {i}/{len(records)}") - - await progress.set_message("Import complete") - -# Schedule the task -await docket.add(import_customer_records)("/data/customers.csv") -``` - -Progress updates are atomic and published via pub/sub, so multiple observers can monitor the same task simultaneously. 
- -### Monitoring Progress in Real-Time - -Subscribe to progress updates programmatically: - -```python -async def monitor_task_progress(execution: Execution) -> None: - """Monitor a task's progress and state in real-time.""" - async for event in execution.subscribe(): - if event["type"] == "state": - state = event["state"] - print(f"State changed to: {state}") - - if state in (ExecutionState.COMPLETED, ExecutionState.FAILED): - break - - elif event["type"] == "progress": - current = event["current"] - total = event["total"] - message = event["message"] - percentage = (current / total * 100) if total > 0 else 0 - print(f"Progress: {current}/{total} ({percentage:.1f}%) - {message}") - -# Schedule a task and monitor it -execution = await docket.add(import_customer_records)("/data/large_dataset.csv") - -# Monitor in a separate task -asyncio.create_task(monitor_task_progress(execution)) -``` - -### Advanced Progress Patterns - -#### Incremental Progress - -For tasks with known steps, use `set_total()` and `increment()`: - -```python -async def process_batch( - batch_id: int, - progress: ExecutionProgress = Progress() -) -> None: - items = await fetch_batch_items(batch_id) - await progress.set_total(len(items)) - - for item in items: - await process_item(item) - await progress.increment() # Increments by 1 -``` - -#### Batch Progress Updates - -For fine-grained work, batch progress updates to reduce Redis calls: - -```python -async def process_large_dataset( - dataset_id: str, - progress: ExecutionProgress = Progress() -) -> None: - records = await load_dataset(dataset_id) - await progress.set_total(len(records)) - - # Update every 100 records instead of every record - for i, record in enumerate(records): - await process_record(record) - - if (i + 1) % 100 == 0: - await progress.increment(100) - await progress.set_message(f"Processed {i + 1} records") - - # Update any remaining progress - remaining = len(records) % 100 - if remaining > 0: - await 
progress.increment(remaining) -``` - -#### Nested Progress Tracking - -Break down complex tasks into subtasks with their own progress: - -```python -async def data_migration( - source_db: str, - progress: ExecutionProgress = Progress() -) -> None: - # Define major phases - phases = [ - ("extract", extract_data), - ("transform", transform_data), - ("load", load_data), - ("verify", verify_data) - ] - - await progress.set_total(len(phases) * 100) - - for phase_num, (phase_name, phase_func) in enumerate(phases): - await progress.set_message(f"Phase: {phase_name}") - - # Each phase reports its own progress (0-100) - # We scale it to our overall progress - phase_progress = 0 - async for update in phase_func(source_db): - # Each phase returns progress from 0-100 - delta = update - phase_progress - await progress.increment(delta) - phase_progress = update -``` - -### Retrieving Task Results - -Tasks can return values that are automatically persisted and retrievable: - -```python -async def calculate_metrics(dataset_id: str) -> dict[str, float]: - """Calculate and return metrics from a dataset.""" - data = await load_dataset(dataset_id) - return { - "mean": sum(data) / len(data), - "max": max(data), - "min": min(data), - "count": len(data) - } - -# Schedule the task -execution = await docket.add(calculate_metrics)("dataset-2025-01") - -# Later, retrieve the result -metrics = await execution.get_result() -print(f"Mean: {metrics['mean']}, Count: {metrics['count']}") -``` - -#### Waiting for Results - -`get_result()` automatically waits for task completion if it's still running: - -```python -# Schedule a task and immediately wait for its result -execution = await docket.add(fetch_external_data)("https://api.example.com/data") - -# This will wait until the task completes -try: - data = await execution.get_result() - print(f"Retrieved {len(data)} records") -except Exception as e: - print(f"Task failed: {e}") -``` - -#### Timeout and Deadline - -Control how long to wait for 
results: - -```python -from datetime import datetime, timedelta, timezone - -# Wait at most 30 seconds for a result -try: - result = await execution.get_result(timeout=timedelta(seconds=30)) -except TimeoutError: - print("Task didn't complete in 30 seconds") - -# Or specify an absolute deadline -deadline = datetime.now(timezone.utc) + timedelta(minutes=5) -try: - result = await execution.get_result(deadline=deadline) -except TimeoutError: - print("Task didn't complete by deadline") -``` - -Following Python conventions, you can specify either `timeout` (relative duration) or `deadline` (absolute time), but not both. - -#### Exception Handling - -When tasks fail, `get_result()` re-raises the original exception: - -```python -async def risky_operation(data: dict) -> str: - if not data.get("valid"): - raise ValueError("Invalid data provided") - return process_data(data) - -execution = await docket.add(risky_operation)({"valid": False}) - -try: - result = await execution.get_result() -except ValueError as e: - # The original ValueError is re-raised - print(f"Validation failed: {e}") -except Exception as e: - # Other exceptions are also preserved - print(f"Unexpected error: {e}") -``` - -#### Result Patterns for Workflows - -Chain tasks together using results: - -```python -async def download_file(url: str) -> str: - """Download a file and return the local path.""" - file_path = await download(url) - return file_path - -async def process_file(file_path: str) -> dict: - """Process a file and return statistics.""" - data = await parse_file(file_path) - return calculate_statistics(data) - -# Chain tasks together -download_execution = await docket.add(download_file)("https://example.com/data.csv") -file_path = await download_execution.get_result() - -# Use the result to schedule the next task -process_execution = await docket.add(process_file)(file_path) -stats = await process_execution.get_result() -print(f"Statistics: {stats}") -``` - -For complex workflows with many 
dependencies, consider using the `CurrentDocket()` dependency to schedule follow-up work from within tasks themselves. - -### CLI Monitoring with Watch - -Monitor task execution in real-time from the command line: - -```bash -# Watch a specific task -docket watch --url redis://localhost:6379/0 --docket emails task-key-123 - -# The watch command shows: -# - Current state (SCHEDULED, QUEUED, RUNNING, COMPLETED, FAILED) -# - Progress bar with percentage -# - Status messages -# - Execution timing -# - Worker information -``` - -Example output: -``` -State: RUNNING (worker-1) -Started: 2025-01-15 10:30:05 - -Progress: [████████████░░░░░░░░] 60/100 (60.0%) -Message: Processing records... -Updated: 2025-01-15 10:30:15 -``` - -The watch command uses pub/sub to receive real-time updates without polling, making it efficient for monitoring long-running tasks. - -### Fire-and-Forget Mode - -For high-throughput scenarios where observability isn't critical, disable state and result persistence: - -```python -from datetime import timedelta - -async with Docket( - name="high-throughput", - url="redis://localhost:6379/0", - execution_ttl=timedelta(0) # Disable state persistence -) as docket: - # Tasks scheduled with this docket won't track state or store results - # This maximizes throughput for fire-and-forget operations - for i in range(10000): - await docket.add(send_notification)(user_id=i) -``` - -With `execution_ttl=0`: - -- No state records are created in Redis -- No results are persisted -- Progress tracking is not available -- `get_result()` will not work - -This is ideal for high-volume event processing where individual task tracking isn't necessary. - -These advanced patterns enable building sophisticated distributed systems that can adapt to changing conditions, handle operational requirements, and provide the debugging and testing capabilities needed for production deployments. 
diff --git a/docs/dependencies.md b/docs/dependencies.md deleted file mode 100644 index 14d289f2..00000000 --- a/docs/dependencies.md +++ /dev/null @@ -1,539 +0,0 @@ -# Dependencies Guide - -Docket tasks include a dependency injection system that provides access to context, configuration, and custom resources. This system is similar to FastAPI's dependency injection but tailored for background task patterns. - -## Built-in Context Dependencies - -### Accessing the Current Docket - -Tasks often need to schedule more work. The `CurrentDocket` dependency gives you access to the same docket the worker is processing: - -```python -from pathlib import Path -from datetime import datetime, timedelta, timezone -from docket import Docket, CurrentDocket - -def now() -> datetime: - return datetime.now(timezone.utc) - -async def poll_for_file( - file_path: str, - docket: Docket = CurrentDocket() -) -> None: - path = Path(file_path) - if path.exists(): - print(f"File {file_path} found!") - return - - # Schedule another check in 30 seconds - await docket.add( - poll_for_file, - when=now() + timedelta(seconds=30) - )(file_path) -``` - -This is especially useful for self-perpetuating tasks that create chains of future work. - -### Getting Your Task Key - -Use `TaskKey` to access the current task's key, which is helpful for creating related work or maintaining task chains: - -```python -from docket import CurrentDocket, TaskKey - -async def process_data_chunk( - dataset_id: int, - chunk: int, - total_chunks: int, - key: str = TaskKey(), - docket: Docket = CurrentDocket() -) -> None: - print(f"Processing chunk {chunk}/{total_chunks} for dataset {dataset_id}") - - # Process this chunk... 
- await process_chunk_data(dataset_id, chunk) - - if chunk < total_chunks: - # Schedule next chunk with a related key - next_key = f"dataset-{dataset_id}-chunk-{chunk + 1}" - await docket.add( - process_data_chunk, - key=next_key - )(dataset_id, chunk + 1, total_chunks) -``` - -### Worker and Execution Context - -Access the current worker and execution details when needed: - -```python -from docket import CurrentWorker, CurrentExecution, Worker, Execution - -async def diagnostic_task( - worker: Worker = CurrentWorker(), - execution: Execution = CurrentExecution() -) -> None: - print(f"Running on worker: {worker.name}") - print(f"Task key: {execution.key}") - print(f"Scheduled at: {execution.when}") - print(f"Worker concurrency: {worker.concurrency}") -``` - -### Reporting Task Progress - -The `Progress()` dependency provides access to the current task's progress tracker, allowing tasks to report their progress to external observers: - -```python -from docket import Progress -from docket.execution import ExecutionProgress - -async def import_records( - file_path: str, - progress: ExecutionProgress = Progress() -) -> None: - records = await load_records(file_path) - - # Set the total number of items to process - await progress.set_total(len(records)) - await progress.set_message("Starting import") - - for i, record in enumerate(records, 1): - await import_record(record) - - # Update progress atomically - await progress.increment() - - # Optionally update status message - if i % 100 == 0: - await progress.set_message(f"Imported {i}/{len(records)} records") - - await progress.set_message("Import complete") -``` - -Progress updates are: - -- **Atomic**: `increment()` uses Redis HINCRBY for thread-safe updates -- **Real-time**: Updates published via pub/sub for live monitoring -- **Observable**: Can be monitored with `docket watch` CLI or programmatically -- **Ephemeral**: Progress data is automatically deleted when the task completes - -The `ExecutionProgress` object 
provides these methods: - -- `set_total(total: int)`: Set the target/total value for progress tracking -- `increment(amount: int = 1)`: Atomically increment the current progress value -- `set_message(message: str)`: Update the status message -- `sync()`: Refresh local state from Redis - -For more details on progress monitoring patterns and real-time observation, see [Task State and Progress Monitoring](advanced-patterns.md#task-state-and-progress-monitoring). - -## Advanced Retry Patterns - -### Exponential Backoff - -For services that might be overloaded, exponential backoff gives them time to recover: - -```python -from docket import ExponentialRetry - -async def call_external_api( - url: str, - retry: ExponentialRetry = ExponentialRetry( - attempts=5, - minimum_delay=timedelta(seconds=1), - maximum_delay=timedelta(minutes=5) - ) -) -> None: - # Retries with delays: 1s, 2s, 4s, 8s, 16s (but capped at 5 minutes) - try: - response = await http_client.get(url) - response.raise_for_status() - print(f"API call succeeded on attempt {retry.attempt}") - except Exception as e: - print(f"Attempt {retry.attempt} failed: {e}") - raise -``` - -### Unlimited Retries - -For critical tasks that must eventually succeed, use `attempts=None`: - -```python -from docket import Retry - -async def critical_data_sync( - source_url: str, - retry: Retry = Retry(attempts=None, delay=timedelta(minutes=5)) -) -> None: - # This will retry forever with 5-minute delays until it succeeds - await sync_critical_data(source_url) - print(f"Critical sync completed after {retry.attempt} attempts") -``` - -Both `Retry` and `ExponentialRetry` support unlimited retries this way. 
- -## Task Timeouts - -Prevent tasks from running too long with the `Timeout` dependency: - -```python -from docket import Timeout - -async def data_processing_task( - large_dataset: dict, - timeout: Timeout = Timeout(timedelta(minutes=10)) -) -> None: - # This task will be cancelled if it runs longer than 10 minutes - await process_dataset_phase_one(large_dataset) - - # Extend timeout if we need more time for phase two - timeout.extend(timedelta(minutes=5)) - await process_dataset_phase_two(large_dataset) -``` - -The `extend()` method can take a specific duration or default to the original timeout duration: - -```python -async def adaptive_timeout_task( - timeout: Timeout = Timeout(timedelta(minutes=2)) -) -> None: - await quick_check() - - # Extend by the base timeout (another 2 minutes) - timeout.extend() - await longer_operation() -``` - -Timeouts work alongside retries. If a task times out, it can be retried according to its retry policy. - -## Custom Dependencies - -Create your own dependencies using `Depends()` for reusable resources and patterns. Dependencies can be either synchronous or asynchronous. 
- -### Synchronous Dependencies - -Use sync dependencies for pure computations and in-memory operations: - -```python -from docket import Depends - -# In-memory config lookup - no I/O -def get_config() -> dict: - """Access configuration from memory.""" - return {"api_url": "https://api.example.com", "timeout": 30} - -# Pure computation - no I/O -def build_request_headers(config: dict = Depends(get_config)) -> dict: - """Construct headers from config.""" - return { - "User-Agent": "MyApp/1.0", - "Timeout": str(config["timeout"]) - } - -async def call_api( - headers: dict = Depends(build_request_headers) -) -> None: - # Headers are computed without blocking - # Network I/O happens here (async) - response = await http_client.get(url, headers=headers) -``` - -**Important**: Synchronous dependencies should **NOT** include blocking I/O operations (file access, network calls, database queries, etc.) as it will block the event loop and prevent tasks from being executed. Use async dependencies for any I/O. 
Sync dependencies are best for: - -- Pure computations -- In-memory data structure access -- Configuration lookups from memory -- Non-blocking transformations - -### Asynchronous Dependencies - -```python -from contextlib import asynccontextmanager -from docket import Depends - -@asynccontextmanager -async def get_database_connection(): - """Async dependency that returns a database connection.""" - conn = await database.connect() - try: - yield conn - finally: - await conn.close() - -async def process_user_data( - user_id: int, - db=Depends(get_database_connection) -) -> None: - # Database connection is automatically provided and cleaned up - user = await db.fetch_user(user_id) - await db.update_user(user_id, {"last_seen": datetime.now()}) -``` - -### Synchronous Context Managers - -Use sync context managers only for managing in-memory resources or quick non-blocking operations: - -```python -from contextlib import contextmanager -from docket import Depends - -# In-memory resource tracking - no I/O -@contextmanager -def track_operation(operation_name: str): - """Track operation execution without blocking.""" - operations_in_progress.add(operation_name) # In-memory set - try: - yield operation_name - finally: - operations_in_progress.remove(operation_name) - -async def process_data( - tracker=Depends(lambda: track_operation("data_processing")) -) -> None: - # Operation tracked in memory, no blocking - await perform_async_work() -``` - -### Mixed Sync and Async Dependencies - -You can freely mix synchronous and asynchronous dependencies in the same task. 
Use sync for computations, async for I/O: - -```python -# Sync - in-memory config lookup -def get_local_config() -> dict: - """Access local config from memory - no I/O.""" - return {"retry_count": 3, "batch_size": 100} - -# Async - network I/O -async def get_remote_config() -> dict: - """Fetch remote config via network - requires I/O.""" - response = await http_client.get("/api/config") - return await response.json() - -# Sync - pure computation -def merge_configs( - local: dict = Depends(get_local_config), - remote: dict = Depends(get_remote_config) -) -> dict: - """Merge configs without blocking - pure computation.""" - return {**local, **remote} - -async def process_batch( - config: dict = Depends(merge_configs) -) -> None: - # Config is computed/fetched appropriately - # Now do the actual I/O work - for i in range(config["batch_size"]): - await process_item(i, retries=config["retry_count"]) -``` - -### Nested Dependencies - -Dependencies can depend on other dependencies, and Docket resolves them in the correct order: - -```python -async def get_auth_service(db=Depends(get_database_connection)): - """A service that depends on the database connection.""" - return AuthService(db) - -async def get_user_service( - db=Depends(get_database_connection), - auth=Depends(get_auth_service) -): - """A service that depends on both database and auth service.""" - return UserService(db, auth) - -async def update_user_profile( - user_id: int, - profile_data: dict, - user_service=Depends(get_user_service) -) -> None: - # All dependencies are resolved automatically: - # db -> auth_service -> user_service -> this task - await user_service.update_profile(user_id, profile_data) -``` - -Dependencies are resolved once per task execution and cached, so if multiple parameters depend on the same resource, only one instance is created. This caching works across both sync and async dependencies. 
- -### Dependencies with Built-in Context - -Dependencies can access Docket's built-in context dependencies: - -```python -async def get_task_logger( - execution: Execution = CurrentExecution(), - worker: Worker = CurrentWorker() -) -> LoggerAdapter: - """Create a logger with task and worker context.""" - logger = logging.getLogger(f"worker.{worker.name}") - return LoggerAdapter(logger, { - 'task_key': execution.key, - 'worker_name': worker.name - }) - -async def important_task( - data: dict, - logger=Depends(get_task_logger) -) -> None: - logger.info("Starting important task") - await process_important_data(data) - logger.info("Important task completed") -``` - -## TaskArgument: Accessing Task Parameters - -Dependencies can access the task's input arguments using `TaskArgument`: - -```python -from docket import TaskArgument - -async def get_user_context(user_id: int = TaskArgument()) -> dict: - """Dependency that fetches user context based on task argument.""" - user = await fetch_user(user_id) - return { - 'user': user, - 'permissions': await fetch_user_permissions(user_id), - 'preferences': await fetch_user_preferences(user_id) - } - -async def send_personalized_email( - user_id: int, - message: str, - user_context=Depends(get_user_context) -) -> None: - # user_context is populated based on the user_id argument - email = personalize_email(message, user_context['preferences']) - await send_email(user_context['user'].email, email) -``` - -You can access arguments by name or make them optional: - -```python -async def get_optional_config( - config_name: str | None = TaskArgument("config", optional=True) -) -> dict: - """Get configuration if provided, otherwise use defaults.""" - if config_name: - return await load_config(config_name) - return DEFAULT_CONFIG - -async def flexible_task( - data: dict, - config: str | None = None, # Optional argument - resolved_config=Depends(get_optional_config) -) -> None: - # resolved_config will be loaded config or defaults - await 
process_data(data, resolved_config) -``` - -## Dependency Error Handling - -When dependencies fail, the entire task fails with detailed error information: - -```python -async def unreliable_dependency(): - if random.random() < 0.5: - raise ValueError("Service unavailable") - return "success" - -async def dependent_task( - value=Depends(unreliable_dependency) -) -> None: - print(f"Got value: {value}") -``` - -If `unreliable_dependency` fails, the task won't execute and the error will be logged with context about which dependency failed. This prevents tasks from running with incomplete or invalid dependencies. - -## Dependency Guidelines - -### Choose Sync vs Async Appropriately - -**Use synchronous dependencies for:** - -- Pure computations (math, string manipulation, data transformations) -- In-memory data structure access (dicts, lists, sets) -- Configuration lookups from memory -- Non-blocking operations that complete instantly - -**Use asynchronous dependencies for:** - -- Network I/O (HTTP requests, API calls) -- File I/O (reading/writing files) -- Database queries -- Any operation that involves `await` -- Resource management requiring async cleanup - -```python -# ✅ Good: Sync for pure computation -def calculate_batch_size(item_count: int) -> int: - return min(item_count, 1000) - -# ✅ Good: Async for I/O -async def fetch_user_data(user_id: int) -> dict: - return await api_client.get(f"/users/{user_id}") - -# ❌ Bad: Sync with blocking I/O -def load_config_from_file() -> dict: - with open("config.json") as f: # Blocks the event loop! 
- return json.load(f) - -# ✅ Good: Use async for file I/O instead -async def load_config_from_file() -> dict: - async with aiofiles.open("config.json") as f: - return json.loads(await f.read()) -``` - -### Design for Reusability - -Create dependencies that can be used across multiple tasks: - -```python -# Good: Reusable across many tasks -async def get_api_client(): - return APIClient(api_key=os.getenv("API_KEY")) - -# Less ideal: Too specific to one task -async def get_user_api_client_for_profile_updates(): - return APIClient(api_key=os.getenv("API_KEY"), timeout=30) -``` - -### Keep Dependencies Focused - -Each dependency should have a single responsibility: - -```python -# Good: Focused dependencies -async def get_database(): - return await database.connect() - -async def get_cache(): - return redis.Redis() - -# Less ideal: Too many responsibilities -async def get_all_services(): - return { - 'db': await database.connect(), - 'cache': redis.Redis(), - 'api': APIClient(), - 'metrics': MetricsClient() - } -``` - -### Handle Resource Cleanup - -Always use context managers or try/finally for resource cleanup: - -```python -# Good: Automatic cleanup -async def get_database(): - conn = await database.connect() - try: - yield conn - finally: - await conn.close() - -# Risky: Manual cleanup required -async def get_database_no_cleanup(): - return await database.connect() # Who closes this? -``` - -The dependency injection system supports flexible task design while maintaining clear separation of concerns. Dependencies can be simple values, complex services, or entire subsystems that your tasks need to operate effectively. diff --git a/docs/dependency-injection.md b/docs/dependency-injection.md new file mode 100644 index 00000000..ca08653e --- /dev/null +++ b/docs/dependency-injection.md @@ -0,0 +1,366 @@ +# Dependency Injection + +Docket includes a dependency injection system that provides access to context, configuration, and custom resources. 
It's similar to FastAPI's dependency injection but tailored for background task patterns. + +## Contextual Dependencies + +### Accessing the Current Docket + +Tasks often need to schedule more work. The `CurrentDocket` dependency gives you access to the same docket the worker is processing: + +```python +from pathlib import Path +from datetime import datetime, timedelta, timezone +from docket import Docket, CurrentDocket + +def now() -> datetime: + return datetime.now(timezone.utc) + +async def poll_for_file( + file_path: str, + docket: Docket = CurrentDocket() +) -> None: + path = Path(file_path) + if path.exists(): + print(f"File {file_path} found!") + return + + # Schedule another check in 30 seconds + await docket.add( + poll_for_file, + when=now() + timedelta(seconds=30) + )(file_path) +``` + +This is especially useful for self-perpetuating tasks that create chains of future work. + +### Getting Your Task Key + +Use `TaskKey` to access the current task's key, which is helpful for creating related work or maintaining task chains: + +```python +from docket import CurrentDocket, TaskKey + +async def process_data_chunk( + dataset_id: int, + chunk: int, + total_chunks: int, + key: str = TaskKey(), + docket: Docket = CurrentDocket() +) -> None: + print(f"Processing chunk {chunk}/{total_chunks} for dataset {dataset_id}") + + # Process this chunk... 
+    await process_chunk_data(dataset_id, chunk)
+
+    if chunk < total_chunks:
+        # Schedule next chunk with a related key
+        next_key = f"dataset-{dataset_id}-chunk-{chunk + 1}"
+        await docket.add(
+            process_data_chunk,
+            key=next_key
+        )(dataset_id, chunk + 1, total_chunks)
+```
+
+### Worker and Execution Context
+
+Access the current worker and execution details when needed:
+
+```python
+from docket import CurrentWorker, CurrentExecution, Worker, Execution
+
+async def diagnostic_task(
+    worker: Worker = CurrentWorker(),
+    execution: Execution = CurrentExecution()
+) -> None:
+    print(f"Running on worker: {worker.name}")
+    print(f"Task key: {execution.key}")
+    print(f"Scheduled at: {execution.when}")
+    print(f"Worker concurrency: {worker.concurrency}")
+```
+
+### TaskArgument
+
+Dependencies can access the task's input arguments using `TaskArgument`. This lets a dependency function pull values from the task's call site without the task having to pass them explicitly:
+
+```python
+from docket import Depends, TaskArgument
+
+async def get_user_context(user_id: int = TaskArgument()) -> dict:
+    """Dependency that fetches user context based on task argument."""
+    user = await fetch_user(user_id)
+    return {
+        'user': user,
+        'permissions': await fetch_user_permissions(user_id),
+        'preferences': await fetch_user_preferences(user_id)
+    }
+
+async def send_personalized_email(
+    user_id: int,
+    message: str,
+    user_context=Depends(get_user_context)
+) -> None:
+    # user_context is populated based on the user_id argument
+    email = personalize_email(message, user_context['preferences'])
+    await send_email(user_context['user'].email, email)
+```
+
+You can access arguments by name or make them optional:
+
+```python
+async def get_optional_config(
+    config_name: str | None = TaskArgument("config", optional=True)
+) -> dict:
+    """Get configuration if provided, otherwise use defaults."""
+    if config_name:
+        return await load_config(config_name)
+    return DEFAULT_CONFIG
+
+async def 
flexible_task( + data: dict, + config: str | None = None, # Optional argument + resolved_config=Depends(get_optional_config) +) -> None: + # resolved_config will be loaded config or defaults + await process_data(data, resolved_config) +``` + +## Using Functions as Dependencies + +### Depends + +`Depends()` wraps any callable (sync or async, plain or context manager) as a dependency that's resolved fresh for each task execution. + +#### Async Dependencies + +```python +from contextlib import asynccontextmanager +from docket import Depends + +@asynccontextmanager +async def get_database_connection(): + """Async dependency that returns a database connection.""" + conn = await database.connect() + try: + yield conn + finally: + await conn.close() + +async def process_user_data( + user_id: int, + db=Depends(get_database_connection) +) -> None: + # Database connection is automatically provided and cleaned up + user = await db.fetch_user(user_id) + await db.update_user(user_id, {"last_seen": datetime.now()}) +``` + +#### Sync Dependencies + +Use sync dependencies for pure computations and in-memory operations. Synchronous dependencies should **not** include blocking I/O (file access, network calls, database queries) as that blocks the event loop and prevents other tasks from running. Use async dependencies for any I/O. 
+ +```python +from docket import Depends + +# In-memory config lookup - no I/O +def get_config() -> dict: + """Access configuration from memory.""" + return {"api_url": "https://api.example.com", "timeout": 30} + +# Pure computation - no I/O +def build_request_headers(config: dict = Depends(get_config)) -> dict: + """Construct headers from config.""" + return { + "User-Agent": "MyApp/1.0", + "Timeout": str(config["timeout"]) + } + +async def call_api( + headers: dict = Depends(build_request_headers) +) -> None: + # Headers are computed without blocking + # Network I/O happens here (async) + response = await http_client.get(url, headers=headers) +``` + +#### Context Managers + +Dependencies that are async context managers get automatic cleanup: + +```python +@asynccontextmanager +async def get_database_connection(): + conn = await database.connect() + try: + yield conn + finally: + await conn.close() +``` + +The connection is created before your task runs and closed after it finishes, even if the task raises an exception. + +#### Nesting and Caching + +Dependency functions can themselves declare dependencies as parameters — including other `Depends()` values and built-in context like `CurrentExecution()`. 
Docket resolves the full graph in the right order: + +```python +async def get_auth_service(db=Depends(get_database_connection)): + """A service that depends on the database connection.""" + return AuthService(db) + +async def get_user_service( + db=Depends(get_database_connection), + auth=Depends(get_auth_service) +): + """A service that depends on both database and auth service.""" + return UserService(db, auth) + +async def update_user_profile( + user_id: int, + profile_data: dict, + user_service=Depends(get_user_service) +) -> None: + # All dependencies are resolved automatically: + # db -> auth_service -> user_service -> this task + await user_service.update_profile(user_id, profile_data) +``` + +Dependencies are resolved once per task execution and cached, so if multiple parameters depend on the same resource, only one instance is created. + +```python +async def get_task_logger( + execution: Execution = CurrentExecution(), + worker: Worker = CurrentWorker() +) -> LoggerAdapter: + logger = logging.getLogger(f"worker.{worker.name}") + return LoggerAdapter(logger, { + 'task_key': execution.key, + 'worker_name': worker.name + }) + +async def important_task( + data: dict, + logger=Depends(get_task_logger) +) -> None: + logger.info("Starting important task") + await process_important_data(data) + logger.info("Important task completed") +``` + +### Shared + +While `Depends` resolves a fresh instance for each task, `Shared` resolves once at worker startup and provides the same instance to all tasks for the worker's lifetime. This is useful for expensive resources like connection pools, loaded configuration, or shared clients. 
+ +#### Async Context Manager (with cleanup) + +Use an async context manager when the resource needs cleanup at worker shutdown: + +```python +from contextlib import asynccontextmanager +from docket import Shared + +@asynccontextmanager +async def create_db_pool(): + pool = await AsyncConnectionPool.create(conninfo="postgresql://...") + try: + yield pool + finally: + await pool.close() + +async def query_users( + pool: AsyncConnectionPool = Shared(create_db_pool) +) -> None: + async with pool.connection() as conn: + await conn.execute("SELECT ...") +``` + +The pool is created once on first use and closed when the worker shuts down. + +#### Simple Async Function + +For shared values that don't need cleanup, a plain async function works: + +```python +from docket import Shared + +async def load_config() -> Config: + return await fetch_config_from_remote() + +async def process_order( + config: Config = Shared(load_config) +) -> None: + # Same config instance across all tasks on this worker + print(config.api_url) +``` + +#### Identity + +Shared dependencies are keyed by the factory function itself. Multiple `Shared(same_factory)` calls anywhere in the codebase resolve to the same cached value: + +```python +async def task_a(pool: AsyncConnectionPool = Shared(create_db_pool)) -> None: + ... + +async def task_b(pool: AsyncConnectionPool = Shared(create_db_pool)) -> None: + # Same pool instance as task_a + ... +``` + +### Error Handling + +When dependencies fail, the entire task fails with detailed error information: + +```python +async def unreliable_dependency(): + if random.random() < 0.5: + raise ValueError("Service unavailable") + return "success" + +async def dependent_task( + value=Depends(unreliable_dependency) +) -> None: + print(f"Got value: {value}") +``` + +If `unreliable_dependency` fails, the task won't execute and the error will be logged with context about which dependency failed. This prevents tasks from running with incomplete or invalid dependencies. 
+ +## Subclassing Dependency + +For full control, subclass `Dependency` directly. This is how all of Docket's built-in dependencies (`Progress`, `ConcurrencyLimit`, `Timeout`, etc.) are implemented. A `Dependency` subclass is an async context manager — `__aenter__` sets up the resource and returns the value injected into the task, and `__aexit__` handles cleanup. + +```python +from docket.dependencies import Dependency + +class RateLimitedClient(Dependency): + """Injects a rate-limited HTTP client scoped to this task execution.""" + + def __init__(self, requests_per_second: int = 10) -> None: + self.requests_per_second = requests_per_second + + async def __aenter__(self) -> httpx.AsyncClient: + self._client = httpx.AsyncClient( + limits=httpx.Limits(max_connections=self.requests_per_second) + ) + return self._client + + async def __aexit__(self, exc_type, exc_value, traceback) -> None: + await self._client.aclose() + +async def fetch_pages( + urls: list[str], + client: httpx.AsyncClient = RateLimitedClient(requests_per_second=5) +) -> None: + for url in urls: + response = await client.get(url) + await process_response(response) +``` + +Inside `__aenter__`, you can access the current execution context through the class-level context vars `self.docket`, `self.worker`, and `self.execution`: + +```python +class AuditedDependency(Dependency): + async def __aenter__(self) -> AuditLog: + execution = self.execution.get() + worker = self.worker.get() + return AuditLog(task_key=execution.key, worker_name=worker.name) +``` diff --git a/docs/getting-started.md b/docs/getting-started.md index 7d8a14d5..4fa0afde 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -21,7 +21,9 @@ With `pip`: pip install pydocket ``` -You'll also need a [Redis](http://redis.io/) server with Streams support (Redis 5.0+). Docket is tested with Redis 6, 7, and 8, and also works with [Valkey](https://valkey.io/). 
+You'll also need a [Redis](https://redis.io/) server with Streams support (Redis 5.0+). Docket is tested with Redis 6.2, 7.4, and 8.6, and also works with [Valkey](https://valkey.io/) 8.1. + +To try docket without setting up Redis, you can use the in-memory backend — see [Testing with Docket](testing.md) for details. ## Your First Docket @@ -183,7 +185,7 @@ Docket provides at-least-once delivery semantics, meaning tasks may be delivered ## Task Observability -Docket automatically tracks task execution state and provides comprehensive observability features for monitoring long-running tasks. +Docket automatically tracks task execution state and provides tools for monitoring long-running tasks. ### Execution State @@ -234,7 +236,7 @@ total = await execution.get_result() # Waits for completion if needed print(f"Order total: ${total:.2f}") ``` -For detailed information on state tracking, progress monitoring, result retrieval, and CLI monitoring tools, see [Task State and Progress Monitoring](advanced-patterns.md#task-state-and-progress-monitoring). +For detailed information on state tracking, progress monitoring, result retrieval, and CLI monitoring tools, see [Task Observability](observability.md). ## What's Next? @@ -242,10 +244,12 @@ You now know the core concepts: creating dockets, scheduling work with idempoten Ready for more? 
Check out: -- **[Dependencies Guide](dependencies.md)** - Access current docket, advanced retry patterns, timeouts, and custom dependencies +- **[Task Behaviors](task-behaviors.md)** - Retries, timeouts, progress reporting, and concurrency control +- **[Dependency Injection](dependency-injection.md)** - Access current docket, custom dependencies, shared resources - **[Testing with Docket](testing.md)** - Ergonomic testing utilities for unit and integration tests -- **[Advanced Task Patterns](advanced-patterns.md)** - Perpetual tasks, striking/restoring, logging, and task chains -- **[Docket in Production](production.md)** - Redis architecture, monitoring, and deployment strategies +- **[Task Design Patterns](task-patterns.md)** - Find & flood, task scattering, logging, and task chains +- **[Task Observability](observability.md)** - State tracking, progress monitoring, results, and CLI watch +- **[Docket in Production](production.md)** - Worker configuration, Redis connections, monitoring, and striking - **[API Reference](api-reference.md)** - Complete documentation of all classes and methods ## A Note on Security diff --git a/docs/index.md b/docs/index.md index 72688779..db0e5275 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,6 +9,14 @@ on the scheduling of future work as seamlessly and efficiently as immediate work [![Codecov](https://img.shields.io/codecov/c/github/chrisguidry/docket)](https://app.codecov.io/gh/chrisguidry/docket) [![PyPI - License](https://img.shields.io/pypi/l/pydocket)](https://github.com/chrisguidry/docket/blob/main/LICENSE) +## Install + +```bash +uv add pydocket +``` + +See [Getting Started](getting-started.md) for more installation options. 
+ ## At a glance ```python @@ -35,6 +43,8 @@ And in another process, run a worker: from docket import Docket, Worker async with Docket() as docket: + docket.register(greet) + async with Worker(docket) as worker: await worker.run_until_finished() ``` @@ -65,7 +75,8 @@ docket integrates two modes of task execution: 1. **Immediate tasks** are pushed onto a Redis stream and are available to be picked up by any worker. 2. **Scheduled tasks** are pushed onto a Redis sorted set with a schedule time. A loop within each worker moves scheduled tasks onto the stream when their schedule time has arrived. This move is performed as a Lua script to ensure atomicity. -Docket requires a [Redis](http://redis.io/) server with Streams support (which was -introduced in Redis 5.0.0). Docket is tested with Redis 6, 7, and 8. +Docket requires a [Redis](https://redis.io/) server with Streams support (which was +introduced in Redis 5.0.0). Docket is tested with Redis 6.2, 7.4, and 8.6, and also +works with [Valkey](https://valkey.io/) 8.1. For more detailed information, check out our [Getting Started](getting-started.md) guide or dive into the [API Reference](api-reference.md). diff --git a/docs/observability.md b/docs/observability.md new file mode 100644 index 00000000..9aee6b93 --- /dev/null +++ b/docs/observability.md @@ -0,0 +1,349 @@ +# Task Observability + +Docket tracks execution state, progress, and results for every task. These features let you observe task execution in real-time, report progress to users, and retrieve results from completed tasks. 
+ +## Tracking Execution State + +Access the current state of any task execution: + +```python +from docket import Docket +from docket.execution import ExecutionState + +async with Docket() as docket: + # Schedule a task + execution = await docket.add(process_order)(order_id=12345) + + # Check initial state + print(f"State: {execution.state}") # ExecutionState.QUEUED + + # Later, sync with Redis to get current state + await execution.sync() + print(f"State: {execution.state}") # May be RUNNING or COMPLETED + + # Check specific states + if execution.state == ExecutionState.COMPLETED: + print(f"Task completed at {execution.completed_at}") + elif execution.state == ExecutionState.FAILED: + print(f"Task failed: {execution.error}") + elif execution.state == ExecutionState.RUNNING: + print(f"Task running on {execution.worker} since {execution.started_at}") +``` + +## Monitoring Progress in Real-Time + +Subscribe to progress updates programmatically: + +```python +async def monitor_task_progress(execution: Execution) -> None: + """Monitor a task's progress and state in real-time.""" + async for event in execution.subscribe(): + if event["type"] == "state": + state = event["state"] + print(f"State changed to: {state}") + + if state in (ExecutionState.COMPLETED, ExecutionState.FAILED): + break + + elif event["type"] == "progress": + current = event["current"] + total = event["total"] + message = event["message"] + percentage = (current / total * 100) if total > 0 else 0 + print(f"Progress: {current}/{total} ({percentage:.1f}%) - {message}") + +# Schedule a task and monitor it +execution = await docket.add(import_customer_records)("/data/large_dataset.csv") + +# Monitor in a separate task +asyncio.create_task(monitor_task_progress(execution)) +``` + +## Progress Patterns + +### Incremental Progress + +For tasks with known steps, use `set_total()` and `increment()`: + +```python +async def process_batch( + batch_id: int, + progress: ExecutionProgress = Progress() +) -> None: 
+ items = await fetch_batch_items(batch_id) + await progress.set_total(len(items)) + + for item in items: + await process_item(item) + await progress.increment() # Increments by 1 +``` + +### Batch Progress Updates + +For fine-grained work, batch progress updates to reduce Redis calls: + +```python +async def process_large_dataset( + dataset_id: str, + progress: ExecutionProgress = Progress() +) -> None: + records = await load_dataset(dataset_id) + await progress.set_total(len(records)) + + # Update every 100 records instead of every record + for i, record in enumerate(records): + await process_record(record) + + if (i + 1) % 100 == 0: + await progress.increment(100) + await progress.set_message(f"Processed {i + 1} records") + + # Update any remaining progress + remaining = len(records) % 100 + if remaining > 0: + await progress.increment(remaining) +``` + +### Nested Progress Tracking + +Break down complex tasks into subtasks with their own progress: + +```python +async def data_migration( + source_db: str, + progress: ExecutionProgress = Progress() +) -> None: + # Define major phases + phases = [ + ("extract", extract_data), + ("transform", transform_data), + ("load", load_data), + ("verify", verify_data) + ] + + await progress.set_total(len(phases) * 100) + + for phase_num, (phase_name, phase_func) in enumerate(phases): + await progress.set_message(f"Phase: {phase_name}") + + # Each phase reports its own progress (0-100) + # We scale it to our overall progress + phase_progress = 0 + async for update in phase_func(source_db): + # Each phase returns progress from 0-100 + delta = update - phase_progress + await progress.increment(delta) + phase_progress = update +``` + +## Retrieving Task Results + +Tasks can return values that are automatically persisted and retrievable: + +```python +async def calculate_metrics(dataset_id: str) -> dict[str, float]: + """Calculate and return metrics from a dataset.""" + data = await load_dataset(dataset_id) + return { + "mean": 
sum(data) / len(data), + "max": max(data), + "min": min(data), + "count": len(data) + } + +# Schedule the task +execution = await docket.add(calculate_metrics)("dataset-2025-01") + +# Later, retrieve the result +metrics = await execution.get_result() +print(f"Mean: {metrics['mean']}, Count: {metrics['count']}") +``` + +### Waiting for Results + +`get_result()` automatically waits for task completion if it's still running: + +```python +# Schedule a task and immediately wait for its result +execution = await docket.add(fetch_external_data)("https://api.example.com/data") + +# This will wait until the task completes +try: + data = await execution.get_result() + print(f"Retrieved {len(data)} records") +except Exception as e: + print(f"Task failed: {e}") +``` + +### Timeout and Deadline + +Control how long to wait for results: + +```python +from datetime import datetime, timedelta, timezone + +# Wait at most 30 seconds for a result +try: + result = await execution.get_result(timeout=timedelta(seconds=30)) +except TimeoutError: + print("Task didn't complete in 30 seconds") + +# Or specify an absolute deadline +deadline = datetime.now(timezone.utc) + timedelta(minutes=5) +try: + result = await execution.get_result(deadline=deadline) +except TimeoutError: + print("Task didn't complete by deadline") +``` + +Following Python conventions, you can specify either `timeout` (relative duration) or `deadline` (absolute time), but not both. 
+ +### Exception Handling + +When tasks fail, `get_result()` re-raises the original exception: + +```python +async def risky_operation(data: dict) -> str: + if not data.get("valid"): + raise ValueError("Invalid data provided") + return process_data(data) + +execution = await docket.add(risky_operation)({"valid": False}) + +try: + result = await execution.get_result() +except ValueError as e: + # The original ValueError is re-raised + print(f"Validation failed: {e}") +except Exception as e: + # Other exceptions are also preserved + print(f"Unexpected error: {e}") +``` + +### Result Patterns for Workflows + +Chain tasks together using results: + +```python +async def download_file(url: str) -> str: + """Download a file and return the local path.""" + file_path = await download(url) + return file_path + +async def process_file(file_path: str) -> dict: + """Process a file and return statistics.""" + data = await parse_file(file_path) + return calculate_statistics(data) + +# Chain tasks together +download_execution = await docket.add(download_file)("https://example.com/data.csv") +file_path = await download_execution.get_result() + +# Use the result to schedule the next task +process_execution = await docket.add(process_file)(file_path) +stats = await process_execution.get_result() +print(f"Statistics: {stats}") +``` + +For complex workflows with many dependencies, consider using the `CurrentDocket()` dependency to schedule follow-up work from within tasks themselves. + +## Logging and Debugging + +### Argument Logging + +Control which task arguments appear in logs using the `Logged` annotation: + +```python +from typing import Annotated +from docket import Logged + +async def process_payment( + customer_id: Annotated[str, Logged], # Will be logged + credit_card: str, # Won't be logged + amount: Annotated[float, Logged()] = 0.0, # Will be logged + trace_id: Annotated[str, Logged] = "unknown" # Will be logged +) -> None: + # Process the payment... 
+ pass + +# Log output will show: +# process_payment('12345', credit_card=..., amount=150.0, trace_id='abc-123') +``` + +### Collection Length Logging + +For large collections, log just their size instead of contents: + +```python +async def bulk_update_users( + user_ids: Annotated[list[str], Logged(length_only=True)], + metadata: Annotated[dict[str, str], Logged(length_only=True)], + options: Annotated[set[str], Logged(length_only=True)] +) -> None: + # Process users... + pass + +# Log output will show: +# bulk_update_users([len 150], metadata={len 5}, options={len 3}) +``` + +This prevents logs from being overwhelmed with large data structures while still providing useful information. + +### Task Context Logging + +Use `TaskLogger` for structured logging with task context: + +```python +from logging import Logger, LoggerAdapter +from docket import TaskLogger + +async def complex_data_pipeline( + dataset_id: str, + logger: LoggerAdapter[Logger] = TaskLogger() +) -> None: + logger.info("Starting data pipeline", extra={"dataset_id": dataset_id}) + + try: + await extract_data(dataset_id) + logger.info("Data extraction completed") + + await transform_data(dataset_id) + logger.info("Data transformation completed") + + await load_data(dataset_id) + logger.info("Data loading completed") + + except Exception as e: + logger.error("Pipeline failed", extra={"error": str(e)}) + raise +``` + +The logger automatically includes task context like the task name, key, and worker information. 
+ +## CLI Monitoring with Watch + +Monitor task execution in real-time from the command line: + +```bash +# Watch a specific task +docket watch --url redis://localhost:6379/0 --docket emails task-key-123 + +# The watch command shows: +# - Current state (SCHEDULED, QUEUED, RUNNING, COMPLETED, FAILED) +# - Progress bar with percentage +# - Status messages +# - Execution timing +# - Worker information +``` + +Example output: +``` +State: RUNNING (worker-1) +Started: 2025-01-15 10:30:05 + +Progress: [████████████░░░░░░░░] 60/100 (60.0%) +Message: Processing records... +Updated: 2025-01-15 10:30:15 +``` + +The watch command uses pub/sub to receive real-time updates without polling, making it efficient for monitoring long-running tasks. diff --git a/docs/production.md b/docs/production.md index ed78265d..1b974737 100644 --- a/docs/production.md +++ b/docs/production.md @@ -1,46 +1,6 @@ # Docket in Production -Running Docket at scale requires understanding its Redis-based architecture, configuring workers appropriately, and monitoring system health. This guide covers everything you need for reliable production deployments. - -## Redis Streams Architecture - -Docket uses Redis streams and sorted sets to provide reliable task delivery with at-least-once semantics. - -### Task Lifecycle - -Understanding how tasks flow through the system helps with monitoring and troubleshooting: - -1. **Immediate tasks** go directly to the Redis stream and are available to any worker in the consumer group -2. **Future tasks** are stored in the sorted set with their execution time as the score -3. **Workers continuously move** due tasks from the sorted set to the stream -4. **Consumer groups** ensure each task is delivered to exactly one worker -5. 
**Acknowledgment** removes completed tasks; unacknowledged tasks are redelivered - -### Redelivery Behavior - -When a worker crashes or fails to acknowledge a task within `redelivery_timeout`, Redis automatically makes the task available to other workers. This ensures reliability but means tasks may execute more than once. - -```python -# Configure redelivery timeout based on your longest-running tasks -async with Worker( - docket, - redelivery_timeout=timedelta(minutes=10) # Adjust for your workload -) as worker: - await worker.run_forever() -``` - -Set redelivery timeout to be longer than your 99th percentile task duration to minimize duplicate executions. - -### Redis Data Structures - -Docket creates several Redis data structures for each docket: - -- **Stream (`{docket}:stream`)**: Ready-to-execute tasks using Redis consumer groups -- **Sorted Set (`{docket}:queue`)**: Future tasks ordered by scheduled execution time -- **Hashes (`{docket}:{key}`)**: Serialized task data for scheduled tasks -- **Set (`{docket}:workers`)**: Active worker heartbeats with timestamps -- **Set (`{docket}:worker-tasks:{worker}`)**: Tasks each worker can execute -- **Stream (`{docket}:strikes`)**: Strike/restore commands for operational control +This page covers configuring workers, connecting to Redis, managing state and results, monitoring, and operational tools for running Docket in production. 
## Worker Configuration @@ -64,7 +24,7 @@ async with Worker( ### Environment Variable Configuration -All settings can be configured via environment variables for production deployments: +All settings can be configured via environment variables: ```bash # Core docket settings @@ -112,48 +72,19 @@ docket worker \ --tasks myapp.tasks:production_tasks ``` -### Tuning for Different Workloads - -**High-throughput, fast tasks:** - -```bash -docket worker \ - --concurrency 100 \ - --redelivery-timeout 30s \ - --minimum-check-interval 50ms \ - --scheduling-resolution 100ms -``` +### Signal Handling -**Long-running, resource-intensive tasks:** - -```bash -docket worker \ - --concurrency 5 \ - --redelivery-timeout 1h \ - --minimum-check-interval 1s \ - --scheduling-resolution 5s -``` - -**Mixed workload with perpetual tasks:** - -```bash -docket worker \ - --concurrency 25 \ - --redelivery-timeout 5m \ - --schedule-automatic-tasks \ - --tasks myapp.tasks:all_tasks,myapp.monitoring:health_checks -``` +Workers catch `SIGTERM` and `SIGINT` and shut down gracefully — they stop accepting new tasks and wait for in-flight tasks to finish before exiting. On container orchestrators like Kubernetes, set `terminationGracePeriodSeconds` to be longer than your slowest expected task so the worker has time to drain. Tasks that don't finish before the grace period expires will be redelivered to other workers based on `redelivery_timeout`. ## Connection Management ### Redis Connection Pools -Docket automatically manages Redis connection pools, but you can tune them for your environment: +Docket automatically manages Redis connection pools. 
To use a custom pool: ```python from redis.asyncio import ConnectionPool -# Custom connection pool for high-concurrency workers pool = ConnectionPool.from_url( "redis://redis.prod.com:6379/0", max_connections=50, # Match or exceed worker concurrency @@ -161,18 +92,9 @@ pool = ConnectionPool.from_url( ) async with Docket(name="orders", connection_pool=pool) as docket: - # Use the custom pool pass ``` -### Redis Requirements - -Docket supports both standalone Redis and Redis Cluster deployments. For high availability, consider: - -- **Managed Redis services** like AWS ElastiCache, Google Cloud Memorystore, or Redis Cloud -- **Redis Cluster** for horizontal scaling and automatic failover -- **Redis replicas** with manual failover procedures for standalone deployments - ### Redis Cluster Support Docket supports Redis Cluster using the `redis+cluster://` URL scheme: @@ -313,13 +235,6 @@ With `execution_ttl=0`: - **Maximum throughput**: Minimizes Redis operations per task - **get_result() unavailable**: Cannot retrieve task results -This mode is ideal for: - -- High-volume event processing (logging, metrics, notifications) -- Fire-and-forget operations where results don't matter -- Systems where observability is handled externally -- Maximizing task throughput at the expense of visibility - ### Result Storage Configuration Task results are stored using the `py-key-value-aio` library. By default, Docket uses `RedisStore` but you can provide a custom storage backend: @@ -347,7 +262,7 @@ async with Docket( Custom storage backends must implement the `KeyValueStore` protocol from `py-key-value-aio`. 
-### Result Storage Best Practices +### Result Storage Tips **Separate Redis Database**: Store results in a different Redis database than task queues: @@ -491,108 +406,69 @@ Log entries include: - Redis connection status - Strike/restore operations -### Example Grafana Dashboard - -Monitor Docket health with queries like: - -```promql -# Task throughput -rate(docket_tasks_completed[5m]) +## Striking and Restoring Tasks -# Error rate -rate(docket_tasks_failed[5m]) / rate(docket_tasks_started[5m]) +Striking temporarily disables tasks without redeploying code. -# Queue depth trending -docket_queue_depth +### Striking Entire Task Types -# P95 task duration -histogram_quantile(0.95, rate(docket_task_duration_bucket[5m])) +Disable all instances of a specific task: -# Worker availability -up{job="docket-workers"} -``` - -## Production Guidelines - -### Capacity Planning +```python +# Disable all order processing during maintenance +await docket.strike(process_order) -**Estimate concurrent tasks:** +# Orders added during this time won't be processed +await docket.add(process_order)(order_id=12345) # Won't run +await docket.add(process_order)(order_id=67890) # Won't run +# Re-enable when ready +await docket.restore(process_order) ``` -concurrent_tasks = avg_task_duration * tasks_per_second -worker_concurrency = concurrent_tasks * 1.2 # 20% buffer -``` - -**Size worker pools:** -- Start with 1-2 workers per CPU core -- Monitor CPU and memory usage -- Scale horizontally rather than increasing concurrency indefinitely +### Striking by Parameter Values -### Deployment Strategies +Disable tasks based on their arguments using comparison operators: -**Blue-green deployments:** +```python +# Block all tasks for a problematic customer +await docket.strike(None, "customer_id", "==", "12345") -```bash -# Deploy new workers with different name -docket worker --name orders-worker-v2 --tasks myapp.tasks:v2_tasks +# Block low-priority work during high load +await 
docket.strike(process_order, "priority", "<=", "low") -# Gradually strike old task versions -docket strike old_task_function +# Block all orders above a certain value during fraud investigation +await docket.strike(process_payment, "amount", ">", 10000) -# Scale down old workers after tasks drain +# Later, restore them +await docket.restore(None, "customer_id", "==", "12345") +await docket.restore(process_order, "priority", "<=", "low") ``` -### Error Handling - -**Configure appropriate retries:** +Supported operators include `==`, `!=`, `<`, `<=`, `>`, `>=`. -```python -# Transient failures - short delays -async def api_call( - retry: Retry = Retry(attempts=3, delay=timedelta(seconds=5)) -): ... - -# Infrastructure issues - exponential backoff -async def database_sync( - retry: ExponentialRetry = ExponentialRetry( - attempts=5, - minimum_delay=timedelta(seconds=30), - maximum_delay=timedelta(minutes=10) - ) -): ... - -# Critical operations - unlimited retries -async def financial_transaction( - retry: Retry = Retry(attempts=None, delay=timedelta(minutes=1)) -): ... 
-``` +### Striking Specific Task-Parameter Combinations -**Dead letter handling:** +Target very specific scenarios: ```python -async def process_order(order_id: str) -> None: - try: - await handle_order(order_id) - except CriticalError as e: - # Send to dead letter queue for manual investigation - await send_to_dead_letter_queue(order_id, str(e)) - raise -``` - -### Operational Procedures +# Block only high-value orders for a specific customer +await docket.strike(process_order, "customer_id", "==", "12345") +await docket.strike(process_order, "amount", ">", 1000) -**Graceful shutdown:** +# This order won't run (blocked customer) +await docket.add(process_order)(customer_id="12345", amount=500) -```bash -# Workers handle SIGTERM gracefully -kill -TERM $WORKER_PID +# This order won't run (blocked customer AND high amount) +await docket.add(process_order)(customer_id="12345", amount=2000) -# Or use container orchestration stop signals -docker stop docket-worker +# This order WILL run (different customer) +await docket.add(process_order)(customer_id="67890", amount=2000) ``` -**Emergency task blocking:** +### Striking from the CLI + +You can also strike and restore tasks from the command line: ```bash # Block problematic tasks immediately @@ -605,33 +481,17 @@ docket strike process_order customer_id == "problematic-customer" docket restore problematic_function ``` -**Monitoring checklist:** +## Built-in Utility Tasks -- Queue depth alerts (tasks backing up) -- Error rate alerts (> 5% failure rate) -- Task duration alerts (P95 > expected) -- Worker availability alerts -- Redis connection health +Docket provides utility tasks for smoke-testing and debugging: -### Scaling Considerations - -**Horizontal scaling:** - -- Add workers across multiple machines -- Use consistent worker naming for monitoring -- Monitor Redis memory usage as task volume grows - -**Vertical scaling:** - -- Increase worker concurrency for I/O bound tasks -- Increase memory limits for large task 
payloads -- Monitor CPU usage to avoid oversubscription - -**Redis scaling:** +```python +from docket import tasks -- Use managed Redis services for high availability -- Deploy Redis Cluster for horizontal scaling with the `redis+cluster://` URL scheme -- Monitor memory usage and eviction policies -- Scale vertically for larger workloads on standalone Redis +# Simple trace logging +await docket.add(tasks.trace)("System startup completed") +await docket.add(tasks.trace)("Processing batch 123") -Running Docket in production requires attention to these operational details, but the Redis-based architecture and monitoring support can help with demanding production workloads. +# Intentional failures for testing error handling +await docket.add(tasks.fail)("Testing error notification system") +``` diff --git a/docs/task-behaviors.md b/docs/task-behaviors.md new file mode 100644 index 00000000..b01128f5 --- /dev/null +++ b/docs/task-behaviors.md @@ -0,0 +1,480 @@ +# Task Behaviors + +Docket tasks can declare behavioral defaults like retries, timeouts, progress reporting, and concurrency limits as default parameter values. Docket's dependency injection resolves them at execution time, so you just write a normal function signature. + +## Perpetual Tasks + +Perpetual tasks automatically reschedule themselves, making them well-suited for recurring work like health checks, data synchronization, or periodic cleanup operations. 
+ +### Basic Perpetual Tasks + +```python +from docket import Perpetual + +async def health_check_service( + service_url: str, + perpetual: Perpetual = Perpetual(every=timedelta(minutes=5)) +) -> None: + try: + response = await http_client.get(f"{service_url}/health") + response.raise_for_status() + print(f"✓ {service_url} is healthy") + except Exception as e: + print(f"✗ {service_url} failed health check: {e}") + await send_alert(f"Service {service_url} is down") + +# Schedule the task once, it will run every 5 minutes forever +await docket.add(health_check_service)("https://api.example.com") +``` + +After each execution, the task automatically schedules itself to run again after the specified interval. + +### Automatic Startup + +Perpetual tasks can start themselves automatically when a worker sees them, without needing to be explicitly scheduled: + +```python +async def background_cleanup( + perpetual: Perpetual = Perpetual( + every=timedelta(hours=1), + automatic=True + ) +) -> None: + deleted_count = await cleanup_old_records() + print(f"Cleaned up {deleted_count} old records") + +# Just register the task - no need to schedule it +docket.register(background_cleanup) + +# When a worker starts, it will automatically begin running this task +# The task key will be the function name: "background_cleanup" +``` + +### Self-Canceling Tasks + +Perpetual tasks can stop themselves when their work is done: + +```python +async def monitor_deployment( + deployment_id: str, + perpetual: Perpetual = Perpetual(every=timedelta(seconds=30)) +) -> None: + status = await check_deployment_status(deployment_id) + + if status in ["completed", "failed"]: + await notify_deployment_finished(deployment_id, status) + perpetual.cancel() # Stop monitoring this deployment + return + + print(f"Deployment {deployment_id} status: {status}") +``` + +### Dynamic Parameters + +Perpetual tasks can change their arguments or timing for the next execution: + +```python +async def 
adaptive_rate_limiter( + api_endpoint: str, + requests_per_minute: int = 60, + perpetual: Perpetual = Perpetual(every=timedelta(minutes=1)) +) -> None: + # Check current API load + current_load = await check_api_load(api_endpoint) + + if current_load > 0.8: # High load + new_rate = max(30, requests_per_minute - 10) + perpetual.every = timedelta(seconds=30) # Check more frequently + print(f"High load detected, reducing rate to {new_rate} req/min") + else: # Normal load + new_rate = min(120, requests_per_minute + 5) + perpetual.every = timedelta(minutes=1) # Normal check interval + print(f"Normal load, increasing rate to {new_rate} req/min") + + # Schedule next run with updated parameters + perpetual.perpetuate(api_endpoint, new_rate) +``` + +### Error Resilience + +Perpetual tasks automatically reschedule themselves regardless of success or failure: + +```python +async def resilient_sync( + source_url: str, + perpetual: Perpetual = Perpetual(every=timedelta(minutes=15)) +) -> None: + # This will ALWAYS reschedule, whether it succeeds or fails + await sync_data_from_source(source_url) + print(f"Successfully synced data from {source_url}") +``` + +You don't need try/except blocks to ensure rescheduling - Docket handles this automatically. Whether the task completes successfully or raises an exception, the next execution will be scheduled according to the `every` interval. + +## Cron Tasks + +For tasks that need to run at specific wall-clock times rather than at fixed intervals, use the `Cron` dependency. It extends `Perpetual` with cron expression support, scheduling the next run at the exact matching time after each execution. 
+ +### Basic Cron Expressions + +```python +from docket import Cron + +async def weekly_report(cron: Cron = Cron("0 9 * * 1")) -> None: + # Runs every Monday at 9:00 AM UTC + await generate_and_send_report() + +async def hourly_sync(cron: Cron = Cron("0 * * * *")) -> None: + # Runs at the top of every hour + await sync_external_data() +``` + +Cron uses standard 5-field syntax: `minute hour day month weekday`. + +### Vixie Keywords + +For common schedules, use the shorthand keywords: + +```python +async def daily_cleanup(cron: Cron = Cron("@daily")) -> None: + await cleanup_old_records() + +async def hourly_check(cron: Cron = Cron("@hourly")) -> None: + await check_service_health() +``` + +Supported keywords: `@yearly`, `@annually`, `@monthly`, `@weekly`, `@daily`, `@midnight`, `@hourly`. + +### Timezone Support + +By default, cron expressions are interpreted in UTC. Pass a `tz` argument to use a different timezone — this handles daylight saving time transitions automatically: + +```python +from zoneinfo import ZoneInfo + +async def morning_standup( + cron: Cron = Cron("0 9 * * 1-5", tz=ZoneInfo("America/Los_Angeles")) +) -> None: + # Runs weekdays at 9:00 AM Pacific, adjusting for DST + await send_standup_reminder() + +async def tokyo_report( + cron: Cron = Cron("30 17 * * *", tz=ZoneInfo("Asia/Tokyo")) +) -> None: + # Runs at 5:30 PM JST every day + await generate_daily_report() +``` + +### Automatic Scheduling + +Like `Perpetual`, cron tasks default to `automatic=True`, meaning they start themselves when a worker sees them — no explicit `docket.add()` call needed: + +```python +# Just register the task; the worker handles scheduling +docket.register(weekly_report) +docket.register(daily_cleanup) +``` + +Since cron tasks are automatic by default, they must not require any arguments. 
+ +## Retrying Tasks + +### Exponential Backoff + +For services that might be overloaded, exponential backoff gives them time to recover: + +```python +from docket import ExponentialRetry + +async def call_external_api( + url: str, + retry: ExponentialRetry = ExponentialRetry( + attempts=5, + minimum_delay=timedelta(seconds=1), + maximum_delay=timedelta(minutes=5) + ) +) -> None: + # Retries with delays: 1s, 2s, 4s, 8s, 16s (but capped at 5 minutes) + try: + response = await http_client.get(url) + response.raise_for_status() + print(f"API call succeeded on attempt {retry.attempt}") + except Exception as e: + print(f"Attempt {retry.attempt} failed: {e}") + raise +``` + +### Unlimited Retries + +For critical tasks that must eventually succeed, use `attempts=None`: + +```python +from docket import Retry + +async def critical_data_sync( + source_url: str, + retry: Retry = Retry(attempts=None, delay=timedelta(minutes=5)) +) -> None: + # This will retry forever with 5-minute delays until it succeeds + await sync_critical_data(source_url) + print(f"Critical sync completed after {retry.attempt} attempts") +``` + +Both `Retry` and `ExponentialRetry` support unlimited retries this way. 
+ +## Task Timeouts + +Prevent tasks from running too long with the `Timeout` dependency: + +```python +from docket import Timeout + +async def data_processing_task( + large_dataset: dict, + timeout: Timeout = Timeout(timedelta(minutes=10)) +) -> None: + # This task will be cancelled if it runs longer than 10 minutes + await process_dataset_phase_one(large_dataset) + + # Extend timeout if we need more time for phase two + timeout.extend(timedelta(minutes=5)) + await process_dataset_phase_two(large_dataset) +``` + +The `extend()` method can take a specific duration or default to the original timeout duration: + +```python +async def adaptive_timeout_task( + timeout: Timeout = Timeout(timedelta(minutes=2)) +) -> None: + await quick_check() + + # Extend by the base timeout (another 2 minutes) + timeout.extend() + await longer_operation() +``` + +Timeouts work alongside retries. If a task times out, it can be retried according to its retry policy. + +## Reporting Task Progress + +The `Progress()` dependency provides access to the current task's progress tracker, allowing tasks to report their progress to external observers: + +```python +from docket import Progress +from docket.execution import ExecutionProgress + +async def import_records( + file_path: str, + progress: ExecutionProgress = Progress() +) -> None: + records = await load_records(file_path) + + # Set the total number of items to process + await progress.set_total(len(records)) + await progress.set_message("Starting import") + + for i, record in enumerate(records, 1): + await import_record(record) + + # Update progress atomically + await progress.increment() + + # Optionally update status message + if i % 100 == 0: + await progress.set_message(f"Imported {i}/{len(records)} records") + + await progress.set_message("Import complete") +``` + +Progress updates are: + +- **Atomic**: `increment()` uses Redis HINCRBY for thread-safe updates +- **Real-time**: Updates published via pub/sub for live monitoring +- 
**Observable**: Can be monitored with `docket watch` CLI or programmatically +- **Ephemeral**: Progress data is automatically deleted when the task completes + +The `ExecutionProgress` object provides these methods: + +- `set_total(total: int)`: Set the target/total value for progress tracking +- `increment(amount: int = 1)`: Atomically increment the current progress value +- `set_message(message: str)`: Update the status message +- `sync()`: Refresh local state from Redis + +For more details on progress monitoring patterns and real-time observation, see [Task Observability](observability.md). + +## Concurrency Control + +Docket provides fine-grained concurrency control that allows you to limit the number of concurrent tasks based on specific argument values. This is essential for protecting shared resources, preventing overwhelming external services, and managing database connections. + +### Basic Concurrency Limits + +Use `ConcurrencyLimit` to restrict concurrent execution based on task arguments: + +```python +from docket import ConcurrencyLimit + +async def process_customer_data( + customer_id: int, + concurrency: ConcurrencyLimit = ConcurrencyLimit("customer_id", max_concurrent=1) +) -> None: + # Only one task per customer_id can run at a time + await update_customer_profile(customer_id) + await recalculate_customer_metrics(customer_id) + +# These will run sequentially for the same customer +await docket.add(process_customer_data)(customer_id=1001) +await docket.add(process_customer_data)(customer_id=1001) +await docket.add(process_customer_data)(customer_id=1001) + +# But different customers can run concurrently +await docket.add(process_customer_data)(customer_id=2001) # Runs in parallel +await docket.add(process_customer_data)(customer_id=3001) # Runs in parallel +``` + +### Database Connection Pooling + +Limit concurrent database operations to prevent overwhelming your database: + +```python +async def backup_database_table( + db_name: str, + table_name: 
str, + concurrency: ConcurrencyLimit = ConcurrencyLimit("db_name", max_concurrent=2) +) -> None: + # Maximum 2 backup operations per database at once + await create_table_backup(db_name, table_name) + await verify_backup_integrity(db_name, table_name) + +# Schedule many backup tasks - only 2 per database will run concurrently +tables = ["users", "orders", "products", "analytics", "logs"] +for table in tables: + await docket.add(backup_database_table)("production", table) + await docket.add(backup_database_table)("staging", table) +``` + +### API Rate Limiting + +Protect external APIs from being overwhelmed: + +```python +async def sync_user_with_external_service( + user_id: int, + service_name: str, + concurrency: ConcurrencyLimit = ConcurrencyLimit("service_name", max_concurrent=5) +) -> None: + # Limit to 5 concurrent API calls per external service + api_client = get_api_client(service_name) + user_data = await fetch_user_data(user_id) + await api_client.sync_user(user_data) + +# These respect per-service limits +await docket.add(sync_user_with_external_service)(123, "salesforce") +await docket.add(sync_user_with_external_service)(456, "salesforce") # Will queue if needed +await docket.add(sync_user_with_external_service)(789, "hubspot") # Different service, runs in parallel +``` + +### File Processing Limits + +Control concurrent file operations to manage disk I/O: + +```python +async def process_media_file( + file_path: str, + operation_type: str, + concurrency: ConcurrencyLimit = ConcurrencyLimit("operation_type", max_concurrent=3) +) -> None: + # Limit concurrent operations by type (e.g., 3 video transcodes, 3 image resizes) + if operation_type == "video_transcode": + await transcode_video(file_path) + elif operation_type == "image_resize": + await resize_image(file_path) + elif operation_type == "audio_compress": + await compress_audio(file_path) + +# Different operation types can run concurrently, but each type is limited +await 
docket.add(process_media_file)("/videos/movie1.mp4", "video_transcode") +await docket.add(process_media_file)("/videos/movie2.mp4", "video_transcode") +await docket.add(process_media_file)("/images/photo1.jpg", "image_resize") # Runs in parallel +``` + +### Custom Scopes + +Use custom scopes to create independent concurrency limits: + +```python +async def process_tenant_data( + tenant_id: str, + operation: str, + concurrency: ConcurrencyLimit = ConcurrencyLimit( + "tenant_id", + max_concurrent=2, + scope="tenant_operations" + ) +) -> None: + # Each tenant can have up to 2 concurrent operations + await perform_tenant_operation(tenant_id, operation) + +async def process_global_data( + data_type: str, + concurrency: ConcurrencyLimit = ConcurrencyLimit( + "data_type", + max_concurrent=1, + scope="global_operations" # Separate from tenant operations + ) +) -> None: + # Global operations have their own concurrency limits + await process_global_data_type(data_type) +``` + +### Multi-Level Concurrency + +Combine multiple concurrency controls for complex scenarios: + +```python +async def process_user_export( + user_id: int, + export_type: str, + region: str, + user_limit: ConcurrencyLimit = ConcurrencyLimit("user_id", max_concurrent=1), + type_limit: ConcurrencyLimit = ConcurrencyLimit("export_type", max_concurrent=3), + region_limit: ConcurrencyLimit = ConcurrencyLimit("region", max_concurrent=10) +) -> None: + # This task respects ALL concurrency limits: + # - Only 1 export per user at a time + # - Only 3 exports of each type globally + # - Only 10 exports per region + await generate_user_export(user_id, export_type, region) +``` + +**Note**: When using multiple `ConcurrencyLimit` dependencies, all limits must be satisfied before the task can start. 
+ +### Monitoring Concurrency + +Concurrency limits are enforced using Redis sets, so you can monitor them: + +```python +async def monitor_concurrency_usage() -> None: + async with docket.redis() as redis: + # Check how many tasks are running for a specific limit + active_count = await redis.scard("docket:concurrency:customer_id:1001") + print(f"Customer 1001 has {active_count} active tasks") + + # List all active concurrency keys + keys = await redis.keys("docket:concurrency:*") + for key in keys: + count = await redis.scard(key) + print(f"{key}: {count} active tasks") +``` + +### Tips + +1. **Choose appropriate argument names**: Use arguments that represent the resource you want to protect (database name, customer ID, API endpoint). + +2. **Set reasonable limits**: Base limits on your system's capacity and external service constraints. + +3. **Use descriptive scopes**: When you have multiple unrelated concurrency controls, use different scopes to avoid conflicts. + +4. **Monitor blocked tasks**: Tasks that can't start due to concurrency limits are automatically rescheduled with small delays. + +5. **Consider cascading effects**: Concurrency limits can create queuing effects - monitor your system to ensure tasks don't back up excessively. diff --git a/docs/task-patterns.md b/docs/task-patterns.md new file mode 100644 index 00000000..83ad9ac2 --- /dev/null +++ b/docs/task-patterns.md @@ -0,0 +1,245 @@ +# Task Design Patterns + +Docket is made for building complex distributed systems, and the patterns below highlight some of the original use cases for Docket. 
+
+## Find & Flood Pattern
+
+A common perpetual task pattern is "find & flood" - a single perpetual task that periodically discovers work to do, then creates many smaller tasks to handle the actual work:
+
+```python
+from docket import CurrentDocket, Docket, Perpetual
+
+async def find_pending_orders(
+    docket: Docket = CurrentDocket(),
+    perpetual: Perpetual = Perpetual(every=timedelta(minutes=1))
+) -> None:
+    # Find all orders that need processing
+    pending_orders = await database.fetch_pending_orders()
+
+    # Flood the queue with individual processing tasks
+    for order in pending_orders:
+        await docket.add(process_single_order)(order.id)
+
+    print(f"Queued {len(pending_orders)} orders for processing")
+
+async def process_single_order(order_id: int) -> None:
+    # Handle one specific order
+    await process_order_payment(order_id)
+    await update_inventory(order_id)
+    await send_confirmation_email(order_id)
+```
+
+This pattern separates discovery (finding work) from execution (doing work), allowing for better load distribution and fault isolation. The perpetual task stays lightweight and fast, while the actual work is distributed across many workers.
+
+## Task Scattering with Agenda
+
+For "find-and-flood" workloads, you often want to distribute a batch of tasks over time rather than scheduling them all immediately. The `Agenda` class collects related tasks and scatters them evenly across a time window.
+ +### Basic Scattering + +```python +from datetime import timedelta +from docket import Agenda, Docket + +async def process_item(item_id: int) -> None: + await perform_expensive_operation(item_id) + await update_database(item_id) + +async with Docket() as docket: + # Build an agenda of tasks + agenda = Agenda() + for item_id in range(1, 101): # 100 items to process + agenda.add(process_item)(item_id) + + # Scatter them evenly over 50 minutes to avoid overwhelming the system + executions = await agenda.scatter(docket, over=timedelta(minutes=50)) + print(f"Scheduled {len(executions)} tasks over 50 minutes") +``` + +Tasks are distributed evenly across the time window. For 100 tasks over 50 minutes, they'll be scheduled approximately 30 seconds apart. + +### Jitter for Thundering Herd Prevention + +Add random jitter to prevent multiple processes from scheduling identical work at exactly the same times: + +```python +# Scatter with ±30 second jitter around each scheduled time +await agenda.scatter( + docket, + over=timedelta(minutes=50), + jitter=timedelta(seconds=30) +) +``` + +### Future Scatter Windows + +Schedule the entire batch to start at a specific time in the future: + +```python +from datetime import datetime, timezone + +# Start scattering in 2 hours, spread over 30 minutes +start_time = datetime.now(timezone.utc) + timedelta(hours=2) +await agenda.scatter( + docket, + start=start_time, + over=timedelta(minutes=30) +) +``` + +### Mixed Task Types + +Agendas can contain different types of tasks: + +```python +async def send_email(user_id: str, template: str) -> None: + await email_service.send(user_id, template) + +async def update_analytics(event_data: dict[str, str]) -> None: + await analytics_service.track(event_data) + +# Create a mixed agenda +agenda = Agenda() +agenda.add(process_item)(item_id=1001) +agenda.add(send_email)("user123", "welcome") +agenda.add(update_analytics)({"event": "signup", "user": "user123"}) +agenda.add(process_item)(item_id=1002) 
+ +# All tasks will be scattered in the order they were added +await agenda.scatter(docket, over=timedelta(minutes=10)) +``` + +### Single Task Positioning + +When scattering a single task, it's positioned at the midpoint of the time window: + +```python +agenda = Agenda() +agenda.add(process_item)(item_id=42) + +# This task will be scheduled 5 minutes from now (middle of 10-minute window) +await agenda.scatter(docket, over=timedelta(minutes=10)) +``` + +### Agenda Reusability + +Agendas can be reused for multiple scatter operations: + +```python +# Create a reusable template +daily_cleanup_agenda = Agenda() +daily_cleanup_agenda.add(cleanup_temp_files)() +daily_cleanup_agenda.add(compress_old_logs)() +daily_cleanup_agenda.add(update_metrics)() + +# Use it multiple times with different timing +await daily_cleanup_agenda.scatter(docket, over=timedelta(hours=1)) + +# Later, scatter the same tasks over a different window +tomorrow = datetime.now(timezone.utc) + timedelta(days=1) +await daily_cleanup_agenda.scatter( + docket, + start=tomorrow, + over=timedelta(minutes=30) +) +``` + +### Failure Behavior + +Keep in mind that, if an error occurs during scheduling, some tasks may have already been scheduled successfully: + +```python +agenda = Agenda() +agenda.add(valid_task)("arg1") +agenda.add(valid_task)("arg2") +agenda.add("nonexistent_task")("arg3") # This will cause an error +agenda.add(valid_task)("arg4") + +try: + await agenda.scatter(docket, over=timedelta(minutes=10)) +except KeyError: + # The first two tasks were scheduled successfully + # The error prevented the fourth task from being scheduled + pass +``` + +## Task Chain Patterns + +### Sequential Processing + +Create chains of related tasks that pass data forward: + +```python +async def download_data( + url: str, + docket: Docket = CurrentDocket() +) -> None: + file_path = await download_file(url) + await docket.add(validate_data)(file_path) + +async def validate_data( + file_path: str, + docket: Docket = 
CurrentDocket() +) -> None: + if await is_valid_data(file_path): + await docket.add(process_data)(file_path) + else: + await docket.add(handle_invalid_data)(file_path) + +async def process_data(file_path: str) -> None: + # Final processing step + await transform_and_store(file_path) +``` + +### Fan-out Processing + +Break large tasks into parallel subtasks: + +```python +async def process_large_dataset( + dataset_id: str, + docket: Docket = CurrentDocket() +) -> None: + chunk_ids = await split_dataset_into_chunks(dataset_id) + + # Schedule parallel processing of all chunks + for chunk_id in chunk_ids: + await docket.add(process_chunk)(dataset_id, chunk_id) + + # Schedule a task to run after all chunks should be done + estimated_completion = datetime.now(timezone.utc) + timedelta(hours=2) + await docket.add( + finalize_dataset, + when=estimated_completion, + key=f"finalize-{dataset_id}" + )(dataset_id, len(chunk_ids)) + +async def process_chunk(dataset_id: str, chunk_id: str) -> None: + await process_data_chunk(dataset_id, chunk_id) + await mark_chunk_complete(dataset_id, chunk_id) +``` + +### Conditional Workflows + +Tasks can make decisions about what work to schedule next: + +```python +async def analyze_user_behavior( + user_id: str, + docket: Docket = CurrentDocket() +) -> None: + behavior_data = await collect_user_behavior(user_id) + + if behavior_data.indicates_churn_risk(): + await docket.add(create_retention_campaign)(user_id) + elif behavior_data.indicates_upsell_opportunity(): + await docket.add(create_upsell_campaign)(user_id) + elif behavior_data.indicates_satisfaction(): + # Schedule a follow-up check in 30 days + future_check = datetime.now(timezone.utc) + timedelta(days=30) + await docket.add( + analyze_user_behavior, + when=future_check, + key=f"behavior-check-{user_id}" + )(user_id) +``` diff --git a/docs/testing.md b/docs/testing.md index e55e4cf4..46c0d883 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -1,6 +1,6 @@ # Testing with Docket 
-Docket includes the utilities you need to test all your background task systems in realistic ways. The ergonomic design supports testing complex workflows with minimal setup. +Docket includes utilities for testing background task systems in realistic ways with minimal setup. ## Using In-Memory Backend (No Redis Required) @@ -422,5 +422,3 @@ async def test_scheduled_task_timing(test_docket: Docket, test_worker: Worker) - assert reminder_was_sent(123) ``` - -Docket's testing utilities make it straightforward to write comprehensive tests for even complex distributed task workflows. The key is using [`run_until_finished()`](api-reference.md#docket.Worker.run_until_finished) for deterministic execution and [`run_at_most()`](api-reference.md#docket.Worker.run_at_most) for controlling perpetual or self-scheduling tasks. diff --git a/mkdocs.yml b/mkdocs.yml index de850b4c..4f8e43a6 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -4,8 +4,12 @@ site_url: https://docket.lol/ repo_url: https://github.com/chrisguidry/docket repo_name: chrisguidry/docket +edit_uri: edit/main/docs/ + theme: name: material + icon: + logo: material/clipboard-check-outline features: - navigation.instant - navigation.tracking @@ -14,6 +18,9 @@ theme: - navigation.top - search.suggest - search.highlight + - content.code.copy + - content.action.edit + - content.action.view palette: - media: "(prefers-color-scheme: light)" scheme: default @@ -55,11 +62,20 @@ markdown_extensions: - attr_list - md_in_html +extra: + social: + - icon: fontawesome/brands/github + link: https://github.com/chrisguidry/docket + - icon: fontawesome/brands/python + link: https://pypi.org/project/pydocket/ + nav: - Home: index.md - Getting Started: getting-started.md - - Dependencies Guide: dependencies.md + - Task Behaviors: task-behaviors.md + - Dependency Injection: dependency-injection.md + - Task Design Patterns: task-patterns.md + - Task Observability: observability.md - Testing with Docket: testing.md - - Advanced Task 
Patterns: advanced-patterns.md - Docket in Production: production.md - API Reference: api-reference.md diff --git a/pyproject.toml b/pyproject.toml index 15737245..eff9cadd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,9 +72,6 @@ dev = [ "pytest-xdist>=3.6.1", "ruff>=0.14.14", "types-croniter>=6", -] - -docs = [ "zensical", "mkdocstrings>=1.0.2", "mkdocstrings-python>=2.0.1", diff --git a/uv.lock b/uv.lock index 86e6498e..ae47e217 100644 --- a/uv.lock +++ b/uv.lock @@ -1581,6 +1581,8 @@ dev = [ { name = "docker" }, { name = "ipython" }, { name = "loq" }, + { name = "mkdocstrings" }, + { name = "mkdocstrings-python" }, { name = "mypy" }, { name = "opentelemetry-distro" }, { name = "opentelemetry-exporter-otlp" }, @@ -1602,10 +1604,6 @@ dev = [ { name = "ruff" }, { name = "types-croniter" }, { name = "urllib3" }, -] -docs = [ - { name = "mkdocstrings" }, - { name = "mkdocstrings-python" }, { name = "zensical" }, ] examples = [ @@ -1641,6 +1639,8 @@ dev = [ { name = "docker", specifier = ">=7.1.0" }, { name = "ipython", specifier = ">=8.0.0" }, { name = "loq", specifier = ">=0.1.0a3" }, + { name = "mkdocstrings", specifier = ">=1.0.2" }, + { name = "mkdocstrings-python", specifier = ">=2.0.1" }, { name = "mypy", specifier = ">=1.14.1" }, { name = "opentelemetry-distro", specifier = ">=0.60b0" }, { name = "opentelemetry-exporter-otlp", specifier = ">=1.33.0" }, @@ -1662,10 +1662,6 @@ dev = [ { name = "ruff", specifier = ">=0.14.14" }, { name = "types-croniter", specifier = ">=6" }, { name = "urllib3", specifier = ">=2.6.3" }, -] -docs = [ - { name = "mkdocstrings", specifier = ">=1.0.2" }, - { name = "mkdocstrings-python", specifier = ">=2.0.1" }, { name = "zensical" }, ] examples = [