1414from enum import Enum
1515from random import shuffle
1616from types import FrameType
17- from typing import List , Optional , Tuple , Any , Iterable , Collection
17+ from typing import List , Optional , Tuple , Any , Iterable , Collection , Union
1818
1919import scheduler
2020from scheduler .helpers .queues import get_queue
@@ -63,7 +63,7 @@ class QueueConnectionDiscrepancyError(Exception):
6363)
6464
6565
66- def signal_name (signum ) -> str :
66+ def signal_name (signum : int ) -> str :
6767 try :
6868 return signal .Signals (signum ).name
6969 except KeyError :
@@ -99,19 +99,19 @@ def from_model(cls, model: WorkerModel) -> Self:
9999 return res
100100
101101 def __init__ (
102- self ,
103- queues ,
104- name : str ,
105- connection : ConnectionType ,
106- maintenance_interval : int = SCHEDULER_CONFIG .DEFAULT_MAINTENANCE_TASK_INTERVAL ,
107- job_monitoring_interval = SCHEDULER_CONFIG .DEFAULT_JOB_MONITORING_INTERVAL ,
108- dequeue_strategy : DequeueStrategy = DequeueStrategy .DEFAULT ,
109- disable_default_exception_handler : bool = False ,
110- fork_job_execution : bool = True ,
111- with_scheduler : bool = True ,
112- burst : bool = False ,
113- model : Optional [WorkerModel ] = None ,
114- ): # noqa
102+ self ,
103+ queues : Iterable [ Union [ str , Queue ]] ,
104+ name : str ,
105+ connection : ConnectionType ,
106+ maintenance_interval : int = SCHEDULER_CONFIG .DEFAULT_MAINTENANCE_TASK_INTERVAL ,
107+ job_monitoring_interval : int = SCHEDULER_CONFIG .DEFAULT_JOB_MONITORING_INTERVAL ,
108+ dequeue_strategy : DequeueStrategy = DequeueStrategy .DEFAULT ,
109+ disable_default_exception_handler : bool = False ,
110+ fork_job_execution : bool = True ,
111+ with_scheduler : bool = True ,
112+ burst : bool = False ,
113+ model : Optional [WorkerModel ] = None ,
114+ ) -> None :
115115 self .fork_job_execution = fork_job_execution
116116 self .job_monitoring_interval : int = job_monitoring_interval
117117 self .maintenance_interval = maintenance_interval
@@ -232,7 +232,7 @@ def work(self, max_jobs: Optional[int] = None, max_idle_time: Optional[int] = No
232232
233233 timeout = None if self .burst else (SCHEDULER_CONFIG .DEFAULT_WORKER_TTL - 15 )
234234 job , queue = self .dequeue_job_and_maintain_ttl (timeout , max_idle_time )
235- if job is None :
235+ if job is None or queue is None :
236236 if self .burst :
237237 logger .info (f"[Worker { self .name } /{ self ._pid } ]: done, quitting" )
238238 break
@@ -267,7 +267,7 @@ def work(self, max_jobs: Optional[int] = None, max_idle_time: Optional[int] = No
267267 self .teardown ()
268268 return False
269269
270- def handle_job_failure (self , job : JobModel , queue : Queue , exc_string = "" ) -> None :
270+ def handle_job_failure (self , job : JobModel , queue : Queue , exc_string : str = "" ) -> None :
271271 """
272272 Handles the failure or an executing job by:
273273 1. Setting the job status to failed
@@ -312,7 +312,7 @@ def handle_job_failure(self, job: JobModel, queue: Queue, exc_string="") -> None
312312 # Ensure that custom exception handlers are called even if the Broker is down
313313 pass
314314
315- def bootstrap (self )-> None :
315+ def bootstrap (self ) -> None :
316316 """Bootstraps the worker.
317317 Runs the basic tasks that should run when the worker actually starts working.
318318 Used so that new workers can focus on the work loop implementation rather
@@ -327,7 +327,8 @@ def bootstrap(self)-> None:
327327 self ._model .has_scheduler = True
328328 self ._model .save (connection = self .connection )
329329 if self .with_scheduler and self .burst :
330- self .scheduler .request_stop_and_wait ()
330+ if self .scheduler is not None :
331+ self .scheduler .request_stop_and_wait ()
331332 self ._model .has_scheduler = False
332333 self ._model .save (connection = self .connection )
333334 qnames = [queue .name for queue in self .queues ]
@@ -375,8 +376,8 @@ def run_maintenance_tasks(self) -> None:
375376 self ._model .save (connection = self .connection )
376377
377378 def dequeue_job_and_maintain_ttl (
378- self , timeout : Optional [int ], max_idle_time : Optional [int ] = None
379- ) -> Tuple [JobModel , Queue ]:
379+ self , timeout : Optional [int ], max_idle_time : Optional [int ] = None
380+ ) -> Tuple [Optional [ JobModel ], Optional [ Queue ] ]:
380381 """Dequeues a job while maintaining the TTL.
381382 :param timeout: The timeout for the dequeue operation.
382383 :param max_idle_time: The maximum idle time for the worker.
@@ -550,7 +551,7 @@ def reorder_queues(self, reference_queue: Queue) -> None:
550551 return
551552 if self ._dequeue_strategy == DequeueStrategy .ROUND_ROBIN :
552553 pos = self ._ordered_queues .index (reference_queue )
553- self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
554+ self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
554555 return
555556 if self ._dequeue_strategy == DequeueStrategy .RANDOM :
556557 shuffle (self ._ordered_queues )
@@ -634,7 +635,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
634635 while True :
635636 try :
636637 with SCHEDULER_CONFIG .DEATH_PENALTY_CLASS (
637- self .job_monitoring_interval , JobExecutionMonitorTimeoutException
638+ self .job_monitoring_interval , JobExecutionMonitorTimeoutException
638639 ):
639640 retpid , ret_val = self .wait_for_job_execution_process ()
640641 break
@@ -877,7 +878,7 @@ class RoundRobinWorker(Worker):
877878
878879 def reorder_queues (self , reference_queue : Queue ) -> None :
879880 pos = self ._ordered_queues .index (reference_queue )
880- self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
881+ self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
881882
882883
883884class RandomWorker (Worker ):
0 commit comments