Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ Mock implementation is in `meshcore/mock_session.py`. It exercises the same adap
| `MESHCORE_UI_GEOM_DEBUG=1` | Enable geometry debugging logs |
| `MESHCORE_USE_DIO2_RF=1` | Radio hardware flag (default on) |
| `MESHCORE_USE_DIO3_TCXO=1` | Radio hardware flag (default on) |
| `MESHCORE_GPSD_DISABLE=1` | Skip gpsd detection, use direct serial |
| `MESHCORE_GPSD_HOST` | gpsd hostname (default: 127.0.0.1) |
| `MESHCORE_GPSD_PORT` | gpsd port (default: 2947) |

### Nix Development (macOS)

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"pymc-core[hardware]>=0.1.0; platform_system == 'Linux'",
"pynmea2>=1.18.0",
"segno>=1.6.0", # QR code generation
"gpsdclient>=1.3",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -74,7 +75,7 @@ no_implicit_optional = true
check_untyped_defs = true
# PyGObject stubs are incomplete, ignore missing imports for gi
[[tool.mypy.overrides]]
module = ["gi.*", "pynmea2", "serial", "RPi.*", "pymc_core.*", "spidev"]
module = ["gi.*", "pynmea2", "serial", "RPi.*", "pymc_core.*", "spidev", "gpsdclient"]
ignore_missing_imports = true

[tool.commitizen]
Expand Down
125 changes: 124 additions & 1 deletion src/meshcore_console/platform/gps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import logging
import os
import socket
import threading
from pathlib import Path
from typing import Callable, Protocol

Expand Down Expand Up @@ -405,13 +407,134 @@ def _parse_rmc_manual(self, sentence: str) -> None:
logger.debug("GPS: manual RMC parse error: %s", e)


class GpsdProvider:
"""GPS provider that reads from gpsd via gpsdclient.

Runs a background daemon thread streaming TPV (Time-Position-Velocity)
reports. Reconnects with exponential backoff if gpsd disconnects.
"""

def __init__(self, host: str = "127.0.0.1", port: int = 2947) -> None:
self._host = host
self._port = port
self._callback: Callable[[float, float], None] | None = None
self._running = False
self._latitude: float | None = None
self._longitude: float | None = None
self._has_fix = False
self._last_error: str | None = None
self._thread: threading.Thread | None = None

def start(self) -> None:
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
logger.info("GpsdProvider: started (host=%s, port=%d)", self._host, self._port)

def stop(self) -> None:
self._running = False
logger.debug("GpsdProvider: stopped")

def get_location(self) -> tuple[float, float] | None:
if self._latitude is not None and self._longitude is not None:
return (self._latitude, self._longitude)
return None

def set_callback(self, callback: Callable[[float, float], None] | None) -> None:
self._callback = callback

def poll(self) -> bool:
return self._running

def get_last_error(self) -> str | None:
return self._last_error

def has_fix(self) -> bool:
return self._has_fix

def _update_location(self, lat: float, lon: float) -> None:
if lat == 0.0 and lon == 0.0:
return
if not self._has_fix:
self._has_fix = True
logger.info("GpsdProvider: fix acquired: %.6f, %.6f", lat, lon)
self._latitude = lat
self._longitude = lon
if self._callback:
self._callback(lat, lon)

def _run(self) -> None:
from gpsdclient import GPSDClient # type: ignore[import-not-found]

backoff = 1.0
max_backoff = 30.0

while self._running:
try:
logger.debug("GpsdProvider: connecting to %s:%d", self._host, self._port)
with GPSDClient(host=self._host, port=self._port) as client:
backoff = 1.0 # Reset on successful connection
for result in client.dict_stream(filter=["TPV"]):
if not self._running:
return
mode = result.get("mode", 0)
if mode < 2:
if self._has_fix:
self._has_fix = False
logger.info("GpsdProvider: fix lost (mode=%d)", mode)
continue
lat = result.get("lat")
lon = result.get("lon")
if lat is not None and lon is not None:
self._update_location(float(lat), float(lon))
except Exception as e:
if not self._running:
return
self._last_error = f"gpsd connection error: {e}"
logger.warning("GpsdProvider: %s (retry in %.0fs)", e, backoff)
# Sleep in small increments so we can check _running
elapsed = 0.0
while elapsed < backoff and self._running:
import time

time.sleep(min(0.5, backoff - elapsed))
elapsed += 0.5
backoff = min(backoff * 2, max_backoff)


def _gpsd_available(host: str = "127.0.0.1", port: int = 2947) -> bool:
"""Check if gpsd is reachable via a raw socket connect."""
try:
with socket.create_connection((host, port), timeout=1.0):
return True
except OSError:
return False


def create_gps_provider() -> GpsProvider:
"""Create the appropriate GPS provider for the current environment."""
"""Create the appropriate GPS provider for the current environment.

Priority:
1. MESHCORE_MOCK=1 → MockGps
2. gpsd reachable (unless MESHCORE_GPSD_DISABLE=1) → GpsdProvider
3. /dev/ttyS0 exists → UConsoleGps
4. Fallback → MockGps
"""
if os.environ.get("MESHCORE_MOCK", "0") == "1":
from meshcore_console.mock import MockGps

return MockGps()

# Check for gpsd
if os.environ.get("MESHCORE_GPSD_DISABLE", "0") != "1":
host = os.environ.get("MESHCORE_GPSD_HOST", "127.0.0.1")
port = int(os.environ.get("MESHCORE_GPSD_PORT", "2947"))
if _gpsd_available(host, port):
logger.info("GPS: gpsd detected at %s:%d, using GpsdProvider", host, port)
return GpsdProvider(host=host, port=port)

# Check if we're on a Pi with GPS hardware
if Path("/dev/ttyS0").exists():
return UConsoleGps()
Expand Down
150 changes: 150 additions & 0 deletions tests/unit/test_gps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
"""Tests for GPS providers."""

from __future__ import annotations

import os
from unittest.mock import patch

from meshcore_console.platform.gps import (
GpsdProvider,
UConsoleGps,
_gpsd_available,
_nmea_to_decimal,
create_gps_provider,
)


# --- _nmea_to_decimal ---


def test_nmea_to_decimal_latitude_north() -> None:
# 37 degrees 46.9410 minutes N = 37.78235
result = _nmea_to_decimal("3746.9410", "N", is_longitude=False)
assert abs(result - 37.78235) < 0.001


def test_nmea_to_decimal_latitude_south() -> None:
result = _nmea_to_decimal("3746.9410", "S", is_longitude=False)
assert result < 0
assert abs(result + 37.78235) < 0.001


def test_nmea_to_decimal_longitude_west() -> None:
result = _nmea_to_decimal("12225.1234", "W", is_longitude=True)
assert result < 0


def test_nmea_to_decimal_longitude_east() -> None:
result = _nmea_to_decimal("12225.1234", "E", is_longitude=True)
assert result > 0


# --- GpsdProvider ---


def test_gpsd_provider_initial_state() -> None:
provider = GpsdProvider()
assert provider.get_location() is None
assert not provider.has_fix()
assert provider.get_last_error() is None


def test_gpsd_provider_update_location() -> None:
provider = GpsdProvider()
provider._update_location(37.7749, -122.4194)
assert provider.has_fix()
loc = provider.get_location()
assert loc is not None
assert abs(loc[0] - 37.7749) < 0.0001
assert abs(loc[1] - (-122.4194)) < 0.0001


def test_gpsd_provider_rejects_zero_zero() -> None:
provider = GpsdProvider()
provider._update_location(0.0, 0.0)
assert provider.get_location() is None
assert not provider.has_fix()


def test_gpsd_provider_callback() -> None:
provider = GpsdProvider()
received: list[tuple[float, float]] = []
provider.set_callback(lambda lat, lon: received.append((lat, lon)))
provider._update_location(37.7749, -122.4194)
assert len(received) == 1
assert received[0] == (37.7749, -122.4194)


def test_gpsd_provider_fix_loss() -> None:
provider = GpsdProvider()
provider._update_location(37.7749, -122.4194)
assert provider.has_fix()
# Simulate fix loss: _has_fix is cleared by the _run loop when mode < 2,
# but we can test the attribute directly
provider._has_fix = False
assert not provider.has_fix()


def test_gpsd_provider_poll_returns_running_state() -> None:
provider = GpsdProvider()
assert provider.poll() is False # not started
provider._running = True
assert provider.poll() is True


# --- _gpsd_available ---


def test_gpsd_available_returns_false_for_closed_port() -> None:
# Port 1 should not have gpsd listening
assert _gpsd_available("127.0.0.1", 1) is False


# --- create_gps_provider ---


def test_create_gps_provider_mock_mode() -> None:
from meshcore_console.mock.gps import MockGps

with patch.dict(os.environ, {"MESHCORE_MOCK": "1"}):
provider = create_gps_provider()
assert isinstance(provider, MockGps)


def test_create_gps_provider_prefers_gpsd_when_available() -> None:
env = {"MESHCORE_MOCK": "0"}
with (
patch.dict(os.environ, env, clear=False),
patch("meshcore_console.platform.gps._gpsd_available", return_value=True),
):
provider = create_gps_provider()
assert isinstance(provider, GpsdProvider)


def test_create_gps_provider_respects_gpsd_disable() -> None:
env = {"MESHCORE_MOCK": "0", "MESHCORE_GPSD_DISABLE": "1"}
with (
patch.dict(os.environ, env, clear=False),
patch("meshcore_console.platform.gps._gpsd_available", return_value=True) as mock_avail,
patch("meshcore_console.platform.gps.Path") as mock_path,
):
mock_path.return_value.exists.return_value = False
provider = create_gps_provider()
# gpsd should not even be checked
mock_avail.assert_not_called()
# Should fall through to mock since /dev/ttyS0 doesn't exist
from meshcore_console.mock.gps import MockGps

assert isinstance(provider, MockGps)


def test_create_gps_provider_falls_back_to_serial() -> None:
env = {"MESHCORE_MOCK": "0"}
with (
patch.dict(os.environ, env, clear=False),
patch("meshcore_console.platform.gps._gpsd_available", return_value=False),
patch("meshcore_console.platform.gps.Path") as mock_path,
):
mock_path.return_value.exists.return_value = True
provider = create_gps_provider()
assert isinstance(provider, UConsoleGps)
13 changes: 12 additions & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.