Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.42.0] - 2026-01-29

### Changed

- Detect error APDUs during navigation to have proper error APDUs exceptions instead of timeouts

## [1.41.2] - 2026-01-08

### Fixed
Expand Down
29 changes: 27 additions & 2 deletions src/ragger/backend/speculos.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(self,
api_url=self.url,
**kwargs)
self._pending: Optional[ApduResponse] = None
self._pending_async_response: Optional[ApduResponse] = None
self._last_screenshot: Optional[BytesIO] = None
self._home_screenshot: Optional[BytesIO] = None
self._ticker_paused_count = 0
Expand All @@ -137,6 +138,15 @@ def apdu_timeout(self) -> float:
def apdu_timeout(self, value: float) -> None:
self._apdu_timeout = value

def _check_async_error(self) -> None:
"""Check for async APDU errors and raise if present."""
if self._pending_async_response is not None and self._last_async_response is None:
if has_data_available(self._pending_async_response, timeout=0):
self.logger.info("[Ragger] Early async data available, retrieving it now.")
# This will raise ExceptionRAPDU immediately if status != 9000
self._last_async_response = self._get_last_async_response(
self._pending_async_response)

def _retrieve_client_screen_content(self) -> dict:
raw_content = self._client.get_current_screen_content()
# Keep only text events
Expand Down Expand Up @@ -214,13 +224,22 @@ def _get_last_async_response(self, response) -> RAPDU:
@contextmanager
def exchange_async_raw(self, data: bytes = b"") -> Generator[bool, None, None]:
self.apdu_logger.info("=> %s", data.hex())
# Reset state for this new async exchange
self._last_async_response = None
with self._client.apdu_exchange_nowait(cla=data[0],
ins=data[1],
p1=data[2],
p2=data[3],
data=data[5:]) as response:
yield has_data_available(response, timeout=self.apdu_timeout)
self._last_async_response = self._get_last_async_response(response)
self._pending_async_response = response
try:
yield has_data_available(response, timeout=self.apdu_timeout)
# Only retrieve if not already retrieved by _check_async_error during navigation
if self._last_async_response is None:
self._last_async_response = self._get_last_async_response(response)
finally:
# Clear pending async response flag in all cases
self._pending_async_response = None

def right_click(self) -> None:
self._client.press_and_release("right")
Expand Down Expand Up @@ -315,6 +334,12 @@ def wait_for_screen_change(self, timeout: float = 10.0) -> None:
if not screenshot_equal(screenshot, self._last_screenshot):
break

# Check for async APDU errors before sending a tick. This ensures that if the
# application has already refused the APDU (e.g., due to an error), we detect and raise
# the error immediately instead of waiting for a screen change that will never occur.
# This makes navigation robust and prevents hanging.
self._check_async_error()

# Send a ticker event and let the app process it
self.send_tick()
screenshot = BytesIO(self._client.get_screenshot())
Expand Down
27 changes: 27 additions & 0 deletions tests/functional/backend/test_speculos.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,30 @@
self.backend.right_click()
self.backend.left_click()
self.backend.both_click()

def test_async_error_raised_during_navigation(self):
"""
Ensure that an async APDU error is raised immediately during navigation (e.g., while waiting for a screen change),
not just at context exit. This simulates an error response being available before navigation completes.
Also verifies that sequential async exchanges properly reset state and retrieve fresh responses.
"""
with patch("speculos.client.subprocess"):
with SpeculosServerStub():
with self.backend:
# Start an async exchange with an error APDU (not 0x9000)
with self.assertRaises(ExceptionRAPDU) as error:
with self.backend.exchange_async_raw(bytes.fromhex("01000000")):
# Simulate navigation that would trigger the async error check
# (wait_for_screen_change calls _check_async_error)
self.backend.wait_for_screen_change(timeout=0.5)
# This line should be unreachable - if reached, the error wasn't raised during navigation
assert False, "Expected ExceptionRAPDU was not raised during navigation" # pragma: no cover
self.assertEqual(error.exception.status, APDUStatus.ERROR)

# Perform a second async exchange with a SUCCESS APDU to ensure state is properly reset
# If state wasn't reset, this could incorrectly raise due to the previous error response
with self.assertRaises(TimeoutError):
with self.backend.exchange_async_raw(bytes.fromhex("00000000")):
self.backend.wait_for_screen_change(timeout=1)
# Verify that the response was successfully retrieved despite the timeout
self.assertEqual(self.backend.last_async_response.status, APDUStatus.SUCCESS)
Loading