@@ -22,50 +22,61 @@ use core::cell::UnsafeCell;
 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
 
+use super::cache_aligned::CacheAligned;
+
 // Node within the linked list queue of messages to send
 struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     // that would reduce memory usage (and be a bit faster).
     // is it worth it?
     value: Option<T>,         // nullable for re-use of nodes
+    cached: bool,             // This node goes into the node cache
     next: AtomicPtr<Node<T>>, // next node in the queue
 }
 
 /// The single-producer single-consumer queue. This structure is not cloneable,
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
     // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>,  // where to pop from
+    cache_bound: usize,             // maximum cache size
+    cached_nodes: AtomicUsize,      // number of nodes marked as cachable
+    addition: Addition,
+}
 
-    // producer fields
+struct Producer<T, Addition> {
     head: UnsafeCell<*mut Node<T>>,      // where to push to
     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
 
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
         Box::into_raw(box Node {
             value: None,
+            cached: false,
            next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
         })
     }
 }
 
 impl<T> Queue<T> {
+    #[cfg(test)]
     /// Creates a new queue.
     ///
     /// This is unsafe as the type system doesn't enforce a single
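The `CacheAligned` wrapper imported above comes from `super::cache_aligned` and is not shown in this diff. Below is a minimal sketch of what such a wrapper might look like, assuming a 64-byte cache line, a public tuple field (which would explain the `.0` accesses later in the patch), and `Deref`/`DerefMut` so the inner fields stay reachable through the wrapper:

```rust
use std::ops::{Deref, DerefMut};

// Hypothetical stand-in for `super::cache_aligned::CacheAligned`: align the
// wrapped value to a 64-byte boundary so the producer and consumer halves of
// the queue land on separate cache lines.
#[repr(align(64))]
pub struct CacheAligned<T>(pub T);

impl<T> CacheAligned<T> {
    pub fn new(t: T) -> CacheAligned<T> {
        CacheAligned(t)
    }
}

impl<T> Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> DerefMut for CacheAligned<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}
```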
@@ -84,18 +95,60 @@ impl<T> Queue<T> {
     /// no bound. Otherwise, the cache will never grow larger than
     /// `bound` (although the queue itself could be much larger).
     pub unsafe fn new(bound: usize) -> Queue<T> {
+        Self::with_additions(bound, (), ())
+    }
+}
+
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue with the given additional elements stored in the
+    /// producer and consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache contention,
+    /// we wish to keep fields used mainly by the producer on a separate cache
+    /// line from those used by the consumer.
+    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
+    /// allocate one for small fields, so we allow users to insert additional
+    /// fields into the cache lines already allocated by this queue for the
+    /// producer and consumer.
+    ///
+    /// This is unsafe as the type system doesn't enforce a single
+    /// consumer-producer relationship. It also allows the consumer to `pop`
+    /// items while there is a `peek` active due to all methods having a
+    /// non-mutable receiver.
+    ///
+    /// # Arguments
+    ///
+    ///   * `bound` - This queue implementation is implemented with a linked
+    ///               list, and this means that a push is always a malloc. In
+    ///               order to amortize this cost, an internal cache of nodes is
+    ///               maintained to prevent a malloc from always being
+    ///               necessary. This bound is the limit on the size of the
+    ///               cache (if desired). If the value is 0, then the cache has
+    ///               no bound. Otherwise, the cache will never grow larger than
+    ///               `bound` (although the queue itself could be much larger).
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
         let n1 = Node::new();
         let n2 = Node::new();
         (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition,
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition,
+            }),
         }
     }
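For orientation, here is a hedged usage sketch of the new constructor. The concrete addition types (an `AtomicUsize` counter on the producer side, nothing on the consumer side) and the helper functions are invented for illustration and do not come from this patch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical caller: a queue of u32s carrying an extra producer-side
// counter that lives on the producer's cache line.
fn make_queue() -> Queue<u32, AtomicUsize, ()> {
    // Safety: the caller must uphold the single-producer, single-consumer
    // contract documented on `with_additions`.
    unsafe {
        Queue::with_additions(
            128,                 // cache at most 128 recycled nodes
            AtomicUsize::new(0), // producer-side extra field
            (),                  // no consumer-side extra field
        )
    }
}

fn produce(q: &Queue<u32, AtomicUsize, ()>) {
    q.push(42);
    // The extra field is reached through the accessor added later in this
    // patch and shares the producer's cache line.
    q.producer_addition().fetch_add(1, Ordering::Relaxed);
}
```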
@@ -109,35 +162,25 @@ impl<T> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
         }
     }
 
     unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
@@ -153,27 +196,27 @@ impl<T> Queue<T> {
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            *self.tail.get() = next;
-            if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Ordering::Release);
+            *self.consumer.0.tail.get() = next;
+            if self.consumer.cache_bound == 0 {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
             } else {
-                // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Ordering::Relaxed);
-                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-                let size = additions - subtractions;
-
-                if size < self.cache_bound {
-                    self.tail_prev.store(tail, Ordering::Release);
-                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
+                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                    // Account for the newly cachable node so `cache_bound` holds.
+                    self.consumer.cached_nodes.store(cached_nodes + 1, Ordering::Relaxed);
+                    (*tail).cached = true;
+                }
+
+                if (*tail).cached {
+                    self.consumer.tail_prev.store(tail, Ordering::Release);
                 } else {
-                    (*self.tail_prev.load(Ordering::Relaxed))
-                        .next.store(next, Ordering::Relaxed);
+                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                        .next.store(next, Ordering::Relaxed);
                     // We have successfully erased all references to 'tail', so
                     // now we can safely drop it.
                     let _: Box<Node<T>> = Box::from_raw(tail);
@@ -194,17 +237,25 @@ impl<T> Queue<T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { None } else { (*next).value.as_mut() }
         }
     }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
 }
 
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = Box::from_raw(cur);
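The `producer_addition`/`consumer_addition` accessors added above only hand out shared references, so any cross-thread state stored there needs interior mutability. A brief sketch of the kind of signalling this enables; the `AtomicBool` wake flag and the `send`/`try_recv` helpers are made-up examples, not something this patch defines:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical consumer-side addition: a wake flag the producer raises after
// pushing, stored on the consumer's cache line.
fn send(q: &Queue<String, (), AtomicBool>, msg: String) {
    q.push(msg);
    q.consumer_addition().store(true, Ordering::Release);
}

fn try_recv(q: &Queue<String, (), AtomicBool>) -> Option<String> {
    // Clear the flag before draining so a concurrent push raises it again.
    q.consumer_addition().store(false, Ordering::Relaxed);
    q.pop()
}
```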