Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4acb709
feat(dashboard): real-time indexing progress with WebSocket streaming
DevanshuNEU Jan 27, 2026
5c65021
fix(backend): atomic check-and-set for indexing status to prevent TOC…
DevanshuNEU Jan 27, 2026
c60f4e9
fix(modal): move isOpen check inside AnimatePresence for exit animations
DevanshuNEU Jan 27, 2026
a53f526
fix(ws-hook): set phase and error state on all connection failures
DevanshuNEU Jan 27, 2026
069e772
fix(ws-hook): guard against stale socket callbacks after cleanup/reco…
DevanshuNEU Jan 27, 2026
12148a2
fix(ws-hook): remove phase from effect deps to prevent reconnect on c…
DevanshuNEU Jan 27, 2026
6e4294e
fix(ws-hook): prefer data.error over data.message in error events
DevanshuNEU Jan 27, 2026
65e6482
fix(modal): use 'import type' for TypeScript type-only imports
DevanshuNEU Jan 27, 2026
b40eb3a
fix(api): return 202 Accepted for POST /repos/{id}/index/async
DevanshuNEU Jan 27, 2026
fc17c5f
fix(indexing): track total_files separately from total_functions
DevanshuNEU Jan 27, 2026
53e10ce
fix(modal): clear auto-close timeout on manual close/retry
DevanshuNEU Jan 27, 2026
4e85b7f
fix(ws-hook): block-scope errorMessage in switch case 'error'
DevanshuNEU Jan 27, 2026
732f9e4
fix(ws-hook): reset reconnectAttempts for new sessions and manual resets
DevanshuNEU Jan 27, 2026
81b55dd
docs(api): fix WebSocket path in async indexing docstring
DevanshuNEU Jan 27, 2026
93aac70
fix(ws): increase delay before closing WebSocket after completed event
DevanshuNEU Jan 27, 2026
39eed6b
fix(modal): increase auto-close delay from 2s to 3.5s
DevanshuNEU Jan 27, 2026
2fa9487
fix(api): delete repo entry when size check or analysis fails
DevanshuNEU Jan 27, 2026
dfe614a
fix(ws): add delay before indexing to let WebSocket connect
DevanshuNEU Jan 27, 2026
a4c8e11
fix(progress): report embedding progress (was showing 100% instantly)
DevanshuNEU Jan 27, 2026
ef5fc8f
fix(a11y): add ARIA dialog attributes to IndexingProgressModal
DevanshuNEU Jan 27, 2026
367301c
fix: address CodeRabbit review feedback
DevanshuNEU Jan 27, 2026
1f845e4
fix(progress): clamp percent to 0-100 range
DevanshuNEU Jan 27, 2026
55d4f97
polish: enhance processing list animations
DevanshuNEU Jan 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from routes.users import router as users_router
from routes.search_v2 import router as search_v2_router
from routes.ws_playground import websocket_playground_index
from routes.ws_repos import websocket_repo_indexing


# Lifespan context manager for startup/shutdown
Expand Down Expand Up @@ -95,6 +96,7 @@ async def dispatch(self, request: Request, call_next):
# WebSocket endpoints (versioned)
app.add_api_websocket_route(f"{API_PREFIX}/ws/index/{{repo_id}}", websocket_index)
app.add_api_websocket_route(f"{API_PREFIX}/ws/playground/{{job_id}}", websocket_playground_index)
app.add_api_websocket_route(f"{API_PREFIX}/ws/repos/{{repo_id}}/indexing", websocket_repo_indexing)


# ===== ERROR HANDLERS =====
Expand Down
266 changes: 246 additions & 20 deletions backend/routes/repos.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
"""Repository management routes - CRUD and indexing."""
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, Depends, BackgroundTasks
from pydantic import BaseModel
from typing import Optional
import hashlib
import time
import asyncio
import git

from dependencies import (
indexer, repo_manager, metrics,
indexer, repo_manager, metrics, redis_client,
get_repo_or_404, user_limits, repo_validator
)
from services.input_validator import InputValidator
from services.indexing_events import get_event_publisher, IndexingStats
from middleware.auth import require_auth, AuthContext
from services.observability import logger, capture_exception

Expand Down Expand Up @@ -79,18 +81,24 @@ async def add_repository(
# Fail CLOSED if analysis failed (security: don't allow unknown-size repos)
if not analysis.success:
logger.error(
"Repo analysis failed - blocking indexing",
"Repo analysis failed - removing repo",
user_id=user_id,
repo_id=repo["id"],
error=analysis.error
)
return {
"repo_id": repo["id"],
"status": "added",
"indexing_blocked": True,
"analysis": analysis.to_dict(),
"message": f"Repository added but analysis failed: {analysis.error}. Please try re-indexing later."
}
# Clean up: delete the repo we just created
try:
repo_manager.delete_repo(repo["id"])
except Exception as del_err:
logger.warning("Failed to cleanup failed analysis repo", error=str(del_err))

raise HTTPException(
status_code=500,
detail={
"error": "ANALYSIS_FAILED",
"message": f"Repository analysis failed: {analysis.error}"
}
)

# Check repo size against tier limits
size_check = user_limits.check_repo_size(
Expand All @@ -100,22 +108,30 @@ async def add_repository(
)

if not size_check.allowed:
# Repo added but too large - return warning with upgrade CTA
# Repo too large - delete the entry and return error
logger.info(
"Repo too large for user tier",
"Repo too large for user tier - removing",
user_id=user_id,
repo_id=repo["id"],
file_count=analysis.file_count,
estimated_functions=analysis.estimated_functions,
tier=size_check.tier
)
return {
"repo_id": repo["id"],
"status": "added",
"indexing_blocked": True,
"analysis": analysis.to_dict(),
"limit_check": size_check.to_dict(),
"message": size_check.message
}
# Clean up: delete the repo we just created
try:
repo_manager.delete_repo(repo["id"])
except Exception as del_err:
logger.warning("Failed to cleanup rejected repo", error=str(del_err))

raise HTTPException(
status_code=403,
detail={
"error": "REPO_TOO_LARGE",
"analysis": analysis.to_dict(),
"limit_check": size_check.to_dict(),
"message": size_check.message
}
)

return {
"repo_id": repo["id"],
Expand Down Expand Up @@ -224,6 +240,216 @@ async def index_repository(
raise HTTPException(status_code=500, detail=str(e))


async def _run_async_indexing(
repo_id: str,
repo: dict,
user_id: str,
incremental: bool = True
):
"""
Background task for async indexing with real-time progress.

Publishes events to Redis pub/sub for WebSocket clients.
"""
start_time = time.time()
publisher = get_event_publisher(redis_client)

try:
# Wait for WebSocket client to connect and subscribe
# Redis pub/sub doesn't buffer - events sent before subscription are lost
# TODO: Consider Redis Streams or initial state fetch to avoid timing dependency
await asyncio.sleep(1.5)

repo_manager.update_status(repo_id, "indexing")

# Publish initial progress to confirm connection
if publisher:
publisher.publish_progress(repo_id, 0, 1, 0, "Starting...")

# Check for incremental
last_commit = repo_manager.get_last_indexed_commit(repo_id)

if incremental and last_commit:
logger.info("Async INCREMENTAL indexing", repo_id=repo_id, last_commit=last_commit[:8])
total_functions = await indexer.incremental_index_repository(
repo_id,
repo["local_path"],
last_commit
)
index_type = "incremental"
# For incremental, get file count from repo or analyze
total_files = repo.get("file_count", 0)
if not total_files:
analysis = repo_validator.analyze_repo(repo["local_path"])
total_files = analysis.file_count if analysis and analysis.success else 0
else:
logger.info("Async FULL indexing with progress", repo_id=repo_id)

# Track total_files from progress callback
tracked_total_files = 0

# Progress callback that publishes to Redis
async def progress_callback(
files_processed: int,
functions_found: int,
total_files: int,
current_file: str = None,
functions_total: int = 0
):
nonlocal tracked_total_files
tracked_total_files = total_files
if publisher:
logger.info(
"Publishing progress event",
repo_id=repo_id,
files=f"{files_processed}/{total_files}",
functions=f"{functions_found}/{functions_total}" if functions_total else str(functions_found),
file=current_file
)
publisher.publish_progress(
repo_id,
files_processed,
total_files,
functions_found,
current_file,
functions_total
)

total_functions = await indexer.index_repository_with_progress(
repo_id,
repo["local_path"],
progress_callback
)
total_files = tracked_total_files
index_type = "full"

# Update metadata
git_repo = git.Repo(repo["local_path"])
current_commit = git_repo.head.commit.hexsha

repo_manager.update_status(repo_id, "indexed")
repo_manager.update_file_count(repo_id, total_files)
repo_manager.update_last_commit(repo_id, current_commit)

duration = time.time() - start_time
metrics.record_indexing(repo_id, duration, total_functions)

# Publish completion event
if publisher:
publisher.publish_completed(
repo_id,
repo_id,
IndexingStats(
files_processed=total_files,
functions_indexed=total_functions,
indexing_time_seconds=duration
)
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
DevanshuNEU marked this conversation as resolved.

logger.info(
"Async indexing complete",
repo_id=repo_id,
functions=total_functions,
duration=f"{duration:.2f}s",
index_type=index_type
)

except Exception as e:
logger.error("Async indexing failed", repo_id=repo_id, error=str(e))
capture_exception(e)
repo_manager.update_status(repo_id, "error")

# Publish error event
if publisher:
publisher.publish_error(
repo_id,
error="indexing_failed",
message=str(e),
recoverable=True
)


@router.post("/{repo_id}/index/async", status_code=202)
async def index_repository_async(
repo_id: str,
background_tasks: BackgroundTasks,
incremental: bool = True,
auth: AuthContext = Depends(require_auth)
):
"""
Trigger async indexing for a repository.

Returns immediately with status 202. Connect to WebSocket at
/api/v1/ws/repos/{repo_id}/indexing to receive real-time progress updates.
"""
user_id = auth.user_id

if not user_id:
raise HTTPException(status_code=401, detail="User ID required")

try:
repo = get_repo_or_404(repo_id, user_id)

# Re-check size limits
analysis = repo_validator.analyze_repo(repo["local_path"])

if not analysis.success:
raise HTTPException(
status_code=500,
detail={
"error": "ANALYSIS_FAILED",
"message": f"Cannot index: {analysis.error}"
}
)

size_check = user_limits.check_repo_size(
user_id,
analysis.file_count,
analysis.estimated_functions
)

if not size_check.allowed:
raise HTTPException(
status_code=403,
detail={
"error": "REPO_TOO_LARGE",
"limit_check": size_check.to_dict(),
"message": size_check.message
}
)

# Atomic check-and-set: only set 'indexing' if not already indexing
# This prevents TOCTOU race where two requests both see status != 'indexing'
if not repo_manager.try_set_indexing(repo_id):
raise HTTPException(
status_code=409,
detail="Repository is already being indexed"
)

# Schedule background task
background_tasks.add_task(
_run_async_indexing,
repo_id,
repo,
user_id,
incremental
)

return {
"status": "indexing",
"repo_id": repo_id,
"message": "Indexing started. Connect to WebSocket for progress.",
"websocket_url": f"/api/v1/ws/repos/{repo_id}/indexing"
}
Comment thread
DevanshuNEU marked this conversation as resolved.

except HTTPException:
raise
except Exception as e:
logger.error("Failed to start async indexing", repo_id=repo_id, error=str(e))
capture_exception(e)
raise HTTPException(status_code=500, detail=str(e))


async def _authenticate_websocket(websocket: WebSocket) -> Optional[dict]:
"""Authenticate WebSocket via query parameter token."""
token = websocket.query_params.get("token")
Expand Down
Loading