Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/defib/agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ def bad_block(self) -> list[SectorResult]:


def get_agent_binary(chip: str) -> Path | None:
"""Get the path to the pre-compiled agent binary for a chip."""
"""Get the path to the pre-compiled agent binary for a chip.

Accepts an optional ``:variant`` suffix (e.g. ``hi3516av300:emmc``);
the variant only affects DDR init, not the agent binary, so we strip
it before the lookup.
"""
chip = chip.split(":", 1)[0]
chip_to_agent = {
"hi3516ev300": "hi3516ev300",
"gk7205v200": "gk7205v200",
Expand Down
71 changes: 67 additions & 4 deletions src/defib/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,58 @@ def list_interfaces_cmd(
app.add_typer(agent_app, name="agent")


def _agent_not_responding_message(chip: str, uboot_address: int | None = None) -> str:
"""Build the diagnostic shown when boot protocol uploads complete but the
agent never sends READY.

The single most common cause we've seen is a board-variant DDR mismatch:
defib's per-chip DDR init (PRESTEP0/DDRSTEP0) is calibrated for one
board, the bootrom faithfully jumps to the agent's load address, but
DDR isn't actually backed there so the CPU fetches garbage and hangs
silently with no UART output. The diagnostic surfaces the board-variant
fix path first and falls back to the vendor-U-Boot loadx manual route.
"""
from defib.profiles.loader import list_variants, parse_chip_variant

base_chip, current_variant = parse_chip_variant(chip)
variants = list_variants(base_chip)
addr = f"0x{uboot_address:08x}" if uboot_address is not None else "<uboot_address>"

lines = [
"Boot-protocol upload completed but the agent never sent READY.",
"",
"Most common cause: the chip profile's DDR init (PRESTEP0/DDRSTEP0)",
"doesn't match this board's DDR layout. The bootrom faithfully calls",
f"the agent at {addr}, but DDR isn't backed there, so the CPU",
"fetches garbage and hangs silently (no UART output).",
"",
]
suggested = [v for v in variants if v != current_variant]
if suggested:
lines.append(f"Known board variants for {base_chip}: {', '.join(variants)}")
lines.append(f" Try: defib agent upload -c {base_chip}:{suggested[0]} ...")
lines.append("")
elif variants:
lines.append(
f"You already specified variant '{current_variant}' — no other "
f"variants are declared for {base_chip}."
)
lines.append("")
else:
lines.append(f"No board variants declared for {base_chip}.")
lines.append("")

lines.extend([
"Manual workaround (vendor U-Boot must be intact in flash):",
" 1. power-cycle the camera",
" 2. hold Ctrl-C to break U-Boot autoboot",
f" 3. at the U-Boot prompt: loady {addr}",
" 4. YMODEM-send agent-<chip>.bin",
f" 5. go {addr}",
])
return "\n".join(lines)


@agent_app.command("upload")
def agent_upload(
chip: str = typer.Option(..., "-c", "--chip", help="Chip model name"),
Expand Down Expand Up @@ -1052,10 +1104,16 @@ def on_progress(e: ProgressEvent) -> None:
elif output == "json":
print(json_mod.dumps({"event": "ready", **info}))
else:
diag = _agent_not_responding_message(chip, profile.uboot_address)
if output == "json":
print(json_mod.dumps({"event": "error", "message": "Agent not responding"}))
print(json_mod.dumps({
"event": "error",
"message": "Agent not responding",
"diagnostic": diag,
}))
else:
console.print("[red]Agent not responding[/red]")
console.print(diag)
raise typer.Exit(1)

await transport.close()
Expand Down Expand Up @@ -1188,11 +1246,16 @@ def on_boot_progress(e: ProgressEvent) -> None:

client = FlashAgentClient(transport, chip)
if not await client.connect(timeout=10.0):
msg = "Agent not responding"
diag = _agent_not_responding_message(chip, profile.uboot_address)
if output == "json":
print(json_mod.dumps({"event": "error", "message": msg}))
print(json_mod.dumps({
"event": "error",
"message": "Agent not responding",
"diagnostic": diag,
}))
else:
console.print(f"[red]{msg}[/red]")
console.print("[red]Agent not responding[/red]")
console.print(diag)
await transport.close()
raise typer.Exit(1)

Expand Down
9 changes: 9 additions & 0 deletions src/defib/firmware.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,15 @@ def get_cache_dir() -> Path:
return cache_dir


def _strip_variant(chip: str) -> str:
"""Drop the optional ``:variant`` suffix — U-Boot binaries are per-chip,
not per-board, so any variant suffix is irrelevant for firmware lookup."""
return chip.split(":", 1)[0]


def firmware_url(chip: str) -> str | None:
"""Get the OpenIPC download URL for a chip, or None if unavailable."""
chip = _strip_variant(chip)
name = CHIP_TO_FIRMWARE.get(chip, chip)
if name in AVAILABLE_FIRMWARE:
return f"{OPENIPC_BASE_URL}/u-boot-{name}-universal.bin"
Expand All @@ -73,6 +80,7 @@ def has_firmware(chip: str) -> bool:

def get_cached_path(chip: str) -> Path | None:
"""Get the path to cached firmware, or None if not cached."""
chip = _strip_variant(chip)
name = CHIP_TO_FIRMWARE.get(chip, chip)
path = get_cache_dir() / f"u-boot-{name}-universal.bin"
if path.exists() and path.stat().st_size > 0:
Expand Down Expand Up @@ -126,6 +134,7 @@ def download_firmware(
return cached

# Download
chip = _strip_variant(chip)
name = CHIP_TO_FIRMWARE.get(chip, chip)
dest = get_cache_dir() / f"u-boot-{name}-universal.bin"
logger.info("Downloading firmware from %s", url)
Expand Down
75 changes: 70 additions & 5 deletions src/defib/profiles/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,48 @@ def _get_profiles_dir() -> Path:
return Path(str(ref))


def parse_chip_variant(chip: str) -> tuple[str, str | None]:
"""Split an optional ``chip:variant`` form into its parts.

``hi3516av300`` → ``("hi3516av300", None)``
``hi3516av300:emmc`` → ``("hi3516av300", "emmc")``

Used by every helper that accepts a chip identifier so callers can
pass the joined form consistently without each layer re-parsing.
"""
if ":" in chip:
base, variant = chip.split(":", 1)
return base.lower(), variant.lower() or None
return chip.lower(), None


def load_profile(chip_name: str, profiles_dir: Path | None = None) -> SoCProfile:
"""Load a SoC profile by chip name, resolving alias chains.
"""Load a SoC profile by chip name, resolving alias chains and variants.

``chip_name`` may optionally use the ``chip:variant`` form (e.g.
``hi3516av300:emmc``). When a variant is specified, the loader applies
the variant's per-board overrides (typically DDRSTEP0 / PRESTEP0) on
top of the base profile.

Args:
chip_name: The chip identifier (e.g., "hi3516cv300").
chip_name: The chip identifier (e.g., ``hi3516cv300`` or
``hi3516av300:emmc``).
profiles_dir: Optional override for the profiles directory.

Returns:
Parsed SoCProfile.
Parsed SoCProfile with variant overrides applied if any.

Raises:
FileNotFoundError: If the profile JSON doesn't exist.
ValueError: If alias chain exceeds MAX_ALIAS_DEPTH.
ValueError: If the alias chain is too deep, or the requested
variant isn't declared for this chip.
"""
if profiles_dir is None:
profiles_dir = _get_profiles_dir()

current = chip_name.lower()
base_chip, variant = parse_chip_variant(chip_name)

current = base_chip
for _ in range(MAX_ALIAS_DEPTH):
profile_path = profiles_dir / f"{current}.json"
if not profile_path.exists():
Expand All @@ -53,11 +77,52 @@ def load_profile(chip_name: str, profiles_dir: Path | None = None) -> SoCProfile
continue

data = json.loads(content)
# Pop variants before pydantic sees the dict so the SoCProfile
# model itself stays variant-unaware.
variants = data.pop("variants", None) or {}
if variant is not None:
if variant not in variants:
available = sorted(variants) if variants else []
avail_str = ", ".join(available) if available else "(none declared)"
raise ValueError(
f"Unknown variant '{variant}' for chip '{current}'. "
f"Available variants: {avail_str}"
)
# Variant entries override matching top-level keys
data.update(variants[variant])
return SoCProfile.model_validate(data)

raise ValueError(f"Alias chain too deep for chip: {chip_name}")


def list_variants(chip_name: str, profiles_dir: Path | None = None) -> list[str]:
"""Return the list of board variants declared for ``chip_name``.

Empty list when the chip has no variants. Aliases are followed.
Variant suffixes in ``chip_name`` (after ``:``) are stripped first.
"""
if profiles_dir is None:
profiles_dir = _get_profiles_dir()

base_chip, _ = parse_chip_variant(chip_name)
current = base_chip
for _ in range(MAX_ALIAS_DEPTH):
profile_path = profiles_dir / f"{current}.json"
if not profile_path.exists():
return []
content = profile_path.read_text().strip()
tokens = content.split()
if len(tokens) == 1 and tokens[0].endswith(".json"):
current = tokens[0][:-5]
continue
try:
data = json.loads(content)
except json.JSONDecodeError:
return []
return sorted((data.get("variants") or {}).keys())
return []


def list_chips(profiles_dir: Path | None = None) -> list[str]:
"""List all available chip names from profile files.

Expand Down
39 changes: 39 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,42 @@ def test_replay_json(self, tmp_path):
data = json.loads(result.stdout)
assert data["chip"] == "json_test"
assert data["records"] == 1


class TestAgentNotRespondingDiagnostic:
"""The diagnostic shown when boot-protocol upload completes but no
READY frame arrives — most often a board-variant DDR mismatch."""

def test_mentions_ddr_init_root_cause(self):
from defib.cli.app import _agent_not_responding_message
msg = _agent_not_responding_message("hi3516av300", 0x81000000)
assert "DDR" in msg
# Surfaces the actual address so the user can match it against
# what they see on the wire.
assert "0x81000000" in msg

def test_no_variants_path(self):
from defib.cli.app import _agent_not_responding_message
msg = _agent_not_responding_message("hi3516av300", 0x81000000)
# Shipped profile has no variants today; message should say so
# rather than pretending one exists.
assert "No board variants declared" in msg

def test_mentions_vendor_uboot_loadx_fallback(self):
from defib.cli.app import _agent_not_responding_message
msg = _agent_not_responding_message("hi3516av300", 0x81000000)
assert "loady" in msg
assert "go 0x81000000" in msg

def test_when_variants_exist_lists_them(self, monkeypatch):
# Pretend the chip ships with two variants
import defib.cli.app as cli_app
from defib.profiles import loader
monkeypatch.setattr(
loader, "list_variants", lambda *a, **kw: ["emmc", "nor"],
)
msg = cli_app._agent_not_responding_message("hi3516av300", 0x81000000)
assert "emmc" in msg
assert "nor" in msg
# And a concrete next-command nudge
assert "defib agent upload -c hi3516av300:" in msg
Loading
Loading