Skip to content

Commit 8c8f2a4

Browse files
committed
Avoid calling PeerManager::process_events more often than needed
Instead of calling `process_events` after each byte read from a peer, only call it when we have no data left to process. This should reduce write lock contention somewhat inside `PeerManager`.
1 parent d2b32af commit 8c8f2a4

File tree

1 file changed

+31
-19
lines changed

1 file changed

+31
-19
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 31 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -71,10 +71,10 @@
7171

7272
use bitcoin::secp256k1::key::PublicKey;
7373

74-
use tokio::net::TcpStream;
74+
use tokio::net::{TcpStream, tcp::OwnedReadHalf, tcp::OwnedWriteHalf};
7575
use tokio::{io, time};
7676
use tokio::sync::mpsc;
77-
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
77+
use tokio::io::{AsyncWrite, AsyncWriteExt};
7878

7979
use lightning::ln::peer_handler;
8080
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
@@ -96,7 +96,7 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
9696
/// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
9797
/// read future (which is returned by schedule_read).
9898
struct Connection {
99-
writer: Option<io::WriteHalf<TcpStream>>,
99+
writer: Option<OwnedWriteHalf>,
100100
// Because our PeerManager is templated by user-provided types, and we can't (as far as I can
101101
// tell) have a const RawWakerVTable built out of templated functions, we need some indirection
102102
// between being woken up with write-ready and calling PeerManager::write_buffer_space_avail.
@@ -118,7 +118,7 @@ struct Connection {
118118
id: u64,
119119
}
120120
impl Connection {
121-
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
121+
async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, reader: OwnedReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
122122
CMH: ChannelMessageHandler + 'static,
123123
RMH: RoutingMessageHandler + 'static,
124124
L: Logger + 'static + ?Sized,
@@ -140,7 +140,7 @@ impl Connection {
140140
// Rust-Lightning that the socket is gone.
141141
PeerDisconnected
142142
}
143-
let disconnect_type = loop {
143+
let disconnect_type = 'socket_loop: loop {
144144
let read_paused = {
145145
let us_lock = us.lock().unwrap();
146146
if us_lock.rl_requested_disconnect {
@@ -156,21 +156,33 @@ impl Connection {
156156
}
157157
},
158158
_ = read_wake_receiver.recv() => {},
159-
read = reader.read(&mut buf), if !read_paused => match read {
160-
Ok(0) => break Disconnect::PeerDisconnected,
161-
Ok(len) => {
162-
let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]);
163-
let mut us_lock = us.lock().unwrap();
164-
match read_res {
165-
Ok(pause_read) => {
166-
if pause_read {
167-
us_lock.read_paused = true;
159+
_ = reader.as_ref().readable(), if !read_paused => {
160+
// We fully drain the socket before we move on and call `process_events` or
161+
// possibly call `write_buffer_space_avail`. LDK will ensure we aren't waiting
162+
// for write and overfill our write buffer by telling us to pause read, which
163+
// will break the loop.
164+
// This prevents us from calling `process_events` more often than needed while
165+
// there's still work to be done processing incoming data.
166+
'read_loop: loop {
167+
match reader.as_ref().try_read(&mut buf) {
168+
Ok(0) => break 'socket_loop Disconnect::PeerDisconnected,
169+
Ok(len) => {
170+
let read_res = peer_manager.read_event(&mut our_descriptor, &buf[0..len]);
171+
let mut us_lock = us.lock().unwrap();
172+
match read_res {
173+
Ok(pause_read) => {
174+
if pause_read {
175+
us_lock.read_paused = true;
176+
break 'read_loop;
177+
}
178+
},
179+
Err(_) => break 'socket_loop Disconnect::CloseConnection,
168180
}
169181
},
170-
Err(_) => break Disconnect::CloseConnection,
182+
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break 'read_loop,
183+
Err(_) => break 'socket_loop Disconnect::PeerDisconnected,
171184
}
172-
},
173-
Err(_) => break Disconnect::PeerDisconnected,
185+
}
174186
},
175187
}
176188
peer_manager.process_events();
@@ -186,7 +198,7 @@ impl Connection {
186198
}
187199
}
188200

189-
fn new(stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
201+
fn new(stream: StdTcpStream) -> (OwnedReadHalf, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
190202
// We only ever need a channel of depth 1 here: if we returned a non-full write to the
191203
// PeerManager, we will eventually get notified that there is room in the socket to write
192204
// new bytes, which will generate an event. That event will be popped off the queue before
@@ -198,7 +210,7 @@ impl Connection {
198210
// false.
199211
let (read_waker, read_receiver) = mpsc::channel(1);
200212
stream.set_nonblocking(true).unwrap();
201-
let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap());
213+
let (reader, writer) = TcpStream::from_std(stream).unwrap().into_split();
202214

203215
(reader, write_receiver, read_receiver,
204216
Arc::new(Mutex::new(Self {

0 commit comments

Comments (0)