@@ -15,7 +15,7 @@ use util::byte_utils;
15
15
use util:: events:: { MessageSendEvent } ;
16
16
use util:: logger:: Logger ;
17
17
18
- use std:: collections:: { HashMap , hash_map, LinkedList } ;
18
+ use std:: collections:: { HashMap , hash_map, HashSet , LinkedList } ;
19
19
use std:: sync:: { Arc , Mutex } ;
20
20
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
21
21
use std:: { cmp, error, hash, fmt} ;
@@ -106,17 +106,22 @@ struct Peer {
106
106
107
107
struct PeerHolder < Descriptor : SocketDescriptor > {
108
108
peers : HashMap < Descriptor , Peer > ,
109
+ /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
110
+ /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
111
+ peers_needing_send : HashSet < Descriptor > ,
109
112
/// Only add to this set when noise completes:
110
113
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
111
114
}
112
115
struct MutPeerHolder < ' a , Descriptor : SocketDescriptor + ' a > {
113
116
peers : & ' a mut HashMap < Descriptor , Peer > ,
117
+ peers_needing_send : & ' a mut HashSet < Descriptor > ,
114
118
node_id_to_descriptor : & ' a mut HashMap < PublicKey , Descriptor > ,
115
119
}
116
120
impl < Descriptor : SocketDescriptor > PeerHolder < Descriptor > {
117
121
fn borrow_parts ( & mut self ) -> MutPeerHolder < Descriptor > {
118
122
MutPeerHolder {
119
123
peers : & mut self . peers ,
124
+ peers_needing_send : & mut self . peers_needing_send ,
120
125
node_id_to_descriptor : & mut self . node_id_to_descriptor ,
121
126
}
122
127
}
@@ -162,7 +167,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
162
167
pub fn new ( message_handler : MessageHandler , our_node_secret : SecretKey , logger : Arc < Logger > ) -> PeerManager < Descriptor > {
163
168
PeerManager {
164
169
message_handler : message_handler,
165
- peers : Mutex :: new ( PeerHolder { peers : HashMap :: new ( ) , node_id_to_descriptor : HashMap :: new ( ) } ) ,
170
+ peers : Mutex :: new ( PeerHolder {
171
+ peers : HashMap :: new ( ) ,
172
+ peers_needing_send : HashSet :: new ( ) ,
173
+ node_id_to_descriptor : HashMap :: new ( )
174
+ } ) ,
166
175
our_node_secret : our_node_secret,
167
176
initial_syncs_sent : AtomicUsize :: new ( 0 ) ,
168
177
logger,
@@ -188,7 +197,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
188
197
/// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
189
198
/// descriptor but must disconnect the connection immediately.
190
199
///
191
- /// Returns some bytes to send to the remote node.
200
+ /// Returns a small number of bytes to send to the remote node (currently always 50) .
192
201
///
193
202
/// Panics if descriptor is duplicative with some other descriptor which has not yet has a
194
203
/// disconnect_event.
@@ -298,16 +307,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
298
307
///
299
308
/// May return an Err to indicate that the connection should be closed.
300
309
///
301
- /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into
302
- /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The
303
- /// invariants around calling write_event in case a write did not fully complete must still
304
- /// hold. Note that this function will often call send_data on many peers before returning, not
305
- /// just this peer!
310
+ /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
311
+ /// Thus, however, you almost certainly want to call process_events() after any read_event to
312
+ /// generate send_data calls to handle responses.
306
313
///
307
314
/// If Ok(true) is returned, further read_events should not be triggered until a write_event on
308
- /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note
309
- /// that this must be true even if a send_data call with resume_read=true was made during the
310
- /// course of this function!
315
+ /// this file descriptor has resume_read set (preventing DoS issues in the send buffer).
311
316
///
312
317
/// Panics if the descriptor was not previously registered in a new_*_connection event.
313
318
pub fn read_event ( & self , peer_descriptor : & mut Descriptor , data : Vec < u8 > ) -> Result < bool , PeerHandleError > {
@@ -347,6 +352,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
347
352
{
348
353
log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
349
354
peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
355
+ peers. peers_needing_send. insert( peer_descriptor. clone( ) ) ;
350
356
}
351
357
}
352
358
}
@@ -673,21 +679,21 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
673
679
pause_read
674
680
} ;
675
681
676
- self . process_events ( ) ;
677
-
678
682
Ok ( pause_read)
679
683
}
680
684
681
- /// Checks for any events generated by our handlers and processes them. May be needed after eg
682
- /// calls to ChannelManager::process_pending_htlc_forward.
685
+ /// Checks for any events generated by our handlers and processes them. Includes sending most
686
+ /// response messages as well as messages generated by calls to handler functions directly (eg
687
+ /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
683
688
pub fn process_events ( & self ) {
684
689
{
685
690
// TODO: There are some DoS attacks here where you can flood someone's outbound send
686
691
// buffer by doing things like announcing channels on another node. We should be willing to
687
692
// drop optional-ish messages when send buffers get full!
688
693
689
694
let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
690
- let mut peers = self . peers . lock ( ) . unwrap ( ) ;
695
+ let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
696
+ let peers = peers_lock. borrow_parts ( ) ;
691
697
for event in events_generated. drain ( ..) {
692
698
macro_rules! get_peer_for_forwarding {
693
699
( $node_id: expr, $handle_no_such_peer: block) => {
@@ -888,6 +894,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
888
894
match * action {
889
895
msgs:: ErrorAction :: DisconnectPeer { ref msg } => {
890
896
if let Some ( mut descriptor) = peers. node_id_to_descriptor . remove ( node_id) {
897
+ peers. peers_needing_send . remove ( & descriptor) ;
891
898
if let Some ( mut peer) = peers. peers . remove ( & descriptor) {
892
899
if let Some ( ref msg) = * msg {
893
900
log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}" ,
@@ -923,6 +930,13 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
923
930
}
924
931
}
925
932
}
933
+
934
+ for mut descriptor in peers. peers_needing_send . drain ( ) {
935
+ match peers. peers . get_mut ( & descriptor) {
936
+ Some ( peer) => Self :: do_attempt_write_data ( & mut descriptor, peer) ,
937
+ None => panic ! ( "Inconsistent peers set state!" ) ,
938
+ }
939
+ }
926
940
}
927
941
}
928
942
@@ -938,6 +952,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
938
952
939
953
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
940
954
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
955
+ peers. peers_needing_send . remove ( descriptor) ;
941
956
let peer_option = peers. peers . remove ( descriptor) ;
942
957
match peer_option {
943
958
None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
0 commit comments