Skip to content

Commit 8920bfb

Browse files
committed
adding monitor
1 parent 1fcb2d5 commit 8920bfb

File tree

3 files changed

+62
-5
lines changed

3 files changed

+62
-5
lines changed

src/rt/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::utils::abort_on_panic;
99
pub use reactor::{Reactor, Watcher};
1010
pub use runtime::Runtime;
1111

12+
mod monitor;
1213
mod reactor;
1314
mod runtime;
1415

@@ -21,3 +22,11 @@ pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
2122

2223
Runtime::new()
2324
});
25+
26+
pub fn scale_up() {
27+
RUNTIME.scale_up();
28+
}
29+
30+
pub fn scale_down() {
31+
RUNTIME.scale_down();
32+
}

src/rt/monitor.rs

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//! Monitor for the runtime.
2+
3+
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::sync::Arc;
5+
use std::thread;
6+
use std::time::{Duration, Instant};
7+
8+
use crate::rt;
9+
use crate::task;
10+
11+
pub fn run() {
12+
const PROB_INTERVAL: Duration = Duration::from_millis(500);
13+
const SCALE_DOWN_INTERVAL: Duration = Duration::from_secs(5);
14+
15+
let running = &Arc::new(AtomicBool::new(false));
16+
17+
{
18+
let running = Arc::clone(running);
19+
task::spawn(async move {
20+
loop {
21+
running.store(true, Ordering::SeqCst);
22+
task::sleep(PROB_INTERVAL).await;
23+
}
24+
});
25+
}
26+
27+
let mut next_scalling_down = Instant::now() + SCALE_DOWN_INTERVAL;
28+
29+
loop {
30+
running.store(false, Ordering::SeqCst);
31+
thread::sleep(PROB_INTERVAL + Duration::from_millis(10));
32+
if !running.load(Ordering::SeqCst) {
33+
eprintln!("WARNING: You are blocking the runtime, please use spawn_blocking");
34+
rt::scale_up();
35+
}
36+
37+
if next_scalling_down <= Instant::now() {
38+
rt::scale_down();
39+
next_scalling_down += SCALE_DOWN_INTERVAL;
40+
}
41+
}
42+
}

src/rt/runtime.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker};
1212
use crossbeam_utils::thread::{scope, Scope};
1313
use once_cell::unsync::OnceCell;
1414

15+
use crate::rt::monitor;
1516
use crate::rt::Reactor;
1617
use crate::sync::Spinlock;
1718
use crate::task::Runnable;
@@ -40,7 +41,6 @@ enum Task {
4041
enum Action {
4142
ScaleUp,
4243
ScaleDown,
43-
Terminate,
4444
}
4545

4646
/// An async runtime.
@@ -138,6 +138,16 @@ impl Runtime {
138138
scope(|s| {
139139
(0..self.min_worker).for_each(|_| self.start_new_thread(s));
140140

141+
s.builder()
142+
.name("async-std/monitor".to_string())
143+
.spawn(move |_| {
144+
abort_on_panic(|| {
145+
monitor::run();
146+
panic!("Monitor function must not return");
147+
})
148+
})
149+
.expect("cannot start a monitor thread");
150+
141151
loop {
142152
match self.reciever.recv().unwrap() {
143153
Action::ScaleUp => self.start_new_thread(s),
@@ -146,7 +156,6 @@ impl Runtime {
146156
// and terminate itself
147157
self.injector.push(Task::Terminate)
148158
}
149-
Action::Terminate => return,
150159
}
151160
}
152161
})
@@ -179,9 +188,6 @@ impl Runtime {
179188
}
180189
if let Some(index) = index {
181190
stealers.remove(index);
182-
if stealers.is_empty() {
183-
self.sender.send(Action::Terminate).unwrap();
184-
}
185191
}
186192
}
187193

0 commit comments

Comments
 (0)