|
| 1 | +"""Context assembly service for per-task context packaging. |
| 2 | +
|
| 3 | +Takes a task description + repo, finds the most relevant files via |
| 4 | +semantic search, expands with 1-hop dependencies, matches applicable |
| 5 | +project rules, and returns an assembled context package within a |
| 6 | +token budget. This is the core of OPE-172. |
| 7 | +""" |
| 8 | +import logging |
| 9 | +import re |
| 10 | +from pathlib import Path |
| 11 | +from typing import Any, Dict, List, Optional, Tuple |
| 12 | + |
logger = logging.getLogger(__name__)

# Candidate rule files, highest priority first. Lookup stops at the first
# usable file (the original note says this mirrors dna_extractor).
RULES_FILES = [
    "CLAUDE.md",
    "AGENTS.md",
    ".cursorrules",
    ".codeintel/rules.md",
    "CONVENTIONS.md",
    ".github/copilot-instructions.md",
    "CODING_GUIDELINES.md",
]

# Section headers matching any of these keywords are included for every task,
# whether or not the section mentions one of the discovered files.
ALWAYS_RELEVANT_PATTERNS = re.compile(
    r"(git|commit|workflow|what not to do|never|critical|do not|testing|review)",
    flags=re.IGNORECASE,
)
| 27 | + |
| 28 | + |
| 29 | +def _estimate_tokens(text: str) -> int: |
| 30 | + """Rough token estimate: 1 token per 4 chars.""" |
| 31 | + return len(text) // 4 |
| 32 | + |
| 33 | + |
| 34 | +def _split_rules_into_sections(content: str) -> List[Dict[str, str]]: |
| 35 | + """Split markdown content by ## headers into discrete sections.""" |
| 36 | + sections: List[Dict[str, str]] = [] |
| 37 | + current_header = "" |
| 38 | + current_body: List[str] = [] |
| 39 | + |
| 40 | + for line in content.splitlines(): |
| 41 | + if line.startswith("## "): |
| 42 | + if current_header or current_body: |
| 43 | + sections.append({ |
| 44 | + "header": current_header, |
| 45 | + "body": "\n".join(current_body).strip(), |
| 46 | + }) |
| 47 | + current_header = line |
| 48 | + current_body = [] |
| 49 | + else: |
| 50 | + current_body.append(line) |
| 51 | + |
| 52 | + # Capture the last section |
| 53 | + if current_header or current_body: |
| 54 | + sections.append({ |
| 55 | + "header": current_header, |
| 56 | + "body": "\n".join(current_body).strip(), |
| 57 | + }) |
| 58 | + |
| 59 | + return sections |
| 60 | + |
| 61 | + |
def _read_rules_file(repo_path: Path) -> Tuple[Optional[str], Optional[str]]:
    """Return the text and name of the first usable rules file under ``repo_path``.

    Candidates are tried in ``RULES_FILES`` order. A candidate is skipped when
    it is missing, is not a regular file, cannot be read (a warning is
    logged), or contains only whitespace.

    Returns:
        ``(content, filename)`` for the first usable candidate, where
        ``filename`` is the entry from ``RULES_FILES``; ``(None, None)`` when
        no candidate qualifies.
    """
    for candidate_name in RULES_FILES:
        candidate = repo_path / candidate_name
        if not (candidate.exists() and candidate.is_file()):
            continue

        try:
            # errors="replace" keeps a badly encoded file readable.
            text = candidate.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            logger.warning("Could not read rules file %s: %s", candidate, exc)
            continue

        if text.strip():
            return text, candidate_name

    return None, None
| 74 | + |
| 75 | + |
class ContextAssembler:
    """Assembles per-task context from semantic search + deps + rules."""

    async def assemble(
        self,
        task: str,
        repo_id: str,
        user_id: str,
        token_budget: int = 1500,
    ) -> Dict[str, Any]:
        """Build a context package for a specific coding task.

        Args:
            task: Natural-language description of the task; used as the
                semantic search query and echoed in the package title.
            repo_id: Identifier of the repository to search.
            user_id: Identifier passed to ``get_repo_or_404`` together with
                ``repo_id`` (presumably for an access check -- confirm there).
            token_budget: Approximate token budget for the package. Only the
                rules tier is trimmed to fit; see ``_build_package``.

        Returns:
            A dict with keys ``context`` (markdown string), ``files_found``,
            ``tokens_used``, ``token_budget``, ``rules_source``,
            ``search_hits``, ``dep_files_added`` and ``rule_sections_matched``.
        """
        # Imported at call time, as in the original code (presumably to avoid
        # import cycles at module load -- confirm).
        from dependencies import indexer, get_repo_or_404
        from services.supabase_service import get_supabase_service

        repo = get_repo_or_404(repo_id, user_id)

        # Path("") is the current directory, so a repo without a local
        # checkout must not be turned into a Path: that would pick up the
        # rules file of whatever directory the service is running from.
        raw_local_path = repo.get("local_path") or ""
        local_path = Path(raw_local_path) if raw_local_path else None

        # Step 1: Semantic search for the most relevant files
        search_results = await self._search(task, repo_id, indexer)
        found_files = self._unique_files(search_results)

        # Step 2: Expand with 1-hop dependencies
        dep_files = self._expand_deps(found_files, repo_id, get_supabase_service())

        # Step 3: Match relevant rule sections
        all_files = list(dict.fromkeys(found_files + dep_files))
        rules_content: Optional[str] = None
        rules_source: Optional[str] = None
        if local_path is not None:
            rules_content, rules_source = _read_rules_file(local_path)
        matched_rules = self._match_rules(rules_content, all_files) if rules_content else []

        # Step 4: Assemble within token budget
        context_md = self._build_package(
            task, search_results, found_files, dep_files,
            matched_rules, token_budget,
        )

        return {
            "context": context_md,
            "files_found": len(all_files),
            "tokens_used": _estimate_tokens(context_md),
            "token_budget": token_budget,
            "rules_source": rules_source,
            "search_hits": len(search_results),
            "dep_files_added": len(dep_files),
            "rule_sections_matched": len(matched_rules),
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _search(
        self, task: str, repo_id: str, indexer: Any, top_k: int = 5,
    ) -> List[Dict]:
        """Run semantic search and return the top results.

        Search is best-effort: any failure is logged (with traceback) and an
        empty list is returned so the rest of the package can still be built.
        """
        try:
            results = await indexer.search_v2(
                query=task, repo_id=repo_id, top_k=top_k, use_reranking=True,
            )
        except Exception as exc:
            logger.error("Context search failed: %s", exc, exc_info=True)
            return []
        # Callers take len() of the result, so never hand back None.
        return results if results is not None else []

    @staticmethod
    def _unique_files(results: List[Dict]) -> List[str]:
        """Extract unique, non-empty file paths from search results, preserving order."""
        seen: set = set()
        files: List[str] = []
        for hit in results:
            fp = hit.get("file_path", "")
            if fp and fp not in seen:
                seen.add(fp)
                files.append(fp)
        return files

    @staticmethod
    def _expand_deps(
        seed_files: List[str], repo_id: str, db: Any,
    ) -> List[str]:
        """Add 1-hop imports and dependents for the seed files.

        Args:
            seed_files: Files found by search.
            repo_id: Repository whose dependency rows are loaded.
            db: Object exposing ``get_file_dependencies(repo_id)``, which is
                expected to yield dict rows with ``file_path`` and
                ``depends_on`` keys.

        Returns:
            Files that a seed imports or that import a seed, excluding the
            seeds themselves, de-duplicated, in discovery order (for each
            seed: its imports first, then its dependents). Returns ``[]`` when
            the dependency rows cannot be loaded.
        """
        try:
            all_deps = db.get_file_dependencies(repo_id)
        except Exception as exc:
            logger.warning("Could not load deps for expansion: %s", exc)
            return []

        # Build adjacency maps
        imports_map: Dict[str, List[str]] = {}
        dependents_map: Dict[str, List[str]] = {}
        for row in all_deps or []:
            fp = row.get("file_path") or ""
            if not fp:
                # A row without a path cannot be referenced in the package.
                continue
            # ``depends_on`` may be missing or None; treat both as "no deps"
            # and drop empty entries.
            deps = [dep for dep in (row.get("depends_on") or []) if dep]
            imports_map[fp] = deps
            for dep in deps:
                dependents_map.setdefault(dep, []).append(fp)

        # A single set covers both "is a seed" and "already added".
        seen = set(seed_files)
        expanded: List[str] = []
        for fp in seed_files:
            neighbours = (*imports_map.get(fp, ()), *dependents_map.get(fp, ()))
            for neighbour in neighbours:
                if neighbour not in seen:
                    seen.add(neighbour)
                    expanded.append(neighbour)

        return expanded

    @staticmethod
    def _match_rules(
        rules_content: str, files: List[str],
    ) -> List[Dict[str, str]]:
        """Return rule sections relevant to the discovered files.

        A section matches when its header hits ``ALWAYS_RELEVANT_PATTERNS`` or
        when its header/body contains, as a case-sensitive substring, the file
        name of a discovered file or its stem (stems of 2 characters or fewer
        are ignored to limit accidental matches).
        """
        sections = _split_rules_into_sections(rules_content)

        # Drop empty names: "" is a substring of every string and would make
        # every section match.
        names = {name for name in (Path(f).name for f in files) if name}
        stems = {stem for stem in (Path(f).stem for f in files) if len(stem) > 2}
        needles = names | stems

        matched: List[Dict[str, str]] = []
        for section in sections:
            header = section["header"]

            # Always-relevant sections (git rules, "what not to do", etc.)
            if ALWAYS_RELEVANT_PATTERNS.search(header):
                matched.append(section)
                continue

            # Sections mentioning any discovered file
            combined = f"{header}\n{section['body']}"
            if any(needle in combined for needle in needles):
                matched.append(section)

        return matched

    @staticmethod
    def _build_package(
        task: str,
        search_results: List[Dict],
        found_files: List[str],
        dep_files: List[str],
        matched_rules: List[Dict[str, str]],
        budget: int,
    ) -> str:
        """Assemble the markdown context package.

        Tier 1 (search hits) and tier 2 (up to 10 dependency files) are always
        emitted. Tier 3 (rule sections) is added only while it fits in what is
        left of ``budget``; the first section that does not fit is truncated
        with a trailing ``...`` and later sections are dropped.

        Each rule section is emitted with its header, demoted to ``####`` so
        it nests under the ``### Rules that apply`` heading. Without the
        header, a list under e.g. "What not to do" would read as a list of
        things to do.
        """
        lines: List[str] = [f'## Context for: "{task}"', ""]

        # Tier 1: Relevant files (highest priority)
        if found_files:
            lines.append("### Relevant files")
            for hit in search_results:
                fp = hit.get("file_path", "")
                if not fp:
                    # Same filter as _unique_files: no path, nothing to list.
                    continue
                name = hit.get("qualified_name", hit.get("name", ""))
                score = hit.get("score", 0)
                sig = hit.get("signature", "")
                pct = f"{score * 100:.0f}%" if isinstance(score, float) else str(score)
                desc = sig or name
                lines.append(f"- `{fp}` -- {desc} (relevance: {pct})")
            lines.append("")

        # Tier 2: Dependency files (imports and dependents of the hits)
        if dep_files:
            lines.append("### Depends on")
            for fp in dep_files[:10]:
                lines.append(f"- `{fp}`")
            lines.append("")

        # Budget left for rules, counting the rules heading itself.
        rules_heading = "### Rules that apply"
        remaining = budget - _estimate_tokens("\n".join(lines + [rules_heading]))

        # Tier 3: Matched rules
        if matched_rules and remaining > 50:
            ellipsis = "..."
            lines.append(rules_heading)
            for section in matched_rules:
                header = section["header"]
                if header.startswith("## "):
                    header = "##" + header  # "## X" -> "#### X"
                section_text = f"{header}\n{section['body']}" if header else section["body"]
                section_tokens = _estimate_tokens(section_text)

                if section_tokens <= remaining:
                    lines.append(section_text)
                    remaining -= section_tokens
                    continue

                # Truncate the last section to fit. Skip it entirely when no
                # room is left, rather than emitting a bare "...".
                chars_left = remaining * 4 - len(ellipsis)
                if chars_left > 0:
                    lines.append(section_text[:chars_left].rstrip() + ellipsis)
                break
            lines.append("")

        return "\n".join(lines)
0 commit comments