From 1353fe0dba040362fde444ef306fdb01d55c8de4 Mon Sep 17 00:00:00 2001
From: JosepSampe
Date: Mon, 21 Jul 2025 19:38:57 +0200
Subject: [PATCH 1/2] [Core] Improve reliability of job monitor

---
 CHANGELOG.md       |  2 ++
 lithops/monitor.py | 50 +++++++++++++++++++++++++++++++++++-----------------
 lithops/wait.py    |  2 ++
 3 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 523098d02..67b890343 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,8 @@
 - [IBM CE] Sanitize user_key in IBM CE to be RFC 1233 compliant
 - [CLI] Fix storage list error
 - [K8s] Fixed bug with first execution of K8s and Singularity
+- [Core] Prevent job monitor from stopping abruptly on iteration error causing hanging jobs
+
 
 ## [v3.6.0]
 
diff --git a/lithops/monitor.py b/lithops/monitor.py
index 860eabfd0..5f7c2e570 100644
--- a/lithops/monitor.py
+++ b/lithops/monitor.py
@@ -426,9 +426,30 @@ def _generate_tokens(self, callids_running, callids_done):
         self.callids_running_processed.update(callids_running_to_process)
         self.callids_done_processed.update(callids_done_to_process)
 
+    def _poll_and_process_job_status(self, previous_log, log_time):
+        """
+        Polls the storage backend for job status, updates futures,
+        and prints status logs.
+
+        Returns:
+            new_callids_done (set): New callids that were marked as done.
+            previous_log (str): Updated log message.
+            log_time (float): Updated log time counter.
+        """
+        callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
+        new_callids_done = callids_done - self.callids_done_processed_status
+
+        self._generate_tokens(callids_running, callids_done)
+        self._tag_future_as_running(callids_running)
+        self._tag_future_as_ready(callids_done)
+
+        previous_log, log_time = self._print_status_log(previous_log, log_time)
+
+        return new_callids_done, previous_log, log_time
+
     def run(self):
         """
-        Run method
+        Run method for the Storage job monitor thread.
         """
         logger.debug(f'ExecutorID {self.executor_id} - Starting Storage job monitor')
 
@@ -436,31 +457,23 @@ def run(self):
         previous_log = None
         log_time = 0
 
-        def process_callids():
-            nonlocal previous_log, log_time
-            callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
-            # verify if there are new callids_done and reduce the sleep
-            new_callids_done = callids_done - self.callids_done_processed_status
-            # generate tokens and mark futures as running/done
-            self._generate_tokens(callids_running, callids_done)
-            self._tag_future_as_running(callids_running)
-            self._tag_future_as_ready(callids_done)
-            previous_log, log_time = self._print_status_log(previous_log, log_time)
-
-            return new_callids_done
-
         while not self._all_ready():
             time.sleep(wait_dur_sec)
             wait_dur_sec = self.monitoring_interval
             log_time += wait_dur_sec
 
             if not self.should_run:
+                logger.debug(f'ExecutorID {self.executor_id} - Monitor stopped externally')
                 break
 
-            if len(process_callids()) > 0:
-                wait_dur_sec = self.monitoring_interval / 5
+            try:
+                new_callids_done, previous_log, log_time = self._poll_and_process_job_status(previous_log, log_time)
+                if new_callids_done:
+                    wait_dur_sec = self.monitoring_interval / 5
+            except Exception as e:
+                logger.error(f'ExecutorID {self.executor_id} - Error during monitor: {e}', exc_info=True)
 
-        process_callids()
+        self._poll_and_process_job_status(previous_log, log_time)
 
         logger.debug(f'ExecutorID {self.executor_id} - Storage job monitor finished')
 
@@ -509,6 +522,9 @@ def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
         if not self.monitor.is_alive():
             self.monitor.start()
 
+    def is_alive(self):
+        return self.monitor.is_alive()
+
     def remove(self, fs):
         if self.monitor and self.monitor.is_alive():
             self.monitor.remove_futures(fs)
diff --git a/lithops/wait.py b/lithops/wait.py
index 61da721c5..22f12b2e8 100644
--- a/lithops/wait.py
+++ b/lithops/wait.py
@@ -142,6 +142,8 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
                               threadpool_size=threadpool_size)
     else:
         while not _check_done(fs, return_when, download_results):
+            if not job_monitor.is_alive():
+                job_monitor.start(fs=fs)
             for executor_data in executors_data:
                 new_data = _get_executor_data(fs, executor_data, pbar=pbar,
                                               throw_except=throw_except,

From 99a6cd0d32d9ef6e6586e160945093927a4bed6f Mon Sep 17 00:00:00 2001
From: JosepSampe
Date: Mon, 21 Jul 2025 19:48:17 +0200
Subject: [PATCH 2/2] Update _all_ready method in job monitor

---
 lithops/monitor.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/lithops/monitor.py b/lithops/monitor.py
index 5f7c2e570..dd1a2f7fe 100644
--- a/lithops/monitor.py
+++ b/lithops/monitor.py
@@ -90,7 +90,10 @@ def _all_ready(self):
         """
         Checks if all futures are ready, success or done
         """
-        return all([f.ready or f.success or f.done for f in self.futures])
+        try:
+            return all(f.ready or f.success or f.done for f in self.futures)
+        except Exception:
+            return False
 
     def _check_new_futures(self, call_status, f):
         """Checks if a functions returned new futures to track"""