@@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
 	fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 	fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 	fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-	fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
-		Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
-	fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+	fn get_next_channel_announcement(&self, _starting_point: u64) ->
+		Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
+	fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
 	fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
 	fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
 	fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
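The trait change above replaces batch getters with single-item getters: instead of asking a `RoutingMessageHandler` for up to `_batch_amount` announcements at once, the caller now pulls one announcement at a time and drives iteration itself. For a downstream implementor backed by an ordered map, the migration looks roughly like the following sketch (the `MyGossipStore` type and its `BTreeMap` storage are hypothetical illustrations, not LDK's actual graph code; the accompanying `ChannelUpdate` options are elided for brevity):

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for the real message type.
#[derive(Clone)]
struct ChannelAnnouncement { short_channel_id: u64 }

struct MyGossipStore {
	// Announcements keyed by short channel ID, so range queries are ordered.
	announcements: BTreeMap<u64, ChannelAnnouncement>,
}

impl MyGossipStore {
	// Old shape: return up to `batch_amount` announcements starting at
	// `starting_point`; callers inferred completion from a short batch.
	fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<ChannelAnnouncement> {
		self.announcements.range(starting_point..)
			.take(batch_amount as usize)
			.map(|(_, ann)| ann.clone())
			.collect()
	}

	// New shape: return at most one announcement; `None` means "done". The
	// caller resumes from `short_channel_id + 1` on the next call.
	fn get_next_channel_announcement(&self, starting_point: u64) -> Option<ChannelAnnouncement> {
		self.announcements.range(starting_point..)
			.next()
			.map(|(_, ann)| ann.clone())
	}
}

fn main() {
	let store = MyGossipStore {
		announcements: [(5u64, ChannelAnnouncement { short_channel_id: 5 }),
						(9, ChannelAnnouncement { short_channel_id: 9 })].into_iter().collect(),
	};
	// Old API: one call, fixed-size batch.
	assert_eq!(store.get_next_channel_announcements(0, 8).len(), 2);
	// New API: pull until None, resuming past each returned item.
	let first = store.get_next_channel_announcement(0).unwrap();
	assert_eq!(first.short_channel_id, 5);
	assert!(store.get_next_channel_announcement(first.short_channel_id + 1).is_some());
	assert!(store.get_next_channel_announcement(10).is_none());
}
```

Returning `Option` also removes an ambiguity in the old API, where a batch of exactly `batch_amount` items forced one extra call just to discover the end.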
@@ -323,6 +323,10 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
 /// tick. Once we have sent this many messages since the last ping, we send a ping right away to
 /// ensure we don't just fill up our send buffer and leave the peer with too many messages to
 /// process before the next ping.
+///
+/// Note that we continue responding to other messages even after we've sent this many messages, so
+/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
+/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
 const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
 
 struct Peer {
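To make the new doc comment concrete: `BUFFER_DRAIN_MSGS_PER_TICK` is 32, and forwarded gossip is allowed a multiple of that before it is dropped. A small worked example of the two budgets (the value 2 for `FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO` is an assumption for illustration; its definition is outside this diff):

```rust
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
// Assumed value for illustration; the real constant is defined elsewhere in
// peer_handler.rs and is not shown in this diff.
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;

fn main() {
	// Gossip backfill stops enqueueing once this many messages have gone out
	// since the last pong (see should_buffer_gossip_backfill below).
	let backfill_budget = BUFFER_DRAIN_MSGS_PER_TICK;
	// Forwarded gossip is only dropped once the count exceeds the larger
	// budget (see buffer_full_drop_gossip below).
	let forward_budget = BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
	assert_eq!(backfill_budget, 32);
	assert_eq!(forward_budget, 64);
	// Neither is a hard cap on traffic: replies to inbound messages keep
	// flowing regardless; only gossip backfill/forwarding is throttled.
}
```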
@@ -378,6 +382,29 @@ impl Peer {
 			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
 		}
 	}
+
+	/// Returns whether we should be reading bytes from this peer, based on whether its outbound
+	/// buffer still has space and we don't need to pause reads to get some writes out.
+	fn should_read(&self) -> bool {
+		self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+	}
+
+	/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
+	/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
+	/// been drained.
+	fn should_buffer_gossip_backfill(&self) -> bool {
+		self.pending_outbound_buffer.is_empty() &&
+			self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+	}
+
+	/// Returns whether this peer's buffer is full and we should drop gossip messages.
+	fn buffer_full_drop_gossip(&self) -> bool {
+		if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+			|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+			return true
+		}
+		false
+	}
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
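These three helpers centralize threshold checks that were previously inlined at each call site, and their polarities are worth pinning down: `should_read` and `should_buffer_gossip_backfill` are true when there is room, while `buffer_full_drop_gossip` must be true exactly when gossip should be dropped (its call sites `continue` past the peer when it returns true). A standalone model of the same predicates (the limit values here are illustrative assumptions; the real `OUTBOUND_BUFFER_LIMIT_*` and ratio constants are defined outside this diff):

```rust
use std::collections::VecDeque;

// Illustrative limits; the real constants live elsewhere in peer_handler.rs.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2; // assumed value

// Model of the two Peer fields the predicates read.
struct PeerModel {
	pending_outbound_buffer: VecDeque<Vec<u8>>,
	msgs_sent_since_pong: usize,
}

impl PeerModel {
	fn should_read(&self) -> bool {
		self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
	}
	fn should_buffer_gossip_backfill(&self) -> bool {
		self.pending_outbound_buffer.is_empty()
			&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
	}
	fn buffer_full_drop_gossip(&self) -> bool {
		self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
			|| self.msgs_sent_since_pong
				> BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
	}
}

fn main() {
	// An idle peer: read freely, backfill gossip, never drop.
	let idle = PeerModel { pending_outbound_buffer: VecDeque::new(), msgs_sent_since_pong: 0 };
	assert!(idle.should_read() && idle.should_buffer_gossip_backfill() && !idle.buffer_full_drop_gossip());

	// A badly backed-up peer: pause reads, skip backfill, drop forwarded gossip.
	let full = PeerModel {
		pending_outbound_buffer: (0..25).map(|_| vec![0u8; 1]).collect(),
		msgs_sent_since_pong: 70,
	};
	assert!(!full.should_read() && !full.should_buffer_gossip_backfill() && full.buffer_full_drop_gossip());
}
```

Note that `should_buffer_gossip_backfill` requires a completely empty buffer, not merely one under the read-pause limit: backfill is the lowest-priority traffic, so it only tops up a fully drained buffer.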
@@ -710,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
 	fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
 		while !peer.awaiting_write_event {
-			if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
+			if peer.should_buffer_gossip_backfill() {
 				match peer.sync_status {
 					InitSyncTracker::NoSyncRequested => {},
 					InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-						let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
-						for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
-							self.enqueue_message(peer, announce);
-							if let &Some(ref update_a) = update_a_option {
-								self.enqueue_message(peer, update_a);
+						if let Some((announce, update_a_option, update_b_option)) =
+							self.message_handler.route_handler.get_next_channel_announcement(c)
+						{
+							self.enqueue_message(peer, &announce);
+							if let Some(update_a) = update_a_option {
+								self.enqueue_message(peer, &update_a);
 							}
-							if let &Some(ref update_b) = update_b_option {
-								self.enqueue_message(peer, update_b);
+							if let Some(update_b) = update_b_option {
+								self.enqueue_message(peer, &update_b);
 							}
 							peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
 						}
 					},
 					InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-						let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
-						for msg in all_messages.iter() {
-							self.enqueue_message(peer, msg);
+						if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
+							self.enqueue_message(peer, &msg);
 							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::NoSyncRequested;
 						}
 					},
 					InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
 					InitSyncTracker::NodesSyncing(key) => {
-						let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
-						for msg in all_messages.iter() {
-							self.enqueue_message(peer, msg);
+						if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) {
+							self.enqueue_message(peer, &msg);
 							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::NoSyncRequested;
 						}
 					},
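The rewritten match arms all follow one pull-based pattern: ask for at most one item, enqueue it, advance the sync cursor past it, and treat `None` as the end of that phase. This replaces the old `steps`-sized batches, where completion had to be inferred from `all_messages.len() != steps`. A condensed, self-contained sketch of the cursor logic (`next_after` is a hypothetical stand-in for the route handler lookup):

```rust
// Cursor-driven backfill: pull one item per pass while the peer has budget.
#[derive(Clone, Copy)]
enum SyncCursor {
	Channels(u64), // next short channel ID to send
	Done,
}

fn backfill_one(cursor: SyncCursor, next_after: impl Fn(u64) -> Option<u64>) -> SyncCursor {
	match cursor {
		SyncCursor::Channels(c) => match next_after(c) {
			// Found one: "enqueue" it and resume just past it next time.
			Some(scid) => SyncCursor::Channels(scid + 1),
			// Nothing at or after the cursor: this phase is complete.
			None => SyncCursor::Done,
		},
		SyncCursor::Done => SyncCursor::Done,
	}
}

fn main() {
	let scids = [5u64, 9, 42];
	let next_after = |c: u64| scids.iter().copied().find(|&s| s >= c);
	let mut cursor = SyncCursor::Channels(0);
	let mut sent = Vec::new();
	loop {
		let before = cursor;
		cursor = backfill_one(cursor, next_after);
		match (before, cursor) {
			// The cursor sits one past the item just sent.
			(SyncCursor::Channels(_), SyncCursor::Channels(n)) => sent.push(n - 1),
			_ => break,
		}
	}
	assert_eq!(sent, vec![5, 9, 42]);
}
```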
@@ -759,18 +779,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 			self.maybe_send_extra_ping(peer);
 		}
 
-		if {
-			let next_buff = match peer.pending_outbound_buffer.front() {
-				None => return,
-				Some(buff) => buff,
-			};
+		let next_buff = match peer.pending_outbound_buffer.front() {
+			None => return,
+			Some(buff) => buff,
+		};
 
-			let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
-			let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-			let data_sent = descriptor.send_data(pending, should_be_reading);
-			peer.pending_outbound_buffer_first_msg_offset += data_sent;
-			if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
-		} {
+		let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
+		let data_sent = descriptor.send_data(pending, peer.should_read());
+		peer.pending_outbound_buffer_first_msg_offset += data_sent;
+		if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
 			peer.pending_outbound_buffer_first_msg_offset = 0;
 			peer.pending_outbound_buffer.pop_front();
 		} else {
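With the `if { ... } { ... }` block flattened, the remaining logic is a classic partial-write loop: offer the unsent tail of the front message, advance `pending_outbound_buffer_first_msg_offset` by however many bytes the descriptor accepted, and only pop the message once the offset reaches its length. A standalone sketch of that bookkeeping (the 3-bytes-per-call transport is a hypothetical stand-in for `send_data`):

```rust
use std::collections::VecDeque;

// Minimal model of the offset bookkeeping in do_attempt_write_data: the
// transport may take fewer bytes than offered, and we resume mid-message.
fn drain(buf: &mut VecDeque<Vec<u8>>, offset: &mut usize, mut send_some: impl FnMut(&[u8]) -> usize) {
	loop {
		let next = match buf.front() {
			None => return,
			Some(msg) => msg,
		};
		let sent = send_some(&next[*offset..]);
		*offset += sent;
		if *offset == next.len() {
			// Whole message flushed: reset the offset and drop it.
			*offset = 0;
			buf.pop_front();
		} else {
			// Partial write: stop and resume from `offset` next time.
			return;
		}
	}
}

fn main() {
	let mut buf: VecDeque<Vec<u8>> = VecDeque::from(vec![vec![1, 2, 3, 4], vec![5, 6]]);
	let mut offset = 0;
	// A transport that accepts at most 3 bytes per call.
	let mut wire = Vec::new();
	drain(&mut buf, &mut offset, |bytes| {
		let n = bytes.len().min(3);
		wire.extend_from_slice(&bytes[..n]);
		n
	});
	// The first call sends [1, 2, 3] (partial), so we stop with offset = 3.
	assert_eq!(wire, vec![1, 2, 3]);
	assert_eq!(offset, 3);
}
```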
@@ -1045,7 +1062,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					}
 				}
 			}
-			pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
+			pause_read = !peer.should_read();
 
 			if let Some(message) = msg_to_handle {
 				match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1308,9 +1325,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1334,9 +1349,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_node_announcement(msg.contents.node_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1359,9 +1372,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -2060,10 +2071,10 @@ mod tests {
 
 		// Check that each peer has received the expected number of channel updates and channel
 		// announcements.
-		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
-		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
+		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
 	}
 
 	#[test]