diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b52b0def..14e6bbfec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## Ongoing -- Code optimizations via PR [#837](https://github.com/plugwise/python-plugwise/pull/837), [#838](https://github.com/plugwise/python-plugwise/pull/838) +- Code optimizations via PR [#837](https://github.com/plugwise/python-plugwise/pull/837), [#838](https://github.com/plugwise/python-plugwise/pull/838), [#839](https://github.com/plugwise/python-plugwise/pull/839) ## v1.11.0 diff --git a/plugwise/common.py b/plugwise/common.py index 795d68ed3..1024c8218 100644 --- a/plugwise/common.py +++ b/plugwise/common.py @@ -188,29 +188,28 @@ def _entity_switching_group(self, entity: GwEntityData) -> None: entity["switches"]["relay"] = counter != 0 self._count += 1 - def _get_groups(self) -> dict[str, GwEntityData]: + def _get_groups(self) -> None: """Helper-function for smile.py: get_all_gateway_entities(). Collect switching-, pumping- or report-group info. """ - groups: dict[str, GwEntityData] = {} # P1 and Anna don't have groups if self.smile.type == "power" or self.check_name(ANNA): - return groups + return for group in self._domain_objects.findall("./group"): members: list[str] = [] - group_id = group.attrib["id"] + group_id = group.get("id") group_name = group.find("name").text group_type = group.find("type").text group_appliances = group.findall("appliances/appliance") for item in group_appliances: # Check if members are not orphaned - stretch - if item.attrib["id"] in self.gw_entities: - members.append(item.attrib["id"]) + if item.get("id") in self.gw_entities: + members.append(item.get("id")) - if group_type in GROUP_TYPES and members: - groups[group_id] = { + if group_type in GROUP_TYPES and members and group_id: + self.gw_entities[group_id] = { "dev_class": group_type, "model": "Group", "name": group_name, @@ -219,8 +218,6 @@ def _get_groups(self) -> dict[str, GwEntityData]: } self._count += 5 - return groups - def _get_lock_state( self, xml: etree.Element, data: GwEntityData, stretch_v2: bool = False ) -> None: @@ -266,7 +263,7 @@ def _get_module_data( if key is not None and key not in link_tag: continue - link_id = appl_search.attrib["id"] + link_id = appl_search.get("id") loc = f".//services/{link_tag}[@id='{link_id}']...." # Not possible to walrus for some reason... # xml_2: self._modules for legacy, self._domain_objects for actual diff --git a/plugwise/helper.py b/plugwise/helper.py index f62e395a7..371dd1613 100644 --- a/plugwise/helper.py +++ b/plugwise/helper.py @@ -117,7 +117,7 @@ def _get_appliances(self) -> None: for appliance in self._domain_objects.findall("./appliance"): appl = Munch() appl.available = None - appl.entity_id = appliance.attrib["id"] + appl.entity_id = appliance.get("id") appl.location = None appl.name = appliance.find("name").text appl.model = None @@ -139,7 +139,7 @@ def _get_appliances(self) -> None: continue if (appl_loc := appliance.find("location")) is not None: - appl.location = appl_loc.attrib["id"] + appl.location = appl_loc.get("id") # Set location to the _home_loc_id when the appliance-location is not found, # except for thermostat-devices without a location, they are not active elif appl.pwclass not in THERMOSTAT_CLASSES: @@ -206,7 +206,7 @@ def _get_locations(self) -> None: loc = Munch() locations = self._domain_objects.findall("./location") for location in locations: - loc.loc_id = location.attrib["id"] + loc.loc_id = location.get("id") loc.name = location.find("name").text loc._type = location.find("type").text self._loc_data[loc.loc_id] = { @@ -269,7 +269,7 @@ def _appliance_info_finder(self, appl: Munch, appliance: etree.Element) -> Munch def _appl_gateway_info(self, appl: Munch, appliance: etree.Element) -> Munch: """Helper-function for _appliance_info_finder().""" - self._gateway_id = appliance.attrib["id"] + self._gateway_id = appliance.get("id") appl.firmware = str(self.smile.version) appl.hardware = self.smile.hw_version appl.mac = self.smile.mac_address @@ -319,7 +319,7 @@ def _get_appliances_with_offset_functionality(self) -> list[str]: './/actuator_functionalities/offset_functionality[type="temperature_offset"]/offset/../../..' ) for item in offset_appls: - therm_list.append(item.attrib["id"]) + therm_list.append(item.get("id")) return therm_list @@ -393,12 +393,12 @@ def _get_measurement_data(self, entity_id: str, entity: GwEntityData) -> None: def _collect_group_sensors( self, data: GwEntityData, - entity_id: str, + group_id: str, measurements: dict[str, UOM], ) -> None: """Collect group sensors.""" if ( - group := self._domain_objects.find(f'./group[@id="{entity_id}"]') + group := self._domain_objects.find(f'./group[@id="{group_id}"]') ) is not None: for measurement, attrs in measurements.items(): locator = f'.//logs/point_log[type="{measurement}"]/period/measurement' @@ -502,7 +502,7 @@ def _get_plugwise_notifications(self) -> None: self._notifications = {} for notification in self._domain_objects.findall("./notification"): try: - msg_id = notification.attrib["id"] + msg_id = notification.get("id") msg_type = notification.find("type").text msg = notification.find("message").text self._notifications[msg_id] = {msg_type: msg} @@ -903,7 +903,7 @@ def _presets(self, loc_id: str) -> dict[str, list[float]]: directives = self._domain_objects.find(f'rule[@id="{rule_id}"]/directives') for directive in directives: preset = directive.find("then").attrib - presets[directive.attrib["preset"]] = [ + presets[directive.get("preset")] = [ float(preset["heating_setpoint"]), float(preset["cooling_setpoint"]), ] @@ -920,13 +920,13 @@ def _rule_ids_by_name(self, name: str, loc_id: str) -> dict[str, dict[str, str]] for rule in self._domain_objects.findall(f'./rule[name="{name}"]'): active = rule.find("active").text if rule.find(locator) is not None: - schedule_ids[rule.attrib["id"]] = { + schedule_ids[rule.get("id")] = { "location": loc_id, "name": name, "active": active, } else: - schedule_ids[rule.attrib["id"]] = { + schedule_ids[rule.get("id")] = { "location": NONE, "name": name, "active": active, @@ -947,13 +947,13 @@ def _rule_ids_by_tag(self, tag: str, loc_id: str) -> dict[str, dict[str, str]]: name = rule.find("name").text active = rule.find("active").text if rule.find(locator2) is not None: - schedule_ids[rule.attrib["id"]] = { + schedule_ids[rule.get("id")] = { "location": loc_id, "name": name, "active": active, } else: - schedule_ids[rule.attrib["id"]] = { + schedule_ids[rule.get("id")] = { "location": NONE, "name": name, "active": active, @@ -1002,6 +1002,6 @@ def _thermostat_uri(self, loc_id: str) -> str: Determine the location-set_temperature uri - from LOCATIONS. """ locator = f'./location[@id="{loc_id}"]/actuator_functionalities/thermostat_functionality' - thermostat_functionality_id = self._domain_objects.find(locator).attrib["id"] + thermostat_functionality_id = self._domain_objects.find(locator).get("id") return f"{LOCATIONS};id={loc_id}/thermostat;id={thermostat_functionality_id}" diff --git a/plugwise/legacy/helper.py b/plugwise/legacy/helper.py index ad0120e3b..3ec6d0632 100644 --- a/plugwise/legacy/helper.py +++ b/plugwise/legacy/helper.py @@ -107,7 +107,7 @@ def _all_appliances(self) -> None: continue # pragma: no cover appl.location = self._home_loc_id - appl.entity_id = appliance.attrib["id"] + appl.entity_id = appliance.get("id") appl.name = appliance.find("name").text # Extend device_class name when a Circle/Stealth is type heater_central -- Pw-Beta Issue #739 if ( @@ -148,7 +148,7 @@ def _all_locations(self) -> None: return for location in locations: - loc.loc_id = location.attrib["id"] + loc.loc_id = location.get("id") loc.name = location.find("name").text loc._type = location.find("type").text # Filter the valid single location for P1 legacy: services not empty @@ -393,10 +393,14 @@ def _presets(self) -> dict[str, list[float]]: """Helper-function for presets() - collect Presets for a legacy Anna.""" presets: dict[str, list[float]] = {} for directive in self._domain_objects.findall("rule/directives/when/then"): - if directive is not None and directive.get("icon") is not None: + if ( + directive is not None + and directive.get("icon") is not None + and directive.get("temperature") is not None + ): # Ensure list of heating_setpoint, cooling_setpoint - presets[directive.attrib["icon"]] = [ - float(directive.attrib["temperature"]), + presets[directive.get("icon")] = [ + float(directive.get("temperature")), 0, ] @@ -411,7 +415,7 @@ def _schedules(self) -> tuple[list[str], str]: search = self._domain_objects if (result := search.find("./rule[name='Thermostat schedule']")) is not None: name = "Thermostat schedule" - rule_id = result.attrib["id"] + rule_id = result.get("id") log_type = "schedule_state" locator = f"./appliance[type='thermostat']/logs/point_log[type='{log_type}']/period/measurement" @@ -432,5 +436,5 @@ def _schedules(self) -> tuple[list[str], str]: def _thermostat_uri(self) -> str: """Determine the location-set_temperature uri - from APPLIANCES.""" locator = "./appliance[type='thermostat']" - appliance_id = self._appliances.find(locator).attrib["id"] + appliance_id = self._appliances.find(locator).get("id") return f"{APPLIANCES};id={appliance_id}/thermostat" diff --git a/plugwise/legacy/smile.py b/plugwise/legacy/smile.py index 54a21c15d..a841a1d1c 100644 --- a/plugwise/legacy/smile.py +++ b/plugwise/legacy/smile.py @@ -82,9 +82,7 @@ def get_all_gateway_entities(self) -> None: Finally, collect the data and states for each entity. """ self._all_appliances() - if group_data := self._get_groups(): - self.gw_entities.update(group_data) - + self._get_groups() self._all_entity_data() async def async_update(self) -> dict[str, GwEntityData]: @@ -154,13 +152,17 @@ async def set_offset(self, dev_id: str, offset: float) -> None: async def set_preset(self, _: str, preset: str) -> None: """Set the given Preset on the relevant Thermostat - from DOMAIN_OBJECTS.""" - if (presets := self._presets()) is None: + if not (presets := self._presets()): raise PlugwiseError("Plugwise: no presets available.") # pragma: no cover if preset not in list(presets): raise PlugwiseError("Plugwise: invalid preset.") locator = f'rule/directives/when/then[@icon="{preset}"].../.../...' - rule_id = self._domain_objects.find(locator).attrib["id"] + if (rule := self._domain_objects.find(locator)) is None: + raise PlugwiseError("Plugwise: no preset rule found.") # pragma: no cover + if (rule_id := rule.get("id")) is None: + raise PlugwiseError("Plugwise: no preset id found.") # pragma: no cover + data = f"true" await self.call_request(RULES, method="put", data=data) @@ -192,7 +194,7 @@ async def set_schedule_state( schedule_rule_id: str | None = None for rule in self._domain_objects.findall("rule"): if rule.find("name").text == name: - schedule_rule_id = rule.attrib["id"] + schedule_rule_id = rule.get("id") break if schedule_rule_id is None: @@ -205,7 +207,7 @@ async def set_schedule_state( new_state = "true" locator = f'.//*[@id="{schedule_rule_id}"]/template' - template_id = self._domain_objects.find(locator).attrib["id"] + template_id = self._domain_objects.find(locator).get("id") data = ( "" diff --git a/plugwise/smile.py b/plugwise/smile.py index 058d767b8..61a042964 100644 --- a/plugwise/smile.py +++ b/plugwise/smile.py @@ -113,9 +113,7 @@ def get_all_gateway_entities(self) -> None: ) self._scan_thermostats() - if group_data := self._get_groups(): - self.gw_entities.update(group_data) - + self._get_groups() self._all_entity_data() async def async_update(self) -> dict[str, GwEntityData]: @@ -178,7 +176,7 @@ async def set_number( if th_func_list := self._domain_objects.findall(locator): for th_func in th_func_list: if th_func.find("type").text == key: - thermostat_id = th_func.attrib["id"] + thermostat_id = th_func.get("id") if thermostat_id is None: raise PlugwiseError(f"Plugwise: cannot change setpoint, {key} not found.") @@ -358,7 +356,7 @@ async def set_schedule_state( ) if self.check_name(ANNA): locator = f'.//*[@id="{schedule_rule_id}"]/template' - template_id = self._domain_objects.find(locator).attrib["id"] + template_id = self._domain_objects.find(locator).get("id") template = f'