@@ -20,7 +20,8 @@ use core::prelude::*;
 use core::borrow;
 use core::comm;
 use core::task;
-use core::unstable::sync::{Exclusive, exclusive};
+use core::unstable::sync::{Exclusive, exclusive, UnsafeAtomicRcBox};
+use core::unstable::atomics;
 use core::util;
 
 /****************************************************************************
@@ -37,6 +38,7 @@ type SignalEnd = comm::ChanOne<()>;
 struct Waitqueue { head: comm::Port<SignalEnd>,
                    tail: comm::Chan<SignalEnd> }
 
+#[doc(hidden)]
 fn new_waitqueue() -> Waitqueue {
     let (block_head, block_tail) = comm::stream();
     Waitqueue { head: block_head, tail: block_tail }
@@ -166,22 +168,27 @@ impl Sem<~[Waitqueue]> {
 // FIXME(#3588) should go inside of access()
 #[doc(hidden)]
 type SemRelease<'self> = SemReleaseGeneric<'self, ()>;
+#[doc(hidden)]
 type SemAndSignalRelease<'self> = SemReleaseGeneric<'self, ~[Waitqueue]>;
+#[doc(hidden)]
 struct SemReleaseGeneric<'self, Q> { sem: &'self Sem<Q> }
 
+#[doc(hidden)]
 #[unsafe_destructor]
 impl<'self, Q: Owned> Drop for SemReleaseGeneric<'self, Q> {
     fn finalize(&self) {
         self.sem.release();
     }
 }
 
+#[doc(hidden)]
 fn SemRelease<'r>(sem: &'r Sem<()>) -> SemRelease<'r> {
     SemReleaseGeneric {
         sem: sem
     }
 }
 
+#[doc(hidden)]
 fn SemAndSignalRelease<'r>(sem: &'r Sem<~[Waitqueue]>)
                         -> SemAndSignalRelease<'r> {
     SemReleaseGeneric {
@@ -465,8 +472,23 @@ impl Mutex {
 
 #[doc(hidden)]
 struct RWlockInner {
+    // You might ask, "Why don't you need to use an atomic for the mode flag?"
+    // This flag affects the behaviour of readers (for plain readers, they
+    // assert on it; for downgraders, they use it to decide which mode to
+    // unlock for). Consider that the flag is only unset when the very last
+    // reader exits; therefore, it can never be unset during a reader/reader
+    // (or reader/downgrader) race.
+    // By the way, if we didn't care about the assert in the read unlock path,
+    // we could instead store the mode flag in write_downgrade's stack frame,
+    // and have the downgrade tokens store a borrowed pointer to it.
     read_mode:  bool,
-    read_count: uint
+    // The only way the count flag is ever accessed is with xadd. Since it is
+    // a read-modify-write operation, multiple xadds on different cores will
+    // always be consistent with respect to each other, so a monotonic/relaxed
+    // consistency ordering suffices (i.e., no extra barriers are needed).
+    // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
+    // acquire/release orderings superfluously. Change these someday.
+    read_count: atomics::AtomicUint,
 }
 
 /**
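The new comment on `read_count` is the heart of this change: a counter that is only ever touched with atomic read-modify-write operations (`xadd`) cannot lose increments, so relaxed ordering would suffice for the count itself. A minimal sketch of that claim, using today's `std::sync::atomic` as a stand-in for the old `core::unstable::atomics` module (the `READ_COUNT` name, thread count, and iteration count are illustrative only):

~~~ {.rust}
// Relaxed read-modify-write operations on a single atomic are still
// totally ordered with respect to each other, so no increment is lost
// and no extra barriers are needed for the count to be exact.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

static READ_COUNT: AtomicUsize = AtomicUsize::new(0);

fn main() {
    let handles: Vec<_> = (0..8)
        .map(|_| thread::spawn(|| {
            for _ in 0..1000 {
                READ_COUNT.fetch_add(1, Ordering::Relaxed);
            }
        }))
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Every one of the 8 * 1000 increments is observed.
    assert_eq!(READ_COUNT.load(Ordering::Relaxed), 8000);
}
~~~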
@@ -479,7 +501,7 @@ struct RWlockInner {
 pub struct RWlock {
     priv order_lock:  Semaphore,
     priv access_lock: Sem<~[Waitqueue]>,
-    priv state:       Exclusive<RWlockInner>
+    priv state:       UnsafeAtomicRcBox<RWlockInner>,
 }
 
 /// Create a new rwlock, with one associated condvar.
@@ -490,10 +512,13 @@ pub fn RWlock() -> RWlock { rwlock_with_condvars(1) }
  * Similar to mutex_with_condvars.
  */
 pub fn rwlock_with_condvars(num_condvars: uint) -> RWlock {
-    RWlock { order_lock: semaphore(1),
+    let state = UnsafeAtomicRcBox::new(RWlockInner {
+        read_mode:  false,
+        read_count: atomics::AtomicUint::new(0),
+    });
+    RWlock { order_lock:  semaphore(1),
              access_lock: new_sem_and_signal(1, num_condvars),
-             state:       exclusive(RWlockInner { read_mode:  false,
-                                                  read_count: 0 }) }
+             state:       state, }
 }
 
 impl RWlock {
@@ -513,20 +538,11 @@ impl RWlock {
         unsafe {
             do task::unkillable {
                 do (&self.order_lock).access {
-                    let mut first_reader = false;
-                    do self.state.with |state| {
-                        first_reader = (state.read_count == 0);
-                        state.read_count += 1;
-                    }
-                    if first_reader {
+                    let state = &mut *self.state.get();
+                    let old_count = state.read_count.fetch_add(1, atomics::Acquire);
+                    if old_count == 0 {
                         (&self.access_lock).acquire();
-                        do self.state.with |state| {
-                            // Must happen *after* getting access_lock. If
-                            // this is set while readers are waiting, but
-                            // while a writer holds the lock, the writer will
-                            // be confused if they downgrade-then-unlock.
-                            state.read_mode = true;
-                        }
+                        state.read_mode = true;
                     }
                 }
                 release = Some(RWlockReleaseRead(self));
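The rewritten read path boils down to a hand-off protocol: the reader whose `fetch_add` returns 0 is the first one in and acquires the writer-exclusion lock for the whole "reader cloud", while `order_lock` stops a second reader from slipping past before that acquire completes. A hedged sketch of the protocol in modern Rust, with hypothetical `ReaderGate` and `Semaphore` types (a Mutex/Condvar semaphore stands in for `Sem<~[Waitqueue]>`; a semaphore rather than a mutex guard is essential, because the matching release may happen on a different thread than the acquire):

~~~ {.rust}
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};

// A tiny counting semaphore; unlike a mutex guard, it can be released by
// a different thread than the one that acquired it.
struct Semaphore {
    count: Mutex<usize>,
    cond: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Semaphore {
        Semaphore { count: Mutex::new(n), cond: Condvar::new() }
    }
    fn acquire(&self) {
        let mut c = self.count.lock().unwrap();
        while *c == 0 {
            c = self.cond.wait(c).unwrap();
        }
        *c -= 1;
    }
    fn release(&self) {
        *self.count.lock().unwrap() += 1;
        self.cond.notify_one();
    }
}

struct ReaderGate {
    order_lock: Mutex<()>,   // serializes entry, like RWlock's order_lock
    read_count: AtomicUsize, // like RWlockInner's read_count
    access_lock: Semaphore,  // excludes writers, like RWlock's access_lock
}

impl ReaderGate {
    fn read_enter(&self) {
        // Holding order_lock makes the increment plus the conditional
        // acquire one unit: reader #2 cannot return from here before
        // reader #1 actually holds access_lock.
        let _order = self.order_lock.lock().unwrap();
        if self.read_count.fetch_add(1, Ordering::Acquire) == 0 {
            self.access_lock.acquire(); // first reader locks out writers
        }
    }
    fn read_exit(&self) {
        if self.read_count.fetch_sub(1, Ordering::Release) == 1 {
            self.access_lock.release(); // last reader lets writers back in
        }
    }
}
~~~

The `read_exit` half mirrors the `RWlockReleaseRead` destructor further down: the reader whose `fetch_sub` returns 1 is the last one out and releases the lock, possibly on a different thread than the first reader that acquired it.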
@@ -606,12 +622,12 @@ impl RWlock {
      * # Example
      *
      * ~~~ {.rust}
-     * do lock.write_downgrade |write_mode| {
-     *     do (&write_mode).write_cond |condvar| {
+     * do lock.write_downgrade |write_token| {
+     *     do (&write_token).write_cond |condvar| {
      *         ... exclusive access ...
      *     }
-     *     let read_mode = lock.downgrade(write_mode);
-     *     do (&read_mode).read {
+     *     let read_token = lock.downgrade(write_token);
+     *     do (&read_token).read {
      *         ... shared access ...
      *     }
      * }
@@ -640,14 +656,15 @@ impl RWlock {
         }
         unsafe {
             do task::unkillable {
-                let mut first_reader = false;
-                do self.state.with |state| {
-                    assert!(!state.read_mode);
-                    state.read_mode = true;
-                    first_reader = (state.read_count == 0);
-                    state.read_count += 1;
-                }
-                if !first_reader {
+                let state = &mut *self.state.get();
+                assert!(!state.read_mode);
+                state.read_mode = true;
+                // If a reader attempts to enter at this point, both the
+                // downgrader and reader will set the mode flag. This is fine.
+                let old_count = state.read_count.fetch_add(1, atomics::Release);
+                // If another reader was already blocking, we need to hand-off
+                // the "reader cloud" access lock to them.
+                if old_count != 0 {
                     // Guaranteed not to let another writer in, because
                     // another reader was holding the order_lock. Hence they
                     // must be the one to get the access_lock (because all
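To make the `old_count != 0` hand-off concrete: while the downgrader still holds `access_lock` from its write access, at most one reader can have incremented the count, and that reader is necessarily parked inside its acquire, still holding `order_lock` (which is also what keeps any writer out during the hand-off). Continuing the hypothetical `ReaderGate` sketch from above, with the `read_mode` flag omitted:

~~~ {.rust}
impl ReaderGate {
    // Sketch of downgrade: the caller holds access_lock as a writer and
    // wants to continue as a reader without ever unlocking fully.
    fn downgrade_to_reader(&self) {
        // Count ourselves in as a reader.
        if self.read_count.fetch_add(1, Ordering::Release) != 0 {
            // A reader already counted itself and is blocked acquiring
            // access_lock. Hand the lock off: release it so that blocked
            // reader inherits it on behalf of the whole reader cloud.
            self.access_lock.release();
        }
        // Otherwise no reader is waiting: keep holding access_lock
        // ourselves, now as the first reader.
    }
}
~~~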
@@ -667,29 +684,30 @@ struct RWlockReleaseRead<'self> {
     lock: &'self RWlock,
 }
 
+#[doc(hidden)]
 #[unsafe_destructor]
 impl<'self> Drop for RWlockReleaseRead<'self> {
     fn finalize(&self) {
         unsafe {
             do task::unkillable {
-                let mut last_reader = false;
-                do self.lock.state.with |state| {
-                    assert!(state.read_mode);
-                    assert!(state.read_count > 0);
-                    state.read_count -= 1;
-                    if state.read_count == 0 {
-                        last_reader = true;
-                        state.read_mode = false;
-                    }
-                }
-                if last_reader {
+                let state = &mut *self.lock.state.get();
+                assert!(state.read_mode);
+                let old_count = state.read_count.fetch_sub(1, atomics::Release);
+                assert!(old_count > 0);
+                if old_count == 1 {
+                    state.read_mode = false;
+                    // Note: this release used to be outside of a locked access
+                    // to exclusive-protected state. If this code is ever
+                    // converted back to such (instead of using atomic ops),
+                    // this access MUST NOT go inside the exclusive access.
                     (&self.lock.access_lock).release();
                 }
             }
         }
     }
 }
 
+#[doc(hidden)]
 fn RWlockReleaseRead<'r>(lock: &'r RWlock) -> RWlockReleaseRead<'r> {
     RWlockReleaseRead {
         lock: lock
@@ -703,37 +721,42 @@ struct RWlockReleaseDowngrade<'self> {
     lock: &'self RWlock,
 }
 
+#[doc(hidden)]
 #[unsafe_destructor]
 impl<'self> Drop for RWlockReleaseDowngrade<'self> {
     fn finalize(&self) {
         unsafe {
             do task::unkillable {
-                let mut writer_or_last_reader = false;
-                do self.lock.state.with |state| {
-                    if state.read_mode {
-                        assert!(state.read_count > 0);
-                        state.read_count -= 1;
-                        if state.read_count == 0 {
-                            // Case 1: Writer downgraded & was the last reader
-                            writer_or_last_reader = true;
-                            state.read_mode = false;
-                        } else {
-                            // Case 2: Writer downgraded & was not the last
-                            // reader
-                        }
-                    } else {
-                        // Case 3: Writer did not downgrade
+                let writer_or_last_reader;
+                // Check if we're releasing from read mode or from write mode.
+                let state = &mut *self.lock.state.get();
+                if state.read_mode {
+                    // Releasing from read mode.
+                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
+                    assert!(old_count > 0);
+                    // Check if other readers remain.
+                    if old_count == 1 {
+                        // Case 1: Writer downgraded & was the last reader
                         writer_or_last_reader = true;
+                        state.read_mode = false;
+                    } else {
+                        // Case 2: Writer downgraded & was not the last reader
+                        writer_or_last_reader = false;
                     }
+                } else {
+                    // Case 3: Writer did not downgrade
+                    writer_or_last_reader = true;
                 }
                 if writer_or_last_reader {
+                    // Nobody left inside; release the "reader cloud" lock.
                     (&self.lock.access_lock).release();
                 }
             }
         }
     }
 }
 
+#[doc(hidden)]
 fn RWlockReleaseDowngrade<'r>(lock: &'r RWlock)
                            -> RWlockReleaseDowngrade<'r> {
     RWlockReleaseDowngrade {