Draft
Changes from all commits
27 commits
653cc89
add product_name arg
giancarloromeo Dec 18, 2025
ea16505
fix missing arg
giancarloromeo Dec 18, 2025
ddd5590
fix rename
giancarloromeo Dec 18, 2025
52464ad
fix missing arg
giancarloromeo Dec 18, 2025
e297cc3
fix typo
giancarloromeo Dec 18, 2025
5406c90
add product_name arg
giancarloromeo Dec 18, 2025
4cc1cde
add product_name query param
giancarloromeo Dec 18, 2025
202bf3d
fix missing query param
giancarloromeo Dec 18, 2025
4371189
add product_name query param
giancarloromeo Dec 18, 2025
66e9cd4
add product_name query param
giancarloromeo Dec 18, 2025
dd6b742
add product_name query param
giancarloromeo Dec 18, 2025
645d0c6
fix test
giancarloromeo Dec 18, 2025
e9c6873
add product_name arg
giancarloromeo Dec 18, 2025
d70aa0d
fix test
giancarloromeo Dec 18, 2025
1a09d66
add product_name param
giancarloromeo Dec 18, 2025
790379e
add product_name param
giancarloromeo Dec 18, 2025
c3fe800
add product_name query param
giancarloromeo Dec 18, 2025
cbef0cf
add product_name query param
giancarloromeo Dec 18, 2025
dd26f73
add product fake factory
giancarloromeo Dec 18, 2025
c78d1f4
set default product
giancarloromeo Dec 18, 2025
6e12492
fix test
giancarloromeo Dec 18, 2025
8b6fbea
add product_name query param
giancarloromeo Dec 18, 2025
d4496cb
fix test
giancarloromeo Dec 18, 2025
5c4f2b0
fix type
giancarloromeo Dec 18, 2025
d7e1e20
add product_name query param
giancarloromeo Dec 19, 2025
c2b9011
fix tests
giancarloromeo Dec 19, 2025
74b159a
fix simcore_sdk tests
giancarloromeo Dec 19, 2025
22 changes: 11 additions & 11 deletions packages/aws-library/src/aws_library/s3/_client.py
@@ -325,7 +325,7 @@ async def list_entries_paginated(
prefix: str,
*,
items_per_page: int = _MAX_ITEMS_PER_PAGE,
) -> AsyncGenerator[list[S3MetaData | S3DirectoryMetaData], None]:
) -> AsyncGenerator[list[S3MetaData | S3DirectoryMetaData]]:
"""Breadth-first recursive listing of S3 entries (files + directories).

Yields:
@@ -564,18 +564,18 @@ async def upload_file(
bucket: S3BucketName,
file: Path,
object_key: S3ObjectKey,
bytes_transfered_cb: UploadedBytesTransferredCallback | None,
bytes_transferred_cb: UploadedBytesTransferredCallback | None,
) -> None:
"""upload a file using aioboto3 transfer manager (e.g. works >5Gb and creates multiple threads)"""
upload_options: dict[str, Any] = {
"Bucket": bucket,
"Key": object_key,
"Config": TransferConfig(max_concurrency=self.transfer_max_concurrency),
}
if bytes_transfered_cb:
if bytes_transferred_cb:
upload_options |= {
"Callback": functools.partial(
bytes_transfered_cb, file_name=f"{object_key}"
bytes_transferred_cb, file_name=f"{object_key}"
)
}
await self._client.upload_file(f"{file}", **upload_options)
@@ -587,7 +587,7 @@ async def copy_object(
bucket: S3BucketName,
src_object_key: S3ObjectKey,
dst_object_key: S3ObjectKey,
bytes_transfered_cb: CopiedBytesTransferredCallback | None,
bytes_transferred_cb: CopiedBytesTransferredCallback | None,
object_metadata: S3MetaData | None = None,
) -> None:
"""copy a file in S3 using aioboto3 transfer manager (e.g. works >5Gb and creates multiple threads)"""
@@ -600,22 +600,22 @@ async def copy_object(
multipart_threshold=MULTIPART_COPY_THRESHOLD,
),
}
if bytes_transfered_cb:
if bytes_transferred_cb:
copy_options |= {
"Callback": functools.partial(
bytes_transfered_cb, file_name=f"{dst_object_key}"
bytes_transferred_cb, file_name=f"{dst_object_key}"
)
}
# NOTE: boto3 copy function uses copy_object until 'multipart_threshold' is reached then switches to multipart copy
# copy_object does not provide any callbacks so we can't track progress so we need to ensure at least the completion
# of the object is tracked
await self._client.copy(**copy_options)
if bytes_transfered_cb:
if bytes_transferred_cb:
if object_metadata is None:
object_metadata = await self.get_object_metadata(
bucket=bucket, object_key=dst_object_key
)
bytes_transfered_cb(object_metadata.size, file_name=f"{dst_object_key}")
bytes_transferred_cb(object_metadata.size, file_name=f"{dst_object_key}")

@s3_exception_handler(_logger)
async def copy_objects_recursively(
@@ -624,7 +624,7 @@ async def copy_objects_recursively(
bucket: S3BucketName,
src_prefix: str,
dst_prefix: str,
bytes_transfered_cb: CopiedBytesTransferredCallback | None,
bytes_transferred_cb: CopiedBytesTransferredCallback | None,
) -> None:
"""copy from 1 location in S3 to another recreating the same structure"""
dst_metadata = await self.get_directory_metadata(
@@ -638,7 +638,7 @@ async def copy_objects_recursively(
bucket=bucket,
src_object_key=s3_object.object_key,
dst_object_key=s3_object.object_key.replace(src_prefix, dst_prefix),
bytes_transfered_cb=bytes_transfered_cb,
bytes_transferred_cb=bytes_transferred_cb,
object_metadata=s3_object,
)
async for s3_object in self._list_all_objects(
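For orientation, the renamed keyword is consumed roughly as sketched below. This is an illustrative sketch, not code from this PR: the client object, bucket, and paths are placeholders, and only the keyword name and the callback shape (an int byte count plus a keyword-only file_name) are taken from the diff above.

from pathlib import Path

def log_uploaded_bytes(bytes_transferred: int, *, file_name: str) -> None:
    # same shape as UploadedBytesTransferredCallback used by upload_file above:
    # an incremental byte count plus the keyword-only file name
    print(f"{file_name}: +{bytes_transferred} bytes uploaded")

async def upload_with_progress(s3_api, bucket: str, local_file: Path, key: str) -> None:
    # `s3_api` stands in for the S3 client class shown in this diff (assumption)
    await s3_api.upload_file(
        bucket=bucket,
        file=local_file,
        object_key=key,
        bytes_transferred_cb=log_uploaded_bytes,  # keyword renamed by this PR
    )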
36 changes: 18 additions & 18 deletions packages/aws-library/tests/test_s3_client.py
@@ -289,14 +289,14 @@ class _UploadProgressCallback:
file_size: int
action: str
logger: logging.Logger
_total_bytes_transfered: int = 0
_total_bytes_transferred: int = 0

def __call__(self, bytes_transferred: int, *, file_name: str) -> None:
self._total_bytes_transfered += bytes_transferred
assert self._total_bytes_transfered <= self.file_size
self._total_bytes_transferred += bytes_transferred
assert self._total_bytes_transferred <= self.file_size
self.logger.info(
"progress: %s",
f"{self.action} {file_name=} {self._total_bytes_transfered} / {self.file_size} bytes",
f"{self.action} {file_name=} {self._total_bytes_transferred} / {self.file_size} bytes",
)


@@ -305,14 +305,14 @@ class _CopyProgressCallback:
file_size: int
action: str
logger: logging.Logger
_total_bytes_transfered: int = 0
_total_bytes_transferred: int = 0

def __call__(self, total_bytes_copied: int, *, file_name: str) -> None:
self._total_bytes_transfered = total_bytes_copied
assert self._total_bytes_transfered <= self.file_size
self._total_bytes_transferred = total_bytes_copied
assert self._total_bytes_transferred <= self.file_size
self.logger.info(
"progress: %s",
f"{self.action} {file_name=} {self._total_bytes_transfered} / {self.file_size} bytes",
f"{self.action} {file_name=} {self._total_bytes_transferred} / {self.file_size} bytes",
)


@@ -339,7 +339,7 @@ async def _uploader(file: Path, base_path: Path | None = None) -> UploadedFile:
bucket=with_s3_bucket,
file=file,
object_key=object_key,
bytes_transfered_cb=progress_cb,
bytes_transferred_cb=progress_cb,
)
# there is no response from aioboto3...
assert not response
@@ -432,7 +432,7 @@ async def _copier(src_key: S3ObjectKey, dst_key: S3ObjectKey) -> S3ObjectKey:
bucket=with_s3_bucket,
src_object_key=src_key,
dst_object_key=dst_key,
bytes_transfered_cb=progress_cb,
bytes_transferred_cb=progress_cb,
)
copied_object_keys.append(dst_key)
return dst_key
@@ -467,7 +467,7 @@ async def _copier(src_prefix: str, dst_prefix: str) -> str:
bucket=with_s3_bucket,
src_prefix=src_prefix,
dst_prefix=dst_prefix,
bytes_transfered_cb=progress_cb,
bytes_transferred_cb=progress_cb,
)

dst_directory_metadata = await simcore_s3_api.get_directory_metadata(
@@ -1599,7 +1599,7 @@ async def test_upload_file_invalid_raises(
bucket=non_existing_s3_bucket,
file=file,
object_key=faker.pystr(),
bytes_transfered_cb=None,
bytes_transferred_cb=None,
)


@@ -1654,15 +1654,15 @@ async def test_copy_file_invalid_raises(
bucket=non_existing_s3_bucket,
src_object_key=uploaded_file.s3_key,
dst_object_key=dst_object_key,
bytes_transfered_cb=None,
bytes_transferred_cb=None,
)
fake_src_key = faker.file_name()
with pytest.raises(S3KeyNotFoundError, match=rf"{fake_src_key}"):
await simcore_s3_api.copy_object(
bucket=with_s3_bucket,
src_object_key=fake_src_key,
dst_object_key=dst_object_key,
bytes_transfered_cb=None,
bytes_transferred_cb=None,
)


@@ -1838,7 +1838,7 @@ async def test_copy_files_recursively_raises(
bucket=non_existing_s3_bucket,
src_prefix="",
dst_prefix="",
bytes_transfered_cb=None,
bytes_transferred_cb=None,
)


@@ -1924,7 +1924,7 @@ def run_async_test(*args, **kwargs) -> None:
],
ids=byte_size_ids,
)
def test_copy_recurively_performance(
def test_copy_recursively_performance(
mocked_s3_server_envs: EnvVarsDict,
with_uploaded_folder_on_s3: list[UploadedFile],
copy_files_recursively: Callable[[str, str], Awaitable[str]],
@@ -2070,7 +2070,7 @@ async def path_s3_files_for_archive(

@pytest.fixture
def archive_download_path(tmp_path: Path, faker: Faker) -> Iterator[Path]:
path = tmp_path / f"downlaoded_ardhive_{faker.uuid4()}.zip"
path = tmp_path / f"downloaded_ardhive_{faker.uuid4()}.zip"
yield path
if path.exists():
path.unlink()
@@ -2119,7 +2119,7 @@ async def test_workflow_compress_s3_objects_and_local_files_in_a_single_archive_
# - files are read form disk and S3
# - a zip archive is created on the go
# - the zip archive is streamed to S3 as soon as chunks inside it are created
# Uses no disk and constant memory for the entire opration.
# Uses no disk and constant memory for the entire operation.

# 1. assemble and upload zip archive

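A side note on the two test callbacks above, shown here as a minimal sketch (class and attribute names are illustrative): upload callbacks deliver byte increments that the receiver must sum, while the copy callback delivers a running total and simply overwrites its counter.

class UploadProgress:
    """Accumulates increments, as _UploadProgressCallback does above."""

    def __init__(self) -> None:
        self.total_bytes = 0

    def __call__(self, bytes_transferred: int, *, file_name: str) -> None:
        self.total_bytes += bytes_transferred  # increments must be summed


class CopyProgress:
    """Tracks a running total, as _CopyProgressCallback does above."""

    def __init__(self) -> None:
        self.total_bytes = 0

    def __call__(self, total_bytes_copied: int, *, file_name: str) -> None:
        self.total_bytes = total_bytes_copied  # value is already a running total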
(changes to another file; filename not captured in this view)
@@ -7,6 +7,7 @@
from .models.base import metadata
from .models.file_meta_data import file_meta_data
from .models.groups import groups, user_to_groups
from .models.products import products
from .models.projects import projects
from .models.tokens import tokens
from .models.users import users
@@ -15,6 +16,7 @@
"tokens",
"file_meta_data",
"metadata",
"products",
"projects",
"users",
"groups",
(changes to another file; filename not captured in this view)
@@ -12,6 +12,7 @@
AsyncJobGet,
)
from models_library.api_schemas_webserver.storage import PathToExport
from models_library.products import ProductName
from models_library.users import UserID
from pydantic import TypeAdapter, validate_call
from pytest_mock import MockType
@@ -29,7 +30,8 @@ async def start_export_data(
paths_to_export: list[PathToExport],
export_as: Literal["path", "download_link"],
owner_metadata: OwnerMetadata,
user_id: UserID
user_id: UserID,
product_name: ProductName,
) -> tuple[AsyncJobGet, OwnerMetadata]:
assert rabbitmq_rpc_client
assert owner_metadata
(changes to another file; filename not captured in this view)
@@ -9,16 +9,17 @@
import pytest
import sqlalchemy as sa
from faker import Faker
from models_library.products import ProductName
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from pydantic import TypeAdapter
from simcore_postgres_database.models.project_to_groups import project_to_groups
from simcore_postgres_database.storage_models import projects, users
from simcore_postgres_database.storage_models import products, projects, users
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from .helpers.faker_factories import DEFAULT_FAKER, random_project
from .helpers.faker_factories import DEFAULT_FAKER, random_product, random_project
from .helpers.postgres_users import insert_and_get_user_and_secrets_lifespan


@@ -51,14 +52,49 @@ async def other_user_id(sqlalchemy_async_engine: AsyncEngine) -> AsyncIterator[U
yield new_user_id


@pytest.fixture
async def create_product(
sqlalchemy_async_engine: AsyncEngine,
) -> AsyncIterator[Callable[..., Awaitable[dict[str, Any]]]]:
created_product_names = []

async def _creator(**kwargs) -> dict[str, Any]:
product_config = {}
product_config.update(kwargs)
async with sqlalchemy_async_engine.begin() as conn:
result = await conn.execute(
products.insert()
.values(**random_product(**product_config))
.returning(sa.literal_column("*"))
)
row = result.one()
created_product_names.append(row.name)
return dict(row._asdict())

yield _creator

async with sqlalchemy_async_engine.begin() as conn:
await conn.execute(
products.delete().where(products.c.name.in_(created_product_names))
)


@pytest.fixture
async def product_name(
create_product: Callable[..., Awaitable[dict[str, Any]]],
) -> ProductName:
product = await create_product()
return ProductName(product["name"])


@pytest.fixture
async def create_project(
user_id: UserID, sqlalchemy_async_engine: AsyncEngine
user_id: UserID, product_name: ProductName, sqlalchemy_async_engine: AsyncEngine
) -> AsyncIterator[Callable[..., Awaitable[dict[str, Any]]]]:
created_project_uuids = []

async def _creator(**kwargs) -> dict[str, Any]:
prj_config = {"prj_owner": user_id}
prj_config = {"prj_owner": user_id, "product_name": product_name}
prj_config.update(kwargs)
async with sqlalchemy_async_engine.begin() as conn:
result = await conn.execute(
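To see how the new fixtures compose, here is a hypothetical test that is not part of this PR; the "acme" override is an assumption, while the merging of kwargs into random_product and the dict-shaped return value follow the fixture code above. The product_name fixture builds on the same factory, so create_project now runs against a product that actually exists in the database.

async def test_create_product_override(create_product) -> None:
    # "acme" is an illustrative override merged into random_product(**kwargs)
    product = await create_product(name="acme")
    assert product["name"] == "acme"  # the inserted row is returned as a plain dict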
(changes to another file; filename not captured in this view)
@@ -4,6 +4,7 @@
AsyncJobGet,
)
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
from models_library.products import ProductName
from models_library.projects_nodes_io import LocationID
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.users import UserID
@@ -20,7 +21,8 @@ async def compute_path_size(
location_id: LocationID,
path: Path,
owner_metadata: OwnerMetadata,
user_id: UserID
user_id: UserID,
product_name: ProductName,
) -> tuple[AsyncJobGet, OwnerMetadata]:
async_job_rpc_get = await submit(
rabbitmq_rpc_client=client,
@@ -30,6 +32,7 @@
location_id=location_id,
path=path,
user_id=user_id,
product_name=product_name,
)
return async_job_rpc_get, owner_metadata

@@ -40,7 +43,8 @@ async def delete_paths(
location_id: LocationID,
paths: set[Path],
owner_metadata: OwnerMetadata,
user_id: UserID
user_id: UserID,
product_name: ProductName,
) -> tuple[AsyncJobGet, OwnerMetadata]:
async_job_rpc_get = await submit(
rabbitmq_rpc_client=client,
Expand All @@ -50,5 +54,6 @@ async def delete_paths(
location_id=location_id,
paths=paths,
user_id=user_id,
product_name=product_name,
)
return async_job_rpc_get, owner_metadata
(changes to another file; filename not captured in this view)
@@ -6,6 +6,7 @@
from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE
from models_library.api_schemas_storage.storage_schemas import FoldersBody
from models_library.api_schemas_webserver.storage import PathToExport
from models_library.products import ProductName
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.users import UserID
from pydantic import TypeAdapter
@@ -42,6 +43,7 @@ async def start_export_data(
export_as: Literal["path", "download_link"],
owner_metadata: OwnerMetadata,
user_id: UserID,
product_name: ProductName,
) -> tuple[AsyncJobGet, OwnerMetadata]:
async_job_rpc_get = await submit(
rabbitmq_rpc_client,
@@ -51,5 +53,6 @@
paths_to_export=paths_to_export,
export_as=export_as,
user_id=user_id,
product_name=product_name,
)
return async_job_rpc_get, owner_metadata
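As a rough illustration of the call-site impact (a sketch under assumptions: the surrounding variables and the import of start_export_data are not shown in this diff), callers of these RPC helpers now pass product_name alongside user_id:

async def request_export(
    rabbitmq_rpc_client, paths_to_export, owner_metadata, user_id, product_name
):
    # all arguments are assumed to be prepared by the caller
    async_job_get, owner = await start_export_data(
        rabbitmq_rpc_client,
        paths_to_export=paths_to_export,
        export_as="path",
        owner_metadata=owner_metadata,
        user_id=user_id,
        product_name=product_name,  # new required keyword added by this PR
    )
    return async_job_get, owner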