Skip to content

Commit 5f4e243

Browse files
WIP: separate gossip broadcasts into their own queue in PeerManager
This allows us to better prioritize channel messages over gossip broadcasts and lays groundwork for rate limiting onion messages more simply.
1 parent 06d3daf commit 5f4e243

File tree

1 file changed

+47
-23
lines changed

1 file changed

+47
-23
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,8 @@ struct Peer {
337337

338338
pending_outbound_buffer: LinkedList<Vec<u8>>,
339339
pending_outbound_buffer_first_msg_offset: usize,
340+
gossip_broadcast_buffer: LinkedList<Vec<u8>>,
341+
gossip_broadcast_buffer_first_msg_offset: usize,
340342
awaiting_write_event: bool,
341343

342344
pending_read_buffer: Vec<u8>,
@@ -393,15 +395,18 @@ impl Peer {
393395
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394396
/// been drained.
395397
fn should_buffer_gossip_backfill(&self) -> bool {
396-
self.pending_outbound_buffer.is_empty() &&
397-
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398+
self.pending_outbound_buffer.is_empty() && !self.buffer_full_drop_gossip_broadcast() // TODO: are broadcasts higher priority than background sync? seems like maybe they should be
399+
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398400
}
399401

400-
/// Returns whether this peer's buffer is full and we should drop gossip messages.
402+
/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
401403
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
402-
if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
403-
|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
404-
return false
404+
let total_outbound_buffered =
405+
self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
406+
407+
if total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
408+
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
409+
{
405410
return true
406411
}
407412
false
@@ -672,6 +677,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
672677

673678
pending_outbound_buffer: LinkedList::new(),
674679
pending_outbound_buffer_first_msg_offset: 0,
680+
gossip_broadcast_buffer: LinkedList::new(),
681+
gossip_broadcast_buffer_first_msg_offset: 0,
675682
awaiting_write_event: false,
676683

677684
pending_read_buffer,
@@ -718,6 +725,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
718725

719726
pending_outbound_buffer: LinkedList::new(),
720727
pending_outbound_buffer_first_msg_offset: 0,
728+
gossip_broadcast_buffer: LinkedList::new(),
729+
gossip_broadcast_buffer_first_msg_offset: 0,
721730
awaiting_write_event: false,
722731

723732
pending_read_buffer,
@@ -780,20 +789,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
780789
self.maybe_send_extra_ping(peer);
781790
}
782791

783-
let next_buff = match peer.pending_outbound_buffer.front() {
784-
None => return,
785-
Some(buff) => buff,
786-
};
787-
788-
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
789-
let data_sent = descriptor.send_data(pending, peer.should_read());
790-
peer.pending_outbound_buffer_first_msg_offset += data_sent;
791-
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
792-
peer.pending_outbound_buffer_first_msg_offset = 0;
793-
peer.pending_outbound_buffer.pop_front();
794-
} else {
795-
peer.awaiting_write_event = true;
796-
}
792+
// TODO: break into methods in `peer` or somehow DRY
793+
// We prioritize channel messages over gossip broadcasts
794+
if let Some(next_buff) = peer.pending_outbound_buffer.front() {
795+
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
796+
let data_sent = descriptor.send_data(pending, peer.should_read());
797+
peer.pending_outbound_buffer_first_msg_offset += data_sent;
798+
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
799+
peer.pending_outbound_buffer_first_msg_offset = 0;
800+
peer.pending_outbound_buffer.pop_front();
801+
} else {
802+
peer.awaiting_write_event = true;
803+
}
804+
} else if let Some(next_buff) = peer.gossip_broadcast_buffer.front() {
805+
let pending = &next_buff[peer.gossip_broadcast_buffer_first_msg_offset..];
806+
let data_sent = descriptor.send_data(pending, peer.should_read());
807+
peer.gossip_broadcast_buffer_first_msg_offset += data_sent;
808+
if peer.gossip_broadcast_buffer_first_msg_offset == next_buff.len() {
809+
peer.gossip_broadcast_buffer_first_msg_offset = 0;
810+
peer.gossip_broadcast_buffer.pop_front();
811+
} else {
812+
peer.awaiting_write_event = true;
813+
}
814+
} else { return }
797815
}
798816
}
799817

@@ -858,6 +876,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
858876
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
859877
}
860878

879+
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
880+
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
881+
peer.msgs_sent_since_pong += 1;
882+
peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
883+
}
884+
861885
/// Append a message to a peer's pending outbound/write buffer
862886
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
863887
let mut buffer = VecWriter(Vec::with_capacity(2048));
@@ -1337,7 +1361,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13371361
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13381362
continue;
13391363
}
1340-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1364+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13411365
}
13421366
},
13431367
wire::Message::NodeAnnouncement(ref msg) => {
@@ -1360,7 +1384,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13601384
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13611385
continue;
13621386
}
1363-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1387+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13641388
}
13651389
},
13661390
wire::Message::ChannelUpdate(ref msg) => {
@@ -1380,7 +1404,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13801404
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13811405
continue;
13821406
}
1383-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1407+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13841408
}
13851409
},
13861410
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),

0 commit comments

Comments
 (0)