@@ -892,7 +892,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
892
892
/// The peer is currently connected (i.e. we've seen a
893
893
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
894
894
/// [`ChannelMessageHandler::peer_disconnected`].
895
- is_connected: bool,
895
+ pub is_connected: bool,
896
896
}
897
897
898
898
impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -1392,6 +1392,9 @@ where
1392
1392
1393
1393
pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
1394
1394
1395
+ /// Tracks the message events that are to be broadcasted when we are connected to some peer.
1396
+ pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
1397
+
1395
1398
entropy_source: ES,
1396
1399
node_signer: NS,
1397
1400
signer_provider: SP,
@@ -1976,7 +1979,7 @@ macro_rules! handle_error {
1976
1979
match $internal {
1977
1980
Ok(msg) => Ok(msg),
1978
1981
Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
1979
- let mut msg_events = Vec::with_capacity(2) ;
1982
+ let mut msg_event = None ;
1980
1983
1981
1984
if let Some((shutdown_res, update_option)) = shutdown_finish {
1982
1985
let counterparty_node_id = shutdown_res.counterparty_node_id;
@@ -1988,7 +1991,8 @@ macro_rules! handle_error {
1988
1991
1989
1992
$self.finish_close_channel(shutdown_res);
1990
1993
if let Some(update) = update_option {
1991
- msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
1994
+ let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
1995
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
1992
1996
msg: update
1993
1997
});
1994
1998
}
@@ -1998,17 +2002,17 @@ macro_rules! handle_error {
1998
2002
1999
2003
if let msgs::ErrorAction::IgnoreError = err.action {
2000
2004
} else {
2001
- msg_events.push (events::MessageSendEvent::HandleError {
2005
+ msg_event = Some (events::MessageSendEvent::HandleError {
2002
2006
node_id: $counterparty_node_id,
2003
2007
action: err.action.clone()
2004
2008
});
2005
2009
}
2006
2010
2007
- if !msg_events.is_empty() {
2011
+ if let Some(msg_event) = msg_event {
2008
2012
let per_peer_state = $self.per_peer_state.read().unwrap();
2009
2013
if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
2010
2014
let mut peer_state = peer_state_mutex.lock().unwrap();
2011
- peer_state.pending_msg_events.append(&mut msg_events );
2015
+ peer_state.pending_msg_events.push(msg_event );
2012
2016
}
2013
2017
}
2014
2018
@@ -2466,6 +2470,7 @@ where
2466
2470
funding_batch_states: Mutex::new(BTreeMap::new()),
2467
2471
2468
2472
pending_offers_messages: Mutex::new(Vec::new()),
2473
+ pending_broadcast_messages: Mutex::new(Vec::new()),
2469
2474
2470
2475
entropy_source,
2471
2476
node_signer,
@@ -2957,17 +2962,11 @@ where
2957
2962
}
2958
2963
};
2959
2964
if let Some(update) = update_opt {
2960
- // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
2961
- // not try to broadcast it via whatever peer we have.
2962
- let per_peer_state = self.per_peer_state.read().unwrap();
2963
- let a_peer_state_opt = per_peer_state.get(peer_node_id)
2964
- .ok_or(per_peer_state.values().next());
2965
- if let Ok(a_peer_state_mutex) = a_peer_state_opt {
2966
- let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
2967
- a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
2968
- msg: update
2969
- });
2970
- }
2965
+ // If we have some Channel Update to broadcast, we cache it and broadcast it later.
2966
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
2967
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
2968
+ msg: update
2969
+ });
2971
2970
}
2972
2971
2973
2972
Ok(counterparty_node_id)
@@ -4043,6 +4042,7 @@ where
4043
4042
.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
4044
4043
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
4045
4044
let peer_state = &mut *peer_state_lock;
4045
+
4046
4046
for channel_id in channel_ids {
4047
4047
if !peer_state.has_channel(channel_id) {
4048
4048
return Err(APIError::ChannelUnavailable {
@@ -4059,7 +4059,8 @@ where
4059
4059
}
4060
4060
if let ChannelPhase::Funded(channel) = channel_phase {
4061
4061
if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
4062
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
4062
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
4063
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
4063
4064
} else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
4064
4065
peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
4065
4066
node_id: channel.context.get_counterparty_node_id(),
@@ -4969,7 +4970,8 @@ where
4969
4970
if n >= DISABLE_GOSSIP_TICKS {
4970
4971
chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
4971
4972
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
4972
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
4973
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
4974
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
4973
4975
msg: update
4974
4976
});
4975
4977
}
@@ -4983,7 +4985,8 @@ where
4983
4985
if n >= ENABLE_GOSSIP_TICKS {
4984
4986
chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
4985
4987
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
4986
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
4988
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
4989
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
4987
4990
msg: update
4988
4991
});
4989
4992
}
@@ -6642,9 +6645,8 @@ where
6642
6645
}
6643
6646
if let Some(ChannelPhase::Funded(chan)) = chan_option {
6644
6647
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
6645
- let mut peer_state_lock = peer_state_mutex.lock().unwrap();
6646
- let peer_state = &mut *peer_state_lock;
6647
- peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
6648
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
6649
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
6648
6650
msg: update
6649
6651
});
6650
6652
}
@@ -7304,7 +7306,8 @@ where
7304
7306
if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) {
7305
7307
failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed));
7306
7308
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
7307
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
7309
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
7310
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
7308
7311
msg: update
7309
7312
});
7310
7313
}
@@ -7489,7 +7492,8 @@ where
7489
7492
// We're done with this channel. We got a closing_signed and sent back
7490
7493
// a closing_signed with a closing transaction to broadcast.
7491
7494
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
7492
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
7495
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
7496
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
7493
7497
msg: update
7494
7498
});
7495
7499
}
@@ -8209,7 +8213,7 @@ where
8209
8213
/// will randomly be placed first or last in the returned array.
8210
8214
///
8211
8215
/// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
8212
- /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
8216
+ /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among
8213
8217
/// the `MessageSendEvent`s to the specific peer they were generated under.
8214
8218
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
8215
8219
let events = RefCell::new(Vec::new());
@@ -8229,6 +8233,7 @@ where
8229
8233
result = NotifyOption::DoPersist;
8230
8234
}
8231
8235
8236
+ let mut is_some_peer_connected = false;
8232
8237
let mut pending_events = Vec::new();
8233
8238
let per_peer_state = self.per_peer_state.read().unwrap();
8234
8239
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
@@ -8237,6 +8242,15 @@ where
8237
8242
if peer_state.pending_msg_events.len() > 0 {
8238
8243
pending_events.append(&mut peer_state.pending_msg_events);
8239
8244
}
8245
+ if peer_state.is_connected {
8246
+ is_some_peer_connected = true
8247
+ }
8248
+ }
8249
+
8250
+ // Ensure that we are connected to some peers before getting broadcast messages.
8251
+ if is_some_peer_connected {
8252
+ let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
8253
+ pending_events.append(&mut broadcast_msgs);
8240
8254
}
8241
8255
8242
8256
if !pending_events.is_empty() {
@@ -8441,6 +8455,7 @@ where
8441
8455
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
8442
8456
let peer_state = &mut *peer_state_lock;
8443
8457
let pending_msg_events = &mut peer_state.pending_msg_events;
8458
+
8444
8459
peer_state.channel_by_id.retain(|_, phase| {
8445
8460
match phase {
8446
8461
// Retain unfunded channels.
@@ -8513,7 +8528,8 @@ where
8513
8528
let reason_message = format!("{}", reason);
8514
8529
failed_channels.push(channel.context.force_shutdown(true, reason));
8515
8530
if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
8516
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
8531
+ let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
8532
+ pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
8517
8533
msg: update
8518
8534
});
8519
8535
}
@@ -8960,7 +8976,9 @@ where
8960
8976
// Gossip
8961
8977
&events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
8962
8978
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
8963
- &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
8979
+ // [`ChannelManager::pending_broadcast_messages`] holds the [`BroadcastChannelUpdate`]
8980
+ // This check here is to ensure exhaustiveness.
8981
+ &events::MessageSendEvent::BroadcastChannelUpdate { .. } => false,
8964
8982
&events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
8965
8983
&events::MessageSendEvent::SendChannelUpdate { .. } => false,
8966
8984
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
@@ -11149,6 +11167,8 @@ where
11149
11167
11150
11168
pending_offers_messages: Mutex::new(Vec::new()),
11151
11169
11170
+ pending_broadcast_messages: Mutex::new(Vec::new()),
11171
+
11152
11172
entropy_source: args.entropy_source,
11153
11173
node_signer: args.node_signer,
11154
11174
signer_provider: args.signer_provider,
@@ -11678,6 +11698,56 @@ mod tests {
11678
11698
}
11679
11699
}
11680
11700
11701
+ #[test]
11702
+ fn test_channel_update_cached() {
11703
+ let chanmon_cfgs = create_chanmon_cfgs(3);
11704
+ let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
11705
+ let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
11706
+ let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
11707
+
11708
+ let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
11709
+
11710
+ nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
11711
+ check_added_monitors!(nodes[0], 1);
11712
+ check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
11713
+
11714
+ {
11715
+ // Assert that the ChannelUpdate message has been added to node[0]'s pending broadcast messages
11716
+ let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap();
11717
+ assert_eq!(pending_broadcast_messages.len(), 1);
11718
+ }
11719
+
11720
+ // Confirm that the channel_update was not sent immediately to node[1] but was cached.
11721
+ let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
11722
+ assert_eq!(node_1_events.len(), 0);
11723
+
11724
+ // Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
11725
+ nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
11726
+ nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
11727
+
11728
+ nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
11729
+ nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
11730
+
11731
+ let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
11732
+ assert_eq!(node_0_events.len(), 0);
11733
+
11734
+ // Now we reconnect to a peer
11735
+ nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
11736
+ features: nodes[2].node.init_features(), networks: None, remote_network_address: None
11737
+ }, true).unwrap();
11738
+ nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
11739
+ features: nodes[0].node.init_features(), networks: None, remote_network_address: None
11740
+ }, false).unwrap();
11741
+
11742
+ // Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
11743
+ let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
11744
+ assert_eq!(node_0_events.len(), 1);
11745
+ match &node_0_events[0] {
11746
+ MessageSendEvent::BroadcastChannelUpdate { .. } => (),
11747
+ _ => panic!("Unexpected event"),
11748
+ }
11749
+ }
11750
+
11681
11751
#[test]
11682
11752
fn test_drop_disconnected_peers_when_removing_channels() {
11683
11753
let chanmon_cfgs = create_chanmon_cfgs(2);
0 commit comments