Skip to content

Overhaul and make Condvar::wait_timeout public #21132

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 113 additions & 10 deletions src/libstd/sync/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use prelude::v1::*;

use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use sync::poison::{self, LockResult};
use sys::time::SteadyTime;
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use time::Duration;
Expand Down Expand Up @@ -153,20 +154,34 @@ impl Condvar {
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
// Note that this method is *not* public, and this is quite intentional
// because we're not quite sure about the semantics of relative vs absolute
// durations or how the timing guarantees play into what the system APIs
// provide. There are also additional concerns about the unix-specific
// implementation which may need to be addressed.
#[allow(dead_code)]
fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
#[unstable]
pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
-> LockResult<(MutexGuard<'a, T>, bool)> {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout(guard, dur)
}
}

/// Wait on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The semantics of this function are equivalent to `wait_timeout` except
/// that the implementation will repeatedly wait while the duration has not
/// passed and the provided function returns `false`.
#[unstable]
pub fn wait_timeout_with<'a, T, F>(&self,
guard: MutexGuard<'a, T>,
dur: Duration,
f: F)
-> LockResult<(MutexGuard<'a, T>, bool)>
where F: FnMut(LockResult<&mut T>) -> bool {
unsafe {
let me: &'static Condvar = &*(self as *const _);
me.inner.wait_timeout_with(guard, dur, f)
}
}

/// Wake up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
Expand Down Expand Up @@ -220,9 +235,9 @@ impl StaticCondvar {
/// specified duration.
///
/// See `Condvar::wait_timeout`.
#[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
-> LockResult<(MutexGuard<'a, T>, bool)> {
#[unstable = "may be merged with Condvar in the future"]
pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
-> LockResult<(MutexGuard<'a, T>, bool)> {
let (poisoned, success) = unsafe {
let lock = mutex::guard_lock(&guard);
self.verify(lock);
Expand All @@ -236,6 +251,50 @@ impl StaticCondvar {
}
}

/// Wait on this condition variable for a notification, timing out after a
/// specified duration.
///
/// The implementation will repeatedly wait while the duration has not
/// passed and the function returns `false`.
///
/// See `Condvar::wait_timeout_with`.
#[unstable = "may be merged with Condvar in the future"]
pub fn wait_timeout_with<'a, T, F>(&'static self,
guard: MutexGuard<'a, T>,
dur: Duration,
mut f: F)
-> LockResult<(MutexGuard<'a, T>, bool)>
where F: FnMut(LockResult<&mut T>) -> bool {
// This could be made more efficient by pushing the implementation into sys::condvar
let start = SteadyTime::now();
let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
while !f(guard_result
.as_mut()
.map(|g| &mut **g)
.map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) {
let now = SteadyTime::now();
let consumed = &now - &start;
let guard = guard_result.unwrap_or_else(|e| e.into_inner());
let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) {
Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
Err(err) => {
let (new_guard, no_timeout) = err.into_inner();
(Err(poison::new_poison_error(new_guard)), no_timeout)
}
};
guard_result = new_guard_result;
if !no_timeout {
let result = f(guard_result
.as_mut()
.map(|g| &mut **g)
.map_err(|e| poison::new_poison_error(&mut **e.get_mut())));
return poison::map_result(guard_result, |g| (g, result));
}
}

poison::map_result(guard_result, |g| (g, true))
}

/// Wake up one blocked thread on this condvar.
///
/// See `Condvar::notify_one`.
Expand Down Expand Up @@ -285,6 +344,7 @@ mod tests {
use super::{StaticCondvar, CONDVAR_INIT};
use sync::mpsc::channel;
use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use thread::Thread;
use time::Duration;

Expand Down Expand Up @@ -372,6 +432,49 @@ mod tests {
unsafe { C.destroy(); M.destroy(); }
}

#[test]
fn wait_timeout_with() {
static C: StaticCondvar = CONDVAR_INIT;
static M: StaticMutex = MUTEX_INIT;
static S: AtomicUsize = ATOMIC_USIZE_INIT;

let g = M.lock().unwrap();
let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
assert!(!success);

let (tx, rx) = channel();
let _t = Thread::scoped(move || {
rx.recv().unwrap();
let g = M.lock().unwrap();
S.store(1, Ordering::SeqCst);
C.notify_one();
drop(g);

rx.recv().unwrap();
let g = M.lock().unwrap();
S.store(2, Ordering::SeqCst);
C.notify_one();
drop(g);

rx.recv().unwrap();
let _g = M.lock().unwrap();
S.store(3, Ordering::SeqCst);
C.notify_one();
});

let mut state = 0;
let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
assert_eq!(state, S.load(Ordering::SeqCst));
tx.send(()).unwrap();
state += 1;
match state {
1|2 => false,
_ => true,
}
}).unwrap();
assert!(success);
}

#[test]
#[should_fail]
fn two_mutexes() {
Expand Down
17 changes: 16 additions & 1 deletion src/libstd/sync/poison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,23 @@ impl<T> fmt::Show for PoisonError<T> {
impl<T> PoisonError<T> {
/// Consumes this error indicating that a lock is poisoned, returning the
/// underlying guard to allow access regardless.
#[stable]
#[deprecated="renamed to into_inner"]
pub fn into_guard(self) -> T { self.guard }

/// Consumes this error indicating that a lock is poisoned, returning the
/// underlying guard to allow access regardless.
#[unstable]
pub fn into_inner(self) -> T { self.guard }

/// Reaches into this error indicating that a lock is poisoned, returning a
/// reference to the underlying guard to allow access regardless.
#[unstable]
pub fn get_ref(&self) -> &T { &self.guard }

/// Reaches into this error indicating that a lock is poisoned, returning a
/// mutable reference to the underlying guard to allow access regardless.
#[unstable]
pub fn get_mut(&mut self) -> &mut T { &mut self.guard }
}

impl<T> FromError<PoisonError<T>> for TryLockError<T> {
Expand Down
56 changes: 36 additions & 20 deletions src/libstd/sys/unix/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@

use cell::UnsafeCell;
use libc;
use std::option::Option::{Some, None};
use sys::mutex::{self, Mutex};
use sys::time;
use sys::sync as ffi;
use time::Duration;
use num::{Int, NumCast};

pub struct Condvar { inner: UnsafeCell<ffi::pthread_cond_t> }

Expand Down Expand Up @@ -46,33 +49,46 @@ impl Condvar {
debug_assert_eq!(r, 0);
}

// This implementation is modeled after libcxx's condition_variable
// https://github.com/llvm-mirror/libcxx/blob/release_35/src/condition_variable.cpp#L46
// https://github.com/llvm-mirror/libcxx/blob/release_35/include/__mutex_base#L367
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
assert!(dur >= Duration::nanoseconds(0));
if dur <= Duration::zero() {
return false;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how semantically significant this is, but this means that the lock is never unlocked. It seems pretty minor though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably fine if it's documented clearly, but I'm also not sure what the benefit it.


// First, figure out what time it currently is
let mut tv = libc::timeval { tv_sec: 0, tv_usec: 0 };
let r = ffi::gettimeofday(&mut tv, 0 as *mut _);
// First, figure out what time it currently is, in both system and stable time.
// pthread_cond_timedwait uses system time, but we want to report timeout based on stable
// time.
let mut sys_now = libc::timeval { tv_sec: 0, tv_usec: 0 };
let stable_now = time::SteadyTime::now();
let r = ffi::gettimeofday(&mut sys_now, 0 as *mut _);
debug_assert_eq!(r, 0);

// Offset that time with the specified duration
let abs = Duration::seconds(tv.tv_sec as i64) +
Duration::microseconds(tv.tv_usec as i64) +
dur;
let ns = abs.num_nanoseconds().unwrap() as u64;
let timeout = libc::timespec {
tv_sec: (ns / 1000000000) as libc::time_t,
tv_nsec: (ns % 1000000000) as libc::c_long,
let seconds = NumCast::from(dur.num_seconds());
let timeout = match seconds.and_then(|s| sys_now.tv_sec.checked_add(s)) {
Some(sec) => {
libc::timespec {
tv_sec: sec,
tv_nsec: (dur - Duration::seconds(dur.num_seconds()))
.num_nanoseconds().unwrap() as libc::c_long,
}
}
None => {
libc::timespec {
tv_sec: Int::max_value(),
tv_nsec: 1_000_000_000 - 1,
}
}
};

// And wait!
let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex),
&timeout);
if r != 0 {
debug_assert_eq!(r as int, libc::ETIMEDOUT as int);
false
} else {
true
}
let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex), &timeout);
debug_assert!(r == libc::ETIMEDOUT || r == 0);

// ETIMEDOUT is not a totally reliable method of determining timeout due to clock shifts,
// so do the check ourselves
&time::SteadyTime::now() - &stable_now < dur
}

#[inline]
Expand Down
1 change: 1 addition & 0 deletions src/libstd/sys/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub mod sync;
pub mod tcp;
pub mod thread;
pub mod thread_local;
pub mod time;
pub mod timer;
pub mod tty;
pub mod udp;
Expand Down
Loading