Skip to content

Commit 2e65385

Browse files
authored
[shell-operator] fix queue compaction (#794)
Signed-off-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent 066c21e commit 2e65385

File tree

12 files changed

+341
-207
lines changed

12 files changed

+341
-207
lines changed

pkg/shell-operator/combine_binding_context_test.go

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,14 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) {
3333

3434
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
3535
TaskQueues.WithContext(context.Background())
36-
TaskQueues.NewNamedQueue("test_multiple_hooks", queue.QueueOpts{
37-
Handler: func(_ context.Context, _ task.Task) queue.TaskResult {
36+
TaskQueues.NewNamedQueue("test_multiple_hooks",
37+
func(_ context.Context, _ task.Task) queue.TaskResult {
3838
return queue.TaskResult{
3939
Status: "Success",
4040
}
4141
},
42-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
43-
CompactionCallback: nil,
44-
})
42+
queue.WithCompactableTypes(task_metadata.HookRun),
43+
)
4544

4645
tasks := []task.Task{
4746
task.NewTask(task_metadata.HookRun).
@@ -142,15 +141,14 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) {
142141

143142
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
144143
TaskQueues.WithContext(context.Background())
145-
TaskQueues.NewNamedQueue("test_no_combine", queue.QueueOpts{
146-
Handler: func(_ context.Context, _ task.Task) queue.TaskResult {
144+
TaskQueues.NewNamedQueue("test_no_combine",
145+
func(_ context.Context, _ task.Task) queue.TaskResult {
147146
return queue.TaskResult{
148147
Status: "Success",
149148
}
150149
},
151-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
152-
CompactionCallback: nil,
153-
})
150+
queue.WithCompactableTypes(task_metadata.HookRun),
151+
)
154152

155153
tasks := []task.Task{
156154
task.NewTask(task_metadata.HookRun).
@@ -216,15 +214,14 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) {
216214

217215
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
218216
TaskQueues.WithContext(context.Background())
219-
TaskQueues.NewNamedQueue("test_multiple_hooks", queue.QueueOpts{
220-
Handler: func(_ context.Context, _ task.Task) queue.TaskResult {
217+
TaskQueues.NewNamedQueue("test_multiple_hooks",
218+
func(_ context.Context, _ task.Task) queue.TaskResult {
221219
return queue.TaskResult{
222220
Status: "Success",
223221
}
224222
},
225-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
226-
CompactionCallback: nil,
227-
})
223+
queue.WithCompactableTypes(task_metadata.HookRun),
224+
)
228225

229226
bcMeta := bindingcontext.BindingContext{}.Metadata
230227
bcMeta.Group = "pods"
@@ -333,15 +330,14 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) {
333330

334331
TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage)
335332
TaskQueues.WithContext(context.Background())
336-
TaskQueues.NewNamedQueue("test_multiple_hooks", queue.QueueOpts{
337-
Handler: func(_ context.Context, _ task.Task) queue.TaskResult {
333+
TaskQueues.NewNamedQueue("test_multiple_hooks",
334+
func(_ context.Context, _ task.Task) queue.TaskResult {
338335
return queue.TaskResult{
339336
Status: "Success",
340337
}
341338
},
342-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
343-
CompactionCallback: nil,
344-
})
339+
queue.WithCompactableTypes(task_metadata.HookRun),
340+
)
345341

346342
bcMeta := bindingcontext.BindingContext{}.Metadata
347343
bcMeta.Group = "pods"

pkg/shell-operator/operator.go

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context,
505505
for _, t := range hookRunTasks {
506506
t.WithQueuedAt(now)
507507
}
508-
res.HeadTasks = hookRunTasks
508+
res.AddHeadTasks(hookRunTasks...)
509509
}
510510

511511
op.MetricStorage.CounterAdd("{PREFIX}hook_enable_kubernetes_bindings_errors_total", errors, metricLabels)
@@ -822,12 +822,11 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) {
822822

823823
// Prepopulate main queue with 'onStartup' tasks and 'enable kubernetes bindings' tasks.
824824
tqs.WithMainName("main")
825-
tqs.NewNamedQueue("main", queue.QueueOpts{
826-
Handler: op.taskHandler,
827-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
828-
CompactionCallback: nil,
829-
Logger: op.logger.With("operator.component", "mainQueue"),
830-
})
825+
tqs.NewNamedQueue("main",
826+
op.taskHandler,
827+
queue.WithCompactableTypes(task_metadata.HookRun),
828+
queue.WithLogger(op.logger.With("operator.component", "mainQueue")),
829+
)
831830

832831
mainQueue := tqs.GetMain()
833832

@@ -896,12 +895,11 @@ func (op *ShellOperator) initAndStartHookQueues() {
896895
h := op.HookManager.GetHook(hookName)
897896
for _, hookBinding := range h.Config.Schedules {
898897
if op.TaskQueues.GetByName(hookBinding.Queue) == nil {
899-
op.TaskQueues.NewNamedQueue(hookBinding.Queue, queue.QueueOpts{
900-
Handler: op.taskHandler,
901-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
902-
CompactionCallback: nil,
903-
Logger: op.logger.With("operator.component", "hookQueue", "hook", hookName, "queue", hookBinding.Queue),
904-
})
898+
op.TaskQueues.NewNamedQueue(hookBinding.Queue,
899+
op.taskHandler,
900+
queue.WithCompactableTypes(task_metadata.HookRun),
901+
queue.WithLogger(op.logger.With("operator.component", "hookQueue", "hook", hookName, "queue", hookBinding.Queue)),
902+
)
905903
op.TaskQueues.GetByName(hookBinding.Queue).Start(op.ctx)
906904
}
907905
}
@@ -912,12 +910,11 @@ func (op *ShellOperator) initAndStartHookQueues() {
912910
h := op.HookManager.GetHook(hookName)
913911
for _, hookBinding := range h.Config.OnKubernetesEvents {
914912
if op.TaskQueues.GetByName(hookBinding.Queue) == nil {
915-
op.TaskQueues.NewNamedQueue(hookBinding.Queue, queue.QueueOpts{
916-
Handler: op.taskHandler,
917-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
918-
CompactionCallback: nil,
919-
Logger: op.logger.With("operator.component", "hookQueue", "hook", hookName, "queue", hookBinding.Queue),
920-
})
913+
op.TaskQueues.NewNamedQueue(hookBinding.Queue,
914+
op.taskHandler,
915+
queue.WithCompactableTypes(task_metadata.HookRun),
916+
queue.WithLogger(op.logger.With("operator.component", "hookQueue", "hook", hookName, "queue", hookBinding.Queue)),
917+
)
921918
op.TaskQueues.GetByName(hookBinding.Queue).Start(op.ctx)
922919
}
923920
}

pkg/task/dump/dump_test.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,7 @@ func Test_Dump(t *testing.T) {
107107

108108
// Create and fill main queue.
109109
t.Run("single main queue", func(t *testing.T) {
110-
tqs.NewNamedQueue("main", queue.QueueOpts{
111-
Handler: nil,
112-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
113-
CompactionCallback: nil,
114-
})
110+
tqs.NewNamedQueue("main", nil, queue.WithCompactableTypes(task_metadata.HookRun))
115111

116112
// Single empty main should be reported only as summary.
117113
dump := testDumpQueuesWrapper(tqs, "text", true)
@@ -135,11 +131,8 @@ func Test_Dump(t *testing.T) {
135131

136132
// Create and fill active queue.
137133
t.Run("fill active queue", func(_ *testing.T) {
138-
tqs.NewNamedQueue("active-queue", queue.QueueOpts{
139-
Handler: nil,
140-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
141-
CompactionCallback: nil,
142-
})
134+
tqs.NewNamedQueue("active-queue", nil, queue.WithCompactableTypes(task_metadata.HookRun))
135+
143136
fillQueue(tqs.GetByName("active-queue"), activeTasks)
144137

145138
dump := testDumpQueuesWrapper(tqs, "text", true)
@@ -150,11 +143,7 @@ func Test_Dump(t *testing.T) {
150143

151144
// Create empty queue.
152145
t.Run("create empty queue", func(t *testing.T) {
153-
tqs.NewNamedQueue("empty", queue.QueueOpts{
154-
Handler: nil,
155-
CompactableTypes: []task.TaskType{task_metadata.HookRun},
156-
CompactionCallback: nil,
157-
})
146+
tqs.NewNamedQueue("empty", nil, queue.WithCompactableTypes(task_metadata.HookRun))
158147

159148
dump := testDumpQueuesWrapper(tqs, "text", true)
160149
t.Log(dump)

pkg/task/queue/queue_set.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,26 +75,44 @@ func (tqs *TaskQueueSet) Add(queue *TaskQueue) {
7575
tqs.m.Unlock()
7676
}
7777

78-
type QueueOpts struct {
79-
Handler func(ctx context.Context, t task.Task) TaskResult
80-
CompactableTypes []task.TaskType
81-
CompactionCallback func(compactedTasks []task.Task, targetTask task.Task)
82-
Logger *log.Logger
78+
type QueueOption func(*TaskQueue)
79+
80+
func WithCompactableTypes(taskTypes ...task.TaskType) QueueOption {
81+
return func(q *TaskQueue) {
82+
q.compactableTypes = make(map[task.TaskType]struct{}, len(taskTypes))
83+
for _, taskType := range taskTypes {
84+
q.compactableTypes[taskType] = struct{}{}
85+
}
86+
}
87+
}
88+
89+
func WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) QueueOption {
90+
return func(q *TaskQueue) {
91+
q.compactionCallback = callback
92+
}
8393
}
8494

85-
func (tqs *TaskQueueSet) NewNamedQueue(name string, opts QueueOpts) {
95+
func WithLogger(logger *log.Logger) QueueOption {
96+
return func(q *TaskQueue) {
97+
q.logger = logger
98+
}
99+
}
100+
101+
func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult, opts ...QueueOption) {
86102
q := NewTasksQueue()
87103
q.WithName(name)
88-
q.WithHandler(opts.Handler)
104+
q.WithHandler(handler)
89105
q.WithContext(tqs.ctx)
90106
q.WithMetricStorage(tqs.metricStorage)
91-
q.WithCompactableTypes(opts.CompactableTypes)
92-
if opts.CompactionCallback != nil {
93-
q.WithCompactionCallback(opts.CompactionCallback)
107+
108+
for _, opt := range opts {
109+
opt(q)
94110
}
95-
if opts.Logger != nil {
96-
q.WithLogger(opts.Logger)
111+
112+
if q.logger == nil {
113+
q.logger = log.NewLogger().Named("task_queue")
97114
}
115+
98116
tqs.m.Lock()
99117
tqs.Queues[name] = q
100118
tqs.m.Unlock()

pkg/task/queue/task_queue.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -166,16 +166,21 @@ func (q *TaskQueueSlice) Length() int {
166166
}
167167

168168
// AddFirst adds new head element.
169-
func (q *TaskQueueSlice) AddFirst(t task.Task) {
169+
func (q *TaskQueueSlice) AddFirst(tasks ...task.Task) {
170170
defer q.MeasureActionTime("AddFirst")()
171171
q.withLock(func() {
172-
q.addFirst(t)
172+
q.addFirst(tasks...)
173173
})
174174
}
175175

176176
// addFirst adds new head element.
177-
func (q *TaskQueueSlice) addFirst(t task.Task) {
178-
q.items = append([]task.Task{t}, q.items...)
177+
func (q *TaskQueueSlice) addFirst(tasks ...task.Task) {
178+
// Also, add tasks in reverse order
179+
// at the start of the queue. The first task in HeadTasks
180+
// become the new first task in the queue.
181+
for i := len(tasks) - 1; i >= 0; i-- {
182+
q.items = append([]task.Task{tasks[i]}, q.items...)
183+
}
179184
}
180185

181186
// RemoveFirst deletes a head element, so head is moved.
@@ -215,20 +220,26 @@ func (q *TaskQueueSlice) GetFirst() task.Task {
215220
}
216221

217222
// AddLast adds new tail element.
218-
func (q *TaskQueueSlice) AddLast(t task.Task) {
223+
func (q *TaskQueueSlice) AddLast(tasks ...task.Task) {
219224
defer q.MeasureActionTime("AddLast")()
220225
q.withLock(func() {
221-
q.addLast(t)
226+
q.addLast(tasks...)
222227
})
223228
}
224229

225230
// addFirst adds new tail element.
226-
func (q *TaskQueueSlice) addLast(t task.Task) {
227-
q.items = append(q.items, t)
228-
taskType := t.GetType()
231+
func (q *TaskQueueSlice) addLast(tasks ...task.Task) {
232+
for _, t := range tasks {
233+
q.items = append(q.items, t)
234+
taskType := t.GetType()
229235

230-
if _, ok := q.CompactableTypes[taskType]; ok {
231-
q.isCompactable = true
236+
if q.isCompactable {
237+
continue
238+
}
239+
240+
if _, ok := q.CompactableTypes[taskType]; ok {
241+
q.isCompactable = true
242+
}
232243
}
233244

234245
if q.isCompactable && len(q.items) > 100 {
@@ -714,8 +725,8 @@ func (q *TaskQueueSlice) Start(ctx context.Context) {
714725
case Success, Keep:
715726
// Insert new tasks right after the current task in reverse order.
716727
q.withLock(func() {
717-
for i := len(taskRes.AfterTasks) - 1; i >= 0; i-- {
718-
q.addAfter(t.GetId(), taskRes.AfterTasks[i])
728+
for i := len(taskRes.afterTasks) - 1; i >= 0; i-- {
729+
q.addAfter(t.GetId(), taskRes.afterTasks[i])
719730
}
720731
// Remove current task on success.
721732
if taskRes.Status == Success {
@@ -724,16 +735,11 @@ func (q *TaskQueueSlice) Start(ctx context.Context) {
724735
// Reset processing flag for kept task
725736
t.SetProcessing(false)
726737
}
727-
// Also, add HeadTasks in reverse order
728-
// at the start of the queue. The first task in HeadTasks
729-
// become the new first task in the queue.
730-
for i := len(taskRes.HeadTasks) - 1; i >= 0; i-- {
731-
q.addFirst(taskRes.HeadTasks[i])
732-
}
738+
739+
q.addFirst(taskRes.headTasks...)
740+
733741
// Add tasks to the end of the queue
734-
for _, newTask := range taskRes.TailTasks {
735-
q.addLast(newTask)
736-
}
742+
q.addLast(taskRes.GetTailTasks()...)
737743
})
738744
q.SetStatus("")
739745
case Repeat:

pkg/task/queue/task_queue_benchmark_test.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,30 @@ func (t *mockTaskBench) SetProp(_ string, _ interface{}) {}
5353
func (t *mockTaskBench) GetQueuedAt() time.Time { return time.Now() }
5454
func (t *mockTaskBench) WithQueuedAt(_ time.Time) task.Task { return t }
5555

56+
func (t *mockTaskBench) deepCopy() *mockTaskBench {
57+
newTask := &mockTaskBench{
58+
Id: t.Id,
59+
Type: t.Type,
60+
FailureCount: t.FailureCount,
61+
FailureMessage: t.FailureMessage,
62+
Metadata: t.Metadata,
63+
}
64+
65+
// Copy atomic bool value
66+
newTask.processing.Store(t.processing.Load())
67+
68+
return newTask
69+
}
70+
71+
func (t *mockTaskBench) DeepCopyWithNewUUID() task.Task {
72+
newTask := t.deepCopy()
73+
newTask.Id = uuid.Must(uuid.NewV4()).String()
74+
return newTask
75+
}
76+
5677
type Queue interface {
57-
AddLast(t task.Task)
58-
AddFirst(t task.Task)
78+
AddLast(tasks ...task.Task)
79+
AddFirst(tasks ...task.Task)
5980
GetFirst() task.Task
6081
RemoveFirst() task.Task
6182
Get(id string) task.Task
@@ -228,7 +249,7 @@ func benchmarkTaskQueueCompaction(b *testing.B, size int) {
228249
for i := 0; i < b.N; i++ {
229250
b.StopTimer()
230251
q := NewTasksQueue()
231-
q.WithCompactableTypes([]task.TaskType{task_metadata.HookRun})
252+
q.compactableTypes = map[task.TaskType]struct{}{task_metadata.HookRun: {}}
232253
tasks := createCompactionBenchmarkData(b, size)
233254
// Setup queue without triggering compaction
234255
for _, t := range tasks {

0 commit comments

Comments
 (0)