Background processing of ChannelManager and ChannelMonitor events #920

Merged (7 commits, May 25, 2021)
2 changes: 1 addition & 1 deletion fuzz/src/chanmon_consistency.rs
@@ -44,7 +44,7 @@ use lightning::util::errors::APIError;
use lightning::util::events;
use lightning::util::logger::Logger;
use lightning::util::config::UserConfig;
use lightning::util::events::{EventsProvider, MessageSendEventsProvider};
use lightning::util::events::MessageSendEventsProvider;
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use lightning::routing::router::{Route, RouteHop};

2 changes: 1 addition & 1 deletion fuzz/src/full_stack.rs
@@ -39,7 +39,7 @@ use lightning::ln::msgs::DecodeError;
use lightning::routing::router::get_route;
use lightning::routing::network_graph::NetGraphMsgHandler;
use lightning::util::config::UserConfig;
use lightning::util::events::{EventsProvider,Event};
use lightning::util::events::Event;
use lightning::util::enforcing_trait_impls::EnforcingSigner;
use lightning::util::logger::Logger;
use lightning::util::ser::Readable;
182 changes: 139 additions & 43 deletions lightning-background-processor/src/lib.rs

Large diffs are not rendered by default.

103 changes: 40 additions & 63 deletions lightning-net-tokio/src/lib.rs
@@ -12,21 +12,19 @@
//!
//! Designed to be as simple as possible, the high-level usage is almost as simple as "hand over a
//! TcpStream and a reference to a PeerManager and the rest is handled", except for the
//! [Event](../lightning/util/events/enum.Event.html) handlng mechanism, see below.
//! [Event](../lightning/util/events/enum.Event.html) handling mechanism; see example below.
//!
//! The PeerHandler, due to the fire-and-forget nature of this logic, must be an Arc, and must use
//! the SocketDescriptor provided here as the PeerHandler's SocketDescriptor.
//!
//! Three methods are exposed to register a new connection for handling in tokio::spawn calls, see
//! their individual docs for more. All three take a
//! [mpsc::Sender<()>](../tokio/sync/mpsc/struct.Sender.html) which is sent into every time
//! something occurs which may result in lightning [Events](../lightning/util/events/enum.Event.html).
//! The call site should, thus, look something like this:
//! Three methods are exposed to register a new connection for handling in tokio::spawn calls; see
//! their individual docs for details.
//!
//! # Example
//! ```
//! use tokio::sync::mpsc;
//! use std::net::TcpStream;
//! use bitcoin::secp256k1::key::PublicKey;
//! use lightning::util::events::EventsProvider;
//! use lightning::util::events::{Event, EventHandler, EventsProvider};
//! use std::net::SocketAddr;
//! use std::sync::Arc;
//!
@@ -43,32 +41,30 @@
//!
//! // Connect to node with pubkey their_node_id at addr:
//! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
//! let (sender, mut receiver) = mpsc::channel(2);
Contributor: Hmm, so could this change affect new users of the sample if it isn't updated? I've been wondering whether the sample should be pinned to a specific commit for this reason. Or maybe the Cargo.lock makes it OK?

Contributor Author: Good question! I think Cargo.lock handles this for us, since it will add the revision to the package source.
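
For context on that reply: a Cargo.lock entry for a git dependency records the exact revision that was resolved, so rebuilding the sample from its lockfile keeps using that revision until the lockfile is updated. An illustrative fragment is below; the version, URL, and commit hash are placeholders rather than values taken from the sample repository.

```toml
# Illustrative Cargo.lock fragment only; all concrete values are placeholders.
[[package]]
name = "lightning"
version = "0.0.0"
source = "git+https://github.com/rust-bitcoin/rust-lightning#<commit-hash>"
```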

//! lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await;
//! loop {
//! receiver.recv().await;
//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
//! // Handle the event!
//! }
//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
//! // Handle the event!
//! }
//! }
//! lightning_net_tokio::connect_outbound(peer_manager, their_node_id, addr).await;
//! loop {
//! channel_manager.await_persistable_update();
//! channel_manager.process_pending_events(&|event| {
//! // Handle the event!
//! });
//! chain_monitor.process_pending_events(&|event| {
//! // Handle the event!
//! });
//! }
//! }
//!
//! // Begin reading from a newly accepted socket and talk to the peer:
//! async fn accept_socket(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, socket: TcpStream) {
//! let (sender, mut receiver) = mpsc::channel(2);
//! lightning_net_tokio::setup_inbound(peer_manager, sender, socket);
//! loop {
//! receiver.recv().await;
//! for _event in channel_manager.get_and_clear_pending_events().drain(..) {
//! // Handle the event!
//! }
//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) {
//! // Handle the event!
//! }
//! }
//! lightning_net_tokio::setup_inbound(peer_manager, socket);
//! loop {
//! channel_manager.await_persistable_update();
//! channel_manager.process_pending_events(&|event| {
//! // Handle the event!
//! });
//! chain_monitor.process_pending_events(&|event| {
//! // Handle the event!
//! });
//! }
//! }
//! ```

@@ -90,7 +86,7 @@ use lightning::util::logger::Logger;
use std::{task, thread};
use std::net::SocketAddr;
use std::net::TcpStream as StdTcpStream;
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use std::hash::Hash;
@@ -102,7 +98,6 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
/// read future (which is returned by schedule_read).
struct Connection {
writer: Option<io::WriteHalf<TcpStream>>,
event_notify: mpsc::Sender<()>,
// Because our PeerManager is templated by user-provided types, and we can't (as far as I can
// tell) have a const RawWakerVTable built out of templated functions, we need some indirection
// between being woken up with write-ready and calling PeerManager::write_buffer_space_avail.
@@ -129,21 +124,10 @@ struct Connection {
id: u64,
}
impl Connection {
fn event_trigger(us: &mut MutexGuard<Self>) {
match us.event_notify.try_send(()) {
Ok(_) => {},
Err(mpsc::error::TrySendError::Full(_)) => {
// Ignore full errors as we just need the user to poll after this point, so if they
// haven't received the last send yet, it doesn't matter.
},
_ => panic!()
}
}
async fn schedule_read<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
CMH: ChannelMessageHandler + 'static,
RMH: RoutingMessageHandler + 'static,
L: Logger + 'static + ?Sized {
let peer_manager_ref = peer_manager.clone();
// 8KB is nice and big but also should never cause any issues with stack overflowing.
let mut buf = [0; 8192];

@@ -201,7 +185,6 @@ impl Connection {
if pause_read {
us_lock.read_paused = true;
}
Self::event_trigger(&mut us_lock);
},
Err(e) => shutdown_socket!(e, Disconnect::CloseConnection),
}
@@ -210,19 +193,20 @@
Err(e) => shutdown_socket!(e, Disconnect::PeerDisconnected),
},
}
peer_manager.process_events();
};
let writer_option = us.lock().unwrap().writer.take();
if let Some(mut writer) = writer_option {
// If the socket is already closed, shutdown() will fail, so just ignore it.
let _ = writer.shutdown().await;
}
if let Disconnect::PeerDisconnected = disconnect_type {
peer_manager_ref.socket_disconnected(&our_descriptor);
Self::event_trigger(&mut us.lock().unwrap());
Collaborator: Oops, I forgot that we're also relying on the event-trigger logic here to call peer_manager.process_events() (which is also dumb...). Can you add a call to it after the select in the loop, and maybe here after the disconnect as well? (While you're at it, it seems we no longer need the peer_manager_ref clone.)

Contributor Author: Done.

peer_manager.socket_disconnected(&our_descriptor);
peer_manager.process_events();
}
}

fn new(event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
fn new(stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
// We only ever need a channel of depth 1 here: if we returned a non-full write to the
// PeerManager, we will eventually get notified that there is room in the socket to write
// new bytes, which will generate an event. That event will be popped off the queue before
@@ -238,7 +222,7 @@ impl Connection {

(reader, write_receiver, read_receiver,
Arc::new(Mutex::new(Self {
writer: Some(writer), event_notify, write_avail, read_waker, read_paused: false,
writer: Some(writer), write_avail, read_waker, read_paused: false,
block_disconnect_socket: false, rl_requested_disconnect: false,
id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
})))
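
As an aside, the depth-1 channel described in the comment above is a common wakeup pattern, and the removed event_trigger helper earlier in this diff used exactly it: if a bounded send fails because the buffer is full, the receiver already has an unconsumed wakeup, so the extra notification can be dropped. A standalone sketch of that pattern (the function name is made up and this is not the crate's code):

```rust
use tokio::sync::mpsc;

// Standalone illustration of a depth-1 wakeup channel (created with `mpsc::channel(1)`).
// A full buffer simply means a wakeup is already pending, so this send can be dropped.
fn notify(wakeup: &mpsc::Sender<()>) {
    match wakeup.try_send(()) {
        Ok(()) => {},
        // An unconsumed wakeup is already queued; losing this one changes nothing.
        Err(mpsc::error::TrySendError::Full(())) => {},
        // The receiver was dropped; this sketch ignores that case as well.
        Err(mpsc::error::TrySendError::Closed(())) => {},
    }
}
```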
@@ -251,13 +235,11 @@ impl Connection {
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
///
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync {
let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
let last_us = Arc::clone(&us);

@@ -293,13 +275,11 @@ pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<So
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
///
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync {
let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
#[cfg(debug_assertions)]
let last_us = Arc::clone(&us);

@@ -365,14 +345,12 @@ pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<S
/// disconnected and associated handling futures are freed, though, because all processing in said
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
/// make progress.
///
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
CMH: ChannelMessageHandler + 'static + Send + Sync,
RMH: RoutingMessageHandler + 'static + Send + Sync,
L: Logger + 'static + ?Sized + Send + Sync {
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
Some(setup_outbound(peer_manager, event_notify, their_node_id, stream))
Some(setup_outbound(peer_manager, their_node_id, stream))
} else { None }
}

@@ -634,9 +612,8 @@ mod tests {
(std::net::TcpStream::connect("127.0.0.1:46926").unwrap(), listener.accept().unwrap().0)
} else { panic!("Failed to bind to v4 localhost on common ports"); };

let (sender, _receiver) = mpsc::channel(2);
let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, conn_a);
let fut_b = super::setup_inbound(b_manager, sender, conn_b);
let fut_a = super::setup_outbound(Arc::clone(&a_manager), b_pub, conn_a);
let fut_b = super::setup_inbound(b_manager, conn_b);

tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap();
tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap();
24 changes: 20 additions & 4 deletions lightning/src/chain/chainmonitor.rs
@@ -35,7 +35,7 @@ use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::Sign;
use util::logger::Logger;
use util::events;
use util::events::Event;
use util::events::EventHandler;

use std::collections::{HashMap, hash_map};
use std::sync::RwLock;
@@ -139,6 +139,15 @@ where C::Target: chain::Filter,
persister,
}
}

#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use util::events::EventsProvider;
let events = std::cell::RefCell::new(Vec::new());
let event_handler = |event| events.borrow_mut().push(event);
self.process_pending_events(&event_handler);
events.into_inner()
}
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
@@ -306,12 +315,20 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
{
fn get_and_clear_pending_events(&self) -> Vec<Event> {
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
///
/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
/// order to handle these events.
///
/// [`SpendableOutputs`]: events::Event::SpendableOutputs
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
let mut pending_events = Vec::new();
for monitor in self.monitors.read().unwrap().values() {
pending_events.append(&mut monitor.get_and_clear_pending_events());
}
pending_events
for event in pending_events.drain(..) {
handler.handle_event(event);
}
}
}

@@ -320,7 +337,6 @@
use ::{check_added_monitors, get_local_commitment_txn};
use ln::features::InitFeatures;
use ln::functional_test_utils::*;
use util::events::EventsProvider;
use util::events::MessageSendEventsProvider;
use util::test_utils::{OnRegisterOutput, TxOutReference};

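
The closure-based handler used by the test-only get_and_clear_pending_events helper above is also how callers can drive the new process_pending_events method. A minimal sketch, assuming an already-constructed ChainMonitor behind a generic EventsProvider bound (the function and variable names are illustrative, not from this PR):

```rust
use lightning::util::events::{Event, EventsProvider};

// Minimal sketch: drain pending ChainMonitor events with a closure-based handler.
// `provider` stands in for a fully constructed ChainMonitor.
fn drain_monitor_events<P: EventsProvider>(provider: &P) {
    provider.process_pending_events(&|event: Event| {
        if let Event::SpendableOutputs { outputs } = event {
            // Persist or sweep these outputs so the funds can be claimed on-chain later.
            println!("received {} spendable output descriptor(s)", outputs.len());
        }
    });
}
```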
10 changes: 5 additions & 5 deletions lightning/src/chain/channelmonitor.rs
@@ -222,11 +222,11 @@ pub(crate) const CLTV_CLAIM_BUFFER: u32 = 18;
pub(crate) const LATENCY_GRACE_PERIOD_BLOCKS: u32 = 3;
/// Number of blocks we wait on seeing a HTLC output being solved before we fail corresponding inbound
/// HTLCs. This prevents us from failing backwards and then getting a reorg resulting in us losing money.
/// We use also this delay to be sure we can remove our in-flight claim txn from bump candidates buffer.
/// It may cause spurrious generation of bumped claim txn but that's allright given the outpoint is already
/// solved by a previous claim tx. What we want to avoid is reorg evicting our claim tx and us not
/// keeping bumping another claim tx to solve the outpoint.
pub(crate) const ANTI_REORG_DELAY: u32 = 6;
// We also use this delay to be sure we can remove our in-flight claim txn from bump candidates buffer.
// It may cause spurious generation of bumped claim txn but that's alright given the outpoint is already
// solved by a previous claim tx. What we want to avoid is reorg evicting our claim tx and us not
// keep bumping another claim tx to solve the outpoint.
pub const ANTI_REORG_DELAY: u32 = 6;
/// Number of blocks before confirmation at which we fail back an un-relayed HTLC or at which we
/// refuse to accept a new HTLC.
///
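
Since ANTI_REORG_DELAY becomes pub in this hunk, downstream code can now reference the constant directly. A hypothetical helper using it is sketched below; the exact finality rule shown is an assumption for illustration, not the crate's definition:

```rust
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;

// Hypothetical helper: treat an output first seen on-chain at `seen_height` as settled once
// at least ANTI_REORG_DELAY confirmations have accumulated (illustrative rule only).
fn is_past_anti_reorg_delay(seen_height: u32, best_height: u32) -> bool {
    best_height.saturating_sub(seen_height) + 1 >= ANTI_REORG_DELAY
}
```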
2 changes: 1 addition & 1 deletion lightning/src/ln/chanmon_update_fail_tests.rs
@@ -27,7 +27,7 @@ use ln::msgs::{ChannelMessageHandler, ErrorAction, RoutingMessageHandler};
use routing::router::get_route;
use util::config::UserConfig;
use util::enforcing_trait_impls::EnforcingSigner;
use util::events::{Event, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider};
use util::errors::APIError;
use util::ser::{ReadableArgs, Writeable};
