Skip to content

Commit 1cd41c8

Browse files
committed
Wake the background processor if an async monitor update completes
If the `ChainMonitor` gets an async monitor update completion, this means the `ChannelManager` needs to be polled for event processing. Here we wakt it using the new multi-`Future`-await `Waier`, or the existing `select` block in the async BP. Fixes #2052.
1 parent 4caf6fe commit 1cd41c8

File tree

2 files changed

+26
-2
lines changed

2 files changed

+26
-2
lines changed

lightning-background-processor/src/lib.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ use lightning::util::events::{Event, PathFailure};
3838
use lightning::util::events::{EventHandler, EventsProvider};
3939
use lightning::util::logger::Logger;
4040
use lightning::util::persist::Persister;
41+
#[cfg(feature = "std")]
42+
use lightning::util::wakers::Sleeper;
4143
use lightning_rapid_gossip_sync::RapidGossipSync;
4244

4345
use core::ops::Deref;
@@ -471,7 +473,8 @@ where
471473
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
472474
gossip_sync, peer_manager, logger, scorer, should_break, {
473475
select_biased! {
474-
_ = channel_manager.get_persistable_update_future().fuse() => true,
476+
_ = chain_monitor.get_update_future().fuse() => true,
477+
_ = channel_manager.get_persistable_update_future().fuse() => false,
475478
exit = sleeper(Duration::from_millis(100)).fuse() => {
476479
should_break = exit;
477480
false
@@ -597,7 +600,10 @@ impl BackgroundProcessor {
597600
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
598601
channel_manager, channel_manager.process_pending_events(&event_handler),
599602
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
600-
channel_manager.get_persistable_update_future().wait_timeout(Duration::from_millis(100)),
603+
Sleeper::from_two_futures(
604+
channel_manager.get_persistable_update_future(),
605+
chain_monitor.get_update_future()
606+
).wait_timeout(Duration::from_millis(100)),
601607
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
602608
});
603609
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

lightning/src/chain/chainmonitor.rs

+18
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::util::logger::Logger;
3737
use crate::util::errors::APIError;
3838
use crate::util::events;
3939
use crate::util::events::{Event, EventHandler};
40+
use crate::util::wakers::{Future, Notifier};
4041
use crate::ln::channelmanager::ChannelDetails;
4142

4243
use crate::prelude::*;
@@ -240,6 +241,8 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T:
240241
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
241242
/// The best block height seen, used as a proxy for the passage of time.
242243
highest_chain_height: AtomicUsize,
244+
245+
persistence_notifier: Notifier,
243246
}
244247

245248
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -345,6 +348,7 @@ where C::Target: chain::Filter,
345348
persister,
346349
pending_monitor_events: Mutex::new(Vec::new()),
347350
highest_chain_height: AtomicUsize::new(0),
351+
persistence_notifier: Notifier::new(),
348352
}
349353
}
350354

@@ -472,6 +476,7 @@ where C::Target: chain::Filter,
472476
}
473477
},
474478
}
479+
self.persistence_notifier.notify();
475480
Ok(())
476481
}
477482

@@ -486,6 +491,7 @@ where C::Target: chain::Filter,
486491
funding_txo,
487492
monitor_update_id,
488493
}], counterparty_node_id));
494+
self.persistence_notifier.notify();
489495
}
490496

491497
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
@@ -514,6 +520,18 @@ where C::Target: chain::Filter,
514520
handler(event).await;
515521
}
516522
}
523+
524+
/// Gets a [`Future`] that completes when an event is available either via
525+
/// [`chain::Watch::release_pending_monitor_events`] or
526+
/// [`EventsProvider::process_pending_events`].
527+
///
528+
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
529+
/// [`ChainMonitor`] and should instead register actions to be taken later.
530+
///
531+
/// [`EventsProvider::process_pending_events`]: crate::util::events::EventsProvider::process_pending_events
532+
pub fn get_update_future(&self) -> Future {
533+
self.persistence_notifier.get_future()
534+
}
517535
}
518536

519537
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>

0 commit comments

Comments
 (0)