Skip to content

Commit daf41bf

Browse files
committed
refactor
1 parent a0b4385 commit daf41bf

File tree

3 files changed

+339
-345
lines changed

3 files changed

+339
-345
lines changed

gitoxide-core/src/corpus/db.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
use crate::corpus::engine::Id;
2+
use crate::corpus::Engine;
3+
use anyhow::{bail, Context};
4+
use bytesize::ByteSize;
5+
use rusqlite::{params, OptionalExtension};
6+
use std::path::{Path, PathBuf};
7+
use sysinfo::{CpuExt, CpuRefreshKind, RefreshKind, SystemExt};
8+
9+
/// a husk of a repository
10+
pub(crate) struct Repo {
11+
pub(crate) id: Id,
12+
/// The full path to the repository on disk, not yet validated to exist.
13+
pub(crate) path: PathBuf,
14+
/// The size of the object database, counted quickly by packs only.
15+
pub(crate) odb_size: ByteSize,
16+
/// The amount of objects stored in the object database.
17+
pub(crate) num_objects: u64,
18+
/// The total amount of references, no matter which type.
19+
pub(crate) num_references: usize,
20+
}
21+
22+
impl Repo {
23+
pub(crate) fn try_from(repo: &gix::Repository) -> anyhow::Result<Self> {
24+
let num_references = repo.refs.iter()?.all()?.count();
25+
let num_objects = repo.objects.packed_object_count()?;
26+
let odb_size = ByteSize(
27+
std::fs::read_dir(repo.objects.store_ref().path().join("pack"))
28+
.map(|dir| {
29+
dir.filter_map(Result::ok)
30+
.filter_map(|e| e.metadata().ok())
31+
.filter_map(|m| m.is_file().then_some(m.len()))
32+
.sum()
33+
})
34+
.unwrap_or_default(),
35+
);
36+
37+
Ok(Repo {
38+
id: 0,
39+
path: repo.path().to_owned(),
40+
odb_size,
41+
num_objects,
42+
num_references,
43+
})
44+
}
45+
}
46+
47+
/// A version to be incremented whenever the database layout is changed, to refresh it automatically.
48+
const VERSION: usize = 1;
49+
50+
pub fn create(path: impl AsRef<std::path::Path>) -> anyhow::Result<rusqlite::Connection> {
51+
let path = path.as_ref();
52+
let con = rusqlite::Connection::open(path)?;
53+
let meta_table = r#"
54+
CREATE TABLE if not exists meta(
55+
version int
56+
)"#;
57+
con.execute_batch(meta_table)?;
58+
let version: Option<usize> = con.query_row("SELECT version FROM meta", [], |r| r.get(0)).optional()?;
59+
match version {
60+
None => {
61+
con.execute("INSERT into meta(version) values(?)", params![VERSION])?;
62+
}
63+
Some(version) if version != VERSION => match con.close() {
64+
Ok(()) => {
65+
bail!("Cannot handle database with version {version}, cannot yet migrate to {VERSION} - maybe migrate by hand?");
66+
}
67+
Err((_, err)) => return Err(err.into()),
68+
},
69+
_ => {}
70+
}
71+
con.execute_batch("PRAGMA synchronous = OFF;")?;
72+
con.execute_batch(
73+
r#"
74+
CREATE TABLE if not exists runner(
75+
id integer PRIMARY KEY,
76+
vendor text,
77+
brand text,
78+
host_name text, -- this is just to help ID the runner
79+
UNIQUE (vendor, brand)
80+
)
81+
"#,
82+
)?;
83+
con.execute_batch(
84+
r#"
85+
CREATE TABLE if not exists corpus(
86+
id integer PRIMARY KEY,
87+
root text UNIQUE -- the root path of all repositories we want to consider, as canonicalized path
88+
)
89+
"#,
90+
)?;
91+
con.execute_batch(
92+
r#"
93+
CREATE TABLE if not exists repository(
94+
id integer PRIMARY KEY,
95+
rela_path text, -- the path to the repository on disk, relative to the corpus root path, without leading `./` or `.\`
96+
corpus integer,
97+
odb_size integer, -- the object database size in bytes
98+
num_references integer, -- the total amount of references
99+
num_objects integer, -- the total amount of objects
100+
FOREIGN KEY (corpus) REFERENCES corpus (id)
101+
UNIQUE (rela_path, corpus)
102+
)
103+
"#,
104+
)?;
105+
con.execute_batch(
106+
r#"
107+
CREATE TABLE if not exists gitoxide_version(
108+
id integer PRIMARY KEY,
109+
version text UNIQUE -- the unique git version via gix describe
110+
)
111+
"#,
112+
)?;
113+
con.execute_batch(
114+
r#"
115+
CREATE TABLE if not exists run(
116+
repository integer,
117+
runner integer,
118+
gitoxide_version integer,
119+
start_time integer,
120+
end_time integer, -- or NULL if not yet finished (either successfull or with failure)
121+
error text, -- or NULL if there was on error
122+
FOREIGN KEY (repository) REFERENCES repository (id),
123+
FOREIGN KEY (runner) REFERENCES runner (id),
124+
FOREIGN KEY (gitoxide_version) REFERENCES gitoxide_version (id)
125+
)
126+
"#,
127+
)?;
128+
129+
Ok(con)
130+
}
131+
132+
/// Utilities
133+
impl<P> Engine<P> {
134+
pub(crate) fn runner_id_or_insert(&self) -> anyhow::Result<Id> {
135+
let sys =
136+
sysinfo::System::new_with_specifics(RefreshKind::new().with_cpu(CpuRefreshKind::new().with_frequency()));
137+
let cpu = &sys.cpus()[0];
138+
let vendor = Some(cpu.vendor_id().to_owned());
139+
let host = sys.host_name();
140+
let brand = Some(cpu.brand().to_owned());
141+
Ok(self.con.query_row(
142+
"INSERT INTO runner (vendor, brand, host_name) VALUES (?1, ?2, ?3) \
143+
ON CONFLICT DO UPDATE SET vendor = vendor, brand = brand, host_name = ?3 RETURNING id",
144+
[vendor.as_deref(), brand.as_deref(), host.as_deref()],
145+
|r| r.get(0),
146+
)?)
147+
}
148+
pub(crate) fn corpus_id_or_insert(&self, path: &Path) -> anyhow::Result<Id> {
149+
let path = path.to_str().context("corpus root cannot contain illformed UTF-8")?;
150+
Ok(self.con.query_row(
151+
"INSERT INTO corpus (root) VALUES (?1) \
152+
ON CONFLICT DO UPDATE SET root = root RETURNING id",
153+
[path],
154+
|r| r.get(0),
155+
)?)
156+
}
157+
pub(crate) fn gitoxide_version_id_or_insert(&self) -> anyhow::Result<Id> {
158+
Ok(self
159+
.con
160+
.query_row(
161+
"INSERT INTO gitoxide_version (version) VALUES (?1) ON CONFLICT DO UPDATE SET version = version RETURNING id",
162+
[&self.gitoxide_version],
163+
|r| r.get(0),
164+
)?)
165+
}
166+
}

gitoxide-core/src/corpus/engine.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
use super::db;
2+
use crate::corpus::Engine;
3+
use crate::organize::find_git_repository_workdirs;
4+
use anyhow::Context;
5+
use bytesize::ByteSize;
6+
use rusqlite::params;
7+
use std::path::{Path, PathBuf};
8+
use std::time::Instant;
9+
10+
pub(crate) type Id = u32;
11+
12+
impl<P> Engine<P>
13+
where
14+
P: gix::Progress,
15+
{
16+
/// Open the corpus DB or create it.
17+
pub fn open_or_create(db: PathBuf, gitoxide_version: String, progress: P) -> anyhow::Result<Engine<P>> {
18+
let con = crate::corpus::db::create(db).context("Could not open or create database")?;
19+
Ok(Engine {
20+
progress,
21+
con,
22+
gitoxide_version,
23+
})
24+
}
25+
26+
/// Run on the existing set of repositories we have already seen or obtain them from `path` if there is none yet.
27+
pub fn run(&mut self, corpus_path: PathBuf) -> anyhow::Result<()> {
28+
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
29+
let gitoxide_id = self.gitoxide_version_id_or_insert()?;
30+
let runner_id = self.runner_id_or_insert()?;
31+
let repos = self.find_repos_or_insert(&corpus_path, corpus_id)?;
32+
self.perform_run(gitoxide_id, runner_id, repos)
33+
}
34+
35+
pub fn refresh(&mut self, corpus_path: PathBuf) -> anyhow::Result<()> {
36+
let (corpus_path, corpus_id) = self.prepare_corpus_path(corpus_path)?;
37+
let repos = self.refresh_repos(&corpus_path, corpus_id)?;
38+
self.progress.set_name("refresh repos");
39+
self.progress.info(format!(
40+
"Added or updated {} repositories under {corpus_path:?}",
41+
repos.len()
42+
));
43+
Ok(())
44+
}
45+
}
46+
47+
impl<P> Engine<P>
48+
where
49+
P: gix::Progress,
50+
{
51+
fn perform_run(&self, _gitoxide_id: Id, _runner_id: Id, _repos: Vec<db::Repo>) -> anyhow::Result<()> {
52+
todo!()
53+
}
54+
55+
fn prepare_corpus_path(&self, corpus_path: PathBuf) -> anyhow::Result<(PathBuf, Id)> {
56+
let corpus_path = gix::path::realpath(corpus_path)?;
57+
let corpus_id = self.corpus_id_or_insert(&corpus_path)?;
58+
Ok((corpus_path, corpus_id))
59+
}
60+
61+
fn find_repos(&mut self, corpus_id: Id) -> anyhow::Result<Vec<db::Repo>> {
62+
self.progress.set_name("query db-repos");
63+
self.progress.init(None, gix::progress::count("repos"));
64+
65+
Ok(self
66+
.con
67+
.prepare("SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1")?
68+
.query_map([corpus_id], |r| {
69+
Ok(db::Repo {
70+
id: r.get(0)?,
71+
path: r.get::<_, String>(1)?.into(),
72+
odb_size: ByteSize(r.get(2)?),
73+
num_objects: r.get(3)?,
74+
num_references: r.get(4)?,
75+
})
76+
})?
77+
.inspect(|_| self.progress.inc())
78+
.collect::<Result<_, _>>()?)
79+
}
80+
81+
fn refresh_repos(&mut self, corpus_path: &Path, corpus_id: Id) -> anyhow::Result<Vec<db::Repo>> {
82+
let start = Instant::now();
83+
self.progress.set_name("refresh");
84+
self.progress.init(None, gix::progress::count("repos"));
85+
86+
let repos = std::thread::scope({
87+
let progress = &mut self.progress;
88+
let con = &mut self.con;
89+
|scope| -> anyhow::Result<_> {
90+
let threads = std::thread::available_parallelism()
91+
.map(std::num::NonZeroUsize::get)
92+
.ok()
93+
.unwrap_or(1);
94+
let (path_tx, repo_rx) = {
95+
let (path_tx, path_rx) = crossbeam_channel::bounded(threads * 2);
96+
let (repo_tx, repo_rx) = std::sync::mpsc::channel::<(PathBuf, anyhow::Result<db::Repo>)>();
97+
(0..threads).for_each(|_| {
98+
scope.spawn({
99+
let path_rx = path_rx.clone();
100+
let repo_tx = repo_tx.clone();
101+
move || -> anyhow::Result<_> {
102+
for repo_path in path_rx {
103+
let res = (|| {
104+
let repo = gix::open_opts(&repo_path, gix::open::Options::isolated())?;
105+
db::Repo::try_from(&repo)
106+
})();
107+
repo_tx.send((repo_path, res))?;
108+
}
109+
Ok(())
110+
}
111+
});
112+
});
113+
(path_tx, repo_rx)
114+
};
115+
116+
let find_progress = progress.add_child("find");
117+
let write_db = scope.spawn(move || -> anyhow::Result<Vec<db::Repo>> {
118+
progress.set_name("write to DB");
119+
progress.init(None, gix::progress::count("repos"));
120+
let start = Instant::now();
121+
122+
let mut out = Vec::new();
123+
let transaction = con.transaction()?;
124+
let mut statement = transaction.prepare("INSERT INTO repository (rela_path, corpus, odb_size, num_objects, num_references) VALUES (?1, ?2, ?3, ?4, ?5)\
125+
ON CONFLICT DO UPDATE SET rela_path = rela_path, corpus = corpus, odb_size = ?3, num_objects = ?4, num_references = ?5\
126+
RETURNING id")?;
127+
for (repo_path, repo_res) in repo_rx {
128+
match repo_res {
129+
Ok(mut repo) => {
130+
let rela_path = repo.path.strip_prefix(corpus_path)?;
131+
repo.id = statement.query_row(params![rela_path.to_str().context("only valid UTF8 is allowed for repository paths")?, corpus_id, repo.odb_size.as_u64(), repo.num_objects, repo.num_references], |r| r.get(0))?;
132+
out.push(repo);
133+
progress.inc();
134+
}
135+
Err(err) => progress.fail(format!("{repo_path:?}: {err:#?}")),
136+
}
137+
}
138+
statement.finalize()?;
139+
transaction.commit()?;
140+
progress.show_throughput(start);
141+
Ok(out)
142+
});
143+
144+
let repos = gix::interrupt::Iter::new(
145+
find_git_repository_workdirs(corpus_path, find_progress, false, Some(threads)),
146+
|| anyhow::anyhow!("interrupted by user"),
147+
);
148+
for res in repos {
149+
let (repo_path, _kind) = res?;
150+
path_tx.send(repo_path)?;
151+
}
152+
drop(path_tx);
153+
write_db.join().expect("no panic")
154+
}
155+
})?;
156+
157+
self.progress.show_throughput(start);
158+
Ok(repos)
159+
}
160+
161+
fn find_repos_or_insert(&mut self, corpus_path: &Path, corpus_id: Id) -> anyhow::Result<Vec<db::Repo>> {
162+
let start = Instant::now();
163+
let repos = self.find_repos(corpus_id)?;
164+
if repos.is_empty() {
165+
self.refresh_repos(corpus_path, corpus_id)
166+
} else {
167+
self.progress.show_throughput(start);
168+
Ok(repos)
169+
}
170+
}
171+
}

0 commit comments

Comments
 (0)