Skip to content

Add Condvar APIs not susceptible to spurious wake #47970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 25, 2018
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 206 additions & 2 deletions src/libstd/sync/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sync::{mutex, MutexGuard, PoisonError};
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use sys_common::poison::{self, LockResult};
use time::Duration;
use time::{Duration, Instant};

/// A type indicating whether a timed wait on a condition variable returned
/// due to a time out or not.
Expand Down Expand Up @@ -219,6 +219,62 @@ impl Condvar {
}
}

/// Blocks the current thread until this condition variable receives a
/// notification and the required condition is met. Spurious wakeups are
/// ignored and this function will only return once the condition has been
/// met.
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls
/// to [`notify_one`] or [`notify_all`] which happen logically after the
/// mutex is unlocked are candidates to wake this thread up. When this
/// function call returns, the lock specified will have been re-acquired.
///
/// # Errors
///
/// This function will return an error if the mutex being waited on is
/// poisoned when this thread re-acquires the lock. For more information,
/// see information about [poisoning] on the [`Mutex`] type.
///
/// [`notify_one`]: #method.notify_one
/// [`notify_all`]: #method.notify_all
/// [poisoning]: ../sync/struct.Mutex.html#poisoning
/// [`Mutex`]: ../sync/struct.Mutex.html
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // Wait for the thread to start up.
/// let &(ref lock, ref cvar) = &*pair;
/// // As long as the value inside the `Mutex` is false, we wait.
/// cvar.wait_until(lock.lock().unwrap(), |started| { started });
/// ```
#[stable(feature = "wait_until", since = "1.24")]
pub fn wait_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>,
mut condition: F)
-> LockResult<MutexGuard<'a, T>>
where F: FnMut(&mut T) -> bool {
while !condition(&mut *guard) {
guard = self.wait(guard)?;
}
Ok(guard)
}


/// Waits on this condition variable for a notification, timing out after a
/// specified duration.
///
Expand Down Expand Up @@ -293,7 +349,15 @@ impl Condvar {
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
/// the system time. This function is susceptible to spurious wakeups.
/// Condition variables normally have a boolean predicate associated with
/// them, and the predicate must always be checked each time this function
/// returns to protect against spurious wakeups. Additionally, it is
/// typically desirable for the time-out to not exceed some duration in
/// spite of spurious wakes, thus the sleep-duration is decremented by the
/// amount slept. Alternatively, use the `wait_timeout_until` method
/// to wait until a condition is met with a total time-out regardless
/// of spurious wakes.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed.
Expand All @@ -302,6 +366,7 @@ impl Condvar {
/// returns, regardless of whether the timeout elapsed or not.
///
/// [`wait`]: #method.wait
/// [`wait_timeout_until`]: #method.wait_timeout_until
/// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html
///
/// # Examples
Expand Down Expand Up @@ -353,6 +418,76 @@ impl Condvar {
}
}

/// Waits on this condition variable for a notification, timing out after a
/// specified duration. Spurious wakes will not cause this function to
/// return.
///
/// The semantics of this function are equivalent to [`wait_until`] except
/// that the thread will be blocked for roughly no longer than `dur`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `dur`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned [`WaitTimeoutResult`] value indicates if the timeout is
/// known to have elapsed without the condition being met.
///
/// Like [`wait_until`], the lock specified will be re-acquired when this
/// function returns, regardless of whether the timeout elapsed or not.
///
/// [`wait_until`]: #method.wait_until
/// [`wait_timeout`]: #method.wait_timeout
/// [`WaitTimeoutResult`]: struct.WaitTimeoutResult.html
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Mutex, Condvar};
/// use std::thread;
/// use std::time::Duration;
///
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
/// let pair2 = pair.clone();
///
/// thread::spawn(move|| {
/// let &(ref lock, ref cvar) = &*pair2;
/// let mut started = lock.lock().unwrap();
/// *started = true;
/// // We notify the condvar that the value has changed.
/// cvar.notify_one();
/// });
///
/// // wait for the thread to start up
/// let &(ref lock, ref cvar) = &*pair;
/// let result = cvar.wait_timeout_until(lock, Duration::from_millis(100), |started| {
/// started
/// }).unwrap();
/// if result.1.timed_out() {
/// // timed-out without the condition ever evaluating to true.
/// }
/// // access the locked mutex via result.0
/// ```
#[stable(feature = "wait_timeout_until", since = "1.24")]
pub fn wait_timeout_until<'a, T, F>(&self, mut guard: MutexGuard<'a, T>,
mut dur: Duration, mut condition: F)
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
where F: FnMut(&mut T) -> bool {
let start = Instant::now();
loop {
if condition(&mut *guard) {
return Ok((guard, WaitTimeoutResult(false)));
}
let timeout = match dur.checked_sub(start.elapsed()) {
Some(timeout) => timeout,
None => return Ok((guard, WaitTimeoutResult(true))),
}
guard = self.wait_timeout(guard, dur)?.0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should wait for timeout rather than dur.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

}
}

/// Wakes up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
Expand Down Expand Up @@ -546,6 +681,29 @@ mod tests {
}
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_until() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();

// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});

// Wait for the thread to start up.
let &(ref lock, ref cvar) = &*pair;
let guard = cvar.wait_until(lock.lock().unwrap(), |started| {
started
});
assert!(*guard);
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_wait() {
Expand All @@ -565,6 +723,52 @@ mod tests {
}
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_wait() {
let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new());

let g = m.lock().unwrap();
let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(1), || { false }).unwrap();
// no spurious wakeups. ensure it timed-out
assert!(wait.timed_out());
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_instant_satisfy() {
let m = Arc::new(Mutex::new(()));
let c = Arc::new(Condvar::new());

let g = m.lock().unwrap();
let (_g, wait) = c.wait_timeout_until(g, Duration::from_millis(0), || { true }).unwrap();
// ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out());
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_until_wake() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_copy = pair.clone();

let g = m.lock().unwrap();
let t = thread::spawn(move || {
let &(ref lock, ref cvar) = &*pair2;
let mut started = lock.lock().unwrap();
thread::sleep(Duration::from_millis(1));
started = true;
cvar.notify_one();
});
let (g2, wait) = c.wait_timeout_until(g, Duration::from_millis(u64::MAX), |&notified| {
notified
}).unwrap();
// ensure it didn't time-out even if we were not given any time.
assert!(!wait.timed_out());
assert!(*g2);
}

#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
fn wait_timeout_wake() {
Expand Down