Skip to content

Commit 8917e99

Browse files
committed
rework and document backoff behavior of sync::mpsc
1 parent f8276c9 commit 8917e99

File tree

4 files changed

+30
-31
lines changed

4 files changed

+30
-31
lines changed

library/std/src/sync/mpmc/array.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl<T> Channel<T> {
168168
return true;
169169
}
170170
Err(_) => {
171-
backoff.spin();
171+
backoff.spin_light();
172172
tail = self.tail.load(Ordering::Relaxed);
173173
}
174174
}
@@ -182,11 +182,11 @@ impl<T> Channel<T> {
182182
return false;
183183
}
184184

185-
backoff.spin();
185+
backoff.spin_light();
186186
tail = self.tail.load(Ordering::Relaxed);
187187
} else {
188188
// Snooze because we need to wait for the stamp to get updated.
189-
backoff.snooze();
189+
backoff.spin_heavy();
190190
tail = self.tail.load(Ordering::Relaxed);
191191
}
192192
}
@@ -251,7 +251,7 @@ impl<T> Channel<T> {
251251
return true;
252252
}
253253
Err(_) => {
254-
backoff.spin();
254+
backoff.spin_light();
255255
head = self.head.load(Ordering::Relaxed);
256256
}
257257
}
@@ -273,11 +273,11 @@ impl<T> Channel<T> {
273273
}
274274
}
275275

276-
backoff.spin();
276+
backoff.spin_light();
277277
head = self.head.load(Ordering::Relaxed);
278278
} else {
279279
// Snooze because we need to wait for the stamp to get updated.
280-
backoff.snooze();
280+
backoff.spin_heavy();
281281
head = self.head.load(Ordering::Relaxed);
282282
}
283283
}
@@ -330,7 +330,7 @@ impl<T> Channel<T> {
330330
if backoff.is_completed() {
331331
break;
332332
} else {
333-
backoff.spin();
333+
backoff.spin_light();
334334
}
335335
}
336336

library/std/src/sync/mpmc/list.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl<T> Slot<T> {
4646
fn wait_write(&self) {
4747
let backoff = Backoff::new();
4848
while self.state.load(Ordering::Acquire) & WRITE == 0 {
49-
backoff.snooze();
49+
backoff.spin_heavy();
5050
}
5151
}
5252
}
@@ -82,7 +82,7 @@ impl<T> Block<T> {
8282
if !next.is_null() {
8383
return next;
8484
}
85-
backoff.snooze();
85+
backoff.spin_heavy();
8686
}
8787
}
8888

@@ -191,7 +191,7 @@ impl<T> Channel<T> {
191191

192192
// If we reached the end of the block, wait until the next one is installed.
193193
if offset == BLOCK_CAP {
194-
backoff.snooze();
194+
backoff.spin_heavy();
195195
tail = self.tail.index.load(Ordering::Acquire);
196196
block = self.tail.block.load(Ordering::Acquire);
197197
continue;
@@ -247,7 +247,7 @@ impl<T> Channel<T> {
247247
return true;
248248
},
249249
Err(_) => {
250-
backoff.spin();
250+
backoff.spin_light();
251251
tail = self.tail.index.load(Ordering::Acquire);
252252
block = self.tail.block.load(Ordering::Acquire);
253253
}
@@ -286,7 +286,7 @@ impl<T> Channel<T> {
286286

287287
// If we reached the end of the block, wait until the next one is installed.
288288
if offset == BLOCK_CAP {
289-
backoff.snooze();
289+
backoff.spin_heavy();
290290
head = self.head.index.load(Ordering::Acquire);
291291
block = self.head.block.load(Ordering::Acquire);
292292
continue;
@@ -320,7 +320,7 @@ impl<T> Channel<T> {
320320
// The block can be null here only if the first message is being sent into the channel.
321321
// In that case, just wait until it gets initialized.
322322
if block.is_null() {
323-
backoff.snooze();
323+
backoff.spin_heavy();
324324
head = self.head.index.load(Ordering::Acquire);
325325
block = self.head.block.load(Ordering::Acquire);
326326
continue;
@@ -351,7 +351,7 @@ impl<T> Channel<T> {
351351
return true;
352352
},
353353
Err(_) => {
354-
backoff.spin();
354+
backoff.spin_light();
355355
head = self.head.index.load(Ordering::Acquire);
356356
block = self.head.block.load(Ordering::Acquire);
357357
}
@@ -542,7 +542,7 @@ impl<T> Channel<T> {
542542
// New updates to tail will be rejected by MARK_BIT and aborted unless it's
543543
// at boundary. We need to wait for the updates take affect otherwise there
544544
// can be memory leaks.
545-
backoff.snooze();
545+
backoff.spin_heavy();
546546
tail = self.tail.index.load(Ordering::Acquire);
547547
}
548548

library/std/src/sync/mpmc/utils.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,8 @@ impl<T> DerefMut for CachePadded<T> {
9191
}
9292

9393
const SPIN_LIMIT: u32 = 6;
94-
const YIELD_LIMIT: u32 = 10;
9594

96-
/// Performs exponential backoff in spin loops.
95+
/// Performs quadratic backoff in spin loops.
9796
pub struct Backoff {
9897
step: Cell<u32>,
9998
}
@@ -104,25 +103,27 @@ impl Backoff {
104103
Backoff { step: Cell::new(0) }
105104
}
106105

107-
/// Backs off in a lock-free loop.
106+
/// Backs off using lightweight spinning.
108107
///
109-
/// This method should be used when we need to retry an operation because another thread made
110-
/// progress.
108+
/// This method should be used for:
109+
/// - Retrying an operation because another thread made progress. i.e. on CAS failure.
110+
/// - Waiting for an operation to complete by spinning optimistically for a few iterations
111+
/// before falling back to parking the thread (see `Backoff::is_completed`).
111112
#[inline]
112-
pub fn spin(&self) {
113+
pub fn spin_light(&self) {
113114
let step = self.step.get().min(SPIN_LIMIT);
114115
for _ in 0..step.pow(2) {
115116
crate::hint::spin_loop();
116117
}
117118

118-
if self.step.get() <= SPIN_LIMIT {
119-
self.step.set(self.step.get() + 1);
120-
}
119+
self.step.set(self.step.get() + 1);
121120
}
122121

123-
/// Backs off in a blocking loop.
122+
/// Backs off using heavyweight spinning.
123+
///
124+
/// This method should be used in blocking loops where parking the thread is not an option.
124125
#[inline]
125-
pub fn snooze(&self) {
126+
pub fn spin_heavy(&self) {
126127
if self.step.get() <= SPIN_LIMIT {
127128
for _ in 0..self.step.get().pow(2) {
128129
crate::hint::spin_loop()
@@ -131,12 +132,10 @@ impl Backoff {
131132
crate::thread::yield_now();
132133
}
133134

134-
if self.step.get() <= YIELD_LIMIT {
135-
self.step.set(self.step.get() + 1);
136-
}
135+
self.step.set(self.step.get() + 1);
137136
}
138137

139-
/// Returns `true` if quadratic backoff has completed and blocking the thread is advised.
138+
/// Returns `true` if quadratic backoff has completed and parking the thread is advised.
140139
#[inline]
141140
pub fn is_completed(&self) -> bool {
142141
self.step.get() > SPIN_LIMIT

library/std/src/sync/mpmc/zero.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl<T> Packet<T> {
5757
fn wait_ready(&self) {
5858
let backoff = Backoff::new();
5959
while !self.ready.load(Ordering::Acquire) {
60-
backoff.snooze();
60+
backoff.spin_heavy();
6161
}
6262
}
6363
}

0 commit comments

Comments
 (0)