Skip to content

Commit 356ec9b

Browse files
Add boilerplate for sending and receiving onion messages in PeerManager
Adds the boilerplate needed for PeerManager and OnionMessenger to work together, with some corresponding docs and misc updates mostly due to the PeerManager public API changing.
1 parent 12687d7 commit 356ec9b

File tree

9 files changed

+120
-43
lines changed

9 files changed

+120
-43
lines changed

fuzz/src/full_stack.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ type ChannelMan = ChannelManager<
166166
EnforcingSigner,
167167
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
168168
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
169-
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
169+
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
170170

171171
struct MoneyLossDetector<'a> {
172172
manager: Arc<ChannelMan>,
@@ -414,6 +414,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
414414
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
415415
chan_handler: channelmanager.clone(),
416416
route_handler: gossip_sync.clone(),
417+
onion_message_handler: IgnoringMessageHandler {},
417418
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
418419

419420
let mut should_forward = false;

lightning-background-processor/src/lib.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
1919
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
2020
use lightning::chain::keysinterface::{Sign, KeysInterface};
2121
use lightning::ln::channelmanager::ChannelManager;
22-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
22+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
2323
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2424
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
2525
use lightning::routing::scoring::WriteableScore;
@@ -281,6 +281,7 @@ impl BackgroundProcessor {
281281
P: 'static + Deref + Send + Sync,
282282
Descriptor: 'static + SocketDescriptor + Send + Sync,
283283
CMH: 'static + Deref + Send + Sync,
284+
OMH: 'static + Deref + Send + Sync,
284285
RMH: 'static + Deref + Send + Sync,
285286
EH: 'static + EventHandler + Send,
286287
PS: 'static + Deref + Send,
@@ -289,7 +290,7 @@ impl BackgroundProcessor {
289290
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
290291
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
291292
UMH: 'static + Deref + Send + Sync,
292-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
293+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
293294
S: 'static + Deref<Target = SC> + Send + Sync,
294295
SC: WriteableScore<'a>,
295296
>(
@@ -306,6 +307,7 @@ impl BackgroundProcessor {
306307
L::Target: 'static + Logger,
307308
P::Target: 'static + Persist<Signer>,
308309
CMH::Target: 'static + ChannelMessageHandler,
310+
OMH::Target: 'static + OnionMessageHandler,
309311
RMH::Target: 'static + RoutingMessageHandler,
310312
UMH::Target: 'static + CustomMessageHandler,
311313
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -544,7 +546,7 @@ mod tests {
544546
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
545547
p2p_gossip_sync: PGS,
546548
rapid_gossip_sync: RGS,
547-
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
549+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
548550
chain_monitor: Arc<ChainMonitor>,
549551
persister: Arc<FilesystemPersister>,
550552
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -663,7 +665,7 @@ mod tests {
663665
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
664666
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
665667
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
666-
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
668+
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
667669
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
668670
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
669671
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

lightning-net-tokio/src/lib.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
8383
use lightning::ln::peer_handler;
8484
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
8585
use lightning::ln::peer_handler::CustomMessageHandler;
86-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
86+
use lightning::ln::msgs::{ChannelMessageHandler, NetAddress, OnionMessageHandler, RoutingMessageHandler};
8787
use lightning::util::logger::Logger;
8888

8989
use std::ops::Deref;
@@ -123,13 +123,15 @@ struct Connection {
123123
id: u64,
124124
}
125125
impl Connection {
126-
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
126+
async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
127127
CMH: Deref + 'static + Send + Sync,
128128
RMH: Deref + 'static + Send + Sync,
129+
OMH: Deref + 'static + Send + Sync,
129130
L: Deref + 'static + Send + Sync,
130131
UMH: Deref + 'static + Send + Sync,
131132
CMH::Target: ChannelMessageHandler + Send + Sync,
132133
RMH::Target: RoutingMessageHandler + Send + Sync,
134+
OMH::Target: OnionMessageHandler + Send + Sync,
133135
L::Target: Logger + Send + Sync,
134136
UMH::Target: CustomMessageHandler + Send + Sync,
135137
{
@@ -141,13 +143,15 @@ impl Connection {
141143
}
142144
}
143145

144-
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
146+
async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
145147
CMH: Deref + 'static + Send + Sync,
146148
RMH: Deref + 'static + Send + Sync,
149+
OMH: Deref + 'static + Send + Sync,
147150
L: Deref + 'static + Send + Sync,
148151
UMH: Deref + 'static + Send + Sync,
149152
CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
150153
RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
154+
OMH::Target: OnionMessageHandler + 'static + Send + Sync,
151155
L::Target: Logger + 'static + Send + Sync,
152156
UMH::Target: CustomMessageHandler + 'static + Send + Sync,
153157
{
@@ -268,13 +272,15 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
268272
/// The returned future will complete when the peer is disconnected and associated handling
269273
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
270274
/// not need to poll the provided future in order to make progress.
271-
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
275+
pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
272276
CMH: Deref + 'static + Send + Sync,
273277
RMH: Deref + 'static + Send + Sync,
278+
OMH: Deref + 'static + Send + Sync,
274279
L: Deref + 'static + Send + Sync,
275280
UMH: Deref + 'static + Send + Sync,
276281
CMH::Target: ChannelMessageHandler + Send + Sync,
277282
RMH::Target: RoutingMessageHandler + Send + Sync,
283+
OMH::Target: OnionMessageHandler + Send + Sync,
278284
L::Target: Logger + Send + Sync,
279285
UMH::Target: CustomMessageHandler + Send + Sync,
280286
{
@@ -315,13 +321,15 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
315321
/// The returned future will complete when the peer is disconnected and associated handling
316322
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
317323
/// not need to poll the provided future in order to make progress.
318-
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
324+
pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
319325
CMH: Deref + 'static + Send + Sync,
320326
RMH: Deref + 'static + Send + Sync,
327+
OMH: Deref + 'static + Send + Sync,
321328
L: Deref + 'static + Send + Sync,
322329
UMH: Deref + 'static + Send + Sync,
323330
CMH::Target: ChannelMessageHandler + Send + Sync,
324331
RMH::Target: RoutingMessageHandler + Send + Sync,
332+
OMH::Target: OnionMessageHandler + Send + Sync,
325333
L::Target: Logger + Send + Sync,
326334
UMH::Target: CustomMessageHandler + Send + Sync,
327335
{
@@ -391,13 +399,15 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
391399
/// disconnected and associated handling futures are freed, though, because all processing in said
392400
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
393401
/// make progress.
394-
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
402+
pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
395403
CMH: Deref + 'static + Send + Sync,
396404
RMH: Deref + 'static + Send + Sync,
405+
OMH: Deref + 'static + Send + Sync,
397406
L: Deref + 'static + Send + Sync,
398407
UMH: Deref + 'static + Send + Sync,
399408
CMH::Target: ChannelMessageHandler + Send + Sync,
400409
RMH::Target: RoutingMessageHandler + Send + Sync,
410+
OMH::Target: OnionMessageHandler + Send + Sync,
401411
L::Target: Logger + Send + Sync,
402412
UMH::Target: CustomMessageHandler + Send + Sync,
403413
{
@@ -646,6 +656,7 @@ mod tests {
646656
let a_manager = Arc::new(PeerManager::new(MessageHandler {
647657
chan_handler: Arc::clone(&a_handler),
648658
route_handler: Arc::clone(&a_handler),
659+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
649660
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
650661

651662
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -660,6 +671,7 @@ mod tests {
660671
let b_manager = Arc::new(PeerManager::new(MessageHandler {
661672
chan_handler: Arc::clone(&b_handler),
662673
route_handler: Arc::clone(&b_handler),
674+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
663675
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
664676

665677
// We bind on localhost, hoping the environment is properly configured with a local
@@ -711,6 +723,7 @@ mod tests {
711723

712724
let a_manager = Arc::new(PeerManager::new(MessageHandler {
713725
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
726+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
714727
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
715728
}, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
716729

lightning/src/ln/msgs.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use core::fmt::Debug;
4040
use io::{self, Read};
4141
use io_extras::read_to_end;
4242

43-
use util::events::MessageSendEventsProvider;
43+
use util::events::{MessageSendEventsProvider, OnionMessageProvider};
4444
use util::logger;
4545
use util::ser::{BigSize, LengthReadable, Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname};
4646

@@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
945945
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
946946
}
947947

948+
/// A trait to describe an object that can receive onion messages.
949+
pub trait OnionMessageHandler : OnionMessageProvider {
950+
/// Handle an incoming onion_message message from the given peer.
951+
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
952+
}
953+
948954
mod fuzzy_internal_msgs {
949955
use prelude::*;
950956
use ln::{PaymentPreimage, PaymentSecret};

0 commit comments

Comments
 (0)