diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7b52b0def..14e6bbfec 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,7 +2,7 @@
## Ongoing
-- Code optimizations via PR [#837](https://github.com/plugwise/python-plugwise/pull/837), [#838](https://github.com/plugwise/python-plugwise/pull/838)
+- Code optimizations via PR [#837](https://github.com/plugwise/python-plugwise/pull/837), [#838](https://github.com/plugwise/python-plugwise/pull/838), [#839](https://github.com/plugwise/python-plugwise/pull/839)
## v1.11.0
diff --git a/plugwise/common.py b/plugwise/common.py
index 795d68ed3..1024c8218 100644
--- a/plugwise/common.py
+++ b/plugwise/common.py
@@ -188,29 +188,28 @@ def _entity_switching_group(self, entity: GwEntityData) -> None:
entity["switches"]["relay"] = counter != 0
self._count += 1
- def _get_groups(self) -> dict[str, GwEntityData]:
+ def _get_groups(self) -> None:
"""Helper-function for smile.py: get_all_gateway_entities().
Collect switching-, pumping- or report-group info.
"""
- groups: dict[str, GwEntityData] = {}
# P1 and Anna don't have groups
if self.smile.type == "power" or self.check_name(ANNA):
- return groups
+ return
for group in self._domain_objects.findall("./group"):
members: list[str] = []
- group_id = group.attrib["id"]
+ group_id = group.get("id")
group_name = group.find("name").text
group_type = group.find("type").text
group_appliances = group.findall("appliances/appliance")
for item in group_appliances:
# Check if members are not orphaned - stretch
- if item.attrib["id"] in self.gw_entities:
- members.append(item.attrib["id"])
+ if item.get("id") in self.gw_entities:
+ members.append(item.get("id"))
- if group_type in GROUP_TYPES and members:
- groups[group_id] = {
+ if group_type in GROUP_TYPES and members and group_id:
+ self.gw_entities[group_id] = {
"dev_class": group_type,
"model": "Group",
"name": group_name,
@@ -219,8 +218,6 @@ def _get_groups(self) -> dict[str, GwEntityData]:
}
self._count += 5
- return groups
-
def _get_lock_state(
self, xml: etree.Element, data: GwEntityData, stretch_v2: bool = False
) -> None:
@@ -266,7 +263,7 @@ def _get_module_data(
if key is not None and key not in link_tag:
continue
- link_id = appl_search.attrib["id"]
+ link_id = appl_search.get("id")
loc = f".//services/{link_tag}[@id='{link_id}']...."
# Not possible to walrus for some reason...
# xml_2: self._modules for legacy, self._domain_objects for actual
diff --git a/plugwise/helper.py b/plugwise/helper.py
index f62e395a7..371dd1613 100644
--- a/plugwise/helper.py
+++ b/plugwise/helper.py
@@ -117,7 +117,7 @@ def _get_appliances(self) -> None:
for appliance in self._domain_objects.findall("./appliance"):
appl = Munch()
appl.available = None
- appl.entity_id = appliance.attrib["id"]
+ appl.entity_id = appliance.get("id")
appl.location = None
appl.name = appliance.find("name").text
appl.model = None
@@ -139,7 +139,7 @@ def _get_appliances(self) -> None:
continue
if (appl_loc := appliance.find("location")) is not None:
- appl.location = appl_loc.attrib["id"]
+ appl.location = appl_loc.get("id")
# Set location to the _home_loc_id when the appliance-location is not found,
# except for thermostat-devices without a location, they are not active
elif appl.pwclass not in THERMOSTAT_CLASSES:
@@ -206,7 +206,7 @@ def _get_locations(self) -> None:
loc = Munch()
locations = self._domain_objects.findall("./location")
for location in locations:
- loc.loc_id = location.attrib["id"]
+ loc.loc_id = location.get("id")
loc.name = location.find("name").text
loc._type = location.find("type").text
self._loc_data[loc.loc_id] = {
@@ -269,7 +269,7 @@ def _appliance_info_finder(self, appl: Munch, appliance: etree.Element) -> Munch
def _appl_gateway_info(self, appl: Munch, appliance: etree.Element) -> Munch:
"""Helper-function for _appliance_info_finder()."""
- self._gateway_id = appliance.attrib["id"]
+ self._gateway_id = appliance.get("id")
appl.firmware = str(self.smile.version)
appl.hardware = self.smile.hw_version
appl.mac = self.smile.mac_address
@@ -319,7 +319,7 @@ def _get_appliances_with_offset_functionality(self) -> list[str]:
'.//actuator_functionalities/offset_functionality[type="temperature_offset"]/offset/../../..'
)
for item in offset_appls:
- therm_list.append(item.attrib["id"])
+ therm_list.append(item.get("id"))
return therm_list
@@ -393,12 +393,12 @@ def _get_measurement_data(self, entity_id: str, entity: GwEntityData) -> None:
def _collect_group_sensors(
self,
data: GwEntityData,
- entity_id: str,
+ group_id: str,
measurements: dict[str, UOM],
) -> None:
"""Collect group sensors."""
if (
- group := self._domain_objects.find(f'./group[@id="{entity_id}"]')
+ group := self._domain_objects.find(f'./group[@id="{group_id}"]')
) is not None:
for measurement, attrs in measurements.items():
locator = f'.//logs/point_log[type="{measurement}"]/period/measurement'
@@ -502,7 +502,7 @@ def _get_plugwise_notifications(self) -> None:
self._notifications = {}
for notification in self._domain_objects.findall("./notification"):
try:
- msg_id = notification.attrib["id"]
+ msg_id = notification.get("id")
msg_type = notification.find("type").text
msg = notification.find("message").text
self._notifications[msg_id] = {msg_type: msg}
@@ -903,7 +903,7 @@ def _presets(self, loc_id: str) -> dict[str, list[float]]:
directives = self._domain_objects.find(f'rule[@id="{rule_id}"]/directives')
for directive in directives:
preset = directive.find("then").attrib
- presets[directive.attrib["preset"]] = [
+ presets[directive.get("preset")] = [
float(preset["heating_setpoint"]),
float(preset["cooling_setpoint"]),
]
@@ -920,13 +920,13 @@ def _rule_ids_by_name(self, name: str, loc_id: str) -> dict[str, dict[str, str]]
for rule in self._domain_objects.findall(f'./rule[name="{name}"]'):
active = rule.find("active").text
if rule.find(locator) is not None:
- schedule_ids[rule.attrib["id"]] = {
+ schedule_ids[rule.get("id")] = {
"location": loc_id,
"name": name,
"active": active,
}
else:
- schedule_ids[rule.attrib["id"]] = {
+ schedule_ids[rule.get("id")] = {
"location": NONE,
"name": name,
"active": active,
@@ -947,13 +947,13 @@ def _rule_ids_by_tag(self, tag: str, loc_id: str) -> dict[str, dict[str, str]]:
name = rule.find("name").text
active = rule.find("active").text
if rule.find(locator2) is not None:
- schedule_ids[rule.attrib["id"]] = {
+ schedule_ids[rule.get("id")] = {
"location": loc_id,
"name": name,
"active": active,
}
else:
- schedule_ids[rule.attrib["id"]] = {
+ schedule_ids[rule.get("id")] = {
"location": NONE,
"name": name,
"active": active,
@@ -1002,6 +1002,6 @@ def _thermostat_uri(self, loc_id: str) -> str:
Determine the location-set_temperature uri - from LOCATIONS.
"""
locator = f'./location[@id="{loc_id}"]/actuator_functionalities/thermostat_functionality'
- thermostat_functionality_id = self._domain_objects.find(locator).attrib["id"]
+ thermostat_functionality_id = self._domain_objects.find(locator).get("id")
return f"{LOCATIONS};id={loc_id}/thermostat;id={thermostat_functionality_id}"
diff --git a/plugwise/legacy/helper.py b/plugwise/legacy/helper.py
index ad0120e3b..3ec6d0632 100644
--- a/plugwise/legacy/helper.py
+++ b/plugwise/legacy/helper.py
@@ -107,7 +107,7 @@ def _all_appliances(self) -> None:
continue # pragma: no cover
appl.location = self._home_loc_id
- appl.entity_id = appliance.attrib["id"]
+ appl.entity_id = appliance.get("id")
appl.name = appliance.find("name").text
# Extend device_class name when a Circle/Stealth is type heater_central -- Pw-Beta Issue #739
if (
@@ -148,7 +148,7 @@ def _all_locations(self) -> None:
return
for location in locations:
- loc.loc_id = location.attrib["id"]
+ loc.loc_id = location.get("id")
loc.name = location.find("name").text
loc._type = location.find("type").text
# Filter the valid single location for P1 legacy: services not empty
@@ -393,10 +393,14 @@ def _presets(self) -> dict[str, list[float]]:
"""Helper-function for presets() - collect Presets for a legacy Anna."""
presets: dict[str, list[float]] = {}
for directive in self._domain_objects.findall("rule/directives/when/then"):
- if directive is not None and directive.get("icon") is not None:
+ if (
+ directive is not None
+ and directive.get("icon") is not None
+ and directive.get("temperature") is not None
+ ):
# Ensure list of heating_setpoint, cooling_setpoint
- presets[directive.attrib["icon"]] = [
- float(directive.attrib["temperature"]),
+ presets[directive.get("icon")] = [
+ float(directive.get("temperature")),
0,
]
@@ -411,7 +415,7 @@ def _schedules(self) -> tuple[list[str], str]:
search = self._domain_objects
if (result := search.find("./rule[name='Thermostat schedule']")) is not None:
name = "Thermostat schedule"
- rule_id = result.attrib["id"]
+ rule_id = result.get("id")
log_type = "schedule_state"
locator = f"./appliance[type='thermostat']/logs/point_log[type='{log_type}']/period/measurement"
@@ -432,5 +436,5 @@ def _schedules(self) -> tuple[list[str], str]:
def _thermostat_uri(self) -> str:
"""Determine the location-set_temperature uri - from APPLIANCES."""
locator = "./appliance[type='thermostat']"
- appliance_id = self._appliances.find(locator).attrib["id"]
+ appliance_id = self._appliances.find(locator).get("id")
return f"{APPLIANCES};id={appliance_id}/thermostat"
diff --git a/plugwise/legacy/smile.py b/plugwise/legacy/smile.py
index 54a21c15d..a841a1d1c 100644
--- a/plugwise/legacy/smile.py
+++ b/plugwise/legacy/smile.py
@@ -82,9 +82,7 @@ def get_all_gateway_entities(self) -> None:
Finally, collect the data and states for each entity.
"""
self._all_appliances()
- if group_data := self._get_groups():
- self.gw_entities.update(group_data)
-
+ self._get_groups()
self._all_entity_data()
async def async_update(self) -> dict[str, GwEntityData]:
@@ -154,13 +152,17 @@ async def set_offset(self, dev_id: str, offset: float) -> None:
async def set_preset(self, _: str, preset: str) -> None:
"""Set the given Preset on the relevant Thermostat - from DOMAIN_OBJECTS."""
- if (presets := self._presets()) is None:
+ if not (presets := self._presets()):
raise PlugwiseError("Plugwise: no presets available.") # pragma: no cover
if preset not in list(presets):
raise PlugwiseError("Plugwise: invalid preset.")
locator = f'rule/directives/when/then[@icon="{preset}"].../.../...'
- rule_id = self._domain_objects.find(locator).attrib["id"]
+ if (rule := self._domain_objects.find(locator)) is None:
+ raise PlugwiseError("Plugwise: no preset rule found.") # pragma: no cover
+ if (rule_id := rule.get("id")) is None:
+ raise PlugwiseError("Plugwise: no preset id found.") # pragma: no cover
+
data = f"true"
await self.call_request(RULES, method="put", data=data)
@@ -192,7 +194,7 @@ async def set_schedule_state(
schedule_rule_id: str | None = None
for rule in self._domain_objects.findall("rule"):
if rule.find("name").text == name:
- schedule_rule_id = rule.attrib["id"]
+ schedule_rule_id = rule.get("id")
break
if schedule_rule_id is None:
@@ -205,7 +207,7 @@ async def set_schedule_state(
new_state = "true"
locator = f'.//*[@id="{schedule_rule_id}"]/template'
- template_id = self._domain_objects.find(locator).attrib["id"]
+ template_id = self._domain_objects.find(locator).get("id")
data = (
""
diff --git a/plugwise/smile.py b/plugwise/smile.py
index 058d767b8..61a042964 100644
--- a/plugwise/smile.py
+++ b/plugwise/smile.py
@@ -113,9 +113,7 @@ def get_all_gateway_entities(self) -> None:
)
self._scan_thermostats()
- if group_data := self._get_groups():
- self.gw_entities.update(group_data)
-
+ self._get_groups()
self._all_entity_data()
async def async_update(self) -> dict[str, GwEntityData]:
@@ -178,7 +176,7 @@ async def set_number(
if th_func_list := self._domain_objects.findall(locator):
for th_func in th_func_list:
if th_func.find("type").text == key:
- thermostat_id = th_func.attrib["id"]
+ thermostat_id = th_func.get("id")
if thermostat_id is None:
raise PlugwiseError(f"Plugwise: cannot change setpoint, {key} not found.")
@@ -358,7 +356,7 @@ async def set_schedule_state(
)
if self.check_name(ANNA):
locator = f'.//*[@id="{schedule_rule_id}"]/template'
- template_id = self._domain_objects.find(locator).attrib["id"]
+ template_id = self._domain_objects.find(locator).get("id")
template = f''
contexts = self.determine_contexts(loc_id, new_state, schedule_rule_id)
@@ -427,10 +425,10 @@ async def set_switch_state(
# multiple types of e.g. toggle_functionality present
if (sw_type := item.find("type")) is not None:
if sw_type.text == switch.act_type:
- switch_id = item.attrib["id"]
+ switch_id = item.get("id")
break
else: # actuators with a single item like relay_functionality
- switch_id = item.attrib["id"]
+ switch_id = item.get("id")
uri = f"{APPLIANCES};id={appl_id}/{switch.device};id={switch_id}"
if model == "relay":
@@ -456,7 +454,7 @@ async def _set_groupswitch_member_state(
switched = 0
for member in members:
locator = f'appliance[@id="{member}"]/{switch.actuator}/{switch.func_type}'
- switch_id = self._domain_objects.find(locator).attrib["id"]
+ switch_id = self._domain_objects.find(locator).get("id")
uri = f"{APPLIANCES};id={member}/{switch.device};id={switch_id}"
lock_blocked = self.gw_entities[member]["switches"].get("lock")
# Assume Plugs under Plugwise control are not part of a group
diff --git a/plugwise/util.py b/plugwise/util.py
index 1b4a3efaf..475a825dc 100644
--- a/plugwise/util.py
+++ b/plugwise/util.py
@@ -82,30 +82,33 @@ def check_heater_central(xml: etree.Element) -> str:
for a system that has two heater_central appliances.
"""
locator = "./appliance[type='heater_central']"
- hc_count = 0
- hc_list: list[dict[str, bool]] = []
+ heater_central_count = 0
+ heater_central_list: list[dict[str, bool]] = []
for heater_central in xml.findall(locator):
- hc_count += 1
- hc_id: str = heater_central.attrib["id"]
- has_actuators: bool = (
- heater_central.find("actuator_functionalities/") is not None
- )
+ if (heater_central_id := heater_central.get("id")) is None:
+ continue # pragma: no cover
+
+ if (heater_central_name := heater_central.find("name")) is None:
+ continue # pragma: no cover
+
+ has_actuators = heater_central.find("actuator_functionalities/") is not None
# Filter for Plug/Circle/Stealth heater_central -- Pw-Beta Issue #739
- if heater_central.find("name").text == "Central heating boiler":
- hc_list.append({hc_id: has_actuators})
+ if heater_central_name.text == "Central heating boiler":
+ heater_central_list.append({heater_central_id: has_actuators})
+ heater_central_count += 1
- if not hc_list:
+ if not heater_central_list:
return NONE # pragma: no cover
- heater_central_id = list(hc_list[0].keys())[0]
- if hc_count > 1:
- for item in hc_list:
- hc_id, has_actuators = next(iter(item.items()))
+ heater_id = list(heater_central_list[0].keys())[0]
+ if heater_central_count > 1:
+ for item in heater_central_list:
+ heater_central_id, has_actuators = next(iter(item.items()))
if has_actuators:
- heater_central_id = hc_id
+ heater_id = heater_central_id
break
- return heater_central_id
+ return heater_id
def check_model(name: str | None, vendor_name: str | None) -> str | None: