diff --git a/packages/helpermodules/command_test.py b/packages/helpermodules/command_test.py index dade5c3324..4045f8a09a 100644 --- a/packages/helpermodules/command_test.py +++ b/packages/helpermodules/command_test.py @@ -15,8 +15,9 @@ @pytest.fixture -def subdata_fixture() -> None: - SubData(*([Mock()]*16)) +def subdata_fixture(monkeypatch) -> None: + monkeypatch.setattr(SubData, "initialize", lambda x: None) + SubData(*([Mock()]*15)) SubData.cp_data = {"cp0": Mock(spec=ChargepointStateUpdate, chargepoint=Mock( spec=Chargepoint, chargepoint_module=Mock(spec=ChargepointModulePro)))} diff --git a/packages/helpermodules/setdata.py b/packages/helpermodules/setdata.py index 067ede9af8..70bc3567e7 100644 --- a/packages/helpermodules/setdata.py +++ b/packages/helpermodules/setdata.py @@ -28,18 +28,15 @@ class SetData: def __init__(self, event_ev_template: Event, event_cp_config: Event, - event_soc: Event, - event_subdata_initialized: Event): + event_soc: Event): self.event_ev_template = event_ev_template self.event_cp_config = event_cp_config self.event_soc = event_soc - self.event_subdata_initialized = event_subdata_initialized self.heartbeat = False def set_data(self): self.internal_broker_client = BrokerClient("mqttset", self.on_connect, self.on_message) - self.event_subdata_initialized.wait() - log.debug("Subdata initialization completed. Starting setdata loop to broker.") + log.debug("Starting setdata loop to broker.") self.internal_broker_client.start_infinite_loop() def disconnect(self) -> None: diff --git a/packages/helpermodules/subdata.py b/packages/helpermodules/subdata.py index b5b2856e0c..1377fd6610 100644 --- a/packages/helpermodules/subdata.py +++ b/packages/helpermodules/subdata.py @@ -4,6 +4,7 @@ import logging from pathlib import Path from threading import Event +import time from typing import Dict, Union import re import subprocess @@ -23,7 +24,6 @@ from helpermodules import graph, system from helpermodules.broker import BrokerClient from helpermodules.messaging import MessageType, pub_system_message -from helpermodules.utils import ProcessingCounter from helpermodules.utils.run_command import run_command from helpermodules.utils.topic_parser import decode_payload, get_index, get_second_index from helpermodules.pub import Pub @@ -39,6 +39,8 @@ log = logging.getLogger(__name__) mqtt_log = logging.getLogger("mqtt") +QUIET_TIME = 3 + class SubData: """ Klasse, die die benötigten Topics abonniert, die Instanzen erstellt, wenn z.b. ein Modul neu konfiguriert @@ -77,7 +79,6 @@ def __init__(self, event_copy_data: Event, event_global_data_initialized: Event, event_command_completed: Event, - event_subdata_initialized: Event, event_vehicle_update_completed: Event, event_start_internal_chargepoint: Event, event_stop_internal_chargepoint: Event, @@ -93,7 +94,6 @@ def __init__(self, self.event_copy_data = event_copy_data self.event_global_data_initialized = event_global_data_initialized self.event_command_completed = event_command_completed - self.event_subdata_initialized = event_subdata_initialized self.event_vehicle_update_completed = event_vehicle_update_completed self.event_start_internal_chargepoint = event_start_internal_chargepoint self.event_stop_internal_chargepoint = event_stop_internal_chargepoint @@ -104,13 +104,28 @@ def __init__(self, self.event_modbus_server = event_modbus_server self.event_restart_gpio = event_restart_gpio self.heartbeat = False - # Immer wenn ein Subscribe hinzugefügt wird, wird der Zähler hinzugefügt und subdata_initialized gepublished. - # Wenn subdata_initialized empfangen wird, wird der Zäheler runtergezählt. Erst wenn alle subdata_initialized - # empfangen wurden, wurden auch die vorher subskribierten Topics empfangen und der Algorithmus kann starten. - self.processing_counter = ProcessingCounter(self.event_subdata_initialized) + self.last_msg_time = None + self.initialized = False + self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message) + self.initialize() + + def initialize(self): + try: + self.internal_broker_client.client.loop_start() + log.debug("Warte auf retained Topics ...") + while self.last_msg_time is None: + time.sleep(0.05) + # quiet time detector: mqtt schickt alle retained messages direkt nach dem subscribe + while time.time() - self.last_msg_time < QUIET_TIME: + time.sleep(0.05) + self.internal_broker_client.client.loop_stop() + log.debug("Alle retained Topics empfangen.") + self.initialized = True + except Exception: + log.exception("Fehler beim Initialisieren des Subdata-Moduls") def sub_topics(self): - self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message) + log.debug("Starte Subdata-MQTT-Loop") self.internal_broker_client.start_infinite_loop() def disconnect(self) -> None: @@ -149,15 +164,14 @@ def on_connect(self, client: mqtt.Client, userdata, flags: dict, rc: int): ("openWB/LegacySmartHome/Status/wattnichtHaus", 2), ("openWB/io/#", 2), ]) - self.processing_counter.add_task() - Pub().pub("openWB/system/subdata_initialized", True) def on_message(self, client: mqtt.Client, userdata, msg: mqtt.MQTTMessage): """ wait for incoming topics. """ - mqtt_log.debug("Topic: "+str(msg.topic) + - ", Payload: "+str(msg.payload.decode("utf-8"))) + mqtt_log.debug(f"Topic: {msg.topic}, Payload: {msg.payload.decode('utf-8')}") self.heartbeat = True + if self.initialized is False: + self.last_msg_time = time.time() if "openWB/vehicle/template/charge_template/" in msg.topic: self.process_vehicle_charge_template_topic( self.ev_charge_template_data, msg) @@ -311,8 +325,6 @@ def process_vehicle_topic(self, client: mqtt.Client, var: Dict[str, ev.Ev], msg: var["ev"+index].soc_module = mod.create_vehicle(config, index) client.subscribe(f"openWB/vehicle/{index}/soc_module/calculated_soc_state", 2) client.subscribe(f"openWB/vehicle/{index}/soc_module/general_config", 2) - self.processing_counter.add_task() - Pub().pub("openWB/system/subdata_initialized", True) self.event_soc.set() else: # temporäres ChargeTemplate aktualisieren, wenn dem Fahrzeug ein anderes Ladeprofil zugeordnet @@ -610,10 +622,7 @@ def process_general_topic(self, var: general.General, msg: mqtt.MQTTMessage): if decode_payload(msg.payload) and self.general_data.data.extern: self.event_modbus_server.set() elif "openWB/general/http_api" == msg.topic: - if ( - self.event_subdata_initialized.is_set() and - self.general_data.data.http_api != decode_payload(msg.payload) - ): + if self.initialized and self.general_data.data.http_api != decode_payload(msg.payload): pub_system_message( msg.payload, "Bitte die openWB " @@ -750,7 +759,7 @@ def process_optional_topic(self, var: optional.Optional, msg: mqtt.MQTTMessage): var.data.ocpp = dataclass_from_dict(Ocpp, config_dict) elif re.search("/optional/monitoring/", msg.topic) is not None: # do not reconfigure monitoring if topic is received on startup - if self.event_subdata_initialized.is_set(): + if self.initialized: config = decode_payload(msg.payload) if config["type"] is None: var.monitoring_stop() @@ -834,8 +843,6 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes # Durch das erneute Subscribe werden die Komponenten mit dem aktualisierten TCP-Client angelegt. client.subscribe(f"openWB/system/device/{index}/component/+/config", 2) client.subscribe(f"openWB/system/device/{index}/error_timestamp", 2) - self.processing_counter.add_task() - Pub().pub("openWB/system/subdata_initialized", True) elif re.search("^.+/device/[0-9]+/component/[0-9]+/simulation$", msg.topic) is not None: index = get_index(msg.topic) index_second = get_second_index(msg.topic) @@ -872,11 +879,9 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes config = dataclass_from_dict(component.component_descriptor.configuration_factory, component_config) var["device"+index].add_component(config) client.subscribe(f"openWB/system/device/{index}/component/{index_second}/simulation", 2) - self.processing_counter.add_task() - Pub().pub("openWB/system/subdata_initialized", True) elif "mqtt" and "bridge" in msg.topic: # do not reconfigure mqtt bridges if topic is received on startup - if self.event_subdata_initialized.is_set(): + if self.initialized: index = get_index(msg.topic) parent_file = Path(__file__).resolve().parents[2] try: @@ -904,7 +909,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes run_command([str(Path(__file__).resolve().parents[2] / "runs" / "start_remote_support.sh"), token, port, user], process_exception=True) elif "openWB/system/backup_password" in msg.topic: - if self.event_subdata_initialized.is_set(): + if self.initialized: key_file = Path.home() / "backup.key" payload = decode_payload(msg.payload) if payload is None or payload == "": @@ -927,7 +932,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes self.set_json_payload(var["system"].data["backup_cloud"], msg) elif ("openWB/system/dataprotection_acknowledged" == msg.topic and decode_payload(msg.payload) is False): - if self.event_subdata_initialized.is_set(): + if self.initialized: Pub().pub("openWB/set/command/removeCloudBridge/todo", {"command": "removeCloudBridge"}) else: @@ -967,10 +972,6 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes "openWB/system/time" == msg.topic): # Logged in update.log, not used in data.data and removed due to readability purposes of main.log. return - elif "openWB/system/subdata_initialized" == msg.topic: - if decode_payload(msg.payload) != "": - Pub().pub("openWB/system/subdata_initialized", "") - self.processing_counter.task_done() elif "openWB/system/update_config_completed" == msg.topic: if decode_payload(msg.payload) != "": Pub().pub("openWB/system/update_config_completed", "") diff --git a/packages/helpermodules/utils/__init__.py b/packages/helpermodules/utils/__init__.py index 15a1ee8103..55d7b1e041 100644 --- a/packages/helpermodules/utils/__init__.py +++ b/packages/helpermodules/utils/__init__.py @@ -1,3 +1,2 @@ from helpermodules.utils._get_default import get_default from helpermodules.utils._thread_handler import joined_thread_handler, thread_handler -from helpermodules.utils.processing_counter import ProcessingCounter diff --git a/packages/helpermodules/utils/processing_counter.py b/packages/helpermodules/utils/processing_counter.py deleted file mode 100644 index f8d4aead99..0000000000 --- a/packages/helpermodules/utils/processing_counter.py +++ /dev/null @@ -1,26 +0,0 @@ -from threading import Event, Lock - - -class ProcessingCounter: - def __init__(self, done_event: Event): - self.lock = Lock() - self.counter = 0 - self.done_event = done_event - - def add_task(self): - with self.lock: - self.counter += 1 - self.done_event.clear() - - def task_done(self): - with self.lock: - self.counter -= 1 - if self.counter <= 0: - self.done_event.set() - - def wait_for_completion(self, timeout=None): - return self.done_event.wait(timeout) - - def is_done(self): - with self.lock: - return self.counter == 0 diff --git a/packages/main.py b/packages/main.py index 1650b10fb8..74a1c6b6e0 100755 --- a/packages/main.py +++ b/packages/main.py @@ -19,7 +19,7 @@ import time from threading import Event, Thread, enumerate import traceback -from control.chargelog.chargelog import calc_energy_costs, calculate_charged_energy_by_source +from control.chargelog.chargelog import calc_energy_costs from control import data, prepare, process from control.algorithm import algorithm @@ -303,7 +303,6 @@ def schedule_jobs(): event_global_data_initialized = Event() event_command_completed = Event() event_command_completed.set() - event_subdata_initialized = Event() event_update_config_completed = Event() event_modbus_server = Event() event_jobs_running = Event() @@ -314,12 +313,11 @@ def schedule_jobs(): prep = prepare.Prepare() soc = update_soc.UpdateSoc(event_update_soc) set = setdata.SetData(event_ev_template, - event_cp_config, event_soc, - event_subdata_initialized) + event_cp_config, event_soc) sub = subdata.SubData(event_ev_template, event_cp_config, loadvars_.event_module_update_completed, event_copy_data, event_global_data_initialized, event_command_completed, - event_subdata_initialized, soc.event_vehicle_update_completed, + soc.event_vehicle_update_completed, general_internal_chargepoint_handler.event_start, general_internal_chargepoint_handler.event_stop, event_update_config_completed, @@ -348,7 +346,6 @@ def schedule_jobs(): Thread(target=start_modbus_server, args=(event_modbus_server,), name="Modbus Control Server").start() # Warten, damit subdata Zeit hat, alle Topics auf dem Broker zu empfangen. event_update_config_completed.wait(300) - event_subdata_initialized.wait(300) Pub().pub("openWB/set/system/boot_done", True) Path(Path(__file__).resolve().parents[1]/"ramdisk"/"bootdone").touch() schedule_jobs() diff --git a/packages/modules/update_soc_test.py b/packages/modules/update_soc_test.py index 376af80f66..e0ef5a96b7 100644 --- a/packages/modules/update_soc_test.py +++ b/packages/modules/update_soc_test.py @@ -19,10 +19,11 @@ @pytest.fixture(autouse=True) -def mock_data() -> None: +def mock_data(monkeypatch) -> None: data.data_init(Mock()) - SubData(*([Mock()]*16)) + monkeypatch.setattr(SubData, "initialize", lambda x: None) + SubData(*([Mock()]*15)) SubData.cp_data = {"cp0": Mock(spec=ChargepointStateUpdate, chargepoint=Mock( spec=Chargepoint, id=id,