Skip to content

Commit 29e09e8

Browse files
committed
Add a Future which can receive manager persistence events
This allows users who don't wish to block a full thread to receive persistence events. The `Future` added here is really just a trivial list of callbacks, but from that we can build a (somewhat ineffecient) std::future::Future implementation and can (at least once a mapping for Box<dyn Trait> is added) include the future in no-std bindings as well. Fixes #1595
1 parent 8c5b793 commit 29e09e8

File tree

2 files changed

+137
-31
lines changed

2 files changed

+137
-31
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use util::config::{UserConfig, ChannelConfig};
5555
use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
5656
use util::{byte_utils, events};
5757
use util::crypto::sign;
58-
use util::wakers::Notifier;
58+
use util::wakers::{Future, Notifier};
5959
use util::scid_utils::fake_scid;
6060
use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
6161
use util::logger::{Level, Logger};
@@ -5988,6 +5988,13 @@ where
59885988
self.persistence_notifier.wait()
59895989
}
59905990

5991+
/// Gets a [`Future`] which completes when a persistable update is available. Note that
5992+
/// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
5993+
/// should instead register actions to be taken later.
5994+
pub fn get_persistable_update_future(&self) -> Future {
5995+
self.persistence_notifier.get_future()
5996+
}
5997+
59915998
#[cfg(any(test, feature = "_test_utils"))]
59925999
pub fn get_persistence_condvar_value(&self) -> bool {
59936000
self.persistence_notifier.needs_persist()

lightning/src/util/wakers.rs

Lines changed: 129 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,39 +13,46 @@
1313
//!
1414
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
1515
16+
use alloc::sync::Arc;
1617
use core::mem;
1718
use core::time::Duration;
1819
use sync::{Condvar, Mutex};
1920

2021
#[cfg(any(test, feature = "std"))]
2122
use std::time::Instant;
2223

24+
#[cfg(feature = "std")]
25+
use std::future::Future as StdFuture;
26+
#[cfg(feature = "std")]
27+
use std::task::{Context, Poll};
28+
#[cfg(feature = "std")]
29+
use std::pin::Pin;
30+
2331
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
2432
pub(crate) struct Notifier {
25-
/// Users won't access the lock directly, but rather wait on its bool using
26-
/// `wait_timeout` and `wait`.
27-
lock: (Mutex<bool>, Condvar),
33+
persist_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
34+
persistence_condvar: Condvar,
2835
}
2936

3037
impl Notifier {
3138
pub(crate) fn new() -> Self {
3239
Self {
33-
lock: (Mutex::new(false), Condvar::new()),
40+
persist_pending: Mutex::new((false, None)),
41+
persistence_condvar: Condvar::new(),
3442
}
3543
}
3644

3745
pub(crate) fn wait(&self) {
3846
loop {
39-
let &(ref mtx, ref cvar) = &self.lock;
40-
let mut guard = mtx.lock().unwrap();
41-
if *guard {
42-
*guard = false;
47+
let mut guard = self.persist_pending.lock().unwrap();
48+
if guard.0 {
49+
guard.0 = false;
4350
return;
4451
}
45-
guard = cvar.wait(guard).unwrap();
46-
let result = *guard;
52+
guard = self.persistence_condvar.wait(guard).unwrap();
53+
let result = guard.0;
4754
if result {
48-
*guard = false;
55+
guard.0 = false;
4956
return
5057
}
5158
}
@@ -55,22 +62,21 @@ impl Notifier {
5562
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
5663
let current_time = Instant::now();
5764
loop {
58-
let &(ref mtx, ref cvar) = &self.lock;
59-
let mut guard = mtx.lock().unwrap();
60-
if *guard {
61-
*guard = false;
65+
let mut guard = self.persist_pending.lock().unwrap();
66+
if guard.0 {
67+
guard.0 = false;
6268
return true;
6369
}
64-
guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
70+
guard = self.persistence_condvar.wait_timeout(guard, max_wait).unwrap().0;
6571
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
6672
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
6773
// time. Note that this logic can be highly simplified through the use of
6874
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
6975
// 1.42.0.
7076
let elapsed = current_time.elapsed();
71-
let result = *guard;
77+
let result = guard.0;
7278
if result || elapsed >= max_wait {
73-
*guard = false;
79+
guard.0 = false;
7480
return result;
7581
}
7682
match max_wait.checked_sub(elapsed) {
@@ -82,18 +88,112 @@ impl Notifier {
8288

8389
/// Wake waiters, tracking that persistence needs to occur.
8490
pub(crate) fn notify(&self) {
85-
let &(ref persist_mtx, ref cnd) = &self.lock;
86-
let mut lock = persist_mtx.lock().unwrap();
87-
*lock = true;
88-
mem::drop(lock);
89-
cnd.notify_all();
91+
let mut persistence_lock = self.persist_pending.lock().unwrap();
92+
persistence_lock.0 = true;
93+
if let Some(future_state) = persistence_lock.1.take() {
94+
future_state.lock().unwrap().complete();
95+
}
96+
mem::drop(persistence_lock);
97+
self.persistence_condvar.notify_all();
98+
}
99+
100+
/// Gets a [`Future`] that will get woken up with any waiters
101+
pub(crate) fn get_future(&self) -> Future {
102+
let mut persistence_lock = self.persist_pending.lock().unwrap();
103+
if persistence_lock.0 {
104+
Future {
105+
state: Arc::new(Mutex::new(FutureState {
106+
callbacks: Vec::new(),
107+
complete: false,
108+
}))
109+
}
110+
} else if let Some(existing_state) = &persistence_lock.1 {
111+
Future { state: Arc::clone(&existing_state) }
112+
} else {
113+
let state = Arc::new(Mutex::new(FutureState {
114+
callbacks: Vec::new(),
115+
complete: false,
116+
}));
117+
persistence_lock.1 = Some(Arc::clone(&state));
118+
Future { state }
119+
}
90120
}
91121

92122
#[cfg(any(test, feature = "_test_utils"))]
93123
pub fn needs_persist(&self) -> bool {
94-
let &(ref mtx, _) = &self.lock;
95-
let guard = mtx.lock().unwrap();
96-
*guard
124+
self.persist_pending.lock().unwrap().0
125+
}
126+
}
127+
128+
/// A callback which is called when a [`Future`] completes.
129+
///
130+
/// Note that this MUST NOT call back into LDK directly, it must instead schedule actions to be
131+
/// taken later. Rust users should use the [`std::future::Future`] implementation for [`Future`]
132+
/// instead.
133+
///
134+
/// Note that the [`std::future::Future`] implementation may only work for runtimes which schedule
135+
/// futures when they receive a wake, rather than immediately executing them.
136+
pub trait FutureCallback : Send {
137+
/// The method which is called.
138+
fn call(&self);
139+
}
140+
141+
pub struct FutureState {
142+
callbacks: Vec<Box<dyn FutureCallback>>,
143+
complete: bool,
144+
}
145+
146+
impl FutureState {
147+
fn complete(&mut self) {
148+
for callback in self.callbacks.drain(..) {
149+
callback.call();
150+
}
151+
self.complete = true;
152+
}
153+
}
154+
155+
/// A simple future which can complete once, and calls some callback(s) when it does so.
156+
pub struct Future {
157+
state: Arc<Mutex<FutureState>>,
158+
}
159+
160+
impl Future {
161+
/// Registers a callback to be called upon completion of this future. If the future has already
162+
/// completed, the callback will be called immediately.
163+
pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
164+
let mut state = self.state.lock().unwrap();
165+
if state.complete {
166+
mem::drop(state);
167+
callback.call();
168+
} else {
169+
state.callbacks.push(callback);
170+
}
171+
}
172+
}
173+
174+
#[cfg(feature = "std")]
175+
mod std_future {
176+
use std::task::Waker;
177+
pub struct StdWaker(pub Waker);
178+
impl super::FutureCallback for StdWaker {
179+
fn call(&self) { self.0.wake_by_ref() }
180+
}
181+
}
182+
183+
#[cfg(feature = "std")]
184+
/// (C-not exported) as Rust Futures aren't usable in language bindings.
185+
impl<'a> StdFuture for Future {
186+
type Output = ();
187+
188+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189+
let mut state = self.state.lock().unwrap();
190+
if state.complete {
191+
Poll::Ready(())
192+
} else {
193+
let waker = cx.waker().clone();
194+
state.callbacks.push(Box::new(std_future::StdWaker(waker)));
195+
Poll::Pending
196+
}
97197
}
98198
}
99199

@@ -114,10 +214,9 @@ mod tests {
114214
let exit_thread_clone = exit_thread.clone();
115215
thread::spawn(move || {
116216
loop {
117-
let &(ref persist_mtx, ref cnd) = &thread_notifier.lock;
118-
let mut lock = persist_mtx.lock().unwrap();
119-
*lock = true;
120-
cnd.notify_all();
217+
let mut persistence_lock = thread_notifier.persist_pending.lock().unwrap();
218+
persistence_lock.0 = true;
219+
thread_notifier.persistence_condvar.notify_all();
121220

122221
if exit_thread_clone.load(Ordering::SeqCst) {
123222
break

0 commit comments

Comments
 (0)