Skip to content

Commit 4894f4f

Browse files
Send and receive onion messages in PeerManager
Largely, this adds the boilerplate needed for PeerManager and OnionMessenger to work together on sending and receiving and replaces the stopgaps added in #1604.
1 parent 1755492 commit 4894f4f

File tree

10 files changed

+146
-77
lines changed

10 files changed

+146
-77
lines changed

fuzz/src/full_stack.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ type ChannelMan = ChannelManager<
164164
EnforcingSigner,
165165
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
166166
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
167-
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
167+
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
168168

169169
struct MoneyLossDetector<'a> {
170170
manager: Arc<ChannelMan>,
@@ -412,6 +412,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
412412
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
413413
chan_handler: channelmanager.clone(),
414414
route_handler: gossip_sync.clone(),
415+
onion_message_handler: IgnoringMessageHandler {},
415416
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
416417

417418
let mut should_forward = false;

lightning-background-processor/src/lib.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
1616
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
1717
use lightning::chain::keysinterface::{Sign, KeysInterface};
1818
use lightning::ln::channelmanager::ChannelManager;
19-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
19+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
2020
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2121
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
2222
use lightning::routing::scoring::WriteableScore;
@@ -278,6 +278,7 @@ impl BackgroundProcessor {
278278
P: 'static + Deref + Send + Sync,
279279
Descriptor: 'static + SocketDescriptor + Send + Sync,
280280
CMH: 'static + Deref + Send + Sync,
281+
OMH: 'static + Deref + Send + Sync,
281282
RMH: 'static + Deref + Send + Sync,
282283
EH: 'static + EventHandler + Send,
283284
PS: 'static + Deref + Send,
@@ -286,7 +287,7 @@ impl BackgroundProcessor {
286287
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
287288
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
288289
UMH: 'static + Deref + Send + Sync,
289-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
290+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
290291
S: 'static + Deref<Target = SC> + Send + Sync,
291292
SC: WriteableScore<'a>,
292293
>(
@@ -303,6 +304,7 @@ impl BackgroundProcessor {
303304
L::Target: 'static + Logger,
304305
P::Target: 'static + Persist<Signer>,
305306
CMH::Target: 'static + ChannelMessageHandler,
307+
OMH::Target: 'static + OnionMessageHandler,
306308
RMH::Target: 'static + RoutingMessageHandler,
307309
UMH::Target: 'static + CustomMessageHandler,
308310
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -538,7 +540,7 @@ mod tests {
538540
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
539541
p2p_gossip_sync: PGS,
540542
rapid_gossip_sync: RGS,
541-
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
543+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
542544
chain_monitor: Arc<ChainMonitor>,
543545
persister: Arc<FilesystemPersister>,
544546
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -657,7 +659,7 @@ mod tests {
657659
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
658660
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
659661
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
660-
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
662+
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
661663
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
662664
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
663665
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

lightning-net-tokio/src/lib.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
8181
use lightning::ln::peer_handler;
8282
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
8383
use lightning::ln::peer_handler::CustomMessageHandler;
84-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
84+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, NetAddress, RoutingMessageHandler};
8585
use lightning::util::logger::Logger;
8686

8787
use std::ops::Deref;
@@ -121,13 +121,15 @@ struct Connection {
121121
id: u64,
122122
}
123123
impl Connection {
124-
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
124+
async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
125125
CMH: Deref + 'static + Send + Sync,
126126
RMH: Deref + 'static + Send + Sync,
127+
OMH: Deref + 'static + Send + Sync,
127128
L: Deref + 'static + Send + Sync,
128129
UMH: Deref + 'static + Send + Sync,
129130
CMH::Target: ChannelMessageHandler + Send + Sync,
130131
RMH::Target: RoutingMessageHandler + Send + Sync,
132+
OMH::Target: OnionMessageHandler + Send + Sync,
131133
L::Target: Logger + Send + Sync,
132134
UMH::Target: CustomMessageHandler + Send + Sync,
133135
{
@@ -139,13 +141,15 @@ impl Connection {
139141
}
140142
}
141143

142-
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
144+
async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
143145
CMH: Deref + 'static + Send + Sync,
144146
RMH: Deref + 'static + Send + Sync,
147+
OMH: Deref + 'static + Send + Sync,
145148
L: Deref + 'static + Send + Sync,
146149
UMH: Deref + 'static + Send + Sync,
147150
CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
148151
RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
152+
OMH::Target: OnionMessageHandler + 'static + Send + Sync,
149153
L::Target: Logger + 'static + Send + Sync,
150154
UMH::Target: CustomMessageHandler + 'static + Send + Sync,
151155
{
@@ -266,13 +270,15 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
266270
/// The returned future will complete when the peer is disconnected and associated handling
267271
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
268272
/// not need to poll the provided future in order to make progress.
269-
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
273+
pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
270274
CMH: Deref + 'static + Send + Sync,
271275
RMH: Deref + 'static + Send + Sync,
276+
OMH: Deref + 'static + Send + Sync,
272277
L: Deref + 'static + Send + Sync,
273278
UMH: Deref + 'static + Send + Sync,
274279
CMH::Target: ChannelMessageHandler + Send + Sync,
275280
RMH::Target: RoutingMessageHandler + Send + Sync,
281+
OMH::Target: OnionMessageHandler + Send + Sync,
276282
L::Target: Logger + Send + Sync,
277283
UMH::Target: CustomMessageHandler + Send + Sync,
278284
{
@@ -313,13 +319,15 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
313319
/// The returned future will complete when the peer is disconnected and associated handling
314320
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
315321
/// not need to poll the provided future in order to make progress.
316-
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
322+
pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
317323
CMH: Deref + 'static + Send + Sync,
318324
RMH: Deref + 'static + Send + Sync,
325+
OMH: Deref + 'static + Send + Sync,
319326
L: Deref + 'static + Send + Sync,
320327
UMH: Deref + 'static + Send + Sync,
321328
CMH::Target: ChannelMessageHandler + Send + Sync,
322329
RMH::Target: RoutingMessageHandler + Send + Sync,
330+
OMH::Target: OnionMessageHandler + Send + Sync,
323331
L::Target: Logger + Send + Sync,
324332
UMH::Target: CustomMessageHandler + Send + Sync,
325333
{
@@ -389,13 +397,15 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
389397
/// disconnected and associated handling futures are freed, though, because all processing in said
390398
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
391399
/// make progress.
392-
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
400+
pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
393401
CMH: Deref + 'static + Send + Sync,
394402
RMH: Deref + 'static + Send + Sync,
403+
OMH: Deref + 'static + Send + Sync,
395404
L: Deref + 'static + Send + Sync,
396405
UMH: Deref + 'static + Send + Sync,
397406
CMH::Target: ChannelMessageHandler + Send + Sync,
398407
RMH::Target: RoutingMessageHandler + Send + Sync,
408+
OMH::Target: OnionMessageHandler + Send + Sync,
399409
L::Target: Logger + Send + Sync,
400410
UMH::Target: CustomMessageHandler + Send + Sync,
401411
{
@@ -644,6 +654,7 @@ mod tests {
644654
let a_manager = Arc::new(PeerManager::new(MessageHandler {
645655
chan_handler: Arc::clone(&a_handler),
646656
route_handler: Arc::clone(&a_handler),
657+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
647658
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
648659

649660
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -658,6 +669,7 @@ mod tests {
658669
let b_manager = Arc::new(PeerManager::new(MessageHandler {
659670
chan_handler: Arc::clone(&b_handler),
660671
route_handler: Arc::clone(&b_handler),
672+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
661673
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
662674

663675
// We bind on localhost, hoping the environment is properly configured with a local
@@ -709,6 +721,7 @@ mod tests {
709721

710722
let a_manager = Arc::new(PeerManager::new(MessageHandler {
711723
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
724+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
712725
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
713726
}, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
714727

lightning/src/ln/channelmanager.rs

+1
Original file line numberDiff line numberDiff line change
@@ -6153,6 +6153,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
61536153
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
61546154
&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
61556155
&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
6156+
&events::MessageSendEvent::SendOnionMessage { .. } => false,
61566157
}
61576158
});
61586159
}

lightning/src/ln/msgs.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use core::fmt::Debug;
4040
use io::{self, Read};
4141
use io_extras::read_to_end;
4242

43-
use util::events::MessageSendEventsProvider;
43+
use util::events::{MessageSendEventsProvider, OnionMessageProvider};
4444
use util::logger;
4545
use util::ser::{LengthReadable, Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedVarInt, Hostname};
4646

@@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
945945
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
946946
}
947947

948+
/// A trait to describe an object that can receive onion messages.
949+
pub trait OnionMessageHandler : OnionMessageProvider {
950+
/// Handle an incoming onion_message message from the given peer.
951+
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
952+
}
953+
948954
mod fuzzy_internal_msgs {
949955
use prelude::*;
950956
use ln::{PaymentPreimage, PaymentSecret};

0 commit comments

Comments
 (0)