@@ -88,6 +88,12 @@ impl error::Error for PeerHandleError {
88
88
}
89
89
}
90
90
91
+ enum InitSyncTracker {
92
+ NoSyncRequested ,
93
+ ChannelsSyncing ( u64 ) ,
94
+ NodesSyncing ( PublicKey ) ,
95
+ }
96
+
91
97
struct Peer {
92
98
channel_encryptor : PeerChannelEncryptor ,
93
99
outbound : bool ,
@@ -102,6 +108,24 @@ struct Peer {
102
108
pending_read_buffer : Vec < u8 > ,
103
109
pending_read_buffer_pos : usize ,
104
110
pending_read_is_header : bool ,
111
+
112
+ sync_status : InitSyncTracker ,
113
+ }
114
+
115
+ impl Peer {
116
+ /// Returns true if the the channel announcements/updates for the given channel should be
117
+ /// forwarded to this peer.
118
+ /// If we are sending our routing table to this peer and we have not yet sent channel
119
+ /// announcements/updates for the given channel_id then we will send it when we get to that
120
+ /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already
121
+ /// sent the old versions, we should send the update, and so return true here.
122
+ fn should_forward_channel ( & self , channel_id : u64 ) ->bool {
123
+ match self . sync_status {
124
+ InitSyncTracker :: NoSyncRequested => true ,
125
+ InitSyncTracker :: ChannelsSyncing ( i) => i < channel_id,
126
+ InitSyncTracker :: NodesSyncing ( _) => true ,
127
+ }
128
+ }
105
129
}
106
130
107
131
struct PeerHolder < Descriptor : SocketDescriptor > {
@@ -221,6 +245,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
221
245
pending_read_buffer : pending_read_buffer,
222
246
pending_read_buffer_pos : 0 ,
223
247
pending_read_is_header : false ,
248
+
249
+ sync_status : InitSyncTracker :: NoSyncRequested ,
224
250
} ) . is_some ( ) {
225
251
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
226
252
} ;
@@ -255,21 +281,74 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
255
281
pending_read_buffer : pending_read_buffer,
256
282
pending_read_buffer_pos : 0 ,
257
283
pending_read_is_header : false ,
284
+
285
+ sync_status : InitSyncTracker :: NoSyncRequested ,
258
286
} ) . is_some ( ) {
259
287
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
260
288
} ;
261
289
Ok ( ( ) )
262
290
}
263
291
264
- fn do_attempt_write_data ( descriptor : & mut Descriptor , peer : & mut Peer ) {
292
+ fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
293
+ macro_rules! encode_and_send_msg {
294
+ ( $msg: expr, $msg_code: expr) => {
295
+ {
296
+ log_trace!( self , "Encoding and sending sync update message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
297
+ peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
298
+ }
299
+ }
300
+ }
301
+ const MSG_BUFF_SIZE : usize = 10 ;
265
302
while !peer. awaiting_write_event {
303
+ if peer. pending_outbound_buffer . len ( ) < MSG_BUFF_SIZE {
304
+ match peer. sync_status {
305
+ InitSyncTracker :: NoSyncRequested => { } ,
306
+ InitSyncTracker :: ChannelsSyncing ( c) if c < 0xffff_ffff_ffff_ffff => {
307
+ let steps = ( ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) + 2 ) / 3 ) as u8 ;
308
+ let all_messages = self . message_handler . route_handler . get_next_channel_announcements ( 0 , steps) ;
309
+ for & ( ref announce, ref update_a, ref update_b) in all_messages. iter ( ) {
310
+ encode_and_send_msg ! ( announce, 256 ) ;
311
+ encode_and_send_msg ! ( update_a, 258 ) ;
312
+ encode_and_send_msg ! ( update_b, 258 ) ;
313
+ peer. sync_status = InitSyncTracker :: ChannelsSyncing ( announce. contents . short_channel_id + 1 ) ;
314
+ }
315
+ if all_messages. is_empty ( ) || all_messages. len ( ) != steps as usize {
316
+ peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0xffff_ffff_ffff_ffff ) ;
317
+ }
318
+ } ,
319
+ InitSyncTracker :: ChannelsSyncing ( c) if c == 0xffff_ffff_ffff_ffff => {
320
+ let steps = ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
321
+ let all_messages = self . message_handler . route_handler . get_next_node_announcements ( None , steps) ;
322
+ for msg in all_messages. iter ( ) {
323
+ encode_and_send_msg ! ( msg, 256 ) ;
324
+ peer. sync_status = InitSyncTracker :: NodesSyncing ( msg. contents . node_id ) ;
325
+ }
326
+ if all_messages. is_empty ( ) || all_messages. len ( ) != steps as usize {
327
+ peer. sync_status = InitSyncTracker :: NoSyncRequested ;
328
+ }
329
+ } ,
330
+ InitSyncTracker :: ChannelsSyncing ( _) => unreachable ! ( ) ,
331
+ InitSyncTracker :: NodesSyncing ( key) => {
332
+ let steps = ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
333
+ let all_messages = self . message_handler . route_handler . get_next_node_announcements ( Some ( & key) , steps) ;
334
+ for msg in all_messages. iter ( ) {
335
+ encode_and_send_msg ! ( msg, 256 ) ;
336
+ peer. sync_status = InitSyncTracker :: NodesSyncing ( msg. contents . node_id ) ;
337
+ }
338
+ if all_messages. is_empty ( ) || all_messages. len ( ) != steps as usize {
339
+ peer. sync_status = InitSyncTracker :: NoSyncRequested ;
340
+ }
341
+ } ,
342
+ }
343
+ }
344
+
266
345
if {
267
346
let next_buff = match peer. pending_outbound_buffer . front ( ) {
268
347
None => return ,
269
348
Some ( buff) => buff,
270
349
} ;
271
- let should_be_reading = peer. pending_outbound_buffer . len ( ) < 10 ;
272
350
351
+ let should_be_reading = peer. pending_outbound_buffer . len ( ) < MSG_BUFF_SIZE ;
273
352
let data_sent = descriptor. send_data ( next_buff, peer. pending_outbound_buffer_first_msg_offset , should_be_reading) ;
274
353
peer. pending_outbound_buffer_first_msg_offset += data_sent;
275
354
if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) { true } else { false }
@@ -297,7 +376,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
297
376
None => panic ! ( "Descriptor for write_event is not already known to PeerManager" ) ,
298
377
Some ( peer) => {
299
378
peer. awaiting_write_event = false ;
300
- Self :: do_attempt_write_data ( descriptor, peer) ;
379
+ self . do_attempt_write_data ( descriptor, peer) ;
301
380
}
302
381
} ;
303
382
Ok ( ( ) )
@@ -522,6 +601,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
522
601
if msg. local_features. supports_unknown_bits( ) { "present" } else { "none" } ,
523
602
if msg. global_features. supports_unknown_bits( ) { "present" } else { "none" } ) ;
524
603
604
+ if msg. local_features . initial_routing_sync ( ) {
605
+ peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0 ) ;
606
+ peers. peers_needing_send . insert ( peer_descriptor. clone ( ) ) ;
607
+ }
525
608
peer. their_global_features = Some ( msg. global_features ) ;
526
609
peer. their_local_features = Some ( msg. local_features ) ;
527
610
@@ -531,6 +614,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
531
614
self . initial_syncs_sent . fetch_add ( 1 , Ordering :: AcqRel ) ;
532
615
local_features. set_initial_routing_sync ( ) ;
533
616
}
617
+
534
618
encode_and_send_msg ! ( msgs:: Init {
535
619
global_features: msgs:: GlobalFeatures :: new( ) ,
536
620
local_features,
@@ -678,7 +762,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
678
762
}
679
763
}
680
764
681
- Self :: do_attempt_write_data ( peer_descriptor, peer) ;
765
+ self . do_attempt_write_data ( peer_descriptor, peer) ;
682
766
683
767
peer. pending_outbound_buffer . len ( ) > 10 // pause_read
684
768
}
@@ -735,7 +819,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
735
819
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
736
820
} ) ;
737
821
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 33 ) ) ) ;
738
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
822
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
739
823
} ,
740
824
MessageSendEvent :: SendOpenChannel { ref node_id, ref msg } => {
741
825
log_trace ! ( self , "Handling SendOpenChannel event in peer_handler for node {} for channel {}" ,
@@ -745,7 +829,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
745
829
//TODO: Drop the pending channel? (or just let it timeout, but that sucks)
746
830
} ) ;
747
831
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 32 ) ) ) ;
748
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
832
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
749
833
} ,
750
834
MessageSendEvent :: SendFundingCreated { ref node_id, ref msg } => {
751
835
log_trace ! ( self , "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})" ,
@@ -757,7 +841,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
757
841
//they should just throw away this funding transaction
758
842
} ) ;
759
843
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 34 ) ) ) ;
760
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
844
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
761
845
} ,
762
846
MessageSendEvent :: SendFundingSigned { ref node_id, ref msg } => {
763
847
log_trace ! ( self , "Handling SendFundingSigned event in peer_handler for node {} for channel {}" ,
@@ -768,7 +852,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
768
852
//they should just throw away this funding transaction
769
853
} ) ;
770
854
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 35 ) ) ) ;
771
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
855
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
772
856
} ,
773
857
MessageSendEvent :: SendFundingLocked { ref node_id, ref msg } => {
774
858
log_trace ! ( self , "Handling SendFundingLocked event in peer_handler for node {} for channel {}" ,
@@ -778,7 +862,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
778
862
//TODO: Do whatever we're gonna do for handling dropped messages
779
863
} ) ;
780
864
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 36 ) ) ) ;
781
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
865
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
782
866
} ,
783
867
MessageSendEvent :: SendAnnouncementSignatures { ref node_id, ref msg } => {
784
868
log_trace ! ( self , "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})" ,
@@ -789,7 +873,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
789
873
//they should just throw away this funding transaction
790
874
} ) ;
791
875
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 259 ) ) ) ;
792
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
876
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
793
877
} ,
794
878
MessageSendEvent :: UpdateHTLCs { ref node_id, updates : msgs:: CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
795
879
log_trace ! ( self , "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}" ,
@@ -817,7 +901,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
817
901
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 134 ) ) ) ;
818
902
}
819
903
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( commitment_signed, 132 ) ) ) ;
820
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
904
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
821
905
} ,
822
906
MessageSendEvent :: SendRevokeAndACK { ref node_id, ref msg } => {
823
907
log_trace ! ( self , "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}" ,
@@ -827,7 +911,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
827
911
//TODO: Do whatever we're gonna do for handling dropped messages
828
912
} ) ;
829
913
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 133 ) ) ) ;
830
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
914
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
831
915
} ,
832
916
MessageSendEvent :: SendClosingSigned { ref node_id, ref msg } => {
833
917
log_trace ! ( self , "Handling SendClosingSigned event in peer_handler for node {} for channel {}" ,
@@ -837,7 +921,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
837
921
//TODO: Do whatever we're gonna do for handling dropped messages
838
922
} ) ;
839
923
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 39 ) ) ) ;
840
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
924
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
841
925
} ,
842
926
MessageSendEvent :: SendShutdown { ref node_id, ref msg } => {
843
927
log_trace ! ( self , "Handling Shutdown event in peer_handler for node {} for channel {}" ,
@@ -847,7 +931,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
847
931
//TODO: Do whatever we're gonna do for handling dropped messages
848
932
} ) ;
849
933
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 38 ) ) ) ;
850
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
934
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
851
935
} ,
852
936
MessageSendEvent :: SendChannelReestablish { ref node_id, ref msg } => {
853
937
log_trace ! ( self , "Handling SendChannelReestablish event in peer_handler for node {} for channel {}" ,
@@ -857,7 +941,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
857
941
//TODO: Do whatever we're gonna do for handling dropped messages
858
942
} ) ;
859
943
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 136 ) ) ) ;
860
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
944
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
861
945
} ,
862
946
MessageSendEvent :: BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
863
947
log_trace ! ( self , "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
@@ -866,7 +950,8 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
866
950
let encoded_update_msg = encode_msg ! ( update_msg, 258 ) ;
867
951
868
952
for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
869
- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_global_features . is_none ( ) {
953
+ if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_global_features . is_none ( ) ||
954
+ !peer. should_forward_channel ( msg. contents . short_channel_id ) {
870
955
continue
871
956
}
872
957
match peer. their_node_id {
@@ -879,7 +964,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
879
964
}
880
965
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
881
966
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_update_msg[ ..] ) ) ;
882
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
967
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
883
968
}
884
969
}
885
970
} ,
@@ -889,11 +974,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
889
974
let encoded_msg = encode_msg ! ( msg, 258 ) ;
890
975
891
976
for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
892
- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_global_features . is_none ( ) {
977
+ if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_global_features . is_none ( ) ||
978
+ !peer. should_forward_channel ( msg. contents . short_channel_id ) {
893
979
continue
894
980
}
895
981
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
896
- Self :: do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
982
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
897
983
}
898
984
}
899
985
} ,
@@ -914,7 +1000,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
914
1000
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
915
1001
// This isn't guaranteed to work, but if there is enough free
916
1002
// room in the send buffer, put the error message there...
917
- Self :: do_attempt_write_data ( & mut descriptor, & mut peer) ;
1003
+ self . do_attempt_write_data ( & mut descriptor, & mut peer) ;
918
1004
} else {
919
1005
log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message" , log_pubkey!( node_id) ) ;
920
1006
}
@@ -932,7 +1018,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
932
1018
//TODO: Do whatever we're gonna do for handling dropped messages
933
1019
} ) ;
934
1020
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg, 17 ) ) ) ;
935
- Self :: do_attempt_write_data ( & mut descriptor, peer) ;
1021
+ self . do_attempt_write_data ( & mut descriptor, peer) ;
936
1022
} ,
937
1023
}
938
1024
} else {
@@ -944,7 +1030,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
944
1030
945
1031
for mut descriptor in peers. peers_needing_send . drain ( ) {
946
1032
match peers. peers . get_mut ( & descriptor) {
947
- Some ( peer) => Self :: do_attempt_write_data ( & mut descriptor, peer) ,
1033
+ Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
948
1034
None => panic ! ( "Inconsistent peers set state!" ) ,
949
1035
}
950
1036
}
0 commit comments