@@ -26,7 +26,7 @@ pub struct Hydroflow {
26
26
27
27
// TODO(mingwei): separate scheduler into its own struct/trait?
28
28
// Index is stratum, value is FIFO queue for that stratum.
29
- ready_queue : Vec < VecDeque < SubgraphId > > ,
29
+ stratum_queues : Vec < VecDeque < SubgraphId > > ,
30
30
current_stratum : usize ,
31
31
current_epoch : usize ,
32
32
@@ -35,14 +35,14 @@ pub struct Hydroflow {
35
35
}
36
36
impl Default for Hydroflow {
37
37
fn default ( ) -> Self {
38
- let ( subgraphs, handoffs, states, ready_queue ) = Default :: default ( ) ;
38
+ let ( subgraphs, handoffs, states, stratum_queues ) = Default :: default ( ) ;
39
39
let ( event_queue_send, event_queue_recv) = mpsc:: unbounded_channel ( ) ;
40
40
Self {
41
41
subgraphs,
42
42
handoffs,
43
43
states,
44
44
45
- ready_queue ,
45
+ stratum_queues ,
46
46
current_stratum : 0 ,
47
47
current_epoch : 0 ,
48
48
@@ -84,7 +84,7 @@ impl Hydroflow {
84
84
// Add any external jobs to ready queue.
85
85
self . try_recv_events ( ) ;
86
86
87
- while let Some ( sg_id) = self . ready_queue [ self . current_stratum ] . pop_front ( ) {
87
+ while let Some ( sg_id) = self . stratum_queues [ self . current_stratum ] . pop_front ( ) {
88
88
{
89
89
let sg_data = & mut self . subgraphs [ sg_id] ;
90
90
// This must be true for the subgraph to be enqueued.
@@ -111,7 +111,7 @@ impl Hydroflow {
111
111
continue ;
112
112
}
113
113
succ_sg_data. is_scheduled . set ( true ) ;
114
- self . ready_queue [ succ_sg_data. stratum ] . push_back ( succ_id) ;
114
+ self . stratum_queues [ succ_sg_data. stratum ] . push_back ( succ_id) ;
115
115
}
116
116
}
117
117
}
@@ -128,12 +128,12 @@ impl Hydroflow {
128
128
let old_stratum = self . current_stratum ;
129
129
loop {
130
130
// If current stratum has work, return true.
131
- if !self . ready_queue [ self . current_stratum ] . is_empty ( ) {
131
+ if !self . stratum_queues [ self . current_stratum ] . is_empty ( ) {
132
132
return true ;
133
133
}
134
134
// Increment stratum counter.
135
135
self . current_stratum += 1 ;
136
- if self . current_stratum >= self . ready_queue . len ( ) {
136
+ if self . current_stratum >= self . stratum_queues . len ( ) {
137
137
self . current_stratum = 0 ;
138
138
self . current_epoch += 1 ;
139
139
}
@@ -175,7 +175,7 @@ impl Hydroflow {
175
175
while let Ok ( sg_id) = self . event_queue_recv . try_recv ( ) {
176
176
let sg_data = & self . subgraphs [ sg_id] ;
177
177
if !sg_data. is_scheduled . replace ( true ) {
178
- self . ready_queue [ sg_data. stratum ] . push_back ( sg_id) ;
178
+ self . stratum_queues [ sg_data. stratum ] . push_back ( sg_id) ;
179
179
enqueued_count += 1 ;
180
180
}
181
181
}
@@ -189,7 +189,7 @@ impl Hydroflow {
189
189
let sg_id = self . event_queue_recv . blocking_recv ( ) ?;
190
190
let sg_data = & self . subgraphs [ sg_id] ;
191
191
if !sg_data. is_scheduled . replace ( true ) {
192
- self . ready_queue [ sg_data. stratum ] . push_back ( sg_id) ;
192
+ self . stratum_queues [ sg_data. stratum ] . push_back ( sg_id) ;
193
193
194
194
// Enqueue any other immediate events.
195
195
return Some ( NonZeroUsize :: new ( self . try_recv_events ( ) + 1 ) . unwrap ( ) ) ;
@@ -204,7 +204,7 @@ impl Hydroflow {
204
204
let sg_id = self . event_queue_recv . recv ( ) . await ?;
205
205
let sg_data = & self . subgraphs [ sg_id] ;
206
206
if !sg_data. is_scheduled . replace ( true ) {
207
- self . ready_queue [ sg_data. stratum ] . push_back ( sg_id) ;
207
+ self . stratum_queues [ sg_data. stratum ] . push_back ( sg_id) ;
208
208
209
209
// Enqueue any other immediate events.
210
210
return Some ( NonZeroUsize :: new ( self . try_recv_events ( ) + 1 ) . unwrap ( ) ) ;
@@ -265,7 +265,7 @@ impl Hydroflow {
265
265
true ,
266
266
) ) ;
267
267
self . init_stratum ( stratum) ;
268
- self . ready_queue [ stratum] . push_back ( sg_id) ;
268
+ self . stratum_queues [ stratum] . push_back ( sg_id) ;
269
269
270
270
sg_id
271
271
}
@@ -354,15 +354,16 @@ impl Hydroflow {
354
354
true ,
355
355
) ) ;
356
356
self . init_stratum ( stratum) ;
357
- self . ready_queue [ stratum] . push_back ( sg_id) ;
357
+ self . stratum_queues [ stratum] . push_back ( sg_id) ;
358
358
359
359
sg_id
360
360
}
361
361
362
362
/// Makes sure stratum STRATUM is initialized.
363
363
fn init_stratum ( & mut self , stratum : usize ) {
364
- if self . ready_queue . len ( ) <= stratum {
365
- self . ready_queue . resize_with ( stratum + 1 , Default :: default) ;
364
+ if self . stratum_queues . len ( ) <= stratum {
365
+ self . stratum_queues
366
+ . resize_with ( stratum + 1 , Default :: default) ;
366
367
}
367
368
}
368
369
@@ -457,7 +458,7 @@ struct SubgraphData {
457
458
#[ allow( dead_code) ]
458
459
preds : Vec < HandoffId > ,
459
460
succs : Vec < HandoffId > ,
460
- /// If this subgraph is scheduled in [`Hydroflow::ready_queue `].
461
+ /// If this subgraph is scheduled in [`Hydroflow::stratum_queues `].
461
462
/// [`Cell`] allows modifying this field when iterating `Self::preds` or
462
463
/// `Self::succs`, as all `SubgraphData` are owned by the same vec
463
464
/// `Hydroflow::subgraphs`.
0 commit comments