@@ -309,15 +309,23 @@ enum InitSyncTracker{
 /// forwarding gossip messages to peers altogether.
 const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
 
+/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
+/// forwarding onion messages to peers altogether.
+const OM_BUFFER_LIMIT_RATIO: usize = 2;
+
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
-/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
-/// refilled as we send bytes.
+/// We also use this as the target number of outbound gossip and onion messages to keep in the write
+/// buffer, refilled as we send bytes.
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
 const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
 
+/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
+/// peer.
+const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
+
 /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
 /// the socket receive buffer before receiving the ping.
 ///
@@ -393,6 +401,22 @@ impl Peer {
 			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
 		}
 	}
+
+	/// Returns the number of onion messages we can fit in this peer's buffer.
+	fn onion_message_buffer_slots_available(&self) -> usize {
+		cmp::min(
+			OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
+			(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
+	}
+
+	/// Returns whether this peer's buffer is full and we should drop gossip messages.
+	fn buffer_full_drop_gossip(&self) -> bool {
+		if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+			|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+				return true
+		}
+		false
+	}
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
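Aside (not part of the patch): the interaction of the two `saturating_sub` caps in `onion_message_buffer_slots_available` can be sketched standalone. The constant values below are illustrative stand-ins; in particular, `BUFFER_DRAIN_MSGS_PER_TICK` is defined elsewhere in `peer_handler.rs` and its actual value is not shown in this diff.

```rust
// Minimal sketch of the slot computation above, under assumed constant values.
use std::cmp;

const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = 20; // READ_PAUSE (10) * OM_BUFFER_LIMIT_RATIO (2)
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;      // assumed value, for illustration only
const OM_BUFFER_LIMIT_RATIO: usize = 2;

fn slots_available(pending_outbound: usize, msgs_sent_since_pong: usize) -> usize {
    cmp::min(
        // Cap 1: stay under the onion-message pause limit for the outbound buffer.
        OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(pending_outbound),
        // Cap 2: stay under the per-pong-cycle message budget for onion messages.
        (BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(msgs_sent_since_pong),
    )
}

fn main() {
    assert_eq!(slots_available(5, 10), 15); // buffer space is the binding cap (20 - 5)
    assert_eq!(slots_available(18, 0), 2);  // nearly-full buffer leaves only 2 slots
    assert_eq!(slots_available(0, 64), 0);  // too many messages since the last pong: pause onion messages
}
```

The first cap keeps the peer's outbound buffer below `OUTBOUND_BUFFER_LIMIT_PAUSE_OMS`; the second throttles onion messages to a peer that has not recently answered a ping, mirroring the existing gossip throttling.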
@@ -809,8 +833,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 	/// ready to call `[write_buffer_space_avail`] again if a write call generated here isn't
 	/// sufficient!
 	///
+	/// If any bytes are written, [`process_events`] should be called afterwards, as onion messages
+	/// are only enqueued for a peer while it has free buffer slots during event processing.
+	///
 	/// [`send_data`]: SocketDescriptor::send_data
 	/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
+	/// [`process_events`]: PeerManager::process_events
 	pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
 		let peers = self.peers.read().unwrap();
 		match peers.get(descriptor) {
@@ -1333,9 +1361,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 							!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1359,9 +1385,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 							!peer.should_forward_node_announcement(msg.contents.node_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1384,9 +1408,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 							!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1410,6 +1432,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 	/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
 	/// or one of the other clients provided in our language bindings.
 	///
+	/// Note that this method should be called again if any bytes are written.
+	///
 	/// Note that if there are any other calls to this function waiting on lock(s) this may return
 	/// without doing any work. All available events that need handling will be handled before the
 	/// other calls return.
@@ -1664,6 +1688,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
 			for (descriptor, peer_mutex) in peers.iter() {
 				self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
+
+				// Only see if we have room for onion messages after we've written all channel messages, to
+				// ensure they take priority.
+				let (peer_node_id, om_buffer_slots_avail) = {
+					let peer = peer_mutex.lock().unwrap();
+					if let Some(peer_node_id) = peer.their_node_id {
+						(Some(peer_node_id.clone()), peer.onion_message_buffer_slots_available())
+					} else { (None, 0) }
+				};
+				if peer_node_id.is_some() && om_buffer_slots_avail > 0 {
+					for event in self.message_handler.onion_message_handler.next_onion_messages_for_peer(
+						peer_node_id.unwrap(), om_buffer_slots_avail)
+					{
+						if let MessageSendEvent::SendOnionMessage { ref node_id, ref msg } = event {
+							self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+						}
+					}
+				}
 			}
 		}
 		if !peers_to_disconnect.is_empty() {