Skip to content

Commit 27911f7

Browse files
committed
Add deadlock detection
1 parent 8b2ff87 commit 27911f7

File tree

4 files changed

+162
-16
lines changed

4 files changed

+162
-16
lines changed

rayon-core/src/lib.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@
2020
//! succeed.
2121
2222
#![doc(html_root_url = "https://docs.rs/rayon-core/1.7")]
23-
#![deny(missing_debug_implementations)]
24-
#![deny(missing_docs)]
25-
#![deny(unreachable_pub)]
2623

2724
use std::any::Any;
2825
use std::env;
@@ -56,6 +53,7 @@ pub mod tlv;
5653

5754
pub use self::join::{join, join_context};
5855
pub use self::registry::ThreadBuilder;
56+
pub use self::registry::{mark_blocked, mark_unblocked, Registry};
5957
pub use self::scope::{scope, Scope};
6058
pub use self::scope::{scope_fifo, ScopeFifo};
6159
pub use self::spawn::{spawn, spawn_fifo};
@@ -134,6 +132,9 @@ pub struct ThreadPoolBuilder<S = DefaultSpawn> {
134132
/// The stack size for the created worker threads
135133
stack_size: Option<usize>,
136134

135+
/// Closure invoked on deadlock.
136+
deadlock_handler: Option<Box<DeadlockHandler>>,
137+
137138
/// Closure invoked on worker thread start.
138139
start_handler: Option<Box<StartHandler>>,
139140

@@ -161,6 +162,9 @@ pub struct Configuration {
161162
/// may be invoked multiple times in parallel.
162163
type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
163164

165+
/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
166+
type DeadlockHandler = dyn Fn() + Send + Sync;
167+
164168
/// The type for a closure that gets invoked when a thread starts. The
165169
/// closure is passed the index of the thread on which it is invoked.
166170
/// Note that this same closure may be invoked multiple times in parallel.
@@ -181,6 +185,7 @@ impl Default for ThreadPoolBuilder {
181185
stack_size: None,
182186
start_handler: None,
183187
exit_handler: None,
188+
deadlock_handler: None,
184189
spawn_handler: DefaultSpawn,
185190
breadth_first: false,
186191
}
@@ -365,6 +370,7 @@ impl<S> ThreadPoolBuilder<S> {
365370
stack_size: self.stack_size,
366371
start_handler: self.start_handler,
367372
exit_handler: self.exit_handler,
373+
deadlock_handler: self.deadlock_handler,
368374
breadth_first: self.breadth_first,
369375
}
370376
}
@@ -523,6 +529,20 @@ impl<S> ThreadPoolBuilder<S> {
523529
self.breadth_first
524530
}
525531

532+
/// Takes the current deadlock callback, leaving `None`.
533+
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
534+
self.deadlock_handler.take()
535+
}
536+
537+
/// Set a callback to be invoked on current deadlock.
538+
pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> Self
539+
where
540+
H: Fn() + Send + Sync + 'static,
541+
{
542+
self.deadlock_handler = Some(Box::new(deadlock_handler));
543+
self
544+
}
545+
526546
/// Takes the current thread start callback, leaving `None`.
527547
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
528548
self.start_handler.take()
@@ -676,6 +696,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
676696
ref get_thread_name,
677697
ref panic_handler,
678698
ref stack_size,
699+
ref deadlock_handler,
679700
ref start_handler,
680701
ref exit_handler,
681702
spawn_handler: _,
@@ -692,6 +713,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
692713
}
693714
let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
694715
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
716+
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
695717
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
696718
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
697719

@@ -700,6 +722,7 @@ impl<S> fmt::Debug for ThreadPoolBuilder<S> {
700722
.field("get_thread_name", &get_thread_name)
701723
.field("panic_handler", &panic_handler)
702724
.field("stack_size", &stack_size)
725+
.field("deadlock_handler", &deadlock_handler)
703726
.field("start_handler", &start_handler)
704727
.field("exit_handler", &exit_handler)
705728
.field("breadth_first", &breadth_first)

rayon-core/src/registry.rs

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use crate::sleep::Sleep;
55
use crate::unwind;
66
use crate::util::leak;
77
use crate::{
8-
ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
8+
DeadlockHandler, ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError,
9+
ThreadPoolBuilder,
910
};
1011
use crossbeam_deque::{Steal, Stealer, Worker};
1112
use crossbeam_queue::SegQueue;
@@ -131,11 +132,12 @@ where
131132
}
132133
}
133134

134-
pub(super) struct Registry {
135+
pub struct Registry {
135136
thread_infos: Vec<ThreadInfo>,
136137
sleep: Sleep,
137138
injected_jobs: SegQueue<JobRef>,
138139
panic_handler: Option<Box<PanicHandler>>,
140+
deadlock_handler: Option<Box<DeadlockHandler>>,
139141
start_handler: Option<Box<StartHandler>>,
140142
exit_handler: Option<Box<ExitHandler>>,
141143

@@ -235,10 +237,11 @@ impl Registry {
235237

236238
let registry = Arc::new(Registry {
237239
thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
238-
sleep: Sleep::new(),
240+
sleep: Sleep::new(n_threads),
239241
injected_jobs: SegQueue::new(),
240242
terminate_latch: CountLatch::new(),
241243
panic_handler: builder.take_panic_handler(),
244+
deadlock_handler: builder.take_deadlock_handler(),
242245
start_handler: builder.take_start_handler(),
243246
exit_handler: builder.take_exit_handler(),
244247
});
@@ -265,7 +268,7 @@ impl Registry {
265268
Ok(registry.clone())
266269
}
267270

268-
pub(super) fn current() -> Arc<Registry> {
271+
pub fn current() -> Arc<Registry> {
269272
unsafe {
270273
let worker_thread = WorkerThread::current();
271274
if worker_thread.is_null() {
@@ -512,6 +515,24 @@ impl Registry {
512515
}
513516
}
514517

518+
/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
519+
/// if no other worker thread is active
520+
#[inline]
521+
pub fn mark_blocked() {
522+
let worker_thread = WorkerThread::current();
523+
assert!(!worker_thread.is_null());
524+
unsafe {
525+
let registry = &(*worker_thread).registry;
526+
registry.sleep.mark_blocked(&registry.deadlock_handler)
527+
}
528+
}
529+
530+
/// Mark a previously blocked Rayon worker thread as unblocked
531+
#[inline]
532+
pub fn mark_unblocked(registry: &Registry) {
533+
registry.sleep.mark_unblocked()
534+
}
535+
515536
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
516537
pub(super) struct RegistryId {
517538
addr: usize,
@@ -666,7 +687,11 @@ impl WorkerThread {
666687
yields = self.registry.sleep.work_found(self.index, yields);
667688
self.execute(job);
668689
} else {
669-
yields = self.registry.sleep.no_work_found(self.index, yields);
690+
yields = self.registry.sleep.no_work_found(
691+
self.index,
692+
yields,
693+
&self.registry.deadlock_handler,
694+
);
670695
}
671696
}
672697

rayon-core/src/sleep/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,36 @@ some of them were hit hard:
386386
- 8-10% overhead on nbody-parreduce
387387
- 35% overhead on increment-all
388388
- 245% overhead on join-recursively
389+
390+
# Deadlock detection
391+
392+
This module tracks a number of variables in order to detect deadlocks due to user code blocking.
393+
These variables are stored in the `SleepData` struct which itself is kept behind a mutex.
394+
It contains the following fields:
395+
- `worker_count` - The number of threads in the thread pool.
396+
- `active_threads` - The number of threads in the thread pool which are running
397+
and aren't blocked in user code or sleeping.
398+
- `blocked_threads` - The number of threads which are blocked in user code.
399+
This doesn't include threads blocked by Rayon.
400+
401+
User code can indicate blocking by calling `mark_blocked` before blocking and
402+
calling `mark_unblocked` before unblocking a thread.
403+
This will adjust `active_threads` and `blocked_threads` accordingly.
404+
405+
When we tickle the thread pool in `Sleep::tickle_cold`, we set `active_threads` to
406+
`worker_count` - `blocked_threads` since we wake up all Rayon threads, but not thread blocked
407+
by user code.
408+
409+
A deadlock is detected by checking if `active_threads` is 0 and `blocked_threads` is above 0.
410+
If we ignored `blocked_threads` we would have a deadlock
411+
immediately when creating the thread pool.
412+
We would also deadlock once the thread pool ran out of work.
413+
It is not possible for Rayon itself to deadlock.
414+
Deadlocks can only be caused by user code blocking, so this condition doesn't miss any deadlocks.
415+
416+
We check for the deadlock condition when
417+
threads fall asleep in `mark_unblocked` and in `Sleep::sleep`.
418+
If there's a deadlock detected we call the user provided deadlock handler while we hold the
419+
lock to `SleepData`. This means the deadlock handler cannot call `mark_blocked` and
420+
`mark_unblocked`. The user is expected to handle the deadlock in some non-Rayon thread.
421+
Once the deadlock handler returns, the thread which called the deadlock handler will go to sleep.

rayon-core/src/sleep/mod.rs

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,38 @@
22
//! for an overview.
33
44
use crate::log::Event::*;
5+
use crate::DeadlockHandler;
56
use std::sync::atomic::{AtomicUsize, Ordering};
67
use std::sync::{Condvar, Mutex};
78
use std::thread;
89
use std::usize;
910

11+
struct SleepData {
12+
/// The number of threads in the thread pool.
13+
worker_count: usize,
14+
15+
/// The number of threads in the thread pool which are running and
16+
/// aren't blocked in user code or sleeping.
17+
active_threads: usize,
18+
19+
/// The number of threads which are blocked in user code.
20+
/// This doesn't include threads blocked by this module.
21+
blocked_threads: usize,
22+
}
23+
24+
impl SleepData {
25+
/// Checks if the conditions for a deadlock holds and if so calls the deadlock handler
26+
#[inline]
27+
pub fn deadlock_check(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
28+
if self.active_threads == 0 && self.blocked_threads > 0 {
29+
(deadlock_handler.as_ref().unwrap())();
30+
}
31+
}
32+
}
33+
1034
pub(super) struct Sleep {
1135
state: AtomicUsize,
12-
data: Mutex<()>,
36+
data: Mutex<SleepData>,
1337
tickle: Condvar,
1438
}
1539

@@ -20,14 +44,42 @@ const ROUNDS_UNTIL_SLEEPY: usize = 32;
2044
const ROUNDS_UNTIL_ASLEEP: usize = 64;
2145

2246
impl Sleep {
23-
pub(super) fn new() -> Sleep {
47+
pub(super) fn new(worker_count: usize) -> Sleep {
2448
Sleep {
2549
state: AtomicUsize::new(AWAKE),
26-
data: Mutex::new(()),
50+
data: Mutex::new(SleepData {
51+
worker_count,
52+
active_threads: worker_count,
53+
blocked_threads: 0,
54+
}),
2755
tickle: Condvar::new(),
2856
}
2957
}
3058

59+
/// Mark a Rayon worker thread as blocked. This triggers the deadlock handler
60+
/// if no other worker thread is active
61+
#[inline]
62+
pub fn mark_blocked(&self, deadlock_handler: &Option<Box<DeadlockHandler>>) {
63+
let mut data = self.data.lock().unwrap();
64+
debug_assert!(data.active_threads > 0);
65+
debug_assert!(data.blocked_threads < data.worker_count);
66+
debug_assert!(data.active_threads > 0);
67+
data.active_threads -= 1;
68+
data.blocked_threads += 1;
69+
70+
data.deadlock_check(deadlock_handler);
71+
}
72+
73+
/// Mark a previously blocked Rayon worker thread as unblocked
74+
#[inline]
75+
pub fn mark_unblocked(&self) {
76+
let mut data = self.data.lock().unwrap();
77+
debug_assert!(data.active_threads < data.worker_count);
78+
debug_assert!(data.blocked_threads > 0);
79+
data.active_threads += 1;
80+
data.blocked_threads -= 1;
81+
}
82+
3183
fn anyone_sleeping(&self, state: usize) -> bool {
3284
state & SLEEPING != 0
3385
}
@@ -61,7 +113,12 @@ impl Sleep {
61113
}
62114

63115
#[inline]
64-
pub(super) fn no_work_found(&self, worker_index: usize, yields: usize) -> usize {
116+
pub(super) fn no_work_found(
117+
&self,
118+
worker_index: usize,
119+
yields: usize,
120+
deadlock_handler: &Option<Box<DeadlockHandler>>,
121+
) -> usize {
65122
log!(DidNotFindWork {
66123
worker: worker_index,
67124
yields: yields,
@@ -88,7 +145,7 @@ impl Sleep {
88145
}
89146
} else {
90147
debug_assert_eq!(yields, ROUNDS_UNTIL_ASLEEP);
91-
self.sleep(worker_index);
148+
self.sleep(worker_index, deadlock_handler);
92149
0
93150
}
94151
}
@@ -122,7 +179,10 @@ impl Sleep {
122179
old_state: old_state,
123180
});
124181
if self.anyone_sleeping(old_state) {
125-
let _data = self.data.lock().unwrap();
182+
let mut data = self.data.lock().unwrap();
183+
// Set the active threads to the number of workers,
184+
// excluding threads blocked by the user since we won't wake those up
185+
data.active_threads = data.worker_count - data.blocked_threads;
126186
self.tickle.notify_all();
127187
}
128188
}
@@ -188,7 +248,7 @@ impl Sleep {
188248
self.worker_is_sleepy(state, worker_index)
189249
}
190250

191-
fn sleep(&self, worker_index: usize) {
251+
fn sleep(&self, worker_index: usize, deadlock_handler: &Option<Box<DeadlockHandler>>) {
192252
loop {
193253
// Acquire here suffices. If we observe that the current worker is still
194254
// sleepy, then in fact we know that no writes have occurred, and anyhow
@@ -235,7 +295,7 @@ impl Sleep {
235295
// reason for the `compare_exchange` to fail is if an
236296
// awaken comes, in which case the next cycle around
237297
// the loop will just return.
238-
let data = self.data.lock().unwrap();
298+
let mut data = self.data.lock().unwrap();
239299

240300
// This must be SeqCst on success because we want to
241301
// ensure:
@@ -264,6 +324,11 @@ impl Sleep {
264324
log!(FellAsleep {
265325
worker: worker_index
266326
});
327+
328+
// Decrement the number of active threads and check for a deadlock
329+
data.active_threads -= 1;
330+
data.deadlock_check(deadlock_handler);
331+
267332
let _ = self.tickle.wait(data).unwrap();
268333
log!(GotAwoken {
269334
worker: worker_index

0 commit comments

Comments
 (0)