Commit 658a76c

Integrate resource quotas with scale up
1 parent 4177783 commit 658a76c

25 files changed: +592 −169 lines changed

cluster-autoscaler/core/autoscaler.go

Lines changed: 9 additions & 0 deletions
@@ -28,6 +28,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/expander/factory"
     "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
     ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+    "k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@@ -76,6 +77,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers
         opts.DeleteOptions,
         opts.DrainabilityRules,
         opts.DraProvider,
+        opts.QuotasTrackerOptions,
     ), nil
 }

@@ -142,6 +144,13 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
         }
         opts.ExpanderStrategy = expanderStrategy
     }
+    if opts.QuotasTrackerOptions.QuotaProvider == nil {
+        cloudQuotasProvider := resourcequotas.NewCloudQuotasProvider(opts.CloudProvider)
+        opts.QuotasTrackerOptions.QuotaProvider = resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{cloudQuotasProvider})
+    }
+    if opts.QuotasTrackerOptions.CustomResourcesProcessor == nil {
+        opts.QuotasTrackerOptions.CustomResourcesProcessor = opts.Processors.CustomResourcesProcessor
+    }

     return nil
 }
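
The defaulting above is the only place this commit shows how the new package is wired up, so the surrounding types are worth sketching. The block below is a rough reconstruction inferred purely from these call sites; the Quota struct, the Quotas method name, and the field types marked in comments are assumptions, not code from the resourcequotas package (whose files are not shown in this commit view).

// Rough reconstruction, inferred only from the call sites in this commit.
package resourcequotas

import (
    "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

// Quota is a hypothetical named limit over one or more resources.
type Quota struct {
    ID        string
    Resources map[string]int64
}

// Provider supplies quotas from a single source (cloud provider, config, ...).
type Provider interface {
    // Quotas is a hypothetical method name; the commit only shows that
    // providers exist and can be combined.
    Quotas() ([]*Quota, error)
}

// TrackerOptions carries the dependencies used to build quota trackers;
// both fields are defaulted in initializeDefaultOptions above.
type TrackerOptions struct {
    QuotaProvider            Provider
    CustomResourcesProcessor any // assumed to be the processors' CustomResourcesProcessor interface
}

// NewCloudQuotasProvider exposes the cloud provider's resource limits as quotas.
func NewCloudQuotasProvider(cp cloudprovider.CloudProvider) Provider { return nil } // sketch only

// NewCombinedQuotasProvider merges several quota sources into one Provider.
func NewCombinedQuotasProvider(providers []Provider) Provider { return nil } // sketch only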

cluster-autoscaler/core/options/autoscaler.go

Lines changed: 2 additions & 0 deletions
@@ -27,6 +27,7 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/expander"
     "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
     ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+    "k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
     draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
@@ -57,4 +58,5 @@ type AutoscalerOptions struct {
     DeleteOptions        options.NodeDeleteOptions
     DrainabilityRules    rules.Rules
     DraProvider          *draprovider.Provider
+    QuotasTrackerOptions resourcequotas.TrackerOptions
 }
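
Because initializeDefaultOptions only fills QuotasTrackerOptions when its fields are nil, embedders can supply their own quota source before constructing the autoscaler. A hypothetical sketch of that override follows; only the QuotasTrackerOptions field and the two resourcequotas constructors appear in this commit, while the helper name and signature are invented for illustration.

// Hypothetical helper; only QuotasTrackerOptions.QuotaProvider and the
// resourcequotas constructors come from this commit.
func configureCustomQuotas(opts *coreoptions.AutoscalerOptions, custom resourcequotas.Provider) {
    // Leaving QuotaProvider nil keeps the cloud-provider backed default that
    // initializeDefaultOptions (core/autoscaler.go above) installs.
    opts.QuotasTrackerOptions.QuotaProvider = resourcequotas.NewCombinedQuotasProvider(
        []resourcequotas.Provider{custom},
    )
}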

cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go

Lines changed: 66 additions & 35 deletions
@@ -22,18 +22,21 @@ import (

     appsv1 "k8s.io/api/apps/v1"
     apiv1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
     "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
     ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/equivalence"
     "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource"
+    "k8s.io/autoscaler/cluster-autoscaler/core/utils"
     "k8s.io/autoscaler/cluster-autoscaler/estimator"
     "k8s.io/autoscaler/cluster-autoscaler/expander"
     "k8s.io/autoscaler/cluster-autoscaler/metrics"
     ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
     "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
     "k8s.io/autoscaler/cluster-autoscaler/processors/status"
+    "k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -47,6 +50,7 @@ type ScaleUpOrchestrator struct {
     autoscalingCtx       *ca_context.AutoscalingContext
     processors           *ca_processors.AutoscalingProcessors
     resourceManager      *resource.Manager
+    quotasTrackerFactory *resourcequotas.TrackerFactory
     clusterStateRegistry *clusterstate.ClusterStateRegistry
     scaleUpExecutor      *scaleUpExecutor
     estimatorBuilder     estimator.EstimatorBuilder
@@ -68,6 +72,7 @@ func (o *ScaleUpOrchestrator) Initialize(
     clusterStateRegistry *clusterstate.ClusterStateRegistry,
     estimatorBuilder estimator.EstimatorBuilder,
     taintConfig taints.TaintConfig,
+    quotasTrackerFactory *resourcequotas.TrackerFactory,
 ) {
     o.autoscalingCtx = autoscalingCtx
     o.processors = processors
@@ -76,6 +81,7 @@ func (o *ScaleUpOrchestrator) Initialize(
     o.taintConfig = taintConfig
     o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor)
     o.scaleUpExecutor = newScaleUpExecutor(autoscalingCtx, processors.ScaleStateNotifier, o.processors.AsyncNodeGroupStateChecker)
+    o.quotasTrackerFactory = quotasTrackerFactory
     o.initialized = true
 }

@@ -121,15 +127,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(
     // Initialise binpacking limiter.
     o.processors.BinpackingLimiter.InitBinpacking(o.autoscalingCtx, nodeGroups)

-    resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
-    if aErr != nil {
-        return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
+    tracker, err := o.newQuotasTracker()
+    if err != nil {
+        return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("could not create quotas tracker: "))
     }

     now := time.Now()

     // Filter out invalid node groups
-    validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, len(nodes)+len(upcomingNodes), now)
+    validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, tracker, len(nodes)+len(upcomingNodes), now)

     // Mark skipped node groups as processed.
     for nodegroupID := range skippedNodeGroups {
@@ -194,15 +200,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(
         return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
     }

-    newNodes, aErr = o.applyLimits(newNodes, resourcesLeft, bestOption.NodeGroup, nodeInfos)
+    newNodes, aErr = o.applyLimits(newNodes, tracker, bestOption.NodeGroup, nodeInfos)
     if aErr != nil {
         return status.UpdateScaleUpError(
             &status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods},
             aErr)
     }

     if newNodes < bestOption.NodeCount {
-        klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
+        klog.V(1).Infof("Only %d nodes can be added to %s due to resource quotas", newNodes, bestOption.NodeGroup.Id())
         if allOrNothing {
             // Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
             klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
@@ -283,14 +289,18 @@ func (o *ScaleUpOrchestrator) ScaleUp(
     }, nil
 }

-func (o *ScaleUpOrchestrator) applyLimits(newNodes int, resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
+func (o *ScaleUpOrchestrator) applyLimits(newNodes int, tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
     nodeInfo, found := nodeInfos[nodeGroup.Id()]
     if !found {
         // This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
         klog.Errorf("No node info for: %s", nodeGroup.Id())
         return 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for best expansion option!")
     }
-    return o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodes, resourcesLeft, nodeInfo, nodeGroup)
+    checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), newNodes)
+    if err != nil {
+        return 0, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("failed to check resource quotas: ")
+    }
+    return checkResult.AllowedDelta, nil
 }

 // ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
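
applyLimits now defers entirely to the quota tracker: CheckDelta answers both "how many of these nodes still fit" and "which quotas would break". The result type itself is not part of this diff; judging only from the fields and methods used in this file (AllowedDelta here, Exceeded() and ExceededQuotas further down), it plausibly looks like the sketch below, with every name beyond those call sites an assumption.

// Hypothetical reconstruction of the CheckDelta result, based only on how it
// is used in this file.
type CheckDeltaResult struct {
    // AllowedDelta is how many of the requested nodes fit within every quota.
    AllowedDelta int
    // ExceededQuotas lists the quotas that the full requested delta would violate.
    ExceededQuotas []ExceededQuota
}

// Exceeded reports whether any quota would be violated by the requested delta.
func (r CheckDeltaResult) Exceeded() bool {
    return len(r.ExceededQuotas) > 0
}

// ExceededQuota names one violated quota and the resources it ran out of.
type ExceededQuota struct {
    ID                string
    ExceededResources []string
}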
@@ -309,9 +319,9 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
     nodeGroups := o.autoscalingCtx.CloudProvider.NodeGroups()
     scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)

-    resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
-    if aErr != nil {
-        return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
+    tracker, err := o.newQuotasTracker()
+    if err != nil {
+        return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("could not create quotas tracker: "))
     }

     for _, ng := range nodeGroups {
@@ -342,17 +352,18 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
             continue
         }

-        if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, ng, nodeInfo, 1); skipReason != nil {
+        if skipReason := o.IsNodeGroupResourceExceeded(tracker, ng, nodeInfo, 1); skipReason != nil {
             klog.Warningf("ScaleUpToNodeGroupMinSize: node group resource excceded: %v", skipReason)
             continue
         }

         newNodeCount := ng.MinSize() - targetSize
-        newNodeCount, err = o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodeCount, resourcesLeft, nodeInfo, ng)
+        checkResult, err := tracker.CheckDelta(o.autoscalingCtx, ng, nodeInfo.Node(), newNodeCount)
         if err != nil {
-            klog.Warningf("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err)
+            klog.Warningf("ScaleUpToNodeGroupMinSize: failed to check resource quotas: %v", err)
             continue
         }
+        newNodeCount = checkResult.AllowedDelta

         newNodeCount, err = o.GetCappedNewNodeCount(newNodeCount, targetSize)
         if err != nil {
@@ -397,7 +408,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
 func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
     nodeGroups []cloudprovider.NodeGroup,
     nodeInfos map[string]*framework.NodeInfo,
-    resourcesLeft resource.Limits,
+    tracker *resourcequotas.Tracker,
     currentNodeCount int,
     now time.Time,
 ) ([]cloudprovider.NodeGroup, map[string]status.Reasons) {
@@ -441,7 +452,7 @@ func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
             skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
             continue
         }
-        if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo, numNodes); skipReason != nil {
+        if skipReason := o.IsNodeGroupResourceExceeded(tracker, nodeGroup, nodeInfo, numNodes); skipReason != nil {
             skippedNodeGroups[nodeGroup.Id()] = skipReason
             continue
         }
@@ -664,31 +675,35 @@ func (o *ScaleUpOrchestrator) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.
 }

 // IsNodeGroupResourceExceeded returns nil if node group resource limits are not exceeded, otherwise a reason is provided.
-func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
-    resourcesDelta, err := o.resourceManager.DeltaForNode(o.autoscalingCtx, nodeInfo, nodeGroup)
+func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
+    checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), numNodes)
     if err != nil {
-        klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
+        klog.Errorf("Skipping node group %s; error checking resource quotas: %v", nodeGroup.Id(), err)
         return NotReadyReason
     }

-    for resource, delta := range resourcesDelta {
-        resourcesDelta[resource] = delta * int64(numNodes)
-    }
-
-    checkResult := resource.CheckDeltaWithinLimits(resourcesLeft, resourcesDelta)
-    if checkResult.Exceeded {
-        klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.ExceededResources)
-        for _, resource := range checkResult.ExceededResources {
-            switch resource {
-            case cloudprovider.ResourceNameCores:
-                metrics.RegisterSkippedScaleUpCPU()
-            case cloudprovider.ResourceNameMemory:
-                metrics.RegisterSkippedScaleUpMemory()
-            default:
-                continue
+    if checkResult.Exceeded() {
+        resources := make(sets.Set[string])
+        for _, quota := range checkResult.ExceededQuotas {
+            klog.V(4).Infof(
+                "Skipping node group %s; %q quota exceeded, resources: %v", nodeGroup.Id(), quota.ID, quota.ExceededResources,
+            )
+            for _, resource := range quota.ExceededResources {
+                if resources.Has(resource) {
+                    continue
+                }
+                resources.Insert(resource)
+                switch resource {
+                case cloudprovider.ResourceNameCores:
+                    metrics.RegisterSkippedScaleUpCPU()
+                case cloudprovider.ResourceNameMemory:
+                    metrics.RegisterSkippedScaleUpMemory()
+                default:
+                    continue
+                }
+            }
         }
-        return NewMaxResourceLimitReached(checkResult.ExceededResources)
+        return NewMaxResourceLimitReached(checkResult.ExceededQuotas)
     }
     return nil
 }
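
One behavioural detail of the new loop: the same resource can be exceeded by several quotas at once, and the sets.Set guard ensures each skipped-scale-up metric is registered at most once per resource. A toy, self-contained illustration of that dedup follows; the quota names and contents are invented.

// Toy illustration of the dedup above; the quotas and resources are invented.
package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/sets"
)

type exceededQuota struct {
    ID                string
    ExceededResources []string
}

func main() {
    exceeded := []exceededQuota{
        {ID: "cloud-cpu-quota", ExceededResources: []string{"cpu"}},
        {ID: "team-quota", ExceededResources: []string{"cpu", "memory"}},
    }
    seen := make(sets.Set[string])
    for _, q := range exceeded {
        for _, r := range q.ExceededResources {
            if seen.Has(r) {
                continue // metric for this resource already counted
            }
            seen.Insert(r)
            fmt.Printf("register skipped-scale-up metric for %q\n", r)
        }
    }
    // Prints one line for "cpu" and one for "memory", even though "cpu"
    // appears in both quotas.
}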
@@ -787,6 +802,22 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
     return validSimilarNodeGroups
 }

+func (o *ScaleUpOrchestrator) newQuotasTracker() (*resourcequotas.Tracker, error) {
+    var nodes []*apiv1.Node
+    nodeInfos, err := o.autoscalingCtx.ClusterSnapshot.ListNodeInfos()
+    if err != nil {
+        return nil, err
+    }
+    for _, nodeInfo := range nodeInfos {
+        node := nodeInfo.Node()
+        if utils.IsVirtualKubeletNode(node) {
+            continue
+        }
+        nodes = append(nodes, nodeInfo.Node())
+    }
+    return o.quotasTrackerFactory.NewQuotasTracker(o.autoscalingCtx, nodes)
+}
+
 func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool {
     schedulableSamplePods := make(map[*apiv1.Pod]bool)
     for _, podGroup := range similarPodGroups {
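
Taken together, the per-loop flow added by this commit is: list the nodes in the cluster snapshot, skip virtual-kubelet nodes (as newQuotasTracker does above), build a tracker over the rest, then cap each proposed scale-up delta with CheckDelta. A condensed, hypothetical restatement of that flow follows; the tracker and factory signatures are inferred from the call sites above, not quoted from the resourcequotas package.

// Condensed, hypothetical restatement of the flow above.
func allowedScaleUp(
    ctx *ca_context.AutoscalingContext,
    factory *resourcequotas.TrackerFactory,
    nodeGroup cloudprovider.NodeGroup,
    sampleNode *apiv1.Node,
    wantedNodes int,
) (int, error) {
    nodeInfos, err := ctx.ClusterSnapshot.ListNodeInfos()
    if err != nil {
        return 0, err
    }
    var nodes []*apiv1.Node
    for _, ni := range nodeInfos {
        if utils.IsVirtualKubeletNode(ni.Node()) {
            continue // skipped, mirroring newQuotasTracker above
        }
        nodes = append(nodes, ni.Node())
    }
    tracker, err := factory.NewQuotasTracker(ctx, nodes)
    if err != nil {
        return 0, err
    }
    result, err := tracker.CheckDelta(ctx, nodeGroup, sampleNode, wantedNodes)
    if err != nil {
        return 0, err
    }
    return result.AllowedDelta, nil // may be smaller than wantedNodes
}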
