Skip to content

Commit 7714038

Browse files
committed
refactoring, now user can provide their monitor
1 parent 7ec8db1 commit 7714038

File tree

4 files changed

+140
-55
lines changed

4 files changed

+140
-55
lines changed

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ cfg_default! {
270270
pub mod fs;
271271
pub mod path;
272272
pub mod net;
273-
pub(crate) mod rt;
273+
pub mod rt;
274274
}
275275

276276
cfg_unstable! {

src/rt/mod.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,25 @@ use once_cell::sync::Lazy;
66

77
use crate::utils::abort_on_panic;
88

9-
pub use reactor::{Reactor, Watcher};
10-
pub use runtime::Runtime;
9+
pub(crate) use reactor::{Reactor, Watcher};
10+
pub(crate) use runtime::Runtime;
1111

1212
mod monitor;
13+
pub use monitor::replace_monitor;
14+
pub use monitor::Monitor;
15+
pub use monitor::ABORT_ON_BLOCKING_MONITOR;
16+
pub use monitor::DEFAULT_MONITOR;
17+
1318
mod reactor;
1419
mod runtime;
1520

1621
/// The global runtime.
17-
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
22+
pub(crate) static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
1823
thread::Builder::new()
1924
.name("async-std/runtime".to_string())
2025
.spawn(|| abort_on_panic(|| RUNTIME.run()))
2126
.expect("cannot start a runtime thread");
2227

23-
monitor::spawn_thread();
24-
2528
Runtime::new()
2629
});
2730

@@ -31,6 +34,7 @@ pub fn scale_up() {
3134
}
3235

3336
/// Terminate one worker thread.
37+
///
3438
/// The number of worker thread cannot go below the number of availabe cpus,
3539
/// so this function when will do nothing if that happen.
3640
pub fn scale_down() {

src/rt/monitor.rs

+120-44
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,126 @@ use std::sync::{Arc, Mutex};
22
use std::thread;
33
use std::time::Duration;
44

5+
use once_cell::sync::Lazy;
6+
57
use crate::rt;
68
use crate::task;
7-
use crate::utils::abort_on_panic;
8-
9-
pub fn spawn_thread() {
10-
thread::Builder::new()
11-
.name("async-std/monitor".to_string())
12-
.spawn(|| {
13-
const PROBING_DURATION_MS: u64 = 500;
14-
const SCALING_DOWN_SEC: u64 = 1 * 60; // 1 minute
15-
16-
abort_on_panic(|| {
17-
let running = &Arc::new(Mutex::new(false));
18-
19-
{
20-
let running = Arc::clone(running);
21-
task::spawn(async move {
22-
loop {
23-
*running.lock().unwrap() = true;
24-
task::sleep(Duration::from_millis(PROBING_DURATION_MS)).await;
25-
}
26-
});
27-
}
28-
29-
{
30-
task::spawn(async {
31-
loop {
32-
task::sleep(Duration::from_secs(SCALING_DOWN_SEC)).await;
33-
rt::scale_down();
34-
}
35-
});
36-
}
37-
38-
loop {
39-
*running.lock().unwrap() = false;
40-
thread::sleep(Duration::from_millis(PROBING_DURATION_MS * 2));
41-
if !*running.lock().unwrap() {
42-
eprintln!(
43-
"WARNING: You are blocking the runtime, please use spawn_blocking"
44-
);
45-
rt::scale_up();
46-
}
47-
}
48-
})
49-
})
50-
.expect("cannot start a monitor thread");
9+
10+
pub(crate) static MONITOR: Lazy<Mutex<&'static Monitor>> =
11+
Lazy::new(|| Mutex::new(&DEFAULT_MONITOR));
12+
13+
/// Monitor function.
14+
#[derive(Debug)]
15+
pub struct Monitor(pub fn());
16+
17+
impl Monitor {
18+
pub(crate) fn run(&self) {
19+
self.0();
20+
}
21+
}
22+
23+
/// Replace the monitor.
24+
///
25+
/// This will replace monitor function used by monitor thread by runtime.
26+
///
27+
/// Monitor thread is special thread that can be used to monitor wether
28+
/// the runtime is blocked or not.
29+
///
30+
/// Default monitor is [`DEFAULT_MONITOR`]
31+
///
32+
/// [`DEFAULT_MONITOR`]: static.DEFAULT_MONITOR.html
33+
pub fn replace_monitor(new: &'static Monitor) -> &'static Monitor {
34+
let mut m = MONITOR.lock().unwrap();
35+
let old = &m as &'static Monitor;
36+
*m = new;
37+
old
38+
}
39+
40+
/// Default monitor
41+
///
42+
/// Whenever runtime is blocked, print warning and scale up the runtime.
43+
/// This also try to scale down when there are too many worker thread.
44+
pub static DEFAULT_MONITOR: Monitor = Monitor(default_monitor);
45+
46+
/// Abort on blocking monitor.
47+
///
48+
/// Whenever runtime is blocked, abort the program.
49+
pub static ABORT_ON_BLOCKING_MONITOR: Monitor = Monitor(abort_on_blocking_monitor);
50+
51+
async fn repeater(interval: Duration, before: impl Fn(), after: impl Fn()) {
52+
loop {
53+
before();
54+
task::sleep(interval).await;
55+
after();
56+
}
57+
}
58+
59+
fn repeater_blocking(interval: Duration, before: impl Fn(), after: impl Fn()) {
60+
loop {
61+
before();
62+
thread::sleep(interval);
63+
after();
64+
}
65+
}
66+
67+
fn default_monitor() {
68+
const PROB_INTERVAL: Duration = Duration::from_millis(500);
69+
const SCALE_DOWN_INTERVAL: Duration = Duration::from_secs(60);
70+
71+
let running = &Arc::new(Mutex::new(false));
72+
73+
{
74+
let running = Arc::clone(running);
75+
task::spawn(repeater(
76+
PROB_INTERVAL,
77+
move || {
78+
*running.lock().unwrap() = true;
79+
},
80+
|| {},
81+
));
82+
}
83+
84+
task::spawn(repeater(SCALE_DOWN_INTERVAL, || rt::scale_down(), || {}));
85+
86+
repeater_blocking(
87+
PROB_INTERVAL + Duration::from_millis(50),
88+
|| {
89+
*running.lock().unwrap() = false;
90+
},
91+
|| {
92+
if !*running.lock().unwrap() {
93+
eprintln!("WARNING: You are blocking the runtime, please use spawn_blocking");
94+
rt::scale_up();
95+
}
96+
},
97+
);
98+
}
99+
100+
fn abort_on_blocking_monitor() {
101+
const PROB_INTERVAL: Duration = Duration::from_secs(1);
102+
103+
let running = &Arc::new(Mutex::new(false));
104+
105+
{
106+
let running = Arc::clone(running);
107+
task::spawn(repeater(
108+
PROB_INTERVAL,
109+
move || {
110+
*running.lock().unwrap() = true;
111+
},
112+
|| {},
113+
));
114+
}
115+
116+
repeater_blocking(
117+
PROB_INTERVAL + Duration::from_millis(50),
118+
|| {
119+
*running.lock().unwrap() = false;
120+
},
121+
|| {
122+
if !*running.lock().unwrap() {
123+
panic!("FATAL: You are blocking the runtime, please use spawn_blocking");
124+
}
125+
},
126+
);
51127
}

src/rt/runtime.rs

+10-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker};
1212
use crossbeam_utils::thread::{scope, Scope};
1313
use once_cell::unsync::OnceCell;
1414

15+
use crate::rt::monitor::MONITOR;
1516
use crate::rt::Reactor;
1617
use crate::sync::Spinlock;
1718
use crate::task::Runnable;
@@ -40,7 +41,6 @@ enum Task {
4041
enum Action {
4142
ScaleUp,
4243
ScaleDown,
43-
Terminate,
4444
}
4545

4646
/// An async runtime.
@@ -138,6 +138,15 @@ impl Runtime {
138138
scope(|s| {
139139
(0..self.min_worker).for_each(|_| self.start_new_thread(s));
140140

141+
s.builder()
142+
.name("async-std/monitor".to_string())
143+
.spawn(move |_| {
144+
abort_on_panic(|| {
145+
MONITOR.lock().unwrap().run();
146+
})
147+
})
148+
.expect("cannot start a monitor thread");
149+
141150
loop {
142151
match self.reciever.recv().unwrap() {
143152
Action::ScaleUp => self.start_new_thread(s),
@@ -146,7 +155,6 @@ impl Runtime {
146155
// and terminate itself
147156
self.injector.push(Task::Terminate)
148157
}
149-
Action::Terminate => return,
150158
}
151159
}
152160
})
@@ -179,9 +187,6 @@ impl Runtime {
179187
}
180188
if let Some(index) = index {
181189
stealers.remove(index);
182-
if stealers.is_empty() {
183-
self.sender.send(Action::Terminate).unwrap();
184-
}
185190
}
186191
}
187192

0 commit comments

Comments
 (0)