"""
Parse a single C type *definition* string into the matching libbs artifact.

Unlike ``CTypeParser`` (libbs/api/type_parser.py), which is deliberately scoped to
type *expressions* ("int *", "struct Foo *"), this module handles full type
*definitions* with bodies:

    - ``struct Name { <members> };``  -> :class:`libbs.artifacts.Struct`
    - ``enum Name { A, B=5, C };``    -> :class:`libbs.artifacts.Enum`
    - ``typedef <type> Name;``        -> :class:`libbs.artifacts.Typedef`

It is intentionally decompiler-free and unit-testable: the heavy lifting is done by
``pycparser`` (already a libbs dependency) for the AST and member type-string
rendering, and by ``CTypeParser`` for member sizing. The resulting artifact is then
applied to a decompiler via the normal ``deci.structs[name] = struct`` /
``deci.set_artifact(...)`` path, which is portable across every backend.
"""
import logging
import re
from typing import Optional, Union

import pycparser
from pycparser import c_ast, c_generator
from pycparser.c_parser import ParseError

from libbs.artifacts import Struct, StructMember, Enum, Typedef
from libbs.api.type_parser import CTypeParser

_l = logging.getLogger(__name__)

# Reuse single instances; both are stateless across parses.
_GENERATOR = c_generator.CGenerator()
_PARSER = pycparser.CParser()
_DEFAULT_TYPE_PARSER = CTypeParser()

# Upper bound for a member's alignment. A scalar's natural alignment is its own
# size in System V, capped at the platform word width (pointers/long are 8 in
# CTypeParser's defaults).
_MAX_ALIGN = 8


class TypeDefinitionParseError(ValueError):
    """Raised when a C type-definition string cannot be turned into a libbs artifact."""


def parse_type_definition(
    text: str,
    type_parser: Optional[CTypeParser] = None,
) -> Union[Struct, Enum, Typedef]:
    """
    Parse a single C type *definition* into the matching libbs artifact.

    Supports exactly one top-level definition: a named ``struct``, ``enum``, or
    ``typedef``. Raises :class:`TypeDefinitionParseError` on anything unparseable,
    anonymous, multi-definition, or otherwise unsupported.

    :param text: the C definition; comments are stripped and a trailing ``;`` is
        added when missing.
    :param type_parser: parser used to size struct members and evaluate enum
        constants; defaults to a module-level ``CTypeParser()``.
    :return: a :class:`Struct`, :class:`Enum`, or :class:`Typedef` artifact.

    Example::

        point = parse_type_definition("struct Point { int x; int y; }")
        # point.name == "Point"; point.members is keyed by member offset
    """
    tp = type_parser or _DEFAULT_TYPE_PARSER
    ast = _parse_ast(_normalize(text))
    top = ast.ext[0]

    if isinstance(top, c_ast.Typedef):
        return _typedef_from_ast(top)

    # struct/enum arrive wrapped in a Decl whose .type is the Struct/Enum node
    if isinstance(top, c_ast.Decl):
        inner = top.type
        if isinstance(inner, c_ast.Struct):
            return _struct_from_ast(inner, tp)
        if isinstance(inner, c_ast.Enum):
            return _enum_from_ast(inner, tp)

    raise TypeDefinitionParseError(
        f"Unsupported top-level definition: {type(top).__name__}. "
        "Expected a named struct, enum, or typedef."
    )


def _normalize(text: str) -> str:
    """Strip comments/whitespace from ``text`` and guarantee a trailing ``;``."""
    if not text or not text.strip():
        raise TypeDefinitionParseError("Empty type definition.")
    # strip C comments (same approach as CTypeParser.parse_type_with_name)
    text = re.sub(r"/\*.*?\*/", "", text, flags=re.DOTALL)
    text = re.sub(r"//.*?$", "", text, flags=re.MULTILINE)
    text = text.strip()
    if not text.endswith(";"):
        text += ";"
    return text


def _parse_ast(text: str) -> c_ast.FileAST:
    """Parse ``text`` with pycparser and require exactly one top-level node."""
    try:
        ast = _PARSER.parse(text)
    except ParseError as exc:
        # Chain the cause so the pycparser location info stays in the traceback.
        raise TypeDefinitionParseError(
            f"could not parse C definition: {exc}"
        ) from exc
    if not ast.ext:
        raise TypeDefinitionParseError("no type definition found.")
    if len(ast.ext) != 1:
        raise TypeDefinitionParseError(
            "expected exactly one type definition, got "
            f"{len(ast.ext)}. Define one type at a time."
        )
    return ast


def _render_type(node) -> str:
    """Render a member/typedef type node back to a C type string, e.g. "char *"."""
    rendered = _GENERATOR.visit(node).strip()
    # A brace or newline means the generator emitted a struct/enum *body*.
    if "\n" in rendered or "{" in rendered:
        raise TypeDefinitionParseError(
            "inline/nested type definitions are unsupported here; define the "
            "inner type separately and reference it by name."
        )
    return rendered


def _member_size(tp: CTypeParser, type_str: str) -> int:
    """Return the byte size of ``type_str``; raise if it cannot be determined."""
    ct = tp.parse_type(type_str)
    if ct is None or not ct.size:
        # Unknown, user-defined non-pointer type (e.g. "struct Bar" before Bar
        # exists): we cannot reliably size it, so reject rather than emit a
        # 0-size member that would corrupt every subsequent offset.
        raise TypeDefinitionParseError(
            f"could not determine the size of member type {type_str!r}. "
            "Define referenced types first, or use a pointer/primitive."
        )
    return ct.size


def _natural_alignment(size: int) -> int:
    """Best-effort alignment for an object of ``size`` bytes.

    Uses the largest power of two that divides ``size`` (an object's size is
    always a multiple of its alignment), capped at ``_MAX_ALIGN``. For the
    scalar sizes 1/2/4/8 this is simply the size itself.
    """
    if size <= 0:
        return 1
    return min(size & -size, _MAX_ALIGN)


def _member_alignment(tp: CTypeParser, decl_type, size: int) -> int:
    """Alignment of a struct member whose declared type node is ``decl_type``.

    An array is aligned like its *element* type, not like its total size:
    ``char buf[16]`` has alignment 1, ``int v[3]`` has alignment 4.
    """
    elem = decl_type
    while isinstance(elem, c_ast.ArrayDecl):
        elem = elem.type
    if elem is decl_type:
        return _natural_alignment(size)
    return _natural_alignment(_member_size(tp, _render_type(elem)))


def _align_up(value: int, align: int) -> int:
    """Round ``value`` up to the next multiple of ``align`` (``align`` >= 1)."""
    remainder = value % align
    return value if not remainder else value + (align - remainder)


def _struct_from_ast(struct_node: c_ast.Struct, tp: CTypeParser) -> Struct:
    """Build a :class:`Struct` with members laid out at naturally aligned offsets.

    Raises :class:`TypeDefinitionParseError` for anonymous or empty structs,
    unnamed members, bit-fields, and members whose size cannot be determined.
    """
    if not struct_node.name:
        raise TypeDefinitionParseError(
            "anonymous structs are not supported; give the struct a name."
        )
    if not struct_node.decls:
        raise TypeDefinitionParseError(
            f"struct {struct_node.name!r} has no members to define."
        )

    members = {}
    offset = 0
    max_align = 1
    for decl in struct_node.decls:
        # getattr: non-Decl entries (e.g. pragmas) carry no ``name`` at all.
        member_name = getattr(decl, "name", None)
        if member_name is None:
            raise TypeDefinitionParseError(
                f"unnamed member in struct {struct_node.name!r} is unsupported."
            )
        if getattr(decl, "bitsize", None) is not None:
            # Bit-fields share storage units; laying them out as full-width
            # members would shift every following offset.
            raise TypeDefinitionParseError(
                f"bit-field member {member_name!r} in struct "
                f"{struct_node.name!r} is unsupported."
            )
        type_str = _render_type(decl.type)
        size = _member_size(tp, type_str)
        align = _member_alignment(tp, decl.type, size)
        # round the running offset up to this member's natural alignment
        offset = _align_up(offset, align)
        members[offset] = StructMember(
            name=member_name, offset=offset, type_=type_str, size=size,
        )
        offset += size
        max_align = max(max_align, align)

    # Tail padding: the struct size is a multiple of its strictest member alignment.
    total = _align_up(offset, max_align)

    return Struct(name=struct_node.name, size=total, members=members)


def _enum_from_ast(enum_node: c_ast.Enum, tp: CTypeParser) -> Enum:
    """Build an :class:`Enum`; implicit values continue from the previous one."""
    if not enum_node.name:
        raise TypeDefinitionParseError(
            "anonymous enums are not supported; give the enum a name."
        )
    if not enum_node.values or not enum_node.values.enumerators:
        raise TypeDefinitionParseError(
            f"enum {enum_node.name!r} has no members to define."
        )

    members = {}
    next_val = 0
    for en in enum_node.values.enumerators:
        if en.value is None:
            val = next_val
        else:
            try:
                val = tp._parse_const(en.value)
            except Exception as exc:
                # Broad on purpose: any failure to evaluate the constant
                # expression is reported uniformly, with the cause chained.
                raise TypeDefinitionParseError(
                    f"could not evaluate enum value for {en.name!r}."
                ) from exc
        members[en.name] = val
        next_val = val + 1

    return Enum(name=enum_node.name, members=members)


def _typedef_from_ast(typedef_node: c_ast.Typedef) -> Typedef:
    """Build a :class:`Typedef` mapping the typedef name to its rendered type."""
    name = typedef_node.name
    if not name:
        raise TypeDefinitionParseError("typedef is missing a name.")
    type_str = _render_type(typedef_node.type)
    return Typedef(name=name, type_=type_str)
--------------------------------------------------------------------------- +# create-type / retype +# --------------------------------------------------------------------------- + +def cmd_create_type(args) -> int: + """Define a new struct/enum/typedef from a C string and apply it. + + The definition is parsed (client-side, decompiler-free) into a libbs + artifact and pushed through the normal type-setting path, which works + across every backend. + """ + from libbs.api.type_definition_parser import ( + parse_type_definition, TypeDefinitionParseError, + ) + + try: + artifact = parse_type_definition(args.definition) + except TypeDefinitionParseError as exc: + # Fail before connecting — nothing about a server changes the parse. + raise SystemExit(f"Could not parse type definition: {exc}") + + with _with_client(args) as client: + ok = bool(client.set_artifact(artifact)) + _emit(args, { + "kind": type(artifact).__name__, + "name": artifact.name, + "size": getattr(artifact, "size", None), + "members": len(artifact.members) if hasattr(artifact, "members") else None, + "success": ok, + }) + return EXIT_OK if ok else EXIT_USER_ERROR + + +def _find_variable(func, var_name: str): + """Locate a variable by name in a function. Returns (kind, var) or (None, None). + + Stack variables are checked before arguments. ``kind`` is "stack" or "arg". + """ + for svar in func.stack_vars.values(): + if svar.name == var_name: + return "stack", svar + if func.header is not None: + for arg in func.header.args.values(): + if arg.name == var_name: + return "arg", arg + return None, None + + +def _compute_type_size(client, type_str: str) -> int: + """Best-effort byte size for a (possibly user-defined) type string.""" + ctype = client.type_parser.parse_type(type_str) + if ctype is not None and ctype.size: + return ctype.size + # Unknown/user-defined non-pointer type (e.g. a struct by value): ask the + # backend what it already knows about this type. 
+ try: + defined = client.get_defined_type(type_str) + except Exception: + defined = None + size = getattr(defined, "size", None) + if size: + return size + # 0 means "let the backend infer the size". + return ctype.size if ctype is not None else 0 + + +def cmd_retype(args) -> int: + """Change the type of a local variable or argument of a function.""" + with _with_client(args) as client: + func_addr = _resolve_function_addr(client, args.function) + if func_addr is None: + raise SystemExit(f"Function not found: {args.function!r}") + func = client.functions[func_addr] + if not func: + raise SystemExit(f"Could not load function at 0x{func_addr:x}") + + kind, var = _find_variable(func, args.variable) + if var is None: + raise SystemExit( + f"Variable {args.variable!r} not found in {args.function!r}. " + "Check the name (it is case-sensitive)." + ) + + var.type = args.new_type + var.size = _compute_type_size(client, args.new_type) + + ok = bool(client.set_artifact(func)) + if not ok: + raise SystemExit( + f"Backend rejected retype of {args.variable!r} to {args.new_type!r}." + ) + + # Re-read so the caller can see what the backend actually stored. + refreshed = client.functions[func_addr] + _, new_var = _find_variable(refreshed, args.variable) + _emit(args, { + "function_addr": func_addr, + "variable": args.variable, + "kind": kind, + "new_type": args.new_type, + "applied_type": getattr(new_var, "type", None) if new_var else None, + "success": ok, + }) + return EXIT_OK + + +# --------------------------------------------------------------------------- +# sync +# --------------------------------------------------------------------------- + +def cmd_sync(args) -> int: + """Copy work on a function from one running server into another. + + Source is selected by --from-id; destination by the usual + --id/--binary/--backend. Syncs the function's referenced user types + (struct/enum/typedef) first, then the function header (name/return/args) + and stack variables. 
Addresses and stack offsets are canonical in lifted + form, so they re-key correctly on the destination even if it names the + function differently. + """ + from libbs.artifacts import Struct, Enum, Typedef + + src_record = _select_server(server_id=args.from_id, binary_path=None, backend=None) + dst_record = _select_server( + server_id=getattr(args, "id", None), + binary_path=getattr(args, "binary", None), + backend=getattr(args, "backend", None), + ) + if src_record.get("id") == dst_record.get("id"): + raise SystemExit( + f"Source and destination are the same server (id={src_record.get('id')}). " + "Pick two different servers." + ) + + src = _connect_client(src_record) + dst = None + try: + dst = _connect_client(dst_record) + + addr = _resolve_function_addr(src, args.target) + known = _known_function_addrs(src) + if addr is None or (known and addr not in known): + raise SystemExit(f"Function not found on source: {args.target!r}") + + src_func = src.functions[addr] + if not src_func: + raise SystemExit(f"Could not load function at 0x{addr:x} on source") + + # 1) Sync referenced user types first so retypes resolve on the dest. + synced_types, failed_types = [], [] + try: + deps = src.get_dependencies(src_func, decompile=True) + except Exception as exc: + _l.debug("get_dependencies failed: %s", exc) + deps = [] + for dep in deps: + if isinstance(dep, (Struct, Enum, Typedef)): + name = getattr(dep, "name", None) + try: + ok_dep = bool(dst.set_artifact(dep)) + except Exception as exc: + _l.debug("type sync failed for %s: %s", name, exc) + ok_dep = False + (synced_types if ok_dep else failed_types).append(name) + + # 2) Sync the function header + stack vars in one shot. 
+ func_ok = bool(dst.set_artifact(src_func)) + + synced_vars = sorted( + (sv.offset, getattr(sv, "name", None)) + for sv in (src_func.stack_vars or {}).values() + ) + _emit(args, { + "target": args.target, + "addr": addr, + "from_id": src_record.get("id"), + "to_id": dst_record.get("id"), + "function_name": src_func.name, + "synced_types": synced_types, + "failed_types": failed_types, + "synced_stack_vars": [{"offset": off, "name": nm} for off, nm in synced_vars], + "success": func_ok, + }) + return EXIT_OK if func_ok else EXIT_USER_ERROR + finally: + src.shutdown() + if dst is not None: + dst.shutdown() + + # --------------------------------------------------------------------------- # list_strings / get_callers (new core APIs) # --------------------------------------------------------------------------- @@ -1149,6 +1342,55 @@ def build_parser() -> argparse.ArgumentParser: _add_output_args(p_ren) p_ren.set_defaults(func=cmd_rename) + # create-type + p_ct = sub.add_parser( + "create-type", + help=( + "Define a new struct, enum, or typedef from a C definition string " + "and apply it to the binary's type database." + ), + ) + p_ct.add_argument( + "definition", + help=( + 'C type definition, e.g. "struct Point { int x; int y; }", ' + '"enum Color { RED, GREEN, BLUE }", or "typedef int my_int_t".' + ), + ) + _add_server_filter_args(p_ct) + _add_output_args(p_ct) + p_ct.set_defaults(func=cmd_create_type) + + # retype + p_rt = sub.add_parser( + "retype", + help="Change the type of a function's local variable or argument.", + ) + p_rt.add_argument("function", help="Function name or address (hex/decimal).") + p_rt.add_argument("variable", help="Variable (stack var or arg) name to retype.") + p_rt.add_argument("new_type", help='New C type, e.g. 
"int", "double", "Point *".') + _add_server_filter_args(p_rt) + _add_output_args(p_rt) + p_rt.set_defaults(func=cmd_retype) + + # sync + p_sync = sub.add_parser( + "sync", + help=( + "Copy work on a function (name, return/arg types, stack vars, and " + "referenced user types) from one running server into another for " + "the same binary. Source = --from-id; destination = --id/--binary/--backend." + ), + ) + p_sync.add_argument("target", help="Function name or address (hex/decimal) on the source.") + p_sync.add_argument( + "--from-id", dest="from_id", required=True, + help="Source server ID to copy work FROM (see `decompiler list`).", + ) + _add_server_filter_args(p_sync) + _add_output_args(p_sync) + p_sync.set_defaults(func=cmd_sync) + # list_strings p_ls = sub.add_parser( "list_strings", diff --git a/libbs/decompilers/ghidra/compat/headless.py b/libbs/decompilers/ghidra/compat/headless.py index c81aaf9..d29af73 100644 --- a/libbs/decompilers/ghidra/compat/headless.py +++ b/libbs/decompilers/ghidra/compat/headless.py @@ -27,7 +27,13 @@ def open_program( raise ValueError("You must provide either a binary path or a project location.") if not PyGhidraLauncher.has_launched(): - HeadlessPyGhidraLauncher().start() + launcher = HeadlessPyGhidraLauncher() + # Force the JVM into AWT-headless mode. The "headless" launcher does not + # set this itself, so if a (possibly stale) DISPLAY is exported Ghidra's + # JVM tries to reach an X server and dies with: + # java.awt.AWTError: Can't connect to X11 window server ... 
+ launcher.add_vmargs("-Djava.awt.headless=true") + launcher.start() from ghidra.app.script import GhidraScriptUtil from ghidra.program.flatapi import FlatProgramAPI diff --git a/libbs/decompilers/ghidra/interface.py b/libbs/decompilers/ghidra/interface.py index 3f3ee63..7065e66 100644 --- a/libbs/decompilers/ghidra/interface.py +++ b/libbs/decompilers/ghidra/interface.py @@ -1237,14 +1237,30 @@ def typestr_to_gtype(self, typestr: str) -> Optional["DataType"]: except Exception as e: parsed_type = None - # attempt a lookup as a custom datatype - if parsed_type is None: - typestr = "/" + typestr if not typestr.startswith("/") else typestr - parsed_type = self.currentProgram.getDataTypeManager().getDataType(typestr) + dtm = self.currentProgram.getDataTypeManager() - #if self.headless and parsed_type is None: - # # try again in headless mode only! - # parsed_type = self._headless_lookup_struct(typestr) + # attempt a lookup as a custom datatype by name (e.g. "Point") + if parsed_type is None: + lookup = typestr if typestr.startswith("/") else "/" + typestr + parsed_type = dtm.getDataType(lookup) + + # attempt to resolve a pointer to a custom datatype (e.g. "Point *"): + # DataTypeParser can't resolve user structs by bare name and the path + # lookup above only matches non-pointer names, so build the pointer + # explicitly from the resolved base type. 
+ if parsed_type is None and typestr.rstrip().endswith("*"): + base_str = typestr.strip() + ptr_levels = 0 + while base_str.endswith("*"): + base_str = base_str[:-1].strip() + ptr_levels += 1 + base_lookup = base_str if base_str.startswith("/") else "/" + base_str + base_dt = dtm.getDataType(base_lookup) + if base_dt is not None: + from ghidra.program.model.data import PointerDataType + parsed_type = base_dt + for _ in range(ptr_levels): + parsed_type = PointerDataType(parsed_type) if parsed_type is None: _l.warning("Failed to parse type string: %s", typestr) diff --git a/libbs/decompilers/ida/compat.py b/libbs/decompilers/ida/compat.py index f49b860..63bcec2 100644 --- a/libbs/decompilers/ida/compat.py +++ b/libbs/decompilers/ida/compat.py @@ -143,8 +143,12 @@ def execute_ui(func): class DummyIDACodeView: """ - TODO: this needs to be redone to support setting artifacts in the decompiler when in headless mode. - Mostly for decompiled artifacts, like stack variables and function arguments + A stand-in for an IDA pseudocode ``vdui`` used in headless mode (where no GUI + view exists). It exposes the two mutations ``set_stack_variables`` needs — + renaming and retyping a local/stack variable — implemented against the + headless Hexrays APIs (``rename_lvar`` / ``modify_user_lvar_info``) so that + edits actually persist to the database. Any other attribute access falls + back to a no-op (e.g. ``refresh_view`` has nothing to refresh headless). """ def __init__(self, addr): self.cfunc = ida_hexrays.decompile(addr) @@ -153,6 +157,34 @@ def __init__(self, addr): def __getattr__(self, item): return lambda *x,**y: None + def rename_lvar(self, lvar, name, is_user=1) -> bool: + """Rename a local variable headlessly. Mirrors vdui.rename_lvar.""" + ok = bool(ida_hexrays.rename_lvar(self.addr, lvar.name, name)) + if ok: + self._refresh_cfunc() + return ok + + def set_lvar_type(self, lvar, new_type) -> bool: + """Set a local variable's type headlessly. Mirrors vdui.set_lvar_type. 
+ + Uses ``modify_user_lvar_info`` with ``MLI_TYPE`` since there is no GUI + ``vdui`` to drive; the user-lvar settings persist across redecompilation. + """ + info = ida_hexrays.lvar_saved_info_t() + info.ll.location = lvar.location + info.ll.defea = lvar.defea + info.type = new_type + ok = bool(ida_hexrays.modify_user_lvar_info(self.addr, ida_hexrays.MLI_TYPE, info)) + if ok: + self._refresh_cfunc() + return ok + + def _refresh_cfunc(self): + # Re-decompile so callers re-reading ``cfunc.lvars`` see the change. + new_cfunc = ida_hexrays.decompile(self.addr) + if new_cfunc is not None: + self.cfunc = new_cfunc + def requires_decompilation(f): @wraps(f) diff --git a/libbs/skills/decompiler/SKILL.md b/libbs/skills/decompiler/SKILL.md index 8979311..64ed5fa 100644 --- a/libbs/skills/decompiler/SKILL.md +++ b/libbs/skills/decompiler/SKILL.md @@ -1,6 +1,6 @@ --- name: decompiler -description: Reverse-engineer and modify binaries with a single `decompiler` CLI that drives IDA Pro, Ghidra, Binary Ninja, or angr via LibBS. Use whenever the user asks to decompile, disassemble, look up cross references, rename functions or variables, search strings or functions, or otherwise inspect a binary file. Also use for multi-binary workflows (load several binaries at once and switch between them with --id). +description: Reverse-engineer and modify binaries with a single `decompiler` CLI that drives IDA Pro, Ghidra, Binary Ninja, or angr via LibBS. Use whenever the user asks to decompile, disassemble, look up cross references, rename functions or variables, define or change types, sync work between decompilers, search strings or functions, or otherwise inspect a binary file. Also use for multi-binary workflows (load several binaries at once and switch between them with --id). --- # `decompiler` — LibBS CLI for LLMs @@ -73,6 +73,8 @@ decompiler get_callers authenticate # call-sites only (subset of xref_ decompiler xref_from main # what does main call? 
decompiler rename func sub_400662 trampoline # rename a function decompiler rename var v2 auth_result --function main # rename a local +decompiler create-type "struct Point { int x; int y; }" # define a new type +decompiler retype main buf "Point *" # set a variable's type decompiler stop --all ``` @@ -135,6 +137,9 @@ same binary. | `xref_from <target>` | Functions that `target` calls. | same | | `rename func <target> <new_name>` | Rename a function. | same + `--json` | | `rename var <old_name> <new_name> --function <target>` | Rename a local variable inside a function. | same | +| `create-type "<definition>"` | Define a new `struct`/`enum`/`typedef` from a C string and add it to the type database. | same + `--json` | +| `retype <function> <variable> <new_type>` | Set the type of a function's local variable or argument. | same | +| `sync <target> --from-id <id>` | Copy a function's work (names, return/arg types, stack-var names+types, referenced user types) from one running server into another for the same binary. | dest: `--id`/`--binary`/`--backend`; `--json` | | `list_strings` | Strings the decompiler found (may be incomplete — see below). | `--filter`, `--min-length N`, same | | `get_callers <target>` | Call-sites only — subset of `xref_to`. | same | | `read_memory <addr> <size>` | Read raw bytes from the binary at `<addr>`. Default output is a hexdump. | `--format {hexdump,hex,raw}`, same + `--json` (base64-encoded bytes) | @@ -190,6 +195,38 @@ the address with `list_functions` / `xref_to`. Address formats follow the same rules as everywhere else: hex (`0x4008e0`), decimal (`4197088`), or lifted (`0x8e0`) all work. +### Editing types and syncing across decompilers + +`create-type` parses a C type *definition* and adds it to the binary's type +database. `retype` then points a variable at it (or at any built-in type). 
+Both work on every backend; refer to the struct by name, with `*`/`[]` for +pointers and arrays: + +```bash +decompiler create-type "struct Point { int x; int y; }" +decompiler create-type "enum Color { RED, GREEN=5, BLUE }" +decompiler retype main buf "Point *" # stack var or argument, by name +``` + +`sync` copies one function's work from a **source** server into a +**destination** server for the *same* binary — handy when you reverse a +function in one tool and want it mirrored in another. It transfers the +function name, return/argument types, stack-variable names and types, and +any user-defined types those reference. The source is chosen with +`--from-id`; the destination with the usual `--id`/`--binary`/`--backend`: + +```bash +decompiler load ./fauxware --backend ida # id=ida123 (do your work here) +decompiler load ./fauxware --backend ghidra # id=ghi456 +decompiler rename func 0x71d auth_check --id ida123 +decompiler retype 0x71d buf "Point *" --id ida123 +decompiler sync 0x71d --from-id ida123 --id ghi456 # push it into Ghidra +``` + +Addresses and stack-variable offsets are normalized, so the function and +its variables re-key correctly even when the two backends name them +differently. Pass a function **address** (most robust) or a name. 
+ ### `list_strings` may be incomplete `list_strings` returns exactly what the backend's own string detector diff --git a/tests/test_decompiler_cli.py b/tests/test_decompiler_cli.py index 5afff22..240ee1b 100644 --- a/tests/test_decompiler_cli.py +++ b/tests/test_decompiler_cli.py @@ -383,6 +383,96 @@ def test_project_dir_keeps_binary_dir_clean(self): self.assertTrue(project_contents, f"{self.backend} wrote nothing to the project_dir") + # ------------------------------------------------------------------- + # create-type / retype (run against every backend) + # ------------------------------------------------------------------- + + def _direct_client(self): + """Connect a DecompilerClient straight to this binary's server.""" + record = server_registry.find_servers(binary_path=str(FAUXWARE_PATH))[0] + return DecompilerClient(socket_path=record["socket_path"]) + + def _load_fauxware_isolated(self): + """Load fauxware into a fresh, non-hidden project dir. + + Ghidra rejects project *locations* containing a dot-prefixed path + element (e.g. the default ``~/.cache/libbs/...``), so hand it a temp + dir. This also keeps the test hermetic — no shared-cache state leaks + in from prior (possibly interrupted) runs. + """ + proj = tempfile.mkdtemp(prefix="libbs_cli_proj_") + self.addCleanup(shutil.rmtree, proj, ignore_errors=True) + return self._load_fauxware(project_dir=proj) + + def test_create_type(self): + self._load_fauxware_isolated() + result = _run_cli("create-type", "struct Point { int x; int y; }", "--json") + payload = json.loads(result.stdout) + self.assertEqual(payload["kind"], "Struct") + self.assertEqual(payload["name"], "Point") + self.assertTrue(payload["success"], + f"{self.backend}: create-type failed: {payload}") + + # Verify the struct actually landed, with both named members. 
+ client = self._direct_client() + try: + struct = client.structs["Point"] + finally: + client.shutdown() + self.assertIsNotNone(struct, f"{self.backend}: Point not found after create") + member_names = {m.name for m in struct.members.values()} + self.assertEqual(member_names, {"x", "y"}, + f"{self.backend}: unexpected members {member_names}") + + def test_retype(self): + self._load_fauxware_isolated() + # Pick a 4-byte scalar stack var (an int) and retype it to `float`. + # Same size + scalar->scalar keeps this clean across backends: no + # overlap with the adjacent slot and no array->scalar reshaping (which + # Ghidra handles poorly). + client = self._direct_client() + try: + addrs = [a for a, f in client.functions.items() if f.name == "main"] + main_addr = addrs[0] + main_func = client.functions[main_addr] + svars = list(main_func.stack_vars.values()) + scalars = [v for v in svars + if (v.size or 0) == 4 and "[" not in str(v.type or "")] + if not scalars: + self.skipTest(f"{self.backend}: no 4-byte scalar var in main to retype") + target = scalars[0].name + had_float_before = any("float" in str(v.type or "").lower() for v in svars) + finally: + client.shutdown() + self.assertFalse(had_float_before, + f"{self.backend}: main already has a float var; bad fixture") + + result = _run_cli("retype", "main", target, "float", "--json", check=False) + if result.returncode != 0: + self.skipTest( + f"{self.backend}: retype of {target!r} unsupported: " + f"{result.stdout + result.stderr}" + ) + self.assertTrue(json.loads(result.stdout)["success"]) + + # Verify a float-typed variable now exists. Match on the type appearing + # in the set rather than by name/offset: backends rename a variable by + # its type when retyped (Ghidra local_2c -> fStack_2c). 
# ---------------------------------------------------------------------------
# Cross-decompiler sync: push edits made in IDA into a running Ghidra instance.
# Standalone (not backend-parametrized) because it needs two specific backends.
# ---------------------------------------------------------------------------

@unittest.skipUnless(
    _backend_available("ida") and _backend_available("ghidra"),
    "sync IDA->Ghidra tests need both ida (idapro) and ghidra (GHIDRA_INSTALL_DIR)",
)
class TestDecompilerSyncIDAtoGhidra(unittest.TestCase):
    """`decompiler sync` copies a function's work from a source server (IDA)
    into a destination server (Ghidra) for the same binary."""

    @classmethod
    def setUpClass(cls):
        if not FAUXWARE_PATH.exists():
            raise unittest.SkipTest(f"Missing test binary: {FAUXWARE_PATH}")
        os.environ["LIBBS_SERVER_REGISTRY"] = _REGISTRY_DIR
        _stop_all_servers()

    @classmethod
    def tearDownClass(cls):
        _stop_all_servers()

    def setUp(self):
        # Fresh, isolated project dir per test so a stale/locked backend
        # database from a previous (possibly interrupted) run can't make a
        # `load` hang or fail. Both backends are pointed at this same dir;
        # NOTE(review): presumably each backend then writes into its own
        # subdir — confirm against the `load --project-dir` implementation.
        self._proj_dir = tempfile.mkdtemp(prefix="libbs_sync_proj_")

    def tearDown(self):
        _stop_all_servers()
        shutil.rmtree(self._proj_dir, ignore_errors=True)

    # -- helpers -----------------------------------------------------------

    def _load(self, backend):
        """Load fauxware on ``backend`` and return the new server's id."""
        # `load` blocks until the server is ready (Ghidra analysis included).
        out = _run_cli("load", str(FAUXWARE_PATH), "--backend", backend,
                       "--force", "--project-dir", self._proj_dir, "--json").stdout
        payload = json.loads(out)
        self.assertIn(payload["status"], ("started", "already_loaded"))
        return payload["id"]

    def _client_for(self, server_id):
        """Connect a DecompilerClient to the server registered as ``server_id``."""
        rec = server_registry.find_server(server_id=server_id)
        self.assertIsNotNone(rec, f"no server record for id={server_id}")
        return DecompilerClient(socket_path=rec["socket_path"])

    def _main_addr(self, client):
        """Return the address of main, by name or by its known address 0x71d."""
        addrs = [a for a, f in client.functions.items()
                 if f.name in ("main", "_main")]
        if not addrs:
            addrs = [a for a in client.functions.keys() if a == 0x71d]
        self.assertTrue(addrs, "could not find main on server")
        return addrs[0]

    # -- tests -------------------------------------------------------------

    def test_sync_names_ida_to_ghidra(self):
        ida_id = self._load("ida")
        ghidra_id = self._load("ghidra")

        # Pick a stack var on the IDA side to rename.
        ida = self._client_for(ida_id)
        try:
            main_addr = self._main_addr(ida)
            main_func = ida.functions[main_addr]
            self.assertTrue(main_func.stack_vars, "IDA main has no stack vars")
            target_off = sorted(main_func.stack_vars.keys())[0]
            old_var_name = main_func.stack_vars[target_off].name
        finally:
            ida.shutdown()

        # Edit in IDA via the CLI: rename the function and the stack var.
        # Reference the function by address (stable) rather than by its new
        # name, since the light function list can lag a header rename.
        main_hex = _format_hex(main_addr)
        r1 = _run_cli("rename", "func", main_hex, "synced_main", "--id", ida_id, "--json")
        self.assertTrue(json.loads(r1.stdout)["success"])
        r2 = _run_cli("rename", "var", old_var_name, "synced_var",
                      "--function", main_hex, "--id", ida_id, "--json")
        self.assertTrue(json.loads(r2.stdout)["success"])

        # Sync IDA -> Ghidra (sync takes a function address).
        rs = _run_cli("sync", main_hex, "--from-id", ida_id,
                      "--id", ghidra_id, "--json")
        sync_payload = json.loads(rs.stdout)
        self.assertTrue(sync_payload["success"], f"sync failed: {sync_payload}")

        # Verify on Ghidra. The function is keyed by addr (Ghidra still calls
        # it "main"); the renamed var is matched by canonical stack offset.
        ghidra = self._client_for(ghidra_id)
        try:
            gfunc = ghidra.functions[sync_payload["addr"]]
            self.assertEqual(gfunc.name, "synced_main",
                             f"function name not synced: {gfunc.name}")
            var_names = {sv.name for sv in gfunc.stack_vars.values()}
            self.assertIn("synced_var", var_names,
                          f"variable name not synced; ghidra vars: {var_names}")
        finally:
            ghidra.shutdown()

    def test_sync_types_ida_to_ghidra(self):
        ida_id = self._load("ida")
        ghidra_id = self._load("ghidra")

        # Pick a stack var on IDA to retype. Use the largest so there's room
        # for an 8-byte `Point *` without overlapping the adjacent slot.
        ida = self._client_for(ida_id)
        try:
            main_addr = self._main_addr(ida)
            main_func = ida.functions[main_addr]
            self.assertTrue(main_func.stack_vars)
            biggest = max(main_func.stack_vars.values(), key=lambda v: (v.size or 0))
            target_var_name = biggest.name
        finally:
            ida.shutdown()

        # Feature 1 in IDA: create a struct, then retype a var to a Point pointer.
        rc = _run_cli("create-type", "struct Point { int x; int y; }",
                      "--id", ida_id, "--json")
        self.assertEqual(rc.returncode, 0, rc.stderr)
        self.assertTrue(json.loads(rc.stdout)["success"])
        main_hex = _format_hex(main_addr)
        rt = _run_cli("retype", main_hex, target_var_name, "Point *",
                      "--id", ida_id, "--json")
        self.assertEqual(rt.returncode, 0, rt.stderr)
        self.assertTrue(json.loads(rt.stdout)["success"])

        # Sync IDA -> Ghidra (sync takes a function address).
        rs = _run_cli("sync", main_hex, "--from-id", ida_id, "--id", ghidra_id, "--json")
        sync_payload = json.loads(rs.stdout)
        self.assertTrue(sync_payload["success"], f"sync failed: {sync_payload}")

        # Verify on Ghidra: the struct exists and the var references it.
        ghidra = self._client_for(ghidra_id)
        try:
            self.assertIn("Point", ghidra.structs,
                          f"Point not in ghidra structs: {list(ghidra.structs.keys())}")
            gfunc = ghidra.functions[sync_payload["addr"]]
            point_typed = [sv for sv in gfunc.stack_vars.values()
                           if "Point" in str(sv.type or "")]
            self.assertTrue(point_typed,
                            "no ghidra var references Point: "
                            f"{[(sv.name, sv.type) for sv in gfunc.stack_vars.values()]}")
        finally:
            ghidra.shutdown()
# ---------------------------------------------------------------------------

class TestTypeDefinitionParser(unittest.TestCase):
    """Unit tests for `parse_type_definition`; they call the parser directly
    and start no decompiler server."""

    @staticmethod
    def _parse(text):
        """Parse `text` with `parse_type_definition` and return the artifact.

        The import stays function-local, as in the original tests, so this
        module can still be imported when the parser module is unavailable.
        """
        from libbs.api.type_definition_parser import parse_type_definition
        return parse_type_definition(text)

    def test_struct_offsets_and_size(self):
        """Two ints are keyed by offsets 0 and 4 and give a struct size of 8."""
        s = self._parse("struct Point { int x; int y; }")
        self.assertEqual(s.name, "Point")
        self.assertEqual(s.members[0].name, "x")
        self.assertEqual(s.members[0].size, 4)
        self.assertEqual(s.members[4].name, "y")
        self.assertEqual(s.size, 8)

    def test_struct_pointer_and_array_members(self):
        """Pointer, array and struct-pointer member types come back as C type strings."""
        s = self._parse("struct S { char *name; int arr[4]; struct Foo *fp; }")
        types = {m.name: m.type for m in s.members.values()}
        self.assertEqual(types["name"], "char *")
        self.assertEqual(types["arr"], "int [4]")
        self.assertEqual(types["fp"], "struct Foo *")

    def test_enum(self):
        """Implicit enumerator values continue from the last explicit one."""
        e = self._parse("enum Color { RED, GREEN=5, BLUE }")
        self.assertEqual(dict(e.members), {"RED": 0, "GREEN": 5, "BLUE": 6})

    def test_typedef(self):
        """A typedef yields the alias name and the aliased type string."""
        t = self._parse("typedef char *str_t")
        self.assertEqual(t.name, "str_t")
        self.assertEqual(t.type, "char *")

    def test_bad_input_raises(self):
        """Each malformed or unsupported input raises TypeDefinitionParseError."""
        from libbs.api.type_definition_parser import TypeDefinitionParseError
        bad_inputs = ["struct {", "not c @#", "", "struct Empty {}",
                      "struct A { int a; }; struct B { int b; };"]
        for bad in bad_inputs:
            # subTest names the offending input on failure and lets the
            # remaining inputs still be checked.
            with self.subTest(bad=bad):
                with self.assertRaises(TypeDefinitionParseError):
                    self._parse(bad)


# ---------------------------------------------------------------------------
# Artifact-serialization unit tests: keep these separate from the CLI
# subprocess tests so they run in isolation and are cheap to iterate on.