Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion modules/python/clients/kubernetes_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,8 @@ def wait_for_condition(self, resource_type: str, wait_condition_type: str, names
valid_conditions = {
'deployment': ['available', 'progressing', 'replicafailure', 'ready'],
'deployments': ['available', 'progressing', 'replicafailure', 'ready'],
# Add more resource types as needed
'statefulset': ['ready'],
'statefulsets': ['ready'],
}

# Validate wait_condition_type format and type
Expand Down Expand Up @@ -1049,6 +1050,9 @@ def _check_resource_condition(self, resource_type: str, resource_name: str, cond
if resource_type_lower in ['deployment', 'deployments']:
return self._check_deployment_condition(resource_name, condition_type, namespace, wait_all)

if resource_type_lower in ['statefulset', 'statefulsets']:
return self._check_statefulset_condition(resource_name, namespace, wait_all)

logger.warning(f"Unsupported resource type for condition checking: {resource_type}")
return False

Expand Down Expand Up @@ -1079,6 +1083,38 @@ def _check_deployment_condition(self, resource_name: str, condition_type: str, n
return False
raise e

def _check_statefulset_condition(self, resource_name: str, namespace: str, wait_all: bool) -> bool:
"""Check statefulset condition (e.g., 'ready')."""
try:
if wait_all or not resource_name:
statefulsets = self.app.list_namespaced_stateful_set(namespace=namespace).items
else:
statefulset = self.app.read_namespaced_stateful_set(name=resource_name, namespace=namespace)
statefulsets = [statefulset]

for statefulset in statefulsets:
if not self._is_statefulset_ready(statefulset):
return False

return True

except client.rest.ApiException as e:
if e.status == 404:
logger.debug("StatefulSet not found, waiting...")
return False
raise e

def _is_statefulset_ready(self, statefulset) -> bool:
"""Check if a statefulset has all replicas ready."""
status = statefulset.status
spec_replicas = statefulset.spec.replicas or 0
return (
status is not None
and status.ready_replicas is not None
and status.ready_replicas == spec_replicas
and spec_replicas > 0
)

def _is_deployment_condition_met(self, deployment, condition_type: str) -> bool:
"""Check if a deployment meets the specified condition."""
if not deployment.status or not deployment.status.conditions:
Expand Down
137 changes: 137 additions & 0 deletions modules/python/crud/azure/node_pool_crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,140 @@ def create_deployment(
return True
logger.warning("Created %d/%d deployment(s)", successful_deployments, number_of_deployments)
return False

def _apply_statefulset(
self,
k8s_client,
node_pool_name,
statefulset_index,
replicas,
manifest_dir,
label_selector,
namespace
):
"""Helper for create_statefulset — applies and verifies a single StatefulSet."""
if manifest_dir:
# Use the template path from manifest_dir
template_path = f"{manifest_dir}/statefulset.yml"
else:
# Use default template path (relative to workingDirectory: modules/python)
template_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..", "workload_templates", "statefulset.yml"
)

# Generate StatefulSet name
statefulset_name = f"myapp-{node_pool_name}-{statefulset_index}"

# Use per-statefulset label to avoid selector collision
statefulset_label = f"{label_selector.split('=', 1)[-1]}-{statefulset_index}"

# Create StatefulSet template using k8s_client.create_template
statefulset_template = k8s_client.create_template(
template_path,
{
"STATEFULSET_REPLICAS": replicas,
"NODE_POOL_NAME": node_pool_name,
"INDEX": statefulset_index,
"LABEL_VALUE": statefulset_label,
}
)

# Apply each document in the rendered multi-doc template
for doc in yaml.safe_load_all(statefulset_template):
if doc:
k8s_client.apply_manifest_from_file(manifest_dict=doc, namespace=namespace)

logger.info("Applied manifest for StatefulSet %s", statefulset_name)

# Wait for statefulset to be ready (successful statefulset verification)
logger.info("Waiting for StatefulSet %s to become ready...", statefulset_name)
statefulset_ready = k8s_client.wait_for_condition(
resource_type="statefulset",
wait_condition_type="ready",
resource_name=statefulset_name,
namespace=namespace,
timeout_seconds=self.step_timeout
)

if not statefulset_ready:
raise TimeoutError(
f"StatefulSet {statefulset_name} failed to become ready within timeout"
)

logger.info("StatefulSet %s is successfully ready", statefulset_name)

# Additionally wait for pods to be ready
logger.info("Waiting for pods of StatefulSet %s to be ready...", statefulset_name)
k8s_client.wait_for_pods_ready(
operation_timeout_in_minutes=5,
namespace=namespace,
pod_count=replicas,
label_selector=f"app={statefulset_label}"
)

logger.info("Successfully created and verified StatefulSet %d", statefulset_index)

def create_statefulset(
self,
node_pool_name,
replicas=10,
manifest_dir=None,
number_of_statefulsets=1,
label_selector="app=nginx-container",
namespace="default"
):
"""
Create Kubernetes StatefulSets after node pool operations.

Args:
node_pool_name: Name of the node pool to target
namespace: Kubernetes namespace (default: "default")
replicas: Number of replicas for the StatefulSet (default: 10)
manifest_dir: Directory containing Kubernetes manifest files
number_of_statefulsets: Number of StatefulSets to create (default: 1)
label_selector: Label selector for pods (default: "app=nginx-container")

Returns:
True if all StatefulSet creations were successful, False otherwise
"""
logger.info("Creating %d StatefulSet(s)", number_of_statefulsets)
logger.info("Target node pool: %s", node_pool_name)
logger.info("Replicas per StatefulSet: %d", replicas)
logger.info("Using manifest directory: %s", manifest_dir)

# Get Kubernetes client from AKS client
k8s_client = self.aks_client.k8s_client

if not k8s_client:
logger.error("Kubernetes client not available")
return False

successful_statefulsets = 0

# Loop through number of StatefulSets
for statefulset_index in range(1, number_of_statefulsets + 1):
logger.info("Creating StatefulSet %d/%d", statefulset_index, number_of_statefulsets)

try:
self._apply_statefulset(
k8s_client=k8s_client,
node_pool_name=node_pool_name,
statefulset_index=statefulset_index,
replicas=replicas,
manifest_dir=manifest_dir,
label_selector=label_selector,
namespace=namespace
)
successful_statefulsets += 1
except Exception as e:
logger.error("Failed to create StatefulSet %d: %s", statefulset_index, e)
# Continue with next StatefulSet instead of failing completely
continue

# Check if all StatefulSets were successful
if successful_statefulsets == number_of_statefulsets:
logger.info("Successfully created all %d StatefulSet(s)", number_of_statefulsets)
return True
logger.warning("Created %d/%d StatefulSet(s)", successful_statefulsets, number_of_statefulsets)
return False
25 changes: 24 additions & 1 deletion modules/python/crud/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def handle_node_pool_operation(node_pool_crud, args):
return 1

def handle_workload_operations(node_pool_crud, args):
"""Handle workload operations (deployment, statefulset, jobs) based on the command"""
"""Handle workload operations (deployment, statefulset) based on the command"""
command = args.command
result = None

Expand All @@ -167,6 +167,21 @@ def handle_workload_operations(node_pool_crud, args):
}

result = node_pool_crud.create_deployment(**deploy_kwargs)
elif command == "statefulset":
if not hasattr(node_pool_crud, 'create_statefulset'):
logger.error("Cloud provider does not support statefulset workload operations")
return 1

# Prepare statefulset arguments
statefulset_kwargs = {
"node_pool_name": args.node_pool_name,
"replicas": args.replicas,
"manifest_dir": args.manifest_dir,
"number_of_statefulsets": args.count,
"label_selector": args.label_selector,
}

result = node_pool_crud.create_statefulset(**statefulset_kwargs)
else:
logger.error("Unknown workload command: '%s'", command)
return 1
Expand Down Expand Up @@ -386,6 +401,14 @@ def main():
)
deployment_parser.set_defaults(func=handle_workload_operations)

# StatefulSet command
statefulset_parser = subparsers.add_parser(
"statefulset",
parents=[common_parser, workload_common_parser],
help="create statefulsets"
)
statefulset_parser.set_defaults(func=handle_workload_operations)

# Arguments provided, run node pool operations and collect benchmark results
try:
args = parser.parse_args()
Expand Down
34 changes: 34 additions & 0 deletions modules/python/crud/workload_templates/statefulset.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
labels:
app: {{LABEL_VALUE}}
spec:
serviceName: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
replicas: {{STATEFULSET_REPLICAS}}
selector:
matchLabels:
app: {{LABEL_VALUE}}
template:
metadata:
labels:
app: {{LABEL_VALUE}}
spec:
containers:
- name: {{LABEL_VALUE}}
image: mcr.microsoft.com/oss/nginx/nginx:1.21.6
ports:
- containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
name: myapp-{{NODE_POOL_NAME}}-{{INDEX}}
spec:
ports:
- port: 80
name: myapp
clusterIP: None
selector:
app: {{LABEL_VALUE}}
71 changes: 71 additions & 0 deletions modules/python/tests/clients/test_kubernetes_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3407,6 +3407,77 @@ def test_wait_for_condition_case_insensitive(self):
)
self.assertTrue(result)

@patch('time.time')
def test_wait_for_condition_statefulset_success(self, mock_time):
"""Test wait_for_condition for statefulset - success case"""
mock_time.side_effect = [0, 0, 1, 2, 2]

mock_statefulset = MagicMock()
mock_statefulset.spec.replicas = 3
mock_statefulset.status.ready_replicas = 3

with patch.object(self.client, 'app') as mock_app, \
patch('time.sleep'):
mock_app.read_namespaced_stateful_set.return_value = mock_statefulset

result = self.client.wait_for_condition(
resource_type="statefulset",
resource_name="test-statefulset",
wait_condition_type="ready",
namespace="test-namespace",
timeout_seconds=5
)

self.assertTrue(result)
mock_app.read_namespaced_stateful_set.assert_called_with(
name="test-statefulset",
namespace="test-namespace"
)

@patch('time.time')
def test_wait_for_condition_statefulset_timeout(self, mock_time):
"""Test wait_for_condition for statefulset - timeout case"""
mock_time.side_effect = [0, 0, 2, 5, 6, 6]

mock_statefulset = MagicMock()
mock_statefulset.spec.replicas = 3
mock_statefulset.status.ready_replicas = 1

with patch.object(self.client, 'app') as mock_app, \
patch('time.sleep'):
mock_app.read_namespaced_stateful_set.return_value = mock_statefulset

result = self.client.wait_for_condition(
resource_type="statefulset",
resource_name="test-statefulset",
wait_condition_type="ready",
namespace="test-namespace",
timeout_seconds=1
)

self.assertFalse(result)

@patch('time.time')
def test_wait_for_condition_statefulset_not_found(self, mock_time):
"""Test wait_for_condition for statefulset - not found case"""
mock_time.side_effect = [0, 0, 2, 5, 6, 6]

api_exception = ApiException(status=404, reason="Not Found")

with patch.object(self.client, 'app') as mock_app, \
patch('time.sleep'):
mock_app.read_namespaced_stateful_set.side_effect = api_exception

result = self.client.wait_for_condition(
resource_type="statefulset",
resource_name="nonexistent",
wait_condition_type="ready",
namespace="test-namespace",
timeout_seconds=1
)

self.assertFalse(result)

# Tests for the enhanced apply_manifest_from_file method with folder support
@patch('os.path.isdir')
@patch('os.path.isfile')
Expand Down
Loading
Loading