@@ -903,7 +903,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// The peer is currently connected (i.e. we've seen a
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
-	is_connected: bool,
+	pub is_connected: bool,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -1428,6 +1428,9 @@ where
 
 	pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
 
+	/// Tracks the message events that are to be broadcast when we are connected to some peer.
+	pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
+
 	entropy_source: ES,
 	node_signer: NS,
 	signer_provider: SP,
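Why this field matters: broadcast-type gossip events previously lived in some specific peer's `pending_msg_events` queue and could be lost with that peer. The sketch below (simplified stand-in types, not LDK's real definitions) shows the caching pattern the new field enables:

```rust
use std::sync::Mutex;

// Simplified stand-in for LDK's `MessageSendEvent`; only the variant
// relevant to this patch is modeled here.
enum MessageSendEvent {
    BroadcastChannelUpdate { msg: Vec<u8> },
}

struct ChannelManager {
    // Broadcast-to-everyone events are queued here rather than under any
    // single peer, so they are not lost if that peer disconnects.
    pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
}

impl ChannelManager {
    fn queue_broadcast_update(&self, serialized_update: Vec<u8>) {
        self.pending_broadcast_messages.lock().unwrap().push(
            MessageSendEvent::BroadcastChannelUpdate { msg: serialized_update },
        );
    }
}
```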
@@ -2019,7 +2022,7 @@ macro_rules! handle_error {
 		match $internal {
 			Ok(msg) => Ok(msg),
 			Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
-				let mut msg_events = Vec::with_capacity(2);
+				let mut msg_event = None;
 
 				if let Some((shutdown_res, update_option)) = shutdown_finish {
 					let counterparty_node_id = shutdown_res.counterparty_node_id;
@@ -2031,7 +2034,8 @@ macro_rules! handle_error {
 
 					$self.finish_close_channel(shutdown_res);
 					if let Some(update) = update_option {
-						msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+						let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
+						pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
 							msg: update
 						});
 					}
@@ -2041,17 +2045,17 @@ macro_rules! handle_error {
 
 				if let msgs::ErrorAction::IgnoreError = err.action {
 				} else {
-					msg_events.push(events::MessageSendEvent::HandleError {
+					msg_event = Some(events::MessageSendEvent::HandleError {
 						node_id: $counterparty_node_id,
 						action: err.action.clone()
 					});
 				}
 
-				if !msg_events.is_empty() {
+				if let Some(msg_event) = msg_event {
 					let per_peer_state = $self.per_peer_state.read().unwrap();
 					if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
 						let mut peer_state = peer_state_mutex.lock().unwrap();
-						peer_state.pending_msg_events.append(&mut msg_events);
+						peer_state.pending_msg_events.push(msg_event);
 					}
 				}
 
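Since broadcast updates now go to the global cache, `handle_error!` produces at most one per-peer event, so a plain `Option` replaces the old two-slot `Vec`. A rough sketch of the resulting control flow, using hypothetical simplified types:

```rust
// Hypothetical, simplified model of the reworked `handle_error!` flow.
enum MsgEvent { HandleError { action: String } }

fn route_error(ignore_error: bool, peer_queue: Option<&mut Vec<MsgEvent>>) {
    // At most one per-peer event can be produced now; broadcasts went to
    // the global queue before this point.
    let mut msg_event = None;
    if !ignore_error {
        msg_event = Some(MsgEvent::HandleError { action: "send warning".into() });
    }
    // Only touch the peer's queue when there is actually something to push.
    if let Some(ev) = msg_event {
        if let Some(queue) = peer_queue {
            queue.push(ev);
        }
    }
}
```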
@@ -2522,6 +2526,7 @@ where
 			funding_batch_states: Mutex::new(BTreeMap::new()),
 
 			pending_offers_messages: Mutex::new(Vec::new()),
+			pending_broadcast_messages: Mutex::new(Vec::new()),
 
 			entropy_source,
 			node_signer,
@@ -3020,17 +3025,11 @@ where
 			}
 		};
 		if let Some(update) = update_opt {
-			// Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
-			// not try to broadcast it via whatever peer we have.
-			let per_peer_state = self.per_peer_state.read().unwrap();
-			let a_peer_state_opt = per_peer_state.get(peer_node_id)
-				.ok_or(per_peer_state.values().next());
-			if let Ok(a_peer_state_mutex) = a_peer_state_opt {
-				let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
-				a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-					msg: update
-				});
-			}
+			// If we have some Channel Update to broadcast, we cache it and broadcast it later.
+			let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+			pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
+				msg: update
+			});
 		}
 
 		Ok(counterparty_node_id)
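The deleted fallback tried to route the broadcast through "whatever peer we have", which still dropped the update when no peer state existed at all. A hypothetical before/after sketch of the delivery behavior (names and types are illustrative, not LDK's):

```rust
use std::sync::Mutex;

// Old path: deliver via some peer's queue if one exists, else lose the update.
fn old_broadcast(peer_queues: &mut Vec<Vec<String>>, update: String) {
    if let Some(queue) = peer_queues.first_mut() {
        queue.push(update);
    } // else: update silently lost
}

// New path: always cache; `get_and_clear_pending_msg_events` drains this
// once at least one peer is connected.
fn new_broadcast(cache: &Mutex<Vec<String>>, update: String) {
    cache.lock().unwrap().push(update);
}
```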
@@ -4113,6 +4112,7 @@ where
 			.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
 		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 		let peer_state = &mut *peer_state_lock;
+
 		for channel_id in channel_ids {
 			if !peer_state.has_channel(channel_id) {
 				return Err(APIError::ChannelUnavailable {
@@ -4129,7 +4129,8 @@ where
 			}
 			if let ChannelPhase::Funded(channel) = channel_phase {
 				if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
-					peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
+					let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+					pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
 				} else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
 					peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
 						node_id: channel.context.get_counterparty_node_id(),
@@ -5179,7 +5180,8 @@ where
 					if n >= DISABLE_GOSSIP_TICKS {
 						chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
 						if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-							pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+							let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+							pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
 								msg: update
 							});
 						}
@@ -5193,7 +5195,8 @@ where
 					if n >= ENABLE_GOSSIP_TICKS {
 						chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
 						if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-							pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+							let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+							pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
 								msg: update
 							});
 						}
@@ -6919,9 +6922,8 @@ where
 		}
 		if let Some(ChannelPhase::Funded(chan)) = chan_option {
 			if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-				let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-				let peer_state = &mut *peer_state_lock;
-				peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+				let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+				pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
 					msg: update
 				});
 			}
@@ -7606,7 +7608,8 @@ where
 				};
 				failed_channels.push(chan.context.force_shutdown(false, reason.clone()));
 				if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-					pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+					let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+					pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
 						msg: update
 					});
 				}
@@ -7791,7 +7794,8 @@ where
 					// We're done with this channel. We got a closing_signed and sent back
 					// a closing_signed with a closing transaction to broadcast.
 					if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-						pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+						let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+						pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
 							msg: update
 						});
 					}
@@ -8551,7 +8555,7 @@ where
 	/// will randomly be placed first or last in the returned array.
 	///
 	/// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
-	/// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
+	/// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among
 	/// the `MessageSendEvent`s to the specific peer they were generated under.
 	fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
 		let events = RefCell::new(Vec::new());
@@ -8571,6 +8575,7 @@ where
 				result = NotifyOption::DoPersist;
 			}
 
+			let mut is_any_peer_connected = false;
 			let mut pending_events = Vec::new();
 			let per_peer_state = self.per_peer_state.read().unwrap();
 			for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
@@ -8579,6 +8584,15 @@ where
 				if peer_state.pending_msg_events.len() > 0 {
 					pending_events.append(&mut peer_state.pending_msg_events);
 				}
+				if peer_state.is_connected {
+					is_any_peer_connected = true;
+				}
+			}
+
+			// Ensure that we are connected to some peers before getting broadcast messages.
+			if is_any_peer_connected {
+				let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
+				pending_events.append(&mut broadcast_msgs);
 			}
 
 			if !pending_events.is_empty() {
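The added gate above is the delivery half of the cache: broadcast events are drained only once at least one peer is connected, otherwise they stay queued for a later call. A condensed sketch of that rule, with simplified types:

```rust
use std::sync::Mutex;

// Simplified model of the flush logic in `get_and_clear_pending_msg_events`:
// the global broadcast queue is drained only if at least one peer is
// currently connected; otherwise events remain cached.
fn collect_msg_events(
    peers_connected: &[bool],
    broadcast_queue: &Mutex<Vec<String>>,
) -> Vec<String> {
    let mut pending_events = Vec::new();
    let is_any_peer_connected = peers_connected.iter().any(|c| *c);
    if is_any_peer_connected {
        // Drain the global queue into this round's events.
        pending_events.append(&mut broadcast_queue.lock().unwrap());
    }
    pending_events
}
```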
@@ -8783,6 +8797,7 @@ where
 			let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;
+
 			peer_state.channel_by_id.retain(|_, phase| {
 				match phase {
 					// Retain unfunded channels.
@@ -8858,7 +8873,8 @@ where
 						let reason_message = format!("{}", reason);
 						failed_channels.push(channel.context.force_shutdown(true, reason));
 						if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
-							pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+							let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+							pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
 								msg: update
 							});
 						}
@@ -9315,7 +9331,12 @@ where
 				// Gossip
 				&events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
 				&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
-				&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
+				// [`ChannelManager::pending_broadcast_messages`] holds the [`BroadcastChannelUpdate`]
+				// events. This arm is only here to keep the match exhaustive.
+				&events::MessageSendEvent::BroadcastChannelUpdate { .. } => {
+					debug_assert!(false, "This event shouldn't have been here");
+					false
+				},
 				&events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
 				&events::MessageSendEvent::SendChannelUpdate { .. } => false,
 				&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
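With broadcasts cached globally, this arm should be unreachable; it is retained so the match stays exhaustive, and the `debug_assert!` flags regressions in debug builds while release builds simply drop the event. The shape of that guard on a simplified stand-in enum (the real match covers every `MessageSendEvent` variant):

```rust
// Simplified stand-in enum for illustration only.
enum Event {
    BroadcastChannelUpdate,
    SendChannelUpdate,
}

fn retain_event(ev: &Event) -> bool {
    match ev {
        // Should be unreachable now that broadcasts are cached globally;
        // loudly flag it in debug builds, drop it in release builds.
        Event::BroadcastChannelUpdate => {
            debug_assert!(false, "This event shouldn't have been here");
            false
        },
        Event::SendChannelUpdate => false,
    }
}
```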
@@ -11602,6 +11623,8 @@ where
 
 			pending_offers_messages: Mutex::new(Vec::new()),
 
+			pending_broadcast_messages: Mutex::new(Vec::new()),
+
 			entropy_source: args.entropy_source,
 			node_signer: args.node_signer,
 			signer_provider: args.signer_provider,
@@ -12133,6 +12156,61 @@ mod tests {
 		}
 	}
 
+	#[test]
+	fn test_channel_update_cached() {
+		let chanmon_cfgs = create_chanmon_cfgs(3);
+		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+		let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+		nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
+		check_added_monitors!(nodes[0], 1);
+		check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
+
+		// Confirm that the channel_update was not sent immediately to nodes[1] but was cached.
+		let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
+		assert_eq!(node_1_events.len(), 0);
+
+		{
+			// Assert that the ChannelUpdate message has been added to nodes[0]'s pending broadcast messages
+			let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+			assert_eq!(pending_broadcast_messages.len(), 1);
+		}
+
+		// Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
+		nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+		nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+		nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+		nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+		let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+		assert_eq!(node_0_events.len(), 0);
+
+		// Now we reconnect to a peer
+		nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
+			features: nodes[2].node.init_features(), networks: None, remote_network_address: None
+		}, true).unwrap();
+		nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+			features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+		}, false).unwrap();
+
+		// Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
+		let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+		assert_eq!(node_0_events.len(), 1);
+		match &node_0_events[0] {
+			MessageSendEvent::BroadcastChannelUpdate { .. } => (),
+			_ => panic!("Unexpected event"),
+		}
+		{
+			// Assert that the ChannelUpdate message has been cleared from nodes[0]'s pending broadcast messages
+			let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+			assert_eq!(pending_broadcast_messages.len(), 0);
+		}
+	}
+
 	#[test]
 	fn test_drop_disconnected_peers_when_removing_channels() {
 		let chanmon_cfgs = create_chanmon_cfgs(2);