@@ -44,13 +44,13 @@ pub struct Connection {
44
44
}
45
45
impl Connection {
46
46
fn schedule_read < CMH : ChannelMessageHandler + ' static > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor < CMH > , Arc < CMH > > > , us : Arc < Mutex < Self > > , reader : futures:: stream:: SplitStream < tokio_codec:: Framed < TcpStream , tokio_codec:: BytesCodec > > ) {
47
- let us_ref = us. clone ( ) ;
48
- let us_close_ref = us. clone ( ) ;
49
- let peer_manager_ref = peer_manager. clone ( ) ;
47
+ let connection = us. clone ( ) ;
48
+ let connection_close = us. clone ( ) ;
49
+ let peer_manager_close = peer_manager. clone ( ) ;
50
50
tokio:: spawn ( reader. for_each ( move |b| {
51
51
let pending_read = b. to_vec ( ) ;
52
52
{
53
- let mut lock = us_ref . lock ( ) . unwrap ( ) ;
53
+ let mut lock = connection . lock ( ) . unwrap ( ) ;
54
54
assert ! ( lock. pending_read. is_empty( ) ) ;
55
55
if lock. read_paused {
56
56
lock. pending_read = pending_read;
@@ -60,31 +60,31 @@ impl Connection {
60
60
}
61
61
}
62
62
//TODO: There's a race where we don't meet the requirements of disconnect_socket if its
63
- //called right here, after we release the us_ref lock in the scope above, but before we
63
+ //called right here, after we release the connection lock in the scope above, but before we
64
64
//call read_event!
65
- match peer_manager. read_event ( & mut SocketDescriptor :: new ( us_ref . clone ( ) , peer_manager. clone ( ) ) , pending_read) {
65
+ match peer_manager. read_event ( & mut SocketDescriptor :: new ( connection . clone ( ) , peer_manager. clone ( ) ) , pending_read) {
66
66
Ok ( pause_read) => {
67
67
if pause_read {
68
- let mut lock = us_ref . lock ( ) . unwrap ( ) ;
68
+ let mut lock = connection . lock ( ) . unwrap ( ) ;
69
69
lock. read_paused = true ;
70
70
}
71
71
} ,
72
72
Err ( e) => {
73
- us_ref . lock ( ) . unwrap ( ) . need_disconnect = false ;
73
+ connection . lock ( ) . unwrap ( ) . need_disconnect = false ;
74
74
return future:: Either :: B ( future:: result ( Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: InvalidData , e) ) ) ) ;
75
75
}
76
76
}
77
77
78
- if let Err ( e) = us_ref . lock ( ) . unwrap ( ) . event_notify . try_send ( ( ) ) {
78
+ if let Err ( e) = connection . lock ( ) . unwrap ( ) . event_notify . try_send ( ( ) ) {
79
79
// Ignore full errors as we just need them to poll after this point, so if the user
80
80
// hasn't received the last send yet, it doesn't matter.
81
81
assert ! ( e. is_full( ) ) ;
82
82
}
83
83
84
84
future:: Either :: B ( future:: result ( Ok ( ( ) ) ) )
85
85
} ) . then ( move |_| {
86
- if us_close_ref . lock ( ) . unwrap ( ) . need_disconnect {
87
- peer_manager_ref . disconnect_event ( & SocketDescriptor :: new ( us_close_ref , peer_manager_ref . clone ( ) ) ) ;
86
+ if connection_close . lock ( ) . unwrap ( ) . need_disconnect {
87
+ peer_manager_close . disconnect_event ( & SocketDescriptor :: new ( connection_close , peer_manager_close . clone ( ) ) ) ;
88
88
println ! ( "Peer disconnected!" ) ;
89
89
} else {
90
90
println ! ( "We disconnected peer!" ) ;
@@ -101,9 +101,9 @@ impl Connection {
101
101
} ) ) . then ( |_| {
102
102
future:: result ( Ok ( ( ) ) )
103
103
} ) ) ;
104
- let us = Arc :: new ( Mutex :: new ( Self { writer : Some ( send_sink) , event_notify, pending_read : Vec :: new ( ) , read_blocker : None , read_paused : false , need_disconnect : true , id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel ) } ) ) ;
104
+ let connection = Arc :: new ( Mutex :: new ( Self { writer : Some ( send_sink) , event_notify, pending_read : Vec :: new ( ) , read_blocker : None , read_paused : false , need_disconnect : true , id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel ) } ) ) ;
105
105
106
- ( reader, us )
106
+ ( reader, connection )
107
107
}
108
108
109
109
/// Process incoming messages and feed outgoing messages on the provided socket generated by
@@ -112,10 +112,10 @@ impl Connection {
112
112
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
113
113
/// ChannelManager and ChannelMonitor objects.
114
114
pub fn setup_inbound < CMH : ChannelMessageHandler + ' static > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor < CMH > , Arc < CMH > > > , event_notify : mpsc:: Sender < ( ) > , stream : TcpStream ) {
115
- let ( reader, us ) = Self :: new ( event_notify, stream) ;
115
+ let ( reader, connection ) = Self :: new ( event_notify, stream) ;
116
116
117
- if let Ok ( _) = peer_manager. new_inbound_connection ( SocketDescriptor :: new ( us . clone ( ) , peer_manager. clone ( ) ) ) {
118
- Self :: schedule_read ( peer_manager, us , reader) ;
117
+ if let Ok ( _) = peer_manager. new_inbound_connection ( SocketDescriptor :: new ( connection . clone ( ) , peer_manager. clone ( ) ) ) {
118
+ Self :: schedule_read ( peer_manager, connection , reader) ;
119
119
}
120
120
}
121
121
@@ -126,11 +126,11 @@ impl Connection {
126
126
/// You should poll the Receive end of event_notify and call get_and_clear_pending_events() on
127
127
/// ChannelManager and ChannelMonitor objects.
128
128
pub fn setup_outbound < CMH : ChannelMessageHandler + ' static > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor < CMH > , Arc < CMH > > > , event_notify : mpsc:: Sender < ( ) > , their_node_id : PublicKey , stream : TcpStream ) {
129
- let ( reader, us ) = Self :: new ( event_notify, stream) ;
129
+ let ( reader, connection ) = Self :: new ( event_notify, stream) ;
130
130
131
- if let Ok ( initial_send) = peer_manager. new_outbound_connection ( their_node_id, SocketDescriptor :: new ( us . clone ( ) , peer_manager. clone ( ) ) ) {
132
- if SocketDescriptor :: new ( us . clone ( ) , peer_manager. clone ( ) ) . send_data ( & initial_send, true ) == initial_send. len ( ) {
133
- Self :: schedule_read ( peer_manager, us , reader) ;
131
+ if let Ok ( initial_send) = peer_manager. new_outbound_connection ( their_node_id, SocketDescriptor :: new ( connection . clone ( ) , peer_manager. clone ( ) ) ) {
132
+ if SocketDescriptor :: new ( connection . clone ( ) , peer_manager. clone ( ) ) . send_data ( & initial_send, true ) == initial_send. len ( ) {
133
+ Self :: schedule_read ( peer_manager, connection , reader) ;
134
134
} else {
135
135
println ! ( "Failed to write first full message to socket!" ) ;
136
136
}
@@ -172,16 +172,16 @@ impl<CMH: ChannelMessageHandler> SocketDescriptor<CMH> {
172
172
impl < CMH : ChannelMessageHandler > peer_handler:: SocketDescriptor for SocketDescriptor < CMH > {
173
173
fn send_data ( & mut self , data : & [ u8 ] , resume_read : bool ) -> usize {
174
174
macro_rules! schedule_read {
175
- ( $us_ref : expr) => {
175
+ ( $descriptor : expr) => {
176
176
tokio:: spawn( future:: lazy( move || -> Result <( ) , ( ) > {
177
177
let mut read_data = Vec :: new( ) ;
178
178
{
179
- let mut us = $us_ref . conn. lock( ) . unwrap( ) ;
180
- mem:: swap( & mut read_data, & mut us . pending_read) ;
179
+ let mut connection = $descriptor . conn. lock( ) . unwrap( ) ;
180
+ mem:: swap( & mut read_data, & mut connection . pending_read) ;
181
181
}
182
182
if !read_data. is_empty( ) {
183
- let mut us_clone = $us_ref . clone( ) ;
184
- match $us_ref . peer_manager. read_event( & mut us_clone , read_data) {
183
+ // let mut us_clone = $descriptor .clone();
184
+ match $descriptor . peer_manager. read_event( & mut $descriptor . clone ( ) , read_data) {
185
185
Ok ( pause_read) => {
186
186
if pause_read { return Ok ( ( ) ) ; }
187
187
} ,
@@ -191,12 +191,12 @@ impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescri
191
191
}
192
192
}
193
193
}
194
- let mut us = $us_ref . conn. lock( ) . unwrap( ) ;
195
- if let Some ( sender) = us . read_blocker. take( ) {
194
+ let mut connection = $descriptor . conn. lock( ) . unwrap( ) ;
195
+ if let Some ( sender) = connection . read_blocker. take( ) {
196
196
sender. send( Ok ( ( ) ) ) . unwrap( ) ;
197
197
}
198
- us . read_paused = false ;
199
- if let Err ( e) = us . event_notify. try_send( ( ) ) {
198
+ connection . read_paused = false ;
199
+ if let Err ( e) = connection . event_notify. try_send( ( ) ) {
200
200
// Ignore full errors as we just need them to poll after this point, so if the user
201
201
// hasn't received the last send yet, it doesn't matter.
202
202
assert!( e. is_full( ) ) ;
@@ -206,36 +206,36 @@ impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescri
206
206
}
207
207
}
208
208
209
- let mut us = self . conn . lock ( ) . unwrap ( ) ;
209
+ let mut connection = self . conn . lock ( ) . unwrap ( ) ;
210
210
if resume_read {
211
- let us_ref = self . clone ( ) ;
212
- schedule_read ! ( us_ref ) ;
211
+ let descriptor = self . clone ( ) ;
212
+ schedule_read ! ( descriptor ) ;
213
213
}
214
214
if data. is_empty ( ) { return 0 ; }
215
- if us . writer . is_none ( ) {
216
- us . read_paused = true ;
215
+ if connection . writer . is_none ( ) {
216
+ connection . read_paused = true ;
217
217
return 0 ;
218
218
}
219
219
220
220
let mut bytes = bytes:: BytesMut :: with_capacity ( data. len ( ) ) ;
221
221
bytes. put ( data) ;
222
- let write_res = us . writer . as_mut ( ) . unwrap ( ) . start_send ( bytes. freeze ( ) ) ;
222
+ let write_res = connection . writer . as_mut ( ) . unwrap ( ) . start_send ( bytes. freeze ( ) ) ;
223
223
match write_res {
224
224
Ok ( res) => {
225
225
match res {
226
226
AsyncSink :: Ready => {
227
227
data. len ( )
228
228
} ,
229
229
AsyncSink :: NotReady ( _) => {
230
- us . read_paused = true ;
231
- let us_ref = self . clone ( ) ;
232
- tokio:: spawn ( us . writer . take ( ) . unwrap ( ) . flush ( ) . then ( move |writer_res| -> Result < ( ) , ( ) > {
230
+ connection . read_paused = true ;
231
+ let descriptor = self . clone ( ) ;
232
+ tokio:: spawn ( connection . writer . take ( ) . unwrap ( ) . flush ( ) . then ( move |writer_res| -> Result < ( ) , ( ) > {
233
233
if let Ok ( writer) = writer_res {
234
234
{
235
- let mut us = us_ref . conn . lock ( ) . unwrap ( ) ;
236
- us . writer = Some ( writer) ;
235
+ let mut connection = descriptor . conn . lock ( ) . unwrap ( ) ;
236
+ connection . writer = Some ( writer) ;
237
237
}
238
- schedule_read ! ( us_ref ) ;
238
+ schedule_read ! ( descriptor ) ;
239
239
} // we'll fire the disconnect event on the socket reader end
240
240
Ok ( ( ) )
241
241
} ) ) ;
@@ -251,9 +251,9 @@ impl<CMH: ChannelMessageHandler> peer_handler::SocketDescriptor for SocketDescri
251
251
}
252
252
253
253
fn disconnect_socket ( & mut self ) {
254
- let mut us = self . conn . lock ( ) . unwrap ( ) ;
255
- us . need_disconnect = true ;
256
- us . read_paused = true ;
254
+ let mut connection = self . conn . lock ( ) . unwrap ( ) ;
255
+ connection . need_disconnect = true ;
256
+ connection . read_paused = true ;
257
257
}
258
258
}
259
259
impl < CMH : ChannelMessageHandler > Clone for SocketDescriptor < CMH > {
0 commit comments