Skip to content

Commit 4476a85

Browse files
committed
refactoring, now user can provide their monitor
1 parent 7ec8db1 commit 4476a85

File tree

4 files changed

+142
-63
lines changed

4 files changed

+142
-63
lines changed

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ cfg_default! {
270270
pub mod fs;
271271
pub mod path;
272272
pub mod net;
273-
pub(crate) mod rt;
273+
pub mod rt;
274274
}
275275

276276
cfg_unstable! {

src/rt/mod.rs

+6-12
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,27 @@ use once_cell::sync::Lazy;
66

77
use crate::utils::abort_on_panic;
88

9-
pub use reactor::{Reactor, Watcher};
10-
pub use runtime::Runtime;
9+
pub(crate) use reactor::{Reactor, Watcher};
10+
pub(crate) use runtime::Runtime;
1111

12-
mod monitor;
12+
pub mod monitor;
1313
mod reactor;
1414
mod runtime;
1515

1616
/// The global runtime.
17-
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
17+
pub(crate) static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
1818
thread::Builder::new()
1919
.name("async-std/runtime".to_string())
2020
.spawn(|| abort_on_panic(|| RUNTIME.run()))
2121
.expect("cannot start a runtime thread");
2222

23-
monitor::spawn_thread();
24-
2523
Runtime::new()
2624
});
2725

28-
/// Spawn new worker thread
29-
pub fn scale_up() {
26+
pub(crate) fn scale_up() {
3027
RUNTIME.scale_up();
3128
}
3229

33-
/// Terminate one worker thread.
34-
/// The number of worker thread cannot go below the number of availabe cpus,
35-
/// so this function when will do nothing if that happen.
36-
pub fn scale_down() {
30+
pub(crate) fn scale_down() {
3731
RUNTIME.scale_down();
3832
}

src/rt/monitor.rs

+124-45
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,130 @@
1+
//! Monitor for the runtime.
2+
3+
use std::sync::atomic::{AtomicBool, Ordering};
14
use std::sync::{Arc, Mutex};
25
use std::thread;
3-
use std::time::Duration;
6+
use std::time::{Duration, Instant};
7+
8+
use once_cell::sync::Lazy;
49

510
use crate::rt;
611
use crate::task;
7-
use crate::utils::abort_on_panic;
8-
9-
pub fn spawn_thread() {
10-
thread::Builder::new()
11-
.name("async-std/monitor".to_string())
12-
.spawn(|| {
13-
const PROBING_DURATION_MS: u64 = 500;
14-
const SCALING_DOWN_SEC: u64 = 1 * 60; // 1 minute
15-
16-
abort_on_panic(|| {
17-
let running = &Arc::new(Mutex::new(false));
18-
19-
{
20-
let running = Arc::clone(running);
21-
task::spawn(async move {
22-
loop {
23-
*running.lock().unwrap() = true;
24-
task::sleep(Duration::from_millis(PROBING_DURATION_MS)).await;
25-
}
26-
});
27-
}
28-
29-
{
30-
task::spawn(async {
31-
loop {
32-
task::sleep(Duration::from_secs(SCALING_DOWN_SEC)).await;
33-
rt::scale_down();
34-
}
35-
});
36-
}
37-
38-
loop {
39-
*running.lock().unwrap() = false;
40-
thread::sleep(Duration::from_millis(PROBING_DURATION_MS * 2));
41-
if !*running.lock().unwrap() {
42-
eprintln!(
43-
"WARNING: You are blocking the runtime, please use spawn_blocking"
44-
);
45-
rt::scale_up();
46-
}
47-
}
48-
})
49-
})
50-
.expect("cannot start a monitor thread");
12+
13+
/// Context of monitor thread
14+
#[derive(Debug)]
15+
pub struct Context(());
16+
17+
impl Context {
18+
/// Spawn a new worker thread.
19+
pub fn scale_up(&self) {
20+
rt::scale_up();
21+
}
22+
23+
/// Terminate a worker thread if the number of current worker thread
24+
/// is greater than number avaliable cpu
25+
pub fn scale_down(&self) {
26+
rt::scale_down();
27+
}
28+
}
29+
30+
pub(crate) static MONITOR: Lazy<Mutex<&'static Monitor>> = Lazy::new(|| Mutex::new(&DEFAULT));
31+
32+
/// Monitor function.
33+
pub struct Monitor(pub fn(&Context));
34+
35+
impl Monitor {
36+
pub(crate) fn run(&self) {
37+
let ctx = Context(());
38+
self.0(&ctx);
39+
}
40+
}
41+
42+
impl std::fmt::Debug for Monitor {
43+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44+
f.write_str("Monitor")
45+
}
46+
}
47+
48+
/// Replace the monitor.
49+
///
50+
/// This will replace monitor function used by monitor thread by runtime.
51+
///
52+
/// Monitor can only be replaced before any task spawned into runtime.
53+
///
54+
/// Default monitor is [`DEFAULT`]
55+
///
56+
/// [`DEFAULT`]: static.DEFAULT.html
57+
pub fn replace(new: &'static Monitor) -> Result<&'static Monitor, &'static str> {
58+
let mut m = MONITOR.try_lock().map_err(|_| "Cannot change monitor")?;
59+
let old: &'static Monitor = &m;
60+
*m = new;
61+
Ok(old)
62+
}
63+
64+
/// Default monitor
65+
///
66+
/// Whenever runtime is blocked, print warning and scale up the runtime.
67+
/// This also try to scale down when there are too many worker thread.
68+
pub static DEFAULT: Monitor = Monitor(default);
69+
70+
fn default(ctx: &Context) {
71+
const PROB_INTERVAL: Duration = Duration::from_millis(500);
72+
const SCALE_DOWN_INTERVAL: Duration = Duration::from_secs(10);
73+
74+
let running = &Arc::new(AtomicBool::new(false));
75+
76+
{
77+
let running = Arc::clone(running);
78+
task::spawn(async move {
79+
loop {
80+
running.store(true, Ordering::SeqCst);
81+
task::sleep(PROB_INTERVAL).await;
82+
}
83+
});
84+
}
85+
86+
let mut next_scalling_down = Instant::now() + SCALE_DOWN_INTERVAL;
87+
88+
loop {
89+
running.store(false, Ordering::SeqCst);
90+
thread::sleep(PROB_INTERVAL + Duration::from_millis(10));
91+
if !running.load(Ordering::SeqCst) {
92+
eprintln!("WARNING: You are blocking the runtime, please use spawn_blocking");
93+
ctx.scale_up();
94+
}
95+
96+
if next_scalling_down <= Instant::now() {
97+
ctx.scale_down();
98+
next_scalling_down += SCALE_DOWN_INTERVAL;
99+
}
100+
}
101+
}
102+
103+
/// Abort on blocking.
104+
///
105+
/// Will panic whenever runtime is blocked.
106+
pub static PANIC_ON_BLOCKING: Monitor = Monitor(panic_on_blocking);
107+
108+
fn panic_on_blocking(_ctx: &Context) {
109+
const PROB_INTERVAL: Duration = Duration::from_millis(500);
110+
111+
let running = &Arc::new(AtomicBool::new(false));
112+
113+
{
114+
let running = Arc::clone(running);
115+
task::spawn(async move {
116+
loop {
117+
running.store(true, Ordering::SeqCst);
118+
task::sleep(PROB_INTERVAL).await;
119+
}
120+
});
121+
}
122+
123+
loop {
124+
running.store(false, Ordering::SeqCst);
125+
thread::sleep(PROB_INTERVAL + PROB_INTERVAL);
126+
if !running.load(Ordering::SeqCst) {
127+
panic!("FATAL: You are blocking the runtime, please use spawn_blocking");
128+
}
129+
}
51130
}

src/rt/runtime.rs

+11-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crossbeam_deque::{Injector, Steal, Stealer, Worker};
1212
use crossbeam_utils::thread::{scope, Scope};
1313
use once_cell::unsync::OnceCell;
1414

15+
use crate::rt::monitor::MONITOR;
1516
use crate::rt::Reactor;
1617
use crate::sync::Spinlock;
1718
use crate::task::Runnable;
@@ -40,7 +41,6 @@ enum Task {
4041
enum Action {
4142
ScaleUp,
4243
ScaleDown,
43-
Terminate,
4444
}
4545

4646
/// An async runtime.
@@ -138,6 +138,16 @@ impl Runtime {
138138
scope(|s| {
139139
(0..self.min_worker).for_each(|_| self.start_new_thread(s));
140140

141+
s.builder()
142+
.name("async-std/monitor".to_string())
143+
.spawn(move |_| {
144+
abort_on_panic(|| {
145+
MONITOR.lock().unwrap().run();
146+
panic!("Monitor function must not return");
147+
})
148+
})
149+
.expect("cannot start a monitor thread");
150+
141151
loop {
142152
match self.reciever.recv().unwrap() {
143153
Action::ScaleUp => self.start_new_thread(s),
@@ -146,7 +156,6 @@ impl Runtime {
146156
// and terminate itself
147157
self.injector.push(Task::Terminate)
148158
}
149-
Action::Terminate => return,
150159
}
151160
}
152161
})
@@ -179,9 +188,6 @@ impl Runtime {
179188
}
180189
if let Some(index) = index {
181190
stealers.remove(index);
182-
if stealers.is_empty() {
183-
self.sender.send(Action::Terminate).unwrap();
184-
}
185191
}
186192
}
187193

0 commit comments

Comments
 (0)