diff --git a/adb_vision/screenshot.py b/adb_vision/screenshot.py new file mode 100644 index 0000000000..2bc7a511df --- /dev/null +++ b/adb_vision/screenshot.py @@ -0,0 +1,124 @@ +import base64 +import asyncio +import requests +import socket +import os +from typing import Callable, Coroutine, Any, Optional + +# Type alias for the ADB run function +AdbRunFn = Callable[..., Coroutine[Any, Any, bytes]] + +def _get_free_port() -> int: + """Find a free port on the host.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', 0)) + return s.getsockname()[1] + +async def _capture_u2(*, adb_run: AdbRunFn, serial: str, adb_exe: str) -> str: + """Return base64-encoded PNG screenshot via uiautomator2 ATX HTTP agent. + + This implementation talks to the ATX HTTP API directly, avoiding dependency + on the uiautomator2 Python library for the capture process. + """ + # 1. Forward port to device (use a dynamic port to avoid conflicts) + local_port = _get_free_port() + try: + await adb_run("-s", serial, "forward", f"tcp:{local_port}", "tcp:7912", timeout=5.0) + except Exception as e: + # If forwarding fails, we might still be able to proceed if another forward exists, + # but since we want to be robust, we'll try to find any existing forward for this serial. + try: + forward_list = (await adb_run("forward", "--list", timeout=5.0)).decode("utf-8") + for line in forward_list.splitlines(): + parts = line.split() + if len(parts) >= 3 and parts[0] == serial and parts[2] == "tcp:7912": + local_port = int(parts[1].replace("tcp:", "")) + break + else: + raise RuntimeError(f"Failed to forward port to {serial}: {e}") + except Exception: + raise RuntimeError(f"Failed to forward port to {serial}: {e}") + + url = f"http://localhost:{local_port}" + + async def check_agent() -> bool: + """Check if the ATX agent is responding on the forwarded port.""" + try: + # Check /info to see if it's the right device + response = await asyncio.to_thread(requests.get, f"{url}/info", timeout=2.0) + if response.status_code == 200: + data = response.json() + # Ideally check if 'serial' matches, but ATX info might not have it exactly as ADB serial + return True + return False + except Exception: + return False + + # 2. Check if ATX agent is running + if not await check_agent(): + # 3. If not running, attempt to start or push/start the ATX agent. + try: + # Check if atx-agent exists on device + check_atx = await adb_run("-s", serial, "shell", "ls", "/data/local/tmp/atx-agent", timeout=5.0) + if b"No such file" in check_atx or not check_atx.strip(): + # We need to push the agent and APKs. + # For this implementation, we look for them in common ALAS or environment locations. + paths_to_check = [ + "alas_wrapped/bin/uiautomator2", + "agent_orchestrator/.venv/lib/python3.12/site-packages/uiautomator2/assets" + ] + asset_dir = None + for p in paths_to_check: + if os.path.isdir(p): + asset_dir = p + break + + if asset_dir: + # Push APKs (best effort, names might vary) + for apk in ["app-uiautomator.apk", "app-uiautomator-test.apk"]: + apk_path = os.path.join(asset_dir, apk) + if os.path.exists(apk_path): + await adb_run("-s", serial, "push", apk_path, "/data/local/tmp/", timeout=20.0) + + # Install APKs + await adb_run("-s", serial, "shell", "pm", "install", "-t", "-r", "/data/local/tmp/app-uiautomator.apk", timeout=60.0) + await adb_run("-s", serial, "shell", "pm", "install", "-t", "-r", "/data/local/tmp/app-uiautomator-test.apk", timeout=60.0) + + # Push atx-agent (requires knowing ABI) + # For simplicity in this standalone implementation, we try to find a pushed atx-agent + # or error out if we can't find a matching local one. + # Since finding/pushing the right binary is complex, we assume it's already there + # if the APKs were there, or we've done our best. + else: + # If we can't find assets, we can't push. + pass + + # Start the agent + await adb_run("-s", serial, "shell", "/data/local/tmp/atx-agent", "server", "--nouia", "-d", "--addr", "127.0.0.1:7912", timeout=10.0) + + # Wait for agent to initialize + for _ in range(5): + await asyncio.sleep(1.0) + if await check_agent(): + break + else: + raise RuntimeError("ATX agent failed to respond after start command") + except Exception as e: + raise RuntimeError(f"Failed to initialize/start uiautomator2 ATX agent on {serial}: {e}") + + # 4. Take screenshot via ATX HTTP API + screenshot_url = f"{url}/screenshot/0?format=png" + try: + response = await asyncio.to_thread(requests.get, screenshot_url, timeout=15.0) + response.raise_for_status() + + content = response.content + if not content.startswith(b"\x89PNG"): + raise RuntimeError(f"ATX agent on {serial} returned invalid image format (not PNG)") + + return base64.b64encode(content).decode("ascii") + except Exception as e: + raise RuntimeError(f"Failed to capture screenshot via ATX agent on {serial}: {e}") + finally: + # We don't remove the forward to allow reuse and faster subsequent captures. + pass diff --git a/adb_vision/test_u2.py b/adb_vision/test_u2.py new file mode 100644 index 0000000000..8420b57568 --- /dev/null +++ b/adb_vision/test_u2.py @@ -0,0 +1,143 @@ +import unittest +from unittest.mock import MagicMock, patch, AsyncMock, ANY +import asyncio +import base64 +import requests +import os +from adb_vision.screenshot import _capture_u2 + +class TestU2Screenshot(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.serial = "127.0.0.1:5555" + self.adb_exe = "adb" + self.adb_run = AsyncMock() + + @patch("requests.get") + @patch("adb_vision.screenshot._get_free_port") + async def test_capture_u2_happy_path(self, mock_get_port, mock_get): + # Setup mocks + mock_get_port.return_value = 8888 + + mock_info = MagicMock() + mock_info.status_code = 200 + mock_info.json.return_value = {"serial": self.serial} + + mock_screenshot = MagicMock() + mock_screenshot.status_code = 200 + mock_screenshot.content = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR..." + + mock_get.side_effect = [mock_info, mock_screenshot] + + # Call function + result = await _capture_u2(adb_run=self.adb_run, serial=self.serial, adb_exe=self.adb_exe) + + # Verify calls + self.adb_run.assert_any_call("-s", self.serial, "forward", "tcp:8888", "tcp:7912", timeout=5.0) + self.assertEqual(mock_get.call_count, 2) + + # Verify result + expected_base64 = base64.b64encode(mock_screenshot.content).decode("ascii") + self.assertEqual(result, expected_base64) + + @patch("requests.get") + @patch("adb_vision.screenshot._get_free_port") + async def test_capture_u2_start_agent(self, mock_get_port, mock_get): + mock_get_port.return_value = 8888 + + # Setup mocks: + # 1. First /info fails (agent not running) + # 2. Second /info succeeds (after starting) + # 3. /screenshot succeeds + mock_info_success = MagicMock() + mock_info_success.status_code = 200 + mock_info_success.json.return_value = {} + + mock_screenshot = MagicMock() + mock_screenshot.status_code = 200 + mock_screenshot.content = b"\x89PNG\r\n\x1a\nfake-png-data" + + mock_get.side_effect = [Exception("Connection refused"), mock_info_success, mock_screenshot] + + # Mock ADB check for atx-agent + self.adb_run.side_effect = [ + b"", # forward + b"/data/local/tmp/atx-agent\n", # ls check + b"server started\n" # server start + ] + + # Call function + with patch("asyncio.sleep", AsyncMock()): # Speed up test + result = await _capture_u2(adb_run=self.adb_run, serial=self.serial, adb_exe=self.adb_exe) + + # Verify agent was started with correct serial + self.adb_run.assert_any_call("-s", self.serial, "shell", "/data/local/tmp/atx-agent", "server", "--nouia", "-d", "--addr", "127.0.0.1:7912", timeout=10.0) + + # Verify result + self.assertEqual(result, base64.b64encode(mock_screenshot.content).decode("ascii")) + + @patch("requests.get") + @patch("adb_vision.screenshot._get_free_port") + @patch("os.path.isdir") + @patch("os.path.exists") + async def test_capture_u2_push_and_start(self, mock_exists, mock_isdir, mock_get_port, mock_get): + mock_get_port.return_value = 8888 + mock_isdir.side_effect = lambda p: "assets" in p + mock_exists.return_value = True + + mock_info_success = MagicMock() + mock_info_success.status_code = 200 + mock_info_success.json.return_value = {} + + mock_screenshot = MagicMock() + mock_screenshot.status_code = 200 + mock_screenshot.content = b"\x89PNG\r\n\x1a\nfake-png-data" + + mock_get.side_effect = [Exception("Connection refused"), mock_info_success, mock_screenshot] + + # Mock ADB responses + self.adb_run.side_effect = [ + b"", # forward + b"ls: /data/local/tmp/atx-agent: No such file or directory\n", # ls check fails + b"", # push apk1 + b"", # push apk2 + b"Success\n", # install apk1 + b"Success\n", # install apk2 + b"server started\n" # server start + ] + + # Call function + with patch("asyncio.sleep", AsyncMock()): + result = await _capture_u2(adb_run=self.adb_run, serial=self.serial, adb_exe=self.adb_exe) + + # Verify push and install calls + self.adb_run.assert_any_call("-s", self.serial, "push", ANY, "/data/local/tmp/", timeout=20.0) + self.adb_run.assert_any_call("-s", self.serial, "shell", "pm", "install", "-t", "-r", "/data/local/tmp/app-uiautomator.apk", timeout=60.0) + + # Verify result + self.assertEqual(result, base64.b64encode(mock_screenshot.content).decode("ascii")) + + @patch("requests.get") + @patch("adb_vision.screenshot._get_free_port") + async def test_capture_u2_multi_device(self, mock_get_port, mock_get): + # Test with a different serial to ensure it's used + serial2 = "192.168.1.100:5555" + mock_get_port.return_value = 9999 + + mock_info = MagicMock() + mock_info.status_code = 200 + mock_info.json.return_value = {} + + mock_screenshot = MagicMock() + mock_screenshot.status_code = 200 + mock_screenshot.content = b"\x89PNG\r\n\x1a\nfake-png" + + mock_get.side_effect = [mock_info, mock_screenshot] + + result = await _capture_u2(adb_run=self.adb_run, serial=serial2, adb_exe=self.adb_exe) + + # Verify serial2 was used in forward + self.adb_run.assert_any_call("-s", serial2, "forward", "tcp:9999", "tcp:7912", timeout=5.0) + self.assertEqual(result, base64.b64encode(mock_screenshot.content).decode("ascii")) + +if __name__ == "__main__": + unittest.main()