From b486a29184a194d0c9f60455a6d4c5ae987b98ee Mon Sep 17 00:00:00 2001 From: Luis Saavedra Date: Sun, 9 Aug 2020 23:52:40 -0400 Subject: [PATCH] fix retry --- celery_singleton/singleton.py | 36 +++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/celery_singleton/singleton.py b/celery_singleton/singleton.py index 83d433e..209018e 100644 --- a/celery_singleton/singleton.py +++ b/celery_singleton/singleton.py @@ -22,6 +22,10 @@ class Singleton(BaseTask): raise_on_duplicate = None lock_expiry = None + def __init__(self, *args, **kwargs): + self._unlock_to_super_retry = False + super(Singleton, self).__init__(*args, **kwargs) + @property def _raise_on_duplicate(self): if self.raise_on_duplicate is not None: @@ -78,17 +82,8 @@ def generate_lock(self, task_name, task_args=None, task_kwargs=None): key_prefix=self.singleton_config.key_prefix, ) - def apply_async( - self, - args=None, - kwargs=None, - task_id=None, - producer=None, - link=None, - link_error=None, - shadow=None, - **options - ): + def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, + link=None, link_error=None, shadow=None, **options): args = args or [] kwargs = kwargs or {} task_id = task_id or uuid() @@ -120,14 +115,15 @@ def apply_async( def lock_and_run(self, lock, *args, task_id=None, **kwargs): lock_aquired = self.aquire_lock(lock, task_id) - if lock_aquired: + if lock_aquired or self._unlock_to_super_retry: try: return super(Singleton, self).apply_async( *args, task_id=task_id, **kwargs ) except Exception: # Clear the lock if apply_async fails - self.unlock(lock) + if lock_aquired: + self.unlock(lock) raise def release_lock(self, task_args=None, task_kwargs=None): @@ -140,7 +136,9 @@ def unlock(self, lock): def on_duplicate(self, existing_task_id): if self._raise_on_duplicate: raise DuplicateTaskError( - "Attempted to queue a duplicate of task ID {}".format(existing_task_id), + "Attempted to queue a duplicate of task ID {}".format( + existing_task_id + ), task_id=existing_task_id, ) return self.AsyncResult(existing_task_id) @@ -150,3 +148,13 @@ def on_failure(self, exc, task_id, args, kwargs, einfo): def on_success(self, retval, task_id, args, kwargs): self.release_lock(task_args=args, task_kwargs=kwargs) + + def retry(self, args=None, kwargs=None, exc=None, throw=True, + eta=None, countdown=None, max_retries=None, **options): + self._unlock_to_super_retry = True + retry_task = super(Singleton, self).retry( + args, kwargs, exc, throw, eta, countdown, max_retries, **options + ) + self._unlock_to_super_retry = False + + return retry_task