Skip to content

Commit fe61471

Browse files
committed
Extract parallel operations in rustc_data_structures::sync into a new parallel submodule
1 parent 51a9df8 commit fe61471

File tree

2 files changed

+193
-171
lines changed

2 files changed

+193
-171
lines changed

compiler/rustc_data_structures/src/sync.rs

+5-171
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,21 @@
4141
//! [^2] `MTLockRef` is a typedef.
4242
4343
pub use crate::marker::*;
44-
use parking_lot::Mutex;
45-
use std::any::Any;
4644
use std::collections::HashMap;
4745
use std::hash::{BuildHasher, Hash};
4846
use std::ops::{Deref, DerefMut};
49-
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
5047

5148
mod lock;
5249
pub use lock::{Lock, LockGuard};
5350

5451
mod worker_local;
5552
pub use worker_local::{Registry, WorkerLocal};
5653

54+
mod parallel;
55+
#[cfg(parallel_compiler)]
56+
pub use parallel::scope;
57+
pub use parallel::{join, par_for_each_in, par_map, parallel_guard};
58+
5759
pub use std::sync::atomic::Ordering;
5860
pub use std::sync::atomic::Ordering::SeqCst;
5961

@@ -105,37 +107,6 @@ mod mode {
105107

106108
pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
107109

108-
/// A guard used to hold panics that occur during a parallel section to later by unwound.
109-
/// This is used for the parallel compiler to prevent fatal errors from non-deterministically
110-
/// hiding errors by ensuring that everything in the section has completed executing before
111-
/// continuing with unwinding. It's also used for the non-parallel code to ensure error message
112-
/// output match the parallel compiler for testing purposes.
113-
pub struct ParallelGuard {
114-
panic: Mutex<Option<Box<dyn Any + std::marker::Send + 'static>>>,
115-
}
116-
117-
impl ParallelGuard {
118-
pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
119-
catch_unwind(AssertUnwindSafe(f))
120-
.map_err(|err| {
121-
*self.panic.lock() = Some(err);
122-
})
123-
.ok()
124-
}
125-
}
126-
127-
/// This gives access to a fresh parallel guard in the closure and will unwind any panics
128-
/// caught in it after the closure returns.
129-
#[inline]
130-
pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
131-
let guard = ParallelGuard { panic: Mutex::new(None) };
132-
let ret = f(&guard);
133-
if let Some(panic) = guard.panic.into_inner() {
134-
resume_unwind(panic);
135-
}
136-
ret
137-
}
138-
139110
cfg_if! {
140111
if #[cfg(not(parallel_compiler))] {
141112
use std::ops::Add;
@@ -227,44 +198,6 @@ cfg_if! {
227198
pub type AtomicU32 = Atomic<u32>;
228199
pub type AtomicU64 = Atomic<u64>;
229200

230-
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
231-
where A: FnOnce() -> RA,
232-
B: FnOnce() -> RB
233-
{
234-
let (a, b) = parallel_guard(|guard| {
235-
let a = guard.run(oper_a);
236-
let b = guard.run(oper_b);
237-
(a, b)
238-
});
239-
(a.unwrap(), b.unwrap())
240-
}
241-
242-
#[macro_export]
243-
macro_rules! parallel {
244-
($($blocks:block),*) => {{
245-
$crate::sync::parallel_guard(|guard| {
246-
$(guard.run(|| $blocks);)*
247-
});
248-
}}
249-
}
250-
251-
pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) {
252-
parallel_guard(|guard| {
253-
t.into_iter().for_each(|i| {
254-
guard.run(|| for_each(i));
255-
});
256-
})
257-
}
258-
259-
pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
260-
t: T,
261-
mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
262-
) -> C {
263-
parallel_guard(|guard| {
264-
t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
265-
})
266-
}
267-
268201
pub use std::rc::Rc as Lrc;
269202
pub use std::rc::Weak as Weak;
270203
pub use std::cell::Ref as ReadGuard;
@@ -370,105 +303,6 @@ cfg_if! {
370303

371304
use std::thread;
372305

373-
#[inline]
374-
pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
375-
where
376-
A: FnOnce() -> RA + DynSend,
377-
B: FnOnce() -> RB + DynSend,
378-
{
379-
if mode::is_dyn_thread_safe() {
380-
let oper_a = FromDyn::from(oper_a);
381-
let oper_b = FromDyn::from(oper_b);
382-
let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()()));
383-
(a.into_inner(), b.into_inner())
384-
} else {
385-
let (a, b) = parallel_guard(|guard| {
386-
let a = guard.run(oper_a);
387-
let b = guard.run(oper_b);
388-
(a, b)
389-
});
390-
(a.unwrap(), b.unwrap())
391-
}
392-
}
393-
394-
// This function only works when `mode::is_dyn_thread_safe()`.
395-
pub fn scope<'scope, OP, R>(op: OP) -> R
396-
where
397-
OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
398-
R: DynSend,
399-
{
400-
let op = FromDyn::from(op);
401-
rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
402-
}
403-
404-
/// Runs a list of blocks in parallel. The first block is executed immediately on
405-
/// the current thread. Use that for the longest running block.
406-
#[macro_export]
407-
macro_rules! parallel {
408-
(impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
409-
parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
410-
};
411-
(impl $fblock:block [$($blocks:expr,)*] []) => {
412-
::rustc_data_structures::sync::scope(|s| {
413-
$(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks);
414-
s.spawn(move |_| block.into_inner()());)*
415-
(|| $fblock)();
416-
});
417-
};
418-
($fblock:block, $($blocks:block),*) => {
419-
if rustc_data_structures::sync::is_dyn_thread_safe() {
420-
// Reverse the order of the later blocks since Rayon executes them in reverse order
421-
// when using a single thread. This ensures the execution order matches that
422-
// of a single threaded rustc.
423-
parallel!(impl $fblock [] [$($blocks),*]);
424-
} else {
425-
$crate::sync::parallel_guard(|guard| {
426-
guard.run(|| $fblock);
427-
$(guard.run(|| $blocks);)*
428-
});
429-
}
430-
};
431-
}
432-
433-
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
434-
435-
pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
436-
t: T,
437-
for_each: impl Fn(I) + DynSync + DynSend
438-
) {
439-
parallel_guard(|guard| {
440-
if mode::is_dyn_thread_safe() {
441-
let for_each = FromDyn::from(for_each);
442-
t.into_par_iter().for_each(|i| {
443-
guard.run(|| for_each(i));
444-
});
445-
} else {
446-
t.into_iter().for_each(|i| {
447-
guard.run(|| for_each(i));
448-
});
449-
}
450-
});
451-
}
452-
453-
pub fn par_map<
454-
I,
455-
T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
456-
R: std::marker::Send,
457-
C: FromIterator<R> + FromParallelIterator<R>
458-
>(
459-
t: T,
460-
map: impl Fn(I) -> R + DynSync + DynSend
461-
) -> C {
462-
parallel_guard(|guard| {
463-
if mode::is_dyn_thread_safe() {
464-
let map = FromDyn::from(map);
465-
t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect()
466-
} else {
467-
t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
468-
}
469-
})
470-
}
471-
472306
/// This makes locks panic if they are already held.
473307
/// It is only useful when you are running in a single thread
474308
const ERROR_CHECKING: bool = false;

0 commit comments

Comments
 (0)