Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
@asynccontextmanager
async def lifespan(app: FastAPI):
validate_environment()
# Any repo left 'indexing' at boot is orphaned (indexing runs in-process; a restart
# kills it). Reset so the user can retry instead of seeing an eternal spinner. (#311)
from services.supabase_service import get_supabase_service
get_supabase_service().reset_stuck_indexing_jobs()
await load_demo_repos()
yield
# Shutdown (cleanup if needed)
Expand Down
11 changes: 11 additions & 0 deletions backend/migrations/003_add_indexing_started_at.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Durable repo-state v0.1 (issue #311)
-- Records when a repo entered the 'indexing' state so the stuck-job reaper can tell a
-- live job from an orphaned one (process died mid-index, leaving status='indexing' forever).

ALTER TABLE repositories
ADD COLUMN IF NOT EXISTS indexing_started_at TIMESTAMPTZ;

-- Partial index: the reaper and the steal-on-retry path only ever scan rows currently indexing.
CREATE INDEX IF NOT EXISTS idx_repositories_indexing_started_at
ON repositories(indexing_started_at)
WHERE status = 'indexing';
9 changes: 8 additions & 1 deletion backend/routes/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from dependencies import (
dependency_analyzer, style_analyzer, dna_extractor,
get_repo_or_404
get_repo_or_404, repo_manager
)
from services.input_validator import InputValidator
from middleware.auth import require_auth, AuthContext
Expand Down Expand Up @@ -35,6 +35,7 @@ async def get_dependency_graph(
return {**cached_graph, "cached": True}

logger.info("Building fresh dependency graph", repo_id=repo_id, include_paths=repo.get("include_paths"))
await repo_manager.ensure_clone(repo)
graph_data = dependency_analyzer.build_dependency_graph(repo["local_path"], include_paths=repo.get("include_paths"))
dependency_analyzer.save_to_cache(repo_id, graph_data)

Expand All @@ -57,12 +58,15 @@ async def analyze_impact(
try:
repo = get_repo_or_404(repo_id, auth.user_id)

# Validate user input BEFORE any expensive/external work (re-clone). Reject traversal first.
valid_path, path_error = InputValidator.validate_file_path(
request.file_path, repo["local_path"]
)
if not valid_path:
raise HTTPException(status_code=400, detail=f"Invalid file path: {path_error}")

await repo_manager.ensure_clone(repo)

graph_data = dependency_analyzer.load_from_cache(repo_id)
if not graph_data:
logger.info("Building dependency graph for impact analysis", repo_id=repo_id)
Expand Down Expand Up @@ -96,6 +100,7 @@ async def get_repository_insights(
graph_data = dependency_analyzer.load_from_cache(repo_id)
if not graph_data:
logger.info("Building dependency graph for insights", repo_id=repo_id)
await repo_manager.ensure_clone(repo)
graph_data = dependency_analyzer.build_dependency_graph(repo["local_path"], include_paths=repo.get("include_paths"))
dependency_analyzer.save_to_cache(repo_id, graph_data)

Expand Down Expand Up @@ -134,6 +139,7 @@ async def get_style_analysis(
return {**cached_style, "cached": True}

logger.info("Analyzing code style", repo_id=repo_id)
await repo_manager.ensure_clone(repo)
style_data = style_analyzer.analyze_repository_style(repo["local_path"], include_paths=repo.get("include_paths"))
style_analyzer.save_to_cache(repo_id, style_data)

Expand Down Expand Up @@ -178,6 +184,7 @@ async def get_codebase_dna(
logger.info("Extracting codebase DNA", repo_id=repo_id)
metrics.increment("dna_extractions")

await repo_manager.ensure_clone(repo)
dna = dna_extractor.extract_dna(repo["local_path"], repo_id, include_paths=repo.get("include_paths"))
dna_extractor.save_to_cache(repo_id, dna)

Expand Down
26 changes: 19 additions & 7 deletions backend/routes/repos.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,10 @@ async def get_repo_directories(
directories to index instead of the entire repo.
"""
repo = get_repo_or_404(repo_id, auth.user_id)
# Re-clone if the container was redeployed and wiped the working tree (#311).
await repo_manager.ensure_clone(repo)
local_path = Path(repo["local_path"])

if not local_path.exists():
raise HTTPException(status_code=404, detail="Repo not cloned yet")

dirs = await asyncio.to_thread(_scan_directories, local_path)

return {
Expand All @@ -501,7 +500,9 @@ async def index_repository(

try:
repo = get_repo_or_404(repo_id, user_id)

# Re-clone if the container was redeployed and wiped the working tree (#311).
await repo_manager.ensure_clone(repo)

# Re-check size limits before indexing (in case tier changed or repo updated)
analysis = repo_validator.analyze_repo(repo["local_path"])

Expand Down Expand Up @@ -765,10 +766,13 @@ async def index_repository_async(

try:
repo = get_repo_or_404(repo_id, user_id)

# Re-clone before the size-check (which reads the working tree) so a redeployed
# container rehydrates here, before the 202, rather than failing the gate (#311).
await repo_manager.ensure_clone(repo)

# Re-check size limits
analysis = repo_validator.analyze_repo(repo["local_path"])

if not analysis.success:
raise HTTPException(
status_code=500,
Expand Down Expand Up @@ -867,7 +871,15 @@ async def websocket_index(websocket: WebSocket, repo_id: str):
if not repo:
await websocket.close(code=4004, reason="Repository not found")
return


# Re-clone if the container was redeployed and wiped the working tree (#311).
try:
await repo_manager.ensure_clone(repo)
except Exception as e:
logger.error("Re-clone failed for websocket indexing", repo_id=repo_id, error=str(e))
await websocket.close(code=4005, reason="Repository clone unavailable")
return

# Check size limits before WebSocket indexing
analysis = repo_validator.analyze_repo(repo["local_path"])

Expand Down
101 changes: 95 additions & 6 deletions backend/services/repo_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,54 @@
Repository Manager (Supabase Edition)
Handles repository CRUD operations with PostgreSQL via Supabase
"""
import asyncio
import os
import shutil
import uuid
from typing import List, Optional
from typing import Dict, List, Optional
import git
from pathlib import Path
from fastapi import HTTPException
from services.supabase_service import get_supabase_service
from services.observability import logger, metrics


class RepoCloneError(HTTPException):
"""Repo working tree is missing and could not be restored from its git remote.

Subclasses HTTPException so it surfaces as an actionable 503 (handlers already re-raise
HTTPException) instead of an opaque 500. UX matters here: a redeploy is invisible to the
user, so the message has to tell them what to actually do. (#311)
"""

def __init__(self, repo_id: str, reason: str = ""):
super().__init__(
status_code=503,
detail={
"error": "REPO_UNAVAILABLE",
"repo_id": repo_id,
"message": (
"Repository source files are temporarily unavailable and could not be "
"restored from the git remote. Private repositories are not yet supported "
"for re-sync; for public repos, please retry shortly."
),
},
)
self.reason = reason


class RepositoryManager:
"""Manage repositories with Supabase persistence"""

def __init__(self):
self.repos_dir = Path("./repos")
self.repos_dir.mkdir(exist_ok=True)
self.db = get_supabase_service()


# Per-repo locks so two concurrent ops on the same missing clone don't both clone.
# Single uvicorn worker means an in-process lock is sufficient here.
self._clone_locks: Dict[str, asyncio.Lock] = {}

# Discover and sync existing repositories on startup
self._sync_existing_repos()

Expand Down Expand Up @@ -126,10 +158,69 @@ def add_repo(self, name: str, git_url: str, branch: str = "main", user_id: Optio
except Exception as e:
# Cleanup on failure
if local_path.exists():
import shutil
shutil.rmtree(local_path)
raise Exception(f"Failed to clone repository: {str(e)}")


async def ensure_clone(self, repo: dict) -> str:
"""Guarantee the working tree exists on disk, lazily re-cloning from git_url if needed.

Railway redeploys wipe ./repos (ephemeral disk) but Pinecone/Supabase survive, so
local_path is a cache hint, not source of truth -- the git remote is. On a warm hit
this is a sub-millisecond stat with no behavior change; on a miss it re-clones.
Returns the canonical local path and refreshes repo['local_path'] in place.
"""
repo_id = repo["id"]
canonical = self.repos_dir / repo_id

# Warm path: clone present. No re-clone, no event-loop work.
if (canonical / ".git").exists():
repo["local_path"] = str(canonical)
return str(canonical)

git_url = repo.get("git_url")
if not git_url or git_url == "unknown":
raise RepoCloneError(repo_id, "no git_url on record")
branch = repo.get("branch") or "main"

lock = self._clone_locks.setdefault(repo_id, asyncio.Lock())
async with lock:
# Another coroutine may have cloned while we waited for the lock.
if not (canonical / ".git").exists():
try:
await asyncio.to_thread(self._clone_into_place, repo_id, git_url, branch, canonical)
except Exception as e:
# Private repo (no creds on a fresh container), network failure, deleted
# remote: surface as an actionable 503, not an opaque 500.
logger.error("Re-clone failed", repo_id=repo_id, git_url=git_url, error=str(e))
raise RepoCloneError(repo_id, str(e)) from e
logger.info("Re-cloned repo on demand (cache miss)", repo_id=repo_id, git_url=git_url)
metrics.increment("repos_recloned")

repo["local_path"] = str(canonical)
return str(canonical)

def _clone_into_place(self, repo_id: str, git_url: str, branch: str, canonical: Path) -> None:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"""Clone into a temp dir then atomically rename into the canonical path.

The rename is the correctness guarantee: a crashed or concurrent clone never leaves a
half-populated canonical dir for a reader to trip over. Runs in a worker thread (git is
blocking I/O); never call directly on the event loop.
"""
tmp = self.repos_dir / f".{repo_id}.tmp.{uuid.uuid4().hex}"
try:
git.Repo.clone_from(git_url, tmp, branch=branch, depth=1)
# Clear any leftover partial dir before the atomic swap. Do NOT ignore errors here:
# a failed removal must surface (the outer except re-raises it, and ensure_clone wraps
# it into a logged RepoCloneError) rather than letting us rename onto a dir we could
# not clean, which would fail later with a more confusing error.
if canonical.exists():
shutil.rmtree(canonical)
os.rename(tmp, canonical) # atomic on the same filesystem
except Exception:
if tmp.exists():
shutil.rmtree(tmp, ignore_errors=True)
raise

def update_status(self, repo_id: str, status: str):
"""Update repository status"""
self.db.update_repository_status(repo_id, status)
Expand Down Expand Up @@ -158,8 +249,6 @@ def update_last_commit(self, repo_id: str, commit_sha: str, function_count: int

def delete_repo(self, repo_id: str) -> bool:
"""Delete repository and clean up local files"""
import shutil

repo = self.get_repo(repo_id)
if not repo:
return False
Expand Down
87 changes: 73 additions & 14 deletions backend/services/supabase_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
"""
import os
from typing import Dict, List, Optional
from datetime import datetime
from datetime import datetime, timedelta
from supabase import create_client, Client, ClientOptions
import uuid

from services.observability import logger

# A repo stuck in 'indexing' longer than this is treated as orphaned (its indexer
# process died) and may be re-claimed on retry. The startup sweep is unconditional.
STUCK_INDEXING_THRESHOLD_MINUTES = 30


class SupabaseService:
"""Service for Supabase database operations"""
Expand Down Expand Up @@ -87,23 +91,78 @@ def update_repository(self, repo_id: str, updates: Dict) -> Optional[Dict]:
return result.data[0] if result.data else None

def update_repository_status(self, repo_id: str, status: str) -> None:
"""Update repository status"""
self.client.table("repositories").update({"status": status}).eq("id", repo_id).execute()

"""Update repository status. Transitioning to 'indexing' stamps indexing_started_at
so the stuck-job reaper has a clock to measure against, regardless of which code path
started the job."""
updates: Dict = {"status": status}
if status == "indexing":
updates["indexing_started_at"] = datetime.utcnow().isoformat()
try:
self.client.table("repositories").update(updates).eq("id", repo_id).execute()
except Exception as e:
# Only swallow the specific case where migration 003 (indexing_started_at) has not been
# applied yet -- detected by the column name appearing in the DB error. Any other error
# (network, constraint, etc.) must re-raise so a real failure isn't masked by the retry.
if "indexing_started_at" not in updates or "indexing_started_at" not in str(e):
raise
logger.warning(
"indexing_started_at column missing; retrying status update without it (apply migration 003)",
repo_id=repo_id, error=str(e),
)
self.client.table("repositories").update({"status": status}).eq("id", repo_id).execute()
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def try_set_indexing_status(self, repo_id: str) -> bool:
"""
Atomically set status to 'indexing' only if not already indexing.

Returns True if status was set, False if repo was already indexing.
This prevents TOCTOU race conditions where two requests could both
see status != 'indexing' and both start indexing.
Atomically set status to 'indexing' only if not already actively indexing.

Returns True if status was set, False if a fresh indexing job already owns the repo.
This prevents TOCTOU race conditions where two requests both see status != 'indexing'
and both start indexing. A row stuck in 'indexing' past the threshold (its process died)
-- or with no start stamp at all (legacy/pre-migration) -- is treated as orphaned and
re-claimed, so a crashed job never permanently blocks retry.
"""
result = self.client.table("repositories").update(
{"status": "indexing"}
).eq("id", repo_id).neq("status", "indexing").execute()

# If result.data is empty, no rows matched (already indexing)
# cutoff is the staleness threshold for stealing an orphaned 'indexing' lock, derived from
# STUCK_INDEXING_THRESHOLD_MINUTES and a SERVER-CONTROLLED datetime.utcnow(). It is never
# user input, so interpolating it into the PostgREST .or_ filter string is safe. If any
# user-supplied value is ever introduced into this filter, parameterize it via the SDK
# (do not f-string it) to avoid filter-injection.
cutoff = (datetime.utcnow() - timedelta(minutes=STUCK_INDEXING_THRESHOLD_MINUTES)).isoformat()
try:
result = self.client.table("repositories").update(
{"status": "indexing", "indexing_started_at": datetime.utcnow().isoformat()}
).eq("id", repo_id).or_(
f"status.neq.indexing,indexing_started_at.is.null,indexing_started_at.lt.{cutoff}"
).execute()
except Exception as e:
# Only fall back when migration 003 (indexing_started_at) is missing -- detected by the
# column name in the DB error. Re-raise anything else so a real failure isn't masked.
if "indexing_started_at" not in str(e):
raise
logger.warning(
"indexing_started_at column missing; falling back to basic CAS (apply migration 003)",
repo_id=repo_id, error=str(e),
)
result = self.client.table("repositories").update(
{"status": "indexing"}
).eq("id", repo_id).neq("status", "indexing").execute()

# If result.data is empty, no rows matched (a fresh indexing job owns it)
return bool(result.data)

def reset_stuck_indexing_jobs(self) -> int:
"""Reset every repo left in 'indexing' to 'error' so the user can retry.

Called once on startup: indexing runs in-process (BackgroundTasks / WebSocket), so a
restart kills any in-flight job and every 'indexing' row at boot is by definition
orphaned. Returns the number of rows reset.
"""
result = self.client.table("repositories").update(
{"status": "error"}
).eq("status", "indexing").execute()
count = len(result.data) if result.data else 0
if count:
logger.warning("Reset orphaned indexing jobs on startup", count=count)
return count

def update_file_count(self, repo_id: str, count: int) -> None:
"""Update repository file count"""
Expand Down
Loading
Loading