@@ -397,6 +397,11 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
397
397
/// `peers` write lock to do so, so instead we block on this empty mutex when entering
398
398
/// `process_events`.
399
399
event_processing_lock : Mutex < ( ) > ,
400
+ /// Because event processing is global and always does all available work before returning,
401
+ /// there is no reason for us to have many event processors waiting on the lock at once.
402
+ /// Instead, we limit the total blocked event processors to always exactly one using this
403
+ /// counter.
404
+ blocked_event_processors : AtomicUsize ,
400
405
our_node_secret : SecretKey ,
401
406
ephemeral_key_midstate : Sha256Engine ,
402
407
custom_message_handler : CMH ,
@@ -491,6 +496,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
491
496
} ) ,
492
497
node_id_to_descriptor : Mutex :: new ( HashMap :: new ( ) ) ,
493
498
event_processing_lock : Mutex :: new ( ( ) ) ,
499
+ blocked_event_processors : AtomicUsize :: new ( 0 ) ,
494
500
our_node_secret,
495
501
ephemeral_key_midstate,
496
502
peer_counter_low : AtomicUsize :: new ( 0 ) ,
@@ -1194,7 +1200,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1194
1200
/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
1195
1201
/// [`send_data`]: SocketDescriptor::send_data
1196
1202
pub fn process_events ( & self ) {
1197
- let _single_processor_lock = self . event_processing_lock . lock ( ) . unwrap ( ) ;
1203
+ let mut _single_processor_lock = self . event_processing_lock . try_lock ( ) ;
1204
+ if _single_processor_lock. is_err ( ) {
1205
+ // While we could wake the older sleeper here with a CV and make more even waiting
1206
+ // times, that would be a lot of overengineering for a simple "reduce total waiter
1207
+ // count" goal.
1208
+ if self . blocked_event_processors . fetch_add ( 1 , Ordering :: AcqRel ) != 0 {
1209
+ self . blocked_event_processors . fetch_sub ( 1 , Ordering :: AcqRel ) ;
1210
+ return ;
1211
+ } else {
1212
+ // We're the only waiter, as the running process_events may have emptied the
1213
+ // pending events "long" ago and there are new events for us to process, wait until
1214
+ // it's done and process any leftover events before returning.
1215
+ _single_processor_lock = Ok ( self . event_processing_lock . lock ( ) . unwrap ( ) ) ;
1216
+ self . blocked_event_processors . fetch_sub ( 1 , Ordering :: Release ) ;
1217
+ }
1218
+ }
1198
1219
1199
1220
let mut disconnect_peers = HashMap :: new ( ) ;
1200
1221
let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
0 commit comments