@@ -309,15 +309,23 @@ enum InitSyncTracker{
 /// forwarding gossip messages to peers altogether.
 const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
 
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
+/// forwarding onion messages to peers altogether.
+const OM_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
-/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
-/// refilled as we send bytes.
+/// We also use this as the target number of outbound gossip and onion messages to keep in the write
+/// buffer, refilled as we send bytes.
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
 const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
 
+/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
+/// peer.
+const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
+
 /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
 /// the socket receive buffer before receiving the ping.
 ///
@@ -393,6 +401,22 @@ impl Peer {
 			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
 		}
 	}
+
+	/// Returns the number of onion messages we can fit in this peer's buffer.
+	fn onion_message_buffer_slots_available(&self) -> usize {
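+		// Available slots are bounded both by the onion message outbound buffer limit and by a cap on
+		// how many messages we send between pongs, whichever leaves less room.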
+		cmp::min(
+			OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
+			(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
+	}
+
+	/// Returns whether this peer's buffer is full and we should drop gossip messages.
+	fn buffer_full_drop_gossip(&self) -> bool {
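+		// Gossip broadcasts are skipped once either the outbound buffer or the between-pongs message
+		// budget is exhausted.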
+		if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+			|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+			return true
+		}
+		false
+	}
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -811,8 +835,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 	/// ready to call [`write_buffer_space_avail`] again if a write call generated here isn't
 	/// sufficient!
 	///
+	/// If any bytes are written, [`process_events`] should be called afterwards, as freeing outbound
+	/// buffer space may allow additional messages (such as onion messages) to be queued for the peer.
+	///
 	/// [`send_data`]: SocketDescriptor::send_data
 	/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
+	/// [`process_events`]: PeerManager::process_events
 	pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
 		let peers = self.peers.read().unwrap();
 		match peers.get(descriptor) {
@@ -1335,9 +1363,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1361,9 +1387,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 						!peer.should_forward_node_announcement(msg.contents.node_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1386,9 +1410,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1412,6 +1434,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 	/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
 	/// or one of the other clients provided in our language bindings.
 	///
+	/// Note that this method should be called again if any bytes are written.
+	///
 	/// Note that if there are any other calls to this function waiting on lock(s) this may return
 	/// without doing any work. All available events that need handling will be handled before the
 	/// other calls return.
@@ -1666,6 +1690,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
 			for (descriptor, peer_mutex) in peers.iter() {
 				self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
+
+				// Only see if we have room for onion messages after we've written all channel messages, to
+				// ensure they take priority.
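+				// Snapshot the peer's node id and available onion message buffer slots while holding the
+				// peer lock, then release it before calling into the onion message handler.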
+				let (peer_node_id, om_buffer_slots_avail) = {
+					let peer = peer_mutex.lock().unwrap();
+					if let Some(peer_node_id) = peer.their_node_id {
+						(Some(peer_node_id.clone()), peer.onion_message_buffer_slots_available())
+					} else { (None, 0) }
+				};
+				if peer_node_id.is_some() && om_buffer_slots_avail > 0 {
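+					// Pull at most `om_buffer_slots_avail` onion messages from the handler and enqueue
+					// each one for this peer.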
+					for event in self.message_handler.onion_message_handler.next_onion_messages_for_peer(
+						peer_node_id.unwrap(), om_buffer_slots_avail)
+					{
+						if let MessageSendEvent::SendOnionMessage { ref node_id, ref msg } = event {
+							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+						}
+					}
+				}
 			}
 		}
 		if !peers_to_disconnect.is_empty() {