Skip to content

Commit 159c3a1

Browse files
committed
The LevelDB queues can actually wait on empty instead of polling
Slight refactor to cause leveldb queues to wait on empty instead of polling. Signed-off-by: Andrew Thornton <[email protected]>
1 parent ee3fb92 commit 159c3a1

File tree

3 files changed

+43
-18
lines changed

3 files changed

+43
-18
lines changed

modules/queue/queue_bytefifo.go

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,26 @@ import (
1717
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
1818
type ByteFIFOQueueConfiguration struct {
1919
WorkerPoolConfiguration
20-
Workers int
21-
Name string
20+
Workers int
21+
Name string
22+
WaitOnEmpty bool
2223
}
2324

2425
var _ Queue = &ByteFIFOQueue{}
2526

2627
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
2728
type ByteFIFOQueue struct {
2829
*WorkerPool
29-
byteFIFO ByteFIFO
30-
typ Type
31-
closed chan struct{}
32-
terminated chan struct{}
33-
exemplar interface{}
34-
workers int
35-
name string
36-
lock sync.Mutex
30+
byteFIFO ByteFIFO
31+
typ Type
32+
closed chan struct{}
33+
terminated chan struct{}
34+
exemplar interface{}
35+
workers int
36+
name string
37+
lock sync.Mutex
38+
waitOnEmpty bool
39+
pushed chan struct{}
3740
}
3841

3942
// NewByteFIFOQueue creates a new ByteFIFOQueue
@@ -45,14 +48,16 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
4548
config := configInterface.(ByteFIFOQueueConfiguration)
4649

4750
return &ByteFIFOQueue{
48-
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
49-
byteFIFO: byteFIFO,
50-
typ: typ,
51-
closed: make(chan struct{}),
52-
terminated: make(chan struct{}),
53-
exemplar: exemplar,
54-
workers: config.Workers,
55-
name: config.Name,
51+
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
52+
byteFIFO: byteFIFO,
53+
typ: typ,
54+
closed: make(chan struct{}),
55+
terminated: make(chan struct{}),
56+
exemplar: exemplar,
57+
workers: config.Workers,
58+
name: config.Name,
59+
waitOnEmpty: config.WaitOnEmpty,
60+
pushed: make(chan struct{}, 1),
5661
}, nil
5762
}
5863

@@ -76,6 +81,14 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
7681
if err != nil {
7782
return err
7883
}
84+
if q.waitOnEmpty {
85+
defer func() {
86+
select {
87+
case q.pushed <- struct{}{}:
88+
default:
89+
}
90+
}()
91+
}
7992
return q.byteFIFO.PushFunc(bs, fn)
8093
}
8194

@@ -131,6 +144,16 @@ func (q *ByteFIFOQueue) readToChan() {
131144
}
132145

133146
if len(bs) == 0 {
147+
if q.waitOnEmpty && q.byteFIFO.Len() == 0 {
148+
q.lock.Unlock()
149+
log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
150+
select {
151+
case <-q.pushed:
152+
continue
153+
case <-q.closed:
154+
continue
155+
}
156+
}
134157
q.lock.Unlock()
135158
time.Sleep(time.Millisecond * 100)
136159
continue

modules/queue/queue_disk.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
3737
if len(config.ConnectionString) == 0 {
3838
config.ConnectionString = config.DataDir
3939
}
40+
config.WaitOnEmpty = true
4041

4142
byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
4243
if err != nil {

modules/queue/unique_queue_disk.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue,
4141
if len(config.ConnectionString) == 0 {
4242
config.ConnectionString = config.DataDir
4343
}
44+
config.WaitOnEmpty = true
4445

4546
byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName)
4647
if err != nil {

0 commit comments

Comments
 (0)