From 21dad511b5fd3e6154d7a8bb01367281881dee7e Mon Sep 17 00:00:00 2001 From: LKuemmel <76958050+LKuemmel@users.noreply.github.com> Date: Tue, 9 Dec 2025 08:53:24 +0100 Subject: [PATCH] Revert "subdata initialization: wait for all retained topics (#3015)" This reverts commit 5988e8bc74f855905e901ba2299f6b08b6225a07. --- packages/helpermodules/command_test.py | 5 +- packages/helpermodules/setdata.py | 7 ++- packages/helpermodules/subdata.py | 61 +++++++++---------- packages/helpermodules/utils/__init__.py | 1 + .../helpermodules/utils/processing_counter.py | 26 ++++++++ packages/main.py | 9 ++- packages/modules/update_soc_test.py | 5 +- 7 files changed, 72 insertions(+), 42 deletions(-) create mode 100644 packages/helpermodules/utils/processing_counter.py diff --git a/packages/helpermodules/command_test.py b/packages/helpermodules/command_test.py index 4045f8a09a..dade5c3324 100644 --- a/packages/helpermodules/command_test.py +++ b/packages/helpermodules/command_test.py @@ -15,9 +15,8 @@ @pytest.fixture -def subdata_fixture(monkeypatch) -> None: - monkeypatch.setattr(SubData, "initialize", lambda x: None) - SubData(*([Mock()]*15)) +def subdata_fixture() -> None: + SubData(*([Mock()]*16)) SubData.cp_data = {"cp0": Mock(spec=ChargepointStateUpdate, chargepoint=Mock( spec=Chargepoint, chargepoint_module=Mock(spec=ChargepointModulePro)))} diff --git a/packages/helpermodules/setdata.py b/packages/helpermodules/setdata.py index 70bc3567e7..067ede9af8 100644 --- a/packages/helpermodules/setdata.py +++ b/packages/helpermodules/setdata.py @@ -28,15 +28,18 @@ class SetData: def __init__(self, event_ev_template: Event, event_cp_config: Event, - event_soc: Event): + event_soc: Event, + event_subdata_initialized: Event): self.event_ev_template = event_ev_template self.event_cp_config = event_cp_config self.event_soc = event_soc + self.event_subdata_initialized = event_subdata_initialized self.heartbeat = False def set_data(self): self.internal_broker_client = BrokerClient("mqttset", self.on_connect, self.on_message) - log.debug("Starting setdata loop to broker.") + self.event_subdata_initialized.wait() + log.debug("Subdata initialization completed. Starting setdata loop to broker.") self.internal_broker_client.start_infinite_loop() def disconnect(self) -> None: diff --git a/packages/helpermodules/subdata.py b/packages/helpermodules/subdata.py index 1377fd6610..b5b2856e0c 100644 --- a/packages/helpermodules/subdata.py +++ b/packages/helpermodules/subdata.py @@ -4,7 +4,6 @@ import logging from pathlib import Path from threading import Event -import time from typing import Dict, Union import re import subprocess @@ -24,6 +23,7 @@ from helpermodules import graph, system from helpermodules.broker import BrokerClient from helpermodules.messaging import MessageType, pub_system_message +from helpermodules.utils import ProcessingCounter from helpermodules.utils.run_command import run_command from helpermodules.utils.topic_parser import decode_payload, get_index, get_second_index from helpermodules.pub import Pub @@ -39,8 +39,6 @@ log = logging.getLogger(__name__) mqtt_log = logging.getLogger("mqtt") -QUIET_TIME = 3 - class SubData: """ Klasse, die die benötigten Topics abonniert, die Instanzen erstellt, wenn z.b. ein Modul neu konfiguriert @@ -79,6 +77,7 @@ def __init__(self, event_copy_data: Event, event_global_data_initialized: Event, event_command_completed: Event, + event_subdata_initialized: Event, event_vehicle_update_completed: Event, event_start_internal_chargepoint: Event, event_stop_internal_chargepoint: Event, @@ -94,6 +93,7 @@ def __init__(self, self.event_copy_data = event_copy_data self.event_global_data_initialized = event_global_data_initialized self.event_command_completed = event_command_completed + self.event_subdata_initialized = event_subdata_initialized self.event_vehicle_update_completed = event_vehicle_update_completed self.event_start_internal_chargepoint = event_start_internal_chargepoint self.event_stop_internal_chargepoint = event_stop_internal_chargepoint @@ -104,28 +104,13 @@ def __init__(self, self.event_modbus_server = event_modbus_server self.event_restart_gpio = event_restart_gpio self.heartbeat = False - self.last_msg_time = None - self.initialized = False - self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message) - self.initialize() - - def initialize(self): - try: - self.internal_broker_client.client.loop_start() - log.debug("Warte auf retained Topics ...") - while self.last_msg_time is None: - time.sleep(0.05) - # quiet time detector: mqtt schickt alle retained messages direkt nach dem subscribe - while time.time() - self.last_msg_time < QUIET_TIME: - time.sleep(0.05) - self.internal_broker_client.client.loop_stop() - log.debug("Alle retained Topics empfangen.") - self.initialized = True - except Exception: - log.exception("Fehler beim Initialisieren des Subdata-Moduls") + # Immer wenn ein Subscribe hinzugefügt wird, wird der Zähler hinzugefügt und subdata_initialized gepublished. + # Wenn subdata_initialized empfangen wird, wird der Zäheler runtergezählt. Erst wenn alle subdata_initialized + # empfangen wurden, wurden auch die vorher subskribierten Topics empfangen und der Algorithmus kann starten. + self.processing_counter = ProcessingCounter(self.event_subdata_initialized) def sub_topics(self): - log.debug("Starte Subdata-MQTT-Loop") + self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message) self.internal_broker_client.start_infinite_loop() def disconnect(self) -> None: @@ -164,14 +149,15 @@ def on_connect(self, client: mqtt.Client, userdata, flags: dict, rc: int): ("openWB/LegacySmartHome/Status/wattnichtHaus", 2), ("openWB/io/#", 2), ]) + self.processing_counter.add_task() + Pub().pub("openWB/system/subdata_initialized", True) def on_message(self, client: mqtt.Client, userdata, msg: mqtt.MQTTMessage): """ wait for incoming topics. """ - mqtt_log.debug(f"Topic: {msg.topic}, Payload: {msg.payload.decode('utf-8')}") + mqtt_log.debug("Topic: "+str(msg.topic) + + ", Payload: "+str(msg.payload.decode("utf-8"))) self.heartbeat = True - if self.initialized is False: - self.last_msg_time = time.time() if "openWB/vehicle/template/charge_template/" in msg.topic: self.process_vehicle_charge_template_topic( self.ev_charge_template_data, msg) @@ -325,6 +311,8 @@ def process_vehicle_topic(self, client: mqtt.Client, var: Dict[str, ev.Ev], msg: var["ev"+index].soc_module = mod.create_vehicle(config, index) client.subscribe(f"openWB/vehicle/{index}/soc_module/calculated_soc_state", 2) client.subscribe(f"openWB/vehicle/{index}/soc_module/general_config", 2) + self.processing_counter.add_task() + Pub().pub("openWB/system/subdata_initialized", True) self.event_soc.set() else: # temporäres ChargeTemplate aktualisieren, wenn dem Fahrzeug ein anderes Ladeprofil zugeordnet @@ -622,7 +610,10 @@ def process_general_topic(self, var: general.General, msg: mqtt.MQTTMessage): if decode_payload(msg.payload) and self.general_data.data.extern: self.event_modbus_server.set() elif "openWB/general/http_api" == msg.topic: - if self.initialized and self.general_data.data.http_api != decode_payload(msg.payload): + if ( + self.event_subdata_initialized.is_set() and + self.general_data.data.http_api != decode_payload(msg.payload) + ): pub_system_message( msg.payload, "Bitte die openWB " @@ -759,7 +750,7 @@ def process_optional_topic(self, var: optional.Optional, msg: mqtt.MQTTMessage): var.data.ocpp = dataclass_from_dict(Ocpp, config_dict) elif re.search("/optional/monitoring/", msg.topic) is not None: # do not reconfigure monitoring if topic is received on startup - if self.initialized: + if self.event_subdata_initialized.is_set(): config = decode_payload(msg.payload) if config["type"] is None: var.monitoring_stop() @@ -843,6 +834,8 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes # Durch das erneute Subscribe werden die Komponenten mit dem aktualisierten TCP-Client angelegt. client.subscribe(f"openWB/system/device/{index}/component/+/config", 2) client.subscribe(f"openWB/system/device/{index}/error_timestamp", 2) + self.processing_counter.add_task() + Pub().pub("openWB/system/subdata_initialized", True) elif re.search("^.+/device/[0-9]+/component/[0-9]+/simulation$", msg.topic) is not None: index = get_index(msg.topic) index_second = get_second_index(msg.topic) @@ -879,9 +872,11 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes config = dataclass_from_dict(component.component_descriptor.configuration_factory, component_config) var["device"+index].add_component(config) client.subscribe(f"openWB/system/device/{index}/component/{index_second}/simulation", 2) + self.processing_counter.add_task() + Pub().pub("openWB/system/subdata_initialized", True) elif "mqtt" and "bridge" in msg.topic: # do not reconfigure mqtt bridges if topic is received on startup - if self.initialized: + if self.event_subdata_initialized.is_set(): index = get_index(msg.topic) parent_file = Path(__file__).resolve().parents[2] try: @@ -909,7 +904,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes run_command([str(Path(__file__).resolve().parents[2] / "runs" / "start_remote_support.sh"), token, port, user], process_exception=True) elif "openWB/system/backup_password" in msg.topic: - if self.initialized: + if self.event_subdata_initialized.is_set(): key_file = Path.home() / "backup.key" payload = decode_payload(msg.payload) if payload is None or payload == "": @@ -932,7 +927,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes self.set_json_payload(var["system"].data["backup_cloud"], msg) elif ("openWB/system/dataprotection_acknowledged" == msg.topic and decode_payload(msg.payload) is False): - if self.initialized: + if self.event_subdata_initialized.is_set(): Pub().pub("openWB/set/command/removeCloudBridge/todo", {"command": "removeCloudBridge"}) else: @@ -972,6 +967,10 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes "openWB/system/time" == msg.topic): # Logged in update.log, not used in data.data and removed due to readability purposes of main.log. return + elif "openWB/system/subdata_initialized" == msg.topic: + if decode_payload(msg.payload) != "": + Pub().pub("openWB/system/subdata_initialized", "") + self.processing_counter.task_done() elif "openWB/system/update_config_completed" == msg.topic: if decode_payload(msg.payload) != "": Pub().pub("openWB/system/update_config_completed", "") diff --git a/packages/helpermodules/utils/__init__.py b/packages/helpermodules/utils/__init__.py index 55d7b1e041..15a1ee8103 100644 --- a/packages/helpermodules/utils/__init__.py +++ b/packages/helpermodules/utils/__init__.py @@ -1,2 +1,3 @@ from helpermodules.utils._get_default import get_default from helpermodules.utils._thread_handler import joined_thread_handler, thread_handler +from helpermodules.utils.processing_counter import ProcessingCounter diff --git a/packages/helpermodules/utils/processing_counter.py b/packages/helpermodules/utils/processing_counter.py new file mode 100644 index 0000000000..f8d4aead99 --- /dev/null +++ b/packages/helpermodules/utils/processing_counter.py @@ -0,0 +1,26 @@ +from threading import Event, Lock + + +class ProcessingCounter: + def __init__(self, done_event: Event): + self.lock = Lock() + self.counter = 0 + self.done_event = done_event + + def add_task(self): + with self.lock: + self.counter += 1 + self.done_event.clear() + + def task_done(self): + with self.lock: + self.counter -= 1 + if self.counter <= 0: + self.done_event.set() + + def wait_for_completion(self, timeout=None): + return self.done_event.wait(timeout) + + def is_done(self): + with self.lock: + return self.counter == 0 diff --git a/packages/main.py b/packages/main.py index 74a1c6b6e0..1650b10fb8 100755 --- a/packages/main.py +++ b/packages/main.py @@ -19,7 +19,7 @@ import time from threading import Event, Thread, enumerate import traceback -from control.chargelog.chargelog import calc_energy_costs +from control.chargelog.chargelog import calc_energy_costs, calculate_charged_energy_by_source from control import data, prepare, process from control.algorithm import algorithm @@ -303,6 +303,7 @@ def schedule_jobs(): event_global_data_initialized = Event() event_command_completed = Event() event_command_completed.set() + event_subdata_initialized = Event() event_update_config_completed = Event() event_modbus_server = Event() event_jobs_running = Event() @@ -313,11 +314,12 @@ def schedule_jobs(): prep = prepare.Prepare() soc = update_soc.UpdateSoc(event_update_soc) set = setdata.SetData(event_ev_template, - event_cp_config, event_soc) + event_cp_config, event_soc, + event_subdata_initialized) sub = subdata.SubData(event_ev_template, event_cp_config, loadvars_.event_module_update_completed, event_copy_data, event_global_data_initialized, event_command_completed, - soc.event_vehicle_update_completed, + event_subdata_initialized, soc.event_vehicle_update_completed, general_internal_chargepoint_handler.event_start, general_internal_chargepoint_handler.event_stop, event_update_config_completed, @@ -346,6 +348,7 @@ def schedule_jobs(): Thread(target=start_modbus_server, args=(event_modbus_server,), name="Modbus Control Server").start() # Warten, damit subdata Zeit hat, alle Topics auf dem Broker zu empfangen. event_update_config_completed.wait(300) + event_subdata_initialized.wait(300) Pub().pub("openWB/set/system/boot_done", True) Path(Path(__file__).resolve().parents[1]/"ramdisk"/"bootdone").touch() schedule_jobs() diff --git a/packages/modules/update_soc_test.py b/packages/modules/update_soc_test.py index e0ef5a96b7..376af80f66 100644 --- a/packages/modules/update_soc_test.py +++ b/packages/modules/update_soc_test.py @@ -19,11 +19,10 @@ @pytest.fixture(autouse=True) -def mock_data(monkeypatch) -> None: +def mock_data() -> None: data.data_init(Mock()) - monkeypatch.setattr(SubData, "initialize", lambda x: None) - SubData(*([Mock()]*15)) + SubData(*([Mock()]*16)) SubData.cp_data = {"cp0": Mock(spec=ChargepointStateUpdate, chargepoint=Mock( spec=Chargepoint, id=id,