Skip to content

Commit b455fb5

Browse files
committed
Implement the ability to block on multiple futures at once
In the next commits we'll be adding a second notify pipeline - from the `ChainMonitor` back to the background processor. This will cause the `background-processor` to need to await multiple wakers at once, which we cannot do in the current scheme without taking on a full async runtime. Building a multi-future waiter isn't so bad, and notably will allow us to remove the existing sleep pipeline entirely, reducing the complexity of our wakers implementation by only having one notify path to consider.
1 parent 3284073 commit b455fb5

File tree

1 file changed

+150
-64
lines changed

1 file changed

+150
-64
lines changed

lightning/src/util/wakers.rs

+150-64
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
1616
use alloc::sync::Arc;
1717
use core::mem;
18-
use crate::sync::{Condvar, Mutex, MutexGuard};
18+
use crate::sync::{Condvar, Mutex};
1919

2020
use crate::prelude::*;
2121

2222
#[cfg(any(test, feature = "std"))]
23-
use std::time::{Duration, Instant};
23+
use std::time::Duration;
2424

2525
use core::future::Future as StdFuture;
2626
use core::task::{Context, Poll};
@@ -30,75 +30,22 @@ use core::pin::Pin;
3030
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
3131
pub(crate) struct Notifier {
3232
notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
33-
condvar: Condvar,
34-
}
35-
36-
macro_rules! check_woken {
37-
($guard: expr, $retval: expr) => { {
38-
if $guard.0 {
39-
$guard.0 = false;
40-
if $guard.1.as_ref().map(|l| l.lock().unwrap().complete).unwrap_or(false) {
41-
// If we're about to return as woken, and the future state is marked complete, wipe
42-
// the future state and let the next future wait until we get a new notify.
43-
$guard.1.take();
44-
}
45-
return $retval;
46-
}
47-
} }
4833
}
4934

5035
impl Notifier {
5136
pub(crate) fn new() -> Self {
5237
Self {
5338
notify_pending: Mutex::new((false, None)),
54-
condvar: Condvar::new(),
55-
}
56-
}
57-
58-
fn propagate_future_state_to_notify_flag(&self) -> MutexGuard<(bool, Option<Arc<Mutex<FutureState>>>)> {
59-
let mut lock = self.notify_pending.lock().unwrap();
60-
if let Some(existing_state) = &lock.1 {
61-
if existing_state.lock().unwrap().callbacks_made {
62-
// If the existing `FutureState` has completed and actually made callbacks,
63-
// consider the notification flag to have been cleared and reset the future state.
64-
lock.1.take();
65-
lock.0 = false;
66-
}
6739
}
68-
lock
6940
}
7041

7142
pub(crate) fn wait(&self) {
72-
loop {
73-
let mut guard = self.propagate_future_state_to_notify_flag();
74-
check_woken!(guard, ());
75-
guard = self.condvar.wait(guard).unwrap();
76-
check_woken!(guard, ());
77-
}
43+
Sleeper::from_single_future(self.get_future()).wait();
7844
}
7945

8046
#[cfg(any(test, feature = "std"))]
8147
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
82-
let current_time = Instant::now();
83-
loop {
84-
let mut guard = self.propagate_future_state_to_notify_flag();
85-
check_woken!(guard, true);
86-
guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0;
87-
check_woken!(guard, true);
88-
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
89-
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
90-
// time. Note that this logic can be highly simplified through the use of
91-
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
92-
// 1.42.0.
93-
let elapsed = current_time.elapsed();
94-
if elapsed >= max_wait {
95-
return false;
96-
}
97-
match max_wait.checked_sub(elapsed) {
98-
None => return false,
99-
Some(_) => continue
100-
}
101-
}
48+
Sleeper::from_single_future(self.get_future()).wait_timeout(max_wait)
10249
}
10350

10451
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
@@ -111,13 +58,19 @@ impl Notifier {
11158
}
11259
}
11360
lock.0 = true;
114-
mem::drop(lock);
115-
self.condvar.notify_all();
11661
}
11762

11863
/// Gets a [`Future`] that will get woken up with any waiters
11964
pub(crate) fn get_future(&self) -> Future {
120-
let mut lock = self.propagate_future_state_to_notify_flag();
65+
let mut lock = self.notify_pending.lock().unwrap();
66+
if let Some(existing_state) = &lock.1 {
67+
if existing_state.lock().unwrap().callbacks_made {
68+
// If the existing `FutureState` has completed and actually made callbacks,
69+
// consider the notification flag to have been cleared and reset the future state.
70+
lock.1.take();
71+
lock.0 = false;
72+
}
73+
}
12174
if let Some(existing_state) = &lock.1 {
12275
Future { state: Arc::clone(&existing_state) }
12376
} else {
@@ -182,6 +135,9 @@ impl FutureState {
182135
}
183136

184137
/// A simple future which can complete once, and calls some callback(s) when it does so.
138+
///
139+
/// Clones can be made and all futures cloned from the same source will complete at the same time.
140+
#[derive(Clone)]
185141
pub struct Future {
186142
state: Arc<Mutex<FutureState>>,
187143
}
@@ -236,6 +192,77 @@ impl<'a> StdFuture for Future {
236192
}
237193
}
238194

195+
/// A struct which can be used to select across many [`Future`]s at once without relying on a full
196+
/// async context.
197+
pub struct Sleeper {
198+
notifiers: Vec<Arc<Mutex<FutureState>>>,
199+
}
200+
201+
impl Sleeper {
202+
/// Constructs a new sleeper from one future, allowing blocking on it.
203+
pub fn from_single_future(future: Future) -> Self {
204+
Self { notifiers: vec![future.state] }
205+
}
206+
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
207+
// Note that this is the common case - a ChannelManager and ChainMonitor.
208+
pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
209+
Self { notifiers: vec![fut_a.state, fut_b.state] }
210+
}
211+
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
212+
pub fn new(futures: Vec<Future>) -> Self {
213+
Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
214+
}
215+
/// Prepares to go into a wait loop body, creating a condition variable which we can block on
216+
/// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
217+
/// condition variable being woken.
218+
fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
219+
let cv = Arc::new(Condvar::new());
220+
let notified_fut_mtx = Arc::new(Mutex::new(None));
221+
{
222+
for notifier_mtx in self.notifiers.iter() {
223+
let cv_ref = Arc::clone(&cv);
224+
let notified_fut_ref = Arc::clone(&notified_fut_mtx);
225+
let notifier_ref = Arc::clone(&notifier_mtx);
226+
let mut notifier = notifier_mtx.lock().unwrap();
227+
if notifier.complete {
228+
*notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
229+
break;
230+
}
231+
notifier.callbacks.push((false, Box::new(move || {
232+
*notified_fut_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
233+
cv_ref.notify_all();
234+
})));
235+
}
236+
}
237+
(cv, notified_fut_mtx)
238+
}
239+
240+
/// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
241+
pub fn wait(&self) {
242+
let (cv, notified_fut_mtx) = self.setup_wait();
243+
let notified_fut = cv.wait_while(notified_fut_mtx.lock().unwrap(), |fut_opt| fut_opt.is_none())
244+
.unwrap().take().expect("CV wait shouldn't have returned until the notifying future was set");
245+
notified_fut.lock().unwrap().callbacks_made = true;
246+
}
247+
248+
/// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed or the
249+
/// given amount of time has elapsed. Returns true if a [`Future`] completed, false if the time
250+
/// elapsed.
251+
#[cfg(any(test, feature = "std"))]
252+
pub fn wait_timeout(&self, max_wait: Duration) -> bool {
253+
let (cv, notified_fut_mtx) = self.setup_wait();
254+
let notified_fut =
255+
match cv.wait_timeout_while(notified_fut_mtx.lock().unwrap(), max_wait, |fut_opt| fut_opt.is_none()) {
256+
Ok((_, e)) if e.timed_out() => return false,
257+
Ok((mut notified_fut, _)) =>
258+
notified_fut.take().expect("CV wait shouldn't have returned until the notifying future was set"),
259+
Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
260+
};
261+
notified_fut.lock().unwrap().callbacks_made = true;
262+
true
263+
}
264+
}
265+
239266
#[cfg(test)]
240267
mod tests {
241268
use super::*;
@@ -334,10 +361,7 @@ mod tests {
334361
let exit_thread_clone = exit_thread.clone();
335362
thread::spawn(move || {
336363
loop {
337-
let mut lock = thread_notifier.notify_pending.lock().unwrap();
338-
lock.0 = true;
339-
thread_notifier.condvar.notify_all();
340-
364+
thread_notifier.notify();
341365
if exit_thread_clone.load(Ordering::SeqCst) {
342366
break
343367
}
@@ -539,4 +563,66 @@ mod tests {
539563
assert!(woken.load(Ordering::SeqCst));
540564
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
541565
}
566+
567+
#[test]
568+
fn test_multi_future_sleep() {
569+
// Tests the `Sleeper` with multiple futures.
570+
let notifier_a = Notifier::new();
571+
let notifier_b = Notifier::new();
572+
573+
// Set both notifiers as woken without sleeping yet.
574+
notifier_a.notify();
575+
notifier_b.notify();
576+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
577+
578+
// One future has woken us up, but the other should still have a pending notification.
579+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
580+
581+
// However once we've slept twice, we should no longer have any pending notifications
582+
assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
583+
.wait_timeout(Duration::from_millis(10)));
584+
585+
// Test ordering somewhat more.
586+
notifier_a.notify();
587+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
588+
}
589+
590+
#[test]
591+
#[cfg(feature = "std")]
592+
fn sleeper_with_pending_callbacks() {
593+
// This is similar to the above `test_multi_future_sleep` test, but in addition registers
594+
// "normal" callbacks which will cause the futures to assume notification has occurred,
595+
// rather than waiting for a woken sleeper.
596+
let notifier_a = Notifier::new();
597+
let notifier_b = Notifier::new();
598+
599+
// Set both notifiers as woken without sleeping yet.
600+
notifier_a.notify();
601+
notifier_b.notify();
602+
603+
// After sleeping one future (not guaranteed which one, however) will have its notification
604+
// bit cleared.
605+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
606+
607+
// By registering a callback on the futures for both notifiers, one will complete
608+
// immediately, but one will remain tied to the notifier, and will complete once the
609+
// notifier is next woken, which will be considered the completion of the notification.
610+
let callback_a = Arc::new(AtomicBool::new(false));
611+
let callback_b = Arc::new(AtomicBool::new(false));
612+
let callback_a_ref = Arc::clone(&callback_a);
613+
let callback_b_ref = Arc::clone(&callback_b);
614+
notifier_a.get_future().register_callback(Box::new(move || assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))));
615+
notifier_b.get_future().register_callback(Box::new(move || assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))));
616+
assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));
617+
618+
// If we now notify both notifiers again, the other callback will fire, completing the
619+
// notification, and we'll be back to one pending notification.
620+
notifier_a.notify();
621+
notifier_b.notify();
622+
623+
assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
624+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
625+
assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
626+
.wait_timeout(Duration::from_millis(10)));
627+
}
542628
}

0 commit comments

Comments
 (0)