From 6247a1bd281196b5618d3f696a28c5da5beddaf4 Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Mon, 3 Mar 2025 12:25:47 +0100 Subject: [PATCH 01/14] delete old classes --- src/vip_client/classes/VipClient.py | 326 ---------------------- src/vip_client/classes/VipLoader.py | 405 ---------------------------- 2 files changed, 731 deletions(-) delete mode 100644 src/vip_client/classes/VipClient.py delete mode 100644 src/vip_client/classes/VipLoader.py diff --git a/src/vip_client/classes/VipClient.py b/src/vip_client/classes/VipClient.py deleted file mode 100644 index c9c22cd..0000000 --- a/src/vip_client/classes/VipClient.py +++ /dev/null @@ -1,326 +0,0 @@ -from __future__ import annotations -import json -import os -import re -import time -from contextlib import contextmanager -from pathlib import * - -from vip_client.utils import vip - -class VipClient(): - """ - Base class for the client. - WORK IN PROGRESS - """ - - ################## - ################ Class Attributes ################## - ################## - - # Class name - __name__ = "VipClient" - # Default verbose state - _VERBOSE = True - # Vip portal - _VIP_PORTAL = "https://vip.creatis.insa-lyon.fr/" - # Mail address for support - _VIP_SUPPORT = "vip-support@creatis.insa-lyon.fr" - # Regular expression for invalid characters - _INVALID_CHARS = re.compile(r"[^0-9\.,A-Za-z\-+@/_(): \[\]?&=]") - - ################ - ################ Public Methods ################## - ################ - - # Login to VIP - @classmethod - def init(cls, api_key="VIP_API_KEY", verbose=True) -> VipClient: - """ - Handshakes with VIP using your own API key. - Returns a class instance which properties can be provided as keyword arguments. - - ## Parameters - - `api_key` (str): VIP API key. This can be either: - A. [unsafe] A **string litteral** containing your API key, - B. [safer] A **path to some local file** containing your API key, - C. [safer] The **name of some environment variable** containing your API key (default: "VIP_API_KEY"). - In cases B or C, the API key will be loaded from the local file or the environment variable. - - - `verbose` (bool): default verbose mode for all instances. - - If True, all instances will display logs by default; - - If False, all instance methods will run silently by default. - - - `kwargs` [Optional] (dict): keyword arguments or dictionnary setting properties of the returned instance. - """ - # Set the default verbose mode for all sessions - cls._VERBOSE = verbose - # Check if `api_key` is in a local file or environment variable - true_key = cls._get_api_key(api_key) - # Set User API key - try: - # setApiKey() may return False - assert vip.setApiKey(true_key), \ - f"(!) Unable to set the VIP API key: {true_key}.\nPlease check the key or retry later." - except RuntimeError as vip_error: - # setApiKey() may throw RuntimeError in case of bad key - cls._printc(f"(!) Unable to set the VIP API key: {true_key}.\n Original error message:") - raise vip_error - except(json.decoder.JSONDecodeError) as json_error: - # setApiKey() may throw JSONDecodeError in special cases - cls._printc(f"(!) Unable to set the VIP API key: {true_key}.\n Original error message:") - raise json_error - # Display success - cls._printc() - cls._printc("----------------------------------") - cls._printc("| You are communicating with VIP |") - cls._printc("----------------------------------") - cls._printc() - # ------------------------------------------------ - - ################# - ################ Private Methods ################ - ################# - - # Method to check existence of a distant resource. - @classmethod - def _exists(cls, path, location="vip") -> bool: - """ - Checks existence of a distant resource (`location`="vip"). - `path` can be a string or Pathlib object. - """ - # Check `location` - if location != "vip": - raise NotImplementedError(f"Unknown location: {location}") - # Check path existence - try: - return vip.exists(str(path)) - except RuntimeError as vip_error: - # Connection error with VIP - cls._handle_vip_error(vip_error) - except json.decoder.JSONDecodeError: - raise ValueError( - f"The following path generated an error on VIP:\n\t{path}\n" - + "Please check this path or retry later." - ) - # ------------------------------------------------ - - # Method to create a distant directory - @classmethod - def _create_dir(cls, path: PurePath, location="vip") -> None: - """ - Creates a directory at `path`, on VIP servers if `location` is "vip". - - `path`can be a string or PathLib object. - Returns the VIP path of the newly created folder. - """ - # Check `location` - if location != "vip": - raise NotImplementedError(f"Unknown location: {location}") - # Create directory - try: - if not vip.create_dir(str(path)): - msg = f"The following directoy could not be created on VIP:\n\t{path}\n" - msg += f"Please retry later. Contact VIP support ({cls._VIP_SUPPORT}) if this cannot be fixed." - raise AssertionError(msg) - except RuntimeError as vip_error: - cls._handle_vip_error(vip_error) - except json.decoder.JSONDecodeError as json_error: - raise ValueError( - f"The following path generated an error on VIP:\n\t{path}\n" - + "Please check this path is valid and/or consistent with your other inputs." - ) - # ------------------------------------------------ - - # Function to delete a path - @classmethod - def _delete_path(cls, path: PurePath, location="vip") -> None: - """ - Deletes `path` on `location`. Raises an error if the file exists and could not be removed. - """ - # Check `location` - if location != "vip": - raise NotImplementedError(f"Unknown location: {location}") - # Try path deletion - done = vip.delete_path(str(path)) - # VIP Errors are handled by returning False in `vip.delete_path()`. - if not done and vip.exists(str(path)): - # Raise a generic error if deletion did not work - msg = f"\n'{path}' could not be removed from VIP servers.\n" - msg += "Check your connection with VIP and path existence on the VIP portal.\n" - raise RuntimeError(msg) - # ------------------------------------------------ - - # Function to delete a path on VIP with warning - @classmethod - def _delete_and_check(cls, path: PurePath, location="vip", timeout=300) -> bool: - """ - Deletes `path` on `location` and waits until `path` is actually removed. - After `timeout` (seconds), displays a warning if `path` still exist. - """ - # Delete the path - cls._delete_path(path, location) - # Standby until path is indeed removed (give up after some time) - start = time.time() - t = time.time() - start - while (t < timeout) and cls._exists(path, location): - time.sleep(2) - t = time.time() - start - # Check if the data have indeed been removed - return (t < timeout) - # ------------------------------------------------ - - ########################################################## - # Generic private methods than should work in any subclass - ########################################################## - - # Method to create a directory leaf on the top of any path, at any location - @classmethod - def _mkdirs(cls, path: PurePath, location: str, **kwargs) -> str: - """ - Creates each non-existent directory in `path` (like os.mkdirs()), - in the file system pointed by `location`. - - Directories are created using: cls._create_dir(`path`, `location`, **`kwargs`) - - Existence is checked using: cls._exists(`path`, `location`). - - Returns the newly created part of `path` (empty string if `path` already exists). - """ - # Case : the current path exists - if cls._exists(path=path, location=location) : - return "" - # Find the 1rst non-existent node in the arborescence - first_node = path - while not cls._exists(first_node.parent, location=location): - first_node = first_node.parent - # Create the first node - cls._create_dir(path=first_node, location=location, **kwargs) - # Make the other nodes one by one - dir_to_make = first_node - while dir_to_make != path: - # Find the next directory to make - dir_to_make /= path.relative_to(dir_to_make).parts[0] - # Make the directory - cls._create_dir(path=dir_to_make, location=location, **kwargs) - # Return the created nodes - return str(path.relative_to(first_node.parent)) - # ------------------------------------------------ - - ################################################## - # Context managers - ################################################## - - # Simple context manager to silence logs from class methods while executing code - @classmethod - @contextmanager - def _silent_class(cls) -> None: - """ - Under this context, the session will not print anything. - """ - verbose = cls._VERBOSE # save verbose mode - cls._VERBOSE = False # silence instance logs - yield - cls._VERBOSE = verbose # restore verbose mode - # ------------------------------------------------ - - # init - ################################################# - @classmethod - def _get_api_key(cls, api_key: str) -> str: - """ - - `api_key` (str): VIP API key. This can be either: - A. [unsafe] A **string litteral** containing your API key, - B. [safer] A **path to some local file** containing your API key, - C. [safer] The **name of some environment variable** containing your API key (default: "VIP_API_KEY"). - In cases B or C, the API key will be loaded from the local file or the environment variable. - """ - # Check if `api_key` is in a local file or environment variable - if os.path.isfile(api_key): # local file - with open(api_key, "r") as kfile: - true_key = kfile.read().strip() - elif api_key in os.environ: # environment variable - true_key = os.environ[api_key] - else: # string litteral - true_key = api_key - # Return - return true_key - # ------------------------------------------------ - - # Function to check invalid characters in some input string - @classmethod - def _invalid_chars(cls, value) -> list: - """ - Returns a list of invalid characters in `value`. - Value can be a list or any object convertible to string. - """ - if isinstance(value, list): - return sorted(list({v for val in value for v in cls._INVALID_CHARS.findall(str(val))})) - else: - return sorted(cls._INVALID_CHARS.findall(str(value))) - # ------------------------------------------------ - - # Function to clean HTML text when loaded from VIP portal - @staticmethod - def _clean_html(text: str) -> str: - """Returns `text` without html tags and newline characters.""" - return re.sub(r'<[^>]+>|\n', '', text) - - ######################################## - # SESSION LOGS & USER VIEW - ######################################## - - @classmethod - # Interface to print logs from class methods - def _printc(cls, *args, **kwargs) -> None: - """ - Print logs from class methods only when cls._VERBOSE is True. - """ - if cls._VERBOSE: - print(*args, **kwargs) - # ------------------------------------------------ - - # Function to handle VIP runtime errors and provide interpretation to the user - @classmethod - def _handle_vip_error(cls, vip_error: RuntimeError) -> None: - """ - Rethrows a RuntimeError `vip_error` which occured in the VIP API, - with interpretation depending on the error code. - """ - # Enumerate error cases - message = vip_error.args[0] - if message.startswith("Error 8002") or message.startswith("Error 8003") \ - or message.startswith("Error 8004"): - # "Bad credentials" / "Full authentication required" / "Authentication error" - interpret = ( - "Unable to communicate with VIP." - + f"\nRun {cls.__name__}.init() with a valid API key to handshake with VIP servers" - + f"\n({message})" - ) - elif message.startswith("Error 8000"): - # Probably wrong values were fed in `vip.init_exec()` - interpret = ( - f"\n\t'{message}'" - + "\nPlease carefully check that session_name / pipeline_id / input_parameters " - + "are valid and do not contain any forbidden character" - + "\nIf this cannot be fixed, contact VIP support ()" - ) - elif message.startswith("Error 2000") or message.startswith("Error 2001"): - # Maximum number of executions - interpret = ( - f"\n\t'{message}'" - + "\nPlease wait until current executions are over, " - + f"or contact VIP support ({cls._VIP_SUPPORT}) to increase this limit" - ) - else: - # Unhandled runtime error - interpret=( - f"\n\t{message}" - + f"\nIf this cannot be fixed, contact VIP support ({cls._VIP_SUPPORT})" - ) - # Display the error message - raise RuntimeError(interpret) from None - # ------------------------------------------------ - -####################################################### - -if __name__=="__main__": - pass \ No newline at end of file diff --git a/src/vip_client/classes/VipLoader.py b/src/vip_client/classes/VipLoader.py deleted file mode 100644 index df000a3..0000000 --- a/src/vip_client/classes/VipLoader.py +++ /dev/null @@ -1,405 +0,0 @@ -from __future__ import annotations -import os -import tarfile -from pathlib import * - -from vip_client.utils import vip -from vip_client.classes.VipClient import VipClient - -class VipLoader(VipClient): - """ - Python class to upload / download files to / from VIP servers. - WORK IN PROGRESS - - N.B.: all instance methods require that `VipLoader.init()` has been called with a valid API key. - See GitHub documentation to get your own VIP API key. - """ - - ################## - ################ Class Attributes ################## - ################## - - # Class name - __name__ = "VipLoader" - # Default verbose state - _VERBOSE = True - # List of known directory contents - _VIP_TREE = {} - - ################ - ################ Public Methods ################## - ################ - - @classmethod - def list_dir(cls, vip_path: PurePosixPath) -> list[str]: - """ - Returns a list of directories under `vip_path` [str or os.PathLike]. - """ - return [ - PurePosixPath(element["path"]).name - for element in cls._list_dir_vip(PurePosixPath(vip_path), update=True) - ] - - @classmethod - def download_dir(cls, vip_path, local_path, unzip=True): - """ - Download all files from `vip_path` to `local_path` (if needed). - Displays what it does if `cls._VERBOSE` is True. - Returns a dictionary of failed downloads. - """ - cls._printc("Recursive download from:", vip_path) - # Path-ify - vip_path = PurePosixPath(vip_path) - local_path = Path(local_path) - # Assert folder existence on VIP - if not cls._exists(vip_path, location='vip'): - raise FileNotFoundError("Folder does not exist on VIP.") - # Scan the distant and local directories and get a list of files to download - cls._printc("\nCloning the distant folder tree") - cls._printc("-------------------------------") - files_to_download = cls._init_download_dir(vip_path, local_path) - cls._printc("-------------------------------") - cls._printc("Done.") - # Download the files from VIP servers & keep track of the failures - cls._printc("\nParallel download of the distant files") - cls._printc("--------------------------------------") - failures = cls._download_parallel(files_to_download, unzip) - cls._printc("--------------------------------------") - cls._printc("End of parallel downloads\n") - if not failures : - return - # Retry in case of failure - cls._printc(len(failures), "files could not be downloaded from VIP.") - cls._printc("\nGiving a second try") - cls._printc("---------------------") - failures = cls._download_parallel(failures, unzip) - cls._printc("---------------------") - cls._printc("End of the process.") - if failures : - cls._printc("The following files could not be downloaded from VIP:", end="\n\t") - cls._printc("\n\t".join([str(file) for file, _ in failures])) - # ------------------------------------------------ - - - ################# - ################ Private Methods ################ - ################# - - @classmethod - def _list_content_vip(cls, vip_path: PurePosixPath, update=True) -> list[dict]: - """ - Updates `cls._VIP_TREE` with the content of `vip_path` on VIP servers. - """ - if update or (vip_path not in cls._VIP_TREE): - cls._VIP_TREE[vip_path] = vip.list_content(str(vip_path)) - return cls._VIP_TREE[vip_path] - # ------------------------------------------------ - - @classmethod - def _list_files_vip(cls, vip_path: PurePosixPath, update=True) -> list[dict]: - return [ - element - for element in cls._list_content_vip(vip_path, update) - if element['exists'] and not element['isDirectory'] - ] - # ------------------------------------------------ - - @classmethod - def _list_dir_vip(cls, vip_path: PurePosixPath, update=True) -> list[dict]: - return [ - element - for element in cls._list_content_vip(vip_path, update) - if element['exists'] and element['isDirectory'] - ] - # ------------------------------------------------ - - ######################### - # Methods to be optimized - ######################### - - # Method to check existence of a distant or local resource. - @classmethod - def _exists(cls, path: PurePath, location="local") -> bool: - """ - Checks existence of a distant (`location`="vip") or local (`location`="local") resource. - `path` can be a string or path-like object. - """ - # Check path existence in `location` - if location=="local": - return os.path.exists(path) - else: - return super()._exists(path=path, location=location) - # ------------------------------------------------ - - # Method to create a distant or local directory - @classmethod - def _create_dir(cls, path: PurePath, location="local", **kwargs) -> None: - """ - Creates a directory at `path` : - - locally if `location` is "local"; - - on VIP if `location` is "vip". - - `kwargs` are passed as keyword arguments to `Path.mkdir()`. - Returns the VIP or local path of the newly created folder. - """ - if location == "local": - # Check input type - path=Path(path) - # Check the parent is a directory - assert path.parent.is_dir(),\ - f"Cannot create subdirectories in '{path.parent}': not a folder" - # Create the new directory with additional keyword arguments - path.mkdir(**kwargs) - else: - return super()._create_dir(path=path, location=location, **kwargs) - # ------------------------------------------------ - - @classmethod - def _init_download_dir(cls, vip_path: PurePosixPath, local_path: Path) -> dict: - """ - Copy the folder tree under `vip_path` to `local_path` - - Returns a dictionary of files within `vip_path` that are not in `local_paths`. - Dictionary keys: (vip_path, local_path). - Dictionary values: file metadata. - """ - # First display - cls._printc(f"{local_path} : ", end="") - # Scan the current VIP directory - cls._list_content_vip(vip_path) - # Look for files - all_files = cls._list_files_vip(vip_path, update=False) - # Scan the local directory and look for files to download - if cls._mkdirs(local_path, location="local"): - # The local directory did not exist before call - cls._printc("Created.") - # -> download all the files (no scan to save time) - else: - # The local directory already exists - cls._printc("Already there.") - # Scan it to check if there are more files to download - local_filenames = { - elem.name for elem in local_path.iterdir() if elem.exists() - } - # Get the files to download - all_files = [ - element for element in all_files - if PurePosixPath(element["path"]).name not in local_filenames - ] - # Return files to download as a dictionary - files_to_download = {} - for file in all_files: - # Dict key: VIP & local paths - file_vip_path = PurePosixPath(file["path"]) - file_local_path = local_path / file_vip_path.name - files_to_download[(file_vip_path, file_local_path)] = { - # Dict value: Metadata - key: value for key, value in file.items() if key!="path" - } - # Recurse this function over sub-directories - for subdir in cls._list_dir_vip(vip_path, update=False): - subdir_path = PurePosixPath(subdir["path"]) - # Scan the subdirectory - new_files = cls._init_download_dir( - vip_path = subdir_path, - local_path = local_path / subdir_path.name, - ) - # Update the list of files to download - files_to_download.update(new_files) - return files_to_download - # ------------------------------------------------ - - # Method do download files using parallel threads - @classmethod - def _download_parallel(cls, files_to_download: dict, unzip: bool): - """ - Downloads files from VIP using parallel threads. - - `files_to_download`: Dictionnary with key: (vip_path, local_path) and value: metadata. - - `unzip`: if True, extracts the tarballs inplace after the download. - - Returns a list of failed downloads. - """ - # Copy the input - files_to_download = files_to_download.copy() - # Return if there is no file to download - if not files_to_download: - cls._printc("No file to download.") - return files_to_download - # Check the amount of data - try: total_size = "%.1fMB" % sum([file['size']/(1<<20) for file in files_to_download.values()]) - except: total_size = "unknown" - # Display - cls._printc(f"Downloading {len(files_to_download)} file(s) (total size: {total_size})...") - # Sort the files to download by size - try: - file_list = sorted(files_to_download.keys(), key=lambda file: files_to_download[file]["size"]) - except: - file_list = list(files_to_download) - # Download the files from VIP servers - nFile = 0 - nb_files = len(files_to_download) - for file, done in vip.download_parallel(file_list): - nFile += 1 - # Get informations about the new file - vip_path, local_path = file - file_info = files_to_download[file] - file_size = "[%.1fMB]" % (file_info["size"]/(1<<20)) if "size" in file_info else "" - if done: - # Remove file from the list - file_info = files_to_download.pop(file) - # Display success - cls._printc(f"- [{nFile}/{nb_files}] DONE:", local_path, file_size, flush=True) - # If the output is a tarball, extract the files and delete the tarball - if unzip and tarfile.is_tarfile(local_path): - cls._printc("\tExtracting archive ...", end=" ") - if cls._extract_tarball(local_path): - cls._printc("Done.") # Display success - else: - cls._printc("Extraction failed.") # Display failure - else: - # Display failure - cls._printc(f"- [{nFile}/{nb_files}] FAILED:", vip_path, file_size, flush=True) - # Return failed downloads - return files_to_download - # ------------------------------------------------ - - # Function to download a single file from VIP - @classmethod - def _download_file(cls, vip_path: PurePosixPath, local_path: Path) -> bool: - """ - Downloads a single file in `vip_path` to `local_path`. - Returns a success flag. - """ - # Download (file existence on VIP is not checked to save time) - try: - return vip.download(str(vip_path), str(local_path)) - except RuntimeError as vip_error: - cls._handle_vip_error(vip_error) - # ------------------------------------------------ - - # Method to extract content from a tarball - @classmethod - def _extract_tarball(cls, local_file: Path): - """ - Replaces tarball `local_file` by a directory with the same name - and extracted content. - Returns success flag. - """ - # Rename current archive - archive = local_file.parent / "tmp.tgz" - os.rename(local_file, archive) # pathlib version does not work it in Python 3.7 - # Create a new directory to store archive content - cls._mkdirs(local_file, location="local") - # Extract archive content - try: - with tarfile.open(archive) as tgz: - tgz.extractall(path=local_file) - success = True - except: - success = False - # Deal with the temporary archive - if success: # Remove the archive - os.remove(archive) - else: # Rename the archive - os.rename(archive, local_file) - # Return the flag - return success - # ------------------------------------------------ - - - # Function to upload all files from a local directory - @classmethod - def _upload_dir(cls, local_path: Path, vip_path: PurePosixPath) -> list: - """ - Uploads all files in `local_path` to `vip_path` (if needed). - Displays what it does if `cls._VERBOSE` is True. - Returns a list of files which failed to be uploaded on VIP. - """ - # Scan the local directory - assert cls._exists(local_path, location='local'), f"{local_path} does not exist." - # First display - cls._printc(f"Cloning: {local_path} ", end="... ") - # Scan the distant directory and look for files to upload - if cls._mkdirs(vip_path, location="vip"): - # The distant directory did not exist before call - # -> upload all the data (no scan to save time) - files_to_upload = [ - elem for elem in local_path.iterdir() - if elem.is_file() - ] - cls._printc("(Created on VIP)") - if files_to_upload: - cls._printc(f"\t{len(files_to_upload)} file(s) to upload.") - else: # The distant directory already exists - # Scan it to check if there are more files to upload - vip_filenames = { - PurePosixPath(element["path"]).name - for element in vip.list_elements(str(vip_path)) - } - # Get the files to upload - files_to_upload = [ - elem for elem in local_path.iterdir() - if elem.is_file() and (elem.name not in vip_filenames) - ] - # Update the display - if files_to_upload: - cls._printc(f"\n\tVIP clone already exists and will be updated with {len(files_to_upload)} file(s).") - else: - cls._printc("Already on VIP.") - # Upload the files - nFile = 0 - failures = [] - for local_file in files_to_upload : - nFile+=1 - # Get the file size (if possible) - try: size = f"{local_file.stat().st_size/(1<<20):,.1f}MB" - except: size = "unknown size" - # Display the current file - cls._printc(f"\t[{nFile}/{len(files_to_upload)}] Uploading file: {local_file.name} ({size}) ...", end=" ") - # Upload the file on VIP - vip_file = vip_path/local_file.name # file path on VIP - if cls._upload_file(local_path=local_file, vip_path=vip_file): - # Upload was successful - cls._printc("Done.") - else: - # Update display - cls._printc(f"\n(!) Something went wrong during the upload.") - # Update missing files - failures.append(str(local_file)) - # Look for sub-directories - subdirs = [ - elem for elem in local_path.iterdir() - if elem.is_dir() - ] - # Recurse this function over sub-directories - for subdir in subdirs: - failures += cls._upload_dir( - local_path=subdir, - vip_path=vip_path/subdir.name - ) - # Return the list of failures - return failures - # ------------------------------------------------ - - # Function to upload a single file on VIP - @classmethod - def _upload_file(cls, local_path: Path, vip_path: PurePosixPath) -> bool: - """ - Uploads a single file in `local_path` to `vip_path`. - Returns a success flag. - """ - # Check - assert local_path.exists(), f"{local_path} does not exist." - # Upload - try: - return vip.upload(str(local_path), str(vip_path)) - except RuntimeError as vip_error: - cls._handle_vip_error(vip_error) - # ------------------------------------------------ - - -####################################################### - -if __name__=="__main__": - pass - \ No newline at end of file From 7d9a5055e910a7d509b2758c53a0bf899b822c81 Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Mon, 3 Mar 2025 13:09:18 +0100 Subject: [PATCH 02/14] allow keeping outputs on vip --- src/vip_client/classes/VipLauncher.py | 8 ++++---- src/vip_client/classes/VipSession.py | 17 +++++++++++------ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/vip_client/classes/VipLauncher.py b/src/vip_client/classes/VipLauncher.py index b24804c..bf1405c 100644 --- a/src/vip_client/classes/VipLauncher.py +++ b/src/vip_client/classes/VipLauncher.py @@ -614,7 +614,7 @@ def run_session( self, nb_runs=1, refresh_time=30) -> VipLauncher: # ------------------------------------------------ # Clean session data on VIP - def finish(self, timeout=300) -> VipLauncher: + def finish(self, timeout=300, **kwargs) -> VipLauncher: """ Removes session's output data from VIP servers. @@ -641,7 +641,7 @@ def finish(self, timeout=300) -> VipLauncher: self._print("---------------------") # Browse paths to delete success = True - for path, location in self._path_to_delete().items(): + for path, location in self._path_to_delete(**kwargs).items(): # Display progression self._print(f"[{location}] {path} ... ", end="", flush=True) # Check data existence @@ -694,7 +694,7 @@ def finish(self, timeout=300) -> VipLauncher: else: self._print("(!) There may still be temporary data on VIP.") self._print(f"Please run finish() again or check the following path(s) on the VIP portal ({self._VIP_PORTAL}):") - self._print('\n\t'.join([str(path) for path in self._path_to_delete()])) + self._print('\n\t'.join([str(path) for path in self._path_to_delete(**kwargs)])) # Finish display self._print() # Return @@ -838,7 +838,7 @@ def display(self) -> VipLauncher: ################################################################### # Path to delete during session finish - def _path_to_delete(self) -> dict: + def _path_to_delete(self, **kwargs) -> dict: """Returns the folders to delete during session finish, with appropriate location.""" return { self._vip_output_dir: "vip" diff --git a/src/vip_client/classes/VipSession.py b/src/vip_client/classes/VipSession.py index 00ad805..7485c28 100644 --- a/src/vip_client/classes/VipSession.py +++ b/src/vip_client/classes/VipSession.py @@ -563,7 +563,7 @@ def run_session( ) # Clean session data on VIP - def finish(self, timeout=300) -> VipSession: + def finish(self, timeout=300, keep_output=False) -> VipSession: """ Removes session's data from VIP servers (INPUTS and OUTPUTS). The downloaded outputs and the input dataset are kept on the local machine. @@ -574,7 +574,7 @@ def finish(self, timeout=300) -> VipSession: - Workflows status are set to "Removed" when the corresponding outputs have been removed from VIP servers. """ # Finish the session based on self._path_to_delete() - super().finish(timeout=timeout) + super().finish(timeout=timeout, keep_output=keep_output) # Check if the input data have been erased (this is not the case when get_inputs have been used) if (self._vip_input_dir != self._vip_dir / "INPUTS" and self._exists(self._vip_input_dir, location="vip")): @@ -673,11 +673,16 @@ def get_inputs(self, session: VipSession, get_pipeline=False, get_settings=False ################################################################### # Path to delete during session finish() - def _path_to_delete(self) -> dict: + def _path_to_delete(self, **kwargs) -> dict: """Returns the folders to delete during session finish, with appropriate location.""" - return { - self._vip_dir: "vip" - } + if (kwargs.get("keep_output", False)): + return { + self._vip_dir / "INPUTS": "vip" + } + else: + return { + self._vip_dir: "vip" + } # Method to check existence of a distant or local resource. @classmethod From e36966d8d5616b111c9e9da8f55ec975883cc79e Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Mon, 3 Mar 2025 13:42:55 +0100 Subject: [PATCH 03/14] renaming VipCI to VipGirder --- doc/history.md | 4 +- doc/source.md | 2 +- examples/tutorials/exemple_VipCI.ipynb | 16 +- src/vip_client/classes/VipCI.py | 671 --------------------- src/vip_client/classes/__init__.py | 6 +- tests/README.md | 4 +- tests/{test_VipCI.py => test_VipGirder.py} | 24 +- tests/test_VipLauncher.py | 2 - tests/test_VipSession.py | 3 - tests/test_global.py | 12 +- tests/try_viploader.py | 18 - 11 files changed, 31 insertions(+), 731 deletions(-) delete mode 100644 src/vip_client/classes/VipCI.py rename tests/{test_VipCI.py => test_VipGirder.py} (90%) delete mode 100644 tests/try_viploader.py diff --git a/doc/history.md b/doc/history.md index c2d0ee1..504ccdc 100644 --- a/doc/history.md +++ b/doc/history.md @@ -22,12 +22,12 @@ ### June 2023 -- Class [`VipLauncher`](#viplauncher) is introduced for specific user needs, as a parent of `VipSession` & `VipCI`; +- Class [`VipLauncher`](#viplauncher) is introduced for specific user needs, as a parent of `VipSession` & `VipGirder`; - Session properties (`session_name`, `pipeline_id`, *etc.*) can be safely accessed and modified in all "`Vip*`" classes; - A list of available pipelines and detailed informations about each pipeline can be displayed through new class method `show_pipeline()`; ### April 2023 -- Class [`VipCI`](#vipci) to interacts with Girder datasets (tailored for CI tests in the ReproVIP project). +- Class [`VipGirder`](#VipGirder) to interacts with Girder datasets (tailored for CI tests in the ReproVIP project). ### March 2023 - Class [`VipSession`](#vipsession): user-friendly interface to run VIP jobs on local datasets. diff --git a/doc/source.md b/doc/source.md index b3a5539..34f71e4 100644 --- a/doc/source.md +++ b/doc/source.md @@ -12,7 +12,7 @@ The most user-friendly class to interact with VIP. See the documentation [here]( A parent class of `VipSession` that implements everything needed to launch VIP applications on remote data sets. *More information to come*. -### [vip_client.classes.**VipCI**](../src/vip_client/classes/VipCI.py) +### [vip_client.classes.**VipGirder**](../src/vip_client/classes/VipGirder.py) [Prototype] A `VipLauncher` implementation to launch VIP application on [Girder](https://girder.readthedocs.io/en/latest/) datasets. Currently used for continuous integration (CI) tests on the VIP platform. diff --git a/examples/tutorials/exemple_VipCI.ipynb b/examples/tutorials/exemple_VipCI.ipynb index fa78792..06798bc 100644 --- a/examples/tutorials/exemple_VipCI.ipynb +++ b/examples/tutorials/exemple_VipCI.ipynb @@ -11,12 +11,12 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import the class\n", - "from vip_client.classes import VipCI\n", + "from vip_client.classes import VipGirder\n", "import time\n", "\n", "# Pipeline identifier\n", @@ -66,7 +66,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -131,13 +131,13 @@ ], "source": [ "# Connect with Vip & Girder\n", - "VipCI.init(\n", + "VipGirder.init(\n", " vip_key=\"VIP_API_KEY\", # My environment variable for the VIP API key (also works with litteral string or file name)\n", " girder_key=\"GIRDER_API_KEY\" # My environment variable for the Girder API key (also works with litteral string or file name)\n", ")\n", "\n", "# Create a Session\n", - "session = VipCI(\n", + "session = VipGirder(\n", " session_name=\"Test_Girder\", # Session Name\n", ")\n", "\n", @@ -170,7 +170,7 @@ "source": [ "__N.B.__: The last cell could also be written with a single line of code:\n", "```python\n", - "VipCI.init(\n", + "VipGirder.init(\n", " vip_key=\"VIP_API_KEY\", # My environment variable for the VIP API key \n", " girder_key=\"GIRDER_API_KEY\", # My environment variable for the Girder API key\n", " session_name = \"Test_Girder\", # Session Name\n", @@ -194,7 +194,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -260,7 +260,7 @@ } ], "source": [ - "VipCI(my_output_dir).run_session(nb_runs=2)" + "VipGirder(my_output_dir).run_session(nb_runs=2)" ] }, { diff --git a/src/vip_client/classes/VipCI.py b/src/vip_client/classes/VipCI.py deleted file mode 100644 index 456bd7b..0000000 --- a/src/vip_client/classes/VipCI.py +++ /dev/null @@ -1,671 +0,0 @@ -# Builtins -from __future__ import annotations -import os -import time -from pathlib import * -# Try importing the Girder client -try: - import girder_client -except: - from warnings import warn - warn("vip_client.classes.VipCI is unavailable (missing package: girder-client)") -# Other classes from VIP client -from vip_client.utils import vip -from vip_client.classes.VipLauncher import VipLauncher - -class VipCI(VipLauncher): - """ - Python class to run VIP pipelines on datasets located on Girder. - - A single instance allows to run 1 pipeline with 1 parameter set (any number of runs). - Pipeline runs need at least three inputs: - - `pipeline_id` (str) Name of the pipeline. - - `input_settings` (dict) All parameters needed to run the pipeline. - - `output_dir` (str) Path to a Girder folder where execution results will be stored. - - N.B.: all instance methods require that `VipCI.init()` has been called with: - - a valid VIP API key; - - a valid Girder API key. - """ - - ################## - ################ Class Attributes ################## - ################## - - # --- Overriden from the parent class --- - - # Class name - __name__ = "VipCI" - # Properties to save / display for this class - _PROPERTIES = [ - "session_name", - "pipeline_id", - "vip_output_dir", - "input_settings", - "workflows" - ] - # Default location for VIP inputs/outputs (different from the parent class) - _SERVER_NAME = "girder" - # Prefix that defines a Girder path - _SERVER_PATH_PREFIX = "/collection" - # Default backup location - # (set to None to avoid saving and loading backup files) - _BACKUP_LOCATION = "girder" - - # Prefix that defines a Girder ID - _GIRDER_ID_PREFIX = "pilotGirder" - # Grider portal - _GIRDER_PORTAL = 'https://pilot-warehouse.creatis.insa-lyon.fr/api/v1' - - - ################# - ################ Main Properties ################## - ################# - - @property - def custom_wf_metadata(self) -> dict: - return self._custom_wf_metadata - - @custom_wf_metadata.setter - def custom_wf_metadata(self, value: dict) -> None: - if value != None: - assert isinstance(value, dict), f"Custom metadata must be a dictionary, not {type(value)}" - self._custom_wf_metadata = value - - - - ############# - ################ Constructor ################## - ############# - def __init__( - self, output_dir=None, pipeline_id: str=None, input_settings: dict=None, - session_name: str=None, verbose: bool=None, custom_wf_metadata: dict=None - ) -> None: - """ - Creates a VipCI instance and sets its properties from keyword arguments. - - ## Parameters - - `output_dir` (str | os.PathLike) Path to a Girder folder where execution results will be stored. - - Does not need to exist - - Usually in format : "/collection/[collection_name]/[path_to_folder]" - - User must have read/write permissions on the Girder collection/folder. - - - `pipeline_id` (str) Name of your pipeline in VIP. - - Usually in format : *application_name*/*version*. - - Run VipLauncher.show_pipeline() to display available pipelines. - - - `input_settings` (dict) All parameters needed to run the pipeline. - - Run VipSession.show_pipeline(`pipeline_id`) to display these parameters. - - The dictionary can contain any object that can be converted to strings, or lists of such objects. - - Lists of parameters launch parallel workflows on VIP. - - - `session_name` [Optional/Recommended] (str) A name to identify this session. - - Default value: 'VipCI-[date]-[time]-[id]' - - - `verbose` [Optional] (bool) Verbose mode for this instance. - - If True, instance methods will display logs; - - If False, instance methods will run silently. - - - `custom_wf_metadata` [Optional] (dict) Custom metadata to add to each workflow. - - `session_name` is only set at instantiation; other properties can be set later in function calls. - If `output_dir` leads to data from a previous session, properties will be loaded from the metadata on Girder. - """ - # Initialize with the name, pipeline and input settings - super().__init__( - output_dir = output_dir, - session_name = session_name, - pipeline_id = pipeline_id, - input_settings = input_settings, - verbose = verbose - ) - # Set custom properties - self.custom_wf_metadata = custom_wf_metadata - # End display - if any([session_name, output_dir]) and (self.__name__ == "VipCI"): - self._print() - # ------------------------------------------------ - - ################ - ################ Public Methods ################## - ################ - - ################################################# - # Manage a session from start to finish - ################################################# - - # Login to VIP and Girder - @classmethod - def init( - cls, - vip_key="VIP_API_KEY", - girder_key="GIRDER_API_KEY", - verbose=True, - girder_api_url=None, - girder_id_prefix=None, - **kwargs - ) -> VipCI: - """ - Handshakes with VIP using your own API key. - Returns a class instance which properties can be provided as keyword arguments. - - ## Parameters - - `vip_key` (str): VIP API key. This can be either: - A. [unsafe] A **string litteral** containing your API key, - B. [safer] A **path to some local file** containing your API key, - C. [safer] The **name of some environment variable** containing your API key (default: "VIP_API_KEY"). - In cases B or C, the API key will be loaded from the local file or the environment variable. - - - `girder_key` (str): Girder API key. Can take the same values as `vip_key`. - - - `verbose` (bool): default verbose mode for all instances. - - If True, all instances will display logs by default; - - If False, all instance methods will run silently by default. - - - `kwargs` [Optional] (dict): keyword arguments or dictionnary setting properties of the returned instance. - """ - # Initiate a Vip Session silently - super().init(api_key=vip_key, verbose=False) - # Restore the verbose state - cls._VERBOSE = verbose - # Set the Girder ID prefix - cls._GIRDER_ID_PREFIX = girder_id_prefix if girder_id_prefix is not None else cls._GIRDER_ID_PREFIX - cls._GIRDER_PORTAL = girder_api_url if girder_api_url is not None else cls._GIRDER_PORTAL - # Instantiate a Girder client - cls._girder_client = girder_client.GirderClient(apiUrl=girder_api_url) - # Check if `girder_key` is in a local file or environment variable - true_key = cls._get_api_key(girder_key) - # Authenticate with Girder API key - cls._girder_client.authenticate(apiKey=true_key) - # Diplay success - cls._printc() - cls._printc("---------------------------------------------") - cls._printc("| You are communicating with VIP and Girder |") - cls._printc("---------------------------------------------") - cls._printc() - # Return a VipCI instance for method cascading - return cls(verbose=(verbose and kwargs), **kwargs) - # ------------------------------------------------ - - # Launch the pipeline on VIP - def launch_pipeline( - self, pipeline_id: str=None, input_settings: dict=None, output_dir=None, nb_runs=1, - verbose: bool=None - ) -> VipCI: - """ - Launches pipeline executions on VIP. - - Input parameters : - - `pipeline_id` (str) The name of your pipeline in VIP, - usually in format : *application_name*/*version*. - - `input_settings` (dict) All parameters needed to run the pipeline. - - Run VipSession.show_pipeline(`pipeline_id`) to display these parameters. - - The dictionary can contain any object that can be converted to strings, or lists of such objects. - - Lists of parameters launch parallel workflows on VIP. - - `output_dir` (str) Path to the VIP folder where execution results will be stored. - - `nb_runs` (int) Number of parallel workflows to launch with the same settings. - - Default behaviour: - - Raises AssertionError in case of wrong inputs - - Raises RuntimeError in case of failure on VIP servers. - - In any case, session is backed up after pipeline launch - """ - return super().launch_pipeline( - pipeline_id = pipeline_id, # default - input_settings = input_settings, # default - output_dir = output_dir, # default - nb_runs = nb_runs, # default - ) - # ------------------------------------------------ - - # Monitor worflow executions on VIP - def monitor_workflows(self, refresh_time=30) -> VipCI: - """ - Updates and displays the status of each execution launched in the current session. - - If an execution is still runnig, updates status every `refresh_time` (seconds) until all runs are done. - - Displays a full report when all executions are done. - """ - return super().monitor_workflows(refresh_time=refresh_time) - # ------------------------------------------------ - - # Run a full VipCI session - def run_session(self, nb_runs=1, refresh_time=30) -> VipCI: - """ - Runs a full session from Girder data: - 1. Launches pipeline executions on VIP; - 2. Monitors pipeline executions until they are all over; - and Adds metadata on Girder output folder. - - |!| This function assumes that all session properties are already set. - Optional arguments can be provided: - - Increase `nb_runs` to run more than 1 execution at once; - - Set `refresh_time` to modify the default refresh time. - """ - return super().run_session(nb_runs=nb_runs, refresh_time=refresh_time) - # ------------------------------------------------ - - # Display session properties in their current state - def display(self) -> VipCI: - """ - Displays useful properties in JSON format. - - `session_name` : current session name - - `pipeline_id`: pipeline identifier - - `output_dir` : path to the pipeline outputs - - `input_settings` : input parameters sent to VIP - - `workflows`: workflow inventory, identifying all pipeline runs in this session. - """ - # Return for method cascading - return super().display() - # ------------------------------------------------ - - # Return error in case of call to finish() - def finish(self, verbose: bool=None) -> None: - """ - This function does not work in VipCI. - """ - # Update the verbose state and display - self._verbose = verbose - self._print("\n=== FINISH ===\n", max_space=2) - # Raise error message - raise NotImplementedError(f"Class {self.__name__} cannot delete the distant data.") - # ------------------------------------------------ - - ################# - ################ Private Methods ################ - ################# - - ################################################################### - # Methods that must be overwritten to adapt VipLauncher methods to - # new location: "girder" - ################################################################### - - # Path to delete during session finish - def _path_to_delete(self) -> dict: - """Returns the folders to delete during session finish, with appropriate location.""" - return {} - # ------------------------------------------------ - - # Method to check existence of a resource on Girder. - @classmethod - def _exists(cls, path: PurePath, location="girder") -> bool: - """ - Checks existence of a resource on Girder. - """ - # Check path existence in `location` - if location=="girder": - try: - cls._girder_client.resourceLookup(path=str(path)) - return True - except girder_client.HttpError: - return False - else: - raise NotImplementedError(f"Unknown location: {location}") - # ------------------------------------------------ - - # Method to create a distant or local directory - @classmethod - def _create_dir(cls, path: PurePath, location="girder", **kwargs) -> str: - """ - Creates a directory at `path` on Girder if `location` is "girder". - - `path` should be a PathLib object. - `kwargs` can be passed as keyword arguments to `girder-client.createFolder()`. - Returns the Girder ID of the newly created folder. - """ - if location == "girder": - # Find the parent ID and type - parentId, parentType = cls._girder_path_to_id(str(path.parent)) - # Check the parent is a directory - if not (parentType == "folder"): - raise ValueError(f"Cannot create folder {path} in '{path.parent}': parent is not a Girder folder") - # Create the new directory with additional keyword arguments - return cls._girder_client.createFolder( - parentId=parentId, name=str(path.name), reuseExisting=True, **kwargs - )["_id"] - else: - raise NotImplementedError(f"Unknown location: {location}") - # ------------------------------------------------ - - # Function to delete a path - @classmethod - def _delete_path(cls, path: PurePath, location="vip") -> None: - raise NotImplementedError("VipCI cannot delete data.") - - # Function to delete a path on VIP with warning - @classmethod - def _delete_and_check(cls, path: PurePath, location="vip", timeout=300) -> bool: - raise NotImplementedError("VipCI cannot delete data.") - - #################################################### - # Launch & Monitor pipeline executions from Girder # - #################################################### - - def _init_exec(self) -> str: - """ - Initiates one VIP workflow with `pipeline_id`, `session_name`, `input_settings`, `output_dir`. - Returns the workflow identifier. - """ - # Get function arguments - # input_settings = self._vip_input_settings(self._input_settings) - input_settings = self._get_input_settings(location="vip-girder") - # Create a workflow-specific result directory - res_path = self._vip_output_dir / time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime()) - # no simple way to rename later with workflow_id - res_id = self._create_dir( - path=res_path, location="girder", - description=f"VIP outputs from one workflow in Session '{self._session_name}'" - ) - res_vip = self._vip_girder_id(res_id) - # Launch execution - workflow_id = vip.init_exec( - pipeline = self.pipeline_id, - name = self.session_name, - inputValues = input_settings, - resultsLocation = res_vip - ) - # Record the path to output files (create the workflow entry) - self._workflows[workflow_id] = {"output_path": str(res_path)} - return workflow_id - # ------------------------------------------------ - - # Function extract metadata from a single workflow - def _meta_workflow(self, workflow_id: str) -> dict: - metadata = { - "session_name": self._session_name, - "workflow_id": workflow_id, - "workflow_start": self._workflows[workflow_id]["start"], - "workflow_status": self._workflows[workflow_id]["status"] - } - # If custom metadata is provided, add it to the metadata - if self.custom_wf_metadata is not None: - metadata = {**metadata, **self.custom_wf_metadata} - return metadata - - # Overwrite _get_exec_infos() to bypass call to vip.get_exec_results() (does not work at this time) - @classmethod - def _get_exec_infos(cls, workflow_id: str) -> dict: - """ - Returns succint information on `workflow_id`: - - Execution status (VIP notations) - - Starting time (local time, format '%Y/%m/%d %H:%M:%S') - - List of paths to the output files. - """ - try : - # Get execution infos - infos = vip.execution_info(workflow_id) - # Secure way to get execution results - # files = vip.get_exec_results(workflow_id) - except RuntimeError as vip_error: - cls._handle_vip_error(vip_error) - # Return filtered information - return { - # Execution status (VIP notations) - "status": infos["status"], - # Starting time (human readable) - "start": time.strftime( - '%Y/%m/%d %H:%M:%S', time.localtime(infos["startDate"]/1000) - ), - # # Returned files - # "outputs": infos["returnedFiles"]["output_file"] - } - # ------------------------------------------------ - - ################################################### - # Save (/load) Session to (/from) Girder metadata # - ################################################### - - # Save session properties in a JSON file - def _save_session(self, session_data: dict, location="girder") -> bool: - """ - Saves dictionary `session_data` as metadata in the output directory on Girder. - Returns a success flag. - Displays success / failure unless `_verbose` is False. - """ - # Thow error if location is not "girder" because this session does no interact with VIP - if location != "girder": - return NotImplementedError(f"Location '{location}' is unknown for {self.__name__}") - # Ensure the output directory exists on Girder - is_new = self._mkdirs(path=self._vip_output_dir, location=location) - # Save metadata in the global output directory - folderId, _ = self._girder_path_to_id(self._vip_output_dir) - self._girder_client.addMetadataToFolder(folderId=folderId, metadata=session_data) - # Update metadata for each workflow - for workflow_id in self._workflows: - metadata = self._meta_workflow(workflow_id=workflow_id) - folderId, _ = self._girder_path_to_id(path=self._workflows[workflow_id]["output_path"]) - self._girder_client.addMetadataToFolder(folderId=folderId, metadata=metadata) - # Display - self._print() - if is_new: - self._print(">> Session was backed up as Girder metadata in:") - self._print(f"\t{self._vip_output_dir} (Girder ID: {folderId})\n") - else: - self._print(">> Session backed up\n") - # Return - return True - # ------------------------------------------------ - - def _load_session(self, location="girder") -> dict: - """ - Loads backup data from the metadata stored in the output directory on Girder. - If the metadata could not be found, returns None. - Otherwise, returns session properties as a dictionary. - """ - # Thow error if location is not "girder" - if location != "girder": - return NotImplementedError(f"Location '{location}' is unknown for {self.__name__}") - # Check the output directory is defined - if self.vip_output_dir is None: - return None - # Load the metadata on Girder - with self._silent_class(): - try: - girder_id, _ = self._girder_path_to_id(self.vip_output_dir) - folder = self._girder_client.getFolder(folderId=girder_id) - except girder_client.HttpError as e: - if e.status == 400: # Folder was not found - return None - # Display success if the folder was found - self._print("<< Session restored from its output directory\n") - # Return session metadata - return folder["meta"] - # ------------------------------------------------ - - ################################## - # Manipulate Resources on Girder # - ################################## - - # Function to get a resource ID - @classmethod - def _girder_path_to_id(cls, path) -> tuple[str, str]: - """ - Returns a resource ID from its `path` within a Girder collection. - `path` should begin with: "/collection/[collection_name]/...". - `path` can be a string or PathLib object. - - Raises `girder_client.HttpError` if the resource was not found. - Adds intepretation message unless `cls._VERBOSE` is False. - """ - try : - resource = cls._girder_client.resourceLookup(str(path)) - except girder_client.HttpError as e: - if e.status == 400: - cls._printc("(!) The following path is invalid or refers to a resource that does not exist:") - cls._printc(" %s" % path) - cls._printc(" Original error from Girder API:") - raise e - # Return the resource ID and type - try: - return resource['_id'], resource['_modelType'] - except KeyError as ke: - cls._printc(f"Unhandled type of resource: \n\t{resource}\n") - raise ke - # ------------------------------------------------ - - # Function to get a resource path - @classmethod - def _girder_id_to_path(cls, id: str, type: str) -> PurePosixPath: - """ - Returns a resource path from its Girder `id`. - The resource `type` (item, folder, collection) must be provided. - - Raises `girder_client.HttpError` if the resource was not found. - """ - try : - return PurePosixPath(cls._girder_client.get(f"/resource/{id}/path", {"type": type})) - except girder_client.HttpError as e: - if e.status == 400: - cls._printc(f"(!) Invalid Girder ID: {id} with resource type:{type}") - cls._printc(" Original error from Girder API:") - raise e - # ------------------------------------------------ - - # Function to convert a Girder ID to Girder-VIP standard - @classmethod - def _vip_girder_id(cls, resource) -> str: - """ - Prefixes a Girder ID with the VIP standard. - Input `resource` should be a Girder Id (str) or a Girder path (PurePosixPath) - """ - if isinstance(resource, str): - # Prefix the ID - return ":".join([cls._GIRDER_ID_PREFIX, resource]) - elif isinstance(resource, PurePath): - # Get the Girder ID - girder_id, _ = cls._girder_path_to_id(resource) - # Prefix the ID - return ":".join([cls._GIRDER_ID_PREFIX, girder_id]) - # ------------------------------------------------ - - ################################################################### - # Adapt `input_settings` to the Vip-Girder communication protocol # - ################################################################### - - # Store the VIP paths as PathLib objects. - def _parse_input_settings(self, input_settings) -> dict: - """ - Parses the input settings, i.e.: - - Resolves any reference to a Girder collection and turns a folder name - into a list of files - - Converts all Girder paths to PathLib objects - - Leaves the other parameters untouched. - """ - # Function to extract file from Girder item - def get_file_from_item(itemId: str) -> str: - """Returns the Girder ID of a single file contained in `itemId`""" - files = [ - f["_id"] for f in self._girder_client.listFile(itemId=itemId) - ] - # Check the number of files (1 per item) - if len(files) != 1: - msg = f"Unable to parse the Girder item : {self._girder_id_to_path(id=itemId, type='item')}" - msg += "Contains more than 1 file." - raise NotImplementedError(msg) - return files[0] - # -- End of get_file_from_item() -- - # Function to extract all files from a Girder resource - def get_files(input_path: str): - """ - Returns the path of all files contained in the Girder resource pointed by `input_path`. - The Girder resource can be a file, an item with 1 file or a folder with multiple items. - """ - # Look up for the resource in Girder & get the Girder ID - girder_id, girder_type = self._girder_path_to_id(input_path) - # Retrieve all files based on the resource type - if girder_type == "file": - # Return the Girder path - return PurePosixPath(input_path) - elif girder_type == "item": - # Retrieve the corresponding file - fileId = get_file_from_item(girder_id) - # Return the Girder path - return self._girder_id_to_path(id=fileId, type='file') - elif girder_type == "folder": - # Retrieve all items - items = [ it["_id"] for it in self._girder_client.listItem(folderId=girder_id) ] - new_inputs = [] - # Browse items - for itemId in items: - # Retrieve the corresponding file - fileId = get_file_from_item(itemId) - # Update the file list with new Girder path - new_inputs.append(self._girder_id_to_path(id=fileId, type='file')) - # Return the list of files - return new_inputs - else: - # Girder type = collection or other - raise ValueError(f"Bad resource: {input_path}\n\tGirder type '{girder_type}' is not permitted in this context.") - # -- End of get_files() -- - # Function to parse Girder paths - def parse_value(input): - # Case: single input, string or path-like - if isinstance(input, (str, os.PathLike)): - # Case: Girder path - if str(input).startswith(self._SERVER_PATH_PREFIX): - return get_files(input) - # Case: any other input - else: return input - # Case: multiple inputs - elif isinstance(input, list): - new_input = [] - # Browse elements - for element in input: - # Parse element - parsed = parse_value(element) - # Merge the lists if `element` is a folder - if isinstance(parsed, list): new_input += parsed - # Append if `element` is a file - else: new_input.append(parsed) - # Return the list of files - return new_input - # Case not string nor path-like: return as is - else: return input - # -- End of parse_value() -- - # Return the parsed value of each parameter - return { - key: parse_value(value) - for key, value in input_settings.items() - } - # ------------------------------------------------ - - # Get the input settings after files are parsed as PathLib objects - def _get_input_settings(self, location="girder") -> dict: - """ - Returns the input settings with filenames adapted to `location`. - - if `location` = "girder", returns Girder paths string format. - - if `location` = "vip-girder", returns the prefixed Girder ID for VIP. - - Returns a string version of any other parameter. - """ - # Function to get the VIP-Girder standard from 1 input path - def get_input(value, location) -> str: - """ - If `value` is a path, returns the corresponding string. - Value can be a single input or a list of inputs. - """ - # Case: multiple inputs - if isinstance(value, list): - return [ get_input(element, location) for element in value ] - # Case : path to Girder resource - elif isinstance(value, PurePath): - if location == "girder": - return str(value) - elif location == "vip-girder": - return self._vip_girder_id(value) - # Case: other parameter - else: return str(value) - # -------------------- - # Raise an error if `location` cannot be parsed - if location not in ("girder", "vip-girder"): - raise NotImplementedError(f"Unknown location: {location}") - # Browse input settings - return { - key: get_input(value, location) - for key, value in self._input_settings.items() - } - # ------------------------------------------------ - -###################################################### - -if __name__=="__main__": - pass \ No newline at end of file diff --git a/src/vip_client/classes/__init__.py b/src/vip_client/classes/__init__.py index cc36a19..09eb4d4 100644 --- a/src/vip_client/classes/__init__.py +++ b/src/vip_client/classes/__init__.py @@ -2,7 +2,7 @@ All classes for the client. - VipSession: main user class. To run a VIP application on local datasets. - VipLauncher: to run a Vip application on datasets located on VIP servers. -- VipCI (alpha): to run a Vip application on datasets located on CREATIS data warehouse. +- VipGirder (alpha): to run a Vip application on datasets located on CREATIS data warehouse. - VipLoader (planned): to upload / download data to / from VIP servers. - VipLoader (planned): base class. """ @@ -10,6 +10,4 @@ # Replace each class module by its class in the namespace from vip_client.classes.VipSession import VipSession from vip_client.classes.VipLauncher import VipLauncher -from vip_client.classes.VipCI import VipCI -from vip_client.classes.VipLoader import VipLoader -from vip_client.classes.VipClient import VipClient +from vip_client.classes.VipGirder import VipGirder diff --git a/tests/README.md b/tests/README.md index 2be2e22..50e159e 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,3 +1,3 @@ -# Test Suite for VIP Pyhton Client +# Test Suite for VIP Python Client -This is an unfinished work proposing test scripts for VipLauncher, VipCI and VipSession using pytest. It also tests low-level client functions. \ No newline at end of file +This is an unfinished work proposing test scripts for VipLauncher, VipGirder and VipSession using pytest. It also tests low-level client functions. \ No newline at end of file diff --git a/tests/test_VipCI.py b/tests/test_VipGirder.py similarity index 90% rename from tests/test_VipCI.py rename to tests/test_VipGirder.py index 92fa6b5..f3100ce 100644 --- a/tests/test_VipCI.py +++ b/tests/test_VipGirder.py @@ -1,11 +1,7 @@ -import io -from urllib.error import HTTPError import pytest from pathlib import * -import pytest_mock -from vip_client.utils import vip -from vip_client.classes import VipCI +from vip_client.classes import VipGirder from mocked_services import mock_vip_api, mock_girder_client, mock_pathlib, mock_os from FakeGirderClient import FakeGirderClient @@ -41,7 +37,7 @@ def setup_teardown_vip_launcher(request, mocker): # Setup code before running the tests in the class print("Handshake with VIP") - VipCI.init(vip_key="FAKE_KEY", girder_key="FAKE_KEY") + VipGirder.init(vip_key="FAKE_KEY", girder_key="FAKE_KEY") print("Setup done") @pytest.fixture(scope="function", autouse=True) @@ -86,7 +82,7 @@ def fake_execution_info(workflow_id): mocker.patch("vip_client.utils.vip.execution_info").side_effect = fake_execution_info # Launch a Full Session Run - s = VipCI() + s = VipGirder() s.pipeline_id = pipeline_id s.output_dir = PurePosixPath("/vip/Home/test-VipLauncher/OUTPUTS") s.input_settings = { @@ -131,19 +127,19 @@ def fake_execution_info(workflow_id): ) def test_backup(mocker, backup_location, input_settings, pipeline_id, output_dir): - VipCI._BACKUP_LOCATION = backup_location + VipGirder._BACKUP_LOCATION = backup_location # Create session - s1 = VipCI(pipeline_id=pipeline_id, input_settings=input_settings) + s1 = VipGirder(pipeline_id=pipeline_id, input_settings=input_settings) s1.output_dir = output_dir - assert s1._save() is not (VipCI._BACKUP_LOCATION is None) # Return False if no backup location + assert s1._save() is not (VipGirder._BACKUP_LOCATION is None) # Return False if no backup location # Load backup - s2 = VipCI(output_dir=s1.output_dir) + s2 = VipGirder(output_dir=s1.output_dir) # Check parameters assert s2.output_dir == s1.output_dir - if VipCI._BACKUP_LOCATION is None: + if VipGirder._BACKUP_LOCATION is None: assert not s2._load() assert s2.input_settings != s1.input_settings assert s2.pipeline_id != s1.pipeline_id @@ -154,10 +150,10 @@ def test_backup(mocker, backup_location, input_settings, pipeline_id, output_dir def test_properties_interface(mocker): - VipCI._BACKUP_LOCATION = "girder" + VipGirder._BACKUP_LOCATION = "girder" # Copy the first session - s = VipCI() + s = VipGirder() s.input_settings = { "zipped_folder": 'fake_value1', "basis_file": 'fake_value2', diff --git a/tests/test_VipLauncher.py b/tests/test_VipLauncher.py index 366fe86..cae305d 100644 --- a/tests/test_VipLauncher.py +++ b/tests/test_VipLauncher.py @@ -1,8 +1,6 @@ -import io import pytest from pathlib import * -from vip_client.utils import vip from vip_client.classes import VipLauncher from mocked_services import mock_vip_api, mock_pathlib, mock_os diff --git a/tests/test_VipSession.py b/tests/test_VipSession.py index f1b2d9e..f1d62b0 100644 --- a/tests/test_VipSession.py +++ b/tests/test_VipSession.py @@ -1,10 +1,7 @@ -import io from unittest.mock import patch import pytest from pathlib import * -import pytest_mock -from vip_client.utils import vip from vip_client.classes import VipSession from mocked_services import mock_vip_api, mock_pathlib, mock_os diff --git a/tests/test_global.py b/tests/test_global.py index d6beb14..16d5479 100644 --- a/tests/test_global.py +++ b/tests/test_global.py @@ -1,4 +1,4 @@ -from vip_client.classes import VipSession, VipCI, VipLauncher +from vip_client.classes import VipSession, VipGirder, VipLauncher from mocked_services import mock_vip_api, mock_pathlib, mock_os, mock_girder_client import pytest @@ -48,8 +48,8 @@ } ] -test_cases_missing_input_fields = [(input_settings, tested_class) for input_settings in test_cases_missing_input_fields for tested_class in [VipSession, VipLauncher, VipCI]] -test_cases_missing_input_values = [(input_settings, tested_class) for input_settings in test_cases_missing_input_values for tested_class in [VipSession, VipLauncher, VipCI]] +test_cases_missing_input_fields = [(input_settings, tested_class) for input_settings in test_cases_missing_input_fields for tested_class in [VipSession, VipLauncher, VipGirder]] +test_cases_missing_input_values = [(input_settings, tested_class) for input_settings in test_cases_missing_input_values for tested_class in [VipSession, VipLauncher, VipGirder]] @pytest.fixture(scope="function", autouse=True) def setup_teardown_vip_launcher(request, mocker): @@ -63,7 +63,7 @@ def setup_teardown_vip_launcher(request, mocker): print("Handshake with VIP") VipSession.init(api_key="FAKE_KEY") VipLauncher.init(api_key="FAKE_KEY") - VipCI.init(vip_key="FAKE_KEY", girder_key="FAKE_KEY") + VipGirder.init(vip_key="FAKE_KEY", girder_key="FAKE_KEY") print("Setup done") @@ -72,10 +72,10 @@ def setup_teardown_vip_launcher(request, mocker): ) def test_missing_input_settings(input_settings, tested_class): - VipCI._BACKUP_LOCATION = None + VipGirder._BACKUP_LOCATION = None # Copy the first session - s = VipCI() + s = VipGirder() s.pipeline_id = "LCModel/0.1" s.output_dir = "/path/to/output" s.input_settings = input_settings diff --git a/tests/try_viploader.py b/tests/try_viploader.py deleted file mode 100644 index 8032c3f..0000000 --- a/tests/try_viploader.py +++ /dev/null @@ -1,18 +0,0 @@ -import sys -from pathlib import Path -SOURCE_ROOT = str(Path(__file__).parents[1] / "src") # <=> /src/ -sys.path.append(SOURCE_ROOT) -import vip_client - -from vip_client.classes import VipLoader -from pathlib import * - -VipLoader.init() -path = "/vip/EGI tutorial (group)/outputs" -print(f"Under '{path}':") -print("\n".join(VipLoader.list_dir(path))) -print() -VipLoader.download_dir( - vip_path=path, - local_path=Path("Here") -) \ No newline at end of file From 3810a144b568837feefff38e568fc740659375f5 Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Mon, 3 Mar 2025 16:46:05 +0100 Subject: [PATCH 04/14] file renaming --- src/vip_client/classes/VipGirder.py | 679 ++++++++++++++++++++++++++++ 1 file changed, 679 insertions(+) create mode 100644 src/vip_client/classes/VipGirder.py diff --git a/src/vip_client/classes/VipGirder.py b/src/vip_client/classes/VipGirder.py new file mode 100644 index 0000000..a72bb11 --- /dev/null +++ b/src/vip_client/classes/VipGirder.py @@ -0,0 +1,679 @@ +# Builtins +from __future__ import annotations +import os +import time +from pathlib import * +# Try importing the Girder client +try: + import girder_client +except: + from warnings import warn + warn("vip_client.classes.VipGirder is unavailable (missing package: girder-client)") +# Other classes from VIP client +from vip_client.utils import vip +from vip_client.classes.VipLauncher import VipLauncher + +class VipGirder(VipLauncher): + """ + Python class to run VIP pipelines on datasets located on Girder. + + A single instance allows to run 1 pipeline with 1 parameter set (any number of runs). + Pipeline runs need at least three inputs: + - `pipeline_id` (str) Name of the pipeline. + - `input_settings` (dict) All parameters needed to run the pipeline. + - `output_dir` (str) Path to a Girder folder where execution results will be stored. + + N.B.: all instance methods require that `VipGirder.init()` has been called with: + - a valid VIP API key; + - a valid Girder API key. + """ + + ################## + ################ Class Attributes ################## + ################## + + # --- Overriden from the parent class --- + + # Class name + __name__ = "VipGirder" + # Properties to save / display for this class + _PROPERTIES = [ + "session_name", + "pipeline_id", + "vip_output_dir", + "input_settings", + "workflows" + ] + # Default location for VIP inputs/outputs (different from the parent class) + _INPUT_SERVER_NAME = "girder" + _OUTPUT_SERVER_NAME = "girder" + # Prefix that defines a Girder path + _SERVER_PATH_PREFIX = "/collection" + # Default backup location + # (set to None to avoid saving and loading backup files) + _BACKUP_LOCATION = "girder" + + # Prefix that defines a Girder ID + _GIRDER_ID_PREFIX = "pilotGirder" + # Grider portal + _GIRDER_PORTAL = 'https://pilot-warehouse.creatis.insa-lyon.fr/api/v1' + + + ################# + ################ Main Properties ################## + ################# + + @property + def custom_wf_metadata(self) -> dict: + return self._custom_wf_metadata + + @custom_wf_metadata.setter + def custom_wf_metadata(self, value: dict) -> None: + if value != None: + assert isinstance(value, dict), f"Custom metadata must be a dictionary, not {type(value)}" + self._custom_wf_metadata = value + + + + ############# + ################ Constructor ################## + ############# + def __init__( + self, output_dir=None, pipeline_id: str=None, input_settings: dict=None, + session_name: str=None, verbose: bool=None, custom_wf_metadata: dict=None, **kwargs + ) -> None: + """ + Creates a VipGirder instance and sets its properties from keyword arguments. + + ## Parameters + - `output_dir` (str | os.PathLike) Path to a Girder folder where execution results will be stored. + - Does not need to exist + - Usually in format : "/collection/[collection_name]/[path_to_folder]" + - User must have read/write permissions on the Girder collection/folder. + + - `pipeline_id` (str) Name of your pipeline in VIP. + - Usually in format : *application_name*/*version*. + - Run VipLauncher.show_pipeline() to display available pipelines. + + - `input_settings` (dict) All parameters needed to run the pipeline. + - Run VipSession.show_pipeline(`pipeline_id`) to display these parameters. + - The dictionary can contain any object that can be converted to strings, or lists of such objects. + - Lists of parameters launch parallel workflows on VIP. + + - `session_name` [Optional/Recommended] (str) A name to identify this session. + - Default value: 'VipGirder-[date]-[time]-[id]' + + - `verbose` [Optional] (bool) Verbose mode for this instance. + - If True, instance methods will display logs; + - If False, instance methods will run silently. + + - `custom_wf_metadata` [Optional] (dict) Custom metadata to add to each workflow. + + `session_name` is only set at instantiation; other properties can be set later in function calls. + If `output_dir` leads to data from a previous session, properties will be loaded from the metadata on Girder. + """ + # Initialize with the name, pipeline and input settings + super().__init__( + output_dir = output_dir, + session_name = session_name, + pipeline_id = pipeline_id, + input_settings = input_settings, + verbose = verbose + ) + # Set custom properties + self.custom_wf_metadata = custom_wf_metadata + # End display + if any([session_name, output_dir]) and (self.__name__ == "VipGirder"): + self._print() + # ------------------------------------------------ + + ################ + ################ Public Methods ################## + ################ + + ################################################# + # Manage a session from start to finish + ################################################# + + # Login to VIP and Girder + @classmethod + def init( + cls, + vip_key="VIP_API_KEY", + girder_key="GIRDER_API_KEY", + verbose=True, + girder_api_url=None, + girder_id_prefix=None, + **kwargs + ) -> VipGirder: + """ + Handshakes with VIP using your own API key. + Returns a class instance which properties can be provided as keyword arguments. + + ## Parameters + - `vip_key` (str): VIP API key. This can be either: + A. [unsafe] A **string litteral** containing your API key, + B. [safer] A **path to some local file** containing your API key, + C. [safer] The **name of some environment variable** containing your API key (default: "VIP_API_KEY"). + In cases B or C, the API key will be loaded from the local file or the environment variable. + + - `girder_key` (str): Girder API key. Can take the same values as `vip_key`. + + - `verbose` (bool): default verbose mode for all instances. + - If True, all instances will display logs by default; + - If False, all instance methods will run silently by default. + + - `kwargs` [Optional] (dict): keyword arguments or dictionnary setting properties of the returned instance. + """ + # Initiate a Vip Session silently + super().init(api_key=vip_key, verbose=False, **kwargs) + # Restore the verbose state + cls._VERBOSE = verbose + # Set the Girder ID prefix + cls._GIRDER_ID_PREFIX = girder_id_prefix if girder_id_prefix is not None else cls._GIRDER_ID_PREFIX + cls._GIRDER_PORTAL = girder_api_url if girder_api_url is not None else cls._GIRDER_PORTAL + # Instantiate a Girder client + cls._girder_client = girder_client.GirderClient(apiUrl=girder_api_url) + # Check if `girder_key` is in a local file or environment variable + true_key = cls._get_api_key(girder_key) + # Authenticate with Girder API key + cls._girder_client.authenticate(apiKey=true_key) + # Diplay success + cls._printc() + cls._printc("---------------------------------------------") + cls._printc("| You are communicating with VIP and Girder |") + cls._printc("---------------------------------------------") + cls._printc() + # Return a VipGirder instance for method cascading + return cls(verbose=(verbose and kwargs), **kwargs) + # ------------------------------------------------ + + # Launch the pipeline on VIP + def launch_pipeline( + self, pipeline_id: str=None, input_settings: dict=None, output_dir=None, nb_runs=1, + verbose: bool=None + ) -> VipGirder: + """ + Launches pipeline executions on VIP. + + Input parameters : + - `pipeline_id` (str) The name of your pipeline in VIP, + usually in format : *application_name*/*version*. + - `input_settings` (dict) All parameters needed to run the pipeline. + - Run VipSession.show_pipeline(`pipeline_id`) to display these parameters. + - The dictionary can contain any object that can be converted to strings, or lists of such objects. + - Lists of parameters launch parallel workflows on VIP. + - `output_dir` (str) Path to the VIP folder where execution results will be stored. + - `nb_runs` (int) Number of parallel workflows to launch with the same settings. + + Default behaviour: + - Raises AssertionError in case of wrong inputs + - Raises RuntimeError in case of failure on VIP servers. + - In any case, session is backed up after pipeline launch + """ + return super().launch_pipeline( + pipeline_id = pipeline_id, # default + input_settings = input_settings, # default + output_dir = output_dir, # default + nb_runs = nb_runs, # default + ) + # ------------------------------------------------ + + # Monitor worflow executions on VIP + def monitor_workflows(self, refresh_time=30) -> VipGirder: + """ + Updates and displays the status of each execution launched in the current session. + - If an execution is still runnig, updates status every `refresh_time` (seconds) until all runs are done. + - Displays a full report when all executions are done. + """ + return super().monitor_workflows(refresh_time=refresh_time) + # ------------------------------------------------ + + # Run a full VipGirder session + def run_session(self, nb_runs=1, refresh_time=30) -> VipGirder: + """ + Runs a full session from Girder data: + 1. Launches pipeline executions on VIP; + 2. Monitors pipeline executions until they are all over; + and Adds metadata on Girder output folder. + + |!| This function assumes that all session properties are already set. + Optional arguments can be provided: + - Increase `nb_runs` to run more than 1 execution at once; + - Set `refresh_time` to modify the default refresh time. + """ + return super().run_session(nb_runs=nb_runs, refresh_time=refresh_time) + # ------------------------------------------------ + + # Display session properties in their current state + def display(self) -> VipGirder: + """ + Displays useful properties in JSON format. + - `session_name` : current session name + - `pipeline_id`: pipeline identifier + - `output_dir` : path to the pipeline outputs + - `input_settings` : input parameters sent to VIP + - `workflows`: workflow inventory, identifying all pipeline runs in this session. + """ + # Return for method cascading + return super().display() + # ------------------------------------------------ + + # Return error in case of call to finish() + def finish(self, verbose: bool=None) -> None: + """ + This function does not work in VipGirder. + """ + # Update the verbose state and display + self._verbose = verbose + self._print("\n=== FINISH ===\n", max_space=2) + # Raise error message + raise NotImplementedError(f"Class {self.__name__} cannot delete the distant data.") + # ------------------------------------------------ + + ################# + ################ Private Methods ################ + ################# + + ################################################################### + # Methods that must be overwritten to adapt VipLauncher methods to + # new location: "girder" + ################################################################### + + # Path to delete during session finish + def _path_to_delete(self) -> dict: + """Returns the folders to delete during session finish, with appropriate location.""" + return {} + # ------------------------------------------------ + + # Method to check existence of a resource on Girder. + @classmethod + def _exists(cls, path: PurePath, location="girder") -> bool: + """ + Checks existence of a resource on Girder. + """ + # Check path existence in `location` + if location=="girder": + try: + cls._girder_client.resourceLookup(path=str(path)) + return True + except girder_client.HttpError: + return False + else: + return super()._exists(path, location) + # ------------------------------------------------ + + # Method to create a distant or local directory + @classmethod + def _create_dir(cls, path: PurePath, location="girder", **kwargs) -> str: + """ + Creates a directory at `path` on Girder if `location` is "girder". + + `path` should be a PathLib object. + `kwargs` can be passed as keyword arguments to `girder-client.createFolder()`. + Returns the Girder ID of the newly created folder. + """ + if location == "girder": + # Find the parent ID and type + parentId, parentType = cls._girder_path_to_id(str(path.parent)) + # Check the parent is a directory + if not (parentType == "folder"): + raise ValueError(f"Cannot create folder {path} in '{path.parent}': parent is not a Girder folder") + # Create the new directory with additional keyword arguments + return cls._girder_client.createFolder( + parentId=parentId, name=str(path.name), reuseExisting=True, **kwargs + )["_id"] + else: + super()._create_dir(path, location) + return "" + # ------------------------------------------------ + + # Function to delete a path + @classmethod + def _delete_path(cls, path: PurePath, location="vip") -> None: + if location == "girder": + raise NotImplementedError("VipGirder cannot delete data.") + else: + return super()._delete_path(path, location) + + # Function to delete a path on VIP with warning + @classmethod + def _delete_and_check(cls, path: PurePath, location="vip", timeout=300) -> bool: + if location == "girder": + raise NotImplementedError("VipGirder cannot delete data.") + else: + return super()._delete_and_check(path, location, timeout) + + #################################################### + # Launch & Monitor pipeline executions from Girder # + #################################################### + + def _init_exec(self) -> str: + """ + Initiates one VIP workflow with `pipeline_id`, `session_name`, `input_settings`, `output_dir`. + Returns the workflow identifier. + """ + # Get function arguments + # input_settings = self._vip_input_settings(self._input_settings) + input_settings = self._get_input_settings(location="vip-girder") + # Create a workflow-specific result directory + res_path = self._vip_output_dir / time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime()) + # no simple way to rename later with workflow_id + res_id = self._create_dir( + path=res_path, location=self._OUTPUT_SERVER_NAME, + description=f"VIP outputs from one workflow in Session '{self._session_name}'" + ) + res_vip = self._vip_girder_id(res_id) + # Launch execution + workflow_id = vip.init_exec( + pipeline = self.pipeline_id, + name = self.session_name, + inputValues = input_settings, + resultsLocation = res_vip + ) + # Record the path to output files (create the workflow entry) + self._workflows[workflow_id] = {"output_path": str(res_path)} + return workflow_id + # ------------------------------------------------ + + # Function extract metadata from a single workflow + def _meta_workflow(self, workflow_id: str) -> dict: + metadata = { + "session_name": self._session_name, + "workflow_id": workflow_id, + "workflow_start": self._workflows[workflow_id]["start"], + "workflow_status": self._workflows[workflow_id]["status"] + } + # If custom metadata is provided, add it to the metadata + if self.custom_wf_metadata is not None: + metadata = {**metadata, **self.custom_wf_metadata} + return metadata + + # Overwrite _get_exec_infos() to bypass call to vip.get_exec_results() (does not work at this time) + @classmethod + def _get_exec_infos(cls, workflow_id: str) -> dict: + """ + Returns succint information on `workflow_id`: + - Execution status (VIP notations) + - Starting time (local time, format '%Y/%m/%d %H:%M:%S') + - List of paths to the output files. + """ + try : + # Get execution infos + infos = vip.execution_info(workflow_id) + # Secure way to get execution results + # files = vip.get_exec_results(workflow_id) + except RuntimeError as vip_error: + cls._handle_vip_error(vip_error) + # Return filtered information + return { + # Execution status (VIP notations) + "status": infos["status"], + # Starting time (human readable) + "start": time.strftime( + '%Y/%m/%d %H:%M:%S', time.localtime(infos["startDate"]/1000) + ), + # # Returned files + # "outputs": infos["returnedFiles"]["output_file"] + } + # ------------------------------------------------ + + ################################################### + # Save (/load) Session to (/from) Girder metadata # + ################################################### + + # Save session properties in a JSON file + def _save_session(self, session_data: dict, location="girder") -> bool: + """ + Saves dictionary `session_data` as metadata in the output directory on Girder. + Returns a success flag. + Displays success / failure unless `_verbose` is False. + """ + # Thow error if location is not "girder" because this session does no interact with VIP + if location != "girder": + return super()._save_session(session_data, location) + # Ensure the output directory exists on Girder + is_new = self._mkdirs(path=self._vip_output_dir, location=location) + # Save metadata in the global output directory + folderId, _ = self._girder_path_to_id(self._vip_output_dir) + self._girder_client.addMetadataToFolder(folderId=folderId, metadata=session_data) + # Update metadata for each workflow + for workflow_id in self._workflows: + metadata = self._meta_workflow(workflow_id=workflow_id) + folderId, _ = self._girder_path_to_id(path=self._workflows[workflow_id]["output_path"]) + self._girder_client.addMetadataToFolder(folderId=folderId, metadata=metadata) + # Display + self._print() + if is_new: + self._print(">> Session was backed up as Girder metadata in:") + self._print(f"\t{self._vip_output_dir} (Girder ID: {folderId})\n") + else: + self._print(">> Session backed up\n") + # Return + return True + # ------------------------------------------------ + + def _load_session(self, location="girder") -> dict: + """ + Loads backup data from the metadata stored in the output directory on Girder. + If the metadata could not be found, returns None. + Otherwise, returns session properties as a dictionary. + """ + # Thow error if location is not "girder" + if location != "girder": + return super()._load_session(location) + # Check the output directory is defined + if self.vip_output_dir is None: + return None + # Load the metadata on Girder + with self._silent_class(): + try: + girder_id, _ = self._girder_path_to_id(self.vip_output_dir) + folder = self._girder_client.getFolder(folderId=girder_id) + except girder_client.HttpError as e: + if e.status == 400: # Folder was not found + return None + # Display success if the folder was found + self._print("<< Session restored from its output directory\n") + # Return session metadata + return folder["meta"] + # ------------------------------------------------ + + ################################## + # Manipulate Resources on Girder # + ################################## + + # Function to get a resource ID + @classmethod + def _girder_path_to_id(cls, path) -> tuple[str, str]: + """ + Returns a resource ID from its `path` within a Girder collection. + `path` should begin with: "/collection/[collection_name]/...". + `path` can be a string or PathLib object. + + Raises `girder_client.HttpError` if the resource was not found. + Adds intepretation message unless `cls._VERBOSE` is False. + """ + try : + resource = cls._girder_client.resourceLookup(str(path)) + except girder_client.HttpError as e: + if e.status == 400: + cls._printc("(!) The following path is invalid or refers to a resource that does not exist:") + cls._printc(" %s" % path) + cls._printc(" Original error from Girder API:") + raise e + # Return the resource ID and type + try: + return resource['_id'], resource['_modelType'] + except KeyError as ke: + cls._printc(f"Unhandled type of resource: \n\t{resource}\n") + raise ke + # ------------------------------------------------ + + # Function to get a resource path + @classmethod + def _girder_id_to_path(cls, id: str, type: str) -> PurePosixPath: + """ + Returns a resource path from its Girder `id`. + The resource `type` (item, folder, collection) must be provided. + + Raises `girder_client.HttpError` if the resource was not found. + """ + try : + return PurePosixPath(cls._girder_client.get(f"/resource/{id}/path", {"type": type})) + except girder_client.HttpError as e: + if e.status == 400: + cls._printc(f"(!) Invalid Girder ID: {id} with resource type:{type}") + cls._printc(" Original error from Girder API:") + raise e + # ------------------------------------------------ + + # Function to convert a Girder ID to Girder-VIP standard + @classmethod + def _vip_girder_id(cls, resource) -> str: + """ + Prefixes a Girder ID with the VIP standard. + Input `resource` should be a Girder Id (str) or a Girder path (PurePosixPath) + """ + if isinstance(resource, str): + # Prefix the ID + return ":".join([cls._GIRDER_ID_PREFIX, resource]) + elif isinstance(resource, PurePath): + # Get the Girder ID + girder_id, _ = cls._girder_path_to_id(resource) + # Prefix the ID + return ":".join([cls._GIRDER_ID_PREFIX, girder_id]) + # ------------------------------------------------ + + ################################################################### + # Adapt `input_settings` to the Vip-Girder communication protocol # + ################################################################### + + # Store the VIP paths as PathLib objects. + def _parse_input_settings(self, input_settings) -> dict: + """ + Parses the input settings, i.e.: + - Resolves any reference to a Girder collection and turns a folder name + into a list of files + - Converts all Girder paths to PathLib objects + - Leaves the other parameters untouched. + """ + # Function to extract file from Girder item + def get_file_from_item(itemId: str) -> str: + """Returns the Girder ID of a single file contained in `itemId`""" + files = [ + f["_id"] for f in self._girder_client.listFile(itemId=itemId) + ] + # Check the number of files (1 per item) + if len(files) != 1: + msg = f"Unable to parse the Girder item : {self._girder_id_to_path(id=itemId, type='item')}" + msg += "Contains more than 1 file." + raise NotImplementedError(msg) + return files[0] + # -- End of get_file_from_item() -- + # Function to extract all files from a Girder resource + def get_files(input_path: str): + """ + Returns the path of all files contained in the Girder resource pointed by `input_path`. + The Girder resource can be a file, an item with 1 file or a folder with multiple items. + """ + # Look up for the resource in Girder & get the Girder ID + girder_id, girder_type = self._girder_path_to_id(input_path) + # Retrieve all files based on the resource type + if girder_type == "file": + # Return the Girder path + return PurePosixPath(input_path) + elif girder_type == "item": + # Retrieve the corresponding file + fileId = get_file_from_item(girder_id) + # Return the Girder path + return self._girder_id_to_path(id=fileId, type='file') + elif girder_type == "folder": + # Retrieve all items + items = [ it["_id"] for it in self._girder_client.listItem(folderId=girder_id) ] + new_inputs = [] + # Browse items + for itemId in items: + # Retrieve the corresponding file + fileId = get_file_from_item(itemId) + # Update the file list with new Girder path + new_inputs.append(self._girder_id_to_path(id=fileId, type='file')) + # Return the list of files + return new_inputs + else: + # Girder type = collection or other + raise ValueError(f"Bad resource: {input_path}\n\tGirder type '{girder_type}' is not permitted in this context.") + # -- End of get_files() -- + # Function to parse Girder paths + def parse_value(input): + # Case: single input, string or path-like + if isinstance(input, (str, os.PathLike)): + # Case: Girder path + if str(input).startswith(self._SERVER_PATH_PREFIX): + return get_files(input) + # Case: any other input + else: return input + # Case: multiple inputs + elif isinstance(input, list): + new_input = [] + # Browse elements + for element in input: + # Parse element + parsed = parse_value(element) + # Merge the lists if `element` is a folder + if isinstance(parsed, list): new_input += parsed + # Append if `element` is a file + else: new_input.append(parsed) + # Return the list of files + return new_input + # Case not string nor path-like: return as is + else: return input + # -- End of parse_value() -- + # Return the parsed value of each parameter + return { + key: parse_value(value) + for key, value in input_settings.items() + } + # ------------------------------------------------ + + # Get the input settings after files are parsed as PathLib objects + def _get_input_settings(self, location="girder") -> dict: + """ + Returns the input settings with filenames adapted to `location`. + - if `location` = "girder", returns Girder paths string format. + - if `location` = "vip-girder", returns the prefixed Girder ID for VIP. + + Returns a string version of any other parameter. + """ + # Function to get the VIP-Girder standard from 1 input path + def get_input(value, location) -> str: + """ + If `value` is a path, returns the corresponding string. + Value can be a single input or a list of inputs. + """ + # Case: multiple inputs + if isinstance(value, list): + return [ get_input(element, location) for element in value ] + # Case : path to Girder resource + elif isinstance(value, PurePath): + if location == "girder": + return str(value) + elif location == "vip-girder": + return self._vip_girder_id(value) + # Case: other parameter + else: return str(value) + # -------------------- + # Raise an error if `location` cannot be parsed + if location not in ("girder", "vip-girder"): + return super()._get_input_settings(location) + # Browse input settings + return { + key: get_input(value, location) + for key, value in self._input_settings.items() + } + # ------------------------------------------------ + +###################################################### + +if __name__=="__main__": + pass \ No newline at end of file From 85de883a534ea140c7f1dc5abb07cac076fea65a Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Mon, 3 Mar 2025 16:54:51 +0100 Subject: [PATCH 05/14] support of local vip as an output instead of girder --- src/vip_client/classes/VipGirder.py | 21 ++++++++++++--------- src/vip_client/classes/VipLauncher.py | 23 ++++++++++++++--------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/vip_client/classes/VipGirder.py b/src/vip_client/classes/VipGirder.py index a72bb11..9d0502e 100644 --- a/src/vip_client/classes/VipGirder.py +++ b/src/vip_client/classes/VipGirder.py @@ -354,16 +354,19 @@ def _init_exec(self) -> str: Returns the workflow identifier. """ # Get function arguments - # input_settings = self._vip_input_settings(self._input_settings) input_settings = self._get_input_settings(location="vip-girder") - # Create a workflow-specific result directory - res_path = self._vip_output_dir / time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime()) - # no simple way to rename later with workflow_id - res_id = self._create_dir( - path=res_path, location=self._OUTPUT_SERVER_NAME, - description=f"VIP outputs from one workflow in Session '{self._session_name}'" - ) - res_vip = self._vip_girder_id(res_id) + res_path = str(self._vip_output_dir) + "/OUTPUTS" + res_vip = res_path + if (self._OUTPUT_SERVER_NAME == "girder"): + # Create a workflow-specific result directory + res_path = self._vip_output_dir / time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime()) + # no simple way to rename later with workflow_id + res_id = self._create_dir( + path=res_path, location=self._OUTPUT_SERVER_NAME, + description=f"VIP outputs from one workflow in Session '{self._session_name}'" + ) + res_vip = self._vip_girder_id(res_id) + # Launch execution workflow_id = vip.init_exec( pipeline = self.pipeline_id, diff --git a/src/vip_client/classes/VipLauncher.py b/src/vip_client/classes/VipLauncher.py index bf1405c..1d96e20 100644 --- a/src/vip_client/classes/VipLauncher.py +++ b/src/vip_client/classes/VipLauncher.py @@ -43,7 +43,8 @@ class VipLauncher(): # (set to None to avoid saving and loading backup files) _BACKUP_LOCATION = None # Default location for VIP inputs/outputs (can be different for subclasses) - _SERVER_NAME = "vip" + _INPUT_SERVER_NAME = "vip" + _OUTPUT_SERVER_NAME = "vip" # Prefix that defines a path from VIP _SERVER_PATH_PREFIX = "/vip" # Default file name to save session properties @@ -482,15 +483,15 @@ def launch_pipeline( if not self._is_defined("_vip_output_dir"): raise TypeError("Please provide an output directory for Session: %s" %self._session_name) else: self._print("Output directory: ", end="", flush=True) - # Ensure the directory exists - if self._mkdirs(path=self._vip_output_dir, location=self._SERVER_NAME): - self._print(f"Created on {self._SERVER_NAME.upper()}") + # Ensure the directory exists + if self._mkdirs(path=self._vip_output_dir, location=self._OUTPUT_SERVER_NAME): + self._print(f"Created on {self._OUTPUT_SERVER_NAME.upper()}") else: self._print("OK") # Check the input parameters self._print("Input settings: ", end="", flush=True) # Check content - self._check_input_settings(location=self._SERVER_NAME) + self._check_input_settings(location=self._INPUT_SERVER_NAME) self._print("OK") # End parameters checks self._print("----------------\n") @@ -921,6 +922,8 @@ def _delete_and_check(cls, path: PurePath, location="vip", timeout=300) -> bool: Deletes `path` on `location` and waits until `path` is actually removed. After `timeout` (seconds), displays a warning if `path` still exist. """ + if location != "vip": + raise NotImplementedError(f"Unknown location: {location}") # Delete the path cls._delete_path(path, location) # Standby until path is indeed removed (give up after some time) @@ -1004,7 +1007,7 @@ def _set(self, **kwargs) -> None: # Simple context manager to unlock session properties while executing code @contextmanager - def _unlocked_properties(self) -> None: + def _unlocked_properties(self): """ Under this context, session properties can be modified without raising an error. """ @@ -1016,7 +1019,7 @@ def _unlocked_properties(self) -> None: # Simple context manager to silence session logs while executing code @contextmanager - def _silent_session(self) -> None: + def _silent_session(self): """ Under this context, the session will not print anything. """ @@ -1029,7 +1032,7 @@ def _silent_session(self) -> None: # Simple context manager to silence logs from class methods while executing code @classmethod @contextmanager - def _silent_class(cls) -> None: + def _silent_class(cls): """ Under this context, the session will not print anything. """ @@ -1528,6 +1531,8 @@ def _get_input_settings(self, location="vip") -> dict: Returns the input settings with their orignal values in string format. `location` is destined to subclasses. """ + if location != "vip": + raise NotImplementedError(f"Unknown location: {location}") return { key: [str(v) for v in value] if isinstance(value, list) else str(value) for key, value in self._input_settings.items() @@ -1553,7 +1558,7 @@ def _check_input_settings(self, input_settings: dict=None, location: str=None) - """ # If location is not provided, default to server if location is None: - location = self._SERVER_NAME + location = self._INPUT_SERVER_NAME # If input_settings are not provided, get instance attribute instead if not input_settings: if self._is_defined("_input_settings"): From b5f0cf1a7f000a789499fcb3e4e22d6474dae4fe Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Tue, 4 Mar 2025 13:07:05 +0100 Subject: [PATCH 06/14] download_outputs and finish on vipgirder mode --- src/vip_client/classes/VipGirder.py | 40 ++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/src/vip_client/classes/VipGirder.py b/src/vip_client/classes/VipGirder.py index 9d0502e..86caf66 100644 --- a/src/vip_client/classes/VipGirder.py +++ b/src/vip_client/classes/VipGirder.py @@ -259,16 +259,42 @@ def display(self) -> VipGirder: return super().display() # ------------------------------------------------ + def download_outputs(self, output_dir: str): + """This will works only when girder isn't the output location.""" + output_path = PurePath(output_dir) + + os.mkdir(output_path) + for workflow in self._workflows.keys(): + files = vip.get_exec_results(workflow) + files = vip.get_exec_results(workflow) + to_download = [file["path"] for file in files] + linked_download = [] + + for path in to_download: + posixPath = PosixPath(path) + linked_download.append((path, output_path / posixPath.name)) + + for v in linked_download: + if not (vip.download(v[0], v[1])): + print(f"Failed to download {v[0]} to {v[1]}") + print("Workflow outputs successfully downloaded locally!") + + # ------------------------------------------------ + # Return error in case of call to finish() - def finish(self, verbose: bool=None) -> None: + def finish(self, verbose: bool=None, keep_output=False) -> None: """ - This function does not work in VipGirder. + This function does nothing when using girder as output location else it erase the data on vip. """ - # Update the verbose state and display - self._verbose = verbose - self._print("\n=== FINISH ===\n", max_space=2) - # Raise error message - raise NotImplementedError(f"Class {self.__name__} cannot delete the distant data.") + if (self._OUTPUT_SERVER_NAME == "girder"): + # Update the verbose state and display + self._verbose = verbose + self._print("\n=== FINISH ===\n", max_space=2) + elif not keep_output: + for values in self._workflows.values(): + self._delete_and_check(PurePath(values["output_path"]).parent) + print(f"Workflow successfully cleaned on VIP ({str(PurePath(values["output_path"]).parent)})") + # ------------------------------------------------ ################# From 5a682d41c3665ae833c40c2f3fe064fc6923388d Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Tue, 4 Mar 2025 15:47:05 +0100 Subject: [PATCH 07/14] fix --- src/vip_client/classes/VipGirder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vip_client/classes/VipGirder.py b/src/vip_client/classes/VipGirder.py index 86caf66..f3ede45 100644 --- a/src/vip_client/classes/VipGirder.py +++ b/src/vip_client/classes/VipGirder.py @@ -293,7 +293,7 @@ def finish(self, verbose: bool=None, keep_output=False) -> None: elif not keep_output: for values in self._workflows.values(): self._delete_and_check(PurePath(values["output_path"]).parent) - print(f"Workflow successfully cleaned on VIP ({str(PurePath(values["output_path"]).parent)})") + print(f"Workflow successfully cleaned on VIP ({str(PurePath(values['output_path']).parent)})") # ------------------------------------------------ From 46d5201211b08f97e3a6021adc6bd546cf7851c3 Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Fri, 14 Mar 2025 10:40:31 +0100 Subject: [PATCH 08/14] fix already existing output folder --- src/vip_client/classes/VipGirder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vip_client/classes/VipGirder.py b/src/vip_client/classes/VipGirder.py index f3ede45..bb628f5 100644 --- a/src/vip_client/classes/VipGirder.py +++ b/src/vip_client/classes/VipGirder.py @@ -263,7 +263,7 @@ def download_outputs(self, output_dir: str): """This will works only when girder isn't the output location.""" output_path = PurePath(output_dir) - os.mkdir(output_path) + os.makedirs(output_path, exist_ok=True) for workflow in self._workflows.keys(): files = vip.get_exec_results(workflow) files = vip.get_exec_results(workflow) From d5b9c0426f3d274930e718c9834c430f3194a220 Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Mon, 17 Mar 2025 13:34:59 +0100 Subject: [PATCH 09/14] documentation --- doc/vipsession.md | 5 ++++- src/vip_client/classes/VipSession.py | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/doc/vipsession.md b/doc/vipsession.md index ec8c143..5002890 100644 --- a/doc/vipsession.md +++ b/doc/vipsession.md @@ -169,7 +169,10 @@ When all properties are set, the full *upload-run-download* process ([steps 2-5] ```python session.run_session() ``` -*Do not forget to remove your temporary data from VIP after downloading the outputs (`session.finish()`).* +*Do not forget to remove your temporary data from VIP after downloading the outputs (`session.finish(keep_output=False)`).* + +> [!NOTE] +> By default `session.finish()` will remove the outputs from the VIP platform, but if you want to keep them you use the option `keep_output=True` All `VipSession` methods can be run in cascade, so everything holds in a single command: ```python diff --git a/src/vip_client/classes/VipSession.py b/src/vip_client/classes/VipSession.py index 7485c28..47544e5 100644 --- a/src/vip_client/classes/VipSession.py +++ b/src/vip_client/classes/VipSession.py @@ -565,13 +565,14 @@ def run_session( # Clean session data on VIP def finish(self, timeout=300, keep_output=False) -> VipSession: """ - Removes session's data from VIP servers (INPUTS and OUTPUTS). + Removes session's data from VIP servers (INPUTS and by default OUTPUTS). The downloaded outputs and the input dataset are kept on the local machine. Detailed behaviour: - This process checks for actual deletion on VIP servers until `timeout` (seconds) is reached. If deletion could not be verified, the procedure ends with a warning message. - Workflows status are set to "Removed" when the corresponding outputs have been removed from VIP servers. + - OUTPUTS are by default deleted from VIP servers, the option `keep_output` override this behavior """ # Finish the session based on self._path_to_delete() super().finish(timeout=timeout, keep_output=keep_output) From a76a4004e205dd0d907668f209d2059715fbd302 Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Fri, 21 Mar 2025 14:35:22 +0100 Subject: [PATCH 10/14] documentation update --- examples/tutorials/demo-vipsession.ipynb | 78 ++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/examples/tutorials/demo-vipsession.ipynb b/examples/tutorials/demo-vipsession.ipynb index ed5c402..fc40a84 100644 --- a/examples/tutorials/demo-vipsession.ipynb +++ b/examples/tutorials/demo-vipsession.ipynb @@ -499,6 +499,84 @@ "! tree {new_session.output_dir}" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Re-use a same dataset multiples times\n", + "You can re-use the same dataset multiples times. For that you will only need to keep the trace of where the inputs where uploaded on VIP. \n", + "You will also need to not run `finish()` at the end of the session which has uploaded the dataset, it will prevent your dataset from being deleted. \n", + "When you will reuse the data don't forget to adapt the paths in `inputs_settings` by using the path where they were initially uploaded." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "session = VipSession(\"session-A\")\n", + "session.upload_inputs(input_dir)\n", + "\n", + "inputs_settings = {\n", + " \"file\": \"initial.file\",\n", + " \"value\": 5\n", + "}\n", + "\n", + "input_folder_on_vip = session._vip_input_dir\n", + "print(f\"The dataset is located on VIP here: {input_folder_on_vip}\") # keep this information somewhere\n", + "\n", + "session.launch_pipeline(pipeline_id, input_settings)\n", + "session.monitor_workflows()\n", + "# no finish for the first session" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "session = VipSession(\"session-B\")\n", + "# do not forget to prepend your inputs !\n", + "adapted_inputs_settings = {\n", + " \"file\" : f\"{input_folder_on_vip}/initial.file\", # reuse the stored inforation !\n", + " \"value\": 5\n", + "}\n", + "session.launch_pipeline(pipeline_id, adapted_inputs_settings)\n", + "session.monitor_workflows()\n", + "session.finish()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> [!NOTE]\n", + "> At the very end when you won't need the dataset anymore don't forget to run `VipSession(session-A).finish()` for cleaning the data from VIP servers.\n", + "> You must name your session like you named it for uploading your dataset." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "upload_session = VipSession(\"upload-session\")\n", + "upload_session.upload_inputs(input_dir)\n", + "# * running the session * #\n", + "\n", + "reuse_session_a = VipSession(\"reuse-session_a\")\n", + "# * reunning the session on the previous dataset * #\n", + "\n", + "reuse_session_b = VipSession(\"reuse-session_b\")\n", + "# * reunning the session on the previous dataset * #\n", + "\n", + "# finally deleting the dataset\n", + "VipSession(\"upload-session\").finish()" + ] + }, { "attachments": {}, "cell_type": "markdown", From 72180c1f344481ef7513969f9b6d4d4ac9ead6a7 Mon Sep 17 00:00:00 2001 From: Ethaniel <30417812+ethaaalpha@users.noreply.github.com> Date: Mon, 24 Mar 2025 15:29:53 +0100 Subject: [PATCH 11/14] typo --- examples/tutorials/demo-vipsession.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/tutorials/demo-vipsession.ipynb b/examples/tutorials/demo-vipsession.ipynb index fc40a84..b2f2c08 100644 --- a/examples/tutorials/demo-vipsession.ipynb +++ b/examples/tutorials/demo-vipsession.ipynb @@ -540,7 +540,7 @@ "session = VipSession(\"session-B\")\n", "# do not forget to prepend your inputs !\n", "adapted_inputs_settings = {\n", - " \"file\" : f\"{input_folder_on_vip}/initial.file\", # reuse the stored inforation !\n", + " \"file\" : f\"{input_folder_on_vip}/initial.file\", # reuse the stored information !\n", " \"value\": 5\n", "}\n", "session.launch_pipeline(pipeline_id, adapted_inputs_settings)\n", From ab25ec769fcd18f2c6c569e432606d5cbcde1ed5 Mon Sep 17 00:00:00 2001 From: Ethaniel Billon Date: Mon, 28 Apr 2025 12:20:27 +0200 Subject: [PATCH 12/14] add option keep_input finish --- src/vip_client/classes/VipSession.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/vip_client/classes/VipSession.py b/src/vip_client/classes/VipSession.py index 47544e5..ae7005e 100644 --- a/src/vip_client/classes/VipSession.py +++ b/src/vip_client/classes/VipSession.py @@ -563,7 +563,7 @@ def run_session( ) # Clean session data on VIP - def finish(self, timeout=300, keep_output=False) -> VipSession: + def finish(self, timeout=300, keep_output=False, keep_input=False) -> VipSession: """ Removes session's data from VIP servers (INPUTS and by default OUTPUTS). The downloaded outputs and the input dataset are kept on the local machine. @@ -575,7 +575,7 @@ def finish(self, timeout=300, keep_output=False) -> VipSession: - OUTPUTS are by default deleted from VIP servers, the option `keep_output` override this behavior """ # Finish the session based on self._path_to_delete() - super().finish(timeout=timeout, keep_output=keep_output) + super().finish(timeout=timeout, keep_output=keep_output, keep_input=keep_input) # Check if the input data have been erased (this is not the case when get_inputs have been used) if (self._vip_input_dir != self._vip_dir / "INPUTS" and self._exists(self._vip_input_dir, location="vip")): @@ -674,16 +674,18 @@ def get_inputs(self, session: VipSession, get_pipeline=False, get_settings=False ################################################################### # Path to delete during session finish() - def _path_to_delete(self, **kwargs) -> dict: + def _path_to_delete(self, keep_output=False, keep_input=False) -> dict: """Returns the folders to delete during session finish, with appropriate location.""" - if (kwargs.get("keep_output", False)): - return { - self._vip_dir / "INPUTS": "vip" - } - else: - return { - self._vip_dir: "vip" - } + if not keep_input and not keep_output: + return { self._vip_dir: "vip" } + + result = {} + + if not keep_input: + result[self._vip_dir / "INPUTS"] = "vip" + if not keep_output: + result[self._vip_dir / "OUTPUTS"] = "vip" + return result # Method to check existence of a distant or local resource. @classmethod From fb04dcf2b1c46561d7a129ffdb6cc0d8c4d741a4 Mon Sep 17 00:00:00 2001 From: Axel Bonnet Date: Fri, 2 May 2025 15:57:40 +0200 Subject: [PATCH 13/14] refactor VipGirder --- examples/tutorials/example-VipGirder.ipynb | 210 +++++++++++++++++ src/vip_client/__init__.py | 3 +- src/vip_client/classes/VipGirder.py | 258 +++++++++++++-------- src/vip_client/classes/VipLauncher.py | 24 +- src/vip_client/classes/VipSession.py | 22 +- 5 files changed, 408 insertions(+), 109 deletions(-) create mode 100644 examples/tutorials/example-VipGirder.ipynb diff --git a/examples/tutorials/example-VipGirder.ipynb b/examples/tutorials/example-VipGirder.ipynb new file mode 100644 index 0000000..37d3c30 --- /dev/null +++ b/examples/tutorials/example-VipGirder.ipynb @@ -0,0 +1,210 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "e4b2f485-cb8b-4e4b-b21c-c381760fc914", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9a2a0f64-107e-47a3-b994-4ba55735bfa8", + "metadata": {}, + "outputs": [], + "source": [ + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f72d24fb-f9f4-45b2-b7bc-0533216932ea", + "metadata": {}, + "outputs": [], + "source": [ + "import vip_client\n", + "import importlib\n", + "from vip_client import VipGirder\n", + "#importlib.reload(client)\n", + "vip_client.__path__" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8a60aa4a-ba39-4c9b-b749-a79b8f6d21e7", + "metadata": {}, + "outputs": [], + "source": [ + "import inspect\n", + "inspect.getfile(VipGirder)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fc36a278-550b-43f5-800d-3500612ba12d", + "metadata": {}, + "outputs": [], + "source": [ + "VipGirder.init()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0a7c7ee2-2656-4585-9930-895e68cc90ac", + "metadata": {}, + "outputs": [], + "source": [ + "session.display()" + ] + }, + { + "cell_type": "markdown", + "id": "97d7cf93-2ab0-4d07-81b2-654b38480f9a", + "metadata": {}, + "source": [ + "# Output on girder (default)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cdaa53b9-eaec-44d7-a342-7b41191fcb4a", + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_id=\"BasicGrepLocal/0.2\"\n", + "VipGirder.show_pipeline(pipeline_id)\n", + "session_name=\"test_girder_girder\"\n", + "output_dir=\"/collection/ReproVIPSpectro/test/vip_outputs\"\n", + "input_settings={\n", + " \"file\":\"/collection/ReproVIPSpectro/test/test_for_grep.txt\",\n", + " \"int\":5,\n", + " \"text\":\"grep\"\n", + "}\n", + "session = VipGirder(session_name=session_name, pipeline_id=pipeline_id, input_settings=input_settings, output_dir=output_dir)" + ] + }, + { + "cell_type": "markdown", + "id": "1aec7d5d-05e1-4baf-af30-2e9b42e791f1", + "metadata": {}, + "source": [ + "# Output on VIP" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25da2f62-e044-4ffd-855f-74881ad5d770", + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_id=\"BasicGrepLocal/0.2\"\n", + "VipGirder.show_pipeline(pipeline_id)\n", + "session_name=\"test_girder_vip\"\n", + "input_settings={\n", + " \"file\":\"/collection/ReproVIPSpectro/test/test_for_grep.txt\",\n", + " \"int\":5,\n", + " \"text\":\"grep\"\n", + "}\n", + "session = VipGirder(session_name=session_name, pipeline_id=pipeline_id, input_settings=input_settings, output_location=\"vip\")" + ] + }, + { + "cell_type": "markdown", + "id": "7c726378-93b0-443d-9531-89b42735142d", + "metadata": {}, + "source": [ + "# Output in local" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bff17ffe-bebc-419d-b68d-ed3b0d2f4cf7", + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_id=\"BasicGrepLocal/0.2\"\n", + "VipGirder.show_pipeline(pipeline_id)\n", + "session_name=\"test_girder_local\"\n", + "input_settings={\n", + " \"file\":\"/collection/ReproVIPSpectro/test/test_for_grep.txt\",\n", + " \"int\":5,\n", + " \"text\":\"grep\"\n", + "}\n", + "session = VipGirder(session_name=session_name, pipeline_id=pipeline_id, input_settings=input_settings, output_location=\"local\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d47ee6da-1178-4646-b9f6-a71711dcac01", + "metadata": {}, + "outputs": [], + "source": [ + "session.launch_pipeline()\n", + "session.display()\n", + "session.monitor_workflows()\n", + "session.display()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "81f8dc9e-22ab-4603-ae3b-28d83f0a2615", + "metadata": {}, + "outputs": [], + "source": [ + "session.download_outputs()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d54e5e44-7350-4e79-a8aa-0a8a58c60fac", + "metadata": {}, + "outputs": [], + "source": [ + "session.finish()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "39544d26-4881-4562-9d65-209c61836b33", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/vip_client/__init__.py b/src/vip_client/__init__.py index 31f8d1c..20e2b0c 100644 --- a/src/vip_client/__init__.py +++ b/src/vip_client/__init__.py @@ -8,7 +8,8 @@ """ # Informations -__version__ = "0.1.8" +__version__ = "0.1.9-a1" __license__ = "CECILL-B" from vip_client.classes import VipSession +from vip_client.classes import VipGirder diff --git a/src/vip_client/classes/VipGirder.py b/src/vip_client/classes/VipGirder.py index bb628f5..c1bdd1d 100644 --- a/src/vip_client/classes/VipGirder.py +++ b/src/vip_client/classes/VipGirder.py @@ -11,9 +11,9 @@ warn("vip_client.classes.VipGirder is unavailable (missing package: girder-client)") # Other classes from VIP client from vip_client.utils import vip -from vip_client.classes.VipLauncher import VipLauncher +from vip_client.classes.VipSession import VipSession -class VipGirder(VipLauncher): +class VipGirder(VipSession): """ Python class to run VIP pipelines on datasets located on Girder. @@ -23,6 +23,11 @@ class VipGirder(VipLauncher): - `input_settings` (dict) All parameters needed to run the pipeline. - `output_dir` (str) Path to a Girder folder where execution results will be stored. + By default, results will be written in the output_dir on girder. + But results can also be written on VIP or locally, using the output_location parameter. + If output_location is "vip", results will be written on VIP, and output_dir must NOT be given + If output_location is "local", results will be written on VIP, and are meant to be downloaded (this uses VipSession) + N.B.: all instance methods require that `VipGirder.init()` has been called with: - a valid VIP API key; - a valid Girder API key. @@ -40,7 +45,9 @@ class VipGirder(VipLauncher): _PROPERTIES = [ "session_name", "pipeline_id", - "vip_output_dir", + "local_output_dir", + "vip_output_dir", + "output_location", "input_settings", "workflows" ] @@ -62,34 +69,73 @@ class VipGirder(VipLauncher): ################# ################ Main Properties ################## ################# - + + @property + def output_dir(self) -> str: + if self.output_location is None or self.output_location == "girder": + return self.vip_output_dir + else: + return self.local_output_dir + + @output_dir.setter + def output_dir(self, new_dir: str) -> None: + # Display + self._print("Output directory:", new_dir) + # Set the new output directory + if self.output_location is None or self.output_location == "girder": + self.vip_output_dir = new_dir + else: + self.local_output_dir = new_dir + # Load backup data from the new output directory + # in output_location is vip, session is loaded when vip_output_dir is set + # it cannot be set here as the default (and unmodifiable) vip_output_dir is set in VipSession + if self.output_location is not None and self.output_location != "vip": + self._load() + @property def custom_wf_metadata(self) -> dict: return self._custom_wf_metadata @custom_wf_metadata.setter def custom_wf_metadata(self, value: dict) -> None: - if value != None: + if value is not None: assert isinstance(value, dict), f"Custom metadata must be a dictionary, not {type(value)}" self._custom_wf_metadata = value + @property + def output_location(self) -> str: + return self._output_location + @output_location.setter + def output_location(self, value: str) -> None: + if value != None: + assert isinstance(value, str), f"output_location metadata must be a String, not {type(value)}" + self._assert_location_value(value, "output_location") + self._output_location = value + # if output location is local, OUTPUT_SERVER is vip but location is local to store the session and the results + self._OUTPUT_SERVER_NAME = value if value != "local" else "vip" ############# ################ Constructor ################## ############# def __init__( - self, output_dir=None, pipeline_id: str=None, input_settings: dict=None, - session_name: str=None, verbose: bool=None, custom_wf_metadata: dict=None, **kwargs + self, output_location='girder', output_dir=None, pipeline_id: str=None, input_settings: dict=None, + session_name: str=None, verbose: bool=None, custom_wf_metadata: dict=None ) -> None: """ Creates a VipGirder instance and sets its properties from keyword arguments. ## Parameters - - `output_dir` (str | os.PathLike) Path to a Girder folder where execution results will be stored. + + - `output_location` (str) "girder" (default) or "vip" or "local" + + - `output_dir` (str | os.PathLike) depends on output_location value. + - if output_location="girder", Path to a Girder folder where execution results will be stored. + Usually in format : "/collection/[collection_name]/[path_to_folder]" + User must have read/write permissions on the Girder collection/folder. + - if output_location="vip", must be absent + - if output_location="local", optional path to a local folder where results could be downloaded - Does not need to exist - - Usually in format : "/collection/[collection_name]/[path_to_folder]" - - User must have read/write permissions on the Girder collection/folder. - `pipeline_id` (str) Name of your pipeline in VIP. - Usually in format : *application_name*/*version*. @@ -112,6 +158,21 @@ def __init__( `session_name` is only set at instantiation; other properties can be set later in function calls. If `output_dir` leads to data from a previous session, properties will be loaded from the metadata on Girder. """ + # this overrides VipSession to be able to have output in local or vip (VipSession output behavior) or on + # girder (VipGirder overridden behavior, the default). This is determined by output_location and configured + # in the output_folder property setter + # in all case, input is from girder, vip_input_folder and local_input_folder are ignored + # output_location can be [girder|local|vip]. default is girder + self.output_location = output_location if output_location else "girder" + # if output_location is girder, output_dir will be set to vip_output_dir (see property setter) + # if output_location is local, VipSession will work as expected output_dir -> local_output_dir + # if output_location is vip, VipSession will work as expected (except cleaning, see finish), + # but output_dir cannot be specified + if self.output_location == "vip" and output_dir is not None: + raise ValueError('output_dir cannot be specified with "vip" output_dir') + # if backup_location is not None, set it to the same value as output_location + if self._BACKUP_LOCATION is not None: + self._BACKUP_LOCATION = self.output_location # Initialize with the name, pipeline and input settings super().__init__( output_dir = output_dir, @@ -120,6 +181,10 @@ def __init__( input_settings = input_settings, verbose = verbose ) + # if output_location is vip, loading has not been done in output_dir setter as vip_output_dir is set later + # in VipSession constructor, so we load here. + if self.output_location == "vip": + self._load() # Set custom properties self.custom_wf_metadata = custom_wf_metadata # End display @@ -144,6 +209,7 @@ def init( verbose=True, girder_api_url=None, girder_id_prefix=None, + backup_location='girder', **kwargs ) -> VipGirder: """ @@ -158,6 +224,12 @@ def init( In cases B or C, the API key will be loaded from the local file or the environment variable. - `girder_key` (str): Girder API key. Can take the same values as `vip_key`. + + - `girder_api_url` (str): Girder instance URL. Must have the "/api/v1" suffix + + - `girder_id_prefix` (str): Girder instance identifier as a VIP external storage + + - `backup_location` (str): None to avoid. Otherwise, will be overridden by output_location in constructor - `verbose` (bool): default verbose mode for all instances. - If True, all instances will display logs by default; @@ -165,15 +237,11 @@ def init( - `kwargs` [Optional] (dict): keyword arguments or dictionnary setting properties of the returned instance. """ - # Initiate a Vip Session silently - super().init(api_key=vip_key, verbose=False, **kwargs) - # Restore the verbose state - cls._VERBOSE = verbose # Set the Girder ID prefix cls._GIRDER_ID_PREFIX = girder_id_prefix if girder_id_prefix is not None else cls._GIRDER_ID_PREFIX cls._GIRDER_PORTAL = girder_api_url if girder_api_url is not None else cls._GIRDER_PORTAL # Instantiate a Girder client - cls._girder_client = girder_client.GirderClient(apiUrl=girder_api_url) + cls._girder_client = girder_client.GirderClient(apiUrl=cls._GIRDER_PORTAL) # Check if `girder_key` is in a local file or environment variable true_key = cls._get_api_key(girder_key) # Authenticate with Girder API key @@ -181,17 +249,18 @@ def init( # Diplay success cls._printc() cls._printc("---------------------------------------------") - cls._printc("| You are communicating with VIP and Girder |") + cls._printc("| You are communicating with Girder |") cls._printc("---------------------------------------------") cls._printc() - # Return a VipGirder instance for method cascading - return cls(verbose=(verbose and kwargs), **kwargs) + return super().init(api_key=vip_key, verbose=verbose, backup_location=backup_location, **kwargs) # ------------------------------------------------ + def upload_inputs(self, input_dir=None, update_files=True) -> VipSession: + raise NotImplementedError("upload_inputs cannot be called in VipGirder") + # Launch the pipeline on VIP def launch_pipeline( - self, pipeline_id: str=None, input_settings: dict=None, output_dir=None, nb_runs=1, - verbose: bool=None + self, pipeline_id: str=None, input_settings: dict=None, nb_runs=1 ) -> VipGirder: """ Launches pipeline executions on VIP. @@ -214,7 +283,6 @@ def launch_pipeline( return super().launch_pipeline( pipeline_id = pipeline_id, # default input_settings = input_settings, # default - output_dir = output_dir, # default nb_runs = nb_runs, # default ) # ------------------------------------------------ @@ -230,7 +298,10 @@ def monitor_workflows(self, refresh_time=30) -> VipGirder: # ------------------------------------------------ # Run a full VipGirder session - def run_session(self, nb_runs=1, refresh_time=30) -> VipGirder: + def run_session( + self, nb_runs=1, refresh_time=30, + unzip=True, get_status=["Finished"] + ) -> VipSession: """ Runs a full session from Girder data: 1. Launches pipeline executions on VIP; @@ -242,7 +313,14 @@ def run_session(self, nb_runs=1, refresh_time=30) -> VipGirder: - Increase `nb_runs` to run more than 1 execution at once; - Set `refresh_time` to modify the default refresh time. """ - return super().run_session(nb_runs=nb_runs, refresh_time=refresh_time) + (self.launch_pipeline(nb_runs=nb_runs) + .monitor_workflows(refresh_time=refresh_time)) + + if self.output_location is not None and self.output_location == "local": + self.download_outputs(get_status=get_status, unzip=unzip) + + return self + # ------------------------------------------------ # Display session properties in their current state @@ -259,41 +337,40 @@ def display(self) -> VipGirder: return super().display() # ------------------------------------------------ - def download_outputs(self, output_dir: str): - """This will works only when girder isn't the output location.""" - output_path = PurePath(output_dir) - - os.makedirs(output_path, exist_ok=True) - for workflow in self._workflows.keys(): - files = vip.get_exec_results(workflow) - files = vip.get_exec_results(workflow) - to_download = [file["path"] for file in files] - linked_download = [] + def download_outputs( + self, unzip: bool=True, get_status: list=["Finished"], init_timeout: int=None + ) -> VipSession: - for path in to_download: - posixPath = PosixPath(path) - linked_download.append((path, output_path / posixPath.name)) + if self.output_location is not None and self.output_location != "local": + raise NotImplementedError("download_outputs only works in VipGirder if output_location is local") - for v in linked_download: - if not (vip.download(v[0], v[1])): - print(f"Failed to download {v[0]} to {v[1]}") - print("Workflow outputs successfully downloaded locally!") + super().download_outputs(unzip, get_status, init_timeout) # ------------------------------------------------ # Return error in case of call to finish() - def finish(self, verbose: bool=None, keep_output=False) -> None: - """ - This function does nothing when using girder as output location else it erase the data on vip. - """ - if (self._OUTPUT_SERVER_NAME == "girder"): - # Update the verbose state and display - self._verbose = verbose - self._print("\n=== FINISH ===\n", max_space=2) - elif not keep_output: - for values in self._workflows.values(): - self._delete_and_check(PurePath(values["output_path"]).parent) - print(f"Workflow successfully cleaned on VIP ({str(PurePath(values['output_path']).parent)})") + def finish(self, timeout=300, keep_output=False) -> VipSession: + """ + This function does nothing when using girder as output location else it erases the data on vip. + """ + # nothing to do with girder output_location + # if vip or local, VipSession must not delete vip_input_dir as it does not exist, so we set keep_input to True + if self.output_location == "girder": + self._print("\n=== FINISH ===\n") + self._print("Ending Session:", self._session_name) + # Check if workflows are still running (without call to VIP) + if self._still_running(): + # Update the workflow inventory + self._print("Updating worflow inventory ... ", end="", flush=True) + self._update_workflows() + self._print("Done.") + # Return is workflows are still running + if self._still_running(): + self._print("\n(!) This session cannot be finished since the pipeline might still generate data.\n") + self._execution_report() + return self + else: + super().finish(timeout=timeout, keep_input=True, keep_output=keep_output) # ------------------------------------------------ @@ -307,11 +384,22 @@ def finish(self, verbose: bool=None, keep_output=False) -> None: ################################################################### # Path to delete during session finish - def _path_to_delete(self) -> dict: + def _path_to_delete(self, keep_input=False, keep_output=False) -> dict: """Returns the folders to delete during session finish, with appropriate location.""" - return {} + if not keep_input: + raise NotImplementedError("cannot delete inputs in VipGirder") + + if not keep_output and self.output_location == "girder": + raise NotImplementedError("cannot delete outputs in VipGirder if output_location is girder") + + return super()._path_to_delete(True, keep_output) # ------------------------------------------------ + @classmethod + def _assert_location_value(cls, backup_location, label='backup_location') -> None: + if backup_location is not None and backup_location != 'girder': + super()._assert_location_value(backup_location=backup_location) + # Method to check existence of a resource on Girder. @classmethod def _exists(cls, path: PurePath, location="girder") -> bool: @@ -379,29 +467,32 @@ def _init_exec(self) -> str: Initiates one VIP workflow with `pipeline_id`, `session_name`, `input_settings`, `output_dir`. Returns the workflow identifier. """ - # Get function arguments - input_settings = self._get_input_settings(location="vip-girder") - res_path = str(self._vip_output_dir) + "/OUTPUTS" - res_vip = res_path - if (self._OUTPUT_SERVER_NAME == "girder"): + + result_location = self.vip_output_dir + + if self.output_location == "girder": # Create a workflow-specific result directory - res_path = self._vip_output_dir / time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime()) - # no simple way to rename later with workflow_id + res_path = self._vip_output_dir / time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime()) + # no simple way to rename later with workflow_id res_id = self._create_dir( - path=res_path, location=self._OUTPUT_SERVER_NAME, + path=res_path, location=self._OUTPUT_SERVER_NAME, description=f"VIP outputs from one workflow in Session '{self._session_name}'" ) - res_vip = self._vip_girder_id(res_id) + result_location = self._vip_girder_id(res_id) + input_settings = self._get_input_settings(location="vip-girder") # Launch execution workflow_id = vip.init_exec( pipeline = self.pipeline_id, name = self.session_name, inputValues = input_settings, - resultsLocation = res_vip + resultsLocation = result_location ) - # Record the path to output files (create the workflow entry) - self._workflows[workflow_id] = {"output_path": str(res_path)} + + if self.output_location == "girder": + # Record the path to output files (create the workflow entry) + self._workflows[workflow_id] = {"output_path": str(res_path)} + return workflow_id # ------------------------------------------------ @@ -418,39 +509,18 @@ def _meta_workflow(self, workflow_id: str) -> dict: metadata = {**metadata, **self.custom_wf_metadata} return metadata - # Overwrite _get_exec_infos() to bypass call to vip.get_exec_results() (does not work at this time) - @classmethod - def _get_exec_infos(cls, workflow_id: str) -> dict: - """ - Returns succint information on `workflow_id`: - - Execution status (VIP notations) - - Starting time (local time, format '%Y/%m/%d %H:%M:%S') - - List of paths to the output files. - """ - try : - # Get execution infos - infos = vip.execution_info(workflow_id) - # Secure way to get execution results - # files = vip.get_exec_results(workflow_id) - except RuntimeError as vip_error: - cls._handle_vip_error(vip_error) - # Return filtered information - return { - # Execution status (VIP notations) - "status": infos["status"], - # Starting time (human readable) - "start": time.strftime( - '%Y/%m/%d %H:%M:%S', time.localtime(infos["startDate"]/1000) - ), - # # Returned files - # "outputs": infos["returnedFiles"]["output_file"] - } # ------------------------------------------------ ################################################### # Save (/load) Session to (/from) Girder metadata # ################################################### + def _data_to_save(self) -> dict: + props = super()._data_to_save() + if self.output_location == "girder": + del props["local_output_dir"] + return props + # Save session properties in a JSON file def _save_session(self, session_data: dict, location="girder") -> bool: """ @@ -488,7 +558,7 @@ def _load_session(self, location="girder") -> dict: If the metadata could not be found, returns None. Otherwise, returns session properties as a dictionary. """ - # Thow error if location is not "girder" + if location != "girder": return super()._load_session(location) # Check the output directory is defined diff --git a/src/vip_client/classes/VipLauncher.py b/src/vip_client/classes/VipLauncher.py index 9eaeb0e..08e7911 100644 --- a/src/vip_client/classes/VipLauncher.py +++ b/src/vip_client/classes/VipLauncher.py @@ -373,7 +373,7 @@ def __init__( # ($A.1) Login to VIP @classmethod def init(cls, api_key="VIP_API_KEY", verbose=True, vip_portal_url=None, - **kwargs) -> VipLauncher: + backup_location=None, **kwargs) -> VipLauncher: """ Handshakes with VIP using your own API key. Returns a class instance which properties can be provided as keyword arguments. @@ -384,7 +384,9 @@ def init(cls, api_key="VIP_API_KEY", verbose=True, vip_portal_url=None, B. [safer] A **path to some local file** containing your API key, C. [safer] The **name of some environment variable** containing your API key (default: "VIP_API_KEY"). In cases B or C, the API key will be loaded from the local file or the environment variable. - + + - `backup_location` (str): "vip" or None (default : None) + - `verbose` (bool): default verbose mode for all instances. - If True, all instances will display logs by default; - If False, all instance methods will run silently by default. @@ -395,6 +397,9 @@ def init(cls, api_key="VIP_API_KEY", verbose=True, vip_portal_url=None, cls._VERBOSE = verbose # Set the VIP portal URL cls._VIP_PORTAL = vip_portal_url if vip_portal_url else cls._VIP_PORTAL + # set the backup location + cls._assert_location_value(backup_location) + cls._BACKUP_LOCATION = backup_location if backup_location else cls._BACKUP_LOCATION # Check if `api_key` is in a local file or environment variable true_key = cls._get_api_key(api_key) # Set User API key @@ -615,7 +620,7 @@ def run_session( self, nb_runs=1, refresh_time=30) -> VipLauncher: # ------------------------------------------------ # Clean session data on VIP - def finish(self, timeout=300, **kwargs) -> VipLauncher: + def finish(self, timeout=300, keep_input=False, keep_output=False) -> VipLauncher: """ Removes session's output data from VIP servers. @@ -642,7 +647,7 @@ def finish(self, timeout=300, **kwargs) -> VipLauncher: self._print("---------------------") # Browse paths to delete success = True - for path, location in self._path_to_delete(**kwargs).items(): + for path, location in self._path_to_delete(keep_input, keep_output).items(): # Display progression self._print(f"[{location}] {path} ... ", end="", flush=True) # Check data existence @@ -695,7 +700,7 @@ def finish(self, timeout=300, **kwargs) -> VipLauncher: else: self._print("(!) There may still be temporary data on VIP.") self._print(f"Please run finish() again or check the following path(s) on the VIP portal ({self._VIP_PORTAL}):") - self._print('\n\t'.join([str(path) for path in self._path_to_delete(**kwargs)])) + self._print('\n\t'.join([str(path) for path in self._path_to_delete(keep_input, keep_output)])) # Finish display self._print() # Return @@ -839,9 +844,9 @@ def display(self) -> VipLauncher: ################################################################### # Path to delete during session finish - def _path_to_delete(self, **kwargs) -> dict: + def _path_to_delete(self, keep_input=False, keep_output=False) -> dict: """Returns the folders to delete during session finish, with appropriate location.""" - return { + return {} if not keep_output else { self._vip_output_dir: "vip" } # ------------------------------------------------ @@ -1065,6 +1070,11 @@ def _get_api_key(cls, api_key: str) -> str: return true_key # ------------------------------------------------ + @classmethod + def _assert_location_value(cls, backup_location, label='backup_location') -> None: + if backup_location is not None and backup_location != 'vip': + raise ValueError("invalid " + label) + # (A.3) Launch pipeline executions on VIP servers ################################################## @classmethod diff --git a/src/vip_client/classes/VipSession.py b/src/vip_client/classes/VipSession.py index ae7005e..3c161bc 100644 --- a/src/vip_client/classes/VipSession.py +++ b/src/vip_client/classes/VipSession.py @@ -294,7 +294,7 @@ def __init__( # Overwrite VipLauncher.init() to be compatible with new kwargs @classmethod - def init(cls, api_key="VIP_API_KEY", verbose=True, **kwargs) -> VipSession: + def init(cls, api_key="VIP_API_KEY", verbose=True, backup_location='local', **kwargs) -> VipSession: """ Handshakes with VIP using your own API key. Returns a class instance which properties can be provided as keyword arguments. @@ -304,7 +304,9 @@ def init(cls, api_key="VIP_API_KEY", verbose=True, **kwargs) -> VipSession: A. [unsafe] A **string litteral** containing your API key, B. [safer] A **path to some local file** containing your API key, C. [safer] The **name of some environment variable** containing your API key (default: "VIP_API_KEY"). - In cases B or C, the API key will be loaded from the local file or the environment variable. + In cases B or C, the API key will be loaded from the local file or the environment variable. + + - `backup_location` (str): "vip" or "local" or None (default : "local") - `verbose` (bool): default verbose mode for all instances. - If True, all instances will display logs by default; @@ -312,7 +314,7 @@ def init(cls, api_key="VIP_API_KEY", verbose=True, **kwargs) -> VipSession: - `kwargs` [Optional] (dict): keyword arguments or dictionnary setting properties of the returned instance. """ - return super().init(api_key=api_key, verbose=verbose, **kwargs) + return super().init(api_key=api_key, verbose=verbose, backup_location=backup_location, **kwargs) # ------------------------------------------------ # Upload a dataset on VIP servers @@ -563,7 +565,7 @@ def run_session( ) # Clean session data on VIP - def finish(self, timeout=300, keep_output=False, keep_input=False) -> VipSession: + def finish(self, timeout=300, keep_input=False, keep_output=False) -> VipSession: """ Removes session's data from VIP servers (INPUTS and by default OUTPUTS). The downloaded outputs and the input dataset are kept on the local machine. @@ -575,9 +577,9 @@ def finish(self, timeout=300, keep_output=False, keep_input=False) -> VipSession - OUTPUTS are by default deleted from VIP servers, the option `keep_output` override this behavior """ # Finish the session based on self._path_to_delete() - super().finish(timeout=timeout, keep_output=keep_output, keep_input=keep_input) + super().finish(timeout=timeout, keep_input=keep_input, keep_output=keep_output) # Check if the input data have been erased (this is not the case when get_inputs have been used) - if (self._vip_input_dir != self._vip_dir / "INPUTS" + if (not keep_input and self._vip_input_dir != self._vip_dir / "INPUTS" and self._exists(self._vip_input_dir, location="vip")): self._print(f"(!) The input data are still on VIP:\n\t{self.vip_input_dir}") self._print( " They belong to another session.") @@ -673,8 +675,9 @@ def get_inputs(self, session: VipSession, get_pipeline=False, get_settings=False # new location: "local" ################################################################### + # Path to delete during session finish() - def _path_to_delete(self, keep_output=False, keep_input=False) -> dict: + def _path_to_delete(self, keep_input=False, keep_output=False) -> dict: """Returns the folders to delete during session finish, with appropriate location.""" if not keep_input and not keep_output: return { self._vip_dir: "vip" } @@ -687,6 +690,11 @@ def _path_to_delete(self, keep_output=False, keep_input=False) -> dict: result[self._vip_dir / "OUTPUTS"] = "vip" return result + @classmethod + def _assert_location_value(cls, backup_location, label='backup_location') -> None: + if backup_location is not None and backup_location != 'local': + super()._assert_location_value(backup_location=backup_location) + # Method to check existence of a distant or local resource. @classmethod def _exists(cls, path: PurePath, location="local") -> bool: From 4b94f2718db7f86788dffe01f919c82743aa0470 Mon Sep 17 00:00:00 2001 From: Hippolyte Blot Date: Wed, 7 May 2025 15:59:15 +0200 Subject: [PATCH 14/14] Fixing tests depending of the new code architecture --- tests/FakeGirderClient.py | 58 ++++++++++++++++++++++++++++++--------- tests/mocked_services.py | 51 ++++------------------------------ tests/test_VipGirder.py | 14 +++++----- tests/test_VipLauncher.py | 2 +- tests/test_global.py | 39 +++++++++++++++----------- 5 files changed, 82 insertions(+), 82 deletions(-) diff --git a/tests/FakeGirderClient.py b/tests/FakeGirderClient.py index a1a40cb..8dc4e5c 100644 --- a/tests/FakeGirderClient.py +++ b/tests/FakeGirderClient.py @@ -8,7 +8,15 @@ def authenticate(self, apiKey): return True def resourceLookup(self, path): - return {'_id': 'fake_id', '_modelType': 'folder'} + if path == '/vip/Home/test-VipLauncher-Backup/OUTPUTS': + # Used to test the backup location, linked to the fake fetFolder method + print("FakeGirderClient: resourceLookup called with path:", path) + return {'_id': 'fake_id', '_modelType': 'folder'} + elif path == '/vip/Home/test-VipLauncher-Backup-Special/OUTPUTS': + print("FakeGirderClient: resourceLookup called with path:", path) + return {'_id': 'different_id', '_modelType': 'folder'} + else: + return {'_id': 'other_id', '_modelType': 'folder'} def createFolder(self, parentId, name, reuseExisting=True, **kwargs): return {'_id': 'fake_id'} @@ -17,18 +25,42 @@ def addMetadataToFolder(self, folderId, metadata): return True def getFolder(cls, folderId): - metadata = { - 'input_settings': { - 'zipped_folder': 'fake_value', - 'basis_file': 'fake_value', - 'signal_file': ['fake_value', 'fake_value'], - 'control_file': ['fake_value']}, - "pipeline_id": cls.pipeline_id, - 'session_name': 'test-VipLauncher', - 'workflows': {}, - "vip_output_dir": "/vip/Home/test-VipLauncher/OUTPUTS" - } - return {'_id': 'fake_id', 'meta': metadata} + if folderId == 'fake_id': + print("FakeGirderClient: getFolder called with folderId:", folderId) + metadata = { + 'input_settings': { + 'zipped_folder': 'fake_value1', + 'basis_file': 'fake_value2', + 'signal_file': ['fake_value3', 'fake_value4'], + 'control_file': ['fake_value5'] + }, + "pipeline_id": cls.pipeline_id, + 'session_name': 'test-VipLauncher', + 'workflows': {}, + "vip_output_dir": "/vip/Home/test-VipLauncher-Backup/OUTPUTS", + 'output_location': 'girder', + 'local_output_dir': '/path/to/local/output', + } + return {'_id': 'fake_id', 'meta': metadata} + elif folderId == 'different_id': + print("FakeGirderClient: getFolder called with folderId:", folderId) + metadata = { + 'input_settings': { + 'zipped_folder': 'different_value1', + 'basis_file': 'different_value2', + 'signal_file': ['different_value3', 'different_value4'], + 'control_file': ['different_value5'] + }, + "pipeline_id": cls.pipeline_id, + 'session_name': 'test-VipLauncher-Special', + 'workflows': {}, + "vip_output_dir": "/vip/Home/test-VipLauncher-Backup-Special/OUTPUTS", + 'output_location': 'girder', + 'local_output_dir': '/path/to/local/output', + } + return {'_id': 'different_id', 'meta': metadata} + else: + return {'_id': 'fake_id', 'meta': {}} def get(self, path): return {'_id': 'fake_id'} diff --git a/tests/mocked_services.py b/tests/mocked_services.py index bd41bbd..ff6f087 100644 --- a/tests/mocked_services.py +++ b/tests/mocked_services.py @@ -77,7 +77,11 @@ def fake_list_elements(self): def fake_exists(path): return False - #mocker.patch("vip_client.utils.vip.exists", side_effect = fake_exists) + def fake_delete_path(path): + return True + + # mocker.patch("vip_client.utils.vip.exists", side_effect = fake_exists) + mocker.patch("vip_client.utils.vip.exists").return_value = True mocker.patch("vip_client.utils.vip.upload").return_value = True mocker.patch("vip_client.utils.vip.download").return_value = True mocker.patch("vip_client.utils.vip.pipeline_def").side_effect = fake_pipeline_def @@ -86,6 +90,7 @@ def fake_exists(path): mocker.patch("vip_client.utils.vip.init_exec").side_effect = fake_init_exec mocker.patch("vip_client.utils.vip.execution_info").side_effect = fake_execution_info mocker.patch("vip_client.utils.vip.list_elements").side_effect = fake_list_elements + mocker.patch("vip_client.utils.vip.delete_path").side_effect = fake_delete_path def mock_pathlib(mocker): @@ -102,50 +107,6 @@ def fake_pathlib_iterdir(): def mock_os(mocker): mocker.patch("os.unlink") - - class FakeGirderClient(): - - pipeline_id = "LCModel/0.1" - def __init__(self, apiUrl): - pass - def authenticate(self, apiKey): - return True - - def resourceLookup(self, path): - return {'_id': 'fake_id', '_modelType': 'folder'} - - def createFolder(self, parentId, name, reuseExisting=True, **kwargs): - return {'_id': 'fake_id'} - - def addMetadataToFolder(self, folderId, metadata): - return True - - def getFolder(cls, folderId): - metadata = { - 'input_settings': { - 'zipped_folder': 'fake_value', - 'basis_file': 'fake_value', - 'signal_file': ['fake_value', 'fake_value'], - 'control_file': ['fake_value']}, - "pipeline_id": cls.pipeline_id, - 'session_name': 'test-VipLauncher', - 'workflows': {}, - "vip_output_dir": "/vip/Home/test-VipLauncher/OUTPUTS" - } - return {'_id': 'fake_id', 'meta': metadata} - - def get(self, path): - return {'_id': 'fake_id'} - - def listFiles(self, folderId): - return [{'_id': 'fake_id'}] - - def listItem(self, folderId): - return {'_id': 'fake_id'} - - @classmethod - def set_pipeline_id(cls, pipeline_id): - cls.pipeline_id = pipeline_id def mock_girder_client(mocker): from FakeGirderClient import FakeGirderClient diff --git a/tests/test_VipGirder.py b/tests/test_VipGirder.py index f3100ce..1014b03 100644 --- a/tests/test_VipGirder.py +++ b/tests/test_VipGirder.py @@ -82,9 +82,8 @@ def fake_execution_info(workflow_id): mocker.patch("vip_client.utils.vip.execution_info").side_effect = fake_execution_info # Launch a Full Session Run - s = VipGirder() + s = VipGirder(output_location="girder", session_name='test-VipLauncher', output_dir=PurePosixPath("/vip/Home/test-VipLauncher/OUTPUTS")) s.pipeline_id = pipeline_id - s.output_dir = PurePosixPath("/vip/Home/test-VipLauncher/OUTPUTS") s.input_settings = { "zipped_folder": 'fake_value', "basis_file": 'fake_value', @@ -107,21 +106,21 @@ def fake_execution_info(workflow_id): "basis_file": 'fake_value2', "signal_file": ['fake_value3', 'fake_value4'], "control_file": ['fake_value5'] - }, "LCModel/0.1", PurePosixPath("/vip/Home/test-VipLauncher/OUTPUTS"), + }, "LCModel/0.1", PurePosixPath("/vip/Home/test-VipLauncher-Backup/OUTPUTS"), ), (None, { "zipped_folder": None, "basis_file": None, "signal_file": None, "control_file": None - }, "LCModel/0.1", PurePosixPath("/vip/Home/test-VipLauncher/OUTPUTS"), + }, "LCModel/0.1", PurePosixPath("/vip/Home/test-VipLauncher-Backup/OUTPUTS"), ), ('girder', { "zipped_folder": 'different_value1', "basis_file": 'different_value2', "signal_file": ['different_value3', 'different_value4'], "control_file": ['different_value5'] - }, "LCModel/0.1", PurePosixPath("/vip/Home/test-VipLauncher/OUTPUTS"), + }, "LCModel/0.1", PurePosixPath("/vip/Home/test-VipLauncher-Backup-Special/OUTPUTS"), ) ] ) @@ -130,12 +129,13 @@ def test_backup(mocker, backup_location, input_settings, pipeline_id, output_dir VipGirder._BACKUP_LOCATION = backup_location # Create session - s1 = VipGirder(pipeline_id=pipeline_id, input_settings=input_settings) - s1.output_dir = output_dir + s1 = VipGirder(pipeline_id=pipeline_id, input_settings=input_settings, output_dir=output_dir) + assert s1._save() is not (VipGirder._BACKUP_LOCATION is None) # Return False if no backup location # Load backup + print("S1.OUTPUT_DIR", s1.output_dir) s2 = VipGirder(output_dir=s1.output_dir) # Check parameters assert s2.output_dir == s1.output_dir diff --git a/tests/test_VipLauncher.py b/tests/test_VipLauncher.py index cae305d..e068457 100644 --- a/tests/test_VipLauncher.py +++ b/tests/test_VipLauncher.py @@ -94,7 +94,7 @@ def fake_delete_path(path): assert s.workflows[wid]["status"] == "Finished" assert s.pipeline_id == pipeline_id # Finish the Session - s.finish(timeout=1) + s.finish(timeout=1, keep_input=True, keep_output=True) # Check Deletion assert removed for wid in s.workflows: diff --git a/tests/test_global.py b/tests/test_global.py index 16d5479..f60827c 100644 --- a/tests/test_global.py +++ b/tests/test_global.py @@ -22,8 +22,8 @@ "zipped_folder": 'fake_value1', "basis_file": 'fake_value2', }, - { - } + # { + # } ] # VipSession trouve pas que l'input est vide quand on a '' et non [] @@ -66,19 +66,22 @@ def setup_teardown_vip_launcher(request, mocker): VipGirder.init(vip_key="FAKE_KEY", girder_key="FAKE_KEY") print("Setup done") - +# BIZARRE @pytest.mark.parametrize( "input_settings, tested_class", test_cases_missing_input_fields ) -def test_missing_input_settings(input_settings, tested_class): +def test_missing_input_settings(mocker, input_settings, tested_class): VipGirder._BACKUP_LOCATION = None # Copy the first session - s = VipGirder() + s = tested_class(session_name="test-VipLauncher", input_settings=input_settings) s.pipeline_id = "LCModel/0.1" - s.output_dir = "/path/to/output" - s.input_settings = input_settings + if tested_class == VipLauncher: + s.output_dir = "/path/to/output" + if tested_class == VipSession: + mocker.patch.object(VipSession, '_exists', return_value=True) + s.input_dir = "." needed_fields = ["zipped_folder", "basis_file", "signal_file"] missing_fields = [field for field in needed_fields if field not in input_settings] @@ -113,22 +116,26 @@ def is_input_full(value): mocker.patch("pathlib.Path.is_file").return_value = True # Copy the first session - s = tested_class() - s.pipeline_id = "LCModel/0.1" - if tested_class == VipSession: - mocker.patch.object(VipSession, '_exists', return_value=True) - s.input_dir = "." - else: - s.output_dir = "/path/to/output" + + #else: + #s.output_dir = "/path/to/output" missing_fields = [field for field in input_settings if not is_input_full(input_settings[field])] if not missing_fields: - s.input_settings = input_settings + s = tested_class(input_settings=input_settings, session_name="test-VipLauncher") + s.pipeline_id = "LCModel/0.1" + if tested_class == VipSession: + mocker.patch.object(VipSession, '_exists', return_value=True) + s.input_dir = "." s.run_session() return # Catch the exception message with pytest.raises(ValueError) as e: - s.input_settings = input_settings + s = tested_class(input_settings=input_settings, session_name="test-VipLauncher") + s.pipeline_id = "LCModel/0.1" + if tested_class == VipSession: + mocker.patch.object(VipSession, '_exists', return_value=True) + s.input_dir = "." s.run_session() assert str(e.value) == "Missing input value(s) for parameter(s): " + ", ".join(sorted(missing_fields))