Skip to content

Commit e32020c

Browse files
authored
Merge pull request #2894 from TheBlueMatt/2024-02-future-poll-leak
Never store more than one StdWaker per live Future
2 parents 3fd4b39 + 8157c01 commit e32020c

File tree

2 files changed

+111
-45
lines changed

2 files changed

+111
-45
lines changed

lightning-background-processor/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -854,8 +854,8 @@ impl BackgroundProcessor {
854854
peer_manager.onion_message_handler().process_pending_events(&event_handler),
855855
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
856856
{ Sleeper::from_two_futures(
857-
channel_manager.get_event_or_persistence_needed_future(),
858-
chain_monitor.get_update_future()
857+
&channel_manager.get_event_or_persistence_needed_future(),
858+
&chain_monitor.get_update_future()
859859
).wait_timeout(Duration::from_millis(100)); },
860860
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
861861
|| {

lightning/src/util/wakers.rs

+109-43
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,33 @@ impl Notifier {
5656
/// Gets a [`Future`] that will get woken up with any waiters
5757
pub(crate) fn get_future(&self) -> Future {
5858
let mut lock = self.notify_pending.lock().unwrap();
59+
let mut self_idx = 0;
5960
if let Some(existing_state) = &lock.1 {
60-
if existing_state.lock().unwrap().callbacks_made {
61+
let mut locked = existing_state.lock().unwrap();
62+
if locked.callbacks_made {
6163
// If the existing `FutureState` has completed and actually made callbacks,
6264
// consider the notification flag to have been cleared and reset the future state.
65+
mem::drop(locked);
6366
lock.1.take();
6467
lock.0 = false;
68+
} else {
69+
self_idx = locked.next_idx;
70+
locked.next_idx += 1;
6571
}
6672
}
6773
if let Some(existing_state) = &lock.1 {
68-
Future { state: Arc::clone(&existing_state) }
74+
Future { state: Arc::clone(&existing_state), self_idx }
6975
} else {
7076
let state = Arc::new(Mutex::new(FutureState {
7177
callbacks: Vec::new(),
78+
std_future_callbacks: Vec::new(),
7279
callbacks_with_state: Vec::new(),
7380
complete: lock.0,
7481
callbacks_made: false,
82+
next_idx: 1,
7583
}));
7684
lock.1 = Some(Arc::clone(&state));
77-
Future { state }
85+
Future { state, self_idx: 0 }
7886
}
7987
}
8088

@@ -109,36 +117,39 @@ define_callback!(Send);
109117
define_callback!();
110118

111119
pub(crate) struct FutureState {
112-
// When we're tracking whether a callback counts as having woken the user's code, we check the
113-
// first bool - set to false if we're just calling a Waker, and true if we're calling an actual
114-
// user-provided function.
115-
callbacks: Vec<(bool, Box<dyn FutureCallback>)>,
116-
callbacks_with_state: Vec<(bool, Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>)>,
120+
// `callbacks` count as having woken the users' code (as they go direct to the user), but
121+
// `std_future_callbacks` and `callbacks_with_state` do not (as the first just wakes a future,
122+
// we only count it after another `poll()` and the second wakes a `Sleeper` which handles
123+
// setting `callbacks_made` itself).
124+
callbacks: Vec<Box<dyn FutureCallback>>,
125+
std_future_callbacks: Vec<(usize, StdWaker)>,
126+
callbacks_with_state: Vec<Box<dyn Fn(&Arc<Mutex<FutureState>>) -> () + Send>>,
117127
complete: bool,
118128
callbacks_made: bool,
129+
next_idx: usize,
119130
}
120131

121132
fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
122133
let mut state_lock = this.lock().unwrap();
123134
let state = &mut *state_lock;
124-
for (counts_as_call, callback) in state.callbacks.drain(..) {
135+
for callback in state.callbacks.drain(..) {
125136
callback.call();
126-
state.callbacks_made |= counts_as_call;
137+
state.callbacks_made = true;
127138
}
128-
for (counts_as_call, callback) in state.callbacks_with_state.drain(..) {
139+
for (_, waker) in state.std_future_callbacks.drain(..) {
140+
waker.0.wake_by_ref();
141+
}
142+
for callback in state.callbacks_with_state.drain(..) {
129143
(callback)(this);
130-
state.callbacks_made |= counts_as_call;
131144
}
132145
state.complete = true;
133146
state.callbacks_made
134147
}
135148

136149
/// A simple future which can complete once, and calls some callback(s) when it does so.
137-
///
138-
/// Clones can be made and all futures cloned from the same source will complete at the same time.
139-
#[derive(Clone)]
140150
pub struct Future {
141151
state: Arc<Mutex<FutureState>>,
152+
self_idx: usize,
142153
}
143154

144155
impl Future {
@@ -153,7 +164,7 @@ impl Future {
153164
mem::drop(state);
154165
callback.call();
155166
} else {
156-
state.callbacks.push((true, callback));
167+
state.callbacks.push(callback);
157168
}
158169
}
159170

@@ -169,16 +180,16 @@ impl Future {
169180

170181
/// Waits until this [`Future`] completes.
171182
#[cfg(feature = "std")]
172-
pub fn wait(self) {
173-
Sleeper::from_single_future(self).wait();
183+
pub fn wait(&self) {
184+
Sleeper::from_single_future(&self).wait();
174185
}
175186

176187
/// Waits until this [`Future`] completes or the given amount of time has elapsed.
177188
///
178189
/// Returns true if the [`Future`] completed, false if the time elapsed.
179190
#[cfg(feature = "std")]
180-
pub fn wait_timeout(self, max_wait: Duration) -> bool {
181-
Sleeper::from_single_future(self).wait_timeout(max_wait)
191+
pub fn wait_timeout(&self, max_wait: Duration) -> bool {
192+
Sleeper::from_single_future(&self).wait_timeout(max_wait)
182193
}
183194

184195
#[cfg(test)]
@@ -191,11 +202,14 @@ impl Future {
191202
}
192203
}
193204

205+
impl Drop for Future {
206+
fn drop(&mut self) {
207+
self.state.lock().unwrap().std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
208+
}
209+
}
210+
194211
use core::task::Waker;
195212
struct StdWaker(pub Waker);
196-
impl FutureCallback for StdWaker {
197-
fn call(&self) { self.0.wake_by_ref() }
198-
}
199213

200214
/// This is not exported to bindings users as Rust Futures aren't usable in language bindings.
201215
impl<'a> StdFuture for Future {
@@ -208,7 +222,8 @@ impl<'a> StdFuture for Future {
208222
Poll::Ready(())
209223
} else {
210224
let waker = cx.waker().clone();
211-
state.callbacks.push((false, Box::new(StdWaker(waker))));
225+
state.std_future_callbacks.retain(|(idx, _)| *idx != self.self_idx);
226+
state.std_future_callbacks.push((self.self_idx, StdWaker(waker)));
212227
Poll::Pending
213228
}
214229
}
@@ -224,17 +239,17 @@ pub struct Sleeper {
224239
#[cfg(feature = "std")]
225240
impl Sleeper {
226241
/// Constructs a new sleeper from one future, allowing blocking on it.
227-
pub fn from_single_future(future: Future) -> Self {
228-
Self { notifiers: vec![future.state] }
242+
pub fn from_single_future(future: &Future) -> Self {
243+
Self { notifiers: vec![Arc::clone(&future.state)] }
229244
}
230245
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
231246
// Note that this is the common case - a ChannelManager and ChainMonitor.
232-
pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
233-
Self { notifiers: vec![fut_a.state, fut_b.state] }
247+
pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self {
248+
Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] }
234249
}
235250
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
236251
pub fn new(futures: Vec<Future>) -> Self {
237-
Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
252+
Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() }
238253
}
239254
/// Prepares to go into a wait loop body, creating a condition variable which we can block on
240255
/// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
@@ -251,10 +266,10 @@ impl Sleeper {
251266
*notified_fut_mtx.lock().unwrap() = Some(Arc::clone(&notifier_mtx));
252267
break;
253268
}
254-
notifier.callbacks_with_state.push((false, Box::new(move |notifier_ref| {
269+
notifier.callbacks_with_state.push(Box::new(move |notifier_ref| {
255270
*notified_fut_ref.lock().unwrap() = Some(Arc::clone(notifier_ref));
256271
cv_ref.notify_all();
257-
})));
272+
}));
258273
}
259274
}
260275
(cv, notified_fut_mtx)
@@ -439,13 +454,15 @@ mod tests {
439454

440455
// Wait on the other thread to finish its sleep, note that the leak only happened if we
441456
// actually have to sleep here, not if we immediately return.
442-
Sleeper::from_two_futures(future_a, future_b).wait();
457+
Sleeper::from_two_futures(&future_a, &future_b).wait();
443458

444459
join_handle.join().unwrap();
445460

446461
// then drop the notifiers and make sure the future states are gone.
447462
mem::drop(notifier_a);
448463
mem::drop(notifier_b);
464+
mem::drop(future_a);
465+
mem::drop(future_b);
449466

450467
assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none());
451468
}
@@ -455,10 +472,13 @@ mod tests {
455472
let future = Future {
456473
state: Arc::new(Mutex::new(FutureState {
457474
callbacks: Vec::new(),
475+
std_future_callbacks: Vec::new(),
458476
callbacks_with_state: Vec::new(),
459477
complete: false,
460478
callbacks_made: false,
461-
}))
479+
next_idx: 1,
480+
})),
481+
self_idx: 0,
462482
};
463483
let callback = Arc::new(AtomicBool::new(false));
464484
let callback_ref = Arc::clone(&callback);
@@ -475,10 +495,13 @@ mod tests {
475495
let future = Future {
476496
state: Arc::new(Mutex::new(FutureState {
477497
callbacks: Vec::new(),
498+
std_future_callbacks: Vec::new(),
478499
callbacks_with_state: Vec::new(),
479500
complete: false,
480501
callbacks_made: false,
481-
}))
502+
next_idx: 1,
503+
})),
504+
self_idx: 0,
482505
};
483506
complete_future(&future.state);
484507

@@ -514,12 +537,15 @@ mod tests {
514537
let mut future = Future {
515538
state: Arc::new(Mutex::new(FutureState {
516539
callbacks: Vec::new(),
540+
std_future_callbacks: Vec::new(),
517541
callbacks_with_state: Vec::new(),
518542
complete: false,
519543
callbacks_made: false,
520-
}))
544+
next_idx: 2,
545+
})),
546+
self_idx: 0,
521547
};
522-
let mut second_future = Future { state: Arc::clone(&future.state) };
548+
let mut second_future = Future { state: Arc::clone(&future.state), self_idx: 1 };
523549

524550
let (woken, waker) = create_waker();
525551
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending);
@@ -638,18 +664,18 @@ mod tests {
638664
// Set both notifiers as woken without sleeping yet.
639665
notifier_a.notify();
640666
notifier_b.notify();
641-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
667+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
642668

643669
// One future has woken us up, but the other should still have a pending notification.
644-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
670+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
645671

646672
// However once we've slept twice, we should no longer have any pending notifications
647-
assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
673+
assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
648674
.wait_timeout(Duration::from_millis(10)));
649675

650676
// Test ordering somewhat more.
651677
notifier_a.notify();
652-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
678+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
653679
}
654680

655681
#[test]
@@ -667,7 +693,7 @@ mod tests {
667693

668694
// After sleeping one future (not guaranteed which one, however) will have its notification
669695
// bit cleared.
670-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
696+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
671697

672698
// By registering a callback on the futures for both notifiers, one will complete
673699
// immediately, but one will remain tied to the notifier, and will complete once the
@@ -686,8 +712,48 @@ mod tests {
686712
notifier_b.notify();
687713

688714
assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
689-
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
690-
assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
715+
Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future()).wait();
716+
assert!(!Sleeper::from_two_futures(&notifier_a.get_future(), &notifier_b.get_future())
691717
.wait_timeout(Duration::from_millis(10)));
692718
}
719+
720+
#[test]
721+
#[cfg(feature = "std")]
722+
fn multi_poll_stores_single_waker() {
723+
// When a `Future` is `poll()`ed multiple times, only the last `Waker` should be called,
724+
// but previously we'd store all `Waker`s until they're all woken at once. This tests a few
725+
// cases to ensure `Future`s avoid storing an endless set of `Waker`s.
726+
let notifier = Notifier::new();
727+
let future_state = Arc::clone(&notifier.get_future().state);
728+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
729+
730+
// Test that simply polling a future twice doesn't result in two pending `Waker`s.
731+
let mut future_a = notifier.get_future();
732+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
733+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
734+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
735+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
736+
737+
// If we poll a second future, however, that will store a second `Waker`.
738+
let mut future_b = notifier.get_future();
739+
assert_eq!(Pin::new(&mut future_b).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
740+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 2);
741+
742+
// but when we drop the `Future`s, the pending Wakers will also be dropped.
743+
mem::drop(future_a);
744+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
745+
mem::drop(future_b);
746+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
747+
748+
// Further, after polling a future twice, if the notifier is woken all Wakers are dropped.
749+
let mut future_a = notifier.get_future();
750+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
751+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
752+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Pending);
753+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 1);
754+
notifier.notify();
755+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
756+
assert_eq!(Pin::new(&mut future_a).poll(&mut Context::from_waker(&create_waker().1)), Poll::Ready(()));
757+
assert_eq!(future_state.lock().unwrap().std_future_callbacks.len(), 0);
758+
}
693759
}

0 commit comments

Comments
 (0)