From 2e2661d304f309ae44fb89f65d094b2ad69571c2 Mon Sep 17 00:00:00 2001 From: Francois Beutin Date: Wed, 28 Jan 2026 18:18:39 +0100 Subject: [PATCH] Detect error APDUs during navigation to have proper error APDUs exceptions instead of timeouts --- CHANGELOG.md | 6 +++++ src/ragger/backend/speculos.py | 29 +++++++++++++++++++++-- tests/functional/backend/test_speculos.py | 27 +++++++++++++++++++++ 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c660ccd..cc8008f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.42.0] - 2026-01-29 + +### Changed + +- Detect error APDUs during navigation to have proper error APDUs exceptions instead of timeouts + ## [1.41.2] - 2026-01-08 ### Fixed diff --git a/src/ragger/backend/speculos.py b/src/ragger/backend/speculos.py index 4db2394f..9c289a8b 100644 --- a/src/ragger/backend/speculos.py +++ b/src/ragger/backend/speculos.py @@ -120,6 +120,7 @@ def __init__(self, api_url=self.url, **kwargs) self._pending: Optional[ApduResponse] = None + self._pending_async_response: Optional[ApduResponse] = None self._last_screenshot: Optional[BytesIO] = None self._home_screenshot: Optional[BytesIO] = None self._ticker_paused_count = 0 @@ -137,6 +138,15 @@ def apdu_timeout(self) -> float: def apdu_timeout(self, value: float) -> None: self._apdu_timeout = value + def _check_async_error(self) -> None: + """Check for async APDU errors and raise if present.""" + if self._pending_async_response is not None and self._last_async_response is None: + if has_data_available(self._pending_async_response, timeout=0): + self.logger.info("[Ragger] Early async data available, retrieving it now.") + # This will raise ExceptionRAPDU immediately if status != 9000 + self._last_async_response = self._get_last_async_response( + self._pending_async_response) + def _retrieve_client_screen_content(self) -> dict: raw_content = self._client.get_current_screen_content() # Keep only text events @@ -214,13 +224,22 @@ def _get_last_async_response(self, response) -> RAPDU: @contextmanager def exchange_async_raw(self, data: bytes = b"") -> Generator[bool, None, None]: self.apdu_logger.info("=> %s", data.hex()) + # Reset state for this new async exchange + self._last_async_response = None with self._client.apdu_exchange_nowait(cla=data[0], ins=data[1], p1=data[2], p2=data[3], data=data[5:]) as response: - yield has_data_available(response, timeout=self.apdu_timeout) - self._last_async_response = self._get_last_async_response(response) + self._pending_async_response = response + try: + yield has_data_available(response, timeout=self.apdu_timeout) + # Only retrieve if not already retrieved by _check_async_error during navigation + if self._last_async_response is None: + self._last_async_response = self._get_last_async_response(response) + finally: + # Clear pending async response flag in all cases + self._pending_async_response = None def right_click(self) -> None: self._client.press_and_release("right") @@ -315,6 +334,12 @@ def wait_for_screen_change(self, timeout: float = 10.0) -> None: if not screenshot_equal(screenshot, self._last_screenshot): break + # Check for async APDU errors before sending a tick. This ensures that if the + # application has already refused the APDU (e.g., due to an error), we detect and raise + # the error immediately instead of waiting for a screen change that will never occur. + # This makes navigation robust and prevents hanging. + self._check_async_error() + # Send a ticker event and let the app process it self.send_tick() screenshot = BytesIO(self._client.get_screenshot()) diff --git a/tests/functional/backend/test_speculos.py b/tests/functional/backend/test_speculos.py index aed44e4c..34b9f144 100644 --- a/tests/functional/backend/test_speculos.py +++ b/tests/functional/backend/test_speculos.py @@ -147,3 +147,30 @@ def test_clicks(self): self.backend.right_click() self.backend.left_click() self.backend.both_click() + + def test_async_error_raised_during_navigation(self): + """ + Ensure that an async APDU error is raised immediately during navigation (e.g., while waiting for a screen change), + not just at context exit. This simulates an error response being available before navigation completes. + Also verifies that sequential async exchanges properly reset state and retrieve fresh responses. + """ + with patch("speculos.client.subprocess"): + with SpeculosServerStub(): + with self.backend: + # Start an async exchange with an error APDU (not 0x9000) + with self.assertRaises(ExceptionRAPDU) as error: + with self.backend.exchange_async_raw(bytes.fromhex("01000000")): + # Simulate navigation that would trigger the async error check + # (wait_for_screen_change calls _check_async_error) + self.backend.wait_for_screen_change(timeout=0.5) + # This line should be unreachable - if reached, the error wasn't raised during navigation + assert False, "Expected ExceptionRAPDU was not raised during navigation" # pragma: no cover + self.assertEqual(error.exception.status, APDUStatus.ERROR) + + # Perform a second async exchange with a SUCCESS APDU to ensure state is properly reset + # If state wasn't reset, this could incorrectly raise due to the previous error response + with self.assertRaises(TimeoutError): + with self.backend.exchange_async_raw(bytes.fromhex("00000000")): + self.backend.wait_for_screen_change(timeout=1) + # Verify that the response was successfully retrieved despite the timeout + self.assertEqual(self.backend.last_async_response.status, APDUStatus.SUCCESS)