Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/helpermodules/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from helpermodules.utils._exit_after import exit_after
from helpermodules.utils._thread_handler import joined_thread_handler, thread_handler
from helpermodules.utils.processing_counter import ProcessingCounter
26 changes: 0 additions & 26 deletions packages/helpermodules/utils/_exit_after.py

This file was deleted.

138 changes: 104 additions & 34 deletions packages/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from helpermodules.utils import run_command, thread_handler
import threading
import sys
import functools

# als erstes logging initialisieren, damit auch ImportError geloggt werden
logger.setup_logging()
Expand All @@ -28,7 +29,6 @@
from helpermodules.measurement_logging.write_log import LogType, save_log
from helpermodules.modbusserver import start_modbus_server
from helpermodules.pub import Pub
from helpermodules.utils import exit_after
from modules import configuration, loadvars, update_soc
from modules.internal_chargepoint_handler.internal_chargepoint_handler import GeneralInternalChargepointHandler
from modules.internal_chargepoint_handler.gpio import InternalGpioHandler
Expand All @@ -41,12 +41,97 @@ class HandlerAlgorithm:
def __init__(self):
self.interval_counter = 1
self.current_day = None
self.handler_locks = {}
self.handler_timestamps = {}

def __acquire_lock(self, handler_name, error_threshold=60):
"""Versucht, den Lock für den angegebenen Handler zu erwerben.
Erstellt Lock und Timestamp-Eintrag bei Bedarf dynamisch.
Gibt True zurück, wenn der Lock erfolgreich erworben wurde, sonst False.
"""
if handler_name not in self.handler_locks:
self.handler_locks[handler_name] = threading.Lock()
if handler_name not in self.handler_timestamps:
self.handler_timestamps[handler_name] = 0

lock = self.handler_locks[handler_name]
now = time.time()
if lock.acquire(blocking=False):
self.handler_timestamps[handler_name] = now
log.debug(f"Lock für {handler_name} erworben.")
return True
# Wenn der Lock älter als 'error_threshold' Sekunden ist, wird ein Error geloggt.
log_handler = log.error if now - self.handler_timestamps[handler_name] > error_threshold else log.debug
log_handler(
f"{handler_name} läuft bereits, neuer Aufruf wird übersprungen. Letzter Start: "
f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.handler_timestamps[handler_name]))} "
f"(vor {now - self.handler_timestamps[handler_name]} Sekunden).")
return False

def __release_lock(self, handler_name):
"""Gibt den Lock für den angegebenen Handler frei."""
lock = self.handler_locks.get(handler_name)
if lock:
lock.release()
log.debug(f"Lock für {handler_name} freigegeben nach {time.time() - self.handler_timestamps[handler_name]} Sekunden.")
self.handler_timestamps.pop(handler_name, None)
else:
log.warning(f"Lock für {handler_name} nicht gefunden.")

def __with_handler_lock(error_threshold=60):
def decorator(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
handler_name = func.__name__
if self.__acquire_lock(handler_name, error_threshold):
try:
return func(self, *args, **kwargs)
finally:
self.__release_lock(handler_name)
else:
return
return wrapper
return decorator

def monitor_handler_locks(self, max_runtime=300):
emergency_exit = False
for handler_name, lock in self.handler_locks.items():
if lock.locked():
# Überprüfen, wie lange der Lock bereits aktiv ist
duration = time.time() - self.handler_timestamps[handler_name]
log.warning(f"Handler {handler_name} ist seit {duration} Sekunden gesperrt.")
# Stack Trace des Threads, der den Lock hält
for tid, frame in sys._current_frames().items():
if tid == lock._get_ident():
stack_trace = traceback.format_stack(frame)
log.warning(f"Stack Trace für {handler_name}:")
for line in stack_trace:
log.warning(line.strip())
if duration > max_runtime:
log.error(f"Handler {handler_name} ist seit mehr als {max_runtime} Sekunden gesperrt.")
emergency_exit = True
if emergency_exit:
log.error(f"Deadlock erkannt, Prozess wird beendet!")
log.debug(f"Threads: {enumerate()}")
for thread in enumerate():
logging.debug(f"Thread Name: {thread.name}")
if hasattr(thread, "ident"):
thread_id = thread.ident
for tid, frame in sys._current_frames().items():
if tid == thread_id:
logging.debug(f" File: {frame.f_code.co_filename}, Line: {frame.f_lineno}, Function: {frame.f_code.co_name}")
stack_trace = traceback.format_stack(frame)
logging.debug(" Stack Trace:")
for line in stack_trace:
logging.debug(line.strip())
sys.exit(1)

# decorator can not be used here as it would block logging before handler_with_control_interval()
# @__with_handler_lock(error_threshold=30)
def handler10Sec(self):
""" führt den Algorithmus durch.
"""
try:
@exit_after(data.data.general_data.data.control_interval)
def handler_with_control_interval():
if (data.data.general_data.data.control_interval / 10) == self.interval_counter:
data.data.copy_data()
Expand All @@ -70,28 +155,19 @@ def handler_with_control_interval():
else:
self.interval_counter = self.interval_counter + 1
log.info("# ***Start*** ")
log.debug(run_command.run_shell_command("top -b -n 1 | head -n 20"))
log.debug(f'Drosselung: {run_command.run_shell_command("if which vcgencmd >/dev/null; then vcgencmd get_throttled; else echo not found; fi")}')
log.debug(f"Threads: {enumerate()}")
for thread in threading.enumerate():
logging.debug(f"Thread Name: {thread.name}")
if hasattr(thread, "ident"):
thread_id = thread.ident
for tid, frame in sys._current_frames().items():
if tid == thread_id:
logging.debug(f" File: {frame.f_code.co_filename}, Line: {frame.f_lineno}, Function: {frame.f_code.co_name}")
stack_trace = traceback.format_stack(frame)
logging.debug(" Stack Trace:")
for line in stack_trace:
logging.debug(line.strip())
# log.debug(run_command.run_shell_command("top -b -n 1 | head -n 20"))
# log.debug(f'Drosselung: {run_command.run_shell_command("if which vcgencmd >/dev/null; then vcgencmd get_throttled; else echo not found; fi")}')
Pub().pub("openWB/set/system/time", timecheck.create_timestamp())
handler_with_control_interval()
except KeyboardInterrupt:
log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc())
if not self.__acquire_lock("handler10Sec", error_threshold=30):
return
try:
handler_with_control_interval()
finally:
self.__release_lock("handler10Sec")
except Exception:
log.exception("Fehler im Main-Modul")

@exit_after(10)
@__with_handler_lock(error_threshold=60)
def handler5MinAlgorithm(self):
""" Handler, der alle 5 Minuten aufgerufen wird und die Heartbeats der Threads überprüft und die Aufgaben
ausführt, die nur alle 5 Minuten ausgeführt werden müssen.
Expand All @@ -104,12 +180,10 @@ def handler5MinAlgorithm(self):
data.data.general_data.grid_protection()
data.data.optional_data.ocpp_transfer_meter_values()
data.data.counter_all_data.validate_hierarchy()
except KeyboardInterrupt:
log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc())
except Exception:
log.exception("Fehler im Main-Modul")

@exit_after(10)
@__with_handler_lock(error_threshold=60)
def handler5Min(self):
""" Handler, der alle 5 Minuten aufgerufen wird und die Heartbeats der Threads überprüft und die Aufgaben
ausführt, die nur alle 5 Minuten ausgeführt werden müssen.
Expand Down Expand Up @@ -139,41 +213,35 @@ def handler5Min(self):
general_internal_chargepoint_handler.internal_chargepoint_handler.heartbeat = False
with ChangedValuesContext(loadvars_.event_module_update_completed):
sub.system_data["system"].update_ip_address()
except KeyboardInterrupt:
log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc())
except Exception:
log.exception("Fehler im Main-Modul")

@exit_after(10)
@__with_handler_lock(error_threshold=60)
def handler_midnight(self):
try:
save_log(LogType.MONTHLY)
thread_errors_path = Path(Path(__file__).resolve().parents[1]/"ramdisk"/"thread_errors.log")
with thread_errors_path.open("w") as f:
f.write("")
except KeyboardInterrupt:
log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc())
except Exception:
log.exception("Fehler im Main-Modul")

@exit_after(10)
@__with_handler_lock(error_threshold=60)
def handler_random_nightly(self):
try:
data.data.system_data["system"].thread_backup_and_send_to_cloud()
except KeyboardInterrupt:
log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc())
except Exception:
log.exception("Fehler im Main-Modul")

@exit_after(10)
@__with_handler_lock(error_threshold=60)
def handler_hour(self):
""" Handler, der jede Stunde aufgerufen wird und die Aufgaben ausführt, die nur jede Stunde ausgeführt werden müssen.
"""
try:
with ChangedValuesContext(loadvars_.event_module_update_completed):
for cp in data.data.cp_data.values():
calculate_charge_cost(cp)
data.data.optional_data.et_get_prices()
except KeyboardInterrupt:
log.critical("Ausführung durch exit_after gestoppt: "+traceback.format_exc())
except Exception:
log.exception("Fehler im Main-Modul")

Expand All @@ -189,6 +257,8 @@ def schedule_jobs():
schedule.every().day.at(f"0{randrange(0, 5)}:{randrange(0, 59):02d}:{randrange(0, 59):02d}").do(
handler.handler_random_nightly)
[schedule.every().minute.at(f":{i:02d}").do(handler.handler10Sec).tag("algorithm") for i in range(0, 60, 10)]
# 30 Sekunden Handler, der die Locks überwacht, Deadlocks erkennt, loggt und ggf. den Prozess beendet
schedule.every(30).seconds.do(handler.monitor_handler_locks, max_runtime=600)


try:
Expand Down