1
1
use super :: db;
2
+ use crate :: corpus;
2
3
use crate :: corpus:: { Engine , Task } ;
3
4
use crate :: organize:: find_git_repository_workdirs;
4
5
use anyhow:: { bail, Context } ;
5
6
use bytesize:: ByteSize ;
6
7
use gix:: Progress ;
7
8
use rusqlite:: params;
8
9
use std:: path:: { Path , PathBuf } ;
9
- use std:: time:: Instant ;
10
+ use std:: sync:: atomic:: Ordering ;
11
+ use std:: time:: { Duration , Instant } ;
10
12
11
- impl < P > Engine < P >
12
- where
13
- P : gix:: Progress ,
14
- {
13
+ impl Engine {
15
14
/// Open the corpus DB or create it.
16
- pub fn open_or_create ( db : PathBuf , gitoxide_version : String , progress : P ) -> anyhow:: Result < Engine < P > > {
15
+ pub fn open_or_create ( db : PathBuf , gitoxide_version : String , progress : corpus :: Progress ) -> anyhow:: Result < Engine > {
17
16
let con = crate :: corpus:: db:: create ( db) . context ( "Could not open or create database" ) ?;
18
17
Ok ( Engine {
19
18
progress,
@@ -23,13 +22,13 @@ where
23
22
}
24
23
25
24
/// Run on the existing set of repositories we have already seen or obtain them from `path` if there is none yet.
26
- pub fn run ( & mut self , corpus_path : PathBuf ) -> anyhow:: Result < ( ) > {
25
+ pub fn run ( & mut self , corpus_path : PathBuf , threads : Option < usize > ) -> anyhow:: Result < ( ) > {
27
26
let ( corpus_path, corpus_id) = self . prepare_corpus_path ( corpus_path) ?;
28
27
let gitoxide_id = self . gitoxide_version_id_or_insert ( ) ?;
29
28
let runner_id = self . runner_id_or_insert ( ) ?;
30
29
let repos = self . find_repos_or_insert ( & corpus_path, corpus_id) ?;
31
30
let tasks = self . tasks_or_insert ( ) ?;
32
- self . perform_run ( gitoxide_id, runner_id, & tasks, & repos)
31
+ self . perform_run ( & corpus_path , gitoxide_id, runner_id, & tasks, repos, threads )
33
32
}
34
33
35
34
pub fn refresh ( & mut self , corpus_path : PathBuf ) -> anyhow:: Result < ( ) > {
@@ -44,42 +43,91 @@ where
44
43
}
45
44
}
46
45
47
- impl < P > Engine < P >
48
- where
49
- P : gix:: Progress ,
50
- {
46
+ impl Engine {
51
47
fn perform_run (
52
48
& mut self ,
49
+ corpus_path : & Path ,
53
50
gitoxide_id : db:: Id ,
54
51
runner_id : db:: Id ,
55
52
tasks : & [ ( db:: Id , & ' static Task ) ] ,
56
- repos : & [ db:: Repo ] ,
53
+ mut repos : Vec < db:: Repo > ,
54
+ threads : Option < usize > ,
57
55
) -> anyhow:: Result < ( ) > {
58
56
let start = Instant :: now ( ) ;
59
57
let task_progress = & mut self . progress ;
60
58
task_progress. set_name ( "run" ) ;
61
59
task_progress. init ( Some ( tasks. len ( ) ) , gix:: progress:: count ( "tasks" ) ) ;
60
+ let threads = gix:: parallel:: num_threads ( threads) ;
62
61
for ( task_id, task) in tasks {
63
62
let task_start = Instant :: now ( ) ;
64
- let mut run_progress = task_progress. add_child ( format ! ( "run '{}'" , task. name ) ) ;
65
- run_progress . init ( Some ( repos. len ( ) ) , gix:: progress:: count ( "repos" ) ) ;
63
+ let mut repo_progress = task_progress. add_child ( format ! ( "run '{}'" , task. short_name ) ) ;
64
+ repo_progress . init ( Some ( repos. len ( ) ) , gix:: progress:: count ( "repos" ) ) ;
66
65
67
- if task. execute_exclusive {
68
- for repo in repos {
66
+ if task. execute_exclusive || threads == 1 {
67
+ let mut run_progress = repo_progress. add_child ( "set later" ) ;
68
+ for repo in & repos {
69
69
if gix:: interrupt:: is_triggered ( ) {
70
70
bail ! ( "interrupted by user" ) ;
71
71
}
72
+ run_progress. set_name ( format ! (
73
+ "{:?}" ,
74
+ repo. path. strip_prefix( corpus_path) . expect( "corpus contains repo" )
75
+ ) ) ;
72
76
let mut run = Self :: insert_run ( & self . con , gitoxide_id, runner_id, * task_id, repo. id ) ?;
73
- task. perform ( & mut run, & repo. path ) ;
77
+ task. perform (
78
+ & mut run,
79
+ & repo. path ,
80
+ & mut run_progress,
81
+ Some ( threads) ,
82
+ & gix:: interrupt:: IS_INTERRUPTED ,
83
+ ) ;
74
84
Self :: update_run ( & self . con , run) ?;
75
- run_progress . inc ( ) ;
85
+ repo_progress . inc ( ) ;
76
86
}
87
+ repo_progress. show_throughput ( task_start) ;
77
88
} else {
78
- // gix::parallel::in_parallel_with_slice()
79
- todo ! ( "shared" )
89
+ let counter = repo_progress. counter ( ) ;
90
+ let repo_progress = gix:: threading:: OwnShared :: new ( gix:: threading:: Mutable :: new ( repo_progress) ) ;
91
+ gix:: parallel:: in_parallel_with_slice (
92
+ & mut repos,
93
+ Some ( threads) ,
94
+ {
95
+ let shared_repo_progress = repo_progress. clone ( ) ;
96
+ let path = self . con . path ( ) . expect ( "opened from path on disk" ) . to_owned ( ) ;
97
+ move |tid| {
98
+ (
99
+ gix:: threading:: lock ( & shared_repo_progress) . add_child ( format ! ( "{tid}" ) ) ,
100
+ rusqlite:: Connection :: open ( & path) ,
101
+ )
102
+ }
103
+ } ,
104
+ |repo, ( progress, con) , _threads_left, should_interrupt| -> anyhow:: Result < ( ) > {
105
+ progress. set_name ( format ! (
106
+ "{:?}" ,
107
+ repo. path. strip_prefix( corpus_path) . expect( "corpus contains repo" )
108
+ ) ) ;
109
+ let con = match con {
110
+ Ok ( con) => con,
111
+ Err ( err) => {
112
+ progress. fail ( format ! ( "{err:#?}" ) ) ;
113
+ should_interrupt. store ( true , Ordering :: SeqCst ) ;
114
+ return Ok ( ( ) ) ;
115
+ }
116
+ } ;
117
+ let mut run = Self :: insert_run ( con, gitoxide_id, runner_id, * task_id, repo. id ) ?;
118
+ task. perform ( & mut run, & repo. path , progress, Some ( 1 ) , should_interrupt) ;
119
+ Self :: update_run ( con, run) ?;
120
+ if let Some ( counter) = counter. as_ref ( ) {
121
+ counter. fetch_add ( 1 , Ordering :: SeqCst ) ;
122
+ }
123
+ Ok ( ( ) )
124
+ } ,
125
+ || ( !gix:: interrupt:: is_triggered ( ) ) . then ( || Duration :: from_millis ( 100 ) ) ,
126
+ std:: convert:: identity,
127
+ ) ?;
128
+ gix:: threading:: lock ( & repo_progress) . show_throughput ( task_start) ;
80
129
}
81
130
82
- run_progress. show_throughput ( task_start) ;
83
131
task_progress. inc ( ) ;
84
132
}
85
133
task_progress. show_throughput ( start) ;
0 commit comments