Skip to content

Commit 9b3fe9e

Browse files
Separate gossip broadcasts into their own queue in PeerManager
This allows us to better prioritize channel messages over gossip broadcasts and lays groundwork for rate limiting onion messages more simply, since they won't be competing with gossip broadcasts for space in the main message queue.
1 parent f7c31dc commit 9b3fe9e

File tree

1 file changed

+37
-17
lines changed

1 file changed

+37
-17
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,9 @@ struct Peer {
337337

338338
pending_outbound_buffer: LinkedList<Vec<u8>>,
339339
pending_outbound_buffer_first_msg_offset: usize,
340+
// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
341+
// channel messages over them.
342+
gossip_broadcast_buffer: LinkedList<Vec<u8>>,
340343
awaiting_write_event: bool,
341344

342345
pending_read_buffer: Vec<u8>,
@@ -389,17 +392,26 @@ impl Peer {
389392
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
390393
}
391394

392-
/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
393-
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394-
/// been drained.
395+
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
396+
/// outbound buffer. This is checked every time the peer's buffer may have been drained.
395397
fn should_buffer_gossip_backfill(&self) -> bool {
396-
self.pending_outbound_buffer.is_empty() &&
397-
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398+
self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
399+
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398400
}
399401

400-
/// Returns whether this peer's buffer is full and we should drop gossip messages.
402+
/// Determines if we should push additional gossip broadcast messages onto a peer's outbound
403+
/// buffer. This is checked every time the peer's buffer may have been drained.
404+
fn should_buffer_gossip_broadcast(&self) -> bool {
405+
self.pending_outbound_buffer.is_empty()
406+
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
407+
}
408+
409+
/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
401410
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
402-
self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
411+
let total_outbound_buffered =
412+
self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
413+
414+
total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
403415
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
404416
}
405417
}
@@ -668,6 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
668680

669681
pending_outbound_buffer: LinkedList::new(),
670682
pending_outbound_buffer_first_msg_offset: 0,
683+
gossip_broadcast_buffer: LinkedList::new(),
671684
awaiting_write_event: false,
672685

673686
pending_read_buffer,
@@ -714,6 +727,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
714727

715728
pending_outbound_buffer: LinkedList::new(),
716729
pending_outbound_buffer_first_msg_offset: 0,
730+
gossip_broadcast_buffer: LinkedList::new(),
717731
awaiting_write_event: false,
718732

719733
pending_read_buffer,
@@ -734,6 +748,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
734748

735749
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
736750
while !peer.awaiting_write_event {
751+
if peer.should_buffer_gossip_broadcast() {
752+
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
753+
peer.pending_outbound_buffer.push_back(msg);
754+
}
755+
}
737756
if peer.should_buffer_gossip_backfill() {
738757
match peer.sync_status {
739758
InitSyncTracker::NoSyncRequested => {},
@@ -848,12 +867,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
848867
}
849868
}
850869

851-
/// Append a message to a peer's pending outbound/write buffer
852-
fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
853-
peer.msgs_sent_since_pong += 1;
854-
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
855-
}
856-
857870
/// Append a message to a peer's pending outbound/write buffer
858871
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
859872
let mut buffer = VecWriter(Vec::with_capacity(2048));
@@ -864,7 +877,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
864877
} else {
865878
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
866879
}
867-
self.enqueue_encoded_message(peer, &buffer.0);
880+
peer.msgs_sent_since_pong += 1;
881+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..]));
882+
}
883+
884+
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
885+
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
886+
peer.msgs_sent_since_pong += 1;
887+
peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
868888
}
869889

870890
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -1333,7 +1353,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13331353
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13341354
continue;
13351355
}
1336-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1356+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13371357
}
13381358
},
13391359
wire::Message::NodeAnnouncement(ref msg) => {
@@ -1356,7 +1376,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13561376
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13571377
continue;
13581378
}
1359-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1379+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13601380
}
13611381
},
13621382
wire::Message::ChannelUpdate(ref msg) => {
@@ -1376,7 +1396,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13761396
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13771397
continue;
13781398
}
1379-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1399+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13801400
}
13811401
},
13821402
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),

0 commit comments

Comments
 (0)