@@ -88,10 +88,19 @@ impl Notifier {
88
88
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
89
89
pub ( crate ) fn notify ( & self ) {
90
90
let mut lock = self . notify_pending . lock ( ) . unwrap ( ) ;
91
- lock . 0 = true ;
91
+ let mut future_probably_generated_calls = false ;
92
92
if let Some ( future_state) = lock. 1 . take ( ) {
93
- future_state. lock ( ) . unwrap ( ) . complete ( ) ;
93
+ future_probably_generated_calls |= future_state. lock ( ) . unwrap ( ) . complete ( ) ;
94
+ future_probably_generated_calls |= Arc :: strong_count ( & future_state) > 1 ;
95
+ }
96
+ if future_probably_generated_calls {
97
+ // If a future made some callbacks or has not yet been drop'd (i.e. the state has more
98
+ // than the one reference we hold), assume the user was notified and skip setting the
99
+ // notification-required flag. This will not cause the `wait` functions above to return
100
+ // and avoid any future `Future`s starting in a completed state.
101
+ return ;
94
102
}
103
+ lock. 0 = true ;
95
104
mem:: drop ( lock) ;
96
105
self . condvar . notify_all ( ) ;
97
106
}
@@ -147,11 +156,14 @@ pub(crate) struct FutureState {
147
156
}
148
157
149
158
impl FutureState {
150
- fn complete ( & mut self ) {
159
+ fn complete ( & mut self ) -> bool {
160
+ let mut made_calls = false ;
151
161
for callback in self . callbacks . drain ( ..) {
152
162
callback. call ( ) ;
163
+ made_calls = true ;
153
164
}
154
165
self . complete = true ;
166
+ made_calls
155
167
}
156
168
}
157
169
@@ -231,6 +243,48 @@ mod tests {
231
243
assert ! ( callback. load( Ordering :: SeqCst ) ) ;
232
244
}
233
245
246
+ #[ test]
247
+ fn notifier_future_completes_wake ( ) {
248
+ // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
249
+ // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
250
+ // `lightning-background-processor` to persist in a tight loop.
251
+ let notifier = Notifier :: new ( ) ;
252
+
253
+ // First check the simple case, ensuring if we get notified a new future isn't woken until
254
+ // a second `notify`.
255
+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
256
+ let callback_ref = Arc :: clone ( & callback) ;
257
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
258
+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
259
+
260
+ notifier. notify ( ) ;
261
+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
262
+
263
+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
264
+ let callback_ref = Arc :: clone ( & callback) ;
265
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
266
+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
267
+
268
+ notifier. notify ( ) ;
269
+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
270
+
271
+ // Then check the case where the future is fetched before the notification, but a callback
272
+ // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
273
+ // don't get an instant-wake when we get a new future.
274
+ let future = notifier. get_future ( ) ;
275
+ notifier. notify ( ) ;
276
+
277
+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
278
+ let callback_ref = Arc :: clone ( & callback) ;
279
+ future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
280
+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
281
+
282
+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
283
+ let callback_ref = Arc :: clone ( & callback) ;
284
+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
285
+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
286
+ }
287
+
234
288
#[ cfg( feature = "std" ) ]
235
289
#[ test]
236
290
fn test_wait_timeout ( ) {
0 commit comments