Skip to content

Commit f4b2311

Browse files
committed
Add a test of gossip message buffer limiting in PeerManager
This adds a simple test that the gossip message buffer in `PeerManager` is limited, including the new behavior of bypassing the limit when the broadcast comes from the `ChannelMessageHandler`.
1 parent 21904ff commit f4b2311

File tree

3 files changed

+109
-19
lines changed

3 files changed

+109
-19
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2729,20 +2729,21 @@ fn is_gossip_msg(type_id: u16) -> bool {
27292729

27302730
#[cfg(test)]
27312731
mod tests {
2732+
use super::*;
2733+
27322734
use crate::sign::{NodeSigner, Recipient};
27332735
use crate::events;
27342736
use crate::io;
27352737
use crate::ln::types::ChannelId;
27362738
use crate::ln::features::{InitFeatures, NodeFeatures};
27372739
use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
2738-
use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
27392740
use crate::ln::{msgs, wire};
27402741
use crate::ln::msgs::{Init, LightningError, SocketAddress};
27412742
use crate::util::test_utils;
27422743

27432744
use bitcoin::Network;
27442745
use bitcoin::constants::ChainHash;
2745-
use bitcoin::secp256k1::{PublicKey, SecretKey};
2746+
use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1};
27462747

27472748
use crate::sync::{Arc, Mutex};
27482749
use core::convert::Infallible;
@@ -3200,6 +3201,8 @@ mod tests {
32003201
let cfgs = create_peermgr_cfgs(2);
32013202
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
32023203
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
3204+
cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
3205+
cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
32033206
let peers = create_network(2, &cfgs);
32043207

32053208
// By calling establish_connect, we trigger do_attempt_write_data between
@@ -3363,6 +3366,79 @@ mod tests {
33633366
assert_eq!(peer_b.peers.read().unwrap().len(), 0);
33643367
}
33653368

3369+
#[test]
3370+
fn test_gossip_flood_pause() {
3371+
use crate::routing::test_utils::channel_announcement;
3372+
use lightning_types::features::ChannelFeatures;
3373+
3374+
// Simple test which connects two nodes to a PeerManager and checks that if we run out of
3375+
// socket buffer space we'll stop forwarding gossip but still push our own gossip.
3376+
let cfgs = create_peermgr_cfgs(2);
3377+
let peers = create_network(2, &cfgs);
3378+
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
3379+
3380+
macro_rules! drain_queues { () => {
3381+
loop {
3382+
peers[0].process_events();
3383+
peers[1].process_events();
3384+
3385+
let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
3386+
if !msg.is_empty() {
3387+
assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
3388+
continue;
3389+
}
3390+
let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
3391+
if !msg.is_empty() {
3392+
assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
3393+
continue;
3394+
}
3395+
break;
3396+
}
3397+
} }
3398+
3399+
// First, make sure all pending messages have been processed and queues drained.
3400+
drain_queues!();
3401+
3402+
let secp_ctx = Secp256k1::new();
3403+
let key = SecretKey::from_slice(&[1; 32]).unwrap();
3404+
let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
3405+
let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement {
3406+
msg,
3407+
update_msg: None,
3408+
};
3409+
3410+
fd_a.hang_writes.store(true, Ordering::Relaxed);
3411+
3412+
// Now push an arbitrarily large number of messages and check that only
3413+
// `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
3414+
for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
3415+
cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
3416+
peers[0].process_events();
3417+
}
3418+
3419+
assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
3420+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
3421+
3422+
// Check that if a broadcast message comes in from the channel handler (i.e. it is an
3423+
// announcement for our own channel), it gets queued anyway.
3424+
cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
3425+
peers[0].process_events();
3426+
assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
3427+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);
3428+
3429+
// Finally, deliver all the messages and make sure we got the right count. Note that there
3430+
// was an extra message that had already moved from the broadcast queue to the encrypted
3431+
// message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
3432+
fd_a.hang_writes.store(false, Ordering::Relaxed);
3433+
cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
3434+
peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
3435+
3436+
drain_queues!();
3437+
assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty());
3438+
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
3439+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2);
3440+
}
3441+
33663442
#[test]
33673443
fn test_filter_addresses(){
33683444
// Tests the filter_addresses function.

lightning/src/routing/test_utils.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,11 @@ use crate::sync::{self, Arc};
2727

2828
use crate::routing::gossip::NodeId;
2929

30-
// Using the same keys for LN and BTC ids
31-
pub(crate) fn add_channel(
32-
gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
33-
secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
34-
) {
35-
let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
36-
let node_id_1 = NodeId::from_pubkey(&node_1_pubkey);
30+
pub(crate) fn channel_announcement(
31+
node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures,
32+
short_channel_id: u64, secp_ctx: &Secp256k1<All>,
33+
) -> ChannelAnnouncement {
34+
let node_id_1 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_1_privkey));
3735
let node_id_2 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_2_privkey));
3836

3937
let unsigned_announcement = UnsignedChannelAnnouncement {
@@ -48,13 +46,23 @@ pub(crate) fn add_channel(
4846
};
4947

5048
let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
51-
let valid_announcement = ChannelAnnouncement {
49+
ChannelAnnouncement {
5250
node_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
5351
node_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
5452
bitcoin_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
5553
bitcoin_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
5654
contents: unsigned_announcement.clone(),
57-
};
55+
}
56+
}
57+
58+
// Using the same keys for LN and BTC ids
59+
pub(crate) fn add_channel(
60+
gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
61+
secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
62+
) {
63+
let valid_announcement =
64+
channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx);
65+
let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
5866
match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) {
5967
Ok(res) => assert!(res),
6068
_ => panic!()
@@ -108,7 +116,7 @@ pub(crate) fn update_channel(
108116

109117
pub(super) fn get_nodes(secp_ctx: &Secp256k1<All>) -> (SecretKey, PublicKey, Vec<SecretKey>, Vec<PublicKey>) {
110118
let privkeys: Vec<SecretKey> = (2..22).map(|i| {
111-
SecretKey::from_slice(&<Vec<u8>>::from_hex(&format!("{:02x}", i).repeat(32)).unwrap()[..]).unwrap()
119+
SecretKey::from_slice(&[i; 32]).unwrap()
112120
}).collect();
113121

114122
let pubkeys = privkeys.iter().map(|secret| PublicKey::from_secret_key(&secp_ctx, secret)).collect();

lightning/src/util/test_utils.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,7 @@ pub struct TestRoutingMessageHandler {
986986
pub chan_anns_recvd: AtomicUsize,
987987
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
988988
pub request_full_sync: AtomicBool,
989+
pub announcement_available_for_sync: AtomicBool,
989990
}
990991

991992
impl TestRoutingMessageHandler {
@@ -995,27 +996,32 @@ impl TestRoutingMessageHandler {
995996
chan_anns_recvd: AtomicUsize::new(0),
996997
pending_events: Mutex::new(vec![]),
997998
request_full_sync: AtomicBool::new(false),
999+
announcement_available_for_sync: AtomicBool::new(false),
9981000
}
9991001
}
10001002
}
10011003
impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
10021004
fn handle_node_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
1003-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1005+
Ok(true)
10041006
}
10051007
fn handle_channel_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
10061008
self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
1007-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1009+
Ok(true)
10081010
}
10091011
fn handle_channel_update(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
10101012
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
1011-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1013+
Ok(true)
10121014
}
10131015
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
1014-
let chan_upd_1 = get_dummy_channel_update(starting_point);
1015-
let chan_upd_2 = get_dummy_channel_update(starting_point);
1016-
let chan_ann = get_dummy_channel_announcement(starting_point);
1016+
if self.announcement_available_for_sync.load(Ordering::Acquire) {
1017+
let chan_upd_1 = get_dummy_channel_update(starting_point);
1018+
let chan_upd_2 = get_dummy_channel_update(starting_point);
1019+
let chan_ann = get_dummy_channel_announcement(starting_point);
10171020

1018-
Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
1021+
Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
1022+
} else {
1023+
None
1024+
}
10191025
}
10201026

10211027
fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<msgs::NodeAnnouncement> {

0 commit comments

Comments
 (0)