diff --git a/README.md b/README.md index 94f7a830..869f56cb 100644 --- a/README.md +++ b/README.md @@ -37,3 +37,56 @@ updateTektonDefinitions(pipelinesNamespace, "/mascli/templates/ibm-mas-tekton.ya pipelineURL = launchUpgradePipeline(self.dynamicClient, instanceId) print(pipelineURL) ``` + + +mas-devops-create-initial-users +--------------------------------------------- + + +Add to /etc/hosts +``` +127.0.0.1 tgk01-masdev.mas-tgk01-manage.svc.cluster.local +127.0.0.1 coreapi.mas-tgk01-core.svc.cluster.local +127.0.0.1 admin-dashboard.mas-tgk01-core.svc.cluster.local +``` + +```bash +SM_AWS_REGION="" +SM_AWS_ACCESS_KEY_ID="" +SM_AWS_SECRET_ACCESS_KEY="" + +aws configure set default.region ${SM_AWS_REGION} +aws configure set aws_access_key_id ${SM_AWS_ACCESS_KEY_ID} +aws configure set aws_secret_access_key ${SM_AWS_SECRET_ACCESS_KEY} + + +oc login --token=sha256~xxx --server=https://xxx:6443 + +oc port-forward service/admin-dashboard 8445:443 -n mas-tgk01-core +oc port-forward service/coreapi 8444:443 -n mas-tgk01-core +oc port-forward service/tgk01-masdev 8443:443 -n mas-tgk01-manage + +mas-devops-create-initial-users-for-saas \ + --mas-instance-id tgk01 \ + --mas-workspace-id masdev \ + --log-level INFO \ + --initial-users-secret-name "aws-dev/noble4/tgk01/initial_users" \ + --manage-api-port 8443 \ + --coreapi-port 8444 \ + --admin-dashboard-port 8445 + + +mas-devops-create-initial-users-for-saas \ + --mas-instance-id tgk01 \ + --mas-workspace-id masdev \ + --log-level INFO \ + --initial-users-yaml-file /home/tom/workspaces/notes/mascore3423/example-users-single.yaml \ + --manage-api-port 8443 \ + --coreapi-port 8444 \ + --admin-dashboard-port 8445 +``` + +Example of initial_users secret: +```json +{"john.smith1@example.com":"primary,john1,smith1","john.smith2@example.com":"primary,john2,smith2","john.smith3@example.com":"secondary,john3,smith3"} +``` \ No newline at end of file diff --git a/bin/mas-devops-create-initial-users-for-saas b/bin/mas-devops-create-initial-users-for-saas new file mode 100644 index 00000000..bac45ce9 --- /dev/null +++ b/bin/mas-devops-create-initial-users-for-saas @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 + +# ***************************************************************************** +# Copyright (c) 2025 IBM Corporation and other Contributors. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# which accompanies this distribution, and is available at +# http://www.eclipse.org/legal/epl-v10.html +# +# ***************************************************************************** + +from kubernetes import client, config +from kubernetes.config.config_exception import ConfigException +import argparse +import logging +import urllib3 +urllib3.disable_warnings() +import yaml +import json +import sys + +import boto3 +from botocore.exceptions import ClientError + +from mas.devops.users import MASUserUtils + + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + # Primary Options + parser.add_argument("--mas-instance-id", required=True) + parser.add_argument("--mas-workspace-id", required=True) + parser.add_argument("--log-level", required=False, choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], default="INFO") + parser.add_argument("--coreapi-port", required=False, default=443) + parser.add_argument("--admin-dashboard-port", required=False, default=443) + parser.add_argument("--manage-api-port", required=False, default=443) + + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--initial-users-yaml-file") + group.add_argument("--initial-users-secret-name") + + args, unknown = parser.parse_known_args() + + log_level = getattr(logging, args.log_level) + + logger = logging.getLogger() + logger.setLevel(log_level) + + ch = logging.StreamHandler() + ch.setLevel(log_level) + chFormatter = logging.Formatter( + "%(asctime)-25s %(name)-50s [%(threadName)s] %(levelname)-8s %(message)s" + ) + ch.setFormatter(chFormatter) + logger.addHandler(ch) + + mas_instance_id = args.mas_instance_id + mas_workspace_id = args.mas_workspace_id + initial_users_yaml_file = args.initial_users_yaml_file + initial_users_secret_name = args.initial_users_secret_name + coreapi_port = args.coreapi_port + admin_dashboard_port = args.admin_dashboard_port + manage_api_port = args.manage_api_port + + + logger.info("Configuration:") + logger.info("--------------") + logger.info(f"mas_instance_id: {mas_instance_id}") + logger.info(f"mas_workspace_id: {mas_workspace_id}") + logger.info(f"initial_users_yaml_file: {initial_users_yaml_file}") + logger.info(f"initial_users_secret_name: {initial_users_secret_name}") + logger.info(f"log_level: {log_level}") + logger.info(f"coreapi_port: {coreapi_port}") + logger.info(f"admin_dashboard_port: {admin_dashboard_port}") + logger.info(f"manage_api_port: {manage_api_port}") + logger.info("") + + try: + # Try to load in-cluster configuration + config.load_incluster_config() + logger.debug("Loaded in-cluster configuration") + except ConfigException: + # If that fails, fall back to kubeconfig file + config.load_kube_config() + logger.debug("Loaded kubeconfig file") + + + user_utils = MASUserUtils(mas_instance_id, mas_workspace_id, client.api_client.ApiClient(), coreapi_port=coreapi_port, admin_dashboard_port=admin_dashboard_port, manage_api_port=manage_api_port) + + if initial_users_secret_name is not None: + + logger.info(f"Loading initial_users configuration from secret {initial_users_secret_name}") + + session = boto3.session.Session() + aws_sm_client = session.client( + service_name='secretsmanager', + ) + try: + initial_users_secret = aws_sm_client.get_secret_value( # pragma: allowlist secret + SecretId=initial_users_secret_name + ) + except ClientError as e: + if e.response['Error']['Code'] == 'ResourceNotFoundException': + logger.info(f"Secret {initial_users_secret_name} was not found, nothing to do, exiting now.") + sys.exit(0) + + raise Exception(f"Failed to fetch secret {initial_users_secret_name}: {str(e)}") + + secret_json = json.loads(initial_users_secret['SecretString']) + initial_users = user_utils.parse_initial_users_from_aws_secret_json(secret_json) + elif initial_users_yaml_file is not None: + with open(initial_users_yaml_file, 'r') as file: + initial_users = yaml.safe_load(file) + else: + raise Exception("Something unexpected happened") + + + result = user_utils.create_initial_users_for_saas(initial_users) + + # if user details were sourced from an AWS SM secret, remove the completed entries from the secret + # so we don't try and resync them the next time round (and potentially undo an update made by a customer) + if initial_users_secret_name is not None: + has_updates = False + for completed_user in result["completed"]: + logger.info(f"Removing synced user {completed_user['email']} from {initial_users_secret_name} secret") + secret_json.pop(completed_user["email"]) + has_updates = True + + if has_updates: + logger.info(f"Updating secret {initial_users_secret_name}") + try: + aws_sm_client.update_secret( # pragma: allowlist secret + SecretId=initial_users_secret_name, + SecretString=json.dumps(secret_json) + ) + except ClientError as e: + raise Exception(f"Failed to update secret {initial_users_secret_name}: {str(e)}") + + + if len(result["failed"]) > 0: + failed_user_ids = list(map(lambda u : u["email"], result["failed"])) + raise Exception(f"Sync failed for the following user IDs {failed_user_ids}") \ No newline at end of file diff --git a/setup.py b/setup.py index 520b4673..d3f72dd8 100644 --- a/setup.py +++ b/setup.py @@ -60,14 +60,16 @@ def get_version(rel_path): 'kubernetes', # Apache Software License 'kubeconfig', # BSD License 'jinja2', # BSD License - 'jinja2-base64-filters' # MIT License + 'jinja2-base64-filters', # MIT License + 'boto3' # Apache Software License ], extras_require={ 'dev': [ - 'build', # MIT License - 'flake8', # MIT License - 'pytest', # MIT License - 'pytest-mock' # MIT License + 'build', # MIT License + 'flake8', # MIT License + 'pytest', # MIT License + 'pytest-mock', # MIT License + 'requests-mock' # Apache Software License ] }, classifiers=[ @@ -85,6 +87,7 @@ def get_version(rel_path): ], scripts=[ 'bin/mas-devops-db2-validate-config', + 'bin/mas-devops-create-initial-users-for-saas', 'bin/mas-devops-saas-job-cleaner' ] ) diff --git a/src/mas/devops/users.py b/src/mas/devops/users.py new file mode 100644 index 00000000..9a094509 --- /dev/null +++ b/src/mas/devops/users.py @@ -0,0 +1,937 @@ +# ***************************************************************************** +# Copyright (c) 2025 IBM Corporation and other Contributors. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# which accompanies this distribution, and is available at +# http://www.eclipse.org/legal/epl-v10.html +# +# ***************************************************************************** + +import requests +import logging +from kubernetes import client +from openshift.dynamic import DynamicClient +import base64 +import atexit +import tempfile +import os +import time +import re + + +class MASUserUtils(): + ''' + A collection of utilities for interacting with the MAS Core V3 User APIs and related APIs. + Each instance of this class is tied to a specific MAS instance and workspace ID. + ''' + + MAXADMIN = "MAXADMIN" + + def __init__(self, mas_instance_id: str, mas_workspace_id: str, k8s_client: client.api_client.ApiClient, coreapi_port: int = 443, admin_dashboard_port: int = 443, manage_api_port: int = 443): + self.mas_instance_id = mas_instance_id + self.mas_workspace_id = mas_workspace_id + self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + + self.mas_core_namespace = f"mas-{self.mas_instance_id}-core" + self.manage_namespace = f"mas-{self.mas_instance_id}-manage" + + dyn_client = DynamicClient(k8s_client) + self.v1_secrets = dyn_client.resources.get(api_version="v1", kind="Secret") + + self._mas_superuser_credentials = None + self._superuser_auth_token = None + + self.mas_admin_url_internal = f'https://admin-dashboard.{self.mas_core_namespace}.svc.cluster.local:{admin_dashboard_port}' + self._admin_internal_tls_secret = None + self._admin_internal_ca_pem_file_path = None + + self.mas_api_url_internal = f'https://coreapi.{self.mas_core_namespace}.svc.cluster.local:{coreapi_port}' + self._core_internal_tls_secret = None + self._core_internal_ca_pem_file_path = None + + self.manage_api_url_internal = f'https://{self.mas_instance_id}-{self.mas_workspace_id}.{self.manage_namespace}.svc.cluster.local:{manage_api_port}' + self._manage_internal_tls_secret = None + self._manage_internal_ca_pem_file_path = None + self._manage_internal_client_pem_file_path = None + + self._mas_workspace_application_ids = None + + @property + def mas_superuser_credentials(self): + if self._mas_superuser_credentials is None: + k8s_secret = self.v1_secrets.get(name=f"{self.mas_instance_id}-credentials-superuser", namespace=self.mas_core_namespace) + self._mas_superuser_credentials = dict( + username=base64.b64decode(k8s_secret.data["username"]).decode("utf-8"), + password=base64.b64decode(k8s_secret.data["password"]).decode("utf-8"), + ) + return self._mas_superuser_credentials + + @property + def admin_internal_tls_secret(self): + if self._admin_internal_tls_secret is None: + self._admin_internal_tls_secret = self.v1_secrets.get(name=f"{self.mas_instance_id}-admindashboard-cert-internal", namespace=self.mas_core_namespace) + return self._admin_internal_tls_secret + + @property + def admin_internal_ca_pem_file_path(self): + if self._admin_internal_ca_pem_file_path is None: + ca = base64.b64decode(self.admin_internal_tls_secret.data["ca.crt"]).decode('utf-8') + with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as pem_file: + pem_file.write(ca.encode()) + pem_file.flush() + pem_file.close() + atexit.register(os.remove, pem_file.name) + self._admin_internal_ca_pem_file_path = pem_file.name + return self._admin_internal_ca_pem_file_path + + @property + def core_internal_tls_secret(self): + if self._core_internal_tls_secret is None: + self._core_internal_tls_secret = self.v1_secrets.get(name=f"{self.mas_instance_id}-coreapi-cert-internal", namespace=self.mas_core_namespace) + return self._core_internal_tls_secret + + @property + def core_internal_ca_pem_file_path(self): + if self._core_internal_ca_pem_file_path is None: + ca = base64.b64decode(self.core_internal_tls_secret.data["ca.crt"]).decode('utf-8') + with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as pem_file: + pem_file.write(ca.encode()) + pem_file.flush() + pem_file.close() + atexit.register(os.remove, pem_file.name) + self._core_internal_ca_pem_file_path = pem_file.name + return self._core_internal_ca_pem_file_path + + @property + def superuser_auth_token(self): + if self._superuser_auth_token is None: + self.logger.debug("Getting superuser auth token") + url = f"{self.mas_admin_url_internal}/logininitial" + headers = { + "Content-Type": "application/json" + } + querystring = { + "verify": False + } + payload = self.mas_superuser_credentials + response = requests.post( + url, + json=payload, + headers=headers, + params=querystring, + verify=self.admin_internal_ca_pem_file_path + ) + self._superuser_auth_token = response.json()["token"] + return self._superuser_auth_token + + @property + def manage_internal_tls_secret(self): + if self._manage_internal_tls_secret is None: + self._manage_internal_tls_secret = self.v1_secrets.get(name=f"{self.mas_instance_id}-internal-manage-tls", namespace=self.manage_namespace) + return self._manage_internal_tls_secret + + @property + def manage_internal_client_pem_file_path(self): + if self._manage_internal_client_pem_file_path is None: + cert = base64.b64decode(self.manage_internal_tls_secret.data["tls.crt"]).decode('utf-8') + key = base64.b64decode(self.manage_internal_tls_secret.data["tls.key"]).decode('utf-8') + with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as pem_file: + pem_file.write(key.encode()) + pem_file.write(cert.encode()) + pem_file.flush() + pem_file.close() + atexit.register(os.remove, pem_file.name) + self._manage_internal_client_pem_file_path = pem_file.name + return self._manage_internal_client_pem_file_path + + @property + def manage_internal_ca_pem_file_path(self): + if self._manage_internal_ca_pem_file_path is None: + ca = base64.b64decode(self.manage_internal_tls_secret.data["ca.crt"]).decode('utf-8') + with tempfile.NamedTemporaryFile(delete=False, suffix=".pem") as pem_file: + pem_file.write(ca.encode()) + pem_file.flush() + pem_file.close() + atexit.register(os.remove, pem_file.name) + self._manage_internal_ca_pem_file_path = pem_file.name + return self._manage_internal_ca_pem_file_path + + @property + def mas_workspace_application_ids(self): + if self._mas_workspace_application_ids is None: + self._mas_workspace_application_ids = list(map(lambda ma: ma["id"], self.get_mas_applications_in_workspace())) + return self._mas_workspace_application_ids + + def get_user(self, user_id): + self.logger.debug(f"Getting user {user_id}") + url = f"{self.mas_api_url_internal}/v3/users/{user_id}" + headers = { + "Accept": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.get( + url, + headers=headers, + verify=self.core_internal_ca_pem_file_path + ) + + if response.status_code == 404: + return None + + if response.status_code == 200: + return response.json() + + raise Exception(f"{response.status_code} {response.text}") + + def get_or_create_user(self, payload): + ''' + User is identified by payload["id"] field + If user already exists, return their record. No attempt will be made to update the user if other fields of the payload differ from the existing user. + Otherwise, the user will be created. + + Example payload: + { + "id": user_id, + "status": {"active": True}, + "username": username, + "token": password, + "owner": "local", + "emails": [ + { + "value": email, + "type": "Work", + "primary": True + } + ], + "displayName": display_name, + "issuer": "local", + "permissions": { + "systemAdmin": True, + "userAdmin": True, + "apikeyAdmin": True + }, + "entitlement": { + "application": "PREMIUM", + "admin": "ADMIN_PREMIUM", + "alwaysReserveLicense": True + }, + "title": title, + "givenName": given_name, + "familyName": family_name + } + ''' + existing_user = self.get_user(payload["id"]) + + if existing_user is not None: + self.logger.info(f"Existing user {existing_user['id']} found") + return existing_user + + self.logger.info(f"Creating new user {payload['id']}") + + url = f"{self.mas_api_url_internal}/v3/users" + querystring = {} + headers = { + "Content-Type": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.post( + url, + json=payload, + headers=headers, + params=querystring, + verify=self.core_internal_ca_pem_file_path + ) + if response.status_code == 201: + return response.json() + + # if response.status_code == 409: + # json = response.json() + # if "exception" in json and "message" in json["exception"] and json["exception"]["message"] == "AIUCO1005E": + # return None + + raise Exception(f"{response.status_code} {response.text}") + + def update_user(self, payload): + user_id = payload["id"] + self.logger.debug(f"Updating user {user_id}") + url = f"{self.mas_api_url_internal}/v3/users/{user_id}" + headers = { + "Accept": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.put( + url, + headers=headers, + json=payload, + verify=self.core_internal_ca_pem_file_path + ) + + if response.status_code == 200: + return response.json() + + raise Exception(f"{response.status_code} {response.text}") + + def update_user_display_name(self, user_id, display_name): + self.logger.debug(f"Updating user display name {user_id} to {display_name}") + url = f"{self.mas_api_url_internal}/v3/users/{user_id}" + headers = { + "Accept": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.patch( + url, + headers=headers, + json={ + "displayName": display_name + }, + verify=self.core_internal_ca_pem_file_path + ) + + if response.status_code == 200: + return response.json() + + raise Exception(f"{response.status_code} {response.text}") + + def link_user_to_local_idp(self, user_id, email_password=False): + ''' + Checks if user already has a local identity, no-op if so. + Assumes user exists, raises if not + ''' + + # For the sake of idempotency, check if the user already has a local identity + user = self.get_user(user_id) + if user is None: + raise Exception(f"User {user_id} was not found") + + if "identities" in user and "_local" in user["identities"]: + self.logger.info(f"User {user_id} already has a local identity") + return None + + self.logger.info(f"Linking user {user_id} to local IDP (email_password: {email_password})") + url = f"{self.mas_api_url_internal}/v3/users/{user_id}/idps/local" + querystring = { + "emailPassword": email_password + } + payload = { + "idpUserId": user_id, + } + headers = { + "Content-Type": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.put( + url, + json=payload, + headers=headers, + params=querystring, + verify=self.core_internal_ca_pem_file_path + ) + if response.status_code != 200: + raise Exception(response.text) + + # Important: HTTP 200 output will contain generated user token; DO NOT LOG + + return None + + def get_user_workspaces(self, user_id): + ''' + Assumes user exists, raises if not. + ''' + self.logger.debug(f"Getting workspaces for user {user_id}") + url = f"{self.mas_api_url_internal}/v3/users/{user_id}/workspaces" + headers = { + "Accept": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.get( + url, + headers=headers, + verify=self.core_internal_ca_pem_file_path + ) + + if response.status_code == 404: + raise Exception(f"User {user_id} does not exist") + + if response.status_code == 200: + return response.json() + + raise Exception(f"{response.status_code} {response.text}") + + def add_user_to_workspace(self, user_id, is_workspace_admin=False): + ''' + No-op if user is already a member of the workspace. No attempt will be made to update their existing is_workspace_admin flag if it differs. + ''' + workspaces = self.get_user_workspaces(user_id) + for workspace in workspaces: + if "id" in workspace and workspace["id"] == self.mas_workspace_id: + self.logger.info(f"User {user_id} is already a member of workspace {self.mas_workspace_id}") + return None + + self.logger.info(f"Adding user {user_id} to {self.mas_workspace_id} (is_workspace_admin: {is_workspace_admin})") + url = f"{self.mas_api_url_internal}/workspaces/{self.mas_workspace_id}/users/{user_id}" + querystring = {} + payload = { + "permissions": { + "workspaceAdmin": is_workspace_admin + } + } + headers = { + "Content-Type": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.put( + url, + json=payload, + headers=headers, + params=querystring, + verify=self.core_internal_ca_pem_file_path + ) + + if response.status_code == 200: + return None + + raise Exception(f"{response.status_code} {response.text}") + + def get_user_application_permissions(self, user_id, application_id): + self.logger.debug(f"Getting user {user_id} permissions for application {application_id}") + url = f"{self.mas_api_url_internal}/workspaces/{self.mas_workspace_id}/applications/{application_id}/users/{user_id}" + headers = { + "Accept": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.get( + url, + headers=headers, + verify=self.core_internal_ca_pem_file_path + ) + + if response.status_code == 200: + return response.json() + + if response.status_code == 404: + return None + + raise Exception(f"{response.status_code} {response.text}") + + def set_user_application_permission(self, user_id, application_id, role): + ''' + No-op if user already has a role established for the application. No attempt will be made to update the role if it differs. + ''' + + existing_permissions = self.get_user_application_permissions(user_id, application_id) + + if existing_permissions is not None: + self.logger.info(f"User {user_id} already has permissions set for application {application_id}") + return None + + self.logger.info(f"Setting user {user_id} role for {application_id} to {role}") + url = f"{self.mas_api_url_internal}/workspaces/{self.mas_workspace_id}/applications/{application_id}/users/{user_id}" + querystring = {} + payload = { + "role": role + } + headers = { + "Content-Type": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.put( + url, + json=payload, + headers=headers, + params=querystring, + verify=self.core_internal_ca_pem_file_path + ) + + if response.status_code == 200: + return None + + raise Exception(f"{response.status_code} {response.text}") + + def check_user_sync(self, user_id, application_id, timeout_secs=60 * 10, retry_interval_secs=5): + t_end = time.time() + timeout_secs + self.logger.info(f"Awaiting user {user_id} sync status \"SUCCESS\" for app {application_id}: {t_end - time.time():.2f} seconds remaining") + while time.time() < t_end: + user = self.get_user(user_id) + + if "applications" not in user or application_id not in user["applications"] or "sync" not in user["applications"][application_id] or "state" not in user["applications"][application_id]["sync"]: + self.logger.warning(f"User {user_id} does not have any sync state for application {application_id}, triggering resync") + self.resync_users([user_id]) + time.sleep(retry_interval_secs) + else: + sync_state = user["applications"][application_id]["sync"]["state"] + if sync_state == "SUCCESS": + return + elif sync_state == "ERROR": + self.logger.warning(f"User {user_id} sync state for {application_id} was {sync_state}, triggering resync") + self.resync_users([user_id]) + time.sleep(retry_interval_secs) + else: + self.logger.info(f"User {user_id} sync has not been completed yet for app {application_id} (currrently {sync_state}): {t_end - time.time():.2f} seconds remaining") + time.sleep(retry_interval_secs) + raise Exception(f"User {user_id} sync failed to complete for app within {timeout_secs} seconds") + + def resync_users(self, user_ids): + self.logger.info(f"Issuing resync request(s) for user(s) {user_ids}") + + # The "/v3/users/utils/resync" API is only available in MAS Core >= 9.1 (coreapi >= 25.2.3) + # Until it is available in all supported versions of MAS, + # we instead perform a no-op update to the user to achieve the same effect + # (the "update user profile" API is used as this is this allows us to isolate the displayName field, + # which reduces the impact of concurrent updates leading to race conditions) + + for user_id in user_ids: + user = self.get_user(user_id) + self.update_user_display_name(user_id, user["displayName"]) + + def create_or_get_manage_api_key_for_user(self, user_id, temporary=False): + ''' + Get singleton API for user_id if it already exists, create it if not + if temporary is True AND we created the API key, delete it on exit + ''' + self.logger.debug(f"Attempting to create Manage API Key for user {user_id}") + url = f"{self.manage_api_url_internal}/maximo/api/os/mxapiapikey" + querystring = { + "ccm": 1, + "lean": 1, + } + + payload = { + "expiration": -1, + "userid": user_id + } + headers = { + "Content-Type": "application/json", + } + response = requests.post( + url, + json=payload, + headers=headers, + params=querystring, + verify=self.manage_internal_ca_pem_file_path, + cert=self.manage_internal_client_pem_file_path, + ) + + if response.status_code == 400: + # Assisted by watsonx Code Assistant + try: + error_json = response.json() + except ValueError: + raise Exception(f"{response.status_code} {response.text}") + + if "Error" in error_json and "reasonCode" in error_json["Error"] and error_json["Error"]["reasonCode"] == "BMXAA10051E": + # BMXAA10051E - Only one API key allowed per user. + self.logger.info(f"Reusing existing Manage API Key for user {user_id}") + pass + else: + # any other 400 error is unexpected + raise Exception(f"{response.status_code} {response.text}") + + elif response.status_code == 201: + self.logger.info(f"Creating new Manage API Key for user {user_id}") + else: + # any other status code is unexpected + raise Exception(f"{response.status_code} {response.text}") + + # otherwise, retrieve the apikey (either it already existed, or we just created it) + + apikey = self.get_manage_api_key_for_user(user_id) + if apikey is None: + # either create call reported that apikey already exists, or we created the api key + # so we expect the get call to find it + raise Exception("API key was unexpectedly not found") + + if temporary and response.status_code == 201: + atexit.register(self.delete_manage_api_key, apikey) + + return apikey + + def get_manage_api_key_for_user(self, user_id): + self.logger.debug(f"Getting Manage API Key for user {user_id}") + url = f"{self.manage_api_url_internal}/maximo/api/os/mxapiapikey" + querystring = { + "ccm": 1, + "lean": 1, + "oslc.select": "*", + "oslc.where": f"userid=\"{user_id}\"", + } + headers = { + "Accept": "application/json", + } + + response = requests.get( + url, + headers=headers, + params=querystring, + verify=self.manage_internal_ca_pem_file_path, + cert=self.manage_internal_client_pem_file_path + ) + + if response.status_code == 200: + json = response.json() + + if "member" in json and len(json["member"]) > 0: + return json["member"][0] + + return None + + raise Exception(f"{response.status_code} {response.text}") + + def delete_manage_api_key(self, manage_api_key): + self.logger.info(f"Deleting Manage API Key for user {manage_api_key['userid']}") + + # extract the apikey's identifier from the href + match = re.search(r'\/maximo\/api\/os\/mxapiapikey\/(.*)', manage_api_key['href']) + if match is None: + raise Exception(f"Could not parse API Key href: {manage_api_key['href']}") + + id = match.group(1) + + url = f"{self.manage_api_url_internal}/maximo/api/os/mxapiapikey/{id}" + querystring = { + "ccm": 1, + "lean": 1, + } + headers = { + "Accept": "application/json", + } + response = requests.delete( + url, + headers=headers, + params=querystring, + verify=self.manage_internal_ca_pem_file_path, + cert=self.manage_internal_client_pem_file_path, + ) + + if response.status_code != 204 and response.status_code != 404: + raise Exception(f"{response.status_code} {response.text}") + + def get_manage_group_id(self, group_name, manage_api_key): + self.logger.debug(f"Getting ID for Manage group with name {group_name}") + url = f"{self.manage_api_url_internal}/maximo/api/os/mxapigroup" + querystring = { + "ccm": 1, + "lean": 1, + "oslc.select": "maxgroupid", + "oslc.where": f"groupname=\"{group_name}\"", + } + headers = { + "Accept": "application/json", + "apikey": manage_api_key["apikey"], # <--- careful, don't log headers as-is (apikey is sensitive) + } + response = requests.get( + url, + headers=headers, + params=querystring, + verify=self.manage_internal_ca_pem_file_path, + ) + if response.status_code != 200: + raise Exception(f"{response.status_code} {response.text}") + + json = response.json() + + if "member" in json and len(json["member"]) > 0 and "maxgroupid" in json["member"][0]: + return json["member"][0]['maxgroupid'] + + return None + + def is_user_in_manage_group(self, group_name, user_id, manage_api_key): + self.logger.debug(f"Checking if {user_id} is a member of Manage group with name {group_name}") + + group_id = self.get_manage_group_id(group_name, manage_api_key) + + if group_id is None: + raise Exception(f"No Manage group found with name {group_name}") + + url = f"{self.manage_api_url_internal}/maximo/api/os/mxapigroup/{group_id}/groupuser" + querystring = { + "lean": 1, + "oslc.where": f"userid=\"{user_id}\"", + } + headers = { + "Accept": "application/json", + "apikey": manage_api_key["apikey"], # <--- careful, don't log headers as-is (apikey is sensitive) + } + + response = requests.get( + url, + headers=headers, + params=querystring, + verify=self.manage_internal_ca_pem_file_path, + ) + + if response.status_code == 200: + json = response.json() + return "member" in json and len(json["member"]) > 0 + + raise Exception(f"{response.status_code} {response.text}") + + def add_user_to_manage_group(self, user_id, group_name, manage_api_key): + ''' + No-op if user_id is already a member of the manage security group + ''' + + if self.is_user_in_manage_group(group_name, user_id, manage_api_key): + self.logger.info(f"User {user_id} is already a member of Manage Security Group {group_name}") + return None + + self.logger.info(f"Adding user {user_id} to Manage group {group_name}") + + group_id = self.get_manage_group_id(group_name, manage_api_key) + + url = f"{self.manage_api_url_internal}/maximo/api/os/mxapigroup/{group_id}" + querystring = { + "lean": 1, + } + headers = { + "Content-Type": "application/json", + "Accept": "application/json", + "x-method-override": "PATCH", + "patchtype": "MERGE", + "apikey": manage_api_key["apikey"], # <--- careful, don't log headers as-is (apikey is sensitive) + } + payload = { + "groupuser": [ + { + "userid": f"{user_id}" + } + ] + } + response = requests.post( + url, + headers=headers, + params=querystring, + json=payload, + verify=self.manage_internal_ca_pem_file_path, + ) + if response.status_code == 204: + return None + + raise Exception(f"{response.status_code} {response.text}") + + def get_mas_applications_in_workspace(self): + self.logger.debug(f"Getting MAS Applications in workspace {self.mas_workspace_id}") + url = f"{self.mas_api_url_internal}/workspaces/{self.mas_workspace_id}/applications" + headers = { + "Accept": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.get( + url, + headers=headers, + verify=self.core_internal_ca_pem_file_path + ) + if response.status_code == 200: + return response.json() + raise Exception(f"{response.status_code} {response.text}") + + def get_mas_application_availability(self, mas_application_id): + self.logger.debug(f"Getting availability of MAS Application {mas_application_id} in workspace {self.mas_workspace_id}") + url = f"{self.mas_api_url_internal}/workspaces/{self.mas_workspace_id}/applications/{mas_application_id}" + headers = { + "Accept": "application/json", + "x-access-token": self.superuser_auth_token + } + response = requests.get( + url, + headers=headers, + verify=self.core_internal_ca_pem_file_path + ) + if response.status_code == 200: + return response.json() + raise Exception(f"{response.status_code} {response.text}") + + def await_mas_application_availability(self, mas_application_id, timeout_secs=60 * 10, retry_interval_secs=5): + t_end = time.time() + timeout_secs + self.logger.info(f"Waiting for {mas_application_id} to become ready and available: {t_end - time.time():.2f} seconds remaining") + while time.time() < t_end: + app = self.get_mas_application_availability(mas_application_id) + if "available" in app and "ready" in app and app["ready"] and app["available"]: + return + else: + self.logger.info(f"{mas_application_id} is not ready or available, retry in {retry_interval_secs} seconds: {t_end - time.time():.2f} seconds remaining") + time.sleep(retry_interval_secs) + raise Exception(f"{mas_application_id} did not become ready and available in time, aborting") + + def parse_initial_users_from_aws_secret_json(self, secret_json): + primary = [] + secondary = [] + for (email, csv) in secret_json.items(): + values = csv.split(",") + + if len(values) != 3: + raise Exception(f"Wrong number of CSV values for {email} (expected 3 but got {len(values)})") + + user_type = values[0].strip() + given_name = values[1].strip() + family_name = values[2].strip() + + user = { + "email": email, + "given_name": given_name, + "family_name": family_name + } + if user_type == "primary": + primary.append(user) + elif user_type == "secondary": + secondary.append(user) + else: + raise Exception(f"Unknown user type for {email}: {user_type}") + + initial_users = { + "users": { + "primary": primary, + "secondary": secondary + } + } + return initial_users + + def create_initial_users_for_saas(self, initial_users): + + # Validate input + if "users" not in initial_users: + raise Exception("expected top-level key 'users' not found") + users = initial_users["users"] + if "primary" not in users: + raise Exception("expected key 'users.primary' not found") + primary_users = users["primary"] + if type(primary_users) is not list: + raise Exception("'users.primary' is not a list") + if "secondary" not in users: + raise Exception("expected key 'users.secondary' not found") + secondary_users = users["secondary"] + if type(secondary_users) is not list: + raise Exception("'users.secondary' is not a list") + + if len(primary_users) == 0 and len(secondary_users) == 0: + self.logger.info("No users left to sync, nothing to do") + return {"completed": [], "failed": []} + + # before we do anything, let's check all MAS applications are ready + for mas_application_id in self.mas_workspace_application_ids: + self.await_mas_application_availability(mas_application_id) + + completed = [] + failed = [] + + for primary_user in primary_users: + self.logger.info("") + try: + self.logger.info(f"Syncing primary user {primary_user['email']}") + self.create_initial_user_for_saas(primary_user, "PRIMARY") + completed.append(primary_user) + self.logger.info(f"Completed sync of primary user {primary_user['email']}") + except Exception as e: + self.logger.error(f"Sync of primary user {primary_user['email']} failed: {str(e)}") + failed.append(primary_user) + + for secondary_user in secondary_users: + self.logger.info("") + try: + self.logger.info("") + self.logger.info(f"Syncing secondary user {secondary_user['email']}") + self.create_initial_user_for_saas(secondary_user, "SECONDARY") + completed.append(secondary_user) + self.logger.info(f"Completed sync of secondary user {secondary_user['email']}") + except Exception as e: + self.logger.error(f"Sync of secondary user {secondary_user['email']} failed: {str(e)}") + failed.append(secondary_user) + self.logger.info("") + + return { + "completed": completed, + "failed": failed + } + + def create_initial_user_for_saas(self, user, user_type): + if "email" not in user: + raise Exception("'email' not found in at least one of the user defs") + if "given_name" not in user: + raise Exception("'given_name' not found in at least one of the user defs") + if "family_name" not in user: + raise Exception("'family_name' not found in at least one of the user defs") + + user_email = user["email"] + user_given_name = user["given_name"] + user_family_name = user["family_name"] + + user_id = user_email + username = user_email + # display_name = re.search('^([^@]+)@', user_email).group(1) # local part of the email + display_name = f"{user_given_name} {user_family_name}" + + # Set user permissions and entitlements based on requested user_type + if user_type == "PRIMARY": + permissions = { + "systemAdmin": False, + "userAdmin": True, + "apikeyAdmin": False + } + entitlement = { + "application": "PREMIUM", + "admin": "ADMIN_BASE", + "alwaysReserveLicense": True + } + is_workspace_admin = True + application_role = "ADMIN" + # TODO: check which security groups primary users should be members of + manage_security_groups = ["MAXADMIN"] + elif user_type == "SECONDARY": + permissions = { + "systemAdmin": False, + "userAdmin": False, + "apikeyAdmin": False + } + entitlement = { + "application": "BASE", + "admin": "NONE", + "alwaysReserveLicense": True + } + is_workspace_admin = False + application_role = "USER" + # TODO: check which security groups secondary users should be members of + manage_security_groups = [] + else: + raise Exception(f"Unsupported user_type: {user_type}") + + user_def = { + "id": user_id, + "status": {"active": True}, + "username": username, + "owner": "local", + "emails": [ + { + "value": user_email, + "type": "Work", + "primary": True + } + ], + "displayName": display_name, + "issuer": "local", + "permissions": permissions, + "entitlement": entitlement, + "givenName": user_given_name, + "familyName": user_family_name + } + + self.get_or_create_user(user_def) + self.link_user_to_local_idp(user_id, email_password=True) + self.add_user_to_workspace(user_id, is_workspace_admin=is_workspace_admin) + + for mas_application_id in self.mas_workspace_application_ids: + self.await_mas_application_availability(mas_application_id) + if mas_application_id == "manage": + # special case for manage; role is always "MANAGEUSER" + role = "MANAGEUSER" + else: + # otherwise grant the user the appropriate role for their user_type + role = application_role + self.set_user_application_permission(user_id, mas_application_id, role) + + for mas_application_id in self.mas_workspace_application_ids: + self.check_user_sync(user_id, mas_application_id) + + if len(manage_security_groups) > 0 and "manage" in self.mas_workspace_application_ids: + maxadmin_manage_api_key = self.create_or_get_manage_api_key_for_user(MASUserUtils.MAXADMIN, temporary=True) + for manage_security_group in manage_security_groups: + self.add_user_to_manage_group(user_id, manage_security_group, maxadmin_manage_api_key) diff --git a/test/src/test_users.py b/test/src/test_users.py new file mode 100644 index 00000000..7be29e60 --- /dev/null +++ b/test/src/test_users.py @@ -0,0 +1,1802 @@ +# ***************************************************************************** +# Copyright (c) 2024 IBM Corporation and other Contributors. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# which accompanies this distribution, and is available at +# http://www.eclipse.org/legal/epl-v10.html +# +# ***************************************************************************** + +import pytest +import base64 +from unittest.mock import MagicMock, patch, call +from pytest import fixture + +import os + +from mas.devops.users import MASUserUtils + +SUPERUSER_USERNAME = "superuser_username" +SUPERUSER_PASSWORD = "superuser_password" # pragma: allowlist secret + + +ADMINDASHBOARD_CA_CRT = "admindashboard-ca" +COREAPI_CA_CRT = "coreapi-ca" +MANAGE_CA_CRT = "manage-ca" +MANAGE_TLS_CRT = "manage-tls-crt" +MANAGE_TLS_KEY = "manage-tls-key" + +TOKEN = "TOKEN" +MAS_INSTANCE_ID = "inst1" +MAS_WORKSPACE_ID = "masdev" + +MAS_CORE_NAMESPACE = f"mas-{MAS_INSTANCE_ID}-core" +MANAGE_NAMESPACE = f"mas-{MAS_INSTANCE_ID}-manage" + +ADMIN_DASHBOARD_PORT = 1 +COREAPI_PORT = 2 +MANAGE_API_PORT = 3 + +MAS_ADMIN_URL = f"https://admin-dashboard.{MAS_CORE_NAMESPACE}.svc.cluster.local:{ADMIN_DASHBOARD_PORT}" +MAS_API_URL = f'https://coreapi.{MAS_CORE_NAMESPACE}.svc.cluster.local:{COREAPI_PORT}' +MANAGE_API_URL = f'https://{MAS_INSTANCE_ID}-{MAS_WORKSPACE_ID}.{MANAGE_NAMESPACE}.svc.cluster.local:{MANAGE_API_PORT}' + +PEM_PATH = "pempath" + + +def additional_matcher(req, json=None, verify=PEM_PATH, cert=None): + if json is not None: + assert req.json() == json + assert req.verify == verify + assert req.cert == cert + return True + + +def get_secret(name, namespace): + if name == f"{MAS_INSTANCE_ID}-credentials-superuser": + data = { + "username": base64.b64encode(SUPERUSER_USERNAME.encode("utf-8")), + "password": base64.b64encode(SUPERUSER_PASSWORD.encode("utf-8")), + } + + if name == f"{MAS_INSTANCE_ID}-admindashboard-cert-internal": + data = { + "ca.crt": base64.b64encode(ADMINDASHBOARD_CA_CRT.encode("utf-8")) + } + + if name == f"{MAS_INSTANCE_ID}-coreapi-cert-internal": + data = { + "ca.crt": base64.b64encode(COREAPI_CA_CRT.encode("utf-8")) + } + + if name == f"{MAS_INSTANCE_ID}-internal-manage-tls": + data = { + "ca.crt": base64.b64encode(MANAGE_CA_CRT.encode("utf-8")), + "tls.crt": base64.b64encode(MANAGE_TLS_CRT.encode("utf-8")), + "tls.key": base64.b64encode(MANAGE_TLS_KEY.encode("utf-8")), + } + + return MagicMock( + data=data + ) + + +@fixture +def mock_atexit(): + with patch('atexit.register') as mock_atexit: + yield mock_atexit + + +@fixture +def mock_named_temporary_file(mock_atexit): + with patch('tempfile.NamedTemporaryFile') as mock_named_temporary_file: + mock_file = MagicMock() + mock_file.name = PEM_PATH + mock_named_temporary_file.return_value.__enter__.return_value = mock_file + yield mock_file + + +@fixture +def mock_v1_secrets(): + with patch('mas.devops.users.DynamicClient') as mock_DynamicClientCls: + mock_DynamicClient = mock_DynamicClientCls.return_value + mock_v1_secrets = mock_DynamicClient.resources.get.return_value + mock_v1_secrets.get.side_effect = get_secret + yield mock_v1_secrets + + +@fixture +def mock_logininitial_endpoint(requests_mock): + yield requests_mock.post( + f"{MAS_ADMIN_URL}/logininitial", + json=dict(token=TOKEN), + additional_matcher=lambda req: additional_matcher(req, json={"username": SUPERUSER_USERNAME, "password": SUPERUSER_PASSWORD}) + ) + + +@fixture +def user_utils(mock_v1_secrets, mock_logininitial_endpoint, mock_named_temporary_file, mock_atexit): + k8s_client = MagicMock() # DynamicClient is mocked out, no methods will be called on the k8s_client + user_utils = MASUserUtils( + MAS_INSTANCE_ID, + MAS_WORKSPACE_ID, + k8s_client, + coreapi_port=COREAPI_PORT, + admin_dashboard_port=ADMIN_DASHBOARD_PORT, + manage_api_port=MANAGE_API_PORT + ) + + yield user_utils + + +@fixture +def mock_manage_api_key(requests_mock): + ''' + Setup mock Manage APIs for setting up an API Key + ''' + user_id = "user1" + apikey = {"userid": user_id, "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} + + requests_mock.post( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1", + request_headers={"content-type": "application/json"}, + json={"id": user_id}, + status_code=201, + additional_matcher=lambda req: additional_matcher(req, json={"expiration": -1, "userid": user_id}, cert=PEM_PATH) + ) + + requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json"}, + json={"member": [apikey]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + yield apikey + + +def test_admin_internal_ca_pem_file_path(user_utils, mock_named_temporary_file, mock_atexit): + assert str(user_utils.admin_internal_ca_pem_file_path) == PEM_PATH + assert mock_named_temporary_file.mock_calls == [call.write(ADMINDASHBOARD_CA_CRT.encode()), call.flush(), call.close()] + assert mock_atexit.mock_calls == [call(os.remove, PEM_PATH)] + + # verify caching + assert str(user_utils.admin_internal_ca_pem_file_path) == PEM_PATH + assert mock_named_temporary_file.mock_calls == [call.write(ADMINDASHBOARD_CA_CRT.encode()), call.flush(), call.close()] + assert mock_atexit.mock_calls == [call(os.remove, PEM_PATH)] + + +def mock_get_user(requests_mock, user_id, json, status_code): + return requests_mock.get( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json=json, + status_code=status_code, + additional_matcher=lambda req: additional_matcher(req) + ) + + +def mock_get_user_200(requests_mock, user_id): + return mock_get_user( + requests_mock, user_id, {"id": user_id, "displayName": user_id}, 200 + ) + + +def mock_get_user_404(requests_mock, user_id): + return mock_get_user( + requests_mock, user_id, {"error": "notfound"}, 404 + ) + + +def mock_get_user_500(requests_mock, user_id): + return mock_get_user( + requests_mock, user_id, {"error": "internal"}, 500 + ) + + +def test_mas_superuser_credentials(user_utils, mock_v1_secrets): + assert mock_v1_secrets.get.call_count == 0 + assert user_utils.mas_superuser_credentials == {"username": SUPERUSER_USERNAME, "password": SUPERUSER_PASSWORD} + assert mock_v1_secrets.get.call_count == 1 + # verify caching is working + assert user_utils.mas_superuser_credentials == {"username": SUPERUSER_USERNAME, "password": SUPERUSER_PASSWORD} + assert mock_v1_secrets.get.call_count == 1 + + +def test_admin_internal_tls_secret(user_utils, mock_v1_secrets): + assert mock_v1_secrets.get.call_count == 0 + assert user_utils.admin_internal_tls_secret.data["ca.crt"] == base64.b64encode(ADMINDASHBOARD_CA_CRT.encode('utf-8')) + assert mock_v1_secrets.get.call_count == 1 + assert user_utils.admin_internal_tls_secret.data["ca.crt"] == base64.b64encode(ADMINDASHBOARD_CA_CRT.encode('utf-8')) + assert mock_v1_secrets.get.call_count == 1 + + +def test_core_internal_tls_secret(user_utils, mock_v1_secrets): + assert mock_v1_secrets.get.call_count == 0 + assert user_utils.core_internal_tls_secret.data["ca.crt"] == base64.b64encode(COREAPI_CA_CRT.encode('utf-8')) + assert mock_v1_secrets.get.call_count == 1 + assert user_utils.core_internal_tls_secret.data["ca.crt"] == base64.b64encode(COREAPI_CA_CRT.encode('utf-8')) + assert mock_v1_secrets.get.call_count == 1 + + +def test_core_internal_ca_pem_file_path(user_utils, mock_named_temporary_file, mock_atexit): + ''' + Check the correct content is written to core_internal_ca_pem_file_path tempfile, that an exit handler is registered to + delete the temp file, and that the tempfile is only written once (with its path cached) + ''' + assert str(user_utils.core_internal_ca_pem_file_path) == PEM_PATH + assert mock_named_temporary_file.mock_calls == [call.write(COREAPI_CA_CRT.encode()), call.flush(), call.close()] + assert mock_atexit.mock_calls == [call(os.remove, PEM_PATH)] + + # verify caching + assert str(user_utils.core_internal_ca_pem_file_path) == PEM_PATH + assert mock_named_temporary_file.mock_calls == [call.write(COREAPI_CA_CRT.encode()), call.flush(), call.close()] + assert mock_atexit.mock_calls == [call(os.remove, PEM_PATH)] + + +def test_superuser_auth_token(user_utils, mock_logininitial_endpoint): + assert mock_logininitial_endpoint.call_count == 0 + assert user_utils.superuser_auth_token == TOKEN + assert mock_logininitial_endpoint.call_count == 1 + + # verify caching + user_utils.superuser_auth_token + assert mock_logininitial_endpoint.call_count == 1 + + +def test_manage_internal_tls_secret(user_utils, mock_v1_secrets): + assert mock_v1_secrets.get.call_count == 0 + assert user_utils.manage_internal_tls_secret.data["ca.crt"] == base64.b64encode(MANAGE_CA_CRT.encode('utf-8')) + assert user_utils.manage_internal_tls_secret.data["tls.crt"] == base64.b64encode(MANAGE_TLS_CRT.encode('utf-8')) + assert user_utils.manage_internal_tls_secret.data["tls.key"] == base64.b64encode(MANAGE_TLS_KEY.encode('utf-8')) + assert mock_v1_secrets.get.call_count == 1 + assert user_utils.manage_internal_tls_secret.data["ca.crt"] == base64.b64encode(MANAGE_CA_CRT.encode('utf-8')) + assert user_utils.manage_internal_tls_secret.data["tls.crt"] == base64.b64encode(MANAGE_TLS_CRT.encode('utf-8')) + assert user_utils.manage_internal_tls_secret.data["tls.key"] == base64.b64encode(MANAGE_TLS_KEY.encode('utf-8')) + assert mock_v1_secrets.get.call_count == 1 + + +def test_manage_internal_client_pem_file_path(user_utils, mock_named_temporary_file, mock_atexit): + assert str(user_utils.manage_internal_client_pem_file_path) == PEM_PATH + assert mock_named_temporary_file.mock_calls == [call.write(MANAGE_TLS_KEY.encode()), call.write(MANAGE_TLS_CRT.encode()), call.flush(), call.close()] + assert mock_atexit.mock_calls == [call(os.remove, PEM_PATH)] + + # verify caching + assert str(user_utils.manage_internal_client_pem_file_path) == PEM_PATH + assert mock_named_temporary_file.mock_calls == [call.write(MANAGE_TLS_KEY.encode()), call.write(MANAGE_TLS_CRT.encode()), call.flush(), call.close()] + assert mock_atexit.mock_calls == [call(os.remove, PEM_PATH)] + + +def test_manage_internal_ca_pem_file_path(user_utils, mock_named_temporary_file, mock_atexit): + assert str(user_utils.manage_internal_ca_pem_file_path) == PEM_PATH + assert mock_named_temporary_file.mock_calls == [call.write(MANAGE_CA_CRT.encode()), call.flush(), call.close()] + assert mock_atexit.mock_calls == [call(os.remove, PEM_PATH)] + + # verify caching + assert str(user_utils.manage_internal_ca_pem_file_path) == PEM_PATH + assert mock_named_temporary_file.mock_calls == [call.write(MANAGE_CA_CRT.encode()), call.flush(), call.close()] + assert mock_atexit.mock_calls == [call(os.remove, PEM_PATH)] + + +def test_mas_workspace_application_ids(user_utils, requests_mock): + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications", + request_headers={"x-access-token": TOKEN}, + json=[{"id": "manage"}, {"id": "iot"}], + status_code=200 + ) + assert user_utils.mas_workspace_application_ids == ["manage", "iot"] + assert get.call_count == 1 + + # verify caching + assert user_utils.mas_workspace_application_ids == ["manage", "iot"] + assert get.call_count == 1 + + +def test_get_user_exists(user_utils, requests_mock): + user_id = "user1" + get = mock_get_user_200(requests_mock, user_id) + assert user_utils.get_user(user_id) == {"id": user_id, "displayName": user_id} + assert get.call_count == 1 + + +def test_get_user_notfound(user_utils, requests_mock): + user_id = "user1" + get = mock_get_user_404(requests_mock, user_id) + assert user_utils.get_user(user_id) is None + assert get.call_count == 1 + + +def test_get_user_error(user_utils, requests_mock): + user_id = "user1" + get = mock_get_user_500(requests_mock, user_id) + with pytest.raises(Exception): + user_utils.get_user(user_id) + assert get.call_count == 1 + + +def test_get_or_create_user_exists(user_utils, requests_mock): + user_id = "user1" + get = mock_get_user_200(requests_mock, user_id) + + post = requests_mock.post( + f"{MAS_API_URL}/v3/users", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id}, + status_code=201, + additional_matcher=lambda req: additional_matcher(req, json={"id": user_id}) + ) + + assert user_utils.get_or_create_user({"id": user_id}) == {"id": user_id, "displayName": user_id} + assert get.call_count == 1 + assert post.call_count == 0 + + +def test_get_or_create_user_notfound(user_utils, requests_mock): + user_id = "user1" + get = mock_get_user_404(requests_mock, user_id) + + post = requests_mock.post( + f"{MAS_API_URL}/v3/users", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id, "displayName": user_id}, + status_code=201, + additional_matcher=lambda req: additional_matcher(req, json={"id": user_id}) + ) + + assert user_utils.get_or_create_user({"id": user_id}) == {"id": user_id, "displayName": user_id} + assert get.call_count == 1 + assert post.call_count == 1 + + +def test_get_or_create_user_error(user_utils, requests_mock): + user_id = "user1" + get = mock_get_user_404(requests_mock, user_id) + post = requests_mock.post( + f"{MAS_API_URL}/v3/users", + request_headers={"x-access-token": TOKEN}, + json={"error": "unknown"}, + status_code=500, + additional_matcher=lambda req: additional_matcher(req, json={"id": user_id}) + ) + + with pytest.raises(Exception): + user_utils.get_or_create_user({"id": user_id}) + assert get.call_count == 1 + assert post.call_count == 1 + + +def test_update_user(user_utils, requests_mock): + user_id = "user1" + put = requests_mock.put( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, json={"id": user_id}) + ) + user_utils.update_user({"id": user_id}) + assert put.call_count == 1 + + +def test_update_user_error(user_utils, requests_mock): + user_id = "user1" + put = requests_mock.put( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"error": "nofound"}, + status_code=404, + additional_matcher=lambda req: additional_matcher(req, json={"id": user_id}) + ) + with pytest.raises(Exception): + user_utils.update_user({"id": user_id}) + assert put.call_count == 1 + + +def test_update_user_display_name(user_utils, requests_mock): + user_id = "user1" + patche = requests_mock.patch( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, json={"displayName": "display_name"}) + ) + user_utils.update_user_display_name(user_id, "display_name") + assert patche.call_count == 1 + + +def test_update_user_display_name_error(user_utils, requests_mock): + user_id = "user1" + patche = requests_mock.patch( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"error": "notfound"}, + status_code=404, + additional_matcher=lambda req: additional_matcher(req, json={"displayName": "display_name"}) + ) + with pytest.raises(Exception): + user_utils.update_user_display_name(user_id, "display_name") + assert patche.call_count == 1 + + +def test_link_user_to_local_idp(user_utils, requests_mock): + user_id = "user1" + email_password = True + get = mock_get_user_200(requests_mock, user_id) + + put = requests_mock.put( + f"{MAS_API_URL}/v3/users/{user_id}/idps/local?emailPassword={email_password}", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, json={"idpUserId": user_id}) + ) + + user_utils.link_user_to_local_idp(user_id, email_password=email_password) + + assert get.call_count == 1 + assert put.call_count == 1 + + +def test_link_user_to_local_idp_usernotfound(user_utils, requests_mock): + user_id = "user1" + get = mock_get_user_404(requests_mock, user_id) + put = requests_mock.put( + f"{MAS_API_URL}/v3/users/{user_id}/idps/local", + additional_matcher=lambda req: additional_matcher(req, json={"idpUserId": user_id}) + ) + + with pytest.raises(Exception): + user_utils.link_user_to_local_idp(user_id) + + assert get.call_count == 1 + assert put.call_count == 0 + + +def test_link_user_to_local_idp_already_linked(user_utils, requests_mock): + user_id = "user1" + email_password = True + get = mock_get_user( + requests_mock, user_id, {"id": user_id, "identities": {"_local": {}}}, 200 + ) + + put = requests_mock.put( + f"{MAS_API_URL}/v3/users/{user_id}/idps/local?emailPassword={email_password}", + request_headers={"x-access-token": TOKEN}, + json={"identities": {}}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, json={"idpUserId": user_id}) + ) + + user_utils.link_user_to_local_idp(user_id, email_password=email_password) + + assert get.call_count == 1 + assert put.call_count == 0 + + +def test_get_user_workspaces(user_utils, requests_mock): + user_id = "user1" + get = requests_mock.get( + f"{MAS_API_URL}/v3/users/{user_id}/workspaces", + request_headers={"x-access-token": TOKEN}, + json=[{"id": "masdev"}], + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + workspaces = user_utils.get_user_workspaces(user_id) + assert workspaces == [{"id": "masdev"}] + assert get.call_count == 1 + + +def test_get_user_workspaces_usernotfound(user_utils, requests_mock): + user_id = "user1" + get = requests_mock.get( + f"{MAS_API_URL}/v3/users/{user_id}/workspaces", + request_headers={"x-access-token": TOKEN}, + json={}, + status_code=404, + additional_matcher=lambda req: additional_matcher(req) + ) + with pytest.raises(Exception): + user_utils.get_user_workspaces(user_id) + assert get.call_count == 1 + + +def test_get_user_workspaces_error(user_utils, requests_mock): + user_id = "user1" + get = requests_mock.get( + f"{MAS_API_URL}/v3/users/{user_id}/workspaces", + request_headers={"x-access-token": TOKEN}, + json={"error": "internal"}, + status_code=500, + additional_matcher=lambda req: additional_matcher(req) + ) + with pytest.raises(Exception): + user_utils.get_user_workspaces(user_id) + assert get.call_count == 1 + + +def test_add_user_to_workspace_already_a_member(user_utils, requests_mock): + user_id = "user1" + get = requests_mock.get( + f"{MAS_API_URL}/v3/users/{user_id}/workspaces", + request_headers={"x-access-token": TOKEN}, + json=[{"id": "someotherworkspace"}, {"id": MAS_WORKSPACE_ID}], + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + put = requests_mock.put( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json=[{"id": "masdev"}], + status_code=200, + additional_matcher=lambda req: additional_matcher(req, json={"permissions": {"workspaceAdmin": True}}) + ) + user_utils.add_user_to_workspace(user_id, is_workspace_admin=True) + assert get.call_count == 1 + assert put.call_count == 0 + + +def test_add_user_to_workspace(user_utils, requests_mock): + user_id = "user1" + get = requests_mock.get( + f"{MAS_API_URL}/v3/users/{user_id}/workspaces", + request_headers={"x-access-token": TOKEN}, + json=[{"id": "someotherworkspace"}], + status_code=200 + ) + put = requests_mock.put( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, json={"permissions": {"workspaceAdmin": True}}) + ) + user_utils.add_user_to_workspace(user_id, is_workspace_admin=True) + assert get.call_count == 1 + assert put.call_count == 1 + + +def test_add_user_to_workspace_error(user_utils, requests_mock): + user_id = "user1" + get = requests_mock.get( + f"{MAS_API_URL}/v3/users/{user_id}/workspaces", + request_headers={"x-access-token": TOKEN}, + json=[{"id": "someotherworkspace"}], + status_code=200 + ) + put = requests_mock.put( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"error": "internal"}, + status_code=500, + additional_matcher=lambda req: additional_matcher(req, json={"permissions": {"workspaceAdmin": True}}) + ) + with pytest.raises(Exception): + user_utils.add_user_to_workspace(user_id, is_workspace_admin=True) + assert get.call_count == 1 + assert put.call_count == 1 + + +def test_get_user_application_permissions(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + response_json = { + "role": "USER", + "userId": user_id, + "workspaceId": MAS_WORKSPACE_ID, + "userUrl": "https://api.yourmasdomain.com/users/joebloggs", + "workspaceUrl": "https://api.yourmasdomain.com/workspaces/myworkspace1" + } + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json=response_json, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + assert user_utils.get_user_application_permissions(user_id, application_id) == response_json + assert get.call_count == 1 + + +def test_get_user_application_permissions_notfound(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"error": "notfound"}, + status_code=404, + additional_matcher=lambda req: additional_matcher(req) + ) + assert user_utils.get_user_application_permissions(user_id, application_id) is None + assert get.call_count == 1 + + +def test_get_user_application_permissions_error(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"error": "internal"}, + status_code=500, + additional_matcher=lambda req: additional_matcher(req) + ) + with pytest.raises(Exception): + user_utils.get_user_application_permissions(user_id, application_id) + assert get.call_count == 1 + + +def test_set_user_application_permissions(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"error": "notfound"}, + status_code=404, + additional_matcher=lambda req: additional_matcher(req) + ) + put = requests_mock.put( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, json={"role": "USER"}) + ) + user_utils.set_user_application_permission(user_id, application_id, "USER") + assert get.call_count == 1 + assert put.call_count == 1 + + +def test_set_user_application_permissions_alreadyset(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + get_response_json = { + "role": "ADMINISTRATOR", + "userId": user_id, + "workspaceId": MAS_WORKSPACE_ID, + "userUrl": "https://api.yourmasdomain.com/users/joebloggs", + "workspaceUrl": "https://api.yourmasdomain.com/workspaces/myworkspace1" + } + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json=get_response_json, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + put = requests_mock.put( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, json={"role": "USER"}) + ) + user_utils.set_user_application_permission(user_id, application_id, "USER") + assert get.call_count == 1 + assert put.call_count == 0 + + +def test_resync_users(user_utils, requests_mock): + user_ids = ["user1", "user2"] + + gets = [] + patches = [] + for user_id in user_ids: + gets.append(mock_get_user_200(requests_mock, user_id)) + + patches.append( + requests_mock.patch( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id}, + status_code=200, + # uid=user_id captures the current value of user_id during each loop iteration, ensuring that the lambda uses the correct value when it is eventually called. + additional_matcher=lambda req, uid=user_id: additional_matcher(req, json={"displayName": uid}) + ) + ) + + user_utils.resync_users(user_ids) + + for get in gets: + assert get.call_count == 1 + + for patche in patches: + assert patche.call_count == 1 + + +def test_check_user_sync(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + + # transitions from PENDING -> SUCCESS on the third call + attempts = 0 + + def json_callback(request, context): + nonlocal attempts + if attempts >= 2: + state = "SUCCESS" + else: + state = "PENDING" + attempts = attempts + 1 + return { + "id": user_id, + "applications": { + "other": { + "sync": { + "state": "ERROR" + } + }, + application_id: { + "sync": { + "state": state + } + } + } + } + + get = mock_get_user( + requests_mock, + user_id, + json_callback, + 200 + ) + + user_utils.check_user_sync(user_id, application_id, timeout_secs=8, retry_interval_secs=0) + assert get.call_count == 3 + + +def test_check_user_sync_timeout(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + + get = mock_get_user( + requests_mock, + user_id, + { + "id": user_id, + "applications": { + "other": { + "sync": { + "state": "ERROR" + } + }, + application_id: { + "sync": { + "state": "PENDING" + } + } + } + }, + 200 + ) + with pytest.raises(Exception) as excinfo: + user_utils.check_user_sync(user_id, application_id, timeout_secs=0.3, retry_interval_secs=0.05) + assert str(excinfo.value) == f"User {user_id} sync failed to complete for app within {0.3} seconds" + assert get.call_count > 1 + + +def test_check_user_sync_appstate_notfound(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + + # first call (made bvy check_user_sync) returns user record with missing sync status for app + # subsequent calls will include sync status and so should succeed + # a single resync should have been triggered + attempts = 0 + + def json_callback(request, context): + nonlocal attempts + if attempts >= 1: + ret = { + "id": user_id, + "displayName": user_id, + "applications": { + "other": { + "sync": { + "state": "ERROR" + } + }, + application_id: { + "sync": { + "state": "SUCCESS" + } + } + } + } + else: + ret = { + "id": user_id, + "displayName": user_id, + "applications": { + "other": { + "sync": { + "state": "ERROR" + } + }, + } + } + attempts = attempts + 1 + return ret + + patche = requests_mock.patch( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id}, + status_code=200 + ) + + get = mock_get_user( + requests_mock, + user_id, + json_callback, + 200 + ) + + user_utils.check_user_sync(user_id, application_id, timeout_secs=8, retry_interval_secs=0) + assert get.call_count == 3 + + # a single resync should have been triggered + assert patche.call_count == 1 + + +def test_check_user_sync_appstate_transient_error(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + + # first call (made bvy check_user_sync) returns user record with sync state error + # subsequent calls will have successful sync state and so should succeed + # a single resync should have been triggered + attempts = 0 + + def json_callback(request, context): + nonlocal attempts + if attempts >= 1: + ret = { + "id": user_id, + "displayName": user_id, + "applications": { + application_id: { + "sync": { + "state": "SUCCESS" + } + } + } + } + else: + ret = { + "id": user_id, + "displayName": user_id, + "applications": { + application_id: { + "sync": { + "state": "ERROR" + } + } + } + } + attempts = attempts + 1 + return ret + + patche = requests_mock.patch( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id}, + status_code=200 + ) + + get = mock_get_user( + requests_mock, + user_id, + json_callback, + 200 + ) + + user_utils.check_user_sync(user_id, application_id, timeout_secs=8, retry_interval_secs=0) + assert get.call_count == 3 + + # a single resync should have been triggered + assert patche.call_count == 1 + + +def test_check_user_sync_appstate_persistent_error(user_utils, requests_mock): + user_id = "user1" + application_id = "manage" + + patche = requests_mock.patch( + f"{MAS_API_URL}/v3/users/{user_id}", + request_headers={"x-access-token": TOKEN}, + json={"id": user_id}, + status_code=200 + ) + + get = mock_get_user( + requests_mock, + user_id, + { + "id": user_id, + "displayName": user_id, + "applications": { + application_id: { + "sync": { + "state": "ERROR" + } + } + } + }, + 200 + ) + + with pytest.raises(Exception) as excinfo: + user_utils.check_user_sync(user_id, application_id, timeout_secs=0.3, retry_interval_secs=0.05) + assert str(excinfo.value) == f"User {user_id} sync failed to complete for app within {0.3} seconds" + assert get.call_count > 1 + + # an "update_user_display_name" should have been triggered for every 2 get calls (1 call by check_user_sync, 1 by resync) + assert patche.call_count == get.call_count / 2 + + +def test_get_manage_api_key_for_user_exists(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json"}, + json={"member": [apikey]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + assert user_utils.get_manage_api_key_for_user(user_id) == apikey + assert get.call_count == 1 + + +def test_get_manage_api_key_for_user_notfound(user_utils, requests_mock): + user_id = "user1" + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json"}, + json={"member": []}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + assert user_utils.get_manage_api_key_for_user(user_id) is None + assert get.call_count == 1 + + +def test_get_manage_api_key_for_user_error(user_utils, requests_mock): + user_id = "user1" + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json"}, + text="boom", + status_code=500, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + with pytest.raises(Exception) as excinfo: + user_utils.get_manage_api_key_for_user(user_id) + assert str(excinfo.value) == "500 boom" + assert get.call_count == 1 + + +@pytest.mark.parametrize("temporary", [(True), (False)]) +def test_create_or_get_manage_api_key_for_user_new_api_key(temporary, user_utils, requests_mock, mock_atexit): + user_id = "user1" + apikey = {"userid": user_id, "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} + + post = requests_mock.post( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1", + request_headers={"content-type": "application/json"}, + json={"id": user_id}, + status_code=201, + additional_matcher=lambda req: additional_matcher(req, json={"expiration": -1, "userid": user_id}, cert=PEM_PATH) + ) + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json"}, + json={"member": [apikey]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + assert user_utils.create_or_get_manage_api_key_for_user(user_id, temporary=temporary) == apikey + assert post.call_count == 1 + assert get.call_count == 1 + + # if temporary, check we registered the exit hook to delete the temporary Manage API Key + if temporary: + assert call(user_utils.delete_manage_api_key, apikey) in mock_atexit.mock_calls, "delete_manage_api_key exit hook not registered for temporary api key that we created" + else: + assert call(user_utils.delete_manage_api_key, apikey) not in mock_atexit.mock_calls, "delete_manage_api_key exit hook registered unexpectedly for non-temporary api key that we created" + + +@pytest.mark.parametrize("temporary", [(True), (False)]) +def test_create_or_get_manage_api_key_for_user_existing_api_key(temporary, user_utils, requests_mock, mock_atexit): + user_id = "user1" + apikey = {"userid": user_id, "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} + + post = requests_mock.post( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1", + request_headers={"content-type": "application/json"}, + json={"Error": {"reasonCode": "BMXAA10051E"}}, + status_code=400, + additional_matcher=lambda req: additional_matcher(req, json={"expiration": -1, "userid": user_id}, cert=PEM_PATH) + ) + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json"}, + json={"member": [apikey]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + assert user_utils.create_or_get_manage_api_key_for_user(user_id, temporary=temporary) == apikey + assert post.call_count == 1 + assert get.call_count == 1 + + # even if temporary is set, because we did not create the api key, we should not registered a hook to delete it + assert call(user_utils.delete_manage_api_key, apikey) not in mock_atexit.mock_calls, "delete_manage_api_key exit hook registered unexpectedly for existing API Key that we did not create" + + +def test_create_or_get_manage_api_key_for_user_error(user_utils, requests_mock, mock_atexit): + user_id = "user1" + apikey = {"userid": user_id, "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} + + post = requests_mock.post( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1", + request_headers={"content-type": "application/json"}, + text="boom", + status_code=400, + additional_matcher=lambda req: additional_matcher(req, json={"expiration": -1, "userid": user_id}, cert=PEM_PATH) + ) + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey?ccm=1&lean=1&oslc.select=*&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json"}, + json={"member": [apikey]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + with pytest.raises(Exception) as excinfo: + user_utils.create_or_get_manage_api_key_for_user(user_id, temporary=True) + assert str(excinfo.value) == "400 boom" + assert post.call_count == 1 + assert get.call_count == 0 + assert call(user_utils.delete_manage_api_key, apikey) not in mock_atexit.mock_calls, "delete_manage_api_key exit hook not registered even though we failed to create the api key" + + +def test_delete_manage_api_key(user_utils, requests_mock): + user_id = "user1" + apikey_id = "theapikeyid" + apikey = {"userid": user_id, "href": f"https://{MANAGE_API_URL}:{MANAGE_API_PORT}/maximo/api/os/mxapiapikey/{apikey_id}"} + + delete = requests_mock.delete( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey/{apikey_id}?ccm=1&lean=1", + request_headers={"accept": "application/json"}, + text="notused", + status_code=204, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + user_utils.delete_manage_api_key(apikey) + assert delete.call_count == 1 + + +def test_delete_manage_api_key_notfound(user_utils, requests_mock): + user_id = "user1" + apikey_id = "theapikeyid" + apikey = {"userid": user_id, "href": f"https://{MANAGE_API_URL}:{MANAGE_API_PORT}/maximo/api/os/mxapiapikey/{apikey_id}"} + + delete = requests_mock.delete( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey/{apikey_id}?ccm=1&lean=1", + request_headers={"accept": "application/json"}, + text="notused", + status_code=404, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + user_utils.delete_manage_api_key(apikey) + assert delete.call_count == 1 + + +def test_delete_manage_api_key_bad_href(user_utils, requests_mock): + user_id = "user1" + apikey_id = "theapikeyid" + apikey = {"userid": user_id, "href": f"notgood/{apikey_id}"} + + delete = requests_mock.delete( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey/{apikey_id}?ccm=1&lean=1", + request_headers={"accept": "application/json"}, + text="notused", + status_code=204, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + with pytest.raises(Exception) as excinfo: + user_utils.delete_manage_api_key(apikey) + assert str(excinfo.value) == f"Could not parse API Key href: notgood/{apikey_id}" + assert delete.call_count == 0 + + +def test_delete_manage_api_key_error(user_utils, requests_mock): + user_id = "user1" + apikey_id = "theapikeyid" + apikey = {"userid": user_id, "href": f"https://{MANAGE_API_URL}:{MANAGE_API_PORT}/maximo/api/os/mxapiapikey/{apikey_id}"} + + delete = requests_mock.delete( + f"{MANAGE_API_URL}/maximo/api/os/mxapiapikey/{apikey_id}?ccm=1&lean=1", + request_headers={"accept": "application/json"}, + text="boom", + status_code=500, + additional_matcher=lambda req: additional_matcher(req, cert=PEM_PATH) + ) + + with pytest.raises(Exception) as excinfo: + user_utils.delete_manage_api_key(apikey) + assert str(excinfo.value) == "500 boom" + assert delete.call_count == 1 + + +def test_get_manage_group_id(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + group_id = "39231234" + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + json={"member": [{"maxgroupid": group_id}]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + assert user_utils.get_manage_group_id(group_name, apikey) == group_id + assert get.call_count == 1 + + +def test_get_manage_group_id_error(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + text="boom", + status_code=500, + additional_matcher=lambda req: additional_matcher(req) + ) + with pytest.raises(Exception) as excinfo: + user_utils.get_manage_group_id(group_name, apikey) + assert str(excinfo.value) == "500 boom" + assert get.call_count == 1 + + +def test_get_manage_group_id_notfound(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + + get = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + json={"member": [{}]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + assert user_utils.get_manage_group_id(group_name, apikey) is None + assert get.call_count == 1 + + +def test_is_user_in_manage_group_yes(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + group_id = "39231234" + + get_group_id = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json"}, + json={"member": [{"maxgroupid": group_id}], "apikey": apikey["apikey"]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + get_group_user = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}/groupuser?lean=1&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + json={"member": [{}]}, # <--- member length non-empty indicates that the user is a member of the group + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + assert user_utils.is_user_in_manage_group(group_name, user_id, apikey) + assert get_group_id.call_count == 1 + assert get_group_user.call_count == 1 + + +def test_is_user_in_manage_group_no(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + group_id = "39231234" + + get_group_id = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json"}, + json={"member": [{"maxgroupid": group_id}], "apikey": apikey["apikey"]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + get_group_user = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}/groupuser?lean=1&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + json={"member": []}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + assert not user_utils.is_user_in_manage_group(group_name, user_id, apikey) + assert get_group_id.call_count == 1 + assert get_group_user.call_count == 1 + + +def test_is_user_in_manage_group_error(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + group_id = "39231234" + + get_group_id = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json"}, + json={"member": [{"maxgroupid": group_id}], "apikey": apikey["apikey"]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + get_group_user = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}/groupuser?lean=1&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + text="boom", + status_code=500, + additional_matcher=lambda req: additional_matcher(req) + ) + + with pytest.raises(Exception) as excinfo: + user_utils.is_user_in_manage_group(group_name, user_id, apikey) + assert str(excinfo.value) == "500 boom" + assert get_group_id.call_count == 1 + assert get_group_user.call_count == 1 + + +def test_is_user_in_manage_group_no_group_found(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + group_id = "39231234" + + get_group_id = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json"}, + json={"member": [], "apikey": apikey["apikey"]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + get_group_user = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}/groupuser?lean=1&oslc.where=userid=\"{user_id}\"", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + text="boom", + status_code=500, + additional_matcher=lambda req: additional_matcher(req) + ) + + with pytest.raises(Exception) as excinfo: + user_utils.is_user_in_manage_group(group_name, user_id, apikey) + assert str(excinfo.value) == f"No Manage group found with name {group_name}" + assert get_group_id.call_count == 1 + assert get_group_user.call_count == 0 + + +def test_add_user_to_manage_group(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + group_id = "39231234" + + get_group_id = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json"}, + json={"member": [{"maxgroupid": group_id}], "apikey": apikey["apikey"]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + get_group_user = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}/groupuser?lean=1", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + json={"member": []}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + add_group_user = requests_mock.post( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}", + request_headers={ + "accept": "application/json", + "content-type": "application/json", + "x-method-override": "PATCH", + "patchtype": "MERGE", + "apikey": apikey["apikey"] + }, + json={}, + status_code=204, + additional_matcher=lambda req: additional_matcher(req, json={"groupuser": [{"userid": user_id}]}) + ) + + assert user_utils.add_user_to_manage_group(user_id, group_name, apikey) is None + assert get_group_id.call_count == 2 + assert get_group_user.call_count == 1 + assert add_group_user.call_count == 1 + + +def test_add_user_to_manage_group_already_member(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + group_id = "39231234" + + get_group_id = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json"}, + json={"member": [{"maxgroupid": group_id}], "apikey": apikey["apikey"]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + get_group_user = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}/groupuser?lean=1", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + json={"member": [{}]}, # <--- member length non-empty indicates that the user is a member of the group + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + add_group_user = requests_mock.post( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}", + request_headers={ + "accept": "application/json", + "content-type": "application/json", + "x-method-override": "PATCH", + "patchtype": "MERGE", + "apikey": apikey["apikey"] + }, + json={}, + status_code=204, + additional_matcher=lambda req: additional_matcher(req, json={"groupuser": [{"userid": user_id}]}) + ) + + assert user_utils.add_user_to_manage_group(user_id, group_name, apikey) is None + assert get_group_id.call_count == 1 + assert get_group_user.call_count == 1 + assert add_group_user.call_count == 0 + + +def test_add_user_to_manage_group_error(user_utils, requests_mock): + user_id = "user1" + apikey = {"userid": user_id, "apikey": "342fwasdasd", "href": f"https://{MANAGE_API_URL}/maximo/api/os/mxapikey/theapikeyid"} # pragma: allowlist secret + group_name = "thegroup" + group_id = "39231234" + + get_group_id = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup?ccm=1&lean=1&oslc.select=maxgroupid&oslc.where=groupname=\"{group_name}\"", + request_headers={"accept": "application/json"}, + json={"member": [{"maxgroupid": group_id}], "apikey": apikey["apikey"]}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + get_group_user = requests_mock.get( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}/groupuser?lean=1", + request_headers={"accept": "application/json", "apikey": apikey["apikey"]}, + json={"member": []}, + status_code=200, + additional_matcher=lambda req: additional_matcher(req) + ) + + add_group_user = requests_mock.post( + f"{MANAGE_API_URL}/maximo/api/os/mxapigroup/{group_id}", + request_headers={ + "accept": "application/json", + "content-type": "application/json", + "x-method-override": "PATCH", + "patchtype": "MERGE", + "apikey": apikey["apikey"] + }, + text="boom", + status_code=500, + additional_matcher=lambda req: additional_matcher(req, json={"groupuser": [{"userid": user_id}]}) + ) + with pytest.raises(Exception) as excinfo: + user_utils.add_user_to_manage_group(user_id, group_name, apikey) + assert str(excinfo.value) == "500 boom" + assert get_group_id.call_count == 2 + assert get_group_user.call_count == 1 + assert add_group_user.call_count == 1 + + +def test_get_mas_applications_in_workspace(user_utils, requests_mock): + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications", + request_headers={"x-access-token": TOKEN}, + json=[{"id": "manage"}], + status_code=200 + ) + assert user_utils.get_mas_applications_in_workspace() == [{"id": "manage"}] + assert get.call_count == 1 + + +def test_get_mas_applications_in_workspace_error(user_utils, requests_mock): + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications", + request_headers={"x-access-token": TOKEN}, + json={"error": "internal"}, + status_code=500 + ) + with pytest.raises(Exception) as excinfo: + user_utils.get_mas_applications_in_workspace() + assert get.call_count == 1 + assert str(excinfo.value) == '500 {"error": "internal"}' + + +def test_get_mas_application_availability(user_utils, requests_mock): + application_id = "manage" + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}", + request_headers={"x-access-token": TOKEN}, + json={"id": "manage"}, + status_code=200 + ) + assert user_utils.get_mas_application_availability(application_id) == {"id": "manage"} + assert get.call_count == 1 + + +def test_get_mas_application_availability_error(user_utils, requests_mock): + application_id = "manage" + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}", + request_headers={"x-access-token": TOKEN}, + json={"error": "internal"}, + status_code=500 + ) + with pytest.raises(Exception) as excinfo: + user_utils.get_mas_application_availability(application_id) + assert get.call_count == 1 + assert str(excinfo.value) == '500 {"error": "internal"}' + + +def test_await_mas_application_availability(user_utils, requests_mock): + application_id = "manage" + + # returns all possible permutations of the endpoint, until finally returning the + # response that should cause the retry logic to exit + return_values = [ + { + "id": application_id, + }, + { + "available": False, + }, + { + "available": True, + }, + { + "ready": False, + }, + { + "ready": True, + }, + { + "available": False, + "ready": False, + }, + { + "available": True, + "ready": False, + }, + { + "available": False, + "ready": True, + }, + { + "available": True, + "ready": True, + }, + ] + attempt = 0 + + def json_callback(request, context): + nonlocal attempt + nonlocal return_values + ret = return_values[attempt] + attempt = attempt + 1 + return ret + + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}", + request_headers={"x-access-token": TOKEN}, + json=json_callback, + status_code=200 + ) + + user_utils.await_mas_application_availability(application_id, timeout_secs=5, retry_interval_secs=0) + assert get.call_count == len(return_values) + + +def test_await_mas_application_availability_timeout(user_utils, requests_mock): + application_id = "manage" + + get = requests_mock.get( + f"{MAS_API_URL}/workspaces/{MAS_WORKSPACE_ID}/applications/{application_id}", + request_headers={"x-access-token": TOKEN}, + json={ + "available": False, + "ready": False, + }, + status_code=200 + ) + + with pytest.raises(Exception) as excinfo: + user_utils.await_mas_application_availability(application_id, timeout_secs=1, retry_interval_secs=0.1) + assert get.call_count > 1 + assert str(excinfo.value) == f"{application_id} did not become ready and available in time, aborting" + + +def test_parse_initial_users_from_aws_secret_json(user_utils): + + actual_initial_users = user_utils.parse_initial_users_from_aws_secret_json( + { + "user1@example.com": "primary,joe,bloggs", + "user2@example.com": " primary , ben , bob ", + "user3@example.com": "secondary ,bill, bibb" + } + ) + + expected_initial_users = { + "users": { + "primary": [ + { + "email": "user1@example.com", + "given_name": "joe", + "family_name": "bloggs" + }, + { + "email": "user2@example.com", + "given_name": "ben", + "family_name": "bob" + } + ], + "secondary": [ + { + "email": "user3@example.com", + "given_name": "bill", + "family_name": "bibb" + } + ] + } + } + + assert actual_initial_users == expected_initial_users + + with pytest.raises(Exception) as excinfo: + user_utils.parse_initial_users_from_aws_secret_json({ + "user1@example.com": "primary" + }) + assert "Wrong number of CSV values for user1@example.com (expected 3 but got 1)" == str(excinfo.value) + + with pytest.raises(Exception) as excinfo: + user_utils.parse_initial_users_from_aws_secret_json({ + "user1@example.com": "unknown,x,y" + }) + assert "Unknown user type for user1@example.com: unknown" == str(excinfo.value) + + +def test_create_initial_user_for_saas_no_email(user_utils): + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_user_for_saas({"given_name": "asdasd", "family_name": "sdfzsd"}, None) + assert str(excinfo.value) == "'email' not found in at least one of the user defs" + + +def test_create_initial_user_for_saas_no_given_name(user_utils): + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_user_for_saas({"email": "asda", "family_name": "sdfzsd"}, None) + assert str(excinfo.value) == "'given_name' not found in at least one of the user defs" + + +def test_create_initial_user_for_saas_no_family_name(user_utils): + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_user_for_saas({"email": "asda", "given_name": "asdasd"}, None) + assert str(excinfo.value) == "'family_name' not found in at least one of the user defs" + + +def test_create_initial_user_for_saas_unsupported_type(user_utils): + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_user_for_saas({"given_name": "asdasd", "family_name": "sdfzsd", "email": "asdasd"}, "whoknows") + assert str(excinfo.value) == "Unsupported user_type: whoknows" + +# Assisted by watsonx Code Assistant + + +@pytest.mark.parametrize("user_type, permissions, entitlement, is_workspace_admin, application_role, manage_security_groups", [ + ( + "PRIMARY", + {"systemAdmin": False, "userAdmin": True, "apikeyAdmin": False}, + {"application": "PREMIUM", "admin": "ADMIN_BASE", "alwaysReserveLicense": True}, + True, + "ADMIN", + ["MAXADMIN"] + ), + ( + "SECONDARY", + {"systemAdmin": False, "userAdmin": False, "apikeyAdmin": False}, + {"application": "BASE", "admin": "NONE", "alwaysReserveLicense": True}, + False, + "USER", + [] + ) +]) +def test_create_initial_user_for_saas( + user_type, permissions, entitlement, is_workspace_admin, application_role, manage_security_groups, + user_utils, requests_mock +): + user_utils.get_or_create_user = MagicMock() + user_utils.link_user_to_local_idp = MagicMock() + user_utils.add_user_to_workspace = MagicMock() + mas_workspace_application_ids = ["manage", "iot"] + user_utils.get_mas_applications_in_workspace = MagicMock(return_value=map(lambda x: {"id": x}, mas_workspace_application_ids)) + user_utils.await_mas_application_availability = MagicMock() + user_utils.set_user_application_permission = MagicMock() + user_utils.check_user_sync = MagicMock() + manage_api_key = "manage_api_key" # pragma: allowlist secret + user_utils.create_or_get_manage_api_key_for_user = MagicMock(return_value=manage_api_key) + user_utils.add_user_to_manage_group = MagicMock() + + user_email = "bill.bob@acme.com" + user_given_name = "billy" + user_family_name = "bobby" + user_id = user_email + username = user_email + display_name = f"{user_given_name} {user_family_name}" + + user_utils.create_initial_user_for_saas({ + "email": user_email, + "given_name": user_given_name, + "family_name": user_family_name + }, + user_type + ) + + user_utils.get_or_create_user.assert_called_once_with({ + "id": user_id, + "status": {"active": True}, + "username": username, + "owner": "local", + "emails": [ + { + "value": user_email, + "type": "Work", + "primary": True + } + ], + "displayName": display_name, + "issuer": "local", + "permissions": permissions, + "entitlement": entitlement, + "givenName": user_given_name, + "familyName": user_family_name + }) + user_utils.link_user_to_local_idp.assert_called_once_with(user_id, email_password=True) + user_utils.add_user_to_workspace.assert_called_once_with(user_id, is_workspace_admin=is_workspace_admin) + user_utils.await_mas_application_availability.assert_has_calls([call("manage"), call("iot")]) + user_utils.set_user_application_permission.assert_has_calls([ + call(user_id, "manage", "MANAGEUSER"), + call(user_id, "iot", application_role), + ]) + user_utils.check_user_sync.assert_has_calls([ + call(user_id, "manage"), + call(user_id, "iot") + ]) + + if len(manage_security_groups) > 0: + user_utils.create_or_get_manage_api_key_for_user.assert_called_once_with("MAXADMIN", temporary=True) + else: + user_utils.create_or_get_manage_api_key_for_user.assert_not_called() + + user_utils.add_user_to_manage_group.assert_has_calls( + map(lambda sg: call(user_id, sg, manage_api_key), manage_security_groups) + ) + + +def test_create_initial_users_for_saas_invalid_inputs(user_utils): + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_users_for_saas({}) + assert str(excinfo.value) == "expected top-level key 'users' not found" + + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_users_for_saas({"users": {}}) + assert str(excinfo.value) == "expected key 'users.primary' not found" + + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_users_for_saas({"users": {"primary": "nope"}}) + assert str(excinfo.value) == "'users.primary' is not a list" + + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_users_for_saas({"users": {"primary": []}}) + assert str(excinfo.value) == "expected key 'users.secondary' not found" + + with pytest.raises(Exception) as excinfo: + user_utils.create_initial_users_for_saas({"users": {"primary": [], "secondary": "nope"}}) + assert str(excinfo.value) == "'users.secondary' is not a list" + + +def test_create_initial_users_for_saas_no_users(user_utils): + assert user_utils.create_initial_users_for_saas({"users": {"primary": [], "secondary": []}}) == {"completed": [], "failed": []} + + +def test_create_initial_users_for_saas(user_utils): + + mas_workspace_application_ids = ["manage", "iot"] + user_utils.get_mas_applications_in_workspace = MagicMock(return_value=map(lambda x: {"id": x}, mas_workspace_application_ids)) + user_utils.await_mas_application_availability = MagicMock() + user_utils.create_initial_user_for_saas = MagicMock() + + def fail_for_users_b_and_e(user, user_type): + if user["email"] in ["b", "e"]: + raise Exception(f"{user['email']} should fail") + user_utils.create_initial_user_for_saas.side_effect = fail_for_users_b_and_e + + initial_users = { + "users": { + "primary": [ + {"email": "a"}, + {"email": "b"}, + {"email": "c"} + ], + "secondary": [ + {"email": "d"}, + {"email": "e"}, + {"email": "f"} + ] + } + } + + assert user_utils.create_initial_users_for_saas(initial_users) == { + "completed": [ + {"email": "a"}, + {"email": "c"}, + {"email": "d"}, + {"email": "f"}, + ], + "failed": [ + {"email": "b"}, + {"email": "e"}, + ] + } + + user_utils.await_mas_application_availability.assert_has_calls([call("manage"), call("iot")])