Skip to content

Commit d4fc1a7

Browse files
authored
Merge pull request #2233 from TheBlueMatt/2023-04-fix-future-leak
Fix a leak in FutureState when a Notifier is dropped un-woken
2 parents c182567 + 7caa584 commit d4fc1a7

File tree

1 file changed

+63
-17
lines changed

1 file changed

+63
-17
lines changed

lightning/src/util/wakers.rs

+63-17
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl Notifier {
4545
pub(crate) fn notify(&self) {
4646
let mut lock = self.notify_pending.lock().unwrap();
4747
if let Some(future_state) = &lock.1 {
48-
if future_state.lock().unwrap().complete() {
48+
if complete_future(future_state) {
4949
lock.1 = None;
5050
return;
5151
}
@@ -69,6 +69,7 @@ impl Notifier {
6969
} else {
7070
let state = Arc::new(Mutex::new(FutureState {
7171
callbacks: Vec::new(),
72+
callbacks_with_state: Vec::new(),
7273
complete: lock.0,
7374
callbacks_made: false,
7475
}));
@@ -112,19 +113,24 @@ pub(crate) struct FutureState {
112113
// first bool - set to false if we're just calling a Waker, and true if we're calling an actual
113114
// user-provided function.
114115
callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
116+
callbacks_with_state: Vec<(bool, Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>)>,
115117
complete: bool,
116118
callbacks_made: bool,
117119
}
118120

119-
impl FutureState {
120-
fn complete(&mut self) -> bool {
121-
for (counts_as_call, callback) in self.callbacks.drain(..) {
122-
callback.call();
123-
self.callbacks_made |= counts_as_call;
124-
}
125-
self.complete = true;
126-
self.callbacks_made
121+
fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
122+
let mut state_lock = this.lock().unwrap();
123+
let state = &mut *state_lock;
124+
for (counts_as_call, callback) in state.callbacks.drain(..) {
125+
callback.call();
126+
state.callbacks_made |= counts_as_call;
127+
}
128+
for (counts_as_call, callback) in state.callbacks_with_state.drain(..) {
129+
(callback)(this);
130+
state.callbacks_made |= counts_as_call;
127131
}
132+
state.complete = true;
133+
state.callbacks_made
128134
}
129135

130136
/// A simple future which can complete once, and calls some callback(s) when it does so.
@@ -240,14 +246,13 @@ impl Sleeper {
240246
for notifier_mtx in self.notifiers.iter() {
241247
let cv_ref = Arc::clone(&cv);
242248
let notified_fut_ref = Arc::clone(&notified_fut_mtx);
243-
let notifier_ref = Arc::clone(&notifier_mtx);
244249
let mut notifier = notifier_mtx.lock().unwrap();
245250
if notifier.complete {
246-
*notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
251+
*notified_fut_mtx.lock().unwrap() = Some(Arc::clone(&notifier_mtx));
247252
break;
248253
}
249-
notifier.callbacks.push((false, Box::new(move || {
250-
*notified_fut_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
254+
notifier.callbacks_with_state.push((false, Box::new(move |notifier_ref| {
255+
*notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref));
251256
cv_ref.notify_all();
252257
})));
253258
}
@@ -407,11 +412,50 @@ mod tests {
407412
}
408413
}
409414

415+
#[cfg(feature = "std")]
416+
#[test]
417+
fn test_state_drops() {
418+
// Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
419+
// but after having been slept-on. This tests for that leak.
420+
use crate::sync::Arc;
421+
use std::thread;
422+
423+
let notifier_a = Arc::new(Notifier::new());
424+
let notifier_b = Arc::new(Notifier::new());
425+
426+
let thread_notifier_a = Arc::clone(&notifier_a);
427+
428+
let future_a = notifier_a.get_future();
429+
let future_state_a = Arc::downgrade(&future_a.state);
430+
431+
let future_b = notifier_b.get_future();
432+
let future_state_b = Arc::downgrade(&future_b.state);
433+
434+
let join_handle = thread::spawn(move || {
435+
// Let the other thread get to the wait point, then notify it.
436+
std::thread::sleep(Duration::from_millis(50));
437+
thread_notifier_a.notify();
438+
});
439+
440+
// Wait on the other thread to finish its sleep, note that the leak only happened if we
441+
// actually have to sleep here, not if we immediately return.
442+
Sleeper::from_two_futures(future_a, future_b).wait();
443+
444+
join_handle.join().unwrap();
445+
446+
// then drop the notifiers and make sure the future states are gone.
447+
mem::drop(notifier_a);
448+
mem::drop(notifier_b);
449+
450+
assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
451+
}
452+
410453
#[test]
411454
fn test_future_callbacks() {
412455
let future = Future {
413456
state: Arc::new(Mutex::new(FutureState {
414457
callbacks: Vec::new(),
458+
callbacks_with_state: Vec::new(),
415459
complete: false,
416460
callbacks_made: false,
417461
}))
@@ -421,21 +465,22 @@ mod tests {
421465
future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
422466

423467
assert!(!callback.load(Ordering::SeqCst));
424-
future.state.lock().unwrap().complete();
468+
complete_future(&future.state);
425469
assert!(callback.load(Ordering::SeqCst));
426-
future.state.lock().unwrap().complete();
470+
complete_future(&future.state);
427471
}
428472

429473
#[test]
430474
fn test_pre_completed_future_callbacks() {
431475
let future = Future {
432476
state: Arc::new(Mutex::new(FutureState {
433477
callbacks: Vec::new(),
478+
callbacks_with_state: Vec::new(),
434479
complete: false,
435480
callbacks_made: false,
436481
}))
437482
};
438-
future.state.lock().unwrap().complete();
483+
complete_future(&future.state);
439484

440485
let callback = Arc::new(AtomicBool::new(false));
441486
let callback_ref = Arc::clone(&callback);
@@ -469,6 +514,7 @@ mod tests {
469514
let mut future = Future {
470515
state: Arc::new(Mutex::new(FutureState {
471516
callbacks: Vec::new(),
517+
callbacks_with_state: Vec::new(),
472518
complete: false,
473519
callbacks_made: false,
474520
}))
@@ -483,7 +529,7 @@ mod tests {
483529
assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending);
484530
assert!(!second_woken.load(Ordering::SeqCst));
485531

486-
future.state.lock().unwrap().complete();
532+
complete_future(&future.state);
487533
assert!(woken.load(Ordering::SeqCst));
488534
assert!(second_woken.load(Ordering::SeqCst));
489535
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));

0 commit comments

Comments
 (0)