Skip to content

Commit 3ee065b

Browse files
committed
Backfill gossip without buffering directly in LDK
Instead of backfilling gossip by buffering (up to) ten messages at a time, only buffer one message at a time, as the peer's outbound socket buffer drains. This moves the outbound backfill messages out of `PeerHandler` and into the operating system buffer, where they arguably belong. Not buffering causes us to walk the gossip B-Trees somewhat more often, but avoids allocating vecs for the responses. While it's probably (without having benchmarked it) a net performance loss, it simplifies buffer tracking and leaves us with more room to play with the buffer sizing constants as we add onion message forwarding, which is an important win. Note that because we change how often we check if we're out of messages to send before pinging, we slightly change how many messages are exchanged at once, impacting the `test_do_attempt_write_data` constants.
1 parent 7c2b3ed commit 3ee065b

File tree

6 files changed

+84
-109
lines changed

6 files changed

+84
-109
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -562,8 +562,8 @@ mod tests {
562562
fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
563563
fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
564564
fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
565-
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
566-
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
565+
fn get_next_channel_announcements(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { None }
566+
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> { None }
567567
fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
568568
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
569569
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }

lightning/src/ln/functional_test_utils.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,20 +318,20 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
318318
);
319319
let mut chan_progress = 0;
320320
loop {
321-
let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress, 255);
322-
let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress, 255);
321+
let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress);
322+
let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress);
323323
assert!(orig_announcements == deserialized_announcements);
324-
chan_progress = match orig_announcements.last() {
324+
chan_progress = match orig_announcements {
325325
Some(announcement) => announcement.0.contents.short_channel_id + 1,
326326
None => break,
327327
};
328328
}
329329
let mut node_progress = None;
330330
loop {
331-
let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
332-
let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
331+
let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref());
332+
let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref());
333333
assert!(orig_announcements == deserialized_announcements);
334-
node_progress = match orig_announcements.last() {
334+
node_progress = match orig_announcements {
335335
Some(announcement) => Some(announcement.contents.node_id),
336336
None => break,
337337
};

lightning/src/ln/msgs.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -915,15 +915,15 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
915915
/// Handle an incoming channel_update message, returning true if it should be forwarded on,
916916
/// false or returning an Err otherwise.
917917
fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError>;
918-
/// Gets a subset of the channel announcements and updates required to dump our routing table
919-
/// to a remote node, starting at the short_channel_id indicated by starting_point and
920-
/// including the batch_amount entries immediately higher in numerical value than starting_point.
921-
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
922-
/// Gets a subset of the node announcements required to dump our routing table to a remote node,
923-
/// starting at the node *after* the provided publickey and including batch_amount entries
924-
/// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
918+
/// Gets channel announcements and updates required to dump our routing table to a remote node,
919+
/// starting at the short_channel_id indicated by starting_point and including announcements
920+
/// for a single channel.
921+
fn get_next_channel_announcements(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
922+
/// Gets a node announcement required to dump our routing table to a remote node, starting at
923+
/// the node *after* the provided publickey and including up to one announcement immediately
924+
/// higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
925925
/// If None is provided for starting_point, we start at the first node.
926-
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
926+
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement>;
927927
/// Called when a connection is established with a peer. This can be used to
928928
/// perform routing table synchronization using a strategy defined by the
929929
/// implementor.

lightning/src/ln/peer_handler.rs

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
6767
fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
6868
fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
6969
fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
70-
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
71-
Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
72-
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
70+
fn get_next_channel_announcements(&self, _starting_point: u64) ->
71+
Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
72+
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
7373
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
7474
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
7575
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
@@ -383,19 +383,17 @@ impl Peer {
383383
}
384384
}
385385

386-
/// Returns the number of gossip messages we can fit in this peer's buffer.
387-
fn gossip_buffer_slots_available(&self) -> usize {
388-
OUTBOUND_BUFFER_LIMIT_READ_PAUSE.saturating_sub(self.pending_outbound_buffer.len())
389-
}
390-
391386
/// Returns whether we should be reading bytes from this peer, based on whether its outbound
392387
/// buffer still has space and we don't need to pause reads to get some writes out.
393388
fn should_read(&self) -> bool {
394389
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
395390
}
396391

397-
fn should_backfill_gossip(&self) -> bool {
398-
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
392+
/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
393+
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394+
/// been drained.
395+
fn should_buffer_gossip_backfill(&self) -> bool {
396+
self.pending_outbound_buffer.is_empty() &&
399397
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
400398
}
401399

@@ -739,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
739737

740738
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
741739
while !peer.awaiting_write_event {
742-
if peer.should_backfill_gossip() {
740+
if peer.should_buffer_gossip_backfill() {
743741
match peer.sync_status {
744742
InitSyncTracker::NoSyncRequested => {},
745743
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
746-
let steps = ((peer.gossip_buffer_slots_available() + 2) / 3) as u8;
747-
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
748-
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
749-
self.enqueue_message(peer, announce);
750-
if let &Some(ref update_a) = update_a_option {
751-
self.enqueue_message(peer, update_a);
744+
if let Some((announce, update_a_option, update_b_option)) =
745+
self.message_handler.route_handler.get_next_channel_announcements(c)
746+
{
747+
self.enqueue_message(peer, &announce);
748+
if let Some(update_a) = update_a_option {
749+
self.enqueue_message(peer, &update_a);
752750
}
753-
if let &Some(ref update_b) = update_b_option {
754-
self.enqueue_message(peer, update_b);
751+
if let Some(update_b) = update_b_option {
752+
self.enqueue_message(peer, &update_b);
755753
}
756754
peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
757-
}
758-
if all_messages.is_empty() || all_messages.len() != steps as usize {
755+
} else {
759756
peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
760757
}
761758
},
762759
InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
763-
let steps = peer.gossip_buffer_slots_available() as u8;
764-
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
765-
for msg in all_messages.iter() {
766-
self.enqueue_message(peer, msg);
760+
if let Some(msg) = self.message_handler.route_handler.get_next_node_announcements(None) {
761+
self.enqueue_message(peer, &msg);
767762
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
768-
}
769-
if all_messages.is_empty() || all_messages.len() != steps as usize {
763+
} else {
770764
peer.sync_status = InitSyncTracker::NoSyncRequested;
771765
}
772766
},
773767
InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
774768
InitSyncTracker::NodesSyncing(key) => {
775-
let steps = peer.gossip_buffer_slots_available() as u8;
776-
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
777-
for msg in all_messages.iter() {
778-
self.enqueue_message(peer, msg);
769+
if let Some(msg) = self.message_handler.route_handler.get_next_node_announcements(Some(&key)) {
770+
self.enqueue_message(peer, &msg);
779771
peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
780-
}
781-
if all_messages.is_empty() || all_messages.len() != steps as usize {
772+
} else {
782773
peer.sync_status = InitSyncTracker::NoSyncRequested;
783774
}
784775
},
@@ -2082,10 +2073,10 @@ mod tests {
20822073

20832074
// Check that each peer has received the expected number of channel updates and channel
20842075
// announcements.
2085-
assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
2086-
assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
2087-
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
2088-
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
2076+
assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
2077+
assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
2078+
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
2079+
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
20892080
}
20902081

20912082
#[test]

lightning/src/routing/gossip.rs

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -318,11 +318,10 @@ where C::Target: chain::Access, L::Target: Logger
318318
Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
319319
}
320320

321-
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
322-
let mut result = Vec::with_capacity(batch_amount as usize);
321+
fn get_next_channel_announcements(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
323322
let channels = self.network_graph.channels.read().unwrap();
324323
let mut iter = channels.range(starting_point..);
325-
while result.len() < batch_amount as usize {
324+
loop {
326325
if let Some((_, ref chan)) = iter.next() {
327326
if chan.announcement_message.is_some() {
328327
let chan_announcement = chan.announcement_message.clone().unwrap();
@@ -334,20 +333,18 @@ where C::Target: chain::Access, L::Target: Logger
334333
if let Some(two_to_one) = chan.two_to_one.as_ref() {
335334
two_to_one_announcement = two_to_one.last_update_message.clone();
336335
}
337-
result.push((chan_announcement, one_to_two_announcement, two_to_one_announcement));
336+
return Some((chan_announcement, one_to_two_announcement, two_to_one_announcement));
338337
} else {
339338
// TODO: We may end up sending un-announced channel_updates if we are sending
340339
// initial sync data while receiving announce/updates for this channel.
341340
}
342341
} else {
343-
return result;
342+
return None;
344343
}
345344
}
346-
result
347345
}
348346

349-
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
350-
let mut result = Vec::with_capacity(batch_amount as usize);
347+
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
351348
let nodes = self.network_graph.nodes.read().unwrap();
352349
let mut iter = if let Some(pubkey) = starting_point {
353350
let mut iter = nodes.range(NodeId::from_pubkey(pubkey)..);
@@ -356,18 +353,17 @@ where C::Target: chain::Access, L::Target: Logger
356353
} else {
357354
nodes.range::<NodeId, _>(..)
358355
};
359-
while result.len() < batch_amount as usize {
356+
loop {
360357
if let Some((_, ref node)) = iter.next() {
361358
if let Some(node_info) = node.announcement_info.as_ref() {
362-
if node_info.announcement_message.is_some() {
363-
result.push(node_info.announcement_message.clone().unwrap());
359+
if let Some(msg) = node_info.announcement_message.clone() {
360+
return Some(msg);
364361
}
365362
}
366363
} else {
367-
return result;
364+
return None;
368365
}
369366
}
370-
result
371367
}
372368

373369
/// Initiates a stateless sync of routing gossip information with a peer
@@ -2412,8 +2408,8 @@ mod tests {
24122408
let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
24132409

24142410
// Channels were not announced yet.
2415-
let channels_with_announcements = gossip_sync.get_next_channel_announcements(0, 1);
2416-
assert_eq!(channels_with_announcements.len(), 0);
2411+
let channels_with_announcements = gossip_sync.get_next_channel_announcements(0);
2412+
assert!(channels_with_announcements.is_none());
24172413

24182414
let short_channel_id;
24192415
{
@@ -2427,17 +2423,15 @@ mod tests {
24272423
}
24282424

24292425
// Contains initial channel announcement now.
2430-
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
2431-
assert_eq!(channels_with_announcements.len(), 1);
2432-
if let Some(channel_announcements) = channels_with_announcements.first() {
2433-
let &(_, ref update_1, ref update_2) = channel_announcements;
2426+
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id);
2427+
if let Some(channel_announcements) = channels_with_announcements {
2428+
let (_, ref update_1, ref update_2) = channel_announcements;
24342429
assert_eq!(update_1, &None);
24352430
assert_eq!(update_2, &None);
24362431
} else {
24372432
panic!();
24382433
}
24392434

2440-
24412435
{
24422436
// Valid channel update
24432437
let valid_channel_update = get_signed_channel_update(|unsigned_channel_update| {
@@ -2450,10 +2444,9 @@ mod tests {
24502444
}
24512445

24522446
// Now contains an initial announcement and an update.
2453-
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
2454-
assert_eq!(channels_with_announcements.len(), 1);
2455-
if let Some(channel_announcements) = channels_with_announcements.first() {
2456-
let &(_, ref update_1, ref update_2) = channel_announcements;
2447+
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id);
2448+
if let Some(channel_announcements) = channels_with_announcements {
2449+
let (_, ref update_1, ref update_2) = channel_announcements;
24572450
assert_ne!(update_1, &None);
24582451
assert_eq!(update_2, &None);
24592452
} else {
@@ -2473,19 +2466,18 @@ mod tests {
24732466
}
24742467

24752468
// Test that announcements with excess data won't be returned
2476-
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
2477-
assert_eq!(channels_with_announcements.len(), 1);
2478-
if let Some(channel_announcements) = channels_with_announcements.first() {
2479-
let &(_, ref update_1, ref update_2) = channel_announcements;
2469+
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id);
2470+
if let Some(channel_announcements) = channels_with_announcements {
2471+
let (_, ref update_1, ref update_2) = channel_announcements;
24802472
assert_eq!(update_1, &None);
24812473
assert_eq!(update_2, &None);
24822474
} else {
24832475
panic!();
24842476
}
24852477

24862478
// Further starting point have no channels after it
2487-
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000, 1);
2488-
assert_eq!(channels_with_announcements.len(), 0);
2479+
let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000);
2480+
assert!(channels_with_announcements.is_none());
24892481
}
24902482

24912483
#[test]
@@ -2497,8 +2489,8 @@ mod tests {
24972489
let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
24982490

24992491
// No nodes yet.
2500-
let next_announcements = gossip_sync.get_next_node_announcements(None, 10);
2501-
assert_eq!(next_announcements.len(), 0);
2492+
let next_announcements = gossip_sync.get_next_node_announcements(None);
2493+
assert!(next_announcements.is_none());
25022494

25032495
{
25042496
// Announce a channel to add 2 nodes
@@ -2509,10 +2501,9 @@ mod tests {
25092501
};
25102502
}
25112503

2512-
25132504
// Nodes were never announced
2514-
let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
2515-
assert_eq!(next_announcements.len(), 0);
2505+
let next_announcements = gossip_sync.get_next_node_announcements(None);
2506+
assert!(next_announcements.is_none());
25162507

25172508
{
25182509
let valid_announcement = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
@@ -2528,12 +2519,12 @@ mod tests {
25282519
};
25292520
}
25302521

2531-
let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
2532-
assert_eq!(next_announcements.len(), 2);
2522+
let next_announcements = gossip_sync.get_next_node_announcements(None);
2523+
assert!(next_announcements.is_some());
25332524

25342525
// Skip the first node.
2535-
let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
2536-
assert_eq!(next_announcements.len(), 1);
2526+
let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1));
2527+
assert!(next_announcements.is_some());
25372528

25382529
{
25392530
// Later announcement which should not be relayed (excess data) prevent us from sharing a node
@@ -2547,8 +2538,8 @@ mod tests {
25472538
};
25482539
}
25492540

2550-
let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
2551-
assert_eq!(next_announcements.len(), 0);
2541+
let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1));
2542+
assert!(next_announcements.is_none());
25522543
}
25532544

25542545
#[test]

0 commit comments

Comments
 (0)