@@ -2906,6 +2906,95 @@ where
 		Ok(counterparty_node_id)
 	}
 
+	#[cfg(test)]
+	// Test helper that removes the peer from `per_peer_state` midway through a force close.
+	fn test_force_close_channel_with_peer(&self, channel_id: &ChannelId, peer_node_id: &PublicKey, peer_msg: Option<&String>, broadcast: bool)
+	-> Result<PublicKey, APIError> {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = per_peer_state.get(peer_node_id)
+			.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", peer_node_id) })?;
+		let (update_opt, counterparty_node_id) = {
+			let mut peer_state = peer_state_mutex.lock().unwrap();
+			let closure_reason = if let Some(peer_msg) = peer_msg {
+				ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(peer_msg.to_string()) }
+			} else {
+				ClosureReason::HolderForceClosed
+			};
+			if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id.clone()) {
+				log_error!(self.logger, "Force-closing channel {}", channel_id);
+				self.issue_channel_close_events(&chan_phase_entry.get().context(), closure_reason);
+				let mut chan_phase = remove_channel_phase!(self, chan_phase_entry);
+				mem::drop(peer_state);
+				mem::drop(per_peer_state);
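+				// Both locks are dropped above because `finish_close_channel` below expects to be
+				// called without `per_peer_state` held.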
+				match chan_phase {
+					ChannelPhase::Funded(mut chan) => {
+						self.finish_close_channel(chan.context.force_shutdown(broadcast));
+						(self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id())
+					},
+					ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) => {
+						self.finish_close_channel(chan_phase.context_mut().force_shutdown(false));
+						// Unfunded channel has no update
+						(None, chan_phase.context().get_counterparty_node_id())
+					},
+				}
+			} else if peer_state.inbound_channel_request_by_id.remove(channel_id).is_some() {
+				log_error!(self.logger, "Force-closing channel {}", &channel_id);
+				// N.B. that we don't send any channel close event here: we
+				// don't have a user_channel_id, and we never sent any opening
+				// events anyway.
+				(None, *peer_node_id)
+			} else {
+				return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", channel_id, peer_node_id) });
+			}
+		};
+
+		// Test: the peer_state corresponding to the counterparty node is removed at this point.
+		{
+			let mut per_peer_state = self.per_peer_state.write().unwrap();
+			per_peer_state.remove(peer_node_id);
+		}
+
+		if let Some(update) = update_opt {
+			// Try to send the `BroadcastChannelUpdate` via the peer we just force-closed on; if that
+			// peer is no longer present, broadcast it via whatever peer we are still connected to.
+			let broadcast_message_evt = events::MessageSendEvent::BroadcastChannelUpdate {
+				msg: update
+			};
+
+			let per_peer_state = self.per_peer_state.read().unwrap();
+
+			// Attempt to get the peer_state_mutex of the peer we force-closed on.
+			let a_peer_state_mutex_opt = per_peer_state.get(peer_node_id);
+
+			// If that peer is not present, fall back to any peer we are currently connected to.
+			let a_peer_state_mutex_opt = a_peer_state_mutex_opt.or_else(|| {
+				per_peer_state
+					.iter()
+					.find(|(_, v)| v.lock().unwrap().is_connected)
+					.map(|(_, v)| v)
+			});
+
+			match a_peer_state_mutex_opt {
+				Some(a_peer_state_mutex) => {
+					// A connected peer was found: queue the broadcast on its pending message events.
+					let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
+					a_peer_state.pending_msg_events.push(broadcast_message_evt);
+				}
+				None => {
+					// No connected peer was found: queue the message to be rebroadcast later.
+					let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+					pending_broadcast_messages.push(broadcast_message_evt);
+					log_info!(self.logger, "Not able to broadcast channel_update of force-closed channel right now. Will try rebroadcasting later.");
+				}
+			}
+		}
+
+		Ok(counterparty_node_id)
+	}
+
 	fn force_close_sending_error(&self, channel_id: &ChannelId, counterparty_node_id: &PublicKey, broadcast: bool) -> Result<(), APIError> {
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 		match self.force_close_channel_with_peer(channel_id, counterparty_node_id, None, broadcast) {
@@ -9039,16 +9128,12 @@ where
 
 			{
 				// Get pending messages to be broadcasted.
-				let broadcast_evts = self.pending_broadcast_messages.lock().unwrap();
+				let mut broadcast_evts = self.pending_broadcast_messages.lock().unwrap();
 
 				// If we have some pending message to broadcast, and we are connected to peers.
-				if broadcast_evts.len() > 0 && per_peer_state.len() > 0 {
-					let a_peer_state_mutex = per_peer_state.values().next().unwrap();
-					let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
-
-					a_peer_state.pending_msg_events.extend(broadcast_evts.iter().cloned());
-
-					self.pending_broadcast_messages.lock().unwrap().clear();
+				if broadcast_evts.len() > 0 {
+					pending_msg_events.extend(broadcast_evts.iter().cloned());
+					broadcast_evts.clear();
 
 					return NotifyOption::DoPersist;
 				}
@@ -11576,6 +11661,69 @@ mod tests {
 		}
 	}
 
+	fn do_test_rebroadcasting_of_force_close_msg_to_a_peer(connected: bool) {
+		let chanmon_cfgs = create_chanmon_cfgs(3);
+		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
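+		// Disconnect node 1: once the force-close helper removes its peer state, the resulting
+		// channel_update must be delivered via node 2 or queued for a later broadcast.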
+		nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+		nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+		if !connected {
+			nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+			nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+		}
+
+		nodes[0].node.test_force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
+		check_added_monitors!(nodes[0], 1);
+		check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
+
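+		// At this point node 1's peer_state has been removed by the test helper, so the
+		// channel_update is either handed to node 2 (if connected) or queued for rebroadcast.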
+		if connected {
+			// Assert that the channel_update message has been added to node[2]'s pending msg events.
+			let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
+			let peer_state_2 = nodes_0_per_peer_state.get(&nodes[2].node.get_our_node_id()).unwrap().lock().unwrap();
+			assert_eq!(peer_state_2.pending_msg_events.len(), 1);
+		} else {
+			{
+				// Assert that the channel_update message has been queued in the pending broadcast messages.
+				let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+				assert_eq!(pending_broadcast_messages.len(), 1);
+			}
+			// Now nodes 0 and 2 reconnect.
+			nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
+				features: nodes[2].node.init_features(), networks: None, remote_network_address: None
+			}, true).unwrap();
+			nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+				features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+			}, false).unwrap();
+
+			{
+				// Assert that the channel_update message has been moved to node[2]'s pending msg events.
+				let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
+				let peer_state_2 = nodes_0_per_peer_state.get(&nodes[2].node.get_our_node_id()).unwrap().lock().unwrap();
+				assert_eq!(peer_state_2.pending_msg_events.len(), 1);
+			}
+
+			{
+				// Assert that the pending broadcast messages queue has been drained.
+				let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+				assert_eq!(pending_broadcast_messages.len(), 0);
+			}
+		}
+
+		let _ = nodes[0].node.get_and_clear_pending_msg_events();
+	}
+
+	#[test]
+	fn test_rebroadcasting_of_force_close_msg_to_a_peer() {
+		do_test_rebroadcasting_of_force_close_msg_to_a_peer(false);
+		do_test_rebroadcasting_of_force_close_msg_to_a_peer(true);
+	}
+
 	#[test]
 	fn test_drop_disconnected_peers_when_removing_channels() {
 		let chanmon_cfgs = create_chanmon_cfgs(2);