Skip to content

Commit 8157c01

Browse files
committed
Never store more than one StdWaker per live Future
When an `std::future::Future` is `poll()`ed, we're only supposed to use the latest `Waker` provided. However, we currently push an `StdWaker` onto our callback list every time `poll` is called, waking every `Waker` but also using more and more memory until the `Future` itself is woken. Here we fix this by removing any `StdWaker`s stored for a given `Future` when it is `drop`ped or prior to pushing a new `StdWaker` onto the list when `poll`ed. Sadly, the introduction of a `Drop` impl for `Future` means we can't trivially destructure the struct any longer, causing a few methods to need to take `Future`s by reference rather than ownership and `clone` a few `Arc`s. Fixes #2874
1 parent 5f404b9 commit 8157c01

File tree

2 files changed

+68
-19
lines changed

2 files changed

+68
-19
lines changed

lightning-background-processor/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -854,8 +854,8 @@ impl BackgroundProcessor {
854854
peer_manager.onion_message_handler().process_pending_events(&event_handler),
855855
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
856856
{ Sleeper::from_two_futures(
857-
channel_manager.get_event_or_persistence_needed_future(),
858-
chain_monitor.get_update_future()
857+
&channel_manager.get_event_or_persistence_needed_future(),
858+
&chain_monitor.get_update_future()
859859
).wait_timeout(Duration::from_millis(100)); },
860860
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
861861
|| {

lightning/src/util/wakers.rs

+66-17
Original file line numberDiff line numberDiff line change
@@ -180,16 +180,16 @@ impl Future {
180180

181181
/// Waits until this [`Future`] completes.
182182
#[cfg(feature = "std")]
183-
pub fn wait(self) {
184-
Sleeper::from_single_future(self).wait();
183+
pub fn wait(&self) {
184+
Sleeper::from_single_future(&self).wait();
185185
}
186186

187187
/// Waits until this [`Future`] completes or the given amount of time has elapsed.
188188
///
189189
/// Returns true if the [`Future`] completed, false if the time elapsed.
190190
#[cfg(feature = "std")]
191-
pub fn wait_timeout(self, max_wait: Duration) -> bool {
192-
Sleeper::from_single_future(self).wait_timeout(max_wait)
191+
pub fn wait_timeout(&self, max_wait: Duration) -> bool {
192+
Sleeper::from_single_future(&self).wait_timeout(max_wait)
193193
}
194194

195195
#[cfg(test)]
@@ -202,6 +202,12 @@ impl Future {
202202
}
203203
}
204204

205+
impl Drop for Future {
206+
fn drop(&mut self) {
207+
self.state.lock().unwrap().std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
208+
}
209+
}
210+
205211
use core::task::Waker;
206212
struct StdWaker(pub Waker);
207213

@@ -216,6 +222,7 @@ impl<'a> StdFuture for Future {
216222
Poll::Ready(())
217223
} else {
218224
let waker = cx.waker().clone();
225+
state.std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
219226
state.std_future_callbacks.push((self.self_idx, StdWaker(waker)));
220227
Poll::Pending
221228
}
@@ -232,17 +239,17 @@ pub struct Sleeper {
232239
#[cfg(feature = "std")]
233240
impl Sleeper {
234241
/// Constructs a new sleeper from one future, allowing blocking on it.
235-
pub fn from_single_future(future: Future) -> Self {
236-
Self { notifiers: vec![future.state] }
242+
pub fn from_single_future(future: &Future) -> Self {
243+
Self { notifiers: vec![Arc::clone(&future.state)] }
237244
}
238245
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
239246
// Note that this is the common case - a ChannelManager and ChainMonitor.
240-
pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
241-
Self { notifiers: vec![fut_a.state, fut_b.state] }
247+
pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
248+
Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
242249
}
243250
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
244251
pub fn new(futures: Vec<Future>) -> Self {
245-
Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
252+
Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }
246253
}
247254
/// Prepares to go into a wait loop body, creating a condition variable which we can block on
248255
/// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
@@ -447,13 +454,15 @@ mod tests {
447454

448455
// Wait on the other thread to finish its sleep, note that the leak only happened if we
449456
// actually have to sleep here, not if we immediately return.
450-
Sleeper::from_two_futures(future_a, future_b).wait();
457+
Sleeper::from_two_futures(&future_a, &future_b).wait();
451458

452459
join_handle.join().unwrap();
453460

454461
// then drop the notifiers and make sure the future states are gone.
455462
mem::drop(notifier_a);
456463
mem::drop(notifier_b);
464+
mem::drop(future_a);
465+
mem::drop(future_b);
457466

458467
assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
459468
}
@@ -655,18 +664,18 @@ mod tests {
655664
// Set both notifiers as woken without sleeping yet.
656665
notifier_a.notify();
657666
notifier_b.notify();
658-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
667+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
659668

660669
// One future has woken us up, but the other should still have a pending notification.
661-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
670+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
662671

663672
// However once we've slept twice, we should no longer have any pending notifications
664-
assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
673+
assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
665674
.wait_timeout(Duration::from_millis(10)));
666675

667676
// Test ordering somewhat more.
668677
notifier_a.notify();
669-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
678+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
670679
}
671680

672681
#[test]
@@ -684,7 +693,7 @@ mod tests {
684693

685694
// After sleeping one future (not guaranteed which one, however) will have its notification
686695
// bit cleared.
687-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
696+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
688697

689698
// By registering a callback on the futures for both notifiers, one will complete
690699
// immediately, but one will remain tied to the notifier, and will complete once the
@@ -703,8 +712,48 @@ mod tests {
703712
notifier_b.notify();
704713

705714
assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
706-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
707-
assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
715+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
716+
assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
708717
.wait_timeout(Duration::from_millis(10)));
709718
}
719+
720+
#[test]
721+
#[cfg(feature = "std")]
722+
fn multi_poll_stores_single_waker() {
723+
// When a `Future` is `poll()`ed multiple times, only the last `Waker` should be called,
724+
// but previously we'd store all `Waker`s until they're all woken at once. This tests a few
725+
// cases to ensure `Future`s avoid storing an endless set of `Waker`s.
726+
let notifier = Notifier::new();
727+
let future_state = Arc::clone(&notifier.get_future().state);
728+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
729+
730+
// Test that simply polling a future twice doesn't result in two pending `Waker`s.
731+
let mut future_a = notifier.get_future();
732+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
733+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
734+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
735+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
736+
737+
// If we poll a second future, however, that will store a second `Waker`.
738+
let mut future_b = notifier.get_future();
739+
assert_eq!(Pin::new(&mut future_b).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
740+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 2);
741+
742+
// but when we drop the `Future`s, the pending Wakers will also be dropped.
743+
mem::drop(future_a);
744+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
745+
mem::drop(future_b);
746+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
747+
748+
// Further, after polling a future twice, if the notifier is woken all Wakers are dropped.
749+
let mut future_a = notifier.get_future();
750+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
751+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
752+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
753+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
754+
notifier.notify();
755+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
756+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Ready(()));
757+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
758+
}
710759
}

0 commit comments

Comments
 (0)