1
/// The inclusive range `0..=3` of progress levels.
///
/// NOTE(review): presumably the depth range of the progress tree that callers render — nothing in view
/// uses this constant, so confirm against the caller.
pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 0..=3;
2
+
1
3
/// Drives a corpus run: it owns the progress reporter, the database connection and the version string
/// of the gitoxide build that is recorded alongside the run.
pub struct Engine<P> {
    /// Progress reporter that is renamed, initialized and incremented while repositories are queried or refreshed.
    progress: P,
    /// Connection to the SQLite database used by the `*_id_or_insert` helpers and the repository queries.
    con: rusqlite::Connection,
    /// Version string that gets recorded in the `gitoxide_version` table.
    gitoxide_version: String,
}
6
8
9
/// NOTE(review): presumably the summary handed back once a corpus run completes — nothing in view
/// constructs it yet, confirm once `run` is implemented.
pub struct RunOutcome {
    /// Repositories that could not be found on disk.
    ///
    /// NOTE(review): the name suggests a list of relative paths, but the type is `usize`, i.e. a count —
    /// confirm which of the two is intended.
    pub missing_repos_rela_paths: usize,
}
13
+
7
14
pub mod engine {
15
+ use super :: db;
8
16
use crate :: corpus:: Engine ;
17
+ use crate :: organize:: find_git_repository_workdirs;
9
18
use anyhow:: Context ;
10
- use std:: path:: PathBuf ;
19
+ use bytesize:: ByteSize ;
20
+ use rusqlite:: params;
21
+ use std:: path:: { Path , PathBuf } ;
22
+ use std:: time:: Instant ;
11
23
12
24
pub ( crate ) type Id = u32 ;
13
25
@@ -26,11 +38,124 @@ pub mod engine {
26
38
}
27
39
28
40
/// Run on the existing set of repositories we have already seen or obtain them from `path` if there is none yet.
29
- pub fn run ( & self , path : PathBuf ) -> anyhow:: Result < ( ) > {
30
- let _corpus_id = self . corpus_id_or_insert ( & path) ?;
41
+ pub fn run ( & mut self , corpus_path : PathBuf ) -> anyhow:: Result < ( ) > {
42
+ let corpus_path = gix:: path:: realpath ( corpus_path) ?;
43
+ let corpus_id = self . corpus_id_or_insert ( & corpus_path) ?;
31
44
let _gitoxide_id = self . gitoxide_version_id_or_insert ( ) ?;
32
45
let _runner_id = self . runner_id_or_insert ( ) ?;
33
- todo ! ( )
46
+ let _repos = self . find_repos_or_insert ( & corpus_path, corpus_id) ?;
47
+ todo ! ( "do run on repos" )
48
+ }
49
+
50
+ fn find_repos ( & mut self , corpus_id : Id ) -> anyhow:: Result < Vec < db:: Repo > > {
51
+ self . progress . set_name ( "query db-repos" ) ;
52
+ self . progress . init ( None , gix:: progress:: count ( "repos" ) ) ;
53
+
54
+ Ok ( self
55
+ . con
56
+ . prepare (
57
+ "SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1" ,
58
+ ) ?
59
+ . query_map ( [ corpus_id] , |r| {
60
+ Ok ( db:: Repo {
61
+ id : r. get ( 0 ) ?,
62
+ path : r. get :: < _ , String > ( 1 ) ?. into ( ) ,
63
+ odb_size : ByteSize ( r. get ( 2 ) ?) ,
64
+ num_objects : r. get ( 3 ) ?,
65
+ num_references : r. get ( 4 ) ?,
66
+ } )
67
+ } ) ?
68
+ . inspect ( |_| self . progress . inc ( ) )
69
+ . collect :: < Result < _ , _ > > ( ) ?)
70
+ }
71
+
72
/// Discover all git repositories beneath `corpus_path`, gather their statistics in parallel and upsert them
/// into the `repository` table for `corpus_id`, returning them with their database ids filled in.
///
/// The work is a three-stage pipeline running inside a single thread scope:
///
/// * the calling thread iterates the discovered repositories and sends their paths into a bounded channel,
/// * one worker per unit of available parallelism opens each repository and computes its `db::Repo`,
/// * a single writer thread receives the results and writes them to the database.
///
/// Repositories that fail to open or to be measured are reported via `progress.fail()` and are left out
/// of the returned list.
///
/// # Errors
/// Fails if the discovery iterator yields an error, if a channel peer is gone, if a repository path isn't
/// below `corpus_path` or isn't valid UTF-8, or if a database statement fails.
///
/// # Panics
/// Panics if the database-writer thread panicked.
fn refresh_repos(&mut self, corpus_path: &Path, corpus_id: Id) -> anyhow::Result<Vec<db::Repo>> {
    let start = Instant::now();
    self.progress.set_name("refresh");
    self.progress.init(None, gix::progress::count("repos"));

    let repos = std::thread::scope({
        // Borrow the two fields separately so both can be moved into the writer thread below.
        let progress = &mut self.progress;
        let con = &mut self.con;
        |scope| -> anyhow::Result<_> {
            // Fall back to a single worker if the available parallelism can't be determined.
            let threads = std::thread::available_parallelism()
                .map(std::num::NonZeroUsize::get)
                .ok()
                .unwrap_or(1);
            let (path_tx, repo_rx) = {
                // The path channel is bounded to `threads * 2` queued paths, the result channel is unbounded.
                let (path_tx, path_rx) = crossbeam_channel::bounded(threads * 2);
                let (repo_tx, repo_rx) = std::sync::mpsc::channel::<(PathBuf, anyhow::Result<db::Repo>)>();
                (0..threads).for_each(|_| {
                    // The join handle is discarded, so a worker's return value is never inspected.
                    scope.spawn({
                        let path_rx = path_rx.clone();
                        let repo_tx = repo_tx.clone();
                        move || -> anyhow::Result<_> {
                            for repo_path in path_rx {
                                // Failures to open or measure a repository are forwarded as a value instead of
                                // ending the worker.
                                let res = (|| {
                                    let repo = gix::open_opts(&repo_path, gix::open::Options::isolated())?;
                                    db::Repo::try_from(&repo)
                                })();
                                repo_tx.send((repo_path, res))?;
                            }
                            Ok(())
                        }
                    });
                });
                // Only `path_tx` and `repo_rx` leave this block: the original `path_rx` and `repo_tx` are dropped
                // here, so the workers hold the only remaining clones and `repo_rx` ends once all of them are done.
                (path_tx, repo_rx)
            };

            // The child is created before `progress` is moved into the writer thread.
            let find_progress = progress.add_child("find");
            let write_db = scope.spawn(move || -> anyhow::Result<Vec<db::Repo>> {
                progress.set_name("write to DB");
                progress.init(None, gix::progress::count("repos"));
                // Shadows the outer `start` to time the database writes only.
                let start = Instant::now();

                let mut out = Vec::new();
                // The `\` line continuations join the three parts of the statement without any whitespace in between.
                let mut statement = con.prepare("INSERT INTO repository (rela_path, corpus, odb_size, num_objects, num_references) VALUES (?1, ?2, ?3, ?4, ?5)\
                        ON CONFLICT DO UPDATE SET rela_path = rela_path, corpus = corpus, odb_size = ?3, num_objects = ?4, num_references = ?5\
                        RETURNING id")?;
                for (repo_path, repo_res) in repo_rx {
                    match repo_res {
                        Ok(mut repo) => {
                            // Only the path relative to the corpus root is stored.
                            let rela_path = repo.path.strip_prefix(corpus_path)?;
                            repo.id = statement.query_row(params![rela_path.to_str().context("only valid UTF8 is allowed for repository paths")?, corpus_id, repo.odb_size.as_u64(), repo.num_objects, repo.num_references], |r| r.get(0))?;
                            out.push(repo);
                            progress.inc();
                        }
                        Err(err) => progress.fail(format!("{repo_path:?}: {err:#?}")),
                    }
                }
                statement.finalize()?;
                progress.show_throughput(start);
                Ok(out)
            });

            let repos = gix::interrupt::Iter::new(
                find_git_repository_workdirs(corpus_path, find_progress, false, Some(threads)),
                || anyhow::anyhow!("interrupted by user"),
            );
            for res in repos {
                let (repo_path, _kind) = res?;
                // NOTE(review): if the writer thread returned early with an error, the workers appear to stop as
                // well and this send fails — its error is then returned instead of the writer's own. Confirm
                // whether the writer's error should be surfaced here.
                path_tx.send(repo_path)?;
            }
            // Closing the path channel lets the workers finish, which in turn ends the writer's loop.
            drop(path_tx);
            write_db.join().expect("no panic")
        }
    })?;

    self.progress.show_throughput(start);
    Ok(repos)
}
149
+
150
+ fn find_repos_or_insert ( & mut self , corpus_path : & Path , corpus_id : Id ) -> anyhow:: Result < Vec < db:: Repo > > {
151
+ let start = Instant :: now ( ) ;
152
+ let repos = self . find_repos ( corpus_id) ?;
153
+ if repos. is_empty ( ) {
154
+ self . refresh_repos ( corpus_path, corpus_id)
155
+ } else {
156
+ self . progress . show_throughput ( start) ;
157
+ Ok ( repos)
158
+ }
34
159
}
35
160
}
36
161
}
@@ -39,10 +164,49 @@ pub mod db {
39
164
use crate :: corpus:: engine:: Id ;
40
165
use crate :: corpus:: Engine ;
41
166
use anyhow:: { bail, Context } ;
167
+ use bytesize:: ByteSize ;
42
168
use rusqlite:: { params, OptionalExtension } ;
43
- use std:: path:: Path ;
169
+ use std:: path:: { Path , PathBuf } ;
44
170
use sysinfo:: { CpuExt , CpuRefreshKind , RefreshKind , SystemExt } ;
45
171
172
/// A husk of a repository: the statistics recorded for it in the `repository` table, without an open handle.
pub(crate) struct Repo {
    /// The row id in the `repository` table, or `0` as long as it wasn't assigned from the database yet.
    pub(crate) id: Id,
    /// The full path to the repository on disk, not yet validated to exist.
    pub(crate) path: PathBuf,
    /// The size of the object database, counted quickly by packs only.
    pub(crate) odb_size: ByteSize,
    /// The amount of objects stored in the object database.
    pub(crate) num_objects: u64,
    /// The total amount of references, no matter which type.
    pub(crate) num_references: usize,
}
184
+
185
+ impl Repo {
186
+ pub ( crate ) fn try_from ( repo : & gix:: Repository ) -> anyhow:: Result < Self > {
187
+ let num_references = repo. refs . iter ( ) ?. all ( ) ?. count ( ) ;
188
+ let num_objects = repo. objects . packed_object_count ( ) ?;
189
+ let odb_size = ByteSize (
190
+ std:: fs:: read_dir ( repo. objects . store_ref ( ) . path ( ) . join ( "pack" ) )
191
+ . map ( |dir| {
192
+ dir. filter_map ( Result :: ok)
193
+ . filter_map ( |e| e. metadata ( ) . ok ( ) )
194
+ . filter_map ( |m| m. is_file ( ) . then_some ( m. len ( ) ) )
195
+ . sum ( )
196
+ } )
197
+ . unwrap_or_default ( ) ,
198
+ ) ;
199
+
200
+ Ok ( Repo {
201
+ id : 0 ,
202
+ path : repo. path ( ) . to_owned ( ) ,
203
+ odb_size,
204
+ num_objects,
205
+ num_references,
206
+ } )
207
+ }
208
+ }
209
+
46
210
/// A version to be incremented whenever the database layout is changed, to refresh it automatically.
47
211
const VERSION : usize = 1 ;
48
212
@@ -67,9 +231,11 @@ pub mod db {
67
231
} ,
68
232
_ => { }
69
233
}
234
+ con. execute_batch ( "PRAGMA synchronous = OFF;" ) ?;
70
235
con. execute_batch (
71
236
r#"
72
237
CREATE TABLE if not exists runner(
238
+ id integer PRIMARY KEY,
73
239
vendor text,
74
240
brand text,
75
241
host_name text, -- this is just to help ID the runner
@@ -80,22 +246,29 @@ pub mod db {
80
246
con. execute_batch (
81
247
r#"
82
248
CREATE TABLE if not exists corpus(
249
+ id integer PRIMARY KEY,
83
250
root text UNIQUE -- the root path of all repositories we want to consider, as canonicalized path
84
251
)
85
252
"# ,
86
253
) ?;
87
254
con. execute_batch (
88
255
r#"
89
256
CREATE TABLE if not exists repository(
90
- rela_path text UNIQUE, -- the path to the repository on disk, relative to the corpus root path, without leading `./` or `.\`
257
+ id integer PRIMARY KEY,
258
+ rela_path text, -- the path to the repository on disk, relative to the corpus root path, without leading `./` or `.\`
91
259
corpus integer,
92
- FOREIGN KEY (corpus) REFERENCES corpus (rowid)
260
+ odb_size integer, -- the object database size in bytes
261
+ num_references integer, -- the total amount of references
262
+ num_objects integer, -- the total amount of objects
263
+ FOREIGN KEY (corpus) REFERENCES corpus (id)
264
+ UNIQUE (rela_path, corpus)
93
265
)
94
266
"# ,
95
267
) ?;
96
268
con. execute_batch (
97
269
r#"
98
270
CREATE TABLE if not exists gitoxide_version(
271
+ id integer PRIMARY KEY,
99
272
version text UNIQUE -- the unique git version via gix describe
100
273
)
101
274
"# ,
@@ -109,9 +282,9 @@ pub mod db {
109
282
start_time integer,
110
283
end_time integer, -- or NULL if not yet finished (either successfull or with failure)
111
284
error text, -- or NULL if there was on error
112
- FOREIGN KEY (repository) REFERENCES repository (rowid ),
113
- FOREIGN KEY (runner) REFERENCES runner (rowid ),
114
- FOREIGN KEY (gitoxide_version) REFERENCES gitoxide_version (rowid )
285
+ FOREIGN KEY (repository) REFERENCES repository (id ),
286
+ FOREIGN KEY (runner) REFERENCES runner (id ),
287
+ FOREIGN KEY (gitoxide_version) REFERENCES gitoxide_version (id )
115
288
)
116
289
"# ,
117
290
) ?;
@@ -129,73 +302,30 @@ pub mod db {
129
302
let vendor = Some ( cpu. vendor_id ( ) . to_owned ( ) ) ;
130
303
let host = sys. host_name ( ) ;
131
304
let brand = Some ( cpu. brand ( ) . to_owned ( ) ) ;
132
- Ok (
133
- match self
134
- . con
135
- . query_row (
136
- "SELECT rowid FROM runner WHERE vendor = ?1 AND brand = ?2" ,
137
- [ vendor. as_deref ( ) , brand. as_deref ( ) ] ,
138
- |r| r. get ( 0 ) ,
139
- )
140
- . optional ( ) ?
141
- {
142
- Some ( existing) => existing,
143
- None => {
144
- self . con . execute (
145
- "INSERT INTO runner (vendor, brand, host_name) VALUES (?1, ?2, ?3)" ,
146
- [ vendor. as_deref ( ) , brand. as_deref ( ) , host. as_deref ( ) ] ,
147
- ) ?;
148
- self . con . query_row (
149
- "SELECT rowid FROM runner WHERE vendor = ?1 AND brand = ?2" ,
150
- [ vendor, brand] ,
151
- |r| r. get ( 0 ) ,
152
- ) ?
153
- }
154
- } ,
155
- )
305
+ Ok ( self . con . query_row (
306
+ "INSERT INTO runner (vendor, brand, host_name) VALUES (?1, ?2, ?3) \
307
+ ON CONFLICT DO UPDATE SET vendor = vendor, brand = brand, host_name = ?3 RETURNING id",
308
+ [ vendor. as_deref ( ) , brand. as_deref ( ) , host. as_deref ( ) ] ,
309
+ |r| r. get ( 0 ) ,
310
+ ) ?)
156
311
}
157
312
/// Return the id of the corpus rooted at `path`, inserting a row for it if there is none yet.
///
/// A single upsert statement does both: on conflict the row is updated with its own `root`, and
/// `RETURNING id` hands back the id either way.
///
/// # Errors
/// Fails if `path` isn't valid UTF-8 or if the database statement fails.
pub(crate) fn corpus_id_or_insert(&self, path: &Path) -> anyhow::Result<Id> {
    let root = path.to_str().context("corpus root cannot contain illformed UTF-8")?;
    let id = self.con.query_row(
        "INSERT INTO corpus (root) VALUES (?1) \
         ON CONFLICT DO UPDATE SET root = root RETURNING id",
        [root],
        |row| row.get(0),
    )?;
    Ok(id)
}
174
321
pub ( crate ) fn gitoxide_version_id_or_insert ( & self ) -> anyhow:: Result < Id > {
175
- Ok (
176
- match self
322
+ Ok ( self
177
323
. con
178
324
. query_row (
179
- "SELECT rowid FROM gitoxide_version WHERE version = ?1 " ,
325
+ "INSERT INTO gitoxide_version ( version) VALUES (?1) ON CONFLICT DO UPDATE SET version = version RETURNING id " ,
180
326
[ & self . gitoxide_version ] ,
181
327
|r| r. get ( 0 ) ,
182
- )
183
- . optional ( ) ?
184
- {
185
- Some ( existing) => existing,
186
- None => {
187
- self . con . execute (
188
- "INSERT INTO gitoxide_version (version) VALUES (?1)" ,
189
- [ & self . gitoxide_version ] ,
190
- ) ?;
191
- self . con . query_row (
192
- "SELECT rowid FROM gitoxide_version WHERE version = ?1" ,
193
- [ & self . gitoxide_version ] ,
194
- |r| r. get ( 0 ) ,
195
- ) ?
196
- }
197
- } ,
198
- )
328
+ ) ?)
199
329
}
200
330
}
201
331
}
0 commit comments