@@ -51,6 +51,9 @@ pub(super) trait ITransport {
51
51
/// Returns true if the connection is established and encrypted messages can be sent.
52
52
fn is_connected ( & self ) -> bool ;
53
53
54
+ /// Returns the node_id of the remote node. Panics if not connected.
55
+ fn get_their_node_id ( & self ) -> PublicKey ;
56
+
54
57
/// Returns all Messages that have been received and can be parsed by the Transport
55
58
fn drain_messages < L : Deref > ( & mut self , logger : L ) -> Result < Vec < Message > , PeerHandleError > where L :: Target : Logger ;
56
59
@@ -184,7 +187,6 @@ enum InitSyncTracker{
184
187
struct Peer {
185
188
transport : Transport ,
186
189
outbound : bool ,
187
- their_node_id : Option < PublicKey > ,
188
190
their_features : Option < InitFeatures > ,
189
191
190
192
pending_outbound_buffer : OutboundQueue ,
@@ -330,7 +332,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
330
332
if !p. transport . is_connected ( ) || p. their_features . is_none ( ) {
331
333
return None ;
332
334
}
333
- p . their_node_id
335
+ Some ( p . transport . get_their_node_id ( ) )
334
336
} ) . collect ( )
335
337
}
336
338
@@ -363,7 +365,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
363
365
if peers. peers . insert ( descriptor, Peer {
364
366
transport,
365
367
outbound : true ,
366
- their_node_id : Some ( their_node_id. clone ( ) ) ,
367
368
their_features : None ,
368
369
369
370
pending_outbound_buffer : OutboundQueue :: new ( MSG_BUFF_SIZE ) ,
@@ -391,7 +392,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
391
392
if peers. peers . insert ( descriptor, Peer {
392
393
transport : Transport :: new_inbound ( & self . our_node_secret , & self . get_ephemeral_key ( ) ) ,
393
394
outbound : false ,
394
- their_node_id : None ,
395
395
their_features : None ,
396
396
397
397
pending_outbound_buffer : OutboundQueue :: new ( MSG_BUFF_SIZE ) ,
@@ -530,22 +530,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
530
530
531
531
// If the transport is newly connected, do the appropriate set up for the connection
532
532
if peer. transport . is_connected ( ) {
533
- let their_node_id = peer. transport . their_node_id . unwrap ( ) ;
533
+ let their_node_id = peer. transport . get_their_node_id ( ) ;
534
534
535
535
match peers. node_id_to_descriptor . entry ( their_node_id. clone ( ) ) {
536
536
hash_map:: Entry :: Occupied ( entry) => {
537
537
if entry. get ( ) != peer_descriptor {
538
538
// Existing entry in map is from a different descriptor, this is a duplicate
539
539
log_trace ! ( self . logger, "Got second connection with {}, closing" , log_pubkey!( & their_node_id) ) ;
540
- peer. their_node_id = None ;
541
540
return Err ( PeerHandleError { no_connection_possible : false } ) ;
542
541
} else {
543
542
// read_event for existing peer
544
543
}
545
544
} ,
546
545
hash_map:: Entry :: Vacant ( entry) => {
547
546
log_trace ! ( self . logger, "Finished noise handshake for connection with {}" , log_pubkey!( & their_node_id) ) ;
548
- peer. their_node_id = Some ( their_node_id. clone ( ) ) ;
549
547
550
548
if peer. outbound {
551
549
let mut features = InitFeatures :: known ( ) ;
@@ -616,12 +614,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
616
614
617
615
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
618
616
fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire:: Message ) -> Result < ( ) , MessageHandlingError > {
619
- log_trace ! ( self . logger, "Received message of type {} from {}" , message. type_id( ) , log_pubkey!( peer. their_node_id . unwrap ( ) ) ) ;
617
+ log_trace ! ( self . logger, "Received message of type {} from {}" , message. type_id( ) , log_pubkey!( peer. transport . get_their_node_id ( ) ) ) ;
620
618
621
619
// Need an Init as first message
622
620
if let wire:: Message :: Init ( _) = message {
623
621
} else if peer. their_features . is_none ( ) {
624
- log_trace ! ( self . logger, "Peer {} sent non-Init first message" , log_pubkey!( peer. their_node_id . unwrap ( ) ) ) ;
622
+ log_trace ! ( self . logger, "Peer {} sent non-Init first message" , log_pubkey!( peer. transport . get_their_node_id ( ) ) ) ;
625
623
return Err ( PeerHandleError { no_connection_possible : false } . into ( ) ) ;
626
624
}
627
625
@@ -654,21 +652,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
654
652
peers_needing_send. insert ( peer_descriptor. clone ( ) ) ;
655
653
}
656
654
if !msg. features . supports_static_remote_key ( ) {
657
- log_debug ! ( self . logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible" , log_pubkey!( peer. their_node_id . unwrap ( ) ) ) ;
655
+ log_debug ! ( self . logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible" , log_pubkey!( peer. transport . get_their_node_id ( ) ) ) ;
658
656
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
659
657
}
660
658
661
659
if !peer. outbound {
662
660
let mut features = InitFeatures :: known ( ) ;
663
- if !self . message_handler . route_handler . should_request_full_sync ( & peer. their_node_id . unwrap ( ) ) {
661
+ if !self . message_handler . route_handler . should_request_full_sync ( & peer. transport . get_their_node_id ( ) ) {
664
662
features. clear_initial_routing_sync ( ) ;
665
663
}
666
664
667
665
let resp = msgs:: Init { features } ;
668
666
self . enqueue_message ( peers_needing_send, & mut peer. transport , & mut peer. pending_outbound_buffer , & peer_descriptor, & resp) ;
669
667
}
670
668
671
- self . message_handler . chan_handler . peer_connected ( & peer. their_node_id . unwrap ( ) , & msg) ;
669
+ self . message_handler . chan_handler . peer_connected ( & peer. transport . get_their_node_id ( ) , & msg) ;
672
670
peer. their_features = Some ( msg. features ) ;
673
671
} ,
674
672
wire:: Message :: Error ( msg) => {
@@ -681,11 +679,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
681
679
}
682
680
683
681
if data_is_printable {
684
- log_debug ! ( self . logger, "Got Err message from {}: {}" , log_pubkey!( peer. their_node_id . unwrap ( ) ) , msg. data) ;
682
+ log_debug ! ( self . logger, "Got Err message from {}: {}" , log_pubkey!( peer. transport . get_their_node_id ( ) ) , msg. data) ;
685
683
} else {
686
- log_debug ! ( self . logger, "Got Err message from {} with non-ASCII error message" , log_pubkey!( peer. their_node_id . unwrap ( ) ) ) ;
684
+ log_debug ! ( self . logger, "Got Err message from {} with non-ASCII error message" , log_pubkey!( peer. transport . get_their_node_id ( ) ) ) ;
687
685
}
688
- self . message_handler . chan_handler . handle_error ( & peer. their_node_id . unwrap ( ) , & msg) ;
686
+ self . message_handler . chan_handler . handle_error ( & peer. transport . get_their_node_id ( ) , & msg) ;
689
687
if msg. channel_id == [ 0 ; 32 ] {
690
688
return Err ( PeerHandleError { no_connection_possible : true } . into ( ) ) ;
691
689
}
@@ -703,59 +701,59 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
703
701
704
702
// Channel messages:
705
703
wire:: Message :: OpenChannel ( msg) => {
706
- self . message_handler . chan_handler . handle_open_channel ( & peer. their_node_id . unwrap ( ) , peer. their_features . clone ( ) . unwrap ( ) , & msg) ;
704
+ self . message_handler . chan_handler . handle_open_channel ( & peer. transport . get_their_node_id ( ) , peer. their_features . clone ( ) . unwrap ( ) , & msg) ;
707
705
} ,
708
706
wire:: Message :: AcceptChannel ( msg) => {
709
- self . message_handler . chan_handler . handle_accept_channel ( & peer. their_node_id . unwrap ( ) , peer. their_features . clone ( ) . unwrap ( ) , & msg) ;
707
+ self . message_handler . chan_handler . handle_accept_channel ( & peer. transport . get_their_node_id ( ) , peer. their_features . clone ( ) . unwrap ( ) , & msg) ;
710
708
} ,
711
709
712
710
wire:: Message :: FundingCreated ( msg) => {
713
- self . message_handler . chan_handler . handle_funding_created ( & peer. their_node_id . unwrap ( ) , & msg) ;
711
+ self . message_handler . chan_handler . handle_funding_created ( & peer. transport . get_their_node_id ( ) , & msg) ;
714
712
} ,
715
713
wire:: Message :: FundingSigned ( msg) => {
716
- self . message_handler . chan_handler . handle_funding_signed ( & peer. their_node_id . unwrap ( ) , & msg) ;
714
+ self . message_handler . chan_handler . handle_funding_signed ( & peer. transport . get_their_node_id ( ) , & msg) ;
717
715
} ,
718
716
wire:: Message :: FundingLocked ( msg) => {
719
- self . message_handler . chan_handler . handle_funding_locked ( & peer. their_node_id . unwrap ( ) , & msg) ;
717
+ self . message_handler . chan_handler . handle_funding_locked ( & peer. transport . get_their_node_id ( ) , & msg) ;
720
718
} ,
721
719
722
720
wire:: Message :: Shutdown ( msg) => {
723
- self . message_handler . chan_handler . handle_shutdown ( & peer. their_node_id . unwrap ( ) , & msg) ;
721
+ self . message_handler . chan_handler . handle_shutdown ( & peer. transport . get_their_node_id ( ) , & msg) ;
724
722
} ,
725
723
wire:: Message :: ClosingSigned ( msg) => {
726
- self . message_handler . chan_handler . handle_closing_signed ( & peer. their_node_id . unwrap ( ) , & msg) ;
724
+ self . message_handler . chan_handler . handle_closing_signed ( & peer. transport . get_their_node_id ( ) , & msg) ;
727
725
} ,
728
726
729
727
// Commitment messages:
730
728
wire:: Message :: UpdateAddHTLC ( msg) => {
731
- self . message_handler . chan_handler . handle_update_add_htlc ( & peer. their_node_id . unwrap ( ) , & msg) ;
729
+ self . message_handler . chan_handler . handle_update_add_htlc ( & peer. transport . get_their_node_id ( ) , & msg) ;
732
730
} ,
733
731
wire:: Message :: UpdateFulfillHTLC ( msg) => {
734
- self . message_handler . chan_handler . handle_update_fulfill_htlc ( & peer. their_node_id . unwrap ( ) , & msg) ;
732
+ self . message_handler . chan_handler . handle_update_fulfill_htlc ( & peer. transport . get_their_node_id ( ) , & msg) ;
735
733
} ,
736
734
wire:: Message :: UpdateFailHTLC ( msg) => {
737
- self . message_handler . chan_handler . handle_update_fail_htlc ( & peer. their_node_id . unwrap ( ) , & msg) ;
735
+ self . message_handler . chan_handler . handle_update_fail_htlc ( & peer. transport . get_their_node_id ( ) , & msg) ;
738
736
} ,
739
737
wire:: Message :: UpdateFailMalformedHTLC ( msg) => {
740
- self . message_handler . chan_handler . handle_update_fail_malformed_htlc ( & peer. their_node_id . unwrap ( ) , & msg) ;
738
+ self . message_handler . chan_handler . handle_update_fail_malformed_htlc ( & peer. transport . get_their_node_id ( ) , & msg) ;
741
739
} ,
742
740
743
741
wire:: Message :: CommitmentSigned ( msg) => {
744
- self . message_handler . chan_handler . handle_commitment_signed ( & peer. their_node_id . unwrap ( ) , & msg) ;
742
+ self . message_handler . chan_handler . handle_commitment_signed ( & peer. transport . get_their_node_id ( ) , & msg) ;
745
743
} ,
746
744
wire:: Message :: RevokeAndACK ( msg) => {
747
- self . message_handler . chan_handler . handle_revoke_and_ack ( & peer. their_node_id . unwrap ( ) , & msg) ;
745
+ self . message_handler . chan_handler . handle_revoke_and_ack ( & peer. transport . get_their_node_id ( ) , & msg) ;
748
746
} ,
749
747
wire:: Message :: UpdateFee ( msg) => {
750
- self . message_handler . chan_handler . handle_update_fee ( & peer. their_node_id . unwrap ( ) , & msg) ;
748
+ self . message_handler . chan_handler . handle_update_fee ( & peer. transport . get_their_node_id ( ) , & msg) ;
751
749
} ,
752
750
wire:: Message :: ChannelReestablish ( msg) => {
753
- self . message_handler . chan_handler . handle_channel_reestablish ( & peer. their_node_id . unwrap ( ) , & msg) ;
751
+ self . message_handler . chan_handler . handle_channel_reestablish ( & peer. transport . get_their_node_id ( ) , & msg) ;
754
752
} ,
755
753
756
754
// Routing messages:
757
755
wire:: Message :: AnnouncementSignatures ( msg) => {
758
- self . message_handler . chan_handler . handle_announcement_signatures ( & peer. their_node_id . unwrap ( ) , & msg) ;
756
+ self . message_handler . chan_handler . handle_announcement_signatures ( & peer. transport . get_their_node_id ( ) , & msg) ;
759
757
} ,
760
758
wire:: Message :: ChannelAnnouncement ( msg) => {
761
759
let should_forward = match self . message_handler . route_handler . handle_channel_announcement ( & msg) {
@@ -1000,13 +998,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1000
998
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1001
999
continue
1002
1000
}
1003
- match peer. their_node_id {
1004
- None => continue ,
1005
- Some ( their_node_id) => {
1006
- if their_node_id == msg. contents . node_id_1 || their_node_id == msg. contents . node_id_2 {
1007
- continue
1008
- }
1009
- }
1001
+
1002
+ let their_node_id = peer. transport . get_their_node_id ( ) ;
1003
+ if their_node_id == msg. contents . node_id_1 || their_node_id == msg. contents . node_id_2 {
1004
+ continue
1010
1005
}
1011
1006
if peer. transport . is_connected ( ) {
1012
1007
peer. transport . enqueue_message ( msg, & mut peer. pending_outbound_buffer , & * self . logger ) ;
@@ -1119,12 +1114,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1119
1114
match peer_option {
1120
1115
None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
1121
1116
Some ( peer) => {
1122
- match peer. their_node_id {
1123
- Some ( node_id) => {
1117
+ if peer. transport . is_connected ( ) {
1118
+ let node_id = peer. transport . get_their_node_id ( ) ;
1119
+
1120
+ if peers. node_id_to_descriptor . get ( & node_id) . unwrap ( ) == descriptor {
1124
1121
peers. node_id_to_descriptor . remove ( & node_id) ;
1125
1122
self . message_handler . chan_handler . peer_disconnected ( & node_id, no_connection_possible) ;
1126
- } ,
1127
- None => { }
1123
+ } else {
1124
+ // This must have been generated from a duplicate connection error
1125
+ }
1126
+ } else {
1127
+ // Unconnected nodes never make it into node_id_to_descriptor
1128
1128
}
1129
1129
}
1130
1130
} ;
@@ -1147,18 +1147,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1147
1147
if peer. awaiting_pong {
1148
1148
peers_needing_send. remove ( descriptor) ;
1149
1149
descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1150
- match peer. their_node_id {
1151
- Some ( node_id) => {
1152
- log_trace ! ( self . logger, "Disconnecting peer with id {} due to ping timeout" , node_id) ;
1153
- node_id_to_descriptor. remove ( & node_id) ;
1154
- self . message_handler . chan_handler . peer_disconnected ( & node_id, false ) ;
1155
- }
1156
- None => {
1157
- // This can't actually happen as we should have hit
1158
- // is_connected() previously on this same peer.
1159
- unreachable ! ( ) ;
1160
- } ,
1161
- }
1150
+ let their_node_id = peer. transport . get_their_node_id ( ) ;
1151
+ log_trace ! ( self . logger, "Disconnecting peer with id {} due to ping timeout" , their_node_id) ;
1152
+ node_id_to_descriptor. remove ( & their_node_id) ;
1153
+ self . message_handler . chan_handler . peer_disconnected ( & their_node_id, false ) ;
1154
+
1162
1155
return false ;
1163
1156
}
1164
1157
0 commit comments