Skip to content

Commit 7c9186a

Browse files
committed
fix: more robustness in the face of a trampling-herd of threads loading a single index.
The motivating example is here: praetorian-inc/noseyparker#179
1 parent 7b3dc92 commit 7c9186a

File tree

5 files changed

+91
-20
lines changed

5 files changed

+91
-20
lines changed

gitoxide-core/src/repository/odb.rs

Lines changed: 67 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
use std::io;
2+
use std::sync::atomic::Ordering;
23

34
use anyhow::bail;
45

@@ -50,6 +51,8 @@ pub mod statistics {
5051
pub struct Options {
5152
pub format: OutputFormat,
5253
pub thread_limit: Option<usize>,
54+
/// A debug-flag that triggers looking up the headers of all objects again, but without indices preloaded
55+
pub extra_header_lookup: bool,
5356
}
5457
}
5558

@@ -59,7 +62,11 @@ pub fn statistics(
5962
mut progress: impl gix::Progress,
6063
out: impl io::Write,
6164
mut err: impl io::Write,
62-
statistics::Options { format, thread_limit }: statistics::Options,
65+
statistics::Options {
66+
format,
67+
thread_limit,
68+
extra_header_lookup,
69+
}: statistics::Options,
6370
) -> anyhow::Result<()> {
6471
use bytesize::ByteSize;
6572
use gix::odb::{find, HeaderExt};
@@ -76,6 +83,10 @@ pub fn statistics(
7683
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
7784
#[derive(Default)]
7885
struct Statistics {
86+
/// All objects that were used to produce these statistics.
87+
/// Only `Some` if we are doing an extra round of header queries on a repository without loaded indices.
88+
#[cfg_attr(feature = "serde", serde(skip_serializing))]
89+
ids: Option<Vec<gix::ObjectId>>,
7990
total_objects: usize,
8091
loose_objects: usize,
8192
packed_objects: usize,
@@ -135,14 +146,17 @@ pub fn statistics(
135146
}
136147

137148
impl gix::parallel::Reduce for Reduce {
138-
type Input = Result<Vec<gix::odb::find::Header>, anyhow::Error>;
149+
type Input = Result<Vec<(gix::ObjectId, gix::odb::find::Header)>, anyhow::Error>;
139150
type FeedProduce = ();
140151
type Output = Statistics;
141152
type Error = anyhow::Error;
142153

143154
fn feed(&mut self, items: Self::Input) -> Result<Self::FeedProduce, Self::Error> {
144-
for item in items? {
155+
for (id, item) in items? {
145156
self.stats.consume(item);
157+
if let Some(ids) = self.stats.ids.as_mut() {
158+
ids.push(id);
159+
}
146160
}
147161
Ok(())
148162
}
@@ -154,9 +168,9 @@ pub fn statistics(
154168
}
155169

156170
let cancelled = || anyhow::anyhow!("Cancelled by user");
157-
let object_ids = repo.objects.store_ref().iter()?.filter_map(Result::ok);
171+
let object_ids = repo.objects.iter()?.filter_map(Result::ok);
158172
let chunk_size = 1_000;
159-
let stats = if gix::parallel::num_threads(thread_limit) > 1 {
173+
let mut stats = if gix::parallel::num_threads(thread_limit) > 1 {
160174
gix::parallel::in_parallel(
161175
gix::interrupt::Iter::new(
162176
gix::features::iter::Chunks {
@@ -166,19 +180,30 @@ pub fn statistics(
166180
cancelled,
167181
),
168182
thread_limit,
169-
move |_| (repo.objects.clone().into_inner(), counter),
183+
{
184+
let objects = repo.objects.clone();
185+
move |_| (objects.clone().into_inner(), counter)
186+
},
170187
|ids, (handle, counter)| {
171188
let ids = ids?;
172-
counter.fetch_add(ids.len(), std::sync::atomic::Ordering::Relaxed);
189+
counter.fetch_add(ids.len(), Ordering::Relaxed);
173190
let out = ids
174191
.into_iter()
175-
.map(|id| handle.header(id))
192+
.map(|id| handle.header(id).map(|hdr| (id, hdr)))
176193
.collect::<Result<Vec<_>, _>>()?;
177194
Ok(out)
178195
},
179-
Reduce::default(),
196+
Reduce {
197+
stats: Statistics {
198+
ids: extra_header_lookup.then(Vec::new),
199+
..Default::default()
200+
},
201+
},
180202
)?
181203
} else {
204+
if extra_header_lookup {
205+
bail!("extra-header-lookup is only meaningful in threaded mode");
206+
}
182207
let mut stats = Statistics::default();
183208

184209
for (count, id) in object_ids.enumerate() {
@@ -193,6 +218,39 @@ pub fn statistics(
193218

194219
progress.show_throughput(start);
195220

221+
if let Some(mut ids) = stats.ids.take() {
222+
// Critical to re-open the repo to assure we don't have any ODB state and start fresh.
223+
let start = std::time::Instant::now();
224+
let repo = gix::open_opts(repo.git_dir(), repo.open_options().to_owned())?;
225+
progress.set_name("re-counting".into());
226+
progress.init(Some(ids.len()), gix::progress::count("objects"));
227+
let counter = progress.counter();
228+
counter.store(0, Ordering::Relaxed);
229+
let errors = gix::parallel::in_parallel_with_slice(
230+
&mut ids,
231+
thread_limit,
232+
{
233+
let objects = repo.objects.clone();
234+
move |_| (objects.clone().into_inner(), counter, false)
235+
},
236+
|id, (odb, counter, has_error), _threads_left, _stop_everything| -> anyhow::Result<()> {
237+
counter.fetch_add(1, Ordering::Relaxed);
238+
if let Err(_err) = odb.header(id) {
239+
*has_error = true;
240+
gix::trace::error!(err = ?_err, "Object that is known to be present wasn't found");
241+
}
242+
Ok(())
243+
},
244+
|| Some(std::time::Duration::from_millis(100)),
245+
|(_, _, has_error)| has_error,
246+
)?;
247+
248+
progress.show_throughput(start);
249+
if errors.contains(&true) {
250+
bail!("At least one object couldn't be looked up even though it must exist");
251+
}
252+
}
253+
196254
#[cfg(feature = "serde")]
197255
{
198256
serde_json::to_writer_pretty(out, &stats)?;

gix-odb/src/store_impls/dynamic/load_index.rs

Lines changed: 12 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ impl super::Store {
8686
Ok(Some(self.collect_snapshot()))
8787
} else {
8888
// always compare to the latest state
89-
// Nothing changed in the mean time, try to load another index…
89+
// Nothing changed in the meantime, try to load another index…
9090
if self.load_next_index(index) {
9191
Ok(Some(self.collect_snapshot()))
9292
} else {
@@ -119,7 +119,7 @@ impl super::Store {
119119
let slot = &self.files[index.slot_indices[slot_map_index]];
120120
let _lock = slot.write.lock();
121121
if slot.generation.load(Ordering::SeqCst) > index.generation {
122-
// There is a disk consolidation in progress which just overwrote a slot that cold be disposed with some other
122+
// There is a disk consolidation in progress which just overwrote a slot that could be disposed with some other
123123
// index, one we didn't intend to load.
124124
// Continue with the next slot index in the hope there is something else we can do…
125125
continue 'retry_with_next_slot_index;
@@ -134,7 +134,8 @@ impl super::Store {
134134
slot.files.store(bundle);
135135
break 'retry_with_next_slot_index;
136136
}
137-
Err(_) => {
137+
Err(_err) => {
138+
gix_features::trace::error!(err=?_err, "Failed to load index file - some objects may seem to not exist");
138139
slot.files.store(bundle);
139140
continue 'retry_with_next_slot_index;
140141
}
@@ -197,13 +198,12 @@ impl super::Store {
197198

198199
// We might not be able to detect by pointer if the state changed, as this itself is racy. So we keep track of double-initialization
199200
// using a flag, which means that if `needs_init` was true we saw the index uninitialized once, but now that we are here it's
200-
// initialized meaning that somebody was faster and we couldn't detect it by comparisons to the index.
201+
// initialized meaning that somebody was faster, and we couldn't detect it by comparisons to the index.
201202
// If so, make sure we collect the snapshot instead of returning None in case nothing actually changed, which is likely with a
202203
// race like this.
203204
if !was_uninitialized && needs_init {
204205
return Ok(Some(self.collect_snapshot()));
205206
}
206-
self.num_disk_state_consolidation.fetch_add(1, Ordering::Relaxed);
207207

208208
let db_paths: Vec<_> = std::iter::once(objects_directory.to_owned())
209209
.chain(crate::alternate::resolve(objects_directory.clone(), &self.current_dir)?)
@@ -408,8 +408,13 @@ impl super::Store {
408408

409409
let new_index = self.index.load();
410410
Ok(if index.state_id() == new_index.state_id() {
411-
// there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops
412-
None
411+
// there was no change, and nothing was loaded in the meantime, reflect that in the return value to not get into loops.
412+
// However, if we came here in desperation for something, we should collect what's there for a chance that
413+
// it will be what was needed.
414+
// The original problem here is that a trampling-herd of threads want to load just a single index. Most of these threads
415+
// notice that something changed in the meantime and can avoid checking disk once again. However, one might not get it
416+
// as their index is just too new. Maybe there is a way to detect this kind of race or to prevent it?
417+
load_new_index.then(|| self.collect_snapshot())
413418
} else {
414419
if load_new_index {
415420
self.load_next_index(new_index);

gix-odb/src/store_impls/dynamic/types.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@ pub(crate) type AtomicGeneration = AtomicU32;
1818

1919
/// A way to indicate which pack indices we have seen already and which of them are loaded, along with an idea
2020
/// of whether stored `PackId`s are still usable.
21-
#[derive(Default, Copy, Clone)]
21+
#[derive(Default, Copy, Clone, Debug)]
2222
pub struct SlotIndexMarker {
2323
/// The generation the `loaded_until_index` belongs to. Indices of different generations are completely incompatible.
2424
/// This value changes once the internal representation is compacted, something that may happen only if there is no handle

src/plumbing/main.rs

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1153,7 +1153,7 @@ pub fn main() -> Result<()> {
11531153
),
11541154
},
11551155
Subcommands::Odb(cmd) => match cmd {
1156-
odb::Subcommands::Stats => prepare_and_run(
1156+
odb::Subcommands::Stats { extra_header_lookup } => prepare_and_run(
11571157
"odb-stats",
11581158
trace,
11591159
auto_verbose,
@@ -1166,7 +1166,11 @@ pub fn main() -> Result<()> {
11661166
progress,
11671167
out,
11681168
err,
1169-
core::repository::odb::statistics::Options { format, thread_limit },
1169+
core::repository::odb::statistics::Options {
1170+
format,
1171+
thread_limit,
1172+
extra_header_lookup,
1173+
},
11701174
)
11711175
},
11721176
),

src/plumbing/options/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -586,7 +586,11 @@ pub mod odb {
586586
Info,
587587
/// Count and obtain information on all, possibly duplicate, objects in the database.
588588
#[clap(visible_alias = "statistics")]
589-
Stats,
589+
Stats {
590+
/// Lookup headers again, but without preloading indices.
591+
#[clap(long)]
592+
extra_header_lookup: bool,
593+
},
590594
}
591595
}
592596

0 commit comments

Comments (0)