Skip to content

Commit 6890e43

Browse files
committed
Wake the background processor if an async monitor update completes
If the `ChainMonitor` gets an async monitor update completion, this means the `ChannelManager` needs to be polled for event processing. Here we wake it using the new multi-`Future`-await `Sleeper`, or the existing `select` block in the async BP. Fixes #2052.
1 parent 3acf7e2 commit 6890e43

File tree

2 files changed

+42
-7
lines changed

2 files changed

+42
-7
lines changed

lightning-background-processor/src/lib.rs

+23-7
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ use lightning::routing::router::Router;
3838
use lightning::routing::scoring::{Score, WriteableScore};
3939
use lightning::util::logger::Logger;
4040
use lightning::util::persist::Persister;
41+
#[cfg(feature = "std")]
42+
use lightning::util::wakers::Sleeper;
4143
use lightning_rapid_gossip_sync::RapidGossipSync;
4244

4345
use core::ops::Deref;
@@ -388,23 +390,32 @@ pub(crate) mod futures_util {
388390
use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
389391
use core::pin::Pin;
390392
use core::marker::Unpin;
391-
pub(crate) struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
393+
pub(crate) struct Selector<
394+
A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
395+
> {
392396
pub a: A,
393397
pub b: B,
398+
pub c: C,
394399
}
395400
pub(crate) enum SelectorOutput {
396-
A, B(bool),
401+
A, B, C(bool),
397402
}
398403

399-
impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
404+
impl<
405+
A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
406+
> Future for Selector<A, B, C> {
400407
type Output = SelectorOutput;
401408
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
402409
match Pin::new(&mut self.a).poll(ctx) {
403410
Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
404411
Poll::Pending => {},
405412
}
406413
match Pin::new(&mut self.b).poll(ctx) {
407-
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
414+
Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
415+
Poll::Pending => {},
416+
}
417+
match Pin::new(&mut self.c).poll(ctx) {
418+
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
408419
Poll::Pending => {},
409420
}
410421
Poll::Pending
@@ -514,11 +525,13 @@ where
514525
gossip_sync, peer_manager, logger, scorer, should_break, {
515526
let fut = Selector {
516527
a: channel_manager.get_persistable_update_future(),
517-
b: sleeper(Duration::from_millis(100)),
528+
b: chain_monitor.get_update_future(),
529+
c: sleeper(Duration::from_millis(100)),
518530
};
519531
match fut.await {
520532
SelectorOutput::A => true,
521-
SelectorOutput::B(exit) => {
533+
SelectorOutput::B => false,
534+
SelectorOutput::C(exit) => {
522535
should_break = exit;
523536
false
524537
}
@@ -643,7 +656,10 @@ impl BackgroundProcessor {
643656
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
644657
channel_manager, channel_manager.process_pending_events(&event_handler),
645658
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
646-
channel_manager.get_persistable_update_future().wait_timeout(Duration::from_millis(100)),
659+
Sleeper::from_two_futures(
660+
channel_manager.get_persistable_update_future(),
661+
chain_monitor.get_update_future()
662+
).wait_timeout(Duration::from_millis(100)),
647663
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
648664
});
649665
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

lightning/src/chain/chainmonitor.rs

+19
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::events::{Event, EventHandler};
3737
use crate::util::atomic_counter::AtomicCounter;
3838
use crate::util::logger::Logger;
3939
use crate::util::errors::APIError;
40+
use crate::util::wakers::{Future, Notifier};
4041
use crate::ln::channelmanager::ChannelDetails;
4142

4243
use crate::prelude::*;
@@ -240,6 +241,8 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T:
240241
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
241242
/// The best block height seen, used as a proxy for the passage of time.
242243
highest_chain_height: AtomicUsize,
244+
245+
event_notifier: Notifier,
243246
}
244247

245248
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -300,6 +303,7 @@ where C::Target: chain::Filter,
300303
ChannelMonitorUpdateStatus::PermanentFailure => {
301304
monitor_state.channel_perm_failed.store(true, Ordering::Release);
302305
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
306+
self.event_notifier.notify();
303307
},
304308
ChannelMonitorUpdateStatus::InProgress => {
305309
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
@@ -345,6 +349,7 @@ where C::Target: chain::Filter,
345349
persister,
346350
pending_monitor_events: Mutex::new(Vec::new()),
347351
highest_chain_height: AtomicUsize::new(0),
352+
event_notifier: Notifier::new(),
348353
}
349354
}
350355

@@ -472,6 +477,7 @@ where C::Target: chain::Filter,
472477
}
473478
},
474479
}
480+
self.event_notifier.notify();
475481
Ok(())
476482
}
477483

@@ -486,6 +492,7 @@ where C::Target: chain::Filter,
486492
funding_txo,
487493
monitor_update_id,
488494
}], counterparty_node_id));
495+
self.event_notifier.notify();
489496
}
490497

491498
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
@@ -514,6 +521,18 @@ where C::Target: chain::Filter,
514521
handler(event).await;
515522
}
516523
}
524+
525+
/// Gets a [`Future`] that completes when an event is available either via
526+
/// [`chain::Watch::release_pending_monitor_events`] or
527+
/// [`EventsProvider::process_pending_events`].
528+
///
529+
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
530+
/// [`ChainMonitor`] and should instead register actions to be taken later.
531+
///
532+
/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
533+
pub fn get_update_future(&self) -> Future {
534+
self.event_notifier.get_future()
535+
}
517536
}
518537

519538
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>

0 commit comments

Comments
 (0)