1
- use std:: alloc:: Layout ;
2
- use std:: cell:: Cell ;
3
- use std:: fmt;
4
- use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
5
- use std:: task:: Waker ;
6
-
7
- use crossbeam_utils:: Backoff ;
1
+ use core:: alloc:: Layout ;
2
+ use core:: cell:: UnsafeCell ;
3
+ use core:: fmt;
4
+ use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
5
+ use core:: task:: Waker ;
8
6
9
7
use crate :: raw:: TaskVTable ;
10
8
use crate :: state:: * ;
@@ -22,7 +20,7 @@ pub(crate) struct Header {
22
20
/// The task that is blocked on the `JoinHandle`.
23
21
///
24
22
/// This waker needs to be woken up once the task completes or is closed.
25
- pub ( crate ) awaiter : Cell < Option < Waker > > ,
23
+ pub ( crate ) awaiter : UnsafeCell < Option < Waker > > ,
26
24
27
25
/// The virtual table.
28
26
///
@@ -55,7 +53,7 @@ impl Header {
55
53
Ok ( _) => {
56
54
// Notify the awaiter that the task has been closed.
57
55
if state & AWAITER != 0 {
58
- self . notify ( ) ;
56
+ self . notify ( None ) ;
59
57
}
60
58
61
59
break ;
@@ -67,68 +65,105 @@ impl Header {
67
65
68
66
/// Notifies the awaiter blocked on this task.
69
67
///
70
- /// If there is a registered waker, it will be removed from the header and woken up .
68
+ /// If the awaiter is the same as the current waker, it will not be notified .
71
69
#[ inline]
72
- pub ( crate ) fn notify ( & self ) {
73
- if let Some ( waker) = self . swap_awaiter ( None ) {
74
- // We need a safeguard against panics because waking can panic.
75
- abort_on_panic ( || {
76
- waker. wake ( ) ;
77
- } ) ;
78
- }
79
- }
70
+ pub ( crate ) fn notify ( & self , current : Option < & Waker > ) {
71
+ // Mark the awaiter as being notified.
72
+ let state = self . state . fetch_or ( NOTIFYING , Ordering :: AcqRel ) ;
80
73
81
- /// Notifies the awaiter blocked on this task, unless its waker matches `current`.
82
- ///
83
- /// If there is a registered waker, it will be removed from the header in any case.
84
- #[ inline]
85
- pub ( crate ) fn notify_unless ( & self , current : & Waker ) {
86
- if let Some ( waker) = self . swap_awaiter ( None ) {
87
- if !waker. will_wake ( current) {
74
+ // If the awaiter was not being notified nor registered...
75
+ if state & ( NOTIFYING | REGISTERING ) == 0 {
76
+ // Take the waker out.
77
+ let waker = unsafe { ( * self . awaiter . get ( ) ) . take ( ) } ;
78
+
79
+ // Mark the state as not being notified anymore nor containing an awaiter.
80
+ self . state
81
+ . fetch_and ( !NOTIFYING & !AWAITER , Ordering :: Release ) ;
82
+
83
+ if let Some ( w) = waker {
88
84
// We need a safeguard against panics because waking can panic.
89
- abort_on_panic ( || {
90
- waker. wake ( ) ;
85
+ abort_on_panic ( || match current {
86
+ None => w. wake ( ) ,
87
+ Some ( c) if !w. will_wake ( c) => w. wake ( ) ,
88
+ Some ( _) => { }
91
89
} ) ;
92
90
}
93
91
}
94
92
}
95
93
96
- /// Swaps the awaiter for a new waker and returns the previous value.
94
+ /// Registers a new awaiter blocked on this task.
95
+ ///
96
+ /// This method is called when `JoinHandle` is polled and the task has not completed.
97
97
#[ inline]
98
- pub ( crate ) fn swap_awaiter ( & self , new : Option < Waker > ) -> Option < Waker > {
99
- let new_is_none = new. is_none ( ) ;
98
+ pub ( crate ) fn register ( & self , waker : & Waker ) {
99
+ // Load the state and synchronize with it.
100
+ let mut state = self . state . fetch_or ( 0 , Ordering :: Acquire ) ;
100
101
101
- // We're about to try acquiring the lock in a loop. If it's already being held by another
102
- // thread, we'll have to spin for a while so it's best to employ a backoff strategy.
103
- let backoff = Backoff :: new ( ) ;
104
102
loop {
105
- // Acquire the lock. If we're storing an awaiter, then also set the awaiter flag.
106
- let state = if new_is_none {
107
- self . state . fetch_or ( LOCKED , Ordering :: Acquire )
108
- } else {
109
- self . state . fetch_or ( LOCKED | AWAITER , Ordering :: Acquire )
110
- } ;
103
+ // There can't be two concurrent registrations because `JoinHandle` can only be polled
104
+ // by a unique pinned reference.
105
+ debug_assert ! ( state & REGISTERING == 0 ) ;
106
+
107
+ // If the state is being notified at this moment, just wake and return without
108
+ // registering.
109
+ if state & NOTIFYING != 0 {
110
+ waker. wake_by_ref ( ) ;
111
+ return ;
112
+ }
111
113
112
- // If the lock was acquired, break from the loop.
113
- if state & LOCKED == 0 {
114
- break ;
114
+ // Mark the state to let other threads know we're registering a new awaiter.
115
+ match self . state . compare_exchange_weak (
116
+ state,
117
+ state | REGISTERING ,
118
+ Ordering :: AcqRel ,
119
+ Ordering :: Acquire ,
120
+ ) {
121
+ Ok ( _) => {
122
+ state |= REGISTERING ;
123
+ break ;
124
+ }
125
+ Err ( s) => state = s,
115
126
}
127
+ }
116
128
117
- // Snooze for a little while because the lock is held by another thread.
118
- backoff. snooze ( ) ;
129
+ // Put the waker into the awaiter field.
130
+ unsafe {
131
+ abort_on_panic ( || ( * self . awaiter . get ( ) ) = Some ( waker. clone ( ) ) ) ;
119
132
}
120
133
121
- // Replace the awaiter.
122
- let old = self . awaiter . replace ( new) ;
134
+ // This variable will contain the newly registered waker if a notification comes in before
135
+ // we complete registration.
136
+ let mut waker = None ;
137
+
138
+ loop {
139
+ // If there was a notification, take the waker out of the awaiter field.
140
+ if state & NOTIFYING != 0 {
141
+ if let Some ( w) = unsafe { ( * self . awaiter . get ( ) ) . take ( ) } {
142
+ waker = Some ( w) ;
143
+ }
144
+ }
145
+
146
+ // The new state is not being notified nor registered, but there might or might not be
147
+ // an awaiter depending on whether there was a concurrent notification.
148
+ let new = if waker. is_none ( ) {
149
+ ( state & !NOTIFYING & !REGISTERING ) | AWAITER
150
+ } else {
151
+ state & !NOTIFYING & !REGISTERING & !AWAITER
152
+ } ;
123
153
124
- // Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
125
- if new_is_none {
126
- self . state . fetch_and ( !LOCKED & !AWAITER , Ordering :: Release ) ;
127
- } else {
128
- self . state . fetch_and ( !LOCKED , Ordering :: Release ) ;
154
+ match self
155
+ . state
156
+ . compare_exchange_weak ( state, new, Ordering :: AcqRel , Ordering :: Acquire )
157
+ {
158
+ Ok ( _) => break ,
159
+ Err ( s) => state = s,
160
+ }
129
161
}
130
162
131
- old
163
+ // If there was a notification during registration, wake the awaiter now.
164
+ if let Some ( w) = waker {
165
+ abort_on_panic ( || w. wake ( ) ) ;
166
+ }
132
167
}
133
168
134
169
/// Returns the offset at which the tag of type `T` is stored.
0 commit comments