diff --git a/README.md b/README.md index 82906ba06b..c751916637 100644 --- a/README.md +++ b/README.md @@ -22,12 +22,12 @@ Available addons addon | version | maintainers | summary --- | --- | --- | --- [base_import_async](base_import_async/) | 18.0.1.0.0 | | Import CSV files in the background -[queue_job](queue_job/) | 18.0.2.0.2 | guewen | Job Queue +[queue_job](queue_job/) | 18.0.2.0.6 | guewen sbidoul | Job Queue [queue_job_batch](queue_job_batch/) | 18.0.1.0.0 | | Job Queue Batch [queue_job_cron](queue_job_cron/) | 18.0.1.1.1 | | Scheduled Actions as Queue Jobs [queue_job_cron_jobrunner](queue_job_cron_jobrunner/) | 18.0.1.0.1 | ivantodorovich | Run jobs without a dedicated JobRunner [queue_job_subscribe](queue_job_subscribe/) | 18.0.1.0.0 | | Control which users are subscribed to queue job notifications -[test_queue_job](test_queue_job/) | 18.0.2.0.0 | | Queue Job Tests +[test_queue_job](test_queue_job/) | 18.0.2.0.1 | sbidoul | Queue Job Tests [test_queue_job_batch](test_queue_job_batch/) | 18.0.1.0.0 | | Test Job Queue Batch [//]: # (end addons) diff --git a/queue_job/README.rst b/queue_job/README.rst index 88b5a4d00b..684a9ed058 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -11,7 +11,7 @@ Job Queue !! This file is generated by oca-gen-addon-readme !! !! changes will be overwritten. !! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! - !! source digest: sha256:58f9182440bb316576671959b69148ea5454958f9ae8db75bccd30c89012676d + !! source digest: sha256:9294f4c715c0f0e10a55590082388776b34763472ac72b2b88d0d464b31f42a3 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! .. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png @@ -627,21 +627,6 @@ Known issues / Roadmap - After creating a new database or installing ``queue_job`` on an existing database, Odoo must be restarted for the runner to detect it. -- When Odoo shuts down normally, it waits for running jobs to finish. 
- However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - ``started`` or ``enqueued`` state after the Odoo server is halted. - Since the runner has no way to know if they are actually running or - not, and does not know for sure if it is safe to restart the jobs, it - does not attempt to restart them automatically. Such stale jobs - therefore fill the running queue and prevent other jobs to start. You - must therefore requeue them manually, either from the Jobs view, or by - running the following SQL statement *before starting Odoo*: - -.. code:: sql - - update queue_job set state='pending' where state in ('started', 'enqueued') Changelog ========= @@ -715,10 +700,13 @@ promote its widespread use. .. |maintainer-guewen| image:: https://github.com/guewen.png?size=40px :target: https://github.com/guewen :alt: guewen +.. |maintainer-sbidoul| image:: https://github.com/sbidoul.png?size=40px + :target: https://github.com/sbidoul + :alt: sbidoul -Current `maintainer `__: +Current `maintainers `__: -|maintainer-guewen| +|maintainer-guewen| |maintainer-sbidoul| This module is part of the `OCA/queue `_ project on GitHub. 
diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 69211ec167..a2105b27cd 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ { "name": "Job Queue", - "version": "18.0.2.0.2", + "version": "18.0.2.0.6", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", @@ -29,7 +29,7 @@ }, "installable": True, "development_status": "Mature", - "maintainers": ["guewen"], + "maintainers": ["guewen", "sbidoul"], "post_init_hook": "post_init_hook", "post_load": "post_load", } diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 6365e6efbc..a0dce3b8e9 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -50,14 +50,15 @@ def _enqueue_dependent_jobs(self, env, job): tries = 0 while True: try: - job.enqueue_waiting() + with job.env.cr.savepoint(): + job.enqueue_waiting() except OperationalError as err: # Automatically retry the typical transaction serialization # errors if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE: - _logger.info( + _logger.error( "%s, maximum number of tries reached to update dependencies", errorcodes.lookup(err.pgcode), ) @@ -177,6 +178,7 @@ def create_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): if not http.request.env.user.has_group("base.group_erp_manager"): raise Forbidden(_("Access Denied")) @@ -187,6 +189,12 @@ def create_test_job( except (ValueError, TypeError): failure_rate = 0 + if job_duration is not None: + try: + job_duration = float(job_duration) + except (ValueError, TypeError): + job_duration = 0 + if not (0 <= failure_rate <= 1): raise BadRequest("failure_rate must be between 0 and 1") @@ -215,6 +223,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) if size > 1: @@ -225,6 +234,7 @@ def 
create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) return "" @@ -236,6 +246,7 @@ def _create_single_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): delayed = ( http.request.env["queue.job"] @@ -245,7 +256,7 @@ def _create_single_test_job( channel=channel, description=description, ) - ._test_job(failure_rate=failure_rate) + ._test_job(failure_rate=failure_rate, job_duration=job_duration) ) return f"job uuid: {delayed.db_record().uuid}" @@ -259,6 +270,7 @@ def _create_graph_test_jobs( channel=None, description="Test job", failure_rate=0, + job_duration=0, ): model = http.request.env["queue.job"] current_count = 0 @@ -281,7 +293,7 @@ def _create_graph_test_jobs( max_retries=max_retries, channel=channel, description="%s #%d" % (description, current_count), - )._test_job(failure_rate=failure_rate) + )._test_job(failure_rate=failure_rate, job_duration=job_duration) ) grouping = random.choice(possible_grouping_methods) diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index a1aa70a4d4..44ae785bc1 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -357,23 +357,26 @@ def _query_requeue_dead_jobs(self): ELSE exc_info END) WHERE - id in ( - SELECT - queue_job_id - FROM - queue_job_lock - WHERE - queue_job_id in ( - SELECT - id - FROM - queue_job - WHERE - state IN ('enqueued','started') - AND date_enqueued < - (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') - ) - FOR UPDATE SKIP LOCKED + state IN ('enqueued','started') + AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + AND ( + id in ( + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + FOR UPDATE SKIP LOCKED + ) + OR NOT EXISTS ( + SELECT + 1 + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + ) ) RETURNING uuid """ @@ -396,6 +399,12 @@ def requeue_dead_jobs(self): However, when 
the Odoo server crashes or is otherwise force-stopped, running jobs are interrupted while the runner has no chance to know they have been aborted. + + This also handles orphaned jobs (enqueued but never started, no lock). + This edge case occurs when the runner marks a job as 'enqueued' + but the HTTP request to start the job never reaches the Odoo server + (e.g., due to server shutdown/crash between setting enqueued and + the controller receiving the request). """ with closing(self.conn.cursor()) as cr: diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index 411ae43af5..bf1046f18e 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -3,6 +3,7 @@ import logging import random +import time from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models @@ -16,6 +17,7 @@ from ..job import ( CANCELLED, DONE, + ENQUEUED, FAILED, PENDING, STARTED, @@ -324,18 +326,26 @@ def _change_job_state(self, state, result=None): raise ValueError(f"State not supported: {state}") def button_done(self): + # If job was set to STARTED or CANCELLED, do not set it to DONE + states_from = (WAIT_DEPENDENCIES, PENDING, ENQUEUED, FAILED) result = _("Manually set to done by {}").format(self.env.user.name) - self._change_job_state(DONE, result=result) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(DONE, result=result) return True def button_cancelled(self): + # If job was set to DONE or WAIT_DEPENDENCIES, do not cancel it + states_from = (PENDING, ENQUEUED, FAILED) result = _("Cancelled by {}").format(self.env.user.name) - self._change_job_state(CANCELLED, result=result) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(CANCELLED, result=result) return True def requeue(self): - jobs_to_requeue = self.filtered(lambda job_: job_.state != WAIT_DEPENDENCIES) - jobs_to_requeue._change_job_state(PENDING) + # If job is already in 
queue or started, do not requeue it + states_from = (FAILED, DONE, CANCELLED) + records = self.filtered(lambda job_: job_.state in states_from) + records._change_job_state(PENDING) return True def _message_post_on_failure(self): @@ -442,7 +452,9 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0): + def _test_job(self, failure_rate=0, job_duration=0): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") + if job_duration: + time.sleep(job_duration) diff --git a/queue_job/readme/ROADMAP.md b/queue_job/readme/ROADMAP.md index a13be6beb3..df33142b88 100644 --- a/queue_job/readme/ROADMAP.md +++ b/queue_job/readme/ROADMAP.md @@ -1,17 +1,2 @@ - After creating a new database or installing `queue_job` on an existing database, Odoo must be restarted for the runner to detect it. -- When Odoo shuts down normally, it waits for running jobs to finish. - However, when the Odoo server crashes or is otherwise force-stopped, - running jobs are interrupted while the runner has no chance to know - they have been aborted. In such situations, jobs may remain in - `started` or `enqueued` state after the Odoo server is halted. Since - the runner has no way to know if they are actually running or not, and - does not know for sure if it is safe to restart the jobs, it does not - attempt to restart them automatically. Such stale jobs therefore fill - the running queue and prevent other jobs to start. You must therefore - requeue them manually, either from the Jobs view, or by running the - following SQL statement *before starting Odoo*: - -``` sql -update queue_job set state='pending' where state in ('started', 'enqueued') -``` diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index 6cc2121a4d..04174801d2 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -372,7 +372,7 @@

Job Queue

!! This file is generated by oca-gen-addon-readme !! !! changes will be overwritten. !! !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -!! source digest: sha256:58f9182440bb316576671959b69148ea5454958f9ae8db75bccd30c89012676d +!! source digest: sha256:9294f4c715c0f0e10a55590082388776b34763472ac72b2b88d0d464b31f42a3 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -->

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

This addon adds an integrated Job Queue to Odoo.

@@ -928,21 +928,7 @@

Known issues / Roadmap

  • After creating a new database or installing queue_job on an existing database, Odoo must be restarted for the runner to detect it.
  • -
  • When Odoo shuts down normally, it waits for running jobs to finish. -However, when the Odoo server crashes or is otherwise force-stopped, -running jobs are interrupted while the runner has no chance to know -they have been aborted. In such situations, jobs may remain in -started or enqueued state after the Odoo server is halted. -Since the runner has no way to know if they are actually running or -not, and does not know for sure if it is safe to restart the jobs, it -does not attempt to restart them automatically. Such stale jobs -therefore fill the running queue and prevent other jobs to start. You -must therefore requeue them manually, either from the Jobs view, or by -running the following SQL statement before starting Odoo:
-
-update queue_job set state='pending' where state in ('started', 'enqueued')
-

Changelog

@@ -1008,8 +994,8 @@

Maintainers

OCA, or the Odoo Community Association, is a nonprofit organization whose mission is to support the collaborative development of Odoo features and promote its widespread use.

-

Current maintainer:

-

guewen

+

Current maintainers:

+

guewen sbidoul

This module is part of the OCA/queue project on GitHub.

You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

diff --git a/queue_job/tests/test_wizards.py b/queue_job/tests/test_wizards.py index 2ac162d313..7738836d2f 100644 --- a/queue_job/tests/test_wizards.py +++ b/queue_job/tests/test_wizards.py @@ -46,3 +46,60 @@ def test_03_done(self): wizard = self._wizard("queue.jobs.to.done") wizard.set_done() self.assertEqual(self.job.state, "done") + + def test_04_requeue_forbidden(self): + wizard = self._wizard("queue.requeue.job") + + # State WAIT_DEPENDENCIES is not requeued + self.job.state = "wait_dependencies" + wizard.requeue() + self.assertEqual(self.job.state, "wait_dependencies") + + # State PENDING, ENQUEUED or STARTED are ignored too + for test_state in ("pending", "enqueued", "started"): + self.job.state = test_state + wizard.requeue() + self.assertEqual(self.job.state, test_state) + + # States CANCELLED, DONE or FAILED will change status + self.job.state = "cancelled" + wizard.requeue() + self.assertEqual(self.job.state, "pending") + + def test_05_cancel_forbidden(self): + wizard = self._wizard("queue.jobs.to.cancelled") + + # State WAIT_DEPENDENCIES is not cancelled + self.job.state = "wait_dependencies" + wizard.set_cancelled() + self.assertEqual(self.job.state, "wait_dependencies") + + # State DONE is not cancelled + self.job.state = "done" + wizard.set_cancelled() + self.assertEqual(self.job.state, "done") + + # State PENDING, ENQUEUED or FAILED will be cancelled + for test_state in ("pending", "enqueued"): + self.job.state = test_state + wizard.set_cancelled() + self.assertEqual(self.job.state, "cancelled") + + def test_06_done_forbidden(self): + wizard = self._wizard("queue.jobs.to.done") + + # State STARTED is not set to DONE manually + self.job.state = "started" + wizard.set_done() + self.assertEqual(self.job.state, "started") + + # State CANCELLED is not set to DONE + self.job.state = "cancelled" + wizard.set_done() + self.assertEqual(self.job.state, "cancelled") + + # State WAIT_DEPENDENCIES, PENDING, ENQUEUED or FAILED will be set to DONE + for test_state in 
("wait_dependencies", "pending", "enqueued"): + self.job.state = test_state + wizard.set_done() + self.assertEqual(self.job.state, "done") diff --git a/queue_job/wizards/queue_jobs_to_cancelled.py b/queue_job/wizards/queue_jobs_to_cancelled.py index 9e73374ebd..bb9f831576 100644 --- a/queue_job/wizards/queue_jobs_to_cancelled.py +++ b/queue_job/wizards/queue_jobs_to_cancelled.py @@ -10,8 +10,8 @@ class SetJobsToCancelled(models.TransientModel): _description = "Cancel all selected jobs" def set_cancelled(self): - jobs = self.job_ids.filtered( - lambda x: x.state in ("pending", "failed", "enqueued") - ) + # Only jobs with state PENDING, FAILED, ENQUEUED + # will change to CANCELLED + jobs = self.job_ids jobs.button_cancelled() return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_jobs_to_done.py b/queue_job/wizards/queue_jobs_to_done.py index ff1366ffed..caf8129213 100644 --- a/queue_job/wizards/queue_jobs_to_done.py +++ b/queue_job/wizards/queue_jobs_to_done.py @@ -10,6 +10,8 @@ class SetJobsToDone(models.TransientModel): _description = "Set all selected jobs to done" def set_done(self): + # Only jobs with state WAIT_DEPENDENCIES, PENDING, ENQUEUED or FAILED + # will change to DONE jobs = self.job_ids jobs.button_done() return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_requeue_job.py b/queue_job/wizards/queue_requeue_job.py index 67d2ffcbdc..a88256300f 100644 --- a/queue_job/wizards/queue_requeue_job.py +++ b/queue_job/wizards/queue_requeue_job.py @@ -20,6 +20,7 @@ def _default_job_ids(self): ) def requeue(self): + # Only jobs with state FAILED, DONE or CANCELLED will change to PENDING jobs = self.job_ids jobs.requeue() return {"type": "ir.actions.act_window_close"} diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index 1a844dcd39..8e80ab2ced 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -3,7 +3,7 @@ { "name": "Queue Job Tests", - 
"version": "18.0.2.0.0", + "version": "18.0.2.0.1", "author": "Camptocamp,Odoo Community Association (OCA)", "license": "LGPL-3", "category": "Generic Modules", @@ -15,5 +15,6 @@ "security/ir.model.access.csv", "data/queue_job_test_job.xml", ], + "maintainers": ["sbidoul"], "installable": True, } diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index a6328fed76..58890adf24 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -99,3 +99,19 @@ def test_requeue_dead_jobs(self): uuids_requeued = self.env.cr.fetchall() self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) + + def test_requeue_orphaned_jobs(self): + queue_job = self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) + + # Only enqueued job, don't set it to started to simulate the scenario + # that system shutdown before job is starting + job_obj.set_enqueued() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() + + # job is now picked up by the requeue query (which includes orphaned jobs) + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued)