Skip to content

Commit 7b4c3c7

Browse files
authored
Prevent intermittent NPE in queue tests (#19301)
There appears to be an intermittent NPE in queue tests relating to the deferred shutdown/terminate functions. This PR more formally asserts that shutdown and termination occurs before starting and finishing the tests but leaves the defer in place to ensure that if there is an issue shutdown/termination will occur. Signed-off-by: Andrew Thornton <[email protected]>
1 parent cf5d4a7 commit 7b4c3c7

File tree

2 files changed

+97
-28
lines changed

2 files changed

+97
-28
lines changed

modules/queue/queue_channel_test.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ func TestChannelQueue_Pause(t *testing.T) {
128128
queueShutdown := []func(){}
129129
queueTerminate := []func(){}
130130

131+
terminated := make(chan struct{})
132+
131133
queue, err = NewChannelQueue(handle,
132134
ChannelQueueConfiguration{
133135
WorkerPoolConfiguration: WorkerPoolConfiguration{
@@ -142,15 +144,18 @@ func TestChannelQueue_Pause(t *testing.T) {
142144
}, &testData{})
143145
assert.NoError(t, err)
144146

145-
go queue.Run(func(shutdown func()) {
146-
lock.Lock()
147-
defer lock.Unlock()
148-
queueShutdown = append(queueShutdown, shutdown)
149-
}, func(terminate func()) {
150-
lock.Lock()
151-
defer lock.Unlock()
152-
queueTerminate = append(queueTerminate, terminate)
153-
})
147+
go func() {
148+
queue.Run(func(shutdown func()) {
149+
lock.Lock()
150+
defer lock.Unlock()
151+
queueShutdown = append(queueShutdown, shutdown)
152+
}, func(terminate func()) {
153+
lock.Lock()
154+
defer lock.Unlock()
155+
queueTerminate = append(queueTerminate, terminate)
156+
})
157+
close(terminated)
158+
}()
154159

155160
// Shutdown and Terminate in defer
156161
defer func() {
@@ -278,4 +283,30 @@ func TestChannelQueue_Pause(t *testing.T) {
278283
}
279284
assert.Equal(t, test1.TestString, result1.TestString)
280285
assert.Equal(t, test1.TestInt, result1.TestInt)
286+
287+
lock.Lock()
288+
callbacks := make([]func(), len(queueShutdown))
289+
copy(callbacks, queueShutdown)
290+
queueShutdown = queueShutdown[:0]
291+
lock.Unlock()
292+
// Now shutdown the queue
293+
for _, callback := range callbacks {
294+
callback()
295+
}
296+
297+
// terminate the queue
298+
lock.Lock()
299+
callbacks = make([]func(), len(queueTerminate))
300+
copy(callbacks, queueTerminate)
301+
queueShutdown = queueTerminate[:0]
302+
lock.Unlock()
303+
for _, callback := range callbacks {
304+
callback()
305+
}
306+
select {
307+
case <-terminated:
308+
case <-time.After(10 * time.Second):
309+
assert.Fail(t, "Queue should have terminated")
310+
return
311+
}
281312
}

modules/queue/queue_disk_channel_test.go

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
221221

222222
queueShutdown := []func(){}
223223
queueTerminate := []func(){}
224+
terminated := make(chan struct{})
224225

225226
tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data")
226227
assert.NoError(t, err)
@@ -237,15 +238,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
237238
}, &testData{})
238239
assert.NoError(t, err)
239240

240-
go queue.Run(func(shutdown func()) {
241-
lock.Lock()
242-
defer lock.Unlock()
243-
queueShutdown = append(queueShutdown, shutdown)
244-
}, func(terminate func()) {
245-
lock.Lock()
246-
defer lock.Unlock()
247-
queueTerminate = append(queueTerminate, terminate)
248-
})
241+
go func() {
242+
queue.Run(func(shutdown func()) {
243+
lock.Lock()
244+
defer lock.Unlock()
245+
queueShutdown = append(queueShutdown, shutdown)
246+
}, func(terminate func()) {
247+
lock.Lock()
248+
defer lock.Unlock()
249+
queueTerminate = append(queueTerminate, terminate)
250+
})
251+
close(terminated)
252+
}()
249253

250254
// Shutdown and Terminate in defer
251255
defer func() {
@@ -417,14 +421,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
417421
case <-handleChan:
418422
assert.Fail(t, "Handler processing should have stopped")
419423
return
420-
default:
424+
case <-terminated:
425+
case <-time.After(10 * time.Second):
426+
assert.Fail(t, "Queue should have terminated")
427+
return
421428
}
422429

423430
lock.Lock()
424431
pushBack = true
425432
lock.Unlock()
426433

427434
// Reopen queue
435+
terminated = make(chan struct{})
428436
queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
429437
DataDir: tmpDir,
430438
BatchLength: 1,
@@ -442,15 +450,18 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
442450

443451
paused, _ = pausable.IsPausedIsResumed()
444452

445-
go queue.Run(func(shutdown func()) {
446-
lock.Lock()
447-
defer lock.Unlock()
448-
queueShutdown = append(queueShutdown, shutdown)
449-
}, func(terminate func()) {
450-
lock.Lock()
451-
defer lock.Unlock()
452-
queueTerminate = append(queueTerminate, terminate)
453-
})
453+
go func() {
454+
queue.Run(func(shutdown func()) {
455+
lock.Lock()
456+
defer lock.Unlock()
457+
queueShutdown = append(queueShutdown, shutdown)
458+
}, func(terminate func()) {
459+
lock.Lock()
460+
defer lock.Unlock()
461+
queueTerminate = append(queueTerminate, terminate)
462+
})
463+
close(terminated)
464+
}()
454465

455466
select {
456467
case <-handleChan:
@@ -510,4 +521,31 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
510521

511522
assert.Equal(t, test2.TestString, result4.TestString)
512523
assert.Equal(t, test2.TestInt, result4.TestInt)
524+
525+
lock.Lock()
526+
callbacks = make([]func(), len(queueShutdown))
527+
copy(callbacks, queueShutdown)
528+
queueShutdown = queueShutdown[:0]
529+
lock.Unlock()
530+
// Now shutdown the queue
531+
for _, callback := range callbacks {
532+
callback()
533+
}
534+
535+
// terminate the queue
536+
lock.Lock()
537+
callbacks = make([]func(), len(queueTerminate))
538+
copy(callbacks, queueTerminate)
539+
queueShutdown = queueTerminate[:0]
540+
lock.Unlock()
541+
for _, callback := range callbacks {
542+
callback()
543+
}
544+
545+
select {
546+
case <-time.After(10 * time.Second):
547+
assert.Fail(t, "Queue should have terminated")
548+
return
549+
case <-terminated:
550+
}
513551
}

0 commit comments

Comments
 (0)