Skip to content

Commit 2185b8b

Browse files
Add boilerplate for OnionMessenger to send/receive msgs on the wire
1 parent 1eda181 commit 2185b8b

File tree

9 files changed

+116
-35
lines changed

9 files changed

+116
-35
lines changed

fuzz/src/full_stack.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ type ChannelMan = ChannelManager<
163163
EnforcingSigner,
164164
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
165165
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
166-
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
166+
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
167167

168168
struct MoneyLossDetector<'a> {
169169
manager: Arc<ChannelMan>,
@@ -403,6 +403,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
403403
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
404404
chan_handler: channelmanager.clone(),
405405
route_handler: gossip_sync.clone(),
406+
onion_message_handler: IgnoringMessageHandler {},
406407
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
407408

408409
let mut should_forward = false;

lightning-background-processor/src/lib.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
1616
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
1717
use lightning::chain::keysinterface::{Sign, KeysInterface};
1818
use lightning::ln::channelmanager::ChannelManager;
19-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
19+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
2020
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2121
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
2222
use lightning::routing::scoring::WriteableScore;
@@ -229,6 +229,7 @@ impl BackgroundProcessor {
229229
P: 'static + Deref + Send + Sync,
230230
Descriptor: 'static + SocketDescriptor + Send + Sync,
231231
CMH: 'static + Deref + Send + Sync,
232+
OMH: 'static + Deref + Send + Sync,
232233
RMH: 'static + Deref + Send + Sync,
233234
EH: 'static + EventHandler + Send,
234235
PS: 'static + Deref + Send,
@@ -237,7 +238,7 @@ impl BackgroundProcessor {
237238
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
238239
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
239240
UMH: 'static + Deref + Send + Sync,
240-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
241+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
241242
S: 'static + Deref<Target = SC> + Send + Sync,
242243
SC: WriteableScore<'a>,
243244
>(
@@ -254,6 +255,7 @@ impl BackgroundProcessor {
254255
L::Target: 'static + Logger,
255256
P::Target: 'static + Persist<Signer>,
256257
CMH::Target: 'static + ChannelMessageHandler,
258+
OMH::Target: 'static + OnionMessageHandler,
257259
RMH::Target: 'static + RoutingMessageHandler,
258260
UMH::Target: 'static + CustomMessageHandler,
259261
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -489,7 +491,7 @@ mod tests {
489491
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
490492
p2p_gossip_sync: PGS,
491493
rapid_gossip_sync: RGS,
492-
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
494+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
493495
chain_monitor: Arc<ChainMonitor>,
494496
persister: Arc<FilesystemPersister>,
495497
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -608,7 +610,7 @@ mod tests {
608610
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
609611
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
610612
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
611-
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
613+
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
612614
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
613615
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
614616
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

lightning-net-tokio/src/lib.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
8181
use lightning::ln::peer_handler;
8282
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
8383
use lightning::ln::peer_handler::CustomMessageHandler;
84-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
84+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, NetAddress, RoutingMessageHandler};
8585
use lightning::util::logger::Logger;
8686

8787
use std::task;
@@ -120,9 +120,10 @@ struct Connection {
120120
id: u64,
121121
}
122122
impl Connection {
123-
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
123+
async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
124124
CMH: ChannelMessageHandler + 'static + Send + Sync,
125125
RMH: RoutingMessageHandler + 'static + Send + Sync,
126+
OMH: OnionMessageHandler + 'static + Send + Sync,
126127
L: Logger + 'static + ?Sized + Send + Sync,
127128
UMH: CustomMessageHandler + 'static + Send + Sync {
128129
loop {
@@ -133,9 +134,10 @@ impl Connection {
133134
}
134135
}
135136

136-
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
137+
async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
137138
CMH: ChannelMessageHandler + 'static + Send + Sync,
138139
RMH: RoutingMessageHandler + 'static + Send + Sync,
140+
OMH: OnionMessageHandler + 'static + Send + Sync,
139141
L: Logger + 'static + ?Sized + Send + Sync,
140142
UMH: CustomMessageHandler + 'static + Send + Sync {
141143
// Create a waker to wake up poll_event_process, above
@@ -255,9 +257,10 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
255257
/// The returned future will complete when the peer is disconnected and associated handling
256258
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
257259
/// not need to poll the provided future in order to make progress.
258-
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
260+
pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
259261
CMH: ChannelMessageHandler + 'static + Send + Sync,
260262
RMH: RoutingMessageHandler + 'static + Send + Sync,
263+
OMH: OnionMessageHandler + 'static + Send + Sync,
261264
L: Logger + 'static + ?Sized + Send + Sync,
262265
UMH: CustomMessageHandler + 'static + Send + Sync {
263266
let remote_addr = get_addr_from_stream(&stream);
@@ -297,9 +300,10 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
297300
/// The returned future will complete when the peer is disconnected and associated handling
298301
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
299302
/// not need to poll the provided future in order to make progress.
300-
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
303+
pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
301304
CMH: ChannelMessageHandler + 'static + Send + Sync,
302305
RMH: RoutingMessageHandler + 'static + Send + Sync,
306+
OMH: OnionMessageHandler + 'static + Send + Sync,
303307
L: Logger + 'static + ?Sized + Send + Sync,
304308
UMH: CustomMessageHandler + 'static + Send + Sync {
305309
let remote_addr = get_addr_from_stream(&stream);
@@ -368,9 +372,10 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
368372
/// disconnected and associated handling futures are freed, though, because all processing in said
369373
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
370374
/// make progress.
371-
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
375+
pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
372376
CMH: ChannelMessageHandler + 'static + Send + Sync,
373377
RMH: RoutingMessageHandler + 'static + Send + Sync,
378+
OMH: OnionMessageHandler + 'static + Send + Sync,
374379
L: Logger + 'static + ?Sized + Send + Sync,
375380
UMH: CustomMessageHandler + 'static + Send + Sync {
376381
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
@@ -618,6 +623,7 @@ mod tests {
618623
let a_manager = Arc::new(PeerManager::new(MessageHandler {
619624
chan_handler: Arc::clone(&a_handler),
620625
route_handler: Arc::clone(&a_handler),
626+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
621627
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
622628

623629
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -632,6 +638,7 @@ mod tests {
632638
let b_manager = Arc::new(PeerManager::new(MessageHandler {
633639
chan_handler: Arc::clone(&b_handler),
634640
route_handler: Arc::clone(&b_handler),
641+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
635642
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
636643

637644
// We bind on localhost, hoping the environment is properly configured with a local
@@ -683,6 +690,7 @@ mod tests {
683690

684691
let a_manager = Arc::new(PeerManager::new(MessageHandler {
685692
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
693+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
686694
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
687695
}, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
688696

lightning/src/ln/channelmanager.rs

+1
Original file line numberDiff line numberDiff line change
@@ -5997,6 +5997,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
59975997
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
59985998
&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
59995999
&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
6000+
&events::MessageSendEvent::SendOnionMessage { .. } => false,
60006001
}
60016002
});
60026003
}

lightning/src/ln/msgs.rs

+6
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
923923
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
924924
}
925925

926+
/// A trait to describe an object that can receive onion messages.
927+
pub trait OnionMessageHandler : MessageSendEventsProvider {
928+
/// Handle an incoming onion_message message from the given peer.
929+
fn handle_onion_message(&self, their_node_id: &PublicKey, msg: &OnionMessage);
930+
}
931+
926932
mod fuzzy_internal_msgs {
927933
use prelude::*;
928934
use ln::{PaymentPreimage, PaymentSecret};

0 commit comments

Comments
 (0)