Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions modules/python/clients/kubernetes_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,8 @@ def wait_for_condition(self, resource_type: str, wait_condition_type: str, names
valid_conditions = {
'deployment': ['available', 'progressing', 'replicafailure', 'ready'],
'deployments': ['available', 'progressing', 'replicafailure', 'ready'],
'job': ['complete', 'failed'],
'jobs': ['complete', 'failed'],
# Add more resource types as needed
}

Expand Down Expand Up @@ -1049,6 +1051,9 @@ def _check_resource_condition(self, resource_type: str, resource_name: str, cond
if resource_type_lower in ['deployment', 'deployments']:
return self._check_deployment_condition(resource_name, condition_type, namespace, wait_all)

if resource_type_lower in ['job', 'jobs']:
return self._check_job_condition(resource_name, condition_type, namespace, wait_all)

logger.warning(f"Unsupported resource type for condition checking: {resource_type}")
return False

Expand Down Expand Up @@ -1097,6 +1102,47 @@ def _is_deployment_condition_met(self, deployment, condition_type: str) -> bool:

return False

def _check_job_condition(self, resource_name: str, condition_type: str, namespace: str, wait_all: bool) -> bool:
    """Check whether Job(s) in the namespace meet the given condition.

    Args:
        resource_name: Name of a specific Job; ignored when wait_all is True.
        condition_type: Condition to check ('complete' or 'failed').
        namespace: Namespace to look the Job(s) up in.
        wait_all: When True (or when resource_name is empty), every Job in
            the namespace must meet the condition.

    Returns:
        True if all selected Jobs meet the condition, False otherwise —
        including when no Jobs exist yet, so the wait loop keeps polling.
    """
    try:
        if wait_all or not resource_name:
            jobs = self.batch.list_namespaced_job(namespace=namespace).items
        else:
            jobs = [self.batch.read_namespaced_job(name=resource_name, namespace=namespace)]

        # An empty list means the Job(s) have not been created yet. Report
        # the condition as unmet so callers keep waiting, mirroring the 404
        # handling below — previously this fell through and returned True
        # vacuously, declaring success before any Job existed.
        if not jobs:
            logger.debug("No jobs found in namespace '%s', waiting...", namespace)
            return False

        return all(self._is_job_condition_met(job, condition_type) for job in jobs)

    except client.rest.ApiException as e:
        # 404 simply means the Job has not been created yet.
        if e.status == 404:
            logger.debug("Job not found, waiting...")
            return False
        raise e

def _is_job_condition_met(self, job, condition_type: str) -> bool:
"""Check if a job meets the specified condition."""
if not job.status:
return False

condition_type_lower = condition_type.lower()

if condition_type_lower == 'complete':
return bool(job.status.completion_time and job.status.succeeded and job.status.succeeded > 0)

if condition_type_lower == 'failed':
return bool(job.status.failed and job.status.failed > 0 and not job.status.active)

if job.status.conditions:
for condition in job.status.conditions:
if condition.type.lower() == condition_type_lower and condition.status == "True":
return True

return False

def _expand_and_validate_manifests(self, manifests):
"""
Validate and expand manifests, handling List kind and non-dict manifests.
Expand Down Expand Up @@ -1159,6 +1205,8 @@ def _apply_single_manifest(self, manifest, namespace=None):
self.app.create_namespaced_daemon_set(namespace=namespace, body=manifest)
elif kind == "StatefulSet":
self.app.create_namespaced_stateful_set(namespace=namespace, body=manifest)
elif kind == "Job":
self.batch.create_namespaced_job(namespace=namespace, body=manifest)
elif kind == "Service":
self.api.create_namespaced_service(namespace=namespace, body=manifest)
elif kind == "ConfigMap":
Expand Down Expand Up @@ -1313,6 +1361,11 @@ def _update_single_manifest(self, manifest, namespace=None):
self.app.patch_namespaced_stateful_set(name=name, namespace=namespace, body=manifest)
else:
raise ValueError("StatefulSet requires a namespace")
elif kind == "Job":
if namespace:
self.batch.patch_namespaced_job(name=name, namespace=namespace, body=manifest)
else:
raise ValueError("Job requires a namespace")
elif kind == "Service":
if namespace:
self.api.patch_namespaced_service(name=name, namespace=namespace, body=manifest)
Expand Down Expand Up @@ -1494,6 +1547,11 @@ def _delete_single_manifest(self, manifest, ignore_not_found: bool = True, names
self.app.delete_namespaced_stateful_set(name=resource_name, namespace=namespace, body=delete_options)
else:
raise ValueError("StatefulSet requires a namespace")
elif kind == "Job":
if namespace:
self.batch.delete_namespaced_job(name=resource_name, namespace=namespace, body=delete_options)
else:
raise ValueError("Job requires a namespace")
elif kind == "Service":
if namespace:
self.api.delete_namespaced_service(name=resource_name, namespace=namespace, body=delete_options)
Expand Down
131 changes: 129 additions & 2 deletions modules/python/crud/azure/node_pool_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ def _apply_deployment(
# Generate deployment name
deployment_name = f"myapp-{node_pool_name}-{deployment_index}"

# Use per-deployment label to avoid selector collision
deployment_label = f"{label_selector.split('=', 1)[-1]}-{deployment_index}"
# Use per-deployment label to avoid selector collision across workload types
deployment_label = f"{label_selector.split('=', 1)[-1]}-deployment-{deployment_index}"

# Create deployment template using k8s_client.create_template
deployment_template = k8s_client.create_template(
Expand Down Expand Up @@ -409,3 +409,130 @@ def create_deployment(
return True
logger.warning("Created %d/%d deployment(s)", successful_deployments, number_of_deployments)
return False

def _apply_job(
    self,
    k8s_client,
    node_pool_name,
    job_index,
    completions,
    manifest_dir,
    label_selector,
    namespace
):
    """Helper for create_job — applies and verifies a single Job.

    Renders the job template, applies every document it contains, then
    blocks until the Job reports 'complete' or the step timeout elapses.

    Raises:
        TimeoutError: if the Job does not complete within self.step_timeout.
    """
    # Resolve the template location: a caller-supplied directory wins,
    # otherwise fall back to the bundled workload template.
    if manifest_dir:
        template_path = f"{manifest_dir}/job.yml"
    else:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        template_path = os.path.join(base_dir, "..", "workload_templates", "job.yml")

    job_name = f"myapp-{node_pool_name}-{job_index}"

    # Per-job label value avoids selector collisions across workload types.
    label_value = label_selector.split('=', 1)[-1]
    job_label = f"{label_value}-job-{job_index}"

    rendered = k8s_client.create_template(
        template_path,
        {
            "JOB_COMPLETIONS": completions,
            "NODE_POOL_NAME": node_pool_name,
            "INDEX": job_index,
            "LABEL_VALUE": job_label,
        }
    )

    # The rendered template may hold multiple YAML documents; apply each.
    for document in yaml.safe_load_all(rendered):
        if not document:
            continue
        k8s_client.apply_manifest_from_file(manifest_dict=document, namespace=namespace)

    logger.info("Applied manifest for job %s", job_name)

    logger.info("Waiting for job %s to complete...", job_name)
    completed = k8s_client.wait_for_condition(
        resource_type="job",
        wait_condition_type="complete",
        resource_name=job_name,
        namespace=namespace,
        timeout_seconds=self.step_timeout
    )

    if not completed:
        raise TimeoutError(
            f"Job {job_name} failed to complete within timeout"
        )

    logger.info("Job %s is successfully complete", job_name)
    logger.info("Successfully created and verified job %d", job_index)

def create_job(
    self,
    node_pool_name,
    completions=1,
    manifest_dir=None,
    number_of_jobs=1,
    label_selector="app=nginx-container",
    namespace="default"
):
    """
    Create Kubernetes jobs after node pool operations.

    Args:
        node_pool_name: Name of the node pool to target
        completions: Number of job completions (default: 1)
        manifest_dir: Directory containing Kubernetes manifest files
        number_of_jobs: Number of jobs to create (default: 1)
        label_selector: Label selector for pods (default: "app=nginx-container")
        namespace: Kubernetes namespace (default: "default")

    Returns:
        True if all job creations were successful, False otherwise
    """
    logger.info("Creating %d job(s)", number_of_jobs)
    logger.info("Target node pool: %s", node_pool_name)
    logger.info("Job completions: %d", completions)
    logger.info("Using manifest directory: %s", manifest_dir)

    # The Kubernetes client comes from the AKS client wrapper.
    k8s_client = self.aks_client.k8s_client
    if not k8s_client:
        logger.error("Kubernetes client not available")
        return False

    successful_jobs = 0
    for index in range(1, number_of_jobs + 1):
        logger.info("Creating job %d/%d", index, number_of_jobs)
        try:
            self._apply_job(
                k8s_client=k8s_client,
                node_pool_name=node_pool_name,
                job_index=index,
                completions=completions,
                manifest_dir=manifest_dir,
                label_selector=label_selector,
                namespace=namespace
            )
        except Exception as e:
            # Best-effort: log and continue so remaining jobs still run.
            logger.error("Failed to create job %d: %s", index, e)
        else:
            successful_jobs += 1

    if successful_jobs == number_of_jobs:
        logger.info("Successfully created all %d job(s)", number_of_jobs)
        return True
    logger.warning("Created %d/%d job(s)", successful_jobs, number_of_jobs)
    return False
31 changes: 30 additions & 1 deletion modules/python/crud/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def handle_node_pool_operation(node_pool_crud, args):
return 1

def handle_workload_operations(node_pool_crud, args):
"""Handle workload operations (deployment, statefulset, jobs) based on the command"""
"""Handle workload operations (deployment, job) based on the command"""
command = args.command
result = None

Expand All @@ -167,6 +167,21 @@ def handle_workload_operations(node_pool_crud, args):
}

result = node_pool_crud.create_deployment(**deploy_kwargs)
elif command == "job":
if not hasattr(node_pool_crud, 'create_job'):
logger.error("Cloud provider does not support job workload operations")
return 1

# Prepare job arguments
job_kwargs = {
"node_pool_name": args.node_pool_name,
"completions": args.completions,
"manifest_dir": args.manifest_dir,
"number_of_jobs": args.count,
"label_selector": args.label_selector,
}

result = node_pool_crud.create_job(**job_kwargs)
else:
logger.error("Unknown workload command: '%s'", command)
return 1
Expand Down Expand Up @@ -386,6 +401,20 @@ def main():
)
deployment_parser.set_defaults(func=handle_workload_operations)

# Job command
job_parser = subparsers.add_parser(
"job",
parents=[common_parser, workload_common_parser],
help="create jobs"
)
job_parser.add_argument(
"--completions",
type=int,
default=1,
help="Number of job completions"
)
job_parser.set_defaults(func=handle_workload_operations)

# Arguments provided, run node pool operations and collect benchmark results
try:
args = parser.parse_args()
Expand Down
20 changes: 20 additions & 0 deletions modules/python/crud/workload_templates/job.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Workload template for a batch Job; {{...}} placeholders are substituted
# by the templating helper before the manifest is applied.
apiVersion: batch/v1
kind: Job
metadata:
  name: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
  labels:
    app: {{LABEL_VALUE}}
spec:
  # Number of pods that must finish successfully for the Job to complete.
  completions: {{JOB_COMPLETIONS}}
  template:
    metadata:
      labels:
        app: {{LABEL_VALUE}}
    spec:
      # Job pods must not be restarted in place; failures surface in Job status.
      restartPolicy: Never
      containers:
        - name: {{LABEL_VALUE}}
          image: mcr.microsoft.com/oss/nginx/nginx:1.21.6
          # "nginx -t" only validates the config and exits, so the pod
          # terminates quickly and the Job can reach 'complete'.
          command: ["nginx", "-t"]
          ports:
            - containerPort: 80
Loading
Loading