Skip to content

Commit 5927095

Browse files
authored
Merge pull request #140 from experimaestro/122-asyncio-job-start
2 parents 5a02f75 + 46e7880 commit 5927095

File tree

3 files changed

+134
-96
lines changed

3 files changed

+134
-96
lines changed

src/experimaestro/scheduler/base.py

Lines changed: 27 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@
99
from typing import Dict
1010

1111
from experimaestro.scheduler import experiment
12-
from experimaestro.scheduler.jobs import Job, JobError, JobState
12+
from experimaestro.scheduler.jobs import Job, JobState
1313
from experimaestro.scheduler.services import Service
1414

1515

1616
from experimaestro.utils import logger
17-
from experimaestro.locking import Locks, LockError
1817
from experimaestro.utils.asyncio import asyncThreadcheck
1918
import concurrent.futures
2019

@@ -84,10 +83,6 @@ def start_scheduler(self):
8483
else:
8584
logger.warning("Scheduler already started")
8685

87-
# @property
88-
# def loop(self):
89-
# return self.xp.loop
90-
9186
def addlistener(self, listener: Listener):
9287
self.listeners.add(listener)
9388

@@ -262,9 +257,6 @@ async def aio_submit(self, job: Job) -> JobState: # noqa: C901
262257

263258
# Decrement the number of unfinished jobs and notify
264259
self.xp.unfinishedJobs -= 1
265-
# async with self.xp.central.exitCondition:
266-
# logging.debug("Updated number of unfinished jobs")
267-
# self.xp.central.exitCondition.notify_all()
268260
async with self.exitCondition:
269261
logging.debug("Updated number of unfinished jobs")
270262
self.exitCondition.notify_all()
@@ -282,96 +274,37 @@ async def aio_submit(self, job: Job) -> JobState: # noqa: C901
282274
return job.state
283275

284276
async def aio_start(self, job: Job) -> Optional[JobState]:
285-
"""Start a job
277+
"""Start a job (scheduler coordination layer)
278+
279+
This method serves as a coordination layer that delegates the actual
280+
job starting logic to the job itself while handling scheduler-specific
281+
concerns like state notifications and providing coordination context.
286282
287-
Returns None if the dependencies could not be locked after all
288-
Returns DONE/ERROR depending on the process outcome
283+
:param job: The job to start
284+
:return: JobState.WAITING if dependencies could not be locked, JobState.DONE
285+
if job completed successfully, JobState.ERROR if job failed during execution,
286+
or None (should not occur in normal operation)
287+
:raises Exception: Various exceptions during scheduler coordination
289288
"""
290289

291-
# We first lock the job before proceeding
290+
# Assert preconditions
292291
assert job.launcher is not None
293-
# assert self.xp.central is not None
294-
295-
with Locks() as locks:
296-
logger.debug("[starting] Locking job %s", job)
297-
async with job.launcher.connector.lock(job.lockpath):
298-
logger.debug("[starting] Locked job %s", job)
299292

300-
state = None
301-
try:
302-
logger.debug(
303-
"Starting job %s with %d dependencies",
304-
job,
305-
len(job.dependencies),
306-
)
307-
308-
# async with self.xp.central.dependencyLock:
309-
async with self.dependencyLock:
310-
for dependency in job.dependencies:
311-
try:
312-
locks.append(dependency.lock().acquire())
313-
except LockError:
314-
logger.warning(
315-
"Could not lock %s, aborting start for job %s",
316-
dependency,
317-
job,
318-
)
319-
dependency.check()
320-
return JobState.WAITING
321-
322-
self.notify_job_state(job)
323-
324-
job.starttime = time.time()
325-
326-
# Creates the main directory
327-
directory = job.path
328-
logger.debug("Making directories job %s...", directory)
329-
if not directory.is_dir():
330-
directory.mkdir(parents=True, exist_ok=True)
331-
332-
# Sets up the notification URL
333-
if self.xp.server is not None:
334-
job.add_notification_server(self.xp.server)
335-
336-
except Exception:
337-
logger.warning("Error while locking job", exc_info=True)
338-
return JobState.WAITING
293+
try:
294+
# Call job's start method with scheduler context
295+
state = await job.aio_start(
296+
sched_dependency_lock=self.dependencyLock,
297+
notification_server=self.xp.server if self.xp else None,
298+
)
339299

340-
try:
341-
# Runs the job
342-
process = await job.aio_run()
343-
except Exception:
344-
logger.warning("Error while starting job", exc_info=True)
345-
return JobState.ERROR
300+
if state is None:
301+
# Dependencies couldn't be locked, return WAITING state
302+
return JobState.WAITING
346303

347-
try:
348-
if isinstance(process, JobState):
349-
state = process
350-
logger.debug("Job %s ended (state %s)", job, state)
351-
else:
352-
logger.debug("Waiting for job %s process to end", job)
353-
354-
code = await process.aio_code()
355-
logger.debug("Got return code %s for %s", code, job)
356-
357-
# Check the file if there is no return code
358-
if code is None:
359-
# Case where we cannot retrieve the code right away
360-
if job.donepath.is_file():
361-
code = 0
362-
else:
363-
code = int(job.failedpath.read_text())
364-
365-
logger.debug("Job %s ended with code %s", job, code)
366-
state = JobState.DONE if code == 0 else JobState.ERROR
367-
368-
except JobError:
369-
logger.warning("Error while running job")
370-
state = JobState.ERROR
304+
# Notify scheduler listeners of job state after successful start
305+
self.notify_job_state(job)
306+
return state
371307

372-
except Exception:
373-
logger.warning(
374-
"Error while running job (in experimaestro)", exc_info=True
375-
)
376-
state = JobState.ERROR
377-
return state
308+
except Exception:
309+
logger.warning("Error in scheduler job coordination", exc_info=True)
310+
return JobState.ERROR

src/experimaestro/scheduler/experiment.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,6 @@ def __enter__(self):
250250
self.workspace.__enter__()
251251
(self.workspace.path / ".__experimaestro__").touch()
252252

253-
# global SIGNAL_HANDLER
254253
# Number of unfinished jobs
255254
self.unfinishedJobs = 0
256255
self.taskOutputQueueSize = 0

src/experimaestro/scheduler/jobs.py

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import time
23
from collections import ChainMap
34
import enum
45
from functools import cached_property
@@ -14,7 +15,7 @@
1415
# from experimaestro.scheduler.base import Scheduler
1516
from experimaestro.scheduler.dependencies import Dependency, DependencyStatus, Resource
1617
from experimaestro.scheduler.workspace import RunMode, Workspace
17-
from experimaestro.locking import Lock
18+
from experimaestro.locking import Lock, LockError, Locks
1819
from experimaestro.utils import logger
1920

2021
if TYPE_CHECKING:
@@ -277,6 +278,111 @@ def prepare(self, overwrite=False):
277278
"""
278279
pass
279280

281+
async def aio_start(self, sched_dependency_lock, notification_server=None):
282+
"""Start the job with core job starting logic
283+
284+
This method contains the core logic for starting a job that was previously
285+
located in Scheduler.aio_start(). It handles job locking, dependency
286+
acquisition, directory setup, and job execution while using the scheduler's
287+
coordination lock to prevent race conditions between multiple jobs.
288+
289+
:param sched_dependency_lock: The scheduler's dependency lock for coordination
290+
between jobs to prevent race conditions during dependency acquisition
291+
:param notification_server: Optional notification server from the experiment
292+
for job progress reporting
293+
:return: JobState.DONE if job completed successfully, JobState.ERROR if job
294+
failed during execution, or None if dependencies couldn't be locked
295+
(signals WAITING state to scheduler)
296+
:raises Exception: Various exceptions during job execution, dependency locking,
297+
or process creation
298+
"""
299+
# We first lock the job before proceeding
300+
assert self.launcher is not None
301+
302+
with Locks() as locks:
303+
logger.debug("[starting] Locking job %s", self)
304+
async with self.launcher.connector.lock(self.lockpath):
305+
logger.debug("[starting] Locked job %s", self)
306+
307+
state = None
308+
try:
309+
logger.debug(
310+
"Starting job %s with %d dependencies",
311+
self,
312+
len(self.dependencies),
313+
)
314+
315+
# Individual dependency lock acquisition
316+
# We use the scheduler-wide lock to avoid cross-jobs race conditions
317+
async with sched_dependency_lock:
318+
for dependency in self.dependencies:
319+
try:
320+
locks.append(dependency.lock().acquire())
321+
except LockError:
322+
logger.warning(
323+
"Could not lock %s, aborting start for job %s",
324+
dependency,
325+
self,
326+
)
327+
dependency.check()
328+
return None # Signal to scheduler that dependencies couldn't be locked
329+
330+
# Dependencies have been locked, we can start the job
331+
self.starttime = time.time()
332+
333+
# Creates the main directory
334+
directory = self.path
335+
logger.debug("Making directories job %s...", directory)
336+
if not directory.is_dir():
337+
directory.mkdir(parents=True, exist_ok=True)
338+
339+
# Sets up the notification URL
340+
if notification_server is not None:
341+
self.add_notification_server(notification_server)
342+
343+
except Exception:
344+
logger.warning("Error while locking job", exc_info=True)
345+
return None # Signal waiting state to scheduler
346+
347+
try:
348+
# Runs the job
349+
process = await self.aio_run()
350+
except Exception:
351+
logger.warning("Error while starting job", exc_info=True)
352+
return JobState.ERROR
353+
354+
try:
355+
if isinstance(process, JobState):
356+
state = process
357+
logger.debug("Job %s ended (state %s)", self, state)
358+
else:
359+
logger.debug("Waiting for job %s process to end", self)
360+
361+
code = await process.aio_code()
362+
logger.debug("Got return code %s for %s", code, self)
363+
364+
# Check the file if there is no return code
365+
if code is None:
366+
# Case where we cannot retrieve the code right away
367+
if self.donepath.is_file():
368+
code = 0
369+
else:
370+
code = int(self.failedpath.read_text())
371+
372+
logger.debug("Job %s ended with code %s", self, code)
373+
state = JobState.DONE if code == 0 else JobState.ERROR
374+
375+
except JobError:
376+
logger.warning("Error while running job")
377+
state = JobState.ERROR
378+
379+
except Exception:
380+
logger.warning(
381+
"Error while running job (in experimaestro)", exc_info=True
382+
)
383+
state = JobState.ERROR
384+
return state
385+
280386
async def aio_run(self):
281387
"""Actually run the code"""
282388
raise NotImplementedError(f"Method aio_run not implemented in {self.__class__}")

0 commit comments

Comments
 (0)