Skip to content

Fix persistence-required futures always completing instantly #1845

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5916,18 +5916,25 @@ where

/// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool
/// indicating whether persistence is necessary. Only one listener on
/// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
/// up.
/// [`await_persistable_update`], [`await_persistable_update_timeout`], or a future returned by
/// [`get_persistable_update_future`] is guaranteed to be woken up.
///
/// Note that this method is not available with the `no-std` feature.
///
/// [`await_persistable_update`]: Self::await_persistable_update
/// [`await_persistable_update_timeout`]: Self::await_persistable_update_timeout
/// [`get_persistable_update_future`]: Self::get_persistable_update_future
#[cfg(any(test, feature = "std"))]
pub fn await_persistable_update_timeout(&self, max_wait: Duration) -> bool {
self.persistence_notifier.wait_timeout(max_wait)
}

/// Blocks until ChannelManager needs to be persisted. Only one listener on
/// `await_persistable_update` or `await_persistable_update_timeout` is guaranteed to be woken
/// up.
/// [`await_persistable_update`], `await_persistable_update_timeout`, or a future returned by
/// [`get_persistable_update_future`] is guaranteed to be woken up.
///
/// [`await_persistable_update`]: Self::await_persistable_update
/// [`get_persistable_update_future`]: Self::get_persistable_update_future
pub fn await_persistable_update(&self) {
self.persistence_notifier.wait()
}
Expand Down
60 changes: 57 additions & 3 deletions lightning/src/util/wakers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,19 @@ impl Notifier {
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
pub(crate) fn notify(&self) {
let mut lock = self.notify_pending.lock().unwrap();
lock.0 = true;
let mut future_probably_generated_calls = false;
if let Some(future_state) = lock.1.take() {
future_state.lock().unwrap().complete();
future_probably_generated_calls |= future_state.lock().unwrap().complete();
future_probably_generated_calls |= Arc::strong_count(&future_state) > 1;
}
if future_probably_generated_calls {
// If a future made some callbacks or has not yet been drop'd (i.e. the state has more
// than the one reference we hold), assume the user was notified and skip setting the
// notification-required flag. This will not cause the `wait` functions above to return
// and avoid any future `Future`s starting in a completed state.
return;
}
lock.0 = true;
mem::drop(lock);
self.condvar.notify_all();
}
Expand Down Expand Up @@ -147,11 +156,14 @@ pub(crate) struct FutureState {
}

impl FutureState {
fn complete(&mut self) {
fn complete(&mut self) -> bool {
let mut made_calls = false;
for callback in self.callbacks.drain(..) {
callback.call();
made_calls = true;
}
self.complete = true;
made_calls
}
}

Expand Down Expand Up @@ -231,6 +243,48 @@ mod tests {
assert!(callback.load(Ordering::SeqCst));
}

#[test]
fn notifier_future_completes_wake() {
// Previously, if we were only using the `Future` interface to learn when a `Notifier` has
// been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
// `lightning-background-processor` to persist in a tight loop.
let notifier = Notifier::new();

// First check the simple case, ensuring if we get notified a new future isn't woken until
// a second `notify`.
let callback = Arc::new(AtomicBool::new(false));
let callback_ref = Arc::clone(&callback);
notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
assert!(!callback.load(Ordering::SeqCst));

notifier.notify();
assert!(callback.load(Ordering::SeqCst));

let callback = Arc::new(AtomicBool::new(false));
let callback_ref = Arc::clone(&callback);
notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
assert!(!callback.load(Ordering::SeqCst));

notifier.notify();
assert!(callback.load(Ordering::SeqCst));

// Then check the case where the future is fetched before the notification, but a callback
// is only registered after the `notify`, ensuring that it is still sufficient to ensure we
// don't get an instant-wake when we get a new future.
let future = notifier.get_future();
notifier.notify();

let callback = Arc::new(AtomicBool::new(false));
let callback_ref = Arc::clone(&callback);
future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
assert!(callback.load(Ordering::SeqCst));

let callback = Arc::new(AtomicBool::new(false));
let callback_ref = Arc::clone(&callback);
notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
assert!(!callback.load(Ordering::SeqCst));
}

#[cfg(feature = "std")]
#[test]
fn test_wait_timeout() {
Expand Down