diff --git a/.github/workflows/test_client_ubuntu.yml b/.github/workflows/test_client_ubuntu.yml index 76c53121..d9c8d173 100644 --- a/.github/workflows/test_client_ubuntu.yml +++ b/.github/workflows/test_client_ubuntu.yml @@ -18,9 +18,9 @@ concurrency: cancel-in-progress: true jobs: - online_unit_tests: + object_retrieval: runs-on: ubuntu-latest - timeout-minutes: 40 + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - name: Set up Python 3.13 @@ -40,12 +40,12 @@ jobs: SIMVUE_URL: ${{ secrets.SIMVUE_URL }} SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- - python -m pytest tests/unit/ -x - -m online -c /dev/null -p no:warnings + python -m pytest -x + -m object_retrieval -c /dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache - offline_unit_tests: + object_removal: runs-on: ubuntu-latest - timeout-minutes: 40 + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - name: Set up Python 3.13 @@ -65,12 +65,12 @@ jobs: SIMVUE_URL: ${{ secrets.SIMVUE_URL }} SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- - python -m pytest tests/unit/ -x - -m offline -c /dev/null -p no:warnings + python -m pytest -x + -m object_removal -c /dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache - online_functional_tests: + dispatch_tests: runs-on: ubuntu-latest - timeout-minutes: 40 + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - name: Set up Python 3.13 @@ -82,7 +82,6 @@ jobs: python -m pip install poetry poetry self add poetry-plugin-export poetry export -f requirements.txt --with dev -o requirements.txt --all-extras - python -m pip install torch --index-url https://download.pytorch.org/whl/cpu python -m pip install -r requirements.txt python -m pip install . - name: Test with pytest @@ -90,12 +89,12 @@ jobs: SIMVUE_URL: ${{ secrets.SIMVUE_URL }} SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- - python -m pytest tests/functional/ -x - -m online -m "not eco" -c /dev/null -p no:warnings + python -m pytest -x + -m dispatch -c /dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache - offline_functional_tests: + run_tests_online: runs-on: ubuntu-latest - timeout-minutes: 40 + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - name: Set up Python 3.13 @@ -115,12 +114,12 @@ jobs: SIMVUE_URL: ${{ secrets.SIMVUE_URL }} SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- - python -m pytest tests/functional/ -x - -m offline -c /dev/null -p no:warnings + python -m pytest -x + -m run -m online -c /dev/null -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache - other_unit_tests: + run_tests_offline: runs-on: ubuntu-latest - timeout-minutes: 40 + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - name: Set up Python 3.13 @@ -140,13 +139,84 @@ jobs: SIMVUE_URL: ${{ secrets.SIMVUE_URL }} SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- - python -m pytest tests/unit/ -x - -m 'not offline' -m 'not online' - -m 'not scenario' -c /dev/null - -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache - other_functional_tests: + python -m pytest -x + -m run -m offline -c /dev/null -p no:warnings + -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + config_tests: runs-on: ubuntu-latest - timeout-minutes: 40 + timeout-minutes: 30 + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.13 + uses: actions/setup-python@v5 + with: + python-version: "3.13" + - name: Install dependencies + run: | + python -m pip install poetry + poetry self add poetry-plugin-export + poetry export -f 
requirements.txt --with dev -o requirements.txt --all-extras + python -m pip install -r requirements.txt + python -m pip install . + - name: Test with pytest + env: + SIMVUE_URL: ${{ secrets.SIMVUE_URL }} + SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} + run: >- + python -m pytest -x + -m config -c /dev/null -p no:warnings + -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + executor_tests: + runs-on: ubuntu-latest + timeout-minutes: 30 + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.13 + uses: actions/setup-python@v5 + with: + python-version: "3.13" + - name: Install dependencies + run: | + python -m pip install poetry + poetry self add poetry-plugin-export + poetry export -f requirements.txt --with dev -o requirements.txt --all-extras + python -m pip install -r requirements.txt + python -m pip install . + - name: Test with pytest + env: + SIMVUE_URL: ${{ secrets.SIMVUE_URL }} + SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} + run: >- + python -m pytest -x + -m executor -c /dev/null -p no:warnings + -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + api_tests: + runs-on: ubuntu-latest + timeout-minutes: 30 + steps: + - uses: actions/checkout@v4 + - name: Set up Python 3.13 + uses: actions/setup-python@v5 + with: + python-version: "3.13" + - name: Install dependencies + run: | + python -m pip install poetry + poetry self add poetry-plugin-export + poetry export -f requirements.txt --with dev -o requirements.txt --all-extras + python -m pip install -r requirements.txt + python -m pip install . + - name: Test with pytest + env: + SIMVUE_URL: ${{ secrets.SIMVUE_URL }} + SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} + run: >- + python -m pytest -x + -m api -c /dev/null -p no:warnings + -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + local_tests: + runs-on: ubuntu-latest + timeout-minutes: 30 steps: - uses: actions/checkout@v4 - name: Set up Python 3.13 @@ -158,7 +228,6 @@ jobs: python -m pip install poetry poetry self add poetry-plugin-export poetry export -f requirements.txt --with dev -o requirements.txt --all-extras - python -m pip install torch --index-url https://download.pytorch.org/whl/cpu python -m pip install -r requirements.txt python -m pip install . - name: Test with pytest @@ -166,7 +235,6 @@ jobs: SIMVUE_URL: ${{ secrets.SIMVUE_URL }} SIMVUE_TOKEN: ${{ secrets.SIMVUE_TOKEN }} run: >- - python -m pytest tests/functional/ -x - -m 'not offline' -m 'not online' - -m 'not scenario' -c /dev/null - -p no:warnings -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache + python -m pytest -x + -m local -c /dev/null -p no:warnings + -n 0 -v -o cache_dir=${GITHUB_WORKSPACE}/.pytest-cache diff --git a/CHANGELOG.md b/CHANGELOG.md index 86394d3b..59812f1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ * Fixed bug in pagination whereby the count value specified by the user is ignored. * Fixed bug where uploading larger files timed out leading to file of size 0B. * Fixed bug where if the range or threshold of an alert is zero the alert type validation fails. +* Fixed bug in `Folder.ids` where `kwargs` were not being passed to `GET`. +* Ensured all threads have `daemon=True` to prevent hanging on termination. +* Added error when `close()` method is called within the `simvue.Run` context manager. 
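The changelog entries above describe two threading-related behaviours introduced in this change set: background threads are now created with `daemon=True` so a terminating interpreter is never blocked by a lingering worker, and an explicit error is raised if `close()` is called while the `simvue.Run` object is being managed by a context manager (where `__exit__` already performs the shutdown). The sketch below is a minimal, hypothetical illustration of both patterns under those assumptions; `ManagedRun`, `_heartbeat`, and the `RuntimeError` are placeholders for illustration only, not Simvue's actual API or error handling.

```python
import threading
import time


class ManagedRun:
    """Minimal sketch of an object that owns a daemon worker thread."""

    def __init__(self) -> None:
        self._stop = threading.Event()
        # daemon=True means this thread cannot keep the process alive
        # if the main thread exits without a clean shutdown.
        self._worker = threading.Thread(
            target=self._heartbeat, daemon=True, name="run_heartbeat"
        )
        self._in_context: bool = False

    def _heartbeat(self) -> None:
        # Placeholder for periodic work (e.g. sending a heartbeat).
        while not self._stop.wait(0.1):
            pass

    def __enter__(self) -> "ManagedRun":
        self._in_context = True
        self._worker.start()
        return self

    def __exit__(self, *exc) -> None:
        # The context manager is responsible for shutdown.
        self._in_context = False
        self.close()

    def close(self) -> None:
        # Mirror the new behaviour: refuse an explicit close() while the
        # object is still inside a 'with' block.
        if self._in_context:
            raise RuntimeError("Cannot call close() inside the context manager.")
        self._stop.set()
        self._worker.join(timeout=1)


if __name__ == "__main__":
    with ManagedRun() as run:
        time.sleep(0.2)  # calling run.close() here would raise RuntimeError
```

A usage note on the design choice: marking worker threads as daemons is a safety net rather than the primary shutdown path; the dispatcher and heartbeat threads are still signalled via their termination events, and the daemon flag only guarantees that a missed signal cannot hang process exit.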
## [v2.1.1](https://github.com/simvue-io/client/releases/tag/v2.1.1) - 2025-04-25 * Changed from CO2 Signal to ElectricityMaps * Fixed a number of bugs in how offline mode is handled with emissions diff --git a/pyproject.toml b/pyproject.toml index 77212d32..b724ce9e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,7 +97,6 @@ testpaths = [ markers = [ "eco: tests for emission metrics", "client: tests of Simvue client", - "converters: tests for Simvue object converters", "dispatch: test data dispatcher", "run: test the simvue Run class", "utilities: test simvue utilities module", @@ -107,10 +106,11 @@ markers = [ "api: tests of RestAPI functionality", "unix: tests for UNIX systems only", "metadata: tests of metadata gathering functions", - "proxies: tests for remote/offline Simvue proxies", "online: tests for online functionality", "offline: tests for offline functionality", - "local: tests of functionality which do not involve a server or writing to an offline cache file" + "local: tests of functionality which do not involve a server or writing to an offline cache file", + "object_retrieval: tests relating to retrieval of objects from the server", + "object_removal: tests relating to removal of objects from the server", ] [tool.interrogate] diff --git a/simvue/api/objects/base.py b/simvue/api/objects/base.py index d99db66d..e2a81111 100644 --- a/simvue/api/objects/base.py +++ b/simvue/api/objects/base.py @@ -365,7 +365,7 @@ def ids( """ _class_instance = cls(_read_only=True, _local=True) _count: int = 0 - for response in cls._get_all_objects(offset, count=count): + for response in cls._get_all_objects(offset, count=count, **kwargs): if (_data := response.get("data")) is None: raise RuntimeError( f"Expected key 'data' for retrieval of {_class_instance.__class__.__name__.lower()}s" @@ -412,6 +412,10 @@ def get( f"Expected key 'data' for retrieval of {_class_instance.__class__.__name__.lower()}s" ) + # If data is an empty list + if not _data: + return + for entry in _data: _id = entry["id"] yield _id, cls(_read_only=True, identifier=_id, _local=True, **entry) @@ -473,7 +477,7 @@ def read_only(self, is_read_only: bool) -> None: if not self._read_only: self._staging = self._get_local_staged() - def commit(self) -> None: + def commit(self) -> dict | None: """Send updates to the server, or if offline, store locally.""" if self._read_only: raise AttributeError("Cannot commit object in 'read-only' mode") @@ -485,22 +489,26 @@ def commit(self) -> None: self._cache() return + _response: dict | None = None + # Initial commit is creation of object # if staging is empty then we do not need to use PUT if not self._identifier or self._identifier.startswith("offline_"): self._logger.debug( f"Posting from staged data for {self._label} '{self.id}': {self._staging}" ) - self._post(**self._staging) + _response = self._post(**self._staging) elif self._staging: self._logger.debug( f"Pushing updates from staged data for {self._label} '{self.id}': {self._staging}" ) - self._put(**self._staging) + _response = self._put(**self._staging) # Clear staged changes self._clear_staging() + return _response + @property def id(self) -> str | None: """The identifier for this object if applicable. @@ -686,3 +694,15 @@ def staged(self) -> dict[str, typing.Any] | None: the locally staged data if available. 
""" return self._staging or None + + def __str__(self) -> str: + """String representation of Simvue object.""" + return f"{self.__class__.__name__}({self.id=})" + + def __repr__(self) -> str: + _out_str = f"{self.__class__.__module__}.{self.__class__.__qualname__}(" + _out_str += ", ".join( + f"{property}={getattr(self, property)!r}" for property in self._properties + ) + _out_str += ")" + return _out_str diff --git a/simvue/api/url.py b/simvue/api/url.py index a22480a4..92d6a8fd 100644 --- a/simvue/api/url.py +++ b/simvue/api/url.py @@ -37,6 +37,11 @@ def __truediv__(self, other: str) -> Self: _new /= other return _new + def __repr__(self) -> str: + """Representation of URL""" + _out_str = f"{self.__class__.__module__}.{self.__class__.__qualname__}" + return f"{_out_str}(url={self.__str__()!r})" + @pydantic.validate_call def __itruediv__(self, other: str) -> Self: """Define URL extension through use of '/'""" diff --git a/simvue/client.py b/simvue/client.py index 3916d800..1d129e03 100644 --- a/simvue/client.py +++ b/simvue/client.py @@ -32,7 +32,16 @@ from .models import FOLDER_REGEX, NAME_REGEX from .config.user import SimvueConfiguration from .api.request import get_json_from_response -from .api.objects import Run, Folder, Tag, Artifact, Alert, FileArtifact, ObjectArtifact +from .api.objects import ( + Run, + Folder, + Tag, + Artifact, + Alert, + FileArtifact, + ObjectArtifact, + get_folder_from_path, +) CONCURRENT_DOWNLOADS = 10 @@ -350,10 +359,18 @@ def _get_folder_id_from_path(self, path: str) -> str | None: _ids = Folder.ids(filters=json.dumps([f"path == {path}"])) try: - return next(_ids) + _id = next(_ids) except StopIteration: return None + with contextlib.suppress(StopIteration): + next(_ids) + raise RuntimeError( + f"Expected single folder match for '{path}', but found duplicate." + ) + + return _id + @prettify_pydantic @pydantic.validate_call def delete_runs( @@ -422,13 +439,17 @@ def delete_folder( if allow_missing: return None else: - raise RuntimeError( - f"Deletion of folder '{folder_path}' failed, folder does not exist." 
+ raise ObjectNotFoundError( + name=folder_path, + obj_type="folder", ) _response = Folder(identifier=folder_id).delete( delete_runs=remove_runs, recursive=recursive, runs_only=False ) + if folder_id not in _response.get("folders", []): + raise RuntimeError("Deletion of folder failed, server returned mismatch.") + return _response.get("runs", []) @prettify_pydantic @@ -622,7 +643,9 @@ def get_artifacts_as_files( Artifact.from_run(run_id=run_id, category=category) ) - with ThreadPoolExecutor(CONCURRENT_DOWNLOADS) as executor: + with ThreadPoolExecutor( + CONCURRENT_DOWNLOADS, thread_name_prefix=f"get_artifacts_run_{run_id}" + ) as executor: futures = [ executor.submit(_download_artifact_to_file, artifact, output_dir) for _, artifact in _artifacts @@ -665,17 +688,12 @@ def get_folder( RuntimeError if there was a failure when retrieving information from the server """ - _folders: typing.Generator[tuple[str, Folder], None, None] = Folder.get( - filters=json.dumps([f"path == {folder_path}"]) - ) # type: ignore - try: - _, _folder = next(_folders) - if not read_only: - _folder.read_only(read_only) - return _folder - except StopIteration: + _folder = get_folder_from_path(path=folder_path) + except ObjectNotFoundError: return None + _folder.read_only(is_read_only=read_only) + return _folder @pydantic.validate_call def get_folders( diff --git a/simvue/executor.py b/simvue/executor.py index 6f8fef5d..6ce5f2d2 100644 --- a/simvue/executor.py +++ b/simvue/executor.py @@ -80,6 +80,8 @@ def trigger_check( thread_out = threading.Thread( target=trigger_check, args=(completion_callback, completion_trigger, _result), + daemon=True, + name=f"{proc_id}_Thread", ) thread_out.start() diff --git a/simvue/factory/dispatch/__init__.py b/simvue/factory/dispatch/__init__.py index 9908c47c..490b34c5 100644 --- a/simvue/factory/dispatch/__init__.py +++ b/simvue/factory/dispatch/__init__.py @@ -23,6 +23,7 @@ def Dispatcher( callback: typing.Callable[[list[typing.Any], str, dict[str, typing.Any]], None], object_types: list[str], termination_trigger: "Event", + name: str | None = None, **kwargs, ) -> "DispatcherBaseClass": """Returns instance of dispatcher based on configuration @@ -43,6 +44,8 @@ def Dispatcher( categories, this is mainly used for creation of queues in a QueueDispatcher termination_trigger : Event event which triggers termination of the dispatcher + name : str | None, optional + name for the underlying thread, default None Returns ------- @@ -63,5 +66,6 @@ def Dispatcher( callback=callback, object_types=object_types, termination_trigger=termination_trigger, + name=name, **kwargs, ) diff --git a/simvue/factory/dispatch/queued.py b/simvue/factory/dispatch/queued.py index ae5b094e..ba042d6d 100644 --- a/simvue/factory/dispatch/queued.py +++ b/simvue/factory/dispatch/queued.py @@ -36,6 +36,7 @@ def __init__( callback: typing.Callable[[list[typing.Any], str], None], object_types: list[str], termination_trigger: threading.Event, + name: str | None = None, max_buffer_size: int = MAX_BUFFER_SIZE, max_read_rate: float = MAX_REQUESTS_PER_SECOND, ) -> None: @@ -51,6 +52,8 @@ def __init__( termination_trigger : threading.Event a threading event which when set declares that the dispatcher should terminate + name : str | None, optional + name for underlying thread, default None max_buffer_size : int maximum number of items allowed in created buffer. 
max_read_rate : float @@ -62,7 +65,7 @@ def __init__( object_types=object_types, termination_trigger=termination_trigger, ) - super().__init__() + super().__init__(name=name, daemon=True) self._termination_trigger = termination_trigger self._callback = callback diff --git a/simvue/run.py b/simvue/run.py index 891803eb..8c2808d5 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -151,6 +151,10 @@ def __init__( self._timer: float = 0 self._retention: float | None = None + # Keep track of if the Run class has been intialised + # through a context manager + self._context_manager_called: bool = False + self._testing: bool = False self._abort_on_alert: typing.Literal["run", "terminate", "ignore"] = "terminate" self._abort_callback: typing.Callable[[Self], None] | None = abort_callback @@ -209,6 +213,7 @@ def __init__( self._emissions_monitor: CO2Monitor | None = None def __enter__(self) -> Self: + self._context_manager_called = True return self def _handle_exception_throw( @@ -478,14 +483,14 @@ def _dispatch_callback( offline=self._user_config.run.mode == "offline", events=buffer, ) - _events.commit() + return _events.commit() else: _metrics = Metrics.new( run=self.id, offline=self._user_config.run.mode == "offline", metrics=buffer, ) - _metrics.commit() + return _metrics.commit() return _dispatch_callback @@ -539,7 +544,9 @@ def _start(self) -> bool: ) self._heartbeat_thread = threading.Thread( - target=self._create_heartbeat_callback() + target=self._create_heartbeat_callback(), + daemon=True, + name=f"{self.id}_heartbeat", ) except RuntimeError as e: @@ -1657,6 +1664,10 @@ def close(self) -> bool: bool whether close was successful """ + if self._context_manager_called: + self._error("Cannot call close method in context manager.") + return + self._executor.wait_for_completion() if not self._sv_obj: diff --git a/simvue/sender.py b/simvue/sender.py index ea5e6e68..d66ed249 100644 --- a/simvue/sender.py +++ b/simvue/sender.py @@ -201,7 +201,9 @@ def sender( for file_path in _offline_files: upload_cached_file(cache_dir, _obj_type, file_path, _id_mapping, _lock) else: - with ThreadPoolExecutor(max_workers=max_workers) as executor: + with ThreadPoolExecutor( + max_workers=max_workers, thread_name_prefix="sender_session_upload" + ) as executor: _results = executor.map( lambda file_path: upload_cached_file( cache_dir=cache_dir, @@ -230,7 +232,9 @@ def sender( ), ) else: - with ThreadPoolExecutor(max_workers=max_workers) as executor: + with ThreadPoolExecutor( + max_workers=max_workers, thread_name_prefix="sender_heartbeat" + ) as executor: _results = executor.map( lambda _heartbeat_file: send_heartbeat( file_path=_heartbeat_file, diff --git a/tests/conftest.py b/tests/conftest.py index 1a5cb71e..4d3bff27 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,6 +27,7 @@ def pytest_addoption(parser): parser.addoption("--debug-simvue", action="store_true", default=False) + parser.addoption("--retention-period", default="2 mins") class CountingLogHandler(logging.Handler): @@ -53,7 +54,7 @@ def clear_out_files() -> None: for file_obj in out_files: file_obj.unlink() - + @pytest.fixture() def offline_cache_setup(monkeypatch: monkeypatch.MonkeyPatch): # Will be executed before test @@ -63,7 +64,8 @@ def offline_cache_setup(monkeypatch: monkeypatch.MonkeyPatch): # Will be executed after test cache_dir.cleanup() monkeypatch.setenv("SIMVUE_OFFLINE_DIRECTORY", None) - + + @pytest.fixture def mock_co2_signal(monkeypatch: monkeypatch.MonkeyPatch) -> dict[str, dict | str]: _mock_data = { @@ -100,7 +102,7 @@ 
def _mock_location_info(self) -> None: monkeypatch.setattr(requests, "get", _mock_get) monkeypatch.setattr(sv_eco.APIClient, "_get_user_location_info", _mock_location_info) - + _fetch = sv_cfg.SimvueConfiguration.fetch @classmethod def _mock_fetch(cls, *args, **kwargs) -> sv_cfg.SimvueConfiguration: @@ -118,11 +120,13 @@ def speedy_heartbeat(monkeypatch: monkeypatch.MonkeyPatch) -> None: @pytest.fixture(autouse=True) -def setup_logging(pytestconfig) -> CountingLogHandler: +def setup_logging(pytestconfig, monkeypatch) -> CountingLogHandler: logging.basicConfig(level=logging.WARNING) handler = CountingLogHandler() logging.getLogger("simvue").setLevel(logging.DEBUG if pytestconfig.getoption("debug_simvue") else logging.WARNING) logging.getLogger("simvue").addHandler(handler) + if (_retention := pytestconfig.getoption("retention_period")): + monkeypatch.setenv("SIMVUE_TESTING_RETENTION_PERIOD", _retention) return handler @@ -158,14 +162,20 @@ def testing_exit(status: int) -> None: monkeypatch.setenv("SIMVUE_OFFLINE_DIRECTORY", temp_d) with sv_run.Run("offline") as run: yield run, setup_test_run(run, True, request) + with contextlib.suppress(ObjectNotFoundError): + sv_api_obj.Folder(identifier=run._folder.id).delete(recursive=True, delete_runs=True, runs_only=False) + for alert_id in _test_run_data.get("alert_ids", []): + with contextlib.suppress(ObjectNotFoundError): + sv_api_obj.Alert(identifier=alert_id).delete() clear_out_files() @pytest.fixture -def create_plain_run(request, prevent_script_exit) -> typing.Generator[typing.Tuple[sv_run.Run, dict], None, None]: +def create_plain_run(request, prevent_script_exit, mocker) -> typing.Generator[typing.Tuple[sv_run.Run, dict], None, None]: def testing_exit(status: int) -> None: raise SystemExit(status) with sv_run.Run() as run: + run.metric_spy = mocker.spy(run, "_get_internal_metrics") yield run, setup_test_run(run, False, request) clear_out_files() @@ -221,7 +231,7 @@ def setup_test_run(run: sv_run.Run, create_objects: bool, request: pytest.Fixtur tags=TEST_DATA["tags"], folder=TEST_DATA["folder"], visibility="tenant" if os.environ.get("CI") else None, - retention_period="1 hour", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), timeout=60, no_color=True, running=not created_only @@ -308,3 +318,4 @@ def setup_test_run(run: sv_run.Run, create_objects: bool, request: pytest.Fixtur TEST_DATA["alert_ids"] = _alert_ids return TEST_DATA + diff --git a/tests/functional/test_client.py b/tests/functional/test_client.py index 4d072e9c..966821e7 100644 --- a/tests/functional/test_client.py +++ b/tests/functional/test_client.py @@ -7,6 +7,8 @@ import glob import pathlib import time + +import requests import pytest_mock import tempfile import simvue.client as svc @@ -16,29 +18,34 @@ from simvue.api.objects.alert.base import AlertBase -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval def test_get_events(create_test_run: tuple[sv_run.Run, dict]) -> None: client = svc.Client() assert client.get_events(run_id=create_test_run[1]["run_id"]) -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval @pytest.mark.parametrize("from_run", (True, False), ids=("from_run", "all_runs")) @pytest.mark.parametrize("names_only", (True, False), ids=("names_only", "all_details")) @pytest.mark.parametrize( "critical_only", (True, False), ids=("critical_only", "all_states") ) def test_get_alerts( - create_plain_run: tuple[sv_run.Run, dict], from_run: bool, names_only: bool, critical_only: bool, ) -> None: - 
run, run_data = create_plain_run - run_id = run.id unique_id = f"{uuid.uuid4()}".split("-")[0] + run = sv_run.Run() + run.init( + "test_get_alerts", + folder=f"/simvue_unit_testing/{unique_id}", + tags=["test_get_alerts"], + retention_period="2 mins", + ) + run_id = run.id _id_1 = run.create_user_alert( name=f"user_alert_1_{unique_id}", ) @@ -90,8 +97,8 @@ def test_get_alerts( assert f"user_alert_2_{unique_id}" in _alerts -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval def test_get_run_id_from_name(create_test_run: tuple[sv_run.Run, dict]) -> None: client = svc.Client() assert ( @@ -100,8 +107,8 @@ def test_get_run_id_from_name(create_test_run: tuple[sv_run.Run, dict]) -> None: ) -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval @pytest.mark.parametrize( "aggregate,use_name_labels", [(True, False), (False, False), (False, True)], @@ -134,8 +141,8 @@ def test_get_metric_values( assert create_test_run[1]["run_id"] in _runs -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval def test_plot_metrics(create_test_run: tuple[sv_run.Run, dict]) -> None: try: import matplotlib @@ -150,8 +157,8 @@ def test_plot_metrics(create_test_run: tuple[sv_run.Run, dict]) -> None: ) -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval @pytest.mark.parametrize( "sorting", ([("metadata.test_identifier", True)], [("name", True), ("created", True)], None), @@ -167,8 +174,8 @@ def test_get_artifacts_entries( assert client.get_artifact(create_test_run[1]["run_id"], name="test_attributes") -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval @pytest.mark.parametrize("file_id", (1, 2, 3), ids=lambda x: f"file_{x}") def test_get_artifact_as_file( create_test_run: tuple[sv_run.Run, dict], file_id: int @@ -186,8 +193,8 @@ def test_get_artifact_as_file( ) -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval @pytest.mark.parametrize("category", (None, "code", "input", "output")) def test_get_artifacts_as_files( create_test_run: tuple[sv_run.Run, dict], @@ -216,8 +223,8 @@ def test_get_artifacts_as_files( assert create_test_run[1][file] not in files -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval @pytest.mark.parametrize( "output_format,sorting", [ @@ -244,15 +251,15 @@ def test_get_runs( assert _result -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval def test_get_run(create_test_run: tuple[sv_run.Run, dict]) -> None: client = svc.Client() assert client.get_run(run_id=create_test_run[1]["run_id"]) -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval @pytest.mark.parametrize( "sorting", (None, [("metadata.test_identifier", True), ("path", True)], [("modified", False)]), @@ -268,25 +275,42 @@ def test_get_folders( assert client.get_folder(_folder.path) -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval def test_get_metrics_names(create_test_run: tuple[sv_run.Run, dict]) -> None: client = svc.Client() - time.sleep(1) - assert list(client.get_metrics_names(create_test_run[1]["run_id"])) + attempts: int = 0 + + while ( + not list(client.get_metrics_names(create_test_run[1]["run_id"])) + and attempts < 10 + ): + time.sleep(1) + attempts += 1 + + if attempts >= 10: + raise AssertionError("Failed to retrieve metric name.") -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval def test_get_tag(create_plain_run: tuple[sv_run.Run, dict]) -> None: _, run_data = 
create_plain_run client = svc.Client() - time.sleep(1.0) - assert any(tag.name == run_data["tags"][-1] for _, tag in client.get_tags()) + attempts: int = 0 + while ( + not any(tag.name == run_data["tags"][-1] for _, tag in client.get_tags()) + and attempts < 10 + ): + time.sleep(1) + attempts += 1 + + if attempts >= 10: + raise AssertionError("Failed to retrieve tag.") -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_removal def test_run_deletion() -> None: run = sv_run.Run() run.init( @@ -303,8 +327,8 @@ def test_run_deletion() -> None: client.get_run(run.id) -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_removal def test_runs_deletion() -> None: _runs = [sv_run.Run() for _ in range(5)] for i, run in enumerate(_runs): @@ -322,20 +346,35 @@ def test_runs_deletion() -> None: client.get_run(run.id) -@pytest.mark.dependency @pytest.mark.client -def test_get_tags(create_plain_run: tuple[sv_run.Run, dict]) -> None: - run, run_data = create_plain_run - tags = run_data["tags"] - run.close() - time.sleep(1.0) +@pytest.mark.object_retrieval +def test_get_tags() -> None: + _uuid = f"{uuid.uuid4()}".split("-")[0] + tags = ["simvue_unit_testing", "test_get_tags", "testing", _uuid] + + with sv_run.Run() as run: + run.init( + "test_get_tags", + folder=f"/simvue_unit_testing/{_uuid}", + tags=tags, + retention_period="2 mins" + ) + client = svc.Client() - retrieved = [t.name for _, t in client.get_tags()] - assert all(t in retrieved for t in tags) + attempts = 0 + while ( + not all(f in [t.name for _, t in client.get_tags()] for f in tags) + and attempts < 10 + ): + time.sleep(1) + attempts += 1 + + if attempts >= 10: + raise AssertionError("Failed to retrieve tags.") -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_removal def test_folder_deletion() -> None: run = sv_run.Run() _temp_folder_id: str = f"{uuid.uuid4()}".split()[0] @@ -356,39 +395,44 @@ def test_folder_deletion() -> None: ) == 1 ) - time.sleep(10) - with pytest.raises(ObjectNotFoundError): - client.get_folder("/simvue_unit_testing/delete_me") + + # If the folder has been deleted then an ObjectNotFoundError should be raised + assert not client.get_folder(f"/simvue_unit_testing/{_temp_folder_id}") with pytest.raises(ObjectNotFoundError): client.get_run(run_id=run.id) @pytest.mark.client -def test_run_folder_metadata_find(create_plain_run: tuple[sv_run.Run, dict]) -> None: - run, run_data = create_plain_run - rand_val = random.randint(0, 1000) - run.set_folder_details(metadata={"atest": rand_val}) - run.close() - time.sleep(1.0) +@pytest.mark.object_retrieval +def test_run_folder_metadata_find() -> None: + _uuid: str = f"{uuid.uuid4()}".split()[0] + with sv_run.Run() as run: + run.init( + "test_run_folder_metadata_find", + tags=["test_run_folder_metadata_find", "testing"], + folder=(_folder := f"/simvue_unit_testing/{_uuid}"), + retention_period="2 mins" + ) + rand_val = random.randint(0, 1000) + run.set_folder_details(metadata={"atest": rand_val}) client = svc.Client() data = client.get_folders(filters=[f"metadata.atest == {rand_val}"]) - assert run_data["folder"] in [i.path for _, i in data] + assert _folder in [i.path for _, i in data] @pytest.mark.client +@pytest.mark.object_removal def test_tag_deletion() -> None: - run = sv_run.Run() - run.init( - name="test_folder_deletion", - folder="/simvue_unit_testing", - tags=["test_tag_deletion"], - retention_period="1 min", - ) - run.close() - unique_id = f"{uuid.uuid4()}".split("-")[0] - run.update_tags([(tag_str := f"delete_me_{unique_id}")]) 
- run.close() + with sv_run.Run() as run: + unique_id = f"{uuid.uuid4()}".split("-")[0] + run.init( + name="test_folder_deletion", + folder=f"/simvue_unit_testing/{unique_id}", + tags=["test_tag_deletion"], + retention_period="1 min", + ) + run.update_tags([(tag_str := f"delete_me_{unique_id}")]) client = svc.Client() tags = client.get_tags() client.delete_run(run.id) @@ -398,8 +442,8 @@ def test_tag_deletion() -> None: client.get_tag(tag_identifier) -@pytest.mark.dependency @pytest.mark.client +@pytest.mark.object_retrieval @pytest.mark.parametrize("aggregate", (True, False), ids=("aggregated", "normal")) @pytest.mark.parametrize("output_format", ("dict", "dataframe")) @pytest.mark.parametrize("xaxis", ("step", "time", "timestamp")) @@ -437,6 +481,7 @@ def test_multiple_metric_retrieval( @pytest.mark.client +@pytest.mark.object_removal def test_alert_deletion() -> None: _alert = sv_api_obj.UserAlert.new( name="test_alert", notification="none", description=None @@ -449,18 +494,17 @@ def test_alert_deletion() -> None: @pytest.mark.client +@pytest.mark.object_removal def test_abort_run(speedy_heartbeat, create_plain_run: tuple[sv_run.Run, dict]) -> None: run, run_data = create_plain_run _uuid = f"{uuid.uuid4()}".split("-")[0] run.update_tags([f"delete_me_{_uuid}"]) _client = svc.Client() _client.abort_run(run.id, reason="Test abort") - time.sleep(2) + _attempts: int = 0 - # On some machines it might take a little longer so - # try twice before accepting the abort failed - try: - assert run._status == "terminated" - except AssertionError: - time.sleep(2) - assert run._status == "terminated" + while run.status != "terminated" and _attempts < 10: + time.sleep(1) + _attempts += 1 + if _attempts >= 10: + raise AssertionError("Failed to terminate run.") diff --git a/tests/functional/test_dispatch.py b/tests/functional/test_dispatch.py index e2ab6362..3c007cf6 100644 --- a/tests/functional/test_dispatch.py +++ b/tests/functional/test_dispatch.py @@ -4,6 +4,7 @@ import time from threading import Event, Thread from queue import Queue +from concurrent.futures import ThreadPoolExecutor from simvue.factory.dispatch.queued import QueuedDispatcher @@ -12,17 +13,16 @@ # FIXME: Update the layout of these tests + @pytest.mark.dispatch +@pytest.mark.parametrize("overload_buffer", (True, False), ids=("overload", "normal")) @pytest.mark.parametrize( - "overload_buffer", (True, False), - ids=("overload", "normal") -) -@pytest.mark.parametrize( - "append_during_dispatch", (True, False), - ids=("pre_append", "append") + "append_during_dispatch", (True, False), ids=("pre_append", "append") ) @pytest.mark.parametrize("multiple", (True, False), ids=("multiple", "single")) -def test_queued_dispatcher(overload_buffer: bool, multiple: bool, append_during_dispatch: bool) -> None: +def test_queued_dispatcher( + overload_buffer: bool, multiple: bool, append_during_dispatch: bool +) -> None: buffer_size: int = 10 n_elements: int = 2 * buffer_size if overload_buffer else buffer_size - 1 max_read_rate: float = 0.2 @@ -42,24 +42,39 @@ def test_queued_dispatcher(overload_buffer: bool, multiple: bool, append_during_ for variable in variables: check_dict[variable] = {"counter": 0} - def callback(___: list[typing.Any], _: str, args=check_dict, var=variable) -> None: + + def callback( + ___: list[typing.Any], _: str, args=check_dict, var=variable + ) -> None: args[var]["counter"] += 1 + dispatchers.append( - QueuedDispatcher(callback, [variable], event, max_buffer_size=buffer_size, max_read_rate=max_read_rate) + QueuedDispatcher( + 
callback, + [variable], + event, + max_buffer_size=buffer_size, + max_read_rate=max_read_rate, + name=f"Queued_Dispatcher_{variable}" + ) ) if not append_during_dispatch: for i in range(n_elements): - for variable, dispatcher in zip(variables, dispatchers): - dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable, False) + for variable, dispatcher in zip(variables, dispatchers): + dispatcher.add_item( + {string.ascii_uppercase[i % 26]: i}, variable, False + ) for dispatcher in dispatchers: dispatcher.start() if append_during_dispatch: for i in range(n_elements): - for variable, dispatcher in zip(variables, dispatchers): - dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable, False) + for variable, dispatcher in zip(variables, dispatchers): + dispatcher.add_item( + {string.ascii_uppercase[i % 26]: i}, variable, False + ) while not dispatcher.empty: time.sleep(0.1) @@ -70,7 +85,9 @@ def callback(___: list[typing.Any], _: str, args=check_dict, var=variable) -> No time.sleep(0.1) for variable in variables: - assert check_dict[variable]["counter"] >= (2 if overload_buffer else 1), f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}" + assert check_dict[variable]["counter"] >= (2 if overload_buffer else 1), ( + f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}" + ) assert time.time() - start_time < time_threshold @@ -86,19 +103,30 @@ def test_nested_queued_dispatch(multi_queue: bool) -> None: result_queue = Queue() event = Event() + def create_callback(index): - def callback(___: list[typing.Any], _: str, check_dict=check_dict[index]) -> None: + def callback( + ___: list[typing.Any], _: str, check_dict=check_dict[index] + ) -> None: check_dict["counter"] += 1 + return callback - def _main(res_queue, index, dispatch_callback=create_callback, term_event=event, variable=variable) -> bool: + def _main( + res_queue, + index, + dispatch_callback=create_callback, + term_event=event, + variable=variable, + ) -> bool: term_event = Event() dispatcher = QueuedDispatcher( dispatch_callback(index), [variable] if isinstance(variable, str) else variable, term_event, max_buffer_size=buffer_size, - max_read_rate=max_read_rate + max_read_rate=max_read_rate, + name=f"test_nested_queued_dispatch" ) dispatcher.start() @@ -106,13 +134,17 @@ def _main(res_queue, index, dispatch_callback=create_callback, term_event=event, try: for i in range(n_elements): if isinstance(variable, str): - dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable, False) + dispatcher.add_item( + {string.ascii_uppercase[i % 26]: i}, variable, False + ) else: for var in variable: - dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, var, False) - except(RuntimeError): + dispatcher.add_item( + {string.ascii_uppercase[i % 26]: i}, var, False + ) + except RuntimeError: res_queue.put("AARGHGHGHGHAHSHGHSDHFSEDHSE") - + time.sleep(0.1) while not dispatcher.empty: @@ -127,10 +159,18 @@ def _main(res_queue, index, dispatch_callback=create_callback, term_event=event, threads = [] for i in range(3): - _thread = Thread(target=_main, args=(result_queue, i,)) + _thread = Thread( + target=_main, + args=( + result_queue, + i, + ), + daemon=True, + name=f"nested_queue_dispatch_{i}_Thread", + ) _thread.start() threads.append(_thread) - + for i in range(3): threads[i].join() @@ -138,7 +178,10 @@ def _main(res_queue, index, dispatch_callback=create_callback, term_event=event, assert False for i in range(3): - assert 
check_dict[i]["counter"] >= 2, f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[i]['counter']}" + assert check_dict[i]["counter"] >= 2, ( + f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[i]['counter']}" + ) + def test_queued_dispatch_error_adding_item_after_termination() -> None: trigger = Event() @@ -148,7 +191,8 @@ def test_queued_dispatch_error_adding_item_after_termination() -> None: object_types=["q"], termination_trigger=trigger, max_buffer_size=5, - max_read_rate=2 + max_read_rate=2, + name="test_queued_dispatch_error_adding_item_after_termination" ) dispatcher.start() @@ -157,6 +201,7 @@ def test_queued_dispatch_error_adding_item_after_termination() -> None: with pytest.raises(RuntimeError): dispatcher.add_item("blah", "q", False) + def test_queued_dispatch_error_attempting_to_use_non_existent_queue() -> None: trigger = Event() dispatcher = QueuedDispatcher( @@ -164,7 +209,8 @@ def test_queued_dispatch_error_attempting_to_use_non_existent_queue() -> None: object_types=["q"], termination_trigger=trigger, max_buffer_size=5, - max_read_rate=2 + max_read_rate=2, + name="test_queued_dispatch_error_attempting_to_use_non_existent_queue" ) dispatcher.start() @@ -194,18 +240,22 @@ def test_direct_dispatcher(multiple: bool) -> None: for variable in variables: check_dict[variable] = {"counter": 0} - def callback(___: list[typing.Any], _: str, args=check_dict, var=variable) -> None: + + def callback( + ___: list[typing.Any], _: str, args=check_dict, var=variable + ) -> None: args[var]["counter"] += 1 - dispatchers.append( - DirectDispatcher(callback, [variable], event) - ) + + dispatchers.append(DirectDispatcher(callback, [variable], event)) for i in range(n_elements): - for variable, dispatcher in zip(variables, dispatchers): + for variable, dispatcher in zip(variables, dispatchers): dispatcher.add_item({string.ascii_uppercase[i % 26]: i}, variable) event.set() for variable in variables: - assert check_dict[variable]["counter"] >= 1, f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}" + assert check_dict[variable]["counter"] >= 1, ( + f"Check of counter for dispatcher '{variable}' failed with count = {check_dict[variable]['counter']}" + ) assert time.time() - start_time < time_threshold diff --git a/tests/functional/test_executor.py b/tests/functional/test_executor.py index 05f5ce26..d1bbe207 100644 --- a/tests/functional/test_executor.py +++ b/tests/functional/test_executor.py @@ -1,4 +1,6 @@ +import contextlib import typing +import uuid import pytest import simvue import time @@ -8,7 +10,10 @@ import os import multiprocessing import multiprocessing.synchronize - + +from simvue.api.objects.folder import Folder +from simvue.exception import ObjectNotFoundError + @pytest.mark.executor @pytest.mark.parametrize("successful", (True, False), ids=("successful", "failing")) @@ -18,6 +23,9 @@ def test_executor_add_process( ) -> None: import logging trigger = multiprocessing.Event() + folder_id = f"{uuid.uuid4()}".split("-")[0] + folder = Folder.new(path=f"/simvue_unit_testing/{folder_id}") + folder.commit() def completion_callback(*_, trigger=trigger, **__): trigger.set() @@ -26,7 +34,8 @@ def completion_callback(*_, trigger=trigger, **__): run.init( f"test_executor_{'success' if successful else 'fail'}", tags=["simvue_client_unit_tests", request.node.name.replace("[", "_").replace("]", "_")], - folder="/simvue_unit_testing" + folder=f"/simvue_unit_testing/{folder_id}", + 
retention_period="2 mins" ) run.add_process( identifier=f"test_add_process_{'success' if successful else 'fail'}", @@ -44,6 +53,9 @@ def completion_callback(*_, trigger=trigger, **__): with pytest.raises(SystemExit): run.close() + with contextlib.suppress(ObjectNotFoundError): + folder.delete(recursive=True, delete_runs=True) + @pytest.mark.executor @pytest.mark.unix @@ -51,11 +63,14 @@ def test_executor_multiprocess(request: pytest.FixtureRequest) -> None: triggers: dict[int, multiprocessing.synchronize.Event] = {} callbacks: dict[int, typing.Callable] = {} events: dict[int, bool] = {} + folder_id = f"{uuid.uuid4()}".split("-")[0] + folder = Folder.new(path=f"/simvue_unit_testing/{folder_id}") + folder.commit() with tempfile.TemporaryDirectory() as tempd: with simvue.Run() as run: run.init( "test_executor_multiprocess", - folder="/simvue_unit_testing", + folder=f"/simvue_unit_testing/{folder_id}", tags=["simvue_client_tests", request.node.name] ) @@ -82,10 +97,15 @@ def callback(*_, evts=events, ident=i, **__): for i in range(10): os.remove(f"test_executor_multiprocess_cmd_{i}.err") os.remove(f"test_executor_multiprocess_cmd_{i}.out") + with contextlib.suppress(ObjectNotFoundError): + folder.delete(recursive=True, delete_runs=True) @pytest.mark.executor def test_add_process_command_assembly(request: pytest.FixtureRequest) -> None: + folder_id = f"{uuid.uuid4()}".split("-")[0] + folder = Folder.new(path=f"/simvue_unit_testing/{folder_id}") + folder.commit() with tempfile.TemporaryDirectory() as tempd: _python_script=""" import argparse @@ -115,7 +135,7 @@ def test_add_process_command_assembly(request: pytest.FixtureRequest) -> None: with simvue.Run() as run: run.init( "test_advanced_executor", - folder="/simvue_unit_testing", + folder=f"/simvue_unit_testing/{folder_id}", tags=["simvue_client_tests", request.node.name] ) run.add_process( @@ -126,17 +146,22 @@ def test_add_process_command_assembly(request: pytest.FixtureRequest) -> None: output_file=out_file ) assert run._executor.command_str[exe_id] == expected_cmd + with contextlib.suppress(ObjectNotFoundError): + folder.delete(recursive=True, delete_runs=True) @pytest.mark.executor def test_completion_callbacks_var_change(request: pytest.FixtureRequest) -> None: success: dict[str, bool] = {"complete": False} def completion_callback(*_, success: dict[str, bool]=success, **__): success["complete"] = True + folder_id = f"{uuid.uuid4()}".split("-")[0] + folder = Folder.new(path=f"/simvue_unit_testing/{folder_id}") + folder.commit() with simvue.Run() as run: run.init( "test_completion_callbacks_var_change", - folder="/simvue_unit_testing", + folder=f"/simvue_unit_testing/{folder_id}", tags=["simvue_client_tests", request.node.name] ) run.add_process( @@ -150,16 +175,21 @@ def completion_callback(*_, success: dict[str, bool]=success, **__): time.sleep(1) assert success["complete"] + with contextlib.suppress(ObjectNotFoundError): + folder.delete(recursive=True, delete_runs=True) @pytest.mark.executor @pytest.mark.unix def test_completion_trigger_set(request: pytest.FixtureRequest) -> None: trigger = multiprocessing.Event() + folder_id = f"{uuid.uuid4()}".split("-")[0] + folder = Folder.new(path=f"/simvue_unit_testing/{folder_id}") + folder.commit() with simvue.Run() as run: run.init( "test_completion_trigger_set", - folder="/simvue_unit_testing", + folder=f"/simvue_unit_testing/{folder_id}", tags=["simvue_client_tests", request.node.name] ) run.add_process( @@ -173,6 +203,8 @@ def test_completion_trigger_set(request: pytest.FixtureRequest) -> 
None: time.sleep(1) assert trigger.is_set() + with contextlib.suppress(ObjectNotFoundError): + folder.delete(recursive=True, delete_runs=True) @pytest.mark.executor def test_completion_callbacks_trigger_set(request: pytest.FixtureRequest) -> None: @@ -181,6 +213,10 @@ def test_completion_callbacks_trigger_set(request: pytest.FixtureRequest) -> Non def completion_callback(*_, trigger=trigger, **__): trigger.set() + folder_id = f"{uuid.uuid4()}".split("-")[0] + folder = Folder.new(path=f"/simvue_unit_testing/{folder_id}") + folder.commit() + with simvue.Run() as run: run.init( "test_completion_callbacks_trigger_set", @@ -198,4 +234,6 @@ def completion_callback(*_, trigger=trigger, **__): time.sleep(1) assert trigger.is_set() + with contextlib.suppress(ObjectNotFoundError): + folder.delete(recursive=True, delete_runs=True) diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py index 7c30567e..59c0362f 100644 --- a/tests/functional/test_run_class.py +++ b/tests/functional/test_run_class.py @@ -2,6 +2,7 @@ import logging import os import pytest +import requests import pytest_mock import time import typing @@ -18,7 +19,7 @@ import simvue from simvue.api.objects import Alert, Metrics from simvue.eco.api_client import CO2SignalData, CO2SignalResponse -from simvue.exception import SimvueRunError +from simvue.exception import ObjectNotFoundError, SimvueRunError from simvue.eco.emissions_monitor import TIME_FORMAT, CO2Monitor import simvue.run as sv_run import simvue.client as sv_cl @@ -32,11 +33,29 @@ @pytest.mark.run -def test_created_run() -> None: +def test_created_run(request) -> None: + _uuid = f"{uuid.uuid4()}".split("-")[0] with sv_run.Run() as run_created: - run_created.init(running=False, retention_period="1 min") + run_created.init( + request.node.name.replace("[", "_").replace("]", "_"), + tags=[ + "simvue_client_unit_tests", + "test_created_run" + ], + folder=f"/simvue_unit_testing/{_uuid}", + running=False, + visibility="tenant" if os.environ.get("CI") else None, + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), + ) _run = RunObject(identifier=run_created.id) assert _run.status == "created" + with contextlib.suppress(ObjectNotFoundError): + client = sv_cl.Client() + client.delete_folder( + f"/simvue_unit_testing/{uuid}", + remove_runs=True, + recursive=True + ) @pytest.mark.run @@ -53,11 +72,19 @@ def test_check_run_initialised_decorator() -> None: @pytest.mark.run @pytest.mark.eco @pytest.mark.online -def test_run_with_emissions_online(speedy_heartbeat, mock_co2_signal, create_plain_run) -> None: +def test_run_with_emissions_online(speedy_heartbeat, mock_co2_signal, create_plain_run: tuple[sv_run.Run, ...], mocker) -> None: run_created, _ = create_plain_run + metric_interval = 1 run_created._user_config.eco.co2_signal_api_token = "test_token" - run_created.config(enable_emission_metrics=True, system_metrics_interval=1) - time.sleep(5) + run_created.config(enable_emission_metrics=True, system_metrics_interval=metric_interval) + while ( + "sustainability.emissions.total" not in requests.get( + url=f"{run_created._user_config.server.url}/metrics/names", + headers=run_created._headers, + params={"runs": json.dumps([run_created.id])}).json() + and run_created.metric_spy.call_count < 4 + ): + time.sleep(metric_interval) _run = RunObject(identifier=run_created.id) _metric_names = [item[0] for item in _run.metrics] client = sv_cl.Client() @@ -75,7 +102,6 @@ def test_run_with_emissions_online(speedy_heartbeat, mock_co2_signal, 
create_pla # Check that total = previous total + latest delta _total_values = _metric_values[_total_metric_name].tolist() _delta_values = _metric_values[_delta_metric_name].tolist() - assert len(_total_values) > 1 for i in range(1, len(_total_values)): assert _total_values[i] == _total_values[i - 1] + _delta_values[i] @@ -118,7 +144,7 @@ def test_run_with_emissions_offline(speedy_heartbeat, mock_co2_signal, create_pl assert len(_total_values) > 1 for i in range(1, len(_total_values)): assert _total_values[i] == _total_values[i - 1] + _delta_values[i] - + @pytest.mark.run @pytest.mark.parametrize( "timestamp", @@ -145,17 +171,18 @@ def test_log_metrics( metrics_spy = mocker.spy(Metrics, "new") system_metrics_spy = mocker.spy(sv_run.Run, "_get_internal_metrics") + unique_id = f"{uuid.uuid4()}".split("-")[0] if visibility == "bad_option": with pytest.raises(SimvueRunError, match="visibility") as e: run.init( - name=f"test_run_{str(uuid.uuid4()).split('-', 1)[0]}", + request.node.name.replace("[", "_").replace("]", "_"), tags=[ "simvue_client_unit_tests", - request.node.name.replace("[", "_").replace("]", "_"), + "test_log_metrics", ], - folder="/simvue_unit_testing", - retention_period="1 hour", + folder=f"/simvue_unit_testing/{unique_id}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), visibility=visibility, ) # Will log system metrics on startup, and then not again within timeframe of test @@ -164,14 +191,14 @@ def test_log_metrics( return run.init( - name=f"test_run_{str(uuid.uuid4()).split('-', 1)[0]}", + request.node.name.replace("[", "_").replace("]", "_"), tags=[ "simvue_client_unit_tests", - request.node.name.replace("[", "_").replace("]", "_"), + "test_log_metrics", ], - folder="/simvue_unit_testing", + folder=f"/simvue_unit_testing/{unique_id}", visibility=visibility, - retention_period="1 hour", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), ) # Will log system metrics on startup, and then not again within timeframe of test # So should have exactly one measurement of this @@ -185,8 +212,8 @@ def test_log_metrics( run.log_metrics({key: i for key in METRICS}, timestamp=timestamp) else: run.log_metrics(METRICS, timestamp=timestamp) - time.sleep(2.0 if overload_buffer else 1.0) run.close() + time.sleep(2.0 if overload_buffer else 1.0) client = sv_cl.Client() _data = client.get_metric_values( run_ids=[run.id], @@ -195,8 +222,12 @@ def test_log_metrics( aggregate=False, ) - with contextlib.suppress(RuntimeError): - client.delete_run(run.id) + with contextlib.suppress(ObjectNotFoundError): + client.delete_folder( + f"/simvue_unit_testing/{unique_id}", + recursive=True, + remove_runs=True + ) assert _data @@ -226,16 +257,18 @@ def test_log_metrics_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) run, _ = create_plain_run_offline run_name = run.name run.log_metrics(METRICS) - time.sleep(1) - sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10) - run.close() client = sv_cl.Client() - _data = client.get_metric_values( + sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10) + attempts: int = 0 + + while not (_data := client.get_metric_values( run_ids=[client.get_run_id_from_name(run_name)], metric_names=list(METRICS.keys()), xaxis="step", aggregate=False, - ) + )) and attempts < 5: + sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10) + assert sorted(set(METRICS.keys())) == sorted(set(_data.keys())) _steps = [] for entry in _data.values(): @@ -254,32 +287,32 @@ def test_visibility_online( run 
= sv_run.Run() run.config(suppress_errors=False) + _uuid = f"{uuid.uuid4()}".split("-")[0] if visibility == "bad_option": with pytest.raises(SimvueRunError, match="visibility") as e: run.init( - name=f"test_visibility_{str(uuid.uuid4()).split('-', 1)[0]}", + request.node.name.replace("[", "_").replace("]", "_"), tags=[ "simvue_client_unit_tests", - request.node.name.replace("[", "_").replace("]", "_"), + "test_visibility_online" ], - folder="/simvue_unit_testing", - retention_period="1 hour", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), visibility=visibility, ) return run.init( - name=f"test_visibility_{str(uuid.uuid4()).split('-', 1)[0]}", + request.node.name.replace("[", "_").replace("]", "_"), tags=[ "simvue_client_unit_tests", - request.node.name.replace("[", "_").replace("]", "_"), + "test_visibility_online" ], - folder="/simvue_unit_testing", + folder=f"/simvue_unit_testing/{_uuid}", visibility=visibility, - retention_period="1 hour", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), ) - time.sleep(1) _id = run.id run.close() _retrieved_run = RunObject(identifier=_id) @@ -303,6 +336,7 @@ def test_visibility_offline( monkeypatch, visibility: typing.Literal["public", "tenant"] | list[str] | None, ) -> None: + _uuid = f"{uuid.uuid4()}".split("-")[0] with tempfile.TemporaryDirectory() as tempd: os.environ["SIMVUE_OFFLINE_DIRECTORY"] = tempd run = sv_run.Run(mode="offline") @@ -311,28 +345,27 @@ def test_visibility_offline( if visibility == "bad_option": with pytest.raises(SimvueRunError, match="visibility") as e: run.init( - name=f"test_visibility_{str(uuid.uuid4()).split('-', 1)[0]}", + request.node.name.replace("[", "_").replace("]", "_"), tags=[ "simvue_client_unit_tests", - request.node.name.replace("[", "_").replace("]", "_"), + "test_visibility_offline" ], - folder="/simvue_unit_testing", - retention_period="1 hour", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), visibility=visibility, ) return run.init( - name=f"test_visibility_{str(uuid.uuid4()).split('-', 1)[0]}", + request.node.name.replace("[", "_").replace("]", "_"), tags=[ - "simvue_client_unit_tests", - request.node.name.replace("[", "_").replace("]", "_"), + "simvue_client_unit_tests", + "test_visibility_offline" ], - folder="/simvue_unit_testing", + folder=f"/simvue_unit_testing/{_uuid}", visibility=visibility, - retention_period="1 hour", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), ) - time.sleep(1) _id = run.id _id_mapping = sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10) run.close() @@ -346,14 +379,19 @@ def test_visibility_offline( assert not _retrieved_run.visibility.tenant and not _retrieved_run.visibility.public else: assert _retrieved_run.visibility.users == visibility + with contextlib.suppress(ObjectNotFoundError): + client = sv_cl.Client() + client.delete_folder( + f"/simvue_unit_testing/{_uuid}", + recursive=True, + remove_runs=True + ) @pytest.mark.run def test_log_events_online(create_test_run: tuple[sv_run.Run, dict]) -> None: EVENT_MSG = "Hello world!" 
     run, _ = create_test_run
     run.log_event(EVENT_MSG)
-    time.sleep(1.0)
-    run.close()
     client = sv_cl.Client()
     event_data = client.get_events(run.id, count_limit=1)
     assert event_data[0].get("message", EVENT_MSG)
@@ -366,11 +404,18 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -> None:
     run, _ = create_plain_run_offline
     run_name = run.name
     run.log_event(EVENT_MSG)
-    time.sleep(1)
     sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
-    run.close()
     client = sv_cl.Client()
-    event_data = client.get_events(client.get_run_id_from_name(run_name), count_limit=1)
+    attempts: int = 0
+
+    # Because the time taken may vary between systems allow up to five attempts
+    # at an interval of 1 second
+    while (
+        not (event_data := client.get_events(client.get_run_id_from_name(run_name), count_limit=1))
+    ) and attempts < 5:
+        time.sleep(1)
+        sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
+        attempts += 1
     assert event_data[0].get("message", EVENT_MSG)


@@ -378,10 +423,9 @@ def test_log_events_offline(create_plain_run_offline: tuple[sv_run.Run, dict]) -> None:
 @pytest.mark.offline
 def test_offline_tags(create_plain_run_offline: tuple[sv_run.Run, dict]) -> None:
     run, run_data = create_plain_run_offline
-    time.sleep(1.0)
     sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
-    run.close()
     client = sv_cl.Client()
+
     tags = client.get_tags()

     # Find tag
@@ -400,8 +444,6 @@ def test_update_metadata_running(create_test_run: tuple[sv_run.Run, dict]) -> None:
     run.update_metadata({"d": "new"})
     # Try updating an already defined piece of metadata
     run.update_metadata({"a": 1})
-    run.close()
-    time.sleep(1.0)

     client = sv_cl.Client()
     run_info = client.get_run(run.id)
@@ -419,7 +461,6 @@ def test_update_metadata_created(create_pending_run: tuple[sv_run.Run, dict]) -> None:
     run.update_metadata({"d": "new"})
     # Try updating an already defined piece of metadata
     run.update_metadata({"a": 1})
-    time.sleep(1.0)

     client = sv_cl.Client()
     run_info = client.get_run(run.id)
@@ -443,8 +484,6 @@ def test_update_metadata_offline(
     run.update_metadata({"a": 1})

     sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
-    run.close()
-    time.sleep(1.0)

     client = sv_cl.Client()
     run_info = client.get_run(client.get_run_id_from_name(run_name))
@@ -454,24 +493,26 @@ def test_update_metadata_offline(


 @pytest.mark.run
+@pytest.mark.scenario
 @pytest.mark.parametrize("multi_threaded", (True, False), ids=("multi", "single"))
 def test_runs_multiple_parallel(
     multi_threaded: bool, request: pytest.FixtureRequest
 ) -> None:
     N_RUNS: int = 2
+    _uuid = f"{uuid.uuid4()}".split("-")[0]
     if multi_threaded:

         def thread_func(index: int) -> tuple[int, list[dict[str, typing.Any]], str]:
             with sv_run.Run() as run:
                 run.config(suppress_errors=False)
                 run.init(
-                    name=f"test_runs_multiple_{index + 1}",
+                    request.node.name.replace("[", "_").replace("]", "_") + f"_{index}",
                     tags=[
                         "simvue_client_unit_tests",
-                        request.node.name.replace("[", "_").replace("]", "_"),
                     ],
-                    folder="/simvue_unit_testing",
-                    retention_period="1 hour",
+                    folder=f"/simvue_unit_testing/{_uuid}",
+                    retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
+                    visibility="tenant" if os.environ.get("CI") else None,
                 )
                 metrics = []
                 for _ in range(10):
@@ -481,11 +522,9 @@ def thread_func(index: int) -> tuple[int, list[dict[str, typing.Any]], str]:
                     run.log_metrics(metric)
                 return index, metrics, run.id

-        with concurrent.futures.ThreadPoolExecutor(max_workers=N_RUNS) as executor:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=N_RUNS, thread_name_prefix="test_runs_multiple_parallel") as executor:
             futures = [executor.submit(thread_func, i) for i in range(N_RUNS)]

-            time.sleep(1)
-
             client = sv_cl.Client()

             for future in concurrent.futures.as_completed(futures):
@@ -505,20 +544,22 @@ def thread_func(index: int) -> tuple[int, list[dict[str, typing.Any]], str]:
             with sv_run.Run() as run_2:
                 run_1.config(suppress_errors=False)
                 run_1.init(
-                    name="test_runs_multiple_unthreaded_1",
+                    request.node.name.replace("[", "_").replace("]", "_") + "_1",
                     tags=[
                         "simvue_client_unit_tests",
-                        request.node.name.replace("[", "_").replace("]", "_"),
+                        "test_multi_run_unthreaded"
                     ],
-                    folder="/simvue_unit_testing",
-                    retention_period="1 hour",
+                    folder=f"/simvue_unit_testing/{_uuid}",
+                    retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
+                    visibility="tenant" if os.environ.get("CI") else None,
                 )
                 run_2.config(suppress_errors=False)
                 run_2.init(
-                    name="test_runs_multiple_unthreaded_2",
+                    request.node.name.replace("[", "_").replace("]", "_") + "_2",
                     tags=["simvue_client_unit_tests", "test_multi_run_unthreaded"],
-                    folder="/simvue_unit_testing",
-                    retention_period="1 hour",
+                    folder=f"/simvue_unit_testing/{_uuid}",
+                    retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
+                    visibility="tenant" if os.environ.get("CI") else None,
                 )
                 metrics_1 = []
                 metrics_2 = []
@@ -531,7 +572,6 @@ def thread_func(index: int) -> tuple[int, list[dict[str, typing.Any]], str]:
                         metrics.append(metric)
                         run.log_metrics(metric)
-                time.sleep(1)

                 client = sv_cl.Client()
@@ -546,29 +586,35 @@ def thread_func(index: int) -> tuple[int, list[dict[str, typing.Any]], str]:
                 )

                 with contextlib.suppress(RuntimeError):
-                    client.delete_run(run_1.id)
-                    client.delete_run(run_2.id)
+                    client.delete_folder(
+                        f"/simvue_unit_testing/{_uuid}",
+                        remove_runs=True,
+                        recursive=True
+                    )


 @pytest.mark.run
+@pytest.mark.scenario
 def test_runs_multiple_series(request: pytest.FixtureRequest) -> None:
     N_RUNS: int = 2

     metrics = []
     run_ids = []
+    _uuid = f"{uuid.uuid4()}".split("-")[0]
     for index in range(N_RUNS):
         with sv_run.Run() as run:
             run_metrics = []
             run.config(suppress_errors=False)
             run.init(
-                name=f"test_runs_multiple_series_{index}",
+                request.node.name.replace("[", "_").replace("]", "_"),
                 tags=[
                     "simvue_client_unit_tests",
-                    request.node.name.replace("[", "_").replace("]", "_"),
+                    "test_runs_multiple_series"
                 ],
-                folder="/simvue_unit_testing",
-                retention_period="1 hour",
+                folder=f"/simvue_unit_testing/{_uuid}",
+                retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
+                visibility="tenant" if os.environ.get("CI") else None,
             )
             run_ids.append(run.id)
             for _ in range(10):
@@ -578,8 +624,6 @@ def test_runs_multiple_series(request: pytest.FixtureRequest) -> None:
                 run.log_metrics(metric)
             metrics.append(run_metrics)

-    time.sleep(1)
-
     client = sv_cl.Client()

     for i, run_id in enumerate(run_ids):
@@ -592,9 +636,12 @@ def test_runs_multiple_series(request: pytest.FixtureRequest) -> None:
             aggregate=False,
         )

-    with contextlib.suppress(RuntimeError):
-        for run_id in run_ids:
-            client.delete_run(run_id)
+    with contextlib.suppress(ObjectNotFoundError):
+        client.delete_folder(
+            f"/simvue_unit_testing/{_uuid}",
+            recursive=True,
+            remove_runs=True
+        )


 @pytest.mark.run
@@ -604,6 +651,7 @@ def test_suppressed_errors(
 ) -> None:
     logging.getLogger("simvue").setLevel(logging.DEBUG)
     setup_logging.captures = ["Skipping call to"]
+    _uuid = f"{uuid.uuid4()}".split("-")[0]

     with sv_run.Run(mode="offline") as run:
         decorated_funcs = [
@@ -615,13 +663,14 @@ def test_suppressed_errors(
         if post_init:
             decorated_funcs.remove("init")
             run.init(
-                name="test_suppressed_errors",
-                folder="/simvue_unit_testing",
+                request.node.name.replace("[", "_").replace("]", "_"),
+                folder=f"/simvue_unit_testing/{_uuid}",
                 tags=[
                     "simvue_client_unit_tests",
-                    request.node.name.replace("[", "_").replace("]", "_"),
+                    "test_suppressed_errors"
                 ],
-                retention_period="1 hour",
+                retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
+                visibility="tenant" if os.environ.get("CI") else None,
             )

         run.config(suppress_errors=True)
@@ -635,6 +684,14 @@ def test_suppressed_errors(
     else:
         assert setup_logging.counts[0] == len(decorated_funcs)

+    with contextlib.suppress(ObjectNotFoundError):
+        client = sv_cl.Client()
+        client.delete_folder(
+            f"/simvue_unit_testing/{_uuid}",
+            recursive=True,
+            remove_runs=True
+        )
+

 @pytest.mark.run
 def test_bad_run_arguments() -> None:
@@ -645,14 +702,20 @@ def test_bad_run_arguments() -> None:


 @pytest.mark.run
 def test_set_folder_details(request: pytest.FixtureRequest) -> None:
+    _uuid = f"{uuid.uuid4()}".split("-")[0]
     with sv_run.Run() as run:
-        folder_name: str = "/simvue_unit_tests"
+        folder_name: str = f"/simvue_unit_testing/{_uuid}"
         description: str = "test description"
         tags: list[str] = [
             "simvue_client_unit_tests",
-            request.node.name.replace("[", "_").replace("]", "_"),
+            "test_set_folder_details"
         ]
-        run.init(folder=folder_name)
+        run.init(
+            request.node.name.replace("[", "_").replace("]", "_"),
+            folder=folder_name,
+            visibility="tenant" if os.environ.get("CI") else None,
+            retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"),
+        )
         run.set_folder_details(tags=tags, description=description)

     client = sv_cl.Client()
@@ -663,6 +726,13 @@ def test_set_folder_details(request: pytest.FixtureRequest) -> None:

     assert _folder.description == description

+    with contextlib.suppress(ObjectNotFoundError):
+        client.delete_folder(
+            f"/simvue_unit_testing/{_uuid}",
+            remove_runs=True,
+            recursive=True
+        )
+

 @pytest.mark.run
 @pytest.mark.parametrize(
@@ -772,8 +842,6 @@ def test_save_file_offline(
         name=name,
     )
     sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
-    simvue_run.close()
-    time.sleep(1.0)
     os.remove(out_name)
     client = sv_cl.Client()
     base_name = name or out_name.name
@@ -806,14 +874,12 @@ def test_update_tags_running(

     simvue_run.set_tags(tags)

-    time.sleep(1)
     client = sv_cl.Client()
     run_data = client.get_run(simvue_run.id)
     assert sorted(run_data.tags) == sorted(tags)

     simvue_run.update_tags(["additional"])

-    time.sleep(1)
     run_data = client.get_run(simvue_run.id)
     assert sorted(run_data.tags) == sorted(tags + ["additional"])

@@ -832,14 +898,12 @@ def test_update_tags_created(

     simvue_run.set_tags(tags)

-    time.sleep(1)
     client = sv_cl.Client()
     run_data = client.get_run(simvue_run.id)
     assert sorted(run_data.tags) == sorted(tags)

     simvue_run.update_tags(["additional"])

-    time.sleep(1)
     run_data = client.get_run(simvue_run.id)
     assert sorted(run_data.tags) == sorted(tags + ["additional"])

@@ -861,14 +925,10 @@ def test_update_tags_offline(
     simvue_run.update_tags(["additional"])

     sv_send.sender(os.environ["SIMVUE_OFFLINE_DIRECTORY"], 2, 10)
-    simvue_run.close()
-    time.sleep(1.0)

     client = sv_cl.Client()
     run_data = client.get_run(client.get_run_id_from_name(run_name))

-    time.sleep(1)
-    run_data = client.get_run(simvue_run.id)
     assert sorted(run_data.tags) == sorted(["simvue_client_unit_tests", "additional"])

@@ -901,10 +961,10 @@ def test_add_alerts() -> None:
     run = sv_run.Run()
     run.init(
name="test_add_alerts", - folder="/simvue_unit_tests", - retention_period="1 min", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), tags=["test_add_alerts"], - visibility="tenant", + visibility="tenant" if os.environ.get("CI") else None, ) _expected_alerts = [] @@ -956,7 +1016,6 @@ def test_add_alerts() -> None: f"metric_threshold_alert_{_uuid}", ] ) - time.sleep(1) # Check that there is no duplication _online_run.refresh() @@ -964,7 +1023,6 @@ def test_add_alerts() -> None: # Create another run without adding to run _id = run.create_user_alert(name=f"user_alert_{_uuid}", attach_to_run=False) - time.sleep(1) # Check alert is not added _online_run.refresh() @@ -973,7 +1031,6 @@ def test_add_alerts() -> None: # Try adding alerts with IDs, check there is no duplication _expected_alerts.append(_id) run.add_alerts(ids=_expected_alerts) - time.sleep(1) _online_run.refresh() assert sorted(_online_run.alerts) == sorted(_expected_alerts) @@ -981,7 +1038,12 @@ def test_add_alerts() -> None: run.close() client = sv_cl.Client() - client.delete_run(run.id) + with contextlib.suppress(ObjectNotFoundError): + client.delete_folder( + f"/simvue_unit_testing/{_uuid}", + remove_runs=True, + recursive=True + ) for _id in _expected_alerts: client.delete_alert(_id) @@ -993,10 +1055,10 @@ def test_log_alert() -> None: run = sv_run.Run() run.init( name="test_log_alerts", - folder="/simvue_unit_tests", - retention_period="1 min", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), tags=["test_add_alerts"], - visibility="tenant", + visibility="tenant" if os.environ.get("CI") else None, ) _run_id = run.id # Create a user alert @@ -1006,7 +1068,6 @@ def test_log_alert() -> None: # Set alert state to critical by name run.log_alert(name=f"user_alert_{_uuid}", state="critical") - time.sleep(1) client = sv_cl.Client() _alert = client.get_alerts(run_id=_run_id, critical_only=False, names_only=False)[0] @@ -1014,7 +1075,6 @@ def test_log_alert() -> None: # Set alert state to OK by ID run.log_alert(identifier=_id, state="ok") - time.sleep(2) _alert.refresh() assert _alert.get_status(_run_id) == "ok" @@ -1029,6 +1089,14 @@ def test_log_alert() -> None: run.log_alert(identifier="myid", name="myname", state="critical") assert "Please specify alert to update either by ID or by name." 
in str(e.value) + with contextlib.suppress(ObjectNotFoundError): + client.delete_folder( + f"/simvue_unit_testing/{_uuid}", + remove_runs=True, + recursive=True + ) + client.delete_alert(_id) + @pytest.mark.run def test_abort_on_alert_process(mocker: pytest_mock.MockerFixture) -> None: @@ -1040,13 +1108,14 @@ def testing_exit(status: int) -> None: def abort_callback(abort_run=trigger) -> None: trigger.set() + _uuid = f"{uuid.uuid4()}".split("-")[0] run = sv_run.Run(abort_callback=abort_callback) run.init( name="test_abort_on_alert_process", - folder="/simvue_unit_tests", - retention_period="1 min", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), tags=["test_abort_on_alert_process"], - visibility="tenant", + visibility="tenant" if os.environ.get("CI") else None, ) mocker.patch("os._exit", testing_exit) @@ -1073,6 +1142,13 @@ def abort_callback(abort_run=trigger) -> None: run.kill_all_processes() raise AssertionError("Run was not terminated") assert trigger.is_set() + run.close() + with contextlib.suppress(ObjectNotFoundError): + client.delete_folder( + f"/simvue_unit_testing/{_uuid}", + remove_runs=True, + recursive=True + ) @pytest.mark.run @@ -1084,8 +1160,15 @@ def test_abort_on_alert_python( run, _ = create_plain_run client = sv_cl.Client() client.abort_run(run.id, reason="Test abort") - time.sleep(2) - assert run._status == "terminated" + + attempts: int = 0 + + while run._status == "terminated" and attemps < 5: + time.sleep(1) + attempts += 1 + + if attempts >= 5: + raise AssertionError("Failed to terminate run") @pytest.mark.run @@ -1099,9 +1182,7 @@ def test_abort_on_alert_raise( run._testing = True alert_id = run.create_user_alert("abort_test", trigger_abort=True) run.add_process(identifier="forever_long", executable="bash", c="sleep 10") - time.sleep(2) run.log_alert(identifier=alert_id, state="critical") - time.sleep(1) _alert = Alert(identifier=alert_id) assert _alert.get_status(run.id) == "critical" counter = 0 @@ -1123,9 +1204,7 @@ def test_kill_all_processes(create_plain_run: tuple[sv_run.Run, dict]) -> None: processes = [ psutil.Process(process.pid) for process in run._executor._processes.values() ] - time.sleep(2) run.kill_all_processes() - time.sleep(4) for process in processes: assert not process.is_running() assert all(not child.is_running() for child in process.children(recursive=True)) @@ -1133,21 +1212,30 @@ def test_kill_all_processes(create_plain_run: tuple[sv_run.Run, dict]) -> None: @pytest.mark.run def test_run_created_with_no_timeout() -> None: + _uuid = f"{uuid.uuid4()}".split("-")[0] with simvue.Run() as run: run.init( name="test_run_created_with_no_timeout", - folder="/simvue_unit_testing", - retention_period="2 minutes", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), timeout=None, + visibility="tenant" if os.environ.get("CI") else None, ) client = simvue.Client() assert client.get_run(run.id) + with contextlib.suppress(ObjectNotFoundError): + client.delete_folder( + f"/simvue_unit_testing/{_uuid}", + remove_runs=True, + recursive=True + ) @pytest.mark.parametrize("mode", ("online", "offline"), ids=("online", "offline")) @pytest.mark.run def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None: temp_d: tempfile.TemporaryDirectory | None = None + _uuid = f"{uuid.uuid4()}".split("-")[0] if mode == "offline": temp_d = tempfile.TemporaryDirectory() @@ -1156,8 +1244,8 @@ def 
test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None: with simvue.Run(mode=mode) as run: run.init( name="test_reconnect", - folder="/simvue_unit_testing", - retention_period="2 minutes", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), timeout=None, running=False, ) @@ -1169,7 +1257,6 @@ def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None: client = simvue.Client() _created_run = client.get_run(run_id) assert _created_run.status == "created" - time.sleep(1) with simvue.Run() as run: run.reconnect(run_id) @@ -1188,10 +1275,16 @@ def test_reconnect_functionality(mode, monkeypatch: pytest.MonkeyPatch) -> None: temp_d.cleanup() -def test_reconnect_with_process(create_plain_run: tuple[sv_run.Run, dict]) -> None: - run, _ = create_plain_run - run.init(name="test_reconnect_with_process", folder="/simvue_unit_testing", retention_period="2 minutes", running=False) - run.close() +def test_reconnect_with_process() -> None: + _uuid = f"{uuid.uuid4()}".split("-")[0] + with simvue.Run() as run: + run.init( + name="test_reconnect_with_process", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period=os.environ.get("SIMVUE_TESTING_RETENTION_PERIOD", "2 mins"), + running=False, + visibility="tenant" if os.environ.get("CI") else None, + ) with sv_run.Run() as new_run: new_run.reconnect(run.id) @@ -1200,3 +1293,12 @@ def test_reconnect_with_process(create_plain_run: tuple[sv_run.Run, dict]) -> No executable="bash", c="echo 'Hello World!'", ) + + client = sv_cl.Client() + + with contextlib.suppress(ObjectNotFoundError): + client.delete_folder( + f"/simvue_unit_testing/{_uuid}", + remove_runs=True, + recursive=True + ) diff --git a/tests/functional/test_run_execute_process.py b/tests/functional/test_run_execute_process.py index 6c071809..17add1de 100644 --- a/tests/functional/test_run_execute_process.py +++ b/tests/functional/test_run_execute_process.py @@ -1,3 +1,4 @@ +import pathlib import time import os import sys @@ -23,25 +24,48 @@ def test_monitor_processes(create_plain_run_offline: tuple[Run, dict]): @pytest.mark.executor def test_abort_all_processes(create_plain_run: tuple[Run, dict]) -> None: _run, _ = create_plain_run - start_time = time.time() - with tempfile.NamedTemporaryFile(suffix=".py") as temp_f: + with tempfile.NamedTemporaryFile(suffix=".sh") as temp_f: with open(temp_f.name, "w") as out_f: out_f.writelines([ - "import time\n", - "while True:\n" - " time.sleep(5)\n" + "for i in {0..20}; do\n", + " echo $i\n", + " sleep 1\n", + "done\n" ]) for i in range(1, 3): - _run.add_process(f"process_{i}", executable="python", script=temp_f.name) - assert _run.executor.get_command(f"process_{i}") == f"python {temp_f.name}" + _run.add_process(f"process_{i}", executable="bash", script=temp_f.name) + assert _run.executor.get_command(f"process_{i}") == f"bash {temp_f.name}" + time.sleep(3) _run.kill_all_processes() - end_time = time.time() - assert end_time - start_time < 10, f"{end_time - start_time} >= 10" + # Check that for when one of the processes has stopped + _attempts: int = 0 + _first_out = next(pathlib.Path.cwd().glob("*process_*.out")) + + while _first_out.stat().st_size == 0 and _attempts < 10: + time.sleep(1) + _attempts += 1 + + if _attempts >= 10: + raise AssertionError("Failed to terminate processes") + + # Check the Python process did not error + _out_err = pathlib.Path.cwd().glob("*process_*.err") + for file in _out_err: + with file.open() as in_f: + assert not 
in_f.readlines() + + # Now check the counter in the process was terminated + # just beyond the sleep time + _out_files = pathlib.Path.cwd().glob("*process_*.out") + for file in _out_files: + with file.open() as in_f: + assert (lines := in_f.readlines()) + assert int(lines[0].strip()) < 4 def test_processes_cwd(create_plain_run: dict[Run, dict]) -> None: diff --git a/tests/functional/test_scenarios.py b/tests/functional/test_scenarios.py index de6a1ea7..aa0d6a05 100644 --- a/tests/functional/test_scenarios.py +++ b/tests/functional/test_scenarios.py @@ -1,4 +1,5 @@ import pathlib +import uuid import pytest import simvue import time @@ -12,36 +13,44 @@ @pytest.mark.scenario -@pytest.mark.parametrize( - "file_size", (1, 10, 100) -) -def test_large_file_upload(file_size: int, create_plain_run: tuple[simvue.Run, dict]) -> None: +@pytest.mark.parametrize("file_size", (1, 10, 100)) +def test_large_file_upload( + file_size: int, create_plain_run: tuple[simvue.Run, dict] +) -> None: FILE_SIZE_MB: int = file_size - run, _ = create_plain_run - run.update_metadata({"file_size_mb": file_size}) _file = None _temp_file_name = None + run = simvue.Run() + _uuid = f"{uuid.uuid4()}".split("-")[0] + run.init( + "test_large_file_artifact", + folder=f"/simvue_unit_testing/{_uuid}", + retention_period="20 mins", + tags=["test_large_file_artifact"], + ) + run.update_metadata({"file_size_mb": file_size}) try: with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as temp_f: temp_f.seek(FILE_SIZE_MB * 1024 * 1024 - 1) - temp_f.write(b'\0') + temp_f.write(b"\0") temp_f.flush() temp_f.seek(0) temp_f.close() _temp_file_name = temp_f.name _input_file_size = pathlib.Path(f"{_temp_file_name}").stat().st_size - run.save_file(file_path=f"{temp_f.name}", category="output", name="test_large_file_artifact") - + run.save_file( + file_path=f"{temp_f.name}", + category="output", + name="test_large_file_artifact", + ) run.close() client = simvue.Client() with tempfile.TemporaryDirectory() as tempd: client.get_artifact_as_file( - run_id=run.id, - name="test_large_file_artifact", - output_dir=tempd + run_id=run.id, name="test_large_file_artifact", output_dir=tempd ) _file = next(pathlib.Path(tempd).glob("*")) @@ -72,7 +81,7 @@ def test_time_multi_run_create_threshold() -> None: f"test run {i}", tags=["test_benchmarking"], folder="/simvue_benchmark_testing", - retention_period="1 hour" + retention_period="1 hour", ) runs.append(run) for run in runs: @@ -100,6 +109,7 @@ def delete_run(): request.addfinalizer(delete_run) return ident_dict + def upload(name: str, values_per_run: int, shared_dict) -> None: run = simvue.Run() run.init(name=name, tags=["simvue_client_tests"]) @@ -108,6 +118,7 @@ def upload(name: str, values_per_run: int, shared_dict) -> None: run.log_metrics({"increment": i}) run.close() + @pytest.mark.scenario @pytest.mark.parametrize("values_per_run", (1, 2, 100, 1500)) @pytest.mark.parametrize("processing", ("local", "on_thread", "on_process")) @@ -123,7 +134,10 @@ def test_uploaded_data_immediately_accessible( else: if processing == "on_thread": thread = threading.Thread( - target=upload, args=(name, values_per_run, shared_dict) + target=upload, + args=(name, values_per_run, shared_dict), + daemon=True, + name=f"{name}_Thread", ) else: thread = Process(target=upload, args=(name, values_per_run, shared_dict)) @@ -133,10 +147,16 @@ def test_uploaded_data_immediately_accessible( run_deleter["ident"] = shared_dict["ident"] values = simvue.Client().get_metric_values( - ["increment"], "step", 
run_ids=[shared_dict["ident"]], max_points=2 * values_per_run, aggregate=False + ["increment"], + "step", + run_ids=[shared_dict["ident"]], + max_points=2 * values_per_run, + aggregate=False, )["increment"] assert len(values) == values_per_run, "all uploaded values should be returned" for i in range(len(values)): - assert i == int(values[(i, shared_dict["ident"])]), "values should be ascending ints" + assert i == int(values[(i, shared_dict["ident"])]), ( + "values should be ascending ints" + )