Skip to content

Commit b46c151

Browse files
committed
refactoring, now user can provide their monitor
1 parent 7ec8db1 commit b46c151

File tree

4 files changed

+168
-63
lines changed

4 files changed

+168
-63
lines changed

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ cfg_default! {
270270
pub mod fs;
271271
pub mod path;
272272
pub mod net;
273-
pub(crate) mod rt;
273+
pub mod rt;
274274
}
275275

276276
cfg_unstable! {

src/rt/mod.rs

+6-12
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,27 @@ use once_cell::sync::Lazy;
66

77
use crate::utils::abort_on_panic;
88

9-
pub use reactor::{Reactor, Watcher};
10-
pub use runtime::Runtime;
9+
pub(crate) use reactor::{Reactor, Watcher};
10+
pub(crate) use runtime::Runtime;
1111

12-
mod monitor;
12+
pub mod monitor;
1313
mod reactor;
1414
mod runtime;
1515

1616
/// The global runtime.
17-
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
17+
pub(crate) static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
1818
thread::Builder::new()
1919
.name("async-std/runtime".to_string())
2020
.spawn(|| abort_on_panic(|| RUNTIME.run()))
2121
.expect("cannot start a runtime thread");
2222

23-
monitor::spawn_thread();
24-
2523
Runtime::new()
2624
});
2725

28-
/// Spawn new worker thread
29-
pub fn scale_up() {
26+
pub(crate) fn scale_up() {
3027
RUNTIME.scale_up();
3128
}
3229

33-
/// Terminate one worker thread.
34-
/// The number of worker thread cannot go below the number of availabe cpus,
35-
/// so this function when will do nothing if that happen.
36-
pub fn scale_down() {
30+
pub(crate) fn scale_down() {
3731
RUNTIME.scale_down();
3832
}

src/rt/monitor.rs

+150-45
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,156 @@
1+
//! Monitor for the runtime.
2+
13
use std::sync::{Arc, Mutex};
24
use std::thread;
3-
use std::time::Duration;
5+
use std::time::{Duration, Instant};
6+
7+
use once_cell::sync::Lazy;
48

59
use crate::rt;
610
use crate::task;
7-
use crate::utils::abort_on_panic;
8-
9-
pub fn spawn_thread() {
10-
thread::Builder::new()
11-
.name("async-std/monitor".to_string())
12-
.spawn(|| {
13-
const PROBING_DURATION_MS: u64 = 500;
14-
const SCALING_DOWN_SEC: u64 = 1 * 60; // 1 minute
15-
16-
abort_on_panic(|| {
17-
let running = &Arc::new(Mutex::new(false));
18-
19-
{
20-
let running = Arc::clone(running);
21-
task::spawn(async move {
22-
loop {
23-
*running.lock().unwrap() = true;
24-
task::sleep(Duration::from_millis(PROBING_DURATION_MS)).await;
25-
}
26-
});
27-
}
28-
29-
{
30-
task::spawn(async {
31-
loop {
32-
task::sleep(Duration::from_secs(SCALING_DOWN_SEC)).await;
33-
rt::scale_down();
34-
}
35-
});
36-
}
37-
38-
loop {
39-
*running.lock().unwrap() = false;
40-
thread::sleep(Duration::from_millis(PROBING_DURATION_MS * 2));
41-
if !*running.lock().unwrap() {
42-
eprintln!(
43-
"WARNING: You are blocking the runtime, please use spawn_blocking"
44-
);
45-
rt::scale_up();
46-
}
47-
}
48-
})
49-
})
50-
.expect("cannot start a monitor thread");
11+
12+
/// Context of monitor thread
13+
#[derive(Debug)]
14+
pub struct Context(());
15+
16+
impl Context {
17+
/// Spawn a new worker thread.
18+
pub fn scale_up(&self) {
19+
rt::scale_up();
20+
}
21+
22+
/// Terminate a worker thread if the number of current worker thread
23+
/// is greater than number avaliable cpu
24+
pub fn scale_down(&self) {
25+
rt::scale_down();
26+
}
27+
}
28+
29+
pub(crate) static MONITOR: Lazy<Mutex<&'static Monitor>> =
30+
Lazy::new(|| Mutex::new(&DEFAULT_MONITOR));
31+
32+
/// Monitor function.
33+
pub struct Monitor(pub fn(&Context));
34+
35+
impl Monitor {
36+
pub(crate) fn run(&self) {
37+
let scaler = Context(());
38+
self.0(&scaler);
39+
}
40+
}
41+
42+
impl std::fmt::Debug for Monitor {
43+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44+
f.write_str("<monitor>")
45+
}
46+
}
47+
48+
/// Replace the monitor.
49+
///
50+
/// This will replace monitor function used by monitor thread by runtime.
51+
///
52+
/// Monitor can only be replaced before any task spawned into runtime.
53+
///
54+
/// Default monitor is [`DEFAULT_MONITOR`]
55+
///
56+
/// [`DEFAULT_MONITOR`]: static.DEFAULT_MONITOR.html
57+
pub fn replace(new: &'static Monitor) -> Result<&'static Monitor, &'static str> {
58+
let mut m = MONITOR.try_lock().map_err(|_| "Cannot change monitor")?;
59+
let old: &'static Monitor = &m;
60+
*m = new;
61+
Ok(old)
62+
}
63+
64+
async fn repeater(interval: Duration, mut before: impl FnMut(), mut after: impl FnMut()) {
65+
loop {
66+
before();
67+
task::sleep(interval).await;
68+
after();
69+
}
70+
}
71+
72+
fn repeater_blocking(interval: Duration, mut before: impl FnMut(), mut after: impl FnMut()) {
73+
loop {
74+
before();
75+
thread::sleep(interval);
76+
after();
77+
}
78+
}
79+
80+
/// Default monitor
81+
///
82+
/// Whenever runtime is blocked, print warning and scale up the runtime.
83+
/// This also try to scale down when there are too many worker thread.
84+
pub static DEFAULT_MONITOR: Monitor = Monitor(default_monitor);
85+
86+
fn default_monitor(scaler: &Context) {
87+
const PROB_INTERVAL: Duration = Duration::from_millis(500);
88+
const SCALE_DOWN_INTERVAL: Duration = Duration::from_secs(60);
89+
90+
let running = &Arc::new(Mutex::new(false));
91+
92+
{
93+
let running = Arc::clone(running);
94+
task::spawn(repeater(
95+
PROB_INTERVAL,
96+
move || {
97+
*running.lock().unwrap() = true;
98+
},
99+
|| {},
100+
));
101+
}
102+
103+
let mut next_scalling_down = Instant::now() + SCALE_DOWN_INTERVAL;
104+
105+
repeater_blocking(
106+
PROB_INTERVAL + Duration::from_millis(50),
107+
|| {
108+
*running.lock().unwrap() = false;
109+
},
110+
move || {
111+
if !*running.lock().unwrap() {
112+
eprintln!("WARNING: You are blocking the runtime, please use spawn_blocking");
113+
scaler.scale_up();
114+
}
115+
116+
if next_scalling_down < Instant::now() {
117+
scaler.scale_down();
118+
next_scalling_down += SCALE_DOWN_INTERVAL;
119+
}
120+
},
121+
);
122+
}
123+
124+
/// Abort on blocking monitor.
125+
///
126+
/// Whenever runtime is blocked, abort the program.
127+
pub static ABORT_ON_BLOCKING_MONITOR: Monitor = Monitor(abort_on_blocking_monitor);
128+
129+
fn abort_on_blocking_monitor(_scaler: &Context) {
130+
const PROB_INTERVAL: Duration = Duration::from_secs(1);
131+
132+
let running = &Arc::new(Mutex::new(false));
133+
134+
{
135+
let running = Arc::clone(running);
136+
task::spawn(repeater(
137+
PROB_INTERVAL,
138+
move || {
139+
*running.lock().unwrap() = true;
140+
},
141+
|| {},
142+
));
143+
}
144+
145+
repeater_blocking(
146+
PROB_INTERVAL + Duration::from_millis(50),
147+
|| {
148+
*running.lock().unwrap() = false;
149+
},
150+
|| {
151+
if !*running.lock().unwrap() {
152+
panic!("FATAL: You are blocking the runtime, please use spawn_blocking");
153+
}
154+
},
155+
);
51156
}

src/rt/runtime.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker};
1212
use crossbeam_utils::thread::{scope, Scope};
1313
use once_cell::unsync::OnceCell;
1414

15+
use crate::rt::monitor::MONITOR;
1516
use crate::rt::Reactor;
1617
use crate::sync::Spinlock;
1718
use crate::task::Runnable;
@@ -40,7 +41,6 @@ enum Task {
4041
enum Action {
4142
ScaleUp,
4243
ScaleDown,
43-
Terminate,
4444
}
4545

4646
/// An async runtime.
@@ -138,6 +138,16 @@ impl Runtime {
138138
scope(|s| {
139139
(0..self.min_worker).for_each(|_| self.start_new_thread(s));
140140

141+
s.builder()
142+
.name("async-std/monitor".to_string())
143+
.spawn(move |_| {
144+
abort_on_panic(|| {
145+
MONITOR.lock().unwrap().run();
146+
panic!("Monitor function must not return");
147+
})
148+
})
149+
.expect("cannot start a monitor thread");
150+
141151
loop {
142152
match self.reciever.recv().unwrap() {
143153
Action::ScaleUp => self.start_new_thread(s),
@@ -146,7 +156,6 @@ impl Runtime {
146156
// and terminate itself
147157
self.injector.push(Task::Terminate)
148158
}
149-
Action::Terminate => return,
150159
}
151160
}
152161
})
@@ -179,9 +188,6 @@ impl Runtime {
179188
}
180189
if let Some(index) = index {
181190
stealers.remove(index);
182-
if stealers.is_empty() {
183-
self.sender.send(Action::Terminate).unwrap();
184-
}
185191
}
186192
}
187193

0 commit comments

Comments
 (0)