diff --git a/packages/helpermodules/subdata.py b/packages/helpermodules/subdata.py index 95e5c1a157..cb26297c36 100644 --- a/packages/helpermodules/subdata.py +++ b/packages/helpermodules/subdata.py @@ -24,6 +24,7 @@ from helpermodules.abstract_plans import AutolockPlan, ScheduledChargingPlan, TimeChargingPlan from helpermodules.broker import BrokerClient from helpermodules.messaging import MessageType, pub_system_message +from helpermodules.utils import ProcessingCounter from helpermodules.utils.run_command import run_command from helpermodules.utils.topic_parser import decode_payload, get_index, get_second_index from helpermodules.pub import Pub @@ -104,6 +105,10 @@ def __init__(self, self.event_modbus_server = event_modbus_server self.event_restart_gpio = event_restart_gpio self.heartbeat = False + # Immer wenn ein Subscribe hinzugefügt wird, wird der Zähler hinzugefügt und subdata_initialized gepublished. + # Wenn subdata_initialized empfangen wird, wird der Zäheler runtergezählt. Erst wenn alle subdata_initialized + # empfangen wurden, wurden auch die vorher subskribierten Topics empfangen und der Algorithmus kann starten. + self.processing_counter = ProcessingCounter(self.event_subdata_initialized) def sub_topics(self): self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message) @@ -140,6 +145,7 @@ def on_connect(self, client: mqtt.Client, userdata, flags: dict, rc: int): ("openWB/system/io/#", 2), ("openWB/LegacySmartHome/Status/wattnichtHaus", 2), ]) + self.processing_counter.add_task() Pub().pub("openWB/system/subdata_initialized", True) def on_message(self, client: mqtt.Client, userdata, msg: mqtt.MQTTMessage): @@ -301,6 +307,8 @@ def process_vehicle_topic(self, client: mqtt.Client, var: Dict[str, ev.Ev], msg: var["ev"+index].soc_module = mod.create_vehicle(config, index) client.subscribe(f"openWB/vehicle/{index}/soc_module/calculated_soc_state", 2) client.subscribe(f"openWB/vehicle/{index}/soc_module/general_config", 2) + self.processing_counter.add_task() + Pub().pub("openWB/system/subdata_initialized", True) self.event_soc.set() else: # temporäres ChargeTemplate aktualisieren, wenn dem Fahrzeug ein anderes Ladeprofil zugeordnet @@ -864,6 +872,8 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes # Durch das erneute Subscribe werden die Komponenten mit dem aktualisierten TCP-Client angelegt. client.subscribe(f"openWB/system/device/{index}/component/+/config", 2) client.subscribe(f"openWB/system/device/{index}/error_timestamp", 2) + self.processing_counter.add_task() + Pub().pub("openWB/system/subdata_initialized", True) elif re.search("^.+/device/[0-9]+/component/[0-9]+/simulation$", msg.topic) is not None: index = get_index(msg.topic) index_second = get_second_index(msg.topic) @@ -900,6 +910,8 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes config = dataclass_from_dict(component.component_descriptor.configuration_factory, component_config) var["device"+index].add_component(config) client.subscribe(f"openWB/system/device/{index}/component/{index_second}/simulation", 2) + self.processing_counter.add_task() + Pub().pub("openWB/system/subdata_initialized", True) elif "mqtt" and "bridge" in msg.topic: # do not reconfigure mqtt bridges if topic is received on startup if self.event_subdata_initialized.is_set(): @@ -978,7 +990,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes elif "openWB/system/subdata_initialized" == msg.topic: if decode_payload(msg.payload) != "": Pub().pub("openWB/system/subdata_initialized", "") - self.event_subdata_initialized.set() + self.processing_counter.task_done() elif "openWB/system/update_config_completed" == msg.topic: if decode_payload(msg.payload) != "": Pub().pub("openWB/system/update_config_completed", "") diff --git a/packages/helpermodules/utils/__init__.py b/packages/helpermodules/utils/__init__.py index c87b48fb9a..ce3cdae0d2 100644 --- a/packages/helpermodules/utils/__init__.py +++ b/packages/helpermodules/utils/__init__.py @@ -1,2 +1,3 @@ from helpermodules.utils._exit_after import exit_after from helpermodules.utils._thread_handler import joined_thread_handler, thread_handler +from helpermodules.utils.processing_counter import ProcessingCounter diff --git a/packages/helpermodules/utils/processing_counter.py b/packages/helpermodules/utils/processing_counter.py new file mode 100644 index 0000000000..f8d4aead99 --- /dev/null +++ b/packages/helpermodules/utils/processing_counter.py @@ -0,0 +1,26 @@ +from threading import Event, Lock + + +class ProcessingCounter: + def __init__(self, done_event: Event): + self.lock = Lock() + self.counter = 0 + self.done_event = done_event + + def add_task(self): + with self.lock: + self.counter += 1 + self.done_event.clear() + + def task_done(self): + with self.lock: + self.counter -= 1 + if self.counter <= 0: + self.done_event.set() + + def wait_for_completion(self, timeout=None): + return self.done_event.wait(timeout) + + def is_done(self): + with self.lock: + return self.counter == 0 diff --git a/packages/main.py b/packages/main.py index 9c159d9ad4..f483a68e19 100755 --- a/packages/main.py +++ b/packages/main.py @@ -253,6 +253,7 @@ def schedule_jobs(): Thread(target=start_modbus_server, args=(event_modbus_server,), name="Modbus Control Server").start() # Warten, damit subdata Zeit hat, alle Topics auf dem Broker zu empfangen. event_update_config_completed.wait(300) + event_subdata_initialized.wait(300) Pub().pub("openWB/set/system/boot_done", True) Path(Path(__file__).resolve().parents[1]/"ramdisk"/"bootdone").touch() schedule_jobs()