Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions packages/control/optional.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
"""
import logging
from math import ceil
import random
from threading import Thread
from typing import List, Optional
from datetime import datetime
from typing import List, Optional as TypingOptional
from datetime import datetime, timedelta

from control import data
from control.ocpp import OcppMixin
Expand All @@ -19,27 +20,28 @@

log = logging.getLogger(__name__)
AS_EURO_PER_KWH = 1000.0 # Umrechnung von €/Wh in €/kWh
TARIFF_UPDATE_HOUR = 14 # latest expected time for daily tariff update


class Optional(OcppMixin):
def __init__(self):
try:
self.data = OptionalData()
# guarded et_module stored in a private attribute
self._et_module: Optional[ConfigurableElectricityTariff] = None
self._et_module: TypingOptional[ConfigurableElectricityTariff] = None
self.monitoring_module: ConfigurableMonitoring = None
self.data.dc_charging = hardware_configuration.get_hardware_configuration_setting("dc_charging")
Pub().pub("openWB/optional/dc_charging", self.data.dc_charging)
except Exception:
log.exception("Fehler im Optional-Modul")

@property
def et_module(self) -> Optional[ConfigurableElectricityTariff]:
def et_module(self) -> TypingOptional[ConfigurableElectricityTariff]:
"""Getter for the electricity tariff module (may be None)."""
return self._et_module

@et_module.setter
def et_module(self, value: Optional[ConfigurableElectricityTariff]):
def et_module(self, value: TypingOptional[ConfigurableElectricityTariff]):
"""Setter with basic type-guarding and logging.

Accepts either None or a ConfigurableElectricityTariff instance. Logs when set/cleared.
Expand All @@ -61,8 +63,8 @@ def monitoring_start(self):
self.monitoring_module.start_monitoring()

def monitoring_stop(self):
if self.mon_module is not None:
self.mon_module.stop_monitoring()
if self.monitoring_module is not None:
self.monitoring_module.stop_monitoring()

def et_provider_available(self) -> bool:
return self.et_module is not None
Expand Down Expand Up @@ -201,9 +203,11 @@ def et_get_loading_hours(self, duration: float, remaining_time: float) -> List[i

def et_get_prices(self):
try:
if self.et_module:
if self.et_module and self.et_price_update_required():
thread_handler(Thread(target=self.et_module.update, args=(),
name="electricity tariff in optional"))
self.data.et.get.next_query_time = None
Pub().pub("openWB/set/optional/et/get/next_query_time", None)
else:
# Wenn kein Modul konfiguriert ist, Fehlerstatus zurücksetzen.
if self.data.et.get.fault_state != 0 or self.data.et.get.fault_str != NO_ERROR:
Expand All @@ -212,6 +216,44 @@ def et_get_prices(self):
except Exception as e:
log.exception("Fehler im Optional-Modul: %s", e)

def et_price_update_required(self) -> bool:
def is_tomorrow(last_timestamp: str) -> bool:
return (day_of(date=datetime.now()) < day_of(datetime.fromtimestamp(int(last_timestamp)))
or day_of(date=datetime.now()).hour < TARIFF_UPDATE_HOUR)

def day_of(date: datetime) -> datetime:
return date.replace(hour=0, minute=0, second=0, microsecond=0)

def get_last_entry_time_stamp() -> str:
last_known_timestamp = "0"
if self.data.et.get.prices is not None:
last_known_timestamp = max(self.data.et.get.prices)
return last_known_timestamp
if len(self.data.et.get.prices) == 0:
return True
if self.data.et.get.next_query_time is None:
next_query_time = datetime.fromtimestamp(int(max(self.data.et.get.prices))).replace(
hour=TARIFF_UPDATE_HOUR, minute=0, second=0
) + timedelta(
# aktually ET providers issue next day prices up to half an hour earlier then 14:00
# reduce serverload on their site by trying early and randomizing query time
minutes=random.randint(1, 7) * -5
)
self.data.et.get.next_query_time = next_query_time.timestamp()
Pub().pub("openWB/set/optional/et/get/next_query_time", self.data.et.get.next_query_time)
if is_tomorrow(get_last_entry_time_stamp()):
if timecheck.create_timestamp() > self.data.et.get.next_query_time:
log.info(
f'Wartezeit {datetime.fromtimestamp(self.data.et.get.next_query_time).strftime("%Y%m%d-%H:%M:%S")}'
' abgelaufen, Strompreise werden abgefragt')
return True
else:
log.info(
'Nächster Abruf der Strompreise '
f'{datetime.fromtimestamp(self.data.et.get.next_query_time).strftime("%Y%m%d-%H:%M:%S")}.')
return False
return False

def ocpp_transfer_meter_values(self):
try:
if self.data.ocpp.active:
Expand Down
1 change: 1 addition & 0 deletions packages/control/optional_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
class EtGet:
fault_state: int = 0
fault_str: str = NO_ERROR
next_query_time: Optional[float] = None
prices: Dict = field(default_factory=empty_dict_factory)


Expand Down
40 changes: 40 additions & 0 deletions packages/control/optional_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,43 @@ def test_et_charging_available_exception(monkeypatch):
opt.data.et.get.prices = {} # empty prices list raises exception
result = opt.et_is_charging_allowed_hours_list([])
assert result is False


@pytest.mark.parametrize(
"prices, next_query_time, current_timestamp, expected",
[
pytest.param(
{}, None, 1698224400, True,
id="update_required_when_no_prices"
),
pytest.param(
{"1698224400": 0.1, "1698228000": 0.2}, None, 1698224400, False,
id="no_update_required_when_prices_available_and_recent"
),
pytest.param(
{"1698224400": 0.1, "1698228000": 0.2}, 1698310800, 1698224400, False,
id="no_update_required_when_next_query_time_not_reached"
),
pytest.param(
{"1698224400": 0.1, "1698228000": 0.2}, 1698224000, 1698310800, True,
id="update_required_when_next_query_time_passed"
),
pytest.param(
{"1609459200": 0.1, "1609462800": 0.2}, None, 1698224400, True,
id="update_required_when_prices_from_yesterday"
),
]
)
def test_et_price_update_required(monkeypatch, prices, next_query_time, current_timestamp, expected):
# setup
opt = Optional()
opt.data.et.get.prices = prices
opt.data.et.get.next_query_time = next_query_time

monkeypatch.setattr(timecheck, "create_timestamp", Mock(return_value=current_timestamp))

# execution
result = opt.et_price_update_required()

# evaluation
assert result == expected
2 changes: 1 addition & 1 deletion packages/helpermodules/setdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ def process_optional_topic(self, msg: mqtt.MQTTMessage):
try:
if "openWB/set/optional/et/get/prices" in msg.topic:
self._validate_value(msg, "json")
elif "openWB/set/optional/et/get/price" in msg.topic:
elif "openWB/set/optional/et/get/next_query_time" in msg.topic:
self._validate_value(msg, float)
elif "openWB/set/optional/et/get/fault_state" in msg.topic:
self._validate_value(msg, int, [(0, 2)])
Expand Down
91 changes: 7 additions & 84 deletions packages/modules/common/configurable_tariff.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from typing import TypeVar, Generic, Callable
from datetime import datetime, timedelta
from helpermodules import timecheck
import random
import logging
from modules.common import store
from modules.common.component_context import SingleComponentUpdateContext
Expand All @@ -14,21 +12,12 @@
TARIFF_UPDATE_HOUR = 14 # latest expected time for daily tariff update
ONE_HOUR_SECONDS: int = 3600
log = logging.getLogger(__name__)
'''
next_query_time and internal_tariff_state are defined outside of class ConfigurableElectricityTariff because
for an unknown reason defining them as a class variable does not keep their values.
'''
next_query_time: datetime = datetime.fromtimestamp(1)
internal_tariff_state: TariffState = None


class ConfigurableElectricityTariff(Generic[T_TARIFF_CONFIG]):
def __init__(self,
config: T_TARIFF_CONFIG,
component_initializer: Callable[[], float]) -> None:
global internal_tariff_state, next_query_time
next_query_time = datetime.now()
internal_tariff_state = None
self.config = config
self.store = store.get_electricity_tariff_value_store()
self.fault_state = FaultState(ComponentInfo(None, self.config.name, ComponentType.ELECTRICITY_TARIFF.value))
Expand All @@ -41,90 +30,30 @@ def __init__(self,
def update(self) -> None:
if hasattr(self, "_component_updater"):
with SingleComponentUpdateContext(self.fault_state):
tariff_state, timeslot_length_seconds = self.__update_et_provider_data(internal_tariff_state)
tariff_state, timeslot_length_seconds = self.__update_et_provider_data()
self.__store_and_publish_updated_data(tariff_state)
self.__log_and_publish_progress(timeslot_length_seconds)
self.__log_and_publish_progress(timeslot_length_seconds, tariff_state)

def __update_et_provider_data(self, tariff_state: TariffState) -> tuple[TariffState, int]:
tariff_state = self.__query_et_provider_data_once_per_day(internal_tariff_state)
def __update_et_provider_data(self) -> tuple[TariffState, int]:
tariff_state = self._component_updater()
timeslot_length_seconds = self.__calculate_price_timeslot_length(tariff_state)
tariff_state = self._remove_outdated_prices(tariff_state, timeslot_length_seconds)
return tariff_state, timeslot_length_seconds

def __query_et_provider_data_once_per_day(self, tariff_state: TariffState) -> TariffState:
if datetime.now() > next_query_time:
return self.__query_et_provider_data(tariff_state=tariff_state)
else:
return tariff_state

def __query_et_provider_data(self, tariff_state: TariffState) -> TariffState:
def is_tomorrow(last_timestamp: str) -> bool:
return (self.__day_of(date=datetime.now()) < self.__day_of(datetime.fromtimestamp(int(last_timestamp)))
or self.__day_of(date=datetime.now()).hour < TARIFF_UPDATE_HOUR)
global next_query_time
log.info(f'Wartezeit {next_query_time.strftime("%Y%m%d-%H:%M:%S")}'
' abgelaufen, Strompreise werden abgefragt'
)
try:
new_tariff_state = self._component_updater()
if 0 < len(new_tariff_state.prices):
if is_tomorrow(self.__get_last_entry_time_stamp(new_tariff_state)):
next_query_time = self.__calulate_next_query_time(new_tariff_state)
log.info('Nächster Abruf der Strompreise'
f' {next_query_time.strftime("%Y%m%d-%H:%M:%S")}.')
else:
log.info('Keine Daten für morgen erhalten, weiterer Versuch in 5 Minuten')
return new_tariff_state
else:
log.warning('Leere Preisliste erhalten, weiterer Versuch in 5 Minuten.')
return tariff_state
except Exception as e:
log.warning(f'Fehler beim Abruf der Strompreise: {e}, nächster Versuch in 5 Minuten.')
self.fault_state.warning(
f'Fehler beim Abruf der Strompreise: {e}, nächster Versuch in 5 Minuten.')
return tariff_state

def __day_of(self, date: datetime) -> datetime:
return date.replace(hour=0, minute=0, second=0, microsecond=0)

def __next_query_message(self) -> str:
tomorrow = (
''
if self.__day_of(datetime.now()) == self.__day_of(next_query_time)
else 'morgen '
)
return (
f'{tomorrow}{next_query_time.strftime("%H:%M")}'
if datetime.now() < next_query_time
else "im nächsten Regelzyklus"
)

def __log_and_publish_progress(self, timeslot_length_seconds):
def __log_and_publish_progress(self, timeslot_length_seconds, tariff_state):
def publish_info(message_extension: str) -> None:
self.fault_state.no_error(
f'Die Preisliste hat {message_extension}{len(internal_tariff_state.prices)} Einträge. '
f'Nächster Abruf der Strompreise {self.__next_query_message()}.')
f'Die Preisliste hat {message_extension}{len(tariff_state.prices)} Einträge. ')
expected_time_slots = int(24 * ONE_HOUR_SECONDS / timeslot_length_seconds)
publish_info(f'nicht {expected_time_slots}, sondern '
if len(internal_tariff_state.prices) < expected_time_slots
if len(tariff_state.prices) < expected_time_slots
else ''
)

def __store_and_publish_updated_data(self, tariff_state: TariffState) -> None:
global internal_tariff_state
internal_tariff_state = tariff_state
self.store.set(tariff_state)
self.store.update()

def __calulate_next_query_time(self, tariff_state: TariffState) -> datetime:
return datetime.fromtimestamp(int(max(tariff_state.prices))).replace(
hour=TARIFF_UPDATE_HOUR, minute=0, second=0
) + timedelta(
# aktually ET providers issue next day prices up to half an hour earlier then 14:00
# reduce serverload on their site by trying early and randomizing query time
minutes=random.randint(1, 7) * -5
)

def __calculate_price_timeslot_length(self, tariff_state: TariffState) -> int:
if (tariff_state is None or
tariff_state.prices is None or
Expand All @@ -135,12 +64,6 @@ def __calculate_price_timeslot_length(self, tariff_state: TariffState) -> int:
first_timestamps = list(tariff_state.prices.keys())[:2]
return int(first_timestamps[1]) - int(first_timestamps[0])

def __get_last_entry_time_stamp(self, tariff_state: TariffState) -> str:
last_known_timestamp = "0"
if tariff_state is not None:
last_known_timestamp = max(tariff_state.prices)
return last_known_timestamp

def _remove_outdated_prices(self, tariff_state: TariffState, timeslot_length_seconds: int) -> TariffState:
if tariff_state.prices is None:
self.fault_state.error("no prices to show")
Expand Down
Loading