Skip to content

Commit e9edadf

Browse files
author
Stjepan Glavina
committed
Fix a deadlock in channel
1 parent 20cdf73 commit e9edadf

File tree

4 files changed

+174
-182
lines changed

4 files changed

+174
-182
lines changed

src/sync/channel.rs

+43-52
Original file line numberDiff line numberDiff line change
@@ -138,41 +138,34 @@ impl<T> Sender<T> {
138138
type Output = ();
139139

140140
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
141-
let msg = self.msg.take().unwrap();
142-
143-
// Try sending the message.
144-
let poll = match self.channel.try_send(msg) {
145-
Ok(()) => Poll::Ready(()),
146-
Err(TrySendError::Disconnected(msg)) => {
147-
self.msg = Some(msg);
148-
Poll::Pending
141+
loop {
142+
let msg = self.msg.take().unwrap();
143+
144+
// If the current task is in the set, remove it.
145+
if let Some(key) = self.opt_key.take() {
146+
self.channel.send_wakers.remove(key);
149147
}
150-
Err(TrySendError::Full(msg)) => {
151-
// Insert this send operation.
152-
match self.opt_key {
153-
None => self.opt_key = Some(self.channel.send_wakers.insert(cx)),
154-
Some(key) => self.channel.send_wakers.update(key, cx),
148+
149+
// Try sending the message.
150+
match self.channel.try_send(msg) {
151+
Ok(()) => return Poll::Ready(()),
152+
Err(TrySendError::Disconnected(msg)) => {
153+
self.msg = Some(msg);
154+
return Poll::Pending;
155155
}
156+
Err(TrySendError::Full(msg)) => {
157+
self.msg = Some(msg);
158+
159+
// Insert this send operation.
160+
self.opt_key = Some(self.channel.send_wakers.insert(cx));
156161

157-
// Try sending the message again.
158-
match self.channel.try_send(msg) {
159-
Ok(()) => Poll::Ready(()),
160-
Err(TrySendError::Disconnected(msg)) | Err(TrySendError::Full(msg)) => {
161-
self.msg = Some(msg);
162-
Poll::Pending
162+
// If the channel is still full and not disconnected, return.
163+
if self.channel.is_full() && !self.channel.is_disconnected() {
164+
return Poll::Pending;
163165
}
164166
}
165167
}
166-
};
167-
168-
if poll.is_ready() {
169-
// If the current task is in the set, remove it.
170-
if let Some(key) = self.opt_key.take() {
171-
self.channel.send_wakers.complete(key);
172-
}
173168
}
174-
175-
poll
176169
}
177170
}
178171

@@ -543,34 +536,27 @@ fn poll_recv<T>(
543536
opt_key: &mut Option<usize>,
544537
cx: &mut Context<'_>,
545538
) -> Poll<Option<T>> {
546-
// Try receiving a message.
547-
let poll = match channel.try_recv() {
548-
Ok(msg) => Poll::Ready(Some(msg)),
549-
Err(TryRecvError::Disconnected) => Poll::Ready(None),
550-
Err(TryRecvError::Empty) => {
551-
// Insert this receive operation.
552-
match *opt_key {
553-
None => *opt_key = Some(wakers.insert(cx)),
554-
Some(key) => wakers.update(key, cx),
555-
}
556-
557-
// Try receiving a message again.
558-
match channel.try_recv() {
559-
Ok(msg) => Poll::Ready(Some(msg)),
560-
Err(TryRecvError::Disconnected) => Poll::Ready(None),
561-
Err(TryRecvError::Empty) => Poll::Pending,
562-
}
563-
}
564-
};
565-
566-
if poll.is_ready() {
539+
loop {
567540
// If the current task is in the set, remove it.
568541
if let Some(key) = opt_key.take() {
569-
wakers.complete(key);
542+
wakers.remove(key);
570543
}
571-
}
572544

573-
poll
545+
// Try receiving a message.
546+
match channel.try_recv() {
547+
Ok(msg) => return Poll::Ready(Some(msg)),
548+
Err(TryRecvError::Disconnected) => return Poll::Ready(None),
549+
Err(TryRecvError::Empty) => {
550+
// Insert this receive operation.
551+
*opt_key = Some(wakers.insert(cx));
552+
553+
// If the channel is still empty and not disconnected, return.
554+
if channel.is_empty() && !channel.is_disconnected() {
555+
return Poll::Pending;
556+
}
557+
}
558+
}
559+
}
574560
}
575561

576562
/// A slot in a channel.
@@ -862,6 +848,11 @@ impl<T> Channel<T> {
862848
}
863849
}
864850

851+
/// Returns `true` if the channel is disconnected.
852+
pub fn is_disconnected(&self) -> bool {
853+
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
854+
}
855+
865856
/// Returns `true` if the channel is empty.
866857
fn is_empty(&self) -> bool {
867858
let head = self.head.load(Ordering::SeqCst);

src/sync/mutex.rs

+18-24
Original file line numberDiff line numberDiff line change
@@ -104,32 +104,26 @@ impl<T> Mutex<T> {
104104
type Output = MutexGuard<'a, T>;
105105

106106
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107-
let poll = match self.mutex.try_lock() {
108-
Some(guard) => Poll::Ready(guard),
109-
None => {
110-
// Insert this lock operation.
111-
match self.opt_key {
112-
None => self.opt_key = Some(self.mutex.wakers.insert(cx)),
113-
Some(key) => self.mutex.wakers.update(key, cx),
114-
}
115-
116-
// Try locking again because it's possible the mutex got unlocked just
117-
// before the current task was inserted into the waker set.
118-
match self.mutex.try_lock() {
119-
Some(guard) => Poll::Ready(guard),
120-
None => Poll::Pending,
121-
}
122-
}
123-
};
124-
125-
if poll.is_ready() {
107+
loop {
126108
// If the current task is in the set, remove it.
127109
if let Some(key) = self.opt_key.take() {
128-
self.mutex.wakers.complete(key);
110+
self.mutex.wakers.remove(key);
129111
}
130-
}
131112

132-
poll
113+
// Try acquiring the lock.
114+
match self.mutex.try_lock() {
115+
Some(guard) => return Poll::Ready(guard),
116+
None => {
117+
// Insert this lock operation.
118+
self.opt_key = Some(self.mutex.wakers.insert(cx));
119+
120+
// If the mutex is still locked, return.
121+
if self.mutex.locked.load(Ordering::SeqCst) {
122+
return Poll::Pending;
123+
}
124+
}
125+
}
126+
}
133127
}
134128
}
135129

@@ -266,8 +260,8 @@ impl<T> Drop for MutexGuard<'_, T> {
266260
// Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`.
267261
self.0.locked.store(false, Ordering::SeqCst);
268262

269-
// Notify one blocked `lock()` operation.
270-
self.0.wakers.notify_one();
263+
// Notify a blocked `lock()` operation if none were notified already.
264+
self.0.wakers.notify_any();
271265
}
272266
}
273267

src/sync/rwlock.rs

+40-50
Original file line numberDiff line numberDiff line change
@@ -108,32 +108,26 @@ impl<T> RwLock<T> {
108108
type Output = RwLockReadGuard<'a, T>;
109109

110110
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111-
let poll = match self.lock.try_read() {
112-
Some(guard) => Poll::Ready(guard),
113-
None => {
114-
// Insert this lock operation.
115-
match self.opt_key {
116-
None => self.opt_key = Some(self.lock.read_wakers.insert(cx)),
117-
Some(key) => self.lock.read_wakers.update(key, cx),
118-
}
119-
120-
// Try locking again because it's possible the lock got unlocked just
121-
// before the current task was inserted into the waker set.
122-
match self.lock.try_read() {
123-
Some(guard) => Poll::Ready(guard),
124-
None => Poll::Pending,
125-
}
126-
}
127-
};
128-
129-
if poll.is_ready() {
111+
loop {
130112
// If the current task is in the set, remove it.
131113
if let Some(key) = self.opt_key.take() {
132-
self.lock.read_wakers.complete(key);
114+
self.lock.read_wakers.remove(key);
133115
}
134-
}
135116

136-
poll
117+
// Try acquiring a read lock.
118+
match self.lock.try_read() {
119+
Some(guard) => return Poll::Ready(guard),
120+
None => {
121+
// Insert this lock operation.
122+
self.opt_key = Some(self.lock.read_wakers.insert(cx));
123+
124+
// If the lock is still acquired for writing, return.
125+
if self.lock.state.load(Ordering::SeqCst) & WRITE_LOCK != 0 {
126+
return Poll::Pending;
127+
}
128+
}
129+
}
130+
}
137131
}
138132
}
139133

@@ -143,9 +137,10 @@ impl<T> RwLock<T> {
143137
if let Some(key) = self.opt_key {
144138
self.lock.read_wakers.cancel(key);
145139

146-
// If there are no active readers, wake one of the writers.
140+
// If there are no active readers, notify a blocked writer if none were
141+
// notified already.
147142
if self.lock.state.load(Ordering::SeqCst) & READ_COUNT_MASK == 0 {
148-
self.lock.write_wakers.notify_one();
143+
self.lock.write_wakers.notify_any();
149144
}
150145
}
151146
}
@@ -238,32 +233,26 @@ impl<T> RwLock<T> {
238233
type Output = RwLockWriteGuard<'a, T>;
239234

240235
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
241-
let poll = match self.lock.try_write() {
242-
Some(guard) => Poll::Ready(guard),
243-
None => {
244-
// Insert this lock operation.
245-
match self.opt_key {
246-
None => self.opt_key = Some(self.lock.write_wakers.insert(cx)),
247-
Some(key) => self.lock.write_wakers.update(key, cx),
248-
}
249-
250-
// Try locking again because it's possible the lock got unlocked just
251-
// before the current task was inserted into the waker set.
252-
match self.lock.try_write() {
253-
Some(guard) => Poll::Ready(guard),
254-
None => Poll::Pending,
255-
}
256-
}
257-
};
258-
259-
if poll.is_ready() {
236+
loop {
260237
// If the current task is in the set, remove it.
261238
if let Some(key) = self.opt_key.take() {
262-
self.lock.write_wakers.complete(key);
239+
self.lock.write_wakers.remove(key);
263240
}
264-
}
265241

266-
poll
242+
// Try acquiring a write lock.
243+
match self.lock.try_write() {
244+
Some(guard) => return Poll::Ready(guard),
245+
None => {
246+
// Insert this lock operation.
247+
self.opt_key = Some(self.lock.write_wakers.insert(cx));
248+
249+
// If the lock is still acquired for reading or writing, return.
250+
if self.lock.state.load(Ordering::SeqCst) != 0 {
251+
return Poll::Pending;
252+
}
253+
}
254+
}
255+
}
267256
}
268257
}
269258

@@ -392,9 +381,9 @@ impl<T> Drop for RwLockReadGuard<'_, T> {
392381
fn drop(&mut self) {
393382
let state = self.0.state.fetch_sub(ONE_READ, Ordering::SeqCst);
394383

395-
// If this was the last read, wake one of the writers.
384+
// If this was the last reader, notify a blocked writer if none were notified already.
396385
if state & READ_COUNT_MASK == ONE_READ {
397-
self.0.write_wakers.notify_one();
386+
self.0.write_wakers.notify_any();
398387
}
399388
}
400389
}
@@ -431,8 +420,9 @@ impl<T> Drop for RwLockWriteGuard<'_, T> {
431420

432421
// Notify all blocked readers.
433422
if !self.0.read_wakers.notify_all() {
434-
// If there were no blocked readers, notify a blocked writer.
435-
self.0.write_wakers.notify_one();
423+
// If there were no blocked readers, notify a blocked writer if none were notified
424+
// already.
425+
self.0.write_wakers.notify_any();
436426
}
437427
}
438428
}

0 commit comments

Comments
 (0)