9 changes: 9 additions & 0 deletions cluster-autoscaler/core/autoscaler.go
@@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
@@ -76,6 +77,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers
opts.DeleteOptions,
opts.DrainabilityRules,
opts.DraProvider,
opts.QuotasTrackerOptions,
), nil
}

@@ -142,6 +144,13 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
}
opts.ExpanderStrategy = expanderStrategy
}
if opts.QuotasTrackerOptions.QuotaProvider == nil {
cloudQuotasProvider := resourcequotas.NewCloudQuotasProvider(opts.CloudProvider)
opts.QuotasTrackerOptions.QuotaProvider = resourcequotas.NewCombinedQuotasProvider([]resourcequotas.Provider{cloudQuotasProvider})
}
if opts.QuotasTrackerOptions.CustomResourcesProcessor == nil {
opts.QuotasTrackerOptions.CustomResourcesProcessor = opts.Processors.CustomResourcesProcessor
}

return nil
}
2 changes: 2 additions & 0 deletions cluster-autoscaler/core/options/autoscaler.go
@@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
@@ -57,4 +58,5 @@ type AutoscalerOptions struct {
DeleteOptions options.NodeDeleteOptions
DrainabilityRules rules.Rules
DraProvider *draprovider.Provider
QuotasTrackerOptions resourcequotas.TrackerOptions
}
101 changes: 66 additions & 35 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -22,18 +22,21 @@ import (

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/equivalence"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/resourcequotas"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -47,6 +50,7 @@ type ScaleUpOrchestrator struct {
autoscalingCtx *ca_context.AutoscalingContext
processors *ca_processors.AutoscalingProcessors
resourceManager *resource.Manager
quotasTrackerFactory *resourcequotas.TrackerFactory
clusterStateRegistry *clusterstate.ClusterStateRegistry
scaleUpExecutor *scaleUpExecutor
estimatorBuilder estimator.EstimatorBuilder
@@ -68,6 +72,7 @@ func (o *ScaleUpOrchestrator) Initialize(
clusterStateRegistry *clusterstate.ClusterStateRegistry,
estimatorBuilder estimator.EstimatorBuilder,
taintConfig taints.TaintConfig,
quotasTrackerFactory *resourcequotas.TrackerFactory,
) {
o.autoscalingCtx = autoscalingCtx
o.processors = processors
@@ -76,6 +81,7 @@ func (o *ScaleUpOrchestrator) Initialize(
o.taintConfig = taintConfig
o.resourceManager = resource.NewManager(processors.CustomResourcesProcessor)
o.scaleUpExecutor = newScaleUpExecutor(autoscalingCtx, processors.ScaleStateNotifier, o.processors.AsyncNodeGroupStateChecker)
o.quotasTrackerFactory = quotasTrackerFactory
o.initialized = true
}

@@ -121,15 +127,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(
// Initialise binpacking limiter.
o.processors.BinpackingLimiter.InitBinpacking(o.autoscalingCtx, nodeGroups)

resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
if aErr != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
tracker, err := o.newQuotasTracker()
if err != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("could not create quotas tracker: "))
}

now := time.Now()

// Filter out invalid node groups
validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, resourcesLeft, len(nodes)+len(upcomingNodes), now)
validNodeGroups, skippedNodeGroups := o.filterValidScaleUpNodeGroups(nodeGroups, nodeInfos, tracker, len(nodes)+len(upcomingNodes), now)

// Mark skipped node groups as processed.
for nodegroupID := range skippedNodeGroups {
@@ -194,15 +200,15 @@ func (o *ScaleUpOrchestrator) ScaleUp(
return status.UpdateScaleUpError(&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods}, aErr)
}

newNodes, aErr = o.applyLimits(newNodes, resourcesLeft, bestOption.NodeGroup, nodeInfos)
newNodes, aErr = o.applyLimits(newNodes, tracker, bestOption.NodeGroup, nodeInfos)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{PodsTriggeredScaleUp: bestOption.Pods},
aErr)
}

if newNodes < bestOption.NodeCount {
klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
klog.V(1).Infof("Only %d nodes can be added to %s due to resource quotas", newNodes, bestOption.NodeGroup.Id())
if allOrNothing {
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
@@ -283,14 +289,18 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}, nil
}

func (o *ScaleUpOrchestrator) applyLimits(newNodes int, resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
func (o *ScaleUpOrchestrator) applyLimits(newNodes int, tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfos map[string]*framework.NodeInfo) (int, errors.AutoscalerError) {
nodeInfo, found := nodeInfos[nodeGroup.Id()]
if !found {
// This should never happen, as we already should have retrieved nodeInfo for any considered nodegroup.
klog.Errorf("No node info for: %s", nodeGroup.Id())
return 0, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for best expansion option!")
}
return o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodes, resourcesLeft, nodeInfo, nodeGroup)
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), newNodes)
if err != nil {
return 0, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("failed to check resource quotas: ")
}
return checkResult.AllowedDelta, nil
}

// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
@@ -309,9 +319,9 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
nodeGroups := o.autoscalingCtx.CloudProvider.NodeGroups()
scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)

resourcesLeft, aErr := o.resourceManager.ResourcesLeft(o.autoscalingCtx, nodeInfos, nodes)
if aErr != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not compute total resources: "))
tracker, err := o.newQuotasTracker()
if err != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.ToAutoscalerError(errors.InternalError, err).AddPrefix("could not create quotas tracker: "))
}

for _, ng := range nodeGroups {
@@ -342,17 +352,18 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
continue
}

if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, ng, nodeInfo, 1); skipReason != nil {
if skipReason := o.IsNodeGroupResourceExceeded(tracker, ng, nodeInfo, 1); skipReason != nil {
klog.Warningf("ScaleUpToNodeGroupMinSize: node group resource excceded: %v", skipReason)
continue
}

newNodeCount := ng.MinSize() - targetSize
newNodeCount, err = o.resourceManager.ApplyLimits(o.autoscalingCtx, newNodeCount, resourcesLeft, nodeInfo, ng)
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, ng, nodeInfo.Node(), newNodeCount)
if err != nil {
klog.Warningf("ScaleUpToNodeGroupMinSize: failed to apply resource limits: %v", err)
klog.Warningf("ScaleUpToNodeGroupMinSize: failed to check resource quotas: %v", err)
continue
}
newNodeCount = checkResult.AllowedDelta

newNodeCount, err = o.GetCappedNewNodeCount(newNodeCount, targetSize)
if err != nil {
@@ -397,7 +408,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*framework.NodeInfo,
resourcesLeft resource.Limits,
tracker *resourcequotas.Tracker,
currentNodeCount int,
now time.Time,
) ([]cloudprovider.NodeGroup, map[string]status.Reasons) {
@@ -441,7 +452,7 @@ func (o *ScaleUpOrchestrator) filterValidScaleUpNodeGroups(
skippedNodeGroups[nodeGroup.Id()] = NotReadyReason
continue
}
if skipReason := o.IsNodeGroupResourceExceeded(resourcesLeft, nodeGroup, nodeInfo, numNodes); skipReason != nil {
if skipReason := o.IsNodeGroupResourceExceeded(tracker, nodeGroup, nodeInfo, numNodes); skipReason != nil {
skippedNodeGroups[nodeGroup.Id()] = skipReason
continue
}
@@ -664,31 +675,35 @@ func (o *ScaleUpOrchestrator) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.
}

// IsNodeGroupResourceExceeded returns nil if node group resource limits are not exceeded, otherwise a reason is provided.
func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
resourcesDelta, err := o.resourceManager.DeltaForNode(o.autoscalingCtx, nodeInfo, nodeGroup)
func (o *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(tracker *resourcequotas.Tracker, nodeGroup cloudprovider.NodeGroup, nodeInfo *framework.NodeInfo, numNodes int) status.Reasons {
checkResult, err := tracker.CheckDelta(o.autoscalingCtx, nodeGroup, nodeInfo.Node(), numNodes)
if err != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
klog.Errorf("Skipping node group %s; error checking resource quotas: %v", nodeGroup.Id(), err)
return NotReadyReason
}

for resource, delta := range resourcesDelta {
resourcesDelta[resource] = delta * int64(numNodes)
}

checkResult := resource.CheckDeltaWithinLimits(resourcesLeft, resourcesDelta)
if checkResult.Exceeded {
klog.V(4).Infof("Skipping node group %s; maximal limit exceeded for %v", nodeGroup.Id(), checkResult.ExceededResources)
for _, resource := range checkResult.ExceededResources {
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleUpCPU()
case cloudprovider.ResourceNameMemory:
metrics.RegisterSkippedScaleUpMemory()
default:
continue
if checkResult.Exceeded() {
resources := make(sets.Set[string])
for _, quota := range checkResult.ExceededQuotas {
klog.V(4).Infof(
"Skipping node group %s; %q quota exceeded, resources: %v", nodeGroup.Id(), quota.ID, quota.ExceededResources,
)
for _, resource := range quota.ExceededResources {
if resources.Has(resource) {
continue
}
resources.Insert(resource)
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleUpCPU()
case cloudprovider.ResourceNameMemory:
metrics.RegisterSkippedScaleUpMemory()
default:
continue
}
}
}
return NewMaxResourceLimitReached(checkResult.ExceededResources)
return NewMaxResourceLimitReached(checkResult.ExceededQuotas)
}
return nil
}
@@ -787,6 +802,22 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
return validSimilarNodeGroups
}

func (o *ScaleUpOrchestrator) newQuotasTracker() (*resourcequotas.Tracker, error) {
var nodes []*apiv1.Node
nodeInfos, err := o.autoscalingCtx.ClusterSnapshot.ListNodeInfos()
if err != nil {
return nil, err
}
for _, nodeInfo := range nodeInfos {
node := nodeInfo.Node()
if utils.IsVirtualKubeletNode(node) {
continue
}
nodes = append(nodes, nodeInfo.Node())
}
return o.quotasTrackerFactory.NewQuotasTracker(o.autoscalingCtx, nodes)
}
Comment on lines +805 to +819 (Contributor, PR author):
This could probably be done better:

  1. Checking for virtual kubelet nodes could be done via a resourcequotas.NodeFilter (see the sketch below).
  2. Passing all the nodes, including upcoming ones, via the nodes parameter in ScaleUp (this way we could also remove some of the logic in ScaleUp around upcoming nodes, which is currently only used for checking total node limits).

That would also make it easier to integrate with the other implementations of the Orchestrator.

Note to reviewers: do you think it's worth the hassle?
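A minimal sketch of idea 1, under stated assumptions: `resourcequotas.NodeFilter` and the `filterNodes` helper below are hypothetical (neither exists in this PR); `utils.IsVirtualKubeletNode` is the helper `newQuotasTracker` already uses.

```go
package resourcequotas

import apiv1 "k8s.io/api/core/v1"

// NodeFilter reports whether a node should count against resource quotas.
// Hypothetical type sketching suggestion 1; not part of this PR.
type NodeFilter func(node *apiv1.Node) bool

// filterNodes keeps only the nodes accepted by the filter; a nil filter
// accepts every node. The TrackerFactory could apply this before computing
// quota usage, so callers such as newQuotasTracker would no longer need to
// skip virtual kubelet nodes themselves.
func filterNodes(nodes []*apiv1.Node, filter NodeFilter) []*apiv1.Node {
	if filter == nil {
		return nodes
	}
	filtered := make([]*apiv1.Node, 0, len(nodes))
	for _, node := range nodes {
		if filter(node) {
			filtered = append(filtered, node)
		}
	}
	return filtered
}
```

Callers could then express the current behavior as `opts.QuotasTrackerOptions.NodeFilter = func(n *apiv1.Node) bool { return !utils.IsVirtualKubeletNode(n) }`, again assuming such a field were added to TrackerOptions.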


func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool {
schedulableSamplePods := make(map[*apiv1.Pod]bool)
for _, podGroup := range similarPodGroups {