Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/helpermodules/command_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@


@pytest.fixture
def subdata_fixture() -> None:
SubData(*([Mock()]*16))
def subdata_fixture(monkeypatch) -> None:
monkeypatch.setattr(SubData, "initialize", lambda x: None)
SubData(*([Mock()]*15))
SubData.cp_data = {"cp0": Mock(spec=ChargepointStateUpdate, chargepoint=Mock(
spec=Chargepoint, chargepoint_module=Mock(spec=ChargepointModulePro)))}

Expand Down
7 changes: 2 additions & 5 deletions packages/helpermodules/setdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,15 @@ class SetData:
def __init__(self,
event_ev_template: Event,
event_cp_config: Event,
event_soc: Event,
event_subdata_initialized: Event):
event_soc: Event):
self.event_ev_template = event_ev_template
self.event_cp_config = event_cp_config
self.event_soc = event_soc
self.event_subdata_initialized = event_subdata_initialized
self.heartbeat = False

def set_data(self):
self.internal_broker_client = BrokerClient("mqttset", self.on_connect, self.on_message)
self.event_subdata_initialized.wait()
log.debug("Subdata initialization completed. Starting setdata loop to broker.")
log.debug("Starting setdata loop to broker.")
self.internal_broker_client.start_infinite_loop()

def disconnect(self) -> None:
Expand Down
61 changes: 31 additions & 30 deletions packages/helpermodules/subdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from pathlib import Path
from threading import Event
import time
from typing import Dict, Union
import re
import subprocess
Expand All @@ -23,7 +24,6 @@
from helpermodules import graph, system
from helpermodules.broker import BrokerClient
from helpermodules.messaging import MessageType, pub_system_message
from helpermodules.utils import ProcessingCounter
from helpermodules.utils.run_command import run_command
from helpermodules.utils.topic_parser import decode_payload, get_index, get_second_index
from helpermodules.pub import Pub
Expand All @@ -39,6 +39,8 @@
log = logging.getLogger(__name__)
mqtt_log = logging.getLogger("mqtt")

QUIET_TIME = 3


class SubData:
""" Klasse, die die benötigten Topics abonniert, die Instanzen erstellt, wenn z.b. ein Modul neu konfiguriert
Expand Down Expand Up @@ -77,7 +79,6 @@ def __init__(self,
event_copy_data: Event,
event_global_data_initialized: Event,
event_command_completed: Event,
event_subdata_initialized: Event,
event_vehicle_update_completed: Event,
event_start_internal_chargepoint: Event,
event_stop_internal_chargepoint: Event,
Expand All @@ -93,7 +94,6 @@ def __init__(self,
self.event_copy_data = event_copy_data
self.event_global_data_initialized = event_global_data_initialized
self.event_command_completed = event_command_completed
self.event_subdata_initialized = event_subdata_initialized
self.event_vehicle_update_completed = event_vehicle_update_completed
self.event_start_internal_chargepoint = event_start_internal_chargepoint
self.event_stop_internal_chargepoint = event_stop_internal_chargepoint
Expand All @@ -104,13 +104,28 @@ def __init__(self,
self.event_modbus_server = event_modbus_server
self.event_restart_gpio = event_restart_gpio
self.heartbeat = False
# Immer wenn ein Subscribe hinzugefügt wird, wird der Zähler hinzugefügt und subdata_initialized gepublished.
# Wenn subdata_initialized empfangen wird, wird der Zäheler runtergezählt. Erst wenn alle subdata_initialized
# empfangen wurden, wurden auch die vorher subskribierten Topics empfangen und der Algorithmus kann starten.
self.processing_counter = ProcessingCounter(self.event_subdata_initialized)
self.last_msg_time = None
self.initialized = False
self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message)
self.initialize()

def initialize(self):
try:
self.internal_broker_client.client.loop_start()
log.debug("Warte auf retained Topics ...")
while self.last_msg_time is None:
time.sleep(0.05)
# quiet time detector: mqtt schickt alle retained messages direkt nach dem subscribe
while time.time() - self.last_msg_time < QUIET_TIME:
time.sleep(0.05)
self.internal_broker_client.client.loop_stop()
log.debug("Alle retained Topics empfangen.")
self.initialized = True
except Exception:
log.exception("Fehler beim Initialisieren des Subdata-Moduls")

def sub_topics(self):
self.internal_broker_client = BrokerClient("mqttsub", self.on_connect, self.on_message)
log.debug("Starte Subdata-MQTT-Loop")
self.internal_broker_client.start_infinite_loop()

def disconnect(self) -> None:
Expand Down Expand Up @@ -149,15 +164,14 @@ def on_connect(self, client: mqtt.Client, userdata, flags: dict, rc: int):
("openWB/LegacySmartHome/Status/wattnichtHaus", 2),
("openWB/io/#", 2),
])
self.processing_counter.add_task()
Pub().pub("openWB/system/subdata_initialized", True)

def on_message(self, client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
""" wait for incoming topics.
"""
mqtt_log.debug("Topic: "+str(msg.topic) +
", Payload: "+str(msg.payload.decode("utf-8")))
mqtt_log.debug(f"Topic: {msg.topic}, Payload: {msg.payload.decode('utf-8')}")
self.heartbeat = True
if self.initialized is False:
self.last_msg_time = time.time()
if "openWB/vehicle/template/charge_template/" in msg.topic:
self.process_vehicle_charge_template_topic(
self.ev_charge_template_data, msg)
Expand Down Expand Up @@ -311,8 +325,6 @@ def process_vehicle_topic(self, client: mqtt.Client, var: Dict[str, ev.Ev], msg:
var["ev"+index].soc_module = mod.create_vehicle(config, index)
client.subscribe(f"openWB/vehicle/{index}/soc_module/calculated_soc_state", 2)
client.subscribe(f"openWB/vehicle/{index}/soc_module/general_config", 2)
self.processing_counter.add_task()
Pub().pub("openWB/system/subdata_initialized", True)
self.event_soc.set()
else:
# temporäres ChargeTemplate aktualisieren, wenn dem Fahrzeug ein anderes Ladeprofil zugeordnet
Expand Down Expand Up @@ -610,10 +622,7 @@ def process_general_topic(self, var: general.General, msg: mqtt.MQTTMessage):
if decode_payload(msg.payload) and self.general_data.data.extern:
self.event_modbus_server.set()
elif "openWB/general/http_api" == msg.topic:
if (
self.event_subdata_initialized.is_set() and
self.general_data.data.http_api != decode_payload(msg.payload)
):
if self.initialized and self.general_data.data.http_api != decode_payload(msg.payload):
pub_system_message(
msg.payload,
"Bitte die openWB <a href=\"/openWB/web/settings/#/System/SystemConfiguration\">"
Expand Down Expand Up @@ -750,7 +759,7 @@ def process_optional_topic(self, var: optional.Optional, msg: mqtt.MQTTMessage):
var.data.ocpp = dataclass_from_dict(Ocpp, config_dict)
elif re.search("/optional/monitoring/", msg.topic) is not None:
# do not reconfigure monitoring if topic is received on startup
if self.event_subdata_initialized.is_set():
if self.initialized:
config = decode_payload(msg.payload)
if config["type"] is None:
var.monitoring_stop()
Expand Down Expand Up @@ -834,8 +843,6 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
# Durch das erneute Subscribe werden die Komponenten mit dem aktualisierten TCP-Client angelegt.
client.subscribe(f"openWB/system/device/{index}/component/+/config", 2)
client.subscribe(f"openWB/system/device/{index}/error_timestamp", 2)
self.processing_counter.add_task()
Pub().pub("openWB/system/subdata_initialized", True)
elif re.search("^.+/device/[0-9]+/component/[0-9]+/simulation$", msg.topic) is not None:
index = get_index(msg.topic)
index_second = get_second_index(msg.topic)
Expand Down Expand Up @@ -872,11 +879,9 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
config = dataclass_from_dict(component.component_descriptor.configuration_factory, component_config)
var["device"+index].add_component(config)
client.subscribe(f"openWB/system/device/{index}/component/{index_second}/simulation", 2)
self.processing_counter.add_task()
Pub().pub("openWB/system/subdata_initialized", True)
elif "mqtt" and "bridge" in msg.topic:
# do not reconfigure mqtt bridges if topic is received on startup
if self.event_subdata_initialized.is_set():
if self.initialized:
index = get_index(msg.topic)
parent_file = Path(__file__).resolve().parents[2]
try:
Expand Down Expand Up @@ -904,7 +909,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
run_command([str(Path(__file__).resolve().parents[2] / "runs" / "start_remote_support.sh"),
token, port, user], process_exception=True)
elif "openWB/system/backup_password" in msg.topic:
if self.event_subdata_initialized.is_set():
if self.initialized:
key_file = Path.home() / "backup.key"
payload = decode_payload(msg.payload)
if payload is None or payload == "":
Expand All @@ -927,7 +932,7 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
self.set_json_payload(var["system"].data["backup_cloud"], msg)
elif ("openWB/system/dataprotection_acknowledged" == msg.topic and
decode_payload(msg.payload) is False):
if self.event_subdata_initialized.is_set():
if self.initialized:
Pub().pub("openWB/set/command/removeCloudBridge/todo",
{"command": "removeCloudBridge"})
else:
Expand Down Expand Up @@ -967,10 +972,6 @@ def process_system_topic(self, client: mqtt.Client, var: dict, msg: mqtt.MQTTMes
"openWB/system/time" == msg.topic):
# Logged in update.log, not used in data.data and removed due to readability purposes of main.log.
return
elif "openWB/system/subdata_initialized" == msg.topic:
if decode_payload(msg.payload) != "":
Pub().pub("openWB/system/subdata_initialized", "")
self.processing_counter.task_done()
elif "openWB/system/update_config_completed" == msg.topic:
if decode_payload(msg.payload) != "":
Pub().pub("openWB/system/update_config_completed", "")
Expand Down
1 change: 0 additions & 1 deletion packages/helpermodules/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from helpermodules.utils._get_default import get_default
from helpermodules.utils._thread_handler import joined_thread_handler, thread_handler
from helpermodules.utils.processing_counter import ProcessingCounter
26 changes: 0 additions & 26 deletions packages/helpermodules/utils/processing_counter.py

This file was deleted.

9 changes: 3 additions & 6 deletions packages/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import time
from threading import Event, Thread, enumerate
import traceback
from control.chargelog.chargelog import calc_energy_costs, calculate_charged_energy_by_source
from control.chargelog.chargelog import calc_energy_costs

from control import data, prepare, process
from control.algorithm import algorithm
Expand Down Expand Up @@ -303,7 +303,6 @@ def schedule_jobs():
event_global_data_initialized = Event()
event_command_completed = Event()
event_command_completed.set()
event_subdata_initialized = Event()
event_update_config_completed = Event()
event_modbus_server = Event()
event_jobs_running = Event()
Expand All @@ -314,12 +313,11 @@ def schedule_jobs():
prep = prepare.Prepare()
soc = update_soc.UpdateSoc(event_update_soc)
set = setdata.SetData(event_ev_template,
event_cp_config, event_soc,
event_subdata_initialized)
event_cp_config, event_soc)
sub = subdata.SubData(event_ev_template,
event_cp_config, loadvars_.event_module_update_completed,
event_copy_data, event_global_data_initialized, event_command_completed,
event_subdata_initialized, soc.event_vehicle_update_completed,
soc.event_vehicle_update_completed,
general_internal_chargepoint_handler.event_start,
general_internal_chargepoint_handler.event_stop,
event_update_config_completed,
Expand Down Expand Up @@ -348,7 +346,6 @@ def schedule_jobs():
Thread(target=start_modbus_server, args=(event_modbus_server,), name="Modbus Control Server").start()
# Warten, damit subdata Zeit hat, alle Topics auf dem Broker zu empfangen.
event_update_config_completed.wait(300)
event_subdata_initialized.wait(300)
Pub().pub("openWB/set/system/boot_done", True)
Path(Path(__file__).resolve().parents[1]/"ramdisk"/"bootdone").touch()
schedule_jobs()
Expand Down
5 changes: 3 additions & 2 deletions packages/modules/update_soc_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@


@pytest.fixture(autouse=True)
def mock_data() -> None:
def mock_data(monkeypatch) -> None:
data.data_init(Mock())

SubData(*([Mock()]*16))
monkeypatch.setattr(SubData, "initialize", lambda x: None)
SubData(*([Mock()]*15))
SubData.cp_data = {"cp0": Mock(spec=ChargepointStateUpdate, chargepoint=Mock(
spec=Chargepoint,
id=id,
Expand Down