Skip to content

Commit d0a2973

Browse files
authored
Merge pull request #7992 from voelzmo/enh/concurrent-recommender
Make VPA and Checkpoint updates concurrent
2 parents 708f442 + 0ee2d8a commit d0a2973

File tree

6 files changed

+237
-97
lines changed

6 files changed

+237
-97
lines changed

vertical-pod-autoscaler/common/flags.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ type CommonFlags struct {
3939
func InitCommonFlags() *CommonFlags {
4040
cf := &CommonFlags{}
4141
flag.StringVar(&cf.KubeConfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
42-
flag.Float64Var(&cf.KubeApiQps, "kube-api-qps", 5.0, "QPS limit when making requests to Kubernetes apiserver")
43-
flag.Float64Var(&cf.KubeApiBurst, "kube-api-burst", 10.0, "QPS burst limit when making requests to Kubernetes apiserver")
42+
flag.Float64Var(&cf.KubeApiQps, "kube-api-qps", 50.0, "QPS limit when making requests to Kubernetes apiserver")
43+
flag.Float64Var(&cf.KubeApiBurst, "kube-api-burst", 100.0, "QPS burst limit when making requests to Kubernetes apiserver")
4444
flag.BoolVar(&cf.EnableProfiling, "profiling", false, "Is debug/pprof endpoint enabled")
4545
flag.StringVar(&cf.VpaObjectNamespace, "vpa-object-namespace", apiv1.NamespaceAll, "Specifies the namespace to search for VPA objects. Leave empty to include all namespaces. If provided, the garbage collector will only clean this namespace.")
4646
flag.StringVar(&cf.IgnoredVpaObjectNamespaces, "ignored-vpa-object-namespaces", "", "A comma-separated list of namespaces to ignore when searching for VPA objects. Leave empty to avoid ignoring any namespaces. These namespaces will not be cleaned by the garbage collector.")

vertical-pod-autoscaler/docs/flags.md

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ This document is auto-generated from the flag definitions in the VPA admission-c
1616
| `--client-ca-file` | "/etc/tls-certs/caCert.pem" | Path to CA PEM file. |
1717
| `--feature-gates` | | A set of key=value pairs that describe feature gates for alpha/experimental features. Options are: |
1818
| `--ignored-vpa-object-namespaces` | | A comma-separated list of namespaces to ignore when searching for VPA objects. Leave empty to avoid ignoring any namespaces. These namespaces will not be cleaned by the garbage collector. |
19-
| `--kube-api-burst` | 10 | QPS burst limit when making requests to Kubernetes apiserver |
20-
| `--kube-api-qps` | 5 | QPS limit when making requests to Kubernetes apiserver |
19+
| `--kube-api-burst` | 100 | QPS burst limit when making requests to Kubernetes apiserver |
20+
| `--kube-api-qps` | 50 | QPS limit when making requests to Kubernetes apiserver |
2121
| `--kubeconfig` | | Path to a kubeconfig. Only required if out-of-cluster. |
2222
| `--log-backtrace-at` | :0 | when logging hits line file:N, emit a stack trace |
2323
| `--log-dir` | | If non-empty, write log files in this directory (no effect when -logtostderr=true) |
@@ -73,8 +73,8 @@ This document is auto-generated from the flag definitions in the VPA recommender
7373
| `--history-resolution` | "1h" | Resolution at which Prometheus is queried for historical metrics |
7474
| `--humanize-memory` | | Convert memory values in recommendations to the highest appropriate SI unit with up to 2 decimal places for better readability. |
7575
| `--ignored-vpa-object-namespaces` | | A comma-separated list of namespaces to ignore when searching for VPA objects. Leave empty to avoid ignoring any namespaces. These namespaces will not be cleaned by the garbage collector. |
76-
| `--kube-api-burst` | 10 | QPS burst limit when making requests to Kubernetes apiserver |
77-
| `--kube-api-qps` | 5 | QPS limit when making requests to Kubernetes apiserver |
76+
| `--kube-api-burst` | 100 | QPS burst limit when making requests to Kubernetes apiserver |
77+
| `--kube-api-qps` | 50 | QPS limit when making requests to Kubernetes apiserver |
7878
| `--kubeconfig` | | Path to a kubeconfig. Only required if out-of-cluster. |
7979
| `--leader-elect` | | Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability. |
8080
| `--leader-elect-lease-duration` | 15s | The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled. |
@@ -93,7 +93,7 @@ This document is auto-generated from the flag definitions in the VPA recommender
9393
| `--memory-histogram-decay-half-life` | 24h0m0s | The amount of time it takes a historical memory usage sample to lose half of its weight. In other words, a fresh usage sample is twice as 'important' as one with age equal to the half life period. |
9494
| `--memory-saver` | | If true, only track pods which have an associated VPA |
9595
| `--metric-for-pod-labels` | "up{job=\"kubernetes-pods\"}" | Which metric to look for pod labels in metrics |
96-
| `--min-checkpoints` | 10 | Minimum number of checkpoints to write per recommender's main loop |
96+
| `--min-checkpoints` | 10 | Minimum number of checkpoints to write per recommender's main loop. WARNING: this flag is deprecated and doesn't have any effect. It will be removed in a future release. Refer to `--update-worker-count` to influence the minimum number of checkpoints written per loop. |
9797
| `--one-output` | | If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true) |
9898
| `--oom-bump-up-ratio` | 1.2 | The memory bump up ratio when OOM occurred, default is 1.2. |
9999
| `--oom-min-bump-up-bytes` | 1.048576e+08 | The minimal increase of memory when OOM occurred in bytes, default is 100 * 1024 * 1024 |
@@ -121,6 +121,7 @@ This document is auto-generated from the flag definitions in the VPA recommender
121121
| `--storage` | | Specifies storage mode. Supported values: prometheus, checkpoint |
122122
| `--target-cpu-percentile` | 0.9 | CPU usage percentile that will be used as a base for CPU target recommendation. Doesn't affect CPU lower bound, CPU upper bound nor memory recommendations. |
123123
| `--target-memory-percentile` | 0.9 | Memory usage percentile that will be used as a base for memory target recommendation. Doesn't affect memory lower bound nor memory upper bound. |
124+
| `--update-worker-count` | 10 | Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits (`kube-api-qps` and `kube-api-burst`) are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop. |
124125
| `--use-external-metrics` | | ALPHA. Use an external metrics provider instead of metrics_server. |
125126
| `--username` | | The username used in the prometheus server basic auth |
126127
| `--v` | 4 | Set the log level verbosity |
@@ -142,8 +143,8 @@ This document is auto-generated from the flag definitions in the VPA updater cod
142143
| `--feature-gates` | | A set of key=value pairs that describe feature gates for alpha/experimental features. Options are: |
143144
| `--ignored-vpa-object-namespaces` | | A comma-separated list of namespaces to ignore when searching for VPA objects. Leave empty to avoid ignoring any namespaces. These namespaces will not be cleaned by the garbage collector. |
144145
| `--in-recommendation-bounds-eviction-lifetime-threshold` | 12h0m0s | Pods that live for at least that long can be evicted even if their request is within the [MinRecommended...MaxRecommended] range |
145-
| `--kube-api-burst` | 10 | QPS burst limit when making requests to Kubernetes apiserver |
146-
| `--kube-api-qps` | 5 | QPS limit when making requests to Kubernetes apiserver |
146+
| `--kube-api-burst` | 100 | QPS burst limit when making requests to Kubernetes apiserver |
147+
| `--kube-api-qps` | 50 | QPS limit when making requests to Kubernetes apiserver |
147148
| `--kubeconfig` | | Path to a kubeconfig. Only required if out-of-cluster. |
148149
| `--leader-elect` | | Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability. |
149150
| `--leader-elect-lease-duration` | 15s | The duration that non-leader candidates will wait after observing a leadership renewal until attempting to acquire leadership of a led but unrenewed leader slot. This is effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. This is only applicable if leader election is enabled. |

vertical-pod-autoscaler/pkg/recommender/checkpoint/checkpoint_writer.go

Lines changed: 63 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"sort"
23+
"sync"
2324
"time"
2425

2526
v1 "k8s.io/api/core/v1"
@@ -35,9 +36,9 @@ import (
3536
// CheckpointWriter persistently stores aggregated historical usage of containers
3637
// controlled by VPA objects. This state can be restored to initialize the model after restart.
3738
type CheckpointWriter interface {
38-
// StoreCheckpoints writes at least minCheckpoints if there are more checkpoints to write.
39+
// StoreCheckpoints writes checkpoints for at least `concurrentWorkers` number of VPAs.
3940
// Checkpoints are written for as long as ctx permits, or until all checkpoints are written.
40-
StoreCheckpoints(ctx context.Context, now time.Time, minCheckpoints int) error
41+
StoreCheckpoints(ctx context.Context, concurrentWorkers int)
4142
}
4243

4344
type checkpointWriter struct {
@@ -76,48 +77,72 @@ func getVpasToCheckpoint(clusterVpas map[model.VpaID]*model.Vpa) []*model.Vpa {
7677
return vpas
7778
}
7879

79-
func (writer *checkpointWriter) StoreCheckpoints(ctx context.Context, now time.Time, minCheckpoints int) error {
80+
func processCheckpointUpdateForVPA(vpa *model.Vpa, writer *checkpointWriter) {
81+
now := time.Now()
82+
aggregateContainerStateMap := buildAggregateContainerStateMap(vpa, writer.cluster, now)
83+
for container, aggregatedContainerState := range aggregateContainerStateMap {
84+
containerCheckpoint, err := aggregatedContainerState.SaveToCheckpoint()
85+
if err != nil {
86+
klog.ErrorS(err, "Cannot serialize checkpoint", "vpa", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName), "container", container)
87+
continue
88+
}
89+
checkpointName := fmt.Sprintf("%s-%s", vpa.ID.VpaName, container)
90+
vpaCheckpoint := vpa_types.VerticalPodAutoscalerCheckpoint{
91+
ObjectMeta: metav1.ObjectMeta{Name: checkpointName},
92+
Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{
93+
ContainerName: container,
94+
VPAObjectName: vpa.ID.VpaName,
95+
},
96+
Status: *containerCheckpoint,
97+
}
98+
err = api_util.CreateOrUpdateVpaCheckpoint(writer.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(vpa.ID.Namespace), &vpaCheckpoint)
99+
if err != nil {
100+
klog.ErrorS(err, "Cannot save checkpoint for VPA", "vpa", klog.KRef(vpa.ID.Namespace, vpaCheckpoint.Spec.VPAObjectName), "container", vpaCheckpoint.Spec.ContainerName)
101+
} else {
102+
klog.V(3).InfoS("Saved checkpoint for VPA", "vpa", klog.KRef(vpa.ID.Namespace, vpaCheckpoint.Spec.VPAObjectName), "container", vpaCheckpoint.Spec.ContainerName)
103+
vpa.CheckpointWritten = now
104+
}
105+
}
106+
}
107+
108+
func (writer *checkpointWriter) StoreCheckpoints(ctx context.Context, concurrentWorkers int) {
80109
vpas := getVpasToCheckpoint(writer.cluster.VPAs())
110+
111+
// Create a channel to send VPA updates to workers
112+
vpaCheckpointUpdates := make(chan *model.Vpa, len(vpas))
113+
114+
// Create a wait group to wait for all workers to finish
115+
var wg sync.WaitGroup
116+
// Start workers. Each worker that receives a VPA writes all checkpoints of that VPA before checking for a cancelled context.
117+
for i := 0; i < concurrentWorkers; i++ {
118+
wg.Add(1)
119+
go func() {
120+
defer wg.Done()
121+
for vpaToCheckpoint := range vpaCheckpointUpdates {
122+
processCheckpointUpdateForVPA(vpaToCheckpoint, writer)
123+
select {
124+
case <-ctx.Done():
125+
return
126+
default:
127+
}
128+
}
129+
}()
130+
}
131+
132+
// Send VPA Checkpoint updates to the workers
81133
for _, vpa := range vpas {
134+
vpaCheckpointUpdates <- vpa
135+
}
82136

83-
// Draining ctx.Done() channel. ctx.Err() will be checked if timeout occurred, but minCheckpoints have
84-
// to be written before return from this function.
85-
select {
86-
case <-ctx.Done():
87-
default:
88-
}
137+
// Close the channel to signal workers to stop after draining the channel
138+
close(vpaCheckpointUpdates)
89139

90-
if ctx.Err() != nil && minCheckpoints <= 0 {
91-
return ctx.Err()
92-
}
140+
// Wait for all workers to finish
141+
wg.Wait()
93142

94-
aggregateContainerStateMap := buildAggregateContainerStateMap(vpa, writer.cluster, now)
95-
for container, aggregatedContainerState := range aggregateContainerStateMap {
96-
containerCheckpoint, err := aggregatedContainerState.SaveToCheckpoint()
97-
if err != nil {
98-
klog.ErrorS(err, "Cannot serialize checkpoint", "vpa", klog.KRef(vpa.ID.Namespace, vpa.ID.VpaName), "container", container)
99-
continue
100-
}
101-
checkpointName := fmt.Sprintf("%s-%s", vpa.ID.VpaName, container)
102-
vpaCheckpoint := vpa_types.VerticalPodAutoscalerCheckpoint{
103-
ObjectMeta: metav1.ObjectMeta{Name: checkpointName},
104-
Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{
105-
ContainerName: container,
106-
VPAObjectName: vpa.ID.VpaName,
107-
},
108-
Status: *containerCheckpoint,
109-
}
110-
err = api_util.CreateOrUpdateVpaCheckpoint(writer.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(vpa.ID.Namespace), &vpaCheckpoint)
111-
if err != nil {
112-
klog.ErrorS(err, "Cannot save checkpoint for VPA", "vpa", klog.KRef(vpa.ID.Namespace, vpaCheckpoint.Spec.VPAObjectName), "container", vpaCheckpoint.Spec.ContainerName)
113-
} else {
114-
klog.V(3).InfoS("Saved checkpoint for VPA", "vpa", klog.KRef(vpa.ID.Namespace, vpaCheckpoint.Spec.VPAObjectName), "container", vpaCheckpoint.Spec.ContainerName)
115-
vpa.CheckpointWritten = now
116-
}
117-
minCheckpoints--
118-
}
143+
if ctx.Err() != nil {
144+
klog.V(0).InfoS("Failed to store all checkpoints within the configured `checkpoints-timeout`", "err", ctx.Err())
119145
}
120-
return nil
121146
}
122147

123148
// Build the AggregateContainerState for the purpose of the checkpoint. This is an aggregation of state of all

vertical-pod-autoscaler/pkg/recommender/checkpoint/checkpoint_writer_test.go

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,26 @@ limitations under the License.
1717
package checkpoint
1818

1919
import (
20+
"bytes"
21+
"context"
2022
"fmt"
2123
"testing"
2224
"time"
2325

26+
"github.com/stretchr/testify/assert"
27+
autoscalingv1 "k8s.io/api/autoscaling/v1"
2428
v1 "k8s.io/api/core/v1"
2529
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/labels"
31+
"k8s.io/apimachinery/pkg/runtime"
32+
core "k8s.io/client-go/testing"
33+
"k8s.io/klog/v2"
34+
klogtest "k8s.io/klog/v2/test"
2635

2736
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
37+
fakeautoscalingv1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned/typed/autoscaling.k8s.io/v1/fake"
2838
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/model"
29-
30-
"github.com/stretchr/testify/assert"
39+
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/utils/test"
3140
)
3241

3342
// TODO: Extract these constants to a common test module.
@@ -171,3 +180,69 @@ func TestGetVpasToCheckpointSorts(t *testing.T) {
171180
assert.Equal(t, genVpaID(2), result[2].ID)
172181

173182
}
183+
184+
func TestStoreCheckpointsMakesProgressEvenForCancelledContext(t *testing.T) {
185+
klogtest.InitKlog(t)
186+
tmpLogBuffer := bytes.NewBuffer(nil)
187+
klog.SetOutput(tmpLogBuffer)
188+
189+
concurrentWorkers := 2
190+
191+
// immediately cancel the context to check if at least checkpoints for `concurrentWorkers` number of VPAs get written
192+
ctx, cancelFunc := context.WithCancel(context.Background())
193+
cancelFunc()
194+
clusterState := model.NewClusterState(testGcPeriod)
195+
196+
// prepare ClusterState with 5 VPAs referencing Pods
197+
vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container-1").WithContainer("container-2").WithNamespace("test-namespace")
198+
199+
for i := 0; i < 5; i++ {
200+
targetRef := &autoscalingv1.CrossVersionObjectReference{
201+
Kind: "Pod",
202+
Name: fmt.Sprintf("pod-%d", i),
203+
APIVersion: "apps/v1",
204+
}
205+
labelSelector, _ := labels.Parse(fmt.Sprintf("app=pod-%d", i))
206+
vpa := vpaBuilder.WithName(fmt.Sprintf("vpa-%d", i)).WithTargetRef(targetRef).Get()
207+
err := clusterState.AddOrUpdateVpa(vpa, labelSelector)
208+
assert.NoError(t, err)
209+
}
210+
211+
// prepare ClusterState with 5 pods that have 2 containers each
212+
for i := 0; i < 5; i++ {
213+
podID := model.PodID{
214+
Namespace: "test-namespace",
215+
PodName: fmt.Sprintf("pod-%d", i),
216+
}
217+
podLabels := map[string]string{"app": fmt.Sprintf("pod-%d", i)}
218+
clusterState.AddOrUpdatePod(podID, podLabels, v1.PodRunning)
219+
for j := 0; j < 2; j++ {
220+
containerID := model.ContainerID{
221+
PodID: podID,
222+
ContainerName: fmt.Sprintf("container-%d", j),
223+
}
224+
err := clusterState.AddOrUpdateContainer(containerID, testRequest)
225+
assert.NoError(t, err)
226+
}
227+
}
228+
229+
patchedCheckpoints := []string{}
230+
checkpointClient := &fakeautoscalingv1.FakeAutoscalingV1{Fake: &core.Fake{}}
231+
checkpointClient.Fake.AddReactor("patch", "verticalpodautoscalercheckpoints", func(action core.Action) (handled bool, ret runtime.Object, err error) {
232+
patchAction := action.(core.PatchAction)
233+
name := patchAction.GetName()
234+
time.Sleep(2 * time.Millisecond) // Simulate some delay in patching, such that we can test the timeout
235+
patchedCheckpoints = append(patchedCheckpoints, name)
236+
237+
return true, nil, nil
238+
})
239+
240+
writer := NewCheckpointWriter(clusterState, checkpointClient)
241+
writer.StoreCheckpoints(ctx, concurrentWorkers)
242+
243+
// Because we have 2 concurrent workers, expect 2 VPAs to get processed. Each worker picks a VPA to process before checking if the context has been cancelled.
244+
// Each VPA has 2 Containers, therefore we expect 4 Checkpoints to be written
245+
assert.Equal(t, 4, len(patchedCheckpoints), "Expected 4 checkpoints to be written, but got %d", len(patchedCheckpoints))
246+
247+
assert.Contains(t, tmpLogBuffer.String(), "context canceled")
248+
}

vertical-pod-autoscaler/pkg/recommender/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ var (
6565
address = flag.String("address", ":8942", "The address to expose Prometheus metrics.")
6666
storage = flag.String("storage", "", `Specifies storage mode. Supported values: prometheus, checkpoint (default)`)
6767
memorySaver = flag.Bool("memory-saver", false, `If true, only track pods which have an associated VPA`)
68+
updateWorkerCount = flag.Int("update-worker-count", 10, "Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits (`kube-api-qps` and `kube-api-burst`) are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop.")
6869
)
6970

7071
// Prometheus history provider flags
@@ -142,6 +143,11 @@ func main() {
142143
klog.ErrorS(nil, "--vpa-object-namespace and --ignored-vpa-object-namespaces are mutually exclusive and can't be set together.")
143144
os.Exit(255)
144145
}
146+
147+
if *routines.MinCheckpointsPerRun != 10 { // Default value is 10
148+
klog.InfoS("DEPRECATION WARNING: The 'min-checkpoints' flag is deprecated and has no effect. It will be removed in a future release.")
149+
}
150+
145151
ctx := context.Background()
146152

147153
healthCheck := metrics.NewHealthCheck(*metricsFetcherInterval * 5)
@@ -284,6 +290,7 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
284290
RecommendationPostProcessors: postProcessors,
285291
CheckpointsGCInterval: *checkpointsGCInterval,
286292
UseCheckpoints: useCheckpoints,
293+
UpdateWorkerCount: *updateWorkerCount,
287294
}.Make()
288295

289296
promQueryTimeout, err := time.ParseDuration(*queryTimeout)

0 commit comments

Comments
 (0)