Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions internal/controller/appwrapper/appwrapper_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ var _ = Describe("AppWrapper Controller", func() {
awConfig.FaultTolerance.RetryLimit = 0
awConfig.FaultTolerance.SuccessTTL = 0 * time.Second
awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"] = append(awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"], v1.Taint{Key: "extra", Value: "test", Effect: v1.TaintEffectNoExecute})
awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"] = append(awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"], v1.Taint{Key: "extra2", Value: "test2", Effect: v1.TaintEffectPreferNoSchedule})

awReconciler = &AppWrapperReconciler{
Client: k8sClient,
Expand Down Expand Up @@ -187,11 +188,23 @@ var _ = Describe("AppWrapper Controller", func() {
mes := p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
for _, taint := range awReconciler.Config.Autopilot.ResourceTaints["nvidia.com/gpu"] {
found := false
for _, me := range mes {
if me.Key == taint.Key {
Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
Expect(me.Values).Should(ContainElement(taint.Value))
found = true
if taint.Effect == v1.TaintEffectNoExecute || taint.Effect == v1.TaintEffectNoSchedule {
for _, me := range mes {
if me.Key == taint.Key {
Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
Expect(me.Values).Should(ContainElement(taint.Value))
found = true
}
}
} else {
for _, st := range p.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
for _, me := range st.Preference.MatchExpressions {
if me.Key == taint.Key {
Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
Expect(me.Values).Should(ContainElement(taint.Value))
found = true
}
}
}
}
Expect(found).Should(BeTrue())
Expand Down
10 changes: 6 additions & 4 deletions internal/controller/appwrapper/node_health_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,12 @@ func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value {
quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
if !quantity.IsZero() {
noScheduleResources[v1.ResourceName(resourceName)] = *quantity
if taint.Effect == v1.TaintEffectNoExecute || taint.Effect == v1.TaintEffectNoSchedule {
if key == taint.Key && value == taint.Value {
quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
if !quantity.IsZero() {
noScheduleResources[v1.ResourceName(resourceName)] = *quantity
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/appwrapper/node_health_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))

By("Removing the EVICT label updates unhealthyNodes")
node.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
node.Labels["autopilot.ibm.com/gpuhealth"] = "WARN"
Expect(k8sClient.Update(ctx, node)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -151,7 +151,7 @@ var _ = Describe("NodeMonitor Controller", func() {

// remove another 4 gpus, lending limit should be 0 = max(0, 6-4-4)
node2 := getNode(node2Name.Name)
node2.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
node2.Labels["autopilot.ibm.com/gpuhealth"] = "TESTING"
Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
Expand Down
110 changes: 74 additions & 36 deletions internal/controller/appwrapper/resource_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
kresource "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -108,7 +109,7 @@ func hasResourceRequest(spec map[string]interface{}, resource string) bool {
return false
}

func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.NodeSelectorRequirement) error {
func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.NodeSelectorRequirement, required bool, weight int32) error {
if _, ok := spec["affinity"]; !ok {
spec["affinity"] = map[string]interface{}{}
}
Expand All @@ -123,44 +124,64 @@ func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.Nod
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity is not a map")
}
if _, ok := nodeAffinity["requiredDuringSchedulingIgnoredDuringExecution"]; !ok {
nodeAffinity["requiredDuringSchedulingIgnoredDuringExecution"] = map[string]interface{}{}
}
nodeSelector, ok := nodeAffinity["requiredDuringSchedulingIgnoredDuringExecution"].(map[string]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution is not a map")
}
if _, ok := nodeSelector["nodeSelectorTerms"]; !ok {
nodeSelector["nodeSelectorTerms"] = []interface{}{map[string]interface{}{}}
}
existingTerms, ok := nodeSelector["nodeSelectorTerms"].([]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms is not an array")
}
for idx, term := range existingTerms {
selTerm, ok := term.(map[string]interface{})
if required {
if _, ok := nodeAffinity["requiredDuringSchedulingIgnoredDuringExecution"]; !ok {
nodeAffinity["requiredDuringSchedulingIgnoredDuringExecution"] = map[string]interface{}{}
}
nodeSelector, ok := nodeAffinity["requiredDuringSchedulingIgnoredDuringExecution"].(map[string]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v] is not an map", idx)
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution is not a map")
}
if _, ok := selTerm["matchExpressions"]; !ok {
selTerm["matchExpressions"] = []interface{}{}
if _, ok := nodeSelector["nodeSelectorTerms"]; !ok {
nodeSelector["nodeSelectorTerms"] = []interface{}{map[string]interface{}{}}
}
matchExpressions, ok := selTerm["matchExpressions"].([]interface{})
existingTerms, ok := nodeSelector["nodeSelectorTerms"].([]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v].matchExpressions is not an map", idx)
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms is not an array")
}
for _, expr := range exprsToAdd {
bytes, err := json.Marshal(expr)
if err != nil {
return fmt.Errorf("marshalling selectorTerm %v: %w", expr, err)
for idx, term := range existingTerms {
selTerm, ok := term.(map[string]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v] is not an map", idx)
}
var obj interface{}
if err = json.Unmarshal(bytes, &obj); err != nil {
return fmt.Errorf("unmarshalling selectorTerm %v: %w", expr, err)
if _, ok := selTerm["matchExpressions"]; !ok {
selTerm["matchExpressions"] = []interface{}{}
}
matchExpressions = append(matchExpressions, obj)
matchExpressions, ok := selTerm["matchExpressions"].([]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v].matchExpressions is not an map", idx)
}
for _, expr := range exprsToAdd {
bytes, err := json.Marshal(expr)
if err != nil {
return fmt.Errorf("marshalling selectorTerm %v: %w", expr, err)
}
var obj interface{}
if err = json.Unmarshal(bytes, &obj); err != nil {
return fmt.Errorf("unmarshalling selectorTerm %v: %w", expr, err)
}
matchExpressions = append(matchExpressions, obj)
}
selTerm["matchExpressions"] = matchExpressions
}
} else {
if _, ok := nodeAffinity["preferredDuringSchedulingIgnoredDuringExecution"]; !ok {
nodeAffinity["preferredDuringSchedulingIgnoredDuringExecution"] = []interface{}{}
}
selTerm["matchExpressions"] = matchExpressions
terms, ok := nodeAffinity["preferredDuringSchedulingIgnoredDuringExecution"].([]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.preferredDuringSchedulingIgnoredDuringExecution is not an array")
}
bytes, err := json.Marshal(v1.PreferredSchedulingTerm{Weight: weight, Preference: v1.NodeSelectorTerm{MatchExpressions: exprsToAdd}})
if err != nil {
return fmt.Errorf("marshalling selectorTerms %v: %w", exprsToAdd, err)
}
var obj interface{}
if err = json.Unmarshal(bytes, &obj); err != nil {
return fmt.Errorf("unmarshalling selectorTerms %v: %w", exprsToAdd, err)
}
terms = append(terms, obj)
nodeAffinity["preferredDuringSchedulingIgnoredDuringExecution"] = terms
}

return nil
Expand Down Expand Up @@ -291,20 +312,35 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
}

if r.Config.Autopilot != nil && r.Config.Autopilot.InjectAntiAffinities {
toAdd := map[string][]string{}
toAddRequired := map[string][]string{}
toAddPreferred := map[string][]string{}
for resource, taints := range r.Config.Autopilot.ResourceTaints {
if hasResourceRequest(spec, resource) {
for _, taint := range taints {
toAdd[taint.Key] = append(toAdd[taint.Key], taint.Value)
if taint.Effect == v1.TaintEffectNoExecute || taint.Effect == v1.TaintEffectNoSchedule {
toAddRequired[taint.Key] = append(toAddRequired[taint.Key], taint.Value)
} else if taint.Effect == v1.TaintEffectPreferNoSchedule {
toAddPreferred[taint.Key] = append(toAddPreferred[taint.Key], taint.Value)
}
}
}
}
if len(toAdd) > 0 {
if len(toAddRequired) > 0 {
matchExpressions := []v1.NodeSelectorRequirement{}
for k, v := range toAddRequired {
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v})
}
if err := addNodeSelectorsToAffinity(spec, matchExpressions, true, 0); err != nil {
log.FromContext(ctx).Error(err, "failed to inject Autopilot affinities")
}
}
if len(toAddPreferred) > 0 {
matchExpressions := []v1.NodeSelectorRequirement{}
for k, v := range toAdd {
for k, v := range toAddPreferred {
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v})
}
if err := addNodeSelectorsToAffinity(spec, matchExpressions); err != nil {
weight := ptr.Deref(r.Config.Autopilot.PreferNoScheduleWeight, 1)
if err := addNodeSelectorsToAffinity(spec, matchExpressions, false, weight); err != nil {
log.FromContext(ctx).Error(err, "failed to inject Autopilot affinities")
}
}
Expand All @@ -315,6 +351,8 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
return err, true
}

log.FromContext(ctx).Info("After injection", "obj", obj)

orig := copyForStatusPatch(aw)
if meta.FindStatusCondition(aw.Status.ComponentStatus[componentIdx].Conditions, string(workloadv1beta2.ResourcesDeployed)) == nil {
aw.Status.ComponentStatus[componentIdx].Name = obj.GetName()
Expand Down
11 changes: 7 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/kueue/apis/config/v1beta1"
)

Expand Down Expand Up @@ -51,9 +52,10 @@ type KueueJobReconcillerConfig struct {
}

type AutopilotConfig struct {
InjectAntiAffinities bool `json:"injectAntiAffinities,omitempty"`
MonitorNodes bool `json:"monitorNodes,omitempty"`
ResourceTaints map[string][]v1.Taint `json:"resourceTaints,omitempty"`
InjectAntiAffinities bool `json:"injectAntiAffinities,omitempty"`
MonitorNodes bool `json:"monitorNodes,omitempty"`
ResourceTaints map[string][]v1.Taint `json:"resourceTaints,omitempty"`
PreferNoScheduleWeight *int32 `json:"preferNoScheduleWeight,omitempty"`
}

type FaultToleranceConfig struct {
Expand Down Expand Up @@ -116,10 +118,11 @@ func NewAppWrapperConfig() *AppWrapperConfig {
MonitorNodes: true,
ResourceTaints: map[string][]v1.Taint{
"nvidia.com/gpu": {
{Key: "autopilot.ibm.com/gpuhealth", Value: "ERR", Effect: v1.TaintEffectNoSchedule},
{Key: "autopilot.ibm.com/gpuhealth", Value: "WARN", Effect: v1.TaintEffectPreferNoSchedule},
{Key: "autopilot.ibm.com/gpuhealth", Value: "TESTING", Effect: v1.TaintEffectNoSchedule},
{Key: "autopilot.ibm.com/gpuhealth", Value: "EVICT", Effect: v1.TaintEffectNoExecute}},
},
PreferNoScheduleWeight: ptr.To(int32(50)),
},
UserRBACAdmissionCheck: true,
FaultTolerance: &FaultToleranceConfig{
Expand Down