Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion packages/helpermodules/subdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from helpermodules.abstract_plans import AutolockPlan, ScheduledChargingPlan, TimeChargingPlan
from helpermodules.broker import BrokerClient
from helpermodules.messaging import MessageType, pub_system_message
from helpermodules.utils import ProcessingCounter
from helpermodules.utils.run_command import run_command
from helpermodules.utils.topic_parser import decode_payload, get_index, get_second_index
from helpermodules.pub import Pub
Expand Down Expand Up @@ -104,6 +105,10 @@ def __init__(self,
self.event_modbus_server = event_modbus_server
self.event_restart_gpio = event_restart_gpio
self.heartbeat = False
# Immer wenn ein Subscribe hinzugefügt wird, wird der Zähler hinzugefügt und subdata_initialized gepublished.
# Wenn subdata_initialized empfangen wird, wird der Zäheler runtergezählt. Erst wenn alle subdata_initialized
# empfangen wurden, wurden auch die vorher subskribierten Topics empfangen und der Algorithmus kann starten.
self.processing_counter = ProcessingCounter(self.event_subdata_initialized)

def sub_topics(self):
self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message)
Expand Down Expand Up @@ -140,6 +145,7 @@ def on_connect(self, client: mqtt.Client, userdata, flags: dict, rc: int):
("openWB/system/io/#", 2),
("openWB/LegacySmartHome/Status/wattnichtHaus", 2),
])
self.processing_counter.add_task()
Pub().pub("openWB/system/subdata_initialized", True)

def on_message(self, client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
Expand Down Expand Up @@ -301,6 +307,8 @@ def process_vehicle_topic(self, client: mqtt.Client, var: Dict[str, ev.Ev], msg:
var["ev"+index].soc_module = mod.create_vehicle(config, index)
client.subscribe(f"openWB/vehicle/{index}/soc_module/calculated_soc_state", 2)
client.subscribe(f"openWB/vehicle/{index}/soc_module/general_config", 2)
self.processing_counter.add_task()
Pub().pub("openWB/system/subdata_initialized", True)
self.event_soc.set()
else:
# temporäres ChargeTemplate aktualisieren, wenn dem Fahrzeug ein anderes Ladeprofil zugeordnet
Expand Down Expand Up @@ -864,6 +872,8 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
# Durch das erneute Subscribe werden die Komponenten mit dem aktualisierten TCP-Client angelegt.
client.subscribe(f"openWB/system/device/{index}/component/+/config", 2)
client.subscribe(f"openWB/system/device/{index}/error_timestamp", 2)
self.processing_counter.add_task()
Pub().pub("openWB/system/subdata_initialized", True)
elif re.search("^.+/device/[0-9]+/component/[0-9]+/simulation$", msg.topic) is not None:
index = get_index(msg.topic)
index_second = get_second_index(msg.topic)
Expand Down Expand Up @@ -900,6 +910,8 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
config = dataclass_from_dict(component.component_descriptor.configuration_factory, component_config)
var["device"+index].add_component(config)
client.subscribe(f"openWB/system/device/{index}/component/{index_second}/simulation", 2)
self.processing_counter.add_task()
Pub().pub("openWB/system/subdata_initialized", True)
elif "mqtt" and "bridge" in msg.topic:
# do not reconfigure mqtt bridges if topic is received on startup
if self.event_subdata_initialized.is_set():
Expand Down Expand Up @@ -978,7 +990,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
elif "openWB/system/subdata_initialized" == msg.topic:
if decode_payload(msg.payload) != "":
Pub().pub("openWB/system/subdata_initialized", "")
self.event_subdata_initialized.set()
self.processing_counter.task_done()
elif "openWB/system/update_config_completed" == msg.topic:
if decode_payload(msg.payload) != "":
Pub().pub("openWB/system/update_config_completed", "")
Expand Down
1 change: 1 addition & 0 deletions packages/helpermodules/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from helpermodules.utils._exit_after import exit_after
from helpermodules.utils._thread_handler import joined_thread_handler, thread_handler
from helpermodules.utils.processing_counter import ProcessingCounter
26 changes: 26 additions & 0 deletions packages/helpermodules/utils/processing_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from threading import Event, Lock


class ProcessingCounter:
def __init__(self, done_event: Event):
self.lock = Lock()
self.counter = 0
self.done_event = done_event

def add_task(self):
with self.lock:
self.counter += 1
self.done_event.clear()

def task_done(self):
with self.lock:
self.counter -= 1
if self.counter <= 0:
self.done_event.set()

def wait_for_completion(self, timeout=None):
return self.done_event.wait(timeout)

def is_done(self):
with self.lock:
return self.counter == 0
1 change: 1 addition & 0 deletions packages/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ def schedule_jobs():
Thread(target=start_modbus_server, args=(event_modbus_server,), name="Modbus Control Server").start()
# Warten, damit subdata Zeit hat, alle Topics auf dem Broker zu empfangen.
event_update_config_completed.wait(300)
event_subdata_initialized.wait(300)
Pub().pub("openWB/set/system/boot_done", True)
Path(Path(__file__).resolve().parents[1]/"ramdisk"/"bootdone").touch()
schedule_jobs()
Expand Down