diff --git a/autobot-slm-backend/api/code_sync.py b/autobot-slm-backend/api/code_sync.py index 8edfccf64..a60459dff 100644 --- a/autobot-slm-backend/api/code_sync.py +++ b/autobot-slm-backend/api/code_sync.py @@ -90,6 +90,60 @@ class FleetSyncJob: # In-memory tracking for running asyncio tasks only (not job state) _running_tasks: Dict[str, asyncio.Task] = {} +# Serialise the check-and-insert in sync_fleet so two concurrent +# requests cannot both pass the "no running job" guard (#1730, #1937). +_fleet_sync_lock = asyncio.Lock() + + +async def reconcile_stale_fleet_sync_jobs() -> int: + """Mark stale 'running' fleet sync jobs as failed on startup (#1729). + + Called during SLM backend lifespan init. Any job left in 'running' + status from a prior process crash is marked 'failed' with a message + explaining it was interrupted by a service restart. + + Returns: + Number of stale jobs reconciled. + """ + from services.database import db_service + + async with db_service.session() as db: + result = await db.execute( + select(FleetSyncJobModel).where(FleetSyncJobModel.status == "running") + ) + stale_jobs = result.scalars().all() + count = len(stale_jobs) + + for job in stale_jobs: + job.status = "failed" + job.completed_at = datetime.utcnow() + logger.warning( + "Reconciled stale fleet sync job %s " + "(was 'running', marked 'failed')", + job.job_id, + ) + + if count: + await db.commit() + + return count + + +async def assert_no_running_sync(db) -> None: + """Raise 409 if a fleet sync is already running (#1730). + + Shared guard for sync_fleet, run_schedule, and execute_schedule. + Must be called inside ``_fleet_sync_lock`` to prevent TOCTOU races. + """ + running_result = await db.execute( + select(FleetSyncJobModel).where(FleetSyncJobModel.status == "running") + ) + if running_result.scalar_one_or_none(): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="Fleet sync already in progress", + ) + async def _persist_fleet_sync_job( job: FleetSyncJob, @@ -1078,6 +1132,7 @@ async def _run_fleet_sync_job(job: FleetSyncJob) -> None: await _update_job_status_db( job.job_id, status=job.status, completed_at=job.completed_at ) + _running_tasks.pop(job.job_id, None) # Prevent memory leak (#1928) logger.info( "Fleet sync job %s completed: %d/%d successful", job.job_id, @@ -1261,29 +1316,33 @@ async def sync_fleet( If node_ids is None, syncs all outdated nodes. Supports rolling, immediate, graceful, and manual strategies. """ - # Get target nodes - if request.node_ids: - result = await db.execute( - select(Node).where(Node.node_id.in_(request.node_ids)) - ) - else: - result = await db.execute( - select(Node).where(Node.code_status == CodeStatus.OUTDATED.value) - ) + # Lock covers check-through-persist to prevent TOCTOU race (#1937) + async with _fleet_sync_lock: + await assert_no_running_sync(db) - nodes = result.scalars().all() + # Get target nodes + if request.node_ids: + result = await db.execute( + select(Node).where(Node.node_id.in_(request.node_ids)) + ) + else: + result = await db.execute( + select(Node).where(Node.code_status == CodeStatus.OUTDATED.value) + ) - if not nodes: - return FleetSyncResponse( - success=True, - message="No nodes to sync", - job_id="", - nodes_queued=0, - ) + nodes = result.scalars().all() - # Create job with node states and persist to DB (#1707) - job = _build_fleet_sync_job_from_nodes(nodes, request) - await _persist_fleet_sync_job(job) + if not nodes: + return FleetSyncResponse( + success=True, + message="No nodes to sync", + job_id="", + nodes_queued=0, + ) + + # Create job with node states and persist to DB (#1707) + job = _build_fleet_sync_job_from_nodes(nodes, request) + await _persist_fleet_sync_job(job) if request.strategy != "manual": task = asyncio.create_task(_run_fleet_sync_job(job)) diff --git a/autobot-slm-backend/main.py b/autobot-slm-backend/main.py index 6b4841bf2..ede5c7419 100644 --- a/autobot-slm-backend/main.py +++ b/autobot-slm-backend/main.py @@ -139,6 +139,16 @@ async def lifespan(app: FastAPI): await _seed_default_roles() await _seed_default_agents() + # Reconcile stale fleet sync jobs from prior crash (#1729) + try: + from api.code_sync import reconcile_stale_fleet_sync_jobs + + reconciled = await reconcile_stale_fleet_sync_jobs() + if reconciled: + logger.warning("Reconciled %d stale fleet sync job(s)", reconciled) + except Exception: + logger.exception("Failed to reconcile stale fleet sync jobs") + # Initialize manifest loader singleton (Issue #926 Phase 3) from services.manifest_loader import init_manifest_loader