1
+ pub const PROGRESS_RANGE : std:: ops:: RangeInclusive < u8 > = 0 ..=3 ;
2
+
1
3
pub struct Engine < P > {
2
4
progress : P ,
3
5
con : rusqlite:: Connection ,
4
6
gitoxide_version : String ,
5
7
}
6
8
9
+ pub struct RunOutcome {
10
+ /// the relative path to the repositories that could not be found on disk
11
+ pub missing_repos_rela_paths : usize ,
12
+ }
13
+
7
14
pub mod engine {
15
+ use super :: db;
8
16
use crate :: corpus:: Engine ;
17
+ use crate :: organize:: find_git_repository_workdirs;
9
18
use anyhow:: Context ;
10
- use std:: path:: PathBuf ;
19
+ use bytesize:: ByteSize ;
20
+ use rusqlite:: params;
21
+ use std:: path:: { Path , PathBuf } ;
22
+ use std:: time:: Instant ;
11
23
12
24
pub ( crate ) type Id = u32 ;
13
25
@@ -26,11 +38,126 @@ pub mod engine {
26
38
}
27
39
28
40
/// Run on the existing set of repositories we have already seen or obtain them from `path` if there is none yet.
29
- pub fn run ( & self , path : PathBuf ) -> anyhow:: Result < ( ) > {
30
- let _corpus_id = self . corpus_id_or_insert ( & path) ?;
41
+ pub fn run ( & mut self , corpus_path : PathBuf ) -> anyhow:: Result < ( ) > {
42
+ let corpus_path = gix:: path:: realpath ( corpus_path) ?;
43
+ let corpus_id = self . corpus_id_or_insert ( & corpus_path) ?;
31
44
let _gitoxide_id = self . gitoxide_version_id_or_insert ( ) ?;
32
45
let _runner_id = self . runner_id_or_insert ( ) ?;
33
- todo ! ( )
46
+ let _repos = self . find_repos_or_insert ( & corpus_path, corpus_id) ?;
47
+ todo ! ( "do run on repos" )
48
+ }
49
+
50
+ fn find_repos ( & mut self , corpus_id : Id ) -> anyhow:: Result < Vec < db:: Repo > > {
51
+ self . progress . set_name ( "query db-repos" ) ;
52
+ self . progress . init ( None , gix:: progress:: count ( "repos" ) ) ;
53
+
54
+ Ok ( self
55
+ . con
56
+ . prepare (
57
+ "SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1" ,
58
+ ) ?
59
+ . query_map ( [ corpus_id] , |r| {
60
+ Ok ( db:: Repo {
61
+ id : r. get ( 0 ) ?,
62
+ path : r. get :: < _ , String > ( 1 ) ?. into ( ) ,
63
+ odb_size : ByteSize ( r. get ( 2 ) ?) ,
64
+ num_objects : r. get ( 3 ) ?,
65
+ num_references : r. get ( 4 ) ?,
66
+ } )
67
+ } ) ?
68
+ . inspect ( |_| self . progress . inc ( ) )
69
+ . collect :: < Result < _ , _ > > ( ) ?)
70
+ }
71
+
72
+ fn refresh_repos ( & mut self , corpus_path : & Path , corpus_id : Id ) -> anyhow:: Result < Vec < db:: Repo > > {
73
+ let start = Instant :: now ( ) ;
74
+ self . progress . set_name ( "refresh" ) ;
75
+ self . progress . init ( None , gix:: progress:: count ( "repos" ) ) ;
76
+
77
+ let repos = std:: thread:: scope ( {
78
+ let progress = & mut self . progress ;
79
+ let con = & mut self . con ;
80
+ |scope| -> anyhow:: Result < _ > {
81
+ let threads = std:: thread:: available_parallelism ( )
82
+ . map ( std:: num:: NonZeroUsize :: get)
83
+ . ok ( )
84
+ . unwrap_or ( 1 ) ;
85
+ let ( path_tx, repo_rx) = {
86
+ let ( path_tx, path_rx) = crossbeam_channel:: bounded ( threads * 2 ) ;
87
+ let ( repo_tx, repo_rx) = std:: sync:: mpsc:: channel :: < ( PathBuf , anyhow:: Result < db:: Repo > ) > ( ) ;
88
+ ( 0 ..threads) . for_each ( |_| {
89
+ scope. spawn ( {
90
+ let path_rx = path_rx. clone ( ) ;
91
+ let repo_tx = repo_tx. clone ( ) ;
92
+ move || -> anyhow:: Result < _ > {
93
+ for repo_path in path_rx {
94
+ let res = ( || {
95
+ let repo = gix:: open_opts ( & repo_path, gix:: open:: Options :: isolated ( ) ) ?;
96
+ db:: Repo :: try_from ( & repo)
97
+ } ) ( ) ;
98
+ repo_tx. send ( ( repo_path, res) ) ?;
99
+ }
100
+ Ok ( ( ) )
101
+ }
102
+ } ) ;
103
+ } ) ;
104
+ ( path_tx, repo_rx)
105
+ } ;
106
+
107
+ let find_progress = progress. add_child ( "find" ) ;
108
+ let write_db = scope. spawn ( move || -> anyhow:: Result < Vec < db:: Repo > > {
109
+ progress. set_name ( "write to DB" ) ;
110
+ progress. init ( None , gix:: progress:: count ( "repos" ) ) ;
111
+ let start = Instant :: now ( ) ;
112
+
113
+ let mut out = Vec :: new ( ) ;
114
+ let transaction = con. transaction ( ) ?;
115
+ let mut statement = transaction. prepare ( "INSERT INTO repository (rela_path, corpus, odb_size, num_objects, num_references) VALUES (?1, ?2, ?3, ?4, ?5)\
116
+ ON CONFLICT DO UPDATE SET rela_path = rela_path, corpus = corpus, odb_size = ?3, num_objects = ?4, num_references = ?5\
117
+ RETURNING id") ?;
118
+ for ( repo_path, repo_res) in repo_rx {
119
+ match repo_res {
120
+ Ok ( mut repo) => {
121
+ let rela_path = repo. path . strip_prefix ( corpus_path) ?;
122
+ repo. id = statement. query_row ( params ! [ rela_path. to_str( ) . context( "only valid UTF8 is allowed for repository paths" ) ?, corpus_id, repo. odb_size. as_u64( ) , repo. num_objects, repo. num_references] , |r| r. get ( 0 ) ) ?;
123
+ out. push ( repo) ;
124
+ progress. inc ( ) ;
125
+ }
126
+ Err ( err) => progress. fail ( format ! ( "{repo_path:?}: {err:#?}" ) ) ,
127
+ }
128
+ }
129
+ statement. finalize ( ) ?;
130
+ transaction. commit ( ) ?;
131
+ progress. show_throughput ( start) ;
132
+ Ok ( out)
133
+ } ) ;
134
+
135
+ let repos = gix:: interrupt:: Iter :: new (
136
+ find_git_repository_workdirs ( corpus_path, find_progress, false , Some ( threads) ) ,
137
+ || anyhow:: anyhow!( "interrupted by user" ) ,
138
+ ) ;
139
+ for res in repos {
140
+ let ( repo_path, _kind) = res?;
141
+ path_tx. send ( repo_path) ?;
142
+ }
143
+ drop ( path_tx) ;
144
+ write_db. join ( ) . expect ( "no panic" )
145
+ }
146
+ } ) ?;
147
+
148
+ self . progress . show_throughput ( start) ;
149
+ Ok ( repos)
150
+ }
151
+
152
+ fn find_repos_or_insert ( & mut self , corpus_path : & Path , corpus_id : Id ) -> anyhow:: Result < Vec < db:: Repo > > {
153
+ let start = Instant :: now ( ) ;
154
+ let repos = self . find_repos ( corpus_id) ?;
155
+ if repos. is_empty ( ) {
156
+ self . refresh_repos ( corpus_path, corpus_id)
157
+ } else {
158
+ self . progress . show_throughput ( start) ;
159
+ Ok ( repos)
160
+ }
34
161
}
35
162
}
36
163
}
@@ -39,10 +166,49 @@ pub mod db {
39
166
use crate :: corpus:: engine:: Id ;
40
167
use crate :: corpus:: Engine ;
41
168
use anyhow:: { bail, Context } ;
169
+ use bytesize:: ByteSize ;
42
170
use rusqlite:: { params, OptionalExtension } ;
43
- use std:: path:: Path ;
171
+ use std:: path:: { Path , PathBuf } ;
44
172
use sysinfo:: { CpuExt , CpuRefreshKind , RefreshKind , SystemExt } ;
45
173
174
+ /// a husk of a repository
175
+ pub ( crate ) struct Repo {
176
+ pub ( crate ) id : Id ,
177
+ /// The full path to the repository on disk, not yet validated to exist.
178
+ pub ( crate ) path : PathBuf ,
179
+ /// The size of the object database, counted quickly by packs only.
180
+ pub ( crate ) odb_size : ByteSize ,
181
+ /// The amount of objects stored in the object database.
182
+ pub ( crate ) num_objects : u64 ,
183
+ /// The total amount of references, no matter which type.
184
+ pub ( crate ) num_references : usize ,
185
+ }
186
+
187
+ impl Repo {
188
+ pub ( crate ) fn try_from ( repo : & gix:: Repository ) -> anyhow:: Result < Self > {
189
+ let num_references = repo. refs . iter ( ) ?. all ( ) ?. count ( ) ;
190
+ let num_objects = repo. objects . packed_object_count ( ) ?;
191
+ let odb_size = ByteSize (
192
+ std:: fs:: read_dir ( repo. objects . store_ref ( ) . path ( ) . join ( "pack" ) )
193
+ . map ( |dir| {
194
+ dir. filter_map ( Result :: ok)
195
+ . filter_map ( |e| e. metadata ( ) . ok ( ) )
196
+ . filter_map ( |m| m. is_file ( ) . then_some ( m. len ( ) ) )
197
+ . sum ( )
198
+ } )
199
+ . unwrap_or_default ( ) ,
200
+ ) ;
201
+
202
+ Ok ( Repo {
203
+ id : 0 ,
204
+ path : repo. path ( ) . to_owned ( ) ,
205
+ odb_size,
206
+ num_objects,
207
+ num_references,
208
+ } )
209
+ }
210
+ }
211
+
46
212
/// A version to be incremented whenever the database layout is changed, to refresh it automatically.
47
213
const VERSION : usize = 1 ;
48
214
@@ -67,9 +233,11 @@ pub mod db {
67
233
} ,
68
234
_ => { }
69
235
}
236
+ con. execute_batch ( "PRAGMA synchronous = OFF;" ) ?;
70
237
con. execute_batch (
71
238
r#"
72
239
CREATE TABLE if not exists runner(
240
+ id integer PRIMARY KEY,
73
241
vendor text,
74
242
brand text,
75
243
host_name text, -- this is just to help ID the runner
@@ -80,22 +248,29 @@ pub mod db {
80
248
con. execute_batch (
81
249
r#"
82
250
CREATE TABLE if not exists corpus(
251
+ id integer PRIMARY KEY,
83
252
root text UNIQUE -- the root path of all repositories we want to consider, as canonicalized path
84
253
)
85
254
"# ,
86
255
) ?;
87
256
con. execute_batch (
88
257
r#"
89
258
CREATE TABLE if not exists repository(
90
- rela_path text UNIQUE, -- the path to the repository on disk, relative to the corpus root path, without leading `./` or `.\`
259
+ id integer PRIMARY KEY,
260
+ rela_path text, -- the path to the repository on disk, relative to the corpus root path, without leading `./` or `.\`
91
261
corpus integer,
92
- FOREIGN KEY (corpus) REFERENCES corpus (rowid)
262
+ odb_size integer, -- the object database size in bytes
263
+ num_references integer, -- the total amount of references
264
+ num_objects integer, -- the total amount of objects
265
+ FOREIGN KEY (corpus) REFERENCES corpus (id)
266
+ UNIQUE (rela_path, corpus)
93
267
)
94
268
"# ,
95
269
) ?;
96
270
con. execute_batch (
97
271
r#"
98
272
CREATE TABLE if not exists gitoxide_version(
273
+ id integer PRIMARY KEY,
99
274
version text UNIQUE -- the unique git version via gix describe
100
275
)
101
276
"# ,
@@ -109,9 +284,9 @@ pub mod db {
109
284
start_time integer,
110
285
end_time integer, -- or NULL if not yet finished (either successfull or with failure)
111
286
error text, -- or NULL if there was on error
112
- FOREIGN KEY (repository) REFERENCES repository (rowid ),
113
- FOREIGN KEY (runner) REFERENCES runner (rowid ),
114
- FOREIGN KEY (gitoxide_version) REFERENCES gitoxide_version (rowid )
287
+ FOREIGN KEY (repository) REFERENCES repository (id ),
288
+ FOREIGN KEY (runner) REFERENCES runner (id ),
289
+ FOREIGN KEY (gitoxide_version) REFERENCES gitoxide_version (id )
115
290
)
116
291
"# ,
117
292
) ?;
@@ -129,73 +304,30 @@ pub mod db {
129
304
let vendor = Some ( cpu. vendor_id ( ) . to_owned ( ) ) ;
130
305
let host = sys. host_name ( ) ;
131
306
let brand = Some ( cpu. brand ( ) . to_owned ( ) ) ;
132
- Ok (
133
- match self
134
- . con
135
- . query_row (
136
- "SELECT rowid FROM runner WHERE vendor = ?1 AND brand = ?2" ,
137
- [ vendor. as_deref ( ) , brand. as_deref ( ) ] ,
138
- |r| r. get ( 0 ) ,
139
- )
140
- . optional ( ) ?
141
- {
142
- Some ( existing) => existing,
143
- None => {
144
- self . con . execute (
145
- "INSERT INTO runner (vendor, brand, host_name) VALUES (?1, ?2, ?3)" ,
146
- [ vendor. as_deref ( ) , brand. as_deref ( ) , host. as_deref ( ) ] ,
147
- ) ?;
148
- self . con . query_row (
149
- "SELECT rowid FROM runner WHERE vendor = ?1 AND brand = ?2" ,
150
- [ vendor, brand] ,
151
- |r| r. get ( 0 ) ,
152
- ) ?
153
- }
154
- } ,
155
- )
307
+ Ok ( self . con . query_row (
308
+ "INSERT INTO runner (vendor, brand, host_name) VALUES (?1, ?2, ?3) \
309
+ ON CONFLICT DO UPDATE SET vendor = vendor, brand = brand, host_name = ?3 RETURNING id",
310
+ [ vendor. as_deref ( ) , brand. as_deref ( ) , host. as_deref ( ) ] ,
311
+ |r| r. get ( 0 ) ,
312
+ ) ?)
156
313
}
157
314
pub ( crate ) fn corpus_id_or_insert ( & self , path : & Path ) -> anyhow:: Result < Id > {
158
315
let path = path. to_str ( ) . context ( "corpus root cannot contain illformed UTF-8" ) ?;
159
- Ok (
160
- match self
161
- . con
162
- . query_row ( "SELECT rowid FROM corpus WHERE root = ?1" , [ path] , |r| r. get ( 0 ) )
163
- . optional ( ) ?
164
- {
165
- Some ( existing) => existing,
166
- None => {
167
- self . con . execute ( "INSERT INTO corpus (root) VALUES (?1)" , [ path] ) ?;
168
- self . con
169
- . query_row ( "SELECT rowid FROM corpus WHERE root = ?1" , [ path] , |r| r. get ( 0 ) ) ?
170
- }
171
- } ,
172
- )
316
+ Ok ( self . con . query_row (
317
+ "INSERT INTO corpus (root) VALUES (?1) \
318
+ ON CONFLICT DO UPDATE SET root = root RETURNING id",
319
+ [ path] ,
320
+ |r| r. get ( 0 ) ,
321
+ ) ?)
173
322
}
174
323
pub ( crate ) fn gitoxide_version_id_or_insert ( & self ) -> anyhow:: Result < Id > {
175
- Ok (
176
- match self
324
+ Ok ( self
177
325
. con
178
326
. query_row (
179
- "SELECT rowid FROM gitoxide_version WHERE version = ?1 " ,
327
+ "INSERT INTO gitoxide_version ( version) VALUES (?1) ON CONFLICT DO UPDATE SET version = version RETURNING id " ,
180
328
[ & self . gitoxide_version ] ,
181
329
|r| r. get ( 0 ) ,
182
- )
183
- . optional ( ) ?
184
- {
185
- Some ( existing) => existing,
186
- None => {
187
- self . con . execute (
188
- "INSERT INTO gitoxide_version (version) VALUES (?1)" ,
189
- [ & self . gitoxide_version ] ,
190
- ) ?;
191
- self . con . query_row (
192
- "SELECT rowid FROM gitoxide_version WHERE version = ?1" ,
193
- [ & self . gitoxide_version ] ,
194
- |r| r. get ( 0 ) ,
195
- ) ?
196
- }
197
- } ,
198
- )
330
+ ) ?)
199
331
}
200
332
}
201
333
}
0 commit comments