Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,8 @@
- [IBM CE] Sanitize user_key in IBM CE to be RFC 1123 compliant
- [CLI] Fix storage list error
- [K8s] Fixed bug with first execution of K8s and Singularity
- [Core] Prevent job monitor from stopping abruptly on iteration error causing hanging jobs


## [v3.6.0]

55 changes: 37 additions & 18 deletions lithops/monitor.py
@@ -90,7 +90,10 @@ def _all_ready(self):
"""
Checks if all futures are ready, success or done
"""
return all([f.ready or f.success or f.done for f in self.futures])
try:
return all(f.ready or f.success or f.done for f in self.futures)
except Exception:
return False

def _check_new_futures(self, call_status, f):
"""Checks if a functions returned new futures to track"""
@@ -426,41 +429,54 @@ def _generate_tokens(self, callids_running, callids_done):
self.callids_running_processed.update(callids_running_to_process)
self.callids_done_processed.update(callids_done_to_process)

def _poll_and_process_job_status(self, previous_log, log_time):
"""
Polls the storage backend for job status, updates futures,
and prints status logs.

Returns:
new_callids_done (set): New callids that were marked as done.
previous_log (str): Updated log message.
log_time (float): Updated log time counter.
"""
callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
new_callids_done = callids_done - self.callids_done_processed_status

self._generate_tokens(callids_running, callids_done)
self._tag_future_as_running(callids_running)
self._tag_future_as_ready(callids_done)

previous_log, log_time = self._print_status_log(previous_log, log_time)

return new_callids_done, previous_log, log_time

def run(self):
"""
Run method
Run method for the Storage job monitor thread.
"""
logger.debug(f'ExecutorID {self.executor_id} - Starting Storage job monitor')

wait_dur_sec = self.monitoring_interval
previous_log = None
log_time = 0

def process_callids():
nonlocal previous_log, log_time
callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
# verify if there are new callids_done and reduce the sleep
new_callids_done = callids_done - self.callids_done_processed_status
# generate tokens and mark futures as running/done
self._generate_tokens(callids_running, callids_done)
self._tag_future_as_running(callids_running)
self._tag_future_as_ready(callids_done)
previous_log, log_time = self._print_status_log(previous_log, log_time)

return new_callids_done

while not self._all_ready():
time.sleep(wait_dur_sec)
wait_dur_sec = self.monitoring_interval
log_time += wait_dur_sec

if not self.should_run:
logger.debug(f'ExecutorID {self.executor_id} - Monitor stopped externally')
break

if len(process_callids()) > 0:
wait_dur_sec = self.monitoring_interval / 5
try:
new_callids_done, previous_log, log_time = self._poll_and_process_job_status(previous_log, log_time)
if new_callids_done:
wait_dur_sec = self.monitoring_interval / 5
except Exception as e:
logger.error(f'ExecutorID {self.executor_id} - Error during monitor: {e}', exc_info=True)

process_callids()
self._poll_and_process_job_status(previous_log, log_time)

logger.debug(f'ExecutorID {self.executor_id} - Storage job monitor finished')

@@ -509,6 +525,9 @@ def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
if not self.monitor.is_alive():
self.monitor.start()

def is_alive(self):
    return self.monitor is not None and self.monitor.is_alive()

def remove(self, fs):
if self.monitor and self.monitor.is_alive():
self.monitor.remove_futures(fs)
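For reference, a minimal sketch of the pattern this PR applies to the storage monitor loop: the per-iteration polling work is wrapped in try/except so a transient error (for example, a failed storage read) is logged and skipped instead of terminating the thread and leaving jobs hanging. The names below (PollingMonitor, poll_fn) are hypothetical illustrations, not part of the Lithops API.

import logging
import threading
import time

logger = logging.getLogger(__name__)


class PollingMonitor(threading.Thread):
    """Hypothetical poller illustrating the 'survive one bad iteration' pattern."""

    def __init__(self, poll_fn, interval=2.0):
        super().__init__(daemon=True)
        self.poll_fn = poll_fn      # callable that returns True once all work is done
        self.interval = interval
        self.should_run = True

    def run(self):
        done = False
        while self.should_run and not done:
            time.sleep(self.interval)
            try:
                # An exception here only skips this iteration; the loop continues,
                # mirroring how the PR guards _poll_and_process_job_status().
                done = self.poll_fn()
            except Exception as e:
                logger.error('Monitor iteration failed: %s', e, exc_info=True)
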
2 changes: 2 additions & 0 deletions lithops/wait.py
@@ -142,6 +142,8 @@ def wait(fs: Union[ResponseFuture, FuturesList, List[ResponseFuture]],
threadpool_size=threadpool_size)
else:
while not _check_done(fs, return_when, download_results):
if not job_monitor.is_alive():
job_monitor.start(fs=fs)
for executor_data in executors_data:
new_data = _get_executor_data(fs, executor_data, pbar=pbar,
throw_except=throw_except,
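As a complement, a hedged sketch of the guard added to the wait loop: if the monitor thread has died, it is restarted for the still-pending futures so wait() cannot spin forever without status updates. wait_until_done, check_done, and monitor are placeholders for illustration, not the actual Lithops signatures.

import time


def wait_until_done(futures, monitor, check_done, poll_interval=1.0):
    """Hypothetical wait loop: restart a dead monitor instead of hanging."""
    while not check_done(futures):
        if not monitor.is_alive():
            # The monitor thread stopped (e.g. it crashed); start it again
            # for the futures that are still pending.
            monitor.start(fs=futures)
        time.sleep(poll_interval)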