|
4 | 4 | """ |
5 | 5 | import os |
6 | 6 | from typing import Dict, List, Optional |
7 | | -from datetime import datetime |
| 7 | +from datetime import datetime, timedelta |
8 | 8 | from supabase import create_client, Client, ClientOptions |
9 | 9 | import uuid |
10 | 10 |
|
11 | 11 | from services.observability import logger |
12 | 12 |
|
| 13 | +# A repo left in 'indexing' for longer than this many minutes is treated as orphaned (its |
| 14 | +# indexer process died) and may be re-claimed on retry. The startup sweep ignores this threshold. |
| 15 | +STUCK_INDEXING_THRESHOLD_MINUTES = 30 |
| 16 | + |
13 | 17 |
|
14 | 18 | class SupabaseService: |
15 | 19 | """Service for Supabase database operations""" |
@@ -87,23 +91,70 @@ def update_repository(self, repo_id: str, updates: Dict) -> Optional[Dict]: |
87 | 91 | return result.data[0] if result.data else None |
88 | 92 |
|
def update_repository_status(self, repo_id: str, status: str) -> None:
    """Write a new status onto one row of the ``repositories`` table.

    When the new status is ``'indexing'`` the update also sets
    ``indexing_started_at`` to the current ``datetime.utcnow()`` ISO string,
    giving the stuck-job logic a start time to compare against no matter
    which code path kicked off the job.

    Args:
        repo_id: Value matched against the ``id`` column.
        status: New value for the ``status`` column.

    Raises:
        Exception: Whatever the client raises, when the status is not
            ``'indexing'`` or when the stamp-less retry fails as well.
    """
    wants_stamp = status == "indexing"
    payload: Dict = {"status": status}
    if wants_stamp:
        # Naive UTC wall-clock time, serialised as ISO-8601.
        payload["indexing_started_at"] = datetime.utcnow().isoformat()

    try:
        self.client.table("repositories").update(payload).eq("id", repo_id).execute()
    except Exception as exc:
        # Only the stamped update has a fallback; a plain status write that
        # fails is surfaced to the caller untouched.
        if not wants_stamp:
            raise
        # The stamp column may not exist yet (migration 003 not applied), so
        # retry with the status alone rather than failing the whole index run.
        logger.warning(
            "Status update with indexing_started_at failed; retrying without it (apply migration 003)",
            repo_id=repo_id, error=str(exc),
        )
        status_only = {"status": status}
        self.client.table("repositories").update(status_only).eq("id", repo_id).execute()
| 112 | + |
def try_set_indexing_status(self, repo_id: str) -> bool:
    """Claim a repository for indexing with one conditional UPDATE.

    The row is claimed (status set to ``'indexing'`` and
    ``indexing_started_at`` re-stamped) when any of these holds:

    * its status is not ``'indexing'``;
    * it has no ``indexing_started_at`` value (legacy / pre-migration row);
    * its ``indexing_started_at`` is older than
      ``STUCK_INDEXING_THRESHOLD_MINUTES`` minutes, i.e. the job is treated
      as orphaned.

    Doing the check and the write in a single statement avoids the TOCTOU
    race where two requests both observe ``status != 'indexing'`` and both
    start indexing, while still letting a crashed job be retried.

    If the stamped query raises (for example because migration 003 has not
    been applied), the method falls back to the plain compare-and-set on
    ``status`` alone, without the steal-on-stale behaviour.

    Args:
        repo_id: Value matched against the ``id`` column.

    Returns:
        True when the update returned at least one row (this caller now owns
        the job); False when nothing matched.
    """
    stale_before = (
        datetime.utcnow() - timedelta(minutes=STUCK_INDEXING_THRESHOLD_MINUTES)
    ).isoformat()
    # Comma-separated alternatives handed to the client's or_() filter.
    claimable = ",".join((
        "status.neq.indexing",
        "indexing_started_at.is.null",
        f"indexing_started_at.lt.{stale_before}",
    ))

    try:
        claim = {"status": "indexing", "indexing_started_at": datetime.utcnow().isoformat()}
        outcome = (
            self.client.table("repositories")
            .update(claim)
            .eq("id", repo_id)
            .or_(claimable)
            .execute()
        )
    except Exception as exc:
        # Stamp column unavailable (or the query failed for another reason):
        # keep indexing usable via the original status-only compare-and-set.
        logger.warning(
            "try_set_indexing steal path failed; falling back to basic CAS (apply migration 003)",
            repo_id=repo_id, error=str(exc),
        )
        outcome = (
            self.client.table("repositories")
            .update({"status": "indexing"})
            .eq("id", repo_id)
            .neq("status", "indexing")
            .execute()
        )

    # No returned rows means the filter matched nothing: a fresh job owns it.
    return bool(outcome.data)
| 143 | + |
def reset_stuck_indexing_jobs(self) -> int:
    """Flip every repository currently in 'indexing' to 'error'.

    Intended to run once at startup. Indexing runs in-process
    (BackgroundTasks / WebSocket), so a restart kills any in-flight job and
    every row still marked 'indexing' at boot is orphaned. Moving those rows
    to 'error' lets the user retry. No age threshold is applied.

    Returns:
        The number of rows the update reported back, or 0 when it returned
        no data.
    """
    response = (
        self.client.table("repositories")
        .update({"status": "error"})
        .eq("status", "indexing")
        .execute()
    )
    rows = response.data
    if not rows:
        # Nothing was stuck (or the client returned no data): stay silent.
        return 0

    reset_total = len(rows)
    logger.warning("Reset orphaned indexing jobs on startup", count=reset_total)
    return reset_total
107 | 158 |
|
108 | 159 | def update_file_count(self, repo_id: str, count: int) -> None: |
109 | 160 | """Update repository file count""" |
|
0 commit comments