Skip to content

Commit 3373f42

Browse files
committed
Wake the background processor if an async monitor update completes
If the `ChainMonitor` gets an async monitor update completion, this means the `ChannelManager` needs to be polled for event processing. Here we wake it using the new multi-`Future`-await `Sleeper`, or the existing `select` block in the async BP. Fixes #2052.
1 parent 41f5c12 commit 3373f42

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

lightning-background-processor/src/lib.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ use lightning::routing::router::Router;
3838
use lightning::routing::scoring::{Score, WriteableScore};
3939
use lightning::util::logger::Logger;
4040
use lightning::util::persist::Persister;
41+
#[cfg(feature = "std")]
42+
use lightning::util::wakers::Sleeper;
4143
use lightning_rapid_gossip_sync::RapidGossipSync;
4244

4345
use core::ops::Deref;
@@ -471,7 +473,8 @@ where
471473
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
472474
gossip_sync, peer_manager, logger, scorer, should_break, {
473475
select_biased! {
474-
_ = channel_manager.get_persistable_update_future().fuse() => true,
476+
_ = chain_monitor.get_update_future().fuse() => true,
477+
_ = channel_manager.get_persistable_update_future().fuse() => false,
475478
exit = sleeper(Duration::from_millis(100)).fuse() => {
476479
should_break = exit;
477480
false
@@ -597,7 +600,10 @@ impl BackgroundProcessor {
597600
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
598601
channel_manager, channel_manager.process_pending_events(&event_handler),
599602
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
600-
channel_manager.get_persistable_update_future().wait_timeout(Duration::from_millis(100)),
603+
Sleeper::from_two_futures(
604+
channel_manager.get_persistable_update_future(),
605+
chain_monitor.get_update_future()
606+
).wait_timeout(Duration::from_millis(100)),
601607
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
602608
});
603609
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

lightning/src/chain/chainmonitor.rs

+19
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::events::{Event, EventHandler};
3737
use crate::util::atomic_counter::AtomicCounter;
3838
use crate::util::logger::Logger;
3939
use crate::util::errors::APIError;
40+
use crate::util::wakers::{Future, Notifier};
4041
use crate::ln::channelmanager::ChannelDetails;
4142

4243
use crate::prelude::*;
@@ -240,6 +241,8 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T:
240241
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
241242
/// The best block height seen, used as a proxy for the passage of time.
242243
highest_chain_height: AtomicUsize,
244+
245+
event_notifier: Notifier,
243246
}
244247

245248
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -300,6 +303,7 @@ where C::Target: chain::Filter,
300303
ChannelMonitorUpdateStatus::PermanentFailure => {
301304
monitor_state.channel_perm_failed.store(true, Ordering::Release);
302305
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
306+
self.event_notifier.notify();
303307
},
304308
ChannelMonitorUpdateStatus::InProgress => {
305309
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
@@ -345,6 +349,7 @@ where C::Target: chain::Filter,
345349
persister,
346350
pending_monitor_events: Mutex::new(Vec::new()),
347351
highest_chain_height: AtomicUsize::new(0),
352+
event_notifier: Notifier::new(),
348353
}
349354
}
350355

@@ -472,6 +477,7 @@ where C::Target: chain::Filter,
472477
}
473478
},
474479
}
480+
self.event_notifier.notify();
475481
Ok(())
476482
}
477483

@@ -486,6 +492,7 @@ where C::Target: chain::Filter,
486492
funding_txo,
487493
monitor_update_id,
488494
}], counterparty_node_id));
495+
self.event_notifier.notify();
489496
}
490497

491498
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
@@ -514,6 +521,18 @@ where C::Target: chain::Filter,
514521
handler(event).await;
515522
}
516523
}
524+
525+
/// Gets a [`Future`] that completes when an event is available either via
526+
/// [`chain::Watch::release_pending_monitor_events`] or
527+
/// [`EventsProvider::process_pending_events`].
528+
///
529+
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
530+
/// [`ChainMonitor`] and should instead register actions to be taken later.
531+
///
532+
/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
533+
pub fn get_update_future(&self) -> Future {
534+
self.event_notifier.get_future()
535+
}
517536
}
518537

519538
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>

0 commit comments

Comments
 (0)