Skip to content

Commit 514e0cf

Browse files
committed
Add a Future which can receive manager persistence events
This allows users who don't wish to block a full thread to receive persistence events. The `Future` added here is really just a trivial list of callbacks, but from that we can build a (somewhat ineffecient) std::future::Future implementation and can (at least once a mapping for Box<dyn Trait> is added) include the future in no-std bindings as well. Fixes #1595
1 parent 68b3d2e commit 514e0cf

File tree

2 files changed

+136
-30
lines changed

2 files changed

+136
-30
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use util::config::{UserConfig, ChannelConfig};
5555
use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
5656
use util::{byte_utils, events};
5757
use util::crypto::sign;
58-
use util::wakers::PersistenceNotifier;
58+
use util::wakers::{Future, PersistenceNotifier};
5959
use util::scid_utils::fake_scid;
6060
use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
6161
use util::logger::{Level, Logger};
@@ -5988,6 +5988,13 @@ where
59885988
self.persistence_notifier.wait()
59895989
}
59905990

5991+
/// Gets a [`Future`] which completes when a persistable update is available. Note that
5992+
/// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
5993+
/// should instead register actions to be taken later.
5994+
pub fn get_persistable_update_future(&self) -> Future {
5995+
self.persistence_notifier.get_future()
5996+
}
5997+
59915998
#[cfg(any(test, feature = "_test_utils"))]
59925999
pub fn get_persistence_condvar_value(&self) -> bool {
59936000
self.persistence_notifier.needs_persist()

lightning/src/util/wakers.rs

Lines changed: 128 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,40 +13,47 @@
1313
//!
1414
//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
1515
16+
use alloc::sync::Arc;
1617
use core::mem;
1718
use core::time::Duration;
1819
use sync::{Condvar, Mutex};
1920

2021
#[cfg(any(test, feature = "std"))]
2122
use std::time::Instant;
2223

24+
#[cfg(feature = "std")]
25+
use std::future::Future as StdFuture;
26+
#[cfg(feature = "std")]
27+
use std::task::{Context, Poll};
28+
#[cfg(feature = "std")]
29+
use std::pin::Pin;
30+
2331
/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
2432
/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`.
2533
pub(crate) struct PersistenceNotifier {
26-
/// Users won't access the persistence_lock directly, but rather wait on its bool using
27-
/// `wait_timeout` and `wait`.
28-
persistence_lock: (Mutex<bool>, Condvar),
34+
persist_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
35+
persistence_condvar: Condvar,
2936
}
3037

3138
impl PersistenceNotifier {
3239
pub(crate) fn new() -> Self {
3340
Self {
34-
persistence_lock: (Mutex::new(false), Condvar::new()),
41+
persist_pending: Mutex::new((false, None)),
42+
persistence_condvar: Condvar::new(),
3543
}
3644
}
3745

3846
pub(crate) fn wait(&self) {
3947
loop {
40-
let &(ref mtx, ref cvar) = &self.persistence_lock;
41-
let mut guard = mtx.lock().unwrap();
42-
if *guard {
43-
*guard = false;
48+
let mut guard = self.persist_pending.lock().unwrap();
49+
if guard.0 {
50+
guard.0 = false;
4451
return;
4552
}
46-
guard = cvar.wait(guard).unwrap();
47-
let result = *guard;
53+
guard = self.persistence_condvar.wait(guard).unwrap();
54+
let result = guard.0;
4855
if result {
49-
*guard = false;
56+
guard.0 = false;
5057
return
5158
}
5259
}
@@ -56,22 +63,21 @@ impl PersistenceNotifier {
5663
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
5764
let current_time = Instant::now();
5865
loop {
59-
let &(ref mtx, ref cvar) = &self.persistence_lock;
60-
let mut guard = mtx.lock().unwrap();
61-
if *guard {
62-
*guard = false;
66+
let mut guard = self.persist_pending.lock().unwrap();
67+
if guard.0 {
68+
guard.0 = false;
6369
return true;
6470
}
65-
guard = cvar.wait_timeout(guard, max_wait).unwrap().0;
71+
guard = self.persistence_condvar.wait_timeout(guard, max_wait).unwrap().0;
6672
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
6773
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
6874
// time. Note that this logic can be highly simplified through the use of
6975
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
7076
// 1.42.0.
7177
let elapsed = current_time.elapsed();
72-
let result = *guard;
78+
let result = guard.0;
7379
if result || elapsed >= max_wait {
74-
*guard = false;
80+
guard.0 = false;
7581
return result;
7682
}
7783
match max_wait.checked_sub(elapsed) {
@@ -83,18 +89,112 @@ impl PersistenceNotifier {
8389

8490
/// Wake waiters, tracking that persistence needs to occur.
8591
pub(crate) fn notify(&self) {
86-
let &(ref persist_mtx, ref cnd) = &self.persistence_lock;
87-
let mut persistence_lock = persist_mtx.lock().unwrap();
88-
*persistence_lock = true;
92+
let mut persistence_lock = self.persist_pending.lock().unwrap();
93+
persistence_lock.0 = true;
94+
if let Some(future_state) = persistence_lock.1.take() {
95+
future_state.lock().unwrap().complete();
96+
}
8997
mem::drop(persistence_lock);
90-
cnd.notify_all();
98+
self.persistence_condvar.notify_all();
99+
}
100+
101+
/// Gets a [`Future`] that will get woken up with any waiters
102+
pub(crate) fn get_future(&self) -> Future {
103+
let mut persistence_lock = self.persist_pending.lock().unwrap();
104+
if persistence_lock.0 {
105+
Future {
106+
state: Arc::new(Mutex::new(FutureState {
107+
callbacks: Vec::new(),
108+
complete: false,
109+
}))
110+
}
111+
} else if let Some(existing_state) = &persistence_lock.1 {
112+
Future { state: Arc::clone(&existing_state) }
113+
} else {
114+
let state = Arc::new(Mutex::new(FutureState {
115+
callbacks: Vec::new(),
116+
complete: false,
117+
}));
118+
persistence_lock.1 = Some(Arc::clone(&state));
119+
Future { state }
120+
}
91121
}
92122

93123
#[cfg(any(test, feature = "_test_utils"))]
94124
pub fn needs_persist(&self) -> bool {
95-
let &(ref mtx, _) = &self.persistence_lock;
96-
let guard = mtx.lock().unwrap();
97-
*guard
125+
self.persist_pending.lock().unwrap().0
126+
}
127+
}
128+
129+
/// A callback which is called when a [`Future`] completes.
130+
///
131+
/// Note that this MUST NOT call back into LDK directly, it must instead schedule actions to be
132+
/// taken later. Rust users should use the [`std::future::Future`] implementation for [`Future`]
133+
/// instead.
134+
///
135+
/// Note that the [`std::future::Future`] implementation may only work for runtimes which schedule
136+
/// futures when they receive a wake, rather than immediately executing them.
137+
pub trait FutureCallback : Send {
138+
/// The method which is called.
139+
fn call(&self);
140+
}
141+
142+
pub struct FutureState {
143+
callbacks: Vec<Box<dyn FutureCallback>>,
144+
complete: bool,
145+
}
146+
147+
impl FutureState {
148+
fn complete(&mut self) {
149+
for callback in self.callbacks.drain(..) {
150+
callback.call();
151+
}
152+
self.complete = true;
153+
}
154+
}
155+
156+
/// A simple future which can complete once, and calls some callback(s) when it does so.
157+
pub struct Future {
158+
state: Arc<Mutex<FutureState>>,
159+
}
160+
161+
impl Future {
162+
/// Registers a callback to be called upon completion of this future. If the future has already
163+
/// completed, the callback will be called immediately.
164+
pub fn register_callback(&self, callback: Box<dyn FutureCallback>) {
165+
let mut state = self.state.lock().unwrap();
166+
if state.complete {
167+
mem::drop(state);
168+
callback.call();
169+
} else {
170+
state.callbacks.push(callback);
171+
}
172+
}
173+
}
174+
175+
#[cfg(feature = "std")]
176+
mod std_future {
177+
use std::task::Waker;
178+
pub struct StdWaker(pub Waker);
179+
impl super::FutureCallback for StdWaker {
180+
fn call(&self) { self.0.wake_by_ref() }
181+
}
182+
}
183+
184+
#[cfg(feature = "std")]
185+
/// (C-not exported) as Rust Futures aren't usable in language bindings.
186+
impl<'a> StdFuture for Future {
187+
type Output = ();
188+
189+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
190+
let mut state = self.state.lock().unwrap();
191+
if state.complete {
192+
Poll::Ready(())
193+
} else {
194+
let waker = cx.waker().clone();
195+
state.callbacks.push(Box::new(std_future::StdWaker(waker)));
196+
Poll::Pending
197+
}
98198
}
99199
}
100200

@@ -115,10 +215,9 @@ mod tests {
115215
let exit_thread_clone = exit_thread.clone();
116216
thread::spawn(move || {
117217
loop {
118-
let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock;
119-
let mut persistence_lock = persist_mtx.lock().unwrap();
120-
*persistence_lock = true;
121-
cnd.notify_all();
218+
let mut persistence_lock = thread_notifier.persist_pending.lock().unwrap();
219+
persistence_lock.0 = true;
220+
thread_notifier.persistence_condvar.notify_all();
122221

123222
if exit_thread_clone.load(Ordering::SeqCst) {
124223
break

0 commit comments

Comments
 (0)