Skip to content

Commit c650a25

Browse files
committed
[net-tokio] Call PeerManager::process_events without blocking reads
Unlike very ancient versions of lightning-net-tokio, this does not rely on a single global process_events future, but instead has one per connection. This could still cause significant contention, so we'll ensure only two process_events calls can exist at once in the next few commits.
1 parent d2b32af commit c650a25

File tree

1 file changed

+22
-5
lines changed

1 file changed

+22
-5
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,11 +118,28 @@ struct Connection {
118118
id: u64,
119119
}
120120
impl Connection {
121+
async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
122+
CMH: ChannelMessageHandler + 'static + Send + Sync,
123+
RMH: RoutingMessageHandler + 'static + Send + Sync,
124+
L: Logger + 'static + ?Sized + Send + Sync,
125+
UMH: CustomMessageHandler + 'static + Send + Sync {
126+
loop {
127+
if event_receiver.recv().await.is_none() {
128+
return;
129+
}
130+
peer_manager.process_events();
131+
}
132+
}
133+
121134
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
122-
CMH: ChannelMessageHandler + 'static,
123-
RMH: RoutingMessageHandler + 'static,
124-
L: Logger + 'static + ?Sized,
125-
UMH: CustomMessageHandler + 'static {
135+
CMH: ChannelMessageHandler + 'static + Send + Sync,
136+
RMH: RoutingMessageHandler + 'static + Send + Sync,
137+
L: Logger + 'static + ?Sized + Send + Sync,
138+
UMH: CustomMessageHandler + 'static + Send + Sync {
139+
// Create a waker to wake up poll_event_process, above
140+
let (event_waker, event_receiver) = mpsc::channel(1);
141+
tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
142+
126143
// 8KB is nice and big but also should never cause any issues with stack overflowing.
127144
let mut buf = [0; 8192];
128145

@@ -173,7 +190,7 @@ impl Connection {
173190
Err(_) => break Disconnect::PeerDisconnected,
174191
},
175192
}
176-
peer_manager.process_events();
193+
let _ = event_waker.try_send(());
177194
};
178195
let writer_option = us.lock().unwrap().writer.take();
179196
if let Some(mut writer) = writer_option {

0 commit comments

Comments
 (0)