Commit 6a938a6

std: implement RwLock::downgrade for the queue-based RwLock

1 parent 454329e commit 6a938a6
1 file changed: +159 -103 lines

library/std/src/sys/sync/rwlock/queue.rs (+159 -103)
@@ -39,16 +39,16 @@
 //!
 //! ## State
 //!
-//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used
+//! A single [`AtomicPtr`] is used as state variable. The lowest four bits are used
 //! to indicate the meaning of the remaining bits:
 //!
-//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | |
-//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
-//! | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
-//! | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
-//! | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
-//! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
-//! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
+//! | [`DOWNGRADE`] | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | |
+//! |:--------------|------------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
+//! | 0 | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
+//! | 0 | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
+//! | 0 | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
+//! | 0 | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
+//! | * | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
 //!
 //! ## Waiter queue
 //!
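To make the new encoding concrete, here is a minimal sketch of the table above using a plain `usize` in place of the real `AtomicPtr<Node>` state word; the `describe` helper is hypothetical and exists only to mirror the table rows, it is not part of this commit:

#![allow(dead_code)]

const LOCKED: usize = 1;
const QUEUED: usize = 2;
const QUEUE_LOCKED: usize = 4;
const DOWNGRADE: usize = 8;
const SINGLE: usize = 16; // one reader; the count occupies the upper bits

fn describe(state: usize) -> &'static str {
    if state & QUEUED == 0 {
        if state & LOCKED == 0 {
            "unlocked, no threads waiting"
        } else if state & !LOCKED >= SINGLE {
            "read-locked, reader count in the upper bits"
        } else {
            "write-locked, no threads waiting"
        }
    } else {
        // With QUEUED set, the upper bits are a `*mut Node` into the queue.
        if state & LOCKED == 0 { "unlocked, waiters queued" } else { "locked, waiters queued" }
    }
}

fn main() {
    assert_eq!(describe(0), "unlocked, no threads waiting");
    assert_eq!(describe(LOCKED), "write-locked, no threads waiting");
    assert_eq!(describe(3 * SINGLE | LOCKED), "read-locked, reader count in the upper bits");
}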
@@ -94,9 +94,10 @@
 //! queue, which drastically improves performance, and after unlocking the lock
 //! to wake the next waiter(s). This is done atomically at the same time as the
 //! enqueuing/unlocking operation. The thread releasing the `QUEUE_LOCK` bit
-//! will check the state of the lock and wake up waiters as appropriate. This
-//! guarantees forward-progress even if the unlocking thread could not acquire
-//! the queue lock.
+//! will check the state of the lock (in particular, whether a downgrade was
+//! requested using the [`DOWNGRADE`] bit) and wake up waiters as appropriate.
+//! This guarantees forward-progress even if the unlocking or downgrading thread
+//! could not acquire the queue lock.
 //!
 //! ## Memory orderings
 //!
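The forward-progress rule described here rests on a weak-CAS retry loop: either you take the queue-lock bit yourself, or you leave the wakeup to whoever holds it. A standalone sketch of that pattern, with a hypothetical `try_lock_queue` helper and a plain `AtomicUsize` standing in for the pointer-typed state:

use std::sync::atomic::{
    AtomicUsize,
    Ordering::{AcqRel, Relaxed},
};

const QUEUE_LOCKED: usize = 4;

// Try to take the queue-lock bit with a weak-CAS retry loop, the pattern
// used throughout queue.rs. Returning `None` hands the wakeup duty to the
// current holder, which preserves forward progress.
fn try_lock_queue(state: &AtomicUsize) -> Option<usize> {
    let mut current = state.load(Relaxed);
    loop {
        if current & QUEUE_LOCKED != 0 {
            return None; // someone else will wake the waiters
        }
        match state.compare_exchange_weak(current, current | QUEUE_LOCKED, AcqRel, Relaxed) {
            Ok(_) => return Some(current | QUEUE_LOCKED),
            Err(new) => current = new,
        }
    }
}

fn main() {
    let state = AtomicUsize::new(0);
    assert_eq!(try_lock_queue(&state), Some(QUEUE_LOCKED));
    assert_eq!(try_lock_queue(&state), None);
}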
@@ -129,14 +130,15 @@ const UNLOCKED: State = without_provenance_mut(0);
 const LOCKED: usize = 1;
 const QUEUED: usize = 2;
 const QUEUE_LOCKED: usize = 4;
-const SINGLE: usize = 8;
-const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
+const DOWNGRADE: usize = 8;
+const SINGLE: usize = 16;
+const STATE: usize = DOWNGRADE | QUEUE_LOCKED | QUEUED | LOCKED;
+const MASK: usize = !STATE;
 
 /// Marks the state as write-locked, if possible.
 #[inline]
 fn write_lock(state: State) -> Option<State> {
-    let state = state.wrapping_byte_add(LOCKED);
-    if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
+    if state.addr() & LOCKED == 0 { Some(state.wrapping_byte_add(LOCKED)) } else { None }
 }
 
 /// Marks the state as read-locked, if possible.
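The rewritten check, modeled on a plain `usize` for illustration (the real `write_lock` operates on a pointer-typed `State` via `wrapping_byte_add` to preserve provenance):

const LOCKED: usize = 1;

// Mirrors the new `write_lock`: acquire only if the LOCKED bit is clear.
// The old version added LOCKED first and relied on the carry out of the
// low bit to detect an already-locked state; the new form makes the
// precondition explicit.
fn write_lock(state: usize) -> Option<usize> {
    if state & LOCKED == 0 { Some(state + LOCKED) } else { None }
}

fn main() {
    assert_eq!(write_lock(0), Some(LOCKED)); // unlocked -> write-locked
    assert_eq!(write_lock(LOCKED), None); // already locked -> fail
}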
@@ -175,7 +177,7 @@ impl AtomicLink {
     }
 }
 
-#[repr(align(8))]
+#[repr(align(16))]
 struct Node {
     next: AtomicLink,
     prev: AtomicLink,
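The alignment bump follows directly from the state encoding: with a fourth tag bit ([`DOWNGRADE`]) packed into the low bits of the state pointer, node addresses must keep their low four bits clear. A standalone sketch with a dummy `Node` (fields elided, not the real struct):

use std::mem::align_of;

// With four tag bits stored in the low bits of a `*mut Node`, the node
// type must be at least 16-byte aligned so that real node addresses never
// collide with the tag bits.
#[repr(align(16))]
struct Node {
    _placeholder: u8,
}

const _: () = assert!(align_of::<Node>() >= 16);

fn main() {
    let node = Node { _placeholder: 0 };
    // The low four bits of any node address are zero.
    assert_eq!(&node as *const Node as usize & 0b1111, 0);
}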
@@ -272,6 +274,25 @@ unsafe fn add_backlinks_and_find_tail(head: NonNull<Node>) -> NonNull<Node> {
     }
 }
 
+/// [`complete`](Node::complete)s all threads in the queue ending with `tail`.
+///
+/// # Safety
+/// * `tail` must be the tail of a fully linked queue.
+/// * The current thread must have exclusive access to that queue.
+unsafe fn complete_all(tail: NonNull<Node>) {
+    let mut current = tail;
+    loop {
+        let prev = unsafe { current.as_ref().prev.get() };
+        unsafe {
+            Node::complete(current);
+        }
+        match prev {
+            Some(prev) => current = prev,
+            None => return,
+        }
+    }
+}
+
 pub struct RwLock {
     state: AtomicState,
 }
@@ -404,6 +425,13 @@ impl RwLock {
             if state.addr() & QUEUED == 0 {
                 let count = state.addr() - (SINGLE | LOCKED);
                 Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED })
+            } else if state.addr() & DOWNGRADE != 0 {
+                // This thread used to have exclusive access, but requested a
+                // downgrade. This has not been completed yet, so we still have
+                // exclusive access. Retract the downgrade request and unlock,
+                // but leave waking up new threads to the thread that already
+                // holds the queue lock.
+                Some(state.wrapping_byte_sub(DOWNGRADE | LOCKED))
             } else {
                 None
             }
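The count arithmetic in the `QUEUED == 0` branch, worked through on a `usize` model (hypothetical helper; the real code keeps the value inside an `AtomicPtr` and updates it via `fetch_update`):

const LOCKED: usize = 1;
const SINGLE: usize = 16;
const UNLOCKED: usize = 0;

// Mirrors the uncontended read-unlock path: subtract one reader and keep
// LOCKED set while readers remain, otherwise reset to UNLOCKED.
fn read_unlock_uncontended(state: usize) -> usize {
    let count = state - (SINGLE | LOCKED);
    if count > 0 { count | LOCKED } else { UNLOCKED }
}

fn main() {
    let two_readers = 2 * SINGLE | LOCKED;
    let one_reader = read_unlock_uncontended(two_readers);
    assert_eq!(one_reader, SINGLE | LOCKED);
    assert_eq!(read_unlock_uncontended(one_reader), UNLOCKED);
}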
@@ -422,9 +450,9 @@ impl RwLock {
 
         // SAFETY:
         // Because new read-locks cannot be acquired while threads are queued,
-        // all queue-lock owners will observe the set `LOCKED` bit. Because they
-        // do not modify the queue while there is a lock owner, the queue will
-        // not be removed from here.
+        // all queue-lock owners will observe the set `LOCKED` bit. Because no
+        // downgrade can be in progress (we checked above), they hence do not
+        // modify the queue, so the queue will not be removed from here.
         let tail = unsafe { add_backlinks_and_find_tail(to_node(state)).as_ref() };
         // The lock count is stored in the `next` field of `tail`.
         // Decrement it, making sure to observe all changes made to the queue
@@ -451,74 +479,88 @@ impl RwLock {
         }
     }
 
+    /// # Safety
+    /// * The lock must be exclusively owned by this thread.
+    /// * There must be threads queued on the lock.
+    #[cold]
+    unsafe fn unlock_contended(&self, mut state: State) {
+        debug_assert!(state.addr() & STATE == (QUEUED | LOCKED));
+        loop {
+            // Atomically release the lock and try to acquire the queue lock.
+            let next = state.wrapping_byte_sub(LOCKED);
+            if state.addr() & QUEUE_LOCKED != 0 {
+                // Another thread already holds the queue lock, leave waking up
+                // waiters to it.
+                match self.state.compare_exchange_weak(state, next, Release, Relaxed) {
+                    Ok(_) => return,
+                    Err(new) => state = new,
+                }
+            } else {
+                // Acquire the queue lock and wake up the next waiter.
+                let next = next.wrapping_byte_add(QUEUE_LOCKED);
+                match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
+                    Ok(_) => unsafe {
+                        return self.unlock_queue(next);
+                    },
+                    Err(new) => state = new,
+                }
+            }
+        }
+    }
+
     #[inline]
     pub unsafe fn downgrade(&self) {
-        // Atomically set to read-locked with a single reader, without any waiting threads.
-        if let Err(mut state) = self.state.compare_exchange(
+        // Atomically set to read-locked with a single reader, without any
+        // waiting threads.
+        if let Err(state) = self.state.compare_exchange(
             without_provenance_mut(LOCKED),
-            without_provenance_mut(LOCKED | SINGLE),
+            without_provenance_mut(SINGLE | LOCKED),
             Release,
             Relaxed,
         ) {
-            // Attempt to grab the queue lock.
-            loop {
-                let next = state.map_addr(|addr| addr | QUEUE_LOCKED);
-                match self.state.compare_exchange(state, next, AcqRel, Relaxed) {
-                    Err(new_state) => state = new_state,
-                    Ok(new_state) => {
-                        assert_eq!(
-                            new_state.mask(!MASK).addr(),
-                            LOCKED | QUEUED | QUEUE_LOCKED,
-                            "{:p}",
-                            new_state
-                        );
-                        state = new_state;
-                        break;
-                    }
-                }
-            }
-
-            assert_eq!(state.mask(!MASK).addr(), LOCKED | QUEUED | QUEUE_LOCKED);
-
-            // SAFETY: We have the queue lock so all safety contracts are fulfilled.
-            let tail = unsafe { add_backlinks_and_find_tail(to_node(state)).as_ref() };
-
-            // Increment the reader count from 0 to 1.
-            assert_eq!(
-                tail.next.0.fetch_byte_add(SINGLE, AcqRel).addr(),
-                0,
-                "Reader count was not zero while we had the write lock"
-            );
-
-            // Release the queue lock.
-            self.state.fetch_byte_sub(QUEUE_LOCKED, Release);
+            // The only way the state can have changed is if there are threads
+            // queued. Wake all of them up.
+            unsafe { self.downgrade_slow(state) }
         }
     }
 
-    /// # Safety
-    /// * The lock must be exclusively owned by this thread.
-    /// * There must be threads queued on the lock.
     #[cold]
-    unsafe fn unlock_contended(&self, mut state: State) {
+    unsafe fn downgrade_slow(&self, mut state: State) {
+        debug_assert!(state.addr() & (DOWNGRADE | QUEUED | LOCKED) == (QUEUED | LOCKED));
+        // Attempt to grab the entire waiter queue.
         loop {
-            // Atomically release the lock and try to acquire the queue lock.
-            let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED);
-            match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
-                // The queue lock was acquired. Release it, waking up the next
-                // waiter in the process.
-                Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe {
-                    return self.unlock_queue(next);
-                },
-                // Another thread already holds the queue lock, leave waking up
-                // waiters to it.
-                Ok(_) => return,
-                Err(new) => state = new,
+            if state.addr() & QUEUE_LOCKED != 0 {
+                // Another thread already holds the queue lock. Tell it to wake
+                // up all waiters. If it completes before we release our lock,
+                // the effect will be just the same as if we had changed the
+                // state below. Otherwise, the `DOWNGRADE` bit will still be
+                // set, meaning `read_unlock` will realize that the lock is still
+                // exclusively locked and act accordingly.
+                let next = state.wrapping_byte_add(DOWNGRADE);
+                match self.state.compare_exchange_weak(state, next, Release, Relaxed) {
+                    Ok(_) => return,
+                    Err(new) => state = new,
+                }
+            } else {
+                // Grab the whole queue.
+                let next = ptr::without_provenance_mut(SINGLE | LOCKED);
+                if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
+                    state = new;
+                    continue;
+                }
+
+                // Wake up all waiters in FIFO order.
+                let tail = unsafe { add_backlinks_and_find_tail(to_node(state)) };
+                // SAFETY: `tail` was just computed, meaning the whole queue is
+                // linked. Since the queue was not locked and we are not in read
+                // mode, we have complete control over this queue.
+                return unsafe { complete_all(tail) };
             }
         }
     }
 
-    /// Unlocks the queue. If the lock is unlocked, wakes up the next eligible
-    /// thread(s).
+    /// Unlocks the queue. Wakes up all threads if a downgrade was requested,
+    /// otherwise wakes up the next eligible thread(s) if the lock is unlocked.
     ///
     /// # Safety
    /// The queue lock must be held by the current thread.
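The two outcomes of a contended downgrade, condensed into a `usize` model (a hypothetical `downgrade_transition` helper; the queue-grabbing wakeup via `complete_all` is elided):

const LOCKED: usize = 1;
const QUEUED: usize = 2;
const QUEUE_LOCKED: usize = 4;
const DOWNGRADE: usize = 8;
const SINGLE: usize = 16;

// Models the state transition chosen by `downgrade`/`downgrade_slow`:
// - fast path: a bare write lock becomes a single-reader lock;
// - queue lock held elsewhere: set DOWNGRADE and delegate the wakeup;
// - otherwise: take the whole queue and become a single reader (the
//   FIFO wakeup itself is elided here).
fn downgrade_transition(state: usize) -> usize {
    if state == LOCKED {
        SINGLE | LOCKED
    } else if state & QUEUE_LOCKED != 0 {
        state + DOWNGRADE
    } else {
        SINGLE | LOCKED
    }
}

fn main() {
    assert_eq!(downgrade_transition(LOCKED), SINGLE | LOCKED);
    let queue_ptr = 0x100; // stand-in for a `*mut Node` in the upper bits
    let contended = queue_ptr | QUEUE_LOCKED | QUEUED | LOCKED;
    assert_eq!(downgrade_transition(contended), contended | DOWNGRADE);
}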
@@ -528,12 +570,13 @@ impl RwLock {
         loop {
             let tail = unsafe { add_backlinks_and_find_tail(to_node(state)) };
 
-            if state.addr() & LOCKED == LOCKED {
-                // Another thread has locked the lock. Leave waking up waiters
-                // to them by releasing the queue lock.
+            if state.addr() & (DOWNGRADE | LOCKED) == LOCKED {
+                // Another thread has locked the lock and no downgrade was
+                // requested. Leave waking up waiters to them by releasing
+                // the queue lock.
                 match self.state.compare_exchange_weak(
                     state,
-                    state.mask(!QUEUE_LOCKED),
+                    state.wrapping_byte_sub(QUEUE_LOCKED),
                     Release,
                     Acquire,
                 ) {
@@ -545,11 +588,20 @@ impl RwLock {
                 }
             }
 
+            // Since we hold the queue lock and downgrades cannot be requested
+            // if the lock is already read-locked, we have exclusive control
+            // over the queue here and can make modifications.
+
+            let downgrade = state.addr() & DOWNGRADE != 0;
             let is_writer = unsafe { tail.as_ref().write };
-            if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } {
-                // `tail` is a writer and there is a node before `tail`.
-                // Split off `tail`.
+            if !downgrade
+                && is_writer
+                && let Some(prev) = unsafe { tail.as_ref().prev.get() }
+            {
+                // If we are not downgrading and the next thread is a writer,
+                // only wake up that thread.
 
+                // Split off `tail`.
                 // There are no set `tail` links before the node pointed to by
                 // `state`, so the first non-null tail field will be current
                 // (invariant 2). Invariant 4 is fullfilled since `find_tail`
@@ -558,41 +610,45 @@ impl RwLock {
                     to_node(state).as_ref().tail.set(Some(prev));
                 }
 
-                // Release the queue lock. Doing this by subtraction is more
-                // efficient on modern processors since it is a single instruction
-                // instead of an update loop, which will fail if new threads are
-                // added to the list.
-                self.state.fetch_byte_sub(QUEUE_LOCKED, Release);
+                // Try to release the queue lock. We need to check the state
+                // again since another thread might have acquired the lock
+                // and requested a downgrade.
+                let next = state.wrapping_byte_sub(QUEUE_LOCKED);
+                if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
+                    // Undo the tail modification above, so that we can find the
+                    // tail again above. As mentioned above, we have exclusive
+                    // control over the queue, so no other thread could have
+                    // noticed the change.
+                    unsafe {
+                        to_node(state).as_ref().tail.set(Some(tail));
+                    }
+                    state = new;
+                    continue;
+                }
 
                 // The tail was split off and the lock released. Mark the node as
                 // completed.
                 unsafe {
                     return Node::complete(tail);
                 }
             } else {
-                // The next waiter is a reader or the queue only consists of one
-                // waiter. Just wake all threads.
-
-                // The lock cannot be locked (checked above), so mark it as
-                // unlocked to reset the queue.
-                if let Err(new) =
-                    self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire)
-                {
+                // Either we are downgrading, the next waiter is a reader or the
+                // queue only consists of one waiter. In any case, just wake all
+                // threads.
+
+                // Clear the queue.
+                let next =
+                    if downgrade { ptr::without_provenance_mut(SINGLE | LOCKED) } else { UNLOCKED };
+                if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
                     state = new;
                     continue;
                 }
 
-                let mut current = tail;
-                loop {
-                    let prev = unsafe { current.as_ref().prev.get() };
-                    unsafe {
-                        // There must be threads waiting.
-                        Node::complete(current);
-                    }
-                    match prev {
-                        Some(prev) => current = prev,
-                        None => return,
-                    }
+                // SAFETY: we computed `tail` above, and no new nodes can have
+                // been added since (otherwise the CAS above would have failed).
+                // Thus we have complete control over the whole queue.
+                unsafe {
+                    return complete_all(tail);
                 }
             }
         }
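For context, the user-facing entry point this commit powers is `RwLockWriteGuard::downgrade`, which at the time of writing is gated behind the unstable `rwlock_downgrade` feature. A usage sketch, assuming a nightly toolchain:

#![feature(rwlock_downgrade)]

use std::sync::{RwLock, RwLockWriteGuard};

fn main() {
    let lock = RwLock::new(0);

    let mut guard = lock.write().unwrap();
    *guard += 1;

    // Atomically trade write access for read access; no other writer can
    // slip in between, and queued readers are woken up alongside us.
    let read_guard = RwLockWriteGuard::downgrade(guard);
    assert_eq!(*read_guard, 1);
}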
