Skip to content

Commit 550e24f

Browse files
committed
Use WakerSet for Condvar
This should also address concerns about spurious wakeups.
1 parent b49abe1 commit 550e24f

File tree

5 files changed

+54
-53
lines changed

5 files changed

+54
-53
lines changed

src/sync/condvar.rs

+27-50
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
use std::fmt;
12
use std::pin::Pin;
23
use std::time::Duration;
34

4-
use slab::Slab;
5-
65
use super::mutex::{guard_lock, MutexGuard};
76
use crate::future::{timeout, Future};
8-
use crate::task::{Context, Poll, Waker};
7+
use crate::sync::WakerSet;
8+
use crate::task::{Context, Poll};
99

1010
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
1111
pub struct WaitTimeoutResult(bool);
@@ -56,11 +56,13 @@ impl WaitTimeoutResult {
5656
///
5757
/// # })
5858
/// ```
59-
#[derive(Debug)]
6059
pub struct Condvar {
61-
blocked: std::sync::Mutex<Slab<Option<Waker>>>,
60+
wakers: WakerSet,
6261
}
6362

63+
unsafe impl Send for Condvar {}
64+
unsafe impl Sync for Condvar {}
65+
6466
impl Default for Condvar {
6567
fn default() -> Self {
6668
Condvar::new()
@@ -79,7 +81,7 @@ impl Condvar {
7981
/// ```
8082
pub fn new() -> Self {
8183
Condvar {
82-
blocked: std::sync::Mutex::new(Slab::new()),
84+
wakers: WakerSet::new(),
8385
}
8486
}
8587

@@ -88,12 +90,6 @@ impl Condvar {
8890
/// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
8991
/// However, as a best practice avoid using with multiple mutexes.
9092
///
91-
/// # Warning
92-
/// Any attempt to poll this future before the notification is received will result in a
93-
/// spurious wakeup. This allows the implementation to be efficient, and is technically valid
94-
/// semantics for a condition variable. However, this may result in unexpected behaviour when this future is
95-
/// used with future combinators. In most cases `Condvar::wait_until` is easier to use correctly.
96-
///
9793
/// # Examples
9894
///
9995
/// ```
@@ -136,7 +132,6 @@ impl Condvar {
136132
cond: self,
137133
guard: Some(guard),
138134
key: None,
139-
notified: false,
140135
}
141136
}
142137

@@ -189,13 +184,6 @@ impl Condvar {
189184

190185
/// Waits on this condition variable for a notification, timing out after a specified duration.
191186
///
192-
/// # Warning
193-
/// This has similar limitations to `Condvar::wait`, where polling before a notify is sent can
194-
/// result in a spurious wakeup. In addition, the timeout may itself trigger a spurious wakeup,
195-
/// if no other task is holding the mutex when the future is polled. Thus the
196-
/// `WaitTimeoutResult` should not be trusted to determine if the condition variable was
197-
/// actually notified.
198-
///
199187
/// For these reasons `Condvar::wait_timeout_until` is recommended in most cases.
200188
///
201189
/// # Examples
@@ -234,7 +222,6 @@ impl Condvar {
234222
/// #
235223
/// # })
236224
/// ```
237-
#[cfg(feature = "unstable")]
238225
#[allow(clippy::needless_lifetimes)]
239226
pub async fn wait_timeout<'a, T>(
240227
&self,
@@ -332,8 +319,7 @@ impl Condvar {
332319
/// # }) }
333320
/// ```
334321
pub fn notify_one(&self) {
335-
let blocked = self.blocked.lock().unwrap();
336-
notify(blocked, false);
322+
self.wakers.notify_one();
337323
}
338324

339325
/// Wakes up all blocked tasks on this condvar.
@@ -369,28 +355,21 @@ impl Condvar {
369355
/// # }) }
370356
/// ```
371357
pub fn notify_all(&self) {
372-
let blocked = self.blocked.lock().unwrap();
373-
notify(blocked, true);
358+
self.wakers.notify_all();
374359
}
375360
}
376361

377-
#[inline]
378-
fn notify(mut blocked: std::sync::MutexGuard<'_, Slab<Option<Waker>>>, all: bool) {
379-
for (_, entry) in blocked.iter_mut() {
380-
if let Some(w) = entry.take() {
381-
w.wake();
382-
if !all {
383-
return;
384-
}
385-
}
362+
impl fmt::Debug for Condvar {
363+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364+
//f.debug_struct("Condvar").finish()
365+
f.pad("Condvar { .. }")
386366
}
387367
}
388368

389369
struct AwaitNotify<'a, 'b, T> {
390370
cond: &'a Condvar,
391371
guard: Option<MutexGuard<'b, T>>,
392372
key: Option<usize>,
393-
notified: bool,
394373
}
395374

396375
impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> {
@@ -399,16 +378,22 @@ impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> {
399378
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
400379
match self.guard.take() {
401380
Some(_) => {
402-
let mut blocked = self.cond.blocked.lock().unwrap();
403-
let w = cx.waker().clone();
404-
self.key = Some(blocked.insert(Some(w)));
405-
381+
self.key = Some(self.cond.wakers.insert(cx));
406382
// the guard is dropped when we return, which frees the lock
407383
Poll::Pending
408384
}
409385
None => {
410-
self.notified = true;
411-
Poll::Ready(())
386+
if let Some(key) = self.key {
387+
if self.cond.wakers.complete_if_notified(key, cx) {
388+
self.key = None;
389+
Poll::Ready(())
390+
} else {
391+
Poll::Pending
392+
}
393+
} else {
394+
// This should only happen if it is polled twice after receiving a notification
395+
Poll::Ready(())
396+
}
412397
}
413398
}
414399
}
@@ -417,15 +402,7 @@ impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> {
417402
impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> {
418403
fn drop(&mut self) {
419404
if let Some(key) = self.key {
420-
let mut blocked = self.cond.blocked.lock().unwrap();
421-
let opt_waker = blocked.remove(key);
422-
423-
if opt_waker.is_none() && !self.notified {
424-
// wake up the next task, because this task was notified, but
425-
// we are dropping it before it can finished.
426-
// This may result in a spurious wake-up, but that's ok.
427-
notify(blocked, false);
428-
}
405+
self.cond.wakers.cancel(key);
429406
}
430407
}
431408
}

src/sync/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -176,19 +176,19 @@
176176
#[doc(inline)]
177177
pub use std::sync::{Arc, Weak};
178178

179-
pub use condvar::Condvar;
180179
pub use mutex::{Mutex, MutexGuard};
181180
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
182181

183-
mod condvar;
184182
mod mutex;
185183
mod rwlock;
186184

187185
cfg_unstable! {
188186
pub use barrier::{Barrier, BarrierWaitResult};
189187
pub use channel::{channel, Sender, Receiver};
188+
pub use condvar::Condvar;
190189

191190
mod barrier;
191+
mod condvar;
192192
mod channel;
193193
}
194194

src/sync/mutex.rs

+1
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ impl<T> DerefMut for MutexGuard<'_, T> {
286286
}
287287
}
288288

289+
#[cfg(feature = "unstable")]
289290
pub fn guard_lock<'a, T>(guard: &MutexGuard<'a, T>) -> &'a Mutex<T> {
290291
guard.0
291292
}

src/sync/waker_set.rs

+23
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,29 @@ impl WakerSet {
8080
}
8181
}
8282

83+
/// If the waker for this key is still waiting for a notification, then update
84+
/// the waker for the entry, and return false. If the waker has been notified,
85+
/// treat the entry as completed and return true.
86+
#[cfg(feature = "unstable")]
87+
pub fn complete_if_notified(&self, key: usize, cx: &Context<'_>) -> bool {
88+
let mut inner = self.lock();
89+
90+
match &mut inner.entries[key] {
91+
None => {
92+
inner.entries.remove(key);
93+
inner.none_count -= 1;
94+
true
95+
}
96+
Some(w) => {
97+
// We were never woken, so update instead
98+
if !w.will_wake(cx.waker()) {
99+
*w = cx.waker().clone();
100+
}
101+
false
102+
}
103+
}
104+
}
105+
83106
/// Removes the waker of a cancelled operation.
84107
///
85108
/// Returns `true` if another blocked operation from the set was notified.

tests/condvar.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
#![cfg(feature = "unstable")]
12
use std::sync::Arc;
23
use std::time::Duration;
34

45
use async_std::sync::{Condvar, Mutex};
56
use async_std::task::{self, JoinHandle};
67

7-
#[cfg(feature = "unstable")]
88
#[test]
99
fn wait_timeout() {
1010
task::block_on(async {

0 commit comments

Comments
 (0)