Skip to content

Commit b49abe1

Browse files
committed
Implement wait_timeout_until
And add warnings about spurious wakeups to wait and wait_timeout
1 parent 7cefb25 commit b49abe1

File tree

2 files changed

+96
-40
lines changed

2 files changed

+96
-40
lines changed

src/sync/condvar.rs

+80-40
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
use std::pin::Pin;
22
use std::time::Duration;
33

4-
use futures_timer::Delay;
54
use slab::Slab;
65

76
use super::mutex::{guard_lock, MutexGuard};
8-
use crate::future::Future;
7+
use crate::future::{timeout, Future};
98
use crate::task::{Context, Poll, Waker};
109

1110
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
@@ -29,7 +28,7 @@ impl WaitTimeoutResult {
2928
/// # Examples
3029
///
3130
/// ```
32-
/// # fn main() { async_std::task::block_on(async {
31+
/// # async_std::task::block_on(async {
3332
/// #
3433
/// use std::sync::Arc;
3534
///
@@ -55,7 +54,7 @@ impl WaitTimeoutResult {
5554
/// started = cvar.wait(started).await;
5655
/// }
5756
///
58-
/// # }) }
57+
/// # })
5958
/// ```
6059
#[derive(Debug)]
6160
pub struct Condvar {
@@ -89,10 +88,16 @@ impl Condvar {
8988
/// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
9089
/// However, as a best practice avoid using with multiple mutexes.
9190
///
91+
/// # Warning
92+
/// Any attempt to poll this future before the notification is received will result in a
93+
/// spurious wakeup. This allows the implementation to be efficient, and is technically valid
94+
/// semantics for a condition variable. However, this may result in unexpected behaviour when this future is
95+
/// used with future combinators. In most cases `Condvar::wait_until` is easier to use correctly.
96+
///
9297
/// # Examples
9398
///
9499
/// ```
95-
/// # fn main() { async_std::task::block_on(async {
100+
/// # async_std::task::block_on(async {
96101
/// use std::sync::Arc;
97102
///
98103
/// use async_std::sync::{Mutex, Condvar};
@@ -115,7 +120,7 @@ impl Condvar {
115120
/// while !*started {
116121
/// started = cvar.wait(started).await;
117122
/// }
118-
/// # }) }
123+
/// # })
119124
/// ```
120125
#[allow(clippy::needless_lifetimes)]
121126
pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
@@ -142,7 +147,7 @@ impl Condvar {
142147
/// # Examples
143148
///
144149
/// ```
145-
/// # fn main() { async_std::task::block_on(async {
150+
/// # async_std::task::block_on(async {
146151
/// #
147152
/// use std::sync::Arc;
148153
///
@@ -165,9 +170,8 @@ impl Condvar {
165170
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
166171
/// let _guard = cvar.wait_until(lock.lock().await, |started| { *started }).await;
167172
/// #
168-
/// # }) }
173+
/// # })
169174
/// ```
170-
#[cfg(feature = "unstable")]
171175
#[allow(clippy::needless_lifetimes)]
172176
pub async fn wait_until<'a, T, F>(
173177
&self,
@@ -185,10 +189,19 @@ impl Condvar {
185189

186190
/// Waits on this condition variable for a notification, timing out after a specified duration.
187191
///
192+
/// # Warning
193+
/// This has similar limitations to `Condvar::wait`, where polling before a notify is sent can
194+
/// result in a spurious wakeup. In addition, the timeout may itself trigger a spurious wakeup,
195+
/// if no other task is holding the mutex when the future is polled. Thus the
196+
/// `WaitTimeoutResult` should not be trusted to determine if the condition variable was
197+
/// actually notified.
198+
///
199+
/// For these reasons `Condvar::wait_timeout_until` is recommended in most cases.
200+
///
188201
/// # Examples
189202
///
190203
/// ```
191-
/// # fn main() { async_std::task::block_on(async {
204+
/// # async_std::task::block_on(async {
192205
/// #
193206
/// use std::sync::Arc;
194207
/// use std::time::Duration;
@@ -219,22 +232,73 @@ impl Condvar {
219232
/// }
220233
/// }
221234
/// #
222-
/// # }) }
235+
/// # })
223236
/// ```
237+
#[cfg(feature = "unstable")]
224238
#[allow(clippy::needless_lifetimes)]
225239
pub async fn wait_timeout<'a, T>(
226240
&self,
227241
guard: MutexGuard<'a, T>,
228242
dur: Duration,
229243
) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
230244
let mutex = guard_lock(&guard);
231-
let timeout_result = TimeoutWaitFuture {
232-
await_notify: self.await_notify(guard),
233-
delay: Delay::new(dur),
245+
match timeout(dur, self.wait(guard)).await {
246+
Ok(guard) => (guard, WaitTimeoutResult(false)),
247+
Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
234248
}
235-
.await;
249+
}
236250

237-
(mutex.lock().await, timeout_result)
251+
/// Waits on this condition variable for a notification, timing out after a specified duration.
252+
/// Spurious wakes will not cause this function to return.
253+
///
254+
/// # Examples
255+
/// ```
256+
/// # async_std::task::block_on(async {
257+
/// use std::sync::Arc;
258+
/// use std::time::Duration;
259+
///
260+
/// use async_std::sync::{Mutex, Condvar};
261+
/// use async_std::task;
262+
///
263+
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
264+
/// let pair2 = pair.clone();
265+
///
266+
/// task::spawn(async move {
267+
/// let (lock, cvar) = &*pair2;
268+
/// let mut started = lock.lock().await;
269+
/// *started = true;
270+
/// // We notify the condvar that the value has changed.
271+
/// cvar.notify_one();
272+
/// });
273+
///
274+
/// // wait for the thread to start up
275+
/// let (lock, cvar) = &*pair;
276+
/// let result = cvar.wait_timeout_until(
277+
/// lock.lock().await,
278+
/// Duration::from_millis(100),
279+
/// |&mut started| started,
280+
/// ).await;
281+
/// if result.1.timed_out() {
282+
/// // timed-out without the condition ever evaluating to true.
283+
/// }
284+
/// // access the locked mutex via result.0
285+
/// # });
286+
/// ```
287+
#[allow(clippy::needless_lifetimes)]
288+
pub async fn wait_timeout_until<'a, T, F>(
289+
&self,
290+
guard: MutexGuard<'a, T>,
291+
dur: Duration,
292+
condition: F,
293+
) -> (MutexGuard<'a, T>, WaitTimeoutResult)
294+
where
295+
F: FnMut(&mut T) -> bool,
296+
{
297+
let mutex = guard_lock(&guard);
298+
match timeout(dur, self.wait_until(guard, condition)).await {
299+
Ok(guard) => (guard, WaitTimeoutResult(false)),
300+
Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
301+
}
238302
}
239303

240304
/// Wakes up one blocked task on this condvar.
@@ -365,27 +429,3 @@ impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> {
365429
}
366430
}
367431
}
368-
369-
struct TimeoutWaitFuture<'a, 'b, T> {
370-
await_notify: AwaitNotify<'a, 'b, T>,
371-
delay: Delay,
372-
}
373-
374-
impl<'a, 'b, T> TimeoutWaitFuture<'a, 'b, T> {
375-
pin_utils::unsafe_pinned!(await_notify: AwaitNotify<'a, 'b, T>);
376-
pin_utils::unsafe_pinned!(delay: Delay);
377-
}
378-
379-
impl<'a, 'b, T> Future for TimeoutWaitFuture<'a, 'b, T> {
380-
type Output = WaitTimeoutResult;
381-
382-
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
383-
match self.as_mut().delay().poll(cx) {
384-
Poll::Pending => match self.await_notify().poll(cx) {
385-
Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(false)),
386-
Poll::Pending => Poll::Pending,
387-
},
388-
Poll::Ready(_) => Poll::Ready(WaitTimeoutResult(true)),
389-
}
390-
}
391-
}

tests/condvar.rs

+16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::time::Duration;
44
use async_std::sync::{Condvar, Mutex};
55
use async_std::task::{self, JoinHandle};
66

7+
#[cfg(feature = "unstable")]
78
#[test]
89
fn wait_timeout() {
910
task::block_on(async {
@@ -25,6 +26,21 @@ fn wait_timeout() {
2526
})
2627
}
2728

29+
#[test]
30+
fn wait_timeout_until_timed_out() {
31+
task::block_on(async {
32+
let m = Mutex::new(false);
33+
let c = Condvar::new();
34+
35+
let (_, wait_result) = c
36+
.wait_timeout_until(m.lock().await, Duration::from_millis(10), |&mut started| {
37+
started
38+
})
39+
.await;
40+
assert!(wait_result.timed_out());
41+
})
42+
}
43+
2844
#[test]
2945
fn notify_all() {
3046
task::block_on(async {

0 commit comments

Comments
 (0)