✨ [Warm Replicas] Implement warm replica support for controllers. #3192
base: main
Changes from 31 commits
@@ -38,6 +38,50 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/source"
 )
 
+type ControllerOptions[request comparable] struct {
+	// Reconciler is a function that can be called at any time with the Name / Namespace of an object and
+	// ensures that the state of the system matches the state specified in the object.
+	// Defaults to the DefaultReconcileFunc.
+	Do reconcile.TypedReconciler[request]
+
+	// RateLimiter is used to limit how frequently requests may be queued into the work queue.
+	RateLimiter workqueue.TypedRateLimiter[request]
+
+	// NewQueue constructs the queue for this controller once the controller is ready to start.
+	// This is a func because the standard Kubernetes work queues start themselves immediately, which
+	// leads to goroutine leaks if something calls controller.New repeatedly.
+	NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]
+
+	// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1.
+	MaxConcurrentReconciles int
+
+	// CacheSyncTimeout refers to the time limit set on waiting for the cache to sync.
+	// Defaults to 2 minutes if not set.
+	CacheSyncTimeout time.Duration
+
+	// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
+	Name string
+
+	// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
+	// or for example when a watch is started.
+	// Note: LogConstructor has to be able to handle nil requests as we are also using it
+	// outside the context of a reconciliation.
+	LogConstructor func(request *request) logr.Logger
+
+	// RecoverPanic indicates whether the panic caused by reconcile should be recovered.
+	// Defaults to true.
+	RecoverPanic *bool
+
+	// LeaderElected indicates whether the controller is leader elected or always running.
+	LeaderElected *bool
+
+	// EnableWarmup specifies whether the controller should start its sources
+	// when the manager is not the leader.
+	// Defaults to false, which means that the controller will wait for leader election to start
+	// before starting sources.
+	EnableWarmup *bool
+}
+
 // Controller implements controller.Controller.
 type Controller[request comparable] struct {
 	// Name is used to uniquely identify a Controller in tracing, logging and monitoring. Name is required.
@@ -83,6 +127,9 @@ type Controller[request comparable] struct {
 	// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
 	startWatches []source.TypedSource[request]
 
+	// didStartEventSourcesOnce is used to ensure that the event sources are only started once.
+	didStartEventSourcesOnce sync.Once
+
 	// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
 	// or for example when a watch is started.
 	// Note: LogConstructor has to be able to handle nil requests as we are also using it
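The new didStartEventSourcesOnce field exists because, with warmup enabled, both Warmup and Start can funnel into startEventSources. A minimal standalone sketch of why a sync.Once guard makes the double call safe (illustrative only, not code from this PR):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once
	var wg sync.WaitGroup

	startEventSources := func(caller string) {
		once.Do(func() {
			// Only the first caller actually starts the sources; the
			// second caller blocks until the body finishes, then no-ops.
			fmt.Println("event sources started by", caller)
		})
	}

	wg.Add(2)
	go func() { defer wg.Done(); startEventSources("Warmup") }() // non-leader path
	go func() { defer wg.Done(); startEventSources("Start") }()  // leader path
	wg.Wait()
}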
@@ -95,6 +142,34 @@ type Controller[request comparable] struct {
 
 	// LeaderElected indicates whether the controller is leader elected or always running.
 	LeaderElected *bool
+
+	// EnableWarmup specifies whether the controller should start its sources when the manager is not
+	// the leader. This is useful for cases where sources take a long time to start, as it allows
+	// the controller to warm up its caches even before it is elected as the leader. This
+	// improves leadership failover time, as the caches will be prepopulated before the controller
+	// transitions to be leader.
+	//
+	// Setting EnableWarmup to true and NeedLeaderElection to true means the controller will start its
+	// sources without waiting to become leader.
+	// Setting EnableWarmup to true and NeedLeaderElection to false is a no-op, as controllers without
+	// leader election do not wait on leader election to start their sources.
+	// Defaults to false.
+	EnableWarmup *bool
 }
 
+func New[request comparable](options ControllerOptions[request]) *Controller[request] {
+	return &Controller[request]{
+		Do:                      options.Do,
+		RateLimiter:             options.RateLimiter,
+		NewQueue:                options.NewQueue,
+		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
+		CacheSyncTimeout:        options.CacheSyncTimeout,
+		Name:                    options.Name,
+		LogConstructor:          options.LogConstructor,
+		RecoverPanic:            options.RecoverPanic,
+		LeaderElected:           options.LeaderElected,
+		EnableWarmup:            options.EnableWarmup,
+	}
+}
+
 // Reconcile implements reconcile.Reconciler.
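To make the new options concrete: New is the internal constructor in pkg/internal/controller, and a caller inside controller-runtime could wire a warmed-up controller roughly as below. This is a sketch only; the no-op reconciler, the queue constructor, the ptr helper, and all names are illustrative, not part of this PR.

package controller

import (
	"context"

	"github.com/go-logr/logr"
	"k8s.io/client-go/util/workqueue"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func newWarmupController() *Controller[reconcile.Request] {
	return New(ControllerOptions[reconcile.Request]{
		Name: "example",
		// No-op reconciler, for illustration only.
		Do: reconcile.Func(func(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
			return reconcile.Result{}, nil
		}),
		RateLimiter: workqueue.DefaultTypedControllerRateLimiter[reconcile.Request](),
		NewQueue: func(name string, rl workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
			return workqueue.NewTypedRateLimitingQueueWithConfig(rl, workqueue.TypedRateLimitingQueueConfig[reconcile.Request]{Name: name})
		},
		LogConstructor: func(*reconcile.Request) logr.Logger { return logr.Discard() },
		// Warmup only has an effect together with leader election: the
		// controller starts its sources before this replica becomes leader.
		LeaderElected: ptr(true),
		EnableWarmup:  ptr(true),
	})
}

// ptr is a tiny hypothetical helper; k8s.io/utils/ptr.To is the usual choice.
func ptr[T any](v T) *T { return &v }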
@@ -144,6 +219,15 @@ func (c *Controller[request]) NeedLeaderElection() bool {
 	return *c.LeaderElected
 }
 
+// Warmup implements the manager.WarmupRunnable interface.
+func (c *Controller[request]) Warmup(ctx context.Context) error {
+	if c.EnableWarmup == nil || !*c.EnableWarmup {
+		return nil
+	}
+
+	return c.startEventSources(ctx)
+}
+
 // Start implements controller.Controller.
 func (c *Controller[request]) Start(ctx context.Context) error {
 	// use an IIFE to get proper lock handling
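The manager side of this contract is not shown in this file. As a rough sketch of the intended sequencing (the WarmupRunnable name comes from the comment above; everything else here is assumed for illustration):

package manager

import "context"

// Runnable mirrors the manager's Runnable contract.
type Runnable interface {
	Start(ctx context.Context) error
}

// WarmupRunnable is implemented by controllers via the Warmup method above.
type WarmupRunnable interface {
	Warmup(ctx context.Context) error
}

// runWithWarmup sketches the intended sequencing: warm up the runnable's
// sources while this replica is not yet leader, then start it once elected.
func runWithWarmup(ctx context.Context, r Runnable, elected <-chan struct{}) error {
	if w, ok := r.(WarmupRunnable); ok {
		// Starts event sources early; a no-op when EnableWarmup is false or nil.
		if err := w.Warmup(ctx); err != nil {
			return err
		}
	}
	select {
	case <-elected: // leadership acquired with caches already warm
		return r.Start(ctx)
	case <-ctx.Done():
		return ctx.Err()
	}
}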
@@ -185,12 +269,6 @@ func (c *Controller[request]) Start(ctx context.Context) error {
 
 	c.LogConstructor(nil).Info("Starting Controller")
 
-	// All the watches have been started, we can reset the local slice.
-	//
-	// We should never hold watches more than necessary, each watch source can hold a backing cache,
-	// which won't be garbage collected if we hold a reference to it.
-	c.startWatches = nil
-
 	// Launch workers to process resources
 	c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
 	wg.Add(c.MaxConcurrentReconciles)
@@ -221,60 +299,71 @@
 // startEventSources launches all the sources registered with this controller and waits
 // for them to sync. It returns an error if any of the sources fail to start or sync.
 func (c *Controller[request]) startEventSources(ctx context.Context) error {
Review thread on this function:

I think c.Started doesn't work anymore as expected. With the current version of the PR it's used for two purposes: [...] => 1. still works as expected; 2. leads to problems. Now the following can happen: [...] => So the Source added in 2. is never started. I would suggest to [...]. @alvaroaleman does this sound correct to you?

Yes, makes sense. Also, let's add a test for this.

d8650df fixes this, but there is another race that this uncovers while testing: L336 starts a goroutine that can potentially outlive the duration of the caller holding the lock. The errGroup blocks on [...]. The problem variable is [...].

Maybe we can do something as follows? Keep the current error handling behavior, but add a channel blocking until watch.Start is called in the defer block.
edit: 730b30e was the attempt, but it looks like it doesn't work because it fails the case where watch.Start blocks indefinitely.
edit 2: [...]

I think the problem is bigger. Does Warmup start the sources with a nil queue? (if it is called before Start)

I think we would have to write c.Queue already in New. This also starts the queue, though. The easiest might be to move it into startEventSourcesLocked (and rename that func to startEventSourcesAndQueueLocked). Like this:

c.didStartEventSourcesOnce.Do(func() {
	queue := c.NewQueue(c.Name, c.RateLimiter)
	if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue {
		c.Queue = priorityQueue
	} else {
		c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
	}
	go func() {
		<-ctx.Done()
		c.Queue.ShutDown()
	}()
	// ... (rest of the body, starting the sources, elided in the original comment)
})

I think one consequence is that, depending on whether Warmup or Start actually runs the sync.Once in startEventSourcesLocked, the queue gets shut down if either Warmup or one of the other runnable groups is shut down, but this should be fine?

Ohh, extremely good catch.
This makes sense to me.
I think so, yeah.

Can we make sure that our tests validate that the queue we pass to the sources is non-nil? This should have been caught by tests, not by a human, ideally.

I added a UT to verify this in 12e938c and also changed the existing integration tests at both the manager and controller levels to exercise controllers with warmup enabled, so there is some e2e coverage. Let me also go through the existing tests around [...]
-	errGroup := &errgroup.Group{}
-	for _, watch := range c.startWatches {
-		log := c.LogConstructor(nil)
-		_, ok := watch.(interface {
-			String() string
-		})
-
-		if !ok {
-			log = log.WithValues("source", fmt.Sprintf("%T", watch))
-		} else {
-			log = log.WithValues("source", fmt.Sprintf("%s", watch))
-		}
-		didStartSyncingSource := &atomic.Bool{}
-		errGroup.Go(func() error {
-			// Use a timeout for starting and syncing the source to avoid silently
-			// blocking startup indefinitely if it doesn't come up.
-			sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
-			defer cancel()
-
-			sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
-			go func() {
-				defer close(sourceStartErrChan)
-				log.Info("Starting EventSource")
-				if err := watch.Start(ctx, c.Queue); err != nil {
-					sourceStartErrChan <- err
-					return
-				}
-				syncingSource, ok := watch.(source.TypedSyncingSource[request])
-				if !ok {
-					return
-				}
-				didStartSyncingSource.Store(true)
-				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
-					err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
-					log.Error(err, "Could not wait for Cache to sync")
-					sourceStartErrChan <- err
-				}
-			}()
-
-			select {
-			case err := <-sourceStartErrChan:
-				return err
-			case <-sourceStartCtx.Done():
-				if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
-					return <-sourceStartErrChan
-				}
-				if ctx.Err() != nil { // Don't return an error if the root context got cancelled
-					return nil
-				}
-				return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
-			}
-		})
-	}
-	return errGroup.Wait()
+	var retErr error
+
+	c.didStartEventSourcesOnce.Do(func() {
+		errGroup := &errgroup.Group{}
+		for _, watch := range c.startWatches {
+			log := c.LogConstructor(nil)
+			_, ok := watch.(interface {
+				String() string
+			})
+			if !ok {
+				log = log.WithValues("source", fmt.Sprintf("%T", watch))
+			} else {
+				log = log.WithValues("source", fmt.Sprintf("%s", watch))
+			}
+			didStartSyncingSource := &atomic.Bool{}
+			errGroup.Go(func() error {
+				// Use a timeout for starting and syncing the source to avoid silently
+				// blocking startup indefinitely if it doesn't come up.
+				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
+				defer cancel()
+
+				sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
+				go func() {
+					defer close(sourceStartErrChan)
+					log.Info("Starting EventSource")
+					if err := watch.Start(ctx, c.Queue); err != nil {
+						sourceStartErrChan <- err
+						return
+					}
+					syncingSource, ok := watch.(source.TypedSyncingSource[request])
+					if !ok {
+						return
+					}
+					didStartSyncingSource.Store(true)
+					if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
+						err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
+						log.Error(err, "Could not wait for Cache to sync")
+						sourceStartErrChan <- err
+					}
+				}()
+
+				select {
+				case err := <-sourceStartErrChan:
+					return err
+				case <-sourceStartCtx.Done():
+					if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
+						return <-sourceStartErrChan
+					}
+					if ctx.Err() != nil { // Don't return an error if the root context got cancelled
+						return nil
+					}
+					return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
+				}
+			})
+		}
+		retErr = errGroup.Wait()
+
+		// All the watches have been started, we can reset the local slice.
+		//
+		// We should never hold watches more than necessary, each watch source can hold a backing cache,
+		// which won't be garbage collected if we hold a reference to it.
+		c.startWatches = nil
+	})
+
+	return retErr
 }
 
 // processNextWorkItem will read a single work item off the workqueue and
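One detail in the hunk above worth calling out: sourceStartErrChan is buffered so the inner goroutine can finish its send and exit even after the select has taken the timeout branch and no receiver remains. A standalone illustration of that pattern (hypothetical doWork, not from the PR):

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	// Buffer of 1 means the send below never blocks, so the worker
	// goroutine cannot leak when the select times out first.
	errCh := make(chan error, 1)
	go func() {
		errCh <- doWork() // stands in for watch.Start plus WaitForSync
	}()

	select {
	case err := <-errCh:
		fmt.Println("finished:", err)
	case <-time.After(50 * time.Millisecond):
		fmt.Println("timed out; the worker will still exit thanks to the buffer")
	}
}

func doWork() error {
	time.Sleep(100 * time.Millisecond)
	return errors.New("boom")
}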