diff --git a/modules/python/clients/kubernetes_client.py b/modules/python/clients/kubernetes_client.py index 60838d2ce4..d85202b50f 100644 --- a/modules/python/clients/kubernetes_client.py +++ b/modules/python/clients/kubernetes_client.py @@ -975,6 +975,8 @@ def wait_for_condition(self, resource_type: str, wait_condition_type: str, names valid_conditions = { 'deployment': ['available', 'progressing', 'replicafailure', 'ready'], 'deployments': ['available', 'progressing', 'replicafailure', 'ready'], + 'job': ['complete', 'failed'], + 'jobs': ['complete', 'failed'], # Add more resource types as needed } @@ -1049,6 +1051,9 @@ def _check_resource_condition(self, resource_type: str, resource_name: str, cond if resource_type_lower in ['deployment', 'deployments']: return self._check_deployment_condition(resource_name, condition_type, namespace, wait_all) + if resource_type_lower in ['job', 'jobs']: + return self._check_job_condition(resource_name, condition_type, namespace, wait_all) + logger.warning(f"Unsupported resource type for condition checking: {resource_type}") return False @@ -1097,6 +1102,47 @@ def _is_deployment_condition_met(self, deployment, condition_type: str) -> bool: return False + def _check_job_condition(self, resource_name: str, condition_type: str, namespace: str, wait_all: bool) -> bool: + """Check job condition ('complete' or 'failed').""" + try: + if wait_all or not resource_name: + jobs = self.batch.list_namespaced_job(namespace=namespace).items + else: + job = self.batch.read_namespaced_job(name=resource_name, namespace=namespace) + jobs = [job] + + for job in jobs: + if not self._is_job_condition_met(job, condition_type): + return False + + return True + + except client.rest.ApiException as e: + if e.status == 404: + logger.debug("Job not found, waiting...") + return False + raise e + + def _is_job_condition_met(self, job, condition_type: str) -> bool: + """Check if a job meets the specified condition.""" + if not job.status: + return False + + condition_type_lower = condition_type.lower() + + if condition_type_lower == 'complete': + return bool(job.status.completion_time and job.status.succeeded and job.status.succeeded > 0) + + if condition_type_lower == 'failed': + return bool(job.status.failed and job.status.failed > 0 and not job.status.active) + + if job.status.conditions: + for condition in job.status.conditions: + if condition.type.lower() == condition_type_lower and condition.status == "True": + return True + + return False + def _expand_and_validate_manifests(self, manifests): """ Validate and expand manifests, handling List kind and non-dict manifests. @@ -1159,6 +1205,8 @@ def _apply_single_manifest(self, manifest, namespace=None): self.app.create_namespaced_daemon_set(namespace=namespace, body=manifest) elif kind == "StatefulSet": self.app.create_namespaced_stateful_set(namespace=namespace, body=manifest) + elif kind == "Job": + self.batch.create_namespaced_job(namespace=namespace, body=manifest) elif kind == "Service": self.api.create_namespaced_service(namespace=namespace, body=manifest) elif kind == "ConfigMap": @@ -1313,6 +1361,11 @@ def _update_single_manifest(self, manifest, namespace=None): self.app.patch_namespaced_stateful_set(name=name, namespace=namespace, body=manifest) else: raise ValueError("StatefulSet requires a namespace") + elif kind == "Job": + if namespace: + self.batch.patch_namespaced_job(name=name, namespace=namespace, body=manifest) + else: + raise ValueError("Job requires a namespace") elif kind == "Service": if namespace: self.api.patch_namespaced_service(name=name, namespace=namespace, body=manifest) @@ -1494,6 +1547,11 @@ def _delete_single_manifest(self, manifest, ignore_not_found: bool = True, names self.app.delete_namespaced_stateful_set(name=resource_name, namespace=namespace, body=delete_options) else: raise ValueError("StatefulSet requires a namespace") + elif kind == "Job": + if namespace: + self.batch.delete_namespaced_job(name=resource_name, namespace=namespace, body=delete_options) + else: + raise ValueError("Job requires a namespace") elif kind == "Service": if namespace: self.api.delete_namespaced_service(name=resource_name, namespace=namespace, body=delete_options) diff --git a/modules/python/crud/azure/node_pool_crud.py b/modules/python/crud/azure/node_pool_crud.py index 208589d3d7..006de983f6 100644 --- a/modules/python/crud/azure/node_pool_crud.py +++ b/modules/python/crud/azure/node_pool_crud.py @@ -297,8 +297,8 @@ def _apply_deployment( # Generate deployment name deployment_name = f"myapp-{node_pool_name}-{deployment_index}" - # Use per-deployment label to avoid selector collision - deployment_label = f"{label_selector.split('=', 1)[-1]}-{deployment_index}" + # Use per-deployment label to avoid selector collision across workload types + deployment_label = f"{label_selector.split('=', 1)[-1]}-deployment-{deployment_index}" # Create deployment template using k8s_client.create_template deployment_template = k8s_client.create_template( @@ -409,3 +409,130 @@ def create_deployment( return True logger.warning("Created %d/%d deployment(s)", successful_deployments, number_of_deployments) return False + + def _apply_job( + self, + k8s_client, + node_pool_name, + job_index, + completions, + manifest_dir, + label_selector, + namespace + ): + """Helper for create_job — applies and verifies a single Job.""" + if manifest_dir: + # Use the template path from manifest_dir + template_path = f"{manifest_dir}/job.yml" + else: + # Use default template path (relative to workingDirectory: modules/python) + template_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "..", "workload_templates", "job.yml" + ) + + # Generate job name + job_name = f"myapp-{node_pool_name}-{job_index}" + + # Use per-job label to avoid selector collision across workload types + job_label = f"{label_selector.split('=', 1)[-1]}-job-{job_index}" + + # Create job template using k8s_client.create_template + job_template = k8s_client.create_template( + template_path, + { + "JOB_COMPLETIONS": completions, + "NODE_POOL_NAME": node_pool_name, + "INDEX": job_index, + "LABEL_VALUE": job_label, + } + ) + + # Apply each document in the rendered multi-doc template + for doc in yaml.safe_load_all(job_template): + if doc: + k8s_client.apply_manifest_from_file(manifest_dict=doc, namespace=namespace) + + logger.info("Applied manifest for job %s", job_name) + + # Wait for job to complete (successful job verification) + logger.info("Waiting for job %s to complete...", job_name) + job_ready = k8s_client.wait_for_condition( + resource_type="job", + wait_condition_type="complete", + resource_name=job_name, + namespace=namespace, + timeout_seconds=self.step_timeout + ) + + if not job_ready: + raise TimeoutError( + f"Job {job_name} failed to complete within timeout" + ) + + logger.info("Job %s is successfully complete", job_name) + logger.info("Successfully created and verified job %d", job_index) + + def create_job( + self, + node_pool_name, + completions=1, + manifest_dir=None, + number_of_jobs=1, + label_selector="app=nginx-container", + namespace="default" + ): + """ + Create Kubernetes jobs after node pool operations. + + Args: + node_pool_name: Name of the node pool to target + namespace: Kubernetes namespace (default: "default") + completions: Number of job completions (default: 1) + manifest_dir: Directory containing Kubernetes manifest files + number_of_jobs: Number of jobs to create (default: 1) + label_selector: Label selector for pods (default: "app=nginx-container") + + Returns: + True if all job creations were successful, False otherwise + """ + logger.info("Creating %d job(s)", number_of_jobs) + logger.info("Target node pool: %s", node_pool_name) + logger.info("Job completions: %d", completions) + logger.info("Using manifest directory: %s", manifest_dir) + + # Get Kubernetes client from AKS client + k8s_client = self.aks_client.k8s_client + + if not k8s_client: + logger.error("Kubernetes client not available") + return False + + successful_jobs = 0 + + # Loop through number of jobs + for job_index in range(1, number_of_jobs + 1): + logger.info("Creating job %d/%d", job_index, number_of_jobs) + + try: + self._apply_job( + k8s_client=k8s_client, + node_pool_name=node_pool_name, + job_index=job_index, + completions=completions, + manifest_dir=manifest_dir, + label_selector=label_selector, + namespace=namespace + ) + successful_jobs += 1 + except Exception as e: + logger.error("Failed to create job %d: %s", job_index, e) + # Continue with next job instead of failing completely + continue + + # Check if all jobs were successful + if successful_jobs == number_of_jobs: + logger.info("Successfully created all %d job(s)", number_of_jobs) + return True + logger.warning("Created %d/%d job(s)", successful_jobs, number_of_jobs) + return False diff --git a/modules/python/crud/main.py b/modules/python/crud/main.py index 635d14462a..612bd6a53a 100644 --- a/modules/python/crud/main.py +++ b/modules/python/crud/main.py @@ -147,7 +147,7 @@ def handle_node_pool_operation(node_pool_crud, args): return 1 def handle_workload_operations(node_pool_crud, args): - """Handle workload operations (deployment, statefulset, jobs) based on the command""" + """Handle workload operations (deployment, job) based on the command""" command = args.command result = None @@ -167,6 +167,21 @@ def handle_workload_operations(node_pool_crud, args): } result = node_pool_crud.create_deployment(**deploy_kwargs) + elif command == "job": + if not hasattr(node_pool_crud, 'create_job'): + logger.error("Cloud provider does not support job workload operations") + return 1 + + # Prepare job arguments + job_kwargs = { + "node_pool_name": args.node_pool_name, + "completions": args.completions, + "manifest_dir": args.manifest_dir, + "number_of_jobs": args.count, + "label_selector": args.label_selector, + } + + result = node_pool_crud.create_job(**job_kwargs) else: logger.error("Unknown workload command: '%s'", command) return 1 @@ -386,6 +401,20 @@ def main(): ) deployment_parser.set_defaults(func=handle_workload_operations) + # Job command + job_parser = subparsers.add_parser( + "job", + parents=[common_parser, workload_common_parser], + help="create jobs" + ) + job_parser.add_argument( + "--completions", + type=int, + default=1, + help="Number of job completions" + ) + job_parser.set_defaults(func=handle_workload_operations) + # Arguments provided, run node pool operations and collect benchmark results try: args = parser.parse_args() diff --git a/modules/python/crud/workload_templates/job.yml b/modules/python/crud/workload_templates/job.yml new file mode 100644 index 0000000000..4fc6642bff --- /dev/null +++ b/modules/python/crud/workload_templates/job.yml @@ -0,0 +1,20 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: myapp-{{NODE_POOL_NAME}}-{{INDEX}} + labels: + app: {{LABEL_VALUE}} +spec: + completions: {{JOB_COMPLETIONS}} + template: + metadata: + labels: + app: {{LABEL_VALUE}} + spec: + restartPolicy: Never + containers: + - name: {{LABEL_VALUE}} + image: mcr.microsoft.com/oss/nginx/nginx:1.21.6 + command: ["nginx", "-t"] + ports: + - containerPort: 80 diff --git a/modules/python/tests/clients/test_kubernetes_client.py b/modules/python/tests/clients/test_kubernetes_client.py index 8190789361..318ca2e13d 100644 --- a/modules/python/tests/clients/test_kubernetes_client.py +++ b/modules/python/tests/clients/test_kubernetes_client.py @@ -2586,6 +2586,36 @@ def test_apply_single_manifest_statefulset_no_namespace(self, mock_create_statef mock_create_statefulset.assert_called_once_with( namespace="default", body=manifest) + @patch('kubernetes.client.BatchV1Api.create_namespaced_job') + def test_apply_single_manifest_job(self, mock_create_job): + """Test _apply_single_manifest with Job resource.""" + manifest = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": {"name": "test-job", "namespace": "test-namespace"}, + "spec": {} + } + + # pylint: disable=protected-access + self.client._apply_single_manifest(manifest) + mock_create_job.assert_called_once_with( + namespace="test-namespace", body=manifest) + + @patch('kubernetes.client.BatchV1Api.create_namespaced_job') + def test_apply_single_manifest_job_no_namespace(self, mock_create_job): + """Test _apply_single_manifest with Job missing namespace uses 'default'.""" + manifest = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": {"name": "test-job"}, + "spec": {} + } + + # pylint: disable=protected-access + self.client._apply_single_manifest(manifest) + mock_create_job.assert_called_once_with( + namespace="default", body=manifest) + @patch('kubernetes.client.CoreV1Api.create_namespaced_service') def test_apply_single_manifest_service(self, mock_create_service): """Test _apply_single_manifest with Service resource.""" @@ -3407,6 +3437,160 @@ def test_wait_for_condition_case_insensitive(self): ) self.assertTrue(result) + # Tests for wait_for_condition with job resource type + + @patch('time.time') + def test_wait_for_condition_job_complete_success(self, mock_time): + """Test wait_for_condition for a completed job - success case""" + mock_time.side_effect = [0, 0, 1, 2, 2] + + mock_job = MagicMock() + mock_job.status.completion_time = "2026-04-20T00:00:00Z" + mock_job.status.succeeded = 1 + mock_job.status.failed = None + mock_job.status.active = None + mock_job.status.conditions = None + + with patch.object(self.client, 'batch') as mock_batch, \ + patch('time.sleep'): + mock_batch.read_namespaced_job.return_value = mock_job + + result = self.client.wait_for_condition( + resource_type="job", + resource_name="test-job", + wait_condition_type="complete", + namespace="test-namespace", + timeout_seconds=5 + ) + + self.assertTrue(result) + mock_batch.read_namespaced_job.assert_called_with( + name="test-job", + namespace="test-namespace" + ) + + @patch('time.time') + def test_wait_for_condition_job_complete_timeout(self, mock_time): + """Test wait_for_condition for a job - timeout when not yet complete""" + mock_time.side_effect = [0, 0, 2, 5, 6, 6] + + mock_job = MagicMock() + mock_job.status.completion_time = None + mock_job.status.succeeded = 0 + mock_job.status.failed = None + mock_job.status.active = 1 + mock_job.status.conditions = None + + with patch.object(self.client, 'batch') as mock_batch, \ + patch('time.sleep'): + mock_batch.read_namespaced_job.return_value = mock_job + + result = self.client.wait_for_condition( + resource_type="job", + resource_name="test-job", + wait_condition_type="complete", + namespace="test-namespace", + timeout_seconds=1 + ) + + self.assertFalse(result) + + @patch('time.time') + def test_wait_for_condition_job_failed_success(self, mock_time): + """Test wait_for_condition for a failed job - detects failure""" + mock_time.side_effect = [0, 0, 1, 2, 2] + + mock_job = MagicMock() + mock_job.status.completion_time = None + mock_job.status.succeeded = 0 + mock_job.status.failed = 3 + mock_job.status.active = 0 + mock_job.status.conditions = None + + with patch.object(self.client, 'batch') as mock_batch, \ + patch('time.sleep'): + mock_batch.read_namespaced_job.return_value = mock_job + + result = self.client.wait_for_condition( + resource_type="job", + resource_name="test-job", + wait_condition_type="failed", + namespace="test-namespace", + timeout_seconds=5 + ) + + self.assertTrue(result) + + @patch('time.time') + def test_wait_for_condition_all_jobs_complete(self, mock_time): + """Test wait_for_condition for all jobs in a namespace - all complete""" + mock_time.side_effect = [0, 0, 1, 2, 2] + + mock_job1 = MagicMock() + mock_job1.status.completion_time = "2026-04-20T00:00:00Z" + mock_job1.status.succeeded = 1 + mock_job1.status.failed = None + mock_job1.status.active = None + mock_job1.status.conditions = None + + mock_job2 = MagicMock() + mock_job2.status.completion_time = "2026-04-20T00:01:00Z" + mock_job2.status.succeeded = 2 + mock_job2.status.failed = None + mock_job2.status.active = None + mock_job2.status.conditions = None + + with patch.object(self.client, 'batch') as mock_batch, \ + patch('time.sleep'): + mock_batch.list_namespaced_job.return_value.items = [mock_job1, mock_job2] + + result = self.client.wait_for_condition( + resource_type="job", + resource_name=None, + wait_condition_type="complete", + namespace="test-namespace", + timeout_seconds=5, + wait_all=True + ) + + self.assertTrue(result) + mock_batch.list_namespaced_job.assert_called_with(namespace="test-namespace") + + @patch('time.time') + def test_wait_for_condition_job_not_found(self, mock_time): + """Test wait_for_condition for a job that doesn't exist - times out""" + mock_time.side_effect = [0, 0, 2, 5, 6, 6] + + api_exception = ApiException(status=404, reason="Not Found") + + with patch.object(self.client, 'batch') as mock_batch, \ + patch('time.sleep'): + mock_batch.read_namespaced_job.side_effect = api_exception + + result = self.client.wait_for_condition( + resource_type="job", + resource_name="nonexistent-job", + wait_condition_type="complete", + namespace="test-namespace", + timeout_seconds=1 + ) + + self.assertFalse(result) + + def test_wait_for_condition_job_invalid_condition(self): + """Test wait_for_condition with invalid condition for job resource""" + with self.assertRaises(ValueError) as context: + self.client.wait_for_condition( + resource_type="job", + resource_name="test-job", + wait_condition_type="available", + namespace="test-namespace", + timeout_seconds=1 + ) + + self.assertIn("Invalid condition 'available' for resource type 'job'", str(context.exception)) + self.assertIn("Valid conditions: complete, failed", str(context.exception)) + # Tests for the enhanced apply_manifest_from_file method with folder support @patch('os.path.isdir') @patch('os.path.isfile') @@ -4020,6 +4204,42 @@ def test_delete_single_manifest_statefulset_no_namespace(self): self.assertIn("StatefulSet requires a namespace", str(context.exception)) + def test_delete_single_manifest_job(self): + """Test deleting a single Job manifest.""" + manifest = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": {"name": "test-job", "namespace": "test-namespace"}, + "spec": {} + } + + with patch.object(self.client, 'batch') as mock_batch: + mock_batch.delete_namespaced_job.return_value = None + + # pylint: disable=protected-access + self.client._delete_single_manifest(manifest) + + mock_batch.delete_namespaced_job.assert_called_once_with( + name="test-job", + namespace="test-namespace", + body=unittest.mock.ANY + ) + + def test_delete_single_manifest_job_no_namespace(self): + """Test _delete_single_manifest with Job missing namespace raises error.""" + manifest = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": {"name": "test-job"}, + "spec": {} + } + + with self.assertRaises(Exception) as context: + # pylint: disable=protected-access + self.client._delete_single_manifest(manifest) + + self.assertIn("Job requires a namespace", str(context.exception)) + def test_delete_single_manifest_service(self): """Test deleting a single service manifest.""" manifest = { diff --git a/modules/python/tests/crud/test_azure_node_pool_crud.py b/modules/python/tests/crud/test_azure_node_pool_crud.py index 9064a5af4a..7baabe28df 100644 --- a/modules/python/tests/crud/test_azure_node_pool_crud.py +++ b/modules/python/tests/crud/test_azure_node_pool_crud.py @@ -393,5 +393,75 @@ def test_create_deployment_partial_success(self): # Verify create_template was called 3 times (attempted all deployments) self.assertEqual(mock_k8s_client.create_template.call_count, 3) + def test_create_job_success(self): + """Test successful job creation""" + # Setup + mock_k8s_client = mock.MagicMock() + self.mock_aks_client.k8s_client = mock_k8s_client + # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop + mock_k8s_client.create_template.return_value = "apiVersion: batch/v1\nkind: Job\n" + mock_k8s_client.wait_for_condition.return_value = True + + # Execute + result = self.node_pool_crud.create_job(node_pool_name="test-pool") + + # Verify + self.assertTrue(result) + + def test_create_job_failure(self): + """Test job creation failure""" + # Setup + mock_k8s_client = mock.MagicMock() + self.mock_aks_client.k8s_client = mock_k8s_client + # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop + mock_k8s_client.create_template.return_value = "apiVersion: batch/v1\nkind: Job\n" + mock_k8s_client.wait_for_condition.return_value = False + + # Execute + result = self.node_pool_crud.create_job(node_pool_name="test-pool") + + # Verify + self.assertFalse(result) + + def test_create_job_no_client(self): + """Test job creation with no Kubernetes client""" + # Setup + self.mock_aks_client.k8s_client = None + + # Execute + result = self.node_pool_crud.create_job(node_pool_name="test-pool") + + # Verify + self.assertFalse(result) + + def test_create_job_partial_success(self): + """Test job creation when some jobs succeed and others fail""" + # Setup + mock_k8s_client = mock.MagicMock() + self.mock_aks_client.k8s_client = mock_k8s_client + + # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop + mock_k8s_client.create_template.return_value = "apiVersion: batch/v1\nkind: Job\n" + + # Simulate: job 1 succeeds, job 2 fails, job 3 succeeds + # wait_for_condition returns True/False for each job + mock_k8s_client.wait_for_condition.side_effect = [True, False, True] + + # Execute - request 3 jobs + result = self.node_pool_crud.create_job( + node_pool_name="test-pool", + number_of_jobs=3, + completions=5 + ) + + # Verify - should return False (not all jobs succeeded) + self.assertFalse(result) + + # Verify wait_for_condition was called 3 times (once per job) + self.assertEqual(mock_k8s_client.wait_for_condition.call_count, 3) + + # Verify create_template was called 3 times (attempted all jobs) + self.assertEqual(mock_k8s_client.create_template.call_count, 3) + if __name__ == "__main__": unittest.main() diff --git a/steps/engine/crud/k8s/execute.yml b/steps/engine/crud/k8s/execute.yml index bd3fbef2e3..7686202438 100644 --- a/steps/engine/crud/k8s/execute.yml +++ b/steps/engine/crud/k8s/execute.yml @@ -10,6 +10,7 @@ parameters: step_wait_time: 30 gpu_node_pool: false count: 1 + completions: 1 replicas: 10 manifest_dir: "" @@ -86,6 +87,33 @@ steps: REPLICAS: ${{ parameters.replicas }} MANIFEST_DIR: ${{ parameters.manifest_dir }} + - script: | + set -eo pipefail + + # Deploy Jobs + PYTHONPATH=$PYTHONPATH:$(pwd) python3 "$PYTHON_SCRIPT_FILE" job \ + --cloud "$CLOUD" \ + --run-id "$RUN_ID" \ + --result-dir "$RESULT_DIR" \ + --node-pool-name "$POOL_NAME" \ + --count "$COUNT" \ + --completions "$COMPLETIONS" \ + ${MANIFEST_DIR:+--manifest-dir "$MANIFEST_DIR"} \ + --step-timeout "$STEP_TIME_OUT" \ + ${GPU_NODE_POOL:+--gpu-node-pool} + displayName: 'Execute K8s Job operations for ${{ parameters.cloud }}' + workingDirectory: modules/python + env: + PYTHON_SCRIPT_FILE: $(Pipeline.Workspace)/s/modules/python/crud/main.py + POOL_NAME: ${{ parameters.pool_name }} + CLOUD: ${{ parameters.cloud }} + STEP_TIME_OUT: ${{ parameters.step_time_out }} + RESULT_DIR: $(System.DefaultWorkingDirectory)/$(RUN_ID) + GPU_NODE_POOL: ${{ parameters.gpu_node_pool }} + COUNT: ${{ parameters.count }} + COMPLETIONS: ${{ parameters.completions }} + MANIFEST_DIR: ${{ parameters.manifest_dir }} + - script: | set -eo pipefail diff --git a/steps/topology/k8s-crud-gpu/execute-crud.yml b/steps/topology/k8s-crud-gpu/execute-crud.yml index 1c2b02d9c6..7301e9b23f 100644 --- a/steps/topology/k8s-crud-gpu/execute-crud.yml +++ b/steps/topology/k8s-crud-gpu/execute-crud.yml @@ -23,5 +23,6 @@ steps: gpu_node_pool: $(GPU_NODE_POOL) step_wait_time: $(STEP_WAIT_TIME) count: $(COUNT) + completions: $(COMPLETIONS) replicas: $(REPLICAS) manifest_dir: $(MANIFEST_DIR)