Skip to content

Commit 0b4056f

Browse files
Implement some rate limiting for onion messages.
In this commit, we add business logic for checking if a peer's outbound buffer has room for onion messages, and if so pulls them from an implementer of a new trait, OnionMessageProvider. Makes sure channel messages are prioritized over OMs. The onion_message module remains private until further rate limiting is added.
1 parent b317453 commit 0b4056f

File tree

1 file changed

+42
-2
lines changed

1 file changed

+42
-2
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,15 +309,23 @@ enum InitSyncTracker{
309309
/// forwarding gossip messages to peers altogether.
310310
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
311311

312+
/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we pause
313+
/// forwarding onion messages to peers altogether.
314+
const OM_BUFFER_LIMIT_RATIO: usize = 2;
315+
312316
/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
313317
/// we have fewer than this many messages in the outbound buffer again.
314-
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
315-
/// refilled as we send bytes.
318+
/// We also use this as the target number of outbound gossip and onion messages to keep in the write
319+
/// buffer, refilled as we send bytes.
316320
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
317321
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
318322
/// the peer.
319323
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
320324

325+
/// When the outbound buffer has this many messages, we won't poll for new onion messages for this
326+
/// peer.
327+
const OUTBOUND_BUFFER_LIMIT_PAUSE_OMS: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * OM_BUFFER_LIMIT_RATIO;
328+
321329
/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
322330
/// the socket receive buffer before receiving the ping.
323331
///
@@ -393,6 +401,14 @@ impl Peer {
393401
InitSyncTracker::NodesSyncing(pk) => pk < node_id,
394402
}
395403
}
404+
405+
/// Returns the number of onion messages we can fit in this peer's buffer.
406+
fn onion_message_buffer_slots_available(&self) -> usize {
407+
cmp::min(
408+
OUTBOUND_BUFFER_LIMIT_PAUSE_OMS.saturating_sub(self.pending_outbound_buffer.len()),
409+
(BUFFER_DRAIN_MSGS_PER_TICK * OM_BUFFER_LIMIT_RATIO).saturating_sub(self.msgs_sent_since_pong))
410+
}
411+
396412
/// Returns whether this peer's buffer is full and we should drop gossip messages.
397413
fn buffer_full_drop_gossip(&self) -> bool {
398414
if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
@@ -817,8 +833,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
817833
/// ready to call `[write_buffer_space_avail`] again if a write call generated here isn't
818834
/// sufficient!
819835
///
836+
/// If any bytes are written, [`process_events`] should be called afterwards.
837+
// TODO: why?
838+
///
820839
/// [`send_data`]: SocketDescriptor::send_data
821840
/// [`write_buffer_space_avail`]: PeerManager::write_buffer_space_avail
841+
/// [`process_events`]: PeerManager::process_events
822842
pub fn write_buffer_space_avail(&self, descriptor: &mut Descriptor) -> Result<(), PeerHandleError> {
823843
let peers = self.peers.read().unwrap();
824844
match peers.get(descriptor) {
@@ -1412,6 +1432,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
14121432
/// You don't have to call this function explicitly if you are using [`lightning-net-tokio`]
14131433
/// or one of the other clients provided in our language bindings.
14141434
///
1435+
/// Note that this method should be called again if any bytes are written.
1436+
///
14151437
/// Note that if there are any other calls to this function waiting on lock(s) this may return
14161438
/// without doing any work. All available events that need handling will be handled before the
14171439
/// other calls return.
@@ -1666,6 +1688,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
16661688

16671689
for (descriptor, peer_mutex) in peers.iter() {
16681690
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
1691+
1692+
// Only see if we have room for onion messages after we've written all channel messages, to
1693+
// ensure they take priority.
1694+
let (peer_node_id, om_buffer_slots_avail) = {
1695+
let peer = peer_mutex.lock().unwrap();
1696+
if let Some(peer_node_id) = peer.their_node_id {
1697+
(Some(peer_node_id.clone()), peer.onion_message_buffer_slots_available())
1698+
} else { (None, 0) }
1699+
};
1700+
if peer_node_id.is_some() && om_buffer_slots_avail > 0 {
1701+
for event in self.message_handler.onion_message_handler.next_onion_messages_for_peer(
1702+
peer_node_id.unwrap(), om_buffer_slots_avail)
1703+
{
1704+
if let MessageSendEvent::SendOnionMessage { ref node_id, ref msg } = event {
1705+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
1706+
}
1707+
}
1708+
}
16691709
}
16701710
}
16711711
if !peers_to_disconnect.is_empty() {

0 commit comments

Comments
 (0)