diff --git a/java-ecosystem/libs/analysis-engine/src/main/java/org/rostilos/codecrow/analysisengine/aiclient/AiAnalysisClient.java b/java-ecosystem/libs/analysis-engine/src/main/java/org/rostilos/codecrow/analysisengine/aiclient/AiAnalysisClient.java index d31b136a..bf2b7a8b 100644 --- a/java-ecosystem/libs/analysis-engine/src/main/java/org/rostilos/codecrow/analysisengine/aiclient/AiAnalysisClient.java +++ b/java-ecosystem/libs/analysis-engine/src/main/java/org/rostilos/codecrow/analysisengine/aiclient/AiAnalysisClient.java @@ -160,7 +160,7 @@ public Map performAnalysis(AiAnalysisRequest request, java.util. /** * Extracts analysis data from nested response structure. - * Expected: response -> result -> {comment, issues} + * Expected: response -> result -> {comment, issues, inference_stats} * Issues can be either a List (array) or Map (object with numeric keys) */ private Map extractAndValidateAnalysisData(Map response) throws IOException { @@ -176,6 +176,15 @@ private Map extractAndValidateAnalysisData(Map response) throws if (result == null) { throw new IOException("Missing 'result' field in AI response"); } + + // Check for error response from MCP client + Object errorFlag = result.get("error"); + if (Boolean.TRUE.equals(errorFlag) || "true".equals(String.valueOf(errorFlag))) { + String errorMessage = result.get("error_message") != null + ? String.valueOf(result.get("error_message")) + : String.valueOf(result.get("comment")); + throw new IOException("Analysis failed: " + errorMessage); + } if (!result.containsKey("comment") || !result.containsKey("issues")) { throw new IOException("Analysis data missing required fields: 'comment' and/or 'issues'"); diff --git a/python-ecosystem/mcp-client/model/models.py b/python-ecosystem/mcp-client/model/models.py index e6aa26e8..a7a0830c 100644 --- a/python-ecosystem/mcp-client/model/models.py +++ b/python-ecosystem/mcp-client/model/models.py @@ -166,6 +166,9 @@ class CodeReviewIssue(BaseModel): suggestedFixDescription: str = Field(description="Description of the suggested fix") suggestedFixDiff: Optional[str] = Field(default=None, description="Optional unified diff format patch for the fix") isResolved: bool = Field(default=False, description="Whether this issue from previous analysis is resolved") + # Resolution tracking fields + resolutionExplanation: Optional[str] = Field(default=None, description="Explanation of how the issue was resolved (separate from original reason)") + resolvedInCommit: Optional[str] = Field(default=None, description="Commit hash where the issue was resolved") # Additional fields preserved from previous issues during reconciliation visibility: Optional[str] = Field(default=None, description="Issue visibility status") codeSnippet: Optional[str] = Field(default=None, description="Code snippet associated with the issue") diff --git a/python-ecosystem/mcp-client/service/multi_stage_orchestrator.py b/python-ecosystem/mcp-client/service/multi_stage_orchestrator.py index bf662f22..38508932 100644 --- a/python-ecosystem/mcp-client/service/multi_stage_orchestrator.py +++ b/python-ecosystem/mcp-client/service/multi_stage_orchestrator.py @@ -1,6 +1,7 @@ import logging import asyncio import json +import re from typing import Dict, Any, List, Optional, Callable from model.models import ( @@ -112,6 +113,78 @@ def __init__( self.rag_client = rag_client self.event_callback = event_callback self.max_parallel_stage_1 = 5 # Limit parallel execution to avoid rate limits + # PR-specific RAG indexing (data goes into main collection with PR 
metadata) + self._pr_number: Optional[int] = None + self._pr_indexed: bool = False + + async def _index_pr_files( + self, + request: ReviewRequestDto, + processed_diff: Optional[ProcessedDiff] + ) -> None: + """ + Index PR files into the main RAG collection with PR-specific metadata. + This enables hybrid queries that prioritize PR data over stale branch data. + """ + if not self.rag_client or not processed_diff: + return + + pr_number = request.pullRequestId + if not pr_number: + logger.info("No PR number, skipping PR file indexing") + return + + # Prepare files for indexing + # Prefer full_content if available, otherwise use diff content + # Diff content still provides value for understanding what changed + files = [] + for f in processed_diff.get_included_files(): + content = f.full_content or f.content # Use full content if available, fallback to diff + if content and f.change_type.value != "DELETED": + files.append({ + "path": f.path, + "content": content, + "change_type": f.change_type.value if hasattr(f.change_type, 'value') else str(f.change_type) + }) + + if not files: + logger.info("No files to index for PR") + return + + try: + result = await self.rag_client.index_pr_files( + workspace=request.projectWorkspace, + project=request.projectNamespace, + pr_number=pr_number, + branch=request.targetBranchName or "unknown", + files=files + ) + if result.get("status") == "indexed": + self._pr_number = pr_number + self._pr_indexed = True + logger.info(f"Indexed PR #{pr_number}: {result.get('chunks_indexed', 0)} chunks") + else: + logger.warning(f"Failed to index PR files: {result}") + except Exception as e: + logger.warning(f"Error indexing PR files: {e}") + + async def _cleanup_pr_files(self, request: ReviewRequestDto) -> None: + """Delete PR-indexed data after analysis completes.""" + if not self._pr_indexed or not self._pr_number or not self.rag_client: + return + + try: + await self.rag_client.delete_pr_files( + workspace=request.projectWorkspace, + project=request.projectNamespace, + pr_number=self._pr_number + ) + logger.info(f"Cleaned up PR #{self._pr_number} indexed data") + except Exception as e: + logger.warning(f"Failed to cleanup PR files: {e}") + finally: + self._pr_number = None + self._pr_indexed = False async def execute_branch_analysis(self, prompt: str) -> Dict[str, Any]: """ @@ -179,7 +252,15 @@ async def orchestrate_review( else: logger.info("FULL mode: initial PR review") + # Generate unique ID for temp diff collection + analysis_id = f"{request.projectId}_{request.pullRequestId or request.commitHash or 'unknown'}" + try: + # === Index PR files into RAG for hybrid queries === + # This indexes PR files with metadata (pr=true, pr_number=X) to enable + # queries that prioritize fresh PR data over potentially stale branch data + await self._index_pr_files(request, processed_diff) + # === STAGE 0: Planning === self._emit_status("stage_0_started", "Stage 0: Planning & Prioritization...") review_plan = await self._execute_stage_0_planning(request, is_incremental) @@ -226,6 +307,9 @@ async def orchestrate_review( logger.error(f"Multi-stage review failed: {e}", exc_info=True) self._emit_error(str(e)) raise + finally: + # Cleanup PR-indexed data + await self._cleanup_pr_files(request) async def _reconcile_previous_issues( self, @@ -238,12 +322,16 @@ async def _reconcile_previous_issues( - Mark resolved issues as isResolved=true - Update line numbers for persisting issues - Merge with new issues found in delta diff + - PRESERVE original issue data (reason, 
suggestedFixDescription, suggestedFixDiff) """ if not request.previousCodeAnalysisIssues: return new_issues logger.info(f"Reconciling {len(request.previousCodeAnalysisIssues)} previous issues with {len(new_issues)} new issues") + # Current commit for resolution tracking + current_commit = request.currentCommitHash or request.commitHash + # Get the delta diff content to check what files/lines changed delta_diff = request.deltaDiff or "" @@ -253,25 +341,70 @@ async def _reconcile_previous_issues( for f in processed_diff.files: changed_files_in_delta.add(f.path) - reconciled_issues = list(new_issues) # Start with new issues + # Build lookup of previous issues by ID for merging with LLM results + prev_issues_by_id = {} + for prev_issue in request.previousCodeAnalysisIssues: + if hasattr(prev_issue, 'model_dump'): + prev_data = prev_issue.model_dump() + else: + prev_data = prev_issue if isinstance(prev_issue, dict) else vars(prev_issue) + issue_id = prev_data.get('id') + if issue_id: + prev_issues_by_id[str(issue_id)] = prev_data + + reconciled_issues = [] + processed_prev_ids = set() # Track which previous issues we've handled - # Process each previous issue + # Process new issues from LLM - merge with previous issue data if they reference same ID + for new_issue in new_issues: + new_data = new_issue.model_dump() if hasattr(new_issue, 'model_dump') else new_issue + issue_id = new_data.get('id') + + # If this issue references a previous issue ID, merge data + if issue_id and str(issue_id) in prev_issues_by_id: + prev_data = prev_issues_by_id[str(issue_id)] + processed_prev_ids.add(str(issue_id)) + + # Check if LLM marked it resolved + is_resolved = new_data.get('isResolved', False) + + # PRESERVE original data, use LLM's reason as resolution explanation + merged_issue = CodeReviewIssue( + id=str(issue_id), + severity=(prev_data.get('severity') or prev_data.get('issueSeverity') or 'MEDIUM').upper(), + category=prev_data.get('category') or prev_data.get('issueCategory') or prev_data.get('type') or 'CODE_QUALITY', + file=prev_data.get('file') or prev_data.get('filePath') or new_data.get('file', 'unknown'), + line=str(prev_data.get('line') or prev_data.get('lineNumber') or new_data.get('line', '1')), + # PRESERVE original reason and fix description + reason=prev_data.get('reason') or prev_data.get('title') or prev_data.get('description') or '', + suggestedFixDescription=prev_data.get('suggestedFixDescription') or prev_data.get('suggestedFix') or '', + suggestedFixDiff=prev_data.get('suggestedFixDiff') or None, + isResolved=is_resolved, + # Store LLM's explanation separately if resolved + resolutionExplanation=new_data.get('reason') if is_resolved else None, + resolvedInCommit=current_commit if is_resolved else None, + visibility=prev_data.get('visibility'), + codeSnippet=prev_data.get('codeSnippet') + ) + reconciled_issues.append(merged_issue) + else: + # New issue not referencing previous - keep as is + reconciled_issues.append(new_issue) + + # Process remaining previous issues not handled by LLM for prev_issue in request.previousCodeAnalysisIssues: - # Convert to dict if needed if hasattr(prev_issue, 'model_dump'): prev_data = prev_issue.model_dump() else: prev_data = prev_issue if isinstance(prev_issue, dict) else vars(prev_issue) - # Debug log to verify field mapping - logger.debug(f"Previous issue data: reason={prev_data.get('reason')}, " - f"suggestedFixDescription={prev_data.get('suggestedFixDescription')}, " - f"suggestedFixDiff={prev_data.get('suggestedFixDiff')[:50] if 
prev_data.get('suggestedFixDiff') else None}") + issue_id = prev_data.get('id') + if issue_id and str(issue_id) in processed_prev_ids: + continue # Already handled above file_path = prev_data.get('file', prev_data.get('filePath', '')) - issue_id = prev_data.get('id') - # Check if this issue was already found in new issues (by file+line or ID) + # Check if this issue was already found in new issues (by file+line) already_reported = False for new_issue in new_issues: new_data = new_issue.model_dump() if hasattr(new_issue, 'model_dump') else new_issue @@ -279,34 +412,11 @@ async def _reconcile_previous_issues( str(new_data.get('line')) == str(prev_data.get('line', prev_data.get('lineNumber')))): already_reported = True break - if issue_id and new_data.get('id') == issue_id: - already_reported = True - break if already_reported: - continue # Already in new issues, skip - - # Check if the file was modified in delta diff - file_in_delta = any(file_path.endswith(f) or f.endswith(file_path) for f in changed_files_in_delta) - - # If file wasn't touched in delta, issue persists unchanged - # If file was touched, we need to check if the specific line was modified - is_resolved = False - if file_in_delta: - # Simple heuristic: if file changed and we didn't re-report this issue, - # it might be resolved. But we should be conservative here. - # For now, we'll re-report it as persisting unless LLM marked it resolved - pass + continue - # Preserve all original issue data - just pass through as CodeReviewIssue - # Field mapping from Java DTO: - # reason (or title for legacy) -> reason - # severity (uppercase) -> severity - # category (or issueCategory) -> category - # file -> file - # line -> line - # suggestedFixDescription -> suggestedFixDescription - # suggestedFixDiff -> suggestedFixDiff + # Preserve all original issue data persisting_issue = CodeReviewIssue( id=str(issue_id) if issue_id else None, severity=(prev_data.get('severity') or prev_data.get('issueSeverity') or 'MEDIUM').upper(), @@ -316,7 +426,7 @@ async def _reconcile_previous_issues( reason=prev_data.get('reason') or prev_data.get('title') or prev_data.get('description') or '', suggestedFixDescription=prev_data.get('suggestedFixDescription') or prev_data.get('suggestedFix') or '', suggestedFixDiff=prev_data.get('suggestedFixDiff') or None, - isResolved=is_resolved, + isResolved=False, visibility=prev_data.get('visibility'), codeSnippet=prev_data.get('codeSnippet') ) @@ -399,6 +509,54 @@ def _format_previous_issues_for_batch(self, issues: List[Any]) -> str: lines.append("=== END PREVIOUS ISSUES ===") return "\n".join(lines) + def _extract_symbols_from_diff(self, diff_content: str) -> List[str]: + """ + Extract potential symbols (identifiers, class names, function names) from diff. + Used to query cross-file context for related changes. 
+ """ + if not diff_content: + return [] + + # Common language keywords/stop-words to filter out + STOP_WORDS = { + # Python + 'import', 'from', 'class', 'def', 'return', 'if', 'else', 'elif', + 'for', 'while', 'try', 'except', 'finally', 'with', 'as', 'pass', + 'break', 'continue', 'raise', 'yield', 'lambda', 'async', 'await', + 'True', 'False', 'None', 'and', 'or', 'not', 'in', 'is', + # Java/TS/JS + 'public', 'private', 'protected', 'static', 'final', 'void', + 'new', 'this', 'super', 'extends', 'implements', 'interface', + 'abstract', 'const', 'let', 'var', 'function', 'export', 'default', + 'throw', 'throws', 'catch', 'instanceof', 'typeof', 'null', + # Common + 'true', 'false', 'null', 'undefined', 'self', 'args', 'kwargs', + 'string', 'number', 'boolean', 'object', 'array', 'list', 'dict', + } + + symbols = set() + + # Patterns for common identifiers + # Match CamelCase identifiers (likely class/component names) + camel_case = re.findall(r'\b([A-Z][a-z]+[A-Z][a-zA-Z]*)\b', diff_content) + symbols.update(camel_case) + + # Match snake_case identifiers (variables, functions) + snake_case = re.findall(r'\b([a-z][a-z0-9]*(?:_[a-z0-9]+)+)\b', diff_content) + symbols.update(s for s in snake_case if len(s) > 5) # Filter short ones + + # Match assignments and function calls + assignments = re.findall(r'\b(\w+)\s*[=:]\s*', diff_content) + symbols.update(a for a in assignments if len(a) > 3) + + # Match import statements + imports = re.findall(r'(?:from|import)\s+([a-zA-Z_][a-zA-Z0-9_.]+)', diff_content) + symbols.update(imports) + + # Filter out stop-words and return + filtered = [s for s in symbols if s.lower() not in STOP_WORDS and len(s) > 2] + return filtered[:20] # Limit to top 20 symbols + def _extract_diff_snippets(self, diff_content: str) -> List[str]: """ Extract meaningful code snippets from diff content for RAG semantic search. @@ -583,6 +741,9 @@ async def _fetch_batch_rag_context( """ Fetch RAG context specifically for this batch of files. Uses batch file paths and diff snippets for targeted semantic search. + + In hybrid mode (when PR files are indexed), passes pr_number to enable + queries that prioritize fresh PR data over potentially stale branch data. """ if not self.rag_client: return None @@ -593,6 +754,10 @@ async def _fetch_batch_rag_context( logger.info(f"Fetching per-batch RAG context for {len(batch_file_paths)} files") + # Use hybrid mode if PR files were indexed + pr_number = request.pullRequestId if self._pr_indexed else None + all_pr_files = request.changedFiles if self._pr_indexed else None + rag_response = await self.rag_client.get_pr_context( workspace=request.projectWorkspace, project=request.projectNamespace, @@ -601,7 +766,9 @@ async def _fetch_batch_rag_context( diff_snippets=batch_diff_snippets, pr_title=request.prTitle, pr_description=request.prDescription, - top_k=10 # Fewer chunks per batch for focused context + top_k=10, # Fewer chunks per batch for focused context + pr_number=pr_number, + all_pr_changed_files=all_pr_files ) if rag_response and rag_response.get("context"): @@ -631,7 +798,8 @@ async def _review_file_batch( batch_files_data = [] batch_file_paths = [] batch_diff_snippets = [] - project_rules = "1. No hardcoded secrets.\n2. Use dependency injection.\n3. Verify all inputs." 
+ #TODO: Project custom rules + project_rules = "" # For incremental mode, use deltaDiff instead of full diff diff_source = None @@ -676,6 +844,7 @@ async def _review_file_batch( ) # Use batch-specific RAG context if available, otherwise fall back to initial context + # Hybrid mode: PR-indexed data is already included via _fetch_batch_rag_context if batch_rag_context: logger.info(f"Using per-batch RAG context for: {batch_file_paths}") rag_context_text = self._format_rag_context( diff --git a/python-ecosystem/mcp-client/service/rag_client.py b/python-ecosystem/mcp-client/service/rag_client.py index ae9ba481..f6a1fa92 100644 --- a/python-ecosystem/mcp-client/service/rag_client.py +++ b/python-ecosystem/mcp-client/service/rag_client.py @@ -65,7 +65,9 @@ async def get_pr_context( enable_priority_reranking: bool = True, min_relevance_score: float = None, base_branch: Optional[str] = None, - deleted_files: Optional[List[str]] = None + deleted_files: Optional[List[str]] = None, + pr_number: Optional[int] = None, + all_pr_changed_files: Optional[List[str]] = None ) -> Dict[str, Any]: """ Get relevant context for PR review with multi-branch support. @@ -83,6 +85,8 @@ async def get_pr_context( min_relevance_score: Minimum relevance threshold (default from RAG_MIN_RELEVANCE_SCORE) base_branch: Base branch (PR target, e.g., 'main'). Auto-detected if not provided. deleted_files: Files deleted in target branch (excluded from results) + pr_number: If set, enables hybrid query with PR-indexed data priority + all_pr_changed_files: All files in PR (for exclusion from branch query in hybrid mode) Returns: Dict with context information or empty dict if RAG is disabled @@ -123,6 +127,12 @@ async def get_pr_context( payload["base_branch"] = base_branch if deleted_files: payload["deleted_files"] = deleted_files + + # Add hybrid mode parameters + if pr_number: + payload["pr_number"] = pr_number + if all_pr_changed_files: + payload["all_pr_changed_files"] = all_pr_changed_files client = await self._get_client() response = await client.post( @@ -284,3 +294,109 @@ async def get_deterministic_context( except Exception as e: logger.error(f"Unexpected error in deterministic RAG query: {e}") return {"context": {"chunks": [], "by_identifier": {}, "by_file": {}}} + + # ========================================================================= + # PR File Indexing Methods (for PR-specific RAG layer) + # ========================================================================= + + async def index_pr_files( + self, + workspace: str, + project: str, + pr_number: int, + branch: str, + files: List[Dict[str, str]] + ) -> Dict[str, Any]: + """ + Index PR files into the main collection with PR-specific metadata. + + Files are indexed with metadata (pr=true, pr_number=X) to enable + hybrid queries that prioritize PR data over branch data. + + Existing PR points for the same pr_number are deleted first. 
+ + Args: + workspace: Workspace identifier + project: Project identifier + pr_number: PR number for metadata tagging + branch: Source branch name + files: List of {path: str, content: str, change_type: str} + + Returns: + Dict with indexing status and chunk counts + """ + if not self.enabled: + logger.debug("RAG disabled, skipping PR file indexing") + return {"status": "skipped", "chunks_indexed": 0} + + if not files: + logger.debug("No files to index for PR") + return {"status": "skipped", "chunks_indexed": 0} + + try: + payload = { + "workspace": workspace, + "project": project, + "pr_number": pr_number, + "branch": branch, + "files": files + } + + client = await self._get_client() + response = await client.post( + f"{self.base_url}/index/pr-files", + json=payload, + timeout=120.0 # Longer timeout for indexing + ) + response.raise_for_status() + result = response.json() + + logger.info(f"Indexed PR #{pr_number}: {result.get('chunks_indexed', 0)} chunks from {result.get('files_processed', 0)} files") + return result + + except httpx.HTTPError as e: + logger.warning(f"Failed to index PR files: {e}") + return {"status": "error", "error": str(e)} + except Exception as e: + logger.error(f"Unexpected error indexing PR files: {e}") + return {"status": "error", "error": str(e)} + + async def delete_pr_files( + self, + workspace: str, + project: str, + pr_number: int + ) -> bool: + """ + Delete all indexed points for a specific PR. + + Called after analysis completes to clean up PR-specific data. + + Args: + workspace: Workspace identifier + project: Project identifier + pr_number: PR number to delete + + Returns: + True if deleted successfully, False otherwise + """ + if not self.enabled: + return True + + try: + client = await self._get_client() + response = await client.delete( + f"{self.base_url}/index/pr-files/{workspace}/{project}/{pr_number}" + ) + response.raise_for_status() + result = response.json() + + logger.info(f"Deleted PR #{pr_number} indexed data") + return result.get("status") == "deleted" + + except httpx.HTTPError as e: + logger.warning(f"Failed to delete PR files: {e}") + return False + except Exception as e: + logger.error(f"Unexpected error deleting PR files: {e}") + return False diff --git a/python-ecosystem/mcp-client/utils/diff_processor.py b/python-ecosystem/mcp-client/utils/diff_processor.py index 913c2f04..17616bfe 100644 --- a/python-ecosystem/mcp-client/utils/diff_processor.py +++ b/python-ecosystem/mcp-client/utils/diff_processor.py @@ -50,7 +50,8 @@ class DiffFile: old_path: Optional[str] = None # For renamed files additions: int = 0 deletions: int = 0 - content: str = "" + content: str = "" # Diff content (unified diff format) + full_content: Optional[str] = None # Full file content (populated separately if needed) hunks: List[str] = field(default_factory=list) is_binary: bool = False is_skipped: bool = False diff --git a/python-ecosystem/mcp-client/utils/prompts/prompt_builder.py b/python-ecosystem/mcp-client/utils/prompts/prompt_builder.py index b3cad5b1..4edba35f 100644 --- a/python-ecosystem/mcp-client/utils/prompts/prompt_builder.py +++ b/python-ecosystem/mcp-client/utils/prompts/prompt_builder.py @@ -73,7 +73,8 @@ def build_stage_1_batch_prompt( project_rules: str = "", rag_context: str = "", is_incremental: bool = False, - previous_issues: str = "" + previous_issues: str = "", + all_pr_files: List[str] = None # All files in this PR for cross-file awareness ) -> str: """ Build prompt for Stage 1: Batch File Review. 
@@ -103,6 +104,20 @@ def build_stage_1_batch_prompt( This is a follow-up review after the PR was updated with new commits. The diff above shows ONLY the changes since the last review - focus on these NEW changes. For any previous issues listed below, check if they are RESOLVED in the new changes. +""" + + # Add PR-wide file list for cross-batch awareness + pr_files_context = "" + if all_pr_files: + current_batch_files = [f['path'] for f in files] + other_files = [fp for fp in all_pr_files if fp not in current_batch_files] + if other_files: + pr_files_context = f""" +## OTHER FILES IN THIS PR (for cross-file awareness) +This PR also modifies these files (reviewed in other batches): +{chr(10).join('- ' + fp for fp in other_files[:20])} +{'... and ' + str(len(other_files) - 20) + ' more files' if len(other_files) > 20 else ''} +Consider potential interactions with these files when reviewing. """ return STAGE_1_BATCH_PROMPT_TEMPLATE.format( @@ -111,7 +126,8 @@ def build_stage_1_batch_prompt( files_context=files_context, rag_context=rag_context or "(No additional codebase context available)", incremental_instructions=incremental_instructions, - previous_issues=previous_issues + previous_issues=previous_issues, + pr_files_context=pr_files_context ) @staticmethod diff --git a/python-ecosystem/mcp-client/utils/response_parser.py b/python-ecosystem/mcp-client/utils/response_parser.py index fc169f47..b49c14a9 100644 --- a/python-ecosystem/mcp-client/utils/response_parser.py +++ b/python-ecosystem/mcp-client/utils/response_parser.py @@ -703,21 +703,14 @@ def create_error_response(error_message: str, exception_str: str = "") -> Dict[s exception_str: Optional exception details Returns: - Structured error response dictionary with issues as list + Structured error response dictionary marked as error (no fake issues) """ full_message = f"{error_message}: {exception_str}" if exception_str else error_message return { + "status": "error", "comment": full_message, - "issues": [ - { - "severity": "HIGH", - "category": "ERROR_HANDLING", - "file": "system", - "line": "0", - "reason": full_message, - "suggestedFixDescription": "Check system configuration and connectivity", - "isResolved": False - } - ] + "issues": [], # Don't create fake issues for errors - let Java handle error state properly + "error": True, + "error_message": full_message } \ No newline at end of file diff --git a/python-ecosystem/rag-pipeline/src/rag_pipeline/api/api.py b/python-ecosystem/rag-pipeline/src/rag_pipeline/api/api.py index 2ea2c73a..675e063b 100644 --- a/python-ecosystem/rag-pipeline/src/rag_pipeline/api/api.py +++ b/python-ecosystem/rag-pipeline/src/rag_pipeline/api/api.py @@ -1,8 +1,11 @@ import logging import gc -from typing import List, Optional +import uuid +from typing import Dict, List, Optional from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel +from llama_index.core.schema import TextNode +from qdrant_client.models import PointStruct from ..models.config import RAGConfig, IndexStats from ..core.index_manager import RAGIndexManager @@ -65,6 +68,9 @@ class PRContextRequest(BaseModel): enable_priority_reranking: Optional[bool] = True min_relevance_score: Optional[float] = 0.7 deleted_files: Optional[List[str]] = [] # Files deleted in target branch + # NEW: PR-specific hybrid query mode + pr_number: Optional[int] = None # If set, enables hybrid query with PR data priority + all_pr_changed_files: Optional[List[str]] = [] # All files in PR (for exclusion from branch query) class 
DeterministicContextRequest(BaseModel): @@ -382,15 +388,19 @@ def semantic_search(request: QueryRequest): @app.post("/query/pr-context") def get_pr_context(request: PRContextRequest): """ - Get context for PR review with multi-branch support. + Get context for PR review with multi-branch support and optional PR-specific hybrid mode. - Queries both target branch and base branch to preserve cross-file relationships. - Results are deduplicated with target branch taking priority. + When pr_number is provided, uses HYBRID query: + 1. Query PR-indexed chunks (pr=true, pr_number=X) - these are the actual changed files + 2. Query branch data, excluding files that are in the PR (to get unchanged dependencies) + 3. Merge results with PR data taking priority Args: branch: Target branch (PR source branch) base_branch: Base branch (PR target, e.g., 'main'). Auto-detected if not provided. deleted_files: Files deleted in target branch (excluded from results) + pr_number: If set, enables hybrid query with PR data priority + all_pr_changed_files: Files to exclude from branch query (PR files already indexed separately) """ try: # If branch is not provided, return empty context @@ -409,6 +419,21 @@ def get_pr_context(request: PRContextRequest): } } + pr_results = [] + + # HYBRID MODE: Query PR-indexed data first if pr_number is provided + if request.pr_number: + pr_results = _query_pr_indexed_data( + workspace=request.workspace, + project=request.project, + pr_number=request.pr_number, + query_texts=request.diff_snippets or [], + pr_title=request.pr_title, + top_k=request.top_k or 15 + ) + logger.info(f"Hybrid mode: Found {len(pr_results)} PR-specific chunks for PR #{request.pr_number}") + + # Get branch context (with optional exclusion of PR files) context = query_service.get_context_for_pr( workspace=request.workspace, project=request.project, @@ -421,16 +446,42 @@ def get_pr_context(request: PRContextRequest): enable_priority_reranking=request.enable_priority_reranking, min_relevance_score=request.min_relevance_score, base_branch=request.base_branch, - deleted_files=request.deleted_files or [] + deleted_files=request.deleted_files or [], + # Pass PR files to exclude if in hybrid mode + exclude_pr_files=request.all_pr_changed_files if request.pr_number else [] ) + # Merge PR results with branch results (PR first, then branch) + if pr_results: + existing_paths = set() + merged_code = [] + + # PR results first (highest priority - fresh data) + for pr_chunk in pr_results: + path = pr_chunk.get("path", "") + if path not in existing_paths: + merged_code.append(pr_chunk) + existing_paths.add(path) + + # Then branch results (excluding paths already covered by PR) + for branch_chunk in context.get("relevant_code", []): + path = branch_chunk.get("path", "") + if path not in existing_paths: + merged_code.append(branch_chunk) + existing_paths.add(path) + + context["relevant_code"] = merged_code + context["_pr_chunks_count"] = len(pr_results) + # Add metadata about processing context["_metadata"] = { "priority_reranking_enabled": request.enable_priority_reranking, "min_relevance_score": request.min_relevance_score, "changed_files_count": len(request.changed_files), "result_count": len(context.get("relevant_code", [])), - "branches_searched": context.get("_branches_searched", [request.branch]) + "branches_searched": context.get("_branches_searched", [request.branch]), + "hybrid_mode": request.pr_number is not None, + "pr_number": request.pr_number } return {"context": context} @@ -439,6 +490,76 @@ def 
get_pr_context(request: PRContextRequest): raise HTTPException(status_code=500, detail=str(e)) +def _query_pr_indexed_data( + workspace: str, + project: str, + pr_number: int, + query_texts: List[str], + pr_title: Optional[str], + top_k: int = 15 +) -> List[Dict]: + """ + Query PR-indexed chunks from the main collection. + + Filters by pr=true and pr_number to get only PR-specific data. + """ + from qdrant_client.models import Filter, FieldCondition, MatchValue + + collection_name = index_manager._get_project_collection_name(workspace, project) + + if not index_manager._collection_manager.collection_exists(collection_name): + return [] + + # Build query from snippets and title + query_parts = [] + if pr_title: + query_parts.append(pr_title) + if query_texts: + query_parts.extend(query_texts[:5]) # Limit snippets + + if not query_parts: + # If no query, just get all PR chunks + query_text = f"code changes for PR {pr_number}" + else: + query_text = " ".join(query_parts)[:1000] + + try: + # Generate embedding for query + query_embedding = index_manager.embed_model.get_text_embedding(query_text) + + # Search with PR filter + results = index_manager.qdrant_client.search( + collection_name=collection_name, + query_vector=query_embedding, + query_filter=Filter( + must=[ + FieldCondition(key="pr", match=MatchValue(value=True)), + FieldCondition(key="pr_number", match=MatchValue(value=pr_number)) + ] + ), + limit=top_k, + with_payload=True + ) + + formatted = [] + for r in results: + formatted.append({ + "path": r.payload.get("path", "unknown"), + "content": r.payload.get("text", ""), + "score": r.score, + "semantic_name": r.payload.get("semantic_name", ""), + "semantic_type": r.payload.get("semantic_type", ""), + "branch": r.payload.get("pr_branch", ""), + "_source": "pr_indexed" + }) + + return formatted + + except Exception as e: + logger.warning(f"Error querying PR-indexed data: {e}") + return [] + + @app.post("/query/deterministic") def get_deterministic_context(request: DeterministicContextRequest): """ @@ -591,6 +712,184 @@ def get_memory_usage(): raise HTTPException(status_code=500, detail=str(e)) +# ============================================================================= +# PR-SPECIFIC RAG ENDPOINTS (for PR file indexing with metadata) +# ============================================================================= + +class PRFileInfo(BaseModel): + """Info about a single PR file.""" + path: str + content: str # Full file content (not just diff) + change_type: str # ADDED, MODIFIED, DELETED + + +class PRIndexRequest(BaseModel): + """Request to index PR files into main collection with PR metadata.""" + workspace: str + project: str + pr_number: int + branch: str # Source branch + files: List[PRFileInfo] + + +@app.post("/index/pr-files") +def index_pr_files(request: PRIndexRequest): + """ + Index PR files into the main collection with PR-specific metadata. + + Files are indexed with metadata: + - pr: true + - pr_number: + - pr_branch: + + This allows hybrid queries that prioritize PR data over branch data. + Existing PR points for the same pr_number are deleted first. 
+ """ + try: + from datetime import datetime, timezone + from llama_index.core import Document as LlamaDocument + + collection_name = index_manager._get_project_collection_name( + request.workspace, request.project + ) + + # Ensure collection exists + index_manager._ensure_collection_exists(collection_name) + + # Delete existing points for this PR first (handles re-analysis) + try: + from qdrant_client.models import Filter, FieldCondition, MatchValue + index_manager.qdrant_client.delete( + collection_name=collection_name, + points_selector=Filter( + must=[ + FieldCondition(key="pr_number", match=MatchValue(value=request.pr_number)) + ] + ) + ) + logger.info(f"Deleted existing PR points for PR #{request.pr_number}") + except Exception as e: + logger.warning(f"Error deleting existing PR points: {e}") + + # Convert files to LlamaIndex documents + documents = [] + for file_info in request.files: + if not file_info.content or not file_info.content.strip(): + continue + if file_info.change_type == "DELETED": + continue # Don't index deleted files + + doc = LlamaDocument( + text=file_info.content, + metadata={ + "path": file_info.path, + "change_type": file_info.change_type, + } + ) + documents.append(doc) + + if not documents: + return { + "status": "skipped", + "message": "No files to index", + "chunks_indexed": 0 + } + + # Split documents into chunks + chunks = index_manager.splitter.split_documents(documents) + + # Add PR metadata to all chunks + for chunk in chunks: + chunk.metadata["pr"] = True + chunk.metadata["pr_number"] = request.pr_number + chunk.metadata["pr_branch"] = request.branch + chunk.metadata["workspace"] = request.workspace + chunk.metadata["project"] = request.project + chunk.metadata["branch"] = request.branch # For compatibility + chunk.metadata["indexed_at"] = datetime.now(timezone.utc).isoformat() + + # Embed and upsert using point_ops (with PR-specific IDs) + chunk_data = [] + chunks_by_file = {} + for chunk in chunks: + path = chunk.metadata.get("path", str(uuid.uuid4())) + if path not in chunks_by_file: + chunks_by_file[path] = [] + chunks_by_file[path].append(chunk) + + for path, file_chunks in chunks_by_file.items(): + for chunk_index, chunk in enumerate(file_chunks): + # Use PR-specific ID format to avoid collision with branch data + key = f"pr:{request.pr_number}:{request.workspace}:{request.project}:{path}:{chunk_index}" + point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, key)) + chunk_data.append((point_id, chunk)) + + # Embed and create points + points = index_manager._point_ops.embed_and_create_points(chunk_data) + + # Upsert to collection + successful, failed = index_manager._point_ops.upsert_points(collection_name, points) + + logger.info(f"Indexed PR #{request.pr_number}: {successful} chunks from {len(documents)} files") + + return { + "status": "indexed", + "pr_number": request.pr_number, + "files_processed": len(documents), + "chunks_indexed": successful, + "chunks_failed": failed + } + + except ValueError as e: + logger.warning(f"Invalid request for PR indexing: {e}") + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Internal error indexing PR files: {e}") + raise HTTPException(status_code=500, detail="Internal indexing error") + + +@app.delete("/index/pr-files/{workspace}/{project}/{pr_number}") +def delete_pr_files(workspace: str, project: str, pr_number: int): + """ + Delete all indexed points for a specific PR. + + Called after analysis completes to clean up PR-specific data. 
+ """ + try: + from qdrant_client.models import Filter, FieldCondition, MatchValue + + collection_name = index_manager._get_project_collection_name(workspace, project) + + # Check if collection exists + if not index_manager._collection_manager.collection_exists(collection_name): + return { + "status": "skipped", + "message": f"Collection does not exist" + } + + # Delete points with matching pr_number + result = index_manager.qdrant_client.delete( + collection_name=collection_name, + points_selector=Filter( + must=[ + FieldCondition(key="pr_number", match=MatchValue(value=pr_number)) + ] + ) + ) + + logger.info(f"Deleted PR #{pr_number} points from {collection_name}") + + return { + "status": "deleted", + "pr_number": pr_number, + "collection": collection_name + } + + except Exception as e: + logger.error(f"Error deleting PR files: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8001) diff --git a/python-ecosystem/rag-pipeline/src/rag_pipeline/services/query_service.py b/python-ecosystem/rag-pipeline/src/rag_pipeline/services/query_service.py index b4078cd8..cb45f7dc 100644 --- a/python-ecosystem/rag-pipeline/src/rag_pipeline/services/query_service.py +++ b/python-ecosystem/rag-pipeline/src/rag_pipeline/services/query_service.py @@ -694,7 +694,8 @@ def get_context_for_pr( enable_priority_reranking: bool = True, min_relevance_score: float = 0.7, base_branch: Optional[str] = None, - deleted_files: Optional[List[str]] = None + deleted_files: Optional[List[str]] = None, + exclude_pr_files: Optional[List[str]] = None ) -> Dict: """ Get relevant context for PR review using Smart RAG with multi-branch support. @@ -706,9 +707,14 @@ def get_context_for_pr( branch: Target branch (the PR's source branch) base_branch: Base branch (the PR's target, e.g., 'main'). If None, uses fallback logic. deleted_files: Files that were deleted in target branch (excluded from results) + exclude_pr_files: Files indexed separately as PR data (excluded to avoid duplication) """ diff_snippets = diff_snippets or [] deleted_files = deleted_files or [] + exclude_pr_files = exclude_pr_files or [] + + # Combine exclusion lists: deleted files + PR-indexed files + all_excluded_paths = list(set(deleted_files + exclude_pr_files)) # Determine branches to search branches_to_search = [branch] @@ -768,7 +774,7 @@ def get_context_for_pr( branches=branches_to_search, top_k=q_top_k, instruction_type=q_instruction_type, - excluded_paths=deleted_files + excluded_paths=all_excluded_paths ) logger.info(f"Query {i+1}/{len(queries)} returned {len(results)} results")
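
The change above wires a temporary, PR-scoped RAG layer through the whole pipeline: the orchestrator indexes the PR's files with pr=true / pr_number metadata, each batch query then runs in hybrid mode so fresh PR chunks outrank stale branch data, and a finally block deletes the PR points when the analysis ends. Below is a minimal sketch of that lifecycle, assuming an async client exposing the index_pr_files / get_pr_context / delete_pr_files methods added in rag_client.py; run_review_with_pr_rag, rag_client, files and extra_query_kwargs are illustrative stand-ins, and the branch / file-path arguments of get_pr_context that this diff does not show are left to the caller.

async def run_review_with_pr_rag(rag_client, request, files, diff_snippets, **extra_query_kwargs):
    """Illustrative sketch: index PR files, query in hybrid mode, always clean up."""
    if not (rag_client and request.pullRequestId):
        return None  # mirrors the orchestrator's "no PR number, skip indexing" path

    # 1. Index changed files into the main collection with PR metadata
    #    (the endpoint deletes any stale points for the same pr_number first).
    result = await rag_client.index_pr_files(
        workspace=request.projectWorkspace,
        project=request.projectNamespace,
        pr_number=request.pullRequestId,
        branch=request.targetBranchName or "unknown",
        files=files,  # [{"path": ..., "content": ..., "change_type": ...}]
    )
    pr_indexed = result.get("status") == "indexed"

    try:
        # 2. Hybrid query: pr_number makes the RAG API search PR-indexed chunks first,
        #    while all_pr_changed_files keeps stale branch copies of those files out.
        response = await rag_client.get_pr_context(
            workspace=request.projectWorkspace,
            project=request.projectNamespace,
            diff_snippets=diff_snippets,
            pr_title=request.prTitle,
            pr_description=request.prDescription,
            top_k=10,
            pr_number=request.pullRequestId if pr_indexed else None,
            all_pr_changed_files=[f["path"] for f in files] if pr_indexed else None,
            **extra_query_kwargs,  # branch / file-path arguments not shown in this diff
        )
        return response.get("context") if response else None
    finally:
        # 3. Mirror the orchestrator's finally block: PR points are temporary
        #    and must not linger in the project collection after the analysis.
        if pr_indexed:
            await rag_client.delete_pr_files(
                workspace=request.projectWorkspace,
                project=request.projectNamespace,
                pr_number=request.pullRequestId,
            )

On the server side the same contract is served by the new /index/pr-files endpoint, the extended /query/pr-context endpoint, and the DELETE /index/pr-files/{workspace}/{project}/{pr_number} route in api.py, where PR-only points are selected with a Qdrant Filter on pr=true and pr_number and merged ahead of the branch results.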