Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions src/mas/devops/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,63 @@ def crdExists(dynClient: DynamicClient, crdName: str) -> bool:
return False


def getCR(dynClient: DynamicClient, cr_api_version: str, cr_kind: str, cr_name: str, namespace: str = None) -> dict:
"""
Get a Custom Resource
"""

try:
crAPI = dynClient.resources.get(api_version=cr_api_version, kind=cr_kind)
if namespace:
cr = crAPI.get(name=cr_name, namespace=namespace)
else:
cr = crAPI.get(name=cr_name)
return cr
except NotFoundError:
logger.debug(f"CR {cr_name} of kind {cr_kind} does not exist in namespace {namespace}")

return {}


def getSecret(dynClient: DynamicClient, namespace: str, secret_name: str) -> dict:
"""
Get a Secret
"""
try:
secretAPI = dynClient.resources.get(api_version="v1", kind="Secret")
secret = secretAPI.get(name=secret_name, namespace=namespace)
logger.debug(f"Secret {secret_name} exists in namespace {namespace}")
return secret.to_dict()
except NotFoundError:
logger.debug(f"Secret {secret_name} does not exist in namespace {namespace}")
return {}


def apply_resource(dynClient: DynamicClient, resource_yaml: str, namespace: str):
"""
Apply a Kubernetes resource from its YAML definition.
If the resource already exists, it will be updated.
If it does not exist, it will be created.
"""
resource_dict = yaml.safe_load(resource_yaml)
kind = resource_dict['kind']
api_version = resource_dict['apiVersion']
metadata = resource_dict['metadata']
name = metadata['name']

try:
resource = dynClient.resources.get(api_version=api_version, kind=kind)
# Try to get the existing resource
resource.get(name=name, namespace=namespace)
# If found, update it
logger.debug(f"Updating existing {kind} '{name}' in namespace '{namespace}'")
resource.patch(body=resource_dict, namespace=namespace, name=name)
except NotFoundError:
# If not found, create it
logger.debug(f"Creating new {kind} '{name}' in namespace '{namespace}'")
resource.create(body=resource_dict, namespace=namespace)


def listInstances(dynClient: DynamicClient, apiVersion: str, kind: str) -> list:
"""
Get a list of instances of a particular CR on the cluster
Expand Down