Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/routes/analysis.py
Original file line number Diff line number Diff line change
@@ -134,7 +134,7 @@ async def get_style_analysis(
return {**cached_style, "cached": True}

logger.info("Analyzing code style", repo_id=repo_id)
style_data = style_analyzer.analyze_repository_style(repo["local_path"])
style_data = style_analyzer.analyze_repository_style(repo["local_path"], include_paths=repo.get("include_paths"))
style_analyzer.save_to_cache(repo_id, style_data)

return {**style_data, "cached": False}
@@ -178,7 +178,7 @@ async def get_codebase_dna(
logger.info("Extracting codebase DNA", repo_id=repo_id)
metrics.increment("dna_extractions")

dna = dna_extractor.extract_dna(repo["local_path"], repo_id)
dna = dna_extractor.extract_dna(repo["local_path"], repo_id, include_paths=repo.get("include_paths"))
dna_extractor.save_to_cache(repo_id, dna)

if format == "markdown":
Expand Down
51 changes: 39 additions & 12 deletions backend/services/dna_extractor.py
Original file line number Diff line number Diff line change
@@ -361,21 +361,33 @@ def _detect_language(self, file_path: str) -> str:
'.tsx': 'typescript',
}.get(ext, 'unknown')

def _discover_files(self, repo_path: Path) -> List[Path]:
"""Find all code files, skipping irrelevant directories and symlinks"""
def _discover_files(self, repo_path: Path, include_paths: Optional[List[str]] = None) -> List[Path]:
"""Find all code files, skipping irrelevant directories and symlinks.

If include_paths is set, only files within those directories are returned.
"""
files = []
extensions = {'.py', '.js', '.jsx', '.ts', '.tsx', '.sql'}

try:
for item in repo_path.rglob('*'):
if item.is_symlink():
continue
if item.is_file() and item.suffix in extensions:
if not any(skip in item.parts for skip in self.SKIP_DIRS):
files.append(item)
if len(files) >= self.MAX_FILES:
logger.warning(f"Hit max file limit ({self.MAX_FILES})")
break
if not item.is_file() or item.suffix not in extensions:
continue
if any(skip in item.parts for skip in self.SKIP_DIRS):
continue
if include_paths:
rel_parts = item.relative_to(repo_path).parts
if not any(
rel_parts[:len(Path(p).parts)] == Path(p).parts
for p in include_paths
):
continue
files.append(item)
if len(files) >= self.MAX_FILES:
logger.warning(f"Hit max file limit ({self.MAX_FILES})")
break
except Exception as e:
logger.error(f"Error discovering files: {e}")

@@ -914,8 +926,11 @@ def _extract_config_patterns(self, files: List[Path], repo_path: Path) -> Config

return pattern

def extract_dna(self, repo_path: str, repo_id: str) -> CodebaseDNA:
"""Extract complete DNA profile from a codebase"""
def extract_dna(self, repo_path: str, repo_id: str, include_paths: Optional[List[str]] = None) -> CodebaseDNA:
"""Extract complete DNA profile from a codebase.

If include_paths is set, only files within those directories are analyzed.
"""
import time
start_time = time.time()

@@ -929,13 +944,25 @@ def extract_dna(self, repo_path: str, repo_id: str) -> CodebaseDNA:
logger.error(f"Repo path is not a directory: {repo_path}")
raise ValueError(f"Repository path is not a directory: {repo_path}")

# Sanitize include_paths (could be corrupt jsonb from DB)
if include_paths:
cleaned = []
for p in include_paths:
if not isinstance(p, str):
continue
p = p.replace('\\', '/').strip().strip('/')
if not p or '..' in p.split('/'):
continue
cleaned.append(p)
include_paths = cleaned or None

# reset cache for fresh extraction
self._reset_cache()

logger.info("Extracting codebase DNA", repo_id=repo_id, path=str(repo_path))
logger.info("Extracting codebase DNA", repo_id=repo_id, path=str(repo_path), include_paths=include_paths)

# Discover files
files = self._discover_files(repo_path)
files = self._discover_files(repo_path, include_paths=include_paths)
logger.info(f"Found {len(files)} code files")

# Detect framework first
Expand Down
33 changes: 28 additions & 5 deletions backend/services/style_analyzer.py
Original file line number Diff line number Diff line change
@@ -139,11 +139,26 @@ def _check_type_hints(self, source_code: str, language: str) -> bool:
else:
return ': ' in source_code and ('interface' in source_code or 'type ' in source_code)

def analyze_repository_style(self, repo_path: str) -> Dict:
"""Analyze coding style patterns across repository"""
def analyze_repository_style(self, repo_path: str, include_paths: Optional[List[str]] = None) -> Dict:
"""Analyze coding style patterns across repository.

If include_paths is set, only files within those directories are analyzed.
"""
repo_path = Path(repo_path)

logger.info("Analyzing code style for repository")
# Sanitize include_paths (could be corrupt jsonb from DB)
if include_paths:
cleaned = []
for p in include_paths:
if not isinstance(p, str):
continue
p = p.replace('\\', '/').strip().strip('/')
if not p or '..' in p.split('/'):
continue
cleaned.append(p)
include_paths = cleaned or None

logger.info("Analyzing code style for repository", include_paths=include_paths)

# Discover code files
code_files = []
@@ -155,8 +170,16 @@ def analyze_repository_style(self, repo_path: str) -> Dict:
continue
if any(skip in file_path.parts for skip in skip_dirs):
continue
if file_path.suffix in extensions:
code_files.append(file_path)
if file_path.suffix not in extensions:
continue
if include_paths:
rel_parts = file_path.relative_to(repo_path).parts
if not any(
rel_parts[:len(Path(p).parts)] == Path(p).parts
for p in include_paths
):
continue
code_files.append(file_path)

# Collect style data
function_names = []
Expand Down