Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Ongoing

- Code optimizations via PR [#837](https://github.com/plugwise/python-plugwise/pull/837), [#838](https://github.com/plugwise/python-plugwise/pull/838)
- Code optimizations via PR [#837](https://github.com/plugwise/python-plugwise/pull/837), [#838](https://github.com/plugwise/python-plugwise/pull/838), [#839](https://github.com/plugwise/python-plugwise/pull/839)

## v1.11.0

Expand Down
19 changes: 8 additions & 11 deletions plugwise/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,29 +188,28 @@ def _entity_switching_group(self, entity: GwEntityData) -> None:
entity["switches"]["relay"] = counter != 0
self._count += 1

def _get_groups(self) -> dict[str, GwEntityData]:
def _get_groups(self) -> None:
"""Helper-function for smile.py: get_all_gateway_entities().

Collect switching-, pumping- or report-group info.
"""
groups: dict[str, GwEntityData] = {}
# P1 and Anna don't have groups
if self.smile.type == "power" or self.check_name(ANNA):
return groups
return

for group in self._domain_objects.findall("./group"):
members: list[str] = []
group_id = group.attrib["id"]
group_id = group.get("id")
group_name = group.find("name").text
group_type = group.find("type").text
group_appliances = group.findall("appliances/appliance")
for item in group_appliances:
# Check if members are not orphaned - stretch
if item.attrib["id"] in self.gw_entities:
members.append(item.attrib["id"])
if item.get("id") in self.gw_entities:
members.append(item.get("id"))

if group_type in GROUP_TYPES and members:
groups[group_id] = {
if group_type in GROUP_TYPES and members and group_id:
self.gw_entities[group_id] = {
"dev_class": group_type,
"model": "Group",
"name": group_name,
Expand All @@ -219,8 +218,6 @@ def _get_groups(self) -> dict[str, GwEntityData]:
}
self._count += 5

return groups

def _get_lock_state(
self, xml: etree.Element, data: GwEntityData, stretch_v2: bool = False
) -> None:
Expand Down Expand Up @@ -266,7 +263,7 @@ def _get_module_data(
if key is not None and key not in link_tag:
continue

link_id = appl_search.attrib["id"]
link_id = appl_search.get("id")
loc = f".//services/{link_tag}[@id='{link_id}']...."
# Not possible to walrus for some reason...
# xml_2: self._modules for legacy, self._domain_objects for actual
Expand Down
28 changes: 14 additions & 14 deletions plugwise/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def _get_appliances(self) -> None:
for appliance in self._domain_objects.findall("./appliance"):
appl = Munch()
appl.available = None
appl.entity_id = appliance.attrib["id"]
appl.entity_id = appliance.get("id")
appl.location = None
appl.name = appliance.find("name").text
appl.model = None
Expand All @@ -139,7 +139,7 @@ def _get_appliances(self) -> None:
continue

if (appl_loc := appliance.find("location")) is not None:
appl.location = appl_loc.attrib["id"]
appl.location = appl_loc.get("id")
# Set location to the _home_loc_id when the appliance-location is not found,
# except for thermostat-devices without a location, they are not active
elif appl.pwclass not in THERMOSTAT_CLASSES:
Expand Down Expand Up @@ -206,7 +206,7 @@ def _get_locations(self) -> None:
loc = Munch()
locations = self._domain_objects.findall("./location")
for location in locations:
loc.loc_id = location.attrib["id"]
loc.loc_id = location.get("id")
loc.name = location.find("name").text
loc._type = location.find("type").text
self._loc_data[loc.loc_id] = {
Expand Down Expand Up @@ -269,7 +269,7 @@ def _appliance_info_finder(self, appl: Munch, appliance: etree.Element) -> Munch

def _appl_gateway_info(self, appl: Munch, appliance: etree.Element) -> Munch:
"""Helper-function for _appliance_info_finder()."""
self._gateway_id = appliance.attrib["id"]
self._gateway_id = appliance.get("id")
appl.firmware = str(self.smile.version)
appl.hardware = self.smile.hw_version
appl.mac = self.smile.mac_address
Expand Down Expand Up @@ -319,7 +319,7 @@ def _get_appliances_with_offset_functionality(self) -> list[str]:
'.//actuator_functionalities/offset_functionality[type="temperature_offset"]/offset/../../..'
)
for item in offset_appls:
therm_list.append(item.attrib["id"])
therm_list.append(item.get("id"))

return therm_list

Expand Down Expand Up @@ -393,12 +393,12 @@ def _get_measurement_data(self, entity_id: str, entity: GwEntityData) -> None:
def _collect_group_sensors(
self,
data: GwEntityData,
entity_id: str,
group_id: str,
measurements: dict[str, UOM],
) -> None:
"""Collect group sensors."""
if (
group := self._domain_objects.find(f'./group[@id="{entity_id}"]')
group := self._domain_objects.find(f'./group[@id="{group_id}"]')
) is not None:
for measurement, attrs in measurements.items():
locator = f'.//logs/point_log[type="{measurement}"]/period/measurement'
Expand Down Expand Up @@ -502,7 +502,7 @@ def _get_plugwise_notifications(self) -> None:
self._notifications = {}
for notification in self._domain_objects.findall("./notification"):
try:
msg_id = notification.attrib["id"]
msg_id = notification.get("id")
msg_type = notification.find("type").text
msg = notification.find("message").text
self._notifications[msg_id] = {msg_type: msg}
Expand Down Expand Up @@ -903,7 +903,7 @@ def _presets(self, loc_id: str) -> dict[str, list[float]]:
directives = self._domain_objects.find(f'rule[@id="{rule_id}"]/directives')
for directive in directives:
preset = directive.find("then").attrib
presets[directive.attrib["preset"]] = [
presets[directive.get("preset")] = [
float(preset["heating_setpoint"]),
float(preset["cooling_setpoint"]),
]
Expand All @@ -920,13 +920,13 @@ def _rule_ids_by_name(self, name: str, loc_id: str) -> dict[str, dict[str, str]]
for rule in self._domain_objects.findall(f'./rule[name="{name}"]'):
active = rule.find("active").text
if rule.find(locator) is not None:
schedule_ids[rule.attrib["id"]] = {
schedule_ids[rule.get("id")] = {
"location": loc_id,
"name": name,
"active": active,
}
else:
schedule_ids[rule.attrib["id"]] = {
schedule_ids[rule.get("id")] = {
"location": NONE,
"name": name,
"active": active,
Expand All @@ -947,13 +947,13 @@ def _rule_ids_by_tag(self, tag: str, loc_id: str) -> dict[str, dict[str, str]]:
name = rule.find("name").text
active = rule.find("active").text
if rule.find(locator2) is not None:
schedule_ids[rule.attrib["id"]] = {
schedule_ids[rule.get("id")] = {
"location": loc_id,
"name": name,
"active": active,
}
else:
schedule_ids[rule.attrib["id"]] = {
schedule_ids[rule.get("id")] = {
"location": NONE,
"name": name,
"active": active,
Expand Down Expand Up @@ -1002,6 +1002,6 @@ def _thermostat_uri(self, loc_id: str) -> str:
Determine the location-set_temperature uri - from LOCATIONS.
"""
locator = f'./location[@id="{loc_id}"]/actuator_functionalities/thermostat_functionality'
thermostat_functionality_id = self._domain_objects.find(locator).attrib["id"]
thermostat_functionality_id = self._domain_objects.find(locator).get("id")

return f"{LOCATIONS};id={loc_id}/thermostat;id={thermostat_functionality_id}"
18 changes: 11 additions & 7 deletions plugwise/legacy/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _all_appliances(self) -> None:
continue # pragma: no cover

appl.location = self._home_loc_id
appl.entity_id = appliance.attrib["id"]
appl.entity_id = appliance.get("id")
appl.name = appliance.find("name").text
# Extend device_class name when a Circle/Stealth is type heater_central -- Pw-Beta Issue #739
if (
Expand Down Expand Up @@ -148,7 +148,7 @@ def _all_locations(self) -> None:
return

for location in locations:
loc.loc_id = location.attrib["id"]
loc.loc_id = location.get("id")
loc.name = location.find("name").text
loc._type = location.find("type").text
# Filter the valid single location for P1 legacy: services not empty
Expand Down Expand Up @@ -393,10 +393,14 @@ def _presets(self) -> dict[str, list[float]]:
"""Helper-function for presets() - collect Presets for a legacy Anna."""
presets: dict[str, list[float]] = {}
for directive in self._domain_objects.findall("rule/directives/when/then"):
if directive is not None and directive.get("icon") is not None:
if (
directive is not None
and directive.get("icon") is not None
and directive.get("temperature") is not None
):
# Ensure list of heating_setpoint, cooling_setpoint
presets[directive.attrib["icon"]] = [
float(directive.attrib["temperature"]),
presets[directive.get("icon")] = [
float(directive.get("temperature")),
0,
]

Expand All @@ -411,7 +415,7 @@ def _schedules(self) -> tuple[list[str], str]:
search = self._domain_objects
if (result := search.find("./rule[name='Thermostat schedule']")) is not None:
name = "Thermostat schedule"
rule_id = result.attrib["id"]
rule_id = result.get("id")

log_type = "schedule_state"
locator = f"./appliance[type='thermostat']/logs/point_log[type='{log_type}']/period/measurement"
Expand All @@ -432,5 +436,5 @@ def _schedules(self) -> tuple[list[str], str]:
def _thermostat_uri(self) -> str:
"""Determine the location-set_temperature uri - from APPLIANCES."""
locator = "./appliance[type='thermostat']"
appliance_id = self._appliances.find(locator).attrib["id"]
appliance_id = self._appliances.find(locator).get("id")
return f"{APPLIANCES};id={appliance_id}/thermostat"
16 changes: 9 additions & 7 deletions plugwise/legacy/smile.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ def get_all_gateway_entities(self) -> None:
Finally, collect the data and states for each entity.
"""
self._all_appliances()
if group_data := self._get_groups():
self.gw_entities.update(group_data)

self._get_groups()
self._all_entity_data()

async def async_update(self) -> dict[str, GwEntityData]:
Expand Down Expand Up @@ -154,13 +152,17 @@ async def set_offset(self, dev_id: str, offset: float) -> None:

async def set_preset(self, _: str, preset: str) -> None:
"""Set the given Preset on the relevant Thermostat - from DOMAIN_OBJECTS."""
if (presets := self._presets()) is None:
if not (presets := self._presets()):
raise PlugwiseError("Plugwise: no presets available.") # pragma: no cover
if preset not in list(presets):
raise PlugwiseError("Plugwise: invalid preset.")

locator = f'rule/directives/when/then[@icon="{preset}"].../.../...'
rule_id = self._domain_objects.find(locator).attrib["id"]
if (rule := self._domain_objects.find(locator)) is None:
raise PlugwiseError("Plugwise: no preset rule found.") # pragma: no cover
if (rule_id := rule.get("id")) is None:
raise PlugwiseError("Plugwise: no preset id found.") # pragma: no cover

data = f"<rules><rule id='{rule_id}'><active>true</active></rule></rules>"
await self.call_request(RULES, method="put", data=data)

Expand Down Expand Up @@ -192,7 +194,7 @@ async def set_schedule_state(
schedule_rule_id: str | None = None
for rule in self._domain_objects.findall("rule"):
if rule.find("name").text == name:
schedule_rule_id = rule.attrib["id"]
schedule_rule_id = rule.get("id")
break

if schedule_rule_id is None:
Expand All @@ -205,7 +207,7 @@ async def set_schedule_state(
new_state = "true"

locator = f'.//*[@id="{schedule_rule_id}"]/template'
template_id = self._domain_objects.find(locator).attrib["id"]
template_id = self._domain_objects.find(locator).get("id")

data = (
"<rules>"
Expand Down
14 changes: 6 additions & 8 deletions plugwise/smile.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ def get_all_gateway_entities(self) -> None:
)
self._scan_thermostats()

if group_data := self._get_groups():
self.gw_entities.update(group_data)

self._get_groups()
self._all_entity_data()

async def async_update(self) -> dict[str, GwEntityData]:
Expand Down Expand Up @@ -178,7 +176,7 @@ async def set_number(
if th_func_list := self._domain_objects.findall(locator):
for th_func in th_func_list:
if th_func.find("type").text == key:
thermostat_id = th_func.attrib["id"]
thermostat_id = th_func.get("id")

if thermostat_id is None:
raise PlugwiseError(f"Plugwise: cannot change setpoint, {key} not found.")
Expand Down Expand Up @@ -358,7 +356,7 @@ async def set_schedule_state(
)
if self.check_name(ANNA):
locator = f'.//*[@id="{schedule_rule_id}"]/template'
template_id = self._domain_objects.find(locator).attrib["id"]
template_id = self._domain_objects.find(locator).get("id")
template = f'<template id="{template_id}" />'

contexts = self.determine_contexts(loc_id, new_state, schedule_rule_id)
Expand Down Expand Up @@ -427,10 +425,10 @@ async def set_switch_state(
# multiple types of e.g. toggle_functionality present
if (sw_type := item.find("type")) is not None:
if sw_type.text == switch.act_type:
switch_id = item.attrib["id"]
switch_id = item.get("id")
break
else: # actuators with a single item like relay_functionality
switch_id = item.attrib["id"]
switch_id = item.get("id")

uri = f"{APPLIANCES};id={appl_id}/{switch.device};id={switch_id}"
if model == "relay":
Expand All @@ -456,7 +454,7 @@ async def _set_groupswitch_member_state(
switched = 0
for member in members:
locator = f'appliance[@id="{member}"]/{switch.actuator}/{switch.func_type}'
switch_id = self._domain_objects.find(locator).attrib["id"]
switch_id = self._domain_objects.find(locator).get("id")
uri = f"{APPLIANCES};id={member}/{switch.device};id={switch_id}"
lock_blocked = self.gw_entities[member]["switches"].get("lock")
# Assume Plugs under Plugwise control are not part of a group
Expand Down
35 changes: 19 additions & 16 deletions plugwise/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,33 @@ def check_heater_central(xml: etree.Element) -> str:
for a system that has two heater_central appliances.
"""
locator = "./appliance[type='heater_central']"
hc_count = 0
hc_list: list[dict[str, bool]] = []
heater_central_count = 0
heater_central_list: list[dict[str, bool]] = []
for heater_central in xml.findall(locator):
hc_count += 1
hc_id: str = heater_central.attrib["id"]
has_actuators: bool = (
heater_central.find("actuator_functionalities/") is not None
)
if (heater_central_id := heater_central.get("id")) is None:
continue # pragma: no cover

if (heater_central_name := heater_central.find("name")) is None:
continue # pragma: no cover

has_actuators = heater_central.find("actuator_functionalities/") is not None
# Filter for Plug/Circle/Stealth heater_central -- Pw-Beta Issue #739
if heater_central.find("name").text == "Central heating boiler":
hc_list.append({hc_id: has_actuators})
if heater_central_name.text == "Central heating boiler":
heater_central_list.append({heater_central_id: has_actuators})
heater_central_count += 1

if not hc_list:
if not heater_central_list:
return NONE # pragma: no cover

heater_central_id = list(hc_list[0].keys())[0]
if hc_count > 1:
for item in hc_list:
hc_id, has_actuators = next(iter(item.items()))
heater_id = list(heater_central_list[0].keys())[0]
if heater_central_count > 1:
for item in heater_central_list:
heater_central_id, has_actuators = next(iter(item.items()))
if has_actuators:
heater_central_id = hc_id
heater_id = heater_central_id
break

return heater_central_id
return heater_id


def check_model(name: str | None, vendor_name: str | None) -> str | None:
Expand Down