diff --git a/packages/celery-library/src/celery_library/rpc/__init__.py b/packages/celery-library/src/celery_library/rpc/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/packages/celery-library/src/celery_library/rpc/_async_jobs.py b/packages/celery-library/src/celery_library/rpc/_async_jobs.py deleted file mode 100644 index ff3d735d5b68..000000000000 --- a/packages/celery-library/src/celery_library/rpc/_async_jobs.py +++ /dev/null @@ -1,144 +0,0 @@ -# pylint: disable=unused-argument - -import logging - -from models_library.api_schemas_rpc_async_jobs.async_jobs import ( - AsyncJobGet, - AsyncJobId, - AsyncJobResult, - AsyncJobStatus, -) -from models_library.api_schemas_rpc_async_jobs.exceptions import ( - JobAbortedError, - JobError, - JobMissingError, - JobNotDoneError, - JobSchedulerError, -) -from servicelib.celery.models import OwnerMetadata, TaskState -from servicelib.celery.task_manager import TaskManager -from servicelib.logging_utils import log_catch -from servicelib.rabbitmq import RPCRouter - -from ..errors import ( - TaskManagerError, - TaskNotFoundError, - TransferrableCeleryError, - decode_celery_transferrable_error, -) - -_logger = logging.getLogger(__name__) -router = RPCRouter() - - -@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError)) -async def cancel( - task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata -): - assert task_manager # nosec - assert owner_metadata # nosec - try: - await task_manager.cancel_task( - owner_metadata=owner_metadata, - task_uuid=job_id, - ) - except TaskNotFoundError as exc: - raise JobMissingError(job_id=job_id) from exc - except TaskManagerError as exc: - raise JobSchedulerError(exc=f"{exc}") from exc - - -@router.expose(reraise_if_error_type=(JobSchedulerError, JobMissingError)) -async def status( - task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata -) -> AsyncJobStatus: - assert task_manager # nosec - assert owner_metadata # nosec - - try: - task_status = await task_manager.get_task_status( - owner_metadata=owner_metadata, - task_uuid=job_id, - ) - except TaskNotFoundError as exc: - raise JobMissingError(job_id=job_id) from exc - except TaskManagerError as exc: - raise JobSchedulerError(exc=f"{exc}") from exc - - return AsyncJobStatus( - job_id=job_id, - progress=task_status.progress_report, - done=task_status.is_done, - ) - - -@router.expose( - reraise_if_error_type=( - JobAbortedError, - JobError, - JobMissingError, - JobNotDoneError, - JobSchedulerError, - ) -) -async def result( - task_manager: TaskManager, job_id: AsyncJobId, owner_metadata: OwnerMetadata -) -> AsyncJobResult: - assert task_manager # nosec - assert job_id # nosec - assert owner_metadata # nosec - - try: - _status = await task_manager.get_task_status( - owner_metadata=owner_metadata, - task_uuid=job_id, - ) - if not _status.is_done: - raise JobNotDoneError(job_id=job_id) - _result = await task_manager.get_task_result( - owner_metadata=owner_metadata, - task_uuid=job_id, - ) - except TaskNotFoundError as exc: - raise JobMissingError(job_id=job_id) from exc - except TaskManagerError as exc: - raise JobSchedulerError(exc=f"{exc}") from exc - - if _status.task_state == TaskState.FAILURE: - # fallback exception to report - exc_type = type(_result).__name__ - exc_msg = f"{_result}" - - # try to recover the original error - exception = None - with log_catch(_logger, reraise=False): - assert isinstance(_result, TransferrableCeleryError) # nosec - exception = decode_celery_transferrable_error(_result) - exc_type = type(exception).__name__ - exc_msg = f"{exception}" - - if exception is None: - _logger.warning("Was not expecting '%s': '%s'", exc_type, exc_msg) - - # NOTE: cannot transfer original exception since this will not be able to be serialized - # outside of storage - raise JobError(job_id=job_id, exc_type=exc_type, exc_msg=exc_msg) - - return AsyncJobResult(result=_result) - - -@router.expose(reraise_if_error_type=(JobSchedulerError,)) -async def list_jobs( - task_manager: TaskManager, owner_metadata: OwnerMetadata -) -> list[AsyncJobGet]: - assert task_manager # nosec - try: - tasks = await task_manager.list_tasks( - owner_metadata=owner_metadata, - ) - except TaskManagerError as exc: - raise JobSchedulerError(exc=f"{exc}") from exc - - return [ - AsyncJobGet(job_id=task.uuid, job_name=task.metadata.name) for task in tasks - ] diff --git a/packages/celery-library/tests/unit/test_async_jobs.py b/packages/celery-library/tests/unit/test_async_jobs.py deleted file mode 100644 index d6a7c82cb2fe..000000000000 --- a/packages/celery-library/tests/unit/test_async_jobs.py +++ /dev/null @@ -1,381 +0,0 @@ -# pylint: disable=redefined-outer-name -# pylint: disable=unused-argument - -import asyncio -import pickle -from collections.abc import Awaitable, Callable -from datetime import timedelta -from enum import Enum -from typing import Any, Final - -import pytest -from celery import Celery, Task -from celery.contrib.testing.worker import TestWorkController -from celery_library.rpc import _async_jobs -from celery_library.task import register_task -from common_library.errors_classes import OsparcErrorMixin -from faker import Faker -from models_library.api_schemas_rpc_async_jobs.async_jobs import ( - AsyncJobGet, -) -from models_library.api_schemas_rpc_async_jobs.exceptions import ( - JobError, - JobMissingError, -) -from models_library.products import ProductName -from models_library.rabbitmq_basic_types import RPCNamespace -from models_library.users import UserID -from pydantic import TypeAdapter -from servicelib.celery.models import ExecutionMetadata, OwnerMetadata, TaskKey -from servicelib.celery.task_manager import TaskManager -from servicelib.rabbitmq import RabbitMQRPCClient, RPCRouter -from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs -from tenacity import ( - AsyncRetrying, - retry_if_exception_type, - stop_after_delay, - wait_fixed, -) - -pytest_simcore_core_services_selection = [ - "rabbit", - "redis", -] - - -class AccessRightError(OsparcErrorMixin, RuntimeError): - msg_template: str = ( - "User {user_id} does not have access to file {file_id} with location {location_id}" - ) - - -@pytest.fixture -async def async_jobs_rabbitmq_rpc_client( - rabbitmq_rpc_client: Callable[[str], Awaitable[RabbitMQRPCClient]], -) -> RabbitMQRPCClient: - rpc_client = await rabbitmq_rpc_client("pytest_async_jobs_rpc_client") - assert rpc_client - return rpc_client - - -@pytest.fixture -def user_id(faker: Faker) -> UserID: - return faker.pyint(min_value=1) - - -@pytest.fixture -def product_name(faker: Faker) -> ProductName: - return faker.word() - - -###### RPC Interface ###### -router = RPCRouter() - -ASYNC_JOBS_RPC_NAMESPACE: Final[RPCNamespace] = TypeAdapter( - RPCNamespace -).validate_python("async_jobs") - - -@router.expose() -async def rpc_sync_job( - task_manager: TaskManager, *, owner_metadata: OwnerMetadata, **kwargs: Any -) -> AsyncJobGet: - task_name = sync_job.__name__ - task_uuid = await task_manager.submit_task( - ExecutionMetadata(name=task_name), owner_metadata=owner_metadata, **kwargs - ) - - return AsyncJobGet(job_id=task_uuid, job_name=task_name) - - -@router.expose() -async def rpc_async_job( - task_manager: TaskManager, *, owner_metadata: OwnerMetadata, **kwargs: Any -) -> AsyncJobGet: - task_name = async_job.__name__ - task_uuid = await task_manager.submit_task( - ExecutionMetadata(name=task_name), owner_metadata=owner_metadata, **kwargs - ) - - return AsyncJobGet(job_id=task_uuid, job_name=task_name) - - -################################# - - -###### CELERY TASKS ###### -class Action(str, Enum): - ECHO = "ECHO" - RAISE = "RAISE" - SLEEP = "SLEEP" - - -async def _process_action(action: str, payload: Any) -> Any: - match action: - case Action.ECHO: - return payload - case Action.RAISE: - raise pickle.loads(payload) # noqa: S301 - case Action.SLEEP: - await asyncio.sleep(payload) - return None - - -def sync_job(task: Task, task_key: TaskKey, action: Action, payload: Any) -> Any: - _ = task - _ = task_key - return asyncio.run(_process_action(action, payload)) - - -async def async_job(task: Task, task_key: TaskKey, action: Action, payload: Any) -> Any: - _ = task - _ = task_key - return await _process_action(action, payload) - - -################################# - - -@pytest.fixture -async def register_rpc_routes( - async_jobs_rabbitmq_rpc_client: RabbitMQRPCClient, task_manager: TaskManager -) -> None: - await async_jobs_rabbitmq_rpc_client.register_router( - _async_jobs.router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=task_manager - ) - await async_jobs_rabbitmq_rpc_client.register_router( - router, ASYNC_JOBS_RPC_NAMESPACE, task_manager=task_manager - ) - - -async def _start_task_via_rpc( - client: RabbitMQRPCClient, - *, - rpc_task_name: str, - user_id: UserID, - product_name: ProductName, - **kwargs: Any, -) -> tuple[AsyncJobGet, OwnerMetadata]: - owner_metadata = OwnerMetadata( - user_id=user_id, product_name=product_name, owner="pytest_client" - ) - async_job_get = await async_jobs.submit( - rabbitmq_rpc_client=client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - method_name=rpc_task_name, - owner_metadata=owner_metadata, - **kwargs, - ) - return async_job_get, owner_metadata - - -@pytest.fixture -def register_celery_tasks() -> Callable[[Celery], None]: - def _(celery_app: Celery) -> None: - register_task( - celery_app, - sync_job, - max_retries=1, - delay_between_retries=timedelta(seconds=1), - dont_autoretry_for=(AccessRightError,), - ) - register_task( - celery_app, - async_job, - max_retries=1, - delay_between_retries=timedelta(seconds=1), - dont_autoretry_for=(AccessRightError,), - ) - - return _ - - -async def _wait_for_job( - rpc_client: RabbitMQRPCClient, - *, - async_job_get: AsyncJobGet, - owner_metadata: OwnerMetadata, - stop_after: timedelta = timedelta(seconds=5), -) -> None: - - async for attempt in AsyncRetrying( - stop=stop_after_delay(stop_after.total_seconds()), - wait=wait_fixed(0.1), - retry=retry_if_exception_type(AssertionError), - reraise=True, - ): - with attempt: - result = await async_jobs.status( - rpc_client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - job_id=async_job_get.job_id, - owner_metadata=owner_metadata, - ) - assert ( - result.done is True - ), "Please check logs above, something whent wrong with task execution" - - -@pytest.mark.parametrize( - "exposed_rpc_start", - [ - rpc_sync_job.__name__, - rpc_async_job.__name__, - ], -) -@pytest.mark.parametrize( - "payload", - [ - None, - 1, - "a_string", - {"a": "dict"}, - ["a", "list"], - {"a", "set"}, - ], -) -async def test_async_jobs_workflow( - register_rpc_routes: None, - async_jobs_rabbitmq_rpc_client: RabbitMQRPCClient, - with_celery_worker: TestWorkController, - user_id: UserID, - product_name: ProductName, - exposed_rpc_start: str, - payload: Any, -): - async_job_get, owner_metadata = await _start_task_via_rpc( - async_jobs_rabbitmq_rpc_client, - rpc_task_name=exposed_rpc_start, - user_id=user_id, - product_name=product_name, - action=Action.ECHO, - payload=payload, - ) - - jobs = await async_jobs.list_jobs( - async_jobs_rabbitmq_rpc_client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - owner_metadata=owner_metadata, - ) - assert len(jobs) > 0 - - await _wait_for_job( - async_jobs_rabbitmq_rpc_client, - async_job_get=async_job_get, - owner_metadata=owner_metadata, - ) - - async_job_result = await async_jobs.result( - async_jobs_rabbitmq_rpc_client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - job_id=async_job_get.job_id, - owner_metadata=owner_metadata, - ) - assert async_job_result.result == payload - - -@pytest.mark.parametrize( - "exposed_rpc_start", - [ - rpc_async_job.__name__, - ], -) -async def test_async_jobs_cancel( - register_rpc_routes: None, - async_jobs_rabbitmq_rpc_client: RabbitMQRPCClient, - with_celery_worker: TestWorkController, - user_id: UserID, - product_name: ProductName, - exposed_rpc_start: str, -): - async_job_get, owner_metadata = await _start_task_via_rpc( - async_jobs_rabbitmq_rpc_client, - rpc_task_name=exposed_rpc_start, - user_id=user_id, - product_name=product_name, - action=Action.SLEEP, - payload=60 * 10, # test hangs if not cancelled properly - ) - - await async_jobs.cancel( - async_jobs_rabbitmq_rpc_client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - job_id=async_job_get.job_id, - owner_metadata=owner_metadata, - ) - - jobs = await async_jobs.list_jobs( - async_jobs_rabbitmq_rpc_client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - owner_metadata=owner_metadata, - ) - assert async_job_get.job_id not in [job.job_id for job in jobs] - - with pytest.raises(JobMissingError): - await async_jobs.status( - async_jobs_rabbitmq_rpc_client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - job_id=async_job_get.job_id, - owner_metadata=owner_metadata, - ) - - with pytest.raises(JobMissingError): - await async_jobs.result( - async_jobs_rabbitmq_rpc_client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - job_id=async_job_get.job_id, - owner_metadata=owner_metadata, - ) - - -@pytest.mark.parametrize( - "exposed_rpc_start", - [ - rpc_sync_job.__name__, - rpc_async_job.__name__, - ], -) -@pytest.mark.parametrize( - "error", - [ - pytest.param(Exception("generic error"), id="generic-error"), - pytest.param( - AccessRightError(user_id=1, file_id="fake_key", location_id=0), - id="custom-osparc-error", - ), - ], -) -async def test_async_jobs_raises( - register_rpc_routes: None, - async_jobs_rabbitmq_rpc_client: RabbitMQRPCClient, - with_celery_worker: TestWorkController, - user_id: UserID, - product_name: ProductName, - exposed_rpc_start: str, - error: Exception, -): - async_job_get, owner_metadata = await _start_task_via_rpc( - async_jobs_rabbitmq_rpc_client, - rpc_task_name=exposed_rpc_start, - user_id=user_id, - product_name=product_name, - action=Action.RAISE, - payload=pickle.dumps(error), - ) - - await _wait_for_job( - async_jobs_rabbitmq_rpc_client, - async_job_get=async_job_get, - owner_metadata=owner_metadata, - stop_after=timedelta(minutes=1), - ) - - with pytest.raises(JobError) as exc: - await async_jobs.result( - async_jobs_rabbitmq_rpc_client, - rpc_namespace=ASYNC_JOBS_RPC_NAMESPACE, - job_id=async_job_get.job_id, - owner_metadata=owner_metadata, - ) - assert exc.value.exc_type == type(error).__name__ - assert exc.value.exc_msg == f"{error}" diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/__init__.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py deleted file mode 100644 index 9374b60c9b69..000000000000 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/async_jobs/async_jobs.py +++ /dev/null @@ -1,273 +0,0 @@ -import datetime -import logging -from asyncio import CancelledError -from collections.abc import AsyncGenerator, Awaitable -from typing import Any, Final - -from attr import dataclass -from models_library.api_schemas_rpc_async_jobs.async_jobs import ( - AsyncJobGet, - AsyncJobId, - AsyncJobResult, - AsyncJobStatus, -) -from models_library.api_schemas_rpc_async_jobs.exceptions import JobMissingError -from models_library.rabbitmq_basic_types import RPCMethodName, RPCNamespace -from pydantic import NonNegativeInt, TypeAdapter -from tenacity import ( - AsyncRetrying, - TryAgain, - before_sleep_log, - retry, - retry_if_exception_type, - stop_after_attempt, - stop_after_delay, - wait_fixed, - wait_random_exponential, -) - -from ....celery.models import OwnerMetadata -from ....rabbitmq import RemoteMethodNotRegisteredError -from ... import RabbitMQRPCClient - -_DEFAULT_TIMEOUT_S: Final[NonNegativeInt] = 30 -_DEFAULT_POLL_INTERVAL_S: Final[float] = 0.1 - -_logger = logging.getLogger(__name__) - - -async def cancel( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - job_id: AsyncJobId, - owner_metadata: OwnerMetadata, -) -> None: - await rabbitmq_rpc_client.request( - rpc_namespace, - TypeAdapter(RPCMethodName).validate_python("cancel"), - job_id=job_id, - owner_metadata=owner_metadata, - timeout_s=_DEFAULT_TIMEOUT_S, - ) - - -async def status( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - job_id: AsyncJobId, - owner_metadata: OwnerMetadata, -) -> AsyncJobStatus: - _result = await rabbitmq_rpc_client.request( - rpc_namespace, - TypeAdapter(RPCMethodName).validate_python("status"), - job_id=job_id, - owner_metadata=owner_metadata, - timeout_s=_DEFAULT_TIMEOUT_S, - ) - assert isinstance(_result, AsyncJobStatus) - return _result - - -async def result( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - job_id: AsyncJobId, - owner_metadata: OwnerMetadata, -) -> AsyncJobResult: - _result = await rabbitmq_rpc_client.request( - rpc_namespace, - TypeAdapter(RPCMethodName).validate_python("result"), - job_id=job_id, - owner_metadata=owner_metadata, - timeout_s=_DEFAULT_TIMEOUT_S, - ) - assert isinstance(_result, AsyncJobResult) - return _result - - -async def list_jobs( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - owner_metadata: OwnerMetadata, -) -> list[AsyncJobGet]: - _result: list[AsyncJobGet] = await rabbitmq_rpc_client.request( - rpc_namespace, - TypeAdapter(RPCMethodName).validate_python("list_jobs"), - owner_metadata=owner_metadata, - timeout_s=_DEFAULT_TIMEOUT_S, - ) - return _result - - -async def submit( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - method_name: str, - owner_metadata: OwnerMetadata, - **kwargs, -) -> AsyncJobGet: - _result = await rabbitmq_rpc_client.request( - rpc_namespace, - TypeAdapter(RPCMethodName).validate_python(method_name), - owner_metadata=owner_metadata, - **kwargs, - timeout_s=_DEFAULT_TIMEOUT_S, - ) - assert isinstance(_result, AsyncJobGet) # nosec - return _result - - -_DEFAULT_RPC_RETRY_POLICY: dict[str, Any] = { - "retry": retry_if_exception_type((RemoteMethodNotRegisteredError,)), - "wait": wait_random_exponential(max=20), - "stop": stop_after_attempt(30), - "reraise": True, - "before_sleep": before_sleep_log(_logger, logging.WARNING), -} - - -@retry(**_DEFAULT_RPC_RETRY_POLICY) -async def _wait_for_completion( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - method_name: RPCMethodName, - job_id: AsyncJobId, - owner_metadata: OwnerMetadata, - client_timeout: datetime.timedelta, -) -> AsyncGenerator[AsyncJobStatus]: - try: - async for attempt in AsyncRetrying( - stop=stop_after_delay(client_timeout.total_seconds()), - reraise=True, - retry=retry_if_exception_type((TryAgain, JobMissingError)), - before_sleep=before_sleep_log(_logger, logging.DEBUG), - wait=wait_fixed(_DEFAULT_POLL_INTERVAL_S), - ): - with attempt: - job_status = await status( - rabbitmq_rpc_client, - rpc_namespace=rpc_namespace, - job_id=job_id, - owner_metadata=owner_metadata, - ) - yield job_status - if not job_status.done: - msg = f"{job_status.job_id=}: '{job_status.progress=}'" - raise TryAgain(msg) # noqa: TRY301 - - except TryAgain as exc: - # this is a timeout - msg = f"Async job {job_id=}, calling to '{method_name}' timed-out after {client_timeout}" - raise TimeoutError(msg) from exc - - -@dataclass(frozen=True) -class AsyncJobComposedResult: - status: AsyncJobStatus - _result: Awaitable[Any] | None = None - - @property - def done(self) -> bool: - return self._result is not None - - async def result(self) -> Any: - if not self._result: - msg = "No result ready!" - raise ValueError(msg) - return await self._result - - -async def wait_and_get_result( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - method_name: str, - job_id: AsyncJobId, - owner_metadata: OwnerMetadata, - client_timeout: datetime.timedelta, -) -> AsyncGenerator[AsyncJobComposedResult]: - """when a job is already submitted this will wait for its completion - and return the composed result""" - try: - job_status = None - async for job_status in _wait_for_completion( - rabbitmq_rpc_client, - rpc_namespace=rpc_namespace, - method_name=method_name, - job_id=job_id, - owner_metadata=owner_metadata, - client_timeout=client_timeout, - ): - assert job_status is not None # nosec - yield AsyncJobComposedResult(job_status) - - # return the result - if job_status: - yield AsyncJobComposedResult( - job_status, - result( - rabbitmq_rpc_client, - rpc_namespace=rpc_namespace, - job_id=job_id, - owner_metadata=owner_metadata, - ), - ) - except (TimeoutError, CancelledError) as error: - try: - await cancel( - rabbitmq_rpc_client, - rpc_namespace=rpc_namespace, - job_id=job_id, - owner_metadata=owner_metadata, - ) - except Exception as exc: - raise exc from error # NOSONAR - raise - - -async def submit_and_wait( - rabbitmq_rpc_client: RabbitMQRPCClient, - *, - rpc_namespace: RPCNamespace, - method_name: str, - owner_metadata: OwnerMetadata, - client_timeout: datetime.timedelta, - **kwargs, -) -> AsyncGenerator[AsyncJobComposedResult]: - async_job_rpc_get = None - try: - async_job_rpc_get = await submit( - rabbitmq_rpc_client, - rpc_namespace=rpc_namespace, - method_name=method_name, - owner_metadata=owner_metadata, - **kwargs, - ) - except (TimeoutError, CancelledError) as error: - if async_job_rpc_get is not None: - try: - await cancel( - rabbitmq_rpc_client, - rpc_namespace=rpc_namespace, - job_id=async_job_rpc_get.job_id, - owner_metadata=owner_metadata, - ) - except Exception as exc: - raise exc from error - raise - - async for wait_and_ in wait_and_get_result( - rabbitmq_rpc_client, - rpc_namespace=rpc_namespace, - method_name=method_name, - job_id=async_job_rpc_get.job_id, - owner_metadata=owner_metadata, - client_timeout=client_timeout, - ): - yield wait_and_ diff --git a/services/api-server/src/simcore_service_api_server/services_rpc/async_jobs.py b/services/api-server/src/simcore_service_api_server/services_rpc/async_jobs.py deleted file mode 100644 index 0c1cb911e2fd..000000000000 --- a/services/api-server/src/simcore_service_api_server/services_rpc/async_jobs.py +++ /dev/null @@ -1,96 +0,0 @@ -import functools -from dataclasses import dataclass - -from models_library.api_schemas_rpc_async_jobs.async_jobs import ( - AsyncJobGet, - AsyncJobId, - AsyncJobResult, - AsyncJobStatus, -) -from models_library.api_schemas_rpc_async_jobs.exceptions import ( - JobAbortedError, - JobError, - JobNotDoneError, - JobSchedulerError, -) -from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE -from servicelib.celery.models import OwnerMetadata -from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient -from servicelib.rabbitmq.rpc_interfaces.async_jobs import async_jobs - -from ..exceptions.service_errors_utils import service_exception_mapper -from ..exceptions.task_errors import ( - TaskCancelledError, - TaskError, - TaskResultMissingError, - TaskSchedulerError, -) - -_exception_mapper = functools.partial( - service_exception_mapper, service_name="Async jobs" -) - - -@dataclass -class AsyncJobClient: - _rabbitmq_rpc_client: RabbitMQRPCClient - - @_exception_mapper( - rpc_exception_map={ - JobSchedulerError: TaskSchedulerError, - } - ) - async def cancel( - self, *, job_id: AsyncJobId, owner_metadata: OwnerMetadata - ) -> None: - return await async_jobs.cancel( - self._rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id=job_id, - owner_metadata=owner_metadata, - ) - - @_exception_mapper( - rpc_exception_map={ - JobSchedulerError: TaskSchedulerError, - } - ) - async def status( - self, *, job_id: AsyncJobId, owner_metadata: OwnerMetadata - ) -> AsyncJobStatus: - return await async_jobs.status( - self._rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id=job_id, - owner_metadata=owner_metadata, - ) - - @_exception_mapper( - rpc_exception_map={ - JobSchedulerError: TaskSchedulerError, - JobNotDoneError: TaskResultMissingError, - JobAbortedError: TaskCancelledError, - JobError: TaskError, - } - ) - async def result( - self, *, job_id: AsyncJobId, owner_metadata: OwnerMetadata - ) -> AsyncJobResult: - return await async_jobs.result( - self._rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - job_id=job_id, - owner_metadata=owner_metadata, - ) - - @_exception_mapper( - rpc_exception_map={ - JobSchedulerError: TaskSchedulerError, - } - ) - async def list_jobs(self, *, owner_metadata: OwnerMetadata) -> list[AsyncJobGet]: - return await async_jobs.list_jobs( - self._rabbitmq_rpc_client, - rpc_namespace=STORAGE_RPC_NAMESPACE, - owner_metadata=owner_metadata, - ) diff --git a/services/storage/src/simcore_service_storage/api/rpc/routes.py b/services/storage/src/simcore_service_storage/api/rpc/routes.py index ebf1ba604112..6a69c21b34f8 100644 --- a/services/storage/src/simcore_service_storage/api/rpc/routes.py +++ b/services/storage/src/simcore_service_storage/api/rpc/routes.py @@ -1,6 +1,5 @@ import logging -from celery_library.rpc import _async_jobs from fastapi import FastAPI from models_library.api_schemas_storage import STORAGE_RPC_NAMESPACE from servicelib.logging_utils import log_context @@ -14,7 +13,6 @@ ROUTERS: list[RPCRouter] = [ - _async_jobs.router, _paths.router, _simcore_s3.router, ]