From 6253bacb4679dc9a0ddbd23a4927c57edb35bc2a Mon Sep 17 00:00:00 2001 From: "klapitom@uk.ibm.com" <7372253+tomklapiscak@users.noreply.github.com> Date: Thu, 13 Feb 2025 16:57:50 +0000 Subject: [PATCH 1/8] [minor] WIP: SaaS Job Cleanup Logic --- bin/mas-devops-saas-job-cleaner | 38 ++++++++++++++++++++ src/mas/devops/saas/__init__.py | 9 +++++ src/mas/devops/saas/job_cleaner.py | 57 ++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+) create mode 100644 bin/mas-devops-saas-job-cleaner create mode 100644 src/mas/devops/saas/__init__.py create mode 100644 src/mas/devops/saas/job_cleaner.py diff --git a/bin/mas-devops-saas-job-cleaner b/bin/mas-devops-saas-job-cleaner new file mode 100644 index 00000000..32c83552 --- /dev/null +++ b/bin/mas-devops-saas-job-cleaner @@ -0,0 +1,38 @@ + +from kubernetes import client, config +from kubernetes.config.config_exception import ConfigException +import logging +import argparse +from mas.devops.saas import job_cleaner + +import urllib3 +urllib3.disable_warnings() + + +if __name__ == "__main__": + # Initialize the properties we need + parser = argparse.ArgumentParser() + + # Primary Options + parser.add_argument("--log-level", required=False, choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], default="WARNING") + parser.add_argument("--label", required=True, help="Kubernetes resource label used to identify Job groups and associated ConfigMap.") + + args, unknown = parser.parse_known_args() + + log_level = getattr(logging, args.log_level) + logging.basicConfig() + logging.getLogger('mas.devops.saas.job_cleaner').setLevel(level=log_level) + + try: + # Try to load in-cluster configuration + config.load_incluster_config() + print("Loaded in-cluster configuration") + except ConfigException: + # If that fails, fall back to kubeconfig file + config.load_kube_config() + print("Loaded kubeconfig file") + + job_cleaner.cleanup_jobs( + client.api_client.ApiClient(), + args.label + ) diff --git a/src/mas/devops/saas/__init__.py b/src/mas/devops/saas/__init__.py new file mode 100644 index 00000000..725fd6de --- /dev/null +++ b/src/mas/devops/saas/__init__.py @@ -0,0 +1,9 @@ +# ***************************************************************************** +# Copyright (c) 2024 IBM Corporation and other Contributors. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# which accompanies this distribution, and is available at +# http://www.eclipse.org/legal/epl-v10.html +# +# ***************************************************************************** diff --git a/src/mas/devops/saas/job_cleaner.py b/src/mas/devops/saas/job_cleaner.py new file mode 100644 index 00000000..73696878 --- /dev/null +++ b/src/mas/devops/saas/job_cleaner.py @@ -0,0 +1,57 @@ +# ***************************************************************************** +# Copyright (c) 2024 IBM Corporation and other Contributors. +# +# All rights reserved. 
This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# which accompanies this distribution, and is available at +# http://www.eclipse.org/legal/epl-v10.html +# +# ***************************************************************************** + +from kubernetes import client +import logging + +logger = logging.getLogger(__name__) + + +def cleanup_jobs(k8s_client: client.api_client.ApiClient, label: str): + core_v1_api = client.CoreV1Api(k8s_client) + batch_v1_api = client.BatchV1Api(k8s_client) + + cms = core_v1_api.list_config_map_for_all_namespaces(label_selector=label) + + for cm in cms.items: + cm_ns = cm.metadata.namespace + job_cleanup_group = cm.metadata.labels[label] + logger.info(f"{job_cleanup_group} in {cm_ns}") + try: + current_job_name = cm.data['current_job_name'] + + # get all Jobs in the same namespace as the configmap that have LABEL: job_cleanup_group + jobs_in_cleanup_group = batch_v1_api.list_namespaced_job(cm_ns, label_selector=f"{label}={job_cleanup_group}") + + # sanity checks, abort cleanup of this group if any of these fail + # - one of the jobs should be named current_job_name + # - all jobs names should have job_cleanup_group as a prefix + found_current_job = False + for job in jobs_in_cleanup_group.items: + job_name = job.metadata.name + + if job_name == current_job_name: + found_current_job = True + + if not job_name.startswith(job_cleanup_group): + raise Exception(f"Job name {job_name} has unexpected prefix") + + if found_current_job: + logger.info(f" Found current Job resource: {current_job_name}") + else: + raise Exception(f"Could not find current job {current_job_name}") + + for job in jobs_in_cleanup_group.items: + job_name = job.metadata.name + if job_name != current_job_name: + logger.info(f" Deleting old Job resource: {job_name}") + + except Exception as e: + logger.error(f"Skipping {job_cleanup_group} in {cm_ns}: {repr(e)}") From 83e8b53f73cc3ad10a092aed9092aa3173746728 Mon Sep 17 00:00:00 2001 From: "klapitom@uk.ibm.com" <7372253+tomklapiscak@users.noreply.github.com> Date: Thu, 13 Feb 2025 18:10:20 +0000 Subject: [PATCH 2/8] job cleaner updates --- bin/mas-devops-saas-job-cleaner | 12 ++++++++++++ src/mas/devops/saas/job_cleaner.py | 25 +++++-------------------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/bin/mas-devops-saas-job-cleaner b/bin/mas-devops-saas-job-cleaner index 32c83552..0ac82aab 100644 --- a/bin/mas-devops-saas-job-cleaner +++ b/bin/mas-devops-saas-job-cleaner @@ -1,3 +1,15 @@ +#!/usr/bin/env python3 + +# ***************************************************************************** +# Copyright (c) 2025 IBM Corporation and other Contributors. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# which accompanies this distribution, and is available at +# http://www.eclipse.org/legal/epl-v10.html +# +# ***************************************************************************** + from kubernetes import client, config from kubernetes.config.config_exception import ConfigException diff --git a/src/mas/devops/saas/job_cleaner.py b/src/mas/devops/saas/job_cleaner.py index 73696878..27b12635 100644 --- a/src/mas/devops/saas/job_cleaner.py +++ b/src/mas/devops/saas/job_cleaner.py @@ -1,5 +1,5 @@ # ***************************************************************************** -# Copyright (c) 2024 IBM Corporation and other Contributors. 
+# Copyright (c) 2025 IBM Corporation and other Contributors. # # All rights reserved. This program and the accompanying materials # are made available under the terms of the Eclipse Public License v1.0 @@ -23,35 +23,20 @@ def cleanup_jobs(k8s_client: client.api_client.ApiClient, label: str): for cm in cms.items: cm_ns = cm.metadata.namespace job_cleanup_group = cm.metadata.labels[label] + logger.info("") logger.info(f"{job_cleanup_group} in {cm_ns}") + logger.info("-------------------------------") try: current_job_name = cm.data['current_job_name'] + logger.info(f"Current Job Name: {current_job_name}") # get all Jobs in the same namespace as the configmap that have LABEL: job_cleanup_group jobs_in_cleanup_group = batch_v1_api.list_namespaced_job(cm_ns, label_selector=f"{label}={job_cleanup_group}") - # sanity checks, abort cleanup of this group if any of these fail - # - one of the jobs should be named current_job_name - # - all jobs names should have job_cleanup_group as a prefix - found_current_job = False - for job in jobs_in_cleanup_group.items: - job_name = job.metadata.name - - if job_name == current_job_name: - found_current_job = True - - if not job_name.startswith(job_cleanup_group): - raise Exception(f"Job name {job_name} has unexpected prefix") - - if found_current_job: - logger.info(f" Found current Job resource: {current_job_name}") - else: - raise Exception(f"Could not find current job {current_job_name}") - for job in jobs_in_cleanup_group.items: job_name = job.metadata.name if job_name != current_job_name: - logger.info(f" Deleting old Job resource: {job_name}") + logger.info(f"Deleting old Job resource: {job_name}") except Exception as e: logger.error(f"Skipping {job_cleanup_group} in {cm_ns}: {repr(e)}") From 0438c26bfee0f4b061707aac84cd6acc51529202 Mon Sep 17 00:00:00 2001 From: "klapitom@uk.ibm.com" <7372253+tomklapiscak@users.noreply.github.com> Date: Fri, 14 Feb 2025 15:39:46 +0000 Subject: [PATCH 3/8] remove need for configmap (use job creation dates instead). support pagination. limit memory usage. 
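
The continue-token paging introduced here is the standard Kubernetes list/continue protocol; a minimal, illustrative sketch of the idea follows (it uses the same client calls as the code in this patch; the label value is only an example, taken from the unit tests later in the series, and is supplied via --label in practice):

    from kubernetes import client, config

    config.load_kube_config()  # or config.load_incluster_config() when running in a pod
    batch_v1_api = client.BatchV1Api()

    label = "mas.ibm.com/job-cleanup-group"  # example label
    _continue = None
    while True:
        # fetch one page of Jobs carrying the cleanup label
        jobs_page = batch_v1_api.list_job_for_all_namespaces(
            label_selector=label,
            limit=100,
            _continue=_continue,
        )
        for job in jobs_page.items:
            # record only (namespace, group id) rather than whole Job objects;
            # this is what keeps memory usage bounded
            print(job.metadata.namespace, job.metadata.labels[label])
        _continue = jobs_page.metadata._continue
        if _continue is None:
            break
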
--- src/mas/devops/saas/job_cleaner.py | 117 ++++++++++++++++++++++++----- 1 file changed, 97 insertions(+), 20 deletions(-) diff --git a/src/mas/devops/saas/job_cleaner.py b/src/mas/devops/saas/job_cleaner.py index 27b12635..c08215e8 100644 --- a/src/mas/devops/saas/job_cleaner.py +++ b/src/mas/devops/saas/job_cleaner.py @@ -10,33 +10,110 @@ from kubernetes import client import logging +import itertools logger = logging.getLogger(__name__) +# TODO: dry-run mode that just logs (does not delete anything) -def cleanup_jobs(k8s_client: client.api_client.ApiClient, label: str): - core_v1_api = client.CoreV1Api(k8s_client) + +# TODO: test case: four jobs with same cleanup_group id but different namespaces + + +def job_details(job, label): + name = job.metadata.name + namespace = job.metadata.namespace + creation_timestamp = job.metadata.creation_timestamp + cleanup_group = job.metadata.labels[label] + + return f"{name} {namespace} {cleanup_group} {creation_timestamp}" + + +def cleanup_jobs(k8s_client: client.api_client.ApiClient, label: str, limit: int = 100): batch_v1_api = client.BatchV1Api(k8s_client) - cms = core_v1_api.list_config_map_for_all_namespaces(label_selector=label) + # we need to be sure we have all Jobs loaded up front (we can't do the cleanup page by page) + # so a page boundary may cut a cleanup_group in half, which would cause inconsistent behaviour + + # set of tuples (namespace, cleanup_group_id) + cleanup_groups = set() + _continue = None + while True: + + # to avoid loading all jobs into memory at once (there may be a LOT), + # do an initial query to look for all unique group_ids in the cluster + # later, for each group_id, another query to find all jobs belonging to that group + # We're trading cpu time / network io for memory here.. + + jobs_page = batch_v1_api.list_job_for_all_namespaces( + label_selector=label, + limit=limit, + _continue=_continue + ) + _continue = jobs_page.metadata._continue + + for job in jobs_page.items: + cleanup_groups.add((job.metadata.namespace, job.metadata.labels[label])) + + if _continue is None: + break + + # NOTE: it's possible for things to change in the cluster while this process is ongoing + # e.g.: + # - a new sync cycle creates a newer version of Job; not a problem, just means an orphaned job will stick around for one extra cycle + # - a new cleanup group appears; not a problem, the new cleanup group will be handled in the next cycle + # - ... other race conditions? + # this process is eventually consistent + + # Now we know all the cleanup group ids in the cluster + # we can deal with each one separately; we only have to load the job resources for that particular group into memory at once + # (we have to load into memory in order to guarantee the jobs are sorted by creation_date + # if we could (can?) 
rely on K8S to always return them in this order then we could evaluate each page of Jobs lazily + for (namespace, cleanup_group_id) in cleanup_groups: + + print() + print() + print(f"{namespace} / {cleanup_group_id}") + print("============================") + + # page through all jobs in this namespace and group, and chain together all the resulting iterators + job_items_iters = [] + while True: + jobs_page = batch_v1_api.list_namespaced_job( + namespace, + label_selector=f"{label}={cleanup_group_id}", + limit=limit, + _continue=_continue + ) + job_items_iters.append(jobs_page.items) + _continue = jobs_page.metadata._continue + if _continue is None: + break + + jobs = itertools.chain(*job_items_iters) - for cm in cms.items: - cm_ns = cm.metadata.namespace - job_cleanup_group = cm.metadata.labels[label] - logger.info("") - logger.info(f"{job_cleanup_group} in {cm_ns}") - logger.info("-------------------------------") - try: - current_job_name = cm.data['current_job_name'] - logger.info(f"Current Job Name: {current_job_name}") + # sort the jobs by creation_timestamp + jobs_sorted = iter(sorted( + jobs, + key=lambda group_job: group_job.metadata.creation_timestamp, + reverse=True + )) - # get all Jobs in the same namespace as the configmap that have LABEL: job_cleanup_group - jobs_in_cleanup_group = batch_v1_api.list_namespaced_job(cm_ns, label_selector=f"{label}={job_cleanup_group}") + # inspect the first Job - i.e. the one created most recently + # whatever happens we definitely will not be deleting this job (in this cycle, at least) + most_recent_job = next(jobs_sorted) + print() + print("Most recent Job") + print("------") + print(job_details(most_recent_job, label)) - for job in jobs_in_cleanup_group.items: - job_name = job.metadata.name - if job_name != current_job_name: - logger.info(f"Deleting old Job resource: {job_name}") + # TODO: prune prior jobs even if most recent job has failed? + # or leave them be as they may provide valuable debugging info? - except Exception as e: - logger.error(f"Skipping {job_cleanup_group} in {cm_ns}: {repr(e)}") + print() + print("Old Jobs to be pruned") + print("------") + for job in jobs_sorted: + # prune prior jobs even if most recent job has failed? + # or leave them be as they may provide valuable debugging info? 
+ print(job_details(job, label)) From c7f90c3dc10aa4e5ad7196081f2449eeea68e4e1 Mon Sep 17 00:00:00 2001 From: "klapitom@uk.ibm.com" <7372253+tomklapiscak@users.noreply.github.com> Date: Fri, 14 Feb 2025 18:06:54 +0000 Subject: [PATCH 4/8] tidy up, improve, add features --- bin/mas-devops-saas-job-cleaner | 46 ++++++-- src/mas/devops/saas/job_cleaner.py | 179 +++++++++++++++-------------- 2 files changed, 130 insertions(+), 95 deletions(-) diff --git a/bin/mas-devops-saas-job-cleaner b/bin/mas-devops-saas-job-cleaner index 0ac82aab..549ce0da 100644 --- a/bin/mas-devops-saas-job-cleaner +++ b/bin/mas-devops-saas-job-cleaner @@ -15,7 +15,7 @@ from kubernetes import client, config from kubernetes.config.config_exception import ConfigException import logging import argparse -from mas.devops.saas import job_cleaner +from mas.devops.saas.job_cleaner import JobCleaner import urllib3 urllib3.disable_warnings() @@ -27,24 +27,50 @@ if __name__ == "__main__": # Primary Options parser.add_argument("--log-level", required=False, choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], default="WARNING") - parser.add_argument("--label", required=True, help="Kubernetes resource label used to identify Job groups and associated ConfigMap.") + parser.add_argument("--label", required=True, help="Kubernetes resource label used to identify Job groups to cleanup") + parser.add_argument("--limit", required=False, help="Limit page sizes fetched from K8S API. Larger values will use more memory but less cpu time / network IO.", default=100) + parser.add_argument("--dry-run", required=False, help="When specified, nothing will actually be deleted from the cluster", action="store_true") args, unknown = parser.parse_known_args() log_level = getattr(logging, args.log_level) - logging.basicConfig() - logging.getLogger('mas.devops.saas.job_cleaner').setLevel(level=log_level) + + logger = logging.getLogger() + logger.setLevel(log_level) + + ch = logging.StreamHandler() + ch.setLevel(log_level) + chFormatter = logging.Formatter( + "%(asctime)-25s %(name)-50s [%(threadName)s] %(levelname)-8s %(message)s" + ) + ch.setFormatter(chFormatter) + logger.addHandler(ch) + + + limit = args.limit + label = args.label + dry_run = args.dry_run + + logger.info("Configuration:") + logger.info("--------------") + logger.info(f"log_level: {log_level}") + logger.info(f"label: {label}") + logger.info(f"limit: {limit}") + logger.info(f"dry_run: {dry_run}") + + logger.info("") try: # Try to load in-cluster configuration config.load_incluster_config() - print("Loaded in-cluster configuration") + logger.info("Loaded in-cluster configuration") except ConfigException: # If that fails, fall back to kubeconfig file config.load_kube_config() - print("Loaded kubeconfig file") + logger.info("Loaded kubeconfig file") - job_cleaner.cleanup_jobs( - client.api_client.ApiClient(), - args.label - ) + logger.info("") + + job_cleaner = JobCleaner(client.api_client.ApiClient()) + + job_cleaner.cleanup_jobs(args.label, args.limit, dry_run) diff --git a/src/mas/devops/saas/job_cleaner.py b/src/mas/devops/saas/job_cleaner.py index c08215e8..db9a45af 100644 --- a/src/mas/devops/saas/job_cleaner.py +++ b/src/mas/devops/saas/job_cleaner.py @@ -12,108 +12,117 @@ import logging import itertools -logger = logging.getLogger(__name__) - # TODO: dry-run mode that just logs (does not delete anything) - - # TODO: test case: four jobs with same cleanup_group id but different namespaces +# TODO: behaviours that diverge from default ArgoCD behaviour (if auto_delete: true were 
set), but may be useful?: +# - support option to only purge jobs >n iterations old +# - avoid purging jobs that are still running +# - prune prior jobs even if most recent job has failed, or leave them be as they may provide valuable debugging info +# - save details / logs from purged jobs (where? to a PV?) -def job_details(job, label): - name = job.metadata.name - namespace = job.metadata.namespace - creation_timestamp = job.metadata.creation_timestamp - cleanup_group = job.metadata.labels[label] - - return f"{name} {namespace} {cleanup_group} {creation_timestamp}" - - -def cleanup_jobs(k8s_client: client.api_client.ApiClient, label: str, limit: int = 100): - batch_v1_api = client.BatchV1Api(k8s_client) - - # we need to be sure we have all Jobs loaded up front (we can't do the cleanup page by page) - # so a page boundary may cut a cleanup_group in half, which would cause inconsistent behaviour - # set of tuples (namespace, cleanup_group_id) - cleanup_groups = set() - _continue = None - while True: +class JobCleaner: + def __init__(self, k8s_client: client.api_client.ApiClient): + self.k8s_client = k8s_client + self.batch_v1_api = client.BatchV1Api(self.k8s_client) + self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") - # to avoid loading all jobs into memory at once (there may be a LOT), - # do an initial query to look for all unique group_ids in the cluster - # later, for each group_id, another query to find all jobs belonging to that group - # We're trading cpu time / network io for memory here.. - - jobs_page = batch_v1_api.list_job_for_all_namespaces( - label_selector=label, - limit=limit, - _continue=_continue - ) - _continue = jobs_page.metadata._continue - - for job in jobs_page.items: - cleanup_groups.add((job.metadata.namespace, job.metadata.labels[label])) - - if _continue is None: - break + def _get_all_cleanup_groups(self, label: str, limit: int): + # set of tuples (namespace, cleanup_group_id) + cleanup_groups = set() + _continue = None + while True: - # NOTE: it's possible for things to change in the cluster while this process is ongoing - # e.g.: - # - a new sync cycle creates a newer version of Job; not a problem, just means an orphaned job will stick around for one extra cycle - # - a new cleanup group appears; not a problem, the new cleanup group will be handled in the next cycle - # - ... other race conditions? - # this process is eventually consistent + jobs_page = self.batch_v1_api.list_job_for_all_namespaces( + label_selector=label, + limit=limit, + _continue=_continue + ) + _continue = jobs_page.metadata._continue - # Now we know all the cleanup group ids in the cluster - # we can deal with each one separately; we only have to load the job resources for that particular group into memory at once - # (we have to load into memory in order to guarantee the jobs are sorted by creation_date - # if we could (can?) 
rely on K8S to always return them in this order then we could evaluate each page of Jobs lazily - for (namespace, cleanup_group_id) in cleanup_groups: + for job in jobs_page.items: + cleanup_groups.add((job.metadata.namespace, job.metadata.labels[label])) - print() - print() - print(f"{namespace} / {cleanup_group_id}") - print("============================") + if _continue is None: + return cleanup_groups + def _get_all_jobs(self, namespace: str, group_id: str, label: str, limit: int): # page through all jobs in this namespace and group, and chain together all the resulting iterators job_items_iters = [] + _continue = None while True: - jobs_page = batch_v1_api.list_namespaced_job( + jobs_page = self.batch_v1_api.list_namespaced_job( namespace, - label_selector=f"{label}={cleanup_group_id}", + label_selector=f"{label}={group_id}", limit=limit, _continue=_continue ) job_items_iters.append(jobs_page.items) _continue = jobs_page.metadata._continue if _continue is None: - break - - jobs = itertools.chain(*job_items_iters) - - # sort the jobs by creation_timestamp - jobs_sorted = iter(sorted( - jobs, - key=lambda group_job: group_job.metadata.creation_timestamp, - reverse=True - )) - - # inspect the first Job - i.e. the one created most recently - # whatever happens we definitely will not be deleting this job (in this cycle, at least) - most_recent_job = next(jobs_sorted) - print() - print("Most recent Job") - print("------") - print(job_details(most_recent_job, label)) - - # TODO: prune prior jobs even if most recent job has failed? - # or leave them be as they may provide valuable debugging info? - - print() - print("Old Jobs to be pruned") - print("------") - for job in jobs_sorted: - # prune prior jobs even if most recent job has failed? - # or leave them be as they may provide valuable debugging info? - print(job_details(job, label)) + return itertools.chain(*job_items_iters) + + def cleanup_jobs(self, label: str, limit: int, dry_run: bool): + dry_run_param = None + if dry_run: + dry_run_param = "All" + + # We want to avoid loading all Jobs into memory at once (there may be a lot) + # We cannot lazily page through Job resources in case a page boundary lands half way through a group + # Instead, we'll trade cpu time / network IO to save memory by: + # - Performing an initial query to load all unique (namespace, group IDs) into memory + + cleanup_groups = self._get_all_cleanup_groups(label, limit) + + self.logger.info(f"Found {len(cleanup_groups)} unique (namespace, cleanup group ID) pairs, processing ...") + + # NOTE: it's possible for things to change in the cluster while this process is ongoing + # e.g.: + # - a new sync cycle creates a newer version of Job; not a problem, just means an orphaned job will stick around for one extra cycle + # - a new cleanup group appears; not a problem, the new cleanup group will be handled in the next cycle + # - ... other race conditions? 
+ # this process is eventually consistent + + # Now we know all the cleanup group ids in the cluster + # we can deal with each one separately; we only have to load the job resources for that particular group into memory at once + # (we have to load into memory in order to guarantee the jobs are sorted by creation_date) + i = 0 + for (namespace, group_id) in cleanup_groups: + + self.logger.info("") + self.logger.info(f"{i}) {group_id} {namespace}") + + jobs = self._get_all_jobs(namespace, group_id, label, limit) + + # sort the jobs by creation_timestamp + jobs_sorted = sorted( + jobs, + key=lambda group_job: group_job.metadata.creation_timestamp, + reverse=True + ) + + # TODO: sanity checks? + # - all jobs start with same prefix (everything before final `-`)? + + if len(jobs_sorted) == 0: + self.logger.warning("No Jobs found in group, must have been deleted by some other process, skipping") + continue + else: + first = True + for job in jobs_sorted: + name = job.metadata.name + creation_timestamp = str(job.metadata.creation_timestamp) + if first: + self.logger.info("{0:<6} {1:<65} {2:<65}".format("SKIP", name, creation_timestamp)) + first = False + else: + try: + self.batch_v1_api.delete_namespaced_job(name, namespace, dry_run=dry_run_param, propagation_policy="Foreground") + result = "SUCCESS" + except client.rest.ApiException as e: + result = f"FAILED: {e}" + + self.logger.info("{0:<6} {1:<65} {2:<65} {3}".format("PURGE", name, creation_timestamp, result)) + + i = i + 1 From 48e2a159843d5d824d05dfd96e3af939b57c856c Mon Sep 17 00:00:00 2001 From: "klapitom@uk.ibm.com" <7372253+tomklapiscak@users.noreply.github.com> Date: Tue, 18 Feb 2025 15:13:42 +0000 Subject: [PATCH 5/8] add script --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a0643b2f..520b4673 100644 --- a/setup.py +++ b/setup.py @@ -84,6 +84,7 @@ def get_version(rel_path): 'Topic :: Software Development :: Libraries :: Python Modules' ], scripts=[ - 'bin/mas-devops-db2-validate-config' + 'bin/mas-devops-db2-validate-config', + 'bin/mas-devops-saas-job-cleaner' ] ) From 9fcde9d6373b7c17f0a427cddd126f981a241b62 Mon Sep 17 00:00:00 2001 From: "klapitom@uk.ibm.com" <7372253+tomklapiscak@users.noreply.github.com> Date: Wed, 19 Feb 2025 18:31:36 +0000 Subject: [PATCH 6/8] WIP: unit tests for JobCleaner --- src/mas/devops/saas/job_cleaner.py | 7 +- test/src/saas/test_job_cleaner.py | 101 +++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 test/src/saas/test_job_cleaner.py diff --git a/src/mas/devops/saas/job_cleaner.py b/src/mas/devops/saas/job_cleaner.py index db9a45af..69806535 100644 --- a/src/mas/devops/saas/job_cleaner.py +++ b/src/mas/devops/saas/job_cleaner.py @@ -12,13 +12,11 @@ import logging import itertools -# TODO: dry-run mode that just logs (does not delete anything) # TODO: test case: four jobs with same cleanup_group id but different namespaces -# TODO: behaviours that diverge from default ArgoCD behaviour (if auto_delete: true were set), but may be useful?: +# Possible future features: behaviours that diverge from default ArgoCD behaviour (if auto_delete: true were set), but may be useful?: # - support option to only purge jobs >n iterations old # - avoid purging jobs that are still running -# - prune prior jobs even if most recent job has failed, or leave them be as they may provide valuable debugging info # - save details / logs from purged jobs (where? to a PV?) 
@@ -102,9 +100,6 @@ def cleanup_jobs(self, label: str, limit: int, dry_run: bool): reverse=True ) - # TODO: sanity checks? - # - all jobs start with same prefix (everything before final `-`)? - if len(jobs_sorted) == 0: self.logger.warning("No Jobs found in group, must have been deleted by some other process, skipping") continue diff --git a/test/src/saas/test_job_cleaner.py b/test/src/saas/test_job_cleaner.py new file mode 100644 index 00000000..059608a8 --- /dev/null +++ b/test/src/saas/test_job_cleaner.py @@ -0,0 +1,101 @@ +# ***************************************************************************** +# Copyright (c) 2024 IBM Corporation and other Contributors. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Eclipse Public License v1.0 +# which accompanies this distribution, and is available at +# http://www.eclipse.org/legal/epl-v10.html +# +# ***************************************************************************** +from unittest.mock import patch, Mock +from mas.devops.saas.job_cleaner import JobCleaner + + +def mock_job(name, namespace, labels, creation_timestamp): + + mock = Mock() + mock.metadata = Mock() + mock.metadata.name = name + mock.metadata.namespace = namespace + mock.metadata.labels = labels + mock.metadata.creation_timestamp = creation_timestamp + + return mock + + +jobs_in_cluster = [ + mock_job("job-xa-1", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 1), + mock_job("job-xa-2", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 2), + mock_job("job-xb-1", "x", {"mas.ibm.com/job-cleanup-group": "b"}, 1), + mock_job("job-ya-1", "y", {"mas.ibm.com/job-cleanup-group": "a"}, 1), + mock_job("job-yothera-1", "y", {"otherlabel": "a"}, 1), + mock_job("job-zothera-1", "z", {"otherlabel": "a"}, 1) +] + + +def list_jobs(namespace, label_selector, limit, _continue): + if _continue is None: + _continue = 0 + + label_selector_kv = label_selector.split("=") + + def filter_func(job): + if not label_selector_kv[0] in job.metadata.labels: + return False + if len(label_selector_kv) == 2 and not job.metadata.labels[label_selector_kv[0]] == label_selector_kv[1]: + return False + if namespace is not None and job.metadata.namespace != namespace: + return False + return True + + filtered_jobs = list(filter(filter_func, jobs_in_cluster)) + + jobs_page = filtered_jobs[_continue:_continue + limit] + + if len(jobs_page) == 0: + _continue = None + else: + _continue = _continue + limit + + return Mock( + items=jobs_page, + metadata=Mock( + _continue=_continue + ) + ) + + +def list_job_for_all_namespaces(label_selector, limit, _continue): + return list_jobs(None, label_selector, limit, _continue) + + +def list_namespaced_job(namespace, label_selector, limit, _continue): + return list_jobs(namespace, label_selector, limit, _continue) + + +@patch("kubernetes.client.BatchV1Api") +def test_get_all_cleanup_groups(mock_batch_v1_api): + mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces + jc = JobCleaner(None) + for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]: + assert jc._get_all_cleanup_groups("mas.ibm.com/job-cleanup-group", limit) == {('x', 'a'), ('x', 'b'), ('y', 'a')} + + +@patch("kubernetes.client.BatchV1Api") +def test_get_all_jobs(mock_batch_v1_api): + mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job + jc = JobCleaner(None) + for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]: + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "a", 
"mas.ibm.com/job-cleanup-group", limit))) == ["job-xa-1", "job-xa-2"] + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "b", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xb-1"] + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "a", "mas.ibm.com/job-cleanup-group", limit))) == ["job-ya-1"] + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "b", "mas.ibm.com/job-cleanup-group", limit))) == [] + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "a", "otherlabel", limit))) == ["job-yothera-1"] + +# TODO +# @patch("kubernetes.client.BatchV1Api") +# def test_cleanup_jobs(mock_batch_v1_api): +# mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces +# mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job +# jc = JobCleaner(None) +# jc.cleanup_jobs() From 51a31d8100c2f6121a2497b6994e604ca8ed9cf2 Mon Sep 17 00:00:00 2001 From: "klapitom@uk.ibm.com" <7372253+tomklapiscak@users.noreply.github.com> Date: Thu, 20 Feb 2025 17:04:24 +0000 Subject: [PATCH 7/8] unit tests for JobCleaner --- src/mas/devops/saas/job_cleaner.py | 2 -- test/src/saas/test_job_cleaner.py | 55 +++++++++++++++++++++++------- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/mas/devops/saas/job_cleaner.py b/src/mas/devops/saas/job_cleaner.py index 69806535..43a539b3 100644 --- a/src/mas/devops/saas/job_cleaner.py +++ b/src/mas/devops/saas/job_cleaner.py @@ -12,8 +12,6 @@ import logging import itertools -# TODO: test case: four jobs with same cleanup_group id but different namespaces - # Possible future features: behaviours that diverge from default ArgoCD behaviour (if auto_delete: true were set), but may be useful?: # - support option to only purge jobs >n iterations old # - avoid purging jobs that are still running diff --git a/test/src/saas/test_job_cleaner.py b/test/src/saas/test_job_cleaner.py index 059608a8..07729681 100644 --- a/test/src/saas/test_job_cleaner.py +++ b/test/src/saas/test_job_cleaner.py @@ -7,7 +7,7 @@ # http://www.eclipse.org/legal/epl-v10.html # # ***************************************************************************** -from unittest.mock import patch, Mock +from unittest.mock import patch, Mock, call from mas.devops.saas.job_cleaner import JobCleaner @@ -26,8 +26,16 @@ def mock_job(name, namespace, labels, creation_timestamp): jobs_in_cluster = [ mock_job("job-xa-1", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 1), mock_job("job-xa-2", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 2), + mock_job("job-xa-3", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 3), + mock_job("job-xb-1", "x", {"mas.ibm.com/job-cleanup-group": "b"}, 1), + mock_job("job-xb-2", "x", {"mas.ibm.com/job-cleanup-group": "b"}, 2), + + mock_job("job-xc-1", "x", {"mas.ibm.com/job-cleanup-group": "c"}, 2), + + mock_job("job-ya-2", "y", {"mas.ibm.com/job-cleanup-group": "a"}, 2), mock_job("job-ya-1", "y", {"mas.ibm.com/job-cleanup-group": "a"}, 1), + mock_job("job-yothera-1", "y", {"otherlabel": "a"}, 1), mock_job("job-zothera-1", "z", {"otherlabel": "a"}, 1) ] @@ -78,7 +86,7 @@ def test_get_all_cleanup_groups(mock_batch_v1_api): mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces jc = JobCleaner(None) for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]: - assert jc._get_all_cleanup_groups("mas.ibm.com/job-cleanup-group", limit) == {('x', 'a'), ('x', 'b'), ('y', 'a')} + assert 
jc._get_all_cleanup_groups("mas.ibm.com/job-cleanup-group", limit) == {('x', 'a'), ('x', 'b'), ('x', 'c'), ('y', 'a')} @patch("kubernetes.client.BatchV1Api") @@ -86,16 +94,39 @@ def test_get_all_jobs(mock_batch_v1_api): mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job jc = JobCleaner(None) for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]: - assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "a", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xa-1", "job-xa-2"] - assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "b", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xb-1"] - assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "a", "mas.ibm.com/job-cleanup-group", limit))) == ["job-ya-1"] + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "a", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xa-1", "job-xa-2", "job-xa-3"] + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "b", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xb-1", "job-xb-2"] + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "c", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xc-1"] + assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "a", "mas.ibm.com/job-cleanup-group", limit))) == ["job-ya-2", "job-ya-1"] assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "b", "mas.ibm.com/job-cleanup-group", limit))) == [] assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "a", "otherlabel", limit))) == ["job-yothera-1"] -# TODO -# @patch("kubernetes.client.BatchV1Api") -# def test_cleanup_jobs(mock_batch_v1_api): -# mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces -# mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job -# jc = JobCleaner(None) -# jc.cleanup_jobs() + +@patch("kubernetes.client.BatchV1Api") +def test_cleanup_jobs(mock_batch_v1_api): + mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces + mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job + + jc = JobCleaner(None) + for dry_run in [False, True]: + dry_run_param = None + if dry_run: + dry_run_param = "All" + + expected_calls = [ + call('job-ya-1', 'y', dry_run=dry_run_param, propagation_policy='Foreground'), + call('job-xa-2', 'x', dry_run=dry_run_param, propagation_policy='Foreground'), + call('job-xa-1', 'x', dry_run=dry_run_param, propagation_policy='Foreground'), + call('job-xb-1', 'x', dry_run=dry_run_param, propagation_policy='Foreground'), + ] + + for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]: + mock_batch_v1_api.return_value.delete_namespaced_job.reset_mock() + jc.cleanup_jobs("mas.ibm.com/job-cleanup-group", 3, dry_run) + + mock_batch_v1_api.return_value.delete_namespaced_job.assert_has_calls( + expected_calls, + any_order=True + ) + + assert mock_batch_v1_api.return_value.delete_namespaced_job.call_count == len(expected_calls) From 857daa656d7b8433fc4bcf2dc69869bd23783b39 Mon Sep 17 00:00:00 2001 From: "klapitom@uk.ibm.com" <7372253+tomklapiscak@users.noreply.github.com> Date: Mon, 14 Apr 2025 12:06:45 +0100 Subject: [PATCH 8/8] copyright --- .gitignore | 1 + src/mas/devops/saas/__init__.py | 2 +- test/src/saas/test_job_cleaner.py | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index ed45b735..688c5d28 100644 --- a/.gitignore +++ b/.gitignore @@ 
-14,6 +14,7 @@ README.rst # Environments .venv/ venv/ +.venv39 # Other kubectl.exe diff --git a/src/mas/devops/saas/__init__.py b/src/mas/devops/saas/__init__.py index 725fd6de..494ce588 100644 --- a/src/mas/devops/saas/__init__.py +++ b/src/mas/devops/saas/__init__.py @@ -1,5 +1,5 @@ # ***************************************************************************** -# Copyright (c) 2024 IBM Corporation and other Contributors. +# Copyright (c) 2025 IBM Corporation and other Contributors. # # All rights reserved. This program and the accompanying materials # are made available under the terms of the Eclipse Public License v1.0 diff --git a/test/src/saas/test_job_cleaner.py b/test/src/saas/test_job_cleaner.py index 07729681..5305a2b8 100644 --- a/test/src/saas/test_job_cleaner.py +++ b/test/src/saas/test_job_cleaner.py @@ -1,5 +1,5 @@ # ***************************************************************************** -# Copyright (c) 2024 IBM Corporation and other Contributors. +# Copyright (c) 2025 IBM Corporation and other Contributors. # # All rights reserved. This program and the accompanying materials # are made available under the terms of the Eclipse Public License v1.0
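
With the series applied, the cleaner is exposed as the mas-devops-saas-job-cleaner script (registered in setup.py) and can also be driven programmatically. A minimal sketch, assuming a reachable cluster context; the label value is the example used in the unit tests, and dry-run is shown so nothing is actually deleted:

    from kubernetes import client, config
    from mas.devops.saas.job_cleaner import JobCleaner

    config.load_kube_config()  # the script itself also supports in-cluster configuration

    cleaner = JobCleaner(client.api_client.ApiClient())
    # dry_run=True logs which Jobs would be purged without deleting anything
    cleaner.cleanup_jobs("mas.ibm.com/job-cleanup-group", limit=100, dry_run=True)
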