@@ -332,11 +332,15 @@ func (c *Controller[request]) startEventSourcesLocked(ctx context.Context) error
332
332
sourceStartCtx , cancel := context .WithTimeout (ctx , c .CacheSyncTimeout )
333
333
defer cancel ()
334
334
335
- sourceStartErrChan := make (chan error , 1 ) // Buffer chan to not leak goroutine if we time out
335
+ sourceStartErrChan := make (chan error , 1 ) // Buffer chan to not leak goroutine if we time out
336
+ hasAccessedQueueChan := make (chan struct {}) //
336
337
go func () {
337
338
defer close (sourceStartErrChan )
338
339
log .Info ("Starting EventSource" )
339
- if err := watch .Start (ctx , c .Queue ); err != nil {
340
+
341
+ q := c .Queue
342
+ close (hasAccessedQueueChan )
343
+ if err := watch .Start (ctx , q ); err != nil {
340
344
sourceStartErrChan <- err
341
345
return
342
346
}
@@ -356,8 +360,8 @@ func (c *Controller[request]) startEventSourcesLocked(ctx context.Context) error
356
360
case err := <- sourceStartErrChan :
357
361
return err
358
362
case <- sourceStartCtx .Done ():
359
- defer func () { <- sourceStartErrChan }() // Ensure that watch.Start has been called to avoid prematurely releasing lock before accessing c.Queue
360
- if didStartSyncingSource .Load () { // We are racing with WaitForSync, wait for it to let it tell us what happened
363
+ defer func () { <- hasAccessedQueueChan }() // Ensure that watch.Start has been called to avoid prematurely releasing lock before accessing c.Queue
364
+ if didStartSyncingSource .Load () { // We are racing with WaitForSync, wait for it to let it tell us what happened
361
365
return <- sourceStartErrChan
362
366
}
363
367
if ctx .Err () != nil { // Don't return an error if the root context got cancelled
0 commit comments