Skip to content

Commit 382101e

Browse files
zeripathlunny
andauthored
In disk_channel queues synchronously push to disk on shutdown (#18415) (#18788)
Partial Backport of #18415 Instead of using an asynchronous goroutine to push to disk on shutdown just close the datachan and immediately push to the disk. Prevents messages of incompletely flushed queues. Signed-off-by: Andrew Thornton <[email protected]> Co-authored-by: Lunny Xiao <[email protected]>
1 parent 86c3481 commit 382101e

File tree

4 files changed

+12
-12
lines changed

4 files changed

+12
-12
lines changed

modules/queue/queue_bytefifo.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,11 @@ loop:
195195
}
196196
}
197197

198-
var errQueueEmpty = fmt.Errorf("empty queue")
199-
var errEmptyBytes = fmt.Errorf("empty bytes")
200-
var errUnmarshal = fmt.Errorf("failed to unmarshal")
198+
var (
199+
errQueueEmpty = fmt.Errorf("empty queue")
200+
errEmptyBytes = fmt.Errorf("empty bytes")
201+
errUnmarshal = fmt.Errorf("failed to unmarshal")
202+
)
201203

202204
func (q *ByteFIFOQueue) doPop() error {
203205
q.lock.Lock()

modules/queue/queue_disk_channel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,8 +251,8 @@ func (q *PersistableChannelQueue) Shutdown() {
251251
q.channelQueue.Wait()
252252
q.internal.(*LevelQueue).Wait()
253253
// Redirect all remaining data in the chan to the internal channel
254-
close(q.channelQueue.dataChan)
255254
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
255+
close(q.channelQueue.dataChan)
256256
for data := range q.channelQueue.dataChan {
257257
_ = q.internal.Push(data)
258258
atomic.AddInt64(&q.channelQueue.numInQueue, -1)

modules/queue/queue_disk_channel_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) {
188188
for _, callback := range callbacks {
189189
callback()
190190
}
191-
192191
}

modules/queue/unique_queue_disk_channel.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -238,13 +238,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
238238
q.channelQueue.Wait()
239239
q.internal.(*LevelUniqueQueue).Wait()
240240
// Redirect all remaining data in the chan to the internal channel
241-
go func() {
242-
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
243-
for data := range q.channelQueue.dataChan {
244-
_ = q.internal.Push(data)
245-
}
246-
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
247-
}()
241+
close(q.channelQueue.dataChan)
242+
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
243+
for data := range q.channelQueue.dataChan {
244+
_ = q.internal.Push(data)
245+
}
246+
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
248247

249248
log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
250249
}

0 commit comments

Comments
 (0)