@@ -306,6 +306,10 @@ enum InitSyncTracker{
306
306
/// forwarding gossip messages to peers altogether.
307
307
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO : usize = 2 ;
308
308
309
+ /// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
310
+ /// sending onion messages to peers altogether.
311
+ const OM_BUFFER_LIMIT_RATIO : usize = 2 ;
312
+
309
313
/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
310
314
/// we have fewer than this many messages in the outbound buffer again.
311
315
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
@@ -315,6 +319,10 @@ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
315
319
/// the peer.
316
320
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP : usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO ;
317
321
322
+ /// When the outbound buffer has this many messages, we won't poll for new onion messages for this
323
+ /// peer.
324
+ const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS : usize = 16 ;
325
+
318
326
/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
319
327
/// the socket receive buffer before receiving the ping.
320
328
///
@@ -417,6 +425,14 @@ impl Peer {
417
425
}
418
426
true
419
427
}
428
+
429
+ /// Returns the number of onion messages we can fit in this peer's buffer.
430
+ fn onion_message_buffer_slots_available ( & self ) -> usize {
431
+ cmp:: min (
432
+ OUTBOUND_BUFFER_LIMIT_PAUSE_OMS . saturating_sub ( self . pending_outbound_buffer . len ( ) ) ,
433
+ ( BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO ) . saturating_sub ( self . msgs_sent_since_pong ) )
434
+ }
435
+
420
436
}
421
437
422
438
/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -824,8 +840,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
824
840
/// ready to call [`write_buffer_space_avail`] again if a write call generated here isn't
825
841
/// sufficient!
826
842
///
843
+ /// If any bytes are written, [`process_events`] should be called afterwards.
844
+ // `process_events` is the only place onion messages are polled (via `next_onion_messages_for_peer`),
+ // so it must run once a write frees buffer space, or queued onion messages would stall.
845
+ ///
827
846
/// [`send_data`]: SocketDescriptor::send_data
828
847
/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
848
+ /// [`process_events`]: PeerManager::process_events
829
849
pub fn write_buffer_space_avail ( & self , descriptor : & mut Descriptor ) -> Result < ( ) , PeerHandleError > {
830
850
let peers = self . peers . read ( ) . unwrap ( ) ;
831
851
match peers. get ( descriptor) {
@@ -1668,7 +1688,30 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1668
1688
}
1669
1689
1670
1690
for ( descriptor, peer_mutex) in peers. iter ( ) {
1671
- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , & mut * peer_mutex. lock ( ) . unwrap ( ) ) ;
1691
+ let peer_node_id = {
1692
+ let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
1693
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , & mut * peer) ;
1694
+ peer. their_node_id
1695
+ } ;
1696
+
1697
+ // Only see if we have room for onion messages after we've written all channel messages, to
1698
+ // ensure the latter take priority.
1699
+ if let Some ( peer_node_id) = peer_node_id {
1700
+ loop {
1701
+ let om_buffer_slots_avail = {
1702
+ let peer = peer_mutex. lock ( ) . unwrap ( ) ;
1703
+ peer. onion_message_buffer_slots_available ( )
1704
+ } ;
1705
+ if om_buffer_slots_avail == 0 { break ; }
1706
+ let onion_msgs = self . message_handler . onion_message_handler . next_onion_messages_for_peer (
1707
+ peer_node_id, om_buffer_slots_avail) ;
1708
+ if onion_msgs. len ( ) == 0 { break ; }
1709
+ for msg in onion_msgs {
1710
+ self . enqueue_message ( & mut * get_peer_for_forwarding ! ( & peer_node_id) , & msg) ;
1711
+ }
1712
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , & mut * peer_mutex. lock ( ) . unwrap ( ) ) ;
1713
+ }
1714
+ }
1672
1715
}
1673
1716
}
1674
1717
if !peers_to_disconnect. is_empty ( ) {
0 commit comments