diff --git a/.gitignore b/.gitignore
index ed45b735..688c5d28 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,6 +14,7 @@ README.rst
 # Environments
 .venv/
 venv/
+.venv39
 
 # Other
 kubectl.exe
diff --git a/bin/mas-devops-saas-job-cleaner b/bin/mas-devops-saas-job-cleaner
new file mode 100644
index 00000000..549ce0da
--- /dev/null
+++ b/bin/mas-devops-saas-job-cleaner
@@ -0,0 +1,76 @@
+#!/usr/bin/env python3
+
+# *****************************************************************************
+# Copyright (c) 2025 IBM Corporation and other Contributors.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# *****************************************************************************
+
+
+from kubernetes import client, config
+from kubernetes.config.config_exception import ConfigException
+import logging
+import argparse
+from mas.devops.saas.job_cleaner import JobCleaner
+
+import urllib3
+urllib3.disable_warnings()
+
+
+if __name__ == "__main__":
+    # Initialize the properties we need
+    parser = argparse.ArgumentParser()
+
+    # Primary Options
+    parser.add_argument("--log-level", required=False, choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], default="WARNING")
+    parser.add_argument("--label", required=True, help="Kubernetes resource label used to identify Job groups to clean up")
+    parser.add_argument("--limit", required=False, type=int, help="Limit page sizes fetched from the K8S API. Larger values will use more memory but less CPU time / network IO.", default=100)
+    parser.add_argument("--dry-run", required=False, help="When specified, nothing will actually be deleted from the cluster", action="store_true")
+
+    args, unknown = parser.parse_known_args()
+
+    log_level = getattr(logging, args.log_level)
+
+    logger = logging.getLogger()
+    logger.setLevel(log_level)
+
+    ch = logging.StreamHandler()
+    ch.setLevel(log_level)
+    chFormatter = logging.Formatter(
+        "%(asctime)-25s %(name)-50s [%(threadName)s] %(levelname)-8s %(message)s"
+    )
+    ch.setFormatter(chFormatter)
+    logger.addHandler(ch)
+
+
+    limit = args.limit
+    label = args.label
+    dry_run = args.dry_run
+
+    logger.info("Configuration:")
+    logger.info("--------------")
+    logger.info(f"log_level: {log_level}")
+    logger.info(f"label: {label}")
+    logger.info(f"limit: {limit}")
+    logger.info(f"dry_run: {dry_run}")
+
+    logger.info("")
+
+    try:
+        # Try to load in-cluster configuration
+        config.load_incluster_config()
+        logger.info("Loaded in-cluster configuration")
+    except ConfigException:
+        # If that fails, fall back to kubeconfig file
+        config.load_kube_config()
+        logger.info("Loaded kubeconfig file")
+
+    logger.info("")
+
+    job_cleaner = JobCleaner(client.api_client.ApiClient())
+
+    job_cleaner.cleanup_jobs(label, limit, dry_run)
diff --git a/setup.py b/setup.py
index a0643b2f..520b4673 100644
--- a/setup.py
+++ b/setup.py
@@ -84,6 +84,7 @@ def get_version(rel_path):
         'Topic :: Software Development :: Libraries :: Python Modules'
     ],
     scripts=[
-        'bin/mas-devops-db2-validate-config'
+        'bin/mas-devops-db2-validate-config',
+        'bin/mas-devops-saas-job-cleaner'
     ]
 )
diff --git a/src/mas/devops/saas/__init__.py b/src/mas/devops/saas/__init__.py
new file mode 100644
index 00000000..494ce588
--- /dev/null
+++ b/src/mas/devops/saas/__init__.py
@@ -0,0 +1,9 @@
+# *****************************************************************************
+# Copyright (c) 2025 IBM Corporation and other Contributors.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# *****************************************************************************
diff --git a/src/mas/devops/saas/job_cleaner.py b/src/mas/devops/saas/job_cleaner.py
new file mode 100644
index 00000000..43a539b3
--- /dev/null
+++ b/src/mas/devops/saas/job_cleaner.py
@@ -0,0 +1,121 @@
+# *****************************************************************************
+# Copyright (c) 2025 IBM Corporation and other Contributors.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# *****************************************************************************
+
+from kubernetes import client
+import logging
+import itertools
+
+# Possible future features: behaviours that diverge from the default ArgoCD behaviour (if auto_delete: true were set), but may be useful:
+# - support option to only purge jobs >n iterations old
+# - avoid purging jobs that are still running
+# - save details / logs from purged jobs (where? to a PV?)
+
+
+class JobCleaner:
+    def __init__(self, k8s_client: client.api_client.ApiClient):
+        self.k8s_client = k8s_client
+        self.batch_v1_api = client.BatchV1Api(self.k8s_client)
+        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
+
+    def _get_all_cleanup_groups(self, label: str, limit: int):
+        # set of tuples (namespace, cleanup_group_id)
+        cleanup_groups = set()
+        _continue = None
+        while True:
+
+            jobs_page = self.batch_v1_api.list_job_for_all_namespaces(
+                label_selector=label,
+                limit=limit,
+                _continue=_continue
+            )
+            _continue = jobs_page.metadata._continue
+
+            for job in jobs_page.items:
+                cleanup_groups.add((job.metadata.namespace, job.metadata.labels[label]))
+
+            if _continue is None:
+                return cleanup_groups
+
+    def _get_all_jobs(self, namespace: str, group_id: str, label: str, limit: int):
+        # page through all jobs in this namespace and group, and chain together all the resulting iterators
+        job_items_iters = []
+        _continue = None
+        while True:
+            jobs_page = self.batch_v1_api.list_namespaced_job(
+                namespace,
+                label_selector=f"{label}={group_id}",
+                limit=limit,
+                _continue=_continue
+            )
+            job_items_iters.append(jobs_page.items)
+            _continue = jobs_page.metadata._continue
+            if _continue is None:
+                return itertools.chain(*job_items_iters)
+
+    def cleanup_jobs(self, label: str, limit: int, dry_run: bool):
+        dry_run_param = None
+        if dry_run:
+            dry_run_param = "All"
+
+        # We want to avoid loading all Jobs into memory at once (there may be a lot)
+        # We cannot lazily page through Job resources in case a page boundary lands halfway through a group
+        # Instead, we'll trade CPU time / network IO to save memory by:
+        # - Performing an initial query to load all unique (namespace, group ID) pairs into memory
+
+        cleanup_groups = self._get_all_cleanup_groups(label, limit)
+
+        self.logger.info(f"Found {len(cleanup_groups)} unique (namespace, cleanup group ID) pairs, processing ...")
+
+        # NOTE: it's possible for things to change in the cluster while this process is ongoing
+        # e.g.:
+        # - a new sync cycle creates a newer version of a Job; not a problem, just means an orphaned job will stick around for one extra cycle
+        # - a new cleanup group appears; not a problem, the new cleanup group will be handled in the next cycle
+        # - ... other race conditions?
+        # this process is eventually consistent
+
+        # Now we know all the cleanup group ids in the cluster
+        # we can deal with each one separately; we only have to load the job resources for that particular group into memory at once
+        # (we have to load into memory in order to guarantee the jobs are sorted by creation_timestamp)
+        i = 0
+        for (namespace, group_id) in cleanup_groups:
+
+            self.logger.info("")
+            self.logger.info(f"{i}) {group_id} {namespace}")
+
+            jobs = self._get_all_jobs(namespace, group_id, label, limit)
+
+            # sort the jobs by creation_timestamp
+            jobs_sorted = sorted(
+                jobs,
+                key=lambda group_job: group_job.metadata.creation_timestamp,
+                reverse=True
+            )
+
+            if len(jobs_sorted) == 0:
+                self.logger.warning("No Jobs found in group, must have been deleted by some other process, skipping")
+                continue
+            else:
+                first = True
+                for job in jobs_sorted:
+                    name = job.metadata.name
+                    creation_timestamp = str(job.metadata.creation_timestamp)
+                    if first:
+                        self.logger.info("{0:<6} {1:<65} {2:<65}".format("SKIP", name, creation_timestamp))
+                        first = False
+                    else:
+                        try:
+                            self.batch_v1_api.delete_namespaced_job(name, namespace, dry_run=dry_run_param, propagation_policy="Foreground")
+                            result = "SUCCESS"
+                        except client.rest.ApiException as e:
+                            result = f"FAILED: {e}"
+
+                        self.logger.info("{0:<6} {1:<65} {2:<65} {3}".format("PURGE", name, creation_timestamp, result))
+
+            i = i + 1
diff --git a/test/src/saas/test_job_cleaner.py b/test/src/saas/test_job_cleaner.py
new file mode 100644
index 00000000..5305a2b8
--- /dev/null
+++ b/test/src/saas/test_job_cleaner.py
@@ -0,0 +1,132 @@
+# *****************************************************************************
+# Copyright (c) 2025 IBM Corporation and other Contributors.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Eclipse Public License v1.0
+# which accompanies this distribution, and is available at
+# http://www.eclipse.org/legal/epl-v10.html
+#
+# *****************************************************************************
+from unittest.mock import patch, Mock, call
+from mas.devops.saas.job_cleaner import JobCleaner
+
+
+def mock_job(name, namespace, labels, creation_timestamp):
+
+    mock = Mock()
+    mock.metadata = Mock()
+    mock.metadata.name = name
+    mock.metadata.namespace = namespace
+    mock.metadata.labels = labels
+    mock.metadata.creation_timestamp = creation_timestamp
+
+    return mock
+
+
+jobs_in_cluster = [
+    mock_job("job-xa-1", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 1),
+    mock_job("job-xa-2", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 2),
+    mock_job("job-xa-3", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 3),
+
+    mock_job("job-xb-1", "x", {"mas.ibm.com/job-cleanup-group": "b"}, 1),
+    mock_job("job-xb-2", "x", {"mas.ibm.com/job-cleanup-group": "b"}, 2),
+
+    mock_job("job-xc-1", "x", {"mas.ibm.com/job-cleanup-group": "c"}, 2),
+
+    mock_job("job-ya-2", "y", {"mas.ibm.com/job-cleanup-group": "a"}, 2),
+    mock_job("job-ya-1", "y", {"mas.ibm.com/job-cleanup-group": "a"}, 1),
+
+    mock_job("job-yothera-1", "y", {"otherlabel": "a"}, 1),
+    mock_job("job-zothera-1", "z", {"otherlabel": "a"}, 1)
+]
+
+
+def list_jobs(namespace, label_selector, limit, _continue):
+    if _continue is None:
+        _continue = 0
+
+    label_selector_kv = label_selector.split("=")
+
+    def filter_func(job):
+        if not label_selector_kv[0] in job.metadata.labels:
+            return False
+        if len(label_selector_kv) == 2 and not job.metadata.labels[label_selector_kv[0]] == label_selector_kv[1]:
+            return False
+        if namespace is not None and job.metadata.namespace != namespace:
+            return False
+        return True
+
+    filtered_jobs = list(filter(filter_func, jobs_in_cluster))
+
+    jobs_page = filtered_jobs[_continue:_continue + limit]
+
+    if len(jobs_page) == 0:
+        _continue = None
+    else:
+        _continue = _continue + limit
+
+    return Mock(
+        items=jobs_page,
+        metadata=Mock(
+            _continue=_continue
+        )
+    )
+
+
+def list_job_for_all_namespaces(label_selector, limit, _continue):
+    return list_jobs(None, label_selector, limit, _continue)
+
+
+def list_namespaced_job(namespace, label_selector, limit, _continue):
+    return list_jobs(namespace, label_selector, limit, _continue)
+
+
+@patch("kubernetes.client.BatchV1Api")
+def test_get_all_cleanup_groups(mock_batch_v1_api):
+    mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces
+    jc = JobCleaner(None)
+    for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
+        assert jc._get_all_cleanup_groups("mas.ibm.com/job-cleanup-group", limit) == {('x', 'a'), ('x', 'b'), ('x', 'c'), ('y', 'a')}
+
+
+@patch("kubernetes.client.BatchV1Api")
+def test_get_all_jobs(mock_batch_v1_api):
+    mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job
+    jc = JobCleaner(None)
+    for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
+        assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "a", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xa-1", "job-xa-2", "job-xa-3"]
+        assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "b", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xb-1", "job-xb-2"]
+        assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("x", "c", "mas.ibm.com/job-cleanup-group", limit))) == ["job-xc-1"]
+        assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "a", "mas.ibm.com/job-cleanup-group", limit))) == ["job-ya-2", "job-ya-1"]
+        assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "b", "mas.ibm.com/job-cleanup-group", limit))) == []
+        assert list(map(lambda job: job.metadata.name, jc._get_all_jobs("y", "a", "otherlabel", limit))) == ["job-yothera-1"]
+
+
+@patch("kubernetes.client.BatchV1Api")
+def test_cleanup_jobs(mock_batch_v1_api):
+    mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces
+    mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job
+
+    jc = JobCleaner(None)
+    for dry_run in [False, True]:
+        dry_run_param = None
+        if dry_run:
+            dry_run_param = "All"
+
+        expected_calls = [
+            call('job-ya-1', 'y', dry_run=dry_run_param, propagation_policy='Foreground'),
+            call('job-xa-2', 'x', dry_run=dry_run_param, propagation_policy='Foreground'),
+            call('job-xa-1', 'x', dry_run=dry_run_param, propagation_policy='Foreground'),
+            call('job-xb-1', 'x', dry_run=dry_run_param, propagation_policy='Foreground'),
+        ]
+
+        for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
+            mock_batch_v1_api.return_value.delete_namespaced_job.reset_mock()
+            jc.cleanup_jobs("mas.ibm.com/job-cleanup-group", limit, dry_run)
+
+            mock_batch_v1_api.return_value.delete_namespaced_job.assert_has_calls(
+                expected_calls,
+                any_order=True
+            )
+
+            assert mock_batch_v1_api.return_value.delete_namespaced_job.call_count == len(expected_calls)
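Usage sketch (illustrative, not part of the diff above): besides the bin/mas-devops-saas-job-cleaner CLI, the new JobCleaner class can be driven programmatically. The label value and limit shown here are assumptions taken from the CLI default and the new tests, not values mandated by the library.

    from kubernetes import client, config
    from mas.devops.saas.job_cleaner import JobCleaner

    # Load cluster credentials; inside a pod, config.load_incluster_config() would be used instead
    config.load_kube_config()

    cleaner = JobCleaner(client.api_client.ApiClient())

    # Dry-run first: logs what would happen in each cleanup group without deleting anything.
    # The label below matches the one used in the new tests; in practice, pass whatever
    # cleanup-group label the Jobs in the cluster actually carry.
    cleaner.cleanup_jobs("mas.ibm.com/job-cleanup-group", limit=100, dry_run=True)

    # Then purge for real: the newest Job in each group is kept (SKIP), older ones are deleted (PURGE)
    cleaner.cleanup_jobs("mas.ibm.com/job-cleanup-group", limit=100, dry_run=False)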