Skip to content

Commit d84dcc8

Browse files
committed
Fix unit tests
1 parent 2e4e659 commit d84dcc8

File tree

3 files changed

+62
-61
lines changed

3 files changed

+62
-61
lines changed

fuzz/src/full_stack.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use lightning::chain::transaction::OutPoint;
2222
use lightning::chain::keysinterface::{InMemoryChannelKeys, KeysInterface};
2323
use lightning::ln::channelmonitor;
2424
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret};
25-
use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor};
25+
use lightning::ln::peers::handler::{MessageHandler,PeerManager,SocketDescriptor};
2626
use lightning::routing::router::get_route;
2727
use lightning::routing::network_graph::NetGraphMsgHandler;
2828
use lightning::util::events::{EventsProvider,Event};

lightning-net-tokio/src/lib.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
//! type ChainWatchInterface = dyn lightning::chain::chaininterface::ChainWatchInterface;
2929
//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor<lightning::chain::transaction::OutPoint, lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<ChainWatchInterface>>;
3030
//! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChannelMonitor, TxBroadcaster, FeeEstimator, Logger>;
31-
//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChannelMonitor, TxBroadcaster, FeeEstimator, ChainWatchInterface, Logger>;
31+
//! type PeerManager = lightning::ln::peers::handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChannelMonitor, TxBroadcaster, FeeEstimator, ChainWatchInterface, Logger>;
3232
//!
3333
//! // Connect to node with pubkey their_node_id at addr:
3434
//! async fn connect_to_node(peer_manager: PeerManager, channel_monitor: Arc<ChannelMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
@@ -68,8 +68,8 @@ use tokio::{io, time};
6868
use tokio::sync::mpsc;
6969
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
7070

71-
use lightning::ln::peer_handler;
72-
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
71+
use lightning::ln::peers::handler;
72+
use lightning::ln::peers::handler::SocketDescriptor as LnSocketTrait;
7373
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
7474
use lightning::util::logger::Logger;
7575

@@ -124,7 +124,7 @@ impl Connection {
124124
_ => panic!()
125125
}
126126
}
127-
async fn schedule_read<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
127+
async fn schedule_read<CMH, RMH, L>(peer_manager: Arc<handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
128128
CMH: ChannelMessageHandler + 'static,
129129
RMH: RoutingMessageHandler + 'static,
130130
L: Logger + 'static + ?Sized {
@@ -237,7 +237,7 @@ impl Connection {
237237
/// not need to poll the provided future in order to make progress.
238238
///
239239
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
240-
pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future<Output=()> where
240+
pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future<Output=()> where
241241
CMH: ChannelMessageHandler + 'static,
242242
RMH: RoutingMessageHandler + 'static,
243243
L: Logger + 'static + ?Sized {
@@ -279,7 +279,7 @@ pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<So
279279
/// not need to poll the provided future in order to make progress.
280280
///
281281
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
282-
pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> where
282+
pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> where
283283
CMH: ChannelMessageHandler + 'static,
284284
RMH: RoutingMessageHandler + 'static,
285285
L: Logger + 'static + ?Sized {
@@ -351,7 +351,7 @@ pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<S
351351
/// make progress.
352352
///
353353
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
354-
pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
354+
pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
355355
CMH: ChannelMessageHandler + 'static,
356356
RMH: RoutingMessageHandler + 'static,
357357
L: Logger + 'static + ?Sized {
@@ -402,7 +402,7 @@ impl SocketDescriptor {
402402
Self { conn, id }
403403
}
404404
}
405-
impl peer_handler::SocketDescriptor for SocketDescriptor {
405+
impl handler::SocketDescriptor for SocketDescriptor {
406406
fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
407407
// To send data, we take a lock on our Connection to access the WriteHalf of the TcpStream,
408408
// writing to it if there's room in the kernel buffer, or otherwise create a new Waker with
@@ -494,7 +494,7 @@ impl Hash for SocketDescriptor {
494494
mod tests {
495495
use lightning::ln::features::*;
496496
use lightning::ln::msgs::*;
497-
use lightning::ln::peer_handler::{MessageHandler, PeerManager};
497+
use lightning::ln::peers::handler::{MessageHandler, PeerManager};
498498
use lightning::util::events::*;
499499
use bitcoin::secp256k1::{Secp256k1, SecretKey, PublicKey};
500500

lightning/src/ln/peers/handler.rs

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
544544

545545
log_trace!(self.logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
546546
match peer.encryptor {
547-
PeerState::Connected(ref mut conduit) => peer.pending_outbound_buffer.push_back(conduit.encrypt(&encode_msg!($msg)[..])),
547+
PeerState::Connected(ref mut conduit) => peer.pending_outbound_buffer.push_back(conduit.encrypt(&encoded_message[..])),
548548
_ => panic!("peer must be connected!")
549549
}
550550
peers_needing_send.insert(descriptor);
@@ -563,7 +563,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
563563
let data_processing_decision = peer.encryptor.process_peer_data(data, &mut peer.pending_outbound_buffer);
564564
match data_processing_decision {
565565
PeerDataProcessingDecision::Disconnect(e) => {
566-
log_trace!(self, "Invalid act message; disconnecting: {}", e);
566+
log_trace!(self.logger, "Invalid act message; disconnecting: {}", e);
567567
return Err(e);
568568
}
569569

@@ -577,61 +577,35 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
577577
// insert node id
578578
match peers.node_id_to_descriptor.entry(peer.their_node_id.unwrap()) {
579579
hash_map::Entry::Occupied(_) => {
580-
log_trace!(self, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
580+
log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap()));
581581
peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event
582582
return Err(PeerHandleError { no_connection_possible: false });
583583
}
584584
hash_map::Entry::Vacant(entry) => {
585-
log_trace!(self, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
585+
log_trace!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap()));
586586
entry.insert(peer_descriptor.clone())
587587
}
588588
};
589589
}
590590
_ => {}
591591
};
592592

593-
if let &mut PeerState::Connected(ref mut conduit) = &mut peer.encryptor {
593+
if send_init_message {
594+
let mut features = InitFeatures::known();
595+
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
596+
features.clear_initial_routing_sync();
597+
}
598+
599+
let resp = msgs::Init { features };
600+
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
601+
send_init_message = false
602+
}
594603

604+
let mut received_messages = vec![];
605+
if let &mut PeerState::Connected(ref mut conduit) = &mut peer.encryptor {
595606
let encryptor = &mut conduit.encryptor;
596607
let decryptor = &mut conduit.decryptor;
597608

598-
macro_rules! try_potential_handleerror {
599-
($thing: expr) => {
600-
match $thing {
601-
Ok(x) => x,
602-
Err(e) => {
603-
match e.action {
604-
msgs::ErrorAction::DisconnectPeer { msg: _ } => {
605-
//TODO: Try to push msg
606-
log_trace!(self.logger, "Got Err handling message, disconnecting peer because {}", e.err);
607-
return Err(PeerHandleError{ no_connection_possible: false });
608-
},
609-
msgs::ErrorAction::IgnoreError => {
610-
log_trace!(self.logger, "Got Err handling message, ignoring because {}", e.err);
611-
continue;
612-
},
613-
msgs::ErrorAction::SendErrorMessage { msg } => {
614-
log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err);
615-
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &msg);
616-
continue;
617-
},
618-
}
619-
}
620-
};
621-
}
622-
}
623-
624-
if send_init_message {
625-
let mut features = InitFeatures::known();
626-
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
627-
features.clear_initial_routing_sync();
628-
}
629-
630-
let resp = msgs::Init { features };
631-
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
632-
send_init_message = false
633-
}
634-
635609
for msg_data in decryptor {
636610
let mut reader = ::std::io::Cursor::new(&msg_data[..]);
637611
let message_result = wire::read(&mut reader);
@@ -658,16 +632,43 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
658632
}
659633
};
660634

661-
if let Err(handling_error) = self.handle_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), message){
662-
match handling_error {
663-
MessageHandlingError::PeerHandleError(e) => { return Err(e) },
664-
MessageHandlingError::LightningError(e) => {
665-
try_potential_handleerror!(Err(e));
666-
},
667-
}
635+
received_messages.push(message);
636+
}
637+
}
638+
639+
for message in received_messages {
640+
macro_rules! try_potential_handleerror {
641+
($thing: expr) => {
642+
match $thing {
643+
Ok(x) => x,
644+
Err(e) => {
645+
match e.action {
646+
msgs::ErrorAction::DisconnectPeer { msg: _ } => {
647+
//TODO: Try to push msg
648+
log_trace!(self.logger, "Got Err handling message, disconnecting peer because {}", e.err);
649+
return Err(PeerHandleError{ no_connection_possible: false });
650+
},
651+
msgs::ErrorAction::IgnoreError => {
652+
log_trace!(self.logger, "Got Err handling message, ignoring because {}", e.err);
653+
continue;
654+
},
655+
msgs::ErrorAction::SendErrorMessage { msg } => {
656+
log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err);
657+
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &msg);
658+
continue;
659+
},
668660
}
669661
}
670-
}
662+
};
663+
}
664+
}
665+
666+
if let Err(handling_error) = self.handle_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), message){
667+
match handling_error {
668+
MessageHandlingError::PeerHandleError(e) => { return Err(e) },
669+
MessageHandlingError::LightningError(e) => {
670+
try_potential_handleerror!(Err(e));
671+
},
671672
}
672673
}
673674
}
@@ -1267,7 +1268,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
12671268

12681269
#[cfg(test)]
12691270
mod tests {
1270-
use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
1271+
use ln::peers::handler::{PeerManager, MessageHandler, SocketDescriptor};
12711272
use ln::msgs;
12721273
use util::events;
12731274
use util::test_utils;

0 commit comments

Comments
 (0)