Initiate gossip_queries #736

Merged: 12 commits, Dec 15, 2020
Changes from all commits
5 changes: 3 additions & 2 deletions fuzz/src/full_stack.rs
@@ -19,6 +19,7 @@ use bitcoin::blockdata::script::{Builder, Script};
use bitcoin::blockdata::opcodes;
use bitcoin::consensus::encode::deserialize;
use bitcoin::network::constants::Network;
use bitcoin::blockdata::constants::genesis_block;

use bitcoin::hashes::Hash as TraitImport;
use bitcoin::hashes::HashEngine as TraitImportEngine;
@@ -343,7 +344,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0));
let our_id = PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret());
let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(None, Arc::clone(&logger)));
let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_block(Network::Bitcoin).header.block_hash(), None, Arc::clone(&logger)));

let peers = RefCell::new([false; 256]);
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
@@ -609,7 +610,7 @@ mod tests {
// What each byte represents is broken down below, and then everything is concatenated into
// one large test at the end (you want %s/ -.*//g %s/\n\| \|\t\|\///g).

// Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length +
// Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length +
// 16-byte MAC of the encrypted message length + encrypted Lightning message + 16-byte MAC
// of the Lightning message
// I.e 2nd inbound read, len 18 : 0006 (encrypted message length) + 03000000000000000000000000000000 (MAC of the encrypted message length)
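To make the framing arithmetic in that comment concrete, here is a small illustrative sketch (not part of the diff, and not library code) of how many bytes a peer reads for one BOLT 8 frame:

// Illustrative only: byte counts for one BOLT 8 frame carrying a Lightning
// message of `msg_len` plaintext bytes.
fn bolt8_frame_len(msg_len: usize) -> usize {
    // 2-byte encrypted message length + 16-byte MAC of that length header,
    // followed by the encrypted message itself + its 16-byte MAC.
    2 + 16 + msg_len + 16
}

fn main() {
    // The "2nd inbound read, len 18" above is exactly the length header:
    assert_eq!(2 + 16, 18);
    // A 6-byte message (the 0006 length above) then arrives as 6 + 16 bytes.
    assert_eq!(bolt8_frame_len(6), 18 + 22);
}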
4 changes: 3 additions & 1 deletion fuzz/src/router.rs
@@ -21,6 +21,8 @@ use lightning::util::ser::Readable;
use lightning::routing::network_graph::{NetworkGraph, RoutingFees};

use bitcoin::secp256k1::key::PublicKey;
use bitcoin::network::constants::Network;
use bitcoin::blockdata::constants::genesis_block;

use utils::test_logger;

@@ -155,7 +157,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new("".to_owned(), out));

let our_pubkey = get_pubkey!();
let mut net_graph = NetworkGraph::new();
let mut net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash());

let mut node_pks = HashSet::new();
let mut scid = 42;
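The fuzz-target change above reflects the new constructor signature: the network graph (and NetGraphMsgHandler) is now keyed to a specific chain via its genesis block hash, which gossip_queries messages carry as chain_hash. A minimal usage sketch mirroring the diff, assuming the same lightning and bitcoin crate versions this PR builds against:

use bitcoin::blockdata::constants::genesis_block;
use bitcoin::network::constants::Network;
use lightning::routing::network_graph::NetworkGraph;

fn main() {
    // Tie the graph to one chain; gossip for other chains can then be ignored.
    let genesis_hash = genesis_block(Network::Bitcoin).header.block_hash();
    let _net_graph = NetworkGraph::new(genesis_hash);
}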
6 changes: 5 additions & 1 deletion lightning-net-tokio/src/lib.rs
@@ -535,7 +535,11 @@ mod tests {
fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { }
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
}
impl ChannelMessageHandler for MsgHandler {
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}
2 changes: 2 additions & 0 deletions lightning/src/ln/channelmanager.rs
@@ -3404,6 +3404,8 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
&events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id,
&events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
}
});
}
36 changes: 33 additions & 3 deletions lightning/src/ln/features.rs
@@ -104,7 +104,7 @@ mod sealed {
],
optional_features: [
// Byte 0
DataLossProtect | InitialRoutingSync | UpfrontShutdownScript,
DataLossProtect | InitialRoutingSync | UpfrontShutdownScript | GossipQueries,
// Byte 1
VariableLengthOnion | PaymentSecret,
// Byte 2
@@ -122,7 +122,7 @@
],
optional_features: [
// Byte 0
DataLossProtect | UpfrontShutdownScript,
DataLossProtect | UpfrontShutdownScript | GossipQueries,
// Byte 1
VariableLengthOnion | PaymentSecret,
// Byte 2
@@ -243,6 +243,8 @@ mod sealed {
"Feature flags for `initial_routing_sync`.");
define_feature!(5, UpfrontShutdownScript, [InitContext, NodeContext],
"Feature flags for `option_upfront_shutdown_script`.");
define_feature!(7, GossipQueries, [InitContext, NodeContext],
"Feature flags for `gossip_queries`.");
define_feature!(9, VariableLengthOnion, [InitContext, NodeContext],
"Feature flags for `var_onion_optin`.");
define_feature!(13, StaticRemoteKey, [InitContext, NodeContext],
@@ -473,6 +475,22 @@ impl<T: sealed::UpfrontShutdownScript> Features<T> {
}
}


impl<T: sealed::GossipQueries> Features<T> {
#[cfg(test)]
pub(crate) fn requires_gossip_queries(&self) -> bool {
<T as sealed::GossipQueries>::requires_feature(&self.flags)
}
pub(crate) fn supports_gossip_queries(&self) -> bool {
<T as sealed::GossipQueries>::supports_feature(&self.flags)
}
#[cfg(test)]
pub(crate) fn clear_gossip_queries(mut self) -> Self {
<T as sealed::GossipQueries>::clear_bits(&mut self.flags);
self
}
}

impl<T: sealed::VariableLengthOnion> Features<T> {
#[cfg(test)]
pub(crate) fn requires_variable_length_onion(&self) -> bool {
@@ -497,6 +515,10 @@ impl<T: sealed::InitialRoutingSync> Features<T> {
pub(crate) fn initial_routing_sync(&self) -> bool {
<T as sealed::InitialRoutingSync>::supports_feature(&self.flags)
}
// We are no longer setting initial_routing_sync now that gossip_queries
// is enabled. This feature is ignored by a peer when gossip_queries has
// been negotiated.
#[cfg(test)]
pub(crate) fn clear_initial_routing_sync(&mut self) {
<T as sealed::InitialRoutingSync>::clear_bits(&mut self.flags)
}
@@ -568,6 +590,11 @@ mod tests {
assert!(!InitFeatures::known().requires_upfront_shutdown_script());
assert!(!NodeFeatures::known().requires_upfront_shutdown_script());

assert!(InitFeatures::known().supports_gossip_queries());
assert!(NodeFeatures::known().supports_gossip_queries());
assert!(!InitFeatures::known().requires_gossip_queries());
assert!(!NodeFeatures::known().requires_gossip_queries());

assert!(InitFeatures::known().supports_data_loss_protect());
assert!(NodeFeatures::known().supports_data_loss_protect());
assert!(!InitFeatures::known().requires_data_loss_protect());
@@ -620,9 +647,10 @@

#[test]
fn convert_to_context_with_relevant_flags() {
let init_features = InitFeatures::known().clear_upfront_shutdown_script();
let init_features = InitFeatures::known().clear_upfront_shutdown_script().clear_gossip_queries();
assert!(init_features.initial_routing_sync());
assert!(!init_features.supports_upfront_shutdown_script());
assert!(!init_features.supports_gossip_queries());

let node_features: NodeFeatures = init_features.to_context();
{
@@ -639,8 +667,10 @@
// Check that cleared flags are kept blank when converting back:
// - initial_routing_sync was not applicable to NodeContext
// - upfront_shutdown_script was cleared before converting
// - gossip_queries was cleared before converting
let features: InitFeatures = node_features.to_context_internal();
assert!(!features.initial_routing_sync());
assert!(!features.supports_upfront_shutdown_script());
assert!(!init_features.supports_gossip_queries());
}
}
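For reference, the new flag defined by define_feature!(7, GossipQueries, ...) lands at bit 7, the optional position, alongside the other byte-0 features. A standalone sketch of the bit arithmetic (illustrative only; InitFeatures::known() also sets bits in later feature bytes, and the constant names below are not library identifiers):

// BOLT 9 convention: even bit = required, odd bit = optional.
const DATA_LOSS_PROTECT_OPT: u8 = 1 << 1;        // bit 1
const INITIAL_ROUTING_SYNC: u8 = 1 << 3;         // bit 3 (optional only)
const UPFRONT_SHUTDOWN_SCRIPT_OPT: u8 = 1 << 5;  // bit 5
const GOSSIP_QUERIES_OPT: u8 = 1 << 7;           // bit 7, added by this PR

fn main() {
    // Byte 0 with all of the above optional features advertised:
    let byte0 = DATA_LOSS_PROTECT_OPT | INITIAL_ROUTING_SYNC
        | UPFRONT_SHUTDOWN_SCRIPT_OPT | GOSSIP_QUERIES_OPT;
    assert_eq!(byte0, 0b1010_1010);
}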
2 changes: 1 addition & 1 deletion lightning/src/ln/functional_test_utils.rs
@@ -1171,7 +1171,7 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
let payment_count = Rc::new(RefCell::new(0));

for i in 0..node_count {
let net_graph_msg_handler = NetGraphMsgHandler::new(None, cfgs[i].logger);
let net_graph_msg_handler = NetGraphMsgHandler::new(cfgs[i].chain_source.genesis_hash, None, cfgs[i].logger);
nodes.push(Node{ chain_source: cfgs[i].chain_source,
tx_broadcaster: cfgs[i].tx_broadcaster, chain_monitor: &cfgs[i].chain_monitor,
keys_manager: &cfgs[i].keys_manager, node: &chan_mgrs[i], net_graph_msg_handler,
29 changes: 26 additions & 3 deletions lightning/src/ln/msgs.rs
@@ -804,7 +804,13 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
}

/// A trait to describe an object which can receive routing messages.
pub trait RoutingMessageHandler : Send + Sync {
///
/// # Implementor DoS Warnings
///
/// For `gossip_queries` messages there are potential DoS vectors when handling
/// inbound queries. Implementors using an on-disk network graph should be aware of
/// repeated disk I/O for queries accessing different parts of the network graph.
pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider {
/// Handle an incoming node_announcement message, returning true if it should be forwarded on,
/// false or returning an Err otherwise.
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError>;
@@ -825,8 +831,25 @@ pub trait RoutingMessageHandler : Send + Sync {
/// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
/// If None is provided for starting_point, we start at the first node.
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
/// Returns whether a full sync should be requested from a peer.
fn should_request_full_sync(&self, node_id: &PublicKey) -> bool;
/// Called when a connection is established with a peer. This can be used to
/// perform routing table synchronization using a strategy defined by the
/// implementor.
fn sync_routing_table(&self, their_node_id: &PublicKey, init: &Init);

[Review comment, Collaborator] Since it's ultimately up to the implementer what they want to do here, it may make sense for the docs to be a little less prescriptive (i.e., after the first few nodes you may not want to always sync the routing table) and just describe that this is called on connection and that implementers should decide whether they want to sync the table.

/// Handles the reply of a query we initiated to learn about channels
/// for a given range of blocks. We can expect to receive one or more
/// replies to a single query.
fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError>;
/// Handles the reply of a query we initiated asking for routing gossip
/// messages for a list of channels. We should receive this message when
/// a node has completed its best effort to send us the pertaining routing
/// gossip messages.
fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError>;
/// Handles when a peer asks us to send a list of short_channel_ids
/// for the requested range of blocks.
fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError>;
/// Handles when a peer asks us to send routing gossip messages for a
/// list of short_channel_ids.
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
}

mod fuzzy_internal_msgs {
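RoutingMessageHandler now also extends events::MessageSendEventsProvider, so an implementation can queue outbound gossip queries (for example from sync_routing_table) and have PeerManager::process_events drain and send them, as the peer_handler changes below show. A simplified, self-contained model of that flow (the type and field names here are stand-ins, not the actual rust-lightning definitions, and the full-range query is just one plausible strategy):

use std::sync::Mutex;

// Stand-in types; the real ones live in lightning::ln::msgs and util::events.
struct QueryChannelRange { first_blocknum: u32, number_of_blocks: u32 }
enum MessageSendEvent {
    SendChannelRangeQuery { node_id: [u8; 33], msg: QueryChannelRange },
}

struct RoutingHandler { pending_events: Mutex<Vec<MessageSendEvent>> }

impl RoutingHandler {
    // Mirrors RoutingMessageHandler::sync_routing_table: called once the
    // peer's Init message has been processed. If the peer advertised
    // gossip_queries, queue a channel-range query instead of relying on
    // initial_routing_sync.
    fn sync_routing_table(&self, their_node_id: [u8; 33], supports_gossip_queries: bool) {
        if !supports_gossip_queries { return; }
        self.pending_events.lock().unwrap().push(MessageSendEvent::SendChannelRangeQuery {
            node_id: their_node_id,
            msg: QueryChannelRange { first_blocknum: 0, number_of_blocks: u32::MAX },
        });
    }

    // Mirrors MessageSendEventsProvider::get_and_clear_pending_msg_events,
    // which PeerManager::process_events now also calls on the route handler.
    fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
        std::mem::take(&mut *self.pending_events.lock().unwrap())
    }
}

fn main() {
    let handler = RoutingHandler { pending_events: Mutex::new(Vec::new()) };
    handler.sync_routing_table([2u8; 33], true);
    assert_eq!(handler.get_and_clear_pending_msg_events().len(), 1);
}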
87 changes: 32 additions & 55 deletions lightning/src/ln/peer_handler.rs
@@ -584,11 +584,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D

peer.their_node_id = Some(their_node_id);
insert_node_id!();
let mut features = InitFeatures::known();
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
features.clear_initial_routing_sync();
}

let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
},
@@ -694,10 +690,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
}

log_info!(
self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {}",
self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}",
if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
if msg.features.supports_gossip_queries() { "supported" } else { "not supported" },

[Review comment, Collaborator] I presume we should call sync_routing_table at the bottom of this block where we call peer_connected?

if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
if msg.features.supports_unknown_bits() { "present" } else { "none" }
);
@@ -712,15 +709,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
}

if !peer.outbound {
let mut features = InitFeatures::known();
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
features.clear_initial_routing_sync();
}

let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
}

self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg);

self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
peer.their_features = Some(msg.features);
},
@@ -840,6 +835,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
// TODO: forward msg along to all our other peers!
}
},
wire::Message::QueryShortChannelIds(msg) => {
self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::ReplyShortChannelIdsEnd(msg) => {
self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::QueryChannelRange(msg) => {
self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::ReplyChannelRange(msg) => {
self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
},
wire::Message::GossipTimestampFilter(_msg) => {
// TODO: handle message
},

// Unknown messages:
wire::Message::Unknown(msg_type) if msg_type.is_even() => {
@@ -864,6 +874,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
// drop optional-ish messages when send buffers get full!

let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
let mut peers_lock = self.peers.lock().unwrap();
let peers = &mut *peers_lock;
for event in events_generated.drain(..) {
@@ -1115,6 +1126,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
self.do_attempt_write_data(&mut descriptor, peer);
},
}
},
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.do_attempt_write_data(&mut descriptor, peer);
},
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
self.do_attempt_write_data(&mut descriptor, peer);
}
}
}
@@ -1302,13 +1323,6 @@ mod tests {
(fd_a.clone(), fd_b.clone())
}

fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
(fd_a.clone(), fd_b.clone())
}

#[test]
fn test_disconnect_peer() {
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -1377,41 +1391,4 @@ mod tests {
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
}

#[test]
fn limit_initial_routing_sync_requests() {
// Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
{
let cfgs = create_peermgr_cfgs(2);
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
let peers = create_network(2, &cfgs);
let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);

let peer_0 = peers[0].peers.lock().unwrap();
let peer_1 = peers[1].peers.lock().unwrap();

let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();

assert!(peer_0_features.unwrap().initial_routing_sync());
assert!(!peer_1_features.unwrap().initial_routing_sync());
}

// Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
{
let cfgs = create_peermgr_cfgs(2);
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
let peers = create_network(2, &cfgs);
let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);

let peer_0 = peers[0].peers.lock().unwrap();
let peer_1 = peers[1].peers.lock().unwrap();

let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();

assert!(!peer_0_features.unwrap().initial_routing_sync());
assert!(peer_1_features.unwrap().initial_routing_sync());
}
}
}