
Commit 507d759

Merge pull request #1623 from lexe-tech/custom-smart-pointers

lightning-net-tokio: Allow custom smart pointers

2 parents: 5023ff0 + 5019b31


lightning-net-tokio/src/lib.rs

Lines changed: 51 additions & 25 deletions
@@ -84,6 +84,7 @@ use lightning::ln::peer_handler::CustomMessageHandler;
 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
 use lightning::util::logger::Logger;
 
+use std::ops::Deref;
 use std::task;
 use std::net::SocketAddr;
 use std::net::TcpStream as StdTcpStream;
@@ -120,11 +121,16 @@ struct Connection {
     id: u64,
 }
 impl Connection {
-    async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
-            CMH: ChannelMessageHandler + 'static + Send + Sync,
-            RMH: RoutingMessageHandler + 'static + Send + Sync,
-            L: Logger + 'static + ?Sized + Send + Sync,
-            UMH: CustomMessageHandler + 'static + Send + Sync {
+    async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
+            CMH: Deref + 'static + Send + Sync,
+            RMH: Deref + 'static + Send + Sync,
+            L: Deref + 'static + Send + Sync,
+            UMH: Deref + 'static + Send + Sync,
+            CMH::Target: ChannelMessageHandler + Send + Sync,
+            RMH::Target: RoutingMessageHandler + Send + Sync,
+            L::Target: Logger + Send + Sync,
+            UMH::Target: CustomMessageHandler + Send + Sync,
+    {
         loop {
             if event_receiver.recv().await.is_none() {
                 return;
@@ -133,11 +139,16 @@ impl Connection {
         }
     }
 
-    async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
-            CMH: ChannelMessageHandler + 'static + Send + Sync,
-            RMH: RoutingMessageHandler + 'static + Send + Sync,
-            L: Logger + 'static + ?Sized + Send + Sync,
-            UMH: CustomMessageHandler + 'static + Send + Sync {
+    async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+            CMH: Deref + 'static + Send + Sync,
+            RMH: Deref + 'static + Send + Sync,
+            L: Deref + 'static + Send + Sync,
+            UMH: Deref + 'static + Send + Sync,
+            CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
+            RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
+            L::Target: Logger + 'static + Send + Sync,
+            UMH::Target: CustomMessageHandler + 'static + Send + Sync,
+    {
         // Create a waker to wake up poll_event_process, above
         let (event_waker, event_receiver) = mpsc::channel(1);
         tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
@@ -255,11 +266,16 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
 /// not need to poll the provided future in order to make progress.
-pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
-        CMH: ChannelMessageHandler + 'static + Send + Sync,
-        RMH: RoutingMessageHandler + 'static + Send + Sync,
-        L: Logger + 'static + ?Sized + Send + Sync,
-        UMH: CustomMessageHandler + 'static + Send + Sync {
+pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+        CMH: Deref + 'static + Send + Sync,
+        RMH: Deref + 'static + Send + Sync,
+        L: Deref + 'static + Send + Sync,
+        UMH: Deref + 'static + Send + Sync,
+        CMH::Target: ChannelMessageHandler + Send + Sync,
+        RMH::Target: RoutingMessageHandler + Send + Sync,
+        L::Target: Logger + Send + Sync,
+        UMH::Target: CustomMessageHandler + Send + Sync,
+{
     let remote_addr = get_addr_from_stream(&stream);
     let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
     #[cfg(debug_assertions)]
@@ -297,11 +313,16 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
 /// not need to poll the provided future in order to make progress.
-pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
-        CMH: ChannelMessageHandler + 'static + Send + Sync,
-        RMH: RoutingMessageHandler + 'static + Send + Sync,
-        L: Logger + 'static + ?Sized + Send + Sync,
-        UMH: CustomMessageHandler + 'static + Send + Sync {
+pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+        CMH: Deref + 'static + Send + Sync,
+        RMH: Deref + 'static + Send + Sync,
+        L: Deref + 'static + Send + Sync,
+        UMH: Deref + 'static + Send + Sync,
+        CMH::Target: ChannelMessageHandler + Send + Sync,
+        RMH::Target: RoutingMessageHandler + Send + Sync,
+        L::Target: Logger + Send + Sync,
+        UMH::Target: CustomMessageHandler + Send + Sync,
+{
     let remote_addr = get_addr_from_stream(&stream);
     let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
     #[cfg(debug_assertions)]
@@ -368,11 +389,16 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
 /// disconnected and associated handling futures are freed, though, because all processing in said
 /// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
 /// make progress.
-pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
-        CMH: ChannelMessageHandler + 'static + Send + Sync,
-        RMH: RoutingMessageHandler + 'static + Send + Sync,
-        L: Logger + 'static + ?Sized + Send + Sync,
-        UMH: CustomMessageHandler + 'static + Send + Sync {
+pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+        CMH: Deref + 'static + Send + Sync,
+        RMH: Deref + 'static + Send + Sync,
+        L: Deref + 'static + Send + Sync,
+        UMH: Deref + 'static + Send + Sync,
+        CMH::Target: ChannelMessageHandler + Send + Sync,
+        RMH::Target: RoutingMessageHandler + Send + Sync,
+        L::Target: Logger + Send + Sync,
+        UMH::Target: CustomMessageHandler + Send + Sync,
+{
     if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
         Some(setup_outbound(peer_manager, their_node_id, stream))
     } else { None }
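
The practical effect of the diff above is that the handler type parameters of PeerManager no longer have to be Arcs specifically: any smart pointer that implements Deref to the required handler trait (and is Send + Sync + 'static) now satisfies the bounds of setup_inbound, setup_outbound, and connect_outbound. The standalone sketch below illustrates the bound pattern in isolation; Handler, MyHandler, CustomPtr, and process are invented for illustration and are not part of lightning-net-tokio.

use std::ops::Deref;
use std::sync::Arc;

// Stand-in for a handler trait such as ChannelMessageHandler (illustration only).
trait Handler {
    fn handle(&self) -> &'static str;
}

struct MyHandler;
impl Handler for MyHandler {
    fn handle(&self) -> &'static str { "handled" }
}

// A custom smart pointer: any type that derefs to the handler works.
struct CustomPtr<T>(Arc<T>);
impl<T> Deref for CustomPtr<T> {
    type Target = T;
    fn deref(&self) -> &T { &self.0 }
}

// Mirrors the new bound style: generic over the pointer, with the trait bound on its Target.
fn process<CMH>(handler: CMH) -> &'static str
where
    CMH: Deref + 'static + Send + Sync,
    CMH::Target: Handler + Send + Sync,
{
    handler.handle()
}

fn main() {
    // Both a plain Arc and the custom wrapper satisfy the same bounds.
    assert_eq!(process(Arc::new(MyHandler)), "handled");
    assert_eq!(process(CustomPtr(Arc::new(MyHandler))), "handled");
}

This matches the Deref-bound style used elsewhere in rust-lightning, so callers can supply their own wrapper types instead of being forced to wrap every handler in an Arc.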
