Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 92 additions & 1 deletion mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import typing
import warnings

from .common import ClientError, LoginError, InvalidProject, ErrorCode
from .common import ClientError, LoginError, WorkspaceRole, ProjectRole
from .merginproject import MerginProject
from .client_pull import (
download_file_finalize,
Expand All @@ -36,6 +36,7 @@
from .version import __version__

this_dir = os.path.dirname(os.path.realpath(__file__))
json_headers = {"Content-Type": "application/json"}


class TokenError(Exception):
Expand Down Expand Up @@ -244,6 +245,11 @@ def patch(self, path, data=None, headers={}):
request = urllib.request.Request(url, data, headers, method="PATCH")
return self._do_request(request)

def delete(self, path):
url = urllib.parse.urljoin(self.url, urllib.parse.quote(path))
request = urllib.request.Request(url, method="DELETE")
return self._do_request(request)

def login(self, login, password):
"""
Authenticate login credentials and store session token
Expand Down Expand Up @@ -1228,3 +1234,88 @@ def has_editor_support(self):
Returns whether the server version is acceptable for editor support.
"""
return is_version_acceptable(self.server_version(), "2024.4.0")

def create_user(
self,
email: str,
password: str,
workspace_id: int,
workspace_role: WorkspaceRole,
username: str = None,
notify_user: bool = False,
):
"""
Create a new user in a workspace. The username is generated from the email address.
"""
params = {
"email": email,
"password": password,
"workspace_id": workspace_id,
"role": workspace_role.value,
"notify_user": notify_user,
}
if username:
params["username"] = username
self.post("v2/users", params, json_headers)

def get_workspace_member(self, workspace_id: int, user_id: int):
"""
Get a workspace member detail
"""
resp = self.get(f"v2/workspaces/{workspace_id}/members/{user_id}")
return json.load(resp)

def list_workspace_members(self, workspace_id: int):
"""
Get a list of workspace members
"""
resp = self.get(f"v2/workspaces/{workspace_id}/members")
return json.load(resp)

def update_workspace_member(
self, workspace_id: int, user_id: int, workspace_role: WorkspaceRole, reset_projects_roles: bool = False
):
"""
Update workspace role of a workspace member, optionally resets the projects role
"""
params = {
"reset_projects_roles": reset_projects_roles,
"workspace_role": workspace_role.value,
}
resp = self.patch(f"v2/workspaces/{workspace_id}/members/{user_id}", params, json_headers)
return json.load(resp)

def remove_workspace_member(self, workspace_id: int, user_id: int):
"""
Remove a user from workspace members
"""
self.delete(f"v2/workspaces/{workspace_id}/members/{user_id}")

def list_project_collaborators(self, project_id: int):
"""
Get a list of project collaborators
"""
project_collaborators = self.get(f"v2/projects/{project_id}/collaborators")
return json.load(project_collaborators)

def add_project_collaborator(self, project_id: int, user: str, project_role: ProjectRole):
"""
Add a user to project collaborators and grant them a project role
"""
params = {"role": project_role.value, "user": user}
project_collaborator = self.post(f"v2/projects/{project_id}/collaborators", params, json_headers)
return json.load(project_collaborator)

def update_project_collaborator(self, project_id: int, user_id: int, project_role: ProjectRole):
"""
Update project role of the existing project collaborator
"""
params = {"role": project_role.value}
project_collaborator = self.patch(f"v2/projects/{project_id}/collaborators/{user_id}", params, json_headers)
return json.load(project_collaborator)

def remove_project_collaborator(self, project_id: int, user_id: int):
"""
Remove a user from project collaborators
"""
self.delete(f"v2/projects/{project_id}/collaborators/{user_id}")
24 changes: 24 additions & 0 deletions mergin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,27 @@ class InvalidProject(Exception):

import dateutil.parser
from dateutil.tz import tzlocal


class WorkspaceRole(Enum):
"""
Workspace roles
"""

GUEST = "guest"
READER = "reader"
EDITOR = "editor"
WRITER = "writer"
ADMIN = "admin"
OWNER = "owner"


class ProjectRole(Enum):
"""
Project roles
"""

READER = "reader"
EDITOR = "editor"
WRITER = "writer"
OWNER = "owner"
6 changes: 3 additions & 3 deletions mergin/editor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from itertools import filterfalse
from typing import Callable
from typing import Callable, Dict, List

from .utils import is_mergin_config, is_qgis_file, is_versioned_file

Expand All @@ -24,7 +24,7 @@ def is_editor_enabled(mc, project_info: dict) -> bool:
return server_support and project_role == EDITOR_ROLE_NAME


def _apply_editor_filters(changes: dict[str, list[dict]]) -> dict[str, list[dict]]:
def _apply_editor_filters(changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
"""
Applies editor-specific filters to the changes dictionary, removing any changes to files that are not in the editor's list of allowed files.

Expand All @@ -40,7 +40,7 @@ def _apply_editor_filters(changes: dict[str, list[dict]]) -> dict[str, list[dict
return changes


def filter_changes(mc, project_info: dict, changes: dict[str, list[dict]]) -> dict[str, list[dict]]:
def filter_changes(mc, project_info: dict, changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
"""
Filters the given changes dictionary based on the editor's enabled state.

Expand Down
86 changes: 84 additions & 2 deletions mergin/test/test_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
import random
import tempfile
import subprocess
import shutil
Expand All @@ -19,7 +20,6 @@
decode_token_data,
TokenError,
ServerType,
ErrorCode,
)
from ..client_push import push_project_async, push_project_cancel
from ..client_pull import (
Expand All @@ -39,7 +39,7 @@
from ..merginproject import pygeodiff
from ..report import create_report
from ..editor import EDITOR_ROLE_NAME, filter_changes, is_editor_enabled

from ..common import ErrorCode, WorkspaceRole, ProjectRole

SERVER_URL = os.environ.get("TEST_MERGIN_URL")
API_USER = os.environ.get("TEST_API_USERNAME")
Expand All @@ -51,6 +51,8 @@
CHANGED_SCHEMA_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "modified_schema")
STORAGE_WORKSPACE = os.environ.get("TEST_STORAGE_WORKSPACE", "testpluginstorage")

json_headers = {"Content-Type": "application/json"}


def get_limit_overrides(storage: int):
return {"storage": storage, "projects": 2, "api_allowed": True}
Expand Down Expand Up @@ -2742,3 +2744,83 @@ def test_workspace_requests(mc2: MerginClient):
assert service["plan"]["product_id"] == None
assert service["plan"]["type"] == "custom"
assert service["subscription"] == None


def test_access_management(mc: MerginClient, mc2: MerginClient):
# create a user in the workspace
workspace_id = next((w["id"] for w in mc.workspaces_list() if w["name"] == mc.username()))
email = "create_user" + str(random.randint(1000, 9999)) + "@client.py"
password = "Il0vemergin"
ws_role = WorkspaceRole.WRITER
mc.create_user(email, password, workspace_id, ws_role)
workspace_members = mc.list_workspace_members(workspace_id)
new_user = next((m for m in workspace_members if m["email"] == email))
assert new_user
assert new_user["workspace_role"] == ws_role.value
# get workspace member
ws_member = mc.get_workspace_member(workspace_id, new_user["id"])
assert ws_member["email"] == email
assert ws_member["workspace_role"] == ws_role.value
updated_role = WorkspaceRole.ADMIN
# update workspace member
mc.update_workspace_member(workspace_id, new_user["id"], updated_role)
updated_user = mc.get_workspace_member(workspace_id, new_user["id"])
assert updated_user["workspace_role"] == updated_role.value
# test permissions - a different client cannot update the role
with pytest.raises(ClientError, match=f"You do not have admin permissions to workspace"):
mc2.update_workspace_member(workspace_id, new_user["id"], ws_role)
# remove workspace member
mc.remove_workspace_member(workspace_id, new_user["id"])
workspace_members = mc.list_workspace_members(workspace_id)
assert not any(m["id"] == new_user["id"] for m in workspace_members)
# duplicated call
with pytest.raises(ClientError) as exc_info:
mc.remove_workspace_member(workspace_id, new_user["id"])
assert exc_info.value.http_error == 404
# add project
test_project_name = "test_collaborators"
test_project_fullname = API_USER + "/" + test_project_name
project_dir = os.path.join(TMP_DIR, test_project_name, API_USER)
cleanup(mc, test_project_fullname, [project_dir])
mc.create_project(test_project_name)
project_info = get_project_info(mc, API_USER, test_project_name)
test_project_id = project_info["id"]
project_role = ProjectRole.READER
# user must be added to project collaborators before updating project role
updated_role = ProjectRole.OWNER
with pytest.raises(ClientError) as exc_info2:
mc.update_project_collaborator(test_project_id, new_user["id"], updated_role)
assert exc_info2.value.http_error == 404
# add project collaborator
mc.add_project_collaborator(test_project_id, new_user["email"], project_role)
collaborators = mc.list_project_collaborators(test_project_id)
new_collaborator = next((c for c in collaborators if c["id"] == new_user["id"]))
assert new_collaborator
assert new_collaborator["project_role"] == project_role.value
# update project collaborator
mc.update_project_collaborator(test_project_id, new_user["id"], updated_role)
collaborators = mc.list_project_collaborators(test_project_id)
updated_collaborator = next((c for c in collaborators if c["id"] == new_user["id"]))
assert updated_collaborator["project_role"] == updated_role.value
# remove project collaborator
mc.remove_project_collaborator(test_project_id, new_user["id"])
collaborators = mc.list_project_collaborators(test_project_id)
assert not any(c["id"] == new_user["id"] for c in collaborators)
# try to assign new editor when editors limit is reached
ws_usage = mc.workspace_usage(workspace_id)
editors_usage = ws_usage["editors"]["editors_count"] + ws_usage["editors"]["invitations_count"]
mc.patch(
f"/v1/tests/workspaces/{workspace_id}",
{"limits_override": {"editors": editors_usage}},
json_headers,
)
editor_role = ProjectRole.EDITOR
with pytest.raises(ClientError, match="Maximum number of editors in this workspace is reached."):
mc.add_project_collaborator(test_project_id, new_user["email"], editor_role)
# set limits to the original state
orig_projects_limit = ws_usage["projects"]["quota"]
mc.patch(
f"/v1/tests/workspaces/{workspace_id}",
{"limits_override": {"projects": orig_projects_limit}},
json_headers,
)
Loading