-
Notifications
You must be signed in to change notification settings - Fork 1.2k
✨ [Warm Replicas] Implement warm replica support for controllers. #3192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 10 commits
8239300
73fc8fa
be1b1c2
c9b99eb
e7a2bbf
854987c
072ad4b
43118a3
b67bc65
fc7c8c5
6bb4616
ccc7485
54f4fe3
667bb03
66e3be4
d9cc96b
65a04d5
c201bfa
5a13db4
4879527
57acc77
1987b54
a49f3a4
89f5479
9d5ddfb
66f64f0
0563114
aa20ef5
c9a2973
79a7b95
c1d8ea4
5df573f
d8650df
a03f404
dcf4b8b
ba51d28
ea2aa0e
730b30e
bca3e2a
12e938c
de4232d
a3dc13b
84d2053
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,9 @@ import ( | |
"k8s.io/apimachinery/pkg/types" | ||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
"k8s.io/apimachinery/pkg/util/uuid" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
"k8s.io/client-go/util/workqueue" | ||
"k8s.io/utils/ptr" | ||
|
||
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" | ||
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics" | ||
|
@@ -38,6 +40,11 @@ import ( | |
"sigs.k8s.io/controller-runtime/pkg/source" | ||
) | ||
|
||
const ( | ||
// syncedPollPeriod is the period to poll for cache sync | ||
syncedPollPeriod = 100 * time.Millisecond | ||
) | ||
|
||
// Controller implements controller.Controller. | ||
type Controller[request comparable] struct { | ||
// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required. | ||
|
@@ -83,6 +90,16 @@ type Controller[request comparable] struct { | |
// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. | ||
startWatches []source.TypedSource[request] | ||
|
||
// didStartEventSources is used to indicate whether the event sources have been started. | ||
didStartEventSources atomic.Bool | ||
|
||
// didEventSourcesFinishSync is used to indicate whether the event sources have finished | ||
// successfully. It stores a *bool where | ||
// - nil: not finished syncing | ||
// - true: finished syncing without error | ||
// - false: finished syncing with error | ||
didEventSourcesFinishSync atomic.Value | ||
godwinpang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// LogConstructor is used to construct a logger to then log messages to users during reconciliation, | ||
// or for example when a watch is started. | ||
// Note: LogConstructor has to be able to handle nil requests as we are also using it | ||
|
@@ -95,6 +112,12 @@ type Controller[request comparable] struct { | |
|
||
// LeaderElected indicates whether the controller is leader elected or always running. | ||
LeaderElected *bool | ||
|
||
// NeedWarmup specifies whether the controller should start its sources | ||
// when the manager is not the leader. | ||
// Defaults to false, which means that the controller will wait for leader election to start | ||
// before starting sources. | ||
NeedWarmup *bool | ||
} | ||
|
||
// Reconcile implements reconcile.Reconciler. | ||
|
@@ -144,6 +167,38 @@ func (c *Controller[request]) NeedLeaderElection() bool { | |
return *c.LeaderElected | ||
} | ||
|
||
// Warmup implements the manager.WarmupRunnable interface. | ||
func (c *Controller[request]) Warmup(ctx context.Context) error { | ||
sbueringer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if c.NeedWarmup == nil || !*c.NeedWarmup { | ||
return nil | ||
} | ||
return c.startEventSources(ctx) | ||
godwinpang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// DidFinishWarmup implements the manager.WarmupRunnable interface. | ||
sbueringer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (c *Controller[request]) DidFinishWarmup(ctx context.Context) bool { | ||
sbueringer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
err := wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(ctx context.Context) (bool, error) { | ||
sbueringer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
didFinishSync, ok := c.didEventSourcesFinishSync.Load().(*bool) | ||
if !ok { | ||
return false, errors.New("unexpected error: didEventSourcesFinishSync is not a bool pointer") | ||
} | ||
|
||
if didFinishSync == nil { | ||
// event sources not finished syncing | ||
return false, nil | ||
} | ||
|
||
if !*didFinishSync { | ||
// event sources finished syncing with an error | ||
return true, errors.New("event sources did not finish syncing successfully") | ||
} | ||
|
||
return true, nil | ||
}) | ||
|
||
return err == nil | ||
} | ||
|
||
// Start implements controller.Controller. | ||
func (c *Controller[request]) Start(ctx context.Context) error { | ||
// use an IIFE to get proper lock handling | ||
|
@@ -221,13 +276,19 @@ func (c *Controller[request]) Start(ctx context.Context) error { | |
// startEventSources launches all the sources registered with this controller and waits | ||
// for them to sync. It returns an error if any of the sources fail to start or sync. | ||
func (c *Controller[request]) startEventSources(ctx context.Context) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think c.Started doesn't work anymore as expected. With the current version of the PR it's used for two purposes:
=> 1. still works as expected. 2 leads to problems Now the following can happen:
=> So the Source added in 2. is never started I would suggest to
@alvaroaleman does this sound correct to you? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, makes sense. Also, lets add a test for this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. d8650df fixes this but there is another race that this uncovers while testing L336 starts a goroutine that can potentially outlive the duration of the caller holding the lock. The errGroup blocks on The problem variable is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can do something as follows? Keep the current error handling behavior, but add a channel blocking until watch.Start is called in the defer block
edit: 730b30e was the attempt, but looks like it doesn't work because it fails the case where watch.Start blocks indefinitely. edit 2: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the problem is bigger. Does Warmup start the sources with a nil queue? (if it is called before Start) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we would have to write c.Queue already in New. This also starts the queue though. The easiest might be to move it into startEventSourcesLocked (and rename that func to startEventSourcesAndQueueLocked) Like this c.didStartEventSourcesOnce.Do(func() {
queue := c.NewQueue(c.Name, c.RateLimiter)
if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue {
c.Queue = priorityQueue
} else {
c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
}
go func() {
<-ctx.Done()
c.Queue.ShutDown()
}() There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think one consequence is that depending on if Warmup or Start is actually running the sync.Once in startEventSourcesLocked the queue is getting shutdown if either the Warmup or one of the other runnable groups is shutdown, but this should be fine? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ohh, extremely good catch.
This makes sense to me.
I think so, yeah There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make sure that our tests validate that the queue we pass to the sources is non-nil? This should've been caught by tests and not by a human ideally There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a UT to verify this in 12e938c and also changed the existing integration tests at both the manager and controller levels to exercise controllers with warmup enabled so there is some e2e coverage. Let me also go through the existing tests around |
||
// CAS returns false if value is already true, so early exit since another goroutine must have | ||
// called startEventSources previously | ||
if !c.didStartEventSources.CompareAndSwap(false, true) { | ||
c.LogConstructor(nil).Info("Skipping starting event sources since it was already started") | ||
godwinpang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return nil | ||
} | ||
|
||
errGroup := &errgroup.Group{} | ||
for _, watch := range c.startWatches { | ||
log := c.LogConstructor(nil) | ||
_, ok := watch.(interface { | ||
String() string | ||
}) | ||
|
||
if !ok { | ||
log = log.WithValues("source", fmt.Sprintf("%T", watch)) | ||
} else { | ||
|
@@ -274,7 +335,11 @@ func (c *Controller[request]) startEventSources(ctx context.Context) error { | |
} | ||
}) | ||
} | ||
return errGroup.Wait() | ||
err := errGroup.Wait() | ||
|
||
c.didEventSourcesFinishSync.Store(ptr.To(err == nil)) | ||
|
||
return err | ||
} | ||
|
||
// processNextWorkItem will read a single work item off the workqueue and | ||
|
Uh oh!
There was an error while loading. Please reload this page.