Skip to content

Wake background-processor on async monitor update completion #2090

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 3, 2023
56 changes: 43 additions & 13 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ use lightning::routing::router::Router;
use lightning::routing::scoring::{Score, WriteableScore};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
#[cfg(feature = "std")]
use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;

use core::ops::Deref;
Expand Down Expand Up @@ -114,6 +116,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

#[cfg(feature = "futures")]
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
#[cfg(feature = "futures")]
const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER));

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
P: Deref<Target = P2PGossipSync<G, U, L>>,
Expand Down Expand Up @@ -256,7 +265,8 @@ macro_rules! define_run_body {
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
$check_slow_await: expr)
=> { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
Expand Down Expand Up @@ -286,9 +296,10 @@ macro_rules! define_run_body {

// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let mut await_start = $get_timer(1);
let mut await_start = None;
if $check_slow_await { await_start = Some($get_timer(1)); }
let updates_available = $await;
let await_slow = $timer_elapsed(&mut await_start, 1);
let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };

if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
Expand Down Expand Up @@ -388,23 +399,32 @@ pub(crate) mod futures_util {
use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
use core::pin::Pin;
use core::marker::Unpin;
pub(crate) struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
pub(crate) struct Selector<
A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
> {
pub a: A,
pub b: B,
pub c: C,
}
pub(crate) enum SelectorOutput {
A, B(bool),
A, B, C(bool),
}

impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
impl<
A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
> Future for Selector<A, B, C> {
type Output = SelectorOutput;
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
match Pin::new(&mut self.a).poll(ctx) {
Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
Poll::Pending => {},
}
match Pin::new(&mut self.b).poll(ctx) {
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
Poll::Pending => {},
}
match Pin::new(&mut self.c).poll(ctx) {
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
Poll::Pending => {},
}
Poll::Pending
Expand Down Expand Up @@ -438,6 +458,11 @@ use core::task;
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
///
/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
/// mobile device, where we may need to check for interruption of the application regularly. If you
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
Expand Down Expand Up @@ -473,7 +498,7 @@ pub async fn process_events_async<
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper,
sleeper: Sleeper, mobile_interruptable_platform: bool,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
Expand Down Expand Up @@ -514,11 +539,13 @@ where
gossip_sync, peer_manager, logger, scorer, should_break, {
let fut = Selector {
a: channel_manager.get_persistable_update_future(),
b: sleeper(Duration::from_millis(100)),
b: chain_monitor.get_update_future(),
c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
};
match fut.await {
SelectorOutput::A => true,
SelectorOutput::B(exit) => {
SelectorOutput::B => false,
SelectorOutput::C(exit) => {
should_break = exit;
false
}
Expand All @@ -528,7 +555,7 @@ where
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
})
}, mobile_interruptable_platform)
}

#[cfg(feature = "std")]
Expand Down Expand Up @@ -643,8 +670,11 @@ impl BackgroundProcessor {
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
Sleeper::from_two_futures(
channel_manager.get_persistable_update_future(),
chain_monitor.get_update_future()
).wait_timeout(Duration::from_millis(100)),
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
Expand Down
61 changes: 8 additions & 53 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,64 +8,19 @@
// licenses.

//! A socket handling library for those running in Tokio environments who wish to use
//! rust-lightning with native TcpStreams.
//! rust-lightning with native [`TcpStream`]s.
//!
//! Designed to be as simple as possible, the high-level usage is almost as simple as "hand over a
//! TcpStream and a reference to a PeerManager and the rest is handled", except for the
//! [Event](../lightning/util/events/enum.Event.html) handling mechanism; see example below.
//! [`TcpStream`] and a reference to a [`PeerManager`] and the rest is handled".
//!
//! The PeerHandler, due to the fire-and-forget nature of this logic, must be an Arc, and must use
//! the SocketDescriptor provided here as the PeerHandler's SocketDescriptor.
//! The [`PeerManager`], due to the fire-and-forget nature of this logic, must be a reference,
//! (e.g. an [`Arc`]) and must use the [`SocketDescriptor`] provided here as the [`PeerManager`]'s
//! `SocketDescriptor` implementation.
//!
//! Three methods are exposed to register a new connection for handling in tokio::spawn calls; see
//! their individual docs for details.
//! Three methods are exposed to register a new connection for handling in [`tokio::spawn`] calls;
//! see their individual docs for details.
//!
//! # Example
//! ```
//! use std::net::TcpStream;
//! use bitcoin::secp256k1::PublicKey;
//! use lightning::events::{Event, EventHandler, EventsProvider};
//! use std::net::SocketAddr;
//! use std::sync::Arc;
//!
//! // Define concrete types for our high-level objects:
//! type TxBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
//! type Logger = dyn lightning::util::logger::Logger + Send + Sync;
//! type NodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync;
//! type UtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
//! type ChainFilter = dyn lightning::chain::Filter + Send + Sync;
//! type DataPersister = dyn lightning::chain::chainmonitor::Persist<lightning::chain::keysinterface::InMemorySigner> + Send + Sync;
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
//! type ChannelManager = Arc<lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>>;
//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, UtxoLookup, Logger>>;
//!
//! // Connect to node with pubkey their_node_id at addr:
//! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
//! lightning_net_tokio::connect_outbound(peer_manager, their_node_id, addr).await;
//! loop {
//! let event_handler = |event: Event| {
//! // Handle the event!
//! };
//! channel_manager.await_persistable_update();
//! channel_manager.process_pending_events(&event_handler);
//! chain_monitor.process_pending_events(&event_handler);
//! }
//! }
//!
//! // Begin reading from a newly accepted socket and talk to the peer:
//! async fn accept_socket(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, socket: TcpStream) {
//! lightning_net_tokio::setup_inbound(peer_manager, socket);
//! loop {
//! let event_handler = |event: Event| {
//! // Handle the event!
//! };
//! channel_manager.await_persistable_update();
//! channel_manager.process_pending_events(&event_handler);
//! chain_monitor.process_pending_events(&event_handler);
//! }
//! }
//! ```
//! [`PeerManager`]: lightning::ln::peer_handler::PeerManager

// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
#![deny(broken_intra_doc_links)]
Expand Down
19 changes: 19 additions & 0 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::events::{Event, EventHandler};
use crate::util::atomic_counter::AtomicCounter;
use crate::util::logger::Logger;
use crate::util::errors::APIError;
use crate::util::wakers::{Future, Notifier};
use crate::ln::channelmanager::ChannelDetails;

use crate::prelude::*;
Expand Down Expand Up @@ -240,6 +241,8 @@ pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T:
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,

event_notifier: Notifier,
}

impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
Expand Down Expand Up @@ -300,6 +303,7 @@ where C::Target: chain::Filter,
ChannelMonitorUpdateStatus::PermanentFailure => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
self.event_notifier.notify();
},
ChannelMonitorUpdateStatus::InProgress => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
Expand Down Expand Up @@ -345,6 +349,7 @@ where C::Target: chain::Filter,
persister,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
event_notifier: Notifier::new(),
}
}

Expand Down Expand Up @@ -472,6 +477,7 @@ where C::Target: chain::Filter,
}
},
}
self.event_notifier.notify();
Ok(())
}

Expand All @@ -486,6 +492,7 @@ where C::Target: chain::Filter,
funding_txo,
monitor_update_id,
}], counterparty_node_id));
self.event_notifier.notify();
}

#[cfg(any(test, fuzzing, feature = "_test_utils"))]
Expand Down Expand Up @@ -514,6 +521,18 @@ where C::Target: chain::Filter,
handler(event).await;
}
}

/// Gets a [`Future`] that completes when an event is available either via
/// [`chain::Watch::release_pending_monitor_events`] or
/// [`EventsProvider::process_pending_events`].
///
/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
/// [`ChainMonitor`] and should instead register actions to be taken later.
///
/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
pub fn get_update_future(&self) -> Future {
self.event_notifier.get_future()
}
}

impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
Expand Down
Loading