Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 79 additions & 20 deletions autobot-slm-backend/api/code_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,60 @@ class FleetSyncJob:
# In-memory tracking for running asyncio tasks only (not job state)
_running_tasks: Dict[str, asyncio.Task] = {}

# Serialise the check-and-insert in sync_fleet so two concurrent
# requests cannot both pass the "no running job" guard (#1730, #1937).
_fleet_sync_lock = asyncio.Lock()


async def reconcile_stale_fleet_sync_jobs() -> int:
"""Mark stale 'running' fleet sync jobs as failed on startup (#1729).

Called during SLM backend lifespan init. Any job left in 'running'
status from a prior process crash is marked 'failed' with a message
explaining it was interrupted by a service restart.

Returns:
Number of stale jobs reconciled.
"""
from services.database import db_service

async with db_service.session() as db:
result = await db.execute(
select(FleetSyncJobModel).where(FleetSyncJobModel.status == "running")
)
stale_jobs = result.scalars().all()
count = len(stale_jobs)

for job in stale_jobs:
job.status = "failed"
job.completed_at = datetime.utcnow()
logger.warning(
"Reconciled stale fleet sync job %s "
"(was 'running', marked 'failed')",
job.job_id,
)

if count:
await db.commit()

return count


async def assert_no_running_sync(db) -> None:
"""Raise 409 if a fleet sync is already running (#1730).

Shared guard for sync_fleet, run_schedule, and execute_schedule.
Must be called inside ``_fleet_sync_lock`` to prevent TOCTOU races.
"""
running_result = await db.execute(
select(FleetSyncJobModel).where(FleetSyncJobModel.status == "running")
)
if running_result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Fleet sync already in progress",
)


async def _persist_fleet_sync_job(
job: FleetSyncJob,
Expand Down Expand Up @@ -1078,6 +1132,7 @@ async def _run_fleet_sync_job(job: FleetSyncJob) -> None:
await _update_job_status_db(
job.job_id, status=job.status, completed_at=job.completed_at
)
_running_tasks.pop(job.job_id, None) # Prevent memory leak (#1928)
logger.info(
"Fleet sync job %s completed: %d/%d successful",
job.job_id,
Expand Down Expand Up @@ -1261,29 +1316,33 @@ async def sync_fleet(
If node_ids is None, syncs all outdated nodes.
Supports rolling, immediate, graceful, and manual strategies.
"""
# Get target nodes
if request.node_ids:
result = await db.execute(
select(Node).where(Node.node_id.in_(request.node_ids))
)
else:
result = await db.execute(
select(Node).where(Node.code_status == CodeStatus.OUTDATED.value)
)
# Lock covers check-through-persist to prevent TOCTOU race (#1937)
async with _fleet_sync_lock:
await assert_no_running_sync(db)

nodes = result.scalars().all()
# Get target nodes
if request.node_ids:
result = await db.execute(
select(Node).where(Node.node_id.in_(request.node_ids))
)
else:
result = await db.execute(
select(Node).where(Node.code_status == CodeStatus.OUTDATED.value)
)

if not nodes:
return FleetSyncResponse(
success=True,
message="No nodes to sync",
job_id="",
nodes_queued=0,
)
nodes = result.scalars().all()

# Create job with node states and persist to DB (#1707)
job = _build_fleet_sync_job_from_nodes(nodes, request)
await _persist_fleet_sync_job(job)
if not nodes:
return FleetSyncResponse(
success=True,
message="No nodes to sync",
job_id="",
nodes_queued=0,
)

# Create job with node states and persist to DB (#1707)
job = _build_fleet_sync_job_from_nodes(nodes, request)
await _persist_fleet_sync_job(job)

if request.strategy != "manual":
task = asyncio.create_task(_run_fleet_sync_job(job))
Expand Down
10 changes: 10 additions & 0 deletions autobot-slm-backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ async def lifespan(app: FastAPI):
await _seed_default_roles()
await _seed_default_agents()

# Reconcile stale fleet sync jobs from prior crash (#1729)
try:
from api.code_sync import reconcile_stale_fleet_sync_jobs

reconciled = await reconcile_stale_fleet_sync_jobs()
if reconciled:
logger.warning("Reconciled %d stale fleet sync job(s)", reconciled)
except Exception:
logger.exception("Failed to reconcile stale fleet sync jobs")

# Initialize manifest loader singleton (Issue #926 Phase 3)
from services.manifest_loader import init_manifest_loader

Expand Down
Loading