@@ -378,6 +378,31 @@ impl Peer {
378
378
InitSyncTracker :: NodesSyncing ( pk) => pk < node_id,
379
379
}
380
380
}
381
+
382
+ /// Returns the number of gossip messages we can fit in this peer's buffer.
383
+ fn gossip_buffer_slots_available ( & self ) -> usize {
384
+ OUTBOUND_BUFFER_LIMIT_READ_PAUSE . saturating_sub ( self . pending_outbound_buffer . len ( ) )
385
+ }
386
+
387
+ /// Returns whether we should be reading bytes from this peer, based on whether its outbound
388
+ /// buffer still has space and we don't need to pause reads to get some writes out.
389
+ fn should_read ( & self ) -> bool {
390
+ self . pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
391
+ }
392
+
393
+ fn should_backfill_gossip ( & self ) -> bool {
394
+ self . pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
395
+ self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
396
+ }
397
+
398
+ /// Returns whether this peer's buffer is full and we should drop gossip messages.
399
+ fn buffer_full_drop_gossip ( & self ) -> bool {
400
+ if self . pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
401
+ || self . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
402
+ return false
403
+ }
404
+ true
405
+ }
381
406
}
382
407
383
408
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -710,11 +735,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
710
735
711
736
fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
712
737
while !peer. awaiting_write_event {
713
- if peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
738
+ if peer. should_backfill_gossip ( ) {
714
739
match peer. sync_status {
715
740
InitSyncTracker :: NoSyncRequested => { } ,
716
741
InitSyncTracker :: ChannelsSyncing ( c) if c < 0xffff_ffff_ffff_ffff => {
717
- let steps = ( ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) + 2 ) / 3 ) as u8 ;
742
+ let steps = ( ( peer. gossip_buffer_slots_available ( ) + 2 ) / 3 ) as u8 ;
718
743
let all_messages = self . message_handler . route_handler . get_next_channel_announcements ( c, steps) ;
719
744
for & ( ref announce, ref update_a_option, ref update_b_option) in all_messages. iter ( ) {
720
745
self . enqueue_message ( peer, announce) ;
@@ -731,7 +756,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
731
756
}
732
757
} ,
733
758
InitSyncTracker :: ChannelsSyncing ( c) if c == 0xffff_ffff_ffff_ffff => {
734
- let steps = ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
759
+ let steps = peer. gossip_buffer_slots_available ( ) as u8 ;
735
760
let all_messages = self . message_handler . route_handler . get_next_node_announcements ( None , steps) ;
736
761
for msg in all_messages. iter ( ) {
737
762
self . enqueue_message ( peer, msg) ;
@@ -743,7 +768,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
743
768
} ,
744
769
InitSyncTracker :: ChannelsSyncing ( _) => unreachable ! ( ) ,
745
770
InitSyncTracker :: NodesSyncing ( key) => {
746
- let steps = ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
771
+ let steps = peer. gossip_buffer_slots_available ( ) as u8 ;
747
772
let all_messages = self . message_handler . route_handler . get_next_node_announcements ( Some ( & key) , steps) ;
748
773
for msg in all_messages. iter ( ) {
749
774
self . enqueue_message ( peer, msg) ;
@@ -765,9 +790,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
765
790
Some ( buff) => buff,
766
791
} ;
767
792
768
- let should_be_reading = peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE ;
769
793
let pending = & next_buff[ peer. pending_outbound_buffer_first_msg_offset ..] ;
770
- let data_sent = descriptor. send_data ( pending, should_be_reading ) ;
794
+ let data_sent = descriptor. send_data ( pending, peer . should_read ( ) ) ;
771
795
peer. pending_outbound_buffer_first_msg_offset += data_sent;
772
796
if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) { true } else { false }
773
797
} {
@@ -1045,7 +1069,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1045
1069
}
1046
1070
}
1047
1071
}
1048
- pause_read = peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_READ_PAUSE ;
1072
+ pause_read = ! peer. should_read ( ) ;
1049
1073
1050
1074
if let Some ( message) = msg_to_handle {
1051
1075
match self . handle_message ( & peer_mutex, peer_lock, message) {
@@ -1308,9 +1332,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1308
1332
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1309
1333
continue
1310
1334
}
1311
- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1312
- || peer. msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1313
- {
1335
+ if peer. buffer_full_drop_gossip ( ) {
1314
1336
log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1315
1337
continue ;
1316
1338
}
@@ -1334,9 +1356,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1334
1356
!peer. should_forward_node_announcement ( msg. contents . node_id ) {
1335
1357
continue
1336
1358
}
1337
- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1338
- || peer. msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1339
- {
1359
+ if peer. buffer_full_drop_gossip ( ) {
1340
1360
log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1341
1361
continue ;
1342
1362
}
@@ -1359,9 +1379,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1359
1379
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1360
1380
continue
1361
1381
}
1362
- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1363
- || peer. msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1364
- {
1382
+ if peer. buffer_full_drop_gossip ( ) {
1365
1383
log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1366
1384
continue ;
1367
1385
}
0 commit comments