diff --git a/.env-devel b/.env-devel index d3afee65bd49..d2abb6207717 100644 --- a/.env-devel +++ b/.env-devel @@ -244,6 +244,7 @@ RESOURCE_USAGE_TRACKER_TRACING={} R_CLONE_OPTION_BUFFER_SIZE=16M R_CLONE_OPTION_RETRIES=3 R_CLONE_OPTION_TRANSFERS=5 +R_CLONE_MOUNT_SETTINGS={} R_CLONE_PROVIDER=MINIO # simcore-user used in docker images diff --git a/.github/workflows/ci-testing-deploy.yml b/.github/workflows/ci-testing-deploy.yml index 8a30f7af4ca0..3ae7966eacaa 100644 --- a/.github/workflows/ci-testing-deploy.yml +++ b/.github/workflows/ci-testing-deploy.yml @@ -1551,8 +1551,6 @@ jobs: with: python-version: ${{ matrix.python }} cache-dependency-glob: "**/director-v2/requirements/ci.txt" - - name: setup rclone docker volume plugin - run: sudo ./ci/github/helpers/install_rclone_docker_volume_plugin.bash - name: Download and load Docker images uses: ./.github/actions/download-load-docker-images with: diff --git a/ci/github/helpers/install_rclone_docker_volume_plugin.bash b/ci/github/helpers/install_rclone_docker_volume_plugin.bash deleted file mode 100755 index 1f0e54658fbc..000000000000 --- a/ci/github/helpers/install_rclone_docker_volume_plugin.bash +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash -# -# Installs the latest version of rclone plugin -# - -# http://redsymbol.net/articles/unofficial-bash-strict-mode/ -set -o errexit # abort on nonzero exitstatus -set -o nounset # abort on unbound variable -set -o pipefail # don't hide errors within pipes -IFS=$'\n\t' - -# Installation instructions from https://rclone.org/docker/ -R_CLONE_VERSION="1.66.0" -mkdir --parents /var/lib/docker-plugins/rclone/config -mkdir --parents /var/lib/docker-plugins/rclone/cache -docker plugin install rclone/docker-volume-rclone:amd64-${R_CLONE_VERSION} args="-v" --alias rclone --grant-all-permissions -docker plugin list -docker plugin inspect rclone diff --git a/packages/postgres-database/src/simcore_postgres_database/migration/versions/e4db35fe8054_added_use_r_clone_mounting_field.py 
"""added use_r_clone_mounting field

Revision ID: e4db35fe8054
Revises: ce69cc44246a
Create Date: 2025-12-16 11:43:36.941571+00:00

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "e4db35fe8054"
down_revision = "ce69cc44246a"
branch_labels = None
depends_on = None


def upgrade():
    """Add boolean column `use_r_clone_mounting` (server default: false).

    Existing rows are backfilled to ``false`` via the server default, so the
    column can be NOT NULL from the start.
    """
    op.add_column(
        "groups_extra_properties",
        sa.Column(
            "use_r_clone_mounting",
            sa.Boolean(),
            server_default=sa.text("false"),
            nullable=False,
        ),
    )


def downgrade():
    """Drop the `use_r_clone_mounting` column again."""
    op.drop_column("groups_extra_properties", "use_r_clone_mounting")
@asynccontextmanager
async def config_file(config: str) -> AsyncIterator[str]:
    """Persist *config* into a temporary file and yield the file's path.

    The file is flushed before the path is yielded so readers see the full
    content; it is removed automatically when the context exits.
    """
    async with tempfile.NamedTemporaryFile("w") as tmp_file:
        await tmp_file.write(config)
        await tmp_file.flush()
        # aiofiles may type `name` loosely; downstream callers need a str path
        assert isinstance(tmp_file.name, str)  # nosec
        yield tmp_file.name
class RCloneMountSettings(BaseCustomSettings):
    """Settings for the rclone ``mount`` sidecar: container resources plus the
    CLI flags passed to the ``rclone mount`` command.

    Fix: ``R_CLONE_MOUNT_TRANSFERS`` now uses the module-level
    ``_TRANSFER_COUNT`` constant (same value, 15) instead of a duplicated
    literal — consistent with how ``_TPSLIMIT`` is used below.
    """

    R_CLONE_MOUNT_TRANSFERS_COMPLETED_TIMEOUT: Annotated[
        timedelta,
        Field(
            description="max amount of time to wait for rclone mount command to finish",
        ),
    ] = timedelta(minutes=60)

    _validate_r_clone_mount_transfers_completed_timeout = (
        validate_numeric_string_as_timedelta(
            "R_CLONE_MOUNT_TRANSFERS_COMPLETED_TIMEOUT"
        )
    )

    # CONTAINER
    R_CLONE_MOUNT_CONTAINER_CONFIG_FILE_PATH: Annotated[
        Path,
        Field(
            description="path inside the container where the rclone config file is located",
        ),
    ] = Path(
        "/tmp/rclone.conf"  # noqa: S108
    )

    R_CLONE_MOUNT_CONTAINER_SHOW_DEBUG_LOGS: Annotated[
        bool,
        Field(
            description="whether to enable debug logs for the rclone mount command",
        ),
    ] = False

    R_CLONE_MOUNT_CONTAINER_MEMORY_LIMIT: Annotated[
        ByteSize, Field(description="memory limit for the rclone mount container")
    ] = TypeAdapter(ByteSize).validate_python("2GiB")

    R_CLONE_MOUNT_CONTAINER_NANO_CPUS: Annotated[
        NonNegativeInt, Field(description="CPU limit for the rclone mount container")
    ] = (1 * _ONE_NANO_CPU)

    # CLI command `rclone mount`

    R_CLONE_MOUNT_VFS_CACHE_PATH: Annotated[
        Path,
        Field(
            description="`--cache-dir X`: sets the path to use for vfs cache",
        ),
    ] = DEFAULT_VFS_CACHE_PATH

    R_CLONE_MOUNT_VFS_READ_AHEAD: Annotated[
        str,
        Field(
            description="`--vfs-read-ahead X`: sets the read ahead buffer size",
        ),
    ] = "16M"

    R_CLONE_MOUNT_VFS_CACHE_MAX_SIZE: Annotated[
        str,
        Field(
            description="`--vfs-cache-max-size X`: sets the maximum size of the vfs cache",
        ),
    ] = DEFAULT_VFS_CACHE_MAX_SIZE

    R_CLONE_MOUNT_VFS_CACHE_MIN_FREE_SPACE: Annotated[
        str,
        Field(
            description="`--vfs-cache-min-free-space X`: sets the minimum free space to keep on disk",
        ),
    ] = "5G"

    R_CLONE_MOUNT_CACHE_POLL_INTERVAL: Annotated[
        str,
        Field(
            description="`--vfs-cache-poll-interval X`: sets the interval to poll the vfs cache",
        ),
    ] = "1m"

    R_CLONE_MOUNT_VFS_WRITE_BACK: Annotated[
        str,
        Field(
            description="`--vfs-write-back X`: sets the time to wait before writing back data to the remote",
        ),
    ] = "10s"

    R_CLONE_MOUNT_DIR_CACHE_TIME: Annotated[
        str,
        Field(
            description="`--dir-cache-time X`: time before directory is uploaded from remote if changed",
        ),
    ] = "10m"

    R_CLONE_MOUNT_ATTR_TIMEOUT: Annotated[
        str,
        Field(
            description="`--attr-timeout X`: sets the time to cache file attributes",
        ),
    ] = "1m"

    R_CLONE_MOUNT_TPSLIMIT: Annotated[
        NonNegativeInt,
        Field(
            description="`--tpslimit X`: sets the transactions per second limit",
        ),
    ] = _TPSLIMIT
    R_CLONE_MOUNT_TPSLIMIT_BURST: Annotated[
        NonNegativeInt,
        Field(
            description="`--tpslimit-burst X`: sets the burst limit for transactions per second",
        ),
    ] = (
        _TPSLIMIT * 2
    )

    R_CLONE_MOUNT_MAX_BUFFER_MEMORY: Annotated[
        str,
        Field(
            description="`--max-buffer-memory X`: sets the maximum buffer memory for rclone",
        ),
    ] = "16M"

    R_CLONE_MOUNT_RETRIES: Annotated[
        NonNegativeInt,
        Field(
            description="`--retries X`: sets the number of retries for rclone mount command",
        ),
    ] = 3

    R_CLONE_MOUNT_RETRIES_SLEEP: Annotated[
        str,
        Field(
            description="`--retries-sleep X`: sets the maximum sleep time between retries",
        ),
    ] = "30s"
    R_CLONE_MOUNT_TRANSFERS: Annotated[
        NonNegativeInt,
        Field(
            description="`--transfers X`: sets the number of parallel transfers for rclone mount command",
        ),
    ] = _TRANSFER_COUNT
    R_CLONE_MOUNT_BUFFER_SIZE: Annotated[
        str,
        Field(
            description="`--buffer-size X`: sets the buffer size for rclone mount command",
        ),
    ] = "16M"
    R_CLONE_MOUNT_CHECKERS: Annotated[
        NonNegativeInt,
        Field(
            description="`--checkers X`: sets the number of checkers for rclone mount command",
        ),
    ] = 8
    R_CLONE_MOUNT_S3_UPLOAD_CONCURRENCY: Annotated[
        NonNegativeInt,
        Field(
            description="`--s3-upload-concurrency X`: sets the number of concurrent uploads to S3",
        ),
    ] = 5
    R_CLONE_MOUNT_S3_CHUNK_SIZE: Annotated[
        str,
        Field(
            description="`--s3-chunk-size X`: sets the chunk size for S3",
        ),
    ] = "16M"
    R_CLONE_MOUNT_ORDER_BY: Annotated[
        str,
        Field(
            description="`--order-by X`: sets the order of file upload, e.g., 'size,mixed'",
        ),
    ] = "size,mixed"
def format_config(config_key: str, settings_options: dict[str, str]) -> str:
    """Render a single-section rclone ``.ini`` configuration as a string.

    `config_key` becomes the section name and `settings_options` its
    key/value entries.
    """
    parser = configparser.ConfigParser()
    parser[config_key] = settings_options
    buffer = StringIO()
    parser.write(buffer)
    return buffer.getvalue()
a/packages/settings-library/tests/test_utils_r_clone.py b/packages/settings-library/tests/test_utils_r_clone.py index 82dabf47daf8..a1a0be0207ab 100644 --- a/packages/settings-library/tests/test_utils_r_clone.py +++ b/packages/settings-library/tests/test_utils_r_clone.py @@ -5,7 +5,7 @@ from settings_library.r_clone import RCloneSettings, S3Provider from settings_library.utils_r_clone import ( _COMMON_SETTINGS_OPTIONS, - get_r_clone_config, + get_s3_r_clone_config, resolve_provider, ) @@ -24,7 +24,7 @@ def r_clone_settings( def test_r_clone_config_template_replacement(r_clone_settings: RCloneSettings) -> None: - r_clone_config = get_r_clone_config(r_clone_settings, s3_config_key="target-s3") + r_clone_config = get_s3_r_clone_config(r_clone_settings, s3_config_key="target-s3") print(r_clone_config) assert "{endpoint}" not in r_clone_config diff --git a/packages/simcore-sdk/requirements/_base.in b/packages/simcore-sdk/requirements/_base.in index 4ce6caec6571..ba257f7663d0 100644 --- a/packages/simcore-sdk/requirements/_base.in +++ b/packages/simcore-sdk/requirements/_base.in @@ -11,6 +11,7 @@ aiocache +aiodocker aiofiles aiohttp httpx diff --git a/packages/simcore-sdk/requirements/_base.txt b/packages/simcore-sdk/requirements/_base.txt index c161d302f34b..7b2c72c20900 100644 --- a/packages/simcore-sdk/requirements/_base.txt +++ b/packages/simcore-sdk/requirements/_base.txt @@ -7,7 +7,9 @@ aiocache==0.12.3 aiodebug==2.3.0 # via -r requirements/../../../packages/service-library/requirements/_base.in aiodocker==0.24.0 - # via -r requirements/../../../packages/service-library/requirements/_base.in + # via + # -r requirements/../../../packages/service-library/requirements/_base.in + # -r requirements/_base.in aiofiles==24.1.0 # via # -r requirements/../../../packages/service-library/requirements/_base.in diff --git a/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py b/packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py index 
def __create_s3_object_key(
    project_id: ProjectID, node_id: NodeID, file_path: Path | str
) -> StorageFileID:
    """Compose the S3 object key ``{project_id}/{node_id}/{file_name}``.

    When *file_path* is a ``Path`` only its final component is used; a plain
    string is taken as the file name verbatim.
    """
    file_name = file_path if isinstance(file_path, str) else file_path.name
    return TypeAdapter(StorageFileID).validate_python(
        f"{project_id}/{node_id}/{file_name}"
    )
async def _requires_r_clone_mounting(
    application_name: str, user_id: UserID, product_name: ProductName
) -> bool:
    """True when the user's aggregated group extra-properties enable rclone mounting."""
    db_manager = DBManager(application_name=application_name)
    extra_properties = await db_manager.get_group_extra_properties(
        user_id=user_id, product_name=product_name
    )
    return extra_properties.use_r_clone_mounting is True
user_id=user_id, s3_object=s3_object, store_id=SIMCORE_LOCATION + ) + + await mount_manager.ensure_mounted( + destination_path, + index, + node_id=node_id, + remote_type=MountRemoteType.S3, + remote_path=s3_object, + handler_get_bind_paths=handler_get_bind_paths, + handler_mount_activity=handler_mount_activity, + ) + + +async def pull( # pylint: disable=too-many-arguments # noqa: PLR0913 + product_name: ProductName, user_id: UserID, project_id: ProjectID, node_uuid: NodeID, destination_path: Path, + index: NonNegativeInt, *, io_log_redirect_cb: LogRedirectCB, r_clone_settings: RCloneSettings, progress_bar: ProgressBarData, legacy_state: LegacyState | None, + application_name: str, + mount_manager: RCloneMountManager, + handler_get_bind_paths: GetBindPathsProtocol, + handler_mount_activity: MountActivityProtocol, ) -> None: """restores the state folder""" + use_r_clone_mount = await _requires_r_clone_mounting( + application_name, user_id, product_name + ) + if legacy_state and legacy_state.new_state_path == destination_path: _logger.info( "trying to restore from legacy_state=%s, destination_path=%s", @@ -286,6 +355,17 @@ async def pull( progress_bar=progress_bar, legacy_destination_path=legacy_state.old_state_path, ) + await _start_mount_if_required( + mount_manager, + user_id, + project_id, + node_uuid, + destination_path, + index, + handler_get_bind_paths, + handler_mount_activity, + use_r_clone_mount=use_r_clone_mount, + ) return state_archive_exists = await _state_metadata_entry_exists( @@ -305,6 +385,17 @@ async def pull( io_log_redirect_cb=io_log_redirect_cb, progress_bar=progress_bar, ) + await _start_mount_if_required( + mount_manager, + user_id, + project_id, + node_uuid, + destination_path, + index, + handler_get_bind_paths, + handler_mount_activity, + use_r_clone_mount=use_r_clone_mount, + ) return state_directory_exists = await _state_metadata_entry_exists( @@ -315,15 +406,40 @@ async def pull( is_archive=False, ) if state_directory_exists: - await 
_pull_directory( - user_id=user_id, - project_id=project_id, - node_uuid=node_uuid, - destination_path=destination_path, - io_log_redirect_cb=io_log_redirect_cb, - r_clone_settings=r_clone_settings, - progress_bar=progress_bar, - ) + if use_r_clone_mount: + await _start_mount_if_required( + mount_manager, + user_id, + project_id, + node_uuid, + destination_path, + index, + handler_get_bind_paths, + handler_mount_activity, + use_r_clone_mount=use_r_clone_mount, + ) + else: + await _pull_directory( + user_id=user_id, + project_id=project_id, + node_uuid=node_uuid, + destination_path=destination_path, + io_log_redirect_cb=io_log_redirect_cb, + r_clone_settings=r_clone_settings, + progress_bar=progress_bar, + ) + return + await _start_mount_if_required( + mount_manager, + user_id, + project_id, + node_uuid, + destination_path, + index, + handler_get_bind_paths, + handler_mount_activity, + use_r_clone_mount=use_r_clone_mount, + ) _logger.debug("No content previously saved for '%s'", destination_path) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py index 21c0f0173b91..d74656d63eb2 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/dbmanager.py @@ -2,6 +2,7 @@ import sqlalchemy as sa from common_library.json_serialization import json_dumps, json_loads +from models_library.products import ProductName from models_library.projects import ProjectID from models_library.users import UserID from pydantic import TypeAdapter @@ -13,6 +14,10 @@ update_for_run_id_and_node_id, ) from simcore_postgres_database.utils_comp_runs import get_latest_run_id_for_project +from simcore_postgres_database.utils_groups_extra_properties import ( + GroupExtraProperties, + GroupExtraPropertiesRepo, +) from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine from .exceptions import NodeNotFound, 
async def create_r_clone_mounted_directory_entry(
    *,
    user_id: UserID,
    s3_object: StorageFileID,
    store_id: LocationID | None,
) -> None:
    """Register an empty directory entry in storage for an rclone mount.

    Requests S3 upload links for a zero-byte directory entry and immediately
    completes the upload with no parts, so the directory exists in storage
    before rclone mounts it.
    """
    _, directory_upload_links = await get_upload_links_from_s3(
        user_id=user_id,
        store_name=None,
        store_id=store_id,
        s3_object=s3_object,
        client_session=None,
        link_type=LinkType.S3,
        file_size=ByteSize(0),
        is_directory=True,
        sha256_checksum=None,
    )
    async with ClientSessionContextManager(None) as session:
        # complete with an empty parts list: the entry is a directory marker
        await _filemanager_utils.complete_upload(
            session,
            directory_upload_links.links.complete_upload,
            [],
            is_directory=True,
        )
b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone.py @@ -3,19 +3,17 @@ import re import shlex from asyncio.streams import StreamReader -from collections.abc import AsyncIterator -from contextlib import asynccontextmanager from pathlib import Path from typing import Final from aiocache import cached # type: ignore[import-untyped] -from aiofiles import tempfile from common_library.errors_classes import OsparcErrorMixin from pydantic import AnyUrl, BaseModel, ByteSize from servicelib.progress_bar import ProgressBarData +from servicelib.r_clone_utils import config_file from servicelib.utils import logged_gather from settings_library.r_clone import RCloneSettings -from settings_library.utils_r_clone import get_r_clone_config +from settings_library.utils_r_clone import get_s3_r_clone_config from ._utils import BaseLogParser from .r_clone_utils import ( @@ -45,15 +43,6 @@ class RCloneDirectoryNotFoundError(BaseRCloneError): ) -@asynccontextmanager -async def _config_file(config: str) -> AsyncIterator[str]: - async with tempfile.NamedTemporaryFile("w") as f: - await f.write(config) - await f.flush() - assert isinstance(f.name, str) # nosec - yield f.name - - async def _read_stream(stream: StreamReader, r_clone_log_parsers: list[BaseLogParser]): while True: line: bytes = await stream.readline() @@ -92,7 +81,6 @@ async def _async_r_clone_command( [asyncio.create_task(_read_stream(proc.stdout, [*r_clone_log_parsers]))] ) - # NOTE: ANE not sure why you do this call here. The above one already reads out the stream. 
_stdout, _stderr = await proc.communicate() command_output = command_result_parser.get_output() @@ -146,10 +134,10 @@ async def _get_folder_size( folder: Path, s3_config_key: str, ) -> ByteSize: - r_clone_config_file_content = get_r_clone_config( + r_clone_config_file_content = get_s3_r_clone_config( r_clone_settings, s3_config_key=s3_config_key ) - async with _config_file(r_clone_config_file_content) as config_file_name: + async with config_file(r_clone_config_file_content) as config_file_name: r_clone_command = ( "rclone", f"--config {config_file_name}", @@ -190,22 +178,34 @@ async def _sync_sources( s3_config_key=s3_config_key, ) - r_clone_config_file_content = get_r_clone_config( + r_clone_config_file_content = get_s3_r_clone_config( r_clone_settings, s3_config_key=s3_config_key ) - async with _config_file(r_clone_config_file_content) as config_file_name: + async with config_file(r_clone_config_file_content) as config_file_name: r_clone_command = ( "rclone", "--config", config_file_name, "--retries", f"{r_clone_settings.R_CLONE_OPTION_RETRIES}", + "--retries-sleep", + r_clone_settings.R_CLONE_RETRIES_SLEEP, "--transfers", f"{r_clone_settings.R_CLONE_OPTION_TRANSFERS}", # below two options reduce to a minimum the memory footprint # https://forum.rclone.org/t/how-to-set-a-memory-limit/10230/4 "--buffer-size", # docs https://rclone.org/docs/#buffer-size-size r_clone_settings.R_CLONE_OPTION_BUFFER_SIZE, + "--checkers", + f"{r_clone_settings.R_CLONE_CHECKERS}", + "--s3-upload-concurrency", + f"{r_clone_settings.R_CLONE_S3_UPLOAD_CONCURRENCY}", + "--s3-chunk-size", + r_clone_settings.R_CLONE_CHUNK_SIZE, + # handles the order of file upload + "--order-by", + r_clone_settings.R_CLONE_ORDER_BY, + "--fast-list", "--use-json-log", # frequent polling for faster progress updates "--stats", diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/__init__.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/__init__.py new file 
CONFIG_KEY: Final[str] = "MOUNT_REMOTE"


class MountRemoteType(Enum):
    """Kinds of remote backends an rclone mount can target."""

    S3 = auto()
    # NOTE: an OAuth authorization flow still needs to be set up for non-S3 providers


def get_config_content(
    r_clone_settings: RCloneSettings, mount_remote_type: MountRemoteType
) -> str:
    """Return the rclone config-file content for the given remote type.

    Raises NotImplementedError for remote types without a config generator.
    """
    if mount_remote_type is MountRemoteType.S3:
        return get_s3_r_clone_config(r_clone_settings, s3_config_key=CONFIG_KEY)
    msg = f"Mount type {mount_remote_type} not implemented"
    raise NotImplementedError(msg)
b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_container.py @@ -0,0 +1,354 @@ +import asyncio +import logging +from datetime import timedelta +from functools import cached_property +from pathlib import Path +from textwrap import dedent +from typing import Any, Final + +from httpx import AsyncClient, HTTPError +from models_library.basic_types import PortInt +from models_library.progress_bar import ProgressReport +from models_library.projects_nodes_io import NodeID, StorageFileID +from pydantic import NonNegativeInt +from settings_library.r_clone import RCloneMountSettings, RCloneSettings +from tenacity import ( + before_sleep_log, + retry, + retry_if_exception_type, + stop_after_delay, + wait_fixed, +) + +from . import _docker_utils +from ._config_provider import CONFIG_KEY +from ._errors import ( + WaitingForQueueToBeEmptyError, + WaitingForTransfersToCompleteError, +) +from ._models import ( + GetBindPathsProtocol, + MountActivity, +) +from ._utils import get_mount_id + +_logger = logging.getLogger(__name__) + + +_MAX_WAIT_RC_HTTP_INTERFACE_READY: Final[timedelta] = timedelta(seconds=10) +_DEFAULT_UPDATE_INTERVAL: Final[timedelta] = timedelta(seconds=1) +_DEFAULT_R_CLONE_CLIENT_REQUEST_TIMEOUT: Final[timedelta] = timedelta(seconds=20) + + +_DOCKER_PREFIX_MOUNT: Final[str] = "rcm" + + +_R_CLONE_MOUNT_TEMPLATE: Final[str] = dedent( + """ +set -e + +MOUNT_POINT='{local_mount_path}' + +cleanup() {{ + echo 'STARTED CLEANUP...' + umount -f "$MOUNT_POINT" || true + echo 'FINISHED CLEANUP' +}} +trap cleanup SIGTERM SIGINT + +cat <<EOF > {r_clone_config_path} +{r_clone_config_content} +EOF + +echo "Start command: {r_clone_command}" + +{r_clone_command} 2>&1 & + +RCLONE_PID=$! +wait "$RCLONE_PID" +echo "rclone exited, running cleanup (if not already triggered)..." 
+cleanup +""" +) + + +def _get_rclone_mount_command( + mount_settings: RCloneMountSettings, + r_clone_config_content: str, + remote_path: StorageFileID, + local_mount_path: Path, + rc_port: PortInt, + rc_user: str, + rc_password: str, +) -> str: + escaped_remote_path = f"{remote_path}".lstrip("/") + + r_clone_command = " ".join( + [ + "rclone", + "--config", + f"{mount_settings.R_CLONE_MOUNT_CONTAINER_CONFIG_FILE_PATH}", + ("-vv" if mount_settings.R_CLONE_MOUNT_CONTAINER_SHOW_DEBUG_LOGS else ""), + "mount", + f"{CONFIG_KEY}:{escaped_remote_path}", + f"{local_mount_path}", + # VFS + "--vfs-cache-mode", + "full", + "--vfs-read-ahead", + mount_settings.R_CLONE_MOUNT_VFS_READ_AHEAD, + "--vfs-cache-max-size", + mount_settings.R_CLONE_MOUNT_VFS_CACHE_MAX_SIZE, + "--vfs-cache-min-free-space", + mount_settings.R_CLONE_MOUNT_VFS_CACHE_MIN_FREE_SPACE, + "--vfs-cache-poll-interval", + mount_settings.R_CLONE_MOUNT_CACHE_POLL_INTERVAL, + "--write-back-cache", + "--vfs-write-back", + mount_settings.R_CLONE_MOUNT_VFS_WRITE_BACK, + "--cache-dir", + f"{mount_settings.R_CLONE_MOUNT_VFS_CACHE_PATH}", + "--dir-cache-time", + mount_settings.R_CLONE_MOUNT_DIR_CACHE_TIME, + "--attr-timeout", + mount_settings.R_CLONE_MOUNT_ATTR_TIMEOUT, + "--tpslimit", + f"{mount_settings.R_CLONE_MOUNT_TPSLIMIT}", + "--tpslimit-burst", + f"{mount_settings.R_CLONE_MOUNT_TPSLIMIT_BURST}", + "--no-modtime", + "--max-buffer-memory", + mount_settings.R_CLONE_MOUNT_MAX_BUFFER_MEMORY, + # TRANSFERS + "--retries", + f"{mount_settings.R_CLONE_MOUNT_RETRIES}", + "--retries-sleep", + mount_settings.R_CLONE_MOUNT_RETRIES_SLEEP, + "--transfers", + f"{mount_settings.R_CLONE_MOUNT_TRANSFERS}", + "--buffer-size", + mount_settings.R_CLONE_MOUNT_BUFFER_SIZE, + "--checkers", + f"{mount_settings.R_CLONE_MOUNT_CHECKERS}", + "--s3-upload-concurrency", + f"{mount_settings.R_CLONE_MOUNT_S3_UPLOAD_CONCURRENCY}", + "--s3-chunk-size", + mount_settings.R_CLONE_MOUNT_S3_CHUNK_SIZE, + "--order-by", + 
mount_settings.R_CLONE_MOUNT_ORDER_BY, + # REMOTE CONTROL + "--rc", + f"--rc-addr=0.0.0.0:{rc_port}", + "--rc-enable-metrics", + f"--rc-user='{rc_user}'", + f"--rc-pass='{rc_password}'", + "--allow-non-empty", + "--allow-other", + ] + ) + return _R_CLONE_MOUNT_TEMPLATE.format( + r_clone_config_path=mount_settings.R_CLONE_MOUNT_CONTAINER_CONFIG_FILE_PATH, + r_clone_config_content=r_clone_config_content, + r_clone_command=r_clone_command, + local_mount_path=local_mount_path, + ) + + +class ContainerManager: # pylint:disable=too-many-instance-attributes + def __init__( # pylint:disable=too-many-arguments + self, + r_clone_settings: RCloneSettings, + node_id: NodeID, + rc_port: PortInt, + local_mount_path: Path, + index: NonNegativeInt, + r_clone_config_content: str, + remote_path: str, + rc_user: str, + rc_password: str, + *, + handler_get_bind_paths: GetBindPathsProtocol, + ) -> None: + self.r_clone_settings = r_clone_settings + self.node_id = node_id + self.rc_port = rc_port + self.local_mount_path = local_mount_path + self.index = index + self.r_clone_config_content = r_clone_config_content + self.remote_path = remote_path + self.rc_user = rc_user + self.rc_password = rc_password + + self.handler_get_bind_paths = handler_get_bind_paths + + @cached_property + def r_clone_container_name(self) -> str: + mount_id = get_mount_id(self.local_mount_path, self.index) + return f"{_DOCKER_PREFIX_MOUNT}-c-{self.node_id}{mount_id}"[:63] + + @cached_property + def _r_clone_network_name(self) -> str: + mount_id = get_mount_id(self.local_mount_path, self.index) + return f"{_DOCKER_PREFIX_MOUNT}-c-{self.node_id}{mount_id}"[:63] + + async def create(self): + async with _docker_utils.get_or_crate_docker_session(None) as client: + # ensure nothing was left from previous runs + await _docker_utils.remove_container_if_exists( + client, self.r_clone_container_name + ) + await _docker_utils.remove_network_if_exists( + client, self.r_clone_container_name + ) + + # create network + 
container and connect to sidecar + await _docker_utils.create_network_and_connect_sidecar_container( + client, self._r_clone_network_name + ) + + assert self.r_clone_settings.R_CLONE_VERSION is not None # nosec + mount_settings = self.r_clone_settings.R_CLONE_MOUNT_SETTINGS + await _docker_utils.create_r_clone_container( + client, + self.r_clone_container_name, + command=_get_rclone_mount_command( + mount_settings=mount_settings, + r_clone_config_content=self.r_clone_config_content, + remote_path=self.remote_path, + local_mount_path=self.local_mount_path, + rc_port=self.rc_port, + rc_user=self.rc_user, + rc_password=self.rc_password, + ), + r_clone_version=self.r_clone_settings.R_CLONE_VERSION, + rc_port=self.rc_port, + r_clone_network_name=self._r_clone_network_name, + local_mount_path=self.local_mount_path, + memory_limit=mount_settings.R_CLONE_MOUNT_CONTAINER_MEMORY_LIMIT, + nano_cpus=mount_settings.R_CLONE_MOUNT_CONTAINER_NANO_CPUS, + handler_get_bind_paths=self.handler_get_bind_paths, + ) + + async def remove(self): + async with _docker_utils.get_or_crate_docker_session(None) as client: + await _docker_utils.remove_container_if_exists( + client, self.r_clone_container_name + ) + await _docker_utils.remove_network_if_exists( + client, self.r_clone_container_name + ) + + +class RemoteControlHttpClient: + def __init__( + self, + rc_host: str, + rc_port: PortInt, + rc_user: str, + rc_password: str, + *, + transfers_completed_timeout: timedelta, + update_interval: timedelta = _DEFAULT_UPDATE_INTERVAL, + r_clone_client_timeout: timedelta = _DEFAULT_R_CLONE_CLIENT_REQUEST_TIMEOUT, + ) -> None: + self.transfers_completed_timeout = transfers_completed_timeout + self._update_interval_seconds = update_interval.total_seconds() + self._r_clone_client_timeout = r_clone_client_timeout + + self.rc_host = rc_host + self.rc_port = rc_port + self._auth = (rc_user, rc_password) + + @property + def _base_url(self) -> float: + return f"http://{self.rc_host}:{self.rc_port}" + + 
async def _request(self, method: str, path: str) -> Any: + request_url = f"{self._base_url}/{path}" + _logger.debug("Sending '%s %s' request", method, request_url) + + async with AsyncClient( + timeout=self._r_clone_client_timeout.total_seconds() + ) as client: + response = await client.request(method, request_url, auth=self._auth) + response.raise_for_status() + return response.json() + + async def _post_core_stats(self) -> dict: + return await self._request("POST", "core/stats") + + async def _post_vfs_queue(self) -> dict: + return await self._request("POST", "vfs/queue") + + async def _rc_noop(self) -> dict: + return await self._request("POST", "rc/noop") + + async def get_mount_activity(self) -> MountActivity: + core_stats, vfs_queue = await asyncio.gather( + self._post_core_stats(), self._post_vfs_queue() + ) + + return MountActivity( + transferring=( + { + x["name"]: ProgressReport( + actual_value=( + x["percentage"] / 100 if "percentage" in x else 0.0 + ) + ) + for x in core_stats["transferring"] + } + if "transferring" in core_stats + else {} + ), + queued=[x["name"] for x in vfs_queue["queue"]], + ) + + @retry( + wait=wait_fixed(1), + stop=stop_after_delay(_MAX_WAIT_RC_HTTP_INTERFACE_READY.total_seconds()), + reraise=True, + retry=retry_if_exception_type(HTTPError), + before_sleep=before_sleep_log(_logger, logging.WARNING), + ) + async def wait_for_interface_to_be_ready(self) -> None: + await self._post_vfs_queue() + + async def is_responsive(self) -> bool: + try: + await self._rc_noop() + return True + except HTTPError: + return False + + async def wait_for_all_transfers_to_complete(self) -> None: + """ + Should be waited before closing the mount + to ensure all data is transferred to remote. 
+ """ + + @retry( + wait=wait_fixed(1), + stop=stop_after_delay(self.transfers_completed_timeout.total_seconds()), + reraise=True, + retry=retry_if_exception_type( + (WaitingForQueueToBeEmptyError, WaitingForTransfersToCompleteError) + ), + before_sleep=before_sleep_log(_logger, logging.WARNING), + ) + async def _() -> None: + core_stats, vfs_queue = await asyncio.gather( + self._post_core_stats(), self._post_vfs_queue() + ) + + if ( + core_stats["transfers"] != core_stats["totalTransfers"] + or "transferring" in core_stats + ): + raise WaitingForTransfersToCompleteError + + queue = vfs_queue["queue"] + if len(queue) != 0: + raise WaitingForQueueToBeEmptyError(queue=queue) + + await _() diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_docker_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_docker_utils.py new file mode 100644 index 000000000000..185eb20e8414 --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_docker_utils.py @@ -0,0 +1,152 @@ +import logging +import os +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Final + +from aiodocker import Docker +from aiodocker.exceptions import DockerError +from aiodocker.networks import DockerNetwork +from aiodocker.types import JSONObject +from models_library.basic_types import PortInt +from pydantic import ByteSize, NonNegativeInt + +from ._models import GetBindPathsProtocol + +_logger = logging.getLogger(__name__) + +_NOT_FOUND: Final[int] = 404 +_INTERNAL_SERVER_ERROR: Final[int] = 500 + + +def _get_self_container_id() -> str: + # in docker the hostname is the container id + return os.environ["HOSTNAME"] + + +@asynccontextmanager +async def get_or_crate_docker_session(docker: Docker | None) -> AsyncIterator[Docker]: + if docker is not None: + yield docker + return + + async with Docker() as client: + yield client + + +async def 
_get_config( + command: str, + r_clone_version: str, + rc_port: PortInt, + r_clone_network_name: str, + local_mount_path: Path, + memory_limit: ByteSize, + nano_cpus: NonNegativeInt, + handler_get_bind_paths: GetBindPathsProtocol, +) -> JSONObject: + return { + "Image": f"rclone/rclone:{r_clone_version}", + "Entrypoint": ["/bin/sh", "-c", f"{command}"], + "ExposedPorts": {f"{rc_port}/tcp": {}}, + "HostConfig": { + "NetworkMode": r_clone_network_name, + "Binds": [], + "Mounts": await handler_get_bind_paths(local_mount_path), + "Devices": [ + { + "PathOnHost": "/dev/fuse", + "PathInContainer": "/dev/fuse", + "CgroupPermissions": "rwm", + } + ], + "CapAdd": ["SYS_ADMIN"], + "SecurityOpt": ["apparmor:unconfined", "seccomp:unconfined"], + "Memory": memory_limit, + "MemorySwap": memory_limit, + "NanoCpus": nano_cpus, + }, + } + + +async def create_r_clone_container( + docker: Docker | None, + container_name: str, + *, + command: str, + r_clone_version: str, + rc_port: PortInt, + r_clone_network_name: str, + local_mount_path: Path, + memory_limit: ByteSize, + nano_cpus: NonNegativeInt, + handler_get_bind_paths: GetBindPathsProtocol, +) -> None: + async with get_or_crate_docker_session(docker) as client: + # create rclone container attached to the network + r_clone_container = await client.containers.run( + config=await _get_config( + command, + r_clone_version, + rc_port, + r_clone_network_name, + local_mount_path, + memory_limit, + nano_cpus, + handler_get_bind_paths, + ), + name=container_name, + ) + container_inspect = await r_clone_container.show() + _logger.debug( + "Started rclone mount container '%s' with command='%s' (inspect=%s)", + container_name, + command, + container_inspect, + ) + + +async def create_network_and_connect_sidecar_container( + docker: Docker | None, network_name: str +) -> None: + async with get_or_crate_docker_session(docker) as client: + r_clone_network = await client.networks.create( + {"Name": network_name, "Attachable": True} + ) + await 
r_clone_network.connect({"Container": _get_self_container_id()}) + + +async def remove_container_if_exists( + docker: Docker | None, container_name: str +) -> None: + async with get_or_crate_docker_session(docker) as client: + try: + existing_container = await client.containers.get(container_name) + await existing_container.delete(force=True) + except DockerError as e: + if e.status != _NOT_FOUND: + raise + + +async def remove_network_if_exists(docker: Docker | None, network_name: str) -> None: + async with get_or_crate_docker_session(docker) as client: + existing_network = DockerNetwork(client, network_name) + + try: + await existing_network.disconnect({"Container": _get_self_container_id()}) + except DockerError as e: + if ( + not ( + e.status == _INTERNAL_SERVER_ERROR + and "is not connected to network" in e.message + ) + and e.status != _NOT_FOUND + ): + raise + + try: + await existing_network.show() + await existing_network.delete() + except DockerError as e: + if e.status != _NOT_FOUND: + raise diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_errors.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_errors.py new file mode 100644 index 000000000000..7d4aafc7c0cd --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_errors.py @@ -0,0 +1,17 @@ +from common_library.errors_classes import OsparcErrorMixin + + +class _BaseRcloneMountError(OsparcErrorMixin, RuntimeError): + pass + + +class WaitingForTransfersToCompleteError(_BaseRcloneMountError): + msg_template: str = "Waiting for all transfers to complete" + + +class WaitingForQueueToBeEmptyError(_BaseRcloneMountError): + msg_template: str = "Waiting for VFS queue to be empty: queue={queue}" + + +class MountAlreadyStartedError(_BaseRcloneMountError): + msg_template: str = "Mount already started for local path='{local_mount_path}'" diff --git 
a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_manager.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_manager.py new file mode 100644 index 000000000000..9a3cce416ecc --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_manager.py @@ -0,0 +1,250 @@ +import asyncio +import logging +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Final +from uuid import uuid4 + +from common_library.async_tools import cancel_wait_task +from models_library.basic_types import PortInt +from models_library.projects_nodes_io import NodeID, StorageFileID +from pydantic import NonNegativeInt +from servicelib.background_task import create_periodic_task +from servicelib.logging_utils import log_catch, log_context +from servicelib.utils import unused_port +from settings_library.r_clone import RCloneSettings + +from ._config_provider import MountRemoteType, get_config_content +from ._container import ContainerManager, RemoteControlHttpClient +from ._errors import ( + MountAlreadyStartedError, +) +from ._models import ( + GetBindPathsProtocol, + MountActivity, + MountActivityProtocol, + MountId, + RequestShutdownProtocol, +) +from ._utils import get_mount_id + +_logger = logging.getLogger(__name__) + + +_DEFAULT_MOUNT_ACTIVITY_UPDATE_INTERVAL: Final[timedelta] = timedelta(seconds=5) + + +class _TrackedMount: # pylint:disable=too-many-instance-attributes + def __init__( # pylint:disable=too-many-arguments + self, + node_id: NodeID, + r_clone_settings: RCloneSettings, + mount_remote_type: MountRemoteType, + *, + rc_port: PortInt, + remote_path: StorageFileID, + local_mount_path: Path, + index: NonNegativeInt, + handler_get_bind_paths: GetBindPathsProtocol, + handler_mount_activity: MountActivityProtocol, + mount_activity_update_interval: timedelta = _DEFAULT_MOUNT_ACTIVITY_UPDATE_INTERVAL, + ) -> None: + self.remote_path = remote_path + self.local_mount_path = 
local_mount_path + self.index = index + + self._handler_mount_activity = handler_mount_activity + self._mount_activity_update_interval = mount_activity_update_interval + + self._last_mount_activity: MountActivity | None = None + self._last_mount_activity_update: datetime = datetime.fromtimestamp(0, UTC) + self._task_mount_activity: asyncio.Task[None] | None = None + + rc_user = f"{uuid4()}" + rc_password = f"{uuid4()}" + + # used internally to handle the mount command + self._container_manager = ContainerManager( + r_clone_settings=r_clone_settings, + node_id=node_id, + rc_port=rc_port, + local_mount_path=self.local_mount_path, + index=self.index, + r_clone_config_content=get_config_content( + r_clone_settings, mount_remote_type + ), + remote_path=f"{r_clone_settings.R_CLONE_S3.S3_BUCKET_NAME}/{self.remote_path}", + rc_user=rc_user, + rc_password=rc_password, + handler_get_bind_paths=handler_get_bind_paths, + ) + + self._rc_http_client = RemoteControlHttpClient( + rc_port=rc_port, + rc_host=self._container_manager.r_clone_container_name, + rc_user=rc_user, + rc_password=rc_password, + transfers_completed_timeout=r_clone_settings.R_CLONE_MOUNT_SETTINGS.R_CLONE_MOUNT_TRANSFERS_COMPLETED_TIMEOUT, + ) + + async def _update_and_notify_mount_activity( + self, mount_activity: MountActivity + ) -> None: + now = datetime.now(UTC) + + enough_time_passed = ( + now - self._last_mount_activity_update + > self._mount_activity_update_interval + ) + + if enough_time_passed and self._last_mount_activity != mount_activity: + self._last_mount_activity = mount_activity + self._last_mount_activity_update = now + + await self._handler_mount_activity(self.local_mount_path, mount_activity) + + async def _worker_mount_activity(self) -> None: + mount_activity = await self._rc_http_client.get_mount_activity() + with log_catch(logger=_logger, reraise=False): + await self._update_and_notify_mount_activity(mount_activity) + + async def start_mount(self) -> None: + await 
self._container_manager.create() + + await self._rc_http_client.wait_for_interface_to_be_ready() + + self._task_mount_activity = create_periodic_task( + self._worker_mount_activity, + interval=self._mount_activity_update_interval, + task_name=f"rclone-mount-activity-{get_mount_id(self.local_mount_path, self.index)}", + ) + + async def stop_mount(self, *, skip_transfer_wait: bool = False) -> None: + if not skip_transfer_wait: + await self._rc_http_client.wait_for_all_transfers_to_complete() + + await self._container_manager.remove() + if self._task_mount_activity is not None: + await cancel_wait_task(self._task_mount_activity) + + async def wait_for_all_transfers_to_complete(self) -> None: + await self._rc_http_client.wait_for_all_transfers_to_complete() + + async def is_responsive(self) -> bool: + return await self._rc_http_client.is_responsive() + + +class RCloneMountManager: + def __init__( + self, + r_clone_settings: RCloneSettings, + *, + handler_request_shutdown: RequestShutdownProtocol, + ) -> None: + self.r_clone_settings = r_clone_settings + self.handler_request_shutdown = handler_request_shutdown + if r_clone_settings.R_CLONE_VERSION is None: + msg = "R_CLONE_VERSION setting is not set" + raise RuntimeError(msg) + + self._tracked_mounts: dict[MountId, _TrackedMount] = {} + self._task_ensure_mounts_working: asyncio.Task[None] | None = None + + async def ensure_mounted( + self, + local_mount_path: Path, + index: NonNegativeInt, + *, + node_id: NodeID, + remote_type: MountRemoteType, + remote_path: StorageFileID, + handler_get_bind_paths: GetBindPathsProtocol, + handler_mount_activity: MountActivityProtocol, + ) -> None: + with log_context( + _logger, + logging.INFO, + f"mounting {local_mount_path=} from {remote_path=}", + log_duration=True, + ): + mount_id = get_mount_id(local_mount_path, index) + if mount_id in self._tracked_mounts: + tracked_mount = self._tracked_mounts[mount_id] + raise MountAlreadyStartedError(local_mount_path=local_mount_path) + + 
free_port = await asyncio.get_running_loop().run_in_executor( + None, unused_port + ) + + tracked_mount = _TrackedMount( + node_id, + self.r_clone_settings, + remote_type, + rc_port=free_port, + remote_path=remote_path, + local_mount_path=local_mount_path, + index=index, + handler_get_bind_paths=handler_get_bind_paths, + handler_mount_activity=handler_mount_activity, + ) + self._tracked_mounts[mount_id] = tracked_mount + await tracked_mount.start_mount() + + def is_mount_tracked(self, local_mount_path: Path, index: NonNegativeInt) -> bool: + mount_id = get_mount_id(local_mount_path, index) + return mount_id in self._tracked_mounts + + async def ensure_unmounted( + self, local_mount_path: Path, index: NonNegativeInt + ) -> None: + with log_context( + _logger, logging.INFO, f"unmounting {local_mount_path=}", log_duration=True + ): + mount_id = get_mount_id(local_mount_path, index) + tracked_mount = self._tracked_mounts.pop(mount_id) + + await tracked_mount.wait_for_all_transfers_to_complete() + + await tracked_mount.stop_mount() + + async def _worker_ensure_mounts_working(self) -> None: + mount_restored = False + with log_context(_logger, logging.DEBUG, "Ensuring rclone mounts are working"): + for mount in self._tracked_mounts.values(): + if not await mount.is_responsive(): + with log_context( + _logger, + logging.WARNING, + f"Restoring mount for path='{mount.local_mount_path}'", + ): + await mount.stop_mount(skip_transfer_wait=True) + await mount.start_mount() + mount_restored = True + + if mount_restored: + with log_context( + _logger, + logging.WARNING, + "Requesting service shutdown due to mount restoration", + ): + # NOTE: since the mount is bind mounted, we ensure that it restarts properly + # then we shutdown the service since the user service will have an out of date + # FUSE mount. 
+ await self.handler_request_shutdown() + + async def setup(self) -> None: + self._task_ensure_mounts_working = create_periodic_task( + self._worker_ensure_mounts_working, + interval=timedelta(seconds=10), + task_name="rclone-mount-ensure-mounts-working", + ) + + async def teardown(self) -> None: + if self._task_ensure_mounts_working is not None: + await cancel_wait_task(self._task_ensure_mounts_working) + + # shutdown still ongoing mounts + await asyncio.gather( + *[mount.stop_mount() for mount in self._tracked_mounts.values()] + ) + self._tracked_mounts.clear() diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_models.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_models.py new file mode 100644 index 000000000000..a4809587e9a1 --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_models.py @@ -0,0 +1,26 @@ +from pathlib import Path +from typing import Protocol + +from models_library.progress_bar import ProgressReport +from pydantic import BaseModel + +type MountId = str + +type Transferring = dict[str, ProgressReport] + + +class MountActivity(BaseModel): + transferring: Transferring + queued: list[str] + + +class GetBindPathsProtocol(Protocol): + async def __call__(self, state_path: Path) -> list: ... + + +class MountActivityProtocol(Protocol): + async def __call__(self, state_path: Path, activity: MountActivity) -> None: ... + + +class RequestShutdownProtocol(Protocol): + async def __call__(self) -> None: ... 
diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_utils.py new file mode 100644 index 000000000000..0c864322fe26 --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/r_clone_mount/_utils.py @@ -0,0 +1,10 @@ +from pathlib import Path + +from pydantic import NonNegativeInt + +from ._models import MountId + + +def get_mount_id(local_mount_path: Path, index: NonNegativeInt) -> MountId: + # unique reproducible id for the mount + return f"{index}{local_mount_path}".replace("/", "_")[::-1] diff --git a/packages/simcore-sdk/tests/unit/test_node_ports_common__r_clone_mount__core.py b/packages/simcore-sdk/tests/unit/test_node_ports_common__r_clone_mount__core.py new file mode 100644 index 000000000000..0ed2c0926f2a --- /dev/null +++ b/packages/simcore-sdk/tests/unit/test_node_ports_common__r_clone_mount__core.py @@ -0,0 +1,501 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +import os +import re +import secrets +from collections.abc import AsyncIterable, AsyncIterator +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Final, cast + +import aioboto3 +import aiodocker +import aiofiles +import pytest +from aiobotocore.session import ClientCreatorContext +from aiodocker.networks import DockerNetwork +from botocore.client import Config +from faker import Faker +from models_library.api_schemas_storage.storage_schemas import S3BucketName +from models_library.basic_types import PortInt +from models_library.projects_nodes_io import StorageFileID +from pydantic import ByteSize, TypeAdapter +from pytest_mock import MockerFixture +from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict +from servicelib.container_utils import run_command_in_container +from servicelib.file_utils import create_sha256_checksum +from 
servicelib.logging_utils import _dampen_noisy_loggers +from servicelib.utils import limited_gather +from settings_library.r_clone import RCloneSettings +from simcore_sdk.node_ports_common.r_clone_mount import RCloneMountManager, _core +from simcore_sdk.node_ports_common.r_clone_mount._config_provider import ( + MountRemoteType, +) +from simcore_sdk.node_ports_common.r_clone_mount._core import ( + DaemonProcessManager, +) +from types_aiobotocore_s3 import S3Client + +_dampen_noisy_loggers(("botocore", "aiobotocore", "aioboto3", "moto.server")) + + +@pytest.fixture +def r_clone_version(package_dir: Path) -> str: + install_rclone_bash = ( + (package_dir / ".." / ".." / ".." / "..").resolve() + / "scripts" + / "install_rclone.bash" + ) + assert install_rclone_bash.exists() + + match = re.search(r'R_CLONE_VERSION="([\d.]+)"', install_rclone_bash.read_text()) + assert match + return match.group(1) + + +@pytest.fixture +def local_s3_content_path(tmpdir: Path) -> Path: + # path where s3 are created and then uploaded form + path = Path(tmpdir) / "copy_to_s3" + path.mkdir(parents=True, exist_ok=True) + return path + + +@pytest.fixture +def r_clone_local_mount_path(tmpdir: Path) -> Path: + # where rclone mount will make the files available + path = Path(tmpdir) / "r_clone_local_mount_path" + path.mkdir(parents=True, exist_ok=True) + return path + + +@pytest.fixture +def config_path(tmpdir: Path) -> Path: + # where the configuration path for rclone is found inside the container + path = Path(tmpdir) / "config_path" + path.mkdir(parents=True, exist_ok=True) + return path + + +@pytest.fixture +def mock_config_file(config_path: Path, faker: Faker, mocker: MockerFixture) -> None: + # ensure this returns a path where the config is living which has to be mounted in the container + # replace context manager with one that writes here + @asynccontextmanager + async def config_file(config: str) -> AsyncIterator[str]: + file_path = config_path / f"{faker.uuid4()}" + 
file_path.write_text(config) + yield f"{file_path}" + + file_path.unlink() + + mocker.patch.object(_core, "config_file", config_file) + + +_MONITORING_PORT: Final[PortInt] = 5572 + + +@pytest.fixture +async def docker_network() -> AsyncIterable[DockerNetwork]: + async with aiodocker.Docker() as client: + network_to_attach = await client.networks.create({"Name": "a_test_network"}) + try: + yield network_to_attach + finally: + await network_to_attach.delete() + + +@pytest.fixture +async def r_clone_container( + r_clone_version: str, + r_clone_local_mount_path: Path, + config_path: Path, + monkeypatch: pytest.MonkeyPatch, + docker_network: DockerNetwork, +) -> AsyncIterable[str]: + async with aiodocker.Docker() as client: + container = await client.containers.run( + config={ + "Image": f"rclone/rclone:{r_clone_version}", + "Entrypoint": ["/bin/sh", "-c", "apk add findutils && sleep 10000"], + "ExposedPorts": {f"{_MONITORING_PORT}/tcp": {}}, + "HostConfig": { + "PortBindings": { + f"{_MONITORING_PORT}/tcp": [{"HostPort": f"{_MONITORING_PORT}"}] + }, + "Binds": [ + f"{r_clone_local_mount_path}:{r_clone_local_mount_path}:rw", + f"{config_path}:{config_path}:rw", + ], + "Devices": [ + { + "PathOnHost": "/dev/fuse", + "PathInContainer": "/dev/fuse", + "CgroupPermissions": "rwm", + } + ], + "CapAdd": ["SYS_ADMIN"], + "SecurityOpt": ["apparmor:unconfined", "seccomp:unconfined"], + }, + } + ) + container_inspect = await container.show() + + container_name = container_inspect["Name"][1:] + monkeypatch.setenv("HOSTNAME", container_name) + + await docker_network.connect({"Container": container.id}) + + try: + yield container.id + finally: + await container.delete(force=True) + + +@pytest.fixture +async def moto_container(docker_network: DockerNetwork) -> AsyncIterable[None]: + async with aiodocker.Docker() as client: + container = await client.containers.run( + config={ + "Image": "motoserver/moto:latest", + "ExposedPorts": {"5000/tcp": {}}, + "HostConfig": { + "PortBindings": 
{"5000/tcp": [{"HostPort": "5000"}]}, + }, + "Env": ["MOTO_PORT=5000"], + }, + name="moto", + ) + await docker_network.connect({"Container": container.id}) + + try: + yield None + finally: + await container.delete(force=True) + + +async def test_daemon_container_process(r_clone_container: str): + container_process = DaemonProcessManager("sleep 10000") + await container_process.start() + assert container_process.pid + + ps_command = "ps -o pid,stat,comm" + result = await container_process._run_in_container(ps_command) # noqa: SLF001 + assert f"{container_process.pid} S" in result # check sleeping + + await container_process.stop() + await container_process._run_in_container(ps_command) # noqa: SLF001 + assert f"{container_process.pid} Z" not in result # check killed + + +@pytest.fixture +def vfs_cache_path(tmpdir: Path) -> Path: + # path inside the docker container where the vfs cache will be stored + # for tests this can be just placed in the tmp directory ? + # TODO: for better tests it's better that is mounted as a volume + return Path("/tmp/rclone_cache") # noqa: S108 + + +@pytest.fixture +def remote_path(faker: Faker) -> StorageFileID: + return TypeAdapter(StorageFileID).validate_python( + f"{faker.uuid4()}/{faker.uuid4()}/mounted-path" + ) + + +@pytest.fixture +def bucket_name() -> S3BucketName: + return TypeAdapter(S3BucketName).validate_python("osparc-data") + + +@pytest.fixture +def mock_environment( + monkeypatch: pytest.MonkeyPatch, vfs_cache_path: Path, bucket_name: S3BucketName +) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, + { + "R_CLONE_PROVIDER": "AWS_MOTO", + "S3_ENDPOINT": "http://moto:5000", + "S3_ACCESS_KEY": "test", + "S3_BUCKET_NAME": bucket_name, + "S3_SECRET_KEY": "test", + "S3_REGION": "us-east-1", + "R_CLONE_MOUNT_VFS_CACHE_PATH": f"{vfs_cache_path}", + }, + ) + + +@pytest.fixture +def r_clone_settings(mock_environment: EnvVarsDict) -> RCloneSettings: + return RCloneSettings.create_from_envs() + + +@pytest.fixture +async def 
s3_client(r_clone_settings: RCloneSettings) -> AsyncIterable[S3Client]: + s3_settings = r_clone_settings.R_CLONE_S3 + session = aioboto3.Session() + session_client = session.client( + "s3", + endpoint_url=f"{s3_settings.S3_ENDPOINT}".replace("moto", "localhost"), + aws_access_key_id=s3_settings.S3_ACCESS_KEY, + aws_secret_access_key=s3_settings.S3_SECRET_KEY, + region_name=s3_settings.S3_REGION, + config=Config(signature_version="s3v4"), + ) + assert isinstance(session_client, ClientCreatorContext) # nosec + async with session_client as client: + client = cast(S3Client, client) + yield client + + +def _secure_randint(a: int, b: int) -> int: + return a + secrets.randbelow(b - a + 1) + + +_DEFAULT_CHUCNK_SIZE: Final[ByteSize] = TypeAdapter(ByteSize).validate_python("1kb") + + +async def _get_random_file( + faker: Faker, + *, + store_to: Path, + file_size: ByteSize, + chunk_size: ByteSize = _DEFAULT_CHUCNK_SIZE, +) -> Path: + # creates a file in a path and returns it's hash + # generate a random file of size X and a random path inside the directory + + path_in_folder = Path( + faker.file_path(depth=_secure_randint(0, 5), extension="bin") + ).relative_to("/") + file_path = store_to / path_in_folder + + # ensure parent directory exists + file_path.parent.mkdir(parents=True, exist_ok=True) + assert file_path.parent.exists() + + async with aiofiles.open(file_path, "wb") as file: + written = 0 + while written < file_size: + to_write = min(chunk_size, file_size - written) + chunk = os.urandom(to_write) + await file.write(chunk) + written += to_write + + return path_in_folder + + +def _get_random_file_size() -> ByteSize: + return TypeAdapter(ByteSize).validate_python(f"{_secure_randint(1,1024)}Kb") + + +@pytest.fixture +async def create_files_in_s3( + r_clone_settings: RCloneSettings, + moto_container: None, + s3_client: S3Client, + bucket_name: S3BucketName, + faker: Faker, + remote_path: StorageFileID, + local_s3_content_path: Path, +) -> AsyncIterable[None]: + + await 
s3_client.create_bucket(Bucket=bucket_name) + + async def _create_file() -> None: + path_in_folder = await _get_random_file( + faker, + store_to=local_s3_content_path, + file_size=_get_random_file_size(), + ) + file_path = local_s3_content_path / path_in_folder + assert file_path.exists() + await s3_client.upload_file( + Filename=f"{file_path}", + Bucket=bucket_name, + Key=f"{remote_path/path_in_folder}", + ) + + files_to_create = _secure_randint(5, 20) + await limited_gather(*[_create_file() for _ in range(files_to_create)], limit=5) + + yield None + + files_in_bucket = await s3_client.list_objects_v2(Bucket=bucket_name) + + await limited_gather( + *[ + s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"]) + for obj in files_in_bucket.get("Contents", []) + ], + limit=10, + ) + + # check all content form s3 was removed + files_in_bucket = await s3_client.list_objects_v2(Bucket=bucket_name) + assert files_in_bucket.get("Contents", []) == [] + + +@pytest.fixture +def mock_rc_port_with_default(mocker: MockerFixture) -> None: + mocker.patch( + "simcore_sdk.node_ports_common.r_clone_mount._core.unused_port", + return_value=_MONITORING_PORT, + ) + + +@pytest.fixture +async def single_mount_r_clone_mount_manager( + mock_rc_port_with_default: None, + r_clone_container: str, + mock_config_file: None, + r_clone_settings: RCloneSettings, +) -> AsyncIterable[RCloneMountManager]: + r_clone_mount_manager = RCloneMountManager(r_clone_settings) + + yield r_clone_mount_manager + + await r_clone_mount_manager.teardown() + + +async def _get_file_checksums_from_local_path( + local_s3_content_path: Path, +) -> dict[Path, str]: + local_checksums = {} + for dirpath, _, filenames in os.walk(local_s3_content_path): + for filename in filenames: + file_path = Path(dirpath) / filename + relative_path = file_path.relative_to(local_s3_content_path) + + async with aiofiles.open(file_path, "rb") as file: + checksum = await create_sha256_checksum(file) + + local_checksums[relative_path] = 
checksum + return local_checksums + + +async def _get_file_checksums_from_container( + path_in_container: Path, r_clone_container: str +) -> dict[Path, str]: + remote_checksum_and_files = await run_command_in_container( + r_clone_container, + command=f"find {path_in_container} -type f -exec sha256sum {{}} \\;", + timeout=30, + ) + + def _parse_entry(entry: str) -> tuple[Path, str]: + checksum, file_path = entry.strip().split() + relative_path = Path(file_path).relative_to(path_in_container) + return relative_path, checksum + + return dict( + [_parse_entry(x) for x in remote_checksum_and_files.strip().split("\n")] + ) + + +async def _get_files_from_s3( + s3_client: S3Client, bucket_name: S3BucketName, remote_path: StorageFileID +) -> dict[Path, str]: + """Download files from S3 and return their SHA256 checksums.""" + files_in_bucket = await s3_client.list_objects_v2(Bucket=bucket_name) + + async def _get_file_checksum(key: str) -> tuple[Path, str]: + response = await s3_client.get_object(Bucket=bucket_name, Key=key) + checksum = await create_sha256_checksum(response["Body"]) + return Path(key).relative_to(Path(remote_path)), checksum + + results = await limited_gather( + *[ + _get_file_checksum(obj["Key"]) + for obj in files_in_bucket.get("Contents", []) + ], + limit=10, + ) + + return dict(results) + + +async def _assert_local_content_in_s3( + s3_client: S3Client, + bucket_name: S3BucketName, + local_s3_content_path: Path, + remote_path: StorageFileID, +) -> None: + files_local_folder = await _get_file_checksums_from_local_path( + local_s3_content_path + ) + files_from_s3 = await _get_files_from_s3(s3_client, bucket_name, remote_path) + + assert files_local_folder == files_from_s3 + + +async def _assert_same_files_in_all_places( + s3_client: S3Client, + bucket_name: S3BucketName, + r_clone_container: str, + r_clone_local_mount_path: Path, + remote_path: StorageFileID, +) -> None: + files_from_container = await _get_file_checksums_from_container( + 
r_clone_local_mount_path, r_clone_container + ) + files_from_s3 = await _get_files_from_s3(s3_client, bucket_name, remote_path) + assert files_from_container == files_from_s3 + + +async def _change_file_in_container( + path_in_container: Path, r_clone_container: str +) -> None: + await run_command_in_container( + r_clone_container, + command=f"dd if=/dev/urandom of={path_in_container} bs={_get_random_file_size()} count=1", + timeout=30, + ) + + +async def test_tracked_mount_waits_for_files_before_finalizing( + create_files_in_s3: None, + single_mount_r_clone_mount_manager: RCloneMountManager, + r_clone_local_mount_path: Path, + # maybe drop + s3_client: S3Client, + bucket_name: S3BucketName, + r_clone_container: str, + local_s3_content_path: Path, + remote_path: StorageFileID, +): + await single_mount_r_clone_mount_manager.start_mount( + MountRemoteType.S3, remote_path, r_clone_local_mount_path + ) + + await _assert_local_content_in_s3( + s3_client, bucket_name, local_s3_content_path, remote_path + ) + + def _get_random_file_in_container() -> Path: + return r_clone_local_mount_path / secrets.choice( + [x for x in local_s3_content_path.rglob("*") if x.is_file()] + ).relative_to(local_s3_content_path) + + # change and check all is the same + files_to_change = {_get_random_file_in_container() for _ in range(15)} + + await limited_gather( + *[_change_file_in_container(x, r_clone_container) for x in files_to_change], + limit=10, + ) + + await single_mount_r_clone_mount_manager.wait_for_transfers_to_complete( + r_clone_local_mount_path + ) + await _assert_same_files_in_all_places( + s3_client, bucket_name, r_clone_container, r_clone_local_mount_path, remote_path + ) + + await single_mount_r_clone_mount_manager.stop_mount(r_clone_local_mount_path) + + +# TODO: we need a mode to check if rclone mount properly resumes the mounting in case of crash and restart +# we need a test for this one diff --git a/packages/simcore-sdk/tests/unit/test_node_ports_common_r_clone_mount.py 
b/packages/simcore-sdk/tests/unit/test_node_ports_common_r_clone_mount.py new file mode 100644 index 000000000000..15e4fecb59c4 --- /dev/null +++ b/packages/simcore-sdk/tests/unit/test_node_ports_common_r_clone_mount.py @@ -0,0 +1,391 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +import contextlib +import os +import re +from collections.abc import AsyncIterable, AsyncIterator, Iterator +from pathlib import Path +from typing import cast + +import aioboto3 +import aiodocker +import aiofiles +import pytest +from _pytest._py.path import LocalPath +from aiobotocore.session import ClientCreatorContext +from aiodocker.types import JSONObject +from botocore.client import Config +from faker import Faker +from models_library.api_schemas_storage.storage_schemas import S3BucketName +from models_library.basic_types import PortInt +from models_library.projects_nodes_io import NodeID, StorageFileID +from moto.server import ThreadedMotoServer +from pydantic import ByteSize, NonNegativeInt, TypeAdapter +from pytest_mock import MockerFixture +from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict +from servicelib.file_utils import create_sha256_checksum +from servicelib.logging_utils import _dampen_noisy_loggers +from settings_library.r_clone import DEFAULT_VFS_CACHE_PATH, RCloneSettings +from simcore_sdk.node_ports_common.r_clone_mount import ( + GetBindPathsProtocol, + MountActivity, + MountRemoteType, + RCloneMountManager, +) +from simcore_sdk.node_ports_common.r_clone_mount._container import ( + RemoteControlHttpClient, +) +from simcore_sdk.node_ports_common.r_clone_mount._docker_utils import ( + _get_config as original_get_config, +) +from types_aiobotocore_s3 import S3Client + +_dampen_noisy_loggers(("botocore", "aiobotocore", "aioboto3", "moto.server")) + + +@pytest.fixture +def bucket_name() -> S3BucketName: + return TypeAdapter(S3BucketName).validate_python("osparc-data") + + 
+@pytest.fixture +def r_clone_version(package_dir: Path) -> str: + install_rclone_bash = ( + (package_dir / ".." / ".." / ".." / "..").resolve() + / "scripts" + / "install_rclone.bash" + ) + assert install_rclone_bash.exists() + + match = re.search(r'R_CLONE_VERSION="([\d.]+)"', install_rclone_bash.read_text()) + assert match + return match.group(1) + + +@pytest.fixture +def mock_environment( + monkeypatch: pytest.MonkeyPatch, bucket_name: S3BucketName, r_clone_version: str +) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, + { + "R_CLONE_PROVIDER": "AWS_MOTO", + "S3_ENDPOINT": "http://127.0.0.1:5000", + "S3_ACCESS_KEY": "test", + "S3_BUCKET_NAME": bucket_name, + "S3_SECRET_KEY": "test", + "S3_REGION": "us-east-1", + "R_CLONE_VERSION": r_clone_version, + "R_CLONE_MOUNT_CONTAINER_SHOW_DEBUG_LOGS": "1", + }, + ) + + +@pytest.fixture +def r_clone_settings(mock_environment: EnvVarsDict) -> RCloneSettings: + return RCloneSettings.create_from_envs() + + +@pytest.fixture +async def s3_client( + r_clone_settings: RCloneSettings, bucket_name: S3BucketName +) -> AsyncIterable[S3Client]: + s3_settings = r_clone_settings.R_CLONE_S3 + session = aioboto3.Session() + session_client = session.client( + "s3", + endpoint_url=f"{s3_settings.S3_ENDPOINT}".replace("moto", "localhost"), + aws_access_key_id=s3_settings.S3_ACCESS_KEY, + aws_secret_access_key=s3_settings.S3_SECRET_KEY, + region_name=s3_settings.S3_REGION, + config=Config(signature_version="s3v4"), + ) + assert isinstance(session_client, ClientCreatorContext) # nosec + async with session_client as client: + client = cast(S3Client, client) + + # Create the bucket + await client.create_bucket(Bucket=bucket_name) + + yield client + + +@pytest.fixture +async def r_clone_mount_manager( + r_clone_settings: RCloneSettings, +) -> AsyncIterator[RCloneMountManager]: + + # TODO: maybe put this into a fixture + async def do_nothing() -> None: + pass + + manager = RCloneMountManager(r_clone_settings, 
handler_request_shutdown=do_nothing) + await manager.setup() + + yield manager + + await manager.teardown() + + +@pytest.fixture +def local_mount_path(tmpdir: LocalPath) -> Path: + local_mount_path = Path(tmpdir) / "local_mount_path" + local_mount_path.mkdir(parents=True, exist_ok=True) + return local_mount_path + + +@pytest.fixture +def vfs_cache_path(tmpdir: LocalPath) -> Path: + vfs_cache_path = Path(tmpdir) / "vfs_cache_path" + vfs_cache_path.mkdir(parents=True, exist_ok=True) + return vfs_cache_path + + +@pytest.fixture +def index() -> int: + return 0 + + +@pytest.fixture +def remote_path(faker: Faker) -> StorageFileID: + return TypeAdapter(StorageFileID).validate_python( + f"{faker.uuid4()}/{faker.uuid4()}/mounted-path" + ) + + +@pytest.fixture +def node_id(faker: Faker) -> NodeID: + return faker.uuid4(cast_to=None) + + +@pytest.fixture +def moto_server() -> Iterator[None]: + server = ThreadedMotoServer(port="5000") + server.start() + yield None + server.stop() + + +@pytest.fixture +async def mocked_self_container(mocker: MockerFixture) -> AsyncIterator[None]: + # start the simplest lightweight container that sleeps forever + async with aiodocker.Docker() as client: + container = await client.containers.run( + config={"Image": "alpine:latest", "Cmd": ["sleep", "infinity"]} + ) + + mocker.patch( + "simcore_sdk.node_ports_common.r_clone_mount._docker_utils._get_self_container_id", + return_value=container.id, + ) + + yield None + + # remove started container + with contextlib.suppress(aiodocker.exceptions.DockerError): + await container.delete(force=True) + + +@pytest.fixture +async def mocked_r_clone_container_config(mocker: MockerFixture) -> None: + async def _patched_get_config( + command: str, + r_clone_version: str, + rc_port: PortInt, + r_clone_network_name: str, + local_mount_path: Path, + memory_limit: ByteSize, + nano_cpus: NonNegativeInt, + handler_get_bind_paths: GetBindPathsProtocol, + ) -> JSONObject: + config = await original_get_config( + 
command, + r_clone_version, + rc_port, + r_clone_network_name, + local_mount_path, + memory_limit, + nano_cpus, + handler_get_bind_paths, + ) + # Add port forwarding to access from host + config["HostConfig"]["PortBindings"] = { + f"{rc_port}/tcp": [{"HostPort": str(rc_port)}] + } + config["HostConfig"]["NetworkMode"] = "host" + return config + + mocker.patch( + "simcore_sdk.node_ports_common.r_clone_mount._docker_utils._get_config", + side_effect=_patched_get_config, + ) + + # Patch the rc_host to use localhost instead of container name + original_init = RemoteControlHttpClient.__init__ + + def _patched_init(self, rc_host: str, rc_port: PortInt, *args, **kwargs) -> None: + # Replace container hostname with localhost for host access + original_init(self, "localhost", rc_port, *args, **kwargs) + + mocker.patch.object(RemoteControlHttpClient, "__init__", _patched_init) + + +async def _handle_mount_activity(state_path: Path, activity: MountActivity) -> None: + print(f"⏳ {state_path=} {activity=}") + + +async def _create_random_binary_file( + file_path: Path, + file_size: ByteSize, + chunk_size: int = TypeAdapter(ByteSize).validate_python("1mib"), +) -> None: + """Create a random binary file of specified size.""" + async with aiofiles.open(file_path, mode="wb") as file: + bytes_written = 0 + while bytes_written < file_size: + remaining_bytes = file_size - bytes_written + current_chunk_size = min(chunk_size, remaining_bytes) + await file.write(os.urandom(current_chunk_size)) + bytes_written += current_chunk_size + assert bytes_written == file_size + + +async def _create_file_of_size( + target_dir: Path, *, name: str, file_size: ByteSize +) -> Path: + """Create a single file with random content of specified size.""" + file_path = target_dir / name + if not file_path.parent.exists(): + file_path.parent.mkdir(parents=True, exist_ok=True) + + await _create_random_binary_file(file_path, file_size) + assert file_path.exists() + assert file_path.stat().st_size == file_size + 
return file_path + + +async def _create_files_in_dir( + target_dir: Path, file_count: int, file_size: ByteSize +) -> set[str]: + """Create multiple random files in a directory.""" + files = [] + for i in range(file_count): + file_path = await _create_file_of_size( + target_dir, name=f"file_{i}.bin", file_size=file_size + ) + files.append(file_path) + return {x.name for x in files} + + +async def _get_file_checksums_from_local( + local_path: Path, +) -> dict[Path, str]: + """Get SHA256 checksums of all files in a directory.""" + checksums = {} + for dirpath, _, filenames in os.walk(local_path): + for filename in filenames: + file_path = Path(dirpath) / filename + relative_path = file_path.relative_to(local_path) + + async with aiofiles.open(file_path, "rb") as file: + checksum = await create_sha256_checksum(file) + + checksums[relative_path] = checksum + return checksums + + +async def _get_file_checksums_from_s3( + s3_client: S3Client, bucket_name: S3BucketName, remote_path: StorageFileID +) -> dict[Path, str]: + response = await s3_client.list_objects_v2( + Bucket=bucket_name, Prefix=f"{remote_path}" + ) + + checksums = {} + for obj in response.get("Contents", []): + key = obj["Key"] + file_response = await s3_client.get_object(Bucket=bucket_name, Key=key) + checksum = await create_sha256_checksum(file_response["Body"]) + relative_path = Path(key).relative_to(Path(remote_path)) + checksums[relative_path] = checksum + + return checksums + + +async def test_manager( + moto_server: None, + mocked_r_clone_container_config: None, + mocked_self_container: None, + r_clone_mount_manager: RCloneMountManager, + r_clone_settings: RCloneSettings, + bucket_name: S3BucketName, + node_id: NodeID, + remote_path: StorageFileID, + local_mount_path: Path, + vfs_cache_path: Path, + index: int, + s3_client: S3Client, +) -> None: + + async def _get_bind_paths_protocol(state_path: Path) -> list[Path]: + return [ + { + "Type": "bind", + "Source": f"{state_path}", + "Target": 
f"{state_path}", + "BindOptions": {"Propagation": "rshared"}, + }, + { + "Type": "bind", + "Source": f"{vfs_cache_path}", + "Target": f"{DEFAULT_VFS_CACHE_PATH}", + "BindOptions": {"Propagation": "rshared"}, + }, + ] + + await r_clone_mount_manager.ensure_mounted( + local_mount_path=local_mount_path, + remote_type=MountRemoteType.S3, + remote_path=remote_path, + node_id=node_id, + index=index, + handler_get_bind_paths=_get_bind_paths_protocol, + handler_mount_activity=_handle_mount_activity, + ) + + # create random test files + file_count = 5 + file_size = TypeAdapter(ByteSize).validate_python("100kb") + created_files = await _create_files_in_dir(local_mount_path, file_count, file_size) + assert len(created_files) == file_count + + # get checksums of local files before unmounting + local_checksums = await _get_file_checksums_from_local(local_mount_path) + assert len(local_checksums) == file_count + + # wait for rclone to complete all transfers + for mount in r_clone_mount_manager._tracked_mounts.values(): # noqa: SLF001 + await mount.wait_for_all_transfers_to_complete() + + # verify data is in S3 with matching checksums and filenames + s3_checksums = await _get_file_checksums_from_s3( + s3_client, bucket_name, remote_path + ) + + # compare checksums and filenames + assert len(s3_checksums) == len(local_checksums), "File count mismatch" + assert set(s3_checksums.keys()) == set(local_checksums.keys()), "Filename mismatch" + + for file_path, local_checksum in local_checksums.items(): + s3_checksum = s3_checksums[file_path] + assert ( + local_checksum == s3_checksum + ), f"Checksum mismatch for {file_path}: local={local_checksum}, s3={s3_checksum}" + + await r_clone_mount_manager.ensure_unmounted( + local_mount_path=local_mount_path, index=index + ) + + # bind to a different directory and ensure the same content is presnet there as well + # refactor a bit how the files are generated, some more randomnes in sizes, i want to be ranom in range of files and of sizes diff 
--git a/packages/simcore-sdk/tests/unit/test_node_ports_v2_r_clone.py b/packages/simcore-sdk/tests/unit/test_node_ports_v2_r_clone.py index 181813559fbd..372fdd17774c 100644 --- a/packages/simcore-sdk/tests/unit/test_node_ports_v2_r_clone.py +++ b/packages/simcore-sdk/tests/unit/test_node_ports_v2_r_clone.py @@ -71,13 +71,6 @@ async def test_is_r_clone_available_cached( assert await r_clone.is_r_clone_available(None) is False -async def test__config_file(faker: Faker) -> None: - text_to_write = faker.text() - async with r_clone._config_file(text_to_write) as file_name: # noqa: SLF001 - assert text_to_write == Path(file_name).read_text() - assert Path(file_name).exists() is False - - async def test__async_command_ok() -> None: result = await r_clone._async_r_clone_command("ls", "-la") # noqa: SLF001 assert len(result) > 0 diff --git a/scripts/install_rclone.bash b/scripts/install_rclone.bash index e6378cdd9b34..1bd530ba862c 100755 --- a/scripts/install_rclone.bash +++ b/scripts/install_rclone.bash @@ -9,7 +9,7 @@ set -o nounset # abort on unbound variable set -o pipefail # don't hide errors within pipes IFS=$'\n\t' -R_CLONE_VERSION="1.63.1" +R_CLONE_VERSION="1.72.0" TARGETARCH="${TARGETARCH:-amd64}" echo "platform ${TARGETARCH}" diff --git a/services/agent/src/simcore_service_agent/services/backup.py b/services/agent/src/simcore_service_agent/services/backup.py index 8538a4dc0412..ca95731fb5ef 100644 --- a/services/agent/src/simcore_service_agent/services/backup.py +++ b/services/agent/src/simcore_service_agent/services/backup.py @@ -205,7 +205,7 @@ async def _store_in_s3( # below two options reduce to a minimum the memory footprint # https://forum.rclone.org/t/how-to-set-a-memory-limit/10230/4 "--buffer-size", # docs https://rclone.org/docs/#buffer-size-size - "0M", + "16M", "--stats", "5s", "--stats-one-line", diff --git a/services/agent/src/simcore_service_agent/services/docker_utils.py b/services/agent/src/simcore_service_agent/services/docker_utils.py index 
1390a5b12df2..ef41a263172b 100644 --- a/services/agent/src/simcore_service_agent/services/docker_utils.py +++ b/services/agent/src/simcore_service_agent/services/docker_utils.py @@ -34,12 +34,12 @@ def _reverse_string(to_reverse: str) -> str: def _does_volume_require_backup(volume_name: str) -> bool: # from `dyv_1726228407_891aa1a7-eb31-459f-8aed-8c902f5f5fb0_dd84f39e-7154-4a13-ba1d-50068d723104_stupni_www_` - # retruns `stupni_www_` + # returns `stupni_www_` inverse_name_part = volume_name[CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME:] return not inverse_name_part.startswith(_VOLUMES_NOT_TO_BACKUP) -async def get_unused_dynamc_sidecar_volumes(docker: Docker) -> set[str]: +async def get_unused_dynamic_sidecar_volumes(docker: Docker) -> set[str]: """Returns all volumes unused by sidecars""" volumes = await docker.volumes.list() all_volumes: set[str] = {volume["Name"] for volume in volumes["Volumes"]} diff --git a/services/agent/src/simcore_service_agent/services/volumes_manager.py b/services/agent/src/simcore_service_agent/services/volumes_manager.py index 1ef6ef1d0cbd..6dcfac4c349f 100644 --- a/services/agent/src/simcore_service_agent/services/volumes_manager.py +++ b/services/agent/src/simcore_service_agent/services/volumes_manager.py @@ -16,15 +16,24 @@ from servicelib.rabbitmq.rpc_interfaces.agent.errors import ( NoServiceVolumesFoundRPCError, ) +from settings_library.r_clone import DEFAULT_VFS_CACHE_PATH from tenacity import AsyncRetrying, before_sleep_log, stop_after_delay, wait_fixed from ..core.settings import ApplicationSettings -from .docker_utils import get_unused_dynamc_sidecar_volumes, remove_volume +from .docker_utils import get_unused_dynamic_sidecar_volumes, remove_volume _logger = logging.getLogger(__name__) _WAIT_FOR_UNUSED_SERVICE_VOLUMES: Final[timedelta] = timedelta(minutes=1) +_VOLUMES_TO_NEVER_BACKUP: Final[set[str]] = { + "stupni", # inputs -> usually all services use this name + "stuptuo", # outputs -> can be regenerated, usually all services use 
this name + "erots-derahs", # shared-store -> defined by the dynamic-sidecar + f"{DEFAULT_VFS_CACHE_PATH}".strip("/")[::-1], # vfs-cache + "secnereferP", # Preferences -> usually defined by the user this is the one we use in the only service that supports if for now +} + @dataclass class VolumesManager( # pylint:disable=too-many-instance-attributes @@ -68,7 +77,7 @@ async def shutdown(self) -> None: async def _bookkeeping_task(self) -> None: with log_context(_logger, logging.DEBUG, "volume bookkeeping"): - current_unused_volumes = await get_unused_dynamc_sidecar_volumes( + current_unused_volumes = await get_unused_dynamic_sidecar_volumes( self.docker ) old_unused_volumes = set(self._unused_volumes.keys()) @@ -86,6 +95,12 @@ async def _bookkeeping_task(self) -> None: async def _remove_volume_safe( self, *, volume_name: str, requires_backup: bool ) -> None: + # overwrite backup policy if volume does not require backup + for x in _VOLUMES_TO_NEVER_BACKUP: + if f"_{x}_" in volume_name: + requires_backup = False + break + # NOTE: to avoid race conditions only one volume can be removed # also avoids issues with accessing the docker API in parallel async with self.removal_lock: @@ -100,10 +115,10 @@ async def _periodic_volume_cleanup_task(self) -> None: with log_context(_logger, logging.DEBUG, "volume cleanup"): volumes_to_remove: set[str] = set() for volume_name, inactive_since in self._unused_volumes.items(): - volume_inactive_sicne = ( + volume_inactive_since = ( arrow.utcnow().datetime - inactive_since ).total_seconds() - if volume_inactive_sicne > self.remove_volumes_inactive_for: + if volume_inactive_since > self.remove_volumes_inactive_for: volumes_to_remove.add(volume_name) for volume in volumes_to_remove: @@ -123,7 +138,7 @@ async def _wait_for_service_volumes_to_become_unused( before_sleep=before_sleep_log(_logger, logging.DEBUG), ): with attempt: - current_unused_volumes = await get_unused_dynamc_sidecar_volumes( + current_unused_volumes = await 
get_unused_dynamic_sidecar_volumes( self.docker ) @@ -142,6 +157,10 @@ async def _wait_for_service_volumes_to_become_unused( return service_volumes async def remove_service_volumes(self, node_id: NodeID) -> None: + """ + Cleanup after each sidecar was shut down removing all volumes it created with + a backup since it already did that + """ # bookkept volumes might not be up to date service_volumes = await self._wait_for_service_volumes_to_become_unused(node_id) _logger.debug( @@ -157,8 +176,12 @@ async def remove_service_volumes(self, node_id: NodeID) -> None: ) async def remove_all_volumes(self) -> None: + """ + Should be called by autoscaling to ensure no data is lost + If a volume is found it's data has to be backed up + """ # bookkept volumes might not be up to date - current_unused_volumes = await get_unused_dynamc_sidecar_volumes(self.docker) + current_unused_volumes = await get_unused_dynamic_sidecar_volumes(self.docker) with log_context(_logger, logging.INFO, "remove all volumes"): for volume in current_unused_volumes: diff --git a/services/agent/tests/unit/test_services_docker_utils.py b/services/agent/tests/unit/test_services_docker_utils.py index f4a19c9b9aa0..c6c1e4f59f33 100644 --- a/services/agent/tests/unit/test_services_docker_utils.py +++ b/services/agent/tests/unit/test_services_docker_utils.py @@ -17,7 +17,7 @@ _VOLUMES_NOT_TO_BACKUP, _does_volume_require_backup, _reverse_string, - get_unused_dynamc_sidecar_volumes, + get_unused_dynamic_sidecar_volumes, get_volume_details, remove_volume, ) @@ -78,7 +78,7 @@ async def test_doclker_utils_workflow( ) created_volumes.update(created_volume) - volumes = await get_unused_dynamc_sidecar_volumes(volumes_manager_docker_client) + volumes = await get_unused_dynamic_sidecar_volumes(volumes_manager_docker_client) assert volumes == created_volumes, ( "Most likely you have a dirty working state, please check " "that there are no previous docker volumes named `dyv_...` " @@ -114,12 +114,12 @@ async def 
test_doclker_utils_workflow( count_vloumes_to_backup if requires_backup else 0 ) - volumes = await get_unused_dynamc_sidecar_volumes(volumes_manager_docker_client) + volumes = await get_unused_dynamic_sidecar_volumes(volumes_manager_docker_client) assert len(volumes) == 0 @pytest.mark.parametrize("requires_backup", [True, False]) -async def test_remove_misisng_volume_does_not_raise_error( +async def test_remove_missing_volume_does_not_raise_error( requires_backup: bool, initialized_app: FastAPI, volumes_manager_docker_client: Docker, diff --git a/services/agent/tests/unit/test_services_volumes_manager.py b/services/agent/tests/unit/test_services_volumes_manager.py index 5fae32710dfe..17d190961bb3 100644 --- a/services/agent/tests/unit/test_services_volumes_manager.py +++ b/services/agent/tests/unit/test_services_volumes_manager.py @@ -43,7 +43,7 @@ def add_unused_volumes_for_service(self, node_id: NodeID) -> None: def remove_volume(self, volume_name: str) -> None: self.volumes.remove(volume_name) - def get_unused_dynamc_sidecar_volumes(self) -> set[str]: + def get_unused_dynamic_sidecar_volumes(self) -> set[str]: return deepcopy(self.volumes) @@ -58,8 +58,8 @@ async def _remove_volume( ) -> None: proxy.remove_volume(volume_name) - async def _get_unused_dynamc_sidecar_volumes(app: FastAPI) -> set[str]: - return proxy.get_unused_dynamc_sidecar_volumes() + async def _get_unused_dynamic_sidecar_volumes(app: FastAPI) -> set[str]: + return proxy.get_unused_dynamic_sidecar_volumes() mocker.patch( "simcore_service_agent.services.volumes_manager.remove_volume", @@ -67,8 +67,8 @@ async def _get_unused_dynamc_sidecar_volumes(app: FastAPI) -> set[str]: ) mocker.patch( - "simcore_service_agent.services.volumes_manager.get_unused_dynamc_sidecar_volumes", - side_effect=_get_unused_dynamc_sidecar_volumes, + "simcore_service_agent.services.volumes_manager.get_unused_dynamic_sidecar_volumes", + side_effect=_get_unused_dynamic_sidecar_volumes, ) return proxy @@ -105,13 +105,13 @@ 
async def test_volumes_manager_remove_all_volumes( mock_docker_utils.add_unused_volumes_for_service(uuid4()) assert spy_remove_volume.call_count == 0 assert ( - len(mock_docker_utils.get_unused_dynamc_sidecar_volumes()) + len(mock_docker_utils.get_unused_dynamic_sidecar_volumes()) == len(VOLUMES_TO_CREATE) * service_count ) await volumes_manager.remove_all_volumes() assert spy_remove_volume.call_count == len(VOLUMES_TO_CREATE) * service_count - assert len(mock_docker_utils.get_unused_dynamc_sidecar_volumes()) == 0 + assert len(mock_docker_utils.get_unused_dynamic_sidecar_volumes()) == 0 async def test_volumes_manager_remove_service_volumes( @@ -121,22 +121,22 @@ async def test_volumes_manager_remove_service_volumes( ): assert spy_remove_volume.call_count == 0 mock_docker_utils.add_unused_volumes_for_service(uuid4()) - node_id_to_remvoe = uuid4() - mock_docker_utils.add_unused_volumes_for_service(node_id_to_remvoe) + node_id_to_remove = uuid4() + mock_docker_utils.add_unused_volumes_for_service(node_id_to_remove) assert spy_remove_volume.call_count == 0 assert ( - len(mock_docker_utils.get_unused_dynamc_sidecar_volumes()) + len(mock_docker_utils.get_unused_dynamic_sidecar_volumes()) == len(VOLUMES_TO_CREATE) * 2 ) - await volumes_manager.remove_service_volumes(node_id_to_remvoe) + await volumes_manager.remove_service_volumes(node_id_to_remove) assert spy_remove_volume.call_count == len(VOLUMES_TO_CREATE) - unused_volumes = mock_docker_utils.get_unused_dynamc_sidecar_volumes() + unused_volumes = mock_docker_utils.get_unused_dynamic_sidecar_volumes() assert len(unused_volumes) == len(VOLUMES_TO_CREATE) for volume_name in unused_volumes: - assert f"{node_id_to_remvoe}" not in volume_name + assert f"{node_id_to_remove}" not in volume_name @pytest.fixture @@ -184,4 +184,4 @@ async def _run_volumes_clennup() -> None: with attempt: await _run_volumes_clennup() assert spy_remove_volume.call_count == len(VOLUMES_TO_CREATE) - assert 
len(mock_docker_utils.get_unused_dynamc_sidecar_volumes()) == 0 + assert len(mock_docker_utils.get_unused_dynamic_sidecar_volumes()) == 0 diff --git a/services/director-v2/.env-devel b/services/director-v2/.env-devel index 83b9a460ac08..6d806d9d46b9 100644 --- a/services/director-v2/.env-devel +++ b/services/director-v2/.env-devel @@ -61,6 +61,7 @@ R_CLONE_PROVIDER=MINIO R_CLONE_OPTION_TRANSFERS=5 R_CLONE_OPTION_RETRIES=3 R_CLONE_OPTION_BUFFER_SIZE=16M +R_CLONE_MOUNT_SETTINGS={} TRACING_OBSERVABILITY_BACKEND_ENDPOINT=http://jaeger:9411 TRAEFIK_SIMCORE_ZONE=internal_simcore_stack diff --git a/services/director-v2/requirements/_test.in b/services/director-v2/requirements/_test.in index 34b5327e2538..409b03549001 100644 --- a/services/director-v2/requirements/_test.in +++ b/services/director-v2/requirements/_test.in @@ -10,7 +10,6 @@ --constraint _base.txt aio_pika -aioboto3 alembic # migration due to pytest_simcore.postgres_service2 asgi_lifespan async-asgi-testclient # replacement for fastapi.testclient.TestClient [see b) below] diff --git a/services/director-v2/requirements/_test.txt b/services/director-v2/requirements/_test.txt index 92b0a1a8d9e3..d954fa735917 100644 --- a/services/director-v2/requirements/_test.txt +++ b/services/director-v2/requirements/_test.txt @@ -46,7 +46,6 @@ async-asgi-testclient==1.4.11 attrs==25.4.0 # via # -c requirements/_base.txt - # aiohttp # pytest-docker bokeh==3.8.1 # via dask @@ -145,11 +144,6 @@ jinja2==3.1.6 # bokeh # dask # distributed -jmespath==1.0.1 - # via - # aiobotocore - # boto3 - # botocore locket==1.0.0 # via # -c requirements/_base.txt @@ -174,8 +168,6 @@ msgpack==1.1.2 multidict==6.7.0 # via # -c requirements/_base.txt - # aiobotocore - # aiohttp # async-asgi-testclient # yarl mypy==1.18.2 @@ -221,7 +213,6 @@ pprintpp==0.4.0 propcache==0.4.1 # via # -c requirements/_base.txt - # aiohttp # yarl psutil==7.1.3 # via @@ -257,8 +248,6 @@ pytest-xdist==3.8.0 python-dateutil==2.9.0.post0 # via # -c requirements/_base.txt - 
# aiobotocore - # botocore # pandas pytz==2025.2 # via pandas @@ -341,7 +330,6 @@ urllib3==2.5.0 # via # -c requirements/../../../requirements/constraints.txt # -c requirements/_base.txt - # botocore # distributed # docker # requests @@ -355,7 +343,6 @@ yarl==1.22.0 # via # -c requirements/_base.txt # aio-pika - # aiohttp # aiormq zict==3.0.0 # via diff --git a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/sidecar.py b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/sidecar.py index 776834039339..77a6c54d81e3 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/sidecar.py +++ b/services/director-v2/src/simcore_service_director_v2/core/dynamic_services_settings/sidecar.py @@ -1,6 +1,5 @@ import logging import warnings -from enum import Enum from pathlib import Path from typing import Annotated @@ -18,14 +17,13 @@ AliasChoices, Field, Json, - PositiveInt, ValidationInfo, field_validator, ) from settings_library.base import BaseCustomSettings from settings_library.basic_types import PortInt from settings_library.efs import AwsEfsSettings -from settings_library.r_clone import RCloneSettings as SettingsLibraryRCloneSettings +from settings_library.r_clone import RCloneSettings from settings_library.utils_logging import MixinLoggingSettings from settings_library.utils_service import DEFAULT_FASTAPI_PORT @@ -34,39 +32,6 @@ _logger = logging.getLogger(__name__) -class VFSCacheMode(str, Enum): - __slots__ = () - - OFF = "off" - MINIMAL = "minimal" - WRITES = "writes" - FULL = "full" - - -class RCloneSettings(SettingsLibraryRCloneSettings): - R_CLONE_DIR_CACHE_TIME_SECONDS: Annotated[ - PositiveInt, Field(description="time to cache directory entries for") - ] = 10 - R_CLONE_POLL_INTERVAL_SECONDS: Annotated[ - PositiveInt, Field(description="time to wait between polling for changes") - ] = 9 - R_CLONE_VFS_CACHE_MODE: Annotated[ - VFSCacheMode, - Field( - 
description="VFS operation mode, defines how and when the disk cache is synced" - ), - ] = VFSCacheMode.MINIMAL - - @field_validator("R_CLONE_POLL_INTERVAL_SECONDS") - @classmethod - def enforce_r_clone_requirement(cls, v: int, info: ValidationInfo) -> PositiveInt: - dir_cache_time = info.data["R_CLONE_DIR_CACHE_TIME_SECONDS"] - if v >= dir_cache_time: - msg = f"R_CLONE_POLL_INTERVAL_SECONDS={v} must be lower than R_CLONE_DIR_CACHE_TIME_SECONDS={dir_cache_time}" - raise ValueError(msg) - return v - - class PlacementSettings(BaseCustomSettings): DIRECTOR_V2_SERVICES_CUSTOM_PLACEMENT_CONSTRAINTS: Annotated[ list[DockerPlacementConstraint], diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index 0a85dc4cb313..8bb1bdd61d65 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -62,7 +62,7 @@ class ComputationalBackendSettings(BaseCustomSettings): COMPUTATIONAL_BACKEND_PER_CLUSTER_MAX_DISTRIBUTED_CONCURRENT_CONNECTIONS: Annotated[ PositiveInt, Field( - description="defines how many concurrent connections to each dask scheduler are allowed accross all director-v2 replicas" + description="defines how many concurrent connections to each dask scheduler are allowed across all director-v2 replicas" ), ] = 20 COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL: Annotated[ @@ -203,7 +203,7 @@ class AppSettings(BaseApplicationSettings, MixinLoggingSettings): datetime.timedelta, Field( description="Service scheduler heartbeat (everytime a heartbeat is sent into RabbitMQ)" - " (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)" + " (default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formatting)" ), ] = DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL diff --git 
a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py index 87dd5d2c1439..978a5e387463 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py @@ -148,6 +148,7 @@ def _get_environment_variables( "R_CLONE_OPTION_TRANSFERS": f"{r_clone_settings.R_CLONE_OPTION_TRANSFERS}", "R_CLONE_OPTION_RETRIES": f"{r_clone_settings.R_CLONE_OPTION_RETRIES}", "R_CLONE_OPTION_BUFFER_SIZE": r_clone_settings.R_CLONE_OPTION_BUFFER_SIZE, + "R_CLONE_MOUNT_SETTINGS": r_clone_settings.R_CLONE_MOUNT_SETTINGS.model_dump_json(), "RABBIT_HOST": f"{rabbit_settings.RABBIT_HOST}", "RABBIT_PASSWORD": f"{rabbit_settings.RABBIT_PASSWORD.get_secret_value()}", "RABBIT_PORT": f"{rabbit_settings.RABBIT_PORT}", @@ -230,7 +231,6 @@ async def _get_mounts( scheduler_data: SchedulerData, dynamic_sidecar_settings: DynamicSidecarSettings, dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings, - app_settings: AppSettings, has_quota_support: bool, rpc_client: RabbitMQRPCClient, is_efs_enabled: bool, @@ -310,19 +310,6 @@ async def _get_mounts( storage_directory_name=_storage_directory_name, ) ) - # for now only enable this with dev features enabled - elif app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: - mounts.append( - DynamicSidecarVolumesPathsResolver.mount_r_clone( - swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, - path=path_to_mount, - node_uuid=scheduler_data.node_uuid, - service_run_id=scheduler_data.run_id, - project_id=scheduler_data.project_id, - user_id=scheduler_data.user_id, - r_clone_settings=dynamic_sidecar_settings.DYNAMIC_SIDECAR_R_CLONE_SETTINGS, - ) - ) else: mounts.append( 
DynamicSidecarVolumesPathsResolver.mount_entry( @@ -336,6 +323,18 @@ async def _get_mounts( ) ) + if scheduler_data.paths_mapping.state_paths: + mounts.append( + DynamicSidecarVolumesPathsResolver.mount_vfs_cache( + swarm_stack_name=dynamic_services_scheduler_settings.SWARM_STACK_NAME, + node_uuid=scheduler_data.node_uuid, + service_run_id=scheduler_data.run_id, + project_id=scheduler_data.project_id, + user_id=scheduler_data.user_id, + has_quota_support=has_quota_support, + ) + ) + if dynamic_sidecar_path := dynamic_sidecar_settings.DYNAMIC_SIDECAR_MOUNT_PATH_DEV: # Settings validators guarantees that this never happens in production mode assert ( @@ -440,7 +439,6 @@ async def get_dynamic_sidecar_spec( # pylint:disable=too-many-arguments# noqa: scheduler_data=scheduler_data, dynamic_services_scheduler_settings=dynamic_services_scheduler_settings, dynamic_sidecar_settings=dynamic_sidecar_settings, - app_settings=app_settings, has_quota_support=has_quota_support, rpc_client=rpc_client, is_efs_enabled=user_extra_properties.is_efs_enabled, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index c9236448256c..5427952ff607 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -6,10 +6,9 @@ from common_library.json_serialization import json_loads from fastapi import FastAPI -from models_library.api_schemas_long_running_tasks.base import ProgressPercent from models_library.products import ProductName from models_library.projects_networks import ProjectsNetworks -from models_library.projects_nodes_io import NodeID, NodeIDStr +from models_library.projects_nodes_io import NodeID from 
models_library.rabbitmq_messages import InstrumentationRabbitMessage from models_library.rpc.webserver.auth.api_keys import generate_unique_api_key from models_library.service_settings_labels import SimcoreServiceLabels @@ -211,17 +210,11 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( app: FastAPI, node_uuid: NodeID, swarm_stack_name: str, - set_were_state_and_outputs_saved: bool | None = None, ) -> None: scheduler_data: SchedulerData = _get_scheduler_data(app, node_uuid) rabbit_rpc_client: RabbitMQRPCClient = app.state.rabbitmq_rpc_client - if set_were_state_and_outputs_saved is not None: - scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True - - await task_progress.update( - message="removing dynamic sidecar stack", percent=ProgressPercent(0.1) - ) + await task_progress.update(message="removing dynamic sidecar stack", percent=0.1) await remove_dynamic_sidecar_stack( node_uuid=scheduler_data.node_uuid, @@ -235,42 +228,32 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( node_id=scheduler_data.node_uuid, ) - await task_progress.update(message="removing network", percent=ProgressPercent(0.2)) + await task_progress.update(message="removing network", percent=0.2) await remove_dynamic_sidecar_network(scheduler_data.dynamic_sidecar_network_name) - if scheduler_data.dynamic_sidecar.were_state_and_outputs_saved: - if scheduler_data.dynamic_sidecar.docker_node_id is None: - _logger.warning( - "Skipped volume removal for %s, since a docker_node_id was not found.", - scheduler_data.node_uuid, - ) - else: - # Remove all dy-sidecar associated volumes from node - await task_progress.update( - message="removing volumes", percent=ProgressPercent(0.3) - ) - with log_context(_logger, logging.DEBUG, f"removing volumes '{node_uuid}'"): - try: - await remove_volumes_without_backup_for_service( - rabbit_rpc_client, - docker_node_id=scheduler_data.dynamic_sidecar.docker_node_id, - swarm_stack_name=swarm_stack_name, - 
node_id=scheduler_data.node_uuid, - ) - except ( - NoServiceVolumesFoundRPCError, - RemoteMethodNotRegisteredError, # happens when autoscaling node was removed - ) as e: - _logger.info("Could not remove volumes, because: '%s'", e) + if scheduler_data.dynamic_sidecar.docker_node_id: + # Remove all dy-sidecar associated volumes from node + await task_progress.update(message="removing volumes", percent=0.3) + with log_context(_logger, logging.DEBUG, f"removing volumes '{node_uuid}'"): + try: + await remove_volumes_without_backup_for_service( + rabbit_rpc_client, + docker_node_id=scheduler_data.dynamic_sidecar.docker_node_id, + swarm_stack_name=swarm_stack_name, + node_id=scheduler_data.node_uuid, + ) + except ( + NoServiceVolumesFoundRPCError, + RemoteMethodNotRegisteredError, # happens when autoscaling node was removed + ) as e: + _logger.info("Could not remove volumes, because: '%s'", e) _logger.debug( "Removed dynamic-sidecar services and crated container for '%s'", scheduler_data.service_name, ) - await task_progress.update( - message="removing project networks", percent=ProgressPercent(0.8) - ) + await task_progress.update(message="removing project networks", percent=0.8) used_projects_networks = await get_projects_networks_containers( project_id=scheduler_data.project_id ) @@ -290,9 +273,7 @@ async def service_remove_sidecar_proxy_docker_networks_and_volumes( await _cleanup_long_running_tasks(app, scheduler_data.run_id) - await task_progress.update( - message="finished removing resources", percent=ProgressPercent(1) - ) + await task_progress.update(message="finished removing resources", percent=1) async def _cleanup_long_running_tasks( @@ -371,16 +352,10 @@ async def attempt_pod_removal_and_data_saving( try: tasks = [ - service_push_outputs(app, scheduler_data.node_uuid, sidecars_client) + service_push_outputs(app, scheduler_data.node_uuid, sidecars_client), + service_save_state(app, scheduler_data.node_uuid, sidecars_client), ] - # When enabled no longer 
uploads state via nodeports - # It uses rclone mounted volumes for this task. - if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: - tasks.append( - service_save_state(app, scheduler_data.node_uuid, sidecars_client) - ) - await logged_gather(*tasks, max_concurrency=2) scheduler_data.dynamic_sidecar.were_state_and_outputs_saved = True @@ -458,7 +433,7 @@ async def attach_project_networks(app: FastAPI, scheduler_data: SchedulerData) - network_name, container_aliases, ) in projects_networks.networks_with_aliases.items(): - network_alias = container_aliases.get(NodeIDStr(scheduler_data.node_uuid)) + network_alias = container_aliases.get(f"{scheduler_data.node_uuid}") if network_alias is not None: await sidecars_client.attach_service_containers_to_project_network( dynamic_sidecar_endpoint=dynamic_sidecar_endpoint, @@ -496,7 +471,6 @@ async def wait_for_sidecar_api(app: FastAPI, scheduler_data: SchedulerData) -> N async def prepare_services_environment( app: FastAPI, scheduler_data: SchedulerData ) -> None: - app_settings: AppSettings = app.state.settings sidecars_client = await get_sidecars_client(app, scheduler_data.node_uuid) dynamic_sidecar_endpoint = scheduler_data.endpoint @@ -565,11 +539,8 @@ async def _restore_service_state_with_metrics() -> None: tasks = [ _pull_user_services_images_with_metrics(), _pull_output_ports_with_metrics(), + _restore_service_state_with_metrics(), ] - # When enabled no longer downloads state via nodeports - # S3 is used to store state paths - if not app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: - tasks.append(_restore_service_state_with_metrics()) await limited_gather(*tasks, limit=3) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py index bf375b29eede..fcfd5ebcbef6 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py 
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py @@ -1,6 +1,6 @@ import os from pathlib import Path -from typing import Any +from typing import Any, Final from models_library.api_schemas_directorv2.services import ( CHARS_IN_VOLUME_NAME_BEFORE_DIR_NAME, @@ -20,72 +20,18 @@ WRITE_SIZE, AwsEfsSettings, ) -from settings_library.r_clone import S3Provider +from settings_library.r_clone import DEFAULT_VFS_CACHE_MAX_SIZE, DEFAULT_VFS_CACHE_PATH -from ...core.dynamic_services_settings.sidecar import RCloneSettings -from .errors import DynamicSidecarError +_BASE_PATH: Path = Path("/dy-volumes") +# below are subfolders in `_BASE_PATH` +_DY_SIDECAR_SUBFOLDER_SHARED_STORE: Final[Path] = Path("/shared-store") +_DY_SIDECAR_SUBFOLDER_VFS_CACHE: Final[Path] = DEFAULT_VFS_CACHE_PATH -DY_SIDECAR_SHARED_STORE_PATH = Path("/shared-store") - -def _get_s3_volume_driver_config( - r_clone_settings: RCloneSettings, - project_id: ProjectID, - node_uuid: NodeID, - storage_directory_name: str, -) -> dict[str, Any]: - assert "/" not in storage_directory_name # nosec - driver_config: dict[str, Any] = { - "Name": "rclone", - "Options": { - "type": "s3", - "s3-access_key_id": r_clone_settings.R_CLONE_S3.S3_ACCESS_KEY, - "s3-secret_access_key": r_clone_settings.R_CLONE_S3.S3_SECRET_KEY, - "path": f"{r_clone_settings.R_CLONE_S3.S3_BUCKET_NAME}/{project_id}/{node_uuid}/{storage_directory_name}", - "allow-other": "true", - "vfs-cache-mode": r_clone_settings.R_CLONE_VFS_CACHE_MODE.value, - # Directly connected to how much time it takes for - # files to appear on remote s3, please se discussion - # SEE https://forum.rclone.org/t/file-added-to-s3-on-one-machine-not-visible-on-2nd-machine-unless-mount-is-restarted/20645 - # SEE https://rclone.org/commands/rclone_mount/#vfs-directory-cache - "dir-cache-time": f"{r_clone_settings.R_CLONE_DIR_CACHE_TIME_SECONDS}s", - "poll-interval": f"{r_clone_settings.R_CLONE_POLL_INTERVAL_SECONDS}s", - }, - } - if 
r_clone_settings.R_CLONE_S3.S3_ENDPOINT: - driver_config["Options"][ - "s3-endpoint" - ] = r_clone_settings.R_CLONE_S3.S3_ENDPOINT - - extra_options: dict[str, str] | None = None - - if r_clone_settings.R_CLONE_PROVIDER == S3Provider.MINIO: - extra_options = { - "s3-provider": "Minio", - "s3-region": "us-east-1", - "s3-location_constraint": "", - "s3-server_side_encryption": "", - } - elif r_clone_settings.R_CLONE_PROVIDER == S3Provider.CEPH: - extra_options = { - "s3-provider": "Ceph", - "s3-acl": "private", - } - elif r_clone_settings.R_CLONE_PROVIDER == S3Provider.AWS: - extra_options = { - "s3-provider": "AWS", - "s3-region": r_clone_settings.R_CLONE_S3.S3_REGION, - "s3-acl": "private", - } - else: - msg = f"Unexpected, all {S3Provider.__name__} should be covered" - raise DynamicSidecarError(msg=msg) - - assert extra_options is not None # nosec - options: dict[str, Any] = driver_config["Options"] - options.update(extra_options) - - return driver_config +# DEFAULT LIMITS +_LIMIT_SHARED_STORE: Final[str] = "1M" +_LIMIT_VFS_CACHE: Final[str] = DEFAULT_VFS_CACHE_MAX_SIZE +_LIMIT_USER_PREFERENCES: Final[str] = "10M" def _get_efs_volume_driver_config( @@ -106,12 +52,10 @@ def _get_efs_volume_driver_config( class DynamicSidecarVolumesPathsResolver: - BASE_PATH: Path = Path("/dy-volumes") - @classmethod def target(cls, path: Path) -> str: """Returns a folder path within `/dy-volumes` folder""" - target_path = cls.BASE_PATH / path.relative_to("/") + target_path = _BASE_PATH / path.relative_to("/") return f"{target_path}" @classmethod @@ -192,12 +136,33 @@ def mount_shared_store( ) -> dict[str, Any]: return cls.mount_entry( swarm_stack_name=swarm_stack_name, - path=DY_SIDECAR_SHARED_STORE_PATH, + path=_DY_SIDECAR_SUBFOLDER_SHARED_STORE, node_uuid=node_uuid, service_run_id=service_run_id, project_id=project_id, user_id=user_id, - volume_size_limit="1M" if has_quota_support else None, + volume_size_limit=_LIMIT_SHARED_STORE if has_quota_support else None, + ) + + 
@classmethod + def mount_vfs_cache( + cls, + service_run_id: ServiceRunID, + node_uuid: NodeID, + project_id: ProjectID, + user_id: UserID, + swarm_stack_name: str, + *, + has_quota_support: bool, + ) -> dict[str, Any]: + return cls.mount_entry( + swarm_stack_name=swarm_stack_name, + path=_DY_SIDECAR_SUBFOLDER_VFS_CACHE, + node_uuid=node_uuid, + service_run_id=service_run_id, + project_id=project_id, + user_id=user_id, + volume_size_limit=_LIMIT_VFS_CACHE if has_quota_support else None, ) @classmethod @@ -222,42 +187,9 @@ def mount_user_preferences( # NOTE: the contents of this volume will be zipped and much # be at most `_MAX_PREFERENCES_TOTAL_SIZE`, this 10M accounts # for files and data that can be compressed a lot - volume_size_limit="10M" if has_quota_support else None, + volume_size_limit=_LIMIT_USER_PREFERENCES if has_quota_support else None, ) - @classmethod - def mount_r_clone( - cls, - swarm_stack_name: str, - path: Path, - node_uuid: NodeID, - service_run_id: ServiceRunID, - project_id: ProjectID, - user_id: UserID, - r_clone_settings: RCloneSettings, - ) -> dict[str, Any]: - return { - "Source": cls.source(path, node_uuid, service_run_id), - "Target": cls.target(path), - "Type": "volume", - "VolumeOptions": { - "Labels": { - "source": cls.source(path, node_uuid, service_run_id), - "run_id": f"{service_run_id}", - "node_uuid": f"{node_uuid}", - "study_id": f"{project_id}", - "user_id": f"{user_id}", - "swarm_stack_name": swarm_stack_name, - }, - "DriverConfig": _get_s3_volume_driver_config( - r_clone_settings=r_clone_settings, - project_id=project_id, - node_uuid=node_uuid, - storage_directory_name=cls.volume_name(path).strip("_"), - ), - }, - } - @classmethod def mount_efs( cls, diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py index 9dbdf3865293..e329af45eade 100644 --- 
a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py +++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py @@ -14,7 +14,6 @@ from typing import Any, NamedTuple, cast from uuid import uuid4 -import aioboto3 import aiodocker import httpx import pytest @@ -97,7 +96,6 @@ is_legacy, patch_dynamic_service_url, run_command, - sleep_for, ) from yarl import URL @@ -333,26 +331,6 @@ async def db_manager(sqlalchemy_async_engine: AsyncEngine) -> DBManager: return DBManager(sqlalchemy_async_engine, application_name=APP_NAME) -def _is_docker_r_clone_plugin_installed() -> bool: - return "rclone:" in run_command("docker plugin ls") - - -@pytest.fixture( - scope="session", - params={ - # NOTE: There is an issue with the docker rclone volume plugin: - # SEE https://github.com/rclone/rclone/issues/6059 - # Disabling rclone test until this is fixed. - # "true", - "false", - }, -) -def dev_feature_r_clone_enabled(request) -> str: - if request.param == "true" and not _is_docker_r_clone_plugin_installed(): - pytest.skip("Required docker plugin `rclone` not installed.") - return request.param - - @pytest.fixture async def patch_storage_setup( mocker: MockerFixture, @@ -378,7 +356,6 @@ def mock_env( mock_env: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, network_name: str, - dev_feature_r_clone_enabled: str, dask_scheduler_service: str, dask_scheduler_auth: ClusterAuthentication, minimal_configuration: None, @@ -425,7 +402,6 @@ def mock_env( "RABBIT_HOST": f"{get_localhost_ip()}", "POSTGRES_HOST": f"{get_localhost_ip()}", "R_CLONE_PROVIDER": "MINIO", - "DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED": dev_feature_r_clone_enabled, "COMPUTATIONAL_BACKEND_ENABLED": "true", "COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED": "true", "COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_URL": dask_scheduler_service, @@ -713,36 +689,6 @@ async def _fetch_data_via_data_manager( return save_to -async def _fetch_data_via_aioboto( - 
r_clone_settings: RCloneSettings, - dir_tag: str, - temp_dir: Path, - node_id: NodeIDStr, - project_id: ProjectID, -) -> Path: - save_to = temp_dir / f"aioboto_{dir_tag}_{uuid4()}" - save_to.mkdir(parents=True, exist_ok=True) - - session = aioboto3.Session( - aws_access_key_id=r_clone_settings.R_CLONE_S3.S3_ACCESS_KEY, - aws_secret_access_key=r_clone_settings.R_CLONE_S3.S3_SECRET_KEY, - ) - async with session.resource( - "s3", endpoint_url=r_clone_settings.R_CLONE_S3.S3_ENDPOINT - ) as s3: - bucket = await s3.Bucket(r_clone_settings.R_CLONE_S3.S3_BUCKET_NAME) - async for s3_object in bucket.objects.all(): - key_path = f"{project_id}/{node_id}/{DY_SERVICES_R_CLONE_DIR_NAME}/" - if s3_object.key.startswith(key_path): - file_object = await s3_object.get() - file_path = save_to / s3_object.key.replace(key_path, "") - print(f"Saving file to {file_path}") - file_content = await file_object["Body"].read() - file_path.write_bytes(file_content) - - return save_to - - async def _start_and_wait_for_dynamic_services_ready( director_v2_client: httpx.AsyncClient, product_name: str, @@ -1077,39 +1023,13 @@ async def test_nodeports_integration( app_settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR.DYNAMIC_SIDECAR_R_CLONE_SETTINGS ) - if app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: - await sleep_for( - WAIT_FOR_R_CLONE_VOLUME_TO_SYNC_DATA, - "Waiting for rclone to sync data from the docker volume", - ) - - dy_path_volume_before = ( - await _fetch_data_via_aioboto( - r_clone_settings=r_clone_settings, - dir_tag="dy", - temp_dir=tmp_path, - node_id=services_node_uuids.dy, - project_id=current_study.uuid, - ) - if app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED - else await _fetch_data_from_container( - dir_tag="dy", service_uuid=services_node_uuids.dy, temp_dir=tmp_path - ) + dy_path_volume_before = await _fetch_data_from_container( + dir_tag="dy", service_uuid=services_node_uuids.dy, temp_dir=tmp_path ) - dy_compose_spec_path_volume_before = ( - await 
_fetch_data_via_aioboto( - r_clone_settings=r_clone_settings, - dir_tag="dy_compose_spec", - temp_dir=tmp_path, - node_id=services_node_uuids.dy_compose_spec, - project_id=current_study.uuid, - ) - if app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED - else await _fetch_data_from_container( - dir_tag="dy_compose_spec", - service_uuid=services_node_uuids.dy_compose_spec, - temp_dir=tmp_path, - ) + dy_compose_spec_path_volume_before = await _fetch_data_from_container( + dir_tag="dy_compose_spec", + service_uuid=services_node_uuids.dy_compose_spec, + temp_dir=tmp_path, ) # STEP 5 @@ -1127,52 +1047,26 @@ async def test_nodeports_integration( await _wait_for_dy_services_to_fully_stop(async_client) - if app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED: - await sleep_for( - WAIT_FOR_R_CLONE_VOLUME_TO_SYNC_DATA, - "Waiting for rclone to sync data from the docker volume", - ) - - dy_path_data_manager_before = ( - await _fetch_data_via_aioboto( - r_clone_settings=r_clone_settings, - dir_tag="dy", - temp_dir=tmp_path, - node_id=services_node_uuids.dy, - project_id=current_study.uuid, - ) - if app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED - else await _fetch_data_via_data_manager( - r_clone_settings=r_clone_settings, - dir_tag="dy", - user_id=current_user["id"], - project_id=current_study.uuid, - service_uuid=NodeID(services_node_uuids.dy), - temp_dir=tmp_path, - io_log_redirect_cb=mock_io_log_redirect_cb, - faker=faker, - ) + dy_path_data_manager_before = await _fetch_data_via_data_manager( + r_clone_settings=r_clone_settings, + dir_tag="dy", + user_id=current_user["id"], + project_id=current_study.uuid, + service_uuid=NodeID(services_node_uuids.dy), + temp_dir=tmp_path, + io_log_redirect_cb=mock_io_log_redirect_cb, + faker=faker, ) - dy_compose_spec_path_data_manager_before = ( - await _fetch_data_via_aioboto( - r_clone_settings=r_clone_settings, - dir_tag="dy_compose_spec", - temp_dir=tmp_path, - 
node_id=services_node_uuids.dy_compose_spec, - project_id=current_study.uuid, - ) - if app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED - else await _fetch_data_via_data_manager( - r_clone_settings=r_clone_settings, - dir_tag="dy_compose_spec", - user_id=current_user["id"], - project_id=current_study.uuid, - service_uuid=NodeID(services_node_uuids.dy_compose_spec), - temp_dir=tmp_path, - io_log_redirect_cb=mock_io_log_redirect_cb, - faker=faker, - ) + dy_compose_spec_path_data_manager_before = await _fetch_data_via_data_manager( + r_clone_settings=r_clone_settings, + dir_tag="dy_compose_spec", + user_id=current_user["id"], + project_id=current_study.uuid, + service_uuid=NodeID(services_node_uuids.dy_compose_spec), + temp_dir=tmp_path, + io_log_redirect_cb=mock_io_log_redirect_cb, + faker=faker, ) # STEP 6 @@ -1187,33 +1081,13 @@ async def test_nodeports_integration( catalog_url=services_endpoint["catalog"], ) - dy_path_volume_after = ( - await _fetch_data_via_aioboto( - r_clone_settings=r_clone_settings, - dir_tag="dy", - temp_dir=tmp_path, - node_id=services_node_uuids.dy, - project_id=current_study.uuid, - ) - if app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED - else await _fetch_data_from_container( - dir_tag="dy", service_uuid=services_node_uuids.dy, temp_dir=tmp_path - ) + dy_path_volume_after = await _fetch_data_from_container( + dir_tag="dy", service_uuid=services_node_uuids.dy, temp_dir=tmp_path ) - dy_compose_spec_path_volume_after = ( - await _fetch_data_via_aioboto( - r_clone_settings=r_clone_settings, - dir_tag="dy_compose_spec", - temp_dir=tmp_path, - node_id=services_node_uuids.dy_compose_spec, - project_id=current_study.uuid, - ) - if app_settings.DIRECTOR_V2_DEV_FEATURE_R_CLONE_MOUNTS_ENABLED - else await _fetch_data_from_container( - dir_tag="dy_compose_spec", - service_uuid=services_node_uuids.dy_compose_spec, - temp_dir=tmp_path, - ) + dy_compose_spec_path_volume_after = await _fetch_data_from_container( + 
dir_tag="dy_compose_spec", + service_uuid=services_node_uuids.dy_compose_spec, + temp_dir=tmp_path, ) # STEP 7 diff --git a/services/director-v2/tests/unit/test_core_settings.py b/services/director-v2/tests/unit/test_core_settings.py index 9b35f54957ac..1c1e557d6bff 100644 --- a/services/director-v2/tests/unit/test_core_settings.py +++ b/services/director-v2/tests/unit/test_core_settings.py @@ -26,7 +26,7 @@ def _get_backend_type_options() -> set[str]: def test_supported_backends_did_not_change() -> None: - _EXPECTED = {"AWS", "CEPH", "MINIO"} + _EXPECTED = {"AWS", "CEPH", "MINIO", "AWS_MOTO"} assert _get_backend_type_options() == _EXPECTED, ( "Backend configuration change, please code support for " "it in volumes_resolver -> _get_s3_volume_driver_config. " diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py index da645555f4ce..eb4c04dca65a 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py @@ -44,6 +44,7 @@ "POSTGRES_PASSWORD", "POSTGRES_PORT", "POSTGRES_USER", + "R_CLONE_MOUNT_SETTINGS", "R_CLONE_OPTION_BUFFER_SIZE", "R_CLONE_OPTION_RETRIES", "R_CLONE_OPTION_TRANSFERS", diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 13282c470e08..92d14a6fd73a 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -419,6 +419,7 @@ services: R_CLONE_OPTION_RETRIES: ${R_CLONE_OPTION_RETRIES} R_CLONE_OPTION_TRANSFERS: ${R_CLONE_OPTION_TRANSFERS} R_CLONE_PROVIDER: ${R_CLONE_PROVIDER} + R_CLONE_MOUNT_SETTINGS: ${R_CLONE_MOUNT_SETTINGS} EFS_DNS_NAME: ${EFS_DNS_NAME} EFS_MOUNTED_PATH: ${EFS_MOUNTED_PATH} diff --git a/services/dynamic-sidecar/docker/boot.sh b/services/dynamic-sidecar/docker/boot.sh index 984ef554a3b8..d6bc4dc4d570 100755 
--- a/services/dynamic-sidecar/docker/boot.sh +++ b/services/dynamic-sidecar/docker/boot.sh @@ -48,6 +48,10 @@ DYNAMIC_SIDECAR_REMOTE_DEBUGGING_PORT=${DYNAMIC_SIDECAR_REMOTE_DEBUGGING_PORT:-3 SERVER_LOG_LEVEL=$(echo "${APP_LOG_LEVEL}" | tr '[:upper:]' '[:lower:]') echo "$INFO" "Log-level app/server: $APP_LOG_LEVEL/$SERVER_LOG_LEVEL" +R_CLONE_VERSION=$(rclone version | head -n1 | awk '{print $2}' | sed 's/^v//') && \ + echo "$INFO" "R_CLONE_VERSION=${R_CLONE_VERSION}" && \ + export R_CLONE_VERSION + if [ "${SC_BOOT_MODE}" = "debug" ]; then reload_dir_packages=$(fdfind src /devel/packages --exec echo '--reload-dir {} ' | tr '\n' ' ') diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py index b644f8529adf..70834b28db96 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py @@ -30,6 +30,7 @@ from ..modules.notifications import setup_notifications from ..modules.outputs import setup_outputs from ..modules.prometheus_metrics import setup_prometheus_metrics +from ..modules.r_clone_mount_manager import setup_r_clone_mount_manager from ..modules.resource_tracking import setup_resource_tracking from ..modules.system_monitor import setup_system_monitor from ..modules.user_services_preferences import setup_user_services_preferences @@ -197,6 +198,8 @@ def create_app() -> FastAPI: setup_user_services_preferences(app) + setup_r_clone_mount_manager(app) + if application_settings.are_prometheus_metrics_enabled: setup_prometheus_metrics(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index 9387a3867fae..776d2d12563c 100644 --- 
a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -2,6 +2,7 @@ import logging from collections.abc import AsyncGenerator from contextlib import asynccontextmanager +from dataclasses import dataclass from pathlib import Path from typing import Any, Final @@ -11,7 +12,7 @@ from models_library.generated_models.docker_rest_api import ContainerState from models_library.rabbitmq_messages import ProgressType, SimcorePlatformStatus from models_library.service_settings_labels import LegacyState -from pydantic import PositiveInt +from pydantic import NonNegativeInt, PositiveInt from servicelib.fastapi import long_running_tasks from servicelib.file_utils import log_directory_changes from servicelib.logging_utils import log_context @@ -19,6 +20,7 @@ from servicelib.progress_bar import ProgressBarData from servicelib.utils import logged_gather from simcore_sdk.node_data import data_manager +from simcore_sdk.node_ports_common.r_clone_mount import MountActivity, Transferring from tenacity import retry from tenacity.before_sleep import before_sleep_log from tenacity.retry import retry_if_result @@ -55,6 +57,7 @@ from ..modules.mounted_fs import MountedVolumes from ..modules.notifications._notifications_ports import PortNotifier from ..modules.outputs import OutputsManager, event_propagation_disabled +from ..modules.r_clone_mount_manager import get_r_clone_mount_manager from .long_running_tasks_utils import ( ensure_read_permissions_on_user_service_data, run_before_shutdown_actions, @@ -344,24 +347,104 @@ def _get_legacy_state_with_dy_volumes_path( ) +_EXPECTED_BIND_PATHS_COUNT: Final[NonNegativeInt] = 2 + + +async def _handler_get_bind_path( + settings: ApplicationSettings, mounted_volumes: MountedVolumes, state_path: Path +) -> list: + vfs_cache_path = await mounted_volumes.get_vfs_cache_docker_volume( + settings.DY_SIDECAR_RUN_ID + ) + + 
vfs_source, vfs_target = ( + f"{vfs_cache_path}".replace( + f"{settings.DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR}", "" + ) + ).split(":") + + bind_paths: list[dict] = [ + { + "Type": "bind", + "Source": vfs_source, + "Target": vfs_target, + "BindOptions": {"Propagation": "rshared"}, + } + ] + + state_path_no_dy_volume = state_path.relative_to( + settings.DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR + ) + matcher = f":/{state_path_no_dy_volume}" + + async for entry in mounted_volumes.iter_state_paths_to_docker_volumes( + settings.DY_SIDECAR_RUN_ID + ): + if entry.endswith(matcher): + mount_str = entry.replace(f"/{state_path_no_dy_volume}", f"{state_path}") + source, target = mount_str.split(":") + bind_paths.append( + { + "Type": "bind", + "Source": source, + "Target": target, + "BindOptions": {"Propagation": "rshared"}, + } + ) + break + + if len(bind_paths) != _EXPECTED_BIND_PATHS_COUNT: + msg = f"Could not resolve volume path for {state_path}" + raise RuntimeError(msg) + + return bind_paths + + +@dataclass +class MountActivitySummary: + path: Path + queued: int + transferring: Transferring + + +async def _handler_mount_activity(state_path: Path, activity: MountActivity) -> None: + # TODO: this object should be pushed to the FE in the future + summary = MountActivitySummary( + path=state_path, queued=len(activity.queued), transferring=activity.transferring + ) + _logger.info("Mount activity %s", summary) + + async def _restore_state_folder( app: FastAPI, *, settings: ApplicationSettings, progress_bar: ProgressBarData, state_path: Path, + index: NonNegativeInt, + mounted_volumes: MountedVolumes, ) -> None: + + assert settings.DY_SIDECAR_PRODUCT_NAME is not None # nosec await data_manager.pull( + product_name=settings.DY_SIDECAR_PRODUCT_NAME, user_id=settings.DY_SIDECAR_USER_ID, project_id=settings.DY_SIDECAR_PROJECT_ID, node_uuid=settings.DY_SIDECAR_NODE_ID, destination_path=Path(state_path), + index=index, io_log_redirect_cb=functools.partial( post_sidecar_log_message, app, 
log_level=logging.INFO ), r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS, progress_bar=progress_bar, legacy_state=_get_legacy_state_with_dy_volumes_path(settings), + application_name=f"{APP_NAME}-{settings.DY_SIDECAR_NODE_ID}", + mount_manager=get_r_clone_mount_manager(app), + handler_get_bind_paths=functools.partial( + _handler_get_bind_path, settings, mounted_volumes + ), + handler_mount_activity=_handler_mount_activity, ) @@ -400,9 +483,14 @@ async def restore_user_services_state_paths( await logged_gather( *( _restore_state_folder( - app, settings=settings, progress_bar=root_progress, state_path=path + app, + settings=settings, + progress_bar=root_progress, + state_path=path, + index=k, + mounted_volumes=mounted_volumes, ) - for path in mounted_volumes.disk_state_paths_iter() + for k, path in enumerate(mounted_volumes.disk_state_paths_iter()) ), max_concurrency=CONCURRENCY_STATE_SAVE_RESTORE, reraise=True, # this should raise if there is an issue @@ -422,13 +510,16 @@ async def _save_state_folder( settings: ApplicationSettings, progress_bar: ProgressBarData, state_path: Path, + index: NonNegativeInt, mounted_volumes: MountedVolumes, ) -> None: + assert settings.DY_SIDECAR_PRODUCT_NAME is not None # nosec await data_manager.push( user_id=settings.DY_SIDECAR_USER_ID, project_id=settings.DY_SIDECAR_PROJECT_ID, node_uuid=settings.DY_SIDECAR_NODE_ID, source_path=state_path, + index=index, r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS, exclude_patterns=mounted_volumes.state_exclude, io_log_redirect_cb=functools.partial( @@ -437,6 +528,7 @@ async def _save_state_folder( progress_bar=progress_bar, legacy_state=_get_legacy_state_with_dy_volumes_path(settings), application_name=f"{APP_NAME}-{settings.DY_SIDECAR_NODE_ID}", + mount_manager=get_r_clone_mount_manager(app), ) @@ -469,9 +561,10 @@ async def save_user_services_state_paths( settings=settings, progress_bar=root_progress, state_path=state_path, + index=k, mounted_volumes=mounted_volumes, ) - for 
state_path in state_paths + for k, state_path in enumerate(state_paths) ], max_concurrency=CONCURRENCY_STATE_SAVE_RESTORE, ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py index eeedd4d16173..a01fe80067ad 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py @@ -7,6 +7,7 @@ from models_library.projects_nodes_io import NodeID from models_library.services import ServiceRunID from servicelib.docker_constants import PREFIX_DYNAMIC_SIDECAR_VOLUMES +from settings_library.r_clone import DEFAULT_VFS_CACHE_PATH from ..core.docker_utils import get_volume_by_label from ..core.settings import ApplicationSettings @@ -73,6 +74,13 @@ def volume_name_outputs(self) -> str: f"_{_name_from_full_path(self.outputs_path)[::-1]}" ) + @cached_property + def volume_name_vfs_cache(self) -> str: + return ( + f"{PREFIX_DYNAMIC_SIDECAR_VOLUMES}_{self.service_run_id}_{self.node_id}" + f"_{_name_from_full_path(DEFAULT_VFS_CACHE_PATH)[::-1]}" + ) + @cached_property def volume_user_preferences(self) -> str | None: if self.user_preferences_path is None: @@ -97,6 +105,10 @@ def disk_inputs_path(self) -> Path: def disk_outputs_path(self) -> Path: return _ensure_path(self._dy_volumes / self.outputs_path.relative_to("/")) + @cached_property + def vfs_cache_path(self) -> Path: + return _ensure_path(self._dy_volumes / DEFAULT_VFS_CACHE_PATH.relative_to("/")) + def disk_state_paths_iter(self) -> Iterator[Path]: for state_path in self.state_paths: yield _ensure_path(self._dy_volumes / state_path.relative_to("/")) @@ -136,6 +148,12 @@ async def get_outputs_docker_volume(self, service_run_id: ServiceRunID) -> str: ) return f"{bind_path}:{self.outputs_path}" + async def get_vfs_cache_docker_volume(self, service_run_id: ServiceRunID) -> str: + 
bind_path: Path = await self._get_bind_path_from_label( + self.volume_name_vfs_cache, service_run_id + ) + return f"{bind_path}:{self.vfs_cache_path}" + async def get_user_preferences_path_volume( self, service_run_id: ServiceRunID ) -> str | None: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/r_clone_mount_manager.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/r_clone_mount_manager.py new file mode 100644 index 000000000000..1e36894ec5ad --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/r_clone_mount_manager.py @@ -0,0 +1,68 @@ +import logging +from functools import partial + +from fastapi import FastAPI +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStop, +) +from servicelib.logging_utils import log_context +from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.services import ( + stop_dynamic_service, +) +from simcore_sdk.node_ports_common.r_clone_mount import RCloneMountManager + +from ..core.rabbitmq import get_rabbitmq_rpc_client, post_sidecar_log_message +from ..core.settings import ApplicationSettings + +_logger = logging.getLogger(__file__) + + +async def _handle_shutdown_request(app: FastAPI) -> None: + settings: ApplicationSettings = app.state.settings + client = get_rabbitmq_rpc_client(app) + + with log_context( + _logger, logging.INFO, "requesting service shutdown via dynamic-scheduler" + ): + await stop_dynamic_service( + client, + dynamic_service_stop=DynamicServiceStop( + user_id=settings.DY_SIDECAR_USER_ID, + project_id=settings.DY_SIDECAR_PROJECT_ID, + node_id=settings.DY_SIDECAR_NODE_ID, + simcore_user_agent="", + save_state=True, + ), + ) + await post_sidecar_log_message( + app, + ( + "Your service was closed due to an issue that would create unexpected behavior. " + "No data was lost. Thank you for your understanding." 
+ ), + log_level=logging.WARNING, + ) + + +def setup_r_clone_mount_manager(app: FastAPI): + settings: ApplicationSettings = app.state.settings + + async def _on_startup() -> None: + + app.state.r_clone_mount_manager = r_clone_mount_manager = RCloneMountManager( + settings.DY_SIDECAR_R_CLONE_SETTINGS, + handler_request_shutdown=partial(_handle_shutdown_request, app), + ) + await r_clone_mount_manager.setup() + + async def _on_shutdown() -> None: + r_clone_mount_manager: RCloneMountManager = app.state.r_clone_mount_manager + await r_clone_mount_manager.teardown() + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) + + +def get_r_clone_mount_manager(app: FastAPI) -> RCloneMountManager: + assert isinstance(app.state.r_clone_mount_manager, RCloneMountManager) # nosec + return app.state.r_clone_mount_manager diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/rclone/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/rclone/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1