diff --git a/simvue/factory/proxy/__init__.py b/simvue/factory/proxy/__init__.py deleted file mode 100644 index dbcf0cba..00000000 --- a/simvue/factory/proxy/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -""" -Proxy -===== - -Selects whether to use offline or online processing depending on configuration. -""" - -import typing - -if typing.TYPE_CHECKING: - from .base import SimvueBaseClass - from simvue.config import SimvueConfiguration - -from .offline import Offline -from .remote import Remote - - -def Simvue( - name: str | None, - uniq_id: str, - mode: str, - config: "SimvueConfiguration", - suppress_errors: bool = True, -) -> "SimvueBaseClass": - if mode == "offline": - return Offline( - name=name, uniq_id=uniq_id, suppress_errors=suppress_errors, config=config - ) - else: - return Remote( - name=name, uniq_id=uniq_id, config=config, suppress_errors=suppress_errors - ) diff --git a/simvue/factory/proxy/base.py b/simvue/factory/proxy/base.py deleted file mode 100644 index da3991fb..00000000 --- a/simvue/factory/proxy/base.py +++ /dev/null @@ -1,78 +0,0 @@ -import abc -import logging -import typing - - -class SimvueBaseClass(abc.ABC): - @abc.abstractmethod - def __init__( - self, - name: str | None, - uniq_id: str, - suppress_errors: bool, - ) -> None: - self._logger = logging.getLogger(f"simvue.{self.__class__.__name__}") - self._suppress_errors: bool = suppress_errors - self._uuid: str = uniq_id - self.name: str | None = name - self.id: int | None = None - self._aborted: bool = False - - def _error(self, message: str) -> None: - """ - Raise an exception if necessary and log error - """ - if not self._suppress_errors: - raise RuntimeError(message) - self._logger.error(message) - self._aborted = True - - @abc.abstractmethod - def list_tags(self) -> list[str] | None: - pass - - @abc.abstractmethod - def create_run(self, data: dict[str, typing.Any]) -> tuple[str, str | None]: - pass - - @abc.abstractmethod - def update(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - pass - - @abc.abstractmethod - def set_folder_details(self, data) -> dict[str, typing.Any] | None: - pass - - @abc.abstractmethod - def save_file(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - pass - - @abc.abstractmethod - def add_alert(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - pass - - @abc.abstractmethod - def set_alert_state( - self, alert_id: str, status: str - ) -> dict[str, typing.Any] | None: - pass - - @abc.abstractmethod - def list_alerts(self) -> list[dict[str, typing.Any]]: - pass - - @abc.abstractmethod - def send_metrics(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - pass - - @abc.abstractmethod - def send_event(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - pass - - @abc.abstractmethod - def send_heartbeat(self) -> dict[str, typing.Any] | None: - pass - - @abc.abstractmethod - def get_abort_status(self) -> bool: - pass diff --git a/simvue/factory/proxy/offline.py b/simvue/factory/proxy/offline.py deleted file mode 100644 index 1386c506..00000000 --- a/simvue/factory/proxy/offline.py +++ /dev/null @@ -1,227 +0,0 @@ -import glob -import json -import logging -import os -import pathlib -import time -import typing -import uuid -import unicodedata -import randomname - -from simvue.factory.proxy.base import SimvueBaseClass -from simvue.config.user import SimvueConfiguration -from simvue.utilities import ( - create_file, - prepare_for_api, - skip_if_failed, -) - -logger = logging.getLogger(__name__) - - -class Offline(SimvueBaseClass): - """ - Class for offline runs - """ - - def __init__( - self, - name: str | None, - uniq_id: str, - config: SimvueConfiguration, - suppress_errors: bool = True, - ) -> None: - super().__init__(name=name, uniq_id=uniq_id, suppress_errors=suppress_errors) - - _offline_dir: pathlib.Path = config.offline.cache - self._directory: str = _offline_dir.joinpath(self._uuid) - - os.makedirs(self._directory, exist_ok=True) - - @skip_if_failed("_aborted", "_suppress_errors", None) - def _write_json(self, filename: str, data: dict[str, typing.Any]) -> None: - """ - Write JSON to file - """ - if not os.path.isdir(os.path.dirname(filename)): - self._error( - f"Cannot write file '{filename}', parent directory does not exist" - ) - - try: - with open(filename, "w") as fh: - json.dump(data, fh) - except Exception as err: - self._error(f"Unable to write file {filename} due to {str(err)}") - - @skip_if_failed("_aborted", "_suppress_errors", None) - def _mock_api_post( - self, prefix: str, data: dict[str, typing.Any] - ) -> dict[str, typing.Any] | None: - unique_id = time.time() - filename = os.path.join(self._directory, f"{prefix}-{unique_id}.json") - if not data.get("id"): - data["id"] = f"{unique_id}" - self._write_json(filename, data) - return data - - @staticmethod - def _generate_random_name() -> str: - """Generates a random name with only valid ASCII characters.""" - return ( - unicodedata.normalize("NFKD", randomname.get_name()) - .encode("ascii", "ignore") - .decode("ascii") - ) - - @skip_if_failed("_aborted", "_suppress_errors", (None, None)) - def create_run(self, data) -> tuple[str, str | None]: - """ - Create a run - """ - if not self._directory: - self._logger.error("No directory specified") - return (None, None) - - if not self.name: - self.name = self._generate_random_name() - - try: - os.makedirs(self._directory, exist_ok=True) - except Exception as err: - self._logger.error( - "Unable to create directory %s due to: %s", self._directory, str(err) - ) - return (None, None) - - filename = f"{self._directory}/run.json" - - logger.debug(f"Creating run in '{filename}'") - - if "name" not in data: - data["name"] = None - - self._write_json(filename, data) - - status = data["status"] - filename = f"{self._directory}/{status}" - create_file(filename) - - return self.name, self.id - - @skip_if_failed("_aborted", "_suppress_errors", None) - def update(self, data) -> dict[str, typing.Any] | None: - """ - Update metadata, tags or status - """ - unique_id = time.time() - filename = f"{self._directory}/update-{unique_id}.json" - self._write_json(filename, data) - - if "status" in data: - status = data["status"] - if not self._directory or not os.path.exists(self._directory): - self._error("No directory defined for writing") - return None - filename = f"{self._directory}/{status}" - - logger.debug(f"Writing API data to file '{filename}'") - - create_file(filename) - - if status == "completed": - status_running = f"{self._directory}/running" - if os.path.isfile(status_running): - os.remove(status_running) - - return data - - @skip_if_failed("_aborted", "_suppress_errors", None) - def set_folder_details(self, data) -> dict[str, typing.Any] | None: - """ - Set folder details - """ - unique_id = time.time() - filename = f"{self._directory}/folder-{unique_id}.json" - self._write_json(filename, data) - return data - - @skip_if_failed("_aborted", "_suppress_errors", None) - def save_file(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - """ - Save file - """ - if "pickled" in data: - temp_file = f"{self._directory}/temp-{uuid.uuid4()}.pickle" - with open(temp_file, "wb") as fh: - fh.write(data["pickled"]) - data["pickledFile"] = temp_file - unique_id = time.time() - filename = os.path.join(self._directory, f"file-{unique_id}.json") - self._write_json(filename, prepare_for_api(data, False)) - return data - - def add_alert(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - """ - Add an alert - """ - return self._mock_api_post("alert", data) - - @skip_if_failed("_aborted", "_suppress_errors", None) - def set_alert_state( - self, alert_id: str, status: str - ) -> dict[str, typing.Any] | None: - if not os.path.exists( - _alert_file := os.path.join(self._directory, f"alert-{alert_id}.json") - ): - self._error(f"Failed to retrieve alert '{alert_id}' for modification") - return None - - with open(_alert_file) as alert_in: - _alert_data = json.load(alert_in) - - _alert_data |= {"run": self.id, "alert": alert_id, "status": status} - - self._write_json(_alert_file, _alert_data) - - return _alert_data - - @skip_if_failed("_aborted", "_suppress_errors", []) - def list_tags(self) -> list[dict[str, typing.Any]]: - # TODO: Tag retrieval not implemented for offline running - raise NotImplementedError( - "Retrieval of current tags is not implemented for offline running" - ) - - @skip_if_failed("_aborted", "_suppress_errors", True) - def get_abort_status(self) -> bool: - # TODO: Abort on failure not implemented for offline running - return True - - @skip_if_failed("_aborted", "_suppress_errors", []) - def list_alerts(self) -> list[dict[str, typing.Any]]: - return [ - json.load(open(alert_file)) - for alert_file in glob.glob(os.path.join(self._directory, "alert-*.json")) - ] - - def send_metrics(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - """ - Send metrics - """ - return self._mock_api_post("metrics", data) - - def send_event(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - """ - Send event - """ - return self._mock_api_post("event", data) - - @skip_if_failed("_aborted", "_suppress_errors", None) - def send_heartbeat(self) -> dict[str, typing.Any] | None: - logger.debug( - f"Creating heartbeat file: {os.path.join(self._directory, 'heartbeat')}" - ) - pathlib.Path(os.path.join(self._directory, "heartbeat")).touch() - return {"success": True} diff --git a/simvue/factory/proxy/remote.py b/simvue/factory/proxy/remote.py deleted file mode 100644 index ea268606..00000000 --- a/simvue/factory/proxy/remote.py +++ /dev/null @@ -1,490 +0,0 @@ -import logging -import typing -import http - -if typing.TYPE_CHECKING: - from simvue.config.user import SimvueConfiguration - -from simvue.api.request import get, post, put -from simvue.factory.proxy.base import SimvueBaseClass -from simvue.utilities import prepare_for_api, skip_if_failed -from simvue.version import __version__ - -logger = logging.getLogger(__name__) - -UPLOAD_TIMEOUT: int = 30 -DEFAULT_API_TIMEOUT: int = 10 - - -class Remote(SimvueBaseClass): - """ - Class which interacts with Simvue REST API - """ - - def __init__( - self, - name: str | None, - uniq_id: str, - config: "SimvueConfiguration", - suppress_errors: bool = True, - ) -> None: - self._user_config = config - - self._headers: dict[str, str] = { - "Authorization": f"Bearer {self._user_config.server.token}", - "User-Agent": f"Simvue Python client {__version__}", - } - self._headers_mp: dict[str, str] = self._headers | { - "Content-Type": "application/msgpack" - } - super().__init__(name, uniq_id, suppress_errors) - - self.id = uniq_id - - @skip_if_failed("_aborted", "_suppress_errors", None) - def list_tags(self) -> list[str]: - logger.debug("Retrieving existing tags") - try: - response = get( - f"{self._user_config.server.url}/runs/{self.id}", self._headers - ) - except Exception as err: - self._error(f"Exception retrieving tags: {str(err)}") - return [] - - logger.debug( - 'Got status code %d when retrieving tags: "%s"', - response.status_code, - response.text, - ) - - if not (response_data := response.json()) or ( - (data := response_data.get("tags")) is None - ): - self._error( - "Expected key 'tags' in response from server during alert retrieval" - ) - return [] - - return data if response.status_code == http.HTTPStatus.OK else [] - - @skip_if_failed("_aborted", "_suppress_errors", (None, None)) - def create_run(self, data) -> tuple[str, str | None]: - """ - Create a run - """ - if data.get("folder") != "/": - logger.debug("Creating folder %s if necessary", data.get("folder")) - try: - response = post( - f"{self._user_config.server.url}/folders", - self._headers, - {"path": data.get("folder")}, - ) - except Exception as err: - self._error(f"Exception creating folder: {str(err)}") - return (None, None) - - logger.debug( - 'Got status code %d when creating folder, with response: "%s"', - response.status_code, - response.text, - ) - - if response.status_code not in ( - http.HTTPStatus.OK, - http.HTTPStatus.CONFLICT, - ): - self._error(f"Unable to create folder {data.get('folder')}") - return (None, None) - - logger.debug('Creating run with data: "%s"', data) - - try: - response = post(f"{self._user_config.server.url}/runs", self._headers, data) - except Exception as err: - self._error(f"Exception creating run: {str(err)}") - return (None, None) - - logger.debug( - 'Got status code %d when creating run, with response: "%s"', - response.status_code, - response.text, - ) - - if response.status_code == http.HTTPStatus.CONFLICT: - self._error(f"Duplicate run, name {data['name']} already exists") - return (None, None) - elif response.status_code != http.HTTPStatus.OK: - self._error(f"Got status code {response.status_code} when creating run") - return (None, None) - - if "name" in response.json(): - self.name = response.json()["name"] - - if "id" in response.json(): - self.id = response.json()["id"] - - return self.name, self.id - - @skip_if_failed("_aborted", "_suppress_errors", None) - def update( - self, data: dict[str, typing.Any], _=None - ) -> dict[str, typing.Any] | None: - """ - Update metadata, tags or status - """ - if self.id: - data["id"] = self.id - - logger.debug('Updating run with data: "%s"', data) - - try: - response = put(f"{self._user_config.server.url}/runs", self._headers, data) - except Exception as err: - self._error(f"Exception updating run: {err}") - return None - - logger.debug( - 'Got status code %d when updating run, with response: "%s"', - response.status_code, - response.text, - ) - - if response.status_code == http.HTTPStatus.OK: - return data - - self._error(f"Got status code {response.status_code} when updating run") - return None - - @skip_if_failed("_aborted", "_suppress_errors", None) - def set_folder_details(self, data, run=None) -> dict[str, typing.Any] | None: - """ - Set folder details - """ - if run is not None and not __version__: - data["name"] = run - - try: - response = post( - f"{self._user_config.server.url}/folders", self._headers, data - ) - except Exception as err: - self._error(f"Exception creating folder: {err}") - return None - - if response.status_code in (http.HTTPStatus.OK, http.HTTPStatus.CONFLICT): - folder_id = response.json()["id"] - data["id"] = folder_id - - if response.status_code == http.HTTPStatus.OK: - logger.debug('Got id of new folder: "%s"', folder_id) - else: - logger.debug('Got id of existing folder: "%s"', folder_id) - - logger.debug('Setting folder details with data: "%s"', data) - - try: - response = put( - f"{self._user_config.server.url}/folders", self._headers, data - ) - except Exception as err: - self._error(f"Exception setting folder details: {err}") - return None - - logger.debug( - 'Got status code %d when setting folder details, with response: "%s"', - response.status_code, - response.text, - ) - - if response.status_code == http.HTTPStatus.OK: - return response.json() - - self._error( - f"Got status code {response.status_code} when updating folder details" - ) - return None - - @skip_if_failed("_aborted", "_suppress_errors", False) - def save_file(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - """ - Save file - """ - logger.debug('Getting presigned URL for saving artifact, with data: "%s"', data) - - # Get presigned URL - try: - response = post( - f"{self._user_config.server.url}/artifacts", - self._headers, - prepare_for_api(data), - ) - except Exception as err: - self._error( - f"Got exception when preparing to upload file {data['name']} to object storage: {str(err)}" - ) - return None - - logger.debug( - 'Got status code %d when getting presigned URL, with response: "%s"', - response.status_code, - response.text, - ) - - if response.status_code == http.HTTPStatus.CONFLICT: - return data - - if response.status_code != http.HTTPStatus.OK: - self._error( - f"Got status code {response.status_code} when registering file {data['name']}" - ) - return None - - storage_id = None - if "storage_id" in response.json(): - storage_id = response.json()["storage_id"] - - if not storage_id: - return None - - if "url" in response.json(): - url = response.json()["url"] - if "pickled" in data and "pickledFile" not in data: - try: - response = put( - url, {}, data["pickled"], is_json=False, timeout=UPLOAD_TIMEOUT - ) - - logger.debug( - "Got status code %d when uploading artifact", - response.status_code, - ) - - if response.status_code != http.HTTPStatus.OK: - self._error( - f"Got status code {response.status_code} when uploading object {data['name']} to object storage" - ) - return None - except Exception as err: - self._error( - f"Got exception when uploading object {data['name']} to object storage: {str(err)}" - ) - return None - else: - use_filename = data.get("pickledFile", data["originalPath"]) - try: - with open(use_filename, "rb") as fh: - response = put( - url, {}, fh, is_json=False, timeout=UPLOAD_TIMEOUT - ) - - logger.debug( - "Got status code %d when uploading artifact", - response.status_code, - ) - - if response.status_code != http.HTTPStatus.OK: - self._error( - f"Got status code {response.status_code} when uploading file {data['name']} to object storage" - ) - return None - except Exception as err: - self._error( - f"Got exception when uploading file {data['name']} to object storage: {str(err)}" - ) - return None - - if storage_id: - path = f"{self._user_config.server.url}/runs/{self.id}/artifacts" - data["storage"] = storage_id - - try: - response = put(path, self._headers, prepare_for_api(data)) - except Exception as err: - self._error( - f"Got exception when confirming upload of file {data['name']}: {str(err)}" - ) - return None - - if response.status_code != http.HTTPStatus.OK: - self._error( - f"Got status code {response.status_code} when confirming upload of file {data['name']}: {response.text}" - ) - return None - - return data - - @skip_if_failed("_aborted", "_suppress_errors", False) - def add_alert(self, data, run=None): - """ - Add an alert - """ - if run is not None: - data["run"] = run - - logger.debug('Adding alert with data: "%s"', data) - - try: - response = post( - f"{self._user_config.server.url}/alerts", self._headers, data - ) - except Exception as err: - self._error(f"Got exception when creating an alert: {str(err)}") - return False - - logger.debug( - 'Got response %d when adding alert, with response: "%s"', - response.status_code, - response.text, - ) - - if response.status_code in (http.HTTPStatus.OK, http.HTTPStatus.CONFLICT): - return response.json() - - self._error(f"Got status code {response.status_code} when creating alert") - return False - - @skip_if_failed("_aborted", "_suppress_errors", {}) - def set_alert_state(self, alert_id, status) -> dict[str, typing.Any] | None: - """ - Set alert state - """ - data = {"run": self.id, "alert": alert_id, "status": status} - try: - response = put( - f"{self._user_config.server.url}/alerts/status", self._headers, data - ) - except Exception as err: - self._error(f"Got exception when setting alert state: {err}") - return {} - - return response.json() if response.status_code == http.HTTPStatus.OK else {} - - @skip_if_failed("_aborted", "_suppress_errors", []) - def list_alerts(self) -> list[dict[str, typing.Any]]: - """ - List alerts - """ - try: - response = get(f"{self._user_config.server.url}/alerts", self._headers) - except Exception as err: - self._error(f"Got exception when listing alerts: {str(err)}") - return [] - - if not (response_data := response.json()) or ( - (data := response_data.get("data")) is None - ): - self._error( - "Expected key 'alerts' in response from server during alert retrieval" - ) - return [] - - return data if response.status_code == http.HTTPStatus.OK else [] - - @skip_if_failed("_aborted", "_suppress_errors", None) - def send_metrics(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - """ - Send metrics - """ - logger.debug("Sending metrics") - - try: - response = post( - f"{self._user_config.server.url}/metrics", - self._headers_mp, - data, - is_json=False, - ) - except Exception as err: - self._error(f"Exception sending metrics: {str(err)}") - return None - - logger.debug("Got status code %d when sending metrics", response.status_code) - - if response.status_code == http.HTTPStatus.OK: - return response.json() - - self._error(f"Got status code {response.status_code} when sending metrics") - return None - - @skip_if_failed("_aborted", "_suppress_errors", None) - def send_event(self, data: dict[str, typing.Any]) -> dict[str, typing.Any] | None: - """ - Send events - """ - logger.debug("Sending events") - - try: - response = post( - f"{self._user_config.server.url}/events", - self._headers_mp, - data, - is_json=False, - ) - except Exception as err: - self._error(f"Exception sending event: {str(err)}") - return None - - logger.debug("Got status code %d when sending events", response.status_code) - - if response.status_code == http.HTTPStatus.OK: - return response.json() - - self._error(f"Got status code {response.status_code} when sending events") - return None - - @skip_if_failed("_aborted", "_suppress_errors", None) - def send_heartbeat(self) -> dict[str, typing.Any] | None: - """ - Send heartbeat - """ - logger.debug("Sending heartbeat") - - try: - response = put( - f"{self._user_config.server.url}/runs/heartbeat", - self._headers, - {"id": self.id}, - ) - except Exception as err: - self._error(f"Exception creating run: {str(err)}") - return None - - logger.debug("Got status code %d when sending heartbeat", response.status_code) - - if response.status_code == http.HTTPStatus.OK: - return response.json() - - self._error(f"Got status code {response.status_code} when sending heartbeat") - return None - - @skip_if_failed("_aborted", "_suppress_errors", False) - def get_abort_status(self) -> bool: - logger.debug("Retrieving abort status") - - try: - response = get( - f"{self._user_config.server.url}/runs/{self.id}/abort", - self._headers_mp, - ) - except Exception as err: - self._error(f"Exception retrieving abort status: {str(err)}") - return False - - logger.debug( - "Got status code %d when checking abort status", response.status_code - ) - - if response.status_code == http.HTTPStatus.OK: - if (status := response.json().get("status")) is None: - self._error( - f"Expected key 'status' when retrieving abort status {response.json()}" - ) - return False - return status - - self._error( - f"Got status code {response.status_code} when checking abort status" - ) - return False