Skip to content

Commit 36a3229

Browse files
committed
a first example of a task that can run in parallel
1 parent daf41bf commit 36a3229

File tree

3 files changed

+154
-20
lines changed

3 files changed

+154
-20
lines changed

gitoxide-core/src/corpus/db.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
use crate::corpus::engine::Id;
2-
use crate::corpus::Engine;
1+
use crate::corpus::{Engine, Run};
32
use anyhow::{bail, Context};
43
use bytesize::ByteSize;
54
use rusqlite::{params, OptionalExtension};
65
use std::path::{Path, PathBuf};
76
use sysinfo::{CpuExt, CpuRefreshKind, RefreshKind, SystemExt};
87

8+
pub(crate) type Id = u32;
9+
910
/// a husk of a repository
1011
pub(crate) struct Repo {
1112
pub(crate) id: Id,
@@ -110,16 +111,27 @@ pub fn create(path: impl AsRef<std::path::Path>) -> anyhow::Result<rusqlite::Con
110111
)
111112
"#,
112113
)?;
114+
con.execute_batch(
115+
r#"
116+
CREATE TABLE if not exists task(
117+
id integer PRIMARY KEY,
118+
name text UNIQUE -- the unique name of the task, which is considered its id and which is immutable once run once
119+
)
120+
"#,
121+
)?;
113122
con.execute_batch(
114123
r#"
115124
CREATE TABLE if not exists run(
125+
id integer PRIMARY KEY,
116126
repository integer,
117127
runner integer,
128+
task integer,
118129
gitoxide_version integer,
119-
start_time integer,
120-
end_time integer, -- or NULL if not yet finished (either successfull or with failure)
121-
error text, -- or NULL if there was on error
130+
insertion_time integer NOT NULL, -- in seconds since UNIX epoch
131+
duration real, -- in seconds or NULL if not yet finished (either successfull or with failure)
132+
error text, -- or NULL if there was no error
122133
FOREIGN KEY (repository) REFERENCES repository (id),
134+
FOREIGN KEY (task) REFERENCES task (id),
123135
FOREIGN KEY (runner) REFERENCES runner (id),
124136
FOREIGN KEY (gitoxide_version) REFERENCES gitoxide_version (id)
125137
)
@@ -163,4 +175,37 @@ impl<P> Engine<P> {
163175
|r| r.get(0),
164176
)?)
165177
}
178+
pub(crate) fn tasks_or_insert(&self) -> anyhow::Result<Vec<(Id, &'static super::Task)>> {
179+
let mut out: Vec<_> = super::run::ALL.iter().map(|task| (0, task)).collect();
180+
for (id, task) in out.iter_mut() {
181+
*id = self.con.query_row(
182+
"INSERT INTO task (name) VALUES (?1) ON CONFLICT DO UPDATE SET name = name RETURNING id",
183+
[task.name],
184+
|r| r.get(0),
185+
)?;
186+
}
187+
Ok(out)
188+
}
189+
pub(crate) fn insert_run(
190+
con: &rusqlite::Connection,
191+
gitoxide_version: Id,
192+
runner: Id,
193+
task: Id,
194+
repository: Id,
195+
) -> anyhow::Result<Run> {
196+
let insertion_time = std::time::UNIX_EPOCH.elapsed()?.as_secs();
197+
let id = con.query_row("INSERT INTO run (gitoxide_version, runner, task, repository, insertion_time) VALUES (?1, ?2, ?3, ?4, ?5) RETURNING id", params![gitoxide_version, runner, task, repository, insertion_time], |r| r.get(0))?;
198+
Ok(Run {
199+
id,
200+
duration: Default::default(),
201+
error: None,
202+
})
203+
}
204+
pub(crate) fn update_run(con: &rusqlite::Connection, run: Run) -> anyhow::Result<()> {
205+
con.execute(
206+
"UPDATE run SET duration = ?2, error = ?3 WHERE id = ?1",
207+
params![run.id, run.duration.as_secs_f64(), run.error.as_deref()],
208+
)?;
209+
Ok(())
210+
}
166211
}

gitoxide-core/src/corpus/engine.rs

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
use super::db;
2-
use crate::corpus::Engine;
2+
use crate::corpus::{Engine, Task};
33
use crate::organize::find_git_repository_workdirs;
4-
use anyhow::Context;
4+
use anyhow::{bail, Context};
55
use bytesize::ByteSize;
6+
use gix::Progress;
67
use rusqlite::params;
78
use std::path::{Path, PathBuf};
89
use std::time::Instant;
910

10-
pub(crate) type Id = u32;
11-
1211
impl<P> Engine<P>
1312
where
1413
P: gix::Progress,
@@ -29,7 +28,8 @@ where
2928
let gitoxide_id = self.gitoxide_version_id_or_insert()?;
3029
let runner_id = self.runner_id_or_insert()?;
3130
let repos = self.find_repos_or_insert(&corpus_path, corpus_id)?;
32-
self.perform_run(gitoxide_id, runner_id, repos)
31+
let tasks = self.tasks_or_insert()?;
32+
self.perform_run(gitoxide_id, runner_id, &tasks, &repos)
3333
}
3434

3535
pub fn refresh(&mut self, corpus_path: PathBuf) -> anyhow::Result<()> {
@@ -48,17 +48,51 @@ impl<P> Engine<P>
4848
where
4949
P: gix::Progress,
5050
{
51-
fn perform_run(&self, _gitoxide_id: Id, _runner_id: Id, _repos: Vec<db::Repo>) -> anyhow::Result<()> {
52-
todo!()
51+
fn perform_run(
52+
&mut self,
53+
gitoxide_id: db::Id,
54+
runner_id: db::Id,
55+
tasks: &[(db::Id, &'static Task)],
56+
repos: &[db::Repo],
57+
) -> anyhow::Result<()> {
58+
let start = Instant::now();
59+
let task_progress = &mut self.progress;
60+
task_progress.set_name("run");
61+
task_progress.init(Some(tasks.len()), gix::progress::count("tasks"));
62+
for (task_id, task) in tasks {
63+
let task_start = Instant::now();
64+
let mut run_progress = task_progress.add_child(format!("run '{}'", task.name));
65+
run_progress.init(Some(repos.len()), gix::progress::count("repos"));
66+
67+
if task.execute_exclusive {
68+
for repo in repos {
69+
if gix::interrupt::is_triggered() {
70+
bail!("interrupted by user");
71+
}
72+
let mut run = Self::insert_run(&self.con, gitoxide_id, runner_id, *task_id, repo.id)?;
73+
task.perform(&mut run, &repo.path);
74+
Self::update_run(&self.con, run)?;
75+
run_progress.inc();
76+
}
77+
} else {
78+
// gix::parallel::in_parallel_with_slice()
79+
todo!("shared")
80+
}
81+
82+
run_progress.show_throughput(task_start);
83+
task_progress.inc();
84+
}
85+
task_progress.show_throughput(start);
86+
Ok(())
5387
}
5488

55-
fn prepare_corpus_path(&self, corpus_path: PathBuf) -> anyhow::Result<(PathBuf, Id)> {
89+
fn prepare_corpus_path(&self, corpus_path: PathBuf) -> anyhow::Result<(PathBuf, db::Id)> {
5690
let corpus_path = gix::path::realpath(corpus_path)?;
5791
let corpus_id = self.corpus_id_or_insert(&corpus_path)?;
5892
Ok((corpus_path, corpus_id))
5993
}
6094

61-
fn find_repos(&mut self, corpus_id: Id) -> anyhow::Result<Vec<db::Repo>> {
95+
fn find_repos(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
6296
self.progress.set_name("query db-repos");
6397
self.progress.init(None, gix::progress::count("repos"));
6498

@@ -68,7 +102,7 @@ where
68102
.query_map([corpus_id], |r| {
69103
Ok(db::Repo {
70104
id: r.get(0)?,
71-
path: r.get::<_, String>(1)?.into(),
105+
path: corpus_path.join(r.get::<_, String>(1)?),
72106
odb_size: ByteSize(r.get(2)?),
73107
num_objects: r.get(3)?,
74108
num_references: r.get(4)?,
@@ -78,7 +112,7 @@ where
78112
.collect::<Result<_, _>>()?)
79113
}
80114

81-
fn refresh_repos(&mut self, corpus_path: &Path, corpus_id: Id) -> anyhow::Result<Vec<db::Repo>> {
115+
fn refresh_repos(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
82116
let start = Instant::now();
83117
self.progress.set_name("refresh");
84118
self.progress.init(None, gix::progress::count("repos"));
@@ -117,7 +151,6 @@ where
117151
let write_db = scope.spawn(move || -> anyhow::Result<Vec<db::Repo>> {
118152
progress.set_name("write to DB");
119153
progress.init(None, gix::progress::count("repos"));
120-
let start = Instant::now();
121154

122155
let mut out = Vec::new();
123156
let transaction = con.transaction()?;
@@ -154,13 +187,12 @@ where
154187
}
155188
})?;
156189

157-
self.progress.show_throughput(start);
158190
Ok(repos)
159191
}
160192

161-
fn find_repos_or_insert(&mut self, corpus_path: &Path, corpus_id: Id) -> anyhow::Result<Vec<db::Repo>> {
193+
fn find_repos_or_insert(&mut self, corpus_path: &Path, corpus_id: db::Id) -> anyhow::Result<Vec<db::Repo>> {
162194
let start = Instant::now();
163-
let repos = self.find_repos(corpus_id)?;
195+
let repos = self.find_repos(corpus_path, corpus_id)?;
164196
if repos.is_empty() {
165197
self.refresh_repos(corpus_path, corpus_id)
166198
} else {

gitoxide-core/src/corpus/mod.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,60 @@ pub struct RunOutcome {
1313

1414
pub(crate) mod db;
1515
pub(crate) mod engine;
16+
17+
/// Contains all information necessary to run a task.
18+
pub(crate) struct Task {
19+
/// The unique name of the task, which must not be changed after creating it.
20+
///
21+
/// However, if it is changed it will be treated as new kind of task entirely and won't compare
22+
/// to previous runs of the task.
23+
name: &'static str,
24+
/// `true` if the task cannot be run in parallel as it needs all resources by itself.
25+
execute_exclusive: bool,
26+
/// The actual implementation
27+
execute: &'static (dyn run::Execute + Send + Sync),
28+
}
29+
30+
pub(crate) struct Run {
31+
/// Our own ID for finding the respective database row.
32+
id: db::Id,
33+
/// The time at which the run was inserted.
34+
duration: std::time::Duration,
35+
error: Option<String>,
36+
}
37+
38+
pub(crate) mod run {
39+
use crate::corpus::{Run, Task};
40+
use std::path::Path;
41+
42+
impl Task {
43+
pub fn perform(&self, run: &mut Run, repo: &Path) {
44+
let start = std::time::Instant::now();
45+
if let Err(err) = self.execute.execute(repo) {
46+
run.error = Some(format!("{err:#?}"))
47+
}
48+
run.duration = start.elapsed();
49+
}
50+
}
51+
52+
/// Note that once runs have been recorded, the implementation must not change anymore to keep it comparable.
53+
/// If changes have be done, rather change the name of the owning task to start a new kind of task.
54+
pub(crate) trait Execute {
55+
fn execute(&self, repo: &Path) -> anyhow::Result<()>;
56+
}
57+
58+
pub(crate) static ALL: &'static [Task] = &[Task {
59+
name: "open repository (isolated)",
60+
execute_exclusive: true, // TODO: false
61+
execute: &OpenRepo,
62+
}];
63+
64+
struct OpenRepo;
65+
66+
impl Execute for OpenRepo {
67+
fn execute(&self, repo: &Path) -> anyhow::Result<()> {
68+
gix::open_opts(&repo, gix::open::Options::isolated())?;
69+
Ok(())
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)