44
55import random
66import string
7- from collections .abc import AsyncGenerator , Callable
7+ from collections .abc import AsyncGenerator , AsyncIterable , Callable
88from contextlib import nullcontext
99from datetime import UTC , datetime
1010from typing import Any , overload
1111
12- from sqlalchemy import Select , delete , func , select , text
12+ from sqlalchemy import Select , and_ , delete , func , select , text
1313from sqlalchemy .exc import IntegrityError
1414from sqlalchemy .ext .asyncio import AsyncSession , AsyncSessionTransaction
1515from sqlalchemy .orm import joinedload , selectinload
3131 Slug ,
3232)
3333from renku_data_services .data_connectors .models import DataConnector
34+ from renku_data_services .data_connectors .orm import DataConnectorORM
3435from renku_data_services .namespace import models
3536from renku_data_services .namespace import orm as schemas
37+ from renku_data_services .project .models import Project
3638from renku_data_services .project .orm import ProjectORM
37- from renku_data_services .search .db import SearchUpdatesRepo
39+ from renku_data_services .search .db import GlobalDataConnector , SearchUpdatesRepo
3840from renku_data_services .search .decorators import update_search_document
3941from renku_data_services .users import models as user_models
4042from renku_data_services .users import orm as user_schemas
4345logger = logging .getLogger (__name__ )
4446
4547
48+ async def _check_namespace_permissions (
49+ user : base_models .APIUser , authz : Authz , ns : models .Namespace , scope : Scope
50+ ) -> None :
51+ """Helper function to check for namespace permissions."""
52+ is_user_namespace = ns .kind == models .NamespaceKind .user
53+ ns_id = ns .id if is_user_namespace else ns .underlying_resource_id
54+ allowed = await authz .has_permission (user , ns .kind .to_resource_type (), ns_id , scope )
55+ if not allowed :
56+ raise errors .missing_or_unauthorized (ns .kind , ns .underlying_resource_id )
57+
58+
59+ async def _upsert_old_entity_slug (session : AsyncSession , old_entity_slug : schemas .EntitySlugORM ) -> None :
60+ """This function checks if an old entity slug exists and if so then it updates it.
61+
62+ If the old entity slug does not exists then it inserts one.
63+ This is needed so that when a slug is renamed then the old slug still points to the new
64+ and current entity.
65+ """
66+ stmt = select (schemas .EntitySlugOldORM ).where (schemas .EntitySlugOldORM .slug == old_entity_slug .slug )
67+ if old_entity_slug .project_id is not None :
68+ stmt = stmt .where (schemas .EntitySlugOldORM .project_id == old_entity_slug .project_id )
69+ else :
70+ stmt = stmt .where (schemas .EntitySlugOldORM .project_id .is_ (None ))
71+ if old_entity_slug .data_connector_id is not None :
72+ stmt = stmt .where (schemas .EntitySlugOldORM .data_connector_id == old_entity_slug .data_connector_id )
73+ else :
74+ stmt = stmt .where (schemas .EntitySlugOldORM .data_connector_id .is_ (None ))
75+ existing_old_slug = await session .scalar (stmt )
76+
77+ if not existing_old_slug :
78+ session .add (
79+ schemas .EntitySlugOldORM (
80+ slug = old_entity_slug .slug ,
81+ latest_slug_id = old_entity_slug .id ,
82+ project_id = old_entity_slug .project_id ,
83+ data_connector_id = old_entity_slug .data_connector_id ,
84+ )
85+ )
86+ return
87+
88+ existing_old_slug .slug = old_entity_slug .slug
89+ existing_old_slug .latest_slug_id = old_entity_slug .id
90+ existing_old_slug .project_id = old_entity_slug .project_id
91+ existing_old_slug .data_connector_id = old_entity_slug .data_connector_id
92+
93+
4694class GroupRepository :
4795 """Repository for groups."""
4896
@@ -521,16 +569,6 @@ async def move_data_connector(
521569 ) -> None :
522570 """Rename or move a namespace."""
523571
524- async def _check_ns_permissions (
525- user : base_models .APIUser , authz : Authz , ns : models .Namespace , scope : Scope
526- ) -> None :
527- """Helper function to check for namespace permissions."""
528- is_user_namespace = ns .kind == models .NamespaceKind .user
529- ns_id = ns .id if is_user_namespace else ns .underlying_resource_id
530- allowed = await authz .has_permission (user , ns .kind .to_resource_type (), ns_id , scope )
531- if not allowed :
532- raise errors .missing_or_unauthorized (ns .kind , ns .underlying_resource_id )
533-
534572 async def _get_dc_slug (session : AsyncSession , dc_id : ULID ) -> schemas .EntitySlugORM :
535573 """Helper function to get the data connector slug or raise an exception."""
536574 dc_slug = await session .scalar (
@@ -562,40 +600,6 @@ async def _check_dc_slug_not_taken(
562600 message = f"The owner already has a data connector with slug { new_slug } , please try a different one"
563601 )
564602
565- async def _upsert_old_dc_slug (session : AsyncSession , old_dc_slug : schemas .EntitySlugORM ) -> None :
566- """This function checks if an old entity slug exists and if so then it updates it.
567-
568- If the old entity slug does not exists then it inserts one.
569- This is needed so that when a slug is renamed then the old slug still points to the new
570- and current entity.
571- """
572- stmt = select (schemas .EntitySlugOldORM ).where (schemas .EntitySlugOldORM .slug == old_dc_slug .slug )
573- if old_dc_slug .project_id is not None :
574- stmt .where (schemas .EntitySlugOldORM .project_id == old_dc_slug .project_id )
575- else :
576- stmt .where (schemas .EntitySlugOldORM .project_id .is_ (None ))
577- if old_dc_slug .data_connector_id is not None :
578- stmt .where (schemas .EntitySlugOldORM .data_connector_id == old_dc_slug .data_connector_id )
579- else :
580- stmt .where (schemas .EntitySlugOldORM .data_connector_id .is_ (None ))
581- existing_old_slug = await session .scalar (stmt )
582-
583- if not existing_old_slug :
584- session .add (
585- schemas .EntitySlugOldORM (
586- slug = old_dc_slug .slug ,
587- latest_slug_id = old_dc_slug .id ,
588- project_id = old_dc_slug .project_id ,
589- data_connector_id = old_dc_slug .data_connector_id ,
590- )
591- )
592- return
593-
594- existing_old_slug .slug = old_dc_slug .slug
595- existing_old_slug .latest_slug_id = old_dc_slug .id
596- existing_old_slug .project_id = old_dc_slug .project_id
597- existing_old_slug .data_connector_id = old_dc_slug .data_connector_id
598-
599603 session_ctx : AsyncSession | nullcontext = nullcontext ()
600604 transaction : AsyncSessionTransaction | nullcontext = nullcontext ()
601605 if session is None :
@@ -615,7 +619,7 @@ async def _upsert_old_dc_slug(session: AsyncSession, old_dc_slug: schemas.Entity
615619 pass
616620 case (new_path , old_path , old_slug , new_slug ) if new_path == old_path and old_slug != new_slug :
617621 await _check_dc_slug_not_taken (session , dc .namespace , new_slug )
618- await _upsert_old_dc_slug (session , dc_slug )
622+ await _upsert_old_entity_slug (session , dc_slug )
619623 dc_slug .slug = new_slug .value
620624 case (NamespacePath (), NamespacePath () as new_path , old_slug , new_slug ):
621625 new_usr_grp_ns = await self .get_namespace_by_path (user , new_path , session )
@@ -624,10 +628,10 @@ async def _upsert_old_dc_slug(session: AsyncSession, old_dc_slug: schemas.Entity
624628 message = f"The data connector namespace { new .parent ()} cannot be found or "
625629 "you do not have sufficient permissions to access it."
626630 )
627- await _check_ns_permissions (user , self .authz , dc .namespace , Scope .WRITE )
628- await _check_ns_permissions (user , self .authz , new_usr_grp_ns , Scope .WRITE )
631+ await _check_namespace_permissions (user , self .authz , dc .namespace , Scope .WRITE )
632+ await _check_namespace_permissions (user , self .authz , new_usr_grp_ns , Scope .WRITE )
629633 await _check_dc_slug_not_taken (session , new_usr_grp_ns , new_slug )
630- await _upsert_old_dc_slug (session , dc_slug )
634+ await _upsert_old_entity_slug (session , dc_slug )
631635 dc_slug .namespace_id = new_usr_grp_ns .id
632636 if old_slug != new_slug :
633637 dc_slug .slug = new_slug .value
@@ -638,10 +642,10 @@ async def _upsert_old_dc_slug(session: AsyncSession, old_dc_slug: schemas.Entity
638642 message = f"The data connector namespace { new_path } cannot be found or "
639643 "you do not have sufficient permissions to access it."
640644 )
641- await _check_ns_permissions (user , self .authz , dc .namespace , Scope .WRITE )
642- await _check_ns_permissions (user , self .authz , new_proj_ns , Scope .WRITE )
645+ await _check_namespace_permissions (user , self .authz , dc .namespace , Scope .WRITE )
646+ await _check_namespace_permissions (user , self .authz , new_proj_ns , Scope .WRITE )
643647 await _check_dc_slug_not_taken (session , new_proj_ns , new_slug )
644- await _upsert_old_dc_slug (session , dc_slug )
648+ await _upsert_old_entity_slug (session , dc_slug )
645649 dc_slug .project_id = new_proj_ns .underlying_resource_id
646650 dc_slug .namespace_id = new_proj_ns .id
647651 if old_slug != new_slug :
@@ -653,10 +657,10 @@ async def _upsert_old_dc_slug(session: AsyncSession, old_dc_slug: schemas.Entity
653657 message = f"The data connector namespace { new_path } cannot be found or "
654658 "you do not have sufficient permissions to access it."
655659 )
656- await _check_ns_permissions (user , self .authz , dc .namespace , Scope .WRITE )
657- await _check_ns_permissions (user , self .authz , new_usr_grp_ns , Scope .WRITE )
660+ await _check_namespace_permissions (user , self .authz , dc .namespace , Scope .WRITE )
661+ await _check_namespace_permissions (user , self .authz , new_usr_grp_ns , Scope .WRITE )
658662 await _check_dc_slug_not_taken (session , new_usr_grp_ns , new_slug )
659- await _upsert_old_dc_slug (session , dc_slug )
663+ await _upsert_old_entity_slug (session , dc_slug )
660664 dc_slug .project_id = None
661665 dc_slug .namespace_id = new_usr_grp_ns .id
662666 if old_slug != new_slug :
@@ -668,10 +672,10 @@ async def _upsert_old_dc_slug(session: AsyncSession, old_dc_slug: schemas.Entity
668672 message = f"The data connector namespace { new_path } cannot be found or "
669673 "you do not have sufficient permissions to access it."
670674 )
671- await _check_ns_permissions (user , self .authz , dc .namespace , Scope .WRITE )
672- await _check_ns_permissions (user , self .authz , new_proj_ns , Scope .WRITE )
675+ await _check_namespace_permissions (user , self .authz , dc .namespace , Scope .WRITE )
676+ await _check_namespace_permissions (user , self .authz , new_proj_ns , Scope .WRITE )
673677 await _check_dc_slug_not_taken (session , new_proj_ns , new_slug )
674- await _upsert_old_dc_slug (session , dc_slug )
678+ await _upsert_old_entity_slug (session , dc_slug )
675679 dc_slug .project_id = new_proj_ns .underlying_resource_id
676680 dc_slug .namespace_id = new_proj_ns .id
677681 if old_slug != new_slug :
@@ -682,6 +686,101 @@ async def _upsert_old_dc_slug(session: AsyncSession, old_dc_slug: schemas.Entity
682686 "path combination when changing data connector ownership."
683687 )
684688
689+ async def move_project (
690+ self ,
691+ user : base_models .APIUser ,
692+ project : Project ,
693+ new : ProjectPath ,
694+ session : AsyncSession | None = None ,
695+ ) -> None :
696+ """Rename or move a project."""
697+
698+ async def _get_project_slug (session : AsyncSession , project_id : ULID ) -> schemas .EntitySlugORM :
699+ """Helper function to get the project slug or raise an exception."""
700+ project_slug = await session .scalar (
701+ select (schemas .EntitySlugORM )
702+ .where (schemas .EntitySlugORM .project_id == project_id )
703+ .where (schemas .EntitySlugORM .data_connector_id .is_ (None ))
704+ )
705+ if not project_slug :
706+ raise errors .missing_or_unauthorized (ResourceType .project , project_id )
707+ return project_slug
708+
709+ async def _get_dcs_in_project (session : AsyncSession , project_id : ULID ) -> AsyncIterable [DataConnectorORM ]:
710+ stmt = select (DataConnectorORM ).where (
711+ DataConnectorORM .slug .has (
712+ and_ (
713+ schemas .EntitySlugORM .project_id == project_id ,
714+ schemas .EntitySlugORM .data_connector_id .is_not (None ),
715+ )
716+ )
717+ )
718+ stream = await session .stream_scalars (stmt )
719+ async for dc in stream :
720+ yield dc
721+
722+ async def _check_proj_slug_not_taken (
723+ session : AsyncSession , new_namespace : models .GroupNamespace | models .UserNamespace , new_slug : Slug
724+ ) -> None :
725+ """Helper function to make sure a new project slug is available."""
726+ stmt = (
727+ select (sa_count ("*" ))
728+ .select_from (schemas .EntitySlugORM )
729+ .where (schemas .EntitySlugORM .namespace_id == new_namespace .id )
730+ .where (schemas .EntitySlugORM .data_connector_id .is_ (None ))
731+ .where (schemas .EntitySlugORM .project_id .is_not (None ))
732+ .where (schemas .EntitySlugORM .slug == new_slug .value )
733+ )
734+
735+ cnt = await session .scalar (stmt )
736+ if cnt is not None and cnt > 0 :
737+ raise errors .ValidationError (
738+ message = f"The owner already has a project with slug { new_slug } , please try a different one"
739+ )
740+
741+ session_ctx : AsyncSession | nullcontext = nullcontext ()
742+ transaction : AsyncSessionTransaction | nullcontext = nullcontext ()
743+ if session is None :
744+ session = self .session_maker ()
745+ session_ctx = session
746+ transaction = session .begin ()
747+
748+ required_scope = Scope .DELETE
749+ allowed = await self .authz .has_permission (user , ResourceType .project , project .id , required_scope )
750+ if not allowed :
751+ raise errors .missing_or_unauthorized (ResourceType .project , project .id )
752+
753+ async with session_ctx , transaction :
754+ proj_slug = await _get_project_slug (session , project .id )
755+ project_ns_changed = project .path .parent () != new .parent ()
756+ project_slug_changed = project .path .last () != new .last ()
757+ if project_ns_changed :
758+ new_ns = await self .get_namespace_by_path (user , new .parent (), session )
759+ if not new_ns :
760+ raise errors .missing_or_unauthorized ("namespace" , new .serialize ())
761+ await _check_namespace_permissions (user , self .authz , project .namespace , Scope .WRITE )
762+ await _check_namespace_permissions (user , self .authz , new_ns , Scope .WRITE )
763+ await _check_proj_slug_not_taken (session , new_ns , project .path .second )
764+ await _upsert_old_entity_slug (session , proj_slug )
765+ proj_slug .namespace_id = new_ns .id
766+ await session .flush ()
767+ await session .refresh (proj_slug )
768+ if project_slug_changed :
769+ await _check_proj_slug_not_taken (session , project .namespace , new .last ())
770+ await _upsert_old_entity_slug (session , proj_slug )
771+ proj_slug .slug = new .last ().value
772+ await session .flush ()
773+ await session .refresh (proj_slug )
774+ if project_ns_changed or project_slug_changed :
775+ # move all data connectors from the project in the new namespace too
776+ async for dc in _get_dcs_in_project (session , project .id ):
777+ dc_model = dc .dump ()
778+ if isinstance (dc_model , GlobalDataConnector ):
779+ continue
780+ await self .move_data_connector (
781+ user , dc_model , new / base_models .DataConnectorSlug (dc_model .slug ), session
782+ )
783+
685784 async def get_user_namespace (self , user_id : str ) -> models .Namespace | None :
686785 """Get the namespace corresponding to a given user."""
687786 async with self .session_maker () as session , session .begin ():
0 commit comments