diff --git a/backend/main.py b/backend/main.py index fb6db31..624688e 100644 --- a/backend/main.py +++ b/backend/main.py @@ -28,6 +28,7 @@ from routes.users import router as users_router from routes.search_v2 import router as search_v2_router from routes.ws_playground import websocket_playground_index +from routes.ws_repos import websocket_repo_indexing # Lifespan context manager for startup/shutdown @@ -95,6 +96,7 @@ async def dispatch(self, request: Request, call_next): # WebSocket endpoints (versioned) app.add_api_websocket_route(f"{API_PREFIX}/ws/index/{{repo_id}}", websocket_index) app.add_api_websocket_route(f"{API_PREFIX}/ws/playground/{{job_id}}", websocket_playground_index) +app.add_api_websocket_route(f"{API_PREFIX}/ws/repos/{{repo_id}}/indexing", websocket_repo_indexing) # ===== ERROR HANDLERS ===== diff --git a/backend/routes/repos.py b/backend/routes/repos.py index 59d8454..4348282 100644 --- a/backend/routes/repos.py +++ b/backend/routes/repos.py @@ -1,16 +1,18 @@ """Repository management routes - CRUD and indexing.""" -from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends, BackgroundTasks from pydantic import BaseModel from typing import Optional import hashlib import time +import asyncio import git from dependencies import ( - indexer, repo_manager, metrics, + indexer, repo_manager, metrics, redis_client, get_repo_or_404, user_limits, repo_validator ) from services.input_validator import InputValidator +from services.indexing_events import get_event_publisher, IndexingStats from middleware.auth import require_auth, AuthContext from services.observability import logger, capture_exception @@ -79,18 +81,24 @@ async def add_repository( # Fail CLOSED if analysis failed (security: don't allow unknown-size repos) if not analysis.success: logger.error( - "Repo analysis failed - blocking indexing", + "Repo analysis failed - removing repo", user_id=user_id, repo_id=repo["id"], error=analysis.error ) - return { - "repo_id": repo["id"], - "status": "added", - "indexing_blocked": True, - "analysis": analysis.to_dict(), - "message": f"Repository added but analysis failed: {analysis.error}. Please try re-indexing later." - } + # Clean up: delete the repo we just created + try: + repo_manager.delete_repo(repo["id"]) + except Exception as del_err: + logger.warning("Failed to cleanup failed analysis repo", error=str(del_err)) + + raise HTTPException( + status_code=500, + detail={ + "error": "ANALYSIS_FAILED", + "message": f"Repository analysis failed: {analysis.error}" + } + ) # Check repo size against tier limits size_check = user_limits.check_repo_size( @@ -100,22 +108,30 @@ async def add_repository( ) if not size_check.allowed: - # Repo added but too large - return warning with upgrade CTA + # Repo too large - delete the entry and return error logger.info( - "Repo too large for user tier", + "Repo too large for user tier - removing", user_id=user_id, repo_id=repo["id"], file_count=analysis.file_count, + estimated_functions=analysis.estimated_functions, tier=size_check.tier ) - return { - "repo_id": repo["id"], - "status": "added", - "indexing_blocked": True, - "analysis": analysis.to_dict(), - "limit_check": size_check.to_dict(), - "message": size_check.message - } + # Clean up: delete the repo we just created + try: + repo_manager.delete_repo(repo["id"]) + except Exception as del_err: + logger.warning("Failed to cleanup rejected repo", error=str(del_err)) + + raise HTTPException( + status_code=403, + detail={ + "error": "REPO_TOO_LARGE", + "analysis": analysis.to_dict(), + "limit_check": size_check.to_dict(), + "message": size_check.message + } + ) return { "repo_id": repo["id"], @@ -224,6 +240,216 @@ async def index_repository( raise HTTPException(status_code=500, detail=str(e)) +async def _run_async_indexing( + repo_id: str, + repo: dict, + user_id: str, + incremental: bool = True +): + """ + Background task for async indexing with real-time progress. + + Publishes events to Redis pub/sub for WebSocket clients. + """ + start_time = time.time() + publisher = get_event_publisher(redis_client) + + try: + # Wait for WebSocket client to connect and subscribe + # Redis pub/sub doesn't buffer - events sent before subscription are lost + # TODO: Consider Redis Streams or initial state fetch to avoid timing dependency + await asyncio.sleep(1.5) + + repo_manager.update_status(repo_id, "indexing") + + # Publish initial progress to confirm connection + if publisher: + publisher.publish_progress(repo_id, 0, 1, 0, "Starting...") + + # Check for incremental + last_commit = repo_manager.get_last_indexed_commit(repo_id) + + if incremental and last_commit: + logger.info("Async INCREMENTAL indexing", repo_id=repo_id, last_commit=last_commit[:8]) + total_functions = await indexer.incremental_index_repository( + repo_id, + repo["local_path"], + last_commit + ) + index_type = "incremental" + # For incremental, get file count from repo or analyze + total_files = repo.get("file_count", 0) + if not total_files: + analysis = repo_validator.analyze_repo(repo["local_path"]) + total_files = analysis.file_count if analysis and analysis.success else 0 + else: + logger.info("Async FULL indexing with progress", repo_id=repo_id) + + # Track total_files from progress callback + tracked_total_files = 0 + + # Progress callback that publishes to Redis + async def progress_callback( + files_processed: int, + functions_found: int, + total_files: int, + current_file: str = None, + functions_total: int = 0 + ): + nonlocal tracked_total_files + tracked_total_files = total_files + if publisher: + logger.info( + "Publishing progress event", + repo_id=repo_id, + files=f"{files_processed}/{total_files}", + functions=f"{functions_found}/{functions_total}" if functions_total else str(functions_found), + file=current_file + ) + publisher.publish_progress( + repo_id, + files_processed, + total_files, + functions_found, + current_file, + functions_total + ) + + total_functions = await indexer.index_repository_with_progress( + repo_id, + repo["local_path"], + progress_callback + ) + total_files = tracked_total_files + index_type = "full" + + # Update metadata + git_repo = git.Repo(repo["local_path"]) + current_commit = git_repo.head.commit.hexsha + + repo_manager.update_status(repo_id, "indexed") + repo_manager.update_file_count(repo_id, total_files) + repo_manager.update_last_commit(repo_id, current_commit) + + duration = time.time() - start_time + metrics.record_indexing(repo_id, duration, total_functions) + + # Publish completion event + if publisher: + publisher.publish_completed( + repo_id, + repo_id, + IndexingStats( + files_processed=total_files, + functions_indexed=total_functions, + indexing_time_seconds=duration + ) + ) + + logger.info( + "Async indexing complete", + repo_id=repo_id, + functions=total_functions, + duration=f"{duration:.2f}s", + index_type=index_type + ) + + except Exception as e: + logger.error("Async indexing failed", repo_id=repo_id, error=str(e)) + capture_exception(e) + repo_manager.update_status(repo_id, "error") + + # Publish error event + if publisher: + publisher.publish_error( + repo_id, + error="indexing_failed", + message=str(e), + recoverable=True + ) + + +@router.post("/{repo_id}/index/async", status_code=202) +async def index_repository_async( + repo_id: str, + background_tasks: BackgroundTasks, + incremental: bool = True, + auth: AuthContext = Depends(require_auth) +): + """ + Trigger async indexing for a repository. + + Returns immediately with status 202. Connect to WebSocket at + /api/v1/ws/repos/{repo_id}/indexing to receive real-time progress updates. + """ + user_id = auth.user_id + + if not user_id: + raise HTTPException(status_code=401, detail="User ID required") + + try: + repo = get_repo_or_404(repo_id, user_id) + + # Re-check size limits + analysis = repo_validator.analyze_repo(repo["local_path"]) + + if not analysis.success: + raise HTTPException( + status_code=500, + detail={ + "error": "ANALYSIS_FAILED", + "message": f"Cannot index: {analysis.error}" + } + ) + + size_check = user_limits.check_repo_size( + user_id, + analysis.file_count, + analysis.estimated_functions + ) + + if not size_check.allowed: + raise HTTPException( + status_code=403, + detail={ + "error": "REPO_TOO_LARGE", + "limit_check": size_check.to_dict(), + "message": size_check.message + } + ) + + # Atomic check-and-set: only set 'indexing' if not already indexing + # This prevents TOCTOU race where two requests both see status != 'indexing' + if not repo_manager.try_set_indexing(repo_id): + raise HTTPException( + status_code=409, + detail="Repository is already being indexed" + ) + + # Schedule background task + background_tasks.add_task( + _run_async_indexing, + repo_id, + repo, + user_id, + incremental + ) + + return { + "status": "indexing", + "repo_id": repo_id, + "message": "Indexing started. Connect to WebSocket for progress.", + "websocket_url": f"/api/v1/ws/repos/{repo_id}/indexing" + } + + except HTTPException: + raise + except Exception as e: + logger.error("Failed to start async indexing", repo_id=repo_id, error=str(e)) + capture_exception(e) + raise HTTPException(status_code=500, detail=str(e)) + + async def _authenticate_websocket(websocket: WebSocket) -> Optional[dict]: """Authenticate WebSocket via query parameter token.""" token = websocket.query_params.get("token") diff --git a/backend/routes/ws_repos.py b/backend/routes/ws_repos.py new file mode 100644 index 0000000..91bfd07 --- /dev/null +++ b/backend/routes/ws_repos.py @@ -0,0 +1,213 @@ +""" +WebSocket endpoint for real-time repo indexing progress. + +Subscribes to Redis pub/sub channel and streams events to authenticated clients. +Channel format: indexing:{repo_id}:events +""" +import json +import asyncio +from typing import Optional + +from fastapi import WebSocket, WebSocketDisconnect + +from dependencies import redis_client, repo_manager +from services.observability import logger + + +PING_INTERVAL_SECONDS = 30 +IDLE_TIMEOUT_SECONDS = 300 # 5 minutes for repo indexing (longer than playground) + + +async def authenticate_websocket(websocket: WebSocket) -> Optional[dict]: + """ + Authenticate WebSocket via query parameter token. + + Token is the user's JWT access token passed as ?token=xxx + """ + token = websocket.query_params.get("token") + if not token: + return None + + try: + from services.auth import get_auth_service + auth_service = get_auth_service() + return auth_service.verify_jwt(token) + except Exception as e: + logger.debug("WebSocket auth failed", error=str(e)) + return None + + +async def websocket_repo_indexing(websocket: WebSocket, repo_id: str): + """ + Stream repo indexing progress to authenticated client. + + Subscribes to Redis pub/sub channel for this repo and forwards + all indexing events. Does NOT trigger indexing - just listens. + + Use POST /repos/{repo_id}/index/async to start indexing, + then connect here to receive progress updates. + """ + # Authenticate + user = await authenticate_websocket(websocket) + if not user: + await websocket.accept() + await websocket.close(code=4001, reason="Authentication required") + return + + user_id = user.get("user_id") + if not user_id: + await websocket.accept() + await websocket.close(code=4001, reason="User ID required") + return + + # Verify user owns this repo + repo = repo_manager.get_repo_for_user(repo_id, user_id) + if not repo: + await websocket.accept() + await websocket.close(code=4004, reason="Repository not found") + return + + # Validate Redis connection + if not redis_client: + logger.error("WebSocket failed - no Redis connection") + await websocket.accept() + await websocket.close(code=4500, reason="Service unavailable") + return + + # Accept connection + await websocket.accept() + logger.info("Repo indexing WebSocket connected", repo_id=repo_id[:12], user_id=user_id[:12]) + + # Check current repo status + repo_status = repo.get("status") + + # If already indexed, send completion immediately + if repo_status == "indexed": + await websocket.send_json({ + "type": "completed", + "entity_id": repo_id, + "repo_id": repo_id, + "message": "Repository already indexed", + "stats": { + "files_processed": repo.get("file_count", 0), + "functions_indexed": repo.get("file_count", 0), + "indexing_time_seconds": 0 + } + }) + await websocket.close() + return + + # If error state, notify + if repo_status == "error": + await websocket.send_json({ + "type": "error", + "entity_id": repo_id, + "error": "previous_failure", + "message": "Previous indexing failed. Please try again.", + "recoverable": True + }) + await websocket.close() + return + + # Subscribe to indexing events channel + channel = f"indexing:{repo_id}:events" + pubsub = redis_client.pubsub() + + try: + await asyncio.to_thread(pubsub.subscribe, channel) + logger.debug("Subscribed to channel", channel=channel) + + # Send initial ack + await websocket.send_json({ + "type": "connected", + "entity_id": repo_id, + "current_status": repo_status, + "message": "Listening for indexing events" + }) + + last_activity = asyncio.get_event_loop().time() + + while True: + current_time = asyncio.get_event_loop().time() + + # Check idle timeout + if current_time - last_activity > IDLE_TIMEOUT_SECONDS: + logger.warning("WebSocket idle timeout", repo_id=repo_id[:12]) + await websocket.send_json({ + "type": "error", + "message": "Connection timed out" + }) + break + + # Check for new message (non-blocking) + message = await asyncio.to_thread( + pubsub.get_message, + ignore_subscribe_messages=True, + timeout=PING_INTERVAL_SECONDS + ) + + if message is None: + # Send ping to keep connection alive + try: + await websocket.send_json({"type": "ping"}) + except Exception: + logger.debug("Client disconnected during ping", repo_id=repo_id[:12]) + break + continue + + if message["type"] != "message": + continue + + # Reset activity timer + last_activity = current_time + + # Parse and forward event + try: + event_data = json.loads(message["data"]) + await websocket.send_json(event_data) + + # Close on terminal events + event_type = event_data.get("type") + if event_type in ("completed", "error"): + logger.info( + "Indexing finished, closing WebSocket", + repo_id=repo_id[:12], + event_type=event_type + ) + await asyncio.sleep(1.0) # Give client time to process message + break + + except json.JSONDecodeError: + logger.warning("Invalid JSON in pub/sub", repo_id=repo_id[:12]) + continue + except Exception as e: + logger.error("Error forwarding message", error=str(e)) + continue + + except WebSocketDisconnect: + logger.debug("WebSocket disconnected by client", repo_id=repo_id[:12]) + + except Exception as e: + logger.error("WebSocket error", error=str(e), repo_id=repo_id[:12]) + try: + await websocket.send_json({ + "type": "error", + "message": "Internal server error" + }) + except Exception: + pass + + finally: + # Cleanup + try: + await asyncio.to_thread(pubsub.unsubscribe, channel) + await asyncio.to_thread(pubsub.close) + except Exception: + pass + + try: + await websocket.close() + except Exception: + pass + + logger.debug("WebSocket cleanup complete", repo_id=repo_id[:12]) diff --git a/backend/services/indexer_optimized.py b/backend/services/indexer_optimized.py index d3268f2..9317488 100644 --- a/backend/services/indexer_optimized.py +++ b/backend/services/indexer_optimized.py @@ -768,12 +768,23 @@ async def index_repository_with_progress( ] all_embeddings = [] + total_to_embed = len(embedding_texts) for i in range(0, len(embedding_texts), self.EMBEDDING_BATCH_SIZE): batch_texts = embedding_texts[i:i + self.EMBEDDING_BATCH_SIZE] batch_embeddings = await self._create_embeddings_batch(batch_texts) all_embeddings.extend(batch_embeddings) - logger.debug("Embeddings generated", completed=len(all_embeddings), total=len(embedding_texts)) + # Report embedding progress - this is where most time is spent + embedded_count = len(all_embeddings) + await progress_callback( + total_files, # Files done + embedded_count, # Functions embedded so far + total_files, + f"Embedding functions ({embedded_count}/{total_to_embed})...", + total_to_embed # Total functions to embed + ) + + logger.debug("Embeddings generated", completed=embedded_count, total=total_to_embed) # Prepare vectors for Pinecone logger.debug("Uploading to Pinecone") diff --git a/backend/services/indexing_events.py b/backend/services/indexing_events.py new file mode 100644 index 0000000..5597fe5 --- /dev/null +++ b/backend/services/indexing_events.py @@ -0,0 +1,192 @@ +""" +Indexing Events Publisher + +Unified event publishing for real-time indexing progress. +Used by both playground (anonymous) and dashboard (authenticated) indexing. + +Channel format: indexing:{entity_id}:events +Entity can be job_id (playground) or repo_id (dashboard). +""" +import json +from typing import Optional, Any +from enum import Enum +from dataclasses import dataclass, asdict + +from services.observability import logger + + +class IndexingEventType(str, Enum): + """Event types for indexing progress.""" + CONNECTED = "connected" + CLONING = "cloning" + PROGRESS = "progress" + COMPLETED = "completed" + ERROR = "error" + PING = "ping" + + +@dataclass +class IndexingProgress: + """Progress data for indexing event.""" + files_processed: int + files_total: int + functions_found: int + current_file: Optional[str] = None + percent: int = 0 + functions_total: int = 0 # Total functions to embed (set during embedding phase) + + def __post_init__(self): + # Guard against division by zero + if self.files_total <= 0: + self.percent = 0 + return + + # If we have functions_total, we're in embedding phase (slow) - weight it 80% + # File extraction is fast, weight it 20% + if self.functions_total > 0: + file_progress = (self.files_processed / self.files_total) * 20 # 0-20% + embed_progress = (self.functions_found / self.functions_total) * 80 # 0-80% + raw_percent = file_progress + embed_progress + else: + # Still in file extraction phase (0-20%) + raw_percent = (self.files_processed / self.files_total) * 20 + + # Clamp to 0-100 range defensively + self.percent = max(0, min(100, int(raw_percent))) + + +@dataclass +class IndexingStats: + """Final stats for completed indexing.""" + files_processed: int + functions_indexed: int + indexing_time_seconds: float + + +class IndexingEventPublisher: + """ + Publishes indexing events to Redis pub/sub. + + Events are fire-and-forget. If no WebSocket clients are listening, + events are simply discarded. Redis job/repo state serves as fallback + for polling clients. + """ + + CHANNEL_PREFIX = "indexing:" + CHANNEL_SUFFIX = ":events" + + def __init__(self, redis_client): + self.redis = redis_client + + def _get_channel(self, entity_id: str) -> str: + """Get Redis pub/sub channel for entity.""" + return f"{self.CHANNEL_PREFIX}{entity_id}{self.CHANNEL_SUFFIX}" + + def _publish(self, entity_id: str, event: dict) -> bool: + """Publish event to Redis channel.""" + if not self.redis: + logger.warning("Cannot publish event - no Redis client") + return False + + try: + channel = self._get_channel(entity_id) + result = self.redis.publish(channel, json.dumps(event)) + logger.info( + "Published event to Redis", + channel=channel, + event_type=event.get("type"), + subscribers=result + ) + return True + except Exception as e: + logger.warning( + "Failed to publish indexing event", + entity_id=entity_id, + error=str(e) + ) + return False + + def publish_connected(self, entity_id: str, message: str = "Connected") -> bool: + """Publish connection acknowledgment.""" + return self._publish(entity_id, { + "type": IndexingEventType.CONNECTED.value, + "entity_id": entity_id, + "message": message + }) + + def publish_cloning( + self, + entity_id: str, + repo_name: str, + message: str = "Cloning repository..." + ) -> bool: + """Publish cloning started event.""" + return self._publish(entity_id, { + "type": IndexingEventType.CLONING.value, + "entity_id": entity_id, + "repo_name": repo_name, + "message": message + }) + + def publish_progress( + self, + entity_id: str, + files_processed: int, + files_total: int, + functions_found: int, + current_file: Optional[str] = None, + functions_total: int = 0 + ) -> bool: + """Publish indexing progress update.""" + progress = IndexingProgress( + files_processed=files_processed, + files_total=files_total, + functions_found=functions_found, + current_file=current_file, + functions_total=functions_total + ) + + return self._publish(entity_id, { + "type": IndexingEventType.PROGRESS.value, + "entity_id": entity_id, + **asdict(progress) + }) + + def publish_completed( + self, + entity_id: str, + repo_id: str, + stats: IndexingStats, + message: str = "Indexing complete" + ) -> bool: + """Publish indexing completed event.""" + return self._publish(entity_id, { + "type": IndexingEventType.COMPLETED.value, + "entity_id": entity_id, + "repo_id": repo_id, + "stats": asdict(stats), + "message": message + }) + + def publish_error( + self, + entity_id: str, + error: str, + message: str, + recoverable: bool = False + ) -> bool: + """Publish indexing error event.""" + return self._publish(entity_id, { + "type": IndexingEventType.ERROR.value, + "entity_id": entity_id, + "error": error, + "message": message, + "recoverable": recoverable + }) + + +def get_event_publisher(redis_client) -> Optional[IndexingEventPublisher]: + """Factory function to get event publisher.""" + if not redis_client: + return None + return IndexingEventPublisher(redis_client) diff --git a/backend/services/repo_manager.py b/backend/services/repo_manager.py index bcdd6b7..7430968 100644 --- a/backend/services/repo_manager.py +++ b/backend/services/repo_manager.py @@ -135,6 +135,15 @@ def update_status(self, repo_id: str, status: str): """Update repository status""" self.db.update_repository_status(repo_id, status) + def try_set_indexing(self, repo_id: str) -> bool: + """ + Atomically set status to 'indexing' only if not already indexing. + + Returns True if status was set, False if already indexing. + Use this instead of checking status then updating to prevent race conditions. + """ + return self.db.try_set_indexing_status(repo_id) + def update_file_count(self, repo_id: str, count: int): """Update file count""" self.db.update_file_count(repo_id, count) diff --git a/backend/services/supabase_service.py b/backend/services/supabase_service.py index 59aee91..7aa1112 100644 --- a/backend/services/supabase_service.py +++ b/backend/services/supabase_service.py @@ -94,6 +94,21 @@ def update_repository_status(self, repo_id: str, status: str) -> None: """Update repository status""" self.client.table("repositories").update({"status": status}).eq("id", repo_id).execute() + def try_set_indexing_status(self, repo_id: str) -> bool: + """ + Atomically set status to 'indexing' only if not already indexing. + + Returns True if status was set, False if repo was already indexing. + This prevents TOCTOU race conditions where two requests could both + see status != 'indexing' and both start indexing. + """ + result = self.client.table("repositories").update( + {"status": "indexing"} + ).eq("id", repo_id).neq("status", "indexing").execute() + + # If result.data is empty, no rows matched (already indexing) + return bool(result.data) + def update_file_count(self, repo_id: str, count: int) -> None: """Update repository file count""" self.client.table("repositories").update({"file_count": count}).eq("id", repo_id).execute() diff --git a/frontend/src/components/IndexingProgressModal.tsx b/frontend/src/components/IndexingProgressModal.tsx new file mode 100644 index 0000000..d4903ef --- /dev/null +++ b/frontend/src/components/IndexingProgressModal.tsx @@ -0,0 +1,291 @@ +'use client' + +/** + * IndexingProgressModal + * + * Real-time indexing progress display with file streaming. + * Uses WebSocket for live updates. + */ + +import { useEffect, useRef } from 'react' +import { motion, AnimatePresence } from 'framer-motion' +import { + Loader2, + CheckCircle2, + AlertCircle, + FileCode2, + X, + RefreshCw +} from 'lucide-react' +import { + useRepoIndexingWebSocket +} from '@/hooks/useRepoIndexingWebSocket' +import type { + IndexingPhase, + IndexingStats +} from '@/hooks/useRepoIndexingWebSocket' + +interface IndexingProgressModalProps { + repoId: string | null + repoName: string + isOpen: boolean + onClose: () => void + onCompleted?: (repoId: string, stats: IndexingStats) => void + onRetry?: () => void +} + +export function IndexingProgressModal({ + repoId, + repoName, + isOpen, + onClose, + onCompleted, + onRetry, +}: IndexingProgressModalProps) { + const { + phase, + progress, + recentFiles, + completedStats, + error, + isRecoverable, + reset, + } = useRepoIndexingWebSocket(isOpen ? repoId : null, { + onCompleted: (rid, stats) => { + onCompleted?.(rid, stats) + }, + }) + + // Auto-close after completion (with delay for user to see success) + const closeTimeoutRef = useRef | null>(null) + + useEffect(() => { + if (phase === 'completed') { + closeTimeoutRef.current = setTimeout(() => { + onClose() + reset() + }, 3500) // 3.5s to let user see completion stats + } + return () => { + if (closeTimeoutRef.current) { + clearTimeout(closeTimeoutRef.current) + closeTimeoutRef.current = null + } + } + }, [phase, onClose, reset]) + + // Clear timeout when modal closes + useEffect(() => { + if (!isOpen && closeTimeoutRef.current) { + clearTimeout(closeTimeoutRef.current) + closeTimeoutRef.current = null + } + }, [isOpen]) + + // Handle Escape key to close modal + useEffect(() => { + if (!isOpen) return + + const handleKeyDown = (e: KeyboardEvent) => { + if (e.key === 'Escape') { + if (closeTimeoutRef.current) { + clearTimeout(closeTimeoutRef.current) + closeTimeoutRef.current = null + } + reset() + onClose() + } + } + + document.addEventListener('keydown', handleKeyDown) + return () => document.removeEventListener('keydown', handleKeyDown) + }, [isOpen, onClose, reset]) + + const handleClose = () => { + if (closeTimeoutRef.current) { + clearTimeout(closeTimeoutRef.current) + closeTimeoutRef.current = null + } + reset() + onClose() + } + + const handleRetry = () => { + if (closeTimeoutRef.current) { + clearTimeout(closeTimeoutRef.current) + closeTimeoutRef.current = null + } + reset() + onRetry?.() + } + + return ( + + {isOpen && ( + + e.stopPropagation()} + role="dialog" + aria-modal="true" + aria-labelledby="indexing-progress-title" + > + {/* Header */} +
+

+ Indexing {repoName} +

+ +
+ + {/* Content */} +
+ + + {/* Progress bar */} + {(phase === 'indexing' || phase === 'completed') && ( +
+
+ Progress + {progress.percent}% +
+
+ +
+
+ )} + + {/* Stats */} + {phase === 'indexing' && ( +
+ + +
+ )} + + {/* Recent files */} + {phase === 'indexing' && recentFiles.length > 0 && ( +
+

Processing

+
+ + {recentFiles.slice(0, 5).map((file, i) => ( + + + {file} + + ))} + +
+
+ )} + + {/* Completion stats */} + {phase === 'completed' && completedStats && ( + + + + + + )} + + {/* Error with retry */} + {phase === 'error' && isRecoverable && onRetry && ( + + + Try Again + + )} +
+
+
+ )} +
+ ) +} + +function PhaseIndicator({ phase, error }: { phase: IndexingPhase; error: string | null }) { + const config = { + idle: { icon: Loader2, text: 'Preparing...', color: 'text-zinc-400', spin: true }, + connecting: { icon: Loader2, text: 'Connecting...', color: 'text-zinc-400', spin: true }, + indexing: { icon: Loader2, text: 'Indexing repository...', color: 'text-indigo-400', spin: true }, + completed: { icon: CheckCircle2, text: 'Indexing complete!', color: 'text-emerald-400', spin: false }, + error: { icon: AlertCircle, text: error || 'An error occurred', color: 'text-red-400', spin: false }, + } + + const { icon: Icon, text, color, spin } = config[phase] + + return ( +
+
+ +
+ {text} +
+ ) +} + +function StatCard({ label, value }: { label: string; value: string }) { + return ( +
+

{label}

+

{value}

+
+ ) +} diff --git a/frontend/src/components/dashboard/DashboardHome.tsx b/frontend/src/components/dashboard/DashboardHome.tsx index a682398..8b13b32 100644 --- a/frontend/src/components/dashboard/DashboardHome.tsx +++ b/frontend/src/components/dashboard/DashboardHome.tsx @@ -22,6 +22,7 @@ import { RepoOverview } from '../RepoOverview' import { StyleInsights } from '../StyleInsights' import { ImpactAnalyzer } from '../ImpactAnalyzer' import { DashboardStats } from './DashboardStats' +import { IndexingProgressModal } from '../IndexingProgressModal' import type { Repository } from '../../types' import { API_URL } from '../../config/api' @@ -35,6 +36,11 @@ export function DashboardHome() { const [loading, setLoading] = useState(false) const [reposLoading, setReposLoading] = useState(true) const [showAddForm, setShowAddForm] = useState(false) + + // Indexing progress modal state + const [indexingRepoId, setIndexingRepoId] = useState(null) + const [indexingRepoName, setIndexingRepoName] = useState('') + const [showIndexingModal, setShowIndexingModal] = useState(false) const fetchRepos = async () => { if (!session?.access_token) return @@ -78,29 +84,79 @@ export function DashboardHome() { const data = await response.json() if (!data.repo_id) throw new Error('Missing repo_id in response') - await fetch(`${API_URL}/repos/${data.repo_id}/index`, { + // Trigger async indexing + const indexResponse = await fetch(`${API_URL}/repos/${data.repo_id}/index/async`, { method: 'POST', headers: { 'Authorization': `Bearer ${session?.access_token}` } }) + + if (!indexResponse.ok) { + const err = await indexResponse.json().catch(() => ({})) + throw new Error(err.detail?.message || err.detail || 'Failed to start indexing') + } + + // Show indexing progress modal + setIndexingRepoId(data.repo_id) + setIndexingRepoName(name) + setShowIndexingModal(true) + setShowAddForm(false) + await fetchRepos() - toast.success('Repository added!', { description: `${name} is now being indexed` }) } catch (error) { console.error('Error adding repo:', error) - toast.error('Failed to add repository', { description: 'Please check the Git URL and try again' }) + toast.error('Failed to add repository', { description: error instanceof Error ? error.message : 'Please check the Git URL and try again' }) } finally { setLoading(false) } } + const handleIndexingComplete = async () => { + await fetchRepos() + toast.success('Indexing complete!', { description: `${indexingRepoName} is ready for search` }) + } + + const handleCloseIndexingModal = () => { + setShowIndexingModal(false) + setIndexingRepoId(null) + setIndexingRepoName('') + } + + const handleRetryIndexing = async () => { + if (!indexingRepoId) return + + try { + const response = await fetch(`${API_URL}/repos/${indexingRepoId}/index/async`, { + method: 'POST', + headers: { 'Authorization': `Bearer ${session?.access_token}` } + }) + + if (!response.ok) { + throw new Error('Failed to restart indexing') + } + + // Modal will reconnect via WebSocket + } catch (error) { + toast.error('Failed to retry indexing') + } + } + const handleReindex = async () => { - if (!selectedRepo) return + if (!selectedRepo || !selectedRepoData) return try { setLoading(true) - await fetch(`${API_URL}/repos/${selectedRepo}/index`, { + const response = await fetch(`${API_URL}/repos/${selectedRepo}/index/async`, { method: 'POST', headers: { 'Authorization': `Bearer ${session?.access_token}` } }) - await fetchRepos() + + if (!response.ok) { + throw new Error('Failed to start re-indexing') + } + + // Show indexing progress modal + setIndexingRepoId(selectedRepo) + setIndexingRepoName(selectedRepoData.name) + setShowIndexingModal(true) } catch (error) { toast.error('Re-indexing failed', { description: 'Please check the console for details' }) } finally { @@ -274,6 +330,16 @@ export function DashboardHome() { )} + + {/* Indexing Progress Modal */} + ) } diff --git a/frontend/src/hooks/useRepoIndexingWebSocket.ts b/frontend/src/hooks/useRepoIndexingWebSocket.ts new file mode 100644 index 0000000..e0c330e --- /dev/null +++ b/frontend/src/hooks/useRepoIndexingWebSocket.ts @@ -0,0 +1,278 @@ +/** + * useRepoIndexingWebSocket + * + * WebSocket hook for real-time repo indexing progress in dashboard. + * Connects to /ws/repos/{repo_id}/indexing with JWT auth. + */ + +import { useState, useEffect, useCallback, useRef } from 'react' +import { buildWsUrl } from '@/config/api' +import { useAuth } from '@/contexts/AuthContext' + +export type ConnectionState = 'idle' | 'connecting' | 'connected' | 'disconnected' | 'error' + +export type IndexingPhase = 'idle' | 'connecting' | 'indexing' | 'completed' | 'error' + +export interface IndexingProgress { + percent: number + filesProcessed: number + filesTotal: number + currentFile: string + functionsFound: number +} + +export interface IndexingStats { + files_processed: number + functions_indexed: number + indexing_time_seconds: number +} + +interface WSEvent { + type: 'connected' | 'progress' | 'completed' | 'error' | 'ping' + entity_id?: string + repo_id?: string + message?: string + // Progress fields + files_processed?: number + files_total?: number + functions_found?: number + current_file?: string + percent?: number + // Completion fields + stats?: IndexingStats + // Error fields + error?: string + recoverable?: boolean +} + +interface UseRepoIndexingOptions { + onCompleted?: (repoId: string, stats: IndexingStats) => void + onError?: (error: string, recoverable: boolean) => void +} + +const INITIAL_PROGRESS: IndexingProgress = { + percent: 0, + filesProcessed: 0, + filesTotal: 0, + currentFile: '', + functionsFound: 0, +} + +export function useRepoIndexingWebSocket( + repoId: string | null, + options: UseRepoIndexingOptions = {} +) { + const { session } = useAuth() + + // Refs for callbacks to prevent dependency loops + const onCompletedRef = useRef(options.onCompleted) + const onErrorRef = useRef(options.onError) + onCompletedRef.current = options.onCompleted + onErrorRef.current = options.onError + + const [connectionState, setConnectionState] = useState('idle') + const [phase, setPhase] = useState('idle') + const [progress, setProgress] = useState(INITIAL_PROGRESS) + const [recentFiles, setRecentFiles] = useState([]) + const [completedStats, setCompletedStats] = useState(null) + const [error, setError] = useState(null) + const [isRecoverable, setIsRecoverable] = useState(false) + + const wsRef = useRef(null) + const reconnectAttempts = useRef(0) + const reconnectTimeout = useRef | null>(null) + // Track phase in ref to avoid effect re-running when phase changes + const phaseRef = useRef('idle') + phaseRef.current = phase + + const cleanup = useCallback(() => { + wsRef.current?.close() + wsRef.current = null + if (reconnectTimeout.current) { + clearTimeout(reconnectTimeout.current) + reconnectTimeout.current = null + } + }, []) + + const handleMessage = useCallback((event: MessageEvent) => { + try { + const data: WSEvent = JSON.parse(event.data) + + switch (data.type) { + case 'connected': + setConnectionState('connected') + setPhase('connecting') + break + + case 'ping': + // Keepalive, ignore + break + + case 'progress': + setPhase('indexing') + setProgress({ + percent: data.percent || 0, + filesProcessed: data.files_processed || 0, + filesTotal: data.files_total || 0, + currentFile: data.current_file || '', + functionsFound: data.functions_found || 0, + }) + // Track recent files + if (data.current_file) { + setRecentFiles(prev => { + const filtered = prev.filter(f => f !== data.current_file) + return [data.current_file!, ...filtered].slice(0, 10) + }) + } + break + + case 'completed': + setPhase('completed') + setProgress(prev => ({ ...prev, percent: 100 })) + if (data.stats) { + setCompletedStats(data.stats) + onCompletedRef.current?.(data.repo_id || '', data.stats) + } + break + + case 'error': { + setPhase('error') + // Prefer data.error (server error code) over data.message (human-readable) + const errorMessage = data.error || data.message || 'Unknown error' + setError(errorMessage) + setIsRecoverable(data.recoverable || false) + onErrorRef.current?.(errorMessage, data.recoverable || false) + break + } + } + } catch (err) { + console.error('[WS] Parse error:', err) + } + }, []) + + const connect = useCallback((rid: string) => { + if (!session?.access_token) { + console.error('[WS] No auth token available') + setConnectionState('error') + setPhase('error') + setError('Authentication required - no access token') + return + } + + cleanup() + setConnectionState('connecting') + setPhase('connecting') + setError(null) + + const wsUrl = buildWsUrl(`/ws/repos/${rid}/indexing?token=${session.access_token}`) + console.log('[WS] Connecting to repo indexing:', rid) + + try { + const ws = new WebSocket(wsUrl) + wsRef.current = ws + + ws.onopen = () => { + // Guard against stale socket callbacks after cleanup/reconnect + if (ws !== wsRef.current) return + console.log('[WS] Connected') + reconnectAttempts.current = 0 + setConnectionState('connected') + } + + ws.onmessage = (event) => { + // Guard against stale socket callbacks + if (ws !== wsRef.current) return + handleMessage(event) + } + + ws.onerror = () => { + // Guard against stale socket callbacks + if (ws !== wsRef.current) return + setConnectionState('error') + setPhase('error') + setError('WebSocket connection error') + } + + ws.onclose = (event) => { + // Guard against stale socket callbacks after cleanup/reconnect + if (ws !== wsRef.current) return + + console.log('[WS] Closed:', event.code, event.reason) + + // Normal closure or auth failure - don't reconnect + if (event.code === 1000 || event.code === 4001 || event.code === 4004) { + setConnectionState('disconnected') + return + } + + // Try to reconnect (max 3 attempts) + if (reconnectAttempts.current < 3) { + const delay = Math.min(1000 * Math.pow(2, reconnectAttempts.current), 5000) + reconnectTimeout.current = setTimeout(() => { + reconnectAttempts.current++ + connect(rid) + }, delay) + } else { + setConnectionState('error') + setPhase('error') + setError('Connection failed after multiple attempts') + } + } + } catch (err) { + console.error('[WS] Connection error:', err) + setConnectionState('error') + setPhase('error') + setError(err instanceof Error ? err.message : 'Failed to create WebSocket') + } + }, [session?.access_token, cleanup, handleMessage]) + + // Connect when repoId changes + // Note: phase intentionally excluded from deps - we use phaseRef to check completion + // without triggering reconnects when phase changes to 'completed' + useEffect(() => { + if (repoId && session?.access_token) { + // Reset reconnect counter for fresh session + reconnectAttempts.current = 0 + connect(repoId) + } else { + cleanup() + // Only reset if not completed (use ref to avoid dependency) + if (phaseRef.current !== 'completed') { + setConnectionState('idle') + setPhase('idle') + setProgress(INITIAL_PROGRESS) + setRecentFiles([]) + setError(null) + } + } + return cleanup + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [repoId, session?.access_token, connect, cleanup]) + + const reset = useCallback(() => { + cleanup() + reconnectAttempts.current = 0 + setConnectionState('idle') + setPhase('idle') + setProgress(INITIAL_PROGRESS) + setRecentFiles([]) + setCompletedStats(null) + setError(null) + setIsRecoverable(false) + }, [cleanup]) + + return { + connectionState, + phase, + progress, + recentFiles, + completedStats, + error, + isRecoverable, + reset, + isConnected: connectionState === 'connected', + isIndexing: phase === 'indexing', + isCompleted: phase === 'completed', + hasError: phase === 'error', + } +}