Skip to content

Commit 1a83150

Browse files
authored
Merge pull request #226 from DevanshuNEU/feat/realtime-indexing-progress
feat(dashboard): real-time indexing progress with WebSocket streaming
2 parents 6c1b5ba + 55d4f97 commit 1a83150

10 files changed

Lines changed: 1330 additions & 27 deletions

File tree

backend/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from routes.users import router as users_router
2929
from routes.search_v2 import router as search_v2_router
3030
from routes.ws_playground import websocket_playground_index
31+
from routes.ws_repos import websocket_repo_indexing
3132

3233

3334
# Lifespan context manager for startup/shutdown
@@ -95,6 +96,7 @@ async def dispatch(self, request: Request, call_next):
9596
# WebSocket endpoints (versioned)
9697
app.add_api_websocket_route(f"{API_PREFIX}/ws/index/{{repo_id}}", websocket_index)
9798
app.add_api_websocket_route(f"{API_PREFIX}/ws/playground/{{job_id}}", websocket_playground_index)
99+
app.add_api_websocket_route(f"{API_PREFIX}/ws/repos/{{repo_id}}/indexing", websocket_repo_indexing)
98100

99101

100102
# ===== ERROR HANDLERS =====

backend/routes/repos.py

Lines changed: 246 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
"""Repository management routes - CRUD and indexing."""
2-
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends
2+
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends, BackgroundTasks
33
from pydantic import BaseModel
44
from typing import Optional
55
import hashlib
66
import time
7+
import asyncio
78
import git
89

910
from dependencies import (
10-
indexer, repo_manager, metrics,
11+
indexer, repo_manager, metrics, redis_client,
1112
get_repo_or_404, user_limits, repo_validator
1213
)
1314
from services.input_validator import InputValidator
15+
from services.indexing_events import get_event_publisher, IndexingStats
1416
from middleware.auth import require_auth, AuthContext
1517
from services.observability import logger, capture_exception
1618

@@ -79,18 +81,24 @@ async def add_repository(
7981
# Fail CLOSED if analysis failed (security: don't allow unknown-size repos)
8082
if not analysis.success:
8183
logger.error(
82-
"Repo analysis failed - blocking indexing",
84+
"Repo analysis failed - removing repo",
8385
user_id=user_id,
8486
repo_id=repo["id"],
8587
error=analysis.error
8688
)
87-
return {
88-
"repo_id": repo["id"],
89-
"status": "added",
90-
"indexing_blocked": True,
91-
"analysis": analysis.to_dict(),
92-
"message": f"Repository added but analysis failed: {analysis.error}. Please try re-indexing later."
93-
}
89+
# Clean up: delete the repo we just created
90+
try:
91+
repo_manager.delete_repo(repo["id"])
92+
except Exception as del_err:
93+
logger.warning("Failed to cleanup failed analysis repo", error=str(del_err))
94+
95+
raise HTTPException(
96+
status_code=500,
97+
detail={
98+
"error": "ANALYSIS_FAILED",
99+
"message": f"Repository analysis failed: {analysis.error}"
100+
}
101+
)
94102

95103
# Check repo size against tier limits
96104
size_check = user_limits.check_repo_size(
@@ -100,22 +108,30 @@ async def add_repository(
100108
)
101109

102110
if not size_check.allowed:
103-
# Repo added but too large - return warning with upgrade CTA
111+
# Repo too large - delete the entry and return error
104112
logger.info(
105-
"Repo too large for user tier",
113+
"Repo too large for user tier - removing",
106114
user_id=user_id,
107115
repo_id=repo["id"],
108116
file_count=analysis.file_count,
117+
estimated_functions=analysis.estimated_functions,
109118
tier=size_check.tier
110119
)
111-
return {
112-
"repo_id": repo["id"],
113-
"status": "added",
114-
"indexing_blocked": True,
115-
"analysis": analysis.to_dict(),
116-
"limit_check": size_check.to_dict(),
117-
"message": size_check.message
118-
}
120+
# Clean up: delete the repo we just created
121+
try:
122+
repo_manager.delete_repo(repo["id"])
123+
except Exception as del_err:
124+
logger.warning("Failed to cleanup rejected repo", error=str(del_err))
125+
126+
raise HTTPException(
127+
status_code=403,
128+
detail={
129+
"error": "REPO_TOO_LARGE",
130+
"analysis": analysis.to_dict(),
131+
"limit_check": size_check.to_dict(),
132+
"message": size_check.message
133+
}
134+
)
119135

120136
return {
121137
"repo_id": repo["id"],
@@ -224,6 +240,216 @@ async def index_repository(
224240
raise HTTPException(status_code=500, detail=str(e))
225241

226242

243+
async def _run_async_indexing(
    repo_id: str,
    repo: dict,
    user_id: str,
    incremental: bool = True,
    *,
    subscriber_wait_seconds: float = 1.5
):
    """
    Background task for async indexing with real-time progress.

    Publishes progress, completion and error events through the indexing
    event publisher so WebSocket clients can follow along.

    Args:
        repo_id: ID of the repository being indexed.
        repo: Repository record; ``repo["local_path"]`` is read, and
            ``repo.get("file_count")`` is used for incremental runs.
        user_id: ID of the requesting user (currently unused here; kept so
            the call signature stays stable for the scheduling endpoint).
        incremental: When True and a last indexed commit is recorded, only
            an incremental index is performed; otherwise a full index runs.
        subscriber_wait_seconds: Grace period before the first event is
            published, giving the WebSocket client time to subscribe.

    Raises:
        asyncio.CancelledError: Re-raised after the repo has been moved out
            of the "indexing" state, so cancellation still propagates.
    """
    start_time = time.time()
    publisher = get_event_publisher(redis_client)

    try:
        # Wait for WebSocket client to connect and subscribe
        # Redis pub/sub doesn't buffer - events sent before subscription are lost
        # TODO: Consider Redis Streams or initial state fetch to avoid timing dependency
        await asyncio.sleep(subscriber_wait_seconds)

        repo_manager.update_status(repo_id, "indexing")

        # Publish initial progress to confirm connection
        if publisher:
            publisher.publish_progress(repo_id, 0, 1, 0, "Starting...")

        # Check for incremental
        last_commit = repo_manager.get_last_indexed_commit(repo_id)

        if incremental and last_commit:
            logger.info("Async INCREMENTAL indexing", repo_id=repo_id, last_commit=last_commit[:8])
            total_functions = await indexer.incremental_index_repository(
                repo_id,
                repo["local_path"],
                last_commit
            )
            index_type = "incremental"
            # For incremental, get file count from repo or analyze
            total_files = repo.get("file_count", 0)
            if not total_files:
                analysis = repo_validator.analyze_repo(repo["local_path"])
                total_files = analysis.file_count if analysis and analysis.success else 0
        else:
            logger.info("Async FULL indexing with progress", repo_id=repo_id)

            # Track total_files from progress callback
            tracked_total_files = 0

            # Progress callback that publishes to Redis
            async def progress_callback(
                files_processed: int,
                functions_found: int,
                total_files: int,
                current_file: Optional[str] = None,
                functions_total: int = 0
            ):
                nonlocal tracked_total_files
                tracked_total_files = total_files
                if publisher:
                    logger.info(
                        "Publishing progress event",
                        repo_id=repo_id,
                        files=f"{files_processed}/{total_files}",
                        functions=f"{functions_found}/{functions_total}" if functions_total else str(functions_found),
                        file=current_file
                    )
                    publisher.publish_progress(
                        repo_id,
                        files_processed,
                        total_files,
                        functions_found,
                        current_file,
                        functions_total
                    )

            total_functions = await indexer.index_repository_with_progress(
                repo_id,
                repo["local_path"],
                progress_callback
            )
            total_files = tracked_total_files
            index_type = "full"

        # Update metadata. The Repo is used as a context manager so the
        # resources it holds are released as soon as HEAD has been read.
        with git.Repo(repo["local_path"]) as git_repo:
            current_commit = git_repo.head.commit.hexsha

        repo_manager.update_status(repo_id, "indexed")
        repo_manager.update_file_count(repo_id, total_files)
        repo_manager.update_last_commit(repo_id, current_commit)

        duration = time.time() - start_time
        metrics.record_indexing(repo_id, duration, total_functions)

        # Publish completion event
        if publisher:
            # NOTE(review): repo_id is passed twice here, as in the original;
            # confirm against publish_completed's signature that this is intended.
            publisher.publish_completed(
                repo_id,
                repo_id,
                IndexingStats(
                    files_processed=total_files,
                    functions_indexed=total_functions,
                    indexing_time_seconds=duration
                )
            )

        logger.info(
            "Async indexing complete",
            repo_id=repo_id,
            functions=total_functions,
            duration=f"{duration:.2f}s",
            index_type=index_type
        )

    except asyncio.CancelledError:
        # CancelledError derives from BaseException, so the generic handler
        # below never sees it. Without this branch a cancelled task would
        # leave the repo in "indexing" and later index requests would be
        # rejected as "already being indexed".
        logger.warning("Async indexing cancelled", repo_id=repo_id)
        _report_indexing_failure(repo_id, publisher, "Indexing was cancelled")
        raise

    except Exception as e:
        logger.error("Async indexing failed", repo_id=repo_id, error=str(e))
        capture_exception(e)
        _report_indexing_failure(repo_id, publisher, str(e))


def _report_indexing_failure(repo_id: str, publisher, message: str) -> None:
    """Best-effort: flag the repo as errored and publish an error event.

    Each step is guarded separately so a failing status update cannot
    prevent subscribers from being told that indexing stopped.
    """
    try:
        repo_manager.update_status(repo_id, "error")
    except Exception as status_err:
        logger.warning(
            "Failed to mark repo as errored",
            repo_id=repo_id,
            error=str(status_err)
        )

    # Publish error event
    if publisher:
        try:
            publisher.publish_error(
                repo_id,
                error="indexing_failed",
                message=message,
                recoverable=True
            )
        except Exception as publish_err:
            logger.warning(
                "Failed to publish indexing error event",
                repo_id=repo_id,
                error=str(publish_err)
            )
370+
371+
372+
@router.post("/{repo_id}/index/async", status_code=202)
async def index_repository_async(
    repo_id: str,
    background_tasks: BackgroundTasks,
    incremental: bool = True,
    auth: AuthContext = Depends(require_auth)
):
    """
    Queue background indexing for a repository and respond right away.

    The response (HTTP 202) is sent as soon as the job is scheduled; progress
    is delivered over the WebSocket at /api/v1/ws/repos/{repo_id}/indexing.

    Raises:
        HTTPException: 401 without a user ID, 500 when repo analysis fails or
            an unexpected error occurs, 403 when the repo exceeds the size
            limit check, 409 when the repo could not be claimed for indexing.
    """
    user_id = auth.user_id
    if not user_id:
        raise HTTPException(status_code=401, detail="User ID required")

    try:
        repo = get_repo_or_404(repo_id, user_id)

        # Size limits are validated again on every indexing request.
        repo_analysis = repo_validator.analyze_repo(repo["local_path"])
        if not repo_analysis.success:
            analysis_failure = {
                "error": "ANALYSIS_FAILED",
                "message": f"Cannot index: {repo_analysis.error}"
            }
            raise HTTPException(status_code=500, detail=analysis_failure)

        limit_result = user_limits.check_repo_size(
            user_id,
            repo_analysis.file_count,
            repo_analysis.estimated_functions
        )
        if not limit_result.allowed:
            too_large = {
                "error": "REPO_TOO_LARGE",
                "limit_check": limit_result.to_dict(),
                "message": limit_result.message
            }
            raise HTTPException(status_code=403, detail=too_large)

        # Claim the repo via check-and-set so two concurrent requests cannot
        # both pass a "not indexing yet" check and start duplicate jobs.
        claimed = repo_manager.try_set_indexing(repo_id)
        if not claimed:
            raise HTTPException(
                status_code=409,
                detail="Repository is already being indexed"
            )

        # Hand the actual work to the background task runner.
        background_tasks.add_task(
            _run_async_indexing, repo_id, repo, user_id, incremental
        )

        progress_url = f"/api/v1/ws/repos/{repo_id}/indexing"
        return {
            "status": "indexing",
            "repo_id": repo_id,
            "message": "Indexing started. Connect to WebSocket for progress.",
            "websocket_url": progress_url
        }

    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        logger.error("Failed to start async indexing", repo_id=repo_id, error=str(exc))
        capture_exception(exc)
        raise HTTPException(status_code=500, detail=str(exc))
451+
452+
227453
async def _authenticate_websocket(websocket: WebSocket) -> Optional[dict]:
228454
"""Authenticate WebSocket via query parameter token."""
229455
token = websocket.query_params.get("token")

0 commit comments

Comments
 (0)