@@ -151,13 +151,20 @@ impl Peer {
151
151
}
152
152
}
153
153
154
+ enum AnnouncementMsg {
155
+ ChanUpdate ( msgs:: ChannelUpdate ) ,
156
+ ChanAnnounce ( msgs:: ChannelAnnouncement ) ,
157
+ NodeAnnounce ( msgs:: NodeAnnouncement ) ,
158
+ }
159
+
154
160
struct PeerHolder < Descriptor : SocketDescriptor > {
155
161
peers : HashMap < Descriptor , Peer > ,
156
162
/// Added to by do_read_event for cases where we pushed a message onto the send buffer but
157
163
/// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
158
164
peers_needing_send : HashSet < Descriptor > ,
159
165
/// Only add to this set when noise completes:
160
166
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
167
+ pending_broadcasts : Vec < ( PublicKey , AnnouncementMsg ) > ,
161
168
}
162
169
163
170
#[ cfg( not( any( target_pointer_width = "32" , target_pointer_width = "64" ) ) ) ]
@@ -226,7 +233,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
226
233
peers : Mutex :: new ( PeerHolder {
227
234
peers : HashMap :: new ( ) ,
228
235
peers_needing_send : HashSet :: new ( ) ,
229
- node_id_to_descriptor : HashMap :: new ( )
236
+ node_id_to_descriptor : HashMap :: new ( ) ,
237
+ pending_broadcasts : Vec :: new ( ) ,
230
238
} ) ,
231
239
our_node_secret,
232
240
ephemeral_key_midstate,
@@ -750,21 +758,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
750
758
let should_forward = try_potential_handleerror ! ( self . message_handler. route_handler. handle_channel_announcement( & msg) ) ;
751
759
752
760
if should_forward {
753
- // TODO: forward msg along to all our other peers!
761
+ peers . pending_broadcasts . push ( ( peer . their_node_id . unwrap ( ) . clone ( ) , AnnouncementMsg :: ChanAnnounce ( msg) ) ) ;
754
762
}
755
763
} ,
756
764
wire:: Message :: NodeAnnouncement ( msg) => {
757
765
let should_forward = try_potential_handleerror ! ( self . message_handler. route_handler. handle_node_announcement( & msg) ) ;
758
766
759
767
if should_forward {
760
- // TODO: forward msg along to all our other peers!
768
+ peers . pending_broadcasts . push ( ( peer . their_node_id . unwrap ( ) . clone ( ) , AnnouncementMsg :: NodeAnnounce ( msg) ) ) ;
761
769
}
762
770
} ,
763
771
wire:: Message :: ChannelUpdate ( msg) => {
764
772
let should_forward = try_potential_handleerror ! ( self . message_handler. route_handler. handle_channel_update( & msg) ) ;
765
773
766
774
if should_forward {
767
- // TODO: forward msg along to all our other peers!
775
+ peers . pending_broadcasts . push ( ( peer . their_node_id . unwrap ( ) . clone ( ) , AnnouncementMsg :: ChanUpdate ( msg) ) ) ;
768
776
}
769
777
} ,
770
778
@@ -808,6 +816,54 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
808
816
let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
809
817
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
810
818
let peers = & mut * peers_lock;
819
+
820
+ macro_rules! broadcast_msgs {
821
+ ( { $( $except_check: stmt) , * } , { $( $encoded_msg: expr) , * } ) => { {
822
+ for ( ref descriptor, ref mut peer) in peers. peers. iter_mut( ) {
823
+ if !peer. channel_encryptor. is_ready_for_encryption( ) || peer. their_features. is_none( ) {
824
+ continue
825
+ }
826
+ match peer. their_node_id {
827
+ None => continue ,
828
+ Some ( their_node_id) => {
829
+ $(
830
+ if { $except_check } ( & peer, their_node_id) { continue }
831
+ ) *
832
+ }
833
+ }
834
+ $( peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & $encoded_msg) ) ; ) *
835
+ self . do_attempt_write_data( & mut ( * descriptor) . clone( ) , peer) ;
836
+ }
837
+ } }
838
+ }
839
+
840
+ for ( from_node_id, broadcast) in peers. pending_broadcasts . drain ( ..) {
841
+ match broadcast {
842
+ AnnouncementMsg :: ChanUpdate ( msg) => {
843
+ let encoded_msg = encode_msg ! ( & msg) ;
844
+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_channel_announcement( msg. contents. short_channel_id) ,
845
+ |_, their_node_id| their_node_id == from_node_id } ,
846
+ { encoded_msg } ) ;
847
+ } ,
848
+ AnnouncementMsg :: ChanAnnounce ( msg) => {
849
+ let encoded_msg = encode_msg ! ( & msg) ;
850
+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_channel_announcement( msg. contents. short_channel_id) ,
851
+ |_, their_node_id| their_node_id == msg. contents. node_id_1,
852
+ |_, their_node_id| their_node_id == msg. contents. node_id_2,
853
+ |_, their_node_id| their_node_id == from_node_id } ,
854
+ { encoded_msg } ) ;
855
+ } ,
856
+ AnnouncementMsg :: NodeAnnounce ( msg) => {
857
+ let encoded_msg = encode_msg ! ( & msg) ;
858
+
859
+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_node_announcement( msg. contents. node_id) ,
860
+ |_, their_node_id| their_node_id == msg. contents. node_id,
861
+ |_, their_node_id| their_node_id == from_node_id } ,
862
+ { encoded_msg } ) ;
863
+ }
864
+ }
865
+ }
866
+
811
867
for event in events_generated. drain ( ..) {
812
868
macro_rules! get_peer_for_forwarding {
813
869
( $node_id: expr, $handle_no_such_peer: block) => {
@@ -970,54 +1026,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
970
1026
if self . message_handler . route_handler . handle_channel_announcement ( msg) . is_ok ( ) && self . message_handler . route_handler . handle_channel_update ( update_msg) . is_ok ( ) {
971
1027
let encoded_msg = encode_msg ! ( msg) ;
972
1028
let encoded_update_msg = encode_msg ! ( update_msg) ;
973
-
974
- for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
975
- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
976
- !peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
977
- continue
978
- }
979
- match peer. their_node_id {
980
- None => continue ,
981
- Some ( their_node_id) => {
982
- if their_node_id == msg. contents . node_id_1 || their_node_id == msg. contents . node_id_2 {
983
- continue
984
- }
985
- }
986
- }
987
- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
988
- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_update_msg[ ..] ) ) ;
989
- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
990
- }
1029
+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_channel_announcement( msg. contents. short_channel_id) ,
1030
+ |_, their_node_id| their_node_id == msg. contents. node_id_1,
1031
+ |_, their_node_id| their_node_id == msg. contents. node_id_2 } ,
1032
+ { encoded_msg, encoded_update_msg } ) ;
991
1033
}
992
1034
} ,
993
1035
MessageSendEvent :: BroadcastNodeAnnouncement { ref msg } => {
994
1036
log_trace ! ( self . logger, "Handling BroadcastNodeAnnouncement event in peer_handler" ) ;
995
1037
if self . message_handler . route_handler . handle_node_announcement ( msg) . is_ok ( ) {
996
1038
let encoded_msg = encode_msg ! ( msg) ;
997
1039
998
- for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
999
- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1000
- !peer. should_forward_node_announcement ( msg. contents . node_id ) {
1001
- continue
1002
- }
1003
- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1004
- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
1005
- }
1040
+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_node_announcement( msg. contents. node_id) ,
1041
+ |_, their_node_id| their_node_id == msg. contents. node_id } ,
1042
+ { encoded_msg } ) ;
1006
1043
}
1007
1044
} ,
1008
1045
MessageSendEvent :: BroadcastChannelUpdate { ref msg } => {
1009
1046
log_trace ! ( self . logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
1010
1047
if self . message_handler . route_handler . handle_channel_update ( msg) . is_ok ( ) {
1011
1048
let encoded_msg = encode_msg ! ( msg) ;
1012
1049
1013
- for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
1014
- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1015
- !peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1016
- continue
1017
- }
1018
- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1019
- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
1020
- }
1050
+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_channel_announcement( msg. contents. short_channel_id) } ,
1051
+ { encoded_msg } ) ;
1021
1052
}
1022
1053
} ,
1023
1054
MessageSendEvent :: PaymentFailureNetworkUpdate { ref update } => {
0 commit comments