Skip to content

Commit 6222b1f

Browse files
committed
Delay broadcasting Channel Updates until connected to peers
- We might generate channel updates to be broadcasted when we are not connected to any peers to broadcast them to. - This PR ensures to cache them and broadcast them only when we are connected to some peers. Other Changes: 1. Introduce a test. 2. Update the relevant current tests affected by this change. 3. Fix a typo.
1 parent 5bf58f0 commit 6222b1f

File tree

2 files changed

+92
-26
lines changed

2 files changed

+92
-26
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 83 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1392,6 +1392,10 @@ where
13921392

13931393
pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
13941394

1395+
/// Tracks the channel_update message that were not broadcasted because
1396+
/// we were not connected to any peers.
1397+
pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
1398+
13951399
entropy_source: ES,
13961400
node_signer: NS,
13971401
signer_provider: SP,
@@ -2466,6 +2470,7 @@ where
24662470
funding_batch_states: Mutex::new(BTreeMap::new()),
24672471

24682472
pending_offers_messages: Mutex::new(Vec::new()),
2473+
pending_broadcast_messages: Mutex::new(Vec::new()),
24692474

24702475
entropy_source,
24712476
node_signer,
@@ -2957,17 +2962,11 @@ where
29572962
}
29582963
};
29592964
if let Some(update) = update_opt {
2960-
// Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
2961-
// not try to broadcast it via whatever peer we have.
2962-
let per_peer_state = self.per_peer_state.read().unwrap();
2963-
let a_peer_state_opt = per_peer_state.get(peer_node_id)
2964-
.ok_or(per_peer_state.values().next());
2965-
if let Ok(a_peer_state_mutex) = a_peer_state_opt {
2966-
let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
2967-
a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
2968-
msg: update
2969-
});
2970-
}
2965+
// If we have some Channel Update to broadcast, we cache it and broadcast it later.
2966+
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
2967+
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
2968+
msg: update
2969+
});
29712970
}
29722971

29732972
Ok(counterparty_node_id)
@@ -8209,7 +8208,7 @@ where
82098208
/// will randomly be placed first or last in the returned array.
82108209
///
82118210
/// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
8212-
/// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
8211+
/// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among
82138212
/// the `MessageSendEvent`s to the specific peer they were generated under.
82148213
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
82158214
let events = RefCell::new(Vec::new());
@@ -8229,6 +8228,7 @@ where
82298228
result = NotifyOption::DoPersist;
82308229
}
82318230

8231+
let mut is_some_peer_connected = false;
82328232
let mut pending_events = Vec::new();
82338233
let per_peer_state = self.per_peer_state.read().unwrap();
82348234
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
@@ -8237,6 +8237,15 @@ where
82378237
if peer_state.pending_msg_events.len() > 0 {
82388238
pending_events.append(&mut peer_state.pending_msg_events);
82398239
}
8240+
if peer_state.is_connected {
8241+
is_some_peer_connected = true
8242+
}
8243+
}
8244+
8245+
// Ensure that we are connected to some peers before getting broadcast messages.
8246+
if is_some_peer_connected {
8247+
let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
8248+
pending_events.append(&mut broadcast_msgs);
82408249
}
82418250

82428251
if !pending_events.is_empty() {
@@ -11149,6 +11158,8 @@ where
1114911158

1115011159
pending_offers_messages: Mutex::new(Vec::new()),
1115111160

11161+
pending_broadcast_messages: Mutex::new(Vec::new()),
11162+
1115211163
entropy_source: args.entropy_source,
1115311164
node_signer: args.node_signer,
1115411165
signer_provider: args.signer_provider,
@@ -11678,12 +11689,62 @@ mod tests {
1167811689
}
1167911690
}
1168011691

11692+
#[test]
11693+
fn test_channel_update_cached() {
11694+
let chanmon_cfgs = create_chanmon_cfgs(3);
11695+
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
11696+
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
11697+
let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
11698+
11699+
let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
11700+
11701+
nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
11702+
check_added_monitors!(nodes[0], 1);
11703+
check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
11704+
11705+
{
11706+
// Assert that ChannelUpdate message has been added to node[0] pending broadcast messages
11707+
let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap();
11708+
assert_eq!(pending_broadcast_messages.len(), 1);
11709+
}
11710+
11711+
// Confirm that the channel_update was not sent immediately to node[1] but was cached.
11712+
let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
11713+
assert_eq!(node_1_events.len(), 0);
11714+
11715+
// Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
11716+
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
11717+
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
11718+
11719+
nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
11720+
nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
11721+
11722+
let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
11723+
assert_eq!(node_0_events.len(), 0);
11724+
11725+
// Now we reconnect to a peer
11726+
nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
11727+
features: nodes[2].node.init_features(), networks: None, remote_network_address: None
11728+
}, true).unwrap();
11729+
nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
11730+
features: nodes[0].node.init_features(), networks: None, remote_network_address: None
11731+
}, false).unwrap();
11732+
11733+
// Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
11734+
let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
11735+
assert_eq!(node_0_events.len(), 1);
11736+
match &node_0_events[0] {
11737+
MessageSendEvent::BroadcastChannelUpdate { .. } => (),
11738+
_ => panic!("Unexpected event"),
11739+
}
11740+
}
11741+
1168111742
#[test]
1168211743
fn test_drop_disconnected_peers_when_removing_channels() {
11683-
let chanmon_cfgs = create_chanmon_cfgs(2);
11684-
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
11685-
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
11686-
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
11744+
let chanmon_cfgs = create_chanmon_cfgs(3);
11745+
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
11746+
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
11747+
let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
1168711748

1168811749
let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
1168911750

@@ -11700,15 +11761,15 @@ mod tests {
1170011761
// disconnected and the channel between has been force closed.
1170111762
let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
1170211763
// Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
11703-
assert_eq!(nodes_0_per_peer_state.len(), 1);
11764+
assert_eq!(nodes_0_per_peer_state.len(), 2);
1170411765
assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some());
1170511766
}
1170611767

1170711768
nodes[0].node.timer_tick_occurred();
1170811769

1170911770
{
1171011771
// Assert that nodes[1] has now been removed.
11711-
assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0);
11772+
assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 1);
1171211773
}
1171311774
}
1171411775

@@ -12412,11 +12473,11 @@ mod tests {
1241212473

1241312474
#[test]
1241412475
fn test_trigger_lnd_force_close() {
12415-
let chanmon_cfg = create_chanmon_cfgs(2);
12416-
let node_cfg = create_node_cfgs(2, &chanmon_cfg);
12476+
let chanmon_cfg = create_chanmon_cfgs(3);
12477+
let node_cfg = create_node_cfgs(3, &chanmon_cfg);
1241712478
let user_config = test_default_channel_config();
12418-
let node_chanmgr = create_node_chanmgrs(2, &node_cfg, &[Some(user_config), Some(user_config)]);
12419-
let nodes = create_network(2, &node_cfg, &node_chanmgr);
12479+
let node_chanmgr = create_node_chanmgrs(3, &node_cfg, &[Some(user_config), Some(user_config), Some(user_config)]);
12480+
let nodes = create_network(3, &node_cfg, &node_chanmgr);
1242012481

1242112482
// Open a channel, immediately disconnect each other, and broadcast Alice's latest state.
1242212483
let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 1);

lightning/src/ln/reorg_tests.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -763,21 +763,21 @@ fn test_htlc_preimage_claim_prev_counterparty_commitment_after_current_counterpa
763763
fn do_test_retries_own_commitment_broadcast_after_reorg(anchors: bool, revoked_counterparty_commitment: bool) {
764764
// Tests that a node will retry broadcasting its own commitment after seeing a confirmed
765765
// counterparty commitment be reorged out.
766-
let mut chanmon_cfgs = create_chanmon_cfgs(2);
766+
let mut chanmon_cfgs = create_chanmon_cfgs(3);
767767
if revoked_counterparty_commitment {
768768
chanmon_cfgs[1].keys_manager.disable_revocation_policy_check = true;
769769
}
770-
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
770+
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
771771
let mut config = test_default_channel_config();
772772
if anchors {
773773
config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
774774
config.manually_accept_inbound_channels = true;
775775
}
776776
let persister;
777777
let new_chain_monitor;
778-
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(config), Some(config)]);
778+
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[Some(config), Some(config), Some(config)]);
779779
let nodes_1_deserialized;
780-
let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
780+
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
781781

782782
let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 1);
783783

@@ -801,6 +801,11 @@ fn do_test_retries_own_commitment_broadcast_after_reorg(anchors: bool, revoked_c
801801
reload_node!(
802802
nodes[1], config, &serialized_node, &[&serialized_monitor], persister, new_chain_monitor, nodes_1_deserialized
803803
);
804+
805+
// Reconnect node[1] with node[2] to allow successful channel_update broadcast later
806+
nodes[1].node.peer_connected(&nodes[2].node.get_our_node_id(), &Init {
807+
features: nodes[2].node.init_features(), networks: None, remote_network_address: None
808+
}, true).unwrap();
804809
}
805810

806811
// Connect blocks until the HTLC expiry is met, prompting a commitment broadcast by A.

0 commit comments

Comments
 (0)