Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func buildAutoscaler(ctx context.Context, debuggingSnapshotter debuggingsnapshot
bufferPodInjector := cbprocessor.NewCapacityBufferPodListProcessor(
capacitybufferClient,
[]string{common.ActiveProvisioningStrategy},
buffersPodsRegistry)
buffersPodsRegistry, true)
podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{bufferPodInjector, podListProcessor})
opts.Processors.ScaleUpStatusProcessor = status.NewCombinedScaleUpStatusProcessor([]status.ScaleUpStatusProcessor{
cbprocessor.NewFakePodsScaleUpStatusProcessor(buffersPodsRegistry), opts.Processors.ScaleUpStatusProcessor})
Expand Down
44 changes: 29 additions & 15 deletions cluster-autoscaler/processors/capacitybuffer/pod_list_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/common"
buffersfilter "k8s.io/autoscaler/cluster-autoscaler/capacitybuffer/filters"
ca_context "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
)

// Pods annotation keys and values for fake pods created by capacity buffer pod list processor
Expand All @@ -42,11 +43,12 @@ const (
// CapacityBufferPodListProcessor processes the pod lists before scale up
// and adds buffres api virtual pods.
type CapacityBufferPodListProcessor struct {
client *client.CapacityBufferClient
statusFilter buffersfilter.Filter
podTemplateGenFilter buffersfilter.Filter
provStrategies map[string]bool
buffersRegistry *capacityBuffersFakePodsRegistry
client *client.CapacityBufferClient
statusFilter buffersfilter.Filter
podTemplateGenFilter buffersfilter.Filter
provStrategies map[string]bool
buffersRegistry *capacityBuffersFakePodsRegistry
forceSafeToEvictFakePods bool
}

// capacityBuffersFakePodsRegistry a struct that keeps the status of capacity buffer
Expand All @@ -66,7 +68,7 @@ func NewDefaultCapacityBuffersFakePodsRegistry() *capacityBuffersFakePodsRegistr
}

// NewCapacityBufferPodListProcessor creates a new CapacityRequestPodListProcessor.
func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, provStrategies []string, buffersRegistry *capacityBuffersFakePodsRegistry) *CapacityBufferPodListProcessor {
func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, provStrategies []string, buffersRegistry *capacityBuffersFakePodsRegistry, forceSafeToEvictFakePods bool) *CapacityBufferPodListProcessor {
provStrategiesMap := map[string]bool{}
for _, ps := range provStrategies {
provStrategiesMap[ps] = true
Expand All @@ -77,9 +79,10 @@ func NewCapacityBufferPodListProcessor(client *client.CapacityBufferClient, prov
common.ReadyForProvisioningCondition: common.ConditionTrue,
common.ProvisioningCondition: common.ConditionTrue,
}),
podTemplateGenFilter: buffersfilter.NewPodTemplateGenerationChangedFilter(client),
provStrategies: provStrategiesMap,
buffersRegistry: buffersRegistry,
podTemplateGenFilter: buffersfilter.NewPodTemplateGenerationChangedFilter(client),
provStrategies: provStrategiesMap,
buffersRegistry: buffersRegistry,
forceSafeToEvictFakePods: forceSafeToEvictFakePods,
}
}

Expand Down Expand Up @@ -138,7 +141,7 @@ func (p *CapacityBufferPodListProcessor) provision(buffer *v1alpha1.CapacityBuff
p.updateBufferStatus(buffer)
return []*apiv1.Pod{}
}
fakePods, err := makeFakePods(buffer, &podTemplate.Template, int(*replicas))
fakePods, err := makeFakePods(buffer, &podTemplate.Template, int(*replicas), p.forceSafeToEvictFakePods)
if err != nil {
common.UpdateBufferStatusToFailedProvisioing(buffer, "FailedToMakeFakePods", fmt.Sprintf("failed to create fake pods with error: %v", err.Error()))
p.updateBufferStatus(buffer)
Expand All @@ -152,7 +155,6 @@ func (p *CapacityBufferPodListProcessor) provision(buffer *v1alpha1.CapacityBuff
func (p *CapacityBufferPodListProcessor) filterBuffersProvStrategy(buffers []*v1alpha1.CapacityBuffer) []*v1alpha1.CapacityBuffer {
var filteredBuffers []*v1alpha1.CapacityBuffer
for _, buffer := range buffers {

if buffer.Status.ProvisioningStrategy != nil && p.provStrategies[*buffer.Status.ProvisioningStrategy] {
filteredBuffers = append(filteredBuffers, buffer)
}
Expand All @@ -168,15 +170,18 @@ func (p *CapacityBufferPodListProcessor) updateBufferStatus(buffer *v1alpha1.Cap
}

// makeFakePods creates podCount number of copies of the sample pod
func makeFakePods(buffer *v1alpha1.CapacityBuffer, samplePodTemplate *apiv1.PodTemplateSpec, podCount int) ([]*apiv1.Pod, error) {
func makeFakePods(buffer *v1alpha1.CapacityBuffer, samplePodTemplate *apiv1.PodTemplateSpec, podCount int, forceSafeToEvictFakePods bool) ([]*apiv1.Pod, error) {
var fakePods []*apiv1.Pod
samplePod := getPodFromTemplate(samplePodTemplate, buffer.Namespace)
samplePod.Spec.NodeName = ""
samplePod = withCapacityBufferFakePodAnnotation(samplePod)
if forceSafeToEvictFakePods {
samplePod = withSafeToEvictAnnotation(samplePod)
}
for i := 1; i <= podCount; i++ {
fakePod := samplePod.DeepCopy()
fakePod = withCapacityBufferFakePodAnnotation(fakePod)
fakePod.Name = fmt.Sprintf("capacity-buffer-%s-%d", buffer.Name, i)
fakePod.UID = types.UID(fmt.Sprintf("%s-%d", string(buffer.UID), i))
fakePod.Spec.NodeName = ""
fakePods = append(fakePods, fakePod)
}
return fakePods, nil
Expand All @@ -190,7 +195,16 @@ func withCapacityBufferFakePodAnnotation(pod *apiv1.Pod) *apiv1.Pod {
return pod
}

func isFakeCapacityBuffersPod(pod *apiv1.Pod) bool {
func withSafeToEvictAnnotation(pod *apiv1.Pod) *apiv1.Pod {
if pod.Annotations == nil {
pod.Annotations = make(map[string]string, 1)
}
pod.Annotations[drain.PodSafeToEvictKey] = "true"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we're passing in a static true for this option, this represents a change. We're O.K. that given that this functionality hasn't yet been released?

return pod
}

// IsFakeCapacityBuffersPod checks if the pod is a capacity buffer fake pod using pod annotation.
func IsFakeCapacityBuffersPod(pod *apiv1.Pod) bool {
if pod.Annotations == nil {
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ func TestPodListProcessor(t *testing.T) {
fakeBuffersClient := buffersfake.NewSimpleClientset(test.objectsInBuffersClient...)
fakeCapacityBuffersClient, _ := client.NewCapacityBufferClientFromClients(fakeBuffersClient, fakeKubernetesClient, nil, nil)

processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, NewDefaultCapacityBuffersFakePodsRegistry())
processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, NewDefaultCapacityBuffersFakePodsRegistry(), false)
resUnschedulablePods, err := processor.Process(nil, test.unschedulablePods)
assert.Equal(t, err != nil, test.expectError)

numberOfFakePods := 0
fakePodsNames := map[string]bool{}
for _, pod := range resUnschedulablePods {
if isFakeCapacityBuffersPod(pod) {
if IsFakeCapacityBuffersPod(pod) {
numberOfFakePods += 1
assert.False(t, fakePodsNames[pod.Name])
fakePodsNames[pod.Name] = true
Expand Down Expand Up @@ -210,12 +210,12 @@ func TestCapacityBufferFakePodsRegistry(t *testing.T) {
fakeCapacityBuffersClient, _ := client.NewCapacityBufferClientFromClients(fakeBuffersClient, fakeKubernetesClient, nil, nil)

registry := NewDefaultCapacityBuffersFakePodsRegistry()
processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, registry)
processor := NewCapacityBufferPodListProcessor(fakeCapacityBuffersClient, []string{testProvStrategyAllowed}, registry, false)
resUnschedulablePods, err := processor.Process(nil, test.unschedulablePods)
assert.Equal(t, nil, err)
assert.Equal(t, test.expectedUnschedPodsCount, len(resUnschedulablePods))
for _, pod := range resUnschedulablePods {
if isFakeCapacityBuffersPod(pod) {
if IsFakeCapacityBuffersPod(pod) {
podBufferObj, found := registry.fakePodsUIDToBuffer[string(pod.UID)]
assert.True(t, found)
expectedPodsNum, found := test.expectedBuffersPodsNum[podBufferObj.Name]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func filterOutCapacityBuffersPod[T any](podsWrappers []T, getPod func(T) *apiv1.
filteredOutPodsSources := make([]T, 0)
for _, podsWrapper := range podsWrappers {
currentPod := getPod(podsWrapper)
if isFakeCapacityBuffersPod(currentPod) {
if IsFakeCapacityBuffersPod(currentPod) {
filteredOutPodsSources = append(filteredOutPodsSources, podsWrapper)
} else {
filteredPodsSources = append(filteredPodsSources, podsWrapper)
Expand Down