@@ -544,7 +544,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
544
544
545
545
log_trace ! ( self . logger, "Enqueueing message of type {} to {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
546
546
match peer. encryptor {
547
- PeerState :: Connected ( ref mut conduit) => peer. pending_outbound_buffer . push_back ( conduit. encrypt ( & encode_msg ! ( $msg ) [ ..] ) ) ,
547
+ PeerState :: Connected ( ref mut conduit) => peer. pending_outbound_buffer . push_back ( conduit. encrypt ( & encoded_message [ ..] ) ) ,
548
548
_ => panic ! ( "peer must be connected!" )
549
549
}
550
550
peers_needing_send. insert ( descriptor) ;
@@ -563,7 +563,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
563
563
let data_processing_decision = peer. encryptor . process_peer_data ( data, & mut peer. pending_outbound_buffer ) ;
564
564
match data_processing_decision {
565
565
PeerDataProcessingDecision :: Disconnect ( e) => {
566
- log_trace ! ( self , "Invalid act message; disconnecting: {}" , e) ;
566
+ log_trace ! ( self . logger , "Invalid act message; disconnecting: {}" , e) ;
567
567
return Err ( e) ;
568
568
}
569
569
@@ -577,61 +577,35 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
577
577
// insert node id
578
578
match peers. node_id_to_descriptor . entry ( peer. their_node_id . unwrap ( ) ) {
579
579
hash_map:: Entry :: Occupied ( _) => {
580
- log_trace ! ( self , "Got second connection with {}, closing" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
580
+ log_trace ! ( self . logger , "Got second connection with {}, closing" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
581
581
peer. their_node_id = None ; // Unset so that we don't generate a peer_disconnected event
582
582
return Err ( PeerHandleError { no_connection_possible : false } ) ;
583
583
}
584
584
hash_map:: Entry :: Vacant ( entry) => {
585
- log_trace ! ( self , "Finished noise handshake for connection with {}" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
585
+ log_trace ! ( self . logger , "Finished noise handshake for connection with {}" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
586
586
entry. insert ( peer_descriptor. clone ( ) )
587
587
}
588
588
} ;
589
589
}
590
590
_ => { }
591
591
} ;
592
592
593
- if let & mut PeerState :: Connected ( ref mut conduit) = & mut peer. encryptor {
593
+ if send_init_message {
594
+ let mut features = InitFeatures :: known ( ) ;
595
+ if !self . message_handler . route_handler . should_request_full_sync ( & peer. their_node_id . unwrap ( ) ) {
596
+ features. clear_initial_routing_sync ( ) ;
597
+ }
598
+
599
+ let resp = msgs:: Init { features } ;
600
+ self . enqueue_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , & resp) ;
601
+ send_init_message = false
602
+ }
594
603
604
+ let mut received_messages = vec ! [ ] ;
605
+ if let & mut PeerState :: Connected ( ref mut conduit) = & mut peer. encryptor {
595
606
let encryptor = & mut conduit. encryptor ;
596
607
let decryptor = & mut conduit. decryptor ;
597
608
598
- macro_rules! try_potential_handleerror {
599
- ( $thing: expr) => {
600
- match $thing {
601
- Ok ( x) => x,
602
- Err ( e) => {
603
- match e. action {
604
- msgs:: ErrorAction :: DisconnectPeer { msg: _ } => {
605
- //TODO: Try to push msg
606
- log_trace!( self . logger, "Got Err handling message, disconnecting peer because {}" , e. err) ;
607
- return Err ( PeerHandleError { no_connection_possible: false } ) ;
608
- } ,
609
- msgs:: ErrorAction :: IgnoreError => {
610
- log_trace!( self . logger, "Got Err handling message, ignoring because {}" , e. err) ;
611
- continue ;
612
- } ,
613
- msgs:: ErrorAction :: SendErrorMessage { msg } => {
614
- log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
615
- self . enqueue_message( & mut peers. peers_needing_send, peer, peer_descriptor. clone( ) , & msg) ;
616
- continue ;
617
- } ,
618
- }
619
- }
620
- } ;
621
- }
622
- }
623
-
624
- if send_init_message {
625
- let mut features = InitFeatures :: known ( ) ;
626
- if !self . message_handler . route_handler . should_request_full_sync ( & peer. their_node_id . unwrap ( ) ) {
627
- features. clear_initial_routing_sync ( ) ;
628
- }
629
-
630
- let resp = msgs:: Init { features } ;
631
- self . enqueue_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , & resp) ;
632
- send_init_message = false
633
- }
634
-
635
609
for msg_data in decryptor {
636
610
let mut reader = :: std:: io:: Cursor :: new ( & msg_data[ ..] ) ;
637
611
let message_result = wire:: read ( & mut reader) ;
@@ -658,16 +632,43 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
658
632
}
659
633
} ;
660
634
661
- if let Err ( handling_error) = self . handle_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , message) {
662
- match handling_error {
663
- MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
664
- MessageHandlingError :: LightningError ( e) => {
665
- try_potential_handleerror ! ( Err ( e) ) ;
666
- } ,
667
- }
635
+ received_messages. push ( message) ;
636
+ }
637
+ }
638
+
639
+ for message in received_messages {
640
+ macro_rules! try_potential_handleerror {
641
+ ( $thing: expr) => {
642
+ match $thing {
643
+ Ok ( x) => x,
644
+ Err ( e) => {
645
+ match e. action {
646
+ msgs:: ErrorAction :: DisconnectPeer { msg: _ } => {
647
+ //TODO: Try to push msg
648
+ log_trace!( self . logger, "Got Err handling message, disconnecting peer because {}" , e. err) ;
649
+ return Err ( PeerHandleError { no_connection_possible: false } ) ;
650
+ } ,
651
+ msgs:: ErrorAction :: IgnoreError => {
652
+ log_trace!( self . logger, "Got Err handling message, ignoring because {}" , e. err) ;
653
+ continue ;
654
+ } ,
655
+ msgs:: ErrorAction :: SendErrorMessage { msg } => {
656
+ log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
657
+ self . enqueue_message( & mut peers. peers_needing_send, peer, peer_descriptor. clone( ) , & msg) ;
658
+ continue ;
659
+ } ,
668
660
}
669
661
}
670
- }
662
+ } ;
663
+ }
664
+ }
665
+
666
+ if let Err ( handling_error) = self . handle_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , message) {
667
+ match handling_error {
668
+ MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
669
+ MessageHandlingError :: LightningError ( e) => {
670
+ try_potential_handleerror ! ( Err ( e) ) ;
671
+ } ,
671
672
}
672
673
}
673
674
}
@@ -1267,7 +1268,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1267
1268
1268
1269
#[ cfg( test) ]
1269
1270
mod tests {
1270
- use ln:: peer_handler :: { PeerManager , MessageHandler , SocketDescriptor } ;
1271
+ use ln:: peers :: handler :: { PeerManager , MessageHandler , SocketDescriptor } ;
1271
1272
use ln:: msgs;
1272
1273
use util:: events;
1273
1274
use util:: test_utils;
0 commit comments