diff --git a/README.md b/README.md
index c751916637..7662abac7b 100644
--- a/README.md
+++ b/README.md
@@ -22,12 +22,12 @@ Available addons
addon | version | maintainers | summary
--- | --- | --- | ---
[base_import_async](base_import_async/) | 18.0.1.0.0 | | Import CSV files in the background
-[queue_job](queue_job/) | 18.0.2.0.6 | | Job Queue
+[queue_job](queue_job/) | 18.0.2.0.8 | | Job Queue
[queue_job_batch](queue_job_batch/) | 18.0.1.0.0 | | Job Queue Batch
[queue_job_cron](queue_job_cron/) | 18.0.1.1.1 | | Scheduled Actions as Queue Jobs
[queue_job_cron_jobrunner](queue_job_cron_jobrunner/) | 18.0.1.0.1 | | Run jobs without a dedicated JobRunner
[queue_job_subscribe](queue_job_subscribe/) | 18.0.1.0.0 | | Control which users are subscribed to queue job notifications
-[test_queue_job](test_queue_job/) | 18.0.2.0.1 | | Queue Job Tests
+[test_queue_job](test_queue_job/) | 18.0.2.0.2 | | Queue Job Tests
[test_queue_job_batch](test_queue_job_batch/) | 18.0.1.0.0 | | Test Job Queue Batch
[//]: # (end addons)
diff --git a/queue_job/README.rst b/queue_job/README.rst
index 684a9ed058..c53bd635bf 100644
--- a/queue_job/README.rst
+++ b/queue_job/README.rst
@@ -11,7 +11,7 @@ Job Queue
!! This file is generated by oca-gen-addon-readme !!
!! changes will be overwritten. !!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
- !! source digest: sha256:9294f4c715c0f0e10a55590082388776b34763472ac72b2b88d0d464b31f42a3
+ !! source digest: sha256:b6e7440cf7bc258be59e8814bdaa4176e0afd49f6e512ae408299cb8d7f7e327
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
.. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png
diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py
index a2105b27cd..978356cfd7 100644
--- a/queue_job/__manifest__.py
+++ b/queue_job/__manifest__.py
@@ -2,7 +2,7 @@
{
"name": "Job Queue",
- "version": "18.0.2.0.6",
+ "version": "18.0.2.0.8",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index a0dce3b8e9..28f3534848 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -27,15 +27,48 @@
class RunJobController(http.Controller):
- def _try_perform_job(self, env, job):
- """Try to perform the job."""
+ @classmethod
+ def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None:
+ """Acquire a job for execution.
+
+ - make sure it is in ENQUEUED state
+ - mark it as STARTED and commit the state change
+ - acquire the job lock
+
+    If successful, return the Job instance, otherwise return None. This
+    function may fail if the job is not in the expected state or is
+    already locked by another worker.
+ """
+ env.cr.execute(
+ "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s "
+ "FOR UPDATE SKIP LOCKED",
+ (job_uuid, ENQUEUED),
+ )
+ if not env.cr.fetchone():
+ _logger.warning(
+ "was requested to run job %s, but it does not exist, "
+ "or is not in state %s, or is being handled by another worker",
+ job_uuid,
+ ENQUEUED,
+ )
+ return None
+ job = Job.load(env, job_uuid)
+ assert job and job.state == ENQUEUED
job.set_started()
job.store()
env.cr.commit()
- job.lock()
+ if not job.lock():
+ _logger.warning(
+ "was requested to run job %s, but it could not be locked",
+ job_uuid,
+ )
+ return None
+ return job
+ @classmethod
+ def _try_perform_job(cls, env, job):
+ """Try to perform the job, mark it done and commit if successful."""
_logger.debug("%s started", job)
-
job.perform()
# Triggers any stored computed fields before calling 'set_done'
# so that will be part of the 'exec_time'
@@ -46,7 +79,8 @@ def _try_perform_job(self, env, job):
env.cr.commit()
_logger.debug("%s done", job)
- def _enqueue_dependent_jobs(self, env, job):
+ @classmethod
+ def _enqueue_dependent_jobs(cls, env, job):
tries = 0
while True:
try:
@@ -76,17 +110,8 @@ def _enqueue_dependent_jobs(self, env, job):
else:
break
- @http.route(
- "/queue_job/runjob",
- type="http",
- auth="none",
- save_session=False,
- readonly=False,
- )
- def runjob(self, db, job_uuid, **kw):
- http.request.session.db = db
- env = http.request.env(user=SUPERUSER_ID)
-
+ @classmethod
+ def _runjob(cls, env: api.Environment, job: Job) -> None:
def retry_postpone(job, message, seconds=None):
job.env.clear()
with Registry(job.env.cr.dbname).cursor() as new_cr:
@@ -95,26 +120,9 @@ def retry_postpone(job, message, seconds=None):
job.set_pending(reset_retry=False)
job.store()
- # ensure the job to run is in the correct state and lock the record
- env.cr.execute(
- "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE",
- (job_uuid, ENQUEUED),
- )
- if not env.cr.fetchone():
- _logger.warning(
- "was requested to run job %s, but it does not exist, "
- "or is not in state %s",
- job_uuid,
- ENQUEUED,
- )
- return ""
-
- job = Job.load(env, job_uuid)
- assert job and job.state == ENQUEUED
-
try:
try:
- self._try_perform_job(env, job)
+ cls._try_perform_job(env, job)
except OperationalError as err:
# Automatically retry the typical transaction serialization
# errors
@@ -132,7 +140,6 @@ def retry_postpone(job, message, seconds=None):
# traceback in the logs we should have the traceback when all
# retries are exhausted
env.cr.rollback()
- return ""
except (FailedJobError, Exception) as orig_exception:
buff = StringIO()
@@ -142,19 +149,18 @@ def retry_postpone(job, message, seconds=None):
job.env.clear()
with Registry(job.env.cr.dbname).cursor() as new_cr:
job.env = job.env(cr=new_cr)
- vals = self._get_failure_values(job, traceback_txt, orig_exception)
+ vals = cls._get_failure_values(job, traceback_txt, orig_exception)
job.set_failed(**vals)
job.store()
buff.close()
raise
_logger.debug("%s enqueue depends started", job)
- self._enqueue_dependent_jobs(env, job)
+ cls._enqueue_dependent_jobs(env, job)
_logger.debug("%s enqueue depends done", job)
- return ""
-
- def _get_failure_values(self, job, traceback_txt, orig_exception):
+ @classmethod
+ def _get_failure_values(cls, job, traceback_txt, orig_exception):
"""Collect relevant data from exception."""
exception_name = orig_exception.__class__.__name__
if hasattr(orig_exception, "__module__"):
@@ -168,6 +174,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception):
"exc_message": exc_message,
}
+ @http.route(
+ "/queue_job/runjob",
+ type="http",
+ auth="none",
+ save_session=False,
+ readonly=False,
+ )
+ def runjob(self, db, job_uuid, **kw):
+ http.request.session.db = db
+ env = http.request.env(user=SUPERUSER_ID)
+ job = self._acquire_job(env, job_uuid)
+ if not job:
+ return ""
+ self._runjob(env, job)
+ return ""
+
# flake8: noqa: C901
@http.route("/queue_job/create_test_job", type="http", auth="user")
def create_test_job(
diff --git a/queue_job/job.py b/queue_job/job.py
index 6cfe12f232..4c78072508 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -221,7 +221,7 @@ def load_many(cls, env, job_uuids):
recordset = cls.db_records_from_uuids(env, job_uuids)
return {cls._load_from_db_record(record) for record in recordset}
- def add_lock_record(self):
+ def add_lock_record(self) -> None:
"""
Create row in db to be locked while the job is being performed.
"""
@@ -241,13 +241,11 @@ def add_lock_record(self):
[self.uuid],
)
- def lock(self):
- """
- Lock row of job that is being performed
+ def lock(self) -> bool:
+ """Lock row of job that is being performed.
- If a job cannot be locked,
- it means that the job wasn't started,
- a RetryableJobError is thrown.
+ Return False if a job cannot be locked: it means that the job is not in
+ STARTED state or is already locked by another worker.
"""
self.env.cr.execute(
"""
@@ -263,18 +261,15 @@ def lock(self):
queue_job
WHERE
uuid = %s
- AND state='started'
+ AND state = %s
)
- FOR UPDATE;
+ FOR UPDATE SKIP LOCKED;
""",
- [self.uuid],
+ [self.uuid, STARTED],
)
# 1 job should be locked
- if 1 != len(self.env.cr.fetchall()):
- raise RetryableJobError(
- f"Trying to lock job that wasn't started, uuid: {self.uuid}"
- )
+ return bool(self.env.cr.fetchall())
@classmethod
def _load_from_db_record(cls, job_db_record):
diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py
index bf1046f18e..b8975b878e 100644
--- a/queue_job/models/queue_job.py
+++ b/queue_job/models/queue_job.py
@@ -102,6 +102,7 @@ class QueueJob(models.Model):
date_done = fields.Datetime(readonly=True)
exec_time = fields.Float(
string="Execution Time (avg)",
+ readonly=True,
aggregator="avg",
help="Time required to execute this job in seconds. Average when grouped.",
)
diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html
index 04174801d2..6bdbe38f03 100644
--- a/queue_job/static/description/index.html
+++ b/queue_job/static/description/index.html
@@ -372,7 +372,7 @@
This addon adds an integrated Job Queue to Odoo.
diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index 8e80ab2ced..d909d231b1 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -3,7 +3,7 @@ { "name": "Queue Job Tests", - "version": "18.0.2.0.1", + "version": "18.0.2.0.2", "author": "Camptocamp,Odoo Community Association (OCA)", "license": "LGPL-3", "category": "Generic Modules", diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 62347148e5..0cfacebdf3 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,3 +1,4 @@ +from . import test_acquire_job from . import test_autovacuum from . import test_delayable from . import test_dependencies diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py index 335c072625..d3173a2198 100644 --- a/test_queue_job/tests/common.py +++ b/test_queue_job/tests/common.py @@ -20,3 +20,13 @@ def _create_job(self): stored = Job.db_records_from_uuids(self.env, [test_job.uuid]) self.assertEqual(len(stored), 1) return stored + + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1) + self.assertTrue( + job, + f"Demo data queue job {uuid!r} should be loaded in order " + "to make this test work", + ) + return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py new file mode 100644 index 0000000000..3f0c92a2be --- /dev/null +++ b/test_queue_job/tests/test_acquire_job.py @@ -0,0 +1,51 @@ +# Copyright 2026 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). 
+import logging +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.queue_job.controllers.main import RunJobController + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def test_acquire_enqueued_job(self): + job_record = self._get_demo_job(uuid="test_enqueued_job") + self.assertFalse( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)], + ), + "A job lock record should not exist at this point", + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job") + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertEqual(job.uuid, "test_enqueued_job") + self.assertEqual(job.state, "started") + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)] + ), + "A job lock record should exist at this point", + ) + + def test_acquire_started_job(self): + with ( + mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit, + self.assertLogs(level=logging.WARNING) as logs, + ): + job = RunJobController._acquire_job(self.env, "test_started_job") + mock_commit.assert_not_called() + self.assertIsNone(job) + self.assertIn( + "was requested to run job test_started_job, but it does not exist", + logs.output[0], + ) diff --git a/test_queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py index 58890adf24..510276be63 100644 --- a/test_queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -13,23 +13,6 @@ @tagged("post_install", "-at_install") class TestRequeueDeadJob(JobCommonCase): - def _get_demo_job(self, uuid): - # job created during load of demo data - job = self.env["queue.job"].search( - [ - ("uuid", "=", uuid), - ], - limit=1, - ) - - 
self.assertTrue( - job, - f"Demo data queue job {uuid} should be loaded in order" - " to make this tests work", - ) - - return job - def get_locks(self, uuid, cr=None): """ Retrieve lock rows