diff --git a/README.md b/README.md index 75ad55d..1038436 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ The Sagemcom F@st series is used by multiple cable companies, where some cable c | Sagemcom F@st 5370e | Telia | sha512 | | | Sagemcom F@st 5380 | TDC | md5 | | | Sagemcom F@st 5566 | Bell (Home Hub 3000) | md5 | username: guest, password: "" | +| Sagemcom F@st 5598 | YouFibre | None / New API | username: admin, password: "" | | Sagemcom F@st 5688T | Salt (FibreBox_X6) | sha512 | username: admin | | Sagemcom F@st 5689 | Bell (Home Hub 4000) | md5 | username: admin, password: "" | | Sagemcom F@st 5689E | Bell (Giga Hub) | sha512 | username: admin, password: "" | @@ -63,7 +64,7 @@ The following script can be used as a quickstart. ```python import asyncio from sagemcom_api.client import SagemcomClient -from sagemcom_api.enums import EncryptionMethod +from sagemcom_api.enums import ApiMode, EncryptionMethod from sagemcom_api.exceptions import NonWritableParameterException HOST = "" @@ -71,9 +72,17 @@ USERNAME = "" PASSWORD = "" ENCRYPTION_METHOD = EncryptionMethod.SHA512 # or EncryptionMethod.MD5 VALIDATE_SSL_CERT = True +API_MODE = ApiMode.AUTO # auto, legacy or rest async def main() -> None: - async with SagemcomClient(HOST, USERNAME, PASSWORD, ENCRYPTION_METHOD, verify_ssl=VALIDATE_SSL_CERT) as client: + async with SagemcomClient( + HOST, + USERNAME, + PASSWORD, + ENCRYPTION_METHOD, + api_mode=API_MODE, + verify_ssl=VALIDATE_SSL_CERT, + ) as client: try: await client.login() except Exception as exception: # pylint: disable=broad-except @@ -119,11 +128,21 @@ asyncio.run(main()) ## Advanced +### API Mode + +The client supports two API variants: + +- `ApiMode.LEGACY`: original `/cgi/json-req` API with XPath support +- `ApiMode.REST`: newer `/api/v1/*` API used by newer firmwares +- `ApiMode.AUTO` (default): tries legacy first, then falls back to REST when the legacy endpoint is unavailable + +When REST mode is active, high-level helpers like `get_device_info()`, `get_hosts()` and `reboot()` are supported. XPath-based methods (`get_value_by_xpath`, `set_value_by_xpath`, `get_values_by_xpaths`) are legacy-only. + ### Determine the EncryptionMethod If you are not sure which encryption method to use, you can leave it empty or pass `None` and use `get_encryption_method` to determine the encryption method. -`get_encryption_method` will return an `EncryptionMethod` when a match is found. Best would be to use this function only during your initial investigation. +`get_encryption_method` will return an `EncryptionMethod` when a match is found and `EncryptionMethod.NONE` if no method matches or REST mode is configured. Best would be to use this function only during your initial investigation. This function will throw a `LoginTimeoutException` when no match is found, since this is still a HTTP Time Out. This could caused by the wrong encryption method, but also by trying to connect to an inaccessible host. diff --git a/sagemcom_api/client.py b/sagemcom_api/client.py index 4d8347d..133b554 100644 --- a/sagemcom_api/client.py +++ b/sagemcom_api/client.py @@ -17,6 +17,8 @@ ClientOSError, ClientSession, ClientTimeout, + CookieJar, + ContentTypeError, ServerDisconnectedError, TCPConnector, ) @@ -39,7 +41,7 @@ XMO_REQUEST_NO_ERR, XMO_UNKNOWN_PATH_ERR, ) -from .enums import EncryptionMethod +from .enums import ApiMode, EncryptionMethod from .exceptions import ( AccessRestrictionException, AuthenticationException, @@ -75,6 +77,7 @@ def __init__( username: str, password: str, authentication_method: EncryptionMethod | None = None, + api_mode: ApiMode | str = ApiMode.AUTO, session: ClientSession | None = None, ssl: bool | None = False, verify_ssl: bool | None = True, @@ -86,11 +89,16 @@ def __init__( :param username: the username for your Sagemcom router :param password: the password for your Sagemcom router :param authentication_method: the auth method of your Sagemcom router + :param api_mode: one of auto, legacy or rest :param session: use a custom session, for example to configure the timeout """ self.host = host self.username = username self.authentication_method = authentication_method + self.api_mode = ApiMode(api_mode) + self._active_api_mode: ApiMode = ( + self.api_mode if self.api_mode != ApiMode.AUTO else ApiMode.LEGACY + ) self.password = password self._current_nonce = None self._password_hash = self.__generate_hash(password) @@ -106,12 +114,18 @@ def __init__( else ClientSession( headers={"User-Agent": f"{DEFAULT_USER_AGENT}"}, timeout=ClientTimeout(DEFAULT_TIMEOUT), + cookie_jar=CookieJar(unsafe=True), connector=TCPConnector( verify_ssl=verify_ssl if verify_ssl is not None else True ), ) ) + @property + def active_api_mode(self) -> ApiMode: + """Return the API mode that is currently active.""" + return self._active_api_mode + async def __aenter__(self) -> SagemcomClient: """TODO.""" return self @@ -315,8 +329,8 @@ async def __api_request_async(self, actions, priority=False): ) as exception: raise ConnectionError(str(exception)) from exception - async def login(self): - """Login to the SagemCom F@st router using a username and password.""" + async def __legacy_login(self): + """Login to the legacy JSON-REQ API.""" actions = { "id": 0, @@ -358,20 +372,245 @@ async def login(self): raise UnauthorizedException(data) + @backoff.on_exception( + backoff.expo, + (ClientConnectorError, ClientOSError, ServerDisconnectedError), + max_tries=5, + ) + async def __rest_request( + self, method: str, endpoint: str, data: dict[str, Any] | None = None + ): + """Call the REST API using form-encoded payloads.""" + url = f"{self.protocol}://{self.host}{endpoint}" + payload = urllib.parse.urlencode(data or {}) + request_headers = {"Content-Type": "application/x-www-form-urlencoded"} + + async with self.session.request( + method, url, data=payload, headers=request_headers + ) as response: + if response.status in (200, 204): + if response.status == 204: + return None + try: + return await response.json() + except (json.JSONDecodeError, ContentTypeError): + return await response.text() + + result = await response.text() + if response.status in (401, 403): + raise UnauthorizedException(result) + + if response.status == 404: + raise UnsupportedHostException(result) + + if response.status == 400: + raise AuthenticationException(result) + + raise UnknownException(result) + + async def __rest_login(self): + """Login to routers exposing the newer REST API.""" + await self.__rest_request( + "POST", + "/api/v1/login", + data={"login": self.username, "password": self.password}, + ) + return True + + async def __probe_rest_availability(self) -> bool: + """Try a REST login/logout sequence to detect REST-only firmware.""" + try: + await self.__rest_request( + "POST", + "/api/v1/login", + data={"login": self.username, "password": self.password}, + ) + except ( + AuthenticationException, + UnauthorizedException, + UnsupportedHostException, + UnknownException, + ): + return False + + await self.__rest_request("POST", "/api/v1/logout", data={"_": ""}) + return True + + @staticmethod + def __first_value(data: dict[str, Any], *keys: str) -> Any: + """Return the first non-None value from data for the given keys.""" + for key in keys: + if key in data and data[key] is not None: + return data[key] + return None + + @staticmethod + def __to_bool(value: Any, default: bool = True) -> bool: + """Convert mixed payload boolean values to bool.""" + if value is None: + return default + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + if isinstance(value, str): + return value.strip().lower() in ("1", "true", "yes", "on", "up") + return default + + def __build_rest_device( + self, entry: dict[str, Any], interface_type: str | None + ) -> Device: + """Map a REST host entry to Device.""" + detected_interface = self.__first_value( + entry, + "interface_type", + "interfaceType", + "interface", + "connectionType", + "connection_type", + "type", + ) + if interface_type is None and isinstance(detected_interface, str): + normalized = detected_interface.lower() + if "wifi" in normalized or "wireless" in normalized or "wlan" in normalized: + interface_type = "wifi" + elif "ethernet" in normalized or "eth" in normalized or "lan" in normalized: + interface_type = "ethernet" + else: + interface_type = detected_interface + + return Device( + uid=self.__first_value(entry, "id", "uid"), + phys_address=self.__first_value( + entry, "macAddress", "mac_address", "phys_address" + ), + ip_address=self.__first_value(entry, "ipAddress", "ip_address"), + host_name=self.__first_value(entry, "hostname", "host_name", "name"), + user_host_name=self.__first_value( + entry, "friendlyname", "friendly_name", "user_host_name" + ), + active=self.__to_bool( + self.__first_value(entry, "active", "isActive"), True + ), + interface_type=interface_type, + detected_device_type=self.__first_value( + entry, "devicetype", "deviceType", "detected_device_type" + ), + ) + + def __extract_rest_home_hosts(self, data: Any) -> list[Device]: + """Parse /api/v1/home hosts payload.""" + if isinstance(data, list): + if not data: + return [] + home = data[0] + elif isinstance(data, dict): + home = data + else: + raise UnknownException("Invalid response from /api/v1/home") + + if not isinstance(home, dict): + raise UnknownException("Invalid response from /api/v1/home") + + devices: list[Device] = [] + for entry in home.get("wirelessListDevice", []): + if isinstance(entry, dict): + devices.append(self.__build_rest_device(entry, "wifi")) + + for entry in home.get("ethernetListDevice", []): + if isinstance(entry, dict): + devices.append(self.__build_rest_device(entry, "ethernet")) + + return devices + + def __extract_rest_hosts(self, data: Any) -> list[Device]: + """Parse /api/v1/hosts payload.""" + hosts: list[dict[str, Any]] + if isinstance(data, list): + hosts = [entry for entry in data if isinstance(entry, dict)] + elif isinstance(data, dict): + raw_hosts = self.__first_value( + data, + "hosts", + "Hosts", + "list", + "listDevice", + "list_device", + "devices", + ) + if not isinstance(raw_hosts, list): + raise UnknownException("Invalid response from /api/v1/hosts") + hosts = [entry for entry in raw_hosts if isinstance(entry, dict)] + else: + raise UnknownException("Invalid response from /api/v1/hosts") + + return [self.__build_rest_device(entry, None) for entry in hosts] + + def __should_fallback_to_rest(self, exception: Exception) -> bool: + """Return True when legacy API failure indicates a REST-only router.""" + if isinstance(exception, UnsupportedHostException): + return True + + if isinstance(exception, (UnknownException, BadRequestException)): + content = str(exception).lower() + return "service unavailable" in content or " EncryptionMethod: """Determine which encryption method to use for authentication and set it directly.""" + if self.api_mode == ApiMode.REST: + return EncryptionMethod.NONE + + if self.api_mode == ApiMode.AUTO and await self.__probe_rest_availability(): + return EncryptionMethod.NONE + for encryption_method in EncryptionMethod: try: + if encryption_method == EncryptionMethod.NONE: + continue self.authentication_method = encryption_method self._password_hash = self.__generate_hash( self.password, encryption_method @@ -391,7 +630,7 @@ async def get_encryption_method(self): ): pass - return None + return EncryptionMethod.NONE @backoff.on_exception( backoff.expo, @@ -411,6 +650,8 @@ async def get_value_by_xpath(self, xpath: str, options: dict | None = None) -> d :param xpath: path expression :param options: optional options """ + self.__ensure_legacy_api() + actions = { "id": 0, "method": "getValue", @@ -441,6 +682,8 @@ async def get_values_by_xpaths(self, xpaths, options: dict | None = None) -> dic :param xpaths: Dict of key to xpath expression :param options: optional options """ + self.__ensure_legacy_api() + actions = [ { "id": i, @@ -478,6 +721,8 @@ async def set_value_by_xpath( :param value: value :param options: optional options """ + self.__ensure_legacy_api() + actions = { "id": 0, "method": "setValue", @@ -503,6 +748,26 @@ async def set_value_by_xpath( ) async def get_device_info(self) -> DeviceInfo: """Retrieve information about Sagemcom F@st device.""" + if self._active_api_mode == ApiMode.REST: + data = await self.__rest_request("GET", "/api/v1/device") + if not data or not isinstance(data, list): + raise UnknownException("Invalid response from /api/v1/device") + + device = data[0].get("device", {}) + return DeviceInfo( + mac_address=device.get("wan_mac_address"), + serial_number=device.get("serialnumber"), + model_name=device.get("modelname"), + model_number=device.get("modelname"), + product_class=device.get("modelname"), + software_version=device.get("running", {}).get("version"), + hardware_version=device.get("hardware_version"), + manufacturer="Sagemcom", + up_time=device.get("uptime"), + first_use_date=device.get("firstusedate"), + reboot_count=device.get("numberofboots"), + ) + try: data = await self.get_value_by_xpath("Device/DeviceInfo") return DeviceInfo(**data["device_info"]) @@ -534,6 +799,34 @@ async def get_device_info(self) -> DeviceInfo: ) async def get_hosts(self, only_active: bool | None = False) -> list[Device]: """Retrieve hosts connected to Sagemcom F@st device.""" + if self._active_api_mode == ApiMode.REST: + rest_errors: list[Exception] = [] + devices: list[Device] = [] + + for endpoint, parser in ( + ("/api/v1/home", self.__extract_rest_home_hosts), + ("/api/v1/hosts", self.__extract_rest_hosts), + ): + try: + data = await self.__rest_request("GET", endpoint) + devices = parser(data) + break + except ( + UnknownException, + UnsupportedHostException, + AuthenticationException, + ) as exception: + rest_errors.append(exception) + else: + if rest_errors: + raise rest_errors[-1] + raise UnknownException("Unable to retrieve hosts using REST endpoints") + + if only_active: + return [d for d in devices if d.active is True] + + return devices + data = await self.get_value_by_xpath( "Device/Hosts/Hosts", options={"capability-flags": {"interface": True}} ) @@ -558,6 +851,7 @@ async def get_hosts(self, only_active: bool | None = False) -> list[Device]: ) async def get_port_mappings(self) -> list[PortMapping]: """Retrieve configured Port Mappings on Sagemcom F@st device.""" + self.__ensure_legacy_api() data = await self.get_value_by_xpath("Device/NAT/PortMappings") port_mappings = [PortMapping(**p) for p in data] @@ -576,6 +870,9 @@ async def get_port_mappings(self) -> list[PortMapping]: ) async def reboot(self): """Reboot Sagemcom F@st device.""" + if self._active_api_mode == ApiMode.REST: + return await self.__rest_request("POST", "/api/v1/device/reboot") + action = { "id": 0, "method": "reboot", diff --git a/sagemcom_api/enums.py b/sagemcom_api/enums.py index 6025b59..5942499 100644 --- a/sagemcom_api/enums.py +++ b/sagemcom_api/enums.py @@ -18,3 +18,13 @@ class EncryptionMethod(StrEnum): MD5 = "MD5" MD5_NONCE = "MD5_NONCE" SHA512 = "SHA512" + NONE = "NONE" + + +@unique +class ApiMode(StrEnum): + """API mode to use when communicating with the router.""" + + AUTO = "auto" + LEGACY = "legacy" + REST = "rest" diff --git a/tests/unit/test_client_basic.py b/tests/unit/test_client_basic.py index ba98fda..e8a7f55 100644 --- a/tests/unit/test_client_basic.py +++ b/tests/unit/test_client_basic.py @@ -2,13 +2,30 @@ # pylint: disable=protected-access -import pytest +from unittest.mock import AsyncMock, MagicMock +from aiohttp import ClientSession +import pytest from sagemcom_api.client import SagemcomClient -from sagemcom_api.enums import EncryptionMethod +from sagemcom_api.enums import ApiMode, EncryptionMethod from sagemcom_api.exceptions import AuthenticationException +@pytest.mark.asyncio +async def test_default_session_accepts_ip_cookies(): + """Default aiohttp session should accept cookies from IP hosts.""" + client = SagemcomClient( + host="192.168.1.1", + username="admin", + password="admin", + authentication_method=EncryptionMethod.MD5, + ) + try: + assert getattr(client.session.cookie_jar, "_unsafe", False) is True + finally: + await client.close() + + @pytest.mark.asyncio async def test_login_success(mock_session_factory, login_success_response): """ @@ -131,3 +148,267 @@ async def test_login_with_preconfigured_fixture(mock_client_sha512): assert client.authentication_method == EncryptionMethod.SHA512 assert client._session_id == 12345 assert client._server_nonce == "abcdef1234567890" + + +@pytest.mark.asyncio +async def test_login_auto_fallbacks_to_rest_when_legacy_503(): + """Auto mode should switch to REST when legacy endpoint is unavailable.""" + mock_session = MagicMock(spec=ClientSession) + mock_session.close = AsyncMock() + + legacy_response = AsyncMock() + legacy_response.status = 503 + legacy_response.text = AsyncMock(return_value="503 Service Unavailable") + legacy_response.__aenter__ = AsyncMock(return_value=legacy_response) + legacy_response.__aexit__ = AsyncMock(return_value=None) + mock_session.post.return_value = legacy_response + + rest_response = AsyncMock() + rest_response.status = 204 + rest_response.text = AsyncMock(return_value="") + rest_response.__aenter__ = AsyncMock(return_value=rest_response) + rest_response.__aexit__ = AsyncMock(return_value=None) + mock_session.request.return_value = rest_response + + client = SagemcomClient( + host="192.168.1.1", + username="admin", + password="admin", + authentication_method=EncryptionMethod.MD5, + session=mock_session, + api_mode=ApiMode.AUTO, + ) + + result = await client.login() + + assert result is True + assert client.active_api_mode == ApiMode.REST + assert mock_session.post.call_count == 1 + assert mock_session.request.call_count == 1 + + +@pytest.mark.asyncio +async def test_get_encryption_method_rest_returns_none(): + """REST mode should immediately signal that no encryption method is needed.""" + client = SagemcomClient( + host="192.168.1.1", + username="admin", + password="admin", + api_mode=ApiMode.REST, + ) + + result = await client.get_encryption_method() + + assert result == EncryptionMethod.NONE + await client.close() + + +@pytest.mark.asyncio +async def test_get_hosts_rest_mode(): + """get_hosts should parse wifi and ethernet devices on REST firmware.""" + mock_session = MagicMock(spec=ClientSession) + mock_session.close = AsyncMock() + + login_response = AsyncMock() + login_response.status = 204 + login_response.text = AsyncMock(return_value="") + login_response.__aenter__ = AsyncMock(return_value=login_response) + login_response.__aexit__ = AsyncMock(return_value=None) + + home_payload = [ + { + "wirelessListDevice": [ + { + "id": 1, + "hostname": "wifi-device", + "friendlyname": "wifi-device", + "macAddress": "aa:bb:cc:dd:ee:ff", + "ipAddress": "192.168.1.2", + "active": True, + "devicetype": "MISCELLANEOUS", + } + ], + "ethernetListDevice": [ + { + "id": 2, + "hostname": "lan-device", + "friendlyname": "lan-device", + "macAddress": "11:22:33:44:55:66", + "ipAddress": "192.168.1.3", + "active": True, + "devicetype": "MISCELLANEOUS", + } + ], + } + ] + hosts_response = AsyncMock() + hosts_response.status = 200 + hosts_response.json = AsyncMock(return_value=home_payload) + hosts_response.__aenter__ = AsyncMock(return_value=hosts_response) + hosts_response.__aexit__ = AsyncMock(return_value=None) + + mock_session.request.side_effect = [login_response, hosts_response] + + client = SagemcomClient( + host="192.168.1.1", + username="admin", + password="admin", + session=mock_session, + api_mode=ApiMode.REST, + ) + + await client.login() + devices = await client.get_hosts() + + assert len(devices) == 2 + assert devices[0].host_name == "wifi-device" + assert devices[0].interface_type == "wifi" + assert devices[1].host_name == "lan-device" + assert devices[1].interface_type == "ethernet" + + +@pytest.mark.asyncio +async def test_get_hosts_rest_fallbacks_to_hosts_endpoint(): + """/api/v1/hosts should be used when /api/v1/home response is invalid.""" + mock_session = MagicMock(spec=ClientSession) + mock_session.close = AsyncMock() + + login_response = AsyncMock() + login_response.status = 204 + login_response.text = AsyncMock(return_value="") + login_response.__aenter__ = AsyncMock(return_value=login_response) + login_response.__aexit__ = AsyncMock(return_value=None) + + home_response = AsyncMock() + home_response.status = 200 + home_response.json = AsyncMock(return_value=[{"unexpected": "shape"}]) + home_response.__aenter__ = AsyncMock(return_value=home_response) + home_response.__aexit__ = AsyncMock(return_value=None) + + hosts_payload = [ + { + "id": 7, + "hostname": "tablet", + "friendlyname": "tablet", + "macAddress": "aa:aa:aa:aa:aa:aa", + "ipAddress": "192.168.1.50", + "active": "true", + "interfaceType": "wireless", + "devicetype": "TABLET", + } + ] + hosts_response = AsyncMock() + hosts_response.status = 200 + hosts_response.json = AsyncMock(return_value=hosts_payload) + hosts_response.__aenter__ = AsyncMock(return_value=hosts_response) + hosts_response.__aexit__ = AsyncMock(return_value=None) + + mock_session.request.side_effect = [login_response, home_response, hosts_response] + + client = SagemcomClient( + host="192.168.1.1", + username="admin", + password="admin", + session=mock_session, + api_mode=ApiMode.REST, + ) + + await client.login() + devices = await client.get_hosts(only_active=True) + + assert len(devices) == 1 + assert devices[0].host_name == "tablet" + assert devices[0].interface_type == "wifi" + assert devices[0].active is True + + +@pytest.mark.asyncio +async def test_get_hosts_rest_fallbacks_on_home_400(): + """/api/v1/hosts should be tried when /api/v1/home returns HTTP 400.""" + mock_session = MagicMock(spec=ClientSession) + mock_session.close = AsyncMock() + + login_response = AsyncMock() + login_response.status = 204 + login_response.text = AsyncMock(return_value="") + login_response.__aenter__ = AsyncMock(return_value=login_response) + login_response.__aexit__ = AsyncMock(return_value=None) + + home_response = AsyncMock() + home_response.status = 400 + home_response.text = AsyncMock(return_value='{"exception":{"domain":"/api/v1/home"}}') + home_response.__aenter__ = AsyncMock(return_value=home_response) + home_response.__aexit__ = AsyncMock(return_value=None) + + hosts_payload = [ + { + "id": 3, + "hostname": "phone", + "friendlyname": "phone", + "macAddress": "de:ad:be:ef:00:01", + "ipAddress": "192.168.1.25", + "active": True, + "interfaceType": "wireless", + "devicetype": "SMARTPHONE", + } + ] + hosts_response = AsyncMock() + hosts_response.status = 200 + hosts_response.json = AsyncMock(return_value=hosts_payload) + hosts_response.__aenter__ = AsyncMock(return_value=hosts_response) + hosts_response.__aexit__ = AsyncMock(return_value=None) + + mock_session.request.side_effect = [login_response, home_response, hosts_response] + + client = SagemcomClient( + host="192.168.1.1", + username="admin", + password="admin", + session=mock_session, + api_mode=ApiMode.REST, + ) + + await client.login() + devices = await client.get_hosts() + + assert len(devices) == 1 + assert devices[0].host_name == "phone" + assert devices[0].interface_type == "wifi" + + +@pytest.mark.asyncio +async def test_reboot_rest_mode(): + """reboot should call REST endpoint on REST firmware.""" + mock_session = MagicMock(spec=ClientSession) + mock_session.close = AsyncMock() + + login_response = AsyncMock() + login_response.status = 204 + login_response.text = AsyncMock(return_value="") + login_response.__aenter__ = AsyncMock(return_value=login_response) + login_response.__aexit__ = AsyncMock(return_value=None) + + reboot_response = AsyncMock() + reboot_response.status = 204 + reboot_response.text = AsyncMock(return_value="") + reboot_response.__aenter__ = AsyncMock(return_value=reboot_response) + reboot_response.__aexit__ = AsyncMock(return_value=None) + + mock_session.request.side_effect = [login_response, reboot_response] + + client = SagemcomClient( + host="192.168.1.1", + username="admin", + password="admin", + session=mock_session, + api_mode=ApiMode.REST, + ) + + await client.login() + result = await client.reboot() + + assert result is None + assert mock_session.request.call_count == 2 + reboot_call = mock_session.request.call_args_list[1] + assert reboot_call.args[0] == "POST" + assert reboot_call.args[1].endswith("/api/v1/device/reboot")