diff --git a/src/experimaestro/connectors/local.py b/src/experimaestro/connectors/local.py index 5d5db18..13b33dd 100644 --- a/src/experimaestro/connectors/local.py +++ b/src/experimaestro/connectors/local.py @@ -6,10 +6,10 @@ import os import threading from experimaestro.launcherfinder import LauncherRegistry -import fasteners +from fasteners import InterProcessLock as FastenersInterProcessLock import psutil -from experimaestro.locking import Lock +from asyncio import Lock from . import ( Connector, @@ -93,7 +93,7 @@ def fromspec(connector, spec): def getstream(redirect: Redirect, write: bool): - if redirect.type == RedirectType.FILE: + if redirect.type == RedirectType.FILE and redirect.path: return redirect.path.open("w" if write else "r") if redirect.type == RedirectType.PIPE: @@ -145,25 +145,35 @@ def start(self, task_mode=False): return process -class InterProcessLock(fasteners.InterProcessLock, Lock): +class InterProcessLock(FastenersInterProcessLock, Lock): def __init__(self, path, max_delay=-1): - super().__init__(path) + FastenersInterProcessLock.__init__(self, path) self.max_delay = max_delay def __enter__(self): logger.debug("Locking %s", self.path) - if not super().acquire(blocking=True, max_delay=self.max_delay, timeout=None): + if not FastenersInterProcessLock.acquire( + self, blocking=True, max_delay=self.max_delay, timeout=None + ): raise threading.ThreadError("Could not acquire lock") logger.debug("Locked %s", self.path) return self + def __aenter__(self): + # use the synchronous __enter__ method in async context + return self.__enter__() + def __exit__(self, *args): logger.debug("Unlocking %s", self.path) super().__exit__(*args) + def __aexit__(self, *args): + # use the synchronous __exit__ method in async context + return self.__exit__(*args) + class LocalConnector(Connector): - INSTANCE: Connector = None + INSTANCE: Optional[Connector] = None @staticmethod def instance(): @@ -175,7 +185,7 @@ def instance(): def init_registry(registry: LauncherRegistry): pass - def __init__(self, localpath: Path = None): + def __init__(self, localpath: Optional[Path] = None): localpath = localpath if not localpath: localpath = Path( @@ -200,7 +210,7 @@ def createtoken(self, name: str, total: int) -> Token: def processbuilder(self) -> ProcessBuilder: return LocalProcessBuilder() - def resolve(self, path: Path, basepath: Path = None) -> str: + def resolve(self, path: Path, basepath: Optional[Path] = None) -> str: assert isinstance(path, PosixPath) or isinstance( path, WindowsPath ), f"Unrecognized path {type(path)}" diff --git a/src/experimaestro/connectors/ssh.py b/src/experimaestro/connectors/ssh.py index 115e532..ed850d7 100644 --- a/src/experimaestro/connectors/ssh.py +++ b/src/experimaestro/connectors/ssh.py @@ -9,6 +9,7 @@ import io import os import re +from asyncio import Lock from experimaestro.launcherfinder import LauncherRegistry from urllib.parse import urlparse from itertools import chain @@ -19,7 +20,6 @@ RedirectType, Redirect, ) -from experimaestro.locking import Lock from experimaestro.tokens import Token try: diff --git a/src/experimaestro/locking.py b/src/experimaestro/locking.py index 1440b92..a98001b 100644 --- a/src/experimaestro/locking.py +++ b/src/experimaestro/locking.py @@ -1,68 +1,42 @@ -from experimaestro.utils.asyncio import asyncThreadcheck +from asyncio import Lock from .utils import logger -class Lock: - """A lock""" - - def __init__(self): - self._level = 0 - self.detached = False - - def detach(self): - self.detached = True - - def acquire(self): - if self._level == 0: - self._level += 1 - self._acquire() - return self - - def release(self): - if not self.detached and self._level == 1: - self._level -= 1 - self._release() - - def __enter__(self): - self.acquire() - return self - - def __exit__(self, *args): - self.release() - - async def __aenter__(self): - return await asyncThreadcheck("lock (aenter)", self.__enter__) - - async def __aexit__(self, *args): - return await asyncThreadcheck("lock (aexit)", self.__exit__, *args) - - def _acquire(self): - raise NotImplementedError() - - def _release(self): - raise NotImplementedError() - - class LockError(Exception): pass class Locks(Lock): - """A set of locks""" + """A set of locks that can be acquired/released together""" def __init__(self): super().__init__() self.locks = [] def append(self, lock): + """Add a lock to the collection""" self.locks.append(lock) - def _acquire(self): - for lock in self.locks: - lock.acquire() + async def acquire(self): + """Acquire all locks in order""" + if not self.locked(): + for lock in self.locks: + await lock.acquire() + self._acquired = True + await super().acquire() + return self + + def release(self): + """Release all locks in reverse order""" + if self.locked(): + # if not self.detached and self._acquired: + logger.debug("Releasing %d locks", len(self.locks)) + # Release in reverse order to prevent deadlocks + for lock in reversed(self.locks): + logger.debug("[locks] Releasing %s", lock) + lock.release() + super().release() - def _release(self): - logger.debug("Releasing %d locks", len(self.locks)) - for lock in self.locks: - logger.debug("[locks] Releasing %s", lock) - lock.release() + async def __aenter__(self): + await super().__aenter__() + return self diff --git a/src/experimaestro/scheduler/dependencies.py b/src/experimaestro/scheduler/dependencies.py index 988b578..fc323ff 100644 --- a/src/experimaestro/scheduler/dependencies.py +++ b/src/experimaestro/scheduler/dependencies.py @@ -5,7 +5,7 @@ import asyncio from enum import Enum from ..utils import logger -from ..locking import Lock + if TYPE_CHECKING: from . import Job @@ -60,7 +60,7 @@ def __init__(self, origin): def status(self) -> DependencyStatus: raise NotImplementedError() - def lock(self) -> Lock: + def lock(self) -> asyncio.Lock: raise NotImplementedError() def __repr__(self) -> str: diff --git a/src/experimaestro/scheduler/jobs.py b/src/experimaestro/scheduler/jobs.py index f8d9e0f..ca1737f 100644 --- a/src/experimaestro/scheduler/jobs.py +++ b/src/experimaestro/scheduler/jobs.py @@ -74,10 +74,18 @@ def __init__(self, job): super().__init__() self.job = job - def _acquire(self): + # def _acquire(self): + # return self.job.state == JobState.DONE + + async def acquire(self): + await super().acquire() return self.job.state == JobState.DONE - def _release(self): + # def _release(self): + # return False + + def release(self): + super().release() return False @@ -299,9 +307,9 @@ async def aio_start(self, sched_dependency_lock, notification_server=None): # We first lock the job before proceeding assert self.launcher is not None - with Locks() as locks: + async with Locks() as locks: logger.debug("[starting] Locking job %s", self) - async with self.launcher.connector.lock(self.lockpath): + with self.launcher.connector.lock(self.lockpath): logger.debug("[starting] Locked job %s", self) state = None @@ -317,7 +325,10 @@ async def aio_start(self, sched_dependency_lock, notification_server=None): async with sched_dependency_lock: for dependency in self.dependencies: try: - locks.append(dependency.lock().acquire()) + lock = dependency.lock() + await lock.acquire() + # locks.append(dependency.lock().acquire()) + locks.append(lock) except LockError: logger.warning( "Could not lock %s, aborting start for job %s", diff --git a/src/experimaestro/tokens.py b/src/experimaestro/tokens.py index 93a1573..9909611 100644 --- a/src/experimaestro/tokens.py +++ b/src/experimaestro/tokens.py @@ -12,13 +12,14 @@ import threading import os.path from watchdog.events import FileSystemEventHandler +from asyncio import Lock from typing import Dict from experimaestro.launcherfinder.base import TokenConfiguration from experimaestro.launcherfinder.registry import LauncherRegistry from .ipc import ipcom -from .locking import Lock, LockError +from .locking import LockError from .scheduler.dependencies import Dependency, DependencyStatus, Resource import logging import json @@ -51,9 +52,17 @@ def __init__(self, dependency: "CounterTokenDependency"): def _acquire(self): self.dependency.token.acquire(self.dependency) + async def acquire(self): + self._acquire() + return await super().acquire() + def _release(self): self.dependency.token.release(self.dependency) + def release(self): + self._release() + return super().release() + def __str__(self): return "Lock(%s)" % self.dependency