-
Notifications
You must be signed in to change notification settings - Fork 406
Background processing of ChannelManager and ChannelMonitor events #920
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7c465d6
282d092
248a107
86ce446
501b543
f63fd83
a1f95de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,21 +12,19 @@ | |
//! | ||
//! Designed to be as simple as possible, the high-level usage is almost as simple as "hand over a | ||
//! TcpStream and a reference to a PeerManager and the rest is handled", except for the | ||
//! [Event](../lightning/util/events/enum.Event.html) handlng mechanism, see below. | ||
//! [Event](../lightning/util/events/enum.Event.html) handling mechanism; see example below. | ||
//! | ||
//! The PeerHandler, due to the fire-and-forget nature of this logic, must be an Arc, and must use | ||
//! the SocketDescriptor provided here as the PeerHandler's SocketDescriptor. | ||
//! | ||
//! Three methods are exposed to register a new connection for handling in tokio::spawn calls, see | ||
//! their individual docs for more. All three take a | ||
//! [mpsc::Sender<()>](../tokio/sync/mpsc/struct.Sender.html) which is sent into every time | ||
//! something occurs which may result in lightning [Events](../lightning/util/events/enum.Event.html). | ||
//! The call site should, thus, look something like this: | ||
//! Three methods are exposed to register a new connection for handling in tokio::spawn calls; see | ||
//! their individual docs for details. | ||
//! | ||
//! # Example | ||
//! ``` | ||
//! use tokio::sync::mpsc; | ||
//! use std::net::TcpStream; | ||
//! use bitcoin::secp256k1::key::PublicKey; | ||
//! use lightning::util::events::EventsProvider; | ||
//! use lightning::util::events::{Event, EventHandler, EventsProvider}; | ||
//! use std::net::SocketAddr; | ||
//! use std::sync::Arc; | ||
//! | ||
|
@@ -43,32 +41,30 @@ | |
//! | ||
//! // Connect to node with pubkey their_node_id at addr: | ||
//! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { | ||
//! let (sender, mut receiver) = mpsc::channel(2); | ||
//! lightning_net_tokio::connect_outbound(peer_manager, sender, their_node_id, addr).await; | ||
//! loop { | ||
//! receiver.recv().await; | ||
//! for _event in channel_manager.get_and_clear_pending_events().drain(..) { | ||
//! // Handle the event! | ||
//! } | ||
//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) { | ||
//! // Handle the event! | ||
//! } | ||
//! } | ||
//! lightning_net_tokio::connect_outbound(peer_manager, their_node_id, addr).await; | ||
//! loop { | ||
//! channel_manager.await_persistable_update(); | ||
//! channel_manager.process_pending_events(&|event| { | ||
//! // Handle the event! | ||
//! }); | ||
//! chain_monitor.process_pending_events(&|event| { | ||
//! // Handle the event! | ||
//! }); | ||
//! } | ||
//! } | ||
//! | ||
//! // Begin reading from a newly accepted socket and talk to the peer: | ||
//! async fn accept_socket(peer_manager: PeerManager, chain_monitor: Arc<ChainMonitor>, channel_manager: ChannelManager, socket: TcpStream) { | ||
//! let (sender, mut receiver) = mpsc::channel(2); | ||
//! lightning_net_tokio::setup_inbound(peer_manager, sender, socket); | ||
//! loop { | ||
//! receiver.recv().await; | ||
//! for _event in channel_manager.get_and_clear_pending_events().drain(..) { | ||
//! // Handle the event! | ||
//! } | ||
//! for _event in chain_monitor.get_and_clear_pending_events().drain(..) { | ||
//! // Handle the event! | ||
//! } | ||
//! } | ||
//! lightning_net_tokio::setup_inbound(peer_manager, socket); | ||
//! loop { | ||
//! channel_manager.await_persistable_update(); | ||
//! channel_manager.process_pending_events(&|event| { | ||
//! // Handle the event! | ||
//! }); | ||
//! chain_monitor.process_pending_events(&|event| { | ||
//! // Handle the event! | ||
//! }); | ||
//! } | ||
//! } | ||
//! ``` | ||
|
||
|
@@ -90,7 +86,7 @@ use lightning::util::logger::Logger; | |
use std::{task, thread}; | ||
use std::net::SocketAddr; | ||
use std::net::TcpStream as StdTcpStream; | ||
use std::sync::{Arc, Mutex, MutexGuard}; | ||
use std::sync::{Arc, Mutex}; | ||
use std::sync::atomic::{AtomicU64, Ordering}; | ||
use std::time::Duration; | ||
use std::hash::Hash; | ||
|
@@ -102,7 +98,6 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0); | |
/// read future (which is returned by schedule_read). | ||
struct Connection { | ||
writer: Option<io::WriteHalf<TcpStream>>, | ||
event_notify: mpsc::Sender<()>, | ||
// Because our PeerManager is templated by user-provided types, and we can't (as far as I can | ||
// tell) have a const RawWakerVTable built out of templated functions, we need some indirection | ||
// between being woken up with write-ready and calling PeerManager::write_buffer_space_avail. | ||
|
@@ -129,21 +124,10 @@ struct Connection { | |
id: u64, | ||
} | ||
impl Connection { | ||
fn event_trigger(us: &mut MutexGuard<Self>) { | ||
match us.event_notify.try_send(()) { | ||
Ok(_) => {}, | ||
Err(mpsc::error::TrySendError::Full(_)) => { | ||
// Ignore full errors as we just need the user to poll after this point, so if they | ||
// haven't received the last send yet, it doesn't matter. | ||
}, | ||
_ => panic!() | ||
} | ||
} | ||
async fn schedule_read<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where | ||
CMH: ChannelMessageHandler + 'static, | ||
RMH: RoutingMessageHandler + 'static, | ||
L: Logger + 'static + ?Sized { | ||
let peer_manager_ref = peer_manager.clone(); | ||
// 8KB is nice and big but also should never cause any issues with stack overflowing. | ||
let mut buf = [0; 8192]; | ||
|
||
|
@@ -201,7 +185,6 @@ impl Connection { | |
if pause_read { | ||
us_lock.read_paused = true; | ||
} | ||
Self::event_trigger(&mut us_lock); | ||
}, | ||
Err(e) => shutdown_socket!(e, Disconnect::CloseConnection), | ||
} | ||
|
@@ -210,19 +193,20 @@ impl Connection { | |
Err(e) => shutdown_socket!(e, Disconnect::PeerDisconnected), | ||
}, | ||
} | ||
peer_manager.process_events(); | ||
}; | ||
let writer_option = us.lock().unwrap().writer.take(); | ||
if let Some(mut writer) = writer_option { | ||
// If the socket is already closed, shutdown() will fail, so just ignore it. | ||
let _ = writer.shutdown().await; | ||
} | ||
if let Disconnect::PeerDisconnected = disconnect_type { | ||
peer_manager_ref.socket_disconnected(&our_descriptor); | ||
Self::event_trigger(&mut us.lock().unwrap()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, I forgot that we're also relying on the event trigger thing here to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
peer_manager.socket_disconnected(&our_descriptor); | ||
peer_manager.process_events(); | ||
} | ||
} | ||
|
||
fn new(event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) { | ||
fn new(stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) { | ||
// We only ever need a channel of depth 1 here: if we returned a non-full write to the | ||
// PeerManager, we will eventually get notified that there is room in the socket to write | ||
// new bytes, which will generate an event. That event will be popped off the queue before | ||
|
@@ -238,7 +222,7 @@ impl Connection { | |
|
||
(reader, write_receiver, read_receiver, | ||
Arc::new(Mutex::new(Self { | ||
writer: Some(writer), event_notify, write_avail, read_waker, read_paused: false, | ||
writer: Some(writer), write_avail, read_waker, read_paused: false, | ||
block_disconnect_socket: false, rl_requested_disconnect: false, | ||
id: ID_COUNTER.fetch_add(1, Ordering::AcqRel) | ||
}))) | ||
|
@@ -251,13 +235,11 @@ impl Connection { | |
/// The returned future will complete when the peer is disconnected and associated handling | ||
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do | ||
/// not need to poll the provided future in order to make progress. | ||
/// | ||
/// See the module-level documentation for how to handle the event_notify mpsc::Sender. | ||
pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where | ||
pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where | ||
CMH: ChannelMessageHandler + 'static + Send + Sync, | ||
RMH: RoutingMessageHandler + 'static + Send + Sync, | ||
L: Logger + 'static + ?Sized + Send + Sync { | ||
let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream); | ||
let (reader, write_receiver, read_receiver, us) = Connection::new(stream); | ||
#[cfg(debug_assertions)] | ||
let last_us = Arc::clone(&us); | ||
|
||
|
@@ -293,13 +275,11 @@ pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<So | |
/// The returned future will complete when the peer is disconnected and associated handling | ||
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do | ||
/// not need to poll the provided future in order to make progress. | ||
/// | ||
/// See the module-level documentation for how to handle the event_notify mpsc::Sender. | ||
pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where | ||
pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where | ||
CMH: ChannelMessageHandler + 'static + Send + Sync, | ||
RMH: RoutingMessageHandler + 'static + Send + Sync, | ||
L: Logger + 'static + ?Sized + Send + Sync { | ||
let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream); | ||
let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream); | ||
#[cfg(debug_assertions)] | ||
let last_us = Arc::clone(&us); | ||
|
||
|
@@ -365,14 +345,12 @@ pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<S | |
/// disconnected and associated handling futures are freed, though, because all processing in said | ||
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to | ||
/// make progress. | ||
/// | ||
/// See the module-level documentation for how to handle the event_notify mpsc::Sender. | ||
pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where | ||
pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where | ||
CMH: ChannelMessageHandler + 'static + Send + Sync, | ||
RMH: RoutingMessageHandler + 'static + Send + Sync, | ||
L: Logger + 'static + ?Sized + Send + Sync { | ||
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { | ||
Some(setup_outbound(peer_manager, event_notify, their_node_id, stream)) | ||
Some(setup_outbound(peer_manager, their_node_id, stream)) | ||
} else { None } | ||
} | ||
|
||
|
@@ -634,9 +612,8 @@ mod tests { | |
(std::net::TcpStream::connect("127.0.0.1:46926").unwrap(), listener.accept().unwrap().0) | ||
} else { panic!("Failed to bind to v4 localhost on common ports"); }; | ||
|
||
let (sender, _receiver) = mpsc::channel(2); | ||
let fut_a = super::setup_outbound(Arc::clone(&a_manager), sender.clone(), b_pub, conn_a); | ||
let fut_b = super::setup_inbound(b_manager, sender, conn_b); | ||
let fut_a = super::setup_outbound(Arc::clone(&a_manager), b_pub, conn_a); | ||
let fut_b = super::setup_inbound(b_manager, conn_b); | ||
|
||
tokio::time::timeout(Duration::from_secs(10), a_connected.recv()).await.unwrap(); | ||
tokio::time::timeout(Duration::from_secs(1), b_connected.recv()).await.unwrap(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hum, so could this change affect new users of the sample if it isn't updated? Been wondering if the sample should be pinned to a specific commit for this reason. Or maybe the Cargo.lock makes it ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question! I think
Cargo.lock
handles this for us since it will add the revision to the package source.