Skip to content

Commit 4036693

Browse files
Forward onion messages in PeerManager.
We want to prioritize channel messages over onion messages, so we only pull an onion message to forward if the peer's outbound buffer is empty enough.
1 parent cdca641 commit 4036693

File tree

10 files changed

+168
-47
lines changed

10 files changed

+168
-47
lines changed

fuzz/src/full_stack.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ type ChannelMan = ChannelManager<
163163
EnforcingSigner,
164164
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
165165
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
166-
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
166+
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
167167

168168
struct MoneyLossDetector<'a> {
169169
manager: Arc<ChannelMan>,
@@ -403,6 +403,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
403403
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
404404
chan_handler: channelmanager.clone(),
405405
route_handler: gossip_sync.clone(),
406+
onion_message_handler: IgnoringMessageHandler {},
406407
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
407408

408409
let mut should_forward = false;

lightning-background-processor/src/lib.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
1616
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
1717
use lightning::chain::keysinterface::{Sign, KeysInterface};
1818
use lightning::ln::channelmanager::ChannelManager;
19-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
19+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
2020
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
2121
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
2222
use lightning::routing::scoring::WriteableScore;
@@ -229,6 +229,7 @@ impl BackgroundProcessor {
229229
P: 'static + Deref + Send + Sync,
230230
Descriptor: 'static + SocketDescriptor + Send + Sync,
231231
CMH: 'static + Deref + Send + Sync,
232+
OMH: 'static + Deref + Send + Sync,
232233
RMH: 'static + Deref + Send + Sync,
233234
EH: 'static + EventHandler + Send,
234235
PS: 'static + Deref + Send,
@@ -237,7 +238,7 @@ impl BackgroundProcessor {
237238
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
238239
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
239240
UMH: 'static + Deref + Send + Sync,
240-
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
241+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
241242
S: 'static + Deref<Target = SC> + Send + Sync,
242243
SC: WriteableScore<'a>,
243244
>(
@@ -254,6 +255,7 @@ impl BackgroundProcessor {
254255
L::Target: 'static + Logger,
255256
P::Target: 'static + Persist<Signer>,
256257
CMH::Target: 'static + ChannelMessageHandler,
258+
OMH::Target: 'static + OnionMessageHandler,
257259
RMH::Target: 'static + RoutingMessageHandler,
258260
UMH::Target: 'static + CustomMessageHandler,
259261
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -489,7 +491,7 @@ mod tests {
489491
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
490492
p2p_gossip_sync: PGS,
491493
rapid_gossip_sync: RGS,
492-
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
494+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
493495
chain_monitor: Arc<ChainMonitor>,
494496
persister: Arc<FilesystemPersister>,
495497
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -608,7 +610,7 @@ mod tests {
608610
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
609611
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
610612
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
611-
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
613+
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
612614
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
613615
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
614616
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };

lightning-net-tokio/src/lib.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
8181
use lightning::ln::peer_handler;
8282
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
8383
use lightning::ln::peer_handler::CustomMessageHandler;
84-
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
84+
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, NetAddress, RoutingMessageHandler};
8585
use lightning::util::logger::Logger;
8686

8787
use std::task;
@@ -120,9 +120,10 @@ struct Connection {
120120
id: u64,
121121
}
122122
impl Connection {
123-
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
123+
async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
124124
CMH: ChannelMessageHandler + 'static + Send + Sync,
125125
RMH: RoutingMessageHandler + 'static + Send + Sync,
126+
OMH: OnionMessageHandler + 'static + Send + Sync,
126127
L: Logger + 'static + ?Sized + Send + Sync,
127128
UMH: CustomMessageHandler + 'static + Send + Sync {
128129
loop {
@@ -133,9 +134,10 @@ impl Connection {
133134
}
134135
}
135136

136-
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
137+
async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
137138
CMH: ChannelMessageHandler + 'static + Send + Sync,
138139
RMH: RoutingMessageHandler + 'static + Send + Sync,
140+
OMH: OnionMessageHandler + 'static + Send + Sync,
139141
L: Logger + 'static + ?Sized + Send + Sync,
140142
UMH: CustomMessageHandler + 'static + Send + Sync {
141143
// Create a waker to wake up poll_event_process, above
@@ -255,9 +257,10 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
255257
/// The returned future will complete when the peer is disconnected and associated handling
256258
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
257259
/// not need to poll the provided future in order to make progress.
258-
pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
260+
pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
259261
CMH: ChannelMessageHandler + 'static + Send + Sync,
260262
RMH: RoutingMessageHandler + 'static + Send + Sync,
263+
OMH: OnionMessageHandler + 'static + Send + Sync,
261264
L: Logger + 'static + ?Sized + Send + Sync,
262265
UMH: CustomMessageHandler + 'static + Send + Sync {
263266
let remote_addr = get_addr_from_stream(&stream);
@@ -297,9 +300,10 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
297300
/// The returned future will complete when the peer is disconnected and associated handling
298301
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
299302
/// not need to poll the provided future in order to make progress.
300-
pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
303+
pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
301304
CMH: ChannelMessageHandler + 'static + Send + Sync,
302305
RMH: RoutingMessageHandler + 'static + Send + Sync,
306+
OMH: OnionMessageHandler + 'static + Send + Sync,
303307
L: Logger + 'static + ?Sized + Send + Sync,
304308
UMH: CustomMessageHandler + 'static + Send + Sync {
305309
let remote_addr = get_addr_from_stream(&stream);
@@ -368,9 +372,10 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
368372
/// disconnected and associated handling futures are freed, though, because all processing in said
369373
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
370374
/// make progress.
371-
pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
375+
pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<OMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
372376
CMH: ChannelMessageHandler + 'static + Send + Sync,
373377
RMH: RoutingMessageHandler + 'static + Send + Sync,
378+
OMH: OnionMessageHandler + 'static + Send + Sync,
374379
L: Logger + 'static + ?Sized + Send + Sync,
375380
UMH: CustomMessageHandler + 'static + Send + Sync {
376381
if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
@@ -618,6 +623,7 @@ mod tests {
618623
let a_manager = Arc::new(PeerManager::new(MessageHandler {
619624
chan_handler: Arc::clone(&a_handler),
620625
route_handler: Arc::clone(&a_handler),
626+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
621627
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
622628

623629
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -632,6 +638,7 @@ mod tests {
632638
let b_manager = Arc::new(PeerManager::new(MessageHandler {
633639
chan_handler: Arc::clone(&b_handler),
634640
route_handler: Arc::clone(&b_handler),
641+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
635642
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
636643

637644
// We bind on localhost, hoping the environment is properly configured with a local
@@ -683,6 +690,7 @@ mod tests {
683690

684691
let a_manager = Arc::new(PeerManager::new(MessageHandler {
685692
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
693+
onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
686694
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
687695
}, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
688696

lightning/src/ln/channelmanager.rs

+1
Original file line numberDiff line numberDiff line change
@@ -6005,6 +6005,7 @@ impl<Signer: Sign, M: Deref , T: Deref , K: Deref , F: Deref , L: Deref >
60056005
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
60066006
&events::MessageSendEvent::SendReplyChannelRange { .. } => false,
60076007
&events::MessageSendEvent::SendGossipTimestampFilter { .. } => false,
6008+
&events::MessageSendEvent::SendOnionMessage { .. } => false,
60086009
}
60096010
});
60106011
}

lightning/src/ln/msgs.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use core::fmt::Debug;
3939
use io::{self, Read};
4040
use io_extras::read_to_end;
4141

42-
use util::events::MessageSendEventsProvider;
42+
use util::events::{MessageSendEventsProvider, OnionMessageProvider};
4343
use util::logger;
4444
use util::ser::{LengthReadable, Readable, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedVarInt, Hostname};
4545

@@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
945945
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
946946
}
947947

948+
/// A trait to describe an object that can receive onion messages.
949+
pub trait OnionMessageHandler : OnionMessageProvider {
950+
/// Handle an incoming onion_message message from the given peer.
951+
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
952+
}
953+
948954
mod fuzzy_internal_msgs {
949955
use prelude::*;
950956
use ln::{PaymentPreimage, PaymentSecret};

0 commit comments

Comments
 (0)