Skip to content

Commit 9af06f2

Browse files
authored
[shell-operator] Count tasks to start compaction (#796)
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent 2e65385 commit 9af06f2

18 files changed

+984
-148
lines changed

internal/metrics/metrics.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package metrics
2+
3+
const (
4+
TasksQueueActionDurationSeconds = "{PREFIX}tasks_queue_action_duration_seconds"
5+
TasksQueueCompactionInQueueTasks = "d8_telemetry_{PREFIX}tasks_queue_compaction_in_queue_tasks"
6+
TasksQueueCompactionReached = "d8_telemetry_{PREFIX}tasks_queue_compaction_reached"
7+
)

pkg/debug/debug-cmd.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ func DefineDebugCommands(kpApp *kingpin.Application) {
4949
}
5050
}
5151

52+
client := Queue(DefaultClient())
53+
5254
for {
53-
out, err := Queue(DefaultClient()).List(outputFormat, showEmpty)
55+
out, err := client.List(outputFormat, showEmpty)
5456
if err != nil {
5557
return err
5658
}

pkg/shell-operator/combine_binding_context_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
. "github.com/onsi/gomega"
99
"github.com/stretchr/testify/assert"
1010

11+
"github.com/flant/shell-operator/internal/metrics"
1112
bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context"
1213
"github.com/flant/shell-operator/pkg/hook/task_metadata"
1314
"github.com/flant/shell-operator/pkg/hook/types"
@@ -22,14 +23,16 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) {
2223

2324
metricStorage := metric.NewStorageMock(t)
2425
metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) {
25-
assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds")
26+
assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds)
2627
assert.NotZero(t, value)
2728
assert.Equal(t, map[string]string{
2829
"queue_action": "AddLast",
2930
"queue_name": "test_multiple_hooks",
3031
}, labels)
3132
assert.Nil(t, buckets)
3233
})
34+
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
35+
})
3336

3437
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
3538
TaskQueues.WithContext(context.Background())
@@ -130,14 +133,16 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) {
130133

131134
metricStorage := metric.NewStorageMock(t)
132135
metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) {
133-
assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds")
136+
assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds)
134137
assert.NotZero(t, value)
135138
assert.Equal(t, map[string]string{
136139
"queue_action": "AddLast",
137140
"queue_name": "test_no_combine",
138141
}, labels)
139142
assert.Nil(t, buckets)
140143
})
144+
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
145+
})
141146

142147
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
143148
TaskQueues.WithContext(context.Background())
@@ -203,14 +208,16 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) {
203208

204209
metricStorage := metric.NewStorageMock(t)
205210
metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) {
206-
assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds")
211+
assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds)
207212
assert.NotZero(t, value)
208213
assert.Equal(t, map[string]string{
209214
"queue_action": "AddLast",
210215
"queue_name": "test_multiple_hooks",
211216
}, labels)
212217
assert.Nil(t, buckets)
213218
})
219+
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
220+
})
214221

215222
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
216223
TaskQueues.WithContext(context.Background())
@@ -319,14 +326,16 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) {
319326

320327
metricStorage := metric.NewStorageMock(t)
321328
metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) {
322-
assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds")
329+
assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds)
323330
assert.NotZero(t, value)
324331
assert.Equal(t, map[string]string{
325332
"queue_action": "AddLast",
326333
"queue_name": "test_multiple_hooks",
327334
}, labels)
328335
assert.Nil(t, buckets)
329336
})
337+
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
338+
})
330339

331340
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
332341
TaskQueues.WithContext(context.Background())

pkg/shell-operator/metrics_operator.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package shell_operator
33
import (
44
"net/http"
55

6+
"github.com/flant/shell-operator/internal/metrics"
67
"github.com/flant/shell-operator/pkg/app"
78
"github.com/flant/shell-operator/pkg/metric"
89
metricstorage "github.com/flant/shell-operator/pkg/metric_storage"
@@ -32,7 +33,7 @@ func registerCommonMetrics(metricStorage metric.Storage) {
3233
// This function is used in the addon-operator
3334
func registerTaskQueueMetrics(metricStorage metric.Storage) {
3435
metricStorage.RegisterHistogram(
35-
"{PREFIX}tasks_queue_action_duration_seconds",
36+
metrics.TasksQueueActionDurationSeconds,
3637
map[string]string{
3738
"queue_name": "",
3839
"queue_action": "",

pkg/shell-operator/operator.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ func (op *ShellOperator) initHookManager() error {
151151
Group: info.Group,
152152
}).
153153
WithLogLabels(logLabels).
154-
WithQueueName(info.QueueName)
154+
WithQueueName(info.QueueName).
155+
WithCompactionID(hook.Name)
155156

156157
logEntry.With("queue", info.QueueName).
157158
Info("queue task", slog.String("name", newTask.GetDescription()))
@@ -178,7 +179,8 @@ func (op *ShellOperator) initHookManager() error {
178179
Group: info.Group,
179180
}).
180181
WithLogLabels(logLabels).
181-
WithQueueName(info.QueueName)
182+
WithQueueName(info.QueueName).
183+
WithCompactionID(hook.Name)
182184

183185
logEntry.With("queue", info.QueueName).
184186
Info("queue task", slog.String("name", newTask.GetDescription()))
@@ -243,7 +245,9 @@ func (op *ShellOperator) initValidatingWebhookManager() error {
243245
Binding: info.Binding,
244246
Group: info.Group,
245247
}).
246-
WithLogLabels(logLabels)
248+
WithLogLabels(logLabels).
249+
WithCompactionID(hook.Name)
250+
247251
admissionTask = newTask
248252
})
249253

@@ -354,7 +358,9 @@ func (op *ShellOperator) conversionEventHandler(ctx context.Context, crdName str
354358
Binding: info.Binding,
355359
Group: info.Group,
356360
}).
357-
WithLogLabels(logLabels)
361+
WithLogLabels(logLabels).
362+
WithCompactionID(hook.Name)
363+
358364
convTask = newTask
359365
})
360366

@@ -483,7 +489,9 @@ func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context,
483489
ExecuteOnSynchronization: info.KubernetesBinding.ExecuteHookOnSynchronization,
484490
}).
485491
WithLogLabels(hookLogLabels).
486-
WithQueueName("main")
492+
WithQueueName("main").
493+
WithCompactionID(taskHook.Name)
494+
487495
hookRunTasks = append(hookRunTasks, newTask)
488496
})
489497

@@ -849,7 +857,9 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) {
849857
BindingType: types.OnStartup,
850858
BindingContext: []bindingcontext.BindingContext{bc},
851859
}).
860+
WithCompactionID(hookName).
852861
WithQueuedAt(time.Now())
862+
853863
mainQueue.AddLast(newTask)
854864
logEntry.Info("queue task with hook",
855865
slog.String("task", newTask.GetDescription()),
@@ -866,6 +876,7 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) {
866876
HookName: hookName,
867877
Binding: string(task_metadata.EnableKubernetesBindings),
868878
}).
879+
WithCompactionID(hookName).
869880
WithQueuedAt(time.Now())
870881
mainQueue.AddLast(newTask)
871882
logEntry.Info("queue task with hook",
@@ -879,6 +890,7 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) {
879890
HookName: hookName,
880891
Binding: string(task_metadata.EnableScheduleBindings),
881892
}).
893+
WithCompactionID(hookName).
882894
WithQueuedAt(time.Now())
883895
mainQueue.AddLast(newTask)
884896
logEntry.Info("queue task with hook",

pkg/shell-operator/operator_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
. "github.com/onsi/gomega"
99
"github.com/stretchr/testify/assert"
1010

11+
"github.com/flant/shell-operator/internal/metrics"
1112
. "github.com/flant/shell-operator/pkg/hook/task_metadata"
1213
htypes "github.com/flant/shell-operator/pkg/hook/types"
1314
"github.com/flant/shell-operator/pkg/metric"
@@ -23,14 +24,16 @@ func Test_Operator_startup_tasks(t *testing.T) {
2324

2425
metricStorage := metric.NewStorageMock(t)
2526
metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) {
26-
assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds")
27+
assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds)
2728
assert.NotZero(t, value)
2829
assert.Equal(t, map[string]string{
2930
"queue_action": "AddLast",
3031
"queue_name": "main",
3132
}, labels)
3233
assert.Nil(t, buckets)
3334
})
35+
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
36+
})
3437

3538
op := NewShellOperator(context.Background(), WithLogger(log.NewNop()))
3639
op.MetricStorage = metricStorage

pkg/task/dump/dump_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/assert"
1212
"sigs.k8s.io/yaml"
1313

14+
"github.com/flant/shell-operator/internal/metrics"
1415
"github.com/flant/shell-operator/pkg/hook/task_metadata"
1516
"github.com/flant/shell-operator/pkg/metric"
1617
"github.com/flant/shell-operator/pkg/task"
@@ -74,7 +75,7 @@ func Test_Dump(t *testing.T) {
7475

7576
metricStorage := metric.NewStorageMock(t)
7677
metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) {
77-
assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds")
78+
assert.Equal(t, metric, metrics.TasksQueueActionDurationSeconds)
7879
assert.NotZero(t, value)
7980

8081
mapSlice := []map[string]string{
@@ -94,6 +95,8 @@ func Test_Dump(t *testing.T) {
9495
assert.Contains(t, mapSlice, labels)
9596
assert.Nil(t, buckets)
9697
})
98+
metricStorage.GaugeSetMock.Set(func(_ string, _ float64, _ map[string]string) {
99+
})
97100

98101
tqs := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
99102
tqs.WithMainName("main")

pkg/task/queue/queue_set.go

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -75,35 +75,13 @@ func (tqs *TaskQueueSet) Add(queue *TaskQueue) {
7575
tqs.m.Unlock()
7676
}
7777

78-
type QueueOption func(*TaskQueue)
79-
80-
func WithCompactableTypes(taskTypes ...task.TaskType) QueueOption {
81-
return func(q *TaskQueue) {
82-
q.compactableTypes = make(map[task.TaskType]struct{}, len(taskTypes))
83-
for _, taskType := range taskTypes {
84-
q.compactableTypes[taskType] = struct{}{}
85-
}
86-
}
87-
}
88-
89-
func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) QueueOption {
90-
return func(q *TaskQueue) {
91-
q.compactionCallback = callback
92-
}
93-
}
94-
95-
func WithLogger(logger *log.Logger) QueueOption {
96-
return func(q *TaskQueue) {
97-
q.logger = logger
98-
}
99-
}
100-
101-
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...QueueOption) {
102-
q := NewTasksQueue()
103-
q.WithName(name)
104-
q.WithHandler(handler)
105-
q.WithContext(tqs.ctx)
106-
q.WithMetricStorage(tqs.metricStorage)
78+
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...TaskQueueOption) {
79+
q := NewTasksQueue(
80+
tqs.metricStorage,
81+
WithName(name),
82+
WithHandler(handler),
83+
WithContext(tqs.ctx),
84+
)
10785

10886
for _, opt := range opts {
10987
opt(q)

0 commit comments

Comments
 (0)