diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 6800e0fec041..123881daae2c 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -197,7 +197,7 @@ func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot bufferPodInjector := cbprocessor.NewCapacityBufferPodListProcessor( capacitybufferClient, []string{common.ActiveProvisioningStrategy}, - buffersPodsRegistry) + buffersPodsRegistry, true) podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{bufferPodInjector, podListProcessor}) opts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{ cbprocessor.NewFakePodsScaleUpStatusProcessor(buffersPodsRegistry), opts.Processors.ScaleUpStatusProcessor}) diff --git a/cluster-autoscaler/processors/capacitybuffer/pod_list_processor.go b/cluster-autoscaler/processors/capacitybuffer/pod_list_processor.go index 5184c4e6d2a3..0fee2da1b581 100644 --- a/cluster-autoscaler/processors/capacitybuffer/pod_list_processor.go +++ b/cluster-autoscaler/processors/capacitybuffer/pod_list_processor.go @@ -31,6 +31,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common" buffersfilter "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters" ca_context "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/utils/drain" ) // Pods annotation keys and values for fake pods created by capacity buffer pod list processor @@ -42,11 +43,12 @@ const ( // CapacityBufferPodListProcessor processes the pod lists before scale up // and adds buffres api virtual pods. 
type CapacityBufferPodListProcessor struct { - client *client.CapacityBufferClient - statusFilter buffersfilter.Filter - podTemplateGenFilter buffersfilter.Filter - provStrategies map[string]bool - buffersRegistry *capacityBuffersFakePodsRegistry + client *client.CapacityBufferClient + statusFilter buffersfilter.Filter + podTemplateGenFilter buffersfilter.Filter + provStrategies map[string]bool + buffersRegistry *capacityBuffersFakePodsRegistry + forceSafeToEvictFakePods bool } // capacityBuffersFakePodsRegistry a struct that keeps the status of capacity buffer @@ -66,7 +68,7 @@ func NewDefaultCapacityBuffersFakePodsRegistry() *capacityBuffersFakePodsRegistr } // NewCapacityBufferPodListProcessor creates a new CapacityRequestPodListProcessor. -func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, provStrategies []string, buffersRegistry *capacityBuffersFakePodsRegistry) *CapacityBufferPodListProcessor { +func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, provStrategies []string, buffersRegistry *capacityBuffersFakePodsRegistry, forceSafeToEvictFakePods bool) *CapacityBufferPodListProcessor { provStrategiesMap := map[string]bool{} for _, ps := range provStrategies { provStrategiesMap[ps] = true @@ -77,9 +79,10 @@ func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, prov common.ReadyForProvisioningCondition: common.ConditionTrue, common.ProvisioningCondition: common.ConditionTrue, }), - podTemplateGenFilter: buffersfilter.NewPodTemplateGenerationChangedFilter(client), - provStrategies: provStrategiesMap, - buffersRegistry: buffersRegistry, + podTemplateGenFilter: buffersfilter.NewPodTemplateGenerationChangedFilter(client), + provStrategies: provStrategiesMap, + buffersRegistry: buffersRegistry, + forceSafeToEvictFakePods: forceSafeToEvictFakePods, } } @@ -138,7 +141,7 @@ func (p *CapacityBufferPodListProcessor) provision(buffer *v1alpha1.CapacityBuff p.updateBufferStatus(buffer) return 
[]*apiv1.Pod{} } - fakePods, err := makeFakePods(buffer, &podTemplate.Template, int(*replicas)) + fakePods, err := makeFakePods(buffer, &podTemplate.Template, int(*replicas), p.forceSafeToEvictFakePods) if err != nil { common.UpdateBufferStatusToFailedProvisioing(buffer, "FailedToMakeFakePods", fmt.Sprintf("failed to create fake pods with error: %v", err.Error())) p.updateBufferStatus(buffer) @@ -152,7 +155,6 @@ func (p *CapacityBufferPodListProcessor) provision(buffer *v1alpha1.CapacityBuff func (p *CapacityBufferPodListProcessor) filterBuffersProvStrategy(buffers []*v1alpha1.CapacityBuffer) []*v1alpha1.CapacityBuffer { var filteredBuffers []*v1alpha1.CapacityBuffer for _, buffer := range buffers { - if buffer.Status.ProvisioningStrategy != nil && p.provStrategies[*buffer.Status.ProvisioningStrategy] { filteredBuffers = append(filteredBuffers, buffer) } @@ -168,15 +170,18 @@ func (p *CapacityBufferPodListProcessor) updateBufferStatus(buffer *v1alpha1.Cap } // makeFakePods creates podCount number of copies of the sample pod -func makeFakePods(buffer *v1alpha1.CapacityBuffer, samplePodTemplate *apiv1.PodTemplateSpec, podCount int) ([]*apiv1.Pod, error) { +func makeFakePods(buffer *v1alpha1.CapacityBuffer, samplePodTemplate *apiv1.PodTemplateSpec, podCount int, forceSafeToEvictFakePods bool) ([]*apiv1.Pod, error) { var fakePods []*apiv1.Pod samplePod := getPodFromTemplate(samplePodTemplate, buffer.Namespace) + samplePod.Spec.NodeName = "" + samplePod = withCapacityBufferFakePodAnnotation(samplePod) + if forceSafeToEvictFakePods { + samplePod = withSafeToEvictAnnotation(samplePod) + } for i := 1; i <= podCount; i++ { fakePod := samplePod.DeepCopy() - fakePod = withCapacityBufferFakePodAnnotation(fakePod) fakePod.Name = fmt.Sprintf("capacity-buffer-%s-%d", buffer.Name, i) fakePod.UID = types.UID(fmt.Sprintf("%s-%d", string(buffer.UID), i)) - fakePod.Spec.NodeName = "" fakePods = append(fakePods, fakePod) } return fakePods, nil @@ -190,7 +195,16 @@ func 
withCapacityBufferFakePodAnnotation(pod *apiv1.Pod) *apiv1.Pod { return pod } -func isFakeCapacityBuffersPod(pod *apiv1.Pod) bool { +func withSafeToEvictAnnotation(pod *apiv1.Pod) *apiv1.Pod { + if pod.Annotations == nil { + pod.Annotations = make(map[string]string, 1) + } + pod.Annotations[drain.PodSafeToEvictKey] = "true" + return pod +} + +// IsFakeCapacityBuffersPod reports whether the pod is a capacity buffer fake pod, based on its annotations. +func IsFakeCapacityBuffersPod(pod *apiv1.Pod) bool { if pod.Annotations == nil { return false } diff --git a/cluster-autoscaler/processors/capacitybuffer/pod_list_processor_test.go b/cluster-autoscaler/processors/capacitybuffer/pod_list_processor_test.go index e95c937a1f9f..37bae7398073 100644 --- a/cluster-autoscaler/processors/capacitybuffer/pod_list_processor_test.go +++ b/cluster-autoscaler/processors/capacitybuffer/pod_list_processor_test.go @@ -19,12 +19,14 @@ package capacitybufferpodlister import ( "context" "fmt" + "strconv" "testing" "github.com/stretchr/testify/assert" apiv1 "k8s.io/autoscaler/cluster-autoscaler/apis/capacitybuffer/autoscaling.x-k8s.io/v1alpha1" "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/client" "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common" + "k8s.io/autoscaler/cluster-autoscaler/utils/drain" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -47,6 +49,7 @@ func TestPodListProcessor(t *testing.T) { objectsInKubernetesClient []runtime.Object objectsInBuffersClient []runtime.Object unschedulablePods []*corev1.Pod + forceSafeToEvict bool expectedUnschedPodsCount int expectedUnschedFakePodsCount int expectedBuffersProvCondition map[string]metav1.Condition @@ -94,6 +97,7 @@ func TestPodListProcessor(t *testing.T) { objectsInKubernetesClient: []runtime.Object{getTestingPodTemplate("ref", 1)}, objectsInBuffersClient: []runtime.Object{getTestingBuffer("buffer", "ref", 1, 1, true, 1, testProvStrategyAllowed)}, unschedulablePods: 
[]*corev1.Pod{getTestingPod("Pod")}, + forceSafeToEvict: true, expectedUnschedPodsCount: 2, expectedUnschedFakePodsCount: 1, expectedBuffersProvCondition: map[string]metav1.Condition{"buffer": {Type: common.ProvisioningCondition, Status: common.ConditionTrue}}, @@ -117,6 +121,7 @@ func TestPodListProcessor(t *testing.T) { getTestingBuffer("buffer2", "ref2", 5, 1, true, 1, testProvStrategyAllowed), }, unschedulablePods: []*corev1.Pod{getTestingPod("Pod1"), getTestingPod("Pod2"), getTestingPod("Pod3")}, + forceSafeToEvict: false, expectedUnschedPodsCount: 11, expectedUnschedFakePodsCount: 8, expectedBuffersProvCondition: map[string]metav1.Condition{ @@ -145,16 +150,18 @@ func TestPodListProcessor(t *testing.T) { fakeBuffersClient := buffersfake.NewSimpleClientset(test.objectsInBuffersClient...) fakeCapacityBuffersClient, _ := client.NewCapacityBufferClientFromClients(fakeBuffersClient, fakeKubernetesClient, nil, nil) - processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, NewDefaultCapacityBuffersFakePodsRegistry()) + processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, NewDefaultCapacityBuffersFakePodsRegistry(), test.forceSafeToEvict) resUnschedulablePods, err := processor.Process(nil, test.unschedulablePods) assert.Equal(t, err != nil, test.expectError) numberOfFakePods := 0 fakePodsNames := map[string]bool{} for _, pod := range resUnschedulablePods { - if isFakeCapacityBuffersPod(pod) { + if IsFakeCapacityBuffersPod(pod) { numberOfFakePods += 1 assert.False(t, fakePodsNames[pod.Name]) + safeToEvict, err := strconv.ParseBool(pod.Annotations[drain.PodSafeToEvictKey]) + assert.Equal(t, err == nil && safeToEvict, test.forceSafeToEvict) fakePodsNames[pod.Name] = true } } @@ -210,12 +217,12 @@ func TestCapacityBufferFakePodsRegistry(t *testing.T) { fakeCapacityBuffersClient, _ := client.NewCapacityBufferClientFromClients(fakeBuffersClient, 
fakeKubernetesClient, nil, nil) registry := NewDefaultCapacityBuffersFakePodsRegistry() - processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, registry) + processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, registry, false) resUnschedulablePods, err := processor.Process(nil, test.unschedulablePods) assert.Equal(t, nil, err) assert.Equal(t, test.expectedUnschedPodsCount, len(resUnschedulablePods)) for _, pod := range resUnschedulablePods { - if isFakeCapacityBuffersPod(pod) { + if IsFakeCapacityBuffersPod(pod) { podBufferObj, found := registry.fakePodsUIDToBuffer[string(pod.UID)] assert.True(t, found) expectedPodsNum, found := test.expectedBuffersPodsNum[podBufferObj.Name] diff --git a/cluster-autoscaler/processors/capacitybuffer/scale_up_status_processor.go b/cluster-autoscaler/processors/capacitybuffer/scale_up_status_processor.go index 74d187763581..f326e99f0a03 100644 --- a/cluster-autoscaler/processors/capacitybuffer/scale_up_status_processor.go +++ b/cluster-autoscaler/processors/capacitybuffer/scale_up_status_processor.go @@ -112,7 +112,7 @@ func filterOutCapacityBuffersPod[T any](podsWrappers []T, getPod func(T) *apiv1. filteredOutPodsSources := make([]T, 0) for _, podsWrapper := range podsWrappers { currentPod := getPod(podsWrapper) - if isFakeCapacityBuffersPod(currentPod) { + if IsFakeCapacityBuffersPod(currentPod) { filteredOutPodsSources = append(filteredOutPodsSources, podsWrapper) } else { filteredPodsSources = append(filteredPodsSources, podsWrapper)