Skip to content

Commit b15a1f7

Browse files
committed
parallel status check
1 parent 4af2fc8 commit b15a1f7

File tree

7 files changed

+313
-98
lines changed

7 files changed

+313
-98
lines changed

gix-features/src/parallel/in_parallel.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,76 @@ pub fn build_thread() -> std::thread::Builder {
3434
std::thread::Builder::new()
3535
}
3636

37+
/// Read items from `input` and `consume` them in multiple threads,
38+
/// whose output output is collected by a `reducer`. Its task is to
39+
/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.
40+
///
41+
/// * if `thread_limit` is `Some`, the given amount of threads will be used. If `None`, all logical cores will be used.
42+
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
43+
/// * `consume(Item, &mut State) -> Output` produces an output given an input obtained by `input` along with mutable state initially
44+
/// created by `new_thread_state(…)`.
45+
/// * For `reducer`, see the [`Reduce`] trait
46+
pub fn in_parallel_chunks<'a, I, S, O, R>(
47+
input: &'a mut [I],
48+
chunk_size: usize,
49+
thread_limit: Option<usize>,
50+
new_thread_state: impl Fn(usize) -> S + Send + Clone,
51+
consume: impl Fn(&'a mut I, &mut S) -> Option<O> + Send + Clone,
52+
mut reducer: R,
53+
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
54+
where
55+
R: Reduce<Input = O>,
56+
I: Send,
57+
O: Send,
58+
{
59+
let num_threads = num_threads(thread_limit);
60+
std::thread::scope(move |s| {
61+
let receive_result = {
62+
let (send_input, receive_input) = crossbeam_channel::bounded::<&mut [I]>(num_threads);
63+
let (send_result, receive_result) = crossbeam_channel::bounded::<O>(num_threads);
64+
for thread_id in 0..num_threads {
65+
std::thread::Builder::new()
66+
.name(format!("gitoxide.in_parallel.produce.{thread_id}"))
67+
.spawn_scoped(s, {
68+
let send_result = send_result.clone();
69+
let receive_input = receive_input.clone();
70+
let new_thread_state = new_thread_state.clone();
71+
let consume = consume.clone();
72+
move || {
73+
let mut state = new_thread_state(thread_id);
74+
for chunk in receive_input {
75+
for item in chunk {
76+
if let Some(output) = consume(item, &mut state) {
77+
if send_result.send(output).is_err() {
78+
break;
79+
}
80+
}
81+
}
82+
}
83+
}
84+
})
85+
.expect("valid name");
86+
}
87+
std::thread::Builder::new()
88+
.name("gitoxide.in_parallel.feed".into())
89+
.spawn_scoped(s, move || {
90+
for chunk in input.chunks_mut(chunk_size) {
91+
if send_input.send(chunk).is_err() {
92+
break;
93+
}
94+
}
95+
})
96+
.expect("valid name");
97+
receive_result
98+
};
99+
100+
for item in receive_result {
101+
drop(reducer.feed(item)?);
102+
}
103+
reducer.finalize()
104+
})
105+
}
106+
37107
/// Read items from `input` and `consume` them in multiple threads,
38108
/// whose output is collected by a `reducer`. Its task is to
39109
/// aggregate these outputs into the final result returned by this function with the benefit of not having to be thread-safe.

gix-features/src/parallel/mod.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@
3535
#[cfg(feature = "parallel")]
3636
mod in_parallel;
3737
#[cfg(feature = "parallel")]
38-
pub use in_parallel::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
38+
pub use in_parallel::{build_thread, in_parallel, in_parallel_chunks, in_parallel_with_slice, join, threads};
3939

4040
mod serial;
4141
#[cfg(not(feature = "parallel"))]
42-
pub use serial::{build_thread, in_parallel, in_parallel_with_slice, join, threads};
42+
pub use serial::{build_thread, in_parallel, in_parallel_chunks, in_parallel_with_slice, join, threads};
4343

4444
mod in_order;
4545
pub use in_order::{InOrderIter, SequenceId};
@@ -128,6 +128,53 @@ pub fn num_threads(thread_limit: Option<usize>) -> usize {
128128
.unwrap_or(logical_cores)
129129
}
130130

131+
/// Run [`in_parallel_chunks()`] only if more than one thread is available and the given `condition()`
/// returns true when eagerly evaluated. Otherwise all items are processed on the current thread.
///
/// For parameters, see the documentation of [`in_parallel_chunks()`]
#[cfg(feature = "parallel")]
pub fn in_parallel_chunks_if<'a, I, S, O, R>(
    condition: impl FnOnce() -> bool,
    input: &'a mut [I],
    chunk_size: usize,
    thread_limit: Option<usize>,
    new_thread_state: impl Fn(usize) -> S + Send + Clone,
    consume: impl Fn(&'a mut I, &mut S) -> Option<O> + Send + Clone,
    reducer: R,
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
where
    R: Reduce<Input = O>,
    I: Send,
    O: Send,
{
    // `condition()` is only asked if there is more than one thread to begin with.
    let run_serially = num_threads(thread_limit) <= 1 || !condition();
    if run_serially {
        return serial::in_parallel_chunks(input, chunk_size, thread_limit, new_thread_state, consume, reducer);
    }
    in_parallel_chunks(input, chunk_size, thread_limit, new_thread_state, consume, reducer)
}
155+
156+
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
157+
///
158+
/// For parameters, see the documentation of [`in_parallel()`]
159+
///
160+
/// Note that the non-parallel version is equivalent to [`in_parallel()`].
161+
#[cfg(not(feature = "parallel"))]
162+
pub fn in_parallel_chunks_if<'a, I, S, O, R>(
163+
_condition: impl FnOnce() -> bool,
164+
input: &'a mut [I],
165+
chunk_size: usize,
166+
thread_limit: Option<usize>,
167+
new_thread_state: impl Fn(usize) -> S + Send + Clone,
168+
consume: impl Fn(&'a mut I, &mut S) -> Option<O> + Send + Clone,
169+
reducer: R,
170+
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
171+
where
172+
R: Reduce<Input = O>,
173+
I: Send,
174+
O: Send,
175+
{
176+
serial::in_parallel_chunks(input, chunk_size, thread_limit, new_thread_state, consume, reducer)
177+
}
131178
/// Run [`in_parallel()`] only if the given `condition()` returns true when eagerly evaluated.
132179
///
133180
/// For parameters, see the documentation of [`in_parallel()`]

gix-features/src/parallel/serial.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,35 @@ where
133133
}
134134
reducer.finalize()
135135
}
136+
137+
/// Read items from `input` and `consume` them in a single thread, producing an output to be collected by a `reducer`,
138+
/// whose task is to aggregate these outputs into the final result returned by this function.
139+
///
140+
/// * `new_thread_state(thread_number) -> State` produces thread-local state once per thread to be based to `consume`
141+
/// * `consume(Item, &mut State) -> Output` produces an output given an input along with mutable state.
142+
/// * For `reducer`, see the [`Reduce`] trait
143+
/// * if `thread_limit` has no effect as everything is run on the main thread, but is present to keep the signature
144+
/// similar to the parallel version.
145+
///
146+
/// **This serial version performing all calculations on the current thread.**
147+
pub fn in_parallel_chunks<'a, I, S, O, R>(
148+
input: &'a mut [I],
149+
_chunk_size: usize,
150+
_thread_limit: Option<usize>,
151+
new_thread_state: impl Fn(usize) -> S + Send + Clone,
152+
consume: impl Fn(&'a mut I, &mut S) -> Option<O> + Send + Clone,
153+
mut reducer: R,
154+
) -> Result<<R as Reduce>::Output, <R as Reduce>::Error>
155+
where
156+
R: Reduce<Input = O>,
157+
I: Send,
158+
O: Send,
159+
{
160+
let mut state = new_thread_state(0);
161+
for item in input {
162+
if let Some(res) = consume(item, &mut state) {
163+
drop(reducer.feed(res)?);
164+
}
165+
}
166+
reducer.finalize()
167+
}

gix-index/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,10 @@ pub struct State {
9393
/// same timestamp as this as potentially changed, checking more thoroughly if a change actually happened.
9494
timestamp: FileTime,
9595
version: Version,
96-
entries: Vec<Entry>,
96+
///
97+
pub entries: Vec<Entry>,
9798
/// A memory area keeping all index paths, in full length, independently of the index version.
98-
path_backing: PathStorage,
99+
pub path_backing: PathStorage,
99100
/// True if one entry in the index has a special marker mode
100101
is_sparse: bool,
101102

gix-worktree/src/index/status.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub enum Status<T = ()> {
4646
}
4747

4848
///
49-
pub trait Collector<'index> {
49+
pub trait Collector<'index>: Send {
5050
/// Data generated by comparing two files/entries
5151
type Diff;
5252
///

gix-worktree/src/index/status/recorder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub struct Recorder<'index, T = ()> {
1111
pub records: Vec<(&'index BStr, Status<T>, bool)>,
1212
}
1313

14-
impl<'index, T> Collector<'index> for Recorder<'index, T> {
14+
impl<'index, T: Send> Collector<'index> for Recorder<'index, T> {
1515
type Diff = T;
1616

1717
fn visit_entry(

0 commit comments

Comments
 (0)