diff --git a/src/mas/devops/ocp.py b/src/mas/devops/ocp.py index 4219c5cc..50d24530 100644 --- a/src/mas/devops/ocp.py +++ b/src/mas/devops/ocp.py @@ -243,6 +243,63 @@ def crdExists(dynClient: DynamicClient, crdName: str) -> bool: return False +def getCR(dynClient: DynamicClient, cr_api_version: str, cr_kind: str, cr_name: str, namespace: str = None) -> dict: + """ + Get a Custom Resource + """ + + try: + crAPI = dynClient.resources.get(api_version=cr_api_version, kind=cr_kind) + if namespace: + cr = crAPI.get(name=cr_name, namespace=namespace) + else: + cr = crAPI.get(name=cr_name) + return cr + except NotFoundError: + logger.debug(f"CR {cr_name} of kind {cr_kind} does not exist in namespace {namespace}") + + return {} + + +def getSecret(dynClient: DynamicClient, namespace: str, secret_name: str) -> dict: + """ + Get a Secret + """ + try: + secretAPI = dynClient.resources.get(api_version="v1", kind="Secret") + secret = secretAPI.get(name=secret_name, namespace=namespace) + logger.debug(f"Secret {secret_name} exists in namespace {namespace}") + return secret.to_dict() + except NotFoundError: + logger.debug(f"Secret {secret_name} does not exist in namespace {namespace}") + return {} + + +def apply_resource(dynClient: DynamicClient, resource_yaml: str, namespace: str): + """ + Apply a Kubernetes resource from its YAML definition. + If the resource already exists, it will be updated. + If it does not exist, it will be created. + """ + resource_dict = yaml.safe_load(resource_yaml) + kind = resource_dict['kind'] + api_version = resource_dict['apiVersion'] + metadata = resource_dict['metadata'] + name = metadata['name'] + + try: + resource = dynClient.resources.get(api_version=api_version, kind=kind) + # Try to get the existing resource + resource.get(name=name, namespace=namespace) + # If found, update it + logger.debug(f"Updating existing {kind} '{name}' in namespace '{namespace}'") + resource.patch(body=resource_dict, namespace=namespace, name=name) + except NotFoundError: + # If not found, create it + logger.debug(f"Creating new {kind} '{name}' in namespace '{namespace}'") + resource.create(body=resource_dict, namespace=namespace) + + def listInstances(dynClient: DynamicClient, apiVersion: str, kind: str) -> list: """ Get a list of instances of a particular CR on the cluster