Skip to content

Commit 04a4393

Browse files
committed
Drop tokio/macros dependency in lightning-net-tokio, fix MSRV
The `tokio` `macros` feature depends on `proc-macro2`, which recently broke its MSRV in a patch version. Such crates aren't reasonable for us to have as dependencies, so instead we replace the one trivial use we have of `tokio::select!()` with our own manual future.
1 parent c5c5f3f commit 04a4393

File tree

3 files changed

+99
-21
lines changed

3 files changed

+99
-21
lines changed

lightning-background-processor/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ lightning = { version = "0.0.116-rc1", path = "../lightning", default-features =
2525
lightning-rapid-gossip-sync = { version = "0.0.116-rc1", path = "../lightning-rapid-gossip-sync", default-features = false }
2626

2727
[dev-dependencies]
28-
tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
28+
tokio = { version = "1.14", features = [ "rt", "rt-multi-thread", "sync", "time" ] }
2929
lightning = { version = "0.0.116-rc1", path = "../lightning", features = ["_test_utils"] }
3030
lightning-invoice = { version = "0.24.0-rc1", path = "../lightning-invoice" }
3131
lightning-persister = { version = "0.0.116-rc1", path = "../lightning-persister" }

lightning-net-tokio/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1717
[dependencies]
1818
bitcoin = "0.29.0"
1919
lightning = { version = "0.0.116-rc1", path = "../lightning" }
20-
tokio = { version = "1.0", features = [ "io-util", "macros", "rt", "sync", "net", "time" ] }
20+
tokio = { version = "1.0", features = [ "io-util", "rt", "sync", "net", "time" ] }
2121

2222
[dev-dependencies]
2323
tokio = { version = "1.14", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }

lightning-net-tokio/src/lib.rs

+97-19
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,79 @@ use lightning::ln::peer_handler::APeerManager;
4242
use lightning::ln::msgs::NetAddress;
4343

4444
use std::ops::Deref;
45-
use std::task;
45+
use std::task::{self, Poll};
46+
use std::future::Future;
4647
use std::net::SocketAddr;
4748
use std::net::TcpStream as StdTcpStream;
4849
use std::sync::{Arc, Mutex};
4950
use std::sync::atomic::{AtomicU64, Ordering};
5051
use std::time::Duration;
52+
use std::pin::Pin;
5153
use std::hash::Hash;
5254

5355
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
5456

57+
// We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
58+
// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
59+
// define a trivial two- and three- select macro with the specific types we need and just use that.
60+
61+
pub(crate) enum SelectorOutput {
62+
A(Option<()>), B(Option<()>), C(tokio::io::Result<usize>),
63+
}
64+
65+
pub(crate) struct TwoSelector<
66+
A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin
67+
> {
68+
pub a: A,
69+
pub b: B,
70+
}
71+
72+
impl<
73+
A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin
74+
> Future for TwoSelector<A, B> {
75+
type Output = SelectorOutput;
76+
fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
77+
match Pin::new(&mut self.a).poll(ctx) {
78+
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); },
79+
Poll::Pending => {},
80+
}
81+
match Pin::new(&mut self.b).poll(ctx) {
82+
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
83+
Poll::Pending => {},
84+
}
85+
Poll::Pending
86+
}
87+
}
88+
89+
pub(crate) struct ThreeSelector<
90+
A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
91+
> {
92+
pub a: A,
93+
pub b: B,
94+
pub c: C,
95+
}
96+
97+
impl<
98+
A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
99+
> Future for ThreeSelector<A, B, C> {
100+
type Output = SelectorOutput;
101+
fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
102+
match Pin::new(&mut self.a).poll(ctx) {
103+
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); },
104+
Poll::Pending => {},
105+
}
106+
match Pin::new(&mut self.b).poll(ctx) {
107+
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
108+
Poll::Pending => {},
109+
}
110+
match Pin::new(&mut self.c).poll(ctx) {
111+
Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
112+
Poll::Pending => {},
113+
}
114+
Poll::Pending
115+
}
116+
}
117+
55118
/// Connection contains all our internal state for a connection - we hold a reference to the
56119
/// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
57120
/// read future (which is returned by schedule_read).
@@ -127,29 +190,44 @@ impl Connection {
127190
}
128191
us_lock.read_paused
129192
};
130-
tokio::select! {
131-
v = write_avail_receiver.recv() => {
193+
// TODO: Drop the Box'ing of the futures once Rust has pin-on-stack support.
194+
let select_result = if read_paused {
195+
TwoSelector {
196+
a: Box::pin(write_avail_receiver.recv()),
197+
b: Box::pin(read_wake_receiver.recv()),
198+
}.await
199+
} else {
200+
ThreeSelector {
201+
a: Box::pin(write_avail_receiver.recv()),
202+
b: Box::pin(read_wake_receiver.recv()),
203+
c: Box::pin(reader.read(&mut buf)),
204+
}.await
205+
};
206+
match select_result {
207+
SelectorOutput::A(v) => {
132208
assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
133209
if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err() {
134210
break Disconnect::CloseConnection;
135211
}
136212
},
137-
_ = read_wake_receiver.recv() => {},
138-
read = reader.read(&mut buf), if !read_paused => match read {
139-
Ok(0) => break Disconnect::PeerDisconnected,
140-
Ok(len) => {
141-
let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
142-
let mut us_lock = us.lock().unwrap();
143-
match read_res {
144-
Ok(pause_read) => {
145-
if pause_read {
146-
us_lock.read_paused = true;
147-
}
148-
},
149-
Err(_) => break Disconnect::CloseConnection,
150-
}
151-
},
152-
Err(_) => break Disconnect::PeerDisconnected,
213+
SelectorOutput::B(_) => {},
214+
SelectorOutput::C(read) => {
215+
match read {
216+
Ok(0) => break Disconnect::PeerDisconnected,
217+
Ok(len) => {
218+
let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
219+
let mut us_lock = us.lock().unwrap();
220+
match read_res {
221+
Ok(pause_read) => {
222+
if pause_read {
223+
us_lock.read_paused = true;
224+
}
225+
},
226+
Err(_) => break Disconnect::CloseConnection,
227+
}
228+
},
229+
Err(_) => break Disconnect::PeerDisconnected,
230+
}
153231
},
154232
}
155233
let _ = event_waker.try_send(());

0 commit comments

Comments
 (0)