diff --git a/lithops/monitor.py b/lithops/monitor.py
index ce68fa2b..2e8b48ad 100644
--- a/lithops/monitor.py
+++ b/lithops/monitor.py
@@ -38,12 +38,15 @@ class Monitor(threading.Thread):
     Monitor base class
     """
 
-    def __init__(self, executor_id,
-                 internal_storage,
-                 token_bucket_q,
-                 job_chunksize,
-                 generate_tokens,
-                 config):
+    def __init__(
+        self,
+        executor_id,
+        internal_storage,
+        token_bucket_q,
+        job_chunksize,
+        generate_tokens,
+        config,
+    ):
         super().__init__()
         self.executor_id = executor_id
@@ -97,14 +100,14 @@ def _all_ready(self):
 
     def _check_new_futures(self, call_status, f):
         """Checks if a functions returned new futures to track"""
-        if 'new_futures' not in call_status:
+        if "new_futures" not in call_status:
             return False
 
         f._set_futures(call_status)
         self.futures.update(f._new_futures)
         logger.debug(
-            f'ExecutorID {self.executor_id} - Received {len(f._new_futures)} '
-            'new function Futures to track'
+            f"ExecutorID {self.executor_id} - Received {len(f._new_futures)} "
+            "new function Futures to track"
         )
         return True
@@ -117,23 +120,25 @@ def _future_timeout_checker(self, futures):
         futures_running = [f for f in futures if f.running and f._call_status]
         for fut in futures_running:
             try:
-                start_tstamp = fut._call_status['worker_start_tstamp']
+                start_tstamp = fut._call_status["worker_start_tstamp"]
                 fut_timeout = start_tstamp + fut.execution_timeout + 5
                 if current_time > fut_timeout:
                     msg = f"The function exceeded the execution timeout of {fut.execution_timeout} seconds."
-                    raise TimeoutError('HANDLER', msg)
+                    raise TimeoutError("HANDLER", msg)
             except TimeoutError:
                 # generate fake TimeoutError call status
                 pickled_exception = str(pickle.dumps(sys.exc_info()))
-                call_status = {'type': '__end__',
-                               'exception': True,
-                               'exc_info': pickled_exception,
-                               'executor_id': fut.executor_id,
-                               'job_id': fut.job_id,
-                               'call_id': fut.call_id,
-                               'activation_id': fut.activation_id,
-                               'worker_start_tstamp': start_tstamp,
-                               'worker_end_tstamp': time.time()}
+                call_status = {
+                    "type": "__end__",
+                    "exception": True,
+                    "exc_info": pickled_exception,
+                    "executor_id": fut.executor_id,
+                    "job_id": fut.job_id,
+                    "call_id": fut.call_id,
+                    "activation_id": fut.activation_id,
+                    "worker_start_tstamp": start_tstamp,
+                    "worker_end_tstamp": time.time(),
+                }
                 fut._set_ready(call_status)
 
     def _print_status_log(self, previous_log=None, log_time=None):
@@ -143,9 +148,15 @@ def _print_status_log(self, previous_log=None, log_time=None):
         callids_pending = len([f for f in self.futures if f.invoked])
         callids_running = len([f for f in self.futures if f.running])
         callids_done = len([f for f in self.futures if f.ready or f.success or f.done])
-        if (callids_pending, callids_running, callids_done) != previous_log or log_time > LOG_INTERVAL:
-            logger.debug(f'ExecutorID {self.executor_id} - Pending: {callids_pending} '
-                         f'- Running: {callids_running} - Done: {callids_done}')
+        if (
+            callids_pending,
+            callids_running,
+            callids_done,
+        ) != previous_log or log_time > LOG_INTERVAL:
+            logger.debug(
+                f"ExecutorID {self.executor_id} - Pending: {callids_pending} "
+                f"- Running: {callids_running} - Done: {callids_done}"
+            )
             log_time = 0
 
         return (callids_pending, callids_running, callids_done), log_time
@@ -153,13 +164,13 @@ class RabbitmqMonitor(Monitor):
 
     def __init__(
-            self,
-            executor_id,
-            internal_storage,
-            token_bucket_q,
-            job_chunksize,
-            generate_tokens,
-            config
+        self,
+        executor_id,
+        internal_storage,
+        token_bucket_q,
+        job_chunksize,
+        generate_tokens,
+        config,
     ):
         super().__init__(
             executor_id,
@@ -167,11 +178,11 @@ def __init__(
             token_bucket_q,
             job_chunksize,
             generate_tokens,
-            config
+            config,
         )
 
-        self.rabbit_amqp_url = config.get('amqp_url')
-        self.queue = f'lithops-{self.executor_id}'
+        self.rabbit_amqp_url = config.get("amqp_url")
+        self.queue = f"lithops-{self.executor_id}"
         self.tag = None
         self._create_resources()
@@ -179,7 +190,9 @@ def _create_resources(self):
         """
         Creates RabbitMQ queues and exchanges of a given job
         """
-        logger.debug(f'ExecutorID {self.executor_id} - Creating RabbitMQ queue {self.queue}')
+        logger.debug(
+            f"ExecutorID {self.executor_id} - Creating RabbitMQ queue {self.queue}"
+        )
 
         self.pikaparams = pika.URLParameters(self.rabbit_amqp_url)
         self.connection = pika.BlockingConnection(self.pikaparams)
@@ -210,9 +223,15 @@ def _tag_future_as_running(self, call_status):
         """
         Assigns a call_status to its future
         """
-        not_running_futures = [f for f in self.futures if not (f.running or f.ready or f.success or f.done)]
+        not_running_futures = [
+            f for f in self.futures if not (f.running or f.ready or f.success or f.done)
+        ]
         for f in not_running_futures:
-            calljob_id = (call_status['executor_id'], call_status['job_id'], call_status['call_id'])
+            calljob_id = (
+                call_status["executor_id"],
+                call_status["job_id"],
+                call_status["call_id"],
+            )
             if (f.executor_id, f.job_id, f.call_id) == calljob_id:
                 f._set_running(call_status)
@@ -220,9 +239,15 @@ def _tag_future_as_ready(self, call_status):
         """
         tags a future as ready based on call_status
         """
-        not_ready_futures = [f for f in self.futures if not (f.ready or f.success or f.done)]
+        not_ready_futures = [
+            f for f in self.futures if not (f.ready or f.success or f.done)
+        ]
         for f in not_ready_futures:
-            calljob_id = (call_status['executor_id'], call_status['job_id'], call_status['call_id'])
+            calljob_id = (
+                call_status["executor_id"],
+                call_status["job_id"],
+                call_status["call_id"],
+            )
             if (f.executor_id, f.job_id, f.call_id) == calljob_id:
                 if not self._check_new_futures(call_status, f):
                     f._set_ready(call_status)
@@ -234,20 +259,26 @@ def _generate_tokens(self, call_status):
         if not self.generate_tokens or not self.should_run:
             return
 
-        call_id = (call_status['executor_id'], call_status['job_id'], call_status['call_id'])
-        worker_id = call_status['activation_id']
+        call_id = (
+            call_status["executor_id"],
+            call_status["job_id"],
+            call_status["call_id"],
+        )
+        worker_id = call_status["activation_id"]
 
         if worker_id not in self.callids_done_worker:
             self.callids_done_worker[worker_id] = []
         self.callids_done_worker[worker_id].append(call_id)
 
-        if worker_id not in self.workers_done and \
-                len(self.callids_done_worker[worker_id]) == call_status['chunksize']:
+        if (
+            worker_id not in self.workers_done
+            and len(self.callids_done_worker[worker_id]) == call_status["chunksize"]
+        ):
             self.workers_done.append(worker_id)
             if self.should_run:
-                self.token_bucket_q.put('#')
+                self.token_bucket_q.put("#")
 
     def run(self):
-        logger.debug(f'ExecutorID {self.executor_id} | Starting RabbitMQ job monitor')
+        logger.debug(f"ExecutorID {self.executor_id} | Starting RabbitMQ job monitor")
 
         SLEEP_TIME = 2
         channel = self.connection.channel()
@@ -255,10 +286,10 @@ def run(self):
 
         def callback(ch, method, properties, body):
             call_status = json.loads(body.decode("utf-8"))
 
-            if call_status['type'] == '__init__':
+            if call_status["type"] == "__init__":
                 self._tag_future_as_running(call_status)
 
-            elif call_status['type'] == '__end__':
+            elif call_status["type"] == "__end__":
                 self._generate_tokens(call_status)
                 self._tag_future_as_ready(call_status)
@@ -271,7 +302,9 @@ def manage_timeouts():
             log_time = 0
             while self.should_run and not self._all_ready():
                 # Format call_ids running, pending and done
-                prevoius_log, log_time = self._print_status_log(previous_log=prevoius_log, log_time=log_time)
+                prevoius_log, log_time = self._print_status_log(
+                    previous_log=prevoius_log, log_time=log_time
+                )
                 self._future_timeout_checker(self.futures)
                 time.sleep(SLEEP_TIME)
                 log_time += SLEEP_TIME
@@ -282,7 +315,7 @@ def manage_timeouts():
         channel.start_consuming()
         self.tag = None
         self._print_status_log()
-        logger.debug(f'ExecutorID {self.executor_id} | RabbitMQ job monitor finished')
+        logger.debug(f"ExecutorID {self.executor_id} | RabbitMQ job monitor finished")
 
 
 class StorageMonitor(Monitor):
@@ -290,13 +323,13 @@ class StorageMonitor(Monitor):
     THREADPOOL_SIZE = 64
 
     def __init__(
-            self,
-            executor_id,
-            internal_storage,
-            token_bucket_q,
-            job_chunksize,
-            generate_tokens,
-            config
+        self,
+        executor_id,
+        internal_storage,
+        token_bucket_q,
+        job_chunksize,
+        generate_tokens,
+        config,
     ):
         super().__init__(
             executor_id,
@@ -304,10 +337,10 @@ def __init__(
             token_bucket_q,
             job_chunksize,
             generate_tokens,
-            config
+            config,
         )
 
-        self.monitoring_interval = config['monitoring_interval']
+        self.monitoring_interval = config["monitoring_interval"]
 
         # vars for _generate_tokens
         self.callids_running_worker = {}
@@ -331,14 +364,20 @@ def _tag_future_as_running(self, callids_running):
         Mark which futures are in running status based on callids_running
         """
         current_time = time.time()
-        not_running_futures = [f for f in self.futures if not (f.running or f.ready or f.success or f.done)]
-        callids_running_to_process = callids_running - self.callids_running_processed_timeout
+        not_running_futures = [
+            f for f in self.futures if not (f.running or f.ready or f.success or f.done)
+        ]
+        callids_running_to_process = (
+            callids_running - self.callids_running_processed_timeout
+        )
         for f in not_running_futures:
             for call in callids_running_to_process:
                 if f.invoked and (f.executor_id, f.job_id, f.call_id) == call[0]:
-                    call_status = {'type': '__init__',
-                                   'activation_id': call[1],
-                                   'worker_start_tstamp': current_time}
+                    call_status = {
+                        "type": "__init__",
+                        "activation_id": call[1],
+                        "worker_start_tstamp": current_time,
+                    }
                     f._set_running(call_status)
 
         self.callids_running_processed_timeout.update(callids_running_to_process)
@@ -348,7 +387,9 @@ def _tag_future_as_ready(self, callids_done):
         """
         Mark which futures has a call_status ready to be downloaded
         """
-        not_ready_futures = [f for f in self.futures if not (f.ready or f.success or f.done)]
+        not_ready_futures = [
+            f for f in self.futures if not (f.ready or f.success or f.done)
+        ]
         callids_done_to_process = callids_done - self.callids_done_processed_status
         fs_to_query = []
@@ -364,7 +405,9 @@ def _tag_future_as_ready(self, callids_done):
             return
 
         def get_status(f):
-            cs = self.internal_storage.get_call_status(f.executor_id, f.job_id, f.call_id)
+            cs = self.internal_storage.get_call_status(
+                f.executor_id, f.job_id, f.call_id
+            )
             f._status_query_count += 1
             if cs:
                 if not self._check_new_futures(cs, f):
@@ -418,11 +461,13 @@ def _generate_tokens(self, callids_running, callids_done):
             if job_id not in self.present_jobs:
                 continue
             chunksize = self.job_chunksize[job_id]
-            if worker_id not in self.workers_done and \
-                    len(self.callids_done_worker[worker_id]) == chunksize:
+            if (
+                worker_id not in self.workers_done
+                and len(self.callids_done_worker[worker_id]) == chunksize
+            ):
                 self.workers_done.append(worker_id)
                 if self.should_run:
-                    self.token_bucket_q.put('#')
+                    self.token_bucket_q.put("#")
             else:
                 break
@@ -439,7 +484,9 @@ def _poll_and_process_job_status(self, previous_log, log_time):
             previous_log (str): Updated log message.
             log_time (float): Updated log time counter.
         """
-        callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
+        callids_running, callids_done = self.internal_storage.get_job_status(
+            self.executor_id
+        )
         new_callids_done = callids_done - self.callids_done_processed_status
 
         self._generate_tokens(callids_running, callids_done)
@@ -453,32 +500,39 @@ def run(self):
         """
         Run method for the Storage job monitor thread.
+
+        Uses `while self.should_run` instead of `while not self._all_ready()` to
+        prevent a race condition where the monitor exits before newly added
+        futures are processed. See: https://github.com/lithops-cloud/lithops/issues/1449
         """
-        logger.debug(f'ExecutorID {self.executor_id} - Starting Storage job monitor')
+        logger.debug(f"ExecutorID {self.executor_id} - Starting Storage job monitor")
 
         wait_dur_sec = self.monitoring_interval
         previous_log = None
         log_time = 0
 
-        while not self._all_ready():
-            time.sleep(wait_dur_sec)
-            wait_dur_sec = self.monitoring_interval
-            log_time += wait_dur_sec
-
-            if not self.should_run:
-                logger.debug(f'ExecutorID {self.executor_id} - Monitor stopped externally')
-                break
-
+        while self.should_run:
             try:
-                new_callids_done, previous_log, log_time = self._poll_and_process_job_status(previous_log, log_time)
+                new_callids_done, previous_log, log_time = (
+                    self._poll_and_process_job_status(previous_log, log_time)
+                )
                 if new_callids_done:
                     wait_dur_sec = self.monitoring_interval / 5
+                else:
+                    wait_dur_sec = self.monitoring_interval
             except Exception as e:
-                logger.error(f'ExecutorID {self.executor_id} - Error during monitor: {e}', exc_info=True)
+                logger.error(
+                    f"ExecutorID {self.executor_id} - Error during monitor: {e}",
+                    exc_info=True,
+                )
+
+            time.sleep(wait_dur_sec)
+            log_time += wait_dur_sec
 
+        # Final poll after stop requested
         self._poll_and_process_job_status(previous_log, log_time)
 
-        logger.debug(f'ExecutorID {self.executor_id} - Storage job monitor finished')
+        logger.debug(f"ExecutorID {self.executor_id} - Storage job monitor finished")
 
 
 class JobMonitor:
@@ -489,21 +543,20 @@ def __init__(self, executor_id, internal_storage, config=None):
         self.storage_config = internal_storage.get_storage_config()
         self.storage_backend = internal_storage.backend
         self.config = config
-        self.type = self.config['lithops']['monitoring'].lower() if config else 'storage'
+        self.type = (
+            self.config["lithops"]["monitoring"].lower() if config else "storage"
+        )
         self.token_bucket_q = queue.Queue()
         self.monitor = None
        self.job_chunksize = {}
-        self.MonitorClass = getattr(
-            lithops.monitor,
-            f'{self.type.capitalize()}Monitor'
-        )
+        self.MonitorClass = getattr(lithops.monitor, f"{self.type.capitalize()}Monitor")
 
     def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
-        if self.type == 'storage':
-            monitoring_interval = self.storage_config['monitoring_interval']
-            monitor_config = {'monitoring_interval': monitoring_interval}
+        if self.type == "storage":
+            monitoring_interval = self.storage_config["monitoring_interval"]
+            monitor_config = {"monitoring_interval": monitoring_interval}
         else:
             monitor_config = self.config.get(self.type)
@@ -517,7 +570,7 @@ def start(self, fs, job_id=None, chunksize=None, generate_tokens=False):
                 token_bucket_q=self.token_bucket_q,
                 job_chunksize=self.job_chunksize,
                 generate_tokens=generate_tokens,
-                config=monitor_config
+                config=monitor_config,
             )
 
         self.monitor.add_futures(fs)
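
For reviewers: the behavioral core of this patch is the loop-shape change in StorageMonitor.run(); everything else is quote and line-wrapping style. The sketch below is a minimal, self-contained model of that change, not lithops code; MiniMonitor, _poll_once() and poll_interval are hypothetical stand-ins. It shows why looping on the externally controlled should_run flag, with one final poll after stop, keeps futures added after the thread starts from being dropped, which can happen if the loop exits as soon as the current futures are all ready.

    import threading
    import time


    class MiniMonitor(threading.Thread):
        # Toy model of the new StorageMonitor loop (names are illustrative).

        def __init__(self, poll_interval=0.1):
            super().__init__(daemon=True)
            self.should_run = True   # cleared externally, like Monitor.should_run
            self.futures = []        # futures can be appended after start()
            self.poll_interval = poll_interval

        def _poll_once(self):
            # Stand-in for _poll_and_process_job_status(): marks pending work
            # done and reports whether anything new was found.
            pending = [f for f in self.futures if not f["done"]]
            for f in pending:
                f["done"] = True
            return bool(pending)

        def run(self):
            while self.should_run:
                found_new = self._poll_once()
                # Poll faster while results are arriving, slower when idle,
                # mirroring the monitoring_interval / 5 logic in the patch.
                time.sleep(self.poll_interval / 5 if found_new else self.poll_interval)
            # Final poll after stop is requested, so late futures are processed.
            self._poll_once()


    monitor = MiniMonitor()
    monitor.start()
    monitor.futures.append({"done": False})  # added after the thread started
    time.sleep(0.5)
    monitor.should_run = False
    monitor.join()
    assert all(f["done"] for f in monitor.futures)

With the old `while not self._all_ready()` shape, a monitor whose current futures all complete exits immediately, and a future appended a moment later is never polled.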
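The token-generation path is easy to misread in diff form, so here is a condensed sketch of the chunksize accounting that both monitors share. on_call_done is a hypothetical wrapper around the logic in _generate_tokens: a worker contributes exactly one "#" token to token_bucket_q, and only once all chunksize calls assigned to it have finished.

    import queue

    token_bucket_q = queue.Queue()
    callids_done_worker = {}  # worker_id -> finished call ids
    workers_done = []

    def on_call_done(worker_id, call_id, chunksize):
        # Condensed from Monitor._generate_tokens(): one token per worker,
        # emitted when its whole chunk of calls has completed.
        callids_done_worker.setdefault(worker_id, []).append(call_id)
        if worker_id not in workers_done and len(callids_done_worker[worker_id]) == chunksize:
            workers_done.append(worker_id)
            token_bucket_q.put("#")

    # A worker that ran a chunk of two calls yields exactly one token.
    on_call_done("act-1", ("exec-0", "job-0", "00000"), chunksize=2)
    on_call_done("act-1", ("exec-0", "job-0", "00001"), chunksize=2)
    assert token_bucket_q.qsize() == 1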