From dc28e7b1a4bb31f2a8a64c956373faa50870684e Mon Sep 17 00:00:00 2001 From: Cameron Will Date: Sun, 22 Feb 2026 21:22:51 -0500 Subject: [PATCH] feat: Add telemetry request and response Closes #28 --- pyproject.toml | 1 + src/meshcore_console/core/enums.py | 3 + src/meshcore_console/core/services.py | 4 + src/meshcore_console/core/types.py | 11 ++ src/meshcore_console/meshcore/cayenne_lpp.py | 61 ++++++++ src/meshcore_console/meshcore/client.py | 26 ++++ src/meshcore_console/meshcore/contact_book.py | 4 + src/meshcore_console/meshcore/operations.py | 17 +++ src/meshcore_console/meshcore/session.py | 68 +++++++++ src/meshcore_console/mock/client.py | 42 ++++++ src/meshcore_console/ui_gtk/views/peers.py | 132 +++++++++++++++++- uv.lock | 11 ++ 12 files changed, 378 insertions(+), 2 deletions(-) create mode 100644 src/meshcore_console/meshcore/cayenne_lpp.py diff --git a/pyproject.toml b/pyproject.toml index 5763683..a45ec74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "pynmea2>=1.18.0", "segno>=1.6.0", # QR code generation "gpsdclient>=1.3", + "pycayennelpp>=2.4.0", ] [project.optional-dependencies] diff --git a/src/meshcore_console/core/enums.py b/src/meshcore_console/core/enums.py index e806aab..0cbcabe 100644 --- a/src/meshcore_console/core/enums.py +++ b/src/meshcore_console/core/enums.py @@ -63,6 +63,9 @@ class EventType(StrEnum): # Radio health RADIO_ERROR = "radio_error" + # Telemetry + TELEMETRY_RECEIVED = "telemetry_received" + # EventService events (mesh.* naming) MESH_CONTACT_NEW = "mesh.contact.new" MESH_CHANNEL_MESSAGE_NEW = "mesh.channel.message.new" diff --git a/src/meshcore_console/core/services.py b/src/meshcore_console/core/services.py index 70753e1..0763d30 100644 --- a/src/meshcore_console/core/services.py +++ b/src/meshcore_console/core/services.py @@ -89,6 +89,10 @@ def set_favorite(self, peer_id: str, favorite: bool) -> None: """Toggle the favorite flag on a peer.""" ... + def request_telemetry(self, peer_name: str) -> dict: + """Request telemetry data from a remote peer. Returns decoded sensor data.""" + ... + def get_self_public_key(self) -> str | None: """Return this node's public key as a hex string, or None if unavailable.""" ... diff --git a/src/meshcore_console/core/types.py b/src/meshcore_console/core/types.py index 7996751..8b443c0 100644 --- a/src/meshcore_console/core/types.py +++ b/src/meshcore_console/core/types.py @@ -158,6 +158,17 @@ async def send_group_text(self, channel_name: str, message: str) -> object: """Send a text message to a group channel.""" ... + async def send_telemetry_request( + self, + contact_name: str, + want_base: bool = True, + want_location: bool = True, + want_environment: bool = False, + timeout: float = 10.0, + ) -> dict: + """Request telemetry data from a remote peer.""" + ... + class EventSubscriberProtocol(Protocol): """Protocol for pyMC_core EventSubscriber.""" diff --git a/src/meshcore_console/meshcore/cayenne_lpp.py b/src/meshcore_console/meshcore/cayenne_lpp.py new file mode 100644 index 0000000..5472cd5 --- /dev/null +++ b/src/meshcore_console/meshcore/cayenne_lpp.py @@ -0,0 +1,61 @@ +"""CayenneLPP encoder/decoder for telemetry payloads. + +Uses the pycayennelpp library for encoding/decoding, and provides +``decode_cayenne_lpp_payload`` matching the interface pymc_core's +ProtocolResponseHandler expects from ``utils.cayenne_lpp_helpers``. +""" + +from __future__ import annotations + +from cayennelpp import LppFrame + + +def encode_gps(channel: int, lat: float, lon: float, alt: float = 0.0) -> bytes: + """Encode a GPS location as CayenneLPP bytes.""" + frame = LppFrame() + frame.add_gps(channel, lat, lon, alt) + return bytes(frame) + + +def decode_cayenne_lpp_payload(hex_string: str) -> dict: + """Decode a CayenneLPP hex payload into structured sensor data. + + Matches the signature expected by pymc_core's + ``utils.cayenne_lpp_helpers.decode_cayenne_lpp_payload``. + + Returns: + dict with ``sensor_count`` and ``sensors`` list, or ``error`` string. + """ + try: + data = bytes.fromhex(hex_string) + except ValueError as e: + return {"error": f"Invalid hex: {e}"} + + try: + frame = LppFrame().from_bytes(data) + except Exception as e: + return {"error": f"LPP decode failed: {e}"} + + sensors: list[dict] = [] + for item in frame: + value = item.value + # GPS/Location type returns (lat, lon, alt) tuple + if item.type == "Location": + value = {"latitude": value[0], "longitude": value[1], "altitude": value[2]} + elif isinstance(value, tuple) and len(value) == 1: + value = value[0] + + sensors.append( + { + "channel": item.channel, + "type": item.type, + "type_id": item.type, + "value": value, + "raw_value": hex_string, + } + ) + + if not sensors: + return {"error": "No valid sensors found", "sensor_count": 0, "sensors": []} + + return {"sensor_count": len(sensors), "sensors": sensors} diff --git a/src/meshcore_console/meshcore/client.py b/src/meshcore_console/meshcore/client.py index 3ab3044..6f3963e 100644 --- a/src/meshcore_console/meshcore/client.py +++ b/src/meshcore_console/meshcore/client.py @@ -157,6 +157,7 @@ def connect(self) -> None: self._connected = False raise self._connected = True + self._session.set_telemetry_data_fn(self._get_local_telemetry) self._seed_contact_book() self._gps_provider.start() self._append_event( @@ -777,6 +778,31 @@ def get_self_public_key(self) -> str | None: """Return this node's public key as a hex string, or None if unavailable.""" return self._session.get_public_key() + def request_telemetry(self, peer_name: str) -> dict: + """Request telemetry data from a remote peer.""" + if not self._connected: + self.connect() + result = self._run_async( + self._session.send_telemetry_request(peer_name, timeout=10.0), + timeout=15.0, + ) + self._append_event( + { + "type": EventType.TELEMETRY_RECEIVED, + "data": {"peer_name": peer_name, "telemetry": result}, + } + ) + return result # type: ignore[return-value] + + def _get_local_telemetry(self) -> dict: + """Provide local telemetry data for inbound requests.""" + loc = self._gps_provider.get_location() + return { + "allow": self._settings.allow_telemetry, + "lat": loc[0] if loc else None, + "lon": loc[1] if loc else None, + } + def _seed_contact_book(self) -> None: """Populate the session's contact book with known peers that have public keys.""" book = self._session.contact_book diff --git a/src/meshcore_console/meshcore/contact_book.py b/src/meshcore_console/meshcore/contact_book.py index 3b29ed0..f765751 100644 --- a/src/meshcore_console/meshcore/contact_book.py +++ b/src/meshcore_console/meshcore/contact_book.py @@ -5,6 +5,7 @@ - .contacts — iterable of objects with .public_key (hex str) and .name - .get_by_name(name) — return a contact or None - .add_contact(data) — store a new contact + - .list_contacts() — return all contacts (used by ProtocolResponseHandler) """ from __future__ import annotations @@ -31,6 +32,9 @@ class ContactBook: def __init__(self) -> None: self.contacts: list[Contact] = [] + def list_contacts(self) -> list[Contact]: + return self.contacts + def get_by_name(self, name: str) -> Contact | None: for contact in self.contacts: if contact.name == name: diff --git a/src/meshcore_console/meshcore/operations.py b/src/meshcore_console/meshcore/operations.py index fb46bc0..230c303 100644 --- a/src/meshcore_console/meshcore/operations.py +++ b/src/meshcore_console/meshcore/operations.py @@ -18,6 +18,23 @@ async def send_group_text(*, node: MeshNodeProtocol, channel_name: str, message: return await node.send_group_text(channel_name, message) +async def request_telemetry( + *, + node: MeshNodeProtocol, + contact_name: str, + want_location: bool = True, + timeout: float = 10.0, +) -> dict: + """Request telemetry data from a remote peer.""" + return await node.send_telemetry_request( + contact_name, + want_base=True, + want_location=want_location, + want_environment=False, + timeout=timeout, + ) + + async def send_advert( *, node: MeshNodeProtocol, diff --git a/src/meshcore_console/meshcore/session.py b/src/meshcore_console/meshcore/session.py index ea8624d..37493fa 100644 --- a/src/meshcore_console/meshcore/session.py +++ b/src/meshcore_console/meshcore/session.py @@ -23,6 +23,21 @@ SX1262RadioProtocol, ) +from .cayenne_lpp import encode_gps +from . import cayenne_lpp as _cayenne_lpp_mod + +# pymc_core's ProtocolResponseHandler tries ``from utils.cayenne_lpp_helpers +# import decode_cayenne_lpp_payload`` which isn't shipped. Register our +# module under that name so the import succeeds. +import sys as _sys +import types as _types + +if "utils.cayenne_lpp_helpers" not in _sys.modules: + _shim = _types.ModuleType("utils.cayenne_lpp_helpers") + _shim.decode_cayenne_lpp_payload = _cayenne_lpp_mod.decode_cayenne_lpp_payload # type: ignore[attr-defined] + _sys.modules.setdefault("utils", _types.ModuleType("utils")) + _sys.modules["utils.cayenne_lpp_helpers"] = _shim + from .channel_db import ChannelDatabase from .config import RuntimeRadioConfig, load_hardware_config_from_env from .contact_book import ContactBook @@ -46,6 +61,7 @@ def __init__(self, config: RuntimeRadioConfig, logger: LoggerCallback | None = N self._node_task: asyncio.Task[None] | None = None self._event_queue: queue.Queue[MeshEventDict] = queue.Queue() self._event_notify: Callable[[], None] | None = None + self._telemetry_data_fn: Callable[[], dict[str, Any]] | None = None self._db = open_db() self._channel_db = ChannelDatabase(self._db) self._contact_book = ContactBook() @@ -58,6 +74,10 @@ def _log(self, message: str) -> None: def set_event_notify(self, notify_fn: Callable[[], None]) -> None: self._event_notify = notify_fn + def set_telemetry_data_fn(self, fn: Callable[[], dict[str, Any]]) -> None: + """Set the callback that provides local telemetry data for inbound requests.""" + self._telemetry_data_fn = fn + def _emit(self, payload: MeshEventDict) -> None: self._event_queue.put_nowait(payload) if self._event_notify is not None: @@ -120,6 +140,33 @@ async def _send() -> None: control_handler.set_request_callback(_on_discovery_request) + def _build_telemetry_handler(self) -> Callable[..., bytes | None]: + """Build a handler closure for inbound telemetry requests (REQ type 0x03).""" + TELEM_PERM_LOCATION = 0x02 # noqa: N806 + + def _handle_telemetry(_client: Any, _timestamp: int, req_data: bytes) -> bytes | None: + if self._telemetry_data_fn is None: + return None + data = self._telemetry_data_fn() + if not data.get("allow"): + self._log("telemetry request denied (allow_telemetry=False)") + return None + # req_data is an inverse permission mask — bits set = permissions EXCLUDED + mask = req_data[0] if req_data else 0x00 + if mask & TELEM_PERM_LOCATION: + self._log("telemetry request excludes location") + return None + lat = data.get("lat") + lon = data.get("lon") + if lat is None or lon is None: + self._log("telemetry request: no GPS fix") + return None + payload = encode_gps(channel=1, lat=lat, lon=lon) + self._log(f"responding to telemetry request with GPS ({lat:.4f}, {lon:.4f})") + return payload + + return _handle_telemetry + def _register_req_handler(self) -> None: """Register a handler for incoming REQ packets. @@ -131,6 +178,8 @@ def _register_req_handler(self) -> None: from pymc_core.node.handlers.protocol_request import ProtocolRequestHandler from pymc_core.protocol.constants import PAYLOAD_TYPE_REQ + REQ_TYPE_GET_TELEMETRY_DATA = 0x03 # noqa: N806 + assert self._node is not None assert self._identity is not None @@ -138,6 +187,7 @@ def _register_req_handler(self) -> None: local_identity=self._identity, contacts=self._contact_book, log_fn=self._log, + request_handlers={REQ_TYPE_GET_TELEMETRY_DATA: self._build_telemetry_handler()}, ) dispatcher = self._node.dispatcher @@ -384,6 +434,24 @@ async def send_advert( route_type=route_type, ) + async def send_telemetry_request( + self, + contact_name: str, + *, + want_location: bool = True, + timeout: float = 10.0, + ) -> dict: + """Request telemetry from a remote peer. Returns decoded sensor data.""" + if self._node is None: + raise RuntimeError("Session is not started.") + return await self._node.send_telemetry_request( + contact_name, + want_base=True, + want_location=want_location, + want_environment=False, + timeout=timeout, + ) + async def listen_events(self) -> AsyncIterator[MeshEventDict]: if self._node is None: raise RuntimeError("Session is not started.") diff --git a/src/meshcore_console/mock/client.py b/src/meshcore_console/mock/client.py index 42e2a8f..48b1d1a 100644 --- a/src/meshcore_console/mock/client.py +++ b/src/meshcore_console/mock/client.py @@ -234,6 +234,48 @@ def set_favorite(self, peer_id: str, favorite: bool) -> None: peer.is_favorite = favorite return + def request_telemetry(self, peer_name: str) -> dict: + """Return synthetic telemetry data matching pymc_core's format.""" + import time + + loc = self._gps_provider.get_location() + lat = (loc[0] + 0.012) if loc else 37.7749 + lon = (loc[1] - 0.008) if loc else -122.4194 + result = { + "success": True, + "contact": peer_name, + "telemetry_data": { + "type": "telemetry", + "reflected_timestamp": int(time.time()), + "sensor_count": 2, + "sensors": [ + { + "channel": 1, + "type": "Voltage", + "type_id": "Voltage", + "value": 3.87, + "raw_value": "01740183", + }, + { + "channel": 2, + "type": "Location", + "type_id": "Location", + "value": {"latitude": lat, "longitude": lon, "altitude": 42.5}, + "raw_value": "", + }, + ], + }, + "rtt_ms": 1247.0, + "reason": "Telemetry response received", + } + self._append_event( + { + "type": "telemetry_received", + "data": {"peer_name": peer_name, "telemetry": result}, + } + ) + return result + def get_self_public_key(self) -> str | None: """Return a mock public key for testing.""" return "6b547fd13630e0f7a6b167df23b9876543210abcdef0123456789abcdef0a619" diff --git a/src/meshcore_console/ui_gtk/views/peers.py b/src/meshcore_console/ui_gtk/views/peers.py index f7730ce..d7d2683 100644 --- a/src/meshcore_console/ui_gtk/views/peers.py +++ b/src/meshcore_console/ui_gtk/views/peers.py @@ -7,7 +7,7 @@ gi.require_version("Gtk", "4.0") -from gi.repository import Gtk, Pango +from gi.repository import GLib, Gtk, Pango if TYPE_CHECKING: pass @@ -121,7 +121,8 @@ def __init__(self, service: MeshcoreService, event_store: UiEventStore, layout: self._details_title.set_margin_bottom(8) details_column.append(self._details_title) - details_scroll = Gtk.ScrolledWindow.new() + self._details_scroll = Gtk.ScrolledWindow.new() + details_scroll = self._details_scroll details_scroll.set_vexpand(True) details_scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) @@ -331,6 +332,10 @@ def _show_peer_details(self, peer: Peer) -> None: message_btn.connect("clicked", self._on_send_message_clicked, peer) actions.append(message_btn) + telem_btn = Gtk.Button.new_with_label("Request Telemetry") + telem_btn.connect("clicked", self._on_request_telemetry_clicked, peer) + actions.append(telem_btn) + self._details_content.append(actions) def _add_section_header(self, title: str) -> None: @@ -382,3 +387,126 @@ def _on_send_message_clicked(self, _button: Gtk.Button, peer: Peer) -> None: """Navigate to messages view and start a conversation with this peer.""" logger.debug("UI: send message to peer=%s", peer.display_name) navigate(self, "messages", ("select_channel", peer.display_name)) + + def _on_request_telemetry_clicked(self, button: Gtk.Button, peer: Peer) -> None: + """Request telemetry from a peer in a background thread.""" + import threading + + button.set_sensitive(False) + button.set_label("Requesting...") + + # Add a spinner below the actions + spinner_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=8) + spinner_box.set_halign(Gtk.Align.START) + spinner = Gtk.Spinner() + spinner.start() + spinner_box.append(spinner) + spinner_label = Gtk.Label(label="Waiting for telemetry response...") + spinner_label.add_css_class("panel-muted") + spinner_box.append(spinner_label) + self._details_content.append(spinner_box) + + def _do_request() -> None: + try: + result = self._service.request_telemetry(peer.display_name) + GLib.idle_add(self._show_telemetry_result, spinner_box, button, peer, result) + except Exception as exc: + GLib.idle_add(self._show_telemetry_error, spinner_box, button, str(exc)) + + threading.Thread(target=_do_request, daemon=True, name="telemetry-req").start() + + def _show_telemetry_result( + self, spinner_box: Gtk.Box, button: Gtk.Button, peer: Peer, result: dict + ) -> None: + """Display telemetry results in the details panel (called on main thread).""" + self._details_content.remove(spinner_box) + button.set_sensitive(True) + button.set_label("Request Telemetry") + + self._add_section_header("Telemetry") + + if not result or not result.get("success"): + reason = result.get("reason", "No telemetry data returned") if result else "No response" + no_data = Gtk.Label(label=reason) + no_data.add_css_class("panel-muted") + no_data.set_halign(Gtk.Align.START) + self._details_content.append(no_data) + self._scroll_details_to_bottom() + return + + rtt = result.get("rtt_ms") + if rtt is not None: + self._details_content.append(DetailRow("RTT:", f"{rtt:.0f} ms")) + + telemetry = result.get("telemetry_data") or {} + sensors = telemetry.get("sensors", []) + + if not sensors: + text = telemetry.get("formatted") or result.get("response_text") or "No sensor data" + fallback = Gtk.Label(label=text) + fallback.add_css_class("panel-muted") + fallback.set_halign(Gtk.Align.START) + self._details_content.append(fallback) + self._scroll_details_to_bottom() + return + + for sensor in sensors: + value = sensor.get("value") + sensor_type = sensor.get("type", "Unknown") + + if isinstance(value, dict) and "latitude" in value: + self._details_content.append( + DetailRow( + "Location:", format_coordinates(value["latitude"], value["longitude"]) + ) + ) + alt = value.get("altitude") + if alt is not None and alt != 0.0: + self._details_content.append(DetailRow("Altitude:", f"{alt:.1f} m")) + elif sensor_type == "Voltage": + self._details_content.append(DetailRow("Battery:", f"{value:.2f} V")) + elif sensor_type == "Temperature": + self._details_content.append(DetailRow("Temperature:", f"{value:.1f} °C")) + elif sensor_type == "Humidity": + self._details_content.append(DetailRow("Humidity:", f"{value:.0f}%")) + elif sensor_type == "Barometer": + self._details_content.append(DetailRow("Pressure:", f"{value:.1f} hPa")) + else: + self._details_content.append(DetailRow(f"{sensor_type}:", str(value))) + + self._scroll_details_to_bottom() + + def _show_telemetry_error(self, spinner_box: Gtk.Box, button: Gtk.Button, error: str) -> None: + """Display telemetry error in the details panel (called on main thread).""" + self._details_content.remove(spinner_box) + button.set_sensitive(True) + button.set_label("Request Telemetry") + + self._add_section_header("Telemetry") + + err_scroll = Gtk.ScrolledWindow() + err_scroll.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) + err_scroll.set_max_content_height(120) + err_scroll.set_propagate_natural_height(True) + + err_label = Gtk.Label(label=f"Request failed: {error}") + err_label.add_css_class("panel-muted") + err_label.set_halign(Gtk.Align.START) + err_label.set_wrap(True) + err_label.set_wrap_mode(Pango.WrapMode.WORD_CHAR) + err_label.set_selectable(True) + err_scroll.set_child(err_label) + self._details_content.append(err_scroll) + + self._scroll_details_to_bottom() + + def _scroll_details_to_bottom(self) -> None: + """Scroll the details panel to the bottom after content is added.""" + + def _do_scroll() -> bool: + vadj = self._details_scroll.get_vadjustment() + vadj.set_value(vadj.get_upper()) + return False + + # Defer so GTK has time to allocate the new widgets + GLib.idle_add(_do_scroll) diff --git a/uv.lock b/uv.lock index 99483c2..f8f4ce0 100644 --- a/uv.lock +++ b/uv.lock @@ -534,6 +534,7 @@ version = "1.7.0" source = { editable = "." } dependencies = [ { name = "gpsdclient" }, + { name = "pycayennelpp" }, { name = "pymc-core" }, { name = "pymc-core", extra = ["hardware"], marker = "sys_platform == 'linux'" }, { name = "pynmea2" }, @@ -560,6 +561,7 @@ requires-dist = [ { name = "gpsdclient", specifier = ">=1.3" }, { name = "mypy", marker = "extra == 'dev'", specifier = ">=1.7" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=3.7" }, + { name = "pycayennelpp", specifier = ">=2.4.0" }, { name = "pygobject", marker = "extra == 'gtk'", specifier = ">=3.48" }, { name = "pymc-core", specifier = ">=0.1.0" }, { name = "pymc-core", extras = ["hardware"], marker = "sys_platform == 'linux'", specifier = ">=0.1.0" }, @@ -715,6 +717,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d7/54/123f6239685f5f3f2edc123f1e38d2eefacebee18cf3c532d2f4bd51d0ef/pycairo-1.29.0-cp314-cp314t-win_arm64.whl", hash = "sha256:caba0837a4b40d47c8dfb0f24cccc12c7831e3dd450837f2a356c75f21ce5a15", size = 721404, upload-time = "2025-11-11T19:12:36.919Z" }, ] +[[package]] +name = "pycayennelpp" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/5f/d6/3587ea1936b479fbb5250de604f76e2f7e75ac15e845e7c0b7fb1065a157/pycayennelpp-2.4.0.tar.gz", hash = "sha256:bdf5e3e698ce40e66c5065794340e7149c1147f3fd98438e7ea72885c4a786b1", size = 11390, upload-time = "2021-10-12T20:02:33.95Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/6e/9a/79e6623e3b65b2ecf3294d3bb14aae74ea529da30d8ff5db1f9cb19c0aac/pycayennelpp-2.4.0-py3-none-any.whl", hash = "sha256:a3e69ea4da9e2971a44d1e275d5555617b2345eea752325f28c0356f64901d62", size = 10417, upload-time = "2021-10-12T20:02:32.573Z" }, +] + [[package]] name = "pycparser" version = "3.0"