@@ -658,6 +658,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
658
658
let pause_read = {
659
659
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
660
660
let peers = & mut * peers_lock;
661
+ let mut msgs_to_forward = Vec :: new ( ) ;
662
+ let mut peer_node_id = None ;
661
663
let pause_read = match peers. peers . get_mut ( peer_descriptor) {
662
664
None => panic ! ( "Descriptor for read_event is not already known to PeerManager" ) ,
663
665
Some ( peer) => {
@@ -793,13 +795,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
793
795
}
794
796
} ;
795
797
796
- if let Err ( handling_error ) = self . handle_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , message) {
797
- match handling_error {
798
+ match self . handle_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , message) {
799
+ Err ( handling_error ) => match handling_error {
798
800
MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
799
801
MessageHandlingError :: LightningError ( e) => {
800
802
try_potential_handleerror ! ( Err ( e) ) ;
801
803
} ,
802
- }
804
+ } ,
805
+ Ok ( Some ( msg) ) => {
806
+ peer_node_id = Some ( peer. their_node_id . expect ( "After noise is complete, their_node_id is always set" ) ) ;
807
+ msgs_to_forward. push ( msg) ;
808
+ } ,
809
+ Ok ( None ) => { } ,
803
810
}
804
811
}
805
812
}
@@ -811,14 +818,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
811
818
}
812
819
} ;
813
820
821
+ for msg in msgs_to_forward. drain ( ..) {
822
+ self . forward_broadcast_msg ( peers, & msg, peer_node_id. as_ref ( ) ) ;
823
+ }
824
+
814
825
pause_read
815
826
} ;
816
827
817
828
Ok ( pause_read)
818
829
}
819
830
820
831
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
821
- fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire:: Message ) -> Result < ( ) , MessageHandlingError > {
832
+ /// Returns the message back if it needs to be broadcasted to all other peers.
833
+ fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire:: Message ) -> Result < Option < wire:: Message > , MessageHandlingError > {
822
834
log_trace ! ( self . logger, "Received message of type {} from {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
823
835
824
836
// Need an Init as first message
@@ -828,6 +840,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
828
840
return Err ( PeerHandleError { no_connection_possible : false } . into ( ) ) ;
829
841
}
830
842
843
+ let mut should_forward = None ;
844
+
831
845
match message {
832
846
// Setup and Control messages:
833
847
wire:: Message :: Init ( msg) => {
@@ -950,34 +964,28 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
950
964
self . message_handler . chan_handler . handle_announcement_signatures ( & peer. their_node_id . unwrap ( ) , & msg) ;
951
965
} ,
952
966
wire:: Message :: ChannelAnnouncement ( msg) => {
953
- let should_forward = match self . message_handler . route_handler . handle_channel_announcement ( & msg) {
967
+ if match self . message_handler . route_handler . handle_channel_announcement ( & msg) {
954
968
Ok ( v) => v,
955
969
Err ( e) => { return Err ( e. into ( ) ) ; } ,
956
- } ;
957
-
958
- if should_forward {
959
- // TODO: forward msg along to all our other peers!
970
+ } {
971
+ should_forward = Some ( wire:: Message :: ChannelAnnouncement ( msg) ) ;
960
972
}
961
973
} ,
962
974
wire:: Message :: NodeAnnouncement ( msg) => {
963
- let should_forward = match self . message_handler . route_handler . handle_node_announcement ( & msg) {
975
+ if match self . message_handler . route_handler . handle_node_announcement ( & msg) {
964
976
Ok ( v) => v,
965
977
Err ( e) => { return Err ( e. into ( ) ) ; } ,
966
- } ;
967
-
968
- if should_forward {
969
- // TODO: forward msg along to all our other peers!
978
+ } {
979
+ should_forward = Some ( wire:: Message :: NodeAnnouncement ( msg) ) ;
970
980
}
971
981
} ,
972
982
wire:: Message :: ChannelUpdate ( msg) => {
973
983
self . message_handler . chan_handler . handle_channel_update ( & peer. their_node_id . unwrap ( ) , & msg) ;
974
- let should_forward = match self . message_handler . route_handler . handle_channel_update ( & msg) {
984
+ if match self . message_handler . route_handler . handle_channel_update ( & msg) {
975
985
Ok ( v) => v,
976
986
Err ( e) => { return Err ( e. into ( ) ) ; } ,
977
- } ;
978
-
979
- if should_forward {
980
- // TODO: forward msg along to all our other peers!
987
+ } {
988
+ should_forward = Some ( wire:: Message :: ChannelUpdate ( msg) ) ;
981
989
}
982
990
} ,
983
991
wire:: Message :: QueryShortChannelIds ( msg) => {
@@ -1006,7 +1014,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1006
1014
log_trace ! ( self . logger, "Received unknown odd message of type {}, ignoring" , msg_type) ;
1007
1015
}
1008
1016
} ;
1009
- Ok ( ( ) )
1017
+ Ok ( should_forward )
1010
1018
}
1011
1019
1012
1020
fn forward_broadcast_msg ( & self , peers : & mut PeerHolder < Descriptor > , msg : & wire:: Message , except_node : Option < & PublicKey > ) {
0 commit comments