Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 79 additions & 8 deletions packages/modules/devices/sonnen/sonnenbatterie/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ class JsonApiVersion(Enum):


class JsonApi():
class OperatingMode(Enum):
MANUAL = "1"
SELF_CONSUMPTION = "2"
TIME_OF_USE = "10"

class PowerMeterDirection(Enum):
PRODUCTION = "production"
CONSUMPTION = "consumption"
Expand Down Expand Up @@ -180,6 +185,8 @@ class ChannelDict(TypedDict):
w_l3: float
w_total: float

default_operating_mode: Optional[OperatingMode] = None

def __init__(self,
host: str,
api_version: JsonApiVersion = JsonApiVersion.V1,
Expand All @@ -191,6 +198,14 @@ def __init__(self,
raise ValueError("API v2 requires an auth_token.")
self.headers = {"auth-token": auth_token} if api_version == JsonApiVersion.V2 else {}

def __del__(self) -> None:
"""
Destructor to clean up the object.
"""
# restore normal operating mode
if self.api_version == JsonApiVersion.V2:
self.set_power_limit(None)

def __read(self, endpoint: str = "status") -> Dict:
"""
Reads data from the Sonnenbatterie JSON API.
Expand Down Expand Up @@ -239,6 +254,8 @@ def __state_from_channel(self, channel: ChannelDict) -> Union[CounterState, Inve
channel (ChannelDict): The channel data as a dictionary.
Returns:
CounterState|InverterState: The converted State object.
Raises:
ValueError: If the direction is neither "consumption" nor "production".
"""
if channel["direction"] == self.PowerMeterDirection.CONSUMPTION.value:
return CounterState(power=channel["w_total"],
Expand All @@ -250,26 +267,48 @@ def __state_from_channel(self, channel: ChannelDict) -> Union[CounterState, Inve
elif channel["direction"] == self.PowerMeterDirection.PRODUCTION.value:
return InverterState(power=-channel["w_total"],
# powers=[-channel[f"w_l{phase}"] for phase in range(1, 4)],
# currents=[-channel[f"a_l{phase}"] for phase in range(1, 4)],
currents=[-channel[f"a_l{phase}"] for phase in range(1, 4)],
# voltages=[channel[f"v_l{phase}_n"] for phase in range(1, 4)],
# imported=channel["kwh_exported"] * 1000,
exported=channel["kwh_imported"] * 1000)
else:
raise ValueError(f"Unknown direction: {channel['direction']}")

def __get_configurations(self) -> Dict:
"""
Reads the configurations from the JSON API.
Returns:
Dict: The configurations as a dictionary.
Raises:
ValueError: If the API version is not v2.
"""
if self.api_version != JsonApiVersion.V2:
raise ValueError("Diese Methode erfordert die JSON API v2!")
return self.__read(endpoint="configurations")

def __set_configurations(self, configuration: Dict) -> None:
"""
Sets the configurations for the battery system.
Args:
configuration (Dict): The configurations to set.
Raises:
ValueError: If the API version is not v2.
"""
if self.api_version != JsonApiVersion.V2:
raise ValueError("Diese Methode erfordert die JSON API v2!")
req.get_http_session().put(f"http://{self.host}/api/v2/configurations",
json=configuration,
headers={"Auth-Token": self.auth_token})

def __update_set_point(self, power_limit: int) -> None:
"""
Updates the set point for the battery system.
Args:
power_limit (int): The desired power limit in watts. A positive value indicates
charging, while a negative value indicates discharging.
Raises:
ValueError: If the API version is not v2.
"""
if self.api_version != JsonApiVersion.V2:
raise ValueError("Diese Methode erfordert die JSON API v2!")
command = "charge"
Expand Down Expand Up @@ -297,8 +336,15 @@ def update_battery(self, sim_counter: SimCounter) -> BatState:
battery_state = self.__read_status()
battery_power = -battery_state["Pac_total_W"]
battery_soc = battery_state["USOC"]
# try to calculate the individual line currents as no data is provided by the API
# we assume that the voltage is the same for all three phases
# this is not correct, but we have no other way to get the currents
# the current is calculated as apparent power / voltage
battery_ac_voltage = battery_state["Uac"]
currents = [battery_state[f"Sac{phase}"] / battery_ac_voltage for phase in range(1, 4)]
imported, exported = sim_counter.sim_count(battery_power)
return BatState(power=battery_power,
currents=currents,
soc=battery_soc,
imported=imported,
exported=exported)
Expand Down Expand Up @@ -352,15 +398,40 @@ def update_consumption_counter(self, sim_counter: SimCounter) -> CounterState:
return counter_state

def set_power_limit(self, power_limit: Optional[int]) -> None:
"""
Sets the power limit for the battery system.

This method adjusts the operating mode and power limit of the battery system
based on the provided `power_limit` value. If `power_limit` is None, the method
switches the operating mode to "Self Consumption". Otherwise, it switches the
operating mode to "Manual" and sets the specified power limit.

Args:
power_limit (Optional[int]): The desired power limit in watts. A positive value
indicates charging, while a negative value indicates
discharging. If None, the power limit is removed.

Raises:
ValueError: If the power limit control is not supported or the API version is not v2.
KeyError: If the required key 'EM_OperatingMode' is missing in the API response.
"""
if self.power_limit_controllable() is False:
raise ValueError("Leistungsvorgabe wird nur für 'JSON-API v2' unterstützt!")
operating_mode = self.__get_configurations()["EM_OperatingMode"]
configurations = self.__get_configurations()
if "EM_OperatingMode" not in configurations:
raise KeyError("The key 'EM_OperatingMode' is missing in the API response.")
if self.default_operating_mode is None:
# Store the default operating mode for later restoration
self.default_operating_mode = self.OperatingMode(configurations["EM_OperatingMode"])
operating_mode = self.OperatingMode(configurations["EM_OperatingMode"])
if power_limit is None:
# Keine Leistungsvorgabe, Betriebsmodus "Eigenverbrauch" aktivieren
if operating_mode == "1":
self.__set_configurations({"EM_OperatingMode": "2"})
# No specific power limit is set, activating default mode to allow the system to optimize energy usage by it
# self.
if operating_mode == self.OperatingMode.MANUAL:
self.__set_configurations({"EM_OperatingMode": self.default_operating_mode.value})
else:
# Leistungsvorgabe, Betriebsmodus "Manuell" aktivieren
if operating_mode == "2":
self.__set_configurations({"EM_OperatingMode": "1"})
# Activate "Manual" operating mode to allow direct control of the power limit
# when a specific `power_limit` value is provided.
if operating_mode != self.OperatingMode.MANUAL:
self.__set_configurations({"EM_OperatingMode": self.OperatingMode.MANUAL.value})
self.__update_set_point(power_limit)
Loading