Skip to content

Commit 1fcb2d5

Browse files
committed
Make runtime able to scale up and down
1 parent fc4e472 commit 1fcb2d5

File tree

1 file changed

+159
-49
lines changed

1 file changed

+159
-49
lines changed

src/rt/runtime.rs

+159-49
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ use std::cell::Cell;
22
use std::io;
33
use std::iter;
44
use std::sync::atomic::{self, Ordering};
5-
use std::sync::{Arc, Mutex};
5+
use std::sync::atomic::{AtomicBool, AtomicUsize};
6+
use std::sync::{Arc, Mutex, RwLock};
67
use std::thread;
78
use std::time::Duration;
89

10+
use crossbeam_channel::{unbounded, Receiver, Sender};
911
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
10-
use crossbeam_utils::thread::scope;
12+
use crossbeam_utils::thread::{scope, Scope};
1113
use once_cell::unsync::OnceCell;
1214

1315
use crate::rt::Reactor;
@@ -28,46 +30,59 @@ struct Scheduler {
2830
polling: bool,
2931
}
3032

33+
/// Task to be sent to worker thread
34+
enum Task {
35+
Runnable(Runnable),
36+
Terminate,
37+
}
38+
39+
/// Action to be sent to runtime
40+
enum Action {
41+
ScaleUp,
42+
ScaleDown,
43+
Terminate,
44+
}
45+
3146
/// An async runtime.
3247
pub struct Runtime {
3348
/// The reactor.
3449
reactor: Reactor,
3550

3651
/// The global queue of tasks.
37-
injector: Injector<Runnable>,
52+
injector: Injector<Task>,
3853

3954
/// Handles to local queues for stealing work.
40-
stealers: Vec<Stealer<Runnable>>,
41-
42-
/// Machines to start
43-
machines: Vec<Arc<Machine>>,
55+
stealers: RwLock<Vec<(Stealer<Task>, usize)>>,
4456

4557
/// The scheduler state.
4658
sched: Mutex<Scheduler>,
59+
60+
/// Number of minimal worker thread that must available
61+
min_worker: usize,
62+
63+
/// Counter for generating id of worker thread
64+
counter: AtomicUsize,
65+
66+
/// Reciever side for runtime action channel
67+
reciever: Receiver<Action>,
68+
69+
/// Sender side for runtime action channel
70+
sender: Sender<Action>,
4771
}
4872

4973
impl Runtime {
5074
/// Creates a new runtime.
5175
pub fn new() -> Runtime {
52-
let cpus = num_cpus::get().max(1);
53-
let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect();
54-
55-
let machines: Vec<_> = processors
56-
.into_iter()
57-
.map(|p| Arc::new(Machine::new(p)))
58-
.collect();
59-
60-
let stealers = machines
61-
.iter()
62-
.map(|m| m.processor.lock().worker.stealer())
63-
.collect();
64-
76+
let (sender, reciever) = unbounded();
6577
Runtime {
6678
reactor: Reactor::new().unwrap(),
6779
injector: Injector::new(),
68-
stealers,
69-
machines,
80+
stealers: RwLock::new(vec![]),
7081
sched: Mutex::new(Scheduler { polling: false }),
82+
min_worker: num_cpus::get().max(1),
83+
counter: AtomicUsize::new(0),
84+
reciever,
85+
sender,
7186
}
7287
}
7388

@@ -88,32 +103,88 @@ impl Runtime {
88103
// Otherwise, push it into the global task queue.
89104
match machine.get() {
90105
None => {
91-
self.injector.push(task);
106+
self.injector.push(Task::Runnable(task));
92107
self.notify();
93108
}
94-
Some(m) => m.schedule(&self, task),
109+
Some(m) => m.schedule(&self, Task::Runnable(task)),
95110
}
96111
});
97112
}
98113

114+
/// Start a worker thread.
115+
fn start_new_thread<'e, 's: 'e>(&'s self, scope: &Scope<'e>) {
116+
let id = self.counter.fetch_add(1, Ordering::SeqCst /* ??? */);
117+
let m = Arc::new(Machine::new(id, Processor::new()));
118+
119+
self.stealers
120+
.write()
121+
.unwrap()
122+
.push((m.processor.lock().worker.stealer(), id));
123+
124+
scope
125+
.builder()
126+
.name("async-std/machine".to_string())
127+
.spawn(move |_| {
128+
abort_on_panic(|| {
129+
let _ = MACHINE.with(|machine| machine.set(m.clone()));
130+
m.run(self);
131+
})
132+
})
133+
.expect("cannot start a machine thread");
134+
}
135+
99136
/// Runs the runtime on the current thread.
100137
pub fn run(&self) {
101138
scope(|s| {
102-
for m in &self.machines {
103-
s.builder()
104-
.name("async-std/machine".to_string())
105-
.spawn(move |_| {
106-
abort_on_panic(|| {
107-
let _ = MACHINE.with(|machine| machine.set(m.clone()));
108-
m.run(self);
109-
})
110-
})
111-
.expect("cannot start a machine thread");
139+
(0..self.min_worker).for_each(|_| self.start_new_thread(s));
140+
141+
loop {
142+
match self.reciever.recv().unwrap() {
143+
Action::ScaleUp => self.start_new_thread(s),
144+
Action::ScaleDown => {
145+
// Random worker thread will recieve this notification
146+
// and terminate itself
147+
self.injector.push(Task::Terminate)
148+
}
149+
Action::Terminate => return,
150+
}
112151
}
113152
})
114153
.unwrap();
115154
}
116155

156+
/// Create more worker thread for the runtime
157+
pub fn scale_up(&self) {
158+
self.sender.send(Action::ScaleUp).unwrap()
159+
}
160+
161+
/// Terminate 1 worker thread, it is guaranteed that
162+
/// the number of worker thread won't go down
163+
/// below the number of available cpu
164+
pub fn scale_down(&self) {
165+
if self.stealers.read().unwrap().len() > self.min_worker {
166+
self.sender.send(Action::ScaleDown).unwrap()
167+
}
168+
}
169+
170+
/// Deregister worker thread by theirs id
171+
fn deregister(&self, id: usize) {
172+
let mut stealers = self.stealers.write().unwrap();
173+
let mut index = None;
174+
for i in 0..stealers.len() {
175+
if stealers[i].1 == id {
176+
index = Some(i);
177+
break;
178+
}
179+
}
180+
if let Some(index) = index {
181+
stealers.remove(index);
182+
if stealers.is_empty() {
183+
self.sender.send(Action::Terminate).unwrap();
184+
}
185+
}
186+
}
187+
117188
/// Unparks a thread polling the reactor.
118189
fn notify(&self) {
119190
atomic::fence(Ordering::SeqCst);
@@ -140,23 +211,34 @@ impl Runtime {
140211
struct Machine {
141212
/// Holds the processor until it gets stolen.
142213
processor: Spinlock<Processor>,
214+
215+
id: usize,
216+
drained: AtomicBool,
143217
}
144218

145219
impl Machine {
146220
/// Creates a new machine running a processor.
147-
fn new(p: Processor) -> Machine {
221+
fn new(id: usize, p: Processor) -> Machine {
148222
Machine {
149223
processor: Spinlock::new(p),
224+
id,
225+
drained: AtomicBool::new(false),
150226
}
151227
}
152228

153229
/// Schedules a task onto the machine.
154-
fn schedule(&self, rt: &Runtime, task: Runnable) {
155-
self.processor.lock().schedule(rt, task);
230+
fn schedule(&self, rt: &Runtime, task: Task) {
231+
if !self.drained.load(Ordering::SeqCst /* ??? */) {
232+
self.processor.lock().schedule(rt, task);
233+
} else {
234+
// We don't accept task anymore,
235+
// push to global queue
236+
rt.injector.push(task);
237+
}
156238
}
157239

158240
/// Finds the next runnable task.
159-
fn find_task(&self, rt: &Runtime) -> Steal<Runnable> {
241+
fn find_task(&self, rt: &Runtime) -> Steal<Task> {
160242
let mut retry = false;
161243

162244
// First try finding a task in the local queue or in the global queue.
@@ -187,7 +269,11 @@ impl Machine {
187269
Steal::Success(task) => return Steal::Success(task),
188270
}
189271

190-
if retry { Steal::Retry } else { Steal::Empty }
272+
if retry {
273+
Steal::Retry
274+
} else {
275+
Steal::Empty
276+
}
191277
}
192278

193279
/// Runs the machine on the current thread.
@@ -229,7 +315,13 @@ impl Machine {
229315

230316
// Try to find a runnable task.
231317
if let Steal::Success(task) = self.find_task(rt) {
232-
task.run();
318+
match task {
319+
Task::Runnable(task) => task.run(),
320+
Task::Terminate => {
321+
self.deregister(rt);
322+
return;
323+
}
324+
}
233325
runs += 1;
234326
fails = 0;
235327
continue;
@@ -277,14 +369,21 @@ impl Machine {
277369
fails = 0;
278370
}
279371
}
372+
373+
/// deregister this worker thread from Runtime
374+
fn deregister(&self, rt: &Runtime) {
375+
self.drained.store(true, Ordering::SeqCst /* ??? */);
376+
self.processor.lock().drain(rt);
377+
rt.deregister(self.id);
378+
}
280379
}
281380

282381
struct Processor {
283382
/// The local task queue.
284-
worker: Worker<Runnable>,
383+
worker: Worker<Task>,
285384

286385
/// Contains the next task to run as an optimization that skips the queue.
287-
slot: Option<Runnable>,
386+
slot: Option<Task>,
288387
}
289388

290389
impl Processor {
@@ -297,7 +396,7 @@ impl Processor {
297396
}
298397

299398
/// Schedules a task to run on this processor.
300-
fn schedule(&mut self, rt: &Runtime, task: Runnable) {
399+
fn schedule(&mut self, rt: &Runtime, task: Task) {
301400
match self.slot.replace(task) {
302401
None => {}
303402
Some(task) => {
@@ -316,28 +415,39 @@ impl Processor {
316415
}
317416

318417
/// Pops a task from this processor.
319-
fn pop_task(&mut self) -> Option<Runnable> {
418+
fn pop_task(&mut self) -> Option<Task> {
320419
self.slot.take().or_else(|| self.worker.pop())
321420
}
322421

323422
/// Steals a task from the global queue.
324-
fn steal_from_global(&self, rt: &Runtime) -> Steal<Runnable> {
423+
fn steal_from_global(&self, rt: &Runtime) -> Steal<Task> {
325424
rt.injector.steal_batch_and_pop(&self.worker)
326425
}
327426

328427
/// Steals a task from other processors.
329-
fn steal_from_others(&self, rt: &Runtime) -> Steal<Runnable> {
428+
fn steal_from_others(&self, rt: &Runtime) -> Steal<Task> {
429+
let stealers = rt.stealers.read().unwrap();
430+
330431
// Pick a random starting point in the list of queues.
331-
let len = rt.stealers.len();
432+
let len = stealers.len();
332433
let start = random(len as u32) as usize;
333434

334435
// Create an iterator over stealers that starts from the chosen point.
335-
let (l, r) = rt.stealers.split_at(start);
436+
let (l, r) = stealers.split_at(start);
336437
let stealers = r.iter().chain(l.iter());
337438

338439
// Try stealing a batch of tasks from each queue.
339440
stealers
340-
.map(|s| s.steal_batch_and_pop(&self.worker))
441+
.map(|s| s.0.steal_batch_and_pop(&self.worker))
341442
.collect()
342443
}
444+
445+
// Move all pending tasks to global queue.
446+
fn drain(&mut self, rt: &Runtime) {
447+
self.flush_slot(rt);
448+
449+
while let Some(task) = self.worker.pop() {
450+
rt.injector.push(task);
451+
}
452+
}
343453
}

0 commit comments

Comments
 (0)