Skip to content

Commit c878bb2

Browse files
committed
Add a Future which can receive manager persistence events
This allows users who don't wish to block a full thread to receive persistence events. The `Future` added here is really just a trivial list of callbacks, but from that we can build a (somewhat ineffecient) std::future::Future implementation and can (at least once a mapping for Box<dyn Trait> is added) include the future in no-std bindings as well. Fixes #1595
1 parent ca9b6e8 commit c878bb2

File tree

2 files changed

+225
-32
lines changed

2 files changed

+225
-32
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use util::config::{UserConfig, ChannelConfig};
5555
use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
5656
use util::{byte_utils, events};
5757
use util::crypto::sign;
58-
use util::wakers::Notifier;
58+
use util::wakers::{Future, Notifier};
5959
use util::scid_utils::fake_scid;
6060
use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
6161
use util::logger::{Level, Logger};
@@ -5988,6 +5988,13 @@ where
59885988
self.persistence_notifier.wait()
59895989
}
59905990

5991+
/// Gets a [`Future`] that completes when a persistable update is available. Note that
5992+
/// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
5993+
/// should instead register actions to be taken later.
5994+
pub fn get_persistable_update_future(&self) -> Future {
5995+
self.persistence_notifier.get_future()
5996+
}
5997+
59915998
#[cfg(any(test, feature = "_test_utils"))]
59925999
pub fn get_persistence_condvar_value(&self) -> bool {
59936000
self.persistence_notifier.notify_pending()

lightning/src/util/wakers.rs

Lines changed: 217 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,39 +13,46 @@
1313
//!
1414
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
1515
16+
use alloc::sync::Arc;
1617
use core::mem;
1718
use core::time::Duration;
1819
use sync::{Condvar, Mutex};
1920

2021
#[cfg(any(test, feature = "std"))]
2122
use std::time::Instant;
2223

24+
#[cfg(feature = "std")]
25+
use std::future::Future as StdFuture;
26+
#[cfg(feature = "std")]
27+
use std::task::{Context, Poll};
28+
#[cfg(feature = "std")]
29+
use std::pin::Pin;
30+
2331
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
2432
pub(crate) struct Notifier {
25-
/// Users won't access the lock directly, but rather wait on its bool using
26-
/// `wait_timeout` and `wait`.
27-
lock: (Mutex<bool>, Condvar),
33+
notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
34+
condvar: Condvar,
2835
}
2936

3037
impl Notifier {
3138
pub(crate) fn new() -> Self {
3239
Self {
33-
lock: (Mutex::new(false), Condvar::new()),
40+
notify_pending: Mutex::new((false, None)),
41+
condvar: Condvar::new(),
3442
}
3543
}
3644

3745
pub(crate) fn wait(&self) {
3846
loop {
39-
let &(ref mtx, ref cvar) = &self.lock;
40-
let mut guard = mtx.lock().unwrap();
41-
if *guard {
42-
*guard = false;
47+
let mut guard = self.notify_pending.lock().unwrap();
48+
if guard.0 {
49+
guard.0 = false;
4350
return;
4451
}
45-
guard = cvar.wait(guard).unwrap();
46-
let result = *guard;
52+
guard = self.condvar.wait(guard).unwrap();
53+
let result = guard.0;
4754
if result {
48-
*guard = false;
55+
guard.0 = false;
4956
return
5057
}
5158
}
@@ -55,22 +62,21 @@ impl Notifier {
5562
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
5663
let current_time = Instant::now();
5764
loop {
58-
let &(ref mtx, ref cvar) = &self.lock;
59-
let mut guard = mtx.lock().unwrap();
60-
if *guard {
61-
*guard = false;
65+
let mut guard = self.notify_pending.lock().unwrap();
66+
if guard.0 {
67+
guard.0 = false;
6268
return true;
6369
}
64-
guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
70+
guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0;
6571
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
6672
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
6773
// time. Note that this logic can be highly simplified through the use of
6874
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
6975
// 1.42.0.
7076
let elapsed = current_time.elapsed();
71-
let result = *guard;
77+
let result = guard.0;
7278
if result || elapsed >= max_wait {
73-
*guard = false;
79+
guard.0 = false;
7480
return result;
7581
}
7682
match max_wait.checked_sub(elapsed) {
@@ -82,29 +88,130 @@ impl Notifier {
8288

8389
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
8490
pub(crate) fn notify(&self) {
85-
let &(ref persist_mtx, ref cnd) = &self.lock;
86-
let mut lock = persist_mtx.lock().unwrap();
87-
*lock = true;
91+
let mut lock = self.notify_pending.lock().unwrap();
92+
lock.0 = true;
93+
if let Some(future_state) = lock.1.take() {
94+
future_state.lock().unwrap().complete();
95+
}
8896
mem::drop(lock);
89-
cnd.notify_all();
97+
self.condvar.notify_all();
98+
}
99+
100+
/// Gets a [`Future`] that will get woken up with any waiters
101+
pub(crate) fn get_future(&self) -> Future {
102+
let mut lock = self.notify_pending.lock().unwrap();
103+
if lock.0 {
104+
Future {
105+
state: Arc::new(Mutex::new(FutureState {
106+
callbacks: Vec::new(),
107+
complete: false,
108+
}))
109+
}
110+
} else if let Some(existing_state) = &lock.1 {
111+
Future { state: Arc::clone(&existing_state) }
112+
} else {
113+
let state = Arc::new(Mutex::new(FutureState {
114+
callbacks: Vec::new(),
115+
complete: false,
116+
}));
117+
lock.1 = Some(Arc::clone(&state));
118+
Future { state }
119+
}
90120
}
91121

92122
#[cfg(any(test, feature = "_test_utils"))]
93123
pub fn notify_pending(&self) -> bool {
94-
let &(ref mtx, _) = &self.lock;
95-
let guard = mtx.lock().unwrap();
96-
*guard
124+
self.notify_pending.lock().unwrap().0
125+
}
126+
}
127+
128+
/// A callback which is called when a [`Future`] completes.
129+
///
130+
/// Note that this MUST NOT call back into LDK directly, it must instead schedule actions to be
131+
/// taken later. Rust users should use the [`std::future::Future`] implementation for [`Future`]
132+
/// instead.
133+
///
134+
/// Note that the [`std::future::Future`] implementation may only work for runtimes which schedule
135+
/// futures when they receive a wake, rather than immediately executing them.
136+
pub trait FutureCallback : Send {
137+
/// The method which is called.
138+
fn call(&self);
139+
}
140+
141+
impl<F: Fn() + Send> FutureCallback for F {
142+
fn call(&self) { (self)(); }
143+
}
144+
145+
pub(crate) struct FutureState {
146+
callbacks: Vec<Box<dyn FutureCallback>>,
147+
complete: bool,
148+
}
149+
150+
impl FutureState {
151+
fn complete(&mut self) {
152+
for callback in self.callbacks.drain(..) {
153+
callback.call();
154+
}
155+
self.complete = true;
156+
}
157+
}
158+
159+
/// A simple future which can complete once, and calls some callback(s) when it does so.
160+
pub struct Future {
161+
state: Arc<Mutex<FutureState>>,
162+
}
163+
164+
impl Future {
165+
/// Registers a callback to be called upon completion of this future. If the future has already
166+
/// completed, the callback will be called immediately.
167+
pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
168+
let mut state = self.state.lock().unwrap();
169+
if state.complete {
170+
mem::drop(state);
171+
callback.call();
172+
} else {
173+
state.callbacks.push(callback);
174+
}
175+
}
176+
}
177+
178+
#[cfg(feature = "std")]
179+
mod std_future {
180+
use std::task::Waker;
181+
pub struct StdWaker(pub Waker);
182+
impl super::FutureCallback for StdWaker {
183+
fn call(&self) { self.0.wake_by_ref() }
184+
}
185+
}
186+
187+
#[cfg(feature = "std")]
188+
/// (C-not exported) as Rust Futures aren't usable in language bindings.
189+
impl<'a> StdFuture for Future {
190+
type Output = ();
191+
192+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193+
let mut state = self.state.lock().unwrap();
194+
if state.complete {
195+
Poll::Ready(())
196+
} else {
197+
let waker = cx.waker().clone();
198+
state.callbacks.push(Box::new(std_future::StdWaker(waker)));
199+
Poll::Pending
200+
}
97201
}
98202
}
99203

100204
#[cfg(test)]
101205
mod tests {
206+
use super::*;
207+
use core::sync::atomic::{AtomicBool, Ordering};
208+
use core::future::Future as FutureTrait;
209+
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
210+
102211
#[cfg(feature = "std")]
103212
#[test]
104213
fn test_wait_timeout() {
105-
use super::*;
106214
use sync::Arc;
107-
use core::sync::atomic::{AtomicBool, Ordering};
108215
use std::thread;
109216

110217
let persistence_notifier = Arc::new(Notifier::new());
@@ -114,10 +221,9 @@ mod tests {
114221
let exit_thread_clone = exit_thread.clone();
115222
thread::spawn(move || {
116223
loop {
117-
let &(ref persist_mtx, ref cnd) = &thread_notifier.lock;
118-
let mut lock = persist_mtx.lock().unwrap();
119-
*lock = true;
120-
cnd.notify_all();
224+
let mut lock = thread_notifier.notify_pending.lock().unwrap();
225+
lock.0 = true;
226+
thread_notifier.condvar.notify_all();
121227

122228
if exit_thread_clone.load(Ordering::SeqCst) {
123229
break
@@ -146,4 +252,84 @@ mod tests {
146252
}
147253
}
148254
}
255+
256+
#[test]
257+
fn test_future_callbacks() {
258+
let future = Future {
259+
state: Arc::new(Mutex::new(FutureState {
260+
callbacks: Vec::new(),
261+
complete: false,
262+
}))
263+
};
264+
let callback = Arc::new(AtomicBool::new(false));
265+
let callback_ref = Arc::clone(&callback);
266+
future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
267+
268+
assert!(!callback.load(Ordering::SeqCst));
269+
future.state.lock().unwrap().complete();
270+
assert!(callback.load(Ordering::SeqCst));
271+
future.state.lock().unwrap().complete();
272+
}
273+
274+
#[test]
275+
fn test_pre_completed_future_callbacks() {
276+
let future = Future {
277+
state: Arc::new(Mutex::new(FutureState {
278+
callbacks: Vec::new(),
279+
complete: false,
280+
}))
281+
};
282+
future.state.lock().unwrap().complete();
283+
284+
let callback = Arc::new(AtomicBool::new(false));
285+
let callback_ref = Arc::clone(&callback);
286+
future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
287+
288+
assert!(callback.load(Ordering::SeqCst));
289+
assert!(future.state.lock().unwrap().callbacks.is_empty());
290+
}
291+
292+
// Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
293+
// totally possible to construct from a trait implementation (though somewhat less effecient
294+
// compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
295+
// waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
296+
const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop);
297+
unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc<AtomicBool>; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); }
298+
unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc<AtomicBool>; Box::from_raw(p); }
299+
unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); }
300+
unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
301+
let p = ptr as *const Arc<AtomicBool>;
302+
RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE)
303+
}
304+
305+
fn create_waker() -> (Arc<AtomicBool>, Waker) {
306+
let a = Arc::new(AtomicBool::new(false));
307+
let waker = unsafe { Waker::from_raw(waker_clone((&a as *const Arc<AtomicBool>) as *const ())) };
308+
(a, waker)
309+
}
310+
311+
#[test]
312+
fn test_future() {
313+
let mut future = Future {
314+
state: Arc::new(Mutex::new(FutureState {
315+
callbacks: Vec::new(),
316+
complete: false,
317+
}))
318+
};
319+
let mut second_future = Future { state: Arc::clone(&future.state) };
320+
321+
let (woken, waker) = create_waker();
322+
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
323+
assert!(!woken.load(Ordering::SeqCst));
324+
325+
let (second_woken, second_waker) = create_waker();
326+
assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
327+
assert!(!second_woken.load(Ordering::SeqCst));
328+
329+
future.state.lock().unwrap().complete();
330+
assert!(woken.load(Ordering::SeqCst));
331+
assert!(second_woken.load(Ordering::SeqCst));
332+
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
333+
assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(()));
334+
}
149335
}

0 commit comments

Comments
 (0)