Skip to content

Commit e608514

Browse files
committed
Add gossip_queries methods to RoutingMessageHandler trait
Defines message handlers for gossip_queries messages in the RoutingMessageHandler trait. The MessageSendEventsProvider supertrait is added to RoutingMessageHandler so that the implementor can use SendMessageEvents to send messages to a peer at the appropriate time. The trait methods are stubbed in NetGraphMsgHandler which implements RoutingMessageHandler and return a "not implemented" error.
1 parent d08e2d3 commit e608514

File tree

5 files changed

+135
-9
lines changed

5 files changed

+135
-9
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,12 @@ mod tests {
536536
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
537537
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
538538
fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
539+
fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: bitcoin::BlockHash, _first_blocknum: u32, _block_range: u32) -> Result<(), LightningError> { Ok(()) }
540+
fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: bitcoin::BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), LightningError> { Ok(()) }
541+
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
542+
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
543+
fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
544+
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
539545
}
540546
impl ChannelMessageHandler for MsgHandler {
541547
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}

lightning/src/ln/msgs.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
804804
}
805805

806806
/// A trait to describe an object which can receive routing messages.
807-
pub trait RoutingMessageHandler : Send + Sync {
807+
pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider {
808808
/// Handle an incoming node_announcement message, returning true if it should be forwarded on,
809809
/// false or returning an Err otherwise.
810810
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError>;
@@ -827,6 +827,27 @@ pub trait RoutingMessageHandler : Send + Sync {
827827
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
828828
/// Returns whether a full sync should be requested from a peer.
829829
fn should_request_full_sync(&self, node_id: &PublicKey) -> bool;
830+
/// Queries a peer for a list of channels with a funding UTXO in the requested
831+
/// chain and range of blocks.
832+
fn query_channel_range(&self, their_node_id: &PublicKey, chain_hash: BlockHash, first_blocknum: u32, number_of_blocks: u32) -> Result<(), LightningError>;
833+
/// Queries a peer for routing gossip messages for a set of channels identified
834+
/// by their short_channel_ids.
835+
fn query_short_channel_ids(&self, their_node_id: &PublicKey, chain_hash: BlockHash, short_channel_ids: Vec<u64>) -> Result<(), LightningError>;
836+
/// Handles the reply of a query we initiated to learn about channels
837+
/// for a given range of blocks. We can expect to receive one or more
838+
/// replies to a single query.
839+
fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: &ReplyChannelRange) -> Result<(), LightningError>;
840+
/// Handles the reply of a query we initiated asking for routing gossip
841+
/// messages for a list of channels. We should receive this message when
842+
/// a node has completed its best effort to send us the pertaining routing
843+
/// gossip messages.
844+
fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError>;
845+
/// Handles when a peer asks us to send a list of short_channel_ids
846+
/// for the requested range of blocks.
847+
fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: &QueryChannelRange) -> Result<(), LightningError>;
848+
/// Handles when a peer asks us to send routing gossip messages for a
849+
/// list of short_channel_ids.
850+
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: &QueryShortChannelIds) -> Result<(), LightningError>;
830851
}
831852

832853
mod fuzzy_internal_msgs {

lightning/src/ln/peer_handler.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -841,17 +841,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
841841
// TODO: forward msg along to all our other peers!
842842
}
843843
},
844-
wire::Message::QueryShortChannelIds(_msg) => {
845-
// TODO: handle message
844+
wire::Message::QueryShortChannelIds(msg) => {
845+
self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), &msg)?;
846846
},
847-
wire::Message::ReplyShortChannelIdsEnd(_msg) => {
848-
// TODO: handle message
847+
wire::Message::ReplyShortChannelIdsEnd(msg) => {
848+
self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), &msg)?;
849849
},
850-
wire::Message::QueryChannelRange(_msg) => {
851-
// TODO: handle message
850+
wire::Message::QueryChannelRange(msg) => {
851+
self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), &msg)?;
852852
},
853-
wire::Message::ReplyChannelRange(_msg) => {
854-
// TODO: handle message
853+
wire::Message::ReplyChannelRange(msg) => {
854+
self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), &msg)?;
855855
},
856856
wire::Message::GossipTimestampFilter(_msg) => {
857857
// TODO: handle message
@@ -880,6 +880,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
880880
// drop optional-ish messages when send buffers get full!
881881

882882
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
883+
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
883884
let mut peers_lock = self.peers.lock().unwrap();
884885
let peers = &mut *peers_lock;
885886
for event in events_generated.drain(..) {

lightning/src/routing/network_graph.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,23 @@ use bitcoin::hashes::Hash;
1818
use bitcoin::blockdata::script::Builder;
1919
use bitcoin::blockdata::transaction::TxOut;
2020
use bitcoin::blockdata::opcodes;
21+
use bitcoin::hash_types::BlockHash;
2122

2223
use chain;
2324
use chain::Access;
2425
use ln::features::{ChannelFeatures, NodeFeatures};
2526
use ln::msgs::{DecodeError, ErrorAction, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT};
2627
use ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, OptionalField};
28+
use ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd};
2729
use ln::msgs;
2830
use util::ser::{Writeable, Readable, Writer};
2931
use util::logger::Logger;
32+
use util::events;
3033

3134
use std::{cmp, fmt};
3235
use std::sync::{RwLock, RwLockReadGuard};
3336
use std::sync::atomic::{AtomicUsize, Ordering};
37+
use std::sync::Mutex;
3438
use std::collections::BTreeMap;
3539
use std::collections::btree_map::Entry as BtreeEntry;
3640
use std::ops::Deref;
@@ -59,6 +63,7 @@ pub struct NetGraphMsgHandler<C: Deref, L: Deref> where C::Target: chain::Access
5963
pub network_graph: RwLock<NetworkGraph>,
6064
chain_access: Option<C>,
6165
full_syncs_requested: AtomicUsize,
66+
pending_events: Mutex<Vec<events::MessageSendEvent>>,
6267
logger: L,
6368
}
6469

@@ -77,6 +82,7 @@ impl<C: Deref, L: Deref> NetGraphMsgHandler<C, L> where C::Target: chain::Access
7782
}),
7883
full_syncs_requested: AtomicUsize::new(0),
7984
chain_access,
85+
pending_events: Mutex::new(vec![]),
8086
logger,
8187
}
8288
}
@@ -89,6 +95,7 @@ impl<C: Deref, L: Deref> NetGraphMsgHandler<C, L> where C::Target: chain::Access
8995
network_graph: RwLock::new(network_graph),
9096
full_syncs_requested: AtomicUsize::new(0),
9197
chain_access,
98+
pending_events: Mutex::new(vec![]),
9299
logger,
93100
}
94101
}
@@ -212,6 +219,67 @@ impl<C: Deref + Sync + Send, L: Deref + Sync + Send> RoutingMessageHandler for N
212219
false
213220
}
214221
}
222+
223+
fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), LightningError> {
224+
// TODO
225+
Err(LightningError {
226+
err: String::from("Not implemented"),
227+
action: ErrorAction::IgnoreError,
228+
})
229+
}
230+
231+
fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), LightningError> {
232+
// TODO
233+
Err(LightningError {
234+
err: String::from("Not implemented"),
235+
action: ErrorAction::IgnoreError,
236+
})
237+
}
238+
239+
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> {
240+
// TODO
241+
Err(LightningError {
242+
err: String::from("Not implemented"),
243+
action: ErrorAction::IgnoreError,
244+
})
245+
}
246+
247+
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
248+
// TODO
249+
Err(LightningError {
250+
err: String::from("Not implemented"),
251+
action: ErrorAction::IgnoreError,
252+
})
253+
}
254+
255+
fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> {
256+
// TODO
257+
Err(LightningError {
258+
err: String::from("Not implemented"),
259+
action: ErrorAction::IgnoreError,
260+
})
261+
}
262+
263+
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> {
264+
// TODO
265+
Err(LightningError {
266+
err: String::from("Not implemented"),
267+
action: ErrorAction::IgnoreError,
268+
})
269+
}
270+
}
271+
272+
impl<C: Deref, L: Deref> events::MessageSendEventsProvider for NetGraphMsgHandler<C, L>
273+
where
274+
C::Target: chain::Access,
275+
L::Target: Logger,
276+
{
277+
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
278+
let mut ret = Vec::new();
279+
let mut pending_events = self.pending_events.lock().unwrap();
280+
std::mem::swap(&mut ret, &mut pending_events);
281+
ret
282+
}
215283
}
216284

217285
#[derive(PartialEq, Debug)]

lightning/src/util/test_utils.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,36 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
319319
fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
320320
self.request_full_sync.load(Ordering::Acquire)
321321
}
322+
323+
fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), msgs::LightningError> {
324+
Ok(())
325+
}
326+
327+
fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), msgs::LightningError> {
328+
Ok(())
329+
}
330+
331+
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
332+
Ok(())
333+
}
334+
335+
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &msgs::ReplyShortChannelIdsEnd) -> Result<(), msgs::LightningError> {
336+
Ok(())
337+
}
338+
339+
fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &msgs::QueryChannelRange) -> Result<(), msgs::LightningError> {
340+
Ok(())
341+
}
342+
343+
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &msgs::QueryShortChannelIds) -> Result<(), msgs::LightningError> {
344+
Ok(())
345+
}
346+
}
347+
348+
impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
349+
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
350+
vec![]
351+
}
322352
}
323353

324354
pub struct TestLogger {

0 commit comments

Comments
 (0)