Skip to content

Commit 0aa6381

Browse files
committed
Windows: Implement mutex using futex
Well, the Windows equivalent: `WaitOnAddress`, `WakeByAddressSingle` and `WakeByAddressAll`.
1 parent 89b7830 commit 0aa6381

File tree

13 files changed

+169
-39
lines changed

13 files changed

+169
-39
lines changed

library/std/src/sys/locks/condvar/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
cfg_if::cfg_if! {
22
if #[cfg(any(
3+
all(target_os = "windows", not(target_vendor="win7")),
34
target_os = "linux",
45
target_os = "android",
56
target_os = "freebsd",
@@ -14,9 +15,9 @@ cfg_if::cfg_if! {
1415
} else if #[cfg(target_family = "unix")] {
1516
mod pthread;
1617
pub use pthread::Condvar;
17-
} else if #[cfg(target_os = "windows")] {
18-
mod windows;
19-
pub use windows::Condvar;
18+
} else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] {
19+
mod windows7;
20+
pub use windows7::Condvar;
2021
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
2122
mod sgx;
2223
pub use sgx::Condvar;

library/std/src/sys/locks/mutex/futex.rs

Lines changed: 66 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,89 @@
11
use crate::sync::atomic::{
2-
AtomicU32,
2+
self,
33
Ordering::{Acquire, Relaxed, Release},
44
};
5-
use crate::sys::futex::{futex_wait, futex_wake};
5+
use crate::sys::futex::futex_wake;
6+
7+
cfg_if::cfg_if! {
8+
if #[cfg(windows)] {
9+
// WaitOnAddress accepts 1-byte values, so on Windows the futex can be a single byte.
10+
type Atomic = atomic::AtomicU8;
11+
type State = u8;
12+
} else {
13+
type Atomic = atomic::AtomicU32;
14+
type State = u32;
15+
}
16+
}
617

718
pub struct Mutex {
8-
/// 0: unlocked
9-
/// 1: locked, no other threads waiting
10-
/// 2: locked, and other threads waiting (contended)
11-
futex: AtomicU32,
19+
futex: Atomic,
1220
}
1321

22+
const UNLOCKED: State = 0;
23+
const LOCKED: State = 1; // locked, no other threads waiting
24+
const CONTENDED: State = 2; // locked, and other threads waiting (contended)
25+
1426
impl Mutex {
1527
#[inline]
16-
pub const fn new() -> Self {
17-
Self { futex: AtomicU32::new(0) }
28+
pub const fn new() -> Mutex {
29+
Mutex { futex: Atomic::new(UNLOCKED) }
30+
}
31+
32+
#[inline]
33+
pub fn lock(&self) {
34+
if let Err(state) = self.futex.compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed) {
35+
self.lock_contended(state)
36+
}
1837
}
1938

2039
#[inline]
2140
pub fn try_lock(&self) -> bool {
22-
self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_ok()
41+
self.futex.compare_exchange(UNLOCKED, LOCKED, Acquire, Relaxed).is_ok()
2342
}
2443

2544
#[inline]
26-
pub fn lock(&self) {
27-
if self.futex.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
28-
self.lock_contended();
45+
pub unsafe fn unlock(&self) {
46+
if self.futex.swap(UNLOCKED, Release) == CONTENDED {
47+
// We only wake up one thread. When that thread locks the mutex, it
48+
// will mark the mutex as CONTENDED (see lock_contended below),
49+
// which makes sure that any other waiting threads will also be
50+
// woken up eventually.
51+
self.wake();
2952
}
3053
}
3154

3255
#[cold]
33-
fn lock_contended(&self) {
56+
fn wake(&self) {
57+
futex_wake(&self.futex);
58+
}
59+
}
60+
61+
#[cfg(windows)]
62+
impl Mutex {
63+
#[cold]
64+
fn lock_contended(&self, mut state: State) {
65+
use crate::sys::futex::wait_on_address;
66+
// Note: WaitOnAddress is already quite spin-happy so we don't do any further spinning on top.
67+
loop {
68+
// Put the lock in contended state.
69+
// We avoid an unnecessary write if it is already set to CONTENDED,
70+
// to be friendlier for the caches.
71+
if state != CONTENDED && self.futex.swap(CONTENDED, Acquire) == UNLOCKED {
72+
// We changed it from UNLOCKED to CONTENDED, so we just successfully locked it.
73+
return;
74+
}
75+
// Wait for the futex to change state, assuming it is still CONTENDED.
76+
wait_on_address(&self.futex, CONTENDED, None);
77+
state = self.futex.load(Relaxed);
78+
}
79+
}
80+
}
81+
82+
#[cfg(not(windows))]
83+
impl Mutex {
84+
#[cold]
85+
fn lock_contended(&self, _state: State) {
86+
use crate::sys::futex::futex_wait;
3487
// Spin first to speed things up if the lock is released quickly.
3588
let mut state = self.spin();
3689

@@ -77,20 +130,4 @@ impl Mutex {
77130
spin -= 1;
78131
}
79132
}
80-
81-
#[inline]
82-
pub unsafe fn unlock(&self) {
83-
if self.futex.swap(0, Release) == 2 {
84-
// We only wake up one thread. When that thread locks the mutex, it
85-
// will mark the mutex as contended (2) (see lock_contended above),
86-
// which makes sure that any other waiting threads will also be
87-
// woken up eventually.
88-
self.wake();
89-
}
90-
}
91-
92-
#[cold]
93-
fn wake(&self) {
94-
futex_wake(&self.futex);
95-
}
96133
}

library/std/src/sys/locks/mutex/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
cfg_if::cfg_if! {
22
if #[cfg(any(
3+
all(target_os = "windows", not(target_vendor = "win7")),
34
target_os = "linux",
45
target_os = "android",
56
target_os = "freebsd",
@@ -19,9 +20,9 @@ cfg_if::cfg_if! {
1920
))] {
2021
mod pthread;
2122
pub use pthread::{Mutex, raw};
22-
} else if #[cfg(target_os = "windows")] {
23-
mod windows;
24-
pub use windows::{Mutex, raw};
23+
} else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] {
24+
mod windows7;
25+
pub use windows7::{Mutex, raw};
2526
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
2627
mod sgx;
2728
pub use sgx::Mutex;

library/std/src/sys/locks/rwlock/futex.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,10 @@ impl RwLock {
291291
}
292292

293293
/// Spin for a while, but stop directly at the given condition.
294+
///
295+
/// We avoid spinning on Windows because the futex implementation spins enough.
294296
#[inline]
297+
#[cfg(not(windows))]
295298
fn spin_until(&self, f: impl Fn(u32) -> bool) -> u32 {
296299
let mut spin = 100; // Chosen by fair dice roll.
297300
loop {
@@ -304,6 +307,12 @@ impl RwLock {
304307
}
305308
}
306309

310+
#[inline]
311+
#[cfg(windows)]
312+
fn spin_until(&self, _f: impl Fn(u32) -> bool) -> u32 {
313+
self.state.load(Relaxed)
314+
}
315+
307316
#[inline]
308317
fn spin_write(&self) -> u32 {
309318
// Stop spinning when it's unlocked or when there are waiting writers, to keep things somewhat fair.

library/std/src/sys/locks/rwlock/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
cfg_if::cfg_if! {
22
if #[cfg(any(
3+
all(target_os = "windows", not(target_vendor = "win7")),
34
target_os = "linux",
45
target_os = "android",
56
target_os = "freebsd",
@@ -14,9 +15,9 @@ cfg_if::cfg_if! {
1415
} else if #[cfg(target_family = "unix")] {
1516
mod queue;
1617
pub use queue::RwLock;
17-
} else if #[cfg(target_os = "windows")] {
18-
mod windows;
19-
pub use windows::RwLock;
18+
} else if #[cfg(all(target_os = "windows", target_vendor = "win7"))] {
19+
mod windows7;
20+
pub use windows7::RwLock;
2021
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
2122
mod sgx;
2223
pub use sgx::RwLock;

library/std/src/sys/pal/windows/c.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub type LPVOID = *mut c_void;
3636
pub type LPWCH = *mut WCHAR;
3737
pub type LPWSTR = *mut WCHAR;
3838

39+
#[cfg(target_vendor = "win7")]
3940
pub type PSRWLOCK = *mut SRWLOCK;
4041

4142
pub type socklen_t = c_int;
@@ -50,7 +51,9 @@ pub const INVALID_HANDLE_VALUE: HANDLE = ::core::ptr::without_provenance_mut(-1i
5051
pub const EXIT_SUCCESS: u32 = 0;
5152
pub const EXIT_FAILURE: u32 = 1;
5253

54+
#[cfg(target_vendor = "win7")]
5355
pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = CONDITION_VARIABLE { Ptr: ptr::null_mut() };
56+
#[cfg(target_vendor = "win7")]
5457
pub const SRWLOCK_INIT: SRWLOCK = SRWLOCK { Ptr: ptr::null_mut() };
5558
pub const INIT_ONCE_STATIC_INIT: INIT_ONCE = INIT_ONCE { Ptr: ptr::null_mut() };
5659

@@ -373,6 +376,7 @@ extern "system" {
373376
dwmilliseconds: u32,
374377
) -> BOOL;
375378
pub fn WakeByAddressSingle(address: *const c_void);
379+
pub fn WakeByAddressAll(address: *const c_void);
376380
}
377381

378382
#[cfg(target_vendor = "win7")]
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use crate::sys::api;
2+
use crate::sys::c;
3+
use crate::sys::dur2timeout;
4+
use core::ffi::c_void;
5+
use core::mem;
6+
use core::ptr;
7+
use core::time::Duration;
8+
9+
#[inline(always)]
10+
pub fn wait_on_address<T, U>(address: &T, compare: U, timeout: Option<Duration>) -> bool {
11+
assert_eq!(mem::size_of::<T>(), mem::size_of::<U>());
12+
unsafe {
13+
let addr = ptr::addr_of!(*address).cast::<c_void>();
14+
let size = mem::size_of::<T>();
15+
let compare_addr = ptr::addr_of!(compare).cast::<c_void>();
16+
let timeout = timeout.map(dur2timeout).unwrap_or(c::INFINITE);
17+
c::WaitOnAddress(addr, compare_addr, size, timeout) == c::TRUE
18+
}
19+
}
20+
21+
#[inline(always)]
22+
pub fn wake_by_address_single<T>(address: &T) -> bool {
23+
unsafe {
24+
let addr = ptr::addr_of!(*address).cast::<c_void>();
25+
c::WakeByAddressSingle(addr);
26+
false
27+
}
28+
}
29+
30+
#[inline(always)]
31+
pub fn wake_by_address_all<T>(address: &T) {
32+
unsafe {
33+
let addr = ptr::addr_of!(*address).cast::<c_void>();
34+
c::WakeByAddressAll(addr);
35+
}
36+
}
37+
38+
#[inline(always)]
39+
pub fn futex_wait<T, U>(futex: &T, expected: U, timeout: Option<Duration>) -> bool {
40+
// return false only on timeout
41+
if wait_on_address(futex, expected, timeout) {
42+
true
43+
} else {
44+
api::get_last_error().code != c::ERROR_TIMEOUT
45+
}
46+
}
47+
#[inline(always)]
48+
pub fn futex_wake<T>(futex: &T) -> bool {
49+
wake_by_address_single(futex);
50+
false
51+
}
52+
pub fn futex_wake_all<T>(futex: &T) {
53+
wake_by_address_all(futex)
54+
}

library/std/src/sys/pal/windows/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub mod args;
1717
pub mod c;
1818
pub mod env;
1919
pub mod fs;
20+
#[cfg(not(target_vendor = "win7"))]
21+
pub mod futex;
2022
pub mod handle;
2123
pub mod io;
2224
pub mod net;
@@ -39,7 +41,7 @@ cfg_if::cfg_if! {
3941
}
4042
}
4143

42-
mod api;
44+
pub(in crate::sys) mod api;
4345

4446
/// Map a Result<T, WinError> to io::Result<T>.
4547
trait IoResult<T> {

src/tools/miri/src/shims/windows/foreign_items.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,12 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
366366

367367
this.WakeByAddressSingle(ptr_op)?;
368368
}
369+
"WakeByAddressAll" => {
370+
let [ptr_op] =
371+
this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?;
372+
373+
this.WakeByAddressAll(ptr_op)?;
374+
}
369375

370376
// Dynamic symbol loading
371377
"GetProcAddress" => {

src/tools/miri/src/shims/windows/sync.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
384384

385385
Ok(())
386386
}
387+
fn WakeByAddressAll(&mut self, ptr_op: &OpTy<'tcx, Provenance>) -> InterpResult<'tcx> {
388+
let this = self.eval_context_mut();
389+
390+
let ptr = this.read_pointer(ptr_op)?;
391+
392+
// See the Linux futex implementation for why this fence exists.
393+
this.atomic_fence(AtomicFenceOrd::SeqCst)?;
394+
395+
while let Some(thread) = this.futex_wake(ptr.addr().bytes(), u32::MAX) {
396+
this.unblock_thread(thread);
397+
this.unregister_timeout_callback_if_exists(thread);
398+
}
399+
400+
Ok(())
401+
}
387402

388403
fn SleepConditionVariableSRW(
389404
&mut self,

0 commit comments

Comments
 (0)