|
| 1 | +import logging |
| 2 | +import threading |
| 3 | +from time import monotonic |
| 4 | + |
| 5 | +import attr |
| 6 | + |
| 7 | +from .common import ManagedResource, ResourceManager |
| 8 | +from ..factory import target_factory |
| 9 | + |
| 10 | +@attr.s(eq=False) |
| 11 | +class MQTTManager(ResourceManager): |
| 12 | + _available = attr.ib(default=attr.Factory(set), validator=attr.validators.instance_of(set)) |
| 13 | + _avail_lock = attr.ib(default=threading.Lock()) |
| 14 | + _clients = attr.ib(default=attr.Factory(dict), validator=attr.validators.instance_of(dict)) |
| 15 | + _topics = attr.ib(default=attr.Factory(list), validator=attr.validators.instance_of(list)) |
| 16 | + _topic_lock = attr.ib(default=threading.Lock()) |
| 17 | + _last = attr.ib(default=0.0, validator=attr.validators.instance_of(float)) |
| 18 | + |
| 19 | + def __attrs_post_init__(self): |
| 20 | + super().__attrs_post_init__() |
| 21 | + self.log = logging.getLogger('MQTTManager') |
| 22 | + |
| 23 | + def _create_mqtt_connection(self, host): |
| 24 | + import paho.mqtt.client as mqtt |
| 25 | + client = mqtt.Client() |
| 26 | + client.connect(host) |
| 27 | + client.on_message = self._on_message |
| 28 | + client.loop_start() |
| 29 | + return client |
| 30 | + |
| 31 | + def on_resource_added(self, resource): |
| 32 | + host = resource.host |
| 33 | + if host not in self._clients: |
| 34 | + self._clients[host] = self._create_mqtt_connection(host) |
| 35 | + self._clients[host].subscribe(resource.avail_topic) |
| 36 | + |
| 37 | + def _on_message(self, client, userdata, msg): |
| 38 | + payload = msg.payload.decode('utf-8') |
| 39 | + topic = msg.topic |
| 40 | + if payload == "Online": |
| 41 | + with self._avail_lock: |
| 42 | + self._available.add(topic) |
| 43 | + if payload == "Offline": |
| 44 | + with self._avail_lock: |
| 45 | + self._available.discard(topic) |
| 46 | + |
| 47 | + def poll(self): |
| 48 | + if monotonic()-self._last < 2: |
| 49 | + return # ratelimit requests |
| 50 | + self._last = monotonic() |
| 51 | + with self._avail_lock: |
| 52 | + for resource in self.resources: |
| 53 | + resource.avail = resource.avail_topic in self._available |
| 54 | + |
| 55 | + |
| 56 | +@target_factory.reg_resource |
| 57 | +@attr.s(eq=False) |
| 58 | +class MQTTResource(ManagedResource): |
| 59 | + manager_cls = MQTTManager |
| 60 | + |
| 61 | + host = attr.ib(validator=attr.validators.instance_of(str)) |
| 62 | + avail_topic = attr.ib(validator=attr.validators.instance_of(str)) |
| 63 | + |
| 64 | + def __attrs_post_init__(self): |
| 65 | + self.timeout = 30.0 |
| 66 | + super().__attrs_post_init__() |
| 67 | + |
| 68 | + |
| 69 | +@target_factory.reg_resource |
| 70 | +@attr.s(eq=False) |
| 71 | +class TasmotaPowerPort(MQTTResource): |
| 72 | + power_topic = attr.ib(default=None, |
| 73 | + validator=attr.validators.instance_of(str)) |
| 74 | + status_topic = attr.ib(default=None, |
| 75 | + validator=attr.validators.instance_of(str)) |
0 commit comments