Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ type GpuLimits struct {
Max int64
}

// DraLimits define lower and upper bound on DRA instances of given type in cluster
type DraLimits struct {
// Driver is the driver name for the DRA (e.g. nvidia)
Driver string
// DeviceAttribute is the device's attribute by which autoscaler will filter resources (e.g. productName=tesla-k80)
DeviceAttribute string
// Lower bound on number of DRAs of given type in cluster
Min int64
// Upper bound on number of DRAs of given type in cluster
Max int64
}

// NodeGroupAutoscalingOptions contain various options to customize how autoscaling of
// a given NodeGroup works. Different options can be used for each NodeGroup.
type NodeGroupAutoscalingOptions struct {
Expand Down Expand Up @@ -120,6 +132,8 @@ type AutoscalingOptions struct {
MinMemoryTotal int64
// GpuTotal is a list of strings with configuration of min/max limits for different GPUs.
GpuTotal []GpuLimits
// DraTotal is a list of strings with configuration of min/max limits for different DRAs.
DraTotal []DraLimits
// NodeGroupAutoDiscovery represents one or more definition(s) of node group auto-discovery
NodeGroupAutoDiscovery []string
// EstimatorName is the estimator used to estimate the number of needed nodes in scale up.
Expand Down
49 changes: 45 additions & 4 deletions cluster-autoscaler/config/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ var (
coresTotal = flag.String("cores-total", minMaxFlagString(0, config.DefaultMaxClusterCores), "Minimum and maximum number of cores in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
memoryTotal = flag.String("memory-total", minMaxFlagString(0, config.DefaultMaxClusterMemory), "Minimum and maximum number of gigabytes of memory in cluster, in the format <min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers.")
gpuTotal = multiStringFlag("gpu-total", "Minimum and maximum number of different GPUs in cluster, in the format <gpu_type>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. Can be passed multiple times. CURRENTLY THIS FLAG ONLY WORKS ON GKE.")
draTotal = multiStringFlag("dra-total", "Minimum and maximum number of DRA devices with specific attributes in cluster, in the format <driver>:<device_identifier_attribute>:<min>:<max>. Cluster autoscaler will not scale the cluster beyond these numbers. 'driver' refers to the DRA driver in which the limits will apply. 'device_identifier_attribute' is used to identify unique devices types (e.g. the productName value in the gpu.nvidia.com driver, like 'nvidia l4'). This flag can be passed multiple times.")
cloudProviderFlag = flag.String("cloud-provider", cloudBuilder.DefaultCloudProvider,
"Cloud provider type. Available values: ["+strings.Join(cloudBuilder.AvailableCloudProviders, ",")+"]")
maxBulkSoftTaintCount = flag.Int("max-bulk-soft-taint-count", 10, "Maximum number of nodes that can be tainted/untainted PreferNoSchedule at the same time. Set to 0 to turn off such tainting.")
Expand Down Expand Up @@ -263,7 +264,12 @@ func createAutoscalingOptions() config.AutoscalingOptions {
minMemoryTotal = minMemoryTotal * units.GiB
maxMemoryTotal = maxMemoryTotal * units.GiB

parsedGpuTotal, err := parseMultipleGpuLimits(*gpuTotal)
parsedGpuTotal, err := parseMultipleGpuLimits(*gpuTotal, parseSingleGpuLimit)
if err != nil {
klog.Fatalf("Failed to parse flags: %v", err)
}

parsedDraTotal, err := parseMultipleGpuLimits(*draTotal, parseSingleDraLimit)
if err != nil {
klog.Fatalf("Failed to parse flags: %v", err)
}
Expand Down Expand Up @@ -325,6 +331,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxMemoryTotal: maxMemoryTotal,
MinMemoryTotal: minMemoryTotal,
GpuTotal: parsedGpuTotal,
DraTotal: parsedDraTotal,
NodeGroups: *nodeGroupsFlag,
EnforceNodeGroupMinSize: *enforceNodeGroupMinSize,
ScaleDownDelayAfterAdd: *scaleDownDelayAfterAdd,
Expand Down Expand Up @@ -467,10 +474,10 @@ func parseMinMaxFlag(flag string) (int64, int64, error) {
return min, max, nil
}

func parseMultipleGpuLimits(flags MultiStringFlag) ([]config.GpuLimits, error) {
parsedFlags := make([]config.GpuLimits, 0, len(flags))
func parseMultipleGpuLimits[T any](flags MultiStringFlag, parser func(string) (T, error)) ([]T, error) {
parsedFlags := make([]T, 0, len(flags))
for _, flag := range flags {
parsedFlag, err := parseSingleGpuLimit(flag)
parsedFlag, err := parser(flag)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -510,6 +517,40 @@ func parseSingleGpuLimit(limits string) (config.GpuLimits, error) {
return parsedGpuLimits, nil
}

// parseSingleDraLimit parses a string in the format "driver:deviceAttribute:min:max" into a DraLimits struct.
func parseSingleDraLimit(limits string) (config.DraLimits, error) {
parts := strings.Split(limits, ":")
if len(parts) != 4 {
return config.DraLimits{}, fmt.Errorf("incorrect DRA limit specification: %v", limits)
}
driver := parts[0]
deviceAttribute := parts[1]
minVal, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return config.DraLimits{}, fmt.Errorf("incorrect DRA limit - min is not integer: %v", limits)
}
maxVal, err := strconv.ParseInt(parts[3], 10, 64)
if err != nil {
return config.DraLimits{}, fmt.Errorf("incorrect DRA limit - max is not integer: %v", limits)
}
if minVal < 0 {
return config.DraLimits{}, fmt.Errorf("incorrect DRA limit - min is less than 0; %v", limits)
}
if maxVal < 0 {
return config.DraLimits{}, fmt.Errorf("incorrect DRA limit - max is less than 0; %v", limits)
}
if minVal > maxVal {
return config.DraLimits{}, fmt.Errorf("incorrect DRA limit - min is greater than max; %v", limits)
}
parsedDraLimits := config.DraLimits{
Driver: driver,
DeviceAttribute: deviceAttribute,
Min: minVal,
Max: maxVal,
}
return parsedDraLimits, nil
}

// parseShutdownGracePeriodsAndPriorities parse priorityGracePeriodStr and returns an array of ShutdownGracePeriodByPodPriority if succeeded.
// Otherwise, returns an empty list
func parseShutdownGracePeriodsAndPriorities(priorityGracePeriodStr string) []kubelet_config.ShutdownGracePeriodByPodPriority {
Expand Down
68 changes: 68 additions & 0 deletions cluster-autoscaler/config/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,74 @@ func TestParseSingleGpuLimit(t *testing.T) {
}
}

func TestParseSingleDraLimit(t *testing.T) {
type testcase struct {
input string
expectError bool
expectedLimits config.DraLimits
expectedErrorMessage string
}

testcases := []testcase{
{
input: "gpu.nvidia.com:H100:1:10",
expectError: false,
expectedLimits: config.DraLimits{
Driver: "gpu.nvidia.com",
DeviceAttribute: "H100",
Min: 1,
Max: 10,
},
},
{
input: "gpu.nvidia.com:H100:1",
expectError: true,
expectedErrorMessage: "incorrect DRA limit specification: gpu.nvidia.com:H100:1",
},
{
input: "gpu.nvidia.com:H100:1:10:x",
expectError: true,
expectedErrorMessage: "incorrect DRA limit specification: gpu.nvidia.com:H100:1:10:x",
},
{
input: "gpu.nvidia.com:H100:x:10",
expectError: true,
expectedErrorMessage: "incorrect DRA limit - min is not integer: gpu.nvidia.com:H100:x:10",
},
{
input: "gpu.nvidia.com:H100:1:y",
expectError: true,
expectedErrorMessage: "incorrect DRA limit - max is not integer: gpu.nvidia.com:H100:1:y",
},
{
input: "gpu.nvidia.com:H100:-1:10",
expectError: true,
expectedErrorMessage: "incorrect DRA limit - min is less than 0; gpu.nvidia.com:H100:-1:10",
},
{
input: "gpu.nvidia.com:H100:1:-10",
expectError: true,
expectedErrorMessage: "incorrect DRA limit - max is less than 0; gpu.nvidia.com:H100:1:-10",
},
{
input: "gpu.nvidia.com:H100:10:1",
expectError: true,
expectedErrorMessage: "incorrect DRA limit - min is greater than max; gpu.nvidia.com:H100:10:1",
},
}
for _, testcase := range testcases {
limits, err := parseSingleDraLimit(testcase.input)
if testcase.expectError {
assert.NotNil(t, err)
if err != nil {
assert.Equal(t, testcase.expectedErrorMessage, err.Error())
}
} else {
assert.Equal(t, testcase.expectedLimits, limits)
}
}
}

func TestParseShutdownGracePeriodsAndPriorities(t *testing.T) {
testCases := []struct {
name string
Expand Down
8 changes: 8 additions & 0 deletions cluster-autoscaler/context/autoscaling_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/dra"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -96,6 +97,13 @@ func NewResourceLimiterFromAutoscalingOptions(options config.AutoscalingOptions)
minResources[gpuLimits.GpuType] = gpuLimits.Min
maxResources[gpuLimits.GpuType] = gpuLimits.Max
}

for _, draLimits := range options.DraTotal {
resourceName := dra.GetDraResourceName(draLimits.Driver, draLimits.DeviceAttribute)
minResources[resourceName] = draLimits.Min
maxResources[resourceName] = draLimits.Max
}

return cloudprovider.NewResourceLimiter(minResources, maxResources)
}

Expand Down
128 changes: 128 additions & 0 deletions cluster-autoscaler/core/scaleup/resource/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
resource "k8s.io/api/resource/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/ptr"
)

type nodeGroupConfig struct {
Expand Down Expand Up @@ -257,6 +260,131 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
assert.Equal(t, 3, newNodeCount) // gpu left / grpu per node: 12 / 4 = 3
}

func TestResourceManagerWithDraResource(t *testing.T) {
provider := testprovider.NewTestCloudProviderBuilder().Build()
resourceLimiter := cloudprovider.NewResourceLimiter(
map[string]int64{cloudprovider.ResourceNameCores: 0, cloudprovider.ResourceNameMemory: 0, "gpu.nvidia.com:nvidia l4": 0},
map[string]int64{cloudprovider.ResourceNameCores: 320, cloudprovider.ResourceNameMemory: 640, "gpu.nvidia.com:nvidia l4": 16},
)
provider.SetResourceLimiter(resourceLimiter)

autoscalingCtx := newAutoscalingContext(t, provider)
processors := processorstest.NewTestProcessors(&autoscalingCtx)

n1 := newNode(t, "n1", 8, 16)
provider.AddNodeGroup("ng1", 3, 10, 1)
provider.AddNode("ng1", n1)
ngi := framework.NewTestNodeInfo(n1)
ngi.LocalResourceSlices = []*resource.ResourceSlice{
{
Spec: resource.ResourceSliceSpec{
Driver: "gpu.nvidia.com",
NodeName: ptr.To("n1"),
Pool: resource.ResourcePool{
Name: "n1",
Generation: 1,
ResourceSliceCount: 1,
},
Devices: []resource.Device{
{
Name: "gpu-0",
Attributes: map[resource.QualifiedName]resource.DeviceAttribute{
"uuid": {
StringValue: ptr.To("GPU-be40a35e-4b7b-16cf-09c4-2fd3e9166701"),
},
"productName": {
StringValue: ptr.To("NVIDIA L4"),
},
},
},
{
Name: "gpu-0-partition-0",
Attributes: map[resource.QualifiedName]resource.DeviceAttribute{
"uuid": {
StringValue: ptr.To("GPU-be40a35e-4b7b-16cf-09c4-2fd3e9166701"),
},
"productName": {
StringValue: ptr.To("NVIDIA L4"),
},
},
},
{
Name: "gpu-0-partition-1",
Attributes: map[resource.QualifiedName]resource.DeviceAttribute{
"uuid": {
StringValue: ptr.To("GPU-be40a35e-4b7b-16cf-09c4-2fd3e9166701"),
},
"productName": {
StringValue: ptr.To("NVIDIA L4"),
},
},
},
{
Name: "gpu-1",
Attributes: map[resource.QualifiedName]resource.DeviceAttribute{
"uuid": {
StringValue: ptr.To("GPU-39807e58-5aca-9931-8819-3920ede08201"),
},
"productName": {
StringValue: ptr.To("NVIDIA L4"),
},
},
},
{
Name: "gpu-2",
Attributes: map[resource.QualifiedName]resource.DeviceAttribute{
"uuid": {
StringValue: ptr.To("GPU-8ed40f31-c63d-c26b-46ea-5c024ec56802"),
},
"productName": {
StringValue: ptr.To("NVIDIA L4"),
},
},
},
{
Name: "gpu-3",
Attributes: map[resource.QualifiedName]resource.DeviceAttribute{
"uuid": {
StringValue: ptr.To("GPU-f095b087-5812-4317-b9d1-193b8af1cd03"),
},
"productName": {
StringValue: ptr.To("NVIDIA L4"),
},
},
},
},
},
},
}
provider.SetMachineTemplates(map[string]*framework.NodeInfo{
"ng1": ngi,
})

ng1, err := provider.NodeGroupForNode(n1)
assert.NoError(t, err)

nodes := []*corev1.Node{n1}
err = autoscalingCtx.ClusterSnapshot.SetClusterState(nodes, nil, nil)
assert.NoError(t, err)

rm := NewManager(processors.CustomResourcesProcessor)

delta, err := rm.DeltaForNode(&autoscalingCtx, ngi, ng1)
assert.Equal(t, int64(8), delta[cloudprovider.ResourceNameCores])
assert.Equal(t, int64(16), delta[cloudprovider.ResourceNameMemory])
assert.Equal(t, int64(4), delta["gpu.nvidia.com:nvidia l4"])

left, err := rm.ResourcesLeft(&autoscalingCtx, map[string]*framework.NodeInfo{"ng1": ngi}, nodes)
assert.NoError(t, err)
assert.Equal(t, Limits{"cpu": 312, "memory": 624, "gpu.nvidia.com:nvidia l4": 12}, left) // cpu: 320-8*1=312; memory: 640-16*1=624; gpu: 16-4*1=12
result := CheckDeltaWithinLimits(left, delta)
assert.False(t, result.Exceeded)
assert.Zero(t, len(result.ExceededResources))

newNodeCount, err := rm.ApplyLimits(&autoscalingCtx, 10, left, ngi, ng1)
assert.Equal(t, 3, newNodeCount) // gpu left / grpu per node: 12 / 4 = 3
}

func newCloudProvider(t *testing.T, cpu, mem int64) *testprovider.TestCloudProvider {
provider := testprovider.NewTestCloudProviderBuilder().Build()
assert.NotNil(t, provider)
Expand Down
Loading