diff --git a/modules/python/clients/kubernetes_client.py b/modules/python/clients/kubernetes_client.py
index 60838d2ce4..2785dbfd17 100644
--- a/modules/python/clients/kubernetes_client.py
+++ b/modules/python/clients/kubernetes_client.py
@@ -975,7 +975,8 @@ def wait_for_condition(self, resource_type: str, wait_condition_type: str, names
         valid_conditions = {
             'deployment': ['available', 'progressing', 'replicafailure', 'ready'],
             'deployments': ['available', 'progressing', 'replicafailure', 'ready'],
-            # Add more resource types as needed
+            'statefulset': ['ready'],
+            'statefulsets': ['ready'],
         }
 
         # Validate wait_condition_type format and type
@@ -1049,6 +1050,9 @@ def _check_resource_condition(self, resource_type: str, resource_name: str, cond
         if resource_type_lower in ['deployment', 'deployments']:
             return self._check_deployment_condition(resource_name, condition_type, namespace, wait_all)
 
+        if resource_type_lower in ['statefulset', 'statefulsets']:
+            return self._check_statefulset_condition(resource_name, namespace, wait_all)
+
         logger.warning(f"Unsupported resource type for condition checking: {resource_type}")
         return False
 
@@ -1079,6 +1083,38 @@ def _check_deployment_condition(self, resource_name: str, condition_type: str, n
                 return False
             raise e
 
+    def _check_statefulset_condition(self, resource_name: str, namespace: str, wait_all: bool) -> bool:
+        """Check statefulset condition (e.g., 'ready')."""
+        try:
+            if wait_all or not resource_name:
+                statefulsets = self.app.list_namespaced_stateful_set(namespace=namespace).items
+            else:
+                statefulset = self.app.read_namespaced_stateful_set(name=resource_name, namespace=namespace)
+                statefulsets = [statefulset]
+
+            for statefulset in statefulsets:
+                if not self._is_statefulset_ready(statefulset):
+                    return False
+
+            return True
+
+        except client.rest.ApiException as e:
+            if e.status == 404:
+                logger.debug("StatefulSet not found, waiting...")
+                return False
+            raise e
+
+    def _is_statefulset_ready(self, statefulset) -> bool:
+        """Check if a statefulset has all replicas ready."""
+        status = statefulset.status
+        spec_replicas = statefulset.spec.replicas or 0
+        return (
+            status is not None
+            and status.ready_replicas is not None
+            and status.ready_replicas == spec_replicas
+            and spec_replicas > 0
+        )
+
     def _is_deployment_condition_met(self, deployment, condition_type: str) -> bool:
         """Check if a deployment meets the specified condition."""
         if not deployment.status or not deployment.status.conditions:
diff --git a/modules/python/crud/azure/node_pool_crud.py b/modules/python/crud/azure/node_pool_crud.py
index 208589d3d7..f25fd98b3f 100644
--- a/modules/python/crud/azure/node_pool_crud.py
+++ b/modules/python/crud/azure/node_pool_crud.py
@@ -409,3 +409,140 @@ def create_deployment(
             return True
         logger.warning("Created %d/%d deployment(s)", successful_deployments, number_of_deployments)
         return False
+
+    def _apply_statefulset(
+        self,
+        k8s_client,
+        node_pool_name,
+        statefulset_index,
+        replicas,
+        manifest_dir,
+        label_selector,
+        namespace
+    ):
+        """Helper for create_statefulset — applies and verifies a single StatefulSet."""
+        if manifest_dir:
+            # Use the template path from manifest_dir
+            template_path = f"{manifest_dir}/statefulset.yml"
+        else:
+            # Use default template path (relative to workingDirectory: modules/python)
+            template_path = os.path.join(
+                os.path.dirname(os.path.abspath(__file__)),
+                "..", "workload_templates", "statefulset.yml"
+            )
+
+        # Generate StatefulSet name
+        statefulset_name = f"myapp-{node_pool_name}-{statefulset_index}"
+
+        # Use per-statefulset label to avoid selector collision
+        statefulset_label = f"{label_selector.split('=', 1)[-1]}-{statefulset_index}"
+
+        # Create StatefulSet template using k8s_client.create_template
+        statefulset_template = k8s_client.create_template(
+            template_path,
+            {
+                "STATEFULSET_REPLICAS": replicas,
+                "NODE_POOL_NAME": node_pool_name,
+                "INDEX": statefulset_index,
+                "LABEL_VALUE": statefulset_label,
+            }
+        )
+
+        # Apply each document in the rendered multi-doc template
+        for doc in yaml.safe_load_all(statefulset_template):
+            if doc:
+                k8s_client.apply_manifest_from_file(manifest_dict=doc, namespace=namespace)
+
+        logger.info("Applied manifest for StatefulSet %s", statefulset_name)
+
+        # Wait for statefulset to be ready (successful statefulset verification)
+        logger.info("Waiting for StatefulSet %s to become ready...", statefulset_name)
+        statefulset_ready = k8s_client.wait_for_condition(
+            resource_type="statefulset",
+            wait_condition_type="ready",
+            resource_name=statefulset_name,
+            namespace=namespace,
+            timeout_seconds=self.step_timeout
+        )
+
+        if not statefulset_ready:
+            raise TimeoutError(
+                f"StatefulSet {statefulset_name} failed to become ready within timeout"
+            )
+
+        logger.info("StatefulSet %s is successfully ready", statefulset_name)
+
+        # Additionally wait for pods to be ready
+        logger.info("Waiting for pods of StatefulSet %s to be ready...", statefulset_name)
+        k8s_client.wait_for_pods_ready(
+            operation_timeout_in_minutes=5,
+            namespace=namespace,
+            pod_count=replicas,
+            label_selector=f"app={statefulset_label}"
+        )
+
+        logger.info("Successfully created and verified StatefulSet %s", statefulset_name)
+
+    def create_statefulset(
+        self,
+        node_pool_name,
+        replicas=10,
+        manifest_dir=None,
+        number_of_statefulsets=1,
+        label_selector="app=nginx-container",
+        namespace="default"
+    ):
+        """
+        Create Kubernetes StatefulSets after node pool operations.
+
+        Args:
+            node_pool_name: Name of the node pool to target
+            replicas: Number of replicas for the StatefulSet (default: 10)
+            manifest_dir: Directory containing Kubernetes manifest files
+            number_of_statefulsets: Number of StatefulSets to create (default: 1)
+            label_selector: Label selector for pods (default: "app=nginx-container")
+            namespace: Kubernetes namespace (default: "default")
+
+        Returns:
+            True if all StatefulSet creations were successful, False otherwise
+        """
+        logger.info("Creating %d StatefulSet(s)", number_of_statefulsets)
+        logger.info("Target node pool: %s", node_pool_name)
+        logger.info("Replicas per StatefulSet: %d", replicas)
+        logger.info("Using manifest directory: %s", manifest_dir)
+
+        # Get Kubernetes client from AKS client
+        k8s_client = self.aks_client.k8s_client
+
+        if not k8s_client:
+            logger.error("Kubernetes client not available")
+            return False
+
+        successful_statefulsets = 0
+
+        # Loop through number of StatefulSets
+        for statefulset_index in range(1, number_of_statefulsets + 1):
+            logger.info("Creating StatefulSet %d/%d", statefulset_index, number_of_statefulsets)
+
+            try:
+                self._apply_statefulset(
+                    k8s_client=k8s_client,
+                    node_pool_name=node_pool_name,
+                    statefulset_index=statefulset_index,
+                    replicas=replicas,
+                    manifest_dir=manifest_dir,
+                    label_selector=label_selector,
+                    namespace=namespace
+                )
+                successful_statefulsets += 1
+            except Exception as e:
+                logger.error("Failed to create StatefulSet %d: %s", statefulset_index, e)
+                # Continue with next StatefulSet instead of failing completely
+                continue
+
+        # Check if all StatefulSets were successful
+        if successful_statefulsets == number_of_statefulsets:
+            logger.info("Successfully created all %d StatefulSet(s)", number_of_statefulsets)
+            return True
+        logger.warning("Created %d/%d StatefulSet(s)", successful_statefulsets, number_of_statefulsets)
+        return False
diff --git a/modules/python/crud/main.py b/modules/python/crud/main.py
index 635d14462a..23ff741fbf 100644
--- a/modules/python/crud/main.py
+++ b/modules/python/crud/main.py
@@ -147,7 +147,7 @@ def handle_node_pool_operation(node_pool_crud, args):
         return 1
 
 def handle_workload_operations(node_pool_crud, args):
-    """Handle workload operations (deployment, statefulset, jobs) based on the command"""
+    """Handle workload operations (deployment, statefulset) based on the command"""
     command = args.command
     result = None
 
@@ -167,6 +167,21 @@ def handle_workload_operations(node_pool_crud, args):
         }
 
         result = node_pool_crud.create_deployment(**deploy_kwargs)
+    elif command == "statefulset":
+        if not hasattr(node_pool_crud, 'create_statefulset'):
+            logger.error("Cloud provider does not support statefulset workload operations")
+            return 1
+
+        # Prepare statefulset arguments
+        statefulset_kwargs = {
+            "node_pool_name": args.node_pool_name,
+            "replicas": args.replicas,
+            "manifest_dir": args.manifest_dir,
+            "number_of_statefulsets": args.count,
+            "label_selector": args.label_selector,
+        }
+
+        result = node_pool_crud.create_statefulset(**statefulset_kwargs)
     else:
         logger.error("Unknown workload command: '%s'", command)
         return 1
@@ -386,6 +401,14 @@ def main():
     )
     deployment_parser.set_defaults(func=handle_workload_operations)
 
+    # StatefulSet command
+    statefulset_parser = subparsers.add_parser(
+        "statefulset",
+        parents=[common_parser, workload_common_parser],
+        help="create statefulsets"
+    )
+    statefulset_parser.set_defaults(func=handle_workload_operations)
+
     # Arguments provided, run node pool operations and collect benchmark results
     try:
         args = parser.parse_args()
diff --git a/modules/python/crud/workload_templates/statefulset.yml b/modules/python/crud/workload_templates/statefulset.yml
new file mode 100644
index 0000000000..c85725316c
--- /dev/null
+++ b/modules/python/crud/workload_templates/statefulset.yml
@@ -0,0 +1,34 @@
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
+  labels:
+    app: {{LABEL_VALUE}}
+spec:
+  serviceName: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
+  replicas: {{STATEFULSET_REPLICAS}}
+  selector:
+    matchLabels:
+      app: {{LABEL_VALUE}}
+  template:
+    metadata:
+      labels:
+        app: {{LABEL_VALUE}}
+    spec:
+      containers:
+      - name: {{LABEL_VALUE}}
+        image: mcr.microsoft.com/oss/nginx/nginx:1.21.6
+        ports:
+        - containerPort: 80
+---
+apiVersion: v1
+kind: Service
+metadata:
+  name: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
+spec:
+  ports:
+  - port: 80
+    name: myapp
+  clusterIP: None
+  selector:
+    app: {{LABEL_VALUE}}
diff --git a/modules/python/tests/clients/test_kubernetes_client.py b/modules/python/tests/clients/test_kubernetes_client.py
index 8190789361..a67d52fef2 100644
--- a/modules/python/tests/clients/test_kubernetes_client.py
+++ b/modules/python/tests/clients/test_kubernetes_client.py
@@ -3407,6 +3407,77 @@ def test_wait_for_condition_case_insensitive(self):
         )
         self.assertTrue(result)
 
+    @patch('time.time')
+    def test_wait_for_condition_statefulset_success(self, mock_time):
+        """Test wait_for_condition for statefulset - success case"""
+        mock_time.side_effect = [0, 0, 1, 2, 2]
+
+        mock_statefulset = MagicMock()
+        mock_statefulset.spec.replicas = 3
+        mock_statefulset.status.ready_replicas = 3
+
+        with patch.object(self.client, 'app') as mock_app, \
+             patch('time.sleep'):
+            mock_app.read_namespaced_stateful_set.return_value = mock_statefulset
+
+            result = self.client.wait_for_condition(
+                resource_type="statefulset",
+                resource_name="test-statefulset",
+                wait_condition_type="ready",
+                namespace="test-namespace",
+                timeout_seconds=5
+            )
+
+            self.assertTrue(result)
+            mock_app.read_namespaced_stateful_set.assert_called_with(
+                name="test-statefulset",
+                namespace="test-namespace"
+            )
+
+    @patch('time.time')
+    def test_wait_for_condition_statefulset_timeout(self, mock_time):
+        """Test wait_for_condition for statefulset - timeout case"""
+        mock_time.side_effect = [0, 0, 2, 5, 6, 6]
+
+        mock_statefulset = MagicMock()
+        mock_statefulset.spec.replicas = 3
+        mock_statefulset.status.ready_replicas = 1
+
+        with patch.object(self.client, 'app') as mock_app, \
+             patch('time.sleep'):
+            mock_app.read_namespaced_stateful_set.return_value = mock_statefulset
+
+            result = self.client.wait_for_condition(
+                resource_type="statefulset",
+                resource_name="test-statefulset",
+                wait_condition_type="ready",
+                namespace="test-namespace",
+                timeout_seconds=1
+            )
+
+            self.assertFalse(result)
+
+    @patch('time.time')
+    def test_wait_for_condition_statefulset_not_found(self, mock_time):
+        """Test wait_for_condition for statefulset - not found case"""
+        mock_time.side_effect = [0, 0, 2, 5, 6, 6]
+
+        api_exception = ApiException(status=404, reason="Not Found")
+
+        with patch.object(self.client, 'app') as mock_app, \
+             patch('time.sleep'):
+            mock_app.read_namespaced_stateful_set.side_effect = api_exception
+
+            result = self.client.wait_for_condition(
+                resource_type="statefulset",
+                resource_name="nonexistent",
+                wait_condition_type="ready",
+                namespace="test-namespace",
+                timeout_seconds=1
+            )
+
+            self.assertFalse(result)
+
     # Tests for the enhanced apply_manifest_from_file method with folder support
     @patch('os.path.isdir')
     @patch('os.path.isfile')
diff --git a/modules/python/tests/crud/test_azure_node_pool_crud.py b/modules/python/tests/crud/test_azure_node_pool_crud.py
index 9064a5af4a..20ba49e558 100644
--- a/modules/python/tests/crud/test_azure_node_pool_crud.py
+++ b/modules/python/tests/crud/test_azure_node_pool_crud.py
@@ -393,5 +393,75 @@ def test_create_deployment_partial_success(self):
         # Verify create_template was called 3 times (attempted all deployments)
         self.assertEqual(mock_k8s_client.create_template.call_count, 3)
 
+    def test_create_statefulset_success(self):
+        """Test successful statefulset creation"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: apps/v1\nkind: StatefulSet\n"
+        mock_k8s_client.wait_for_condition.return_value = True
+
+        # Execute
+        result = self.node_pool_crud.create_statefulset(node_pool_name="test-pool")
+
+        # Verify
+        self.assertTrue(result)
+
+    def test_create_statefulset_failure(self):
+        """Test statefulset creation failure"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: apps/v1\nkind: StatefulSet\n"
+        mock_k8s_client.wait_for_condition.return_value = False
+
+        # Execute
+        result = self.node_pool_crud.create_statefulset(node_pool_name="test-pool")
+
+        # Verify
+        self.assertFalse(result)
+
+    def test_create_statefulset_no_client(self):
+        """Test statefulset creation with no Kubernetes client"""
+        # Setup
+        self.mock_aks_client.k8s_client = None
+
+        # Execute
+        result = self.node_pool_crud.create_statefulset(node_pool_name="test-pool")
+
+        # Verify
+        self.assertFalse(result)
+
+    def test_create_statefulset_partial_success(self):
+        """Test StatefulSet creation when some StatefulSets succeed and others fail"""
+        # Setup
+        mock_k8s_client = mock.MagicMock()
+        self.mock_aks_client.k8s_client = mock_k8s_client
+
+        # Must return a real string - yaml.safe_load_all(MagicMock()) causes an infinite loop
+        mock_k8s_client.create_template.return_value = "apiVersion: apps/v1\nkind: StatefulSet\n"
+
+        # Simulate: StatefulSet 1 succeeds, StatefulSet 2 fails, StatefulSet 3 succeeds
+        # wait_for_condition returns True/False for each StatefulSet
+        mock_k8s_client.wait_for_condition.side_effect = [True, False, True]
+
+        # Execute - request 3 StatefulSets
+        result = self.node_pool_crud.create_statefulset(
+            node_pool_name="test-pool",
+            number_of_statefulsets=3,
+            replicas=5
+        )
+
+        # Verify - should return False (not all statefulsets succeeded)
+        self.assertFalse(result)
+
+        # Verify wait_for_condition was called 3 times (once per statefulset)
+        self.assertEqual(mock_k8s_client.wait_for_condition.call_count, 3)
+
+        # Verify create_template was called 3 times (attempted all statefulsets)
+        self.assertEqual(mock_k8s_client.create_template.call_count, 3)
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/pipelines/system/new-pipeline-test.yml b/pipelines/system/new-pipeline-test.yml
index 63d55f02d9..22b4435c8a 100644
--- a/pipelines/system/new-pipeline-test.yml
+++ b/pipelines/system/new-pipeline-test.yml
@@ -1,25 +1,33 @@
 trigger: none
 
 variables:
-  SCENARIO_TYPE:
-  SCENARIO_NAME:
+  SCENARIO_TYPE: system
+  SCENARIO_NAME: k8s-crud-statefulset-test
 
 stages:
-  - stage: # format: [_]+ (e.g. azure_eastus2, aws_eastus_westus)
+  - stage: azure_australiaeast
     dependsOn: []
     jobs:
-      - template: /jobs/competitive-test.yml # must keep as is
+      - template: /jobs/competitive-test.yml
         parameters:
-          cloud: # e.g. azure, aws
-          regions: # list of regions
-            - region1 # e.g. eastus2
-          topology: # e.g. cluster-autoscaler
-          engine: # e.g. clusterloader2
-          matrix: # list of test parameters to customize the provisioned resources
-            :
-              :
-              :
-          max_parallel: # required
-          credential_type: service_connection # required
+          cloud: azure
+          regions:
+            - australiaeast
+          topology: k8s-crud-gpu
+          engine: crud
+          matrix:
+            statefulset_crud:
+              pool_name: testpool
+              vm_size: Standard_D4s_v3
+              create_node_count: 0
+              scale_node_count: 3
+              scale_step_size: 1
+              step_time_out: 600
+              step_wait_time: 30
+              count: 1
+              replicas: 10
+              manifest_dir: ""
+          max_parallel: 1
+          credential_type: service_connection
           ssh_key_enabled: false
-          timeout_in_minutes: 60 # if not specified, default is 60
+          timeout_in_minutes: 120
diff --git a/steps/engine/crud/k8s/execute.yml b/steps/engine/crud/k8s/execute.yml
index bd3fbef2e3..3191982a65 100644
--- a/steps/engine/crud/k8s/execute.yml
+++ b/steps/engine/crud/k8s/execute.yml
@@ -86,6 +86,33 @@ steps:
       REPLICAS: ${{ parameters.replicas }}
       MANIFEST_DIR: ${{ parameters.manifest_dir }}
 
+  - script: |
+      set -eo pipefail
+
+      # Deploy StatefulSets
+      PYTHONPATH=$PYTHONPATH:$(pwd) python3 "$PYTHON_SCRIPT_FILE" statefulset \
+        --cloud "$CLOUD" \
+        --run-id "$RUN_ID" \
+        --result-dir "$RESULT_DIR" \
+        --node-pool-name "$POOL_NAME" \
+        --count "$COUNT" \
+        --replicas "$REPLICAS" \
+        ${MANIFEST_DIR:+--manifest-dir "$MANIFEST_DIR"} \
+        --step-timeout "$STEP_TIME_OUT" \
+        ${GPU_NODE_POOL:+--gpu-node-pool}
+    displayName: 'Execute K8s StatefulSet operations for ${{ parameters.cloud }}'
+    workingDirectory: modules/python
+    env:
+      PYTHON_SCRIPT_FILE: $(Pipeline.Workspace)/s/modules/python/crud/main.py
+      POOL_NAME: ${{ parameters.pool_name }}
+      CLOUD: ${{ parameters.cloud }}
+      STEP_TIME_OUT: ${{ parameters.step_time_out }}
+      RESULT_DIR: $(System.DefaultWorkingDirectory)/$(RUN_ID)
+      GPU_NODE_POOL: ${{ parameters.gpu_node_pool }}
+      COUNT: ${{ parameters.count }}
+      REPLICAS: ${{ parameters.replicas }}
+      MANIFEST_DIR: ${{ parameters.manifest_dir }}
+
   - script: |
       set -eo pipefail