diff --git a/packages/core/src/repowise/core/analysis/health/duplication/detector.py b/packages/core/src/repowise/core/analysis/health/duplication/detector.py index 105cecf92..9b64bc460 100644 --- a/packages/core/src/repowise/core/analysis/health/duplication/detector.py +++ b/packages/core/src/repowise/core/analysis/health/duplication/detector.py @@ -198,6 +198,7 @@ def detect_clones( min_lines: int = DEFAULT_MIN_LINES, limits: DuplicationLimits | None = None, cache_dir: Path | None = None, + changed_files: set[str] | None = None, ) -> DuplicationReport: """Run the duplication pipeline over the supplied parsed files. @@ -214,6 +215,12 @@ def detect_clones( Every stage is bounded by :class:`~.limits.DuplicationLimits` so no repo shape (minified bundles, generated tables) can wedge the run — see issue #341. + + When *changed_files* is given (incremental ``repowise update`` runs) + and a persisted pair index from a previous run validates, the raw + pair multiset is spliced instead of recomputed: only hash buckets + touched by a changed/deleted file are re-verified. Any validity miss + falls back to this full pipeline, which rewrites the artifact. """ meta_map = git_meta_map or {} lim = limits or DuplicationLimits() @@ -226,8 +233,32 @@ def detect_clones( cache = DuplicationTokenCache(cache_dir, window_tokens) cache.load() - per_file_kinds, per_file_nloc, all_windows = _collect_windows( - parsed_files, window_tokens, lim, diag, cache + parsed_list = list(parsed_files) + + if changed_files is not None and cache is not None: + from .pair_index import load_pair_index + + index = load_pair_index(cache_dir, window_tokens, lim) + if index is not None: + report = _detect_clones_incremental( + parsed_list, + meta_map, + set(changed_files), + window_tokens, + min_lines, + lim, + diag, + cache, + index, + cache_dir, + ) + if report is not None: + return report + # Fall through to the full pipeline; it refreshes the artifact. 
+ diag = DuplicationDiagnostics() + + per_file_kinds, per_file_nloc, all_windows, per_file_hash = _collect_windows( + parsed_list, window_tokens, lim, diag, cache ) if cache is not None: cache.save() @@ -242,6 +273,19 @@ def detect_clones( bucket = index_by_hash(all_windows) raw_pairs = _pairs_from_buckets(bucket, per_file_kinds, window_tokens, lim, diag) + if cache is not None and cache_dir is not None: + all_paths = {pf.file_info.path for pf in parsed_list} + _persist_pair_index( + cache_dir, + window_tokens, + lim, + diag, + per_file_kinds, + per_file_hash, + raw_pairs, + all_paths, + ) + final = _finalize_pairs(_merge_adjacent_pairs(raw_pairs), min_lines, meta_map) pairs_by_file, duplication_pct = _aggregate(final, per_file_nloc) @@ -264,7 +308,7 @@ def _collect_windows( limits: DuplicationLimits, diag: DuplicationDiagnostics, cache: Any | None = None, -) -> tuple[dict[str, list[str]], dict[str, int], list[WindowHash]]: +) -> tuple[dict[str, list[str]], dict[str, int], list[WindowHash], dict[str, str]]: """Tokenize each file once and emit its rolling-hash windows. Files are dropped (and counted in *diag*) when they are unreadable, @@ -275,13 +319,16 @@ def _collect_windows( When a :class:`~.token_cache.DuplicationTokenCache` is supplied, unchanged files (by content hash) skip the tokenize + rolling-hash work and replay their cached kind sequence and window tuples; every - gate above still re-evaluates live against the cached lengths. + gate above still re-evaluates live against the cached lengths. The + returned hash map (path -> content hash, gate survivors only) feeds + the persisted pair index; it is empty when no cache is supplied. 
""" import hashlib per_file_kinds: dict[str, list[str]] = {} per_file_nloc: dict[str, int] = {} all_windows: list[WindowHash] = [] + per_file_hash: dict[str, str] = {} for pf in parsed_files: check_cancelled() @@ -344,10 +391,12 @@ def _collect_windows( per_file_kinds[path] = kinds per_file_nloc[path] = nloc all_windows.extend(windows) + if content_hash: + per_file_hash[path] = content_hash diag.files_tokenized += 1 diag.total_windows = len(all_windows) - return per_file_kinds, per_file_nloc, all_windows + return per_file_kinds, per_file_nloc, all_windows, per_file_hash # --------------------------------------------------------------------------- @@ -434,6 +483,309 @@ def _verify_bucket( ) +# --------------------------------------------------------------------------- +# Incremental splice (update path) +# --------------------------------------------------------------------------- + +# Raw pairs identified by line geometry; multiplicity preserved via Counter +# because the merge stage accumulates token_count per merged region. +_PairKey = tuple[str, str, int, int, int, int] + +# An incremental run is only worth it when few files moved — past this +# fraction the full pipeline's flat cost wins (and stays simpler). The +# floor keeps typical small commits incremental even on small repos, +# where a handful of files is a large fraction but splicing is correct +# and still cheap. +_MAX_CHANGED_FRACTION = 0.2 +_CHANGED_COUNT_FLOOR = 16 + + +def _detect_clones_incremental( + parsed_files: list[Any], + meta_map: dict[str, dict[str, Any]], + changed_files: set[str], + window_tokens: int, + min_lines: int, + lim: DuplicationLimits, + diag: DuplicationDiagnostics, + cache: Any, + index: Any, + cache_dir: Path, +) -> DuplicationReport | None: + """Splice the persisted raw-pair multiset instead of recomputing it. + + Equivalence argument: raw pairs are a pure function of the gate- + surviving window set, and each (window, window) pair lives in exactly + one hash bucket. 
For every bucket whose membership changes (a hash + seen in a changed/deleted file's old windows or a changed file's new + windows), subtract the bucket's old contribution and add its new one + — both recomputed deterministically, including the degenerate-bucket + cap on the bucket's *full* membership, so cap transitions in either + direction are handled uniformly. Untouched buckets keep their pairs + verbatim. Finalize (merge, min-lines, co-change weighting) always + runs live against the current ``git_meta_map``. + + Returns ``None`` whenever the persisted state cannot be spliced + safely (truncated/timed-out runs, missing cache entries, too many + changes, accounting mismatch); the caller falls back to the full + pipeline. + """ + from collections import Counter + + if not index.spliceable: + return None + + current = {pf.file_info.path: pf for pf in parsed_files} + old_files: dict[str, str] = index.files + # Files the previous run considered but gated out are not "new"; + # unchanged ones stay gated (same bytes, same gates), changed ones + # arrive via changed_files and re-evaluate the gates live. + new_paths = set(current) - set(old_files) - index.nonsurvivors + deleted = set(old_files) - set(current) + changed = (changed_files & set(current)) | new_paths + moved = len(changed) + len(deleted) + if moved > max(_CHANGED_COUNT_FLOOR, _MAX_CHANGED_FRACTION * len(current)): + return None + # Paths whose OLD windows leave the state (modified or deleted). + affected_old = (changed | deleted) & set(old_files) + unchanged = set(old_files) - affected_old - deleted + + # 1. Collect the changed files live (read, gates, tokenize, windows). + # Sorted for deterministic window-budget behaviour. + changed_pfs = [current[p] for p in sorted(changed)] + new_kinds, new_nloc, new_windows, new_hash = _collect_windows( + changed_pfs, window_tokens, lim, diag, cache + ) + if diag.window_budget_hit: + return None + + # 2. Old windows + kinds of affected files, from the token cache. 
+ old_aff: dict[str, tuple[list[str], list[tuple[int, int, int, int]]]] = {} + for p in affected_old: + entry = cache.entry(old_files[p]) + if entry is None: + return None + kinds, _nloc, window_tuples = entry + old_aff[p] = (kinds, window_tuples) + + # 3. Window-budget equivalence: splicing is only valid when the full + # pipeline would not truncate either state. + n_old_aff = sum(len(wt) for _k, wt in old_aff.values()) + if index.total_windows - n_old_aff + len(new_windows) > lim.max_total_windows: + return None + + # 4. Touched buckets = hashes present in moving windows (old or new). + touched: set[int] = {w.hash_value for w in new_windows} + for _kinds, window_tuples in old_aff.values(): + touched.update(row[0] for row in window_tuples) + + # 5. One pass over the unchanged files' cached windows builds the + # touched buckets' membership; rows of unchanged files belong to + # both the old and the new bucket composition. Kinds maps are + # split because a modified file verifies with its old kinds on + # the old side and its new kinds on the new side. 
+ old_rows: dict[int, list[WindowHash]] = defaultdict(list) + new_rows: dict[int, list[WindowHash]] = defaultdict(list) + old_kinds_map: dict[str, list[str]] = {} + new_kinds_map: dict[str, list[str]] = dict(new_kinds) + per_file_nloc: dict[str, int] = dict(new_nloc) + for p in unchanged: + check_cancelled() + h = old_files[p] + entry = cache.entry(h) + if entry is None: + return None + kinds, nloc, window_tuples = entry + per_file_nloc[p] = nloc + hit = False + for row in window_tuples: + if row[0] in touched: + w = WindowHash( + file_path=p, + hash_value=row[0], + start_index=row[1], + start_line=row[2], + end_line=row[3], + ) + old_rows[row[0]].append(w) + new_rows[row[0]].append(w) + hit = True + if hit: + old_kinds_map[p] = kinds + new_kinds_map[p] = kinds + cache.retain(h) + + for p, (kinds, window_tuples) in old_aff.items(): + for row in window_tuples: + if row[0] in touched: + old_rows[row[0]].append( + WindowHash( + file_path=p, + hash_value=row[0], + start_index=row[1], + start_line=row[2], + end_line=row[3], + ) + ) + old_kinds_map[p] = kinds + for w in new_windows: + new_rows[w.hash_value].append(w) + + # 6. Per-bucket contributions, old and new. + old_contrib = _bucket_contributions(old_rows, old_kinds_map, window_tokens, lim, None) + new_contrib = _bucket_contributions(new_rows, new_kinds_map, window_tokens, lim, diag) + + # 7. Splice the multiset. A negative count means the persisted state + # disagrees with the recomputed old contribution — fall back. 
+ paths_table: list[str] = index.paths + pair_counter: Counter[_PairKey] = Counter() + for pid_a, pid_b, a_sl, a_el, b_sl, b_el, count in index.pairs: + pair_counter[(paths_table[pid_a], paths_table[pid_b], a_sl, a_el, b_sl, b_el)] += count + pair_counter.subtract(Counter(_pair_key(p) for p in old_contrib)) + if -pair_counter: # truthy when any count went negative + return None + pair_counter.update(_pair_key(p) for p in new_contrib) + + # One ClonePair per distinct key carrying the multiplicity in its + # token_count: identical raw pairs are guaranteed to merge with each + # other (same files, same starts), and merging sums token_count, so + # this is exactly what expanding the multiset would produce. + raw_pairs = [ + ClonePair( + file_a=key[0], + file_b=key[1], + a_start_line=key[2], + a_end_line=key[3], + b_start_line=key[4], + b_end_line=key[5], + token_count=window_tokens * count, + ) + for key, count in pair_counter.items() + if count > 0 + ] + + # 8. Persist the spliced state and the retained token cache before + # finalizing, mirroring the full path's ordering. 
def _pair_key(p: ClonePair) -> _PairKey:
    """Return the line-geometry identity of the raw clone pair *p*.

    The key is ``(file_a, file_b, a_start_line, a_end_line, b_start_line,
    b_end_line)``. ``token_count`` is not part of it, so raw pairs with the
    same files and line spans land on the same multiset key.
    """
    files = (p.file_a, p.file_b)
    span_a = (p.a_start_line, p.a_end_line)
    span_b = (p.b_start_line, p.b_end_line)
    return (*files, *span_a, *span_b)
def _persist_pair_index(
    cache_dir: Path,
    window_tokens: int,
    lim: DuplicationLimits,
    diag: DuplicationDiagnostics,
    per_file_kinds: dict[str, list[str]],
    per_file_hash: dict[str, str],
    raw_pairs: list[ClonePair],
    all_paths: set[str],
) -> None:
    """Write a full run's raw pair multiset to the pair-index artifact.

    Args:
        cache_dir: Directory that holds the artifact.
        window_tokens: Window size the run used; stored for validation.
        lim: Limits the run used; stored as a fingerprint for validation.
        diag: Diagnostics of the run; supplies ``total_windows`` and the
            ``window_budget_hit`` / ``timed_out`` guard flags.
        per_file_kinds: Token kinds keyed by path, for the files that
            contributed windows.
        per_file_hash: Content hash keyed by path.
        raw_pairs: Raw (pre-merge) clone pairs of the run.
        all_paths: Every path handed to the run; the ones without a stored
            hash are recorded as non-survivors.

    Nothing is written when a path in *per_file_kinds* has no content hash.
    """
    from collections import Counter

    from .pair_index import DuplicationPairIndex, limits_fingerprint, save_pair_index

    # A gate survivor without a content hash should be impossible when the
    # cache is active; don't persist a state we can't trust.
    if any(path not in per_file_hash for path in per_file_kinds):
        return

    hashes = {path: per_file_hash[path] for path in per_file_kinds}
    ordered_paths = sorted(hashes)
    path_id = {path: position for position, path in enumerate(ordered_paths)}

    # Compress identical raw pairs into one row carrying their multiplicity.
    rows = []
    multiset = Counter(map(_pair_key, raw_pairs))
    for (file_a, file_b, a_start, a_end, b_start, b_end), multiplicity in multiset.items():
        rows.append(
            (path_id[file_a], path_id[file_b], a_start, a_end, b_start, b_end, multiplicity)
        )

    snapshot = DuplicationPairIndex(
        window_tokens=window_tokens,
        limits_key=limits_fingerprint(lim),
        files=hashes,
        nonsurvivors=all_paths - set(hashes),
        paths=ordered_paths,
        pairs=rows,
        total_windows=diag.total_windows,
        window_budget_hit=diag.window_budget_hit,
        timed_out=diag.timed_out,
    )
    save_pair_index(cache_dir, snapshot)
def limits_fingerprint(limits: DuplicationLimits) -> tuple:
    """Return the validity fingerprint of *limits* for the pair index.

    Only the limit fields that change which pairs a full run emits take
    part, in a fixed order; any other attribute of *limits* is ignored.
    """
    pair_affecting_fields = (
        "minified_avg_line_bytes",
        "minified_max_line_bytes",
        "max_tokens_per_file",
        "max_total_windows",
        "max_bucket_windows",
    )
    return tuple(getattr(limits, name) for name in pair_affecting_fields)
def load_pair_index(
    cache_dir: Path,
    window_tokens: int,
    limits: DuplicationLimits,
) -> DuplicationPairIndex | None:
    """Load and validate the persisted pair index.

    Args:
        cache_dir: Directory that holds the artifact.
        window_tokens: Window size of the current run; must match the
            stored one.
        limits: Limits of the current run; their fingerprint must match
            the stored one.

    Returns:
        The validated index, or ``None`` when the artifact is missing,
        unreadable, not a dict, written under a different version /
        window size / limits fingerprint, or carries a field that cannot
        be coerced to its expected container or scalar type. ``None``
        tells the caller to run the full pipeline.
    """
    path = Path(cache_dir) / _INDEX_FILENAME
    try:
        with path.open("rb") as fh:
            # NOTE(security): unpickling can execute arbitrary code. This is
            # only acceptable while the cache directory is writable solely
            # by trusted users; revisit the format if that ever changes.
            payload = pickle.load(fh)
        if not isinstance(payload, dict):
            log.debug(
                "duplication_pair_index_load_failed",
                error=f"unexpected payload type {type(payload).__name__}",
            )
            return None
        expected_key = limits_fingerprint(limits)
        if (
            payload.get("version") != _INDEX_VERSION
            or payload.get("window_tokens") != window_tokens
            or tuple(payload.get("limits_key", ())) != expected_key
        ):
            return None
        # Coerce every field here, inside the try: a wrongly shaped field
        # must fail now and degrade to a full re-detect, not blow up later
        # in the splice's set/dict arithmetic.
        return DuplicationPairIndex(
            window_tokens=window_tokens,
            limits_key=expected_key,
            files=dict(payload["files"]),
            nonsurvivors=set(payload["nonsurvivors"]),
            paths=list(payload["paths"]),
            pairs=list(payload["pairs"]),
            total_windows=int(payload["total_windows"]),
            window_budget_hit=bool(payload["window_budget_hit"]),
            timed_out=bool(payload["timed_out"]),
        )
    except FileNotFoundError:
        # First run, or the artifact was removed: nothing to report.
        return None
    except Exception as exc:  # corrupt / unreadable -> full re-detect
        log.debug("duplication_pair_index_load_failed", error=str(exc))
        return None
def retain(self, content_hash: str) -> bool:
    """Mark the entry for *content_hash* as live so ``save`` keeps it.

    ``save`` rewrites the cache with only the entries touched this run,
    so an incremental run that never reads an unchanged file would
    otherwise evict that file's entry.

    Returns:
        ``True`` when the hash is known and was marked live, ``False``
        when the cache has no entry for it.
    """
    cached = self._entries.get(content_hash)
    known = cached is not None
    if known:
        self._fresh[content_hash] = cached
    return known
def _key(report):
    """Build an order-insensitive comparison key for a duplication report.

    The key is a 3-tuple: the sorted multiset of final pairs (files, line
    spans, ``token_count`` and ``co_change_count``), the repo-wide
    ``duplication_pct``, and the number of pairs recorded per file.
    """
    tally = Counter()
    for pair in report.pairs:
        identity = (
            pair.file_a,
            pair.file_b,
            pair.a_start_line,
            pair.a_end_line,
            pair.b_start_line,
            pair.b_end_line,
            pair.token_count,
            pair.co_change_count,
        )
        tally[identity] += 1
    pair_multiset = sorted(tally.items())

    pct = report.duplication_pct

    per_file_counts = {}
    for path, file_pairs in report.pairs_by_file.items():
        per_file_counts[path] = len(file_pairs)

    return (pair_multiset, pct, per_file_counts)
def test_rename_clone_file(tmp_path: Path):
    """A rename (old path removed, same bytes under a new path) splices
    to the same result as a fresh full recompute."""
    seeded = {"a.py": _BODY, "b.py": _BODY}
    cache_dir = _seed(tmp_path, seeded)

    # Rename b.py -> renamed.py: the old path disappears from the parsed
    # set and the new one is reported as changed.
    (tmp_path / "b.py").unlink()
    (tmp_path / "renamed.py").write_text(_BODY)

    tree = _parsed(tmp_path)
    spliced = _incremental(tree, cache_dir, {"renamed.py"})

    assert spliced.diagnostics.get("incremental") is True
    partner_sets = [{pair.file_a, pair.file_b} for pair in spliced.pairs]
    assert {"a.py", "renamed.py"} in partner_sets
    assert _key(spliced) == _key(_full(tree))
def test_degenerate_bucket_grows_past_cap(tmp_path: Path):
    """Adding a member can cap a bucket, removing unchanged-pair output."""
    capped = DuplicationLimits(time_budget_secs=0, max_bucket_windows=3)
    # Three identical files: buckets of 3 == cap, so pairs are emitted.
    seeded = dict.fromkeys(("a.py", "b.py", "c.py"), _BODY)
    cache_dir = _seed(tmp_path, seeded, limits=capped)

    baseline = load_pair_index(cache_dir, WINDOW, capped)
    assert baseline is not None
    assert baseline.pairs

    # A fourth identical file pushes the buckets to 4 > cap.
    (tmp_path / "d.py").write_text(_BODY)
    tree = _parsed(tmp_path)
    spliced = _incremental(tree, cache_dir, {"d.py"}, limits=capped)

    assert spliced.diagnostics.get("incremental") is True
    assert not spliced.pairs
    assert _key(spliced) == _key(_full(tree, limits=capped))
def test_too_many_changes_falls_back_to_full(tmp_path: Path, monkeypatch):
    """Past the changed-fraction threshold the detector must run the full
    pipeline and still match the oracle."""
    from repowise.core.analysis.health.duplication import detector

    # Drop the absolute floor so only the fraction guard applies.
    monkeypatch.setattr(detector, "_CHANGED_COUNT_FLOOR", 0)

    seeded = {}
    for i in range(5):
        seeded[f"f{i}.py"] = _BODY.replace("doit", f"fn{i}")
    cache_dir = _seed(tmp_path, seeded)

    # 3 of 5 changed > 20% threshold
    for i in (0, 1, 2):
        (tmp_path / f"f{i}.py").write_text(_OTHER.replace("other", f"fn{i}"))

    tree = _parsed(tmp_path)
    result = _incremental(tree, cache_dir, {"f0.py", "f1.py", "f2.py"})

    assert "incremental" not in result.diagnostics
    assert _key(result) == _key(_full(tree))
_seed(tmp_path, {"a.py": _BODY, "b.py": _BODY}) + other_limits = DuplicationLimits(time_budget_secs=0, max_bucket_windows=128) + assert load_pair_index(cache_dir, WINDOW, other_limits) is None + + (tmp_path / "a.py").write_text(_BODY.replace("doit", "edited")) + parsed = _parsed(tmp_path) + report = _incremental(parsed, cache_dir, {"a.py"}, limits=other_limits) + assert "incremental" not in report.diagnostics + assert _key(report) == _key(_full(parsed, limits=other_limits)) + + +def test_missing_artifact_falls_back(tmp_path: Path): + cache_dir = _seed(tmp_path, {"a.py": _BODY, "b.py": _BODY}) + (cache_dir / _INDEX_FILENAME).unlink() + + parsed = _parsed(tmp_path) + report = _incremental(parsed, cache_dir, {"a.py"}) + assert "incremental" not in report.diagnostics + assert _key(report) == _key(_full(parsed)) + # The fallback full run re-persists the artifact for next time. + assert (cache_dir / _INDEX_FILENAME).exists() + + +def test_corrupt_artifact_falls_back(tmp_path: Path): + cache_dir = _seed(tmp_path, {"a.py": _BODY, "b.py": _BODY}) + (cache_dir / _INDEX_FILENAME).write_bytes(b"not a pickle") + + parsed = _parsed(tmp_path) + report = _incremental(parsed, cache_dir, {"a.py"}) + assert "incremental" not in report.diagnostics + assert _key(report) == _key(_full(parsed)) + + +def test_truncated_state_refuses_to_splice(tmp_path: Path): + """A window-budget-hit run persists a flag that blocks splicing.""" + # Budget admits the first file's windows but trips on the second, so + # the run persists a truncated state (an empty-window run would + # return before persisting anything). 
+ lim = DuplicationLimits(time_budget_secs=0, max_total_windows=35) + files = {"a.py": _BODY, "b.py": _BODY, "c.py": _BODY} + cache_dir = _seed(tmp_path, files, limits=lim) + idx = load_pair_index(cache_dir, WINDOW, lim) + assert idx is not None and idx.window_budget_hit + + (tmp_path / "a.py").write_text(_BODY.replace("doit", "edited")) + parsed = _parsed(tmp_path) + report = _incremental(parsed, cache_dir, {"a.py"}, limits=lim) + assert "incremental" not in report.diagnostics + assert _key(report) == _key(_full(parsed, limits=lim)) + + +def test_unchanged_token_cache_entries_survive_splice(tmp_path: Path): + """The splice path must retain unchanged files' cache entries.""" + import hashlib + + from repowise.core.analysis.health.duplication.token_cache import ( + DuplicationTokenCache, + ) + + cache_dir = _seed(tmp_path, {"a.py": _BODY, "b.py": _BODY, "c.py": _OTHER}) + (tmp_path / "a.py").write_text(_BODY.replace("doit", "edited")) + parsed = _parsed(tmp_path) + report = _incremental(parsed, cache_dir, {"a.py"}) + assert report.diagnostics.get("incremental") is True + + cache = DuplicationTokenCache(cache_dir, WINDOW) + cache.load() + for rel in ("a.py", "b.py", "c.py"): + digest = hashlib.sha256((tmp_path / rel).read_bytes()).hexdigest() + assert cache.get(digest) is not None, rel + + +def test_co_change_weight_applied_live_on_splice(tmp_path: Path): + """Finalize must consume the CURRENT git_meta_map, not persisted state.""" + import json + + cache_dir = _seed(tmp_path, {"a.py": _BODY, "b.py": _BODY, "c.py": _OTHER}) + (tmp_path / "c.py").write_text(_OTHER.replace("other", "edited")) + parsed = _parsed(tmp_path) + meta = { + "a.py": { + "co_change_partners_json": json.dumps([{"file_path": "b.py", "co_change_count": 7}]) + } + } + report = detect_clones( + parsed, + meta, + window_tokens=WINDOW, + min_lines=MIN_LINES, + limits=LIMITS, + cache_dir=cache_dir, + changed_files={"c.py"}, + ) + assert report.diagnostics.get("incremental") is True + ab = [p for p in 
report.pairs if {p.file_a, p.file_b} == {"a.py", "b.py"}] + assert ab and all(p.co_change_count == 7 for p in ab)