@@ -393,6 +393,10 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
 	/// lock held. Entries may be added with only the `peers` read lock held (though the
 	/// `Descriptor` value must already exist in `peers`).
 	node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
+	/// We can only have one thread processing events at once, but we don't usually need the full
+	/// `peers` write lock to do so, so instead we block on this empty mutex when entering
+	/// `process_events`.
+	event_processing_lock: Mutex<()>,
 	our_node_secret: SecretKey,
 	ephemeral_key_midstate: Sha256Engine,
 	custom_message_handler: CMH,
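The `Mutex<()>` added here is a pure exclusion token: whichever thread holds its guard is the one event processor, while the actual peer data stays behind its own finer-grained locks. A minimal sketch of the pattern outside LDK (the `Processor`/`process` names are hypothetical, not the real API):

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

struct Processor {
    // Real state lives behind its own lock so readers can proceed concurrently.
    state: RwLock<HashMap<u64, String>>,
    // Empty mutex used only to serialize `process`: the `()` carries no data;
    // holding the guard is itself the "only one processor at a time" token.
    processing_lock: Mutex<()>,
}

impl Processor {
    fn process(&self) {
        // Block here if another thread is already processing, without ever
        // taking `state`'s write lock just to enforce exclusivity.
        let _guard = self.processing_lock.lock().unwrap();
        for (id, value) in self.state.read().unwrap().iter() {
            println!("processing {}: {}", id, value);
        }
    }
}

fn main() {
    let p = Processor { state: RwLock::new(HashMap::new()), processing_lock: Mutex::new(()) };
    p.state.write().unwrap().insert(1, "hello".to_string());
    p.process();
}
```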
@@ -486,6 +490,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				peers: HashMap::new(),
 			}),
 			node_id_to_descriptor: Mutex::new(HashMap::new()),
+			event_processing_lock: Mutex::new(()),
 			our_node_secret,
 			ephemeral_key_midstate,
 			peer_counter_low: AtomicUsize::new(0),
@@ -1189,20 +1194,28 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 	/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
 	/// [`send_data`]: SocketDescriptor::send_data
 	pub fn process_events(&self) {
+		let _single_processor_lock = self.event_processing_lock.lock().unwrap();
+
+		let mut disconnect_peers = HashMap::new();
+		let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+		events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
+
 		{
 			// TODO: There are some DoS attacks here where you can flood someone's outbound send
 			// buffer by doing things like announcing channels on another node. We should be willing to
 			// drop optional-ish messages when send buffers get full!
 
-			let mut peers_lock = self.peers.write().unwrap();
-			let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
-			events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
-			let peers = &mut *peers_lock;
+			let peers_lock = self.peers.read().unwrap();
+			let peers = &*peers_lock;
 			macro_rules! get_peer_for_forwarding {
 				($node_id: expr) => {
 					{
+						if disconnect_peers.get($node_id).is_some() {
+							// If we've "disconnected" this peer, do not send to it.
+							continue;
+						}
 						match self.node_id_to_descriptor.lock().unwrap().get($node_id) {
-							Some(descriptor) => match peers.peers.get_mut(&descriptor) {
+							Some(descriptor) => match peers.peers.get(&descriptor) {
 								Some(peer_mutex) => {
 									let peer_lock = peer_mutex.lock().unwrap();
 									if peer_lock.their_features.is_none() {
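The switch from `peers.write()` to `peers.read()` works because forwarding only needs shared access to the peer map; mutating an individual peer's buffers happens under that peer's own `Mutex`, which is also why `get_mut` becomes `get`. A condensed sketch of this two-level locking scheme, with simplified stand-ins for the real `Peer` and holder types:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

struct Peer {
    pending_outbound: Vec<Vec<u8>>,
}

// The outer RwLock guards the map's shape; each peer has its own Mutex, so a
// read-locked map still allows mutating one peer's buffers at a time.
fn enqueue_to_peer(holder: &RwLock<HashMap<u32, Mutex<Peer>>>, peer_id: u32, msg: Vec<u8>) {
    let peers = holder.read().unwrap(); // shared: other sends proceed in parallel
    if let Some(peer_mutex) = peers.get(&peer_id) {
        peer_mutex.lock().unwrap().pending_outbound.push(msg);
    }
    // Only adding or removing peers requires holder.write().
}

fn main() {
    let holder = RwLock::new(HashMap::new());
    holder.write().unwrap().insert(7, Mutex::new(Peer { pending_outbound: Vec::new() }));
    enqueue_to_peer(&holder, 7, b"ping".to_vec());
    assert_eq!(holder.read().unwrap()[&7].lock().unwrap().pending_outbound.len(), 1);
}
```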
@@ -1336,28 +1349,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				MessageSendEvent::HandleError { ref node_id, ref action } => {
 					match *action {
 						msgs::ErrorAction::DisconnectPeer { ref msg } => {
-							// Note that since we are holding the peers *write* lock we can
-							// remove from node_id_to_descriptor immediately (as no other
-							// thread can be holding the peer lock if we have the global write
-							// lock).
-							if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(node_id) {
-								if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
-									if let Some(ref msg) = *msg {
-										log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
-											log_pubkey!(node_id),
-											msg.data);
-										let mut peer = peer_mutex.lock().unwrap();
-										self.enqueue_message(&mut *peer, msg);
-										// This isn't guaranteed to work, but if there is enough free
-										// room in the send buffer, put the error message there...
-										self.do_attempt_write_data(&mut descriptor, &mut *peer);
-									} else {
-										log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
-									}
-								}
-								descriptor.disconnect_socket();
-								self.message_handler.chan_handler.peer_disconnected(&node_id, false);
-							}
+							// We do not have the peers write lock, so we just store that we're
+							// about to disconnect the peer and do it after we finish
+							// processing most messages.
+							disconnect_peers.insert(*node_id, msg.clone());
 						},
 						msgs::ErrorAction::IgnoreAndLog(level) => {
 							log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
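A side effect of keying the deferred disconnects by node id is that multiple `DisconnectPeer` events for the same peer in one pass coalesce, with the last error message winning. A tiny illustration of that `HashMap::insert` behavior, using `u32`/`String` stand-ins for the real key and message types:

```rust
use std::collections::HashMap;

fn main() {
    // Stand-ins: u32 for the node id, Option<String> for the optional error message.
    let mut disconnect_peers: HashMap<u32, Option<String>> = HashMap::new();
    // Two HandleError events for the same peer in one pass coalesce on insert,
    // and the most recent message wins.
    disconnect_peers.insert(7, Some("first error".to_string()));
    disconnect_peers.insert(7, Some("second error".to_string()));
    assert_eq!(disconnect_peers.len(), 1);
    assert_eq!(disconnect_peers[&7].as_deref(), Some("second error"));
}
```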
@@ -1392,13 +1387,43 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 			}
 
 			for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() {
+				if disconnect_peers.get(&node_id).is_some() { continue; }
 				self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
 			}
 
-			for (descriptor, peer_mutex) in peers.peers.iter_mut() {
+			for (descriptor, peer_mutex) in peers.peers.iter() {
 				self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
 			}
 		}
+		if !disconnect_peers.is_empty() {
+			let mut peers_lock = self.peers.write().unwrap();
+			let peers = &mut *peers_lock;
+			for (node_id, msg) in disconnect_peers.drain() {
+				// Note that since we are holding the peers *write* lock we can
+				// remove from node_id_to_descriptor immediately (as no other
+				// thread can be holding the peer lock if we have the global write
+				// lock).
+
+				if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
+					if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
+						if let Some(msg) = msg {
+							log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+								log_pubkey!(node_id),
+								msg.data);
+							let mut peer = peer_mutex.lock().unwrap();
+							self.enqueue_message(&mut *peer, &msg);
+							// This isn't guaranteed to work, but if there is enough free
+							// room in the send buffer, put the error message there...
+							self.do_attempt_write_data(&mut descriptor, &mut *peer);
+						} else {
+							log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
+						}
+					}
+					descriptor.disconnect_socket();
+					self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+				}
+			}
+		}
 	}
 
 	/// Indicates that the given socket descriptor's connection is now closed.
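Taken together, the new `process_events` follows a two-phase shape: under the read lock it only records which peers to drop, then takes the write lock once at the end to actually remove them, keeping the exclusive critical section short. A self-contained sketch of that shape under simplified assumptions (hypothetical names, not the LDK signatures):

```rust
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};

struct Peer;

fn process(peers: &RwLock<HashMap<u32, Mutex<Peer>>>, errored: &[u32]) {
    let mut to_disconnect = HashMap::new();
    {
        // Phase 1: read lock only. Real work (sends) would happen here; errored
        // peers are merely recorded so later sends in this pass skip them.
        let peers_read = peers.read().unwrap();
        for id in errored {
            if peers_read.contains_key(id) {
                to_disconnect.insert(*id, ());
            }
        }
    } // read guard dropped before we take the write lock
    if !to_disconnect.is_empty() {
        // Phase 2: one short exclusive section to actually mutate the map.
        let mut peers_write = peers.write().unwrap();
        for (id, ()) in to_disconnect.drain() {
            peers_write.remove(&id);
        }
    }
}

fn main() {
    let peers = RwLock::new(HashMap::from([(1, Mutex::new(Peer)), (2, Mutex::new(Peer))]));
    process(&peers, &[2]);
    assert_eq!(peers.read().unwrap().len(), 1);
}
```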