From 4eaaea4d6f73c9d048959cce2477dbfb2c505174 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 7 Jun 2026 21:24:34 -0300 Subject: [PATCH 1/3] Optimize IDA streaming export memory use --- install.sh | 24 +++- src/tocode/analysis.py | 15 ++ src/tocode/backends/ida.py | 95 ++++++++---- src/tocode/exporter.py | 286 +++++++++++++++++++++++++++++++++++-- src/tocode/parallel.py | 53 ++++++- tests/test_algorithms.py | 34 ++++- 6 files changed, 462 insertions(+), 45 deletions(-) diff --git a/install.sh b/install.sh index f167102..3369b07 100644 --- a/install.sh +++ b/install.sh @@ -2,10 +2,18 @@ set -euo pipefail repo_url="${TOCODE_REPO_URL:-https://github.com/buzzer-re/ToCode.git}" -install_dir="${TOCODE_INSTALL_DIR:-$HOME/ToCode}" branch="${TOCODE_BRANCH:-main}" with_dev=false +script_dir="$(cd "$(dirname "$0")" && pwd)" +if [ -n "${TOCODE_INSTALL_DIR:-}" ]; then + install_dir="$TOCODE_INSTALL_DIR" +elif [ -d "$script_dir/.git" ] && [ -f "$script_dir/pyproject.toml" ]; then + install_dir="$script_dir" +else + install_dir="$HOME/ToCode" +fi + usage() { cat <<'EOF' Install ToCode on Linux or macOS. @@ -14,7 +22,7 @@ Usage: ./install.sh [options] Options: - --dir PATH Clone or update ToCode at PATH. Default: $HOME/ToCode + --dir PATH Clone or update ToCode at PATH. Default: this checkout when run from one, otherwise $HOME/ToCode --repo URL Git repository URL. Default: https://github.com/buzzer-re/ToCode.git --branch NAME Branch to install. Default: main --dev Also install development extras in the local checkout @@ -124,10 +132,14 @@ done command -v git >/dev/null 2>&1 || die "git is required but was not found on PATH" if [ -d "$install_dir/.git" ]; then - info "Updating ToCode at $install_dir" - git -C "$install_dir" fetch origin "$branch" - git -C "$install_dir" checkout "$branch" - git -C "$install_dir" pull --ff-only origin "$branch" + if [ "$install_dir" = "$script_dir" ]; then + info "Installing ToCode from this checkout at $install_dir" + else + info "Updating ToCode at $install_dir" + git -C "$install_dir" fetch origin "$branch" + git -C "$install_dir" checkout "$branch" + git -C "$install_dir" pull --ff-only origin "$branch" + fi elif [ -e "$install_dir" ]; then die "$install_dir already exists and is not a Git checkout" else diff --git a/src/tocode/analysis.py b/src/tocode/analysis.py index ef4e1aa..7455df3 100644 --- a/src/tocode/analysis.py +++ b/src/tocode/analysis.py @@ -159,6 +159,21 @@ def prepare_parallel_workers(self) -> None: if callable(prepare): prepare() + def release_parallel_resources(self) -> None: + release = getattr(self.session, "release_parallel_resources", None) + if callable(release): + release() + + def restore_parallel_resources(self) -> None: + restore = getattr(self.session, "restore_parallel_resources", None) + if callable(restore): + restore() + + def release_render_memory(self) -> None: + release = getattr(self.session, "release_render_memory", None) + if callable(release): + release() + def _binary_facts( self, info: dict[str, Any], entries: list[dict[str, Any]] ) -> BinaryFacts: diff --git a/src/tocode/backends/ida.py b/src/tocode/backends/ida.py index de8fbf8..897eb71 100644 --- a/src/tocode/backends/ida.py +++ b/src/tocode/backends/ida.py @@ -74,6 +74,7 @@ def __init__( self._ida_fixup = self._optional_import("ida_fixup") self._ida_auto = self._optional_import("ida_auto") self._ida_nalt = self._optional_import("ida_nalt") + self._db: Any = None if db_path is None: resolved_db, first_open = _database_path(self.binary) @@ -81,6 +82,9 @@ def __init__( else: resolved_db = db_path needs_analysis = bool(needs_analysis) + self.analysis_command = ( + "IDA Domain auto-analysis" if needs_analysis else "IDA database inventory" + ) self._cache_db = None if is_ida_database(self.binary) else resolved_db if needs_analysis: @@ -159,12 +163,15 @@ def analyze(self) -> None: self._strings_ready = True def close(self) -> None: + if self._db is None: + return try: self._db.close(save=self._opened_for_analysis) except Exception: # noqa: BLE001 pass finally: self._opened_for_analysis = False + self._db = None def database_path(self) -> Path | None: if self._cache_db is None: @@ -180,9 +187,50 @@ def prepare_parallel_workers(self) -> None: return self._save_and_reopen_database() + def release_parallel_resources(self) -> None: + if self._cache_db is not None: + self.prepare_parallel_workers() + self._clear_caches() + if self._db is None: + return + try: + self._db.close(save=False) + except Exception as exc: # noqa: BLE001 + raise BackendError("failed to close parent IDA database") from exc + self._db = None + self._opened_for_analysis = False + self._decompiler_ready = False + + def restore_parallel_resources(self) -> None: + if self._db is not None: + return + resolved_db = self._cache_db + if resolved_db is None and is_ida_database(self.binary): + resolved_db = self.binary + if resolved_db is None: + return + self._open_existing_database(resolved_db) + + def release_render_memory(self) -> None: + self._disasm_cache.clear() + self._decompile_cache.clear() + self._summary_cache.clear() + self._locals_cache.clear() + if self._ida_hexrays is None: + return + clear_cached = getattr(self._ida_hexrays, "clear_cached_cfuncs", None) + if callable(clear_cached): + try: + clear_cached() + except Exception: # noqa: BLE001 + pass + def _save_and_reopen_database(self) -> None: if self._cache_db is None: return + if self._db is None: + self._open_existing_database(self._cache_db) + return try: self._db.close(save=True) except Exception as exc: # noqa: BLE001 @@ -190,22 +238,29 @@ def _save_and_reopen_database(self) -> None: f"failed to save IDA database at {self._cache_db}" ) from exc self._opened_for_analysis = False + self._open_existing_database(self._cache_db) + + def _open_existing_database(self, resolved_db: Path) -> None: options = self._Options(auto_analysis=False, new_database=False) try: self._db = self._Database.open( - str(self._cache_db), args=options, save_on_close=False + str(resolved_db), args=options, save_on_close=False ) except Exception as exc: # noqa: BLE001 raise BackendError( - f"failed to reopen IDA database at {self._cache_db}" + f"failed to reopen IDA database at {resolved_db}" ) from exc + self._opened_for_analysis = False + self._clear_caches() + self.ensure_decompiler() + + def _clear_caches(self) -> None: self._decompiler_ready = False self._disasm_cache.clear() self._decompile_cache.clear() self._summary_cache.clear() self._locals_cache.clear() self._primed.clear() - self.ensure_decompiler() def worker(self) -> "IdaSession": if self._cache_db is not None and self._cache_db.exists(): @@ -413,19 +468,10 @@ def functions(self) -> list[dict[str, Any]]: for func in self._db.functions: name = self._db.functions.get_name(func) or f"sub_{func.start_ea:x}" segment_name = self._segment_name(func.start_ea) - self._prime(func.start_ea) flags = self._db.functions.get_flags(func) is_library = bool(flags & FunctionFlags.LIB) is_thunk = bool(flags & FunctionFlags.THUNK) - lvars = self._locals(func.start_ea) - args = sum(1 for item in lvars if bool(getattr(item, "is_argument", False))) - locals_count = sum( - 1 - for item in lvars - if not bool(getattr(item, "is_argument", False)) - and not bool(getattr(item, "is_result", False)) - ) rows.append( { "offset": int(func.start_ea), @@ -435,8 +481,8 @@ def functions(self) -> list[dict[str, Any]]: "calltype": None, "noreturn": not bool(self._db.functions.does_return(func)), "stackframe": int(getattr(func, "frsize", 0) or 0), - "nlocals": locals_count, - "nargs": args, + "nlocals": 0, + "nargs": 0, "outdegree": 0, "indegree": 0, "is_library": is_library, @@ -449,20 +495,14 @@ def functions(self) -> list[dict[str, Any]]: return rows def disasm(self, address: int) -> str: - if address not in self._disasm_cache: - func = self._need_function(address) - self._disasm_cache[address] = "\n".join(self._function_disassembly(func)) - return self._disasm_cache[address] + func = self._need_function(address) + return "\n".join(self._function_disassembly(func)) def decompile(self, address: int) -> str: - if address not in self._decompile_cache: - self.ensure_decompiler() - func = self._need_function(address) - lines = self._function_pseudocode(func) - self._decompile_cache[address] = ( - "\n".join(lines) if isinstance(lines, list) else str(lines) - ) - return self._decompile_cache[address] + self.ensure_decompiler() + func = self._need_function(address) + lines = self._function_pseudocode(func) + return "\n".join(lines) if isinstance(lines, list) else str(lines) def function_summary(self, address: int) -> str: if address in self._summary_cache: @@ -494,8 +534,7 @@ def function_summary(self, address: int) -> str: ] if callee_names: lines.append(f"callee_names: {', '.join(callee_names)}") - self._summary_cache[address] = "\n".join(lines) - return self._summary_cache[address] + return "\n".join(lines) def calls_from( self, address: int, imports, functions diff --git a/src/tocode/exporter.py b/src/tocode/exporter.py index f876e7e..58968af 100644 --- a/src/tocode/exporter.py +++ b/src/tocode/exporter.py @@ -43,7 +43,7 @@ normalize_source, summary_file_name, ) -from .parallel import choose_jobs, describe_jobs +from .parallel import available_memory_mb, choose_jobs, describe_jobs from .progress import Progress from .schema import ( Cluster, @@ -154,10 +154,12 @@ def export_binary( ) _prepare_tree(context) _cluster(context) - _render(context) - _write_raw(context) if tree: + _render(context) + _write_raw(context) _write_tree(context) + else: + _render_and_write_raw(context) _write_metadata(context) return _summary(context) @@ -205,9 +207,26 @@ def _cluster(context: ExportContext) -> None: def _render(context: ExportContext) -> None: + _select_render_workers(context) analysis = _need(context.analysis) names = _need(context.names) count = len(context.addresses) + + context.progress.log( + f"Rendering {count} functions in {len(context.clusters)} clusters with {context.analyzer.decompiler_label}" + ) + context.rendered = render_functions( + analyzer=context.analyzer, + analysis=analysis, + addresses=context.addresses, + names=names, + progress=context.progress, + worker_count=context.worker_count, + ) + + +def _select_render_workers(context: ExportContext) -> None: + count = len(context.addresses) if context.analyzer.supports_parallel: context.requested_jobs = context.jobs context.worker_count = choose_jobs( @@ -215,6 +234,9 @@ def _render(context: ExportContext) -> None: analysis_seconds=context.analyzer.analysis_seconds, requested=context.jobs, backend=context.analyzer.backend_name, + available_memory_mb=available_memory_mb() + if context.analyzer.backend_name == "ida" + else None, ) context.render_mode = "process" if context.worker_count > 1 else "single" context.progress.log( @@ -233,17 +255,37 @@ def _render(context: ExportContext) -> None: f"Rendering with one {context.analyzer.backend_label} session" ) + +def _render_and_write_raw(context: ExportContext) -> None: + _select_render_workers(context) + if context.render_mode == "single": + context.render_mode = "stream" + elif context.worker_count > 1: + context.render_mode = "stream-process" + analysis = _need(context.analysis) + names = _need(context.names) context.progress.log( - f"Rendering {count} functions in {len(context.clusters)} clusters with {context.analyzer.decompiler_label}" + f"Rendering and writing {len(context.addresses)} functions in {len(context.clusters)} clusters with {context.analyzer.decompiler_label}" ) - context.rendered = render_functions( + written = render_and_write_source_tree( analyzer=context.analyzer, analysis=analysis, - addresses=context.addresses, + clusters=context.clusters, + src_dir=_need(context.raw_dir), + asm_dir=_need(context.raw_dir), + summary_dir=_need(context.raw_dir), + include_dir=_need(context.include_dir), + header_name=context.header_name, names=names, + prototypes=context.prototypes, progress=context.progress, worker_count=context.worker_count, ) + context.raw_sources = written["sources"] + context.asm_files = written["asm"] + context.summary_files = written["summaries"] + context.failures = written["failures"] + context.raw_ranges = written["ranges"] def _write_raw(context: ExportContext) -> None: @@ -364,6 +406,228 @@ def write_source_tree( } +def render_and_write_source_tree( + *, + analyzer: BinaryAnalyzer, + analysis: ProgramAnalysis, + clusters: list[Cluster], + src_dir: Path, + asm_dir: Path, + summary_dir: Path, + include_dir: Path, + header_name: str, + names: NameBook, + prototypes: dict[int, str], + progress: Progress | None = None, + worker_count: int = 1, +) -> dict[str, Any]: + if worker_count > 1: + try: + return render_and_write_source_tree_parallel( + analyzer=analyzer, + analysis=analysis, + clusters=clusters, + src_dir=src_dir, + asm_dir=asm_dir, + summary_dir=summary_dir, + include_dir=include_dir, + header_name=header_name, + names=names, + prototypes=prototypes, + progress=progress, + worker_count=worker_count, + ) + except Exception as exc: # noqa: BLE001 + analyzer.restore_parallel_resources() + if progress is not None: + progress.log( + f"Warning: streaming workers failed ({exc}); retrying with the primary session" + ) + + sources: list[Path] = [] + asm_files: list[Path] = [] + summaries: list[Path] = [] + failures: list[FunctionFailure] = [] + ranges: list[FunctionRange] = [] + total = sum(len(cluster.members) for cluster in clusters) + + bar_context = ( + progress.bar(total=total, desc="exporting", unit="func") + if progress + else nullcontext() + ) + with bar_context as bar: + for cluster in clusters: + c_path = _cluster_path(src_dir, cluster, c_file_name(cluster)) + asm_path = _cluster_path(asm_dir, cluster, asm_file_name(cluster)) + summary_path = _cluster_path( + summary_dir, cluster, summary_file_name(cluster) + ) + c_path.parent.mkdir(parents=True, exist_ok=True) + asm_path.parent.mkdir(parents=True, exist_ok=True) + summary_path.parent.mkdir(parents=True, exist_ok=True) + include_path = Path( + os.path.relpath(include_dir / header_name, c_path.parent) + ).as_posix() + rendered: dict[int, RenderedFunction] = {} + for address in cluster.members: + rendered[address] = render_one( + analyzer, analysis, analysis.routines[address], names + ) + if bar is not None: + bar.update(1) + block = build_cluster_files( + analysis=analysis, + cluster=cluster, + header_include=include_path, + c_path=c_path, + asm_path=asm_path, + summary_path=summary_path, + rendered=rendered, + prototypes=prototypes, + ) + c_path.write_text(block["c"], encoding="utf-8") + asm_path.write_text(block["asm"], encoding="utf-8") + summary_path.write_text(block["summary"], encoding="utf-8") + sources.append(c_path.resolve()) + asm_files.append(asm_path.resolve()) + summaries.append(summary_path.resolve()) + ranges.extend(block["ranges"]) + failures.extend(block["failures"]) + release_memory = getattr(analyzer, "release_render_memory", None) + if callable(release_memory): + release_memory() + + return { + "sources": sources, + "asm": asm_files, + "summaries": summaries, + "failures": failures, + "ranges": ranges, + } + + +def render_and_write_source_tree_parallel( + *, + analyzer: BinaryAnalyzer, + analysis: ProgramAnalysis, + clusters: list[Cluster], + src_dir: Path, + asm_dir: Path, + summary_dir: Path, + include_dir: Path, + header_name: str, + names: NameBook, + prototypes: dict[int, str], + progress: Progress | None, + worker_count: int, +) -> dict[str, Any]: + total = sum(len(cluster.members) for cluster in clusters) + if progress is not None: + progress.log(f"Opening {worker_count} streaming workers for {total} functions") + analyzer.prepare_parallel_workers() + spec = _worker_spec(analyzer) + analyzer.release_parallel_resources() + + sources: list[Path] = [] + asm_files: list[Path] = [] + summaries: list[Path] = [] + failures: list[FunctionFailure] = [] + ranges: list[FunctionRange] = [] + + ctx = multiprocessing.get_context("spawn") + bar_context = ( + progress.bar(total=total, desc="exporting", unit="func") + if progress + else nullcontext() + ) + with bar_context as bar: + with ProcessPoolExecutor( + max_workers=worker_count, + mp_context=ctx, + initializer=_init_worker, + initargs=(spec, analysis, names), + ) as executor: + for cluster in clusters: + rendered: dict[int, RenderedFunction] = {} + futures = { + executor.submit(_render_in_worker, address): address + for address in cluster.members + } + for future in as_completed(futures): + result_address, result = future.result() + rendered[result_address] = result + if bar is not None: + bar.update(1) + block = _write_rendered_cluster( + analysis=analysis, + cluster=cluster, + src_dir=src_dir, + asm_dir=asm_dir, + summary_dir=summary_dir, + include_dir=include_dir, + header_name=header_name, + rendered=rendered, + prototypes=prototypes, + ) + sources.append(block["source"]) + asm_files.append(block["asm_file"]) + summaries.append(block["summary_file"]) + ranges.extend(block["ranges"]) + failures.extend(block["failures"]) + + return { + "sources": sources, + "asm": asm_files, + "summaries": summaries, + "failures": failures, + "ranges": ranges, + } + + +def _write_rendered_cluster( + *, + analysis: ProgramAnalysis, + cluster: Cluster, + src_dir: Path, + asm_dir: Path, + summary_dir: Path, + include_dir: Path, + header_name: str, + rendered: dict[int, RenderedFunction], + prototypes: dict[int, str], +) -> dict[str, Any]: + c_path = _cluster_path(src_dir, cluster, c_file_name(cluster)) + asm_path = _cluster_path(asm_dir, cluster, asm_file_name(cluster)) + summary_path = _cluster_path(summary_dir, cluster, summary_file_name(cluster)) + c_path.parent.mkdir(parents=True, exist_ok=True) + asm_path.parent.mkdir(parents=True, exist_ok=True) + summary_path.parent.mkdir(parents=True, exist_ok=True) + include_path = Path( + os.path.relpath(include_dir / header_name, c_path.parent) + ).as_posix() + block = build_cluster_files( + analysis=analysis, + cluster=cluster, + header_include=include_path, + c_path=c_path, + asm_path=asm_path, + summary_path=summary_path, + rendered=rendered, + prototypes=prototypes, + ) + c_path.write_text(block["c"], encoding="utf-8") + asm_path.write_text(block["asm"], encoding="utf-8") + summary_path.write_text(block["summary"], encoding="utf-8") + return { + "source": c_path.resolve(), + "asm_file": asm_path.resolve(), + "summary_file": summary_path.resolve(), + "ranges": block["ranges"], + "failures": block["failures"], + } + + def write_tree_sources( *, analysis: ProgramAnalysis, @@ -673,6 +937,7 @@ def render_functions( analyzer, analysis, addresses, names, progress, worker_count ) except Exception as exc: # noqa: BLE001 + analyzer.restore_parallel_resources() progress.log( f"Warning: parallel export failed ({exc}); retrying with the primary session" ) @@ -707,6 +972,7 @@ def _render_parallel( progress.log(f"Opening {worker_count} workers for {len(addresses)} functions") analyzer.prepare_parallel_workers() spec = _worker_spec(analyzer) + analyzer.release_parallel_resources() output: dict[int, RenderedFunction] = {} pending = set(addresses) with progress.bar(total=len(addresses), desc="exporting", unit="func") as bar: @@ -820,9 +1086,11 @@ def _render_in_worker(address: int) -> tuple[int, RenderedFunction]: if _WORKER_SESSION is None or _WORKER_ANALYSIS is None or _WORKER_NAMES is None: raise RuntimeError("render worker was not initialized") routine = _WORKER_ANALYSIS.routines[address] - return address, render_one( - _WORKER_SESSION, _WORKER_ANALYSIS, routine, _WORKER_NAMES - ) + result = render_one(_WORKER_SESSION, _WORKER_ANALYSIS, routine, _WORKER_NAMES) + release_memory = getattr(_WORKER_SESSION, "release_render_memory", None) + if callable(release_memory): + release_memory() + return address, result def _render_isolated( diff --git a/src/tocode/parallel.py b/src/tocode/parallel.py index 04f4eb6..13c00af 100644 --- a/src/tocode/parallel.py +++ b/src/tocode/parallel.py @@ -7,9 +7,10 @@ FAST_ANALYSIS_SECONDS = 5.0 MIN_FUNCTIONS_FOR_AUTO = 32 MAX_AUTO_JOBS = 8 -MAX_AUTO_IDA_JOBS = 4 +MAX_AUTO_IDA_JOBS = 2 FUNCTIONS_PER_WORKER = 32 DEFAULT_JOB_LIMIT = 16 +DEFAULT_IDA_WORKER_MEMORY_MB = 3072 def choose_jobs( @@ -20,6 +21,8 @@ def choose_jobs( backend: str, cpu_count: int | None = None, job_limit: int | None = None, + available_memory_mb: int | None = None, + ida_worker_memory_mb: int | None = None, ) -> int: limit = job_limit if job_limit is not None else configured_job_limit() if requested is not None: @@ -33,6 +36,14 @@ def choose_jobs( cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1) backend_limit = MAX_AUTO_IDA_JOBS if backend.lower() == "ida" else MAX_AUTO_JOBS ceiling = min(cpus, backend_limit, limit, function_count) + if backend.lower() == "ida" and available_memory_mb is not None: + worker_memory_mb = ( + ida_worker_memory_mb + if ida_worker_memory_mb is not None + else configured_ida_worker_memory_mb() + ) + memory_ceiling = available_memory_mb // worker_memory_mb + ceiling = min(ceiling, max(1, memory_ceiling)) target = math.ceil(function_count / FUNCTIONS_PER_WORKER) return max(1, min(ceiling, target)) @@ -65,3 +76,43 @@ def configured_job_limit() -> int: except ValueError: return DEFAULT_JOB_LIMIT return max(1, value) + + +def configured_ida_worker_memory_mb() -> int: + raw = os.environ.get( + "TOCODE_IDA_WORKER_MEMORY_MB", str(DEFAULT_IDA_WORKER_MEMORY_MB) + ).strip() + try: + value = int(raw) + except ValueError: + return DEFAULT_IDA_WORKER_MEMORY_MB + return max(512, value) + + +def available_memory_mb() -> int | None: + meminfo = _linux_mem_available_mb() + if meminfo is not None: + return meminfo + try: + pages = os.sysconf("SC_AVPHYS_PAGES") + page_size = os.sysconf("SC_PAGE_SIZE") + except (AttributeError, OSError, ValueError): + return None + if not isinstance(pages, int) or not isinstance(page_size, int): + return None + if pages <= 0 or page_size <= 0: + return None + return pages * page_size // (1024 * 1024) + + +def _linux_mem_available_mb() -> int | None: + try: + with open("/proc/meminfo", encoding="utf-8") as handle: + for line in handle: + if line.startswith("MemAvailable:"): + parts = line.split() + if len(parts) >= 2: + return int(parts[1]) // 1024 + except (OSError, ValueError): + return None + return None diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py index 45f66ee..a2adf91 100644 --- a/tests/test_algorithms.py +++ b/tests/test_algorithms.py @@ -36,7 +36,39 @@ def test_choose_jobs_caps_auto_ida_parallelism() -> None: cpu_count=32, job_limit=64, ) - == 4 + == 2 + ) + + +def test_choose_jobs_limits_auto_ida_parallelism_by_available_memory() -> None: + assert ( + choose_jobs( + function_count=300, + analysis_seconds=0.2, + requested=None, + backend="ida", + cpu_count=32, + job_limit=64, + available_memory_mb=3500, + ida_worker_memory_mb=4096, + ) + == 1 + ) + + +def test_requested_jobs_are_not_limited_by_available_memory() -> None: + assert ( + choose_jobs( + function_count=300, + analysis_seconds=0.2, + requested=3, + backend="ida", + cpu_count=32, + job_limit=64, + available_memory_mb=3500, + ida_worker_memory_mb=4096, + ) + == 3 ) From 155959b760e28ba165dd3607bcbefcbfb12c8964 Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 7 Jun 2026 21:47:02 -0300 Subject: [PATCH 2/3] Log streaming worker startup progress --- src/tocode/exporter.py | 61 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/tocode/exporter.py b/src/tocode/exporter.py index 58968af..2054069 100644 --- a/src/tocode/exporter.py +++ b/src/tocode/exporter.py @@ -525,9 +525,16 @@ def render_and_write_source_tree_parallel( total = sum(len(cluster.members) for cluster in clusters) if progress is not None: progress.log(f"Opening {worker_count} streaming workers for {total} functions") + progress.log("Preparing IDA worker database copies") analyzer.prepare_parallel_workers() spec = _worker_spec(analyzer) + if progress is not None: + progress.log("Closing parent backend session before workers open") analyzer.release_parallel_resources() + if progress is not None: + progress.log( + "Starting streaming worker processes; opening IDA databases may take a while" + ) sources: list[Path] = [] asm_files: list[Path] = [] @@ -548,7 +555,21 @@ def render_and_write_source_tree_parallel( initializer=_init_worker, initargs=(spec, analysis, names), ) as executor: - for cluster in clusters: + _wait_for_streaming_workers( + executor=executor, + worker_count=worker_count, + progress=progress, + ) + cluster_total = len(clusters) + for cluster_index, cluster in enumerate(clusters, start=1): + if progress is not None: + progress.log( + _cluster_progress_message( + cluster_index=cluster_index, + cluster_total=cluster_total, + cluster=cluster, + ) + ) rendered: dict[int, RenderedFunction] = {} futures = { executor.submit(_render_in_worker, address): address @@ -585,6 +606,38 @@ def render_and_write_source_tree_parallel( } +def _wait_for_streaming_workers( + *, + executor: ProcessPoolExecutor, + worker_count: int, + progress: Progress | None, +) -> None: + ready_futures = [executor.submit(_worker_ready) for _ in range(worker_count)] + bar_context = ( + progress.bar(total=worker_count, desc="opening workers", unit="worker") + if progress + else nullcontext() + ) + pids: set[int] = set() + with bar_context as bar: + for future in as_completed(ready_futures): + pids.add(future.result()) + if bar is not None: + bar.update(1) + if progress is not None: + pid_text = ", ".join(str(pid) for pid in sorted(pids)) + progress.log(f"Streaming workers ready: {pid_text}") + + +def _cluster_progress_message( + *, cluster_index: int, cluster_total: int, cluster: Cluster +) -> str: + return ( + f"Rendering cluster {cluster_index}/{cluster_total}: " + f"{cluster.label} ({len(cluster.members)} functions)" + ) + + def _write_rendered_cluster( *, analysis: ProgramAnalysis, @@ -1093,6 +1146,12 @@ def _render_in_worker(address: int) -> tuple[int, RenderedFunction]: return address, result +def _worker_ready() -> int: + if _WORKER_SESSION is None: + raise RuntimeError("render worker was not initialized") + return os.getpid() + + def _render_isolated( spec: WorkerSpec, analysis: ProgramAnalysis, From 8503d6a841fb3c5250e8148e0759bee8849fe601 Mon Sep 17 00:00:00 2001 From: Codex Date: Tue, 9 Jun 2026 22:29:02 -0300 Subject: [PATCH 3/3] Fix kernel .i64 export OOM and add phase progress Resolve out-of-memory kills and silent phases when exporting large already-analyzed databases (e.g. PS5 kernel .i64), without ever re-running auto-analysis on an existing database. - Place per-worker IDA database copies on durable on-disk storage (cache root, overridable via TOCODE_WORKER_TMP_DIR) instead of the system temp dir, which is frequently RAM-backed tmpfs; copying a multi-gigabyte database there pinned memory and OOM-killed the export. - Skip the worker copy entirely when a single worker renders by opening the prepared database in place. - Size the auto IDA worker budget from the database size so a huge database does not over-subscribe RAM. - Log "Loading " before constructing the backend session so the database open/load is no longer a silent gap. - Drop the per-cluster render log that flooded output and broke the progress bar. - Report the metadata phase with a stepped progress bar and copy the published database in chunks with a byte progress bar. - Annotate _resolve_thunk to clear the mypy annotation-unchecked note. Co-Authored-By: Claude Opus 4.8 --- src/tocode/analysis.py | 5 +- src/tocode/backends/ida.py | 2 +- src/tocode/exporter.py | 291 +++++++++++++++++++++++++++---------- src/tocode/parallel.py | 44 ++++-- src/tocode/progress.py | 11 +- tests/test_algorithms.py | 37 +++++ 6 files changed, 302 insertions(+), 88 deletions(-) diff --git a/src/tocode/analysis.py b/src/tocode/analysis.py index 7455df3..e3b5c3f 100644 --- a/src/tocode/analysis.py +++ b/src/tocode/analysis.py @@ -36,7 +36,6 @@ def __init__( self.progress = progress or Progress() self.analysis: ProgramAnalysis | None = None self.analysis_seconds: float | None = None - self.progress.log(f"Loading {self.binary}") @property def backend_name(self) -> str: @@ -413,6 +412,10 @@ def create_analyzer( ) if progress is not None: progress.log(f"Using {choice.selected.upper()} as backend.") + # Log before constructing the session: opening (and, for a fresh binary, + # loading) the IDA database happens inside the session constructor and can + # take a while, so the user should see activity instead of a silent gap. + progress.log(f"Loading {Path(binary).resolve()}") if choice.selected == "ida": return BinaryAnalyzer( binary, diff --git a/src/tocode/backends/ida.py b/src/tocode/backends/ida.py index 897eb71..e390361 100644 --- a/src/tocode/backends/ida.py +++ b/src/tocode/backends/ida.py @@ -557,7 +557,7 @@ def calls_from( imported.add(name) return sorted(edges), sorted(name for name in imported if name) - def _resolve_thunk(self, func): + def _resolve_thunk(self, func: Any) -> Any: from ida_domain.functions import FunctionFlags current = func diff --git a/src/tocode/exporter.py b/src/tocode/exporter.py index 2054069..cbcbb16 100644 --- a/src/tocode/exporter.py +++ b/src/tocode/exporter.py @@ -14,7 +14,9 @@ from typing import Any from .analysis import BinaryAnalyzer +from .backends.base import is_ida_database from .backends.ida import IdaSession +from .backends.ida import _cache_root as _ida_cache_root from .backends.r2 import R2Session from .cluster import cluster_routines from .metadata import ( @@ -125,6 +127,7 @@ class WorkerSpec: idadir: Path | None = None ida_domain_path: Path | None = None db_path: Path | None = None + copy_db: bool = True @dataclass(frozen=True, slots=True) @@ -229,13 +232,15 @@ def _select_render_workers(context: ExportContext) -> None: count = len(context.addresses) if context.analyzer.supports_parallel: context.requested_jobs = context.jobs + is_ida = context.analyzer.backend_name == "ida" context.worker_count = choose_jobs( function_count=count, analysis_seconds=context.analyzer.analysis_seconds, requested=context.jobs, backend=context.analyzer.backend_name, - available_memory_mb=available_memory_mb() - if context.analyzer.backend_name == "ida" + available_memory_mb=available_memory_mb() if is_ida else None, + database_size_mb=_worker_database_size_mb(context.analyzer) + if is_ida else None, ) context.render_mode = "process" if context.worker_count > 1 else "single" @@ -527,7 +532,7 @@ def render_and_write_source_tree_parallel( progress.log(f"Opening {worker_count} streaming workers for {total} functions") progress.log("Preparing IDA worker database copies") analyzer.prepare_parallel_workers() - spec = _worker_spec(analyzer) + spec = _worker_spec(analyzer, copy_db=worker_count > 1) if progress is not None: progress.log("Closing parent backend session before workers open") analyzer.release_parallel_resources() @@ -560,16 +565,7 @@ def render_and_write_source_tree_parallel( worker_count=worker_count, progress=progress, ) - cluster_total = len(clusters) - for cluster_index, cluster in enumerate(clusters, start=1): - if progress is not None: - progress.log( - _cluster_progress_message( - cluster_index=cluster_index, - cluster_total=cluster_total, - cluster=cluster, - ) - ) + for cluster in clusters: rendered: dict[int, RenderedFunction] = {} futures = { executor.submit(_render_in_worker, address): address @@ -629,15 +625,6 @@ def _wait_for_streaming_workers( progress.log(f"Streaming workers ready: {pid_text}") -def _cluster_progress_message( - *, cluster_index: int, cluster_total: int, cluster: Cluster -) -> str: - return ( - f"Rendering cluster {cluster_index}/{cluster_total}: " - f"{cluster.label} ({len(cluster.members)} functions)" - ) - - def _write_rendered_cluster( *, analysis: ProgramAnalysis, @@ -1024,7 +1011,7 @@ def _render_parallel( ) -> dict[int, RenderedFunction]: progress.log(f"Opening {worker_count} workers for {len(addresses)} functions") analyzer.prepare_parallel_workers() - spec = _worker_spec(analyzer) + spec = _worker_spec(analyzer, copy_db=worker_count > 1) analyzer.release_parallel_resources() output: dict[int, RenderedFunction] = {} pending = set(addresses) @@ -1057,7 +1044,7 @@ def _render_parallel( return output -def _worker_spec(analyzer: BinaryAnalyzer) -> WorkerSpec: +def _worker_spec(analyzer: BinaryAnalyzer, *, copy_db: bool = True) -> WorkerSpec: session = analyzer.session return WorkerSpec( backend=analyzer.backend_name, @@ -1066,9 +1053,31 @@ def _worker_spec(analyzer: BinaryAnalyzer) -> WorkerSpec: idadir=getattr(session, "idadir", None), ida_domain_path=getattr(session, "ida_domain_path", None), db_path=_session_database_path(session), + copy_db=copy_db, ) +def _worker_database_size_mb(analyzer: BinaryAnalyzer) -> int | None: + """On-disk size (MiB) of the database each worker will load, if known. + + Used to size the per-worker memory budget so that a very large database + (such as a kernel `.i64`) does not spawn more workers than RAM can hold. + """ + session = analyzer.session + candidate = getattr(session, "_cache_db", None) + if candidate is None: + binary = getattr(session, "binary", None) + if binary is not None and is_ida_database(Path(binary)): + candidate = binary + if candidate is None: + return None + try: + size = Path(candidate).stat().st_size + except OSError: + return None + return max(1, size // (1024 * 1024)) + + def _session_database_path(session: object) -> Path | None: database_path = getattr(session, "database_path", None) if callable(database_path): @@ -1087,16 +1096,23 @@ def _init_worker(spec: WorkerSpec, analysis: ProgramAnalysis, names: NameBook) - def _open_worker(spec: WorkerSpec): session: Any if spec.backend == "ida": - worker_db = ( - _copy_worker_database(spec.db_path) if spec.db_path is not None else None - ) + # When a single worker renders, it can open the prepared database in place + # instead of duplicating it. Copying a large database (e.g. a kernel `.i64`) + # is only needed so that concurrent workers do not share one IDA lock, and a + # copy on RAM-backed temp storage would otherwise exhaust memory. + if spec.db_path is not None and spec.copy_db: + worker_db = _copy_worker_database(spec.db_path) + open_db: Path | None = worker_db + else: + worker_db = None + open_db = spec.db_path try: session = IdaSession( spec.binary, idadir=spec.idadir, ida_domain_path=spec.ida_domain_path, - db_path=worker_db, - needs_analysis=False if worker_db is not None else None, + db_path=open_db, + needs_analysis=False if open_db is not None else None, ) except Exception: if worker_db is not None: @@ -1116,13 +1132,35 @@ def _open_worker(spec: WorkerSpec): def _copy_worker_database(db_path: Path) -> Path: - fd, name = tempfile.mkstemp(prefix="tocode-ida-worker-", suffix=db_path.suffix) + # Place worker copies on durable, on-disk storage rather than the default + # system temp dir, which is frequently RAM-backed (tmpfs). Copying a + # multi-gigabyte IDA database into tmpfs would pin that memory and OOM-kill + # the export. The on-disk page cache used here is reclaimable under pressure. + copy_dir = _worker_copy_dir(db_path) + fd, name = tempfile.mkstemp( + prefix="tocode-ida-worker-", suffix=db_path.suffix, dir=str(copy_dir) + ) os.close(fd) target = Path(name) shutil.copy2(db_path, target) return target +def _worker_copy_dir(db_path: Path) -> Path: + explicit = os.environ.get("TOCODE_WORKER_TMP_DIR", "").strip() + if explicit: + candidate = Path(explicit).expanduser() + else: + candidate = _ida_cache_root() / "workers" + try: + candidate.mkdir(parents=True, exist_ok=True) + return candidate + except OSError: + # Fall back to the directory that already holds the source database; it is + # on the same (on-disk) filesystem as the data we are copying. + return db_path.parent + + def _close_worker() -> None: global _WORKER_SESSION if _WORKER_SESSION is not None: @@ -1390,18 +1428,8 @@ def _write_metadata(context: ExportContext) -> None: root = _need(context.root) header = _need(context.header_path) names = _need(context.names) - header.write_text( - build_header(analysis, context.prototypes, names), encoding="utf-8" - ) - context.data_variable_count = export_variables(analysis, root, context.raw_ranges) - context.function_index = write_function_index(root, context.raw_ranges) - context.tree_index = ( - write_function_index( - root, context.tree_ranges, file_name="function-index-tree.json" - ) - if context.tree_enabled - else None - ) + + # Cheap cleanups of stale artifacts; not worth a progress step. if not context.tree_enabled: stale_tree_index = root / "function-index-tree.json" if stale_tree_index.exists(): @@ -1412,43 +1440,130 @@ def _write_metadata(context: ExportContext) -> None: stale = root / ("function-index-" + "ll" + "m.json") if stale.exists(): stale.unlink() - write_json(root / "sections.json", sections_json(analysis)) - write_json(root / "strings.json", strings_json(analysis, context.raw_ranges)) - write_json(root / "imports.json", imports_json(analysis)) - write_json(root / "exports.json", exports_json(analysis)) - write_json(root / "relocations.json", relocations_json(analysis)) - write_json( - root / "functions.json", - functions_json( - analysis, - context.raw_ranges, - context.prototypes, - names.functions, - tree_ranges=context.tree_ranges, + + shared: dict[str, Any] = {} + + def write_header() -> None: + header.write_text( + build_header(analysis, context.prototypes, names), encoding="utf-8" + ) + + def write_variables() -> None: + context.data_variable_count = export_variables( + analysis, root, context.raw_ranges + ) + + def write_indexes() -> None: + context.function_index = write_function_index(root, context.raw_ranges) + context.tree_index = ( + write_function_index( + root, context.tree_ranges, file_name="function-index-tree.json" + ) + if context.tree_enabled + else None + ) + + def write_functions() -> None: + write_json( + root / "functions.json", + functions_json( + analysis, + context.raw_ranges, + context.prototypes, + names.functions, + tree_ranges=context.tree_ranges, + ), + ) + + def write_reachable() -> None: + reachable = reachable_json(analysis) + shared["reachable"] = reachable + write_json(root / "reachable.json", reachable) + + def write_triage() -> None: + write_json( + root / "triage.json", + triage_json( + analysis, context.clusters, context.raw_ranges, shared["reachable"] + ), + ) + + def publish_database() -> None: + context.ida_database = publish_backend_database(context) + + def write_docs() -> None: + (root / "AGENTS.md").write_text( + build_export_agents( + analysis, context.header_name, tree_enabled=context.tree_enabled + ), + encoding="utf-8", + ) + (root / "CLAUDE.md").write_text("@./AGENTS.md\n", encoding="utf-8") + + steps: list[tuple[str, Any]] = [ + ("header", write_header), + ("data variables", write_variables), + ("function index", write_indexes), + ( + "sections.json", + lambda: write_json(root / "sections.json", sections_json(analysis)), ), - ) - reachable = reachable_json(analysis) - write_json(root / "reachable.json", reachable) - write_json( - root / "cluster-graph.json", - cluster_graph_json(analysis, context.clusters, context.raw_ranges), - ) - write_json( - root / "triage.json", - triage_json(analysis, context.clusters, context.raw_ranges, reachable), - ) - context.ida_database = publish_backend_database(context) - write_project_json(context) - (root / "AGENTS.md").write_text( - build_export_agents( - analysis, context.header_name, tree_enabled=context.tree_enabled + ( + "strings.json", + lambda: write_json( + root / "strings.json", strings_json(analysis, context.raw_ranges) + ), ), - encoding="utf-8", - ) - (root / "CLAUDE.md").write_text("@./AGENTS.md\n", encoding="utf-8") + ( + "imports.json", + lambda: write_json(root / "imports.json", imports_json(analysis)), + ), + ( + "exports.json", + lambda: write_json(root / "exports.json", exports_json(analysis)), + ), + ( + "relocations.json", + lambda: write_json(root / "relocations.json", relocations_json(analysis)), + ), + ("functions.json", write_functions), + ("reachable.json", write_reachable), + ( + "cluster-graph.json", + lambda: write_json( + root / "cluster-graph.json", + cluster_graph_json(analysis, context.clusters, context.raw_ranges), + ), + ), + ("triage.json", write_triage), + ("backend database", publish_database), + ("project.json", lambda: write_project_json(context)), + ("AGENTS.md", write_docs), + ("export-manifest.json", lambda: _set_manifest(context)), + ] + + context.progress.log(f"Writing metadata and indexes ({len(steps)} steps)") + with context.progress.bar(total=len(steps), desc="metadata", unit="step") as bar: + for label, run in steps: + _set_bar_description(bar, f"metadata: {label}") + run() + bar.update(1) + + +def _set_manifest(context: ExportContext) -> None: context.manifest = write_manifest(context) +def _set_bar_description(bar: Any, text: str) -> None: + setter = getattr(bar, "set_description", None) + if not callable(setter): + return + try: + setter(text, refresh=False) + except Exception: # noqa: BLE001 + pass + + def publish_backend_database(context: ExportContext) -> Path | None: session = getattr(context.analyzer, "session", None) database_path = getattr(session, "database_path", None) @@ -1462,11 +1577,37 @@ def publish_backend_database(context: ExportContext) -> Path | None: suffix = source.suffix.lower() target = root / f"{clean_path_component(analysis.binary.path.stem)}{suffix}" if source.resolve() != target.resolve(): - shutil.copy2(source, target) + _copy_file_with_progress( + source, target, context.progress, desc="saving database" + ) context.progress.log(f"Saved IDA database to {target}") return target.resolve() +def _copy_file_with_progress( + source: Path, target: Path, progress: Progress, *, desc: str +) -> None: + # A database (kernel `.i64`) can be multiple gigabytes; copy in chunks with a + # byte progress bar so the export does not appear stuck during the copy. + try: + size = source.stat().st_size + except OSError: + size = 0 + chunk_size = 8 * 1024 * 1024 + with ( + progress.bar(total=size, desc=desc, unit="B", unit_scale=True) as bar, + source.open("rb") as src, + target.open("wb") as dst, + ): + while True: + buffer = src.read(chunk_size) + if not buffer: + break + dst.write(buffer) + bar.update(len(buffer)) + shutil.copystat(source, target) + + def build_header( analysis: ProgramAnalysis, prototypes: dict[int, str], names: NameBook ) -> str: diff --git a/src/tocode/parallel.py b/src/tocode/parallel.py index 13c00af..86a337d 100644 --- a/src/tocode/parallel.py +++ b/src/tocode/parallel.py @@ -11,6 +11,11 @@ FUNCTIONS_PER_WORKER = 32 DEFAULT_JOB_LIMIT = 16 DEFAULT_IDA_WORKER_MEMORY_MB = 3072 +# A worker loads the whole IDA database into memory. Estimate its resident cost +# from the database size so that huge databases (kernels) do not over-subscribe +# RAM. Base covers IDA runtime + Hex-Rays; the factor covers the loaded database. +IDA_WORKER_BASE_MEMORY_MB = 768 +IDA_DB_RESIDENT_FACTOR = 1.5 def choose_jobs( @@ -23,31 +28,52 @@ def choose_jobs( job_limit: int | None = None, available_memory_mb: int | None = None, ida_worker_memory_mb: int | None = None, + database_size_mb: int | None = None, ) -> int: limit = job_limit if job_limit is not None else configured_job_limit() + is_ida = backend.lower() == "ida" + # Explicit `--jobs N` is honored as-is; the operator has opted into N workers. if requested is not None: return max(1, min(requested, function_count or 1, limit)) if function_count < MIN_FUNCTIONS_FOR_AUTO or analysis_seconds is None: return 1 - if backend.lower() != "ida" and analysis_seconds > FAST_ANALYSIS_SECONDS: + if not is_ida and analysis_seconds > FAST_ANALYSIS_SECONDS: return 1 cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1) - backend_limit = MAX_AUTO_IDA_JOBS if backend.lower() == "ida" else MAX_AUTO_JOBS + backend_limit = MAX_AUTO_IDA_JOBS if is_ida else MAX_AUTO_JOBS ceiling = min(cpus, backend_limit, limit, function_count) - if backend.lower() == "ida" and available_memory_mb is not None: - worker_memory_mb = ( - ida_worker_memory_mb - if ida_worker_memory_mb is not None - else configured_ida_worker_memory_mb() + if is_ida: + memory_ceiling = _ida_memory_ceiling( + available_memory_mb, ida_worker_memory_mb, database_size_mb ) - memory_ceiling = available_memory_mb // worker_memory_mb - ceiling = min(ceiling, max(1, memory_ceiling)) + if memory_ceiling is not None: + ceiling = min(ceiling, memory_ceiling) target = math.ceil(function_count / FUNCTIONS_PER_WORKER) return max(1, min(ceiling, target)) +def _ida_memory_ceiling( + available_memory_mb: int | None, + ida_worker_memory_mb: int | None, + database_size_mb: int | None, +) -> int | None: + if available_memory_mb is None: + return None + worker_memory_mb = ( + ida_worker_memory_mb + if ida_worker_memory_mb is not None + else configured_ida_worker_memory_mb() + ) + if database_size_mb is not None and database_size_mb > 0: + estimated = IDA_WORKER_BASE_MEMORY_MB + int( + database_size_mb * IDA_DB_RESIDENT_FACTOR + ) + worker_memory_mb = max(worker_memory_mb, estimated) + return max(1, available_memory_mb // worker_memory_mb) + + def describe_jobs( *, function_count: int, diff --git a/src/tocode/progress.py b/src/tocode/progress.py index 6a94d6f..98f145e 100644 --- a/src/tocode/progress.py +++ b/src/tocode/progress.py @@ -38,13 +38,20 @@ def log(self, message: str) -> None: print(message, file=sys.stderr) @contextmanager - def bar(self, *, total: int, desc: str, unit: str) -> Iterator[ProgressBar]: + def bar( + self, *, total: int, desc: str, unit: str, unit_scale: bool = False + ) -> Iterator[ProgressBar]: tqdm_factory: Any = _tqdm if not self.enabled or tqdm_factory is None: yield _NullProgress() return bar = tqdm_factory( - total=total, desc=desc, unit=unit, leave=False, dynamic_ncols=True + total=total, + desc=desc, + unit=unit, + unit_scale=unit_scale, + leave=False, + dynamic_ncols=True, ) try: yield bar diff --git a/tests/test_algorithms.py b/tests/test_algorithms.py index a2adf91..c136486 100644 --- a/tests/test_algorithms.py +++ b/tests/test_algorithms.py @@ -56,6 +56,43 @@ def test_choose_jobs_limits_auto_ida_parallelism_by_available_memory() -> None: ) +def test_choose_jobs_auto_ida_budget_scales_with_database_size() -> None: + # A large database (e.g. a kernel `.i64`) inflates the per-worker memory + # estimate so the auto budget falls back to a single worker even when the + # default flat estimate would have allowed more. + assert ( + choose_jobs( + function_count=18000, + analysis_seconds=30.0, + requested=None, + backend="ida", + cpu_count=32, + job_limit=64, + available_memory_mb=6800, + ida_worker_memory_mb=3072, + database_size_mb=4000, + ) + == 1 + ) + + +def test_choose_jobs_auto_ida_budget_allows_parallel_for_small_database() -> None: + assert ( + choose_jobs( + function_count=18000, + analysis_seconds=30.0, + requested=None, + backend="ida", + cpu_count=32, + job_limit=64, + available_memory_mb=64000, + ida_worker_memory_mb=3072, + database_size_mb=200, + ) + == 2 + ) + + def test_requested_jobs_are_not_limited_by_available_memory() -> None: assert ( choose_jobs(