Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions site/cds_rdm/generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from invenio_records_permissions.generators import AuthenticatedUser, Generator
from invenio_search.engine import dsl

oais_archiver_role = RoleNeed("oais-archiver")
archiver_read_all_role = RoleNeed("archiver-read-all")
archiver_notification_role = RoleNeed("archiver-notification")

clc_sync_action = action_factory("clc-sync")
clc_sync_permission = Permission(clc_sync_action)
Expand Down Expand Up @@ -62,25 +63,48 @@ class AuthenticatedRegularUser(AuthenticatedUser):
def excludes(self, **kwargs):
"""Exclude service/robot accounts."""
excludes = super().excludes(**kwargs)
return excludes + [oais_archiver_role]
return excludes + [archiver_read_all_role, archiver_notification_role]


class Archiver(Generator):
"""Allows system_process role."""
class ArchiverRole(Generator):
"""Base generator class to define Archiver roles."""

@property
def archiver_role(self):
"""Role property."""
raise NotImplementedError()

def needs(self, **kwargs):
"""Enabling Needs."""
return [oais_archiver_role]
return [self.archiver_role]

def query_filter(self, identity=None, **kwargs):
"""Filters for current identity as system process."""
for need in identity.provides:
if need == oais_archiver_role:
if need == self.archiver_role:
return dsl.Q("match_all")
else:
return []


class ArchiverRead(ArchiverRole):
"""Allows by archiver_read_all role."""

@property
def archiver_role(self):
"""Role property."""
return archiver_read_all_role


class ArchiverNotification(ArchiverRole):
"""Allows by archiver_notification role."""

@property
def archiver_role(self):
"""Role property."""
return archiver_notification_role


class Librarian(Generator):
"""Allows librarian role."""

Expand Down
19 changes: 10 additions & 9 deletions site/cds_rdm/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from invenio_users_resources.services.permissions import UserManager

from .generators import (
Archiver,
ArchiverNotification,
ArchiverRead,
AuthenticatedRegularUser,
CERNEmailsGroups,
Librarian,
Expand Down Expand Up @@ -65,19 +66,19 @@ class CDSRDMRecordPermissionPolicy(RDMRecordPermissionPolicy):
"""Record permission policy."""

can_create = [AuthenticatedRegularUser(), SystemProcess()]
can_read = RDMRecordPermissionPolicy.can_read + [Archiver()]
can_search = RDMRecordPermissionPolicy.can_search + [Archiver()]
can_read_files = RDMRecordPermissionPolicy.can_read_files + [Archiver()]
can_read = RDMRecordPermissionPolicy.can_read + [ArchiverRead()]
can_search = RDMRecordPermissionPolicy.can_search + [ArchiverRead()]
can_read_files = RDMRecordPermissionPolicy.can_read_files + [ArchiverRead()]
can_get_content_files = RDMRecordPermissionPolicy.can_get_content_files + [
Archiver()
ArchiverRead()
]
can_media_get_content_files = RDMRecordPermissionPolicy.can_get_content_files + [
Archiver()
ArchiverRead()
]
can_read_deleted = [
IfRecordDeleted(
then_=[UserManager, SystemProcess()],
else_=can_read + [Archiver()],
else_=can_read + [ArchiverRead()],
)
]

Expand All @@ -92,8 +93,8 @@ class CDSRDMRecordPermissionPolicy(RDMRecordPermissionPolicy):
class CDSRDMPreservationSyncPermissionPolicy(DefaultPreservationInfoPermissionPolicy):
"""PreservationSync permission policy."""

can_read = RDMRecordPermissionPolicy.can_read + [Archiver()]
can_create = [Archiver()]
can_read = RDMRecordPermissionPolicy.can_read + [ArchiverNotification()]
can_create = [ArchiverNotification()]


class CDSRequestsPermissionPolicy(RDMRequestsPermissionPolicy):
Expand Down
2 changes: 1 addition & 1 deletion site/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ def archiver(UserFixture, app, db):
confirmed=True,
)
user_obj = user.create(app, db)
r = ds.create_role(name="oais-archiver", description="1234")
r = ds.create_role(name="archiver-read-all", description="1234")
ds.add_role_to_user(user.user, r)

return user
Expand Down
Loading