Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -14,6 +14,7 @@ README.rst
# Environments
.venv/
venv/
.venv39

# Other
kubectl.exe
76 changes: 76 additions & 0 deletions bin/mas-devops-saas-job-cleaner
@@ -0,0 +1,76 @@
#!/usr/bin/env python3

# *****************************************************************************
# Copyright (c) 2025 IBM Corporation and other Contributors.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# which accompanies this distribution, and is available at
# http://www.eclipse.org/legal/epl-v10.html
#
# *****************************************************************************


from kubernetes import client, config
from kubernetes.config.config_exception import ConfigException
import logging
import argparse
from mas.devops.saas.job_cleaner import JobCleaner

import urllib3
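# Suppress urllib3 warnings (e.g. InsecureRequestWarning when a cluster uses self-signed certificates)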
urllib3.disable_warnings()


if __name__ == "__main__":
# Initialize the properties we need
parser = argparse.ArgumentParser()

# Primary Options
    parser.add_argument("--log-level", required=False, choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], default="WARNING")
    parser.add_argument("--label", required=True, help="Kubernetes resource label used to identify groups of Jobs to clean up")
    parser.add_argument("--limit", required=False, type=int, help="Limit page sizes fetched from the K8S API. Larger values use more memory but less CPU time / network IO.", default=100)
    parser.add_argument("--dry-run", required=False, help="When specified, nothing will actually be deleted from the cluster", action="store_true")

args, unknown = parser.parse_known_args()

log_level = getattr(logging, args.log_level)

logger = logging.getLogger()
logger.setLevel(log_level)

ch = logging.StreamHandler()
ch.setLevel(log_level)
chFormatter = logging.Formatter(
"%(asctime)-25s %(name)-50s [%(threadName)s] %(levelname)-8s %(message)s"
)
ch.setFormatter(chFormatter)
logger.addHandler(ch)


limit = args.limit
label = args.label
dry_run = args.dry_run

logger.info("Configuration:")
logger.info("--------------")
logger.info(f"log_level: {log_level}")
logger.info(f"label: {label}")
logger.info(f"limit: {limit}")
logger.info(f"dry_run: {dry_run}")

logger.info("")

try:
# Try to load in-cluster configuration
config.load_incluster_config()
logger.info("Loaded in-cluster configuration")
except ConfigException:
# If that fails, fall back to kubeconfig file
config.load_kube_config()
logger.info("Loaded kubeconfig file")

logger.info("")

job_cleaner = JobCleaner(client.api_client.ApiClient())

    job_cleaner.cleanup_jobs(label, limit, dry_run)
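
For reference, a dry-run invocation of this script might look like the following (the label value here is the group label exercised by the tests in this PR; any label key that groups related Jobs works):

    mas-devops-saas-job-cleaner --log-level INFO --label mas.ibm.com/job-cleanup-group --limit 100 --dry-run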
3 changes: 2 additions & 1 deletion setup.py
@@ -84,6 +84,7 @@ def get_version(rel_path):
'Topic :: Software Development :: Libraries :: Python Modules'
],
scripts=[
'bin/mas-devops-db2-validate-config'
'bin/mas-devops-db2-validate-config',
'bin/mas-devops-saas-job-cleaner'
]
)
9 changes: 9 additions & 0 deletions src/mas/devops/saas/__init__.py
@@ -0,0 +1,9 @@
# *****************************************************************************
# Copyright (c) 2025 IBM Corporation and other Contributors.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# which accompanies this distribution, and is available at
# http://www.eclipse.org/legal/epl-v10.html
#
# *****************************************************************************
121 changes: 121 additions & 0 deletions src/mas/devops/saas/job_cleaner.py
@@ -0,0 +1,121 @@
# *****************************************************************************
# Copyright (c) 2025 IBM Corporation and other Contributors.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# which accompanies this distribution, and is available at
# http://www.eclipse.org/legal/epl-v10.html
#
# *****************************************************************************

from kubernetes import client
import logging
import itertools

# Possible future features (behaviours that diverge from the default ArgoCD
# behaviour when auto_delete: true is set, but may be useful):
# - support an option to only purge Jobs more than n iterations old
# - avoid purging Jobs that are still running
# - save details / logs from purged Jobs (where? to a PV?)


class JobCleaner:
def __init__(self, k8s_client: client.api_client.ApiClient):
self.k8s_client = k8s_client
self.batch_v1_api = client.BatchV1Api(self.k8s_client)
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

def _get_all_cleanup_groups(self, label: str, limit: int):
        # Returns a set of (namespace, cleanup_group_id) tuples
cleanup_groups = set()
_continue = None
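        # Page through all matching Jobs cluster-wide; the API returns an opaque
        # continue token in the list metadata, which we pass back on each call
        # until it comes back as None (no more pages)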
while True:

jobs_page = self.batch_v1_api.list_job_for_all_namespaces(
label_selector=label,
limit=limit,
_continue=_continue
)
_continue = jobs_page.metadata._continue

for job in jobs_page.items:
cleanup_groups.add((job.metadata.namespace, job.metadata.labels[label]))

if _continue is None:
return cleanup_groups

def _get_all_jobs(self, namespace: str, group_id: str, label: str, limit: int):
# page through all jobs in this namespace and group, and chain together all the resulting iterators
job_items_iters = []
_continue = None
while True:
jobs_page = self.batch_v1_api.list_namespaced_job(
namespace,
label_selector=f"{label}={group_id}",
limit=limit,
_continue=_continue
)
job_items_iters.append(jobs_page.items)
_continue = jobs_page.metadata._continue
if _continue is None:
return itertools.chain(*job_items_iters)

def cleanup_jobs(self, label: str, limit: int, dry_run: bool):
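        # When dry_run is requested, we pass dry_run="All" to the delete call below,
        # which asks the API server to process the request without persisting it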
dry_run_param = None
if dry_run:
dry_run_param = "All"

        # We want to avoid loading all Jobs into memory at once (there may be a lot).
        # We cannot lazily page through Job resources in case a page boundary lands halfway through a group.
        # Instead, we trade CPU time / network IO to save memory by:
        # - performing an initial query to load all unique (namespace, group ID) pairs into memory
        # - then fetching and processing the Jobs for each group separately

cleanup_groups = self._get_all_cleanup_groups(label, limit)

self.logger.info(f"Found {len(cleanup_groups)} unique (namespace, cleanup group ID) pairs, processing ...")

        # NOTE: the cluster can change while this process is running, e.g.:
        # - a new sync cycle creates a newer version of a Job; not a problem, an orphaned Job just sticks around for one extra cycle
        # - a new cleanup group appears; not a problem, it will be handled in the next cycle
        # - ... other race conditions?
        # This process is eventually consistent.

        # Now that we know all the cleanup group IDs in the cluster, we can deal with
        # each one separately; we only need to hold the Job resources for one group in
        # memory at a time (we must load a whole group in order to sort its Jobs by creation_timestamp)
        for i, (namespace, group_id) in enumerate(cleanup_groups):

self.logger.info("")
self.logger.info(f"{i}) {group_id} {namespace}")

jobs = self._get_all_jobs(namespace, group_id, label, limit)

# sort the jobs by creation_timestamp
jobs_sorted = sorted(
jobs,
key=lambda group_job: group_job.metadata.creation_timestamp,
reverse=True
)

            if len(jobs_sorted) == 0:
                self.logger.warning("No Jobs found in group; they must have been deleted by another process. Skipping")
                continue

            first = True
            for job in jobs_sorted:
                name = job.metadata.name
                creation_timestamp = str(job.metadata.creation_timestamp)
                if first:
                    # The newest Job in the group is always kept
                    self.logger.info("{0:<6} {1:<65} {2:<65}".format("SKIP", name, creation_timestamp))
                    first = False
                else:
                    try:
                        self.batch_v1_api.delete_namespaced_job(name, namespace, dry_run=dry_run_param, propagation_policy="Foreground")
                        result = "SUCCESS"
                    except client.rest.ApiException as e:
                        result = f"FAILED: {e}"

                    self.logger.info("{0:<6} {1:<65} {2:<65} {3}".format("PURGE", name, creation_timestamp, result))

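To illustrate the contract from the producer side, here is a minimal sketch (not part of this change) of how a Job could opt in to cleanup, assuming the label key exercised by the tests below; the Job name and namespace are hypothetical. Jobs sharing a group ID are treated as iterations of the same logical Job, of which only the newest is kept:

    from kubernetes import client, config

    config.load_kube_config()
    job = client.V1Job(
        metadata=client.V1ObjectMeta(
            name="my-sync-job-20250101",  # hypothetical Job name
            labels={"mas.ibm.com/job-cleanup-group": "my-sync-job"}
        ),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="Never",
                    containers=[client.V1Container(name="main", image="busybox", command=["true"])]
                )
            )
        )
    )
    client.BatchV1Api().create_namespaced_job("mas-example", job)  # hypothetical namespace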
132 changes: 132 additions & 0 deletions test/src/saas/test_job_cleaner.py
@@ -0,0 +1,132 @@
# *****************************************************************************
# Copyright (c) 2025 IBM Corporation and other Contributors.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# which accompanies this distribution, and is available at
# http://www.eclipse.org/legal/epl-v10.html
#
# *****************************************************************************
from unittest.mock import patch, Mock, call
from mas.devops.saas.job_cleaner import JobCleaner


def mock_job(name, namespace, labels, creation_timestamp):

mock = Mock()
mock.metadata = Mock()
mock.metadata.name = name
mock.metadata.namespace = namespace
mock.metadata.labels = labels
mock.metadata.creation_timestamp = creation_timestamp

return mock


jobs_in_cluster = [
mock_job("job-xa-1", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 1),
mock_job("job-xa-2", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 2),
mock_job("job-xa-3", "x", {"mas.ibm.com/job-cleanup-group": "a"}, 3),

mock_job("job-xb-1", "x", {"mas.ibm.com/job-cleanup-group": "b"}, 1),
mock_job("job-xb-2", "x", {"mas.ibm.com/job-cleanup-group": "b"}, 2),

mock_job("job-xc-1", "x", {"mas.ibm.com/job-cleanup-group": "c"}, 2),

mock_job("job-ya-2", "y", {"mas.ibm.com/job-cleanup-group": "a"}, 2),
mock_job("job-ya-1", "y", {"mas.ibm.com/job-cleanup-group": "a"}, 1),

mock_job("job-yothera-1", "y", {"otherlabel": "a"}, 1),
mock_job("job-zothera-1", "z", {"otherlabel": "a"}, 1)
]


def list_jobs(namespace, label_selector, limit, _continue):
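    # Fake of the K8s Job list APIs: filters the module-level jobs_in_cluster
    # fixture by label selector and namespace, then pages through the result
    # using an integer offset in place of the real API's opaque continue token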
if _continue is None:
_continue = 0

label_selector_kv = label_selector.split("=")

def filter_func(job):
        if label_selector_kv[0] not in job.metadata.labels:
            return False
        if len(label_selector_kv) == 2 and job.metadata.labels[label_selector_kv[0]] != label_selector_kv[1]:
            return False
return False
if namespace is not None and job.metadata.namespace != namespace:
return False
return True

filtered_jobs = list(filter(filter_func, jobs_in_cluster))

jobs_page = filtered_jobs[_continue:_continue + limit]

if len(jobs_page) == 0:
_continue = None
else:
_continue = _continue + limit

return Mock(
items=jobs_page,
metadata=Mock(
_continue=_continue
)
)


def list_job_for_all_namespaces(label_selector, limit, _continue):
return list_jobs(None, label_selector, limit, _continue)


def list_namespaced_job(namespace, label_selector, limit, _continue):
return list_jobs(namespace, label_selector, limit, _continue)


@patch("kubernetes.client.BatchV1Api")
def test_get_all_cleanup_groups(mock_batch_v1_api):
mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces
jc = JobCleaner(None)
for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
assert jc._get_all_cleanup_groups("mas.ibm.com/job-cleanup-group", limit) == {('x', 'a'), ('x', 'b'), ('x', 'c'), ('y', 'a')}


@patch("kubernetes.client.BatchV1Api")
def test_get_all_jobs(mock_batch_v1_api):
mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job
jc = JobCleaner(None)
for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
        assert [job.metadata.name for job in jc._get_all_jobs("x", "a", "mas.ibm.com/job-cleanup-group", limit)] == ["job-xa-1", "job-xa-2", "job-xa-3"]
        assert [job.metadata.name for job in jc._get_all_jobs("x", "b", "mas.ibm.com/job-cleanup-group", limit)] == ["job-xb-1", "job-xb-2"]
        assert [job.metadata.name for job in jc._get_all_jobs("x", "c", "mas.ibm.com/job-cleanup-group", limit)] == ["job-xc-1"]
        assert [job.metadata.name for job in jc._get_all_jobs("y", "a", "mas.ibm.com/job-cleanup-group", limit)] == ["job-ya-2", "job-ya-1"]
        assert [job.metadata.name for job in jc._get_all_jobs("y", "b", "mas.ibm.com/job-cleanup-group", limit)] == []
        assert [job.metadata.name for job in jc._get_all_jobs("y", "a", "otherlabel", limit)] == ["job-yothera-1"]


@patch("kubernetes.client.BatchV1Api")
def test_cleanup_jobs(mock_batch_v1_api):
mock_batch_v1_api.return_value.list_job_for_all_namespaces.side_effect = list_job_for_all_namespaces
mock_batch_v1_api.return_value.list_namespaced_job.side_effect = list_namespaced_job

jc = JobCleaner(None)
for dry_run in [False, True]:
dry_run_param = None
if dry_run:
dry_run_param = "All"

expected_calls = [
call('job-ya-1', 'y', dry_run=dry_run_param, propagation_policy='Foreground'),
call('job-xa-2', 'x', dry_run=dry_run_param, propagation_policy='Foreground'),
call('job-xa-1', 'x', dry_run=dry_run_param, propagation_policy='Foreground'),
call('job-xb-1', 'x', dry_run=dry_run_param, propagation_policy='Foreground'),
]

for limit in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
mock_batch_v1_api.return_value.delete_namespaced_job.reset_mock()
            jc.cleanup_jobs("mas.ibm.com/job-cleanup-group", limit, dry_run)

mock_batch_v1_api.return_value.delete_namespaced_job.assert_has_calls(
expected_calls,
any_order=True
)

assert mock_batch_v1_api.return_value.delete_namespaced_job.call_count == len(expected_calls)