@@ -42,16 +42,79 @@ use lightning::ln::peer_handler::APeerManager;
42
42
use lightning:: ln:: msgs:: NetAddress ;
43
43
44
44
use std:: ops:: Deref ;
45
- use std:: task;
45
+ use std:: task:: { self , Poll } ;
46
+ use std:: future:: Future ;
46
47
use std:: net:: SocketAddr ;
47
48
use std:: net:: TcpStream as StdTcpStream ;
48
49
use std:: sync:: { Arc , Mutex } ;
49
50
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
50
51
use std:: time:: Duration ;
52
+ use std:: pin:: Pin ;
51
53
use std:: hash:: Hash ;
52
54
53
55
static ID_COUNTER : AtomicU64 = AtomicU64 :: new ( 0 ) ;
54
56
57
+ // We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
58
+ // dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
59
+ // define a trivial two- and three- select macro with the specific types we need and just use that.
60
+
61
+ pub ( crate ) enum SelectorOutput {
62
+ A ( Option < ( ) > ) , B ( Option < ( ) > ) , C ( tokio:: io:: Result < usize > ) ,
63
+ }
64
+
65
+ pub ( crate ) struct TwoSelector <
66
+ A : Future < Output =Option < ( ) > > + Unpin , B : Future < Output =Option < ( ) > > + Unpin
67
+ > {
68
+ pub a : A ,
69
+ pub b : B ,
70
+ }
71
+
72
+ impl <
73
+ A : Future < Output =Option < ( ) > > + Unpin , B : Future < Output =Option < ( ) > > + Unpin
74
+ > Future for TwoSelector < A , B > {
75
+ type Output = SelectorOutput ;
76
+ fn poll ( mut self : Pin < & mut Self > , ctx : & mut task:: Context < ' _ > ) -> Poll < SelectorOutput > {
77
+ match Pin :: new ( & mut self . a ) . poll ( ctx) {
78
+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: A ( res) ) ; } ,
79
+ Poll :: Pending => { } ,
80
+ }
81
+ match Pin :: new ( & mut self . b ) . poll ( ctx) {
82
+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: B ( res) ) ; } ,
83
+ Poll :: Pending => { } ,
84
+ }
85
+ Poll :: Pending
86
+ }
87
+ }
88
+
89
+ pub ( crate ) struct ThreeSelector <
90
+ A : Future < Output =Option < ( ) > > + Unpin , B : Future < Output =Option < ( ) > > + Unpin , C : Future < Output =tokio:: io:: Result < usize > > + Unpin
91
+ > {
92
+ pub a : A ,
93
+ pub b : B ,
94
+ pub c : C ,
95
+ }
96
+
97
+ impl <
98
+ A : Future < Output =Option < ( ) > > + Unpin , B : Future < Output =Option < ( ) > > + Unpin , C : Future < Output =tokio:: io:: Result < usize > > + Unpin
99
+ > Future for ThreeSelector < A , B , C > {
100
+ type Output = SelectorOutput ;
101
+ fn poll ( mut self : Pin < & mut Self > , ctx : & mut task:: Context < ' _ > ) -> Poll < SelectorOutput > {
102
+ match Pin :: new ( & mut self . a ) . poll ( ctx) {
103
+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: A ( res) ) ; } ,
104
+ Poll :: Pending => { } ,
105
+ }
106
+ match Pin :: new ( & mut self . b ) . poll ( ctx) {
107
+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: B ( res) ) ; } ,
108
+ Poll :: Pending => { } ,
109
+ }
110
+ match Pin :: new ( & mut self . c ) . poll ( ctx) {
111
+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: C ( res) ) ; } ,
112
+ Poll :: Pending => { } ,
113
+ }
114
+ Poll :: Pending
115
+ }
116
+ }
117
+
55
118
/// Connection contains all our internal state for a connection - we hold a reference to the
56
119
/// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
57
120
/// read future (which is returned by schedule_read).
@@ -127,29 +190,44 @@ impl Connection {
127
190
}
128
191
us_lock. read_paused
129
192
} ;
130
- tokio:: select! {
131
- v = write_avail_receiver. recv( ) => {
193
+ // TODO: Drop the Box'ing of the futures once Rust has pin-on-stack support.
194
+ let select_result = if read_paused {
195
+ TwoSelector {
196
+ a : Box :: pin ( write_avail_receiver. recv ( ) ) ,
197
+ b : Box :: pin ( read_wake_receiver. recv ( ) ) ,
198
+ } . await
199
+ } else {
200
+ ThreeSelector {
201
+ a : Box :: pin ( write_avail_receiver. recv ( ) ) ,
202
+ b : Box :: pin ( read_wake_receiver. recv ( ) ) ,
203
+ c : Box :: pin ( reader. read ( & mut buf) ) ,
204
+ } . await
205
+ } ;
206
+ match select_result {
207
+ SelectorOutput :: A ( v) => {
132
208
assert ! ( v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
133
209
if peer_manager. as_ref ( ) . write_buffer_space_avail ( & mut our_descriptor) . is_err ( ) {
134
210
break Disconnect :: CloseConnection ;
135
211
}
136
212
} ,
137
- _ = read_wake_receiver. recv( ) => { } ,
138
- read = reader. read( & mut buf) , if !read_paused => match read {
139
- Ok ( 0 ) => break Disconnect :: PeerDisconnected ,
140
- Ok ( len) => {
141
- let read_res = peer_manager. as_ref( ) . read_event( & mut our_descriptor, & buf[ 0 ..len] ) ;
142
- let mut us_lock = us. lock( ) . unwrap( ) ;
143
- match read_res {
144
- Ok ( pause_read) => {
145
- if pause_read {
146
- us_lock. read_paused = true ;
147
- }
148
- } ,
149
- Err ( _) => break Disconnect :: CloseConnection ,
150
- }
151
- } ,
152
- Err ( _) => break Disconnect :: PeerDisconnected ,
213
+ SelectorOutput :: B ( _) => { } ,
214
+ SelectorOutput :: C ( read) => {
215
+ match read {
216
+ Ok ( 0 ) => break Disconnect :: PeerDisconnected ,
217
+ Ok ( len) => {
218
+ let read_res = peer_manager. as_ref ( ) . read_event ( & mut our_descriptor, & buf[ 0 ..len] ) ;
219
+ let mut us_lock = us. lock ( ) . unwrap ( ) ;
220
+ match read_res {
221
+ Ok ( pause_read) => {
222
+ if pause_read {
223
+ us_lock. read_paused = true ;
224
+ }
225
+ } ,
226
+ Err ( _) => break Disconnect :: CloseConnection ,
227
+ }
228
+ } ,
229
+ Err ( _) => break Disconnect :: PeerDisconnected ,
230
+ }
153
231
} ,
154
232
}
155
233
let _ = event_waker. try_send ( ( ) ) ;
0 commit comments