Skip to content

Commit ca451f0

Browse files
authored
Merge pull request #316 from DevanshuNEU/fix/durable-repo-state-v0.1
fix: durable repo-state v0.1: lazy re-clone + stuck-job recovery (#311)
2 parents 6e6abd9 + 0694ada commit ca451f0

7 files changed

Lines changed: 426 additions & 28 deletions

File tree

backend/main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@
4444
@asynccontextmanager
4545
async def lifespan(app: FastAPI):
4646
validate_environment()
47+
# Any repo left 'indexing' at boot is orphaned (indexing runs in-process; a restart
48+
# kills it). Reset so the user can retry instead of seeing an eternal spinner. (#311)
49+
from services.supabase_service import get_supabase_service
50+
get_supabase_service().reset_stuck_indexing_jobs()
4751
await load_demo_repos()
4852
yield
4953
# Shutdown (cleanup if needed)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- Durable repo-state v0.1 (issue #311)
2+
-- Records when a repo entered the 'indexing' state so the stuck-job reaper can tell a
3+
-- live job from an orphaned one (process died mid-index, leaving status='indexing' forever).
4+
5+
ALTER TABLE repositories
6+
ADD COLUMN IF NOT EXISTS indexing_started_at TIMESTAMPTZ;
7+
8+
-- Partial index: the reaper and the steal-on-retry path only ever scan rows currently indexing.
9+
CREATE INDEX IF NOT EXISTS idx_repositories_indexing_started_at
10+
ON repositories(indexing_started_at)
11+
WHERE status = 'indexing';

backend/routes/analysis.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from dependencies import (
66
dependency_analyzer, style_analyzer, dna_extractor,
7-
get_repo_or_404
7+
get_repo_or_404, repo_manager
88
)
99
from services.input_validator import InputValidator
1010
from middleware.auth import require_auth, AuthContext
@@ -35,6 +35,7 @@ async def get_dependency_graph(
3535
return {**cached_graph, "cached": True}
3636

3737
logger.info("Building fresh dependency graph", repo_id=repo_id, include_paths=repo.get("include_paths"))
38+
await repo_manager.ensure_clone(repo)
3839
graph_data = dependency_analyzer.build_dependency_graph(repo["local_path"], include_paths=repo.get("include_paths"))
3940
dependency_analyzer.save_to_cache(repo_id, graph_data)
4041

@@ -57,12 +58,15 @@ async def analyze_impact(
5758
try:
5859
repo = get_repo_or_404(repo_id, auth.user_id)
5960

61+
# Validate user input BEFORE any expensive/external work (re-clone). Reject traversal first.
6062
valid_path, path_error = InputValidator.validate_file_path(
6163
request.file_path, repo["local_path"]
6264
)
6365
if not valid_path:
6466
raise HTTPException(status_code=400, detail=f"Invalid file path: {path_error}")
6567

68+
await repo_manager.ensure_clone(repo)
69+
6670
graph_data = dependency_analyzer.load_from_cache(repo_id)
6771
if not graph_data:
6872
logger.info("Building dependency graph for impact analysis", repo_id=repo_id)
@@ -96,6 +100,7 @@ async def get_repository_insights(
96100
graph_data = dependency_analyzer.load_from_cache(repo_id)
97101
if not graph_data:
98102
logger.info("Building dependency graph for insights", repo_id=repo_id)
103+
await repo_manager.ensure_clone(repo)
99104
graph_data = dependency_analyzer.build_dependency_graph(repo["local_path"], include_paths=repo.get("include_paths"))
100105
dependency_analyzer.save_to_cache(repo_id, graph_data)
101106

@@ -134,6 +139,7 @@ async def get_style_analysis(
134139
return {**cached_style, "cached": True}
135140

136141
logger.info("Analyzing code style", repo_id=repo_id)
142+
await repo_manager.ensure_clone(repo)
137143
style_data = style_analyzer.analyze_repository_style(repo["local_path"], include_paths=repo.get("include_paths"))
138144
style_analyzer.save_to_cache(repo_id, style_data)
139145

@@ -178,6 +184,7 @@ async def get_codebase_dna(
178184
logger.info("Extracting codebase DNA", repo_id=repo_id)
179185
metrics.increment("dna_extractions")
180186

187+
await repo_manager.ensure_clone(repo)
181188
dna = dna_extractor.extract_dna(repo["local_path"], repo_id, include_paths=repo.get("include_paths"))
182189
dna_extractor.save_to_cache(repo_id, dna)
183190

backend/routes/repos.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -470,11 +470,10 @@ async def get_repo_directories(
470470
directories to index instead of the entire repo.
471471
"""
472472
repo = get_repo_or_404(repo_id, auth.user_id)
473+
# Re-clone if the container was redeployed and wiped the working tree (#311).
474+
await repo_manager.ensure_clone(repo)
473475
local_path = Path(repo["local_path"])
474476

475-
if not local_path.exists():
476-
raise HTTPException(status_code=404, detail="Repo not cloned yet")
477-
478477
dirs = await asyncio.to_thread(_scan_directories, local_path)
479478

480479
return {
@@ -501,7 +500,9 @@ async def index_repository(
501500

502501
try:
503502
repo = get_repo_or_404(repo_id, user_id)
504-
503+
# Re-clone if the container was redeployed and wiped the working tree (#311).
504+
await repo_manager.ensure_clone(repo)
505+
505506
# Re-check size limits before indexing (in case tier changed or repo updated)
506507
analysis = repo_validator.analyze_repo(repo["local_path"])
507508

@@ -765,10 +766,13 @@ async def index_repository_async(
765766

766767
try:
767768
repo = get_repo_or_404(repo_id, user_id)
768-
769+
# Re-clone before the size-check (which reads the working tree) so a redeployed
770+
# container rehydrates here, before the 202, rather than failing the gate (#311).
771+
await repo_manager.ensure_clone(repo)
772+
769773
# Re-check size limits
770774
analysis = repo_validator.analyze_repo(repo["local_path"])
771-
775+
772776
if not analysis.success:
773777
raise HTTPException(
774778
status_code=500,
@@ -867,7 +871,15 @@ async def websocket_index(websocket: WebSocket, repo_id: str):
867871
if not repo:
868872
await websocket.close(code=4004, reason="Repository not found")
869873
return
870-
874+
875+
# Re-clone if the container was redeployed and wiped the working tree (#311).
876+
try:
877+
await repo_manager.ensure_clone(repo)
878+
except Exception as e:
879+
logger.error("Re-clone failed for websocket indexing", repo_id=repo_id, error=str(e))
880+
await websocket.close(code=4005, reason="Repository clone unavailable")
881+
return
882+
871883
# Check size limits before WebSocket indexing
872884
analysis = repo_validator.analyze_repo(repo["local_path"])
873885

backend/services/repo_manager.py

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,54 @@
22
Repository Manager (Supabase Edition)
33
Handles repository CRUD operations with PostgreSQL via Supabase
44
"""
5+
import asyncio
6+
import os
7+
import shutil
58
import uuid
6-
from typing import List, Optional
9+
from typing import Dict, List, Optional
710
import git
811
from pathlib import Path
12+
from fastapi import HTTPException
913
from services.supabase_service import get_supabase_service
1014
from services.observability import logger, metrics
1115

1216

17+
class RepoCloneError(HTTPException):
18+
"""Repo working tree is missing and could not be restored from its git remote.
19+
20+
Subclasses HTTPException so it surfaces as an actionable 503 (handlers already re-raise
21+
HTTPException) instead of an opaque 500. UX matters here: a redeploy is invisible to the
22+
user, so the message has to tell them what to actually do. (#311)
23+
"""
24+
25+
def __init__(self, repo_id: str, reason: str = ""):
26+
super().__init__(
27+
status_code=503,
28+
detail={
29+
"error": "REPO_UNAVAILABLE",
30+
"repo_id": repo_id,
31+
"message": (
32+
"Repository source files are temporarily unavailable and could not be "
33+
"restored from the git remote. Private repositories are not yet supported "
34+
"for re-sync; for public repos, please retry shortly."
35+
),
36+
},
37+
)
38+
self.reason = reason
39+
40+
1341
class RepositoryManager:
1442
"""Manage repositories with Supabase persistence"""
1543

1644
def __init__(self):
1745
self.repos_dir = Path("./repos")
1846
self.repos_dir.mkdir(exist_ok=True)
1947
self.db = get_supabase_service()
20-
48+
49+
# Per-repo locks so two concurrent ops on the same missing clone don't both clone.
50+
# Sufficient only under a single uvicorn worker: an in-process asyncio.Lock does not coordinate across worker processes.
51+
self._clone_locks: Dict[str, asyncio.Lock] = {}
52+
2153
# Discover and sync existing repositories on startup
2254
self._sync_existing_repos()
2355

@@ -126,10 +158,69 @@ def add_repo(self, name: str, git_url: str, branch: str = "main", user_id: Optio
126158
except Exception as e:
127159
# Cleanup on failure
128160
if local_path.exists():
129-
import shutil
130161
shutil.rmtree(local_path)
131162
raise Exception(f"Failed to clone repository: {str(e)}")
132-
163+
164+
async def ensure_clone(self, repo: dict) -> str:
165+
"""Guarantee the working tree exists on disk, lazily re-cloning from git_url if needed.
166+
167+
Railway redeploys wipe ./repos (ephemeral disk) but Pinecone/Supabase survive, so
168+
local_path is a cache hint, not source of truth -- the git remote is. On a warm hit
169+
this is a sub-millisecond stat with no behavior change; on a miss it re-clones.
170+
Returns the canonical local path and refreshes repo['local_path'] in place. Raises RepoCloneError (HTTP 503) when no git_url is on record or the re-clone fails.
171+
"""
172+
repo_id = repo["id"]
173+
canonical = self.repos_dir / repo_id
174+
175+
# Warm path: clone present. Only a synchronous stat runs on the event loop; no re-clone, no worker thread.
176+
if (canonical / ".git").exists():
177+
repo["local_path"] = str(canonical)
178+
return str(canonical)
179+
180+
git_url = repo.get("git_url")
181+
if not git_url or git_url == "unknown":
182+
raise RepoCloneError(repo_id, "no git_url on record")
183+
branch = repo.get("branch") or "main"
184+
185+
lock = self._clone_locks.setdefault(repo_id, asyncio.Lock())
186+
async with lock:
187+
# Another coroutine may have cloned while we waited for the lock.
188+
if not (canonical / ".git").exists():
189+
try:
190+
await asyncio.to_thread(self._clone_into_place, repo_id, git_url, branch, canonical)
191+
except Exception as e:
192+
# Private repo (no creds on a fresh container), network failure, deleted
193+
# remote: surface as an actionable 503, not an opaque 500.
194+
logger.error("Re-clone failed", repo_id=repo_id, git_url=git_url, error=str(e))
195+
raise RepoCloneError(repo_id, str(e)) from e
196+
logger.info("Re-cloned repo on demand (cache miss)", repo_id=repo_id, git_url=git_url)
197+
metrics.increment("repos_recloned")
198+
199+
repo["local_path"] = str(canonical)
200+
return str(canonical)
201+
202+
def _clone_into_place(self, repo_id: str, git_url: str, branch: str, canonical: Path) -> None:
203+
"""Clone into a temp dir then atomically rename into the canonical path.
204+
205+
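The rename is the correctness guarantee: a crashed or concurrent clone never leaves a
206+
half-populated canonical dir for a reader to trip over. Caveat: if a stale canonical dir already exists it is removed before the rename, so the path is briefly absent during the swap. Runs in a worker thread (git is
207+
blocking I/O); never call directly on the event loop.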
208+
"""
209+
tmp = self.repos_dir / f".{repo_id}.tmp.{uuid.uuid4().hex}"
210+
try:
211+
git.Repo.clone_from(git_url, tmp, branch=branch, depth=1)
212+
# Clear any leftover partial dir before the atomic swap. Do NOT ignore errors here:
213+
# a failed removal must surface (the outer except re-raises it, and ensure_clone wraps
214+
# it into a logged RepoCloneError) rather than letting us rename onto a dir we could
215+
# not clean, which would fail later with a more confusing error.
216+
if canonical.exists():
217+
shutil.rmtree(canonical)
218+
os.rename(tmp, canonical) # atomic on the same filesystem
219+
except Exception:
220+
if tmp.exists():
221+
shutil.rmtree(tmp, ignore_errors=True)
222+
raise
223+
133224
def update_status(self, repo_id: str, status: str):
134225
"""Update repository status"""
135226
self.db.update_repository_status(repo_id, status)
@@ -158,8 +249,6 @@ def update_last_commit(self, repo_id: str, commit_sha: str, function_count: int
158249

159250
def delete_repo(self, repo_id: str) -> bool:
160251
"""Delete repository and clean up local files"""
161-
import shutil
162-
163252
repo = self.get_repo(repo_id)
164253
if not repo:
165254
return False

backend/services/supabase_service.py

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44
"""
55
import os
66
from typing import Dict, List, Optional
7-
from datetime import datetime
7+
from datetime import datetime, timedelta
88
from supabase import create_client, Client, ClientOptions
99
import uuid
1010

1111
from services.observability import logger
1212

13+
# A repo stuck in 'indexing' longer than this is treated as orphaned (its indexer
14+
# process died) and may be re-claimed on retry. The startup sweep ignores this threshold and resets every 'indexing' row.
15+
STUCK_INDEXING_THRESHOLD_MINUTES = 30
16+
1317

1418
class SupabaseService:
1519
"""Service for Supabase database operations"""
@@ -87,23 +91,78 @@ def update_repository(self, repo_id: str, updates: Dict) -> Optional[Dict]:
8791
return result.data[0] if result.data else None
8892

8993
def update_repository_status(self, repo_id: str, status: str) -> None:
90-
"""Update repository status"""
91-
self.client.table("repositories").update({"status": status}).eq("id", repo_id).execute()
92-
94+
"""Update repository status. Transitioning to 'indexing' stamps indexing_started_at
95+
so the stuck-job reaper has a clock to measure against, regardless of which code path
96+
started the job."""
97+
updates: Dict = {"status": status}
98+
if status == "indexing":
99+
updates["indexing_started_at"] = datetime.utcnow().isoformat()
100+
try:
101+
self.client.table("repositories").update(updates).eq("id", repo_id).execute()
102+
except Exception as e:
103+
# Only swallow the specific case where migration 003 (indexing_started_at) has not been
104+
# applied yet -- detected by the column name appearing in the DB error. Any other error
105+
# (network, constraint, etc.) must re-raise so a real failure isn't masked by the retry.
106+
if "indexing_started_at" not in updates or "indexing_started_at" not in str(e):
107+
raise
108+
logger.warning(
109+
"indexing_started_at column missing; retrying status update without it (apply migration 003)",
110+
repo_id=repo_id, error=str(e),
111+
)
112+
self.client.table("repositories").update({"status": status}).eq("id", repo_id).execute()
113+
93114
def try_set_indexing_status(self, repo_id: str) -> bool:
94115
"""
95-
Atomically set status to 'indexing' only if not already indexing.
96-
97-
Returns True if status was set, False if repo was already indexing.
98-
This prevents TOCTOU race conditions where two requests could both
99-
see status != 'indexing' and both start indexing.
116+
Atomically set status to 'indexing' only if not already actively indexing.
117+
118+
Returns True if status was set, False if a fresh indexing job already owns the repo.
119+
This prevents TOCTOU race conditions where two requests both see status != 'indexing'
120+
and both start indexing. A row stuck in 'indexing' past the threshold (its process died)
121+
-- or with no start stamp at all (legacy/pre-migration) -- is treated as orphaned and
122+
re-claimed, so a crashed job never permanently blocks retry.
100123
"""
101-
result = self.client.table("repositories").update(
102-
{"status": "indexing"}
103-
).eq("id", repo_id).neq("status", "indexing").execute()
104-
105-
# If result.data is empty, no rows matched (already indexing)
124+
# cutoff is the staleness threshold for stealing an orphaned 'indexing' lock, derived from
125+
# STUCK_INDEXING_THRESHOLD_MINUTES and a SERVER-CONTROLLED datetime.utcnow(). It is never
126+
# user input, so interpolating it into the PostgREST .or_ filter string is safe. If any
127+
# user-supplied value is ever introduced into this filter, parameterize it via the SDK
128+
# (do not f-string it) to avoid filter-injection.
129+
cutoff = (datetime.utcnow() - timedelta(minutes=STUCK_INDEXING_THRESHOLD_MINUTES)).isoformat()
130+
try:
131+
result = self.client.table("repositories").update(
132+
{"status": "indexing", "indexing_started_at": datetime.utcnow().isoformat()}
133+
).eq("id", repo_id).or_(
134+
f"status.neq.indexing,indexing_started_at.is.null,indexing_started_at.lt.{cutoff}"
135+
).execute()
136+
except Exception as e:
137+
# Only fall back when migration 003 (indexing_started_at) is missing -- detected by the
138+
# column name in the DB error. Re-raise anything else so a real failure isn't masked.
139+
if "indexing_started_at" not in str(e):
140+
raise
141+
logger.warning(
142+
"indexing_started_at column missing; falling back to basic CAS (apply migration 003)",
143+
repo_id=repo_id, error=str(e),
144+
)
145+
result = self.client.table("repositories").update(
146+
{"status": "indexing"}
147+
).eq("id", repo_id).neq("status", "indexing").execute()
148+
149+
# If result.data is empty, no rows matched (a fresh indexing job owns it)
106150
return bool(result.data)
151+
152+
def reset_stuck_indexing_jobs(self) -> int:
153+
"""Reset every repo left in 'indexing' to 'error' so the user can retry.
154+
155+
Called once on startup: indexing runs in-process (BackgroundTasks / WebSocket), so a
156+
restart kills any in-flight job and every 'indexing' row at boot is by definition
157+
orphaned. Returns the number of rows reset.
158+
"""
159+
result = self.client.table("repositories").update(
160+
{"status": "error"}
161+
).eq("status", "indexing").execute()
162+
count = len(result.data) if result.data else 0
163+
if count:
164+
logger.warning("Reset orphaned indexing jobs on startup", count=count)
165+
return count
107166

108167
def update_file_count(self, repo_id: str, count: int) -> None:
109168
"""Update repository file count"""

0 commit comments

Comments
 (0)