@@ -3421,6 +3421,38 @@ where
3421
3421
true
3422
3422
}
3423
3423
3424
+ /// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
3425
+ /// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels of
3426
+ /// to that peer is later closed while still being disconnected (i.e. force closed), we
3427
+ /// therefore need to remove the peer from `peer_state` separately.
3428
+ /// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
3429
+ /// instead remove such peers awaiting removal through this function, which is called on a
3430
+ /// timer through `timer_tick_occurred`, passing the peers disconnected peers with no channels,
3431
+ /// to limit the negative effects on parallelism as much as possible.
3432
+ ///
3433
+ /// Must be called without the `per_peer_state` lock acquired.
3434
+ fn remove_peers_awaiting_removal ( & self , pending_peers_awaiting_removal : HashSet < PublicKey > ) {
3435
+ if pending_peers_awaiting_removal. len ( ) > 0 {
3436
+ let mut per_peer_state = self . per_peer_state . write ( ) . unwrap ( ) ;
3437
+ for counterparty_node_id in pending_peers_awaiting_removal {
3438
+ match per_peer_state. entry ( counterparty_node_id) {
3439
+ hash_map:: Entry :: Occupied ( entry) => {
3440
+ // Remove the entry if the peer is still disconnected and we still
3441
+ // have no channels to the peer.
3442
+ let remove_entry = {
3443
+ let peer_state = entry. get ( ) . lock ( ) . unwrap ( ) ;
3444
+ !peer_state. is_connected && peer_state. channel_by_id . len ( ) == 0
3445
+ } ;
3446
+ if remove_entry {
3447
+ entry. remove_entry ( ) ;
3448
+ }
3449
+ } ,
3450
+ hash_map:: Entry :: Vacant ( _) => { /* The PeerState has already been removed */ }
3451
+ }
3452
+ }
3453
+ }
3454
+ }
3455
+
3424
3456
#[ cfg( any( test, feature = "_test_utils" ) ) ]
3425
3457
/// Process background events, for functional testing
3426
3458
pub fn test_process_background_events ( & self ) {
@@ -3481,6 +3513,7 @@ where
3481
3513
/// the channel.
3482
3514
/// * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs
3483
3515
/// with the current `ChannelConfig`.
3516
+ /// * Removing peers which have disconnected but and no longer have any channels.
3484
3517
///
3485
3518
/// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate
3486
3519
/// estimate fetches.
@@ -3493,19 +3526,21 @@ where
3493
3526
3494
3527
let mut handle_errors: Vec < ( Result < ( ) , _ > , _ ) > = Vec :: new ( ) ;
3495
3528
let mut timed_out_mpp_htlcs = Vec :: new ( ) ;
3529
+ let mut pending_peers_awaiting_removal = HashSet :: new ( ) ;
3496
3530
{
3497
3531
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
3498
3532
for ( counterparty_node_id, peer_state_mutex) in per_peer_state. iter ( ) {
3499
3533
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
3500
3534
let peer_state = & mut * peer_state_lock;
3501
3535
let pending_msg_events = & mut peer_state. pending_msg_events ;
3536
+ let counterparty_node_id = * counterparty_node_id;
3502
3537
peer_state. channel_by_id . retain ( |chan_id, chan| {
3503
3538
let chan_needs_persist = self . update_channel_fee ( chan_id, chan, new_feerate) ;
3504
3539
if chan_needs_persist == NotifyOption :: DoPersist { should_persist = NotifyOption :: DoPersist ; }
3505
3540
3506
3541
if let Err ( e) = chan. timer_check_closing_negotiation_progress ( ) {
3507
3542
let ( needs_close, err) = convert_chan_err ! ( self , e, chan, chan_id) ;
3508
- handle_errors. push ( ( Err ( err) , * counterparty_node_id) ) ;
3543
+ handle_errors. push ( ( Err ( err) , counterparty_node_id) ) ;
3509
3544
if needs_close { return false ; }
3510
3545
}
3511
3546
@@ -3539,8 +3574,13 @@ where
3539
3574
3540
3575
true
3541
3576
} ) ;
3577
+ let peer_should_be_removed = !peer_state. is_connected && peer_state. channel_by_id . len ( ) == 0 ;
3578
+ if peer_should_be_removed {
3579
+ pending_peers_awaiting_removal. insert ( counterparty_node_id) ;
3580
+ }
3542
3581
}
3543
3582
}
3583
+ self . remove_peers_awaiting_removal ( pending_peers_awaiting_removal) ;
3544
3584
3545
3585
self . claimable_payments . lock ( ) . unwrap ( ) . claimable_htlcs . retain ( |payment_hash, ( _, htlcs) | {
3546
3586
if htlcs. is_empty ( ) {
@@ -8116,6 +8156,40 @@ mod tests {
8116
8156
}
8117
8157
}
8118
8158
8159
+ #[ test]
8160
+ fn test_drop_disconnected_peers_when_removing_channels ( ) {
8161
+ let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
8162
+ let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
8163
+ let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
8164
+ let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
8165
+
8166
+ let chan = create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
8167
+
8168
+ nodes[ 0 ] . node . peer_disconnected ( & nodes[ 1 ] . node . get_our_node_id ( ) , false ) ;
8169
+ nodes[ 1 ] . node . peer_disconnected ( & nodes[ 0 ] . node . get_our_node_id ( ) , false ) ;
8170
+
8171
+ nodes[ 0 ] . node . force_close_broadcasting_latest_txn ( & chan. 2 , & nodes[ 1 ] . node . get_our_node_id ( ) ) . unwrap ( ) ;
8172
+ check_closed_broadcast ! ( nodes[ 0 ] , true ) ;
8173
+ check_added_monitors ! ( nodes[ 0 ] , 1 ) ;
8174
+ check_closed_event ! ( nodes[ 0 ] , 1 , ClosureReason :: HolderForceClosed ) ;
8175
+
8176
+ {
8177
+ // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
8178
+ // disconnected and the channel between has been force closed.
8179
+ let nodes_0_per_peer_state = nodes[ 0 ] . node . per_peer_state . read ( ) . unwrap ( ) ;
8180
+ // Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
8181
+ assert_eq ! ( nodes_0_per_peer_state. len( ) , 1 ) ;
8182
+ assert ! ( nodes_0_per_peer_state. get( & nodes[ 1 ] . node. get_our_node_id( ) ) . is_some( ) ) ;
8183
+ }
8184
+
8185
+ nodes[ 0 ] . node . timer_tick_occurred ( ) ;
8186
+
8187
+ {
8188
+ // Assert that nodes[1] has now been removed.
8189
+ assert_eq ! ( nodes[ 0 ] . node. per_peer_state. read( ) . unwrap( ) . len( ) , 0 ) ;
8190
+ }
8191
+ }
8192
+
8119
8193
#[ test]
8120
8194
fn bad_inbound_payment_hash ( ) {
8121
8195
// Add coverage for checking that a user-provided payment hash matches the payment secret.
0 commit comments