Skip to content

Commit aea2e32

Browse files
authored
✨ Add a priority queue (#3014)
* :sparkling: POC of a priority queue This change contains the POC of a priority workqueue that allows to prioritize events over one another. It is opt-in and will by default de-prioritize events originating from the initial listwatch and from periodic resyncs. * Use a btree, it is faster ``` $ benchstat slice.txt btree.txt goos: darwin goarch: arm64 pkg: sigs.k8s.io/controller-runtime/pkg/controllerworkqueue cpu: Apple M2 Pro │ slice.txt │ btree.txt │ │ sec/op │ sec/op vs base │ AddGetDone-10 5.078m ± 0% 1.163m ± 0% -77.09% (p=0.000 n=10) │ slice.txt │ btree.txt │ │ B/op │ B/op vs base │ AddGetDone-10 55.11Ki ± 0% 46.98Ki ± 0% -14.75% (p=0.000 n=10) │ slice.txt │ btree.txt │ │ allocs/op │ allocs/op vs base │ AddGetDone-10 3.000k ± 0% 1.000k ± 0% -66.67% (p=0.000 n=10) ``` * Add fuzztest and fix bug it found * Fix handler * Move into package priorityqueue * Metric: Adds are only counted if the object didn't exist yet * Validate correct usage of btree and tick * Add retry metrics * Fix missing notification when item is added * Add tests for handler * Controller tests * Add some benchmarks * Make Add non-blocking * Revert "Make Add non-blocking" This reverts commit ce23de5. Speedup is tiny and at the expense of increased mem usage (which due to increasing GC pressure is likely the explanation why its so small), so doesn't seem worth it overall: ``` goos: darwin goarch: arm64 pkg: sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue cpu: Apple M2 Pro │ blocking.txt │ non-blocking.txt │ │ sec/op │ sec/op vs base │ AddGetDone-10 1.320m ± 1% 1.410m ± 0% +6.81% (p=0.000 n=10) AddOnly-10 373.9µ ± 1% 343.2µ ± 1% -8.22% (p=0.000 n=10) AddLockContended-10 375.8µ ± 1% 342.8µ ± 1% -8.78% (p=0.000 n=10) geomean 570.3µ 549.4µ -3.66% │ blocking.txt │ non-blocking.txt │ │ B/op │ B/op vs base │ AddGetDone-10 109.9Ki ± 0% 164.2Ki ± 0% +49.42% (p=0.000 n=10) AddOnly-10 553.0 ± 2% 56045.0 ± 0% +10034.72% (p=0.000 n=10) AddLockContended-10 569.0 ± 6% 56045.0 ± 0% +9749.74% (p=0.000 n=10) geomean 3.207Ki 78.94Ki +2361.60% │ blocking.txt │ non-blocking.txt │ │ allocs/op │ allocs/op vs base │ AddGetDone-10 3.013k ± 0% 5.001k ± 0% +65.98% (p=0.000 n=10) AddOnly-10 16.00 ± 6% 2000.00 ± 0% +12400.00% (p=0.000 n=10) AddLockContended-10 16.00 ± 6% 2000.00 ± 0% +12400.00% (p=0.000 n=10) geomean 91.71 2.715k +2860.01% ``` * Remove unneccesarry timestamp * Consolidate require directiv * Godocs and simplification * Fix priorityqueue defaulting * Avoid unnecessary else when returning
1 parent 203ef4e commit aea2e32

20 files changed

+1599
-106
lines changed

.golangci.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,14 @@ issues:
165165
- linters:
166166
- dupl
167167
path: _test\.go
168+
- linters:
169+
- revive
170+
path: .*/internal/.*
171+
- linters:
172+
- unused
173+
# Seems to incorrectly trigger on the two implementations that are only
174+
# used through an interface and not directly..?
175+
path: pkg/controller/priorityqueue/metrics\.go
168176

169177
run:
170178
go: "1.23"

.gomodcheck.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@ excludedModules:
1212
# --- test dependencies:
1313
- github.com/onsi/ginkgo/v2
1414
- github.com/onsi/gomega
15+
16+
# --- We want a newer version with generics support for this
17+
- github.com/google/btree

examples/priorityqueue/main.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"os"
23+
"time"
24+
25+
corev1 "k8s.io/api/core/v1"
26+
"sigs.k8s.io/controller-runtime/pkg/builder"
27+
kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
28+
"sigs.k8s.io/controller-runtime/pkg/config"
29+
"sigs.k8s.io/controller-runtime/pkg/log"
30+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
31+
"sigs.k8s.io/controller-runtime/pkg/manager"
32+
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
33+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
34+
)
35+
36+
func init() {
37+
}
38+
39+
func main() {
40+
if err := run(); err != nil {
41+
fmt.Fprintf(os.Stderr, "%v\n", err)
42+
os.Exit(1)
43+
}
44+
}
45+
46+
func run() error {
47+
log.SetLogger(zap.New())
48+
49+
// Setup a Manager
50+
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
51+
Controller: config.Controller{UsePriorityQueue: true},
52+
})
53+
if err != nil {
54+
return fmt.Errorf("failed to set up controller-manager: %w", err)
55+
}
56+
57+
if err := builder.ControllerManagedBy(mgr).
58+
For(&corev1.ConfigMap{}).
59+
Complete(reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
60+
log.FromContext(ctx).Info("Reconciling")
61+
time.Sleep(10 * time.Second)
62+
63+
return reconcile.Result{}, nil
64+
})); err != nil {
65+
return fmt.Errorf("failed to set up controller: %w", err)
66+
}
67+
68+
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
69+
return fmt.Errorf("failed to start manager: %w", err)
70+
}
71+
72+
return nil
73+
}

examples/scratch-env/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/go-openapi/swag v0.23.0 // indirect
2323
github.com/gogo/protobuf v1.3.2 // indirect
2424
github.com/golang/protobuf v1.5.4 // indirect
25+
github.com/google/btree v1.1.3 // indirect
2526
github.com/google/gnostic-models v0.6.8 // indirect
2627
github.com/google/go-cmp v0.6.0 // indirect
2728
github.com/google/gofuzz v1.2.0 // indirect

examples/scratch-env/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
3333
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
3434
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
3535
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
36+
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
37+
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
3638
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
3739
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
3840
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/evanphx/json-patch/v5 v5.9.0
77
github.com/go-logr/logr v1.4.2
88
github.com/go-logr/zapr v1.3.0
9+
github.com/google/btree v1.1.3
910
github.com/google/go-cmp v0.6.0
1011
github.com/google/gofuzz v1.2.0
1112
github.com/onsi/ginkgo/v2 v2.21.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
5151
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
5252
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
5353
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
54+
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
55+
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
5456
github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g=
5557
github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8=
5658
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=

pkg/builder/controller.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (blder *TypedBuilder[request]) Watches(
163163
) *TypedBuilder[request] {
164164
input := WatchesInput[request]{
165165
obj: object,
166-
handler: eventHandler,
166+
handler: handler.WithLowPriorityWhenUnchanged(eventHandler),
167167
}
168168
for _, opt := range opts {
169169
opt.ApplyToWatches(&input)
@@ -317,7 +317,7 @@ func (blder *TypedBuilder[request]) doWatch() error {
317317
}
318318

319319
var hdler handler.TypedEventHandler[client.Object, request]
320-
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
320+
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})))
321321
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
322322
allPredicates = append(allPredicates, blder.forInput.predicates...)
323323
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
@@ -341,11 +341,11 @@ func (blder *TypedBuilder[request]) doWatch() error {
341341
}
342342

343343
var hdler handler.TypedEventHandler[client.Object, request]
344-
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
344+
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner(
345345
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
346346
blder.forInput.object,
347347
opts...,
348-
)))
348+
))))
349349
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
350350
allPredicates = append(allPredicates, own.predicates...)
351351
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)

pkg/config/controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,10 @@ type Controller struct {
5353
// NeedLeaderElection indicates whether the controller needs to use leader election.
5454
// Defaults to true, which means the controller will use leader election.
5555
NeedLeaderElection *bool
56+
57+
// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
58+
// priority queue.
59+
//
60+
// Note: This flag is disabled by default until a future version. It's currently in beta.
61+
UsePriorityQueue bool
5662
}

pkg/controller/controller.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
2727

28+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2829
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
2930
"sigs.k8s.io/controller-runtime/pkg/manager"
3031
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -189,11 +190,20 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
189190
}
190191

191192
if options.RateLimiter == nil {
192-
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
193+
if mgr.GetControllerOptions().UsePriorityQueue {
194+
options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
195+
} else {
196+
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
197+
}
193198
}
194199

195200
if options.NewQueue == nil {
196201
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
202+
if mgr.GetControllerOptions().UsePriorityQueue {
203+
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
204+
o.RateLimiter = rateLimiter
205+
})
206+
}
197207
return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
198208
Name: controllerName,
199209
})

pkg/controller/controller_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"sigs.k8s.io/controller-runtime/pkg/config"
3131
"sigs.k8s.io/controller-runtime/pkg/controller"
32+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
3233
"sigs.k8s.io/controller-runtime/pkg/event"
3334
"sigs.k8s.io/controller-runtime/pkg/handler"
3435
internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller"
@@ -437,5 +438,41 @@ var _ = Describe("controller.Controller", func() {
437438
_, ok := c.(manager.LeaderElectionRunnable)
438439
Expect(ok).To(BeTrue())
439440
})
441+
442+
It("should configure a priority queue if UsePriorityQueue is set", func() {
443+
m, err := manager.New(cfg, manager.Options{
444+
Controller: config.Controller{UsePriorityQueue: true},
445+
})
446+
Expect(err).NotTo(HaveOccurred())
447+
448+
c, err := controller.New("new-controller-16", m, controller.Options{
449+
Reconciler: rec,
450+
})
451+
Expect(err).NotTo(HaveOccurred())
452+
453+
ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request])
454+
Expect(ok).To(BeTrue())
455+
456+
q := ctrl.NewQueue("foo", nil)
457+
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
458+
Expect(ok).To(BeTrue())
459+
})
460+
461+
It("should not configure a priority queue if UsePriorityQueue is not set", func() {
462+
m, err := manager.New(cfg, manager.Options{})
463+
Expect(err).NotTo(HaveOccurred())
464+
465+
c, err := controller.New("new-controller-17", m, controller.Options{
466+
Reconciler: rec,
467+
})
468+
Expect(err).NotTo(HaveOccurred())
469+
470+
ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request])
471+
Expect(ok).To(BeTrue())
472+
473+
q := ctrl.NewQueue("foo", nil)
474+
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
475+
Expect(ok).To(BeFalse())
476+
})
440477
})
441478
})

0 commit comments

Comments
 (0)