Skip to content

Commit f792de6

Browse files
Enable PeerManager to fetch events from OnionMessenger + boilerplate
The next commit will implement fetching and writing out events in PeerManager
1 parent 265f82a commit f792de6

File tree

10 files changed

+163
-59
lines changed

10 files changed

+163
-59
lines changed

fuzz/src/full_stack.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ type ChannelMan = ChannelManager<
164164
EnforcingSigner,
165165
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
166166
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
167-
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
167+
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
168168

169169
struct MoneyLossDetector<'a> {
170170
manager: Arc<ChannelMan>,
@@ -412,6 +412,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
412412
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
413413
chan_handler: channelmanager.clone(),
414414
route_handler: gossip_sync.clone(),
415+
onion_message_handler: IgnoringMessageHandler {},
415416
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
416417

417418
let mut should_forward = false;

lightning-background-processor/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
1616
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
1717
use lightning::chain::keysinterface::{Sign, KeysInterface};
1818
use lightning::ln::channelmanager::ChannelManager;
19-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
19+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
2020
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2121
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
2222
use lightning::routing::scoring::WriteableScore;
@@ -278,6 +278,7 @@ impl BackgroundProcessor {
278278
P: 'static + Deref + Send + Sync,
279279
Descriptor: 'static + SocketDescriptor + Send + Sync,
280280
CMH: 'static + Deref + Send + Sync,
281+
OMH: 'static + Deref + Send + Sync,
281282
RMH: 'static + Deref + Send + Sync,
282283
EH: 'static + EventHandler + Send,
283284
PS: 'static + Deref + Send,
@@ -286,7 +287,7 @@ impl BackgroundProcessor {
286287
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
287288
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
288289
UMH: 'static + Deref + Send + Sync,
289-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
290+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
290291
S: 'static + Deref<Target = SC> + Send + Sync,
291292
SC: WriteableScore<'a>,
292293
>(
@@ -303,6 +304,7 @@ impl BackgroundProcessor {
303304
L::Target: 'static + Logger,
304305
P::Target: 'static + Persist<Signer>,
305306
CMH::Target: 'static + ChannelMessageHandler,
307+
OMH::Target: 'static + OnionMessageHandler,
306308
RMH::Target: 'static + RoutingMessageHandler,
307309
UMH::Target: 'static + CustomMessageHandler,
308310
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -538,7 +540,7 @@ mod tests {
538540
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
539541
p2p_gossip_sync: PGS,
540542
rapid_gossip_sync: RGS,
541-
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
543+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
542544
chain_monitor: Arc<ChainMonitor>,
543545
persister: Arc<FilesystemPersister>,
544546
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -657,7 +659,7 @@ mod tests {
657659
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
658660
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
659661
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
660-
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
662+
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
661663
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
662664
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
663665
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

lightning-net-tokio/src/lib.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
8181
use lightning::ln::peer_handler;
8282
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
8383
use lightning::ln::peer_handler::CustomMessageHandler;
84-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
84+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, NetAddress, RoutingMessageHandler};
8585
use lightning::util::logger::Logger;
8686

8787
use std::ops::Deref;
@@ -121,13 +121,15 @@ struct Connection {
121121
id: u64,
122122
}
123123
impl Connection {
124-
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
124+
async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
125125
CMH: Deref + 'static + Send + Sync,
126126
RMH: Deref + 'static + Send + Sync,
127+
OMH: Deref + 'static + Send + Sync,
127128
L: Deref + 'static + Send + Sync,
128129
UMH: Deref + 'static + Send + Sync,
129130
CMH::Target: ChannelMessageHandler + Send + Sync,
130131
RMH::Target: RoutingMessageHandler + Send + Sync,
132+
OMH::Target: OnionMessageHandler + Send + Sync,
131133
L::Target: Logger + Send + Sync,
132134
UMH::Target: CustomMessageHandler + Send + Sync,
133135
{
@@ -139,13 +141,15 @@ impl Connection {
139141
}
140142
}
141143

142-
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
144+
async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
143145
CMH: Deref + 'static + Send + Sync,
144146
RMH: Deref + 'static + Send + Sync,
147+
OMH: Deref + 'static + Send + Sync,
145148
L: Deref + 'static + Send + Sync,
146149
UMH: Deref + 'static + Send + Sync,
147150
CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
148151
RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
152+
OMH::Target: OnionMessageHandler + 'static + Send + Sync,
149153
L::Target: Logger + 'static + Send + Sync,
150154
UMH::Target: CustomMessageHandler + 'static + Send + Sync,
151155
{
@@ -266,13 +270,15 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
266270
/// The returned future will complete when the peer is disconnected and associated handling
267271
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
268272
/// not need to poll the provided future in order to make progress.
269-
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
273+
pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
270274
CMH: Deref + 'static + Send + Sync,
271275
RMH: Deref + 'static + Send + Sync,
276+
OMH: Deref + 'static + Send + Sync,
272277
L: Deref + 'static + Send + Sync,
273278
UMH: Deref + 'static + Send + Sync,
274279
CMH::Target: ChannelMessageHandler + Send + Sync,
275280
RMH::Target: RoutingMessageHandler + Send + Sync,
281+
OMH::Target: OnionMessageHandler + Send + Sync,
276282
L::Target: Logger + Send + Sync,
277283
UMH::Target: CustomMessageHandler + Send + Sync,
278284
{
@@ -313,13 +319,15 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
313319
/// The returned future will complete when the peer is disconnected and associated handling
314320
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
315321
/// not need to poll the provided future in order to make progress.
316-
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
322+
pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
317323
CMH: Deref + 'static + Send + Sync,
318324
RMH: Deref + 'static + Send + Sync,
325+
OMH: Deref + 'static + Send + Sync,
319326
L: Deref + 'static + Send + Sync,
320327
UMH: Deref + 'static + Send + Sync,
321328
CMH::Target: ChannelMessageHandler + Send + Sync,
322329
RMH::Target: RoutingMessageHandler + Send + Sync,
330+
OMH::Target: OnionMessageHandler + Send + Sync,
323331
L::Target: Logger + Send + Sync,
324332
UMH::Target: CustomMessageHandler + Send + Sync,
325333
{
@@ -389,13 +397,15 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
389397
/// disconnected and associated handling futures are freed, though, because all processing in said
390398
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
391399
/// make progress.
392-
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
400+
pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
393401
CMH: Deref + 'static + Send + Sync,
394402
RMH: Deref + 'static + Send + Sync,
403+
OMH: Deref + 'static + Send + Sync,
395404
L: Deref + 'static + Send + Sync,
396405
UMH: Deref + 'static + Send + Sync,
397406
CMH::Target: ChannelMessageHandler + Send + Sync,
398407
RMH::Target: RoutingMessageHandler + Send + Sync,
408+
OMH::Target: OnionMessageHandler + Send + Sync,
399409
L::Target: Logger + Send + Sync,
400410
UMH::Target: CustomMessageHandler + Send + Sync,
401411
{
@@ -644,6 +654,7 @@ mod tests {
644654
let a_manager = Arc::new(PeerManager::new(MessageHandler {
645655
chan_handler: Arc::clone(&a_handler),
646656
route_handler: Arc::clone(&a_handler),
657+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
647658
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
648659

649660
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -658,6 +669,7 @@ mod tests {
658669
let b_manager = Arc::new(PeerManager::new(MessageHandler {
659670
chan_handler: Arc::clone(&b_handler),
660671
route_handler: Arc::clone(&b_handler),
672+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
661673
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
662674

663675
// We bind on localhost, hoping the environment is properly configured with a local
@@ -709,6 +721,7 @@ mod tests {
709721

710722
let a_manager = Arc::new(PeerManager::new(MessageHandler {
711723
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
724+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
712725
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
713726
}, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
714727

lightning/src/ln/channelmanager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6153,6 +6153,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
61536153
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
61546154
&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
61556155
&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
6156+
&events::MessageSendEvent::SendOnionMessage { .. } => false,
61566157
}
61576158
});
61586159
}

lightning/src/ln/msgs.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use core::fmt::Debug;
4040
use io::{self, Read};
4141
use io_extras::read_to_end;
4242

43-
use util::events::MessageSendEventsProvider;
43+
use util::events::{MessageSendEventsProvider, OnionMessageProvider};
4444
use util::logger;
4545
use util::ser::{LengthReadable, Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedVarInt, Hostname};
4646

@@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
945945
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
946946
}
947947

948+
/// A trait to describe an object that can receive onion messages.
949+
pub trait OnionMessageHandler : OnionMessageProvider {
950+
/// Handle an incoming onion_message message from the given peer.
951+
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
952+
}
953+
948954
mod fuzzy_internal_msgs {
949955
use prelude::*;
950956
use ln::{PaymentPreimage, PaymentSecret};

0 commit comments

Comments
 (0)