@@ -306,6 +306,10 @@ enum InitSyncTracker{
 /// forwarding gossip messages to peers altogether.
 const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
 
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
+/// forwarding onion messages to peers altogether.
+const OM_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
 /// We also use this as the target number of outbound gossip messages to keep in the write buffer,
@@ -315,6 +319,10 @@ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// the peer.
 const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
 
+/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
+/// peer.
+const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = 8;
+
 /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
 /// the socket receive buffer before receiving the ping.
 ///
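Taken together, the thresholds above stage the back-pressure: polling for new onion messages pauses first (8 queued messages), reads from the peer pause next (10), and forwarded gossip is dropped last (10 * 2 = 20). A self-contained sketch of that ordering, with the constants copied from the hunks above so the assertions run standalone:

```rust
// Constants copied from the diff so this sketch compiles on its own; the real
// definitions live in lightning/src/ln/peer_handler.rs.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize =
	OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = 8;

fn main() {
	// Onion-message polling pauses before reads pause, which happens before
	// gossip forwards are dropped.
	assert!(OUTBOUND_BUFFER_LIMIT_PAUSE_OMS < OUTBOUND_BUFFER_LIMIT_READ_PAUSE);
	assert!(OUTBOUND_BUFFER_LIMIT_READ_PAUSE < OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
	assert_eq!(OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP, 20);
}
```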
@@ -417,6 +425,13 @@ impl Peer {
 		}
 		true
 	}
+
+	/// Returns the number of onion messages we can fit in this peer's buffer.
+	fn onion_message_buffer_slots_available(&self) -> usize {
+		cmp::min(
+			OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
+			(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
+	}
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
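To make the new helper's behavior concrete, here is a hedged, standalone sketch of the same computation: the available slots are the tighter of a buffer-fill cap and a per-pong rate cap, with `saturating_sub` flooring each at zero. `PeerSketch` is hypothetical, and `BUFFER_DRAIN_MSGS_PER_TICK`'s value is not shown in this diff (32 is assumed below):

```rust
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = 8;
const OM_BUFFER_LIMIT_RATIO: usize = 2;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; // assumed; not part of this diff

// Stand-in for the fields of Peer that the helper reads.
struct PeerSketch {
	pending_outbound_buffer_len: usize,
	msgs_sent_since_pong: usize,
}

impl PeerSketch {
	fn onion_message_buffer_slots_available(&self) -> usize {
		std::cmp::min(
			OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer_len),
			(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO)
				.saturating_sub(self.msgs_sent_since_pong))
	}
}

fn main() {
	// 3 queued messages leave 8 - 3 = 5 slots under the buffer cap, but only
	// 32 * 2 - 60 = 4 under the per-pong rate cap, so the rate cap wins.
	let peer = PeerSketch { pending_outbound_buffer_len: 3, msgs_sent_since_pong: 60 };
	assert_eq!(peer.onion_message_buffer_slots_available(), 4);

	// saturating_sub means an over-full buffer yields 0 slots, not an underflow panic.
	let full = PeerSketch { pending_outbound_buffer_len: 10, msgs_sent_since_pong: 0 };
	assert_eq!(full.onion_message_buffer_slots_available(), 0);
}
```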
@@ -824,8 +839,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 	/// ready to call [`write_buffer_space_avail`] again if a write call generated here isn't
 	/// sufficient!
 	///
+	/// If any bytes are written, [`process_events`] should be called afterwards.
+	///
 	/// [`send_data`]: SocketDescriptor::send_data
 	/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
+	/// [`process_events`]: PeerManager::process_events
 	pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
 		let peers = self.peers.read().unwrap();
 		match peers.get(descriptor) {
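The new doc sentence implies a calling convention for I/O drivers. Below is a hypothetical sketch of that convention; `PmStub` and `FdStub` stand in for `PeerManager` and a `SocketDescriptor` so it runs in isolation, and only the call ordering comes from the documentation above:

```rust
struct FdStub;
struct PmStub;

impl PmStub {
	// Stand-in for PeerManager::write_buffer_space_avail, which flushes queued bytes.
	fn write_buffer_space_avail(&self, _descriptor: &mut FdStub) -> Result<(), ()> {
		Ok(())
	}
	// Stand-in for PeerManager::process_events, which refills drained buffers
	// (e.g. with queued onion messages).
	fn process_events(&self) {}
}

// Called by the I/O driver whenever the OS reports the socket writable again.
fn on_socket_writable(pm: &PmStub, fd: &mut FdStub) -> Result<(), ()> {
	pm.write_buffer_space_avail(fd)?;
	// Bytes may have been written above; per the new docs we follow up with
	// process_events. Calling it when nothing was written is just a no-op.
	pm.process_events();
	Ok(())
}

fn main() {
	on_socket_writable(&PmStub, &mut FdStub).unwrap();
}
```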
@@ -1669,6 +1687,25 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
 			for (descriptor, peer_mutex) in peers.iter() {
 				self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
+
+				// Only see if we have room for onion messages after we've written all channel
+				// messages, to ensure channel messages take priority.
+				loop {
+					let (peer_node_id, om_buffer_slots_avail) = {
+						let peer = peer_mutex.lock().unwrap();
+						if let Some(peer_node_id) = peer.their_node_id {
+							(peer_node_id, peer.onion_message_buffer_slots_available())
+						} else { break; }
+					};
+					if om_buffer_slots_avail == 0 { break; }
+					let onion_msgs = self.message_handler.onion_message_handler.next_onion_messages_for_peer(
+						peer_node_id, om_buffer_slots_avail);
+					if onion_msgs.is_empty() { break; }
+					for msg in onion_msgs {
+						self.enqueue_message(&mut *get_peer_for_forwarding!(&peer_node_id), &msg);
+					}
+					self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
+				}
 			}
 		}
 		if !peers_to_disconnect.is_empty() {
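The loop above encodes a strict priority order: channel messages are flushed first, and onion messages only fill whatever buffer slots remain, pulled batch by batch until the buffer fills or the handler runs dry. A self-contained sketch of that drain pattern with stand-in types (only the control flow mirrors the diff):

```rust
use std::collections::VecDeque;

const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = 8;

struct PeerStub {
	outbound: VecDeque<&'static str>,
}

impl PeerStub {
	fn slots_available(&self) -> usize {
		OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.outbound.len())
	}
	// Stand-in for do_attempt_write_data: pretend the socket accepts everything.
	fn flush(&mut self) {
		self.outbound.clear();
	}
}

fn drain(peer: &mut PeerStub, channel_msgs: Vec<&'static str>, onion_msgs: &mut Vec<&'static str>) {
	// Channel messages are enqueued and written before any onion message is polled.
	peer.outbound.extend(channel_msgs);
	peer.flush();
	loop {
		let slots = peer.slots_available();
		if slots == 0 { break; }
		// Pull at most `slots` onion messages, mirroring next_onion_messages_for_peer.
		let take = slots.min(onion_msgs.len());
		let batch: Vec<_> = onion_msgs.drain(..take).collect();
		if batch.is_empty() { break; }
		peer.outbound.extend(batch);
		peer.flush();
	}
}

fn main() {
	let mut peer = PeerStub { outbound: VecDeque::new() };
	let mut oms = vec!["om1", "om2", "om3"];
	drain(&mut peer, vec!["channel_update"], &mut oms);
	assert!(oms.is_empty());
}
```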