"""Durable repo-state v0.1 (#311): lazy re-clone + stuck-job recovery.

Covers ensure_clone (warm no-op / cold re-clone / clone failure / concurrency / no git_url),
the stuck-indexing recovery paths (timestamp stamping, steal-on-retry filter, startup sweep,
fallbacks when the indexing_started_at column is missing), and the route wiring that
invokes ensure_clone.
"""
| 6 | +import asyncio |
| 7 | +import time |
| 8 | +from pathlib import Path |
| 9 | +from unittest.mock import MagicMock, patch, AsyncMock |
| 10 | + |
| 11 | +import pytest |
| 12 | + |
| 13 | +from services.repo_manager import RepositoryManager, RepoCloneError |
| 14 | + |
| 15 | + |
def _make_manager(tmp_path) -> RepositoryManager:
    """Build a RepositoryManager for tests.

    ``_sync_existing_repos`` and ``services.repo_manager.get_supabase_service`` are
    patched while the manager is constructed (the latter returns a ``MagicMock``),
    and ``repos_dir`` is then redirected to ``tmp_path``.
    """
    with patch.object(RepositoryManager, "_sync_existing_repos"):
        with patch("services.repo_manager.get_supabase_service") as supabase_factory:
            supabase_factory.return_value = MagicMock()
            manager = RepositoryManager()
    manager.repos_dir = Path(tmp_path)
    return manager
| 24 | + |
| 25 | + |
class TestEnsureClone:
    """Tests for RepositoryManager.ensure_clone, run against a temporary repos_dir."""

    async def test_warm_path_does_not_clone(self, tmp_path):
        """A checkout with a .git dir already on disk is returned without cloning (AC4)."""
        manager = _make_manager(tmp_path)
        repo_id = "repo-warm"
        (manager.repos_dir / repo_id / ".git").mkdir(parents=True)
        repo = {
            "id": repo_id,
            "git_url": "https://example.com/x.git",
            "branch": "main",
        }

        with patch.object(manager, "_clone_into_place") as clone_mock:
            returned = await manager.ensure_clone(repo)

        clone_mock.assert_not_called()
        expected = str(manager.repos_dir / repo_id)
        assert returned == expected
        # ensure_clone is expected to write the resolved path back onto the repo dict.
        assert repo["local_path"] == expected

    async def test_cold_path_reclones(self, tmp_path):
        """A missing checkout triggers one clone using the repo's git_url and branch (AC1)."""
        manager = _make_manager(tmp_path)
        repo_id = "repo-cold"
        repo = {
            "id": repo_id,
            "git_url": "https://example.com/x.git",
            "branch": "dev",
        }

        def create_checkout(rid, url, branch, canonical):
            # Stand-in for the real clone: just make the .git marker appear.
            (canonical / ".git").mkdir(parents=True)

        with patch.object(manager, "_clone_into_place", side_effect=create_checkout) as clone_mock:
            returned = await manager.ensure_clone(repo)

        clone_mock.assert_called_once()
        # Unpacking also pins the helper's call shape to exactly four positional args.
        called_id, called_url, called_branch, _called_dest = clone_mock.call_args.args
        assert (called_id, called_url, called_branch) == (repo_id, "https://example.com/x.git", "dev")
        assert (Path(returned) / ".git").exists()
        assert repo["local_path"] == str(manager.repos_dir / repo_id)

    async def test_missing_git_url_raises_actionable_503(self, tmp_path):
        """A git_url of "unknown" yields RepoCloneError 503 with error REPO_UNAVAILABLE."""
        manager = _make_manager(tmp_path)
        repo = {"id": "repo-nourl", "git_url": "unknown", "branch": "main"}

        with pytest.raises(RepoCloneError) as raised:
            await manager.ensure_clone(repo)

        error = raised.value
        assert error.status_code == 503
        assert error.detail["error"] == "REPO_UNAVAILABLE"

    async def test_clone_failure_raises_actionable_503(self, tmp_path):
        """An exception from the clone helper is surfaced as RepoCloneError with status 503."""
        manager = _make_manager(tmp_path)
        repo = {
            "id": "repo-fail",
            "git_url": "https://example.com/private.git",
            "branch": "main",
        }

        def failing_clone(rid, url, branch, canonical):
            raise RuntimeError("authentication required")

        with patch.object(manager, "_clone_into_place", side_effect=failing_clone):
            with pytest.raises(RepoCloneError) as raised:
                await manager.ensure_clone(repo)

        assert raised.value.status_code == 503

    async def test_concurrent_callers_clone_once(self, tmp_path):
        """Two overlapping ensure_clone calls for the same missing repo clone once (AC2)."""
        manager = _make_manager(tmp_path)
        repo_id = "repo-race"
        clone_count = 0

        def slow_clone(rid, url, branch, canonical):
            nonlocal clone_count
            clone_count += 1
            # Blocking pause so the two calls overlap; presumably the second caller
            # then waits on a lock inside ensure_clone -- confirm in RepositoryManager.
            time.sleep(0.05)
            (canonical / ".git").mkdir(parents=True)

        # Two distinct dicts with identical content, one per caller.
        repo_a = {"id": repo_id, "git_url": "https://example.com/x.git", "branch": "main"}
        repo_b = dict(repo_a)

        with patch.object(manager, "_clone_into_place", side_effect=slow_clone):
            await asyncio.gather(
                manager.ensure_clone(repo_a),
                manager.ensure_clone(repo_b),
            )

        assert clone_count == 1
| 99 | + |
| 100 | + |
class TestStuckJobRecovery:
    """Tests for SupabaseService status helpers, run against a MagicMock client."""

    def _service(self):
        """Return a SupabaseService whose ``client`` attribute is replaced by a MagicMock."""
        from services.supabase_service import SupabaseService

        service = SupabaseService()
        service.client = MagicMock()
        return service

    def test_update_status_indexing_stamps_started_at(self):
        """Moving to 'indexing' sends an update payload that includes indexing_started_at."""
        service = self._service()
        update_mock = service.client.table.return_value.update

        service.update_repository_status("r1", "indexing")

        payload = update_mock.call_args.args[0]
        assert payload["status"] == "indexing"
        assert "indexing_started_at" in payload

    def test_update_status_non_indexing_does_not_stamp(self):
        """Any other status sends a payload containing only the status field."""
        service = self._service()
        update_mock = service.client.table.return_value.update

        service.update_repository_status("r1", "indexed")

        payload = update_mock.call_args.args[0]
        assert payload == {"status": "indexed"}

    def test_try_set_indexing_steal_filter(self):
        """The or_ filter names the not-indexing, NULL-timestamp and stale-timestamp cases (AC5)."""
        service = self._service()
        or_mock = service.client.table.return_value.update.return_value.eq.return_value.or_
        or_mock.return_value.execute.return_value = MagicMock(data=[{"id": "r1"}])

        assert service.try_set_indexing_status("r1") is True

        filter_expr = or_mock.call_args.args[0]
        assert "status.neq.indexing" in filter_expr
        assert "indexing_started_at.is.null" in filter_expr
        assert "indexing_started_at.lt." in filter_expr

    def test_try_set_indexing_false_when_fresh_job_owns(self):
        """An update that matches no rows (empty data) makes the claim return False."""
        service = self._service()
        or_mock = service.client.table.return_value.update.return_value.eq.return_value.or_
        or_mock.return_value.execute.return_value = MagicMock(data=[])

        assert service.try_set_indexing_status("r1") is False

    def test_reset_stuck_jobs_resets_indexing_to_error(self):
        """The sweep updates status 'indexing' rows to 'error' and returns the row count (AC6)."""
        service = self._service()
        update_mock = service.client.table.return_value.update
        eq_mock = update_mock.return_value.eq
        eq_mock.return_value.execute.return_value = MagicMock(data=[{"id": "a"}, {"id": "b"}])

        reset_count = service.reset_stuck_indexing_jobs()

        assert reset_count == 2
        assert update_mock.call_args.args[0] == {"status": "error"}
        assert eq_mock.call_args.args == ("status", "indexing")

    def test_update_status_falls_back_when_column_missing(self):
        """If the first execute raises, the update is retried without indexing_started_at."""
        service = self._service()
        update_mock = service.client.table.return_value.update
        eq_result = update_mock.return_value.eq.return_value
        # First execute fails as if the column were absent; the second succeeds.
        eq_result.execute.side_effect = [
            Exception("PGRST204 column indexing_started_at not found"),
            MagicMock(),
        ]

        service.update_repository_status("r1", "indexing")

        attempts = update_mock.call_args_list
        assert len(attempts) == 2
        first_payload = attempts[0].args[0]
        retry_payload = attempts[1].args[0]
        assert "indexing_started_at" in first_payload  # first attempt includes the new column
        assert retry_payload == {"status": "indexing"}  # retry drops it

    def test_try_set_indexing_falls_back_to_basic_cas_when_column_missing(self):
        """If the or_ query raises, the claim is retried with a neq('status', 'indexing') filter."""
        service = self._service()
        eq_result = service.client.table.return_value.update.return_value.eq.return_value
        eq_result.or_.return_value.execute.side_effect = Exception("PGRST204 column missing")
        eq_result.neq.return_value.execute.return_value = MagicMock(data=[{"id": "r1"}])

        assert service.try_set_indexing_status("r1") is True
        eq_result.neq.assert_called_once_with("status", "indexing")
| 179 | + |
| 180 | + |
class TestRouteWiring:
    """Integration: a route actually awaits ensure_clone (fails if that call is removed)."""

    def test_dependency_route_triggers_ensure_clone(self, client):
        """GET on the dependencies route returns 200 and awaits ensure_clone exactly once."""
        repo = {
            "id": "r-int",
            "git_url": "https://example.com/x.git",
            "branch": "main",
            "local_path": "./repos/r-int",
            "include_paths": None,
            "name": "x",
            "status": "indexed",
            "file_count": 0,
        }
        ensure_mock = AsyncMock(return_value="./repos/r-int")
        graph_stub = {"dependencies": {}, "metrics": {}}

        # Stub repo lookup, cloning, and the dependency analyzer's cache and graph build.
        with patch("routes.analysis.get_repo_or_404", return_value=repo), \
             patch("routes.analysis.repo_manager.ensure_clone", new=ensure_mock), \
             patch("routes.analysis.dependency_analyzer.load_from_cache", return_value=None), \
             patch("routes.analysis.dependency_analyzer.build_dependency_graph", return_value=graph_stub), \
             patch("routes.analysis.dependency_analyzer.save_to_cache"):
            response = client.get("/api/v1/repos/r-int/dependencies?force=true")

        assert response.status_code == 200
        ensure_mock.assert_awaited_once()
0 commit comments