From 0eccd28691219d6d2c4a5eb2f1e9668705d3f573 Mon Sep 17 00:00:00 2001 From: Lutz Bender Date: Thu, 7 Aug 2025 10:00:47 +0200 Subject: [PATCH 1/2] implement handler locking --- packages/helpermodules/utils/__init__.py | 1 - packages/helpermodules/utils/_exit_after.py | 26 ------- packages/main.py | 86 ++++++++++++++++----- 3 files changed, 66 insertions(+), 47 deletions(-) delete mode 100644 packages/helpermodules/utils/_exit_after.py diff --git a/packages/helpermodules/utils/__init__.py b/packages/helpermodules/utils/__init__.py index ce3cdae0d2..7253bf39e9 100644 --- a/packages/helpermodules/utils/__init__.py +++ b/packages/helpermodules/utils/__init__.py @@ -1,3 +1,2 @@ -from helpermodules.utils._exit_after import exit_after from helpermodules.utils._thread_handler import joined_thread_handler, thread_handler from helpermodules.utils.processing_counter import ProcessingCounter diff --git a/packages/helpermodules/utils/_exit_after.py b/packages/helpermodules/utils/_exit_after.py deleted file mode 100644 index 717915b0ca..0000000000 --- a/packages/helpermodules/utils/_exit_after.py +++ /dev/null @@ -1,26 +0,0 @@ -import _thread as thread -from threading import Timer -import sys - - -def quit_function(fn_name): - sys.stderr.flush() # Python 3 stderr is likely buffered. - thread.interrupt_main() # raises KeyboardInterrupt - - -def exit_after(s): - ''' https://stackoverflow.com/questions/492519/timeout-on-a-function-call - use as decorator to exit process if - function takes longer than s seconds - ''' - def outer(fn): - def inner(*args, **kwargs): - timer = Timer(s, quit_function, args=[fn.__name__]) - timer.start() - try: - result = fn(*args, **kwargs) - finally: - timer.cancel() - return result - return inner - return outer diff --git a/packages/main.py b/packages/main.py index 34538fd566..08960f8870 100755 --- a/packages/main.py +++ b/packages/main.py @@ -7,6 +7,7 @@ from helpermodules.utils import run_command, thread_handler import threading import sys +import functools # als erstes logging initialisieren, damit auch ImportError geloggt werden logger.setup_logging() @@ -28,7 +29,6 @@ from helpermodules.measurement_logging.write_log import LogType, save_log from helpermodules.modbusserver import start_modbus_server from helpermodules.pub import Pub -from helpermodules.utils import exit_after from modules import configuration, loadvars, update_soc from modules.internal_chargepoint_handler.internal_chargepoint_handler import GeneralInternalChargepointHandler from modules.internal_chargepoint_handler.gpio import InternalGpioHandler @@ -41,12 +41,63 @@ class HandlerAlgorithm: def __init__(self): self.interval_counter = 1 self.current_day = None + self.handler_locks = {} + self.handler_timestamps = {} + def __acquire_lock(self, handler_name, error_threshold=60): + """Versucht, den Lock für den angegebenen Handler zu erwerben. + Erstellt Lock und Timestamp-Eintrag bei Bedarf dynamisch. + Gibt True zurück, wenn der Lock erfolgreich erworben wurde, sonst False. + """ + if handler_name not in self.handler_locks: + self.handler_locks[handler_name] = threading.Lock() + if handler_name not in self.handler_timestamps: + self.handler_timestamps[handler_name] = 0 + + lock = self.handler_locks[handler_name] + now = time.time() + if lock.acquire(blocking=False): + self.handler_timestamps[handler_name] = now + log.debug(f"Lock für {handler_name} erworben.") + return True + # Wenn der Lock älter als 'error_threshold' Sekunden ist, wird ein Error geloggt. + log_handler = log.error if now - self.handler_timestamps[handler_name] > error_threshold else log.debug + log_handler( + f"{handler_name} läuft bereits, neuer Aufruf wird übersprungen. Letzter Start: " + f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.handler_timestamps[handler_name]))} " + f"(vor {now - self.handler_timestamps[handler_name]} Sekunden).") + return False + + def __release_lock(self, handler_name): + """Gibt den Lock für den angegebenen Handler frei.""" + lock = self.handler_locks.get(handler_name) + if lock: + lock.release() + log.debug(f"Lock für {handler_name} freigegeben nach {time.time() - self.handler_timestamps[handler_name]} Sekunden.") + else: + log.warning(f"Lock für {handler_name} nicht gefunden.") + + def __with_handler_lock(error_threshold=60): + def decorator(func): + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + handler_name = func.__name__ + if self.__acquire_lock(handler_name, error_threshold): + try: + return func(self, *args, **kwargs) + finally: + self.__release_lock(handler_name) + else: + return + return wrapper + return decorator + + # decorator can not be used here as it would block logging before handler_with_control_interval() + # @__with_handler_lock(error_threshold=30) def handler10Sec(self): """ führt den Algorithmus durch. """ try: - @exit_after(data.data.general_data.data.control_interval) def handler_with_control_interval(): if (data.data.general_data.data.control_interval / 10) == self.interval_counter: data.data.copy_data() @@ -85,13 +136,16 @@ def handler_with_control_interval(): for line in stack_trace: logging.debug(line.strip()) Pub().pub("openWB/set/system/time", timecheck.create_timestamp()) - handler_with_control_interval() - except KeyboardInterrupt: - log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc()) + if not self.__acquire_lock("handler10Sec", error_threshold=30): + return + try: + handler_with_control_interval() + finally: + self.__release_lock("handler10Sec") except Exception: log.exception("Fehler im Main-Modul") - @exit_after(10) + @__with_handler_lock(error_threshold=60) def handler5MinAlgorithm(self): """ Handler, der alle 5 Minuten aufgerufen wird und die Heartbeats der Threads überprüft und die Aufgaben ausführt, die nur alle 5 Minuten ausgeführt werden müssen. @@ -104,12 +158,10 @@ def handler5MinAlgorithm(self): data.data.general_data.grid_protection() data.data.optional_data.ocpp_transfer_meter_values() data.data.counter_all_data.validate_hierarchy() - except KeyboardInterrupt: - log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc()) except Exception: log.exception("Fehler im Main-Modul") - @exit_after(10) + @__with_handler_lock(error_threshold=60) def handler5Min(self): """ Handler, der alle 5 Minuten aufgerufen wird und die Heartbeats der Threads überprüft und die Aufgaben ausführt, die nur alle 5 Minuten ausgeführt werden müssen. @@ -139,41 +191,35 @@ def handler5Min(self): general_internal_chargepoint_handler.internal_chargepoint_handler.heartbeat = False with ChangedValuesContext(loadvars_.event_module_update_completed): sub.system_data["system"].update_ip_address() - except KeyboardInterrupt: - log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc()) except Exception: log.exception("Fehler im Main-Modul") - @exit_after(10) + @__with_handler_lock(error_threshold=60) def handler_midnight(self): try: save_log(LogType.MONTHLY) thread_errors_path = Path(Path(__file__).resolve().parents[1]/"ramdisk"/"thread_errors.log") with thread_errors_path.open("w") as f: f.write("") - except KeyboardInterrupt: - log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc()) except Exception: log.exception("Fehler im Main-Modul") - @exit_after(10) + @__with_handler_lock(error_threshold=60) def handler_random_nightly(self): try: data.data.system_data["system"].thread_backup_and_send_to_cloud() - except KeyboardInterrupt: - log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc()) except Exception: log.exception("Fehler im Main-Modul") - @exit_after(10) + @__with_handler_lock(error_threshold=60) def handler_hour(self): + """ Handler, der jede Stunde aufgerufen wird und die Aufgaben ausführt, die nur jede Stunde ausgeführt werden müssen. + """ try: with ChangedValuesContext(loadvars_.event_module_update_completed): for cp in data.data.cp_data.values(): calculate_charge_cost(cp) data.data.optional_data.et_get_prices() - except KeyboardInterrupt: - log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc()) except Exception: log.exception("Fehler im Main-Modul") From ff9885b6bb939e5c2190acee4f829fb74cf6153b Mon Sep 17 00:00:00 2001 From: Lutz Bender Date: Fri, 8 Aug 2025 12:51:03 +0200 Subject: [PATCH 2/2] implement monitoring and emergency exit --- packages/main.py | 52 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/packages/main.py b/packages/main.py index 08960f8870..9bf6fa5d6f 100755 --- a/packages/main.py +++ b/packages/main.py @@ -74,6 +74,7 @@ def __release_lock(self, handler_name): if lock: lock.release() log.debug(f"Lock für {handler_name} freigegeben nach {time.time() - self.handler_timestamps[handler_name]} Sekunden.") + self.handler_timestamps.pop(handler_name, None) else: log.warning(f"Lock für {handler_name} nicht gefunden.") @@ -92,6 +93,39 @@ def wrapper(self, *args, **kwargs): return wrapper return decorator + def monitor_handler_locks(self, max_runtime=300): + emergency_exit = False + for handler_name, lock in self.handler_locks.items(): + if lock.locked(): + # Überprüfen, wie lange der Lock bereits aktiv ist + duration = time.time() - self.handler_timestamps[handler_name] + log.warning(f"Handler {handler_name} ist seit {duration} Sekunden gesperrt.") + # Stack Trace des Threads, der den Lock hält + for tid, frame in sys._current_frames().items(): + if tid == lock._get_ident(): + stack_trace = traceback.format_stack(frame) + log.warning(f"Stack Trace für {handler_name}:") + for line in stack_trace: + log.warning(line.strip()) + if duration > max_runtime: + log.error(f"Handler {handler_name} ist seit mehr als {max_runtime} Sekunden gesperrt.") + emergency_exit = True + if emergency_exit: + log.error(f"Deadlock erkannt, Prozess wird beendet!") + log.debug(f"Threads: {enumerate()}") + for thread in enumerate(): + logging.debug(f"Thread Name: {thread.name}") + if hasattr(thread, "ident"): + thread_id = thread.ident + for tid, frame in sys._current_frames().items(): + if tid == thread_id: + logging.debug(f" File: {frame.f_code.co_filename}, Line: {frame.f_lineno}, Function: {frame.f_code.co_name}") + stack_trace = traceback.format_stack(frame) + logging.debug(" Stack Trace:") + for line in stack_trace: + logging.debug(line.strip()) + sys.exit(1) + # decorator can not be used here as it would block logging before handler_with_control_interval() # @__with_handler_lock(error_threshold=30) def handler10Sec(self): @@ -121,20 +155,8 @@ def handler_with_control_interval(): else: self.interval_counter = self.interval_counter + 1 log.info("# ***Start*** ") - log.debug(run_command.run_shell_command("top -b -n 1 | head -n 20")) - log.debug(f'Drosselung: {run_command.run_shell_command("if which vcgencmd >/dev/null; then vcgencmd get_throttled; else echo not found; fi")}') - log.debug(f"Threads: {enumerate()}") - for thread in threading.enumerate(): - logging.debug(f"Thread Name: {thread.name}") - if hasattr(thread, "ident"): - thread_id = thread.ident - for tid, frame in sys._current_frames().items(): - if tid == thread_id: - logging.debug(f" File: {frame.f_code.co_filename}, Line: {frame.f_lineno}, Function: {frame.f_code.co_name}") - stack_trace = traceback.format_stack(frame) - logging.debug(" Stack Trace:") - for line in stack_trace: - logging.debug(line.strip()) + # log.debug(run_command.run_shell_command("top -b -n 1 | head -n 20")) + # log.debug(f'Drosselung: {run_command.run_shell_command("if which vcgencmd >/dev/null; then vcgencmd get_throttled; else echo not found; fi")}') Pub().pub("openWB/set/system/time", timecheck.create_timestamp()) if not self.__acquire_lock("handler10Sec", error_threshold=30): return @@ -235,6 +257,8 @@ def schedule_jobs(): schedule.every().day.at(f"0{randrange(0, 5)}:{randrange(0, 59):02d}:{randrange(0, 59):02d}").do( handler.handler_random_nightly) [schedule.every().minute.at(f":{i:02d}").do(handler.handler10Sec).tag("algorithm") for i in range(0, 60, 10)] + # 30 Sekunden Handler, der die Locks überwacht, Deadlocks erkennt, loggt und ggf. den Prozess beendet + schedule.every(30).seconds.do(handler.monitor_handler_locks, max_runtime=600) try: