Skip to content

Commit be3f9f5

Browse files
committed
Implement the ability to block on multiple futures at once
In the next commits we'll be adding a second notify pipeline - from the `ChainMonitor` back to the background processor. This will cause the `background-processor` to need to await multiple wakers at once, which we cannot do in the current scheme without taking on a full async runtime. Building a multi-future waiter isn't so bad, and notably will allow us to remove the existing sleep pipeline entirely, reducing the complexity of our wakers implementation by only having one notify path to consider.
1 parent eeb721d commit be3f9f5

File tree

1 file changed

+95
-62
lines changed

1 file changed

+95
-62
lines changed

lightning/src/util/wakers.rs

Lines changed: 95 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -30,75 +30,22 @@ use core::pin::Pin;
3030
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
3131
pub(crate) struct Notifier {
3232
notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
33-
condvar: Condvar,
34-
}
35-
36-
macro_rules! check_woken {
37-
($guard: expr, $retval: expr) => { {
38-
if $guard.0 {
39-
$guard.0 = false;
40-
if $guard.1.as_ref().map(|l| l.lock().unwrap().complete).unwrap_or(false) {
41-
// If we're about to return as woken, and the future state is marked complete, wipe
42-
// the future state and let the next future wait until we get a new notify.
43-
$guard.1.take();
44-
}
45-
return $retval;
46-
}
47-
} }
4833
}
4934

5035
impl Notifier {
5136
pub(crate) fn new() -> Self {
5237
Self {
5338
notify_pending: Mutex::new((false, None)),
54-
condvar: Condvar::new(),
55-
}
56-
}
57-
58-
fn propagate_future_state_to_notify_flag(&self) -> MutexGuard<(bool, Option<Arc<Mutex<FutureState>>>)> {
59-
let mut lock = self.notify_pending.lock().unwrap();
60-
if let Some(existing_state) = &lock.1 {
61-
if existing_state.lock().unwrap().callbacks_made {
62-
// If the existing `FutureState` has completed and actually made callbacks,
63-
// consider the notification flag to have been cleared and reset the future state.
64-
lock.1.take();
65-
lock.0 = false;
66-
}
6739
}
68-
lock
6940
}
7041

7142
pub(crate) fn wait(&self) {
72-
loop {
73-
let mut guard = self.propagate_future_state_to_notify_flag();
74-
check_woken!(guard, ());
75-
guard = self.condvar.wait(guard).unwrap();
76-
check_woken!(guard, ());
77-
}
43+
Sleeper::from_single_future(self.get_future()).wait();
7844
}
7945

8046
#[cfg(any(test, feature = "std"))]
8147
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
82-
let current_time = Instant::now();
83-
loop {
84-
let mut guard = self.propagate_future_state_to_notify_flag();
85-
check_woken!(guard, true);
86-
guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0;
87-
check_woken!(guard, true);
88-
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
89-
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
90-
// time. Note that this logic can be highly simplified through the use of
91-
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
92-
// 1.42.0.
93-
let elapsed = current_time.elapsed();
94-
if elapsed >= max_wait {
95-
return false;
96-
}
97-
match max_wait.checked_sub(elapsed) {
98-
None => return false,
99-
Some(_) => continue
100-
}
101-
}
48+
Sleeper::from_single_future(self.get_future()).wait_timeout(max_wait)
10249
}
10350

10451
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
@@ -111,13 +58,19 @@ impl Notifier {
11158
}
11259
}
11360
lock.0 = true;
114-
mem::drop(lock);
115-
self.condvar.notify_all();
11661
}
11762

11863
/// Gets a [`Future`] that will get woken up with any waiters
11964
pub(crate) fn get_future(&self) -> Future {
120-
let mut lock = self.propagate_future_state_to_notify_flag();
65+
let mut lock = self.notify_pending.lock().unwrap();
66+
if let Some(existing_state) = &lock.1 {
67+
if existing_state.lock().unwrap().callbacks_made {
68+
// If the existing `FutureState` has completed and actually made callbacks,
69+
// consider the notification flag to have been cleared and reset the future state.
70+
lock.1.take();
71+
lock.0 = false;
72+
}
73+
}
12174
if let Some(existing_state) = &lock.1 {
12275
Future { state: Arc::clone(&existing_state) }
12376
} else {
@@ -175,6 +128,9 @@ impl FutureState {
175128
}
176129

177130
/// A simple future which can complete once, and calls some callback(s) when it does so.
131+
///
132+
/// Clones can be made and all futures cloned from the same source will complete at the same time.
133+
#[derive(Clone)]
178134
pub struct Future {
179135
state: Arc<Mutex<FutureState>>,
180136
}
@@ -229,6 +185,86 @@ impl<'a> StdFuture for Future {
229185
}
230186
}
231187

188+
/// A struct which can be used to await multiple [`Future`]s at once without relying on a full
189+
/// async context.
190+
pub struct Sleeper {
191+
notifiers: Vec<Arc<Mutex<FutureState>>>,
192+
}
193+
194+
impl Sleeper {
195+
/// Constructs a new sleeper from one future, allowing blocking on it.
196+
pub fn from_single_future(future: Future) -> Self {
197+
Self { notifiers: vec![future.state] }
198+
}
199+
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
200+
// Note that this is the common case - a ChannelManager and ChainMonitor.
201+
pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
202+
Self { notifiers: vec![fut_a.state, fut_b.state] }
203+
}
204+
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
205+
pub fn new(futures: Vec<Future>) -> Self {
206+
Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
207+
}
208+
fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
209+
let cv = Arc::new(Condvar::new());
210+
let notified_fut_mtx = Arc::new(Mutex::new(None));
211+
{
212+
for notifier_mtx in self.notifiers.iter() {
213+
let cv_ref = Arc::clone(&cv);
214+
let notified_ref = Arc::clone(&notified_fut_mtx);
215+
let notifier_ref = Arc::clone(&notifier_mtx);
216+
let mut notifier = notifier_mtx.lock().unwrap();
217+
if notifier.complete {
218+
*notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
219+
break;
220+
}
221+
notifier.callbacks.push((false, Box::new(move || {
222+
*notified_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
223+
cv_ref.notify_all();
224+
})));
225+
}
226+
}
227+
(cv, notified_fut_mtx)
228+
}
229+
230+
/// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
231+
pub fn wait(&self) {
232+
let (cv, notified_fut_mtx) = self.setup_wait();
233+
let notified_fut = {
234+
let mut notified_fut_lck = notified_fut_mtx.lock().unwrap();
235+
loop {
236+
if let Some(notified_fut) = notified_fut_lck.take() {
237+
break notified_fut;
238+
}
239+
notified_fut_lck = cv.wait(notified_fut_lck).unwrap();
240+
}
241+
};
242+
notified_fut.lock().unwrap().callbacks_made = true;
243+
}
244+
245+
/// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed or the
246+
/// given amount of time has elapsed. Returns true if a [`Future`] completed, false if the time
247+
/// elapsed.
248+
#[cfg(any(test, feature = "std"))]
249+
pub fn wait_timeout(&self, max_wait: Duration) -> bool {
250+
let start_time = Instant::now();
251+
let (cv, notified_fut_mtx) = self.setup_wait();
252+
let notified_fut = {
253+
let mut notified_fut_lck = notified_fut_mtx.lock().unwrap();
254+
loop {
255+
if let Some(notified_fut) = notified_fut_lck.take() {
256+
break notified_fut;
257+
}
258+
let sleep_time = max_wait.saturating_sub(start_time.elapsed());
259+
if sleep_time == Duration::from_secs(0) { return false; }
260+
notified_fut_lck = cv.wait_timeout(notified_fut_lck, max_wait).unwrap().0;
261+
}
262+
};
263+
notified_fut.lock().unwrap().callbacks_made = true;
264+
true
265+
}
266+
}
267+
232268
#[cfg(test)]
233269
mod tests {
234270
use super::*;
@@ -327,10 +363,7 @@ mod tests {
327363
let exit_thread_clone = exit_thread.clone();
328364
thread::spawn(move || {
329365
loop {
330-
let mut lock = thread_notifier.notify_pending.lock().unwrap();
331-
lock.0 = true;
332-
thread_notifier.condvar.notify_all();
333-
366+
thread_notifier.notify();
334367
if exit_thread_clone.load(Ordering::SeqCst) {
335368
break
336369
}

0 commit comments

Comments
 (0)