|
1 | 1 | """Repository management routes - CRUD and indexing.""" |
2 | | -from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends |
| 2 | +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends, BackgroundTasks |
3 | 3 | from pydantic import BaseModel |
4 | 4 | from typing import Optional |
5 | 5 | import hashlib |
6 | 6 | import time |
| 7 | +import asyncio |
7 | 8 | import git |
8 | 9 |
|
9 | 10 | from dependencies import ( |
10 | | - indexer, repo_manager, metrics, |
| 11 | + indexer, repo_manager, metrics, redis_client, |
11 | 12 | get_repo_or_404, user_limits, repo_validator |
12 | 13 | ) |
13 | 14 | from services.input_validator import InputValidator |
| 15 | +from services.indexing_events import get_event_publisher, IndexingStats |
14 | 16 | from middleware.auth import require_auth, AuthContext |
15 | 17 | from services.observability import logger, capture_exception |
16 | 18 |
|
@@ -224,6 +226,189 @@ async def index_repository( |
224 | 226 | raise HTTPException(status_code=500, detail=str(e)) |
225 | 227 |
|
226 | 228 |
|
| 229 | +async def _run_async_indexing( |
| 230 | + repo_id: str, |
| 231 | + repo: dict, |
| 232 | + user_id: str, |
| 233 | + incremental: bool = True |
| 234 | +): |
| 235 | + """ |
| 236 | + Background task for async indexing with real-time progress. |
| 237 | + |
| 238 | + Publishes events to Redis pub/sub for WebSocket clients. |
| 239 | + """ |
| 240 | + start_time = time.time() |
| 241 | + publisher = get_event_publisher(redis_client) |
| 242 | + |
| 243 | + try: |
| 244 | + repo_manager.update_status(repo_id, "indexing") |
| 245 | + |
| 246 | + # Check for incremental |
| 247 | + last_commit = repo_manager.get_last_indexed_commit(repo_id) |
| 248 | + |
| 249 | + if incremental and last_commit: |
| 250 | + logger.info("Async INCREMENTAL indexing", repo_id=repo_id, last_commit=last_commit[:8]) |
| 251 | + total_functions = await indexer.incremental_index_repository( |
| 252 | + repo_id, |
| 253 | + repo["local_path"], |
| 254 | + last_commit |
| 255 | + ) |
| 256 | + index_type = "incremental" |
| 257 | + else: |
| 258 | + logger.info("Async FULL indexing with progress", repo_id=repo_id) |
| 259 | + |
| 260 | + # Progress callback that publishes to Redis |
| 261 | + async def progress_callback( |
| 262 | + files_processed: int, |
| 263 | + functions_found: int, |
| 264 | + total_files: int, |
| 265 | + current_file: str = None |
| 266 | + ): |
| 267 | + if publisher: |
| 268 | + publisher.publish_progress( |
| 269 | + repo_id, |
| 270 | + files_processed, |
| 271 | + total_files, |
| 272 | + functions_found, |
| 273 | + current_file |
| 274 | + ) |
| 275 | + |
| 276 | + total_functions = await indexer.index_repository_with_progress( |
| 277 | + repo_id, |
| 278 | + repo["local_path"], |
| 279 | + progress_callback |
| 280 | + ) |
| 281 | + index_type = "full" |
| 282 | + |
| 283 | + # Update metadata |
| 284 | + git_repo = git.Repo(repo["local_path"]) |
| 285 | + current_commit = git_repo.head.commit.hexsha |
| 286 | + |
| 287 | + repo_manager.update_status(repo_id, "indexed") |
| 288 | + repo_manager.update_file_count(repo_id, total_functions) |
| 289 | + repo_manager.update_last_commit(repo_id, current_commit) |
| 290 | + |
| 291 | + duration = time.time() - start_time |
| 292 | + metrics.record_indexing(repo_id, duration, total_functions) |
| 293 | + |
| 294 | + # Publish completion event |
| 295 | + if publisher: |
| 296 | + publisher.publish_completed( |
| 297 | + repo_id, |
| 298 | + repo_id, |
| 299 | + IndexingStats( |
| 300 | + files_processed=total_functions, |
| 301 | + functions_indexed=total_functions, |
| 302 | + indexing_time_seconds=duration |
| 303 | + ) |
| 304 | + ) |
| 305 | + |
| 306 | + logger.info( |
| 307 | + "Async indexing complete", |
| 308 | + repo_id=repo_id, |
| 309 | + functions=total_functions, |
| 310 | + duration=f"{duration:.2f}s", |
| 311 | + index_type=index_type |
| 312 | + ) |
| 313 | + |
| 314 | + except Exception as e: |
| 315 | + logger.error("Async indexing failed", repo_id=repo_id, error=str(e)) |
| 316 | + capture_exception(e) |
| 317 | + repo_manager.update_status(repo_id, "error") |
| 318 | + |
| 319 | + # Publish error event |
| 320 | + if publisher: |
| 321 | + publisher.publish_error( |
| 322 | + repo_id, |
| 323 | + error="indexing_failed", |
| 324 | + message=str(e), |
| 325 | + recoverable=True |
| 326 | + ) |
| 327 | + |
| 328 | + |
| 329 | +@router.post("/{repo_id}/index/async") |
| 330 | +async def index_repository_async( |
| 331 | + repo_id: str, |
| 332 | + background_tasks: BackgroundTasks, |
| 333 | + incremental: bool = True, |
| 334 | + auth: AuthContext = Depends(require_auth) |
| 335 | +): |
| 336 | + """ |
| 337 | + Trigger async indexing for a repository. |
| 338 | + |
| 339 | + Returns immediately with status 202. Connect to WebSocket at |
| 340 | + /ws/repos/{repo_id}/indexing to receive real-time progress updates. |
| 341 | + """ |
| 342 | + user_id = auth.user_id |
| 343 | + |
| 344 | + if not user_id: |
| 345 | + raise HTTPException(status_code=401, detail="User ID required") |
| 346 | + |
| 347 | + try: |
| 348 | + repo = get_repo_or_404(repo_id, user_id) |
| 349 | + |
| 350 | + # Check if already indexing |
| 351 | + if repo.get("status") == "indexing": |
| 352 | + raise HTTPException( |
| 353 | + status_code=409, |
| 354 | + detail="Repository is already being indexed" |
| 355 | + ) |
| 356 | + |
| 357 | + # Re-check size limits |
| 358 | + analysis = repo_validator.analyze_repo(repo["local_path"]) |
| 359 | + |
| 360 | + if not analysis.success: |
| 361 | + raise HTTPException( |
| 362 | + status_code=500, |
| 363 | + detail={ |
| 364 | + "error": "ANALYSIS_FAILED", |
| 365 | + "message": f"Cannot index: {analysis.error}" |
| 366 | + } |
| 367 | + ) |
| 368 | + |
| 369 | + size_check = user_limits.check_repo_size( |
| 370 | + user_id, |
| 371 | + analysis.file_count, |
| 372 | + analysis.estimated_functions |
| 373 | + ) |
| 374 | + |
| 375 | + if not size_check.allowed: |
| 376 | + raise HTTPException( |
| 377 | + status_code=403, |
| 378 | + detail={ |
| 379 | + "error": "REPO_TOO_LARGE", |
| 380 | + "limit_check": size_check.to_dict(), |
| 381 | + "message": size_check.message |
| 382 | + } |
| 383 | + ) |
| 384 | + |
| 385 | + # Mark as indexing immediately |
| 386 | + repo_manager.update_status(repo_id, "indexing") |
| 387 | + |
| 388 | + # Schedule background task |
| 389 | + background_tasks.add_task( |
| 390 | + _run_async_indexing, |
| 391 | + repo_id, |
| 392 | + repo, |
| 393 | + user_id, |
| 394 | + incremental |
| 395 | + ) |
| 396 | + |
| 397 | + return { |
| 398 | + "status": "indexing", |
| 399 | + "repo_id": repo_id, |
| 400 | + "message": "Indexing started. Connect to WebSocket for progress.", |
| 401 | + "websocket_url": f"/api/v1/ws/repos/{repo_id}/indexing" |
| 402 | + } |
| 403 | + |
| 404 | + except HTTPException: |
| 405 | + raise |
| 406 | + except Exception as e: |
| 407 | + logger.error("Failed to start async indexing", repo_id=repo_id, error=str(e)) |
| 408 | + capture_exception(e) |
| 409 | + raise HTTPException(status_code=500, detail=str(e)) |
| 410 | + |
| 411 | + |
227 | 412 | async def _authenticate_websocket(websocket: WebSocket) -> Optional[dict]: |
228 | 413 | """Authenticate WebSocket via query parameter token.""" |
229 | 414 | token = websocket.query_params.get("token") |
|
0 commit comments