From c33f3621924200ebee39bf2709f5b62ba97ea8b5 Mon Sep 17 00:00:00 2001 From: Lutz Bender Date: Thu, 17 Apr 2025 13:22:28 +0200 Subject: [PATCH] fix counter/inverter state --- .../devices/sonnen/sonnenbatterie/api.py | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/packages/modules/devices/sonnen/sonnenbatterie/api.py b/packages/modules/devices/sonnen/sonnenbatterie/api.py index a774953267..60edeaf089 100644 --- a/packages/modules/devices/sonnen/sonnenbatterie/api.py +++ b/packages/modules/devices/sonnen/sonnenbatterie/api.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 from enum import Enum -from typing import Dict, List, Optional, TypedDict +from typing import Dict, List, Optional, TypedDict, Union from modules.common import req from modules.common.component_state import BatState, CounterState, InverterState from modules.common.simcount import SimCounter @@ -229,22 +229,33 @@ def __read_power_meter(self, direction: Optional[PowerMeterDirection] = None) -> raise ValueError(f"No data found for direction: {direction.value}") return data - def __counter_state_from_channel(self, channel: ChannelDict, inverted: bool = False) -> CounterState: + def __state_from_channel(self, channel: ChannelDict) -> Union[CounterState, InverterState]: """ - Converts a channel dictionary to a CounterState object. + Converts a channel dictionary to a CounterState or InverterState object based on channel["direction"]. + If the direction is "consumption", it returns a CounterState object. + If the direction is "production", it returns an InverterState object. + If the direction is neither, it raises a ValueError. Args: channel (ChannelDict): The channel data as a dictionary. Returns: - CounterState: The converted CounterState object. - """ - return CounterState(power=-channel["w_total"] if inverted else channel["w_total"], - powers=[-channel[f"w_l{phase}"] for phase in range(1, 4)] if inverted else - [channel[f"w_l{phase}"] for phase in range(1, 4)], - currents=[-channel[f"a_l{phase}"] for phase in range(1, 4)] if inverted else - [channel[f"a_l{phase}"] for phase in range(1, 4)], - voltages=[channel[f"v_l{phase}_n"] for phase in range(1, 4)], - imported=channel["kwh_exported"] if inverted else channel["kwh_imported"], - exported=channel["kwh_imported"] if inverted else channel["kwh_exported"]) + CounterState|InverterState: The converted State object. + """ + if channel["direction"] == self.PowerMeterDirection.CONSUMPTION.value: + return CounterState(power=channel["w_total"], + powers=[channel[f"w_l{phase}"] for phase in range(1, 4)], + currents=[channel[f"a_l{phase}"] for phase in range(1, 4)], + voltages=[channel[f"v_l{phase}_n"] for phase in range(1, 4)], + imported=channel["kwh_imported"], + exported=channel["kwh_exported"]) + elif channel["direction"] == self.PowerMeterDirection.PRODUCTION.value: + return InverterState(power=-channel["w_total"], + # powers=[-channel[f"w_l{phase}"] for phase in range(1, 4)], + # currents=[-channel[f"a_l{phase}"] for phase in range(1, 4)], + # voltages=[channel[f"v_l{phase}_n"] for phase in range(1, 4)], + # imported=channel["kwh_exported"] + exported=channel["kwh_imported"]) + else: + raise ValueError(f"Unknown direction: {channel['direction']}") def __get_configurations(self) -> Dict: if self.api_version != JsonApiVersion.V2: @@ -322,9 +333,8 @@ def update_inverter(self, sim_counter: SimCounter) -> InverterState: return InverterState(exported=exported, power=pv_power) else: - return self.__counter_state_from_channel( - self.__read_power_meter(direction=self.PowerMeterDirection.PRODUCTION)[0], - inverted=True) + return self.__state_from_channel( + self.__read_power_meter(direction=self.PowerMeterDirection.PRODUCTION)[0]) def update_consumption_counter(self) -> CounterState: """ @@ -332,7 +342,7 @@ def update_consumption_counter(self) -> CounterState: Returns: CounterState: The updated consumption counter state. """ - return self.__counter_state_from_channel( + return self.__state_from_channel( self.__read_power_meter(direction=self.PowerMeterDirection.CONSUMPTION)[0]) def set_power_limit(self, power_limit: Optional[int]) -> None: