Commit a1d01e5
Always process holding cell ChannelMonitorUpdates asynchronously
We currently have two codepaths on most channel update functions - most methods return a set of messages to send a peer iff the `ChannelMonitorUpdate` succeeds, but if it does not we push the messages back into the `Channel` and then pull them back out when the `ChannelMonitorUpdate` completes and send them then.

This adds a substantial amount of complexity in very critical codepaths.

Instead, here we swap all our channel update codepaths to immediately set the channel-update-required flag and only return a `ChannelMonitorUpdate` to the `ChannelManager`. Internally in the `Channel` we store a queue of `ChannelMonitorUpdate`s, which will become critical in future work to surface pending `ChannelMonitorUpdate`s to users at startup so they can complete.

This leaves some redundant work in `Channel` to be cleaned up later. Specifically, we still generate the messages which we will now ignore and regenerate later.

This commit updates the `ChannelMonitorUpdate` pipeline when freeing the holding cell, including when initiating shutdown.
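The core of the change is the queueing pattern the message describes: the `Channel` appends each `ChannelMonitorUpdate` to an internal `pending_monitor_updates` vector and hands the caller a reference into that queue, instead of returning peer messages whose delivery depends on the monitor update succeeding. Below is a minimal, self-contained sketch of that shape; the simplified types and the helper names `queue_monitor_update` / `monitor_update_completed` are illustrative assumptions, not LDK's actual API.

// Sketch only: a channel-like struct that queues monitor updates and lets the
// caller drain them once persistence completes, mirroring the commit's
// `pending_monitor_updates` queue in spirit (types are heavily simplified).
#[derive(Debug)]
struct ChannelMonitorUpdate {
	update_id: u64,
}

#[derive(Default)]
struct Channel {
	latest_monitor_update_id: u64,
	// Updates generated but not yet confirmed as persisted by the caller.
	pending_monitor_updates: Vec<ChannelMonitorUpdate>,
}

impl Channel {
	// Queue a new update and return a borrow of it; the caller persists it
	// asynchronously and regenerates any dependent peer messages later.
	fn queue_monitor_update(&mut self) -> &ChannelMonitorUpdate {
		self.latest_monitor_update_id += 1;
		self.pending_monitor_updates.push(ChannelMonitorUpdate {
			update_id: self.latest_monitor_update_id,
		});
		self.pending_monitor_updates.last().unwrap()
	}

	// Once persistence has completed up to `update_id`, drop finished entries.
	fn monitor_update_completed(&mut self, update_id: u64) {
		self.pending_monitor_updates.retain(|upd| upd.update_id > update_id);
	}
}

fn main() {
	let mut chan = Channel::default();
	let update_id = chan.queue_monitor_update().update_id;
	// ... hand the update to the persister; when it reports completion:
	chan.monitor_update_completed(update_id);
	assert!(chan.pending_monitor_updates.is_empty());
}

Keeping the pending updates in a queue owned by the `Channel` (and only handing out references) is what would let the same updates be surfaced again later, which is the property the commit message says future work will rely on at startup.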
1 parent 6168a96 commit a1d01e5

File tree

3 files changed (+79, -84 lines)

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 9 additions & 1 deletion
@@ -2600,7 +2600,15 @@ fn test_permanent_error_during_sending_shutdown() {
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::PermanentFailure);

 	assert!(nodes[0].node.close_channel(&channel_id, &nodes[1].node.get_our_node_id()).is_ok());
-	check_closed_broadcast!(nodes[0], true);
+
+	// We always send the `shutdown` response when initiating a shutdown, even if we immediately
+	// close the channel thereafter.
+	let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
+	assert_eq!(msg_events.len(), 3);
+	if let MessageSendEvent::SendShutdown { .. } = msg_events[0] {} else { panic!(); }
+	if let MessageSendEvent::BroadcastChannelUpdate { .. } = msg_events[1] {} else { panic!(); }
+	if let MessageSendEvent::HandleError { .. } = msg_events[2] {} else { panic!(); }
+
 	check_added_monitors!(nodes[0], 2);
 	check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() });
 }

lightning/src/ln/channel.rs

Lines changed: 27 additions & 30 deletions
@@ -3188,16 +3188,16 @@ impl<Signer: Sign> Channel<Signer> {
 	/// Public version of the below, checking relevant preconditions first.
 	/// If we're not in a state where freeing the holding cell makes sense, this is a no-op and
 	/// returns `(None, Vec::new())`.
-	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+	pub fn maybe_free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		if self.channel_state >= ChannelState::ChannelReady as u32 &&
 		   (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32 | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32)) == 0 {
 			self.free_holding_cell_htlcs(logger)
-		} else { Ok((None, Vec::new())) }
+		} else { (None, Vec::new()) }
 	}

 	/// Frees any pending commitment updates in the holding cell, generating the relevant messages
 	/// for our counterparty.
-	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> Result<(Option<(msgs::CommitmentUpdate, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>), ChannelError> where L::Target: Logger {
+	fn free_holding_cell_htlcs<L: Deref>(&mut self, logger: &L) -> (Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>) where L::Target: Logger {
 		assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, 0);
 		if self.holding_cell_htlc_updates.len() != 0 || self.holding_cell_update_fee.is_some() {
 			log_trace!(logger, "Freeing holding cell with {} HTLC updates{} in channel {}", self.holding_cell_htlc_updates.len(),
@@ -3278,16 +3278,16 @@ impl<Signer: Sign> Channel<Signer> {
 				}
 			}
 			if update_add_htlcs.is_empty() && update_fulfill_htlcs.is_empty() && update_fail_htlcs.is_empty() && self.holding_cell_update_fee.is_none() {
-				return Ok((None, htlcs_to_fail));
+				return (None, htlcs_to_fail);
 			}
 			let update_fee = if let Some(feerate) = self.holding_cell_update_fee.take() {
 				self.send_update_fee(feerate, false, logger)
 			} else {
 				None
 			};

-			let (commitment_signed, mut additional_update) = self.send_commitment_no_status_check(logger)?;
-			// send_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
+			let mut additional_update = self.build_commitment_no_status_check(logger);
+			// build_commitment_no_status_check and get_update_fulfill_htlc may bump latest_monitor_id
 			// but we want them to be strictly increasing by one, so reset it here.
 			self.latest_monitor_update_id = monitor_update.update_id;
 			monitor_update.updates.append(&mut additional_update.updates);
@@ -3296,16 +3296,11 @@ impl<Signer: Sign> Channel<Signer> {
 				log_bytes!(self.channel_id()), if update_fee.is_some() { "a fee update, " } else { "" },
 				update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());

-			Ok((Some((msgs::CommitmentUpdate {
-				update_add_htlcs,
-				update_fulfill_htlcs,
-				update_fail_htlcs,
-				update_fail_malformed_htlcs: Vec::new(),
-				update_fee,
-				commitment_signed,
-			}, monitor_update)), htlcs_to_fail))
+			self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
+			self.pending_monitor_updates.push(monitor_update);
+			(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
 		} else {
-			Ok((None, Vec::new()))
+			(None, Vec::new())
 		}
 	}

@@ -3515,17 +3510,9 @@ impl<Signer: Sign> Channel<Signer> {
 			return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
 		}

-		match self.free_holding_cell_htlcs(logger)? {
-			(Some((mut commitment_update, mut additional_update)), htlcs_to_fail) => {
-				commitment_update.update_fail_htlcs.reserve(update_fail_htlcs.len());
-				for fail_msg in update_fail_htlcs.drain(..) {
-					commitment_update.update_fail_htlcs.push(fail_msg);
-				}
-				commitment_update.update_fail_malformed_htlcs.reserve(update_fail_malformed_htlcs.len());
-				for fail_msg in update_fail_malformed_htlcs.drain(..) {
-					commitment_update.update_fail_malformed_htlcs.push(fail_msg);
-				}
-
+		match self.free_holding_cell_htlcs(logger) {
+			(Some(_), htlcs_to_fail) => {
+				let mut additional_update = self.pending_monitor_updates.pop().unwrap();
 				// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
 				// strictly increasing by one, so decrement it here.
 				self.latest_monitor_update_id = monitor_update.update_id;
@@ -5845,8 +5832,12 @@ impl<Signer: Sign> Channel<Signer> {

 	/// Begins the shutdown process, getting a message for the remote peer and returning all
 	/// holding cell HTLCs for payment failure.
-	pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures, target_feerate_sats_per_kw: Option<u32>)
-	-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
+	///
+	/// May jump to the channel being fully shutdown (see [`Self::is_shutdown`]), in which case no
+	/// [`ChannelMonitorUpdate`] will be returned.
+	pub fn get_shutdown<SP: Deref>(&mut self, signer_provider: &SP, their_features: &InitFeatures,
+		target_feerate_sats_per_kw: Option<u32>)
+	-> Result<(msgs::Shutdown, Option<&ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
 	where SP::Target: SignerProvider {
 		for htlc in self.pending_outbound_htlcs.iter() {
 			if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5889,12 +5880,15 @@ impl<Signer: Sign> Channel<Signer> {

 		let monitor_update = if update_shutdown_script {
 			self.latest_monitor_update_id += 1;
-			Some(ChannelMonitorUpdate {
+			let monitor_update = ChannelMonitorUpdate {
 				update_id: self.latest_monitor_update_id,
 				updates: vec![ChannelMonitorUpdateStep::ShutdownScript {
 					scriptpubkey: self.get_closing_scriptpubkey(),
 				}],
-			})
+			};
+			self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
+			self.pending_monitor_updates.push(monitor_update);
+			Some(self.pending_monitor_updates.last().unwrap())
 		} else { None };
 		let shutdown = msgs::Shutdown {
 			channel_id: self.channel_id,
@@ -5915,6 +5909,9 @@ impl<Signer: Sign> Channel<Signer> {
 			}
 		});

+		debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
+			"we can't both complete shutdown and return a monitor update");
+
 		Ok((shutdown, monitor_update, dropped_outbound_htlcs))
 	}

lightning/src/ln/channelmanager.rs

Lines changed: 43 additions & 53 deletions
@@ -1785,28 +1785,30 @@ where
 		}

 		let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
-		let peer_state = &mut *peer_state_lock;
+		let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
 		match peer_state.channel_by_id.entry(channel_id.clone()) {
 			hash_map::Entry::Occupied(mut chan_entry) => {
-				let (shutdown_msg, monitor_update, htlcs) = chan_entry.get_mut().get_shutdown(&self.signer_provider, &peer_state.latest_features, target_feerate_sats_per_1000_weight)?;
+				let funding_txo_opt = chan_entry.get().get_funding_txo();
+				let their_features = &peer_state.latest_features;
+				let (shutdown_msg, mut monitor_update_opt, htlcs) = chan_entry.get_mut()
+					.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight)?;
 				failed_htlcs = htlcs;

-				// Update the monitor with the shutdown script if necessary.
-				if let Some(monitor_update) = monitor_update {
-					let update_res = self.chain_monitor.update_channel(chan_entry.get().get_funding_txo().unwrap(), &monitor_update);
-					let (result, is_permanent) =
-						handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
-					if is_permanent {
-						remove_channel!(self, chan_entry);
-						break result;
-					}
-				}
-
+				// We can send the `shutdown` message before updating the `ChannelMonitor`
+				// here as we don't need the monitor update to complete until we send a
+				// `shutdown_signed`, which we'll delay if we're pending a monitor update.
 				peer_state.pending_msg_events.push(events::MessageSendEvent::SendShutdown {
 					node_id: *counterparty_node_id,
-					msg: shutdown_msg
+					msg: shutdown_msg,
 				});

+				// Update the monitor with the shutdown script if necessary.
+				if let Some(monitor_update) = monitor_update_opt.take() {
+					let update_id = monitor_update.update_id;
+					let update_res = self.chain_monitor.update_channel(funding_txo_opt.unwrap(), monitor_update);
+					break handle_new_monitor_update!(self, update_res, update_id, peer_state_lock, peer_state, chan_entry);
+				}
+
 				if chan_entry.get().is_shutdown() {
 					let channel = remove_channel!(self, chan_entry);
 					if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
@@ -5073,49 +5075,37 @@ where
 		let mut has_monitor_update = false;
 		let mut failed_htlcs = Vec::new();
 		let mut handle_errors = Vec::new();
-		{
-			let per_peer_state = self.per_peer_state.read().unwrap();
+		let per_peer_state = self.per_peer_state.read().unwrap();

-			for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+		for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
+			'chan_loop: loop {
 				let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-				let peer_state = &mut *peer_state_lock;
-				let pending_msg_events = &mut peer_state.pending_msg_events;
-				peer_state.channel_by_id.retain(|channel_id, chan| {
-					match chan.maybe_free_holding_cell_htlcs(&self.logger) {
-						Ok((commitment_opt, holding_cell_failed_htlcs)) => {
-							if !holding_cell_failed_htlcs.is_empty() {
-								failed_htlcs.push((
-									holding_cell_failed_htlcs,
-									*channel_id,
-									chan.get_counterparty_node_id()
-								));
-							}
-							if let Some((commitment_update, monitor_update)) = commitment_opt {
-								match self.chain_monitor.update_channel(chan.get_funding_txo().unwrap(), &monitor_update) {
-									ChannelMonitorUpdateStatus::Completed => {
-										pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
-											node_id: chan.get_counterparty_node_id(),
-											updates: commitment_update,
-										});
-									},
-									e => {
-										has_monitor_update = true;
-										let (res, close_channel) = handle_monitor_update_res!(self, e, chan, RAACommitmentOrder::CommitmentFirst, channel_id, COMMITMENT_UPDATE_ONLY);
-										handle_errors.push((chan.get_counterparty_node_id(), res));
-										if close_channel { return false; }
-									},
-								}
-							}
-							true
-						},
-						Err(e) => {
-							let (close_channel, res) = convert_chan_err!(self, e, chan, channel_id);
-							handle_errors.push((chan.get_counterparty_node_id(), Err(res)));
-							// ChannelClosed event is generated by handle_error for us
-							!close_channel
+				let peer_state: &mut PeerState<_> = &mut *peer_state_lock;
+				for (channel_id, chan) in peer_state.channel_by_id.iter_mut() {
+					let counterparty_node_id = chan.get_counterparty_node_id();
+					let funding_txo = chan.get_funding_txo();
+					let (monitor_opt, holding_cell_failed_htlcs) =
+						chan.maybe_free_holding_cell_htlcs(&self.logger);
+					if !holding_cell_failed_htlcs.is_empty() {
+						failed_htlcs.push((holding_cell_failed_htlcs, *channel_id, counterparty_node_id));
+					}
+					if let Some(monitor_update) = monitor_opt {
+						has_monitor_update = true;
+
+						let update_res = self.chain_monitor.update_channel(
+							funding_txo.expect("channel is live"), monitor_update);
+						let update_id = monitor_update.update_id;
+						let channel_id: [u8; 32] = *channel_id;
+						let res = handle_new_monitor_update!(self, update_res, update_id,
+							peer_state_lock, peer_state, chan, MANUALLY_REMOVING,
+							peer_state.channel_by_id.remove(&channel_id));
+						if res.is_err() {
+							handle_errors.push((counterparty_node_id, res));
 						}
+						continue 'chan_loop;
 					}
-				});
+				}
+				break 'chan_loop;
 			}
 		}
51215111
