Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,27 @@ class TokenError(Exception):
pass


class AuthTokenExpiredError(Exception):
pass


class ServerType(Enum):
OLD = auto() # Server is old and does not support workspaces
CE = auto() # Server is Community Edition
EE = auto() # Server is Enterprise Edition
SAAS = auto() # Server is SaaS


class LoginType(Enum):
"""Types of login supported by Mergin Maps."""

PASSWORD = "password" # classic login with username and password
SSO = "sso" # login with SSO token

def __str__(self) -> str:
return self.value


def decode_token_data(token):
token_prefix = "Bearer ."
if not token.startswith(token_prefix):
Expand Down Expand Up @@ -80,14 +94,24 @@ class MerginClient:
Currently, only HTTP proxies are supported.
"""

def __init__(self, url=None, auth_token=None, login=None, password=None, plugin_version=None, proxy_config=None):
def __init__(
self,
url=None,
auth_token=None,
login=None,
password=None,
plugin_version=None,
proxy_config=None,
login_type: LoginType = LoginType.PASSWORD,
):
self.url = url if url is not None else MerginClient.default_url()
self._auth_params = None
self._auth_session = None
self._user_info = None
self._server_type = None
self._server_version = None
self.client_version = "Python-client/" + __version__
self._login_type = login_type
if plugin_version is not None: # this could be e.g. "Plugin/2020.1 QGIS/3.14"
self.client_version += " " + plugin_version
self.setup_logging()
Expand Down Expand Up @@ -134,15 +158,20 @@ def __init__(self, url=None, auth_token=None, login=None, password=None, plugin_
self.opener = urllib.request.build_opener(*handlers, https_handler)
urllib.request.install_opener(self.opener)

if login and not password:
raise ClientError("Unable to log in: no password provided for '{}'".format(login))
if password and not login:
raise ClientError("Unable to log in: password provided but no username/email")
if self._login_type == LoginType.PASSWORD:
if login and not password:
raise ClientError("Unable to log in: no password provided for '{}'".format(login))
if password and not login:
raise ClientError("Unable to log in: password provided but no username/email")

if login and password:
self._auth_params = {"login": login, "password": password}
if login and password:
self._auth_params = {"login": login, "password": password}
if not self._auth_session:
self.login(login, password)

elif self._login_type == LoginType.SSO:
if not self._auth_session:
self.login(login, password)
raise ClientError("Unable to log in: no auth token provided for SSO login")

def setup_logging(self):
"""Setup Mergin Maps client logging."""
Expand Down Expand Up @@ -189,12 +218,18 @@ def wrapper(self, *args):
# Refresh auth token if it expired or will expire very soon
delta = self._auth_session["expire"] - datetime.now(timezone.utc)
if delta.total_seconds() < 5:
self.log.info("Token has expired - refreshing...")
self.login(self._auth_params["login"], self._auth_params["password"])
if self._login_type == LoginType.PASSWORD:
self.log.info("Token has expired - refreshing...")
self.login(self._auth_params["login"], self._auth_params["password"])
elif self._login_type == LoginType.SSO:
raise AuthTokenExpiredError("Token has expired - please re-login")
else:
# Create a new authorization token
self.log.info(f"No token - login user: {self._auth_params['login']}")
self.login(self._auth_params["login"], self._auth_params["password"])
if self._login_type == LoginType.PASSWORD:
self.log.info(f"No token - login user: {self._auth_params['login']}")
self.login(self._auth_params["login"], self._auth_params["password"])
elif self._login_type == LoginType.SSO:
raise AuthTokenExpiredError("Token has expired - please re-login")
return f(self, *args)

return wrapper
Expand Down