diff --git a/src/defib/agent/client.py b/src/defib/agent/client.py index 7557e63..fd5b0a9 100644 --- a/src/defib/agent/client.py +++ b/src/defib/agent/client.py @@ -143,7 +143,13 @@ def bad_block(self) -> list[SectorResult]: def get_agent_binary(chip: str) -> Path | None: - """Get the path to the pre-compiled agent binary for a chip.""" + """Get the path to the pre-compiled agent binary for a chip. + + Accepts an optional ``:variant`` suffix (e.g. ``hi3516av300:emmc``); + the variant only affects DDR init, not the agent binary, so we strip + it before the lookup. + """ + chip = chip.split(":", 1)[0] chip_to_agent = { "hi3516ev300": "hi3516ev300", "gk7205v200": "gk7205v200", diff --git a/src/defib/cli/app.py b/src/defib/cli/app.py index 3d6b950..b6044ce 100644 --- a/src/defib/cli/app.py +++ b/src/defib/cli/app.py @@ -942,6 +942,58 @@ def list_interfaces_cmd( app.add_typer(agent_app, name="agent") +def _agent_not_responding_message(chip: str, uboot_address: int | None = None) -> str: + """Build the diagnostic shown when boot protocol uploads complete but the + agent never sends READY. + + The single most common cause we've seen is a board-variant DDR mismatch: + defib's per-chip DDR init (PRESTEP0/DDRSTEP0) is calibrated for one + board, the bootrom faithfully jumps to the agent's load address, but + DDR isn't actually backed there so the CPU fetches garbage and hangs + silently with no UART output. The diagnostic surfaces the board-variant + fix path first and falls back to the vendor-U-Boot loadx manual route. + """ + from defib.profiles.loader import list_variants, parse_chip_variant + + base_chip, current_variant = parse_chip_variant(chip) + variants = list_variants(base_chip) + addr = f"0x{uboot_address:08x}" if uboot_address is not None else "" + + lines = [ + "Boot-protocol upload completed but the agent never sent READY.", + "", + "Most common cause: the chip profile's DDR init (PRESTEP0/DDRSTEP0)", + "doesn't match this board's DDR layout. The bootrom faithfully calls", + f"the agent at {addr}, but DDR isn't backed there, so the CPU", + "fetches garbage and hangs silently (no UART output).", + "", + ] + suggested = [v for v in variants if v != current_variant] + if suggested: + lines.append(f"Known board variants for {base_chip}: {', '.join(variants)}") + lines.append(f" Try: defib agent upload -c {base_chip}:{suggested[0]} ...") + lines.append("") + elif variants: + lines.append( + f"You already specified variant '{current_variant}' — no other " + f"variants are declared for {base_chip}." + ) + lines.append("") + else: + lines.append(f"No board variants declared for {base_chip}.") + lines.append("") + + lines.extend([ + "Manual workaround (vendor U-Boot must be intact in flash):", + " 1. power-cycle the camera", + " 2. hold Ctrl-C to break U-Boot autoboot", + f" 3. at the U-Boot prompt: loady {addr}", + " 4. YMODEM-send agent-.bin", + f" 5. go {addr}", + ]) + return "\n".join(lines) + + @agent_app.command("upload") def agent_upload( chip: str = typer.Option(..., "-c", "--chip", help="Chip model name"), @@ -1052,10 +1104,16 @@ def on_progress(e: ProgressEvent) -> None: elif output == "json": print(json_mod.dumps({"event": "ready", **info})) else: + diag = _agent_not_responding_message(chip, profile.uboot_address) if output == "json": - print(json_mod.dumps({"event": "error", "message": "Agent not responding"})) + print(json_mod.dumps({ + "event": "error", + "message": "Agent not responding", + "diagnostic": diag, + })) else: console.print("[red]Agent not responding[/red]") + console.print(diag) raise typer.Exit(1) await transport.close() @@ -1188,11 +1246,16 @@ def on_boot_progress(e: ProgressEvent) -> None: client = FlashAgentClient(transport, chip) if not await client.connect(timeout=10.0): - msg = "Agent not responding" + diag = _agent_not_responding_message(chip, profile.uboot_address) if output == "json": - print(json_mod.dumps({"event": "error", "message": msg})) + print(json_mod.dumps({ + "event": "error", + "message": "Agent not responding", + "diagnostic": diag, + })) else: - console.print(f"[red]{msg}[/red]") + console.print("[red]Agent not responding[/red]") + console.print(diag) await transport.close() raise typer.Exit(1) diff --git a/src/defib/firmware.py b/src/defib/firmware.py index edb1406..243e3ba 100644 --- a/src/defib/firmware.py +++ b/src/defib/firmware.py @@ -58,8 +58,15 @@ def get_cache_dir() -> Path: return cache_dir +def _strip_variant(chip: str) -> str: + """Drop the optional ``:variant`` suffix — U-Boot binaries are per-chip, + not per-board, so any variant suffix is irrelevant for firmware lookup.""" + return chip.split(":", 1)[0] + + def firmware_url(chip: str) -> str | None: """Get the OpenIPC download URL for a chip, or None if unavailable.""" + chip = _strip_variant(chip) name = CHIP_TO_FIRMWARE.get(chip, chip) if name in AVAILABLE_FIRMWARE: return f"{OPENIPC_BASE_URL}/u-boot-{name}-universal.bin" @@ -73,6 +80,7 @@ def has_firmware(chip: str) -> bool: def get_cached_path(chip: str) -> Path | None: """Get the path to cached firmware, or None if not cached.""" + chip = _strip_variant(chip) name = CHIP_TO_FIRMWARE.get(chip, chip) path = get_cache_dir() / f"u-boot-{name}-universal.bin" if path.exists() and path.stat().st_size > 0: @@ -126,6 +134,7 @@ def download_firmware( return cached # Download + chip = _strip_variant(chip) name = CHIP_TO_FIRMWARE.get(chip, chip) dest = get_cache_dir() / f"u-boot-{name}-universal.bin" logger.info("Downloading firmware from %s", url) diff --git a/src/defib/profiles/loader.py b/src/defib/profiles/loader.py index 8d92cab..a7db8ac 100644 --- a/src/defib/profiles/loader.py +++ b/src/defib/profiles/loader.py @@ -21,24 +21,48 @@ def _get_profiles_dir() -> Path: return Path(str(ref)) +def parse_chip_variant(chip: str) -> tuple[str, str | None]: + """Split an optional ``chip:variant`` form into its parts. + + ``hi3516av300`` → ``("hi3516av300", None)`` + ``hi3516av300:emmc`` → ``("hi3516av300", "emmc")`` + + Used by every helper that accepts a chip identifier so callers can + pass the joined form consistently without each layer re-parsing. + """ + if ":" in chip: + base, variant = chip.split(":", 1) + return base.lower(), variant.lower() or None + return chip.lower(), None + + def load_profile(chip_name: str, profiles_dir: Path | None = None) -> SoCProfile: - """Load a SoC profile by chip name, resolving alias chains. + """Load a SoC profile by chip name, resolving alias chains and variants. + + ``chip_name`` may optionally use the ``chip:variant`` form (e.g. + ``hi3516av300:emmc``). When a variant is specified, the loader applies + the variant's per-board overrides (typically DDRSTEP0 / PRESTEP0) on + top of the base profile. Args: - chip_name: The chip identifier (e.g., "hi3516cv300"). + chip_name: The chip identifier (e.g., ``hi3516cv300`` or + ``hi3516av300:emmc``). profiles_dir: Optional override for the profiles directory. Returns: - Parsed SoCProfile. + Parsed SoCProfile with variant overrides applied if any. Raises: FileNotFoundError: If the profile JSON doesn't exist. - ValueError: If alias chain exceeds MAX_ALIAS_DEPTH. + ValueError: If the alias chain is too deep, or the requested + variant isn't declared for this chip. """ if profiles_dir is None: profiles_dir = _get_profiles_dir() - current = chip_name.lower() + base_chip, variant = parse_chip_variant(chip_name) + + current = base_chip for _ in range(MAX_ALIAS_DEPTH): profile_path = profiles_dir / f"{current}.json" if not profile_path.exists(): @@ -53,11 +77,52 @@ def load_profile(chip_name: str, profiles_dir: Path | None = None) -> SoCProfile continue data = json.loads(content) + # Pop variants before pydantic sees the dict so the SoCProfile + # model itself stays variant-unaware. + variants = data.pop("variants", None) or {} + if variant is not None: + if variant not in variants: + available = sorted(variants) if variants else [] + avail_str = ", ".join(available) if available else "(none declared)" + raise ValueError( + f"Unknown variant '{variant}' for chip '{current}'. " + f"Available variants: {avail_str}" + ) + # Variant entries override matching top-level keys + data.update(variants[variant]) return SoCProfile.model_validate(data) raise ValueError(f"Alias chain too deep for chip: {chip_name}") +def list_variants(chip_name: str, profiles_dir: Path | None = None) -> list[str]: + """Return the list of board variants declared for ``chip_name``. + + Empty list when the chip has no variants. Aliases are followed. + Variant suffixes in ``chip_name`` (after ``:``) are stripped first. + """ + if profiles_dir is None: + profiles_dir = _get_profiles_dir() + + base_chip, _ = parse_chip_variant(chip_name) + current = base_chip + for _ in range(MAX_ALIAS_DEPTH): + profile_path = profiles_dir / f"{current}.json" + if not profile_path.exists(): + return [] + content = profile_path.read_text().strip() + tokens = content.split() + if len(tokens) == 1 and tokens[0].endswith(".json"): + current = tokens[0][:-5] + continue + try: + data = json.loads(content) + except json.JSONDecodeError: + return [] + return sorted((data.get("variants") or {}).keys()) + return [] + + def list_chips(profiles_dir: Path | None = None) -> list[str]: """List all available chip names from profile files. diff --git a/tests/test_cli.py b/tests/test_cli.py index fd731c5..72aa90e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -186,3 +186,42 @@ def test_replay_json(self, tmp_path): data = json.loads(result.stdout) assert data["chip"] == "json_test" assert data["records"] == 1 + + +class TestAgentNotRespondingDiagnostic: + """The diagnostic shown when boot-protocol upload completes but no + READY frame arrives — most often a board-variant DDR mismatch.""" + + def test_mentions_ddr_init_root_cause(self): + from defib.cli.app import _agent_not_responding_message + msg = _agent_not_responding_message("hi3516av300", 0x81000000) + assert "DDR" in msg + # Surfaces the actual address so the user can match it against + # what they see on the wire. + assert "0x81000000" in msg + + def test_no_variants_path(self): + from defib.cli.app import _agent_not_responding_message + msg = _agent_not_responding_message("hi3516av300", 0x81000000) + # Shipped profile has no variants today; message should say so + # rather than pretending one exists. + assert "No board variants declared" in msg + + def test_mentions_vendor_uboot_loadx_fallback(self): + from defib.cli.app import _agent_not_responding_message + msg = _agent_not_responding_message("hi3516av300", 0x81000000) + assert "loady" in msg + assert "go 0x81000000" in msg + + def test_when_variants_exist_lists_them(self, monkeypatch): + # Pretend the chip ships with two variants + import defib.cli.app as cli_app + from defib.profiles import loader + monkeypatch.setattr( + loader, "list_variants", lambda *a, **kw: ["emmc", "nor"], + ) + msg = cli_app._agent_not_responding_message("hi3516av300", 0x81000000) + assert "emmc" in msg + assert "nor" in msg + # And a concrete next-command nudge + assert "defib agent upload -c hi3516av300:" in msg diff --git a/tests/test_profiles.py b/tests/test_profiles.py index 1b10923..c210daf 100644 --- a/tests/test_profiles.py +++ b/tests/test_profiles.py @@ -1,10 +1,16 @@ """Tests for SoC profile loading and validation.""" +import json from pathlib import Path import pytest -from defib.profiles.loader import list_chips, load_profile +from defib.profiles.loader import ( + list_chips, + list_variants, + load_profile, + parse_chip_variant, +) PROFILES_DIR = Path(__file__).parent.parent / "src" / "defib" / "profiles" / "data" @@ -76,3 +82,152 @@ def test_alias_chain(self): except Exception: pass # Some might be aliases to nonexistent targets assert loaded > 10 + + +class TestParseChipVariant: + def test_no_colon_no_variant(self): + assert parse_chip_variant("hi3516av300") == ("hi3516av300", None) + + def test_colon_splits_variant(self): + assert parse_chip_variant("hi3516av300:emmc") == ("hi3516av300", "emmc") + + def test_lowercased(self): + assert parse_chip_variant("Hi3516AV300:EMMC") == ("hi3516av300", "emmc") + + def test_empty_variant_treated_as_none(self): + assert parse_chip_variant("hi3516av300:") == ("hi3516av300", None) + + +class TestBoardVariants: + """Per-board overrides for fields that vary by board (DDR, clocks).""" + + @pytest.fixture + def chip_with_variants(self, tmp_path: Path) -> Path: + """Synthetic chip profile with an `emmc` variant that overrides DDRSTEP0.""" + base_ddr = [0] * 64 + emmc_ddr = [1] * 64 + profile = { + "name": "testchip", + "DDRSTEP0": base_ddr, + "ADDRESS": ["0x04017000", "0x04010500", "0x81000000"], + "FILELEN": ["0x0040", "0x6000"], + "STEPLEN": ["0x0040", "0x0070"], + "variants": { + "emmc": {"DDRSTEP0": emmc_ddr}, + "uart-debug": { + "DDRSTEP0": [2] * 64, + "PRESTEP0": [3] * 64, + }, + }, + } + (tmp_path / "testchip.json").write_text(json.dumps(profile)) + return tmp_path + + def test_load_without_variant_uses_base(self, chip_with_variants: Path): + p = load_profile("testchip", chip_with_variants) + assert p.ddr_step_data == bytes([0] * 64) + + def test_load_with_variant_applies_override(self, chip_with_variants: Path): + p = load_profile("testchip:emmc", chip_with_variants) + assert p.ddr_step_data == bytes([1] * 64) + + def test_variant_can_add_field_not_in_base(self, chip_with_variants: Path): + # uart-debug adds PRESTEP0 that the base doesn't have + p = load_profile("testchip:uart-debug", chip_with_variants) + assert p.ddr_step_data == bytes([2] * 64) + assert p.prestep_data == bytes([3] * 64) + + def test_unknown_variant_raises_with_available_list( + self, chip_with_variants: Path + ): + with pytest.raises(ValueError, match="Unknown variant 'nope'"): + load_profile("testchip:nope", chip_with_variants) + # The error should name the available variants so the user can pivot. + with pytest.raises(ValueError, match="emmc"): + load_profile("testchip:nope", chip_with_variants) + with pytest.raises(ValueError, match="uart-debug"): + load_profile("testchip:nope", chip_with_variants) + + def test_unknown_variant_on_chip_with_no_variants(self, tmp_path: Path): + profile = { + "name": "tinychip", + "DDRSTEP0": [0] * 64, + "ADDRESS": ["0x0", "0x0", "0x0"], + "FILELEN": ["0x0", "0x0"], + "STEPLEN": ["0x0", "0x0"], + } + (tmp_path / "tinychip.json").write_text(json.dumps(profile)) + with pytest.raises(ValueError, match="none declared"): + load_profile("tinychip:emmc", tmp_path) + + def test_list_variants_returns_sorted(self, chip_with_variants: Path): + assert list_variants("testchip", chip_with_variants) == [ + "emmc", "uart-debug", + ] + + def test_list_variants_strips_variant_suffix(self, chip_with_variants: Path): + # Caller passed a variant suffix — should still resolve and list. + assert list_variants("testchip:emmc", chip_with_variants) == [ + "emmc", "uart-debug", + ] + + def test_list_variants_empty_when_chip_has_none(self, tmp_path: Path): + profile = { + "name": "tinychip", + "DDRSTEP0": [0] * 64, + "ADDRESS": ["0x0", "0x0", "0x0"], + "FILELEN": ["0x0", "0x0"], + "STEPLEN": ["0x0", "0x0"], + } + (tmp_path / "tinychip.json").write_text(json.dumps(profile)) + assert list_variants("tinychip", tmp_path) == [] + + def test_variants_follow_alias_chain(self, tmp_path: Path): + # Aliases let us name dv300 → av300 without duplicating data. + # Variants live on the alias TARGET and are reachable via either name. + (tmp_path / "dv300_alias.json").write_text("av300_real.json") + (tmp_path / "av300_real.json").write_text(json.dumps({ + "name": "av300_real", + "DDRSTEP0": [0] * 64, + "ADDRESS": ["0x04017000", "0x04010500", "0x81000000"], + "FILELEN": ["0x0040", "0x6000"], + "STEPLEN": ["0x0040", "0x0070"], + "variants": {"emmc": {"DDRSTEP0": [9] * 64}}, + })) + p = load_profile("dv300_alias:emmc", tmp_path) + assert p.ddr_step_data == bytes([9] * 64) + assert list_variants("dv300_alias", tmp_path) == ["emmc"] + + def test_real_av300_profile_still_loads(self): + """Smoke test: real shipped profile (no variants today) keeps working.""" + p = load_profile("hi3516av300", PROFILES_DIR) + assert p.name == "hi3516av300" + assert p.uboot_address == 0x81000000 + + +class TestVariantStrippingInLookups: + """Helpers that accept a chip identifier must accept the chip:variant + form and strip the variant before looking up per-chip resources + (firmware download URL, prebuilt agent binary). Otherwise the colon + syntax would only work for the profile loader.""" + + def test_firmware_url_strips_variant(self): + from defib.firmware import firmware_url + a = firmware_url("hi3516ev300") + b = firmware_url("hi3516ev300:emmc") + assert a is not None + assert a == b + + def test_get_cached_path_strips_variant(self): + from defib.firmware import get_cached_path + # Either both return None (not cached) or both return the same path — + # they must agree. + assert get_cached_path("hi3516ev300") == get_cached_path( + "hi3516ev300:emmc" + ) + + def test_get_agent_binary_strips_variant(self): + from defib.agent.client import get_agent_binary + assert get_agent_binary("hi3516av300") == get_agent_binary( + "hi3516av300:emmc" + )